# services/event_router.py
#
# ADR-012 §③: 單一入口 dispatch(event) — L0/L1/L2 分流
# W2-C: L2 優先走 Elephant Alpha Orchestrator;EA 不可用時 fallback AIOrchestrator
#
import asyncio
import json
import logging
import os
import threading
import traceback
import time
from datetime import datetime
from html import escape
from typing import Any, Dict, Optional
from services.ai_orchestrator import AIOrchestrator
from services.ai_automation_metrics import (
record_event_router_dispatch,
record_event_router_replay,
record_event_router_safe_action,
)
from services.telegram_templates import send_telegram_with_result, triaged_alert
logger = logging.getLogger(__name__)


def _env_int(name: str, default: int) -> int:
    """Return the integer value of environment variable *name*, or *default*.

    An unset or blank variable yields *default*. A malformed value is logged
    and also yields *default* instead of raising ``ValueError`` at import
    time, which would otherwise make this whole module (and every failure
    notification routed through it) unimportable.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        logger.warning("[EventRouter] invalid integer %s=%r, using default %s", name, raw, default)
        return default


# JSONL file holding notifications whose Telegram delivery failed, for later
# replay; the location can be overridden with MOMO_EVENT_ROUTER_QUEUE.
_QUEUE_PATH = os.getenv(
    "MOMO_EVENT_ROUTER_QUEUE",
    os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "event_router_failed_deliveries.jsonl"),
)
# Serializes in-process appends to, and rewrites of, _QUEUE_PATH.
_QUEUE_LOCK = threading.Lock()
# Guards _EVENT_DEDUP.
_DEDUP_LOCK = threading.Lock()
# Event key ("source:event_type") -> time.time() value until which repeats are suppressed.
_EVENT_DEDUP: Dict[str, float] = {}
# Dedup window used when an event carries no dedup_ttl_sec of its own; 0 disables dedup.
_DEFAULT_DEDUP_SEC = _env_int("MOMO_EVENT_ROUTER_DEFAULT_DEDUP_SEC", 0)
# Whether a successful delivery also replays queued failures (case-insensitive "true" enables it).
_REPLAY_ON_SUCCESS = os.getenv("MOMO_EVENT_ROUTER_REPLAY_ON_SUCCESS", "true").strip().lower() == "true"
# Maximum number of queued records replayed after each successful delivery.
_REPLAY_LIMIT = _env_int("MOMO_EVENT_ROUTER_REPLAY_LIMIT", 3)
# Character caps for the summary and the trace of a replayed message.
_REPLAY_TEXT_LIMIT = 700
_REPLAY_TRACE_LIMIT = 1200
async def _handle_l1(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
    """L1 tier: semantic translation plus reason analysis (Hermes).

    Delegates to a freshly constructed ``AIOrchestrator`` and returns the
    result of its ``handle_l1`` unchanged.
    """
    return await AIOrchestrator().handle_l1(event, session_id)
async def _handle_l2(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
    """L2 tier (W2-C): prefer the Elephant Alpha (EA) orchestrator, else AIOrchestrator.

    EA is tried first (dynamic routing, 256K context). When EA is unusable —
    no API key, a failed connection check, an import error, or any exception
    while analysing — the failure is logged and ``AIOrchestrator.handle_l2``
    is used instead. Per ADR-012 the audit trail is meant to be double-written
    by ``EA._log_decision`` and ``triaged_alert`` (neither is visible here).

    Returns:
        For EA, a dict with ``source="elephant_alpha"`` plus the decision's
        priority, confidence, execution_plan, agents_required and reasoning;
        otherwise whatever ``AIOrchestrator.handle_l2`` returns.
    """
    try:
        from services.elephant_service import elephant_service
        from services.elephant_alpha_orchestrator import elephant_orchestrator

        # Guard: without an API key, fall back immediately instead of trying to connect.
        if not elephant_service.api_key:
            raise RuntimeError("NVIDIA_API_KEY not configured, using fallback")
        # Guard: connection check. It is said to be cached (W3-A, 300s), so it should
        # not ping on every call — confirm in elephant_service.
        if not elephant_service.check_connection():
            raise RuntimeError("EA connection unavailable, using fallback")
        decision = await elephant_orchestrator.analyze_and_coordinate({
            "event": event,
            "tier": "L2",
            "session_id": session_id,
            "urgency": "high",
            "complexity": "medium",
            "task_type": event.get("event_type", "general_analysis"),
        })
        return {
            "source": "elephant_alpha",
            "priority": decision.priority,
            "confidence": decision.confidence,
            "execution_plan": decision.execution_plan,
            "agents_required": decision.agents_required,
            "reasoning": decision.reasoning,
        }
    except Exception as exc:
        # Lazy %-formatting. The fallback below runs outside this handler so that
        # its own errors are not chained onto the EA failure in tracebacks.
        logger.warning("[EventRouter] EA L2 failed (%s), fallback → AIOrchestrator", exc)
    return await AIOrchestrator().handle_l2(event, session_id)
async def _handle_l0(event: Dict[str, Any]) -> Dict[str, Any]:
    """L0 tier: no analysis; echo the event type back (compatibility/monitoring).

    Returns ``{"status": "ok", "echo": <event_type or None>}``.
    """
    echoed_type = event.get("event_type")
    return dict(status="ok", echo=echoed_type)
async def _run_tier_handler(tier: str, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
    """Run the AI handler for *tier*, degrading to a template result on any error.

    ``"L1"`` and ``"L2"`` go to their handlers; ``"L0"`` and any unrecognised
    tier take the L0 echo path. A handler exception is logged and turned into
    a ``status="degraded"`` result so the alert itself is still sent.
    """
    try:
        if tier == "L1":
            return await _handle_l1(event, session_id)
        if tier == "L2":
            return await _handle_l2(event, session_id)
        return await _handle_l0(event)
    except Exception as exc:
        logger.exception("[EventRouter] %s handler failed, degraded to template alert: %s", tier, exc)
        reason = str(exc)
        return {
            "status": "degraded",
            "summary": "AI 分析暫不可用,已降級為模板告警;原始事件仍保留。",
            "cause": f"{type(exc).__name__}: {reason[:200]}",
            "suggested_actions": ["先依原始事實排查", "若重複發生,查看 event_router_failed_deliveries.jsonl 與服務 logs"],
            "handler_error": reason[:500],
        }
def _decision_envelope_from_event(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the decision envelope attached to *event*, if any.

    Candidates are checked in this order, and the first non-empty dict wins:
    ``event["decision_envelope"]``, ``event["decision"]``,
    ``event["payload"]["decision_envelope"]``, ``event["payload"]["decision"]``.
    Each candidate is checked on its own, so a non-dict placeholder under one
    key (e.g. ``"n/a"``) no longer hides a valid dict under the next key.

    Returns:
        The envelope dict, or ``None`` when no candidate is a non-empty dict.
    """
    payload = event.get("payload")
    containers = [event, payload] if isinstance(payload, dict) else [event]
    for container in containers:
        for key in ("decision_envelope", "decision"):
            candidate = container.get(key)
            if isinstance(candidate, dict) and candidate:
                return candidate
    return None
