Files
awoooi/apps/api/tests/test_generate_monitoring_stabilization.py
Your Name 8fa8d690a2
Some checks failed
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 4m7s
CD Pipeline / post-deploy-checks (push) Has been cancelled
CD Pipeline / build-and-deploy (push) Has been cancelled
fix(monitoring): stabilize post-deploy target coverage
2026-05-20 12:41:09 +08:00

110 lines
3.3 KiB
Python

from __future__ import annotations
import importlib.util
import unittest
from pathlib import Path
SCRIPT_PATH = Path(__file__).resolve().parents[3] / "scripts" / "generate_monitoring.py"
SPEC = importlib.util.spec_from_file_location("generate_monitoring", SCRIPT_PATH)
generate_monitoring = importlib.util.module_from_spec(SPEC)
assert SPEC and SPEC.loader
SPEC.loader.exec_module(generate_monitoring)
def targets_payload(down_jobs: set[str] | None = None, missing_jobs: set[str] | None = None):
down_jobs = down_jobs or set()
missing_jobs = missing_jobs or set()
active_targets = []
for job in generate_monitoring.EXPECTED_JOBS:
if job in missing_jobs:
continue
active_targets.append(
{
"labels": {"job": job, "instance": f"{job}:1"},
"health": "down" if job in down_jobs else "up",
}
)
return {"activeTargets": active_targets}
class GenerateMonitoringStabilizationTest(unittest.TestCase):
def test_stabilized_report_uses_later_clean_target_snapshot(self):
snapshots = [
targets_payload(down_jobs={"awoooi-api"}),
targets_payload(),
]
def fetch_targets():
return snapshots.pop(0)
report = generate_monitoring.build_stabilized_report(
fetch_targets,
attempts=3,
sleep_seconds=0,
emit_status=False,
)
self.assertEqual(report["summary"]["real_down_jobs"], 0)
self.assertEqual(report["stabilization"]["attempt"], 2)
self.assertEqual(report["stabilization"]["status"], "cleared")
def test_stabilized_report_retries_missing_expected_target(self):
snapshots = [
targets_payload(missing_jobs={"awoooi-api"}),
targets_payload(),
]
def fetch_targets():
return snapshots.pop(0)
report = generate_monitoring.build_stabilized_report(
fetch_targets,
attempts=3,
sleep_seconds=0,
emit_status=False,
)
self.assertEqual(report["missing_expected"], [])
self.assertEqual(report["stabilization"]["attempt"], 2)
self.assertEqual(report["stabilization"]["status"], "cleared")
def test_stabilized_report_keeps_real_down_after_attempts_exhausted(self):
def fetch_targets():
return targets_payload(down_jobs={"awoooi-api"})
report = generate_monitoring.build_stabilized_report(
fetch_targets,
attempts=2,
sleep_seconds=0,
emit_status=False,
)
self.assertEqual(report["summary"]["real_down_jobs"], 1)
self.assertEqual(report["real_down_jobs"], ["awoooi-api"])
self.assertEqual(report["stabilization"]["status"], "failed")
def test_stabilized_report_does_not_retry_clean_snapshot(self):
calls = 0
def fetch_targets():
nonlocal calls
calls += 1
return targets_payload()
report = generate_monitoring.build_stabilized_report(
fetch_targets,
attempts=3,
sleep_seconds=0,
emit_status=False,
)
self.assertEqual(calls, 1)
self.assertEqual(report["stabilization"]["status"], "stable")
if __name__ == "__main__":
unittest.main()