Files
awoooi/scripts/ops/recovery-scorecard-contract-check.py

250 lines
8.9 KiB
Python
Executable File

#!/usr/bin/env python3
"""Validate recovery scorecard recording-rule contract."""
from __future__ import annotations
import argparse
import json
import re
import sys
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
# PyYAML is optional. When it cannot be imported, ``yaml`` is None and the
# regex-based fallback parsers in this module are used instead.
try:
    import yaml
except ModuleNotFoundError:  # pragma: no cover - exercised on lean operator hosts
    yaml = None

# YAML parse errors that count as recoverable; empty when PyYAML is absent.
YAML_ERROR_TYPES: tuple[type[BaseException], ...] = (
    () if yaml is None else (yaml.YAMLError,)
)
# Default value of --rules: the Prometheus rule file inspected by static_check.
# Relative path, so it is resolved against the current working directory.
DEFAULT_RULES = Path("ops/monitoring/alerts-unified.yml")
# Default value of --baseline: the file whose
# monitoring_contract.prometheus_recording_rules list names the expected rules.
DEFAULT_BASELINE = Path("ops/reboot-recovery/full-stack-backup-baseline.yml")
# Selectors queried by live_check; each must return exactly one series whose
# value is 0 or 1.
EXPECTED_CORE = 'awoooi_recovery_core_ready{host="110",scope="110_120_121_188"}'
EXPECTED_DR = 'awoooi_recovery_dr_offsite_ready{host="110"}'
class ContractError(RuntimeError):
    """Raised when the recovery scorecard contract is violated or cannot be checked."""
# Exception types that main() reports as RECOVERY_SCORECARD_CONTRACT_FAILED
# (exit status 1) instead of letting them surface as a traceback.
RECOVERABLE_ERRORS = (ContractError, OSError, json.JSONDecodeError) + YAML_ERROR_TYPES
# "- record: <name>" list item; captures the item's indentation and the name.
_RECORD_RE = re.compile(r"^(?P<indent>\s*)-\s+record:\s*(?P<record>.+?)\s*$")
# Start of any rule list item ("- record:" or "- alert:"); used to detect where
# the current rule ends.
_RULE_START_RE = re.compile(r"^(?P<indent>\s*)-\s+(?:record|alert):\s*.+$")
# "expr:" key on its own line; "tail" is whatever follows the colon (an inline
# expression or a block scalar header).
_EXPR_RE = re.compile(r"^(?P<indent>\s*)expr:\s*(?P<tail>.*)$")
# "prometheus_recording_rules:" key with no inline value.
_PROM_RULES_RE = re.compile(r"^(?P<indent>\s*)prometheus_recording_rules:\s*$")
# Any "- <value>" list item; captures indentation and the value text.
_LIST_ITEM_RE = re.compile(r"^(?P<indent>\s*)-\s+(?P<value>.+?)\s*$")
def _strip_yaml_scalar(value: str) -> str:
return value.strip().strip('"').strip("'")
def _indent_width(line: str) -> int:
return len(line) - len(line.lstrip(" "))
# Header of a YAML block scalar: literal (|) or folded (>) style, with optional
# chomping (+/-) and indentation (1-9) indicators in either order, optionally
# followed by a comment.
_BLOCK_SCALAR_HEADER_RE = re.compile(r"[|>](?:[+-][1-9]?|[1-9][+-]?)?(?:\s+#.*)?")


