feat(awooop): bridge legacy mcp audit into gateway timeline
This commit is contained in:
@@ -412,6 +412,7 @@ class McpGateway:
|
||||
"agent_id": ctx.agent_id,
|
||||
"run_id": str(ctx.run_id) if ctx.run_id else None,
|
||||
"trace_id": ctx.trace_id,
|
||||
"gateway_path": "awooop_mcp_gateway",
|
||||
}
|
||||
return await provider.execute(ctx.tool_name, audit_params)
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ before every runtime model has been refreshed.
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import hashlib
|
||||
import time
|
||||
import uuid
|
||||
from typing import Any
|
||||
@@ -22,6 +23,8 @@ from src.services.mcp_audit_context import normalize_mcp_audit_session_id
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
_REDACT_KEYS = {"token", "password", "secret", "api_key", "authorization", "key"}
|
||||
_LEGACY_PROJECT_ID = "awoooi"
|
||||
_REAL_GATEWAY_PATH = "awooop_mcp_gateway"
|
||||
|
||||
|
||||
def infer_flywheel_node(mcp_server: str, tool_name: str) -> str:
|
||||
@@ -79,6 +82,109 @@ def _extract_audit_context(parameters: dict[str, Any]) -> dict[str, Any]:
|
||||
return audit_context if isinstance(audit_context, dict) else {}
|
||||
|
||||
|
||||
def _extract_project_id(
    parameters: dict[str, Any],
    audit_context: dict[str, Any],
) -> str:
    """Resolve the project id for an audit row.

    The audit context is consulted first, then the call parameters; the first
    truthy ``project_id`` wins and is returned as a string. Legacy MCP calls
    predate AwoooP tenancy, so when neither source carries one the explicit
    legacy default is returned instead.
    """
    from_context = audit_context.get("project_id")
    from_params = parameters.get("project_id")
    chosen = from_context or from_params
    if not chosen:
        return _LEGACY_PROJECT_ID
    return str(chosen)
|
||||
|
||||
|
||||
def _extract_run_id(audit_context: dict[str, Any]) -> uuid.UUID | None:
|
||||
value = audit_context.get("run_id")
|
||||
if not value:
|
||||
return None
|
||||
try:
|
||||
return uuid.UUID(str(value))
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
def _compact(value: Any, limit: int) -> str | None:
|
||||
if value is None:
|
||||
return None
|
||||
return str(value)[:limit]
|
||||
|
||||
|
||||
def _stable_hash(value: Any) -> str:
    """Return the SHA-256 hex digest of a key-sorted JSON rendering of ``value``.

    Keys are sorted so that dicts with the same content hash identically
    regardless of insertion order; objects JSON cannot encode natively are
    rendered through ``str``.
    """
    rendered = json.dumps(value, default=str, ensure_ascii=False, sort_keys=True)
    digest = hashlib.sha256()
    digest.update(rendered.encode())
    return digest.hexdigest()
|
||||
|
||||
|
||||
def _bridge_tool_name(mcp_server: str, tool_name: str) -> str:
    """Build the ``tool_name`` value stored on a bridge audit row.

    The preferred form is ``legacy:<mcp_server>:<tool_name>``. When that
    exceeds 128 characters, the bare tool name truncated to 128 characters is
    returned instead.
    """
    prefixed = ":".join(("legacy", str(mcp_server), str(tool_name)))
    if len(prefixed) > 128:
        return str(tool_name)[:128]
    return prefixed
|
||||
|
||||
|
||||
def _should_bridge_to_awooop(audit_context: dict[str, Any]) -> bool:
    """Decide whether a call needs a bridge audit row.

    Calls whose audit context is tagged with the real Gateway path are
    skipped, since those already write first-class audit rows; everything
    else (including an untagged context) is bridged.
    """
    origin = audit_context.get("gateway_path")
    return origin != _REAL_GATEWAY_PATH
