diff --git a/CONSTITUTION.md b/CONSTITUTION.md index c1874b2..e8d0861 100644 --- a/CONSTITUTION.md +++ b/CONSTITUTION.md @@ -2,7 +2,7 @@ > 本文件定義專案開發的核心準則與不可違反的規範 > **建立日期**: 2026-01-12 -> **當前版本**: V10.4 (四 AI Agent 自動化治理版) +> **當前版本**: V10.5 (四 AI Agent 自動化可觀測性版) > **最後更新**: 2026-04-29 --- diff --git a/TODO_NEXT_STEPS.txt b/TODO_NEXT_STEPS.txt index 9559adc..70e139c 100644 --- a/TODO_NEXT_STEPS.txt +++ b/TODO_NEXT_STEPS.txt @@ -7,12 +7,13 @@ - ADR-018:四 AI Agent 自動化控制面立案。 - Memory:新增 `docs/memory/ai_automation_closure_20260429.md`。 - Guide/Skills 替代:新增 `docs/guides/ai_automation_session_sop.md`。 - - SOT:更新 `docs/AI_INTELLIGENCE_MODULE_SOT.md` 至 V10.4 AI Automation 架構。 + - SOT:更新 `docs/AI_INTELLIGENCE_MODULE_SOT.md` 至 V10.5 AI Automation Metrics 架構。 - Codex 規則:更新 `AGENTS.md`、`CONSTITUTION.md`、ADR/memory 索引。 + - Prometheus 指標化:新增 EventRouter / AutoHeal / safe action / replay in-process metrics,並接入 `/metrics`。 【下次待辦】 - - Prometheus / Superset 指標化 `agent_action_total`、`agent_latency_seconds`、notification replay count。 - 補線上 smoke dashboard:EventRouter、AutoHeal、NemoTron fallback、OpenClaw embedding queue、ElephantAlpha HITL。 + - Superset / Grafana 視覺化:`momo_ai_event_router_dispatch_total`、`momo_ai_event_router_latency_ms_*`、`momo_ai_autoheal_action_total`。 ================================================================================ 品牌資產最終處理與維護 (Phase 7) [DONE] diff --git a/app.py b/app.py index 2d2109e..feb5271 100644 --- a/app.py +++ b/app.py @@ -95,9 +95,9 @@ except Exception as e: sys_log.error(f"無法檢測磁碟空間: {e}") # 🚩 系統版本定義 (備份與顯示用) -# 🚩 2026-04-29 V10.4: 四 AI Agent 自動化閉環 — EventRouter / AutoHeal / -# OpenClaw Memory / ElephantAlpha bridge 文件與安全規則同步 -SYSTEM_VERSION = "V10.4" +# 🚩 2026-04-29 V10.5: AI 自動化可觀測性 — EventRouter / AutoHeal / +# safe action / Telegram replay metrics 接入 /metrics +SYSTEM_VERSION = "V10.5" # ========================================== # 🔒 SQL Injection 防護函數 diff --git a/config.py b/config.py index 116c131..31ca607 100644 --- a/config.py +++ b/config.py @@ -253,7 +253,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '') # ========================================== # 系統版本與路徑 # ========================================== -SYSTEM_VERSION = "V10.4" +SYSTEM_VERSION = "V10.5" LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log') public_url = PUBLIC_URL # 用於模板顯示 diff --git a/docs/AI_INTELLIGENCE_MODULE_SOT.md b/docs/AI_INTELLIGENCE_MODULE_SOT.md index 8823aad..75f44bf 100644 --- a/docs/AI_INTELLIGENCE_MODULE_SOT.md +++ b/docs/AI_INTELLIGENCE_MODULE_SOT.md @@ -1,8 +1,8 @@ # MOMO PRO — AI 競價情報模組 Single Source of Truth > **最後更新**: 2026-04-29 (台北時間) -> **狀態**: 🟢 四 AI Agent 自動化閉環已落地 — EventRouter / AutoHeal / OpenClaw Memory / ElephantAlpha bridge 具測試覆蓋 -> **適用版本**: V10.4 AI Automation 架構 +> **狀態**: 🟢 四 AI Agent 自動化閉環已落地 — EventRouter / AutoHeal / OpenClaw Memory / ElephantAlpha bridge / Prometheus metrics 具測試覆蓋 +> **適用版本**: V10.5 AI Automation Metrics 架構 --- @@ -55,6 +55,14 @@ SQL漏斗(~300筆) - raw `ai_insights` insert 必須接 `enqueue_insight_embedding()` 或可被 backfill。 - ElephantAlpha 只做編排與 bridge,不可繞過 ADR-011 / ADR-012 / ADR-013。 +可觀測性: + +- `/metrics` 匯出 `momo_ai_event_router_dispatch_total`。 +- `/metrics` 匯出 `momo_ai_event_router_latency_ms_count/sum/max`。 +- `/metrics` 匯出 `momo_ai_event_router_safe_action_total`。 +- `/metrics` 匯出 `momo_ai_event_router_replay_total`。 +- `/metrics` 匯出 `momo_ai_autoheal_action_total` 與 `momo_ai_autoheal_duration_ms_count/sum/max`。 + --- ## 二、真實資料庫 Schema(已校對確認) diff --git a/docs/adr/ADR-012-agent-action-ladder.md b/docs/adr/ADR-012-agent-action-ladder.md index ba41d77..6542b88 100644 --- a/docs/adr/ADR-012-agent-action-ladder.md +++ b/docs/adr/ADR-012-agent-action-ladder.md @@ -137,13 +137,14 @@ L1 Hermes 掛 → L0 模板直出 + 🟡 「AI 分析暫不可用」 - **Phase 2**:Hermes L1 接入 scheduler.run_auto_import_task + run_momo_task 兩個 exception - **Phase 3**:NemoTron 擴充 3 個新 tool (retry/query_km/silence) - **Phase 4**:依需求擴 L3 HITL 按鈕 -- **Phase 5**:Prometheus metric 接入(`agent_action_total{tier,agent,event_type}`、`agent_latency_seconds`) +- **Phase 5**:Prometheus metric 接入(`momo_ai_event_router_dispatch_total`、`momo_ai_event_router_latency_ms_*`) ## 2026-04-29 Implementation Update - Phase 1~3 核心已落地:EventRouter 分流、L2 safe action、NemoTron fallback、OpenClaw memory 寫入與通知 replay 已具測試覆蓋。 - L3 已擴展為 OpenClaw + ElephantAlpha:OpenClaw 負責策略/記憶,ElephantAlpha 負責 orchestration/HITL/AutoHeal bridge。 -- 尚未完成:Prometheus metrics 與完整線上 smoke dashboard。 +- 2026-04-29 已補 `/metrics` 匯出:EventRouter dispatch、L2 safe action、Telegram replay、AutoHeal action 與 latency/duration。 +- 尚未完成:完整線上 smoke dashboard 與 Grafana/Superset 視覺化面板。 ## References - `services/event_router.py` — 分流入口(Phase 1) diff --git a/docs/guides/ai_automation_session_sop.md b/docs/guides/ai_automation_session_sop.md index 4589d93..c189aa3 100644 --- a/docs/guides/ai_automation_session_sop.md +++ b/docs/guides/ai_automation_session_sop.md @@ -24,6 +24,7 @@ - EventRouter 失敗時必須降級到 Hermes rule / L0 template,不可中斷通知。 - Telegram 失敗必須可暫存與 replay。 +- EventRouter / AutoHeal 變更必須更新 `services/ai_automation_metrics.py` 指標或確認既有指標已覆蓋。 - L2 action 必須在 `SAFE_ACTIONS` 且可審計、可回放、低副作用。 - AutoHeal 不得 restart / stop / recreate `momo-db` 或 `momo-postgres`。 - raw `ai_insights` 寫入後必須 enqueue embedding;若 enqueue 失敗,必須可 backfill。 @@ -32,7 +33,7 @@ ## 收尾 checklist -- 相關測試至少覆蓋 EventRouter、AutoHeal、NemoTron fallback、OpenClaw embedding bridge、ElephantAlpha engine、agent_actions。 +- 相關測試至少覆蓋 EventRouter、AutoHeal、NemoTron fallback、OpenClaw embedding bridge、ElephantAlpha engine、agent_actions、AI automation metrics。 - 若有架構決策,新增 ADR 並更新 `docs/adr/README.md`。 - 若有長期實況,更新 `docs/memory/*.md` 與 `docs/memory/README.md`。 - 若 AI 架構事實改變,更新 `docs/AI_INTELLIGENCE_MODULE_SOT.md`。 diff --git a/docs/memory/ai_automation_closure_20260429.md b/docs/memory/ai_automation_closure_20260429.md index 856f6c0..80a3315 100644 --- a/docs/memory/ai_automation_closure_20260429.md +++ b/docs/memory/ai_automation_closure_20260429.md @@ -9,6 +9,7 @@ - AutoHeal 是自癒副作用入口;`momo-db` / `momo-postgres` 受保護,不可自動 restart / stop / recreate。 - OpenClaw learning 是 AI 記憶與 embedding queue 的橋接層;raw `ai_insights` 寫入者必須 enqueue embedding 或可回補。 - ElephantAlpha 只負責 orchestration / HITL / AutoHeal bridge,不可繞過 ADR-011、ADR-012、ADR-013。 +- AI 自動化最小 Prometheus 指標已接入 `/metrics`,來源為 `services/ai_automation_metrics.py`。 ## 已落地範圍 @@ -20,10 +21,12 @@ - Scheduler 重要失敗路徑接入 EventRouter,減少裸 exception 漏通知。 - ElephantAlpha 執行引擎補 sync timeout、HITL reply_markup、未知 step fail fast、code/resource action 走 AutoHeal bridge。 - L2 `agent_actions.py` 的 `flag_for_human_review`、`route_to_km`、`mark_for_relearn` 已從 stub 改為可審計 OpenClaw memory 寫入。 +- `/metrics` 已匯出 EventRouter dispatch、latency、safe action、Telegram replay、AutoHeal action 與 duration 指標。 ## 驗證紀錄 -- 2026-04-29 最後一批:`24 passed`。 +- 2026-04-29 AI metrics 批次:`26 passed`。 +- 2026-04-29 L2 安全記憶批次:`24 passed`。 - collect-only:`48 tests collected`。 - `git diff --check` 已通過。 @@ -38,6 +41,7 @@ - `d486598` 補強 ElephantAlpha 執行與通知閉環 - `5b25f55` 補齊 EventRouter 失敗通知回放 - `162a76b` 落地 L2 安全記憶動作 +- `d58e4d0` 同步四 Agent AI 自動化治理紀錄 ## 下次進場先看 diff --git a/docs/memory/history_logs.md b/docs/memory/history_logs.md index 8780075..f8e8f04 100644 --- a/docs/memory/history_logs.md +++ b/docs/memory/history_logs.md @@ -24,6 +24,7 @@ - **自癒安全回看**: AutoHeal 保護 `momo-db` / `momo-postgres`,DB/DNS 無法安全修復時降級為 alert / wait retry。 - **記憶閉環**: raw `ai_insights` insert 路徑補 embedding enqueue;OpenClaw learning 支援 stale reset 與 missing embedding backfill。 - **L2 action 落地**: `flag_for_human_review`、`route_to_km`、`mark_for_relearn` 改為可審計 OpenClaw memory 寫入。 +- **可觀測性落地**: `/metrics` 匯出 EventRouter dispatch/latency、safe action、Telegram replay、AutoHeal action/duration 指標。 ### 2026-04-28~29:Phase 3e 重構大戰 + daily_sales cache 隱形 bug 根除 - **app.py 縮減 -10.8%**: 7,386 → 6,590 行,11 commits 全綠零 502。 diff --git a/routes/system_public_routes.py b/routes/system_public_routes.py index 294be92..987e2ac 100644 --- a/routes/system_public_routes.py +++ b/routes/system_public_routes.py @@ -50,6 +50,7 @@ def prometheus_metrics(): """Prometheus 指標端點 - 供 Prometheus 抓取監控資料""" try: from prometheus_client import generate_latest, CONTENT_TYPE_LATEST, Gauge, CollectorRegistry + from services.ai_automation_metrics import snapshot as ai_metrics_snapshot registry = CollectorRegistry() @@ -87,6 +88,11 @@ def prometheus_metrics(): except Exception as e: sys_log.warning(f"[Metrics] 無法取得資料庫統計: {e}") + try: + _register_ai_automation_metrics(registry, Gauge, ai_metrics_snapshot()) + except Exception as e: + sys_log.warning(f"[Metrics] 無法取得 AI 自動化指標: {e}") + return Response(generate_latest(registry), mimetype=CONTENT_TYPE_LATEST) except ImportError: @@ -103,6 +109,73 @@ momo_app_info{{version="{SYSTEM_VERSION}",database_type="{DATABASE_TYPE}"}} 1 return Response(f"# Error: {e}\n", mimetype='text/plain; charset=utf-8'), 500 +def _labels_to_dict(labels): + return dict(labels) + + +def _register_ai_automation_metrics(registry, gauge_cls, metrics_snapshot): + """Export dependency-free AI metrics into a per-request Prometheus registry.""" + gauges = {} + + def get_gauge(name, help_text, label_names): + if name not in gauges: + gauges[name] = gauge_cls(name, help_text, label_names, registry=registry) + return gauges[name] + + definitions = { + "event_router_dispatch_total": ( + "momo_ai_event_router_dispatch_total", + "EventRouter dispatch count", + ["event_type", "outcome", "tier"], + ), + "event_router_safe_action_total": ( + "momo_ai_event_router_safe_action_total", + "EventRouter L2 safe action count", + ["action", "status"], + ), + "event_router_replay_total": ( + "momo_ai_event_router_replay_total", + "EventRouter queued Telegram replay count", + ["status"], + ), + "autoheal_action_total": ( + "momo_ai_autoheal_action_total", + "AutoHeal action count", + ["action", "error_type", "result"], + ), + } + + for (metric, labels), value in metrics_snapshot.get("counters", {}).items(): + if metric not in definitions: + continue + name, help_text, label_names = definitions[metric] + gauge = get_gauge(name, help_text, label_names) + label_values = _labels_to_dict(labels) + gauge.labels(**{name: label_values.get(name, "unknown") for name in label_names}).set(value) + + latency_defs = { + "event_router_latency_ms": ( + "momo_ai_event_router_latency_ms", + "EventRouter dispatch latency in milliseconds", + ["event_type", "tier"], + ), + "autoheal_duration_ms": ( + "momo_ai_autoheal_duration_ms", + "AutoHeal action duration in milliseconds", + ["action", "error_type"], + ), + } + + for (metric, labels), values in metrics_snapshot.get("latency", {}).items(): + if metric not in latency_defs: + continue + name, help_text, label_names = latency_defs[metric] + label_values = _labels_to_dict(labels) + for suffix in ("count", "sum", "max"): + gauge = get_gauge(f"{name}_{suffix}", f"{help_text} ({suffix})", label_names) + gauge.labels(**{name: label_values.get(name, "unknown") for name in label_names}).set(values.get(suffix, 0)) + + @system_public_bp.route('/settings') def settings(): """分類設定頁面""" diff --git a/services/ai_automation_metrics.py b/services/ai_automation_metrics.py new file mode 100644 index 0000000..3f0578c --- /dev/null +++ b/services/ai_automation_metrics.py @@ -0,0 +1,113 @@ +"""In-process AI automation metrics for Prometheus export. + +These counters are intentionally lightweight and dependency-free. The `/metrics` +route converts snapshots into Prometheus gauges so EventRouter and AutoHeal do +not need to know about Flask or prometheus_client. +""" + +from __future__ import annotations + +import threading +from collections import defaultdict +from copy import deepcopy +from typing import Any, Dict, Tuple + + +_LOCK = threading.Lock() +_COUNTERS: Dict[Tuple[str, Tuple[Tuple[str, str], ...]], int] = defaultdict(int) +_LATENCY: Dict[Tuple[str, Tuple[Tuple[str, str], ...]], Dict[str, float]] = defaultdict( + lambda: {"count": 0, "sum": 0.0, "max": 0.0} +) + + +def _safe_label(value: Any, default: str = "unknown", limit: int = 80) -> str: + text = str(value if value not in (None, "") else default) + return text[:limit] + + +def _key(metric: str, labels: Dict[str, Any]) -> Tuple[str, Tuple[Tuple[str, str], ...]]: + return metric, tuple(sorted((k, _safe_label(v)) for k, v in labels.items())) + + +def _inc(metric: str, labels: Dict[str, Any], amount: int = 1) -> None: + with _LOCK: + _COUNTERS[_key(metric, labels)] += int(amount) + + +def _observe(metric: str, labels: Dict[str, Any], value: float) -> None: + with _LOCK: + bucket = _LATENCY[_key(metric, labels)] + bucket["count"] += 1 + bucket["sum"] += float(value) + bucket["max"] = max(bucket["max"], float(value)) + + +def record_event_router_dispatch( + *, + tier: str, + event_type: str, + delivered: bool, + queued: bool = False, + deduped: bool = False, + silenced: bool = False, + latency_ms: int | float = 0, +) -> None: + if silenced: + outcome = "silenced" + elif deduped: + outcome = "deduped" + elif queued: + outcome = "queued" + elif delivered: + outcome = "delivered" + else: + outcome = "failed" + + labels = {"tier": tier, "event_type": event_type, "outcome": outcome} + _inc("event_router_dispatch_total", labels) + _observe("event_router_latency_ms", {"tier": tier, "event_type": event_type}, latency_ms) + + +def record_event_router_safe_action(action: str, status: str) -> None: + _inc("event_router_safe_action_total", {"action": action, "status": status}) + + +def record_event_router_replay(*, attempted: int = 0, sent: int = 0, failed: int = 0, dropped: int = 0) -> None: + for status, value in { + "attempted": attempted, + "sent": sent, + "failed": failed, + "dropped": dropped, + }.items(): + if value: + _inc("event_router_replay_total", {"status": status}, int(value)) + + +def record_autoheal_action( + *, + action: str, + error_type: str, + success: bool, + duration_ms: int | float = 0, +) -> None: + labels = { + "action": action, + "error_type": error_type, + "result": "success" if success else "failed", + } + _inc("autoheal_action_total", labels) + _observe("autoheal_duration_ms", {"action": action, "error_type": error_type}, duration_ms) + + +def snapshot() -> Dict[str, Any]: + with _LOCK: + return { + "counters": {key: value for key, value in _COUNTERS.items()}, + "latency": {key: deepcopy(value) for key, value in _LATENCY.items()}, + } + + +def reset_for_tests() -> None: + with _LOCK: + _COUNTERS.clear() + _LATENCY.clear() diff --git a/services/auto_heal_service.py b/services/auto_heal_service.py index 8ed9d2a..96f1bf1 100644 --- a/services/auto_heal_service.py +++ b/services/auto_heal_service.py @@ -25,6 +25,7 @@ from typing import Dict, Any, List, Optional from sqlalchemy import text from services.logger_manager import SystemLogger +from services.ai_automation_metrics import record_autoheal_action from database.manager import get_session logger = SystemLogger("AutoHealService").get_logger() @@ -228,11 +229,13 @@ class AutoHealService: if not playbook: msg = f"No playbook matched for error_type={error_type}" self._log.info("[AutoHeal] %s", msg) + record_autoheal_action(action="NO_PLAYBOOK", error_type=error_type, success=False) return AutoHealResult(success=False, action=None, message=msg) if playbook["action_type"] not in _ALLOWED_ACTION_TYPES: msg = f"action_type '{playbook['action_type']}' is not allowed" self._log.warning("[AutoHeal] %s", msg) + record_autoheal_action(action=playbook["action_type"], error_type=error_type, success=False) return AutoHealResult(success=False, action=playbook["action_type"], message=msg) # cooldown guard @@ -241,6 +244,7 @@ class AutoHealService: if last and (datetime.now().timestamp() - last) / 60 < playbook["cooldown_min"]: msg = f"Cooldown active for {playbook['action_type']}" self._log.info("[AutoHeal] %s", msg) + record_autoheal_action(action=playbook["action_type"], error_type=error_type, success=False) return AutoHealResult(success=False, action=playbook["action_type"], message=msg) if playbook["action_type"] == "CODE_FIX": @@ -248,6 +252,12 @@ class AutoHealService: result = self._handle_code_fix(playbook, context) duration_ms = (datetime.now() - started_at).total_seconds() * 1000 self._alert_and_store(playbook, context, result, duration_ms) + record_autoheal_action( + action=result.action or playbook["action_type"], + error_type=error_type, + success=result.success, + duration_ms=duration_ms, + ) return result # generic action execution @@ -366,6 +376,12 @@ class AutoHealService: duration_ms = (datetime.now() - started_at).total_seconds() * 1000 self._alert_and_store(playbook, context, result, duration_ms) + record_autoheal_action( + action=result.action or action_type, + error_type=str(context.get("error_type") or "unknown"), + success=result.success, + duration_ms=duration_ms, + ) return result def _docker_restart(self, params: Dict[str, Any]) -> AutoHealResult: diff --git a/services/event_router.py b/services/event_router.py index fcf2914..dad20ec 100644 --- a/services/event_router.py +++ b/services/event_router.py @@ -14,6 +14,11 @@ from datetime import datetime from typing import Any, Dict, Optional from services.ai_orchestrator import AIOrchestrator +from services.ai_automation_metrics import ( + record_event_router_dispatch, + record_event_router_replay, + record_event_router_safe_action, +) from services.telegram_templates import send_telegram_with_result, triaged_alert logger = logging.getLogger(__name__) @@ -228,7 +233,9 @@ def replay_failed_deliveries(limit: int = 20, admin_chat_ids: Optional[list] = N except FileNotFoundError: pass - return {"attempted": attempted, "sent": sent, "failed": failed, "dropped": dropped} + result = {"attempted": attempted, "sent": sent, "failed": failed, "dropped": dropped} + record_event_router_replay(**result) + return result def _execute_safe_actions(result: Dict[str, Any], event: Dict[str, Any]) -> list[Dict[str, Any]]: @@ -258,13 +265,16 @@ def _execute_safe_actions(result: Dict[str, Any], event: Dict[str, Any]) -> list params = step.get("params") or {} if action not in SAFE_ACTIONS: executed.append({"action": action, "status": "rejected", "reason": "not in SAFE_ACTIONS"}) + record_event_router_safe_action(action or "unknown", "rejected") continue try: action_result = SAFE_ACTIONS[action](**params) executed.append({"action": action, "status": "ok", "result": action_result}) + record_event_router_safe_action(action, "ok") except Exception as exc: logger.exception("[EventRouter] safe action failed: %s", action) executed.append({"action": action, "status": "error", "error": str(exc)[:300]}) + record_event_router_safe_action(action or "unknown", "error") return executed @@ -279,7 +289,7 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) try: if _is_event_silenced(event): - return { + response = { "tier": tier, "sent": 0, "errors": [], @@ -290,9 +300,17 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) "queued": False, "deduped": False, } + record_event_router_dispatch( + tier=tier, + event_type=event.get("event_type", "unknown"), + delivered=True, + silenced=True, + latency_ms=response["latency_ms"], + ) + return response if _is_duplicate_event(event): - return { + response = { "tier": tier, "sent": 0, "errors": [], @@ -303,6 +321,14 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) "queued": False, "deduped": True, } + record_event_router_dispatch( + tier=tier, + event_type=event.get("event_type", "unknown"), + delivered=True, + deduped=True, + latency_ms=response["latency_ms"], + ) + return response result = await _run_tier_handler(tier, event, session_id) executed_actions = _execute_safe_actions(result, event) @@ -319,7 +345,7 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) replayed = replay_failed_deliveries(limit=_REPLAY_LIMIT, admin_chat_ids=admin_chat_ids) latency_ms = int((time.perf_counter() - started_at) * 1000) - return { + response = { "tier": tier, "sent": send_result["sent"], "errors": send_result["errors"], @@ -331,10 +357,18 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) "deduped": False, "replayed": replayed, } + record_event_router_dispatch( + tier=tier, + event_type=event.get("event_type", "unknown"), + delivered=response["delivered"], + queued=response["queued"], + latency_ms=latency_ms, + ) + return response except Exception as e: logger.exception(f"[EventRouter] dispatch failed: {e}") queued = _queue_failed_delivery(event, tier, None, [str(e)], "dispatch_exception") - return { + response = { "tier": tier, "sent": 0, "errors": [str(e)], @@ -345,6 +379,14 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) "queued": queued, "deduped": False, } + record_event_router_dispatch( + tier=tier, + event_type=event.get("event_type", "unknown"), + delivered=False, + queued=queued, + latency_ms=response["latency_ms"], + ) + return response def _run_coroutine_in_thread(coro) -> Dict[str, Any]: diff --git a/tests/test_ai_automation_metrics.py b/tests/test_ai_automation_metrics.py new file mode 100644 index 0000000..9f5e1c4 --- /dev/null +++ b/tests/test_ai_automation_metrics.py @@ -0,0 +1,63 @@ +def test_ai_automation_metrics_snapshot_records_event_router_and_autoheal(): + from services import ai_automation_metrics as metrics + + metrics.reset_for_tests() + + metrics.record_event_router_dispatch( + tier="L2", + event_type="scheduler_task_failure", + delivered=False, + queued=True, + latency_ms=123, + ) + metrics.record_event_router_safe_action("retry_task", "ok") + metrics.record_event_router_replay(attempted=2, sent=1, failed=1) + metrics.record_autoheal_action( + action="ALERT_ONLY", + error_type="DB_UNREACHABLE", + success=True, + duration_ms=45, + ) + + snap = metrics.snapshot() + counters = { + (metric, dict(labels).get("outcome") or dict(labels).get("status") or dict(labels).get("result")): value + for (metric, labels), value in snap["counters"].items() + } + + assert counters[("event_router_dispatch_total", "queued")] == 1 + assert counters[("event_router_safe_action_total", "ok")] == 1 + assert counters[("event_router_replay_total", "attempted")] == 2 + assert counters[("event_router_replay_total", "sent")] == 1 + assert counters[("autoheal_action_total", "success")] == 1 + + latency = next( + values for (metric, _labels), values in snap["latency"].items() + if metric == "event_router_latency_ms" + ) + assert latency["count"] == 1 + assert latency["sum"] == 123 + assert latency["max"] == 123 + + +def test_system_metrics_exports_ai_automation_metrics(): + from prometheus_client import CollectorRegistry, Gauge, generate_latest + from routes.system_public_routes import _register_ai_automation_metrics + from services import ai_automation_metrics as metrics + + metrics.reset_for_tests() + metrics.record_event_router_dispatch( + tier="L1", + event_type="crawler_timeout", + delivered=True, + latency_ms=7, + ) + + registry = CollectorRegistry() + _register_ai_automation_metrics(registry, Gauge, metrics.snapshot()) + output = generate_latest(registry).decode("utf-8") + + assert "momo_ai_event_router_dispatch_total" in output + assert 'event_type="crawler_timeout"' in output + assert 'outcome="delivered"' in output + assert "momo_ai_event_router_latency_ms_count" in output