261 lines
10 KiB
Python
Executable File
261 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Validate the backup alert label contract.
|
|
|
|
Node exporter textfile metrics use labels such as job="backup_all" locally, but
|
|
Prometheus rewrites that metric label to exported_job because the scrape target
|
|
already has job="node-exporter-110". Backup alerts must therefore use
|
|
$labels.exported_job in user-facing text and exported_job="..." in expressions.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import urllib.parse
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
|
|
# Default for the --rules option: the alert rules file checked by static_check().
DEFAULT_RULES = Path("ops/monitoring/alerts-unified.yml")
# Default for the --baseline option: the file read for
# monitoring_contract.prometheus_alerts (the expected backup alert names).
DEFAULT_BASELINE = Path("ops/reboot-recovery/full-stack-backup-baseline.yml")
|
|
|
|
|
|
class ContractError(RuntimeError):
    """Signal that a backup alert contract check did not pass."""
|
|
|
|
|
|
def _load_alerts(path: Path) -> dict[str, dict[str, Any]]:
    """Parse a rules YAML file and index its alerting rules by alert name.

    Args:
        path: Rules file to read (decoded as UTF-8).

    Returns:
        Mapping of alert name to the raw rule mapping. Rules without a
        truthy ``alert`` key are skipped; when a name occurs more than
        once, the last occurrence wins.
    """
    document = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
    indexed: dict[str, dict[str, Any]] = {}
    for group in document.get("groups") or []:
        # An absent or empty "rules" key is treated as "no rules in this group".
        indexed.update(
            {
                rule["alert"]: rule
                for rule in group.get("rules") or []
                if rule.get("alert")
            }
        )
    return indexed
|
|
|
|
|
|
def _annotation_text(rule: dict[str, Any]) -> str:
|
|
annotations = rule.get("annotations") or {}
|
|
return "\n".join(str(value) for value in annotations.values())
|
|
|
|
|
|
def _require_alert(alerts: dict[str, dict[str, Any]], name: str) -> dict[str, Any]:
|
|
if name not in alerts:
|
|
raise ContractError(f"missing alert: {name}")
|
|
return alerts[name]
|
|
|
|
|
|
def _require_contains(value: str, expected: str, label: str) -> None:
|
|
if expected not in value:
|
|
raise ContractError(f"{label} must contain {expected!r}")
|
|
|
|
|
|
def _require_not_contains(value: str, forbidden: str, label: str) -> None:
|
|
if forbidden in value:
|
|
raise ContractError(f"{label} must not contain {forbidden!r}")
|
|
|
|
|
|
def _expected_backup_alerts(path: Path) -> list[str]:
    """Read the expected backup alert names from the baseline file.

    The names are taken from ``monitoring_contract.prometheus_alerts``.

    Args:
        path: Baseline YAML file to read (decoded as UTF-8).

    Returns:
        The listed alert names, each converted with ``str``.

    Raises:
        ContractError: If the list is missing or empty, including when the
            document or ``monitoring_contract`` is null or not a mapping.
    """
    data = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
    # A key that is present but empty in YAML loads as None, so a .get()
    # default is not enough: check the type before descending. Anything that
    # is not a mapping is reported as a missing contract instead of raising
    # AttributeError.
    contract = data.get("monitoring_contract") if isinstance(data, dict) else None
    alerts = contract.get("prometheus_alerts") if isinstance(contract, dict) else None
    if not alerts:
        raise ContractError(f"missing monitoring_contract.prometheus_alerts in {path}")
    return [str(alert) for alert in alerts]
