424 lines
13 KiB
Python
424 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
AWOOOI Alert Chain Smoke Test
|
|
================================
|
|
Wave A.6 (ADR-037): 驗證告警鏈路 E2E 完整性
|
|
|
|
檢查項目:
|
|
1. API Health — /api/v1/health 全組件 UP
|
|
2. Alert Chain Metric — awoooi_alert_chain_last_success_timestamp 不超過 2h
|
|
3. Webhook 可達性 — /api/v1/webhooks/alertmanager, /signoz, /sentry health
|
|
4. Telegram Secret — K8s Secret 存在且非空
|
|
5. SigNoz 可達 — 192.168.0.188:3301
|
|
6. Prometheus Alertmanager — 192.168.0.188:9093 (可選)
|
|
|
|
使用方式:
|
|
python3 scripts/alert_chain_smoke_test.py [--api-url URL] [--fail-fast]
|
|
|
|
CI 整合 (cd.yaml):
|
|
python3 scripts/alert_chain_smoke_test.py \
|
|
--api-url http://localhost:32334 \
|
|
--fail-fast
|
|
|
|
# Phase O-4.5 2026-04-02 (台北時間)
|
|
# 建立者: Claude Code (首席架構師)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shlex
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
try:
|
|
import requests
|
|
except ImportError:
|
|
print("❌ 需要安裝 requests: pip install requests")
|
|
sys.exit(1)
|
|
|
|
# =============================================================================
# Configuration
# =============================================================================
# API base URL used when --api-url is not supplied on the command line.
DEFAULT_API_URL = "http://192.168.0.125:32334"
# SigNoz endpoint probed by the SigNoz reachability check.
SIGNOZ_URL = "http://192.168.0.188:3301"
# Alertmanager endpoint.
ALERTMANAGER_URL = "http://192.168.0.188:9093"
# Prometheus endpoint queried for the alert-chain metric.
PROMETHEUS_URL = "http://192.168.0.110:9090"

# Longest the alert chain may stay silent before the metric check fails
# (2 hours, expressed in seconds).
MAX_ALERT_CHAIN_SILENCE_SECONDS = 2 * 60 * 60

# Timeout applied to every HTTP request, in seconds.
TIMEOUT = 10
|
|
|
|
|
|
def _statuses_from_env(env_name: str) -> list[str] | None:
|
|
"""Return preflight pod statuses supplied by CI, or None to use kubectl."""
|
|
if env_name not in os.environ:
|
|
return None
|
|
return [
|
|
line.strip()
|
|
for line in os.environ[env_name].splitlines()
|
|
if line.strip()
|
|
]
|
|
|
|
|
|
def _status_error_from_env(env_name: str) -> str | None:
|
|
value = os.environ.get(env_name, "").strip()
|
|
return value or None
|
|
|
|
|
|
def _check_running_statuses(
    name: str,
    statuses: list[str],
    empty_message: str,
) -> CheckResult:
    """Turn a list of pod phases into a pass/fail result.

    Args:
        name: Check name to put on the returned result.
        statuses: Pod phase strings, one per pod.
        empty_message: Message used when no entry equals ``"Running"``.

    Returns:
        A passing result reporting how many entries are exactly ``"Running"``,
        or a failing result (with the default ``critical=True``) carrying
        ``empty_message`` when there are none.
    """
    running_count = statuses.count("Running")
    if not running_count:
        return CheckResult(name, False, empty_message)
    return CheckResult(name, True, f"{running_count} Pod(s) Running")
