Files
ewoooc/services/telegram_ai_integration.py
ogt 4ff291b596
All checks were successful
CD Pipeline / deploy (push) Successful in 1m28s
fix: 修復 AI 對話無法使用 + 全面繁體中文化
- telegram_ai_integration.py 移至 services/ 解決 ModuleNotFoundError
  (momo-telegram-bot 只掛載 services/,根目錄檔案進不了容器)
- import 路徑更新為 from services.telegram_ai_integration
- 所有英文回覆字串改為繁體中文:
  · 歡迎訊息、fallback 訊息、錯誤提示
  · _enhanced_keyword_matching 全英文段落
  · _handle_complex_ai_response / _handle_simple_ai_response
  · Cancel 按鈕改「 取消」、callback 改 menu:main

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-25 09:41:55 +08:00

283 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Telegram Bot AI Integration
Integrate existing AI Orchestrator for natural language processing
All responses in Traditional Chinese
"""
import asyncio
import logging
import re
from datetime import datetime
from typing import Dict, Any, Optional

from services import openclaw_strategist_service
from services.ai_orchestrator import AIOrchestrator
logger = logging.getLogger(__name__)
class TelegramAIIntegration:
    """Telegram Bot AI integration for natural language understanding.

    Each incoming message is sent to the AI orchestrator's L1 handler. Queries
    judged complex are additionally sent to its L2 handler and, depending on
    the L2 dispatch target or the L1 complexity score, to the OpenClaw
    strategist service. Every user-facing string produced here is written in
    Traditional Chinese.
    """

    # Complexity score at (or above) which the strategist is consulted. Scores
    # strictly above it also mark a query as complex in _is_complex_query.
    COMPLEXITY_THRESHOLD = 0.7

    # Substrings searched in the lower-cased message; any hit marks the query
    # as complex. Entries that were strictly contained in another entry
    # (" momo", "brands:", "category:", ...) were dropped because they could
    # never change the outcome.
    _COMPLEX_INDICATORS = (
        "momo",
        "2026", "2025", "2024",  # Date ranges
        "brand",  # Brand queries
        "category", "categories",  # Category queries
        "report", "analysis", "ppt", "presentation",  # Report generation
        "compare", "comparison", "vs", "versus",  # Comparison queries
    )

    # Ordered (query type, keywords) pairs; the first type with a keyword
    # present in the lower-cased message wins. Traditional Chinese keywords are
    # included because user messages are expected to be in Traditional Chinese.
    _QUERY_TYPE_KEYWORDS = (
        ("sales analysis", ("sales", "revenue", "performance", "業績", "銷售", "營收")),
        ("product analysis", ("product", "brand", "item", "商品", "產品", "品牌")),
        ("market intelligence", ("market", "trend", "intelligence", "市場", "趨勢", "情報")),
        ("report generation", ("report", "ppt", "presentation", "報告", "簡報", "報表")),
        ("comparative analysis", ("compare", "comparison", "vs", "比較", "對比")),
    )

    # Traditional Chinese (Taiwan) labels for each query type.
    _QUERY_TYPE_LABELS_ZH = {
        "sales analysis": "業績分析",
        "product analysis": "商品分析",
        "market intelligence": "市場情報",
        "report generation": "報告產製",
        "comparative analysis": "比較分析",
        "general query": "一般查詢",
    }

    # Traditional Chinese (Taiwan) reply templates keyed by L1 intent.
    _SIMPLE_RESPONSES = {
        "greeting": {
            "zh_tw": "您好!我是 MOMO Pro 智能助理,今天需要什麼協助?",
            "suggestions": ["查看今日業績", "商品排行榜", "市場情報摘要"],
        },
        "help": {
            "zh_tw": (
                "我可以協助您處理業績查詢、商品資訊、市場情報、產出報告等需求。\n"
                "請直接輸入問題,或使用選單選擇功能。"
            ),
            "suggestions": ["業績表現", "商品趨勢", "市場分析"],
        },
        "unknown": {
            "zh_tw": "收到您的訊息,若需特定功能,請從選單選擇或使用 /help 查看可用指令。",
            "suggestions": ["顯示主選單", "查看業績數據", "商品分析"],
        },
    }

    # One date: YYYY, then month and day (1-2 digits), separated by ".", "/" or "-".
    _DATE_TOKEN = r"(\d{4})[./-](\d{1,2})[./-](\d{1,2})"
    # Two dates joined by "-", "~", fullwidth tilde, en/em dash, or the Chinese
    # words for "to". Non-ASCII separators are written as escapes so the file
    # does not carry look-alike characters.
    _DATE_RANGE_PATTERN = re.compile(
        _DATE_TOKEN + r"\s*(?:[-~\uFF5E\u2013\u2014]|\u81F3|\u5230)\s*" + _DATE_TOKEN
    )

    # Lower-cased brand key -> display name.
    _BRAND_MAPPING = {
        "nivea": "Nivea", "loreal": "Loreal", "sk-ii": "SK-II",
        "kiehls": "Kiehls", "clinique": "Clinique", "dior": "Dior",
        "chanel": "Chanel", "ysl": "YSL", "givenchy": "Givenchy",
        "gucci": "Gucci", "prada": "Prada", "versace": "Versace",
        "armani": "Armani", "coach": "Coach", "michael kors": "Michael Kors",
        "neutrogena": "Neutrogena", "aveeno": "Aveeno",
        "estee lauder": "Estee Lauder", "lancome": "Lancome",
        "biotherm": "Biotherm", "clarins": "Clarins", "nars": "NARS",
        "bobbi brown": "Bobbi Brown", "mac": "MAC", "tumi": "Tumi",
        "samsonite": "Samsonite", "longchamp": "Longchamp", "shiseido": "Shiseido",
    }
    # A brand only counts when it is not glued to other ASCII letters/digits,
    # so "macbook" or "stomach" no longer yield MAC. CJK neighbours are allowed
    # because Chinese text is normally written without spaces around Latin words.
    _BRAND_PATTERNS = tuple(
        (re.compile(r"(?<![a-z0-9])" + re.escape(key) + r"(?![a-z0-9])"), name)
        for key, name in _BRAND_MAPPING.items()
    )

    def __init__(self):
        """Create the orchestrator used for L1/L2 processing."""
        self.orchestrator = AIOrchestrator()

    async def process_natural_language_query(self, user_message: str, user_id: int, chat_id: int) -> Dict[str, Any]:
        """Process a natural language query using the existing AI infrastructure.

        Args:
            user_message: User's message (expected to be Traditional Chinese).
            user_id: Telegram user ID.
            chat_id: Telegram chat ID.

        Returns:
            A response dictionary with Traditional Chinese content: a
            "simple_response", a "complex_response" (optionally carrying the
            strategist's text and ``strategist_used=True``), or an
            "error_response" if anything raised along the way.
        """
        try:
            # One session per (user, chat) pair.
            session_id = f"tg_{user_id}_{chat_id}"
            event = {
                "type": "telegram_query",
                "source": "telegram_bot",
                "timestamp": datetime.now().isoformat(),
                "user_id": user_id,
                "chat_id": chat_id,
                "message": user_message,
                "language": "zh-TW",  # Traditional Chinese
                "context": "telegram_group_chat",
            }

            # L1: semantic understanding (labelled "Hermes" by the original author).
            l1_result = await self.orchestrator.handle_l1(event, session_id)
            l1_source = ((l1_result or {}).get("metadata") or {}).get("source", "none")
            if l1_source != "hermes_llm":
                # Not fatal: processing continues with whatever L1 returned.
                logger.warning(
                    "[TelegramAIIntegration] Hermes LLM 未回應,走規則引擎降級 | session=%s source=%s",
                    session_id,
                    l1_source,
                )

            if not self._is_complex_query(user_message, l1_result):
                # Simple query, handle directly.
                return self._format_simple_response(l1_result, user_message)

            # L2: planning and execution (labelled "Nemotron" by the original author).
            l2_result = await self.orchestrator.handle_l2(event, session_id)
            response = self._format_complex_response(l1_result, l2_result, user_message)

            # When L2 dispatches to OpenClaw, or the query is complex enough,
            # ask the strategist for a real answer to replace the placeholder.
            dispatch_to = (l2_result or {}).get("dispatch_to", "direct_response")
            if dispatch_to == "openclaw" or self._complexity_score(l1_result) >= self.COMPLEXITY_THRESHOLD:
                strategist_text = await self._ask_strategist(user_message, l1_result, user_id, chat_id)
                if strategist_text:
                    response["response_text"] = strategist_text
                    response["strategist_used"] = True
            return response
        except Exception as e:
            # Top-level boundary: the bot must always get a reply dictionary.
            logger.error("[TelegramAIIntegration] Error processing query: %s", e, exc_info=True)
            return self._format_error_response(user_message)

    async def _ask_strategist(self, user_message: str, l1_result: Optional[Dict[str, Any]], user_id: int, chat_id: int) -> str:
        """Call the OpenClaw strategist; return its text, or "" on failure."""
        context = {
            "intent": (l1_result or {}).get("intent"),
            "user_id": user_id,
            "chat_id": chat_id,
        }
        try:
            # The strategist call is synchronous (the original author notes it
            # can take several seconds), so it runs in the default executor to
            # keep the event loop responsive.
            loop = asyncio.get_running_loop()
            text = await loop.run_in_executor(
                None,
                openclaw_strategist_service.generate_strategy_response,
                user_message,
                context,
            )
        except Exception as e:
            # Best effort: a strategist failure falls back to the placeholder reply.
            logger.error(
                "[TelegramAIIntegration] OpenClaw 策略師呼叫失敗: %s: %s",
                type(e).__name__,
                e,
                exc_info=True,
            )
            return ""
        return text or ""

    @staticmethod
    def _complexity_score(l1_result: Optional[Dict[str, Any]]) -> float:
        """Return the L1 complexity score as a float, 0.0 if missing or invalid."""
        raw = (l1_result or {}).get("complexity_score", 0.0) or 0.0
        try:
            return float(raw)
        except (TypeError, ValueError):
            return 0.0

    def _is_complex_query(self, message: str, l1_result: Optional[Dict[str, Any]]) -> bool:
        """Determine if a query requires complex (L2) processing.

        A query is complex when the lower-cased message contains any indicator
        substring, when the L1 complexity score is above the threshold, or when
        L1 set ``requires_data_fetch``. A missing L1 result counts as "no signal".
        """
        message_lower = message.lower()
        if any(indicator in message_lower for indicator in self._COMPLEX_INDICATORS):
            return True

        if self._complexity_score(l1_result) > self.COMPLEXITY_THRESHOLD:
            return True
        return bool((l1_result or {}).get("requires_data_fetch", False))

    def _format_simple_response(self, l1_result: Optional[Dict[str, Any]], original_message: str) -> Dict[str, Any]:
        """Format the reply for a simple query.

        The template is chosen by the L1 intent (falling back to "unknown"). If
        L1 supplied a non-empty ``preliminary_answer`` it replaces the template
        text while the template's suggestions are kept. ``show_menu`` is True
        only when the intent itself is "unknown".
        """
        l1 = l1_result or {}
        intent = l1.get("intent", "unknown")
        template = self._SIMPLE_RESPONSES.get(intent, self._SIMPLE_RESPONSES["unknown"])
        # Prefer the answer L1 already produced, when there is one.
        preliminary = l1.get("preliminary_answer", "") or ""
        return {
            "success": True,
            "type": "simple_response",
            "intent": intent,
            "confidence": l1.get("confidence", 0.0),
            "response_text": preliminary or template["zh_tw"],
            # Copy so callers cannot mutate the shared template list.
            "suggestions": list(template["suggestions"]),
            "show_menu": intent == "unknown",
        }

    def _format_complex_response(self, l1_result: Optional[Dict[str, Any]], l2_result: Optional[Dict[str, Any]], original_message: str) -> Dict[str, Any]:
        """Format the placeholder reply for a complex query.

        The query type, date range and brands are extracted from the original
        message; the action plan is taken from the L2 result (empty dict when
        L2 returned nothing). ``l1_result`` is accepted for interface
        compatibility and is not read here.
        """
        action_plan = (l2_result or {}).get("action_plan", {})
        query_type = self._extract_query_type(original_message)
        date_range = self._extract_date_range(original_message)
        brands = self._extract_brands(original_message)

        query_type_zh = self._QUERY_TYPE_LABELS_ZH.get(query_type, query_type)
        parts = [f"正在處理您的「{query_type_zh}」需求"]
        if date_range:
            parts.append(f",期間:{date_range}")
        if brands:
            parts.append(f",品牌:{', '.join(brands)}")
        parts.append("。分析準備中,稍候片刻…")

        return {
            "success": True,
            "type": "complex_response",
            "query_type": query_type,
            "date_range": date_range,
            "brands": brands,
            "action_plan": action_plan,
            "response_text": "".join(parts),
            "requires_processing": True,
            "processing_status": "queued",
        }

    def _format_error_response(self, original_message: str) -> Dict[str, Any]:
        """Format the generic error reply in Traditional Chinese."""
        return {
            "success": False,
            "type": "error_response",
            "response_text": "抱歉,處理您的訊息時發生問題,請改用選單功能或稍後再試。",
            "error_suggestions": [
                "查看今日業績",
                "商品排行榜",
                "市場情報摘要",
                "使用 /help 查看可用指令",
            ],
            "show_menu": True,
        }

    def _extract_query_type(self, message: str) -> str:
        """Classify the message into a query type by keyword.

        Returns the first type (in declaration order) with an English or
        Traditional Chinese keyword in the lower-cased message, or
        "general query" when nothing matches.
        """
        message_lower = message.lower()
        for query_type, keywords in self._QUERY_TYPE_KEYWORDS:
            if any(keyword in message_lower for keyword in keywords):
                return query_type
        return "general query"

    def _extract_date_range(self, message: str) -> Optional[str]:
        """Extract the first date range from the message.

        Returns:
            "YYYY-MM-DD ~ YYYY-MM-DD" with zero-padded month and day, or None
            when the message contains no recognisable range. Dates are
            normalised but not validated against the calendar.
        """
        match = self._DATE_RANGE_PATTERN.search(message)
        if match is None:
            return None
        y1, m1, d1, y2, m2, d2 = (int(group) for group in match.groups())
        start = f"{y1:04d}-{m1:02d}-{d1:02d}"
        end = f"{y2:04d}-{m2:02d}-{d2:02d}"
        # An explicit separator keeps the two dates readable and parseable.
        return f"{start} ~ {end}"

    def _extract_brands(self, message: str) -> list:
        """Extract known brand names from the message.

        Matching is case-insensitive on the English brand keys only, and a key
        must not be adjacent to other ASCII letters or digits. Display names
        are returned in mapping order, not in order of appearance.
        """
        message_lower = message.lower()
        return [name for pattern, name in self._BRAND_PATTERNS if pattern.search(message_lower)]
# Global instance for use in the telegram bot service. It is created at import
# time, so importing this module runs TelegramAIIntegration.__init__ (which
# constructs an AIOrchestrator).
telegram_ai_integration = TelegramAIIntegration()
async def process_telegram_query(user_message: str, user_id: int, chat_id: int) -> Dict[str, Any]:
    """Process a Telegram message with the module-level integration instance.

    Thin convenience wrapper: forwards all three arguments unchanged to
    ``telegram_ai_integration.process_natural_language_query`` and returns
    its response dictionary.
    """
    handler = telegram_ai_integration.process_natural_language_query
    return await handler(user_message, user_id, chat_id)