"""
AwoooP Operator Console — Platform Operator Service
====================================================
leWOOOgo modularization: DB access is centralized in the service layer;
routers do not reference get_db directly.

ADR-106(AwoooP Agent Platform)
2026-05-05 ogt + Claude Sonnet 4.6
"""

from __future__ import annotations

import asyncio
import re
import time
import uuid
from collections import defaultdict
from collections.abc import Mapping
from datetime import UTC, datetime, timedelta
from typing import Any, get_args
from urllib.parse import urlencode
from uuid import UUID

import httpx
import structlog
from fastapi import HTTPException, status
from sqlalchemy import func, select, text, update
from sqlalchemy import or_ as sa_or

from src.core.config import get_settings
from src.db.awooop_models import (
    AwoooPContractRevision,
    AwoooPConversationEvent,
    AwoooPMcpGatewayAudit,
    AwoooPOutboundMessage,
    AwoooPRunState,
    AwoooPRunStepJournal,
)
from src.db.base import get_db_context
from src.db.models import IncidentRecord, MCPAuditLog
from src.services.audit_sink import write_audit
from src.services.awooop_approval_token import issue_approval_token, record_approval
from src.services.awooop_truth_chain_service import (
    _summarize_gateway_mcp,
    _summarize_mcp,
    fetch_truth_chain,
)
from src.services.governance_km_stale_review_service import (
    query_km_stale_owner_review_completion_queue,
)
from src.services.ollama_endpoint_resolver import (
    OllamaEndpointSelection,
    OllamaWorkloadType,
    resolve_ollama_order,
)
from src.services.ollama_failover_manager import (
    OllamaEndpoint,
    OllamaRoutingResult,
    get_ollama_failover_manager,
)
from src.services.ollama_health_monitor import HealthReport, HealthStatus
from src.services.run_state_machine import transition

logger = structlog.get_logger(__name__)

