Files
awoooi/apps/api/tests/test_repair_candidate_service.py
Your Name 06b40f316c
Some checks are pending
Ansible / Reboot Recovery Contract / validate (push) Waiting to run
CD Pipeline / build-and-deploy (push) Blocked by required conditions
CD Pipeline / post-deploy-checks (push) Blocked by required conditions
Code Review / ai-code-review (push) Successful in 23s
CD Pipeline / tests (push) Successful in 1m52s
fix(api): route repair candidates to controlled automation queue
2026-06-27 12:59:24 +08:00

518 lines
23 KiB
Python

from datetime import datetime, timezone
from uuid import uuid4
import pytest
from src.models.approval import ApprovalRequestCreate, BlastRadius, DataImpact, RiskLevel
from src.models.incident import Incident, Severity, Signal
from src.models.playbook import (
ActionType,
Playbook,
PlaybookSource,
PlaybookStatus,
RepairStep,
)
from src.models.playbook import RiskLevel as PlaybookRiskLevel
from src.services.approval_db import approval_request_to_record_data
from src.services.evidence_snapshot import EvidenceSnapshot
from src.services.repair_candidate_service import RepairCandidateService
class FakeInvestigator:
    """Investigator stand-in that replays a snapshot supplied at construction."""

    def __init__(self, evidence: EvidenceSnapshot | None) -> None:
        # Kept exactly as given (including ``None``) and handed back by
        # ``investigate``.
        self.evidence = evidence

    async def investigate(self, incident: Incident) -> EvidenceSnapshot | None:
        """Return the stored snapshot; the ``incident`` argument is not read."""
        return self.evidence
class FakePlaybookRepository:
    """Repository stand-in that holds at most one playbook."""

    def __init__(self, playbook: Playbook | None) -> None:
        self.playbook = playbook

    async def get_by_id(self, playbook_id: str) -> Playbook | None:
        """Return the held playbook when its id equals ``playbook_id``, else ``None``."""
        held = self.playbook
        # Guard clause: nothing held (or falsy), or the id does not match.
        if not held or held.playbook_id != playbook_id:
            return None
        return held
class FakeIncidentService:
    """Incident-service stand-in whose working-memory lookup always comes back empty."""

    async def get_from_working_memory(self, incident_id: str) -> None:
        """Resolve to ``None`` for every ``incident_id``."""
        return None
class FakeAutoRepairService:
    """Auto-repair stand-in whose SSH write-route preview always succeeds."""

    def preview_write_ssh_mcp_route(self, incident: Incident, command: str) -> bool:
        """Return ``True`` unconditionally; neither argument is inspected."""
        return True
def _incident() -> Incident:
    """Build the P2 ``NodeExporterDown`` incident used as the base test fixture."""
    node_exporter_signal = Signal(
        alert_name="NodeExporterDown",
        severity=Severity.P2,
        source="alertmanager",
        fired_at=datetime.now(timezone.utc),
        labels={"namespace": "awoooi-prod", "deployment": "awoooi-api"},
        annotations={"summary": "node exporter down"},
    )
    return Incident(
        incident_id="INC-TEST-REPAIR",
        severity=Severity.P2,
        signals=[node_exporter_signal],
        affected_services=["awoooi-api"],
    )
def _evidence(incident_id: str, *, sensors_succeeded: int = 2) -> EvidenceSnapshot:
    """Build an evidence snapshot reporting three attempted sensors.

    ``sensors_succeeded`` also drives the health map: ``k8s`` is healthy when
    it is above 0 and ``prometheus`` when it is above 1.
    """
    health_by_source = {
        "k8s": sensors_succeeded > 0,
        "prometheus": sensors_succeeded > 1,
    }
    return EvidenceSnapshot(
        incident_id=incident_id,
        sensors_attempted=3,
        sensors_succeeded=sensors_succeeded,
        mcp_health=health_by_source,
        evidence_summary="k8s events and metrics collected",
    )
