1246 lines
40 KiB
Python
1246 lines
40 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
AWOOOI Alert Chain Smoke Test
|
||
================================
|
||
Wave A.6 (ADR-037): 驗證告警鏈路 E2E 完整性
|
||
|
||
檢查項目:
|
||
1. API Health — /api/v1/health 核心組件 UP,AI provider 降級列為警告
|
||
2. Alert Chain Metric — awoooi_alert_chain_last_success_timestamp 不超過 2h
|
||
3. Webhook 可達性 — /api/v1/webhooks/alertmanager, /signoz, /sentry health
|
||
4. Telegram Secret — K8s Secret 存在且非空
|
||
5. SigNoz 可達 — 192.168.0.188:3301
|
||
6. Prometheus Alertmanager — 192.168.0.188:9093 (可選)
|
||
|
||
使用方式:
|
||
python3 scripts/alert_chain_smoke_test.py [--api-url URL] [--fail-fast]
|
||
|
||
CI 整合 (cd.yaml):
|
||
python3 scripts/alert_chain_smoke_test.py \
|
||
--api-url https://awoooi.wooo.work \
|
||
--fail-fast
|
||
|
||
# Phase O-4.5 2026-04-02 (台北時間)
|
||
# 建立者: Claude Code (首席架構師)
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import os
|
||
import re
|
||
import shlex
|
||
import sys
|
||
import time
|
||
from dataclasses import dataclass, field
|
||
from typing import Any
|
||
from urllib.error import HTTPError, URLError
|
||
from urllib.parse import urlencode
|
||
from urllib.request import Request, urlopen
|
||
|
||
# =============================================================================
|
||
# 配置
|
||
# =============================================================================
|
||
def _env_int(name: str, default: int) -> int:
|
||
try:
|
||
return int(os.environ.get(name, str(default)))
|
||
except ValueError:
|
||
return default
|
||
|
||
|
||
def _env_float(name: str, default: float) -> float:
|
||
try:
|
||
return float(os.environ.get(name, str(default)))
|
||
except ValueError:
|
||
return default
|
||
|
||
|
||
DEFAULT_API_URL = "https://awoooi.wooo.work"
SIGNOZ_URL = "http://192.168.0.188:3301"
ALERTMANAGER_URL = "http://192.168.0.188:9093"
PROMETHEUS_URL = "http://192.168.0.110:9090"

# Longest the alert chain may stay silent before it counts as broken (2 hours).
MAX_ALERT_CHAIN_SILENCE_SECONDS = 2 * 60 * 60

TIMEOUT = 10  # seconds; default per-request timeout

# API health probe tuning, overridable through the environment. Each value is
# clamped to a usable range: a non-positive timeout makes urlopen() fail or
# raise ValueError, and a negative delay makes time.sleep() raise ValueError.
API_HEALTH_TIMEOUT = max(
    1,
    _env_int("ALERT_CHAIN_API_HEALTH_TIMEOUT", 20),
)
API_HEALTH_ATTEMPTS = max(
    1,
    _env_int("ALERT_CHAIN_API_HEALTH_ATTEMPTS", 3),
)
API_HEALTH_RETRY_DELAY = max(
    0.0,
    _env_float("ALERT_CHAIN_API_HEALTH_RETRY_DELAY", 3.0),
)
|
||
|
||
|
||
@dataclass(frozen=True)
class HttpGetResult:
    """Immutable snapshot of an HTTP response: status code plus decoded body."""

    status_code: int  # HTTP status reported for the request
    text: str  # response body, already decoded to text

    def json(self) -> dict[str, Any]:
        """Decode ``text`` as JSON.

        Raises:
            json.JSONDecodeError: when the body is not valid JSON.
        """
        # Inside the method body ``json`` resolves to the module-level import,
        # not to this method.
        decoded = json.loads(self.text)
        return decoded
|
||
|
||
|
||
@dataclass(frozen=True)
class AlertChainMetricSample:
    """One observed value of the alert-chain last-success timestamp metric."""

    source: str  # value of the metric's ``source`` label
    timestamp: float  # metric value (compared against time.time() by callers)
    evidence_path: str  # where the sample was read from, e.g. "prometheus"
|
||
|
||
|
||
def http_get(
    url: str,
    *,
    params: dict[str, str] | None = None,
    timeout: int = TIMEOUT,
) -> HttpGetResult:
    """Issue a GET request and return its status code and decoded body.

    ``params`` are URL-encoded and appended to ``url``. An ``HTTPError``
    raised by ``urlopen`` is converted into a normal result carrying the
    error status and body; any other exception propagates to the caller.
    """
    target = url
    if params:
        joiner = "&" if "?" in url else "?"
        target = f"{url}{joiner}{urlencode(params)}"

    req = Request(target, headers={"Accept": "application/json,text/plain,*/*"})
    try:
        with urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
            status = resp.status
    except HTTPError as err:
        raw = err.read()
        status = err.code
    # Undecodable bytes are replaced rather than raising.
    return HttpGetResult(status, raw.decode("utf-8", errors="replace"))
|
||
|
||
|
||
def http_post_json(
    url: str,
    payload: dict[str, Any],
    *,
    headers: dict[str, str] | None = None,
    timeout: int = TIMEOUT,
) -> HttpGetResult:
    """POST ``payload`` as UTF-8 JSON and return the status code and body.

    Caller-supplied ``headers`` are merged over the default ``Accept`` and
    ``Content-Type`` headers. An ``HTTPError`` raised by ``urlopen`` is
    converted into a normal result; any other exception propagates.
    """
    encoded = json.dumps(payload, ensure_ascii=False).encode("utf-8")

    merged_headers = {
        "Accept": "application/json,text/plain,*/*",
        "Content-Type": "application/json",
    }
    if headers:
        # Caller values win over the defaults above.
        merged_headers.update(headers)

    req = Request(url, data=encoded, headers=merged_headers, method="POST")
    try:
        with urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
            status = resp.status
    except HTTPError as err:
        raw = err.read()
        status = err.code
    return HttpGetResult(status, raw.decode("utf-8", errors="replace"))
