From 6d0d9641bdd1c781a4386f61feeecc46fd86e596 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 30 Jun 2026 01:09:48 +0800 Subject: [PATCH] fix(api): fallback log dispatch ledger operation --- .../ai_agent_autonomous_runtime_control.py | 62 +++++++++++++------ ...agent_log_controlled_writeback_dispatch.py | 51 +++++++++++++-- ...est_ai_agent_autonomous_runtime_control.py | 10 +++ ...t_log_controlled_writeback_dispatch_api.py | 18 +++++- 4 files changed, 114 insertions(+), 27 deletions(-) diff --git a/apps/api/src/services/ai_agent_autonomous_runtime_control.py b/apps/api/src/services/ai_agent_autonomous_runtime_control.py index 18676dbd..0ca66232 100644 --- a/apps/api/src/services/ai_agent_autonomous_runtime_control.py +++ b/apps/api/src/services/ai_agent_autonomous_runtime_control.py @@ -3541,24 +3541,35 @@ async def build_ai_agent_autonomous_runtime_control_with_live_readback( _RUNTIME_OPERATION_COUNTS_SQL = """ SELECT - operation_type, + CASE + WHEN operation_type = 'km_linked' + AND input ->> 'semantic_operation_type' = 'log_controlled_writeback_dispatched' + THEN 'log_controlled_writeback_dispatched' + ELSE operation_type + END AS operation_type, status, count(*) AS total, count(*) FILTER ( WHERE created_at >= NOW() - (:lookback_hours * INTERVAL '1 hour') ) AS recent FROM automation_operation_log - WHERE operation_type IN ( - 'ansible_candidate_matched', - 'ansible_check_mode_executed', - 'ansible_apply_executed', - 'ansible_learning_writeback_recorded', - 'ansible_rollback_executed', - 'ansible_execution_skipped', - 'log_controlled_writeback_dispatched' + WHERE ( + operation_type IN ( + 'ansible_candidate_matched', + 'ansible_check_mode_executed', + 'ansible_apply_executed', + 'ansible_learning_writeback_recorded', + 'ansible_rollback_executed', + 'ansible_execution_skipped', + 'log_controlled_writeback_dispatched' + ) + OR ( + operation_type = 'km_linked' + AND input ->> 'semantic_operation_type' = 'log_controlled_writeback_dispatched' + ) ) - GROUP BY operation_type, status - ORDER BY operation_type, status + GROUP BY 1, status + ORDER BY 1, status """ @@ -3566,7 +3577,12 @@ _RUNTIME_OPERATION_LATEST_SQL = """ SELECT op_id::text AS op_id, parent_op_id::text AS parent_op_id, - operation_type, + CASE + WHEN operation_type = 'km_linked' + AND input ->> 'semantic_operation_type' = 'log_controlled_writeback_dispatched' + THEN 'log_controlled_writeback_dispatched' + ELSE operation_type + END AS operation_type, status, actor, coalesce(incident_id::text, input ->> 'incident_id') AS incident_id, @@ -3581,14 +3597,20 @@ _RUNTIME_OPERATION_LATEST_SQL = """ duration_ms, created_at FROM automation_operation_log - WHERE operation_type IN ( - 'ansible_candidate_matched', - 'ansible_check_mode_executed', - 'ansible_apply_executed', - 'ansible_learning_writeback_recorded', - 'ansible_rollback_executed', - 'ansible_execution_skipped', - 'log_controlled_writeback_dispatched' + WHERE ( + operation_type IN ( + 'ansible_candidate_matched', + 'ansible_check_mode_executed', + 'ansible_apply_executed', + 'ansible_learning_writeback_recorded', + 'ansible_rollback_executed', + 'ansible_execution_skipped', + 'log_controlled_writeback_dispatched' + ) + OR ( + operation_type = 'km_linked' + AND input ->> 'semantic_operation_type' = 'log_controlled_writeback_dispatched' + ) ) ORDER BY created_at DESC LIMIT :limit diff --git a/apps/api/src/services/ai_agent_log_controlled_writeback_dispatch.py b/apps/api/src/services/ai_agent_log_controlled_writeback_dispatch.py index ca59a563..028cd638 100644 --- a/apps/api/src/services/ai_agent_log_controlled_writeback_dispatch.py +++ b/apps/api/src/services/ai_agent_log_controlled_writeback_dispatch.py @@ -20,6 +20,7 @@ from src.services.ai_agent_log_controlled_writeback_executor_readback import ( SCHEMA_VERSION = "ai_agent_log_controlled_writeback_dispatch_receipt_v1" OPERATION_TYPE = "log_controlled_writeback_dispatched" +FALLBACK_LEDGER_OPERATION_TYPE = "km_linked" EXECUTOR_ROUTE = "ai_agent_metadata_writeback_executor" DEFAULT_PROJECT_ID = "awoooi" @@ -37,14 +38,22 @@ async def dispatch_latest_ai_agent_log_controlled_writeback( rows = [] async with get_db_context(project_id) as db: + ledger_operation_type = await _resolve_ledger_operation_type(db) for batch in receipt["dispatch_batches"]: - inserted = await _insert_dispatch_row(db, project_id=project_id, batch=batch) + inserted = await _insert_dispatch_row( + db, + project_id=project_id, + batch=batch, + ledger_operation_type=ledger_operation_type, + ) rows.append(inserted) inserted_count = sum(1 for row in rows if row["created"] is True) existing_count = sum(1 for row in rows if row["created"] is False) receipt["dispatch_result"] = { "operation_type": OPERATION_TYPE, + "ledger_operation_type": ledger_operation_type, + "semantic_operation_type": OPERATION_TYPE, "inserted_count": inserted_count, "existing_count": existing_count, "ledger_row_count": len(rows), @@ -217,6 +226,7 @@ async def _insert_dispatch_row( *, project_id: str, batch: dict[str, Any], + ledger_operation_type: str, ) -> dict[str, Any]: result = await db.execute( text(""" @@ -225,7 +235,7 @@ async def _insert_dispatch_row( input, output, dry_run_result, tags ) SELECT - :operation_type, + :ledger_operation_type, :actor, 'success', CAST(:input AS jsonb), @@ -235,19 +245,25 @@ async def _insert_dispatch_row( WHERE NOT EXISTS ( SELECT 1 FROM automation_operation_log existing - WHERE existing.operation_type = :operation_type + WHERE coalesce( + existing.input ->> 'semantic_operation_type', + existing.operation_type + ) = :operation_type AND existing.input ->> 'dispatch_receipt_id' = :dispatch_receipt_id ) RETURNING op_id::text """), { "operation_type": OPERATION_TYPE, + "ledger_operation_type": ledger_operation_type, "actor": EXECUTOR_ROUTE, "dispatch_receipt_id": batch["dispatch_receipt_id"], "input": json.dumps( { "schema_version": SCHEMA_VERSION, "project_id": project_id, + "semantic_operation_type": OPERATION_TYPE, + "ledger_operation_type": ledger_operation_type, "dispatch_receipt_id": batch["dispatch_receipt_id"], "batch_id": batch["batch_id"], "target": batch["target"], @@ -262,6 +278,8 @@ async def _insert_dispatch_row( "output": json.dumps( { "executor_route": EXECUTOR_ROUTE, + "semantic_operation_type": OPERATION_TYPE, + "ledger_operation_type": ledger_operation_type, "ledger_receipt_recorded": True, "post_apply_verifier_refs": batch["post_apply_verifier_refs"], "next_action": "consume_metadata_receipt_in_km_rag_playbook_agent_context", @@ -305,7 +323,7 @@ async def _insert_dispatch_row( text(""" SELECT op_id::text FROM automation_operation_log - WHERE operation_type = :operation_type + WHERE coalesce(input ->> 'semantic_operation_type', operation_type) = :operation_type AND input ->> 'dispatch_receipt_id' = :dispatch_receipt_id ORDER BY created_at DESC LIMIT 1 @@ -323,6 +341,31 @@ async def _insert_dispatch_row( } +async def _resolve_ledger_operation_type(db: Any) -> str: + result = await db.execute( + text(""" + SELECT CASE + WHEN EXISTS ( + SELECT 1 + FROM pg_constraint + WHERE conname = 'automation_operation_log_type_valid' + AND pg_get_constraintdef(oid) LIKE '%' || :operation_type || '%' + ) + THEN :operation_type + ELSE :fallback_operation_type + END + """), + { + "operation_type": OPERATION_TYPE, + "fallback_operation_type": FALLBACK_LEDGER_OPERATION_TYPE, + }, + ) + resolved = str(result.scalar() or "") + if resolved == OPERATION_TYPE: + return OPERATION_TYPE + return FALLBACK_LEDGER_OPERATION_TYPE + + def _strings(value: Any) -> list[str]: if not isinstance(value, list): return [] diff --git a/apps/api/tests/test_ai_agent_autonomous_runtime_control.py b/apps/api/tests/test_ai_agent_autonomous_runtime_control.py index 0d5a0612..ed81b74f 100644 --- a/apps/api/tests/test_ai_agent_autonomous_runtime_control.py +++ b/apps/api/tests/test_ai_agent_autonomous_runtime_control.py @@ -55,6 +55,16 @@ def _assert_log_controlled_writeback_executor(payload: dict): def test_runtime_receipt_auxiliary_sql_keeps_source_family_counts_schema_safe(): + from src.services.ai_agent_autonomous_runtime_control import ( + _RUNTIME_OPERATION_COUNTS_SQL, + _RUNTIME_OPERATION_LATEST_SQL, + ) + + assert "semantic_operation_type" in _RUNTIME_OPERATION_COUNTS_SQL + assert "km_linked" in _RUNTIME_OPERATION_COUNTS_SQL + assert "log_controlled_writeback_dispatched" in _RUNTIME_OPERATION_COUNTS_SQL + assert "semantic_operation_type" in _RUNTIME_OPERATION_LATEST_SQL + assert "km_linked" in _RUNTIME_OPERATION_LATEST_SQL assert "GROUP BY coalesce(status, 'unknown')" in _RUNTIME_TIMELINE_COUNTS_SQL assert "FROM timeline_events" in _RUNTIME_TIMELINE_COUNTS_SQL assert "count(*) AS total" in _RUNTIME_PLAYBOOK_TRUST_COUNTS_FALLBACK_SQL diff --git a/apps/api/tests/test_ai_agent_log_controlled_writeback_dispatch_api.py b/apps/api/tests/test_ai_agent_log_controlled_writeback_dispatch_api.py index b092c5b4..f6d87b14 100644 --- a/apps/api/tests/test_ai_agent_log_controlled_writeback_dispatch_api.py +++ b/apps/api/tests/test_ai_agent_log_controlled_writeback_dispatch_api.py @@ -25,10 +25,13 @@ class _FakeDb: def __init__(self): self.params: list[dict] = [] - async def execute(self, _statement, params: dict): + async def execute(self, statement, params: dict): self.params.append(params) + if "fallback_operation_type" in params: + return _FakeResult("km_linked") if "input" in params: return _FakeResult(f"op-{params['dispatch_receipt_id'].rsplit('::', 1)[-1]}") + assert "pg_get_constraintdef" not in str(statement) return _FakeResult("existing-op") @@ -95,12 +98,21 @@ async def test_log_controlled_writeback_dispatch_writes_idempotent_ledger_rows(m assert payload["rollups"]["dispatch_ledger_row_count"] == 6 assert payload["rollups"]["inserted_dispatch_ledger_row_count"] == 6 assert payload["operation_boundaries"]["executor_dispatch_performed"] is True - assert len(fake_db.params) == 6 + write_params = [params for params in fake_db.params if "input" in params] + assert len(write_params) == 6 + assert payload["dispatch_result"]["operation_type"] == "log_controlled_writeback_dispatched" + assert payload["dispatch_result"]["ledger_operation_type"] == "km_linked" + assert payload["dispatch_result"]["semantic_operation_type"] == ( + "log_controlled_writeback_dispatched" + ) - for params in fake_db.params: + for params in write_params: assert params["operation_type"] == "log_controlled_writeback_dispatched" + assert params["ledger_operation_type"] == "km_linked" assert params["actor"] == "ai_agent_metadata_writeback_executor" assert "dispatch_receipt_id" in params + assert '"semantic_operation_type": "log_controlled_writeback_dispatched"' in params["input"] + assert '"ledger_operation_type": "km_linked"' in params["input"] assert "raw_payload_included" in params["input"] assert "False" not in params["input"] assert "telegram_send_performed" in params["output"]