def _should_render_decision_direct(event: Dict[str, Any]) -> bool:
    """Return True when the event already carries a decision envelope.

    Such evidence-ready decision events are rendered straight from the
    envelope instead of being re-summarized by the L1/L2 AI tiers.
    """
    attached = _decision_envelope_from_event(event)
    return attached is not None
def _direct_decision_result(event: Dict[str, Any]) -> Dict[str, Any]:
    """Build the handler-style result used when a decision envelope bypasses the AI tiers.

    The envelope (or ``{}`` if none is found) is carried under ``decision_envelope``.
    """
    attached = _decision_envelope_from_event(event) or {}
    return dict(
        status="decision_envelope_direct",
        summary="已收到完整決策信封,略過 AI 重新摘要,直接以證據模板通知。",
        decision_envelope=attached,
    )
def _event_key(event: Dict[str, Any]) -> str:
    """Return the ``"<source>:<event_type>"`` key used for silencing, dedup and queueing.

    Missing fields are rendered as ``"unknown"``.
    """
    source = event.get("source", "unknown")
    kind = event.get("event_type", "unknown")
    return "{}:{}".format(source, kind)
def _is_event_silenced(event: Dict[str, Any]) -> bool:
    """Return whether the event is muted via ``services.agent_actions.is_silenced``.

    The full ``source:event_type`` key is checked first, then the bare event
    type. Any failure (import or lookup) is logged and treated as "not
    silenced", so the alert still goes out.
    """
    try:
        from services.agent_actions import is_silenced

        by_key = is_silenced(_event_key(event))
        if by_key:
            return by_key
        return is_silenced(str(event.get("event_type", "")))
    except Exception as exc:
        logger.warning("[EventRouter] silence check failed: %s", exc)
        return False
