Files
awoooi/apps/api/tests/test_approval_execution_mcp_audit.py
Your Name a0a2a5b1f0
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
run-migration / migrate (push) Successful in 9s
CD Pipeline / tests (push) Successful in 1m22s
CD Pipeline / build-and-deploy (push) Successful in 6m36s
CD Pipeline / post-deploy-checks (push) Successful in 1m42s
feat(awooop): gate approved ssh execution
2026-05-13 11:02:24 +08:00

148 lines
4.9 KiB
Python

from __future__ import annotations
from contextlib import asynccontextmanager
from typing import Any
from uuid import UUID
import pytest
from src.models.approval import ApprovalRequest, RiskLevel
from src.plugins.mcp.interfaces import MCPToolResult
from src.services.approval_execution import ApprovalExecutionService
class FakeRedis:
def __init__(self) -> None:
self.set_calls: list[tuple[str, str, int | None]] = []
async def set(self, key: str, value: str, ex: int | None = None) -> None:
self.set_calls.append((key, value, ex))
@pytest.mark.asyncio
async def test_ssh_approval_execution_projects_approval_into_gateway(
monkeypatch: pytest.MonkeyPatch,
) -> None:
fake_redis = FakeRedis()
gateway_calls: list[dict[str, Any]] = []
db_context_projects: list[str | None] = []
fake_db = object()
@asynccontextmanager
async def fake_db_context(project_id: str | None = None):
db_context_projects.append(project_id)
yield fake_db
class FakeGateway:
def __init__(self, db: object) -> None:
self.db = db
async def call(self, ctx: Any, parameters: dict[str, Any]) -> MCPToolResult:
gateway_calls.append({"db": self.db, "ctx": ctx, "parameters": parameters})
return MCPToolResult(
success=True,
output={"tool": ctx.tool_name, "ok": True},
execution_id="fake-gateway-ssh-exec",
)
monkeypatch.setattr("src.services.approval_execution.get_redis", lambda: fake_redis)
monkeypatch.setattr("src.services.approval_execution.get_db_context", fake_db_context)
monkeypatch.setattr("src.services.approval_execution.McpGateway", FakeGateway)
approval = ApprovalRequest(
action="docker restart sentry-worker",
description="測試 SSH approved execution gateway audit",
risk_level=RiskLevel.LOW,
requested_by="test",
required_signatures=0,
incident_id="INC-TEST-GATEWAY",
)
result = await ApprovalExecutionService()._execute_ssh_host_action(
approval=approval,
host="192.168.0.110",
)
assert result.success is True
assert db_context_projects == ["awoooi"]
assert len(gateway_calls) == 1
call = gateway_calls[0]
ctx = call["ctx"]
assert ctx.project_id == "awoooi"
assert ctx.agent_id == "approval_executor"
assert ctx.tool_name == "ssh_docker_restart"
assert ctx.required_scope == "write"
assert ctx.is_shadow is False
assert ctx.environment == {"env": "prod"}
assert ctx.run_id == approval.id
assert isinstance(ctx.run_id, UUID)
assert ctx.trace_id == "INC-TEST-GATEWAY"
parameters = call["parameters"]
assert parameters["host"] == "192.168.0.110"
assert parameters["container_name"] == "sentry-worker"
assert parameters["trust_score"] == 0.85
assert parameters["_mcp_audit"]["agent_role"] == "approval_executor"
assert parameters["_mcp_audit"]["flywheel_node"] == "execute"
assert parameters["_mcp_audit"]["incident_id"] == "INC-TEST-GATEWAY"
assert fake_redis.set_calls == [
(
f"mcp_approval:awoooi:approval_executor:ssh_docker_restart:{approval.id}",
"approved",
600,
)
]
@pytest.mark.asyncio
async def test_ssh_diagnose_approval_uses_gateway_without_gate5_projection(
monkeypatch: pytest.MonkeyPatch,
) -> None:
fake_redis = FakeRedis()
gateway_calls: list[dict[str, Any]] = []
@asynccontextmanager
async def fake_db_context(project_id: str | None = None):
yield {"project_id": project_id}
class FakeGateway:
def __init__(self, db: object) -> None:
self.db = db
async def call(self, ctx: Any, parameters: dict[str, Any]) -> MCPToolResult:
gateway_calls.append({"db": self.db, "ctx": ctx, "parameters": parameters})
return MCPToolResult(
success=True,
output={"tool": ctx.tool_name, "ok": True},
execution_id="fake-gateway-read-exec",
)
monkeypatch.setattr("src.services.approval_execution.get_redis", lambda: fake_redis)
monkeypatch.setattr("src.services.approval_execution.get_db_context", fake_db_context)
monkeypatch.setattr("src.services.approval_execution.McpGateway", FakeGateway)
approval = ApprovalRequest(
action="df -h",
description="測試 SSH diagnose gateway audit",
risk_level=RiskLevel.LOW,
requested_by="test",
required_signatures=0,
incident_id="INC-TEST-READ",
)
result = await ApprovalExecutionService()._execute_ssh_host_action(
approval=approval,
host="192.168.0.110",
)
assert result.success is True
assert fake_redis.set_calls == []
assert len(gateway_calls) == 1
ctx = gateway_calls[0]["ctx"]
assert ctx.agent_id == "approval_executor"
assert ctx.tool_name == "ssh_diagnose"
assert ctx.required_scope == "read"
assert ctx.is_shadow is False