114 lines
3.3 KiB
Python
114 lines
3.3 KiB
Python
"""In-process AI automation metrics for Prometheus export.
|
|
|
|
These counters are intentionally lightweight and dependency-free. The `/metrics`
|
|
route converts snapshots into Prometheus gauges so EventRouter and AutoHeal do
|
|
not need to know about Flask or prometheus_client.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import threading
|
|
from collections import defaultdict
|
|
from copy import deepcopy
|
|
from typing import Any, Dict, Tuple
|
|
|
|
|
|
_LOCK = threading.Lock()
|
|
_COUNTERS: Dict[Tuple[str, Tuple[Tuple[str, str], ...]], int] = defaultdict(int)
|
|
_LATENCY: Dict[Tuple[str, Tuple[Tuple[str, str], ...]], Dict[str, float]] = defaultdict(
|
|
lambda: {"count": 0, "sum": 0.0, "max": 0.0}
|
|
)
|
|
|
|
|
|
def _safe_label(value: Any, default: str = "unknown", limit: int = 80) -> str:
|
|
text = str(value if value not in (None, "") else default)
|
|
return text[:limit]
|
|
|
|
|
|
def _key(metric: str, labels: Dict[str, Any]) -> Tuple[str, Tuple[Tuple[str, str], ...]]:
    """Build the hashable registry key for *metric* with the given *labels*.

    Each label value is normalised through ``_safe_label`` and the resulting
    (name, value) pairs are sorted. The same label set therefore maps to the
    same key regardless of the mapping's insertion order.
    """
    normalised = [(name, _safe_label(raw)) for name, raw in labels.items()]
    normalised.sort()
    return metric, tuple(normalised)
|
|
|
|
|
|
def _inc(metric: str, labels: Dict[str, Any], amount: int = 1) -> None:
    """Add *amount* to the counter identified by *metric* and *labels*.

    Args:
        metric: Counter name, e.g. ``"event_router_dispatch_total"``.
        labels: Label mapping; converted to a registry key by ``_key``.
        amount: Increment, coerced with ``int()`` (floats are truncated).

    Raises:
        TypeError / ValueError: if *amount* cannot be converted with ``int()``.
            The registry is left untouched in that case.
    """
    # Build the key and coerce the amount before taking the lock. In the
    # previous one-liner ``_COUNTERS[key] += int(amount)``, the defaultdict
    # subscript inserted a 0 entry before ``int(amount)`` ran. A bad amount
    # therefore raised but still left a phantom zero-valued series behind.
    key = _key(metric, labels)
    delta = int(amount)
    with _LOCK:
        _COUNTERS[key] += delta
|
|
|
|
|
|
def _observe(metric: str, labels: Dict[str, Any], value: float) -> None:
    """Fold one sample into the running count/sum/max aggregate of a series.

    Args:
        metric: Series name, e.g. ``"event_router_latency_ms"``.
        labels: Label mapping; converted to a registry key by ``_key``.
        value: The sample, coerced with ``float()``.

    Raises:
        TypeError / ValueError: if *value* cannot be converted with ``float()``.
            The conversion happens before the aggregate is touched, so a bad
            sample no longer bumps ``count`` without contributing to ``sum``.
    """
    key = _key(metric, labels)
    # Convert once and up front. Previously ``float(value)`` ran after
    # ``bucket["count"] += 1``, so a failing conversion left count and sum
    # out of step and corrupted any average derived from them.
    sample = float(value)
    with _LOCK:
        bucket = _LATENCY[key]
        bucket["count"] += 1
        bucket["sum"] += sample
        # NOTE: buckets start with max 0.0, so a series of only negative
        # samples reports a max of 0.0.
        bucket["max"] = max(bucket["max"], sample)
|
|
|
|
|
|
def record_event_router_dispatch(
    *,
    tier: str,
    event_type: str,
    delivered: bool,
    queued: bool = False,
    deduped: bool = False,
    silenced: bool = False,
    latency_ms: int | float = 0,
) -> None:
    """Count one EventRouter dispatch and record its latency.

    The ``outcome`` label is taken from the first truthy flag, in priority
    order ``silenced`` > ``deduped`` > ``queued`` > ``delivered``. When none
    of these flags is set, the dispatch is counted as ``"failed"``.

    Latency is observed on every call, whatever the outcome, and is keyed
    only by tier and event type.
    """
    precedence = (
        (silenced, "silenced"),
        (deduped, "deduped"),
        (queued, "queued"),
        (delivered, "delivered"),
    )
    # ``next`` stops at the first truthy flag, mirroring an if/elif chain.
    outcome = next((name for flag, name in precedence if flag), "failed")

    series = {"tier": tier, "event_type": event_type}
    _inc("event_router_dispatch_total", {**series, "outcome": outcome})
    _observe("event_router_latency_ms", series, latency_ms)
|
|
|
|
|
|
def record_event_router_safe_action(action: str, status: str) -> None:
    """Count one EventRouter safe action, labelled by *action* and *status*."""
    label_set = {"action": action, "status": status}
    _inc("event_router_safe_action_total", label_set)
|
|
|
|
|
|
def record_event_router_replay(
    *, attempted: int = 0, sent: int = 0, failed: int = 0, dropped: int = 0
) -> None:
    """Record per-status totals from an EventRouter replay.

    Each truthy argument is added, via ``int()``, to
    ``event_router_replay_total`` under a ``status`` label named after that
    argument. Zero values are skipped, so no series is created for them.
    """
    tallies = (
        ("attempted", attempted),
        ("sent", sent),
        ("failed", failed),
        ("dropped", dropped),
    )
    for status, count in tallies:
        if not count:
            continue
        _inc("event_router_replay_total", {"status": status}, int(count))
|
|
|
|
|
|
def record_autoheal_action(
    *,
    action: str,
    error_type: str,
    success: bool,
    duration_ms: int | float = 0,
) -> None:
    """Count one AutoHeal action and record how long it took.

    The counter carries a ``result`` label (``"success"`` or ``"failed"``).
    The duration series is keyed only by action and error type.
    """
    if success:
        result = "success"
    else:
        result = "failed"

    series = {"action": action, "error_type": error_type}
    _inc("autoheal_action_total", {**series, "result": result})
    _observe("autoheal_duration_ms", series, duration_ms)
|
|
|
|
|
|
def snapshot() -> Dict[str, Any]:
    """Return a point-in-time copy of every counter and latency aggregate.

    Returns:
        ``{"counters": {key: int}, "latency": {key: {"count", "sum", "max"}}}``.
        Here ``key`` is the ``(metric, sorted label pairs)`` tuple built by
        ``_key``. Both the outer mappings and each latency bucket are fresh
        copies, so later updates do not leak into a snapshot already returned.
    """
    with _LOCK:
        counters = dict(_COUNTERS)
        # Bucket values are plain numbers, so a one-level copy is fully independent.
        latency = {series: dict(stats) for series, stats in _LATENCY.items()}
    return {"counters": counters, "latency": latency}
|
|
|
|
|
|
def reset_for_tests() -> None:
    """Discard every recorded counter and latency aggregate (test helper)."""
    with _LOCK:
        for registry in (_COUNTERS, _LATENCY):
            registry.clear()
|