diff --git a/services/telegram_ai_integration.py b/services/telegram_ai_integration.py new file mode 100644 index 0000000..a96c9e4 --- /dev/null +++ b/services/telegram_ai_integration.py @@ -0,0 +1,282 @@ +#!/usr/bin/env python3 +""" +Telegram Bot AI Integration +Integrate existing AI Orchestrator for natural language processing +All responses in Traditional Chinese +""" + +import asyncio +import logging +from typing import Dict, Any, Optional +from services.ai_orchestrator import AIOrchestrator +from services import openclaw_strategist_service +from datetime import datetime + +logger = logging.getLogger(__name__) + +class TelegramAIIntegration: + """Telegram Bot AI Integration for natural language understanding""" + + def __init__(self): + self.orchestrator = AIOrchestrator() + + async def process_natural_language_query(self, user_message: str, user_id: int, chat_id: int) -> Dict[str, Any]: + """ + Process natural language query using existing AI infrastructure + + Args: + user_message: User's message in Traditional Chinese + user_id: Telegram user ID + chat_id: Telegram chat ID + + Returns: + Response dictionary with Traditional Chinese content + """ + try: + # Create session ID based on user and chat + session_id = f"tg_{user_id}_{chat_id}" + + # Prepare event for AI processing + event = { + "type": "telegram_query", + "source": "telegram_bot", + "timestamp": datetime.now().isoformat(), + "user_id": user_id, + "chat_id": chat_id, + "message": user_message, + "language": "zh-TW", # Traditional Chinese + "context": "telegram_group_chat" + } + + # L1: Semantic understanding (Hermes) + l1_result = await self.orchestrator.handle_l1(event, session_id) + if not l1_result or l1_result.get("metadata", {}).get("source") != "hermes_llm": + logger.warning( + f"[TelegramAIIntegration] Hermes LLM 未回應,走規則引擎降級" + f"(session={session_id} source={( l1_result or {}).get('metadata', {}).get('source', 'none')})" + ) + + # Check if this is a complex query requiring L2 processing + if self._is_complex_query(user_message, l1_result): + # L2: Planning and execution (Nemotron) + l2_result = await self.orchestrator.handle_l2(event, session_id) + + dispatch_to = (l2_result or {}).get("dispatch_to", "direct_response") + complexity = float((l1_result or {}).get("complexity_score", 0.0) or 0.0) + + # 若 L2 判定走 OpenClaw 或複雜度 >= 0.7 → 呼叫策略師補真實繁中洞察 + if dispatch_to == "openclaw" or complexity >= 0.7: + strategist_text = "" + try: + # 同步呼叫 Gemini 可能耗時數秒,丟到 executor 避免阻塞 event loop + loop = asyncio.get_running_loop() + strategist_text = await loop.run_in_executor( + None, + openclaw_strategist_service.generate_strategy_response, + user_message, + { + "intent": (l1_result or {}).get("intent"), + "user_id": user_id, + "chat_id": chat_id, + }, + ) + except Exception as e: + logger.error( + f"[TelegramAIIntegration] OpenClaw 策略師呼叫失敗" + f"({type(e).__name__}: {e})", + exc_info=True, + ) + strategist_text = "" + + response = self._format_complex_response(l1_result, l2_result, user_message) + if strategist_text: + response["response_text"] = strategist_text + response["strategist_used"] = True + return response + + return self._format_complex_response(l1_result, l2_result, user_message) + else: + # Simple query, handle directly + return self._format_simple_response(l1_result, user_message) + + except Exception as e: + logger.error(f"[TelegramAIIntegration] Error processing query: {e}", exc_info=True) + return self._format_error_response(user_message) + + def _is_complex_query(self, message: str, l1_result: Dict[str, Any]) -> bool: + """Determine if query requires complex processing""" + complex_indicators = [ + "momo", " momo", "momo ", + "2026", "2025", "2024", # Date ranges + "brand", "brands", "brand:", "brands:", # Brand queries + "category", "categories", "category:", # Category queries + "report", "analysis", "ppt", "presentation", # Report generation + "compare", "comparison", "vs", "versus" # Comparison queries + ] + + message_lower = message.lower() + + # Check for complex indicators + for indicator in complex_indicators: + if indicator in message_lower: + return True + + # Check L1 analysis result + if l1_result.get("complexity_score", 0) > 0.7: + return True + + if l1_result.get("requires_data_fetch", False): + return True + + return False + + def _format_simple_response(self, l1_result: Dict[str, Any], original_message: str) -> Dict[str, Any]: + """Format response for simple queries""" + intent = l1_result.get("intent", "unknown") + confidence = l1_result.get("confidence", 0.0) + + # 繁體中文(台灣)回應模板 + responses = { + "greeting": { + "zh_tw": "您好!我是 MOMO Pro 智能助理,今天需要什麼協助?", + "suggestions": ["查看今日業績", "商品排行榜", "市場情報摘要"], + }, + "help": { + "zh_tw": ( + "我可以協助您處理業績查詢、商品資訊、市場情報、產出報告等需求。\n" + "請直接輸入問題,或使用選單選擇功能。" + ), + "suggestions": ["業績表現", "商品趨勢", "市場分析"], + }, + "unknown": { + "zh_tw": "收到您的訊息,若需特定功能,請從選單選擇或使用 /help 查看可用指令。", + "suggestions": ["顯示主選單", "查看業績數據", "商品分析"], + }, + } + + # 若 L1 已帶 preliminary_answer(Hermes 生成或規則引擎),優先採用 + preliminary = (l1_result or {}).get("preliminary_answer", "") or "" + response_data = responses.get(intent, responses["unknown"]) + if preliminary: + response_data = {**response_data, "zh_tw": preliminary} + + return { + "success": True, + "type": "simple_response", + "intent": intent, + "confidence": confidence, + "response_text": response_data["zh_tw"], + "suggestions": response_data["suggestions"], + "show_menu": intent == "unknown" + } + + def _format_complex_response(self, l1_result: Dict[str, Any], l2_result: Dict[str, Any], original_message: str) -> Dict[str, Any]: + """Format response for complex queries requiring data fetching""" + action_plan = l2_result.get("action_plan", {}) + + # Extract relevant information + query_type = self._extract_query_type(original_message) + date_range = self._extract_date_range(original_message) + brands = self._extract_brands(original_message) + + # 繁體中文(台灣)回應 + query_type_zh = { + "sales analysis": "業績分析", + "product analysis": "商品分析", + "market intelligence": "市場情報", + "report generation": "報告產製", + "comparative analysis": "比較分析", + "general query": "一般查詢", + }.get(query_type, query_type) + + response_text = f"正在處理您的「{query_type_zh}」需求" + if date_range: + response_text += f",期間:{date_range}" + if brands: + response_text += f",品牌:{', '.join(brands)}" + response_text += "。分析準備中,稍候片刻…" + + return { + "success": True, + "type": "complex_response", + "query_type": query_type, + "date_range": date_range, + "brands": brands, + "action_plan": action_plan, + "response_text": response_text, + "requires_processing": True, + "processing_status": "queued" + } + + def _format_error_response(self, original_message: str) -> Dict[str, Any]: + """Format error response in Traditional Chinese""" + return { + "success": False, + "type": "error_response", + "response_text": "抱歉,處理您的訊息時發生問題,請改用選單功能或稍後再試。", + "error_suggestions": [ + "查看今日業績", + "商品排行榜", + "市場情報摘要", + "使用 /help 查看可用指令", + ], + "show_menu": True, + } + + def _extract_query_type(self, message: str) -> str: + """Extract type of query from message""" + if any(word in message.lower() for word in ["sales", "revenue", "performance"]): + return "sales analysis" + elif any(word in message.lower() for word in ["product", "brand", "item"]): + return "product analysis" + elif any(word in message.lower() for word in ["market", "trend", "intelligence"]): + return "market intelligence" + elif any(word in message.lower() for word in ["report", "ppt", "presentation"]): + return "report generation" + elif any(word in message.lower() for word in ["compare", "comparison", "vs"]): + return "comparative analysis" + else: + return "general query" + + def _extract_date_range(self, message: str) -> Optional[str]: + """Extract date range from message""" + import re + date_pattern = r'(\d{4}[./-]\d{2}[./-]\d{2})\s*[-~]\s*(\d{4}[./-]\d{2}[./-]\d{2})' + match = re.search(date_pattern, message) + + if match: + start = match.group(1).replace('/', '-').replace('.', '-') + end = match.group(2).replace('/', '-').replace('.', '-') + return f"{start} 至 {end}" + + return None + + def _extract_brands(self, message: str) -> list: + """Extract brand names from message (Chinese and English)""" + brand_mapping = { + "nivea": "Nivea", "loreal": "Loreal", "sk-ii": "SK-II", + "kiehls": "Kiehls", "clinique": "Clinique", "dior": "Dior", + "chanel": "Chanel", "ysl": "YSL", "givenchy": "Givenchy", + "gucci": "Gucci", "prada": "Prada", "versace": "Versace", + "armani": "Armani", "coach": "Coach", "michael kors": "Michael Kors", + "neutrogena": "Neutrogena", "aveeno": "Aveeno", + "estee lauder": "Estee Lauder", "lancome": "Lancome", + "biotherm": "Biotherm", "clarins": "Clarins", "nars": "NARS", + "bobbi brown": "Bobbi Brown", "mac": "MAC", "tumi": "Tumi", + "samsonite": "Samsonite", "longchamp": "Longchamp", "shiseido": "Shiseido" + } + + message_lower = message.lower() + found_brands = [] + for brand_key, brand_name in brand_mapping.items(): + if brand_key in message_lower and brand_name not in found_brands: + found_brands.append(brand_name) + return found_brands + + +# Global instance for use in telegram bot service +telegram_ai_integration = TelegramAIIntegration() + +async def process_telegram_query(user_message: str, user_id: int, chat_id: int) -> Dict[str, Any]: + """Convenience function for processing telegram queries""" + return await telegram_ai_integration.process_natural_language_query(user_message, user_id, chat_id) diff --git a/services/telegram_bot_service.py b/services/telegram_bot_service.py index 4c24f11..3c5ca4c 100644 --- a/services/telegram_bot_service.py +++ b/services/telegram_bot_service.py @@ -178,10 +178,9 @@ class TrendTelegramBot: """開始指令 - 顯示主選單""" user = update.effective_user await update.message.reply_text( - f"Hello! I am the MOMO Pro Assistant. How can I help you today?\n\n" - f"Hi {user.first_name}!\n\n" - f"I am *MOMO Trend Assistant Bot*\n" - f"Please select the function to execute:", + f"👋 嗨,{user.first_name}!\n\n" + f"我是 *MOMO 趨勢助手*,您的智能商業分析夥伴。\n" + f"請選擇下方功能,或直接輸入問題:", reply_markup=self._get_main_menu_keyboard() ) @@ -1371,7 +1370,7 @@ class TrendTelegramBot: # Enhanced natural language processing with AI integration try: # Import AI integration - from telegram_ai_integration import process_telegram_query + from services.telegram_ai_integration import process_telegram_query # Process with AI user_id = update.effective_user.id @@ -1399,28 +1398,26 @@ class TrendTelegramBot: async def _handle_complex_ai_response(self, update: Update, ai_result: dict): """Handle complex AI responses""" - response_text = ai_result.get('response_text', 'Processing your request...') - - # Show processing message + response_text = ai_result.get('response_text', '正在處理您的請求...') + await update.message.reply_text( - f"Processing your request...\n\n{response_text}", + f"⏳ 處理中...\n\n{response_text}", parse_mode='Markdown' ) - - # For now, provide helpful guidance + query_type = ai_result.get('query_type', 'general query') suggestions = self._get_query_suggestions(query_type) - + await update.message.reply_text( - f"Based on your {query_type}, I recommend:\n\n" + + "根據您的需求,建議使用以下功能:\n\n" + "\n".join([f"· {s}" for s in suggestions]) + - "\n\nOr use the menu for direct access:", + "\n\n或透過選單直接操作:", reply_markup=self._get_main_menu_keyboard() ) async def _handle_simple_ai_response(self, update: Update, ai_result: dict): """Handle simple AI responses""" - response_text = ai_result.get('response_text', 'How can I help you?') + response_text = ai_result.get('response_text', '請問有什麼我可以幫您的嗎?') suggestions = ai_result.get('suggestions', []) show_menu = ai_result.get('show_menu', False) @@ -1483,50 +1480,50 @@ class TrendTelegramBot: start_date = date_match.group(1).replace('/', '-').replace('.', '-') end_date = date_match.group(2).replace('/', '-').replace('.', '-') - brand_text = f"Brands found: {', '.join(found_brands)}" if found_brands else "All brands" - + brand_text = f"品牌:{', '.join(found_brands)}" if found_brands else "全部品牌" + await update.message.reply_text( - f"Processing Momo flash sale query for {start_date} to {end_date}...\n\n" - f"Brands found: {brand_text}\n\n" - f"Please use the menu options for detailed analysis:", + f"⏳ 正在查詢 {start_date} 至 {end_date} 的促銷業績...\n\n" + f"{brand_text}\n\n" + f"請使用下方選單查看詳細分析:", reply_markup=self._get_main_menu_keyboard() ) - + elif found_brands and any(word in text.lower() for word in ['momo', 'product', 'brand']): brand_list = ', '.join(found_brands) await update.message.reply_text( - f"Searching for {brand_list} products...\n\n" - f"Use the menu options for detailed brand analysis:", + f"🔍 正在搜尋 {brand_list} 商品...\n\n" + f"請使用下方選單查看詳細品牌分析:", reply_markup=self._get_main_menu_keyboard() ) - - elif any(word in text for word in ['trend', 'popular', 'trend']): + + elif any(word in text for word in ['trend', 'popular']): await update.message.reply_text( - "Please select function:", + "請選擇功能:", reply_markup=self._get_main_menu_keyboard() ) - - elif any(word in text for word in ['search', 'query', 'search']): + + elif any(word in text for word in ['search', 'query']): context.user_data['waiting_for'] = 'search_query' await update.message.reply_text( - "Please enter search keywords:", + "🔍 請輸入搜尋關鍵字:", reply_markup=InlineKeyboardMarkup([[ - InlineKeyboardButton("Cancel", callback_data="menu_main") + InlineKeyboardButton("❌ 取消", callback_data="menu:main") ]]) ) - - elif any(word in text for word in ['copy', 'generate', 'copy']): + + elif any(word in text for word in ['copy', 'generate']): context.user_data['waiting_for'] = 'copy_product' await update.message.reply_text( - "Please enter product name:", + "✍️ 請輸入商品名稱:", reply_markup=InlineKeyboardMarkup([[ - InlineKeyboardButton("Cancel", callback_data="menu_main") + InlineKeyboardButton("❌ 取消", callback_data="menu:main") ]]) ) - + else: await update.message.reply_text( - "I'm analyzing your request. Please select a function or use /help for commands:", + "🤔 收到您的訊息!請選擇下方功能,或輸入 /help 查看指令說明:", reply_markup=self._get_main_menu_keyboard() )