def _playbook(command: str, *, risk_level: PlaybookRiskLevel = PlaybookRiskLevel.LOW) -> Playbook:
    """Build the approved, manually sourced ``PB-REPAIR-001`` playbook.

    The playbook has a single KUBECTL step that runs ``command`` at
    ``risk_level`` and is flagged as requiring approval.
    """
    only_step = RepairStep(
        step_number=1,
        action_type=ActionType.KUBECTL,
        command=command,
        expected_result="deployment restarted and rollout verified",
        risk_level=risk_level,
        requires_approval=True,
    )
    return Playbook(
        playbook_id="PB-REPAIR-001",
        name="重啟 API deployment",
        description="以 approved PlayBook 修復 API 工作負載",
        status=PlaybookStatus.APPROVED,
        source=PlaybookSource.MANUAL,
        trust_score=0.72,
        estimated_duration_minutes=4,
        repair_steps=[only_step],
    )
def _generic_fallback_playbook() -> Playbook:
    """Build a MEDIUM-risk templated playbook relabelled as the catch-all rule.

    Starts from ``_playbook`` with a ``{target}``/``{namespace}`` rollout
    template, then overwrites the id, the name, and the symptom pattern's
    alert names (set to the ``"*"`` wildcard).
    """
    fallback = _playbook(
        "kubectl rollout restart deployment/{target} -n {namespace}",
        risk_level=PlaybookRiskLevel.MEDIUM,
    )
    fallback.playbook_id = "PB-GENERIC-FALLBACK"
    fallback.name = "通用兜底規則"
    fallback.symptom_pattern.alert_names = ["*"]
    return fallback
@pytest.mark.asyncio
async def test_build_candidate_from_mcp_evidence_and_approved_playbook() -> None:
    """Expect a ready-for-approval candidate from evidence plus a rollout-restart playbook."""
    restart_command = "kubectl rollout restart deployment/awoooi-api -n awoooi-prod"
    incident = _incident()
    evidence = _evidence(incident.incident_id)
    service = RepairCandidateService(
        incident_service=FakeIncidentService(),
        investigator=FakeInvestigator(evidence),
        playbook_repository=FakePlaybookRepository(_playbook(restart_command)),
        auto_repair_service=FakeAutoRepairService(),
    )

    result = await service.build_from_incident(
        incident=incident,
        alertname="NodeExporterDown",
        target_resource="awoooi-api",
        namespace="awoooi-prod",
        message="node exporter is down",
        fallback_action="NO_ACTION - REPAIR_CANDIDATE_MISSING",
        matched_playbook_id="PB-REPAIR-001",
        severity="medium",
    )

    assert result.candidate_found is True

    # The approval request must carry the playbook command and id.
    assert result.approval_request is not None
    approval = result.approval_request
    assert approval.action == restart_command
    assert approval.risk_level == RiskLevel.MEDIUM
    assert approval.matched_playbook_id == "PB-REPAIR-001"

    # Metadata must echo the evidence, the playbook trust, and a verifier plan.
    metadata = result.metadata
    assert metadata["repair_candidate_status"] == "candidate_ready_for_approval"
    assert metadata["mcp_evidence"]["sensors_succeeded"] == 2
    assert metadata["playbook_trust"]["trust_score"] == 0.72
    assert metadata["verifier_plan"]
@pytest.mark.asyncio
async def test_candidate_blocked_when_mcp_evidence_missing() -> None:
    """Expect a ``mcp_evidence_missing`` blocker when no sensor succeeded."""
    incident = _incident()
    empty_evidence = _evidence(incident.incident_id, sensors_succeeded=0)
    restart_playbook = _playbook(
        "kubectl rollout restart deployment/awoooi-api -n awoooi-prod"
    )
    service = RepairCandidateService(
        incident_service=FakeIncidentService(),
        investigator=FakeInvestigator(empty_evidence),
        playbook_repository=FakePlaybookRepository(restart_playbook),
        auto_repair_service=FakeAutoRepairService(),
    )

    result = await service.build_from_incident(
        incident=incident,
        alertname="NodeExporterDown",
        target_resource="awoooi-api",
        namespace="awoooi-prod",
        message="node exporter is down",
        fallback_action="NO_ACTION - REPAIR_CANDIDATE_MISSING",
        matched_playbook_id="PB-REPAIR-001",
        severity="medium",
    )

    assert result.candidate_found is False
    assert "mcp_evidence_missing" in result.blockers
    assert result.metadata["repair_candidate_status"] == "blocked"
