72 lines
2.3 KiB
Python
72 lines
2.3 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
import pytest
|
|
|
|
from src.models.approval import ApprovalRequest, RiskLevel
|
|
from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider, MCPToolResult
|
|
from src.services.approval_execution import ApprovalExecutionService
|
|
|
|
|
|
class FakeSSHProvider(MCPToolProvider):
    """In-memory stand-in for an SSH tool provider.

    ``execute`` performs no remote work: it stores a copy of the parameters
    it received and returns a successful ``MCPToolResult``.
    """

    name = "ssh"
    enabled = True
    # Copy of the parameters from the latest ``execute`` call; stays ``None``
    # until ``execute`` has been called.
    seen_parameters: dict[str, Any] | None = None

    async def list_tools(self) -> list[MCPTool]:
        """Return an empty tool list."""
        no_tools: list[MCPTool] = []
        return no_tools

    async def execute(self, tool_name: str, parameters: dict) -> MCPToolResult:
        """Remember ``parameters`` and report success for ``tool_name``."""
        # Store a shallow copy so later mutation of the caller's dict does not
        # change what the test inspects.
        self.seen_parameters = dict(parameters)
        payload = {"tool": tool_name, "ok": True}
        return MCPToolResult(success=True, output=payload, execution_id="fake-ssh-exec")

    async def health_check(self) -> bool:
        """Always report the provider as healthy."""
        return True
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_ssh_approval_execution_uses_audited_provider(monkeypatch: pytest.MonkeyPatch) -> None:
    """Check that an approved SSH host action is executed and audited.

    ``SSHProvider`` and ``record_mcp_call`` are patched with local fakes, then
    ``_execute_ssh_host_action`` is invoked directly. The test asserts that the
    fake provider was called without an ``_mcp_audit`` key in its parameters
    and that the first recorded audit call carries the expected metadata.
    """
    provider = FakeSSHProvider()
    recorded_audits: list[dict[str, Any]] = []

    def build_provider() -> FakeSSHProvider:
        # Zero-argument callable standing in for the SSHProvider constructor.
        return provider

    async def capture_audit(**kwargs: Any) -> None:
        # Collect the keyword arguments of each audit call for inspection.
        recorded_audits.append(kwargs)

    patches = (
        ("src.plugins.mcp.providers.ssh_provider.SSHProvider", build_provider),
        ("src.services.mcp_audit_service.record_mcp_call", capture_audit),
    )
    for target, replacement in patches:
        monkeypatch.setattr(target, replacement)

    request = ApprovalRequest(
        action="docker restart sentry-worker",
        description="測試 SSH approved execution audit",
        risk_level=RiskLevel.LOW,
        requested_by="test",
        required_signatures=0,
        incident_id="INC-TEST-AUDIT",
    )

    service = ApprovalExecutionService()
    result = await service._execute_ssh_host_action(approval=request, host="192.168.0.110")

    assert result.success is True

    # The provider must have been invoked, and the audit metadata must not be
    # among the parameters it received.
    assert provider.seen_parameters is not None
    assert "_mcp_audit" not in provider.seen_parameters

    assert recorded_audits
    first_audit = recorded_audits[0]
    expected_fields = {
        "mcp_server": "ssh",
        "tool_name": "ssh_docker_restart",
        "incident_id": "INC-TEST-AUDIT",
        "agent_role": "approval_executor",
        "flywheel_node": "execute",
    }
    for field, expected in expected_fields.items():
        assert first_audit[field] == expected
    assert first_audit["input_params"]["_mcp_audit"]["gateway_path"] == "legacy_direct_provider"
|