From cedf97b0aa30218bf10d65a042475c0bea2ed4b9 Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 1 Jul 2026 07:50:02 +0800 Subject: [PATCH] feat(agent): expose queue readback normalizer contract --- ..._controlled_writeback_executor_readback.py | 36 +++++++++++++++++++ .../awoooi_priority_work_order_readback.py | 34 ++++++++++++++++-- ...trolled_writeback_executor_readback_api.py | 12 +++++++ ...awoooi_priority_work_order_readback_api.py | 22 ++++++++++++ 4 files changed, 102 insertions(+), 2 deletions(-) diff --git a/apps/api/src/services/ai_agent_log_controlled_writeback_executor_readback.py b/apps/api/src/services/ai_agent_log_controlled_writeback_executor_readback.py index 9041b525..4092f622 100644 --- a/apps/api/src/services/ai_agent_log_controlled_writeback_executor_readback.py +++ b/apps/api/src/services/ai_agent_log_controlled_writeback_executor_readback.py @@ -353,6 +353,10 @@ def _current_blocker_queue_item(recovery: dict[str, Any]) -> dict[str, Any]: "harbor_recovery_receipt_input_count": len( _harbor_recovery_receipt_inputs() ), + "queue_readback_normalizer_contract": _queue_readback_normalizer_contract(), + "queue_readback_normalizer_contract_count": len( + _queue_readback_normalizer_contract() + ), "controlled_local_console_execution_plan": _list_of_dicts( recovery.get("controlled_local_console_execution_plan") ), @@ -454,6 +458,38 @@ def _harbor_recovery_receipt_inputs() -> list[dict[str, Any]]: ] +def _queue_readback_normalizer_contract() -> list[dict[str, Any]]: + return [ + { + "field_id": "cd_run_jobs_payload_classifier", + "purpose": "classify stale or cross-run CD jobs API payloads", + "writes_blockers": [ + "gitea_queue_cd_jobs_head_sha_mismatch", + "gitea_queue_cd_jobs_run_id_mismatch", + "gitea_queue_cd_jobs_stale_or_mismatched", + ], + "learning_targets": ["km", "rag", "playbook", "mcp", "ai_agent"], + }, + { + "field_id": "harbor_110_repair_jobs_payload_classifier", + "purpose": "classify Harbor repair jobs API payload workflow drift", + "writes_blockers": [ + "gitea_queue_harbor_110_repair_jobs_cross_workflow_mismatch", + "gitea_queue_harbor_110_repair_jobs_stale_or_mismatched", + ], + "learning_targets": ["km", "rag", "playbook", "mcp", "ai_agent"], + }, + { + "field_id": "latest_visible_harbor_110_repair_no_matching_runner_label", + "purpose": "classify whether the awoooi-host controlled repair lane is unavailable", + "writes_blockers": [ + "gitea_queue_harbor_110_repair_no_matching_runner", + ], + "learning_targets": ["km", "rag", "playbook", "mcp", "verifier"], + }, + ] + + def _list_of_dicts(value: Any) -> list[dict[str, Any]]: if not isinstance(value, list): return [] diff --git a/apps/api/src/services/awoooi_priority_work_order_readback.py b/apps/api/src/services/awoooi_priority_work_order_readback.py index 8be595b0..68be2969 100644 --- a/apps/api/src/services/awoooi_priority_work_order_readback.py +++ b/apps/api/src/services/awoooi_priority_work_order_readback.py @@ -542,6 +542,15 @@ def apply_ai_loop_current_blocker_execution_queue( for item in harbor_recovery_receipt_inputs if item.get("input_id") ] + queue_readback_normalizer_contract = [ + _dict(item) + for item in _list(first_item.get("queue_readback_normalizer_contract")) + ] + queue_readback_normalizer_field_ids = [ + str(item.get("field_id") or "") + for item in queue_readback_normalizer_contract + if item.get("field_id") + ] forbidden_runtime_actions = _strings(first_item.get("forbidden_runtime_actions")) external_blocker = str(first_item.get("external_control_path_blocker") or "") pressure_blocker = str(first_item.get("control_path_pressure_blocker") or "") @@ -580,6 +589,12 @@ def apply_ai_loop_current_blocker_execution_queue( state["ai_loop_current_blocker_harbor_recovery_receipt_input_ids"] = ( harbor_recovery_receipt_input_ids ) + state["ai_loop_current_blocker_queue_readback_normalizer_contract_count"] = len( + queue_readback_normalizer_contract + ) + state["ai_loop_current_blocker_queue_readback_normalizer_field_ids"] = ( + queue_readback_normalizer_field_ids + ) state["ai_loop_current_blocker_forbidden_runtime_action_count"] = len( forbidden_runtime_actions ) @@ -650,6 +665,12 @@ def apply_ai_loop_current_blocker_execution_queue( evidence["ai_loop_current_blocker_harbor_recovery_receipt_inputs"] = ( harbor_recovery_receipt_inputs ) + evidence["ai_loop_current_blocker_queue_readback_normalizer_contract"] = ( + queue_readback_normalizer_contract + ) + evidence["ai_loop_current_blocker_queue_readback_normalizer_field_ids"] = ( + queue_readback_normalizer_field_ids + ) evidence["ai_loop_current_blocker_forbidden_runtime_actions"] = ( forbidden_runtime_actions ) @@ -671,13 +692,16 @@ def apply_ai_loop_current_blocker_execution_queue( "writeback targets. The next action is not generic manual triage; " "it is 110 control-path readback plus the bounded local recovery " "package, ordered local-console phases, and post-apply verifier." + " Queue readback classifiers are normalized into explicit AI " + "blockers before KM/RAG/MCP/PlayBook writeback." ) professional_fix = _dict(workplan.setdefault("professional_fix", {})) professional_fix["action"] = ( "Run the AI Loop current blocker package in order: " f"{', '.join(local_console_phase_ids)}, then rerun " "Harbor queue / registry /v2/ / deploy-marker readbacks and write " - "metadata-only KM/RAG/MCP/PlayBook receipts. Do not restart Docker " + "metadata-only KM/RAG/MCP/PlayBook receipts from normalized queue " + "classifier fields. Do not restart Docker " "daemon, reboot hosts, drain nodes, switch registry provider, " "trigger workflows, or read runner tokens from this lane." ) @@ -692,7 +716,7 @@ def apply_ai_loop_current_blocker_execution_queue( f"{blocker_id} queue item through 110 control-path readback, " f"{len(local_console_plan)} ordered local-console phases, post-recovery " "readback commands, Harbor receipt inputs, and metadata-only " - "KM/RAG/MCP/PlayBook writeback." + "KM/RAG/MCP/PlayBook writeback from normalized queue classifiers." ), ( "P0-006-HARBOR-REGISTRY-CONTROLLED-RECOVERY-PREFLIGHT: after the " @@ -735,6 +759,12 @@ def apply_ai_loop_current_blocker_execution_queue( summary["ai_loop_current_blocker_harbor_recovery_receipt_input_ids"] = ( harbor_recovery_receipt_input_ids ) + summary["ai_loop_current_blocker_queue_readback_normalizer_contract_count"] = len( + queue_readback_normalizer_contract + ) + summary["ai_loop_current_blocker_queue_readback_normalizer_field_ids"] = ( + queue_readback_normalizer_field_ids + ) summary["ai_loop_current_blocker_forbidden_runtime_action_count"] = len( forbidden_runtime_actions ) diff --git a/apps/api/tests/test_ai_agent_log_controlled_writeback_executor_readback_api.py b/apps/api/tests/test_ai_agent_log_controlled_writeback_executor_readback_api.py index 7b5c8935..eff49588 100644 --- a/apps/api/tests/test_ai_agent_log_controlled_writeback_executor_readback_api.py +++ b/apps/api/tests/test_ai_agent_log_controlled_writeback_executor_readback_api.py @@ -150,6 +150,18 @@ def _assert_executor_readback(payload: dict, *, public_endpoint: bool = False): assert current_queue[0]["harbor_recovery_receipt_inputs"][-1][ "expected_schema" ] == "awoooi_production_deploy_readback_blocker_v1" + assert current_queue[0]["queue_readback_normalizer_contract_count"] == 3 + assert [ + item["field_id"] + for item in current_queue[0]["queue_readback_normalizer_contract"] + ] == [ + "cd_run_jobs_payload_classifier", + "harbor_110_repair_jobs_payload_classifier", + "latest_visible_harbor_110_repair_no_matching_runner_label", + ] + assert "gitea_queue_cd_jobs_stale_or_mismatched" in current_queue[0][ + "queue_readback_normalizer_contract" + ][0]["writes_blockers"] assert all( item["metadata_only"] is True and item["raw_output_allowed"] is False diff --git a/apps/api/tests/test_awoooi_priority_work_order_readback_api.py b/apps/api/tests/test_awoooi_priority_work_order_readback_api.py index f0f56970..1141e034 100644 --- a/apps/api/tests/test_awoooi_priority_work_order_readback_api.py +++ b/apps/api/tests/test_awoooi_priority_work_order_readback_api.py @@ -343,6 +343,18 @@ def test_awoooi_priority_work_order_readback_overlays_ai_loop_current_blocker_qu assert evidence["ai_loop_current_blocker_harbor_recovery_receipt_inputs"][-1][ "expected_schema" ] == "awoooi_production_deploy_readback_blocker_v1" + assert evidence["ai_loop_current_blocker_queue_readback_normalizer_field_ids"] == [ + "cd_run_jobs_payload_classifier", + "harbor_110_repair_jobs_payload_classifier", + "latest_visible_harbor_110_repair_no_matching_runner_label", + ] + assert evidence["ai_loop_current_blocker_queue_readback_normalizer_contract"][0][ + "writes_blockers" + ] == [ + "gitea_queue_cd_jobs_head_sha_mismatch", + "gitea_queue_cd_jobs_run_id_mismatch", + "gitea_queue_cd_jobs_stale_or_mismatched", + ] assert all( item["metadata_only"] is True and item["raw_output_allowed"] is False @@ -380,6 +392,15 @@ def test_awoooi_priority_work_order_readback_overlays_ai_loop_current_blocker_qu assert payload["summary"][ "ai_loop_current_blocker_harbor_recovery_receipt_input_ids" ][-1] == "deploy_marker_readback" + assert ( + payload["summary"][ + "ai_loop_current_blocker_queue_readback_normalizer_contract_count" + ] + == 3 + ) + assert payload["summary"][ + "ai_loop_current_blocker_queue_readback_normalizer_field_ids" + ][0] == "cd_run_jobs_payload_classifier" assert payload["summary"][ "ai_loop_current_blocker_forbidden_runtime_action_count" ] == 6 @@ -388,6 +409,7 @@ def test_awoooi_priority_work_order_readback_overlays_ai_loop_current_blocker_qu ) assert "5 ordered local-console phases" in payload["next_execution_order"][0] assert "Harbor receipt inputs" in payload["next_execution_order"][0] + assert "normalized queue classifiers" in payload["next_execution_order"][0] def test_awoooi_priority_work_order_readback_overlays_live_stockplatform_drift():