Files
awoooi/apps/api/tests/test_adr100_remediation_service.py
Your Name 17ba879ac6
All checks were successful
CD Pipeline / tests (push) Successful in 1m35s
Code Review / ai-code-review (push) Successful in 14s
CD Pipeline / build-and-deploy (push) Successful in 4m2s
CD Pipeline / post-deploy-checks (push) Successful in 1m42s
feat(adr100): project gate5 approvals into awooop
2026-06-02 11:21:17 +08:00

763 lines
29 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
from uuid import UUID
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from src.api.v1.ai_slo import router
from src.models.approval import ApprovalRequest, ApprovalStatus, DataImpact, RiskLevel
from src.models.incident import Incident, IncidentStatus, Severity, Signal
from src.models.playbook import (
ActionType,
Playbook,
RepairStep,
RiskLevel as PlaybookRiskLevel,
)
from src.services.adr100_remediation_service import (
Adr100RemediationService,
RemediationNotFoundError,
)
from src.services.auto_repair_service import AutoRepairService
class _FakeSloService:
    """SLO service stub whose report exposes a fixed remediation queue."""

    def __init__(self, items: list[dict[str, Any]]) -> None:
        # Queue entries handed back verbatim by ``fetch_report``.
        self.items = items

    async def fetch_report(self) -> dict[str, Any]:
        """Return a report nesting ``self.items`` under the remediation queue."""
        remediation_queue = {"items": self.items}
        coverage = {"remediation_queue": remediation_queue}
        return {"verification_coverage": coverage}
class _FakeIncidentRepository:
    """Incident repository stub holding at most one incident."""

    def __init__(self, incident: Incident | None) -> None:
        self.incident = incident

    async def get_by_id(self, incident_id: str) -> Incident | None:
        """Return the stored incident when its id matches, otherwise ``None``."""
        stored = self.incident
        if not stored:
            return None
        return stored if stored.incident_id == incident_id else None
class _FakeVerifier:
    """Verifier stub that returns a canned post-state and counts invocations."""

    def __init__(self, state: dict[str, Any]) -> None:
        self.state = state
        # Number of times ``_collect_post_state`` has been awaited.
        self.calls = 0

    async def _collect_post_state(self, incident: Incident) -> dict[str, Any]:
        """Bump the call counter and return the canned state (incident is ignored)."""
        self.calls = self.calls + 1
        return self.state
class _FakeAlertOperationLogRepository:
    """In-memory alert-operation log used to observe what the service records.

    ``calls`` keeps the raw arguments of every ``append``; ``records`` keeps the
    record objects that were returned to the caller.
    """

    def __init__(self) -> None:
        self.calls: list[dict[str, Any]] = []
        self.records: list[Any] = []

    async def append(self, event_type: str, **kwargs: Any):
        """Store one operation record and return it.

        Args:
            event_type: Event name stored on the record and in ``calls``.
            **kwargs: Remaining fields; ``incident_id``, ``auto_repair_id``,
                ``actor``, ``success`` and ``context`` are copied onto the
                record (a missing or falsy ``context`` becomes ``{}``).

        Returns:
            The new record, with id ``aol-<n>`` and a ``created_at`` one second
            later than the previous record's.
        """
        # Local import: the module header only imports ``datetime``/``timezone``.
        from datetime import timedelta

        self.calls.append({"event_type": event_type, **kwargs})
        sequence = len(self.records)
        # Offset a fixed base instead of passing the count as the ``second``
        # argument: ``datetime(..., second=60)`` raises ValueError, which would
        # break the fake on its 61st record.
        created_at = datetime(2026, 5, 14, 14, 45, tzinfo=timezone.utc) + timedelta(
            seconds=sequence
        )
        record = type(
            "AlertOperationRecord",
            (),
            {
                "id": f"aol-{sequence + 1}",
                "incident_id": kwargs.get("incident_id"),
                "auto_repair_id": kwargs.get("auto_repair_id"),
                "event_type": event_type,
                "actor": kwargs.get("actor"),
                "success": kwargs.get("success"),
                "context": kwargs.get("context") or {},
                "created_at": created_at,
            },
        )()
        self.records.append(record)
        return record

    async def list_recent(
        self,
        limit: int = 50,
        offset: int = 0,
        event_type: str | None = None,
        incident_id: str | None = None,
    ):
        """Return ``(page, total)`` of matching records, newest first.

        ``event_type`` and ``incident_id`` filter only when not ``None``;
        ``total`` counts all matches before ``offset``/``limit`` are applied.
        """
        rows = [
            record
            for record in self.records
            if (event_type is None or record.event_type == event_type)
            and (incident_id is None or record.incident_id == incident_id)
        ]
        rows = sorted(rows, key=lambda record: record.created_at, reverse=True)
        return rows[offset:offset + limit], len(rows)
