fix(api): backfill ansible apply repair receipts
Some checks failed
Code Review / ai-code-review (push) Successful in 13s
CD Pipeline / tests (push) Successful in 1m39s
AI 技術雷達監控 / ai-technology-watch (push) Successful in 39s
CD Pipeline / build-and-deploy (push) Failing after 6m15s
CD Pipeline / post-deploy-checks (push) Has been skipped

This commit is contained in:
Your Name
2026-06-27 13:56:13 +08:00
parent adceae6f44
commit 7a1ad85d64
4 changed files with 180 additions and 0 deletions

View File

@@ -21,6 +21,9 @@ from src.services.awooop_ansible_audit_service import (
build_ansible_decision_audit_payload,
record_ansible_decision_audit,
)
from src.services.awooop_ansible_check_mode_service import (
backfill_missing_auto_repair_execution_receipts_once,
)
logger = structlog.get_logger(__name__)
@@ -96,6 +99,7 @@ async def enqueue_missing_ansible_candidates_once(
limit: int | None = None,
window_hours: int | None = None,
recorder: Recorder = record_ansible_decision_audit,
receipt_backfiller: Callable[..., Awaitable[dict[str, Any]]] = backfill_missing_auto_repair_execution_receipts_once,
) -> dict[str, Any]:
"""Backfill missing Ansible candidate rows for recent unresolved incidents."""
@@ -121,6 +125,7 @@ async def enqueue_missing_ansible_candidates_once(
"queued": 0,
"already_existing_or_write_skipped": 0,
"no_catalog_candidate": 0,
"repair_receipts_backfilled": 0,
"error": None,
}
@@ -153,6 +158,14 @@ async def enqueue_missing_ansible_candidates_once(
stats["queued"] += 1
else:
stats["already_existing_or_write_skipped"] += 1
receipt_stats = await receipt_backfiller(
project_id=project_id,
window_hours=bounded_window_hours,
limit=bounded_limit,
)
stats["repair_receipts_backfilled"] = int(receipt_stats.get("written") or 0)
if receipt_stats.get("error") and not stats["error"]:
stats["error"] = receipt_stats["error"]
except Exception as exc:
stats["error"] = f"{type(exc).__name__}: {exc}"[:500]
logger.warning("awooop_ansible_candidate_backfill_once_failed", **stats)

View File

@@ -510,6 +510,67 @@ def _build_auto_repair_execution_receipt(
}
def _int_from_value(value: Any, *, default: int = 1) -> int:
if value in (None, ""):
return default
try:
return int(value)
except (TypeError, ValueError):
return default
def _claim_from_apply_operation_row(row: dict[str, Any]) -> tuple[AnsibleCheckModeClaim, AnsibleRunResult] | None:
    """Rebuild the claim and run result described by a logged apply operation row.

    The row's ``input``, ``output`` and ``dry_run_result`` columns are decoded
    with ``_json_loads``; values missing from the input payload fall back to
    the catalog item looked up by ``catalog_id``.

    Returns:
        A ``(claim, result)`` pair, or ``None`` when the row has no catalog
        id, no incident id, no non-empty list of inventory hosts, or no apply
        playbook path.
    """
    apply_input = _json_loads(row.get("input"))
    apply_output = _json_loads(row.get("output"))
    dry_run = _json_loads(row.get("dry_run_result"))

    catalog_id = str(apply_input.get("catalog_id") or "")
    if not catalog_id:
        return None
    catalog_entry = get_ansible_catalog_item(catalog_id) or {}

    # The row column takes precedence over the copy stored in the input payload.
    incident_id = str(row.get("incident_id") or apply_input.get("incident_id") or "")
    if not incident_id:
        return None

    hosts = apply_input.get("inventory_hosts") or catalog_entry.get("inventory_hosts") or []
    if not (isinstance(hosts, list) and hosts):
        return None

    shared_playbook = apply_input.get("playbook_path")
    apply_path = str(
        apply_input.get("apply_playbook_path")
        or shared_playbook
        or catalog_entry.get("playbook_path")
        or ""
    )
    if not apply_path:
        return None
    # The check-mode path falls back to the apply path when nothing more specific is recorded.
    check_path = str(
        apply_input.get("check_mode_playbook_path")
        or shared_playbook
        or apply_path
    )

    # Without a recorded return code, derive one from the row status.
    succeeded = str(row.get("status") or "").lower() == "success"
    exit_code = _int_from_value(
        apply_output.get("returncode", dry_run.get("returncode")),
        default=0 if succeeded else 1,
    )

    rebuilt_claim = AnsibleCheckModeClaim(
        op_id=str(apply_input.get("check_mode_op_id") or row.get("parent_op_id") or ""),
        source_candidate_op_id=str(apply_input.get("source_candidate_op_id") or ""),
        incident_id=incident_id,
        catalog_id=catalog_id,
        playbook_path=check_path,
        apply_playbook_path=apply_path,
        inventory_hosts=tuple(str(host) for host in hosts),
        risk_level=str(apply_input.get("risk_level") or catalog_entry.get("risk_level") or ""),
        input_payload=apply_input,
    )
    rebuilt_result = AnsibleRunResult(
        returncode=exit_code,
        stdout=str(apply_output.get("stdout_tail") or ""),
        stderr=str(row.get("error") or apply_output.get("stderr_tail") or ""),
        duration_ms=_int_from_value(row.get("duration_ms"), default=0),
        timed_out=bool(apply_output.get("timed_out") or dry_run.get("timed_out")),
    )
    return rebuilt_claim, rebuilt_result
async def _record_auto_repair_execution_receipt(
claim: AnsibleCheckModeClaim,
result: AnsibleRunResult,
@@ -576,6 +637,66 @@ async def _record_auto_repair_execution_receipt(
return False
async def backfill_missing_auto_repair_execution_receipts_once(
    *,
    project_id: str = "awoooi",
    window_hours: int = 24,
    limit: int = 10,
) -> dict[str, Any]:
    """Create auto_repair_executions receipts for existing Ansible apply rows.

    Selects recent ``ansible_apply_executed`` operation-log rows whose op id
    does not appear in any ``auto_repair_executions.executed_steps`` value,
    rebuilds the claim and run result for each, and records a receipt.

    Args:
        project_id: Project whose database context is used.
        window_hours: Look-back window in hours; values below 1 are raised to 1.
        limit: Maximum number of rows to scan; values below 1 are raised to 1.

    Returns:
        A dict with ``scanned``, ``written`` and ``skipped`` counts and an
        ``error`` string (``None`` when nothing failed). Exceptions are
        reported through ``error`` rather than raised.
    """
    stats: dict[str, Any] = {"scanned": 0, "written": 0, "skipped": 0, "error": None}
    try:
        async with get_db_context(project_id) as db:
            query_result = await db.execute(
                text("""
                    SELECT
                        apply.op_id::text AS op_id,
                        apply.parent_op_id::text AS parent_op_id,
                        coalesce(apply.incident_id::text, apply.input ->> 'incident_id') AS incident_id,
                        apply.input,
                        apply.output,
                        apply.dry_run_result,
                        apply.error,
                        apply.duration_ms,
                        apply.status,
                        apply.created_at
                    FROM automation_operation_log apply
                    WHERE apply.operation_type = 'ansible_apply_executed'
                      AND apply.created_at >= NOW() - (:window_hours * INTERVAL '1 hour')
                      AND NOT EXISTS (
                          SELECT 1
                          FROM auto_repair_executions existing
                          WHERE existing.executed_steps::text LIKE '%' || apply.op_id::text || '%'
                      )
                    ORDER BY apply.created_at DESC
                    LIMIT :limit
                """),
                {"window_hours": max(1, window_hours), "limit": max(1, limit)},
            )
            rows = [dict(row) for row in query_result.mappings().all()]
            stats["scanned"] = len(rows)
            for row in rows:
                apply_op_id = str(row.get("op_id") or "")
                try:
                    reconstructed = _claim_from_apply_operation_row(row)
                except Exception as exc:
                    # A row that cannot be reconstructed still has no receipt,
                    # so it is selected again on every run; letting it abort
                    # the batch would block the rows behind it. Count it as
                    # skipped, keep the first error for the caller, and go on.
                    row_error = f"{type(exc).__name__}: {exc}"[:500]
                    stats["skipped"] += 1
                    if stats["error"] is None:
                        stats["error"] = row_error
                    logger.warning(
                        "ansible_auto_repair_execution_receipt_backfill_row_failed",
                        apply_op_id=apply_op_id,
                        error=row_error,
                    )
                    continue
                if reconstructed is None:
                    stats["skipped"] += 1
                    continue
                claim, run_result = reconstructed
                if await _record_auto_repair_execution_receipt(
                    claim,
                    run_result,
                    apply_op_id=apply_op_id,
                    project_id=project_id,
                ):
                    stats["written"] += 1
                else:
                    stats["skipped"] += 1
    except Exception as exc:
        # Best-effort job: report the failure in the stats instead of raising.
        stats["error"] = f"{type(exc).__name__}: {exc}"[:500]
        logger.warning("ansible_auto_repair_execution_receipt_backfill_failed", **stats)
    return stats
async def claim_pending_check_modes(
*,
project_id: str = "awoooi",

View File

@@ -62,6 +62,10 @@ async def test_backfill_enqueues_catalog_matched_incident(monkeypatch: pytest.Mo
recorded.append(kwargs)
return True
async def fake_receipt_backfiller(**kwargs):
assert kwargs["project_id"] == "awoooi"
return {"written": 2, "error": None}
monkeypatch.setattr(job, "get_db_context", fake_db_context)
monkeypatch.setattr(job.settings, "ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER", True)
@@ -70,10 +74,12 @@ async def test_backfill_enqueues_catalog_matched_incident(monkeypatch: pytest.Mo
limit=5,
window_hours=24,
recorder=fake_recorder,
receipt_backfiller=fake_receipt_backfiller,
)
assert result["queued"] == 1
assert result["no_catalog_candidate"] == 0
assert result["repair_receipts_backfilled"] == 2
assert recorded[0]["decision_path"] == "repair_candidate_controlled_queue"
assert recorded[0]["incident"]["incident_id"] == "INC-20260627-NODE110"

View File

@@ -16,9 +16,11 @@ from src.services.awooop_ansible_check_mode_service import (
AnsibleRunResult,
_automation_operation_log_incident_id,
_build_auto_repair_execution_receipt,
_claim_from_apply_operation_row,
build_ansible_apply_command,
build_ansible_check_mode_claim_input,
build_ansible_check_mode_command,
backfill_missing_auto_repair_execution_receipts_once,
claim_pending_check_modes,
detect_ansible_transport_blockers,
recent_ansible_transport_blockers,
@@ -1479,6 +1481,44 @@ def test_ansible_controlled_apply_builds_auto_repair_receipt() -> None:
assert receipt["execution_time_ms"] == 1234
def test_ansible_apply_operation_row_can_backfill_auto_repair_receipt() -> None:
    """A complete apply operation row is rebuilt into a claim and a run result."""
    incident = "INC-20260627-NODE110"
    playbook = "infra/ansible/playbooks/110-devops.yml"
    apply_input = {
        "incident_id": incident,
        "catalog_id": "ansible:110-devops",
        "source_candidate_op_id": "candidate-op-1",
        "check_mode_op_id": "check-op-1",
        "playbook_path": playbook,
        "apply_playbook_path": playbook,
        "inventory_hosts": ["host_110"],
        "risk_level": "medium",
    }
    apply_row = {
        "op_id": "apply-op-1",
        "parent_op_id": "check-op-1",
        "incident_id": incident,
        "status": "success",
        "input": apply_input,
        "output": {"returncode": 0, "stdout_tail": "ok"},
        "dry_run_result": {"apply_executed": True},
        "duration_ms": 456,
    }

    rebuilt = _claim_from_apply_operation_row(apply_row)

    assert rebuilt is not None
    claim, run_result = rebuilt
    # Identity and playbook fields come from the row's input payload.
    assert claim.incident_id == incident
    assert claim.catalog_id == "ansible:110-devops"
    assert claim.apply_playbook_path == playbook
    # Outcome fields come from the output payload and the row itself.
    assert run_result.returncode == 0
    assert run_result.duration_ms == 456
def test_ansible_apply_receipt_backfill_queries_existing_apply_rows() -> None:
    """The backfill source must contain the apply-row filter and the receipt de-duplication check."""
    backfill_source = inspect.getsource(backfill_missing_auto_repair_execution_receipts_once)
    required_fragments = (
        "operation_type = 'ansible_apply_executed'",
        "auto_repair_executions existing",
        "executed_steps::text LIKE",
    )
    for fragment in required_fragments:
        assert fragment in backfill_source
def test_ansible_claim_query_limits_recent_candidate_backlog() -> None:
source = inspect.getsource(claim_pending_check_modes)