|
|
|
|
|
|
def static_check(path: Path, baseline_path: Path) -> list[str]:
    """Validate the on-disk alert rules against the backup label contract.

    Checks run in a fixed order and stop at the first failure.

    Args:
        path: Alert rules YAML file to validate.
        baseline_path: Baseline YAML file listing the expected backup alerts.

    Returns:
        One ``"OK ..."`` line per satisfied check, in the order checked.

    Raises:
        ContractError: If an expected alert is missing, or a rule's ``expr``
            or annotations do not contain (or wrongly contain) a required
            string.
    """

    def expr_of(rule: dict[str, Any]) -> str:
        # A rule without "expr" is checked as an empty expression.
        return str(rule.get("expr", ""))

    alerts = _load_alerts(path)
    report: list[str] = []

    # Every alert named by the baseline must exist in the rules file.
    expected_names = set(_expected_backup_alerts(baseline_path))
    missing = sorted(expected_names - set(alerts))
    if missing:
        raise ContractError(f"alerts-unified.yml missing baseline backup alerts: {missing}")
    report.append("OK alerts-unified.yml contains every baseline backup alert")

    # BackupExpectedJobMissing: text must reference exported_job, never job.
    job_missing = _require_alert(alerts, "BackupExpectedJobMissing")
    _require_contains(expr_of(job_missing), "awoooi_backup_job_configured", "BackupExpectedJobMissing expr")
    job_missing_text = _annotation_text(job_missing)
    _require_contains(job_missing_text, "$labels.exported_job", "BackupExpectedJobMissing annotations")
    _require_not_contains(job_missing_text, "$labels.job", "BackupExpectedJobMissing annotations")
    report.append("OK BackupExpectedJobMissing uses exported_job label")

    # BackupJobStale: same exported_job rule, plus three further labels.
    job_stale = _require_alert(alerts, "BackupJobStale")
    _require_contains(expr_of(job_stale), "awoooi_backup_job_fresh", "BackupJobStale expr")
    job_stale_text = _annotation_text(job_stale)
    _require_contains(job_stale_text, "$labels.exported_job", "BackupJobStale annotations")
    _require_not_contains(job_stale_text, "$labels.job", "BackupJobStale annotations")
    for placeholder in ("$labels.max_age_hours", "$labels.source", "$labels.target"):
        _require_contains(job_stale_text, placeholder, "BackupJobStale annotations")
    report.append("OK BackupJobStale uses exported_job/source/target labels")

    # BackupAggregateRunFailed: the expression must contain this exact selector.
    aggregate_failed = _require_alert(alerts, "BackupAggregateRunFailed")
    _require_contains(
        expr_of(aggregate_failed),
        'awoooi_backup_last_run_failed_count{host="110",exported_job="backup_all"}',
        "BackupAggregateRunFailed expr",
    )
    report.append("OK BackupAggregateRunFailed filters exported_job=backup_all")

    # BackupConfigCapturePartial: annotations must name target and source.
    capture_partial = _require_alert(alerts, "BackupConfigCapturePartial")
    _require_contains(expr_of(capture_partial), "awoooi_backup_config_capture_ok", "BackupConfigCapturePartial expr")
    capture_partial_text = _annotation_text(capture_partial)
    for placeholder in ("$labels.target", "$labels.source"):
        _require_contains(capture_partial_text, placeholder, "BackupConfigCapturePartial annotations")
    report.append("OK BackupConfigCapturePartial uses target/source labels")

    # BackupConfigCaptureStatusStale: only the expression is checked.
    capture_stale = _require_alert(alerts, "BackupConfigCaptureStatusStale")
    _require_contains(
        expr_of(capture_stale),
        "awoooi_backup_config_capture_status_timestamp",
        "BackupConfigCaptureStatusStale expr",
    )
    report.append("OK BackupConfigCaptureStatusStale checks config capture status timestamp")

    # BackupScriptMissing: only the annotations are checked.
    script_missing = _require_alert(alerts, "BackupScriptMissing")
    _require_contains(_annotation_text(script_missing), "$labels.script", "BackupScriptMissing annotations")
    report.append("OK BackupScriptMissing uses script label")

    # BackupCredentialEscrowEvidenceMissing: only the annotations are checked.
    escrow_missing = _require_alert(alerts, "BackupCredentialEscrowEvidenceMissing")
    _require_contains(
        _annotation_text(escrow_missing),
        "$labels.item",
        "BackupCredentialEscrowEvidenceMissing annotations",
    )
    report.append("OK BackupCredentialEscrowEvidenceMissing uses item label")

    return report