@pytest.mark.asyncio
async def test_candidate_blocked_when_playbook_is_observe_only() -> None:
    """Expect a ``playbook_observe_only`` blocker when the step is ``kubectl get pods``."""
    incident = _incident()
    read_only_playbook = _playbook("kubectl get pods -n awoooi-prod")
    service = RepairCandidateService(
        incident_service=FakeIncidentService(),
        investigator=FakeInvestigator(_evidence(incident.incident_id)),
        playbook_repository=FakePlaybookRepository(read_only_playbook),
        auto_repair_service=FakeAutoRepairService(),
    )

    result = await service.build_from_incident(
        incident=incident,
        alertname="NodeExporterDown",
        target_resource="awoooi-api",
        namespace="awoooi-prod",
        message="node exporter is down",
        fallback_action="NO_ACTION - REPAIR_CANDIDATE_MISSING",
        matched_playbook_id="PB-REPAIR-001",
        severity="medium",
    )

    assert result.candidate_found is False
    assert "playbook_observe_only" in result.blockers
@pytest.mark.asyncio
async def test_candidate_blocked_when_playbook_is_generic_fallback() -> None:
    """Check the draft package emitted when only the generic fallback playbook matches.

    The candidate must be blocked, and the metadata must carry a draft package
    with asset requirements, a coverage gap, a prefilled (non-executable)
    playbook template, and an open work item.
    """
    expected_restart = "kubectl rollout restart deployment/awoooi-api -n awoooi-prod"
    incident = _incident()
    service = RepairCandidateService(
        incident_service=FakeIncidentService(),
        investigator=FakeInvestigator(_evidence(incident.incident_id)),
        playbook_repository=FakePlaybookRepository(_generic_fallback_playbook()),
        auto_repair_service=FakeAutoRepairService(),
    )

    result = await service.build_from_incident(
        incident=incident,
        alertname="UnknownAlert",
        target_resource="awoooi-api",
        namespace="awoooi-prod",
        message="unknown alert",
        fallback_action="NO_ACTION - REPAIR_CANDIDATE_MISSING",
        matched_playbook_id="PB-GENERIC-FALLBACK",
        severity="medium",
    )

    assert result.candidate_found is False
    assert "playbook_generic_fallback_not_repair" in result.blockers

    # Top-level metadata flags.
    metadata = result.metadata
    assert "通用兜底" in metadata["repair_candidate_blocker_summary"]
    assert metadata["playbook_draft_required"] is True

    # Draft package identity.
    draft_package = metadata["repair_candidate_draft_package"]
    assert draft_package["schema_version"] == "repair_candidate_draft_package_v1"
    assert draft_package["lane"] == "create_service_specific_repair_playbook"

    # Operator-facing next-step text.
    next_step = metadata["repair_candidate_next_step"]
    assert "建立專屬 PlayBook 草案" in next_step
    assert "草案已預填" in next_step
    assert "route=k8s_rollout_after_owner_review" in next_step
    assert (
        "repair_template=kubectl rollout restart deployment/awoooi-api -n awoooi-prod"
        in next_step
    )

    # Required fields, write-backs, and asset requirements.
    required_fields = draft_package["required_fields"]
    assert "repair_command" in required_fields
    assert "script_or_ansible_ref" in required_fields
    assert "automation_asset_record" in required_fields
    assert "km_update_draft" in draft_package["required_writebacks"]
    asset_requirements = draft_package["automation_asset_requirements"]
    asset_types = [item["asset_type"] for item in asset_requirements]
    assert asset_types == [
        "KM",
        "PlayBook",
        "ScriptOrAnsible",
        "ScheduleOrMonitoringRule",
        "Verifier",
    ]

    # Coverage gap.
    coverage_gap = draft_package["coverage_gap"]
    assert coverage_gap["schema_version"] == "repair_candidate_coverage_gap_v1"
    assert coverage_gap["coverage_key"] == "unknownalert:awoooi-api"
    assert coverage_gap["blocking_stage"] == "service_playbook_coverage"
    template_fields = coverage_gap["playbook_template_fields"]
    assert "automation_asset_record" in template_fields
    assert coverage_gap["next_owner_lane"] == "create_service_specific_repair_playbook"
    assert coverage_gap["mcp_evidence_ready"] is True
    assert coverage_gap["runtime_execution_authorized"] is False
    assert "recurrence_fingerprint" in coverage_gap["required_mcp_evidence_refs"]
    assert "repair_steps.command_or_ansible_ref" in template_fields

    # Prefilled playbook draft template.
    draft_template = draft_package["playbook_draft_template"]
    assert draft_template["schema_version"] == "service_specific_playbook_draft_template_v1"
    assert draft_template["status"] == "prefilled_controlled_queue_candidate"
    assert draft_template["coverage_key"] == "unknownalert:awoooi-api"
    assert draft_template["suggested_route"] == "k8s_rollout_after_owner_review"
    assert draft_template["repair_command_template"] == expected_restart
    assert draft_template["rollback_command_template"] == (
        "kubectl rollout undo deployment/awoooi-api -n awoooi-prod"
    )
    assert draft_template["template_is_executable"] is False
    assert draft_template["approval_required_before_execution"] is True
    assert draft_template["runtime_execution_authorized"] is False
    assert "確認 MCP evidence refs 足以支持修復假設" in draft_template["owner_review_checklist"]

    # Work item projection.
    work_item = draft_package["awooop_work_item"]
    assert work_item["schema_version"] == "awooop_repair_candidate_draft_work_item_v1"
    assert work_item["work_item_id"].startswith(
        "repair-candidate-draft:awoooi:INC-TEST-REPAIR:"
    )
    assert work_item["status"] == "open"
    assert work_item["needs_human"] is True
    assert work_item["decision_effect"] == "none"
    assert work_item["writes_runtime_state"] is False
    assert work_item["coverage_gap"]["coverage_key"] == "unknownalert:awoooi-api"
    work_item_template = work_item["playbook_draft_template"]
    assert work_item_template["repair_command_template"] == expected_restart
    assert work_item_template["template_is_executable"] is False
    assert "/awooop/work-items?" in work_item["work_item_href"]
    assert "https://awoooi.wooo.work/zh-TW/awooop/work-items?" in work_item["work_item_url"]
