Some checks failed
Code Review / ai-code-review (push) Successful in 16s
run-migration / migrate (push) Failing after 9s
CD Pipeline / tests (push) Successful in 1m8s
CD Pipeline / build-and-deploy (push) Successful in 3m59s
CD Pipeline / post-deploy-checks (push) Successful in 1m46s
54 lines
1.4 KiB
Python
54 lines
1.4 KiB
Python
from __future__ import annotations
|
|
|
|
import uuid
|
|
|
|
import pytest
|
|
|
|
from src.plugins.mcp.gateway import GateCheckResult, GatewayContext, McpGateway
|
|
|
|
|
|
class FakeDb:
|
|
def __init__(self) -> None:
|
|
self.added: list[object] = []
|
|
self.flush_count = 0
|
|
|
|
def add(self, item: object) -> None:
|
|
self.added.append(item)
|
|
|
|
async def flush(self) -> None:
|
|
self.flush_count += 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_write_audit_persists_blocked_gate_without_tool_row() -> None:
|
|
db = FakeDb()
|
|
run_id = uuid.uuid4()
|
|
|
|
await McpGateway(db)._write_audit(
|
|
ctx=GatewayContext(
|
|
project_id="awoooi",
|
|
agent_id="openclaw-sre",
|
|
tool_name="missing_tool",
|
|
run_id=run_id,
|
|
trace_id="trace-audit-gap",
|
|
),
|
|
tool_row=None,
|
|
parameters={"namespace": "awoooi-prod"},
|
|
result=None,
|
|
gate_result=GateCheckResult(),
|
|
result_status="blocked",
|
|
block_gate=1,
|
|
block_reason="E-MCP-GATE-001: project blocked",
|
|
latency_ms=12,
|
|
)
|
|
|
|
assert db.flush_count == 1
|
|
assert len(db.added) == 1
|
|
audit = db.added[0]
|
|
assert audit.project_id == "awoooi"
|
|
assert audit.run_id == run_id
|
|
assert audit.tool_id is None
|
|
assert audit.tool_name == "missing_tool"
|
|
assert audit.result_status == "blocked"
|
|
assert audit.block_gate == 1
|