feat(governance): add remediation dry run entrypoint
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 1m5s
CD Pipeline / build-and-deploy (push) Successful in 3m43s
CD Pipeline / post-deploy-checks (push) Successful in 1m33s

This commit is contained in:
Your Name
2026-05-14 22:20:34 +08:00
parent 102f92dfc3
commit 04fdaee83a
8 changed files with 820 additions and 3 deletions

View File

@@ -0,0 +1,227 @@
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from src.api.v1.ai_slo import router
from src.models.incident import Incident, IncidentStatus, Severity, Signal
from src.models.playbook import Playbook
from src.services.adr100_remediation_service import (
Adr100RemediationService,
RemediationNotFoundError,
)
from src.services.auto_repair_service import AutoRepairService
class _FakeSloService:
    """Stand-in SLO service that serves a fixed remediation queue."""

    def __init__(self, items: list[dict[str, Any]]) -> None:
        # Stored by reference; the report exposes this same list object.
        self.items = items

    async def fetch_report(self) -> dict[str, Any]:
        """Return a report containing only the remediation queue items."""
        remediation_queue = {"items": self.items}
        coverage = {"remediation_queue": remediation_queue}
        return {"verification_coverage": coverage}
class _FakeIncidentRepository:
    """In-memory incident repository holding at most one incident."""

    def __init__(self, incident: Incident | None) -> None:
        self.incident = incident

    async def get_by_id(self, incident_id: str) -> Incident | None:
        """Return the stored incident when its id matches, otherwise ``None``."""
        stored = self.incident
        if not stored:
            return None
        return stored if stored.incident_id == incident_id else None
class _FakeVerifier:
    """Verifier double that returns a canned post-state and counts collections."""

    def __init__(self, state: dict[str, Any]) -> None:
        self.calls = 0
        self.state = state

    async def _collect_post_state(self, incident: Incident) -> dict[str, Any]:
        """Record one more call and return the canned state unchanged.

        ``incident`` is accepted but never read.
        """
        self.calls = self.calls + 1
        return self.state
class _NoopPlaybookService:
    """Playbook service stub whose methods ignore their input and return constants."""

    async def get_recommendations(self, *_args, **_kwargs):  # noqa: ANN002, ANN003
        """Return an empty list regardless of the arguments."""
        return []

    async def get_by_id(self, _playbook_id: str) -> Playbook | None:
        """Return ``None`` for every playbook id."""
        return None

    async def record_execution(self, _playbook_id: str, _success: bool) -> bool:
        """Return ``True`` without storing anything."""
        return True
async def _no_cooldown(*_args, **_kwargs) -> tuple[bool, str]:  # noqa: ANN002, ANN003
    """Cooldown checker stub: ignore all arguments and return ``(True, "test")``."""
    verdict = (True, "test")
    return verdict
def _incident() -> Incident:
    """Build the incident fixture shared by the remediation tests.

    The incident is INVESTIGATING at severity P2, affects ``momo-scheduler``
    and carries a single Prometheus signal stamped with the current UTC time.
    """
    fired_at = datetime.now(timezone.utc)
    alert_name = "DockerContainerMemoryLimitPressure"
    signal_labels = {
        "alertname": alert_name,
        "host": "110",
        "container_name": "momo-scheduler",
    }
    memory_pressure_signal = Signal(
        alert_name=alert_name,
        severity=Severity.P2,
        source="prometheus",
        fired_at=fired_at,
        labels=signal_labels,
    )
    return Incident(
        incident_id="INC-20260514-TEST01",
        status=IncidentStatus.INVESTIGATING,
        severity=Severity.P2,
        affected_services=["momo-scheduler"],
        alert_category="infrastructure",
        signals=[memory_pressure_signal],
    )
def _queue_item(**overrides: Any) -> dict[str, Any]:
    """Return a remediation queue item, with ``overrides`` applied on top.

    The defaults describe a degraded verification for incident
    ``INC-20260514-TEST01`` that is ready for replay. Keyword arguments
    replace matching keys or add new ones.
    """
    defaults: dict[str, Any] = {
        "work_item_id": "verification:INC-20260514-TEST01:are-1",
        "incident_id": "INC-20260514-TEST01",
        "auto_repair_id": "are-1",
        "alertname": "DockerContainerMemoryLimitPressure",
        "playbook_id": "PB-1",
        "verification_result": "degraded",
        "remediation_status": "ready_for_replay",
        "remediation_action": "replay_with_supported_executor",
        "remediation_owner": "auto_repair_executor",
    }
    # Later entries win, so overrides take precedence over the defaults.
    return {**defaults, **overrides}
# Sentinel that distinguishes "argument omitted" from an explicit ``None``.
_UNSET: Any = object()


def _service(
    *,
    item: dict[str, Any],
    incident: Incident | None = _UNSET,
    state: dict[str, Any] | None = None,
) -> Adr100RemediationService:
    """Assemble an ``Adr100RemediationService`` wired entirely to test doubles.

    Args:
        item: The single work item served by the fake SLO service's
            remediation queue.
        incident: Incident held by the fake repository. When omitted, the
            default ``_incident()`` fixture is used. An explicit ``None``
            builds a repository with no incident, so every lookup misses.
        state: Post-state returned by the fake verifier. When omitted
            (``None``), a single ``k8s_get_pod_status`` entry reporting
            phase ``Running`` is used. An explicit empty dict is kept as is.

    Returns:
        The service under test, backed by a real ``AutoRepairService`` that
        uses the no-op playbook service and the ``_no_cooldown`` checker.
    """
    # Identity checks instead of ``or``: a falsy but deliberate argument
    # (``incident=None``, ``state={}``) must not be replaced by the default.
    if incident is _UNSET:
        incident = _incident()
    if state is None:
        state = {"k8s_get_pod_status": {"phase": "Running"}}

    auto_repair = AutoRepairService(
        playbook_service=_NoopPlaybookService(),
        cooldown_checker=_no_cooldown,
    )
    return Adr100RemediationService(
        slo_service=_FakeSloService([item]),
        incident_repository=_FakeIncidentRepository(incident),
        auto_repair_service=auto_repair,
        verifier=_FakeVerifier(state),
    )
