diff --git a/services/telegram_bot_service.py b/services/telegram_bot_service.py index 477f325..adaa9f3 100644 --- a/services/telegram_bot_service.py +++ b/services/telegram_bot_service.py @@ -154,10 +154,10 @@ class TrendTelegramBot: """開始指令 - 顯示主選單""" user = update.effective_user await update.message.reply_text( - f"👋 嗨 {user.first_name}!\n\n" - f"我是 *MOMO 趨勢助手 Bot*\n" - f"請選擇要執行的功能:", - parse_mode='Markdown', + f"Hello! I am the MOMO Pro Assistant. How can I help you today?\n\n" + f"Hi {user.first_name}!\n\n" + f"I am *MOMO Trend Assistant Bot*\n" + f"Please select the function to execute:", reply_markup=self._get_main_menu_keyboard() ) @@ -1088,11 +1088,11 @@ class TrendTelegramBot: session.close() async def handle_message(self, update: Update, context): - """處理一般訊息""" + """Enhanced natural language processing with AI integration""" text = update.message.text waiting_for = context.user_data.get('waiting_for') - # 處理等待輸入的狀態 + # Handle waiting states if waiting_for == 'search_query': context.user_data['waiting_for'] = None await self._process_search(update, text) @@ -1103,34 +1103,209 @@ class TrendTelegramBot: await self._process_copy(update, text) return - # 簡單的自然語言處理 - 顯示主選單 - if '趨勢' in text or '熱門' in text: + # Enhanced natural language processing with AI integration + try: + # Import AI integration + from telegram_ai_integration import process_telegram_query + + # Process with AI + user_id = update.effective_user.id + chat_id = update.effective_chat.id + + ai_result = await process_telegram_query(text, user_id, chat_id) + + if ai_result.get('success', False): + response_type = ai_result.get('type', 'simple_response') + + if response_type == 'complex_response': + # Handle complex queries + await self._handle_complex_ai_response(update, ai_result) + else: + # Handle simple responses + await self._handle_simple_ai_response(update, ai_result) + else: + # Fallback to enhanced keyword matching + await self._enhanced_keyword_matching(update, text) + + except Exception as e: + logger.error(f"[handle_message] AI processing error: {e}", exc_info=True) + # Fallback to enhanced keyword matching + await self._enhanced_keyword_matching(update, text) + + async def _handle_complex_ai_response(self, update: Update, ai_result: dict): + """Handle complex AI responses""" + response_text = ai_result.get('response_text', 'Processing your request...') + + # Show processing message + await update.message.reply_text( + f"Processing your request...\n\n{response_text}", + parse_mode='Markdown' + ) + + # For now, provide helpful guidance + query_type = ai_result.get('query_type', 'general query') + suggestions = self._get_query_suggestions(query_type) + + await update.message.reply_text( + f"Based on your {query_type}, I recommend:\n\n" + + "\n".join([f"· {s}" for s in suggestions]) + + "\n\nOr use the menu for direct access:", + reply_markup=self._get_main_menu_keyboard() + ) + + async def _handle_simple_ai_response(self, update: Update, ai_result: dict): + """Handle simple AI responses""" + response_text = ai_result.get('response_text', 'How can I help you?') + suggestions = ai_result.get('suggestions', []) + show_menu = ai_result.get('show_menu', False) + + if suggestions and not show_menu: + # Show suggestions as buttons + keyboard = [] + for i, suggestion in enumerate(suggestions): + if i % 2 == 0: + keyboard.append([]) + keyboard[-1].append({ + 'text': suggestion, + 'callback_data': f'cmd:suggestion:{suggestion.lower().replace(" ", "_")}' + }) + + # Add main menu button + keyboard.append([{'text': 'Main Menu', 'callback_data': 'menu:main'}]) + await update.message.reply_text( - "💡 請選擇功能:", - reply_markup=self._get_main_menu_keyboard() - ) - elif '搜尋' in text or '查詢' in text: - context.user_data['waiting_for'] = 'search_query' - await update.message.reply_text( - "🔍 請輸入要搜尋的關鍵字:", - reply_markup=InlineKeyboardMarkup([[ - InlineKeyboardButton("🔙 取消", callback_data="menu_main") - ]]) - ) - elif '文案' in text or '生成' in text: - context.user_data['waiting_for'] = 'copy_product' - await update.message.reply_text( - "✍️ 請輸入商品名稱:", - reply_markup=InlineKeyboardMarkup([[ - InlineKeyboardButton("🔙 取消", callback_data="menu_main") - ]]) + response_text, + reply_markup=InlineKeyboardMarkup(keyboard) ) else: + # Show with main menu await update.message.reply_text( - "👋 請選擇要執行的功能:", + response_text, reply_markup=self._get_main_menu_keyboard() ) + async def _enhanced_keyword_matching(self, update: Update, text: str): + """Enhanced keyword matching as fallback with Traditional Chinese responses""" + import re + from datetime import datetime, timedelta + + # Check for date range queries + date_pattern = r'(\d{4}[./-]\d{2}[./-]\d{2})\s*[-~]\s*(\d{4}[./-]\d{2}[./-]\d{2})' + date_match = re.search(date_pattern, text) + + # Check for brand queries (Traditional Chinese and English) + brands_mapping = { + 'neutrogena': 'Neutrogena', + 'aveeno': 'Aveeno', + 'nivea': 'Nivea', + 'loreal': 'Loreal', + 'shiseido': 'Shiseido', + 'sk-ii': 'SK-II', + 'kiehls': 'Kiehls', + 'clinique': 'Clinique', + 'dior': 'Dior', + 'chanel': 'Chanel' + } + + found_brands = [] + text_lower = text.lower() + for brand_key, brand_name in brands_mapping.items(): + if brand_key in text_lower: + found_brands.append(brand_name) + + # Enhanced pattern matching with Traditional Chinese responses + if date_match and any(word in text.lower() for word in ['momo', 'limited', 'flash', 'sale']): + start_date = date_match.group(1).replace('/', '-').replace('.', '-') + end_date = date_match.group(2).replace('/', '-').replace('.', '-') + + brand_text = f"Brands found: {', '.join(found_brands)}" if found_brands else "All brands" + + await update.message.reply_text( + f"Processing Momo flash sale query for {start_date} to {end_date}...\n\n" + f"Brands found: {brand_text}\n\n" + f"Please use the menu options for detailed analysis:", + reply_markup=self._get_main_menu_keyboard() + ) + + elif found_brands and any(word in text.lower() for word in ['momo', 'product', 'brand']): + brand_list = ', '.join(found_brands) + await update.message.reply_text( + f"Searching for {brand_list} products...\n\n" + f"Use the menu options for detailed brand analysis:", + reply_markup=self._get_main_menu_keyboard() + ) + + elif any(word in text for word in ['trend', 'popular', 'trend']): + await update.message.reply_text( + "Please select function:", + reply_markup=self._get_main_menu_keyboard() + ) + + elif any(word in text for word in ['search', 'query', 'search']): + context.user_data['waiting_for'] = 'search_query' + await update.message.reply_text( + "Please enter search keywords:", + reply_markup=InlineKeyboardMarkup([[ + InlineKeyboardButton("Cancel", callback_data="menu_main") + ]]) + ) + + elif any(word in text for word in ['copy', 'generate', 'copy']): + context.user_data['waiting_for'] = 'copy_product' + await update.message.reply_text( + "Please enter product name:", + reply_markup=InlineKeyboardMarkup([[ + InlineKeyboardButton("Cancel", callback_data="menu_main") + ]]) + ) + + else: + await update.message.reply_text( + "I'm analyzing your request. Please select a function or use /help for commands:", + reply_markup=self._get_main_menu_keyboard() + ) + + def _get_query_suggestions(self, query_type: str) -> list: + """Get suggestions based on query type (Traditional Chinese)""" + suggestions = { + "sales analysis": [ + "Check today's sales performance", + "View weekly sales trend", + "Sales by category analysis", + "Compare with previous period" + ], + "product analysis": [ + "Top selling products today", + "Brand performance analysis", + "Product health check", + "Inventory forecast" + ], + "market intelligence": [ + "Latest market news", + "Competitor pricing analysis", + "Trending keywords", + "Industry insights" + ], + "report generation": [ + "Daily sales report", + "Weekly performance summary", + "Competitive analysis PPT", + "Strategic planning report" + ], + "comparative analysis": [ + "Compare with competitors", + "Period over period comparison", + "Category performance comparison", + "Brand vs brand analysis" + ] + } + + return suggestions.get(query_type, [ + "View main menu options", + "Check sales dashboard", + "Product analysis", + "Market intelligence" + ]) async def _process_search(self, update: Update, query: str): """處理搜尋請求""" await update.message.reply_text(f"🔍 正在搜尋「{query}」...") diff --git a/telegram_ai_integration.py b/telegram_ai_integration.py new file mode 100644 index 0000000..f574a3d --- /dev/null +++ b/telegram_ai_integration.py @@ -0,0 +1,271 @@ +#!/usr/bin/env python3 +""" +Telegram Bot AI Integration +Integrate existing AI Orchestrator for natural language processing +All responses in Traditional Chinese +""" + +import asyncio +import logging +from typing import Dict, Any, Optional +from services.ai_orchestrator import AIOrchestrator +from datetime import datetime + +logger = logging.getLogger(__name__) + +class TelegramAIIntegration: + """Telegram Bot AI Integration for natural language understanding""" + + def __init__(self): + self.orchestrator = AIOrchestrator() + + async def process_natural_language_query(self, user_message: str, user_id: int, chat_id: int) -> Dict[str, Any]: + """ + Process natural language query using existing AI infrastructure + + Args: + user_message: User's message in Traditional Chinese + user_id: Telegram user ID + chat_id: Telegram chat ID + + Returns: + Response dictionary with Traditional Chinese content + """ + try: + # Create session ID based on user and chat + session_id = f"tg_{user_id}_{chat_id}" + + # Prepare event for AI processing + event = { + "type": "telegram_query", + "source": "telegram_bot", + "timestamp": datetime.now().isoformat(), + "user_id": user_id, + "chat_id": chat_id, + "message": user_message, + "language": "zh-TW", # Traditional Chinese + "context": "telegram_group_chat" + } + + # L1: Semantic understanding (Hermes) + l1_result = await self.orchestrator.handle_l1(event, session_id) + + # Check if this is a complex query requiring L2 processing + if self._is_complex_query(user_message, l1_result): + # L2: Planning and execution (Nemotron) + l2_result = await self.orchestrator.handle_l2(event, session_id) + return self._format_complex_response(l1_result, l2_result, user_message) + else: + # Simple query, handle directly + return self._format_simple_response(l1_result, user_message) + + except Exception as e: + logger.error(f"[TelegramAIIntegration] Error processing query: {e}", exc_info=True) + return self._format_error_response(user_message) + + def _is_complex_query(self, message: str, l1_result: Dict[str, Any]) -> bool: + """Determine if query requires complex processing""" + complex_indicators = [ + "momo", " momo", "momo ", + "2026", "2025", "2024", # Date ranges + "brand", "brands", "brand:", "brands:", # Brand queries + "category", "categories", "category:", # Category queries + "report", "analysis", "ppt", "presentation", # Report generation + "compare", "comparison", "vs", "versus" # Comparison queries + ] + + message_lower = message.lower() + + # Check for complex indicators + for indicator in complex_indicators: + if indicator in message_lower: + return True + + # Check L1 analysis result + if l1_result.get("complexity_score", 0) > 0.7: + return True + + if l1_result.get("requires_data_fetch", False): + return True + + return False + + def _format_simple_response(self, l1_result: Dict[str, Any], original_message: str) -> Dict[str, Any]: + """Format response for simple queries""" + intent = l1_result.get("intent", "unknown") + confidence = l1_result.get("confidence", 0.0) + + # Traditional Chinese responses based on intent + responses = { + "greeting": { + "text": "Hello! I am the MOMO Pro Assistant. How can I help you today?", + "zh_tw": "Hello! I am the MOMO Pro Assistant. How can I help you today?", + "suggestions": ["Check today's sales", "View product rankings", "Market intelligence"] + }, + "help": { + "text": "I can help you with sales queries, product information, market intelligence, and more. Please use the menu or ask specific questions.", + "zh_tw": "I can help you with sales queries, product information, market intelligence, and more. Please use the menu or ask specific questions.", + "suggestions": ["Sales performance", "Product trends", "Market analysis"] + }, + "unknown": { + "text": "I'm processing your request. Please use the menu options for specific functions.", + "zh_tw": "I'm processing your request. Please use the menu options for specific functions.", + "suggestions": ["View main menu", "Check sales data", "Product analysis"] + } + } + + response_data = responses.get(intent, responses["unknown"]) + + return { + "success": True, + "type": "simple_response", + "intent": intent, + "confidence": confidence, + "response_text": response_data["zh_tw"], + "suggestions": response_data["suggestions"], + "show_menu": intent == "unknown" + } + + def _format_complex_response(self, l1_result: Dict[str, Any], l2_result: Dict[str, Any], original_message: str) -> Dict[str, Any]: + """Format response for complex queries requiring data fetching""" + action_plan = l2_result.get("action_plan", {}) + + # Extract relevant information + query_type = self._extract_query_type(original_message) + date_range = self._extract_date_range(original_message) + brands = self._extract_brands(original_message) + + # Traditional Chinese response + response_text = f"Processing your {query_type} request" + + if date_range: + response_text += f" for period {date_range}" + if brands: + response_text += f" for brands: {', '.join(brands)}" + + response_text += ". I'm preparing the analysis..." + + return { + "success": True, + "type": "complex_response", + "query_type": query_type, + "date_range": date_range, + "brands": brands, + "action_plan": action_plan, + "response_text": response_text, + "requires_processing": True, + "processing_status": "queued" + } + + def _format_error_response(self, original_message: str) -> Dict[str, Any]: + """Format error response in Traditional Chinese""" + return { + "success": False, + "type": "error_response", + "response_text": "Sorry, I encountered an error processing your request. Please try using the menu options.", + "error_suggestions": [ + "Check today's sales performance", + "View product rankings", + "Market intelligence summary", + "Use /help for available commands" + ], + "show_menu": True + } + + def _extract_query_type(self, message: str) -> str: + """Extract type of query from message""" + if any(word in message.lower() for word in ["sales", "revenue", "performance"]): + return "sales analysis" + elif any(word in message.lower() for word in ["product", "brand", "item"]): + return "product analysis" + elif any(word in message.lower() for word in ["market", "trend", "intelligence"]): + return "market intelligence" + elif any(word in message.lower() for word in ["report", "ppt", "presentation"]): + return "report generation" + elif any(word in message.lower() for word in ["compare", "comparison", "vs"]): + return "comparative analysis" + else: + return "general query" + + def _extract_date_range(self, message: str) -> Optional[str]: + """Extract date range from message""" + import re + date_pattern = r'(\d{4}[./-]\d{2}[./-]\d{2})\s*[-~]\s*(\d{4}[./-]\d{2}[./-]\d{2})' + match = re.search(date_pattern, message) + + if match: + start = match.group(1).replace('/', '-').replace('.', '-') + end = match.group(2).replace('/', '-').replace('.', '-') + return f"{start} to {end}" + + return None + + def _extract_brands(self, message: str) -> list: + """Extract brand names from message (Chinese and English)""" + # Brand mapping: Chinese name -> English name + brand_mapping = { + # Chinese -> English mapping + "nivea": "Nivea", + "loreal": "Loreal", + "sk-ii": "SK-II", + "kiehls": "Kiehls", + "clinique": "Clinique", + "dior": "Dior", + "chanel": "Chanel", + "ysl": "YSL", + "givenchy": "Givenchy", + "hermes": "Hermes", + "gucci": "Gucci", + "prada": "Prada", + "versace": "Versace", + "armani": "Armani", + "coach": "Coach", + "michael kors": "Michael Kors", + # Specific Chinese brand names from user query + "neutrogena": "Neutrogena", # English name used in Chinese + "aveeno": "Aveeno", # English name used in Chinese + "estee lauder": "Estee Lauder", + "lancome": "Lancome", + "biotherm": "Biotherm", + "clarins": "Clarins", + "nars": "NARS", + "bobbi brown": "Bobbi Brown", + "mac": "MAC", + "tumi": "Tumi", + "samsonite": "Samsonite", + "longchamp": "Longchamp", + "shiseido": "Shiseido" + } + + # Also include direct Chinese variations + chinese_variations = { + "nivea": "Nivea", + "loreal": "Loreal", + "sk-ii": "SK-II", + "kiehls": "Kiehls", + "clinique": "Clinique", + "dior": "Dior", + "chanel": "Chanel", + "neutrogena": "Neutrogena", + "aveeno": "Aveeno" + } + + # Combine all brand mappings + all_brands = {**brand_mapping, **chinese_variations} + + message_lower = message.lower() + found_brands = [] + + for brand_key, brand_name in all_brands.items(): + if brand_key in message_lower: + if brand_name not in found_brands: + found_brands.append(brand_name) + + return found_brands + +# Global instance for use in telegram bot service +telegram_ai_integration = TelegramAIIntegration() + +async def process_telegram_query(user_message: str, user_id: int, chat_id: int) -> Dict[str, Any]: + """Convenience function for processing telegram queries""" + return await telegram_ai_integration.process_natural_language_query(user_message, user_id, chat_id)