@pytest.mark.asyncio
async def test_candidate_blocked_observe_only_prompts_repair_playbook_draft() -> None:
    """Check the controlled-queue draft produced for a diagnostic-only SSH playbook.

    The matched playbook's single step is switched to an SSH command, and the
    service's auto-repair collaborator is replaced by a stub whose route
    preview returns ``False``. The result must stay blocked as observe-only
    while carrying a draft package, promotion contract, and work item that are
    queue-ready but never authorize runtime execution.
    """
    incident = _incident()
    diagnostic_playbook = _playbook(
        "ssh 192.168.0.188 'uptime; ps aux --sort=-%cpu | head -20; docker stats --no-stream'",
        risk_level=PlaybookRiskLevel.LOW,
    )
    diagnostic_playbook.repair_steps[0].action_type = ActionType.SSH_COMMAND
    service = RepairCandidateService(
        incident_service=FakeIncidentService(),
        investigator=FakeInvestigator(_evidence(incident.incident_id)),
        playbook_repository=FakePlaybookRepository(diagnostic_playbook),
        auto_repair_service=FakeAutoRepairService(),
    )
    # Swap in an ad-hoc stub that refuses every SSH write route.
    no_route_attrs = {"preview_write_ssh_mcp_route": lambda self, incident, command: False}
    service._auto_repair = type("NoRouteAutoRepairService", (), no_route_attrs)()

    result = await service.build_from_incident(
        incident=incident,
        alertname="NodeExporterDown",
        target_resource="node-exporter-188",
        namespace="awoooi-prod",
        message="node exporter is down",
        fallback_action="NO_ACTION - REPAIR_CANDIDATE_MISSING",
        matched_playbook_id="PB-REPAIR-001",
        severity="medium",
    )

    assert result.candidate_found is False
    assert result.draft_ready_for_owner_review is True

    # Top-level metadata flags.
    metadata = result.metadata
    assert metadata["repair_candidate_status"] == "controlled_playbook_queue_ready"
    assert metadata["repair_candidate_draft_ready"] is True
    assert metadata["repair_candidate_controlled_playbook_queue_ready"] is True
    assert metadata["repair_candidate_owner_review_required"] is False
    assert metadata["repair_candidate_runtime_execution_authorized"] is False
    assert "playbook_observe_only" in result.blockers

    # Draft package.
    draft_package = metadata["repair_candidate_draft_package"]
    assert draft_package["status"] == "controlled_playbook_queue_ready"
    assert draft_package["owner_review_gate"] == "auto_waived_by_current_policy"
    assert draft_package["controlled_playbook_queue"] is True
    assert draft_package["runtime_execution_authorized"] is False
    assert draft_package["lane"] == "promote_diagnostic_to_repair_playbook"

    # Coverage gap.
    coverage_gap = draft_package["coverage_gap"]
    assert coverage_gap["coverage_key"] == "nodeexporterdown:node-exporter-188"
    assert coverage_gap["target_kind"] == "host_service"
    assert coverage_gap["blocking_stage"] == "service_playbook_coverage"
    assert coverage_gap["matched_playbook_id"] == "PB-REPAIR-001"
    assert "systemd_or_container_status" in coverage_gap["required_mcp_evidence_refs"]

    # Operator-facing next-step text.
    next_step = metadata["repair_candidate_next_step"]
    assert "診斷命令保留為 MCP evidence collector" in next_step
    assert "route=host_service_route_after_owner_review" in next_step

    # Prefilled playbook draft template.
    draft_template = draft_package["playbook_draft_template"]
    assert draft_template["suggested_route"] == "host_service_route_after_owner_review"
    assert draft_template["repair_command_template"] == "systemctl restart node-exporter-188"
    assert draft_template["template_is_executable"] is False
    assert draft_template["runtime_execution_authorized"] is False

    # Promotion contract.
    promotion_contract = draft_package["candidate_promotion_contract"]
    assert promotion_contract["schema_version"] == "repair_candidate_promotion_contract_v1"
    assert promotion_contract["status"] == "controlled_playbook_queue_ready"
    assert promotion_contract["route_id"] == "host_service_route_after_owner_review"
    assert promotion_contract["repair_command_template"] == "systemctl restart node-exporter-188"
    assert promotion_contract["ready_count"] == 11
    assert promotion_contract["total_count"] == 11
    assert promotion_contract["blocked_count"] == 0
    assert promotion_contract["blocked_fields"] == []
    ready_fields = promotion_contract["ready_fields"]
    assert "owner_review" in ready_fields
    assert "maintenance_window" in ready_fields
    assert promotion_contract["controlled_playbook_queue"] is True
    assert promotion_contract["approval_required_before_execution"] is False
    assert promotion_contract["owner_review_required"] is False
    assert promotion_contract["runtime_execution_authorized"] is False
    assert promotion_contract["runtime_write_allowed"] is False

    # Controlled-automation closure nested in the contract.
    closure = promotion_contract["controlled_automation_closure"]
    assert closure["schema_version"] == "repair_candidate_controlled_automation_closure_v1"
    assert closure["status"] == "controlled_playbook_queue_ready_no_write_rehearsal"
    assert closure["runtime_write_gate"] == "controlled_queue_pending_check_mode"
    rehearsal = closure["no_write_rehearsal"]
    assert rehearsal["status"] == "queued_by_ai"
    assert rehearsal["executes_command"] is False
    writeback_plan = closure["verifier_writeback_plan"]
    assert writeback_plan["status"] == "planned_not_executed"
    assert "incident_timeline" in writeback_plan["writes_after_execution"]
    ledger_asset_types = [item["asset_type"] for item in closure["automation_asset_ledger"]]
    assert ledger_asset_types == [
        "KM",
        "PlayBook",
        "ScriptOrAnsible",
        "Verifier",
    ]

    # The contract and its summary are also surfaced at the top level.
    assert metadata["repair_candidate_promotion_contract"] == promotion_contract
    promotion_summary = metadata["repair_candidate_promotion_summary"]
    assert "promotion=11/11" in promotion_summary
    assert "runtime=false" in promotion_summary

    # Work item projection.
    work_item = draft_package["awooop_work_item"]
    assert work_item["status"] == "controlled_playbook_queue_ready"
    assert work_item["next_action"] == "queue_check_mode_then_controlled_apply"
    assert work_item["owner_review_required"] is False
    assert work_item["controlled_playbook_queue"] is True
    assert work_item["runtime_execution_authorized"] is False
    assert work_item["candidate_promotion_contract"]["route_id"] == (
        "host_service_route_after_owner_review"
    )
    assert work_item["target_resource"] == "node-exporter-188"
    assert work_item["lane"] == "promote_diagnostic_to_repair_playbook"
    assert work_item["safety_level"] == "read_only_work_item_projection"
    assert work_item["coverage_gap"]["next_owner_lane"] == "promote_diagnostic_to_repair_playbook"
