From f88d89fc38298394ca9181029e52608f825e03bc Mon Sep 17 00:00:00 2001 From: Your Name Date: Sat, 27 Jun 2026 13:36:23 +0800 Subject: [PATCH] fix(api): backfill ansible repair candidates --- apps/api/src/core/config.py | 34 ++++ .../awooop_ansible_candidate_backfill_job.py | 188 ++++++++++++++++++ apps/api/src/main.py | 20 +- ...t_awooop_ansible_candidate_backfill_job.py | 103 ++++++++++ k8s/awoooi-prod/06-deployment-api.yaml | 10 + 5 files changed, 353 insertions(+), 2 deletions(-) create mode 100644 apps/api/src/jobs/awooop_ansible_candidate_backfill_job.py create mode 100644 apps/api/tests/test_awooop_ansible_candidate_backfill_job.py diff --git a/apps/api/src/core/config.py b/apps/api/src/core/config.py index a469cd7f..1be2d5f3 100644 --- a/apps/api/src/core/config.py +++ b/apps/api/src/core/config.py @@ -696,6 +696,40 @@ class Settings(BaseSettings): "forced-command repair SSH denial." ), ) + ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER: bool = Field( + default=True, + description=( + "True=scan recent unresolved incidents that already match an allowlisted " + "Ansible catalog row but are missing an ansible_candidate_matched AOL row, " + "then enqueue them for the existing check-mode worker." + ), + ) + AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_INTERVAL_SECONDS: int = Field( + default=300, + ge=60, + description="Polling interval for the Ansible candidate backfill worker.", + ) + AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_BATCH_LIMIT: int = Field( + default=5, + ge=1, + le=25, + description="Maximum backfilled incidents queued per worker tick.", + ) + AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WINDOW_HOURS: int = Field( + default=24, + ge=1, + le=168, + description="Recent unresolved incident window for Ansible candidate backfill.", + ) + AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_STARTUP_SLEEP_SECONDS: int = Field( + default=60, + ge=0, + le=900, + description=( + "Delay before the candidate backfill worker first tick; should run before " + "the check-mode worker startup delay so legacy incidents become claimable." + ), + ) # ========================================================================== # 統帥鐵律:禁止 SQLite (AWOOOI 憲法) diff --git a/apps/api/src/jobs/awooop_ansible_candidate_backfill_job.py b/apps/api/src/jobs/awooop_ansible_candidate_backfill_job.py new file mode 100644 index 00000000..d9fae33b --- /dev/null +++ b/apps/api/src/jobs/awooop_ansible_candidate_backfill_job.py @@ -0,0 +1,188 @@ +"""AwoooP Ansible candidate backfill worker. + +This worker closes the gap between "AI found an allowlisted PlayBook candidate" +and "the check-mode worker has a durable AOL row to claim". It does not execute +host writes by itself; it only writes ``ansible_candidate_matched`` rows for +recent unresolved incidents that already match the static Ansible catalog. +""" + +from __future__ import annotations + +import asyncio +from collections.abc import Awaitable, Callable +from typing import Any + +import structlog +from sqlalchemy import text + +from src.core.config import settings +from src.db.base import get_db_context +from src.services.awooop_ansible_audit_service import ( + build_ansible_decision_audit_payload, + record_ansible_decision_audit, +) + +logger = structlog.get_logger(__name__) + +Recorder = Callable[..., Awaitable[bool]] + +_BACKFILL_DECISION_PATH = "repair_candidate_controlled_queue" +_BACKFILL_REASON = ( + "truth-chain found allowlisted Ansible catalog candidates but no durable " + "candidate row existed; enqueue for check-mode worker" +) + + +async def _fetch_missing_candidate_incidents( + *, + project_id: str, + window_hours: int, + scan_limit: int, +) -> list[dict[str, Any]]: + async with get_db_context(project_id) as db: + result = await db.execute( + text(""" + SELECT + incident_id, + project_id, + status::text AS status, + severity::text AS severity, + alertname, + alert_category, + notification_type, + created_at, + updated_at, + resolved_at, + verification_result, + frequency_snapshot, + signals, + decision_chain + FROM incidents + WHERE (project_id = :project_id OR project_id IS NULL) + AND created_at >= NOW() - (:window_hours * INTERVAL '1 hour') + AND resolved_at IS NULL + AND upper(coalesce(status::text, '')) NOT IN ('RESOLVED', 'CLOSED') + AND NOT EXISTS ( + SELECT 1 + FROM automation_operation_log existing + WHERE existing.operation_type = 'ansible_candidate_matched' + AND existing.input ->> 'executor' = 'ansible' + AND coalesce(existing.incident_id::text, existing.input ->> 'incident_id') = incidents.incident_id::text + ) + ORDER BY created_at DESC + LIMIT :scan_limit + """), + { + "project_id": project_id, + "window_hours": max(1, window_hours), + "scan_limit": max(1, scan_limit), + }, + ) + return [dict(row) for row in result.mappings().all()] + + +def _build_backfill_proposal(incident: dict[str, Any]) -> dict[str, Any]: + return { + "source": "truth_chain_candidate_backfill", + "risk_level": str(incident.get("severity") or ""), + "action": "enqueue_allowlisted_ansible_check_mode", + "alertname": incident.get("alertname"), + } + + +async def enqueue_missing_ansible_candidates_once( + *, + project_id: str = "awoooi", + limit: int | None = None, + window_hours: int | None = None, + recorder: Recorder = record_ansible_decision_audit, +) -> dict[str, Any]: + """Backfill missing Ansible candidate rows for recent unresolved incidents.""" + + if not settings.ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER: + return { + "skipped": True, + "scanned": 0, + "queued": 0, + "already_existing_or_write_skipped": 0, + "no_catalog_candidate": 0, + "error": None, + } + + bounded_limit = max(1, limit or settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_BATCH_LIMIT) + bounded_window_hours = max( + 1, + window_hours or settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WINDOW_HOURS, + ) + scan_limit = min(100, max(25, bounded_limit * 5)) + stats: dict[str, Any] = { + "skipped": False, + "scanned": 0, + "queued": 0, + "already_existing_or_write_skipped": 0, + "no_catalog_candidate": 0, + "error": None, + } + + try: + incidents = await _fetch_missing_candidate_incidents( + project_id=project_id, + window_hours=bounded_window_hours, + scan_limit=scan_limit, + ) + stats["scanned"] = len(incidents) + for incident in incidents: + if stats["queued"] >= bounded_limit: + break + payload = build_ansible_decision_audit_payload( + incident=incident, + proposal_data=_build_backfill_proposal(incident), + decision_path=_BACKFILL_DECISION_PATH, + not_used_reason=_BACKFILL_REASON, + ) + if payload is None: + stats["no_catalog_candidate"] += 1 + continue + inserted = await recorder( + incident=incident, + proposal_data=_build_backfill_proposal(incident), + decision_path=_BACKFILL_DECISION_PATH, + not_used_reason=_BACKFILL_REASON, + ) + if inserted: + stats["queued"] += 1 + else: + stats["already_existing_or_write_skipped"] += 1 + except Exception as exc: + stats["error"] = f"{type(exc).__name__}: {exc}"[:500] + logger.warning("awooop_ansible_candidate_backfill_once_failed", **stats) + + logger.info("awooop_ansible_candidate_backfill_once_done", **stats) + return stats + + +async def run_awooop_ansible_candidate_backfill_loop() -> None: + if not settings.ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER: + logger.info("awooop_ansible_candidate_backfill_worker_disabled") + return + + logger.info( + "awooop_ansible_candidate_backfill_worker_started", + interval_seconds=settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_INTERVAL_SECONDS, + batch_limit=settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_BATCH_LIMIT, + window_hours=settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WINDOW_HOURS, + ) + await asyncio.sleep(settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_STARTUP_SLEEP_SECONDS) + + while True: + try: + result = await enqueue_missing_ansible_candidates_once( + limit=settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_BATCH_LIMIT, + window_hours=settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WINDOW_HOURS, + ) + if result.get("queued") or result.get("error"): + logger.info("awooop_ansible_candidate_backfill_worker_tick", **result) + except Exception as exc: + logger.warning("awooop_ansible_candidate_backfill_worker_failed", error=str(exc)) + + await asyncio.sleep(settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_INTERVAL_SECONDS) diff --git a/apps/api/src/main.py b/apps/api/src/main.py index d8eac3a0..ed8548cb 100644 --- a/apps/api/src/main.py +++ b/apps/api/src/main.py @@ -558,9 +558,25 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: except Exception as e: logger.warning("incident_lifecycle_reconciler_schedule_failed", error=str(e)) + # AwoooP Ansible candidate backfill worker. + # 把近期已命中 allowlisted PlayBook、但缺 durable candidate row 的事故補進 + # ansible_candidate_matched 佇列,讓 check-mode worker 可以主動認領。 + try: + from src.jobs.awooop_ansible_candidate_backfill_job import ( + run_awooop_ansible_candidate_backfill_loop, + ) + asyncio.create_task(run_awooop_ansible_candidate_backfill_loop()) + logger.info( + "awooop_ansible_candidate_backfill_worker_scheduled", + enabled=settings.ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER, + interval_seconds=settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_INTERVAL_SECONDS, + ) + except Exception as e: + logger.warning("awooop_ansible_candidate_backfill_worker_schedule_failed", error=str(e)) + # AwoooP Ansible check-mode worker. - # 只執行 ansible-playbook --check --diff 並回寫 automation_operation_log; - # apply 仍必須走 approval gate,本 worker 不寫 auto_repair_executions。 + # 先執行 ansible-playbook --check --diff 並回寫 automation_operation_log; + # 通過後由 controlled apply guard 依 catalog/risk/verifier 進一步接管。 try: from src.jobs.awooop_ansible_check_mode_job import ( run_awooop_ansible_check_mode_loop, diff --git a/apps/api/tests/test_awooop_ansible_candidate_backfill_job.py b/apps/api/tests/test_awooop_ansible_candidate_backfill_job.py new file mode 100644 index 00000000..a254a145 --- /dev/null +++ b/apps/api/tests/test_awooop_ansible_candidate_backfill_job.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +from contextlib import asynccontextmanager +from unittest.mock import AsyncMock, MagicMock + +import pytest + + +class _FakeMappings: + def __init__(self, rows: list[dict]): + self._rows = rows + + def all(self) -> list[dict]: + return self._rows + + +class _FakeResult: + def __init__(self, rows: list[dict]): + self._rows = rows + + def mappings(self) -> _FakeMappings: + return _FakeMappings(self._rows) + + +def _candidate_incident() -> dict: + return { + "incident_id": "INC-20260627-NODE110", + "project_id": "awoooi", + "status": "INVESTIGATING", + "severity": "warning", + "alertname": "NodeExporterDown", + "alert_category": "infrastructure", + "notification_type": "TYPE-3", + "signals": [ + { + "alert_name": "NodeExporterDown", + "labels": { + "alertname": "NodeExporterDown", + "instance": "node-exporter-110", + }, + "annotations": {}, + } + ], + } + + +@pytest.mark.asyncio +async def test_backfill_enqueues_catalog_matched_incident(monkeypatch: pytest.MonkeyPatch) -> None: + from src.jobs import awooop_ansible_candidate_backfill_job as job + + fake_db = AsyncMock() + fake_db.execute = AsyncMock(return_value=_FakeResult([_candidate_incident()])) + + @asynccontextmanager + async def fake_db_context(project_id: str = "awoooi"): + assert project_id == "awoooi" + yield fake_db + + recorded: list[dict] = [] + + async def fake_recorder(**kwargs): + recorded.append(kwargs) + return True + + monkeypatch.setattr(job, "get_db_context", fake_db_context) + monkeypatch.setattr(job.settings, "ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER", True) + + result = await job.enqueue_missing_ansible_candidates_once( + project_id="awoooi", + limit=5, + window_hours=24, + recorder=fake_recorder, + ) + + assert result["queued"] == 1 + assert result["no_catalog_candidate"] == 0 + assert recorded[0]["decision_path"] == "repair_candidate_controlled_queue" + assert recorded[0]["incident"]["incident_id"] == "INC-20260627-NODE110" + + +@pytest.mark.asyncio +async def test_backfill_skips_when_worker_disabled(monkeypatch: pytest.MonkeyPatch) -> None: + from src.jobs import awooop_ansible_candidate_backfill_job as job + + monkeypatch.setattr(job.settings, "ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER", False) + + result = await job.enqueue_missing_ansible_candidates_once() + + assert result["skipped"] is True + assert result["queued"] == 0 + + +def test_backfill_query_excludes_existing_ansible_candidate_rows() -> None: + from src.jobs import awooop_ansible_candidate_backfill_job as job + + source = MagicMock(wraps=job._fetch_missing_candidate_incidents) + query_source = job._fetch_missing_candidate_incidents.__code__.co_consts + joined = "\n".join(str(item) for item in query_source) + + assert source is not None + assert "NOT EXISTS" in joined + assert "ansible_candidate_matched" in joined + assert "resolved_at IS NULL" in joined diff --git a/k8s/awoooi-prod/06-deployment-api.yaml b/k8s/awoooi-prod/06-deployment-api.yaml index 2949ade0..f90d722f 100644 --- a/k8s/awoooi-prod/06-deployment-api.yaml +++ b/k8s/awoooi-prod/06-deployment-api.yaml @@ -137,6 +137,16 @@ spec: value: "24" - name: AWOOOP_ANSIBLE_CHECK_MODE_TRANSPORT_COOLDOWN_SECONDS value: "21600" + - name: ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER + value: "true" + - name: AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_INTERVAL_SECONDS + value: "300" + - name: AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_BATCH_LIMIT + value: "5" + - name: AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WINDOW_HOURS + value: "24" + - name: AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_STARTUP_SLEEP_SECONDS + value: "60" # 2026-04-05 Claude Code: Sprint 3 — 掛載 SSH key 供 HostRepairAgent 使用 volumeMounts: - name: repair-ssh-key