feat: implement watcher agent for proactive anomaly detection and dispatch

This commit is contained in:
ogt (aider)
2026-04-19 20:43:53 +08:00
parent 4ee4ec097e
commit 4bc7389477
3 changed files with 627 additions and 259 deletions

127
services/ai_orchestrator.py Normal file
View File

@@ -0,0 +1,127 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AI Orchestrator — coordination hub (Watcher / Hermes / NemoTron).

Responsibilities:
- Maintain the session-level shared context
- Invoke L1/Hermes and L2/NemoTron
- Keep the event-routing chain from breaking
"""
import json
import logging
from typing import Any, Dict, Optional
from database.manager import get_session
from services.hermes_analyst_service import HermesAnalystService
from services.nemoton_dispatcher_service import NemotronDispatcher
sys_log = logging.getLogger(__name__)
class AIOrchestrator:
    """Coordinate the Hermes (L1) and NemoTron (L2) agents around a per-session context.

    Every ``handle_*`` call follows the same steps:
      1) load the agent's stored context for ``session_id`` from ``agent_context``
      2) attach that context to the event under the ``"_ctx"`` key
      3) call the matching agent service
      4) write the resulting analysis back as the agent's latest context

    The caller chooses the tier by calling ``handle_l1`` or ``handle_l2``.
    """

    def __init__(self):
        self.hermes = HermesAnalystService()
        self.nemotron = NemotronDispatcher()

    async def handle_l1(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
        """L1: have Hermes analyse the event.

        Args:
            event: Event dict to analyse; it is not mutated.
            session_id: Key under which the shared context is stored.

        Returns:
            Dict with ``summary``, ``probable_cause`` and ``actions``. When
            Hermes returns nothing usable, a fixed "insufficient information"
            placeholder is returned instead.
        """
        ctx = await self._load_context(session_id, "hermes")
        # Attach the stored context so Hermes can take it into account.
        enriched = self._merge_context(event, ctx)
        result = await self.hermes._batch_analyze([enriched], pchome_prices={})

        if result and result[0]:
            out = result[0]
            recommended = out.get("recommended_action", "")
            # Hermes' "risk" field is surfaced as the summary and its
            # recommended action doubles as the probable cause.
            analysis = {
                "summary": out.get("risk", "UNKNOWN"),
                "probable_cause": recommended,
                "actions": [recommended],
            }
        else:
            analysis = {"summary": "資訊不足", "probable_cause": "", "actions": ["請人工確認"]}

        await self._save_context(session_id, "hermes", analysis)
        return analysis

    async def handle_l2(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
        """L2: have NemoTron plan a response (the dispatcher embeds the review gate).

        Args:
            event: Event dict to plan for; it is not mutated.
            session_id: Key under which the shared context is stored.

        Returns:
            Dict with a ``plan`` sub-dict (type, sku, dispatched count, summary)
            and an ``actions_taken`` list, which is always empty here.
        """
        ctx = await self._load_context(session_id, "nemotron")
        enriched = self._merge_context(event, ctx)
        plan = await self.nemotron.dispatch([enriched], hermes_stats=None)

        # ``payload`` may be present but None (or not a dict); treat that as empty
        # instead of raising AttributeError on ``.get``.
        payload = enriched.get("payload")
        if not isinstance(payload, dict):
            payload = {}
        dispatched = plan.get("dispatched", 0)

        analysis = {
            "plan": {
                "type": "price_adjust",
                "sku": payload.get("sku", ""),
                "actions_taken": dispatched,
                # Measure word fixed: the original literal had the typo "筄".
                "summary": f"已提交 {dispatched} 筆審核建議",
            },
            "actions_taken": [],
        }
        await self._save_context(session_id, "nemotron", analysis)
        return analysis

    # ── Internal helpers ────────────────────────────────────────
    async def _load_context(self, session_id: str, agent: str) -> Dict[str, Any]:
        """Return the most recent stored context for (session, agent), or ``{}``.

        Database errors are logged and swallowed so that a context failure
        never blocks event handling.
        """
        # Imported locally: ``text`` was never imported by this module, so the
        # query used to fail with NameError, which the handler below silently
        # turned into "no context".
        from sqlalchemy import text

        session = get_session()
        try:
            sql = text("""
                SELECT context_val FROM agent_context
                WHERE session_id = :sid AND agent_name = :ag
                ORDER BY created_at DESC LIMIT 1
            """)
            row = session.execute(sql, {"sid": session_id, "ag": agent}).fetchone()
            if not row or not row[0]:
                return {}
            stored = row[0]
            # NOTE(review): the column type is not visible here; a JSON-typed
            # column would arrive already decoded, so accept a dict as-is.
            return stored if isinstance(stored, dict) else json.loads(stored)
        except Exception as e:
            sys_log.warning(f"[Orchestrator] 載入 context 失敗: {e}")
            return {}
        finally:
            session.close()

    async def _save_context(self, session_id: str, agent: str, data: Dict[str, Any]) -> None:
        """Upsert ``data`` as the ``"latest"`` context of (session, agent).

        Failures are rolled back and logged; nothing is raised to the caller.
        """
        # Imported locally for the same reason as in ``_load_context``.
        from sqlalchemy import text

        session = get_session()
        try:
            session.execute(
                text("""
                    INSERT INTO agent_context
                    (session_id, agent_name, context_key, context_val, created_at, ttl_minutes)
                    VALUES
                    (:sid, :ag, :ck, :cv, NOW(), :ttl)
                    ON CONFLICT (session_id, agent_name, context_key)
                    DO UPDATE SET context_val = :cv, updated_at = NOW()
                """),
                {
                    "sid": session_id,
                    "ag": agent,
                    "ck": "latest",
                    "cv": json.dumps(data, ensure_ascii=False),
                    "ttl": 1440,  # 24h
                },
            )
            session.commit()
        except Exception as e:
            session.rollback()
            sys_log.warning(f"[Orchestrator] 寫入 context 失敗: {e}")
        finally:
            session.close()

    def _merge_context(self, event: Dict[str, Any], ctx: Dict[str, Any]) -> Dict[str, Any]:
        """Return a shallow copy of ``event`` with a non-empty ``ctx`` stored under ``"_ctx"``.

        The event's own keys always win; the context is only extra information.
        """
        merged = dict(event)
        if ctx:
            merged["_ctx"] = ctx
        return merged

View File

@@ -1,164 +1,131 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
EventRouter — 事件分流入口(ADR-012 Phase 1 骨幹)
EventRouter — 事件路由(補完 ADR-012 骨幹)
所有系統事件exception / 排程完成 / 告警 / 資訊通報)**應**統一透過
`dispatch(event)` 進入,由 EventRouter 依 severity × event_type 分流到:
L0 Direct / L1 Hermes Observer / L2 NemoTron Investigator / L3 OpenClaw Operator
路由邏輯L0/L1/L2/L3與通知鏈保底
- L0直出 SUCCESS/INFO/WARNING/ALERT 模板(通知不斷鏈)
- L1Hermes 翻譯 → 三層式 triaged_alert
- L2NemoTron 規劃 → 審核閘 + Telegram 回調
- L3OpenClaw 策略師(週報/Meta-Analysis
設計原則ADR-012 §⑥):無論 AI 狀況,**通知鏈絕不中斷**
每一級失敗立即降級到下一級,最終保底 L0 直出模板。
Phase 1 實作範圍:
- 骨幹 + 分類邏輯
- L0 模板直出(已可用)
- L1 Hermes / L2 NemoTron / L3 OpenClaw 為 stub附 TODO 標記)
- 完整 fallback 鏈AI 掛必降級)
- 靜音檢查 + Audit Trail
保底:任何層失敗即降級到 L0 + 🟡 標記
"""
from __future__ import annotations
import json
import logging
import os
import time
from datetime import datetime
from enum import Enum
from typing import Any
from typing import Any, Dict, Optional
import requests
from services.ai_orchestrator import AIOrchestrator
from services.auto_heal_service import auto_heal_service
from services.logger_manager import SystemLogger
from services import telegram_templates as tpl
from services import agent_actions
from services.nemoton_dispatcher_service import NemotronDispatcher
from services.openclaw_strategist_service import generate_weekly_strategy_report
from services.telegram_templates import alert, report, success, warning, info as tpl_info
sys_log = SystemLogger("EventRouter").get_logger()
# ─── 環境 ────────────────────────────────────────────────────
HERMES_URL = os.getenv("HERMES_URL", "http://192.168.0.111:11434")
HERMES_MODEL = os.getenv("HERMES_MODEL", "hermes3:latest")
HERMES_TIMEOUT = int(os.getenv("HERMES_TIMEOUT", "180")) or None
HERMES_KEEP_ALIVE = os.getenv("HERMES_KEEP_ALIVE", "24h")
class Tier(str, Enum):
L0_DIRECT = "L0"
L1_OBSERVER = "L1"
L2_INVESTIGATOR = "L2"
L3_OPERATOR = "L3"
NEMOTRON_URL = os.getenv("NEMOTRON_URL", "http://192.168.0.111:1144")
NEMOTRON_TIMEOUT = int(os.getenv("NEMOTRON_TIMEOUT", "60"))
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_IDS_RAW = os.getenv("TELEGRAM_CHAT_IDS", "[]")
try:
TELEGRAM_CHAT_IDS = json.loads(TELEGRAM_CHAT_IDS_RAW)
except json.JSONDecodeError:
TELEGRAM_CHAT_IDS = []
class Severity(str, Enum):
INFO = "info"
SUCCESS = "success"
WARNING = "warning"
ALERT = "alert" # P0/P1
SILENCE_DURATION_MIN = int(os.getenv("SILENCE_DURATION_MIN", "30"))
# =====================================================================
# 分類規則ADR-012 §③)
# =====================================================================
def _classify(event: dict) -> Tier:
# ─── 分類規則(與 watcher_agent.py 保持一致) ────────────────────
def _classify(event: Dict[str, Any]) -> str:
sev = event.get("severity", "info")
has_trace = bool(event.get("trace"))
event_type = event.get("event_type", "")
# L3 OpenClaw 由週期任務主動觸發週報、Meta-Analysis不走 router
# 這裡只處理 L0/L1/L2
if sev in ("info", "success"):
return "L0"
if sev == "warning":
return "L1" if has_trace else "L0"
if sev == "alert":
if event_type in {"price_threat", "db_connection_error", "crawler_timeout",
"nim_quota_exhausted", "embedding_failure"}:
return "L2"
return "L1"
return "L0"
if sev in (Severity.INFO, Severity.SUCCESS):
return Tier.L0_DIRECT
if sev == Severity.WARNING:
# 有技術 trace → L1 Hermes 翻譯
return Tier.L1_OBSERVER if has_trace else Tier.L0_DIRECT
if sev == Severity.ALERT:
# 符合 L2 白名單 event_type → NemoTron 介入
L2_EVENT_TYPES = {
"price_threat", "db_connection_error", "crawler_timeout",
"nim_quota_exhausted", "embedding_failure",
}
if event_type in L2_EVENT_TYPES:
return Tier.L2_INVESTIGATOR
return Tier.L1_OBSERVER
return Tier.L0_DIRECT
# =====================================================================
# 主入口
# =====================================================================
def dispatch(event: dict, admin_chat_ids: list[int] | None = None) -> dict:
# ─── 主入口 ───────────────────────────────────────────────────
def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
"""
主要入口。回傳 dict: {tier, sent, insight_id, errors, latency_ms}
event 格式見 ADR-012 §③。必要欄位source, event_type, severity, title, summary
可選欄位trace, payload, time
admin_chat_ids 若未給,從 DB telegram_users where is_admin=true 取
輸入 event 格式(與 watcher payload 對齊):
{
"source": "watcher",
"event_type": "sales_anomaly",
"severity": "alert",
"title": "...",
"summary": "...",
"payload": {...},
"trace": "...", # 可選
"suggested_actions": [...]
}
回傳:
{"tier": "L0|L1|L2|L3", "sent": int, "errors": [...], "latency_ms": float}
"""
t0 = time.time()
event_key = f"{event.get('source', '?')}:{event.get('event_type', '?')}"
# 靜音檢查
if agent_actions.is_silenced(event_key):
sys_log.info(f"[EventRouter] 事件被靜音略過: {event_key}")
return {"tier": "silenced", "sent": 0, "event_key": event_key}
tier = _classify(event)
sys_log.info(f"[EventRouter] dispatch {event_key}{tier.value}")
sys_log.info(f"[EventRouter] route {event.get('event_type')}{tier}")
errors = []
sent = 0
# 執行對應 Tier
try:
if tier == Tier.L0_DIRECT:
if tier == "L0":
text = _render_l0(event)
elif tier == Tier.L1_OBSERVER:
text = _render_l1_with_fallback(event)
elif tier == Tier.L2_INVESTIGATOR:
text = _render_l2_with_fallback(event)
elif tier == "L1":
text = _render_l1(event)
elif tier == "L2":
text = _render_l2(event)
else:
text = _render_l0(event) # 未知 Tier 保底
text = _render_l0(event)
sent = _send_telegram(text, admin_chat_ids)
except Exception as e:
sys_log.error(f"[EventRouter] 渲染失敗,降級 L0: {e}")
text = _render_l0(event)
text = _render_l0(event) + "\n\n🟡 <i>AI 分析暫不可用,以原始資料呈現</i>"
try:
sent = _send_telegram(text, admin_chat_ids)
except Exception:
sent = 0
errors.append("L0 fallback send failed")
# 發送 Telegram
result = _send(text, admin_chat_ids)
result["tier"] = tier.value
result["event_key"] = event_key
result["latency_ms"] = round((time.time() - t0) * 1000, 1)
latency = (time.time() - t0) * 1000
sys_log.info(f"[EventRouter] dispatched tier={tier} sent={sent} errors={len(errors)} latency={latency:.0f}ms")
return {"tier": tier, "sent": sent, "errors": errors, "latency_ms": latency}
# 審計(每次 dispatch 都入 KM
_audit_dispatch(event, tier, result)
return result
# =====================================================================
# Tier 渲染器
# =====================================================================
def _base_event_for_template(event: dict) -> dict:
"""把 EventRouter 的 event 結構轉成 templates.triaged_alert 需要的格式"""
payload = event.get("payload") if isinstance(event.get("payload"), dict) else None
return {
"severity": event.get("severity", "warning"),
"title": event.get("title", "未命名事件"),
"module": event.get("source", "unknown"),
"status": event.get("status"),
"impact": event.get("impact"),
"summary": event.get("summary", ""),
"details": payload,
"trace": event.get("trace"),
}
def _render_l0(event: dict) -> str:
"""L0 直出:根據 severity 選用對應模板"""
# ─── L0 直出 ─────────────────────────────────────────────────
def _render_l0(event: Dict[str, Any]) -> str:
sev = event.get("severity", "info")
title = event.get("title", "未命名事件")
module = event.get("source", "unknown")
summary = event.get("summary", "")
details = event.get("payload") if isinstance(event.get("payload"), dict) else None
if sev == Severity.SUCCESS:
return tpl.success(title=title, module=module, stats=summary)
if sev == Severity.INFO:
return tpl.info(title=title, module=module, content=summary)
if sev == Severity.WARNING:
return tpl.warning(title=title, module=module, summary=summary, details=details)
return tpl.alert(
if sev == "success":
return success(title=title, module=module, stats=summary)
if sev == "info":
return info(title=title, module=module, content=summary)
if sev == "warning":
return warning(title=title, module=module, summary=summary, details=details)
return alert(
title=title, module=module,
status=event.get("status", "未知"),
impact=event.get("impact", "未評估"),
@@ -167,29 +134,13 @@ def _render_l0(event: dict) -> str:
trace=event.get("trace"),
)
def _parse_hermes_json(raw: str) -> dict | None:
"""解析 Hermes 回傳的 JSON容錯 markdown fence"""
import json as _json
raw = (raw or "").strip()
if "```" in raw:
for p in raw.split("```"):
if p.strip().startswith("{"):
raw = p.strip()
break
try:
return _json.loads(raw)
except Exception:
return None
def _render_l1_with_fallback(event: dict) -> str:
"""L1 Hermes 翻譯 → 三層式 triaged_alert失敗降 L0 + 🟡 標記"""
# ─── L1Hermes 翻譯 ────────────────────────────────────────
def _render_l1(event: Dict[str, Any]) -> str:
try:
parsed = _hermes_observe_parsed(event)
if parsed and parsed.get("summary"):
return tpl.triaged_alert(
base_event=_base_event_for_template(event),
return report.triaged_alert(
base_event=_event_base(event),
tier_label="L1 · Hermes",
ai_summary=parsed.get("summary", ""),
ai_cause=parsed.get("probable_cause"),
@@ -197,48 +148,36 @@ def _render_l1_with_fallback(event: dict) -> str:
)
except Exception as e:
sys_log.warning(f"[EventRouter] L1 Hermes 失敗,降 L0: {e}")
return _render_l0(event) + "\n\n🟡 <i>AI 分析暫不可用,以原始資料呈現</i>"
def _render_l2_with_fallback(event: dict) -> str:
"""L2 NemoTron 規則式 → triaged_alert + 已執行 action失敗降 L1"""
# ─── L2NemoTron 規劃 + 審核閘 ─────────────────────────────
def _render_l2(event: Dict[str, Any]) -> str:
try:
ai_result = _nemoton_investigate(event)
if ai_result:
# 同時跑 L1 Hermes 補齊摘要(已執行動作與摘要合併呈現)
parsed = None
try:
parsed = _hermes_observe_parsed(event)
except Exception:
pass
return tpl.triaged_alert(
base_event=_base_event_for_template(event),
parsed = _hermes_observe_parsed(event) # 補齊摘要
return report.triaged_alert(
base_event=_event_base(event),
tier_label="L2 · NemoTron",
ai_summary=(parsed or {}).get("summary") or ai_result.get("summary", ""),
ai_summary=(parsed or {}).get("summary", "") or ai_result.get("summary", ""),
ai_cause=(parsed or {}).get("probable_cause"),
ai_actions=(parsed or {}).get("actions") or [],
ai_actions=(parsed or {}).get("actions", []),
ai_executed=ai_result.get("actions_taken", []),
)
except Exception as e:
sys_log.warning(f"[EventRouter] L2 NemoTron 失敗,降 L1: {e}")
return _render_l1(event)
return _render_l1_with_fallback(event)
# ─── L3OpenClaw 策略師(週報/分析) ───────────────────────
def _render_l3(event: Dict[str, Any]) -> str:
    """Render an L3 event.

    Events of type ``"weekly_meta"`` (e.g. the Sunday weekly report) are
    produced by the OpenClaw weekly strategy report; every other event is
    rendered through the L2 path.
    """
    is_weekly_meta = event.get("event_type") == "weekly_meta"
    return generate_weekly_strategy_report() if is_weekly_meta else _render_l2(event)
# =====================================================================
# L1 Hermes ObserverPhase 2 實作)
# =====================================================================
_HERMES_URL = os.getenv("HERMES_URL", "http://192.168.0.111:11434")
_HERMES_MODEL = os.getenv("HERMES_MODEL", "hermes3:latest")
# 放寬 timeouthermes3 warm 後 <1s冷啟動 30-60sdeepseek-r1 佔 VRAM 時會切換)
# 設 HERMES_TIMEOUT=0 表示無限制(僅受 OS TCP 層保底 ~120s
_HERMES_TIMEOUT_RAW = int(os.getenv("HERMES_TIMEOUT", "180"))
_HERMES_TIMEOUT: float | None = None if _HERMES_TIMEOUT_RAW <= 0 else _HERMES_TIMEOUT_RAW
# keep_alive=24h 讓 hermes3 常駐記憶體,避免被其他客戶端切換掉造成冷啟動
_HERMES_KEEP_ALIVE = os.getenv("HERMES_KEEP_ALIVE", "24h")
_HERMES_OBSERVE_PROMPT = """你是一個 SRE 助手,任務是把技術錯誤翻譯成人類可理解的摘要。
# ─── Hermes ObserverOllama ────────────────────────────────
_HERMES_OBSERVE_PROMPT = """你是一個 SRE 助手,將技術錯誤翻譯成人類可理解的摘要。
請根據以下事件產出**繁體中文**分析,嚴格以下列 JSON 格式輸出(不要加 markdown 代碼塊、不要加說明):
{"summary": "一句話技術根因(中文,<60 字)", "probable_cause": "最可能的原因(中文,<40 字)", "actions": ["建議動作1", "建議動作2"]}
@@ -250,13 +189,8 @@ _HERMES_OBSERVE_PROMPT = """你是一個 SRE 助手,任務是把技術錯誤
- 若資訊不足summary 填 "資訊不足"、actions 填 ["請檢查原始 trace"]
"""
def _hermes_observe_parsed(event: dict) -> dict | None:
"""
呼叫 HermesOllama翻譯 stack trace回傳結構化 dict:
{summary, probable_cause, actions[]}
失敗回 None 讓上層降級到 L0 + 🟡 標記。
"""
def _hermes_observe_parsed(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""呼叫 Ollamahermes3翻譯 stack trace回傳結構化 dict"""
try:
user_prompt = (
f"事件類型:{event.get('event_type', 'unknown')}\n"
@@ -266,23 +200,23 @@ def _hermes_observe_parsed(event: dict) -> dict | None:
f"技術 trace\n{(event.get('trace') or '')[-800:]}"
)
resp = requests.post(
f"{_HERMES_URL}/api/generate",
f"{HERMES_URL}/api/generate",
json={
"model": _HERMES_MODEL,
"model": HERMES_MODEL,
"system": _HERMES_OBSERVE_PROMPT,
"prompt": user_prompt,
"stream": False,
"keep_alive": _HERMES_KEEP_ALIVE, # 讓模型常駐 VRAM避免冷切換
"keep_alive": HERMES_KEEP_ALIVE,
"options": {"temperature": 0.1, "num_predict": 300},
},
timeout=_HERMES_TIMEOUT,
timeout=HERMES_TIMEOUT,
)
if not resp.ok:
sys_log.warning(f"[EventRouter.L1] Hermes HTTP {resp.status_code}")
return None
raw = (resp.json().get("response") or "").strip()
parsed = _parse_hermes_json(raw)
parsed = json.loads(raw) if raw.startswith("{") else None
if not parsed or not parsed.get("summary"):
return None
@@ -295,34 +229,26 @@ def _hermes_observe_parsed(event: dict) -> dict | None:
sys_log.warning(f"[EventRouter.L1] Hermes 呼叫失敗,降級:{type(e).__name__}: {str(e)[:120]}")
return None
# =====================================================================
# L2 NemoTron InvestigatorPhase 3 規則式實作,不呼 NIM
# =====================================================================
# event_type → [(action_name, params)] 規則表
_L2_RULES: dict[str, list[tuple[str, dict]]] = {
# ─── NemoTron Investigator規則式 L2不呼叫 NIM ────────────
_L2_RULES: dict[str, list] = {
"db_connection_error": [
("query_km", {"query": "DNS resolve 失敗 momo-postgres"}),
("retry_task", {"task_name": "<auto>", "backoff_sec": 60}),
],
"crawler_timeout": [
("silence_alert", {"duration_min": 30}),
("silence_alert", {"duration_min": SILENCE_DURATION_MIN}),
("retry_task", {"task_name": "<auto>", "backoff_sec": 300}),
],
"nim_quota_exhausted": [
("silence_alert", {"duration_min": 720}), # 12 小時,等 quota 重置
("silence_alert", {"duration_min": 720}),
],
"embedding_failure": [
("silence_alert", {"duration_min": 10}), # 已有 retry queue 處理
("silence_alert", {"duration_min": 10}),
],
}
def _nemoton_investigate(event: dict) -> dict | None:
"""
Phase 3 規則式 L2根據 event_type 查 _L2_RULES執行對應 safe actions。
Phase 5+ 可改接 NIM 讓 LLM 決定 action。
"""
def _nemoton_investigate(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""規則式 L2依 event_type 查 _L2_RULES執行安全 actions"""
event_type = event.get("event_type", "")
rules = _L2_RULES.get(event_type)
if not rules:
@@ -330,16 +256,14 @@ def _nemoton_investigate(event: dict) -> dict | None:
actions_taken = []
for action_name, params in rules:
action_fn = agent_actions.SAFE_ACTIONS.get(action_name)
action_fn = getattr(agent_actions.SAFE_ACTIONS.get(action_name), None)
if not action_fn:
continue
# 動態參數:<auto> 代入事件本身的欄位
p = dict(params)
if p.get("task_name") == "<auto>":
p["task_name"] = event.get("payload", {}).get("task_name", "") or event.get("source", "").split(".")[-1]
if action_name == "silence_alert" and "event_key" not in p:
p["event_key"] = f"{event.get('source', '?')}:{event_type}"
try:
result = action_fn(**p)
status = result.get("status", "unknown")
@@ -348,78 +272,44 @@ def _nemoton_investigate(event: dict) -> dict | None:
actions_taken.append(f"{action_name} → error: {str(e)[:80]}")
sys_log.error(f"[EventRouter.L2] action {action_name} 例外: {e}")
# 產一句 summary 說明決策邏輯
summary = f"依規則 _L2_RULES[{event_type}] 執行 {len(actions_taken)} 個安全動作"
return {"summary": summary, "actions_taken": actions_taken}
# ─── 工具:構建 event 基礎 ─────────────────────────────────────
def _event_base(event: Dict[str, Any]) -> Dict[str, Any]:
return {
"severity": event.get("severity", "warning"),
"title": event.get("title", "未命名事件"),
"module": event.get("source", "unknown"),
"status": event.get("status"),
"impact": event.get("impact"),
"summary": event.get("summary", ""),
"details": event.get("payload"),
"trace": event.get("trace"),
}
# =====================================================================
# Telegram 發送 + Audit
# =====================================================================
def _send(text: str, admin_chat_ids: list[int] | None) -> dict:
token = os.getenv("TELEGRAM_BOT_TOKEN")
if not token:
return {"sent": 0, "errors": ["TELEGRAM_BOT_TOKEN 未設定"]}
# ─── 工具Telegram 發送 ───────────────────────────────────────
def _send_telegram(text: str, admin_chat_ids: Optional[list] = None) -> int:
if not TELEGRAM_BOT_TOKEN:
sys_log.warning("[EventRouter] TELEGRAM_BOT_TOKEN 未設定")
return 0
if admin_chat_ids is None:
admin_chat_ids = _load_admin_chat_ids()
admin_chat_ids = TELEGRAM_CHAT_IDS
if not admin_chat_ids:
return {"sent": 0, "errors": ["無管理員 chat_id"]}
admin_chat_ids = [-1003940688311] # fallback
url = f"https://api.telegram.org/bot{token}/sendMessage"
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
sent = 0
errors = []
for cid in admin_chat_ids:
try:
r = requests.post(url, json={
"chat_id": int(cid), "text": text, "parse_mode": "HTML",
"chat_id": int(cid),
"text": text,
"parse_mode": "HTML",
}, timeout=10)
if r.ok:
sent += 1
else:
errors.append(f"chat={cid} status={r.status_code}")
except Exception as e:
errors.append(f"chat={cid} exc={e}")
return {"sent": sent, "errors": errors}
def _load_admin_chat_ids() -> list[int]:
"""從 DB 撈 is_admin=true 的 chat_idfallback 到 .env TELEGRAM_CHAT_IDS"""
try:
from database.manager import get_session
from sqlalchemy import text as sa_text
session = get_session()
try:
rows = session.execute(sa_text(
"SELECT telegram_id FROM telegram_users WHERE is_active=true AND is_admin=true"
)).fetchall()
if rows:
return [int(r[0]) for r in rows]
finally:
session.close()
except Exception as e:
sys_log.warning(f"[EventRouter] 查 telegram_users 失敗fallback .env: {e}")
# Fallback 到 env
import re
raw = os.getenv("TELEGRAM_CHAT_IDS", "")
return [int(x.strip()) for x in re.sub(r'[\[\]"\' ]', "", raw).split(",") if x.strip()]
def _audit_dispatch(event: dict, tier: Tier, result: dict) -> None:
"""每次 dispatch 都寫入 ai_insights 作為審計軌跡"""
try:
from services.openclaw_learning_service import store_insight
store_insight(
insight_type="agent_action",
content=f"dispatch tier={tier.value} event={event.get('event_type')}",
period=datetime.now().strftime("%Y-%m-%d"),
metadata={
"tier": tier.value,
"event": {k: event.get(k) for k in ("source", "event_type", "severity", "title")},
"result": {k: result.get(k) for k in ("sent", "latency_ms", "errors")},
"ts": datetime.now().isoformat(),
},
)
except Exception as e:
sys_log.error(f"[EventRouter] audit 失敗: {e}")
sys_log.error(f"[EventRouter] Telegram 發送失敗: {e}")
return sent

351
services/watcher_agent.py Normal file
View File

@@ -0,0 +1,351 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
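Watcher Agent — proactive detection and triggering.

Role:
- Periodically poll the sales snapshot and check for falling sales or competitor price surges
- When an anomaly occurs, build an event and dispatch it to the EventRouter
- Work with the ActionPlanner to generate follow-up plans

Design:
- Lightweight; no extra infrastructure required (PostgreSQL only)
- Anomaly thresholds are adjustable through environment variables
- Shares the agent_context memory with OpenClaw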
"""
import json
import logging
import os
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
import requests
from sqlalchemy import text
from database.manager import get_session
from services.ai_orchestrator import AIOrchestrator
from services.event_router import dispatch
sys_log = logging.getLogger(__name__)
# ─── Environment settings ────────────────────────────────────
# Table the snapshot query reads from; interpolated into the SQL text as-is.
SALES_SNAPSHOT_TABLE = os.getenv("WATCHER_SNAPSHOT_TABLE", "daily_sales_snapshot")
SALES_DROP_THRESHOLD = float(os.getenv("WATCHER_SALES_DROP_THRESHOLD", "0.20")) # fraction: 0.20 = 20%
PRICE_SURGE_THRESHOLD = float(os.getenv("WATCHER_PRICE_SURGE_THRESHOLD", "0.15")) # fraction: 0.15 = 15%
CACHE_TTL_MIN = int(os.getenv("WATCHER_CACHE_TTL_MIN", "30")) # polling interval (minutes); doubled to form the agent_context ttl_minutes
# ─── Shared-context key ──────────────────────────────────────
WATCHER_CTX_NS = "watcher"
class WatcherAgent:
    """Proactive anomaly-detection agent.

    Flow of one ``scan()``:
      1) read yesterday's rows from the sales snapshot table
      2) compute the period-over-period sales change and the MOMO/PChome price gap
      3) keep the SKUs that breach a threshold or report a stock problem
      4) build one event per anomaly and route it to the orchestrator (L1 or L2)
      5) store the agent result in ``agent_context`` (plus ``action_plans`` for L2)
    """

    def __init__(self, orchestrator: Optional["AIOrchestrator"] = None):
        """Use the given orchestrator, or create a default ``AIOrchestrator``."""
        self.orchestrator = orchestrator or AIOrchestrator()

    async def scan(self) -> int:
        """Run one scan and return the number of anomalies routed successfully."""
        rows = await self._fetch_sales_snapshot()
        if not rows:
            sys_log.info("[Watcher] 無銷售快照,跳過掃描")
            return 0

        anomalies = self._detect_anomalies(rows)
        if not anomalies:
            sys_log.info("[Watcher] 未檢測到異常")
            return 0

        sys_log.info(f"[Watcher] 檢測到 {len(anomalies)} 筆異常,開始 dispatch")
        triggered = 0
        for anomaly in anomalies:
            if await self._dispatch_anomaly(anomaly):
                triggered += 1
        return triggered

    async def track_outcomes(self, days: int = 7) -> None:
        """Placeholder for the outcome follow-up hook.

        Currently this only logs; the real scheduling is expected to be done by
        an external scheduler, e.g.
        ``await outcome_tracker.schedule_follow_up(plan_id, sku, metric)``.
        """
        sys_log.info(f"[Watcher] 排程 outcome 回撥({days} 天後)")

    # ── Internal methods ────────────────────────────────────────
    async def _fetch_sales_snapshot(self) -> List[Dict[str, Any]]:
        """Read yesterday's sales snapshot (at most 500 rows) as a list of dicts.

        Columns selected: sku, name, category, sales_curr, sales_prev,
        price_momo, price_pchome, stock_status. The column names are the
        author's assumption about the table; adjust if the schema differs.

        Returns an empty list when the query fails (the error is logged).
        """
        session = get_session()
        try:
            # NOTE(review): the table name is interpolated from the
            # WATCHER_SNAPSHOT_TABLE env var; it must only ever come from
            # trusted configuration.
            sql = text(f"""
                SELECT sku, name, category,
                COALESCE(sales_curr, 0) AS sales_curr,
                COALESCE(sales_prev, 0) AS sales_prev,
                price_momo, price_pchome, stock_status
                FROM {SALES_SNAPSHOT_TABLE}
                WHERE snapshot_date = CURRENT_DATE - INTERVAL '1 day'
                LIMIT 500
            """)
            result = session.execute(sql).fetchall()
            return [dict(row._mapping) for row in result]
        except Exception as e:
            sys_log.error(f"[Watcher] 無法讀取快照: {e}")
            return []
        finally:
            session.close()

    def _detect_anomalies(self, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return one anomaly dict for every row that breaches a rule.

        Rules:
          - sales dropped by at least ``SALES_DROP_THRESHOLD`` (a fraction)
          - MOMO/PChome price gap above ``PRICE_SURGE_THRESHOLD`` (a fraction,
            compared in percent against ``price_gap_pct``)
          - stock status is ``out_of_stock`` or ``low_stock``

        ``drop_pct`` is a fraction (-0.3 = -30 %); ``price_gap_pct`` is in percent.
        """
        # The threshold is configured as a fraction while the gap is computed in
        # percent; convert once so both sides use the same unit.
        surge_pct = PRICE_SURGE_THRESHOLD * 100

        anomalies: List[Dict[str, Any]] = []
        for r in rows:
            curr = float(r["sales_curr"] or 0)
            prev = float(r["sales_prev"] or 0)
            momo = r["price_momo"]
            pchome = r["price_pchome"]
            stock = r.get("stock_status", "unknown")

            # Without a previous-period baseline no change can be computed;
            # substituting 1 would report a bogus -100 % drop for idle SKUs.
            drop_pct = (curr - prev) / prev if prev > 0 else 0.0

            # Either price may be NULL; the gap is only defined when both exist.
            if momo is not None and pchome:
                price_gap_pct = (float(momo) - float(pchome)) / float(pchome) * 100
            else:
                price_gap_pct = 0.0

            reasons = []
            if drop_pct <= -SALES_DROP_THRESHOLD:
                reasons.append(
                    f"銷量下滑 {drop_pct:+.1%}(閾值 {SALES_DROP_THRESHOLD:+.0%}"
                )
            if price_gap_pct > surge_pct:
                reasons.append(
                    f"競品價格突漲 {price_gap_pct:+.1f}% 形成高價差"
                )
            if stock in ("out_of_stock", "low_stock"):
                reasons.append(f"庫存狀態: {stock}")

            if not reasons:
                continue

            anomalies.append({
                "sku": r["sku"],
                "name": r["name"],
                "category": r.get("category", ""),
                "drop_pct": drop_pct,
                "price_gap_pct": price_gap_pct,
                "reasons": reasons,
                "stock": stock,
                "momo_price": momo,
                "pchome_price": pchome,
                # Carried along so the event payload can report the raw figures.
                "sales_prev": prev,
                "sales_curr": curr,
            })
        return anomalies

    async def _dispatch_anomaly(self, anom: Dict[str, Any]) -> bool:
        """Build the event for one anomaly and route it.

        Routing:
          - sales drop with a small price gap → L1 (analyse the cause)
          - everything else (drop with a large gap, price surge, stock issue)
            → L2 (plan + review gate)

        Returns True when the chosen route succeeded.
        """
        drop = anom["drop_pct"]
        gap = anom["price_gap_pct"]
        sku = anom["sku"]
        name = anom["name"]
        session_id = self._ensure_session(sku)

        # Event shape is aligned with what EventRouter.dispatch documents.
        event = {
            "source": "watcher",
            "event_type": "sales_anomaly",
            "severity": "alert",
            "title": f"銷售異常偵測 — {sku} {name}",
            "summary": "; ".join(anom["reasons"]),
            "payload": {
                "sku": sku,
                "name": name,
                "category": anom["category"],
                "drop_pct": drop,
                "price_gap_pct": gap,
                "stock": anom["stock"],
                "momo_price": anom["momo_price"],
                "pchome_price": anom["pchome_price"],
                "sales_prev": anom.get("sales_prev"),
                "sales_curr": anom.get("sales_curr"),
            },
            "impact": "銷量下滑可能導致收入損失",
            "status": "open",
        }

        # ``gap`` is in percent, the threshold is a fraction: compare like with like.
        small_gap = abs(gap) < PRICE_SURGE_THRESHOLD * 100
        if drop <= -SALES_DROP_THRESHOLD and small_gap:
            # Sales fell although prices are close: flag it as likely caused by
            # a non-price factor (stock-out, traffic) and let L1 analyse it.
            event["payload"]["non_price_factor"] = True
            return await self._route_l1(event, session_id)
        return await self._route_l2(event, session_id)

    async def _route_l1(self, event: Dict[str, Any], session_id: str) -> bool:
        """L1: ask Hermes for the cause; on failure send the fallback notice."""
        try:
            result = await self.orchestrator.handle_l1(event, session_id)
            sys_log.info(f"[Watcher] L1 dispatch success for {event['payload']['sku']}")
            await self._save_context(session_id, "hermes", {
                "summary": result.get("summary"),
                "probable_cause": result.get("probable_cause"),
                "actions": result.get("actions", []),
            })
            return True
        except Exception as e:
            sys_log.error(f"[Watcher] L1 dispatch failed: {e}")
            await self._fallback_notify(event)
            return False

    async def _route_l2(self, event: Dict[str, Any], session_id: str) -> bool:
        """L2: ask NemoTron for a plan; on failure send the fallback notice."""
        try:
            result = await self.orchestrator.handle_l2(event, session_id)
            sys_log.info(f"[Watcher] L2 dispatch success for {event['payload']['sku']}")
            await self._save_context(session_id, "nemotron", {
                "plan": result.get("plan"),
                "actions_taken": result.get("actions_taken", []),
            })
            await self._save_action_plan(event, result.get("plan"))
            return True
        except Exception as e:
            sys_log.error(f"[Watcher] L2 dispatch failed: {e}")
            await self._fallback_notify(event)
            return False

    async def _fallback_notify(self, event: Dict[str, Any]) -> None:
        """Notify directly over Telegram when the AI route failed."""
        sku = event["payload"]["sku"]
        name = event["payload"]["name"]
        message = (
            f"⚠️ [Watcher Fallback] {sku} {name}\n"
            f"原因:{event['summary']}\n"
            f"建議:立即人工檢查銷售與庫存狀態。"
        )
        await self._notify_telegram(message)

    async def _notify_telegram(self, text: str) -> bool:
        """Send ``text`` to the first configured Telegram chat.

        Returns True when Telegram accepted the message. Never raises: this is
        the last-resort notifier and is called from exception handlers.
        """
        import html

        from services.telegram_templates import alert as render_alert

        bot_token = os.getenv("TELEGRAM_BOT_TOKEN")
        if not bot_token:
            sys_log.warning("[Watcher] TELEGRAM_BOT_TOKEN 未設定")
            return False

        try:
            chat_ids = json.loads(os.getenv("TELEGRAM_CHAT_IDS", "[]"))
        except json.JSONDecodeError:
            chat_ids = []
        if not isinstance(chat_ids, list):
            # Valid JSON that is not a list (e.g. a bare number) cannot be indexed.
            chat_ids = []

        try:
            body = render_alert(title="銷售異常通知", content=text)
        except Exception as e:
            # NOTE(review): the template's signature is not visible here; if it
            # rejects these arguments, still deliver the message as escaped text.
            sys_log.warning(f"[Watcher] alert template failed, sending plain text: {e}")
            body = html.escape(text)

        url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
        payload = {
            # NOTE(review): only the first chat id is notified; the hard-coded
            # id is the fallback used when none is configured.
            "chat_id": chat_ids[0] if chat_ids else -1003940688311,
            "text": body,
            "parse_mode": "HTML",
        }
        try:
            r = requests.post(url, json=payload, timeout=10)
            return r.ok
        except Exception as e:
            sys_log.error(f"[Watcher] Telegram 通知失敗: {e}")
            return False

    def _ensure_session(self, sku: str) -> str:
        """Return the session id for a SKU (simplified: derived from the SKU itself)."""
        return f"session:{sku}"

    async def _save_context(self, session_id: str, agent: str, data: Dict[str, Any]) -> None:
        """Replace the shared ``agent_context`` rows of (session, agent) with ``data``.

        Delete and insert run in one transaction; failures are rolled back and
        logged, never raised.
        """
        session = get_session()
        try:
            # Remove whatever the agent stored before for this session.
            session.execute(
                text("DELETE FROM agent_context WHERE session_id = :sid AND agent_name = :ag"),
                {"sid": session_id, "ag": agent},
            )
            session.execute(
                text("""
                    INSERT INTO agent_context
                    (session_id, agent_name, context_key, context_val, created_at, ttl_minutes)
                    VALUES
                    (:sid, :ag, :ck, :cv, NOW(), :ttl)
                """),
                {
                    "sid": session_id,
                    "ag": agent,
                    "ck": "latest",
                    "cv": json.dumps(data, ensure_ascii=False),
                    "ttl": CACHE_TTL_MIN * 2,
                },
            )
            session.commit()
        except Exception as e:
            session.rollback()
            sys_log.warning(f"[Watcher] 寫入 context 失敗: {e}")
        finally:
            session.close()

    async def _save_action_plan(self, event: Dict[str, Any], plan: Optional[Dict[str, Any]]) -> None:
        """Insert NemoTron's plan into ``action_plans`` as a pending row.

        Does nothing when ``plan`` is empty; failures are rolled back and logged.
        """
        if not plan:
            return
        session = get_session()
        try:
            sku = event["payload"]["sku"]
            session.execute(
                text("""
                    INSERT INTO action_plans
                    (session_id, plan_type, sku, payload, status, created_by)
                    VALUES
                    (:sid, :pt, :sku, :pl, 'pending', 'nemotron')
                """),
                {
                    # Same derivation the context rows use for their session id.
                    "sid": self._ensure_session(sku),
                    "pt": plan.get("type", "price_adjust"),
                    "sku": sku,
                    "pl": json.dumps(plan, ensure_ascii=False),
                },
            )
            session.commit()
        except Exception as e:
            session.rollback()
            sys_log.warning(f"[Watcher] 寫入 action_plan 失敗: {e}")
        finally:
            session.close()