From fa0e956c0e823bfb5481b7dfbef34c490823c310 Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 6 May 2026 17:18:52 +0800 Subject: [PATCH] fix(mcp): tag legacy provider calls with audit context --- apps/api/src/plugins/mcp/mcp_bridge.py | 8 +++ apps/api/src/services/callback_dispatcher.py | 11 +++- .../src/services/heartbeat_report_service.py | 30 ++++++++--- apps/api/src/services/mcp_audit_context.py | 49 ++++++++++++++++++ .../src/services/post_execution_verifier.py | 13 +++-- .../src/services/pre_decision_investigator.py | 10 +++- apps/api/tests/test_callback_dispatcher.py | 36 +++++++++++++ apps/api/tests/test_mcp_audit_context.py | 45 ++++++++++++++++ .../api/tests/test_post_execution_verifier.py | 51 +++++++++++++++++++ .../tests/test_pre_decision_investigator.py | 28 ++++++++++ docs/LOGBOOK.md | 30 +++++++++++ 11 files changed, 300 insertions(+), 11 deletions(-) create mode 100644 apps/api/src/services/mcp_audit_context.py create mode 100644 apps/api/tests/test_mcp_audit_context.py diff --git a/apps/api/src/plugins/mcp/mcp_bridge.py b/apps/api/src/plugins/mcp/mcp_bridge.py index 782c89f5..5daf14b1 100644 --- a/apps/api/src/plugins/mcp/mcp_bridge.py +++ b/apps/api/src/plugins/mcp/mcp_bridge.py @@ -24,6 +24,7 @@ from typing import Any import httpx from src.core.config import settings # P0-13: K8s namespace 由 settings.AWOOOI_K8S_NAMESPACE 提供 +from src.services.mcp_audit_context import with_mcp_audit_context from src.utils.timezone import now_taipei logger = logging.getLogger(__name__) @@ -518,6 +519,13 @@ class MCPBridge: raise ValueError(f"Unknown MCP Server: {server_name}") server = self._servers[server_name] + parameters = with_mcp_audit_context( + parameters, + session_id=f"mcp_bridge:{execution_id}", + flywheel_node="govern", + agent_role="mcp_bridge", + gateway_path="legacy_mcp_bridge", + ) result = await self._execute_tool(server, tool_name, parameters) # ======================================== diff --git a/apps/api/src/services/callback_dispatcher.py b/apps/api/src/services/callback_dispatcher.py index e90b9ddf..94218ed3 100644 --- a/apps/api/src/services/callback_dispatcher.py +++ b/apps/api/src/services/callback_dispatcher.py @@ -280,6 +280,7 @@ async def dispatch_action( # MCP registry dispatch from src.plugins.mcp.registry import get_provider + from src.services.mcp_audit_context import with_mcp_audit_context provider_name = _resolve_provider_name(spec.mcp_provider) provider = get_provider(provider_name) if not provider: @@ -293,8 +294,16 @@ async def dispatch_action( ) # 執行 MCP tool with timeout + audited_params = with_mcp_audit_context( + resolved_params, + session_id=f"callback:{incident_id}:{action_name}", + incident_id=incident_id, + flywheel_node="operate", + agent_role="telegram_callback_dispatcher", + operator_user_id=user_id, + ) mcp_result = await asyncio.wait_for( - provider.execute(spec.mcp_tool, resolved_params), + provider.execute(spec.mcp_tool, audited_params), timeout=float(spec.timeout_sec), ) diff --git a/apps/api/src/services/heartbeat_report_service.py b/apps/api/src/services/heartbeat_report_service.py index 106e6f1e..3d856293 100644 --- a/apps/api/src/services/heartbeat_report_service.py +++ b/apps/api/src/services/heartbeat_report_service.py @@ -317,11 +317,20 @@ class HeartbeatReportService: """K8s MCP:確認 kubectl 能連到 K3s""" try: from src.plugins.mcp.providers.k8s_provider import K8sProvider - provider = K8sProvider() + from src.plugins.mcp.registry import AuditedMCPToolProvider + from src.services.mcp_audit_context import with_mcp_audit_context + provider = AuditedMCPToolProvider(K8sProvider()) if not provider.enabled: return ProbeResult(False, "⚠️ K8s MCP 未啟用") + params = with_mcp_audit_context( + {"resource_type": "nodes"}, + session_id="heartbeat:mcp_k8s", + flywheel_node="govern", + agent_role="heartbeat_report_service", + gateway_path="legacy_heartbeat_provider", + ) result = await asyncio.wait_for( - provider.execute("kubectl_get", {"resource_type": "nodes"}), + provider.execute("kubectl_get", params), timeout=_PROBE_TIMEOUT, ) if result.success: @@ -389,14 +398,23 @@ class HeartbeatReportService: """Velero 備份:確認最後一次備份是否在 26 小時內""" try: from src.plugins.mcp.providers.k8s_provider import K8sProvider - provider = K8sProvider() + from src.plugins.mcp.registry import AuditedMCPToolProvider + from src.services.mcp_audit_context import with_mcp_audit_context + provider = AuditedMCPToolProvider(K8sProvider()) if not provider.enabled: return ProbeResult(False, "⚠️ K8s MCP 未啟用,無法查 Velero") - result = await asyncio.wait_for( - provider.execute("kubectl_get", { + params = with_mcp_audit_context( + { "resource_type": "backups.velero.io", "namespace": "velero", - }), + }, + session_id="heartbeat:velero", + flywheel_node="govern", + agent_role="heartbeat_report_service", + gateway_path="legacy_heartbeat_provider", + ) + result = await asyncio.wait_for( + provider.execute("kubectl_get", params), timeout=_PROBE_TIMEOUT, ) if not result.success: diff --git a/apps/api/src/services/mcp_audit_context.py b/apps/api/src/services/mcp_audit_context.py new file mode 100644 index 00000000..b89ccb47 --- /dev/null +++ b/apps/api/src/services/mcp_audit_context.py @@ -0,0 +1,49 @@ +"""MCP audit context helpers. + +Legacy production callers still use the provider registry directly while +AwoooP MCP Gateway is rolled in by lane. These helpers make those calls +observable without changing execution semantics. +""" + +from __future__ import annotations + +from typing import Any + + +def build_mcp_audit_context( + *, + session_id: str | None = None, + incident_id: str | None = None, + flywheel_node: str | None = None, + agent_role: str | None = None, + gateway_path: str = "legacy_registry_provider", + operator_user_id: int | str | None = None, +) -> dict[str, Any]: + """Build the `_mcp_audit` metadata object carried beside tool params.""" + + context: dict[str, Any] = { + "gateway_path": gateway_path, + } + optional_values = { + "session_id": session_id, + "incident_id": incident_id, + "flywheel_node": flywheel_node, + "agent_role": agent_role, + "operator_user_id": operator_user_id, + } + context.update({key: value for key, value in optional_values.items() if value is not None}) + return context + + +def with_mcp_audit_context( + parameters: dict[str, Any], + **audit_kwargs: Any, +) -> dict[str, Any]: + """Return a shallow copy of provider parameters with merged audit context.""" + + audited = dict(parameters) + existing = audited.get("_mcp_audit") + merged = dict(existing) if isinstance(existing, dict) else {} + merged.update(build_mcp_audit_context(**audit_kwargs)) + audited["_mcp_audit"] = merged + return audited diff --git a/apps/api/src/services/post_execution_verifier.py b/apps/api/src/services/post_execution_verifier.py index 87cbe2fa..b32b55b0 100644 --- a/apps/api/src/services/post_execution_verifier.py +++ b/apps/api/src/services/post_execution_verifier.py @@ -34,14 +34,14 @@ MASTER §3.1 L6×D1 from __future__ import annotations import asyncio -import time from typing import TYPE_CHECKING, Any import structlog from src.services.evidence_snapshot import EvidenceSnapshot +from src.services.mcp_audit_context import with_mcp_audit_context from src.services.mcp_tool_registry import SensorDimension, get_mcp_tool_registry -from src.services.sanitization_service import sanitize_dict_values +from src.services.sanitization_service import sanitize, sanitize_dict_values # W2 PR-V1: 頂層 import 讓測試 patch 路徑固定(延遲 import 無法被 patch) # ENABLE_SELF_HEALING_VALIDATOR=False 時此 import 不影響效能(純 python 模組) from src.services import self_healing_validator as _shv_module @@ -216,8 +216,15 @@ class PostExecutionVerifier: async def _call_one(reg) -> tuple[str, Any]: try: + audited_params = with_mcp_audit_context( + params, + session_id=f"incident:{_get_incident_id(incident)}:post_execution", + incident_id=_get_incident_id(incident), + flywheel_node="verify", + agent_role="post_execution_verifier", + ) result = await asyncio.wait_for( - reg.provider.execute(reg.tool.name, params), + reg.provider.execute(reg.tool.name, audited_params), timeout=TOOL_TIMEOUT_SEC, ) if result.success and result.output: diff --git a/apps/api/src/services/pre_decision_investigator.py b/apps/api/src/services/pre_decision_investigator.py index b5b2c7b6..fc5ccf44 100644 --- a/apps/api/src/services/pre_decision_investigator.py +++ b/apps/api/src/services/pre_decision_investigator.py @@ -33,6 +33,7 @@ from typing import TYPE_CHECKING, Any import structlog from src.services.evidence_snapshot import EvidenceSnapshot +from src.services.mcp_audit_context import with_mcp_audit_context from src.services.mcp_tool_registry import RegisteredTool, SensorDimension, get_mcp_tool_registry from src.services.sanitization_service import sanitize, sanitize_dict_values @@ -323,8 +324,15 @@ class PreDecisionInvestigator: _mcp_status = "failed" _mcp_error = None try: + audited_params = with_mcp_audit_context( + params, + session_id=f"incident:{snapshot.incident_id}:pre_decision", + incident_id=snapshot.incident_id, + flywheel_node="sense", + agent_role="pre_decision_investigator", + ) result = await asyncio.wait_for( - reg.provider.execute(tool_name, params), + reg.provider.execute(tool_name, audited_params), timeout=MCP_TOOL_TIMEOUT_SEC, ) diff --git a/apps/api/tests/test_callback_dispatcher.py b/apps/api/tests/test_callback_dispatcher.py index e80eecb5..9e7bb2d0 100644 --- a/apps/api/tests/test_callback_dispatcher.py +++ b/apps/api/tests/test_callback_dispatcher.py @@ -15,6 +15,7 @@ Phase 5 Sprint 5.0-5.1 Callback Dispatcher 單元測試 import pytest +from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider, MCPToolResult from src.services import callback_dispatcher as callback_dispatcher_module from src.services.callback_dispatcher import ( dispatch_action, @@ -320,3 +321,38 @@ class TestMcpFailurePath: # 但絕不能 crash assert isinstance(result.success, bool) assert result.result_text # 有合理的錯誤訊息 + + +class _CaptureMcpProvider(MCPToolProvider): + name = "ssh_host" + seen_parameters: dict | None = None + + async def list_tools(self) -> list[MCPTool]: + return [] + + async def execute(self, tool_name: str, parameters: dict) -> MCPToolResult: + self.seen_parameters = dict(parameters) + return MCPToolResult(success=True, output={"stdout": "ok"}) + + +@pytest.mark.asyncio +async def test_dispatch_action_injects_mcp_audit_context(monkeypatch): + provider = _CaptureMcpProvider() + + monkeypatch.setattr("src.plugins.mcp.registry.get_provider", lambda name: provider) + + result = await dispatch_action( + action_name="check_process", + incident_id="INC-CB-AUDIT", + user_id=12345, + labels={"instance": "192.168.0.110"}, + ) + + assert result.success is True + assert provider.seen_parameters is not None + audit_context = provider.seen_parameters["_mcp_audit"] + assert audit_context["incident_id"] == "INC-CB-AUDIT" + assert audit_context["session_id"] == "callback:INC-CB-AUDIT:check_process" + assert audit_context["flywheel_node"] == "operate" + assert audit_context["agent_role"] == "telegram_callback_dispatcher" + assert audit_context["operator_user_id"] == 12345 diff --git a/apps/api/tests/test_mcp_audit_context.py b/apps/api/tests/test_mcp_audit_context.py new file mode 100644 index 00000000..d1cefcac --- /dev/null +++ b/apps/api/tests/test_mcp_audit_context.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +from src.services.mcp_audit_context import ( + build_mcp_audit_context, + with_mcp_audit_context, +) + + +def test_build_mcp_audit_context_keeps_non_empty_fields() -> None: + context = build_mcp_audit_context( + session_id="incident:INC-1:pre_decision", + incident_id="INC-1", + flywheel_node="sense", + agent_role="pre_decision_investigator", + operator_user_id=None, + ) + + assert context == { + "gateway_path": "legacy_registry_provider", + "session_id": "incident:INC-1:pre_decision", + "incident_id": "INC-1", + "flywheel_node": "sense", + "agent_role": "pre_decision_investigator", + } + + +def test_with_mcp_audit_context_merges_existing_context_without_mutating_source() -> None: + params = { + "namespace": "awoooi-prod", + "_mcp_audit": {"trace_id": "trace-1", "flywheel_node": "old"}, + } + + audited = with_mcp_audit_context( + params, + incident_id="INC-2", + flywheel_node="verify", + agent_role="post_execution_verifier", + ) + + assert params["_mcp_audit"]["flywheel_node"] == "old" + assert audited is not params + assert audited["_mcp_audit"]["trace_id"] == "trace-1" + assert audited["_mcp_audit"]["incident_id"] == "INC-2" + assert audited["_mcp_audit"]["flywheel_node"] == "verify" + assert audited["_mcp_audit"]["agent_role"] == "post_execution_verifier" diff --git a/apps/api/tests/test_post_execution_verifier.py b/apps/api/tests/test_post_execution_verifier.py index 6d0cc911..56bcd0e3 100644 --- a/apps/api/tests/test_post_execution_verifier.py +++ b/apps/api/tests/test_post_execution_verifier.py @@ -22,6 +22,7 @@ import asyncio import pytest from unittest.mock import AsyncMock, patch +from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider, MCPToolResult from src.services.post_execution_verifier import ( PostExecutionVerifier, _assess_recovery, @@ -29,6 +30,7 @@ from src.services.post_execution_verifier import ( _get_labels, ) from src.services.evidence_snapshot import EvidenceSnapshot +from src.services.mcp_tool_registry import RegisteredTool, SensorDimension # ───────────────────────────────────────────────────────────────────────────── @@ -306,3 +308,52 @@ class TestCapturePreState: await verifier.capture_pre_execution_state(incident, snapshot) assert snapshot.pre_execution_state == {} + + +class _CaptureProvider(MCPToolProvider): + name = "capture" + seen_parameters: dict | None = None + + async def list_tools(self) -> list[MCPTool]: + return [] + + async def execute(self, tool_name: str, parameters: dict) -> MCPToolResult: + self.seen_parameters = dict(parameters) + return MCPToolResult(success=True, output={"status": "Running"}) + + +class _FakeRegistry: + def __init__(self, provider: _CaptureProvider) -> None: + self.provider = provider + + def suggest_tools(self, alertname: str = "", incident_labels: dict | None = None) -> list[RegisteredTool]: + return [ + RegisteredTool( + tool=MCPTool( + name="kubectl_get", + description="", + input_schema={}, + server_name="capture", + ), + provider=self.provider, + dimensions=[SensorDimension.D1_K8S_STATE], + ) + ] + + +class TestCollectPostStateAuditContext: + @pytest.mark.asyncio + async def test_collect_post_state_injects_mcp_audit_context(self): + provider = _CaptureProvider() + verifier = PostExecutionVerifier() + verifier._registry = _FakeRegistry(provider) + + state = await verifier._collect_post_state(_stub_incident()) + + assert state["kubectl_get"]["status"] == "Running" + assert provider.seen_parameters is not None + audit_context = provider.seen_parameters["_mcp_audit"] + assert audit_context["incident_id"] == "INC-TEST" + assert audit_context["session_id"] == "incident:INC-TEST:post_execution" + assert audit_context["flywheel_node"] == "verify" + assert audit_context["agent_role"] == "post_execution_verifier" diff --git a/apps/api/tests/test_pre_decision_investigator.py b/apps/api/tests/test_pre_decision_investigator.py index d67c9b95..831a8672 100644 --- a/apps/api/tests/test_pre_decision_investigator.py +++ b/apps/api/tests/test_pre_decision_investigator.py @@ -91,6 +91,18 @@ class _TimeoutProvider(MCPToolProvider): return MCPToolResult(success=True, execution_id="never", output={}) +class _CaptureProvider(_SuccessProvider): + """成功 provider,額外保存收到的參數。""" + + def __init__(self) -> None: + super().__init__({"status": "Running"}) + self.seen_parameters: dict | None = None + + async def execute(self, tool_name: str, parameters: dict) -> MCPToolResult: + self.seen_parameters = dict(parameters) + return await super().execute(tool_name, parameters) + + def _stub_incident( alertname: str = "KubePodCrashLooping", namespace: str = "awoooi-prod", @@ -268,6 +280,22 @@ class TestCollectOne: assert snap.sensors_succeeded == 1 assert snap.k8s_state is not None + @pytest.mark.asyncio + async def test_collect_one_injects_mcp_audit_context(self): + investigator = PreDecisionInvestigator() + snap = EvidenceSnapshot(incident_id="INC-AUDIT") + provider = _CaptureProvider() + reg = _reg("kubectl_describe", provider, SensorDimension.D1_K8S_STATE) + + await investigator._collect_one(snap, reg, {"namespace": "prod"}) + + assert provider.seen_parameters is not None + audit_context = provider.seen_parameters["_mcp_audit"] + assert audit_context["incident_id"] == "INC-AUDIT" + assert audit_context["session_id"] == "incident:INC-AUDIT:pre_decision" + assert audit_context["flywheel_node"] == "sense" + assert audit_context["agent_role"] == "pre_decision_investigator" + @pytest.mark.asyncio async def test_failed_tool_marks_health_false(self): investigator = PreDecisionInvestigator() diff --git a/docs/LOGBOOK.md b/docs/LOGBOOK.md index 8a2cd573..9887a117 100644 --- a/docs/LOGBOOK.md +++ b/docs/LOGBOOK.md @@ -1,3 +1,33 @@ +## 2026-05-06 | MCP legacy provider 路徑補上飛輪稽核脈絡 + +**背景**:AwoooP MCP Gateway 已有五閘門與 gateway audit,但 production 仍有多條 legacy caller 直接走 ProviderRegistry 或 provider wrapper。硬切 Gateway 會因 contract/grant 尚未覆蓋所有路徑而造成修復鏈中斷,因此本輪先做「不改語義的稽核包裝」。 + +**本次修補**: +- 新增 `mcp_audit_context.py`,統一產生 `_mcp_audit` 脈絡,保留 `session_id`、`incident_id`、`flywheel_node`、`agent_role`、`gateway_path`。 +- `PreDecisionInvestigator` MCP 感官蒐集注入 `flywheel_node=sense`。 +- `PostExecutionVerifier` 執行後驗證注入 `flywheel_node=verify`。 +- `CallbackDispatcher` Telegram 操作按鈕注入 `flywheel_node=operate` 與 `operator_user_id`。 +- `MCPBridge` legacy bridge 注入 `gateway_path=legacy_mcp_bridge`。 +- `HeartbeatReportService` 的 K8s / Velero probe 改用 `AuditedMCPToolProvider`,讓系統報告也留下 govern 稽核軌跡。 + +**策略**: +- 這不是最終 enforcement。它先讓所有 legacy production path 可觀測、可追蹤,下一步才依 AwoooP contract/grant 分 lane 切到 `McpGateway.call()`。 + +## 2026-05-06 | 111 Ollama 第三順位目前是網路不可達,不是 Router 跳過 + +**背景**:統帥指出 111 主機應該一直活著,但告警仍可能顯示 Gemini。重新用 live network path 驗證後,GCP-A / GCP-B 可從 K8s Pod 與本機存取,但 `192.168.0.111` 在多來源均不可達。 + +**現場證據**: +- `awoooi-prod` Pod 連 `34.143.170.20:11434` 與 `34.21.145.224:11434` `/api/tags` 成功。 +- `awoooi-prod` Pod 連 `192.168.0.111:11434` timeout。 +- 本機連 `192.168.0.111:11434` timeout,SSH `192.168.0.111:22` timeout。 +- 110 / 120 / 121 / 188 對 `192.168.0.111` ping 100% loss,`nc 22` 回 `No route to host`,`curl 11434` 回 `No route to host` 或 timeout。 +- production log 顯示 provider order 仍是 `ollama_gcp_a → ollama_gcp_b → ollama_local → gemini`,且 GCP-A/GCP-B 都判定 healthy;`ollama_local` 被判為 offline 是網路事實,不是順序設定錯誤。 + +**判讀**: +- 188 Ollama 已退場;`192.168.0.188:11434` 從 LAN / K8s 連線被拒絕,這是預期狀態。 +- 111 需要另行恢復 LAN reachability 或改走 mesh/proxy;在 111 不可達期間,第三順位無法提供備援,Gemini 仍只保留為 Ollama 全部失敗後的付費備援。 + ## 2026-05-06 | MCPToolResult 相容舊 provider 的 data alias **背景**:AwoooP 整合風險 P0-D 指出部分 MCP provider 成功路徑仍使用 `MCPToolResult(data=...)`,但標準 dataclass 欄位是 `output` 且 `execution_id` 必填;Sentry / ArgoCD 等成功路徑可能因此在有效 API 回應後反而 crash。