|
|
|
|
|
|
def _prom_query(base_url: str, expr: str) -> list[dict[str, Any]]:
    """Send *expr* to the ``/api/v1/query`` endpoint and return the result rows.

    Args:
        base_url: Prometheus base URL; trailing slashes are stripped.
        expr: Query expression, URL-encoded as the ``query`` parameter.

    Returns:
        The ``data.result`` list of the response, or an empty list when it
        is absent or empty.

    Raises:
        ContractError: If the response ``status`` is not ``"success"``.
    """
    endpoint = f"{base_url.rstrip('/')}/api/v1/query"
    params = urllib.parse.urlencode({"query": expr})
    with urllib.request.urlopen(f"{endpoint}?{params}", timeout=8) as response:
        raw_body = response.read()
    payload = json.loads(raw_body.decode("utf-8"))
    if payload.get("status") == "success":
        return payload.get("data", {}).get("result") or []
    raise ContractError(f"Prometheus query failed for {expr}: {payload}")
|
|
|
|
|
|
def _prom_rules(base_url: str) -> list[dict[str, Any]]:
    """Fetch ``/api/v1/rules`` and flatten the groups into rule summaries.

    Args:
        base_url: Prometheus base URL; trailing slashes are stripped.

    Returns:
        One dict per named rule with the string fields ``name``, ``health``,
        ``state`` and ``group``. Rules with neither a ``name`` nor an
        ``alert`` value are skipped; absent fields become empty strings.

    Raises:
        ContractError: If the response ``status`` is not ``"success"``.
    """
    with urllib.request.urlopen(f"{base_url.rstrip('/')}/api/v1/rules", timeout=8) as response:
        raw_body = response.read()
    payload = json.loads(raw_body.decode("utf-8"))
    if payload.get("status") != "success":
        raise ContractError(f"Prometheus rules query failed: {payload}")

    summaries: list[dict[str, Any]] = []
    for group in payload.get("data", {}).get("groups") or []:
        for rule in group.get("rules") or []:
            name = rule.get("name") or rule.get("alert")
            if name:
                summaries.append(
                    {
                        "name": str(name),
                        "health": str(rule.get("health", "")),
                        "state": str(rule.get("state", "")),
                        "group": str(group.get("name", "")),
                    }
                )
    return summaries
|
|
|
|
|
|
def _require_live_label(base_url: str, expr: str, labels: set[str]) -> str:
    """Check that the first series returned for *expr* carries all *labels*.

    Only the first result row is inspected.

    Args:
        base_url: Prometheus base URL.
        expr: Query expression selecting the metric to inspect.
        labels: Label names that must be present on the series.

    Returns:
        An ``"OK live ..."`` line naming the verified labels.

    Raises:
        ContractError: If the query returns no series, or the first series
            lacks one or more of *labels*.
    """
    series = _prom_query(base_url, expr)
    if not series:
        raise ContractError(f"Prometheus query returned no series: {expr}")
    metric = series[0].get("metric") or {}
    wanted = sorted(labels)
    missing = [label for label in wanted if label not in metric]
    if missing:
        raise ContractError(f"{expr} missing labels {missing}; labels={sorted(metric)}")
    return f"OK live {expr} exposes labels {','.join(wanted)}"