|
||
|
||
|
||
def _http_error_message(error: Exception) -> str:
|
||
if isinstance(error, URLError):
|
||
return str(error.reason)
|
||
return str(error)
|
||
|
||
|
||
def _response_body_preview(text: str, limit: int = 240) -> str:
|
||
cleaned = " ".join((text or "").split())
|
||
if not cleaned:
|
||
return "<empty body>"
|
||
if len(cleaned) <= limit:
|
||
return cleaned
|
||
return f"{cleaned[:limit]}..."
|
||
|
||
|
||
def _response_json(resp: HttpGetResult) -> dict[str, Any] | None:
|
||
text = (resp.text or "").strip()
|
||
if not text:
|
||
return None
|
||
return resp.json()
|
||
|
||
|
||
def _api_health_probe_summary(attempt: int) -> str:
    """Describe how many probe attempts were used and the per-attempt timeout."""
    attempts_part = f"attempts={attempt}/{API_HEALTH_ATTEMPTS}"
    timeout_part = f"timeout={API_HEALTH_TIMEOUT}s"
    return f"{attempts_part}, {timeout_part}"
|
||
|
||
|
||
def _statuses_from_env(env_name: str) -> list[str] | None:
|
||
"""Return preflight pod statuses supplied by CI, or None to use kubectl."""
|
||
if env_name not in os.environ:
|
||
return None
|
||
return [
|
||
line.strip()
|
||
for line in os.environ[env_name].splitlines()
|
||
if line.strip()
|
||
]
|
||
|
||
|
||
def _status_error_from_env(env_name: str) -> str | None:
|
||
value = os.environ.get(env_name, "").strip()
|
||
return value or None
|
||
|
||
|
||
def _check_running_statuses(
    name: str,
    statuses: list[str],
    empty_message: str,
) -> CheckResult:
    """Pass when at least one status equals ``"Running"``.

    On failure the result carries ``empty_message``; on success it reports
    how many entries were ``"Running"``.
    """
    running_count = sum(1 for status in statuses if status == "Running")
    if not running_count:
        return CheckResult(name, False, empty_message)
    return CheckResult(name, True, f"{running_count} Pod(s) Running")
|
||
|
||
|
||
def _kubectl_base_command() -> list[str]:
|
||
# CI may provide a full safe wrapper such as:
|
||
# sudo kubectl --kubeconfig=/etc/rancher/k3s/k3s.yaml --server=https://...
|
||
return shlex.split(os.environ.get("AWOOOI_KUBECTL_CMD", "kubectl"))
|
||
|
||
|
||
def _run_kubectl_status_query(label: str) -> list[str] | None:
    """List the phases of pods in the ``observability`` namespace for one app label.

    Selects pods by ``app.kubernetes.io/name=<label>`` and returns one
    stripped, non-blank output line per pod. Returns ``None`` when kubectl
    exits non-zero. Exceptions from ``subprocess.run`` (including the
    15-second timeout) propagate to the caller.
    """
    import subprocess

    query_args = [
        "get",
        "pods",
        "-n",
        "observability",
        "-l",
        f"app.kubernetes.io/name={label}",
        "--no-headers",
        "-o",
        "custom-columns=STATUS:.status.phase",
    ]
    completed = subprocess.run(
        [*_kubectl_base_command(), *query_args],
        capture_output=True,
        text=True,
        timeout=15,
    )
    if completed.returncode != 0:
        return None
    stripped_lines = (line.strip() for line in completed.stdout.splitlines())
    return [phase for phase in stripped_lines if phase]
|
||
|
||
|
||
# =============================================================================
|
||
# 測試結果
|
||
# =============================================================================
|
||
@dataclass
class CheckResult:
    """Outcome of a single smoke-test check."""

    name: str  # label printed for the check
    passed: bool  # whether the check succeeded
    message: str  # human-readable detail printed next to the label
    critical: bool = True  # critical=False means a failure does not abort the run
|
||
|
||
|
||
@dataclass
class SmokeTestReport:
    """Collects check results for one run and prints each as it is added."""

    checks: list[CheckResult] = field(default_factory=list)  # results in insertion order
    start_time: float = field(default_factory=time.time)  # wall-clock time at creation

    def add(self, result: CheckResult) -> None:
        """Record ``result`` and print a one-line status for it."""
        self.checks.append(result)
        if result.passed:
            icon = "✅"
        elif result.critical:
            icon = "❌"
        else:
            icon = "⚠️"
        print(f" {icon} [{result.name}] {result.message}")

    @property
    def passed(self) -> bool:
        """True when no critical check has failed (non-critical failures are ignored)."""
        return not any(c.critical and not c.passed for c in self.checks)

    @property
    def failed_critical(self) -> list[CheckResult]:
        """Critical checks that did not pass, in insertion order."""
        return [c for c in self.checks if c.critical and not c.passed]

    def summary(self) -> str:
        """Return a "<passed>/<total> checks passed in <seconds>s" line."""
        passed_count = sum(1 for c in self.checks if c.passed)
        elapsed = time.time() - self.start_time
        return f"{passed_count}/{len(self.checks)} checks passed in {elapsed:.1f}s"
|
||
|
||
|
||
# =============================================================================
|
||
# 檢查函數
|
||
# =============================================================================
|
||
def _component_is_up(info: Any) -> bool:
    """Return True only for a mapping whose ``status`` is exactly ``"up"``."""
    return isinstance(info, dict) and info.get("status") == "up"


