Files
ewoooc/services/event_router.py
ogt bda4edd23b
All checks were successful
CD Pipeline / deploy (push) Successful in 1m11s
feat(ai-ops): ADR-012 Phase 2/3/4 完整實作
Phase 2 — Hermes L1 Observer 真實接入:
- services/event_router.py::_hermes_observe() 呼叫 hermes3:latest
  @192.168.0.111:11434/api/generate,做 stack trace 翻譯
- 輸出 JSON {summary, probable_cause, actions},容錯 markdown fence
- scheduler.py run_auto_import_task / run_momo_task 兩個 outer
  except 改走 event_router.dispatch(),帶完整 trace

Phase 3 — NemoTron L2 Investigator 規則式實作:
- event_router._L2_RULES: event_type → [(action, params)] 規則表
  • db_connection_error → query_km + retry_task(60s backoff)
  • crawler_timeout    → silence_alert(30min) + retry_task(300s)
  • nim_quota_exhausted → silence_alert(720min)
  • embedding_failure   → silence_alert(10min)
- agent_actions.retry_task 真實實作: threading.Timer + exponential
  backoff (60→120→240s) + _retry_state 追蹤 + ALLOWED_RETRY_TASKS
  白名單 + 非 scheduler 容器回 'deferred'

Phase 4 — L3 HITL Ops 擴充:
- agent_actions: pause_task / resume_task / force_retry_now / is_task_paused
- OPS_ACTIONS 白名單與 SAFE_ACTIONS 嚴格分離(L2 不可呼叫 L3)
- telegram_templates.ops_action_request(): 4 按鈕 inline keyboard
  (暫停1h / 暫停6h / 立即重試 / 解除暫停)
- telegram_bot_service._handle_ops_callback(): 接 momo:ops:<action>:<task>
- scheduler.py run_momo_task + run_auto_import_task 開頭加
  is_task_paused() 檢查(Phase 4 暫停機制生效)

安全邊界(ADR-012 §①):
- L1 Hermes 只讀 → 失敗降 L0 + 🟡 標記
- L2 NemoTron 只碰 ai_insights + 發 Telegram + SAFE_ACTIONS
- L3 OpenClaw 任意動作必經 HITL inline keyboard 批准
- 不做容器重啟按鈕(需 docker socket,風險過高)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-19 13:26:51 +08:00

