# Source listing metadata: 763 lines, 29 KiB, Python.
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
from uuid import UUID
|
|
|
|
import pytest
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
|
|
from src.api.v1.ai_slo import router
|
|
from src.models.approval import ApprovalRequest, ApprovalStatus, DataImpact, RiskLevel
|
|
from src.models.incident import Incident, IncidentStatus, Severity, Signal
|
|
from src.models.playbook import (
|
|
ActionType,
|
|
Playbook,
|
|
RepairStep,
|
|
RiskLevel as PlaybookRiskLevel,
|
|
)
|
|
from src.services.adr100_remediation_service import (
|
|
Adr100RemediationService,
|
|
RemediationNotFoundError,
|
|
)
|
|
from src.services.auto_repair_service import AutoRepairService
|
|
|
|
|
|
class _FakeSloService:
    """SLO service stand-in whose report carries a fixed remediation queue."""

    def __init__(self, items: list[dict[str, Any]]) -> None:
        # Queue entries handed back unchanged by ``fetch_report``.
        self.items = items

    async def fetch_report(self) -> dict[str, Any]:
        """Return a report containing only the remediation queue items."""
        remediation_queue = {"items": self.items}
        coverage = {"remediation_queue": remediation_queue}
        return {"verification_coverage": coverage}
|
|
|
|
|
|
class _FakeIncidentRepository:
    """Incident repository stand-in that holds at most one incident."""

    def __init__(self, incident: Incident | None) -> None:
        self.incident = incident

    async def get_by_id(self, incident_id: str) -> Incident | None:
        """Return the stored incident when its id matches, otherwise ``None``."""
        stored = self.incident
        if not stored:
            return None
        return stored if stored.incident_id == incident_id else None
|
|
|
|
|
|
class _FakeVerifier:
    """Verifier stand-in that returns a canned post-state and counts calls."""

    def __init__(self, state: dict[str, Any]) -> None:
        self.state = state
        # Number of times ``_collect_post_state`` has been awaited.
        self.calls = 0

    async def _collect_post_state(self, incident: Incident) -> dict[str, Any]:
        """Count the call and return the canned state; ``incident`` is unused."""
        self.calls = self.calls + 1
        return self.state
|
|
|
|
|
|
class _FakeAlertOperationLogRepository:
    """In-memory alert-operation log that captures every append for assertions.

    ``calls`` keeps the raw keyword payload of each ``append``; ``records``
    keeps the record objects that were returned to the caller.
    """

    def __init__(self) -> None:
        self.calls: list[dict[str, Any]] = []
        self.records: list[Any] = []

    async def append(self, event_type: str, **kwargs: Any) -> Any:
        """Store one operation and return its ad-hoc ``AlertOperationRecord``.

        Args:
            event_type: Event name stored on the record and in ``calls``.
            **kwargs: Optional ``incident_id``, ``auto_repair_id``, ``actor``,
                ``success`` and ``context``. Missing keys become ``None``;
                a missing or falsy ``context`` becomes ``{}``.

        Returns:
            An object exposing the record fields as attributes.
        """
        self.calls.append({"event_type": event_type, **kwargs})
        sequence = len(self.records)
        # Each record is stamped one second after the previous one. The stamp
        # is computed as an offset from a fixed base instead of being passed
        # as the ``second`` field, which would raise ValueError at 60 records.
        base = datetime(2026, 5, 14, 14, 45, tzinfo=timezone.utc)
        created_at = datetime.fromtimestamp(
            base.timestamp() + sequence,
            tz=timezone.utc,
        )
        record = type(
            "AlertOperationRecord",
            (),
            {
                "id": f"aol-{sequence + 1}",
                "incident_id": kwargs.get("incident_id"),
                "auto_repair_id": kwargs.get("auto_repair_id"),
                "event_type": event_type,
                "actor": kwargs.get("actor"),
                "success": kwargs.get("success"),
                "context": kwargs.get("context") or {},
                "created_at": created_at,
            },
        )()
        self.records.append(record)
        return record

    async def list_recent(
        self,
        limit: int = 50,
        offset: int = 0,
        event_type: str | None = None,
        incident_id: str | None = None,
    ) -> tuple[list[Any], int]:
        """Return one page of matching records, newest first, plus the match count.

        Args:
            limit: Maximum number of records in the returned page.
            offset: Number of newest matching records to skip.
            event_type: When given, keep only records with this event type.
            incident_id: When given, keep only records for this incident.

        Returns:
            ``(page, total)`` where ``total`` counts all matches before paging.
        """
        rows = [
            record
            for record in self.records
            if (event_type is None or record.event_type == event_type)
            and (incident_id is None or record.incident_id == incident_id)
        ]
        rows.sort(key=lambda record: record.created_at, reverse=True)
        return rows[offset:offset + limit], len(rows)
