Files
ewoooc/services/ai_automation_metrics.py
OoO e6a1c9d09f
All checks were successful
CD Pipeline / deploy (push) Successful in 1m15s
補齊 AI 自動化可觀測性指標
2026-04-29 23:41:03 +08:00

114 lines
3.3 KiB
Python

"""In-process AI automation metrics for Prometheus export.
These counters are intentionally lightweight and dependency-free. The `/metrics`
route converts snapshots into Prometheus gauges so EventRouter and AutoHeal do
not need to know about Flask or prometheus_client.
"""
from __future__ import annotations
import threading
from collections import defaultdict
from copy import deepcopy
from typing import Any, Dict, Tuple
_LOCK = threading.Lock()
_COUNTERS: Dict[Tuple[str, Tuple[Tuple[str, str], ...]], int] = defaultdict(int)
_LATENCY: Dict[Tuple[str, Tuple[Tuple[str, str], ...]], Dict[str, float]] = defaultdict(
lambda: {"count": 0, "sum": 0.0, "max": 0.0}
)
def _safe_label(value: Any, default: str = "unknown", limit: int = 80) -> str:
text = str(value if value not in (None, "") else default)
return text[:limit]
def _key(metric: str, labels: Dict[str, Any]) -> Tuple[str, Tuple[Tuple[str, str], ...]]:
    """Build the hashable identity of one metric series.

    Args:
        metric: Metric name.
        labels: Label name -> raw value; each value is passed through
            ``_safe_label``.

    Returns:
        ``(metric, pairs)`` where ``pairs`` is a tuple of ``(name, value)``
        label pairs in sorted order, so the insertion order of ``labels``
        does not affect the key.
    """
    pairs = [(name, _safe_label(raw)) for name, raw in labels.items()]
    pairs.sort()
    return metric, tuple(pairs)
def _inc(metric: str, labels: Dict[str, Any], amount: int = 1) -> None:
    """Add ``amount`` to the counter series named by ``metric`` and ``labels``.

    Args:
        metric: Counter name.
        labels: Label name -> raw value identifying the series.
        amount: Increment; coerced with ``int()``.

    Raises:
        TypeError, ValueError: If ``amount`` cannot be converted by ``int()``.
    """
    # Resolve the key and the delta before taking the lock. Building the key
    # stringifies label values (arbitrary ``__str__`` code) and ``_LOCK`` is
    # not reentrant; a failing ``int()`` must also not leave behind a
    # zero-valued series created by the defaultdict lookup.
    series = _key(metric, labels)
    delta = int(amount)
    with _LOCK:
        _COUNTERS[series] += delta
def _observe(metric: str, labels: Dict[str, Any], value: float) -> None:
    """Fold one sample into the count/sum/max aggregate of a latency series.

    Args:
        metric: Metric name.
        labels: Label name -> raw value identifying the series.
        value: Observed sample; coerced with ``float()``.

    Raises:
        TypeError, ValueError: If ``value`` cannot be converted by ``float()``.
    """
    # Convert once, before touching shared state: if the conversion fails the
    # bucket must not end up with ``count`` incremented but ``sum`` unchanged.
    # The key is built outside the lock too, since it stringifies label values.
    series = _key(metric, labels)
    sample = float(value)
    with _LOCK:
        bucket = _LATENCY[series]
        bucket["count"] += 1
        bucket["sum"] += sample
        bucket["max"] = max(bucket["max"], sample)
def record_event_router_dispatch(
    *,
    tier: str,
    event_type: str,
    delivered: bool,
    queued: bool = False,
    deduped: bool = False,
    silenced: bool = False,
    latency_ms: int | float = 0,
) -> None:
    """Record one dispatch: bump its outcome counter and observe its latency.

    The outcome label is the first truthy flag in the order ``silenced``,
    ``deduped``, ``queued``, ``delivered``; when none is set it is
    ``"failed"``.

    Args:
        tier: Value of the ``tier`` label.
        event_type: Value of the ``event_type`` label.
        delivered: Selects the ``"delivered"`` outcome.
        queued: Selects the ``"queued"`` outcome.
        deduped: Selects the ``"deduped"`` outcome.
        silenced: Selects the ``"silenced"`` outcome.
        latency_ms: Sample added to ``event_router_latency_ms``.
    """
    # Highest-priority flag first; the first truthy entry names the outcome.
    precedence = (
        (silenced, "silenced"),
        (deduped, "deduped"),
        (queued, "queued"),
        (delivered, "delivered"),
    )
    outcome = next((name for flag, name in precedence if flag), "failed")

    # The latency series is keyed without the outcome label.
    series = {"tier": tier, "event_type": event_type}
    _inc("event_router_dispatch_total", {**series, "outcome": outcome})
    _observe("event_router_latency_ms", series, latency_ms)
def record_event_router_safe_action(action: str, status: str) -> None:
    """Increment ``event_router_safe_action_total`` for an action/status pair.

    Args:
        action: Value of the ``action`` label.
        status: Value of the ``status`` label.
    """
    labels = {"action": action, "status": status}
    _inc("event_router_safe_action_total", labels)
def record_event_router_replay(*, attempted: int = 0, sent: int = 0, failed: int = 0, dropped: int = 0) -> None:
    """Add replay tallies to ``event_router_replay_total``, one series per status.

    Falsy tallies (such as ``0``) are skipped, so no series is touched for
    them.

    Args:
        attempted: Amount added under ``status="attempted"``.
        sent: Amount added under ``status="sent"``.
        failed: Amount added under ``status="failed"``.
        dropped: Amount added under ``status="dropped"``.
    """
    tallies = (
        ("attempted", attempted),
        ("sent", sent),
        ("failed", failed),
        ("dropped", dropped),
    )
    for status, tally in tallies:
        if not tally:
            continue
        _inc("event_router_replay_total", {"status": status}, int(tally))
def record_autoheal_action(
    *,
    action: str,
    error_type: str,
    success: bool,
    duration_ms: int | float = 0,
) -> None:
    """Record one AutoHeal action: bump its result counter and observe its duration.

    Args:
        action: Value of the ``action`` label.
        error_type: Value of the ``error_type`` label.
        success: Truthy maps to ``result="success"``, falsy to
            ``result="failed"``.
        duration_ms: Sample added to ``autoheal_duration_ms``.
    """
    result = "success" if success else "failed"
    # The duration series is keyed without the result label.
    series = {"action": action, "error_type": error_type}
    _inc("autoheal_action_total", dict(series, result=result))
    _observe("autoheal_duration_ms", series, duration_ms)
def snapshot() -> Dict[str, Any]:
    """Return a point-in-time copy of every counter and latency series.

    Returns:
        A dict with two entries: ``"counters"`` maps each series key to its
        total, and ``"latency"`` maps each series key to its own
        ``{"count", "sum", "max"}`` dict. Both are plain dicts detached from
        the live stores, so callers may read or mutate them freely.
    """
    with _LOCK:
        return {
            "counters": dict(_COUNTERS),
            # Buckets hold only scalars, so a shallow copy per bucket already
            # detaches the result and keeps the time under the lock short.
            "latency": {series: dict(bucket) for series, bucket in _LATENCY.items()},
        }
def reset_for_tests() -> None:
    """Clear every recorded counter and latency series while holding the lock."""
    with _LOCK:
        for store in (_COUNTERS, _LATENCY):
            store.clear()