from __future__ import annotations

import importlib.util
import sys
import time
import unittest
from pathlib import Path


# The smoke-test script is a standalone file rather than part of an importable
# package, so it is loaded by path. This test file lives in apps/api/tests/,
# which makes parents[3] the repository root.
SCRIPT_PATH = Path(__file__).resolve().parents[3] / "scripts" / "alert_chain_smoke_test.py"
SPEC = importlib.util.spec_from_file_location("alert_chain_smoke_test", SCRIPT_PATH)
# Validate the spec BEFORE it is used. An explicit raise (instead of `assert`)
# also keeps the guard active when Python runs with -O.
if SPEC is None or SPEC.loader is None:
    raise ImportError(f"Cannot load alert chain smoke test script from {SCRIPT_PATH}")
alert_chain_smoke_test = importlib.util.module_from_spec(SPEC)
# Bytecode writing is disabled only while the script is executed, then the
# previous setting is restored so the rest of the test session is unaffected.
_previous_dont_write_bytecode = sys.dont_write_bytecode
sys.dont_write_bytecode = True
try:
    # Register the module first so code that looks it up by name while it is
    # being executed can find it.
    sys.modules[SPEC.name] = alert_chain_smoke_test
    SPEC.loader.exec_module(alert_chain_smoke_test)
finally:
    sys.dont_write_bytecode = _previous_dont_write_bytecode


class AlertChainSmokeMetricTest(unittest.TestCase):
    """Unit tests for the alert-chain metric helpers of the smoke-test script."""

    def test_parse_app_alert_chain_metric_samples(self):
        """Only labelled samples of the alert-chain metric are parsed."""
        samples = alert_chain_smoke_test.parse_app_alert_chain_metric_samples(
            "\n".join([
                "# HELP awoooi_alert_chain_last_success_timestamp Last successful alert chain",
                'awoooi_alert_chain_last_success_timestamp{source="alertmanager"} 123.5',
                'awoooi_alert_chain_last_success_timestamp{source="sentry"} 120',
                "unrelated_metric 1",
            ])
        )

        self.assertEqual(
            samples,
            [
                alert_chain_smoke_test.AlertChainMetricSample(
                    source="alertmanager",
                    timestamp=123.5,
                    evidence_path="app_metrics",
                ),
                alert_chain_smoke_test.AlertChainMetricSample(
                    source="sentry",
                    timestamp=120.0,
                    evidence_path="app_metrics",
                ),
            ],
        )

    def test_newest_sample_for_source_prefers_requested_source(self):
        """A newer sample from another source must not win over the requested one."""
        samples = [
            alert_chain_smoke_test.AlertChainMetricSample("sentry", 999.0, "prometheus"),
            alert_chain_smoke_test.AlertChainMetricSample("alertmanager", 100.0, "prometheus"),
            alert_chain_smoke_test.AlertChainMetricSample("alertmanager", 200.0, "app_metrics"),
        ]

        sample = alert_chain_smoke_test._newest_sample_for_source(samples, "alertmanager")

        # Fail with a clear message instead of an AttributeError on None.
        self.assertIsNotNone(sample)
        self.assertEqual(sample.timestamp, 200.0)
        self.assertEqual(sample.evidence_path, "app_metrics")

    def test_alert_chain_metric_result_marks_recent_app_metric_as_scrape_delay(self):
        """A fresh app-metrics sample passes and is flagged as a scrape delay."""
        sample = alert_chain_smoke_test.AlertChainMetricSample(
            source="alertmanager",
            timestamp=time.time() - 60,
            evidence_path="app_metrics",
        )

        result = alert_chain_smoke_test._alert_chain_metric_result(sample, fallback=True)

        self.assertTrue(result.passed)
        self.assertIn("Prometheus scrape 尚未看到", result.message)

    def test_alert_chain_metric_result_fails_persistent_silence(self):
        """A sample older than the silence threshold is a critical failure."""
        sample = alert_chain_smoke_test.AlertChainMetricSample(
            source="alertmanager",
            timestamp=time.time() - alert_chain_smoke_test.MAX_ALERT_CHAIN_SILENCE_SECONDS - 60,
            evidence_path="prometheus",
        )

        result = alert_chain_smoke_test._alert_chain_metric_result(sample)

        self.assertFalse(result.passed)
        self.assertTrue(result.critical)


if __name__ == "__main__":
    unittest.main()
