fix(mcp): normalize audit session ids
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 58s
CD Pipeline / build-and-deploy (push) Successful in 4m39s
CD Pipeline / post-deploy-checks (push) Successful in 1m17s

This commit is contained in:
Your Name
2026-05-06 17:40:42 +08:00
parent 9b0f55fd90
commit 0e2e856f12
5 changed files with 174 additions and 2 deletions

View File

@@ -7,8 +7,61 @@ observable without changing execution semantics.
from __future__ import annotations
import hashlib
from typing import Any
MAX_MCP_AUDIT_SESSION_ID_LENGTH = 36
_STAGE_ALIASES = {
"pre_decision": "pre",
"post_execution": "post",
}
def _digest(value: str) -> str:
return hashlib.sha1(value.encode("utf-8", errors="ignore")).hexdigest()[:8]
def _compact_with_hash(prefix: str, stable_part: str, raw: str) -> str:
safe_prefix = "".join(
char for char in prefix if char.isalnum() or char in "-_"
)[:8] or "mcp"
digest = _digest(raw)
head_limit = (
MAX_MCP_AUDIT_SESSION_ID_LENGTH
- len(safe_prefix)
- len(digest)
- 2
)
head = str(stable_part)[:max(1, head_limit)]
compacted = f"{safe_prefix}:{head}:{digest}"
return compacted[:MAX_MCP_AUDIT_SESSION_ID_LENGTH]
def normalize_mcp_audit_session_id(session_id: Any | None) -> str | None:
"""Normalize MCP audit session IDs to the legacy DB column length."""
if session_id is None:
return None
raw = str(session_id)
if len(raw) <= MAX_MCP_AUDIT_SESSION_ID_LENGTH:
return raw
parts = raw.split(":")
if len(parts) >= 3 and parts[0] == "incident":
stage = _STAGE_ALIASES.get(parts[-1], parts[-1][:6])
candidate = f"inc:{parts[1]}:{stage}"
if len(candidate) <= MAX_MCP_AUDIT_SESSION_ID_LENGTH:
return candidate
if len(parts) >= 3 and parts[0] == "callback":
return _compact_with_hash("cb", parts[1], raw)
if len(parts) >= 2 and parts[0] == "approval":
return _compact_with_hash("apr", parts[1], raw)
if len(parts) >= 2 and parts[0] == "mcp_bridge":
return _compact_with_hash("bridge", parts[1], raw)
return _compact_with_hash(parts[0] if parts else "mcp", raw, raw)
def build_mcp_audit_context(
*,
@@ -25,7 +78,7 @@ def build_mcp_audit_context(
"gateway_path": gateway_path,
}
optional_values = {
"session_id": session_id,
"session_id": normalize_mcp_audit_session_id(session_id),
"incident_id": incident_id,
"flywheel_node": flywheel_node,
"agent_role": agent_role,

View File

@@ -17,6 +17,7 @@ import structlog
from sqlalchemy import text
from src.db.base import get_db_context
from src.services.mcp_audit_context import normalize_mcp_audit_session_id
logger = structlog.get_logger(__name__)
@@ -95,7 +96,9 @@ async def record_mcp_call(
"""Persist one MCP tool call and update daily aggregate stats."""
audit_context = _extract_audit_context(input_params)
session_id = session_id or audit_context.get("session_id") or str(uuid.uuid4())
session_id = normalize_mcp_audit_session_id(
session_id or audit_context.get("session_id") or str(uuid.uuid4())
)
flywheel_node = flywheel_node or infer_flywheel_node(mcp_server, tool_name)
incident_id = incident_id or _extract_incident_id(input_params)
agent_role = agent_role or audit_context.get("agent_role")

View File

@@ -1,7 +1,9 @@
from __future__ import annotations
from src.services.mcp_audit_context import (
MAX_MCP_AUDIT_SESSION_ID_LENGTH,
build_mcp_audit_context,
normalize_mcp_audit_session_id,
with_mcp_audit_context,
)
@@ -24,6 +26,33 @@ def test_build_mcp_audit_context_keeps_non_empty_fields() -> None:
}
def test_normalize_mcp_audit_session_id_compacts_known_long_patterns() -> None:
values = [
(
"incident:INC-20260505-E8033A:pre_decision",
"inc:INC-20260505-E8033A:pre",
),
(
"incident:INC-20260505-E8033A:post_execution",
"inc:INC-20260505-E8033A:post",
),
(
"callback:INC-20260505-E8033A:check_process",
"cb:INC-20260505-E8033A",
),
(
"approval:123e4567-e89b-12d3-a456-426614174000",
"apr:123e4567",
),
]
for raw, expected_prefix in values:
normalized = normalize_mcp_audit_session_id(raw)
assert normalized is not None
assert len(normalized) <= MAX_MCP_AUDIT_SESSION_ID_LENGTH
assert normalized.startswith(expected_prefix)
def test_with_mcp_audit_context_merges_existing_context_without_mutating_source() -> None:
params = {
"namespace": "awoooi-prod",

View File

@@ -0,0 +1,55 @@
from __future__ import annotations
from typing import Any
import pytest
from src.services import mcp_audit_service
class _FakeDb:
def __init__(self) -> None:
self.executed_params: list[dict[str, Any]] = []
async def execute(self, _statement: Any, params: dict[str, Any]) -> None:
self.executed_params.append(params)
class _FakeDbContext:
def __init__(self, db: _FakeDb) -> None:
self.db = db
async def __aenter__(self) -> _FakeDb:
return self.db
async def __aexit__(self, *_args: Any) -> None:
return None
@pytest.mark.asyncio
async def test_record_mcp_call_normalizes_long_session_id(monkeypatch) -> None:
db = _FakeDb()
monkeypatch.setattr(
mcp_audit_service,
"get_db_context",
lambda: _FakeDbContext(db),
)
await mcp_audit_service.record_mcp_call(
mcp_server="k8s_provider",
tool_name="get_pods",
input_params={
"_mcp_audit": {
"session_id": "incident:INC-20260505-E8033A:pre_decision",
"agent_role": "pre_decision_investigator",
},
},
output_result={"ok": True},
duration_ms=12,
success=True,
error_message=None,
)
audit_insert_params = db.executed_params[0]
assert audit_insert_params["session_id"] == "inc:INC-20260505-E8033A:pre"
assert len(audit_insert_params["session_id"]) <= 36

View File

@@ -3931,3 +3931,35 @@ ruff check apps/api/src/jobs/km_backfill_reconciler_job.py apps/api/tests/test_k
### 影響
- KM / PlayBook / RAG 飛輪的 backfill 補救鏈恢復可持續執行,避免 DLQ 堆積後造成知識庫關聯缺口。
---
## 2026-05-06台北— MCP audit session_id 長度止血
**觸發**production log 出現多筆 `mcp_audit_write_failed`,錯誤為 `value too long for type character varying(36)`。根因是 legacy MCP caller 新增 `_mcp_audit.session_id` 後,像 `incident:INC-20260505-E8033A:pre_decision` 這類可讀 session id 超過既有 `mcp_audit_log.session_id` 欄位長度,導致 MCP 稽核落地失敗。
### 已修正
| 範圍 | 結果 |
|------|------|
| `mcp_audit_context.py` | 新增 `normalize_mcp_audit_session_id()`,針對 incident / callback / approval / mcp_bridge 等已知格式壓縮到 36 字元以內,同時保留 incident 可讀性與 hash 去重 |
| `mcp_audit_service.py` | 在真正寫入 `mcp_audit_log` 前再做一次 session_id 正規化,補住手動 `_mcp_audit` 呼叫路徑 |
| 測試 | 新增 helper 與 `record_mcp_call()` 回歸測試,驗證 live incident 型 session id 不會再超出 DB 欄位 |
### 驗證
```text
pytest apps/api/tests/test_mcp_audit_context.py apps/api/tests/test_mcp_audit_service.py apps/api/tests/test_pre_decision_investigator.py::TestCollectOne::test_collect_one_injects_mcp_audit_context apps/api/tests/test_post_execution_verifier.py::TestCollectPostStateAuditContext::test_collect_post_state_injects_mcp_audit_context apps/api/tests/test_callback_dispatcher.py::test_dispatch_action_injects_mcp_audit_context apps/api/tests/test_platform_router_order.py
# 8 passed
py_compile apps/api/src/services/mcp_audit_context.py apps/api/src/services/mcp_audit_service.py apps/api/tests/test_mcp_audit_context.py apps/api/tests/test_mcp_audit_service.py
# 通過
ruff check --select F401,F821,I001 apps/api/src/services/mcp_audit_context.py apps/api/src/services/mcp_audit_service.py apps/api/tests/test_mcp_audit_context.py apps/api/tests/test_mcp_audit_service.py
# All checks passed
```
### 注意
- 這次先做 runtime 止血,避免 AwoooP / AI 飛輪的 MCP audit 盲點擴大。
- 後續仍建議用正式 migration 將 `mcp_audit_log.session_id` 放寬為 `varchar(128)``text`,讓 trace / run / session 語義可以完整保留。