Files
ewoooc/services/telegram_bot_service.py
ogt b3a7909b2b
All checks were successful
CD Pipeline / deploy (push) Successful in 1m29s
fix: add try/except guards to all unprotected Telegram handler functions
- Replace 2 silent `except Exception: pass` with logger.warning in handle_callback
- Wrap _handle_await_callback, _handle_main_menu_callback with top-level try/except (query.answer on error)
- Wrap _handle_complex_ai_response, _handle_simple_ai_response, _enhanced_keyword_matching, _process_await_input with top-level try/except (update.message.reply_text on error)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 19:47:49 +08:00

1904 lines
81 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Telegram Bot 服務
提供趨勢資料的 Telegram 互動功能
注意: 需要安裝 python-telegram-bot 套件
pip install python-telegram-bot>=20.0
功能:
- 主選單按鈕介面
- 分類選擇按鈕
- 趨勢查詢、AI 搜尋、文案生成
- 每日趨勢摘要推播
"""
import os
import json
import asyncio
import logging
import time as _time_mod
from typing import Optional
from datetime import date, timedelta
logger = logging.getLogger(__name__)
# H6 修補callback 速率限制(每 user_id 每分鐘最多 30 次 callback
# 訊息流在 routes/openclaw_bot_routes.py 已有 rate-limit這裡補上 callback 缺口。
_CB_RATE_LIMIT_PER_MIN = 30
_CB_RATE_WINDOW_SEC = 60
_cb_rate_tracker: dict = {} # {user_id: [timestamp, ...]}
def _check_cb_rate_limit(user_id: int) -> bool:
"""回傳 True = 允許False = 超過速率限制"""
if user_id is None:
return True # 無法辨識 user 時不阻擋(例如 inline query
now = _time_mod.time()
window = _cb_rate_tracker.setdefault(user_id, [])
# 清掉 60 秒以前的紀錄
_cb_rate_tracker[user_id] = [t for t in window if now - t < _CB_RATE_WINDOW_SEC]
if len(_cb_rate_tracker[user_id]) >= _CB_RATE_LIMIT_PER_MIN:
return False
_cb_rate_tracker[user_id].append(now)
return True
# 嘗試匯入 telegram 模組
try:
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ReplyKeyboardMarkup, KeyboardButton
from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, filters, ConversationHandler
TELEGRAM_AVAILABLE = True
except ImportError:
TELEGRAM_AVAILABLE = False
logger.warning("python-telegram-bot 套件未安裝Telegram Bot 功能將無法使用")
# 商品分類列表
CATEGORIES = [
'美妝保養', '3C家電', '服飾配件', '居家生活', '母嬰用品',
'生鮮食品', '圖書文具', '戶外運動', '餐券票券', '醫療保健',
'美體保健', '寵物用品', '箱包精品', '車類百貨', '情趣用品'
]
class TrendTelegramBot:
"""趨勢資料庫 Telegram Bot 服務"""
def __init__(self, token: str = None):
"""
初始化 Bot
Args:
token: Telegram Bot Token (從 BotFather 取得)
"""
self.token = token or os.getenv('TELEGRAM_BOT_TOKEN')
self.application = None
self.is_running = False
if not TELEGRAM_AVAILABLE:
logger.error("Telegram Bot 無法初始化: 缺少 python-telegram-bot 套件")
return
if not self.token:
logger.warning("Telegram Bot Token 未設定")
async def start(self):
"""啟動 Bot"""
if not TELEGRAM_AVAILABLE or not self.token:
logger.error("無法啟動 Telegram Bot")
return False
try:
self.application = Application.builder().token(self.token).build()
# 註冊指令處理器
self.application.add_handler(CommandHandler("start", self.cmd_start))
self.application.add_handler(CommandHandler("help", self.cmd_help))
self.application.add_handler(CommandHandler("trend", self.cmd_trend))
self.application.add_handler(CommandHandler("search", self.cmd_search))
self.application.add_handler(CommandHandler("copy", self.cmd_copy))
self.application.add_handler(CommandHandler("keywords", self.cmd_keywords))
self.application.add_handler(CommandHandler("daily", self.cmd_daily))
# 回調查詢處理器
self.application.add_handler(CallbackQueryHandler(self.handle_callback))
# 一般訊息處理器
self.application.add_handler(MessageHandler(
filters.TEXT & ~filters.COMMAND,
self.handle_message
))
# 啟動
await self.application.initialize()
await self.application.start()
await self.application.updater.start_polling()
self.is_running = True
logger.info("Telegram Bot 已啟動")
return True
except Exception as e:
logger.error(f"Telegram Bot 啟動失敗: {e}")
return False
async def stop(self):
"""停止 Bot"""
if self.application and self.is_running:
try:
await self.application.updater.stop()
await self.application.stop()
await self.application.shutdown()
self.is_running = False
logger.info("Telegram Bot 已停止")
except Exception as e:
logger.error(f"Telegram Bot 停止失敗: {e}")
# ========== 指令處理器 ==========
def _get_main_menu_keyboard(self):
"""建立主選單按鈕 - 7大功能類別"""
keyboard = [
[
InlineKeyboardButton("📊 業績查詢", callback_data="menu:sales"),
InlineKeyboardButton("🏆 商品廠商", callback_data="menu:products"),
],
[
InlineKeyboardButton("🎯 目標管理", callback_data="menu:goals"),
InlineKeyboardButton("📈 智能分析", callback_data="menu:analysis"),
],
[
InlineKeyboardButton("📄 簡報報表", callback_data="menu:reports"),
InlineKeyboardButton("<EFBFBD> 市場情報", callback_data="menu:market"),
],
[
InlineKeyboardButton("<EFBFBD> 競品日報", callback_data="menu:competitor"),
],
[
InlineKeyboardButton("❓ 使用說明", callback_data="cmd:help"),
],
]
return InlineKeyboardMarkup(keyboard)
def _get_category_keyboard(self, callback_prefix: str = "cat"):
"""建立分類選擇按鈕"""
# 每行 3 個按鈕
keyboard = []
row = []
for i, cat in enumerate(CATEGORIES):
row.append(InlineKeyboardButton(cat, callback_data=f"{callback_prefix}_{cat}"))
if len(row) == 3:
keyboard.append(row)
row = []
if row:
keyboard.append(row)
# 加入返回按鈕
keyboard.append([InlineKeyboardButton("🔙 返回主選單", callback_data="menu_main")])
return InlineKeyboardMarkup(keyboard)
async def cmd_start(self, update: Update, context):
"""開始指令 - 顯示主選單"""
user = update.effective_user
await update.message.reply_text(
f"👋 嗨,{user.first_name}\n\n"
f"我是 *MOMO 趨勢助手*,您的智能商業分析夥伴。\n"
f"請選擇下方功能,或直接輸入問題:",
reply_markup=self._get_main_menu_keyboard()
)
async def cmd_help(self, update: Update, context):
"""說明指令"""
help_text = """
📖 *指令說明*
*趨勢查詢*
/trend - 查看所有分類熱門趨勢
/trend 美妝 - 查看美妝分類趨勢
*AI 搜尋*
/search 夏季防曬推薦 - AI 搜尋並分析
*文案生成*
/copy 防曬乳 - 為商品生成行銷文案
*關鍵字*
/keywords - 近 7 天熱門關鍵字
*每日摘要*
/daily - 查看今日趨勢摘要
"""
await update.message.reply_text(help_text, parse_mode='Markdown')
async def cmd_trend(self, update: Update, context):
"""趨勢查詢指令"""
category = ' '.join(context.args) if context.args else None
await update.message.reply_text("🔄 正在查詢趨勢資料...")
try:
from database.trend_models import TrendRecord
from database.manager import get_session
session = get_session()
try:
date_from = date.today() - timedelta(days=7)
# 查詢熱門趨勢
query = session.query(TrendRecord).filter(
TrendRecord.trend_date >= date_from
)
if category:
query = query.filter(TrendRecord.category == category)
records = query.order_by(
TrendRecord.popularity_score.desc()
).limit(10).all()
finally:
session.close()
if not records:
await update.message.reply_text(
f"📭 {f'{category}分類' if category else ''}近 7 天沒有趨勢資料"
)
return
# 格式化回覆
title = f"📊 {category if category else '所有分類'}熱門趨勢 (近7天)\n\n"
lines = []
for i, r in enumerate(records, 1):
source_emoji = {
'ptt': '💬',
'dcard': '📱',
'google_news': '📰',
'youtube': '🎬'
}.get(r.source, '📄')
lines.append(
f"{i}. {source_emoji} {r.title[:30]}{'...' if len(r.title) > 30 else ''} "
f"(熱度:{r.popularity_score})"
)
await update.message.reply_text(title + '\n'.join(lines))
except Exception as e:
logger.error(f"[cmd_trend] error: {e}", exc_info=True)
await update.message.reply_text("❌ 系統錯誤,請稍後再試")
async def cmd_search(self, update: Update, context):
"""AI 搜尋指令"""
query = ' '.join(context.args)
if not query:
await update.message.reply_text("❓ 請輸入搜尋關鍵字\n範例: /search 夏季防曬推薦")
return
await update.message.reply_text(f"🔍 正在搜尋「{query}」...")
try:
from services.trend_crawler_service import get_trend_crawler_service
service = get_trend_crawler_service()
result = service.web_search_with_cache(query, search_type='trends')
if result['success']:
data = result['data']
parsed = data.get('result', {})
summary = parsed.get('summary', '無摘要')
results = parsed.get('results', [])[:5]
reply = f"🔍 *搜尋結果: {query}*\n\n"
reply += f"📝 *摘要:*\n{summary}\n\n"
if results:
reply += "*相關資訊:*\n"
for i, r in enumerate(results, 1):
title = r.get('title', '無標題')[:40]
reply += f"{i}. {title}\n"
await update.message.reply_text(reply, parse_mode='Markdown')
else:
await update.message.reply_text(f"❌ 搜尋失敗: {result.get('error', '未知錯誤')}")
except Exception as e:
logger.error(f"[cmd_search] error: {e}", exc_info=True)
await update.message.reply_text("❌ 系統錯誤,請稍後再試")
async def cmd_copy(self, update: Update, context):
"""文案生成指令"""
if not context.args:
await update.message.reply_text("❓ 請輸入商品名稱\n範例: /copy 防曬乳")
return
product_name = context.args[0]
context_hint = ' '.join(context.args[1:]) if len(context.args) > 1 else None
await update.message.reply_text(f"✍️ 正在為「{product_name}」生成文案...")
try:
from services.ollama_service import OllamaService
ollama = OllamaService()
prompt = f"""請為以下商品生成 3 種風格的行銷文案:
商品: {product_name}
{f'情境: {context_hint}' if context_hint else ''}
請分別生成:
1. 標準版 (100字內)
2. 活潑版 (含表情符號100字內)
3. 限時版 (強調緊迫感100字內)
以繁體中文回覆。"""
response = ollama.generate(prompt, temperature=0.7)
if response.success:
await update.message.reply_text(
f"✨ *{product_name} 行銷文案*\n\n{response.content}",
parse_mode='Markdown'
)
else:
await update.message.reply_text(f"❌ 生成失敗: {response.error}")
except Exception as e:
logger.error(f"[cmd_copy] error: {e}", exc_info=True)
await update.message.reply_text("❌ 系統錯誤,請稍後再試")
async def cmd_keywords(self, update: Update, context):
"""熱門關鍵字指令"""
category = ' '.join(context.args) if context.args else None
try:
from database.trend_models import TrendKeyword
from database.manager import get_session
from sqlalchemy import func
session = get_session()
try:
date_from = date.today() - timedelta(days=7)
query = session.query(
TrendKeyword.keyword,
func.sum(TrendKeyword.mention_count).label('total')
).filter(
TrendKeyword.trend_date >= date_from
).group_by(TrendKeyword.keyword)
if category:
query = query.filter(TrendKeyword.category == category)
keywords = query.order_by(
func.sum(TrendKeyword.mention_count).desc()
).limit(20).all()
finally:
session.close()
if not keywords:
await update.message.reply_text("📭 近 7 天沒有熱門關鍵字資料")
return
title = f"🏷️ {category if category else '所有分類'}熱門關鍵字 (近7天)\n\n"
lines = [f"{i}. {kw.keyword} ({kw.total}次)" for i, kw in enumerate(keywords, 1)]
await update.message.reply_text(title + '\n'.join(lines))
except Exception as e:
logger.error(f"[cmd_keywords] error: {e}", exc_info=True)
await update.message.reply_text("❌ 系統錯誤,請稍後再試")
async def cmd_daily(self, update: Update, context):
"""每日趨勢摘要"""
try:
from database.trend_models import TrendAnalysis
from database.manager import get_session
session = get_session()
try:
analysis = session.query(TrendAnalysis).filter(
TrendAnalysis.analysis_date == date.today(),
TrendAnalysis.analysis_type == 'daily_summary'
).first()
finally:
session.close()
if not analysis:
await update.message.reply_text("📭 今日尚無趨勢分析報告")
return
hot_keywords = json.loads(analysis.hot_keywords or '[]')
marketing = json.loads(analysis.marketing_suggestions or '[]')
reply = f"📰 *今日趨勢摘要*\n\n"
reply += f"📝 *概況:*\n{analysis.summary}\n\n"
if hot_keywords:
reply += f"🏷️ *熱門關鍵字:*\n{', '.join(hot_keywords[:10])}\n\n"
if marketing:
reply += f"💡 *行銷建議:*\n"
for i, m in enumerate(marketing[:5], 1):
reply += f"{i}. {m}\n"
await update.message.reply_text(reply, parse_mode='Markdown')
except Exception as e:
logger.error(f"[cmd_daily] error: {e}", exc_info=True)
await update.message.reply_text("❌ 系統錯誤,請稍後再試")
async def handle_callback(self, update: Update, context):
"""處理按鈕回調"""
query = update.callback_query
# H6: per-user rate-limit每分鐘 30 次)。
# 放在 query.answer() 之前;若本檔未來加入授權檢查,應置於授權之後以免讓
# 未授權 user 佔用 rate counter目前本檔無授權層由 python-telegram-bot
# Application 自行處理 token 範圍信任)。
_uid = query.from_user.id if query.from_user else None
if not _check_cb_rate_limit(_uid):
try:
await query.answer("操作太頻繁,請稍後再試", show_alert=False)
except Exception as _e:
logger.debug(f"[handle_callback] rate-limit answer failed: {_e}")
return
await query.answer()
data = query.data
try:
# ===== 主選單按鈕 =====
if data == "menu_main" or data == "menu:main":
await query.edit_message_text(
"👋 *MOMO 趨勢助手 Bot* — 請選擇功能類別:",
parse_mode='Markdown',
reply_markup=self._get_main_menu_keyboard()
)
# ===== 新的完整菜單系統 =====
elif data.startswith("menu:"):
await self._handle_main_menu_callback(query, data)
# ===== 舊的簡單菜單(向下相容) =====
elif data == "menu_trend":
await query.edit_message_text(
"📊 *熱門趨勢*\n\n請選擇分類:",
parse_mode='Markdown',
reply_markup=self._get_category_keyboard("trend")
)
elif data == "menu_search":
context.user_data['waiting_for'] = 'search_query'
await query.edit_message_text(
"🔍 *AI 搜尋*\n\n請輸入要搜尋的關鍵字:\n\n"
"例如:夏季防曬推薦、母親節禮物",
parse_mode='Markdown',
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("🔙 返回主選單", callback_data="menu:main")
]])
)
elif data == "menu_copy":
context.user_data['waiting_for'] = 'copy_product'
await query.edit_message_text(
"✍️ *生成文案*\n\n請輸入商品名稱:\n\n"
"例如:防曬乳、保濕面膜",
parse_mode='Markdown',
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("🔙 返回主選單", callback_data="menu:main")
]])
)
elif data == "menu_keywords":
await query.edit_message_text(
"🏷️ *熱門關鍵字*\n\n請選擇分類:",
parse_mode='Markdown',
reply_markup=self._get_category_keyboard("keywords")
)
elif data == "menu_daily":
await query.edit_message_text("📰 正在載入今日趨勢摘要...")
await self._show_daily_summary(query)
elif data == "menu_settings":
await self._show_settings(query)
# ===== 趨勢分類按鈕 =====
elif data.startswith("trend_"):
category = data[6:]
await query.edit_message_text(f"🔄 正在查詢 {category} 趨勢...")
await self._show_trend_by_category(query, category)
# ===== 關鍵字分類按鈕 =====
elif data.startswith("keywords_"):
category = data[9:]
await query.edit_message_text(f"🔄 正在查詢 {category} 熱門關鍵字...")
await self._show_keywords_by_category(query, category)
# ===== 設定按鈕 =====
elif data.startswith("settings_"):
await self._handle_settings_callback(query, data)
# ===== 降價決策按鈕(僅支援 momo:pa:xxx / momo:pr:xxx 格式)=====
elif data.startswith("momo:pa:"):
await self._handle_price_approve(query, data.split(":")[-1])
elif data.startswith("momo:pr:"):
await self._handle_price_reject(query, data.split(":")[-1])
# ===== L3 運維決策按鈕momo:ops:<action>:<task_name>=====
elif data.startswith("momo:ops:"):
await self._handle_ops_callback(query, data)
# ===== 批次定價決策momo:bpa / bpr:<batch_id>=====
elif data.startswith("momo:bpa:"):
await self._handle_batch_price_decision(query, data.split(":", 2)[-1], "approve")
elif data.startswith("momo:bpr:"):
await self._handle_batch_price_decision(query, data.split(":", 2)[-1], "reject")
# ===== 事件忽略momo:eig:<event_id>=====
elif data.startswith("momo:eig:"):
await self._handle_event_ignore(query, data.split(":", 2)[-1])
# ===== AI 回應建議按鈕 =====
elif data == "momo:menu:main":
await query.edit_message_text(
"👋 *MOMO 趨勢助手 Bot* — 請選擇功能類別:",
parse_mode='Markdown',
reply_markup=self._get_main_menu_keyboard()
)
elif data.startswith("momo:cmd:suggestion:"):
suggestion_text = data.split(":", 3)[-1].replace("_", " ")
await query.edit_message_text(f"⏳ 正在查詢:{suggestion_text}...")
chat_id = query.message.chat_id
import threading as _t
_t.Thread(
target=self._forward_cmd_to_openclaw,
args=("suggestion", suggestion_text, chat_id),
daemon=True,
).start()
# ===== 待輸入狀態按鈕await:xxx=====
elif data.startswith("await:"):
await self._handle_await_callback(query, data[6:], context)
# ===== OpenClaw 指令按鈕cmd:<cmd>:<arg>=====
elif data.startswith("cmd:"):
parts = data[4:].split(":", 1)
cmd = parts[0]
arg = parts[1] if len(parts) > 1 else ""
chat_id = query.message.chat_id
try:
await query.edit_message_text("⏳ 正在處理,請稍候...")
except Exception as _e:
logger.warning(f"[訊息編輯失敗,略過] {_e}")
import threading as _t
_t.Thread(
target=self._forward_cmd_to_openclaw,
args=(cmd, arg, chat_id),
daemon=True,
).start()
except Exception as _e:
logger.error(f"[handle_callback] error processing '{data}': {_e}", exc_info=True)
try:
await query.edit_message_text(
"❌ 系統處理異常,請重試或返回主選單",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("🔙 返回主選單", callback_data="menu:main")
]])
)
except Exception as _e:
logger.warning(f"[訊息編輯失敗,略過] {_e}")
async def _handle_await_callback(self, query, await_type: str, context):
"""處理所有 await: 類型的按鈕,設定 waiting_for 狀態並提示用戶輸入"""
try:
prompts = {
'date_range_sales': ('📅 業績查詢 — 指定日期區間', '請輸入日期區間\n格式YYYY/MM/DD-YYYY/MM/DD\n例如2026/04/01-2026/04/07'),
'date_top': ('📅 商品排行 — 指定日期', '請輸入日期\n格式YYYY/MM/DD\n例如2026/04/25'),
'date_competitor': ('📅 競品報告 — 指定日期', '請輸入日期\n格式YYYY/MM/DD\n例如2026/04/25'),
'date_ppt_daily': ('📅 日報簡報 — 指定日期', '請輸入日期\n格式YYYY/MM/DD\n例如2026/04/25'),
'date_ppt_monthly': ('📅 月報簡報 — 指定月份', '請輸入月份\n格式YYYY/MM\n例如2026/04'),
'date_ppt_vendor': ('📅 廠商簡報 — 指定月份', '請輸入月份\n格式YYYY/MM\n例如2026/04'),
'date_trend_month': ('📅 月份趨勢 — 指定月份', '請輸入月份\n格式YYYY/MM\n例如2026/04'),
'date_trend_year': ('📅 年度趨勢 — 指定年份', '請輸入年份\n格式YYYY\n例如2026'),
'date_trend_quarter':('📅 季度趨勢 — 指定季度', '請輸入季度\n格式YYYY/QQ=1~4\n例如2026/2'),
'goal_daily': ('🎯 日目標設定', '請輸入日目標金額(元,純數字)\n例如500000'),
'goal_monthly': ('🎯 月目標設定', '請輸入月目標金額(元,純數字)\n例如15000000'),
'goal_quarterly': ('🎯 季目標設定', '請輸入季目標金額(元,純數字)\n例如45000000'),
'goal_half': ('🎯 半年目標設定', '請輸入半年目標金額(元,純數字)\n例如90000000'),
'goal_yearly': ('🎯 年目標設定', '請輸入年目標金額(元,純數字)\n例如180000000'),
'promo_range': ('🎉 促銷效益追蹤', '請輸入促銷日期區間\n格式YYYY/MM/DD-YYYY/MM/DD\n例如2026/04/01-2026/04/07'),
'search_compare': ('🔍 競品關鍵字比價', '請輸入商品關鍵字\n例如:防曬乳 SPF50'),
}
if await_type not in prompts:
await query.answer(f"未知類型:{await_type}", show_alert=True)
return
title, hint = prompts[await_type]
context.user_data['waiting_for'] = await_type
await query.edit_message_text(
f"*{title}*\n\n{hint}\n\n請直接在對話框輸入:",
parse_mode='Markdown',
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("❌ 取消", callback_data="menu:main")
]])
)
except Exception as e:
logger.error(f"[_handle_await_callback] 處理失敗: {e}", exc_info=True)
try:
await query.answer("❌ 處理失敗,請重試", show_alert=True)
except Exception:
pass
def _forward_cmd_to_openclaw(self, cmd: str, arg: str, chat_id: int):
"""轉發 cmd:* 指令到 OpenClaw Flask 內部 API"""
import requests as _req
try:
internal_url = os.getenv("OPENCLAW_INTERNAL_URL", "http://momo-pro-system:80")
token = os.getenv("INTERNAL_WEBHOOK_TOKEN", "")
_req.post(
f"{internal_url}/bot/internal/cmd",
json={"chat_id": chat_id, "cmd": cmd, "arg": arg},
headers={"X-Internal-Token": token},
timeout=10,
)
except Exception as e:
logger.warning(f"[TelegramBot] forward cmd failed: {e}")
async def _handle_main_menu_callback(self, query, data: str):
"""處理主選單回調 - 完整功能菜單系統"""
try:
key = data[5:] # 移除 'menu:' 前綴
titles = {
'sales': '📊 *業績查詢* — 選擇日期或直接輸入',
'products': '🏆 *商品廠商* — 選擇查詢範圍',
'goals': '🎯 *目標管理* — 查看或設定業績目標',
'analysis': '📈 *智能分析* — 選擇分析類型',
'reports': '📄 *簡報報表* — 選擇報告類型',
'market': '🌐 *市場情報* — 即時資訊',
'competitor': '📊 *競品比價日報* — 選擇分析日期',
'competitor_ppt': '📄 *競品比價簡報* — 選擇時間範圍',
'category': '🗂 *分類業績鑽取* — 點選分類深入分析',
'trend': '📈 *業績趨勢* — 選擇時間範圍',
}
# 生成子選單
if key == 'sales':
keyboard = self._get_sales_submenu()
elif key == 'products':
keyboard = self._get_products_submenu()
elif key == 'goals':
keyboard = self._get_goals_submenu()
elif key == 'analysis':
keyboard = self._get_analysis_submenu()
elif key == 'reports':
keyboard = self._get_reports_submenu()
elif key == 'market':
keyboard = self._get_market_submenu()
elif key == 'competitor':
keyboard = self._get_competitor_submenu()
elif key == 'competitor_ppt':
keyboard = self._get_competitor_ppt_submenu()
elif key == 'category':
keyboard = self._get_category_submenu()
elif key == 'trend':
keyboard = self._get_trend_submenu()
else:
# 未知選單,返回主選單
keyboard = self._get_main_menu_keyboard()
key = 'main'
titles[key] = '👋 *MOMO 趨勢助手 Bot* — 請選擇功能類別'
await query.edit_message_text(
titles.get(key, '請選擇'),
parse_mode='Markdown',
reply_markup=InlineKeyboardMarkup(keyboard)
)
except Exception as e:
logger.error(f"[_handle_main_menu_callback] 處理失敗: {e}", exc_info=True)
try:
await query.answer("❌ 處理失敗,請重試", show_alert=True)
except Exception:
pass
def _get_sales_submenu(self):
"""業績查詢子選單"""
from datetime import datetime, timedelta
today = datetime.now().strftime('%Y/%m/%d')
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y/%m/%d')
current_month = datetime.now().strftime('%Y/%m')
return [
[{'text': f'📊 今日 ({today[-5:]})', 'callback_data': 'cmd:sales:' + today},
{'text': f'⬅ 昨日 ({yesterday[-5:]})', 'callback_data': 'cmd:sales:' + yesterday}],
[{'text': '📅 每週業績', 'callback_data': 'cmd:trend:week'},
{'text': '📅 每月業績', 'callback_data': 'cmd:history:' + current_month}],
[{'text': '📅 每季業績', 'callback_data': 'cmd:trend:quarter'},
{'text': '📅 近半年', 'callback_data': 'cmd:trend:half'}],
[{'text': '📈 趨勢分析', 'callback_data': 'menu:trend'},
{'text': '🔄 同期比較', 'callback_data': 'cmd:compare:' + today}],
[{'text': '🗂 分類業績', 'callback_data': 'cmd:category:' + today},
{'text': '📅 日期/區間', 'callback_data': 'await:date_range_sales'}],
[{'text': '🗃 月份覽', 'callback_data': 'cmd:history'}],
[{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
]
def _get_products_submenu(self):
"""商品廠商子選單"""
from datetime import datetime, timedelta
today = datetime.now().strftime('%Y/%m/%d')
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y/%m/%d')
return [
[{'text': f'🏆 熱銷商品 ({today[-5:]})', 'callback_data': 'cmd:top:' + today},
{'text': f'🏭 熱銷廠商 ({today[-5:]})', 'callback_data': 'cmd:vendor:' + today}],
[{'text': f'⬅ 昨日商品 ({yesterday[-5:]})', 'callback_data': 'cmd:top:' + yesterday},
{'text': '🧬 商品健康', 'callback_data': 'cmd:health:' + today}],
[{'text': '📦 補貨預測', 'callback_data': 'cmd:restock'},
{'text': '🗂 分類鑽取', 'callback_data': 'menu:category'}],
[{'text': '📅 指定日期', 'callback_data': 'await:date_top'}],
[{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
]
def _get_goals_submenu(self):
"""目標管理子選單"""
return [
[{'text': '📋 查看達成率', 'callback_data': 'cmd:goal'},
{'text': '📊 日目標設定', 'callback_data': 'await:goal_daily'}],
[{'text': '📅 月目標設定', 'callback_data': 'await:goal_monthly'},
{'text': '📊 季目標設定', 'callback_data': 'await:goal_quarterly'}],
[{'text': '📈 半年目標設定', 'callback_data': 'await:goal_half'},
{'text': '📊 年目標設定', 'callback_data': 'await:goal_yearly'}],
[{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
]
def _get_analysis_submenu(self):
"""智能分析子選單"""
from datetime import datetime
today = datetime.now().strftime('%Y/%m/%d')
return [
[{'text': '🎲 策略矩陣', 'callback_data': 'cmd:strategy:' + today},
{'text': '📈 業績趨勢', 'callback_data': 'menu:trend'}],
[{'text': '🧬 商品健康', 'callback_data': 'cmd:health:' + today},
{'text': '🗂 分類業績', 'callback_data': 'cmd:category:' + today}],
[{'text': '🎉 促銷追蹤', 'callback_data': 'await:promo_range'},
{'text': '📦 補貨預測', 'callback_data': 'cmd:restock'}],
[{'text': '📊 趨勢圖表', 'callback_data': 'cmd:chart'},
{'text': '🔄 同期比較', 'callback_data': 'cmd:compare:' + today}],
[{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
]
def _get_reports_submenu(self):
"""簡報報表子選單"""
return [
# ── 定期報告
[{'text': '📄 日報', 'callback_data': 'cmd:ppt:daily'},
{'text': '📈 週報', 'callback_data': 'cmd:ppt:weekly'}],
[{'text': '📅 月報', 'callback_data': 'cmd:ppt:monthly'},
{'text': '📋 下載報表','callback_data': 'cmd:report'}],
# ── 策略簡報
[{'text': '🧩 策略(日)', 'callback_data': 'cmd:ppt:strategy'},
{'text': '🧩 策略(週)', 'callback_data': 'cmd:ppt:strategy weekly'}],
[{'text': '🧩 策略(月)', 'callback_data': 'cmd:ppt:strategy monthly'},
{'text': '🧩 策略(季)', 'callback_data': 'cmd:ppt:strategy quarterly'}],
[{'text': '🧩 策略(半年)','callback_data': 'cmd:ppt:strategy half'},
{'text': '🧩 策略(年)', 'callback_data': 'cmd:ppt:strategy yearly'}],
# ── 促銷 & 競品
[{'text': '🎉 促銷效益簡報', 'callback_data': 'await:promo_range'},
{'text': '🔍 競品比較', 'callback_data': 'menu:competitor'}],
# ── 新增:成長趨勢 / 廠商 / BCG
[{'text': '📈 成長趨勢報告', 'callback_data': 'cmd:ppt:growth'},
{'text': '🏭 廠商業績報告', 'callback_data': 'cmd:ppt:vendor'}],
[{'text': '🎯 BCG 品牌矩陣', 'callback_data': 'cmd:ppt:bcg'},
{'text': '📅 指定月份廠商', 'callback_data': 'await:date_ppt_vendor'}],
# ── 自訂
[{'text': '📅 指定日期日報', 'callback_data': 'await:date_ppt_daily'},
{'text': '📅 指定月份月報', 'callback_data': 'await:date_ppt_monthly'}],
[{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
]
def _get_market_submenu(self):
"""市場情報子選單 - 完整版本"""
return [
[{'text': '📰 電商新聞', 'callback_data': 'cmd:news'},
{'text': '🌤 台北天氣', 'callback_data': 'cmd:weather'}],
[{'text': '🔥 Google熱搜', 'callback_data': 'cmd:trends'},
{'text': '💬 Dcard口碑', 'callback_data': 'cmd:dcard'}],
[{'text': '💱 台銀匯率', 'callback_data': 'cmd:exchange'},
{'text': '📅 電商節慶', 'callback_data': 'cmd:calendar'}],
[{'text': '▶️ YouTube爆紅商品', 'callback_data': 'cmd:youtube'},
{'text': '🧠 AI學習狀態', 'callback_data': 'cmd:learn'}],
[{'text': '🔍 關鍵字比價', 'callback_data': 'await:search_compare'},
{'text': '📷 圖片比價說明', 'callback_data': 'cmd:photo_search_help'}],
[{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
]
def _get_competitor_submenu(self):
"""競品日報第二層:所有選項直接產 PPT"""
from datetime import datetime, timedelta
today = datetime.now()
yesterday = today - timedelta(days=1)
td_str = today.strftime('%Y/%m/%d')
yd_str = yesterday.strftime('%Y/%m/%d')
td_label = today.strftime('%m/%d')
yd_label = yesterday.strftime('%m/%d')
return [
[{'text': f'📊 今日簡報 ({td_label})', 'callback_data': f'cmd:ppt:competitor {td_str}'},
{'text': f'📊 昨日報 ({yd_label})', 'callback_data': f'cmd:ppt:competitor {yd_str}'}],
[{'text': '📈 本週比較', 'callback_data': 'cmd:ppt:competitor weekly'},
{'text': '📆 本月比較', 'callback_data': 'cmd:ppt:competitor monthly'}],
[{'text': '🗃 本季比較', 'callback_data': 'cmd:ppt:competitor quarterly'},
{'text': '📅 指定日期', 'callback_data': 'await:date_competitor'}],
[{'text': '📄 更多週期 →', 'callback_data': 'menu:competitor_ppt'}],
[{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
]
def _get_competitor_ppt_submenu(self):
"""競品 PPT 長週期選單(第三層)— 半年/年;日/週/月/季已在第二層"""
return [
[{'text': '📆 半年比較', 'callback_data': 'cmd:ppt:competitor half'},
{'text': '🗓 年比較', 'callback_data': 'cmd:ppt:competitor yearly'}],
[{'text': '← 返回競品日報', 'callback_data': 'menu:competitor'}],
]
def _get_category_submenu(self):
"""分類業績鑽取 — 顯示 L1 固定分類按鈕"""
from datetime import datetime
today = datetime.now().strftime('%Y/%m/%d')
CATS = [
('美妝保養', '💄'), ('3C家電', '📱'), ('服飾配件', '👕'),
('居家生活', '🏠'), ('母嬰用品', '🍼'), ('生鮮食品', '🥗'),
('圖書文具', '📚'), ('戶外運動', ''), ('餐券票券', '🎫'),
('醫療保健', '💊'), ('美體保健', '💆'), ('寵物用品', '🐕'),
('箱包精品', '👜'), ('車類百貨', '🚗'), ('情趣用品', '❤️'),
]
rows = []
for i in range(0, len(CATS), 2):
pair = []
for cat, icon in CATS[i:i+2]:
pair.append({'text': f'{icon} {cat}', 'callback_data': f'cmd:catdetail:{cat}:{today}'})
rows.append(pair)
rows.append([{'text': '🗂 全分類清單', 'callback_data': f'cmd:category:{today}'}])
rows.append([{'text': '← 返回主選單', 'callback_data': 'menu:main'}])
return rows
def _get_trend_submenu(self):
"""業績趨勢子選單"""
return [
[{'text': '📅 近7日', 'callback_data': 'cmd:trend:7'},
{'text': '📅 近1個月', 'callback_data': 'cmd:trend:month'}],
[{'text': '📅 近3個月', 'callback_data': 'cmd:trend:quarter'},
{'text': '📅 近半年', 'callback_data': 'cmd:trend:half'}],
[{'text': '📅 近1年', 'callback_data': 'cmd:trend:year'},
{'text': '📅 指定月份', 'callback_data': 'await:date_trend_month'}],
[{'text': '📅 指定年份', 'callback_data': 'await:date_trend_year'},
{'text': '📅 指定季度', 'callback_data': 'await:date_trend_quarter'}],
[{'text': '← 返回業績查詢', 'callback_data': 'menu:sales'}],
]
async def _handle_price_approve(self, query, insight_id_str: str):
"""批准降價:寫 KM feedback + 移除按鈕"""
from services.openclaw_learning_service import store_insight
from datetime import date as date_cls
try:
insight_id = int(insight_id_str)
except ValueError:
await query.answer("無效的決策 ID", show_alert=True)
return
user = query.from_user
operator = user.full_name or f"id_{user.id}"
store_insight(
insight_type="price_decision_feedback",
content=f"管理員批准降價建議source_insight_id={insight_id}",
period=date_cls.today().isoformat(),
metadata={
"decision": "approve",
"source_insight_id": insight_id,
"operator": operator,
"operator_tg_id": user.id,
}
)
from services.telegram_templates import decision_result
await query.edit_message_text(
decision_result(query.message.text or "", "approve", operator),
parse_mode='HTML'
)
async def _handle_price_reject(self, query, insight_id_str: str):
"""拒絕降價:寫 KM 訓練保守策略 + 移除按鈕"""
from services.openclaw_learning_service import store_insight
from datetime import date as date_cls
try:
insight_id = int(insight_id_str)
except ValueError:
await query.answer("無效的決策 ID", show_alert=True)
return
user = query.from_user
operator = user.full_name or f"id_{user.id}"
store_insight(
insight_type="price_decision_feedback",
content=f"管理員拒絕降價建議source_insight_id={insight_id}),訓練保守策略",
period=date_cls.today().isoformat(),
metadata={
"decision": "reject",
"source_insight_id": insight_id,
"operator": operator,
"operator_tg_id": user.id,
}
)
from services.telegram_templates import decision_result
await query.edit_message_text(
decision_result(
query.message.text or "", "reject", operator,
note="已記錄為保守策略訓練資料"
),
parse_mode='HTML'
)
async def _handle_ops_callback(self, query, data: str):
"""
L3 運維決策 callbackADR-012 Phase 4
callback_data 格式momo:ops:<action>:<task_name>
actions: pause1h / pause6h / retry / resume
"""
from services.agent_actions import OPS_ACTIONS
from services.telegram_templates import ops_action_result
try:
_, _, action, task_name = data.split(":", 3)
except ValueError:
await query.answer("無效的 ops callback 格式", show_alert=True)
return
user = query.from_user
operator = user.full_name or f"id_{user.id}"
# action → OPS_ACTIONS 呼叫對應
action_map = {
"pause1h": ("pause_task", {"task_name": task_name, "duration_min": 60}),
"pause6h": ("pause_task", {"task_name": task_name, "duration_min": 360}),
"retry": ("force_retry_now", {"task_name": task_name}),
"resume": ("resume_task", {"task_name": task_name}),
}
mapped = action_map.get(action)
if mapped is None:
await query.answer(f"未知 ops action: {action}", show_alert=True)
return
fn_name, params = mapped
fn = OPS_ACTIONS.get(fn_name)
if fn is None:
await query.answer(f"OPS_ACTIONS 未定義 {fn_name}", show_alert=True)
return
params["operator"] = operator
try:
result = fn(**params)
except Exception as e:
result = {"status": "error", "error": str(e)[:200]}
logger.error(f"[TelegramBot] ops {action} 例外:{e}")
await query.edit_message_text(
ops_action_result(query.message.text or "", action, operator, result),
parse_mode='HTML'
)
async def _handle_batch_price_decision(self, query, batch_id: str, action: str):
"""
批次定價決策 callbackADR-012 C2 修復)
callback_data: momo:bpa:<batch_id> / momo:bpr:<batch_id>
流程:(1) 走 event_router.dispatch 走 L2 audit(2) 寫 KM feedback(3) 編輯訊息
"""
from services.openclaw_learning_service import store_insight
from services.event_router import dispatch as event_dispatch
from datetime import date as date_cls, datetime
user = query.from_user
operator = user.full_name or f"id_{user.id}"
action_label = "批准" if action == "approve" else "拒絕"
# 1) EventRouter dispatch — audit trailL2
try:
await event_dispatch({
"event_type": "batch_price_decision",
"severity": "alert",
"source": "telegram_bot",
"title": f"批次定價 {action_label}",
"summary": f"batch_id={batch_id} by {operator}",
"metadata": {
"batch_id": batch_id,
"decision": action,
"operator": operator,
"operator_tg_id": user.id,
},
})
except Exception as e:
logger.warning(f"[TelegramBot] batch_price event_dispatch failed: {e}")
# 2) KM feedback保守/積極策略訓練數據)
try:
note = "訓練積極定價策略" if action == "approve" else "訓練保守定價策略"
store_insight(
insight_type="batch_price_decision_feedback",
content=f"管理員{action_label}批次定價batch_id={batch_id}{note}",
period=date_cls.today().isoformat(),
metadata={
"decision": action,
"batch_id": batch_id,
"operator": operator,
"operator_tg_id": user.id,
},
)
except Exception as e:
logger.error(f"[TelegramBot] batch_price store_insight failed: {e}")
await query.answer("決策已記錄但 KM 寫入失敗", show_alert=True)
return
# 3) ack + 編輯原訊息加上執行紀錄
await query.answer(f"{action_label}批次決策", show_alert=False)
ts = datetime.now().strftime("%H:%M")
icon = "" if action == "approve" else ""
footer = f"\n\n━━━━━━━━━━━━━━━━━━━━\n{icon}{action_label} by {operator} at {ts}"
try:
await query.edit_message_text(
(query.message.text or "") + footer,
parse_mode='HTML',
)
except Exception as e:
logger.warning(f"[TelegramBot] batch_price edit_message failed: {e}")
async def _handle_event_ignore(self, query, event_id: str):
"""
事件忽略 callbackADR-012 C2 修復)
callback_data: momo:eig:<event_id>
流程:寫 KM + event_router 留痕 + 編輯訊息
"""
from services.openclaw_learning_service import store_insight
from services.event_router import dispatch as event_dispatch
from datetime import date as date_cls, datetime
user = query.from_user
operator = user.full_name or f"id_{user.id}"
# 1) EventRouter — L0 留痕severity=info不再觸發 AI 分析)
try:
await event_dispatch({
"event_type": "event_ignored",
"severity": "info",
"source": "telegram_bot",
"title": f"事件 {event_id} 已忽略",
"summary": f"操作員 {operator} 手動忽略告警事件",
"metadata": {
"event_id": event_id,
"operator": operator,
"operator_tg_id": user.id,
},
})
except Exception as e:
logger.warning(f"[TelegramBot] event_ignore dispatch failed: {e}")
# 2) KM 訓練數據 —— 幫 L1/L2 學習哪類事件不需打擾
try:
store_insight(
insight_type="event_ignore_feedback",
content=f"管理員忽略事件event_id={event_id}),訓練降噪策略",
period=date_cls.today().isoformat(),
metadata={
"event_id": event_id,
"decision": "ignore",
"operator": operator,
"operator_tg_id": user.id,
},
)
except Exception as e:
logger.error(f"[TelegramBot] event_ignore store_insight failed: {e}")
await query.answer("決策已發送但 KM 寫入失敗", show_alert=True)
return
# 3) ack + 編輯訊息
await query.answer("已忽略此事件", show_alert=False)
ts = datetime.now().strftime("%H:%M")
footer = f"\n\n━━━━━━━━━━━━━━━━━━━━\n🛑 已忽略 by {operator} at {ts}"
try:
await query.edit_message_text(
(query.message.text or "") + footer,
parse_mode='HTML',
)
except Exception as e:
logger.warning(f"[TelegramBot] event_ignore edit_message failed: {e}")
async def _show_trend_by_category(self, query, category: str):
"""顯示指定分類的趨勢"""
try:
from database.trend_models import TrendRecord
from database.manager import get_session
session = get_session()
try:
date_from = date.today() - timedelta(days=7)
records = session.query(TrendRecord).filter(
TrendRecord.trend_date >= date_from,
TrendRecord.category == category
).order_by(
TrendRecord.popularity_score.desc()
).limit(10).all()
finally:
session.close()
if not records:
await query.edit_message_text(
f"📭 {category} 分類近 7 天沒有趨勢資料",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("🔙 返回選擇分類", callback_data="menu_trend"),
InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
]])
)
return
lines = [f"📊 *{category} 熱門趨勢* (近7天)\n"]
for i, r in enumerate(records, 1):
source_emoji = {'ptt': '💬', 'dcard': '📱', 'google_news': '📰', 'youtube': '🎬'}.get(r.source, '📄')
title = r.title[:25] + '...' if len(r.title) > 25 else r.title
lines.append(f"{i}. {source_emoji} {title} ({r.popularity_score})")
await query.edit_message_text(
'\n'.join(lines),
parse_mode='Markdown',
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("🔄 重新整理", callback_data=f"trend_{category}"),
InlineKeyboardButton("📊 其他分類", callback_data="menu_trend")
], [
InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
]])
)
except Exception as e:
logger.error(f"[_show_trend_by_category] error: {e}", exc_info=True)
await query.edit_message_text(
"❌ 系統錯誤,請稍後再試",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("🔙 返回", callback_data="menu_main")
]])
)
async def _show_keywords_by_category(self, query, category: str):
"""顯示指定分類的熱門關鍵字"""
try:
from database.trend_models import TrendKeyword
from database.manager import get_session
from sqlalchemy import func
session = get_session()
try:
date_from = date.today() - timedelta(days=7)
keywords = session.query(
TrendKeyword.keyword,
func.sum(TrendKeyword.mention_count).label('total')
).filter(
TrendKeyword.trend_date >= date_from,
TrendKeyword.category == category
).group_by(TrendKeyword.keyword).order_by(
func.sum(TrendKeyword.mention_count).desc()
).limit(15).all()
finally:
session.close()
if not keywords:
await query.edit_message_text(
f"📭 {category} 分類近 7 天沒有熱門關鍵字",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("🔙 返回選擇分類", callback_data="menu_keywords"),
InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
]])
)
return
lines = [f"🏷️ *{category} 熱門關鍵字* (近7天)\n"]
for i, kw in enumerate(keywords, 1):
lines.append(f"{i}. {kw.keyword} ({kw.total}次)")
await query.edit_message_text(
'\n'.join(lines),
parse_mode='Markdown',
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("🔄 重新整理", callback_data=f"keywords_{category}"),
InlineKeyboardButton("🏷️ 其他分類", callback_data="menu_keywords")
], [
InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
]])
)
except Exception as e:
logger.error(f"[_show_keywords_by_category] error: {e}", exc_info=True)
await query.edit_message_text(
"❌ 系統錯誤,請稍後再試",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("🔙 返回", callback_data="menu_main")
]])
)
async def _show_daily_summary(self, query):
"""顯示每日趨勢摘要"""
try:
from database.trend_models import TrendAnalysis
from database.manager import get_session
session = get_session()
try:
analysis = session.query(TrendAnalysis).filter(
TrendAnalysis.analysis_date == date.today(),
TrendAnalysis.analysis_type == 'daily_summary'
).first()
finally:
session.close()
if not analysis:
await query.edit_message_text(
"📭 今日尚無趨勢分析報告\n\n趨勢爬蟲每 2 小時執行一次",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("🔄 重新載入", callback_data="menu_daily"),
InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
]])
)
return
hot_keywords = json.loads(analysis.hot_keywords or '[]')
marketing = json.loads(analysis.marketing_suggestions or '[]')
lines = ["📰 *今日趨勢摘要*\n"]
lines.append(f"📝 *概況:*\n{analysis.summary[:200]}...\n" if len(analysis.summary) > 200 else f"📝 *概況:*\n{analysis.summary}\n")
if hot_keywords:
lines.append(f"🏷️ *熱門關鍵字:*\n{', '.join(hot_keywords[:8])}\n")
if marketing:
lines.append("💡 *行銷建議:*")
for i, m in enumerate(marketing[:3], 1):
lines.append(f"{i}. {m[:50]}...")
await query.edit_message_text(
'\n'.join(lines),
parse_mode='Markdown',
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("🔄 重新載入", callback_data="menu_daily"),
InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
]])
)
except Exception as e:
logger.error(f"[_show_daily_summary] error: {e}", exc_info=True)
await query.edit_message_text(
"❌ 系統錯誤,請稍後再試",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("🔙 返回", callback_data="menu_main")
]])
)
async def _show_settings(self, query):
"""顯示設定選單"""
keyboard = [
[InlineKeyboardButton("🔔 開啟趨勢通知", callback_data="settings_notify_on")],
[InlineKeyboardButton("🔕 關閉趨勢通知", callback_data="settings_notify_off")],
[InlineKeyboardButton("📊 訂閱每日摘要", callback_data="settings_daily_on")],
[InlineKeyboardButton("📭 取消每日摘要", callback_data="settings_daily_off")],
[InlineKeyboardButton("🏠 主選單", callback_data="menu_main")],
]
await query.edit_message_text(
"⚙️ *設定*\n\n請選擇要調整的設定:",
parse_mode='Markdown',
reply_markup=InlineKeyboardMarkup(keyboard)
)
async def _handle_settings_callback(self, query, data: str):
"""處理設定相關的回調"""
from database.trend_models import TelegramUser
from database.manager import get_session
user_id = query.from_user.id
session = get_session()
try:
# 取得或建立用戶記錄
tg_user = session.query(TelegramUser).filter(
TelegramUser.telegram_id == user_id
).first()
if not tg_user:
tg_user = TelegramUser(
telegram_id=user_id,
telegram_username=query.from_user.username,
display_name=query.from_user.first_name,
is_active=True
)
session.add(tg_user)
# 處理設定變更
if data == "settings_notify_on":
tg_user.notify_trends = True
msg = "✅ 已開啟趨勢通知"
elif data == "settings_notify_off":
tg_user.notify_trends = False
msg = "🔕 已關閉趨勢通知"
elif data == "settings_daily_on":
tg_user.notify_daily_summary = True
msg = "✅ 已訂閱每日摘要 (每天 09:00 發送)"
elif data == "settings_daily_off":
tg_user.notify_daily_summary = False
msg = "📭 已取消每日摘要訂閱"
else:
msg = "❓ 未知的設定"
session.commit()
await query.edit_message_text(
f"{msg}\n\n目前設定:\n"
f"🔔 趨勢通知: {'開啟' if tg_user.notify_trends else '關閉'}\n"
f"📰 每日摘要: {'訂閱中' if tg_user.notify_daily_summary else '未訂閱'}",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("⚙️ 返回設定", callback_data="menu_settings"),
InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
]])
)
except Exception as e:
logger.error(f"[_handle_settings_callback] error: {e}", exc_info=True)
session.rollback()
await query.edit_message_text("❌ 系統錯誤,請稍後再試")
finally:
session.close()
async def handle_message(self, update: Update, context):
"""Enhanced natural language processing with AI integration"""
text = update.message.text
waiting_for = context.user_data.get('waiting_for')
# Handle waiting states
if waiting_for == 'search_query':
context.user_data['waiting_for'] = None
await self._process_search(update, text)
return
if waiting_for == 'copy_product':
context.user_data['waiting_for'] = None
await self._process_copy(update, text)
return
if waiting_for and waiting_for.startswith(('date_', 'goal_', 'promo_', 'search_compare')):
context.user_data['waiting_for'] = None
await self._process_await_input(update, context, waiting_for, text)
return
# Enhanced natural language processing with AI integration
try:
# Import AI integration
from services.telegram_ai_integration import process_telegram_query
# Process with AI
user_id = update.effective_user.id
chat_id = update.effective_chat.id
ai_result = await process_telegram_query(text, user_id, chat_id)
if ai_result.get('success', False):
response_type = ai_result.get('type', 'simple_response')
if response_type == 'complex_response':
# Handle complex queries
await self._handle_complex_ai_response(update, ai_result)
else:
# Handle simple responses
await self._handle_simple_ai_response(update, ai_result)
else:
# Fallback to enhanced keyword matching
await self._enhanced_keyword_matching(update, context, text)
except Exception as e:
logger.error(f"[handle_message] AI processing error: {e}", exc_info=True)
# Fallback to enhanced keyword matching
await self._enhanced_keyword_matching(update, context, text)
async def _handle_complex_ai_response(self, update: Update, ai_result: dict):
"""Handle complex AI responses"""
try:
response_text = ai_result.get('response_text', '正在處理您的請求...')
await update.message.reply_text(
f"⏳ 處理中...\n\n{response_text}",
parse_mode='Markdown'
)
query_type = ai_result.get('query_type', 'general query')
suggestions = self._get_query_suggestions(query_type)
await update.message.reply_text(
"根據您的需求,建議使用以下功能:\n\n" +
"\n".join([f"· {s}" for s in suggestions]) +
"\n\n或透過選單直接操作:",
reply_markup=self._get_main_menu_keyboard()
)
except Exception as e:
logger.error(f"[_handle_complex_ai_response] 處理失敗: {e}", exc_info=True)
try:
await update.message.reply_text("❌ 處理時發生錯誤,請稍後再試或使用 /help")
except Exception:
pass
async def _handle_simple_ai_response(self, update: Update, ai_result: dict):
"""Handle simple AI responses"""
try:
response_text = ai_result.get('response_text', '請問有什麼我可以幫您的嗎?')
suggestions = ai_result.get('suggestions', [])
show_menu = ai_result.get('show_menu', False)
if suggestions and not show_menu:
# Show suggestions as buttons
keyboard = []
for i, suggestion in enumerate(suggestions):
if i % 2 == 0:
keyboard.append([])
keyboard[-1].append({
'text': suggestion,
'callback_data': f'momo:cmd:suggestion:{suggestion.lower().replace(" ", "_")}'
})
# Add main menu button
keyboard.append([{'text': '主選單', 'callback_data': 'momo:menu:main'}])
await update.message.reply_text(
response_text,
reply_markup=InlineKeyboardMarkup(keyboard)
)
else:
# Show with main menu
await update.message.reply_text(
response_text,
reply_markup=self._get_main_menu_keyboard()
)
except Exception as e:
logger.error(f"[_handle_simple_ai_response] 處理失敗: {e}", exc_info=True)
try:
await update.message.reply_text("❌ 處理時發生錯誤,請稍後再試或使用 /help")
except Exception:
pass
async def _enhanced_keyword_matching(self, update: Update, context, text: str):
"""Enhanced keyword matching as fallback with Traditional Chinese responses"""
try:
import re
from datetime import datetime, timedelta
# Check for date range queries
date_pattern = r'(\d{4}[./-]\d{2}[./-]\d{2})\s*[-~]\s*(\d{4}[./-]\d{2}[./-]\d{2})'
date_match = re.search(date_pattern, text)
# Check for brand queries (Traditional Chinese and English)
brands_mapping = {
'neutrogena': 'Neutrogena',
'aveeno': 'Aveeno',
'nivea': 'Nivea',
'loreal': 'Loreal',
'shiseido': 'Shiseido',
'sk-ii': 'SK-II',
'kiehls': 'Kiehls',
'clinique': 'Clinique',
'dior': 'Dior',
'chanel': 'Chanel'
}
found_brands = []
text_lower = text.lower()
for brand_key, brand_name in brands_mapping.items():
if brand_key in text_lower:
found_brands.append(brand_name)
# Enhanced pattern matching with Traditional Chinese responses
if date_match and any(word in text.lower() for word in ['momo', 'limited', 'flash', 'sale']):
start_date = date_match.group(1).replace('/', '-').replace('.', '-')
end_date = date_match.group(2).replace('/', '-').replace('.', '-')
brand_text = f"品牌:{', '.join(found_brands)}" if found_brands else "全部品牌"
await update.message.reply_text(
f"⏳ 正在查詢 {start_date}{end_date} 的促銷業績...\n\n"
f"{brand_text}\n\n"
f"請使用下方選單查看詳細分析:",
reply_markup=self._get_main_menu_keyboard()
)
elif found_brands and any(word in text.lower() for word in ['momo', 'product', 'brand']):
brand_list = ', '.join(found_brands)
await update.message.reply_text(
f"🔍 正在搜尋 {brand_list} 商品...\n\n"
f"請使用下方選單查看詳細品牌分析:",
reply_markup=self._get_main_menu_keyboard()
)
elif any(word in text for word in ['trend', 'popular']):
await update.message.reply_text(
"請選擇功能:",
reply_markup=self._get_main_menu_keyboard()
)
elif any(word in text for word in ['search', 'query']):
context.user_data['waiting_for'] = 'search_query'
await update.message.reply_text(
"🔍 請輸入搜尋關鍵字:",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("❌ 取消", callback_data="menu:main")
]])
)
elif any(word in text for word in ['copy', 'generate']):
context.user_data['waiting_for'] = 'copy_product'
await update.message.reply_text(
"✍️ 請輸入商品名稱:",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("❌ 取消", callback_data="menu:main")
]])
)
else:
await update.message.reply_text(
"🤔 收到您的訊息!請選擇下方功能,或輸入 /help 查看指令說明:",
reply_markup=self._get_main_menu_keyboard()
)
except Exception as e:
logger.error(f"[_enhanced_keyword_matching] 處理失敗: {e}", exc_info=True)
try:
await update.message.reply_text("❌ 處理時發生錯誤,請稍後再試或使用 /help")
except Exception:
pass
def _get_query_suggestions(self, query_type: str) -> list:
"""Get suggestions based on query type (Traditional Chinese)"""
suggestions = {
"sales analysis": [
"查看今日業績表現",
"查看週業績趨勢",
"品類銷售分析",
"與前期對比"
],
"product analysis": [
"今日熱銷商品",
"品牌績效分析",
"商品健康檢查",
"庫存預測"
],
"market intelligence": [
"最新市場動態",
"競品定價分析",
"熱門關鍵字",
"產業洞察"
],
"report generation": [
"每日業績報告",
"週績效摘要",
"競品分析簡報",
"策略規劃報告"
],
"comparative analysis": [
"與競品比較",
"期間對比分析",
"品類績效比較",
"品牌對比分析"
]
}
return suggestions.get(query_type, [
"查看主選單",
"查看業績儀表板",
"商品分析",
"市場情報"
])
async def _process_await_input(self, update, context, await_type: str, text: str):
"""處理所有 await: 狀態的用戶輸入,路由到對應 cmd"""
try:
import re
chat_id = update.effective_chat.id
date_cmd_map = {
'date_range_sales': ('sales', text.strip()),
'date_top': ('top', text.strip()),
'date_competitor': ('ppt', 'competitor ' + text.strip()),
'date_ppt_daily': ('ppt', 'daily ' + text.strip()),
'date_ppt_monthly': ('ppt', 'monthly ' + text.strip()),
'date_ppt_vendor': ('ppt', 'vendor ' + text.strip()),
'date_trend_month': ('trend', text.strip()),
'date_trend_year': ('trend', text.strip()),
'date_trend_quarter':('trend', text.strip()),
'promo_range': ('promo', text.strip()),
'search_compare': ('competitor', text.strip()),
}
goal_types = {
'goal_daily': 'daily',
'goal_monthly': 'monthly',
'goal_quarterly': 'quarterly',
'goal_half': 'half',
'goal_yearly': 'yearly',
}
if await_type in date_cmd_map:
cmd, arg = date_cmd_map[await_type]
await update.message.reply_text(f"⏳ 正在處理 `{text.strip()}`...", parse_mode='Markdown')
import threading as _t
_t.Thread(
target=self._forward_cmd_to_openclaw,
args=(cmd, arg, chat_id),
daemon=True,
).start()
elif await_type in goal_types:
period = goal_types[await_type]
amount_str = re.sub(r'[^\d]', '', text)
if not amount_str:
await update.message.reply_text(
"⚠️ 格式錯誤請輸入純數字例如500000",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("🔙 返回目標管理", callback_data="menu:goals")
]])
)
return
await update.message.reply_text(
f"⏳ 正在設定{period}目標 {int(amount_str):,} 元...",
parse_mode='Markdown'
)
import threading as _t
_t.Thread(
target=self._forward_cmd_to_openclaw,
args=('goal', f'set:{period}:{amount_str}', chat_id),
daemon=True,
).start()
else:
await update.message.reply_text(
"⚠️ 無法識別輸入類型,請重新選擇功能",
reply_markup=self._get_main_menu_keyboard()
)
except Exception as e:
logger.error(f"[_process_await_input] 處理失敗: {e}", exc_info=True)
try:
await update.message.reply_text("❌ 處理時發生錯誤,請稍後再試或使用 /help")
except Exception:
pass
async def _process_search(self, update: Update, query: str):
"""處理搜尋請求"""
await update.message.reply_text(f"🔍 正在搜尋「{query}」...")
try:
from services.trend_crawler_service import get_trend_crawler_service
service = get_trend_crawler_service()
result = service.web_search_with_cache(query, search_type='trends')
if result['success']:
data = result['data']
parsed = data.get('result', {})
summary = parsed.get('summary', '無摘要')
await update.message.reply_text(
f"🔍 *搜尋結果: {query}*\n\n{summary[:500]}",
parse_mode='Markdown',
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("🔍 繼續搜尋", callback_data="menu_search"),
InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
]])
)
else:
await update.message.reply_text(
f"❌ 搜尋失敗: {result.get('error', '未知錯誤')}",
reply_markup=self._get_main_menu_keyboard()
)
except Exception as e:
logger.error(f"[_process_search] error: {e}", exc_info=True)
await update.message.reply_text(
"❌ 系統錯誤,請稍後再試",
reply_markup=self._get_main_menu_keyboard()
)
async def _process_copy(self, update: Update, product_name: str):
"""處理文案生成請求"""
await update.message.reply_text(f"✍️ 正在為「{product_name}」生成文案...")
try:
from services.ollama_service import OllamaService
ollama = OllamaService()
prompt = f"""請為以下商品生成 3 種風格的行銷文案:
商品: {product_name}
請分別生成:
1. 標準版 (80字內)
2. 活潑版 (含表情符號80字內)
3. 限時版 (強調緊迫感80字內)
以繁體中文回覆,格式簡潔。"""
response = ollama.generate(prompt, temperature=0.7)
if response.success:
await update.message.reply_text(
f"✨ *{product_name} 行銷文案*\n\n{response.content[:1000]}",
parse_mode='Markdown',
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("✍️ 再生成一次", callback_data="menu_copy"),
InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
]])
)
else:
await update.message.reply_text(
f"❌ 生成失敗: {response.error}",
reply_markup=self._get_main_menu_keyboard()
)
except Exception as e:
logger.error(f"[_process_copy] error: {e}", exc_info=True)
await update.message.reply_text(
"❌ 系統錯誤,請稍後再試",
reply_markup=self._get_main_menu_keyboard()
)
# ========== 推播功能 ==========
async def send_message(self, chat_id: int, message: str, parse_mode: str = 'Markdown'):
"""發送訊息到指定聊天室"""
if not self.application or not self.is_running:
logger.warning("Bot 未運行,無法發送訊息")
return False
try:
await self.application.bot.send_message(
chat_id=chat_id,
text=message,
parse_mode=parse_mode
)
return True
except Exception as e:
logger.error(f"發送訊息失敗: {e}")
return False
async def broadcast_trend_alert(self, message: str, category: str = None):
"""推播趨勢警報給所有訂閱用戶"""
from database.trend_models import TelegramUser
from database.manager import get_session
session = get_session()
try:
query = session.query(TelegramUser).filter(
TelegramUser.is_active == True,
TelegramUser.notify_trends == True
)
if category:
query = query.filter(
TelegramUser.preferred_categories.like(f'%{category}%')
)
users = query.all()
sent_count = 0
for user in users:
if await self.send_message(user.telegram_id, message):
sent_count += 1
logger.info(f"趨勢警報已推播給 {sent_count}/{len(users)} 位用戶")
return sent_count
finally:
session.close()
async def send_daily_summary(self):
"""推播每日趨勢摘要給訂閱用戶"""
from database.trend_models import TelegramUser, TrendAnalysis
from database.manager import get_session
session = get_session()
try:
# 取得今日摘要
analysis = session.query(TrendAnalysis).filter(
TrendAnalysis.analysis_date == date.today(),
TrendAnalysis.analysis_type == 'daily_summary'
).first()
if not analysis:
logger.warning("每日摘要推播: 今日尚無分析報告")
return 0
# 組裝訊息
hot_keywords = json.loads(analysis.hot_keywords or '[]')
marketing = json.loads(analysis.marketing_suggestions or '[]')
message = "📰 *MOMO 每日趨勢摘要*\n\n"
message += f"📅 {date.today().strftime('%Y-%m-%d')}\n\n"
message += f"📝 *概況:*\n{analysis.summary[:300]}{'...' if len(analysis.summary) > 300 else ''}\n\n"
if hot_keywords:
message += f"🏷️ *熱門關鍵字:*\n{', '.join(hot_keywords[:10])}\n\n"
if marketing:
message += "💡 *行銷建議:*\n"
for i, m in enumerate(marketing[:3], 1):
message += f"{i}. {m[:60]}...\n"
message += "\n👉 在 Bot 輸入 /daily 查看完整報告"
# 取得訂閱用戶
users = session.query(TelegramUser).filter(
TelegramUser.is_active == True,
TelegramUser.notify_daily_summary == True
).all()
sent_count = 0
for user in users:
if await self.send_message(user.telegram_id, message):
sent_count += 1
logger.info(f"每日摘要已推播給 {sent_count}/{len(users)} 位用戶")
return sent_count
except Exception as e:
logger.error(f"send_daily_summary 失敗: {e}")
return 0
finally:
session.close()
def get_application(self):
"""取得 Application 實例 (供外部啟動使用)"""
if not TELEGRAM_AVAILABLE or not self.token:
return None
if not self.application:
self.application = Application.builder().token(self.token).build()
# 註冊處理器
self.application.add_handler(CommandHandler("start", self.cmd_start))
self.application.add_handler(CommandHandler("help", self.cmd_help))
self.application.add_handler(CommandHandler("menu", self.cmd_start)) # /menu 指令映射到 cmd_start
self.application.add_handler(CommandHandler("trend", self.cmd_trend))
self.application.add_handler(CommandHandler("search", self.cmd_search))
self.application.add_handler(CommandHandler("copy", self.cmd_copy))
self.application.add_handler(CommandHandler("keywords", self.cmd_keywords))
self.application.add_handler(CommandHandler("daily", self.cmd_daily))
self.application.add_handler(CallbackQueryHandler(self.handle_callback))
self.application.add_handler(MessageHandler(
filters.TEXT & ~filters.COMMAND,
self.handle_message
))
return self.application
# 全域實例
_bot_instance: Optional[TrendTelegramBot] = None
def get_telegram_bot() -> TrendTelegramBot:
"""取得 Telegram Bot 實例"""
global _bot_instance
if _bot_instance is None:
_bot_instance = TrendTelegramBot()
return _bot_instance
def is_telegram_available() -> bool:
"""檢查 Telegram 功能是否可用"""
return TELEGRAM_AVAILABLE and bool(os.getenv('TELEGRAM_BOT_TOKEN'))
# 別名 (供 run_telegram_bot.py 使用)
TelegramBotService = TrendTelegramBot