|
||||
|
||||
|
||||
async def _record_awooop_gateway_bridge(
    db: Any,
    *,
    project_id: str,
    run_id: uuid.UUID | None,
    trace_id: str | None,
    agent_id: str | None,
    mcp_server: str,
    tool_name: str,
    input_params: dict[str, Any],
    output_result: Any | None,
    duration_ms: int,
    success: bool,
    error_message: str | None,
    flywheel_node: str | None,
    audit_context: dict[str, Any],
) -> None:
    """Mirror one legacy direct provider call into ``awooop_mcp_gateway_audit``.

    The inserted row is a bridge record, not a policy decision: its
    ``gate_result`` JSON is stamped with ``policy_enforced: False`` and a
    ``not_used_reason``. Inputs and outputs are stored only as SHA-256 hashes.
    ``block_gate`` is always NULL, and ``block_reason`` carries the (truncated)
    error message for failed calls only.

    Args:
        db: Object exposing an awaitable ``execute(statement, params)``.
        project_id: Value for the row's ``project_id`` column.
        run_id: Optional run UUID.
        trace_id: Optional trace id; truncated to 128 characters.
        agent_id: Optional agent id; ``"legacy-mcp-provider"`` when falsy.
        mcp_server: Legacy MCP server name.
        tool_name: Legacy tool name.
        input_params: Call parameters; only their hash is stored.
        output_result: Call result; hashed unless it is ``None``.
        duration_ms: Stored as ``latency_ms``.
        success: Selects ``"success"`` or ``"failed"`` for ``result_status``.
        error_message: Stored (max 256 chars) as ``block_reason`` on failure.
        flywheel_node: Recorded inside ``gate_result``.
        audit_context: Source of the ``gateway_path`` recorded in ``gate_result``.
    """
    if success:
        status = "success"
    else:
        status = "failed"

    bridge_marker = {
        "schema_version": "legacy_mcp_bridge_v1",
        "gateway_path": audit_context.get("gateway_path") or "legacy_registry_provider",
        "policy_enforced": False,
        "not_used_reason": "legacy direct provider path; bridge audit only",
        "legacy_mcp_server": mcp_server,
        "legacy_tool_name": tool_name,
        "flywheel_node": flywheel_node,
    }

    insert_stmt = text(
        """
        INSERT INTO awooop_mcp_gateway_audit (
            project_id, run_id, trace_id, agent_id,
            tool_name, input_hash, output_hash, gate_result,
            result_status, block_gate, block_reason, latency_ms
        )
        VALUES (
            :project_id, :run_id, :trace_id, :agent_id,
            :tool_name, :input_hash, :output_hash, CAST(:gate_result AS jsonb),
            :result_status, NULL, :block_reason, :latency_ms
        )
        """
    )

    row = {
        "project_id": project_id,
        "run_id": run_id,
        "trace_id": _compact(trace_id, 128),
        "agent_id": _compact(agent_id or "legacy-mcp-provider", 128),
        "tool_name": _bridge_tool_name(mcp_server, tool_name),
        "input_hash": _stable_hash(input_params),
        # A missing result stays NULL instead of hashing the JSON literal "null".
        "output_hash": None if output_result is None else _stable_hash(output_result),
        "gate_result": json.dumps(bridge_marker, ensure_ascii=False, default=str),
        "result_status": status,
        # Only failed calls carry a reason; successful rows keep it NULL.
        "block_reason": None if success else _compact(error_message, 256),
        "latency_ms": duration_ms,
    }

    await db.execute(insert_stmt, row)