def _dedup_ttl_sec(event: Dict[str, Any]) -> int:
    """Return the dedup window, in seconds, for this event (never negative).

    Lookup order: top-level ``dedup_ttl_sec``, then ``payload["dedup_ttl_sec"]``,
    then ``_DEFAULT_DEDUP_SEC``. Falsy values, and values ``int()`` cannot
    convert, give 0.
    """
    payload = event.get("payload")
    if not isinstance(payload, dict):
        payload = {}
    fallback = payload.get("dedup_ttl_sec", _DEFAULT_DEDUP_SEC)
    raw = event.get("dedup_ttl_sec", fallback)
    try:
        seconds = int(raw or 0)
    except (TypeError, ValueError):
        return 0
    return seconds if seconds > 0 else 0
def _is_duplicate_event(event: Dict[str, Any]) -> bool:
    """Return True if this event's key is still inside an earlier dedup window.

    Events with a non-positive TTL are never duplicates. Otherwise, when no
    window is active for the key, a new one of ``_dedup_ttl_sec(event)``
    seconds is opened and False is returned. Repeats arriving inside an active
    window do not extend it.
    """
    window = _dedup_ttl_sec(event)
    if window <= 0:
        return False
    key = _event_key(event)
    current = time.time()
    with _DEDUP_LOCK:
        expires_at = _EVENT_DEDUP.get(key)
        suppressed = bool(expires_at) and expires_at > current
        if not suppressed:
            _EVENT_DEDUP[key] = current + window
    return suppressed
def _queue_failed_delivery(
    event: Dict[str, Any],
    tier: str,
    message: Optional[str],
    errors: list,
    reason: str,
) -> bool:
    """Append a failed notification to the local JSONL queue for later replay.

    Args:
        event: The original event (stored verbatim; non-JSON values via ``str``).
        tier: Tier the event was classified into.
        message: Rendered Telegram message, or ``None`` if rendering never happened.
        errors: Error strings describing the failure.
        reason: Short machine-readable cause (e.g. ``"telegram_delivery_failed"``).

    Returns:
        True when the record was written. On any error the error is logged and
        False is returned; this function never raises.
    """
    record = {
        "ts": datetime.now().isoformat(),
        "reason": reason,
        "tier": tier,
        "event_key": _event_key(event),
        "event": event,
        "message": message,
        "errors": errors,
    }
    try:
        queue_dir = os.path.dirname(_QUEUE_PATH)
        # A bare filename (e.g. MOMO_EVENT_ROUTER_QUEUE=queue.jsonl) has no directory
        # part, and os.makedirs("") raises, so only create a directory when there is one.
        if queue_dir:
            os.makedirs(queue_dir, exist_ok=True)
        # Serialize outside the lock; only the append itself needs to be serialized.
        line = json.dumps(record, ensure_ascii=False, default=str) + "\n"
        with _QUEUE_LOCK:
            with open(_QUEUE_PATH, "a", encoding="utf-8") as fh:
                fh.write(line)
        return True
    except Exception as exc:
        logger.error("[EventRouter] failed to queue delivery fallback: %s", exc)
        return False
