243 lines
9.1 KiB
Python
Executable File
243 lines
9.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Verify live visibility for backup gap alerts.
|
|
|
|
This read-only check closes the gap between "metrics exist" and "alerts are
|
|
actually visible". If the offsite or credential-escrow gap metrics are present,
|
|
the corresponding Prometheus firing alerts must be visible. When Alertmanager is
|
|
provided, those same alerts must also be active there.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import time
|
|
import urllib.parse
|
|
import urllib.request
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
|
|
class VisibilityError(RuntimeError):
    """Signal that a live visibility check could not be satisfied.

    Raised by the helpers in this script for failed or malformed
    Prometheus/Alertmanager responses and for required alerts that are absent.
    """
|
|
|
|
|
|
@dataclass(frozen=True)
class RequiredAlert:
    """Describe one alert that must be visible: its name and expected labels."""

    # Value compared against the alert's ``alertname`` label.
    name: str
    # Label key/value pairs the alert must carry; extra labels are permitted.
    labels: dict[str, str]
|
|
|
|
|
|
# Labels merged into every RequiredAlert built by this script; callers
# override "host" (and add "item" where relevant) on top of these defaults.
COMMON_LABELS = dict(
    host="110",
    auto_repair="false",
    alert_category="infrastructure",
    notification_type="TYPE-1",
    severity="warning",
)
|
|
|
|
|
|
def _json_get(url: str, timeout: int) -> Any:
|
|
with urllib.request.urlopen(url, timeout=timeout) as response:
|
|
return json.loads(response.read().decode("utf-8"))
|
|
|
|
|
|
def _prom_query(base_url: str, expr: str, timeout: int) -> list[dict[str, Any]]:
    """Query Prometheus' ``/api/v1/query`` endpoint and return the result rows.

    Args:
        base_url: Prometheus base URL; trailing slashes are stripped.
        expr: Query expression, URL-encoded into the ``query`` parameter.
        timeout: Timeout in seconds forwarded to ``_json_get``.

    Returns:
        The list stored under ``data.result``, or an empty list when that
        entry is missing or empty.

    Raises:
        VisibilityError: If the body is not a JSON object, its ``status`` is
            not ``"success"``, or ``data`` is not a JSON object.
    """
    query = urllib.parse.urlencode({"query": expr})
    url = f"{base_url.rstrip('/')}/api/v1/query?{query}"
    payload = _json_get(url, timeout)
    # Check the shape first: calling .get() on a list/str/None body would raise
    # AttributeError, which callers do not treat as a visibility failure.
    if not isinstance(payload, dict) or payload.get("status") != "success":
        raise VisibilityError(f"Prometheus query failed for {expr}: {payload}")
    data = payload.get("data", {})
    if not isinstance(data, dict):
        raise VisibilityError(f"Prometheus query returned unexpected data for {expr}: {payload}")
    return data.get("result") or []
|
|
|
|
|
|
def _prom_alerts(base_url: str, timeout: int) -> list[dict[str, Any]]:
    """Fetch the alert list from Prometheus' ``/api/v1/alerts`` endpoint.

    Args:
        base_url: Prometheus base URL; trailing slashes are stripped.
        timeout: Timeout in seconds forwarded to ``_json_get``.

    Returns:
        The list stored under ``data.alerts``, or an empty list when that
        entry is missing or empty.

    Raises:
        VisibilityError: If the body is not a JSON object, its ``status`` is
            not ``"success"``, or ``data`` is not a JSON object.
    """
    url = f"{base_url.rstrip('/')}/api/v1/alerts"
    payload = _json_get(url, timeout)
    # Check the shape first: calling .get() on a list/str/None body would raise
    # AttributeError, which callers do not treat as a visibility failure.
    if not isinstance(payload, dict) or payload.get("status") != "success":
        raise VisibilityError(f"Prometheus alerts query failed: {payload}")
    data = payload.get("data", {})
    if not isinstance(data, dict):
        raise VisibilityError(f"Prometheus alerts query returned unexpected data: {payload}")
    return data.get("alerts") or []
