fix(api): fallback log dispatch ledger operation
All checks were successful
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Successful in 23s
CD Pipeline / build-and-deploy (push) Successful in 8m52s
CD Pipeline / post-deploy-checks (push) Successful in 1m0s
All checks were successful
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Successful in 23s
CD Pipeline / build-and-deploy (push) Successful in 8m52s
CD Pipeline / post-deploy-checks (push) Successful in 1m0s
This commit is contained in:
@@ -3541,24 +3541,35 @@ async def build_ai_agent_autonomous_runtime_control_with_live_readback(
|
||||
|
||||
_RUNTIME_OPERATION_COUNTS_SQL = """
|
||||
SELECT
|
||||
operation_type,
|
||||
CASE
|
||||
WHEN operation_type = 'km_linked'
|
||||
AND input ->> 'semantic_operation_type' = 'log_controlled_writeback_dispatched'
|
||||
THEN 'log_controlled_writeback_dispatched'
|
||||
ELSE operation_type
|
||||
END AS operation_type,
|
||||
status,
|
||||
count(*) AS total,
|
||||
count(*) FILTER (
|
||||
WHERE created_at >= NOW() - (:lookback_hours * INTERVAL '1 hour')
|
||||
) AS recent
|
||||
FROM automation_operation_log
|
||||
WHERE operation_type IN (
|
||||
'ansible_candidate_matched',
|
||||
'ansible_check_mode_executed',
|
||||
'ansible_apply_executed',
|
||||
'ansible_learning_writeback_recorded',
|
||||
'ansible_rollback_executed',
|
||||
'ansible_execution_skipped',
|
||||
'log_controlled_writeback_dispatched'
|
||||
WHERE (
|
||||
operation_type IN (
|
||||
'ansible_candidate_matched',
|
||||
'ansible_check_mode_executed',
|
||||
'ansible_apply_executed',
|
||||
'ansible_learning_writeback_recorded',
|
||||
'ansible_rollback_executed',
|
||||
'ansible_execution_skipped',
|
||||
'log_controlled_writeback_dispatched'
|
||||
)
|
||||
OR (
|
||||
operation_type = 'km_linked'
|
||||
AND input ->> 'semantic_operation_type' = 'log_controlled_writeback_dispatched'
|
||||
)
|
||||
)
|
||||
GROUP BY operation_type, status
|
||||
ORDER BY operation_type, status
|
||||
GROUP BY 1, status
|
||||
ORDER BY 1, status
|
||||
"""
|
||||
|
||||
|
||||
@@ -3566,7 +3577,12 @@ _RUNTIME_OPERATION_LATEST_SQL = """
|
||||
SELECT
|
||||
op_id::text AS op_id,
|
||||
parent_op_id::text AS parent_op_id,
|
||||
operation_type,
|
||||
CASE
|
||||
WHEN operation_type = 'km_linked'
|
||||
AND input ->> 'semantic_operation_type' = 'log_controlled_writeback_dispatched'
|
||||
THEN 'log_controlled_writeback_dispatched'
|
||||
ELSE operation_type
|
||||
END AS operation_type,
|
||||
status,
|
||||
actor,
|
||||
coalesce(incident_id::text, input ->> 'incident_id') AS incident_id,
|
||||
@@ -3581,14 +3597,20 @@ _RUNTIME_OPERATION_LATEST_SQL = """
|
||||
duration_ms,
|
||||
created_at
|
||||
FROM automation_operation_log
|
||||
WHERE operation_type IN (
|
||||
'ansible_candidate_matched',
|
||||
'ansible_check_mode_executed',
|
||||
'ansible_apply_executed',
|
||||
'ansible_learning_writeback_recorded',
|
||||
'ansible_rollback_executed',
|
||||
'ansible_execution_skipped',
|
||||
'log_controlled_writeback_dispatched'
|
||||
WHERE (
|
||||
operation_type IN (
|
||||
'ansible_candidate_matched',
|
||||
'ansible_check_mode_executed',
|
||||
'ansible_apply_executed',
|
||||
'ansible_learning_writeback_recorded',
|
||||
'ansible_rollback_executed',
|
||||
'ansible_execution_skipped',
|
||||
'log_controlled_writeback_dispatched'
|
||||
)
|
||||
OR (
|
||||
operation_type = 'km_linked'
|
||||
AND input ->> 'semantic_operation_type' = 'log_controlled_writeback_dispatched'
|
||||
)
|
||||
)
|
||||
ORDER BY created_at DESC
|
||||
LIMIT :limit
|
||||
|
||||
@@ -20,6 +20,7 @@ from src.services.ai_agent_log_controlled_writeback_executor_readback import (
|
||||
|
||||
SCHEMA_VERSION = "ai_agent_log_controlled_writeback_dispatch_receipt_v1"
|
||||
OPERATION_TYPE = "log_controlled_writeback_dispatched"
|
||||
FALLBACK_LEDGER_OPERATION_TYPE = "km_linked"
|
||||
EXECUTOR_ROUTE = "ai_agent_metadata_writeback_executor"
|
||||
DEFAULT_PROJECT_ID = "awoooi"
|
||||
|
||||
@@ -37,14 +38,22 @@ async def dispatch_latest_ai_agent_log_controlled_writeback(
|
||||
|
||||
rows = []
|
||||
async with get_db_context(project_id) as db:
|
||||
ledger_operation_type = await _resolve_ledger_operation_type(db)
|
||||
for batch in receipt["dispatch_batches"]:
|
||||
inserted = await _insert_dispatch_row(db, project_id=project_id, batch=batch)
|
||||
inserted = await _insert_dispatch_row(
|
||||
db,
|
||||
project_id=project_id,
|
||||
batch=batch,
|
||||
ledger_operation_type=ledger_operation_type,
|
||||
)
|
||||
rows.append(inserted)
|
||||
|
||||
inserted_count = sum(1 for row in rows if row["created"] is True)
|
||||
existing_count = sum(1 for row in rows if row["created"] is False)
|
||||
receipt["dispatch_result"] = {
|
||||
"operation_type": OPERATION_TYPE,
|
||||
"ledger_operation_type": ledger_operation_type,
|
||||
"semantic_operation_type": OPERATION_TYPE,
|
||||
"inserted_count": inserted_count,
|
||||
"existing_count": existing_count,
|
||||
"ledger_row_count": len(rows),
|
||||
@@ -217,6 +226,7 @@ async def _insert_dispatch_row(
|
||||
*,
|
||||
project_id: str,
|
||||
batch: dict[str, Any],
|
||||
ledger_operation_type: str,
|
||||
) -> dict[str, Any]:
|
||||
result = await db.execute(
|
||||
text("""
|
||||
@@ -225,7 +235,7 @@ async def _insert_dispatch_row(
|
||||
input, output, dry_run_result, tags
|
||||
)
|
||||
SELECT
|
||||
:operation_type,
|
||||
:ledger_operation_type,
|
||||
:actor,
|
||||
'success',
|
||||
CAST(:input AS jsonb),
|
||||
@@ -235,19 +245,25 @@ async def _insert_dispatch_row(
|
||||
WHERE NOT EXISTS (
|
||||
SELECT 1
|
||||
FROM automation_operation_log existing
|
||||
WHERE existing.operation_type = :operation_type
|
||||
WHERE coalesce(
|
||||
existing.input ->> 'semantic_operation_type',
|
||||
existing.operation_type
|
||||
) = :operation_type
|
||||
AND existing.input ->> 'dispatch_receipt_id' = :dispatch_receipt_id
|
||||
)
|
||||
RETURNING op_id::text
|
||||
"""),
|
||||
{
|
||||
"operation_type": OPERATION_TYPE,
|
||||
"ledger_operation_type": ledger_operation_type,
|
||||
"actor": EXECUTOR_ROUTE,
|
||||
"dispatch_receipt_id": batch["dispatch_receipt_id"],
|
||||
"input": json.dumps(
|
||||
{
|
||||
"schema_version": SCHEMA_VERSION,
|
||||
"project_id": project_id,
|
||||
"semantic_operation_type": OPERATION_TYPE,
|
||||
"ledger_operation_type": ledger_operation_type,
|
||||
"dispatch_receipt_id": batch["dispatch_receipt_id"],
|
||||
"batch_id": batch["batch_id"],
|
||||
"target": batch["target"],
|
||||
@@ -262,6 +278,8 @@ async def _insert_dispatch_row(
|
||||
"output": json.dumps(
|
||||
{
|
||||
"executor_route": EXECUTOR_ROUTE,
|
||||
"semantic_operation_type": OPERATION_TYPE,
|
||||
"ledger_operation_type": ledger_operation_type,
|
||||
"ledger_receipt_recorded": True,
|
||||
"post_apply_verifier_refs": batch["post_apply_verifier_refs"],
|
||||
"next_action": "consume_metadata_receipt_in_km_rag_playbook_agent_context",
|
||||
@@ -305,7 +323,7 @@ async def _insert_dispatch_row(
|
||||
text("""
|
||||
SELECT op_id::text
|
||||
FROM automation_operation_log
|
||||
WHERE operation_type = :operation_type
|
||||
WHERE coalesce(input ->> 'semantic_operation_type', operation_type) = :operation_type
|
||||
AND input ->> 'dispatch_receipt_id' = :dispatch_receipt_id
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1
|
||||
@@ -323,6 +341,31 @@ async def _insert_dispatch_row(
|
||||
}
|
||||
|
||||
|
||||
async def _resolve_ledger_operation_type(db: Any) -> str:
|
||||
result = await db.execute(
|
||||
text("""
|
||||
SELECT CASE
|
||||
WHEN EXISTS (
|
||||
SELECT 1
|
||||
FROM pg_constraint
|
||||
WHERE conname = 'automation_operation_log_type_valid'
|
||||
AND pg_get_constraintdef(oid) LIKE '%' || :operation_type || '%'
|
||||
)
|
||||
THEN :operation_type
|
||||
ELSE :fallback_operation_type
|
||||
END
|
||||
"""),
|
||||
{
|
||||
"operation_type": OPERATION_TYPE,
|
||||
"fallback_operation_type": FALLBACK_LEDGER_OPERATION_TYPE,
|
||||
},
|
||||
)
|
||||
resolved = str(result.scalar() or "")
|
||||
if resolved == OPERATION_TYPE:
|
||||
return OPERATION_TYPE
|
||||
return FALLBACK_LEDGER_OPERATION_TYPE
|
||||
|
||||
|
||||
def _strings(value: Any) -> list[str]:
|
||||
if not isinstance(value, list):
|
||||
return []
|
||||
|
||||
@@ -55,6 +55,16 @@ def _assert_log_controlled_writeback_executor(payload: dict):
|
||||
|
||||
|
||||
def test_runtime_receipt_auxiliary_sql_keeps_source_family_counts_schema_safe():
|
||||
from src.services.ai_agent_autonomous_runtime_control import (
|
||||
_RUNTIME_OPERATION_COUNTS_SQL,
|
||||
_RUNTIME_OPERATION_LATEST_SQL,
|
||||
)
|
||||
|
||||
assert "semantic_operation_type" in _RUNTIME_OPERATION_COUNTS_SQL
|
||||
assert "km_linked" in _RUNTIME_OPERATION_COUNTS_SQL
|
||||
assert "log_controlled_writeback_dispatched" in _RUNTIME_OPERATION_COUNTS_SQL
|
||||
assert "semantic_operation_type" in _RUNTIME_OPERATION_LATEST_SQL
|
||||
assert "km_linked" in _RUNTIME_OPERATION_LATEST_SQL
|
||||
assert "GROUP BY coalesce(status, 'unknown')" in _RUNTIME_TIMELINE_COUNTS_SQL
|
||||
assert "FROM timeline_events" in _RUNTIME_TIMELINE_COUNTS_SQL
|
||||
assert "count(*) AS total" in _RUNTIME_PLAYBOOK_TRUST_COUNTS_FALLBACK_SQL
|
||||
|
||||
@@ -25,10 +25,13 @@ class _FakeDb:
|
||||
def __init__(self):
|
||||
self.params: list[dict] = []
|
||||
|
||||
async def execute(self, _statement, params: dict):
|
||||
async def execute(self, statement, params: dict):
|
||||
self.params.append(params)
|
||||
if "fallback_operation_type" in params:
|
||||
return _FakeResult("km_linked")
|
||||
if "input" in params:
|
||||
return _FakeResult(f"op-{params['dispatch_receipt_id'].rsplit('::', 1)[-1]}")
|
||||
assert "pg_get_constraintdef" not in str(statement)
|
||||
return _FakeResult("existing-op")
|
||||
|
||||
|
||||
@@ -95,12 +98,21 @@ async def test_log_controlled_writeback_dispatch_writes_idempotent_ledger_rows(m
|
||||
assert payload["rollups"]["dispatch_ledger_row_count"] == 6
|
||||
assert payload["rollups"]["inserted_dispatch_ledger_row_count"] == 6
|
||||
assert payload["operation_boundaries"]["executor_dispatch_performed"] is True
|
||||
assert len(fake_db.params) == 6
|
||||
write_params = [params for params in fake_db.params if "input" in params]
|
||||
assert len(write_params) == 6
|
||||
assert payload["dispatch_result"]["operation_type"] == "log_controlled_writeback_dispatched"
|
||||
assert payload["dispatch_result"]["ledger_operation_type"] == "km_linked"
|
||||
assert payload["dispatch_result"]["semantic_operation_type"] == (
|
||||
"log_controlled_writeback_dispatched"
|
||||
)
|
||||
|
||||
for params in fake_db.params:
|
||||
for params in write_params:
|
||||
assert params["operation_type"] == "log_controlled_writeback_dispatched"
|
||||
assert params["ledger_operation_type"] == "km_linked"
|
||||
assert params["actor"] == "ai_agent_metadata_writeback_executor"
|
||||
assert "dispatch_receipt_id" in params
|
||||
assert '"semantic_operation_type": "log_controlled_writeback_dispatched"' in params["input"]
|
||||
assert '"ledger_operation_type": "km_linked"' in params["input"]
|
||||
assert "raw_payload_included" in params["input"]
|
||||
assert "False" not in params["input"]
|
||||
assert "telegram_send_performed" in params["output"]
|
||||
|
||||
Reference in New Issue
Block a user