fix(mcp): normalize audit session ids
This commit is contained in:
@@ -7,8 +7,61 @@ observable without changing execution semantics.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
from typing import Any
|
||||
|
||||
MAX_MCP_AUDIT_SESSION_ID_LENGTH = 36
|
||||
|
||||
_STAGE_ALIASES = {
|
||||
"pre_decision": "pre",
|
||||
"post_execution": "post",
|
||||
}
|
||||
|
||||
|
||||
def _digest(value: str) -> str:
|
||||
return hashlib.sha1(value.encode("utf-8", errors="ignore")).hexdigest()[:8]
|
||||
|
||||
|
||||
def _compact_with_hash(prefix: str, stable_part: str, raw: str) -> str:
|
||||
safe_prefix = "".join(
|
||||
char for char in prefix if char.isalnum() or char in "-_"
|
||||
)[:8] or "mcp"
|
||||
digest = _digest(raw)
|
||||
head_limit = (
|
||||
MAX_MCP_AUDIT_SESSION_ID_LENGTH
|
||||
- len(safe_prefix)
|
||||
- len(digest)
|
||||
- 2
|
||||
)
|
||||
head = str(stable_part)[:max(1, head_limit)]
|
||||
compacted = f"{safe_prefix}:{head}:{digest}"
|
||||
return compacted[:MAX_MCP_AUDIT_SESSION_ID_LENGTH]
|
||||
|
||||
|
||||
def normalize_mcp_audit_session_id(session_id: Any | None) -> str | None:
|
||||
"""Normalize MCP audit session IDs to the legacy DB column length."""
|
||||
|
||||
if session_id is None:
|
||||
return None
|
||||
raw = str(session_id)
|
||||
if len(raw) <= MAX_MCP_AUDIT_SESSION_ID_LENGTH:
|
||||
return raw
|
||||
|
||||
parts = raw.split(":")
|
||||
if len(parts) >= 3 and parts[0] == "incident":
|
||||
stage = _STAGE_ALIASES.get(parts[-1], parts[-1][:6])
|
||||
candidate = f"inc:{parts[1]}:{stage}"
|
||||
if len(candidate) <= MAX_MCP_AUDIT_SESSION_ID_LENGTH:
|
||||
return candidate
|
||||
if len(parts) >= 3 and parts[0] == "callback":
|
||||
return _compact_with_hash("cb", parts[1], raw)
|
||||
if len(parts) >= 2 and parts[0] == "approval":
|
||||
return _compact_with_hash("apr", parts[1], raw)
|
||||
if len(parts) >= 2 and parts[0] == "mcp_bridge":
|
||||
return _compact_with_hash("bridge", parts[1], raw)
|
||||
|
||||
return _compact_with_hash(parts[0] if parts else "mcp", raw, raw)
|
||||
|
||||
|
||||
def build_mcp_audit_context(
|
||||
*,
|
||||
@@ -25,7 +78,7 @@ def build_mcp_audit_context(
|
||||
"gateway_path": gateway_path,
|
||||
}
|
||||
optional_values = {
|
||||
"session_id": session_id,
|
||||
"session_id": normalize_mcp_audit_session_id(session_id),
|
||||
"incident_id": incident_id,
|
||||
"flywheel_node": flywheel_node,
|
||||
"agent_role": agent_role,
|
||||
|
||||
@@ -17,6 +17,7 @@ import structlog
|
||||
from sqlalchemy import text
|
||||
|
||||
from src.db.base import get_db_context
|
||||
from src.services.mcp_audit_context import normalize_mcp_audit_session_id
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
@@ -95,7 +96,9 @@ async def record_mcp_call(
|
||||
"""Persist one MCP tool call and update daily aggregate stats."""
|
||||
|
||||
audit_context = _extract_audit_context(input_params)
|
||||
session_id = session_id or audit_context.get("session_id") or str(uuid.uuid4())
|
||||
session_id = normalize_mcp_audit_session_id(
|
||||
session_id or audit_context.get("session_id") or str(uuid.uuid4())
|
||||
)
|
||||
flywheel_node = flywheel_node or infer_flywheel_node(mcp_server, tool_name)
|
||||
incident_id = incident_id or _extract_incident_id(input_params)
|
||||
agent_role = agent_role or audit_context.get("agent_role")
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from src.services.mcp_audit_context import (
|
||||
MAX_MCP_AUDIT_SESSION_ID_LENGTH,
|
||||
build_mcp_audit_context,
|
||||
normalize_mcp_audit_session_id,
|
||||
with_mcp_audit_context,
|
||||
)
|
||||
|
||||
@@ -24,6 +26,33 @@ def test_build_mcp_audit_context_keeps_non_empty_fields() -> None:
|
||||
}
|
||||
|
||||
|
||||
def test_normalize_mcp_audit_session_id_compacts_known_long_patterns() -> None:
|
||||
values = [
|
||||
(
|
||||
"incident:INC-20260505-E8033A:pre_decision",
|
||||
"inc:INC-20260505-E8033A:pre",
|
||||
),
|
||||
(
|
||||
"incident:INC-20260505-E8033A:post_execution",
|
||||
"inc:INC-20260505-E8033A:post",
|
||||
),
|
||||
(
|
||||
"callback:INC-20260505-E8033A:check_process",
|
||||
"cb:INC-20260505-E8033A",
|
||||
),
|
||||
(
|
||||
"approval:123e4567-e89b-12d3-a456-426614174000",
|
||||
"apr:123e4567",
|
||||
),
|
||||
]
|
||||
|
||||
for raw, expected_prefix in values:
|
||||
normalized = normalize_mcp_audit_session_id(raw)
|
||||
assert normalized is not None
|
||||
assert len(normalized) <= MAX_MCP_AUDIT_SESSION_ID_LENGTH
|
||||
assert normalized.startswith(expected_prefix)
|
||||
|
||||
|
||||
def test_with_mcp_audit_context_merges_existing_context_without_mutating_source() -> None:
|
||||
params = {
|
||||
"namespace": "awoooi-prod",
|
||||
|
||||
55
apps/api/tests/test_mcp_audit_service.py
Normal file
55
apps/api/tests/test_mcp_audit_service.py
Normal file
@@ -0,0 +1,55 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from src.services import mcp_audit_service
|
||||
|
||||
|
||||
class _FakeDb:
|
||||
def __init__(self) -> None:
|
||||
self.executed_params: list[dict[str, Any]] = []
|
||||
|
||||
async def execute(self, _statement: Any, params: dict[str, Any]) -> None:
|
||||
self.executed_params.append(params)
|
||||
|
||||
|
||||
class _FakeDbContext:
|
||||
def __init__(self, db: _FakeDb) -> None:
|
||||
self.db = db
|
||||
|
||||
async def __aenter__(self) -> _FakeDb:
|
||||
return self.db
|
||||
|
||||
async def __aexit__(self, *_args: Any) -> None:
|
||||
return None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_record_mcp_call_normalizes_long_session_id(monkeypatch) -> None:
|
||||
db = _FakeDb()
|
||||
monkeypatch.setattr(
|
||||
mcp_audit_service,
|
||||
"get_db_context",
|
||||
lambda: _FakeDbContext(db),
|
||||
)
|
||||
|
||||
await mcp_audit_service.record_mcp_call(
|
||||
mcp_server="k8s_provider",
|
||||
tool_name="get_pods",
|
||||
input_params={
|
||||
"_mcp_audit": {
|
||||
"session_id": "incident:INC-20260505-E8033A:pre_decision",
|
||||
"agent_role": "pre_decision_investigator",
|
||||
},
|
||||
},
|
||||
output_result={"ok": True},
|
||||
duration_ms=12,
|
||||
success=True,
|
||||
error_message=None,
|
||||
)
|
||||
|
||||
audit_insert_params = db.executed_params[0]
|
||||
assert audit_insert_params["session_id"] == "inc:INC-20260505-E8033A:pre"
|
||||
assert len(audit_insert_params["session_id"]) <= 36
|
||||
@@ -3931,3 +3931,35 @@ ruff check apps/api/src/jobs/km_backfill_reconciler_job.py apps/api/tests/test_k
|
||||
### 影響
|
||||
|
||||
- KM / PlayBook / RAG 飛輪的 backfill 補救鏈恢復可持續執行,避免 DLQ 堆積後造成知識庫關聯缺口。
|
||||
|
||||
---
|
||||
|
||||
## 2026-05-06(台北)— MCP audit session_id 長度止血
|
||||
|
||||
**觸發**:production log 出現多筆 `mcp_audit_write_failed`,錯誤為 `value too long for type character varying(36)`。根因是 legacy MCP caller 新增 `_mcp_audit.session_id` 後,像 `incident:INC-20260505-E8033A:pre_decision` 這類可讀 session id 超過既有 `mcp_audit_log.session_id` 欄位長度,導致 MCP 稽核落地失敗。
|
||||
|
||||
### 已修正
|
||||
|
||||
| 範圍 | 結果 |
|
||||
|------|------|
|
||||
| `mcp_audit_context.py` | 新增 `normalize_mcp_audit_session_id()`,針對 incident / callback / approval / mcp_bridge 等已知格式壓縮到 36 字元以內,同時保留 incident 可讀性與 hash 去重 |
|
||||
| `mcp_audit_service.py` | 在真正寫入 `mcp_audit_log` 前再做一次 session_id 正規化,補住手動 `_mcp_audit` 呼叫路徑 |
|
||||
| 測試 | 新增 helper 與 `record_mcp_call()` 回歸測試,驗證 live incident 型 session id 不會再超出 DB 欄位 |
|
||||
|
||||
### 驗證
|
||||
|
||||
```text
|
||||
pytest apps/api/tests/test_mcp_audit_context.py apps/api/tests/test_mcp_audit_service.py apps/api/tests/test_pre_decision_investigator.py::TestCollectOne::test_collect_one_injects_mcp_audit_context apps/api/tests/test_post_execution_verifier.py::TestCollectPostStateAuditContext::test_collect_post_state_injects_mcp_audit_context apps/api/tests/test_callback_dispatcher.py::test_dispatch_action_injects_mcp_audit_context apps/api/tests/test_platform_router_order.py
|
||||
# 8 passed
|
||||
|
||||
py_compile apps/api/src/services/mcp_audit_context.py apps/api/src/services/mcp_audit_service.py apps/api/tests/test_mcp_audit_context.py apps/api/tests/test_mcp_audit_service.py
|
||||
# 通過
|
||||
|
||||
ruff check --select F401,F821,I001 apps/api/src/services/mcp_audit_context.py apps/api/src/services/mcp_audit_service.py apps/api/tests/test_mcp_audit_context.py apps/api/tests/test_mcp_audit_service.py
|
||||
# All checks passed
|
||||
```
|
||||
|
||||
### 注意
|
||||
|
||||
- 這次先做 runtime 止血,避免 AwoooP / AI 飛輪的 MCP audit 盲點擴大。
|
||||
- 後續仍建議用正式 migration 將 `mcp_audit_log.session_id` 放寬為 `varchar(128)` 或 `text`,讓 trace / run / session 語義可以完整保留。
|
||||
|
||||
Reference in New Issue
Block a user