fix(api): backfill ansible apply repair receipts
Some checks failed
Code Review / ai-code-review (push) Successful in 13s
CD Pipeline / tests (push) Successful in 1m39s
AI 技術雷達監控 / ai-technology-watch (push) Successful in 39s
CD Pipeline / build-and-deploy (push) Failing after 6m15s
CD Pipeline / post-deploy-checks (push) Has been skipped
Some checks failed
Code Review / ai-code-review (push) Successful in 13s
CD Pipeline / tests (push) Successful in 1m39s
AI 技術雷達監控 / ai-technology-watch (push) Successful in 39s
CD Pipeline / build-and-deploy (push) Failing after 6m15s
CD Pipeline / post-deploy-checks (push) Has been skipped
This commit is contained in:
@@ -21,6 +21,9 @@ from src.services.awooop_ansible_audit_service import (
|
||||
build_ansible_decision_audit_payload,
|
||||
record_ansible_decision_audit,
|
||||
)
|
||||
from src.services.awooop_ansible_check_mode_service import (
|
||||
backfill_missing_auto_repair_execution_receipts_once,
|
||||
)
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
@@ -96,6 +99,7 @@ async def enqueue_missing_ansible_candidates_once(
|
||||
limit: int | None = None,
|
||||
window_hours: int | None = None,
|
||||
recorder: Recorder = record_ansible_decision_audit,
|
||||
receipt_backfiller: Callable[..., Awaitable[dict[str, Any]]] = backfill_missing_auto_repair_execution_receipts_once,
|
||||
) -> dict[str, Any]:
|
||||
"""Backfill missing Ansible candidate rows for recent unresolved incidents."""
|
||||
|
||||
@@ -121,6 +125,7 @@ async def enqueue_missing_ansible_candidates_once(
|
||||
"queued": 0,
|
||||
"already_existing_or_write_skipped": 0,
|
||||
"no_catalog_candidate": 0,
|
||||
"repair_receipts_backfilled": 0,
|
||||
"error": None,
|
||||
}
|
||||
|
||||
@@ -153,6 +158,14 @@ async def enqueue_missing_ansible_candidates_once(
|
||||
stats["queued"] += 1
|
||||
else:
|
||||
stats["already_existing_or_write_skipped"] += 1
|
||||
receipt_stats = await receipt_backfiller(
|
||||
project_id=project_id,
|
||||
window_hours=bounded_window_hours,
|
||||
limit=bounded_limit,
|
||||
)
|
||||
stats["repair_receipts_backfilled"] = int(receipt_stats.get("written") or 0)
|
||||
if receipt_stats.get("error") and not stats["error"]:
|
||||
stats["error"] = receipt_stats["error"]
|
||||
except Exception as exc:
|
||||
stats["error"] = f"{type(exc).__name__}: {exc}"[:500]
|
||||
logger.warning("awooop_ansible_candidate_backfill_once_failed", **stats)
|
||||
|
||||
@@ -510,6 +510,67 @@ def _build_auto_repair_execution_receipt(
|
||||
}
|
||||
|
||||
|
||||
def _int_from_value(value: Any, *, default: int = 1) -> int:
|
||||
if value in (None, ""):
|
||||
return default
|
||||
try:
|
||||
return int(value)
|
||||
except (TypeError, ValueError):
|
||||
return default
|
||||
|
||||
|
||||
def _claim_from_apply_operation_row(row: dict[str, Any]) -> tuple[AnsibleCheckModeClaim, AnsibleRunResult] | None:
|
||||
input_payload = _json_loads(row.get("input"))
|
||||
output_payload = _json_loads(row.get("output"))
|
||||
dry_run_result = _json_loads(row.get("dry_run_result"))
|
||||
catalog_id = str(input_payload.get("catalog_id") or "")
|
||||
if not catalog_id:
|
||||
return None
|
||||
catalog_item = get_ansible_catalog_item(catalog_id) or {}
|
||||
incident_id = str(row.get("incident_id") or input_payload.get("incident_id") or "")
|
||||
if not incident_id:
|
||||
return None
|
||||
inventory_hosts = input_payload.get("inventory_hosts") or catalog_item.get("inventory_hosts") or []
|
||||
if not isinstance(inventory_hosts, list) or not inventory_hosts:
|
||||
return None
|
||||
apply_playbook_path = str(
|
||||
input_payload.get("apply_playbook_path")
|
||||
or input_payload.get("playbook_path")
|
||||
or catalog_item.get("playbook_path")
|
||||
or ""
|
||||
)
|
||||
check_mode_playbook_path = str(
|
||||
input_payload.get("check_mode_playbook_path")
|
||||
or input_payload.get("playbook_path")
|
||||
or apply_playbook_path
|
||||
)
|
||||
if not apply_playbook_path:
|
||||
return None
|
||||
returncode = _int_from_value(
|
||||
output_payload.get("returncode", dry_run_result.get("returncode")),
|
||||
default=0 if str(row.get("status") or "").lower() == "success" else 1,
|
||||
)
|
||||
claim = AnsibleCheckModeClaim(
|
||||
op_id=str(input_payload.get("check_mode_op_id") or row.get("parent_op_id") or ""),
|
||||
source_candidate_op_id=str(input_payload.get("source_candidate_op_id") or ""),
|
||||
incident_id=incident_id,
|
||||
catalog_id=catalog_id,
|
||||
playbook_path=check_mode_playbook_path,
|
||||
apply_playbook_path=apply_playbook_path,
|
||||
inventory_hosts=tuple(str(host) for host in inventory_hosts),
|
||||
risk_level=str(input_payload.get("risk_level") or catalog_item.get("risk_level") or ""),
|
||||
input_payload=input_payload,
|
||||
)
|
||||
result = AnsibleRunResult(
|
||||
returncode=returncode,
|
||||
stdout=str(output_payload.get("stdout_tail") or ""),
|
||||
stderr=str(row.get("error") or output_payload.get("stderr_tail") or ""),
|
||||
duration_ms=_int_from_value(row.get("duration_ms"), default=0),
|
||||
timed_out=bool(output_payload.get("timed_out") or dry_run_result.get("timed_out") or False),
|
||||
)
|
||||
return claim, result
|
||||
|
||||
|
||||
async def _record_auto_repair_execution_receipt(
|
||||
claim: AnsibleCheckModeClaim,
|
||||
result: AnsibleRunResult,
|
||||
@@ -576,6 +637,66 @@ async def _record_auto_repair_execution_receipt(
|
||||
return False
|
||||
|
||||
|
||||
async def backfill_missing_auto_repair_execution_receipts_once(
|
||||
*,
|
||||
project_id: str = "awoooi",
|
||||
window_hours: int = 24,
|
||||
limit: int = 10,
|
||||
) -> dict[str, Any]:
|
||||
"""Create auto_repair_executions receipts for existing Ansible apply rows."""
|
||||
|
||||
stats = {"scanned": 0, "written": 0, "skipped": 0, "error": None}
|
||||
try:
|
||||
async with get_db_context(project_id) as db:
|
||||
result = await db.execute(
|
||||
text("""
|
||||
SELECT
|
||||
apply.op_id::text AS op_id,
|
||||
apply.parent_op_id::text AS parent_op_id,
|
||||
coalesce(apply.incident_id::text, apply.input ->> 'incident_id') AS incident_id,
|
||||
apply.input,
|
||||
apply.output,
|
||||
apply.dry_run_result,
|
||||
apply.error,
|
||||
apply.duration_ms,
|
||||
apply.status,
|
||||
apply.created_at
|
||||
FROM automation_operation_log apply
|
||||
WHERE apply.operation_type = 'ansible_apply_executed'
|
||||
AND apply.created_at >= NOW() - (:window_hours * INTERVAL '1 hour')
|
||||
AND NOT EXISTS (
|
||||
SELECT 1
|
||||
FROM auto_repair_executions existing
|
||||
WHERE existing.executed_steps::text LIKE '%' || apply.op_id::text || '%'
|
||||
)
|
||||
ORDER BY apply.created_at DESC
|
||||
LIMIT :limit
|
||||
"""),
|
||||
{"window_hours": max(1, window_hours), "limit": max(1, limit)},
|
||||
)
|
||||
rows = [dict(row) for row in result.mappings().all()]
|
||||
stats["scanned"] = len(rows)
|
||||
for row in rows:
|
||||
reconstructed = _claim_from_apply_operation_row(row)
|
||||
if reconstructed is None:
|
||||
stats["skipped"] += 1
|
||||
continue
|
||||
claim, result = reconstructed
|
||||
if await _record_auto_repair_execution_receipt(
|
||||
claim,
|
||||
result,
|
||||
apply_op_id=str(row.get("op_id") or ""),
|
||||
project_id=project_id,
|
||||
):
|
||||
stats["written"] += 1
|
||||
else:
|
||||
stats["skipped"] += 1
|
||||
except Exception as exc:
|
||||
stats["error"] = f"{type(exc).__name__}: {exc}"[:500]
|
||||
logger.warning("ansible_auto_repair_execution_receipt_backfill_failed", **stats)
|
||||
return stats
|
||||
|
||||
|
||||
async def claim_pending_check_modes(
|
||||
*,
|
||||
project_id: str = "awoooi",
|
||||
|
||||
@@ -62,6 +62,10 @@ async def test_backfill_enqueues_catalog_matched_incident(monkeypatch: pytest.Mo
|
||||
recorded.append(kwargs)
|
||||
return True
|
||||
|
||||
async def fake_receipt_backfiller(**kwargs):
|
||||
assert kwargs["project_id"] == "awoooi"
|
||||
return {"written": 2, "error": None}
|
||||
|
||||
monkeypatch.setattr(job, "get_db_context", fake_db_context)
|
||||
monkeypatch.setattr(job.settings, "ENABLE_AWOOOP_ANSIBLE_CANDIDATE_BACKFILL_WORKER", True)
|
||||
|
||||
@@ -70,10 +74,12 @@ async def test_backfill_enqueues_catalog_matched_incident(monkeypatch: pytest.Mo
|
||||
limit=5,
|
||||
window_hours=24,
|
||||
recorder=fake_recorder,
|
||||
receipt_backfiller=fake_receipt_backfiller,
|
||||
)
|
||||
|
||||
assert result["queued"] == 1
|
||||
assert result["no_catalog_candidate"] == 0
|
||||
assert result["repair_receipts_backfilled"] == 2
|
||||
assert recorded[0]["decision_path"] == "repair_candidate_controlled_queue"
|
||||
assert recorded[0]["incident"]["incident_id"] == "INC-20260627-NODE110"
|
||||
|
||||
|
||||
@@ -16,9 +16,11 @@ from src.services.awooop_ansible_check_mode_service import (
|
||||
AnsibleRunResult,
|
||||
_automation_operation_log_incident_id,
|
||||
_build_auto_repair_execution_receipt,
|
||||
_claim_from_apply_operation_row,
|
||||
build_ansible_apply_command,
|
||||
build_ansible_check_mode_claim_input,
|
||||
build_ansible_check_mode_command,
|
||||
backfill_missing_auto_repair_execution_receipts_once,
|
||||
claim_pending_check_modes,
|
||||
detect_ansible_transport_blockers,
|
||||
recent_ansible_transport_blockers,
|
||||
@@ -1479,6 +1481,44 @@ def test_ansible_controlled_apply_builds_auto_repair_receipt() -> None:
|
||||
assert receipt["execution_time_ms"] == 1234
|
||||
|
||||
|
||||
def test_ansible_apply_operation_row_can_backfill_auto_repair_receipt() -> None:
|
||||
reconstructed = _claim_from_apply_operation_row({
|
||||
"op_id": "apply-op-1",
|
||||
"parent_op_id": "check-op-1",
|
||||
"incident_id": "INC-20260627-NODE110",
|
||||
"status": "success",
|
||||
"input": {
|
||||
"incident_id": "INC-20260627-NODE110",
|
||||
"catalog_id": "ansible:110-devops",
|
||||
"source_candidate_op_id": "candidate-op-1",
|
||||
"check_mode_op_id": "check-op-1",
|
||||
"playbook_path": "infra/ansible/playbooks/110-devops.yml",
|
||||
"apply_playbook_path": "infra/ansible/playbooks/110-devops.yml",
|
||||
"inventory_hosts": ["host_110"],
|
||||
"risk_level": "medium",
|
||||
},
|
||||
"output": {"returncode": 0, "stdout_tail": "ok"},
|
||||
"dry_run_result": {"apply_executed": True},
|
||||
"duration_ms": 456,
|
||||
})
|
||||
|
||||
assert reconstructed is not None
|
||||
claim, result = reconstructed
|
||||
assert claim.incident_id == "INC-20260627-NODE110"
|
||||
assert claim.catalog_id == "ansible:110-devops"
|
||||
assert claim.apply_playbook_path == "infra/ansible/playbooks/110-devops.yml"
|
||||
assert result.returncode == 0
|
||||
assert result.duration_ms == 456
|
||||
|
||||
|
||||
def test_ansible_apply_receipt_backfill_queries_existing_apply_rows() -> None:
|
||||
source = inspect.getsource(backfill_missing_auto_repair_execution_receipts_once)
|
||||
|
||||
assert "operation_type = 'ansible_apply_executed'" in source
|
||||
assert "auto_repair_executions existing" in source
|
||||
assert "executed_steps::text LIKE" in source
|
||||
|
||||
|
||||
def test_ansible_claim_query_limits_recent_candidate_backlog() -> None:
|
||||
source = inspect.getsource(claim_pending_check_modes)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user