[V10.4-C] 三 AI NLP 鏈修復:Hermes/NemoTron/OpenClaw 全線串通

修復 P9-2 確認的自然語言對話完全失效問題:

- services/ai_orchestrator.py:
    4 處裸字串 SQL 全部包裝 text(),修復 SQLAlchemy 2.x ArgumentError
- services/hermes_analyst_service.py:
    新增 handle_l1() async 方法(Ollama hermes3 意圖分析 + rule-based fallback)
    asyncio.get_event_loop() → get_running_loop()(Py3.12+ 相容)
- services/nemoton_dispatcher_service.py:
    新增 handle_l2() async 方法(純 Python routing,不消耗 NIM 配額)
- services/openclaw_strategist_service.py:
    新增 generate_strategy_response()(Gemini 2.0 Flash,無 key 時優雅降級)
- telegram_ai_integration.py:
    整合 OpenClaw 為第三層(complexity >= 0.7 或 dispatch_to == "openclaw")
    _format_*_response 全改為繁體中文
    asyncio.get_event_loop() → get_running_loop()
    _extract_date_range "to" → "至"

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
ogt
2026-04-25 01:43:20 +08:00
parent 3f7fc0aba0
commit 5ef4151fa5
5 changed files with 312 additions and 34 deletions

View File

