fix(api): backfill ansible repair candidates
All checks were successful
Code Review / ai-code-review (push) Successful in 20s
CD Pipeline / tests (push) Successful in 1m45s
CD Pipeline / build-and-deploy (push) Successful in 4m46s
CD Pipeline / post-deploy-checks (push) Successful in 1m43s

This commit is contained in:
Your Name
2026-06-27 13:36:23 +08:00
parent bdee5e97e1
commit f88d89fc38
5 changed files with 353 additions and 2 deletions

View File

@@ -696,6 +696,40 @@ class Settings(BaseSettings):
"forced-command repair SSH denial."
),
)
ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER: bool = Field(
default=True,
description=(
"True=scan recent unresolved incidents that already match an allowlisted "
"Ansible catalog row but are missing an ansible_candidate_matched AOL row, "
"then enqueue them for the existing check-mode worker."
),
)
AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_INTERVAL_SECONDS: int = Field(
default=300,
ge=60,
description="Polling interval for the Ansible candidate backfill worker.",
)
AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_BATCH_LIMIT: int = Field(
default=5,
ge=1,
le=25,
description="Maximum backfilled incidents queued per worker tick.",
)
AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WINDOW_HOURS: int = Field(
default=24,
ge=1,
le=168,
description="Recent unresolved incident window for Ansible candidate backfill.",
)
AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_STARTUP_SLEEP_SECONDS: int = Field(
default=60,
ge=0,
le=900,
description=(
"Delay before the candidate backfill worker first tick; should run before "
"the check-mode worker startup delay so legacy incidents become claimable."
),
)
# ==========================================================================
# 統帥鐵律:禁止 SQLite (AWOOOI 憲法)

View File

