fix(api): fallback log dispatch ledger operation
All checks were successful
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Successful in 23s
CD Pipeline / build-and-deploy (push) Successful in 8m52s
CD Pipeline / post-deploy-checks (push) Successful in 1m0s
All checks were successful
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Successful in 23s
CD Pipeline / build-and-deploy (push) Successful in 8m52s
CD Pipeline / post-deploy-checks (push) Successful in 1m0s
This commit is contained in:
@@ -3541,24 +3541,35 @@ async def build_ai_agent_autonomous_runtime_control_with_live_readback(
|
||||
|
||||
_RUNTIME_OPERATION_COUNTS_SQL = """
|
||||
SELECT
|
||||
operation_type,
|
||||
CASE
|
||||
WHEN operation_type = 'km_linked'
|
||||
AND input ->> 'semantic_operation_type' = 'log_controlled_writeback_dispatched'
|
||||
THEN 'log_controlled_writeback_dispatched'
|
||||
ELSE operation_type
|
||||
END AS operation_type,
|
||||
status,
|
||||
count(*) AS total,
|
||||
count(*) FILTER (
|
||||
WHERE created_at >= NOW() - (:lookback_hours * INTERVAL '1 hour')
|
||||
) AS recent
|
||||
FROM automation_operation_log
|
||||
WHERE operation_type IN (
|
||||
'ansible_candidate_matched',
|
||||
'ansible_check_mode_executed',
|
||||
'ansible_apply_executed',
|
||||
'ansible_learning_writeback_recorded',
|
||||
'ansible_rollback_executed',
|
||||
'ansible_execution_skipped',
|
||||
'log_controlled_writeback_dispatched'
|
||||
WHERE (
|
||||
operation_type IN (
|
||||
'ansible_candidate_matched',
|
||||
'ansible_check_mode_executed',
|
||||
'ansible_apply_executed',
|
||||
'ansible_learning_writeback_recorded',
|
||||
'ansible_rollback_executed',
|
||||
'ansible_execution_skipped',
|
||||
'log_controlled_writeback_dispatched'
|
||||
)
|
||||
OR (
|
||||
operation_type = 'km_linked'
|
||||
AND input ->> 'semantic_operation_type' = 'log_controlled_writeback_dispatched'
|
||||
)
|
||||
)
|
||||
GROUP BY operation_type, status
|
||||
ORDER BY operation_type, status
|
||||
GROUP BY 1, status
|
||||
ORDER BY 1, status
|
||||
"""
|
||||
|
||||
|
||||
@@ -3566,7 +3577,12 @@ _RUNTIME_OPERATION_LATEST_SQL = """
|
||||
SELECT
|
||||
op_id::text AS op_id,
|
||||
parent_op_id::text AS parent_op_id,
|
||||
operation_type,
|
||||
CASE
|
||||
WHEN operation_type = 'km_linked'
|
||||
AND input ->> 'semantic_operation_type' = 'log_controlled_writeback_dispatched'
|
||||
THEN 'log_controlled_writeback_dispatched'
|
||||
ELSE operation_type
|
||||
END AS operation_type,
|
||||
status,
|
||||
actor,
|
||||
coalesce(incident_id::text, input ->> 'incident_id') AS incident_id,
|
||||
@@ -3581,14 +3597,20 @@ _RUNTIME_OPERATION_LATEST_SQL = """
|
||||
duration_ms,
|
||||
created_at
|
||||
FROM automation_operation_log
|
||||
WHERE operation_type IN (
|
||||
'ansible_candidate_matched',
|
||||
'ansible_check_mode_executed',
|
||||
'ansible_apply_executed',
|
||||
'ansible_learning_writeback_recorded',
|
||||
'ansible_rollback_executed',
|
||||
'ansible_execution_skipped',
|
||||
'log_controlled_writeback_dispatched'
|
||||
WHERE (
|
||||
operation_type IN (
|
||||
'ansible_candidate_matched',
|
||||
'ansible_check_mode_executed',
|
||||
'ansible_apply_executed',
|
||||
'ansible_learning_writeback_recorded',
|
||||
'ansible_rollback_executed',
|
||||
'ansible_execution_skipped',
|
||||
'log_controlled_writeback_dispatched'
|
||||
)
|
||||
OR (
|
||||
operation_type = 'km_linked'
|
||||
AND input ->> 'semantic_operation_type' = 'log_controlled_writeback_dispatched'
|
||||
)
|
||||
)
|
||||
ORDER BY created_at DESC
|
||||
LIMIT :limit
|
||||
|
||||
@@ -20,6 +20,7 @@ from src.services.ai_agent_log_controlled_writeback_executor_readback import (
|
||||
|
||||
SCHEMA_VERSION = "ai_agent_log_controlled_writeback_dispatch_receipt_v1"
|
||||
OPERATION_TYPE = "log_controlled_writeback_dispatched"
|
||||
FALLBACK_LEDGER_OPERATION_TYPE = "km_linked"
|
||||
EXECUTOR_ROUTE = "ai_agent_metadata_writeback_executor"
|
||||
DEFAULT_PROJECT_ID = "awoooi"
|
||||
|
||||
@@ -37,14 +38,22 @@ async def dispatch_latest_ai_agent_log_controlled_writeback(
|
||||
|
||||
rows = []
|
||||
async with get_db_context(project_id) as db:
|
||||
ledger_operation_type = await _resolve_ledger_operation_type(db)
|
||||
for batch in receipt["dispatch_batches"]:
|
||||
inserted = await _insert_dispatch_row(db, project_id=project_id, batch=batch)
|
||||
inserted = await _insert_dispatch_row(
|
||||
db,
|
||||
project_id=project_id,
|
||||
batch=batch,
|
||||
ledger_operation_type=ledger_operation_type,
|
||||
)
|
||||
rows.append(inserted)
|
||||
|
||||
inserted_count = sum(1 for row in rows if row["created"] is True)
|
||||
existing_count = sum(1 for row in rows if row["created"] is False)
|
||||
receipt["dispatch_result"] = {
|
||||
"operation_type": OPERATION_TYPE,
|
||||
"ledger_operation_type": ledger_operation_type,
|
||||
"semantic_operation_type": OPERATION_TYPE,
|
||||
"inserted_count": inserted_count,
|
||||
"existing_count": existing_count,
|
||||
"ledger_row_count": len(rows),
|
||||
@@ -217,6 +226,7 @@ async def _insert_dispatch_row(
|
||||
*,
|
||||
project_id: str,
|
||||
batch: dict[str, Any],
|
||||
ledger_operation_type: str,
|
||||
) -> dict[str, Any]:
|
||||
result = await db.execute(
|
||||
text("""
|
||||
@@ -225,7 +235,7 @@ async def _insert_dispatch_row(
|
||||
input, output, dry_run_result, tags
|
||||
)
|
||||
SELECT
|
||||
:operation_type,
|
||||
:ledger_operation_type,
|
||||
:actor,
|
||||
'success',
|
||||
CAST(:input AS jsonb),
|
||||
@@ -235,19 +245,25 @@ async def _insert_dispatch_row(
|
||||
WHERE NOT EXISTS (
|
||||
SELECT 1
|
||||
FROM automation_operation_log existing
|
||||
WHERE existing.operation_type = :operation_type
|
||||
WHERE coalesce(
|
||||
existing.input ->> 'semantic_operation_type',
|
||||
existing.operation_type
|
||||
) = :operation_type
|
||||
AND existing.input ->> 'dispatch_receipt_id' = :dispatch_receipt_id
|
||||
)
|
||||
RETURNING op_id::text
|
||||
"""),
|
||||
{
|
||||
"operation_type": OPERATION_TYPE,
|
||||
"ledger_operation_type": ledger_operation_type,
|
||||
"actor": EXECUTOR_ROUTE,
|
||||
"dispatch_receipt_id": batch["dispatch_receipt_id"],
|
||||
"input": json.dumps(
|
||||
{
|
||||
"schema_version": SCHEMA_VERSION,
|
||||
"project_id": project_id,
|
||||
"semantic_operation_type": OPERATION_TYPE,
|
||||
"ledger_operation_type": ledger_operation_type,
|
||||
"dispatch_receipt_id": batch["dispatch_receipt_id"],
|
||||
"batch_id": batch["batch_id"],
|
||||
"target": batch["target"],
|
||||
@@ -262,6 +278,8 @@ async def _insert_dispatch_row(
|
||||
"output": json.dumps(
|
||||
{
|
||||
"executor_route": EXECUTOR_ROUTE,
|
||||
"semantic_operation_type": OPERATION_TYPE,
|
||||
"ledger_operation_type": ledger_operation_type,
|
||||
"ledger_receipt_recorded": True,
|
||||
"post_apply_verifier_refs": batch["post_apply_verifier_refs"],
|
||||
"next_action": "consume_metadata_receipt_in_km_rag_playbook_agent_context",
|
||||
@@ -305,7 +323,7 @@ async def _insert_dispatch_row(
|
||||
text("""
|
||||
SELECT op_id::text
|
||||
FROM automation_operation_log
|
||||
WHERE operation_type = :operation_type
|
||||
WHERE coalesce(input ->> 'semantic_operation_type', operation_type) = :operation_type
|
||||
AND input ->> 'dispatch_receipt_id' = :dispatch_receipt_id
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1
|
||||
@@ -323,6 +341,31 @@ async def _insert_dispatch_row(
|
||||
}
|
||||
|
||||
|
||||
async def _resolve_ledger_operation_type(db: Any) -> str:
|
||||
result = await db.execute(
|
||||
text("""
|
||||
SELECT CASE
|
||||
WHEN EXISTS (
|
||||
SELECT 1
|
||||
FROM pg_constraint
|
||||
WHERE conname = 'automation_operation_log_type_valid'
|
||||
AND pg_get_constraintdef(oid) LIKE '%' || :operation_type || '%'
|
||||
)
|
||||
THEN :operation_type
|
||||
ELSE :fallback_operation_type
|
||||
END
|
||||
"""),
|
||||
{
|
||||
"operation_type": OPERATION_TYPE,
|
||||
"fallback_operation_type": FALLBACK_LEDGER_OPERATION_TYPE,
|
||||
},
|
||||
)
|
||||
resolved = str(result.scalar() or "")
|
||||
if resolved == OPERATION_TYPE:
|
||||
return OPERATION_TYPE
|
||||
return FALLBACK_LEDGER_OPERATION_TYPE
|
||||
|
||||
|
||||
def _strings(value: Any) -> list[str]:
|
||||
if not isinstance(value, list):
|
||||
return []
|
||||
|
||||
Reference in New Issue
Block a user