|
|
|
|
|
|
def _alertmanager_alerts(base_url: str, timeout: int) -> list[dict[str, Any]]:
    """Fetch the alert list from Alertmanager's ``/api/v2/alerts`` endpoint.

    Args:
        base_url: Alertmanager base URL; trailing slashes are stripped.
        timeout: Timeout in seconds forwarded to ``_json_get``.

    Returns:
        The decoded JSON list exactly as returned by the endpoint.

    Raises:
        VisibilityError: If the decoded body is not a list.
    """
    endpoint = base_url.rstrip("/") + "/api/v2/alerts"
    body = _json_get(endpoint, timeout)
    if isinstance(body, list):
        return body
    raise VisibilityError(f"Alertmanager alerts query returned unexpected payload: {body}")
|
|
|
|
|
|
def _float_value(row: dict[str, Any], expr: str) -> float:
|
|
value = row.get("value")
|
|
if not isinstance(value, list) or len(value) < 2:
|
|
raise VisibilityError(f"Prometheus query returned unexpected value for {expr}: {row}")
|
|
try:
|
|
return float(value[1])
|
|
except (TypeError, ValueError) as exc:
|
|
raise VisibilityError(f"Prometheus query returned non-numeric value for {expr}: {row}") from exc
|
|
|
|
|
|
def _metric_labels(row: dict[str, Any]) -> dict[str, str]:
|
|
metric = row.get("metric") or {}
|
|
return {str(key): str(value) for key, value in metric.items()}
|
|
|
|
|
|
def _labels_match(actual: dict[str, str], expected: dict[str, str]) -> bool:
|
|
return all(actual.get(key) == value for key, value in expected.items())
|
|
|
|
|
|
def _find_prom_alert(alerts: list[dict[str, Any]], required: RequiredAlert) -> dict[str, Any] | None:
|
|
expected = {"alertname": required.name, **required.labels}
|
|
for alert in alerts:
|
|
if str(alert.get("state", "")) != "firing":
|
|
continue
|
|
labels = {str(key): str(value) for key, value in (alert.get("labels") or {}).items()}
|
|
if _labels_match(labels, expected):
|
|
return alert
|
|
return None
|
|
|
|
|
|
def _find_alertmanager_alert(alerts: list[dict[str, Any]], required: RequiredAlert) -> dict[str, Any] | None:
|
|
expected = {"alertname": required.name, **required.labels}
|
|
for alert in alerts:
|
|
status = alert.get("status") or {}
|
|
if str(status.get("state", "")) != "active":
|
|
continue
|
|
labels = {str(key): str(value) for key, value in (alert.get("labels") or {}).items()}
|
|
if _labels_match(labels, expected):
|
|
return alert
|
|
return None
|
|
|
|
|
|
def _require_prom_alert(alerts: list[dict[str, Any]], required: RequiredAlert) -> None:
    """Ensure *required* is among the firing Prometheus alerts.

    Raises:
        VisibilityError: If ``_find_prom_alert`` finds no matching alert.
    """
    match = _find_prom_alert(alerts, required)
    if match is not None:
        return
    raise VisibilityError(
        f"missing Prometheus firing alert {required.name} with labels {required.labels}"
    )
|
|
|
|
|
|
def _require_alertmanager_alert(alerts: list[dict[str, Any]], required: RequiredAlert) -> None:
    """Ensure *required* is among the active Alertmanager alerts.

    Raises:
        VisibilityError: If ``_find_alertmanager_alert`` finds no matching alert.
    """
    match = _find_alertmanager_alert(alerts, required)
    if match is not None:
        return
    raise VisibilityError(
        f"missing Alertmanager active alert {required.name} with labels {required.labels}"
    )
|
|
|
|
|
|
def _sum_query_values(prometheus_url: str, expr: str, timeout: int) -> float:
    """Run *expr* against Prometheus and add up the samples of every returned row.

    An empty result yields ``0``.
    """
    total = 0
    for row in _prom_query(prometheus_url, expr, timeout):
        total = total + _float_value(row, expr)
    return total
