diff --git a/apps/api/src/services/telegram_gateway.py b/apps/api/src/services/telegram_gateway.py index 3ddc5476..45c1a771 100644 --- a/apps/api/src/services/telegram_gateway.py +++ b/apps/api/src/services/telegram_gateway.py @@ -196,6 +196,113 @@ def _format_remediation_history_lines(history: dict[str, object] | None) -> list ] +def _latest_remediation_history_item(history: dict[str, object] | None) -> dict[str, object]: + if not history: + return {} + items = history.get("items") if isinstance(history.get("items"), list) else [] + latest = items[0] if items and isinstance(items[0], dict) else {} + return latest + + +def _remediation_evidence_state(history: dict[str, object] | None) -> str: + """Classify ADR-100 dry-run evidence for first-screen Telegram status.""" + if not history: + return "" + + total = int(history.get("total") or 0) + if total <= 0: + if history.get("status") == "fetch_failed": + return "fetch_failed" + return "missing" + + latest = _latest_remediation_history_item(history) + if latest.get("writes_incident_state") or latest.get("writes_auto_repair_result"): + return "write_observed" + if latest.get("allowed") is False or latest.get("success") is False: + return "blocked" + + safety_level = str(latest.get("safety_level") or "").lower() + required_scope = str(latest.get("required_scope") or "").lower() + if safety_level == "read_only" or required_scope == "read": + return "read_only" + return "observed" + + +def _format_remediation_evidence_block(history: dict[str, object] | None) -> str: + """Compact ADR-100 dry-run evidence for the root ACTION REQUIRED card.""" + if not history: + return "" + + state = _remediation_evidence_state(history) + total = int(history.get("total") or 0) + if total <= 0: + label = ( + "補救試跑查詢失敗" + if state == "fetch_failed" + else "尚無補救試跑紀錄" + ) + return f"🧪 AI 證據:{html.escape(label)}\n" + + latest = _latest_remediation_history_item(history) + agent = latest.get("agent_id") or "unknown_agent" + tool = latest.get("tool_name") or "current_state" + scope = latest.get("required_scope") or "unknown" + writes_incident = str(bool(latest.get("writes_incident_state"))).lower() + writes_auto_repair = str(bool(latest.get("writes_auto_repair_result"))).lower() + route = f"{agent}/{tool}/{scope}" + preview = latest.get("verification_result_preview") or "unknown" + + state_label = { + "read_only": "只讀試跑", + "write_observed": "有寫入旗標", + "blocked": "試跑受阻", + "observed": "已試跑", + }.get(state, "已試跑") + + return ( + f"🧪 AI 證據:{state_label} {total} 次 | " + f"{html.escape(str(route))}\n" + f"├ preview:{html.escape(str(preview))}\n" + f"└ 寫入:incident {writes_incident} / " + f"auto-repair {writes_auto_repair}\n" + ) + + +async def _fetch_remediation_summary_for_card( + *, + approval_id: str, + incident_id: str, +) -> dict[str, object] | None: + if not incident_id: + return None + try: + from src.services.adr100_remediation_service import ( + get_adr100_remediation_service, + ) + + history = await asyncio.wait_for( + get_adr100_remediation_service().history( + limit=5, + incident_id=incident_id, + ), + timeout=2.5, + ) + return history if isinstance(history, dict) else None + except Exception as remediation_exc: + logger.debug( + "telegram_approval_card_remediation_history_fetch_failed", + approval_id=approval_id, + incident_id=incident_id, + error=str(remediation_exc), + ) + return { + "schema_version": "adr100_remediation_history_v1", + "total": 0, + "items": [], + "status": "fetch_failed", + } + + def _telegram_html_chunks(lines: list[str], limit: int = _TELEGRAM_HTML_CHUNK_LIMIT) -> list[str]: """Split HTML messages by complete lines so Telegram does not receive broken tags.""" chunks: list[str] = [] @@ -476,6 +583,7 @@ class TelegramMessage: playbook_name: str = "" # 匹配到的 Playbook 名稱(空字串=規則匹配) automation_state: str = "" # diagnosis_collected_manual_required / diagnosis_failed_manual_required automation_quality: dict | None = None # truth-chain automation_quality 摘要 + remediation_summary: dict | None = None # ADR-100 read-only dry-run history 摘要 # ========================================================================== # Phase 22: Nemotron 協作欄位 (ADR-044) @@ -541,6 +649,7 @@ class TelegramMessage: auto_repair_records = int(facts.get("auto_repair_execution_records") or 0) operation_records = int(facts.get("automation_operation_records") or 0) verification = str(facts.get("verification_result") or "missing") + remediation_state = _remediation_evidence_state(self.remediation_summary) if verdict == "auto_repaired_verified": return "✅ 已驗證自動修復完成" @@ -548,6 +657,14 @@ class TelegramMessage: if verification == "missing": return "🔄 已自動執行,等待驗證證據" return f"🔄 已自動執行,驗證結果:{verification}" + if remediation_state == "read_only": + return "🔎 AI 已完成只讀補救試跑,等待人工審批" + if remediation_state == "write_observed": + return "⚠️ AI 補救試跑出現寫入旗標,需人工確認" + if remediation_state == "blocked": + return "🔴 AI 補救試跑受阻,需人工處理" + if remediation_state == "fetch_failed": + return "🟠 AI 補救試跑證據查詢失敗,需人工判斷" if verdict == "approval_required": return "🟡 需要審批後才會執行" if verdict.startswith("manual_required"): @@ -668,6 +785,9 @@ class TelegramMessage: f"└ 判定:{html.escape(verdict)} — {html.escape(conclusion)}\n" ) + def _format_remediation_evidence_block(self) -> str: + return _format_remediation_evidence_block(self.remediation_summary) + def format(self) -> str: """ 格式化為 SOUL.md 規範的訊息 (含 AI 仲裁 + SignOz) @@ -810,6 +930,7 @@ class TelegramMessage: playbook_line = "" if self.playbook_name: playbook_line = f"📖 Playbook:{html.escape(self.playbook_name)}\n" + remediation_evidence_block = self._format_remediation_evidence_block() flow_progress_block = self._format_flow_progress_block() automation_block = self._format_automation_block() @@ -821,7 +942,7 @@ class TelegramMessage: f"🎯 資源:{safe_resource}\n" f"{category_line}" f"🧭 處置狀態:{safe_automation_summary}\n" - f"\n" + f"{remediation_evidence_block}\n" f"{flow_progress_block}\n" f"{automation_block}" f"\n" @@ -968,6 +1089,7 @@ class TelegramMessage: playbook_line = "" if self.playbook_name: playbook_line = f"📖 {html.escape(self.playbook_name)}\n" + remediation_evidence_block = self._format_remediation_evidence_block() flow_progress_block = self._format_flow_progress_block() # 組裝訊息 @@ -976,6 +1098,7 @@ class TelegramMessage: f"{safe_resource}\n" f"{category_line}" f"\n" + f"{remediation_evidence_block}" f"{flow_progress_block}\n" f"{self._format_automation_block()}\n" f"{conf_line}\n" @@ -2384,6 +2507,10 @@ class TelegramGateway: ) automation_quality: dict | None = None + remediation_summary = await _fetch_remediation_summary_for_card( + approval_id=approval_id, + incident_id=incident_id, + ) if incident_id: try: from src.services.awooop_truth_chain_service import fetch_truth_chain @@ -2444,6 +2571,7 @@ class TelegramGateway: playbook_name=playbook_name, automation_state=automation_state, automation_quality=automation_quality, + remediation_summary=remediation_summary, ) # 格式化訊息 — Phase 22: 如果 Nemotron 啟用,使用雙軌格式 @@ -2615,6 +2743,10 @@ class TelegramGateway: trace_url=signoz_trace_url, ) + remediation_summary = await _fetch_remediation_summary_for_card( + approval_id=approval_id, + incident_id=incident_id, + ) message = TelegramMessage( status_emoji=emoji, risk_level=risk_level.upper(), @@ -2639,6 +2771,7 @@ class TelegramGateway: nemotron_tools=nemotron_tools, nemotron_validation=nemotron_validation, nemotron_latency_ms=nemotron_latency_ms, + remediation_summary=remediation_summary, ) text = message.format_with_nemotron() if nemotron_enabled else message.format() @@ -5378,23 +5511,6 @@ class TelegramGateway: error=str(truth_exc), ) - try: - from src.services.adr100_remediation_service import ( - get_adr100_remediation_service, - ) - - remediation_history = await get_adr100_remediation_service().history( - limit=5, - incident_id=incident_id, - ) - lines += _format_remediation_history_lines(remediation_history) - except Exception as remediation_exc: - logger.warning( - "incident_history_remediation_summary_failed", - incident_id=incident_id, - error=str(remediation_exc), - ) - await self._send_html_line_message( lines, failure_context="incident_detail", diff --git a/apps/api/tests/test_telegram_message_templates.py b/apps/api/tests/test_telegram_message_templates.py index cba9f955..bbd92d68 100644 --- a/apps/api/tests/test_telegram_message_templates.py +++ b/apps/api/tests/test_telegram_message_templates.py @@ -198,6 +198,68 @@ class TestTelegramMessageFormat: assert "AI 診斷工具失敗,需人工排查" in result assert "AI 自動修復失敗" not in result + def test_telegram_message_surfaces_read_only_remediation_evidence(self): + """主告警卡必須顯示 ADR-100 只讀補救試跑與寫入旗標。""" + msg = TelegramMessage( + status_emoji="⚠️", + risk_level="MEDIUM", + resource_name="awoooi-auto-repair-canary", + root_cause="post approval verification drift", + suggested_action="kubectl rollout restart deployment/awoooi-api", + estimated_downtime="~30s", + approval_id="INC-20260513-79ED5E", + confidence=0.82, + remediation_summary={ + "schema_version": "adr100_remediation_history_v1", + "total": 3, + "items": [ + { + "mode": "replay", + "allowed": True, + "success": True, + "safety_level": "read_only", + "verification_result_preview": "degraded", + "agent_id": "auto_repair_executor", + "tool_name": "ssh_diagnose", + "required_scope": "read", + "writes_incident_state": False, + "writes_auto_repair_result": False, + } + ], + }, + ) + + result = msg.format() + + assert "AI 已完成只讀補救試跑,等待人工審批" in result + assert "AI 證據" in result + assert "只讀試跑 3 次" in result + assert "auto_repair_executor/ssh_diagnose/read" in result + assert "incident false" in result + assert "auto-repair false" in result + + def test_telegram_message_surfaces_missing_remediation_evidence(self): + """沒有補救試跑紀錄時,主卡要明確說明,不讓值班者猜。""" + msg = TelegramMessage( + status_emoji="ℹ️", + risk_level="LOW", + resource_name="awoooi-auto-repair-canary", + root_cause="safe canary", + suggested_action="NO_ACTION", + estimated_downtime="unknown", + approval_id="INC-20260513-EMPTY", + remediation_summary={ + "schema_version": "adr100_remediation_history_v1", + "total": 0, + "items": [], + }, + ) + + result = msg.format() + + assert "AI 證據" in result + assert "尚無補救試跑紀錄" in result + def test_telegram_message_with_token_cost(self): """測試含 Token/Cost 的訊息""" msg = TelegramMessage( @@ -309,6 +371,64 @@ async def test_append_incident_update_suppresses_duplicate_failure_across_incide ] +@pytest.mark.asyncio +async def test_send_approval_card_includes_remediation_summary(monkeypatch): + """send_approval_card 要把 durable 補救試跑歷史帶進 Telegram 主卡。""" + sent_requests = [] + gateway = TelegramGateway() + + async def fake_send_request(method, payload): + sent_requests.append((method, payload)) + return {"ok": True, "result": {}} + + async def fake_keyboard(**kwargs): + return {"inline_keyboard": []} + + async def fake_remediation_summary(**kwargs): + assert kwargs["incident_id"] == "INC-20260513-79ED5E" + return { + "schema_version": "adr100_remediation_history_v1", + "total": 1, + "items": [ + { + "allowed": True, + "success": True, + "safety_level": "read_only", + "verification_result_preview": "degraded", + "agent_id": "auto_repair_executor", + "tool_name": "ssh_diagnose", + "required_scope": "read", + "writes_incident_state": False, + "writes_auto_repair_result": False, + } + ], + } + + monkeypatch.setattr(TelegramGateway, "alert_chat_id", property(lambda _self: "chat")) + monkeypatch.setattr(gateway, "_send_request", fake_send_request) + monkeypatch.setattr(gateway, "_build_inline_keyboard", fake_keyboard) + monkeypatch.setattr( + telegram_gateway_module, + "_fetch_remediation_summary_for_card", + fake_remediation_summary, + ) + + await gateway.send_approval_card( + approval_id="approval-1", + risk_level="medium", + resource_name="awoooi-auto-repair-canary", + root_cause="post approval verification drift", + suggested_action="kubectl rollout restart deployment/awoooi-api", + incident_id="INC-20260513-79ED5E", + confidence=0.82, + ) + + assert sent_requests + text = sent_requests[0][1]["text"] + assert "AI 已完成只讀補救試跑,等待人工審批" in text + assert "auto_repair_executor/ssh_diagnose/read" in text + + def test_outbound_message_type_inference(): """Legacy Telegram 訊息 mirror 到 Channel Hub 時,必須映射成有限分類。"""