# Row cap applied by list_contracts() to the rows it returns.
_MAX_CONTRACTS = 200
# NOTE(review): the limits below are consumed outside this part of the file;
# their exact use is not visible here — confirm before changing values.
_DEFAULT_PER_PAGE = 50
_MAX_PER_PAGE = 200
_MAX_EVENTS = 100
_MAX_TIMELINE_ITEMS = 100
_MAX_LIST_CONTEXT_ROWS = 500
_MAX_STEP_SUMMARY_CHARS = 128
# Upper bound (seconds) that get_ai_route_status() waits for provider selection.
_AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS = 12.0
_AI_ROUTE_STATUS_CONNECTIVITY_TIMEOUT_SECONDS = 2.5 _REMEDIATION_HISTORY_LIMIT = 20 _INCIDENT_ID_RE = re.compile(r"\bINC-\d{8}-[A-Z0-9]{4,}\b") _REMEDIATION_STATUS_FILTERS = { "mcp_observed", "no_evidence", "read_only_dry_run", "write_observed", "blocked", "observed", } _CALLBACK_REPLY_STATUS_FILTERS = { "no_callback", "sent", "fallback_sent", "rescue_sent", "failed", "observed", } _CALLBACK_REPLY_RAW_STATUS_BY_FILTER = { "sent": "callback_reply_sent", "fallback_sent": "callback_reply_fallback_sent", "rescue_sent": "callback_reply_rescue_sent", "failed": "callback_reply_failed", } _CALLBACK_REPLY_ACTION_RE = re.compile(r"^[a-z0-9_:-]{1,64}$", re.IGNORECASE) _CICD_STATUS_FILTERS = {"running", "success", "failed", "pending"} _CICD_STAGE_RE = re.compile(r"^[a-z0-9_:-]{1,64}$", re.IGNORECASE) _AI_ROUTE_STATUS_SCHEMA_VERSION = "awooop_ai_route_status_v1" _AI_ROUTE_WORKLOADS = set(get_args(OllamaWorkloadType)) _AI_ROUTE_REPAIR_EVIDENCE_PROVIDER = "ai_route_repair" _AI_ROUTE_REPAIR_EVIDENCE_STAGE = "repair_diagnosis" _SOURCE_CORRELATION_SCHEMA_VERSION = "source_provider_correlation_v1" _SOURCE_CORRELATION_PROVIDERS = ("sentry", "signoz") _SOURCE_CORRELATION_EVENT_LIMIT = 200 _SOURCE_CORRELATION_LOOKBACK_DAYS = 7 _SOURCE_CORRELATION_PRE_WINDOW_HOURS = 2 _KM_STALE_COMPLETION_CALLBACK_SCHEMA_VERSION = ( "km_stale_owner_review_completion_callback_summary_v1" ) _CALLBACK_EVIDENCE_CAPTURE_STATUS_SCHEMA_VERSION = "callback_evidence_capture_status_v1" _CALLBACK_REPLY_AUDIT_SUMMARY_SCHEMA_VERSION = ( "telegram_callback_reply_audit_summary_v1" ) # ============================================================================= # Tenants # ============================================================================= async def list_tenants() -> dict[str, Any]: """列出所有 AwoooP 租戶(Operator Console,不依 RLS 過濾)。""" async with get_db_context("awoooi") as db: result = await db.execute( text(""" SELECT project_id, display_name, migration_mode, budget_limit_usd, is_active, created_at FROM 
awooop_operator_list_projects() """) ) rows = list(result.mappings().all()) tenants = [ { "project_id": r["project_id"], "display_name": r["display_name"], "migration_mode": r["migration_mode"], "budget_limit_usd": r["budget_limit_usd"], "is_active": r["is_active"], "created_at": r["created_at"], } for r in rows ] return {"tenants": tenants, "total": len(tenants)} # ============================================================================= # Contracts # ============================================================================= async def list_contracts( project_id: str | None, lifecycle_status: str | None, ) -> dict[str, Any]: """列出合約 revisions(可 filter by project_id / lifecycle_status)。""" async with get_db_context("awoooi") as db: stmt = select(AwoooPContractRevision).order_by( AwoooPContractRevision.created_at.desc() ) if project_id is not None: stmt = stmt.where(AwoooPContractRevision.project_id == project_id) if lifecycle_status is not None: stmt = stmt.where( AwoooPContractRevision.lifecycle_status == lifecycle_status ) count_stmt = select(func.count()).select_from(stmt.subquery()) total_result = await db.execute(count_stmt) total = total_result.scalar_one() stmt = stmt.limit(_MAX_CONTRACTS) result = await db.execute(stmt) rows = list(result.scalars().all()) contracts = [ { "revision_id": r.revision_id, "contract_id": r.contract_id, "contract_family": r.contract_family, "lifecycle_status": r.lifecycle_status, "body_hash": r.body_hash, "version_major": r.version_major, "version_minor": r.version_minor, "created_at": r.created_at, "project_id": r.project_id, } for r in rows ] return {"contracts": contracts, "total": total} # ============================================================================= # Runs # ============================================================================= async def list_runs( project_id: str | None, state: str | None, remediation_status: str | None, callback_reply_status: str | None, incident_id: str | None, page: int, 
per_page: int, ) -> dict[str, Any]: """列出 runs,支援 project/state/evidence/callback/incident filter 與分頁。""" _validate_remediation_status_filter(remediation_status) _validate_callback_reply_status_filter(callback_reply_status) _validate_incident_id_filter(incident_id) async with get_db_context("awoooi") as db: stmt = select(AwoooPRunState).order_by(AwoooPRunState.created_at.desc()) if project_id is not None: stmt = stmt.where(AwoooPRunState.project_id == project_id) if state is not None: stmt = stmt.where(AwoooPRunState.state == state) offset = (page - 1) * per_page if remediation_status or incident_id or callback_reply_status: result = await db.execute(stmt) candidate_rows = list(result.scalars().all()) context_limit = _list_filter_context_limit(len(candidate_rows)) inbound_by_run, outbound_by_run = await _load_run_message_context( db, candidate_rows, limit=context_limit, ) remediation_summaries = await _build_run_remediation_summaries( runs=candidate_rows, inbound_by_run=inbound_by_run, outbound_by_run=outbound_by_run, ) callback_reply_summaries = { row.run_id: _run_callback_reply_summary(outbound_by_run.get(row.run_id, [])) for row in candidate_rows } filtered_rows = [ row for row in candidate_rows if _remediation_summary_matches_status( remediation_summaries.get(row.run_id), remediation_status, ) and _remediation_summary_matches_incident_id( remediation_summaries.get(row.run_id), incident_id, ) and _callback_reply_summary_matches_status( callback_reply_summaries.get(row.run_id), callback_reply_status, ) ] total = len(filtered_rows) rows = filtered_rows[offset : offset + per_page] else: count_stmt = select(func.count()).select_from(stmt.subquery()) total_result = await db.execute(count_stmt) total = total_result.scalar_one() stmt = stmt.offset(offset).limit(per_page) result = await db.execute(stmt) rows = list(result.scalars().all()) inbound_by_run, outbound_by_run = await _load_run_message_context(db, rows) remediation_summaries = await 
_build_run_remediation_summaries( runs=rows, inbound_by_run=inbound_by_run, outbound_by_run=outbound_by_run, ) callback_reply_summaries = { row.run_id: _run_callback_reply_summary(outbound_by_run.get(row.run_id, [])) for row in rows } runs = [ { "run_id": r.run_id, "project_id": r.project_id, "agent_id": r.agent_id, "state": r.state, "is_shadow": r.is_shadow, "cost_usd": r.cost_usd, "step_count": r.step_count, "created_at": r.created_at, "timeout_at": r.timeout_at, "remediation_summary": remediation_summaries.get(r.run_id), "callback_reply_summary": callback_reply_summaries.get(r.run_id), } for r in rows ] return {"runs": runs, "total": total, "page": page, "per_page": per_page} async def list_callback_replies( project_id: str | None, callback_reply_status: str | None, action: str | None, incident_id: str | None, page: int, per_page: int, ) -> dict[str, Any]: """列出 Telegram detail/history callback reply evidence,不改 runtime 狀態。""" _validate_callback_reply_status_filter(callback_reply_status) callback_action = _validate_callback_reply_action_filter(action) _validate_incident_id_filter(incident_id) if callback_reply_status == "no_callback": return { "items": [], "total": 0, "page": page, "per_page": per_page, } where_clauses = [ "m.source_envelope ? 
'callback_reply'", ] params: dict[str, Any] = { "limit": per_page, "offset": (page - 1) * per_page, } if project_id: where_clauses.append("m.project_id = :project_id") params["project_id"] = project_id raw_status = _CALLBACK_REPLY_RAW_STATUS_BY_FILTER.get( str(callback_reply_status or "") ) if raw_status: where_clauses.append( "m.source_envelope #>> '{callback_reply,status}' = :raw_status" ) params["raw_status"] = raw_status elif callback_reply_status == "observed": where_clauses.append( """ COALESCE(m.source_envelope #>> '{callback_reply,status}', '') NOT IN ( 'callback_reply_sent', 'callback_reply_fallback_sent', 'callback_reply_rescue_sent', 'callback_reply_failed' ) """ ) if callback_action: where_clauses.append( "LOWER(m.source_envelope #>> '{callback_reply,action}') = :callback_action" ) params["callback_action"] = callback_action if incident_id: where_clauses.append( "m.source_envelope #>> '{callback_reply,incident_id}' = :incident_id" ) params["incident_id"] = incident_id where_sql = " AND ".join(where_clauses) count_sql = text(f""" SELECT COUNT(*) AS total FROM awooop_outbound_message m WHERE {where_sql} """) list_sql = text(f""" SELECT m.message_id, m.project_id, m.run_id, m.channel_type, m.message_type, m.content_preview, m.provider_message_id, m.send_status, m.send_error, m.queued_at, m.sent_at, m.triggered_by_state, m.source_envelope -> 'callback_reply' AS callback_reply, m.source_envelope -> 'awooop_status_chain' AS persisted_awooop_status_chain, m.source_envelope -> 'km_stale_completion_summary' AS persisted_km_stale_completion_summary, r.agent_id, r.state AS run_state, r.created_at AS run_created_at FROM awooop_outbound_message m LEFT JOIN awooop_run_state r ON r.project_id = m.project_id AND r.run_id = m.run_id WHERE {where_sql} ORDER BY COALESCE(m.sent_at, m.queued_at) DESC, m.message_id DESC LIMIT :limit OFFSET :offset """) async with get_db_context(project_id or "awoooi") as db: count_result = await db.execute(count_sql, params) total = 
count_result.scalar_one() rows_result = await db.execute(list_sql, params) rows = list(rows_result.mappings().all()) audit_summary = await _fetch_callback_reply_audit_summary( db, project_id=project_id or "awoooi", ) items = [_callback_reply_event_item(row) for row in rows] status_chain_cache: dict[tuple[str, str], dict[str, Any]] = {} km_completion_queue_cache: dict[str, Any] = {} km_completion_summary_cache: dict[tuple[str, str | None], dict[str, Any]] = {} for item in items: incident = item.get("incident_id") item_project_id = str(item.get("project_id") or project_id or "awoooi") if not incident: item["awooop_status_chain"] = _build_awooop_status_chain( incident_ids=[], source_id=None, ) item["km_stale_completion_summary"] = ( _empty_km_stale_completion_summary( project_id=item_project_id, incident_id=None, status_value="no_incident", reason="callback_reply_missing_incident_id", ) ) continue incident_id = str(incident) cache_key = (item_project_id, incident_id) cached = status_chain_cache.get(cache_key) if cached is not None: item["awooop_status_chain"] = cached else: remediation_history = await _fetch_run_remediation_history( [incident_id], limit=5, ) chain = await _fetch_awooop_status_chain( incident_ids=[incident_id], project_id=item_project_id, remediation_history=remediation_history, ) status_chain_cache[cache_key] = chain item["awooop_status_chain"] = chain summary_cache_key = (item_project_id, incident_id) km_summary = km_completion_summary_cache.get(summary_cache_key) if km_summary is None: km_summary = await _fetch_km_stale_completion_summary_for_incident( project_id=item_project_id, incident_id=incident_id, queue_cache=km_completion_queue_cache, ) km_completion_summary_cache[summary_cache_key] = km_summary item["km_stale_completion_summary"] = km_summary return { "items": items, "total": total, "page": page, "per_page": per_page, "summary": audit_summary, } async def _fetch_callback_reply_audit_summary( db: Any, *, project_id: str, ) -> dict[str, Any]: 
"""Summarize Telegram outbound mirror and callback evidence capture coverage.""" result = await db.execute( text(""" WITH outbound AS ( SELECT m.*, EXISTS ( SELECT 1 FROM jsonb_each( CASE WHEN jsonb_typeof( COALESCE( m.source_envelope -> 'source_refs', '{}'::jsonb ) ) = 'object' THEN COALESCE( m.source_envelope -> 'source_refs', '{}'::jsonb ) ELSE '{}'::jsonb END ) AS refs(key, value) WHERE jsonb_typeof(refs.value) = 'array' AND refs.value <> '[]'::jsonb ) AS has_trace_ref FROM awooop_outbound_message m WHERE m.project_id = :project_id AND m.channel_type = 'telegram' ), trace_gap_cutoff AS ( SELECT MAX(COALESCE(sent_at, queued_at)) AS latest_missing_trace_ref_at FROM outbound WHERE source_envelope #>> '{reply_markup,present}' = 'true' AND NOT has_trace_ref ) SELECT COUNT(*) AS outbound_total, COUNT(*) FILTER ( WHERE source_envelope <> '{}'::jsonb ) AS outbound_source_envelope_total, COUNT(*) FILTER ( WHERE source_envelope ? 'source_refs' ) AS outbound_source_refs_total, COUNT(*) FILTER ( WHERE has_trace_ref ) AS outbound_trace_ref_total, COUNT(*) FILTER ( WHERE COALESCE( source_envelope #> '{source_refs,incident_ids}', '[]'::jsonb ) <> '[]'::jsonb ) AS outbound_incident_ref_total, COUNT(*) FILTER ( WHERE source_envelope #>> '{reply_markup,present}' = 'true' ) AS outbound_reply_markup_total, COUNT(*) FILTER ( WHERE source_envelope #>> '{reply_markup,present}' = 'true' AND COALESCE( source_envelope #> '{source_refs,incident_ids}', '[]'::jsonb ) = '[]'::jsonb ) AS outbound_reply_markup_missing_incident_ref_total, COUNT(*) FILTER ( WHERE source_envelope #>> '{reply_markup,present}' = 'true' AND COALESCE( source_envelope #> '{source_refs,incident_ids}', '[]'::jsonb ) = '[]'::jsonb AND COALESCE(sent_at, queued_at) >= NOW() - INTERVAL '1 hour' ) AS outbound_reply_markup_missing_incident_ref_recent_1h_total, COUNT(*) FILTER ( WHERE source_envelope #>> '{reply_markup,present}' = 'true' AND COALESCE( source_envelope #> '{source_refs,incident_ids}', '[]'::jsonb ) = 
'[]'::jsonb AND COALESCE(sent_at, queued_at) >= NOW() - INTERVAL '24 hours' ) AS outbound_reply_markup_missing_incident_ref_recent_24h_total, MAX(COALESCE(sent_at, queued_at)) FILTER ( WHERE source_envelope #>> '{reply_markup,present}' = 'true' AND COALESCE( source_envelope #> '{source_refs,incident_ids}', '[]'::jsonb ) = '[]'::jsonb ) AS outbound_reply_markup_missing_incident_ref_latest_sent_at, COUNT(*) FILTER ( WHERE source_envelope #>> '{reply_markup,present}' = 'true' AND NOT has_trace_ref ) AS outbound_reply_markup_missing_trace_ref_total, COUNT(*) FILTER ( WHERE source_envelope #>> '{reply_markup,present}' = 'true' AND NOT has_trace_ref AND COALESCE(sent_at, queued_at) >= NOW() - INTERVAL '1 hour' ) AS outbound_reply_markup_missing_trace_ref_recent_1h_total, COUNT(*) FILTER ( WHERE source_envelope #>> '{reply_markup,present}' = 'true' AND NOT has_trace_ref AND COALESCE(sent_at, queued_at) >= NOW() - INTERVAL '24 hours' ) AS outbound_reply_markup_missing_trace_ref_recent_24h_total, MAX(COALESCE(sent_at, queued_at)) FILTER ( WHERE source_envelope #>> '{reply_markup,present}' = 'true' AND NOT has_trace_ref ) AS outbound_reply_markup_missing_trace_ref_latest_sent_at, COUNT(*) FILTER ( WHERE source_envelope #>> '{reply_markup,present}' = 'true' AND has_trace_ref AND trace_gap_cutoff.latest_missing_trace_ref_at IS NOT NULL AND COALESCE(sent_at, queued_at) > trace_gap_cutoff.latest_missing_trace_ref_at ) AS outbound_reply_markup_trace_ref_after_gap_total, MIN(COALESCE(sent_at, queued_at)) FILTER ( WHERE source_envelope #>> '{reply_markup,present}' = 'true' AND has_trace_ref AND trace_gap_cutoff.latest_missing_trace_ref_at IS NOT NULL AND COALESCE(sent_at, queued_at) > trace_gap_cutoff.latest_missing_trace_ref_at ) AS outbound_reply_markup_trace_ref_after_gap_first_sent_at, MAX(COALESCE(sent_at, queued_at)) FILTER ( WHERE source_envelope #>> '{reply_markup,present}' = 'true' AND has_trace_ref AND trace_gap_cutoff.latest_missing_trace_ref_at IS NOT NULL AND 
COALESCE(sent_at, queued_at) > trace_gap_cutoff.latest_missing_trace_ref_at ) AS outbound_reply_markup_trace_ref_after_gap_latest_sent_at, COALESCE(( SELECT jsonb_agg( jsonb_build_object( 'prefix', prefix, 'total', total, 'recent_24h_total', recent_24h_total, 'first_sent_at', first_sent_at, 'last_sent_at', last_sent_at ) ORDER BY total DESC, prefix ASC ) FROM ( SELECT COALESCE( NULLIF( source_envelope #>> '{reply_markup,buttons,0,callback_prefix}', '' ), 'unknown' ) AS prefix, COUNT(*) AS total, COUNT(*) FILTER ( WHERE COALESCE(sent_at, queued_at) >= NOW() - INTERVAL '24 hours' ) AS recent_24h_total, MIN(COALESCE(sent_at, queued_at)) AS first_sent_at, MAX(COALESCE(sent_at, queued_at)) AS last_sent_at FROM outbound WHERE source_envelope #>> '{reply_markup,present}' = 'true' AND COALESCE( source_envelope #> '{source_refs,incident_ids}', '[]'::jsonb ) = '[]'::jsonb GROUP BY 1 ORDER BY total DESC, prefix ASC LIMIT 5 ) missing_prefixes ), '[]'::jsonb) AS outbound_reply_markup_missing_incident_ref_top_prefixes, COALESCE(( SELECT jsonb_agg( jsonb_build_object( 'prefix', prefix, 'total', total, 'recent_24h_total', recent_24h_total, 'first_sent_at', first_sent_at, 'last_sent_at', last_sent_at ) ORDER BY total DESC, prefix ASC ) FROM ( SELECT COALESCE( NULLIF( source_envelope #>> '{reply_markup,buttons,0,callback_prefix}', '' ), 'unknown' ) AS prefix, COUNT(*) AS total, COUNT(*) FILTER ( WHERE COALESCE(sent_at, queued_at) >= NOW() - INTERVAL '24 hours' ) AS recent_24h_total, MIN(COALESCE(sent_at, queued_at)) AS first_sent_at, MAX(COALESCE(sent_at, queued_at)) AS last_sent_at FROM outbound WHERE source_envelope #>> '{reply_markup,present}' = 'true' AND NOT has_trace_ref GROUP BY 1 ORDER BY total DESC, prefix ASC LIMIT 5 ) missing_trace_prefixes ), '[]'::jsonb) AS outbound_reply_markup_missing_trace_ref_top_prefixes, COUNT(*) FILTER ( WHERE send_status = 'failed' ) AS outbound_failed_total, COUNT(*) FILTER ( WHERE source_envelope ? 
'callback_reply' ) AS callback_total, COUNT(*) FILTER ( WHERE source_envelope #>> '{callback_reply,status}' = 'callback_reply_sent' ) AS callback_sent_total, COUNT(*) FILTER ( WHERE source_envelope #>> '{callback_reply,status}' = 'callback_reply_fallback_sent' ) AS callback_fallback_total, COUNT(*) FILTER ( WHERE source_envelope #>> '{callback_reply,status}' = 'callback_reply_rescue_sent' ) AS callback_rescue_total, COUNT(*) FILTER ( WHERE source_envelope #>> '{callback_reply,status}' = 'callback_reply_failed' ) AS callback_failed_total, COUNT(*) FILTER ( WHERE LOWER(source_envelope #>> '{callback_reply,action}') = 'detail' ) AS callback_detail_total, COUNT(*) FILTER ( WHERE LOWER(source_envelope #>> '{callback_reply,action}') = 'history' ) AS callback_history_total, COUNT(*) FILTER ( WHERE source_envelope ? 'callback_reply' AND source_envelope ? 'awooop_status_chain' AND source_envelope ? 'km_stale_completion_summary' ) AS callback_snapshot_captured_total, COUNT(*) FILTER ( WHERE source_envelope ? 'callback_reply' AND ( source_envelope ? 'awooop_status_chain' OR source_envelope ? 'km_stale_completion_summary' ) AND NOT ( source_envelope ? 'awooop_status_chain' AND source_envelope ? 'km_stale_completion_summary' ) ) AS callback_snapshot_partial_total, COUNT(*) FILTER ( WHERE source_envelope ? 'callback_reply' AND NOT ( source_envelope ? 'awooop_status_chain' OR source_envelope ? 'km_stale_completion_summary' ) ) AS callback_snapshot_missing_total, COUNT(DISTINCT source_envelope #>> '{callback_reply,incident_id}') FILTER ( WHERE source_envelope ? 'callback_reply' AND COALESCE( source_envelope #>> '{callback_reply,incident_id}', '' ) <> '' ) AS callback_incident_total, MAX(COALESCE(sent_at, queued_at)) AS latest_outbound_at, MAX(COALESCE(sent_at, queued_at)) FILTER ( WHERE source_envelope ? 
'callback_reply' ) AS latest_callback_at FROM outbound CROSS JOIN trace_gap_cutoff """), {"project_id": project_id}, ) return _callback_reply_audit_summary_from_row( result.mappings().one(), project_id=project_id, ) def _callback_reply_audit_summary_from_row( row: Mapping[str, Any], *, project_id: str, ) -> dict[str, Any]: """Convert aggregate SQL row into the public callback evidence audit summary.""" outbound_total = _safe_int(row.get("outbound_total")) callback_total = _safe_int(row.get("callback_total")) captured = _safe_int(row.get("callback_snapshot_captured_total")) partial = _safe_int(row.get("callback_snapshot_partial_total")) missing = _safe_int(row.get("callback_snapshot_missing_total")) outbound_incident_refs = _safe_int(row.get("outbound_incident_ref_total")) top_missing_prefixes = _reply_markup_gap_prefixes_from_value( row.get("outbound_reply_markup_missing_incident_ref_top_prefixes") ) top_missing_trace_prefixes = _reply_markup_gap_prefixes_from_value( row.get("outbound_reply_markup_missing_trace_ref_top_prefixes") ) missing_trace_total = _safe_int( row.get("outbound_reply_markup_missing_trace_ref_total") ) missing_trace_recent_1h = _safe_int( row.get("outbound_reply_markup_missing_trace_ref_recent_1h_total") ) missing_trace_recent_24h = _safe_int( row.get("outbound_reply_markup_missing_trace_ref_recent_24h_total") ) trace_gap_status, trace_gap_next_action = _trace_ref_gap_decision( total=missing_trace_total, recent_1h=missing_trace_recent_1h, recent_24h=missing_trace_recent_24h, ) trace_ref_after_gap_total = _safe_int( row.get("outbound_reply_markup_trace_ref_after_gap_total") ) trace_gap_recovery_status = _trace_ref_gap_recovery_status( missing_total=missing_trace_total, after_gap_total=trace_ref_after_gap_total, ) if callback_total <= 0: snapshot_status = "no_callback" next_action = "press_telegram_detail_or_history" elif captured > 0 and (missing > 0 or partial > 0): snapshot_status = "partial" next_action = "review_legacy_callback_snapshot_gap" 
elif partial > 0: snapshot_status = "partial" next_action = "press_telegram_detail_or_history_after_rollout" elif missing > 0: snapshot_status = "not_captured" next_action = "press_telegram_detail_or_history_after_rollout" elif outbound_total > 0 and outbound_incident_refs == 0: snapshot_status = "captured" next_action = "review_outbound_source_refs" else: snapshot_status = "captured" next_action = "none" return { "schema_version": _CALLBACK_REPLY_AUDIT_SUMMARY_SCHEMA_VERSION, "project_id": project_id, "outbound_total": outbound_total, "outbound_source_envelope_total": _safe_int( row.get("outbound_source_envelope_total") ), "outbound_source_refs_total": _safe_int( row.get("outbound_source_refs_total") ), "outbound_trace_ref_total": _safe_int(row.get("outbound_trace_ref_total")), "outbound_incident_ref_total": outbound_incident_refs, "outbound_reply_markup_total": _safe_int( row.get("outbound_reply_markup_total") ), "outbound_reply_markup_missing_incident_ref_total": _safe_int( row.get("outbound_reply_markup_missing_incident_ref_total") ), "outbound_reply_markup_missing_incident_ref_recent_1h_total": _safe_int( row.get("outbound_reply_markup_missing_incident_ref_recent_1h_total") ), "outbound_reply_markup_missing_incident_ref_recent_24h_total": _safe_int( row.get("outbound_reply_markup_missing_incident_ref_recent_24h_total") ), "outbound_reply_markup_missing_incident_ref_latest_sent_at": row.get( "outbound_reply_markup_missing_incident_ref_latest_sent_at" ), "outbound_reply_markup_missing_trace_ref_total": missing_trace_total, "outbound_reply_markup_missing_trace_ref_recent_1h_total": ( missing_trace_recent_1h ), "outbound_reply_markup_missing_trace_ref_recent_24h_total": ( missing_trace_recent_24h ), "outbound_reply_markup_missing_trace_ref_latest_sent_at": row.get( "outbound_reply_markup_missing_trace_ref_latest_sent_at" ), "outbound_reply_markup_trace_ref_gap_status": trace_gap_status, "outbound_reply_markup_trace_ref_gap_next_action": trace_gap_next_action, 
"outbound_reply_markup_trace_ref_after_gap_total": ( trace_ref_after_gap_total ), "outbound_reply_markup_trace_ref_after_gap_first_sent_at": row.get( "outbound_reply_markup_trace_ref_after_gap_first_sent_at" ), "outbound_reply_markup_trace_ref_after_gap_latest_sent_at": row.get( "outbound_reply_markup_trace_ref_after_gap_latest_sent_at" ), "outbound_reply_markup_trace_ref_gap_recovery_status": ( trace_gap_recovery_status ), "outbound_reply_markup_missing_incident_ref_top_prefixes": ( top_missing_prefixes ), "outbound_reply_markup_missing_trace_ref_top_prefixes": ( top_missing_trace_prefixes ), "outbound_failed_total": _safe_int(row.get("outbound_failed_total")), "callback_total": callback_total, "callback_sent_total": _safe_int(row.get("callback_sent_total")), "callback_fallback_total": _safe_int(row.get("callback_fallback_total")), "callback_rescue_total": _safe_int(row.get("callback_rescue_total")), "callback_failed_total": _safe_int(row.get("callback_failed_total")), "callback_detail_total": _safe_int(row.get("callback_detail_total")), "callback_history_total": _safe_int(row.get("callback_history_total")), "callback_snapshot_captured_total": captured, "callback_snapshot_partial_total": partial, "callback_snapshot_missing_total": missing, "callback_incident_total": _safe_int(row.get("callback_incident_total")), "snapshot_status": snapshot_status, "next_action": next_action, "latest_outbound_at": row.get("latest_outbound_at"), "latest_callback_at": row.get("latest_callback_at"), } def _trace_ref_gap_decision( *, total: int, recent_1h: int, recent_24h: int, ) -> tuple[str, str]: """Classify reply_markup messages without any source_refs into operator actions.""" if total <= 0: return "clean", "none" if recent_1h > 0: return "active_gap", "inspect_recent_outbound_source_refs" if recent_24h > 0: return "recent_backlog", "watch_24h_decay" return "legacy_backlog", "backfill_or_archive_legacy_callbacks" def _trace_ref_gap_recovery_status( *, missing_total: int, 
after_gap_total: int, ) -> str: """Describe whether traced reply_markup messages resumed after the last gap.""" if missing_total <= 0: return "not_needed" if after_gap_total > 0: return "recovered_after_gap" return "no_recovery_signal" def _reply_markup_gap_prefixes_from_value(value: Any) -> list[dict[str, Any]]: if not isinstance(value, list): return [] prefixes: list[dict[str, Any]] = [] for item in value: if not isinstance(item, Mapping): continue prefix = str(item.get("prefix") or "unknown").strip() or "unknown" prefixes.append({ "prefix": prefix[:80], "total": _safe_int(item.get("total")), "recent_24h_total": _safe_int(item.get("recent_24h_total")), "first_sent_at": item.get("first_sent_at"), "last_sent_at": item.get("last_sent_at"), }) if len(prefixes) >= 5: break return prefixes async def _fetch_km_stale_completion_summary_for_incident( *, project_id: str, incident_id: str | None, queue_cache: dict[str, Any] | None = None, ) -> dict[str, Any]: """Fetch read-only KM owner-review completion context for callback evidence.""" normalized_project_id = project_id or "awoooi" normalized_incident_id = str(incident_id or "").strip() or None if not normalized_incident_id: return _empty_km_stale_completion_summary( project_id=normalized_project_id, incident_id=None, status_value="no_incident", reason="callback_reply_missing_incident_id", ) cache = queue_cache if queue_cache is not None else {} queue = cache.get(normalized_project_id) if queue is None: try: queue = await query_km_stale_owner_review_completion_queue( project_id=normalized_project_id, status_bucket="all", limit=100, ) except Exception as exc: logger.warning( "operator_km_stale_completion_summary_fetch_failed", project_id=normalized_project_id, incident_id=normalized_incident_id, error=str(exc), ) return _empty_km_stale_completion_summary( project_id=normalized_project_id, incident_id=normalized_incident_id, status_value="fetch_failed", reason="km_stale_completion_queue_fetch_failed", ) 
cache[normalized_project_id] = queue return _build_km_stale_completion_summary( queue=queue, project_id=normalized_project_id, incident_id=normalized_incident_id, ) async def list_cicd_events( *, project_id: str | None, stage: str | None, status_filter: str | None, limit: int, ) -> dict[str, Any]: """列出 CI/CD notification evidence,來源是 alert_operation_log。""" safe_limit = max(1, min(limit, 50)) normalized_stage = _validate_cicd_stage_filter(stage) normalized_status = _validate_cicd_status_filter(status_filter) # alert_operation_log 目前是 legacy/global evidence table,CI/CD notification # 只屬於 AWOOOI production;非 awoooi project filter 回空集合,避免誤導多租戶 UI。 if project_id and project_id != "awoooi": return {"items": [], "total": 0, "limit": safe_limit} where_clauses = [ "event_type = 'ALERT_RECEIVED'", "actor = 'alertmanager'", """ COALESCE( context #>> '{labels,alertname}', context ->> 'alertname', '' ) LIKE 'CI_%' """, ] params: dict[str, Any] = {"limit": safe_limit} if normalized_stage: where_clauses.append( "LOWER(COALESCE(context #>> '{labels,stage}', '')) = :stage" ) params["stage"] = normalized_stage if normalized_status: where_clauses.append( "LOWER(COALESCE(context #>> '{labels,status}', '')) = :status" ) params["status"] = normalized_status where_sql = " AND ".join(where_clauses) sql = text(f""" SELECT id, action_detail, success, created_at, context, COALESCE( context #>> '{{labels,alertname}}', context ->> 'alertname', '' ) AS alertname, context #>> '{{labels,stage}}' AS stage, context #>> '{{labels,status}}' AS status, context #>> '{{labels,severity}}' AS severity, context #>> '{{labels,commit}}' AS commit_sha, context #>> '{{labels,triggered_by}}' AS triggered_by, context #>> '{{labels,duration_seconds}}' AS duration_seconds, context #>> '{{annotations,summary}}' AS summary, context #>> '{{annotations,description}}' AS description, context #>> '{{annotations,workflow_url}}' AS workflow_url, context ->> 'alert_id' AS alert_id, context ->> 'source' AS source FROM 
alert_operation_log WHERE {where_sql} ORDER BY created_at DESC, id DESC LIMIT :limit """) async with get_db_context("awoooi") as db: result = await db.execute(sql, params) rows = list(result.mappings().all()) items = [_cicd_event_item_from_row(row, project_id=project_id or "awoooi") for row in rows] return {"items": items, "total": len(items), "limit": safe_limit} async def get_ai_route_status( workload_type: str | None = None, ) -> dict[str, Any]: """回傳目前 AI/Ollama provider routing 的只讀狀態,供 Operator Console 顯示。""" workload = _validate_ai_route_workload(workload_type) policy_order = _ai_route_policy_order(workload) checked_at = _utc_now_naive() try: route = await asyncio.wait_for( get_ollama_failover_manager().select_provider(task_type=workload), timeout=_AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS, ) except TimeoutError: logger.warning( "ai_route_status_check_timeout", workload_type=workload, timeout_seconds=_AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS, ) return await _ai_route_lightweight_status_from_policy( workload=workload, policy_order=policy_order, checked_at=checked_at, route_reason="route_check_timeout", route_error=( f"route status timed out after " f"{_AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS:g}s" ), ) except Exception as exc: return await _ai_route_lightweight_status_from_policy( workload=workload, policy_order=policy_order, checked_at=checked_at, route_reason="route_check_failed", route_error=str(exc), ) health = _ai_route_health_map(route) response = { "schema_version": _AI_ROUTE_STATUS_SCHEMA_VERSION, "workload_type": workload, "policy_order": policy_order, "selected_provider": route.primary.provider_name, "selected_url": route.primary.url or None, "selected_model": route.primary.model, "fallback_chain": [ _ai_route_runtime_endpoint_item(endpoint, priority=index + 2) for index, endpoint in enumerate(route.fallback_chain) ], "route_reason": route.routing_reason, "route_source": "ollama_failover_manager", "route_error": None, "health": health, "checked_at": 
async def _ai_route_lightweight_status_from_policy(
    *,
    workload: OllamaWorkloadType,
    policy_order: list[dict[str, Any]],
    checked_at: datetime,
    route_reason: str,
    route_error: str,
) -> dict[str, Any]:
    """Build a fallback route-status read model from connectivity probes.

    Every Ollama endpoint returned by ``resolve_ollama_order`` is probed with
    ``_ai_route_probe_connectivity``. The first endpoint whose report is not
    ``OFFLINE`` is shown as selected; when none is reachable the response
    names Gemini as the selected provider. This is a read model only and never
    changes the execution router.

    Args:
        workload: Workload type whose policy order is being described.
        policy_order: Pre-computed policy order, echoed into the response.
        checked_at: Timestamp echoed into the response.
        route_reason: Why the primary status path was not used; prefixed to
            the human-readable ``route_reason`` of the response.
        route_error: Error text; only surfaced when the probes themselves
            raise and the "unavailable" response is returned.

    Returns:
        The route-status dict, passed through
        ``_ai_route_response_with_repair_evidence``.
    """
    candidates = list(resolve_ollama_order(workload))
    try:
        probe_reports = await asyncio.gather(
            *(_ai_route_probe_connectivity(candidate) for candidate in candidates),
        )
    except Exception as exc:
        logger.warning(
            "ai_route_status_lightweight_probe_failed",
            workload_type=workload,
            route_reason=route_reason,
            error=str(exc),
        )
        unavailable = _ai_route_unavailable_status(
            workload=workload,
            policy_order=policy_order,
            checked_at=checked_at,
            route_reason=route_reason,
            route_error=route_error,
            route_source="ollama_failover_manager",
        )
        return await _ai_route_response_with_repair_evidence(unavailable)

    health_by_provider: dict[str, dict[str, Any]] = {}
    for candidate, report in zip(candidates, probe_reports, strict=False):
        health_by_provider[candidate.provider_name] = _ai_route_health_item(report)

    # Position of the first endpoint that answered; None when all are offline.
    reachable_position: int | None = None
    for position, report in enumerate(probe_reports):
        if report.status != HealthStatus.OFFLINE:
            reachable_position = position
            break

    if reachable_position is None:
        chosen_provider = "gemini"
        chosen_url = None
        chosen_model = None
        remaining_chain: list[dict[str, Any]] = []
        reason_text = (
            f"{route_reason}; lightweight connectivity found all Ollama "
            "endpoints offline; final fallback policy is Gemini"
        )
    else:
        chosen = candidates[reachable_position]
        chosen_model = get_settings().OLLAMA_HEALTH_CHECK_MODEL
        # Priorities stay 1-based positions in the full endpoint order, so the
        # endpoint right after the selected one gets reachable_position + 2.
        remaining_chain = [
            _ai_route_runtime_policy_endpoint_item(
                later,
                priority=priority,
                model=chosen_model,
            )
            for priority, later in enumerate(
                candidates[reachable_position + 1 :],
                start=reachable_position + 2,
            )
        ]
        remaining_chain.append({
            "priority": len(candidates) + 1,
            "provider_name": "gemini",
            "url": None,
            "model": None,
            "runtime": "cloud",
        })
        chosen_provider = chosen.provider_name
        chosen_url = chosen.url
        reason_text = (
            f"{route_reason}; lightweight connectivity selected "
            f"{chosen.provider_name}"
        )

    response = {
        "schema_version": _AI_ROUTE_STATUS_SCHEMA_VERSION,
        "workload_type": workload,
        "policy_order": policy_order,
        "selected_provider": chosen_provider,
        "selected_url": chosen_url,
        "selected_model": chosen_model,
        "fallback_chain": remaining_chain,
        "route_reason": reason_text,
        "route_source": "lightweight_connectivity_fallback",
        "route_error": None,
        "health": health_by_provider,
        "checked_at": checked_at,
    }
    response.update(_ai_route_lane_state(
        policy_order=policy_order,
        selected_provider=chosen_provider,
        health=health_by_provider,
    ))
    return await _ai_route_response_with_repair_evidence(response)
[], "route_reason": route_reason, "route_source": route_source, "route_error": route_error, "health": {}, "checked_at": checked_at, } response.update(_ai_route_lane_state( policy_order=policy_order, selected_provider=None, health={}, )) return response async def _ai_route_response_with_repair_evidence( response: dict[str, Any], ) -> dict[str, Any]: """Attach latest read-only repair dossier evidence when a lane is degraded.""" response["repair_evidence"] = None if response.get("lane_mode") not in { "degraded_failover", "cloud_fallback", "unavailable", }: return response target_provider = _ai_route_repair_evidence_target(response) response["repair_evidence"] = await _latest_ai_route_repair_evidence( target_provider=target_provider, ) return response def _ai_route_repair_evidence_target(response: Mapping[str, Any]) -> str | None: skipped_lanes = response.get("skipped_lanes") if isinstance(skipped_lanes, list): for lane in skipped_lanes: if not isinstance(lane, dict): continue provider_name = str(lane.get("provider_name") or "").strip() if provider_name and lane.get("action_required") is True: return provider_name for lane in skipped_lanes: if isinstance(lane, dict): provider_name = str(lane.get("provider_name") or "").strip() if provider_name: return provider_name policy_order = response.get("policy_order") if isinstance(policy_order, list): for item in policy_order: if not isinstance(item, dict): continue if item.get("runtime") == "ollama": provider_name = str(item.get("provider_name") or "").strip() if provider_name: return provider_name return None async def _latest_ai_route_repair_evidence( *, project_id: str = "awoooi", target_provider: str | None = None, ) -> dict[str, Any] | None: """Fetch the newest AI route repair diagnosis stored in AwoooP event DB.""" params: dict[str, Any] = { "project_id": project_id, "provider": _AI_ROUTE_REPAIR_EVIDENCE_PROVIDER, "stage": _AI_ROUTE_REPAIR_EVIDENCE_STAGE, } target_clause = "" if target_provider: target_clause = """ AND 
async def _fetch_latest_ai_route_repair_evidence(
    *,
    params: dict[str, Any],
    target_clause: str,
) -> dict[str, Any] | None:
    """Query the newest matching conversation event and project it.

    Args:
        params: Bind parameters for the statement (``project_id``,
            ``provider``, ``stage`` and whatever ``target_clause`` binds).
        target_clause: Extra ``AND ...`` SQL fragment, or an empty string. It
            is spliced into the statement text verbatim, so it must be a
            trusted fragment that only references bind parameters.

    Returns:
        The projected evidence dict for the newest row, or ``None`` when no
        row matches.
    """
    statement = text(f"""
        SELECT
            event_id,
            run_id,
            provider_event_id,
            source_envelope,
            provider_ts,
            received_at
        FROM awooop_conversation_event
        WHERE project_id = :project_id
          AND LOWER(COALESCE(
                NULLIF(source_envelope->>'provider', ''),
                NULLIF(split_part(provider_event_id, ':', 1), ''),
                channel_type
              )) = :provider
          AND LOWER(COALESCE(NULLIF(source_envelope->>'stage', ''), 'received')) = :stage
          {target_clause}
        ORDER BY received_at DESC, event_id DESC
        LIMIT 1
    """)
    async with get_db_context("awoooi") as session:
        outcome = await session.execute(statement, params)
        newest = outcome.mappings().first()
        if not newest:
            return None
        return _ai_route_repair_evidence_item(newest)
payload.get("schema_version") or envelope.get("schema_version") or "ai_route_repair_evidence_projection_v1" ), "provider": ( envelope.get("provider") or str(row.get("provider_event_id") or "").split(":", 1)[0] ), "stage": envelope.get("stage") or _AI_ROUTE_REPAIR_EVIDENCE_STAGE, "provider_event_id": row.get("provider_event_id"), "conversation_event_id": _string_or_none(row.get("event_id")), "run_id": _string_or_none(row.get("run_id")), "alertname": log_correlation.get("alertname"), "severity": log_correlation.get("severity"), "fingerprint": log_correlation.get("fingerprint"), "target_resource": ( log_correlation.get("target_resource") or payload.get("target_resource") or observed_state.get("target_resource") ), "observed_state": observed_state, "live_probe": live_probe, "access_blockers": _as_string_list(payload.get("access_blockers")), "side_effects": side_effects, "source_ref_count": _source_ref_count(envelope), "provider_ts": row.get("provider_ts"), "received_at": row.get("received_at"), } evidence["work_item"] = _ai_route_repair_work_item(evidence) evidence["playbook_recommendation"] = _ai_route_repair_playbook_recommendation( evidence ) evidence["owner_action"] = _ai_route_repair_owner_action(evidence) return evidence def _ai_route_repair_side_effects(value: Any) -> dict[str, bool | None]: raw = _as_dict(value) return { "incident_created": _bool_or_none(raw.get("incident_created")), "telegram_sent": _bool_or_none(raw.get("telegram_sent")), "approval_created": _bool_or_none(raw.get("approval_created")), "runtime_route_changed": _bool_or_none(raw.get("runtime_route_changed")), } def _as_string_list(value: Any) -> list[str]: if isinstance(value, list): return [str(item) for item in value if str(item or "").strip()] if value not in (None, ""): return [str(value)] return [] def _string_or_none(value: Any) -> str | None: if value in (None, ""): return None return str(value) def _bool_or_none(value: Any) -> bool | None: return value if isinstance(value, bool) else 
def _ai_route_repair_work_item(evidence: Mapping[str, Any]) -> dict[str, Any]:
    """Project repair evidence into a read-only work-item description.

    The item is ``open`` (and needs a human) when the evidence lists any
    access blockers, otherwise it is ``watching``. The id is derived from the
    evidence's ``target_resource``, falling back to ``"unknown"``.
    """
    resource = str(evidence.get("target_resource") or "unknown").strip()
    has_blockers = bool(_as_string_list(evidence.get("access_blockers")))

    if has_blockers:
        item_status = "open"
        next_step = "restore_primary_ollama_lane_access"
        reason = "primary_lane_unavailable"
    else:
        item_status = "watching"
        next_step = "continue_route_monitoring"
        reason = "primary_lane_observed"

    return {
        "schema_version": "awooop_ai_route_repair_work_item_v1",
        "work_item_id": f"ai-route-repair:{resource or 'unknown'}",
        "status": item_status,
        "kind": "ai_route_primary_lane_repair",
        "next_step": next_step,
        "reason": reason,
        "needs_human": has_blockers,
        "owner": "cloud_sre_operator",
        # Blank only when the stored value was whitespace; a missing value has
        # already become "unknown" above.
        "target_resource": resource or None,
        "target_href": "/awooop/runs",
        "decision_effect": "none",
        "safety_level": "read_only_work_item_projection",
        "writes_incident_state": False,
        "writes_auto_repair_result": False,
        "writes_runtime_route": False,
    }