@@ -3,6 +3,8 @@ import asyncio
import logging
from typing import Any, Dict, Optional
from sqlalchemy import text
from services.hermes_analyst_service import HermesAnalystService
from services.nemoton_dispatcher_service import NemotronDispatcher
from database.manager import get_session
@@ -46,7 +48,7 @@ class AIOrchestrator:
session = get_session()
try:
rows = session.execute(
"SELECT context_key, context_val FROM agent_context WHERE session_id = :sid",
text("SELECT context_key, context_val FROM agent_context WHERE session_id = :sid"),
{"sid": session_id},
).fetchall()
return {r[0]: r[1] for r in rows}
@@ -57,16 +59,16 @@ class AIOrchestrator:
session = get_session()
try:
session.execute(
"DELETE FROM agent_context WHERE session_id = :sid AND agent_name = :ag",
text("DELETE FROM agent_context WHERE session_id = :sid AND agent_name = :ag"),
{"sid": session_id, "ag": agent},
)
session.execute(
"""
text("""
INSERT INTO agent_context
(session_id, agent_name, context_key, context_val, created_at, ttl_minutes)
VALUES
(:sid, :ag, :ck, :cv, NOW(), 60)
""",
"""),
{
"sid": session_id,
"ag": agent,
@@ -86,12 +88,12 @@ class AIOrchestrator:
session = get_session()
try:
session.execute(
"""
text("""
INSERT INTO action_plans
(session_id, plan_type, sku, payload, status, created_by)
VALUES
(:sid, :pt, :sku, :pl, 'pending', 'nemotron')
""",
"""),
{
"sid": plan.get("session_id"),
"pt": plan.get("plan_type"),

View File

@@ -12,6 +12,7 @@ Hermes 3 競價情報分析服務 (Module 2)
SQL漏斗 → [本服務] → NemotronDispatcher → Telegram 告警
"""
import asyncio
import json
import logging
import re
@@ -86,6 +87,131 @@ class HermesAnalystService:
def __init__(self, engine=None):
self.engine = engine # SQLAlchemy engine可外部注入
# ──────────────────────────────────────────────
# L1 意圖分析介面(給 EventRouter / Telegram NLP 使用)
# ──────────────────────────────────────────────
async def handle_l1(self, event: dict, ctx: dict) -> dict:
"""L1 語意理解:輸入 event含 message輸出 intent / complexity_score / requires_data_fetch。
Contract:
event: {"message": str, "user_id": int, "chat_id": int, ...}
ctx: 既有 agent_context可為空 dict
Returns:
{
"intent": str, # greeting / help / query_sales / analyze_competitor / report / unknown
"confidence": float,
"complexity_score": float, # 0.0~1.0
"requires_data_fetch": bool,
"preliminary_answer": str,
"metadata": dict,
}
實作策略:
1) 先用 Hermes3 LLM 做意圖分類timeout 短、temp 低)
2) Ollama 掛掉時降級為關鍵字規則引擎,保證永遠回傳結構化結果
3) 任何例外都被 graceful 吞掉,不會讓 handle_message 整個炸
"""
message = (event or {}).get("message", "") or ""
llm_result = None
try:
llm_result = await asyncio.get_running_loop().run_in_executor(
None, self._call_hermes_intent, message
)
except Exception as e:
logger.warning(f"[Hermes.handle_l1] LLM 意圖分析失敗(降級規則引擎): {e}")
if llm_result:
return llm_result
return self._rule_based_intent(message)
def _call_hermes_intent(self, message: str) -> Optional[dict]:
"""呼叫 Hermes3 做輕量意圖分類(非批量分析)。失敗回 None 由上層降級。"""
if not message.strip():
return self._rule_based_intent(message)
system = (
"你是一位繁體中文意圖分類器。讀取使用者訊息,輸出單行 JSON"
"嚴禁任何解釋文字與 markdown 標記。欄位:\n"
'{"intent": "greeting|help|query_sales|analyze_competitor|'
'report|product_info|unknown", "confidence": 0.0~1.0, '
'"complexity_score": 0.0~1.0, "requires_data_fetch": true|false, '
'"preliminary_answer": "若為 greeting/help 請回一句繁中問候,否則空字串"}\n'
"規則greeting/help 類 complexity_score<=0.3;涉及數據、報告、日期、"
"品牌、競品對比者 complexity_score>=0.7 且 requires_data_fetch=true。"
)
payload = {
"model": HERMES_MODEL,
"system": system,
"prompt": f"使用者訊息:{message}\n輸出 JSON",
"stream": False,
"options": {"temperature": 0.1},
}
try:
resp = requests.post(
f"{HERMES_URL}/api/generate",
json=payload,
timeout=15, # 意圖分類不必等 120s
)
resp.raise_for_status()
raw = (resp.json().get("response", "") or "").strip()
if raw.startswith("```"):
raw = re.sub(r"^```(?:json)?\s*", "", raw, flags=re.MULTILINE)
raw = re.sub(r"\s*```\s*$", "", raw.strip(), flags=re.MULTILINE).strip()
data = json.loads(raw)
return {
"intent": data.get("intent", "unknown"),
"confidence": float(data.get("confidence", 0.5)),
"complexity_score": float(data.get("complexity_score", 0.5)),
"requires_data_fetch": bool(data.get("requires_data_fetch", False)),
"preliminary_answer": data.get("preliminary_answer", "") or "",
"metadata": {"source": "hermes_llm"},
}
except Exception as e:
logger.info(f"[Hermes.intent] 降級規則引擎({type(e).__name__}: {e}")
return None
def _rule_based_intent(self, message: str) -> dict:
"""Ollama 掛掉時的規則引擎 fallback — 永遠返回結構化結果。"""
msg = (message or "").lower().strip()
if not msg:
return {
"intent": "unknown", "confidence": 0.3, "complexity_score": 0.1,
"requires_data_fetch": False, "preliminary_answer": "",
"metadata": {"source": "rule_empty"},
}
greetings = ("hi", "hello", "哈囉", "你好", "", "早安", "午安", "晚安")
helps = ("help", "幫助", "說明", "怎麼用", "功能")
data_keywords = (
"業績", "銷量", "銷售", "營收", "排名", "威脅", "競品", "報表",
"週報", "月報", "日報", "2024", "2025", "2026",
"ppt", "report", "compare", "vs", "分析", "趨勢",
)
if any(g in msg for g in greetings):
return {
"intent": "greeting", "confidence": 0.9, "complexity_score": 0.1,
"requires_data_fetch": False,
"preliminary_answer": "您好!我是 MOMO Pro 智能助理,請問需要什麼協助?",
"metadata": {"source": "rule"},
}
if any(h in msg for h in helps):
return {
"intent": "help", "confidence": 0.85, "complexity_score": 0.2,
"requires_data_fetch": False,
"preliminary_answer": "我可以協助您查詢業績、商品、競品情報與產出報告。",
"metadata": {"source": "rule"},
}
if any(k in msg for k in data_keywords):
return {
"intent": "query_sales", "confidence": 0.7, "complexity_score": 0.8,
"requires_data_fetch": True, "preliminary_answer": "",
"metadata": {"source": "rule"},
}
return {
"intent": "unknown", "confidence": 0.4, "complexity_score": 0.5,
"requires_data_fetch": False, "preliminary_answer": "",
"metadata": {"source": "rule"},
}
# ──────────────────────────────────────────────
# Step 1SQL 漏斗 — 從 226萬筆壓縮到 ~300 筆候選
# ──────────────────────────────────────────────

View File

@@ -916,6 +916,63 @@ class NemotronDispatcher:
# ──────────────────────────────────────────────
# 公開介面
# ──────────────────────────────────────────────
async def handle_l2(self, event: dict, ctx: dict) -> dict:
"""L2 行動規劃介面(給 EventRouter / Telegram NLP 使用)。
Contract:
event: {"message": str, "user_id": int, "chat_id": int, ...}
ctx: 包含 Hermes handle_l1 結果(在 "latest" key 或 flatten 後)
Returns:
{
"action_plan": [{"action": str, "params": dict}, ...],
"dispatch_to": "openclaw|direct_response|human_review",
"metadata": dict,
}
策略:
本方法不呼叫 NIM批量 tool calling 專用),改以 L1 intent + 訊息關鍵字
決定下一步路由。複雜分析 → openclaw簡單回覆 → direct_response。
任何例外都回 direct_response 保底。
"""
try:
message = (event or {}).get("message", "") or ""
# ctx 可能是 {"latest": {...}} 或已攤平的 intent 結果
hermes = {}
if isinstance(ctx, dict):
hermes = ctx.get("latest") if isinstance(ctx.get("latest"), dict) else ctx
intent = (hermes or {}).get("intent", "unknown")
complexity = float((hermes or {}).get("complexity_score", 0.0) or 0.0)
needs_data = bool((hermes or {}).get("requires_data_fetch", False))
msg_lower = message.lower()
report_keywords = ("report", "ppt", "週報", "月報", "日報", "報表", "報告")
wants_report = any(k in msg_lower for k in report_keywords)
if wants_report or complexity >= 0.7 or needs_data or intent in (
"query_sales", "analyze_competitor", "report"
):
dispatch_to = "openclaw"
else:
dispatch_to = "direct_response"
action_plan = [{
"action": "strategist_analyze" if dispatch_to == "openclaw" else "reply_simple",
"params": {"message": message, "intent": intent},
}]
return {
"action_plan": action_plan,
"dispatch_to": dispatch_to,
"metadata": {"complexity_score": complexity, "intent": intent},
}
except Exception as e:
logger.warning(f"[NemotronDispatcher.handle_l2] 規劃失敗 fallback direct_response: {e}")
return {
"action_plan": [{"action": "reply_simple", "params": {}}],
"dispatch_to": "direct_response",
"metadata": {"error": str(e)},
}
def dispatch(self, threats: list, hermes_stats: Optional[dict] = None) -> dict:
"""
主入口:接收 Hermes 威脅清單,透過 NIM 決策後執行語意化告警

View File

@@ -45,9 +45,60 @@ __all__ = [
"generate_weekly_strategy_report",
"generate_monthly_report",
"generate_meta_analysis_report",
"generate_strategy_response",
]
# ═══════════════════════════════════════════════════════════════════════════════
# Telegram NLP 互動入口(輕量查詢,不走完整報告管線)
# ═══════════════════════════════════════════════════════════════════════════════
def generate_strategy_response(query: str, context: Optional[Dict[str, Any]] = None) -> str:
"""給 Telegram NLP 使用的輕量策略回覆。
Contract:
query: 使用者自然語言訊息(繁體中文)
context: 可選,{"intent": str, "user_id": int, ...}
Returns:
繁體中文回覆字串。GEMINI_API_KEY 未設或呼叫失敗時,回降級訊息
(永遠回字串,不拋例外,由呼叫端顯示於 Telegram
"""
q = (query or "").strip()
if not q:
return "請輸入您的問題,例如:本週業績趨勢、競品價差分析、產出週報 PPT。"
if not GEMINI_API_KEY:
return (
"OpenClaw 策略師目前離線(未設定 GEMINI_API_KEY\n"
"您可直接輸入以下指令取得報告:\n"
"• /daily — 每日業績\n"
"• /weekly — 週報\n"
"• /threats — 最新競價威脅\n"
"• /help — 完整功能說明"
)
system_prompt = (
"你是 MOMO Pro 電商情報策略師。以繁體中文(台灣用語)回覆使用者。"
"嚴禁簡體字,嚴禁空洞套話。若使用者要求的資料需即時查詢,"
"請告知使用者相關可用指令(例如 /daily、/weekly、/threats"
"回覆長度控制在 500 字內,可用 Markdown 條列。"
)
user_prompt = f"使用者問題:{q}\n上下文:{json.dumps(context or {}, ensure_ascii=False)}"
try:
text_reply = _call_gemini(system_prompt, user_prompt, temperature=0.5)
except Exception as e:
logger.error("[OpenClaw] generate_strategy_response 例外:%s", e)
text_reply = None
if not text_reply:
return (
"策略師暫時無法回覆(模型呼叫逾時或失敗)。\n"
"您可改用:/daily、/weekly、/threats 取得結構化報告。"
)
return text_reply
# ═══════════════════════════════════════════════════════════════════════════════
# DB 數據讀取層
# ═══════════════════════════════════════════════════════════════════════════════