class _FakeTimelineService:
    """Timeline stub that captures every event it is asked to add."""

    def __init__(self) -> None:
        # Keyword arguments of each ``add_event`` call, in call order.
        self.calls: list[dict[str, Any]] = []

    async def add_event(self, **event: Any) -> dict[str, Any]:
        """Record the event's keyword arguments and return a fixed event id."""
        self.calls.append(event)
        return {"id": "timeline-1"}
class _FakeApprovalService:
    """Approval service stub that records requests and fingerprint traffic.

    ``requests`` holds every request passed to a create call; ``fingerprints``
    holds create fingerprints verbatim and lookups prefixed with ``lookup:``.
    ``existing`` is what ``find_by_fingerprint`` returns.
    """

    def __init__(self) -> None:
        self.requests: list[Any] = []
        self.fingerprints: list[str] = []
        self.existing: ApprovalRequest | None = None

    async def create_approval(self, request: Any) -> ApprovalRequest:
        """Create an approval with an empty fingerprint."""
        return await self.create_approval_with_fingerprint(request, "")

    async def create_approval_with_fingerprint(
        self,
        request: Any,
        fingerprint: str,
    ) -> ApprovalRequest:
        """Record the request and fingerprint, then echo it as a pending approval."""
        self.requests.append(request)
        self.fingerprints.append(fingerprint)
        # Fixed id so tests can assert on the approval the service reports back.
        pending = ApprovalRequest(
            id=UUID("00000000-0000-0000-0000-00000000a100"),
            action=request.action,
            description=request.description,
            status=ApprovalStatus.PENDING,
            risk_level=request.risk_level,
            blast_radius=request.blast_radius,
            dry_run_checks=request.dry_run_checks,
            required_signatures=1,
            signatures=[],
            requested_by=request.requested_by,
            expires_at=request.expires_at,
            metadata=request.metadata,
            incident_id=request.incident_id,
            matched_playbook_id=request.matched_playbook_id,
        )
        return pending

    async def find_by_fingerprint(self, fingerprint: str) -> ApprovalRequest | None:
        """Log the lookup (``lookup:<fingerprint>``) and return ``self.existing``."""
        lookup_marker = f"lookup:{fingerprint}"
        self.fingerprints.append(lookup_marker)
        return self.existing
class _FakeAwoooPApprovalProjector:
    """Projector stub that records its inputs and returns a canned projection."""

    def __init__(self) -> None:
        # One entry per projection call, holding the keyword arguments received.
        self.calls: list[dict[str, Any]] = []

    async def project_runtime_replay_approval(
        self,
        *,
        item: dict[str, Any],
        incident: Incident,
        request: Any,
        payload: dict[str, Any],
    ) -> dict[str, Any]:
        """Capture the call and return a fixed approval-only projection result."""
        received = {
            "item": item,
            "incident": incident,
            "request": request,
            "payload": payload,
        }
        self.calls.append(received)

        step_journal = {
            "step_seq": 1,
            "result_status": "pending",
            "was_blocked": True,
            "block_reason": "approval_projection_only",
        }
        projection = {
            "schema_version": "adr100_runtime_replay_awooop_projection_v1",
            "projected": True,
            "inserted": True,
            "deduplicated": False,
            "projection_mode": "approval_projection_only",
            "run_id": "11111111-1111-5111-8111-111111111111",
            "project_id": "awoooi",
            "state": "waiting_approval",
            "trigger_type": "adr100_runtime_replay_gate5",
            "trigger_ref": "adr100_gate5:INC-20260514-TEST01:00000000-0000-0000-0000-00000000a100",
            "decision_endpoint_enabled": False,
            "execution_authorized": False,
            "repair_attempted": False,
            "repair_executed": False,
            "step_journal": step_journal,
        }
        return projection
class _NoopPlaybookService:
    """Playbook service stub that knows no playbooks and accepts every write."""

    async def get_recommendations(self, *_args, **_kwargs):  # noqa: ANN002, ANN003
        """Return an empty recommendation list regardless of arguments."""
        return []

    async def get_by_id(self, _playbook_id: str) -> Playbook | None:
        """Report every playbook id as unknown."""
        return None

    async def record_execution(self, _playbook_id: str, _success: bool) -> bool:
        """Pretend the execution was recorded."""
        return True
