diff --git a/apps/api/src/services/heartbeat_report_service.py b/apps/api/src/services/heartbeat_report_service.py index 85167f97..63686ee8 100644 --- a/apps/api/src/services/heartbeat_report_service.py +++ b/apps/api/src/services/heartbeat_report_service.py @@ -48,6 +48,45 @@ class FlywheelStats: last_learning_at: Optional[datetime] = None +@dataclass +class AlertPipelineStats: + total_24h: int = 0 + auto_resolved_24h: int = 0 + pending_approval: int = 0 + execution_success_24h: int = 0 + execution_failed_24h: int = 0 + + +@dataclass +class DbRedisStats: + db_ok: bool = False + db_status: str = "❌ 未查詢" + redis_ok: bool = False + redis_status: str = "❌ 未查詢" + redis_key_count: int = 0 + + +@dataclass +class PodInfo: + name: str + ready: bool + status: str + restarts: int = 0 + + +@dataclass +class ScannerStats: + # key = scanner name, value = last run ISO string or None + last_runs: dict[str, Optional[str]] = field(default_factory=dict) + + +@dataclass +class TelegramBotStats: + polling_ok: bool = False + status: str = "❌ 未查詢" + last_callback_ago_min: Optional[float] = None + + @dataclass class HeartbeatReport: timestamp: datetime @@ -57,6 +96,12 @@ class HeartbeatReport: flywheel: FlywheelStats = field(default_factory=FlywheelStats) infra: dict[str, ProbeResult] = field(default_factory=dict) warnings: list[str] = field(default_factory=list) + # 2026-04-22 新增動態區塊 + alert_pipeline: AlertPipelineStats = field(default_factory=AlertPipelineStats) + db_redis: DbRedisStats = field(default_factory=DbRedisStats) + pods: list[PodInfo] = field(default_factory=list) + scanners: ScannerStats = field(default_factory=ScannerStats) + telegram_bot: TelegramBotStats = field(default_factory=TelegramBotStats) @property def has_warnings(self) -> bool: @@ -88,6 +133,12 @@ class HeartbeatReportService: self._probe_argocd_sync(), self._probe_velero(), self._get_flywheel_stats(), + # 2026-04-22 新增動態探測 + self._get_alert_pipeline_stats(), + self._probe_db_redis(), + self._get_pod_status(), + self._get_scanner_stats(), + self._probe_telegram_bot(), return_exceptions=True, ) @@ -95,6 +146,7 @@ class HeartbeatReportService: "_ollama", "_nemotron", "_gemini", "_claude", "_mcp_k8s", "_mcp_ssh", "_mcp_argocd", "_mcp_sentry", "_argocd_sync", "_velero", "_flywheel", + "_alert_pipeline", "_db_redis", "_pods", "_scanners", "_tg_bot", ] collected: dict = {} for key, result in zip(keys, results): @@ -126,6 +178,18 @@ class HeartbeatReportService: if collected["_flywheel"]: report.flywheel = collected["_flywheel"] + # --- 新動態區塊 --- + if collected["_alert_pipeline"]: + report.alert_pipeline = collected["_alert_pipeline"] + if collected["_db_redis"]: + report.db_redis = collected["_db_redis"] + if collected["_pods"]: + report.pods = collected["_pods"] + if collected["_scanners"]: + report.scanners = collected["_scanners"] + if collected["_tg_bot"]: + report.telegram_bot = collected["_tg_bot"] + # --- 彙整 warnings --- report.warnings = self._build_warnings(report) @@ -374,6 +438,136 @@ class HeartbeatReportService: return stats + # ========================================================================= + # 2026-04-22 新增動態探測方法 + # ========================================================================= + + async def _get_alert_pipeline_stats(self) -> AlertPipelineStats: + """查 24h 告警流水線統計(approval_records)""" + stats = AlertPipelineStats() + try: + from src.db.base import get_db_context + from sqlalchemy import text as sa_text + async with get_db_context() as db: + r = await db.execute(sa_text(""" + SELECT + COUNT(*) AS total, + COUNT(*) FILTER (WHERE UPPER(status::text) = 'PENDING') AS pending, + COUNT(*) FILTER (WHERE UPPER(status::text) = 'EXECUTION_SUCCESS') AS success, + COUNT(*) FILTER (WHERE UPPER(status::text) = 'EXECUTION_FAILED') AS failed, + COUNT(*) FILTER (WHERE UPPER(status::text) IN ('APPROVED','EXECUTION_SUCCESS','EXECUTION_FAILED')) AS auto_resolved + FROM approval_records + WHERE created_at >= NOW() - interval '24 hours' + """)) + row = r.one() + stats.total_24h = int(row.total or 0) + stats.pending_approval = int(row.pending or 0) + stats.execution_success_24h = int(row.success or 0) + stats.execution_failed_24h = int(row.failed or 0) + stats.auto_resolved_24h = int(row.auto_resolved or 0) + except Exception as e: + logger.debug("heartbeat_alert_pipeline_failed", error=str(e)) + return stats + + async def _probe_db_redis(self) -> DbRedisStats: + """探測 PostgreSQL 與 Redis 連線健康""" + s = DbRedisStats() + try: + from src.db.base import get_db_context + from sqlalchemy import text as sa_text + async with get_db_context() as db: + await db.execute(sa_text("SELECT 1")) + s.db_ok = True + s.db_status = "✅ 正常" + except Exception as e: + s.db_status = f"❌ {str(e)[:40]}" + + try: + from src.core.redis_client import get_redis + redis = get_redis() + info = await redis.info("memory") + used_mb = int(info.get("used_memory", 0)) // (1024 * 1024) + all_keys = await redis.dbsize() + s.redis_ok = True + s.redis_key_count = all_keys + s.redis_status = f"✅ 正常 {used_mb}MB / {all_keys} keys" + except Exception as e: + s.redis_status = f"❌ {str(e)[:40]}" + return s + + async def _get_pod_status(self) -> list[PodInfo]: + """查 awoooi-prod namespace 的所有 Pod 狀態""" + pods: list[PodInfo] = [] + try: + import subprocess + r = subprocess.run( + ["kubectl", "-n", "awoooi-prod", "get", "pods", + "--no-headers", "-o", + "custom-columns=NAME:.metadata.name,READY:.status.containerStatuses[0].ready," + "STATUS:.status.phase,RESTARTS:.status.containerStatuses[0].restartCount"], + capture_output=True, text=True, timeout=8, + ) + for line in r.stdout.strip().splitlines(): + parts = line.split() + if len(parts) >= 3: + name = parts[0] + ready = parts[1].lower() == "true" + status = parts[2] + restarts = int(parts[3]) if len(parts) >= 4 and parts[3].isdigit() else 0 + pods.append(PodInfo(name=name, ready=ready, status=status, restarts=restarts)) + except Exception as e: + logger.debug("heartbeat_pod_status_failed", error=str(e)) + return pods + + async def _get_scanner_stats(self) -> ScannerStats: + """查各 scanner 最後執行時間(Redis daily lock key TTL 反推)""" + stats = ScannerStats() + scanner_names = [ + "capacity_forecaster", "hermes_rule_quality", + "compliance_scanner", "coverage_evaluator", "daily_report", + ] + try: + from src.core.redis_client import get_redis + from src.utils.timezone import now_taipei as _now + redis = get_redis() + today = _now().date().isoformat() + for name in scanner_names: + key = f"aiops:daily_lock:{name}:{today}" + ttl = await redis.ttl(key) + if ttl > 0: + # TTL=25h 時剛跑完;剩餘 TTL 推算跑完時間 + ran_at_sec = 25 * 3600 - ttl + h, m = divmod(ran_at_sec // 60, 60) + stats.last_runs[name] = f"今日 {h:02d}:{m:02d}" + else: + stats.last_runs[name] = None # 今日尚未執行 + except Exception as e: + logger.debug("heartbeat_scanner_stats_failed", error=str(e)) + return stats + + async def _probe_telegram_bot(self) -> TelegramBotStats: + """探測 Telegram Bot polling 狀態""" + s = TelegramBotStats() + try: + from src.core.redis_client import get_redis + redis = get_redis() + # polling leader lock + leader = await redis.get("telegram:polling_leader") + if leader: + s.polling_ok = True + s.status = f"✅ Polling 活躍 (leader: {leader.decode()[:20] if isinstance(leader, bytes) else str(leader)[:20]})" + else: + # 嘗試查最近 callback 時間(tg_msg: key 存在即有活動) + keys = await redis.keys("tg_msg:*") + if keys: + s.polling_ok = True + s.status = f"✅ 有活動 ({len(keys)} msg keys)" + else: + s.status = "⚠️ 無 polling leader key(可能重啟中)" + except Exception as e: + s.status = f"❌ {str(e)[:40]}" + return s + # ========================================================================= # Warnings 彙整 # ========================================================================= @@ -420,6 +614,23 @@ class HeartbeatReportService: if silence_hours > 24: warnings.append(f"系統沉默 {silence_hours:.1f}h(無學習活動)") + # DB / Redis 異常 + if not report.db_redis.db_ok: + warnings.append(f"PostgreSQL: {report.db_redis.db_status}") + if not report.db_redis.redis_ok: + warnings.append(f"Redis: {report.db_redis.redis_status}") + + # Pending 積壓告警 + if report.alert_pipeline.pending_approval > 10: + warnings.append(f"PENDING 積壓 {report.alert_pipeline.pending_approval} 筆,需人工處理") + + # Pod 異常 + for pod in report.pods: + if not pod.ready: + warnings.append(f"Pod {pod.name} 未就緒({pod.status})") + if pod.restarts >= 3: + warnings.append(f"Pod {pod.name} 重啟 {pod.restarts} 次") + return warnings @@ -497,6 +708,53 @@ def report_to_telegram_html(report: HeartbeatReport) -> str: lines.append(f"├─ ArgoCD: {argocd.status}") lines.append(f"└─ Velero: {velero.status}") + # --- 告警流水線 --- + ap = report.alert_pipeline + lines.append("") + lines.append("📊 告警流水線(24h)") + lines.append(f"├─ 總計: {ap.total_24h} PENDING: {ap.pending_approval}") + if ap.execution_success_24h > 0 and ap.execution_failed_24h == 0: + exec_icon = "✅" + elif ap.execution_failed_24h > 0: + exec_icon = "⚠️" + else: + exec_icon = "—" + lines.append(f"└─ 執行: {exec_icon} 成功 {ap.execution_success_24h} 失敗 {ap.execution_failed_24h}") + + # --- DB & Redis --- + dr = report.db_redis + lines.append("") + lines.append("🗄️ 資料庫 & Redis") + lines.append(f"├─ PostgreSQL: {dr.db_status}") + lines.append(f"└─ Redis: {dr.redis_status} Keys: {dr.redis_key_count}") + + # --- K8s Pods --- + if report.pods: + lines.append("") + lines.append("☸️ Kubernetes Pods") + for i, pod in enumerate(report.pods): + prefix = "└─" if i == len(report.pods) - 1 else "├─" + ready_icon = "✅" if pod.ready else "❌" + restart_str = f" (重啟×{pod.restarts})" if pod.restarts > 0 else "" + lines.append(f"{prefix} {ready_icon} {html.escape(pod.name[:35])}{restart_str}") + + # --- Scanner 狀態 --- + if report.scanners.last_runs: + lines.append("") + lines.append("⏱️ Scanner 狀態(今日)") + scanner_items = list(report.scanners.last_runs.items()) + for i, (name, ran_at) in enumerate(scanner_items): + prefix = "└─" if i == len(scanner_items) - 1 else "├─" + icon = "✅" if ran_at else "⏸️" + ran_str = ran_at or "尚未執行" + lines.append(f"{prefix} {icon} {html.escape(name)}: {ran_str}") + + # --- Telegram Bot --- + tg = report.telegram_bot + lines.append("") + lines.append("🤖 Telegram Bot") + lines.append(f"└─ {tg.status}") + # --- Warnings / 總結 --- lines.append("") if report.warnings: