@@ -2,7 +2,7 @@

 > This document defines the project's core development principles and inviolable rules
 > **Created**: 2026-01-12
-> **Current version**: V10.4 (four AI Agent automation governance release)
+> **Current version**: V10.5 (four AI Agent automation observability release)
 > **Last updated**: 2026-04-29

 ---

@@ -7,12 +7,13 @@
 - ADR-018: filed the four AI Agent automation control plane.
 - Memory: added `docs/memory/ai_automation_closure_20260429.md`.
 - Guide/Skills replacement: added `docs/guides/ai_automation_session_sop.md`.
-- SOT: updated `docs/AI_INTELLIGENCE_MODULE_SOT.md` to the V10.4 AI Automation architecture.
+- SOT: updated `docs/AI_INTELLIGENCE_MODULE_SOT.md` to the V10.5 AI Automation Metrics architecture.
 - Codex rules: updated `AGENTS.md`, `CONSTITUTION.md`, and the ADR/memory indexes.
+- Prometheus instrumentation: added in-process metrics for EventRouter / AutoHeal / safe action / replay and wired them into `/metrics`.

 [Next to-do]
-- Prometheus / Superset instrumentation for `agent_action_total`, `agent_latency_seconds`, and notification replay count.
 - Add an online smoke dashboard: EventRouter, AutoHeal, NemoTron fallback, OpenClaw embedding queue, ElephantAlpha HITL.
+- Superset / Grafana visualization: `momo_ai_event_router_dispatch_total`, `momo_ai_event_router_latency_ms_*`, `momo_ai_autoheal_action_total`.

 ================================================================================
 Brand asset final processing and maintenance (Phase 7) [DONE]

6
app.py
@@ -95,9 +95,9 @@ except Exception as e:
     sys_log.error(f"Unable to check disk space: {e}")

 # 🚩 System version definition (used for backups and display)
-# 🚩 2026-04-29 V10.4: four AI Agent automation closed loop - EventRouter / AutoHeal /
-#    OpenClaw Memory / ElephantAlpha bridge docs and safety rules synced
-SYSTEM_VERSION = "V10.4"
+# 🚩 2026-04-29 V10.5: AI automation observability - EventRouter / AutoHeal /
+#    safe action / Telegram replay metrics wired into /metrics
+SYSTEM_VERSION = "V10.5"

 # ==========================================
 # 🔒 SQL injection protection functions

@@ -253,7 +253,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
 # ==========================================
 # System version and paths
 # ==========================================
-SYSTEM_VERSION = "V10.4"
+SYSTEM_VERSION = "V10.5"
 LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
 public_url = PUBLIC_URL  # used for template display

@@ -1,8 +1,8 @@
 # MOMO PRO: AI Competitive Pricing Intelligence Module Single Source of Truth

 > **Last updated**: 2026-04-29 (Taipei time)
-> **Status**: 🟢 Four AI Agent automation closed loop is in place: EventRouter / AutoHeal / OpenClaw Memory / ElephantAlpha bridge have test coverage
-> **Applies to**: V10.4 AI Automation architecture
+> **Status**: 🟢 Four AI Agent automation closed loop is in place: EventRouter / AutoHeal / OpenClaw Memory / ElephantAlpha bridge / Prometheus metrics have test coverage
+> **Applies to**: V10.5 AI Automation Metrics architecture

 ---

@@ -55,6 +55,14 @@ SQL funnel (~300 rows)
 - A raw `ai_insights` insert must be followed by `enqueue_insight_embedding()` or be eligible for backfill.
 - ElephantAlpha only handles orchestration and bridging; it must not bypass ADR-011 / ADR-012 / ADR-013.

+Observability:
+
+- `/metrics` exports `momo_ai_event_router_dispatch_total`.
+- `/metrics` exports `momo_ai_event_router_latency_ms_count/sum/max`.
+- `/metrics` exports `momo_ai_event_router_safe_action_total`.
+- `/metrics` exports `momo_ai_event_router_replay_total`.
+- `/metrics` exports `momo_ai_autoheal_action_total` and `momo_ai_autoheal_duration_ms_count/sum/max`.
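
The latency metrics listed above are exported as three separate series (`_count`, `_sum`, `_max`) rather than as a histogram, so a mean has to be derived by the consumer. The following is a minimal sketch of that arithmetic; the function name `mean_latency_ms` and the sample values are assumptions made for illustration and are not part of the commit.

```python
from typing import Mapping


def mean_latency_ms(
    samples: Mapping[str, float],
    prefix: str = "momo_ai_event_router_latency_ms",
) -> float:
    """Return sum / count for one label set, or 0.0 when nothing was observed."""
    count = samples.get(f"{prefix}_count", 0.0)
    total = samples.get(f"{prefix}_sum", 0.0)
    # Guard against division by zero before the first dispatch is recorded.
    return total / count if count else 0.0


# Hypothetical scraped values for a single (event_type, tier) label set.
scraped = {
    "momo_ai_event_router_latency_ms_count": 4.0,
    "momo_ai_event_router_latency_ms_sum": 500.0,
    "momo_ai_event_router_latency_ms_max": 310.0,
}

print(mean_latency_ms(scraped))
```

Because only count, sum, and max are kept, percentiles cannot be recovered from these series; the `_max` value is the only tail indicator available.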
+
 ---

 ## II. Actual Database Schema (verified)

@@ -137,13 +137,14 @@ L1 Hermes down → L0 template output + 🟡 "AI analysis temporarily unavailable"
 - **Phase 2**: Hook Hermes L1 into two exception paths: scheduler.run_auto_import_task and run_momo_task
 - **Phase 3**: Extend NemoTron with 3 new tools (retry/query_km/silence)
 - **Phase 4**: Expand L3 HITL buttons as needed
-- **Phase 5**: Prometheus metric integration (`agent_action_total{tier,agent,event_type}`, `agent_latency_seconds`)
+- **Phase 5**: Prometheus metric integration (`momo_ai_event_router_dispatch_total`, `momo_ai_event_router_latency_ms_*`)

 ## 2026-04-29 Implementation Update

 - Phase 1~3 core is in place: EventRouter routing, L2 safe action, NemoTron fallback, OpenClaw memory writes, and notification replay all have test coverage.
 - L3 has been extended to OpenClaw + ElephantAlpha: OpenClaw handles strategy/memory, ElephantAlpha handles orchestration/HITL/AutoHeal bridge.
-- Not yet done: Prometheus metrics and a complete online smoke dashboard.
+- 2026-04-29: added `/metrics` export for EventRouter dispatch, L2 safe action, Telegram replay, AutoHeal action, and latency/duration.
+- Not yet done: a complete online smoke dashboard and Grafana/Superset visualization panels.

 ## References
 - `services/event_router.py`: routing entry point (Phase 1)

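@@ -24,6 +24,7 @@

 - When EventRouter fails it must degrade to the Hermes rule / L0 template; notifications must not be interrupted.
 - Failed Telegram deliveries must be stored temporarily and be replayable.
+- Changes to EventRouter / AutoHeal must update the metrics in `services/ai_automation_metrics.py` or confirm that existing metrics already cover them.
 - An L2 action must be in `SAFE_ACTIONS` and be auditable, replayable, and low in side effects.
 - AutoHeal must not restart / stop / recreate `momo-db` or `momo-postgres`.
 - After a raw `ai_insights` write, the embedding must be enqueued; if enqueueing fails, backfill must be possible.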
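
The rule protecting `momo-db` and `momo-postgres` can be pictured as a small guard that runs before any container operation. This is only a sketch under assumptions: the constant names, the `is_operation_allowed` helper, and the `momo-web` container are hypothetical and do not come from the AutoHeal source.

```python
# Hypothetical names; the real AutoHeal guard is not shown in this commit.
PROTECTED_CONTAINERS = frozenset({"momo-db", "momo-postgres"})
DESTRUCTIVE_OPERATIONS = frozenset({"restart", "stop", "recreate"})


def is_operation_allowed(container: str, operation: str) -> bool:
    """Reject destructive operations that target a protected database container."""
    is_destructive = operation.lower() in DESTRUCTIVE_OPERATIONS
    is_protected = container.lower() in PROTECTED_CONTAINERS
    return not (is_destructive and is_protected)


print(is_operation_allowed("momo-db", "restart"))   # protected target, destructive operation
print(is_operation_allowed("momo-web", "restart"))  # unprotected target
```

Keeping both sets immutable and checking them in one place makes the rule easy to audit, which matches the auditable, low-side-effect requirement stated for L2 actions.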
@@ -32,7 +33,7 @@

 ## Wrap-up checklist

-- Related tests cover at least EventRouter, AutoHeal, NemoTron fallback, OpenClaw embedding bridge, ElephantAlpha engine, and agent_actions.
+- Related tests cover at least EventRouter, AutoHeal, NemoTron fallback, OpenClaw embedding bridge, ElephantAlpha engine, agent_actions, and AI automation metrics.
 - If there is an architectural decision, add an ADR and update `docs/adr/README.md`.
 - If there are long-lived operational facts, update `docs/memory/*.md` and `docs/memory/README.md`.
 - If AI architecture facts change, update `docs/AI_INTELLIGENCE_MODULE_SOT.md`.

@@ -9,6 +9,7 @@
 - AutoHeal is the entry point for self-healing side effects; `momo-db` / `momo-postgres` are protected and must not be automatically restarted / stopped / recreated.
 - OpenClaw learning is the bridge layer between AI memory and the embedding queue; writers of raw `ai_insights` must enqueue an embedding or allow backfill.
 - ElephantAlpha is only responsible for orchestration / HITL / AutoHeal bridge and must not bypass ADR-011, ADR-012, ADR-013.
+- A minimal set of AI automation Prometheus metrics is wired into `/metrics`, sourced from `services/ai_automation_metrics.py`.

 ## Scope already landed

@@ -20,10 +21,12 @@
 - Important scheduler failure paths are wired into EventRouter, reducing notifications missed because of bare exceptions.
 - The ElephantAlpha execution engine gained sync timeout, HITL reply_markup, fail-fast on unknown steps, and code/resource actions routed through the AutoHeal bridge.
 - `flag_for_human_review`, `route_to_km`, and `mark_for_relearn` in L2 `agent_actions.py` changed from stubs to auditable OpenClaw memory writes.
+- `/metrics` now exports EventRouter dispatch, latency, safe action, Telegram replay, AutoHeal action, and duration metrics.

 ## Verification log

-- 2026-04-29 final batch: `24 passed`.
+- 2026-04-29 AI metrics batch: `26 passed`.
+- 2026-04-29 L2 safe memory batch: `24 passed`.
 - collect-only: `48 tests collected`.
 - `git diff --check` passed.

@@ -38,6 +41,7 @@
 - `d486598` Strengthen the ElephantAlpha execution and notification closed loop
 - `5b25f55` Complete EventRouter failed-notification replay
 - `162a76b` Land L2 safe memory actions
+- `d58e4d0` Sync four-Agent AI automation governance records

 ## Read first on next session

@@ -24,6 +24,7 @@
 - **Self-healing safety review**: AutoHeal protects `momo-db` / `momo-postgres`; when DB/DNS cannot be repaired safely it degrades to alert / wait retry.
 - **Memory closed loop**: the raw `ai_insights` insert path now enqueues embeddings; OpenClaw learning supports stale reset and missing-embedding backfill.
 - **L2 actions landed**: `flag_for_human_review`, `route_to_km`, and `mark_for_relearn` changed to auditable OpenClaw memory writes.
+- **Observability landed**: `/metrics` exports EventRouter dispatch/latency, safe action, Telegram replay, and AutoHeal action/duration metrics.

 ### 2026-04-28~29: Phase 3e refactoring campaign + eradication of the hidden daily_sales cache bug
 - **app.py reduced by 10.8%**: 7,386 → 6,590 lines, 11 commits all green with zero 502s.

@@ -50,6 +50,7 @@ def prometheus_metrics():
     """Prometheus metrics endpoint - exposes monitoring data for Prometheus to scrape"""
     try:
         from prometheus_client import generate_latest, CONTENT_TYPE_LATEST, Gauge, CollectorRegistry
+        from services.ai_automation_metrics import snapshot as ai_metrics_snapshot

         registry = CollectorRegistry()

@@ -87,6 +88,11 @@ def prometheus_metrics():
         except Exception as e:
             sys_log.warning(f"[Metrics] Unable to fetch database statistics: {e}")

+        try:
+            _register_ai_automation_metrics(registry, Gauge, ai_metrics_snapshot())
+        except Exception as e:
+            sys_log.warning(f"[Metrics] Unable to fetch AI automation metrics: {e}")
+
         return Response(generate_latest(registry), mimetype=CONTENT_TYPE_LATEST)

     except ImportError:
@@ -103,6 +109,73 @@ momo_app_info{{version="{SYSTEM_VERSION}",database_type="{DATABASE_TYPE}"}} 1
         return Response(f"# Error: {e}\n", mimetype='text/plain; charset=utf-8'), 500


+def _labels_to_dict(labels):
+    return dict(labels)
+
+
+def _register_ai_automation_metrics(registry, gauge_cls, metrics_snapshot):
+    """Export dependency-free AI metrics into a per-request Prometheus registry."""
+    gauges = {}
+
+    def get_gauge(name, help_text, label_names):
+        if name not in gauges:
+            gauges[name] = gauge_cls(name, help_text, label_names, registry=registry)
+        return gauges[name]
+
+    definitions = {
+        "event_router_dispatch_total": (
+            "momo_ai_event_router_dispatch_total",
+            "EventRouter dispatch count",
+            ["event_type", "outcome", "tier"],
+        ),
+        "event_router_safe_action_total": (
+            "momo_ai_event_router_safe_action_total",
+            "EventRouter L2 safe action count",
+            ["action", "status"],
+        ),
+        "event_router_replay_total": (
+            "momo_ai_event_router_replay_total",
+            "EventRouter queued Telegram replay count",
+            ["status"],
+        ),
+        "autoheal_action_total": (
+            "momo_ai_autoheal_action_total",
+            "AutoHeal action count",
+            ["action", "error_type", "result"],
+        ),
+    }
+
+    for (metric, labels), value in metrics_snapshot.get("counters", {}).items():
+        if metric not in definitions:
+            continue
+        name, help_text, label_names = definitions[metric]
+        gauge = get_gauge(name, help_text, label_names)
+        label_values = _labels_to_dict(labels)
+        gauge.labels(**{name: label_values.get(name, "unknown") for name in label_names}).set(value)
+
+    latency_defs = {
+        "event_router_latency_ms": (
+            "momo_ai_event_router_latency_ms",
+            "EventRouter dispatch latency in milliseconds",
+            ["event_type", "tier"],
+        ),
+        "autoheal_duration_ms": (
+            "momo_ai_autoheal_duration_ms",
+            "AutoHeal action duration in milliseconds",
+            ["action", "error_type"],
+        ),
+    }
+
+    for (metric, labels), values in metrics_snapshot.get("latency", {}).items():
+        if metric not in latency_defs:
+            continue
+        name, help_text, label_names = latency_defs[metric]
+        label_values = _labels_to_dict(labels)
+        for suffix in ("count", "sum", "max"):
+            gauge = get_gauge(f"{name}_{suffix}", f"{help_text} ({suffix})", label_names)
+            gauge.labels(**{name: label_values.get(name, "unknown") for name in label_names}).set(values.get(suffix, 0))
+
+
 @system_public_bp.route('/settings')
 def settings():
     """Category settings page"""

113
services/ai_automation_metrics.py
Normal file
@@ -0,0 +1,113 @@
+"""In-process AI automation metrics for Prometheus export.
+
+These counters are intentionally lightweight and dependency-free. The `/metrics`
+route converts snapshots into Prometheus gauges so EventRouter and AutoHeal do
+not need to know about Flask or prometheus_client.
+"""
+
+from __future__ import annotations
+
+import threading
+from collections import defaultdict
+from copy import deepcopy
+from typing import Any, Dict, Tuple
+
+
+_LOCK = threading.Lock()
+_COUNTERS: Dict[Tuple[str, Tuple[Tuple[str, str], ...]], int] = defaultdict(int)
+_LATENCY: Dict[Tuple[str, Tuple[Tuple[str, str], ...]], Dict[str, float]] = defaultdict(
+    lambda: {"count": 0, "sum": 0.0, "max": 0.0}
+)
+
+
+def _safe_label(value: Any, default: str = "unknown", limit: int = 80) -> str:
+    text = str(value if value not in (None, "") else default)
+    return text[:limit]
+
+
+def _key(metric: str, labels: Dict[str, Any]) -> Tuple[str, Tuple[Tuple[str, str], ...]]:
+    return metric, tuple(sorted((k, _safe_label(v)) for k, v in labels.items()))
+
+
+def _inc(metric: str, labels: Dict[str, Any], amount: int = 1) -> None:
+    with _LOCK:
+        _COUNTERS[_key(metric, labels)] += int(amount)
+
+
+def _observe(metric: str, labels: Dict[str, Any], value: float) -> None:
+    with _LOCK:
+        bucket = _LATENCY[_key(metric, labels)]
+        bucket["count"] += 1
+        bucket["sum"] += float(value)
+        bucket["max"] = max(bucket["max"], float(value))
+
+
+def record_event_router_dispatch(
+    *,
+    tier: str,
+    event_type: str,
+    delivered: bool,
+    queued: bool = False,
+    deduped: bool = False,
+    silenced: bool = False,
+    latency_ms: int | float = 0,
+) -> None:
+    if silenced:
+        outcome = "silenced"
+    elif deduped:
+        outcome = "deduped"
+    elif queued:
+        outcome = "queued"
+    elif delivered:
+        outcome = "delivered"
+    else:
+        outcome = "failed"
+
+    labels = {"tier": tier, "event_type": event_type, "outcome": outcome}
+    _inc("event_router_dispatch_total", labels)
+    _observe("event_router_latency_ms", {"tier": tier, "event_type": event_type}, latency_ms)
+
+
+def record_event_router_safe_action(action: str, status: str) -> None:
+    _inc("event_router_safe_action_total", {"action": action, "status": status})
+
+
+def record_event_router_replay(*, attempted: int = 0, sent: int = 0, failed: int = 0, dropped: int = 0) -> None:
+    for status, value in {
+        "attempted": attempted,
+        "sent": sent,
+        "failed": failed,
+        "dropped": dropped,
+    }.items():
+        if value:
+            _inc("event_router_replay_total", {"status": status}, int(value))
+
+
+def record_autoheal_action(
+    *,
+    action: str,
+    error_type: str,
+    success: bool,
+    duration_ms: int | float = 0,
+) -> None:
+    labels = {
+        "action": action,
+        "error_type": error_type,
+        "result": "success" if success else "failed",
+    }
+    _inc("autoheal_action_total", labels)
+    _observe("autoheal_duration_ms", {"action": action, "error_type": error_type}, duration_ms)
+
+
+def snapshot() -> Dict[str, Any]:
+    with _LOCK:
+        return {
+            "counters": {key: value for key, value in _COUNTERS.items()},
+            "latency": {key: deepcopy(value) for key, value in _LATENCY.items()},
+        }
+
+
+def reset_for_tests() -> None:
+    with _LOCK:
+        _COUNTERS.clear()
+        _LATENCY.clear()
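
The module keys every series by a metric name plus a sorted tuple of label pairs, which makes the key hashable and independent of the order in which the caller wrote the labels. The sketch below copies the two helper bodies from the file above into a stand-alone snippet so the behaviour can be tried in isolation; the names `safe_label` and `make_key` are renamed copies introduced here for illustration.

```python
from typing import Any, Dict, Tuple

LabelKey = Tuple[str, Tuple[Tuple[str, str], ...]]


def safe_label(value: Any, default: str = "unknown", limit: int = 80) -> str:
    # Empty or missing values collapse to the default; long values are truncated.
    text = str(value if value not in (None, "") else default)
    return text[:limit]


def make_key(metric: str, labels: Dict[str, Any]) -> LabelKey:
    # Sorting the (name, value) pairs gives one canonical key per label set.
    return metric, tuple(sorted((k, safe_label(v)) for k, v in labels.items()))


a = make_key("event_router_dispatch_total", {"tier": "L2", "outcome": "queued"})
b = make_key("event_router_dispatch_total", {"outcome": "queued", "tier": "L2"})
print(a == b)
print(make_key("autoheal_action_total", {"error_type": None}))
```

Truncating label values to 80 characters bounds the length of each value, but it does not bound cardinality: a caller that passes unbounded values (for example raw error messages) would still create one series per distinct prefix.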
@@ -25,6 +25,7 @@ from typing import Dict, Any, List, Optional
 from sqlalchemy import text

 from services.logger_manager import SystemLogger
+from services.ai_automation_metrics import record_autoheal_action
 from database.manager import get_session

 logger = SystemLogger("AutoHealService").get_logger()
@@ -228,11 +229,13 @@ class AutoHealService:
         if not playbook:
             msg = f"No playbook matched for error_type={error_type}"
             self._log.info("[AutoHeal] %s", msg)
+            record_autoheal_action(action="NO_PLAYBOOK", error_type=error_type, success=False)
             return AutoHealResult(success=False, action=None, message=msg)

         if playbook["action_type"] not in _ALLOWED_ACTION_TYPES:
             msg = f"action_type '{playbook['action_type']}' is not allowed"
             self._log.warning("[AutoHeal] %s", msg)
+            record_autoheal_action(action=playbook["action_type"], error_type=error_type, success=False)
             return AutoHealResult(success=False, action=playbook["action_type"], message=msg)

         # cooldown guard
@@ -241,6 +244,7 @@ class AutoHealService:
         if last and (datetime.now().timestamp() - last) / 60 < playbook["cooldown_min"]:
             msg = f"Cooldown active for {playbook['action_type']}"
             self._log.info("[AutoHeal] %s", msg)
+            record_autoheal_action(action=playbook["action_type"], error_type=error_type, success=False)
             return AutoHealResult(success=False, action=playbook["action_type"], message=msg)

         if playbook["action_type"] == "CODE_FIX":
@@ -248,6 +252,12 @@ class AutoHealService:
             result = self._handle_code_fix(playbook, context)
             duration_ms = (datetime.now() - started_at).total_seconds() * 1000
             self._alert_and_store(playbook, context, result, duration_ms)
+            record_autoheal_action(
+                action=result.action or playbook["action_type"],
+                error_type=error_type,
+                success=result.success,
+                duration_ms=duration_ms,
+            )
             return result

         # generic action execution
@@ -366,6 +376,12 @@ class AutoHealService:

         duration_ms = (datetime.now() - started_at).total_seconds() * 1000
         self._alert_and_store(playbook, context, result, duration_ms)
+        record_autoheal_action(
+            action=result.action or action_type,
+            error_type=str(context.get("error_type") or "unknown"),
+            success=result.success,
+            duration_ms=duration_ms,
+        )
         return result

     def _docker_restart(self, params: Dict[str, Any]) -> AutoHealResult:

@@ -14,6 +14,11 @@ from datetime import datetime
 from typing import Any, Dict, Optional

 from services.ai_orchestrator import AIOrchestrator
+from services.ai_automation_metrics import (
+    record_event_router_dispatch,
+    record_event_router_replay,
+    record_event_router_safe_action,
+)
 from services.telegram_templates import send_telegram_with_result, triaged_alert

 logger = logging.getLogger(__name__)
@@ -228,7 +233,9 @@ def replay_failed_deliveries(limit: int = 20, admin_chat_ids: Optional[list] = N
     except FileNotFoundError:
         pass

-    return {"attempted": attempted, "sent": sent, "failed": failed, "dropped": dropped}
+    result = {"attempted": attempted, "sent": sent, "failed": failed, "dropped": dropped}
+    record_event_router_replay(**result)
+    return result


 def _execute_safe_actions(result: Dict[str, Any], event: Dict[str, Any]) -> list[Dict[str, Any]]:
@@ -258,13 +265,16 @@ def _execute_safe_actions(result: Dict[str, Any], event: Dict[str, Any]) -> list
         params = step.get("params") or {}
         if action not in SAFE_ACTIONS:
             executed.append({"action": action, "status": "rejected", "reason": "not in SAFE_ACTIONS"})
+            record_event_router_safe_action(action or "unknown", "rejected")
             continue
         try:
             action_result = SAFE_ACTIONS[action](**params)
             executed.append({"action": action, "status": "ok", "result": action_result})
+            record_event_router_safe_action(action, "ok")
         except Exception as exc:
             logger.exception("[EventRouter] safe action failed: %s", action)
             executed.append({"action": action, "status": "error", "error": str(exc)[:300]})
+            record_event_router_safe_action(action or "unknown", "error")
     return executed


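
The hunk above follows an allow-list pattern: a step runs only if its action name is a key in `SAFE_ACTIONS`, and each of the three outcomes (`rejected`, `ok`, `error`) is counted. The following stand-alone sketch reproduces that control flow with a plain `Counter` in place of the metrics module; `run_safe_actions`, `echo`, `boom`, and the sample steps are hypothetical stand-ins, not names from the repository.

```python
from collections import Counter
from typing import Any, Callable, Dict, List


def run_safe_actions(
    steps: List[Dict[str, Any]],
    safe_actions: Dict[str, Callable[..., Any]],
    counts: Counter,
) -> List[Dict[str, Any]]:
    """Run only allow-listed actions and count one outcome per step."""
    executed = []
    for step in steps:
        action = step.get("action")
        params = step.get("params") or {}
        if action not in safe_actions:
            executed.append({"action": action, "status": "rejected"})
            counts[(action or "unknown", "rejected")] += 1
            continue
        try:
            result = safe_actions[action](**params)
            executed.append({"action": action, "status": "ok", "result": result})
            counts[(action, "ok")] += 1
        except Exception as exc:
            # A failing action is recorded and does not stop later steps.
            executed.append({"action": action, "status": "error", "error": str(exc)[:300]})
            counts[(action, "error")] += 1
    return executed


def echo(text: str) -> str:
    return text


def boom() -> None:
    raise RuntimeError("simulated failure")


counts: Counter = Counter()
executed = run_safe_actions(
    [
        {"action": "echo", "params": {"text": "hi"}},
        {"action": "drop_table"},
        {"action": "boom"},
    ],
    {"echo": echo, "boom": boom},
    counts,
)
print([entry["status"] for entry in executed])
```

Counting rejections separately from errors keeps "the planner proposed something outside the allow-list" distinguishable from "an allowed action failed", which are different operational signals.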
@@ -279,7 +289,7 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)

     try:
         if _is_event_silenced(event):
-            return {
+            response = {
                 "tier": tier,
                 "sent": 0,
                 "errors": [],
@@ -290,9 +300,17 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
                 "queued": False,
                 "deduped": False,
             }
+            record_event_router_dispatch(
+                tier=tier,
+                event_type=event.get("event_type", "unknown"),
+                delivered=True,
+                silenced=True,
+                latency_ms=response["latency_ms"],
+            )
+            return response

         if _is_duplicate_event(event):
-            return {
+            response = {
                 "tier": tier,
                 "sent": 0,
                 "errors": [],
@@ -303,6 +321,14 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
                 "queued": False,
                 "deduped": True,
             }
+            record_event_router_dispatch(
+                tier=tier,
+                event_type=event.get("event_type", "unknown"),
+                delivered=True,
+                deduped=True,
+                latency_ms=response["latency_ms"],
+            )
+            return response

         result = await _run_tier_handler(tier, event, session_id)
         executed_actions = _execute_safe_actions(result, event)
@@ -319,7 +345,7 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
         replayed = replay_failed_deliveries(limit=_REPLAY_LIMIT, admin_chat_ids=admin_chat_ids)
         latency_ms = int((time.perf_counter() - started_at) * 1000)

-        return {
+        response = {
             "tier": tier,
             "sent": send_result["sent"],
             "errors": send_result["errors"],
@@ -331,10 +357,18 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
             "deduped": False,
             "replayed": replayed,
         }
+        record_event_router_dispatch(
+            tier=tier,
+            event_type=event.get("event_type", "unknown"),
+            delivered=response["delivered"],
+            queued=response["queued"],
+            latency_ms=latency_ms,
+        )
+        return response
     except Exception as e:
         logger.exception(f"[EventRouter] dispatch failed: {e}")
         queued = _queue_failed_delivery(event, tier, None, [str(e)], "dispatch_exception")
-        return {
+        response = {
             "tier": tier,
             "sent": 0,
             "errors": [str(e)],
@@ -345,6 +379,14 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
             "queued": queued,
             "deduped": False,
         }
+        record_event_router_dispatch(
+            tier=tier,
+            event_type=event.get("event_type", "unknown"),
+            delivered=False,
+            queued=queued,
+            latency_ms=response["latency_ms"],
+        )
+        return response


 def _run_coroutine_in_thread(coro) -> Dict[str, Any]:

63
tests/test_ai_automation_metrics.py
Normal file
@@ -0,0 +1,63 @@
+def test_ai_automation_metrics_snapshot_records_event_router_and_autoheal():
+    from services import ai_automation_metrics as metrics
+
+    metrics.reset_for_tests()
+
+    metrics.record_event_router_dispatch(
+        tier="L2",
+        event_type="scheduler_task_failure",
+        delivered=False,
+        queued=True,
+        latency_ms=123,
+    )
+    metrics.record_event_router_safe_action("retry_task", "ok")
+    metrics.record_event_router_replay(attempted=2, sent=1, failed=1)
+    metrics.record_autoheal_action(
+        action="ALERT_ONLY",
+        error_type="DB_UNREACHABLE",
+        success=True,
+        duration_ms=45,
+    )
+
+    snap = metrics.snapshot()
+    counters = {
+        (metric, dict(labels).get("outcome") or dict(labels).get("status") or dict(labels).get("result")): value
+        for (metric, labels), value in snap["counters"].items()
+    }
+
+    assert counters[("event_router_dispatch_total", "queued")] == 1
+    assert counters[("event_router_safe_action_total", "ok")] == 1
+    assert counters[("event_router_replay_total", "attempted")] == 2
+    assert counters[("event_router_replay_total", "sent")] == 1
+    assert counters[("autoheal_action_total", "success")] == 1
+
+    latency = next(
+        values for (metric, _labels), values in snap["latency"].items()
+        if metric == "event_router_latency_ms"
+    )
+    assert latency["count"] == 1
+    assert latency["sum"] == 123
+    assert latency["max"] == 123
+
+
+def test_system_metrics_exports_ai_automation_metrics():
+    from prometheus_client import CollectorRegistry, Gauge, generate_latest
+    from routes.system_public_routes import _register_ai_automation_metrics
+    from services import ai_automation_metrics as metrics
+
+    metrics.reset_for_tests()
+    metrics.record_event_router_dispatch(
+        tier="L1",
+        event_type="crawler_timeout",
+        delivered=True,
+        latency_ms=7,
+    )
+
+    registry = CollectorRegistry()
+    _register_ai_automation_metrics(registry, Gauge, metrics.snapshot())
+    output = generate_latest(registry).decode("utf-8")
+
+    assert "momo_ai_event_router_dispatch_total" in output
+    assert 'event_type="crawler_timeout"' in output
+    assert 'outcome="delivered"' in output
+    assert "momo_ai_event_router_latency_ms_count" in output
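
The two tests above exercise the `queued` and `delivered` outcomes only. A further check on outcome precedence (silenced, then deduped, then queued, then delivered, otherwise failed) might look like the sketch below. `resolve_outcome` is a hypothetical stand-alone copy of the branch logic inside `record_event_router_dispatch`, written out here so the snippet runs without the repository; it is not a function the commit defines.

```python
def resolve_outcome(
    *,
    delivered: bool,
    queued: bool = False,
    deduped: bool = False,
    silenced: bool = False,
) -> str:
    """Mirror the if/elif chain that picks the `outcome` label."""
    if silenced:
        return "silenced"
    if deduped:
        return "deduped"
    if queued:
        return "queued"
    if delivered:
        return "delivered"
    return "failed"


def test_outcome_precedence():
    # dispatch() passes delivered=True on the silenced and deduped paths,
    # so the earlier branches must win over "delivered".
    assert resolve_outcome(delivered=True, silenced=True) == "silenced"
    assert resolve_outcome(delivered=True, deduped=True) == "deduped"
    assert resolve_outcome(delivered=False, queued=True) == "queued"
    assert resolve_outcome(delivered=True) == "delivered"
    assert resolve_outcome(delivered=False) == "failed"


test_outcome_precedence()
```

One consequence of this ordering is that an event that is both queued and delivered is labelled `queued`, so the `delivered` count reflects only dispatches that needed no replay queue.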