def _ai_route_lane_state(
    *,
    policy_order: list[dict[str, Any]],
    selected_provider: str | None,
    health: dict[str, dict[str, Any]],
) -> dict[str, Any]:
    """Describe failover lane state separately from the policy labels.

    The active lane is the first ``policy_order`` entry whose
    ``provider_name`` equals ``selected_provider``. Ollama entries ranked
    before it are reported as skipped lanes.

    Returns:
        A dict with ``lane_mode`` (``unavailable`` / ``cloud_fallback`` /
        ``degraded_failover`` / ``primary``), ``active_lane``,
        ``skipped_lanes`` and ``operator_action``.
    """
    active_position: int | None = None
    for position, entry in enumerate(policy_order):
        if entry.get("provider_name") == selected_provider:
            active_position = position
            break

    if active_position is None:
        active_entry = None
        bypassed: list[dict[str, Any]] = []
    else:
        active_entry = policy_order[active_position]
        bypassed = policy_order[:active_position]

    skipped_lanes = [
        _ai_route_lane_item(entry, health.get(str(entry.get("provider_name"))))
        for entry in bypassed
        if entry.get("runtime") == "ollama"
    ]

    # (lane_mode, human_required, action, reason) — the first match wins.
    if not selected_provider or active_entry is None:
        verdict = ("unavailable", True, "inspect_ai_router", "no_active_provider")
    elif active_entry.get("runtime") == "cloud":
        verdict = (
            "cloud_fallback",
            True,
            "restore_ollama_lanes",
            "all_ollama_lanes_unavailable",
        )
    elif skipped_lanes:
        verdict = (
            "degraded_failover",
            True,
            "repair_skipped_primary_lane",
            "fallback_lane_active",
        )
    else:
        verdict = ("primary", False, "monitor", "primary_lane_active")
    lane_mode, human_required, action, reason = verdict

    active_lane = None
    if active_entry:
        active_lane = _ai_route_lane_item(
            active_entry,
            health.get(str(active_entry.get("provider_name"))),
        )

    return {
        "lane_mode": lane_mode,
        "active_lane": active_lane,
        "skipped_lanes": skipped_lanes,
        "operator_action": {
            "human_required": human_required,
            "action": action,
            "reason": reason,
        },
    }
