feat(heartbeat): 系統報告新增 5 大動態區塊
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 13m50s

新增告警流水線(24h)、DB/Redis 狀態、K8s Pods、Scanner 狀態、Telegram Bot
各區塊採 asyncio.gather(return_exceptions=True) 平行探測,任一失敗不影響其他
新增 AlertPipelineStats/DbRedisStats/PodInfo/ScannerStats/TelegramBotStats dataclasses
_build_warnings() 加入 DB/Redis 異常、PENDING>10、Pod 未就緒/高重啟次數判斷
report_to_telegram_html() 對應輸出 5 個新 HTML 區塊

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Your Name
2026-04-22 09:29:07 +08:00
parent 3bd105be9a
commit 9244c5e845

View File

@@ -48,6 +48,45 @@ class FlywheelStats:
last_learning_at: Optional[datetime] = None
@dataclass
class AlertPipelineStats:
    """Counters describing the alert pipeline over the last 24 hours.

    Every counter defaults to zero, so an instance created without
    arguments represents "nothing collected yet".
    """

    # Number of approval records seen in the window.
    total_24h: int = 0
    # Records counted as resolved by the pipeline.
    auto_resolved_24h: int = 0
    # Records still waiting for approval.
    pending_approval: int = 0
    # Executions that finished successfully.
    execution_success_24h: int = 0
    # Executions that failed.
    execution_failed_24h: int = 0
@dataclass
class DbRedisStats:
    """Health snapshot for the PostgreSQL and Redis connections.

    The defaults describe the "not probed yet" state: both ``*_ok`` flags
    are False and both status strings carry the not-queried marker.
    """

    # True once a database round-trip has succeeded.
    db_ok: bool = False
    # Human-readable database status shown in the report.
    db_status: str = "❌ 未查詢"
    # True once the Redis probe has succeeded.
    redis_ok: bool = False
    # Human-readable Redis status shown in the report.
    redis_status: str = "❌ 未查詢"
    # Number of keys reported by Redis.
    redis_key_count: int = 0
@dataclass
class PodInfo:
    """Summary of a single Kubernetes pod as shown in the heartbeat report."""

    # Pod name (required).
    name: str
    # Whether the pod is considered ready (required).
    ready: bool
    # Pod status text, e.g. the pod phase (required).
    status: str
    # Restart count; zero when unknown.
    restarts: int = 0
@dataclass
class ScannerStats:
    """Last-run information for the scheduled scanners."""

    # Maps scanner name -> display string for its last run, or None when
    # the scanner has no recorded run. Each instance gets its own dict.
    last_runs: dict[str, Optional[str]] = field(default_factory=dict)
@dataclass
class TelegramBotStats:
    """Health snapshot for the Telegram bot.

    The defaults describe the "not probed yet" state.
    """

    # True when the bot appears to be polling / active.
    polling_ok: bool = False
    # Human-readable status line shown in the report.
    status: str = "❌ 未查詢"
    # Minutes since the last callback; None when unknown.
    last_callback_ago_min: Optional[float] = None