|
|
|
|
|
|
def _kubectl_base_command() -> list[str]:
|
|
# CI may provide a full safe wrapper such as:
|
|
# sudo kubectl --kubeconfig=/etc/rancher/k3s/k3s.yaml --server=https://...
|
|
return shlex.split(os.environ.get("AWOOOI_KUBECTL_CMD", "kubectl"))
|
|
|
|
|
|
def _run_kubectl_status_query(label: str) -> list[str] | None:
    """Ask kubectl for the phase of each pod carrying a given name label.

    Runs ``kubectl get pods`` in the ``observability`` namespace, selecting
    pods by ``app.kubernetes.io/name=<label>`` and printing only
    ``.status.phase`` for each one.

    Args:
        label: Value of the ``app.kubernetes.io/name`` label to select on.

    Returns:
        One phase string per non-blank output line, or ``None`` when kubectl
        exits with a non-zero status.

    Raises:
        subprocess.TimeoutExpired: If kubectl does not finish within 15 seconds.
        OSError: If the kubectl executable cannot be started.
    """
    import subprocess

    command = _kubectl_base_command() + [
        "get",
        "pods",
        "-n",
        "observability",
        "-l",
        f"app.kubernetes.io/name={label}",
        "--no-headers",
        "-o",
        "custom-columns=STATUS:.status.phase",
    ]
    completed = subprocess.run(
        command,
        capture_output=True,
        text=True,
        timeout=15,
    )
    if completed.returncode != 0:
        return None

    phases = []
    for raw_line in completed.stdout.splitlines():
        phase = raw_line.strip()
        if phase:
            phases.append(phase)
    return phases
|
|
|
|
|
|
# =============================================================================
|
|
# 測試結果
|
|
# =============================================================================
|
|
@dataclass
class CheckResult:
    """Outcome of one smoke-test check."""

    # Short label for the check; printed inside square brackets in the report.
    name: str
    # True when the check succeeded.
    passed: bool
    # Human-readable detail printed after the label.
    message: str
    # critical=False means a failure of this check does not fail the run.
    critical: bool = True
|
|
|
|
|
|
@dataclass
class SmokeTestReport:
    """Accumulates check results and derives the overall verdict."""

    # Results in the order they were added.
    checks: list[CheckResult] = field(default_factory=list)
    # Wall-clock time (``time.time()``) captured when the report was created.
    start_time: float = field(default_factory=time.time)

    def add(self, result: CheckResult) -> None:
        """Store ``result`` and print a one-line status for it."""
        self.checks.append(result)
        if result.passed:
            icon = "✅"
        elif result.critical:
            icon = "❌"
        else:
            # Non-critical failures are shown as warnings.
            icon = "⚠️"
        print(f" {icon} [{result.name}] {result.message}")

    @property
    def passed(self) -> bool:
        """True when no critical check has failed (also true with no checks)."""
        return not any(c.critical and not c.passed for c in self.checks)

    @property
    def failed_critical(self) -> list[CheckResult]:
        """Critical checks that did not pass, in insertion order."""
        return [c for c in self.checks if c.critical and not c.passed]

    def summary(self) -> str:
        """Return ``"<passed>/<total> checks passed in <seconds>s"``."""
        ok_count = len([c for c in self.checks if c.passed])
        elapsed = time.time() - self.start_time
        return f"{ok_count}/{len(self.checks)} checks passed in {elapsed:.1f}s"
