feat(awooop): bridge legacy mcp audit into gateway timeline
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 1m4s
CD Pipeline / build-and-deploy (push) Successful in 3m22s
CD Pipeline / post-deploy-checks (push) Successful in 1m14s

This commit is contained in:
Your Name
2026-05-12 23:44:19 +08:00
parent 96a8cf3ad5
commit 94d006eac8
5 changed files with 286 additions and 2 deletions

View File

@@ -412,6 +412,7 @@ class McpGateway:
"agent_id": ctx.agent_id,
"run_id": str(ctx.run_id) if ctx.run_id else None,
"trace_id": ctx.trace_id,
"gateway_path": "awooop_mcp_gateway",
}
return await provider.execute(ctx.tool_name, audit_params)

View File

@@ -9,6 +9,7 @@ before every runtime model has been refreshed.
from __future__ import annotations
import json
import hashlib
import time
import uuid
from typing import Any
@@ -22,6 +23,8 @@ from src.services.mcp_audit_context import normalize_mcp_audit_session_id
logger = structlog.get_logger(__name__)
_REDACT_KEYS = {"token", "password", "secret", "api_key", "authorization", "key"}
_LEGACY_PROJECT_ID = "awoooi"
_REAL_GATEWAY_PATH = "awooop_mcp_gateway"
def infer_flywheel_node(mcp_server: str, tool_name: str) -> str:
@@ -79,6 +82,109 @@ def _extract_audit_context(parameters: dict[str, Any]) -> dict[str, Any]:
return audit_context if isinstance(audit_context, dict) else {}
def _extract_project_id(
parameters: dict[str, Any],
audit_context: dict[str, Any],
) -> str:
"""Legacy MCP calls predate AwoooP tenancy; keep the bridge explicit."""
for value in (audit_context.get("project_id"), parameters.get("project_id")):
if value:
return str(value)
return _LEGACY_PROJECT_ID
def _extract_run_id(audit_context: dict[str, Any]) -> uuid.UUID | None:
value = audit_context.get("run_id")
if not value:
return None
try:
return uuid.UUID(str(value))
except (TypeError, ValueError):
return None
def _compact(value: Any, limit: int) -> str | None:
if value is None:
return None
return str(value)[:limit]
def _stable_hash(value: Any) -> str:
canonical = json.dumps(value, sort_keys=True, ensure_ascii=False, default=str)
return hashlib.sha256(canonical.encode()).hexdigest()
def _bridge_tool_name(mcp_server: str, tool_name: str) -> str:
combined = f"legacy:{mcp_server}:{tool_name}"
if len(combined) <= 128:
return combined
return str(tool_name)[:128]
def _should_bridge_to_awooop(audit_context: dict[str, Any]) -> bool:
"""Skip real Gateway calls; they already write first-class audit rows."""
return audit_context.get("gateway_path") != _REAL_GATEWAY_PATH
async def _record_awooop_gateway_bridge(
db: Any,
*,
project_id: str,
run_id: uuid.UUID | None,
trace_id: str | None,
agent_id: str | None,
mcp_server: str,
tool_name: str,
input_params: dict[str, Any],
output_result: Any | None,
duration_ms: int,
success: bool,
error_message: str | None,
flywheel_node: str | None,
audit_context: dict[str, Any],
) -> None:
"""Mirror legacy direct provider calls into AwoooP audit as bridge rows."""
result_status = "success" if success else "failed"
gate_result = {
"schema_version": "legacy_mcp_bridge_v1",
"gateway_path": audit_context.get("gateway_path") or "legacy_registry_provider",
"policy_enforced": False,
"not_used_reason": "legacy direct provider path; bridge audit only",
"legacy_mcp_server": mcp_server,
"legacy_tool_name": tool_name,
"flywheel_node": flywheel_node,
}
await db.execute(
text(
"""
INSERT INTO awooop_mcp_gateway_audit (
project_id, run_id, trace_id, agent_id,
tool_name, input_hash, output_hash, gate_result,
result_status, block_gate, block_reason, latency_ms
)
VALUES (
:project_id, :run_id, :trace_id, :agent_id,
:tool_name, :input_hash, :output_hash, CAST(:gate_result AS jsonb),
:result_status, NULL, :block_reason, :latency_ms
)
"""
),
{
"project_id": project_id,
"run_id": run_id,
"trace_id": _compact(trace_id, 128),
"agent_id": _compact(agent_id or "legacy-mcp-provider", 128),
"tool_name": _bridge_tool_name(mcp_server, tool_name),
"input_hash": _stable_hash(input_params),
"output_hash": _stable_hash(output_result) if output_result is not None else None,
"gate_result": json.dumps(gate_result, ensure_ascii=False, default=str),
"result_status": result_status,
"block_reason": _compact(error_message, 256) if not success else None,
"latency_ms": duration_ms,
},
)
async def record_mcp_call(
*,
mcp_server: str,
@@ -102,9 +208,13 @@ async def record_mcp_call(
flywheel_node = flywheel_node or infer_flywheel_node(mcp_server, tool_name)
incident_id = incident_id or _extract_incident_id(input_params)
agent_role = agent_role or audit_context.get("agent_role")
project_id = _extract_project_id(input_params, audit_context)
run_id = _extract_run_id(audit_context)
trace_id = audit_context.get("trace_id") or incident_id or session_id
agent_id = audit_context.get("agent_id") or agent_role
try:
async with get_db_context() as db:
async with get_db_context(project_id) as db:
await db.execute(
text(
"""
@@ -165,6 +275,23 @@ async def record_mcp_call(
"duration_ms": duration_ms,
},
)
if _should_bridge_to_awooop(audit_context):
await _record_awooop_gateway_bridge(
db,
project_id=project_id,
run_id=run_id,
trace_id=trace_id,
agent_id=agent_id,
mcp_server=mcp_server,
tool_name=tool_name,
input_params=input_params,
output_result=output_result,
duration_ms=duration_ms,
success=success,
error_message=error_message,
flywheel_node=flywheel_node,
audit_context=audit_context,
)
except Exception as exc:
logger.warning(
"mcp_audit_write_failed",

View File

@@ -10,8 +10,10 @@ from src.services import mcp_audit_service
class _FakeDb:
def __init__(self) -> None:
self.executed_params: list[dict[str, Any]] = []
self.statements: list[str] = []
async def execute(self, _statement: Any, params: dict[str, Any]) -> None:
self.statements.append(str(_statement))
self.executed_params.append(params)
@@ -32,7 +34,7 @@ async def test_record_mcp_call_normalizes_long_session_id(monkeypatch) -> None:
monkeypatch.setattr(
mcp_audit_service,
"get_db_context",
lambda: _FakeDbContext(db),
lambda _project_id=None: _FakeDbContext(db),
)
await mcp_audit_service.record_mcp_call(
@@ -53,3 +55,76 @@ async def test_record_mcp_call_normalizes_long_session_id(monkeypatch) -> None:
audit_insert_params = db.executed_params[0]
assert audit_insert_params["session_id"] == "inc:INC-20260505-E8033A:pre"
assert len(audit_insert_params["session_id"]) <= 36
@pytest.mark.asyncio
async def test_record_mcp_call_bridges_legacy_direct_path_to_awooop_audit(monkeypatch) -> None:
db = _FakeDb()
monkeypatch.setattr(
mcp_audit_service,
"get_db_context",
lambda _project_id=None: _FakeDbContext(db),
)
await mcp_audit_service.record_mcp_call(
mcp_server="ssh_host",
tool_name="ssh_get_docker_logs",
input_params={
"_mcp_audit": {
"session_id": "incident:INC-20260512-B6C589:pre_decision",
"incident_id": "INC-20260512-B6C589",
"agent_role": "pre_decision_investigator",
"gateway_path": "legacy_direct_provider",
},
"project_id": "awoooi",
"secret": "must-not-leak",
},
output_result={"ok": True},
duration_ms=37,
success=True,
error_message=None,
)
bridge_params = db.executed_params[2]
assert "INSERT INTO awooop_mcp_gateway_audit" in db.statements[2]
assert bridge_params["project_id"] == "awoooi"
assert bridge_params["run_id"] is None
assert bridge_params["trace_id"] == "INC-20260512-B6C589"
assert bridge_params["agent_id"] == "pre_decision_investigator"
assert bridge_params["tool_name"] == "legacy:ssh_host:ssh_get_docker_logs"
assert bridge_params["result_status"] == "success"
assert bridge_params["block_reason"] is None
assert bridge_params["latency_ms"] == 37
assert "legacy_mcp_bridge_v1" in bridge_params["gate_result"]
assert "legacy direct provider path; bridge audit only" in bridge_params["gate_result"]
@pytest.mark.asyncio
async def test_record_mcp_call_skips_bridge_for_first_class_awooop_gateway(monkeypatch) -> None:
db = _FakeDb()
monkeypatch.setattr(
mcp_audit_service,
"get_db_context",
lambda _project_id=None: _FakeDbContext(db),
)
await mcp_audit_service.record_mcp_call(
mcp_server="kubernetes",
tool_name="kubectl_get",
input_params={
"_mcp_audit": {
"project_id": "awoooi",
"run_id": "9d769d02-a036-4c9d-b659-5656c8d1bd5d",
"agent_id": "openclaw-sre",
"trace_id": "trace-1",
"gateway_path": "awooop_mcp_gateway",
},
},
output_result={"items": []},
duration_ms=18,
success=True,
error_message=None,
)
assert len(db.executed_params) == 2
assert all("awooop_mcp_gateway_audit" not in statement for statement in db.statements)

View File

@@ -6394,3 +6394,65 @@ envelope_adapter=legacy_telegram_gateway envelope_has_payload_sha=True envelope_
- T1 Channel Event hardening已完成 deploy + production smoke。
- 仍不能宣稱完整 AI 自動修復閉環已完成T2 MCP Gateway mandatory audit、T3 Ansible executor、T4 Drift fingerprint FSM、T5 Incident status reconciliation 仍待推進。
## 2026-05-12台北— T2 MCP Gateway mandatory auditlegacy MCP bridge 第一段
**背景**
- T0 truth-chain smoke 顯示 `awooop_mcp_gateway_audit` 仍是 0但 legacy `mcp_audit_log` 24h 有大量 MCP 呼叫。
- 這代表 MCP 能力有被部分使用,卻不是 AwoooP Gateway 單一治理鏈Telegram / truth-chain 不能完整回答「用了哪些 MCP、自建 MCP、有沒有成功、是否被治理」。
- T2 第一段先不改執行語意,避免影響修復行為;先把 legacy direct provider path 橋接到 AwoooP audit。
**改動**
- `mcp_audit_service.record_mcp_call()`
- 保留原本 `mcp_audit_log``mcp_daily_stats` 寫入。
- 對 legacy direct provider path 額外寫一筆 `awooop_mcp_gateway_audit` bridge row。
- `gate_result` 明確標示:
- `schema_version=legacy_mcp_bridge_v1`
- `policy_enforced=false`
- `not_used_reason=legacy direct provider path; bridge audit only`
- legacy server/tool/flywheel node。
- `trace_id` 優先使用 incident_id讓 truth-chain 可用 incident id 串回 MCP 成敗。
- `McpGateway._execute_tool()`
- 真正走 AwoooP Gateway 的 provider 呼叫標記 `gateway_path=awooop_mcp_gateway`,避免 legacy bridge 重複寫。
- `test_mcp_audit_service.py` 新增:
- legacy direct path 會寫 bridge row。
- first-class gateway path 不重複 bridge。
**本地驗證**
```text
python -m py_compile \
apps/api/src/services/mcp_audit_service.py \
apps/api/src/plugins/mcp/gateway.py \
apps/api/tests/test_mcp_audit_service.py
# OK
/Users/ogt/awoooi/apps/api/.venv/bin/ruff check --select F,E9 \
apps/api/src/services/mcp_audit_service.py \
apps/api/src/plugins/mcp/gateway.py \
apps/api/tests/test_mcp_audit_service.py
# All checks passed
DATABASE_URL='postgresql+asyncpg://awoooi:awoooi_test_2026@localhost:5432/awoooi_test?ssl=disable' \
python -m pytest \
apps/api/tests/test_mcp_audit_service.py \
apps/api/tests/test_mcp_gateway_audit.py \
apps/api/tests/test_mcp_gateway_gate5.py -q
# 7 passed
DATABASE_URL='postgresql+asyncpg://awoooi:awoooi_test_2026@localhost:5432/awoooi_test?ssl=disable' \
python -m pytest \
apps/api/tests/test_mcp_audit_context.py \
apps/api/tests/test_pre_decision_investigator.py \
apps/api/tests/test_post_execution_verifier.py \
apps/api/tests/test_callback_dispatcher.py \
apps/api/tests/test_approval_execution_mcp_audit.py -q
# 90 passed
```
**下一步**
- 推 Gitea main等待 CD 部署。
- production rollback smoke呼叫 `record_mcp_call()`,確認同一 transaction 內同時可見 legacy `mcp_audit_log``awooop_mcp_gateway_audit` bridge rowrollback 後不污染 production。

View File

@@ -1866,6 +1866,25 @@ Phase 6 完成後
---
### 2026-05-12 晚 (台北) — T2 MCP Gateway mandatory audit — legacy MCP bridge 第一段
**範圍**
- `record_mcp_call()` 保留 legacy `mcp_audit_log` 寫入,同步橋接一筆 `awooop_mcp_gateway_audit`
- bridge row 的 `gate_result` 明確標示 `schema_version=legacy_mcp_bridge_v1``policy_enforced=false``not_used_reason=legacy direct provider path; bridge audit only`,避免把 legacy direct call 誤認為已完成五閘門治理。
- 真正走 `McpGateway` 的 provider 呼叫標記 `gateway_path=awooop_mcp_gateway`,避免重複 bridge。
- bridge `trace_id` 優先使用 `incident_id`,讓 truth-chain 可用 incident id 回查 legacy MCP 成敗。
**已驗證**
- 本地 `py_compile` / `ruff F,E9` 通過。
- MCP gateway / audit 聚焦測試 7 passed。
- PreDecision / PostExecution / Callback / Approval MCP audit 相鄰流程測試 90 passed。
**仍未宣稱完成**
- T2 API image 尚需部署後 production rollback smoke。
- 這只是 legacy bridge不是把所有呼叫強制改經 AwoooP GatewayT2 後續仍要把新 MCP caller 收斂到 first-class Gateway path。
---
### 2026-04-20 晚 (台北) — C1-C4 全流程串接 — Playbook 鏈路保護commit de2d34d
**觸發**:統帥全景盤查 AI 自動化節點後,發現 Playbook 自動修復鏈路有 3 個結構性斷點。