diff --git a/services/ai_orchestrator.py b/services/ai_orchestrator.py index 704740e..c5e180b 100644 --- a/services/ai_orchestrator.py +++ b/services/ai_orchestrator.py @@ -3,6 +3,8 @@ import asyncio import logging from typing import Any, Dict, Optional +from sqlalchemy import text + from services.hermes_analyst_service import HermesAnalystService from services.nemoton_dispatcher_service import NemotronDispatcher from database.manager import get_session @@ -46,7 +48,7 @@ class AIOrchestrator: session = get_session() try: rows = session.execute( - "SELECT context_key, context_val FROM agent_context WHERE session_id = :sid", + text("SELECT context_key, context_val FROM agent_context WHERE session_id = :sid"), {"sid": session_id}, ).fetchall() return {r[0]: r[1] for r in rows} @@ -57,16 +59,16 @@ class AIOrchestrator: session = get_session() try: session.execute( - "DELETE FROM agent_context WHERE session_id = :sid AND agent_name = :ag", + text("DELETE FROM agent_context WHERE session_id = :sid AND agent_name = :ag"), {"sid": session_id, "ag": agent}, ) session.execute( - """ + text(""" INSERT INTO agent_context (session_id, agent_name, context_key, context_val, created_at, ttl_minutes) VALUES (:sid, :ag, :ck, :cv, NOW(), 60) - """, + """), { "sid": session_id, "ag": agent, @@ -86,12 +88,12 @@ class AIOrchestrator: session = get_session() try: session.execute( - """ + text(""" INSERT INTO action_plans (session_id, plan_type, sku, payload, status, created_by) VALUES (:sid, :pt, :sku, :pl, 'pending', 'nemotron') - """, + """), { "sid": plan.get("session_id"), "pt": plan.get("plan_type"), diff --git a/services/hermes_analyst_service.py b/services/hermes_analyst_service.py index 85b53d9..1f4015b 100644 --- a/services/hermes_analyst_service.py +++ b/services/hermes_analyst_service.py @@ -12,6 +12,7 @@ Hermes 3 競價情報分析服務 (Module 2) SQL漏斗 → [本服務] → NemotronDispatcher → Telegram 告警 """ +import asyncio import json import logging import re @@ -86,6 +87,131 @@ class 
HermesAnalystService: def __init__(self, engine=None): self.engine = engine # SQLAlchemy engine,可外部注入 + # ────────────────────────────────────────────── + # L1 意圖分析介面(給 EventRouter / Telegram NLP 使用) + # ────────────────────────────────────────────── + async def handle_l1(self, event: dict, ctx: dict) -> dict: + """L1 語意理解:輸入 event(含 message),輸出 intent / complexity_score / requires_data_fetch。 + + Contract: + event: {"message": str, "user_id": int, "chat_id": int, ...} + ctx: 既有 agent_context(可為空 dict) + Returns: + { + "intent": str, # greeting / help / query_sales / analyze_competitor / report / unknown + "confidence": float, + "complexity_score": float, # 0.0~1.0 + "requires_data_fetch": bool, + "preliminary_answer": str, + "metadata": dict, + } + + 實作策略: + 1) 先用 Hermes3 LLM 做意圖分類(timeout 短、temp 低) + 2) Ollama 掛掉時降級為關鍵字規則引擎,保證永遠回傳結構化結果 + 3) 任何例外都被 graceful 吞掉,不會讓 handle_message 整個炸 + """ + message = (event or {}).get("message", "") or "" + llm_result = None + try: + llm_result = await asyncio.get_running_loop().run_in_executor( + None, self._call_hermes_intent, message + ) + except Exception as e: + logger.warning(f"[Hermes.handle_l1] LLM 意圖分析失敗(降級規則引擎): {e}") + + if llm_result: + return llm_result + return self._rule_based_intent(message) + + def _call_hermes_intent(self, message: str) -> Optional[dict]: + """呼叫 Hermes3 做輕量意圖分類(非批量分析)。失敗回 None 由上層降級。""" + if not message.strip(): + return self._rule_based_intent(message) + + system = ( + "你是一位繁體中文意圖分類器。讀取使用者訊息,輸出單行 JSON," + "嚴禁任何解釋文字與 markdown 標記。欄位:\n" + '{"intent": "greeting|help|query_sales|analyze_competitor|' + 'report|product_info|unknown", "confidence": 0.0~1.0, ' + '"complexity_score": 0.0~1.0, "requires_data_fetch": true|false, ' + '"preliminary_answer": "若為 greeting/help 請回一句繁中問候,否則空字串"}\n' + "規則:greeting/help 類 complexity_score<=0.3;涉及數據、報告、日期、" + "品牌、競品對比者 complexity_score>=0.7 且 requires_data_fetch=true。" + ) + payload = { + "model": HERMES_MODEL, + "system": system, + "prompt": f"使用者訊息:{message}\n輸出 
JSON:", + "stream": False, + "options": {"temperature": 0.1}, + } + try: + resp = requests.post( + f"{HERMES_URL}/api/generate", + json=payload, + timeout=15, # 意圖分類不必等 120s + ) + resp.raise_for_status() + raw = (resp.json().get("response", "") or "").strip() + if raw.startswith("```"): + raw = re.sub(r"^```(?:json)?\s*", "", raw, flags=re.MULTILINE) + raw = re.sub(r"\s*```\s*$", "", raw.strip(), flags=re.MULTILINE).strip() + data = json.loads(raw) + return { + "intent": data.get("intent", "unknown"), + "confidence": float(data.get("confidence", 0.5)), + "complexity_score": float(data.get("complexity_score", 0.5)), + "requires_data_fetch": bool(data.get("requires_data_fetch", False)), + "preliminary_answer": data.get("preliminary_answer", "") or "", + "metadata": {"source": "hermes_llm"}, + } + except Exception as e: + logger.info(f"[Hermes.intent] 降級規則引擎({type(e).__name__}: {e})") + return None + + def _rule_based_intent(self, message: str) -> dict: + """Ollama 掛掉時的規則引擎 fallback — 永遠返回結構化結果。""" + msg = (message or "").lower().strip() + if not msg: + return { + "intent": "unknown", "confidence": 0.3, "complexity_score": 0.1, + "requires_data_fetch": False, "preliminary_answer": "", + "metadata": {"source": "rule_empty"}, + } + greetings = ("hi", "hello", "哈囉", "你好", "嗨", "早安", "午安", "晚安") + helps = ("help", "幫助", "說明", "怎麼用", "功能") + data_keywords = ( + "業績", "銷量", "銷售", "營收", "排名", "威脅", "競品", "報表", + "週報", "月報", "日報", "2024", "2025", "2026", + "ppt", "report", "compare", "vs", "分析", "趨勢", + ) + if any(g in msg for g in greetings): + return { + "intent": "greeting", "confidence": 0.9, "complexity_score": 0.1, + "requires_data_fetch": False, + "preliminary_answer": "您好!我是 MOMO Pro 智能助理,請問需要什麼協助?", + "metadata": {"source": "rule"}, + } + if any(h in msg for h in helps): + return { + "intent": "help", "confidence": 0.85, "complexity_score": 0.2, + "requires_data_fetch": False, + "preliminary_answer": "我可以協助您查詢業績、商品、競品情報與產出報告。", + "metadata": {"source": "rule"}, + } + 
if any(k in msg for k in data_keywords): + return { + "intent": "query_sales", "confidence": 0.7, "complexity_score": 0.8, + "requires_data_fetch": True, "preliminary_answer": "", + "metadata": {"source": "rule"}, + } + return { + "intent": "unknown", "confidence": 0.4, "complexity_score": 0.5, + "requires_data_fetch": False, "preliminary_answer": "", + "metadata": {"source": "rule"}, + } + # ────────────────────────────────────────────── # Step 1:SQL 漏斗 — 從 226萬筆壓縮到 ~300 筆候選 # ────────────────────────────────────────────── diff --git a/services/nemoton_dispatcher_service.py b/services/nemoton_dispatcher_service.py index 47dcb21..126f641 100644 --- a/services/nemoton_dispatcher_service.py +++ b/services/nemoton_dispatcher_service.py @@ -916,6 +916,63 @@ class NemotronDispatcher: # ────────────────────────────────────────────── # 公開介面 # ────────────────────────────────────────────── + async def handle_l2(self, event: dict, ctx: dict) -> dict: + """L2 行動規劃介面(給 EventRouter / Telegram NLP 使用)。 + + Contract: + event: {"message": str, "user_id": int, "chat_id": int, ...} + ctx: 包含 Hermes handle_l1 結果(在 "latest" key 或 flatten 後) + Returns: + { + "action_plan": [{"action": str, "params": dict}, ...], + "dispatch_to": "openclaw|direct_response|human_review", + "metadata": dict, + } + + 策略: + 本方法不呼叫 NIM(批量 tool calling 專用),改以 L1 intent + 訊息關鍵字 + 決定下一步路由。複雜分析 → openclaw;簡單回覆 → direct_response。 + 任何例外都回 direct_response 保底。 + """ + try: + message = (event or {}).get("message", "") or "" + # ctx 可能是 {"latest": {...}} 或已攤平的 intent 結果 + hermes = {} + if isinstance(ctx, dict): + hermes = ctx.get("latest") if isinstance(ctx.get("latest"), dict) else ctx + + intent = (hermes or {}).get("intent", "unknown") + complexity = float((hermes or {}).get("complexity_score", 0.0) or 0.0) + needs_data = bool((hermes or {}).get("requires_data_fetch", False)) + + msg_lower = message.lower() + report_keywords = ("report", "ppt", "週報", "月報", "日報", "報表", "報告") + wants_report = any(k in msg_lower for 
def generate_strategy_response(query: str, context: Optional[Dict[str, Any]] = None) -> str:
    """Produce a short strategist reply for the Telegram NLP flow.

    This is the lightweight path; it does not run the full report pipeline.

    Args:
        query: The user's natural-language message. ``None``, an empty value
            or whitespace-only text yields a usage hint; other non-string
            values are converted with ``str()``.
        context: Optional extra information (for example intent, user_id)
            that is serialised into the prompt.

    Returns:
        A reply string. When ``GEMINI_API_KEY`` is unset, or the model call
        raises or yields no usable text, a fallback message is returned
        instead. The function is intended never to raise.
    """
    q = str(query or "").strip()
    if not q:
        return "請輸入您的問題,例如:本週業績趨勢、競品價差分析、產出週報 PPT。"

    if not GEMINI_API_KEY:
        return (
            "OpenClaw 策略師目前離線(未設定 GEMINI_API_KEY)。\n"
            "您可直接輸入以下指令取得報告:\n"
            "• /daily — 每日業績\n"
            "• /weekly — 週報\n"
            "• /threats — 最新競價威脅\n"
            "• /help — 完整功能說明"
        )

    system_prompt = (
        "你是 MOMO Pro 電商情報策略師。以繁體中文(台灣用語)回覆使用者。"
        "嚴禁簡體字,嚴禁空洞套話。若使用者要求的資料需即時查詢,"
        "請告知使用者相關可用指令(例如 /daily、/weekly、/threats)。"
        "回覆長度控制在 500 字內,可用 Markdown 條列。"
    )

    # The context comes from the caller and may hold values json cannot
    # encode (datetimes, custom objects). Serialising must not break the
    # never-raise contract: such values are stringified, and if encoding
    # still fails the prompt is sent with an empty context.
    try:
        context_json = json.dumps(context or {}, ensure_ascii=False, default=str)
    except (TypeError, ValueError):
        context_json = "{}"
    user_prompt = f"使用者問題:{q}\n上下文:{context_json}"

    try:
        text_reply = _call_gemini(system_prompt, user_prompt, temperature=0.5)
    except Exception as e:
        logger.error("[OpenClaw] generate_strategy_response 例外:%s", e)
        text_reply = None

    # A whitespace-only or non-string reply would reach the user as a blank
    # or malformed message, so it is treated like a failed call.
    if not isinstance(text_reply, str) or not text_reply.strip():
        return (
            "策略師暫時無法回覆(模型呼叫逾時或失敗)。\n"
            "您可改用:/daily、/weekly、/threats 取得結構化報告。"
        )
    return text_reply
strategist_text = await loop.run_in_executor( + None, + openclaw_strategist_service.generate_strategy_response, + user_message, + { + "intent": (l1_result or {}).get("intent"), + "user_id": user_id, + "chat_id": chat_id, + }, + ) + except Exception as e: + logger.warning(f"[TelegramAIIntegration] OpenClaw 策略師呼叫失敗: {e}") + strategist_text = "" + + response = self._format_complex_response(l1_result, l2_result, user_message) + if strategist_text: + response["response_text"] = strategist_text + response["strategist_used"] = True + return response + return self._format_complex_response(l1_result, l2_result, user_message) else: # Simple query, handle directly return self._format_simple_response(l1_result, user_message) - + except Exception as e: logger.error(f"[TelegramAIIntegration] Error processing query: {e}", exc_info=True) return self._format_error_response(user_message) @@ -95,26 +126,30 @@ class TelegramAIIntegration: intent = l1_result.get("intent", "unknown") confidence = l1_result.get("confidence", 0.0) - # Traditional Chinese responses based on intent + # 繁體中文(台灣)回應模板 responses = { "greeting": { - "text": "Hello! I am the MOMO Pro Assistant. How can I help you today?", - "zh_tw": "Hello! I am the MOMO Pro Assistant. How can I help you today?", - "suggestions": ["Check today's sales", "View product rankings", "Market intelligence"] + "zh_tw": "您好!我是 MOMO Pro 智能助理,今天需要什麼協助?", + "suggestions": ["查看今日業績", "商品排行榜", "市場情報摘要"], }, "help": { - "text": "I can help you with sales queries, product information, market intelligence, and more. Please use the menu or ask specific questions.", - "zh_tw": "I can help you with sales queries, product information, market intelligence, and more. 
Please use the menu or ask specific questions.", - "suggestions": ["Sales performance", "Product trends", "Market analysis"] + "zh_tw": ( + "我可以協助您處理業績查詢、商品資訊、市場情報、產出報告等需求。\n" + "請直接輸入問題,或使用選單選擇功能。" + ), + "suggestions": ["業績表現", "商品趨勢", "市場分析"], }, "unknown": { - "text": "I'm processing your request. Please use the menu options for specific functions.", - "zh_tw": "I'm processing your request. Please use the menu options for specific functions.", - "suggestions": ["View main menu", "Check sales data", "Product analysis"] - } + "zh_tw": "收到您的訊息,若需特定功能,請從選單選擇或使用 /help 查看可用指令。", + "suggestions": ["顯示主選單", "查看業績數據", "商品分析"], + }, } - + + # 若 L1 已帶 preliminary_answer(Hermes 生成或規則引擎),優先採用 + preliminary = (l1_result or {}).get("preliminary_answer", "") or "" response_data = responses.get(intent, responses["unknown"]) + if preliminary: + response_data = {**response_data, "zh_tw": preliminary} return { "success": True, @@ -135,15 +170,22 @@ class TelegramAIIntegration: date_range = self._extract_date_range(original_message) brands = self._extract_brands(original_message) - # Traditional Chinese response - response_text = f"Processing your {query_type} request" - + # 繁體中文(台灣)回應 + query_type_zh = { + "sales analysis": "業績分析", + "product analysis": "商品分析", + "market intelligence": "市場情報", + "report generation": "報告產製", + "comparative analysis": "比較分析", + "general query": "一般查詢", + }.get(query_type, query_type) + + response_text = f"正在處理您的「{query_type_zh}」需求" if date_range: - response_text += f" for period {date_range}" + response_text += f",期間:{date_range}" if brands: - response_text += f" for brands: {', '.join(brands)}" - - response_text += ". I'm preparing the analysis..." + response_text += f",品牌:{', '.join(brands)}" + response_text += "。分析準備中,稍候片刻…" return { "success": True, @@ -162,14 +204,14 @@ class TelegramAIIntegration: return { "success": False, "type": "error_response", - "response_text": "Sorry, I encountered an error processing your request. 
Please try using the menu options.", + "response_text": "抱歉,處理您的訊息時發生問題,請改用選單功能或稍後再試。", "error_suggestions": [ - "Check today's sales performance", - "View product rankings", - "Market intelligence summary", - "Use /help for available commands" + "查看今日業績", + "商品排行榜", + "市場情報摘要", + "使用 /help 查看可用指令", ], - "show_menu": True + "show_menu": True, } def _extract_query_type(self, message: str) -> str: @@ -196,7 +238,7 @@ class TelegramAIIntegration: if match: start = match.group(1).replace('/', '-').replace('.', '-') end = match.group(2).replace('/', '-').replace('.', '-') - return f"{start} to {end}" + return f"{start} 至 {end}" return None