|
|
|
|
|
|
def _max_query_value(prometheus_url: str, expr: str, timeout: int) -> float:
    """Run *expr* against Prometheus and return the largest sample.

    Args:
        prometheus_url: Prometheus base URL forwarded to ``_prom_query``.
        expr: Query expression to evaluate.
        timeout: Timeout in seconds forwarded to ``_prom_query``.

    Returns:
        The maximum sample across all returned rows, or ``0.0`` when the query
        returns no rows.
    """
    rows = _prom_query(prometheus_url, expr, timeout)
    # default=0.0 keeps the empty-result fallback a float, matching the
    # annotated return type (it still compares equal to, and is as falsy as, 0).
    return max((_float_value(row, expr) for row in rows), default=0.0)
|
|
|
|
|
|
def _offsite_required_alerts(prometheus_url: str, host: str, timeout: int) -> tuple[list[RequiredAlert], str]:
    """Decide which offsite-backup gap alert, if any, must currently be visible.

    The decision is made from Prometheus metrics for *host*, in this order:

    1. ``awoooi_backup_offsite_configured`` sums to 0: require
       ``BackupOffsiteCopyNotConfigured``.
    2. ``awoooi_backup_offsite_fresh`` sums to more than 0: nothing required.
    3. ``awoooi_backup_offsite_full_sync_enabled`` sums to more than 0 and the
       newest ``..._enabled_timestamp`` is at most 30 hours old: nothing
       required yet.
    4. Otherwise: require ``BackupOffsiteCopyStale``.

    Returns:
        A ``(required_alerts, report_line)`` pair.

    Raises:
        VisibilityError: If the "configured" query returns no series.
    """
    selector = '{host="' + host + '"}'
    grace_seconds = 30 * 3600

    configured_expr = "awoooi_backup_offsite_configured" + selector
    configured_rows = _prom_query(prometheus_url, configured_expr, timeout)
    if not configured_rows:
        raise VisibilityError(f"Prometheus query returned no offsite configured series: {configured_expr}")

    if sum(_float_value(row, configured_expr) for row in configured_rows) == 0:
        not_configured = RequiredAlert("BackupOffsiteCopyNotConfigured", dict(COMMON_LABELS, host=host))
        return (
            [not_configured],
            "OK offsite gap metric requires BackupOffsiteCopyNotConfigured visibility",
        )

    fresh_total = _sum_query_values(prometheus_url, "awoooi_backup_offsite_fresh" + selector, timeout)
    if fresh_total > 0:
        return [], "OK offsite full marker is fresh; no offsite gap alert required"

    sync_enabled = _sum_query_values(
        prometheus_url, "awoooi_backup_offsite_full_sync_enabled" + selector, timeout
    )
    if sync_enabled > 0:
        enabled_at = _max_query_value(
            prometheus_url, "awoooi_backup_offsite_full_sync_enabled_timestamp" + selector, timeout
        )
        # A zero/absent timestamp grants no grace period.
        if enabled_at:
            enabled_age = int(time.time() - enabled_at)
            if enabled_age <= grace_seconds:
                return (
                    [],
                    f"OK offsite full sync enabled within grace window; BackupOffsiteCopyStale not required yet age_seconds={enabled_age}",
                )

    stale = RequiredAlert("BackupOffsiteCopyStale", dict(COMMON_LABELS, host=host))
    return (
        [stale],
        "OK offsite full marker gap requires BackupOffsiteCopyStale visibility",
    )