|
|
|
|
|
|
class _FakeTimelineService:
    """Timeline service stand-in that records the keyword payload of each event."""

    def __init__(self) -> None:
        self.calls: list[dict[str, Any]] = []

    async def add_event(self, **kwargs: Any) -> dict[str, Any]:
        """Remember the event payload and return a fixed timeline id."""
        self.calls.append(kwargs)
        created = {"id": "timeline-1"}
        return created
|
|
|
|
|
|
class _FakeApprovalService:
    """Approval service stand-in that records requests and fingerprint traffic.

    ``requests`` collects every request passed to a create call.
    ``fingerprints`` collects create fingerprints verbatim and lookup
    fingerprints prefixed with ``lookup:``. ``existing`` is what
    ``find_by_fingerprint`` returns.
    """

    def __init__(self) -> None:
        self.requests: list[Any] = []
        self.fingerprints: list[str] = []
        self.existing: ApprovalRequest | None = None

    async def create_approval(self, request: Any) -> ApprovalRequest:
        """Create an approval using an empty fingerprint."""
        return await self.create_approval_with_fingerprint(request, "")

    async def create_approval_with_fingerprint(
        self,
        request: Any,
        fingerprint: str,
    ) -> ApprovalRequest:
        """Record the request and fingerprint, then return a pending approval.

        The returned approval has a fixed id, no signatures and one required
        signature; every other field is copied from ``request``.
        """
        self.requests.append(request)
        self.fingerprints.append(fingerprint)
        approval_fields = {
            "id": UUID("00000000-0000-0000-0000-00000000a100"),
            "action": request.action,
            "description": request.description,
            "status": ApprovalStatus.PENDING,
            "risk_level": request.risk_level,
            "blast_radius": request.blast_radius,
            "dry_run_checks": request.dry_run_checks,
            "required_signatures": 1,
            "signatures": [],
            "requested_by": request.requested_by,
            "expires_at": request.expires_at,
            "metadata": request.metadata,
            "incident_id": request.incident_id,
            "matched_playbook_id": request.matched_playbook_id,
        }
        return ApprovalRequest(**approval_fields)

    async def find_by_fingerprint(self, fingerprint: str) -> ApprovalRequest | None:
        """Log the lookup with a ``lookup:`` prefix and return ``existing``."""
        self.fingerprints.append(f"lookup:{fingerprint}")
        return self.existing
|
|
|
|
|
|
class _FakeAwoooPApprovalProjector:
    """Approval projector stand-in that records calls and returns a canned projection."""

    def __init__(self) -> None:
        self.calls: list[dict[str, Any]] = []

    async def project_runtime_replay_approval(
        self,
        *,
        item: dict[str, Any],
        incident: Incident,
        request: Any,
        payload: dict[str, Any],
    ) -> dict[str, Any]:
        """Record the keyword arguments and return a fixed projection payload.

        The returned payload does not depend on the arguments.
        """
        self.calls.append(
            dict(item=item, incident=incident, request=request, payload=payload)
        )
        step_journal = {
            "step_seq": 1,
            "result_status": "pending",
            "was_blocked": True,
            "block_reason": "approval_projection_only",
        }
        projection: dict[str, Any] = {
            "schema_version": "adr100_runtime_replay_awooop_projection_v1",
            "projected": True,
            "inserted": True,
            "deduplicated": False,
            "projection_mode": "approval_projection_only",
            "run_id": "11111111-1111-5111-8111-111111111111",
            "project_id": "awoooi",
            "state": "waiting_approval",
            "trigger_type": "adr100_runtime_replay_gate5",
            "trigger_ref": "adr100_gate5:INC-20260514-TEST01:00000000-0000-0000-0000-00000000a100",
            "decision_endpoint_enabled": False,
            "execution_authorized": False,
            "repair_attempted": False,
            "repair_executed": False,
            "step_journal": step_journal,
        }
        return projection
|
|
|
|
|
|
class _NoopPlaybookService:
    """Playbook service stand-in that knows no playbooks and accepts every write."""

    async def get_recommendations(self, *_args, **_kwargs):  # noqa: ANN002, ANN003
        """Return no recommendations, whatever the arguments."""
        recommendations: list = []
        return recommendations

    async def get_by_id(self, _playbook_id: str) -> Playbook | None:
        """Report every playbook id as unknown."""
        return None

    async def record_execution(self, _playbook_id: str, _success: bool) -> bool:
        """Pretend the execution outcome was recorded."""
        return True
