fix(api): backfill ansible repair candidates
This commit is contained in:
@@ -696,6 +696,40 @@ class Settings(BaseSettings):
|
||||
"forced-command repair SSH denial."
|
||||
),
|
||||
)
|
||||
# --- AwoooP Ansible candidate backfill worker settings ---

# Master switch: the backfill job returns early / the loop exits when this is False.
ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER: bool = Field(
    default=True,
    description=(
        "True=scan recent unresolved incidents that already match an allowlisted "
        "Ansible catalog row but are missing an ansible_candidate_matched AOL row, "
        "then enqueue them for the existing check-mode worker."
    ),
)

# Seconds slept between two worker ticks (validated to be at least 60).
AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_INTERVAL_SECONDS: int = Field(
    default=300, ge=60,
    description="Polling interval for the Ansible candidate backfill worker.",
)

# Upper bound on incidents queued per tick (validated to 1..25).
AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_BATCH_LIMIT: int = Field(
    default=5, ge=1, le=25,
    description="Maximum backfilled incidents queued per worker tick.",
)

# How far back, in hours, incidents are considered (validated to 1..168).
AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WINDOW_HOURS: int = Field(
    default=24, ge=1, le=168,
    description="Recent unresolved incident window for Ansible candidate backfill.",
)

# Seconds slept once before the first tick (validated to 0..900).
AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_STARTUP_SLEEP_SECONDS: int = Field(
    default=60, ge=0, le=900,
    description=(
        "Delay before the candidate backfill worker first tick; should run before "
        "the check-mode worker startup delay so legacy incidents become claimable."
    ),
)
|
||||
|
||||
# ==========================================================================
|
||||
# 統帥鐵律:禁止 SQLite (AWOOOI 憲法)
|
||||
|
||||
188
apps/api/src/jobs/awooop_ansible_candidate_backfill_job.py
Normal file
188
apps/api/src/jobs/awooop_ansible_candidate_backfill_job.py
Normal file
@@ -0,0 +1,188 @@
|
||||
"""AwoooP Ansible candidate backfill worker.
|
||||
|
||||
This worker closes the gap between "AI found an allowlisted PlayBook candidate"
|
||||
and "the check-mode worker has a durable AOL row to claim". It does not execute
|
||||
host writes by itself; it only writes ``ansible_candidate_matched`` rows for
|
||||
recent unresolved incidents that already match the static Ansible catalog.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Awaitable, Callable
|
||||
from typing import Any
|
||||
|
||||
import structlog
|
||||
from sqlalchemy import text
|
||||
|
||||
from src.core.config import settings
|
||||
from src.db.base import get_db_context
|
||||
from src.services.awooop_ansible_audit_service import (
|
||||
build_ansible_decision_audit_payload,
|
||||
record_ansible_decision_audit,
|
||||
)
|
||||
|
||||
# Structured logger bound to this module's dotted name.
logger = structlog.get_logger(__name__)

# Shape of the audit writer injected into the backfill job: an awaitable
# callable whose result is a bool.
Recorder = Callable[..., Awaitable[bool]]

# Decision path stamped on every row written by this worker.
_BACKFILL_DECISION_PATH = "repair_candidate_controlled_queue"

# Reason text passed as ``not_used_reason`` for every backfilled row.
_BACKFILL_REASON = (
    "truth-chain found allowlisted Ansible catalog candidates "
    "but no durable candidate row existed; "
    "enqueue for check-mode worker"
)
|
||||
|
||||
|
||||
async def _fetch_missing_candidate_incidents(
    *,
    project_id: str,
    window_hours: int,
    scan_limit: int,
) -> list[dict[str, Any]]:
    """Load recent unresolved incidents that have no Ansible candidate row yet.

    The query selects incidents that

    * belong to ``project_id`` or have a NULL ``project_id``,
    * were created within the last ``window_hours`` hours,
    * have ``resolved_at IS NULL`` and a status other than RESOLVED/CLOSED,
    * have no ``automation_operation_log`` row with operation type
      ``ansible_candidate_matched`` and executor ``ansible`` for the same
      incident id,

    newest first, capped at ``scan_limit`` rows.

    Args:
        project_id: Project passed to ``get_db_context`` and used as a filter.
        window_hours: Look-back window in hours; values below 1 are raised to 1.
        scan_limit: Maximum number of rows; values below 1 are raised to 1.

    Returns:
        One plain ``dict`` per incident row.
    """
    # The SQL stays a literal inside this function on purpose: the unit tests
    # inspect this function's code constants to check the query's guards.
    statement = text("""
        SELECT
            incident_id,
            project_id,
            status::text AS status,
            severity::text AS severity,
            alertname,
            alert_category,
            notification_type,
            created_at,
            updated_at,
            resolved_at,
            verification_result,
            frequency_snapshot,
            signals,
            decision_chain
        FROM incidents
        WHERE (project_id = :project_id OR project_id IS NULL)
          AND created_at >= NOW() - (:window_hours * INTERVAL '1 hour')
          AND resolved_at IS NULL
          AND upper(coalesce(status::text, '')) NOT IN ('RESOLVED', 'CLOSED')
          AND NOT EXISTS (
              SELECT 1
              FROM automation_operation_log existing
              WHERE existing.operation_type = 'ansible_candidate_matched'
                AND existing.input ->> 'executor' = 'ansible'
                AND coalesce(existing.incident_id::text, existing.input ->> 'incident_id') = incidents.incident_id::text
          )
        ORDER BY created_at DESC
        LIMIT :scan_limit
    """)
    bind_params = {
        "project_id": project_id,
        "window_hours": max(1, window_hours),
        "scan_limit": max(1, scan_limit),
    }

    async with get_db_context(project_id) as db:
        outcome = await db.execute(statement, bind_params)
        # Materialise the rows while the DB context is still open.
        fetched = outcome.mappings().all()
    return list(map(dict, fetched))
|
||||
|
||||
|
||||
def _build_backfill_proposal(incident: dict[str, Any]) -> dict[str, Any]:
|
||||
return {
|
||||
"source": "truth_chain_candidate_backfill",
|
||||
"risk_level": str(incident.get("severity") or ""),
|
||||
"action": "enqueue_allowlisted_ansible_check_mode",
|
||||
"alertname": incident.get("alertname"),
|
||||
}
|
||||
|
||||
|
||||
async def enqueue_missing_ansible_candidates_once(
    *,
    project_id: str = "awoooi",
    limit: int | None = None,
    window_hours: int | None = None,
    recorder: Recorder = record_ansible_decision_audit,
) -> dict[str, Any]:
    """Backfill missing Ansible candidate rows for recent unresolved incidents.

    Runs one tick: fetches incidents lacking a candidate row, builds the audit
    payload for each, and hands those with a payload to ``recorder`` until
    ``limit`` rows have been queued.

    A failure while processing one incident is recorded and the loop moves on
    to the next incident, so a single bad incident cannot block the ones that
    sort after it on every tick.

    Args:
        project_id: Project whose incidents are scanned.
        limit: Maximum rows to queue; falls back to
            ``AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_BATCH_LIMIT`` when ``None`` or 0.
        window_hours: Look-back window; falls back to
            ``AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WINDOW_HOURS`` when ``None`` or 0.
        recorder: Awaitable audit writer; a truthy result counts as queued.

    Returns:
        Stats dict with keys ``skipped``, ``scanned``, ``queued``,
        ``already_existing_or_write_skipped``, ``no_catalog_candidate``,
        ``failed`` (incidents whose processing raised) and ``error`` (text of
        the first failure, truncated to 500 characters, or ``None``).
        This function does not raise for fetch/processing errors.
    """
    stats: dict[str, Any] = {
        "skipped": False,
        "scanned": 0,
        "queued": 0,
        "already_existing_or_write_skipped": 0,
        "no_catalog_candidate": 0,
        "failed": 0,
        "error": None,
    }

    if not settings.ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER:
        stats["skipped"] = True
        return stats

    bounded_limit = max(1, limit or settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_BATCH_LIMIT)
    bounded_window_hours = max(
        1,
        window_hours or settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WINDOW_HOURS,
    )
    # Scan more rows than we intend to queue (between 25 and 100) because
    # incidents without a catalog match are skipped, not queued.
    scan_limit = min(100, max(25, bounded_limit * 5))

    try:
        incidents = await _fetch_missing_candidate_incidents(
            project_id=project_id,
            window_hours=bounded_window_hours,
            scan_limit=scan_limit,
        )
    except Exception as exc:
        # Without rows there is nothing to iterate; report and fall through.
        stats["error"] = f"{type(exc).__name__}: {exc}"[:500]
        incidents = []
    stats["scanned"] = len(incidents)

    for incident in incidents:
        if stats["queued"] >= bounded_limit:
            break
        try:
            payload = build_ansible_decision_audit_payload(
                incident=incident,
                proposal_data=_build_backfill_proposal(incident),
                decision_path=_BACKFILL_DECISION_PATH,
                not_used_reason=_BACKFILL_REASON,
            )
            if payload is None:
                stats["no_catalog_candidate"] += 1
                continue
            inserted = await recorder(
                incident=incident,
                proposal_data=_build_backfill_proposal(incident),
                decision_path=_BACKFILL_DECISION_PATH,
                not_used_reason=_BACKFILL_REASON,
            )
        except Exception as exc:
            # Isolate the failure to this incident; keep the first error text
            # for the tick summary and continue with the remaining incidents.
            detail = f"{type(exc).__name__}: {exc}"[:500]
            stats["failed"] += 1
            if stats["error"] is None:
                stats["error"] = detail
            logger.warning(
                "awooop_ansible_candidate_backfill_incident_failed",
                incident_id=str(incident.get("incident_id")),
                error=detail,
            )
            continue

        if inserted:
            stats["queued"] += 1
        else:
            stats["already_existing_or_write_skipped"] += 1

    if stats["error"] is not None:
        logger.warning("awooop_ansible_candidate_backfill_once_failed", **stats)

    logger.info("awooop_ansible_candidate_backfill_once_done", **stats)
    return stats
|
||||
|
||||
|
||||
async def run_awooop_ansible_candidate_backfill_loop() -> None:
    """Run the candidate backfill tick forever at the configured interval.

    Returns immediately (after logging) when the worker is disabled.
    Otherwise it logs the effective settings, sleeps for the startup delay,
    then repeats: run one backfill tick, log the tick when something was
    queued or an error was reported, and sleep for the polling interval.
    Exceptions raised by a tick are logged as warnings and do not stop the loop.
    """
    cfg = settings

    if not cfg.ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER:
        logger.info("awooop_ansible_candidate_backfill_worker_disabled")
        return

    logger.info(
        "awooop_ansible_candidate_backfill_worker_started",
        interval_seconds=cfg.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_INTERVAL_SECONDS,
        batch_limit=cfg.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_BATCH_LIMIT,
        window_hours=cfg.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WINDOW_HOURS,
    )
    startup_delay = cfg.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_STARTUP_SLEEP_SECONDS
    await asyncio.sleep(startup_delay)

    while True:
        try:
            # Settings are re-read on every tick rather than cached.
            tick_stats = await enqueue_missing_ansible_candidates_once(
                limit=cfg.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_BATCH_LIMIT,
                window_hours=cfg.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WINDOW_HOURS,
            )
            worth_logging = tick_stats.get("queued") or tick_stats.get("error")
            if worth_logging:
                logger.info("awooop_ansible_candidate_backfill_worker_tick", **tick_stats)
        except Exception as exc:
            logger.warning("awooop_ansible_candidate_backfill_worker_failed", error=str(exc))

        await asyncio.sleep(cfg.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_INTERVAL_SECONDS)
|
||||
@@ -558,9 +558,25 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
except Exception as e:
|
||||
logger.warning("incident_lifecycle_reconciler_schedule_failed", error=str(e))
|
||||
|
||||
# AwoooP Ansible candidate backfill worker.
|
||||
# 把近期已命中 allowlisted PlayBook、但缺 durable candidate row 的事故補進
|
||||
# ansible_candidate_matched 佇列,讓 check-mode worker 可以主動認領。
|
||||
try:
|
||||
from src.jobs.awooop_ansible_candidate_backfill_job import (
|
||||
run_awooop_ansible_candidate_backfill_loop,
|
||||
)
|
||||
asyncio.create_task(run_awooop_ansible_candidate_backfill_loop())
|
||||
logger.info(
|
||||
"awooop_ansible_candidate_backfill_worker_scheduled",
|
||||
enabled=settings.ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER,
|
||||
interval_seconds=settings.AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_INTERVAL_SECONDS,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("awooop_ansible_candidate_backfill_worker_schedule_failed", error=str(e))
|
||||
|
||||
# AwoooP Ansible check-mode worker.
|
||||
# 只執行 ansible-playbook --check --diff 並回寫 automation_operation_log;
|
||||
# apply 仍必須走 approval gate,本 worker 不寫 auto_repair_executions。
|
||||
# 先執行 ansible-playbook --check --diff 並回寫 automation_operation_log;
|
||||
# 通過後由 controlled apply guard 依 catalog/risk/verifier 進一步接管。
|
||||
try:
|
||||
from src.jobs.awooop_ansible_check_mode_job import (
|
||||
run_awooop_ansible_check_mode_loop,
|
||||
|
||||
103
apps/api/tests/test_awooop_ansible_candidate_backfill_job.py
Normal file
103
apps/api/tests/test_awooop_ansible_candidate_backfill_job.py
Normal file
@@ -0,0 +1,103 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from contextlib import asynccontextmanager
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class _FakeMappings:
|
||||
def __init__(self, rows: list[dict]):
|
||||
self._rows = rows
|
||||
|
||||
def all(self) -> list[dict]:
|
||||
return self._rows
|
||||
|
||||
|
||||
class _FakeResult:
    """Test double for a query result that only supports ``mappings()``."""

    def __init__(self, rows: list[dict]):
        self._items = rows

    def mappings(self) -> _FakeMappings:
        """Wrap the stored rows in a fresh ``_FakeMappings``."""
        return _FakeMappings(self._items)
|
||||
|
||||
|
||||
def _candidate_incident() -> dict:
|
||||
return {
|
||||
"incident_id": "INC-20260627-NODE110",
|
||||
"project_id": "awoooi",
|
||||
"status": "INVESTIGATING",
|
||||
"severity": "warning",
|
||||
"alertname": "NodeExporterDown",
|
||||
"alert_category": "infrastructure",
|
||||
"notification_type": "TYPE-3",
|
||||
"signals": [
|
||||
{
|
||||
"alert_name": "NodeExporterDown",
|
||||
"labels": {
|
||||
"alertname": "NodeExporterDown",
|
||||
"instance": "node-exporter-110",
|
||||
},
|
||||
"annotations": {},
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_backfill_enqueues_catalog_matched_incident(monkeypatch: pytest.MonkeyPatch) -> None:
    """One scanned incident with a catalog match is passed to the recorder and counted as queued."""
    from src.jobs import awooop_ansible_candidate_backfill_job as job

    # Recorder stub: remember every call and report a successful insert.
    recorder_calls: list[dict] = []

    async def fake_recorder(**kwargs):
        recorder_calls.append(kwargs)
        return True

    # DB stub: any execute() yields a result holding the single fixture row.
    session = AsyncMock()
    session.execute = AsyncMock(return_value=_FakeResult([_candidate_incident()]))

    @asynccontextmanager
    async def fake_db_context(project_id: str = "awoooi"):
        assert project_id == "awoooi"
        yield session

    monkeypatch.setattr(job.settings, "ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER", True)
    monkeypatch.setattr(job, "get_db_context", fake_db_context)

    stats = await job.enqueue_missing_ansible_candidates_once(
        project_id="awoooi",
        limit=5,
        window_hours=24,
        recorder=fake_recorder,
    )

    assert stats["queued"] == 1
    assert stats["no_catalog_candidate"] == 0

    first_call = recorder_calls[0]
    assert first_call["decision_path"] == "repair_candidate_controlled_queue"
    assert first_call["incident"]["incident_id"] == "INC-20260627-NODE110"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_backfill_skips_when_worker_disabled(monkeypatch: pytest.MonkeyPatch) -> None:
    """With the feature flag off, a tick reports ``skipped`` and queues nothing."""
    from src.jobs import awooop_ansible_candidate_backfill_job as job

    flag_name = "ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER"
    monkeypatch.setattr(job.settings, flag_name, False)

    stats = await job.enqueue_missing_ansible_candidates_once()

    assert stats["skipped"] is True
    assert stats["queued"] == 0
|
||||
|
||||
|
||||
def test_backfill_query_excludes_existing_ansible_candidate_rows() -> None:
    """The scan query must keep its guards against re-queuing and resolved incidents.

    The SQL is read from the constants compiled into
    ``_fetch_missing_candidate_incidents``, so no database is needed.
    """
    from src.jobs import awooop_ansible_candidate_backfill_job as job

    code_constants = job._fetch_missing_candidate_incidents.__code__.co_consts
    joined = "\n".join(str(item) for item in code_constants)

    # Each fragment is a guard the query relies on.
    assert "NOT EXISTS" in joined
    assert "ansible_candidate_matched" in joined
    assert "resolved_at IS NULL" in joined
|
||||
@@ -137,6 +137,16 @@ spec:
|
||||
value: "24"
|
||||
- name: AWOOOP_ANSIBLE_CHECK_MODE_TRANSPORT_COOLDOWN_SECONDS
|
||||
value: "21600"
|
||||
- name: ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER
|
||||
value: "true"
|
||||
- name: AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_INTERVAL_SECONDS
|
||||
value: "300"
|
||||
- name: AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_BATCH_LIMIT
|
||||
value: "5"
|
||||
- name: AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WINDOW_HOURS
|
||||
value: "24"
|
||||
- name: AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_STARTUP_SLEEP_SECONDS
|
||||
value: "60"
|
||||
# 2026-04-05 Claude Code: Sprint 3 — 掛載 SSH key 供 HostRepairAgent 使用
|
||||
volumeMounts:
|
||||
- name: repair-ssh-key
|
||||
|
||||
Reference in New Issue
Block a user