Files
ewoooc/services/event_router.py
2026-05-24 22:49:46 +08:00

606 lines
23 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# services/event_router.py
#
# ADR-012 §③: single entry point dispatch(event) — routes events to tiers L0/L1/L2
# W2-C: L2 prefers the Elephant Alpha (EA) orchestrator; when EA is unavailable it falls back to AIOrchestrator
#
import asyncio
import json
import logging
import os
import threading
import traceback
import time
from datetime import datetime
from html import escape
from typing import Any, Dict, Optional
from services.ai_orchestrator import AIOrchestrator
from services.ai_automation_metrics import (
record_event_router_dispatch,
record_event_router_replay,
record_event_router_safe_action,
)
from services.telegram_templates import send_telegram_with_result, triaged_alert
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Path of the JSONL file that stores failed notification deliveries.
# Overridable via MOMO_EVENT_ROUTER_QUEUE; the default is
# <parent of this file's directory>/data/event_router_failed_deliveries.jsonl.
_QUEUE_PATH = os.getenv(
"MOMO_EVENT_ROUTER_QUEUE",
os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "event_router_failed_deliveries.jsonl"),
)
# Guards reads/writes of the queue file at _QUEUE_PATH.
_QUEUE_LOCK = threading.Lock()
# Guards the in-memory dedup table below.
_DEDUP_LOCK = threading.Lock()
# Maps an event key ("source:event_type") to the time.time() value until which
# repeats of that key are treated as duplicates.
_EVENT_DEDUP: Dict[str, float] = {}
# Dedup window (seconds) used when neither the event nor its payload sets
# "dedup_ttl_sec"; 0 disables dedup. A non-integer env value raises at import.
_DEFAULT_DEDUP_SEC = int(os.getenv("MOMO_EVENT_ROUTER_DEFAULT_DEDUP_SEC", "0"))
# When true, a successful delivery in dispatch() also replays queued failures.
_REPLAY_ON_SUCCESS = os.getenv("MOMO_EVENT_ROUTER_REPLAY_ON_SUCCESS", "true").lower() == "true"
# Maximum number of queued records replayed per successful dispatch.
_REPLAY_LIMIT = int(os.getenv("MOMO_EVENT_ROUTER_REPLAY_LIMIT", "3"))
# Character budgets for the summary and traceback sections of a replay message.
_REPLAY_TEXT_LIMIT = 700
_REPLAY_TRACE_LIMIT = 1200
async def _handle_l1(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
    """Handle an L1 event by delegating to a fresh ``AIOrchestrator``.

    L1 covers semantic translation plus reason analysis (Hermes).
    """
    return await AIOrchestrator().handle_l1(event, session_id)
async def _handle_l2(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
    """Handle an L2 event, preferring Elephant Alpha (EA) over AIOrchestrator.

    W2-C: the EA orchestrator is tried first (dynamic routing, 256K context).
    If EA cannot be used — its module fails to import, the API key is unset,
    the connection check fails, or coordination raises — the event is handed
    to ``AIOrchestrator.handle_l2`` instead.

    ADR-012 (original note): the audit trail is double-written by
    EA's ``_log_decision`` and by ``triaged_alert``.
    """
    try:
        # Imported inside the try so that an import failure also triggers the fallback.
        from services.elephant_service import elephant_service
        from services.elephant_alpha_orchestrator import elephant_orchestrator

        # Guard: with no API key configured, go straight to the fallback
        # without attempting a connection.
        if not elephant_service.api_key:
            raise RuntimeError("NVIDIA_API_KEY not configured, using fallback")
        # Guard: connection check (original note: W3-A caches it for 300s,
        # so it does not ping on every call).
        if not elephant_service.check_connection():
            raise RuntimeError("EA connection unavailable, using fallback")

        request = {
            "event": event,
            "tier": "L2",
            "session_id": session_id,
            "urgency": "high",
            "complexity": "medium",
            "task_type": event.get("event_type", "general_analysis"),
        }
        decision = await elephant_orchestrator.analyze_and_coordinate(request)

        # Copy the decision attributes into a plain dict, in a fixed order.
        copied_fields = ("priority", "confidence", "execution_plan", "agents_required", "reasoning")
        return {
            "source": "elephant_alpha",
            **{field: getattr(decision, field) for field in copied_fields},
        }
    except Exception as e:
        logger.warning(f"[EventRouter] EA L2 failed ({e}), fallback → AIOrchestrator")
        return await AIOrchestrator().handle_l2(event, session_id)
async def _handle_l0(event: Dict[str, Any]) -> Dict[str, Any]:
    """Handle an L0 event: no AI work, just echo back the event type.

    Kept for compatibility/monitoring.
    """
    echoed_type = event.get("event_type")
    return {"status": "ok", "echo": echoed_type}
async def _run_tier_handler(tier: str, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
    """Run the handler for ``tier``, degrading to a template result on failure.

    "L1" and "L2" go to their AI handlers; "L0" and any unrecognized tier use
    the L0 echo handler. Any exception from a handler is logged and turned
    into a ``status="degraded"`` result so the alert can still be sent.
    """
    try:
        if tier == "L1":
            return await _handle_l1(event, session_id)
        if tier == "L2":
            return await _handle_l2(event, session_id)
        # "L0" and unknown tiers share the echo handler.
        return await _handle_l0(event)
    except Exception as exc:
        logger.exception("[EventRouter] %s handler failed, degraded to template alert: %s", tier, exc)
        detail = str(exc)
        degraded = {
            "status": "degraded",
            "summary": "AI 分析暫不可用,已降級為模板告警;原始事件仍保留。",
            "cause": f"{type(exc).__name__}: {detail[:200]}",
            "suggested_actions": ["先依原始事實排查", "若重複發生,查看 event_router_failed_deliveries.jsonl 與服務 logs"],
            "handler_error": detail[:500],
        }
        return degraded
def _decision_envelope_from_event(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Find a decision envelope attached to the event.

    The event's top level is checked first, then its ``payload`` (when that is
    a dict). In each place ``decision_envelope`` takes precedence over
    ``decision``. Only a non-empty dict counts; otherwise ``None`` is returned.
    """
    nested = event.get("payload")
    containers = (event, nested if isinstance(nested, dict) else {})
    for container in containers:
        candidate = container.get("decision_envelope") or container.get("decision")
        if isinstance(candidate, dict) and candidate:
            return candidate
    return None
def _should_render_decision_direct(event: Dict[str, Any]) -> bool:
    """Tell whether the event carries a decision envelope.

    Evidence-ready decision events are rendered directly instead of being
    re-summarized by the L1/L2 AI handlers.
    """
    attached = _decision_envelope_from_event(event)
    return attached is not None
def _direct_decision_result(event: Dict[str, Any]) -> Dict[str, Any]:
    """Build the handler result used when the AI summary step is skipped.

    The attached decision envelope (or an empty dict when none is found) is
    passed through under ``decision_envelope``.
    """
    envelope = _decision_envelope_from_event(event)
    if not envelope:
        envelope = {}
    return {
        "status": "decision_envelope_direct",
        "summary": "已收到完整決策信封,略過 AI 重新摘要,直接以證據模板通知。",
        "decision_envelope": envelope,
    }
def _event_key(event: Dict[str, Any]) -> str:
    """Return the ``"<source>:<event_type>"`` key for an event.

    Either part defaults to ``"unknown"`` when the event lacks it.
    """
    origin = event.get("source", "unknown")
    kind = event.get("event_type", "unknown")
    return f"{origin}:{kind}"
def _is_event_silenced(event: Dict[str, Any]) -> bool:
    """Check whether the event is silenced, by full key or by event type.

    The lookup is best-effort: if the silence check cannot be imported or
    raises, a warning is logged and the event is treated as not silenced.
    """
    try:
        from services.agent_actions import is_silenced

        silenced_by_key = is_silenced(_event_key(event))
        # The bare event type is only consulted when the full key is not silenced.
        return silenced_by_key or is_silenced(str(event.get("event_type", "")))
    except Exception as exc:
        logger.warning("[EventRouter] silence check failed: %s", exc)
        return False
def _dedup_ttl_sec(event: Dict[str, Any]) -> int:
    """Resolve the dedup window, in seconds, for an event.

    Lookup order: ``event["dedup_ttl_sec"]``, then ``payload["dedup_ttl_sec"]``
    (when the payload is a dict), then ``_DEFAULT_DEDUP_SEC``. Falsy or
    unparsable values yield 0, and negative values are clamped to 0.
    """
    nested = event.get("payload")
    if not isinstance(nested, dict):
        nested = {}
    fallback = nested.get("dedup_ttl_sec", _DEFAULT_DEDUP_SEC)
    candidate = event.get("dedup_ttl_sec", fallback)
    try:
        seconds = int(candidate or 0)
    except (TypeError, ValueError):
        return 0
    return max(0, seconds)
def _is_duplicate_event(event: Dict[str, Any]) -> bool:
    """Report whether the event repeats a key still inside its dedup window.

    With a window of 0 or less, dedup is off and the result is always False.
    Otherwise, if the key has an unexpired entry the event is a duplicate;
    if not, a new expiry is recorded and the event is let through.
    """
    window = _dedup_ttl_sec(event)
    if window <= 0:
        return False
    dedup_key = _event_key(event)
    current = time.time()
    with _DEDUP_LOCK:
        expires_at = _EVENT_DEDUP.get(dedup_key)
        still_suppressed = bool(expires_at) and expires_at > current
        if not still_suppressed:
            # First sighting, or the previous window lapsed: start a new one.
            _EVENT_DEDUP[dedup_key] = current + window
    return still_suppressed
def _queue_failed_delivery(
    event: Dict[str, Any],
    tier: str,
    message: Optional[str],
    errors: list,
    reason: str,
) -> bool:
    """Append a failed notification delivery to the local JSONL queue.

    Args:
        event: The original event; stored whole so it can be replayed.
        tier: Tier the event was classified into.
        message: The rendered message that failed to send, or ``None`` when
            the failure happened before a message existed.
        errors: Error descriptions from the failed attempt.
        reason: Short tag describing why the record was queued.

    Returns:
        True when the record was written, False when queuing itself failed
        (the failure is logged, never raised).
    """
    record = {
        "ts": datetime.now().isoformat(),
        "reason": reason,
        "tier": tier,
        "event_key": _event_key(event),
        "event": event,
        "message": message,
        "errors": errors,
    }
    try:
        # A bare filename has an empty dirname, and os.makedirs("") raises;
        # only create the directory when there is one to create.
        queue_dir = os.path.dirname(_QUEUE_PATH)
        if queue_dir:
            os.makedirs(queue_dir, exist_ok=True)
        # default=str keeps non-JSON values (e.g. exceptions) from aborting the write.
        line = json.dumps(record, ensure_ascii=False, default=str) + "\n"
        with _QUEUE_LOCK:
            with open(_QUEUE_PATH, "a", encoding="utf-8") as fh:
                fh.write(line)
        return True
    except Exception as exc:
        logger.error("[EventRouter] failed to queue delivery fallback: %s", exc)
        return False
def _queued_record_message(record: Dict[str, Any]) -> str:
    """Render a queued record as a condensed, HTML-escaped replay message.

    Title, summary and trace come from the stored event, falling back to
    fields of the record itself. Every interpolated value is clipped and
    escaped before being placed in the HTML.
    """
    event = record.get("event")
    if not isinstance(event, dict):
        event = {}
    errors = record.get("errors")
    if not isinstance(errors, list):
        errors = []

    title = event.get("title") or record.get("event_key") or "EventRouter queued event"
    summary = event.get("summary") or event.get("status") or record.get("reason") or "queued delivery replay"
    trace = event.get("trace") or event.get("traceback") or event.get("error_traceback")

    parts = [
        "♻️ <b>EventRouter Queue Replay</b>",
        "━━━━━━━━━━━━━━━━━━━━",
        f"📌 <b>{escape(_clip_text(title, 120))}</b>",
        f"🔖 <code>{escape(_clip_text(record.get('event_key', 'unknown'), 120))}</code>",
        f"🕒 queued_at={escape(_clip_text(record.get('ts', 'unknown'), 80))}",
        "",
        f"🔍 <b>概要:</b>{escape(_clip_text(summary, _REPLAY_TEXT_LIMIT))}",
    ]
    if errors:
        parts.append("")
        parts.append(
            f"⚠️ <b>上次錯誤:</b>{escape(_clip_text('; '.join(str(item) for item in errors), 300))}"
        )
    if trace:
        parts.append("")
        parts.append("<pre>" + escape(_clip_text(trace, _REPLAY_TRACE_LIMIT)) + "</pre>")
    parts.append("")
    parts.append("💡 <b>處置:</b>此為失敗佇列安全回放;原始長訊息已濃縮,避免 Telegram HTML 400 重複卡住。")
    return "\n".join(parts)
def _clip_text(value: Any, limit: int) -> str:
    """Return ``value`` as stripped text, truncated to at most ``limit`` characters.

    Falsy values (``None``, ``""``, ``0``) become the empty string. A ``limit``
    of 0 or less disables truncation. Truncated text ends with an ellipsis
    that counts toward ``limit``, so the result is never longer than ``limit``.
    """
    text = str(value or "").strip()
    if limit <= 0 or len(text) <= limit:
        return text
    # One character is reserved for the ellipsis. It is written as an escape so
    # the marker cannot be silently lost by editors or encoding conversions.
    return text[: max(0, limit - 1)].rstrip() + "\u2026"
def replay_failed_deliveries(limit: int = 20, admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
    """Replay queued Telegram deliveries and keep failures in the JSONL queue.

    The first ``limit`` lines of the queue file are processed; later lines are
    kept untouched. Lines that are not a JSON object are dropped. Records that
    fail to send again stay in the queue, annotated with ``last_replay_error``
    and ``last_replay_at``.

    Args:
        limit: Maximum number of queue lines to process; ``<= 0`` is a no-op.
        admin_chat_ids: Passed to the sender as ``chat_ids``.

    Returns:
        Counters ``attempted``, ``sent``, ``failed`` and ``dropped``.
    """
    if limit <= 0 or not os.path.exists(_QUEUE_PATH):
        return {"attempted": 0, "sent": 0, "failed": 0, "dropped": 0}

    with _QUEUE_LOCK:
        try:
            with open(_QUEUE_PATH, "r", encoding="utf-8") as fh:
                lines = fh.readlines()
        except FileNotFoundError:
            # The file vanished between the existence check and the open.
            return {"attempted": 0, "sent": 0, "failed": 0, "dropped": 0}

        remaining = []
        attempted = sent = failed = dropped = 0
        for idx, line in enumerate(lines):
            if idx >= limit:
                remaining.append(line)
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                dropped += 1
                continue
            if not isinstance(record, dict):
                # Valid JSON that is not an object cannot be rendered; without
                # this guard it would raise on every replay and never leave the queue.
                dropped += 1
                continue

            attempted += 1
            try:
                outcome = send_telegram_with_result(_queued_record_message(record), chat_ids=admin_chat_ids)
                delivered = bool(outcome["ok"])
                replay_errors = outcome.get("errors", [])
            except Exception as exc:
                # Count a raising sender as a failed attempt so the queue is
                # still rewritten and records already sent are not resent.
                logger.exception("[EventRouter] queue replay send failed: %s", exc)
                delivered = False
                replay_errors = [f"{type(exc).__name__}: {str(exc)[:200]}"]

            if delivered:
                sent += 1
                continue
            failed += 1
            record["last_replay_error"] = replay_errors
            record["last_replay_at"] = datetime.now().isoformat()
            remaining.append(json.dumps(record, ensure_ascii=False, default=str) + "\n")

        if remaining:
            queue_dir = os.path.dirname(_QUEUE_PATH)
            if queue_dir:  # os.makedirs("") raises for a bare filename
                os.makedirs(queue_dir, exist_ok=True)
            # Write to a sibling temp file and swap it in, so an interrupted
            # write cannot leave a truncated queue behind.
            tmp_path = f"{_QUEUE_PATH}.tmp"
            with open(tmp_path, "w", encoding="utf-8") as fh:
                fh.writelines(remaining)
            os.replace(tmp_path, _QUEUE_PATH)
        else:
            try:
                os.remove(_QUEUE_PATH)
            except FileNotFoundError:
                pass

    result = {"attempted": attempted, "sent": sent, "failed": failed, "dropped": dropped}
    record_event_router_replay(**result)
    return result
def _execute_safe_actions(result: Dict[str, Any], event: Dict[str, Any]) -> list[Dict[str, Any]]:
    """Execute whitelisted actions from an AI result that is marked auto-safe.

    Only ADR-012 L2 SAFE_ACTIONS run, and only when the result sets
    ``auto_execute`` or has ``dispatch_to`` of ``"safe_action"`` /
    ``"auto_execute"``. At most the first three plan steps are considered.
    Per the original note, the action functions own their own audit writes.

    Args:
        result: Handler output; anything that is not a dict yields ``[]``.
        event: The triggering event (not used by the current implementation).

    Returns:
        One entry per considered step, with status ``"ok"``, ``"rejected"``
        or ``"error"``; an empty list when nothing qualifies.
    """
    if not isinstance(result, dict):
        return []
    if not (result.get("auto_execute") or result.get("dispatch_to") in {"safe_action", "auto_execute"}):
        return []
    action_plan = result.get("action_plan") or result.get("execution_plan") or []
    if not isinstance(action_plan, list):
        return []
    try:
        from services.agent_actions import SAFE_ACTIONS
    except Exception as exc:
        return [{"action": "load_safe_actions", "status": "error", "error": str(exc)[:200]}]

    executed = []
    for step in action_plan[:3]:
        if not isinstance(step, dict):
            continue
        action = step.get("action")
        params = step.get("params") or {}
        # The plan is AI-generated. A non-string action (e.g. a nested dict) is
        # rejected before the membership test, because an unhashable value
        # would raise there and abort the whole dispatch.
        if not isinstance(action, str) or action not in SAFE_ACTIONS:
            executed.append({"action": action, "status": "rejected", "reason": "not in SAFE_ACTIONS"})
            metric_label = action if isinstance(action, str) and action else "unknown"
            record_event_router_safe_action(metric_label, "rejected")
            continue
        try:
            action_result = SAFE_ACTIONS[action](**params)
            executed.append({"action": action, "status": "ok", "result": action_result})
            record_event_router_safe_action(action, "ok")
        except Exception as exc:
            logger.exception("[EventRouter] safe action failed: %s", action)
            executed.append({"action": action, "status": "error", "error": str(exc)[:300]})
            record_event_router_safe_action(action or "unknown", "error")
    return executed
async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
    """Main event routing entry (ADR-012 §③ — the single entry point).

    Flow: classify the tier, short-circuit silenced or duplicate events, run
    the tier handler (or use the attached decision envelope directly), run
    any auto-safe actions, send the Telegram message, then either queue a
    failed delivery or replay previously queued ones.

    Args:
        event: Event dict; ``severity``, ``event_type``, ``source`` and
            ``trace`` drive classification and routing.
        admin_chat_ids: Passed to the Telegram sender as ``chat_ids``.

    Returns:
        A response dict compatible with routes/bot_api_routes: ``tier``,
        ``sent``, ``errors``, ``latency_ms``, ``payload``, ``delivered``,
        ``silenced``, ``queued``, ``deduped``, plus ``replayed`` on the normal
        path. Exceptions raised inside the routing flow are turned into a
        ``delivered=False`` response instead of propagating.
    """
    tier = _classify(event)
    session_id = f"evt:{event.get('event_type')}:{event.get('source', 'unknown')}"
    started_at = time.perf_counter()
    try:
        if _is_event_silenced(event):
            response = {
                "tier": tier,
                "sent": 0,
                "errors": [],
                "latency_ms": int((time.perf_counter() - started_at) * 1000),
                "payload": {"status": "silenced", "event_key": _event_key(event)},
                "delivered": True,
                "silenced": True,
                "queued": False,
                "deduped": False,
            }
            record_event_router_dispatch(
                tier=tier,
                event_type=event.get("event_type", "unknown"),
                delivered=True,
                silenced=True,
                latency_ms=response["latency_ms"],
            )
            return response

        if _is_duplicate_event(event):
            response = {
                "tier": tier,
                "sent": 0,
                "errors": [],
                "latency_ms": int((time.perf_counter() - started_at) * 1000),
                "payload": {"status": "deduped", "event_key": _event_key(event)},
                "delivered": True,
                "silenced": False,
                "queued": False,
                "deduped": True,
            }
            record_event_router_dispatch(
                tier=tier,
                event_type=event.get("event_type", "unknown"),
                delivered=True,
                deduped=True,
                latency_ms=response["latency_ms"],
            )
            return response

        # Events that already carry a decision envelope skip the AI handlers.
        if _should_render_decision_direct(event):
            result = _direct_decision_result(event)
        else:
            result = await _run_tier_handler(tier, event, session_id)

        executed_actions = _execute_safe_actions(result, event)
        if executed_actions:
            result["executed_actions"] = executed_actions

        message, reply_markup = _build_telegram_message(event, tier, result)
        send_result = send_telegram_with_result(message, chat_ids=admin_chat_ids, reply_markup=reply_markup)

        queued = False
        replayed = {"attempted": 0, "sent": 0, "failed": 0, "dropped": 0}
        if not send_result["ok"]:
            queued = _queue_failed_delivery(event, tier, message, send_result["errors"], "telegram_delivery_failed")
        elif _REPLAY_ON_SUCCESS:
            # Replay is best-effort housekeeping. This alert has already been
            # delivered, so a replay error must not reach the outer handler,
            # which would report the dispatch as failed and re-queue the event.
            try:
                replayed = replay_failed_deliveries(limit=_REPLAY_LIMIT, admin_chat_ids=admin_chat_ids)
            except Exception as replay_exc:
                logger.exception("[EventRouter] queue replay failed after successful delivery: %s", replay_exc)

        latency_ms = int((time.perf_counter() - started_at) * 1000)
        response = {
            "tier": tier,
            "sent": send_result["sent"],
            "errors": send_result["errors"],
            "latency_ms": latency_ms,
            "payload": result,
            "delivered": send_result["ok"],
            "silenced": False,
            "queued": queued,
            "deduped": False,
            "replayed": replayed,
        }
        record_event_router_dispatch(
            tier=tier,
            event_type=event.get("event_type", "unknown"),
            delivered=response["delivered"],
            queued=response["queued"],
            latency_ms=latency_ms,
        )
        return response
    except Exception as e:
        logger.exception(f"[EventRouter] dispatch failed: {e}")
        # No message was rendered (or sending raised): queue the raw event.
        queued = _queue_failed_delivery(event, tier, None, [str(e)], "dispatch_exception")
        response = {
            "tier": tier,
            "sent": 0,
            "errors": [str(e)],
            "latency_ms": int((time.perf_counter() - started_at) * 1000),
            "payload": None,
            "delivered": False,
            "silenced": False,
            "queued": queued,
            "deduped": False,
        }
        record_event_router_dispatch(
            tier=tier,
            event_type=event.get("event_type", "unknown"),
            delivered=False,
            queued=queued,
            latency_ms=response["latency_ms"],
        )
        return response
def _run_coroutine_in_thread(coro, timeout: float = 15.0) -> Dict[str, Any]:
    """Run ``coro`` on its own event loop in a daemon thread and wait for it.

    Used when the caller is already inside a running event loop, where
    ``asyncio.run`` cannot be called directly.

    Args:
        coro: The coroutine object to run.
        timeout: Seconds to wait for the thread before giving up.

    Returns:
        The coroutine's result, or a failure response (``delivered=False``,
        ``tier="unknown"``) when the coroutine raised or the wait timed out.
        On timeout the daemon thread is left running.
    """

    def failure(message: str, latency_ms: int) -> Dict[str, Any]:
        # Same keys as a dispatch() response, so callers can read
        # silenced/queued/deduped without guarding.
        return {
            "tier": "unknown",
            "sent": 0,
            "errors": [message],
            "latency_ms": latency_ms,
            "payload": None,
            "delivered": False,
            "silenced": False,
            "queued": False,
            "deduped": False,
        }

    outcome: Dict[str, Any] = {}

    def runner() -> None:
        try:
            outcome["value"] = asyncio.run(coro)
        except BaseException as exc:
            # asyncio.CancelledError derives from BaseException; catching only
            # Exception would leave no value and make the caller hit KeyError.
            outcome["value"] = failure(str(exc) or type(exc).__name__, 0)

    thread = threading.Thread(target=runner, daemon=True)
    thread.start()
    thread.join(timeout=timeout)
    if thread.is_alive():
        return failure("dispatch_sync timed out", int(timeout * 1000))
    return outcome.get("value", failure("dispatch_sync produced no result", 0))
def dispatch_sync(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
    """EventRouter entry point for synchronous callers.

    With no event loop running in the current thread, ``dispatch`` is run
    with ``asyncio.run``. If a loop is already running, the coroutine is run
    in a separate thread instead.
    """
    try:
        asyncio.get_running_loop()
        loop_active = True
    except RuntimeError:
        # get_running_loop() raises RuntimeError when no loop is running.
        loop_active = False

    pending = dispatch(event, admin_chat_ids=admin_chat_ids)
    if loop_active:
        return _run_coroutine_in_thread(pending)
    return asyncio.run(pending)
def notify_failure(
    task_name: str,
    error: Exception,
    *,
    source: Optional[str] = None,
    event_type: str = "scheduler_task_failure",
    priority: str = "P2",
    title: Optional[str] = None,
    trace: Optional[str] = None,
    payload: Optional[Dict[str, Any]] = None,
    dedup_ttl_sec: Optional[int] = None,
) -> Dict[str, Any]:
    """Synchronous helper that reports a failed scheduled/background task.

    Builds an event from the task name and exception and sends it through
    ``dispatch_sync``. Priorities P1/P2 map to severity ``"alert"``; anything
    else maps to ``"warning"``. When ``trace`` is not given, the traceback is
    formatted from ``error``. ``dedup_ttl_sec`` is only attached when provided.

    Returns:
        The response dict produced by ``dispatch_sync``.
    """
    if priority in {"P1", "P2"}:
        severity = "alert"
    else:
        severity = "warning"

    trace_text = trace or "".join(traceback.format_exception(type(error), error, error.__traceback__))
    # Keys in the caller's payload win over the default task_name entry.
    merged_payload = {"task_name": task_name, **(payload or {})}

    event = {
        "source": source or f"Scheduler.{task_name}",
        "event_type": event_type,
        "severity": severity,
        "title": title or f"{task_name} 任務異常",
        "status": "任務失敗",
        "impact": f"{priority} - 背景任務需要檢查",
        "summary": str(error)[:200],
        "trace": trace_text,
        "payload": merged_payload,
    }
    if dedup_ttl_sec is not None:
        event["dedup_ttl_sec"] = dedup_ttl_sec
    return dispatch_sync(event)
def _classify(event: Dict[str, Any]) -> str:
    """Map an event to its handling tier: ``"L0"``, ``"L1"`` or ``"L2"``.

    - ``alert``: L2 for the listed critical event types, otherwise L1.
    - ``warning``: L1 when the event has a truthy ``trace``, otherwise L0.
    - everything else (``info``, ``success``, unknown severities): L0.
    """
    severity = event.get("severity", "info")
    if severity == "alert":
        critical_types = {
            "price_threat",
            "db_connection_error",
            "crawler_timeout",
            "nim_quota_exhausted",
            "embedding_failure",
            "scheduler_task_failure",
        }
        return "L2" if event.get("event_type", "") in critical_types else "L1"
    if severity == "warning" and event.get("trace"):
        return "L1"
    return "L0"
def _build_telegram_message(event: Dict[str, Any], tier: str, result: Optional[Dict[str, Any]]) -> tuple[str, Optional[Dict[str, Any]]]:
    """Render the Telegram message (and optional reply markup) for an event.

    Three paths, checked in order:

    1. The event carries a decision envelope: render through
       ``triaged_alert`` with fields taken from the envelope.
    2. Tier ``"L0"``: a small inline HTML template with no reply markup.
    3. Otherwise: ``triaged_alert`` enriched with the AI handler's summary,
       cause, suggested actions and executed actions.

    Args:
        event: The event being reported.
        tier: Tier label from classification.
        result: Handler output; non-dict values are ignored.

    Returns:
        ``(message, reply_markup)``. The L0 path builds this tuple itself;
        the other paths return whatever ``triaged_alert`` returns.
    """
    decision_envelope = _decision_envelope_from_event(event)
    if decision_envelope:
        # Work on a copy so the caller's event is not mutated.
        render_event = dict(event)
        render_event["decision_envelope"] = decision_envelope
        render_event["event_type"] = decision_envelope.get("decision_type") or event.get("event_type", "decision")
        if not render_event.get("id") and decision_envelope.get("decision_id"):
            render_event["id"] = decision_envelope.get("decision_id")
        subject = decision_envelope.get("subject") if isinstance(decision_envelope.get("subject"), dict) else {}
        sku = str(subject.get("sku") or "").strip()
        name = str(subject.get("name") or "").strip()
        if sku or name:
            render_event["summary"] = " · ".join(part for part in (f"SKU {sku}" if sku else "", name) if part)
        raw_source_agent = str(decision_envelope.get("source_agent") or event.get("source") or "Decision")
        # Known agents get their display spelling; others are title-cased.
        source_agent = {
            "nemotron": "NemoTron",
            "openclaw": "OpenClaw",
            "elephant_alpha": "Elephant Alpha",
            "hermes": "Hermes",
        }.get(raw_source_agent.lower(), raw_source_agent.replace("_", " ").title())
        severity = str(decision_envelope.get("severity") or tier)
        ai_summary = ""
        if isinstance(result, dict):
            ai_summary = str(result.get("summary") or "")
        if not ai_summary:
            ai_summary = str(decision_envelope.get("analysis") or "依決策信封進行人工覆核。")
        return triaged_alert(
            render_event,
            tier_label=f"{source_agent} · {severity}",
            ai_summary=ai_summary,
        )

    if tier == "L0":
        title = event.get("title") or event.get("event_type", "system_event")
        summary = event.get("summary") or event.get("status") or "系統事件"
        body = event.get("impact") or event.get("source") or ""
        # Event text is escaped before it goes into the HTML template, as
        # _queued_record_message does; a raw "<" or "&" would break the markup.
        message = (
            f" <b>{escape(str(title))}</b>\n"
            f"━━━━━━━━━━━━━━━━━━━━\n"
            f"{escape(str(summary))}\n"
            f"{escape(str(body))}\n"
        )
        return message, None

    ai_summary = ""
    ai_cause = None
    ai_actions = None
    ai_executed = None
    if isinstance(result, dict):
        # Fall back through the text-like fields; as a last resort show a
        # clipped repr of the whole result.
        ai_summary = (
            result.get("summary")
            or result.get("reasoning")
            or result.get("message")
            or str(result)[:400]
        )
        ai_cause = result.get("cause") or result.get("root_cause")
        ai_actions = result.get("suggested_actions") or result.get("execution_plan")
        ai_executed = result.get("executed_actions")
    # Lists pass through; any other truthy value is wrapped as a one-item list.
    return triaged_alert(
        event,
        tier_label=tier,
        ai_summary=ai_summary,
        ai_cause=ai_cause,
        ai_actions=ai_actions if isinstance(ai_actions, list) else ([str(ai_actions)] if ai_actions else None),
        ai_executed=ai_executed if isinstance(ai_executed, list) else ([str(ai_executed)] if ai_executed else None),
    )