def check_api_health(api_url: str) -> CheckResult:
    """Check 1: API Health — core runtime must be up; provider degradation is warning evidence.

    Probes ``<api_url>/api/v1/health`` up to ``API_HEALTH_ATTEMPTS`` times,
    sleeping ``API_HEALTH_RETRY_DELAY`` seconds between attempts. Transport
    errors, invalid JSON on a successful response, and HTTP 5xx are retried;
    HTTP 4xx fails immediately. The HTTP status is inspected before the body
    is parsed, so a non-JSON error page is reported by its status code.

    Args:
        api_url: Base URL of the API, without a trailing path.

    Returns:
        A failing result when the endpoint is unreachable, answers with an
        error status, returns a payload that is not a JSON object, or any of
        the ``api`` / ``postgresql`` / ``redis`` components is not ``"up"``.
        Otherwise a passing result; components other than the core three
        that are not ``"up"`` are listed in the message as non-blocking.
    """
    check_name = "API Health"
    health_url = f"{api_url}/api/v1/health"
    resp: HttpGetResult | None = None
    data: Any = None
    used_attempt = 0

    for attempt in range(1, API_HEALTH_ATTEMPTS + 1):
        used_attempt = attempt
        retryable = True
        try:
            resp = http_get(health_url, timeout=API_HEALTH_TIMEOUT)
            if resp.status_code < 400:
                data = resp.json()
                break
            failure = f"HTTP {resp.status_code}"
            # Client errors will not fix themselves; only server errors are retried.
            retryable = resp.status_code >= 500
        except (URLError, TimeoutError, OSError, json.JSONDecodeError) as e:
            failure = f"無法連線: {_http_error_message(e)}"

        if retryable and attempt < API_HEALTH_ATTEMPTS:
            time.sleep(API_HEALTH_RETRY_DELAY)
            continue
        return CheckResult(
            check_name,
            False,
            f"{failure} ({_api_health_probe_summary(attempt)})",
        )

    if resp is None:
        # Only reachable when the attempt budget is below 1, so no probe ran.
        return CheckResult(check_name, False, "無法連線: unknown")

    probe_summary = _api_health_probe_summary(used_attempt)
    if not isinstance(data, dict):
        # Valid JSON that is not an object (null, list, ...) cannot describe components.
        return CheckResult(
            check_name,
            False,
            (
                f"unexpected health payload: {_response_body_preview(resp.text)} "
                f"({probe_summary})"
            ),
        )

    components = data.get("components", {})
    if not isinstance(components, dict):
        # Treat a malformed section as "no components reported".
        components = {}

    core_components = ("api", "postgresql", "redis")
    down_core_components = [
        name for name in core_components
        if not _component_is_up(components.get(name))
    ]
    if down_core_components:
        return CheckResult(
            check_name,
            False,
            f"核心組件異常: {', '.join(down_core_components)}",
        )

    down_components = [
        name for name, info in components.items()
        if not _component_is_up(info)
    ]
    if down_components:
        return CheckResult(
            check_name,
            True,
            (
                f"核心組件 UP;非阻塞降級: {', '.join(down_components)} "
                f"({probe_summary})"
            ),
        )

    return CheckResult(
        check_name,
        True,
        (
            f"所有 {len(components)} 個組件 UP "
            f"({data.get('environment', 'unknown')}; "
            f"{probe_summary})"
        ),
    )
|
||
|
||
|
||
def _escape_prometheus_label_value(value: str) -> str:
|
||
return value.replace("\\", "\\\\").replace('"', '\\"')
|
||
|
||
|
||
def _prometheus_alert_chain_samples(
|
||
results: list[dict[str, Any]],
|
||
) -> list[AlertChainMetricSample]:
|
||
samples: list[AlertChainMetricSample] = []
|
||
for item in results:
|
||
metric = item.get("metric", {})
|
||
source = str(metric.get("source", "")).strip().lower()
|
||
value = item.get("value", [])
|
||
if len(value) < 2 or not source:
|
||
continue
|
||
try:
|
||
timestamp = float(value[1])
|
||
except (TypeError, ValueError):
|
||
continue
|
||
samples.append(AlertChainMetricSample(source, timestamp, "prometheus"))
|
||
return samples
|
||
|
||
|
||
_ALERT_CHAIN_METRIC_RE = re.compile(
|
||
r'^awoooi_alert_chain_last_success_timestamp(?:\{(?P<labels>[^}]*)\})?\s+'
|
||
r'(?P<value>[-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?)'
|
||
)
|
||
_SOURCE_LABEL_RE = re.compile(r'(?:^|,)source="(?P<source>[^"]+)"')
|
||
|
||
|
||
def parse_app_alert_chain_metric_samples(metrics_text: str) -> list[AlertChainMetricSample]:
    """Extract alert-chain samples from metrics exposition text.

    Blank lines, ``#`` comment lines, lines that are not the alert-chain
    metric, and metric lines without a ``source`` label are ignored. Sources
    are stripped and lower-cased; every sample is tagged ``"app_metrics"``.
    """
    parsed: list[AlertChainMetricSample] = []
    stripped_lines = (raw.strip() for raw in metrics_text.splitlines())
    for line in stripped_lines:
        if not line or line.startswith("#"):
            continue
        metric = _ALERT_CHAIN_METRIC_RE.match(line)
        if metric is None:
            continue
        source = _SOURCE_LABEL_RE.search(metric.group("labels") or "")
        if source is None:
            continue
        try:
            parsed_timestamp = float(metric.group("value"))
        except ValueError:
            continue
        parsed.append(
            AlertChainMetricSample(
                source=source.group("source").strip().lower(),
                timestamp=parsed_timestamp,
                evidence_path="app_metrics",
            )
        )
    return parsed
|
||
|
||
|
||
def _newest_sample_for_source(
|
||
samples: list[AlertChainMetricSample],
|
||
source: str,
|
||
) -> AlertChainMetricSample | None:
|
||
source = source.strip().lower()
|
||
matches = [sample for sample in samples if sample.source == source]
|
||
if not matches:
|
||
return None
|
||
return max(matches, key=lambda sample: sample.timestamp)
|
||
|
||
|
||
def _alert_chain_metric_result(
    sample: AlertChainMetricSample,
    *,
    fallback: bool = False,
) -> CheckResult:
    """Turn a last-success sample into a pass/fail "Alert Chain Metric" result.

    Args:
        sample: The sample whose age is measured against ``time.time()``.
        fallback: When True, a passing message notes that the Prometheus
            scrape has not shown the sample yet.

    Returns:
        A failing result when the sample is older than
        ``MAX_ALERT_CHAIN_SILENCE_SECONDS``; otherwise a passing result. The
        threshold shown in the failure message is derived from that constant
        so the two cannot drift apart.
    """
    age_seconds = time.time() - sample.timestamp
    age_minutes = age_seconds / 60
    threshold_minutes = MAX_ALERT_CHAIN_SILENCE_SECONDS / 60

    if age_seconds > MAX_ALERT_CHAIN_SILENCE_SECONDS:
        return CheckResult(
            "Alert Chain Metric",
            False,
            (
                f"{sample.source} 告警鏈路已靜默 {age_minutes:.0f} 分鐘 "
                f"(evidence={sample.evidence_path}, "
                f"超過 {threshold_minutes:.0f} 分鐘閾值)"
            ),
        )

    scrape_note = ";Prometheus scrape 尚未看到" if fallback else ""
    return CheckResult(
        "Alert Chain Metric",
        True,
        (
            f"最後 {sample.source} 告警成功: {age_minutes:.0f} 分鐘前 "
            f"(evidence={sample.evidence_path}{scrape_note})"
        ),
    )
