#!/usr/bin/env python3
"""
AWOOOI 監控覆蓋率報告生成器
============================
ADR-037 Wave D.2: 生成完整的監控覆蓋率報告
功能:
1. 分析 service-registry.yaml 覆蓋率
2. 檢查告警規則完整性
3. 生成 HTML/JSON 報告
4. 支援 CI/CD 整合
用法:
python ops/monitoring/coverage_report.py
python ops/monitoring/coverage_report.py --format html --output /tmp/report.html
python ops/monitoring/coverage_report.py --format json
python ops/monitoring/coverage_report.py --ci # CI 模式
版本: v1.0
建立: 2026-03-29 (台北時區)
建立者: Claude Code (Phase 21 ADR-037)
"""
import argparse
import json
import sys
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo
import yaml
# Configuration: every path below is derived from this script's own location.
SCRIPT_DIR = Path(__file__).parent
REGISTRY_FILE = SCRIPT_DIR.joinpath("service-registry.yaml")
# Two directory levels above the script directory, then k8s/monitoring.
ALERT_RULES_DIR = SCRIPT_DIR.parent.parent.joinpath("k8s", "monitoring")
# Taipei time zone.
TZ_TAIPEI = ZoneInfo("Asia/Taipei")
def load_registry() -> dict:
    """Load the service registry from ``REGISTRY_FILE``.

    Returns:
        The parsed YAML mapping. When the file does not exist, a skeleton
        with empty ``services``, ``nodes``, ``api_endpoints`` and ``pages``
        lists is returned. An empty document yields ``{}``.

    Raises:
        ValueError: If the document's top level is not a mapping.
    """
    if not REGISTRY_FILE.exists():
        return {"services": [], "nodes": [], "api_endpoints": [], "pages": []}
    # Read as UTF-8 explicitly instead of relying on the locale's default
    # encoding, which differs between platforms.
    with open(REGISTRY_FILE, encoding="utf-8") as f:
        data = yaml.safe_load(f) or {}
    # Fail here with a clear message rather than later with an AttributeError
    # when the analysis functions call .get() on a list or scalar.
    if not isinstance(data, dict):
        raise ValueError(
            f"{REGISTRY_FILE}: expected a mapping at the top level, "
            f"got {type(data).__name__}"
        )
    return data
def _rules_from_document(document, filename: str) -> list[dict]:
    """Extract the alert entries from one parsed YAML document.

    Any node that does not have the expected shape (mapping with
    ``spec.groups[].rules[]``) is skipped instead of raising.
    """
    if not isinstance(document, dict):
        return []
    spec = document.get("spec")
    if not isinstance(spec, dict):
        return []

    found = []
    for group in spec.get("groups") or []:
        if not isinstance(group, dict):
            continue
        for rule in group.get("rules") or []:
            # Entries without an "alert" key are not collected.
            if not isinstance(rule, dict) or "alert" not in rule:
                continue
            labels = rule.get("labels")
            if not isinstance(labels, dict):
                # ``labels:`` with no value parses to None.
                labels = {}
            found.append({
                "name": rule["alert"],
                "severity": labels.get("severity", "unknown"),
                "service": labels.get("service", "unknown"),
                "file": filename,
            })
    return found


def load_alert_rules() -> list[dict]:
    """Collect every alerting rule defined under ``ALERT_RULES_DIR``.

    Each ``*.yaml`` file in the directory is parsed as one or more YAML
    documents. Every document that has ``spec.groups`` contributes the rules
    that carry an ``alert`` key.

    Returns:
        One dict per alert with the keys ``name``, ``severity``, ``service``
        and ``file``. ``severity`` and ``service`` fall back to ``"unknown"``
        when the corresponding label is absent. The list is empty when the
        directory does not exist.
    """
    rules: list[dict] = []
    if not ALERT_RULES_DIR.exists():
        return rules

    # Sorted so the result does not depend on directory listing order.
    for path in sorted(ALERT_RULES_DIR.glob("*.yaml")):
        try:
            with open(path, encoding="utf-8") as fh:
                # safe_load_all: a manifest may hold several "---"-separated
                # documents, which safe_load rejects with a YAMLError.
                documents = list(yaml.safe_load_all(fh))
        except yaml.YAMLError as exc:
            # Still best-effort, but say which file was dropped so that a
            # shrinking rule count can be explained.
            print(f"warning: skipping {path.name}: {exc}", file=sys.stderr)
            continue
        for document in documents:
            rules.extend(_rules_from_document(document, path.name))
    return rules