def _queued_record_message(record: Dict[str, Any]) -> str:
    """Render a queued failed-delivery record as a compact, HTML-safe Telegram message.

    Every interpolated value is clipped and HTML-escaped, so the replay cannot
    hit the same Telegram HTML 400 that may have caused the original failure.

    Args:
        record: One JSONL record as written by ``_queue_failed_delivery``.

    Returns:
        The message text. The event trace, when present, is wrapped in ``<pre>``.
    """
    event = record.get("event") if isinstance(record.get("event"), dict) else {}
    title = event.get("title") or record.get("event_key") or "EventRouter queued event"
    summary = event.get("summary") or event.get("status") or record.get("reason") or "queued delivery replay"
    trace = event.get("trace") or event.get("traceback") or event.get("error_traceback")
    errors = record.get("errors") if isinstance(record.get("errors"), list) else []
    lines = [
        "♻️ EventRouter Queue Replay",
        "━━━━━━━━━━━━━━━━━━━━",
        f"📌 {escape(_clip_text(title, 120))}",
        f"🔖 {escape(_clip_text(record.get('event_key', 'unknown'), 120))}",
        f"🕒 queued_at={escape(_clip_text(record.get('ts', 'unknown'), 80))}",
        "",
        f"🔍 概要:{escape(_clip_text(summary, _REPLAY_TEXT_LIMIT))}",
    ]
    if errors:
        lines.extend([
            "",
            f"⚠️ 上次錯誤:{escape(_clip_text('; '.join(str(item) for item in errors), 300))}",
        ])
    if trace:
        # Monospaced block for the trace. Its content is escaped so that "<" or "&"
        # inside a traceback cannot break Telegram's HTML parsing.
        lines.extend([
            "",
            "<pre>" + escape(_clip_text(trace, _REPLAY_TRACE_LIMIT)) + "</pre>",
        ])
    lines.extend([
        "",
        "💡 處置:此為失敗佇列安全回放;原始長訊息已濃縮,避免 Telegram HTML 400 重複卡住。",
    ])
    return "\n".join(lines)


def _clip_text(value: Any, limit: int) -> str:
    """Return ``str(value)`` stripped and truncated to at most *limit* characters.

    Falsy values (``None``, ``0``, ``""``) become ``""``. Truncated text ends
    with "…", which counts toward the limit. A non-positive *limit* disables
    clipping.
    """
    text = str(value or "").strip()
    if limit <= 0 or len(text) <= limit:
        return text
    return text[: max(0, limit - 1)].rstrip() + "…"


def _rewrite_queue(remaining: list) -> None:
    """Replace the queue file with *remaining* lines, or delete it when none remain.

    The caller must hold ``_QUEUE_LOCK``. Lines are written to a sibling temp
    file and swapped in with ``os.replace``, so a crash mid-write cannot
    truncate the queue.
    """
    if not remaining:
        try:
            os.remove(_QUEUE_PATH)
        except FileNotFoundError:
            pass
        return
    queue_dir = os.path.dirname(_QUEUE_PATH)
    if queue_dir:  # os.makedirs("") raises for a bare-filename queue path
        os.makedirs(queue_dir, exist_ok=True)
    tmp_path = _QUEUE_PATH + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as fh:
        fh.writelines(remaining)
    os.replace(tmp_path, _QUEUE_PATH)