@dataclass
class HeartbeatReport:
timestamp: datetime
@@ -57,6 +96,12 @@ class HeartbeatReport:
flywheel: FlywheelStats = field(default_factory=FlywheelStats)
infra: dict[str, ProbeResult] = field(default_factory=dict)
warnings: list[str] = field(default_factory=list)
# 2026-04-22 新增動態區塊
alert_pipeline: AlertPipelineStats = field(default_factory=AlertPipelineStats)
db_redis: DbRedisStats = field(default_factory=DbRedisStats)
pods: list[PodInfo] = field(default_factory=list)
scanners: ScannerStats = field(default_factory=ScannerStats)
telegram_bot: TelegramBotStats = field(default_factory=TelegramBotStats)
@property
def has_warnings(self) -> bool:
@@ -88,6 +133,12 @@ class HeartbeatReportService:
self._probe_argocd_sync(),
self._probe_velero(),
self._get_flywheel_stats(),
# 2026-04-22 新增動態探測
self._get_alert_pipeline_stats(),
self._probe_db_redis(),
self._get_pod_status(),
self._get_scanner_stats(),
self._probe_telegram_bot(),
return_exceptions=True,
)
@@ -95,6 +146,7 @@ class HeartbeatReportService:
"_ollama", "_nemotron", "_gemini", "_claude",
"_mcp_k8s", "_mcp_ssh", "_mcp_argocd", "_mcp_sentry",
"_argocd_sync", "_velero", "_flywheel",
"_alert_pipeline", "_db_redis", "_pods", "_scanners", "_tg_bot",
]
collected: dict = {}
for key, result in zip(keys, results):
@@ -126,6 +178,18 @@ class HeartbeatReportService:
if collected["_flywheel"]:
report.flywheel = collected["_flywheel"]
# --- 新動態區塊 ---
if collected["_alert_pipeline"]:
report.alert_pipeline = collected["_alert_pipeline"]
if collected["_db_redis"]:
report.db_redis = collected["_db_redis"]
if collected["_pods"]:
report.pods = collected["_pods"]
if collected["_scanners"]:
report.scanners = collected["_scanners"]
if collected["_tg_bot"]:
report.telegram_bot = collected["_tg_bot"]
# --- 彙整 warnings ---
report.warnings = self._build_warnings(report)
@@ -374,6 +438,136 @@ class HeartbeatReportService:
return stats
# =========================================================================
# 2026-04-22 新增動態探測方法
# =========================================================================
async def _get_alert_pipeline_stats(self) -> AlertPipelineStats:
    """Collect 24-hour alert pipeline counters from ``approval_records``.

    Runs one aggregate query over rows created in the last 24 hours and
    copies each count into an ``AlertPipelineStats``.

    Returns:
        The populated stats. This is best-effort: on any failure the error
        is logged at debug level and the stats gathered so far (zeros by
        default) are returned instead of raising.
    """
    pipeline = AlertPipelineStats()
    # (stats attribute, SQL column alias), listed in the order they are filled.
    column_map = (
        ("total_24h", "total"),
        ("pending_approval", "pending"),
        ("execution_success_24h", "success"),
        ("execution_failed_24h", "failed"),
        ("auto_resolved_24h", "auto_resolved"),
    )
    try:
        from src.db.base import get_db_context
        from sqlalchemy import text as sa_text

        async with get_db_context() as session:
            result = await session.execute(sa_text("""
                SELECT
                COUNT(*) AS total,
                COUNT(*) FILTER (WHERE UPPER(status::text) = 'PENDING') AS pending,
                COUNT(*) FILTER (WHERE UPPER(status::text) = 'EXECUTION_SUCCESS') AS success,
                COUNT(*) FILTER (WHERE UPPER(status::text) = 'EXECUTION_FAILED') AS failed,
                COUNT(*) FILTER (WHERE UPPER(status::text) IN ('APPROVED','EXECUTION_SUCCESS','EXECUTION_FAILED')) AS auto_resolved
                FROM approval_records
                WHERE created_at >= NOW() - interval '24 hours'
            """))
            record = result.one()
            for attr, column in column_map:
                # NULL aggregates are treated as zero.
                setattr(pipeline, attr, int(getattr(record, column) or 0))
    except Exception as exc:
        logger.debug("heartbeat_alert_pipeline_failed", error=str(exc))
    return pipeline
async def _probe_db_redis(self) -> DbRedisStats:
    """Probe PostgreSQL and Redis connectivity.

    The two probes are independent: a failure of one is recorded in its own
    status field (exception text truncated to 40 characters) and does not
    prevent the other from running.

    Returns:
        A ``DbRedisStats`` with the ok flags, status strings and the Redis
        key count filled in for whichever probes succeeded.
    """
    health = DbRedisStats()

    # --- PostgreSQL: a trivial round-trip query ---
    try:
        from src.db.base import get_db_context
        from sqlalchemy import text as sa_text

        async with get_db_context() as session:
            await session.execute(sa_text("SELECT 1"))
    except Exception as exc:
        health.db_status = str(exc)[:40]
    else:
        health.db_ok = True
        health.db_status = "✅ 正常"

    # --- Redis: memory usage and key count ---
    try:
        from src.core.redis_client import get_redis

        client = get_redis()
        memory_info = await client.info("memory")
        used_mb = int(memory_info.get("used_memory", 0)) // (1024 * 1024)
        key_total = await client.dbsize()
    except Exception as exc:
        health.redis_status = str(exc)[:40]
    else:
        health.redis_ok = True
        health.redis_key_count = key_total
        health.redis_status = f"✅ 正常 {used_mb}MB / {key_total} keys"

    return health