b/ops/monitoring/alerts-unified.yml index f89493e5..22c7f7a6 100644 --- a/ops/monitoring/alerts-unified.yml +++ b/ops/monitoring/alerts-unified.yml @@ -513,7 +513,7 @@ groups: description: "Sentry 錯誤可能無法正確處理" - alert: NoAlertsReceived2Hours - expr: time() - max by (source)(awoooi_alert_chain_last_success_timestamp) > 7200 + expr: time() - max by (source)(awoooi_alert_chain_last_success_timestamp{source="alertmanager"}) > 7200 for: 5m labels: severity: warning @@ -521,8 +521,8 @@ groups: team: platform auto_repair: "false" annotations: - summary: "2 小時內未收到任何告警 ({{ $labels.source }})" - description: "可能是告警鏈路問題,請執行 Smoke Test" + summary: "Alertmanager 主鏈路 2 小時內未收到告警" + description: "Alertmanager 是固定主鏈路;Sentry/SigNoz 沉默不代表鏈路故障,錯誤率另有 AlertChainBroken_* 規則監控。請執行 Smoke Test" - alert: AlertChainUnhealthy expr: awoooi_alert_chain_healthy == 0 diff --git a/ops/monitoring/alerts.yml b/ops/monitoring/alerts.yml index 3a8d4596..a6b0611a 100644 --- a/ops/monitoring/alerts.yml +++ b/ops/monitoring/alerts.yml @@ -513,7 +513,7 @@ groups: description: "Sentry 錯誤可能無法正確處理" - alert: NoAlertsReceived2Hours - expr: time() - max by (source)(awoooi_alert_chain_last_success_timestamp) > 7200 + expr: time() - max by (source)(awoooi_alert_chain_last_success_timestamp{source="alertmanager"}) > 7200 for: 5m labels: severity: warning @@ -521,8 +521,8 @@ groups: team: platform auto_repair: "false" annotations: - summary: "2 小時內未收到任何告警 ({{ $labels.source }})" - description: "可能是告警鏈路問題,請執行 Smoke Test" + summary: "Alertmanager 主鏈路 2 小時內未收到告警" + description: "Alertmanager 是固定主鏈路;Sentry/SigNoz 沉默不代表鏈路故障,錯誤率另有 AlertChainBroken_* 規則監控。請執行 Smoke Test" - alert: AlertChainUnhealthy expr: awoooi_alert_chain_healthy == 0 diff --git a/scripts/alert_chain_smoke_test.py b/scripts/alert_chain_smoke_test.py index 2eb559e0..31304b2c 100644 --- a/scripts/alert_chain_smoke_test.py +++ b/scripts/alert_chain_smoke_test.py @@ -28,17 +28,15 @@ from __future__ import annotations import argparse import json import os 
# File-level imports that sit on this span of the patch; some of them are used
# by code outside this block. `json` is imported just above this span.
import math
import re
import shlex
import sys
import time
from dataclasses import dataclass, field
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen

# Longest tolerated silence of the alert chain before the check fails (2 hours).
MAX_ALERT_CHAIN_SILENCE_SECONDS = 2 * 60 * 60
TIMEOUT = 10  # seconds

# Only plain HTTP(S) endpoints are probed by the smoke test.
_ALLOWED_URL_PREFIXES = ("http://", "https://")


@dataclass(frozen=True)
class HttpGetResult:
    """Outcome of an HTTP GET: the status code and the decoded body text."""

    status_code: int
    text: str

    def json(self) -> Any:
        """Parse the body as JSON.

        Raises:
            json.JSONDecodeError: if the body is not valid JSON.
        """
        return json.loads(self.text)


@dataclass(frozen=True)
class AlertChainMetricSample:
    """One reading of ``awoooi_alert_chain_last_success_timestamp``."""

    # Value of the metric's ``source`` label (normalised to lower case by the
    # parsers in this file).
    source: str
    # The metric value; compared against ``time.time()`` to compute its age.
    timestamp: float
    # Where the reading came from: "prometheus" or "app_metrics".
    evidence_path: str


def http_get(
    url: str,
    *,
    params: dict[str, str] | None = None,
    timeout: int = TIMEOUT,
) -> HttpGetResult:
    """Perform an HTTP GET using only the standard library.

    Args:
        url: Absolute ``http://`` or ``https://`` URL.
        params: Optional query parameters, URL-encoded and appended to ``url``.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The status code and body. HTTP error statuses (``HTTPError``) are
        returned as a result rather than raised, so callers can inspect
        ``status_code``.

    Raises:
        URLError: if ``url`` is not an http(s) URL or the request fails at the
            transport level. Other ``OSError``s (e.g. timeouts) may propagate.
    """
    # A scheme-less URL would otherwise surface as a bare ValueError from
    # Request(), which none of the callers catch; non-HTTP schemes such as
    # file: must not be opened by a health probe either.
    if not url.lower().startswith(_ALLOWED_URL_PREFIXES):
        raise URLError(f"unsupported URL (expected http:// or https://): {url!r}")

    if params:
        separator = "&" if "?" in url else "?"
        url = f"{url}{separator}{urlencode(params)}"

    request = Request(url, headers={"Accept": "application/json,text/plain,*/*"})
    try:
        with urlopen(request, timeout=timeout) as response:
            body = response.read().decode("utf-8", errors="replace")
            return HttpGetResult(response.status, body)
    except HTTPError as exc:
        # HTTPError doubles as the response object; read its body, then close
        # it so the underlying connection is released.
        try:
            body = exc.read().decode("utf-8", errors="replace")
        finally:
            exc.close()
        return HttpGetResult(exc.code, body)