|
||
|
||
|
||
def check_alert_chain_metric(
    prometheus_url: str,
    api_url: str,
    *,
    source: str = "alertmanager",
) -> CheckResult:
    """Check 2: the last alert-chain success for ``source`` is at most 2 hours old.

    A Prometheus scrape can lag one round behind the app's own metrics, so
    Prometheus is the first evidence; when it has no fresh sample the API
    ``/metrics`` endpoint is consulted, and the message states which evidence
    path was used.

    Args:
        prometheus_url: Base URL of Prometheus.
        api_url: Base URL of the API whose ``/metrics`` is the fallback.
        source: Value of the metric's ``source`` label to look for.

    Returns:
        The Prometheus result when it passes. Otherwise the app-metrics
        result when it passes or when Prometheus had no sample; otherwise
        the failing Prometheus result. When neither side yields a sample the
        result is a non-critical failure.
    """
    query = (
        "awoooi_alert_chain_last_success_timestamp"
        f'{{source="{_escape_prometheus_label_value(source)}"}}'
    )
    prometheus_result: CheckResult | None = None
    try:
        resp = http_get(
            f"{prometheus_url}/api/v1/query",
            params={"query": query},
            timeout=TIMEOUT,
        )
        # Inspect the status before parsing, so a plain-text/HTML error page
        # is reported by its HTTP status instead of as a JSON decode error.
        if resp.status_code >= 400:
            raise URLError(f"Prometheus HTTP {resp.status_code}")
        data = resp.json()
        # Tolerate payloads that are not shaped as {"data": {"result": [...]}}.
        payload = data.get("data") if isinstance(data, dict) else None
        raw_results = payload.get("result") if isinstance(payload, dict) else None
        results = (
            [item for item in raw_results if isinstance(item, dict)]
            if isinstance(raw_results, list)
            else []
        )
        sample = _newest_sample_for_source(
            _prometheus_alert_chain_samples(results),
            source,
        )
        if sample is not None:
            prometheus_result = _alert_chain_metric_result(sample)
            if prometheus_result.passed:
                return prometheus_result
    except (URLError, TimeoutError, OSError, json.JSONDecodeError) as e:
        prometheus_error = _http_error_message(e)
    else:
        prometheus_error = (
            prometheus_result.message
            if prometheus_result is not None
            else "Prometheus 未抓到"
        )

    try:
        app_resp = http_get(f"{api_url}/metrics", timeout=TIMEOUT)
        if app_resp.status_code >= 400:
            raise URLError(f"API /metrics HTTP {app_resp.status_code}")
        app_sample = _newest_sample_for_source(
            parse_app_alert_chain_metric_samples(app_resp.text),
            source,
        )
        if app_sample is not None:
            app_result = _alert_chain_metric_result(app_sample, fallback=True)
            if app_result.passed or prometheus_result is None:
                return app_result
            # Both sides are stale: report the primary (Prometheus) evidence.
            return prometheus_result
    except (URLError, TimeoutError, OSError) as e:
        if prometheus_result is not None:
            return prometheus_result
        return CheckResult(
            "Alert Chain Metric",
            False,
            (
                f"{source} 指標不存在或不可讀 "
                f"(Prometheus: {prometheus_error}; app_metrics: {_http_error_message(e)})"
            ),
            critical=False,
        )

    if prometheus_result is not None:
        return prometheus_result

    return CheckResult(
        "Alert Chain Metric",
        False,
        (
            f"{source} 指標不存在 "
            f"(Prometheus: {prometheus_error}; app_metrics: no sample)"
        ),
        critical=False,
    )
|
||
|
||
|
||
def check_webhook_health(api_url: str) -> list[CheckResult]:
    """Check 3: probe every webhook health endpoint.

    Returns one result per endpoint, in a fixed order. An endpoint passes
    only on HTTP 200; any other status or a connection error fails it.
    """
    endpoint_paths = {
        "Alertmanager Webhook": "/api/v1/webhooks/health",
        "SignOz Webhook": "/api/v1/webhooks/signoz/health",
        "Sentry Webhook": "/api/v1/webhooks/sentry/health",
    }

    outcomes = []
    for label, path in endpoint_paths.items():
        try:
            resp = http_get(f"{api_url}{path}", timeout=TIMEOUT)
        except (URLError, TimeoutError, OSError) as e:
            outcomes.append(
                CheckResult(label, False, f"無法連線: {_http_error_message(e)}")
            )
            continue
        is_ok = resp.status_code == 200
        detail = "HTTP 200 OK" if is_ok else f"HTTP {resp.status_code}"
        outcomes.append(CheckResult(label, is_ok, detail))
    return outcomes
