Files
ewoooc/telegram_ai_integration.py
ogt d5c0feab5e
All checks were successful
CD Pipeline / deploy (push) Successful in 1m35s
fix: Telegram bot 全功能修復 — 16個await按鈕/AI對話/模型遷移/DB schema
## Telegram Bot 功能修復
- 補全 16 個 await: 按鈕的 handler(日期選擇/目標設定/促銷追蹤等),
  新增 _handle_await_callback + _process_await_input 完整狀態機
- cmd: 按鈕加入  即時回饋 + try/except 防 BadRequest
- handle_callback 加頂層 try/except 錯誤兜底
- 補 momo:cmd:suggestion + momo:menu:main callback handler
- 修復 _enhanced_keyword_matching context NameError

## AI 模型遷移(hermes3@111 → qwen2.5@188)
- hermes_analyst_service: URL 192.168.0.111→188, hermes3→qwen2.5:7b-instruct
- code_review_pipeline: 改用 HERMES_URL/HERMES_MODEL 常數
- elephant_alpha_orchestrator / nemoton_dispatcher: registry/footprint 同步
- aider_heal_executor: OLLAMA_API_BASE fallback 改 188
- ai_routes: footprint display 字串改 qwen2.5:7b-instruct

## ElephantAlpha 404 修復
- elephant_service: openrouter→NVIDIA NIM, nvidia/llama-3.1-nemotron-ultra-253b-v1
- ai_provider: 模型 ID 同步更新

## TELEGRAM_CHAT_ID 環境變數修正
- cicd_routes + aider_heal_executor: 優先讀 TELEGRAM_CHAT_IDS[0],
  fallback TELEGRAM_CHAT_ID,修復通知靜默失敗

## AI 對話 logging 改善
- telegram_ai_integration: Hermes 降級改 WARNING,OpenClaw 失敗加 exc_info
- hermes_analyst_service: 連線失敗 log 加 host/model context

## DB Schema 修復
- migrations/019: action_plans 補齊全欄位,DROP NOT NULL action_type
- autoheal_models: ActionPlan ORM 同步為超集 schema

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-25 03:30:14 +08:00