_ai_route_runtime_endpoint_item( endpoint: OllamaEndpoint, *, priority: int, ) -> dict[str, Any]: return { "priority": priority, "provider_name": endpoint.provider_name, "url": endpoint.url or None, "model": endpoint.model, "runtime": "ollama" if endpoint.provider_name.startswith("ollama") else "cloud", } def _ai_route_health_map(route: OllamaRoutingResult) -> dict[str, dict[str, Any]]: """Convert failover health reports into provider keyed status for the UI.""" health: dict[str, dict[str, Any]] = { "ollama_gcp_a": _ai_route_health_item(route.health_gcp_a), } if route.health_gcp_b: health["ollama_gcp_b"] = _ai_route_health_item(route.health_gcp_b) else: health["ollama_gcp_b"] = _ai_route_not_checked_health_item() if route.health_local: health["ollama_local"] = _ai_route_health_item(route.health_local) else: health["ollama_local"] = _ai_route_not_checked_health_item() return health def _ai_route_health_item(report: HealthReport) -> dict[str, Any]: payload = report.to_dict() payload["checked"] = True return payload def _ai_route_not_checked_health_item() -> dict[str, Any]: return { "status": "not_checked", "host": "", "latency_ms": None, "reason": "standby_not_checked_primary_healthy", "checked_at": None, "from_cache": False, "checked": False, } def _timeline_item( *, ts: Any, kind: str, title: str, status: str, summary: str | None = None, metadata: dict[str, Any] | None = None, ) -> dict[str, Any]: """Build one Operator Console timeline item.""" return { "ts": ts, "kind": kind, "title": title, "status": status, "summary": summary, "metadata": metadata or {}, } def _utc_now_naive() -> datetime: """回傳與 AwoooP timestamp-without-timezone 欄位相容的 UTC 時間。""" return datetime.now(UTC).replace(tzinfo=None) def _truncate_step_summary(value: str | None) -> str | None: """壓縮 Step summary,避免超過 DB 欄位與前端 timeline 需要的短摘要。""" if not value: return None compact = " ".join(str(value).split()) if len(compact) <= _MAX_STEP_SUMMARY_CHARS: return compact return f"{compact[: 
_MAX_STEP_SUMMARY_CHARS - 1]}…" def _approval_step_title(tool_name: str, step_seq: int) -> str: """將 operator_console.* step 轉成人能一眼理解的 timeline 標題。""" if tool_name == "operator_console.approve": return f"人工審批 {step_seq}: 核准" if tool_name == "operator_console.reject": return f"人工審批 {step_seq}: 拒絕" return f"Step {step_seq}: {tool_name}" def _outbound_timeline_title( channel_type: str, message_type: str, content_preview: str | None, callback_reply: dict[str, Any] | None = None, ) -> str: """將 legacy Telegram outbound 分類成 Operator 看得懂的語義標題。""" channel = channel_type.upper() preview = content_preview or "" if callback_reply: action = str(callback_reply.get("action") or "").strip() status = str(callback_reply.get("status") or "").strip() action_label = { "detail": "詳情", "history": "歷史", }.get(action, action or "Callback") status_label = { "callback_reply_sent": "已送出", "callback_reply_fallback_sent": " fallback 已送出", "callback_reply_rescue_sent": "救援已送出", "callback_reply_failed": "送出失敗", }.get(status, status or "已記錄") return f"{channel}:{action_label}回覆{status_label}" if "RUNBOOK REVIEW" in preview: return f"{channel}:Runbook 待人工審核" if "[AWOOOI CI/CD]" in preview or "AWOOOI CI/CD" in preview: return f"{channel}:CI/CD 狀態通知" if "AI 治理警報" in preview: return f"{channel}:AI 治理警報" if "HANDOFF REQUIRED" in preview or "AI 自動修復失敗" in preview: return f"{channel}:AI 自動修復失敗,已轉人工" if "AUTO RESOLVED" in preview or "AI 自動修復完成" in preview: return f"{channel}:AI 自動修復完成" if "ESCALATION" in preview or "事故升級" in preview: return f"{channel}:事故升級通知" if "ACTION REQUIRED" in preview: return f"{channel}:告警審批卡" fallback = { "approval_request": "人工審批請求", "error": "錯誤回覆", "final": "處置結果", "interim": "漸進式狀態回饋", }.get(message_type, message_type) return f"{channel}:{fallback}" def _outbound_callback_reply(source_envelope: Any) -> dict[str, Any] | None: """Extract Telegram callback reply evidence from outbound source envelope.""" if not isinstance(source_envelope, dict): return None callback_reply = 
source_envelope.get("callback_reply") return callback_reply if isinstance(callback_reply, dict) else None def _callback_reply_public_status(callback_reply: dict[str, Any]) -> str: """Map raw Telegram callback reply result into the Operator Console filter value.""" raw_status = str(callback_reply.get("status") or "") return { "callback_reply_sent": "sent", "callback_reply_fallback_sent": "fallback_sent", "callback_reply_rescue_sent": "rescue_sent", "callback_reply_failed": "failed", }.get(raw_status, "observed") def _callback_reply_evidence_capture_status( *, callback_reply: Mapping[str, Any], persisted_awooop_status_chain: dict[str, Any] | None, persisted_km_stale_completion_summary: dict[str, Any] | None, event_at: Any, ) -> dict[str, Any]: """Explain whether callback-time evidence snapshots were persisted.""" captured: list[str] = [] missing: list[str] = [] if persisted_awooop_status_chain: captured.append("awooop_status_chain") else: missing.append("awooop_status_chain") if persisted_km_stale_completion_summary: captured.append("km_stale_completion_summary") else: missing.append("km_stale_completion_summary") if not missing: status_value = "captured" reason = "ok" next_action = "none" elif captured: status_value = "partial" reason = "partial_snapshot_rollout_transition" next_action = "press_telegram_detail_or_history_after_rollout" else: status_value = "not_captured" raw_status = str(callback_reply.get("status") or "") reason = ( "callback_reply_delivery_failed_snapshot_missing" if raw_status == "callback_reply_failed" else "legacy_callback_before_snapshot_rollout" ) next_action = "press_telegram_detail_or_history_after_rollout" return { "schema_version": _CALLBACK_EVIDENCE_CAPTURE_STATUS_SCHEMA_VERSION, "status": status_value, "reason": reason, "action": str(callback_reply.get("action") or "").strip() or None, "captured": captured, "missing": missing, "snapshot_rollout": "t167_t169", "next_action": next_action, "event_at": event_at, } def 
def _callback_reply_event_item(row: Mapping[str, Any]) -> dict[str, Any]:
    """Convert one callback reply outbound row into a read-only evidence item.

    Args:
        row: Mapping with the outbound message columns plus the
            ``callback_reply`` object and the two optional persisted snapshots
            (``persisted_awooop_status_chain``,
            ``persisted_km_stale_completion_summary``).

    Returns:
        A dict carrying the row fields, the public callback status, the
        evidence capture status and a ``run_detail_href`` (``None`` unless
        both ``run_id`` and ``project_id`` are present).
    """
    callback_reply = _as_dict(row.get("callback_reply"))
    action = str(callback_reply.get("action") or "").strip() or None
    incident_id = str(callback_reply.get("incident_id") or "").strip() or None
    project_id = str(row.get("project_id") or "")
    run_id = row.get("run_id")
    status_value = _callback_reply_public_status(callback_reply)
    event_at = row.get("sent_at") or row.get("queued_at")
    # Empty snapshots are normalized to None so "not captured" is unambiguous.
    persisted_awooop_status_chain = _as_dict(
        row.get("persisted_awooop_status_chain"),
    ) or None
    persisted_km_stale_completion_summary = _as_dict(
        row.get("persisted_km_stale_completion_summary"),
    ) or None

    run_detail_href = None
    if run_id and project_id:
        # Encode the query value like the other href builders in this module,
        # so reserved characters in project_id cannot corrupt the link.
        run_detail_href = (
            f"/awooop/runs/{run_id}?{urlencode({'project_id': project_id})}"
        )

    return {
        "message_id": row.get("message_id"),
        "run_id": run_id,
        "project_id": project_id,
        "status": status_value,
        "needs_human": status_value == "failed",
        "action": action,
        "incident_id": incident_id,
        "event_at": event_at,
        "channel_type": row.get("channel_type"),
        "message_type": row.get("message_type"),
        "send_status": row.get("send_status"),
        "send_error": row.get("send_error"),
        "provider_message_id": row.get("provider_message_id"),
        "triggered_by_state": row.get("triggered_by_state"),
        "content_preview": row.get("content_preview"),
        "run_state": row.get("run_state"),
        "agent_id": row.get("agent_id"),
        "run_created_at": row.get("run_created_at"),
        "callback_reply": callback_reply,
        "persisted_awooop_status_chain": persisted_awooop_status_chain,
        "persisted_km_stale_completion_summary": (
            persisted_km_stale_completion_summary
        ),
        "evidence_capture_status": _callback_reply_evidence_capture_status(
            callback_reply=callback_reply,
            persisted_awooop_status_chain=persisted_awooop_status_chain,
            persisted_km_stale_completion_summary=(
                persisted_km_stale_completion_summary
            ),
            event_at=event_at,
        ),
        "run_detail_href": run_detail_href,
    }
"related_incident_id") or "").strip() != incident_id: continue related_items.append({ "entry_id": _object_field(item, "entry_id"), "title": _object_field(item, "title"), "dispatch_id": _object_field(item, "dispatch_id"), "governance_event_id": _object_field(item, "governance_event_id"), "readiness": _object_field(item, "readiness"), "workflow_stage": _object_field(item, "workflow_stage"), "next_action": _object_field(item, "next_action"), "priority_tier": _object_field(item, "priority_tier"), "recommended_completion_outcome": _object_field( item, "recommended_completion_outcome", ), "can_preview": bool(_object_field(item, "can_preview", False)), }) total = _object_int_field(queue, "total") returned = _object_int_field(queue, "returned") return { "schema_version": _KM_STALE_COMPLETION_CALLBACK_SCHEMA_VERSION, "project_id": project_id, "incident_id": incident_id, "status": "matched_owner_review" if related_items else "no_related_owner_review", "missing_reason": None if related_items else "no_matching_completion_item", "total": total, "returned": returned, "pending_count": _object_int_field(queue, "pending_count"), "ready_count": _object_int_field(queue, "ready_count"), "blocked_count": _object_int_field(queue, "blocked_count"), "completed_count": _object_int_field(queue, "completed_count"), "failed_count": _object_int_field(queue, "failed_count"), "writes_on_read": bool(_object_field(queue, "writes_on_read", False)), "manual_review_required": bool( _object_field(queue, "manual_review_required", True) ), "batch_writes_allowed": bool( _object_field(queue, "batch_writes_allowed", False) ), "items_truncated": total > returned, "related_total": len(related_items), "related_items": related_items[:3], "work_item": _km_stale_callback_owner_review_work_item( project_id=project_id, incident_id=incident_id, status_value=( "matched_owner_review" if related_items else "no_related_owner_review" ), reason=None if related_items else "no_matching_completion_item", ), } def 
_km_stale_callback_owner_review_work_item( *, project_id: str, incident_id: str | None, status_value: str, reason: str | None, ) -> dict[str, Any] | None: """Generate a read-only Work Items link for callback evidence gaps.""" if not incident_id or status_value != "no_related_owner_review": return None work_item_id = f"km-callback-owner-review:{project_id}:{incident_id}" target_query = urlencode( { "project_id": project_id, "incident_id": incident_id, "callback_reply_status": "sent", } ) work_item_query = urlencode( { "project_id": project_id, "work_item_id": work_item_id, "incident_id": incident_id, } ) return { "schema_version": "km_stale_callback_owner_review_work_item_v1", "work_item_id": work_item_id, "kind": "km_stale_callback_owner_review", "status": "open", "project_id": project_id, "incident_id": incident_id, "reason": reason or "no_matching_completion_item", "title": "Telegram callback incident has no matching KM owner-review item", "next_step": "review_or_queue_km_owner_review", "target_surface": "awooop_runs_callback_evidence", "target_href": f"/awooop/runs?{target_query}", "work_item_href": f"/awooop/work-items?{work_item_query}", "triage": { "schema_version": "km_stale_callback_owner_review_triage_v1", "flow_stage": "callback_observed_owner_review_link_missing", "ai_lead_agent": "Hermes", "supporting_agents": ["OpenClaw", "ElephantAlpha"], "automation_state": "manual_owner_review_required", "safe_to_auto_repair": False, "blocking_reason": reason or "no_matching_completion_item", "matching_strategy": "related_incident_id_exact_match", "already_done": [ "callback_reply_persisted", "completion_queue_checked", "generated_read_only_work_item", ], "next_actions": [ "review_runs_callback_evidence", "queue_matching_km_stale_candidate", "complete_owner_review_after_owner_approval", ], }, "writes_on_read": False, "manual_review_required": True, "batch_writes_allowed": False, } def _outbound_timeline_status( send_status: str, callback_reply: dict[str, Any] | 
None, ) -> str: """Prefer callback delivery status when the outbound row records one.""" if callback_reply: status = callback_reply.get("status") if isinstance(status, str) and status: return status return send_status def _outbound_timeline_summary( *, content_preview: str | None, send_error: str | None, callback_reply: dict[str, Any] | None, ) -> str | None: """Summarize callback reply state without forcing operators to inspect raw JSON.""" if not callback_reply: return content_preview or send_error parts = [ f"callback={callback_reply.get('action') or '--'}", f"incident={callback_reply.get('incident_id') or '--'}", f"status={callback_reply.get('status') or '--'}", ] parse_mode = callback_reply.get("parse_mode") if parse_mode: parts.append(f"parse_mode={parse_mode}") error = callback_reply.get("error") if error: parts.append(f"error={error}") if content_preview: parts.append(str(content_preview)) return " · ".join(parts) def _outbound_timeline_metadata( row: AwoooPOutboundMessage, callback_reply: dict[str, Any] | None, ) -> dict[str, Any]: """Build compact outbound metadata with callback fields first when present.""" metadata: dict[str, Any] = {} if callback_reply: metadata.update({ "callback_status": callback_reply.get("status"), "callback_action": callback_reply.get("action"), "callback_incident_id": callback_reply.get("incident_id"), "callback_parse_mode": callback_reply.get("parse_mode"), }) metadata.update({ "message_type": row.message_type, "provider_message_id": row.provider_message_id, "triggered_by_state": row.triggered_by_state, }) return metadata def _validate_cicd_stage_filter(value: str | None) -> str | None: """Normalize a CI/CD stage filter without allowing arbitrary SQL fragments.""" if value is None: return None stage = value.strip().lower() if not stage: return None if not _CICD_STAGE_RE.fullmatch(stage): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail="stage 格式錯誤,僅允許 a-z、0-9、底線、冒號與短橫線", ) return stage def 
_validate_cicd_status_filter(value: str | None) -> str | None: """Normalize and validate CI/CD status filter.""" if value is None: return None status_value = value.strip().lower() if not status_value: return None if status_value not in _CICD_STATUS_FILTERS: allowed = ", ".join(sorted(_CICD_STATUS_FILTERS)) raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, detail=f"status 必須是: {allowed}", ) return status_value def _cicd_duration_seconds(value: Any) -> int: """Coerce Alertmanager duration_seconds label into a non-negative integer.""" try: duration = int(str(value or "0")) except (TypeError, ValueError): return 0 return max(duration, 0) def _cicd_event_needs_attention(status_value: str | None, severity: str | None) -> bool: """Return whether a CI/CD evidence row should be highlighted for operators.""" normalized_status = str(status_value or "").lower() normalized_severity = str(severity or "").lower() return normalized_status in {"failed", "pending"} or normalized_severity in { "critical", "warning", } def _cicd_event_item_from_row(row: Mapping[str, Any], *, project_id: str) -> dict[str, Any]: """Convert one alert_operation_log CI/CD row into an operator-facing item.""" context = _as_dict(row.get("context")) labels = _as_dict(context.get("labels")) annotations = _as_dict(context.get("annotations")) status_value = str(row.get("status") or labels.get("status") or "").lower() or None severity = str(row.get("severity") or labels.get("severity") or "").lower() or None summary = row.get("summary") or annotations.get("summary") description = row.get("description") or annotations.get("description") workflow_url = row.get("workflow_url") or annotations.get("workflow_url") return { "id": str(row.get("id") or ""), "project_id": project_id, "alertname": str(row.get("alertname") or labels.get("alertname") or ""), "stage": row.get("stage") or labels.get("stage"), "status": status_value, "severity": severity, "commit_sha": row.get("commit_sha") or 
labels.get("commit"), "triggered_by": row.get("triggered_by") or labels.get("triggered_by"), "duration_seconds": _cicd_duration_seconds( row.get("duration_seconds") or labels.get("duration_seconds") ), "summary": str(summary).strip() if summary else None, "description": str(description).strip() if description else None, "workflow_url": str(workflow_url).strip() if workflow_url else None, "alert_id": row.get("alert_id") or context.get("alert_id"), "source": row.get("source") or context.get("source"), "action_detail": row.get("action_detail"), "needs_attention": _cicd_event_needs_attention(status_value, severity), "created_at": row.get("created_at"), } def _run_callback_reply_summary( outbound_messages: list[AwoooPOutboundMessage], ) -> dict[str, Any]: """Summarize Telegram detail/history callback reply delivery for Run List.""" callback_rows: list[tuple[AwoooPOutboundMessage, dict[str, Any]]] = [] for row in outbound_messages: callback_reply = _outbound_callback_reply(row.source_envelope) if callback_reply: callback_rows.append((row, callback_reply)) if not callback_rows: return { "schema_version": "awooop_run_callback_reply_summary_v1", "status": "no_callback", "total": 0, "sent": 0, "fallback_sent": 0, "rescue_sent": 0, "failed": 0, "needs_human": False, "latest_status": None, "latest_action": None, "latest_incident_id": None, "latest_at": None, "latest_provider_message_id": None, "capture_status": "no_callback", "capture_captured": 0, "capture_partial": 0, "capture_not_captured": 0, "latest_capture_status": None, "latest_capture_missing": [], "latest_capture_next_action": None, } sorted_rows = sorted( callback_rows, key=lambda item: str(item[0].sent_at or item[0].queued_at or ""), reverse=True, ) latest_row, latest_callback = sorted_rows[0] statuses = [ str(callback.get("status") or "") for _, callback in sorted_rows ] failed = statuses.count("callback_reply_failed") latest_status = str(latest_callback.get("status") or "") summary_status = 
_callback_reply_public_status(latest_callback) capture_rows = [ _callback_reply_capture_status_from_outbound(row, callback) for row, callback in sorted_rows ] capture_statuses = [ str(capture.get("status") or "observed") for capture in capture_rows ] capture_not_captured = capture_statuses.count("not_captured") capture_partial = capture_statuses.count("partial") capture_captured = capture_statuses.count("captured") latest_capture = capture_rows[0] if capture_rows else {} if capture_not_captured > 0: capture_status = "not_captured" elif capture_partial > 0: capture_status = "partial" elif capture_captured > 0 and capture_captured == len(capture_rows): capture_status = "captured" else: capture_status = "observed" return { "schema_version": "awooop_run_callback_reply_summary_v1", "status": summary_status, "total": len(sorted_rows), "sent": statuses.count("callback_reply_sent"), "fallback_sent": statuses.count("callback_reply_fallback_sent"), "rescue_sent": statuses.count("callback_reply_rescue_sent"), "failed": failed, "needs_human": failed > 0 or latest_status == "callback_reply_failed", "latest_status": latest_status or None, "latest_action": latest_callback.get("action"), "latest_incident_id": latest_callback.get("incident_id"), "latest_at": latest_row.sent_at or latest_row.queued_at, "latest_provider_message_id": latest_row.provider_message_id, "capture_status": capture_status, "capture_captured": capture_captured, "capture_partial": capture_partial, "capture_not_captured": capture_not_captured, "latest_capture_status": latest_capture.get("status"), "latest_capture_missing": latest_capture.get("missing") or [], "latest_capture_next_action": latest_capture.get("next_action"), } def _mcp_gateway_summary_row(row: AwoooPMcpGatewayAudit) -> dict[str, Any]: """Convert SQLAlchemy audit rows into the truth-chain summary shape.""" return { "agent_id": row.agent_id, "tool_name": row.tool_name, "result_status": row.result_status, "block_gate": row.block_gate, "gate_result": 
row.gate_result or {}, } def _as_dict(value: Any) -> dict[str, Any]: """Return dict payloads defensively; DB JSON fields may be null or stale.""" return value if isinstance(value, dict) else {} def _append_unique(values: list[str], candidate: Any) -> None: """Append non-empty string once while preserving discovery order.""" text_value = str(candidate or "").strip() if text_value and text_value not in values: values.append(text_value) def _append_incident_ids_from_text(values: list[str], text_value: Any) -> None: """Extract incident ids from legacy text payloads.""" if not text_value: return for incident_id in _INCIDENT_ID_RE.findall(str(text_value)): _append_unique(values, incident_id) def _append_incident_ids_from_source_envelope(values: list[str], envelope: Any) -> None: """Extract incident ids from AwoooP channel event source_refs.""" source_refs = _as_dict(_as_dict(envelope).get("source_refs")) incident_ids = source_refs.get("incident_ids") if isinstance(incident_ids, list): for incident_id in incident_ids: _append_unique(values, incident_id) else: _append_unique(values, incident_ids) def _collect_run_incident_ids( *, run: AwoooPRunState, inbound_events: list[AwoooPConversationEvent], outbound_messages: list[AwoooPOutboundMessage], ) -> list[str]: """Collect incident ids that tie a Run back to legacy incident evidence.""" incident_ids: list[str] = [] _append_incident_ids_from_text(incident_ids, run.trigger_ref) _append_incident_ids_from_text(incident_ids, run.error_detail) for event in inbound_events: _append_incident_ids_from_source_envelope(incident_ids, event.source_envelope) _append_incident_ids_from_text(incident_ids, event.content_preview) _append_incident_ids_from_text(incident_ids, event.content_redacted) for message in outbound_messages: _append_incident_ids_from_source_envelope(incident_ids, message.source_envelope) _append_incident_ids_from_text(incident_ids, message.content_preview) _append_incident_ids_from_text(incident_ids, message.send_error) 
async def _load_run_message_context(
    db: Any,
    runs: list[AwoooPRunState],
    *,
    limit: int = _MAX_LIST_CONTEXT_ROWS,
) -> tuple[
    dict[UUID, list[AwoooPConversationEvent]],
    dict[UUID, list[AwoooPOutboundMessage]],
]:
    """Load inbound events and outbound messages for a page of runs.

    Inbound events are matched by ``run_id``, by ``provider_event_id`` equal
    to a run's ``trigger_ref``, or by ``event_id`` equal to a ``trigger_ref``
    that parses as a UUID.  Outbound messages are matched by ``run_id`` only.
    Each query returns at most ``limit`` rows in total (not per run), newest
    first.  Both results are grouped by run id.
    """
    if not runs:
        return {}, {}

    run_ids: list[Any] = []
    trigger_refs: list[str] = []
    run_by_trigger_ref: dict[str, Any] = {}
    for run in runs:
        run_ids.append(run.run_id)
        if run.trigger_ref:
            ref = str(run.trigger_ref)
            trigger_refs.append(ref)
            run_by_trigger_ref[ref] = run.run_id
    known_run_ids = set(run_ids)

    # Only trigger refs that parse as UUIDs can be compared against event_id.
    trigger_event_ids: list[UUID] = []
    for ref in trigger_refs:
        try:
            parsed = uuid.UUID(ref)
        except ValueError:
            continue
        trigger_event_ids.append(parsed)

    inbound_filters = [AwoooPConversationEvent.run_id.in_(run_ids)]
    if trigger_refs:
        inbound_filters.append(AwoooPConversationEvent.provider_event_id.in_(trigger_refs))
    if trigger_event_ids:
        inbound_filters.append(AwoooPConversationEvent.event_id.in_(trigger_event_ids))

    inbound_query = (
        select(AwoooPConversationEvent)
        .where(sa_or(*inbound_filters))
        .order_by(AwoooPConversationEvent.received_at.desc())
        .limit(limit)
    )
    inbound_result = await db.execute(inbound_query)

    inbound_by_run: dict[UUID, list[AwoooPConversationEvent]] = {}
    for event in inbound_result.scalars().all():
        # Prefer the event's own run id; otherwise resolve through the
        # trigger reference, first by provider event id and then by event id.
        owner = event.run_id if event.run_id in known_run_ids else None
        for alias in (event.provider_event_id, event.event_id):
            if owner is not None:
                break
            owner = run_by_trigger_ref.get(str(alias))
        if owner is not None:
            inbound_by_run.setdefault(owner, []).append(event)

    outbound_query = (
        select(AwoooPOutboundMessage)
        .where(AwoooPOutboundMessage.run_id.in_(run_ids))
        .order_by(AwoooPOutboundMessage.queued_at.desc())
        .limit(limit)
    )
    outbound_result = await db.execute(outbound_query)

    outbound_by_run: dict[UUID, list[AwoooPOutboundMessage]] = {}
    for message in outbound_result.scalars().all():
        outbound_by_run.setdefault(message.run_id, []).append(message)

    return inbound_by_run, outbound_by_run
dict(outbound_by_run) def _list_filter_context_limit(candidate_count: int) -> int: return min(max(candidate_count * 4, _MAX_LIST_CONTEXT_ROWS), 20_000) def _route_label_from_remediation(item: dict[str, Any]) -> str: """Render remediation MCP route consistently with Telegram / Work Items.""" return "/".join( str(part) for part in ( item.get("agent_id"), item.get("tool_name"), item.get("required_scope"), ) if part ) or "--" def _route_label_from_legacy_mcp(record: dict[str, Any]) -> str: """Render self-built/legacy MCP evidence as agent/tool/scope for list UX.""" tool = record.get("tool_name") server = record.get("mcp_server") tool_label = ".".join(str(part) for part in (server, tool) if part) or tool return "/".join( str(part) for part in ( record.get("agent_role"), tool_label, "read", ) if part ) or "--" def _remediation_timeline_status(item: dict[str, Any]) -> str: if item.get("success") is False or item.get("allowed") is False: return "failed" if item.get("verification_result_preview") == "success": return "success" return "warning" def _remediation_timeline_summary(item: dict[str, Any]) -> str: return ( f"incident={item.get('incident_id') or '--'} " f"mode={item.get('mode') or '--'} " f"preview={item.get('verification_result_preview') or '--'} " f"route={_route_label_from_remediation(item)} " f"writes_incident={item.get('writes_incident_state')} " f"writes_auto_repair={item.get('writes_auto_repair_result')}" )[:500] def _legacy_mcp_timeline_status(record: dict[str, Any]) -> str: if record.get("success") is True: return "success" if record.get("success") is False: return "failed" return "warning" def _legacy_mcp_timeline_summary(record: dict[str, Any]) -> str: return ( f"incident={record.get('incident_id') or '--'} " f"agent={record.get('agent_role') or '--'} " f"node={record.get('flywheel_node') or '--'} " f"duration_ms={record.get('duration_ms') if record.get('duration_ms') is not None else '--'} " f"error={record.get('error_message') or '--'}" )[:500] def 
_run_remediation_list_summary( *, run: AwoooPRunState, incident_ids: list[str], items: list[dict[str, Any]], legacy_mcp_records: list[dict[str, Any]] | None = None, errors: list[dict[str, str]] | None = None, ) -> dict[str, Any]: """Summarize durable ADR-100 dry-run and MCP investigation evidence for list UX.""" sorted_items = sorted( (item for item in items if isinstance(item, dict)), key=lambda item: str(item.get("created_at") or ""), reverse=True, ) sorted_mcp_records = sorted( (record for record in (legacy_mcp_records or []) if isinstance(record, dict)), key=lambda record: str(record.get("created_at") or ""), reverse=True, ) latest = sorted_items[0] if sorted_items else {} latest_mcp = sorted_mcp_records[0] if sorted_mcp_records else {} writes_incident = latest.get("writes_incident_state") writes_auto_repair = latest.get("writes_auto_repair_result") route = ( _route_label_from_remediation(latest) if latest else _route_label_from_legacy_mcp(latest_mcp) if latest_mcp else "--" ) write_observed = writes_incident is True or writes_auto_repair is True is_read_only = ( bool(latest) and latest.get("required_scope") == "read" and writes_incident is False and writes_auto_repair is False ) mcp_total = len(sorted_mcp_records) mcp_success = sum(1 for record in sorted_mcp_records if record.get("success") is True) mcp_failed = sum(1 for record in sorted_mcp_records if record.get("success") is False) if not sorted_items: status_value = "mcp_observed" if mcp_total > 0 else "no_evidence" elif latest.get("success") is False or latest.get("allowed") is False: status_value = "blocked" elif write_observed: status_value = "write_observed" elif is_read_only: status_value = "read_only_dry_run" else: status_value = "observed" return { "schema_version": "awooop_run_remediation_summary_v1", "source": "alert_operation_log" if sorted_items else "mcp_audit_log" if mcp_total > 0 else "none", "incident_ids": incident_ids, "total": len(sorted_items), "evidence_total": len(sorted_items) + 
mcp_total, "status": status_value, "has_dry_run": bool(sorted_items), "has_mcp_investigation": mcp_total > 0, "is_read_only": is_read_only, "human_gate_open": run.state == "waiting_approval", "latest_at": latest.get("created_at"), "latest_preview": latest.get("verification_result_preview"), "latest_mode": latest.get("mode"), "latest_route": route, "latest_agent_id": latest.get("agent_id") or latest_mcp.get("agent_role"), "latest_tool_name": latest.get("tool_name") or latest_mcp.get("tool_name"), "latest_required_scope": latest.get("required_scope") or ("read" if latest_mcp else None), "writes_incident_state": writes_incident, "writes_auto_repair_result": writes_auto_repair, "mcp_observation_total": mcp_total, "mcp_observation_success": mcp_success, "mcp_observation_failed": mcp_failed, "latest_mcp_server": latest_mcp.get("mcp_server"), "errors": errors or [], } def _safe_int(value: Any) -> int: try: return int(value or 0) except (TypeError, ValueError): return 0 def _latest_remediation_history_item( history: dict[str, Any] | None, ) -> dict[str, Any]: if not isinstance(history, dict): return {} items = history.get("items") if isinstance(history.get("items"), list) else [] latest = items[0] if items and isinstance(items[0], dict) else {} return latest def _remediation_evidence_state(history: dict[str, Any] | None) -> str: """Classify ADR-100 evidence with the same operator semantics as Telegram.""" if not isinstance(history, dict): return "missing" total = _safe_int(history.get("total")) if total <= 0: if history.get("status") == "fetch_failed": return "fetch_failed" return "missing" latest = _latest_remediation_history_item(history) if latest.get("writes_incident_state") or latest.get("writes_auto_repair_result"): return "write_observed" if latest.get("allowed") is False or latest.get("success") is False: return "blocked" if ( str(latest.get("safety_level") or "").lower() == "read_only" or str(latest.get("required_scope") or "").lower() == "read" ): return 
"read_only" return "observed" def _select_status_chain_source_id( incident_ids: list[str], remediation_history: dict[str, Any] | None, ) -> str | None: latest_incident_id = str( _latest_remediation_history_item(remediation_history).get("incident_id") or "" ).strip() if latest_incident_id and latest_incident_id in incident_ids: return latest_incident_id return incident_ids[0] if incident_ids else latest_incident_id or None def _status_chain_mcp_section(truth_chain: dict[str, Any] | None) -> dict[str, Any]: mcp = truth_chain.get("mcp") if isinstance(truth_chain, dict) else {} if not isinstance(mcp, dict): mcp = {} gateway = mcp.get("awooop_gateway") if isinstance(mcp.get("awooop_gateway"), dict) else {} legacy = mcp.get("legacy") if isinstance(mcp.get("legacy"), dict) else {} top_tools: list[dict[str, Any]] = [] seen_tools: set[str] = set() for source, summary in (("gateway", gateway), ("legacy", legacy)): by_tool = summary.get("by_tool") if isinstance(summary, dict) else [] if not isinstance(by_tool, list): continue for item in by_tool: if not isinstance(item, dict): continue tool_name = str(item.get("tool_name") or "unknown").strip() or "unknown" key = f"{source}:{tool_name}" if key in seen_tools: continue seen_tools.add(key) top_tools.append({ "source": source, "tool_name": tool_name, "total": ( _safe_int(item.get("total")) or _safe_int(item.get("success")) + _safe_int(item.get("failed")) + _safe_int(item.get("blocked")) ), "success": _safe_int(item.get("success")), "failed": _safe_int(item.get("failed")), "blocked": _safe_int(item.get("blocked")), "last_error": item.get("last_error"), }) if len(top_tools) >= 5: break if len(top_tools) >= 5: break return { "gateway": { "total": _safe_int(gateway.get("total")), "success": _safe_int(gateway.get("success")), "failed": _safe_int(gateway.get("failed")), "blocked": _safe_int(gateway.get("blocked")), "first_class_total": _safe_int(gateway.get("first_class_total")), "legacy_bridge_total": 
_safe_int(gateway.get("legacy_bridge_total")), "policy_enforced_total": _safe_int(gateway.get("policy_enforced_total")), "stage": gateway.get("stage"), "stage_status": gateway.get("stage_status"), }, "legacy": { "total": _safe_int(legacy.get("total")), "success": _safe_int(legacy.get("success")), "failed": _safe_int(legacy.get("failed")), }, "top_tools": top_tools, } def _first_non_empty(row: Mapping[str, Any], keys: tuple[str, ...]) -> Any: for key in keys: value = row.get(key) if value not in (None, ""): return value return None def _status_chain_execution_section(truth_chain: dict[str, Any] | None) -> dict[str, Any]: execution = truth_chain.get("execution") if isinstance(truth_chain, dict) else {} if not isinstance(execution, dict): execution = {} ops = execution.get("automation_operation_log") if not isinstance(ops, list): ops = [] latest_op = ops[0] if ops and isinstance(ops[0], dict) else {} playbook_ids: list[str] = [] playbook_paths: list[str] = [] for row in ops: if not isinstance(row, dict): continue _append_unique(playbook_ids, row.get("matched_playbook_id")) _append_unique(playbook_ids, row.get("input_playbook_id")) _append_unique(playbook_ids, row.get("output_playbook_id")) _append_unique(playbook_paths, row.get("input_playbook_path")) _append_unique(playbook_paths, row.get("output_playbook_path")) _append_unique(playbook_paths, row.get("input_ansible_playbook_path")) _append_unique(playbook_paths, row.get("output_ansible_playbook_path")) ansible = execution.get("ansible") if isinstance(execution.get("ansible"), dict) else {} ansible_records = ansible.get("records") if isinstance(ansible.get("records"), list) else [] latest_ansible = ( ansible_records[0] if ansible_records and isinstance(ansible_records[0], dict) else {} ) candidate_catalog = ( ansible.get("candidate_catalog") if isinstance(ansible.get("candidate_catalog"), dict) else {} ) candidates = ( candidate_catalog.get("candidates") if isinstance(candidate_catalog.get("candidates"), list) else 
[] ) return { "operation_total": len(ops), "latest_operation_type": latest_op.get("operation_type"), "latest_status": latest_op.get("status"), "latest_actor": latest_op.get("actor"), "latest_action": _first_non_empty(latest_op, ("input_action", "output_action")), "latest_executor": _first_non_empty( latest_op, ( "input_executor", "output_executor", "input_execution_backend", "output_execution_backend", ), ), "playbook_ids": playbook_ids[:5], "playbook_paths": playbook_paths[:5], "ansible": { "considered": bool(ansible.get("considered")), "record_total": len(ansible_records), "candidate_count": len(candidates), "not_used_reason": ansible.get("not_used_reason"), "latest_operation_type": latest_ansible.get("operation_type"), "latest_status": latest_ansible.get("status"), "latest_playbook_path": latest_ansible.get("playbook_path"), "latest_check_mode": latest_ansible.get("check_mode"), "candidate_playbooks": [ { "catalog_id": item.get("catalog_id"), "playbook_path": item.get("playbook_path"), "risk_level": item.get("risk_level"), "match_score": item.get("match_score"), } for item in candidates[:3] if isinstance(item, dict) ], }, } def _source_ref_values(envelope: Any, key: str) -> list[str]: if not isinstance(envelope, dict): return [] source_refs = envelope.get("source_refs") if not isinstance(source_refs, dict): return [] raw_values = source_refs.get(key) if isinstance(raw_values, list): return [str(item) for item in raw_values if str(item or "").strip()] if raw_values not in (None, ""): return [str(raw_values)] return [] def _source_correlation_empty( incident_ids: list[str], *, status_value: str, missing_reason: str, ) -> dict[str, Any]: return { "schema_version": _SOURCE_CORRELATION_SCHEMA_VERSION, "status": status_value, "missing_reason": missing_reason, "incident_ids": incident_ids, "direct_ref_total": 0, "candidate_total": 0, "applied_link_total": 0, "provider_event_total": 0, "latest_applied_link_at": None, "verification_status": status_value, "providers": { 
provider: { "direct_ref_total": 0, "candidate_total": 0, "applied_link_total": 0, "latest_event_at": None, "latest_heartbeat_at": None, "latest_applied_link_at": None, } for provider in _SOURCE_CORRELATION_PROVIDERS }, "top_candidates": [], "matching_criteria": [ "source_correlation_linked_stage", "direct_source_ref", "fingerprint_overlap", "alertname_overlap", "service_or_namespace_overlap", "severity_overlap", ], } def _normalize_correlation_value(value: Any) -> str: if hasattr(value, "value"): value = value.value return str(value or "").strip().lower() def _append_correlation_term(values: list[str], value: Any) -> None: term = _normalize_correlation_value(value) if term in {"", "--", "n/a", "none", "null", "unknown"}: return if len(term) < 2: return if term not in values: values.append(term) def _intersection(left: list[str], right: list[str]) -> list[str]: right_set = set(right) return [item for item in left if item in right_set] def _as_utc_naive(value: Any) -> datetime | None: if not isinstance(value, datetime): return None if value.tzinfo is not None: return value.astimezone(UTC).replace(tzinfo=None) return value def _iso_or_none(value: Any) -> str | None: if hasattr(value, "isoformat"): return value.isoformat() if value in (None, ""): return None return str(value) def _incident_correlation_context(record: IncidentRecord) -> dict[str, list[str]]: """Build compact incident terms used only for read-only source matching.""" alertnames: list[str] = [] severities: list[str] = [] fingerprints: list[str] = [] namespaces: list[str] = [] targets: list[str] = [] _append_correlation_term(alertnames, record.alertname) _append_correlation_term(severities, record.severity) for service in record.affected_services or []: _append_correlation_term(targets, service) for signal in record.signals or []: if not isinstance(signal, dict): continue _append_correlation_term(alertnames, signal.get("alert_name")) _append_correlation_term(severities, signal.get("severity")) 
_append_correlation_term(fingerprints, signal.get("fingerprint")) labels = _as_dict(signal.get("labels")) annotations = _as_dict(signal.get("annotations")) _append_correlation_term(alertnames, labels.get("alertname")) _append_correlation_term(fingerprints, labels.get("fingerprint")) for key in ( "namespace", "kubernetes_namespace", ): _append_correlation_term(namespaces, labels.get(key)) for key in ( "service", "service_name", "pod", "pod_name", "deployment", "deployment_name", "container", "job", "instance", "target", "target_resource", "workload", "app", "app.kubernetes.io/name", ): _append_correlation_term(targets, labels.get(key)) for key in ("summary", "description"): _append_correlation_term(alertnames, annotations.get(key)) return { "incident_ids": [record.incident_id], "alertnames": alertnames, "severities": severities, "fingerprints": fingerprints, "namespaces": namespaces, "targets": targets, } def _source_event_correlation_context(row: Mapping[str, Any]) -> dict[str, Any]: envelope = _as_dict(row.get("source_envelope")) source_refs = _as_dict(envelope.get("source_refs")) log_correlation = _as_dict(envelope.get("log_correlation")) labels = _as_dict(envelope.get("labels")) annotations = _as_dict(envelope.get("annotations")) alertnames: list[str] = [] severities: list[str] = [] fingerprints: list[str] = [] namespaces: list[str] = [] targets: list[str] = [] _append_correlation_term(alertnames, log_correlation.get("alertname")) _append_correlation_term(alertnames, labels.get("alertname")) for value in _source_ref_values(envelope, "signoz_alerts"): _append_correlation_term(alertnames, value) _append_correlation_term(severities, log_correlation.get("severity")) _append_correlation_term(severities, labels.get("severity")) _append_correlation_term(fingerprints, log_correlation.get("fingerprint")) _append_correlation_term(fingerprints, labels.get("fingerprint")) for value in _source_ref_values(envelope, "fingerprints"): _append_correlation_term(fingerprints, 
value) for key in ("namespace", "kubernetes_namespace"): _append_correlation_term(namespaces, log_correlation.get(key)) _append_correlation_term(namespaces, labels.get(key)) for key in ( "target_resource", "service", "service_name", "pod", "pod_name", "deployment", "deployment_name", "container", "job", "instance", "target", "workload", "app", "app.kubernetes.io/name", ): _append_correlation_term(targets, log_correlation.get(key)) _append_correlation_term(targets, labels.get(key)) for key in ("summary", "description"): _append_correlation_term(alertnames, annotations.get(key)) return { "provider": str(row.get("provider") or envelope.get("provider") or "").lower(), "stage": str(row.get("stage") or envelope.get("stage") or ""), "provider_event_id": row.get("provider_event_id") or envelope.get("provider_event_id"), "received_at": row.get("received_at"), "source_refs": source_refs, "incident_ids": _source_ref_values(envelope, "incident_ids"), "alertnames": alertnames, "severities": severities, "fingerprints": fingerprints, "namespaces": namespaces, "targets": targets, } def _score_source_correlation_event( incident_context: dict[str, list[str]], event_context: dict[str, Any], ) -> dict[str, Any]: """Return a deterministic, read-only source-match score for UI evidence.""" reasons: list[str] = [] score = 0 is_direct = False if _intersection(incident_context["incident_ids"], event_context["incident_ids"]): is_direct = True score += 100 reasons.append("direct_incident_ref") fingerprint_hits = _intersection( incident_context["fingerprints"], event_context["fingerprints"], ) if fingerprint_hits: is_direct = True score += 80 reasons.append("fingerprint_overlap") if _intersection(incident_context["alertnames"], event_context["alertnames"]): score += 35 reasons.append("alertname_overlap") if _intersection(incident_context["targets"], event_context["targets"]): score += 25 reasons.append("target_overlap") if _intersection(incident_context["namespaces"], 
def _is_source_correlation_applied_link(
    event_context: dict[str, Any],
    scored: dict[str, Any],
) -> bool:
    """Applied source links must be append-only events that still match directly.

    True only when the event's stage is ``source_correlation_linked``
    (case-insensitive) and the score marks the match as direct.
    """
    return (
        str(event_context.get("stage") or "").lower() == "source_correlation_linked"
        and bool(scored.get("is_direct"))
    )


async def _fetch_source_correlation_summary(
    *,
    project_id: str,
    incident_ids: list[str],
) -> dict[str, Any]:
    """Fetch read-only Sentry/SigNoz evidence candidates for incident status-chain.

    Loads the incident records, then the non-heartbeat sentry/signoz events
    inside a bounded time window and the latest heartbeat per provider.  Each
    event is scored against every incident and counted under its best score.

    The returned summary starts from ``_source_correlation_empty`` and ends
    with ``status`` set to ``linked``, ``candidate_found``,
    ``provider_fresh_no_match`` or the initial ``missing``.  Without incident
    ids, or when none of them is found, ``status`` is ``no_incident_context``.
    Only SELECT statements are issued.
    """
    if not incident_ids:
        return _source_correlation_empty(
            incident_ids,
            status_value="no_incident_context",
            missing_reason="no_incident_ids",
        )

    # An empty project id falls back to the literal "awoooi".
    safe_project_id = project_id or "awoooi"
    async with get_db_context(safe_project_id) as db:
        incident_result = await db.execute(
            select(IncidentRecord)
            .where(IncidentRecord.project_id == safe_project_id)
            .where(IncidentRecord.incident_id.in_(incident_ids))
        )
        incident_rows = list(incident_result.scalars().all())
        if not incident_rows:
            # Nothing to correlate against: skip both event queries.
            heartbeat_rows = []
            source_rows = []
        else:
            now = _utc_now_naive()
            created_candidates = [
                value
                for value in (_as_utc_naive(row.created_at) for row in incident_rows)
                if value is not None
            ]
            earliest_created = min(created_candidates) if created_candidates else now
            # Start shortly before the earliest incident, but never further
            # back than the lookback limit.
            window_start = max(
                earliest_created - timedelta(hours=_SOURCE_CORRELATION_PRE_WINDOW_HOURS),
                now - timedelta(days=_SOURCE_CORRELATION_LOOKBACK_DAYS),
            )
            # Provider resolution: envelope field, else the prefix of
            # provider_event_id before ':', else channel_type.  This is a
            # constant fragment; no caller input is interpolated into SQL.
            provider_sql = (
                "LOWER(COALESCE(NULLIF(source_envelope->>'provider', ''), "
                "NULLIF(split_part(provider_event_id, ':', 1), ''), channel_type))"
            )
            source_result = await db.execute(
                text(f"""
                    SELECT
                        event_id::text AS event_id,
                        project_id,
                        channel_type,
                        provider_event_id,
                        content_preview,
                        source_envelope,
                        received_at,
                        {provider_sql} AS provider,
                        LOWER(COALESCE(source_envelope->>'stage', '')) AS stage
                    FROM awooop_conversation_event
                    WHERE project_id = :project_id
                      AND {provider_sql} IN ('sentry', 'signoz')
                      AND LOWER(COALESCE(source_envelope->>'stage', '')) <> 'heartbeat'
                      AND received_at >= :window_start
                    ORDER BY received_at DESC
                    LIMIT :limit
                """),
                {
                    "project_id": safe_project_id,
                    "window_start": window_start,
                    "limit": _SOURCE_CORRELATION_EVENT_LIMIT,
                },
            )
            source_rows = list(source_result.mappings().all())
            # Unlike the event query, heartbeats are not limited by window_start.
            heartbeat_result = await db.execute(
                text(f"""
                    SELECT
                        {provider_sql} AS provider,
                        MAX(received_at) AS latest_heartbeat_at
                    FROM awooop_conversation_event
                    WHERE project_id = :project_id
                      AND {provider_sql} IN ('sentry', 'signoz')
                      AND LOWER(COALESCE(source_envelope->>'stage', '')) = 'heartbeat'
                    GROUP BY {provider_sql}
                """),
                {"project_id": safe_project_id},
            )
            heartbeat_rows = list(heartbeat_result.mappings().all())

    if not incident_rows:
        summary = _source_correlation_empty(
            incident_ids,
            status_value="no_incident_context",
            missing_reason="incident_not_found",
        )
        return summary

    contexts = [_incident_correlation_context(row) for row in incident_rows]
    # Start from "missing"; the counters below upgrade the status.
    summary = _source_correlation_empty(
        incident_ids,
        status_value="missing",
        missing_reason="no_matching_provider_source_event",
    )
    providers = summary["providers"]
    for heartbeat in heartbeat_rows:
        provider = str(heartbeat.get("provider") or "").lower()
        if provider in providers:
            providers[provider]["latest_heartbeat_at"] = _iso_or_none(
                heartbeat.get("latest_heartbeat_at")
            )

    top_candidates: list[dict[str, Any]] = []
    for row in source_rows:
        event_context = _source_event_correlation_context(row)
        provider = str(event_context.get("provider") or "").lower()
        if provider not in providers:
            continue
        provider_item = providers[provider]
        # Rows arrive newest first (ORDER BY received_at DESC), so the first
        # row seen per provider is its latest event, matched or not.
        if provider_item.get("latest_event_at") is None:
            provider_item["latest_event_at"] = _iso_or_none(row.get("received_at"))

        # Keep the highest score across incidents; ties keep the earlier one.
        best_match: dict[str, Any] | None = None
        for context in contexts:
            scored = _score_source_correlation_event(context, event_context)
            if best_match is None or scored["score"] > best_match["score"]:
                best_match = scored
        if not best_match or not best_match["is_candidate"]:
            continue

        summary["provider_event_total"] += 1
        if best_match["is_direct"]:
            summary["direct_ref_total"] += 1
            provider_item["direct_ref_total"] += 1
        else:
            summary["candidate_total"] += 1
            provider_item["candidate_total"] += 1

        is_applied_link = _is_source_correlation_applied_link(
            event_context,
            best_match,
        )
        if is_applied_link:
            applied_at = _iso_or_none(row.get("received_at"))
            summary["applied_link_total"] += 1
            provider_item["applied_link_total"] += 1
            # Newest-first ordering: the first applied link is the latest one.
            if summary.get("latest_applied_link_at") is None:
                summary["latest_applied_link_at"] = applied_at
            if provider_item.get("latest_applied_link_at") is None:
                provider_item["latest_applied_link_at"] = applied_at

        top_candidates.append(
            {
                "provider": provider,
                "provider_event_id": str(event_context.get("provider_event_id") or ""),
                "stage": str(event_context.get("stage") or ""),
                "score": best_match["score"],
                "match_type": "direct" if best_match["is_direct"] else "candidate",
                "link_state": (
                    "applied"
                    if is_applied_link
                    else "direct_ref"
                    if best_match["is_direct"]
                    else "candidate"
                ),
                "verification_status": (
                    "applied_link_verified"
                    if is_applied_link
                    else "direct_ref_verified"
                    if best_match["is_direct"]
                    else "candidate_only"
                ),
                "reasons": best_match["reasons"],
                "received_at": _iso_or_none(row.get("received_at")),
            }
        )

    # Strongest evidence wins: applied link > direct ref > candidate >
    # heartbeat-only.  With none of these the initial "missing" is kept.
    if summary["applied_link_total"] > 0:
        summary["status"] = "linked"
        summary["verification_status"] = "applied_link_verified"
        summary["missing_reason"] = None
    elif summary["direct_ref_total"] > 0:
        summary["status"] = "linked"
        summary["verification_status"] = "direct_ref_verified"
        summary["missing_reason"] = None
    elif summary["candidate_total"] > 0:
        summary["status"] = "candidate_found"
        summary["verification_status"] = "candidate_only"
        summary["missing_reason"] = None
    elif any(item.get("latest_heartbeat_at") for item in providers.values()):
        summary["status"] = "provider_fresh_no_match"
        summary["verification_status"] = "provider_fresh_no_match"
        summary["missing_reason"] = "provider_heartbeat_present_but_no_incident_match"

    # Highest score first, then most recent; only five are surfaced.
    summary["top_candidates"] = sorted(
        top_candidates,
        key=lambda item: (item.get("score") or 0, item.get("received_at") or ""),
        reverse=True,
    )[:5]
    return summary
def _status_chain_source_section(truth_chain: dict[str, Any] | None) -> dict[str, Any]:
    """Summarise the channel evidence of a truth chain for the status chain.

    Reads ``truth_chain["channel"]["inbound_events"]`` and
    ``truth_chain["channel"]["outbound_messages"]``. Any level that is missing
    or has an unexpected type is treated as empty, so ``None`` produces a
    section with zero totals and ``None`` placeholders.

    Returns:
        A dict holding the inbound/outbound totals, up to five distinct inbound
        channel types, up to five references per reference kind, and selected
        fields of the first inbound event and the first outbound message.
    """
    channel = truth_chain.get("channel") if isinstance(truth_chain, dict) else {}
    if not isinstance(channel, dict):
        channel = {}
    inbound_events = channel.get("inbound_events")
    outbound_messages = channel.get("outbound_messages")
    if not isinstance(inbound_events, list):
        inbound_events = []
    if not isinstance(outbound_messages, list):
        outbound_messages = []

    source_refs: dict[str, list[str]] = {
        "alert_ids": [],
        "sentry_issue_ids": [],
        "signoz_alerts": [],
        "fingerprints": [],
        "incident_ids": [],
    }
    inbound_channels: list[str] = []
    for row in inbound_events:
        if not isinstance(row, dict):
            continue
        _append_unique(inbound_channels, row.get("channel_type"))
        envelope = row.get("source_envelope")
        for key in source_refs:
            for value in _source_ref_values(envelope, key):
                _append_unique(source_refs[key], value)

    # Element 0 is reported as "latest"; presumably the truth chain returns
    # rows newest-first — TODO confirm against fetch_truth_chain.
    latest_inbound = (
        inbound_events[0]
        if inbound_events and isinstance(inbound_events[0], dict)
        else {}
    )
    latest_outbound = (
        outbound_messages[0]
        if outbound_messages and isinstance(outbound_messages[0], dict)
        else {}
    )
    return {
        "inbound_total": len(inbound_events),
        "outbound_total": len(outbound_messages),
        "inbound_channels": inbound_channels[:5],
        "refs": {key: values[:5] for key, values in source_refs.items()},
        "latest_inbound": {
            "channel_type": latest_inbound.get("channel_type"),
            "provider_event_id": latest_inbound.get("provider_event_id"),
            "content_type": latest_inbound.get("content_type"),
            "is_duplicate": latest_inbound.get("is_duplicate"),
            "received_at": latest_inbound.get("received_at"),
        },
        "latest_outbound": {
            "channel_type": latest_outbound.get("channel_type"),
            "message_type": latest_outbound.get("message_type"),
            "send_status": latest_outbound.get("send_status"),
            "sent_at": latest_outbound.get("sent_at"),
        },
    }


def _build_awooop_status_chain(
    *,
    incident_ids: list[str],
    truth_chain: dict[str, Any] | None = None,
    remediation_history: dict[str, Any] | None = None,
    source_id: str | None = None,
    fetch_error: str | None = None,
    source_correlation: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build the shared read-only status chain used by Telegram and Operator UI.

    Combines the truth chain (``truth_status`` and ``automation_quality``)
    with the remediation history into one ``awooop_status_chain_v1`` payload.
    This function performs no I/O itself; it only derives values from its
    arguments and from sibling helpers.

    Args:
        incident_ids: Incident ids the chain describes; echoed in the result.
        truth_chain: Truth-chain payload, or ``None`` when unavailable.
        remediation_history: Remediation evidence payload, or ``None``.
        source_id: Identifier the truth chain was fetched for; echoed.
        fetch_error: Truth-chain fetch failure text; when set, the blocker
            ``truth_chain_fetch_failed`` is appended.
        source_correlation: Optional provider-correlation summary, attached
            under ``source_refs["correlation"]`` when not ``None``.

    Returns:
        The status chain dict (stage, verdict, repair state, next step,
        blockers capped at eight, evidence counters and sub-sections).
    """
    truth_status = (
        truth_chain.get("truth_status")
        if isinstance(truth_chain, dict)
        and isinstance(truth_chain.get("truth_status"), dict)
        else {}
    )
    quality = (
        truth_chain.get("automation_quality")
        if isinstance(truth_chain, dict)
        and isinstance(truth_chain.get("automation_quality"), dict)
        else {}
    )
    facts = quality.get("facts") if isinstance(quality.get("facts"), dict) else {}

    # Normalise to a dict: `latest` is truth-tested for the route label but is
    # dereferenced with `.get()` unconditionally further down, so a None from
    # the helper must not be allowed to reach those lookups.
    latest = _latest_remediation_history_item(remediation_history) or {}
    remediation_state = _remediation_evidence_state(remediation_history)
    remediation_total = (
        _safe_int(remediation_history.get("total"))
        if isinstance(remediation_history, dict)
        else 0
    )
    latest_route = _route_label_from_remediation(latest) if latest else "--"

    current_stage = str(truth_status.get("current_stage") or "unknown")
    stage_status = str(truth_status.get("stage_status") or "unknown")
    verdict = str(quality.get("verdict") or "unknown")
    verification = (
        facts.get("verification_result")
        or latest.get("verification_result_preview")
        or "missing"
    )
    auto_repair_records = _safe_int(facts.get("auto_repair_execution_records"))
    operation_records = _safe_int(facts.get("automation_operation_records"))
    gateway_total = _safe_int(facts.get("mcp_gateway_total"))
    km_entries = _safe_int(facts.get("knowledge_entries"))
    needs_human = bool(truth_status.get("needs_human"))

    # Branch order is a priority order: a verified auto-repair wins over raw
    # execution records, which win over remediation evidence, which wins over
    # the plain needs_human flag.
    if verdict == "auto_repaired_verified":
        repair_state = "auto_repaired_verified"
        next_step = "monitor_for_regression"
    elif auto_repair_records > 0 or operation_records > 0:
        repair_state = (
            "executed_pending_verification"
            if str(verification) == "missing"
            else "executed"
        )
        next_step = "verify_execution_result"
    elif remediation_state == "read_only":
        repair_state = "read_only_dry_run"
        next_step = "approve_or_escalate_from_awooop"
    elif remediation_state == "write_observed":
        repair_state = "write_observed_manual_review"
        next_step = "review_write_evidence"
    elif remediation_state == "blocked":
        repair_state = "blocked_manual_required"
        next_step = "manual_investigation"
    elif needs_human:
        repair_state = "manual_required"
        next_step = "manual_investigation"
    else:
        repair_state = "no_execution_evidence"
        next_step = "collect_evidence_or_wait"

    # Remediation evidence can force human attention regardless of which
    # branch above selected the repair state.
    if remediation_state in {"blocked", "fetch_failed"}:
        needs_human = True
    if (
        remediation_state == "write_observed"
        and repair_state != "auto_repaired_verified"
    ):
        needs_human = True

    mcp_section = _status_chain_mcp_section(truth_chain)
    execution_section = _status_chain_execution_section(truth_chain)
    source_section = _status_chain_source_section(truth_chain)
    if source_correlation is not None:
        source_section["correlation"] = source_correlation

    blockers = [
        str(item)
        for item in [
            *(
                truth_status.get("blockers")
                if isinstance(truth_status.get("blockers"), list)
                else []
            ),
            *(
                quality.get("blockers")
                if isinstance(quality.get("blockers"), list)
                else []
            ),
        ]
        if item
    ]
    if fetch_error:
        blockers.append("truth_chain_fetch_failed")

    return {
        "schema_version": "awooop_status_chain_v1",
        "source": "truth_chain+adr100_history",
        "source_id": source_id,
        "incident_ids": incident_ids,
        "current_stage": current_stage,
        "stage_status": stage_status,
        "verdict": verdict,
        "repair_state": repair_state,
        "verification": str(verification),
        "needs_human": needs_human,
        "next_step": next_step,
        "blockers": blockers[:8],
        "fetch_error": fetch_error,
        "evidence": {
            "auto_repair_records": auto_repair_records,
            "operation_records": operation_records,
            "mcp_gateway_total": gateway_total,
            "knowledge_entries": km_entries,
            "remediation_total": remediation_total,
            "remediation_state": remediation_state,
            "latest_route": latest_route,
            "latest_mode": latest.get("mode"),
            "latest_at": latest.get("created_at"),
            "latest_preview": latest.get("verification_result_preview"),
        },
        "writes": {
            "incident": latest.get("writes_incident_state"),
            "auto_repair": latest.get("writes_auto_repair_result"),
        },
        "mcp": mcp_section,
        "execution": execution_section,
        "source_refs": source_section,
    }


