Files
awoooi/scripts/alert_chain_smoke_test.py
Your Name 598f33ae8b
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
Deploy Alert Rules / Deploy Prometheus Alert Rules (push) Successful in 22s
CD Pipeline / tests (push) Successful in 3m55s
CD Pipeline / build-and-deploy (push) Successful in 3m31s
CD Pipeline / post-deploy-checks (push) Successful in 1m33s
fix(monitoring): clarify alert chain smoke evidence
2026-05-20 13:11:44 +08:00

590 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
AWOOOI Alert Chain Smoke Test
================================
Wave A.6 (ADR-037): 驗證告警鏈路 E2E 完整性
檢查項目:
1. API Health — /api/v1/health 全組件 UP
2. Alert Chain Metric — awoooi_alert_chain_last_success_timestamp 不超過 2h
3. Webhook 可達性 — /api/v1/webhooks/alertmanager, /signoz, /sentry health
4. Telegram Secret — K8s Secret 存在且非空
5. SigNoz 可達 — 192.168.0.188:3301
6. Prometheus Alertmanager — 192.168.0.188:9093 (可選)
使用方式:
python3 scripts/alert_chain_smoke_test.py [--api-url URL] [--fail-fast]
CI 整合 (cd.yaml):
python3 scripts/alert_chain_smoke_test.py \
--api-url http://localhost:32334 \
--fail-fast
# Phase O-4.5 2026-04-02 (台北時間)
# 建立者: Claude Code (首席架構師)
"""
from __future__ import annotations
import argparse
import json
import os
import re
import shlex
import sys
import time
from dataclasses import dataclass, field
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen
# =============================================================================
# 配置
# =============================================================================
DEFAULT_API_URL = "http://192.168.0.125:32334"
SIGNOZ_URL = "http://192.168.0.188:3301"
ALERTMANAGER_URL = "http://192.168.0.188:9093"
PROMETHEUS_URL = "http://192.168.0.110:9090"
# 告警鏈路最大允許靜默時間 (2 小時)
MAX_ALERT_CHAIN_SILENCE_SECONDS = 2 * 60 * 60
TIMEOUT = 10 # 秒
@dataclass(frozen=True)
class HttpGetResult:
status_code: int
text: str
def json(self) -> dict[str, Any]:
return json.loads(self.text)
@dataclass(frozen=True)
class AlertChainMetricSample:
source: str
timestamp: float
evidence_path: str
def http_get(
url: str,
*,
params: dict[str, str] | None = None,
timeout: int = TIMEOUT,
) -> HttpGetResult:
if params:
separator = "&" if "?" in url else "?"
url = f"{url}{separator}{urlencode(params)}"
request = Request(url, headers={"Accept": "application/json,text/plain,*/*"})
try:
with urlopen(request, timeout=timeout) as response:
body = response.read().decode("utf-8", errors="replace")
return HttpGetResult(response.status, body)
except HTTPError as exc:
body = exc.read().decode("utf-8", errors="replace")
return HttpGetResult(exc.code, body)
def _http_error_message(error: Exception) -> str:
if isinstance(error, URLError):
return str(error.reason)
return str(error)
def _statuses_from_env(env_name: str) -> list[str] | None:
"""Return preflight pod statuses supplied by CI, or None to use kubectl."""
if env_name not in os.environ:
return None
return [
line.strip()
for line in os.environ[env_name].splitlines()
if line.strip()
]
def _status_error_from_env(env_name: str) -> str | None:
value = os.environ.get(env_name, "").strip()
return value or None
def _check_running_statuses(
name: str,
statuses: list[str],
empty_message: str,
) -> CheckResult:
running = [s for s in statuses if s == "Running"]
if len(running) == 0:
return CheckResult(name, False, empty_message)
return CheckResult(name, True, f"{len(running)} Pod(s) Running")
def _kubectl_base_command() -> list[str]:
# CI may provide a full safe wrapper such as:
# sudo kubectl --kubeconfig=/etc/rancher/k3s/k3s.yaml --server=https://...
return shlex.split(os.environ.get("AWOOOI_KUBECTL_CMD", "kubectl"))
def _run_kubectl_status_query(label: str) -> list[str] | None:
import subprocess
result = subprocess.run(
[
*_kubectl_base_command(),
"get",
"pods",
"-n",
"observability",
"-l",
f"app.kubernetes.io/name={label}",
"--no-headers",
"-o",
"custom-columns=STATUS:.status.phase",
],
capture_output=True,
text=True,
timeout=15,
)
if result.returncode != 0:
return None
return [line.strip() for line in result.stdout.splitlines() if line.strip()]
# =============================================================================
# 測試結果
# =============================================================================
@dataclass
class CheckResult:
name: str
passed: bool
message: str
critical: bool = True # critical=False 表示失敗不中斷
@dataclass
class SmokeTestReport:
checks: list[CheckResult] = field(default_factory=list)
start_time: float = field(default_factory=time.time)
def add(self, result: CheckResult) -> None:
self.checks.append(result)
icon = "" if result.passed else ("" if result.critical else "⚠️")
print(f" {icon} [{result.name}] {result.message}")
@property
def passed(self) -> bool:
return all(c.passed for c in self.checks if c.critical)
@property
def failed_critical(self) -> list[CheckResult]:
return [c for c in self.checks if not c.passed and c.critical]
def summary(self) -> str:
total = len(self.checks)
passed = sum(1 for c in self.checks if c.passed)
duration = time.time() - self.start_time
return f"{passed}/{total} checks passed in {duration:.1f}s"
# =============================================================================
# 檢查函數
# =============================================================================
def check_api_health(api_url: str) -> CheckResult:
"""Check 1: API Health — 所有組件必須 UP"""
try:
resp = http_get(f"{api_url}/api/v1/health", timeout=TIMEOUT)
data = resp.json()
if resp.status_code >= 400:
return CheckResult("API Health", False, f"HTTP {resp.status_code}")
if data.get("status") != "healthy":
return CheckResult(
"API Health",
False,
f"API status={data.get('status')} (expected healthy)",
)
# 檢查每個組件
components = data.get("components", {})
down_components = [
name for name, info in components.items()
if info.get("status") != "up"
]
if down_components:
return CheckResult(
"API Health",
False,
f"組件異常: {', '.join(down_components)}",
)
return CheckResult(
"API Health",
True,
f"所有 {len(components)} 個組件 UP ({data.get('environment', 'unknown')})",
)
except (URLError, TimeoutError, OSError, json.JSONDecodeError) as e:
return CheckResult("API Health", False, f"無法連線: {_http_error_message(e)}")
def _escape_prometheus_label_value(value: str) -> str:
return value.replace("\\", "\\\\").replace('"', '\\"')
def _prometheus_alert_chain_samples(
results: list[dict[str, Any]],
) -> list[AlertChainMetricSample]:
samples: list[AlertChainMetricSample] = []
for item in results:
metric = item.get("metric", {})
source = str(metric.get("source", "")).strip().lower()
value = item.get("value", [])
if len(value) < 2 or not source:
continue
try:
timestamp = float(value[1])
except (TypeError, ValueError):
continue
samples.append(AlertChainMetricSample(source, timestamp, "prometheus"))
return samples
_ALERT_CHAIN_METRIC_RE = re.compile(
r'^awoooi_alert_chain_last_success_timestamp(?:\{(?P<labels>[^}]*)\})?\s+'
r'(?P<value>[-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?)'
)
_SOURCE_LABEL_RE = re.compile(r'(?:^|,)source="(?P<source>[^"]+)"')
def parse_app_alert_chain_metric_samples(metrics_text: str) -> list[AlertChainMetricSample]:
samples: list[AlertChainMetricSample] = []
for raw_line in metrics_text.splitlines():
line = raw_line.strip()
if not line or line.startswith("#"):
continue
metric_match = _ALERT_CHAIN_METRIC_RE.match(line)
if metric_match is None:
continue
labels = metric_match.group("labels") or ""
source_match = _SOURCE_LABEL_RE.search(labels)
if source_match is None:
continue
try:
timestamp = float(metric_match.group("value"))
except ValueError:
continue
samples.append(
AlertChainMetricSample(
source=source_match.group("source").strip().lower(),
timestamp=timestamp,
evidence_path="app_metrics",
)
)
return samples
def _newest_sample_for_source(
samples: list[AlertChainMetricSample],
source: str,
) -> AlertChainMetricSample | None:
source = source.strip().lower()
matches = [sample for sample in samples if sample.source == source]
if not matches:
return None
return max(matches, key=lambda sample: sample.timestamp)
def _alert_chain_metric_result(
sample: AlertChainMetricSample,
*,
fallback: bool = False,
) -> CheckResult:
age_seconds = time.time() - sample.timestamp
age_minutes = age_seconds / 60
if age_seconds > MAX_ALERT_CHAIN_SILENCE_SECONDS:
return CheckResult(
"Alert Chain Metric",
False,
(
f"{sample.source} 告警鏈路已靜默 {age_minutes:.0f} 分鐘 "
f"(evidence={sample.evidence_path}, 超過 120 分鐘閾值)"
),
)
scrape_note = "Prometheus scrape 尚未看到" if fallback else ""
return CheckResult(
"Alert Chain Metric",
True,
(
f"最後 {sample.source} 告警成功: {age_minutes:.0f} 分鐘前 "
f"(evidence={sample.evidence_path}{scrape_note})"
),
)
def check_alert_chain_metric(
prometheus_url: str,
api_url: str,
*,
source: str = "alertmanager",
) -> CheckResult:
"""Check 2: Alertmanager 主鏈路最後成功時間不超過 2 小時.
Prometheus scrape 可能比 app metrics 慢一輪;因此 Prometheus 為第一證據,
缺資料時回查 API /metrics並在訊息中明確標示 evidence path。
"""
query = (
"awoooi_alert_chain_last_success_timestamp"
f'{{source="{_escape_prometheus_label_value(source)}"}}'
)
try:
resp = http_get(
f"{prometheus_url}/api/v1/query",
params={"query": query},
timeout=TIMEOUT,
)
data = resp.json()
if resp.status_code >= 400:
raise URLError(f"Prometheus HTTP {resp.status_code}")
results = data.get("data", {}).get("result", [])
sample = _newest_sample_for_source(
_prometheus_alert_chain_samples(results),
source,
)
if sample:
return _alert_chain_metric_result(sample)
except (URLError, TimeoutError, OSError, json.JSONDecodeError) as e:
prometheus_error = _http_error_message(e)
else:
prometheus_error = "Prometheus 未抓到"
try:
app_resp = http_get(f"{api_url}/metrics", timeout=TIMEOUT)
if app_resp.status_code >= 400:
raise URLError(f"API /metrics HTTP {app_resp.status_code}")
app_sample = _newest_sample_for_source(
parse_app_alert_chain_metric_samples(app_resp.text),
source,
)
if app_sample:
return _alert_chain_metric_result(app_sample, fallback=True)
except (URLError, TimeoutError, OSError) as e:
return CheckResult(
"Alert Chain Metric",
False,
(
f"{source} 指標不存在或不可讀 "
f"(Prometheus: {prometheus_error}; app_metrics: {_http_error_message(e)})"
),
critical=False,
)
return CheckResult(
"Alert Chain Metric",
False,
(
f"{source} 指標不存在 "
f"(Prometheus: {prometheus_error}; app_metrics: no sample)"
),
critical=False,
)
def check_webhook_health(api_url: str) -> list[CheckResult]:
"""Check 3: 所有 Webhook Health Endpoint"""
results = []
webhooks = [
("Alertmanager Webhook", f"{api_url}/api/v1/webhooks/health"),
("SignOz Webhook", f"{api_url}/api/v1/webhooks/signoz/health"),
("Sentry Webhook", f"{api_url}/api/v1/webhooks/sentry/health"),
]
for name, url in webhooks:
try:
resp = http_get(url, timeout=TIMEOUT)
if resp.status_code == 200:
results.append(CheckResult(name, True, f"HTTP 200 OK"))
else:
results.append(
CheckResult(name, False, f"HTTP {resp.status_code}")
)
except (URLError, TimeoutError, OSError) as e:
results.append(CheckResult(name, False, f"無法連線: {_http_error_message(e)}"))
return results
def check_signoz_reachable(signoz_url: str) -> CheckResult:
"""Check 4: SigNoz UI 可達"""
try:
resp = http_get(signoz_url, timeout=TIMEOUT)
# SigNoz UI 通常回 200 或 301/302
if resp.status_code < 400:
return CheckResult("SigNoz", True, f"HTTP {resp.status_code}")
return CheckResult("SigNoz", False, f"HTTP {resp.status_code}", critical=False)
except (URLError, TimeoutError, OSError) as e:
return CheckResult("SigNoz", False, f"無法連線: {_http_error_message(e)}", critical=False)
def check_otel_collector() -> CheckResult:
"""Check 5: OTEL Collector DaemonSet 是否在 K3s 運行"""
preflight_error = _status_error_from_env("AWOOOI_OTEL_COLLECTOR_ERROR")
if preflight_error:
return CheckResult(
"OTEL Collector",
False,
f"host kubectl preflight failed: {preflight_error}",
critical=False,
)
preflight_statuses = _statuses_from_env("AWOOOI_OTEL_COLLECTOR_STATUSES")
if preflight_statuses is not None:
return _check_running_statuses(
"OTEL Collector",
preflight_statuses,
"沒有 Running 的 OTEL Collector Pod",
)
try:
statuses = _run_kubectl_status_query("otel-collector")
if statuses is None:
return CheckResult(
"OTEL Collector", False, "kubectl 查詢失敗", critical=False
)
return _check_running_statuses(
"OTEL Collector",
statuses,
"沒有 Running 的 OTEL Collector Pod",
)
except Exception as e:
return CheckResult(
"OTEL Collector", False, f"無法檢查: {e}", critical=False
)
def check_event_exporter() -> CheckResult:
"""Check 6: Event Exporter 是否在 K3s 運行"""
preflight_error = _status_error_from_env("AWOOOI_EVENT_EXPORTER_ERROR")
if preflight_error:
return CheckResult(
"Event Exporter",
False,
f"host kubectl preflight failed: {preflight_error}",
critical=False,
)
preflight_statuses = _statuses_from_env("AWOOOI_EVENT_EXPORTER_STATUSES")
if preflight_statuses is not None:
return _check_running_statuses(
"Event Exporter",
preflight_statuses,
"沒有 Running 的 Event Exporter Pod",
)
try:
statuses = _run_kubectl_status_query("event-exporter")
if statuses is None:
return CheckResult(
"Event Exporter", False, "kubectl 查詢失敗", critical=False
)
return _check_running_statuses(
"Event Exporter",
statuses,
"沒有 Running 的 Event Exporter Pod",
)
except Exception as e:
return CheckResult(
"Event Exporter", False, f"無法檢查: {e}", critical=False
)
# =============================================================================
# 主程式
# =============================================================================
def run_smoke_test(api_url: str, fail_fast: bool = False) -> SmokeTestReport:
report = SmokeTestReport()
print(f"\n🔍 AWOOOI Alert Chain Smoke Test")
print(f" API: {api_url}")
print(f" 時間: {time.strftime('%Y-%m-%d %H:%M:%S %Z')}")
print("-" * 50)
# Check 1: API Health
report.add(check_api_health(api_url))
if fail_fast and not report.passed:
return report
# Check 2: Alert Chain Metric
report.add(check_alert_chain_metric(PROMETHEUS_URL, api_url))
# Check 3: Webhook Health
for result in check_webhook_health(api_url):
report.add(result)
if fail_fast and not result.passed and result.critical:
return report
# Check 4: SigNoz
report.add(check_signoz_reachable(SIGNOZ_URL))
# Check 5: OTEL Collector
report.add(check_otel_collector())
# Check 6: Event Exporter
report.add(check_event_exporter())
return report
def main() -> int:
parser = argparse.ArgumentParser(description="AWOOOI Alert Chain Smoke Test")
parser.add_argument(
"--api-url", default=DEFAULT_API_URL, help="API base URL"
)
parser.add_argument(
"--fail-fast", action="store_true", help="第一個 critical 失敗即中止"
)
parser.add_argument(
"--json", action="store_true", help="輸出 JSON 格式結果"
)
args = parser.parse_args()
report = run_smoke_test(args.api_url, args.fail_fast)
print("-" * 50)
if report.passed:
print(f"✅ PASSED — {report.summary()}")
else:
print(f"❌ FAILED — {report.summary()}")
if report.failed_critical:
print("\n失敗的 Critical 檢查:")
for c in report.failed_critical:
print(f" - [{c.name}] {c.message}")
if args.json:
output = {
"passed": report.passed,
"summary": report.summary(),
"checks": [
{
"name": c.name,
"passed": c.passed,
"message": c.message,
"critical": c.critical,
}
for c in report.checks
],
}
print("\n" + json.dumps(output, ensure_ascii=False, indent=2))
return 0 if report.passed else 1
if __name__ == "__main__":
sys.exit(main())