class _FakePlaybookService(_NoopPlaybookService):
    """Playbook service stub that can resolve a single, optional playbook."""

    def __init__(self, playbook: Playbook | None) -> None:
        self.playbook = playbook

    async def get_by_id(self, playbook_id: str) -> Playbook | None:
        """Return the stored playbook when its id matches, otherwise ``None``."""
        known = self.playbook
        if not known:
            return None
        return known if known.playbook_id == playbook_id else None
async def _no_cooldown(*_args, **_kwargs) -> tuple[bool, str]:  # noqa: ANN002, ANN003
    """Cooldown checker stub: always returns ``(True, "test")``."""
    allowed, reason = True, "test"
    return allowed, reason
def _incident() -> Incident:
    """Build the investigating P2 incident shared by these tests.

    The incident carries one Prometheus signal for a Docker memory-pressure
    alert on container ``momo-scheduler`` (host label ``110``), fired "now".
    """
    fired_at = datetime.now(timezone.utc)
    alert_labels = {
        "alertname": "DockerContainerMemoryLimitPressure",
        "host": "110",
        "container_name": "momo-scheduler",
    }
    memory_pressure_signal = Signal(
        alert_name="DockerContainerMemoryLimitPressure",
        severity=Severity.P2,
        source="prometheus",
        fired_at=fired_at,
        labels=alert_labels,
    )
    return Incident(
        incident_id="INC-20260514-TEST01",
        status=IncidentStatus.INVESTIGATING,
        severity=Severity.P2,
        affected_services=["momo-scheduler"],
        alert_category="infrastructure",
        signals=[memory_pressure_signal],
    )
def _queue_item(**overrides: Any) -> dict[str, Any]:
    """Return a replay-ready remediation queue item, with optional overrides.

    Keyword arguments replace the matching default fields or add new ones.
    """
    defaults: dict[str, Any] = {
        "work_item_id": "verification:INC-20260514-TEST01:are-1",
        "incident_id": "INC-20260514-TEST01",
        "auto_repair_id": "are-1",
        "alertname": "DockerContainerMemoryLimitPressure",
        "playbook_id": "PB-1",
        "verification_result": "degraded",
        "remediation_status": "ready_for_replay",
        "remediation_action": "replay_with_supported_executor",
        "remediation_owner": "auto_repair_executor",
    }
    # Later keys win, so overrides replace defaults while keeping their position.
    return {**defaults, **overrides}
def _service(
    *,
    item: dict[str, Any],
    incident: Incident | None = None,
    state: dict[str, Any] | None = None,
    playbook_service: Any | None = None,
    approval_service: Any | None = None,
    awooop_approval_projector: Any | None = None,
    timeline_service: Any | None = None,
    alert_operation_log_repository: Any | None = None,
    record_history: bool = False,
) -> Adr100RemediationService:
    """Assemble an ``Adr100RemediationService`` wired to the fakes in this module.

    Args:
        item: The single queue item served by the fake SLO report.
        incident: Incident the fake repository holds. A falsy value (including
            ``None``) falls back to ``_incident()``.
        state: Post-state returned by the fake verifier. A falsy value falls
            back to a single running-pod status.
        playbook_service, approval_service, awooop_approval_projector,
        timeline_service, alert_operation_log_repository, record_history:
            Passed through to the service unchanged.
    """
    slo_service = _FakeSloService([item])
    incident_repository = _FakeIncidentRepository(incident or _incident())
    auto_repair_service = AutoRepairService(
        playbook_service=_NoopPlaybookService(),
        cooldown_checker=_no_cooldown,
    )
    verifier = _FakeVerifier(state or {"k8s_get_pod_status": {"phase": "Running"}})
    return Adr100RemediationService(
        slo_service=slo_service,
        incident_repository=incident_repository,
        auto_repair_service=auto_repair_service,
        playbook_service=playbook_service,
        verifier=verifier,
        approval_service=approval_service,
        awooop_approval_projector=awooop_approval_projector,
        timeline_service=timeline_service,
        alert_operation_log_repository=alert_operation_log_repository,
        record_history=record_history,
    )
