Files
awoooi/apps/api/tests/test_approval_execution_mcp_audit.py
Your Name 34bfe56f53
All checks were successful
Code Review / ai-code-review (push) Successful in 20s
CD Pipeline / tests (push) Successful in 3m58s
CD Pipeline / build-and-deploy (push) Successful in 3m47s
CD Pipeline / post-deploy-checks (push) Successful in 1m18s
fix(awooop): persist approved ssh gateway blocks
2026-05-13 11:15:54 +08:00

204 lines
6.8 KiB
Python

from __future__ import annotations
from contextlib import asynccontextmanager
from typing import Any
from uuid import UUID
import pytest
from src.models.approval import ApprovalRequest, RiskLevel
from src.plugins.mcp.gateway import GateApprovalError
from src.plugins.mcp.interfaces import MCPToolResult
from src.services.approval_execution import ApprovalExecutionService
class FakeRedis:
    """In-memory stand-in for the Redis client that only records ``set`` calls.

    Tests inspect ``set_calls`` afterwards to check which keys the code under
    test wrote, with which value and which ``ex`` argument.
    """

    def __init__(self) -> None:
        # One (key, value, ex) tuple per ``set`` call, in call order.
        self.set_calls: list[tuple[str, str, int | None]] = []

    async def set(self, key: str, value: str, ex: int | None = None) -> None:
        """Record the call instead of talking to a server.

        Args:
            key: Key the caller wants to write.
            value: Value the caller wants to store.
            ex: Optional ``ex`` argument passed by the caller; stored as-is.
        """
        self.set_calls.append((key, value, ex))