def _fallback_rules(path: Path) -> list[dict[str, Any]]:
    """Extract recording rules from a rule file without using PyYAML.

    This is a line-oriented scan, not a YAML parser. A rule starts at a
    ``- record: <name>`` list item and runs until the next ``- record:`` or
    ``- alert:`` item at the same or a shallower indentation. Inside a rule,
    an ``expr:`` line is read in one of two ways:

    * inline value: the text after ``expr:`` with surrounding quotes stripped;
    * block scalar (``|`` or ``>`` with any chomping/indentation indicator):
      the following lines that are blank or indented deeper than ``expr:``,
      joined with newlines and kept verbatim (no dedent, no folding).

    Args:
        path: Rule file to scan; read as UTF-8.

    Returns:
        One dict per recording rule with a ``"record"`` key and, when an
        ``expr:`` line was found, an ``"expr"`` key.

    Raises:
        ContractError: If the file contains no ``- record:`` item.
    """
    lines = path.read_text(encoding="utf-8").splitlines()
    total = len(lines)
    rules: list[dict[str, Any]] = []
    index = 0
    while index < total:
        record_match = _RECORD_RE.match(lines[index])
        index += 1
        if record_match is None:
            continue
        record_indent = len(record_match.group("indent"))
        rule: dict[str, Any] = {"record": _strip_yaml_scalar(record_match.group("record"))}
        while index < total:
            line = lines[index]
            next_rule = _RULE_START_RE.match(line)
            if next_rule and len(next_rule.group("indent")) <= record_indent:
                # Leave index on the next rule so the outer loop re-examines it.
                break
            index += 1
            expr_match = _EXPR_RE.match(line)
            if expr_match is None:
                continue
            expr_indent = len(expr_match.group("indent"))
            tail = expr_match.group("tail").strip()
            if not _BLOCK_SCALAR_HEADER_RE.fullmatch(tail):
                rule["expr"] = _strip_yaml_scalar(tail)
                continue
            # Block scalar: collect the body until the indentation drops back
            # to the level of "expr:" or a new rule begins.
            block: list[str] = []
            while index < total:
                body_line = lines[index]
                body_rule = _RULE_START_RE.match(body_line)
                if body_rule and len(body_rule.group("indent")) <= record_indent:
                    break
                if body_line.strip() and _indent_width(body_line) <= expr_indent:
                    break
                block.append(body_line)
                index += 1
            rule["expr"] = "\n".join(block)
        rules.append(rule)
    if not rules:
        raise ContractError(f"missing recording rules in {path}")
    return rules
def _fallback_expected_recording_rules(path: Path) -> list[str]:
    """Read the expected recording-rule names from the baseline without PyYAML.

    Scans for a ``prometheus_recording_rules:`` key and collects the ``- item``
    lines that belong to it. Items may be indented deeper than the key or sit
    at the same indentation as the key; both layouts are valid YAML block
    sequences. Blank lines and comment-only lines are ignored. The list ends
    at the first line that is shallower than the key, or at the key's own
    indentation without being a list item.

    If a matching key yields no items, the scan continues with the next
    occurrence of the key.

    Args:
        path: Baseline file to scan; read as UTF-8.

    Returns:
        The item values with surrounding quotes stripped, in file order.

    Raises:
        ContractError: If no occurrence of the key has any list item.
    """
    lines = path.read_text(encoding="utf-8").splitlines()
    for index, line in enumerate(lines):
        key_match = _PROM_RULES_RE.match(line)
        if key_match is None:
            continue
        key_indent = len(key_match.group("indent"))
        rules: list[str] = []
        for child in lines[index + 1:]:
            stripped = child.strip()
            if not stripped or stripped.startswith("#"):
                continue
            child_indent = _indent_width(child)
            item_match = _LIST_ITEM_RE.match(child)
            if child_indent < key_indent:
                break
            if child_indent == key_indent and item_match is None:
                # A sibling key at the same level ends the sequence.
                break
            if item_match:
                rules.append(_strip_yaml_scalar(item_match.group("value")))
        if rules:
            return rules
    raise ContractError(f"missing monitoring_contract.prometheus_recording_rules in {path}")
def _rules(path: Path) -> list[dict[str, Any]]:
    """Load every rule from all groups of a Prometheus rule file.

    Uses PyYAML when it is importable and otherwise delegates to
    ``_fallback_rules``. With PyYAML, the result holds the entries of every
    group's ``rules`` list in file order; an empty document or one without
    groups gives an empty list.

    Args:
        path: Rule file to load; read as UTF-8.

    Returns:
        The rule mappings found in the file.

    Raises:
        ContractError: If the document, ``groups``, a group, a group's
            ``rules`` or a rule does not have the expected mapping/list shape.
    """
    if yaml is None:
        return _fallback_rules(path)
    data = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
    if not isinstance(data, dict):
        raise ContractError(f"expected a mapping at the top level of {path}")
    groups = data.get("groups") or []
    if not isinstance(groups, list):
        raise ContractError(f"expected 'groups' to be a list in {path}")
    rules: list[dict[str, Any]] = []
    for group in groups:
        if not isinstance(group, dict):
            raise ContractError(f"expected every rule group to be a mapping in {path}")
        group_rules = group.get("rules") or []
        if not isinstance(group_rules, list):
            raise ContractError(f"expected 'rules' to be a list in {path}")
        for rule in group_rules:
            # Callers use rule.get(...), so reject non-mappings here with a clear error.
            if not isinstance(rule, dict):
                raise ContractError(f"expected every rule to be a mapping in {path}")
            rules.append(rule)
    return rules