def analyze_services(registry: dict) -> dict:
    """Summarise monitoring coverage for the registry's ``services`` list.

    Args:
        registry: Parsed service registry. A missing or null ``services``
            key is treated as an empty list, and a null ``monitoring`` or
            ``auto_repair`` value on a service is treated as an empty mapping.

    Returns:
        A dict with:

        * ``total`` - number of services.
        * ``by_type`` / ``by_criticality`` - counts keyed by the service's
          ``type`` (default ``"unknown"``) and ``criticality`` (default
          ``"P2"``).
        * ``monitoring`` - for each of ``prometheus``, ``sentry``, ``otel``
          and ``langfuse``: ``covered`` count and ``missing`` service names.
        * ``health_endpoint`` and ``alerts`` - same ``covered``/``missing``
          shape, based on the truthiness of the service's field.
        * ``auto_repair`` - ``enabled`` count plus ``p0_p1_without``, the
          names of P0/P1 services whose auto-repair is not enabled.
    """
    backends = ("prometheus", "sentry", "otel", "langfuse")
    # ``or []`` / ``or {}`` below: a YAML key written without a value parses
    # to None, and dict.get() only applies its default when the key is absent.
    services = registry.get("services") or []
    stats = {
        "total": len(services),
        "by_type": {},
        "by_criticality": {},
        "monitoring": {key: {"covered": 0, "missing": []} for key in backends},
        "health_endpoint": {"covered": 0, "missing": []},
        "alerts": {"covered": 0, "missing": []},
        "auto_repair": {
            "enabled": 0,
            "p0_p1_without": [],
        },
    }

    def record(bucket: dict, name: str, covered) -> None:
        """Count the service as covered, or list it as missing."""
        if covered:
            bucket["covered"] += 1
        else:
            bucket["missing"].append(name)

    for svc in services:
        name = svc.get("name", "unknown")
        svc_type = svc.get("type", "unknown")
        criticality = svc.get("criticality", "P2")

        stats["by_type"][svc_type] = stats["by_type"].get(svc_type, 0) + 1
        stats["by_criticality"][criticality] = (
            stats["by_criticality"].get(criticality, 0) + 1
        )

        monitoring = svc.get("monitoring") or {}
        for key in backends:
            record(stats["monitoring"][key], name, monitoring.get(key))

        record(stats["health_endpoint"], name, svc.get("health_endpoint"))
        record(stats["alerts"], name, svc.get("alerts"))

        auto_repair = svc.get("auto_repair") or {}
        if auto_repair.get("enabled"):
            stats["auto_repair"]["enabled"] += 1
        elif criticality in ("P0", "P1"):
            # Only the two highest criticality levels are reported as gaps.
            stats["auto_repair"]["p0_p1_without"].append(name)
    return stats
def analyze_nodes(registry: dict) -> dict:
    """Summarise alert coverage for the registry's ``nodes`` list.

    Args:
        registry: Parsed service registry. A missing or null ``nodes`` key
            is treated as an empty list.

    Returns:
        A dict with ``total`` (node count), ``by_role`` (count per ``role``,
        default ``"unknown"``), ``with_alerts`` (nodes whose ``alerts`` field
        is truthy) and ``missing_alerts`` (names of the remaining nodes).
    """
    # ``or []``: ``nodes:`` written without a value parses to None, which the
    # .get() default would not replace.
    nodes = registry.get("nodes") or []
    stats = {
        "total": len(nodes),
        "by_role": {},
        "with_alerts": 0,
        "missing_alerts": [],
    }
    for node in nodes:
        role = node.get("role", "unknown")
        stats["by_role"][role] = stats["by_role"].get(role, 0) + 1
        if node.get("alerts"):
            stats["with_alerts"] += 1
        else:
            stats["missing_alerts"].append(node.get("name", "unknown"))
    return stats
