All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 15m4s
舊平鋪文字 → ├─/└─ 樹狀結構對齊 ACTION REQUIRED 卡片風格 - 標題: 💚/⚠️ INFO | AWOOOI 系統報告 - 加 ────── 分隔線 - AI/MCP/飛輪/基礎設施各節統一 ├─/└─ 格式 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
511 lines
21 KiB
Python
511 lines
21 KiB
Python
"""
|
||
HeartbeatReportService — ADR-073 心跳監控重構
|
||
==============================================
|
||
並行收集所有探測 → 彙整判斷 → 一份報告
|
||
|
||
設計原則:
|
||
- 所有探測 asyncio.gather(return_exceptions=True),任一失敗不影響其他
|
||
- 只負責收集資料 + 組裝報告,不負責發送
|
||
- 超時保護:每個探測最多 8 秒
|
||
|
||
建立時間: 2026-04-12 (台北時區) ogt
|
||
建立者: Claude Sonnet 4.6 — ADR-073 心跳重構
|
||
"""
|
||
|
||
import asyncio
|
||
import html
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional
|
||
|
||
import httpx
|
||
import structlog
|
||
|
||
from src.core.config import get_settings
|
||
from src.utils.timezone import now_taipei
|
||
|
||
logger = structlog.get_logger(__name__)
|
||
|
||
settings = get_settings()
|
||
|
||
_PROBE_TIMEOUT = 8.0 # 每個探測最長 8 秒
|
||
|
||
|
||
@dataclass
|
||
class ProbeResult:
|
||
ok: bool
|
||
status: str # "✅ 正常" / "❌ 失敗: ..." / "⚠️ 警告: ..."
|
||
latency_ms: Optional[float] = None
|
||
|
||
|
||
@dataclass
|
||
class FlywheelStats:
|
||
playbook_count: int = 0
|
||
success_24h: int = 0
|
||
attempt_24h: int = 0
|
||
km_total: int = 0
|
||
km_vectorized: int = 0
|
||
last_learning_at: Optional[datetime] = None
|
||
|
||
|
||
@dataclass
|
||
class HeartbeatReport:
|
||
timestamp: datetime
|
||
ai_services: dict[str, ProbeResult] = field(default_factory=dict)
|
||
ollama_models: dict[str, bool] = field(default_factory=dict)
|
||
mcp_providers: dict[str, ProbeResult] = field(default_factory=dict)
|
||
flywheel: FlywheelStats = field(default_factory=FlywheelStats)
|
||
infra: dict[str, ProbeResult] = field(default_factory=dict)
|
||
warnings: list[str] = field(default_factory=list)
|
||
|
||
@property
|
||
def has_warnings(self) -> bool:
|
||
return len(self.warnings) > 0
|
||
|
||
|
||
class HeartbeatReportService:
|
||
"""
|
||
心跳報告收集服務
|
||
|
||
使用方式:
|
||
report = await HeartbeatReportService().collect()
|
||
text = report_to_telegram_html(report)
|
||
"""
|
||
|
||
async def collect(self) -> HeartbeatReport:
|
||
"""並行收集所有探測,彙整為一份報告"""
|
||
report = HeartbeatReport(timestamp=now_taipei())
|
||
|
||
results = await asyncio.gather(
|
||
self._probe_ollama(),
|
||
self._probe_nemotron(),
|
||
self._probe_gemini(),
|
||
self._probe_claude(),
|
||
self._probe_mcp_k8s(),
|
||
self._probe_mcp_ssh(),
|
||
self._probe_mcp_argocd(),
|
||
self._probe_mcp_sentry(),
|
||
self._probe_argocd_sync(),
|
||
self._probe_velero(),
|
||
self._get_flywheel_stats(),
|
||
return_exceptions=True,
|
||
)
|
||
|
||
keys = [
|
||
"_ollama", "_nemotron", "_gemini", "_claude",
|
||
"_mcp_k8s", "_mcp_ssh", "_mcp_argocd", "_mcp_sentry",
|
||
"_argocd_sync", "_velero", "_flywheel",
|
||
]
|
||
collected: dict = {}
|
||
for key, result in zip(keys, results):
|
||
if isinstance(result, Exception):
|
||
logger.warning("heartbeat_probe_error", probe=key, error=str(result))
|
||
collected[key] = None
|
||
else:
|
||
collected[key] = result
|
||
|
||
# --- AI 服務 ---
|
||
ollama_data = collected["_ollama"] or {}
|
||
report.ai_services["ollama"] = ollama_data.get("probe", ProbeResult(False, "❌ 無回應"))
|
||
report.ollama_models = ollama_data.get("models", {})
|
||
report.ai_services["nemotron"] = collected["_nemotron"] or ProbeResult(False, "❌ 無回應")
|
||
report.ai_services["gemini"] = collected["_gemini"] or ProbeResult(False, "❌ 無回應")
|
||
report.ai_services["claude"] = collected["_claude"] or ProbeResult(False, "❌ 無回應")
|
||
|
||
# --- MCP Provider ---
|
||
report.mcp_providers["k8s"] = collected["_mcp_k8s"] or ProbeResult(False, "❌ 無回應")
|
||
report.mcp_providers["ssh"] = collected["_mcp_ssh"] or ProbeResult(False, "❌ 無回應")
|
||
report.mcp_providers["argocd"] = collected["_mcp_argocd"] or ProbeResult(False, "❌ 無回應")
|
||
report.mcp_providers["sentry"] = collected["_mcp_sentry"] or ProbeResult(False, "❌ 無回應")
|
||
|
||
# --- 基礎設施 ---
|
||
report.infra["argocd_sync"] = collected["_argocd_sync"] or ProbeResult(False, "❌ 無回應")
|
||
report.infra["velero"] = collected["_velero"] or ProbeResult(False, "❌ 無回應")
|
||
|
||
# --- 飛輪統計 ---
|
||
if collected["_flywheel"]:
|
||
report.flywheel = collected["_flywheel"]
|
||
|
||
# --- 彙整 warnings ---
|
||
report.warnings = self._build_warnings(report)
|
||
|
||
return report
|
||
|
||
# =========================================================================
|
||
# 探測方法
|
||
# =========================================================================
|
||
|
||
async def _probe_ollama(self) -> dict:
|
||
"""探測 Ollama 服務 + 逐一確認所需模型"""
|
||
try:
|
||
async with httpx.AsyncClient(timeout=_PROBE_TIMEOUT) as client:
|
||
t0 = asyncio.get_event_loop().time()
|
||
resp = await client.get(f"{settings.OLLAMA_URL}/api/tags")
|
||
latency = (asyncio.get_event_loop().time() - t0) * 1000
|
||
|
||
if resp.status_code != 200:
|
||
return {
|
||
"probe": ProbeResult(False, f"❌ HTTP {resp.status_code}", latency),
|
||
"models": {},
|
||
}
|
||
|
||
available = {m["name"] for m in resp.json().get("models", [])}
|
||
# 也把 short name(無 :tag)加進去方便匹配
|
||
available_short = {n.split(":")[0] for n in available}
|
||
|
||
model_status: dict[str, bool] = {}
|
||
for required in settings.OLLAMA_REQUIRED_MODELS:
|
||
req_short = required.split(":")[0]
|
||
ok = required in available or req_short in available_short
|
||
model_status[required] = ok
|
||
|
||
return {
|
||
"probe": ProbeResult(True, f"✅ 正常", round(latency, 1)),
|
||
"models": model_status,
|
||
}
|
||
except Exception as e:
|
||
return {
|
||
"probe": ProbeResult(False, f"❌ {str(e)[:60]}"),
|
||
"models": {},
|
||
}
|
||
|
||
async def _probe_nemotron(self) -> ProbeResult:
|
||
"""探測 Nemotron NIM API"""
|
||
if not settings.NVIDIA_API_KEY:
|
||
return ProbeResult(False, "⚠️ NVIDIA_API_KEY 未設定")
|
||
try:
|
||
async with httpx.AsyncClient(timeout=_PROBE_TIMEOUT) as client:
|
||
t0 = asyncio.get_event_loop().time()
|
||
resp = await client.get(
|
||
"https://integrate.api.nvidia.com/v1/models",
|
||
headers={"Authorization": f"Bearer {settings.NVIDIA_API_KEY}"},
|
||
)
|
||
latency = (asyncio.get_event_loop().time() - t0) * 1000
|
||
if resp.status_code == 200:
|
||
return ProbeResult(True, "✅ 正常", round(latency, 1))
|
||
return ProbeResult(False, f"❌ HTTP {resp.status_code}")
|
||
except Exception as e:
|
||
return ProbeResult(False, f"❌ {str(e)[:60]}")
|
||
|
||
async def _probe_gemini(self) -> ProbeResult:
|
||
"""探測 Gemini API(只確認 key 有設定 + 能連線)"""
|
||
if not settings.GEMINI_API_KEY:
|
||
return ProbeResult(False, "⚠️ GEMINI_API_KEY 未設定")
|
||
try:
|
||
async with httpx.AsyncClient(timeout=_PROBE_TIMEOUT) as client:
|
||
t0 = asyncio.get_event_loop().time()
|
||
resp = await client.get(
|
||
f"https://generativelanguage.googleapis.com/v1beta/models?key={settings.GEMINI_API_KEY}",
|
||
)
|
||
latency = (asyncio.get_event_loop().time() - t0) * 1000
|
||
if resp.status_code == 200:
|
||
return ProbeResult(True, "✅ 正常", round(latency, 1))
|
||
return ProbeResult(False, f"❌ HTTP {resp.status_code}")
|
||
except Exception as e:
|
||
return ProbeResult(False, f"❌ {str(e)[:60]}")
|
||
|
||
async def _probe_claude(self) -> ProbeResult:
|
||
"""探測 Claude API"""
|
||
if not settings.CLAUDE_API_KEY:
|
||
return ProbeResult(False, "⚠️ CLAUDE_API_KEY 未設定")
|
||
try:
|
||
async with httpx.AsyncClient(timeout=_PROBE_TIMEOUT) as client:
|
||
t0 = asyncio.get_event_loop().time()
|
||
resp = await client.get(
|
||
"https://api.anthropic.com/v1/models",
|
||
headers={
|
||
"x-api-key": settings.CLAUDE_API_KEY,
|
||
"anthropic-version": "2023-06-01",
|
||
},
|
||
)
|
||
latency = (asyncio.get_event_loop().time() - t0) * 1000
|
||
if resp.status_code == 200:
|
||
return ProbeResult(True, "✅ 正常", round(latency, 1))
|
||
return ProbeResult(False, f"❌ HTTP {resp.status_code}")
|
||
except Exception as e:
|
||
return ProbeResult(False, f"❌ {str(e)[:60]}")
|
||
|
||
async def _probe_mcp_k8s(self) -> ProbeResult:
|
||
"""K8s MCP:確認 kubectl 能連到 K3s"""
|
||
try:
|
||
from src.plugins.mcp.providers.k8s_provider import K8sProvider
|
||
provider = K8sProvider()
|
||
if not provider.enabled:
|
||
return ProbeResult(False, "⚠️ K8s MCP 未啟用")
|
||
result = await asyncio.wait_for(
|
||
provider.execute("kubectl_get", {"resource_type": "nodes"}),
|
||
timeout=_PROBE_TIMEOUT,
|
||
)
|
||
if result.success:
|
||
return ProbeResult(True, "✅ 正常")
|
||
return ProbeResult(False, f"⚠️ {result.error[:60] if result.error else '查詢失敗'}")
|
||
except asyncio.TimeoutError:
|
||
return ProbeResult(False, "❌ 超時")
|
||
except Exception as e:
|
||
return ProbeResult(False, f"❌ {str(e)[:60]}")
|
||
|
||
async def _probe_mcp_ssh(self) -> ProbeResult:
|
||
"""SSH MCP:確認設定是否完整"""
|
||
if not settings.SSH_MCP_ENABLED:
|
||
return ProbeResult(False, "⚠️ SSH_MCP_ENABLED=false")
|
||
# 確認 ssh-mcp-key Secret 是否掛載
|
||
import os
|
||
key_path = "/run/secrets/ssh_mcp_key"
|
||
if not os.path.exists(key_path):
|
||
return ProbeResult(False, "⚠️ ssh-mcp-key 未注入 K8s Secret")
|
||
return ProbeResult(True, "✅ 正常")
|
||
|
||
async def _probe_mcp_argocd(self) -> ProbeResult:
|
||
"""ArgoCD MCP:確認 token 設定"""
|
||
if not settings.ARGOCD_MCP_ENABLED:
|
||
return ProbeResult(False, "⚠️ ARGOCD_MCP_ENABLED=false")
|
||
if not settings.ARGOCD_API_TOKEN:
|
||
return ProbeResult(False, "⚠️ ARGOCD_API_TOKEN 未設定")
|
||
return ProbeResult(True, "✅ 設定完整")
|
||
|
||
async def _probe_mcp_sentry(self) -> ProbeResult:
|
||
"""Sentry MCP:確認設定"""
|
||
if not settings.SENTRY_MCP_ENABLED:
|
||
return ProbeResult(False, "⚠️ SENTRY_MCP_ENABLED=false")
|
||
# 確認 SENTRY_AUTH_TOKEN
|
||
sentry_token = getattr(settings, "SENTRY_AUTH_TOKEN", "") or ""
|
||
if not sentry_token:
|
||
return ProbeResult(False, "⚠️ SENTRY_AUTH_TOKEN 未設定")
|
||
return ProbeResult(True, "✅ 設定完整")
|
||
|
||
async def _probe_argocd_sync(self) -> ProbeResult:
|
||
"""ArgoCD 應用同步狀態"""
|
||
if not settings.ARGOCD_API_TOKEN:
|
||
return ProbeResult(False, "⚠️ 未設定 Token,無法查詢")
|
||
try:
|
||
async with httpx.AsyncClient(
|
||
timeout=_PROBE_TIMEOUT,
|
||
verify=False, # ArgoCD self-signed cert
|
||
) as client:
|
||
resp = await client.get(
|
||
f"{settings.ARGOCD_URL}/api/v1/applications/awoooi-prod",
|
||
headers={"Authorization": f"Bearer {settings.ARGOCD_API_TOKEN}"},
|
||
)
|
||
if resp.status_code != 200:
|
||
return ProbeResult(False, f"❌ HTTP {resp.status_code}")
|
||
data = resp.json()
|
||
sync_status = data.get("status", {}).get("sync", {}).get("status", "Unknown")
|
||
health_status = data.get("status", {}).get("health", {}).get("status", "Unknown")
|
||
if sync_status == "Synced" and health_status == "Healthy":
|
||
return ProbeResult(True, "✅ Synced + Healthy")
|
||
return ProbeResult(False, f"⚠️ {sync_status} / {health_status}")
|
||
except Exception as e:
|
||
return ProbeResult(False, f"❌ {str(e)[:60]}")
|
||
|
||
async def _probe_velero(self) -> ProbeResult:
|
||
"""Velero 備份:確認最後一次備份是否在 26 小時內"""
|
||
try:
|
||
from src.plugins.mcp.providers.k8s_provider import K8sProvider
|
||
provider = K8sProvider()
|
||
if not provider.enabled:
|
||
return ProbeResult(False, "⚠️ K8s MCP 未啟用,無法查 Velero")
|
||
result = await asyncio.wait_for(
|
||
provider.execute("kubectl_get", {
|
||
"resource_type": "backups.velero.io",
|
||
"namespace": "velero",
|
||
}),
|
||
timeout=_PROBE_TIMEOUT,
|
||
)
|
||
if not result.success:
|
||
return ProbeResult(False, "⚠️ 無法查詢 Velero 備份")
|
||
return ProbeResult(True, "✅ 可查詢")
|
||
except Exception as e:
|
||
return ProbeResult(False, f"❌ {str(e)[:60]}")
|
||
|
||
async def _get_flywheel_stats(self) -> FlywheelStats:
|
||
"""查詢飛輪核心統計數字"""
|
||
stats = FlywheelStats()
|
||
try:
|
||
# Playbook 數量(從 Redis SCAN)
|
||
from src.core.redis_client import get_redis
|
||
redis = get_redis()
|
||
keys = await redis.keys("playbook:*")
|
||
stats.playbook_count = len(keys)
|
||
except Exception as e:
|
||
logger.debug("heartbeat_playbook_count_failed", error=str(e))
|
||
|
||
try:
|
||
# KM 向量化率(DB 查詢)
|
||
from src.db.base import get_db_context
|
||
from src.db.models import IncidentRecord, KnowledgeEntryRecord
|
||
from sqlalchemy import func, select
|
||
async with get_db_context() as db:
|
||
# KM 總數
|
||
km_total = await db.scalar(select(func.count()).select_from(KnowledgeEntryRecord))
|
||
stats.km_total = km_total or 0
|
||
|
||
# KM 向量化數(embedding IS NOT NULL)
|
||
# KnowledgeEntryRecord ORM 無 embedding 欄位,改用 raw SQL
|
||
from sqlalchemy import text as sa_text
|
||
vec_result = await db.execute(
|
||
sa_text("SELECT COUNT(*) FROM knowledge_entries WHERE embedding IS NOT NULL")
|
||
)
|
||
stats.km_vectorized = vec_result.scalar() or 0
|
||
|
||
# 24h 修復統計
|
||
since = datetime.utcnow() - timedelta(hours=24)
|
||
outcomes = await db.execute(
|
||
select(IncidentRecord.outcome).where(
|
||
IncidentRecord.created_at >= since,
|
||
IncidentRecord.outcome.isnot(None),
|
||
)
|
||
)
|
||
outcome_list = [r[0] for r in outcomes.all() if r[0]]
|
||
stats.attempt_24h = len(outcome_list)
|
||
stats.success_24h = sum(
|
||
1 for o in outcome_list
|
||
if isinstance(o, dict) and o.get("execution_success")
|
||
or isinstance(o, str) and "success" in o.lower()
|
||
)
|
||
|
||
# 最後學習活動
|
||
last_km = await db.scalar(
|
||
select(func.max(KnowledgeEntryRecord.created_at))
|
||
)
|
||
if last_km:
|
||
stats.last_learning_at = last_km
|
||
except Exception as e:
|
||
logger.debug("heartbeat_flywheel_stats_failed", error=str(e))
|
||
|
||
return stats
|
||
|
||
# =========================================================================
|
||
# Warnings 彙整
|
||
# =========================================================================
|
||
|
||
def _build_warnings(self, report: HeartbeatReport) -> list[str]:
|
||
warnings: list[str] = []
|
||
|
||
# Ollama 模型未載入
|
||
for model, loaded in report.ollama_models.items():
|
||
if not loaded:
|
||
warnings.append(f"{model} 未載入,相關功能失效")
|
||
|
||
# AI 服務異常
|
||
for name, probe in report.ai_services.items():
|
||
if not probe.ok and not probe.status.startswith("⚠️"):
|
||
warnings.append(f"{name} 服務異常: {probe.status}")
|
||
|
||
# MCP 設定問題
|
||
for name, probe in report.mcp_providers.items():
|
||
if not probe.ok:
|
||
warnings.append(f"MCP {name}: {probe.status}")
|
||
|
||
# ArgoCD 非 Synced+Healthy
|
||
argocd = report.infra.get("argocd_sync")
|
||
if argocd and not argocd.ok:
|
||
warnings.append(f"ArgoCD: {argocd.status}")
|
||
|
||
# 飛輪警告
|
||
if report.flywheel.playbook_count == 0:
|
||
warnings.append("Playbook 數量為 0,飛輪學習無法啟動")
|
||
if report.flywheel.km_total > 0:
|
||
vec_rate = report.flywheel.km_vectorized / report.flywheel.km_total
|
||
if vec_rate < 0.8:
|
||
warnings.append(
|
||
f"KM 向量化率偏低: {report.flywheel.km_vectorized}/{report.flywheel.km_total}"
|
||
f" ({int(vec_rate*100)}%)"
|
||
)
|
||
|
||
# 2h 沉默(無告警活動)
|
||
if report.flywheel.last_learning_at:
|
||
silence_hours = (now_taipei() - report.flywheel.last_learning_at.replace(
|
||
tzinfo=report.timestamp.tzinfo if report.timestamp.tzinfo else None
|
||
)).total_seconds() / 3600
|
||
if silence_hours > 2:
|
||
warnings.append(f"系統沉默 {silence_hours:.1f}h(無學習活動)")
|
||
|
||
return warnings
|
||
|
||
|
||
def report_to_telegram_html(report: HeartbeatReport) -> str:
|
||
"""
|
||
將 HeartbeatReport 轉換為 Telegram HTML 格式
|
||
|
||
ADR-075 TYPE-1 格式 (2026-04-12 ogt):
|
||
💚 INFO | AWOOOI 系統報告 + ├─/└─ 樹狀結構
|
||
"""
|
||
ts = report.timestamp.strftime("%Y-%m-%d %H:%M (台北)")
|
||
overall_ok = not report.warnings
|
||
|
||
header_icon = "💚" if overall_ok else "⚠️"
|
||
header_label = "全系統正常" if overall_ok else f"需關注 {len(report.warnings)} 項"
|
||
|
||
lines = [
|
||
f"{header_icon} <b>INFO | AWOOOI 系統報告</b>",
|
||
f"⏰ {ts}",
|
||
"──────────────────────",
|
||
"",
|
||
]
|
||
|
||
# --- AI 服務 ---
|
||
ollama = report.ai_services.get("ollama", ProbeResult(False, "❌"))
|
||
ollama_lat = f" {ollama.latency_ms:.0f}ms" if ollama.latency_ms else ""
|
||
models_ok = [m.split(":")[0] for m, ok in report.ollama_models.items() if ok]
|
||
models_str = " / ".join(models_ok) if models_ok else "無模型"
|
||
nem = report.ai_services.get("nemotron", ProbeResult(False, "❌"))
|
||
gem = report.ai_services.get("gemini", ProbeResult(False, "❌"))
|
||
cla = report.ai_services.get("claude", ProbeResult(False, "❌"))
|
||
|
||
lines.append("🤖 <b>AI 服務</b>")
|
||
lines.append(f"├─ Ollama: {ollama.status}{ollama_lat} <code>{html.escape(models_str)}</code>")
|
||
lines.append(f"├─ Nemotron NIM: {nem.status}" + (f" {nem.latency_ms:.0f}ms" if nem.latency_ms else ""))
|
||
lines.append(f"├─ Gemini API: {gem.status}" + (f" {gem.latency_ms:.0f}ms" if gem.latency_ms else ""))
|
||
lines.append(f"└─ Claude API: {cla.status}" + (f" {cla.latency_ms:.0f}ms" if cla.latency_ms else ""))
|
||
lines.append("")
|
||
|
||
# --- MCP Provider ---
|
||
k8s = report.mcp_providers.get("k8s", ProbeResult(False, "❌"))
|
||
ssh = report.mcp_providers.get("ssh", ProbeResult(False, "❌"))
|
||
argocd_mcp = report.mcp_providers.get("argocd", ProbeResult(False, "❌"))
|
||
sentry_mcp = report.mcp_providers.get("sentry", ProbeResult(False, "❌"))
|
||
|
||
lines.append("🔌 <b>MCP Provider</b>")
|
||
lines.append(f"├─ K8s: {k8s.status} SSH: {ssh.status}")
|
||
lines.append(f"└─ ArgoCD: {argocd_mcp.status} Sentry: {sentry_mcp.status}")
|
||
lines.append("")
|
||
|
||
# --- 飛輪狀態 ---
|
||
fw = report.flywheel
|
||
if fw.attempt_24h > 0:
|
||
rate = int(fw.success_24h / fw.attempt_24h * 100)
|
||
repair_str = f"{fw.success_24h}/{fw.attempt_24h} ({rate}%)"
|
||
else:
|
||
repair_str = "0 次"
|
||
km_str = ""
|
||
if fw.km_total > 0:
|
||
vec_rate = int(fw.km_vectorized / fw.km_total * 100)
|
||
km_icon = "✅" if vec_rate >= 90 else "⚠️"
|
||
km_str = f"KM: {km_icon} {fw.km_vectorized}/{fw.km_total} ({vec_rate}%)"
|
||
learn_str = f" 學習: {fw.last_learning_at.strftime('%H:%M')}" if fw.last_learning_at else ""
|
||
|
||
lines.append("🔄 <b>飛輪狀態(24h)</b>")
|
||
lines.append(f"├─ Playbooks: {fw.playbook_count} 修復: {repair_str}")
|
||
lines.append(f"└─ {km_str}{learn_str}" if km_str or learn_str else "└─ KM 統計不可用")
|
||
lines.append("")
|
||
|
||
# --- 基礎設施 ---
|
||
argocd = report.infra.get("argocd_sync", ProbeResult(False, "❌"))
|
||
velero = report.infra.get("velero", ProbeResult(False, "❌"))
|
||
|
||
lines.append("🚀 <b>基礎設施</b>")
|
||
lines.append(f"├─ ArgoCD: {argocd.status}")
|
||
lines.append(f"└─ Velero: {velero.status}")
|
||
|
||
# --- Warnings / 總結 ---
|
||
lines.append("")
|
||
if report.warnings:
|
||
lines.append(f"⚠️ <b>需關注({len(report.warnings)} 項)</b>")
|
||
for w in report.warnings[:-1]:
|
||
lines.append(f"├─ {html.escape(w)}")
|
||
lines.append(f"└─ {html.escape(report.warnings[-1])}")
|
||
else:
|
||
lines.append(f"✅ <b>{header_label}</b>")
|
||
|
||
return "\n".join(lines)
|