def _runtime_replay_playbook() -> Playbook:
    """Build playbook ``PB-1`` with a single SSH ``docker restart`` repair step."""
    restart_step = RepairStep(
        step_number=1,
        action_type=ActionType.SSH_COMMAND,
        command="ssh {host} 'docker restart {container}'",
        expected_result="container restarted",
        requires_approval=False,
        risk_level=PlaybookRiskLevel.MEDIUM,
    )
    return Playbook(
        playbook_id="PB-1",
        name="Docker container restart",
        description="Replay Docker restart through governed MCP write route.",
        repair_steps=[restart_step],
    )
@pytest.mark.asyncio
async def test_preview_marks_replay_work_item_read_only():
    """Previewing a replay-ready work item yields an allowed, read-only replay plan."""
    service = _service(item=_queue_item())

    preview = await service.preview("verification:INC-20260514-TEST01:are-1")

    assert preview["allowed"] is True
    assert preview["mode"] == "replay"
    assert preview["safety_level"] == "read_only"
    assert preview["writes_incident_state"] is False
    plan = preview["plan"]
    assert plan["agent_id"] == "auto_repair_executor"
    assert plan["writes"] == []
@pytest.mark.asyncio
async def test_preview_marks_observe_only_work_item_as_ticket_proposal():
    """An observe-only playbook item previews as a record-only ticket proposal."""
    ticket_item = _queue_item(
        remediation_status="needs_playbook_ticket",
        remediation_action="promote_diagnostic_to_repair_playbook",
        remediation_owner="solver_or_operator",
        failure_class="observe_only_playbook",
    )
    service = _service(item=ticket_item)

    preview = await service.preview("verification:INC-20260514-TEST01:are-1")

    assert preview["allowed"] is True
    assert preview["mode"] == "ticket"
    assert preview["writes_incident_state"] is False
    assert preview["writes_auto_repair_result"] is False
    plan = preview["plan"]
    assert plan["agent_id"] == "openclaw_playbook_planner"
    assert plan["required_scope"] == "record_only"
    assert plan["target_action"] == "promote_diagnostic_to_repair_playbook"
@pytest.mark.asyncio
async def test_dry_run_ticket_proposal_records_internal_history_only():
    """A ticket-mode dry run previews the ticket and logs history, creating no ticket."""
    operation_log = _FakeAlertOperationLogRepository()
    timeline_service = _FakeTimelineService()
    ticket_item = _queue_item(
        remediation_status="needs_playbook_ticket",
        remediation_action="promote_diagnostic_to_repair_playbook",
        remediation_owner="solver_or_operator",
        failure_class="observe_only_playbook",
    )
    service = _service(
        item=ticket_item,
        timeline_service=timeline_service,
        alert_operation_log_repository=operation_log,
        record_history=True,
    )

    outcome = await service.dry_run("verification:INC-20260514-TEST01:are-1")

    # Dry-run verdict.
    assert outcome["allowed"] is True
    assert outcome["executed"] is True
    assert outcome["mode"] == "ticket"
    assert outcome["verification_result_preview"] == "ticket_proposal"
    assert outcome["writes_ticket"] is False
    assert outcome["creates_external_ticket"] is False

    # Ticket preview describes what would be created, without creating it.
    ticket_preview = outcome["ticket_preview"]
    assert ticket_preview["would_create"] is True
    assert ticket_preview["external_ticket_created"] is False
    assert ticket_preview["playbook_id"] == "PB-1"
    assert "momo-scheduler" in ticket_preview["target"]

    # History is written to the operation log and the timeline.
    assert outcome["history"]["recorded"] is True
    logged = operation_log.calls[0]
    assert logged["event_type"] == "PRE_FLIGHT_PASSED"
    assert logged["context"]["ticket_preview"]["next_step"] == (
        "author_mutating_repair_step"
    )
    assert logged["context"]["creates_external_ticket"] is False
    assert timeline_service.calls[0]["actor_role"] == "ticket"