def analyze_api_endpoints(registry: dict) -> dict:
    """Summarise SLO coverage for the registry's ``api_endpoints`` list.

    Args:
        registry: Parsed service registry. A missing or null
            ``api_endpoints`` key is treated as an empty list.

    Returns:
        A dict with ``total`` (endpoint count), ``critical`` (endpoints whose
        ``critical`` field is truthy), ``with_slo`` (endpoints whose ``slo``
        field is truthy) and ``missing_slo`` (the ``path`` of every other
        endpoint, ``"unknown"`` when the endpoint has no path).
    """
    # ``or []``: ``api_endpoints:`` written without a value parses to None,
    # which the .get() default would not replace.
    endpoints = registry.get("api_endpoints") or []
    stats = {
        "total": len(endpoints),
        "critical": 0,
        "with_slo": 0,
        "missing_slo": [],
    }
    for endpoint in endpoints:
        if endpoint.get("critical"):
            stats["critical"] += 1
        if endpoint.get("slo"):
            stats["with_slo"] += 1
        else:
            stats["missing_slo"].append(endpoint.get("path", "unknown"))
    return stats
def analyze_alert_rules(registry: dict, alert_rules: list[dict]) -> dict:
    """Cross-check the loaded alert rules against the registered services.

    Args:
        registry: Parsed service registry. A missing or null ``services``
            key is treated as an empty list; a service without a ``name`` is
            tracked as ``"unknown"``.
        alert_rules: Rule dicts carrying at least ``service`` and
            ``severity`` keys.

    Returns:
        A dict with ``total_rules``, ``by_severity`` (rule count per
        severity), ``services_covered`` (number of registered services named
        by at least one rule's ``service`` value) and ``services_missing``
        (sorted names of registered services that no rule names). Rules whose
        service is ``"unknown"`` never count as covering a service.
    """
    # .get() with the "unknown" fallback so a nameless registry entry is
    # reported as a gap rather than aborting the whole report with KeyError.
    registered = {
        svc.get("name", "unknown") for svc in registry.get("services") or []
    }
    targeted = {
        rule["service"] for rule in alert_rules if rule["service"] != "unknown"
    }

    stats = {
        "total_rules": len(alert_rules),
        "by_severity": {},
        "services_covered": len(registered & targeted),
        # Sorted: plain set iteration order would make the report differ
        # from run to run. key=str keeps mixed-type names comparable.
        "services_missing": sorted(registered - targeted, key=str),
    }
    for rule in alert_rules:
        severity = rule["severity"]
        stats["by_severity"][severity] = stats["by_severity"].get(severity, 0) + 1
    return stats
def calculate_scores(service_stats: dict, node_stats: dict, api_stats: dict, alert_stats: dict) -> dict:
    """Turn the coverage statistics into percentage scores.

    ``node_stats`` and ``alert_stats`` are accepted but not read.

    Returns:
        A dict with ``prometheus``, ``health_endpoint``, ``alerts``,
        ``api_slo`` and ``auto_repair_p0p1`` percentages (rounded to one
        decimal) plus ``overall``, their weighted sum. A service or API
        dimension scores 0 when there is nothing to measure; the P0/P1
        auto-repair dimension scores 100 when there are no P0/P1 services.
    """
    result = {}

    # Service-level dimensions share the same denominator.
    result["prometheus"] = (
        round(100 * service_stats["monitoring"]["prometheus"]["covered"] / service_stats["total"], 1)
        if service_stats["total"] > 0
        else 0
    )
    result["health_endpoint"] = (
        round(100 * service_stats["health_endpoint"]["covered"] / service_stats["total"], 1)
        if service_stats["total"] > 0
        else 0
    )
    result["alerts"] = (
        round(100 * service_stats["alerts"]["covered"] / service_stats["total"], 1)
        if service_stats["total"] > 0
        else 0
    )

    # Share of API endpoints that define an SLO.
    result["api_slo"] = (
        round(100 * api_stats["with_slo"] / api_stats["total"], 1)
        if api_stats["total"] > 0
        else 0
    )

    # Share of P0/P1 services that are not listed as lacking auto-repair.
    per_level = service_stats["by_criticality"]
    urgent_total = per_level.get("P0", 0) + per_level.get("P1", 0)
    urgent_gaps = len(service_stats["auto_repair"]["p0_p1_without"])
    result["auto_repair_p0p1"] = (
        round(100 * (urgent_total - urgent_gaps) / urgent_total, 1)
        if urgent_total > 0
        else 100
    )

    # Overall score: weighted sum of the five dimensions.
    weighting = {
        "prometheus": 0.3,
        "health_endpoint": 0.2,
        "alerts": 0.2,
        "api_slo": 0.15,
        "auto_repair_p0p1": 0.15,
    }
    weighted_total = sum(result[dim] * share for dim, share in weighting.items())
    result["overall"] = round(weighted_total, 1)
    return result
