feat(ai): add autonomous runtime loop ledger
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Failing after 2m44s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped

This commit is contained in:
Your Name
2026-06-29 13:53:04 +08:00
parent 023f46f286
commit 52490d25dd
6 changed files with 638 additions and 32 deletions

View File

@@ -151,6 +151,298 @@ def _status_counts(
}
def _first_operation(
rows: Iterable[Mapping[str, Any]],
operation_type: str,
) -> dict[str, Any] | None:
for row in rows:
if str(row.get("operation_type") or "") == operation_type:
return dict(row)
return None
def _operation_by_id(
rows: Iterable[Mapping[str, Any]],
op_id: Any,
) -> dict[str, Any] | None:
needle = str(op_id or "")
if not needle:
return None
for row in rows:
if str(row.get("op_id") or "") == needle:
return dict(row)
return None
def _stage_status(row: Mapping[str, Any] | None, *, fallback_status: str | None = None) -> str:
if row is None:
return fallback_status or "missing"
return str(row.get("status") or row.get("result_status") or fallback_status or "present")
def _loop_stage(
*,
stage_id: str,
receipt_source: str,
present: bool,
status: str,
ref_id: str | None,
writes_runtime_state: bool,
next_action_if_missing: str,
) -> dict[str, Any]:
return {
"stage_id": stage_id,
"receipt_source": receipt_source,
"present": present,
"status": status,
"ref_id": ref_id,
"writes_runtime_state": writes_runtime_state,
"next_action_if_missing": None if present else next_action_if_missing,
}
def _autonomous_execution_loop_ledger(
*,
project_id: str,
operation_latest_rows: Iterable[Mapping[str, Any] | Any],
verifier_latest_rows: Iterable[Mapping[str, Any] | Any],
km_latest_rows: Iterable[Mapping[str, Any] | Any],
telegram_latest_rows: Iterable[Mapping[str, Any] | Any],
auto_repair_latest_rows: Iterable[Mapping[str, Any] | Any],
latest_flow_closure: Mapping[str, Any],
latest_failure_classification: Mapping[str, Any],
controlled_retry_package: Mapping[str, Any],
) -> dict[str, Any]:
"""Build the operation-id ledger that proves whether the runtime loop closed."""
operation_rows = [_row_mapping(row) for row in operation_latest_rows]
verifier_rows = [_row_mapping(row) for row in verifier_latest_rows]
km_rows = [_row_mapping(row) for row in km_latest_rows]
telegram_rows = [_row_mapping(row) for row in telegram_latest_rows]
auto_repair_rows = [_row_mapping(row) for row in auto_repair_latest_rows]
latest_apply = _first_operation(operation_rows, "ansible_apply_executed")
latest_check = None
latest_candidate = None
if latest_apply is not None:
latest_check = _operation_by_id(
operation_rows,
latest_apply.get("check_mode_op_id") or latest_apply.get("parent_op_id"),
)
else:
latest_check = _first_operation(operation_rows, "ansible_check_mode_executed")
source_candidate_op_id = None
if latest_check is not None:
source_candidate_op_id = latest_check.get("parent_op_id") or latest_check.get("source_candidate_op_id")
if latest_apply is not None and not source_candidate_op_id:
source_candidate_op_id = latest_apply.get("source_candidate_op_id")
latest_candidate = _operation_by_id(operation_rows, source_candidate_op_id)
if latest_candidate is None and latest_apply is None and latest_check is None:
latest_candidate = _first_operation(operation_rows, "ansible_candidate_matched")
anchor = latest_apply or latest_check or latest_candidate or {}
apply_op_id = str((latest_apply or {}).get("op_id") or "")
check_mode_op_id = str(
(latest_check or {}).get("op_id")
or (latest_apply or {}).get("check_mode_op_id")
or (latest_apply or {}).get("parent_op_id")
or ""
)
candidate_op_id = str(
(latest_candidate or {}).get("op_id")
or source_candidate_op_id
or ""
)
incident_id = str(anchor.get("incident_id") or "")
catalog_id = str(anchor.get("catalog_id") or "")
playbook_path = str(anchor.get("playbook_path") or "")
verifier = next(
(
row
for row in verifier_rows
if apply_op_id and str(row.get("apply_op_id") or "") == apply_op_id
),
None,
)
km_path_type = f"ansible_apply_receipt:{apply_op_id[:8]}" if apply_op_id else ""
km = next(
(
row
for row in km_rows
if (
km_path_type
and str(row.get("path_type") or "") == km_path_type
)
or (
incident_id
and str(row.get("related_incident_id") or "") == incident_id
)
),
None,
)
telegram = next(
(
row
for row in telegram_rows
if str(row.get("send_status") or "") == "sent"
and str(row.get("action") or "") == "controlled_apply_result"
and (
not incident_id
or str(row.get("incident_id") or "") == incident_id
)
),
None,
)
auto_repair = next(
(
row
for row in auto_repair_rows
if apply_op_id
and apply_op_id
in str(row.get("executed_steps_text") or row.get("executed_steps") or "")
),
None,
)
candidate_present = bool(latest_candidate or candidate_op_id)
check_present = bool(latest_check or check_mode_op_id)
apply_present = latest_apply is not None
auto_repair_present = auto_repair is not None
verifier_present = verifier is not None
km_present = km is not None
telegram_present = telegram is not None
stages = [
_loop_stage(
stage_id="candidate",
receipt_source="automation_operation_log:ansible_candidate_matched",
present=candidate_present,
status=_stage_status(latest_candidate, fallback_status="inferred_from_check_mode")
if candidate_present
else "missing",
ref_id=candidate_op_id or None,
writes_runtime_state=False,
next_action_if_missing="candidate_backfill_worker_enqueue_allowlisted_playbook",
),
_loop_stage(
stage_id="check_mode",
receipt_source="automation_operation_log:ansible_check_mode_executed",
present=check_present,
status=_stage_status(latest_check, fallback_status="inferred_from_apply_parent")
if check_present
else "missing",
ref_id=check_mode_op_id or None,
writes_runtime_state=False,
next_action_if_missing="ansible_check_mode_worker_claims_candidate",
),
_loop_stage(
stage_id="controlled_apply",
receipt_source="automation_operation_log:ansible_apply_executed",
present=apply_present,
status=_stage_status(latest_apply),
ref_id=apply_op_id or None,
writes_runtime_state=True,
next_action_if_missing="controlled_apply_worker_waits_for_check_mode_success",
),
_loop_stage(
stage_id="auto_repair_execution_receipt",
receipt_source="auto_repair_executions:ansible_controlled_apply",
present=auto_repair_present,
status=str((auto_repair or {}).get("result_status") or "missing"),
ref_id=str((auto_repair or {}).get("id") or "") or None,
writes_runtime_state=True,
next_action_if_missing="receipt_backfill_records_auto_repair_execution",
),
_loop_stage(
stage_id="post_apply_verifier",
receipt_source="incident_evidence.post_execution_state",
present=verifier_present,
status=str((verifier or {}).get("verification_result") or "missing"),
ref_id=str((verifier or {}).get("id") or "") or None,
writes_runtime_state=True,
next_action_if_missing="post_apply_verifier_writes_incident_evidence",
),
_loop_stage(
stage_id="km_playbook_writeback",
receipt_source="knowledge_entries:ansible_apply_receipt",
present=km_present,
status=str((km or {}).get("status") or "missing"),
ref_id=str((km or {}).get("id") or "") or None,
writes_runtime_state=True,
next_action_if_missing="hermes_writes_km_playbook_trust_candidate",
),
_loop_stage(
stage_id="telegram_receipt",
receipt_source="awooop_outbound_message:controlled_apply_result",
present=telegram_present,
status=str((telegram or {}).get("send_status") or "missing"),
ref_id=str((telegram or {}).get("message_id") or "") or None,
writes_runtime_state=True,
next_action_if_missing="live_apply_gateway_sends_controlled_apply_result_receipt",
),
]
missing_stage_ids = [
str(stage["stage_id"])
for stage in stages
if stage["present"] is not True
]
closed = bool(
apply_op_id
and auto_repair_present
and latest_flow_closure.get("closed") is True
)
classification = str(latest_failure_classification.get("classification") or "")
if not candidate_present and not check_present and not apply_present:
execution_state = "waiting_for_candidate"
next_executor_action = "candidate_backfill_worker_waits_for_matching_incident"
elif not apply_present:
execution_state = "executor_in_progress_or_waiting"
next_executor_action = "continue_candidate_to_check_mode_to_apply"
elif closed and classification == "latest_controlled_apply_closed_success":
execution_state = "closed_success"
next_executor_action = "keep_receipt_chain_closed"
elif closed:
execution_state = "closed_failed_apply_repair_ready"
next_executor_action = str(
controlled_retry_package.get("next_ai_action")
or "run_no_write_check_mode_replay"
)
elif "telegram_receipt" in missing_stage_ids:
execution_state = "open_waiting_for_live_gateway_receipt"
next_executor_action = "do_not_fake_send_backfill_wait_for_live_apply_gateway"
else:
execution_state = "open_missing_internal_receipts"
next_executor_action = "backfill_missing_auto_repair_verifier_km_receipts"
return {
"schema_version": "ai_agent_autonomous_execution_loop_ledger_v1",
"project_id": project_id,
"operation_id": apply_op_id or check_mode_op_id or candidate_op_id or None,
"root_candidate_op_id": candidate_op_id or None,
"check_mode_op_id": check_mode_op_id or None,
"apply_op_id": apply_op_id or None,
"incident_id": incident_id or None,
"catalog_id": catalog_id or None,
"playbook_path": playbook_path or None,
"execution_state": execution_state,
"closed": closed,
"missing_stage_ids": missing_stage_ids,
"next_executor_action": next_executor_action,
"stages": stages,
"safety_contract": {
"writes_on_read": False,
"backfill_may_write_auto_repair_verifier_km": True,
"backfill_may_send_telegram": False,
"live_apply_may_send_telegram_gateway_receipt": True,
"reads_raw_sessions": False,
"reads_secret_values": False,
},
}
def _latest_flow_closure(
*,
operation_latest_rows: Iterable[Mapping[str, Any] | Any],
@@ -568,6 +860,8 @@ def build_runtime_receipt_readback_from_rows(
db_read_status: str = "ok",
operation_count_rows: Iterable[Mapping[str, Any] | Any] = (),
operation_latest_rows: Iterable[Mapping[str, Any] | Any] = (),
auto_repair_count_rows: Iterable[Mapping[str, Any] | Any] = (),
auto_repair_latest_rows: Iterable[Mapping[str, Any] | Any] = (),
verifier_count_rows: Iterable[Mapping[str, Any] | Any] = (),
verifier_latest_rows: Iterable[Mapping[str, Any] | Any] = (),
km_count_rows: Iterable[Mapping[str, Any] | Any] = (),
@@ -579,10 +873,15 @@ def build_runtime_receipt_readback_from_rows(
"""Build the live executor receipt readback from already-fetched rows."""
operation_latest = list(operation_latest_rows)
auto_repair_latest = list(auto_repair_latest_rows)
verifier_latest = list(verifier_latest_rows)
km_latest = list(km_latest_rows)
telegram_latest = list(telegram_latest_rows)
operation_summary = _operation_counts(operation_count_rows)
auto_repair_summary = _status_counts(
auto_repair_count_rows,
status_key="result_status",
)
verifier_summary = _status_counts(
verifier_count_rows,
status_key="verification_result",
@@ -601,6 +900,17 @@ def build_runtime_receipt_readback_from_rows(
latest_flow_closure=latest_closure,
)
retry_package = _controlled_retry_package(latest_failure)
loop_ledger = _autonomous_execution_loop_ledger(
project_id=project_id,
operation_latest_rows=operation_latest,
verifier_latest_rows=verifier_latest,
km_latest_rows=km_latest,
telegram_latest_rows=telegram_latest,
auto_repair_latest_rows=auto_repair_latest,
latest_flow_closure=latest_closure,
latest_failure_classification=latest_failure,
controlled_retry_package=retry_package,
)
apply_summary = operation_summary.get("ansible_apply_executed") or {}
readback = {
"schema_version": _LIVE_READBACK_SCHEMA_VERSION,
@@ -622,12 +932,33 @@ def build_runtime_receipt_readback_from_rows(
"catalog_id",
"playbook_path",
"execution_mode",
"source_candidate_op_id",
"check_mode_op_id",
"risk_level",
"controlled_apply_allowed",
"returncode",
"duration_ms",
"created_at",
),
),
},
"auto_repair_execution_receipt": {
**auto_repair_summary,
"latest": _sanitize_latest_rows(
auto_repair_latest,
allowed_keys=(
"id",
"incident_id",
"catalog_id",
"playbook_name",
"result_status",
"triggered_by",
"risk_level",
"execution_time_ms",
"created_at",
),
),
},
"ansible_apply_executed": {
"total": _int_value(apply_summary.get("total")),
"recent": _int_value(apply_summary.get("recent")),
@@ -686,6 +1017,7 @@ def build_runtime_receipt_readback_from_rows(
"latest_flow_closure": latest_closure,
"latest_failure_classification": latest_failure,
"controlled_retry_package": retry_package,
"autonomous_execution_loop_ledger": loop_ledger,
}
if error_type:
readback["error"] = {
@@ -705,6 +1037,9 @@ def _attach_runtime_receipt_readback(
"live_ansible_apply_executed_count": _int_value(
readback.get("ansible_apply_executed", {}).get("total")
),
"live_auto_repair_execution_receipt_count": _int_value(
readback.get("auto_repair_execution_receipt", {}).get("total")
),
"live_post_apply_verifier_count": _int_value(
readback.get("post_apply_verifier", {}).get("total")
),
@@ -719,6 +1054,11 @@ def _attach_runtime_receipt_readback(
if (readback.get("latest_flow_closure") or {}).get("closed") is True
else 0
),
"live_autonomous_execution_loop_closed_count": (
1
if (readback.get("autonomous_execution_loop_ledger") or {}).get("closed") is True
else 0
),
"live_executor_latest_apply_repair_required_count": (
1
if (
@@ -987,6 +1327,12 @@ async def load_ai_agent_autonomous_runtime_receipt_readback(
operation_latest = (
await db.execute(text(_RUNTIME_OPERATION_LATEST_SQL), params)
).mappings().all()
auto_repair_counts = (
await db.execute(text(_RUNTIME_AUTO_REPAIR_COUNTS_SQL), params)
).mappings().all()
auto_repair_latest = (
await db.execute(text(_RUNTIME_AUTO_REPAIR_LATEST_SQL), params)
).mappings().all()
verifier_counts = (
await db.execute(text(_RUNTIME_VERIFIER_COUNTS_SQL), params)
).mappings().all()
@@ -1024,6 +1370,8 @@ async def load_ai_agent_autonomous_runtime_receipt_readback(
db_read_status="ok",
operation_count_rows=operation_counts,
operation_latest_rows=operation_latest,
auto_repair_count_rows=auto_repair_counts,
auto_repair_latest_rows=auto_repair_latest,
verifier_count_rows=verifier_counts,
verifier_latest_rows=verifier_latest,
km_count_rows=km_counts,
@@ -1082,6 +1430,10 @@ _RUNTIME_OPERATION_LATEST_SQL = """
input ->> 'catalog_id' AS catalog_id,
coalesce(input ->> 'apply_playbook_path', input ->> 'playbook_path') AS playbook_path,
input ->> 'execution_mode' AS execution_mode,
input ->> 'source_candidate_op_id' AS source_candidate_op_id,
input ->> 'check_mode_op_id' AS check_mode_op_id,
input ->> 'risk_level' AS risk_level,
input ->> 'controlled_apply_allowed' AS controlled_apply_allowed,
coalesce(output ->> 'returncode', dry_run_result ->> 'returncode') AS returncode,
duration_ms,
created_at
@@ -1098,6 +1450,39 @@ _RUNTIME_OPERATION_LATEST_SQL = """
"""
_RUNTIME_AUTO_REPAIR_COUNTS_SQL = """
SELECT
CASE WHEN success THEN 'success' ELSE 'failed' END AS result_status,
count(*) AS total,
count(*) FILTER (
WHERE created_at >= NOW() - (:lookback_hours * INTERVAL '1 hour')
) AS recent
FROM auto_repair_executions
WHERE triggered_by = 'ansible_controlled_apply'
GROUP BY CASE WHEN success THEN 'success' ELSE 'failed' END
ORDER BY result_status
"""
_RUNTIME_AUTO_REPAIR_LATEST_SQL = """
SELECT
id,
incident_id,
playbook_id AS catalog_id,
playbook_name,
CASE WHEN success THEN 'success' ELSE 'failed' END AS result_status,
executed_steps::text AS executed_steps_text,
triggered_by,
risk_level,
execution_time_ms,
created_at
FROM auto_repair_executions
WHERE triggered_by = 'ansible_controlled_apply'
ORDER BY created_at DESC
LIMIT :limit
"""
_RUNTIME_VERIFIER_COUNTS_SQL = """
SELECT
coalesce(verification_result, 'missing') AS verification_result,