async def _get_pod_status(self) -> list[PodInfo]:
    """List the pods in the ``awoooi-prod`` namespace via ``kubectl``.

    ``kubectl`` is a blocking child process (timeout 8 s), so it is run in a
    worker thread. Calling ``subprocess.run`` directly inside this coroutine
    would stall the event loop and serialise the other probes that are
    gathered alongside it.

    Note that READY and RESTARTS are taken from ``containerStatuses[0]``
    only, so they describe each pod's first container.

    Returns:
        One ``PodInfo`` per parsed output line. This is best-effort: an
        empty list is returned when ``kubectl`` is missing, times out, or
        exits non-zero; the reason is logged at debug level.
    """
    import asyncio
    import subprocess

    pods: list[PodInfo] = []
    cmd = [
        "kubectl", "-n", "awoooi-prod", "get", "pods",
        "--no-headers", "-o",
        "custom-columns=NAME:.metadata.name,READY:.status.containerStatuses[0].ready,"
        "STATUS:.status.phase,RESTARTS:.status.containerStatuses[0].restartCount",
    ]
    try:
        # Run the blocking call off the event loop.
        proc = await asyncio.to_thread(
            subprocess.run, cmd, capture_output=True, text=True, timeout=8,
        )
        if proc.returncode != 0:
            # Surface the kubectl error instead of silently reporting "no pods".
            logger.debug(
                "heartbeat_pod_status_failed",
                error=(proc.stderr or "").strip()[:200],
            )
            return pods
        for line in proc.stdout.strip().splitlines():
            parts = line.split()
            if len(parts) < 3:
                continue
            # Anything other than the literal "true" counts as not ready
            # (kubectl prints "<none>" for missing values).
            ready = parts[1].lower() == "true"
            restarts = int(parts[3]) if len(parts) >= 4 and parts[3].isdigit() else 0
            pods.append(
                PodInfo(name=parts[0], ready=ready, status=parts[2], restarts=restarts)
            )
    except Exception as e:
        logger.debug("heartbeat_pod_status_failed", error=str(e))
    return pods
async def _get_scanner_stats(self) -> ScannerStats:
    """Report when each daily scanner last ran today.

    The run time is inferred from the remaining TTL of the scanner's daily
    lock key in Redis (``aiops:daily_lock:<name>:<today>``).

    The lock is assumed to be written with a 25 h TTL when the scanner runs
    (per the original implementation — TODO confirm against the lock
    writer). ``25h - ttl`` is therefore the time *elapsed since* the run,
    and the wall-clock run time is ``now - elapsed``. The previous code
    formatted the elapsed duration itself as a clock time.

    Returns:
        ``ScannerStats`` whose ``last_runs`` maps each scanner name to
        ``"今日 HH:MM"`` or to ``None`` when no lock exists for today. This
        is best-effort: on failure the error is logged at debug level and
        the entries collected so far are returned.
    """
    from datetime import timedelta

    stats = ScannerStats()
    scanner_names = [
        "capacity_forecaster", "hermes_rule_quality",
        "compliance_scanner", "coverage_evaluator", "daily_report",
    ]
    lock_ttl_sec = 25 * 3600
    try:
        from src.core.redis_client import get_redis
        from src.utils.timezone import now_taipei as _now

        redis = get_redis()
        today = _now().date().isoformat()
        for name in scanner_names:
            ttl = await redis.ttl(f"aiops:daily_lock:{name}:{today}")
            if ttl > 0:
                # Clamp so a TTL above the assumed 25 h cannot produce a
                # run time in the future.
                elapsed_sec = max(0, lock_ttl_sec - ttl)
                ran_at = _now() - timedelta(seconds=elapsed_sec)
                stats.last_runs[name] = f"今日 {ran_at:%H:%M}"
            else:
                # Missing key or key without expiry: not run yet today.
                stats.last_runs[name] = None
    except Exception as e:
        logger.debug("heartbeat_scanner_stats_failed", error=str(e))
    return stats
