Files
ewoooc/services/telegram_bot_service.py

1537 lines
62 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Telegram Bot 服務
提供趨勢資料的 Telegram 互動功能
注意: 需要安裝 python-telegram-bot 套件
pip install python-telegram-bot>=20.0
功能:
- 主選單按鈕介面
- 分類選擇按鈕
- 趨勢查詢、AI 搜尋、文案生成
- 每日趨勢摘要推播
"""
import os
import json
import asyncio
import logging
from typing import Optional
from datetime import date, timedelta
logger = logging.getLogger(__name__)

# python-telegram-bot is treated as optional: attempt the import and record
# in TELEGRAM_AVAILABLE whether it succeeded.
try:
    from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ReplyKeyboardMarkup, KeyboardButton
    from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, filters, ConversationHandler
    TELEGRAM_AVAILABLE = True
except ImportError:
    TELEGRAM_AVAILABLE = False
    # Handler signatures in this module are annotated with `Update`
    # (e.g. `async def cmd_start(self, update: Update, context)`), and
    # annotations are evaluated when each `def` statement runs. Without a
    # placeholder the class body would raise NameError and this fallback
    # would never get the chance to report the missing package.
    Update = object
    logger.warning("python-telegram-bot 套件未安裝Telegram Bot 功能將無法使用")

# Product categories offered by the category-selection keyboards.
CATEGORIES = ['美妝', '3C', '服飾', '居家', '母嬰', '電商', '優惠', '生活', '美食', '熱門']
class TrendTelegramBot:
"""趨勢資料庫 Telegram Bot 服務"""
def __init__(self, token: str = None):
    """
    Prepare the bot wrapper; nothing is sent to Telegram here.

    Args:
        token: Telegram Bot Token (obtained from BotFather). When it is
            omitted or empty, the TELEGRAM_BOT_TOKEN environment variable
            is read instead.
    """
    self.is_running = False
    self.application = None
    self.token = token if token else os.getenv('TELEGRAM_BOT_TOKEN')

    if TELEGRAM_AVAILABLE:
        # Library present: the only thing left to complain about is a missing token.
        if not self.token:
            logger.warning("Telegram Bot Token 未設定")
    else:
        logger.error("Telegram Bot 無法初始化: 缺少 python-telegram-bot 套件")
async def start(self):
    """
    Build the Application, register every handler and start polling.

    Returns:
        True once polling has been started. False when the library or the
        token is missing, or when any startup step raised (the exception is
        logged, not propagated).
    """
    if not (TELEGRAM_AVAILABLE and self.token):
        logger.error("無法啟動 Telegram Bot")
        return False

    try:
        app = Application.builder().token(self.token).build()
        self.application = app

        # Slash commands, registered in the order listed.
        command_table = (
            ("start", self.cmd_start),
            ("help", self.cmd_help),
            ("trend", self.cmd_trend),
            ("search", self.cmd_search),
            ("copy", self.cmd_copy),
            ("keywords", self.cmd_keywords),
            ("daily", self.cmd_daily),
        )
        for command_name, callback in command_table:
            app.add_handler(CommandHandler(command_name, callback))

        # Inline-button presses.
        app.add_handler(CallbackQueryHandler(self.handle_callback))

        # Plain text messages that are not commands.
        plain_text = filters.TEXT & ~filters.COMMAND
        app.add_handler(MessageHandler(plain_text, self.handle_message))

        # Bring the application up and begin polling for updates.
        await app.initialize()
        await app.start()
        await app.updater.start_polling()

        self.is_running = True
        logger.info("Telegram Bot 已啟動")
        return True
    except Exception as e:
        logger.error(f"Telegram Bot 啟動失敗: {e}")
        return False
async def stop(self):
    """
    Stop polling and shut the application down.

    Does nothing unless an application exists and the bot is marked as
    running. Failures are logged and swallowed; `is_running` is only
    cleared when every shutdown step succeeded.
    """
    if not (self.application and self.is_running):
        return

    app = self.application
    try:
        await app.updater.stop()
        await app.stop()
        await app.shutdown()
        self.is_running = False
        logger.info("Telegram Bot 已停止")
    except Exception as e:
        logger.error(f"Telegram Bot 停止失敗: {e}")
# ========== 指令處理器 ==========
def _get_main_menu_keyboard(self):
    """
    Build the main-menu inline keyboard (seven feature categories plus help).

    Returns:
        InlineKeyboardMarkup whose buttons carry `menu:<key>` callback data
        (or `cmd:help` for the help button).
    """
    # The icons of the "市場情報" and "競品日報" buttons had been replaced by the
    # Unicode replacement character (shown as "<EFBFBD>"). They are restored with
    # icons this file already uses for the same destinations: 🌐 from the
    # 'market' menu title and 🔍 from the reports-submenu button that opens
    # `menu:competitor`.
    layout = (
        (("📊 業績查詢", "menu:sales"), ("🏆 商品廠商", "menu:products")),
        (("🎯 目標管理", "menu:goals"), ("📈 智能分析", "menu:analysis")),
        (("📄 簡報報表", "menu:reports"), ("🌐 市場情報", "menu:market")),
        (("🔍 競品日報", "menu:competitor"),),
        (("❓ 使用說明", "cmd:help"),),
    )
    keyboard = [
        [InlineKeyboardButton(label, callback_data=target) for label, target in row]
        for row in layout
    ]
    return InlineKeyboardMarkup(keyboard)
def _get_category_keyboard(self, callback_prefix: str = "cat"):
    """
    Build the category-selection inline keyboard.

    Args:
        callback_prefix: Prefix for each button's callback data, which is
            formed as "<prefix>_<category>".

    Returns:
        InlineKeyboardMarkup with the CATEGORIES laid out three per row,
        followed by a final row holding the back-to-main-menu button.
    """
    per_row = 3
    buttons = [
        InlineKeyboardButton(name, callback_data=f"{callback_prefix}_{name}")
        for name in CATEGORIES
    ]
    # Slice the flat button list into rows; the last row may be shorter.
    rows = [buttons[start:start + per_row] for start in range(0, len(buttons), per_row)]
    # Back button on its own row.
    rows.append([InlineKeyboardButton("🔙 返回主選單", callback_data="menu_main")])
    return InlineKeyboardMarkup(rows)
async def cmd_start(self, update: Update, context):
    """Handle /start: greet the sender by first name and show the main menu."""
    first_name = update.effective_user.first_name
    # NOTE(review): no parse_mode is passed below, so the asterisks in the
    # greeting are sent as literal characters.
    greeting = "".join((
        "Hello! I am the MOMO Pro Assistant. How can I help you today?\n\n",
        f"Hi {first_name}!\n\n",
        "I am *MOMO Trend Assistant Bot*\n",
        "Please select the function to execute:",
    ))
    await update.message.reply_text(
        greeting,
        reply_markup=self._get_main_menu_keyboard(),
    )
async def cmd_help(self, update: Update, context):
"""Handle /help: reply with the static command overview below."""
# The text is sent with parse_mode='Markdown', so the *...* spans are
# formatting markers rather than literal asterisks.
help_text = """
📖 *指令說明*
*趨勢查詢*
/trend - 查看所有分類熱門趨勢
/trend 美妝 - 查看美妝分類趨勢
*AI 搜尋*
/search 夏季防曬推薦 - AI 搜尋並分析
*文案生成*
/copy 防曬乳 - 為商品生成行銷文案
*關鍵字*
/keywords - 近 7 天熱門關鍵字
*每日摘要*
/daily - 查看今日趨勢摘要
"""
await update.message.reply_text(help_text, parse_mode='Markdown')
async def cmd_trend(self, update: Update, context):
    """
    Handle /trend [category].

    Lists the ten TrendRecord rows with the highest popularity score dated
    within the last 7 days, optionally restricted to the category given as
    command arguments. Any exception is logged and answered with a generic
    error message.
    """
    category = ' '.join(context.args) if context.args else None
    await update.message.reply_text("🔄 正在查詢趨勢資料...")

    try:
        from database.trend_models import TrendRecord
        from database.manager import get_session

        since = date.today() - timedelta(days=7)
        session = get_session()
        try:
            # Recent records, narrowed to one category when requested.
            trend_query = session.query(TrendRecord).filter(
                TrendRecord.trend_date >= since
            )
            if category:
                trend_query = trend_query.filter(TrendRecord.category == category)
            ordered = trend_query.order_by(TrendRecord.popularity_score.desc())
            records = ordered.limit(10).all()
        finally:
            session.close()

        if not records:
            scope = f'{category}分類' if category else ''
            await update.message.reply_text(f"📭 {scope}近 7 天沒有趨勢資料")
            return

        # One numbered line per record, prefixed by an icon for its source.
        source_icons = {
            'ptt': '💬',
            'dcard': '📱',
            'google_news': '📰',
            'youtube': '🎬',
        }
        heading = f"📊 {category if category else '所有分類'}熱門趨勢 (近7天)\n\n"
        rows = []
        for rank, record in enumerate(records, 1):
            icon = source_icons.get(record.source, '📄')
            shown_title = record.title[:30]
            ellipsis = '...' if len(record.title) > 30 else ''
            rows.append(
                f"{rank}. {icon} {shown_title}{ellipsis} (熱度:{record.popularity_score})"
            )
        await update.message.reply_text(heading + '\n'.join(rows))
    except Exception as e:
        logger.error(f"[cmd_trend] error: {e}", exc_info=True)
        await update.message.reply_text("❌ 系統錯誤,請稍後再試")
async def cmd_search(self, update: Update, context):
    """
    Handle /search <keywords>.

    Runs the keywords through the trend crawler service's
    `web_search_with_cache` and replies with the summary plus up to five
    result titles. An empty query gets a usage hint; exceptions are logged
    and answered with a generic error message.
    """
    keywords = ' '.join(context.args)
    if not keywords:
        await update.message.reply_text("❓ 請輸入搜尋關鍵字\n範例: /search 夏季防曬推薦")
        return

    await update.message.reply_text(f"🔍 正在搜尋「{keywords}」...")

    try:
        from services.trend_crawler_service import get_trend_crawler_service

        crawler = get_trend_crawler_service()
        outcome = crawler.web_search_with_cache(keywords, search_type='trends')

        if not outcome['success']:
            await update.message.reply_text(
                f"❌ 搜尋失敗: {outcome.get('error', '未知錯誤')}"
            )
            return

        parsed = outcome['data'].get('result', {})
        summary = parsed.get('summary', '無摘要')
        top_hits = parsed.get('results', [])[:5]

        sections = [
            f"🔍 *搜尋結果: {keywords}*\n\n",
            f"📝 *摘要:*\n{summary}\n\n",
        ]
        if top_hits:
            sections.append("*相關資訊:*\n")
            for rank, hit in enumerate(top_hits, 1):
                hit_title = hit.get('title', '無標題')[:40]
                sections.append(f"{rank}. {hit_title}\n")

        await update.message.reply_text(''.join(sections), parse_mode='Markdown')
    except Exception as e:
        logger.error(f"[cmd_search] error: {e}", exc_info=True)
        await update.message.reply_text("❌ 系統錯誤,請稍後再試")
async def cmd_copy(self, update: Update, context):
"""Handle /copy <product> [context ...]: ask OllamaService for marketing copy and reply with it.