|
|
|
|
|
|
# =============================================================================
|
|
# 檢查函數
|
|
# =============================================================================
|
|
def check_api_health(api_url: str) -> CheckResult:
    """Check 1: API health — every component must report ``up``.

    Fetches ``{api_url}/api/v1/health`` and requires the top-level ``status``
    to be ``"healthy"`` and every entry under ``components`` to carry
    ``status == "up"``.

    Args:
        api_url: Base URL of the API (no trailing slash).

    Returns:
        A passing result when everything is up; otherwise a critical failure
        describing the connection error, the malformed response, the overall
        status, or the components that are not up. This function does not
        raise for transport or payload problems.
    """
    try:
        resp = requests.get(f"{api_url}/api/v1/health", timeout=TIMEOUT)
        data = resp.json()
    except requests.RequestException as e:
        return CheckResult("API Health", False, f"無法連線: {e}")
    except ValueError as e:
        # Depending on the installed requests version, a non-JSON body can
        # surface as a plain ValueError rather than a RequestException.
        return CheckResult("API Health", False, f"回應不是有效的 JSON: {e}")

    # Guard against payloads that are valid JSON but not the expected object,
    # which would otherwise fail with AttributeError on .get()/.items().
    if not isinstance(data, dict):
        return CheckResult(
            "API Health",
            False,
            f"回應格式異常: 預期 JSON 物件, 實際為 {type(data).__name__}",
        )

    if data.get("status") != "healthy":
        return CheckResult(
            "API Health",
            False,
            f"API status={data.get('status')} (expected healthy)",
        )

    components = data.get("components", {})
    if not isinstance(components, dict):
        return CheckResult(
            "API Health",
            False,
            "回應格式異常: components 不是 JSON 物件",
        )

    # A component entry that is not an object cannot prove it is up, so it is
    # reported alongside the components whose status is not "up".
    down_components = [
        name
        for name, info in components.items()
        if not isinstance(info, dict) or info.get("status") != "up"
    ]
    if down_components:
        return CheckResult(
            "API Health",
            False,
            f"組件異常: {', '.join(down_components)}",
        )

    return CheckResult(
        "API Health",
        True,
        f"所有 {len(components)} 個組件 UP ({data.get('environment', 'unknown')})",
    )
|
|
|
|
|
|
def check_alert_chain_metric(prometheus_url: str) -> CheckResult:
    """Check 2: the alert chain's last success must be recent enough.

    Runs an instant query for ``awoooi_alert_chain_last_success_timestamp``
    and compares the first returned sample against the current time.

    Args:
        prometheus_url: Base URL of the Prometheus server (no trailing slash).

    Returns:
        - pass when the sample is at most ``MAX_ALERT_CHAIN_SILENCE_SECONDS`` old;
        - critical failure when it is older than that;
        - non-critical failure when Prometheus is unreachable, the metric is
          absent, or the response cannot be interpreted.
    """
    try:
        resp = requests.get(
            f"{prometheus_url}/api/v1/query",
            params={"query": "awoooi_alert_chain_last_success_timestamp"},
            timeout=TIMEOUT,
        )
        data = resp.json()
        results = data.get("data", {}).get("result", [])

        if not results:
            return CheckResult(
                "Alert Chain Metric",
                False,
                "awoooi_alert_chain_last_success_timestamp 指標不存在 (Prometheus 未抓到)",
                critical=False,  # the metric may not exist yet right after startup
            )

        # Each sample's "value" is read as [<sample time>, "<value string>"].
        # NOTE(review): only the first series is inspected; if several series
        # can exist for this metric, confirm which one should win.
        last_success = float(results[0]["value"][1])
    except requests.RequestException as e:
        return CheckResult(
            "Alert Chain Metric", False, f"無法查詢 Prometheus: {e}", critical=False
        )
    except (ValueError, KeyError, IndexError, TypeError, AttributeError) as e:
        # Unexpected payload shape or a non-numeric sample: report it like any
        # other "could not query" condition instead of crashing the script.
        return CheckResult(
            "Alert Chain Metric",
            False,
            f"Prometheus 回應格式異常: {e!r}",
            critical=False,
        )

    age_seconds = time.time() - last_success
    age_minutes = age_seconds / 60

    if age_seconds > MAX_ALERT_CHAIN_SILENCE_SECONDS:
        # Derive the displayed threshold from the constant so the message
        # cannot drift from the value actually enforced.
        threshold_minutes = MAX_ALERT_CHAIN_SILENCE_SECONDS // 60
        return CheckResult(
            "Alert Chain Metric",
            False,
            f"告警鏈路已靜默 {age_minutes:.0f} 分鐘 (超過 {threshold_minutes} 分鐘閾值)",
        )

    return CheckResult(
        "Alert Chain Metric",
        True,
        f"最後告警成功: {age_minutes:.0f} 分鐘前",
    )