def _expected_recording_rules(path: Path) -> list[str]:
    """Return the recording-rule names listed in the baseline file.

    Reads ``monitoring_contract.prometheus_recording_rules`` with PyYAML when
    it is importable and otherwise delegates to
    ``_fallback_expected_recording_rules``.

    Args:
        path: Baseline file to load; read as UTF-8.

    Returns:
        The listed names converted to ``str``, in file order.

    Raises:
        ContractError: If the key path is absent, null, empty, or is not a list.
    """
    if yaml is None:
        return _fallback_expected_recording_rules(path)
    data = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
    # dict.get(key, {}) still returns None for an explicit "key: null", so each
    # level is checked for being a mapping before descending into it.
    contract = data.get("monitoring_contract") if isinstance(data, dict) else None
    rules = contract.get("prometheus_recording_rules") if isinstance(contract, dict) else None
    if not rules or not isinstance(rules, list):
        raise ContractError(f"missing monitoring_contract.prometheus_recording_rules in {path}")
    return [str(rule) for rule in rules]
def static_check(rules_path: Path, baseline_path: Path) -> list[str]:
    """Verify the rule file against the baseline's recording-rule contract.

    Checks, in order:

    1. every rule name listed in the baseline exists as a recording rule, and
       so do the two scorecard rules inspected below;
    2. the ``awoooi_recovery_core_ready`` expression mentions each cold-start
       metric it must depend on;
    3. the ``awoooi_recovery_dr_offsite_ready`` expression mentions each
       offsite-backup metric it must depend on.

    The dependency checks are plain substring tests on the expression text.

    Args:
        rules_path: Prometheus rule file.
        baseline_path: Baseline file listing the expected recording rules.

    Returns:
        Human-readable "OK ..." lines, one per check.

    Raises:
        ContractError: On the first check that fails.
    """
    required_metrics = {
        "awoooi_recovery_core_ready": (
            "awoooi_cold_start_last_result",
            "awoooi_cold_start_warn_gates",
            "awoooi_cold_start_blocked_gates",
            "awoooi_cold_start_last_green_timestamp",
        ),
        "awoooi_recovery_dr_offsite_ready": (
            "awoooi_backup_offsite_configured",
            "awoooi_backup_offsite_fresh",
            "awoooi_backup_credential_escrow_fresh",
        ),
    }
    rules = _rules(rules_path)
    by_record = {str(rule.get("record")): rule for rule in rules if rule.get("record")}
    expected = _expected_recording_rules(baseline_path)
    # The scorecard rules are looked up below, so require them even when the
    # baseline does not list them; otherwise the lookup would raise KeyError.
    missing = sorted((set(expected) | set(required_metrics)) - set(by_record))
    if missing:
        raise ContractError(f"alerts-unified.yml missing recovery recording rules: {missing}")
    for record, metrics in required_metrics.items():
        expr = str(by_record[record].get("expr", ""))
        for required in metrics:
            if required not in expr:
                raise ContractError(f"{record} expr missing {required}")
    return [
        "OK alerts-unified.yml contains every recovery scorecard recording rule",
        "OK recovery core rule depends on cold-start green/warn/blocked/last-green metrics",
        "OK recovery DR rule depends on provider-neutral offsite freshness and credential escrow freshness",
    ]
def _prom_query(base_url: str, expr: str) -> list[dict[str, Any]]:
    """Run an instant query against Prometheus and return its result rows.

    Sends a GET to ``<base_url>/api/v1/query`` with *expr* URL-encoded as the
    ``query`` parameter, using an 8-second timeout.

    Args:
        base_url: Prometheus base URL; trailing slashes are ignored.
        expr: Query expression.

    Returns:
        The ``data.result`` list of the response, or an empty list when it is
        missing, null or empty.

    Raises:
        ContractError: If the body is not UTF-8, the payload is not a JSON
            object, its ``status`` is not ``"success"``, or ``data.result``
            is present but not a list.
        json.JSONDecodeError: If the body is not valid JSON.

    Errors raised by ``urllib.request.urlopen`` propagate unchanged.
    """
    query = urllib.parse.urlencode({"query": expr})
    url = f"{base_url.rstrip('/')}/api/v1/query?{query}"
    with urllib.request.urlopen(url, timeout=8) as response:
        body = response.read()
    try:
        text = body.decode("utf-8")
    except UnicodeDecodeError as exc:
        raise ContractError(f"Prometheus returned a non-UTF-8 response for {expr}") from exc
    payload = json.loads(text)
    if not isinstance(payload, dict) or payload.get("status") != "success":
        raise ContractError(f"Prometheus query failed for {expr}: {payload}")
    # "data" may be absent or an explicit null; treat both as "no result".
    data = payload.get("data")
    result = data.get("result") if isinstance(data, dict) else None
    if not result:
        return []
    if not isinstance(result, list):
        raise ContractError(f"Prometheus query returned malformed result for {expr}: {result}")
    return result