Without arguments a usage hint is sent. Any exception is logged and
answered with a generic error message.
"""
if not context.args:
await update.message.reply_text("❓ 請輸入商品名稱\n範例: /copy 防曬乳")
return
# Only the first argument is used as the product name; everything after it
# is joined into an optional context hint for the prompt.
product_name = context.args[0]
context_hint = ' '.join(context.args[1:]) if len(context.args) > 1 else None
await update.message.reply_text(f"✍️ 正在為「{product_name}」生成文案...")
try:
from services.ollama_service import OllamaService
ollama = OllamaService()
# The prompt requests three styles of copy and a Traditional Chinese answer;
# the context line is left empty when no hint was given.
prompt = f"""請為以下商品生成 3 種風格的行銷文案:
商品: {product_name}
{f'情境: {context_hint}' if context_hint else ''}
請分別生成:
1. 標準版 (100字內)
2. 活潑版 (含表情符號100字內)
3. 限時版 (強調緊迫感100字內)
以繁體中文回覆。"""
# NOTE(review): generate() is called without await, so it presumably blocks
# the event loop until the model answers — confirm against OllamaService.
response = ollama.generate(prompt, temperature=0.7)
if response.success:
# The generated content is sent with Markdown parsing enabled.
await update.message.reply_text(
f"✨ *{product_name} 行銷文案*\n\n{response.content}",
parse_mode='Markdown'
)
else:
await update.message.reply_text(f"❌ 生成失敗: {response.error}")
except Exception as e:
logger.error(f"[cmd_copy] error: {e}", exc_info=True)
await update.message.reply_text("❌ 系統錯誤,請稍後再試")
async def cmd_keywords(self, update: Update, context):
    """
    Handle /keywords [category].

    Sums `mention_count` per keyword over TrendKeyword rows dated within
    the last 7 days (optionally for one category) and replies with the top
    twenty. Exceptions are logged and answered with a generic error message.
    """
    category = ' '.join(context.args) if context.args else None

    try:
        from database.trend_models import TrendKeyword
        from database.manager import get_session
        from sqlalchemy import func

        since = date.today() - timedelta(days=7)
        session = get_session()
        try:
            keyword_query = (
                session.query(
                    TrendKeyword.keyword,
                    func.sum(TrendKeyword.mention_count).label('total'),
                )
                .filter(TrendKeyword.trend_date >= since)
                .group_by(TrendKeyword.keyword)
            )
            if category:
                keyword_query = keyword_query.filter(TrendKeyword.category == category)
            ranked = keyword_query.order_by(
                func.sum(TrendKeyword.mention_count).desc()
            )
            keywords = ranked.limit(20).all()
        finally:
            session.close()

        if not keywords:
            await update.message.reply_text("📭 近 7 天沒有熱門關鍵字資料")
            return

        heading = f"🏷️ {category if category else '所有分類'}熱門關鍵字 (近7天)\n\n"
        rows = []
        for rank, entry in enumerate(keywords, 1):
            rows.append(f"{rank}. {entry.keyword} ({entry.total}次)")
        await update.message.reply_text(heading + '\n'.join(rows))
    except Exception as e:
        logger.error(f"[cmd_keywords] error: {e}", exc_info=True)
        await update.message.reply_text("❌ 系統錯誤,請稍後再試")
async def cmd_daily(self, update: Update, context):
    """
    Handle /daily: reply with today's 'daily_summary' TrendAnalysis.

    The reply contains the summary, up to ten hot keywords and up to five
    marketing suggestions (both stored as JSON text on the record).
    Exceptions are logged and answered with a generic error message.
    """
    try:
        from database.trend_models import TrendAnalysis
        from database.manager import get_session

        session = get_session()
        try:
            analysis = (
                session.query(TrendAnalysis)
                .filter(
                    TrendAnalysis.analysis_date == date.today(),
                    TrendAnalysis.analysis_type == 'daily_summary',
                )
                .first()
            )
        finally:
            session.close()

        if not analysis:
            await update.message.reply_text("📭 今日尚無趨勢分析報告")
            return

        # Empty / missing JSON columns are treated as empty lists.
        hot_keywords = json.loads(analysis.hot_keywords or '[]')
        marketing = json.loads(analysis.marketing_suggestions or '[]')

        sections = [
            "📰 *今日趨勢摘要*\n\n",
            f"📝 *概況:*\n{analysis.summary}\n\n",
        ]
        if hot_keywords:
            sections.append(f"🏷️ *熱門關鍵字:*\n{', '.join(hot_keywords[:10])}\n\n")
        if marketing:
            sections.append("💡 *行銷建議:*\n")
            for rank, tip in enumerate(marketing[:5], 1):
                sections.append(f"{rank}. {tip}\n")

        await update.message.reply_text(''.join(sections), parse_mode='Markdown')
    except Exception as e:
        logger.error(f"[cmd_daily] error: {e}", exc_info=True)
        await update.message.reply_text("❌ 系統錯誤,請稍後再試")
async def handle_callback(self, update: Update, context):
    """
    Route an inline-button press according to its callback data.

    The callback query is answered first. The checks below run in order and
    the first match handles the press; data matching none of them is ignored.
    """
    query = update.callback_query
    await query.answer()
    data = query.data

    def back_to_main_markup():
        # Single-button keyboard shown under the free-text prompts.
        return InlineKeyboardMarkup([[
            InlineKeyboardButton("🔙 返回主選單", callback_data="menu:main")
        ]])

    # NOTE(review): the submenus also emit "await:*" callback data, which has
    # no branch in this method — confirm it is handled elsewhere.

    # ----- main menu (legacy "menu_main" and current "menu:main") -----
    if data in ("menu_main", "menu:main"):
        await query.edit_message_text(
            "👋 *MOMO 趨勢助手 Bot* — 請選擇功能類別:",
            parse_mode='Markdown',
            reply_markup=self._get_main_menu_keyboard()
        )
        return

    # ----- current full menu system -----
    if data.startswith("menu:"):
        await self._handle_main_menu_callback(query, data)
        return

    # ----- legacy simple menu (kept for backward compatibility) -----
    if data == "menu_trend":
        await query.edit_message_text(
            "📊 *熱門趨勢*\n\n請選擇分類:",
            parse_mode='Markdown',
            reply_markup=self._get_category_keyboard("trend")
        )
        return

    if data == "menu_search":
        # The next plain-text message is treated as the search query.
        context.user_data['waiting_for'] = 'search_query'
        await query.edit_message_text(
            "🔍 *AI 搜尋*\n\n請輸入要搜尋的關鍵字:\n\n"
            "例如:夏季防曬推薦、母親節禮物",
            parse_mode='Markdown',
            reply_markup=back_to_main_markup()
        )
        return

    if data == "menu_copy":
        # The next plain-text message is treated as the product name.
        context.user_data['waiting_for'] = 'copy_product'
        await query.edit_message_text(
            "✍️ *生成文案*\n\n請輸入商品名稱:\n\n"
            "例如:防曬乳、保濕面膜",
            parse_mode='Markdown',
            reply_markup=back_to_main_markup()
        )
        return

    if data == "menu_keywords":
        await query.edit_message_text(
            "🏷️ *熱門關鍵字*\n\n請選擇分類:",
            parse_mode='Markdown',
            reply_markup=self._get_category_keyboard("keywords")
        )
        return

    if data == "menu_daily":
        await query.edit_message_text("📰 正在載入今日趨勢摘要...")
        await self._show_daily_summary(query)
        return

    if data == "menu_settings":
        await self._show_settings(query)
        return

    # ----- trend category buttons: "trend_<category>" -----
    if data.startswith("trend_"):
        category = data[len("trend_"):]
        await query.edit_message_text(f"🔄 正在查詢 {category} 趨勢...")
        await self._show_trend_by_category(query, category)
        return

    # ----- keyword category buttons: "keywords_<category>" -----
    if data.startswith("keywords_"):
        category = data[len("keywords_"):]
        await query.edit_message_text(f"🔄 正在查詢 {category} 熱門關鍵字...")
        await self._show_keywords_by_category(query, category)
        return

    # ----- settings buttons -----
    if data.startswith("settings_"):
        await self._handle_settings_callback(query, data)
        return

    # ----- price-cut decisions: only "momo:pa:<id>" / "momo:pr:<id>" -----
    if data.startswith("momo:pa:"):
        await self._handle_price_approve(query, data.rsplit(":", 1)[-1])
        return
    if data.startswith("momo:pr:"):
        await self._handle_price_reject(query, data.rsplit(":", 1)[-1])
        return

    # ----- L3 ops decisions: "momo:ops:<action>:<task_name>" -----
    if data.startswith("momo:ops:"):
        await self._handle_ops_callback(query, data)
        return

    # ----- OpenClaw command buttons: "cmd:<cmd>[:<arg>]" -----
    if data.startswith("cmd:"):
        cmd, _, arg = data[4:].partition(":")
        chat_id = query.message.chat_id
        # Forwarding uses a blocking HTTP call, so it runs on a daemon thread.
        import threading
        forwarder = threading.Thread(
            target=self._forward_cmd_to_openclaw,
            args=(cmd, arg, chat_id),
            daemon=True,
        )
        forwarder.start()
def _forward_cmd_to_openclaw(self, cmd: str, arg: str, chat_id: int):
    """
    Forward a `cmd:*` button press to the OpenClaw internal HTTP API.

    Best effort: nothing is returned and no exception escapes; failures are
    only logged as warnings.

    Args:
        cmd: Command name taken from the callback data.
        arg: Command argument ("" when the callback data carried none).
        chat_id: Telegram chat the press came from.
    """
    import requests as _req
    try:
        # Base URL and shared token come from the environment.
        internal_url = os.getenv("OPENCLAW_INTERNAL_URL", "http://momo-pro-system:80")
        token = os.getenv("INTERNAL_WEBHOOK_TOKEN", "")
        response = _req.post(
            f"{internal_url}/bot/internal/cmd",
            json={"chat_id": chat_id, "cmd": cmd, "arg": arg},
            headers={"X-Internal-Token": token},
            timeout=10,
        )
        # A 4xx/5xx answer does not raise by itself; previously it vanished
        # without a trace, leaving the button press silently unhandled.
        if not response.ok:
            logger.warning(
                f"[TelegramBot] forward cmd '{cmd}' rejected: HTTP {response.status_code}"
            )
    except Exception as e:
        logger.warning(f"[TelegramBot] forward cmd failed: {e}")
async def _handle_main_menu_callback(self, query, data: str):
    """
    Show the submenu selected by a `menu:<key>` callback.

    Args:
        query: The callback query whose message is edited in place.
        data: Full callback data, starting with "menu:".

    Unknown keys fall back to the main menu.
    """
    key = data[5:]  # strip the 'menu:' prefix

    titles = {
        'sales': '📊 *業績查詢* — 選擇日期或直接輸入',
        'products': '🏆 *商品廠商* — 選擇查詢範圍',
        'goals': '🎯 *目標管理* — 查看或設定業績目標',
        'analysis': '📈 *智能分析* — 選擇分析類型',
        'reports': '📄 *簡報報表* — 選擇報告類型',
        'market': '🌐 *市場情報* — 即時資訊',
        'competitor': '📊 *競品比價日報* — 選擇分析日期',
        'competitor_ppt': '📄 *競品比價簡報* — 選擇時間範圍',
        'category': '🗂 *分類業績鑽取* — 點選分類深入分析',
        'trend': '📈 *業績趨勢* — 選擇時間範圍',
    }
    # Each builder returns rows of {'text', 'callback_data'} dicts.
    submenu_builders = {
        'sales': self._get_sales_submenu,
        'products': self._get_products_submenu,
        'goals': self._get_goals_submenu,
        'analysis': self._get_analysis_submenu,
        'reports': self._get_reports_submenu,
        'market': self._get_market_submenu,
        'competitor': self._get_competitor_submenu,
        'competitor_ppt': self._get_competitor_ppt_submenu,
        'category': self._get_category_submenu,
        'trend': self._get_trend_submenu,
    }

    builder = submenu_builders.get(key)
    if builder is None:
        # Unknown menu: show the main menu. `_get_main_menu_keyboard()`
        # already returns an InlineKeyboardMarkup, so it must be used as is;
        # wrapping it in a second InlineKeyboardMarkup (as was done before)
        # is rejected by the library.
        title = '👋 *MOMO 趨勢助手 Bot* — 請選擇功能類別'
        reply_markup = self._get_main_menu_keyboard()
    else:
        title = titles.get(key, '請選擇')
        # Turn the plain dict rows into real button objects, which is what
        # InlineKeyboardMarkup is documented to accept.
        reply_markup = InlineKeyboardMarkup([
            [InlineKeyboardButton(**button) for button in row]
            for row in builder()
        ])

    await query.edit_message_text(
        title,
        parse_mode='Markdown',
        reply_markup=reply_markup
    )
def _get_sales_submenu(self):
"""業績查詢子選單"""
from datetime import datetime, timedelta
today = datetime.now().strftime('%Y/%m/%d')
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y/%m/%d')
current_month = datetime.now().strftime('%Y/%m')
return [
[{'text': f'📊 今日 ({today[-5:]})', 'callback_data': 'cmd:sales:' + today},
{'text': f'⬅ 昨日 ({yesterday[-5:]})', 'callback_data': 'cmd:sales:' + yesterday}],
[{'text': '📅 每週業績', 'callback_data': 'cmd:trend:week'},
{'text': '📅 每月業績', 'callback_data': 'cmd:history:' + current_month}],
[{'text': '📅 每季業績', 'callback_data': 'cmd:trend:quarter'},
{'text': '📅 近半年', 'callback_data': 'cmd:trend:half'}],
[{'text': '📈 趨勢分析', 'callback_data': 'menu:trend'},
{'text': '🔄 同期比較', 'callback_data': 'cmd:compare:' + today}],
[{'text': '🗂 分類業績', 'callback_data': 'cmd:category:' + today},
{'text': '📅 日期/區間', 'callback_data': 'await:date_range_sales'}],
[{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
]
def _get_products_submenu(self):
"""商品廠商子選單"""
from datetime import datetime, timedelta
today = datetime.now().strftime('%Y/%m/%d')
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y/%m/%d')
return [
[{'text': f'🏆 熱銷商品 ({today[-5:]})', 'callback_data': 'cmd:top:' + today},
{'text': f'🏭 熱銷廠商 ({today[-5:]})', 'callback_data': 'cmd:vendor:' + today}],
[{'text': f'⬅ 昨日商品 ({yesterday[-5:]})', 'callback_data': 'cmd:top:' + yesterday},
{'text': '🧬 商品健康', 'callback_data': 'cmd:health:' + today}],
[{'text': '📦 補貨預測', 'callback_data': 'cmd:restock'},
{'text': '🗂 分類鑽取', 'callback_data': 'menu:category'}],
[{'text': '📅 指定日期', 'callback_data': 'await:date_top'}],
[{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
]
def _get_goals_submenu(self):
"""目標管理子選單"""
return [
[{'text': '📋 查看達成率', 'callback_data': 'cmd:goal'},
{'text': '📊 日目標設定', 'callback_data': 'await:goal_daily'}],
[{'text': '📅 月目標設定', 'callback_data': 'await:goal_monthly'},
{'text': '📊 季目標設定', 'callback_data': 'await:goal_quarterly'}],
[{'text': '📈 半年目標設定', 'callback_data': 'await:goal_half'},
{'text': '📊 年目標設定', 'callback_data': 'await:goal_yearly'}],
[{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
]
def _get_analysis_submenu(self):
"""智能分析子選單"""
from datetime import datetime
today = datetime.now().strftime('%Y/%m/%d')
return [
[{'text': '🎲 策略矩陣', 'callback_data': 'cmd:strategy:' + today},
{'text': '📈 業績趨勢', 'callback_data': 'menu:trend'}],
[{'text': '🧬 商品健康', 'callback_data': 'cmd:health:' + today},
{'text': '🗂 分類業績', 'callback_data': 'cmd:category:' + today}],
[{'text': '🎉 促銷追蹤', 'callback_data': 'await:promo_range'},
{'text': '📦 補貨預測', 'callback_data': 'cmd:restock'}],
[{'text': '📊 趨勢圖表', 'callback_data': 'cmd:chart'},
{'text': '🔄 同期比較', 'callback_data': 'cmd:compare:' + today}],
[{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
]
def _get_reports_submenu(self):
"""簡報報表子選單"""
return [
[{'text': '📄 日報', 'callback_data': 'cmd:ppt:daily'},
{'text': '📊 週報', 'callback_data': 'cmd:ppt:weekly'}],
[{'text': '📈 月報', 'callback_data': 'cmd:ppt:monthly'},
{'text': '🧩 策略(季)', 'callback_data': 'cmd:ppt:strategy quarterly'}],
[{'text': '🧩 策略(年)', 'callback_data': 'cmd:ppt:strategy yearly'},
{'text': '🎉 促銷效益簡報', 'callback_data': 'await:promo_range'}],
[{'text': '🔍 競品比較', 'callback_data': 'menu:competitor'},
{'text': '📈 成長趨勢報告', 'callback_data': 'cmd:ppt:growth'}],
[{'text': '🏭 廠商業績報告', 'callback_data': 'cmd:ppt:vendor'},
{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
]
def _get_market_submenu(self):
"""市場情報子選單 - 完整版本"""
return [
[{'text': '📰 電商新聞', 'callback_data': 'cmd:news'},
{'text': '🌤 台北天氣', 'callback_data': 'cmd:weather'}],
[{'text': '<EFBFBD> Google熱搜', 'callback_data': 'cmd:trends'},
{'text': '<EFBFBD> Dcard口碑', 'callback_data': 'cmd:dcard'}],
[{'text': '💱 台銀匯率', 'callback_data': 'cmd:exchange'},
{'text': '📅 電商節慶', 'callback_data': 'cmd:calendar'}],
[{'text': '▶️ YouTube爆紅商品', 'callback_data': 'cmd:youtube'},
{'text': '🧠 AI學習狀態', 'callback_data': 'cmd:learn'}],
[{'text': '🔍 關鍵字比價', 'callback_data': 'await:search_compare'},
{'text': '<EFBFBD> 圖片比價說明', 'callback_data': 'cmd:photo_search_help'}],
[{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
]
def _get_competitor_submenu(self):
"""競品日報第二層:所有選項直接產 PPT"""
from datetime import datetime, timedelta
today = datetime.now()
yesterday = today - timedelta(days=1)
td_str = today.strftime('%Y/%m/%d')
yd_str = yesterday.strftime('%Y/%m/%d')
td_label = today.strftime('%m/%d')
yd_label = yesterday.strftime('%m/%d')
return [
[{'text': f'📊 今日簡報 ({td_label})', 'callback_data': f'cmd:ppt:competitor {td_str}'},
{'text': f'<EFBFBD> 昨日簡報 ({yd_label})', 'callback_data': f'cmd:ppt:competitor {yd_str}'}],
[{'text': '📈 本週比較', 'callback_data': 'cmd:ppt:competitor weekly'},
{'text': '📆 本月比較', 'callback_data': 'cmd:ppt:competitor monthly'}],
[{'text': '🗃 本季比較', 'callback_data': 'cmd:ppt:competitor quarterly'},
{'text': '<EFBFBD> 指定日期', 'callback_data': 'await:date_competitor'}],
[{'text': '📄 更多週期 →', 'callback_data': 'menu:competitor_ppt'}],
[{'text': '← 返回主選單', 'callback_data': 'menu:main'}],
]
def _get_competitor_ppt_submenu(self):
"""競品 PPT 長週期選單(第三層)— 半年/年;日/週/月/季已在第二層"""
return [
[{'text': '📆 半年比較', 'callback_data': 'cmd:ppt:competitor half'},
{'text': '<EFBFBD> 年比較', 'callback_data': 'cmd:ppt:competitor yearly'}],
[{'text': '← 返回競品日報', 'callback_data': 'menu:competitor'}],
]
def _get_category_submenu(self):
"""分類業績鑽取 — 顯示 L1 固定分類按鈕"""
from datetime import datetime
today = datetime.now().strftime('%Y/%m/%d')
CATS = [
('美妝保養', '💄'), ('3C家電', '📱'), ('服飾配件', '👕'),
('居家生活', '🏠'), ('母嬰用品', '🍼'), ('生鮮食品', '🥗'),
('圖書文具', '📚'), ('戶外運動', ''), ('餐券票券', '🎫'),
('醫療保健', '💊'), ('美體保健', '💆'), ('寵物用品', '🐕'),
('箱包精品', '👜'), ('車類百貨', '🚗'), ('情趣用品', '❤️'),
]
rows = []
for i in range(0, len(CATS), 2):
pair = []
for cat, icon in CATS[i:i+2]:
pair.append({'text': f'{icon} {cat}', 'callback_data': f'cmd:catdetail:{cat}:{today}'})
rows.append(pair)
rows.append([{'text': '🗂 全分類清單', 'callback_data': f'cmd:category:{today}'}])
rows.append([{'text': '← 返回主選單', 'callback_data': 'menu:main'}])
return rows
def _get_trend_submenu(self):
"""業績趨勢子選單"""
return [
[{'text': '📅 近7日', 'callback_data': 'cmd:trend:7'},
{'text': '📅 近1個月', 'callback_data': 'cmd:trend:month'}],
[{'text': '📅 近3個月', 'callback_data': 'cmd:trend:quarter'},
{'text': '📅 近半年', 'callback_data': 'cmd:trend:half'}],
[{'text': '📅 近1年', 'callback_data': 'cmd:trend:year'},
{'text': '📅 指定月份', 'callback_data': 'await:date_trend_month'}],
[{'text': '📅 指定年份', 'callback_data': 'await:date_trend_year'},
{'text': '📅 指定季度', 'callback_data': 'await:date_trend_quarter'}],
[{'text': '← 返回業績查詢', 'callback_data': 'menu:sales'}],
]
async def _handle_price_approve(self, query, insight_id_str: str):
    """
    Approve a price-cut suggestion.

    Records the decision through `store_insight` and rewrites the message
    with `decision_result`. The edit passes no reply_markup, which
    presumably drops the decision buttons.

    Args:
        query: Callback query of the pressed button.
        insight_id_str: Insight id taken from the callback data; a
            non-integer value is answered with an alert and ignored.
    """
    from services.openclaw_learning_service import store_insight
    from datetime import date as date_cls

    try:
        insight_id = int(insight_id_str)
    except ValueError:
        await query.answer("無效的決策 ID", show_alert=True)
        return

    presser = query.from_user
    # Fall back to the numeric id when the account has no display name.
    operator = presser.full_name or f"id_{presser.id}"

    feedback_metadata = {
        "decision": "approve",
        "source_insight_id": insight_id,
        "operator": operator,
        "operator_tg_id": presser.id,
    }
    store_insight(
        insight_type="price_decision_feedback",
        content=f"管理員批准降價建議source_insight_id={insight_id}",
        period=date_cls.today().isoformat(),
        metadata=feedback_metadata,
    )

    from services.telegram_templates import decision_result
    previous_text = query.message.text or ""
    updated_text = decision_result(previous_text, "approve", operator)
    await query.edit_message_text(updated_text, parse_mode='HTML')
async def _handle_price_reject(self, query, insight_id_str: str):
    """
    Reject a price-cut suggestion.

    Records the decision through `store_insight` (the stored text marks it
    as training data for a conservative strategy) and rewrites the message
    with `decision_result`. The edit passes no reply_markup, which
    presumably drops the decision buttons.

    Args:
        query: Callback query of the pressed button.
        insight_id_str: Insight id taken from the callback data; a
            non-integer value is answered with an alert and ignored.
    """
    from services.openclaw_learning_service import store_insight
    from datetime import date as date_cls

    try:
        insight_id = int(insight_id_str)
    except ValueError:
        await query.answer("無效的決策 ID", show_alert=True)
        return

    presser = query.from_user
    # Fall back to the numeric id when the account has no display name.
    operator = presser.full_name or f"id_{presser.id}"

    feedback_metadata = {
        "decision": "reject",
        "source_insight_id": insight_id,
        "operator": operator,
        "operator_tg_id": presser.id,
    }
    store_insight(
        insight_type="price_decision_feedback",
        content=f"管理員拒絕降價建議source_insight_id={insight_id}),訓練保守策略",
        period=date_cls.today().isoformat(),
        metadata=feedback_metadata,
    )

    from services.telegram_templates import decision_result
    previous_text = query.message.text or ""
    updated_text = decision_result(
        previous_text, "reject", operator,
        note="已記錄為保守策略訓練資料",
    )
    await query.edit_message_text(updated_text, parse_mode='HTML')
async def _handle_ops_callback(self, query, data: str):
    """
    Handle an L3 operations decision button (ADR-012 Phase 4).

    Callback data format: momo:ops:<action>:<task_name>
    Supported actions: pause1h / pause6h / retry / resume

    The action is mapped onto an entry of OPS_ACTIONS, called with the task
    name and the operator's name, and the message is rewritten with
    `ops_action_result`. An exception from the action is logged and
    reported as an error result instead of propagating.
    """
    from services.agent_actions import OPS_ACTIONS
    from services.telegram_templates import ops_action_result

    # maxsplit=3 keeps any further colons inside the task name.
    pieces = data.split(":", 3)
    if len(pieces) != 4:
        await query.answer("無效的 ops callback 格式", show_alert=True)
        return
    action, task_name = pieces[2:]

    presser = query.from_user
    operator = presser.full_name or f"id_{presser.id}"

    # action -> (OPS_ACTIONS key, extra keyword arguments)
    dispatch = {
        "pause1h": ("pause_task", {"duration_min": 60}),
        "pause6h": ("pause_task", {"duration_min": 360}),
        "retry": ("force_retry_now", {}),
        "resume": ("resume_task", {}),
    }
    if action not in dispatch:
        await query.answer(f"未知 ops action: {action}", show_alert=True)
        return

    fn_name, extra_params = dispatch[action]
    fn = OPS_ACTIONS.get(fn_name)
    if fn is None:
        await query.answer(f"OPS_ACTIONS 未定義 {fn_name}", show_alert=True)
        return

    params = {"task_name": task_name, **extra_params, "operator": operator}
    try:
        result = fn(**params)
    except Exception as e:
        # Surface the failure in the edited message (error text capped at
        # 200 characters).
        result = {"status": "error", "error": str(e)[:200]}
        logger.error(f"[TelegramBot] ops {action} 例外:{e}")

    previous_text = query.message.text or ""
    await query.edit_message_text(
        ops_action_result(previous_text, action, operator, result),
        parse_mode='HTML'
    )
async def _show_trend_by_category(self, query, category: str):
    """
    Replace the message with the top ten trends of one category.

    Considers TrendRecord rows dated within the last 7 days, ordered by
    popularity score. Empty results and errors each get their own message
    with navigation buttons.

    Args:
        query: Callback query whose message is edited.
        category: Category name taken from the callback data.
    """
    try:
        from database.trend_models import TrendRecord
        from database.manager import get_session

        since = date.today() - timedelta(days=7)
        session = get_session()
        try:
            records = (
                session.query(TrendRecord)
                .filter(
                    TrendRecord.trend_date >= since,
                    TrendRecord.category == category,
                )
                .order_by(TrendRecord.popularity_score.desc())
                .limit(10)
                .all()
            )
        finally:
            session.close()

        if not records:
            empty_nav = InlineKeyboardMarkup([[
                InlineKeyboardButton("🔙 返回選擇分類", callback_data="menu_trend"),
                InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
            ]])
            await query.edit_message_text(
                f"📭 {category} 分類近 7 天沒有趨勢資料",
                reply_markup=empty_nav
            )
            return

        source_icons = {'ptt': '💬', 'dcard': '📱', 'google_news': '📰', 'youtube': '🎬'}
        lines = [f"📊 *{category} 熱門趨勢* (近7天)\n"]
        for rank, record in enumerate(records, 1):
            icon = source_icons.get(record.source, '📄')
            # Titles longer than 25 characters are cut and marked with "...".
            if len(record.title) > 25:
                shown_title = record.title[:25] + '...'
            else:
                shown_title = record.title
            lines.append(f"{rank}. {icon} {shown_title} ({record.popularity_score})")

        result_nav = InlineKeyboardMarkup([[
            InlineKeyboardButton("🔄 重新整理", callback_data=f"trend_{category}"),
            InlineKeyboardButton("📊 其他分類", callback_data="menu_trend")
        ], [
            InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
        ]])
        await query.edit_message_text(
            '\n'.join(lines),
            parse_mode='Markdown',
            reply_markup=result_nav
        )
    except Exception as e:
        logger.error(f"[_show_trend_by_category] error: {e}", exc_info=True)
        await query.edit_message_text(
            "❌ 系統錯誤,請稍後再試",
            reply_markup=InlineKeyboardMarkup([[
                InlineKeyboardButton("🔙 返回", callback_data="menu_main")
            ]])
        )
async def _show_keywords_by_category(self, query, category: str):
    """
    Replace the message with the top fifteen keywords of one category.

    Sums `mention_count` per keyword over TrendKeyword rows dated within
    the last 7 days. Empty results and errors each get their own message
    with navigation buttons.

    Args:
        query: Callback query whose message is edited.
        category: Category name taken from the callback data.
    """
    try:
        from database.trend_models import TrendKeyword
        from database.manager import get_session
        from sqlalchemy import func

        since = date.today() - timedelta(days=7)
        session = get_session()
        try:
            keyword_query = (
                session.query(
                    TrendKeyword.keyword,
                    func.sum(TrendKeyword.mention_count).label('total'),
                )
                .filter(
                    TrendKeyword.trend_date >= since,
                    TrendKeyword.category == category,
                )
                .group_by(TrendKeyword.keyword)
                .order_by(func.sum(TrendKeyword.mention_count).desc())
            )
            keywords = keyword_query.limit(15).all()
        finally:
            session.close()

        if not keywords:
            empty_nav = InlineKeyboardMarkup([[
                InlineKeyboardButton("🔙 返回選擇分類", callback_data="menu_keywords"),
                InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
            ]])
            await query.edit_message_text(
                f"📭 {category} 分類近 7 天沒有熱門關鍵字",
                reply_markup=empty_nav
            )
            return

        lines = [f"🏷️ *{category} 熱門關鍵字* (近7天)\n"]
        lines.extend(
            f"{rank}. {entry.keyword} ({entry.total}次)"
            for rank, entry in enumerate(keywords, 1)
        )

        result_nav = InlineKeyboardMarkup([[
            InlineKeyboardButton("🔄 重新整理", callback_data=f"keywords_{category}"),
            InlineKeyboardButton("🏷️ 其他分類", callback_data="menu_keywords")
        ], [
            InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
        ]])
        await query.edit_message_text(
            '\n'.join(lines),
            parse_mode='Markdown',
            reply_markup=result_nav
        )
    except Exception as e:
        logger.error(f"[_show_keywords_by_category] error: {e}", exc_info=True)
        await query.edit_message_text(
            "❌ 系統錯誤,請稍後再試",
            reply_markup=InlineKeyboardMarkup([[
                InlineKeyboardButton("🔙 返回", callback_data="menu_main")
            ]])
        )
async def _show_daily_summary(self, query):
    """
    Replace the message with a condensed version of today's trend summary.

    Reads today's 'daily_summary' TrendAnalysis and shows the summary (cut
    to 200 characters), up to eight hot keywords and up to three marketing
    suggestions (each cut to 50 characters). A missing report and errors
    each get their own message with navigation buttons.

    Args:
        query: Callback query whose message is edited.
    """
    def shorten(text, limit):
        # Append the ellipsis only when something was actually cut off.
        return text[:limit] + '...' if len(text) > limit else text

    try:
        from database.trend_models import TrendAnalysis
        from database.manager import get_session

        session = get_session()
        try:
            analysis = session.query(TrendAnalysis).filter(
                TrendAnalysis.analysis_date == date.today(),
                TrendAnalysis.analysis_type == 'daily_summary'
            ).first()
        finally:
            session.close()

        if not analysis:
            await query.edit_message_text(
                "📭 今日尚無趨勢分析報告\n\n趨勢爬蟲每 2 小時執行一次",
                reply_markup=InlineKeyboardMarkup([[
                    InlineKeyboardButton("🔄 重新載入", callback_data="menu_daily"),
                    InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
                ]])
            )
            return

        # Empty / missing JSON columns are treated as empty lists.
        hot_keywords = json.loads(analysis.hot_keywords or '[]')
        marketing = json.loads(analysis.marketing_suggestions or '[]')

        # A report without summary text is shown with an empty overview
        # rather than failing on len(None) and ending in the error reply.
        summary = analysis.summary or ''

        lines = ["📰 *今日趨勢摘要*\n"]
        lines.append(f"📝 *概況:*\n{shorten(summary, 200)}\n")
        if hot_keywords:
            lines.append(f"🏷️ *熱門關鍵字:*\n{', '.join(hot_keywords[:8])}\n")
        if marketing:
            lines.append("💡 *行銷建議:*")
            for i, m in enumerate(marketing[:3], 1):
                # Previously "..." was appended to every suggestion, even
                # ones shorter than the 50-character limit.
                lines.append(f"{i}. {shorten(m, 50)}")

        await query.edit_message_text(
            '\n'.join(lines),
            parse_mode='Markdown',
            reply_markup=InlineKeyboardMarkup([[
                InlineKeyboardButton("🔄 重新載入", callback_data="menu_daily"),
                InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
            ]])
        )
    except Exception as e:
        logger.error(f"[_show_daily_summary] error: {e}", exc_info=True)
        await query.edit_message_text(
            "❌ 系統錯誤,請稍後再試",
            reply_markup=InlineKeyboardMarkup([[
                InlineKeyboardButton("🔙 返回", callback_data="menu_main")
            ]])
        )
async def _show_settings(self, query):
    """Replace the message with the settings menu (one option per row)."""
    # (label, callback data) for each row, top to bottom.
    options = (
        ("🔔 開啟趨勢通知", "settings_notify_on"),
        ("🔕 關閉趨勢通知", "settings_notify_off"),
        ("📊 訂閱每日摘要", "settings_daily_on"),
        ("📭 取消每日摘要", "settings_daily_off"),
        ("🏠 主選單", "menu_main"),
    )
    keyboard = [
        [InlineKeyboardButton(label, callback_data=target)]
        for label, target in options
    ]
    await query.edit_message_text(
        "⚙️ *設定*\n\n請選擇要調整的設定:",
        parse_mode='Markdown',
        reply_markup=InlineKeyboardMarkup(keyboard)
    )
async def _handle_settings_callback(self, query, data: str):
    """
    Apply a `settings_*` button press to the sender's TelegramUser row.

    The row is created when the sender has none yet. After committing, the
    message is replaced with a confirmation plus the current state of both
    notification flags. On any exception the error is logged, the session
    is rolled back and a generic error message is shown.

    Args:
        query: Callback query of the pressed button.
        data: Full callback data, e.g. "settings_notify_on".
    """
    from database.trend_models import TelegramUser
    from database.manager import get_session

    # callback data -> (attribute to set, new value, confirmation text)
    setting_changes = {
        "settings_notify_on": ("notify_trends", True, "✅ 已開啟趨勢通知"),
        "settings_notify_off": ("notify_trends", False, "🔕 已關閉趨勢通知"),
        "settings_daily_on": ("notify_daily_summary", True, "✅ 已訂閱每日摘要 (每天 09:00 發送)"),
        "settings_daily_off": ("notify_daily_summary", False, "📭 已取消每日摘要訂閱"),
    }

    sender = query.from_user
    user_id = sender.id
    session = get_session()
    try:
        # Fetch the sender's row, creating it on first use.
        tg_user = session.query(TelegramUser).filter(
            TelegramUser.telegram_id == user_id
        ).first()
        if not tg_user:
            tg_user = TelegramUser(
                telegram_id=user_id,
                telegram_username=query.from_user.username,
                display_name=query.from_user.first_name,
                is_active=True
            )
            session.add(tg_user)

        change = setting_changes.get(data)
        if change is None:
            msg = "❓ 未知的設定"
        else:
            attribute, new_value, msg = change
            setattr(tg_user, attribute, new_value)

        # Committed even for unknown data, so a newly created row is kept.
        session.commit()

        trends_state = '開啟' if tg_user.notify_trends else '關閉'
        daily_state = '訂閱中' if tg_user.notify_daily_summary else '未訂閱'
        await query.edit_message_text(
            f"{msg}\n\n目前設定:\n"
            f"🔔 趨勢通知: {trends_state}\n"
            f"📰 每日摘要: {daily_state}",
            reply_markup=InlineKeyboardMarkup([[
                InlineKeyboardButton("⚙️ 返回設定", callback_data="menu_settings"),
                InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
            ]])
        )
    except Exception as e:
        logger.error(f"[_handle_settings_callback] error: {e}", exc_info=True)
        session.rollback()
        await query.edit_message_text("❌ 系統錯誤,請稍後再試")
    finally:
        session.close()
async def handle_message(self, update: Update, context):
    """
    Handle a plain-text (non-command) message.

    If an earlier button press left a `waiting_for` marker in the user
    data, the text is consumed as that pending input. Otherwise it is
    passed to `process_telegram_query`; an unsuccessful result or any
    exception falls back to `_enhanced_keyword_matching`.
    """
    text = update.message.text
    waiting_for = context.user_data.get('waiting_for')

    # Pending input requested by the "search" / "copy" menu buttons.
    if waiting_for in ('search_query', 'copy_product'):
        context.user_data['waiting_for'] = None
        if waiting_for == 'search_query':
            await self._process_search(update, text)
        else:
            await self._process_copy(update, text)
        return

    try:
        # Imported lazily so a missing module only triggers the fallback.
        from telegram_ai_integration import process_telegram_query

        ai_result = await process_telegram_query(
            text,
            update.effective_user.id,
            update.effective_chat.id,
        )

        if not ai_result.get('success', False):
            # AI declined or failed: use keyword matching instead.
            await self._enhanced_keyword_matching(update, text)
        elif ai_result.get('type', 'simple_response') == 'complex_response':
            await self._handle_complex_ai_response(update, ai_result)
        else:
            await self._handle_simple_ai_response(update, ai_result)
    except Exception as e:
        logger.error(f"[handle_message] AI processing error: {e}", exc_info=True)
        # Anything raised above, including inside the response handlers,
        # ends up in the keyword-matching fallback.
        await self._enhanced_keyword_matching(update, text)
async def _handle_complex_ai_response(self, update: Update, ai_result: dict):
"""Handle complex AI responses"""
response_text = ai_result.get('response_text', 'Processing your request...')
# Show processing message
await update.message.reply_text(
f"Processing your request...\n\n{response_text}",
parse_mode='Markdown'
)
# For now, provide helpful guidance
query_type = ai_result.get('query_type', 'general query')
suggestions = self._get_query_suggestions(query_type)
await update.message.reply_text(
f"Based on your {query_type}, I recommend:\n\n" +
"\n".join([f"· {s}" for s in suggestions]) +
"\n\nOr use the menu for direct access:",
reply_markup=self._get_main_menu_keyboard()
)
async def _handle_simple_ai_response(self, update: Update, ai_result: dict):
"""Handle simple AI responses"""
response_text = ai_result.get('response_text', 'How can I help you?')
suggestions = ai_result.get('suggestions', [])
show_menu = ai_result.get('show_menu', False)
if suggestions and not show_menu:
# Show suggestions as buttons
keyboard = []
for i, suggestion in enumerate(suggestions):
if i % 2 == 0:
keyboard.append([])
keyboard[-1].append({
'text': suggestion,
'callback_data': f'cmd:suggestion:{suggestion.lower().replace(" ", "_")}'
})
# Add main menu button
keyboard.append([{'text': 'Main Menu', 'callback_data': 'menu:main'}])
await update.message.reply_text(
response_text,
reply_markup=InlineKeyboardMarkup(keyboard)
)
else:
# Show with main menu
await update.message.reply_text(
response_text,
reply_markup=self._get_main_menu_keyboard()
)
async def _enhanced_keyword_matching(self, update: Update, text: str):
"""Enhanced keyword matching as fallback with Traditional Chinese responses"""
import re
from datetime import datetime, timedelta
# Check for date range queries
date_pattern = r'(\d{4}[./-]\d{2}[./-]\d{2})\s*[-~]\s*(\d{4}[./-]\d{2}[./-]\d{2})'
date_match = re.search(date_pattern, text)
# Check for brand queries (Traditional Chinese and English)
brands_mapping = {
'neutrogena': 'Neutrogena',
'aveeno': 'Aveeno',
'nivea': 'Nivea',
'loreal': 'Loreal',
'shiseido': 'Shiseido',
'sk-ii': 'SK-II',
'kiehls': 'Kiehls',
'clinique': 'Clinique',
'dior': 'Dior',
'chanel': 'Chanel'
}
found_brands = []
text_lower = text.lower()
for brand_key, brand_name in brands_mapping.items():
if brand_key in text_lower:
found_brands.append(brand_name)
# Enhanced pattern matching with Traditional Chinese responses
if date_match and any(word in text.lower() for word in ['momo', 'limited', 'flash', 'sale']):
start_date = date_match.group(1).replace('/', '-').replace('.', '-')
end_date = date_match.group(2).replace('/', '-').replace('.', '-')
brand_text = f"Brands found: {', '.join(found_brands)}" if found_brands else "All brands"
await update.message.reply_text(
f"Processing Momo flash sale query for {start_date} to {end_date}...\n\n"
f"Brands found: {brand_text}\n\n"
f"Please use the menu options for detailed analysis:",
reply_markup=self._get_main_menu_keyboard()
)
elif found_brands and any(word in text.lower() for word in ['momo', 'product', 'brand']):
brand_list = ', '.join(found_brands)
await update.message.reply_text(
f"Searching for {brand_list} products...\n\n"
f"Use the menu options for detailed brand analysis:",
reply_markup=self._get_main_menu_keyboard()
)
elif any(word in text for word in ['trend', 'popular', 'trend']):
await update.message.reply_text(
"Please select function:",
reply_markup=self._get_main_menu_keyboard()
)
elif any(word in text for word in ['search', 'query', 'search']):
context.user_data['waiting_for'] = 'search_query'
await update.message.reply_text(
"Please enter search keywords:",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("Cancel", callback_data="menu_main")
]])
)
elif any(word in text for word in ['copy', 'generate', 'copy']):
context.user_data['waiting_for'] = 'copy_product'
await update.message.reply_text(
"Please enter product name:",
reply_markup=InlineKeyboardMarkup([[
InlineKeyboardButton("Cancel", callback_data="menu_main")
]])
)
else:
await update.message.reply_text(
"I'm analyzing your request. Please select a function or use /help for commands:",
reply_markup=self._get_main_menu_keyboard()
)
def _get_query_suggestions(self, query_type: str) -> list:
"""Get suggestions based on query type (Traditional Chinese)"""
suggestions = {
"sales analysis": [
"Check today's sales performance",
"View weekly sales trend",
"Sales by category analysis",
"Compare with previous period"
],
"product analysis": [
"Top selling products today",
"Brand performance analysis",
"Product health check",
"Inventory forecast"
],
"market intelligence": [
"Latest market news",
"Competitor pricing analysis",
"Trending keywords",
"Industry insights"
],
"report generation": [
"Daily sales report",
"Weekly performance summary",
"Competitive analysis PPT",
"Strategic planning report"
],
"comparative analysis": [
"Compare with competitors",
"Period over period comparison",
"Category performance comparison",
"Brand vs brand analysis"
]
}
return suggestions.get(query_type, [
"View main menu options",
"Check sales dashboard",
"Product analysis",
"Market intelligence"
])
async def _process_search(self, update: Update, query: str):
    """Run a trend search for ``query`` and reply with the summary.

    Sends a "searching" notice, calls the trend crawler service's cached web
    search, then replies with the first 500 characters of the summary, or
    with an error message.

    Args:
        update: Incoming Telegram update; replies go to ``update.message``.
        query: Search text typed by the user.
    """
    await update.message.reply_text(f"🔍 正在搜尋「{query}」...")
    try:
        from telegram.error import BadRequest
        from services.trend_crawler_service import get_trend_crawler_service

        service = get_trend_crawler_service()
        # NOTE(review): this call is not awaited, so it appears to run
        # synchronously inside the coroutine — confirm it is fast enough.
        result = service.web_search_with_cache(query, search_type='trends')

        if result['success']:
            # Any level of the payload may be missing or None.
            data = result.get('data') or {}
            parsed = data.get('result') or {}
            summary = str(parsed.get('summary') or '無摘要')[:500]
            keyboard = InlineKeyboardMarkup([[
                InlineKeyboardButton("🔍 繼續搜尋", callback_data="menu_search"),
                InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
            ]])
            try:
                await update.message.reply_text(
                    f"🔍 *搜尋結果: {query}*\n\n{summary}",
                    parse_mode='Markdown',
                    reply_markup=keyboard
                )
            except BadRequest:
                # The query is user input and the summary is free text; either
                # can contain unbalanced Markdown markers that Telegram refuses
                # to parse. Deliver the result as plain text instead.
                await update.message.reply_text(
                    f"🔍 搜尋結果: {query}\n\n{summary}",
                    reply_markup=keyboard
                )
        else:
            await update.message.reply_text(
                f"❌ 搜尋失敗: {result.get('error', '未知錯誤')}",
                reply_markup=self._get_main_menu_keyboard()
            )
    except Exception as e:
        logger.error(f"[_process_search] error: {e}", exc_info=True)
        await update.message.reply_text(
            "❌ 系統錯誤,請稍後再試",
            reply_markup=self._get_main_menu_keyboard()
        )
async def _process_copy(self, update: Update, product_name: str):
    """Generate marketing copy for a product and reply with it.

    Sends a "generating" notice, asks the Ollama service for three copy
    styles, then replies with the first 1000 characters of the output, or
    with an error message.

    Args:
        update: Incoming Telegram update; replies go to ``update.message``.
        product_name: Product name typed by the user.
    """
    await update.message.reply_text(f"✍️ 正在為「{product_name}」生成文案...")
    try:
        from telegram.error import BadRequest
        from services.ollama_service import OllamaService

        ollama = OllamaService()
        prompt = f"""請為以下商品生成 3 種風格的行銷文案:
商品: {product_name}
請分別生成:
1. 標準版 (80字內)
2. 活潑版 (含表情符號80字內)
3. 限時版 (強調緊迫感80字內)
以繁體中文回覆,格式簡潔。"""
        # NOTE(review): this call is not awaited, so it appears to run
        # synchronously inside the coroutine — confirm that is acceptable.
        response = ollama.generate(prompt, temperature=0.7)

        if response.success:
            content = (response.content or '')[:1000]
            keyboard = InlineKeyboardMarkup([[
                InlineKeyboardButton("✍️ 再生成一次", callback_data="menu_copy"),
                InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
            ]])
            try:
                await update.message.reply_text(
                    f"✨ *{product_name} 行銷文案*\n\n{content}",
                    parse_mode='Markdown',
                    reply_markup=keyboard
                )
            except BadRequest:
                # The product name is user input and the copy is model output;
                # either can contain unbalanced Markdown markers that Telegram
                # refuses to parse. Deliver the copy as plain text instead.
                await update.message.reply_text(
                    f"✨ {product_name} 行銷文案\n\n{content}",
                    reply_markup=keyboard
                )
        else:
            await update.message.reply_text(
                f"❌ 生成失敗: {response.error}",
                reply_markup=self._get_main_menu_keyboard()
            )
    except Exception as e:
        logger.error(f"[_process_copy] error: {e}", exc_info=True)
        await update.message.reply_text(
            "❌ 系統錯誤,請稍後再試",
            reply_markup=self._get_main_menu_keyboard()
        )
# ========== 推播功能 ==========
async def send_message(self, chat_id: int, message: str, parse_mode: str = 'Markdown'):
    """Send a text message to one chat.

    Args:
        chat_id: Target chat id.
        message: Message text.
        parse_mode: Parse mode forwarded to the Bot API call.

    Returns:
        True when the message was sent; False when the bot is not running or
        the send raised (the error is logged, not re-raised).
    """
    bot_ready = self.application and self.is_running
    if not bot_ready:
        logger.warning("Bot 未運行,無法發送訊息")
        return False

    try:
        await self.application.bot.send_message(
            chat_id=chat_id,
            text=message,
            parse_mode=parse_mode
        )
    except Exception as e:
        logger.error(f"發送訊息失敗: {e}")
        return False
    else:
        return True
async def broadcast_trend_alert(self, message: str, category: str = None):
    """Push a trend alert to every active user who has trend notices enabled.

    Args:
        message: Alert text, sent through ``send_message``.
        category: Optional category; when given, only users whose
            ``preferred_categories`` contains this text are notified.

    Returns:
        The number of users the alert was sent to successfully.
    """
    from database.trend_models import TelegramUser
    from database.manager import get_session

    session = get_session()
    try:
        # ``== True`` is intentional: these are SQLAlchemy column expressions.
        subscribers = session.query(TelegramUser).filter(
            TelegramUser.is_active == True,
            TelegramUser.notify_trends == True
        )
        if category:
            like_pattern = f'%{category}%'
            subscribers = subscribers.filter(
                TelegramUser.preferred_categories.like(like_pattern)
            )
        recipients = subscribers.all()

        # Sent one at a time, in query order.
        delivered = 0
        for recipient in recipients:
            was_sent = await self.send_message(recipient.telegram_id, message)
            if was_sent:
                delivered += 1

        logger.info(f"趨勢警報已推播給 {delivered}/{len(recipients)} 位用戶")
        return delivered
    finally:
        session.close()
async def send_daily_summary(self):
    """Push today's trend summary to every active daily-summary subscriber.

    Loads today's ``daily_summary`` analysis row, builds one message from its
    summary, hot keywords (at most 10) and marketing suggestions (at most 3),
    and sends it to each subscriber through ``send_message``.

    Returns:
        The number of users the summary was sent to successfully; 0 when
        there is no analysis for today or when an error occurred (the error
        is logged, not re-raised).
    """
    from database.trend_models import TelegramUser, TrendAnalysis
    from database.manager import get_session

    def _clip(value, limit):
        """Return ``value`` as text, cut to ``limit`` chars with '...' if cut."""
        value = str(value)
        return value if len(value) <= limit else f"{value[:limit]}..."

    session = get_session()
    try:
        today = date.today()
        analysis = session.query(TrendAnalysis).filter(
            TrendAnalysis.analysis_date == today,
            TrendAnalysis.analysis_type == 'daily_summary'
        ).first()
        if not analysis:
            logger.warning("每日摘要推播: 今日尚無分析報告")
            return 0

        # The JSON columns are already treated as nullable; treat the summary
        # the same way so a missing summary does not cancel the whole push.
        hot_keywords = json.loads(analysis.hot_keywords or '[]')
        marketing = json.loads(analysis.marketing_suggestions or '[]')
        summary = analysis.summary or ''

        # NOTE(review): send_message defaults to Markdown parsing and the
        # dynamic text below is not escaped — confirm stored content cannot
        # contain unbalanced Markdown markers.
        parts = [
            "📰 *MOMO 每日趨勢摘要*\n\n",
            f"📅 {today.strftime('%Y-%m-%d')}\n\n",
            f"📝 *概況:*\n{_clip(summary, 300)}\n\n",
        ]
        if hot_keywords:
            keyword_text = ', '.join(str(k) for k in hot_keywords[:10])
            parts.append(f"🏷️ *熱門關鍵字:*\n{keyword_text}\n\n")
        if marketing:
            parts.append("💡 *行銷建議:*\n")
            # The ellipsis is only added when an item was actually shortened.
            parts.extend(
                f"{i}. {_clip(item, 60)}\n"
                for i, item in enumerate(marketing[:3], 1)
            )
        parts.append("\n👉 在 Bot 輸入 /daily 查看完整報告")
        message = "".join(parts)

        # ``== True`` is intentional: these are SQLAlchemy column expressions.
        users = session.query(TelegramUser).filter(
            TelegramUser.is_active == True,
            TelegramUser.notify_daily_summary == True
        ).all()

        sent_count = 0
        for user in users:
            if await self.send_message(user.telegram_id, message):
                sent_count += 1
        logger.info(f"每日摘要已推播給 {sent_count}/{len(users)} 位用戶")
        return sent_count
    except Exception as e:
        logger.error(f"send_daily_summary 失敗: {e}", exc_info=True)
        return 0
    finally:
        session.close()
def get_application(self):
    """Return the Application instance, building it on first use.

    Intended for callers that start the bot themselves. On the first call the
    Application is built from the token and all command, callback and text
    handlers are registered; later calls return the same instance.

    Returns:
        The Application, or None when the telegram package or the token is
        missing.
    """
    if not (TELEGRAM_AVAILABLE and self.token):
        return None
    if self.application:
        return self.application

    self.application = Application.builder().token(self.token).build()

    # (command name, name of the handler method); /menu reuses cmd_start.
    command_table = (
        ("start", "cmd_start"),
        ("help", "cmd_help"),
        ("menu", "cmd_start"),
        ("trend", "cmd_trend"),
        ("search", "cmd_search"),
        ("copy", "cmd_copy"),
        ("keywords", "cmd_keywords"),
        ("daily", "cmd_daily"),
    )
    for command, method_name in command_table:
        self.application.add_handler(
            CommandHandler(command, getattr(self, method_name))
        )

    self.application.add_handler(CallbackQueryHandler(self.handle_callback))
    # Plain text that is not a command goes to the free-text handler.
    text_only = filters.TEXT & ~filters.COMMAND
    self.application.add_handler(MessageHandler(text_only, self.handle_message))
    return self.application
# Module-level singleton instance; stays None until get_telegram_bot() creates it.
_bot_instance: Optional[TrendTelegramBot] = None
def get_telegram_bot() -> TrendTelegramBot:
    """Return the shared TrendTelegramBot, creating it on the first call."""
    global _bot_instance
    if _bot_instance is not None:
        return _bot_instance
    _bot_instance = TrendTelegramBot()
    return _bot_instance
def is_telegram_available() -> bool:
    """Report whether the Telegram feature can be used.

    True only when the telegram package was imported successfully and the
    ``TELEGRAM_BOT_TOKEN`` environment variable is set to a non-empty value.
    """
    if not TELEGRAM_AVAILABLE:
        return False
    token = os.getenv('TELEGRAM_BOT_TOKEN')
    return bool(token)
# Alias (for use by run_telegram_bot.py)
TelegramBotService = TrendTelegramBot