@@ -0,0 +1,188 @@
"""AwoooP Ansible candidate backfill worker.
This worker closes the gap between "AI found an allowlisted PlayBook candidate"
and "the check-mode worker has a durable AOL row to claim". It does not execute
host writes by itself; it only writes ``ansible_candidate_matched`` rows for
recent unresolved incidents that already match the static Ansible catalog.
"""
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any
import structlog
from sqlalchemy import text
from src.core.config import settings
from src.db.base import get_db_context
from src.services.awooop_ansible_audit_service import (
build_ansible_decision_audit_payload,
record_ansible_decision_audit,
)
logger = structlog.get_logger(__name__)
Recorder = Callable[..., Awaitable[bool]]
_BACKFILL_DECISION_PATH = "repair_candidate_controlled_queue"
_BACKFILL_REASON = (
"truth-chain found allowlisted Ansible catalog candidates but no durable "
"candidate row existed; enqueue for check-mode worker"
)
async def _fetch_missing_candidate_incidents(
*,
project_id: str,
window_hours: int,
scan_limit: int,
) -> list[dict[str, Any]]:
async with get_db_context(project_id) as db:
result = await db.execute(
text("""
SELECT
incident_id,
project_id,
status::text AS status,
severity::text AS severity,
alertname,
alert_category,
notification_type,
created_at,
updated_at,
resolved_at,
verification_result,
frequency_snapshot,
signals,
decision_chain
FROM incidents
WHERE (project_id = :project_id OR project_id IS NULL)
AND created_at >= NOW() - (:window_hours * INTERVAL '1 hour')
AND resolved_at IS NULL
AND upper(coalesce(status::text, '')) NOT IN ('RESOLVED', 'CLOSED')
AND NOT EXISTS (
SELECT 1
FROM automation_operation_log existing
WHERE existing.operation_type = 'ansible_candidate_matched'
AND existing.input ->> 'executor' = 'ansible'
AND coalesce(existing.incident_id::text, existing.input ->> 'incident_id') = incidents.incident_id::text
)
ORDER BY created_at DESC
LIMIT :scan_limit
"""),
{
"project_id": project_id,
"window_hours": max(1, window_hours),
"scan_limit": max(1, scan_limit),
},
)
return [dict(row) for row in result.mappings().all()]
def _build_backfill_proposal(incident: dict[str, Any]) -> dict[str, Any]:
return {
"source": "truth_chain_candidate_backfill",
"risk_level": str(incident.get("severity") or ""),
"action": "enqueue_allowlisted_ansible_check_mode",
"alertname": incident.get("alertname"),
}
async def enqueue_missing_ansible_candidates_once(
*,
project_id: str = "awoooi",
limit: int | None = None,
window_hours: int | None = None,
recorder: Recorder = record_ansible_decision_audit,
) -> dict[str, Any]:
"""Backfill missing Ansible candidate rows for recent unresolved incidents."""
if not settings.ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER:
return {
"skipped": True,
"scanned": 0,
"queued": 0,
"already_existing_or_write_skipped": 0,
"no_catalog_candidate": 0,
"error": None,
}
bounded_limit = max(1, limit or settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_BATCH_LIMIT)
bounded_window_hours = max(
1,
window_hours or settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WINDOW_HOURS,
)
scan_limit = min(100, max(25, bounded_limit * 5))
stats: dict[str, Any] = {
"skipped": False,
"scanned": 0,
"queued": 0,
"already_existing_or_write_skipped": 0,
"no_catalog_candidate": 0,
"error": None,
}
try:
incidents = await _fetch_missing_candidate_incidents(
project_id=project_id,
window_hours=bounded_window_hours,
scan_limit=scan_limit,
)
stats["scanned"] = len(incidents)
for incident in incidents:
if stats["queued"] >= bounded_limit:
break
payload = build_ansible_decision_audit_payload(
incident=incident,
proposal_data=_build_backfill_proposal(incident),
decision_path=_BACKFILL_DECISION_PATH,
not_used_reason=_BACKFILL_REASON,
)
if payload is None:
stats["no_catalog_candidate"] += 1
continue
inserted = await recorder(
incident=incident,
proposal_data=_build_backfill_proposal(incident),
decision_path=_BACKFILL_DECISION_PATH,
not_used_reason=_BACKFILL_REASON,
)
if inserted:
stats["queued"] += 1
else:
stats["already_existing_or_write_skipped"] += 1
except Exception as exc:
stats["error"] = f"{type(exc).__name__}: {exc}"[:500]
logger.warning("awooop_ansible_candidate_backfill_once_failed", **stats)
logger.info("awooop_ansible_candidate_backfill_once_done", **stats)
return stats
async def run_awooop_ansible_candidate_backfill_loop() -> None:
if not settings.ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER:
logger.info("awooop_ansible_candidate_backfill_worker_disabled")
return
logger.info(
"awooop_ansible_candidate_backfill_worker_started",
interval_seconds=settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_INTERVAL_SECONDS,
batch_limit=settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_BATCH_LIMIT,
window_hours=settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WINDOW_HOURS,
)
await asyncio.sleep(settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_STARTUP_SLEEP_SECONDS)
while True:
try:
result = await enqueue_missing_ansible_candidates_once(
limit=settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_BATCH_LIMIT,
window_hours=settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WINDOW_HOURS,
)
if result.get("queued") or result.get("error"):
logger.info("awooop_ansible_candidate_backfill_worker_tick", **result)
except Exception as exc:
logger.warning("awooop_ansible_candidate_backfill_worker_failed", error=str(exc))
await asyncio.sleep(settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_INTERVAL_SECONDS)

View File

