diff --git a/apps/api/src/jobs/awooop_ansible_candidate_backfill_job.py b/apps/api/src/jobs/awooop_ansible_candidate_backfill_job.py index d9fae33b..2858239f 100644 --- a/apps/api/src/jobs/awooop_ansible_candidate_backfill_job.py +++ b/apps/api/src/jobs/awooop_ansible_candidate_backfill_job.py @@ -21,6 +21,9 @@ from src.services.awooop_ansible_audit_service import ( build_ansible_decision_audit_payload, record_ansible_decision_audit, ) +from src.services.awooop_ansible_check_mode_service import ( + backfill_missing_auto_repair_execution_receipts_once, +) logger = structlog.get_logger(__name__) @@ -96,6 +99,7 @@ async def enqueue_missing_ansible_candidates_once( limit: int | None = None, window_hours: int | None = None, recorder: Recorder = record_ansible_decision_audit, + receipt_backfiller: Callable[..., Awaitable[dict[str, Any]]] = backfill_missing_auto_repair_execution_receipts_once, ) -> dict[str, Any]: """Backfill missing Ansible candidate rows for recent unresolved incidents.""" @@ -121,6 +125,7 @@ async def enqueue_missing_ansible_candidates_once( "queued": 0, "already_existing_or_write_skipped": 0, "no_catalog_candidate": 0, + "repair_receipts_backfilled": 0, "error": None, } @@ -153,6 +158,14 @@ async def enqueue_missing_ansible_candidates_once( stats["queued"] += 1 else: stats["already_existing_or_write_skipped"] += 1 + receipt_stats = await receipt_backfiller( + project_id=project_id, + window_hours=bounded_window_hours, + limit=bounded_limit, + ) + stats["repair_receipts_backfilled"] = int(receipt_stats.get("written") or 0) + if receipt_stats.get("error") and not stats["error"]: + stats["error"] = receipt_stats["error"] except Exception as exc: stats["error"] = f"{type(exc).__name__}: {exc}"[:500] logger.warning("awooop_ansible_candidate_backfill_once_failed", **stats) diff --git a/apps/api/src/services/awooop_ansible_check_mode_service.py b/apps/api/src/services/awooop_ansible_check_mode_service.py index 10fb9271..d8c1a990 100644 --- a/apps/api/src/services/awooop_ansible_check_mode_service.py +++ b/apps/api/src/services/awooop_ansible_check_mode_service.py @@ -510,6 +510,67 @@ def _build_auto_repair_execution_receipt( } +def _int_from_value(value: Any, *, default: int = 1) -> int: + if value in (None, ""): + return default + try: + return int(value) + except (TypeError, ValueError): + return default + + +def _claim_from_apply_operation_row(row: dict[str, Any]) -> tuple[AnsibleCheckModeClaim, AnsibleRunResult] | None: + input_payload = _json_loads(row.get("input")) + output_payload = _json_loads(row.get("output")) + dry_run_result = _json_loads(row.get("dry_run_result")) + catalog_id = str(input_payload.get("catalog_id") or "") + if not catalog_id: + return None + catalog_item = get_ansible_catalog_item(catalog_id) or {} + incident_id = str(row.get("incident_id") or input_payload.get("incident_id") or "") + if not incident_id: + return None + inventory_hosts = input_payload.get("inventory_hosts") or catalog_item.get("inventory_hosts") or [] + if not isinstance(inventory_hosts, list) or not inventory_hosts: + return None + apply_playbook_path = str( + input_payload.get("apply_playbook_path") + or input_payload.get("playbook_path") + or catalog_item.get("playbook_path") + or "" + ) + check_mode_playbook_path = str( + input_payload.get("check_mode_playbook_path") + or input_payload.get("playbook_path") + or apply_playbook_path + ) + if not apply_playbook_path: + return None + returncode = _int_from_value( + output_payload.get("returncode", dry_run_result.get("returncode")), + default=0 if str(row.get("status") or "").lower() == "success" else 1, + ) + claim = AnsibleCheckModeClaim( + op_id=str(input_payload.get("check_mode_op_id") or row.get("parent_op_id") or ""), + source_candidate_op_id=str(input_payload.get("source_candidate_op_id") or ""), + incident_id=incident_id, + catalog_id=catalog_id, + playbook_path=check_mode_playbook_path, + apply_playbook_path=apply_playbook_path, + inventory_hosts=tuple(str(host) for host in inventory_hosts), + risk_level=str(input_payload.get("risk_level") or catalog_item.get("risk_level") or ""), + input_payload=input_payload, + ) + result = AnsibleRunResult( + returncode=returncode, + stdout=str(output_payload.get("stdout_tail") or ""), + stderr=str(row.get("error") or output_payload.get("stderr_tail") or ""), + duration_ms=_int_from_value(row.get("duration_ms"), default=0), + timed_out=bool(output_payload.get("timed_out") or dry_run_result.get("timed_out") or False), + ) + return claim, result + + async def _record_auto_repair_execution_receipt( claim: AnsibleCheckModeClaim, result: AnsibleRunResult, @@ -576,6 +637,66 @@ async def _record_auto_repair_execution_receipt( return False +async def backfill_missing_auto_repair_execution_receipts_once( + *, + project_id: str = "awoooi", + window_hours: int = 24, + limit: int = 10, +) -> dict[str, Any]: + """Create auto_repair_executions receipts for existing Ansible apply rows.""" + + stats = {"scanned": 0, "written": 0, "skipped": 0, "error": None} + try: + async with get_db_context(project_id) as db: + result = await db.execute( + text(""" + SELECT + apply.op_id::text AS op_id, + apply.parent_op_id::text AS parent_op_id, + coalesce(apply.incident_id::text, apply.input ->> 'incident_id') AS incident_id, + apply.input, + apply.output, + apply.dry_run_result, + apply.error, + apply.duration_ms, + apply.status, + apply.created_at + FROM automation_operation_log apply + WHERE apply.operation_type = 'ansible_apply_executed' + AND apply.created_at >= NOW() - (:window_hours * INTERVAL '1 hour') + AND NOT EXISTS ( + SELECT 1 + FROM auto_repair_executions existing + WHERE existing.executed_steps::text LIKE '%' || apply.op_id::text || '%' + ) + ORDER BY apply.created_at DESC + LIMIT :limit + """), + {"window_hours": max(1, window_hours), "limit": max(1, limit)}, + ) + rows = [dict(row) for row in result.mappings().all()] + stats["scanned"] = len(rows) + for row in rows: + reconstructed = _claim_from_apply_operation_row(row) + if reconstructed is None: + stats["skipped"] += 1 + continue + claim, result = reconstructed + if await _record_auto_repair_execution_receipt( + claim, + result, + apply_op_id=str(row.get("op_id") or ""), + project_id=project_id, + ): + stats["written"] += 1 + else: + stats["skipped"] += 1 + except Exception as exc: + stats["error"] = f"{type(exc).__name__}: {exc}"[:500] + logger.warning("ansible_auto_repair_execution_receipt_backfill_failed", **stats) + return stats + + async def claim_pending_check_modes( *, project_id: str = "awoooi", diff --git a/apps/api/tests/test_awooop_ansible_candidate_backfill_job.py b/apps/api/tests/test_awooop_ansible_candidate_backfill_job.py index a254a145..9ce5f198 100644 --- a/apps/api/tests/test_awooop_ansible_candidate_backfill_job.py +++ b/apps/api/tests/test_awooop_ansible_candidate_backfill_job.py @@ -62,6 +62,10 @@ async def test_backfill_enqueues_catalog_matched_incident(monkeypatch: pytest.Mo recorded.append(kwargs) return True + async def fake_receipt_backfiller(**kwargs): + assert kwargs["project_id"] == "awoooi" + return {"written": 2, "error": None} + monkeypatch.setattr(job, "get_db_context", fake_db_context) monkeypatch.setattr(job.settings, "ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER", True) @@ -70,10 +74,12 @@ async def test_backfill_enqueues_catalog_matched_incident(monkeypatch: pytest.Mo limit=5, window_hours=24, recorder=fake_recorder, + receipt_backfiller=fake_receipt_backfiller, ) assert result["queued"] == 1 assert result["no_catalog_candidate"] == 0 + assert result["repair_receipts_backfilled"] == 2 assert recorded[0]["decision_path"] == "repair_candidate_controlled_queue" assert recorded[0]["incident"]["incident_id"] == "INC-20260627-NODE110" diff --git a/apps/api/tests/test_awooop_truth_chain_service.py b/apps/api/tests/test_awooop_truth_chain_service.py index 5656b8b4..0851faa4 100644 --- a/apps/api/tests/test_awooop_truth_chain_service.py +++ b/apps/api/tests/test_awooop_truth_chain_service.py @@ -16,9 +16,11 @@ from src.services.awooop_ansible_check_mode_service import ( AnsibleRunResult, _automation_operation_log_incident_id, _build_auto_repair_execution_receipt, + _claim_from_apply_operation_row, build_ansible_apply_command, build_ansible_check_mode_claim_input, build_ansible_check_mode_command, + backfill_missing_auto_repair_execution_receipts_once, claim_pending_check_modes, detect_ansible_transport_blockers, recent_ansible_transport_blockers, @@ -1479,6 +1481,44 @@ def test_ansible_controlled_apply_builds_auto_repair_receipt() -> None: assert receipt["execution_time_ms"] == 1234 +def test_ansible_apply_operation_row_can_backfill_auto_repair_receipt() -> None: + reconstructed = _claim_from_apply_operation_row({ + "op_id": "apply-op-1", + "parent_op_id": "check-op-1", + "incident_id": "INC-20260627-NODE110", + "status": "success", + "input": { + "incident_id": "INC-20260627-NODE110", + "catalog_id": "ansible:110-devops", + "source_candidate_op_id": "candidate-op-1", + "check_mode_op_id": "check-op-1", + "playbook_path": "infra/ansible/playbooks/110-devops.yml", + "apply_playbook_path": "infra/ansible/playbooks/110-devops.yml", + "inventory_hosts": ["host_110"], + "risk_level": "medium", + }, + "output": {"returncode": 0, "stdout_tail": "ok"}, + "dry_run_result": {"apply_executed": True}, + "duration_ms": 456, + }) + + assert reconstructed is not None + claim, result = reconstructed + assert claim.incident_id == "INC-20260627-NODE110" + assert claim.catalog_id == "ansible:110-devops" + assert claim.apply_playbook_path == "infra/ansible/playbooks/110-devops.yml" + assert result.returncode == 0 + assert result.duration_ms == 456 + + +def test_ansible_apply_receipt_backfill_queries_existing_apply_rows() -> None: + source = inspect.getsource(backfill_missing_auto_repair_execution_receipts_once) + + assert "operation_type = 'ansible_apply_executed'" in source + assert "auto_repair_executions existing" in source + assert "executed_steps::text LIKE" in source + + def test_ansible_claim_query_limits_recent_candidate_backlog() -> None: source = inspect.getsource(claim_pending_check_modes)