def generate_text_report(
    service_stats: dict,
    node_stats: dict,
    api_stats: dict,
    alert_stats: dict,
    scores: dict,
) -> str:
    """Render the coverage statistics as a plain-text report.

    The report lists the scores, service breakdowns, up to five services
    missing Prometheus, P0/P1 services without auto-repair, node and API
    endpoint totals, alert rules by severity, and a final PASS / WARNING /
    FAIL line derived from the overall score (thresholds 90 and 80).

    Returns:
        The report as one newline-joined string.
    """
    timestamp = datetime.now(TZ_TAIPEI).strftime("%Y-%m-%d %H:%M:%S")
    banner = "=" * 70
    underline = " " + "-" * 40

    def heading(title):
        """Return a section title followed by its underline."""
        return [title, underline]

    def breakdown(caption, counts):
        """Return a caption line plus one bullet per key, sorted."""
        return [caption] + [
            f" - {label}: {count}" for label, count in sorted(counts.items())
        ]

    out = [
        banner,
        " AWOOOI Monitoring Coverage Report",
        f" Generated: {timestamp} (Taipei)",
        banner,
        "",
    ]

    # Scores
    out += heading(" OVERALL SCORE")
    score_rows = (
        (" Overall: ", "overall"),
        (" Prometheus: ", "prometheus"),
        (" Health Endpoint: ", "health_endpoint"),
        (" Alert Rules: ", "alerts"),
        (" API SLO: ", "api_slo"),
        (" Auto-Repair P0/1: ", "auto_repair_p0p1"),
    )
    out += [f"{label}{scores[key]}%" for label, key in score_rows]

    # Services
    out.append("")
    out += heading(" SERVICES")
    out.append(f" Total: {service_stats['total']}")
    out += breakdown(" By Type:", service_stats["by_type"])
    out += breakdown(" By Criticality:", service_stats["by_criticality"])

    # Gaps: only the first five services without Prometheus are named.
    no_prometheus = service_stats["monitoring"]["prometheus"]["missing"]
    if no_prometheus:
        out += ["", f" Missing Prometheus ({len(no_prometheus)}):"]
        out += [f" - {svc}" for svc in no_prometheus[:5]]
        if len(no_prometheus) > 5:
            out.append(f" ... and {len(no_prometheus) - 5} more")

    unrepaired = service_stats["auto_repair"]["p0_p1_without"]
    if unrepaired:
        out += ["", f" P0/P1 Without Auto-Repair ({len(unrepaired)}):"]
        out += [f" - {svc}" for svc in unrepaired]

    # Nodes
    out.append("")
    out += heading(" NODES")
    out += [
        f" Total: {node_stats['total']}",
        f" With Alerts: {node_stats['with_alerts']}",
    ]

    # API endpoints
    out.append("")
    out += heading(" API ENDPOINTS")
    out += [
        f" Total: {api_stats['total']}",
        f" Critical: {api_stats['critical']}",
        f" With SLO: {api_stats['with_slo']}",
    ]

    # Alert rules
    out.append("")
    out += heading(" ALERT RULES")
    out.append(f" Total Rules: {alert_stats['total_rules']}")
    out += breakdown(" By Severity:", alert_stats["by_severity"])

    # Verdict
    out += ["", banner]
    overall = scores["overall"]
    verdict = " RESULT: FAIL (< 80%)"
    if overall >= 90:
        verdict = " RESULT: PASS (>= 90%)"
    elif overall >= 80:
        verdict = " RESULT: WARNING (80-89%)"
    out += [verdict, banner]

    return "\n".join(out)
