fix(mcp): audit approved ssh execution path
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 1m5s
CD Pipeline / build-and-deploy (push) Successful in 3m45s
CD Pipeline / post-deploy-checks (push) Successful in 1m20s

This commit is contained in:
Your Name
2026-05-06 16:34:39 +08:00
parent fcf93aac11
commit a7a9ba996d
3 changed files with 132 additions and 2 deletions

View File

@@ -685,10 +685,36 @@ class ApprovalExecutionService:
)
# 呼叫 SSH MCP Provider
# 2026-05-06 Codex: approved execution 是高風險「實際執行」路徑。
# 在 AwoooP MCP Gateway 完全接管前,至少必須經過 AuditedMCPToolProvider
# 寫入 durable mcp_audit_log並標記這仍是 legacy direct provider path。
from src.plugins.mcp.providers.ssh_provider import SSHProvider
provider = SSHProvider()
from src.plugins.mcp.registry import AuditedMCPToolProvider
provider = AuditedMCPToolProvider(SSHProvider())
params_with_audit = {
**params,
"_mcp_audit": {
"session_id": f"approval:{approval.id}",
"incident_id": approval.incident_id,
"agent_role": "approval_executor",
"flywheel_node": "execute",
"gateway_path": "legacy_direct_provider",
},
}
try:
mcp_result = await provider.execute(tool_name=tool_name, parameters=params)
logger.warning(
"mcp_gateway_legacy_direct_provider_path",
approval_id=str(approval.id),
incident_id=approval.incident_id,
tool=tool_name,
host=host,
reason="awooop_gateway_not_enforced_for_legacy_approval_execution",
)
mcp_result = await provider.execute(
tool_name=tool_name,
parameters=params_with_audit,
)
duration_ms = int((time.time() - start) * 1000)
success = bool(mcp_result.success)
return ExecutionResult(

View File

@@ -0,0 +1,71 @@
from __future__ import annotations
from typing import Any
import pytest
from src.models.approval import ApprovalRequest, RiskLevel
from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider, MCPToolResult
from src.services.approval_execution import ApprovalExecutionService
class FakeSSHProvider(MCPToolProvider):
name = "ssh"
enabled = True
seen_parameters: dict[str, Any] | None = None
async def list_tools(self) -> list[MCPTool]:
return []
async def execute(self, tool_name: str, parameters: dict) -> MCPToolResult:
self.seen_parameters = dict(parameters)
return MCPToolResult(
success=True,
output={"tool": tool_name, "ok": True},
execution_id="fake-ssh-exec",
)
async def health_check(self) -> bool:
return True
@pytest.mark.asyncio
async def test_ssh_approval_execution_uses_audited_provider(monkeypatch: pytest.MonkeyPatch) -> None:
fake_provider = FakeSSHProvider()
audit_calls: list[dict[str, Any]] = []
class ProviderFactory:
def __call__(self) -> FakeSSHProvider:
return fake_provider
async def fake_record_mcp_call(**kwargs: Any) -> None:
audit_calls.append(kwargs)
monkeypatch.setattr("src.plugins.mcp.providers.ssh_provider.SSHProvider", ProviderFactory())
monkeypatch.setattr("src.services.mcp_audit_service.record_mcp_call", fake_record_mcp_call)
approval = ApprovalRequest(
action="docker restart sentry-worker",
description="測試 SSH approved execution audit",
risk_level=RiskLevel.LOW,
requested_by="test",
required_signatures=0,
incident_id="INC-TEST-AUDIT",
)
result = await ApprovalExecutionService()._execute_ssh_host_action(
approval=approval,
host="192.168.0.110",
)
assert result.success is True
assert fake_provider.seen_parameters is not None
assert "_mcp_audit" not in fake_provider.seen_parameters
assert audit_calls
audit = audit_calls[0]
assert audit["mcp_server"] == "ssh"
assert audit["tool_name"] == "ssh_docker_restart"
assert audit["incident_id"] == "INC-TEST-AUDIT"
assert audit["agent_role"] == "approval_executor"
assert audit["flywheel_node"] == "execute"
assert audit["input_params"]["_mcp_audit"]["gateway_path"] == "legacy_direct_provider"

View File

@@ -3792,3 +3792,36 @@ ALTER TABLE awooop_mcp_gateway_audit
- 套用後確認 `tool_id_not_null=false`
- 同輪已修正 `.gitea/workflows/run-migration.yml`:平常仍優先使用 `MIGRATION_DATABASE_URL` 限權帳號;只有 PostgreSQL 明確回報 `must be owner of table` 時,才以 `DATABASE_URL` table owner 連線重試,且不輸出任何連線串。
- 後續仍需獨立檢討 DB ownership 模型:`awoooi_migrator` 目前可新增部分 schema但不能 ALTER 由 `awoooi` 擁有的既有表owner fallback 是營運修補,不是長期最終治理模型。
---
## 2026-05-06台北— Approved SSH execution 納入 MCP durable audit
**觸發**AwoooP / AI 自動化飛輪整合盤點指出 production 多條 MCP caller 仍直接呼叫 providerMCP Gateway 尚未成為真正 choke point。最危險的是已批准後的 SSH 實際執行路徑,因為它會改動主機或容器狀態。
### 已修正
| 範圍 | 結果 |
|------|------|
| `approval_execution.py` | SSH_HOST approved execution 改用 `AuditedMCPToolProvider(SSHProvider())` 包裝,不再裸呼叫 `SSHProvider.execute()` |
| 稽核上下文 | 注入 `_mcp_audit``session_id=approval:{approval_id}``incident_id``agent_role=approval_executor``flywheel_node=execute` |
| 遷移標記 | 追加 `gateway_path=legacy_direct_provider`,明確標示這仍是舊 direct provider path供後續 Gateway strangler 收斂 |
| 回歸測試 | 新增 `test_approval_execution_mcp_audit.py`,驗證 provider 實際收到的參數已移除 `_mcp_audit`,但 audit subsystem 可取得完整上下文 |
### 驗證
```text
pytest apps/api/tests/test_approval_execution_mcp_audit.py apps/api/tests/test_approval_execution_retry.py apps/api/tests/test_mcp_credential_isolation.py apps/api/tests/test_mcp_tool_result_compat.py -q
# 45 passed
py_compile apps/api/src/services/approval_execution.py apps/api/tests/test_approval_execution_mcp_audit.py
# 通過
ruff check apps/api/tests/test_approval_execution_mcp_audit.py
# All checks passed
```
### 注意
- 本次是「先補 durable audit + legacy 標記」,不是直接硬切 MCP Gateway enforcement原因是 AwoooP project / agent / grant contract 尚未覆蓋所有 legacy 修復路徑,硬切會中斷現有 approved execution。
- 下一步應將 `decision_manager.py``pre_decision_investigator.py``post_execution_verifier.py``callback_dispatcher.py` 的 direct MCP caller 逐步套同一種可追蹤 wrapper最後再切到 `McpGateway.call()` enforcement。