def _single_value(base_url: str, expr: str) -> float:
    """Query *expr* and return its single sample, which must be 0 or 1.

    Args:
        base_url: Prometheus base URL passed through to ``_prom_query``.
        expr: Query expression expected to match exactly one series.

    Returns:
        The sample value as a float, either 0.0 or 1.0.

    Raises:
        ContractError: If the query does not return exactly one series, the
            series has no ``[timestamp, value]`` pair, the value is not
            numeric, or the value is neither 0 nor 1.
    """
    series = _prom_query(base_url, expr)
    series_count = len(series)
    if series_count != 1:
        raise ContractError(f"Prometheus query expected one series for {expr}, got {series_count}")
    row = series[0]
    sample = row.get("value") or []
    if len(sample) < 2:
        raise ContractError(f"Prometheus query returned malformed value for {expr}: {row}")
    raw_value = sample[1]
    try:
        number = float(raw_value)
    except (TypeError, ValueError) as exc:
        raise ContractError(f"Prometheus query returned non-numeric value for {expr}: {row}") from exc
    if number != 0.0 and number != 1.0:
        raise ContractError(f"Prometheus recovery scorecard metric must be 0 or 1: {expr}={number}")
    return number
def live_check(
    base_url: str,
    expect_core_ready: bool = False,
    expect_dr_ready: bool = False,
) -> list[str]:
    """Read both recovery scorecard metrics from a live Prometheus.

    The core metric is queried first, then the DR metric.

    Args:
        base_url: Prometheus base URL.
        expect_core_ready: When true, fail unless the core metric equals 1.
        expect_dr_ready: When true, fail unless the DR offsite metric equals 1.

    Returns:
        Two "OK live ..." lines reporting the core and DR values as integers.

    Raises:
        ContractError: If either query fails validation, or a metric that is
            expected to be ready is not 1. The core expectation is checked
            before the DR one.
    """
    core = _single_value(base_url, EXPECTED_CORE)
    dr = _single_value(base_url, EXPECTED_DR)
    report = [
        f"OK live {selector} value={int(value)}"
        for selector, value in ((EXPECTED_CORE, core), (EXPECTED_DR, dr))
    ]
    if core != 1.0 and expect_core_ready:
        raise ContractError(f"expected core recovery ready, got {core}")
    if dr != 1.0 and expect_dr_ready:
        raise ContractError(f"expected DR offsite ready, got {dr}")
    return report
def main() -> int:
    """Parse the command line, run the checks and report the outcome.

    The static check always runs. The live check runs only when
    ``--prometheus-url`` is non-empty. Each "OK" line is printed to stdout as
    soon as its check returns.

    Returns:
        0 after printing ``RECOVERY_SCORECARD_CONTRACT_OK``, or 1 after
        printing ``RECOVERY_SCORECARD_CONTRACT_FAILED <error>`` to stderr when
        one of ``RECOVERABLE_ERRORS`` is raised.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--rules", type=Path, default=DEFAULT_RULES)
    parser.add_argument("--baseline", type=Path, default=DEFAULT_BASELINE)
    parser.add_argument("--prometheus-url", default="")
    for flag in ("--expect-core-ready", "--expect-dr-ready"):
        parser.add_argument(flag, action="store_true")
    args = parser.parse_args()

    try:
        for line in static_check(args.rules, args.baseline):
            print(line)
        if args.prometheus_url:
            live_lines = live_check(
                args.prometheus_url,
                args.expect_core_ready,
                args.expect_dr_ready,
            )
            for line in live_lines:
                print(line)
    except RECOVERABLE_ERRORS as exc:
        print(f"RECOVERY_SCORECARD_CONTRACT_FAILED {exc}", file=sys.stderr)
        return 1
    else:
        print("RECOVERY_SCORECARD_CONTRACT_OK")
        return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())