async def _probe_telegram_bot(self) -> TelegramBotStats:
    """Probe whether the Telegram bot is polling.

    Checks the ``telegram:polling_leader`` key first. If it is absent, the
    existence of any ``tg_msg:*`` key is treated as evidence of activity.

    The fallback walks the keyspace with ``SCAN`` (``scan_iter``) rather
    than ``KEYS``. ``KEYS`` blocks the Redis server for the whole traversal,
    which a periodic health probe should not do.

    Returns:
        ``TelegramBotStats`` with ``polling_ok`` and ``status`` filled in.
        On failure ``status`` carries the exception text truncated to 40
        characters and ``polling_ok`` stays False.
    """
    s = TelegramBotStats()
    try:
        from src.core.redis_client import get_redis

        redis = get_redis()
        # Polling leader lock.
        leader = await redis.get("telegram:polling_leader")
        if leader:
            leader_name = leader.decode() if isinstance(leader, bytes) else str(leader)
            s.polling_ok = True
            s.status = f"✅ Polling 活躍 (leader: {leader_name[:20]})"
            return s

        # SCAN may return the same key more than once; a set keeps the
        # reported count equal to the number of distinct keys.
        msg_keys = set()
        async for key in redis.scan_iter(match="tg_msg:*", count=500):
            msg_keys.add(key)

        if msg_keys:
            s.polling_ok = True
            s.status = f"✅ 有活動 ({len(msg_keys)} msg keys)"
        else:
            s.status = "⚠️ 無 polling leader key可能重啟中"
    except Exception as e:
        s.status = f"{str(e)[:40]}"
    return s
# =========================================================================
# Warnings 彙整
# =========================================================================
@@ -420,6 +614,23 @@ class HeartbeatReportService:
if silence_hours > 24:
warnings.append(f"系統沉默 {silence_hours:.1f}h無學習活動")
# DB / Redis 異常
if not report.db_redis.db_ok:
warnings.append(f"PostgreSQL: {report.db_redis.db_status}")
if not report.db_redis.redis_ok:
warnings.append(f"Redis: {report.db_redis.redis_status}")
# Pending 積壓告警
if report.alert_pipeline.pending_approval > 10:
warnings.append(f"PENDING 積壓 {report.alert_pipeline.pending_approval} 筆,需人工處理")
# Pod 異常
for pod in report.pods:
if not pod.ready:
warnings.append(f"Pod {pod.name} 未就緒({pod.status}")
if pod.restarts >= 3:
warnings.append(f"Pod {pod.name} 重啟 {pod.restarts}")
return warnings
@@ -497,6 +708,53 @@ def report_to_telegram_html(report: HeartbeatReport) -> str:
lines.append(f"├─ ArgoCD: {argocd.status}")
lines.append(f"└─ Velero: {velero.status}")
# --- 告警流水線 ---
ap = report.alert_pipeline
lines.append("")
lines.append("📊 <b>告警流水線24h</b>")
lines.append(f"├─ 總計: {ap.total_24h} PENDING: {ap.pending_approval}")
if ap.execution_success_24h > 0 and ap.execution_failed_24h == 0:
exec_icon = ""
elif ap.execution_failed_24h > 0:
exec_icon = "⚠️"
else:
exec_icon = ""
lines.append(f"└─ 執行: {exec_icon} 成功 {ap.execution_success_24h} 失敗 {ap.execution_failed_24h}")
# --- DB & Redis ---
dr = report.db_redis
lines.append("")
lines.append("🗄️ <b>資料庫 & Redis</b>")
lines.append(f"├─ PostgreSQL: {dr.db_status}")
lines.append(f"└─ Redis: {dr.redis_status} Keys: {dr.redis_key_count}")
# --- K8s Pods ---
if report.pods:
lines.append("")
lines.append("☸️ <b>Kubernetes Pods</b>")
for i, pod in enumerate(report.pods):
prefix = "└─" if i == len(report.pods) - 1 else "├─"
ready_icon = "" if pod.ready else ""
restart_str = f" (重啟×{pod.restarts})" if pod.restarts > 0 else ""
lines.append(f"{prefix} {ready_icon} {html.escape(pod.name[:35])}{restart_str}")
# --- Scanner 狀態 ---
if report.scanners.last_runs:
lines.append("")
lines.append("⏱️ <b>Scanner 狀態(今日)</b>")
scanner_items = list(report.scanners.last_runs.items())
for i, (name, ran_at) in enumerate(scanner_items):
prefix = "└─" if i == len(scanner_items) - 1 else "├─"
icon = "" if ran_at else "⏸️"
ran_str = ran_at or "尚未執行"
lines.append(f"{prefix} {icon} {html.escape(name)}: {ran_str}")
# --- Telegram Bot ---
tg = report.telegram_bot
lines.append("")
lines.append("🤖 <b>Telegram Bot</b>")
lines.append(f"└─ {tg.status}")
# --- Warnings / 總結 ---
lines.append("")
if report.warnings: