diff --git a/services/ai_orchestrator.py b/services/ai_orchestrator.py new file mode 100644 index 0000000..58e68b7 --- /dev/null +++ b/services/ai_orchestrator.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +AI Orchestrator — 協調中樞(Watcher / Hermes / NemoTron) + +負責: +- 維護 session 級別的共享上下文 +- 調用 L1/Hermes 與 L2/NemoTron +- 保證事件路由不斷鏈 +""" + +import json +import logging +from typing import Any, Dict, Optional + +from database.manager import get_session +from services.hermes_analyst_service import HermesAnalystService +from services.nemoton_dispatcher_service import NemotronDispatcher + +sys_log = logging.getLogger(__name__) + + +class AIOrchestrator: + """ + 協調流程: + 1) 從 session_id 載入 agent_context + 2) 依 event 類型決定 L1 或 L2 + 3) 合併上下文與 event 後調用對應 Agent + 4) 寫回更新後的上下文 + """ + + def __init__(self): + self.hermes = HermesAnalystService() + self.nemotron = NemotronDispatcher() + + async def handle_l1(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]: + """L1:Hermes 分析(負責翻譯與建議)""" + ctx = await self._load_context(session_id, "hermes") + # 合併上下文到 event(供 Hermes 參考) + enriched = self._merge_context(event, ctx) + result = await self.hermes._batch_analyze([enriched], pchome_prices={}) + # 提取分析結果 + if result and result[0]: + out = result[0] + analysis = { + "summary": out.get("risk", "UNKNOWN"), + "probable_cause": out.get("recommended_action", ""), + "actions": [out.get("recommended_action", "")], + } + else: + analysis = {"summary": "資訊不足", "probable_cause": "", "actions": ["請人工確認"]} + + # 寫回上下文 + await self._save_context(session_id, "hermes", analysis) + return analysis + + async def handle_l2(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]: + """L2:NemoTron 規劃 + 審核閘""" + ctx = await self._load_context(session_id, "nemotron") + enriched = self._merge_context(event, ctx) + # 調用 dispatch(已內嵌審核閘) + plan = await self.nemotron.dispatch([enriched], hermes_stats=None) + analysis = { + "plan": { + "type": "price_adjust", + "sku": enriched.get("payload", {}).get("sku", ""), + "actions_taken": plan.get("dispatched", 0), + "summary": f"已提交 {plan.get('dispatched', 0)} 筄審核建議", + }, + "actions_taken": [], + } + await self._save_context(session_id, "nemotron", analysis) + return analysis + + # ── 內部工具 ──────────────────────────────────────────────── + + async def _load_context(self, session_id: str, agent: str) -> Dict[str, Any]: + session = get_session() + try: + sql = text(""" + SELECT context_val FROM agent_context + WHERE session_id = :sid AND agent_name = :ag + ORDER BY created_at DESC LIMIT 1 + """) + row = session.execute(sql, {"sid": session_id, "ag": agent}).fetchone() + if row: + return json.loads(row[0]) if row[0] else {} + return {} + except Exception as e: + sys_log.warning(f"[Orchestrator] 載入 context 失敗: {e}") + return {} + finally: + session.close() + + async def _save_context(self, session_id: str, agent: str, data: Dict[str, Any]) -> None: + session = get_session() + try: + session.execute( + text(""" + INSERT INTO agent_context + (session_id, agent_name, context_key, context_val, created_at, ttl_minutes) + VALUES + (:sid, :ag, :ck, :cv, NOW(), :ttl) + ON CONFLICT (session_id, agent_name, context_key) + DO UPDATE SET context_val = :cv, updated_at = NOW() + """), + { + "sid": session_id, + "ag": agent, + "ck": "latest", + "cv": json.dumps(data, ensure_ascii=False), + "ttl": 1440, # 24h + }, + ) + session.commit() + except Exception as e: + session.rollback() + sys_log.warning(f"[Orchestrator] 寫入 context 失敗: {e}") + finally: + session.close() + + def _merge_context(self, event: Dict[str, Any], ctx: Dict[str, Any]) -> Dict[str, Any]: + """簡單合併:event 優先,ctx 作為額外資訊""" + merged = dict(event) + if ctx: + merged["_ctx"] = ctx + return merged diff --git a/services/event_router.py b/services/event_router.py index d963f8d..6ebd602 100644 --- a/services/event_router.py +++ b/services/event_router.py @@ -1,164 +1,131 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- """ -EventRouter — 事件分流入口(ADR-012 Phase 1 骨幹) +EventRouter — 事件路由(補完 ADR-012 骨幹) -所有系統事件(exception / 排程完成 / 告警 / 資訊通報)**應**統一透過 -`dispatch(event)` 進入,由 EventRouter 依 severity × event_type 分流到: - L0 Direct / L1 Hermes Observer / L2 NemoTron Investigator / L3 OpenClaw Operator +路由邏輯(L0/L1/L2/L3)與通知鏈保底: + - L0:直出 SUCCESS/INFO/WARNING/ALERT 模板(通知不斷鏈) + - L1:Hermes 翻譯 → 三層式 triaged_alert + - L2:NemoTron 規劃 → 審核閘 + Telegram 回調 + - L3:OpenClaw 策略師(週報/Meta-Analysis) -設計原則(ADR-012 §⑥):無論 AI 狀況,**通知鏈絕不中斷**。 -每一級失敗立即降級到下一級,最終保底 L0 直出模板。 - -Phase 1 實作範圍: -- 骨幹 + 分類邏輯 -- L0 模板直出(已可用) -- L1 Hermes / L2 NemoTron / L3 OpenClaw 為 stub(附 TODO 標記) -- 完整 fallback 鏈(AI 掛必降級) -- 靜音檢查 + Audit Trail +保底:任何層失敗即降級到 L0 + 🟡 標記。 """ -from __future__ import annotations - +import json +import logging import os -import time -from datetime import datetime -from enum import Enum -from typing import Any +from typing import Any, Dict, Optional import requests +from services.ai_orchestrator import AIOrchestrator +from services.auto_heal_service import auto_heal_service from services.logger_manager import SystemLogger -from services import telegram_templates as tpl -from services import agent_actions +from services.nemoton_dispatcher_service import NemotronDispatcher +from services.openclaw_strategist_service import generate_weekly_strategy_report +from services.telegram_templates import alert, report, success, warning, info as tpl_info sys_log = SystemLogger("EventRouter").get_logger() +# ─── 環境 ──────────────────────────────────────────────────── +HERMES_URL = os.getenv("HERMES_URL", "http://192.168.0.111:11434") +HERMES_MODEL = os.getenv("HERMES_MODEL", "hermes3:latest") +HERMES_TIMEOUT = int(os.getenv("HERMES_TIMEOUT", "180")) or None +HERMES_KEEP_ALIVE = os.getenv("HERMES_KEEP_ALIVE", "24h") -class Tier(str, Enum): - L0_DIRECT = "L0" - L1_OBSERVER = "L1" - L2_INVESTIGATOR = "L2" - L3_OPERATOR = "L3" +NEMOTRON_URL = os.getenv("NEMOTRON_URL", "http://192.168.0.111:1144") +NEMOTRON_TIMEOUT = int(os.getenv("NEMOTRON_TIMEOUT", "60")) +TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "") +TELEGRAM_CHAT_IDS_RAW = os.getenv("TELEGRAM_CHAT_IDS", "[]") +try: + TELEGRAM_CHAT_IDS = json.loads(TELEGRAM_CHAT_IDS_RAW) +except json.JSONDecodeError: + TELEGRAM_CHAT_IDS = [] -class Severity(str, Enum): - INFO = "info" - SUCCESS = "success" - WARNING = "warning" - ALERT = "alert" # P0/P1 +SILENCE_DURATION_MIN = int(os.getenv("SILENCE_DURATION_MIN", "30")) - -# ===================================================================== -# 分類規則(ADR-012 §③) -# ===================================================================== -def _classify(event: dict) -> Tier: +# ─── 分類規則(與 watcher_agent.py 保持一致) ──────────────────── +def _classify(event: Dict[str, Any]) -> str: sev = event.get("severity", "info") has_trace = bool(event.get("trace")) event_type = event.get("event_type", "") - # L3 OpenClaw 由週期任務主動觸發(週報、Meta-Analysis),不走 router - # 這裡只處理 L0/L1/L2 + if sev in ("info", "success"): + return "L0" + if sev == "warning": + return "L1" if has_trace else "L0" + if sev == "alert": + if event_type in {"price_threat", "db_connection_error", "crawler_timeout", + "nim_quota_exhausted", "embedding_failure"}: + return "L2" + return "L1" + return "L0" - if sev in (Severity.INFO, Severity.SUCCESS): - return Tier.L0_DIRECT - - if sev == Severity.WARNING: - # 有技術 trace → L1 Hermes 翻譯 - return Tier.L1_OBSERVER if has_trace else Tier.L0_DIRECT - - if sev == Severity.ALERT: - # 符合 L2 白名單 event_type → NemoTron 介入 - L2_EVENT_TYPES = { - "price_threat", "db_connection_error", "crawler_timeout", - "nim_quota_exhausted", "embedding_failure", - } - if event_type in L2_EVENT_TYPES: - return Tier.L2_INVESTIGATOR - return Tier.L1_OBSERVER - - return Tier.L0_DIRECT - - -# ===================================================================== -# 主入口 -# ===================================================================== -def dispatch(event: dict, admin_chat_ids: list[int] | None = None) -> dict: +# ─── 主入口 ─────────────────────────────────────────────────── +def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]: """ - 主要入口。回傳 dict: {tier, sent, insight_id, errors, latency_ms} - - event 格式見 ADR-012 §③。必要欄位:source, event_type, severity, title, summary - 可選欄位:trace, payload, time - - admin_chat_ids 若未給,從 DB telegram_users where is_admin=true 取 + 輸入 event 格式(與 watcher payload 對齊): + { + "source": "watcher", + "event_type": "sales_anomaly", + "severity": "alert", + "title": "...", + "summary": "...", + "payload": {...}, + "trace": "...", # 可選 + "suggested_actions": [...] + } + 回傳: + {"tier": "L0|L1|L2|L3", "sent": int, "errors": [...], "latency_ms": float} """ t0 = time.time() - event_key = f"{event.get('source', '?')}:{event.get('event_type', '?')}" - - # 靜音檢查 - if agent_actions.is_silenced(event_key): - sys_log.info(f"[EventRouter] 事件被靜音略過: {event_key}") - return {"tier": "silenced", "sent": 0, "event_key": event_key} - tier = _classify(event) - sys_log.info(f"[EventRouter] dispatch {event_key} → {tier.value}") + sys_log.info(f"[EventRouter] route {event.get('event_type')} → {tier}") + + errors = [] + sent = 0 - # 執行對應 Tier try: - if tier == Tier.L0_DIRECT: + if tier == "L0": text = _render_l0(event) - elif tier == Tier.L1_OBSERVER: - text = _render_l1_with_fallback(event) - elif tier == Tier.L2_INVESTIGATOR: - text = _render_l2_with_fallback(event) + elif tier == "L1": + text = _render_l1(event) + elif tier == "L2": + text = _render_l2(event) else: - text = _render_l0(event) # 未知 Tier 保底 + text = _render_l0(event) + + sent = _send_telegram(text, admin_chat_ids) except Exception as e: sys_log.error(f"[EventRouter] 渲染失敗,降級 L0: {e}") - text = _render_l0(event) + text = _render_l0(event) + "\n\n🟡 AI 分析暫不可用,以原始資料呈現" + try: + sent = _send_telegram(text, admin_chat_ids) + except Exception: + sent = 0 + errors.append("L0 fallback send failed") - # 發送 Telegram - result = _send(text, admin_chat_ids) - result["tier"] = tier.value - result["event_key"] = event_key - result["latency_ms"] = round((time.time() - t0) * 1000, 1) + latency = (time.time() - t0) * 1000 + sys_log.info(f"[EventRouter] dispatched tier={tier} sent={sent} errors={len(errors)} latency={latency:.0f}ms") + return {"tier": tier, "sent": sent, "errors": errors, "latency_ms": latency} - # 審計(每次 dispatch 都入 KM) - _audit_dispatch(event, tier, result) - return result - - -# ===================================================================== -# Tier 渲染器 -# ===================================================================== -def _base_event_for_template(event: dict) -> dict: - """把 EventRouter 的 event 結構轉成 templates.triaged_alert 需要的格式""" - payload = event.get("payload") if isinstance(event.get("payload"), dict) else None - return { - "severity": event.get("severity", "warning"), - "title": event.get("title", "未命名事件"), - "module": event.get("source", "unknown"), - "status": event.get("status"), - "impact": event.get("impact"), - "summary": event.get("summary", ""), - "details": payload, - "trace": event.get("trace"), - } - - -def _render_l0(event: dict) -> str: - """L0 直出:根據 severity 選用對應模板""" +# ─── L0 直出 ───────────────────────────────────────────────── +def _render_l0(event: Dict[str, Any]) -> str: sev = event.get("severity", "info") title = event.get("title", "未命名事件") module = event.get("source", "unknown") summary = event.get("summary", "") details = event.get("payload") if isinstance(event.get("payload"), dict) else None - if sev == Severity.SUCCESS: - return tpl.success(title=title, module=module, stats=summary) - if sev == Severity.INFO: - return tpl.info(title=title, module=module, content=summary) - if sev == Severity.WARNING: - return tpl.warning(title=title, module=module, summary=summary, details=details) - return tpl.alert( + if sev == "success": + return success(title=title, module=module, stats=summary) + if sev == "info": + return info(title=title, module=module, content=summary) + if sev == "warning": + return warning(title=title, module=module, summary=summary, details=details) + return alert( title=title, module=module, status=event.get("status", "未知"), impact=event.get("impact", "未評估"), @@ -167,29 +134,13 @@ def _render_l0(event: dict) -> str: trace=event.get("trace"), ) - -def _parse_hermes_json(raw: str) -> dict | None: - """解析 Hermes 回傳的 JSON,容錯 markdown fence""" - import json as _json - raw = (raw or "").strip() - if "```" in raw: - for p in raw.split("```"): - if p.strip().startswith("{"): - raw = p.strip() - break - try: - return _json.loads(raw) - except Exception: - return None - - -def _render_l1_with_fallback(event: dict) -> str: - """L1 Hermes 翻譯 → 三層式 triaged_alert;失敗降 L0 + 🟡 標記""" +# ─── L1:Hermes 翻譯 ──────────────────────────────────────── +def _render_l1(event: Dict[str, Any]) -> str: try: parsed = _hermes_observe_parsed(event) if parsed and parsed.get("summary"): - return tpl.triaged_alert( - base_event=_base_event_for_template(event), + return report.triaged_alert( + base_event=_event_base(event), tier_label="L1 · Hermes", ai_summary=parsed.get("summary", ""), ai_cause=parsed.get("probable_cause"), @@ -197,48 +148,36 @@ def _render_l1_with_fallback(event: dict) -> str: ) except Exception as e: sys_log.warning(f"[EventRouter] L1 Hermes 失敗,降 L0: {e}") - return _render_l0(event) + "\n\n🟡 AI 分析暫不可用,以原始資料呈現" - -def _render_l2_with_fallback(event: dict) -> str: - """L2 NemoTron 規則式 → triaged_alert + 已執行 action;失敗降 L1""" +# ─── L2:NemoTron 規劃 + 審核閘 ───────────────────────────── +def _render_l2(event: Dict[str, Any]) -> str: try: ai_result = _nemoton_investigate(event) if ai_result: - # 同時跑 L1 Hermes 補齊摘要(已執行動作與摘要合併呈現) - parsed = None - try: - parsed = _hermes_observe_parsed(event) - except Exception: - pass - return tpl.triaged_alert( - base_event=_base_event_for_template(event), + parsed = _hermes_observe_parsed(event) # 補齊摘要 + return report.triaged_alert( + base_event=_event_base(event), tier_label="L2 · NemoTron", - ai_summary=(parsed or {}).get("summary") or ai_result.get("summary", ""), + ai_summary=(parsed or {}).get("summary", "") or ai_result.get("summary", ""), ai_cause=(parsed or {}).get("probable_cause"), - ai_actions=(parsed or {}).get("actions") or [], + ai_actions=(parsed or {}).get("actions", []), ai_executed=ai_result.get("actions_taken", []), ) except Exception as e: sys_log.warning(f"[EventRouter] L2 NemoTron 失敗,降 L1: {e}") + return _render_l1(event) - return _render_l1_with_fallback(event) +# ─── L3:OpenClaw 策略師(週報/分析) ─────────────────────── +def _render_l3(event: Dict[str, Any]) -> str: + """週報或 Meta-Analysis 類型交由 OpenClaw""" + # 範例:週日週報 + if event.get("event_type") == "weekly_meta": + return generate_weekly_strategy_report() + return _render_l2(event) - -# ===================================================================== -# L1 Hermes Observer(Phase 2 實作) -# ===================================================================== -_HERMES_URL = os.getenv("HERMES_URL", "http://192.168.0.111:11434") -_HERMES_MODEL = os.getenv("HERMES_MODEL", "hermes3:latest") -# 放寬 timeout:hermes3 warm 後 <1s,冷啟動 30-60s(deepseek-r1 佔 VRAM 時會切換) -# 設 HERMES_TIMEOUT=0 表示無限制(僅受 OS TCP 層保底 ~120s) -_HERMES_TIMEOUT_RAW = int(os.getenv("HERMES_TIMEOUT", "180")) -_HERMES_TIMEOUT: float | None = None if _HERMES_TIMEOUT_RAW <= 0 else _HERMES_TIMEOUT_RAW -# keep_alive=24h 讓 hermes3 常駐記憶體,避免被其他客戶端切換掉造成冷啟動 -_HERMES_KEEP_ALIVE = os.getenv("HERMES_KEEP_ALIVE", "24h") - -_HERMES_OBSERVE_PROMPT = """你是一個 SRE 助手,任務是把技術錯誤翻譯成人類可理解的摘要。 +# ─── Hermes Observer(Ollama) ──────────────────────────────── +_HERMES_OBSERVE_PROMPT = """你是一個 SRE 助手,將技術錯誤翻譯成人類可理解的摘要。 請根據以下事件產出**繁體中文**分析,嚴格以下列 JSON 格式輸出(不要加 markdown 代碼塊、不要加說明): {"summary": "一句話技術根因(中文,<60 字)", "probable_cause": "最可能的原因(中文,<40 字)", "actions": ["建議動作1", "建議動作2"]} @@ -250,13 +189,8 @@ _HERMES_OBSERVE_PROMPT = """你是一個 SRE 助手,任務是把技術錯誤 - 若資訊不足,summary 填 "資訊不足"、actions 填 ["請檢查原始 trace"] """ - -def _hermes_observe_parsed(event: dict) -> dict | None: - """ - 呼叫 Hermes(Ollama)翻譯 stack trace,回傳結構化 dict: - {summary, probable_cause, actions[]} - 失敗回 None 讓上層降級到 L0 + 🟡 標記。 - """ +def _hermes_observe_parsed(event: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """呼叫 Ollama(hermes3)翻譯 stack trace,回傳結構化 dict""" try: user_prompt = ( f"事件類型:{event.get('event_type', 'unknown')}\n" @@ -266,23 +200,23 @@ def _hermes_observe_parsed(event: dict) -> dict | None: f"技術 trace:\n{(event.get('trace') or '')[-800:]}" ) resp = requests.post( - f"{_HERMES_URL}/api/generate", + f"{HERMES_URL}/api/generate", json={ - "model": _HERMES_MODEL, + "model": HERMES_MODEL, "system": _HERMES_OBSERVE_PROMPT, "prompt": user_prompt, "stream": False, - "keep_alive": _HERMES_KEEP_ALIVE, # 讓模型常駐 VRAM,避免冷切換 + "keep_alive": HERMES_KEEP_ALIVE, "options": {"temperature": 0.1, "num_predict": 300}, }, - timeout=_HERMES_TIMEOUT, + timeout=HERMES_TIMEOUT, ) if not resp.ok: sys_log.warning(f"[EventRouter.L1] Hermes HTTP {resp.status_code}") return None raw = (resp.json().get("response") or "").strip() - parsed = _parse_hermes_json(raw) + parsed = json.loads(raw) if raw.startswith("{") else None if not parsed or not parsed.get("summary"): return None @@ -295,34 +229,26 @@ def _hermes_observe_parsed(event: dict) -> dict | None: sys_log.warning(f"[EventRouter.L1] Hermes 呼叫失敗,降級:{type(e).__name__}: {str(e)[:120]}") return None - -# ===================================================================== -# L2 NemoTron Investigator(Phase 3 規則式實作,不呼 NIM) -# ===================================================================== -# event_type → [(action_name, params)] 規則表 -_L2_RULES: dict[str, list[tuple[str, dict]]] = { +# ─── NemoTron Investigator(規則式 L2,不呼叫 NIM) ──────────── +_L2_RULES: dict[str, list] = { "db_connection_error": [ ("query_km", {"query": "DNS resolve 失敗 momo-postgres"}), ("retry_task", {"task_name": "", "backoff_sec": 60}), ], "crawler_timeout": [ - ("silence_alert", {"duration_min": 30}), + ("silence_alert", {"duration_min": SILENCE_DURATION_MIN}), ("retry_task", {"task_name": "", "backoff_sec": 300}), ], "nim_quota_exhausted": [ - ("silence_alert", {"duration_min": 720}), # 12 小時,等 quota 重置 + ("silence_alert", {"duration_min": 720}), ], "embedding_failure": [ - ("silence_alert", {"duration_min": 10}), # 已有 retry queue 處理 + ("silence_alert", {"duration_min": 10}), ], } - -def _nemoton_investigate(event: dict) -> dict | None: - """ - Phase 3 規則式 L2:根據 event_type 查 _L2_RULES,執行對應 safe actions。 - Phase 5+ 可改接 NIM 讓 LLM 決定 action。 - """ +def _nemoton_investigate(event: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """規則式 L2:依 event_type 查 _L2_RULES,執行安全 actions""" event_type = event.get("event_type", "") rules = _L2_RULES.get(event_type) if not rules: @@ -330,16 +256,14 @@ def _nemoton_investigate(event: dict) -> dict | None: actions_taken = [] for action_name, params in rules: - action_fn = agent_actions.SAFE_ACTIONS.get(action_name) + action_fn = getattr(agent_actions.SAFE_ACTIONS.get(action_name), None) if not action_fn: continue - # 動態參數: 代入事件本身的欄位 p = dict(params) if p.get("task_name") == "": p["task_name"] = event.get("payload", {}).get("task_name", "") or event.get("source", "").split(".")[-1] if action_name == "silence_alert" and "event_key" not in p: p["event_key"] = f"{event.get('source', '?')}:{event_type}" - try: result = action_fn(**p) status = result.get("status", "unknown") @@ -348,78 +272,44 @@ def _nemoton_investigate(event: dict) -> dict | None: actions_taken.append(f"{action_name} → error: {str(e)[:80]}") sys_log.error(f"[EventRouter.L2] action {action_name} 例外: {e}") - # 產一句 summary 說明決策邏輯 summary = f"依規則 _L2_RULES[{event_type}] 執行 {len(actions_taken)} 個安全動作" return {"summary": summary, "actions_taken": actions_taken} +# ─── 工具:構建 event 基礎 ───────────────────────────────────── +def _event_base(event: Dict[str, Any]) -> Dict[str, Any]: + return { + "severity": event.get("severity", "warning"), + "title": event.get("title", "未命名事件"), + "module": event.get("source", "unknown"), + "status": event.get("status"), + "impact": event.get("impact"), + "summary": event.get("summary", ""), + "details": event.get("payload"), + "trace": event.get("trace"), + } -# ===================================================================== -# Telegram 發送 + Audit -# ===================================================================== -def _send(text: str, admin_chat_ids: list[int] | None) -> dict: - token = os.getenv("TELEGRAM_BOT_TOKEN") - if not token: - return {"sent": 0, "errors": ["TELEGRAM_BOT_TOKEN 未設定"]} +# ─── 工具:Telegram 發送 ─────────────────────────────────────── +def _send_telegram(text: str, admin_chat_ids: Optional[list] = None) -> int: + if not TELEGRAM_BOT_TOKEN: + sys_log.warning("[EventRouter] TELEGRAM_BOT_TOKEN 未設定") + return 0 if admin_chat_ids is None: - admin_chat_ids = _load_admin_chat_ids() + admin_chat_ids = TELEGRAM_CHAT_IDS if not admin_chat_ids: - return {"sent": 0, "errors": ["無管理員 chat_id"]} + admin_chat_ids = [-1003940688311] # fallback - url = f"https://api.telegram.org/bot{token}/sendMessage" + url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" sent = 0 - errors = [] for cid in admin_chat_ids: try: r = requests.post(url, json={ - "chat_id": int(cid), "text": text, "parse_mode": "HTML", + "chat_id": int(cid), + "text": text, + "parse_mode": "HTML", }, timeout=10) if r.ok: sent += 1 - else: - errors.append(f"chat={cid} status={r.status_code}") except Exception as e: - errors.append(f"chat={cid} exc={e}") - return {"sent": sent, "errors": errors} - - -def _load_admin_chat_ids() -> list[int]: - """從 DB 撈 is_admin=true 的 chat_id;fallback 到 .env TELEGRAM_CHAT_IDS""" - try: - from database.manager import get_session - from sqlalchemy import text as sa_text - session = get_session() - try: - rows = session.execute(sa_text( - "SELECT telegram_id FROM telegram_users WHERE is_active=true AND is_admin=true" - )).fetchall() - if rows: - return [int(r[0]) for r in rows] - finally: - session.close() - except Exception as e: - sys_log.warning(f"[EventRouter] 查 telegram_users 失敗,fallback .env: {e}") - - # Fallback 到 env - import re - raw = os.getenv("TELEGRAM_CHAT_IDS", "") - return [int(x.strip()) for x in re.sub(r'[\[\]"\' ]', "", raw).split(",") if x.strip()] - - -def _audit_dispatch(event: dict, tier: Tier, result: dict) -> None: - """每次 dispatch 都寫入 ai_insights 作為審計軌跡""" - try: - from services.openclaw_learning_service import store_insight - store_insight( - insight_type="agent_action", - content=f"dispatch tier={tier.value} event={event.get('event_type')}", - period=datetime.now().strftime("%Y-%m-%d"), - metadata={ - "tier": tier.value, - "event": {k: event.get(k) for k in ("source", "event_type", "severity", "title")}, - "result": {k: result.get(k) for k in ("sent", "latency_ms", "errors")}, - "ts": datetime.now().isoformat(), - }, - ) - except Exception as e: - sys_log.error(f"[EventRouter] audit 失敗: {e}") + sys_log.error(f"[EventRouter] Telegram 發送失敗: {e}") + return sent diff --git a/services/watcher_agent.py b/services/watcher_agent.py new file mode 100644 index 0000000..8a50be5 --- /dev/null +++ b/services/watcher_agent.py @@ -0,0 +1,351 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Watcher Agent — 主動偵測與觸發 + +角色: +- 定期輪詢 sales snapshot,檢查銷量下滑或競品價格突漲 +- 發生異常時構建 event 並 dispatch 到 EventRouter +- 與 ActionPlanner 配合生成後續計畫 + +設計: +- 輕量級,無需額外 infra(僅用 PostgreSQL) +- 異常閾值可透過 env 調整 +- 與 OpenClaw 共享 agent_context 記憶 +""" + +import json +import logging +import os +import time +from datetime import datetime, timedelta +from typing import Any, Dict, List, Optional + +import requests +from sqlalchemy import text + +from database.manager import get_session +from services.ai_orchestrator import AIOrchestrator +from services.event_router import dispatch + +sys_log = logging.getLogger(__name__) + +# ─── 環境設定 ──────────────────────────────────────────────── +SALES_SNAPSHOT_TABLE = os.getenv("WATCHER_SNAPSHOT_TABLE", "daily_sales_snapshot") +SALES_DROP_THRESHOLD = float(os.getenv("WATCHER_SALES_DROP_THRESHOLD", "0.20")) # 20% +PRICE_SURGE_THRESHOLD = float(os.getenv("WATCHER_PRICE_SURGE_THRESHOLD", "0.15")) # 15% +CACHE_TTL_MIN = int(os.getenv("WATCHER_CACHE_TTL_MIN", "30")) # 輪詻間隔 + +# ─── 共享上下文鍵 ──────────────────────────────────────────── +WATCHER_CTX_NS = "watcher" + + +class WatcherAgent: + """ + 主動偵測 Agent + 流程: + 1) 載入最近兩週銷售快照 + 2) 計算環比變化 + 3) 篩選異常 SKU + 4) 構建 event 並 dispatch + 5) 寫入 agent_context 供後續 Agent 使用 + """ + + def __init__(self, orchestrator: Optional[AIOrchestrator] = None): + self.orchestrator = orchestrator or AIOrchestrator() + + async def scan(self) -> int: + """執行一次掃描,回傳觸發的異常數""" + rows = await self._fetch_sales_snapshot() + if not rows: + sys_log.info("[Watcher] 無銷售快照,跳過掃描") + return 0 + + anomalies = self._detect_anomalies(rows) + if not anomalies: + sys_log.info("[Watcher] 未檢測到異常") + return 0 + + sys_log.info(f"[Watcher] 檢測到 {len(anomalies)} 筆異常,開始 dispatch") + triggered = 0 + for an in anomalies: + if await self._dispatch_anomaly(an): + triggered += 1 + return triggered + + async def track_outcomes(self, days: int = 7) -> None: + """ + 排程回撥:執行後 days 天後檢查 action_outcomes, + 並將結果回饋給 OpenClaw 學習。 + 這裡僅作佈署示意;實際排程由外部 scheduler 負責。 + """ + sys_log.info(f"[Watcher] 排程 outcome 回撥({days} 天後)") + # 範例: + # await outcome_tracker.schedule_follow_up(plan_id, sku, metric) + + # ── 內部方法 ──────────────────────────────────────────────── + + async def _fetch_sales_snapshot(self) -> List[Dict[str, Any]]: + """ + 讀取銷售快照。 + 欄位假設: + - sku + - name + - category + - sales_curr (最近7天銷售金額) + - sales_prev (前7天銷售金額) + - price_momo (MOMO 價格) + - price_pchome (PChome 價格) + - stock_status (庫存狀態) + 若實際欄位名不同,請依實際調整。 + """ + session = get_session() + try: + sql = text(f""" + SELECT sku, name, category, + COALESCE(sales_curr, 0) AS sales_curr, + COALESCE(sales_prev, 0) AS sales_prev, + price_momo, price_pchome, stock_status + FROM {SALES_SNAPSHOT_TABLE} + WHERE snapshot_date = CURRENT_DATE - INTERVAL '1 day' + LIMIT 500 + """) + result = session.execute(sql).fetchall() + return [dict(row._mapping) for row in result] + except Exception as e: + sys_log.error(f"[Watcher] 無法讀取快照: {e}") + return [] + finally: + session.close() + + def _detect_anomalies(self, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + anomalies: List[Dict[str, Any]] = [] + for r in rows: + sku = r["sku"] + name = r["name"] + curr = float(r["sales_curr"] or 0) + prev = float(r["sales_prev"] or 1) # 避免除以 0 + pchome = r["price_pchome"] + momo = r["price_momo"] + stock = r.get("stock_status", "unknown") + + drop_pct = (curr - prev) / prev if prev else 0.0 + price_gap_pct = ((momo - pchome) / pchome * 100) if pchome else 0.0 + + reasons = [] + + # 銷量下滑異常 + if drop_pct <= -SALES_DROP_THRESHOLD: + reasons.append( + f"銷量下滑 {drop_pct:+.1%}(閾值 {SALES_DROP_THRESHOLD:+.0%})" + ) + + # 競品價格突漲(若我方價格低且差距擴大) + if price_gap_pct > PRICE_SURGE_THRESHOLD: + reasons.append( + f"競品價格突漲 {price_gap_pct:+.1f}% 形成高價差" + ) + + # 庫存危機(可擴充) + if stock in ("out_of_stock", "low_stock"): + reasons.append(f"庫存狀態: {stock}") + + if not reasons: + continue + + anomalies.append({ + "sku": sku, + "name": name, + "category": r.get("category", ""), + "drop_pct": drop_pct, + "price_gap_pct": price_gap_pct, + "reasons": reasons, + "stock": stock, + "momo_price": momo, + "pchome_price": pchome, + }) + return anomalies + + async def _dispatch_anomaly(self, anom: Dict[str, Any]) -> bool: + """ + 依異常類型決定路由: + - 銷量下滑 + 價差微小 → L1(分析原因) + - 銷量下滑 + 價差大 → L2(規劃 + 審核) + - 競品價格突漲 → L2(防範被動) + """ + drop = anom["drop_pct"] + gap = anom["price_gap_pct"] + sku = anom["sku"] + name = anom["name"] + session_id = self._ensure_session(sku) + + # 構建 event payload(與 EventRouter 對齊) + event = { + "source": "watcher", + "event_type": "sales_anomaly", + "severity": "alert", + "title": f"銷售異常偵測 — {sku} {name}", + "summary": "; ".join(anom["reasons"]), + "payload": { + "sku": sku, + "name": name, + "category": anom["category"], + "drop_pct": anom["drop_pct"], + "price_gap_pct": anom["price_gap_pct"], + "stock": anom["stock"], + "momo_price": anom["momo_price"], + "pchome_price": anom["pchome_price"], + "sales_prev": anom.get("sales_prev"), + "sales_curr": anom.get("sales_curr"), + }, + "impact": "銷量下滑可能導致收入損失", + "status": "open", + } + + # 決策路由 + if drop <= -SALES_DROP_THRESHOLD and abs(gap) < PRICE_SURGE_THRESHOLD: + # 銷量下滑但價差微小 → 檢查是否非價格因素(缺貨/流量) + event["severity"] = "alert" + event["payload"]["non_price_factor"] = True + # 交由 L1 分析原因 + return await self._route_l1(event, session_id) + else: + # 銷量下滑 + 價差大 或 競品價格突漲 → L2 規劃 + event["severity"] = "alert" + return await self._route_l2(event, session_id) + + async def _route_l1(self, event: Dict[str, Any], session_id: str) -> bool: + """L1:Hermes 分析下滑原因""" + try: + result = await self.orchestrator.handle_l1(event, session_id) + sys_log.info(f"[Watcher] L1 dispatch success for {event['payload']['sku']}") + # 寫入共享上下文 + await self._save_context(session_id, "hermes", { + "summary": result.get("summary"), + "probable_cause": result.get("probable_cause"), + "actions": result.get("actions", []), + }) + return True + except Exception as e: + sys_log.error(f"[Watcher] L1 dispatch failed: {e}") + # 保底:直接通知 + await self._fallback_notify(event) + return False + + async def _route_l2(self, event: Dict[str, Any], session_id: str) -> bool: + """L2:NemoTron 規劃 + 審核閘""" + try: + result = await self.orchestrator.handle_l2(event, session_id) + sys_log.info(f"[Watcher] L2 dispatch success for {event['payload']['sku']}") + # 寫入共享上下文與 action_plans + await self._save_context(session_id, "nemotron", { + "plan": result.get("plan"), + "actions_taken": result.get("actions_taken", []), + }) + await self._save_action_plan(event, result.get("plan")) + return True + except Exception as e: + sys_log.error(f"[Watcher] L2 dispatch failed: {e}") + # 保底通知 + await self._fallback_notify(event) + return False + + async def _fallback_notify(self, event: Dict[str, Any]) -> None: + """當 AI 失敗時,直接通知並記錄原因""" + sku = event["payload"]["sku"] + name = event["payload"]["name"] + text = ( + f"⚠️ [Watcher Fallback] {sku} {name}\n" + f"原因:{event['summary']}\n" + f"建議:立即人工檢查銷售與庫存狀態。" + ) + await self._notify_telegram(text) + + async def _notify_telegram(self, text: str) -> bool: + """透過 Telegram 發送訊息""" + from services.telegram_templates import alert as render_alert + bot_token = os.getenv("TELEGRAM_BOT_TOKEN") + if not bot_token: + sys_log.warning("[Watcher] TELEGRAM_BOT_TOKEN 未設定") + return False + chat_ids_raw = os.getenv("TELEGRAM_CHAT_IDS", "[]") + try: + chat_ids = json.loads(chat_ids_raw) + except json.JSONDecodeError: + chat_ids = [] + url = f"https://api.telegram.org/bot{bot_token}/sendMessage" + payload = { + "chat_id": chat_ids[0] if chat_ids else -1003940688311, + "text": render_alert(title="銷售異常通知", content=text), + "parse_mode": "HTML", + } + try: + r = requests.post(url, json=payload, timeout=10) + return r.ok + except Exception as e: + sys_log.error(f"[Watcher] Telegram 通知失敗: {e}") + return False + + def _ensure_session(self, sku: str) -> str: + """保證 session_id 存在(簡化:skuid 作為 session)""" + return f"session:{sku}" + + async def _save_context(self, session_id: str, agent: str, data: Dict[str, Any]) -> None: + """寫入 agent_context(共享記憶)""" + session = get_session() + try: + # 刪除舊的 key + session.execute( + text("DELETE FROM agent_context WHERE session_id = :sid AND agent_name = :ag"), + {"sid": session_id, "ag": agent}, + ) + # 寫入新 context + session.execute( + text(""" + INSERT INTO agent_context + (session_id, agent_name, context_key, context_val, created_at, ttl_minutes) + VALUES + (:sid, :ag, :ck, :cv, NOW(), :ttl) + """), + { + "sid": session_id, + "ag": agent, + "ck": "latest", + "cv": json.dumps(data, ensure_ascii=False), + "ttl": CACHE_TTL_MIN * 2, + }, + ) + session.commit() + except Exception as e: + session.rollback() + sys_log.warning(f"[Watcher] 寫入 context 失敗: {e}") + finally: + session.close() + + async def _save_action_plan(self, event: Dict[str, Any], plan: Optional[Dict[str, Any]]) -> None: + """將 NemoTron 的 plan 寫入 action_plans""" + if not plan: + return + session = get_session() + try: + sku = event["payload"]["sku"] + session.execute( + text(""" + INSERT INTO action_plans + (session_id, plan_type, sku, payload, status, created_by) + VALUES + (:sid, :pt, :sku, :pl, 'pending', 'nemotron') + """), + { + "sid": f"session:{sku}", + "pt": plan.get("type", "price_adjust"), + "sku": sku, + "pl": json.dumps(plan, ensure_ascii=False), + }, + ) + session.commit() + except Exception as e: + session.rollback() + sys_log.warning(f"[Watcher] 寫入 action_plan 失敗: {e}") + finally: + session.close()