@@ -558,9 +558,25 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
except Exception as e:
logger.warning("incident_lifecycle_reconciler_schedule_failed", error=str(e))
# AwoooP Ansible candidate backfill worker.
# 把近期已命中 allowlisted PlayBook、但缺 durable candidate row 的事故補進
# ansible_candidate_matched 佇列,讓 check-mode worker 可以主動認領。
try:
from src.jobs.awooop_ansible_candidate_backfill_job import (
run_awooop_ansible_candidate_backfill_loop,
)
asyncio.create_task(run_awooop_ansible_candidate_backfill_loop())
logger.info(
"awooop_ansible_candidate_backfill_worker_scheduled",
enabled=settings.ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER,
interval_seconds=settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_INTERVAL_SECONDS,
)
except Exception as e:
logger.warning("awooop_ansible_candidate_backfill_worker_schedule_failed", error=str(e))
# AwoooP Ansible check-mode worker.
# 執行 ansible-playbook --check --diff 並回寫 automation_operation_log
# apply 仍必須走 approval gate本 worker 不寫 auto_repair_executions
# 執行 ansible-playbook --check --diff 並回寫 automation_operation_log
# 通過後由 controlled apply guard 依 catalog/risk/verifier 進一步接管
try:
from src.jobs.awooop_ansible_check_mode_job import (
run_awooop_ansible_check_mode_loop,

View File

@@ -0,0 +1,103 @@
from __future__ import annotations
from contextlib import asynccontextmanager
from unittest.mock import AsyncMock, MagicMock
import pytest
class _FakeMappings:
def __init__(self, rows: list[dict]):
self._rows = rows
def all(self) -> list[dict]:
return self._rows
class _FakeResult:
def __init__(self, rows: list[dict]):
self._rows = rows
def mappings(self) -> _FakeMappings:
return _FakeMappings(self._rows)
def _candidate_incident() -> dict:
return {
"incident_id": "INC-20260627-NODE110",
"project_id": "awoooi",
"status": "INVESTIGATING",
"severity": "warning",
"alertname": "NodeExporterDown",
"alert_category": "infrastructure",
"notification_type": "TYPE-3",
"signals": [
{
"alert_name": "NodeExporterDown",
"labels": {
"alertname": "NodeExporterDown",
"instance": "node-exporter-110",
},
"annotations": {},
}
],
}
@pytest.mark.asyncio
async def test_backfill_enqueues_catalog_matched_incident(monkeypatch: pytest.MonkeyPatch) -> None:
from src.jobs import awooop_ansible_candidate_backfill_job as job
fake_db = AsyncMock()
fake_db.execute = AsyncMock(return_value=_FakeResult([_candidate_incident()]))
@asynccontextmanager
async def fake_db_context(project_id: str = "awoooi"):
assert project_id == "awoooi"
yield fake_db
recorded: list[dict] = []
async def fake_recorder(**kwargs):
recorded.append(kwargs)
return True
monkeypatch.setattr(job, "get_db_context", fake_db_context)
monkeypatch.setattr(job.settings, "ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER", True)
result = await job.enqueue_missing_ansible_candidates_once(
project_id="awoooi",
limit=5,
window_hours=24,
recorder=fake_recorder,
)
assert result["queued"] == 1
assert result["no_catalog_candidate"] == 0
assert recorded[0]["decision_path"] == "repair_candidate_controlled_queue"
assert recorded[0]["incident"]["incident_id"] == "INC-20260627-NODE110"
@pytest.mark.asyncio
async def test_backfill_skips_when_worker_disabled(monkeypatch: pytest.MonkeyPatch) -> None:
from src.jobs import awooop_ansible_candidate_backfill_job as job
monkeypatch.setattr(job.settings, "ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER", False)
result = await job.enqueue_missing_ansible_candidates_once()
assert result["skipped"] is True
assert result["queued"] == 0
def test_backfill_query_excludes_existing_ansible_candidate_rows() -> None:
from src.jobs import awooop_ansible_candidate_backfill_job as job
source = MagicMock(wraps=job._fetch_missing_candidate_incidents)
query_source = job._fetch_missing_candidate_incidents.__code__.co_consts
joined = "\n".join(str(item) for item in query_source)
assert source is not None
assert "NOT EXISTS" in joined
assert "ansible_candidate_matched" in joined
assert "resolved_at IS NULL" in joined

View File

@@ -137,6 +137,16 @@ spec:
value: "24"
- name: AWOOOP_ANSIBLE_CHECK_MODE_TRANSPORT_COOLDOWN_SECONDS
value: "21600"
- name: ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER
value: "true"
- name: AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_INTERVAL_SECONDS
value: "300"
- name: AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_BATCH_LIMIT
value: "5"
- name: AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WINDOW_HOURS
value: "24"
- name: AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_STARTUP_SLEEP_SECONDS
value: "60"
# 2026-04-05 Claude Code: Sprint 3 — 掛載 SSH key 供 HostRepairAgent 使用
volumeMounts:
- name: repair-ssh-key