# services/event_router.py
#
# ADR-012 §③: single entry point dispatch(event) — L0/L1/L2 routing.
# W2-C: L2 prefers the Elephant Alpha (EA) Orchestrator; when EA is unavailable
# it falls back to AIOrchestrator.
#
import asyncio
import json
import logging
import os
import threading
import traceback
import time
from datetime import datetime
from html import escape
from typing import Any, Dict, Optional

from services.ai_orchestrator import AIOrchestrator
from services.ai_automation_metrics import (
    record_event_router_dispatch,
    record_event_router_replay,
    record_event_router_safe_action,
)
from services.telegram_templates import send_telegram_with_result, triaged_alert

logger = logging.getLogger(__name__)

# JSONL file holding notifications whose Telegram delivery failed; replayed later.
_QUEUE_PATH = os.getenv(
    "MOMO_EVENT_ROUTER_QUEUE",
    os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "event_router_failed_deliveries.jsonl"),
)
# Serializes appends to and rewrites of the queue file within this process.
_QUEUE_LOCK = threading.Lock()
# Guards _EVENT_DEDUP, which maps event key -> epoch seconds until which repeats are suppressed.
_DEDUP_LOCK = threading.Lock()
_EVENT_DEDUP: Dict[str, float] = {}
_DEFAULT_DEDUP_SEC = int(os.getenv("MOMO_EVENT_ROUTER_DEFAULT_DEDUP_SEC", "0"))
_REPLAY_ON_SUCCESS = os.getenv("MOMO_EVENT_ROUTER_REPLAY_ON_SUCCESS", "true").lower() == "true"
_REPLAY_LIMIT = int(os.getenv("MOMO_EVENT_ROUTER_REPLAY_LIMIT", "3"))
_REPLAY_TEXT_LIMIT = 700
_REPLAY_TRACE_LIMIT = 1200
# Seconds dispatch_sync waits for a worker-thread dispatch when a loop is already running.
_SYNC_DISPATCH_TIMEOUT_SEC = 15
# Alert-severity event types that are escalated to L2 analysis.
_L2_ALERT_EVENT_TYPES = frozenset({
    "price_threat",
    "db_connection_error",
    "crawler_timeout",
    "nim_quota_exhausted",
    "embedding_failure",
    "scheduler_task_failure",
})
# Display names for known decision-envelope source agents (keys are lowercased).
_SOURCE_AGENT_LABELS = {
    "nemotron": "NemoTron",
    "openclaw": "OpenClaw",
    "elephant_alpha": "Elephant Alpha",
    "hermes": "Hermes",
}


