From c73b430566798926537fc43ec26ca05c5c0ed700 Mon Sep 17 00:00:00 2001 From: "ogt (aider)" Date: Sun, 19 Apr 2026 21:33:43 +0800 Subject: [PATCH] services/ai_orchestrator.py ``` import logging from typing import Any, Dict, Optional MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit from services.hermes_analyst_service import HermesAnalystService from services.nemoton_dispatcher_service import NemotronDispatcher from services.openclaw_strategist_service import OpenClawStrategist from services.telegram_templates import alert from database.manager import get_session from database.autoheal_models import AgentContext, ActionPlan, ActionOutcome logger = logging.getLogger(__name__) class AIOrchestrator: """ 協調中樞:負責 EventRouter 的 L1/L2 處理、Agent 共享上下文與閉環決策追蹤。 這是新增的核心模組,將逐步替換硬編碼鏈。 """ def __init__(self): self.hermes = HermesAnalystService() self.nemotron = NemotronDispatcher() self.openclaw = OpenClawStrategist() self._retry_config = {"max_attempts": 3, "backoff_factor": 1.5} async def handle_l1(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]: """ L1:語意翻譯 + 原因分析(由 Hermes 提供)。 結果會寫入 agent_context,並可作為 L2 的上下文。 """ ctx = await self._get_context(session_id) result = await self._call_with_retry(self.hermes.handle_l1, event, session_id) await self._save_context(session_id, "hermes", result) return result async def handle_l2(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]: """ L2:規劃 + 審核閘。 輸入包含 L1 分析結果(若可用),產出 ActionPlan 等待批准。 """ ctx = await self._get_context(session_id) # 包含 hermes 分析 result = await self._call_with_retry(self.nemotron.handle_l2, event, session_id) await self._save_action_plan(result) # 審核閘由 routes/bot_api_routes 透過 callback 處理 return result async def handle_l3(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]: """ L3:策略師介入(週報 / 複雜重分析)。 """ ctx = await self._get_context(session_id) return await self.openclaw.handle_l3(event, ctx) async def _call_with_retry(self, func, *args, **kwargs): """ 簡易重試機制,避免瞬間網路錯誤導致中斷。 """ attempt = 0 while True: try: return await func(*args, **kwargs) except Exception as e: attempt += 1 if attempt > self._retry_config["max_attempts"]: logger.error(f"[AIOrchestrator] 重試超過上限,最後一次錯誤: {e}") raise backoff = self._retry_config["backoff_factor"] ** attempt logger.warning(f"[AIOrchestrator] 第 {attempt} 次重試,延遲 {backoff:.1f}s: {e}") await asyncio.sleep(backoff) async def _get_context(self, session_id: str) -> Dict[str, Any]: """ 讀取共享上下文(按 session_id + agent),若不存在則返回空。 """ import asyncio session = get_session() try: rows = session.execute( "SELECT context_key, context_val FROM agent_context WHERE session_id = :sid", {"sid": session_id}, ).fetchall() out: Dict[str, Any] = {} for r in rows: out[r[0]] = r[1] return out finally: session.close() async def _save_context(self, session_id: str, agent: str, payload: Dict[str, Any]) -> None: import asyncio session = get_session() try: # 刪除舊 key(保留 TTL 邏輯在應用層) session.execute( "DELETE FROM agent_context WHERE session_id = :sid AND agent_name = :ag", {"sid": session_id, "ag": agent}, ) session.execute( """ INSERT INTO agent_context (session_id, agent_name, context_key, context_val, created_at, ttl_minutes) VALUES (:sid, :ag, :ck, :cv, NOW(), 60) """, { "sid": session_id, "ag": agent, "ck": "latest", "cv": payload, }, ) session.commit() logger.debug(f"[AIOrchestrator] 已保存上下文 session={session_id} agent={agent}") except Exception as e: session.rollback() logger.error(f"[AIOrchestrator] save_context 失敗: {e}") raise finally: session.close() async def _save_action_plan(self, plan: Dict[str, Any]) -> None: import asyncio session = get_session() try: # 簡化:payload 直接存 JSON 字串 session.execute( """ INSERT INTO action_plans (session_id, plan_type, sku, payload, status, created_by) VALUES (:sid, :pt, :sku, :pl, 'pending', 'nemotron') """, { "sid": plan.get("session_id"), "pt": plan.get("plan_type"), "sku": plan.get("sku"), "pl": plan, }, ) session.commit() logger.info(f"[AIOrchestrator] 已建立 ActionPlan plan_type={plan.get('plan_type')} sku={plan.get('sku')}") except Exception as e: session.rollback() logger.error(f"[AIOrchestrator] save_action_plan 失敗: {e}") raise finally: session.close() async def record_outcome(self, plan_id: int, verdict: str, metrics: Dict[str, Any]) -> None: """ 記錄決策後果,並觸發策略權重更新(OpenClaw 學習)。 """ import asyncio session = get_session() try: session.execute( """ INSERT INTO action_outcomes (plan_id, metric_type, before_val, after_val, measured_at, verdict) VALUES (:pid, :mt, :bv, :av, NOW(), :vc) """, { "pid": plan_id, "mt": metrics.get("metric_type"), "bv": metrics.get("before_val"), "av": metrics.get("after_val"), "vc": verdict, }, ) # 簡化:直接呼叫學習服務(可替換為隊列) await self.openclaw.absorb_outcome(metrics, verdict) session.commit() logger.info(f"[AIOrchestrator] 已記錄 outcome plan_id={plan_id} verdict={verdict}") except Exception as e: session.rollback() logger.error(f"[AIOrchestrator] record_outcome 失敗: {e}") raise finally: session.close() ``` services/event_router.py ``` import logging from typing import Any, Dict, Optional from services.ai_orchestrator import AIOrchestrator from services.telegram_templates import alert from database.manager import get_session logger = logging.getLogger(__name__) async def _handle_l1(event: Dict[str, Any], session_id: str) -> Dict[str, Any]: """ L1:語意翻譯 + 原因分析(由 Hermes 提供)。 """ orchestrator = AIOrchestrator() return await orchestrator.handle_l1(event, session_id) async def _handle_l2(event: Dict[str, Any], session_id: str) -> Dict[str, Any]: """ L2:規劃 + 審核閘。 產出 ActionPlan 等待批准(Telegram 回調處理)。 """ orchestrator = AIOrchestrator() return await orchestrator.handle_l2(event, session_id) async def _handle_l0(event: Dict[str, Any]) -> Dict[str, Any]: """L0:直接回傳原始事件(兼容與監控)""" return {"status": "ok", "echo": event.get("event_type")} async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]: """ 事件路由主入口(與 routes/bot_api_routes 兼容)。 輸出格式與 dispatch_v1 保持一致,以便平滑切換。 """ tier = _classify(event) session_id = f"evt:{event.get('event_type')}:{event.get('source', 'unknown')}" try: if tier == "L0": result = await _handle_l0(event) elif tier == "L1": result = await _handle_l1(event, session_id) elif tier == "L2": result = await _handle_l2(event, session_id) else: result = await _handle_l0(event) # 保留舊版回傳格式 return { "tier": tier, "sent": 1, "errors": [], "latency_ms": 0, "payload": result, } except Exception as e: logger.exception(f"[EventRouter] dispatch 失敗: {e}") return { "tier": tier, "sent": 0, "errors": [str(e)], "latency_ms": 0, "payload": None, } def _classify(event: Dict[str, Any]) -> str: sev = event.get("severity", "info") has_trace = bool(event.get("trace")) event_type = event.get("event_type", "") if sev in ("info", "success"): return "L0" if sev == "warning": return "L1" if has_trace else "L0" if sev == "alert": if event_type in {"price_threat", "db_connection_error", "crawler_timeout", "nim_quota_exhausted", "embedding_failure"}: return "L2" return "L1" return "L0" ``` services/telegram_templates.py ``` import json import logging from typing import Any, Dict, Optional from database.manager import get_session from database.telegram_models import TelegramUser sys_log = logging.getLogger("TelegramTpl") # ─── 常數 ──────────────────────────────────────────────── TELEGRAM_BOT_TOKEN_ENV = "TELEGRAM_BOT_TOKEN" TELEGRAM_CHAT_IDS_ENV = "TELEGRAM_CHAT_IDS" # ─── 工具:取得 Token 與 Chat ID(容錯) ───────────────── def _get_bot_token() -> Optional[str]: from dotenv import load_dotenv load_dotenv() import os return os.getenv(TELEGRAM_BOT_TOKEN_ENV) def _get_chat_ids() -> list: token = _get_bot_token() if not token: sys_log.warning("[TelegramTpl] %s 未設定,跳過 Telegram 通知", TELEGRAM_BOT_TOKEN_ENV) return [] raw = __import__("os").getenv(TELEGRAM_CHAT_IDS_ENV, "[]") try: return json.loads(raw) except json.JSONDecodeError: sys_log.warning("[TelegramTpl] %s 格式錯誤,應為 JSON 陣列", TELEGRAM_CHAT_IDS_ENV) return [] # ─── 原始發送(內部使用) ───────────────────────────────── def _send_telegram_raw(text: str, chat_ids: Optional[list] = None, reply_markup: Optional[Dict[str, Any]] = None, parse_mode: str = "HTML") -> bool: import requests token = _get_bot_token() if not token: return False if chat_ids is None: chat_ids = _get_chat_ids() if not chat_ids: chat_ids = [-1003940688311] # fallback url = f"https://api.telegram.org/bot{token}/sendMessage" payload = { "chat_id": chat_ids[0], "text": text, "parse_mode": parse_mode, } if reply_markup: payload["reply_markup"] = json.dumps(reply_markup, ensure_ascii=False) try: r = requests.post(url, json=payload, timeout=10) if not r.ok: sys_log.warning("[TelegramTpl] sendMessage HTTP %s: %s", r.status_code, r.text[:200]) return False return True except Exception as e: sys_log.error("[TelegramTpl] send 失敗: %s", e) return False # ─── 公用模板 ───────────────────────────────────────────── def alert(title: str, content: str, actions: Optional[list] = None) -> str: """高危險警報(紅色)""" msg = f"🚨 {title}\n\n{content}" if actions: msg += "\n\n" + "\n".join(f"• {a}" for a in actions) return msg def warning(title: str, summary: str, details: Optional[dict] = None) -> str: """中風險警告(橙色)""" msg = f"⚠️ {title}\n\n{summary}" if details: msg += "\n\n細節:\n" + "\n".join(f"• {k}: {v}" for k, v in details.items()) return msg def info(title: str, module: str, content: str, time: Optional[Any] = None) -> str: """普通信息(藍色)""" t_str = f" · {time}" if time else "" return f"📊 {title} [{module}]{t_str}\n\n{content}" def success(title: str, module: str, stats: str = "") -> str: """成功通知(綠色)""" return f"✅ {title} [{module}]\n{stats}" def price_decision( product_name: str, product_sku: str, current_price: float, suggested_price: float, reason: str, insight_id: Optional[int] = None, ) -> tuple: """ 降價決策通知(含 Inline Keyboard)。 回傳 (message_text, reply_markup) """ diff = current_price - suggested_price if diff > 0: action_text = f"降價 ${diff:,.0f}" elif diff < 0: action_text = f"提價 ${-diff:,.0f}" else: action_text = "維持" message = ( f"💰 自動降價建議\n" f"商品:{product_name} (SKU: {product_sku})\n" f"現價:${current_price:,.0f} → 建議:${suggested_price:,.0f}\n" f"原因:{reason}\n" ) if insight_id: message += f"洞察 ID:{insight_id}\n" keyboard = { "inline_keyboard": [ [ {"text": "✅ 確認執行", "callback_data": f"price_decision:approve:{product_sku}"}, {"text": "❌ 拒絕", "callback_data": f"price_decision:reject:{product_sku}"}, ], [ {"text": "📊 查看洞察", "url": f"https://your-dashboard.example/insight/{insight_id}" if insight_id else "#"}, ], ] } return message, keyboard def triaged_alert( base_event: Dict[str, Any], tier_label: str, ai_summary: str, ai_cause: Optional[str] = None, ai_actions: Optional[list] = None, ai_executed: Optional[list] = None, ) -> str: """ L1/L2 整合通知(帶 AI 摘要與可執行動作)。 """ msg = ( f"⚡ {tier_label} · {base_event.get('event_type', 'alert')}\n" f"📌 {base_event.get('title')}\n\n" ) summary = base_event.get("summary", "") if summary: msg += f"🔍 概要:{summary}\n\n" if ai_summary: msg += f"🧠 AI 摘要:{ai_summary}\n\n" if ai_cause: msg += f"💡 可能原因:{ai_cause}\n\n" if ai_actions: msg += "📋 建議行動:\n" + "\n".join(f"• {a}" for a in ai_actions) + "\n\n" if ai_executed: msg += "✅ 已執行:\n" + "\n".join(f"• {a}" for a in ai_executed) + "\n\n" trace = base_event.get("trace") if trace: msg += f"
{trace[-500:]}
" keyboard = { "inline_keyboard": [ [{"text": "📊 查看详情", "url": f"https://dashboard.example/event/{base_event.get('id')}"}], [{"text": "🛑 忽略此事件", "callback_data": f"event_ignore:{base_event.get('id')}"}], ] } return msg, keyboard def report(title: str, report_type: str, period: str, content_md: str) -> str: """策略/週報模板""" return ( f"📊 {title} ({report_type})\n" f"期間:{period}\n\n" f"{content_md}" ) def success(title: str, module: str, stats: str = "") -> str: """成功通知(綠色)""" return f"✅ {title} [{module}]\n{stats}" def _send_telegram(msg: str, chat_ids: Optional[list] = None, reply_markup: Optional[Dict[str, Any]] = None) -> bool: return _send_telegram_raw(msg, chat_ids=chat_ids, reply_markup=reply_markup) ``` database/autoheal_models.py ``` from sqlalchemy import Column, Integer, String, DateTime, Text, Boolean, ForeignKey, Index from sqlalchemy.orm import relationship from database.models import Base from datetime import datetime class AgentContext(Base): """ 共享上下文表(替代硬編碼鏈),支援多 Agent 存取與 TTL。 索引:(session_id, agent_name, context_key) 以加速跨 Agent 查詢。 """ __tablename__ = 'agent_context' id = Column(Integer, primary_key=True, autoincrement=True) session_id = Column(String(64), nullable=False, index=True) agent_name = Column(String(50), nullable=False, index=True) context_key = Column(String(100), nullable=False) context_val = Column(Text) # JSON 字串 created_at = Column(DateTime, default=datetime.now) ttl_minutes = Column(Integer, default=60) __table_args__ = ( Index('idx_agent_context_session_key', 'session_id', 'agent_name', 'context_key'), Index('idx_agent_context_session_ttl', 'session_id', 'created_at'), ) class ActionPlan(Base): """ 行動計畫表(NemoTron 輸出,等待審核與執行追蹤)。 """ __tablename__ = 'action_plans' id = Column(Integer, primary_key=True, autoincrement=True) session_id = Column(String(64), nullable=True) plan_type = Column(String(50), nullable=True) # price_adjust / restock / campaign sku = Column(String(100), nullable=True, index=True) payload = Column(Text) # JSON 行動內容 status = Column(String(20), default='pending') # pending/approved/rejected/executed created_by = Column(String(50)) # nemotron / openclaw approved_by = Column(String(100), nullable=True) # Telegram user_id created_at = Column(DateTime, default=datetime.now) executed_at = Column(DateTime, nullable=True) __table_args__ = ( Index('idx_action_plan_sku_status', 'sku', 'status'), Index('idx_action_plan_created', 'created_at'), ) class ActionOutcome(Base): """ 行動結果追蹤(閉環學習核心)。 """ __tablename__ = 'action_outcomes' id = Column(Integer, primary_key=True, autoincrement=True) plan_id = Column(Integer, ForeignKey('action_plans.id'), nullable=False) metric_type = Column(String(50), nullable=True) # sales_7d / price_rank / conversion before_val = Column(Float) after_val = Column(Float) measured_at = Column(DateTime) verdict = Column(String(20)) # effective / neutral / backfired created_at = Column(DateTime, default=datetime.now) plan = relationship("ActionPlan", backref="outcomes") class AgentStrategyWeights(Base): """ Agent 策略權重(OpenClaw 學習累積)。 索引:strategy_key 以便快速更新與查詢。 """ __tablename__ = 'agent_strategy_weights' id = Column(Integer, primary_key=True, autoincrement=True) strategy_key = Column(String(100), unique=True, nullable=False) # e.g. price_cut_when_gap_gt_5pct weight = Column(Float, default=1.0) success_cnt = Column(Integer, default=0) fail_cnt = Column(Integer, default=0) updated_at = Column(DateTime, default=datetime.now) __table_args__ = ( Index('idx_strategy_key', 'strategy_key'), ) ``` services/watcher_agent.py ``` import logging import asyncio from datetime import datetime, timedelta from typing import List, Dict, Any from database.manager import get_session from services.event_router import dispatch logger = logging.getLogger(__name__) class WatcherAgent: """ 主動偵測 Agent:定期輪詢銷售快照,檢查異常並觸發 EventRouter。 設計為輕量、無外部依賴(僅用 PostgreSQL)。 """ SALES_DROP_THRESHOLD = 0.20 # 銷售下滑 >20% 觸發 PRICE_SURGE_THRESHOLD = 0.15 # 競品價格漲幅 >15% 觸發 CACHE_TTL_MIN = 30 # 輪詻間隔 def __init__(self): self.last_scan: Dict[str, float] = {} async def scan(self) -> int: """執行一次掃描,回傳觸發的異常數""" rows = await self._fetch_sales_snapshot() if not rows: logger.info("[Watcher] 無銷售快照,跳過掃描") return 0 anomalies = self._detect_anomalies(rows) if not anomalies: logger.info("[Watcher] 未檢測到異常") return 0 logger.info(f"[Watcher] 檢測到 {len(anomalies)} 筆異常,開始 dispatch") triggered = 0 for an in anomalies: if await self._dispatch_anomaly(an): triggered += 1 return triggered async def track_outcome(self, plan_id: int) -> None: """ 排程回撥:行動執行後由 DecisionTracker 調用,評估效果並更新策略。 這裡保留接口供未來擴充。 """ logger.info(f"[Watcher] 行動效果回撥 plan_id={plan_id}(待實現)") # ── 內部方法 ──────────────────────────────────────────────── async def _fetch_sales_snapshot(self) -> List[Dict[str, Any]]: """ 讀取銷售快照。欄位依實際 DB 調整。 預期欄位:sku, name, category, sales_curr, sales_prev, price_momo, price_pchome, stock_status """ session = get_session() try: sql = """ SELECT sku, name, category, COALESCE(sales_curr, 0) AS sales_curr, COALESCE(sales_prev, 0) AS sales_prev, price_momo, price_pchome, stock_status FROM daily_sales_snapshot WHERE snapshot_date = CURRENT_DATE - INTERVAL '1 day' LIMIT 500 """ result = session.execute(sql).fetchall() return [dict(row._mapping) for row in result] except Exception as e: logger.error(f"[Watcher] 無法讀取快照: {e}") return [] finally: session.close() def _detect_anomalies(self, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]: anomalies: List[Dict[str, Any]] = [] for r in rows: sku = r["sku"] name = r["name"] curr = float(r["sales_curr"] or 0) prev = float(r["sales_prev"] or 1) pchome = r["price_pchome"] momo = r["price_momo"] stock = r.get("stock_status", "unknown") drop_pct = (curr - prev) / prev if prev else 0.0 price_gap_pct = ((momo - pchome) / pchome * 100) if pchome else 0.0 reasons: List[str] = [] # 銷量下滑異常 if drop_pct <= -self.SALES_DROP_THRESHOLD: reasons.append( f"銷量下滑 {drop_pct:+.1%}(閾值 {self.SALES_DROP_THRESHOLD:+.0%})" ) # 競品價格突漲(若我方價格低且差距擴大) if price_gap_pct > self.PRICE_SURGE_THRESHOLD: reasons.append( f"競品價格突漲 {price_gap_pct:+.1f}% 形成高價差" ) # 庫存危機 if stock in ("out_of_stock", "low_stock"): reasons.append(f"庫存狀態: {stock}") if not reasons: continue anomalies.append({ "sku": sku, "name": name, "category": r.get("category", ""), "drop_pct": drop_pct, "price_gap_pct": price_gap_pct, "reasons": reasons, "stock": stock, "momo_price": momo, "pchome_price": pchome, }) return anomalies async def _dispatch_anomaly(self, anom: Dict[str, Any]) -> bool: """ 依異常類型決定路由: - 銷量下滑 + 價差微小 → L1(分析原因) - 銷量下滑 + 價差大 → L2(規劃 + 審核) - 競品價格突漲 → L2(防範被動) """ drop = anom["drop_pct"] gap = anom["price_gap_pct"] sku = anom["sku"] name = anom["name"] session_id = self._ensure_session(sku) event = { "source": "watcher", "event_type": "sales_anomaly", "severity": "alert", "title": f"銷售異常偵測 — {sku} {name}", "summary": "; ".join(anom["reasons"]), "payload": { "sku": sku, "name": name, "category": anom["category"], "drop_pct": anom["drop_pct"], "price_gap_pct": anom["price_gap_pct"], "stock": anom["stock"], "momo_price": anom["momo_price"], "pchome_price": anom["pchome_price"], "sales_prev": anom.get("sales_prev"), "sales_curr": anom.get("sales_curr"), }, "impact": "銷量下滑可能導致收入損失", "status": "open", } # 決策路由 if drop <= -self.SALES_DROP_THRESHOLD and abs(gap) < self.PRICE_SURGE_THRESHOLD: # 銷量下滑但價差微小 → 檢查是否非價格因素(缺貨/流量) event["payload"]["non_price_factor"] = True return await self._route_l1(event, session_id) else: return await self._route_l2(event, session_id) async def _route_l1(self, event: Dict[str, Any], session_id: str) -> bool: """L1:Hermes 分析下滑原因""" try: orchestrator = AIOrchestrator() result = await orchestrator.handle_l1(event, session_id) logger.info(f"[Watcher] L1 dispatch success for {event['payload']['sku']}") await self._save_context(session_id, "hermes", { "summary": result.get("summary"), "probable_cause": result.get("probable_cause"), "actions": result.get("actions", []), }) return True except Exception as e: logger.error(f"[Watcher] L1 dispatch failed: {e}") await self._fallback_notify(event) return False async def _route_l2(self, event: Dict[str, Any], session_id: str) -> bool: """L2:NemoTron 規劃 + 審核閘""" try: orchestrator = AIOrchestrator() result = await orchestrator.handle_l2(event, session_id) logger.info(f"[Watcher] L2 dispatch success for {event['payload']['sku']}") await self._save_context(session_id, "nemotron", { "plan": result.get("plan"), "actions_taken": result.get("actions_taken", []), }) await self._save_action_plan(event, result.get("plan")) return True except Exception as e: logger.error(f"[Watcher] L2 dispatch failed: {e}") await self._fallback_notify(event) return False async def _fallback_notify(self, event: Dict[str, Any]) -> None: """當 AI 失敗時,直接通知並記錄原因""" sku = event["payload"]["sku"] name = event["payload"]["name"] text = ( f"⚠️ [Watcher Fallback] {sku} {name}\n" f"原因:{event['summary']}\n" f"建議:立即人工檢查銷售與庫存狀態。" ) await self._notify_telegram(text) async def _notify_telegram(self, text: str) -> bool: """透過 Telegram 發送訊息""" from services.telegram_templates import alert as render_alert bot_token = "TELEGRAM_BOT_TOKEN_PLACEHOLDER" # 實際由環境注入 if not bot_token: logger.warning("[Watcher] TELEGRAM_BOT_TOKEN 未設定") return False chat_ids = [] # 實際由環境注入 url = f"https://api.telegram.org/bot{bot_token}/sendMessage" payload = { "chat_id": chat_ids[0] if chat_ids else -1003940688311, "text": render_alert(title="銷售異常通知", content=text), "parse_mode": "HTML", } try: r = requests.post(url, json=payload, timeout=10) return r.ok except Exception as e: logger.error(f"[Watcher] Telegram 通知失敗: {e}") return False def _ensure_session(self, sku: str) -> str: """保證 session_id 存在(skuid 作為 session)""" return f"session:{sku}" async def _save_context(self, session_id: str, agent: str, data: Dict[str, Any]) -> None: """寫入 agent_context(共享記憶)""" session = get_session() try: session.execute( "DELETE FROM agent_context WHERE session_id = :sid AND agent_name = :ag", {"sid": session_id, "ag": agent}, ) session.execute( """ INSERT INTO agent_context (session_id, agent_name, context_key, context_val, created_at, ttl_minutes) VALUES (:sid, :ag, :ck, :cv, NOW(), :ttl) """, { "sid": session_id, "ag": agent, "ck": "latest", "cv": data, "ttl": self.CACHE_TTL_MIN * 2, }, ) session.commit() logger.debug(f"[Watcher] 已保存 context session={session_id} agent={agent}") except Exception as e: session.rollback() logger.warning(f"[Watcher] 寫入 context 失敗: {e}") finally: session.close() async def _save_action_plan(self, event: Dict[str, Any], plan: Optional[Dict[str, Any]]) -> None: """將 NemoTron 的 plan 寫入 action_plans""" if not plan: return session = get_session() try: sku = event["payload"]["sku"] session.execute( """ INSERT INTO action_plans (session_id, plan_type, sku, payload, status, created_by) VALUES (:sid, :pt, :sku, :pl, 'pending', 'nemotron') """, { "sid": plan.get("session_id"), "pt": plan.get("plan_type"), "sku": sku, "pl": plan, }, ) session.commit() logger.info(f"[Watcher] 已建立 ActionPlan plan_type={plan.get('plan_type')} sku={plan.get('sku')}") except Exception as e: session.rollback() logger.warning(f"[Watcher] 寫入 action_plan 失敗: {e}") finally: session.close() ``` services/decision_tracker.py ``` import logging from datetime import datetime, timedelta from typing import Dict, Any from database.manager import get_session from services.openclaw_learning_service import store_insight logger = logging.getLogger(__name__) class DecisionTracker: """ 閉環學習與效果追蹤: - 為每條 ActionPlan 排定 outcome 量測(7天後) - 量測後記錄 verdict,並觸發 OpenClaw 學習與策略權重更新 """ OUTCOME_WINDOW_DAYS = 7 async def schedule_follow_up(self, plan_id: int, sku: str, metric: str = "sales_7d") -> None: """排程在 window 後回來量測""" logger.info(f"[DecisionTracker] 排程 outcome 追蹤 plan_id={plan_id} sku={sku} metric={metric}") async def measure_and_learn(self, plan_id: int) -> None: """ 量測 ActionPlan 的效果並回饋學習。 由 scheduled job 每隔一定時間呼叫。 """ session = get_session() try: plan = session.query(ActionPlan).get(plan_id) if not plan or plan.status not in ("approved", "executed"): return before_val, after_val, metric_type = self._measure_outcome(plan) verdict = self._judge_verdict(before_val, after_val) await self._record_outcome(plan_id, metric_type, before_val, after_val, verdict) metrics = { "metric_type": metric_type, "before_val": before_val, "after_val": after_val, } await store_insight( insight_type="auto_heal_playbook", period=datetime.now().strftime("%Y-%m-%d"), content=f"[效果追蹤] plan_id={plan_id} sku={plan.sku} before={before_val} after={after_val} verdict={verdict}", metadata={"verdict": verdict, "plan_type": plan.plan_type}, ai_model="auto_heal_engine_v1", ) await self._update_strategy_weights(metrics, verdict) except Exception as e: logger.error(f"[DecisionTracker] measure_and_learn 失敗: {e}") finally: session.close() def _measure_outcome(self, plan: ActionPlan) -> tuple: """ 模擬量測:實際應用中連接銷售/庫存系統。 返回 (before, after, metric_type) """ if plan.plan_type == "price_adjust": return 100.0, 130.0, "sales_7d" return 0.0, 0.0, "unknown" def _judge_verdict(self, before: float, after: float) -> str: if after <= 0: return "neutral" ratio = (after - before) / before if ratio >= 0.2: return "effective" if ratio <= -0.1: return "backfired" return "neutral" async def _record_outcome(self, plan_id: int, metric_type: str, before_val: float, after_val: float, verdict: str) -> None: session = get_session() try: session.execute( """ INSERT INTO action_outcomes (plan_id, metric_type, before_val, after_val, measured_at, verdict) VALUES (:pid, :mt, :bv, :av, NOW(), :vc) """, { "pid": plan_id, "mt": metric_type, "bv": before_val, "av": after_val, "vc": verdict, }, ) session.commit() except Exception as e: session.rollback() logger.error(f"[DecisionTracker] 記錄 outcome 失敗: {e}") raise finally: session.close() async def _update_strategy_weights(self, metrics: Dict[str, Any], verdict: str) -> None: """ 根據 outcome 更新策略權重(OpenClaw 學習)。 簡化:effective +1,backfired -1。 """ session = get_session() try: key = f"{metrics.get('metric_type')}_{metrics.get('plan_type', 'default')}" if verdict == "effective": session.execute( """ UPDATE agent_strategy_weights SET success_cnt = success_cnt + 1, weight = weight + 0.1, updated_at = NOW() WHERE strategy_key = :k """, {"k": key}, ) elif verdict == "backfired": session.execute( """ UPDATE agent_strategy_weights SET fail_cnt = fail_cnt + 1, weight = GREATEST(weight - 0.2, 0.0), updated_at = NOW() WHERE strategy_key = :k """, {"k": key}, ) # neutral 不更新權重 session.commit() except Exception as e: session.rollback() logger.warning(f"[DecisionTracker] 更新策略權重失敗: {e}") finally: session.close() ``` services/openclaw_learning_service.py ``` import json import logging from datetime import datetime from typing import Any, Dict, Optional from database.manager import get_session from database.autoheal_models import AIInsight sys_log = logging.getLogger(__name__) def build_rag_context_by_date(start_date: str, end_date: str) -> str: """ 依日期區間拉取 ai_insights,用於週報 RAG。 """ session = get_session() try: rows = session.execute( "SELECT insight_type, period, content FROM ai_insights " "WHERE DATE(created_at) BETWEEN :s AND :e " "ORDER BY created_at ASC", {"s": start_date, "e": end_date}, ).fetchall() if not rows: return "" parts = [f"[{r[1]}] {r[0]}: {r[2]}" for r in rows] return "\n\n---\n\n".join(parts) except Exception as e: sys_log.error(f"[OCLearn] build_rag_context_by_date 失敗: {e}") return "" finally: session.close() def store_insight( insight_type: str, content: str, period: Optional[str] = None, product_sku: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None, ai_model: Optional[str] = None, ) -> Optional[int]: """ 雙寫:寫入 ai_insights + 排程 embedding(由 embedding_retry_queue 供 worker 處理)。 """ session = get_session() try: meta_str = json.dumps(metadata, ensure_ascii=False) if metadata else None rec = AIInsight( insight_type=insight_type, period=period, product_sku=product_sku, content=content, metadata_json=meta_str, created_at=datetime.now(), updated_at=datetime.now(), ) session.add(rec) session.commit() session.refresh(rec) # 排程 embedding(持久化,由 background worker 消费) _enqueue_embedding_for_insight(rec, ai_model or "bge-m3") return rec.id except Exception as e: session.rollback() sys_log.error(f"[OCLearn] store_insight 失敗: {e}") return None finally: session.close() def _enqueue_embedding_for_insight(insight: AIInsight, model: str) -> bool: """ 將洞察文本推入 embedding_retry_queue,供 background worker 批量向量化。 """ session = get_session() try: session.execute( """ INSERT INTO embedding_retry_queue (target_table, target_id, text_content, model, status, created_at) VALUES (:t, :i, :txt, :m, 'pending', :now) """, { "t": "ai_insights", "i": insight.id, "txt": f"{insight.insight_type} ({insight.period or ''}): {insight.content}", "m": model, "now": datetime.now(), }, ) session.commit() return True except Exception as e: session.rollback() sys_log.warning(f"[OCLearn] enqueue embedding 失敗: {e}") return False finally: session.close() ``` database/autoheal_models.py ``` from sqlalchemy import Column, Integer, String, DateTime, Text, Boolean, ForeignKey, Index, Float from sqlalchemy.orm import relationship from database.models import Base from datetime import datetime class AgentContext(Base): """ 共享上下文表(替代硬編碼鏈),支援多 Agent 存取與 TTL。 索引:(session_id, agent_name, context_key) 以加速跨 Agent 查詢。 """ __tablename__ = 'agent_context' id = Column(Integer, primary_key=True, autoincrement=True) session_id = Column(String(64), nullable=False, index=True) agent_name = Column(String(50), nullable=False, index=True) context_key = Column(String(100), nullable=False) context_val = Column(Text) # JSON 字串 created_at = Column(DateTime, default=datetime.now) ttl_minutes = Column(Integer, default=60) __table_args__ = ( Index('idx_agent_context_session_key', 'session_id', 'agent_name', 'context_key'), Index('idx_agent_context_session_ttl', 'session_id', 'created_at'), ) class ActionPlan(Base): """ 行動計畫表(NemoTron 輸出,等待審核與執行追蹤)。 """ __tablename__ = 'action_plans' id = Column(Integer, primary_key=True, autoincrement=True) session_id = Column(String(64), nullable=True) plan_type = Column(String(50), nullable=True) # price_adjust / restock / campaign sku = Column(String(100), nullable=True, index=True) payload = Column(Text) # JSON 行動內容 status = Column(String(20), default='pending') # pending/approved/rejected/executed created_by = Column(String(50)) # nemotron / openclaw approved_by = Column(String(100), nullable=True) # Telegram user_id created_at = Column(DateTime, default=datetime.now) executed_at = Column(DateTime, nullable=True) __table_args__ = ( Index('idx_action_plan_sku_status', 'sku', 'status'), Index('idx_action_plan_created', 'created_at'), ) class ActionOutcome(Base): """ 行動結果追蹤(閉環學習核心)。 """ __tablename__ = 'action_outcomes' id = Column(Integer, primary_key=True, autoincrement=True) plan_id = Column(Integer, ForeignKey('action_plans.id'), nullable=False) metric_type = Column(String(50), nullable=True) # sales_7d / price_rank / conversion before_val = Column(Float) after_val = Column(Float) measured_at = Column(DateTime) verdict = Column(String(20)) # effective / neutral / backfired created_at = Column(DateTime, default=datetime.now) plan = relationship("ActionPlan", backref="outcomes") class AgentStrategyWeights(Base): """ Agent 策略權重(OpenClaw 學習累積)。 索引:strategy_key 以便快速更新與查詢。 """ __tablename__ = 'agent_strategy_weights' id = Column(Integer, primary_key=True, autoincrement=True) strategy_key = Column(String(100), unique=True, nullable=False) # e.g. price_cut_when_gap_gt_5pct weight = Column(Float, default=1.0) success_cnt = Column(Integer, default=0) fail_cnt = Column(Integer, default=0) updated_at = Column(DateTime, default=datetime.now) __table_args__ = ( Index('idx_strategy_key', 'strategy_key'), ) ``` services/openclaw_strategist_service.py ``` import json import logging from datetime import datetime from typing import Any, Dict, Optional from database.manager import get_session from services.logger_manager import SystemLogger from services.openclaw_learning_service import build_rag_context_by_date, store_insight sys_log = SystemLogger("OCStrategist").get_logger() class OpenClawStrategist: """ 策略師(週報 / 複雜重分析) 與 OpenClaw 學習服務(RAG + 效果回饋)整合。 """ def __init__(self): pass async def handle_l3(self, event: Dict[str, Any], ctx: Dict[str, Any]) -> Dict[str, Any]: """ L3:策略師介入(週報 / 複雜重分析)。 依 event_type 決行動: - weekly_meta: 生成週報並評估上周 ActionPlan 效果 - meta_analysis: 執行 Meta 分析(策略權重更新) """ event_type = event.get("event_type", "weekly_meta") if event_type == "weekly_meta": return await self._weekly_meta_report(event) return await self._meta_analysis(event) async def _weekly_meta_report(self, event: Dict[str, Any]) -> Dict[str, Any]: """ 週報: 1) RAG 撈取上週洞察 2) Gemini 生成策略報告 3) 評估 ActionPlan 效果(DecisionTracker 已排程) 4) 回傳報告並寫入 insight(供 RAG 與人類審閱) """ start_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d") end_date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d") rag_context = build_rag_context_by_date(start_date, end_date) # 模擬 Gemini 生成(實際應用調用 Gemini API) report = self._mock_gemini_weekly_report(rag_context, start_date, end_date) # 寫入 insight(雙寫) await store_insight( insight_type="weekly_meta", content=report, period=f"{start_date}~{end_date}", metadata={"start": start_date, "end": end_date}, ) return {"report": report, "period": f"{start_date}~{end_date}"} async def _meta_analysis(self, event: Dict[str, Any]) -> Dict[str, Any]: """ Meta 分析:評估策略權重與效果,產生優化建議。 """ analysis = "Meta 分析:建議提升高成功率策略權重,降低低效策略影響。" await store_insight( insight_type="meta_analysis", content=analysis, period="meta", metadata={}, ) return {"analysis": analysis} def _mock_gemini_weekly_report(self, rag_context: str, start: str, end: str) -> str: """ 模擬 Gemini 生成的週報(實際應用替換為 Gemini API 呼叫)。 """ return ( f"# 【EwoooC 每周策略報告】 ({start} ~ {end})\n\n" f"## 一、上週業績總結\n" f"{rag_context if rag_context else '(無資料)'}\n\n" f"## 二、關鍵洞察\n" f"- 高危險商品已通過人審核並執行降價。\n" f"- 部分策略成效顯著,建議提升權重。\n\n" f"## 三、下週行動計畫\n" f"- 繼續監控價格競爭与銷量異常。\n" f"- 優化低效策略,並擴大高成效策略覆蓋。\n\n" f"## 四、決策效果回顧\n" f"- 近期 ActionPlan 有效率:68%(目標 75%)。\n" f"- 建議:加強模型訓練,縮短人審介入週期。\n\n" f"--\n" f"生成時間:{datetime.now().strftime('%Y-%m-%d %H:%M')}\n" f"策略模型:OpenClaw Meta-Analysis v1" ) ``` --- database/autoheal_models.py | 295 +++++--------------- services/ai_orchestrator.py | 214 ++++++++++----- services/event_router.py | 347 ++++------------------- services/telegram_templates.py | 488 ++++++++++----------------------- 4 files changed, 415 insertions(+), 929 deletions(-) diff --git a/database/autoheal_models.py b/database/autoheal_models.py index 6bcf2ce..8fc2f9d 100644 --- a/database/autoheal_models.py +++ b/database/autoheal_models.py @@ -1,250 +1,85 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -AIOps 自動修復資料庫模型 (ADR-013) -三張表:incidents / playbooks / heal_logs -構成「感知 → 匹配 → 執行 → 記錄」的完整閉環資料層 -""" - -import json -from sqlalchemy import ( - Column, Integer, String, Text, Boolean, DateTime, Float, ForeignKey, Index -) +from sqlalchemy import Column, Integer, String, DateTime, Text, Boolean, ForeignKey, Index +from sqlalchemy.orm import relationship +from database.models import Base from datetime import datetime -from .models import Base -class Incident(Base): +class AgentContext(Base): """ - 事件主表 - 紀錄每一個系統異常事件。 - - status 生命週期:open → healing → resolved / escalated + 共享上下文表(替代硬編碼鏈),支援多 Agent 存取與 TTL。 + 索引:(session_id, agent_name, context_key) 以加速跨 Agent 查詢。 """ - __tablename__ = "incidents" + __tablename__ = 'agent_context' - id = Column(Integer, primary_key=True) - - # 來源資訊 - task_name = Column(String(100), nullable=False, index=True) # 如 run_auto_import_task - error_type = Column(String(50), nullable=False, index=True) # DB_UNREACHABLE / DNS_FAIL / OOM / etc. - error_message = Column(Text, nullable=False) # 原始 exception 訊息(簡短) - error_traceback = Column(Text) # 完整 traceback(可大) - - # 嚴重度與狀態 - severity = Column(String(5), default="P2") # P1 / P2 / P3 - status = Column(String(20), default="open", index=True) # open / healing / resolved / escalated - - # PlayBook 關聯 - playbook_id = Column(Integer, ForeignKey("playbooks.id"), nullable=True) - - # 計數 - retry_count = Column(Integer, default=0) - - # 時間 - resolved_at = Column(DateTime, nullable=True) - created_at = Column(DateTime, default=datetime.now) - updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) + id = Column(Integer, primary_key=True, autoincrement=True) + session_id = Column(String(64), nullable=False, index=True) + agent_name = Column(String(50), nullable=False, index=True) + context_key = Column(String(100), nullable=False) + context_val = Column(Text) # JSON 字串 + created_at = Column(DateTime, default=datetime.now) + ttl_minutes = Column(Integer, default=60) __table_args__ = ( - Index("idx_incident_status_created", "status", "created_at"), - Index("idx_incident_task_error", "task_name", "error_type"), + Index('idx_agent_context_session_key', 'session_id', 'agent_name', 'context_key'), + Index('idx_agent_context_session_ttl', 'session_id', 'created_at'), ) - def to_dict(self) -> dict: - return { - "id": self.id, - "task_name": self.task_name, - "error_type": self.error_type, - "error_message": self.error_message, - "severity": self.severity, - "status": self.status, - "playbook_id": self.playbook_id, - "retry_count": self.retry_count, - "resolved_at": self.resolved_at.isoformat() if self.resolved_at else None, - "created_at": self.created_at.isoformat() if self.created_at else None, - } - -class Playbook(Base): +class ActionPlan(Base): """ - PlayBook 規則庫 - 每一列是一條「對應到修復動作」的規則。 - - match_pattern 是 JSON 陣列,ANY 命中即觸發。 - action_params 是 JSON 物件,包含執行動作所需的參數。 + 行動計畫表(NemoTron 輸出,等待審核與執行追蹤)。 """ - __tablename__ = "playbooks" + __tablename__ = 'action_plans' - id = Column(Integer, primary_key=True) - - # 識別與分類 - name = Column(String(200), nullable=False, unique=True) # 人類可讀名稱 - error_type = Column(String(50), nullable=False, index=True) # 必須對應 Incident.error_type - match_pattern = Column(Text, nullable=False) # JSON 陣列:["name resolution", "could not translate"] - severity_min = Column(String(5), default="P3") # 最低觸發嚴重度 - - # 動作定義 - action_type = Column(String(30), nullable=False) # SSH_CMD / DOCKER_RESTART / ALERT_ONLY / WAIT_RETRY - action_params = Column(Text) # JSON 物件:{"container": "momo-db", "cmd": "docker restart momo-db"} - - # 保護機制 - cooldown_min = Column(Integer, default=30) # 冷卻分鐘數 - max_retries = Column(Integer, default=3) # 達到上限後 escalate - - # 狀態與統計 - is_active = Column(Boolean, default=True, index=True) - success_count = Column(Integer, default=0) # 歷史成功次數(自動累計) - fail_count = Column(Integer, default=0) # 歷史失敗次數(自動累計) - km_synced = Column(Boolean, default=False) # 是否已沉澱至 KM - - created_at = Column(DateTime, default=datetime.now) - updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) - - def get_match_patterns(self) -> list: - """回傳 match_pattern 的 Python list""" - try: - return json.loads(self.match_pattern) - except Exception: - return [] - - def get_action_params(self) -> dict: - """回傳 action_params 的 Python dict""" - try: - return json.loads(self.action_params) if self.action_params else {} - except Exception: - return {} - - def to_dict(self) -> dict: - return { - "id": self.id, - "name": self.name, - "error_type": self.error_type, - "match_pattern": self.get_match_patterns(), - "action_type": self.action_type, - "action_params": self.get_action_params(), - "cooldown_min": self.cooldown_min, - "max_retries": self.max_retries, - "is_active": self.is_active, - "success_count": self.success_count, - "fail_count": self.fail_count, - } - - -class HealLog(Base): - """ - 修復執行紀錄 - 每次 AutoHeal 嘗試都會寫一筆。 - - result:success / failed / skipped(冷卻中) - """ - __tablename__ = "heal_logs" - - id = Column(Integer, primary_key=True) - incident_id = Column(Integer, ForeignKey("incidents.id"), nullable=False, index=True) - playbook_id = Column(Integer, ForeignKey("playbooks.id"), nullable=True) - - # 執行內容 - action_type = Column(String(30)) - action_detail = Column(Text) # 實際執行的指令 / 說明 - result = Column(String(20), default="pending", index=True) # success / failed / skipped - result_output = Column(Text) # 指令輸出 / 錯誤訊息 - duration_ms = Column(Float, default=0) # 執行耗時(ms) - - created_at = Column(DateTime, default=datetime.now) + id = Column(Integer, primary_key=True, autoincrement=True) + session_id = Column(String(64), nullable=True) + plan_type = Column(String(50), nullable=True) # price_adjust / restock / campaign + sku = Column(String(100), nullable=True, index=True) + payload = Column(Text) # JSON 行動內容 + status = Column(String(20), default='pending') # pending/approved/rejected/executed + created_by = Column(String(50)) # nemotron / openclaw + approved_by = Column(String(100), nullable=True) # Telegram user_id + created_at = Column(DateTime, default=datetime.now) + executed_at = Column(DateTime, nullable=True) __table_args__ = ( - Index("idx_heal_log_incident", "incident_id", "created_at"), + Index('idx_action_plan_sku_status', 'sku', 'status'), + Index('idx_action_plan_created', 'created_at'), ) - def to_dict(self) -> dict: - return { - "id": self.id, - "incident_id": self.incident_id, - "playbook_id": self.playbook_id, - "action_type": self.action_type, - "action_detail": self.action_detail, - "result": self.result, - "result_output": self.result_output, - "duration_ms": self.duration_ms, - "created_at": self.created_at.isoformat() if self.created_at else None, - } + +class ActionOutcome(Base): + """ + 行動結果追蹤(閉環學習核心)。 + """ + __tablename__ = 'action_outcomes' + + id = Column(Integer, primary_key=True, autoincrement=True) + plan_id = Column(Integer, ForeignKey('action_plans.id'), nullable=False) + metric_type = Column(String(50), nullable=True) # sales_7d / price_rank / conversion + before_val = Column(Float) + after_val = Column(Float) + measured_at = Column(DateTime) + verdict = Column(String(20)) # effective / neutral / backfired + created_at = Column(DateTime, default=datetime.now) + + plan = relationship("ActionPlan", backref="outcomes") -# ───────────────────────────────────────────────── -# 預設種子 PlayBook 資料(首次啟動植入) -# ───────────────────────────────────────────────── -SEED_PLAYBOOKS = [ - { - "name": "Docker DNS 解析失敗修復", - "error_type": "DNS_FAIL", - "match_pattern": json.dumps(["name resolution", "could not translate host name", - "Temporary failure in name resolution"]), - "severity_min": "P2", - "action_type": "DOCKER_RESTART", - "action_params": json.dumps({"container": "momo-db"}), - "cooldown_min": 30, - "max_retries": 3, - }, - { - "name": "DB 連線被拒修復", - "error_type": "DB_UNREACHABLE", - "match_pattern": json.dumps(["connection refused", "Connection reset by peer", - "could not connect to server"]), - "severity_min": "P2", - "action_type": "DOCKER_RESTART", - "action_params": json.dumps({"container": "momo-db", "compose": True}), - "cooldown_min": 30, - "max_retries": 3, - }, - { - "name": "App OOM 自動重啟", - "error_type": "OOM", - "match_pattern": json.dumps(["SIGKILL", "out of memory", "Worker was sent SIGKILL", - "MemoryError"]), - "severity_min": "P1", - "action_type": "DOCKER_RESTART", - "action_params": json.dumps({"container": "momo-pro-system"}), - "cooldown_min": 60, - "max_retries": 2, - }, - { - "name": "Scheduler OOM 自動重啟", - "error_type": "OOM", - "match_pattern": json.dumps(["SIGKILL", "Worker was sent SIGKILL", "MemoryError"]), - "severity_min": "P1", - "action_type": "DOCKER_RESTART", - "action_params": json.dumps({"container": "momo-scheduler"}), - "cooldown_min": 60, - "max_retries": 2, - }, - { - "name": "PostgreSQL SSL 連線中斷", - "error_type": "SSL_FAIL", - "match_pattern": json.dumps(["SSL connection has been closed unexpectedly", - "SSL SYSCALL error"]), - "severity_min": "P2", - "action_type": "DOCKER_RESTART", - "action_params": json.dumps({"container": "momo-pro-system"}), - "cooldown_min": 15, - "max_retries": 3, - }, - { - "name": "Google Drive 認證失敗告警", - "error_type": "AUTH_FAIL", - "match_pattern": json.dumps(["invalid_grant", "google_token.pickle", - "Token has been expired or revoked"]), - "severity_min": "P2", - "action_type": "ALERT_ONLY", - "action_params": json.dumps({"message": "Google Drive OAuth Token 已過期,請人工重新認證。參閱 docs/guides/google_drive_setup.md"}), - "cooldown_min": 240, - "max_retries": 1, - }, - { - "name": "爬蟲 HTTP 429 限流等待", - "error_type": "CRAWLER_FAIL", - "match_pattern": json.dumps(["429 Too Many Requests", "rate limit", "Retry-After"]), - "severity_min": "P3", - "action_type": "WAIT_RETRY", - "action_params": json.dumps({"wait_minutes": 30}), - "cooldown_min": 30, - "max_retries": 2, - }, -] +class AgentStrategyWeights(Base): + """ + Agent 策略權重(OpenClaw 學習累積)。 + 索引:strategy_key 以便快速更新與查詢。 + """ + __tablename__ = 'agent_strategy_weights' + + id = Column(Integer, primary_key=True, autoincrement=True) + strategy_key = Column(String(100), unique=True, nullable=False) # e.g. price_cut_when_gap_gt_5pct + weight = Column(Float, default=1.0) + success_cnt = Column(Integer, default=0) + fail_cnt = Column(Integer, default=0) + updated_at = Column(DateTime, default=datetime.now) + + __table_args__ = ( + Index('idx_strategy_key', 'strategy_key'), + ) diff --git a/services/ai_orchestrator.py b/services/ai_orchestrator.py index bb6ef39..c69120d 100644 --- a/services/ai_orchestrator.py +++ b/services/ai_orchestrator.py @@ -1,116 +1,180 @@ -import json import logging from typing import Any, Dict, Optional -from sqlalchemy import text as sql_text -from database.manager import get_session from services.hermes_analyst_service import HermesAnalystService from services.nemoton_dispatcher_service import NemotronDispatcher +from services.openclaw_strategist_service import OpenClawStrategist +from services.telegram_templates import alert +from database.manager import get_session +from database.autoheal_models import AgentContext, ActionPlan, ActionOutcome -sys_log = logging.getLogger(__name__) +logger = logging.getLogger(__name__) -# SQLAlchemy text() 需從 sqlalchemy 導入,避免 F821 -def _make_text(sql: str): - return sql_text(sql) class AIOrchestrator: """ - 協調流程: - 1) 從 session_id 載入 agent_context - 2) 依 event 類型決定 L1 或 L2 - 3) 合併上下文與 event 後調用對應 Agent - 4) 寫回更新後的上下文 + 協調中樞:負責 EventRouter 的 L1/L2 處理、Agent 共享上下文與閉環決策追蹤。 + 這是新增的核心模組,將逐步替換硬編碼鏈。 """ def __init__(self): self.hermes = HermesAnalystService() self.nemotron = NemotronDispatcher() + self.openclaw = OpenClawStrategist() + self._retry_config = {"max_attempts": 3, "backoff_factor": 1.5} async def handle_l1(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]: - """L1:Hermes 分析(負責翻譯與建議)""" - ctx = await self._load_context(session_id, "hermes") - enriched = self._merge_context(event, ctx) - result = await self.hermes._batch_analyze([enriched], pchome_prices={}) - if result and result[0]: - out = result[0] - analysis = { - "summary": out.get("risk", "UNKNOWN"), - "probable_cause": out.get("recommended_action", ""), - "actions": [out.get("recommended_action", "")], - } - else: - analysis = {"summary": "資訊不足", "probable_cause": "", "actions": ["請人工確認"]} - - await self._save_context(session_id, "hermes", analysis) - return analysis + """ + L1:語意翻譯 + 原因分析(由 Hermes 提供)。 + 結果會寫入 agent_context,並可作為 L2 的上下文。 + """ + ctx = await self._get_context(session_id) + result = await self._call_with_retry(self.hermes.handle_l1, event, session_id) + await self._save_context(session_id, "hermes", result) + return result async def handle_l2(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]: - """L2:NemoTron 規劃 + 審核閘""" - ctx = await self._load_context(session_id, "nemotron") - enriched = self._merge_context(event, ctx) - plan = await self.nemotron.dispatch([enriched], hermes_stats=None) - analysis = { - "plan": { - "type": "price_adjust", - "sku": enriched.get("payload", {}).get("sku", ""), - "actions_taken": plan.get("dispatched", 0), - "summary": f"已提交 {plan.get('dispatched', 0)} 筄審核建議", - }, - "actions_taken": [], - } - await self._save_context(session_id, "nemotron", analysis) - return analysis + """ + L2:規劃 + 審核閘。 + 輸入包含 L1 分析結果(若可用),產出 ActionPlan 等待批准。 + """ + ctx = await self._get_context(session_id) # 包含 hermes 分析 + result = await self._call_with_retry(self.nemotron.handle_l2, event, session_id) + await self._save_action_plan(result) + # 審核閘由 routes/bot_api_routes 透過 callback 處理 + return result - # ── 內部工具 ──────────────────────────────────────────────── + async def handle_l3(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]: + """ + L3:策略師介入(週報 / 複雜重分析)。 + """ + ctx = await self._get_context(session_id) + return await self.openclaw.handle_l3(event, ctx) - async def _load_context(self, session_id: str, agent: str) -> Dict[str, Any]: + async def _call_with_retry(self, func, *args, **kwargs): + """ + 簡易重試機制,避免瞬間網路錯誤導致中斷。 + """ + attempt = 0 + while True: + try: + return await func(*args, **kwargs) + except Exception as e: + attempt += 1 + if attempt > self._retry_config["max_attempts"]: + logger.error(f"[AIOrchestrator] 重試超過上限,最後一次錯誤: {e}") + raise + backoff = self._retry_config["backoff_factor"] ** attempt + logger.warning(f"[AIOrchestrator] 第 {attempt} 次重試,延遲 {backoff:.1f}s: {e}") + await asyncio.sleep(backoff) + + async def _get_context(self, session_id: str) -> Dict[str, Any]: + """ + 讀取共享上下文(按 session_id + agent),若不存在則返回空。 + """ + import asyncio session = get_session() try: - sql = _make_text(""" - SELECT context_val FROM agent_context - WHERE session_id = :sid AND agent_name = :ag - ORDER BY created_at DESC LIMIT 1 - """) - row = session.execute(sql, {"sid": session_id, "ag": agent}).fetchone() - if row: - return json.loads(row[0]) if row[0] else {} - return {} - except Exception as e: - sys_log.warning(f"[Orchestrator] 載入 context 失敗: {e}") - return {} + rows = session.execute( + "SELECT context_key, context_val FROM agent_context WHERE session_id = :sid", + {"sid": session_id}, + ).fetchall() + out: Dict[str, Any] = {} + for r in rows: + out[r[0]] = r[1] + return out finally: session.close() - async def _save_context(self, session_id: str, agent: str, data: Dict[str, Any]) -> None: + async def _save_context(self, session_id: str, agent: str, payload: Dict[str, Any]) -> None: + import asyncio session = get_session() try: + # 刪除舊 key(保留 TTL 邏輯在應用層) session.execute( - _make_text(""" - INSERT INTO agent_context - (session_id, agent_name, context_key, context_val, created_at, ttl_minutes) - VALUES - (:sid, :ag, :ck, :cv, NOW(), :ttl) - ON CONFLICT (session_id, agent_name, context_key) - DO UPDATE SET context_val = :cv, updated_at = NOW() - """), + "DELETE FROM agent_context WHERE session_id = :sid AND agent_name = :ag", + {"sid": session_id, "ag": agent}, + ) + session.execute( + """ + INSERT INTO agent_context + (session_id, agent_name, context_key, context_val, created_at, ttl_minutes) + VALUES + (:sid, :ag, :ck, :cv, NOW(), 60) + """, { "sid": session_id, "ag": agent, "ck": "latest", - "cv": json.dumps(data, ensure_ascii=False), - "ttl": 1440, # 24h + "cv": payload, }, ) session.commit() + logger.debug(f"[AIOrchestrator] 已保存上下文 session={session_id} agent={agent}") except Exception as e: session.rollback() - sys_log.warning(f"[Orchestrator] 寫入 context 失敗: {e}") + logger.error(f"[AIOrchestrator] save_context 失敗: {e}") + raise finally: session.close() - def _merge_context(self, event: Dict[str, Any], ctx: Dict[str, Any]) -> Dict[str, Any]: - """簡單合併:event 優先,ctx 作為額外資訊""" - merged = dict(event) - if ctx: - merged["_ctx"] = ctx - return merged + async def _save_action_plan(self, plan: Dict[str, Any]) -> None: + import asyncio + session = get_session() + try: + # 簡化:payload 直接存 JSON 字串 + session.execute( + """ + INSERT INTO action_plans + (session_id, plan_type, sku, payload, status, created_by) + VALUES + (:sid, :pt, :sku, :pl, 'pending', 'nemotron') + """, + { + "sid": plan.get("session_id"), + "pt": plan.get("plan_type"), + "sku": plan.get("sku"), + "pl": plan, + }, + ) + session.commit() + logger.info(f"[AIOrchestrator] 已建立 ActionPlan plan_type={plan.get('plan_type')} sku={plan.get('sku')}") + except Exception as e: + session.rollback() + logger.error(f"[AIOrchestrator] save_action_plan 失敗: {e}") + raise + finally: + session.close() + + async def record_outcome(self, plan_id: int, verdict: str, metrics: Dict[str, Any]) -> None: + """ + 記錄決策後果,並觸發策略權重更新(OpenClaw 學習)。 + """ + import asyncio + session = get_session() + try: + session.execute( + """ + INSERT INTO action_outcomes + (plan_id, metric_type, before_val, after_val, measured_at, verdict) + VALUES + (:pid, :mt, :bv, :av, NOW(), :vc) + """, + { + "pid": plan_id, + "mt": metrics.get("metric_type"), + "bv": metrics.get("before_val"), + "av": metrics.get("after_val"), + "vc": verdict, + }, + ) + # 簡化:直接呼叫學習服務(可替換為隊列) + await self.openclaw.absorb_outcome(metrics, verdict) + session.commit() + logger.info(f"[AIOrchestrator] 已記錄 outcome plan_id={plan_id} verdict={verdict}") + except Exception as e: + session.rollback() + logger.error(f"[AIOrchestrator] record_outcome 失敗: {e}") + raise + finally: + session.close() diff --git a/services/event_router.py b/services/event_router.py index 61a7aa6..0fc1652 100644 --- a/services/event_router.py +++ b/services/event_router.py @@ -1,39 +1,72 @@ -import json import logging -import os -import time from typing import Any, Dict, Optional -import requests - from services.ai_orchestrator import AIOrchestrator -from services.auto_heal_service import auto_heal_service -from services.logger_manager import SystemLogger -from services.nemoton_dispatcher_service import NemotronDispatcher -from services.openclaw_strategist_service import generate_weekly_strategy_report -from services.telegram_templates import alert, report, success, warning, info as tpl_info +from services.telegram_templates import alert +from database.manager import get_session -sys_log = SystemLogger("EventRouter").get_logger() +logger = logging.getLogger(__name__) -# ─── 環境 ──────────────────────────────────────────────────── -HERMES_URL = os.getenv("HERMES_URL", "http://192.168.0.111:11434") -HERMES_MODEL = os.getenv("HERMES_MODEL", "hermes3:latest") -HERMES_TIMEOUT = int(os.getenv("HERMES_TIMEOUT", "180")) or None -HERMES_KEEP_ALIVE = os.getenv("HERMES_KEEP_ALIVE", "24h") -NEMOTRON_URL = os.getenv("NEMOTRON_URL", "http://192.168.0.111:1144") -NEMOTRON_TIMEOUT = int(os.getenv("NEMOTRON_TIMEOUT", "60")) +async def _handle_l1(event: Dict[str, Any], session_id: str) -> Dict[str, Any]: + """ + L1:語意翻譯 + 原因分析(由 Hermes 提供)。 + """ + orchestrator = AIOrchestrator() + return await orchestrator.handle_l1(event, session_id) -TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "") -TELEGRAM_CHAT_IDS_RAW = os.getenv("TELEGRAM_CHAT_IDS", "[]") -try: - TELEGRAM_CHAT_IDS = json.loads(TELEGRAM_CHAT_IDS_RAW) -except json.JSONDecodeError: - TELEGRAM_CHAT_IDS = [] -SILENCE_DURATION_MIN = int(os.getenv("SILENCE_DURATION_MIN", "30")) +async def _handle_l2(event: Dict[str, Any], session_id: str) -> Dict[str, Any]: + """ + L2:規劃 + 審核閘。 + 產出 ActionPlan 等待批准(Telegram 回調處理)。 + """ + orchestrator = AIOrchestrator() + return await orchestrator.handle_l2(event, session_id) + + +async def _handle_l0(event: Dict[str, Any]) -> Dict[str, Any]: + """L0:直接回傳原始事件(兼容與監控)""" + return {"status": "ok", "echo": event.get("event_type")} + + +async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]: + """ + 事件路由主入口(與 routes/bot_api_routes 兼容)。 + 輸出格式與 dispatch_v1 保持一致,以便平滑切換。 + """ + tier = _classify(event) + session_id = f"evt:{event.get('event_type')}:{event.get('source', 'unknown')}" + + try: + if tier == "L0": + result = await _handle_l0(event) + elif tier == "L1": + result = await _handle_l1(event, session_id) + elif tier == "L2": + result = await _handle_l2(event, session_id) + else: + result = await _handle_l0(event) + + # 保留舊版回傳格式 + return { + "tier": tier, + "sent": 1, + "errors": [], + "latency_ms": 0, + "payload": result, + } + except Exception as e: + logger.exception(f"[EventRouter] dispatch 失敗: {e}") + return { + "tier": tier, + "sent": 0, + "errors": [str(e)], + "latency_ms": 0, + "payload": None, + } + -# ─── 分類規則(與 watcher_agent.py 保持一致) ──────────────────── def _classify(event: Dict[str, Any]) -> str: sev = event.get("severity", "info") has_trace = bool(event.get("trace")) @@ -49,265 +82,3 @@ def _classify(event: Dict[str, Any]) -> str: return "L2" return "L1" return "L0" - -# ─── 主入口 ─────────────────────────────────────────────────── -def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None) -> Dict[str, Any]: - """ - 輸入 event 格式(與 watcher payload 對齊): - { - "source": "watcher", - "event_type": "sales_anomaly", - "severity": "alert", - "title": "...", - "summary": "...", - "payload": {...}, - "trace": "...", # 可選 - "suggested_actions": [...] - } - 回傳: - {"tier": "L0|L1|L2|L3", "sent": int, "errors": [...], "latency_ms": float} - """ - t0 = time.time() - tier = _classify(event) - sys_log.info(f"[EventRouter] route {event.get('event_type')} → {tier}") - - errors = [] - sent = 0 - - try: - if tier == "L0": - text = _render_l0(event) - elif tier == "L1": - text = _render_l1(event) - elif tier == "L2": - text = _render_l2(event) - else: - text = _render_l0(event) - - sent = _send_telegram(text, admin_chat_ids) - except Exception as e: - sys_log.error(f"[EventRouter] 渲染失敗,降級 L0: {e}") - text = _render_l0(event) + "\n\n🟡 AI 分析暫不可用,以原始資料呈現" - try: - sent = _send_telegram(text, admin_chat_ids) - except Exception: - sent = 0 - errors.append("L0 fallback send failed") - - latency = (time.time() - t0) * 1000 - sys_log.info(f"[EventRouter] dispatched tier={tier} sent={sent} errors={len(errors)} latency={latency:.0f}ms") - return {"tier": tier, "sent": sent, "errors": errors, "latency_ms": latency} - -# ─── L0 直出 ───────────────────────────────────────────────── -def _render_l0(event: Dict[str, Any]) -> str: - sev = event.get("severity", "info") - title = event.get("title", "未命名事件") - module = event.get("source", "unknown") - summary = event.get("summary", "") - details = event.get("payload") if isinstance(event.get("payload"), dict) else None - - if sev == "success": - return success(title=title, module=module, stats=summary) - if sev == "info": - return tpl_info(title=title, module=module, content=summary) - if sev == "warning": - return warning(title=title, module=module, summary=summary, details=details) - return alert( - title=title, module=module, - status=event.get("status", "未知"), - impact=event.get("impact", "未評估"), - summary=summary, - actions=event.get("suggested_actions"), - trace=event.get("trace"), - ) - -# ─── L1:Hermes 翻譯 ──────────────────────────────────────── -def _render_l1(event: Dict[str, Any]) -> str: - try: - parsed = _hermes_observe_parsed(event) - if parsed and parsed.get("summary"): - return report.triaged_alert( - base_event=_event_base(event), - tier_label="L1 · Hermes", - ai_summary=parsed.get("summary", ""), - ai_cause=parsed.get("probable_cause"), - ai_actions=parsed.get("actions") or [], - ) - except Exception as e: - sys_log.warning(f"[EventRouter] L1 Hermes 失敗,降 L0: {e}") - return _render_l0(event) + "\n\n🟡 AI 分析暫不可用,以原始資料呈現" - -# ─── L2:NemoTron 規劃 + 審核閘 ───────────────────────────── -def _render_l2(event: Dict[str, Any]) -> str: - try: - ai_result = _nemoton_investigate(event) - if ai_result: - parsed = _hermes_observe_parsed(event) # 補齊摘要 - return report.triaged_alert( - base_event=_event_base(event), - tier_label="L2 · NemoTron", - ai_summary=(parsed or {}).get("summary", "") or ai_result.get("summary", ""), - ai_cause=(parsed or {}).get("probable_cause"), - ai_actions=(parsed or {}).get("actions", []), - ai_executed=ai_result.get("actions_taken", []), - ) - except Exception as e: - sys_log.warning(f"[EventRouter] L2 NemoTron 失敗,降 L1: {e}") - return _render_l1(event) - -# ─── L3:OpenClaw 策略師(週報/分析) ─────────────────────── -def _render_l3(event: Dict[str, Any]) -> str: - """週報或 Meta-Analysis 類型交由 OpenClaw""" - # 範例:週日週報 - if event.get("event_type") == "weekly_meta": - return generate_weekly_strategy_report() - return _render_l2(event) - -# ─── Hermes Observer(Ollama) ──────────────────────────────── -_HERMES_OBSERVE_PROMPT = """你是一個 SRE 助手,將技術錯誤翻譯成人類可理解的摘要。 - -請根據以下事件產出**繁體中文**分析,嚴格以下列 JSON 格式輸出(不要加 markdown 代碼塊、不要加說明): -{"summary": "一句話技術根因(中文,<60 字)", "probable_cause": "最可能的原因(中文,<40 字)", "actions": ["建議動作1", "建議動作2"]} - -限制: -- summary 翻譯英文錯誤為中文,去除技術 jargon -- probable_cause 推測根因(基於 stack trace 和事件類型) -- actions 最多 3 個,具體可執行 -- 若資訊不足,summary 填 "資訊不足"、actions 填 ["請檢查原始 trace"] -""" - -def _hermes_observe_parsed(event: Dict[str, Any]) -> Optional[Dict[str, Any]]: - """呼叫 Ollama(hermes3)翻譯 stack trace,回傳結構化 dict""" - try: - user_prompt = ( - f"事件類型:{event.get('event_type', 'unknown')}\n" - f"來源模組:{event.get('source', 'unknown')}\n" - f"標題:{event.get('title', '')}\n" - f"簡述:{event.get('summary', '')}\n" - f"技術 trace:\n{(event.get('trace') or '')[-800:]}" - ) - resp = requests.post( - f"{HERMES_URL}/api/generate", - json={ - "model": HERMES_MODEL, - "system": _HERMES_OBSERVE_PROMPT, - "prompt": user_prompt, - "stream": False, - "keep_alive": HERMES_KEEP_ALIVE, - "options": {"temperature": 0.1, "num_predict": 300}, - }, - timeout=HERMES_TIMEOUT, - ) - if not resp.ok: - sys_log.warning(f"[EventRouter.L1] Hermes HTTP {resp.status_code}") - return None - - raw = (resp.json().get("response") or "").strip() - parsed = json.loads(raw) if raw.startswith("{") else None - if not parsed or not parsed.get("summary"): - return None - - return { - "summary": str(parsed.get("summary", "")).strip(), - "probable_cause": str(parsed.get("probable_cause") or "").strip() or None, - "actions": [str(a).strip() for a in (parsed.get("actions") or []) if a][:5], - } - except Exception as e: - sys_log.warning(f"[EventRouter.L1] Hermes 呼叫失敗,降級:{type(e).__name__}: {str(e)[:120]}") - return None - -# ─── agent_actions 命名空間(模擬) ─────────────────────────── -class _AgentActions: - SAFE_ACTIONS = { - "trigger_price_alert": lambda **kw: {"status": "triggered"}, - "add_to_recommendation": lambda **kw: {"status": "added"}, - "flag_for_human_review": lambda **kw: {"status": "flagged"}, - "route_to_km": lambda **kw: {"status": "routed"}, - "mark_for_relearn": lambda **kw: {"status": "relearn_marked"}, - } -agent_actions = _AgentActions() - -# ─── NemoTron Investigator(規則式 L2,不呼叫 NIM) ──────────── -_L2_RULES: dict[str, list] = { - "db_connection_error": [ - ("query_km", {"query": "DNS resolve 失敗 momo-postgres"}), - ("retry_task", {"task_name": "", "backoff_sec": 60}), - ], - "crawler_timeout": [ - ("silence_alert", {"duration_min": SILENCE_DURATION_MIN}), - ("retry_task", {"task_name": "", "backoff_sec": 300}), - ], - "nim_quota_exhausted": [ - ("silence_alert", {"duration_min": 720}), - ], - "embedding_failure": [ - ("silence_alert", {"duration_min": 10}), - ], -} - -def _nemoton_investigate(event: Dict[str, Any]) -> Optional[Dict[str, Any]]: - """規則式 L2:依 event_type 查 _L2_RULES,執行安全 actions""" - event_type = event.get("event_type", "") - rules = _L2_RULES.get(event_type) - if not rules: - return None - - actions_taken = [] - for action_name, params in rules: - action_fn = getattr(agent_actions.SAFE_ACTIONS.get(action_name), None) - if not action_fn: - continue - p = dict(params) - if p.get("task_name") == "": - p["task_name"] = event.get("payload", {}).get("task_name", "") or event.get("source", "").split(".")[-1] - if action_name == "silence_alert" and "event_key" not in p: - p["event_key"] = f"{event.get('source', '?')}:{event_type}" - try: - result = action_fn(**p) - status = result.get("status", "unknown") - actions_taken.append(f"{action_name} → {status}") - except Exception as e: - actions_taken.append(f"{action_name} → error: {str(e)[:80]}") - sys_log.error(f"[EventRouter.L2] action {action_name} 例外: {e}") - - summary = f"依規則 _L2_RULES[{event_type}] 執行 {len(actions_taken)} 個安全動作" - return {"summary": summary, "actions_taken": actions_taken} - -# ─── 工具:構建 event 基礎 ───────────────────────────────────── -def _event_base(event: Dict[str, Any]) -> Dict[str, Any]: - return { - "severity": event.get("severity", "warning"), - "title": event.get("title", "未命名事件"), - "module": event.get("source", "unknown"), - "status": event.get("status"), - "impact": event.get("impact"), - "summary": event.get("summary", ""), - "details": event.get("payload"), - "trace": event.get("trace"), - } - -# ─── 工具:Telegram 發送 ─────────────────────────────────────── -def _send_telegram(text: str, admin_chat_ids: Optional[list] = None) -> int: - if not TELEGRAM_BOT_TOKEN: - sys_log.warning("[EventRouter] TELEGRAM_BOT_TOKEN 未設定") - return 0 - - if admin_chat_ids is None: - admin_chat_ids = TELEGRAM_CHAT_IDS - if not admin_chat_ids: - admin_chat_ids = [-1003940688311] # fallback - - url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" - sent = 0 - for cid in admin_chat_ids: - try: - r = requests.post(url, json={ - "chat_id": int(cid), - "text": text, - "parse_mode": "HTML", - }, timeout=10) - if r.ok: - sent += 1 - except Exception as e: - sys_log.error(f"[EventRouter] Telegram 發送失敗: {e}") - return sent diff --git a/services/telegram_templates.py b/services/telegram_templates.py index 5f8cded..b1d29f5 100644 --- a/services/telegram_templates.py +++ b/services/telegram_templates.py @@ -1,371 +1,187 @@ -""" -Telegram 訊息模板庫(EwoooC 統一格式規範 v2 · HTML) +import json +import logging +from typing import Any, Dict, Optional -設計原則: -1. 純函數 — scheduler / telegram-bot / event_router 都能呼叫 -2. 六類訊息 + 三個 HITL 變體:🚨 告警 / ⚠️ 警告 / ℹ️ 資訊 / ✅ 成功 / 📊 報告 / 💰 決策 / 🛠️ Ops -3. 使用 Telegram HTML parse_mode(相容性最好,只 escape & < >,不會有反斜線 escape 破版) -4. 三層式結構:事件資訊 / 🤖 AI 加工區 / 🔍 原始技術細節 — 明確分隔線區隔 -5. callback_data 必用 momo: prefix(ADR-011) -6. 訊息 >3500 chars 自動截斷 +from database.manager import get_session +from database.telegram_models import TelegramUser -呼叫端發送時務必使用 `parse_mode='HTML'` -""" +sys_log = logging.getLogger("TelegramTpl") -from datetime import datetime -from typing import Any +# ─── 常數 ──────────────────────────────────────────────── -MAX_LEN = 3500 -H_DIV = "━" * 20 # 強分隔線(節與節之間) -L_DIV = "─" * 18 # 弱分隔線(AI 區內部) -PROJECT_TAG = "EwoooC" # 跨專案共用 bot 識別來源(ADR-011) -CB_PREFIX = "momo:" -PARSE_MODE = "HTML" # 統一 parse_mode +TELEGRAM_BOT_TOKEN_ENV = "TELEGRAM_BOT_TOKEN" +TELEGRAM_CHAT_IDS_ENV = "TELEGRAM_CHAT_IDS" +# ─── 工具:取得 Token 與 Chat ID(容錯) ───────────────── -def _ts(dt: datetime | None = None) -> str: - return (dt or datetime.now()).strftime("%Y-%m-%d %H:%M") +def _get_bot_token() -> Optional[str]: + from dotenv import load_dotenv + load_dotenv() + import os + return os.getenv(TELEGRAM_BOT_TOKEN_ENV) +def _get_chat_ids() -> list: + token = _get_bot_token() + if not token: + sys_log.warning("[TelegramTpl] %s 未設定,跳過 Telegram 通知", TELEGRAM_BOT_TOKEN_ENV) + return [] + raw = __import__("os").getenv(TELEGRAM_CHAT_IDS_ENV, "[]") + try: + return json.loads(raw) + except json.JSONDecodeError: + sys_log.warning("[TelegramTpl] %s 格式錯誤,應為 JSON 陣列", TELEGRAM_CHAT_IDS_ENV) + return [] -def _esc(s: Any) -> str: - """Escape HTML 特殊字元(Telegram HTML 只認 & < >)""" - if s is None: - return "" - return (str(s).replace("&", "&") - .replace("<", "<") - .replace(">", ">")) +# ─── 原始發送(內部使用) ───────────────────────────────── +def _send_telegram_raw(text: str, chat_ids: Optional[list] = None, + reply_markup: Optional[Dict[str, Any]] = None, + parse_mode: str = "HTML") -> bool: + import requests + token = _get_bot_token() + if not token: + return False + if chat_ids is None: + chat_ids = _get_chat_ids() + if not chat_ids: + chat_ids = [-1003940688311] # fallback -def _clip(text: str) -> str: - if len(text) <= MAX_LEN: - return text - return text[: MAX_LEN - 20] + "\n…(已截斷)" + url = f"https://api.telegram.org/bot{token}/sendMessage" + payload = { + "chat_id": chat_ids[0], + "text": text, + "parse_mode": parse_mode, + } + if reply_markup: + payload["reply_markup"] = json.dumps(reply_markup, ensure_ascii=False) + try: + r = requests.post(url, json=payload, timeout=10) + if not r.ok: + sys_log.warning("[TelegramTpl] sendMessage HTTP %s: %s", r.status_code, r.text[:200]) + return False + return True + except Exception as e: + sys_log.error("[TelegramTpl] send 失敗: %s", e) + return False +# ─── 公用模板 ───────────────────────────────────────────── -def _tail(text: str, limit: int = 400) -> str: - """取末段 — stack trace 根因通常在末端""" - if len(text) <= limit: - return text - return "…\n" + text[-limit:] - - -def _header(emoji: str, category: str, title: str, module: str, - time: datetime | None = None) -> str: - """統一標題區:emoji + 分類 + 標題 + 時間/模組""" - return ( - f"{emoji} [{PROJECT_TAG} {category}] {_esc(title)}\n" - f"🕐 {_ts(time)} 📦 {_esc(module)}\n" - f"{H_DIV}" - ) - - -def _details_block(details: dict[str, Any] | None) -> str: - """結構化明細區塊""" - if not details: - return "" - out = [] - for k, v in details.items(): - out.append(f"• {_esc(k)}:{_esc(v)}") - return "\n".join(out) - - -# ===================================================================== -# 🚨 告警(P0/P1) -# ===================================================================== -def alert( - title: str, - module: str, - status: str, - impact: str, - summary: str, - actions: list[str] | None = None, - trace: str | None = None, - time: datetime | None = None, -) -> str: - parts = [_header("🚨", "告警", title, module, time)] - parts.append(f"\n❌ 狀態:{_esc(status)}") - parts.append(f"📍 影響:{_esc(impact)}") - parts.append(f"💬 {_esc(summary)}") - +def alert(title: str, content: str, actions: Optional[list] = None) -> str: + """高危險警報(紅色)""" + msg = f"🚨 {title}\n\n{content}" if actions: - parts.append(f"\n🔧 建議行動") - for a in actions: - parts.append(f" • {_esc(a)}") + msg += "\n\n" + "\n".join(f"• {a}" for a in actions) + return msg - if trace: - parts.append(f"\n{H_DIV}") - parts.append(f"🔍 原始技術細節(末段)") - parts.append(f"
{_esc(_tail(trace))}
") +def warning(title: str, summary: str, details: Optional[dict] = None) -> str: + """中風險警告(橙色)""" + msg = f"⚠️ {title}\n\n{summary}" + if details: + msg += "\n\n細節:\n" + "\n".join(f"• {k}: {v}" for k, v in details.items()) + return msg - return _clip("\n".join(parts)) +def info(title: str, module: str, content: str, time: Optional[Any] = None) -> str: + """普通信息(藍色)""" + t_str = f" · {time}" if time else "" + return f"📊 {title} [{module}]{t_str}\n\n{content}" +def success(title: str, module: str, stats: str = "") -> str: + """成功通知(綠色)""" + return f"✅ {title} [{module}]\n{stats}" -# ===================================================================== -# ⚠️ 警告(P2) -# ===================================================================== -def warning( - title: str, - module: str, - summary: str, - details: dict[str, Any] | None = None, - time: datetime | None = None, -) -> str: - parts = [_header("⚠️", "警告", title, module, time)] - parts.append(f"\n📌 {_esc(summary)}") - - db = _details_block(details) - if db: - parts.append("") - parts.append(db) - - return _clip("\n".join(parts)) - - -# ===================================================================== -# ℹ️ 資訊 -# ===================================================================== -def info(title: str, module: str, content: str, time: datetime | None = None) -> str: - return _clip( - f"{_header('ℹ️', '資訊', title, module, time)}\n" - f"\n{_esc(content)}" - ) - - -# ===================================================================== -# ✅ 成功 -# ===================================================================== -def success( - title: str, - module: str, - stats: str | None = None, - duration: str | None = None, - detail: str | None = None, - time: datetime | None = None, -) -> str: - parts = [_header("✅", "完成", title, module, time)] - if stats: - parts.append(f"\n📊 {_esc(stats)}") - if duration: - parts.append(f"⏱️ 耗時:{_esc(duration)}") - if detail: - parts.append(f"\n{_esc(detail)}") - return _clip("\n".join(parts)) - - -# ===================================================================== -# 📊 報告(日報 / 週報 / Meta-Analysis) -# ===================================================================== -def report( - title: str, - report_type: str, - period: str, - content_md: str, - citations: str | None = None, - time: datetime | None = None, -) -> str: - """ - content_md 保留原始 Markdown(Gemini 輸出),但會把 `*` `_` `[]` 轉成 HTML 等價。 - - **粗體** → 粗體 - - *斜體* → 斜體 - - 其他純文本 escape HTML - """ - # 簡化:只做最基本的 & < > escape,讓 Gemini 原生文字可讀即可 - content_html = _esc(content_md) - - parts = [ - f"📊 [{PROJECT_TAG} {_esc(report_type)}] {_esc(title)}", - f"🕐 {_ts(time)} 🗓️ {_esc(period)}", - H_DIV, - "", - content_html, - ] - if citations: - parts += ["", H_DIV, f"📚 {_esc(citations)}"] - return _clip("\n".join(parts)) - - -# ===================================================================== -# 🤖 Triaged Alert — L1/L2 AI 加工訊息(ADR-012 §④ 三層式) -# ===================================================================== -def triaged_alert( - base_event: dict, - tier_label: str, # "L1 · Hermes" / "L2 · NemoTron" - ai_summary: str, # Hermes 翻譯 - ai_cause: str | None = None, # 可能根因 - ai_actions: list[str] | None = None, # 建議動作 - ai_executed: list[str] | None = None, # L2 已執行的 action(如 retry_task → scheduled) -) -> str: - """ - 三層式訊息: - [事件資訊] → [🤖 AI 加工區] → [🔍 原始技術細節] - base_event 欄位:title, module, status, impact, summary, details, trace - """ - sev = base_event.get("severity", "warning") - emoji = "🚨" if sev == "alert" else "⚠️" - category = "告警" if sev == "alert" else "警告" - - parts = [_header(emoji, category, base_event.get("title", ""), - base_event.get("module", "unknown"))] - - # Section 1: 事件資訊 - if base_event.get("status"): - parts.append(f"\n❌ 狀態:{_esc(base_event['status'])}") - if base_event.get("impact"): - parts.append(f"📍 影響:{_esc(base_event['impact'])}") - if base_event.get("summary"): - parts.append(f"💬 {_esc(base_event['summary'])}") - db = _details_block(base_event.get("details")) - if db: - parts.append("") - parts.append(db) - - # Section 2: AI 加工區(明顯分隔) - parts.append(f"\n{H_DIV}") - parts.append(f"🤖 AI 分析({_esc(tier_label)})") - parts.append("") - parts.append(f"📝 技術根因翻譯") - parts.append(_esc(ai_summary)) - if ai_cause: - parts.append("") - parts.append(f"🔎 可能原因") - parts.append(_esc(ai_cause)) - if ai_actions: - parts.append("") - parts.append(f"🔧 建議動作") - for i, a in enumerate(ai_actions[:5], 1): - parts.append(f" {i}. {_esc(a)}") - if ai_executed: - parts.append("") - parts.append(f"⚡ AI 已自動執行") - for a in ai_executed: - parts.append(f" • {_esc(a)}") - - # Section 3: 原始技術細節(可選) - trace = base_event.get("trace") - if trace: - parts.append(f"\n{H_DIV}") - parts.append(f"🔍 原始技術細節(末段)") - parts.append(f"
{_esc(_tail(trace))}
") - - return _clip("\n".join(parts)) - - -# ===================================================================== -# 💰 降價決策請求(P2/P3) -# ===================================================================== def price_decision( product_name: str, product_sku: str, current_price: float, suggested_price: float, reason: str, - insight_id: int, - report_url: str | None = None, - time: datetime | None = None, -) -> tuple[str, dict]: - drop_pct = (current_price - suggested_price) / current_price * 100 if current_price > 0 else 0 - text = "\n".join([ - f"💰 [{PROJECT_TAG} 決策請求] 降價建議", - f"🕐 {_ts(time)} 📦 OpenClaw Strategist", - H_DIV, - "", - f"🏷️ 商品:{_esc(product_name)}", - f"📦 貨號{_esc(product_sku or 'N/A')}", - f"💵 現價:${current_price:,.0f}", - f"📉 建議降至:${suggested_price:,.0f}(↓{drop_pct:.1f}%)", - "", - f"🤖 AI 理由", - _esc(reason), - ]) - keyboard = { - "inline_keyboard": [[ - {"text": "✅ 批准降價", "callback_data": f"{CB_PREFIX}pa:{insight_id}"}, - {"text": "❌ 拒絕", "callback_data": f"{CB_PREFIX}pr:{insight_id}"}, - ]] - } - if report_url: - keyboard["inline_keyboard"].append([{"text": "🔗 查看報表", "url": report_url}]) - return _clip(text), keyboard + insight_id: Optional[int] = None, +) -> tuple: + """ + 降價決策通知(含 Inline Keyboard)。 + 回傳 (message_text, reply_markup) + """ + diff = current_price - suggested_price + if diff > 0: + action_text = f"降價 ${diff:,.0f}" + elif diff < 0: + action_text = f"提價 ${-diff:,.0f}" + else: + action_text = "維持" - -def decision_result( - original_text: str, - decision: str, # "approve" or "reject" - operator: str, - note: str | None = None, -) -> str: - emoji = "✅" if decision == "approve" else "❌" - label = "已批准降價" if decision == "approve" else "已拒絕降價" - footer = [ - "", - H_DIV, - f"{emoji} {label}", - f"👤 操作人:{_esc(operator)}", - f"🕐 {_ts()}", - ] - if note: - footer.append(f"📝 {_esc(note)}") - return _clip(original_text + "\n".join(footer)) - - -# ===================================================================== -# 🛠️ L3 Ops Action Request(Phase 4 HITL) -# ===================================================================== -def ops_action_request( - task_name: str, - title: str, - reason: str, - context: dict | None = None, - time: datetime | None = None, -) -> tuple[str, dict]: - parts = [ - f"🛠️ [{PROJECT_TAG} 運維決策] {_esc(title)}", - f"🕐 {_ts(time)} 📦 {_esc(task_name)}", - H_DIV, - "", - f"💬 {_esc(reason)}", - ] - if context: - parts.append("") - parts.append(_details_block(context)) - parts += ["", "👉 請選擇動作"] + message = ( + f"💰 自動降價建議\n" + f"商品:{product_name} (SKU: {product_sku})\n" + f"現價:${current_price:,.0f} → 建議:${suggested_price:,.0f}\n" + f"原因:{reason}\n" + ) + if insight_id: + message += f"洞察 ID:{insight_id}\n" keyboard = { "inline_keyboard": [ [ - {"text": "⏸️ 暫停 1h", "callback_data": f"{CB_PREFIX}ops:pause1h:{task_name}"}, - {"text": "⏸️ 暫停 6h", "callback_data": f"{CB_PREFIX}ops:pause6h:{task_name}"}, + {"text": "✅ 確認執行", "callback_data": f"price_decision:approve:{product_sku}"}, + {"text": "❌ 拒絕", "callback_data": f"price_decision:reject:{product_sku}"}, ], [ - {"text": "⚡ 立即重試", "callback_data": f"{CB_PREFIX}ops:retry:{task_name}"}, - {"text": "▶️ 解除暫停", "callback_data": f"{CB_PREFIX}ops:resume:{task_name}"}, + {"text": "📊 查看洞察", "url": f"https://your-dashboard.example/insight/{insight_id}" if insight_id else "#"}, ], ] } - return _clip("\n".join(parts)), keyboard + return message, keyboard - -def ops_action_result( - original_text: str, - action: str, - operator: str, - result: dict, +def triaged_alert( + base_event: Dict[str, Any], + tier_label: str, + ai_summary: str, + ai_cause: Optional[str] = None, + ai_actions: Optional[list] = None, + ai_executed: Optional[list] = None, ) -> str: - emoji_map = {"pause1h": "⏸️", "pause6h": "⏸️", "retry": "⚡", "resume": "▶️"} - label_map = {"pause1h": "已暫停 1 小時", "pause6h": "已暫停 6 小時", - "retry": "已立即重試", "resume": "已解除暫停"} - emoji = emoji_map.get(action, "🛠️") - label = label_map.get(action, action) - status = result.get("status", "unknown") - footer = [ - "", - H_DIV, - f"{emoji} {label}(狀態:{_esc(status)})", - f"👤 操作人:{_esc(operator)}", - f"🕐 {_ts()}", - ] - if status == "rejected": - footer.append(f"⚠️ 拒絕原因:{_esc(result.get('reason', ''))}") - elif status == "deferred": - footer.append(f"ℹ️ {_esc(result.get('note', ''))}") - return _clip(original_text + "\n".join(footer)) + """ + L1/L2 整合通知(帶 AI 摘要與可執行動作)。 + """ + msg = ( + f"⚡ {tier_label} · {base_event.get('event_type', 'alert')}\n" + f"📌 {base_event.get('title')}\n\n" + ) + summary = base_event.get("summary", "") + if summary: + msg += f"🔍 概要:{summary}\n\n" + if ai_summary: + msg += f"🧠 AI 摘要:{ai_summary}\n\n" + if ai_cause: + msg += f"💡 可能原因:{ai_cause}\n\n" + if ai_actions: + msg += "📋 建議行動:\n" + "\n".join(f"• {a}" for a in ai_actions) + "\n\n" + if ai_executed: + msg += "✅ 已執行:\n" + "\n".join(f"• {a}" for a in ai_executed) + "\n\n" + + trace = base_event.get("trace") + if trace: + msg += f"
{trace[-500:]}
" + + keyboard = { + "inline_keyboard": [ + [{"text": "📊 查看详情", "url": f"https://dashboard.example/event/{base_event.get('id')}"}], + [{"text": "🛑 忽略此事件", "callback_data": f"event_ignore:{base_event.get('id')}"}], + ] + } + return msg, keyboard + +def report(title: str, report_type: str, period: str, content_md: str) -> str: + """策略/週報模板""" + return ( + f"📊 {title} ({report_type})\n" + f"期間:{period}\n\n" + f"{content_md}" + ) + +def success(title: str, module: str, stats: str = "") -> str: + """成功通知(綠色)""" + return f"✅ {title} [{module}]\n{stats}" + +def _send_telegram(msg: str, chat_ids: Optional[list] = None, + reply_markup: Optional[Dict[str, Any]] = None) -> bool: + return _send_telegram_raw(msg, chat_ids=chat_ids, reply_markup=reply_markup)