async def _fetch_awooop_status_chain(
    *,
    incident_ids: list[str],
    project_id: str,
    remediation_history: dict[str, Any] | None,
) -> dict[str, Any]:
    """Fetch read-only truth-chain state and merge it with ADR-100 evidence.

    Both lookups are best-effort: a truth-chain failure is logged and carried
    into the result as ``fetch_error``; a source-correlation failure is logged
    and replaced by an empty summary with status ``fetch_failed``. Neither
    failure propagates to the caller.

    Args:
        incident_ids: Incident ids to describe.
        project_id: Project scope; an empty value falls back to ``"awoooi"``.
        remediation_history: Previously fetched remediation evidence.

    Returns:
        The status chain produced by ``_build_awooop_status_chain``.
    """
    source_id = _select_status_chain_source_id(incident_ids, remediation_history)
    truth_chain: dict[str, Any] | None = None
    fetch_error: str | None = None
    if source_id:
        try:
            truth_chain = await fetch_truth_chain(
                source_id=source_id,
                project_id=project_id or "awoooi",
            )
        except Exception as exc:
            fetch_error = str(exc)
            logger.warning(
                "operator_awooop_status_chain_fetch_failed",
                source_id=source_id,
                project_id=project_id,
                error=fetch_error,
            )

    try:
        source_correlation = await _fetch_source_correlation_summary(
            incident_ids=incident_ids,
            project_id=project_id or "awoooi",
        )
    except Exception as exc:
        logger.warning(
            "operator_source_correlation_fetch_failed",
            incident_ids=incident_ids,
            project_id=project_id,
            error=str(exc),
        )
        source_correlation = _source_correlation_empty(
            incident_ids,
            status_value="fetch_failed",
            missing_reason="source_correlation_fetch_failed",
        )

    return _build_awooop_status_chain(
        incident_ids=incident_ids,
        truth_chain=truth_chain,
        remediation_history=remediation_history,
        source_id=source_id,
        fetch_error=fetch_error,
        source_correlation=source_correlation,
    )
