From 8fa8d690a29140f676d5955305c81af16a63fcc2 Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 20 May 2026 12:41:09 +0800 Subject: [PATCH] fix(monitoring): stabilize post-deploy target coverage --- .../test_generate_monitoring_stabilization.py | 109 +++++++++++++ scripts/generate_monitoring.py | 151 ++++++++++++++++-- 2 files changed, 247 insertions(+), 13 deletions(-) create mode 100644 apps/api/tests/test_generate_monitoring_stabilization.py diff --git a/apps/api/tests/test_generate_monitoring_stabilization.py b/apps/api/tests/test_generate_monitoring_stabilization.py new file mode 100644 index 00000000..f7f84116 --- /dev/null +++ b/apps/api/tests/test_generate_monitoring_stabilization.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +import importlib.util +import unittest +from pathlib import Path + + +SCRIPT_PATH = Path(__file__).resolve().parents[3] / "scripts" / "generate_monitoring.py" +SPEC = importlib.util.spec_from_file_location("generate_monitoring", SCRIPT_PATH) +generate_monitoring = importlib.util.module_from_spec(SPEC) +assert SPEC and SPEC.loader +SPEC.loader.exec_module(generate_monitoring) + + +def targets_payload(down_jobs: set[str] | None = None, missing_jobs: set[str] | None = None): + down_jobs = down_jobs or set() + missing_jobs = missing_jobs or set() + + active_targets = [] + for job in generate_monitoring.EXPECTED_JOBS: + if job in missing_jobs: + continue + active_targets.append( + { + "labels": {"job": job, "instance": f"{job}:1"}, + "health": "down" if job in down_jobs else "up", + } + ) + + return {"activeTargets": active_targets} + + +class GenerateMonitoringStabilizationTest(unittest.TestCase): + def test_stabilized_report_uses_later_clean_target_snapshot(self): + snapshots = [ + targets_payload(down_jobs={"awoooi-api"}), + targets_payload(), + ] + + def fetch_targets(): + return snapshots.pop(0) + + report = generate_monitoring.build_stabilized_report( + fetch_targets, + attempts=3, + sleep_seconds=0, + emit_status=False, + ) + + self.assertEqual(report["summary"]["real_down_jobs"], 0) + self.assertEqual(report["stabilization"]["attempt"], 2) + self.assertEqual(report["stabilization"]["status"], "cleared") + + def test_stabilized_report_retries_missing_expected_target(self): + snapshots = [ + targets_payload(missing_jobs={"awoooi-api"}), + targets_payload(), + ] + + def fetch_targets(): + return snapshots.pop(0) + + report = generate_monitoring.build_stabilized_report( + fetch_targets, + attempts=3, + sleep_seconds=0, + emit_status=False, + ) + + self.assertEqual(report["missing_expected"], []) + self.assertEqual(report["stabilization"]["attempt"], 2) + self.assertEqual(report["stabilization"]["status"], "cleared") + + def test_stabilized_report_keeps_real_down_after_attempts_exhausted(self): + def fetch_targets(): + return targets_payload(down_jobs={"awoooi-api"}) + + report = generate_monitoring.build_stabilized_report( + fetch_targets, + attempts=2, + sleep_seconds=0, + emit_status=False, + ) + + self.assertEqual(report["summary"]["real_down_jobs"], 1) + self.assertEqual(report["real_down_jobs"], ["awoooi-api"]) + self.assertEqual(report["stabilization"]["status"], "failed") + + def test_stabilized_report_does_not_retry_clean_snapshot(self): + calls = 0 + + def fetch_targets(): + nonlocal calls + calls += 1 + return targets_payload() + + report = generate_monitoring.build_stabilized_report( + fetch_targets, + attempts=3, + sleep_seconds=0, + emit_status=False, + ) + + self.assertEqual(calls, 1) + self.assertEqual(report["stabilization"]["status"], "stable") + + +if __name__ == "__main__": + unittest.main() diff --git a/scripts/generate_monitoring.py b/scripts/generate_monitoring.py index 31013ac6..f2fd7e63 100644 --- a/scripts/generate_monitoring.py +++ b/scripts/generate_monitoring.py @@ -16,11 +16,14 @@ Phase O-5 Wave C.1 (2026-04-02 ogt) import argparse import json +import os import subprocess import sys +import time from datetime import datetime - -import requests +from typing import Callable +from urllib.error import HTTPError, URLError +from urllib.request import urlopen # ============================================================ # 設定 @@ -28,6 +31,8 @@ import requests PROMETHEUS_URL = "http://192.168.0.110:9090" COVERAGE_THRESHOLD = 70 # CI 模式: 覆蓋率低於此值則 exit 1 +DEFAULT_STABILIZATION_ATTEMPTS = 3 +DEFAULT_STABILIZATION_SLEEP_SECONDS = 10.0 # 已知服務清單 (job名稱 → 說明) EXPECTED_JOBS = { @@ -52,13 +57,27 @@ KNOWN_DOWN_TARGETS = { } +def _int_env(name: str, default: int) -> int: + try: + return max(1, int(os.environ.get(name, default))) + except ValueError: + return default + + +def _float_env(name: str, default: float) -> float: + try: + return max(0.0, float(os.environ.get(name, default))) + except ValueError: + return default + + def get_prometheus_targets() -> dict: """查詢 Prometheus targets API""" try: - resp = requests.get(f"{PROMETHEUS_URL}/api/v1/targets", timeout=10) - resp.raise_for_status() - return resp.json()["data"] - except requests.RequestException as e: + with urlopen(f"{PROMETHEUS_URL}/api/v1/targets", timeout=10) as resp: + payload = json.loads(resp.read().decode("utf-8")) + return payload["data"] + except (HTTPError, URLError, TimeoutError, json.JSONDecodeError, KeyError) as e: print(f"❌ 無法連接 Prometheus ({PROMETHEUS_URL}): {e}", file=sys.stderr) sys.exit(1) @@ -133,6 +152,76 @@ def build_report(jobs: dict) -> dict: } +def build_report_from_targets(targets_data: dict) -> dict: + """從 Prometheus targets API payload 建立覆蓋率報告""" + return build_report(analyze_targets(targets_data)) + + +def report_needs_stabilization(report: dict) -> bool: + """是否需要重查,避免 post-deploy 瞬間 scrape 狀態造成 false red.""" + return bool(report["real_down_jobs"] or report["missing_expected"]) + + +def stabilization_reason(report: dict) -> str: + parts: list[str] = [] + if report["real_down_jobs"]: + parts.append(f"real_down={','.join(report['real_down_jobs'])}") + if report["missing_expected"]: + parts.append(f"missing_expected={','.join(report['missing_expected'])}") + return "; ".join(parts) if parts else "stable" + + +def build_stabilized_report( + fetch_targets: Callable[[], dict], + attempts: int, + sleep_seconds: float, + emit_status: bool = True, +) -> dict: + """重查 Prometheus targets,讓 CI gate 避開 rollout/scrape freshness 瞬間值.""" + attempts = max(1, attempts) + sleep_seconds = max(0.0, sleep_seconds) + + report: dict | None = None + for attempt in range(1, attempts + 1): + report = build_report_from_targets(fetch_targets()) + needs_retry = report_needs_stabilization(report) + status = "stable" + if needs_retry and attempt < attempts: + status = "retrying" + elif needs_retry: + status = "failed" + elif attempt > 1: + status = "cleared" + + report["stabilization"] = { + "attempt": attempt, + "attempts": attempts, + "sleep_seconds": sleep_seconds, + "status": status, + "reason": stabilization_reason(report), + } + + if not needs_retry or attempt == attempts: + if emit_status and attempt > 1 and not needs_retry: + print( + "✅ Prometheus target stabilization cleared transient coverage drift", + file=sys.stderr, + ) + return report + + if emit_status: + print( + "⏳ Prometheus target stabilization " + f"{attempt}/{attempts}: {stabilization_reason(report)}", + file=sys.stderr, + ) + time.sleep(sleep_seconds) + + if report is None: + raise RuntimeError("monitoring report stabilization did not run") + return report + + def print_human_report(report: dict) -> None: """輸出人可讀格式報告""" s = report["summary"] @@ -161,11 +250,15 @@ def print_human_report(report: dict) -> None: status = "❌ DOWN" print(f" {status:<30} {job:<25} {desc}") - if report["known_down"]: + known_down_present = [ + (job, reason) + for job, reason in report["known_down"].items() + if job in report["jobs"] and report["jobs"][job]["down"] + ] + if known_down_present: print(f"\n⚠️ 已知 DOWN (不影響覆蓋率)") - for job, reason in report["known_down"].items(): - if job in report["jobs"]: - print(f" {job:<30} {reason}") + for job, reason in known_down_present: + print(f" {job:<30} {reason}") if report["real_down_jobs"]: print(f"\n🔴 需處理的 DOWN targets") @@ -178,6 +271,15 @@ def print_human_report(report: dict) -> None: for job in report["missing_expected"]: print(f" {job}: {report['expected_jobs'][job]}") + stabilization = report.get("stabilization") + if stabilization and stabilization["attempt"] > 1: + print(f"\n⏱️ Prometheus target 穩定化") + print( + " " + f"{stabilization['status']} after " + f"{stabilization['attempt']}/{stabilization['attempts']} attempts" + ) + pct = s["expected_coverage_pct"] threshold = COVERAGE_THRESHOLD if pct >= threshold and not report["real_down_jobs"]: @@ -196,11 +298,34 @@ def main() -> None: action="store_true", help=f"CI 模式: 覆蓋率 < {COVERAGE_THRESHOLD}% 則 exit 1", ) + parser.add_argument( + "--stabilization-attempts", + type=int, + default=_int_env( + "AWOOOI_MONITORING_TARGET_STABILIZATION_ATTEMPTS", + DEFAULT_STABILIZATION_ATTEMPTS, + ), + help="CI 模式: Prometheus target 狀態重查次數", + ) + parser.add_argument( + "--stabilization-sleep-seconds", + type=float, + default=_float_env( + "AWOOOI_MONITORING_TARGET_STABILIZATION_SLEEP_SECONDS", + DEFAULT_STABILIZATION_SLEEP_SECONDS, + ), + help="CI 模式: Prometheus target 重查間隔秒數", + ) args = parser.parse_args() - targets_data = get_prometheus_targets() - jobs = analyze_targets(targets_data) - report = build_report(jobs) + if args.check: + report = build_stabilized_report( + get_prometheus_targets, + attempts=args.stabilization_attempts, + sleep_seconds=args.stabilization_sleep_seconds, + ) + else: + report = build_report_from_targets(get_prometheus_targets()) if args.json: print(json.dumps(report, ensure_ascii=False, indent=2))