544 lines
20 KiB
Python
544 lines
20 KiB
Python
# services/event_router.py
|
||
#
|
||
# ADR-012 §③: 單一入口 dispatch(event) — L0/L1/L2 分流
|
||
# W2-C: L2 優先走 Elephant Alpha Orchestrator;EA 不可用時 fallback AIOrchestrator
|
||
#
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import os
|
||
import threading
|
||
import traceback
|
||
import time
|
||
from datetime import datetime
|
||
from html import escape
|
||
from typing import Any, Dict, Optional
|
||
|
||
from services.ai_orchestrator import AIOrchestrator
|
||
from services.ai_automation_metrics import (
|
||
record_event_router_dispatch,
|
||
record_event_router_replay,
|
||
record_event_router_safe_action,
|
||
)
|
||
from services.telegram_templates import send_telegram_with_result, triaged_alert
|
||
|
||
logger = logging.getLogger(__name__)
|
||
_QUEUE_PATH = os.getenv(
|
||
"MOMO_EVENT_ROUTER_QUEUE",
|
||
os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "event_router_failed_deliveries.jsonl"),
|
||
)
|
||
_QUEUE_LOCK = threading.Lock()
|
||
_DEDUP_LOCK = threading.Lock()
|
||
_EVENT_DEDUP: Dict[str, float] = {}
|
||
_DEFAULT_DEDUP_SEC = int(os.getenv("MOMO_EVENT_ROUTER_DEFAULT_DEDUP_SEC", "0"))
|
||
_REPLAY_ON_SUCCESS = os.getenv("MOMO_EVENT_ROUTER_REPLAY_ON_SUCCESS", "true").lower() == "true"
|
||
_REPLAY_LIMIT = int(os.getenv("MOMO_EVENT_ROUTER_REPLAY_LIMIT", "3"))
|
||
_REPLAY_TEXT_LIMIT = 700
|
||
_REPLAY_TRACE_LIMIT = 1200
|
||
|
||
|
||
async def _handle_l1(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
    """Tier L1: semantic translation and reason analysis (Hermes) via a fresh AIOrchestrator.

    Args:
        event: The incoming event dict, forwarded unchanged.
        session_id: Session identifier forwarded to the orchestrator.

    Returns:
        Whatever ``AIOrchestrator.handle_l1`` returns.
    """
    return await AIOrchestrator().handle_l1(event, session_id)
|
||
|
||
|
||
async def _handle_l2(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
    """Tier L2: prefer the Elephant Alpha (EA) orchestrator, else fall back to AIOrchestrator.

    W2-C: EA is tried first (dynamic routing, 256K context per the original note). If EA
    is unusable (no API key, failed connection check) or raises anything, the failure is
    logged and ``AIOrchestrator.handle_l2`` handles the event instead. Per the original
    note (ADR-012), the audit trail is written by EA's own decision log plus triaged_alert.

    Returns:
        On the EA path, a dict with ``source="elephant_alpha"`` and the decision's
        priority, confidence, execution_plan, agents_required and reasoning; otherwise
        whatever the fallback orchestrator returns.
    """
    try:
        from services.elephant_service import elephant_service
        from services.elephant_alpha_orchestrator import elephant_orchestrator

        # Guard: without an API key there is nothing to call, so go straight to fallback.
        if not elephant_service.api_key:
            raise RuntimeError("NVIDIA_API_KEY not configured, using fallback")
        # Guard: connection check (the original note says it is cached for 300s, W3-A).
        if not elephant_service.check_connection():
            raise RuntimeError("EA connection unavailable, using fallback")

        request = {
            "event": event,
            "tier": "L2",
            "session_id": session_id,
            "urgency": "high",
            "complexity": "medium",
            "task_type": event.get("event_type", "general_analysis"),
        }
        decision = await elephant_orchestrator.analyze_and_coordinate(request)

        fields = ("priority", "confidence", "execution_plan", "agents_required", "reasoning")
        return {"source": "elephant_alpha", **{name: getattr(decision, name) for name in fields}}
    except Exception as exc:
        logger.warning("[EventRouter] EA L2 failed (%s), fallback → AIOrchestrator", exc)
        return await AIOrchestrator().handle_l2(event, session_id)
|
||
|
||
|
||
async def _handle_l0(event: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""L0: return raw event (compatibility/monitoring)."""
|
||
return {"status": "ok", "echo": event.get("event_type")}
|
||
|
||
|
||
async def _run_tier_handler(tier: str, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
    """Run the handler for *tier*, degrading to a template payload if the handler raises.

    "L1" and "L2" use their AI handlers; "L0" and any unrecognised tier use the echo
    handler. A handler exception is logged and turned into a ``status="degraded"`` dict,
    so an AI failure never prevents the alert itself from being sent.
    """
    try:
        if tier == "L1":
            return await _handle_l1(event, session_id)
        if tier == "L2":
            return await _handle_l2(event, session_id)
        return await _handle_l0(event)
    except Exception as exc:
        logger.exception("[EventRouter] %s handler failed, degraded to template alert: %s", tier, exc)
        reason = str(exc)
        return {
            "status": "degraded",
            "summary": "AI 分析暫不可用,已降級為模板告警;原始事件仍保留。",
            "cause": f"{type(exc).__name__}: {reason[:200]}",
            "suggested_actions": ["先依原始事實排查", "若重複發生,查看 event_router_failed_deliveries.jsonl 與服務 logs"],
            "handler_error": reason[:500],
        }
|
||
|
||
|
||
def _event_key(event: Dict[str, Any]) -> str:
|
||
return f"{event.get('source', 'unknown')}:{event.get('event_type', 'unknown')}"
|
||
|
||
|
||
def _is_event_silenced(event: Dict[str, Any]) -> bool:
    """Ask ``services.agent_actions.is_silenced`` whether this event should be suppressed.

    The full ``source:event_type`` key is checked first, then the bare event type. Any
    error, including failure to import the helper, is logged and treated as "not
    silenced", so the alert still goes out.
    """
    try:
        from services.agent_actions import is_silenced

        by_key = is_silenced(_event_key(event))
        if by_key:
            return by_key
        return is_silenced(str(event.get("event_type", "")))
    except Exception as exc:
        logger.warning("[EventRouter] silence check failed: %s", exc)
        return False
|
||
|
||
|
||
def _dedup_ttl_sec(event: Dict[str, Any]) -> int:
    """Resolve the dedup window for *event* in whole seconds (never negative).

    Lookup: a top-level ``dedup_ttl_sec`` key wins if present; otherwise
    ``payload["dedup_ttl_sec"]`` (only when payload is a dict); otherwise the module
    default. Falsy or unparseable values yield 0, which disables dedup.
    """
    nested = event.get("payload")
    if not isinstance(nested, dict):
        nested = {}
    fallback = nested.get("dedup_ttl_sec", _DEFAULT_DEDUP_SEC)
    candidate = event.get("dedup_ttl_sec", fallback)
    try:
        seconds = int(candidate or 0)
    except (TypeError, ValueError):
        return 0
    return seconds if seconds > 0 else 0
|
||
|
||
|
||
def _is_duplicate_event(event: Dict[str, Any]) -> bool:
    """Return True if the same event key was already seen inside its dedup window.

    The first event for a key (or the first after its window expired) is recorded
    and let through (False). Events whose resolved TTL is 0 are never deduplicated
    and are not recorded.

    Once the table reaches ``prune_threshold`` entries, expired windows are evicted
    before inserting. Without this, keys that stop recurring (e.g. one-off ``source``
    values from external callers) would stay in memory forever.
    """
    ttl = _dedup_ttl_sec(event)
    if ttl <= 0:
        return False

    key = _event_key(event)
    now = time.time()
    prune_threshold = 256  # keeps the O(n) sweep rare for the typical small key set
    with _DEDUP_LOCK:
        until = _EVENT_DEDUP.get(key)
        if until is not None and until > now:
            return True
        if len(_EVENT_DEDUP) >= prune_threshold:
            expired = [k for k, expiry in _EVENT_DEDUP.items() if expiry <= now]
            for stale_key in expired:
                del _EVENT_DEDUP[stale_key]
        _EVENT_DEDUP[key] = now + ttl
        return False
|
||
|
||
|
||
def _queue_failed_delivery(
    event: Dict[str, Any],
    tier: str,
    message: Optional[str],
    errors: list,
    reason: str,
) -> bool:
    """Append a failed notification delivery to the local JSONL queue for later replay.

    Args:
        event: The original event; stored whole so replay can rebuild a message.
        tier: The tier the event was classified into.
        message: The rendered message that failed to send, or None if none was built.
        errors: Error descriptions from the failed attempt.
        reason: Short machine-readable cause (e.g. ``telegram_delivery_failed``).

    Returns:
        True if the record was written. False if writing failed; the error is logged
        and never raised, so callers on the alerting path are not broken by disk issues.
    """
    record = {
        "ts": datetime.now().isoformat(),
        "reason": reason,
        "tier": tier,
        "event_key": _event_key(event),
        "event": event,
        "message": message,
        "errors": errors,
    }
    try:
        queue_dir = os.path.dirname(_QUEUE_PATH)
        # A bare filename (e.g. MOMO_EVENT_ROUTER_QUEUE=failed.jsonl) has no directory
        # part, and os.makedirs("") raises FileNotFoundError; only create real dirs.
        if queue_dir:
            os.makedirs(queue_dir, exist_ok=True)
        # default=str: the event may carry non-JSON values; stringify rather than fail.
        line = json.dumps(record, ensure_ascii=False, default=str) + "\n"
        with _QUEUE_LOCK:
            with open(_QUEUE_PATH, "a", encoding="utf-8") as fh:
                fh.write(line)
        return True
    except Exception as exc:
        logger.error("[EventRouter] failed to queue delivery fallback: %s", exc)
        return False
|
||
|
||
|
||
def _queued_record_message(record: Dict[str, Any]) -> str:
    """Render a queued failure record as a compact Telegram HTML replay message.

    Every interpolated value is clipped with ``_clip_text`` and HTML-escaped. This
    keeps a replay from failing for the same size or markup reasons as the original
    send. Missing or malformed ``event``/``errors`` fields are treated as empty.
    """
    raw_event = record.get("event")
    event = raw_event if isinstance(raw_event, dict) else {}
    title = event.get("title") or record.get("event_key") or "EventRouter queued event"
    summary = event.get("summary") or event.get("status") or record.get("reason") or "queued delivery replay"
    trace = event.get("trace") or event.get("traceback") or event.get("error_traceback")
    raw_errors = record.get("errors")
    errors = raw_errors if isinstance(raw_errors, list) else []

    def clipped(value: Any, limit: int) -> str:
        return escape(_clip_text(value, limit))

    parts = [
        "♻️ <b>EventRouter Queue Replay</b>",
        "━━━━━━━━━━━━━━━━━━━━",
        f"📌 <b>{clipped(title, 120)}</b>",
        f"🔖 <code>{clipped(record.get('event_key', 'unknown'), 120)}</code>",
        f"🕒 queued_at={clipped(record.get('ts', 'unknown'), 80)}",
        "",
        f"🔍 <b>概要:</b>{clipped(summary, _REPLAY_TEXT_LIMIT)}",
    ]
    if errors:
        joined_errors = "; ".join(str(item) for item in errors)
        parts += ["", f"⚠️ <b>上次錯誤:</b>{clipped(joined_errors, 300)}"]
    if trace:
        parts += ["", f"<pre>{clipped(trace, _REPLAY_TRACE_LIMIT)}</pre>"]
    parts += [
        "",
        "💡 <b>處置:</b>此為失敗佇列安全回放;原始長訊息已濃縮,避免 Telegram HTML 400 重複卡住。",
    ]
    return "\n".join(parts)
|
||
|
||
|
||
def _clip_text(value: Any, limit: int) -> str:
|
||
text = str(value or "").strip()
|
||
if limit <= 0 or len(text) <= limit:
|
||
return text
|
||
return text[: max(0, limit - 1)].rstrip() + "…"
|
||
|
||
|
||
def replay_failed_deliveries(limit: int = 20, admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
    """Replay queued Telegram deliveries, keeping records that still fail in the JSONL queue.

    At most the first *limit* queue lines are processed per call. Records that fail
    again come first in the rewritten queue, followed by the untouched unprocessed
    lines. Each replay sends the condensed ``_queued_record_message`` text, not the
    original message.

    The queue lock is held for the whole read/send/rewrite cycle. This way
    concurrent ``_queue_failed_delivery`` appends in this process cannot be lost
    by the rewrite.

    Args:
        limit: Maximum number of queue lines to process; ``<= 0`` does nothing.
        admin_chat_ids: Chat IDs forwarded to ``send_telegram_with_result``.

    Returns:
        Counters ``attempted``, ``sent``, ``failed`` and ``dropped``. ``dropped``
        counts lines that are not JSON objects; they are removed from the queue.
        Metrics are recorded only when the queue file was actually read.
    """
    if limit <= 0 or not os.path.exists(_QUEUE_PATH):
        return {"attempted": 0, "sent": 0, "failed": 0, "dropped": 0}

    with _QUEUE_LOCK:
        try:
            with open(_QUEUE_PATH, "r", encoding="utf-8") as fh:
                lines = fh.readlines()
        except FileNotFoundError:
            return {"attempted": 0, "sent": 0, "failed": 0, "dropped": 0}

        retry = []
        attempted = sent = failed = dropped = 0
        for line in lines[:limit]:
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                dropped += 1
                continue
            if not isinstance(record, dict):
                # Valid JSON but not a queue record (e.g. a bare number). It cannot be
                # rendered and would only fail again on every replay.
                dropped += 1
                continue

            attempted += 1
            try:
                send_result = send_telegram_with_result(_queued_record_message(record), chat_ids=admin_chat_ids)
            except Exception as exc:
                # Keep the record rather than aborting mid-loop. An abort would skip the
                # rewrite below and re-send already delivered records next time.
                logger.exception("[EventRouter] queue replay send raised")
                send_result = {"ok": False, "errors": [f"{type(exc).__name__}: {exc}"]}

            if send_result["ok"]:
                sent += 1
                continue
            failed += 1
            record["last_replay_error"] = send_result.get("errors", [])
            record["last_replay_at"] = datetime.now().isoformat()
            retry.append(json.dumps(record, ensure_ascii=False, default=str) + "\n")

        remaining = retry + lines[limit:]
        if remaining:
            # Write a sibling temp file and swap it in atomically, so a crash mid-write
            # cannot truncate the queue. No makedirs is needed: the directory exists
            # (the file was just read), and os.makedirs("") raises for a bare filename.
            tmp_path = f"{_QUEUE_PATH}.tmp"
            with open(tmp_path, "w", encoding="utf-8") as fh:
                fh.writelines(remaining)
            os.replace(tmp_path, _QUEUE_PATH)
        else:
            try:
                os.remove(_QUEUE_PATH)
            except FileNotFoundError:
                pass

    result = {"attempted": attempted, "sent": sent, "failed": failed, "dropped": dropped}
    record_event_router_replay(**result)
    return result
|
||
|
||
|
||
def _execute_safe_actions(result: Dict[str, Any], event: Dict[str, Any]) -> list[Dict[str, Any]]:
|
||
"""
|
||
Execute only ADR-012 L2 SAFE_ACTIONS when NemoTron explicitly marks the plan auto-safe.
|
||
All action functions own their own audit writes.
|
||
"""
|
||
if not isinstance(result, dict):
|
||
return []
|
||
if not (result.get("auto_execute") or result.get("dispatch_to") in {"safe_action", "auto_execute"}):
|
||
return []
|
||
|
||
action_plan = result.get("action_plan") or result.get("execution_plan") or []
|
||
if not isinstance(action_plan, list):
|
||
return []
|
||
|
||
try:
|
||
from services.agent_actions import SAFE_ACTIONS
|
||
except Exception as exc:
|
||
return [{"action": "load_safe_actions", "status": "error", "error": str(exc)[:200]}]
|
||
|
||
executed = []
|
||
for step in action_plan[:3]:
|
||
if not isinstance(step, dict):
|
||
continue
|
||
action = step.get("action")
|
||
params = step.get("params") or {}
|
||
if action not in SAFE_ACTIONS:
|
||
executed.append({"action": action, "status": "rejected", "reason": "not in SAFE_ACTIONS"})
|
||
record_event_router_safe_action(action or "unknown", "rejected")
|
||
continue
|
||
try:
|
||
action_result = SAFE_ACTIONS[action](**params)
|
||
executed.append({"action": action, "status": "ok", "result": action_result})
|
||
record_event_router_safe_action(action, "ok")
|
||
except Exception as exc:
|
||
logger.exception("[EventRouter] safe action failed: %s", action)
|
||
executed.append({"action": action, "status": "error", "error": str(exc)[:300]})
|
||
record_event_router_safe_action(action or "unknown", "error")
|
||
return executed
|
||
|
||
|
||
def _skipped_dispatch_response(event: Dict[str, Any], tier: str, started_at: float, reason: str) -> Dict[str, Any]:
    """Build and record the response for an event intentionally not sent to Telegram.

    *reason* is ``"silenced"`` or ``"deduped"``. It becomes the payload status, sets
    the matching response flag, and is passed to the dispatch metric as
    ``<reason>=True``. Such events count as delivered and are never queued.
    """
    latency_ms = int((time.perf_counter() - started_at) * 1000)
    response = {
        "tier": tier,
        "sent": 0,
        "errors": [],
        "latency_ms": latency_ms,
        "payload": {"status": reason, "event_key": _event_key(event)},
        "delivered": True,
        "silenced": reason == "silenced",
        "queued": False,
        "deduped": reason == "deduped",
    }
    record_event_router_dispatch(
        tier=tier,
        event_type=event.get("event_type", "unknown"),
        delivered=True,
        latency_ms=latency_ms,
        **{reason: True},
    )
    return response


async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
    """Route one event to Telegram: the single entry point of ADR-012 §③.

    Flow:
        1. Classify the event into L0/L1/L2.
        2. Short-circuit silenced or duplicate events.
        3. Run the tier handler (degrades to a template on handler failure).
        4. Run any allow-listed safe actions.
        5. Send to Telegram. A failed send is queued to the JSONL fallback; a
           successful send then replays queued failures if enabled.

    The output format is the one routes/bot_api_routes expects (per the original note).

    Returns:
        A dict with ``tier``, ``sent``, ``errors``, ``latency_ms``, ``payload``,
        ``delivered``, ``silenced``, ``queued`` and ``deduped``, plus ``replayed`` on
        the normal send path. Failures after classification are logged, queued and
        reported in the dict rather than raised.
    """
    tier = _classify(event)
    session_id = f"evt:{event.get('event_type')}:{event.get('source', 'unknown')}"
    started_at = time.perf_counter()

    try:
        if _is_event_silenced(event):
            return _skipped_dispatch_response(event, tier, started_at, "silenced")
        if _is_duplicate_event(event):
            return _skipped_dispatch_response(event, tier, started_at, "deduped")

        result = await _run_tier_handler(tier, event, session_id)
        executed_actions = _execute_safe_actions(result, event)
        if executed_actions:
            result["executed_actions"] = executed_actions

        message, reply_markup = _build_telegram_message(event, tier, result)
        send_result = send_telegram_with_result(message, chat_ids=admin_chat_ids, reply_markup=reply_markup)
        queued = False
        replayed = {"attempted": 0, "sent": 0, "failed": 0, "dropped": 0}
        if not send_result["ok"]:
            queued = _queue_failed_delivery(event, tier, message, send_result["errors"], "telegram_delivery_failed")
        elif _REPLAY_ON_SUCCESS:
            # This alert is already delivered. A replay failure must not reach the generic
            # handler below, which would report it as undelivered and queue a duplicate.
            try:
                replayed = replay_failed_deliveries(limit=_REPLAY_LIMIT, admin_chat_ids=admin_chat_ids)
            except Exception:
                logger.exception("[EventRouter] queue replay failed after successful delivery")
        latency_ms = int((time.perf_counter() - started_at) * 1000)

        response = {
            "tier": tier,
            "sent": send_result["sent"],
            "errors": send_result["errors"],
            "latency_ms": latency_ms,
            "payload": result,
            "delivered": send_result["ok"],
            "silenced": False,
            "queued": queued,
            "deduped": False,
            "replayed": replayed,
        }
        record_event_router_dispatch(
            tier=tier,
            event_type=event.get("event_type", "unknown"),
            delivered=response["delivered"],
            queued=response["queued"],
            latency_ms=latency_ms,
        )
        return response
    except Exception as e:
        logger.exception("[EventRouter] dispatch failed: %s", e)
        queued = _queue_failed_delivery(event, tier, None, [str(e)], "dispatch_exception")
        response = {
            "tier": tier,
            "sent": 0,
            "errors": [str(e)],
            "latency_ms": int((time.perf_counter() - started_at) * 1000),
            "payload": None,
            "delivered": False,
            "silenced": False,
            "queued": queued,
            "deduped": False,
        }
        record_event_router_dispatch(
            tier=tier,
            event_type=event.get("event_type", "unknown"),
            delivered=False,
            queued=queued,
            latency_ms=response["latency_ms"],
        )
        return response
|
||
|
||
|
||
def _run_coroutine_in_thread(coro) -> Dict[str, Any]:
|
||
result = {}
|
||
|
||
def runner():
|
||
try:
|
||
result["value"] = asyncio.run(coro)
|
||
except Exception as e:
|
||
result["value"] = {
|
||
"tier": "unknown",
|
||
"sent": 0,
|
||
"errors": [str(e)],
|
||
"latency_ms": 0,
|
||
"payload": None,
|
||
"delivered": False,
|
||
}
|
||
|
||
thread = threading.Thread(target=runner, daemon=True)
|
||
thread.start()
|
||
thread.join(timeout=15)
|
||
if thread.is_alive():
|
||
return {
|
||
"tier": "unknown",
|
||
"sent": 0,
|
||
"errors": ["dispatch_sync timed out"],
|
||
"latency_ms": 15000,
|
||
"payload": None,
|
||
"delivered": False,
|
||
}
|
||
return result["value"]
|
||
|
||
|
||
def dispatch_sync(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
    """Synchronous entry point to the EventRouter.

    With no event loop running in this thread, ``dispatch`` runs via ``asyncio.run``.
    Inside a running loop, where ``asyncio.run`` is not allowed, it runs on a worker
    thread via ``_run_coroutine_in_thread``.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        loop_running = False
    else:
        loop_running = True

    pending = dispatch(event, admin_chat_ids=admin_chat_ids)
    if loop_running:
        return _run_coroutine_in_thread(pending)
    return asyncio.run(pending)
|
||
|
||
|
||
def notify_failure(
    task_name: str,
    error: Exception,
    *,
    source: Optional[str] = None,
    event_type: str = "scheduler_task_failure",
    priority: str = "P2",
    title: Optional[str] = None,
    trace: Optional[str] = None,
    payload: Optional[Dict[str, Any]] = None,
    dedup_ttl_sec: Optional[int] = None,
) -> Dict[str, Any]:
    """Synchronously report a failed scheduler/background task through the EventRouter.

    Args:
        task_name: Name of the failed task; used in the default source, title and payload.
        error: The exception raised; its text becomes the summary (first 200 chars).
        source: Event source; defaults to ``Scheduler.<task_name>``.
        event_type: Event type used for classification and keys.
        priority: "P1"/"P2" map to severity "alert"; anything else to "warning".
        title: Alert title; defaults to ``<task_name> 任務異常``.
        trace: Preformatted traceback; defaults to one formatted from *error*.
        payload: Extra payload fields, merged after (and able to override) ``task_name``.
        dedup_ttl_sec: Dedup window in seconds; only set on the event when not None.

    Returns:
        The response dict from ``dispatch_sync``.
    """
    severity = "alert" if priority in {"P1", "P2"} else "warning"
    origin = source or f"Scheduler.{task_name}"
    heading = title or f"{task_name} 任務異常"
    impact = f"{priority} - 背景任務需要檢查"
    summary = str(error)[:200]
    stack = trace or "".join(traceback.format_exception(type(error), error, error.__traceback__))
    details = {"task_name": task_name, **(payload or {})}

    event = dict(
        source=origin,
        event_type=event_type,
        severity=severity,
        title=heading,
        status="任務失敗",
        impact=impact,
        summary=summary,
        trace=stack,
        payload=details,
    )
    if dedup_ttl_sec is not None:
        event["dedup_ttl_sec"] = dedup_ttl_sec
    return dispatch_sync(event)
|
||
|
||
|
||
def _classify(event: Dict[str, Any]) -> str:
|
||
sev = event.get("severity", "info")
|
||
has_trace = bool(event.get("trace"))
|
||
event_type = event.get("event_type", "")
|
||
|
||
if sev in ("info", "success"):
|
||
return "L0"
|
||
if sev == "warning":
|
||
return "L1" if has_trace else "L0"
|
||
if sev == "alert":
|
||
if event_type in {"price_threat", "db_connection_error", "crawler_timeout",
|
||
"nim_quota_exhausted", "embedding_failure",
|
||
"scheduler_task_failure"}:
|
||
return "L2"
|
||
return "L1"
|
||
return "L0"
|
||
|
||
|
||
def _build_telegram_message(event: Dict[str, Any], tier: str, result: Optional[Dict[str, Any]]) -> tuple[str, Optional[Dict[str, Any]]]:
|
||
if tier == "L0":
|
||
title = event.get("title") or event.get("event_type", "system_event")
|
||
summary = event.get("summary") or event.get("status") or "系統事件"
|
||
body = event.get("impact") or event.get("source") or ""
|
||
message = (
|
||
f"ℹ️ <b>{title}</b>\n"
|
||
f"━━━━━━━━━━━━━━━━━━━━\n"
|
||
f"{summary}\n"
|
||
f"{body}\n"
|
||
)
|
||
return message, None
|
||
|
||
ai_summary = ""
|
||
ai_cause = None
|
||
ai_actions = None
|
||
ai_executed = None
|
||
|
||
if isinstance(result, dict):
|
||
ai_summary = (
|
||
result.get("summary")
|
||
or result.get("reasoning")
|
||
or result.get("message")
|
||
or str(result)[:400]
|
||
)
|
||
ai_cause = result.get("cause") or result.get("root_cause")
|
||
ai_actions = result.get("suggested_actions") or result.get("execution_plan")
|
||
ai_executed = result.get("executed_actions")
|
||
|
||
return triaged_alert(
|
||
event,
|
||
tier_label=tier,
|
||
ai_summary=ai_summary,
|
||
ai_cause=ai_cause,
|
||
ai_actions=ai_actions if isinstance(ai_actions, list) else ([str(ai_actions)] if ai_actions else None),
|
||
ai_executed=ai_executed if isinstance(ai_executed, list) else ([str(ai_executed)] if ai_executed else None),
|
||
)
|