|
||||
|
||||
|
||||
async def record_mcp_call(
|
||||
*,
|
||||
mcp_server: str,
|
||||
@@ -102,9 +208,13 @@ async def record_mcp_call(
|
||||
flywheel_node = flywheel_node or infer_flywheel_node(mcp_server, tool_name)
|
||||
incident_id = incident_id or _extract_incident_id(input_params)
|
||||
agent_role = agent_role or audit_context.get("agent_role")
|
||||
project_id = _extract_project_id(input_params, audit_context)
|
||||
run_id = _extract_run_id(audit_context)
|
||||
trace_id = audit_context.get("trace_id") or incident_id or session_id
|
||||
agent_id = audit_context.get("agent_id") or agent_role
|
||||
|
||||
try:
|
||||
async with get_db_context() as db:
|
||||
async with get_db_context(project_id) as db:
|
||||
await db.execute(
|
||||
text(
|
||||
"""
|
||||
@@ -165,6 +275,23 @@ async def record_mcp_call(
|
||||
"duration_ms": duration_ms,
|
||||
},
|
||||
)
|
||||
if _should_bridge_to_awooop(audit_context):
|
||||
await _record_awooop_gateway_bridge(
|
||||
db,
|
||||
project_id=project_id,
|
||||
run_id=run_id,
|
||||
trace_id=trace_id,
|
||||
agent_id=agent_id,
|
||||
mcp_server=mcp_server,
|
||||
tool_name=tool_name,
|
||||
input_params=input_params,
|
||||
output_result=output_result,
|
||||
duration_ms=duration_ms,
|
||||
success=success,
|
||||
error_message=error_message,
|
||||
flywheel_node=flywheel_node,
|
||||
audit_context=audit_context,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"mcp_audit_write_failed",
|
||||
|
||||
@@ -10,8 +10,10 @@ from src.services import mcp_audit_service
|
||||
class _FakeDb:
    """In-memory stand-in for a DB session that records every ``execute`` call."""

    def __init__(self) -> None:
        # Parallel lists: index i of each describes the i-th execute() call.
        self.statements: list[str] = []
        self.executed_params: list[dict[str, Any]] = []

    async def execute(self, _statement: Any, params: dict[str, Any]) -> None:
        """Record the stringified statement and its bound parameters."""
        rendered = str(_statement)
        self.statements.append(rendered)
        self.executed_params.append(params)
|
||||
|
||||
|
||||
@@ -32,7 +34,7 @@ async def test_record_mcp_call_normalizes_long_session_id(monkeypatch) -> None:
|
||||
monkeypatch.setattr(
|
||||
mcp_audit_service,
|
||||
"get_db_context",
|
||||
lambda: _FakeDbContext(db),
|
||||
lambda _project_id=None: _FakeDbContext(db),
|
||||
)
|
||||
|
||||
await mcp_audit_service.record_mcp_call(
|
||||
@@ -53,3 +55,76 @@ async def test_record_mcp_call_normalizes_long_session_id(monkeypatch) -> None:
|
||||
audit_insert_params = db.executed_params[0]
|
||||
assert audit_insert_params["session_id"] == "inc:INC-20260505-E8033A:pre"
|
||||
assert len(audit_insert_params["session_id"]) <= 36
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_record_mcp_call_bridges_legacy_direct_path_to_awooop_audit(monkeypatch) -> None:
    """A legacy direct-provider call writes exactly one AwoooP bridge audit row.

    The call is tagged ``gateway_path="legacy_direct_provider"``, so it must be
    mirrored into ``awooop_mcp_gateway_audit``. The input deliberately carries
    a ``secret`` value so the test can verify that the bridge row stores only
    hashes and never the raw parameter values.
    """
    db = _FakeDb()
    monkeypatch.setattr(
        mcp_audit_service,
        "get_db_context",
        lambda _project_id=None: _FakeDbContext(db),
    )

    await mcp_audit_service.record_mcp_call(
        mcp_server="ssh_host",
        tool_name="ssh_get_docker_logs",
        input_params={
            "_mcp_audit": {
                "session_id": "incident:INC-20260512-B6C589:pre_decision",
                "incident_id": "INC-20260512-B6C589",
                "agent_role": "pre_decision_investigator",
                "gateway_path": "legacy_direct_provider",
            },
            "project_id": "awoooi",
            "secret": "must-not-leak",
        },
        output_result={"ok": True},
        duration_ms=37,
        success=True,
        error_message=None,
    )

    # Locate the bridge insert by its SQL rather than by a fixed position, so
    # the test does not break if the number of preceding legacy writes changes.
    bridge_indexes = [
        index
        for index, statement in enumerate(db.statements)
        if "INSERT INTO awooop_mcp_gateway_audit" in statement
    ]
    assert len(bridge_indexes) == 1
    bridge_params = db.executed_params[bridge_indexes[0]]

    assert bridge_params["project_id"] == "awoooi"
    assert bridge_params["run_id"] is None
    assert bridge_params["trace_id"] == "INC-20260512-B6C589"
    assert bridge_params["agent_id"] == "pre_decision_investigator"
    assert bridge_params["tool_name"] == "legacy:ssh_host:ssh_get_docker_logs"
    assert bridge_params["result_status"] == "success"
    assert bridge_params["block_reason"] is None
    assert bridge_params["latency_ms"] == 37
    assert "legacy_mcp_bridge_v1" in bridge_params["gate_result"]
    assert "legacy direct provider path; bridge audit only" in bridge_params["gate_result"]

    # Inputs and outputs are stored as SHA-256 hex digests only.
    assert len(bridge_params["input_hash"]) == 64
    assert len(bridge_params["output_hash"]) == 64
    # The planted secret must not appear anywhere in the bridge row.
    assert "must-not-leak" not in repr(bridge_params)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_record_mcp_call_skips_bridge_for_first_class_awooop_gateway(monkeypatch) -> None:
    """A call tagged with the real Gateway path must not get a bridge row.

    Only the two legacy writes are expected; no statement may touch
    ``awooop_mcp_gateway_audit``.
    """
    fake_db = _FakeDb()

    def _open_fake_context(_project_id=None):
        return _FakeDbContext(fake_db)

    monkeypatch.setattr(mcp_audit_service, "get_db_context", _open_fake_context)

    gateway_audit_context = {
        "project_id": "awoooi",
        "run_id": "9d769d02-a036-4c9d-b659-5656c8d1bd5d",
        "agent_id": "openclaw-sre",
        "trace_id": "trace-1",
        "gateway_path": "awooop_mcp_gateway",
    }

    await mcp_audit_service.record_mcp_call(
        mcp_server="kubernetes",
        tool_name="kubectl_get",
        input_params={"_mcp_audit": gateway_audit_context},
        output_result={"items": []},
        duration_ms=18,
        success=True,
        error_message=None,
    )

    assert len(fake_db.executed_params) == 2
    assert not any(
        "awooop_mcp_gateway_audit" in statement for statement in fake_db.statements
    )
|
||||
|
||||
@@ -6394,3 +6394,65 @@ envelope_adapter=legacy_telegram_gateway envelope_has_payload_sha=True envelope_
|
||||
|
||||
- T1 Channel Event hardening:已完成 deploy + production smoke。
|
||||
- 仍不能宣稱完整 AI 自動修復閉環已完成;T2 MCP Gateway mandatory audit、T3 Ansible executor、T4 Drift fingerprint FSM、T5 Incident status reconciliation 仍待推進。
|
||||
|
||||
## 2026-05-12(台北)— T2 MCP Gateway mandatory audit:legacy MCP bridge 第一段
|
||||
|
||||
**背景**:
|
||||
|
||||
- T0 truth-chain smoke 顯示 `awooop_mcp_gateway_audit` 仍是 0,但 legacy `mcp_audit_log` 24h 有大量 MCP 呼叫。
|
||||
- 這代表 MCP 能力有被部分使用,卻不是 AwoooP Gateway 單一治理鏈;Telegram / truth-chain 不能完整回答「用了哪些 MCP、自建 MCP、有沒有成功、是否被治理」。
|
||||
- T2 第一段先不改執行語意,避免影響修復行為;先把 legacy direct provider path 橋接到 AwoooP audit。
|
||||
|
||||
**改動**:
|
||||
|
||||
- `mcp_audit_service.record_mcp_call()`:
|
||||
- 保留原本 `mcp_audit_log` 與 `mcp_daily_stats` 寫入。
|
||||
- 對 legacy direct provider path 額外寫一筆 `awooop_mcp_gateway_audit` bridge row。
|
||||
- `gate_result` 明確標示:
|
||||
- `schema_version=legacy_mcp_bridge_v1`
|
||||
- `policy_enforced=false`
|
||||
- `not_used_reason=legacy direct provider path; bridge audit only`
|
||||
- legacy server/tool/flywheel node。
|
||||
- `trace_id` 優先使用 audit context 內的 `trace_id`,缺少時依序退回 `incident_id`、`session_id`,讓 truth-chain 可用 incident id 串回 MCP 成敗。
|
||||
- `McpGateway._execute_tool()`:
|
||||
- 真正走 AwoooP Gateway 的 provider 呼叫標記 `gateway_path=awooop_mcp_gateway`,避免 legacy bridge 重複寫。
|
||||
- `test_mcp_audit_service.py` 新增:
|
||||
- legacy direct path 會寫 bridge row。
|
||||
- first-class gateway path 不重複 bridge。
|
||||
|
||||
**本地驗證**:
|
||||
|
||||
```text
|
||||
python -m py_compile \
|
||||
apps/api/src/services/mcp_audit_service.py \
|
||||
apps/api/src/plugins/mcp/gateway.py \
|
||||
apps/api/tests/test_mcp_audit_service.py
|
||||
# OK
|
||||
|
||||
/Users/ogt/awoooi/apps/api/.venv/bin/ruff check --select F,E9 \
|
||||
apps/api/src/services/mcp_audit_service.py \
|
||||
apps/api/src/plugins/mcp/gateway.py \
|
||||
apps/api/tests/test_mcp_audit_service.py
|
||||
# All checks passed
|
||||
|
||||
DATABASE_URL='postgresql+asyncpg://awoooi:awoooi_test_2026@localhost:5432/awoooi_test?ssl=disable' \
|
||||
python -m pytest \
|
||||
apps/api/tests/test_mcp_audit_service.py \
|
||||
apps/api/tests/test_mcp_gateway_audit.py \
|
||||
apps/api/tests/test_mcp_gateway_gate5.py -q
|
||||
# 7 passed
|
||||
|
||||
DATABASE_URL='postgresql+asyncpg://awoooi:awoooi_test_2026@localhost:5432/awoooi_test?ssl=disable' \
|
||||
python -m pytest \
|
||||
apps/api/tests/test_mcp_audit_context.py \
|
||||
apps/api/tests/test_pre_decision_investigator.py \
|
||||
apps/api/tests/test_post_execution_verifier.py \
|
||||
apps/api/tests/test_callback_dispatcher.py \
|
||||
apps/api/tests/test_approval_execution_mcp_audit.py -q
|
||||
# 90 passed
|
||||
```
|
||||
|
||||
**下一步**:
|
||||
|
||||
- 推 Gitea main,等待 CD 部署。
|
||||
- production rollback smoke:呼叫 `record_mcp_call()`,確認同一 transaction 內同時可見 legacy `mcp_audit_log` 與 `awooop_mcp_gateway_audit` bridge row,rollback 後不污染 production。
|
||||
|
||||
@@ -1866,6 +1866,25 @@ Phase 6 完成後
|
||||
|
||||
---
|
||||
|
||||
### 2026-05-12 晚 (台北) — T2 MCP Gateway mandatory audit — legacy MCP bridge 第一段
|
||||
|
||||
**範圍**:
|
||||
- `record_mcp_call()` 保留 legacy `mcp_audit_log` 寫入,同步橋接一筆 `awooop_mcp_gateway_audit`。
|
||||
- bridge row 的 `gate_result` 明確標示 `schema_version=legacy_mcp_bridge_v1`、`policy_enforced=false`、`not_used_reason=legacy direct provider path; bridge audit only`,避免把 legacy direct call 誤認為已完成五閘門治理。
|
||||
- 真正走 `McpGateway` 的 provider 呼叫標記 `gateway_path=awooop_mcp_gateway`,避免重複 bridge。
|
||||
- bridge `trace_id` 優先使用 audit context 內的 `trace_id`,缺少時依序退回 `incident_id`、`session_id`,讓 truth-chain 可用 incident id 回查 legacy MCP 成敗。
|
||||
|
||||
**已驗證**:
|
||||
- 本地 `py_compile` / `ruff F,E9` 通過。
|
||||
- MCP gateway / audit 聚焦測試 7 passed。
|
||||
- PreDecision / PostExecution / Callback / Approval MCP audit 相鄰流程測試 90 passed。
|
||||
|
||||
**仍未宣稱完成**:
|
||||
- T2 API image 尚需部署後 production rollback smoke。
|
||||
- 這只是 legacy bridge,不是把所有呼叫強制改經 AwoooP Gateway;T2 後續仍要把新 MCP caller 收斂到 first-class Gateway path。
|
||||
|
||||
---
|
||||
|
||||
### 2026-04-20 晚 (台北) — C1-C4 全流程串接 — Playbook 鏈路保護(commit de2d34d)
|
||||
|
||||
**觸發**:統帥全景盤查 AI 自動化節點後,發現 Playbook 自動修復鏈路有 3 個結構性斷點。
|
||||
|
||||
Reference in New Issue
Block a user