|
|
)
|
|
|
|
|
|
def check_webhook_health(api_url: str) -> list[CheckResult]:
    """Check 3: probe every webhook health endpoint.

    Args:
        api_url: Base URL of the API (no trailing slash).

    Returns:
        One result per endpoint, in a fixed order. An endpoint passes only on
        HTTP 200; any other status code or a connection error is a critical
        failure.
    """
    endpoint_paths = {
        "Alertmanager Webhook": "/api/v1/webhooks/health",
        "SignOz Webhook": "/api/v1/webhooks/signoz/health",
        "Sentry Webhook": "/api/v1/webhooks/sentry/health",
    }

    outcomes = []
    for label, path in endpoint_paths.items():
        try:
            status = requests.get(f"{api_url}{path}", timeout=TIMEOUT).status_code
        except requests.RequestException as exc:
            outcomes.append(CheckResult(label, False, f"無法連線: {exc}"))
            continue

        if status == 200:
            outcomes.append(CheckResult(label, True, "HTTP 200 OK"))
        else:
            outcomes.append(CheckResult(label, False, f"HTTP {status}"))

    return outcomes
|
|
|
|
|
|
def check_signoz_reachable(signoz_url: str) -> CheckResult:
    """Check 4: the SigNoz UI answers over HTTP.

    Args:
        signoz_url: URL to request.

    Returns:
        A passing result for any status code below 400; otherwise a
        non-critical failure carrying the status code or the connection error.
    """
    try:
        status = requests.get(signoz_url, timeout=TIMEOUT).status_code
    except requests.RequestException as exc:
        return CheckResult("SigNoz", False, f"無法連線: {exc}", critical=False)

    # Anything below 400 (success or redirect) counts as reachable.
    if status >= 400:
        return CheckResult("SigNoz", False, f"HTTP {status}", critical=False)
    return CheckResult("SigNoz", True, f"HTTP {status}")
|
|
|
|
|
|
def _check_observability_pods(name: str, label: str, env_prefix: str) -> CheckResult:
    """Shared implementation of the pod-running checks.

    The answer is taken from the first source that applies:

    1. ``<env_prefix>_ERROR`` — a non-empty value means the CI preflight
       itself failed; reported as a non-critical failure.
    2. ``<env_prefix>_STATUSES`` — when set, its lines are used as the pod
       phases and kubectl is not invoked.
    3. A live kubectl query for pods labelled ``app.kubernetes.io/name=<label>``.

    Args:
        name: Check name shown in the report.
        label: Value of the ``app.kubernetes.io/name`` label to query.
        env_prefix: Prefix of the two preflight environment variables.

    Returns:
        Pass when at least one pod is ``Running``; critical failure when
        statuses were obtained but none is ``Running``; non-critical failure
        when the statuses could not be obtained.
    """
    preflight_error = _status_error_from_env(f"{env_prefix}_ERROR")
    if preflight_error:
        return CheckResult(
            name,
            False,
            f"host kubectl preflight failed: {preflight_error}",
            critical=False,
        )

    empty_message = f"沒有 Running 的 {name} Pod"

    preflight_statuses = _statuses_from_env(f"{env_prefix}_STATUSES")
    if preflight_statuses is not None:
        return _check_running_statuses(name, preflight_statuses, empty_message)

    try:
        statuses = _run_kubectl_status_query(label)
    except Exception as e:
        # Best-effort boundary: a missing kubectl binary or a timeout must not
        # abort the remaining checks.
        return CheckResult(name, False, f"無法檢查: {e}", critical=False)

    if statuses is None:
        return CheckResult(name, False, "kubectl 查詢失敗", critical=False)

    return _check_running_statuses(name, statuses, empty_message)