def _validate_remediation_status_filter(value: str | None) -> None:
    """Reject a ``remediation_status`` filter that is not an allowed value.

    ``None`` means "no filter" and is accepted.

    Raises:
        HTTPException: 422 naming the allowed values when *value* is unknown.
    """
    if value is None or value in _REMEDIATION_STATUS_FILTERS:
        return
    choices = ", ".join(sorted(_REMEDIATION_STATUS_FILTERS))
    raise HTTPException(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        detail=f"remediation_status 必須是: {choices}",
    )


def _validate_callback_reply_status_filter(value: str | None) -> None:
    """Reject a ``callback_reply_status`` filter that is not an allowed value.

    ``None`` means "no filter" and is accepted.

    Raises:
        HTTPException: 422 naming the allowed values when *value* is unknown.
    """
    if value is None or value in _CALLBACK_REPLY_STATUS_FILTERS:
        return
    choices = ", ".join(sorted(_CALLBACK_REPLY_STATUS_FILTERS))
    raise HTTPException(
        status_code=422,
        detail=f"callback_reply_status 必須是: {choices}",
    )


def _validate_callback_reply_action_filter(value: str | None) -> str | None:
    """Normalise and validate a callback action filter.

    The value is stripped and lower-cased; ``None`` and blank input both
    yield ``None`` (no filter).

    Returns:
        The normalised action, or ``None`` when no filter was supplied.

    Raises:
        HTTPException: 422 when the normalised value does not match the
            callback action pattern.
    """
    cleaned = value.strip().lower() if value is not None else ""
    if not cleaned:
        return None
    if _CALLBACK_REPLY_ACTION_RE.fullmatch(cleaned):
        return cleaned
    raise HTTPException(
        status_code=422,
        detail="callback action 格式錯誤,僅允許 a-z、0-9、底線、冒號與短橫線",
    )