323 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Telegram Bot AI Integration
Integrate existing AI Orchestrator for natural language processing
All responses in Traditional Chinese
"""
import asyncio
import logging
from typing import Dict, Any, Optional
from services.ai_orchestrator import AIOrchestrator
from services import openclaw_strategist_service
from datetime import datetime
logger = logging.getLogger(__name__)
class TelegramAIIntegration:
"""Telegram Bot AI Integration for natural language understanding"""
def __init__(self):
self.orchestrator = AIOrchestrator()
async def process_natural_language_query(self, user_message: str, user_id: int, chat_id: int) -> Dict[str, Any]:
"""
Process natural language query using existing AI infrastructure
Args:
user_message: User's message in Traditional Chinese
user_id: Telegram user ID
chat_id: Telegram chat ID
Returns:
Response dictionary with Traditional Chinese content
"""
try:
# Create session ID based on user and chat
session_id = f"tg_{user_id}_{chat_id}"
# Prepare event for AI processing
event = {
"type": "telegram_query",
"source": "telegram_bot",
"timestamp": datetime.now().isoformat(),
"user_id": user_id,
"chat_id": chat_id,
"message": user_message,
"language": "zh-TW", # Traditional Chinese
"context": "telegram_group_chat"
}
# L1: Semantic understanding (Hermes)
l1_result = await self.orchestrator.handle_l1(event, session_id)
if not l1_result or l1_result.get("metadata", {}).get("source") != "hermes_llm":
logger.warning(
f"[TelegramAIIntegration] Hermes LLM 未回應,走規則引擎降級"
f"session={session_id} source={( l1_result or {}).get('metadata', {}).get('source', 'none')}"
)
# Check if this is a complex query requiring L2 processing
if self._is_complex_query(user_message, l1_result):
# L2: Planning and execution (Nemotron)
l2_result = await self.orchestrator.handle_l2(event, session_id)
dispatch_to = (l2_result or {}).get("dispatch_to", "direct_response")
complexity = float((l1_result or {}).get("complexity_score", 0.0) or 0.0)
# 若 L2 判定走 OpenClaw 或複雜度 >= 0.7 → 呼叫策略師補真實繁中洞察
if dispatch_to == "openclaw" or complexity >= 0.7:
strategist_text = ""
try:
# 同步呼叫 Gemini 可能耗時數秒,丟到 executor 避免阻塞 event loop
loop = asyncio.get_running_loop()
strategist_text = await loop.run_in_executor(
None,
openclaw_strategist_service.generate_strategy_response,
user_message,
{
"intent": (l1_result or {}).get("intent"),
"user_id": user_id,
"chat_id": chat_id,
},
)
except Exception as e:
logger.error(
f"[TelegramAIIntegration] OpenClaw 策略師呼叫失敗"
f"{type(e).__name__}: {e}",
exc_info=True,
)
strategist_text = ""
response = self._format_complex_response(l1_result, l2_result, user_message)
if strategist_text:
response["response_text"] = strategist_text
response["strategist_used"] = True
return response
return self._format_complex_response(l1_result, l2_result, user_message)
else:
# Simple query, handle directly
return self._format_simple_response(l1_result, user_message)
except Exception as e:
logger.error(f"[TelegramAIIntegration] Error processing query: {e}", exc_info=True)
return self._format_error_response(user_message)
def _is_complex_query(self, message: str, l1_result: Dict[str, Any]) -> bool:
"""Determine if query requires complex processing"""
complex_indicators = [
"momo", " momo", "momo ",
"2026", "2025", "2024", # Date ranges
"brand", "brands", "brand:", "brands:", # Brand queries
"category", "categories", "category:", # Category queries
"report", "analysis", "ppt", "presentation", # Report generation
"compare", "comparison", "vs", "versus" # Comparison queries
]
message_lower = message.lower()
# Check for complex indicators
for indicator in complex_indicators:
if indicator in message_lower:
return True
# Check L1 analysis result
if l1_result.get("complexity_score", 0) > 0.7:
return True
if l1_result.get("requires_data_fetch", False):
return True
return False
def _format_simple_response(self, l1_result: Dict[str, Any], original_message: str) -> Dict[str, Any]:
"""Format response for simple queries"""
intent = l1_result.get("intent", "unknown")
confidence = l1_result.get("confidence", 0.0)
# 繁體中文(台灣)回應模板
responses = {
"greeting": {
"zh_tw": "您好!我是 MOMO Pro 智能助理,今天需要什麼協助?",
"suggestions": ["查看今日業績", "商品排行榜", "市場情報摘要"],
},
"help": {
"zh_tw": (
"我可以協助您處理業績查詢、商品資訊、市場情報、產出報告等需求。\n"
"請直接輸入問題,或使用選單選擇功能。"
),
"suggestions": ["業績表現", "商品趨勢", "市場分析"],
},
"unknown": {
"zh_tw": "收到您的訊息,若需特定功能,請從選單選擇或使用 /help 查看可用指令。",
"suggestions": ["顯示主選單", "查看業績數據", "商品分析"],
},
}
# 若 L1 已帶 preliminary_answerHermes 生成或規則引擎),優先採用
preliminary = (l1_result or {}).get("preliminary_answer", "") or ""
response_data = responses.get(intent, responses["unknown"])
if preliminary:
response_data = {**response_data, "zh_tw": preliminary}
return {
"success": True,
"type": "simple_response",
"intent": intent,
"confidence": confidence,
"response_text": response_data["zh_tw"],
"suggestions": response_data["suggestions"],
"show_menu": intent == "unknown"
}
def _format_complex_response(self, l1_result: Dict[str, Any], l2_result: Dict[str, Any], original_message: str) -> Dict[str, Any]:
"""Format response for complex queries requiring data fetching"""
action_plan = l2_result.get("action_plan", {})
# Extract relevant information
query_type = self._extract_query_type(original_message)
date_range = self._extract_date_range(original_message)
brands = self._extract_brands(original_message)
# 繁體中文(台灣)回應
query_type_zh = {
"sales analysis": "業績分析",
"product analysis": "商品分析",
"market intelligence": "市場情報",
"report generation": "報告產製",
"comparative analysis": "比較分析",
"general query": "一般查詢",
}.get(query_type, query_type)
response_text = f"正在處理您的「{query_type_zh}」需求"
if date_range:
response_text += f",期間:{date_range}"
if brands:
response_text += f",品牌:{', '.join(brands)}"
response_text += "。分析準備中,稍候片刻…"
return {
"success": True,
"type": "complex_response",
"query_type": query_type,
"date_range": date_range,
"brands": brands,
"action_plan": action_plan,
"response_text": response_text,
"requires_processing": True,
"processing_status": "queued"
}
def _format_error_response(self, original_message: str) -> Dict[str, Any]:
"""Format error response in Traditional Chinese"""
return {
"success": False,
"type": "error_response",
"response_text": "抱歉,處理您的訊息時發生問題,請改用選單功能或稍後再試。",
"error_suggestions": [
"查看今日業績",
"商品排行榜",
"市場情報摘要",
"使用 /help 查看可用指令",
],
"show_menu": True,
}
def _extract_query_type(self, message: str) -> str:
"""Extract type of query from message"""
if any(word in message.lower() for word in ["sales", "revenue", "performance"]):
return "sales analysis"
elif any(word in message.lower() for word in ["product", "brand", "item"]):
return "product analysis"
elif any(word in message.lower() for word in ["market", "trend", "intelligence"]):
return "market intelligence"
elif any(word in message.lower() for word in ["report", "ppt", "presentation"]):
return "report generation"
elif any(word in message.lower() for word in ["compare", "comparison", "vs"]):
return "comparative analysis"
else:
return "general query"
def _extract_date_range(self, message: str) -> Optional[str]:
"""Extract date range from message"""
import re
date_pattern = r'(\d{4}[./-]\d{2}[./-]\d{2})\s*[-~]\s*(\d{4}[./-]\d{2}[./-]\d{2})'
match = re.search(date_pattern, message)
if match:
start = match.group(1).replace('/', '-').replace('.', '-')
end = match.group(2).replace('/', '-').replace('.', '-')
return f"{start}{end}"
return None
def _extract_brands(self, message: str) -> list:
"""Extract brand names from message (Chinese and English)"""
# Brand mapping: Chinese name -> English name
brand_mapping = {
# Chinese -> English mapping
"nivea": "Nivea",
"loreal": "Loreal",
"sk-ii": "SK-II",
"kiehls": "Kiehls",
"clinique": "Clinique",
"dior": "Dior",
"chanel": "Chanel",
"ysl": "YSL",
"givenchy": "Givenchy",
"hermes": "Hermes",
"gucci": "Gucci",
"prada": "Prada",
"versace": "Versace",
"armani": "Armani",
"coach": "Coach",
"michael kors": "Michael Kors",
# Specific Chinese brand names from user query
"neutrogena": "Neutrogena", # English name used in Chinese
"aveeno": "Aveeno", # English name used in Chinese
"estee lauder": "Estee Lauder",
"lancome": "Lancome",
"biotherm": "Biotherm",
"clarins": "Clarins",
"nars": "NARS",
"bobbi brown": "Bobbi Brown",
"mac": "MAC",
"tumi": "Tumi",
"samsonite": "Samsonite",
"longchamp": "Longchamp",
"shiseido": "Shiseido"
}
# Also include direct Chinese variations
chinese_variations = {
"nivea": "Nivea",
"loreal": "Loreal",
"sk-ii": "SK-II",
"kiehls": "Kiehls",
"clinique": "Clinique",
"dior": "Dior",
"chanel": "Chanel",
"neutrogena": "Neutrogena",
"aveeno": "Aveeno"
}
# Combine all brand mappings
all_brands = {**brand_mapping, **chinese_variations}
message_lower = message.lower()
found_brands = []
for brand_key, brand_name in all_brands.items():
if brand_key in message_lower:
if brand_name not in found_brands:
found_brands.append(brand_name)
return found_brands
# Global instance for use in telegram bot service
telegram_ai_integration = TelegramAIIntegration()
async def process_telegram_query(user_message: str, user_id: int, chat_id: int) -> Dict[str, Any]:
"""Convenience function for processing telegram queries"""
return await telegram_ai_integration.process_natural_language_query(user_message, user_id, chat_id)