@pytest.mark.asyncio
async def test_create_approval_request_is_record_only_and_does_not_authorize_repair():
    """A playbook-authoring approval is recorded but never authorizes or runs a repair."""
    operation_log = _FakeAlertOperationLogRepository()
    timeline_service = _FakeTimelineService()
    approvals = _FakeApprovalService()
    ticket_item = _queue_item(
        remediation_status="needs_playbook_ticket",
        remediation_action="promote_diagnostic_to_repair_playbook",
        remediation_owner="solver_or_operator",
        failure_class="observe_only_playbook",
    )
    service = _service(
        item=ticket_item,
        approval_service=approvals,
        timeline_service=timeline_service,
        alert_operation_log_repository=operation_log,
        record_history=True,
    )

    outcome = await service.create_approval_request("verification:INC-20260514-TEST01:are-1")

    # Response payload.
    assert outcome["schema_version"] == "adr100_remediation_approval_v1"
    assert outcome["allowed"] is True
    assert outcome["mode"] == "approval"
    assert outcome["writes_approval_record"] is True
    assert outcome["deduplicated"] is False
    assert len(outcome["fingerprint"]) == 64
    assert outcome["writes_incident_state"] is False
    assert outcome["writes_auto_repair_result"] is False
    assert outcome["approval_id"] == "00000000-0000-0000-0000-00000000a100"
    assert outcome["approval"]["status"] == "pending"
    assert outcome["approval"]["risk_level"] == "medium"
    assert outcome["plan"]["required_scope"] == "record_only"
    assert outcome["history"]["recorded"] is True

    # Request handed to the approval service.
    submitted = approvals.requests[0]
    assert submitted.risk_level == RiskLevel.MEDIUM
    assert submitted.metadata["approval_kind"] == "adr100_playbook_authoring"
    assert submitted.metadata["execution_authorized"] is False
    assert submitted.metadata["repair_executed"] is False

    # Fingerprint is looked up first, then used for the create call.
    lookup_entry = approvals.fingerprints[0]
    assert lookup_entry.startswith("lookup:")
    assert len(lookup_entry.removeprefix("lookup:")) == 64
    assert len(approvals.fingerprints[1]) == 64

    # History side effects.
    logged = operation_log.calls[0]
    assert logged["event_type"] == "APPROVAL_ESCALATED"
    assert logged["approval_id"] == "00000000-0000-0000-0000-00000000a100"
    assert logged["context"]["writes_approval_record"] is True
    assert timeline_service.calls[0]["actor_role"] == "approval"
@pytest.mark.asyncio
async def test_create_approval_request_deduplicates_existing_pending_approval():
    """When a pending approval already exists, it is reused and nothing new is created."""
    pending_approval = ApprovalRequest(
        id=UUID("00000000-0000-0000-0000-00000000a101"),
        action="PLAYBOOK_AUTHORING_RECORD_ONLY: existing",
        description="existing",
        status=ApprovalStatus.PENDING,
        risk_level=RiskLevel.MEDIUM,
        required_signatures=1,
        requested_by="adr100_remediation_service",
        metadata={"approval_kind": "adr100_playbook_authoring"},
        incident_id="INC-20260514-TEST01",
        matched_playbook_id="PB-1",
    )
    approvals = _FakeApprovalService()
    approvals.existing = pending_approval
    ticket_item = _queue_item(
        remediation_status="needs_playbook_ticket",
        remediation_action="promote_diagnostic_to_repair_playbook",
        failure_class="observe_only_playbook",
    )
    service = _service(item=ticket_item, approval_service=approvals)

    outcome = await service.create_approval_request("verification:INC-20260514-TEST01:are-1")

    assert outcome["approval_id"] == "00000000-0000-0000-0000-00000000a101"
    assert outcome["writes_approval_record"] is False
    assert outcome["deduplicated"] is True
    # No create call reached the approval service.
    assert approvals.requests == []
