diff --git a/apps/api/src/services/telegram_gateway.py b/apps/api/src/services/telegram_gateway.py index a3e04ea6..a230419b 100644 --- a/apps/api/src/services/telegram_gateway.py +++ b/apps/api/src/services/telegram_gateway.py @@ -912,11 +912,243 @@ def _callback_reply_km_stale_completion_snapshot( return snapshot +def _callback_reply_source_ref_values(envelope: object, key: str) -> list[str]: + if not isinstance(envelope, dict): + return [] + source_refs = envelope.get("source_refs") + if not isinstance(source_refs, dict): + return [] + raw_values = source_refs.get(key) + if isinstance(raw_values, list): + return [str(item) for item in raw_values if str(item or "").strip()] + if raw_values not in (None, ""): + return [str(raw_values)] + return [] + + +def _append_unique_text(values: list[str], value: object) -> None: + text = str(value or "").strip() + if text and text not in values: + values.append(text) + + +def _callback_reply_awooop_mcp_snapshot( + truth_chain: dict[str, object] | None, +) -> dict[str, object]: + mcp = truth_chain.get("mcp") if isinstance(truth_chain, dict) else {} + if not isinstance(mcp, dict): + mcp = {} + gateway = mcp.get("awooop_gateway") if isinstance(mcp.get("awooop_gateway"), dict) else {} + legacy = mcp.get("legacy") if isinstance(mcp.get("legacy"), dict) else {} + + top_tools: list[dict[str, object]] = [] + seen_tools: set[str] = set() + for source, summary in (("gateway", gateway), ("legacy", legacy)): + by_tool = summary.get("by_tool") if isinstance(summary, dict) else [] + if not isinstance(by_tool, list): + continue + for item in by_tool: + if not isinstance(item, dict): + continue + tool_name = str(item.get("tool_name") or "unknown").strip() or "unknown" + key = f"{source}:{tool_name}" + if key in seen_tools: + continue + seen_tools.add(key) + top_tools.append({ + "source": source, + "tool_name": tool_name, + "total": ( + _safe_int(item.get("total")) + or _safe_int(item.get("success")) + + _safe_int(item.get("failed")) + + _safe_int(item.get("blocked")) + ), + "success": _safe_int(item.get("success")), + "failed": _safe_int(item.get("failed")), + "blocked": _safe_int(item.get("blocked")), + "last_error": str(item.get("last_error") or "")[:240] or None, + }) + if len(top_tools) >= 5: + break + if len(top_tools) >= 5: + break + + return { + "gateway": { + "total": _safe_int(gateway.get("total")), + "success": _safe_int(gateway.get("success")), + "failed": _safe_int(gateway.get("failed")), + "blocked": _safe_int(gateway.get("blocked")), + "first_class_total": _safe_int(gateway.get("first_class_total")), + "legacy_bridge_total": _safe_int(gateway.get("legacy_bridge_total")), + "policy_enforced_total": _safe_int(gateway.get("policy_enforced_total")), + "stage": str(gateway.get("stage") or ""), + "stage_status": str(gateway.get("stage_status") or ""), + }, + "legacy": { + "total": _safe_int(legacy.get("total")), + "success": _safe_int(legacy.get("success")), + "failed": _safe_int(legacy.get("failed")), + }, + "top_tools": top_tools, + } + + +def _callback_reply_awooop_execution_snapshot( + truth_chain: dict[str, object] | None, +) -> dict[str, object]: + execution = truth_chain.get("execution") if isinstance(truth_chain, dict) else {} + if not isinstance(execution, dict): + execution = {} + ops = execution.get("automation_operation_log") + if not isinstance(ops, list): + ops = [] + latest_op = ops[0] if ops and isinstance(ops[0], dict) else {} + + playbook_ids: list[str] = [] + playbook_paths: list[str] = [] + for row in ops: + if not isinstance(row, dict): + continue + for key in ("matched_playbook_id", "input_playbook_id", "output_playbook_id"): + _append_unique_text(playbook_ids, row.get(key)) + for key in ( + "input_playbook_path", + "output_playbook_path", + "input_ansible_playbook_path", + "output_ansible_playbook_path", + ): + _append_unique_text(playbook_paths, row.get(key)) + + ansible = execution.get("ansible") if isinstance(execution.get("ansible"), dict) else {} + ansible_records = ansible.get("records") if isinstance(ansible.get("records"), list) else [] + latest_ansible = ( + ansible_records[0] + if ansible_records and isinstance(ansible_records[0], dict) + else {} + ) + candidate_catalog = ( + ansible.get("candidate_catalog") + if isinstance(ansible.get("candidate_catalog"), dict) + else {} + ) + candidates = ( + candidate_catalog.get("candidates") + if isinstance(candidate_catalog.get("candidates"), list) + else [] + ) + + return { + "operation_total": len(ops), + "latest_operation_type": latest_op.get("operation_type"), + "latest_status": latest_op.get("status"), + "latest_actor": latest_op.get("actor"), + "latest_action": ( + latest_op.get("input_action") or latest_op.get("output_action") + ), + "latest_executor": ( + latest_op.get("input_executor") + or latest_op.get("output_executor") + or latest_op.get("input_execution_backend") + or latest_op.get("output_execution_backend") + ), + "playbook_ids": playbook_ids[:5], + "playbook_paths": playbook_paths[:5], + "ansible": { + "considered": bool(ansible.get("considered")), + "record_total": len(ansible_records), + "candidate_count": len(candidates), + "not_used_reason": ansible.get("not_used_reason"), + "latest_operation_type": latest_ansible.get("operation_type"), + "latest_status": latest_ansible.get("status"), + "latest_playbook_path": latest_ansible.get("playbook_path"), + "latest_check_mode": latest_ansible.get("check_mode"), + "candidate_playbooks": [ + { + "catalog_id": item.get("catalog_id"), + "playbook_path": item.get("playbook_path"), + "risk_level": item.get("risk_level"), + "match_score": item.get("match_score"), + } + for item in candidates[:3] + if isinstance(item, dict) + ], + }, + } + + +def _callback_reply_awooop_source_snapshot( + truth_chain: dict[str, object] | None, + source_correlation: dict[str, object] | None, +) -> dict[str, object]: + channel = truth_chain.get("channel") if isinstance(truth_chain, dict) else {} + if not isinstance(channel, dict): + channel = {} + inbound_events = channel.get("inbound_events") + outbound_messages = channel.get("outbound_messages") + if not isinstance(inbound_events, list): + inbound_events = [] + if not isinstance(outbound_messages, list): + outbound_messages = [] + + source_refs: dict[str, list[str]] = { + "alert_ids": [], + "sentry_issue_ids": [], + "signoz_alerts": [], + "fingerprints": [], + "incident_ids": [], + } + inbound_channels: list[str] = [] + for row in inbound_events: + if not isinstance(row, dict): + continue + _append_unique_text(inbound_channels, row.get("channel_type")) + envelope = row.get("source_envelope") + for key in source_refs: + for value in _callback_reply_source_ref_values(envelope, key): + _append_unique_text(source_refs[key], value) + + latest_inbound = ( + inbound_events[0] + if inbound_events and isinstance(inbound_events[0], dict) + else {} + ) + latest_outbound = ( + outbound_messages[0] + if outbound_messages and isinstance(outbound_messages[0], dict) + else {} + ) + source_snapshot: dict[str, object] = { + "inbound_total": len(inbound_events), + "outbound_total": len(outbound_messages), + "inbound_channels": inbound_channels[:5], + "refs": {key: values[:5] for key, values in source_refs.items()}, + "latest_inbound": { + "channel_type": latest_inbound.get("channel_type"), + "provider_event_id": latest_inbound.get("provider_event_id"), + "content_type": latest_inbound.get("content_type"), + "is_duplicate": latest_inbound.get("is_duplicate"), + "received_at": latest_inbound.get("received_at"), + }, + "latest_outbound": { + "channel_type": latest_outbound.get("channel_type"), + "message_type": latest_outbound.get("message_type"), + "send_status": latest_outbound.get("send_status"), + "sent_at": latest_outbound.get("sent_at"), + }, + } + if isinstance(source_correlation, dict): + source_snapshot["correlation"] = source_correlation + return source_snapshot + + def _callback_reply_awooop_status_chain_snapshot( *, incident_id: str | None, truth_chain: dict[str, object] | None = None, remediation_history: dict[str, object] | None = None, + source_correlation: dict[str, object] | None = None, ) -> dict[str, object] | None: """Persist a compact AwoooP status-chain snapshot with callback evidence.""" if not incident_id or (not truth_chain and not remediation_history): @@ -1034,6 +1266,12 @@ def _callback_reply_awooop_status_chain_snapshot( "incident": latest.get("writes_incident_state"), "auto_repair": latest.get("writes_auto_repair_result"), }, + "mcp": _callback_reply_awooop_mcp_snapshot(truth_chain), + "execution": _callback_reply_awooop_execution_snapshot(truth_chain), + "source_refs": _callback_reply_awooop_source_snapshot( + truth_chain, + source_correlation, + ), } @@ -6256,6 +6494,7 @@ class TelegramGateway: project_id=project_id, ) truth_chain: dict[str, object] | None = None + source_correlation: dict[str, object] | None = None try: from src.services.awooop_truth_chain_service import fetch_truth_chain @@ -6289,10 +6528,28 @@ class TelegramGateway: lines += _format_km_stale_completion_lines(km_completion_summary) lines += _format_remediation_history_lines(remediation_history) + if truth_chain is not None: + try: + from src.services.platform_operator_service import ( + _fetch_source_correlation_summary, + ) + + source_correlation = await _fetch_source_correlation_summary( + project_id=project_id, + incident_ids=[incident_id], + ) + except Exception as source_exc: + logger.warning( + "incident_detail_source_correlation_summary_failed", + incident_id=incident_id, + error=str(source_exc), + ) + awooop_status_chain_snapshot = _callback_reply_awooop_status_chain_snapshot( incident_id=incident_id, truth_chain=truth_chain, remediation_history=remediation_history, + source_correlation=source_correlation, ) await self._send_html_line_message( lines, @@ -6421,6 +6678,7 @@ class TelegramGateway: project_id=project_id, ) truth_chain: dict[str, object] | None = None + source_correlation: dict[str, object] | None = None try: from src.services.awooop_truth_chain_service import fetch_truth_chain @@ -6449,10 +6707,28 @@ class TelegramGateway: lines += _format_km_stale_completion_lines(km_completion_summary) lines += _format_remediation_history_lines(remediation_history) + if truth_chain is not None: + try: + from src.services.platform_operator_service import ( + _fetch_source_correlation_summary, + ) + + source_correlation = await _fetch_source_correlation_summary( + project_id=project_id, + incident_ids=[incident_id], + ) + except Exception as source_exc: + logger.warning( + "incident_history_source_correlation_summary_failed", + incident_id=incident_id, + error=str(source_exc), + ) + awooop_status_chain_snapshot = _callback_reply_awooop_status_chain_snapshot( incident_id=incident_id, truth_chain=truth_chain, remediation_history=remediation_history, + source_correlation=source_correlation, ) await self._send_html_line_message( lines, diff --git a/apps/api/tests/test_telegram_message_templates.py b/apps/api/tests/test_telegram_message_templates.py index 511652f9..bc62c6ad 100644 --- a/apps/api/tests/test_telegram_message_templates.py +++ b/apps/api/tests/test_telegram_message_templates.py @@ -199,6 +199,80 @@ def test_callback_reply_awooop_status_chain_snapshot_marks_manual_gate() -> None }, "blockers": [], }, + "mcp": { + "awooop_gateway": { + "total": 2, + "success": 1, + "failed": 1, + "blocked": 0, + "first_class_total": 2, + "legacy_bridge_total": 0, + "policy_enforced_total": 2, + "by_tool": [ + { + "tool_name": "prometheus.query", + "total": 2, + "success": 1, + "failed": 1, + "blocked": 0, + } + ], + }, + "legacy": { + "total": 1, + "success": 1, + "failed": 0, + }, + }, + "execution": { + "automation_operation_log": [ + { + "operation_type": "playbook_previewed", + "status": "waiting_approval", + "actor": "Hermes", + "input_executor": "ansible", + "input_playbook_id": "pb-host-restart", + "input_playbook_path": "infra/ansible/playbooks/188-ai-web.yml", + } + ], + "ansible": { + "considered": True, + "records": [ + { + "operation_type": "ansible_check_mode_previewed", + "status": "waiting_approval", + "playbook_path": "infra/ansible/playbooks/188-ai-web.yml", + "check_mode": True, + } + ], + "candidate_catalog": { + "candidates": [ + { + "catalog_id": "ansible:188-ai-web", + "playbook_path": "infra/ansible/playbooks/188-ai-web.yml", + "risk_level": "medium", + "match_score": 3, + } + ] + }, + }, + }, + "channel": { + "inbound_events": [ + { + "channel_type": "sentry", + "provider_event_id": "sentry:ISSUE-1", + "source_envelope": { + "source_refs": { + "sentry_issue_ids": ["ISSUE-1"], + "signoz_alerts": ["signoz:alert-1"], + "fingerprints": ["fp-1"], + } + }, + } + ], + "outbound_messages": [], + }, }, remediation_history={ "total": 1, @@ -214,6 +288,38 @@ def test_callback_reply_awooop_status_chain_snapshot_marks_manual_gate() -> None } ], }, + source_correlation={ + "schema_version": "source_provider_correlation_v1", + "status": "candidate_found", + "verification_status": "candidate_only", + "direct_ref_total": 0, + "candidate_total": 1, + "applied_link_total": 0, + "provider_event_total": 1, + "latest_applied_link_at": None, + "providers": { + "sentry": { + "direct_ref_total": 0, + "candidate_total": 1, + "applied_link_total": 0, + "latest_event_at": "2026-05-20T00:00:00Z", + }, + "signoz": { + "direct_ref_total": 0, + "candidate_total": 0, + "applied_link_total": 0, + "latest_event_at": None, + }, + }, + "top_candidates": [ + { + "provider": "sentry", + "provider_event_id": "sentry:ISSUE-1", + "score": 2, + "link_state": "candidate", + } + ], + }, ) assert snapshot is not None @@ -227,6 +333,16 @@ def test_callback_reply_awooop_status_chain_snapshot_marks_manual_gate() -> None assert snapshot["evidence"]["latest_route"] == "investigator/ssh_diagnose/read" assert snapshot["writes"]["incident"] is False assert snapshot["blockers"] == ["pending_human_approval"] + assert snapshot["mcp"]["gateway"]["policy_enforced_total"] == 2 + assert snapshot["mcp"]["top_tools"][0]["tool_name"] == "prometheus.query" + assert snapshot["execution"]["latest_executor"] == "ansible" + assert snapshot["execution"]["playbook_paths"] == [ + "infra/ansible/playbooks/188-ai-web.yml" + ] + assert snapshot["execution"]["ansible"]["candidate_count"] == 1 + assert snapshot["source_refs"]["refs"]["sentry_issue_ids"] == ["ISSUE-1"] + assert snapshot["source_refs"]["refs"]["signoz_alerts"] == ["signoz:alert-1"] + assert snapshot["source_refs"]["correlation"]["status"] == "candidate_found" def test_km_stale_completion_lines_show_owner_review_queue_state() -> None: diff --git a/apps/web/messages/en.json b/apps/web/messages/en.json index fcd3b0a9..b1572d78 100644 --- a/apps/web/messages/en.json +++ b/apps/web/messages/en.json @@ -2797,6 +2797,9 @@ "previewEmpty": "No preview", "openRun": "Open Run", "awooopSnapshotTitle": "Callback-time AwoooP Status Chain", + "awooopSnapshotMcp": "MCP: total {total} / success {success} / failed {failed} / blocked {blocked}; top {topTool}", + "awooopSnapshotExecution": "Execution: executor {executor}; playbook {playbook}; Ansible considered={ansible} / candidates={candidates}", + "awooopSnapshotSource": "Source: {status}; direct {direct} / candidate {candidate} / applied {applied}; {providers}", "kmCompletion": { "title": "KM Owner Review", "status": "Status: {status}", diff --git a/apps/web/messages/zh-TW.json b/apps/web/messages/zh-TW.json index c080cbb4..070415e9 100644 --- a/apps/web/messages/zh-TW.json +++ b/apps/web/messages/zh-TW.json @@ -2798,6 +2798,9 @@ "previewEmpty": "無摘要", "openRun": "開啟 Run", "awooopSnapshotTitle": "Callback 當下 AwoooP 狀態鏈", + "awooopSnapshotMcp": "MCP:total {total} / success {success} / failed {failed} / blocked {blocked};top {topTool}", + "awooopSnapshotExecution": "Execution:executor {executor};playbook {playbook};Ansible considered={ansible} / candidates={candidates}", + "awooopSnapshotSource": "Source:{status};direct {direct} / candidate {candidate} / applied {applied};{providers}", "kmCompletion": { "title": "KM Owner Review", "status": "狀態:{status}", diff --git a/apps/web/src/app/[locale]/awooop/runs/page.tsx b/apps/web/src/app/[locale]/awooop/runs/page.tsx index ecebd49b..ba3c9a36 100644 --- a/apps/web/src/app/[locale]/awooop/runs/page.tsx +++ b/apps/web/src/app/[locale]/awooop/runs/page.tsx @@ -1643,6 +1643,69 @@ function CallbackKmCompletionSnapshot({ ); } +function CallbackAwoooPStatusChainSnapshot({ + chain, +}: { + chain?: AwoooPStatusChain | null; +}) { + const t = useTranslations("awooop.callbackReply.events"); + if (!chain) return null; + + const mcpGateway = chain.mcp?.gateway ?? {}; + const topTool = chain.mcp?.top_tools?.[0]; + const execution = chain.execution ?? {}; + const ansible = execution.ansible ?? {}; + const correlation = chain.source_refs?.correlation; + const providerSummary = ["sentry", "signoz"].map((provider) => { + const item = correlation?.providers?.[provider]; + return `${provider} ${item?.direct_ref_total ?? 0}/${item?.candidate_total ?? 0}/${item?.applied_link_total ?? 0}`; + }).join(" · "); + + return ( +
+ {t("awooopSnapshotTitle")} +
++ {t("awooopSnapshotMcp", { + total: mcpGateway.total ?? 0, + success: mcpGateway.success ?? 0, + failed: mcpGateway.failed ?? 0, + blocked: mcpGateway.blocked ?? 0, + topTool: topTool?.tool_name ?? "--", + })} +
++ {t("awooopSnapshotExecution", { + executor: execution.latest_executor ?? "--", + playbook: execution.playbook_paths?.[0] ?? "--", + ansible: String(ansible.considered ?? false), + candidates: ansible.candidate_count ?? 0, + })} +
++ {t("awooopSnapshotSource", { + status: correlation?.status ?? "--", + direct: correlation?.direct_ref_total ?? 0, + candidate: correlation?.candidate_total ?? 0, + applied: correlation?.applied_link_total ?? 0, + providers: providerSummary, + })} +
+- {t("awooopSnapshotTitle")} -
-