|
|
|
|
|
|
def _require_live_rules(base_url: str, expected_alerts: list[str]) -> list[str]:
    """Check that every expected alert rule is loaded and not unhealthy.

    Args:
        base_url: Prometheus base URL.
        expected_alerts: Alert names that must appear among the loaded rules.

    Returns:
        Two ``"OK live ..."`` lines: the number of expected rules, and a
        per-state count of those rules (an empty state counts as
        ``unknown``).

    Raises:
        ContractError: If an expected rule is not loaded, or an expected
            rule reports a health value other than ``""`` or ``"ok"``.
    """
    # When a rule name repeats, the last summary returned wins.
    loaded = {rule["name"]: rule for rule in _prom_rules(base_url)}
    wanted = set(expected_alerts)

    missing = sorted(wanted - set(loaded))
    if missing:
        raise ContractError(f"Prometheus missing loaded backup alert rules: {missing}")

    unhealthy: list[str] = []
    for rule in loaded.values():
        if rule["name"] not in wanted:
            continue
        if rule["health"] in {"", "ok"}:
            continue
        unhealthy.append(f"{rule['name']} health={rule['health']} group={rule['group']}")
    if unhealthy:
        raise ContractError(f"Prometheus backup alert rule health is not ok: {unhealthy}")

    tally: dict[str, int] = {}
    for name in expected_alerts:
        state = loaded[name]["state"] or "unknown"
        tally[state] = tally.get(state, 0) + 1
    state_summary = ",".join(f"{state}={count}" for state, count in sorted(tally.items()))

    return [
        f"OK live Prometheus loaded {len(expected_alerts)} baseline backup alert rules",
        f"OK live Prometheus backup alert rule states {state_summary}",
    ]
|
|
|
|
|
|
def live_check(base_url: str, baseline_path: Path) -> list[str]:
    """Validate the label contract against a running Prometheus.

    Each metric selector below is queried in order and must expose the
    listed labels; afterwards the baseline's alert rules must be loaded.

    Args:
        base_url: Prometheus base URL.
        baseline_path: Baseline YAML file listing the expected backup alerts.

    Returns:
        One ``"OK live ..."`` line per label check, followed by the lines
        produced by the loaded-rules check.

    Raises:
        ContractError: Propagated from the first check that fails.
    """
    label_contract: tuple[tuple[str, set[str]], ...] = (
        (
            'awoooi_backup_job_configured{host="110"}',
            {"exported_job", "host", "job"},
        ),
        (
            'awoooi_backup_job_fresh{host="110"}',
            {"exported_job", "host", "job", "source", "target", "max_age_hours"},
        ),
        (
            'awoooi_backup_last_run_failed_count{host="110"}',
            {"exported_job", "host", "job"},
        ),
        (
            'awoooi_backup_dr_next_step_info{host="110"}',
            {"host", "next_step"},
        ),
        (
            'awoooi_backup_offsite_partial_fresh{host="110",provider="rclone"}',
            {"host", "provider", "scope", "max_age_hours"},
        ),
        (
            'awoooi_backup_config_capture_ok{host="110"}',
            {"host", "target", "source", "critical"},
        ),
    )
    report = [
        _require_live_label(base_url, selector, required)
        for selector, required in label_contract
    ]
    report.extend(_require_live_rules(base_url, _expected_backup_alerts(baseline_path)))
    return report
|
|
|
|
|
|
def main() -> int:
    """Run the contract checks from the command line.

    The static check always runs; the live check runs only when
    ``--prometheus-url`` is non-empty. Result lines are printed as each
    check completes.

    Returns:
        ``0`` after printing ``BACKUP_ALERT_LABEL_CONTRACT_OK``, or ``1``
        after printing ``BACKUP_ALERT_LABEL_CONTRACT_FAILED ...`` to stderr.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--rules", type=Path, default=DEFAULT_RULES)
    cli.add_argument("--baseline", type=Path, default=DEFAULT_BASELINE)
    cli.add_argument("--prometheus-url", default="")
    options = cli.parse_args()

    # Failures reported as a contract failure; anything else propagates.
    reportable = (ContractError, OSError, yaml.YAMLError, json.JSONDecodeError)
    try:
        for line in static_check(options.rules, options.baseline):
            print(line)
        if options.prometheus_url:
            for line in live_check(options.prometheus_url, options.baseline):
                print(line)
    except reportable as failure:
        print(f"BACKUP_ALERT_LABEL_CONTRACT_FAILED {failure}", file=sys.stderr)
        return 1
    else:
        print("BACKUP_ALERT_LABEL_CONTRACT_OK")
        return 0
|
|
|
|
|
|
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|