@pytest.mark.asyncio
async def test_preview_marks_replay_work_item_read_only():
    """Previewing a replay-ready work item yields a read-only plan with no writes."""
    work_item_id = "verification:INC-20260514-TEST01:are-1"
    service = _service(item=_queue_item())

    preview = await service.preview(work_item_id)

    assert preview["allowed"] is True
    assert preview["mode"] == "replay"
    assert preview["safety_level"] == "read_only"
    assert preview["writes_incident_state"] is False

    plan = preview["plan"]
    assert plan["agent_id"] == "auto_repair_executor"
    assert plan["writes"] == []
@pytest.mark.asyncio
async def test_dry_run_reverify_collects_state_without_writes():
    """A reverify dry run executes, previews success and routes with read scope."""
    work_item_id = "verification:INC-20260514-TEST01:are-1"
    reverify_item = _queue_item(
        remediation_status="ready_for_reverify",
        remediation_action="reverify_with_promql_template",
        remediation_owner="post_execution_verifier",
    )
    running_state = {"k8s_get_pod_status": {"phase": "Running"}}
    service = _service(item=reverify_item, state=running_state)

    outcome = await service.dry_run(work_item_id)

    assert outcome["allowed"] is True
    assert outcome["executed"] is True
    assert outcome["mode"] == "reverify"
    assert outcome["verification_result_preview"] == "success"
    assert outcome["writes_auto_repair_result"] is False
    # The canned state holds exactly one tool entry.
    assert outcome["post_state_summary"]["tool_count"] == 1

    route = outcome["mcp_route"]
    assert route["agent_id"] == "post_execution_verifier"
    assert route["required_scope"] == "read"
@pytest.mark.asyncio
async def test_dry_run_replay_validates_supported_executor_route():
    """A replay dry run routes to the executor's ``ssh_diagnose`` tool with read scope."""
    work_item_id = "verification:INC-20260514-TEST01:are-1"
    service = _service(item=_queue_item())

    outcome = await service.dry_run(work_item_id)

    assert outcome["allowed"] is True
    assert outcome["mode"] == "replay"

    route = outcome["mcp_route"]
    assert route["agent_id"] == "auto_repair_executor"
    assert route["tool_name"] == "ssh_diagnose"
    assert route["required_scope"] == "read"

    # The fixture's signal labels carry host "110" and container "momo-scheduler".
    route_params = route["params"]
    assert route_params["host"] == "192.168.0.110"
    assert route_params["container_name"] == "momo-scheduler"

    assert outcome["diagnostic_command_preview"].startswith("ssh 110")
@pytest.mark.asyncio
async def test_dry_run_blocks_when_incident_missing():
    """A dry run is blocked, with a failed ``incident_loaded`` check, when no incident exists."""

    def _is_failed_incident_check(check: dict[str, Any]) -> bool:
        return check["name"] == "incident_loaded" and not check["passed"]

    work_item_id = "verification:INC-20260514-TEST01:are-1"
    service = _service(item=_queue_item(), incident=None)
    # Swap in an empty repository so the incident lookup is guaranteed to miss.
    service._incident_repository = _FakeIncidentRepository(None)

    outcome = await service.dry_run(work_item_id)

    assert outcome["allowed"] is False
    assert outcome["executed"] is False
    assert outcome["verification_result_preview"] == "blocked"
    assert any(_is_failed_incident_check(check) for check in outcome["checks"])
@pytest.mark.asyncio
async def test_missing_work_item_raises_not_found():
    """Previewing an id absent from the queue raises ``RemediationNotFoundError``."""
    unknown_work_item_id = "verification:missing"
    service = _service(item=_queue_item())

    with pytest.raises(RemediationNotFoundError):
        await service.preview(unknown_work_item_id)
def test_ai_slo_remediation_endpoints(monkeypatch):
    """The preview (GET) and dry-run (POST) routes return 200 with the stubbed payloads."""

    class _FakeService:
        """Service stub that echoes its arguments back in the response dict."""

        async def preview(self, work_item_id: str, mode: str = "auto") -> dict[str, Any]:
            return {"work_item_id": work_item_id, "mode": mode, "allowed": True}

        async def dry_run(self, work_item_id: str, mode: str = "auto") -> dict[str, Any]:
            return {"work_item_id": work_item_id, "mode": mode, "executed": True}

    def _make_fake_service() -> _FakeService:
        return _FakeService()

    app = FastAPI()
    app.include_router(router, prefix="/api/v1")

    # Replace the service factory looked up on the router module.
    monkeypatch.setattr(
        "src.api.v1.ai_slo.get_adr100_remediation_service",
        _make_fake_service,
    )

    client = TestClient(app)
    work_item_id = "verification:INC:are-1"

    preview_response = client.get(
        "/api/v1/ai/slo/remediation/preview",
        params={"work_item_id": work_item_id, "mode": "reverify"},
    )
    dry_run_response = client.post(
        "/api/v1/ai/slo/remediation/dry-run",
        json={"work_item_id": work_item_id, "mode": "replay"},
    )

    assert preview_response.status_code == 200
    assert preview_response.json()["mode"] == "reverify"
    assert dry_run_response.status_code == 200
    assert dry_run_response.json()["executed"] is True