def _http_error_message(error: Exception) -> str:
    """Return a short human-readable message for a failed request."""
    if isinstance(error, URLError):
        return str(error.reason)
    return str(error)


def _escape_prometheus_label_value(value: str) -> str:
    """Escape ``value`` for use inside a double-quoted PromQL label matcher."""
    return value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")


def _parse_finite_timestamp(raw: Any) -> float | None:
    """Convert ``raw`` to a finite float, or return None if that is impossible.

    NaN and infinities are rejected because every comparison against NaN is
    False, which would let such a sample slip past the silence check.
    """
    try:
        timestamp = float(raw)
    except (TypeError, ValueError, OverflowError):
        return None
    return timestamp if math.isfinite(timestamp) else None


def _prometheus_alert_chain_samples(
    results: list[dict[str, Any]],
) -> list[AlertChainMetricSample]:
    """Convert Prometheus instant-query result items into samples.

    Each item is expected to look like
    ``{"metric": {"source": ...}, "value": [<eval time>, "<value>"]}``.
    Items that are malformed, lack a ``source`` label, or carry a
    non-numeric / non-finite value are skipped.
    """
    samples: list[AlertChainMetricSample] = []
    for item in results:
        if not isinstance(item, dict):
            continue
        metric = item.get("metric", {})
        value = item.get("value", [])
        if not isinstance(metric, dict) or not isinstance(value, (list, tuple)):
            continue
        source = str(metric.get("source", "")).strip().lower()
        if len(value) < 2 or not source:
            continue
        timestamp = _parse_finite_timestamp(value[1])
        if timestamp is None:
            continue
        samples.append(AlertChainMetricSample(source, timestamp, "prometheus"))
    return samples


# One exposition-format sample line of the metric: optional {labels}, then the
# numeric value. The group names are required by the parser below.
_ALERT_CHAIN_METRIC_RE = re.compile(
    r'^awoooi_alert_chain_last_success_timestamp(?:\{(?P<labels>[^}]*)\})?\s+'
    r'(?P<value>[-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?)'
)
# The ``source="..."`` label, at the start of the label list or after a comma.
_SOURCE_LABEL_RE = re.compile(r'(?:^|,)source="(?P<source>[^"]+)"')


def parse_app_alert_chain_metric_samples(metrics_text: str) -> list[AlertChainMetricSample]:
    """Extract alert-chain samples from text-format metrics output.

    Comment lines, blank lines, other metrics, samples without a ``source``
    label and samples with a non-finite value are ignored. Sources are
    normalised to lower case; every sample is tagged ``"app_metrics"``.
    """
    samples: list[AlertChainMetricSample] = []
    for raw_line in metrics_text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        metric_match = _ALERT_CHAIN_METRIC_RE.match(line)
        if metric_match is None:
            continue
        labels = metric_match.group("labels") or ""
        source_match = _SOURCE_LABEL_RE.search(labels)
        if source_match is None:
            continue
        timestamp = _parse_finite_timestamp(metric_match.group("value"))
        if timestamp is None:
            continue
        samples.append(
            AlertChainMetricSample(
                source=source_match.group("source").strip().lower(),
                timestamp=timestamp,
                evidence_path="app_metrics",
            )
        )
    return samples


def _newest_sample_for_source(
    samples: list[AlertChainMetricSample],
    source: str,
) -> AlertChainMetricSample | None:
    """Return the sample with the largest timestamp for ``source``.

    ``source`` is matched case-insensitively (after stripping whitespace).
    Returns None when no sample belongs to that source.
    """
    source = source.strip().lower()
    matches = [sample for sample in samples if sample.source == source]
    if not matches:
        return None
    return max(matches, key=lambda sample: sample.timestamp)


