"""
Telegram bot service.

Provides Telegram interaction for trend data.

Note: requires the python-telegram-bot package:
    pip install python-telegram-bot>=20.0

Features:
- Main-menu button interface
- Category selection buttons
- Trend queries, AI search, marketing-copy generation
- Daily trend summary push
"""

import os
import json
import asyncio
import logging
import requests
from typing import Optional
from datetime import date, datetime, timedelta
from services.telegram_update_guard import is_duplicate_update as is_global_duplicate_update

logger = logging.getLogger(__name__)

# Try to import the telegram package; the module must stay importable without it.
try:
    from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, ReplyKeyboardMarkup, KeyboardButton
    from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, filters, ConversationHandler
    TELEGRAM_AVAILABLE = True
except ImportError:
    TELEGRAM_AVAILABLE = False
    # Method signatures below are annotated with ``Update``; annotations are
    # evaluated when the class body runs, so without a placeholder the module
    # would raise NameError on import whenever the package is missing.
    Update = None
    logger.warning("python-telegram-bot 套件未安裝,Telegram Bot 功能將無法使用")

# Product categories offered by the category keyboards.
CATEGORIES = ['美妝', '3C', '服飾', '居家', '母嬰', '電商', '優惠', '生活', '美食', '熱門']


class TrendTelegramBot:
    """Telegram bot service for the trend database."""

    @staticmethod
    def _bool_env(name: str) -> bool:
        """Return True when env var *name* is set to 1/true/yes/on (case-insensitive)."""
        return os.getenv(name, "").strip().lower() in {"1", "true", "yes", "on"}

    def should_run_polling(self) -> bool:
        """Decide whether polling should be started.

        Order of precedence: ``TELEGRAM_FORCE_POLLING`` forces polling on,
        ``TELEGRAM_DISABLE_POLLING`` forces it off, a missing token disables
        it; otherwise Telegram's ``getWebhookInfo`` is queried and polling is
        enabled only when no webhook URL is registered.

        NOTE(review): this performs a blocking HTTP request (5 s timeout) and
        is called from the async ``start``.
        """
        if self._bool_env("TELEGRAM_FORCE_POLLING"):
            logger.info("[TrendTelegramBot] 強制啟用 polling(TELEGRAM_FORCE_POLLING=1)")
            return True
        if self._bool_env("TELEGRAM_DISABLE_POLLING"):
            logger.warning("[TrendTelegramBot] 停用 polling(TELEGRAM_DISABLE_POLLING=1)")
            return False
        if not self.token:
            logger.error("Telegram Token 未設定,無法判斷 webhook 狀態")
            return False

        try:
            payload = requests.get(
                f"https://api.telegram.org/bot{self.token}/getWebhookInfo",
                timeout=5,
            ).json()
            if not payload.get("ok"):
                return True
            return not bool((payload.get("result") or {}).get("url"))
        except Exception as exc:
            # Log only the exception type: the text of requests' exceptions
            # embeds the request URL, which contains the bot token.
            logger.warning(
                "檢查 webhook 狀態失敗,預設不啟動 polling(除非 TELEGRAM_FORCE_POLLING=1):%s",
                type(exc).__name__,
            )
            return self._bool_env("TELEGRAM_FORCE_POLLING")

    def __init__(self, token: Optional[str] = None):
        """Initialise the bot.

        Args:
            token: Telegram bot token (from BotFather). Falls back to the
                ``TELEGRAM_BOT_TOKEN`` environment variable.
        """
        self.token = token or os.getenv('TELEGRAM_BOT_TOKEN')
        self.application = None
        self.is_running = False

        if not TELEGRAM_AVAILABLE:
            logger.error("Telegram Bot 無法初始化: 缺少 python-telegram-bot 套件")
            return

        if not self.token:
            logger.warning("Telegram Bot Token 未設定")

    async def start(self):
        """Start the bot in polling mode.

        Returns:
            True when polling was started; False when the package or token is
            missing, polling is disabled / a webhook is active, or start-up
            raised.
        """
        if not TELEGRAM_AVAILABLE or not self.token:
            logger.error("無法啟動 Telegram Bot")
            return False

        if not self.should_run_polling():
            logger.warning("檢測到 webhook 已啟用,跳過 polling 模式啟動")
            return False

        try:
            self.application = Application.builder().token(self.token).build()

            # Command handlers, registered in this order.
            command_handlers = (
                ("start", self.cmd_start),
                ("menu", self.cmd_menu),
                ("help", self.cmd_help),
                ("trend", self.cmd_trend),
                ("search", self.cmd_search),
                ("copy", self.cmd_copy),
                ("keywords", self.cmd_keywords),
                ("daily", self.cmd_daily),
            )
            for command, handler in command_handlers:
                self.application.add_handler(CommandHandler(command, handler))

            # Inline-button callbacks.
            self.application.add_handler(CallbackQueryHandler(self.handle_callback))

            # Plain (non-command) text messages.
            self.application.add_handler(MessageHandler(
                filters.TEXT & ~filters.COMMAND,
                self.handle_message
            ))

            await self.application.initialize()
            await self.application.start()
            await self.application.updater.start_polling()

            self.is_running = True
            logger.info("Telegram Bot 已啟動")
            return True

        except Exception as e:
            logger.error(f"Telegram Bot 啟動失敗: {e}")
            return False

    async def stop(self):
        """Stop polling and shut the application down (no-op when not running)."""
        if self.application and self.is_running:
            try:
                await self.application.updater.stop()
                await self.application.stop()
                await self.application.shutdown()
                self.is_running = False
                logger.info("Telegram Bot 已停止")
            except Exception as e:
                logger.error(f"Telegram Bot 停止失敗: {e}")

    # ========== Command handlers ==========

    def _get_main_menu_keyboard(self):
        """Build the main-menu inline keyboard (three rows of two buttons)."""
        keyboard = [
            [
                InlineKeyboardButton("📊 熱門趨勢", callback_data="menu_trend"),
                InlineKeyboardButton("🔍 AI 搜尋", callback_data="menu_search"),
            ],
            [
                InlineKeyboardButton("✍️ 生成文案", callback_data="menu_copy"),
                InlineKeyboardButton("🏷️ 熱門關鍵字", callback_data="menu_keywords"),
            ],
            [
                InlineKeyboardButton("📰 每日摘要", callback_data="menu_daily"),
                InlineKeyboardButton("⚙️ 設定", callback_data="menu_settings"),
            ],
        ]
        return InlineKeyboardMarkup(keyboard)

    def _get_category_keyboard(self, callback_prefix: str = "trend"):
        """Build the category keyboard: three buttons per row plus a back row.

        Args:
            callback_prefix: Prefix of each button's callback data, which is
                ``f"{callback_prefix}_{category}"``.
        """
        keyboard = []
        row = []
        for cat in CATEGORIES:
            row.append(InlineKeyboardButton(cat, callback_data=f"{callback_prefix}_{cat}"))
            if len(row) == 3:
                keyboard.append(row)
                row = []
        if row:
            keyboard.append(row)

        # Back-to-main-menu button on its own row.
        keyboard.append([InlineKeyboardButton("🔙 返回主選單", callback_data="menu_main")])
        return InlineKeyboardMarkup(keyboard)

    def _to_inline_markup(self, keyboard):
        """Convert an OpenClaw keyboard (rows of ``{'text', 'callback_data'}`` dicts)
        into a python-telegram-bot markup; returns None for an empty keyboard."""
        if not keyboard:
            return None
        return InlineKeyboardMarkup([
            [
                InlineKeyboardButton(
                    text=button['text'],
                    callback_data=button['callback_data'],
                )
                for button in row
            ]
            for row in keyboard
        ])

    async def cmd_menu(self, update: Update, context):
        """/menu: show the full OpenClaw main menu."""
        from routes import openclaw_bot_routes as openclaw

        await update.message.reply_text(
            "👋 *OpenClaw(小O)* — 電商智能助理\n\n"
            "點下方按鈕,或直接用中文跟我說話 👇",
            parse_mode='Markdown',
            reply_markup=self._to_inline_markup(openclaw.main_menu_keyboard()),
        )

    async def cmd_start(self, update: Update, context):
        """/start: greet the user and show the main menu."""
        from telegram.helpers import escape_markdown

        user = update.effective_user
        # The first name is user-controlled; unescaped Markdown characters in
        # it would make Telegram reject the message.
        first_name = escape_markdown(user.first_name or '')
        await update.message.reply_text(
            f"👋 嗨 {first_name}!\n\n"
            f"我是 *MOMO 趨勢助手 Bot*\n"
            f"請選擇要執行的功能:",
            parse_mode='Markdown',
            reply_markup=self._get_main_menu_keyboard()
        )

    async def cmd_help(self, update: Update, context):
        """/help: show the command reference."""
        help_text = """
📖 *指令說明*

*趨勢查詢*
/trend - 查看所有分類熱門趨勢
/trend 美妝 - 查看美妝分類趨勢

*AI 搜尋*
/search 夏季防曬推薦 - AI 搜尋並分析

*文案生成*
/copy 防曬乳 - 為商品生成行銷文案

*關鍵字*
/keywords - 近 7 天熱門關鍵字

*每日摘要*
/daily - 查看今日趨勢摘要
"""
        await update.message.reply_text(help_text, parse_mode='Markdown')

    async def cmd_trend(self, update: Update, context):
        """/trend [category]: list the top 10 trend records of the last 7 days,
        optionally restricted to one category, ordered by popularity score."""
        category = ' '.join(context.args) if context.args else None

        await update.message.reply_text("🔄 正在查詢趨勢資料...")

        try:
            from database.trend_models import TrendRecord
            from database.manager import get_session

            session = get_session()
            try:
                date_from = date.today() - timedelta(days=7)

                query = session.query(TrendRecord).filter(
                    TrendRecord.trend_date >= date_from
                )
                if category:
                    query = query.filter(TrendRecord.category == category)

                records = query.order_by(
                    TrendRecord.popularity_score.desc()
                ).limit(10).all()
            finally:
                # Always release the session, even when the query raises.
                session.close()

            if not records:
                scope = f'{category}分類' if category else ''
                await update.message.reply_text(f"📭 {scope}近 7 天沒有趨勢資料")
                return

            source_emojis = {
                'ptt': '💬',
                'dcard': '📱',
                'google_news': '📰',
                'youtube': '🎬',
            }
            title = f"📊 {category if category else '所有分類'}熱門趨勢 (近7天)\n\n"
            lines = []
            for i, r in enumerate(records, 1):
                source_emoji = source_emojis.get(r.source, '📄')
                ellipsis = '...' if len(r.title) > 30 else ''
                lines.append(
                    f"{i}. {source_emoji} {r.title[:30]}{ellipsis} "
                    f"(熱度:{r.popularity_score})"
                )

            await update.message.reply_text(title + '\n'.join(lines))

        except Exception as e:
            logger.error(f"Telegram cmd_trend 失敗: {e}")
            await update.message.reply_text(f"❌ 查詢失敗: {str(e)}")

    async def cmd_search(self, update: Update, context):
        """/search <keywords>: run the cached AI web search and reply with the
        summary plus up to five result titles."""
        # context.args can be None; guard so join() never raises.
        query = ' '.join(context.args or [])

        if not query:
            await update.message.reply_text("❓ 請輸入搜尋關鍵字\n範例: /search 夏季防曬推薦")
            return

        await update.message.reply_text(f"🔍 正在搜尋「{query}」...")

        try:
            from services.trend_crawler_service import get_trend_crawler_service
            from telegram.helpers import escape_markdown

            service = get_trend_crawler_service()
            result = service.web_search_with_cache(query, search_type='trends')

            if result['success']:
                data = result['data']
                parsed = data.get('result', {})
                summary = parsed.get('summary', '無摘要')
                results = parsed.get('results', [])[:5]

                # Escape the short free-text fields so stray Markdown
                # characters cannot make Telegram reject the message.
                # NOTE(review): the summary is sent as-is, so any Markdown it
                # contains still renders (and can still fail to parse).
                reply = f"🔍 *搜尋結果: {escape_markdown(query)}*\n\n"
                reply += f"📝 *摘要:*\n{summary}\n\n"

                if results:
                    reply += "*相關資訊:*\n"
                    for i, r in enumerate(results, 1):
                        title = escape_markdown(r.get('title', '無標題')[:40])
                        reply += f"{i}. {title}\n"

                await update.message.reply_text(reply, parse_mode='Markdown')
            else:
                await update.message.reply_text(f"❌ 搜尋失敗: {result.get('error', '未知錯誤')}")

        except Exception as e:
            logger.error(f"Telegram cmd_search 失敗: {e}")
            await update.message.reply_text(f"❌ 搜尋失敗: {str(e)}")

    async def cmd_copy(self, update: Update, context):
        """/copy <product> [context...]: ask the Ollama service for three
        styles of marketing copy for the product."""
        if not context.args:
            await update.message.reply_text("❓ 請輸入商品名稱\n範例: /copy 防曬乳")
            return

        product_name = context.args[0]
        context_hint = ' '.join(context.args[1:]) if len(context.args) > 1 else None

        await update.message.reply_text(f"✍️ 正在為「{product_name}」生成文案...")

        try:
            from services.ollama_service import OllamaService
            from telegram.helpers import escape_markdown

            ollama = OllamaService()
            hint_line = f'情境: {context_hint}' if context_hint else ''
            prompt = f"""請為以下商品生成 3 種風格的行銷文案:

商品: {product_name}
{hint_line}

請分別生成:
1. 標準版 (100字內)
2. 活潑版 (含表情符號,100字內)
3. 限時版 (強調緊迫感,100字內)

以繁體中文回覆。"""

            # NOTE(review): generate() looks synchronous; if so it blocks the
            # event loop while the model runs.
            response = ollama.generate(prompt, temperature=0.7)

            if response.success:
                # Only the product name is escaped; the generated content is
                # sent as-is so any Markdown it contains still renders.
                await update.message.reply_text(
                    f"✨ *{escape_markdown(product_name)} 行銷文案*\n\n{response.content}",
                    parse_mode='Markdown'
                )
            else:
                await update.message.reply_text(f"❌ 生成失敗: {response.error}")

        except Exception as e:
            logger.error(f"Telegram cmd_copy 失敗: {e}")
            await update.message.reply_text(f"❌ 生成失敗: {str(e)}")

    async def cmd_keywords(self, update: Update, context):
        """/keywords [category]: list the 20 most-mentioned keywords of the
        last 7 days, optionally restricted to one category."""
        category = ' '.join(context.args) if context.args else None

        try:
            from database.trend_models import TrendKeyword
            from database.manager import get_session
            from sqlalchemy import func

            session = get_session()
            try:
                date_from = date.today() - timedelta(days=7)

                query = session.query(
                    TrendKeyword.keyword,
                    func.sum(TrendKeyword.mention_count).label('total')
                ).filter(
                    TrendKeyword.trend_date >= date_from
                ).group_by(TrendKeyword.keyword)

                if category:
                    query = query.filter(TrendKeyword.category == category)

                keywords = query.order_by(
                    func.sum(TrendKeyword.mention_count).desc()
                ).limit(20).all()
            finally:
                # Always release the session, even when the query raises.
                session.close()

            if not keywords:
                await update.message.reply_text("📭 近 7 天沒有熱門關鍵字資料")
                return

            title = f"🏷️ {category if category else '所有分類'}熱門關鍵字 (近7天)\n\n"
            lines = [f"{i}. {kw.keyword} ({kw.total}次)" for i, kw in enumerate(keywords, 1)]

            await update.message.reply_text(title + '\n'.join(lines))

        except Exception as e:
            logger.error(f"Telegram cmd_keywords 失敗: {e}")
            await update.message.reply_text(f"❌ 查詢失敗: {str(e)}")

    async def cmd_daily(self, update: Update, context):
        """/daily: show today's ``daily_summary`` trend analysis (summary, up
        to 10 hot keywords, up to 5 marketing suggestions)."""
        try:
            from database.trend_models import TrendAnalysis
            from database.manager import get_session

            session = get_session()
            try:
                analysis = session.query(TrendAnalysis).filter(
                    TrendAnalysis.analysis_date == date.today(),
                    TrendAnalysis.analysis_type == 'daily_summary'
                ).first()
            finally:
                # Always release the session, even when the query raises.
                session.close()

            if not analysis:
                await update.message.reply_text("📭 今日尚無趨勢分析報告")
                return

            # Both columns hold JSON-encoded lists (or NULL).
            hot_keywords = json.loads(analysis.hot_keywords or '[]')
            marketing = json.loads(analysis.marketing_suggestions or '[]')

            reply = f"📰 *今日趨勢摘要*\n\n"
            reply += f"📝 *概況:*\n{analysis.summary}\n\n"

            if hot_keywords:
                reply += f"🏷️ *熱門關鍵字:*\n{', '.join(hot_keywords[:10])}\n\n"

            if marketing:
                reply += f"💡 *行銷建議:*\n"
                for i, m in enumerate(marketing[:5], 1):
                    reply += f"{i}. {m}\n"

            await update.message.reply_text(reply, parse_mode='Markdown')

        except Exception as e:
            logger.error(f"Telegram cmd_daily 失敗: {e}")
            await update.message.reply_text(f"❌ 查詢失敗: {str(e)}")

    async def handle_callback(self, update: Update, context):
        """Dispatch an inline-button callback.

        Steps: normalise legacy ``menu_``/``await_``/``cmd_`` data to the
        OpenClaw ``menu:``/``await:``/``cmd:`` form when the key is known to
        OpenClaw, drop duplicates via the shared update guard, answer the
        query, then route by callback data.
        """
        query = update.callback_query
        data = query.data or ''

        def _build_polling_callback_dedupe_key():
            # Reads ``data`` at call time, i.e. after normalisation below.
            update_id = getattr(update, "update_id", None)
            msg = getattr(query, "message", None)
            msg_id = getattr(msg, "message_id", None)
            chat_id = getattr(msg, "chat_id", None) if msg is not None else None
            user_id = getattr(getattr(update, "effective_user", None), "id", None)
            if user_id is None:
                user_id = getattr(getattr(query, "from_user", None), "id", None)

            parts = []
            # Prefer the callback-query id; fall back to the update id.
            if query.id:
                parts.append(f"cbq:{query.id}")
            elif update_id is not None:
                parts.append(f"uid:{update_id}")
            if chat_id is not None:
                parts.append(f"chat:{chat_id}")
            if user_id is not None:
                parts.append(f"user:{user_id}")
            if msg_id is not None:
                parts.append(f"msg:{msg_id}")
            data_key = data or ""
            if data_key:
                parts.append(f"data:{data_key}")
            return "cb:" + "|".join(parts) if parts else f"cb-query:{query.id}"

        try:
            from routes import openclaw_bot_routes as openclaw

            if data.startswith('menu_'):
                key = data[5:]
                if key in openclaw._SUBMENUS:
                    data = f"menu:{key}"
            elif data.startswith('await_'):
                key = data[6:]
                if key in openclaw._AWAIT_PROMPTS:
                    data = f"await:{key}"
            elif data.startswith('cmd_'):
                data = f"cmd:{data[4:]}"
        except Exception:
            # Best effort: on failure the legacy data is routed unchanged.
            logger.debug("OpenClaw callback normalization failed", exc_info=True)

        dedupe_key = _build_polling_callback_dedupe_key()
        if is_global_duplicate_update(dedupe_key, namespace="telegram_update"):
            logger.warning(f"忽略重複 callback key={dedupe_key}")
            try:
                await query.answer()
            except Exception as exc:
                logger.debug(f"callback 重複回覆失敗: {exc}")
            return

        await query.answer()

        # ADR-012 / track A': HITL escalation button
        # (momo:eig:{event_id} = event_ignore). The "🛑 忽略此事件" button on the
        # triaged_alert keyboard sends this callback.
        if data.startswith('momo:eig:'):
            await self._handle_event_ignore_callback(query, data)
            return

        if data.startswith(('menu:', 'cmd:', 'await:')):
            await self._handle_openclaw_callback(query, context, data)
            return

        # ===== Main-menu buttons =====
        if data == "menu_main":
            await query.edit_message_text(
                "請選擇要執行的功能:",
                reply_markup=self._get_main_menu_keyboard()
            )

        elif data == "menu_trend":
            await query.edit_message_text(
                "📊 *熱門趨勢*\n\n請選擇分類:",
                parse_mode='Markdown',
                reply_markup=self._get_category_keyboard("trend")
            )

        elif data == "menu_search":
            # The next plain-text message is treated as the search query.
            context.user_data['waiting_for'] = 'search_query'
            await query.edit_message_text(
                "🔍 *AI 搜尋*\n\n請輸入要搜尋的關鍵字:\n\n"
                "例如:夏季防曬推薦、母親節禮物",
                parse_mode='Markdown',
                reply_markup=InlineKeyboardMarkup([[
                    InlineKeyboardButton("🔙 返回主選單", callback_data="menu_main")
                ]])
            )

        elif data == "menu_copy":
            # The next plain-text message is treated as the product name.
            context.user_data['waiting_for'] = 'copy_product'
            await query.edit_message_text(
                "✍️ *生成文案*\n\n請輸入商品名稱:\n\n"
                "例如:防曬乳、保濕面膜",
                parse_mode='Markdown',
                reply_markup=InlineKeyboardMarkup([[
                    InlineKeyboardButton("🔙 返回主選單", callback_data="menu_main")
                ]])
            )

        elif data == "menu_keywords":
            await query.edit_message_text(
                "🏷️ *熱門關鍵字*\n\n請選擇分類:",
                parse_mode='Markdown',
                reply_markup=self._get_category_keyboard("keywords")
            )

        elif data == "menu_daily":
            await query.edit_message_text("📰 正在載入今日趨勢摘要...")
            await self._show_daily_summary(query)

        elif data == "menu_settings":
            await self._show_settings(query)

        # ===== Trend category buttons =====
        elif data.startswith("trend_"):
            category = data[6:]
            await query.edit_message_text(f"🔄 正在查詢 {category} 趨勢...")
            await self._show_trend_by_category(query, category)

        # ===== Keyword category buttons =====
        elif data.startswith("keywords_"):
            category = data[9:]
            await query.edit_message_text(f"🔄 正在查詢 {category} 熱門關鍵字...")
            await self._show_keywords_by_category(query, category)

        # ===== Settings buttons =====
        elif data.startswith("settings_"):
            await self._handle_settings_callback(query, data)

    async def _handle_event_ignore_callback(self, query, data: str):
        """Track A': handle the HITL escalation "🛑 忽略此事件" (ignore event) button.

        callback_data format: ``momo:eig:{event_id}``

        Actions:
        1. Insert an ``ai_insights`` row with status='ignored' (including the
           event_id for auditing).
        2. Edit the original message, appending an "ignored by <user> @ <time>"
           footer.
        3. Both steps are best effort and never block the user experience.

        ADR-012 §③: every human decision needs an audit trail; ai_insights is
        the source of truth.

        Critic Critical-1 fix: the user / timestamp labels are escaped before
        being written into HTML.
        Critic Medium-2 fix: an empty event_id is rejected so it cannot
        pollute the audit data.
        """
        from datetime import datetime as _dt
        from html import escape as _html_escape

        # Critic Medium-2: reject an empty event_id (e.g. a bare `momo:eig:`).
        parts_in = data.split(':', 2)
        event_id = parts_in[2].strip() if len(parts_in) >= 3 else ''
        if not event_id:
            try:
                await query.answer("event_id 缺失,忽略動作未生效", show_alert=False)
            except Exception:
                logger.debug("empty event_id callback answer failed", exc_info=True)
            logger.warning("[EA HITL] empty event_id callback rejected: %r", data)
            return

        user = getattr(query, 'from_user', None)
        user_label_raw = (
            getattr(user, 'username', None)
            or getattr(user, 'first_name', None)
            or str(getattr(user, 'id', '?'))
        )
        ts_label_raw = _dt.now().strftime('%Y-%m-%d %H:%M')

        try:
            from database.manager import get_session
            from sqlalchemy import text

            session = get_session()
            try:
                session.execute(
                    text("""
                        INSERT INTO ai_insights
                            (insight_type, content, confidence, created_by, status, metadata_json)
                        VALUES
                            (:type, :content, :conf, :by, :status, :meta)
                    """),
                    {
                        "type": "human_review",
                        "content": f"[EA HITL] 事件 {event_id} 由 {user_label_raw} 忽略",
                        "conf": 1.0,
                        "by": f"telegram:{user_label_raw}",
                        "status": "ignored",
                        "meta": json.dumps({
                            "event_id": event_id,
                            "decided_by": user_label_raw,
                            "decided_at": ts_label_raw,
                            "decision": "ignored",
                        }, ensure_ascii=False),
                    },
                )
                session.commit()
            finally:
                session.close()
        except Exception as audit_err:
            logger.warning(f"[EA HITL] ai_insights audit 寫入失敗(不阻斷 UI): {audit_err}")

        # Critic Critical-1: the labels must be HTML-escaped so a Telegram
        # username cannot inject markup or break the message layout.
        user_label_safe = _html_escape(str(user_label_raw))
        ts_label_safe = _html_escape(ts_label_raw)
        try:
            message = query.message
            original = getattr(message, 'text_html', None) if message else None
            if not original:
                # Plain-text fallback: it is sent with parse_mode='HTML', so it
                # must be escaped as well.
                plain = getattr(message, 'text', None) if message else None
                original = _html_escape(plain) if plain else ''
            footer = f"\n\n🛑 已忽略 by {user_label_safe} @ {ts_label_safe}"
            await query.edit_message_text(
                (original or '事件已忽略') + footer,
                parse_mode='HTML',
            )
        except Exception as ui_err:
            logger.debug(f"[EA HITL] edit_message 失敗(不阻斷): {ui_err}")

        logger.info(f"[EA HITL] event_ignore event_id={event_id} by={user_label_raw}")

    async def _handle_openclaw_callback(self, query, context, data: str):
        """Forward an OpenClaw menu callback so the polling bot also serves /menu.

        Args:
            query: The callback query being handled.
            context: Handler context; ``user_data['openclaw_waiting_for']`` is
                set for ``await:`` callbacks.
            data: Normalised callback data with one of three prefixes:
                ``menu:<key>`` shows a submenu, ``await:<action>`` prompts for
                text input, ``cmd:<name>[:<arg>]`` runs an OpenClaw command.

        Failures are logged and reported to the chat with a generic message.
        """
        message = query.message
        if message is None:
            # No message to edit or reply to; previously this raised
            # AttributeError before the error handling below was reached.
            logger.warning("OpenClaw callback without message ignored: %r", data)
            return

        chat_id = message.chat_id
        reply_to = message.message_id

        try:
            from routes import openclaw_bot_routes as openclaw

            if data.startswith('menu:'):
                key = data[5:]
                submenu = openclaw._SUBMENUS.get(key)
                if not submenu:
                    await message.reply_text("⚠️ 找不到這個選單")
                    return

                titles = {
                    'main': '👋 *OpenClaw* — 請選擇功能類別',
                    'sales': '📊 *業績查詢* — 選擇日期或直接輸入',
                    'products': '🏆 *商品廠商* — 選擇查詢範圍',
                    'goals': '🎯 *目標管理* — 查看或設定業績目標',
                    'analysis': '📈 *智能分析* — 選擇分析類型',
                    'trend': '📈 *業績趨勢* — 選擇時間範圍',
                    'reports': '📄 *簡報報表* — 選擇報告類型',
                    'market': '🌐 *市場情報* — 即時資訊',
                    'competitor': '📊 *競品比價日報* — 選擇分析日期',
                    'competitor_ppt': '📄 *競品比價簡報* — 選擇時間範圍',
                    'category': '🗂 *分類業績鑽取* — 點選分類深入分析',
                }
                await query.edit_message_text(
                    titles.get(key, '請選擇'),
                    parse_mode='Markdown',
                    reply_markup=self._to_inline_markup(submenu()),
                )
                return

            if data.startswith('await:'):
                action = data[6:]
                prompt = openclaw._AWAIT_PROMPTS.get(action)
                if not prompt:
                    await message.reply_text("⚠️ 找不到這個輸入流程")
                    return

                # The next plain-text message is routed to this action.
                context.user_data['openclaw_waiting_for'] = action
                prompt_text, _label = prompt
                await query.edit_message_text(
                    f"{prompt_text}\n\n_輸入 `/取消` 可退出_",
                    parse_mode='Markdown',
                    reply_markup=self._to_inline_markup([
                        [{'text': '✖ 取消', 'callback_data': 'menu:main'}]
                    ]),
                )
                return

            if data.startswith('cmd:'):
                # "cmd:<name>" or "cmd:<name>:<arg>"; arg defaults to ''.
                command, _sep, argument = data[4:].partition(':')
                await message.reply_chat_action(action='typing')
                # NOTE(review): handle_cmd is called synchronously and may
                # block the event loop while the command runs.
                with openclaw._run_with_callback_cmd_context():
                    openclaw.handle_cmd(command, argument, chat_id, reply_to)
                return

        except Exception as e:
            logger.error(f"OpenClaw callback 轉接失敗: {e}", exc_info=True)
            await message.reply_text("⚠️ 功能執行失敗,請稍後再試。")

    async def _show_trend_by_category(self, query, category: str):
        """Edit the callback message to show one category's top 10 trends.

        Queries the last 7 days of ``TrendRecord`` rows for *category*,
        ordered by popularity score, and renders them with refresh and
        navigation buttons. Errors are logged and shown in the message.
        """
        try:
            from database.trend_models import TrendRecord
            from database.manager import get_session
            from telegram.helpers import escape_markdown

            session = get_session()
            try:
                date_from = date.today() - timedelta(days=7)
                records = session.query(TrendRecord).filter(
                    TrendRecord.trend_date >= date_from,
                    TrendRecord.category == category
                ).order_by(
                    TrendRecord.popularity_score.desc()
                ).limit(10).all()
            finally:
                # Always release the session, even when the query raises.
                session.close()

            if not records:
                await query.edit_message_text(
                    f"📭 {category} 分類近 7 天沒有趨勢資料",
                    reply_markup=InlineKeyboardMarkup([[
                        InlineKeyboardButton("🔙 返回選擇分類", callback_data="menu_trend"),
                        InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
                    ]])
                )
                return

            source_emojis = {'ptt': '💬', 'dcard': '📱', 'google_news': '📰', 'youtube': '🎬'}
            lines = [f"📊 *{escape_markdown(category)} 熱門趨勢* (近7天)\n"]
            for i, r in enumerate(records, 1):
                source_emoji = source_emojis.get(r.source, '📄')
                raw_title = r.title or ''  # tolerate a NULL title
                title = raw_title[:25] + '...' if len(raw_title) > 25 else raw_title
                # Escape after truncating: titles are free text, and a stray
                # '_' or '*' would make Telegram reject the Markdown message.
                lines.append(f"{i}. {source_emoji} {escape_markdown(title)} ({r.popularity_score})")

            await query.edit_message_text(
                '\n'.join(lines),
                parse_mode='Markdown',
                reply_markup=InlineKeyboardMarkup([[
                    InlineKeyboardButton("🔄 重新整理", callback_data=f"trend_{category}"),
                    InlineKeyboardButton("📊 其他分類", callback_data="menu_trend")
                ], [
                    InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
                ]])
            )

        except Exception as e:
            logger.error(f"_show_trend_by_category 失敗: {e}")
            await query.edit_message_text(
                f"❌ 查詢失敗: {str(e)}",
                reply_markup=InlineKeyboardMarkup([[
                    InlineKeyboardButton("🔙 返回", callback_data="menu_main")
                ]])
            )

    async def _show_keywords_by_category(self, query, category: str):
        """Edit the callback message to show one category's top 15 keywords.

        Sums ``mention_count`` per keyword over the last 7 days for
        *category* and renders the ranking with refresh and navigation
        buttons. Errors are logged and shown in the message.
        """
        try:
            from database.trend_models import TrendKeyword
            from database.manager import get_session
            from sqlalchemy import func
            from telegram.helpers import escape_markdown

            session = get_session()
            try:
                date_from = date.today() - timedelta(days=7)
                keywords = session.query(
                    TrendKeyword.keyword,
                    func.sum(TrendKeyword.mention_count).label('total')
                ).filter(
                    TrendKeyword.trend_date >= date_from,
                    TrendKeyword.category == category
                ).group_by(TrendKeyword.keyword).order_by(
                    func.sum(TrendKeyword.mention_count).desc()
                ).limit(15).all()
            finally:
                # Always release the session, even when the query raises.
                session.close()

            if not keywords:
                await query.edit_message_text(
                    f"📭 {category} 分類近 7 天沒有熱門關鍵字",
                    reply_markup=InlineKeyboardMarkup([[
                        InlineKeyboardButton("🔙 返回選擇分類", callback_data="menu_keywords"),
                        InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
                    ]])
                )
                return

            lines = [f"🏷️ *{escape_markdown(category)} 熱門關鍵字* (近7天)\n"]
            # Keywords are free text; escape them so a stray '_' or '*' cannot
            # make Telegram reject the Markdown message.
            lines.extend(
                f"{i}. {escape_markdown(str(kw.keyword))} ({kw.total}次)"
                for i, kw in enumerate(keywords, 1)
            )

            await query.edit_message_text(
                '\n'.join(lines),
                parse_mode='Markdown',
                reply_markup=InlineKeyboardMarkup([[
                    InlineKeyboardButton("🔄 重新整理", callback_data=f"keywords_{category}"),
                    InlineKeyboardButton("🏷️ 其他分類", callback_data="menu_keywords")
                ], [
                    InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
                ]])
            )

        except Exception as e:
            logger.error(f"_show_keywords_by_category 失敗: {e}")
            await query.edit_message_text(
                f"❌ 查詢失敗: {str(e)}",
                reply_markup=InlineKeyboardMarkup([[
                    InlineKeyboardButton("🔙 返回", callback_data="menu_main")
                ]])
            )

    async def _show_daily_summary(self, query):
        """Edit the callback message to show a preview of today's summary.

        Loads today's ``daily_summary`` ``TrendAnalysis`` row and renders the
        summary (first 200 characters), up to 8 hot keywords and up to 3
        marketing suggestions (first 50 characters each). Errors are logged
        and shown in the message.
        """

        def _preview(value, limit: int) -> str:
            # Truncate to *limit* characters, adding "..." only when cut.
            text = '' if value is None else str(value)
            return text[:limit] + '...' if len(text) > limit else text

        try:
            from database.trend_models import TrendAnalysis
            from database.manager import get_session
            from telegram.helpers import escape_markdown

            session = get_session()
            try:
                analysis = session.query(TrendAnalysis).filter(
                    TrendAnalysis.analysis_date == date.today(),
                    TrendAnalysis.analysis_type == 'daily_summary'
                ).first()
            finally:
                # Always release the session, even when the query raises.
                session.close()

            if not analysis:
                await query.edit_message_text(
                    "📭 今日尚無趨勢分析報告\n\n趨勢爬蟲每 2 小時執行一次",
                    reply_markup=InlineKeyboardMarkup([[
                        InlineKeyboardButton("🔄 重新載入", callback_data="menu_daily"),
                        InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
                    ]])
                )
                return

            # Both columns hold JSON-encoded lists (or NULL).
            hot_keywords = json.loads(analysis.hot_keywords or '[]')
            marketing = json.loads(analysis.marketing_suggestions or '[]')

            # Truncation can cut a Markdown entity in half, so every preview
            # is escaped after truncating to keep the message parseable.
            lines = ["📰 *今日趨勢摘要*\n"]
            lines.append(f"📝 *概況:*\n{escape_markdown(_preview(analysis.summary, 200))}\n")

            if hot_keywords:
                joined = ', '.join(str(k) for k in hot_keywords[:8])
                lines.append(f"🏷️ *熱門關鍵字:*\n{escape_markdown(joined)}\n")

            if marketing:
                lines.append("💡 *行銷建議:*")
                for i, m in enumerate(marketing[:3], 1):
                    lines.append(f"{i}. {escape_markdown(_preview(m, 50))}")

            await query.edit_message_text(
                '\n'.join(lines),
                parse_mode='Markdown',
                reply_markup=InlineKeyboardMarkup([[
                    InlineKeyboardButton("🔄 重新載入", callback_data="menu_daily"),
                    InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
                ]])
            )

        except Exception as e:
            logger.error(f"_show_daily_summary 失敗: {e}")
            await query.edit_message_text(
                f"❌ 載入失敗: {str(e)}",
                reply_markup=InlineKeyboardMarkup([[
                    InlineKeyboardButton("🔙 返回", callback_data="menu_main")
                ]])
            )

    async def _show_settings(self, query):
        """Replace the callback message with the settings menu (one button per row)."""
        options = (
            ("🔔 開啟趨勢通知", "settings_notify_on"),
            ("🔕 關閉趨勢通知", "settings_notify_off"),
            ("📊 訂閱每日摘要", "settings_daily_on"),
            ("📭 取消每日摘要", "settings_daily_off"),
            ("🏠 主選單", "menu_main"),
        )
        rows = [
            [InlineKeyboardButton(label, callback_data=action)]
            for label, action in options
        ]
        markup = InlineKeyboardMarkup(rows)
        await query.edit_message_text(
            "⚙️ *設定*\n\n請選擇要調整的設定:",
            parse_mode='Markdown',
            reply_markup=markup,
        )

    async def _handle_settings_callback(self, query, data: str):
        """Apply a ``settings_*`` button press to the user's notification flags.

        Fetches the ``TelegramUser`` row for the pressing user (creating it on
        first use), applies the change selected by *data*, commits, and edits
        the message to show the resulting settings. On failure the
        transaction is rolled back and the error is shown.
        """
        from database.trend_models import TelegramUser
        from database.manager import get_session

        # callback data -> (attribute to set, new value, confirmation text)
        changes = {
            "settings_notify_on": ("notify_trends", True, "✅ 已開啟趨勢通知"),
            "settings_notify_off": ("notify_trends", False, "🔕 已關閉趨勢通知"),
            "settings_daily_on": ("notify_daily_summary", True, "✅ 已訂閱每日摘要 (每天 09:00 發送)"),
            "settings_daily_off": ("notify_daily_summary", False, "📭 已取消每日摘要訂閱"),
        }

        sender = query.from_user
        user_id = sender.id
        session = get_session()

        try:
            # Fetch the user's row, creating it the first time they appear.
            tg_user = (
                session.query(TelegramUser)
                .filter(TelegramUser.telegram_id == user_id)
                .first()
            )
            if not tg_user:
                tg_user = TelegramUser(
                    telegram_id=user_id,
                    telegram_username=sender.username,
                    display_name=sender.first_name,
                    is_active=True
                )
                session.add(tg_user)

            change = changes.get(data)
            if change is None:
                msg = "❓ 未知的設定"
            else:
                attribute, value, msg = change
                setattr(tg_user, attribute, value)

            session.commit()

            trends_state = '開啟' if tg_user.notify_trends else '關閉'
            daily_state = '訂閱中' if tg_user.notify_daily_summary else '未訂閱'
            navigation = InlineKeyboardMarkup([[
                InlineKeyboardButton("⚙️ 返回設定", callback_data="menu_settings"),
                InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
            ]])
            await query.edit_message_text(
                f"{msg}\n\n目前設定:\n"
                f"🔔 趨勢通知: {trends_state}\n"
                f"📰 每日摘要: {daily_state}",
                reply_markup=navigation
            )

        except Exception as e:
            logger.error(f"_handle_settings_callback 失敗: {e}")
            session.rollback()
            await query.edit_message_text(f"❌ 設定失敗: {str(e)}")
        finally:
            session.close()

    async def handle_message(self, update: Update, context):
        """Handle a plain (non-command) text message.

        Routing order:
        1. A pending OpenClaw input flow (``openclaw_waiting_for``).
        2. A pending search / copy input (``waiting_for``).
        3. Keyword shortcuts for trends, search and copy generation.
        4. Otherwise the text is answered by ``openclaw_answer``.
        """
        message = update.message
        if message is None or not message.text:
            # The TEXT filter also matches edited messages, for which
            # ``update.message`` is None; ignore them instead of crashing.
            return

        text = message.text
        waiting_for = context.user_data.get('waiting_for')
        openclaw_waiting_for = context.user_data.get('openclaw_waiting_for')

        if openclaw_waiting_for:
            context.user_data['openclaw_waiting_for'] = None
            await self._process_openclaw_input(update, openclaw_waiting_for, text)
            return

        # Pending input states set by the menu buttons.
        if waiting_for == 'search_query':
            context.user_data['waiting_for'] = None
            await self._process_search(update, text)
            return

        if waiting_for == 'copy_product':
            context.user_data['waiting_for'] = None
            await self._process_copy(update, text)
            return

        # Simple keyword matching.
        if '趨勢' in text or '熱門' in text:
            await message.reply_text(
                "💡 請選擇功能:",
                reply_markup=self._get_main_menu_keyboard()
            )
        elif '搜尋' in text or '查詢' in text:
            context.user_data['waiting_for'] = 'search_query'
            await message.reply_text(
                "🔍 請輸入要搜尋的關鍵字:",
                reply_markup=InlineKeyboardMarkup([[
                    InlineKeyboardButton("🔙 取消", callback_data="menu_main")
                ]])
            )
        elif '文案' in text or '生成' in text:
            context.user_data['waiting_for'] = 'copy_product'
            await message.reply_text(
                "✍️ 請輸入商品名稱:",
                reply_markup=InlineKeyboardMarkup([[
                    InlineKeyboardButton("🔙 取消", callback_data="menu_main")
                ]])
            )
        else:
            await message.reply_chat_action(action='typing')
            try:
                from routes.openclaw_bot_routes import openclaw_answer

                _chat_id = getattr(update.effective_chat, 'id', None)
                # NOTE(review): openclaw_answer is called synchronously and
                # may block the event loop while it computes the answer.
                txt, kb = openclaw_answer(text, chat_id=_chat_id)

                # _to_inline_markup returns None for an empty keyboard.
                await message.reply_text(txt, reply_markup=self._to_inline_markup(kb))
            except Exception as e:
                logger.error(f"自然對話處理失敗: {e}")
                await message.reply_text("不好意思,我現在無法回答這個問題,請稍後再試。")

    async def _process_openclaw_input(self, update: Update, action: str, text: str):
        """Handle the text a user typed after entering an OpenClaw menu flow.

        Args:
            update: Telegram update holding the user's text message; direct
                replies are sent through ``update.message``.
            action: Key of the pending input flow (``goal_*``,
                ``search_compare``, ``date_range_sales``, ``date_trend_*``,
                other ``date_*`` keys, or ``promo_range``).
            text: Raw text entered by the user.

        The outcome is delivered either by replying on ``update.message`` or
        by delegating to ``openclaw.handle_cmd`` / ``openclaw.send_message``.
        Unexpected exceptions are logged with a traceback and answered with a
        generic failure message instead of propagating.
        """
        import math
        import re

        from routes import openclaw_bot_routes as openclaw

        unsupported_reply = "⚠️ 這個輸入流程暫時無法處理。"
        full_date = r'\d{4}[/\-]\d{1,2}[/\-]\d{1,2}'

        chat_id = update.effective_chat.id
        reply_to = update.message.message_id
        raw = text.strip()
        # Amount-style normalisation: drop thousands separators and currency marks.
        val = raw.replace(',', '').replace('NT$', '').replace('$', '').strip()

        # Compare the stripped text so surrounding whitespace does not defeat cancel.
        if raw in ('/取消', '/cancel'):
            await update.message.reply_text(
                "已取消",
                reply_markup=self._to_inline_markup(openclaw.main_menu_keyboard()),
            )
            return

        # NOTE(review): openclaw.handle_cmd / send_message / query_trend_range are
        # called without await; if they perform network or DB I/O they block the
        # event loop while running - confirm against their implementation.
        try:
            if action.startswith('goal_'):
                period = {
                    'goal_daily': 'daily',
                    'goal_monthly': 'monthly',
                    'goal_quarterly': 'quarterly',
                    'goal_half': 'half',
                    'goal_yearly': 'yearly',
                }.get(action)
                if period is None:
                    await update.message.reply_text(unsupported_reply)
                    return

                try:
                    amount = float(val)
                except ValueError:
                    amount = None
                # Reject text that is not a number, plus nan/inf and negative goals.
                if amount is None or not math.isfinite(amount) or amount < 0:
                    await update.message.reply_text("⚠️ 金額格式錯誤,請輸入有效的數字。")
                    return

                openclaw._GOALS[period] = amount
                await update.message.reply_text(
                    f"✅ 目標已設定為 NT$ {amount:,.0f}",
                    reply_markup=self._to_inline_markup(openclaw._submenu_goals()),
                )
                return

            if action == 'search_compare':
                openclaw.handle_cmd('competitor', val, chat_id, reply_to)
                return

            if action == 'date_range_sales':
                # Ranges are parsed from the un-normalised text: stripping commas
                # first would glue "2026/4/1,2026/4/7" into one malformed date.
                dates = re.findall(full_date, raw)
                month_only = re.match(r'(\d{4})[/\-](\d{1,2})$', val)
                if len(dates) >= 2:
                    start = openclaw.normalize_date(dates[0])
                    end = openclaw.normalize_date(dates[1])
                    if start == end:
                        openclaw.handle_cmd('sales', start, chat_id, reply_to)
                    else:
                        start_d = datetime.strptime(start.replace('/', '-'), '%Y-%m-%d').date()
                        end_d = datetime.strptime(end.replace('/', '-'), '%Y-%m-%d').date()
                        if start_d > end_d:
                            # Accept a reversed range instead of reporting negative days.
                            start, end = end, start
                            start_d, end_d = end_d, start_d
                        days_count = (end_d - start_d).days + 1
                        data = openclaw.query_trend_range(start, end)
                        if data:
                            period_label = f'{start} ~ {end}({days_count}天)'
                            openclaw.send_message(
                                chat_id,
                                openclaw.fmt_trend(data, period_label),
                                reply_to,
                                openclaw._submenu_sales(),
                            )
                        else:
                            openclaw.send_message(chat_id, f"⚠️ {start} ~ {end} 查無業績資料", reply_to)
                elif len(dates) == 1:
                    openclaw.handle_cmd('sales', openclaw.normalize_date(dates[0]), chat_id, reply_to)
                elif month_only:
                    openclaw.handle_cmd(
                        'history',
                        f"{month_only.group(1)}/{int(month_only.group(2)):02d}",
                        chat_id,
                        reply_to,
                    )
                else:
                    await update.message.reply_text("⚠️ 格式錯誤,請重新輸入日期或日期區間。")
                return

            # Must be tested before the generic 'date_' prefix below.
            if action.startswith('date_trend_'):
                openclaw.handle_cmd('trend', val.replace('-', '/'), chat_id, reply_to)
                return

            if action.startswith('date_'):
                date_val = val.replace('-', '/')
                if not re.match(r'\d{4}/\d{1,2}(/\d{1,2})?$', date_val):
                    await update.message.reply_text("⚠️ 日期格式錯誤,請重新輸入。")
                    return

                command_map = {
                    'date_sales': ('sales', date_val),
                    'date_top': ('top', date_val),
                    'date_analysis': ('strategy', date_val),
                    'date_ppt_daily': ('ppt', f'daily {date_val}'),
                    'date_ppt_monthly': ('ppt', f'monthly {date_val}'),
                    'date_competitor': ('ppt', f'competitor {date_val}'),
                }
                command = command_map.get(action)
                if command:
                    openclaw.handle_cmd(command[0], command[1], chat_id, reply_to)
                else:
                    # Tell the user instead of silently dropping an unknown key.
                    await update.message.reply_text(unsupported_reply)
                return

            if action == 'promo_range':
                # Same reason as date_range_sales: keep separators between the dates.
                dates = re.findall(full_date, raw)
                if len(dates) >= 2:
                    openclaw.handle_cmd(
                        'promo',
                        f"{openclaw.normalize_date(dates[0])}-{openclaw.normalize_date(dates[1])}",
                        chat_id,
                        reply_to,
                    )
                else:
                    await update.message.reply_text("⚠️ 格式錯誤,例如:2026/04/01-2026/04/07")
                return

            await update.message.reply_text(unsupported_reply)

        except Exception as e:
            logger.error(f"OpenClaw 輸入流程處理失敗: {e}", exc_info=True)
            await update.message.reply_text("⚠️ 輸入處理失敗,請稍後再試。")

    async def _process_search(self, update: Update, query: str):
        """Search trend data for *query* and reply with a short summary.

        Args:
            update: Telegram update whose message receives the replies.
            query: Search keywords typed by the user.

        A progress message is sent first. On success the first 500 characters
        of the summary are sent under a bold heading; on failure (reported by
        the service or raised as an exception) an error message with the main
        menu keyboard is sent instead.
        """
        import html

        await update.message.reply_text(f"🔍 正在搜尋「{query}」...")

        try:
            from services.trend_crawler_service import get_trend_crawler_service

            service = get_trend_crawler_service()
            # NOTE(review): invoked without await; if this performs network I/O it
            # blocks the event loop for the duration of the search - confirm.
            result = service.web_search_with_cache(query, search_type='trends')

            if result.get('success'):
                # Tolerate missing or None payload sections instead of raising.
                data = result.get('data') or {}
                parsed = data.get('result') or {}
                summary = str(parsed.get('summary') or '無摘要')

                # The query and summary are arbitrary text. Under Markdown parse
                # mode a stray '_', '*', '[' or '`' makes Telegram reject the
                # message, so send HTML and escape the dynamic parts. Truncate
                # before escaping so an escape sequence is never cut in half.
                safe_query = html.escape(query, quote=False)
                safe_summary = html.escape(summary[:500], quote=False)

                await update.message.reply_text(
                    f"🔍 <b>搜尋結果: {safe_query}</b>\n\n{safe_summary}",
                    parse_mode='HTML',
                    reply_markup=InlineKeyboardMarkup([[
                        InlineKeyboardButton("🔍 繼續搜尋", callback_data="menu_search"),
                        InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
                    ]])
                )
            else:
                await update.message.reply_text(
                    f"❌ 搜尋失敗: {result.get('error', '未知錯誤')}",
                    reply_markup=self._get_main_menu_keyboard()
                )

        except Exception as e:
            logger.error(f"_process_search 失敗: {e}", exc_info=True)
            await update.message.reply_text(
                f"❌ 搜尋失敗: {str(e)}",
                reply_markup=self._get_main_menu_keyboard()
            )

    async def _process_copy(self, update: Update, product_name: str):
        """處理文案生成請求"""
        await update.message.reply_text(f"✍️ 正在為「{product_name}」生成文案...")

        try:
            from services.ollama_service import OllamaService

            ollama = OllamaService()
            prompt = f"""請為以下商品生成 3 種風格的行銷文案:

商品: {product_name}

請分別生成:
1. 標準版 (80字內)
2. 活潑版 (含表情符號,80字內)
3. 限時版 (強調緊迫感,80字內)

以繁體中文回覆,格式簡潔。"""

            response = ollama.generate(prompt, temperature=0.7)

            if response.success:
                await update.message.reply_text(
                    f"✨ *{product_name} 行銷文案*\n\n{response.content[:1000]}",
                    parse_mode='Markdown',
                    reply_markup=InlineKeyboardMarkup([[
                        InlineKeyboardButton("✍️ 再生成一次", callback_data="menu_copy"),
                        InlineKeyboardButton("🏠 主選單", callback_data="menu_main")
                    ]])
                )
            else:
                await update.message.reply_text(
                    f"❌ 生成失敗: {response.error}",
                    reply_markup=self._get_main_menu_keyboard()
                )

        except Exception as e:
            logger.error(f"_process_copy 失敗: {e}")
            await update.message.reply_text(
                f"❌ 生成失敗: {str(e)}",
                reply_markup=self._get_main_menu_keyboard()
            )

    # ========== 推播功能 ==========

    async def send_message(self, chat_id: int, message: str, parse_mode: str = 'Markdown'):
        """發送訊息到指定聊天室"""
        if not self.application or not self.is_running:
            logger.warning("Bot 未運行,無法發送訊息")
            return False

        try:
            await self.application.bot.send_message(
                chat_id=chat_id,
                text=message,
                parse_mode=parse_mode
            )
            return True
        except Exception as e:
            logger.error(f"發送訊息失敗: {e}")
            return False

    async def broadcast_trend_alert(self, message: str, category: str = None):
        """Send a trend alert to every active user subscribed to trend notifications.

        Args:
            message: Alert text, delivered through ``send_message``.
            category: When given, only users whose ``preferred_categories``
                contains this text are notified.

        Returns:
            Number of users for which ``send_message`` reported success.
        """
        from database.trend_models import TelegramUser
        from database.manager import get_session

        db = get_session()
        try:
            criteria = [
                TelegramUser.is_active == True,
                TelegramUser.notify_trends == True,
            ]
            if category:
                # Substring match against the stored category text.
                criteria.append(TelegramUser.preferred_categories.like(f'%{category}%'))

            recipients = db.query(TelegramUser).filter(*criteria).all()

            # Deliver one user at a time, in query order.
            outcomes = [
                await self.send_message(recipient.telegram_id, message)
                for recipient in recipients
            ]
            delivered = sum(1 for ok in outcomes if ok)

            logger.info(f"趨勢警報已推播給 {delivered}/{len(recipients)} 位用戶")
            return delivered

        finally:
            db.close()

    async def send_daily_summary(self):
        """Push today's daily trend summary to subscribed users.

        Looks up today's ``daily_summary`` ``TrendAnalysis`` row, formats it as
        an HTML message and sends it through ``send_message`` to every active
        user with ``notify_daily_summary`` enabled.

        Returns:
            Number of users for which ``send_message`` reported success; 0 when
            there is no report for today or when any step raises.
        """
        import html

        from database.trend_models import TelegramUser, TrendAnalysis
        from database.manager import get_session

        def _clip(value, limit: int) -> str:
            """Cut *value* to *limit* chars, mark a real cut with '...', then escape."""
            value = str(value)
            shortened = value[:limit] + ('...' if len(value) > limit else '')
            # Escape after truncating so an escape sequence is never split.
            return html.escape(shortened, quote=False)

        session = get_session()
        try:
            # Read the date once so the query and the heading cannot disagree
            # when the job runs across midnight.
            today = date.today()

            analysis = session.query(TrendAnalysis).filter(
                TrendAnalysis.analysis_date == today,
                TrendAnalysis.analysis_type == 'daily_summary'
            ).first()

            if not analysis:
                logger.warning("每日摘要推播: 今日尚無分析報告")
                return 0

            hot_keywords = json.loads(analysis.hot_keywords or '[]')
            marketing = json.loads(analysis.marketing_suggestions or '[]')

            # Stored text is arbitrary; under Markdown parse mode a stray '_' or
            # '*' makes Telegram reject the message for every recipient, so the
            # message is built as HTML with all dynamic parts escaped.
            parts = [
                "📰 <b>MOMO 每日趨勢摘要</b>\n\n",
                f"📅 {today.strftime('%Y-%m-%d')}\n\n",
                # A missing summary renders as empty text instead of aborting.
                f"📝 <b>概況:</b>\n{_clip(analysis.summary or '', 300)}\n\n",
            ]

            if hot_keywords:
                keywords = ', '.join(
                    html.escape(str(keyword), quote=False) for keyword in hot_keywords[:10]
                )
                parts.append(f"🏷️ <b>熱門關鍵字:</b>\n{keywords}\n\n")

            if marketing:
                parts.append("💡 <b>行銷建議:</b>\n")
                # The ellipsis is added only when a suggestion was actually cut.
                parts.extend(
                    f"{i}. {_clip(tip, 60)}\n" for i, tip in enumerate(marketing[:3], 1)
                )

            parts.append("\n👉 在 Bot 輸入 /daily 查看完整報告")
            message = "".join(parts)

            users = session.query(TelegramUser).filter(
                TelegramUser.is_active == True,
                TelegramUser.notify_daily_summary == True
            ).all()

            sent_count = 0
            for user in users:
                if await self.send_message(user.telegram_id, message, parse_mode='HTML'):
                    sent_count += 1

            logger.info(f"每日摘要已推播給 {sent_count}/{len(users)} 位用戶")
            return sent_count

        except Exception as e:
            logger.error(f"send_daily_summary 失敗: {e}", exc_info=True)
            return 0
        finally:
            session.close()

    def get_application(self):
        """Return the Application instance, building it on first use.

        Intended for callers that start the bot themselves. Returns None when
        the telegram package is unavailable or no token is configured. On the
        first call the Application is built and all command, callback and text
        handlers are registered; later calls return the same instance.
        """
        if not (TELEGRAM_AVAILABLE and self.token):
            return None

        if self.application:
            return self.application

        self.application = Application.builder().token(self.token).build()
        app = self.application

        # Each command "/<name>" is served by the method "cmd_<name>".
        command_names = (
            "start", "menu", "help", "trend",
            "search", "copy", "keywords", "daily",
        )
        for command_name in command_names:
            app.add_handler(
                CommandHandler(command_name, getattr(self, f"cmd_{command_name}"))
            )

        app.add_handler(CallbackQueryHandler(self.handle_callback))

        # Plain text that is not a command goes to the free-text handler.
        text_only = filters.TEXT & ~filters.COMMAND
        app.add_handler(MessageHandler(text_only, self.handle_message))

        return app


# Module-wide bot instance, created lazily by get_telegram_bot().
_bot_instance: Optional[TrendTelegramBot] = None


def get_telegram_bot() -> TrendTelegramBot:
    """Return the shared TrendTelegramBot, creating it on the first call."""
    global _bot_instance
    if _bot_instance is not None:
        return _bot_instance
    _bot_instance = TrendTelegramBot()
    return _bot_instance


def is_telegram_available() -> bool:
    """Report whether the telegram package is importable and a bot token is set in the environment."""
    if not TELEGRAM_AVAILABLE:
        return False
    token = os.getenv('TELEGRAM_BOT_TOKEN')
    return bool(token)


# Alias (used by run_telegram_bot.py)
TelegramBotService = TrendTelegramBot