def generate_html_report(
    service_stats: dict,
    node_stats: dict,
    api_stats: dict,
    alert_stats: dict,
    scores: dict,
) -> str:
    """Render the coverage statistics as a standalone HTML document.

    The page contains, in order: a score card per dimension, the services
    coverage table, the list of P0/P1 services without auto-repair (only
    when there are any), the alert-rule counts for the ``critical``,
    ``warning`` and ``info`` severities, the API endpoint metrics and the
    node count per role.

    NOTE(review): the markup is a reconstruction. It keeps the headings,
    values and ordering of the previous template; adjust the styling to the
    project's intended layout.

    Returns:
        The HTML document as a string.
    """
    # Function-scope import so the module's top-level imports stay unchanged.
    from html import escape

    now = datetime.now(TZ_TAIPEI).strftime("%Y-%m-%d %H:%M:%S")

    def score_class(score):
        """Map a percentage to the CSS class used to colour it."""
        if score >= 90:
            return "good"
        if score >= 80:
            return "warning"
        return "critical"

    def table(headers, rows):
        """Build a table; cells must already be escaped or trusted markup."""
        head = "".join(f"<th>{header}</th>" for header in headers)
        body = "\n".join(
            "<tr>" + "".join(f"<td>{cell}</td>" for cell in row) + "</tr>"
            for row in rows
        )
        return (
            f"<table>\n<thead><tr>{head}</tr></thead>\n"
            f"<tbody>\n{body}\n</tbody>\n</table>"
        )

    def coverage_row(label, bucket, score):
        """One services-table row: label, covered, missing count, score."""
        return (
            label,
            bucket["covered"],
            len(bucket["missing"]),
            f'<span class="{score_class(score)}">{score}%</span>',
        )

    style = (
        "body{font-family:sans-serif;margin:2rem;}"
        ".cards{display:flex;flex-wrap:wrap;gap:1rem;}"
        ".card{border:1px solid #ccc;border-radius:6px;padding:1rem;min-width:9rem;}"
        ".value{font-size:1.6rem;font-weight:bold;}"
        "table{border-collapse:collapse;margin:1rem 0;}"
        "th,td{border:1px solid #ccc;padding:0.3rem 0.8rem;text-align:left;}"
        ".good{color:#1a7f37;}.warning{color:#9a6700;}.critical{color:#cf222e;}"
    )

    card_specs = (
        ("overall", "Overall Score"),
        ("prometheus", "Prometheus"),
        ("health_endpoint", "Health Endpoint"),
        ("alerts", "Alert Rules"),
        ("api_slo", "API SLO"),
        ("auto_repair_p0p1", "Auto-Repair P0/P1"),
    )
    cards = "\n".join(
        f'<div class="card {score_class(scores[key])}">'
        f'<div class="value">{scores[key]}%</div>'
        f'<div class="label">{label}</div></div>'
        for key, label in card_specs
    )

    parts = [
        "<!DOCTYPE html>",
        '<html lang="en">',
        "<head>",
        '<meta charset="utf-8">',
        "<title>AWOOOI Monitoring Coverage Report</title>",
        f"<style>{style}</style>",
        "</head>",
        "<body>",
        "<h1>AWOOOI Monitoring Coverage Report</h1>",
        f"<p>Generated: {now} (Taipei) | ADR-037</p>",
        f'<div class="cards">\n{cards}\n</div>',
        f"<h2>Services ({service_stats['total']})</h2>",
        table(
            ("Dimension", "Covered", "Missing", "Coverage"),
            (
                coverage_row(
                    "Prometheus",
                    service_stats["monitoring"]["prometheus"],
                    scores["prometheus"],
                ),
                coverage_row(
                    "Health Endpoint",
                    service_stats["health_endpoint"],
                    scores["health_endpoint"],
                ),
                coverage_row("Alert Rules", service_stats["alerts"], scores["alerts"]),
            ),
        ),
    ]

    # P0/P1 services without auto-repair
    unrepaired = service_stats["auto_repair"]["p0_p1_without"]
    if unrepaired:
        parts.append("<h3>P0/P1 Without Auto-Repair</h3>")
        parts.append("<ul>")
        # Names come from the registry file, so escape them before embedding.
        parts.extend(f"<li>{escape(str(name))}</li>" for name in unrepaired)
        parts.append("</ul>")

    # Alert rules: only these three severities are listed.
    severity_rows = []
    for sev in ("critical", "warning", "info"):
        count = alert_stats["by_severity"].get(sev, 0)
        badge_class = "critical" if sev == "critical" else ("warning" if sev == "warning" else "good")
        severity_rows.append((f'<span class="{badge_class}">{sev}</span>', count))
    parts.append(f"<h2>Alert Rules ({alert_stats['total_rules']})</h2>")
    parts.append(table(("Severity", "Count"), severity_rows))

    # API endpoints
    parts.append(f"<h2>API Endpoints ({api_stats['total']})</h2>")
    parts.append(
        table(
            ("Metric", "Value"),
            (
                ("Critical Endpoints", api_stats["critical"]),
                ("With SLO Defined", api_stats["with_slo"]),
            ),
        )
    )

    # Nodes
    parts.append(f"<h2>Nodes ({node_stats['total']})</h2>")
    parts.append(
        table(
            ("Role", "Count"),
            [
                (escape(str(role)), count)
                for role, count in sorted(node_stats["by_role"].items())
            ],
        )
    )

    parts.extend(["</body>", "</html>"])
    return "\n".join(parts) + "\n"