|
||
|
||
|
||
def _clean_source_providers(providers: list[str]) -> list[str]:
|
||
return [
|
||
provider.strip().lower()
|
||
for provider in providers
|
||
if provider.strip().lower() in {"sentry", "signoz"}
|
||
]
|
||
|
||
|
||
def _safe_run_ref(run_ref: str | None) -> str:
|
||
raw = (run_ref or f"manual-{int(time.time())}").strip()
|
||
cleaned = re.sub(r"[^A-Za-z0-9_.-]+", "-", raw).strip("-")
|
||
return cleaned[:64] or f"manual-{int(time.time())}"
|
||
|
||
|
||
def send_source_provider_heartbeat(
    api_url: str,
    *,
    providers: list[str],
    operator_key: str | None,
    operator_id: str,
    run_ref: str | None = None,
) -> CheckResult:
    """Record low-noise provider freshness evidence without creating incidents.

    Posts the cleaned provider list to the provider-heartbeat endpoint and
    verifies that the response's ``items`` name exactly those providers.

    Args:
        api_url: Base URL of the API.
        providers: Requested providers; only ``sentry`` / ``signoz`` are kept.
        operator_key: Operator API key sent as a header; required.
        operator_id: Operator identifier sent as a header.
        run_ref: Optional run reference forwarded unchanged in the payload.

    Returns:
        A failing result when no valid provider or no key is given, the
        request cannot be sent, the server answers with an error status
        (reported by its HTTP status even when the body is not JSON), the
        success body is not JSON, or the recorded providers differ from the
        requested ones. Otherwise a passing result.
    """
    check_name = "Source Provider Heartbeat"

    cleaned_providers = _clean_source_providers(providers)
    if not cleaned_providers:
        return CheckResult(
            check_name,
            False,
            "沒有有效 provider(允許 sentry/signoz)",
        )
    if not operator_key:
        return CheckResult(
            check_name,
            False,
            "AWOOOP_OPERATOR_API_KEY 未設定;無法寫入受保護 freshness heartbeat",
        )

    payload = {
        "project_id": "awoooi",
        "providers": cleaned_providers,
        "reason": "scheduled_provider_freshness_smoke",
        "run_ref": run_ref,
    }
    try:
        resp = http_post_json(
            f"{api_url}/api/v1/platform/events/dossier/provider-heartbeat",
            payload,
            headers={
                "X-AwoooP-Operator-Id": operator_id,
                "X-AwoooP-Operator-Key": operator_key,
            },
            timeout=TIMEOUT,
        )
    except (URLError, TimeoutError, OSError) as e:
        return CheckResult(
            check_name,
            False,
            f"無法寫入 provider heartbeat: {_http_error_message(e)}",
        )

    # Parse leniently: an error response is reported by its HTTP status even
    # when its body is empty or not JSON.
    try:
        data = _response_json(resp)
        body_is_json = data is not None
    except json.JSONDecodeError:
        data = None
        body_is_json = False

    if resp.status_code >= 400:
        detail = (
            data.get("detail", resp.text)
            if isinstance(data, dict)
            else _response_body_preview(resp.text)
        )
        return CheckResult(
            check_name,
            False,
            f"HTTP {resp.status_code}: {detail}",
        )

    if not body_is_json:
        return CheckResult(
            check_name,
            False,
            (
                "無法寫入 provider heartbeat: non-JSON "
                f"HTTP {resp.status_code}: {_response_body_preview(resp.text)}"
            ),
        )

    items = data.get("items", []) if isinstance(data, dict) else []
    if not isinstance(items, list):
        items = []
    recorded = sorted(
        str(item.get("provider", "")).strip().lower()
        for item in items
        if isinstance(item, dict)
    )
    expected = sorted(set(cleaned_providers))
    if recorded != expected:
        return CheckResult(
            check_name,
            False,
            f"recorded providers mismatch: expected={expected}, actual={recorded}",
        )
    return CheckResult(
        check_name,
        True,
        f"recorded {', '.join(recorded)} freshness heartbeat(s)",
    )