async def _handle_l1(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
    """L1: semantic translation + reason analysis (Hermes)."""
    orchestrator = AIOrchestrator()
    return await orchestrator.handle_l1(event, session_id)


async def _handle_l2(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
    """L2 analysis: prefer the Elephant Alpha orchestrator, else AIOrchestrator.

    W2-C: EA is tried first (dynamic routing, 256K context). If the EA API key is
    not configured, the connection check fails, or EA raises, the event is handed
    to ``AIOrchestrator.handle_l2`` instead.

    ADR-012: the audit trail is meant to be written by both EA._log_decision and
    triaged_alert; this function does not enforce that itself.
    """
    try:
        from services.elephant_service import elephant_service
        from services.elephant_alpha_orchestrator import elephant_orchestrator

        # Guardrail: without an EA API key, fall back immediately instead of connecting.
        if not elephant_service.api_key:
            raise RuntimeError("NVIDIA_API_KEY not configured, using fallback")
        # Guardrail: connection check. W3-A notes it is cached for 300s so it should not
        # ping on every call; that caching lives in elephant_service, not here.
        if not elephant_service.check_connection():
            raise RuntimeError("EA connection unavailable, using fallback")

        decision = await elephant_orchestrator.analyze_and_coordinate({
            "event": event,
            "tier": "L2",
            "session_id": session_id,
            "urgency": "high",
            "complexity": "medium",
            "task_type": event.get("event_type", "general_analysis"),
        })
        return {
            "source": "elephant_alpha",
            "priority": decision.priority,
            "confidence": decision.confidence,
            "execution_plan": decision.execution_plan,
            "agents_required": decision.agents_required,
            "reasoning": decision.reasoning,
        }
    except Exception as e:
        # Deliberate best-effort: any EA failure (import, config, network) degrades to AIOrchestrator.
        logger.warning("[EventRouter] EA L2 failed (%s), fallback → AIOrchestrator", e)
        orchestrator = AIOrchestrator()
        return await orchestrator.handle_l2(event, session_id)


async def _handle_l0(event: Dict[str, Any]) -> Dict[str, Any]:
    """L0: return raw event (compatibility/monitoring)."""
    return {"status": "ok", "echo": event.get("event_type")}


async def _run_tier_handler(tier: str, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
    """Run the AI handler for *tier*, degrading to a template-alert payload on failure.

    Unknown tiers are treated as L0. Any handler exception is logged and turned
    into a ``status="degraded"`` dict so the alert itself is still delivered.
    """
    try:
        if tier == "L1":
            return await _handle_l1(event, session_id)
        if tier == "L2":
            return await _handle_l2(event, session_id)
        return await _handle_l0(event)
    except Exception as exc:
        logger.exception("[EventRouter] %s handler failed, degraded to template alert: %s", tier, exc)
        return {
            "status": "degraded",
            "summary": "AI 分析暫不可用,已降級為模板告警;原始事件仍保留。",
            "cause": f"{type(exc).__name__}: {str(exc)[:200]}",
            "suggested_actions": ["先依原始事實排查", "若重複發生,查看 event_router_failed_deliveries.jsonl 與服務 logs"],
            "handler_error": str(exc)[:500],
        }


def _decision_envelope_from_event(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return a non-empty decision envelope dict from the event top level or its payload.

    Looks at ``decision_envelope`` then ``decision``, first on the event and then
    on ``event["payload"]`` (when that is a dict). Returns None if none is found.
    """
    envelope = event.get("decision_envelope") or event.get("decision")
    if isinstance(envelope, dict) and envelope:
        return envelope
    payload = event.get("payload") if isinstance(event.get("payload"), dict) else {}
    envelope = payload.get("decision_envelope") or payload.get("decision")
    if isinstance(envelope, dict) and envelope:
        return envelope
    return None


def _should_render_decision_direct(event: Dict[str, Any]) -> bool:
    """Evidence-ready decision events should not be re-summarized by L1/L2 AI."""
    return _decision_envelope_from_event(event) is not None


def _direct_decision_result(event: Dict[str, Any]) -> Dict[str, Any]:
    """Build the handler-result dict used when a decision envelope bypasses AI analysis."""
    envelope = _decision_envelope_from_event(event) or {}
    return {
        "status": "decision_envelope_direct",
        "summary": "已收到完整決策信封,略過 AI 重新摘要,直接以證據模板通知。",
        "decision_envelope": envelope,
    }


def _event_key(event: Dict[str, Any]) -> str:
    """Return the ``source:event_type`` key used for silencing, dedup and queue records."""
    return f"{event.get('source', 'unknown')}:{event.get('event_type', 'unknown')}"


def _is_event_silenced(event: Dict[str, Any]) -> bool:
    """Return True if the event key or bare event_type is silenced; False if the check fails."""
    try:
        from services.agent_actions import is_silenced

        key = _event_key(event)
        return is_silenced(key) or is_silenced(str(event.get("event_type", "")))
    except Exception as exc:
        # Best-effort: a broken silence store must not suppress alerts.
        logger.warning("[EventRouter] silence check failed: %s", exc)
        return False


def _dedup_ttl_sec(event: Dict[str, Any]) -> int:
    """Return the dedup window in seconds (event, then payload, then module default); 0 disables."""
    payload = event.get("payload") if isinstance(event.get("payload"), dict) else {}
    raw = event.get("dedup_ttl_sec", payload.get("dedup_ttl_sec", _DEFAULT_DEDUP_SEC))
    try:
        return max(0, int(raw or 0))
    except (TypeError, ValueError):
        return 0


def _is_duplicate_event(event: Dict[str, Any]) -> bool:
    """Return True if an event with the same key was seen within its dedup window.

    A non-duplicate event (with a positive TTL) opens a new window for its key.
    """
    ttl = _dedup_ttl_sec(event)
    if ttl <= 0:
        return False
    key = _event_key(event)
    now = time.time()
    with _DEDUP_LOCK:
        until = _EVENT_DEDUP.get(key)
        if until and until > now:
            return True
        # Drop expired windows so the map does not grow without bound.
        for stale_key in [k for k, expiry in _EVENT_DEDUP.items() if expiry <= now]:
            del _EVENT_DEDUP[stale_key]
        _EVENT_DEDUP[key] = now + ttl
        return False


def _ensure_queue_dir() -> None:
    """Create the queue file's parent directory when the path has one."""
    queue_dir = os.path.dirname(_QUEUE_PATH)
    # A bare filename (e.g. MOMO_EVENT_ROUTER_QUEUE=queue.jsonl) has no directory part,
    # and os.makedirs("") raises FileNotFoundError.
    if queue_dir:
        os.makedirs(queue_dir, exist_ok=True)


def _queue_failed_delivery(
    event: Dict[str, Any],
    tier: str,
    message: Optional[str],
    errors: list,
    reason: str,
) -> bool:
    """Append a failed notification to the local JSONL queue for later replay.

    Returns True if the record was written, False if writing failed (logged).
    """
    record = {
        "ts": datetime.now().isoformat(),
        "reason": reason,
        "tier": tier,
        "event_key": _event_key(event),
        "event": event,
        "message": message,
        "errors": errors,
    }
    try:
        _ensure_queue_dir()
        with _QUEUE_LOCK:
            with open(_QUEUE_PATH, "a", encoding="utf-8") as fh:
                fh.write(json.dumps(record, ensure_ascii=False, default=str) + "\n")
        return True
    except Exception as exc:
        logger.error("[EventRouter] failed to queue delivery fallback: %s", exc)
        return False


def _queued_record_message(record: Dict[str, Any]) -> str:
    """Render a condensed, HTML-escaped Telegram message for a queued record.

    Replays never resend the original (possibly long or malformed-HTML) message.
    Instead they rebuild a short summary from the stored event, so a message
    Telegram rejected once does not keep failing on every replay.
    """
    event = record.get("event") if isinstance(record.get("event"), dict) else {}
    title = event.get("title") or record.get("event_key") or "EventRouter queued event"
    summary = event.get("summary") or event.get("status") or record.get("reason") or "queued delivery replay"
    trace = event.get("trace") or event.get("traceback") or event.get("error_traceback")
    errors = record.get("errors") if isinstance(record.get("errors"), list) else []

    lines = [
        "♻️ EventRouter Queue Replay",
        "━━━━━━━━━━━━━━━━━━━━",
        f"📌 {escape(_clip_text(title, 120))}",
        f"🔖 {escape(_clip_text(record.get('event_key', 'unknown'), 120))}",
        f"🕒 queued_at={escape(_clip_text(record.get('ts', 'unknown'), 80))}",
        "",
        f"🔍 概要:{escape(_clip_text(summary, _REPLAY_TEXT_LIMIT))}",
    ]
    if errors:
        lines.extend([
            "",
            f"⚠️ 上次錯誤:{escape(_clip_text('; '.join(str(item) for item in errors), 300))}",
        ])
    if trace:
        lines.extend([
            "",
            # Escaped trace in a <pre> block; assumes send_telegram_with_result uses HTML parse mode.
            "<pre>" + escape(_clip_text(trace, _REPLAY_TRACE_LIMIT)) + "</pre>",
        ])
    lines.extend([
        "",
        "💡 處置:此為失敗佇列安全回放;原始長訊息已濃縮,避免 Telegram HTML 400 重複卡住。",
    ])
    return "\n".join(lines)


def _clip_text(value: Any, limit: int) -> str:
    """Stringify and strip *value*; truncate to *limit* chars ending in "…" (limit <= 0 disables)."""
    text = str(value or "").strip()
    if limit <= 0 or len(text) <= limit:
        return text
    return text[: max(0, limit - 1)].rstrip() + "…"


def _empty_replay_stats() -> Dict[str, int]:
    """Return a fresh zeroed replay-statistics dict."""
    return {"attempted": 0, "sent": 0, "failed": 0, "dropped": 0}


def _rewrite_queue(lines: list) -> None:
    """Replace the queue file with *lines*, or delete it when *lines* is empty.

    The caller must hold _QUEUE_LOCK. The new content is written to a sibling
    temp file and swapped in with os.replace (atomic on POSIX), so a crash
    mid-write cannot truncate the live queue.
    """
    if not lines:
        try:
            os.remove(_QUEUE_PATH)
        except FileNotFoundError:
            pass
        return
    _ensure_queue_dir()
    tmp_path = f"{_QUEUE_PATH}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as fh:
        fh.writelines(lines)
    os.replace(tmp_path, _QUEUE_PATH)


def replay_failed_deliveries(limit: int = 20, admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
    """Replay up to *limit* queued Telegram deliveries, keeping failures in the queue.

    Each of the first *limit* lines is parsed and resent as a condensed replay
    message. Lines that are not JSON objects are dropped. Failed records are
    annotated with ``last_replay_error``/``last_replay_at`` and kept ahead of the
    untouched tail. Returns counts of attempted/sent/failed/dropped records.
    """
    if limit <= 0 or not os.path.exists(_QUEUE_PATH):
        return _empty_replay_stats()

    # The lock is held across sending so concurrent appends are not lost by the rewrite.
    with _QUEUE_LOCK:
        try:
            with open(_QUEUE_PATH, "r", encoding="utf-8") as fh:
                lines = fh.readlines()
        except FileNotFoundError:
            return _empty_replay_stats()

        remaining: list = []
        attempted = sent = failed = dropped = 0
        for line in lines[:limit]:
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                dropped += 1
                continue
            if not isinstance(record, dict):
                # Valid JSON that is not a record cannot be rendered; drop it instead of crashing.
                dropped += 1
                continue

            attempted += 1
            send_result = send_telegram_with_result(_queued_record_message(record), chat_ids=admin_chat_ids)
            if send_result["ok"]:
                sent += 1
                continue
            failed += 1
            record["last_replay_error"] = send_result.get("errors", [])
            record["last_replay_at"] = datetime.now().isoformat()
            remaining.append(json.dumps(record, ensure_ascii=False, default=str) + "\n")

        remaining.extend(lines[limit:])
        _rewrite_queue(remaining)

    stats = {"attempted": attempted, "sent": sent, "failed": failed, "dropped": dropped}
    record_event_router_replay(**stats)
    return stats


def _execute_safe_actions(result: Dict[str, Any], event: Dict[str, Any]) -> list[Dict[str, Any]]:
    """Run up to three plan steps from *result* through ADR-012 L2 SAFE_ACTIONS.

    Runs only when the result is explicitly marked auto-safe (``auto_execute``
    truthy or ``dispatch_to`` in {"safe_action", "auto_execute"}). Steps are read
    from ``action_plan`` or ``execution_plan``. Actions that are not strings or
    not registered in SAFE_ACTIONS are rejected. Each action function is expected
    to write its own audit record.
    """
    if not isinstance(result, dict):
        return []
    if not (result.get("auto_execute") or result.get("dispatch_to") in {"safe_action", "auto_execute"}):
        return []
    action_plan = result.get("action_plan") or result.get("execution_plan") or []
    if not isinstance(action_plan, list):
        return []
    try:
        from services.agent_actions import SAFE_ACTIONS
    except Exception as exc:
        return [{"action": "load_safe_actions", "status": "error", "error": str(exc)[:200]}]

    executed = []
    for step in action_plan[:3]:
        if not isinstance(step, dict):
            continue
        action = step.get("action")
        params = step.get("params") or {}
        # Plans come from a model; a non-string action (None, dict, ...) may be unhashable
        # and would raise on the membership test, so reject it explicitly.
        if not isinstance(action, str) or action not in SAFE_ACTIONS:
            executed.append({"action": action, "status": "rejected", "reason": "not in SAFE_ACTIONS"})
            record_event_router_safe_action(action if isinstance(action, str) and action else "unknown", "rejected")
            continue
        try:
            action_result = SAFE_ACTIONS[action](**params)
            executed.append({"action": action, "status": "ok", "result": action_result})
            record_event_router_safe_action(action, "ok")
        except Exception as exc:
            logger.exception("[EventRouter] safe action failed: %s", action)
            executed.append({"action": action, "status": "error", "error": str(exc)[:300]})
            record_event_router_safe_action(action or "unknown", "error")
    return executed


def _safe_record_dispatch(**kwargs: Any) -> None:
    """Record dispatch metrics; a metrics failure is logged and never affects routing."""
    try:
        record_event_router_dispatch(**kwargs)
    except Exception as exc:
        logger.warning("[EventRouter] dispatch metrics failed: %s", exc)


def _suppressed_response(
    event: Dict[str, Any],
    tier: str,
    started_at: float,
    *,
    silenced: bool,
) -> Dict[str, Any]:
    """Build (and record metrics for) the response of a silenced or deduplicated event."""
    latency_ms = int((time.perf_counter() - started_at) * 1000)
    response = {
        "tier": tier,
        "sent": 0,
        "errors": [],
        "latency_ms": latency_ms,
        "payload": {"status": "silenced" if silenced else "deduped", "event_key": _event_key(event)},
        "delivered": True,
        "silenced": silenced,
        "queued": False,
        "deduped": not silenced,
    }
    metric_flag = {"silenced": True} if silenced else {"deduped": True}
    _safe_record_dispatch(
        tier=tier,
        event_type=event.get("event_type", "unknown"),
        delivered=True,
        latency_ms=latency_ms,
        **metric_flag,
    )
    return response


async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
    """Main event routing entry point (ADR-012 §③ — the single entry).

    Flow: silence check -> dedup check -> AI tier handler (or direct rendering of
    an attached decision envelope) -> optional SAFE_ACTIONS -> Telegram delivery.
    A failed delivery is appended to the JSONL queue. A successful one optionally
    replays previously queued deliveries.

    The response dict is shaped for routes/bot_api_routes and always contains
    tier, sent, errors, latency_ms, payload, delivered, silenced, queued and
    deduped (plus ``replayed`` on the normal delivery path).
    """
    tier = _classify(event)
    session_id = f"evt:{event.get('event_type')}:{event.get('source', 'unknown')}"
    started_at = time.perf_counter()
    try:
        if _is_event_silenced(event):
            return _suppressed_response(event, tier, started_at, silenced=True)
        if _is_duplicate_event(event):
            return _suppressed_response(event, tier, started_at, silenced=False)

        if _should_render_decision_direct(event):
            result = _direct_decision_result(event)
        else:
            result = await _run_tier_handler(tier, event, session_id)

        executed_actions = _execute_safe_actions(result, event)
        if executed_actions:
            result["executed_actions"] = executed_actions

        message, reply_markup = _build_telegram_message(event, tier, result)
        send_result = send_telegram_with_result(message, chat_ids=admin_chat_ids, reply_markup=reply_markup)

        queued = False
        replayed = _empty_replay_stats()
        if not send_result["ok"]:
            queued = _queue_failed_delivery(event, tier, message, send_result["errors"], "telegram_delivery_failed")
        elif _REPLAY_ON_SUCCESS:
            try:
                replayed = replay_failed_deliveries(limit=_REPLAY_LIMIT, admin_chat_ids=admin_chat_ids)
            except Exception as exc:
                # This alert was already delivered. Letting a replay error reach the outer
                # handler would queue it as "dispatch_exception" and resend it later (duplicate).
                logger.exception("[EventRouter] queue replay failed: %s", exc)

        latency_ms = int((time.perf_counter() - started_at) * 1000)
        response = {
            "tier": tier,
            "sent": send_result["sent"],
            "errors": send_result["errors"],
            "latency_ms": latency_ms,
            "payload": result,
            "delivered": send_result["ok"],
            "silenced": False,
            "queued": queued,
            "deduped": False,
            "replayed": replayed,
        }
        _safe_record_dispatch(
            tier=tier,
            event_type=event.get("event_type", "unknown"),
            delivered=response["delivered"],
            queued=response["queued"],
            latency_ms=latency_ms,
        )
        return response
    except Exception as e:
        logger.exception("[EventRouter] dispatch failed: %s", e)
        queued = _queue_failed_delivery(event, tier, None, [str(e)], "dispatch_exception")
        response = {
            "tier": tier,
            "sent": 0,
            "errors": [str(e)],
            "latency_ms": int((time.perf_counter() - started_at) * 1000),
            "payload": None,
            "delivered": False,
            "silenced": False,
            "queued": queued,
            "deduped": False,
        }
        _safe_record_dispatch(
            tier=tier,
            event_type=event.get("event_type", "unknown"),
            delivered=False,
            queued=queued,
            latency_ms=response["latency_ms"],
        )
        return response


def _sync_failure_response(error: str, latency_ms: int) -> Dict[str, Any]:
    """Build a dispatch-shaped failure response for the synchronous wrapper."""
    return {
        "tier": "unknown",
        "sent": 0,
        "errors": [error],
        "latency_ms": latency_ms,
        "payload": None,
        "delivered": False,
        "silenced": False,
        "queued": False,
        "deduped": False,
    }


def _run_coroutine_in_thread(coro, timeout: float = _SYNC_DISPATCH_TIMEOUT_SEC) -> Dict[str, Any]:
    """Run *coro* with asyncio.run on a daemon thread and wait up to *timeout* seconds.

    On timeout a failure response is returned, but the thread is not cancelled:
    the coroutine keeps running in the background.
    """
    outcome: Dict[str, Any] = {}

    def runner() -> None:
        try:
            outcome["value"] = asyncio.run(coro)
        except Exception as e:
            outcome["value"] = _sync_failure_response(str(e), 0)

    thread = threading.Thread(target=runner, daemon=True)
    thread.start()
    thread.join(timeout=timeout)
    if thread.is_alive():
        return _sync_failure_response("dispatch_sync timed out", int(timeout * 1000))
    # A BaseException (e.g. asyncio.CancelledError) escapes the runner without setting a value.
    return outcome.get("value") or _sync_failure_response("dispatch_sync produced no result", 0)


def dispatch_sync(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
    """EventRouter entry point for synchronous code.

    Uses asyncio.run directly when no event loop is running in this thread.
    Otherwise the dispatch runs on a worker thread, because asyncio.run cannot
    be nested inside a running loop.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(dispatch(event, admin_chat_ids=admin_chat_ids))
    return _run_coroutine_in_thread(dispatch(event, admin_chat_ids=admin_chat_ids))


def notify_failure(
    task_name: str,
    error: Exception,
    *,
    source: Optional[str] = None,
    event_type: str = "scheduler_task_failure",
    priority: str = "P2",
    title: Optional[str] = None,
    trace: Optional[str] = None,
    payload: Optional[Dict[str, Any]] = None,
    dedup_ttl_sec: Optional[int] = None,
) -> Dict[str, Any]:
    """Synchronously report a scheduler/background task failure through dispatch_sync.

    P1/P2 failures are sent with severity "alert"; other priorities use "warning".
    When *trace* is not given, it is formatted from *error*'s traceback.
    *payload* is merged after ``task_name``, so its keys take precedence.
    """
    severity = "alert" if priority in {"P1", "P2"} else "warning"
    event = {
        "source": source or f"Scheduler.{task_name}",
        "event_type": event_type,
        "severity": severity,
        "title": title or f"{task_name} 任務異常",
        "status": "任務失敗",
        "impact": f"{priority} - 背景任務需要檢查",
        "summary": str(error)[:200],
        "trace": trace or "".join(traceback.format_exception(type(error), error, error.__traceback__)),
        "payload": {"task_name": task_name, **(payload or {})},
    }
    if dedup_ttl_sec is not None:
        event["dedup_ttl_sec"] = dedup_ttl_sec
    return dispatch_sync(event)


def _classify(event: Dict[str, Any]) -> str:
    """Map an event to an AI tier.

    - "warning": L1 when a trace is attached, else L0.
    - "alert": L2 for event types in _L2_ALERT_EVENT_TYPES, else L1.
    - Any other severity (including the default "info" and "success"): L0.
    """
    sev = event.get("severity", "info")
    if sev == "warning":
        return "L1" if event.get("trace") else "L0"
    if sev == "alert":
        return "L2" if event.get("event_type", "") in _L2_ALERT_EVENT_TYPES else "L1"
    return "L0"


def _as_optional_list(value: Any) -> Optional[list]:
    """Pass lists through; wrap any other truthy value as ``[str(value)]``; else None."""
    if isinstance(value, list):
        return value
    return [str(value)] if value else None


def _build_decision_message(
    event: Dict[str, Any],
    tier: str,
    result: Optional[Dict[str, Any]],
    decision_envelope: Dict[str, Any],
) -> tuple[str, Optional[Dict[str, Any]]]:
    """Render an evidence-ready decision envelope through triaged_alert."""
    render_event = dict(event)
    render_event["decision_envelope"] = decision_envelope
    render_event["event_type"] = decision_envelope.get("decision_type") or event.get("event_type", "decision")
    if not render_event.get("id") and decision_envelope.get("decision_id"):
        render_event["id"] = decision_envelope.get("decision_id")

    subject = decision_envelope.get("subject") if isinstance(decision_envelope.get("subject"), dict) else {}
    sku = str(subject.get("sku") or "").strip()
    name = str(subject.get("name") or "").strip()
    if sku or name:
        render_event["summary"] = " · ".join(part for part in (f"SKU {sku}" if sku else "", name) if part)

    raw_source_agent = str(decision_envelope.get("source_agent") or event.get("source") or "Decision")
    source_agent = _SOURCE_AGENT_LABELS.get(raw_source_agent.lower(), raw_source_agent.replace("_", " ").title())
    severity = str(decision_envelope.get("severity") or tier)

    ai_summary = str(result.get("summary") or "") if isinstance(result, dict) else ""
    if not ai_summary:
        ai_summary = str(decision_envelope.get("analysis") or "依決策信封進行人工覆核。")

    return triaged_alert(
        render_event,
        tier_label=f"{source_agent} · {severity}",
        ai_summary=ai_summary,
    )


def _build_telegram_message(event: Dict[str, Any], tier: str, result: Optional[Dict[str, Any]]) -> tuple[str, Optional[Dict[str, Any]]]:
    """Build the Telegram ``(message, reply_markup)`` for an event.

    Decision envelopes and L1/L2 results are rendered by triaged_alert. L0
    events get a plain text message with no markup. Its fields are HTML-escaped
    because the same sender is used for the escaped replay messages (assumed
    HTML parse mode).
    """
    decision_envelope = _decision_envelope_from_event(event)
    if decision_envelope:
        return _build_decision_message(event, tier, result, decision_envelope)

    if tier == "L0":
        title = event.get("title") or event.get("event_type", "system_event")
        summary = event.get("summary") or event.get("status") or "系統事件"
        body = event.get("impact") or event.get("source") or ""
        message = (
            f"ℹ️ {escape(str(title))}\n"
            f"━━━━━━━━━━━━━━━━━━━━\n"
            f"{escape(str(summary))}\n"
            f"{escape(str(body))}\n"
        )
        return message, None

    ai_summary = ""
    ai_cause = None
    ai_actions = None
    ai_executed = None
    if isinstance(result, dict):
        ai_summary = (
            result.get("summary")
            or result.get("reasoning")
            or result.get("message")
            or str(result)[:400]
        )
        ai_cause = result.get("cause") or result.get("root_cause")
        ai_actions = result.get("suggested_actions") or result.get("execution_plan")
        ai_executed = result.get("executed_actions")

    return triaged_alert(
        event,
        tier_label=tier,
        ai_summary=ai_summary,
        ai_cause=ai_cause,
        ai_actions=_as_optional_list(ai_actions),
        ai_executed=_as_optional_list(ai_executed),
    )