|
|
|
|
|
|
class _FakePlaybookService(_NoopPlaybookService):
    """Playbook service stand-in that serves at most one playbook by id."""

    def __init__(self, playbook: Playbook | None) -> None:
        self.playbook = playbook

    async def get_by_id(self, playbook_id: str) -> Playbook | None:
        """Return the stored playbook when its id matches, otherwise ``None``."""
        candidate = self.playbook
        if not candidate:
            return None
        return candidate if candidate.playbook_id == playbook_id else None
|
|
|
|
|
|
async def _no_cooldown(*_args, **_kwargs) -> tuple[bool, str]:  # noqa: ANN002, ANN003
    """Cooldown checker stub: always returns ``(True, "test")``, ignoring arguments."""
    verdict = (True, "test")
    return verdict
|
|
|
|
|
|
def _incident() -> Incident:
    """Build the default P2 infrastructure incident used across these tests.

    The incident carries one Prometheus signal for
    ``DockerContainerMemoryLimitPressure`` on the ``momo-scheduler`` container,
    fired at the current UTC time.
    """
    fired_at = datetime.now(timezone.utc)
    labels = {
        "alertname": "DockerContainerMemoryLimitPressure",
        "host": "110",
        "container_name": "momo-scheduler",
    }
    memory_pressure_signal = Signal(
        alert_name="DockerContainerMemoryLimitPressure",
        severity=Severity.P2,
        source="prometheus",
        fired_at=fired_at,
        labels=labels,
    )
    return Incident(
        incident_id="INC-20260514-TEST01",
        status=IncidentStatus.INVESTIGATING,
        severity=Severity.P2,
        affected_services=["momo-scheduler"],
        alert_category="infrastructure",
        signals=[memory_pressure_signal],
    )
|
|
|
|
|
|
def _queue_item(**overrides: Any) -> dict[str, Any]:
    """Return a fresh replay-ready remediation queue item.

    Args:
        **overrides: Fields that replace a default or are added to the item.
    """
    defaults: dict[str, Any] = {
        "work_item_id": "verification:INC-20260514-TEST01:are-1",
        "incident_id": "INC-20260514-TEST01",
        "auto_repair_id": "are-1",
        "alertname": "DockerContainerMemoryLimitPressure",
        "playbook_id": "PB-1",
        "verification_result": "degraded",
        "remediation_status": "ready_for_replay",
        "remediation_action": "replay_with_supported_executor",
        "remediation_owner": "auto_repair_executor",
    }
    # Later keys win, so overrides replace defaults in place and new keys
    # are appended after them.
    return {**defaults, **overrides}
|
|
|
|
|
|
def _service(
    *,
    item: dict[str, Any],
    incident: Incident | None = None,
    state: dict[str, Any] | None = None,
    playbook_service: Any | None = None,
    approval_service: Any | None = None,
    awooop_approval_projector: Any | None = None,
    timeline_service: Any | None = None,
    alert_operation_log_repository: Any | None = None,
    record_history: bool = False,
) -> Adr100RemediationService:
    """Assemble an ``Adr100RemediationService`` wired to the fakes in this module.

    Args:
        item: The single remediation queue item exposed by the fake SLO service.
        incident: Incident served by the fake repository. A falsy value
            (including ``None``) falls back to ``_incident()``.
        state: Post-state returned by the fake verifier. A falsy value
            (including ``{}``) falls back to a single running-pod status.
        playbook_service: Passed through unchanged.
        approval_service: Passed through unchanged.
        awooop_approval_projector: Passed through unchanged.
        timeline_service: Passed through unchanged.
        alert_operation_log_repository: Passed through unchanged.
        record_history: Passed through unchanged.
    """
    slo_service = _FakeSloService([item])
    incident_repository = _FakeIncidentRepository(incident or _incident())
    # The embedded auto-repair service always uses the no-op playbook service
    # and the no-cooldown checker, whatever ``playbook_service`` is.
    auto_repair_service = AutoRepairService(
        playbook_service=_NoopPlaybookService(),
        cooldown_checker=_no_cooldown,
    )
    verifier = _FakeVerifier(state or {"k8s_get_pod_status": {"phase": "Running"}})
    return Adr100RemediationService(
        slo_service=slo_service,
        incident_repository=incident_repository,
        auto_repair_service=auto_repair_service,
        playbook_service=playbook_service,
        verifier=verifier,
        approval_service=approval_service,
        awooop_approval_projector=awooop_approval_projector,
        timeline_service=timeline_service,
        alert_operation_log_repository=alert_operation_log_repository,
        record_history=record_history,
    )