def _validate_incident_id_filter(value: str | None) -> None:
    """Reject an incident id that does not fully match the incident id pattern.

    ``None`` means "no filter" and is accepted.

    Raises:
        HTTPException: 422 when *value* is not a well-formed incident id.
    """
    if value is None or _INCIDENT_ID_RE.fullmatch(value):
        return
    raise HTTPException(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        detail="incident_id 格式錯誤,必須是 INC-YYYYMMDD-XXXX",
    )


def _remediation_summary_matches_status(
    summary: dict[str, Any] | None,
    remediation_status: str | None,
) -> bool:
    """Tell whether a remediation summary passes the status filter.

    A missing summary or an empty status counts as ``"no_evidence"``; a
    ``None`` filter matches everything.
    """
    if remediation_status is None:
        return True
    observed = (summary or {}).get("status") or "no_evidence"
    return str(observed) == remediation_status


def _callback_reply_summary_matches_status(
    summary: dict[str, Any] | None,
    callback_reply_status: str | None,
) -> bool:
    """Tell whether a callback-reply summary passes the status filter.

    A missing summary or an empty status counts as ``"no_callback"``; a
    ``None`` filter matches everything.
    """
    if callback_reply_status is None:
        return True
    observed = (summary or {}).get("status") or "no_callback"
    return str(observed) == callback_reply_status


def _remediation_summary_matches_incident_id(
    summary: dict[str, Any] | None,
    incident_id: str | None,
) -> bool:
    """Tell whether a remediation summary references the given incident id.

    A ``None`` filter matches everything; a summary whose ``incident_ids`` is
    absent or not a list matches nothing.
    """
    if incident_id is None:
        return True
    linked = (summary or {}).get("incident_ids")
    if not isinstance(linked, list):
        return False
    return incident_id in linked


async def _build_run_remediation_summaries(
    *,
    runs: list[AwoooPRunState],
    inbound_by_run: dict[UUID, list[AwoooPConversationEvent]],
    outbound_by_run: dict[UUID, list[AwoooPOutboundMessage]],
) -> dict[UUID, dict[str, Any]]:
    """Build remediation summaries for list endpoints without writing state.

    Incident ids are collected per run, de-duplicated across runs, and each
    distinct id is looked up once in the remediation history service. A
    failed lookup is logged and attached to the affected runs as an error
    entry instead of aborting the whole list.

    Returns:
        Mapping of run id to the summary built by
        ``_run_remediation_list_summary``; empty when *runs* is empty.
    """
    if not runs:
        return {}

    ids_per_run: dict[UUID, list[str]] = {}
    distinct_ids: list[str] = []
    for run in runs:
        found = _collect_run_incident_ids(
            run=run,
            inbound_events=inbound_by_run.get(run.run_id, []),
            outbound_messages=outbound_by_run.get(run.run_id, []),
        )
        ids_per_run[run.run_id] = found
        for incident_id in found:
            _append_unique(distinct_ids, incident_id)

    history_lookup: dict[str, list[dict[str, Any]]] = {}
    legacy_lookup: dict[str, list[dict[str, Any]]] = {}
    failure_lookup: dict[str, dict[str, str]] = {}
    if distinct_ids:
        from src.services.adr100_remediation_service import Adr100RemediationService

        remediation = Adr100RemediationService(record_history=False)
        for incident_id in distinct_ids:
            try:
                fetched = await remediation.history(
                    limit=_REMEDIATION_HISTORY_LIMIT,
                    incident_id=incident_id,
                )
                history_lookup[incident_id] = [
                    entry
                    for entry in fetched.get("items", [])
                    if isinstance(entry, dict)
                ]
            except Exception as exc:
                message = str(exc)
                logger.warning(
                    "run_list_remediation_history_fetch_failed",
                    incident_id=incident_id,
                    error=message,
                )
                failure_lookup[incident_id] = {
                    "incident_id": incident_id,
                    "error": message,
                }
        # One batched legacy-MCP query; the row cap scales with the number of
        # incidents but stays within [100, 5000].
        row_cap = min(max(len(distinct_ids) * _REMEDIATION_HISTORY_LIMIT, 100), 5_000)
        legacy_lookup = await _fetch_legacy_mcp_by_incident_ids(
            distinct_ids,
            limit=row_cap,
        )

    summaries: dict[UUID, dict[str, Any]] = {}
    for run in runs:
        run_ids = ids_per_run.get(run.run_id, [])
        summaries[run.run_id] = _run_remediation_list_summary(
            run=run,
            incident_ids=run_ids,
            items=[
                entry for iid in run_ids for entry in history_lookup.get(iid, [])
            ],
            legacy_mcp_records=[
                record for iid in run_ids for record in legacy_lookup.get(iid, [])
            ],
            errors=[failure_lookup[iid] for iid in run_ids if iid in failure_lookup],
        )
    return summaries


def _timeline_sort_key(item: dict[str, Any], fallback_ts: Any) -> str:
    """Return a string sort key for a timeline item.

    Uses the item's ``ts`` when truthy, otherwise *fallback_ts*. Values with
    an ``isoformat`` method are rendered through it; anything else is
    stringified, with falsy values becoming ``""``.
    """
    stamp = item.get("ts") or fallback_ts
    return stamp.isoformat() if hasattr(stamp, "isoformat") else str(stamp or "")


