fix: 修復 AI 對話無法使用 + 全面繁體中文化
All checks were successful
CD Pipeline / deploy (push) Successful in 1m28s

- telegram_ai_integration.py 移至 services/ 解決 ModuleNotFoundError
  (momo-telegram-bot 只掛載 services/,根目錄檔案進不了容器)
- import 路徑更新為 from services.telegram_ai_integration
- 所有英文回覆字串改為繁體中文:
  · 歡迎訊息、fallback 訊息、錯誤提示
  · _enhanced_keyword_matching 全英文段落
  · _handle_complex_ai_response / _handle_simple_ai_response
  · Cancel 按鈕改「 取消」、callback 改 menu:main

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
ogt
2026-04-25 09:41:55 +08:00
parent d5c0feab5e
commit 4ff291b596
2 changed files with 315 additions and 36 deletions

View File

@@ -0,0 +1,282 @@
#!/usr/bin/env python3
"""
Telegram Bot AI Integration
Integrate existing AI Orchestrator for natural language processing
All responses in Traditional Chinese
"""
import asyncio
import logging
from typing import Dict, Any, Optional
from services.ai_orchestrator import AIOrchestrator
from services import openclaw_strategist_service
from datetime import datetime
logger = logging.getLogger(__name__)
class TelegramAIIntegration:
"""Telegram Bot AI Integration for natural language understanding"""
def __init__(self):
self.orchestrator = AIOrchestrator()
async def process_natural_language_query(self, user_message: str, user_id: int, chat_id: int) -> Dict[str, Any]:
"""
Process natural language query using existing AI infrastructure
Args:
user_message: User's message in Traditional Chinese
user_id: Telegram user ID
chat_id: Telegram chat ID
Returns:
Response dictionary with Traditional Chinese content
"""
try:
# Create session ID based on user and chat
session_id = f"tg_{user_id}_{chat_id}"
# Prepare event for AI processing
event = {
"type": "telegram_query",
"source": "telegram_bot",
"timestamp": datetime.now().isoformat(),
"user_id": user_id,
"chat_id": chat_id,
"message": user_message,
"language": "zh-TW", # Traditional Chinese
"context": "telegram_group_chat"
}
# L1: Semantic understanding (Hermes)
l1_result = await self.orchestrator.handle_l1(event, session_id)
if not l1_result or l1_result.get("metadata", {}).get("source") != "hermes_llm":
logger.warning(
f"[TelegramAIIntegration] Hermes LLM 未回應,走規則引擎降級"
f"session={session_id} source={( l1_result or {}).get('metadata', {}).get('source', 'none')}"
)
# Check if this is a complex query requiring L2 processing
if self._is_complex_query(user_message, l1_result):
# L2: Planning and execution (Nemotron)
l2_result = await self.orchestrator.handle_l2(event, session_id)
dispatch_to = (l2_result or {}).get("dispatch_to", "direct_response")
complexity = float((l1_result or {}).get("complexity_score", 0.0) or 0.0)
# 若 L2 判定走 OpenClaw 或複雜度 >= 0.7 → 呼叫策略師補真實繁中洞察
if dispatch_to == "openclaw" or complexity >= 0.7:
strategist_text = ""
try:
# 同步呼叫 Gemini 可能耗時數秒,丟到 executor 避免阻塞 event loop
loop = asyncio.get_running_loop()
strategist_text = await loop.run_in_executor(
None,
openclaw_strategist_service.generate_strategy_response,
user_message,
{
"intent": (l1_result or {}).get("intent"),
"user_id": user_id,
"chat_id": chat_id,
},
)
except Exception as e:
logger.error(
f"[TelegramAIIntegration] OpenClaw 策略師呼叫失敗"
f"{type(e).__name__}: {e}",
exc_info=True,
)
strategist_text = ""
response = self._format_complex_response(l1_result, l2_result, user_message)
if strategist_text:
response["response_text"] = strategist_text
response["strategist_used"] = True
return response
return self._format_complex_response(l1_result, l2_result, user_message)
else:
# Simple query, handle directly
return self._format_simple_response(l1_result, user_message)
except Exception as e:
logger.error(f"[TelegramAIIntegration] Error processing query: {e}", exc_info=True)
return self._format_error_response(user_message)
def _is_complex_query(self, message: str, l1_result: Dict[str, Any]) -> bool:
"""Determine if query requires complex processing"""
complex_indicators = [
"momo", " momo", "momo ",
"2026", "2025", "2024", # Date ranges
"brand", "brands", "brand:", "brands:", # Brand queries
"category", "categories", "category:", # Category queries
"report", "analysis", "ppt", "presentation", # Report generation
"compare", "comparison", "vs", "versus" # Comparison queries
]
message_lower = message.lower()
# Check for complex indicators
for indicator in complex_indicators:
if indicator in message_lower:
return True
# Check L1 analysis result
if l1_result.get("complexity_score", 0) > 0.7:
return True
if l1_result.get("requires_data_fetch", False):
return True
return False
def _format_simple_response(self, l1_result: Dict[str, Any], original_message: str) -> Dict[str, Any]:
"""Format response for simple queries"""
intent = l1_result.get("intent", "unknown")
confidence = l1_result.get("confidence", 0.0)
# 繁體中文(台灣)回應模板
responses = {
"greeting": {
"zh_tw": "您好!我是 MOMO Pro 智能助理,今天需要什麼協助?",
"suggestions": ["查看今日業績", "商品排行榜", "市場情報摘要"],
},
"help": {
"zh_tw": (
"我可以協助您處理業績查詢、商品資訊、市場情報、產出報告等需求。\n"
"請直接輸入問題,或使用選單選擇功能。"
),
"suggestions": ["業績表現", "商品趨勢", "市場分析"],
},
"unknown": {
"zh_tw": "收到您的訊息,若需特定功能,請從選單選擇或使用 /help 查看可用指令。",
"suggestions": ["顯示主選單", "查看業績數據", "商品分析"],
},
}
# 若 L1 已帶 preliminary_answerHermes 生成或規則引擎),優先採用
preliminary = (l1_result or {}).get("preliminary_answer", "") or ""
response_data = responses.get(intent, responses["unknown"])
if preliminary:
response_data = {**response_data, "zh_tw": preliminary}
return {
"success": True,
"type": "simple_response",
"intent": intent,
"confidence": confidence,
"response_text": response_data["zh_tw"],
"suggestions": response_data["suggestions"],
"show_menu": intent == "unknown"
}
def _format_complex_response(self, l1_result: Dict[str, Any], l2_result: Dict[str, Any], original_message: str) -> Dict[str, Any]:
"""Format response for complex queries requiring data fetching"""
action_plan = l2_result.get("action_plan", {})
# Extract relevant information
query_type = self._extract_query_type(original_message)
date_range = self._extract_date_range(original_message)
brands = self._extract_brands(original_message)
# 繁體中文(台灣)回應
query_type_zh = {
"sales analysis": "業績分析",
"product analysis": "商品分析",
"market intelligence": "市場情報",
"report generation": "報告產製",
"comparative analysis": "比較分析",
"general query": "一般查詢",
}.get(query_type, query_type)
response_text = f"正在處理您的「{query_type_zh}」需求"
if date_range:
response_text += f",期間:{date_range}"
if brands:
response_text += f",品牌:{', '.join(brands)}"
response_text += "。分析準備中,稍候片刻…"
return {
"success": True,
"type": "complex_response",
"query_type": query_type,
"date_range": date_range,
"brands": brands,
"action_plan": action_plan,
"response_text": response_text,
"requires_processing": True,
"processing_status": "queued"
}
def _format_error_response(self, original_message: str) -> Dict[str, Any]:
"""Format error response in Traditional Chinese"""
return {
"success": False,
"type": "error_response",
"response_text": "抱歉,處理您的訊息時發生問題,請改用選單功能或稍後再試。",
"error_suggestions": [
"查看今日業績",
"商品排行榜",
"市場情報摘要",
"使用 /help 查看可用指令",
],
"show_menu": True,
}
def _extract_query_type(self, message: str) -> str:
"""Extract type of query from message"""
if any(word in message.lower() for word in ["sales", "revenue", "performance"]):
return "sales analysis"
elif any(word in message.lower() for word in ["product", "brand", "item"]):
return "product analysis"
elif any(word in message.lower() for word in ["market", "trend", "intelligence"]):
return "market intelligence"
elif any(word in message.lower() for word in ["report", "ppt", "presentation"]):
return "report generation"
elif any(word in message.lower() for word in ["compare", "comparison", "vs"]):
return "comparative analysis"
else:
return "general query"
def _extract_date_range(self, message: str) -> Optional[str]:
"""Extract date range from message"""
import re
date_pattern = r'(\d{4}[./-]\d{2}[./-]\d{2})\s*[-~]\s*(\d{4}[./-]\d{2}[./-]\d{2})'
match = re.search(date_pattern, message)
if match:
start = match.group(1).replace('/', '-').replace('.', '-')
end = match.group(2).replace('/', '-').replace('.', '-')
return f"{start}{end}"
return None
def _extract_brands(self, message: str) -> list:
"""Extract brand names from message (Chinese and English)"""
brand_mapping = {
"nivea": "Nivea", "loreal": "Loreal", "sk-ii": "SK-II",
"kiehls": "Kiehls", "clinique": "Clinique", "dior": "Dior",
"chanel": "Chanel", "ysl": "YSL", "givenchy": "Givenchy",
"gucci": "Gucci", "prada": "Prada", "versace": "Versace",
"armani": "Armani", "coach": "Coach", "michael kors": "Michael Kors",
"neutrogena": "Neutrogena", "aveeno": "Aveeno",
"estee lauder": "Estee Lauder", "lancome": "Lancome",
"biotherm": "Biotherm", "clarins": "Clarins", "nars": "NARS",
"bobbi brown": "Bobbi Brown", "mac": "MAC", "tumi": "Tumi",
"samsonite": "Samsonite", "longchamp": "Longchamp", "shiseido": "Shiseido"
}
message_lower = message.lower()
found_brands = []
for brand_key, brand_name in brand_mapping.items():
if brand_key in message_lower and brand_name not in found_brands:
found_brands.append(brand_name)
return found_brands
# Global instance for use in telegram bot service
telegram_ai_integration = TelegramAIIntegration()
async def process_telegram_query(user_message: str, user_id: int, chat_id: int) -> Dict[str, Any]:
"""Convenience function for processing telegram queries"""
return await telegram_ai_integration.process_natural_language_query(user_message, user_id, chat_id)