|
||
|
||
|
||
def _build_sentry_upstream_canary_payload(safe_ref: str) -> dict[str, Any]:
|
||
issue_id = f"awoooi-canary-{safe_ref}"
|
||
return {
|
||
"action": "triggered",
|
||
"data": {
|
||
"issue": {
|
||
"id": issue_id,
|
||
"shortId": "AWOOOI-CANARY",
|
||
"title": "AwoooPSourceProviderCanary",
|
||
"culprit": "source-provider-ingestion",
|
||
"level": "info",
|
||
"project": {"slug": "awoooi"},
|
||
"permalink": "https://awoooi.wooo.work/zh-TW/awooop/work-items",
|
||
},
|
||
"event": {
|
||
"message": "AwoooP upstream source provider canary",
|
||
"platform": "python",
|
||
"tags": [
|
||
["awoooi_canary", "true"],
|
||
["run_ref", safe_ref],
|
||
],
|
||
},
|
||
},
|
||
"actor": {"type": "application", "name": "AwoooP E2E"},
|
||
}
|
||
|
||
|
||
def _build_sentry_source_link_canary_payload(
|
||
safe_ref: str,
|
||
*,
|
||
target_incident_id: str,
|
||
) -> dict[str, Any]:
|
||
issue_id = f"awoooi-source-link-canary-{safe_ref}"
|
||
return {
|
||
"action": "triggered",
|
||
"data": {
|
||
"issue": {
|
||
"id": issue_id,
|
||
"shortId": "AWOOOI-CANARY-SOURCE-LINK",
|
||
"title": "AwoooPSourceLinkCanary",
|
||
"culprit": "source-correlation-refresh",
|
||
"level": "info",
|
||
"project": {"slug": "awoooi"},
|
||
"permalink": "https://awoooi.wooo.work/zh-TW/awooop/work-items",
|
||
},
|
||
"event": {
|
||
"message": "AwoooP source correlation refresh canary",
|
||
"platform": "python",
|
||
"tags": [
|
||
["awoooi_canary", "true"],
|
||
["source_link_canary", "true"],
|
||
["run_ref", safe_ref],
|
||
["target_incident_id", target_incident_id],
|
||
],
|
||
},
|
||
},
|
||
"actor": {"type": "application", "name": "AwoooP E2E"},
|
||
}
|
||
|
||
|
||
def _build_signoz_upstream_canary_payload(safe_ref: str) -> dict[str, Any]:
|
||
fingerprint = f"source-provider-canary:signoz:{safe_ref}"
|
||
return {
|
||
"alerts": [
|
||
{
|
||
"status": "firing",
|
||
"alertname": "AwoooPSourceProviderCanary",
|
||
"labels": {
|
||
"alertname": "AwoooPSourceProviderCanary",
|
||
"severity": "info",
|
||
"namespace": "awoooi-prod",
|
||
"service": "source-provider-ingestion",
|
||
"service_name": "source-provider-ingestion",
|
||
"awoooi_canary": "true",
|
||
"run_ref": safe_ref,
|
||
"fingerprint": fingerprint,
|
||
},
|
||
"annotations": {
|
||
"summary": "AwoooP upstream source provider canary",
|
||
"description": (
|
||
"Synthetic provider-shaped SignOz alert used only to "
|
||
"verify source dossier ingestion."
|
||
),
|
||
},
|
||
"generatorURL": "https://awoooi.wooo.work/zh-TW/awooop/work-items",
|
||
}
|
||
]
|
||
}
|
||
|
||
|
||
def _validate_upstream_canary_response(provider: str, data: dict[str, Any]) -> str | None:
|
||
if provider == "sentry":
|
||
if data.get("status") == "canary_recorded" and data.get("provider") == "sentry":
|
||
return None
|
||
return f"unexpected Sentry response: {data}"
|
||
results = data.get("results", [])
|
||
recorded = [
|
||
item
|
||
for item in results
|
||
if isinstance(item, dict)
|
||
and item.get("status") == "canary_recorded"
|
||
and item.get("provider") == "signoz"
|
||
]
|
||
if recorded:
|
||
return None
|
||
return f"unexpected SignOz response: {data}"
|
||
|
||
|
||
def send_source_provider_upstream_canary(
    api_url: str,
    *,
    providers: list[str],
    operator_key: str | None,
    operator_id: str,
    run_ref: str | None = None,
) -> CheckResult:
    """Send provider-shaped canaries through webhook-native ingestion paths.

    For each requested provider a synthetic payload is posted to that
    provider's webhook endpoint and the response is validated. The first
    failure ends the run.

    Args:
        api_url: Base URL of the API.
        providers: Requested providers; only ``sentry`` / ``signoz`` are kept.
        operator_key: Operator API key sent as a header; required.
        operator_id: Operator identifier sent as a header.
        run_ref: Optional run reference; sanitised before being embedded.

    Returns:
        A failing result describing the first provider that could not be
        reached, answered with an error status, returned an empty, non-JSON
        or non-object body, or did not confirm the canary. Otherwise a
        passing result listing the recorded providers.
    """
    check_name = "Source Provider Upstream Canary"

    def fail(message: str) -> CheckResult:
        return CheckResult(check_name, False, message)

    cleaned_providers = _clean_source_providers(providers)
    if not cleaned_providers:
        return fail("沒有有效 provider(允許 sentry/signoz)")
    if not operator_key:
        return fail("AWOOOP_OPERATOR_API_KEY 未設定;無法打入受保護 upstream canary")

    safe_ref = _safe_run_ref(run_ref)
    # Endpoint URL plus payload builder per provider; a payload is built only
    # for providers that are actually requested.
    endpoints = {
        "sentry": (
            f"{api_url}/api/v1/webhooks/sentry/error",
            _build_sentry_upstream_canary_payload,
        ),
        "signoz": (
            f"{api_url}/api/v1/webhooks/signoz/alert",
            _build_signoz_upstream_canary_payload,
        ),
    }
    recorded: list[str] = []
    for provider in cleaned_providers:
        url, build_payload = endpoints[provider]
        try:
            resp = http_post_json(
                url,
                build_payload(safe_ref),
                headers={
                    "X-AwoooP-Operator-Id": operator_id,
                    "X-AwoooP-Operator-Key": operator_key,
                },
                timeout=TIMEOUT,
            )
        except (URLError, TimeoutError, OSError) as e:
            return fail(f"{provider} upstream canary failed: {_http_error_message(e)}")

        try:
            data = _response_json(resp)
            body_is_json = True
        except json.JSONDecodeError:
            data = None
            body_is_json = False

        if resp.status_code >= 400:
            detail = data.get("detail") if isinstance(data, dict) else None
            if detail is None:
                # No usable "detail" field: show the body instead of "None".
                detail = _response_body_preview(resp.text)
            return fail(f"{provider} HTTP {resp.status_code}: {detail}")

        if not body_is_json:
            return fail(
                f"{provider} upstream canary returned non-JSON "
                f"HTTP {resp.status_code}: {_response_body_preview(resp.text)}"
            )
        if data is None:
            return fail(
                f"{provider} upstream canary returned empty HTTP {resp.status_code}"
            )
        if not isinstance(data, dict):
            # Valid JSON that is not an object cannot confirm the canary.
            return fail(
                f"{provider} upstream canary returned unexpected JSON "
                f"HTTP {resp.status_code}: {_response_body_preview(resp.text)}"
            )

        validation_error = _validate_upstream_canary_response(provider, data)
        if validation_error:
            return fail(validation_error)
        recorded.append(provider)

    return CheckResult(
        check_name,
        True,
        f"recorded {', '.join(sorted(recorded))} webhook-native canary event(s)",
    )
|
||
|
||
|
||
def send_source_link_canary(
    api_url: str,
    *,
    target_incident_id: str,
    operator_key: str | None,
    operator_id: str,
    run_ref: str | None = None,
) -> CheckResult:
    """Send a Sentry canary meant specifically for source-link refresh proof.

    Args:
        api_url: Base URL of the API.
        target_incident_id: Incident the canary is tagged with; required.
        operator_key: Operator API key sent as a header; required.
        operator_id: Operator identifier sent as a header.
        run_ref: Optional run reference; sanitised before being embedded.

    Returns:
        A failing result when the target id or key is missing, the request
        cannot be sent, the server answers with an error status, or a
        non-empty body is not a JSON object confirming the canary. A
        successful response with an empty body passes, with a message that
        readback must be verified separately.
    """
    check_name = "Source Link Canary"

    def fail(message: str) -> CheckResult:
        return CheckResult(check_name, False, message)

    target_id = str(target_incident_id or "").strip()
    if not target_id:
        return fail("target_incident_id 未設定;無法建立 source-link refresh canary")
    if not operator_key:
        return fail("AWOOOP_OPERATOR_API_KEY 未設定;無法打入受保護 source-link canary")

    safe_ref = _safe_run_ref(run_ref)
    url = f"{api_url}/api/v1/webhooks/sentry/error"
    payload = _build_sentry_source_link_canary_payload(
        safe_ref,
        target_incident_id=target_id,
    )
    try:
        resp = http_post_json(
            url,
            payload,
            headers={
                "X-AwoooP-Operator-Id": operator_id,
                "X-AwoooP-Operator-Key": operator_key,
            },
            timeout=TIMEOUT,
        )
    except (URLError, TimeoutError, OSError) as e:
        return fail(f"sentry source-link canary failed: {_http_error_message(e)}")

    try:
        data = _response_json(resp)
        body_is_json = True
    except json.JSONDecodeError:
        data = None
        body_is_json = False

    if resp.status_code >= 400:
        detail = data.get("detail") if isinstance(data, dict) else None
        if detail is None:
            # No usable "detail" field: show the body instead of "None".
            detail = _response_body_preview(resp.text)
        return fail(f"sentry HTTP {resp.status_code}: {detail}")

    if not (resp.text or "").strip():
        return CheckResult(
            check_name,
            True,
            (
                "accepted sentry source-link canary post with empty "
                f"HTTP {resp.status_code}; source-correlation smoke must verify readback"
            ),
        )

    if not body_is_json:
        return fail(
            "sentry source-link canary returned non-JSON "
            f"HTTP {resp.status_code}: {_response_body_preview(resp.text)}"
        )
    if not isinstance(data, dict):
        # Valid JSON that is not an object cannot confirm the canary.
        return fail(
            "sentry source-link canary returned unexpected JSON "
            f"HTTP {resp.status_code}: {_response_body_preview(resp.text)}"
        )

    validation_error = _validate_upstream_canary_response("sentry", data)
    if validation_error:
        return fail(validation_error)
    return CheckResult(
        check_name,
        True,
        f"recorded sentry source-link canary event for {target_id}",
    )