def replay_failed_deliveries(limit: int = 20, admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
    """Replay up to *limit* queued Telegram deliveries; keep failures in the JSONL queue.

    The queue lock is held for the whole read-send-rewrite cycle, so a
    concurrent in-process ``_queue_failed_delivery`` append cannot be lost by
    the rewrite. Lines beyond *limit* are kept untouched and in order.

    Args:
        limit: Maximum number of lines processed from the head of the queue.
        admin_chat_ids: Passed through to ``send_telegram_with_result``.

    Returns:
        Counters ``attempted``/``sent``/``failed``/``dropped``. ``dropped``
        counts processed lines that are not JSON objects; they are removed.
    """
    if limit <= 0 or not os.path.exists(_QUEUE_PATH):
        return {"attempted": 0, "sent": 0, "failed": 0, "dropped": 0}
    with _QUEUE_LOCK:
        try:
            with open(_QUEUE_PATH, "r", encoding="utf-8") as fh:
                lines = fh.readlines()
        except FileNotFoundError:
            return {"attempted": 0, "sent": 0, "failed": 0, "dropped": 0}
        remaining = []
        attempted = sent = failed = dropped = 0
        for idx, line in enumerate(lines):
            if idx >= limit:
                remaining.append(line)
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                dropped += 1
                continue
            if not isinstance(record, dict):
                # Valid JSON but not a queue record; rendering it would raise mid-batch.
                dropped += 1
                continue
            attempted += 1
            try:
                send_result = send_telegram_with_result(_queued_record_message(record), chat_ids=admin_chat_ids)
            except Exception as exc:
                # Contain per-record errors so that records already sent in this batch
                # are still removed by the rewrite below (no duplicate replays later).
                logger.warning("[EventRouter] queue replay send raised: %s", exc)
                send_result = {"ok": False, "errors": [f"{type(exc).__name__}: {str(exc)[:200]}"]}
            if send_result.get("ok"):
                sent += 1
                continue
            failed += 1
            record["last_replay_error"] = send_result.get("errors", [])
            record["last_replay_at"] = datetime.now().isoformat()
            remaining.append(json.dumps(record, ensure_ascii=False, default=str) + "\n")
        _rewrite_queue(remaining)
    stats = {"attempted": attempted, "sent": sent, "failed": failed, "dropped": dropped}
    record_event_router_replay(**stats)
    return stats


def _execute_safe_actions(result: Dict[str, Any], event: Dict[str, Any]) -> list[Dict[str, Any]]:
    """Execute ADR-012 L2 SAFE_ACTIONS only when the plan is explicitly marked auto-safe.

    The NemoTron result must set ``auto_execute``, or set ``dispatch_to`` to
    ``"safe_action"``/``"auto_execute"``. At most the first three steps of
    ``action_plan`` (falling back to ``execution_plan``) are considered.
    Non-dict steps are skipped, and actions missing from ``SAFE_ACTIONS`` are
    rejected. The action functions perform their own audit writes. *event*
    is currently unused.

    Returns:
        One status dict per considered step (``ok`` / ``rejected`` / ``error``),
        or ``[]`` when nothing is eligible to run.
    """
    if not isinstance(result, dict):
        return []
    # Tuple membership compares with ==, so an unhashable dispatch_to cannot raise here.
    if not (result.get("auto_execute") or result.get("dispatch_to") in ("safe_action", "auto_execute")):
        return []
    action_plan = result.get("action_plan") or result.get("execution_plan") or []
    if not isinstance(action_plan, list):
        return []
    try:
        from services.agent_actions import SAFE_ACTIONS
    except Exception as exc:
        return [{"action": "load_safe_actions", "status": "error", "error": str(exc)[:200]}]
    executed = []
    for step in action_plan[:3]:
        if not isinstance(step, dict):
            continue
        action = step.get("action")
        params = step.get("params") or {}
        # isinstance guard: an unhashable action (e.g. a dict produced by the model)
        # would make the membership test itself raise and abort the whole dispatch.
        if not isinstance(action, str) or action not in SAFE_ACTIONS:
            executed.append({"action": action, "status": "rejected", "reason": "not in SAFE_ACTIONS"})
            record_event_router_safe_action(str(action) if action else "unknown", "rejected")
            continue
        try:
            action_result = SAFE_ACTIONS[action](**params)
            executed.append({"action": action, "status": "ok", "result": action_result})
            record_event_router_safe_action(action, "ok")
        except Exception as exc:
            logger.exception("[EventRouter] safe action failed: %s", action)
            executed.append({"action": action, "status": "error", "error": str(exc)[:300]})
            record_event_router_safe_action(action or "unknown", "error")
    return executed


def _skipped_dispatch_response(
    event: Dict[str, Any],
    tier: str,
    started_at: float,
    *,
    status: str,
) -> Dict[str, Any]:
    """Build the response (and record metrics) for an event intentionally not sent.

    *status* is ``"silenced"`` or ``"deduped"``. Both are reported with
    ``delivered=True`` and ``sent=0``, and the matching flag is passed to the
    dispatch metric.
    """
    latency_ms = int((time.perf_counter() - started_at) * 1000)
    response = {
        "tier": tier,
        "sent": 0,
        "errors": [],
        "latency_ms": latency_ms,
        "payload": {"status": status, "event_key": _event_key(event)},
        "delivered": True,
        "silenced": status == "silenced",
        "queued": False,
        "deduped": status == "deduped",
    }
    record_event_router_dispatch(
        tier=tier,
        event_type=event.get("event_type", "unknown"),
        delivered=True,
        latency_ms=latency_ms,
        **{status: True},
    )
    return response


async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
    """Main event routing entry point (ADR-012 §③ — the single entry).

    Pipeline: silence check → dedup → AI tier handler (or direct rendering of
    an attached decision envelope) → optional SAFE_ACTIONS → Telegram send.
    A failed send is appended to the JSONL replay queue. A successful send
    optionally replays queued items; a replay error is logged and does not
    affect the result. Exceptions raised after classification are logged, the
    event is queued with reason ``dispatch_exception``, and a
    ``delivered=False`` response is returned.

    The output format is compatible with routes/bot_api_routes.
    """
    tier = _classify(event)
    session_id = f"evt:{event.get('event_type')}:{event.get('source', 'unknown')}"
    started_at = time.perf_counter()
    try:
        if _is_event_silenced(event):
            return _skipped_dispatch_response(event, tier, started_at, status="silenced")
        if _is_duplicate_event(event):
            return _skipped_dispatch_response(event, tier, started_at, status="deduped")
        if _should_render_decision_direct(event):
            result = _direct_decision_result(event)
        else:
            result = await _run_tier_handler(tier, event, session_id)
        executed_actions = _execute_safe_actions(result, event)
        if executed_actions:
            result["executed_actions"] = executed_actions
        message, reply_markup = _build_telegram_message(event, tier, result)
        send_result = send_telegram_with_result(message, chat_ids=admin_chat_ids, reply_markup=reply_markup)
        queued = False
        replayed = {"attempted": 0, "sent": 0, "failed": 0, "dropped": 0}
        if not send_result["ok"]:
            queued = _queue_failed_delivery(event, tier, message, send_result["errors"], "telegram_delivery_failed")
        elif _REPLAY_ON_SUCCESS:
            # The alert is already delivered; a replay error must not turn this into a
            # failed dispatch, which would also re-queue an already-delivered alert.
            try:
                replayed = replay_failed_deliveries(limit=_REPLAY_LIMIT, admin_chat_ids=admin_chat_ids)
            except Exception as exc:
                logger.exception("[EventRouter] queue replay failed after successful delivery: %s", exc)
        latency_ms = int((time.perf_counter() - started_at) * 1000)
        response = {
            "tier": tier,
            "sent": send_result["sent"],
            "errors": send_result["errors"],
            "latency_ms": latency_ms,
            "payload": result,
            "delivered": send_result["ok"],
            "silenced": False,
            "queued": queued,
            "deduped": False,
            "replayed": replayed,
        }
        record_event_router_dispatch(
            tier=tier,
            event_type=event.get("event_type", "unknown"),
            delivered=response["delivered"],
            queued=response["queued"],
            latency_ms=latency_ms,
        )
        return response
    except Exception as exc:
        logger.exception("[EventRouter] dispatch failed: %s", exc)
        queued = _queue_failed_delivery(event, tier, None, [str(exc)], "dispatch_exception")
        response = {
            "tier": tier,
            "sent": 0,
            "errors": [str(exc)],
            "latency_ms": int((time.perf_counter() - started_at) * 1000),
            "payload": None,
            "delivered": False,
            "silenced": False,
            "queued": queued,
            "deduped": False,
        }
        record_event_router_dispatch(
            tier=tier,
            event_type=event.get("event_type", "unknown"),
            delivered=False,
            queued=queued,
            latency_ms=response["latency_ms"],
        )
        return response


def _run_coroutine_in_thread(coro, timeout: float = 15.0) -> Dict[str, Any]:
    """Run *coro* on a fresh event loop in a daemon thread; wait up to *timeout* seconds.

    Used when the calling thread already has a running loop, since
    ``asyncio.run`` cannot be nested. An exception from the coroutine becomes
    a ``delivered=False`` response. On timeout the worker thread is left
    running in the background and a ``dispatch_sync timed out`` response is
    returned.
    """
    outcome: Dict[str, Any] = {}

    def runner() -> None:
        try:
            outcome["value"] = asyncio.run(coro)
        except BaseException as exc:  # CancelledError is a BaseException; always leave a value
            outcome["value"] = {
                "tier": "unknown",
                "sent": 0,
                "errors": [str(exc)],
                "latency_ms": 0,
                "payload": None,
                "delivered": False,
            }

    thread = threading.Thread(target=runner, daemon=True)
    thread.start()
    thread.join(timeout=timeout)
    if thread.is_alive():
        return {
            "tier": "unknown",
            "sent": 0,
            "errors": ["dispatch_sync timed out"],
            "latency_ms": int(timeout * 1000),
            "payload": None,
            "delivered": False,
        }
    return outcome["value"]


def dispatch_sync(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
    """Synchronous EventRouter entry point.

    Runs ``dispatch`` with ``asyncio.run`` when this thread has no running
    loop. Otherwise it runs ``dispatch`` on a helper thread with a bounded
    wait (``_run_coroutine_in_thread``).
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(dispatch(event, admin_chat_ids=admin_chat_ids))
    return _run_coroutine_in_thread(dispatch(event, admin_chat_ids=admin_chat_ids))


def notify_failure(
    task_name: str,
    error: Exception,
    *,
    source: Optional[str] = None,
    event_type: str = "scheduler_task_failure",
    priority: str = "P2",
    title: Optional[str] = None,
    trace: Optional[str] = None,
    payload: Optional[Dict[str, Any]] = None,
    dedup_ttl_sec: Optional[int] = None,
) -> Dict[str, Any]:
    """Synchronously report a scheduler/background task failure via ``dispatch_sync``.

    P1/P2 map to severity ``alert`` and anything else to ``warning``. The trace
    defaults to the formatted traceback of *error*. ``task_name`` is merged
    into *payload*, and ``dedup_ttl_sec`` is attached only when given.

    Returns:
        The ``dispatch_sync`` response dict.
    """
    severity = "alert" if priority in {"P1", "P2"} else "warning"
    event = {
        "source": source or f"Scheduler.{task_name}",
        "event_type": event_type,
        "severity": severity,
        "title": title or f"{task_name} 任務異常",
        "status": "任務失敗",
        "impact": f"{priority} - 背景任務需要檢查",
        "summary": str(error)[:200],
        "trace": trace or "".join(traceback.format_exception(type(error), error, error.__traceback__)),
        "payload": {"task_name": task_name, **(payload or {})},
    }
    if dedup_ttl_sec is not None:
        event["dedup_ttl_sec"] = dedup_ttl_sec
    return dispatch_sync(event)


# Alert event types that are escalated to the L2 handler.
_L2_ALERT_EVENT_TYPES = frozenset({
    "price_threat",
    "db_connection_error",
    "crawler_timeout",
    "nim_quota_exhausted",
    "embedding_failure",
    "scheduler_task_failure",
})


def _classify(event: Dict[str, Any]) -> str:
    """Map an event to tier ``"L0"``/``"L1"``/``"L2"``.

    * ``info``/``success`` and any unrecognised severity (default ``info``) → L0
    * ``warning`` → L1 when a truthy ``trace`` is attached, else L0
    * ``alert`` → L2 for ``_L2_ALERT_EVENT_TYPES``, else L1
    """
    sev = event.get("severity", "info")
    has_trace = bool(event.get("trace"))
    event_type = event.get("event_type", "")
    if sev in ("info", "success"):
        return "L0"
    if sev == "warning":
        return "L1" if has_trace else "L0"
    if sev == "alert":
        return "L2" if event_type in _L2_ALERT_EVENT_TYPES else "L1"
    return "L0"


def _build_telegram_message(
    event: Dict[str, Any],
    tier: str,
    result: Optional[Dict[str, Any]],
) -> tuple[str, Optional[Dict[str, Any]]]:
    """Build the Telegram ``(message, reply_markup)`` pair for an event.

    * Decision envelope attached → ``triaged_alert`` labelled with the
      envelope's source agent and severity, with the summary taken from the
      subject SKU/name.
    * Tier L0 → a plain informational message with no reply markup.
    * Otherwise → ``triaged_alert`` fed with the handler's summary, cause,
      suggested actions and executed SAFE_ACTIONS.
    """
    decision_envelope = _decision_envelope_from_event(event)
    if decision_envelope:
        render_event = dict(event)
        render_event["decision_envelope"] = decision_envelope
        render_event["event_type"] = decision_envelope.get("decision_type") or event.get("event_type", "decision")
        if not render_event.get("id") and decision_envelope.get("decision_id"):
            render_event["id"] = decision_envelope.get("decision_id")
        subject = decision_envelope.get("subject") if isinstance(decision_envelope.get("subject"), dict) else {}
        sku = str(subject.get("sku") or "").strip()
        name = str(subject.get("name") or "").strip()
        if sku or name:
            render_event["summary"] = " · ".join(part for part in (f"SKU {sku}" if sku else "", name) if part)
        raw_source_agent = str(decision_envelope.get("source_agent") or event.get("source") or "Decision")
        source_agent = {
            "nemotron": "NemoTron",
            "openclaw": "OpenClaw",
            "elephant_alpha": "Elephant Alpha",
            "hermes": "Hermes",
        }.get(raw_source_agent.lower(), raw_source_agent.replace("_", " ").title())
        severity = str(decision_envelope.get("severity") or tier)
        # NOTE(review): via dispatch(), result is _direct_decision_result(...), whose summary
        # is always non-empty, so the envelope "analysis" fallback is only reached by other
        # callers — confirm this is intended.
        ai_summary = ""
        if isinstance(result, dict):
            ai_summary = str(result.get("summary") or "")
        if not ai_summary:
            ai_summary = str(decision_envelope.get("analysis") or "依決策信封進行人工覆核。")
        return triaged_alert(
            render_event,
            tier_label=f"{source_agent} · {severity}",
            ai_summary=ai_summary,
        )
    if tier == "L0":
        title = event.get("title") or event.get("event_type", "system_event")
        summary = event.get("summary") or event.get("status") or "系統事件"
        body = event.get("impact") or event.get("source") or ""
        # Messages are sent as Telegram HTML (see the "HTML 400" note in the replay path),
        # so raw "<" or "&" in event text would get the send rejected.
        message = (
            f"ℹ️ {escape(str(title))}\n"
            f"━━━━━━━━━━━━━━━━━━━━\n"
            f"{escape(str(summary))}\n"
            f"{escape(str(body))}\n"
        )
        return message, None
    ai_summary = ""
    ai_cause = None
    ai_actions = None
    ai_executed = None
    if isinstance(result, dict):
        ai_summary = (
            result.get("summary")
            or result.get("reasoning")
            or result.get("message")
            or str(result)[:400]
        )
        ai_cause = result.get("cause") or result.get("root_cause")
        ai_actions = result.get("suggested_actions") or result.get("execution_plan")
        ai_executed = result.get("executed_actions")
    return triaged_alert(
        event,
        tier_label=tier,
        ai_summary=ai_summary,
        ai_cause=ai_cause,
        ai_actions=ai_actions if isinstance(ai_actions, list) else ([str(ai_actions)] if ai_actions else None),
        ai_executed=ai_executed if isinstance(ai_executed, list) else ([str(ai_executed)] if ai_executed else None),
    )