@pytest.mark.asyncio
async def test_create_approval_request_for_runtime_replay_creates_gate5_record_only():
    """A replay-ready item yields a Gate 5 approval record and projection, with no repair run."""
    operation_log = _FakeAlertOperationLogRepository()
    timeline_service = _FakeTimelineService()
    approvals = _FakeApprovalService()
    projector = _FakeAwoooPApprovalProjector()
    service = _service(
        item=_queue_item(),
        playbook_service=_FakePlaybookService(_runtime_replay_playbook()),
        approval_service=approvals,
        awooop_approval_projector=projector,
        timeline_service=timeline_service,
        alert_operation_log_repository=operation_log,
        record_history=True,
    )

    outcome = await service.create_approval_request("verification:INC-20260514-TEST01:are-1")

    # Response payload.
    assert outcome["schema_version"] == "adr100_remediation_approval_v1"
    assert outcome["allowed"] is True
    assert outcome["mode"] == "approval"
    assert outcome["approval_kind"] == "adr100_runtime_replay_gate5"
    assert outcome["verification_result_preview"] == "runtime_replay_approval_requested"
    assert outcome["writes_approval_record"] is True
    assert outcome["writes_incident_state"] is False
    assert outcome["writes_auto_repair_result"] is False
    assert outcome["replay_gate"]["status"] == "runtime_replay_ready"
    assert outcome["replay_gate"]["repair_executed"] is False

    # Projection is approval-only: nothing is authorized or executed.
    projection = outcome["awooop_projection"]
    assert projection["projected"] is True
    assert projection["projection_mode"] == "approval_projection_only"
    assert projection["state"] == "waiting_approval"
    assert projection["decision_endpoint_enabled"] is False
    assert projection["execution_authorized"] is False
    assert projection["repair_executed"] is False
    assert outcome["approval"]["status"] == "pending"
    assert outcome["plan"]["step"] == "request_runtime_replay_gate5_approval"
    assert projector.calls[0]["payload"]["approval_kind"] == (
        "adr100_runtime_replay_gate5"
    )

    # Request handed to the approval service.
    submitted = approvals.requests[0]
    assert submitted.action.startswith("RUNTIME_REPLAY_GATE5:")
    assert submitted.blast_radius.data_impact == DataImpact.WRITE
    assert submitted.metadata["approval_kind"] == "adr100_runtime_replay_gate5"
    assert submitted.metadata["execution_authorized"] is False
    assert submitted.metadata["repair_attempted"] is False
    assert submitted.metadata["repair_executed"] is False
    assert submitted.metadata["required_scope"] == "write_after_approval"
    assert submitted.metadata["mcp_gate"] == "gate5_required"
    assert submitted.metadata["write_routes"][0]["tool_name"] == "ssh_docker_restart"
    assert submitted.metadata["write_routes"][0]["required_scope"] == "write"
    assert submitted.metadata["replay_gate"]["status"] == "runtime_replay_ready"

    # History side effects.
    logged = operation_log.calls[0]
    assert logged["event_type"] == "APPROVAL_ESCALATED"
    assert logged["action_detail"] == (
        "adr100_runtime_replay_gate5_approval_requested"
    )
    assert logged["context"]["approval_kind"] == "adr100_runtime_replay_gate5"
    assert logged["context"]["replay_gate"]["status"] == "runtime_replay_ready"
    assert logged["context"]["awooop_projection"]["state"] == "waiting_approval"
    assert timeline_service.calls[0]["title"] == "ADR-100 runtime replay Gate 5 approval requested"
@pytest.mark.asyncio
async def test_create_approval_request_blocks_runtime_replay_when_gate_not_ready():
    """Without a resolvable playbook the approval request is blocked and nothing is written."""
    approvals = _FakeApprovalService()
    operation_log = _FakeAlertOperationLogRepository()
    projector = _FakeAwoooPApprovalProjector()
    service = _service(
        item=_queue_item(),
        playbook_service=_FakePlaybookService(None),
        approval_service=approvals,
        awooop_approval_projector=projector,
        alert_operation_log_repository=operation_log,
        record_history=True,
    )

    outcome = await service.create_approval_request("verification:INC-20260514-TEST01:are-1")

    assert outcome["allowed"] is False
    assert outcome["writes_approval_record"] is False
    assert outcome["approval_id"] is None
    assert outcome["verification_result_preview"] == "runtime_replay_gate_blocked"
    assert outcome["replay_gate"]["status"] == "blocked_playbook_not_found"
    # Neither the approval service nor the projector was invoked.
    assert approvals.requests == []
    assert projector.calls == []
    logged = operation_log.calls[0]
    assert logged["event_type"] == "PRE_FLIGHT_FAILED"
    assert logged["context"]["replay_gate"]["status"] == (
        "blocked_playbook_not_found"
    )
@pytest.mark.asyncio
async def test_dry_run_reverify_collects_state_without_writes():
    """A reverify dry run collects post-state over a read route and records no history."""
    reverify_item = _queue_item(
        remediation_status="ready_for_reverify",
        remediation_action="reverify_with_promql_template",
        remediation_owner="post_execution_verifier",
    )
    post_state = {"k8s_get_pod_status": {"phase": "Running"}}
    service = _service(item=reverify_item, state=post_state)

    outcome = await service.dry_run("verification:INC-20260514-TEST01:are-1")

    assert outcome["allowed"] is True
    assert outcome["executed"] is True
    assert outcome["mode"] == "reverify"
    assert outcome["verification_result_preview"] == "success"
    assert outcome["writes_auto_repair_result"] is False
    assert outcome["post_state_summary"]["tool_count"] == 1
    route = outcome["mcp_route"]
    assert route["agent_id"] == "post_execution_verifier"
    assert route["required_scope"] == "read"
    # ``record_history`` defaults to False in ``_service``.
    assert outcome["history"]["recorded"] is False