|
||
|
||
|
||
def check_signoz_reachable(signoz_url: str) -> CheckResult:
    """Check 4: the SigNoz UI is reachable.

    Passes for any HTTP status below 400. Failures (error status or
    connection error) are reported as non-critical.
    """
    try:
        status = http_get(signoz_url, timeout=TIMEOUT).status_code
    except (URLError, TimeoutError, OSError) as e:
        return CheckResult(
            "SigNoz", False, f"無法連線: {_http_error_message(e)}", critical=False
        )
    # The SigNoz UI usually answers 200 or 301/302.
    if status >= 400:
        return CheckResult("SigNoz", False, f"HTTP {status}", critical=False)
    return CheckResult("SigNoz", True, f"HTTP {status}")
|
||
|
||
|
||
def check_otel_collector() -> CheckResult:
    """Check 5: whether the OTEL Collector DaemonSet is running in K3s.

    Evidence is taken, in order, from ``AWOOOI_OTEL_COLLECTOR_ERROR`` (a
    preflight failure, non-critical), ``AWOOOI_OTEL_COLLECTOR_STATUSES``
    (preflight pod statuses), and finally a live kubectl query. A kubectl
    failure or exception is non-critical.
    """
    check_name = "OTEL Collector"
    no_running_message = "沒有 Running 的 OTEL Collector Pod"

    preflight_error = _status_error_from_env("AWOOOI_OTEL_COLLECTOR_ERROR")
    if preflight_error:
        return CheckResult(
            check_name,
            False,
            f"host kubectl preflight failed: {preflight_error}",
            critical=False,
        )

    statuses = _statuses_from_env("AWOOOI_OTEL_COLLECTOR_STATUSES")
    if statuses is None:
        # No preflight data from CI: ask the cluster directly.
        try:
            statuses = _run_kubectl_status_query("otel-collector")
        except Exception as e:
            return CheckResult(
                check_name, False, f"無法檢查: {e}", critical=False
            )
        if statuses is None:
            return CheckResult(
                check_name, False, "kubectl 查詢失敗", critical=False
            )

    return _check_running_statuses(check_name, statuses, no_running_message)
|
||
|
||
|
||
def check_event_exporter() -> CheckResult:
    """Check 6: whether the Event Exporter is running in K3s.

    Evidence is taken, in order, from ``AWOOOI_EVENT_EXPORTER_ERROR`` (a
    preflight failure, non-critical), ``AWOOOI_EVENT_EXPORTER_STATUSES``
    (preflight pod statuses), and finally a live kubectl query. A kubectl
    failure or exception is non-critical.
    """
    check_name = "Event Exporter"
    no_running_message = "沒有 Running 的 Event Exporter Pod"

    preflight_error = _status_error_from_env("AWOOOI_EVENT_EXPORTER_ERROR")
    if preflight_error:
        return CheckResult(
            check_name,
            False,
            f"host kubectl preflight failed: {preflight_error}",
            critical=False,
        )

    statuses = _statuses_from_env("AWOOOI_EVENT_EXPORTER_STATUSES")
    if statuses is None:
        # No preflight data from CI: ask the cluster directly.
        try:
            statuses = _run_kubectl_status_query("event-exporter")
        except Exception as e:
            return CheckResult(
                check_name, False, f"無法檢查: {e}", critical=False
            )
        if statuses is None:
            return CheckResult(
                check_name, False, "kubectl 查詢失敗", critical=False
            )

    return _check_running_statuses(check_name, statuses, no_running_message)
