Files
awoooi/apps/api/tests/test_agent_loop_foundation.py
Your Name b0da6da1e9
Some checks failed
CD Pipeline / tests (push) Successful in 2m50s
Code Review / ai-code-review (push) Successful in 33s
CD Pipeline / build-and-deploy (push) Failing after 25m48s
CD Pipeline / post-deploy-checks (push) Has been cancelled
feat(aiops): structure agent loop shadow output
2026-05-01 15:09:57 +08:00

230 lines
7.7 KiB
Python

import pytest
from src.plugins.mcp.interfaces import MCPTool, MCPToolProvider, MCPToolResult
from src.plugins.mcp.registry import AuditedMCPToolProvider
from src.services.ai_providers.agent_loop import AgentToolExecutor
from src.services.ai_providers.interfaces import AIResult
from src.services.ai_providers.permissions import (
filter_tools_for_agent,
is_tool_allowed,
)
from src.services.ai_providers.tool_schema import (
anthropic_tool_schema,
openai_tool_schema,
to_provider_tool_name,
tool_by_provider_name,
)
class FakeProvider(MCPToolProvider):
def __init__(self, name="kubernetes"):
self.calls = []
self._name = name
@property
def name(self):
return self._name
async def list_tools(self):
return []
async def execute(self, tool_name, parameters):
self.calls.append((tool_name, parameters))
return MCPToolResult(success=True, execution_id="exec-1", output={"ok": True})
def _tool(server: str, name: str) -> MCPTool:
return MCPTool(
name=name,
description=f"{server}.{name}",
input_schema={"type": "object", "properties": {}},
server_name=server,
)
def test_agent_tool_permissions_are_role_scoped():
k8s_get = _tool("kubernetes", "kubectl_get")
k8s_restart = _tool("kubernetes", "kubectl_restart")
prom_query = _tool("prometheus", "prometheus_query")
db_tool = _tool("database", "write_km_entry")
assert is_tool_allowed(k8s_restart, "openclaw") is True
assert is_tool_allowed(k8s_get, "nemotron") is True
assert is_tool_allowed(k8s_restart, "nemotron") is False
assert is_tool_allowed(prom_query, "hermes") is True
assert is_tool_allowed(db_tool, "hermes") is True
assert is_tool_allowed(k8s_restart, "elephant_alpha") is False
assert is_tool_allowed(_tool("database", "execute_sql"), "elephant_alpha") is False
assert is_tool_allowed(_tool("ssh_host", "run_command"), "elephant_alpha") is False
filtered = filter_tools_for_agent([k8s_get, k8s_restart, prom_query], "nemotron")
assert [tool.name for tool in filtered] == ["kubectl_get", "prometheus_query"]
def test_tool_schema_round_trips_provider_safe_names():
tool = _tool("kubernetes", "kubectl_get")
safe_name = to_provider_tool_name(tool)
assert safe_name == "kubernetes__kubectl_get"
assert anthropic_tool_schema(tool)["name"] == safe_name
assert openai_tool_schema(tool)["function"]["name"] == safe_name
assert tool_by_provider_name([tool], safe_name) is tool
def test_openclaw_agent_loop_shadow_parser_normalizes_json():
from src.services.openclaw import OpenClawService
raw = """```json
{
"root_cause_check": "current RCA still needs pod evidence",
"evidence_used": ["event spike", "error rate"],
"confidence_delta": -0.42,
"missing_evidence": ["deployment rollout history"],
"human_or_ai_next_step": "query rollout history with read-only tools"
}
```"""
parsed = OpenClawService._parse_agent_loop_shadow_response(raw)
assert parsed["parse_status"] == "ok"
assert parsed["root_cause_check"] == "current RCA still needs pod evidence"
assert parsed["evidence_used"] == ["event spike", "error rate"]
assert parsed["confidence_delta"] == -0.15
assert parsed["missing_evidence"] == ["deployment rollout history"]
def test_openclaw_agent_loop_shadow_parser_never_boosts_confidence():
from src.services.openclaw import OpenClawService
parsed = OpenClawService._parse_agent_loop_shadow_response(
'{"root_cause_check":"looks good","confidence_delta":0.2}'
)
assert parsed["confidence_delta"] == 0.0
@pytest.mark.asyncio
async def test_audited_provider_strips_internal_audit_context(monkeypatch):
audit_calls = []
async def fake_record_mcp_call(**kwargs):
audit_calls.append(kwargs)
monkeypatch.setattr(
"src.services.mcp_audit_service.record_mcp_call",
fake_record_mcp_call,
)
provider = FakeProvider()
audited = AuditedMCPToolProvider(provider)
result = await audited.execute(
"kubectl_get",
{
"resource": "pods",
"_mcp_audit": {
"agent_role": "openclaw",
"session_id": "session-1",
"incident_id": "INC-1",
"flywheel_node": "sense",
},
},
)
assert result.success is True
assert provider.calls == [("kubectl_get", {"resource": "pods"})]
assert audit_calls[0]["agent_role"] == "openclaw"
assert audit_calls[0]["session_id"] == "session-1"
assert audit_calls[0]["incident_id"] == "INC-1"
@pytest.mark.asyncio
async def test_agent_tool_executor_blocks_disallowed_tool():
restart_tool = _tool("kubernetes", "kubectl_restart")
provider = FakeProvider()
executor = AgentToolExecutor(
available_tools=[restart_tool],
providers={"kubernetes": provider},
agent_role="nemotron",
incident_id="INC-1",
)
result = await executor.execute("kubernetes__kubectl_restart", {"deployment": "api"})
assert result.success is False
assert "not allowed" in (result.error or "")
assert provider.calls == []
@pytest.mark.asyncio
async def test_openclaw_agent_loop_shadow_uses_read_only_tools(monkeypatch):
from src.core.config import settings
from src.services.openclaw import OpenClawService
monkeypatch.setattr(settings, "ENABLE_OPENCLAW_AGENT_LOOP_SHADOW", True)
monkeypatch.setattr(settings, "OPENCLAW_AGENT_LOOP_MAX_ITERATIONS", 2)
class FakeAIProvider:
def __init__(self):
self.seen_tools = []
async def analyze_with_tools(
self,
prompt,
available_tools,
tool_executor,
max_iterations=5,
agent_role="openclaw",
context=None,
):
self.seen_tools = available_tools
return AIResult(
raw_response='{"root_cause_check":"ok"}',
success=True,
provider="ollama_agent_loop",
latency_ms=12.3,
)
class FakeAIRegistry:
def __init__(self, provider):
self.provider = provider
def get(self, name):
return self.provider if name == "ollama" else None
class FakeMCPRegistry:
def all(self):
return [FakeProvider("database"), FakeProvider("ssh_host")]
async def fake_list_tools(self):
if self.name == "database":
return [
_tool("database", "list_incidents"),
_tool("database", "execute_sql"),
]
return [_tool("ssh_host", "run_command")]
fake_ai_provider = FakeAIProvider()
monkeypatch.setattr(FakeProvider, "list_tools", fake_list_tools)
monkeypatch.setattr("src.services.ai_router.get_ai_registry", lambda: FakeAIRegistry(fake_ai_provider))
monkeypatch.setattr("src.plugins.mcp.registry.get_provider_registry", lambda: FakeMCPRegistry())
service = object.__new__(OpenClawService)
proposal = {"risk_level": "medium", "action": "investigate"}
await service._maybe_run_openclaw_agent_loop_shadow(
proposal=proposal,
incident_id="INC-1",
severity="P2",
signals=[{"alertname": "ApiErrorRateHigh"}],
affected_services=["awoooi-api"],
expert_context={"source": "test"},
)
assert proposal["agent_loop_shadow"]["success"] is True
assert proposal["agent_loop_shadow"]["decision_impact"] == "none"
assert proposal["agent_loop_shadow"]["structured"]["parse_status"] == "ok"
assert proposal["agent_loop_shadow"]["structured"]["root_cause_check"] == "ok"
assert [tool.name for tool in fake_ai_provider.seen_tools] == ["list_incidents"]