"""Current AI Agent autonomous runtime control plane. This read model is the current directive layer. Historical P2 snapshots can still describe earlier no-send / no-live states, but this payload states what the product should enforce now: low, medium, and high risk routes may proceed through controlled automation when allowlist, check-mode, verifier, rollback, KM, and Telegram receipts are present. """ from __future__ import annotations from collections.abc import Iterable, Mapping from datetime import datetime, timezone from typing import Any from src.core.config import settings from src.core.logging import get_logger from sqlalchemy import text from src.db.base import get_db_context from src.services.report_generation_service import ( DAILY_REPORT_HOUR_TAIPEI, MONTHLY_REPORT_DAY_TAIPEI, MONTHLY_REPORT_HOUR_TAIPEI, WEEKLY_REPORT_HOUR_TAIPEI, WEEKLY_REPORT_WEEKDAY_TAIPEI, ) _SCHEMA_VERSION = "ai_agent_autonomous_runtime_control_v1" _RUNTIME_AUTHORITY = "current_owner_directive_controlled_ai_automation" _DEPLOY_READBACK_MARKER = "p2_416_d1n_autonomous_runtime_control_prod_readback_v2" _DEPLOY_ATTEMPT_NOTE = "cd_3673_retry_after_host_pressure_gate_fix" _LIVE_READBACK_SCHEMA_VERSION = "ai_agent_autonomous_runtime_receipt_readback_v1" _DEFAULT_PROJECT_ID = "awoooi" _DEFAULT_LOOKBACK_HOURS = 24 # CD cancel-stale-cd triggers are workflow-only and must not change payloads. _EXECUTOR_OPERATION_TYPES = ( "ansible_candidate_matched", "ansible_check_mode_executed", "ansible_apply_executed", "ansible_rollback_executed", "ansible_execution_skipped", ) logger = get_logger(__name__) def _allowed_risk_levels() -> list[str]: raw = str(settings.AWOOOP_ANSIBLE_CONTROLLED_APPLY_ALLOWED_RISK_LEVELS or "") return sorted({item.strip().lower() for item in raw.split(",") if item.strip()}) def _utc_iso(value: Any) -> str | None: if value is None: return None if isinstance(value, datetime): if value.tzinfo is None: value = value.replace(tzinfo=timezone.utc) return value.astimezone(timezone.utc).isoformat() return str(value) def _row_mapping(row: Mapping[str, Any] | Any) -> dict[str, Any]: if isinstance(row, Mapping): return dict(row) mapping = getattr(row, "_mapping", None) if mapping is not None: return dict(mapping) return dict(row) def _int_value(value: Any) -> int: try: return int(value or 0) except (TypeError, ValueError): return 0 def _sanitize_latest_rows( rows: Iterable[Mapping[str, Any] | Any], *, allowed_keys: tuple[str, ...], time_keys: tuple[str, ...] = ("created_at", "collected_at", "queued_at", "sent_at"), limit: int = 5, ) -> list[dict[str, Any]]: clean_rows: list[dict[str, Any]] = [] for row in rows: item = _row_mapping(row) clean: dict[str, Any] = {} for key in allowed_keys: if key not in item: continue value = item.get(key) clean[key] = _utc_iso(value) if key in time_keys else value clean_rows.append(clean) if len(clean_rows) >= limit: break return clean_rows def _operation_counts( rows: Iterable[Mapping[str, Any] | Any], ) -> dict[str, dict[str, Any]]: counts = { operation_type: { "total": 0, "recent": 0, "by_status": {}, } for operation_type in _EXECUTOR_OPERATION_TYPES } for row in rows: item = _row_mapping(row) operation_type = str(item.get("operation_type") or "unknown") status = str(item.get("status") or "unknown") bucket = counts.setdefault( operation_type, { "total": 0, "recent": 0, "by_status": {}, }, ) total = _int_value(item.get("total")) recent = _int_value(item.get("recent")) bucket["total"] += total bucket["recent"] += recent bucket["by_status"][status] = bucket["by_status"].get(status, 0) + total return counts def _status_counts( rows: Iterable[Mapping[str, Any] | Any], *, status_key: str, ) -> dict[str, Any]: by_status: dict[str, int] = {} total = 0 recent = 0 for row in rows: item = _row_mapping(row) status = str(item.get(status_key) or "unknown") row_total = _int_value(item.get("total")) by_status[status] = by_status.get(status, 0) + row_total total += row_total recent += _int_value(item.get("recent")) return { "total": total, "recent": recent, "by_status": by_status, } def _latest_flow_closure( *, operation_latest_rows: Iterable[Mapping[str, Any] | Any], verifier_latest_rows: Iterable[Mapping[str, Any] | Any], km_latest_rows: Iterable[Mapping[str, Any] | Any], telegram_latest_rows: Iterable[Mapping[str, Any] | Any], ) -> dict[str, Any]: operation_rows = [_row_mapping(row) for row in operation_latest_rows] verifier_rows = [_row_mapping(row) for row in verifier_latest_rows] km_rows = [_row_mapping(row) for row in km_latest_rows] telegram_rows = [_row_mapping(row) for row in telegram_latest_rows] latest_apply = next( ( row for row in operation_rows if str(row.get("operation_type") or "") == "ansible_apply_executed" ), None, ) if latest_apply is None: return { "apply_op_id": None, "incident_id": None, "has_post_apply_verifier": False, "has_km_writeback": False, "has_telegram_receipt": False, "closed": False, "missing": [ "ansible_apply_executed", "post_apply_verifier", "km_writeback", "telegram_receipt", ], } apply_op_id = str(latest_apply.get("op_id") or "") incident_id = str(latest_apply.get("incident_id") or "") km_path_type = f"ansible_apply_receipt:{apply_op_id[:8]}" if apply_op_id else "" has_verifier = any( str(row.get("apply_op_id") or "") == apply_op_id for row in verifier_rows ) has_km = any( str(row.get("path_type") or "") == km_path_type or ( incident_id and str(row.get("related_incident_id") or "") == incident_id ) for row in km_rows ) has_telegram = any( str(row.get("send_status") or "") == "sent" and str(row.get("action") or "") == "controlled_apply_result" and ( not incident_id or str(row.get("incident_id") or "") == incident_id ) for row in telegram_rows ) missing = [ name for name, present in ( ("post_apply_verifier", has_verifier), ("km_writeback", has_km), ("telegram_receipt", has_telegram), ) if not present ] return { "apply_op_id": apply_op_id or None, "incident_id": incident_id or None, "has_post_apply_verifier": has_verifier, "has_km_writeback": has_km, "has_telegram_receipt": has_telegram, "closed": not missing, "missing": missing, } def build_runtime_receipt_readback_from_rows( *, project_id: str = _DEFAULT_PROJECT_ID, lookback_hours: int = _DEFAULT_LOOKBACK_HOURS, db_read_status: str = "ok", operation_count_rows: Iterable[Mapping[str, Any] | Any] = (), operation_latest_rows: Iterable[Mapping[str, Any] | Any] = (), verifier_count_rows: Iterable[Mapping[str, Any] | Any] = (), verifier_latest_rows: Iterable[Mapping[str, Any] | Any] = (), km_count_rows: Iterable[Mapping[str, Any] | Any] = (), km_latest_rows: Iterable[Mapping[str, Any] | Any] = (), telegram_count_rows: Iterable[Mapping[str, Any] | Any] = (), telegram_latest_rows: Iterable[Mapping[str, Any] | Any] = (), error_type: str | None = None, ) -> dict[str, Any]: """Build the live executor receipt readback from already-fetched rows.""" operation_latest = list(operation_latest_rows) verifier_latest = list(verifier_latest_rows) km_latest = list(km_latest_rows) telegram_latest = list(telegram_latest_rows) operation_summary = _operation_counts(operation_count_rows) verifier_summary = _status_counts( verifier_count_rows, status_key="verification_result", ) km_summary = _status_counts(km_count_rows, status_key="status") telegram_summary = _status_counts(telegram_count_rows, status_key="send_status") latest_closure = _latest_flow_closure( operation_latest_rows=operation_latest, verifier_latest_rows=verifier_latest, km_latest_rows=km_latest, telegram_latest_rows=telegram_latest, ) apply_summary = operation_summary.get("ansible_apply_executed") or {} readback = { "schema_version": _LIVE_READBACK_SCHEMA_VERSION, "project_id": project_id, "lookback_hours": max(1, int(lookback_hours or _DEFAULT_LOOKBACK_HOURS)), "db_read_status": db_read_status, "writes_on_read": False, "ansible_operations": { "counts": operation_summary, "latest": _sanitize_latest_rows( operation_latest, allowed_keys=( "op_id", "parent_op_id", "operation_type", "status", "actor", "incident_id", "catalog_id", "playbook_path", "execution_mode", "returncode", "duration_ms", "created_at", ), ), }, "ansible_apply_executed": { "total": _int_value(apply_summary.get("total")), "recent": _int_value(apply_summary.get("recent")), "by_status": apply_summary.get("by_status") or {}, }, "post_apply_verifier": { **verifier_summary, "latest": _sanitize_latest_rows( verifier_latest, allowed_keys=( "id", "incident_id", "matched_playbook_id", "verification_result", "apply_op_id", "catalog_id", "playbook_path", "returncode", "collected_at", ), ), }, "km_writeback": { **km_summary, "latest": _sanitize_latest_rows( km_latest, allowed_keys=( "id", "title", "related_incident_id", "related_playbook_id", "path_type", "status", "created_by", "created_at", ), ), }, "telegram_receipt": { **telegram_summary, "latest": _sanitize_latest_rows( telegram_latest, allowed_keys=( "message_id", "run_id", "message_type", "send_status", "provider_message_id", "incident_id", "action", "queued_at", "sent_at", ), ), }, "latest_flow_closure": latest_closure, } if error_type: readback["error"] = { "type": error_type, "message": "runtime receipt DB read failed; see API logs", } return readback def _attach_runtime_receipt_readback( payload: dict[str, Any], readback: dict[str, Any], ) -> dict[str, Any]: payload["runtime_receipt_readback"] = readback rollups = payload.setdefault("rollups", {}) rollups.update({ "live_ansible_apply_executed_count": _int_value( readback.get("ansible_apply_executed", {}).get("total") ), "live_post_apply_verifier_count": _int_value( readback.get("post_apply_verifier", {}).get("total") ), "live_km_writeback_count": _int_value( readback.get("km_writeback", {}).get("total") ), "live_telegram_receipt_count": _int_value( readback.get("telegram_receipt", {}).get("total") ), "live_executor_latest_flow_closed_count": ( 1 if (readback.get("latest_flow_closure") or {}).get("closed") is True else 0 ), }) return payload def build_ai_agent_autonomous_runtime_control() -> dict[str, Any]: """Build the current AI Agent autonomy control-plane readback.""" allowed_risks = _allowed_risk_levels() report_cadences = [ { "cadence": "daily", "display_name": "日報", "schedule": f"每日 {DAILY_REPORT_HOUR_TAIPEI:02d}:00 台北時間", "worker": "report_generation_service.run_daily_report_loop", "telegram_gateway_delivery_enabled": True, "direct_bot_api_allowed": False, "receipt_source": "daily_report_sent log + Telegram Gateway result", }, { "cadence": "weekly", "display_name": "週報", "schedule": ( f"每週五 {WEEKLY_REPORT_HOUR_TAIPEI:02d}:00 台北時間" if WEEKLY_REPORT_WEEKDAY_TAIPEI == 4 else f"每週 weekday={WEEKLY_REPORT_WEEKDAY_TAIPEI} {WEEKLY_REPORT_HOUR_TAIPEI:02d}:00 台北時間" ), "worker": "report_generation_service.run_weekly_report_loop", "telegram_gateway_delivery_enabled": True, "direct_bot_api_allowed": False, "receipt_source": "weekly_report_sent log + Telegram Gateway result", }, { "cadence": "monthly", "display_name": "月報", "schedule": f"每月 {MONTHLY_REPORT_DAY_TAIPEI} 日 {MONTHLY_REPORT_HOUR_TAIPEI:02d}:00 台北時間", "worker": "report_generation_service.run_monthly_report_loop", "telegram_gateway_delivery_enabled": True, "direct_bot_api_allowed": False, "receipt_source": "monthly_report_sent log + Telegram Gateway result", }, ] executor_receipts = [ { "operation_type": "ansible_candidate_matched", "owner_agent": "Hermes", "purpose": "把修復候選寫入 executor 可認領佇列", "writes_runtime_state": False, }, { "operation_type": "ansible_check_mode_executed", "owner_agent": "AwoooP Ansible check-mode worker", "purpose": "執行 ansible-playbook --check --diff 並留下乾跑收據", "writes_runtime_state": False, }, { "operation_type": "ansible_apply_executed", "owner_agent": "AwoooP controlled apply worker", "purpose": "check-mode 通過後,對 allowlisted low / medium / high PlayBook 受控 apply", "writes_runtime_state": True, }, { "operation_type": "incident_evidence.post_execution_state", "owner_agent": "post_apply_verifier", "purpose": "apply 後寫入 verifier 結果與 post-execution evidence", "writes_runtime_state": True, }, { "operation_type": "knowledge_entries", "owner_agent": "Hermes", "purpose": "把已驗證執行沉澱成 KM / PlayBook trust 候選", "writes_runtime_state": True, }, ] hard_blockers = [ "secret_token_private_key_cookie_session_auth_header_cleartext", "drop_truncate_restore_prune_destructive_database_operation", "reboot_node_drain_irreversible_firewall_or_host_lockout", "credentialed_exploit_or_external_active_scan", "new_paid_provider_cost_ceiling_or_provider_switch_without_replay_shadow_canary", "force_push_delete_repo_refs_or_visibility_change", "critical_or_break_glass_route_without_explicit_break_glass_contract", ] legacy_overrides = [ { "legacy_area": "report_status_board_no_live_send", "current_effect": "overridden", "new_behavior": "日報 / 週報 / 月報透過 Telegram Gateway 排程派送", }, { "legacy_area": "report_live_delivery_owner_review_required", "current_effect": "overridden", "new_behavior": "報告派送走低/中/高風險自動化政策;critical 才 break-glass", }, { "legacy_area": "high_risk_owner_review_queue", "current_effect": "overridden_for_high_non_critical", "new_behavior": "high 風險允許 controlled apply;critical / hard blocker 仍不自動", }, { "legacy_area": "telegram_no_send_preview_only", "current_effect": "overridden", "new_behavior": "用 Telegram Gateway 實送報告與 actionable receipt;不直接暴露 Bot API", }, ] payload = { "schema_version": _SCHEMA_VERSION, "generated_at": datetime.now(timezone.utc).isoformat(), "program_status": { "current_task_id": "P2-416-D1N", "status": "current_directive_control_plane_active", "runtime_authority": _RUNTIME_AUTHORITY, "deploy_readback_marker": _DEPLOY_READBACK_MARKER, "deploy_attempt_note": _DEPLOY_ATTEMPT_NOTE, "legacy_no_send_no_live_rules_overridden": True, "implementation_completion_percent": 88, "status_note": ( "目前有效規則:low / medium / high 風險由 AI Agent 在 allowlist、" "Ansible check-mode、verifier、rollback、KM 與 Telegram receipt 下受控自動處理。" ), }, "current_policy": { "low_risk_controlled_apply_allowed": "low" in allowed_risks, "medium_risk_controlled_apply_allowed": "medium" in allowed_risks, "high_risk_controlled_apply_allowed": "high" in allowed_risks, "critical_break_glass_required": True, "owner_review_required_for_low_medium_high": False, "direct_bot_api_allowed": False, "telegram_gateway_required": True, "post_apply_verifier_required": True, "km_learning_writeback_required": True, }, "runtime_switches": { "ansible_check_mode_worker_enabled": bool(settings.ENABLE_AWOOOP_ANSIBLE_CHECK_MODE_WORKER), "ansible_controlled_apply_enabled": bool(settings.ENABLE_AWOOOP_ANSIBLE_CONTROLLED_APPLY), "ansible_controlled_apply_allowed_risk_levels": allowed_risks, "ansible_check_mode_interval_seconds": settings.AWOOOP_ANSIBLE_CHECK_MODE_INTERVAL_SECONDS, "ansible_check_mode_batch_limit": settings.AWOOOP_ANSIBLE_CHECK_MODE_BATCH_LIMIT, "ansible_check_mode_timeout_seconds": settings.AWOOOP_ANSIBLE_CHECK_MODE_TIMEOUT_SECONDS, "ansible_controlled_apply_timeout_seconds": settings.AWOOOP_ANSIBLE_CONTROLLED_APPLY_TIMEOUT_SECONDS, }, "agent_roles": [ { "agent_id": "openclaw", "role": "仲裁 / hard blocker / replay-shadow-canary gate", "current_job": "只阻擋真正 critical 與 hard blocker,不再用身份保護舊架構", }, { "agent_id": "hermes", "role": "報告 / Telegram digest / KM 與 PlayBook trust writeback", "current_job": "日週月報、收據摘要與 verifier 後學習沉澱", }, { "agent_id": "nemotron", "role": "市場技術雷達 / no-write replay / challenger scorecard", "current_job": "用市場與回放數據挑戰 OpenClaw / provider / Agent 組合", }, { "agent_id": "awooop_ansible_worker", "role": "executor", "current_job": "candidate → check-mode → controlled apply → verifier → KM", }, { "agent_id": "telegram_ops", "role": "Telegram Gateway receipt", "current_job": "群組報告、actionable receipt、失敗告警;不展示敏感值或未脫敏資料", }, ], "report_delivery": { "status": "telegram_gateway_delivery_enabled", "cadences": report_cadences, }, "controlled_executor": { "status": "check_mode_then_apply_enabled" if settings.ENABLE_AWOOOP_ANSIBLE_CONTROLLED_APPLY else "check_mode_only_by_config", "operation_receipts": executor_receipts, "required_flow": [ "allowlisted_candidate", "ansible_check_mode_success", "controlled_apply", "post_apply_verifier", "auto_repair_execution_receipt", "km_learning_writeback", "telegram_receipt_or_alert", ], }, "legacy_policy_overrides": legacy_overrides, "hard_blockers": hard_blockers, "visibility_contract": { "frontend_displays_runtime_truth": True, "work_window_transcript_display_allowed": False, "prompt_body_display_allowed": False, "internal_reasoning_display_allowed": False, "sensitive_value_display_allowed": False, "telegram_unredacted_payload_display_allowed": False, "lan_topology_redaction_required": True, }, "rollups": { "automated_risk_tier_count": sum(1 for risk in ("low", "medium", "high") if risk in allowed_risks), "hard_blocker_count": len(hard_blockers), "report_cadence_enabled_count": len(report_cadences), "telegram_gateway_delivery_enabled_count": sum( 1 for item in report_cadences if item["telegram_gateway_delivery_enabled"] ), "direct_bot_api_allowed_count": 0, "controlled_executor_operation_receipt_count": len(executor_receipts), "runtime_write_receipt_type_count": sum( 1 for item in executor_receipts if item["writes_runtime_state"] ), "legacy_policy_overridden_count": len(legacy_overrides), }, } _attach_runtime_receipt_readback( payload, build_runtime_receipt_readback_from_rows( project_id=_DEFAULT_PROJECT_ID, db_read_status="not_queried", ), ) _validate_payload(payload) return payload async def load_ai_agent_autonomous_runtime_receipt_readback( *, project_id: str = _DEFAULT_PROJECT_ID, lookback_hours: int = _DEFAULT_LOOKBACK_HOURS, limit: int = 20, ) -> dict[str, Any]: """Read live executor receipts without sending messages or mutating runtime state.""" params = { "project_id": project_id, "lookback_hours": max(1, int(lookback_hours or _DEFAULT_LOOKBACK_HOURS)), "limit": max(1, int(limit or 20)), } try: async with get_db_context(project_id) as db: await db.execute(text("SET LOCAL statement_timeout = '5000ms'")) operation_counts = ( await db.execute(text(_RUNTIME_OPERATION_COUNTS_SQL), params) ).mappings().all() operation_latest = ( await db.execute(text(_RUNTIME_OPERATION_LATEST_SQL), params) ).mappings().all() verifier_counts = ( await db.execute(text(_RUNTIME_VERIFIER_COUNTS_SQL), params) ).mappings().all() verifier_latest = ( await db.execute(text(_RUNTIME_VERIFIER_LATEST_SQL), params) ).mappings().all() km_counts = ( await db.execute(text(_RUNTIME_KM_COUNTS_SQL), params) ).mappings().all() km_latest = ( await db.execute(text(_RUNTIME_KM_LATEST_SQL), params) ).mappings().all() telegram_counts = ( await db.execute(text(_RUNTIME_TELEGRAM_COUNTS_SQL), params) ).mappings().all() telegram_latest = ( await db.execute(text(_RUNTIME_TELEGRAM_LATEST_SQL), params) ).mappings().all() except Exception as exc: logger.warning( "ai_agent_autonomous_runtime_receipt_readback_failed", project_id=project_id, error_type=type(exc).__name__, ) return build_runtime_receipt_readback_from_rows( project_id=project_id, lookback_hours=params["lookback_hours"], db_read_status="unavailable", error_type=type(exc).__name__, ) return build_runtime_receipt_readback_from_rows( project_id=project_id, lookback_hours=params["lookback_hours"], db_read_status="ok", operation_count_rows=operation_counts, operation_latest_rows=operation_latest, verifier_count_rows=verifier_counts, verifier_latest_rows=verifier_latest, km_count_rows=km_counts, km_latest_rows=km_latest, telegram_count_rows=telegram_counts, telegram_latest_rows=telegram_latest, ) async def build_ai_agent_autonomous_runtime_control_with_live_readback( *, project_id: str = _DEFAULT_PROJECT_ID, lookback_hours: int = _DEFAULT_LOOKBACK_HOURS, ) -> dict[str, Any]: """Build the control plane and attach live DB receipt readback.""" payload = build_ai_agent_autonomous_runtime_control() readback = await load_ai_agent_autonomous_runtime_receipt_readback( project_id=project_id, lookback_hours=lookback_hours, ) _attach_runtime_receipt_readback(payload, readback) _validate_payload(payload) return payload _RUNTIME_OPERATION_COUNTS_SQL = """ SELECT operation_type, status, count(*) AS total, count(*) FILTER ( WHERE created_at >= NOW() - (:lookback_hours * INTERVAL '1 hour') ) AS recent FROM automation_operation_log WHERE operation_type IN ( 'ansible_candidate_matched', 'ansible_check_mode_executed', 'ansible_apply_executed', 'ansible_rollback_executed', 'ansible_execution_skipped' ) GROUP BY operation_type, status ORDER BY operation_type, status """ _RUNTIME_OPERATION_LATEST_SQL = """ SELECT op_id::text AS op_id, parent_op_id::text AS parent_op_id, operation_type, status, actor, coalesce(incident_id::text, input ->> 'incident_id') AS incident_id, input ->> 'catalog_id' AS catalog_id, coalesce(input ->> 'apply_playbook_path', input ->> 'playbook_path') AS playbook_path, input ->> 'execution_mode' AS execution_mode, coalesce(output ->> 'returncode', dry_run_result ->> 'returncode') AS returncode, duration_ms, created_at FROM automation_operation_log WHERE operation_type IN ( 'ansible_candidate_matched', 'ansible_check_mode_executed', 'ansible_apply_executed', 'ansible_rollback_executed', 'ansible_execution_skipped' ) ORDER BY created_at DESC LIMIT :limit """ _RUNTIME_VERIFIER_COUNTS_SQL = """ SELECT coalesce(verification_result, 'missing') AS verification_result, count(*) AS total, count(*) FILTER ( WHERE collected_at >= NOW() - (:lookback_hours * INTERVAL '1 hour') ) AS recent FROM incident_evidence WHERE post_execution_state ->> 'apply_op_id' IS NOT NULL GROUP BY coalesce(verification_result, 'missing') ORDER BY verification_result """ _RUNTIME_VERIFIER_LATEST_SQL = """ SELECT id, incident_id, matched_playbook_id, coalesce(verification_result, 'missing') AS verification_result, post_execution_state ->> 'apply_op_id' AS apply_op_id, post_execution_state ->> 'catalog_id' AS catalog_id, post_execution_state ->> 'playbook_path' AS playbook_path, post_execution_state ->> 'returncode' AS returncode, collected_at FROM incident_evidence WHERE post_execution_state ->> 'apply_op_id' IS NOT NULL ORDER BY collected_at DESC LIMIT :limit """ _RUNTIME_KM_COUNTS_SQL = """ SELECT status, count(*) AS total, count(*) FILTER ( WHERE created_at >= NOW() - (:lookback_hours * INTERVAL '1 hour') ) AS recent FROM knowledge_entries WHERE project_id = :project_id AND ( path_type LIKE 'ansible_apply_receipt:%' OR tags::text LIKE '%ansible_controlled_apply%' ) GROUP BY status ORDER BY status """ _RUNTIME_KM_LATEST_SQL = """ SELECT id, title, related_incident_id, related_playbook_id, path_type, status, created_by, created_at FROM knowledge_entries WHERE project_id = :project_id AND ( path_type LIKE 'ansible_apply_receipt:%' OR tags::text LIKE '%ansible_controlled_apply%' ) ORDER BY created_at DESC LIMIT :limit """ _RUNTIME_TELEGRAM_COUNTS_SQL = """ SELECT send_status, count(*) AS total, count(*) FILTER ( WHERE queued_at >= NOW() - (:lookback_hours * INTERVAL '1 hour') ) AS recent FROM awooop_outbound_message WHERE project_id = :project_id AND channel_type = 'telegram' AND source_envelope #>> '{callback_reply,action}' = 'controlled_apply_result' GROUP BY send_status ORDER BY send_status """ _RUNTIME_TELEGRAM_LATEST_SQL = """ SELECT message_id::text AS message_id, run_id::text AS run_id, message_type, send_status, provider_message_id, source_envelope #>> '{callback_reply,incident_id}' AS incident_id, source_envelope #>> '{callback_reply,action}' AS action, queued_at, sent_at FROM awooop_outbound_message WHERE project_id = :project_id AND channel_type = 'telegram' AND source_envelope #>> '{callback_reply,action}' = 'controlled_apply_result' ORDER BY queued_at DESC LIMIT :limit """ def _validate_payload(payload: dict[str, Any]) -> None: if payload.get("schema_version") != _SCHEMA_VERSION: raise ValueError(f"schema_version must be {_SCHEMA_VERSION}") status = payload.get("program_status") or {} if status.get("runtime_authority") != _RUNTIME_AUTHORITY: raise ValueError(f"runtime_authority must be {_RUNTIME_AUTHORITY}") if status.get("deploy_readback_marker") != _DEPLOY_READBACK_MARKER: raise ValueError(f"deploy_readback_marker must be {_DEPLOY_READBACK_MARKER}") if status.get("deploy_attempt_note") != _DEPLOY_ATTEMPT_NOTE: raise ValueError(f"deploy_attempt_note must be {_DEPLOY_ATTEMPT_NOTE}") policy = payload.get("current_policy") or {} for key in ( "low_risk_controlled_apply_allowed", "medium_risk_controlled_apply_allowed", "high_risk_controlled_apply_allowed", "telegram_gateway_required", "post_apply_verifier_required", "km_learning_writeback_required", ): if policy.get(key) is not True: raise ValueError(f"current_policy.{key} must be true") if policy.get("owner_review_required_for_low_medium_high") is not False: raise ValueError("owner_review_required_for_low_medium_high must be false") if policy.get("direct_bot_api_allowed") is not False: raise ValueError("direct_bot_api_allowed must be false") visibility = payload.get("visibility_contract") or {} for key in ( "work_window_transcript_display_allowed", "prompt_body_display_allowed", "internal_reasoning_display_allowed", "sensitive_value_display_allowed", "telegram_unredacted_payload_display_allowed", ): if visibility.get(key) is not False: raise ValueError(f"visibility_contract.{key} must remain false")