|
||
|
||
|
||
# =============================================================================
|
||
# 主程式
|
||
# =============================================================================
|
||
def run_smoke_test(
    api_url: str,
    fail_fast: bool = False,
    *,
    metrics_api_url: str | None = None,
    source_provider_heartbeat: bool = False,
    source_provider_upstream_canary: bool = False,
    source_providers: list[str] | None = None,
    operator_key: str | None = None,
    operator_id: str = "gitea-e2e-health",
    run_ref: str | None = None,
    source_link_canary_target_incident_id: str | None = None,
) -> SmokeTestReport:
    """Run the alert-chain smoke checks in order and collect their results.

    Args:
        api_url: API base URL used for the health, webhook and write checks.
        fail_fast: When true, return early if the report is not passing after
            the API health check, or as soon as a webhook, heartbeat,
            upstream-canary or source-link result is both failed and critical.
            The alert-chain metric, SigNoz, OTEL Collector and Event Exporter
            checks never trigger an early return.
        metrics_api_url: Base URL handed to the alert-chain metric check;
            falls back to ``api_url`` when falsy.
        source_provider_heartbeat: When true, send the source-provider
            heartbeat and, if it passed, check the alert-chain metric once per
            provider.
        source_provider_upstream_canary: Same flow as the heartbeat, using the
            upstream-canary sender.
        source_providers: Providers to exercise; ``["sentry", "signoz"]`` is
            used when this is falsy.
        operator_key: Forwarded unchanged to the heartbeat/canary senders.
        operator_id: Forwarded unchanged to the heartbeat/canary senders.
        run_ref: Forwarded unchanged to the heartbeat/canary senders.
        source_link_canary_target_incident_id: When truthy, send a source-link
            canary for this incident id.

    Returns:
        The ``SmokeTestReport`` holding every result added before returning.
    """
    report = SmokeTestReport()
    metrics_url = metrics_api_url or api_url

    def _abort_on(outcome):
        # Truthy only for a failed *critical* result while fail-fast is on.
        return fail_fast and not outcome.passed and outcome.critical

    def _add_provider_metric_checks(providers):
        # One alert-chain metric check per provider, in the given order.
        for provider in providers:
            report.add(
                check_alert_chain_metric(
                    PROMETHEUS_URL, metrics_url, source=provider
                )
            )

    banner = ["\n🔍 AWOOOI Alert Chain Smoke Test", f" API: {api_url}"]
    if metrics_url != api_url:
        banner.append(f" Metrics API: {metrics_url}")
    banner.append(f" 時間: {time.strftime('%Y-%m-%d %H:%M:%S %Z')}")
    banner.append("-" * 50)
    print("\n".join(banner))

    # Check 1: API Health
    report.add(check_api_health(api_url))
    if fail_fast and not report.passed:
        return report

    # Check 2: Alert Chain Metric
    report.add(check_alert_chain_metric(PROMETHEUS_URL, metrics_url))

    # Check 3: Webhook Health
    for webhook_result in check_webhook_health(api_url):
        report.add(webhook_result)
        if _abort_on(webhook_result):
            return report

    if source_provider_heartbeat:
        heartbeat_providers = source_providers or ["sentry", "signoz"]
        heartbeat_result = send_source_provider_heartbeat(
            api_url,
            providers=heartbeat_providers,
            operator_key=operator_key,
            operator_id=operator_id,
            run_ref=run_ref,
        )
        report.add(heartbeat_result)
        if _abort_on(heartbeat_result):
            return report
        if heartbeat_result.passed:
            _add_provider_metric_checks(heartbeat_providers)

    if source_provider_upstream_canary:
        canary_providers = source_providers or ["sentry", "signoz"]
        canary_result = send_source_provider_upstream_canary(
            api_url,
            providers=canary_providers,
            operator_key=operator_key,
            operator_id=operator_id,
            run_ref=run_ref,
        )
        report.add(canary_result)
        if _abort_on(canary_result):
            return report
        if canary_result.passed:
            _add_provider_metric_checks(canary_providers)

    if source_link_canary_target_incident_id:
        source_link_result = send_source_link_canary(
            api_url,
            target_incident_id=source_link_canary_target_incident_id,
            operator_key=operator_key,
            operator_id=operator_id,
            run_ref=run_ref,
        )
        report.add(source_link_result)
        if _abort_on(source_link_result):
            return report

    # Checks 4-6: SigNoz, OTEL Collector, Event Exporter
    report.add(check_signoz_reachable(SIGNOZ_URL))
    report.add(check_otel_collector())
    report.add(check_event_exporter())

    return report
|
||
|
||
|
||
def main() -> int:
    """Parse CLI options, run the smoke test and print the outcome.

    Prints a PASSED/FAILED line with the report summary, lists failed
    critical checks on failure, and additionally dumps the report as JSON
    when ``--json`` is given.

    Returns:
        ``0`` when the report passed, ``1`` otherwise.
    """
    cli = argparse.ArgumentParser(description="AWOOOI Alert Chain Smoke Test")
    cli.add_argument("--api-url", default=DEFAULT_API_URL, help="API base URL")
    cli.add_argument(
        "--metrics-api-url",
        default=os.environ.get("ALERT_CHAIN_METRICS_API_URL"),
        help=(
            "API base URL used only for /metrics fallback; useful when public "
            "API routes /metrics to the frontend"
        ),
    )
    cli.add_argument(
        "--fail-fast",
        action="store_true",
        help="第一個 critical 失敗即中止",
    )
    cli.add_argument("--json", action="store_true", help="輸出 JSON 格式結果")
    cli.add_argument(
        "--source-provider-heartbeat",
        action="store_true",
        help="寫入 Sentry/SignOz 低噪音 freshness heartbeat 並驗證 provider 指標",
    )
    cli.add_argument(
        "--source-provider-upstream-canary",
        action="store_true",
        help="透過 Sentry/SignOz 原生 webhook 寫入 provider-shaped canary 來源證據",
    )
    cli.add_argument(
        "--source-provider",
        action="append",
        choices=["sentry", "signoz"],
        help="指定要驗證的 provider;可重複指定,預設 sentry+signoz",
    )
    cli.add_argument(
        "--operator-id",
        default=os.environ.get("AWOOOP_OPERATOR_ID", "gitea-e2e-health"),
        help="AwoooP operator identity header",
    )
    cli.add_argument(
        "--operator-key-env",
        default="AWOOOP_OPERATOR_API_KEY",
        help="讀取 AwoooP operator key 的環境變數名稱",
    )
    cli.add_argument(
        "--run-ref",
        default=os.environ.get("GITHUB_RUN_ID") or os.environ.get("GITEA_RUN_ID"),
        help="CI run reference stored in heartbeat payload",
    )
    cli.add_argument(
        "--source-link-canary-target-incident-id",
        help=(
            "Write a Sentry source-link canary for this target Incident so "
            "source-correlation refresh can use semantically dedicated evidence."
        ),
    )
    opts = cli.parse_args()

    # The operator key itself is never passed on the command line; only the
    # name of the environment variable holding it is.
    report = run_smoke_test(
        opts.api_url,
        opts.fail_fast,
        metrics_api_url=opts.metrics_api_url,
        source_provider_heartbeat=opts.source_provider_heartbeat,
        source_provider_upstream_canary=opts.source_provider_upstream_canary,
        source_providers=opts.source_provider,
        operator_key=os.environ.get(opts.operator_key_env),
        operator_id=opts.operator_id,
        run_ref=opts.run_ref,
        source_link_canary_target_incident_id=(
            opts.source_link_canary_target_incident_id
        ),
    )

    print("-" * 50)
    if not report.passed:
        print(f"❌ FAILED — {report.summary()}")
        if report.failed_critical:
            print("\n失敗的 Critical 檢查:")
            for failed in report.failed_critical:
                print(f" - [{failed.name}] {failed.message}")
    else:
        print(f"✅ PASSED — {report.summary()}")

    if opts.json:
        payload = {
            "passed": report.passed,
            "summary": report.summary(),
            "checks": [
                {
                    "name": item.name,
                    "passed": item.passed,
                    "message": item.message,
                    "critical": item.critical,
                }
                for item in report.checks
            ],
        }
        print("\n" + json.dumps(payload, ensure_ascii=False, indent=2))

    return int(not report.passed)
|
||
|
||
|
||
# Script entry point: exit the process with the status returned by main().
if __name__ == "__main__":
    raise SystemExit(main())
|