def main():
    """Command-line entry point: build the coverage report and emit it.

    Loads the registry and alert rules, computes statistics and scores,
    renders the report in the format chosen with ``--format`` (``text``,
    ``html`` or ``json``), then writes it to ``--output`` or prints it.

    With ``--ci`` the process exits with status 1 when the overall score is
    below 80.
    """
    parser = argparse.ArgumentParser(description="AWOOOI Monitoring Coverage Report")
    parser.add_argument("--format", choices=["text", "html", "json"], default="text")
    parser.add_argument("--output", "-o", type=Path, help="Output file path")
    # argparse %-formats help strings, so a literal percent sign must be
    # doubled; a bare "%" makes --help fail with ValueError.
    parser.add_argument("--ci", action="store_true", help="CI mode: exit 1 if < 80%%")
    args = parser.parse_args()

    # Load data
    registry = load_registry()
    alert_rules = load_alert_rules()

    # Analyse
    service_stats = analyze_services(registry)
    node_stats = analyze_nodes(registry)
    api_stats = analyze_api_endpoints(registry)
    alert_stats = analyze_alert_rules(registry, alert_rules)

    # Score
    scores = calculate_scores(service_stats, node_stats, api_stats, alert_stats)

    # Render
    if args.format == "json":
        report = json.dumps({
            "generated_at": datetime.now(TZ_TAIPEI).isoformat(),
            "scores": scores,
            "services": service_stats,
            "nodes": node_stats,
            "api_endpoints": api_stats,
            "alert_rules": alert_stats,
        }, indent=2, ensure_ascii=False)
    elif args.format == "html":
        report = generate_html_report(service_stats, node_stats, api_stats, alert_stats, scores)
    else:
        report = generate_text_report(service_stats, node_stats, api_stats, alert_stats, scores)

    # Output
    if args.output:
        # The JSON report is built with ensure_ascii=False, so it can contain
        # non-ASCII text; write UTF-8 rather than the locale default.
        args.output.write_text(report, encoding="utf-8")
        print(f"Report saved to: {args.output}")
    else:
        print(report)

    # CI mode: signal failure through the exit status.
    if args.ci and scores["overall"] < 80:
        sys.exit(1)


if __name__ == "__main__":
    main()