@pytest.mark.asyncio
async def test_ssh_approval_execution_projects_approval_into_gateway(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """An approved ``docker restart`` action goes through the gateway exactly once.

    ``get_redis``, ``get_db_context`` and ``McpGateway`` in
    ``src.services.approval_execution`` are replaced with in-memory fakes, then
    ``_execute_ssh_host_action`` is run. The test checks the project passed to
    the DB context, every field of the gateway call context, the call
    parameters (including the ``_mcp_audit`` sub-dict) and the single Redis
    ``set`` call that marks the approval as ``"approved"``.
    """
    fake_redis = FakeRedis()
    gateway_calls: list[dict[str, Any]] = []
    db_context_projects: list[str | None] = []
    # Sentinel so the recorded gateway call can be tied back to this context.
    fake_db = object()

    @asynccontextmanager
    async def fake_db_context(project_id: str | None = None):
        # Remember which project the service asked for, then hand out the sentinel.
        db_context_projects.append(project_id)
        yield fake_db

    class FakeGateway:
        """Gateway double that records each call and always reports success."""

        def __init__(self, db: object) -> None:
            self.db = db

        async def call(self, ctx: Any, parameters: dict[str, Any]) -> MCPToolResult:
            gateway_calls.append({"db": self.db, "ctx": ctx, "parameters": parameters})
            return MCPToolResult(
                success=True,
                output={"tool": ctx.tool_name, "ok": True},
                execution_id="fake-gateway-ssh-exec",
            )

    # Patch the names as looked up inside the service module.
    monkeypatch.setattr("src.services.approval_execution.get_redis", lambda: fake_redis)
    monkeypatch.setattr("src.services.approval_execution.get_db_context", fake_db_context)
    monkeypatch.setattr("src.services.approval_execution.McpGateway", FakeGateway)

    approval = ApprovalRequest(
        action="docker restart sentry-worker",
        description="測試 SSH approved execution gateway audit",
        risk_level=RiskLevel.LOW,
        requested_by="test",
        required_signatures=0,
        incident_id="INC-TEST-GATEWAY",
    )

    result = await ApprovalExecutionService()._execute_ssh_host_action(
        approval=approval,
        host="192.168.0.110",
    )

    assert result.success is True
    assert db_context_projects == ["awoooi"]
    assert len(gateway_calls) == 1

    # Gateway call context.
    call = gateway_calls[0]
    ctx = call["ctx"]
    assert ctx.project_id == "awoooi"
    assert ctx.agent_id == "approval_executor"
    assert ctx.tool_name == "ssh_docker_restart"
    assert ctx.required_scope == "write"
    assert ctx.is_shadow is False
    assert ctx.environment == {"env": "prod"}
    assert ctx.run_id == approval.id
    assert isinstance(ctx.run_id, UUID)
    assert ctx.trace_id == "INC-TEST-GATEWAY"

    # Tool parameters and audit metadata.
    parameters = call["parameters"]
    assert parameters["host"] == "192.168.0.110"
    assert parameters["container_name"] == "sentry-worker"
    assert parameters["trust_score"] == 0.85
    assert parameters["_mcp_audit"]["agent_role"] == "approval_executor"
    assert parameters["_mcp_audit"]["flywheel_node"] == "execute"
    assert parameters["_mcp_audit"]["incident_id"] == "INC-TEST-GATEWAY"

    # Exactly one approval marker was written to Redis.
    assert fake_redis.set_calls == [
        (
            f"mcp_approval:awoooi:approval_executor:ssh_docker_restart:{approval.id}",
            "approved",
            600,
        )
    ]
@pytest.mark.asyncio
async def test_ssh_diagnose_approval_uses_gateway_without_gate5_projection(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A ``df -h`` action reaches the gateway as a read call with no Redis write.

    With ``get_redis``, ``get_db_context`` and ``McpGateway`` replaced by
    fakes, ``_execute_ssh_host_action`` must succeed, call the gateway once
    with tool ``ssh_diagnose`` and scope ``read``, and leave
    ``fake_redis.set_calls`` empty.
    """
    fake_redis = FakeRedis()
    gateway_calls: list[dict[str, Any]] = []

    @asynccontextmanager
    async def fake_db_context(project_id: str | None = None):
        # A plain dict is enough here; the test never inspects the db object.
        yield {"project_id": project_id}

    class FakeGateway:
        """Gateway double that records each call and always reports success."""

        def __init__(self, db: object) -> None:
            self.db = db

        async def call(self, ctx: Any, parameters: dict[str, Any]) -> MCPToolResult:
            gateway_calls.append({"db": self.db, "ctx": ctx, "parameters": parameters})
            return MCPToolResult(
                success=True,
                output={"tool": ctx.tool_name, "ok": True},
                execution_id="fake-gateway-read-exec",
            )

    # Patch the names as looked up inside the service module.
    monkeypatch.setattr("src.services.approval_execution.get_redis", lambda: fake_redis)
    monkeypatch.setattr("src.services.approval_execution.get_db_context", fake_db_context)
    monkeypatch.setattr("src.services.approval_execution.McpGateway", FakeGateway)

    approval = ApprovalRequest(
        action="df -h",
        description="測試 SSH diagnose gateway audit",
        risk_level=RiskLevel.LOW,
        requested_by="test",
        required_signatures=0,
        incident_id="INC-TEST-READ",
    )

    result = await ApprovalExecutionService()._execute_ssh_host_action(
        approval=approval,
        host="192.168.0.110",
    )

    assert result.success is True
    # No approval marker is expected in Redis for this action.
    assert fake_redis.set_calls == []
    assert len(gateway_calls) == 1

    ctx = gateway_calls[0]["ctx"]
    assert ctx.agent_id == "approval_executor"
    assert ctx.tool_name == "ssh_diagnose"
    assert ctx.required_scope == "read"
    assert ctx.is_shadow is False
@pytest.mark.asyncio
async def test_gateway_block_returns_failed_execution_result_without_rollback(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A ``GateApprovalError`` from the gateway becomes a failed result.

    The fake gateway always raises ``GateApprovalError``. The test expects
    ``_execute_ssh_host_action`` to return ``success is False`` with an error
    containing ``"E-MCP-GATE-005"``, and expects the exception not to
    propagate through the DB context manager (the fake records ``"normal"``).
    The Redis approval marker must still have been written once.

    NOTE(review): the test name implies that a normal exit avoids a rollback
    in the real ``get_db_context``; that behaviour is not visible here.
    """
    fake_redis = FakeRedis()
    db_context_exits: list[str] = []

    @asynccontextmanager
    async def fake_db_context(project_id: str | None = None):
        # Record how the ``async with`` body finished: "raised" when an
        # exception escaped it, "normal" otherwise.
        try:
            yield {"project_id": project_id}
        except Exception:
            db_context_exits.append("raised")
            raise
        else:
            db_context_exits.append("normal")

    class FakeGateway:
        """Gateway double whose ``call`` always raises ``GateApprovalError``."""

        def __init__(self, db: object) -> None:
            self.db = db

        async def call(self, ctx: Any, parameters: dict[str, Any]) -> MCPToolResult:
            raise GateApprovalError("approval missing in smoke path")

    # Patch the names as looked up inside the service module.
    monkeypatch.setattr("src.services.approval_execution.get_redis", lambda: fake_redis)
    monkeypatch.setattr("src.services.approval_execution.get_db_context", fake_db_context)
    monkeypatch.setattr("src.services.approval_execution.McpGateway", FakeGateway)

    approval = ApprovalRequest(
        action="docker restart sentry-worker",
        description="測試 Gateway blocked audit 不被 rollback",
        risk_level=RiskLevel.LOW,
        requested_by="test",
        required_signatures=0,
        incident_id="INC-TEST-BLOCKED",
    )

    result = await ApprovalExecutionService()._execute_ssh_host_action(
        approval=approval,
        host="192.168.0.110",
    )

    assert result.success is False
    assert result.error is not None
    assert "E-MCP-GATE-005" in result.error
    # The gateway error must have been handled inside the DB context.
    assert db_context_exits == ["normal"]
    assert fake_redis.set_calls == [
        (
            f"mcp_approval:awoooi:approval_executor:ssh_docker_restart:{approval.id}",
            "approved",
            600,
        )
    ]