From 7d8eb26ebe0e47864cffede4ecf48bd9b159ab38 Mon Sep 17 00:00:00 2001 From: OG T Date: Mon, 23 Mar 2026 23:26:08 +0800 Subject: [PATCH] =?UTF-8?q?feat(telegram):=20=E6=96=B0=E5=A2=9E=E5=BF=83?= =?UTF-8?q?=E8=B7=B3=E7=9B=A3=E6=8E=A7=E9=98=B2=E6=AD=A2=E6=B2=89=E9=BB=98?= =?UTF-8?q?=E7=9B=B2=E9=BB=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 功能: - send_heartbeat(): 每 30 分鐘發送系統狀態 - start_heartbeat_monitor(): 背景心跳監控 - 沉默告警: 超過 2 小時沒訊息自動告警 目的: - 避免 Telegram 長時間沒訊息被當成「系統穩定」 - 主動驗證告警鏈路是否正常運作 Co-Authored-By: Claude Opus 4.5 --- apps/api/src/main.py | 10 ++ apps/api/src/services/telegram_gateway.py | 147 ++++++++++++++++++++-- 2 files changed, 145 insertions(+), 12 deletions(-) diff --git a/apps/api/src/main.py b/apps/api/src/main.py index ad5cb00f..43874d10 100644 --- a/apps/api/src/main.py +++ b/apps/api/src/main.py @@ -115,6 +115,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: else: logger.info("telegram_polling_disabled", reason="OpenClaw 是唯一 Polling 實例") + # Phase 6.5: Telegram 心跳監控 (防止沉默盲點) + # - 每 30 分鐘發送心跳,證明告警鏈路正常 + # - 超過 2 小時沒訊息則告警 + if settings.OPENCLAW_TG_BOT_TOKEN: + await telegram_gw.start_heartbeat_monitor( + heartbeat_interval_minutes=30, + silence_threshold_hours=2, + ) + logger.info("telegram_heartbeat_monitor_started") + # Phase 6.1: 啟動 Signal Worker (Redis Streams Consumer) # 統帥鐵律: Event Bus 解耦告警接收與處理 await init_signal_worker() diff --git a/apps/api/src/services/telegram_gateway.py b/apps/api/src/services/telegram_gateway.py index 37137e6b..785e0f77 100644 --- a/apps/api/src/services/telegram_gateway.py +++ b/apps/api/src/services/telegram_gateway.py @@ -229,6 +229,10 @@ class TelegramGateway: self._polling_active = False self._polling_task: asyncio.Task | None = None self._last_update_id = 0 + # Phase 6.5: 心跳監控 (防止沉默盲點) + self._last_message_time: datetime | None = None + self._heartbeat_task: asyncio.Task | None = None + self._heartbeat_active = False async def initialize(self) -> bool: """初始化 Gateway""" @@ -798,7 +802,7 @@ class TelegramGateway: logger.warning("telegram_long_polling_already_running") return - # 🔴 關鍵: 先刪除任何現有 Webhook,否則 getUpdates 會 409 Conflict + # 先刪除 Webhook,確保可以使用 getUpdates await self._delete_webhook() self._polling_active = True @@ -883,21 +887,15 @@ class TelegramGateway: except httpx.HTTPStatusError as e: if e.response.status_code == 409: - # 409 Conflict: 可能是 HTTP/2 連線狀態污染 - # 重建 HTTP client 以清除殘留連線 + # 409 Conflict: 另一個 getUpdates session 殘留 + # 可能是舊 pod/host 的連線未清除,等待後重試 logger.warning( "telegram_polling_conflict", status=409, - message="偵測到 409 衝突,重建 HTTP client...", + message="409 衝突,等待殘留 session 過期...", ) - if self._http_client: - await self._http_client.aclose() - self._http_client = httpx.AsyncClient( - timeout=30.0, - headers={"Content-Type": "application/json"}, - http2=False, # 強制 HTTP/1.1 避免連線複用問題 - ) - await asyncio.sleep(LONG_POLLING_RETRY_DELAY) + # 等待 LONG_POLLING_TIMEOUT 讓舊 session 自然過期 + await asyncio.sleep(LONG_POLLING_TIMEOUT + 5) else: logger.error("telegram_polling_http_error", status=e.response.status_code) await asyncio.sleep(LONG_POLLING_RETRY_DELAY) @@ -1089,6 +1087,131 @@ class TelegramGateway: ) +# ============================================================================= +# Phase 6.5: 心跳監控方法 +# ============================================================================= + + async def send_heartbeat(self) -> bool: + """ + 發送心跳訊息 (系統狀態摘要) + + 每 30 分鐘執行一次,證明告警鏈路正常運作 + """ + try: + if not self._initialized: + await self.initialize() + + now = datetime.now(timezone.utc) + + # 計算上次訊息時間 + last_msg_ago = "N/A" + if self._last_message_time: + delta = now - self._last_message_time + minutes = int(delta.total_seconds() // 60) + if minutes < 60: + last_msg_ago = f"{minutes} 分鐘前" + else: + hours = minutes // 60 + last_msg_ago = f"{hours} 小時前" + + # 心跳訊息 + text = f"""💓 AWOOOI 心跳 +━━━━━━━━━━━━━━━━ +⏰ {now.strftime('%Y-%m-%d %H:%M:%S')} UTC +📡 告警鏈路: ✅ 正常 +📨 上次訊息: {last_msg_ago} +━━━━━━━━━━━━━━━━ +每 30 分鐘自動發送""" + + await self.send_notification(text) + self._last_message_time = now + + logger.info("telegram_heartbeat_sent") + return True + + except Exception as e: + logger.error("telegram_heartbeat_failed", error=str(e)) + return False + + async def start_heartbeat_monitor( + self, + heartbeat_interval_minutes: int = 30, + silence_threshold_hours: int = 2, + ) -> None: + """ + 啟動心跳監控背景任務 + + Args: + heartbeat_interval_minutes: 心跳間隔 (預設 30 分鐘) + silence_threshold_hours: 沉默告警閾值 (預設 2 小時) + """ + if self._heartbeat_active: + logger.warning("telegram_heartbeat_already_running") + return + + self._heartbeat_active = True + self._heartbeat_task = asyncio.create_task( + self._heartbeat_loop(heartbeat_interval_minutes, silence_threshold_hours) + ) + + logger.info( + "telegram_heartbeat_monitor_started", + interval_minutes=heartbeat_interval_minutes, + silence_threshold_hours=silence_threshold_hours, + ) + + async def _heartbeat_loop( + self, + interval_minutes: int, + silence_hours: int, + ) -> None: + """心跳監控循環""" + interval_seconds = interval_minutes * 60 + silence_seconds = silence_hours * 3600 + + while self._heartbeat_active: + try: + # 1. 檢查沉默告警 + if self._last_message_time: + silence_duration = (datetime.now(timezone.utc) - self._last_message_time).total_seconds() + if silence_duration > silence_seconds: + # 發送沉默告警 + hours = int(silence_duration // 3600) + await self.send_notification( + f"⚠️ 沉默告警\n\n" + f"Telegram 已 {hours} 小時沒有收到任何訊息!\n" + f"請檢查告警鏈路是否正常運作。" + ) + logger.warning( + "telegram_silence_alert", + silence_hours=hours, + ) + + # 2. 發送心跳 + await self.send_heartbeat() + + # 3. 等待下一次 + await asyncio.sleep(interval_seconds) + + except asyncio.CancelledError: + break + except Exception as e: + logger.error("telegram_heartbeat_loop_error", error=str(e)) + await asyncio.sleep(60) # 錯誤後等待 1 分鐘重試 + + async def stop_heartbeat_monitor(self) -> None: + """停止心跳監控""" + self._heartbeat_active = False + if self._heartbeat_task and not self._heartbeat_task.done(): + self._heartbeat_task.cancel() + try: + await self._heartbeat_task + except asyncio.CancelledError: + pass + self._heartbeat_task = None + logger.info("telegram_heartbeat_monitor_stopped") + + # ============================================================================= # Singleton # =============================================================================