feat(telegram): 新增心跳監控防止沉默盲點

功能:
- send_heartbeat(): 每 30 分鐘發送系統狀態
- start_heartbeat_monitor(): 背景心跳監控
- 沉默告警: 超過 2 小時沒訊息自動告警

目的:
- 避免 Telegram 長時間沒訊息被當成「系統穩定」
- 主動驗證告警鏈路是否正常運作

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-23 23:26:08 +08:00
parent eca3759fde
commit 7d8eb26ebe
2 changed files with 145 additions and 12 deletions

View File

@@ -115,6 +115,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
else:
logger.info("telegram_polling_disabled", reason="OpenClaw 是唯一 Polling 實例")
# Phase 6.5: Telegram 心跳監控 (防止沉默盲點)
# - 每 30 分鐘發送心跳,證明告警鏈路正常
# - 超過 2 小時沒訊息則告警
if settings.OPENCLAW_TG_BOT_TOKEN:
await telegram_gw.start_heartbeat_monitor(
heartbeat_interval_minutes=30,
silence_threshold_hours=2,
)
logger.info("telegram_heartbeat_monitor_started")
# Phase 6.1: 啟動 Signal Worker (Redis Streams Consumer)
# 統帥鐵律: Event Bus 解耦告警接收與處理
await init_signal_worker()

View File

@@ -229,6 +229,10 @@ class TelegramGateway:
self._polling_active = False
self._polling_task: asyncio.Task | None = None
self._last_update_id = 0
# Phase 6.5: 心跳監控 (防止沉默盲點)
self._last_message_time: datetime | None = None
self._heartbeat_task: asyncio.Task | None = None
self._heartbeat_active = False
async def initialize(self) -> bool:
"""初始化 Gateway"""
@@ -798,7 +802,7 @@ class TelegramGateway:
logger.warning("telegram_long_polling_already_running")
return
# 🔴 關鍵: 先刪除任何現有 Webhook否則 getUpdates 會 409 Conflict
# 先刪除 Webhook確保可以使用 getUpdates
await self._delete_webhook()
self._polling_active = True
@@ -883,21 +887,15 @@ class TelegramGateway:
except httpx.HTTPStatusError as e:
if e.response.status_code == 409:
# 409 Conflict: 可能是 HTTP/2 連線狀態污染
# 重建 HTTP client 以清除殘留連線
# 409 Conflict: 另一個 getUpdates session 殘留
# 可能是舊 pod/host 的連線未清除,等待後重試
logger.warning(
"telegram_polling_conflict",
status=409,
message="偵測到 409 衝突,重建 HTTP client...",
message="409 衝突,等待殘留 session 過期...",
)
if self._http_client:
await self._http_client.aclose()
self._http_client = httpx.AsyncClient(
timeout=30.0,
headers={"Content-Type": "application/json"},
http2=False, # 強制 HTTP/1.1 避免連線複用問題
)
await asyncio.sleep(LONG_POLLING_RETRY_DELAY)
# 等待 LONG_POLLING_TIMEOUT 讓舊 session 自然過期
await asyncio.sleep(LONG_POLLING_TIMEOUT + 5)
else:
logger.error("telegram_polling_http_error", status=e.response.status_code)
await asyncio.sleep(LONG_POLLING_RETRY_DELAY)
@@ -1089,6 +1087,131 @@ class TelegramGateway:
)
# =============================================================================
# Phase 6.5: 心跳監控方法
# =============================================================================
async def send_heartbeat(self) -> bool:
"""
發送心跳訊息 (系統狀態摘要)
每 30 分鐘執行一次,證明告警鏈路正常運作
"""
try:
if not self._initialized:
await self.initialize()
now = datetime.now(timezone.utc)
# 計算上次訊息時間
last_msg_ago = "N/A"
if self._last_message_time:
delta = now - self._last_message_time
minutes = int(delta.total_seconds() // 60)
if minutes < 60:
last_msg_ago = f"{minutes} 分鐘前"
else:
hours = minutes // 60
last_msg_ago = f"{hours} 小時前"
# 心跳訊息
text = f"""💓 <b>AWOOOI 心跳</b>
━━━━━━━━━━━━━━━━
{now.strftime('%Y-%m-%d %H:%M:%S')} UTC
📡 告警鏈路: ✅ 正常
📨 上次訊息: {last_msg_ago}
━━━━━━━━━━━━━━━━
<i>每 30 分鐘自動發送</i>"""
await self.send_notification(text)
self._last_message_time = now
logger.info("telegram_heartbeat_sent")
return True
except Exception as e:
logger.error("telegram_heartbeat_failed", error=str(e))
return False
async def start_heartbeat_monitor(
self,
heartbeat_interval_minutes: int = 30,
silence_threshold_hours: int = 2,
) -> None:
"""
啟動心跳監控背景任務
Args:
heartbeat_interval_minutes: 心跳間隔 (預設 30 分鐘)
silence_threshold_hours: 沉默告警閾值 (預設 2 小時)
"""
if self._heartbeat_active:
logger.warning("telegram_heartbeat_already_running")
return
self._heartbeat_active = True
self._heartbeat_task = asyncio.create_task(
self._heartbeat_loop(heartbeat_interval_minutes, silence_threshold_hours)
)
logger.info(
"telegram_heartbeat_monitor_started",
interval_minutes=heartbeat_interval_minutes,
silence_threshold_hours=silence_threshold_hours,
)
async def _heartbeat_loop(
self,
interval_minutes: int,
silence_hours: int,
) -> None:
"""心跳監控循環"""
interval_seconds = interval_minutes * 60
silence_seconds = silence_hours * 3600
while self._heartbeat_active:
try:
# 1. 檢查沉默告警
if self._last_message_time:
silence_duration = (datetime.now(timezone.utc) - self._last_message_time).total_seconds()
if silence_duration > silence_seconds:
# 發送沉默告警
hours = int(silence_duration // 3600)
await self.send_notification(
f"⚠️ <b>沉默告警</b>\n\n"
f"Telegram 已 <b>{hours} 小時</b>沒有收到任何訊息!\n"
f"請檢查告警鏈路是否正常運作。"
)
logger.warning(
"telegram_silence_alert",
silence_hours=hours,
)
# 2. 發送心跳
await self.send_heartbeat()
# 3. 等待下一次
await asyncio.sleep(interval_seconds)
except asyncio.CancelledError:
break
except Exception as e:
logger.error("telegram_heartbeat_loop_error", error=str(e))
await asyncio.sleep(60) # 錯誤後等待 1 分鐘重試
async def stop_heartbeat_monitor(self) -> None:
"""停止心跳監控"""
self._heartbeat_active = False
if self._heartbeat_task and not self._heartbeat_task.done():
self._heartbeat_task.cancel()
try:
await self._heartbeat_task
except asyncio.CancelledError:
pass
self._heartbeat_task = None
logger.info("telegram_heartbeat_monitor_stopped")
# =============================================================================
# Singleton
# =============================================================================