All checks were successful
CD Pipeline / deploy (push) Successful in 1m28s
- telegram_ai_integration.py 移至 services/ 解決 ModuleNotFoundError (momo-telegram-bot 只掛載 services/,根目錄檔案進不了容器) - import 路徑更新為 from services.telegram_ai_integration - 所有英文回覆字串改為繁體中文: · 歡迎訊息、fallback 訊息、錯誤提示 · _enhanced_keyword_matching 全英文段落 · _handle_complex_ai_response / _handle_simple_ai_response · Cancel 按鈕改「❌ 取消」、callback 改 menu:main Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
283 lines
12 KiB
Python
283 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Telegram Bot AI Integration
|
||
Integrate existing AI Orchestrator for natural language processing
|
||
All responses in Traditional Chinese
|
||
"""
|
||
|
||
import asyncio
|
||
import logging
|
||
from typing import Dict, Any, Optional
|
||
from services.ai_orchestrator import AIOrchestrator
|
||
from services import openclaw_strategist_service
|
||
from datetime import datetime
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class TelegramAIIntegration:
    """Bridge between the Telegram bot and the AI Orchestrator.

    A message is first sent to the orchestrator's L1 stage. Messages judged
    complex are additionally sent to the L2 stage and, when L2 dispatches to
    OpenClaw or the L1 complexity score is high enough, to the OpenClaw
    strategist service. Every user-facing string is Traditional Chinese.
    """

    # Complexity score at or above which a query is treated as complex. The
    # same inclusive threshold is used both for routing to L2 and for calling
    # the strategist, so a score of exactly 0.7 is handled consistently.
    _COMPLEXITY_THRESHOLD = 0.7

    # Lower-case substrings that mark a message as complex. Matching is by
    # substring, so "brand" also covers "brands" / "brand:" and "momo" covers
    # " momo" / "momo ".
    _COMPLEX_INDICATORS = (
        "momo",
        "2026", "2025", "2024",  # Date ranges
        "brand",  # Brand queries
        "category", "categories",  # Category queries
        "report", "analysis", "ppt", "presentation",  # Report generation
        "compare", "comparison", "vs", "versus",  # Comparison queries
    )

    # Ordered (query type, keywords) pairs; the first pair with a keyword
    # contained in the lower-cased message wins.
    _QUERY_TYPE_KEYWORDS = (
        ("sales analysis", ("sales", "revenue", "performance", "業績", "營收", "銷售")),
        ("product analysis", ("product", "brand", "item", "商品", "產品", "品牌")),
        ("market intelligence", ("market", "trend", "intelligence", "市場", "趨勢", "情報")),
        ("report generation", ("report", "ppt", "presentation", "報告", "簡報")),
        ("comparative analysis", ("compare", "comparison", "vs", "比較", "對比")),
    )

    # Traditional Chinese (Taiwan) display names for the query types.
    _QUERY_TYPE_ZH = {
        "sales analysis": "業績分析",
        "product analysis": "商品分析",
        "market intelligence": "市場情報",
        "report generation": "報告產製",
        "comparative analysis": "比較分析",
        "general query": "一般查詢",
    }

    # Traditional Chinese (Taiwan) reply templates keyed by L1 intent.
    _SIMPLE_RESPONSES = {
        "greeting": {
            "zh_tw": "您好!我是 MOMO Pro 智能助理,今天需要什麼協助?",
            "suggestions": ["查看今日業績", "商品排行榜", "市場情報摘要"],
        },
        "help": {
            "zh_tw": (
                "我可以協助您處理業績查詢、商品資訊、市場情報、產出報告等需求。\n"
                "請直接輸入問題,或使用選單選擇功能。"
            ),
            "suggestions": ["業績表現", "商品趨勢", "市場分析"],
        },
        "unknown": {
            "zh_tw": "收到您的訊息,若需特定功能,請從選單選擇或使用 /help 查看可用指令。",
            "suggestions": ["顯示主選單", "查看業績數據", "商品分析"],
        },
    }

    # Lower-case brand key -> display name.
    _BRAND_MAPPING = {
        "nivea": "Nivea", "loreal": "Loreal", "sk-ii": "SK-II",
        "kiehls": "Kiehls", "clinique": "Clinique", "dior": "Dior",
        "chanel": "Chanel", "ysl": "YSL", "givenchy": "Givenchy",
        "gucci": "Gucci", "prada": "Prada", "versace": "Versace",
        "armani": "Armani", "coach": "Coach", "michael kors": "Michael Kors",
        "neutrogena": "Neutrogena", "aveeno": "Aveeno",
        "estee lauder": "Estee Lauder", "lancome": "Lancome",
        "biotherm": "Biotherm", "clarins": "Clarins", "nars": "NARS",
        "bobbi brown": "Bobbi Brown", "mac": "MAC", "tumi": "Tumi",
        "samsonite": "Samsonite", "longchamp": "Longchamp", "shiseido": "Shiseido",
    }

    def __init__(self):
        """Create the AI Orchestrator used for the L1 and L2 stages."""
        self.orchestrator = AIOrchestrator()

    async def process_natural_language_query(
        self, user_message: str, user_id: int, chat_id: int
    ) -> Dict[str, Any]:
        """Process a natural-language Telegram message.

        Args:
            user_message: User's message in Traditional Chinese.
            user_id: Telegram user ID.
            chat_id: Telegram chat ID.

        Returns:
            Response dictionary with Traditional Chinese content. Any
            exception raised while processing is logged and turned into the
            error response; this method does not raise it to the caller.
        """
        try:
            # Session ID is derived from the user and the chat.
            session_id = f"tg_{user_id}_{chat_id}"

            # Event handed to both orchestrator stages.
            event = {
                "type": "telegram_query",
                "source": "telegram_bot",
                "timestamp": datetime.now().isoformat(),
                "user_id": user_id,
                "chat_id": chat_id,
                "message": user_message,
                "language": "zh-TW",  # Traditional Chinese
                "context": "telegram_group_chat",
            }

            # L1: semantic understanding (Hermes).
            l1_result = await self.orchestrator.handle_l1(event, session_id)
            l1_source = ((l1_result or {}).get("metadata") or {}).get("source")
            if l1_source != "hermes_llm":
                # Not fatal: processing continues with whatever L1 returned
                # (the rule-engine fallback, or nothing at all).
                logger.warning(
                    "[TelegramAIIntegration] Hermes LLM 未回應,走規則引擎降級"
                    "(session=%s source=%s)",
                    session_id,
                    l1_source or "none",
                )

            if not self._is_complex_query(user_message, l1_result):
                # Simple query, handle directly.
                return self._format_simple_response(l1_result, user_message)

            # L2: planning and execution (Nemotron).
            l2_result = await self.orchestrator.handle_l2(event, session_id)
            response = self._format_complex_response(l1_result, l2_result, user_message)

            # If L2 dispatches to OpenClaw, or the complexity reaches the
            # threshold, ask the strategist for a real Traditional Chinese
            # insight and use it instead of the placeholder text.
            dispatch_to = (l2_result or {}).get("dispatch_to", "direct_response")
            needs_strategist = (
                dispatch_to == "openclaw"
                or self._complexity_of(l1_result) >= self._COMPLEXITY_THRESHOLD
            )
            if needs_strategist:
                strategist_text = await self._ask_strategist(
                    user_message, l1_result, user_id, chat_id
                )
                if strategist_text:
                    response["response_text"] = strategist_text
                    response["strategist_used"] = True

            return response

        except Exception as e:
            logger.exception("[TelegramAIIntegration] Error processing query: %s", e)
            return self._format_error_response(user_message)

    async def _ask_strategist(
        self,
        user_message: str,
        l1_result: Optional[Dict[str, Any]],
        user_id: int,
        chat_id: int,
    ) -> str:
        """Call the OpenClaw strategist; return "" when it fails or has no answer."""
        context = {
            "intent": (l1_result or {}).get("intent"),
            "user_id": user_id,
            "chat_id": chat_id,
        }
        try:
            # The strategist call is synchronous (Gemini) and may take several
            # seconds, so it runs in the default executor to avoid blocking
            # the event loop.
            loop = asyncio.get_running_loop()
            answer = await loop.run_in_executor(
                None,
                openclaw_strategist_service.generate_strategy_response,
                user_message,
                context,
            )
        except Exception as e:
            # Best effort: the caller keeps the placeholder response.
            logger.error(
                "[TelegramAIIntegration] OpenClaw 策略師呼叫失敗(%s: %s)",
                type(e).__name__,
                e,
                exc_info=True,
            )
            return ""
        return answer or ""

    @staticmethod
    def _complexity_of(l1_result: Optional[Dict[str, Any]]) -> float:
        """Return the L1 complexity score as a float; 0.0 if missing or not numeric."""
        raw_score = (l1_result or {}).get("complexity_score")
        try:
            return float(raw_score or 0.0)
        except (TypeError, ValueError):
            return 0.0

    def _is_complex_query(self, message: str, l1_result: Optional[Dict[str, Any]]) -> bool:
        """Determine if a query requires complex (L2) processing.

        A query is complex when the message contains one of the indicator
        substrings, when the L1 complexity score reaches the threshold, or
        when L1 sets ``requires_data_fetch``. ``l1_result`` may be None or
        empty, in which case only the message text is considered.
        """
        message_lower = message.lower()
        if any(indicator in message_lower for indicator in self._COMPLEX_INDICATORS):
            return True

        l1 = l1_result or {}
        if self._complexity_of(l1) >= self._COMPLEXITY_THRESHOLD:
            return True

        return bool(l1.get("requires_data_fetch", False))

    def _format_simple_response(
        self, l1_result: Optional[Dict[str, Any]], original_message: str
    ) -> Dict[str, Any]:
        """Format the response for a simple query.

        The template is chosen by the L1 intent (falling back to "unknown").
        When L1 carries a non-empty ``preliminary_answer`` it replaces the
        template text. ``l1_result`` may be None or empty.
        """
        l1 = l1_result or {}
        intent = l1.get("intent", "unknown")
        confidence = l1.get("confidence", 0.0)

        template = self._SIMPLE_RESPONSES.get(intent, self._SIMPLE_RESPONSES["unknown"])
        preliminary = l1.get("preliminary_answer", "") or ""

        return {
            "success": True,
            "type": "simple_response",
            "intent": intent,
            "confidence": confidence,
            "response_text": preliminary or template["zh_tw"],
            # Copy so callers cannot mutate the shared template list.
            "suggestions": list(template["suggestions"]),
            "show_menu": intent == "unknown",
        }

    def _format_complex_response(
        self,
        l1_result: Optional[Dict[str, Any]],
        l2_result: Optional[Dict[str, Any]],
        original_message: str,
    ) -> Dict[str, Any]:
        """Format the placeholder response for a complex query.

        Query type, date range and brands are extracted from the original
        message; the action plan is taken from ``l2_result`` (which may be
        None or empty, giving an empty plan).
        """
        action_plan = (l2_result or {}).get("action_plan", {})

        # Extract relevant information from the message text.
        query_type = self._extract_query_type(original_message)
        date_range = self._extract_date_range(original_message)
        brands = self._extract_brands(original_message)

        # Build the Traditional Chinese (Taiwan) status line.
        query_type_zh = self._QUERY_TYPE_ZH.get(query_type, query_type)
        parts = [f"正在處理您的「{query_type_zh}」需求"]
        if date_range:
            parts.append(f",期間:{date_range}")
        if brands:
            parts.append(f",品牌:{', '.join(brands)}")
        parts.append("。分析準備中,稍候片刻…")

        return {
            "success": True,
            "type": "complex_response",
            "query_type": query_type,
            "date_range": date_range,
            "brands": brands,
            "action_plan": action_plan,
            "response_text": "".join(parts),
            "requires_processing": True,
            "processing_status": "queued",
        }

    def _format_error_response(self, original_message: str) -> Dict[str, Any]:
        """Format the error response in Traditional Chinese."""
        return {
            "success": False,
            "type": "error_response",
            "response_text": "抱歉,處理您的訊息時發生問題,請改用選單功能或稍後再試。",
            "error_suggestions": [
                "查看今日業績",
                "商品排行榜",
                "市場情報摘要",
                "使用 /help 查看可用指令",
            ],
            "show_menu": True,
        }

    def _extract_query_type(self, message: str) -> str:
        """Classify the message by keyword (English or Traditional Chinese).

        Returns the first query type whose keyword occurs in the lower-cased
        message, or "general query" when nothing matches.
        """
        message_lower = message.lower()
        for query_type, keywords in self._QUERY_TYPE_KEYWORDS:
            if any(keyword in message_lower for keyword in keywords):
                return query_type
        return "general query"

    def _extract_date_range(self, message: str) -> Optional[str]:
        """Extract a date range from the message.

        Recognises two ``YYYY-MM-DD`` style dates (``-``, ``/`` or ``.`` as
        field separator, month/day with one or two digits) joined by ``-``,
        ``~``, ``～``, ``至`` or ``到``. Returns ``"<start> 至 <end>"`` with
        both dates normalised to zero-padded ``YYYY-MM-DD``, or None when the
        message contains no such range.
        """
        import re

        one_date = r"(\d{4})[./-](\d{1,2})[./-](\d{1,2})"
        range_pattern = one_date + r"\s*(?:[-~～]|至|到)\s*" + one_date
        match = re.search(range_pattern, message)
        if match is None:
            return None

        y1, m1, d1, y2, m2, d2 = match.groups()
        start = f"{y1}-{int(m1):02d}-{int(d1):02d}"
        end = f"{y2}-{int(m2):02d}-{int(d2):02d}"
        return f"{start} 至 {end}"

    def _extract_brands(self, message: str) -> list:
        """Extract known brand names mentioned in the message.

        Matching is case-insensitive. A brand key only counts when it is not
        directly preceded or followed by an ASCII letter, so "machine" does
        not yield MAC, while a key embedded in Chinese text still matches.
        Brands are returned by display name, in mapping order.
        """
        import re

        message_lower = message.lower()
        found_brands = []
        for brand_key, brand_name in self._BRAND_MAPPING.items():
            pattern = rf"(?<![a-z]){re.escape(brand_key)}(?![a-z])"
            if re.search(pattern, message_lower):
                found_brands.append(brand_name)
        return found_brands
|
||
|
||
|
||
# Global instance for use in telegram bot service
|
||
# Shared module-level instance; process_telegram_query() below delegates to it.
# Constructing it here runs TelegramAIIntegration.__init__ when this module is
# imported.
telegram_ai_integration = TelegramAIIntegration()
|
||
|
||
async def process_telegram_query(user_message: str, user_id: int, chat_id: int) -> Dict[str, Any]:
    """Forward a Telegram message to the shared integration instance.

    Args:
        user_message: Text of the user's message.
        user_id: Telegram user ID.
        chat_id: Telegram chat ID.

    Returns:
        The response dictionary produced by
        ``telegram_ai_integration.process_natural_language_query``.
    """
    handler = telegram_ai_integration.process_natural_language_query
    reply = await handler(user_message, user_id, chat_id)
    return reply
|