|
|
|
|
|
|
def _escrow_required_alerts(prometheus_url: str, host: str, timeout: int) -> list[RequiredAlert]:
    """Build the escrow alerts that must be visible for *host*.

    Queries ``awoooi_backup_credential_escrow_fresh{host=...} == 0`` and emits
    one ``BackupCredentialEscrowEvidenceMissing`` requirement per returned
    row, keyed by the row's ``item`` label.

    Returns:
        Required alerts sorted by their ``item`` label; empty when the query
        returns no rows.

    Raises:
        VisibilityError: If a returned row has no (or an empty) ``item`` label.
    """
    expr = f'awoooi_backup_credential_escrow_fresh{{host="{host}"}} == 0'
    gap_items: list[str] = []
    for row in _prom_query(prometheus_url, expr, timeout):
        item = _metric_labels(row).get("item")
        if not item:
            raise VisibilityError(f"Credential escrow gap metric missing item label: {row}")
        gap_items.append(item)

    gap_items.sort()
    return [
        RequiredAlert(
            "BackupCredentialEscrowEvidenceMissing",
            {**COMMON_LABELS, "host": host, "item": item},
        )
        for item in gap_items
    ]
|
|
|
|
|
|
def live_check(prometheus_url: str, alertmanager_url: str, host: str, timeout: int) -> list[str]:
    """Verify that every required backup gap alert is visible, and report.

    Args:
        prometheus_url: Prometheus base URL used for metric and alert queries.
        alertmanager_url: Alertmanager base URL; when empty the Alertmanager
            check is skipped.
        host: Value of the ``host`` label to check.
        timeout: Timeout in seconds for each HTTP request.

    Returns:
        Human-readable ``OK ...`` report lines, in the order the checks ran.

    Raises:
        VisibilityError: Propagated from the helpers when a metric is missing
            or malformed, or a required alert is not visible.
    """
    offsite_alerts, offsite_line = _offsite_required_alerts(prometheus_url, host, timeout)
    escrow_alerts = _escrow_required_alerts(prometheus_url, host, timeout)

    if escrow_alerts:
        escrow_items = ", ".join(alert.labels["item"] for alert in escrow_alerts)
        escrow_line = f"OK credential escrow gap metrics require {len(escrow_alerts)} alert(s): {escrow_items}"
    else:
        escrow_line = "OK credential escrow markers are fresh; no escrow gap alert required"

    required_alerts: list[RequiredAlert] = [*offsite_alerts, *escrow_alerts]
    report: list[str] = [offsite_line, escrow_line]

    firing = _prom_alerts(prometheus_url, timeout)
    for required in required_alerts:
        _require_prom_alert(firing, required)
    report.append(f"OK Prometheus exposes {len(required_alerts)} required backup gap firing alert(s)")

    if not alertmanager_url:
        return report

    active = _alertmanager_alerts(alertmanager_url, timeout)
    for required in required_alerts:
        _require_alertmanager_alert(active, required)
    report.append(f"OK Alertmanager exposes {len(required_alerts)} required backup gap active alert(s)")
    return report
|
|
|
|
|
|
def main() -> int:
    """Parse command-line arguments, run the live check, and report the outcome.

    Returns:
        ``0`` after printing every report line followed by
        ``BACKUP_ALERT_LIVE_VISIBILITY_OK``; ``1`` after printing a
        ``BACKUP_ALERT_LIVE_VISIBILITY_FAILED`` line to stderr.
    """
    # Local import: needed only for the exception tuple below.
    import http.client

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--prometheus-url", required=True)
    parser.add_argument("--alertmanager-url", default="")
    parser.add_argument("--host", default="110")
    parser.add_argument("--timeout", type=int, default=8)
    args = parser.parse_args()

    try:
        for line in live_check(args.prometheus_url, args.alertmanager_url, args.host, args.timeout):
            print(line)
    except (VisibilityError, OSError, ValueError, http.client.HTTPException) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError (both
        # subclasses) as well as the ValueError urllib raises for a URL with
        # no scheme or an out-of-range timeout. HTTPException covers protocol
        # failures such as a bad status line or a truncated body, which are
        # not OSError. All of them must produce the failure marker instead of
        # a bare traceback.
        print(f"BACKUP_ALERT_LIVE_VISIBILITY_FAILED {exc}", file=sys.stderr)
        return 1

    print("BACKUP_ALERT_LIVE_VISIBILITY_OK")
    return 0
|
|
|
|
|
|
# Script entry point: exit the process with main()'s return code.
if __name__ == "__main__":
    sys.exit(main())
|