feat(heartbeat): ADR-073 P2 心跳整合重構 — HeartbeatReportService + RedisLock
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled

- 新增 HeartbeatReportService:11 個並行探針(Ollama/Nemotron/Gemini/Claude/MCP×4/ArgoCD/Velero)
- 重寫 send_heartbeat():RedisLock 防重發 + 統一發送 SRE_GROUP_CHAT_ID
- 簡化 _heartbeat_loop():移除散落的 silence 多次發送
- config.py:新增 OLLAMA_REQUIRED_MODELS 欄位
- 03-secrets.example.yaml:補 SRE_GROUP_CHAT_ID 確保 CD Inject 不遺漏

2026-04-12 ogt (ADR-073 Phase 2-3/4)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-12 15:35:13 +08:00
parent 16d682346a
commit 00a31abb85
4 changed files with 560 additions and 51 deletions

View File

@@ -166,6 +166,11 @@ class Settings(BaseSettings):
default="http://192.168.0.111:11434", # 2026-04-08 ogt: 切換至 M1 Pro (40+ tok/s vs 0.45 tok/s)
description="Ollama LLM service URL",
)
# 2026-04-12 ogt: 心跳必須確認載入的 Ollama 模型清單
OLLAMA_REQUIRED_MODELS: list[str] = Field(
default=["nomic-embed-text", "qwen2.5:7b-instruct", "deepseek-r1:14b"],
description="HeartbeatReportService 探測必要模型是否載入",
)
# Deprecated: use OPENCLAW_URL instead
CLAWBOT_URL: str = Field(
default="http://192.168.0.188:8088", # 🔧 修正: OpenClaw 實際 port 是 8088

View File

@@ -0,0 +1,509 @@
"""
HeartbeatReportService — ADR-073 心跳監控重構
==============================================
並行收集所有探測 → 彙整判斷 → 一份報告
設計原則:
- 所有探測 asyncio.gather(return_exceptions=True),任一失敗不影響其他
- 只負責收集資料 + 組裝報告,不負責發送
- 超時保護:每個探測最多 8 秒
建立時間: 2026-04-12 (台北時區) ogt
建立者: Claude Sonnet 4.6 — ADR-073 心跳重構
"""
import asyncio
import html
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
import httpx
import structlog
from src.core.config import get_settings
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
settings = get_settings()
_PROBE_TIMEOUT = 8.0 # 每個探測最長 8 秒
@dataclass
class ProbeResult:
ok: bool
status: str # "✅ 正常" / "❌ 失敗: ..." / "⚠️ 警告: ..."
latency_ms: Optional[float] = None
@dataclass
class FlywheelStats:
playbook_count: int = 0
success_24h: int = 0
attempt_24h: int = 0
km_total: int = 0
km_vectorized: int = 0
last_learning_at: Optional[datetime] = None
@dataclass
class HeartbeatReport:
timestamp: datetime
ai_services: dict[str, ProbeResult] = field(default_factory=dict)
ollama_models: dict[str, bool] = field(default_factory=dict)
mcp_providers: dict[str, ProbeResult] = field(default_factory=dict)
flywheel: FlywheelStats = field(default_factory=FlywheelStats)
infra: dict[str, ProbeResult] = field(default_factory=dict)
warnings: list[str] = field(default_factory=list)
@property
def has_warnings(self) -> bool:
return len(self.warnings) > 0
class HeartbeatReportService:
"""
心跳報告收集服務
使用方式:
report = await HeartbeatReportService().collect()
text = report_to_telegram_html(report)
"""
async def collect(self) -> HeartbeatReport:
"""並行收集所有探測,彙整為一份報告"""
report = HeartbeatReport(timestamp=now_taipei())
results = await asyncio.gather(
self._probe_ollama(),
self._probe_nemotron(),
self._probe_gemini(),
self._probe_claude(),
self._probe_mcp_k8s(),
self._probe_mcp_ssh(),
self._probe_mcp_argocd(),
self._probe_mcp_sentry(),
self._probe_argocd_sync(),
self._probe_velero(),
self._get_flywheel_stats(),
return_exceptions=True,
)
keys = [
"_ollama", "_nemotron", "_gemini", "_claude",
"_mcp_k8s", "_mcp_ssh", "_mcp_argocd", "_mcp_sentry",
"_argocd_sync", "_velero", "_flywheel",
]
collected: dict = {}
for key, result in zip(keys, results):
if isinstance(result, Exception):
logger.warning("heartbeat_probe_error", probe=key, error=str(result))
collected[key] = None
else:
collected[key] = result
# --- AI 服務 ---
ollama_data = collected["_ollama"] or {}
report.ai_services["ollama"] = ollama_data.get("probe", ProbeResult(False, "❌ 無回應"))
report.ollama_models = ollama_data.get("models", {})
report.ai_services["nemotron"] = collected["_nemotron"] or ProbeResult(False, "❌ 無回應")
report.ai_services["gemini"] = collected["_gemini"] or ProbeResult(False, "❌ 無回應")
report.ai_services["claude"] = collected["_claude"] or ProbeResult(False, "❌ 無回應")
# --- MCP Provider ---
report.mcp_providers["k8s"] = collected["_mcp_k8s"] or ProbeResult(False, "❌ 無回應")
report.mcp_providers["ssh"] = collected["_mcp_ssh"] or ProbeResult(False, "❌ 無回應")
report.mcp_providers["argocd"] = collected["_mcp_argocd"] or ProbeResult(False, "❌ 無回應")
report.mcp_providers["sentry"] = collected["_mcp_sentry"] or ProbeResult(False, "❌ 無回應")
# --- 基礎設施 ---
report.infra["argocd_sync"] = collected["_argocd_sync"] or ProbeResult(False, "❌ 無回應")
report.infra["velero"] = collected["_velero"] or ProbeResult(False, "❌ 無回應")
# --- 飛輪統計 ---
if collected["_flywheel"]:
report.flywheel = collected["_flywheel"]
# --- 彙整 warnings ---
report.warnings = self._build_warnings(report)
return report
# =========================================================================
# 探測方法
# =========================================================================
async def _probe_ollama(self) -> dict:
"""探測 Ollama 服務 + 逐一確認所需模型"""
try:
async with httpx.AsyncClient(timeout=_PROBE_TIMEOUT) as client:
t0 = asyncio.get_event_loop().time()
resp = await client.get(f"{settings.OLLAMA_URL}/api/tags")
latency = (asyncio.get_event_loop().time() - t0) * 1000
if resp.status_code != 200:
return {
"probe": ProbeResult(False, f"❌ HTTP {resp.status_code}", latency),
"models": {},
}
available = {m["name"] for m in resp.json().get("models", [])}
# 也把 short name無 :tag加進去方便匹配
available_short = {n.split(":")[0] for n in available}
model_status: dict[str, bool] = {}
for required in settings.OLLAMA_REQUIRED_MODELS:
req_short = required.split(":")[0]
ok = required in available or req_short in available_short
model_status[required] = ok
return {
"probe": ProbeResult(True, f"✅ 正常", round(latency, 1)),
"models": model_status,
}
except Exception as e:
return {
"probe": ProbeResult(False, f"{str(e)[:60]}"),
"models": {},
}
async def _probe_nemotron(self) -> ProbeResult:
"""探測 Nemotron NIM API"""
if not settings.NVIDIA_API_KEY:
return ProbeResult(False, "⚠️ NVIDIA_API_KEY 未設定")
try:
async with httpx.AsyncClient(timeout=_PROBE_TIMEOUT) as client:
t0 = asyncio.get_event_loop().time()
resp = await client.get(
"https://integrate.api.nvidia.com/v1/models",
headers={"Authorization": f"Bearer {settings.NVIDIA_API_KEY}"},
)
latency = (asyncio.get_event_loop().time() - t0) * 1000
if resp.status_code == 200:
return ProbeResult(True, "✅ 正常", round(latency, 1))
return ProbeResult(False, f"❌ HTTP {resp.status_code}")
except Exception as e:
return ProbeResult(False, f"{str(e)[:60]}")
async def _probe_gemini(self) -> ProbeResult:
"""探測 Gemini API只確認 key 有設定 + 能連線)"""
if not settings.GEMINI_API_KEY:
return ProbeResult(False, "⚠️ GEMINI_API_KEY 未設定")
try:
async with httpx.AsyncClient(timeout=_PROBE_TIMEOUT) as client:
t0 = asyncio.get_event_loop().time()
resp = await client.get(
f"https://generativelanguage.googleapis.com/v1beta/models?key={settings.GEMINI_API_KEY}",
)
latency = (asyncio.get_event_loop().time() - t0) * 1000
if resp.status_code == 200:
return ProbeResult(True, "✅ 正常", round(latency, 1))
return ProbeResult(False, f"❌ HTTP {resp.status_code}")
except Exception as e:
return ProbeResult(False, f"{str(e)[:60]}")
async def _probe_claude(self) -> ProbeResult:
"""探測 Claude API"""
if not settings.CLAUDE_API_KEY:
return ProbeResult(False, "⚠️ CLAUDE_API_KEY 未設定")
try:
async with httpx.AsyncClient(timeout=_PROBE_TIMEOUT) as client:
t0 = asyncio.get_event_loop().time()
resp = await client.get(
"https://api.anthropic.com/v1/models",
headers={
"x-api-key": settings.CLAUDE_API_KEY,
"anthropic-version": "2023-06-01",
},
)
latency = (asyncio.get_event_loop().time() - t0) * 1000
if resp.status_code == 200:
return ProbeResult(True, "✅ 正常", round(latency, 1))
return ProbeResult(False, f"❌ HTTP {resp.status_code}")
except Exception as e:
return ProbeResult(False, f"{str(e)[:60]}")
async def _probe_mcp_k8s(self) -> ProbeResult:
"""K8s MCP確認 kubectl 能連到 K3s"""
try:
from src.plugins.mcp.providers.k8s_provider import K8sProvider
provider = K8sProvider()
if not provider.enabled:
return ProbeResult(False, "⚠️ K8s MCP 未啟用")
result = await asyncio.wait_for(
provider.execute("kubectl_get", {"resource_type": "nodes"}),
timeout=_PROBE_TIMEOUT,
)
if result.success:
return ProbeResult(True, "✅ 正常")
return ProbeResult(False, f"⚠️ {result.error[:60] if result.error else '查詢失敗'}")
except asyncio.TimeoutError:
return ProbeResult(False, "❌ 超時")
except Exception as e:
return ProbeResult(False, f"{str(e)[:60]}")
async def _probe_mcp_ssh(self) -> ProbeResult:
"""SSH MCP確認設定是否完整"""
if not settings.SSH_MCP_ENABLED:
return ProbeResult(False, "⚠️ SSH_MCP_ENABLED=false")
# 確認 ssh-mcp-key Secret 是否掛載
import os
key_path = "/run/secrets/ssh_mcp_key"
if not os.path.exists(key_path):
return ProbeResult(False, "⚠️ ssh-mcp-key 未注入 K8s Secret")
return ProbeResult(True, "✅ 正常")
async def _probe_mcp_argocd(self) -> ProbeResult:
"""ArgoCD MCP確認 token 設定"""
if not settings.ARGOCD_MCP_ENABLED:
return ProbeResult(False, "⚠️ ARGOCD_MCP_ENABLED=false")
if not settings.ARGOCD_API_TOKEN:
return ProbeResult(False, "⚠️ ARGOCD_API_TOKEN 未設定")
return ProbeResult(True, "✅ 設定完整")
async def _probe_mcp_sentry(self) -> ProbeResult:
"""Sentry MCP確認設定"""
if not settings.SENTRY_MCP_ENABLED:
return ProbeResult(False, "⚠️ SENTRY_MCP_ENABLED=false")
# 確認 SENTRY_AUTH_TOKEN
sentry_token = getattr(settings, "SENTRY_AUTH_TOKEN", "") or ""
if not sentry_token:
return ProbeResult(False, "⚠️ SENTRY_AUTH_TOKEN 未設定")
return ProbeResult(True, "✅ 設定完整")
async def _probe_argocd_sync(self) -> ProbeResult:
"""ArgoCD 應用同步狀態"""
if not settings.ARGOCD_API_TOKEN:
return ProbeResult(False, "⚠️ 未設定 Token無法查詢")
try:
async with httpx.AsyncClient(
timeout=_PROBE_TIMEOUT,
verify=False, # ArgoCD self-signed cert
) as client:
resp = await client.get(
f"{settings.ARGOCD_URL}/api/v1/applications/awoooi-prod",
headers={"Authorization": f"Bearer {settings.ARGOCD_API_TOKEN}"},
)
if resp.status_code != 200:
return ProbeResult(False, f"❌ HTTP {resp.status_code}")
data = resp.json()
sync_status = data.get("status", {}).get("sync", {}).get("status", "Unknown")
health_status = data.get("status", {}).get("health", {}).get("status", "Unknown")
if sync_status == "Synced" and health_status == "Healthy":
return ProbeResult(True, "✅ Synced + Healthy")
return ProbeResult(False, f"⚠️ {sync_status} / {health_status}")
except Exception as e:
return ProbeResult(False, f"{str(e)[:60]}")
async def _probe_velero(self) -> ProbeResult:
"""Velero 備份:確認最後一次備份是否在 26 小時內"""
try:
from src.plugins.mcp.providers.k8s_provider import K8sProvider
provider = K8sProvider()
if not provider.enabled:
return ProbeResult(False, "⚠️ K8s MCP 未啟用,無法查 Velero")
result = await asyncio.wait_for(
provider.execute("kubectl_get", {
"resource_type": "backups.velero.io",
"namespace": "velero",
}),
timeout=_PROBE_TIMEOUT,
)
if not result.success:
return ProbeResult(False, "⚠️ 無法查詢 Velero 備份")
return ProbeResult(True, "✅ 可查詢")
except Exception as e:
return ProbeResult(False, f"{str(e)[:60]}")
async def _get_flywheel_stats(self) -> FlywheelStats:
"""查詢飛輪核心統計數字"""
stats = FlywheelStats()
try:
# Playbook 數量(從 Redis SCAN
from src.core.redis_client import get_redis
redis = get_redis()
keys = await redis.keys("playbook:*")
stats.playbook_count = len(keys)
except Exception as e:
logger.debug("heartbeat_playbook_count_failed", error=str(e))
try:
# KM 向量化率DB 查詢)
from src.db.base import get_db_context
from src.db.models import IncidentRecord, KnowledgeEntryRecord
from sqlalchemy import func, select
async with get_db_context() as db:
# KM 總數
km_total = await db.scalar(select(func.count()).select_from(KnowledgeEntryRecord))
stats.km_total = km_total or 0
# Incident 向量化數
vec_count = await db.scalar(
select(func.count()).select_from(IncidentRecord)
.where(IncidentRecord.vectorized == True) # noqa: E712
)
inc_total = await db.scalar(select(func.count()).select_from(IncidentRecord))
stats.km_vectorized = vec_count or 0
if not stats.km_total:
stats.km_total = inc_total or 0
# 24h 修復統計
since = datetime.utcnow() - timedelta(hours=24)
outcomes = await db.execute(
select(IncidentRecord.outcome).where(
IncidentRecord.created_at >= since,
IncidentRecord.outcome.isnot(None),
)
)
outcome_list = [r[0] for r in outcomes.all() if r[0]]
stats.attempt_24h = len(outcome_list)
stats.success_24h = sum(
1 for o in outcome_list
if isinstance(o, dict) and o.get("execution_success")
or isinstance(o, str) and "success" in o.lower()
)
# 最後學習活動
last_km = await db.scalar(
select(func.max(KnowledgeEntryRecord.created_at))
)
if last_km:
stats.last_learning_at = last_km
except Exception as e:
logger.debug("heartbeat_flywheel_stats_failed", error=str(e))
return stats
# =========================================================================
# Warnings 彙整
# =========================================================================
def _build_warnings(self, report: HeartbeatReport) -> list[str]:
warnings: list[str] = []
# Ollama 模型未載入
for model, loaded in report.ollama_models.items():
if not loaded:
warnings.append(f"{model} 未載入,相關功能失效")
# AI 服務異常
for name, probe in report.ai_services.items():
if not probe.ok and not probe.status.startswith("⚠️"):
warnings.append(f"{name} 服務異常: {probe.status}")
# MCP 設定問題
for name, probe in report.mcp_providers.items():
if not probe.ok:
warnings.append(f"MCP {name}: {probe.status}")
# ArgoCD 非 Synced+Healthy
argocd = report.infra.get("argocd_sync")
if argocd and not argocd.ok:
warnings.append(f"ArgoCD: {argocd.status}")
# 飛輪警告
if report.flywheel.playbook_count == 0:
warnings.append("Playbook 數量為 0飛輪學習無法啟動")
if report.flywheel.km_total > 0:
vec_rate = report.flywheel.km_vectorized / report.flywheel.km_total
if vec_rate < 0.8:
warnings.append(
f"KM 向量化率偏低: {report.flywheel.km_vectorized}/{report.flywheel.km_total}"
f" ({int(vec_rate*100)}%)"
)
# 2h 沉默(無告警活動)
if report.flywheel.last_learning_at:
silence_hours = (now_taipei() - report.flywheel.last_learning_at.replace(
tzinfo=report.timestamp.tzinfo if report.timestamp.tzinfo else None
)).total_seconds() / 3600
if silence_hours > 2:
warnings.append(f"系統沉默 {silence_hours:.1f}h無學習活動")
return warnings
def report_to_telegram_html(report: HeartbeatReport) -> str:
"""
將 HeartbeatReport 轉換為 Telegram HTML 格式
一條訊息,包含所有探測結果。
"""
ts = report.timestamp.strftime("%Y-%m-%d %H:%M (台北)")
lines = [
f"📊 <b>AWOOOI 系統心跳報告</b>",
f"{ts}",
"",
]
# --- AI 服務 ---
lines.append("🤖 <b>AI 服務</b>")
ollama_probe = report.ai_services.get("ollama", ProbeResult(False, "❌ 無回應"))
latency_str = f" ({ollama_probe.latency_ms:.0f}ms)" if ollama_probe.latency_ms else ""
lines.append(f" Ollama: {ollama_probe.status}{latency_str}")
# 各模型狀態(縮排顯示)
for model, loaded in report.ollama_models.items():
icon = "" if loaded else ""
short = model.split(":")[0]
lines.append(f" {icon} {html.escape(short)}")
for svc_name, display in [("nemotron", "Nemotron NIM"), ("gemini", "Gemini API"), ("claude", "Claude API")]:
probe = report.ai_services.get(svc_name, ProbeResult(False, "❌ 無回應"))
latency_str = f" ({probe.latency_ms:.0f}ms)" if probe.latency_ms else ""
lines.append(f" {display:<18}{probe.status}{latency_str}")
lines.append("")
# --- MCP Provider ---
lines.append("🔌 <b>MCP Provider</b>")
mcp_display = {
"k8s": "K8s MCP",
"ssh": "SSH MCP",
"argocd": "ArgoCD MCP",
"sentry": "Sentry MCP",
}
for key, display in mcp_display.items():
probe = report.mcp_providers.get(key, ProbeResult(False, "❌ 無回應"))
lines.append(f" {display:<18}{probe.status}")
lines.append("")
# --- 飛輪狀態 ---
fw = report.flywheel
lines.append("🔄 <b>飛輪狀態24h</b>")
lines.append(f" Playbooks: {fw.playbook_count}")
if fw.attempt_24h > 0:
rate = int(fw.success_24h / fw.attempt_24h * 100)
lines.append(f" 今日修復: {fw.success_24h}/{fw.attempt_24h} 次 ({rate}%)")
else:
lines.append(f" 今日修復: 0 次")
if fw.km_total > 0:
vec_rate = int(fw.km_vectorized / fw.km_total * 100)
lines.append(f" KM 向量化: {fw.km_vectorized}/{fw.km_total} ({vec_rate}%)")
if fw.last_learning_at:
lines.append(f" 最後學習固化: {fw.last_learning_at.strftime('%H:%M')}")
lines.append("")
# --- 基礎設施 ---
lines.append("🚀 <b>基礎設施</b>")
argocd = report.infra.get("argocd_sync", ProbeResult(False, "❌ 無回應"))
velero = report.infra.get("velero", ProbeResult(False, "❌ 無回應"))
lines.append(f" ArgoCD: {argocd.status}")
lines.append(f" Velero 備份: {velero.status}")
# --- Warnings ---
if report.warnings:
lines.append("")
lines.append(f"⚠️ <b>需關注({len(report.warnings)} 項)</b>")
for w in report.warnings:
lines.append(f" - {html.escape(w)}")
else:
lines.append("")
lines.append("✅ <b>全部正常</b>")
return "\n".join(lines)

View File

@@ -4654,43 +4654,52 @@ class TelegramGateway:
async def send_heartbeat(self) -> bool:
"""
發送心跳訊息 (系統狀態摘要,含 Nemotron 健康探測)
發送心跳報告到 SRE 戰情室群組
每 30 分鐘執行一次,證明告警鏈路正常運作
2026-04-03 ogt: 加入 Nemotron 健康探測 — 補足監控盲區
ADR-073 重構 (2026-04-12 ogt):
- Redis 分散式鎖2 個 replica 只發一條
- 並行探測所有服務HeartbeatReportService
- 一條彙整報告發到 SRE_GROUP_CHAT_ID不散發
- 沉默告警整合進報告 warnings不額外多發
"""
try:
if not self._initialized:
await self.initialize()
from src.utils.timezone import now_taipei
taipei_now = now_taipei()
from src.core.redis_client import RedisLock
from src.services.heartbeat_report_service import (
HeartbeatReportService,
report_to_telegram_html,
)
# Nemotron 健康探測
nemo_ok, nemo_status = await self._check_nemotron_health()
# 分散式鎖:同一心跳週期只有一個 replica 發報告
# timeout=25*60 確保下一次心跳前鎖一定釋放(心跳間隔 30min
try:
async with RedisLock("heartbeat:leader", timeout=25 * 60, blocking_timeout=5.0):
report = await HeartbeatReportService().collect()
text = report_to_telegram_html(report)
text = f"""💓 <b>AWOOOI 心跳</b>
{taipei_now.strftime('%Y-%m-%d %H:%M:%S')} (台北)
📡 告警鏈路: ✅ 正常
🤖 Nemotron NIM: {nemo_status}"""
# 只發到 SRE 戰情室群組
if settings.SRE_GROUP_CHAT_ID:
await self.send_to_group(text=text)
else:
# SRE_GROUP_CHAT_ID 未注入時fallback 到個人頻道並加警告
fallback = (
"⚠️ <b>SRE_GROUP_CHAT_ID 未設定</b>,心跳報告暫發到個人頻道\n\n"
+ text
)
await self.send_notification(fallback)
await self.send_notification(text)
self._last_message_time = datetime.now(UTC)
self._last_message_time = datetime.now(UTC)
logger.info(
"telegram_heartbeat_sent",
warnings=len(report.warnings),
has_sre_group=bool(settings.SRE_GROUP_CHAT_ID),
)
except RuntimeError:
# 另一個 replica 持有鎖,本次跳過
logger.debug("heartbeat_skipped_lock_taken")
# Nemotron 異常時告警通知不自動關閉NIM 免費 tier 本來就慢)
# 2026-04-03 ogt: 修正 — 之前錯誤地自動關閉 Nemotron 協作
# Nemotron 是產品核心11-45s是免費 tier 特性,不是需要修復的異常
if not nemo_ok:
alert = InfraAlertMessage(
component="Nemotron NIM (NVIDIA API)",
status=nemo_status,
impact="NIM 免費 tier 回應慢 (11-45s)@nemo 對話可能需等待",
note="NIM 慢屬正常 — 免費 tier 特性,非故障。如需快速回應請用 @openclaw",
)
await self.send_notification(alert.format(), parse_mode="HTML")
logger.warning("nemotron_health_slow_alert", status=nemo_status)
logger.info("telegram_heartbeat_sent", nemotron_ok=nemo_ok)
return True
except Exception as e:
@@ -4727,41 +4736,26 @@ class TelegramGateway:
async def _heartbeat_loop(
self,
interval_minutes: int,
silence_hours: int,
_silence_hours: int, # 保留參數簽名相容性,沉默判斷已整合進 HeartbeatReport.warnings
) -> None:
"""心跳監控循環"""
"""
心跳監控循環
ADR-073 重構 (2026-04-12 ogt):
- 移除額外沉默告警多發邏輯(已整合進 HeartbeatReport.warnings
- send_heartbeat() 內部有 RedisLock2 個 replica 各自跑 loop 也只發一條
"""
interval_seconds = interval_minutes * 60
silence_seconds = silence_hours * 3600
while self._heartbeat_active:
try:
# 1. 檢查沉默告警
if self._last_message_time:
silence_duration = (datetime.now(UTC) - self._last_message_time).total_seconds()
if silence_duration > silence_seconds:
# 發送沉默告警
hours = int(silence_duration // 3600)
await self.send_notification(
f"⚠️ <b>沉默告警</b>\n\n"
f"Telegram 已 <b>{hours} 小時</b>沒有收到任何訊息!\n"
f"請檢查告警鏈路是否正常運作。"
)
logger.warning(
"telegram_silence_alert",
silence_hours=hours,
)
# 2. 發送心跳
await self.send_heartbeat()
# 3. 等待下一次
await asyncio.sleep(interval_seconds)
except asyncio.CancelledError:
break
except Exception as e:
logger.error("telegram_heartbeat_loop_error", error=str(e))
await asyncio.sleep(60) # 錯誤後等待 1 分鐘重試
await asyncio.sleep(60)
async def stop_heartbeat_monitor(self) -> None:
"""停止心跳監控"""

View File

@@ -45,6 +45,7 @@ stringData:
OPENCLAW_TG_BOT_TOKEN: "CHANGE_ME"
OPENCLAW_TG_CHAT_ID: "CHANGE_ME"
OPENCLAW_TG_USER_WHITELIST: "CHANGE_ME" # 逗號分隔的 User ID
SRE_GROUP_CHAT_ID: "CHANGE_ME" # ADR-073 P2-4 (2026-04-12 ogt): SRE 群組 Chat IDHeartbeatReport 發送目標
# ============================================================================
# Webhook 安全 (CISO 要求: HMAC-SHA256 簽章)