def test_promotion_summary_marks_controlled_runtime_when_apply_gate_passes() -> None:
    """Expect ``runtime=controlled`` in the summary of a fully authorized contract."""
    authorized_contract = {
        "route_id": "ansible:188-ai-web",
        "ready_count": 11,
        "total_count": 11,
        "blocked_count": 0,
        "status": "controlled_apply_auto_authorized",
        "runtime_execution_authorized": True,
        "runtime_write_allowed": True,
    }
    service = RepairCandidateService()

    summary = service._promotion_summary_for_operator(authorized_contract)

    expected_summary = (
        "route=ansible:188-ai-web; promotion=11/11; "
        "blocked=0; status=controlled_apply_auto_authorized; runtime=controlled"
    )
    assert summary == expected_summary
@pytest.mark.asyncio
async def test_postgres_slow_query_gap_prefills_database_owner_review_not_restart() -> None:
    """Check that a PostgreSQL slow-query alert gets an owner-review draft, not a restart.

    The base incident is retargeted at a ``postgres`` statefulset and only the
    generic fallback playbook is available. The result must stay blocked, with
    a database coverage gap and a non-executable template.
    """
    incident = _incident()
    primary_signal = incident.signals[0]
    primary_signal.alert_name = "PostgreSQLSlowQueries"
    primary_signal.labels = {"namespace": "default", "statefulset": "postgres"}
    incident.affected_services = ["postgres"]
    service = RepairCandidateService(
        incident_service=FakeIncidentService(),
        investigator=FakeInvestigator(_evidence(incident.incident_id)),
        playbook_repository=FakePlaybookRepository(_generic_fallback_playbook()),
        auto_repair_service=FakeAutoRepairService(),
    )

    result = await service.build_from_incident(
        incident=incident,
        alertname="PostgreSQLSlowQueries",
        target_resource="postgres",
        namespace="default",
        message="PostgreSQL slow query and lock waiting latency high",
        fallback_action="NO_ACTION - REPAIR_CANDIDATE_MISSING",
        matched_playbook_id="PB-GENERIC-FALLBACK",
        severity="high",
    )

    assert result.candidate_found is False
    assert result.draft_ready_for_owner_review is False
    metadata = result.metadata
    assert metadata["repair_candidate_status"] == "blocked"
    assert metadata["repair_candidate_draft_ready"] is False

    # Coverage gap must be classified as a database target.
    coverage_gap = metadata["repair_candidate_draft_package"]["coverage_gap"]
    assert coverage_gap["coverage_key"] == "postgresqlslowqueries:postgres"
    assert coverage_gap["target_kind"] == "database"
    assert coverage_gap["blocking_stage"] == "service_playbook_coverage"
    evidence_refs = coverage_gap["required_mcp_evidence_refs"]
    assert "postgres_readonly_activity" in evidence_refs
    assert "pg_stat_statements_top_queries" in evidence_refs
    assert "terminate_backend_without_owner_review" in coverage_gap["blocked_operations"]

    # The template must route to owner review and must not suggest a restart.
    draft_template = metadata["repair_candidate_draft_package"]["playbook_draft_template"]
    assert draft_template["suggested_route"] == "database_slow_query_owner_review"
    repair_template = draft_template["repair_command_template"]
    assert repair_template == (
        "owner_supplied_query_or_index_fix_after_readonly_evidence_review"
    )
    assert "kubectl rollout restart deployment/postgres" not in repair_template
    assert "read_only_pg_stat_activity_and_pg_locks_before_after" in (
        draft_template["verifier_plan_template"]
    )
    assert draft_template["template_is_executable"] is False
    assert draft_template["runtime_execution_authorized"] is False
@pytest.mark.asyncio
async def test_missing_mcp_evidence_records_collectable_coverage_gap() -> None:
    """Expect the coverage gap to name ``mcp_evidence`` as the blocking stage.

    Evidence is built with zero succeeded sensors while the matched playbook
    carries a rollout-restart command.
    """
    incident = _incident()
    empty_evidence = _evidence(incident.incident_id, sensors_succeeded=0)
    restart_playbook = _playbook(
        "kubectl rollout restart deployment/awoooi-api -n awoooi-prod"
    )
    service = RepairCandidateService(
        incident_service=FakeIncidentService(),
        investigator=FakeInvestigator(empty_evidence),
        playbook_repository=FakePlaybookRepository(restart_playbook),
        auto_repair_service=FakeAutoRepairService(),
    )

    result = await service.build_from_incident(
        incident=incident,
        alertname="NodeExporterDown",
        target_resource="awoooi-api",
        namespace="awoooi-prod",
        message="node exporter is down",
        fallback_action="NO_ACTION - REPAIR_CANDIDATE_MISSING",
        matched_playbook_id="PB-REPAIR-001",
        severity="medium",
    )

    draft_package = result.metadata["repair_candidate_draft_package"]
    coverage_gap = draft_package["coverage_gap"]
    assert coverage_gap["coverage_key"] == "nodeexporterdown:awoooi-api"
    assert coverage_gap["blocking_stage"] == "mcp_evidence"
    assert coverage_gap["mcp_evidence_ready"] is False
    assert coverage_gap["target_kind"] == "k8s_workload"
    assert "mcp_health_snapshot" in coverage_gap["required_mcp_evidence_refs"]
    assert coverage_gap["runtime_execution_authorized"] is False
def test_approval_record_data_uses_preallocated_id_without_leaking_metadata() -> None:
    """Expect the preallocated id to become the record id and vanish from extra metadata."""
    preallocated_id = str(uuid4())
    request_metadata = {"preallocated_approval_id": preallocated_id, "source": "test"}
    request = ApprovalRequestCreate(
        action="kubectl rollout restart deployment/awoooi-api -n awoooi-prod",
        description="candidate",
        risk_level=RiskLevel.MEDIUM,
        requested_by="repair-candidate-test",
        blast_radius=BlastRadius(data_impact=DataImpact.WRITE),
        metadata=request_metadata,
    )

    record = approval_request_to_record_data(request, RiskLevel.MEDIUM, required_sigs=1)

    assert record["id"] == preallocated_id
    # Only the non-reserved key should survive into the stored metadata.
    assert record["extra_metadata"] == {"source": "test"}