View File

@@ -178,10 +178,9 @@ class TrendTelegramBot:
"""開始指令 - 顯示主選單"""
user = update.effective_user
await update.message.reply_text(
f"Hello! I am the MOMO Pro Assistant. How can I help you today?\n\n"
f"Hi {user.first_name}!\n\n"
f"I am *MOMO Trend Assistant Bot*\n"
f"Please select the function to execute:",
f"👋 嗨,{user.first_name}\n\n"
f"我是 *MOMO 趨勢助手*,您的智能商業分析夥伴。\n"
f"請選擇下方功能,或直接輸入問題:",
reply_markup=self._get_main_menu_keyboard()
)
@@ -1371,7 +1370,7 @@ class TrendTelegramBot:
# Enhanced natural language processing with AI integration
try:
# Import AI integration
from telegram_ai_integration import process_telegram_query
from services.telegram_ai_integration import process_telegram_query
# Process with AI
user_id = update.effective_user.id
@@ -1399,28 +1398,26 @@ class TrendTelegramBot:
async def _handle_complex_ai_response(self, update: Update, ai_result: dict):
"""Handle complex AI responses"""
response_text = ai_result.get('response_text', 'Processing your request...')
# Show processing message
response_text = ai_result.get('response_text', '正在處理您的請求...')
await update.message.reply_text(
f"Processing your request...\n\n{response_text}",
f"⏳ 處理中...\n\n{response_text}",
parse_mode='Markdown'
)
# For now, provide helpful guidance
query_type = ai_result.get('query_type', 'general query')
suggestions = self._get_query_suggestions(query_type)
await update.message.reply_text(
f"Based on your {query_type}, I recommend:\n\n" +
"根據您的需求,建議使用以下功能:\n\n" +
"\n".join([f"· {s}" for s in suggestions]) +
"\n\nOr use the menu for direct access:",
"\n\n或透過選單直接操作:",
reply_markup=self._get_main_menu_keyboard()
)
async def _handle_simple_ai_response(self, update: Update, ai_result: dict):
"""Handle simple AI responses"""
response_text = ai_result.get('response_text', 'How can I help you?')
response_text = ai_result.get('response_text', '請問有什麼我可以幫您的嗎?')
suggestions = ai_result.get('suggestions', [])
show_menu = ai_result.get('show_menu', False)
@@ -1483,50 +1480,50 @@ class TrendTelegramBot:
start_date = date_match.group(1).replace('/', '-').replace('.', '-')
end_date = date_match.group(2).replace('/', '-').replace('.', '-')
brand_text = f"Brands found: {', '.join(found_brands)}" if found_brands else "All brands"
brand_text = f"品牌:{', '.join(found_brands)}" if found_brands else "全部品牌"
await update.message.reply_text(
f"Processing Momo flash sale query for {start_date} to {end_date}...\n\n"
f"Brands found: {brand_text}\n\n"
f"Please use the menu options for detailed analysis:",
f"⏳ 正在查詢 {start_date} {end_date} 的促銷業績...\n\n"
f"{brand_text}\n\n"
f"請使用下方選單查看詳細分析:",
reply_markup=self._get_main_menu_keyboard()
)
elif found_brands and any(word in text.lower() for word in ['momo', 'product', 'brand']):
brand_list = ', '.join(found_brands)
await update.message.reply_text(
f"Searching for {brand_list} products...\n\n"
f"Use the menu options for detailed brand analysis:",
f"🔍 正在搜尋 {brand_list} 商品...\n\n"
f"請使用下方選單查看詳細品牌分析:",
reply_markup=self._get_main_menu_keyboard()
)
elif any(word in text for word in ['trend', 'popular', 'trend']):
elif any(word in text for word in ['trend', 'popular']):
await update.message.reply_text(
"Please select function:",
"請選擇功能:",
reply_markup=self._get_main_menu_keyboard()
)
elif any(word in text for word in ['search', 'query', 'search']):
elif any(word in text for word in ['search', 'query']):
context.user_data['waiting_for'] = 'search_query'
await update.message.reply_text(
"Please enter search keywords:",
"🔍 請輸入搜尋關鍵字:",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("Cancel", callback_data="menu_main")
InlineKeyboardButton("❌ 取消", callback_data="menu:main")
]])
)
elif any(word in text for word in ['copy', 'generate', 'copy']):
elif any(word in text for word in ['copy', 'generate']):
context.user_data['waiting_for'] = 'copy_product'
await update.message.reply_text(
"Please enter product name:",
"✍️ 請輸入商品名稱:",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("Cancel", callback_data="menu_main")
InlineKeyboardButton("❌ 取消", callback_data="menu:main")
]])
)
else:
await update.message.reply_text(
"I'm analyzing your request. Please select a function or use /help for commands:",
"🤔 收到您的訊息!請選擇下方功能,或輸入 /help 查看指令說明:",
reply_markup=self._get_main_menu_keyboard()
)