diff --git a/apps/api/src/services/telegram_gateway.py b/apps/api/src/services/telegram_gateway.py index 1e9a4312..e6153882 100644 --- a/apps/api/src/services/telegram_gateway.py +++ b/apps/api/src/services/telegram_gateway.py @@ -66,6 +66,7 @@ POLLING_LEADER_WATCH = 30 # seconds - 非 Leader Pod 每 30s 嘗試接管 logger = structlog.get_logger(__name__) _TELEGRAM_BOT_URL_RE = re.compile(r"(api\.telegram\.org/bot)[^/\s]+") +_INCIDENT_ID_RE = re.compile(r"\bINC-\d{8}-[A-Z0-9]{4,}\b") def _sanitize_telegram_error(text: str) -> str: @@ -81,6 +82,21 @@ def _is_noisy_failure_update(status_line: str) -> bool: ) +def _extract_incident_id_from_text(text: str) -> str | None: + """從 Telegram 出站文字擷取 Incident ID。""" + match = _INCIDENT_ID_RE.search(text or "") + return match.group(0) if match else None + + +def _has_reply_context(payload: dict) -> bool: + return "reply_to_message_id" in payload or "reply_parameters" in payload + + +def _is_root_action_required_card(text: str) -> bool: + """主告警卡片本身不自動 reply,避免把新主卡接到舊訊息下。""" + return "ACTION REQUIRED" in text and "AI 自動化鏈路" in text + + def _legacy_outbound_run_id(chat_id: str, provider_message_id: str) -> UUID: """Legacy Telegram 發送尚未有 run_id 時,產生穩定 soft run_id 供 Channel Hub 串接。""" return uuid5(NAMESPACE_URL, f"awoooi:legacy-telegram:{chat_id}:{provider_message_id}") @@ -88,7 +104,9 @@ def _legacy_outbound_run_id(chat_id: str, provider_message_id: str) -> UUID: def _infer_outbound_message_type(text: str, payload: dict) -> str: """將既有 Telegram 訊息映射成 AwoooP outbound_message 的有限分類。""" - if "reply_to_message_id" in payload: + if "RUNBOOK REVIEW" in text or "待審核" in text: + return "approval_request" + if _has_reply_context(payload): if "失敗" in text or "錯誤" in text or "FAILED" in text: return "error" return "final" @@ -1505,6 +1523,8 @@ class TelegramGateway: if not self._http_client: raise TelegramGatewayError("HTTP client not initialized") + await self._attach_incident_thread_reply(method, payload) + url = f"{self.api_url}/{method}" # OTEL Span: telegram.api.{method} @@ -1567,6 +1587,58 @@ class TelegramGateway: ) raise TelegramGatewayError(safe_error) from None + async def _attach_incident_thread_reply(self, method: str, payload: dict) -> None: + """將同一 Incident 的後續 Telegram 訊息接回原告警卡片。 + + 2026-05-07 Codex — 主卡 `tg_msg:{incident_id}` 已存在時,後續 + Runbook / escalation / 執行摘要不要再形成頂層訊息洪水,而是以 + Telegram reply thread 延續;主 ACTION REQUIRED 卡與已顯式 reply 的 + payload 不改動。 + """ + if method != "sendMessage" or _has_reply_context(payload): + return + + text = str(payload.get("text") or "") + if not text or _is_root_action_required_card(text): + return + + incident_id = _extract_incident_id_from_text(text) + if not incident_id: + return + + try: + stored = await get_redis().get(f"tg_msg:{incident_id}") + except Exception as exc: + logger.debug( + "telegram_incident_thread_lookup_failed", + incident_id=incident_id, + error=str(exc), + ) + return + + if not stored: + return + + try: + message_id = int(stored) + except (TypeError, ValueError): + logger.debug( + "telegram_incident_thread_invalid_message_id", + incident_id=incident_id, + stored=str(stored), + ) + return + + payload["reply_parameters"] = { + "message_id": message_id, + "allow_sending_without_reply": True, + } + logger.info( + "telegram_incident_thread_reply_attached", + incident_id=incident_id, + message_id=message_id, + ) + async def _mirror_outbound_message( self, *, diff --git a/apps/api/tests/test_telegram_message_templates.py b/apps/api/tests/test_telegram_message_templates.py index 590d5ba1..12eb8f25 100644 --- a/apps/api/tests/test_telegram_message_templates.py +++ b/apps/api/tests/test_telegram_message_templates.py @@ -251,10 +251,79 @@ def test_outbound_message_type_inference(): assert ( telegram_gateway_module._infer_outbound_message_type( "✅ 執行成功", - {"reply_to_message_id": 123}, + {"reply_parameters": {"message_id": 123}}, ) == "final" ) + assert ( + telegram_gateway_module._infer_outbound_message_type( + "📄 RUNBOOK REVIEW|待審核", + {"reply_parameters": {"message_id": 123}}, + ) + == "approval_request" + ) + + +def test_extract_incident_id_from_text(): + """Telegram 出站文字可擷取 Incident ID,供後續訊息接回原告警卡片。""" + + assert ( + telegram_gateway_module._extract_incident_id_from_text( + "Incident: INC-20260506-E54736\nEntry ID: abc" + ) + == "INC-20260506-E54736" + ) + assert telegram_gateway_module._extract_incident_id_from_text("沒有事件編號") is None + + +@pytest.mark.asyncio +async def test_attach_incident_thread_reply(monkeypatch): + """非主卡、含 Incident ID 的後續訊息,應自動 reply 原告警 message_id。""" + + class FakeRedis: + async def get(self, key): + assert key == "tg_msg:INC-20260506-E54736" + return "9876" + + gateway = TelegramGateway() + payload = { + "chat_id": gateway.alert_chat_id, + "text": "📄 RUNBOOK REVIEW|待審核\nIncident: INC-20260506-E54736", + } + + monkeypatch.setattr(telegram_gateway_module, "get_redis", lambda: FakeRedis()) + + await gateway._attach_incident_thread_reply("sendMessage", payload) + + assert payload["reply_parameters"] == { + "message_id": 9876, + "allow_sending_without_reply": True, + } + + +@pytest.mark.asyncio +async def test_attach_incident_thread_skips_root_action_card(monkeypatch): + """ACTION REQUIRED 主告警卡不應自動 reply 舊訊息。""" + + class FakeRedis: + async def get(self, key): # pragma: no cover - 不應被呼叫 + raise AssertionError(key) + + gateway = TelegramGateway() + payload = { + "chat_id": gateway.alert_chat_id, + "text": ( + "ℹ️ ACTION REQUIRED | 低風險\n" + "📋 INC-20260506-E54736\n" + "🤖 AI 自動化鏈路" + ), + } + + monkeypatch.setattr(telegram_gateway_module, "get_redis", lambda: FakeRedis()) + + await gateway._attach_incident_thread_reply("sendMessage", payload) + + assert "reply_parameters" not in payload def test_legacy_outbound_run_id_is_stable():