@pytest.mark.asyncio
async def test_dry_run_replay_validates_supported_executor_route():
    """A replay dry run resolves a read-only diagnose route; the gate is blocked without a playbook."""
    service = _service(item=_queue_item())

    outcome = await service.dry_run("verification:INC-20260514-TEST01:are-1")

    assert outcome["allowed"] is True
    assert outcome["mode"] == "replay"
    route = outcome["mcp_route"]
    assert route["agent_id"] == "auto_repair_executor"
    assert route["tool_name"] == "ssh_diagnose"
    assert route["required_scope"] == "read"
    assert route["params"]["host"] == "192.168.0.110"
    assert route["params"]["container_name"] == "momo-scheduler"
    assert outcome["diagnostic_command_preview"].startswith("ssh 110")
    gate = outcome["replay_gate"]
    assert gate["status"] == "blocked_playbook_not_found"
    assert gate["can_runtime_replay"] is False
@pytest.mark.asyncio
async def test_dry_run_replay_surfaces_runtime_write_gate_without_executing():
    """With a mutating playbook the dry run reports a ready write gate but executes nothing."""
    operation_log = _FakeAlertOperationLogRepository()
    service = _service(
        item=_queue_item(),
        playbook_service=_FakePlaybookService(_runtime_replay_playbook()),
        alert_operation_log_repository=operation_log,
        record_history=True,
    )

    outcome = await service.dry_run("verification:INC-20260514-TEST01:are-1")

    assert outcome["allowed"] is True
    assert outcome["mode"] == "replay"
    assert outcome["writes_incident_state"] is False
    assert outcome["writes_auto_repair_result"] is False

    replay_gate = outcome["replay_gate"]
    assert replay_gate["schema_version"] == "adr100_replay_gate_v1"
    assert replay_gate["status"] == "runtime_replay_ready"
    assert replay_gate["can_runtime_replay"] is True
    assert replay_gate["execution_authorized"] is False
    assert replay_gate["repair_executed"] is False
    assert replay_gate["mutating_step_count"] == 1
    assert replay_gate["supported_write_route_count"] == 1
    assert replay_gate["unsupported_step_count"] == 0
    assert replay_gate["next_step"] == "queue_runtime_replay_with_gate5_projection"

    # The single playbook step maps to the docker-restart write route.
    assert replay_gate["steps"][0]["write_route"]["tool_name"] == "ssh_docker_restart"
    assert replay_gate["steps"][0]["write_route"]["required_scope"] == "write"
    assert replay_gate["steps"][0]["write_route"]["params"]["host"] == "192.168.0.110"
    assert replay_gate["steps"][0]["write_route"]["params"]["container_name"] == "momo-scheduler"
    assert operation_log.calls[0]["context"]["replay_gate"]["status"] == "runtime_replay_ready"
@pytest.mark.asyncio
async def test_dry_run_blocks_when_incident_missing():
    """A dry run is blocked, with a failed ``incident_loaded`` check, when no incident exists."""
    service = _service(item=_queue_item(), incident=None)
    # ``_service`` substitutes a default incident for ``None``, so swap in an
    # empty repository explicitly.
    service._incident_repository = _FakeIncidentRepository(None)

    outcome = await service.dry_run("verification:INC-20260514-TEST01:are-1")

    assert outcome["allowed"] is False
    assert outcome["executed"] is False
    assert outcome["verification_result_preview"] == "blocked"
    assert any(
        check["name"] == "incident_loaded" and not check["passed"]
        for check in outcome["checks"]
    )
@pytest.mark.asyncio
async def test_dry_run_records_alert_operation_and_timeline_history():
    """With history enabled, a dry run writes one operation-log row and one timeline event."""
    operation_log = _FakeAlertOperationLogRepository()
    timeline_service = _FakeTimelineService()
    service = _service(
        item=_queue_item(),
        timeline_service=timeline_service,
        alert_operation_log_repository=operation_log,
        record_history=True,
    )

    outcome = await service.dry_run("verification:INC-20260514-TEST01:are-1")

    expected_history = {
        "recorded": True,
        "alert_operation_id": "aol-1",
        "timeline_event_id": "timeline-1",
    }
    assert outcome["history"] == expected_history

    logged = operation_log.calls[0]
    assert logged["event_type"] == "PRE_FLIGHT_PASSED"
    assert logged["incident_id"] == "INC-20260514-TEST01"
    assert logged["success"] is True
    assert logged["context"]["schema_version"] == (
        "adr100_remediation_dry_run_history_v1"
    )
    assert logged["context"]["writes_incident_state"] is False

    timeline_event = timeline_service.calls[0]
    assert timeline_event["event_type"] == "verifier"
    assert timeline_event["status"] == "success"
    assert timeline_event["actor_role"] == "replay"
