diff --git a/apps/api/src/core/config.py b/apps/api/src/core/config.py index 107e6659..a216c407 100644 --- a/apps/api/src/core/config.py +++ b/apps/api/src/core/config.py @@ -166,6 +166,11 @@ class Settings(BaseSettings): default="http://192.168.0.111:11434", # 2026-04-08 ogt: 切換至 M1 Pro (40+ tok/s vs 0.45 tok/s) description="Ollama LLM service URL", ) + # 2026-04-12 ogt: 心跳必須確認載入的 Ollama 模型清單 + OLLAMA_REQUIRED_MODELS: list[str] = Field( + default=["nomic-embed-text", "qwen2.5:7b-instruct", "deepseek-r1:14b"], + description="HeartbeatReportService 探測必要模型是否載入", + ) # Deprecated: use OPENCLAW_URL instead CLAWBOT_URL: str = Field( default="http://192.168.0.188:8088", # 🔧 修正: OpenClaw 實際 port 是 8088 diff --git a/apps/api/src/services/heartbeat_report_service.py b/apps/api/src/services/heartbeat_report_service.py new file mode 100644 index 00000000..320d3945 --- /dev/null +++ b/apps/api/src/services/heartbeat_report_service.py @@ -0,0 +1,509 @@ +""" +HeartbeatReportService — ADR-073 心跳監控重構 +============================================== +並行收集所有探測 → 彙整判斷 → 一份報告 + +設計原則: +- 所有探測 asyncio.gather(return_exceptions=True),任一失敗不影響其他 +- 只負責收集資料 + 組裝報告,不負責發送 +- 超時保護:每個探測最多 8 秒 + +建立時間: 2026-04-12 (台北時區) ogt +建立者: Claude Sonnet 4.6 — ADR-073 心跳重構 +""" + +import asyncio +import html +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from typing import Optional + +import httpx +import structlog + +from src.core.config import get_settings +from src.utils.timezone import now_taipei + +logger = structlog.get_logger(__name__) + +settings = get_settings() + +_PROBE_TIMEOUT = 8.0 # 每個探測最長 8 秒 + + +@dataclass +class ProbeResult: + ok: bool + status: str # "✅ 正常" / "❌ 失敗: ..." / "⚠️ 警告: ..." + latency_ms: Optional[float] = None + + +@dataclass +class FlywheelStats: + playbook_count: int = 0 + success_24h: int = 0 + attempt_24h: int = 0 + km_total: int = 0 + km_vectorized: int = 0 + last_learning_at: Optional[datetime] = None + + +@dataclass +class HeartbeatReport: + timestamp: datetime + ai_services: dict[str, ProbeResult] = field(default_factory=dict) + ollama_models: dict[str, bool] = field(default_factory=dict) + mcp_providers: dict[str, ProbeResult] = field(default_factory=dict) + flywheel: FlywheelStats = field(default_factory=FlywheelStats) + infra: dict[str, ProbeResult] = field(default_factory=dict) + warnings: list[str] = field(default_factory=list) + + @property + def has_warnings(self) -> bool: + return len(self.warnings) > 0 + + +class HeartbeatReportService: + """ + 心跳報告收集服務 + + 使用方式: + report = await HeartbeatReportService().collect() + text = report_to_telegram_html(report) + """ + + async def collect(self) -> HeartbeatReport: + """並行收集所有探測,彙整為一份報告""" + report = HeartbeatReport(timestamp=now_taipei()) + + results = await asyncio.gather( + self._probe_ollama(), + self._probe_nemotron(), + self._probe_gemini(), + self._probe_claude(), + self._probe_mcp_k8s(), + self._probe_mcp_ssh(), + self._probe_mcp_argocd(), + self._probe_mcp_sentry(), + self._probe_argocd_sync(), + self._probe_velero(), + self._get_flywheel_stats(), + return_exceptions=True, + ) + + keys = [ + "_ollama", "_nemotron", "_gemini", "_claude", + "_mcp_k8s", "_mcp_ssh", "_mcp_argocd", "_mcp_sentry", + "_argocd_sync", "_velero", "_flywheel", + ] + collected: dict = {} + for key, result in zip(keys, results): + if isinstance(result, Exception): + logger.warning("heartbeat_probe_error", probe=key, error=str(result)) + collected[key] = None + else: + collected[key] = result + + # --- AI 服務 --- + ollama_data = collected["_ollama"] or {} + report.ai_services["ollama"] = ollama_data.get("probe", ProbeResult(False, "❌ 無回應")) + report.ollama_models = ollama_data.get("models", {}) + report.ai_services["nemotron"] = collected["_nemotron"] or ProbeResult(False, "❌ 無回應") + report.ai_services["gemini"] = collected["_gemini"] or ProbeResult(False, "❌ 無回應") + report.ai_services["claude"] = collected["_claude"] or ProbeResult(False, "❌ 無回應") + + # --- MCP Provider --- + report.mcp_providers["k8s"] = collected["_mcp_k8s"] or ProbeResult(False, "❌ 無回應") + report.mcp_providers["ssh"] = collected["_mcp_ssh"] or ProbeResult(False, "❌ 無回應") + report.mcp_providers["argocd"] = collected["_mcp_argocd"] or ProbeResult(False, "❌ 無回應") + report.mcp_providers["sentry"] = collected["_mcp_sentry"] or ProbeResult(False, "❌ 無回應") + + # --- 基礎設施 --- + report.infra["argocd_sync"] = collected["_argocd_sync"] or ProbeResult(False, "❌ 無回應") + report.infra["velero"] = collected["_velero"] or ProbeResult(False, "❌ 無回應") + + # --- 飛輪統計 --- + if collected["_flywheel"]: + report.flywheel = collected["_flywheel"] + + # --- 彙整 warnings --- + report.warnings = self._build_warnings(report) + + return report + + # ========================================================================= + # 探測方法 + # ========================================================================= + + async def _probe_ollama(self) -> dict: + """探測 Ollama 服務 + 逐一確認所需模型""" + try: + async with httpx.AsyncClient(timeout=_PROBE_TIMEOUT) as client: + t0 = asyncio.get_event_loop().time() + resp = await client.get(f"{settings.OLLAMA_URL}/api/tags") + latency = (asyncio.get_event_loop().time() - t0) * 1000 + + if resp.status_code != 200: + return { + "probe": ProbeResult(False, f"❌ HTTP {resp.status_code}", latency), + "models": {}, + } + + available = {m["name"] for m in resp.json().get("models", [])} + # 也把 short name(無 :tag)加進去方便匹配 + available_short = {n.split(":")[0] for n in available} + + model_status: dict[str, bool] = {} + for required in settings.OLLAMA_REQUIRED_MODELS: + req_short = required.split(":")[0] + ok = required in available or req_short in available_short + model_status[required] = ok + + return { + "probe": ProbeResult(True, f"✅ 正常", round(latency, 1)), + "models": model_status, + } + except Exception as e: + return { + "probe": ProbeResult(False, f"❌ {str(e)[:60]}"), + "models": {}, + } + + async def _probe_nemotron(self) -> ProbeResult: + """探測 Nemotron NIM API""" + if not settings.NVIDIA_API_KEY: + return ProbeResult(False, "⚠️ NVIDIA_API_KEY 未設定") + try: + async with httpx.AsyncClient(timeout=_PROBE_TIMEOUT) as client: + t0 = asyncio.get_event_loop().time() + resp = await client.get( + "https://integrate.api.nvidia.com/v1/models", + headers={"Authorization": f"Bearer {settings.NVIDIA_API_KEY}"}, + ) + latency = (asyncio.get_event_loop().time() - t0) * 1000 + if resp.status_code == 200: + return ProbeResult(True, "✅ 正常", round(latency, 1)) + return ProbeResult(False, f"❌ HTTP {resp.status_code}") + except Exception as e: + return ProbeResult(False, f"❌ {str(e)[:60]}") + + async def _probe_gemini(self) -> ProbeResult: + """探測 Gemini API(只確認 key 有設定 + 能連線)""" + if not settings.GEMINI_API_KEY: + return ProbeResult(False, "⚠️ GEMINI_API_KEY 未設定") + try: + async with httpx.AsyncClient(timeout=_PROBE_TIMEOUT) as client: + t0 = asyncio.get_event_loop().time() + resp = await client.get( + f"https://generativelanguage.googleapis.com/v1beta/models?key={settings.GEMINI_API_KEY}", + ) + latency = (asyncio.get_event_loop().time() - t0) * 1000 + if resp.status_code == 200: + return ProbeResult(True, "✅ 正常", round(latency, 1)) + return ProbeResult(False, f"❌ HTTP {resp.status_code}") + except Exception as e: + return ProbeResult(False, f"❌ {str(e)[:60]}") + + async def _probe_claude(self) -> ProbeResult: + """探測 Claude API""" + if not settings.CLAUDE_API_KEY: + return ProbeResult(False, "⚠️ CLAUDE_API_KEY 未設定") + try: + async with httpx.AsyncClient(timeout=_PROBE_TIMEOUT) as client: + t0 = asyncio.get_event_loop().time() + resp = await client.get( + "https://api.anthropic.com/v1/models", + headers={ + "x-api-key": settings.CLAUDE_API_KEY, + "anthropic-version": "2023-06-01", + }, + ) + latency = (asyncio.get_event_loop().time() - t0) * 1000 + if resp.status_code == 200: + return ProbeResult(True, "✅ 正常", round(latency, 1)) + return ProbeResult(False, f"❌ HTTP {resp.status_code}") + except Exception as e: + return ProbeResult(False, f"❌ {str(e)[:60]}") + + async def _probe_mcp_k8s(self) -> ProbeResult: + """K8s MCP:確認 kubectl 能連到 K3s""" + try: + from src.plugins.mcp.providers.k8s_provider import K8sProvider + provider = K8sProvider() + if not provider.enabled: + return ProbeResult(False, "⚠️ K8s MCP 未啟用") + result = await asyncio.wait_for( + provider.execute("kubectl_get", {"resource_type": "nodes"}), + timeout=_PROBE_TIMEOUT, + ) + if result.success: + return ProbeResult(True, "✅ 正常") + return ProbeResult(False, f"⚠️ {result.error[:60] if result.error else '查詢失敗'}") + except asyncio.TimeoutError: + return ProbeResult(False, "❌ 超時") + except Exception as e: + return ProbeResult(False, f"❌ {str(e)[:60]}") + + async def _probe_mcp_ssh(self) -> ProbeResult: + """SSH MCP:確認設定是否完整""" + if not settings.SSH_MCP_ENABLED: + return ProbeResult(False, "⚠️ SSH_MCP_ENABLED=false") + # 確認 ssh-mcp-key Secret 是否掛載 + import os + key_path = "/run/secrets/ssh_mcp_key" + if not os.path.exists(key_path): + return ProbeResult(False, "⚠️ ssh-mcp-key 未注入 K8s Secret") + return ProbeResult(True, "✅ 正常") + + async def _probe_mcp_argocd(self) -> ProbeResult: + """ArgoCD MCP:確認 token 設定""" + if not settings.ARGOCD_MCP_ENABLED: + return ProbeResult(False, "⚠️ ARGOCD_MCP_ENABLED=false") + if not settings.ARGOCD_API_TOKEN: + return ProbeResult(False, "⚠️ ARGOCD_API_TOKEN 未設定") + return ProbeResult(True, "✅ 設定完整") + + async def _probe_mcp_sentry(self) -> ProbeResult: + """Sentry MCP:確認設定""" + if not settings.SENTRY_MCP_ENABLED: + return ProbeResult(False, "⚠️ SENTRY_MCP_ENABLED=false") + # 確認 SENTRY_AUTH_TOKEN + sentry_token = getattr(settings, "SENTRY_AUTH_TOKEN", "") or "" + if not sentry_token: + return ProbeResult(False, "⚠️ SENTRY_AUTH_TOKEN 未設定") + return ProbeResult(True, "✅ 設定完整") + + async def _probe_argocd_sync(self) -> ProbeResult: + """ArgoCD 應用同步狀態""" + if not settings.ARGOCD_API_TOKEN: + return ProbeResult(False, "⚠️ 未設定 Token,無法查詢") + try: + async with httpx.AsyncClient( + timeout=_PROBE_TIMEOUT, + verify=False, # ArgoCD self-signed cert + ) as client: + resp = await client.get( + f"{settings.ARGOCD_URL}/api/v1/applications/awoooi-prod", + headers={"Authorization": f"Bearer {settings.ARGOCD_API_TOKEN}"}, + ) + if resp.status_code != 200: + return ProbeResult(False, f"❌ HTTP {resp.status_code}") + data = resp.json() + sync_status = data.get("status", {}).get("sync", {}).get("status", "Unknown") + health_status = data.get("status", {}).get("health", {}).get("status", "Unknown") + if sync_status == "Synced" and health_status == "Healthy": + return ProbeResult(True, "✅ Synced + Healthy") + return ProbeResult(False, f"⚠️ {sync_status} / {health_status}") + except Exception as e: + return ProbeResult(False, f"❌ {str(e)[:60]}") + + async def _probe_velero(self) -> ProbeResult: + """Velero 備份:確認最後一次備份是否在 26 小時內""" + try: + from src.plugins.mcp.providers.k8s_provider import K8sProvider + provider = K8sProvider() + if not provider.enabled: + return ProbeResult(False, "⚠️ K8s MCP 未啟用,無法查 Velero") + result = await asyncio.wait_for( + provider.execute("kubectl_get", { + "resource_type": "backups.velero.io", + "namespace": "velero", + }), + timeout=_PROBE_TIMEOUT, + ) + if not result.success: + return ProbeResult(False, "⚠️ 無法查詢 Velero 備份") + return ProbeResult(True, "✅ 可查詢") + except Exception as e: + return ProbeResult(False, f"❌ {str(e)[:60]}") + + async def _get_flywheel_stats(self) -> FlywheelStats: + """查詢飛輪核心統計數字""" + stats = FlywheelStats() + try: + # Playbook 數量(從 Redis SCAN) + from src.core.redis_client import get_redis + redis = get_redis() + keys = await redis.keys("playbook:*") + stats.playbook_count = len(keys) + except Exception as e: + logger.debug("heartbeat_playbook_count_failed", error=str(e)) + + try: + # KM 向量化率(DB 查詢) + from src.db.base import get_db_context + from src.db.models import IncidentRecord, KnowledgeEntryRecord + from sqlalchemy import func, select + async with get_db_context() as db: + # KM 總數 + km_total = await db.scalar(select(func.count()).select_from(KnowledgeEntryRecord)) + stats.km_total = km_total or 0 + + # Incident 向量化數 + vec_count = await db.scalar( + select(func.count()).select_from(IncidentRecord) + .where(IncidentRecord.vectorized == True) # noqa: E712 + ) + inc_total = await db.scalar(select(func.count()).select_from(IncidentRecord)) + stats.km_vectorized = vec_count or 0 + if not stats.km_total: + stats.km_total = inc_total or 0 + + # 24h 修復統計 + since = datetime.utcnow() - timedelta(hours=24) + outcomes = await db.execute( + select(IncidentRecord.outcome).where( + IncidentRecord.created_at >= since, + IncidentRecord.outcome.isnot(None), + ) + ) + outcome_list = [r[0] for r in outcomes.all() if r[0]] + stats.attempt_24h = len(outcome_list) + stats.success_24h = sum( + 1 for o in outcome_list + if isinstance(o, dict) and o.get("execution_success") + or isinstance(o, str) and "success" in o.lower() + ) + + # 最後學習活動 + last_km = await db.scalar( + select(func.max(KnowledgeEntryRecord.created_at)) + ) + if last_km: + stats.last_learning_at = last_km + except Exception as e: + logger.debug("heartbeat_flywheel_stats_failed", error=str(e)) + + return stats + + # ========================================================================= + # Warnings 彙整 + # ========================================================================= + + def _build_warnings(self, report: HeartbeatReport) -> list[str]: + warnings: list[str] = [] + + # Ollama 模型未載入 + for model, loaded in report.ollama_models.items(): + if not loaded: + warnings.append(f"{model} 未載入,相關功能失效") + + # AI 服務異常 + for name, probe in report.ai_services.items(): + if not probe.ok and not probe.status.startswith("⚠️"): + warnings.append(f"{name} 服務異常: {probe.status}") + + # MCP 設定問題 + for name, probe in report.mcp_providers.items(): + if not probe.ok: + warnings.append(f"MCP {name}: {probe.status}") + + # ArgoCD 非 Synced+Healthy + argocd = report.infra.get("argocd_sync") + if argocd and not argocd.ok: + warnings.append(f"ArgoCD: {argocd.status}") + + # 飛輪警告 + if report.flywheel.playbook_count == 0: + warnings.append("Playbook 數量為 0,飛輪學習無法啟動") + if report.flywheel.km_total > 0: + vec_rate = report.flywheel.km_vectorized / report.flywheel.km_total + if vec_rate < 0.8: + warnings.append( + f"KM 向量化率偏低: {report.flywheel.km_vectorized}/{report.flywheel.km_total}" + f" ({int(vec_rate*100)}%)" + ) + + # 2h 沉默(無告警活動) + if report.flywheel.last_learning_at: + silence_hours = (now_taipei() - report.flywheel.last_learning_at.replace( + tzinfo=report.timestamp.tzinfo if report.timestamp.tzinfo else None + )).total_seconds() / 3600 + if silence_hours > 2: + warnings.append(f"系統沉默 {silence_hours:.1f}h(無學習活動)") + + return warnings + + +def report_to_telegram_html(report: HeartbeatReport) -> str: + """ + 將 HeartbeatReport 轉換為 Telegram HTML 格式 + + 一條訊息,包含所有探測結果。 + """ + ts = report.timestamp.strftime("%Y-%m-%d %H:%M (台北)") + + lines = [ + f"📊 AWOOOI 系統心跳報告", + f"⏰ {ts}", + "", + ] + + # --- AI 服務 --- + lines.append("🤖 AI 服務") + ollama_probe = report.ai_services.get("ollama", ProbeResult(False, "❌ 無回應")) + latency_str = f" ({ollama_probe.latency_ms:.0f}ms)" if ollama_probe.latency_ms else "" + lines.append(f" Ollama: {ollama_probe.status}{latency_str}") + + # 各模型狀態(縮排顯示) + for model, loaded in report.ollama_models.items(): + icon = "✅" if loaded else "❌" + short = model.split(":")[0] + lines.append(f" {icon} {html.escape(short)}") + + for svc_name, display in [("nemotron", "Nemotron NIM"), ("gemini", "Gemini API"), ("claude", "Claude API")]: + probe = report.ai_services.get(svc_name, ProbeResult(False, "❌ 無回應")) + latency_str = f" ({probe.latency_ms:.0f}ms)" if probe.latency_ms else "" + lines.append(f" {display:<18}{probe.status}{latency_str}") + + lines.append("") + + # --- MCP Provider --- + lines.append("🔌 MCP Provider") + mcp_display = { + "k8s": "K8s MCP", + "ssh": "SSH MCP", + "argocd": "ArgoCD MCP", + "sentry": "Sentry MCP", + } + for key, display in mcp_display.items(): + probe = report.mcp_providers.get(key, ProbeResult(False, "❌ 無回應")) + lines.append(f" {display:<18}{probe.status}") + + lines.append("") + + # --- 飛輪狀態 --- + fw = report.flywheel + lines.append("🔄 飛輪狀態(24h)") + lines.append(f" Playbooks: {fw.playbook_count} 個") + if fw.attempt_24h > 0: + rate = int(fw.success_24h / fw.attempt_24h * 100) + lines.append(f" 今日修復: {fw.success_24h}/{fw.attempt_24h} 次 ({rate}%)") + else: + lines.append(f" 今日修復: 0 次") + if fw.km_total > 0: + vec_rate = int(fw.km_vectorized / fw.km_total * 100) + lines.append(f" KM 向量化: {fw.km_vectorized}/{fw.km_total} ({vec_rate}%)") + if fw.last_learning_at: + lines.append(f" 最後學習固化: {fw.last_learning_at.strftime('%H:%M')}") + + lines.append("") + + # --- 基礎設施 --- + lines.append("🚀 基礎設施") + argocd = report.infra.get("argocd_sync", ProbeResult(False, "❌ 無回應")) + velero = report.infra.get("velero", ProbeResult(False, "❌ 無回應")) + lines.append(f" ArgoCD: {argocd.status}") + lines.append(f" Velero 備份: {velero.status}") + + # --- Warnings --- + if report.warnings: + lines.append("") + lines.append(f"⚠️ 需關注({len(report.warnings)} 項)") + for w in report.warnings: + lines.append(f" - {html.escape(w)}") + else: + lines.append("") + lines.append("✅ 全部正常") + + return "\n".join(lines) diff --git a/apps/api/src/services/telegram_gateway.py b/apps/api/src/services/telegram_gateway.py index 711a5e16..7865f547 100644 --- a/apps/api/src/services/telegram_gateway.py +++ b/apps/api/src/services/telegram_gateway.py @@ -4654,43 +4654,52 @@ class TelegramGateway: async def send_heartbeat(self) -> bool: """ - 發送心跳訊息 (系統狀態摘要,含 Nemotron 健康探測) + 發送心跳報告到 SRE 戰情室群組 - 每 30 分鐘執行一次,證明告警鏈路正常運作 - 2026-04-03 ogt: 加入 Nemotron 健康探測 — 補足監控盲區 + ADR-073 重構 (2026-04-12 ogt): + - Redis 分散式鎖:2 個 replica 只發一條 + - 並行探測所有服務(HeartbeatReportService) + - 一條彙整報告發到 SRE_GROUP_CHAT_ID,不散發 + - 沉默告警整合進報告 warnings,不額外多發 """ try: if not self._initialized: await self.initialize() - from src.utils.timezone import now_taipei - taipei_now = now_taipei() + from src.core.redis_client import RedisLock + from src.services.heartbeat_report_service import ( + HeartbeatReportService, + report_to_telegram_html, + ) - # Nemotron 健康探測 - nemo_ok, nemo_status = await self._check_nemotron_health() + # 分散式鎖:同一心跳週期只有一個 replica 發報告 + # timeout=25*60 確保下一次心跳前鎖一定釋放(心跳間隔 30min) + try: + async with RedisLock("heartbeat:leader", timeout=25 * 60, blocking_timeout=5.0): + report = await HeartbeatReportService().collect() + text = report_to_telegram_html(report) - text = f"""💓 AWOOOI 心跳 -⏰ {taipei_now.strftime('%Y-%m-%d %H:%M:%S')} (台北) -📡 告警鏈路: ✅ 正常 -🤖 Nemotron NIM: {nemo_status}""" + # 只發到 SRE 戰情室群組 + if settings.SRE_GROUP_CHAT_ID: + await self.send_to_group(text=text) + else: + # SRE_GROUP_CHAT_ID 未注入時,fallback 到個人頻道並加警告 + fallback = ( + "⚠️ SRE_GROUP_CHAT_ID 未設定,心跳報告暫發到個人頻道\n\n" + + text + ) + await self.send_notification(fallback) - await self.send_notification(text) - self._last_message_time = datetime.now(UTC) + self._last_message_time = datetime.now(UTC) + logger.info( + "telegram_heartbeat_sent", + warnings=len(report.warnings), + has_sre_group=bool(settings.SRE_GROUP_CHAT_ID), + ) + except RuntimeError: + # 另一個 replica 持有鎖,本次跳過 + logger.debug("heartbeat_skipped_lock_taken") - # Nemotron 異常時:告警通知(不自動關閉,NIM 免費 tier 本來就慢) - # 2026-04-03 ogt: 修正 — 之前錯誤地自動關閉 Nemotron 協作 - # Nemotron 是產品核心,慢(11-45s)是免費 tier 特性,不是需要修復的異常 - if not nemo_ok: - alert = InfraAlertMessage( - component="Nemotron NIM (NVIDIA API)", - status=nemo_status, - impact="NIM 免費 tier 回應慢 (11-45s),@nemo 對話可能需等待", - note="NIM 慢屬正常 — 免費 tier 特性,非故障。如需快速回應請用 @openclaw", - ) - await self.send_notification(alert.format(), parse_mode="HTML") - logger.warning("nemotron_health_slow_alert", status=nemo_status) - - logger.info("telegram_heartbeat_sent", nemotron_ok=nemo_ok) return True except Exception as e: @@ -4727,41 +4736,26 @@ class TelegramGateway: async def _heartbeat_loop( self, interval_minutes: int, - silence_hours: int, + _silence_hours: int, # 保留參數簽名相容性,沉默判斷已整合進 HeartbeatReport.warnings ) -> None: - """心跳監控循環""" + """ + 心跳監控循環 + + ADR-073 重構 (2026-04-12 ogt): + - 移除額外沉默告警多發邏輯(已整合進 HeartbeatReport.warnings) + - send_heartbeat() 內部有 RedisLock,2 個 replica 各自跑 loop 也只發一條 + """ interval_seconds = interval_minutes * 60 - silence_seconds = silence_hours * 3600 while self._heartbeat_active: try: - # 1. 檢查沉默告警 - if self._last_message_time: - silence_duration = (datetime.now(UTC) - self._last_message_time).total_seconds() - if silence_duration > silence_seconds: - # 發送沉默告警 - hours = int(silence_duration // 3600) - await self.send_notification( - f"⚠️ 沉默告警\n\n" - f"Telegram 已 {hours} 小時沒有收到任何訊息!\n" - f"請檢查告警鏈路是否正常運作。" - ) - logger.warning( - "telegram_silence_alert", - silence_hours=hours, - ) - - # 2. 發送心跳 await self.send_heartbeat() - - # 3. 等待下一次 await asyncio.sleep(interval_seconds) - except asyncio.CancelledError: break except Exception as e: logger.error("telegram_heartbeat_loop_error", error=str(e)) - await asyncio.sleep(60) # 錯誤後等待 1 分鐘重試 + await asyncio.sleep(60) async def stop_heartbeat_monitor(self) -> None: """停止心跳監控""" diff --git a/k8s/awoooi-prod/03-secrets.example.yaml b/k8s/awoooi-prod/03-secrets.example.yaml index 64d45dec..c03fea7b 100644 --- a/k8s/awoooi-prod/03-secrets.example.yaml +++ b/k8s/awoooi-prod/03-secrets.example.yaml @@ -45,6 +45,7 @@ stringData: OPENCLAW_TG_BOT_TOKEN: "CHANGE_ME" OPENCLAW_TG_CHAT_ID: "CHANGE_ME" OPENCLAW_TG_USER_WHITELIST: "CHANGE_ME" # 逗號分隔的 User ID + SRE_GROUP_CHAT_ID: "CHANGE_ME" # ADR-073 P2-4 (2026-04-12 ogt): SRE 群組 Chat ID,HeartbeatReport 發送目標 # ============================================================================ # Webhook 安全 (CISO 要求: HMAC-SHA256 簽章)