View File

@@ -9,6 +9,7 @@ import asyncio
import logging
from typing import Dict, Any, Optional
from services.ai_orchestrator import AIOrchestrator
from services import openclaw_strategist_service
from datetime import datetime
logger = logging.getLogger(__name__)
@@ -49,16 +50,46 @@ class TelegramAIIntegration:
# L1: Semantic understanding (Hermes)
l1_result = await self.orchestrator.handle_l1(event, session_id)
# Check if this is a complex query requiring L2 processing
if self._is_complex_query(user_message, l1_result):
# L2: Planning and execution (Nemotron)
l2_result = await self.orchestrator.handle_l2(event, session_id)
dispatch_to = (l2_result or {}).get("dispatch_to", "direct_response")
complexity = float((l1_result or {}).get("complexity_score", 0.0) or 0.0)
# 若 L2 判定走 OpenClaw 或複雜度 >= 0.7 → 呼叫策略師補真實繁中洞察
if dispatch_to == "openclaw" or complexity >= 0.7:
strategist_text = ""
try:
# 同步呼叫 Gemini 可能耗時數秒,丟到 executor 避免阻塞 event loop
loop = asyncio.get_running_loop()
strategist_text = await loop.run_in_executor(
None,
openclaw_strategist_service.generate_strategy_response,
user_message,
{
"intent": (l1_result or {}).get("intent"),
"user_id": user_id,
"chat_id": chat_id,
},
)
except Exception as e:
logger.warning(f"[TelegramAIIntegration] OpenClaw 策略師呼叫失敗: {e}")
strategist_text = ""
response = self._format_complex_response(l1_result, l2_result, user_message)
if strategist_text:
response["response_text"] = strategist_text
response["strategist_used"] = True
return response
return self._format_complex_response(l1_result, l2_result, user_message)
else:
# Simple query, handle directly
return self._format_simple_response(l1_result, user_message)
except Exception as e:
logger.error(f"[TelegramAIIntegration] Error processing query: {e}", exc_info=True)
return self._format_error_response(user_message)
@@ -95,26 +126,30 @@ class TelegramAIIntegration:
intent = l1_result.get("intent", "unknown")
confidence = l1_result.get("confidence", 0.0)
# Traditional Chinese responses based on intent
# 繁體中文(台灣)回應模板
responses = {
"greeting": {
"text": "Hello! I am the MOMO Pro Assistant. How can I help you today?",
"zh_tw": "Hello! I am the MOMO Pro Assistant. How can I help you today?",
"suggestions": ["Check today's sales", "View product rankings", "Market intelligence"]
"zh_tw": "您好!我是 MOMO Pro 智能助理,今天需要什麼協助?",
"suggestions": ["查看今日業績", "商品排行榜", "市場情報摘要"],
},
"help": {
"text": "I can help you with sales queries, product information, market intelligence, and more. Please use the menu or ask specific questions.",
"zh_tw": "I can help you with sales queries, product information, market intelligence, and more. Please use the menu or ask specific questions.",
"suggestions": ["Sales performance", "Product trends", "Market analysis"]
"zh_tw": (
"我可以協助您處理業績查詢、商品資訊、市場情報、產出報告等需求。\n"
"請直接輸入問題,或使用選單選擇功能。"
),
"suggestions": ["業績表現", "商品趨勢", "市場分析"],
},
"unknown": {
"text": "I'm processing your request. Please use the menu options for specific functions.",
"zh_tw": "I'm processing your request. Please use the menu options for specific functions.",
"suggestions": ["View main menu", "Check sales data", "Product analysis"]
}
"zh_tw": "收到您的訊息,若需特定功能,請從選單選擇或使用 /help 查看可用指令。",
"suggestions": ["顯示主選單", "查看業績數據", "商品分析"],
},
}
# 若 L1 已帶 preliminary_answerHermes 生成或規則引擎),優先採用
preliminary = (l1_result or {}).get("preliminary_answer", "") or ""
response_data = responses.get(intent, responses["unknown"])
if preliminary:
response_data = {**response_data, "zh_tw": preliminary}
return {
"success": True,
@@ -135,15 +170,22 @@ class TelegramAIIntegration:
date_range = self._extract_date_range(original_message)
brands = self._extract_brands(original_message)
# Traditional Chinese response
response_text = f"Processing your {query_type} request"
# 繁體中文(台灣)回應
query_type_zh = {
"sales analysis": "業績分析",
"product analysis": "商品分析",
"market intelligence": "市場情報",
"report generation": "報告產製",
"comparative analysis": "比較分析",
"general query": "一般查詢",
}.get(query_type, query_type)
response_text = f"正在處理您的「{query_type_zh}」需求"
if date_range:
response_text += f" for period {date_range}"
response_text += f",期間:{date_range}"
if brands:
response_text += f" for brands: {', '.join(brands)}"
response_text += ". I'm preparing the analysis..."
response_text += f",品牌:{', '.join(brands)}"
response_text += "。分析準備中,稍候片刻…"
return {
"success": True,
@@ -162,14 +204,14 @@ class TelegramAIIntegration:
return {
"success": False,
"type": "error_response",
"response_text": "Sorry, I encountered an error processing your request. Please try using the menu options.",
"response_text": "抱歉,處理您的訊息時發生問題,請改用選單功能或稍後再試。",
"error_suggestions": [
"Check today's sales performance",
"View product rankings",
"Market intelligence summary",
"Use /help for available commands"
"查看今日業績",
"商品排行榜",
"市場情報摘要",
"使用 /help 查看可用指令",
],
"show_menu": True
"show_menu": True,
}
def _extract_query_type(self, message: str) -> str:
@@ -196,7 +238,7 @@ class TelegramAIIntegration:
if match:
start = match.group(1).replace('/', '-').replace('.', '-')
end = match.group(2).replace('/', '-').replace('.', '-')
return f"{start} to {end}"
return f"{start} {end}"
return None