@pytest.mark.asyncio
async def test_history_lists_dry_run_records_grouped_by_work_item():
    """After one dry run, history lists that record and a per-work-item summary."""
    operation_log = _FakeAlertOperationLogRepository()
    service = _service(
        item=_queue_item(),
        alert_operation_log_repository=operation_log,
        record_history=True,
    )
    await service.dry_run("verification:INC-20260514-TEST01:are-1")

    listing = await service.history(limit=10)

    assert listing["schema_version"] == "adr100_remediation_history_v1"
    assert listing["total"] == 1

    entry = listing["items"][0]
    assert entry["work_item_id"] == "verification:INC-20260514-TEST01:are-1"
    assert entry["agent_id"] == "auto_repair_executor"
    assert entry["tool_name"] == "ssh_diagnose"
    assert entry["required_scope"] == "read"
    assert entry["writes_incident_state"] is False

    group = listing["by_work_item"][0]
    assert group["count"] == 1
    assert group["latest_tool_name"] == "ssh_diagnose"
@pytest.mark.asyncio
async def test_missing_work_item_raises_not_found():
    """Previewing a work item id absent from the queue raises ``RemediationNotFoundError``."""
    service = _service(item=_queue_item())

    with pytest.raises(RemediationNotFoundError):
        await service.preview("verification:missing")
def test_ai_slo_remediation_endpoints(monkeypatch):
    """The four remediation HTTP endpoints forward their inputs to the service and return 200."""
    app = FastAPI()
    app.include_router(router, prefix="/api/v1")

    class _FakeService:
        """Service double whose methods echo their arguments back."""

        async def preview(self, work_item_id: str, mode: str = "auto") -> dict[str, Any]:
            return {"work_item_id": work_item_id, "mode": mode, "allowed": True}

        async def dry_run(self, work_item_id: str, mode: str = "auto") -> dict[str, Any]:
            return {"work_item_id": work_item_id, "mode": mode, "executed": True}

        async def create_approval_request(
            self,
            work_item_id: str,
            mode: str = "approval",
        ) -> dict[str, Any]:
            return {
                "schema_version": "adr100_remediation_approval_v1",
                "work_item_id": work_item_id,
                "mode": mode,
                "approval_id": "approval-1",
            }

        async def history(
            self,
            *,
            limit: int = 50,
            incident_id: str | None = None,
            work_item_id: str | None = None,
        ) -> dict[str, Any]:
            applied_filters = {
                "incident_id": incident_id,
                "work_item_id": work_item_id,
            }
            return {
                "schema_version": "adr100_remediation_history_v1",
                "limit": limit,
                "filters": applied_filters,
                "items": [],
                "by_work_item": [],
            }

    def _provide_fake_service() -> _FakeService:
        # Replacement for the router's service factory: a fresh fake per call.
        return _FakeService()

    monkeypatch.setattr(
        "src.api.v1.ai_slo.get_adr100_remediation_service",
        _provide_fake_service,
    )
    client = TestClient(app)

    preview_response = client.get(
        "/api/v1/ai/slo/remediation/preview",
        params={"work_item_id": "verification:INC:are-1", "mode": "reverify"},
    )
    dry_run_response = client.post(
        "/api/v1/ai/slo/remediation/dry-run",
        json={"work_item_id": "verification:INC:are-1", "mode": "replay"},
    )
    history_response = client.get(
        "/api/v1/ai/slo/remediation/history",
        params={"limit": 10, "work_item_id": "verification:INC:are-1"},
    )
    approval_response = client.post(
        "/api/v1/ai/slo/remediation/approval-request",
        json={"work_item_id": "verification:INC:are-1", "mode": "approval"},
    )

    assert preview_response.status_code == 200
    assert preview_response.json()["mode"] == "reverify"
    assert dry_run_response.status_code == 200
    assert dry_run_response.json()["executed"] is True
    assert approval_response.status_code == 200
    assert approval_response.json()["approval_id"] == "approval-1"
    assert history_response.status_code == 200
    assert history_response.json()["schema_version"] == "adr100_remediation_history_v1"
    assert history_response.json()["filters"]["work_item_id"] == "verification:INC:are-1"