feat(governance): surface adr100 slo states
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 1m0s
CD Pipeline / build-and-deploy (push) Successful in 4m0s
CD Pipeline / post-deploy-checks (push) Successful in 1m55s

This commit is contained in:
Your Name
2026-05-14 19:57:32 +08:00
parent 6c16a7b162
commit 809bc9670b
7 changed files with 559 additions and 40 deletions

View File

@@ -20,6 +20,7 @@ from __future__ import annotations
import structlog
from fastapi import APIRouter, Query
from src.services.adr100_slo_status_service import get_adr100_slo_status_service
from src.services.ai_slo_calculator import AiSloCalculator
logger = structlog.get_logger(__name__)
@@ -50,9 +51,11 @@ async def get_ai_slo(
if cached:
data = cached.to_dict()
data["cache_hit"] = True
data["adr100"] = await get_adr100_slo_status_service().fetch_report()
return data
report = await calc.run()
data = report.to_dict()
data["cache_hit"] = False
data["adr100"] = await get_adr100_slo_status_service().fetch_report()
return data

View File

@@ -0,0 +1,278 @@
"""
Read-only ADR-100 SLO status snapshot.
GovernanceAgent.check_slo_compliance() can emit governance alerts when an SLO is
violated. This service is intentionally read-only so dashboards can show the
same Prometheus-backed state without producing Telegram/DB side effects.
"""
from __future__ import annotations
import math
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any
import httpx
import structlog
from src.core.config import settings
logger = structlog.get_logger(__name__)
@dataclass(frozen=True)
class Adr100SloDefinition:
    """Immutable description of one ADR-100 SLO and how it is evaluated.

    The first seven fields are required. The three denominator-related
    fields are optional and only matter when ``denominator_query`` is set.
    """

    # Identifier reported as the metric's "name" in the status payload.
    name: str
    # PromQL expression whose first result sample is the SLI value.
    query: str
    # Value at which the SLO counts as met.
    target: float
    # Value beyond which the SLO counts as violated.
    hard_red_line: float
    # "above" means higher is better; any other value is treated as
    # lower-is-better by the classifier.
    direction: str
    # Display unit, passed through to the payload unchanged.
    unit: str
    # Display window label, passed through to the payload unchanged.
    window: str
    # Optional PromQL expression used to gate evaluation on event volume.
    denominator_query: str | None = None
    # Multiplier applied to the denominator value to obtain a sample count.
    denominator_window_seconds: int = 0
    # Sample counts below this threshold skip evaluation.
    minimum_events: float = 1.0
# SLOs evaluated by Adr100SloStatusService, in the order they appear in the
# report. Entries with a denominator_query are gated on volume: the
# denominator value multiplied by denominator_window_seconds must reach
# minimum_events (default 1.0) before the SLI itself is queried.
ADR100_SLO_DEFINITIONS: tuple[Adr100SloDefinition, ...] = (
    # Ratio SLI; the `sli:*` name is presumably a Prometheus recording rule
    # (confirm against the rule files).
    Adr100SloDefinition(
        name="autonomy_rate",
        query="sli:autonomy_rate:5m",
        target=0.80,
        hard_red_line=0.70,
        direction="above",
        unit="percent",
        window="5m",
        denominator_query="sum(rate(automation_operation_log_total[5m]))",
        denominator_window_seconds=300,
    ),
    # Ratio SLI gated on the rate of operations with outcome="auto_executed".
    Adr100SloDefinition(
        name="decision_accuracy",
        query="sli:decision_accuracy:5m",
        target=0.90,
        hard_red_line=0.85,
        direction="above",
        unit="percent",
        window="5m",
        denominator_query='sum(rate(automation_operation_log_total{outcome="auto_executed"}[5m]))',
        denominator_window_seconds=300,
    ),
    # Ratio SLI over a 1h window, gated on high-confidence approval records.
    Adr100SloDefinition(
        name="confidence_calibration",
        query="sli:confidence_calibration:1h",
        target=0.80,
        hard_red_line=0.70,
        direction="above",
        unit="percent",
        window="1h",
        denominator_query="sum(rate(approval_records_high_confidence_total[1h]))",
        denominator_window_seconds=3600,
    ),
    # Count SLI with no denominator_query, so no low-volume gate applies and
    # the value itself is reported as the sample count.
    Adr100SloDefinition(
        name="km_growth_rate",
        query="max(knowledge_entries_created_24h) or max(sli:km_growth_rate:24h)",
        target=20.0,
        hard_red_line=5.0,
        direction="above",
        unit="count",
        window="24h",
    ),
)
class Adr100SloStatusService:
    """Fetch ADR-100 SLO status from Prometheus without writing governance events."""

    async def fetch_report(self) -> dict[str, Any]:
        """Query every configured SLO and return an aggregate status report.

        Returns:
            A dict holding the schema version, source, evaluation timestamp
            (UTC, ISO 8601), overall status, overall compliance (share of
            evaluable metrics whose status is "ok", or ``None`` when nothing
            is evaluable), the evaluable and total metric counts, and the
            per-metric payloads.
        """
        base_url = getattr(
            settings,
            "PROMETHEUS_URL",
            "http://prometheus.observability.svc:9090",
        ).rstrip("/")

        async with httpx.AsyncClient(timeout=5.0) as client:
            # Definitions are queried one at a time, in declaration order.
            metrics: list[dict[str, Any]] = [
                await self._fetch_metric(client, base_url, definition)
                for definition in ADR100_SLO_DEFINITIONS
            ]

        evaluable = [entry for entry in metrics if entry.get("evaluable")]
        ok_total = len([entry for entry in evaluable if entry.get("status") == "ok"])
        if evaluable:
            overall_compliance = ok_total / len(evaluable)
        else:
            overall_compliance = None
        overall_status = _overall_status(metrics, evaluable)

        return {
            "schema_version": "adr100_slo_status_v1",
            "source": "prometheus",
            "evaluated_at": datetime.now(UTC).isoformat(),
            "overall_status": overall_status,
            "overall_compliance": overall_compliance,
            "evaluable_count": len(evaluable),
            "metric_count": len(metrics),
            "metrics": metrics,
        }

    async def _fetch_metric(
        self,
        client: httpx.AsyncClient,
        prom_url: str,
        definition: Adr100SloDefinition,
    ) -> dict[str, Any]:
        """Evaluate a single SLO definition and return its payload.

        When the definition has a denominator query it is fetched first. A
        failed denominator lookup yields "no_data", and a sample count below
        ``minimum_events`` yields "skipped_low_volume"; in both cases the SLI
        query is not issued.

        Args:
            client: HTTP client used for the Prometheus requests.
            prom_url: Prometheus base URL without a trailing slash.
            definition: The SLO to evaluate.

        Returns:
            The metric payload produced by ``_metric_payload``.
        """
        denominator_value: float | None = None
        sample_count: float | None = None

        if definition.denominator_query:
            denominator = await _query_prometheus_value(
                client,
                prom_url,
                definition.denominator_query,
            )
            if denominator["status"] != "ok":
                return _metric_payload(
                    definition,
                    value=None,
                    status="no_data",
                    reason=denominator["reason"],
                    denominator_value=None,
                    sample_count=None,
                )

            denominator_value = float(denominator["value"])
            # Scale the denominator value by the window length to get a count.
            sample_count = denominator_value * definition.denominator_window_seconds
            if sample_count < definition.minimum_events:
                return _metric_payload(
                    definition,
                    value=None,
                    status="skipped_low_volume",
                    reason="denominator_below_minimum_events",
                    denominator_value=denominator_value,
                    sample_count=sample_count,
                )

        sli = await _query_prometheus_value(client, prom_url, definition.query)
        if sli["status"] == "ok":
            value = float(sli["value"])
            status = _classify_status(value, definition)
            return _metric_payload(
                definition,
                value=value,
                status=status,
                reason=None,
                denominator_value=denominator_value,
                # Without a denominator the value doubles as the sample count.
                sample_count=value if sample_count is None else sample_count,
            )

        # A NaN/Inf SLI is reported as low volume; anything else is "no_data".
        if sli["reason"] == "prometheus_nan_or_inf":
            status = "skipped_low_volume"
        else:
            status = "no_data"
        return _metric_payload(
            definition,
            value=None,
            status=status,
            reason=sli["reason"],
            denominator_value=denominator_value,
            sample_count=sample_count,
        )
async def _query_prometheus_value(
client: httpx.AsyncClient,
prom_url: str,
query: str,
) -> dict[str, Any]:
try:
response = await client.get(
f"{prom_url}/api/v1/query",
params={"query": query},
)
data = response.json()
if data.get("status") != "success":
return {"status": "error", "reason": "prometheus_query_failed"}
results = data.get("data", {}).get("result", [])
if not results:
return {
"status": "no_data",
"reason": "prometheus_empty_result_metric_not_emitted",
}
raw_value = results[0]["value"][1]
value = float(raw_value)
if not math.isfinite(value):
return {
"status": "skipped",
"reason": "prometheus_nan_or_inf",
"raw_value": raw_value,
}
return {"status": "ok", "value": value}
except Exception as exc:
logger.warning("adr100_slo_prometheus_query_error", query=query, error=str(exc))
return {"status": "error", "reason": "prometheus_query_error"}
def _metric_payload(
definition: Adr100SloDefinition,
*,
value: float | None,
status: str,
reason: str | None,
denominator_value: float | None,
sample_count: float | None,
) -> dict[str, Any]:
return {
"name": definition.name,
"query": definition.query,
"value": value,
"target": definition.target,
"hard_red_line": definition.hard_red_line,
"direction": definition.direction,
"unit": definition.unit,
"window": definition.window,
"status": status,
"evaluable": status in {"ok", "warning", "violated"},
"reason": reason,
"denominator_query": definition.denominator_query,
"denominator_value": denominator_value,
"sample_count": sample_count,
}
def _classify_status(value: float, definition: Adr100SloDefinition) -> str:
if definition.direction == "above":
if value < definition.hard_red_line:
return "violated"
if value < definition.target:
return "warning"
return "ok"
if value > definition.hard_red_line:
return "violated"
if value > definition.target:
return "warning"
return "ok"
def _overall_status(metrics: list[dict[str, Any]], evaluable: list[dict[str, Any]]) -> str:
if any(metric.get("status") == "violated" for metric in metrics):
return "violated"
if any(metric.get("status") == "warning" for metric in metrics):
return "warning"
if evaluable and any(metric.get("status") == "skipped_low_volume" for metric in metrics):
return "partial"
if evaluable:
return "ok"
if any(metric.get("status") == "no_data" for metric in metrics):
return "no_data"
return "skipped_low_volume"
# Process-wide instance, created on first request.
_adr100_slo_status_service: Adr100SloStatusService | None = None


def get_adr100_slo_status_service() -> Adr100SloStatusService:
    """Return the shared Adr100SloStatusService, creating it on first use."""
    global _adr100_slo_status_service
    service = _adr100_slo_status_service
    if service is None:
        service = Adr100SloStatusService()
        _adr100_slo_status_service = service
    return service