|
|
|
|
|
|
def _runtime_replay_playbook() -> Playbook:
    """Build playbook ``PB-1`` with a single SSH ``docker restart`` repair step."""
    restart_step = RepairStep(
        step_number=1,
        action_type=ActionType.SSH_COMMAND,
        command="ssh {host} 'docker restart {container}'",
        expected_result="container restarted",
        requires_approval=False,
        risk_level=PlaybookRiskLevel.MEDIUM,
    )
    return Playbook(
        playbook_id="PB-1",
        name="Docker container restart",
        description="Replay Docker restart through governed MCP write route.",
        repair_steps=[restart_step],
    )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_preview_marks_replay_work_item_read_only():
    """Previewing the default queue item yields an allowed, read-only replay plan."""
    service = _service(item=_queue_item())

    preview = await service.preview("verification:INC-20260514-TEST01:are-1")

    assert preview["allowed"] is True
    assert preview["mode"] == "replay"
    assert preview["safety_level"] == "read_only"
    assert preview["writes_incident_state"] is False
    plan = preview["plan"]
    assert plan["agent_id"] == "auto_repair_executor"
    assert plan["writes"] == []
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_preview_marks_observe_only_work_item_as_ticket_proposal():
    """Previewing an observe-only playbook item yields a record-only ticket plan."""
    ticket_overrides = {
        "remediation_status": "needs_playbook_ticket",
        "remediation_action": "promote_diagnostic_to_repair_playbook",
        "remediation_owner": "solver_or_operator",
        "failure_class": "observe_only_playbook",
    }
    service = _service(item=_queue_item(**ticket_overrides))

    preview = await service.preview("verification:INC-20260514-TEST01:are-1")

    assert preview["allowed"] is True
    assert preview["mode"] == "ticket"
    assert preview["writes_incident_state"] is False
    assert preview["writes_auto_repair_result"] is False
    plan = preview["plan"]
    assert plan["agent_id"] == "openclaw_playbook_planner"
    assert plan["required_scope"] == "record_only"
    assert plan["target_action"] == "promote_diagnostic_to_repair_playbook"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_dry_run_ticket_proposal_records_internal_history_only():
    """A ticket-mode dry run previews a ticket and records history, creating no ticket."""
    operation_log = _FakeAlertOperationLogRepository()
    timeline_service = _FakeTimelineService()
    ticket_overrides = {
        "remediation_status": "needs_playbook_ticket",
        "remediation_action": "promote_diagnostic_to_repair_playbook",
        "remediation_owner": "solver_or_operator",
        "failure_class": "observe_only_playbook",
    }
    service = _service(
        item=_queue_item(**ticket_overrides),
        timeline_service=timeline_service,
        alert_operation_log_repository=operation_log,
        record_history=True,
    )

    outcome = await service.dry_run("verification:INC-20260514-TEST01:are-1")

    assert outcome["allowed"] is True
    assert outcome["executed"] is True
    assert outcome["mode"] == "ticket"
    assert outcome["verification_result_preview"] == "ticket_proposal"
    assert outcome["writes_ticket"] is False
    assert outcome["creates_external_ticket"] is False
    ticket_preview = outcome["ticket_preview"]
    assert ticket_preview["would_create"] is True
    assert ticket_preview["external_ticket_created"] is False
    assert ticket_preview["playbook_id"] == "PB-1"
    assert "momo-scheduler" in ticket_preview["target"]
    assert outcome["history"]["recorded"] is True

    # History goes to the internal operation log and timeline only.
    first_log_call = operation_log.calls[0]
    assert first_log_call["event_type"] == "PRE_FLIGHT_PASSED"
    assert first_log_call["context"]["ticket_preview"]["next_step"] == (
        "author_mutating_repair_step"
    )
    assert first_log_call["context"]["creates_external_ticket"] is False
    assert timeline_service.calls[0]["actor_role"] == "ticket"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_approval_request_is_record_only_and_does_not_authorize_repair():
    """A playbook-authoring approval is written as a record and authorizes no repair."""
    operation_log = _FakeAlertOperationLogRepository()
    timeline_service = _FakeTimelineService()
    approvals = _FakeApprovalService()
    ticket_overrides = {
        "remediation_status": "needs_playbook_ticket",
        "remediation_action": "promote_diagnostic_to_repair_playbook",
        "remediation_owner": "solver_or_operator",
        "failure_class": "observe_only_playbook",
    }
    service = _service(
        item=_queue_item(**ticket_overrides),
        approval_service=approvals,
        timeline_service=timeline_service,
        alert_operation_log_repository=operation_log,
        record_history=True,
    )

    outcome = await service.create_approval_request("verification:INC-20260514-TEST01:are-1")

    assert outcome["schema_version"] == "adr100_remediation_approval_v1"
    assert outcome["allowed"] is True
    assert outcome["mode"] == "approval"
    assert outcome["writes_approval_record"] is True
    assert outcome["deduplicated"] is False
    assert len(outcome["fingerprint"]) == 64
    assert outcome["writes_incident_state"] is False
    assert outcome["writes_auto_repair_result"] is False
    assert outcome["approval_id"] == "00000000-0000-0000-0000-00000000a100"
    assert outcome["approval"]["status"] == "pending"
    assert outcome["approval"]["risk_level"] == "medium"
    assert outcome["plan"]["required_scope"] == "record_only"
    assert outcome["history"]["recorded"] is True

    submitted = approvals.requests[0]
    assert submitted.risk_level == RiskLevel.MEDIUM
    assert submitted.metadata["approval_kind"] == "adr100_playbook_authoring"
    assert submitted.metadata["execution_authorized"] is False
    assert submitted.metadata["repair_executed"] is False

    # The fake logs the dedup lookup first, then the create fingerprint.
    lookup_entry, create_entry = approvals.fingerprints[0], approvals.fingerprints[1]
    assert lookup_entry.startswith("lookup:")
    assert len(lookup_entry.removeprefix("lookup:")) == 64
    assert len(create_entry) == 64

    first_log_call = operation_log.calls[0]
    assert first_log_call["event_type"] == "APPROVAL_ESCALATED"
    assert first_log_call["approval_id"] == "00000000-0000-0000-0000-00000000a100"
    assert first_log_call["context"]["writes_approval_record"] is True
    assert timeline_service.calls[0]["actor_role"] == "approval"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_approval_request_deduplicates_existing_pending_approval():
    """An approval already found by fingerprint is reused and nothing new is created."""
    pending_approval = ApprovalRequest(
        id=UUID("00000000-0000-0000-0000-00000000a101"),
        action="PLAYBOOK_AUTHORING_RECORD_ONLY: existing",
        description="existing",
        status=ApprovalStatus.PENDING,
        risk_level=RiskLevel.MEDIUM,
        required_signatures=1,
        requested_by="adr100_remediation_service",
        metadata={"approval_kind": "adr100_playbook_authoring"},
        incident_id="INC-20260514-TEST01",
        matched_playbook_id="PB-1",
    )
    approvals = _FakeApprovalService()
    approvals.existing = pending_approval
    queue_item = _queue_item(
        remediation_status="needs_playbook_ticket",
        remediation_action="promote_diagnostic_to_repair_playbook",
        failure_class="observe_only_playbook",
    )
    service = _service(item=queue_item, approval_service=approvals)

    outcome = await service.create_approval_request("verification:INC-20260514-TEST01:are-1")

    assert outcome["approval_id"] == "00000000-0000-0000-0000-00000000a101"
    assert outcome["writes_approval_record"] is False
    assert outcome["deduplicated"] is True
    # No create call reached the fake approval service.
    assert approvals.requests == []
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_approval_request_for_runtime_replay_creates_gate5_record_only():
    """A replay-ready item gets a Gate 5 approval record without any repair running."""
    operation_log = _FakeAlertOperationLogRepository()
    timeline_service = _FakeTimelineService()
    approvals = _FakeApprovalService()
    projector = _FakeAwoooPApprovalProjector()
    service = _service(
        item=_queue_item(),
        playbook_service=_FakePlaybookService(_runtime_replay_playbook()),
        approval_service=approvals,
        awooop_approval_projector=projector,
        timeline_service=timeline_service,
        alert_operation_log_repository=operation_log,
        record_history=True,
    )

    outcome = await service.create_approval_request("verification:INC-20260514-TEST01:are-1")

    assert outcome["schema_version"] == "adr100_remediation_approval_v1"
    assert outcome["allowed"] is True
    assert outcome["mode"] == "approval"
    assert outcome["approval_kind"] == "adr100_runtime_replay_gate5"
    assert outcome["verification_result_preview"] == "runtime_replay_approval_requested"
    assert outcome["writes_approval_record"] is True
    assert outcome["writes_incident_state"] is False
    assert outcome["writes_auto_repair_result"] is False
    assert outcome["replay_gate"]["status"] == "runtime_replay_ready"
    assert outcome["replay_gate"]["repair_executed"] is False

    projection = outcome["awooop_projection"]
    assert projection["projected"] is True
    assert projection["projection_mode"] == "approval_projection_only"
    assert projection["state"] == "waiting_approval"
    assert projection["decision_endpoint_enabled"] is False
    assert projection["execution_authorized"] is False
    assert projection["repair_executed"] is False

    assert outcome["approval"]["status"] == "pending"
    assert outcome["plan"]["step"] == "request_runtime_replay_gate5_approval"
    assert projector.calls[0]["payload"]["approval_kind"] == (
        "adr100_runtime_replay_gate5"
    )

    submitted = approvals.requests[0]
    assert submitted.action.startswith("RUNTIME_REPLAY_GATE5:")
    assert submitted.blast_radius.data_impact == DataImpact.WRITE
    metadata = submitted.metadata
    assert metadata["approval_kind"] == "adr100_runtime_replay_gate5"
    assert metadata["execution_authorized"] is False
    assert metadata["repair_attempted"] is False
    assert metadata["repair_executed"] is False
    assert metadata["required_scope"] == "write_after_approval"
    assert metadata["mcp_gate"] == "gate5_required"
    assert metadata["write_routes"][0]["tool_name"] == "ssh_docker_restart"
    assert metadata["write_routes"][0]["required_scope"] == "write"
    assert metadata["replay_gate"]["status"] == "runtime_replay_ready"

    first_log_call = operation_log.calls[0]
    assert first_log_call["event_type"] == "APPROVAL_ESCALATED"
    assert first_log_call["action_detail"] == (
        "adr100_runtime_replay_gate5_approval_requested"
    )
    assert first_log_call["context"]["approval_kind"] == "adr100_runtime_replay_gate5"
    assert first_log_call["context"]["replay_gate"]["status"] == "runtime_replay_ready"
    assert first_log_call["context"]["awooop_projection"]["state"] == "waiting_approval"
    assert timeline_service.calls[0]["title"] == "ADR-100 runtime replay Gate 5 approval requested"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_create_approval_request_blocks_runtime_replay_when_gate_not_ready():
    """With no playbook available the request is blocked and no approval is written."""
    approvals = _FakeApprovalService()
    operation_log = _FakeAlertOperationLogRepository()
    projector = _FakeAwoooPApprovalProjector()
    service = _service(
        item=_queue_item(),
        # The playbook lookup finds nothing for any id.
        playbook_service=_FakePlaybookService(None),
        approval_service=approvals,
        awooop_approval_projector=projector,
        alert_operation_log_repository=operation_log,
        record_history=True,
    )

    outcome = await service.create_approval_request("verification:INC-20260514-TEST01:are-1")

    assert outcome["allowed"] is False
    assert outcome["writes_approval_record"] is False
    assert outcome["approval_id"] is None
    assert outcome["verification_result_preview"] == "runtime_replay_gate_blocked"
    assert outcome["replay_gate"]["status"] == "blocked_playbook_not_found"
    assert approvals.requests == []
    assert projector.calls == []
    first_log_call = operation_log.calls[0]
    assert first_log_call["event_type"] == "PRE_FLIGHT_FAILED"
    assert first_log_call["context"]["replay_gate"]["status"] == (
        "blocked_playbook_not_found"
    )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_dry_run_reverify_collects_state_without_writes():
    """A reverify dry run reads post-state through the verifier and records nothing."""
    reverify_item = _queue_item(
        remediation_status="ready_for_reverify",
        remediation_action="reverify_with_promql_template",
        remediation_owner="post_execution_verifier",
    )
    post_state = {"k8s_get_pod_status": {"phase": "Running"}}
    service = _service(item=reverify_item, state=post_state)

    outcome = await service.dry_run("verification:INC-20260514-TEST01:are-1")

    assert outcome["allowed"] is True
    assert outcome["executed"] is True
    assert outcome["mode"] == "reverify"
    assert outcome["verification_result_preview"] == "success"
    assert outcome["writes_auto_repair_result"] is False
    assert outcome["post_state_summary"]["tool_count"] == 1
    route = outcome["mcp_route"]
    assert route["agent_id"] == "post_execution_verifier"
    assert route["required_scope"] == "read"
    # ``record_history`` keeps its default of False in ``_service``.
    assert outcome["history"]["recorded"] is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_dry_run_replay_validates_supported_executor_route():
    """A replay dry run resolves a read-scoped diagnose route; the gate stays blocked."""
    service = _service(item=_queue_item())

    outcome = await service.dry_run("verification:INC-20260514-TEST01:are-1")

    assert outcome["allowed"] is True
    assert outcome["mode"] == "replay"
    route = outcome["mcp_route"]
    assert route["agent_id"] == "auto_repair_executor"
    assert route["tool_name"] == "ssh_diagnose"
    assert route["required_scope"] == "read"
    assert route["params"]["host"] == "192.168.0.110"
    assert route["params"]["container_name"] == "momo-scheduler"
    assert outcome["diagnostic_command_preview"].startswith("ssh 110")
    # No playbook service is passed here, and the gate reports the playbook as not found.
    gate = outcome["replay_gate"]
    assert gate["status"] == "blocked_playbook_not_found"
    assert gate["can_runtime_replay"] is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_dry_run_replay_surfaces_runtime_write_gate_without_executing():
    """With a restart playbook the gate reports replay-ready but nothing is executed."""
    operation_log = _FakeAlertOperationLogRepository()
    service = _service(
        item=_queue_item(),
        playbook_service=_FakePlaybookService(_runtime_replay_playbook()),
        alert_operation_log_repository=operation_log,
        record_history=True,
    )

    outcome = await service.dry_run("verification:INC-20260514-TEST01:are-1")

    assert outcome["allowed"] is True
    assert outcome["mode"] == "replay"
    assert outcome["writes_incident_state"] is False
    assert outcome["writes_auto_repair_result"] is False

    replay_gate = outcome["replay_gate"]
    assert replay_gate["schema_version"] == "adr100_replay_gate_v1"
    assert replay_gate["status"] == "runtime_replay_ready"
    assert replay_gate["can_runtime_replay"] is True
    assert replay_gate["execution_authorized"] is False
    assert replay_gate["repair_executed"] is False
    assert replay_gate["mutating_step_count"] == 1
    assert replay_gate["supported_write_route_count"] == 1
    assert replay_gate["unsupported_step_count"] == 0
    assert replay_gate["next_step"] == "queue_runtime_replay_with_gate5_projection"
    assert replay_gate["steps"][0]["write_route"]["tool_name"] == "ssh_docker_restart"
    assert replay_gate["steps"][0]["write_route"]["required_scope"] == "write"
    assert replay_gate["steps"][0]["write_route"]["params"]["host"] == "192.168.0.110"
    assert replay_gate["steps"][0]["write_route"]["params"]["container_name"] == "momo-scheduler"
    assert operation_log.calls[0]["context"]["replay_gate"]["status"] == "runtime_replay_ready"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_dry_run_blocks_when_incident_missing():
    """A dry run is blocked when the incident repository cannot find the incident."""
    service = _service(item=_queue_item(), incident=None)
    # ``_service`` substitutes the default incident for ``incident=None``, so
    # the repository is swapped afterwards for one that holds no incident.
    service._incident_repository = _FakeIncidentRepository(None)

    outcome = await service.dry_run("verification:INC-20260514-TEST01:are-1")

    assert outcome["allowed"] is False
    assert outcome["executed"] is False
    assert outcome["verification_result_preview"] == "blocked"
    assert any(check["name"] == "incident_loaded" and not check["passed"] for check in outcome["checks"])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_dry_run_records_alert_operation_and_timeline_history():
    """With history enabled a dry run writes one operation-log row and one timeline event."""
    operation_log = _FakeAlertOperationLogRepository()
    timeline_service = _FakeTimelineService()
    service = _service(
        item=_queue_item(),
        timeline_service=timeline_service,
        alert_operation_log_repository=operation_log,
        record_history=True,
    )

    outcome = await service.dry_run("verification:INC-20260514-TEST01:are-1")

    # The ids are the ones handed out by the two fakes.
    assert outcome["history"] == {
        "recorded": True,
        "alert_operation_id": "aol-1",
        "timeline_event_id": "timeline-1",
    }
    first_log_call = operation_log.calls[0]
    assert first_log_call["event_type"] == "PRE_FLIGHT_PASSED"
    assert first_log_call["incident_id"] == "INC-20260514-TEST01"
    assert first_log_call["success"] is True
    assert first_log_call["context"]["schema_version"] == (
        "adr100_remediation_dry_run_history_v1"
    )
    assert first_log_call["context"]["writes_incident_state"] is False
    first_timeline_call = timeline_service.calls[0]
    assert first_timeline_call["event_type"] == "verifier"
    assert first_timeline_call["status"] == "success"
    assert first_timeline_call["actor_role"] == "replay"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_history_lists_dry_run_records_grouped_by_work_item():
    """After one recorded dry run, ``history`` lists it and groups it by work item."""
    operation_log = _FakeAlertOperationLogRepository()
    service = _service(
        item=_queue_item(),
        alert_operation_log_repository=operation_log,
        record_history=True,
    )

    await service.dry_run("verification:INC-20260514-TEST01:are-1")
    listing = await service.history(limit=10)

    assert listing["schema_version"] == "adr100_remediation_history_v1"
    assert listing["total"] == 1
    newest = listing["items"][0]
    assert newest["work_item_id"] == "verification:INC-20260514-TEST01:are-1"
    assert newest["agent_id"] == "auto_repair_executor"
    assert newest["tool_name"] == "ssh_diagnose"
    assert newest["required_scope"] == "read"
    assert newest["writes_incident_state"] is False
    first_group = listing["by_work_item"][0]
    assert first_group["count"] == 1
    assert first_group["latest_tool_name"] == "ssh_diagnose"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_missing_work_item_raises_not_found():
    """Previewing a work item id that is not in the queue raises ``RemediationNotFoundError``."""
    service = _service(item=_queue_item())
    unknown_work_item_id = "verification:missing"

    with pytest.raises(RemediationNotFoundError):
        await service.preview(unknown_work_item_id)