402 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
EventRouter — 事件分流入口(ADR-012 Phase 1 骨幹)
所有系統事件(exception / 排程完成 / 告警 / 資訊通報)**應**統一透過
`dispatch(event)` 進入,由 EventRouter 依 severity × event_type 分流到:
L0 Direct / L1 Hermes Observer / L2 NemoTron Investigator / L3 OpenClaw Operator
設計原則(ADR-012 §⑥):無論 AI 狀況,**通知鏈絕不中斷**。
每一級失敗立即降級到下一級,最終保底 L0 直出模板。
Phase 1 實作範圍:
- 骨幹 + 分類邏輯
- L0 模板直出(已可用)
- L1 Hermes / L2 NemoTron / L3 OpenClaw 為 stub(附 TODO 標記)
- 完整 fallback 鏈(AI 掛必降級)
- 靜音檢查 + Audit Trail
"""
from __future__ import annotations
import os
import time
from datetime import datetime
from enum import Enum
from typing import Any
import requests
from services.logger_manager import SystemLogger
from services import telegram_templates as tpl
from services import agent_actions
# Module-level logger shared by every function in this file, obtained from
# SystemLogger under the name "EventRouter".
sys_log = SystemLogger("EventRouter").get_logger()
class Tier(str, Enum):
    """Handling tiers an event can be routed to.

    Members are ``str`` subclasses, so they compare equal to their short
    tier code (for example ``Tier.L0_DIRECT == "L0"``).
    """

    L0_DIRECT = "L0"        # direct template output
    L1_OBSERVER = "L1"      # Hermes observer
    L2_INVESTIGATOR = "L2"  # NemoTron investigator
    L3_OPERATOR = "L3"      # OpenClaw operator
class Severity(str, Enum):
    """Severity levels an event may carry.

    Members are ``str`` subclasses, so a plain string such as ``"alert"``
    compares equal to the matching member.
    """

    INFO = "info"
    SUCCESS = "success"
    WARNING = "warning"
    ALERT = "alert"  # P0/P1
# =====================================================================
# 分類規則ADR-012 §③)
# =====================================================================
def _classify(event: dict) -> Tier:
    """Pick the handling tier for an event from its severity, type and trace.

    Rules:
      * ``alert``   -> L2 when ``event_type`` is one of the L2 event types,
                       otherwise L1.
      * ``warning`` -> L1 when the event carries a non-empty ``trace``,
                       otherwise L0.
      * anything else (``info``, ``success``, unknown values) -> L0.

    L3 is never returned here; this function only yields L0, L1 or L2.

    Args:
        event: Event dict; reads ``severity`` (default ``"info"``),
            ``event_type`` (default ``""``) and ``trace``.

    Returns:
        The selected ``Tier`` member.
    """
    severity = event.get("severity", "info")

    if severity == Severity.ALERT:
        # Alert types that are handed to the L2 investigator.
        l2_event_types = {
            "price_threat", "db_connection_error", "crawler_timeout",
            "nim_quota_exhausted", "embedding_failure",
        }
        if event.get("event_type", "") in l2_event_types:
            return Tier.L2_INVESTIGATOR
        return Tier.L1_OBSERVER

    # A warning is only worth an L1 translation when there is a trace to translate.
    if severity == Severity.WARNING and event.get("trace"):
        return Tier.L1_OBSERVER

    return Tier.L0_DIRECT
# =====================================================================
# 主入口
# =====================================================================
def dispatch(event: dict, admin_chat_ids: list[int] | None = None) -> dict:
    """Route one event: classify it, render the message, send it, audit it.

    Args:
        event: Event dict. This function itself reads ``source`` and
            ``event_type`` (to build the ``"<source>:<event_type>"`` key);
            the classifier and renderers read further fields such as
            ``severity``, ``title``, ``summary``, ``trace`` and ``payload``.
        admin_chat_ids: Recipient chat ids, passed through to ``_send``.
            ``None`` lets ``_send`` resolve the recipients itself.

    Returns:
        * Silenced event: ``{"tier": "silenced", "sent": 0, "event_key": ...}``;
          nothing is rendered, sent or audited.
        * Otherwise: the dict returned by ``_send`` with ``tier``,
          ``event_key`` and ``latency_ms`` (milliseconds, one decimal) added.
    """
    started = time.time()
    event_key = f"{event.get('source', '?')}:{event.get('event_type', '?')}"

    # Silence check comes first: a silenced key short-circuits everything.
    if agent_actions.is_silenced(event_key):
        sys_log.info(f"[EventRouter] 事件被靜音略過: {event_key}")
        return {"tier": "silenced", "sent": 0, "event_key": event_key}

    tier = _classify(event)
    sys_log.info(f"[EventRouter] dispatch {event_key}{tier.value}")

    # Tiers without a dedicated renderer fall back to the L0 template.
    renderers = {
        Tier.L1_OBSERVER: _render_l1_with_fallback,
        Tier.L2_INVESTIGATOR: _render_l2_with_fallback,
    }
    try:
        text = renderers.get(tier, _render_l0)(event)
    except Exception as exc:
        # Any rendering failure degrades to the plain L0 template.
        sys_log.error(f"[EventRouter] 渲染失敗,降級 L0: {exc}")
        text = _render_l0(event)

    result = _send(text, admin_chat_ids)
    result.update(
        tier=tier.value,
        event_key=event_key,
        latency_ms=round((time.time() - started) * 1000, 1),
    )

    # Every non-silenced dispatch is audited.
    _audit_dispatch(event, tier, result)
    return result
# =====================================================================
# Tier 渲染器
# =====================================================================
def _render_l0(event: dict) -> str:
    """Render an event with the plain template that matches its severity.

    ``success``, ``info`` and ``warning`` use the template of the same name;
    every other severity (including ``alert``) uses the alert template.

    Args:
        event: Event dict; reads ``severity``, ``title``, ``source``,
            ``summary`` and ``payload``, plus ``status``, ``impact``,
            ``suggested_actions`` and ``trace`` for the alert template.

    Returns:
        Whatever the selected ``telegram_templates`` function returns.
    """
    severity = event.get("severity", "info")
    summary = event.get("summary", "")
    payload = event.get("payload")
    # Arguments shared by all four templates.
    common = {
        "title": event.get("title", "未命名事件"),
        "module": event.get("source", "unknown"),
    }

    if severity == Severity.SUCCESS:
        return tpl.success(**common, stats=summary)
    if severity == Severity.INFO:
        return tpl.info(**common, content=summary)
    if severity == Severity.WARNING:
        # Only a dict payload is forwarded as details.
        details = payload if isinstance(payload, dict) else None
        return tpl.warning(**common, summary=summary, details=details)

    # Alert, or any severity not handled above.
    return tpl.alert(
        **common,
        status=event.get("status", "未知"),
        impact=event.get("impact", "未評估"),
        summary=summary,
        actions=event.get("suggested_actions"),
        trace=event.get("trace"),
    )
def _render_l1_with_fallback(event: dict) -> str:
    """Render an event at L1 (Hermes summary), degrading to L0 when needed.

    If ``_hermes_observe`` yields a non-empty summary, the result is the
    triaged message labelled "L1 · Hermes". If it yields nothing, or if either
    the Hermes call or the composition raises, the plain L0 text is returned
    with a trailing marker line saying that AI analysis is unavailable.

    Args:
        event: Event dict, passed unchanged to the helpers.

    Returns:
        The message text to send.
    """
    try:
        hermes_text = _hermes_observe(event)
        if hermes_text:
            return _compose_triaged(
                event, tier_label="L1 · Hermes", ai_summary=hermes_text,
            )
    except Exception as exc:
        sys_log.warning(f"[EventRouter] L1 Hermes 失敗,降 L0: {exc}")

    # Fallback: L0 template plus the degradation marker.
    degraded_marker = "\n\n🟡 _AI 分析暫不可用以原始資料呈現_"
    return _render_l0(event) + degraded_marker
def _render_l2_with_fallback(event: dict) -> str:
    """Render an event at L2 (NemoTron), degrading to L1 when needed.

    If ``_nemoton_investigate`` returns a truthy result, its ``summary`` and
    ``actions_taken`` are composed into the triaged message labelled
    "L2 · NemoTron". If it returns nothing, or anything in that path raises,
    rendering is delegated to ``_render_l1_with_fallback``.

    Args:
        event: Event dict, passed unchanged to the helpers.

    Returns:
        The message text to send.
    """
    try:
        outcome = _nemoton_investigate(event)
        if outcome:
            summary_text = outcome.get("summary", "")
            executed = outcome.get("actions_taken", [])
            return _compose_triaged(
                event,
                tier_label="L2 · NemoTron",
                ai_summary=summary_text,
                ai_actions=executed,
            )
    except Exception as exc:
        sys_log.warning(f"[EventRouter] L2 NemoTron 失敗,降 L1: {exc}")

    return _render_l1_with_fallback(event)
def _compose_triaged(event: dict, tier_label: str, ai_summary: str,
                     ai_actions: list | None = None) -> str:
    """Build the triaged message: the L0 text followed by the AI section.

    Args:
        event: Event dict, rendered with ``_render_l0`` for the factual part.
        tier_label: Label shown in the AI section header.
        ai_summary: AI-produced summary text.
        ai_actions: Optional entries describing actions already executed;
            when non-empty they are listed one per line under their own header.

    Returns:
        The L0 text, a blank line, then the AI section lines joined by
        newlines.
    """
    base = _render_l0(event)

    ai_lines = [f"🤖 *AI 摘要({tier_label}*", ai_summary, ""]
    if ai_actions:
        ai_lines += [
            "🛠️ *AI 已執行動作:*",
            *(f"{item}" for item in ai_actions),
            "",
        ]

    return base + "\n\n" + "\n".join(ai_lines)
# =====================================================================
# L1 Hermes ObserverPhase 2 實作)
# =====================================================================
# Hermes endpoint settings; each one can be overridden through the environment.
_HERMES_URL = os.environ.get("HERMES_URL", "http://192.168.0.111:11434")
_HERMES_MODEL = os.environ.get("HERMES_MODEL", "hermes3:latest")
# Request timeout handed to requests.post (converted to int at import time).
_HERMES_TIMEOUT = int(os.environ.get("HERMES_TIMEOUT", "30"))

# System prompt for the L1 observer: asks for a JSON object with the keys
# "summary", "probable_cause" and "actions", written in Traditional Chinese.
_HERMES_OBSERVE_PROMPT = """你是一個 SRE 助手,任務是把技術錯誤翻譯成人類可理解的摘要。
請根據以下事件產出**繁體中文**分析,嚴格以下列 JSON 格式輸出(不要加 markdown 代碼塊、不要加說明):
{"summary": "一句話技術根因(中文,<60 字)", "probable_cause": "最可能的原因(中文,<40 字)", "actions": ["建議動作1", "建議動作2"]}
限制:
- summary 翻譯英文錯誤為中文,去除技術 jargon
- probable_cause 推測根因(基於 stack trace 和事件類型)
- actions 最多 3 個,具體可執行
- 若資訊不足summary 填 "資訊不足"、actions 填 ["請檢查原始 trace"]
"""
def _extract_json_object(raw: str) -> str:
    """Return the outermost ``{...}`` span of *raw*, or *raw* itself if none.

    This strips anything wrapped around the JSON object: markdown fences with
    or without a language tag (```` ```json ````), or explanatory prose.
    """
    start = raw.find("{")
    end = raw.rfind("}")
    if start == -1 or end < start:
        return raw
    return raw[start:end + 1]


def _hermes_observe(event: dict) -> str | None:
    """Ask Hermes (via ``/api/generate``) to summarise an event's stack trace.

    The request carries the observer system prompt plus a user prompt built
    from the event's type, source, title, summary and the last 800 characters
    of its trace. The model is expected to answer with a JSON object holding
    ``summary``, ``probable_cause`` and ``actions``.

    Args:
        event: Event dict; reads ``event_type``, ``source``, ``title``,
            ``summary`` and ``trace``.

    Returns:
        Formatted text (summary, optional probable cause, up to three
        suggested actions), or ``None`` on a non-OK HTTP status or any
        exception, so that the caller can degrade. An empty string is possible
        when the model returns no usable fields; callers treat it as "no
        summary".
    """
    try:
        user_prompt = (
            f"事件類型:{event.get('event_type', 'unknown')}\n"
            f"來源模組:{event.get('source', 'unknown')}\n"
            f"標題:{event.get('title', '')}\n"
            f"簡述:{event.get('summary', '')}\n"
            f"技術 trace\n{(event.get('trace') or '')[-800:]}"
        )
        resp = requests.post(
            f"{_HERMES_URL}/api/generate",
            json={
                "model": _HERMES_MODEL,
                "system": _HERMES_OBSERVE_PROMPT,
                "prompt": user_prompt,
                "stream": False,
                "options": {"temperature": 0.1, "num_predict": 300},
            },
            timeout=_HERMES_TIMEOUT,
        )
        if not resp.ok:
            sys_log.warning(f"[EventRouter.L1] Hermes HTTP {resp.status_code}")
            return None

        import json as _json

        raw = (resp.json().get("response") or "").strip()
        # The model may wrap the JSON in a fence despite the prompt. The old
        # check required a fenced part to start with "{", which a "```json"
        # fence never does, so extract the object span instead.
        parsed = _json.loads(_extract_json_object(raw))

        # Treat JSON null the same as a missing field instead of failing.
        summary = str(parsed.get("summary") or "").strip()
        cause = str(parsed.get("probable_cause") or "").strip()
        actions = parsed.get("actions") or []
        if isinstance(actions, str):
            # A single string must not be iterated character by character.
            actions = [actions]

        out = [summary]
        if cause:
            out.append(f"\n*可能根因:* {cause}")
        if actions:
            out.append("\n*建議動作:*")
            out.extend(f"{a}" for a in actions[:3])
        return "\n".join(out)
    except Exception as e:
        sys_log.warning(f"[EventRouter.L1] Hermes 呼叫失敗,降級:{type(e).__name__}: {str(e)[:120]}")
        return None
# =====================================================================
# L2 NemoTron InvestigatorPhase 3 規則式實作,不呼 NIM
# =====================================================================
# Rule table: event_type -> ordered list of (action_name, params) pairs.
# A "task_name" of "<auto>" is a placeholder filled in from the event itself.
_L2_RULES: dict[str, list[tuple[str, dict]]] = {
    "db_connection_error": [
        ("query_km", {"query": "DNS resolve 失敗 momo-postgres"}),
        ("retry_task", {"task_name": "<auto>", "backoff_sec": 60}),
    ],
    "crawler_timeout": [
        ("silence_alert", {"duration_min": 30}),
        ("retry_task", {"task_name": "<auto>", "backoff_sec": 300}),
    ],
    "nim_quota_exhausted": [
        # 12 hours, waiting for the quota to reset.
        ("silence_alert", {"duration_min": 720}),
    ],
    "embedding_failure": [
        # A retry queue already handles these.
        ("silence_alert", {"duration_min": 10}),
    ],
}
def _nemoton_investigate(event: dict) -> dict | None:
    """Run the rule-based L2 actions configured for the event's type.

    The event type is looked up in ``_L2_RULES``; each listed action is
    resolved through ``agent_actions.SAFE_ACTIONS`` and called with the rule's
    parameters. Action names missing from that mapping are skipped.

    Parameter substitution:
      * ``task_name == "<auto>"`` becomes ``payload["task_name"]`` when set,
        otherwise the last dot-separated segment of ``source``.
      * ``silence_alert`` without an ``event_key`` gets
        ``"<source>:<event_type>"``.

    Args:
        event: Event dict; reads ``event_type``, ``source`` and ``payload``.

    Returns:
        ``None`` when no rule exists for the event type; otherwise
        ``{"summary": str, "actions_taken": list[str]}`` with one entry per
        attempted action. A failing action is recorded as an entry and logged,
        not raised.
    """
    event_type = event.get("event_type", "")
    rules = _L2_RULES.get(event_type)
    if not rules:
        return None

    # ``payload`` is optional and may be something other than a dict
    # (``_render_l0`` guards the same way). Without this guard a non-dict
    # payload raised mid-loop, after earlier actions had already run.
    payload = event.get("payload")
    if not isinstance(payload, dict):
        payload = {}
    source = event.get("source", "")
    if not isinstance(source, str):
        source = ""
    auto_task_name = payload.get("task_name", "") or source.split(".")[-1]
    silence_key = f"{event.get('source', '?')}:{event_type}"

    actions_taken = []
    for action_name, params in rules:
        action_fn = agent_actions.SAFE_ACTIONS.get(action_name)
        if not action_fn:
            continue

        # Copy so the shared rule table is never mutated.
        p = dict(params)
        if p.get("task_name") == "<auto>":
            p["task_name"] = auto_task_name
        if action_name == "silence_alert" and "event_key" not in p:
            p["event_key"] = silence_key

        try:
            result = action_fn(**p)
            status = result.get("status", "unknown")
            # Same " → " separator as the error entry below.
            actions_taken.append(f"{action_name} → {status}")
        except Exception as e:
            actions_taken.append(f"{action_name} → error: {str(e)[:80]}")
            sys_log.error(f"[EventRouter.L2] action {action_name} 例外: {e}")

    # One-line summary of which rule set ran and how many actions were attempted.
    summary = f"依規則 _L2_RULES[{event_type}] 執行 {len(actions_taken)} 個安全動作"
    return {"summary": summary, "actions_taken": actions_taken}
# =====================================================================
# Telegram 發送 + Audit
# =====================================================================
def _send(text: str, admin_chat_ids: list[int] | None) -> dict:
    """Send *text* to every admin chat through the Telegram ``sendMessage`` API.

    Each chat is first tried with ``parse_mode="Markdown"``. If Telegram
    rejects that request with HTTP 400 (typically unparseable Markdown, for
    example a stack trace with stray ``_`` or ``*``, or an over-long message),
    the message is retried once as plain text truncated to 4096 characters, so
    a formatting problem does not drop the notification.

    Args:
        text: Message body.
        admin_chat_ids: Recipient chat ids; ``None`` loads them with
            ``_load_admin_chat_ids``.

    Returns:
        ``{"sent": int, "errors": list[str]}``. ``sent`` counts chats that
        accepted the message; ``errors`` has one entry per failed chat. The
        bot token is redacted from exception text before it is recorded.
    """
    token = os.getenv("TELEGRAM_BOT_TOKEN")
    if not token:
        return {"sent": 0, "errors": ["TELEGRAM_BOT_TOKEN 未設定"]}
    if admin_chat_ids is None:
        admin_chat_ids = _load_admin_chat_ids()
    if not admin_chat_ids:
        return {"sent": 0, "errors": ["無管理員 chat_id"]}

    url = f"https://api.telegram.org/bot{token}/sendMessage"
    max_plain_len = 4096  # Telegram's text length limit for one message
    sent = 0
    errors: list[str] = []
    for cid in admin_chat_ids:
        try:
            r = requests.post(url, json={
                "chat_id": int(cid), "text": text, "parse_mode": "Markdown",
            }, timeout=10)
            if r.status_code == 400:
                # Rejected request: retry without Markdown parsing.
                r = requests.post(url, json={
                    "chat_id": int(cid), "text": text[:max_plain_len],
                }, timeout=10)
            if r.ok:
                sent += 1
            else:
                errors.append(f"chat={cid} status={r.status_code}")
        except Exception as e:
            # requests' error text embeds the request URL, which contains the
            # bot token; redact it because ``errors`` ends up in the audit record.
            errors.append(f"chat={cid} exc={str(e).replace(token, '***')}")
    return {"sent": sent, "errors": errors}
def _load_admin_chat_ids() -> list[int]:
    """Resolve the Telegram chat ids that should receive notifications.

    The ``telegram_users`` table is queried first for rows that are both
    active and admin. If the query fails (logged as a warning) or returns no
    rows, the ``TELEGRAM_CHAT_IDS`` environment variable is used instead: a
    comma-separated list in which brackets, quotes and spaces are ignored.

    Returns:
        The chat ids, possibly empty. Environment entries that are not
        integers are logged and skipped rather than raising, so one malformed
        entry cannot stop every notification.
    """
    try:
        from database.manager import get_session
        from sqlalchemy import text as sa_text

        session = get_session()
        try:
            rows = session.execute(sa_text(
                "SELECT telegram_id FROM telegram_users WHERE is_active=true AND is_admin=true"
            )).fetchall()
            if rows:
                return [int(r[0]) for r in rows]
        finally:
            session.close()
    except Exception as e:
        sys_log.warning(f"[EventRouter] 查 telegram_users 失敗fallback .env: {e}")

    # Fallback: parse the environment variable.
    import re

    raw = os.getenv("TELEGRAM_CHAT_IDS", "")
    chat_ids: list[int] = []
    for piece in re.sub(r'[\[\]"\' ]', "", raw).split(","):
        piece = piece.strip()
        if not piece:
            continue
        try:
            chat_ids.append(int(piece))
        except ValueError:
            sys_log.warning(f"[EventRouter] TELEGRAM_CHAT_IDS 含無效項目,略過: {piece!r}")
    return chat_ids
def _audit_dispatch(event: dict, tier: Tier, result: dict) -> None:
    """Record one dispatch as an ``agent_action`` insight (best effort).

    The record is handed to ``store_insight`` and carries the tier, a subset
    of the event (``source``, ``event_type``, ``severity``, ``title``), a
    subset of the result (``sent``, ``latency_ms``, ``errors``) and an ISO
    timestamp. Any failure, including a failed import of the storage service,
    is logged and swallowed so that auditing never breaks a dispatch.

    Args:
        event: The dispatched event.
        tier: Tier the event was routed to.
        result: Result dict produced for this dispatch.
    """
    try:
        from services.openclaw_learning_service import store_insight

        event_fields = ("source", "event_type", "severity", "title")
        result_fields = ("sent", "latency_ms", "errors")
        store_insight(
            insight_type="agent_action",
            content=f"dispatch tier={tier.value} event={event.get('event_type')}",
            period=datetime.now().strftime("%Y-%m-%d"),
            metadata={
                "tier": tier.value,
                "event": {name: event.get(name) for name in event_fields},
                "result": {name: result.get(name) for name in result_fields},
                "ts": datetime.now().isoformat(),
            },
        )
    except Exception as exc:
        sys_log.error(f"[EventRouter] audit 失敗: {exc}")