def _summarize_run_remediation_by_work_item(
    items: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Group remediation items by work item and count them.

    The grouping key is the first truthy value among ``work_item_id``,
    ``incident_id`` and ``id``. The ``latest_*`` fields are taken from the
    first item seen for each key, so the input order decides what counts as
    "latest".

    Returns:
        One dict per group, in first-seen order.
    """
    groups: dict[str, dict[str, Any]] = {}
    for entry in items:
        group_key = str(
            entry.get("work_item_id") or entry.get("incident_id") or entry.get("id")
        )
        bucket = groups.get(group_key)
        if bucket is None:
            bucket = groups[group_key] = {
                "work_item_id": entry.get("work_item_id"),
                "incident_id": entry.get("incident_id"),
                "count": 0,
                "latest_at": entry.get("created_at"),
                "latest_preview": entry.get("verification_result_preview"),
                "latest_mode": entry.get("mode"),
                "latest_route": _route_label_from_remediation(entry),
            }
        bucket["count"] += 1
    return list(groups.values())
dict[str, Any]: """Fetch durable ADR-100 remediation dry-run evidence linked to run incidents.""" if not incident_ids: return { "schema_version": "awooop_run_remediation_evidence_v1", "source": "alert_operation_log", "incident_ids": [], "total": 0, "limit": limit, "items": [], "by_work_item": [], "errors": [], } from src.services.adr100_remediation_service import Adr100RemediationService service = Adr100RemediationService(record_history=False) items: list[dict[str, Any]] = [] errors: list[dict[str, str]] = [] for incident_id in incident_ids: try: history = await service.history(limit=limit, incident_id=incident_id) items.extend( item for item in history.get("items", []) if isinstance(item, dict) ) except Exception as exc: logger.warning( "run_remediation_history_fetch_failed", incident_id=incident_id, error=str(exc), ) errors.append({"incident_id": incident_id, "error": str(exc)}) items.sort(key=lambda item: str(item.get("created_at") or ""), reverse=True) visible_items = items[:limit] return { "schema_version": "awooop_run_remediation_evidence_v1", "source": "alert_operation_log", "incident_ids": incident_ids, "total": len(items), "limit": limit, "items": visible_items, "by_work_item": _summarize_run_remediation_by_work_item(visible_items), "errors": errors, } def _legacy_mcp_record(row: MCPAuditLog) -> dict[str, Any]: return { "id": row.id, "session_id": row.session_id, "flywheel_node": row.flywheel_node, "mcp_server": row.mcp_server, "tool_name": row.tool_name, "duration_ms": row.duration_ms, "success": row.success, "error_message": row.error_message, "incident_id": row.incident_id, "agent_role": row.agent_role, "created_at": row.created_at, } async def _fetch_legacy_mcp_by_incident_ids( incident_ids: list[str], *, limit: int, ) -> dict[str, list[dict[str, Any]]]: """Fetch legacy/self-built MCP rows for list evidence summaries.""" if not incident_ids: return {} async with get_db_context("awoooi") as db: result = await db.execute( select(MCPAuditLog) 
.where(MCPAuditLog.incident_id.in_(incident_ids)) .order_by(MCPAuditLog.created_at.desc()) .limit(limit) ) rows = list(result.scalars().all()) by_incident: dict[str, list[dict[str, Any]]] = defaultdict(list) for row in rows: if row.incident_id: by_incident[row.incident_id].append(_legacy_mcp_record(row)) return dict(by_incident) async def _fetch_run_legacy_mcp_history( incident_ids: list[str], *, limit: int = _MAX_TIMELINE_ITEMS, ) -> dict[str, Any]: """Fetch legacy/self-built MCP audit rows linked through incident ids.""" if not incident_ids: return { "schema_version": "awooop_run_legacy_mcp_evidence_v1", "source": "mcp_audit_log", "incident_ids": [], "total": 0, "limit": limit, "records": [], "summary": _summarize_mcp([]), } async with get_db_context("awoooi") as db: result = await db.execute( select(MCPAuditLog) .where(MCPAuditLog.incident_id.in_(incident_ids)) .order_by(MCPAuditLog.created_at.desc()) .limit(limit) ) rows = list(result.scalars().all()) records = [_legacy_mcp_record(row) for row in rows] return { "schema_version": "awooop_run_legacy_mcp_evidence_v1", "source": "mcp_audit_log", "incident_ids": incident_ids, "total": len(records), "limit": limit, "records": records, "summary": _summarize_mcp(records), } async def get_run_detail( run_id: str, project_id: str | None = None, ) -> dict[str, Any]: """取得單一 Run 的處置脈絡,供 AwoooP Run detail / Timeline 顯示。""" try: run_uuid = uuid.UUID(run_id) except ValueError as exc: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"run_id 格式錯誤: {exc}", ) from exc async with get_db_context(project_id or "awoooi") as db: run_stmt = select(AwoooPRunState).where(AwoooPRunState.run_id == run_uuid) if project_id is not None: run_stmt = run_stmt.where(AwoooPRunState.project_id == project_id) run_result = await db.execute(run_stmt) run = run_result.scalar_one_or_none() if run is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"run {run_id!r} 不存在", ) steps_result = await 
db.execute( select(AwoooPRunStepJournal) .where(AwoooPRunStepJournal.run_id == run_uuid) .order_by(AwoooPRunStepJournal.step_seq.asc()) .limit(_MAX_TIMELINE_ITEMS) ) steps = list(steps_result.scalars().all()) inbound_where = [AwoooPConversationEvent.run_id == run_uuid] if run.trigger_ref: try: trigger_event_uuid = uuid.UUID(run.trigger_ref) inbound_where.append(AwoooPConversationEvent.event_id == trigger_event_uuid) except ValueError: inbound_where.append( AwoooPConversationEvent.provider_event_id == run.trigger_ref ) inbound_result = await db.execute( select(AwoooPConversationEvent) .where(sa_or(*inbound_where)) .order_by(AwoooPConversationEvent.received_at.asc()) .limit(_MAX_TIMELINE_ITEMS) ) inbound_events = list(inbound_result.scalars().all()) outbound_result = await db.execute( select(AwoooPOutboundMessage) .where(AwoooPOutboundMessage.run_id == run_uuid) .order_by(AwoooPOutboundMessage.queued_at.asc()) .limit(_MAX_TIMELINE_ITEMS) ) outbound_messages = list(outbound_result.scalars().all()) mcp_result = await db.execute( select(AwoooPMcpGatewayAudit) .where(AwoooPMcpGatewayAudit.run_id == run_uuid) .order_by(AwoooPMcpGatewayAudit.created_at.asc()) .limit(_MAX_TIMELINE_ITEMS) ) mcp_calls = list(mcp_result.scalars().all()) run_payload = { "run_id": run.run_id, "project_id": run.project_id, "agent_id": run.agent_id, "state": run.state, "is_shadow": run.is_shadow, "trace_id": run.trace_id, "trigger_type": run.trigger_type, "trigger_ref": run.trigger_ref, "cost_usd": run.cost_usd, "step_count": run.step_count, "attempt_count": run.attempt_count, "max_attempts": run.max_attempts, "error_code": run.error_code, "error_detail": run.error_detail, "created_at": run.created_at, "started_at": run.started_at, "completed_at": run.completed_at, "timeout_at": run.timeout_at, "heartbeat_at": run.heartbeat_at, } step_items = [ { "step_id": row.step_id, "step_seq": row.step_seq, "tool_name": row.tool_name, "result_status": row.result_status, "was_blocked": row.was_blocked, 
"block_reason": row.block_reason, "error_code": row.error_code, "latency_ms": row.latency_ms, "created_at": row.created_at, "completed_at": row.completed_at, } for row in steps ] inbound_items = [ { "event_id": row.event_id, "channel_type": row.channel_type, "provider_event_id": row.provider_event_id, "content_preview": row.content_preview, "is_duplicate": row.is_duplicate, "received_at": row.received_at, } for row in inbound_events ] outbound_items = [] for row in outbound_messages: callback_reply = _outbound_callback_reply(row.source_envelope) outbound_items.append({ "message_id": row.message_id, "channel_type": row.channel_type, "message_type": row.message_type, "content_preview": row.content_preview, "send_status": row.send_status, "send_error": row.send_error, "provider_message_id": row.provider_message_id, "queued_at": row.queued_at, "sent_at": row.sent_at, "triggered_by_state": row.triggered_by_state, "callback_reply": callback_reply, }) def _mcp_item(row: AwoooPMcpGatewayAudit) -> dict[str, Any]: gate_result = row.gate_result if isinstance(row.gate_result, dict) else {} return { "call_id": row.call_id, "agent_id": row.agent_id, "tool_name": row.tool_name, "result_status": row.result_status, "block_gate": row.block_gate, "block_reason": row.block_reason, "latency_ms": row.latency_ms, "created_at": row.created_at, "required_scope": gate_result.get("required_scope"), "policy_enforced": gate_result.get("policy_enforced"), "is_shadow": gate_result.get("is_shadow"), "gate_result": gate_result, } mcp_items = [_mcp_item(row) for row in mcp_calls] mcp_gateway_summary = _summarize_gateway_mcp([ _mcp_gateway_summary_row(row) for row in mcp_calls ]) incident_ids = _collect_run_incident_ids( run=run, inbound_events=inbound_events, outbound_messages=outbound_messages, ) legacy_mcp_history = await _fetch_run_legacy_mcp_history(incident_ids) remediation_history = await _fetch_run_remediation_history(incident_ids) awooop_status_chain = await _fetch_awooop_status_chain( 
incident_ids=incident_ids, project_id=run.project_id, remediation_history=remediation_history, ) timeline: list[dict[str, Any]] = [ _timeline_item( ts=run.created_at, kind="run", title="Run 建立", status=run.state, summary=f"{run.trigger_type or 'unknown'} → {run.agent_id}", metadata={"trace_id": run.trace_id, "trigger_ref": run.trigger_ref}, ) ] if run.started_at: timeline.append( _timeline_item( ts=run.started_at, kind="run", title="Run 開始執行", status="running", summary=run.worker_id, ) ) for row in inbound_events: timeline.append( _timeline_item( ts=row.received_at, kind="inbound", title=f"{row.channel_type} 入站事件", status="duplicate" if row.is_duplicate else "received", summary=row.content_preview, metadata={"provider_event_id": row.provider_event_id}, ) ) for row in steps: is_approval_step = row.tool_name.startswith("operator_console.") timeline.append( _timeline_item( ts=row.completed_at or row.created_at, kind="approval" if is_approval_step else "step", title=_approval_step_title(row.tool_name, row.step_seq), status=row.result_status, summary=row.block_reason or row.error_code, metadata={ "was_blocked": row.was_blocked, "latency_ms": row.latency_ms, }, ) ) for row in mcp_calls: gate_result = row.gate_result if isinstance(row.gate_result, dict) else {} scope = gate_result.get("required_scope") policy_enforced = gate_result.get("policy_enforced") summary = row.block_reason if summary is None: summary = ( f"agent={row.agent_id or 'unknown'}" f" scope={scope or 'unknown'}" f" policy_enforced={policy_enforced}" ) timeline.append( _timeline_item( ts=row.created_at, kind="mcp", title=f"MCP: {row.tool_name}", status=row.result_status, summary=summary, metadata={ "agent_id": row.agent_id, "block_gate": row.block_gate, "required_scope": scope, "policy_enforced": policy_enforced, "latency_ms": row.latency_ms, }, ) ) for record in legacy_mcp_history.get("records", []): if not isinstance(record, dict): continue tool_route = "/".join( part for part in ( 
async def list_recent_channel_events(
    *,
    project_id: str | None,
    channel_type: str | None,
    provider_prefix: str | None,
    limit: int,
) -> dict[str, Any]:
    """List recent channel events for the Operator Console.

    Events are returned newest first. Each filter is applied only when its
    argument is not ``None``.

    Args:
        project_id: Restrict to one project.
        channel_type: Restrict to one channel type.
        provider_prefix: Restrict to events whose ``provider_event_id`` starts
            with this literal text.
        limit: Requested row count; clamped to ``1.._MAX_EVENTS``.

    Returns:
        ``{"events": [...], "total": <rows returned>, "limit": <clamped limit>}``.
    """
    safe_limit = max(1, min(limit, _MAX_EVENTS))
    async with get_db_context("awoooi") as db:
        stmt = select(AwoooPConversationEvent).order_by(
            AwoooPConversationEvent.received_at.desc()
        )
        if project_id is not None:
            stmt = stmt.where(AwoooPConversationEvent.project_id == project_id)
        if channel_type is not None:
            stmt = stmt.where(AwoooPConversationEvent.channel_type == channel_type)
        if provider_prefix is not None:
            # autoescape keeps "%" and "_" in the caller's prefix literal;
            # interpolating the prefix into a LIKE pattern would let them act
            # as wildcards (e.g. "sentry_" would also match "sentry-1").
            stmt = stmt.where(
                AwoooPConversationEvent.provider_event_id.startswith(
                    provider_prefix,
                    autoescape=True,
                )
            )
        result = await db.execute(stmt.limit(safe_limit))
        rows = list(result.scalars().all())
        events = [
            {
                "event_id": r.event_id,
                "project_id": r.project_id,
                "channel_type": r.channel_type,
                "provider_event_id": r.provider_event_id,
                "channel_chat_id": r.channel_chat_id,
                "content_preview": r.content_preview,
                "is_duplicate": r.is_duplicate,
                "received_at": r.received_at,
            }
            for r in rows
        ]
    return {"events": events, "total": len(events), "limit": safe_limit}
def _approval_token_jti(token: str) -> Any:
    """Return the ``jti`` claim of a dot-separated approval token, or ``None``.

    The second dot-separated segment is decoded as unpadded URL-safe base64
    and parsed as JSON. Nothing is verified here; the value is only used for
    reporting. Any decoding problem yields ``None``.
    """
    # Imported locally: only the approve path needs these modules.
    import base64
    import json

    try:
        segment = token.split(".")[1]
        # Restore the "=" padding that URL-safe tokens omit.
        segment += "=" * (-len(segment) % 4)
        payload = json.loads(base64.urlsafe_b64decode(segment))
    except Exception:
        # Deliberately broad: this runs after the run has already been
        # approved, so a malformed token must never surface as an error.
        return None
    return payload.get("jti") if isinstance(payload, dict) else None


async def list_approvals(
    project_id: str | None,
    run_id: str | None = None,
    remediation_status: str | None = None,
) -> dict[str, Any]:
    """List runs in ``waiting_approval``, oldest first.

    Args:
        project_id: Restrict to one project when not ``None``.
        run_id: Restrict to one run (UUID string) when given.
        remediation_status: Keep only runs whose remediation summary has this
            status; must be one of the allowed remediation status filters.

    Returns:
        ``{"approvals": items, "total": len(items), "items": items}``; both
        list keys reference the same list.

    Raises:
        HTTPException: 422 when *run_id* is not a UUID or the status filter
            is not an allowed value.
    """
    _validate_remediation_status_filter(remediation_status)
    run_uuid: UUID | None = None
    if run_id:
        try:
            run_uuid = uuid.UUID(run_id)
        except ValueError as exc:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"run_id 格式錯誤: {exc}",
            ) from exc

    async with get_db_context("awoooi") as db:
        stmt = (
            select(AwoooPRunState)
            .where(AwoooPRunState.state == "waiting_approval")
            .order_by(AwoooPRunState.created_at.asc())
        )
        if project_id is not None:
            stmt = stmt.where(AwoooPRunState.project_id == project_id)
        if run_uuid is not None:
            stmt = stmt.where(AwoooPRunState.run_id == run_uuid)
        result = await db.execute(stmt)
        rows = list(result.scalars().all())
        inbound_by_run, outbound_by_run = await _load_run_message_context(db, rows)

    remediation_summaries = await _build_run_remediation_summaries(
        runs=rows,
        inbound_by_run=inbound_by_run,
        outbound_by_run=outbound_by_run,
    )
    if remediation_status:
        rows = [
            row
            for row in rows
            if _remediation_summary_matches_status(
                remediation_summaries.get(row.run_id),
                remediation_status,
            )
        ]
    # The query is unpaginated, so the total is simply the number of rows
    # returned; a separate COUNT query could only disagree with the items.
    total = len(rows)

    # Runs that share project and incident ids reuse one status chain.
    status_chain_cache: dict[tuple[str, tuple[str, ...]], dict[str, Any]] = {}
    items = []
    for r in rows:
        summary = remediation_summaries.get(r.run_id)
        raw_incident_ids = (
            summary.get("incident_ids") if isinstance(summary, dict) else None
        )
        if not isinstance(raw_incident_ids, list):
            # A summary without a list-valued "incident_ids" contributes none
            # instead of raising while being iterated.
            raw_incident_ids = []
        incident_ids = [
            str(incident_id)
            for incident_id in raw_incident_ids
            if isinstance(incident_id, str) and incident_id
        ]
        cache_key = (r.project_id, tuple(incident_ids))
        status_chain = status_chain_cache.get(cache_key)
        if status_chain is None:
            status_chain = await get_awooop_status_chain(
                project_id=r.project_id,
                incident_ids=incident_ids,
            )
            status_chain_cache[cache_key] = status_chain
        items.append({
            "run_id": r.run_id,
            "project_id": r.project_id,
            "agent_id": r.agent_id,
            "created_at": r.created_at,
            "timeout_at": r.timeout_at,
            "remediation_summary": summary,
            "awooop_status_chain": status_chain,
        })
    return {"approvals": items, "total": total, "items": items}


async def decide_approval(
    run_id: str,
    project_id: str,
    decision: str,
    approver_id: str,
    reason: str | None,
) -> dict[str, Any]:
    """Approve or reject a run that is waiting for approval (ADR-116 Gate 5).

    ``"approve"`` issues and records an approval token, then transitions the
    run to ``running``. Any other *decision* value transitions the run to
    ``cancelled`` with error code ``E-APPR-REJECTED``.

    Args:
        run_id: Run UUID as a string.
        project_id: Project that must own the run.
        decision: ``"approve"`` to approve; see the note above for other values.
        approver_id: Identifier of the deciding operator.
        reason: Optional free-text justification.

    Returns:
        ``run_id``, ``decision``, ``new_state`` and ``approval_token_jti``
        (``None`` for rejections or when the claim cannot be read).

    Raises:
        HTTPException: 422 for a malformed *run_id*, 404 when the run does not
            exist in the project, 409 when it is not ``waiting_approval``,
            500 when recording the approval fails.
    """
    try:
        run_uuid = uuid.UUID(run_id)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"run_id 格式錯誤: {exc}",
        ) from exc

    async with get_db_context(project_id) as db:
        result = await db.execute(
            select(AwoooPRunState).where(
                AwoooPRunState.run_id == run_uuid,
                AwoooPRunState.project_id == project_id,
            )
        )
        run = result.scalar_one_or_none()
        if run is None:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"run {run_id!r} 不存在或非此 project 所有",
            )
        if run.state != "waiting_approval":
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"run {run_id!r} 目前狀態為 {run.state!r},無法審核(需為 waiting_approval)",
            )

    approval_token_jti: str | None = None
    new_state: str
    # NOTE(review): every value other than "approve" is treated as a
    # rejection; confirm the router restricts `decision` to known values.
    if decision == "approve":
        token = issue_approval_token(
            project_id=project_id,
            run_id=run_id,
            tool_name="operator_console_approve",
            approver_id=approver_id,
        )
        try:
            await record_approval(
                project_id=project_id,
                run_id=run_id,
                tool_name="operator_console_approve",
                approver_id=approver_id,
                token=token,
            )
        except Exception as exc:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"核准記錄失敗: {exc}",
            ) from exc
        await transition(run_uuid, project_id, "running")
        new_state = "running"
        approval_token_jti = _approval_token_jti(token)
        if approval_token_jti is None:
            # Previously swallowed silently; keep it best-effort but visible.
            logger.warning("approval_token_jti_unavailable", run_id=run_id)
    else:
        await transition(
            run_uuid,
            project_id,
            "cancelled",
            error_code="E-APPR-REJECTED",
            error_detail=f"operator 拒絕: approver={approver_id!r}, reason={reason!r}",
        )
        new_state = "cancelled"

    # Both outcomes are journaled the same way, after the state transition.
    await _record_approval_decision_step(
        run_id=run_uuid,
        project_id=project_id,
        decision=decision,
        approver_id=approver_id,
        reason=reason,
    )

    try:
        await write_audit(
            project_id=project_id,
            action=f"run.approval.{decision}",
            resource_type="run",
            resource_id=run_id,
            details={
                "approver_id": approver_id,
                "decision": decision,
                "reason": reason,
                "new_state": new_state,
            },
            run_id=run_id,
        )
    except Exception as exc:
        # The decision already took effect; an audit failure is only logged.
        logger.warning("approval_audit_write_failed", run_id=run_id, error=str(exc))

    return {
        "run_id": run_id,
        "decision": decision,
        "new_state": new_state,
        "approval_token_jti": approval_token_jti,
    }


async def _record_approval_decision_step(
    *,
    run_id: UUID,
    project_id: str,
    decision: str,
    approver_id: str,
    reason: str | None,
) -> None:
    """Write the Operator Console's human approval decision to the Run Step Journal.

    This is a governance/observability record, not the execution gate itself:
    a write failure must not block an approve/reject that has already
    completed, otherwise the manual-decision state machine would suffer a
    secondary failure. Every exception is therefore logged and swallowed.

    The step is stored with ``result_status="success"``, the next free
    ``step_seq`` for the run, and a truncated summary in ``block_reason``;
    the run's ``step_count`` is incremented by one.
    """
    tool_name = (
        "operator_console.approve"
        if decision == "approve"
        else "operator_console.reject"
    )
    summary = _truncate_step_summary(
        f"approver={approver_id}; decision={decision}; reason={reason or '-'}"
    )
    try:
        async with get_db_context(project_id) as db:
            # NOTE(review): max(step_seq) + 1 is read-then-write; concurrent
            # writers for the same run could pick the same sequence — confirm
            # a constraint or lock covers this.
            max_result = await db.execute(
                select(func.coalesce(func.max(AwoooPRunStepJournal.step_seq), 0)).where(
                    AwoooPRunStepJournal.run_id == run_id,
                    AwoooPRunStepJournal.project_id == project_id,
                )
            )
            step_seq = int(max_result.scalar_one()) + 1
            db.add(
                AwoooPRunStepJournal(
                    run_id=run_id,
                    project_id=project_id,
                    step_seq=step_seq,
                    tool_name=tool_name,
                    result_status="success",
                    block_reason=summary,
                    completed_at=_utc_now_naive(),
                )
            )
            await db.execute(
                update(AwoooPRunState)
                .where(
                    AwoooPRunState.run_id == run_id,
                    AwoooPRunState.project_id == project_id,
                )
                .values(step_count=AwoooPRunState.step_count + 1)
            )
        # Logged only once the DB context has exited without raising.
        logger.info(
            "approval_decision_step_recorded",
            run_id=str(run_id),
            project_id=project_id,
            decision=decision,
            approver_id=approver_id,
        )
    except Exception as exc:
        logger.warning(
            "approval_decision_step_record_failed",
            run_id=str(run_id),
            project_id=project_id,
            decision=decision,
            error=str(exc),
        )