diff --git a/apps/api/src/main.py b/apps/api/src/main.py
index ad5cb00f..43874d10 100644
--- a/apps/api/src/main.py
+++ b/apps/api/src/main.py
@@ -115,6 +115,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
else:
logger.info("telegram_polling_disabled", reason="OpenClaw 是唯一 Polling 實例")
+ # Phase 6.5: Telegram 心跳監控 (防止沉默盲點)
+ # - 每 30 分鐘發送心跳,證明告警鏈路正常
+ # - 超過 2 小時沒訊息則告警
+ if settings.OPENCLAW_TG_BOT_TOKEN:
+ await telegram_gw.start_heartbeat_monitor(
+ heartbeat_interval_minutes=30,
+ silence_threshold_hours=2,
+ )
+ logger.info("telegram_heartbeat_monitor_started")
+
# Phase 6.1: 啟動 Signal Worker (Redis Streams Consumer)
# 統帥鐵律: Event Bus 解耦告警接收與處理
await init_signal_worker()
diff --git a/apps/api/src/services/telegram_gateway.py b/apps/api/src/services/telegram_gateway.py
index 37137e6b..785e0f77 100644
--- a/apps/api/src/services/telegram_gateway.py
+++ b/apps/api/src/services/telegram_gateway.py
@@ -229,6 +229,10 @@ class TelegramGateway:
self._polling_active = False
self._polling_task: asyncio.Task | None = None
self._last_update_id = 0
+ # Phase 6.5: 心跳監控 (防止沉默盲點)
+ self._last_message_time: datetime | None = None
+ self._heartbeat_task: asyncio.Task | None = None
+ self._heartbeat_active = False
async def initialize(self) -> bool:
"""初始化 Gateway"""
@@ -798,7 +802,7 @@ class TelegramGateway:
logger.warning("telegram_long_polling_already_running")
return
- # 🔴 關鍵: 先刪除任何現有 Webhook,否則 getUpdates 會 409 Conflict
+ # 先刪除 Webhook,確保可以使用 getUpdates
await self._delete_webhook()
self._polling_active = True
@@ -883,21 +887,15 @@ class TelegramGateway:
except httpx.HTTPStatusError as e:
if e.response.status_code == 409:
- # 409 Conflict: 可能是 HTTP/2 連線狀態污染
- # 重建 HTTP client 以清除殘留連線
+ # 409 Conflict: 另一個 getUpdates session 殘留
+ # 可能是舊 pod/host 的連線未清除,等待後重試
logger.warning(
"telegram_polling_conflict",
status=409,
- message="偵測到 409 衝突,重建 HTTP client...",
+ message="409 衝突,等待殘留 session 過期...",
)
- if self._http_client:
- await self._http_client.aclose()
- self._http_client = httpx.AsyncClient(
- timeout=30.0,
- headers={"Content-Type": "application/json"},
- http2=False, # 強制 HTTP/1.1 避免連線複用問題
- )
- await asyncio.sleep(LONG_POLLING_RETRY_DELAY)
+ # 等待 LONG_POLLING_TIMEOUT 讓舊 session 自然過期
+ await asyncio.sleep(LONG_POLLING_TIMEOUT + 5)
else:
logger.error("telegram_polling_http_error", status=e.response.status_code)
await asyncio.sleep(LONG_POLLING_RETRY_DELAY)
@@ -1089,6 +1087,131 @@ class TelegramGateway:
)
+# =============================================================================
+# Phase 6.5: 心跳監控方法
+# =============================================================================
+
+    async def send_heartbeat(self) -> bool:
+        """
+        Send a heartbeat message (system status summary).
+
+        The text contains the current UTC time and the age of the timestamp
+        stored in ``self._last_message_time``. That timestamp is refreshed
+        only after the heartbeat went out without a reported failure, so a
+        broken delivery path keeps ageing it.
+
+        Returns:
+            True if the heartbeat was sent; False if sending raised an
+            exception or ``send_notification`` explicitly returned ``False``.
+        """
+        try:
+            if not self._initialized:
+                await self.initialize()
+
+            now = datetime.now(timezone.utc)
+
+            # Human-readable age of the previously recorded message.
+            last_msg_ago = "N/A"
+            if self._last_message_time is not None:
+                elapsed = now - self._last_message_time
+                minutes = int(elapsed.total_seconds() // 60)
+                if minutes < 60:
+                    last_msg_ago = f"{minutes} 分鐘前"
+                else:
+                    last_msg_ago = f"{minutes // 60} 小時前"
+
+            # NOTE(review): the trailing line hard-codes "30 minutes" even
+            # though the monitor interval is configurable - confirm intent.
+            text = f"""💓 AWOOOI 心跳
+━━━━━━━━━━━━━━━━
+⏰ {now.strftime('%Y-%m-%d %H:%M:%S')} UTC
+📡 告警鏈路: ✅ 正常
+📨 上次訊息: {last_msg_ago}
+━━━━━━━━━━━━━━━━
+每 30 分鐘自動發送"""
+
+            # NOTE(review): send_notification's return type is not visible
+            # from here. Only an explicit False is treated as a delivery
+            # failure; None or any other value keeps the previous behaviour.
+            delivered = await self.send_notification(text)
+            if delivered is False:
+                logger.error(
+                    "telegram_heartbeat_failed",
+                    error="send_notification returned False",
+                )
+                return False
+
+            # Refresh the timestamp only after a send that did not fail, so
+            # the silence check can still notice a dead delivery path.
+            self._last_message_time = now
+
+            logger.info("telegram_heartbeat_sent")
+            return True
+
+        except Exception as e:
+            # Best-effort: a failed heartbeat must not crash the caller.
+            logger.error("telegram_heartbeat_failed", error=str(e))
+            return False
+
+    async def start_heartbeat_monitor(
+        self,
+        heartbeat_interval_minutes: int = 30,
+        silence_threshold_hours: int = 2,
+    ) -> None:
+        """
+        Start the heartbeat monitor as a background task.
+
+        Does nothing (apart from logging a warning) when a monitor task is
+        already running. A monitor whose task has finished while the active
+        flag was still set is restarted.
+
+        Args:
+            heartbeat_interval_minutes: Minutes between heartbeats
+                (default 30). Must be positive.
+            silence_threshold_hours: Hours without a recorded message before
+                a silence alert is sent (default 2). Must be positive.
+
+        Raises:
+            ValueError: If either argument is zero or negative.
+        """
+        # A non-positive interval would make the loop sleep for zero or
+        # negative seconds and send messages back-to-back; a non-positive
+        # threshold would fire the silence alert on every cycle.
+        if heartbeat_interval_minutes <= 0 or silence_threshold_hours <= 0:
+            raise ValueError(
+                "heartbeat_interval_minutes and silence_threshold_hours "
+                "must be positive"
+            )
+
+        # Checking the flag alone is not enough: if the task ended while the
+        # flag stayed True, the monitor could never be started again.
+        task = self._heartbeat_task
+        if self._heartbeat_active and task is not None and not task.done():
+            logger.warning("telegram_heartbeat_already_running")
+            return
+
+        self._heartbeat_active = True
+        self._heartbeat_task = asyncio.create_task(
+            self._heartbeat_loop(heartbeat_interval_minutes, silence_threshold_hours)
+        )
+
+        logger.info(
+            "telegram_heartbeat_monitor_started",
+            interval_minutes=heartbeat_interval_minutes,
+            silence_threshold_hours=silence_threshold_hours,
+        )
+
+    async def _heartbeat_loop(
+        self,
+        interval_minutes: int,
+        silence_hours: int,
+    ) -> None:
+        """
+        Run the heartbeat cycle until the monitor is deactivated or cancelled.
+
+        Each iteration (1) sends a silence alert when the timestamp in
+        ``self._last_message_time`` is older than the threshold, (2) sends a
+        heartbeat, and (3) sleeps. After a failed heartbeat or an unexpected
+        error the loop retries after one minute instead of the full interval.
+
+        Args:
+            interval_minutes: Minutes to sleep after a successful heartbeat.
+            silence_hours: Age, in hours, of the last recorded message above
+                which a silence alert is sent.
+
+        Raises:
+            asyncio.CancelledError: Propagated when the task is cancelled.
+        """
+        interval_seconds = interval_minutes * 60
+        silence_seconds = silence_hours * 3600
+        retry_seconds = 60  # delay before retrying after any failure
+
+        while self._heartbeat_active:
+            try:
+                # 1. Silence check
+                if self._last_message_time is not None:
+                    silence_duration = (
+                        datetime.now(timezone.utc) - self._last_message_time
+                    ).total_seconds()
+                    if silence_duration > silence_seconds:
+                        hours = int(silence_duration // 3600)
+                        await self.send_notification(
+                            f"⚠️ 沉默告警\n\n"
+                            f"Telegram 已 {hours} 小時沒有收到任何訊息!\n"
+                            f"請檢查告警鏈路是否正常運作。"
+                        )
+                        logger.warning(
+                            "telegram_silence_alert",
+                            silence_hours=hours,
+                        )
+
+                # 2. Heartbeat. send_heartbeat reports failure through its
+                # return value rather than by raising, so the result has to
+                # be inspected for the short retry to ever apply to it.
+                heartbeat_ok = await self.send_heartbeat()
+
+                # 3. Wait for the next cycle (short retry after a failure)
+                await asyncio.sleep(interval_seconds if heartbeat_ok else retry_seconds)
+
+            except asyncio.CancelledError:
+                # Propagate cancellation so the task ends in the cancelled
+                # state instead of looking like a normal completion.
+                raise
+            except Exception as e:
+                logger.error("telegram_heartbeat_loop_error", error=str(e))
+                await asyncio.sleep(retry_seconds)
+
+    async def stop_heartbeat_monitor(self) -> None:
+        """
+        Stop the heartbeat monitor.
+
+        Clears the active flag, cancels the background task if it is still
+        running and waits for it to finish, then drops the task reference.
+        """
+        self._heartbeat_active = False
+
+        task = self._heartbeat_task
+        still_running = task is not None and not task.done()
+        if still_running:
+            task.cancel()
+            try:
+                await task
+            except asyncio.CancelledError:
+                # Expected outcome of cancelling the task just above.
+                pass
+
+        self._heartbeat_task = None
+        logger.info("telegram_heartbeat_monitor_stopped")
+
+
# =============================================================================
# Singleton
# =============================================================================