def check_otel_collector() -> CheckResult:
    """Check 5: at least one OTEL Collector pod is Running.

    Reads ``AWOOOI_OTEL_COLLECTOR_ERROR`` / ``AWOOOI_OTEL_COLLECTOR_STATUSES``
    when CI provides them, otherwise queries kubectl for the
    ``otel-collector`` label.
    """
    return _check_observability_pods(
        "OTEL Collector", "otel-collector", "AWOOOI_OTEL_COLLECTOR"
    )


def check_event_exporter() -> CheckResult:
    """Check 6: at least one Event Exporter pod is Running.

    Reads ``AWOOOI_EVENT_EXPORTER_ERROR`` / ``AWOOOI_EVENT_EXPORTER_STATUSES``
    when CI provides them, otherwise queries kubectl for the
    ``event-exporter`` label.
    """
    return _check_observability_pods(
        "Event Exporter", "event-exporter", "AWOOOI_EVENT_EXPORTER"
    )
|
|
|
|
|
|
# =============================================================================
|
|
# 主程式
|
|
# =============================================================================
|
|
def run_smoke_test(api_url: str, fail_fast: bool = False) -> SmokeTestReport:
    """Run every check in order and collect the results.

    Args:
        api_url: Base URL of the API under test.
        fail_fast: When true, stop right after the first failed critical
            check; checks that have not started yet are skipped and do not
            appear in the report.

    Returns:
        The report holding every result recorded before the run ended.
    """
    report = SmokeTestReport()

    print("\n🔍 AWOOOI Alert Chain Smoke Test")
    print(f" API: {api_url}")
    print(f" 時間: {time.strftime('%Y-%m-%d %H:%M:%S %Z')}")
    print("-" * 50)

    # Each step is deferred so that a fail-fast exit really skips the work of
    # the remaining checks. Every step yields a list of results.
    steps = (
        lambda: [check_api_health(api_url)],                  # Check 1: API Health
        lambda: [check_alert_chain_metric(PROMETHEUS_URL)],   # Check 2: Alert Chain Metric
        lambda: check_webhook_health(api_url),                # Check 3: Webhook Health
        lambda: [check_signoz_reachable(SIGNOZ_URL)],         # Check 4: SigNoz
        lambda: [check_otel_collector()],                     # Check 5: OTEL Collector
        lambda: [check_event_exporter()],                     # Check 6: Event Exporter
    )

    for step in steps:
        for result in step():
            report.add(result)
            # Apply fail-fast uniformly after every result, not only after
            # the API and webhook checks.
            if fail_fast and result.critical and not result.passed:
                return report

    return report
|
|
|
|
|
|
def main() -> int:
    """Parse command-line options, run the smoke test and print the verdict.

    Returns:
        Process exit code: 0 when no critical check failed, 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="AWOOOI Alert Chain Smoke Test")
    parser.add_argument("--api-url", default=DEFAULT_API_URL, help="API base URL")
    parser.add_argument(
        "--fail-fast", action="store_true", help="第一個 critical 失敗即中止"
    )
    parser.add_argument("--json", action="store_true", help="輸出 JSON 格式結果")
    options = parser.parse_args()

    report = run_smoke_test(options.api_url, options.fail_fast)

    print("-" * 50)
    verdict = "✅ PASSED" if report.passed else "❌ FAILED"
    print(f"{verdict} — {report.summary()}")

    critical_failures = report.failed_critical
    if critical_failures:
        print("\n失敗的 Critical 檢查:")
        for failure in critical_failures:
            print(f" - [{failure.name}] {failure.message}")

    if options.json:
        # The JSON document is printed after the human-readable output.
        check_entries = []
        for check in report.checks:
            check_entries.append(
                {
                    "name": check.name,
                    "passed": check.passed,
                    "message": check.message,
                    "critical": check.critical,
                }
            )
        payload = {
            "passed": report.passed,
            "summary": report.summary(),
            "checks": check_entries,
        }
        print("\n" + json.dumps(payload, ensure_ascii=False, indent=2))

    if report.passed:
        return 0
    return 1
|
|
|
|
|
|
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|