|
|
|
|
|
|
def test_ai_slo_remediation_endpoints(monkeypatch):
    """Exercise the four remediation endpoints against a stubbed service.

    The router is mounted under ``/api/v1`` and the module-level
    ``get_adr100_remediation_service`` is patched to return the stub, so only
    request parsing and response pass-through are checked.
    """
    api = FastAPI()
    api.include_router(router, prefix="/api/v1")

    class _FakeService:
        """Stub service that echoes its inputs back in each response payload."""

        async def preview(self, work_item_id: str, mode: str = "auto") -> dict[str, Any]:
            return {"work_item_id": work_item_id, "mode": mode, "allowed": True}

        async def dry_run(self, work_item_id: str, mode: str = "auto") -> dict[str, Any]:
            return {"work_item_id": work_item_id, "mode": mode, "executed": True}

        async def create_approval_request(
            self,
            work_item_id: str,
            mode: str = "approval",
        ) -> dict[str, Any]:
            return {
                "schema_version": "adr100_remediation_approval_v1",
                "work_item_id": work_item_id,
                "mode": mode,
                "approval_id": "approval-1",
            }

        async def history(
            self,
            *,
            limit: int = 50,
            incident_id: str | None = None,
            work_item_id: str | None = None,
        ) -> dict[str, Any]:
            applied_filters = {
                "incident_id": incident_id,
                "work_item_id": work_item_id,
            }
            return {
                "schema_version": "adr100_remediation_history_v1",
                "limit": limit,
                "filters": applied_filters,
                "items": [],
                "by_work_item": [],
            }

    def _stub_service_factory():
        return _FakeService()

    monkeypatch.setattr(
        "src.api.v1.ai_slo.get_adr100_remediation_service",
        _stub_service_factory,
    )

    http = TestClient(api)
    preview_response = http.get(
        "/api/v1/ai/slo/remediation/preview",
        params={"work_item_id": "verification:INC:are-1", "mode": "reverify"},
    )
    dry_run_response = http.post(
        "/api/v1/ai/slo/remediation/dry-run",
        json={"work_item_id": "verification:INC:are-1", "mode": "replay"},
    )
    history_response = http.get(
        "/api/v1/ai/slo/remediation/history",
        params={"limit": 10, "work_item_id": "verification:INC:are-1"},
    )
    approval_response = http.post(
        "/api/v1/ai/slo/remediation/approval-request",
        json={"work_item_id": "verification:INC:are-1", "mode": "approval"},
    )

    assert preview_response.status_code == 200
    assert preview_response.json()["mode"] == "reverify"
    assert dry_run_response.status_code == 200
    assert dry_run_response.json()["executed"] is True
    assert approval_response.status_code == 200
    assert approval_response.json()["approval_id"] == "approval-1"
    assert history_response.status_code == 200
    assert history_response.json()["schema_version"] == "adr100_remediation_history_v1"
    assert history_response.json()["filters"]["work_item_id"] == "verification:INC:are-1"
|