def _alert_chain_metric_result(
    sample: AlertChainMetricSample,
    *,
    fallback: bool = False,
) -> CheckResult:
    """Turn a sample into the "Alert Chain Metric" check result.

    Args:
        sample: The newest sample for the monitored source.
        fallback: True when the sample came from the app metrics endpoint
            because Prometheus had no data; the message then notes that the
            Prometheus scrape has not caught up yet.

    Returns:
        A failing result when the sample is older than
        ``MAX_ALERT_CHAIN_SILENCE_SECONDS``, otherwise a passing one.
    """
    age_seconds = time.time() - sample.timestamp
    age_minutes = age_seconds / 60

    if age_seconds > MAX_ALERT_CHAIN_SILENCE_SECONDS:
        # Derive the threshold shown to the operator from the constant so the
        # message cannot drift from the value actually enforced.
        threshold_minutes = MAX_ALERT_CHAIN_SILENCE_SECONDS / 60
        return CheckResult(
            "Alert Chain Metric",
            False,
            (
                f"{sample.source} 告警鏈路已靜默 {age_minutes:.0f} 分鐘 "
                f"(evidence={sample.evidence_path}, 超過 {threshold_minutes:.0f} 分鐘閾值)"
            ),
        )

    scrape_note = ";Prometheus scrape 尚未看到" if fallback else ""
    return CheckResult(
        "Alert Chain Metric",
        True,
        (
            f"最後 {sample.source} 告警成功: {age_minutes:.0f} 分鐘前 "
            f"(evidence={sample.evidence_path}{scrape_note})"
        ),
    )
def check_alert_chain_metric(
    prometheus_url: str,
    api_url: str,
    *,
    source: str = "alertmanager",
) -> CheckResult:
    """Check 2: the primary alert chain succeeded within the last 2 hours.

    Prometheus is the first source of evidence. Because a Prometheus scrape
    can lag one round behind the application's own metrics, the API's
    ``/metrics`` endpoint is queried when Prometheus has no usable sample, and
    the result message states which evidence path was used.

    Args:
        prometheus_url: Base URL of Prometheus (``/api/v1/query`` is appended).
        api_url: Base URL of the API (``/metrics`` is appended).
        source: Value of the metric's ``source`` label to check.

    Returns:
        The result built from the newest sample found, or a non-critical
        failure when neither Prometheus nor the app metrics provide one.
    """
    query = (
        "awoooi_alert_chain_last_success_timestamp"
        f'{{source="{_escape_prometheus_label_value(source)}"}}'
    )

    # --- First evidence path: Prometheus instant query --------------------
    try:
        resp = http_get(
            f"{prometheus_url}/api/v1/query",
            params={"query": query},
            timeout=TIMEOUT,
        )
        # Check the status before parsing: an error page is often not JSON,
        # and the status code is the more useful diagnostic.
        if resp.status_code >= 400:
            raise URLError(f"Prometheus HTTP {resp.status_code}")
        data = resp.json()
    except (URLError, TimeoutError, OSError, json.JSONDecodeError) as e:
        prometheus_error = _http_error_message(e)
    else:
        # Tolerate an unexpected payload shape instead of crashing on .get().
        payload = data.get("data", {}) if isinstance(data, dict) else {}
        results = payload.get("result", []) if isinstance(payload, dict) else []
        if not isinstance(results, list):
            results = []
        sample = _newest_sample_for_source(
            _prometheus_alert_chain_samples(results),
            source,
        )
        if sample is not None:
            return _alert_chain_metric_result(sample)
        prometheus_error = "Prometheus 未抓到"

    # --- Fallback evidence path: the application's own /metrics -----------
    try:
        app_resp = http_get(f"{api_url}/metrics", timeout=TIMEOUT)
        if app_resp.status_code >= 400:
            raise URLError(f"API /metrics HTTP {app_resp.status_code}")
    except (URLError, TimeoutError, OSError) as e:
        return CheckResult(
            "Alert Chain Metric",
            False,
            (
                f"{source} 指標不存在或不可讀 "
                f"(Prometheus: {prometheus_error}; app_metrics: {_http_error_message(e)})"
            ),
            critical=False,
        )

    app_sample = _newest_sample_for_source(
        parse_app_alert_chain_metric_samples(app_resp.text),
        source,
    )
    if app_sample is not None:
        return _alert_chain_metric_result(app_sample, fallback=True)

    # Both paths were reachable (or Prometheus failed) but no sample exists;
    # non-critical because the metric may simply not have been emitted yet.
    return CheckResult(
        "Alert Chain Metric",
        False,
        (
            f"{source} 指標不存在 "
            f"(Prometheus: {prometheus_error}; app_metrics: no sample)"
        ),
        critical=False,
    )
report.add(check_alert_chain_metric(PROMETHEUS_URL, api_url)) # Check 3: Webhook Health for result in check_webhook_health(api_url):