From 1a886d962b6cd8fc48e25010071f1f4d94a9f9c6 Mon Sep 17 00:00:00 2001 From: OoO Date: Sat, 2 May 2026 12:01:04 +0800 Subject: [PATCH] fix(telegram): dedupe webhook+polling updates via shared DB guard Webhook (Flask) and polling (momo-telegram-bot) consumed the same Telegram update_id, causing /menu callbacks to fire twice. Add a shared dedup module backed by telegram_update_dedup table (300s TTL, 60s cleanup) with in-memory fallback, wired into both paths. Polling launcher now skips startup when webhook is configured to prevent dual-consumption at the source. 38 tests across webhook, menu keyboards, telegram_api, dedup guard, and trend bot service. Co-Authored-By: Claude Opus 4.7 (1M context) --- routes/openclaw_bot_routes.py | 919 +++++++++++++++++----- run_telegram_bot.py | 6 + scripts/tools/run_telegram_bot.py | 6 + services/openclaw_bot/menu_keyboards.py | 273 ++++--- services/openclaw_bot/telegram_api.py | 39 +- services/telegram_bot_service.py | 317 +++++++- services/telegram_update_guard.py | 191 +++++ tests/test_openclaw_bot_menu_keyboards.py | 88 +++ tests/test_openclaw_bot_routes_webhook.py | 548 +++++++++++++ tests/test_openclaw_bot_telegram_api.py | 13 + tests/test_telegram_update_guard.py | 26 + tests/test_trend_telegram_bot_service.py | 172 ++++ 12 files changed, 2276 insertions(+), 322 deletions(-) create mode 100644 services/telegram_update_guard.py create mode 100644 tests/test_openclaw_bot_routes_webhook.py create mode 100644 tests/test_telegram_update_guard.py create mode 100644 tests/test_trend_telegram_bot_service.py diff --git a/routes/openclaw_bot_routes.py b/routes/openclaw_bot_routes.py index 826bab7..6b58598 100644 --- a/routes/openclaw_bot_routes.py +++ b/routes/openclaw_bot_routes.py @@ -21,15 +21,16 @@ v5 新增(2026-04-16): """ import os +import json import re import threading import requests from datetime import datetime, timezone, timedelta from flask import Blueprint, request, jsonify -from sqlalchemy import text - +from sqlalchemy import text, or_ from database.manager import DatabaseManager from services.logger_manager import SystemLogger +from services.telegram_update_guard import is_duplicate_update as is_global_duplicate_update from services.mcp_context_service import ( build_mcp_context, MCPRouter, query_mcp, get_tw_media_news, get_ecommerce_news, get_taiwan_trends, @@ -39,6 +40,7 @@ from services.mcp_context_service import ( from services.openclaw_bot.telegram_api import ( _tg, answer_callback, + edit_message_text, send_document, send_message, send_photo, @@ -47,11 +49,14 @@ from services.openclaw_bot.telegram_api import ( from services.openclaw_bot.menu_keyboards import ( _BACK, _SUBMENUS, + _chunk_rows, _submenu_goals, _submenu_market, _submenu_sales, _submenu_trend, + _row, configure_menu_keyboards, + quick_menu_keyboard, main_menu_keyboard, ) try: @@ -82,6 +87,7 @@ except ImportError: GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', '') GEMINI_BASE_URL = 'https://generativelanguage.googleapis.com/v1beta/models' GEMINI_MODEL = 'gemini-2.0-flash' +PPT_CACHE_TTL_HOURS = max(1, int(os.getenv('OPENCLAW_PPT_CACHE_TTL_HOURS', '24'))) TAIPEI_TZ = timezone(timedelta(hours=8)) sys_log = SystemLogger("OpenClawBot").get_logger() @@ -89,27 +95,29 @@ sys_log = SystemLogger("OpenClawBot").get_logger() openclaw_bot_bp = Blueprint('openclaw_bot', __name__) # ── Telegram retry 去重 (update_id 快取,最多保留 500 筆) ───── -_seen_update_ids: set = set() -_SEEN_MAX = 500 - -BOT_TOKEN = os.getenv('OPENCLAW_BOT_TOKEN', '') +BOT_TOKEN = os.getenv('OPENCLAW_BOT_TOKEN') or os.getenv('TELEGRAM_BOT_TOKEN', '') BOT_API_URL = f"https://api.telegram.org/bot{BOT_TOKEN}" ALLOWED_GROUP = int(os.getenv('OPENCLAW_GROUP_ID', '-1003940688311')) MOMO_BASE_URL = os.getenv('MOMO_BASE_URL', 'https://mo.wooo.work') NVIDIA_API_KEY = os.getenv('NVIDIA_API_KEY', '') NVIDIA_BASE_URL = 'https://integrate.api.nvidia.com/v1' CHAT_MODEL = 'deepseek-ai/deepseek-v3.2' -BOT_USERNAME = '@OpenClawAwoooI_Bot' +BOT_USERNAME = os.getenv('OPENCLAW_BOT_USERNAME') or os.getenv('TELEGRAM_BOT_USERNAME', '@OpenClawAwoooI_Bot') # ── 存取控制白名單 ───────────────────────────────────────────── # OPENCLAW_ALLOWED_USERS:逗號分隔的 Telegram user_id(整數) -# 空字串 = 只允許 ALLOWED_GROUP 群組,禁止所有私訊 +# 空字串 + OPENCLAW_ALLOW_PRIVATE_WITHOUT_WHITELIST=1 => 允許任何私訊(舊行為) +# OPENCLAW_ALLOW_PRIVATE_WITHOUT_WHITELIST 未開啟時,空白名單會拒絕私訊 # 例:'123456789,987654321' _allowed_users_raw = os.getenv('OPENCLAW_ALLOWED_USERS', '') ALLOWED_USERS: set = ( {int(uid.strip()) for uid in _allowed_users_raw.split(',') if uid.strip().isdigit()} if _allowed_users_raw.strip() else set() ) +_ALLOW_PRIVATE_WITHOUT_WHITELIST = ( + os.getenv('OPENCLAW_ALLOW_PRIVATE_WITHOUT_WHITELIST', '1').strip().lower() + in {'1', 'true', 'yes', 'on'} +) # ── fail-closed 統一授權檢查 ─────────────────────────────────── # 規則(任一滿足即通過,否則一律拒絕): @@ -127,7 +135,11 @@ def _is_authorized(chat_type: str, chat_id, user_id) -> bool: if chat_type in ('group', 'supergroup'): return cid == ALLOWED_GROUP if chat_type == 'private': - return uid is not None and uid in ALLOWED_USERS + if uid is None: + return False + if ALLOWED_USERS: + return uid in ALLOWED_USERS + return _ALLOW_PRIVATE_WITHOUT_WHITELIST return False # ── 速率限制(每用戶每分鐘最多 30 次 AI 呼叫)────────────────── @@ -135,6 +147,8 @@ import time as _time_mod _rate_tracker: dict = {} # {user_id: [timestamp, ...]} _RATE_LIMIT_PER_MIN = 30 # 每分鐘上限 _RATE_WINDOW_SEC = 60 +# V-Fix:callback 期間覆寫 send_message 會跨 request 競態,全域鎖可避免重入干擾。 +_CALLBACK_SEND_LOCK = threading.Lock() def _check_rate_limit(user_id: int) -> bool: """回傳 True = 允許,False = 超過速率限制""" @@ -147,6 +161,96 @@ def _check_rate_limit(user_id: int) -> bool: _rate_tracker[user_id].append(now) return True + +def _is_duplicate_update(update_id) -> bool: + """Telegram webhook 可能重送同一 update_id;優先以 DB 導向去重,再回退本機快取。""" + return is_global_duplicate_update(update_id, namespace="telegram_update") + + +def _is_non_modified_error(response) -> bool: + """Telegram API 回覆若是 `message is not modified` 則視為可忽略。""" + if not isinstance(response, dict): + return False + desc = (response.get("description") or "").lower() + return "message is not modified" in desc + + +def _is_non_editable_message_error(response) -> bool: + """V-Fix:若訊息已刪除、不可編輯或過舊,僅回報不再 fallback。""" + if not isinstance(response, dict): + return False + desc = (response.get("description") or "").lower() + markers = ( + "message to edit not found", + "message can't be edited", + "message_id_invalid", + "message id is invalid", + "message identifier is invalid", + "message is too old to edit", + "not found", + "reply message not found", + "message to delete not found", + ) + return any(m in desc for m in markers) + + +def _build_callback_dedupe_key(update_id, cq_id, message_id=None, data=None, chat_id=None, user_id=None) -> str: + """V-Fix:多維度組成 callback key,降低重複回報機率。""" + key_parts = [] + if cq_id: + key_parts.append(f"cbq:{cq_id}") + elif update_id is not None: + key_parts.append(f"uid:{update_id}") + if chat_id is not None: + key_parts.append(f"chat:{chat_id}") + if user_id is not None: + key_parts.append(f"user:{user_id}") + if message_id is not None: + key_parts.append(f"msg:{message_id}") + if data: + key_parts.append(f"data:{data}") + if key_parts: + return "cb:" + "|".join(key_parts) + base = f"cb-query:{cq_id}" + if message_id is not None: + base += f":msg:{message_id}" + if data: + base += f":{data}" + return base + + +def _should_fallback_send_message(edit_result) -> bool: + """V-Fix:判斷 editMessageText 失敗是否要改走 sendMessage。""" + if not isinstance(edit_result, dict): + return True + if edit_result.get("ok"): + return False + if _is_non_modified_error(edit_result): + return False + if _is_non_editable_message_error(edit_result): + return False + return True + + +def _normalize_callback_data(data: str) -> str: + """V-Fix:兼容舊版 callback_data(例如 menu_main、menu_trend、await_date...)。""" + if not data: + return data + data = data.strip() + if data.startswith(("menu:", "cmd:", "await:")): + return data + if data.startswith("menu_"): + key = data[5:] + if key in _SUBMENUS: + return f"menu:{key}" + elif data.startswith("await_"): + key = data[6:] + if key in _AWAIT_PROMPTS: + return f"await:{key}" + elif data.startswith("cmd_"): + return f"cmd:{data[4:]}" + return data + # 群組內回應觸發(包含這些字才回應;若為空則全部回應) # 設為空 list = 所有訊息都回應 TRIGGER_KEYWORDS = [] # 空 = 全部回應(小龍蝦是專用業務群組) @@ -1898,6 +2002,190 @@ def _ppt_ai_analysis(prompt_data: str, report_type: str = '') -> str: return '(AI 分析暫時無法使用,請稍後重試)' +def _ppt_needs_fallback(ai_text: str) -> bool: + if not ai_text: + return True + weak_markers = ('AI 分析暫', '暫無 AI 分析', '生成中', '請稍後重試') + return any(marker in ai_text for marker in weak_markers) or len(ai_text.strip()) < 40 + + +def _ppt_fallback_insight(report_type: str, data_summary: str, mcp_text: str = '') -> str: + """模型額度或外部 API 失敗時,仍用既有 DB 摘要補足簡報內容。""" + lines = [ + f"【{report_type}重點摘要】", + data_summary[:1200], + ] + if mcp_text: + lines.extend(["", "【外部情報補充】", mcp_text[:600]]) + lines.extend([ + "", + "【建議動作】", + "1. 先檢查高業績商品與高毛利商品是否重疊,優先補強可放大的品項。", + "2. 針對低毛利或異常波動商品,回看價格、促銷與庫存狀態。", + "3. 若本頁顯示資料不足,請先確認該日期/月份業績資料是否已完成匯入。", + ]) + return "\n".join(lines) + + +def _normalize_ppt_parameters(parameters: dict) -> str: + """將快取參數轉成穩定字串,避免 key 順序造成命中錯誤。""" + try: + return json.dumps(parameters, ensure_ascii=False, sort_keys=True, separators=(',', ':')) + except Exception: + return json.dumps({}, ensure_ascii=False) + + +def _load_cached_ppt_entry(report_type: str, parameters: dict): + """回傳尚未過期的快取紀錄與解析後 payload,若無則回傳 (None, {}).""" + from database.manager import DatabaseManager + from database.ppt_reports import PPTReport + + now = datetime.now(TAIPEI_TZ).replace(tzinfo=None) + params = _normalize_ppt_parameters(parameters) + session = DatabaseManager().get_session() + try: + cached = ( + session.query(PPTReport) + .filter( + PPTReport.report_type == report_type, + PPTReport.parameters == params, + or_(PPTReport.expires_at.is_(None), PPTReport.expires_at > now), + ) + .order_by(PPTReport.generated_at.desc()) + .first() + ) + if not cached: + return None, {} + + cached_payload = {} + if cached.cached_data: + try: + cached_payload = json.loads(cached.cached_data) + if not isinstance(cached_payload, dict): + cached_payload = {} + except Exception as e: + sys_log.warning(f"[PPT] cached_data 解析失敗: {e}") + cached_payload = {} + return cached, cached_payload + except Exception as e: + session.rollback() + sys_log.warning(f"[PPT] 讀取快取失敗:{e}") + finally: + session.close() + return None, {} + + +def _load_cached_ppt_path(report_type: str, parameters: dict) -> str | None: + """嘗試回傳尚未過期且仍存在的快取檔案。""" + path, _ = _load_cached_ppt_path_and_analysis(report_type, parameters) + if path: + return path + return None + + +def _load_cached_ppt_analysis(report_type: str, parameters: dict) -> str | None: + """回傳快取中的 AI 分析文字(可直接寫入簡報)。""" + _, cached_analysis = _load_cached_ppt_path_and_analysis(report_type, parameters) + if not cached_analysis or not isinstance(cached_analysis, str): + return None + return cached_analysis.strip() if cached_analysis else None + + +def _load_cached_ppt_path_and_analysis(report_type: str, parameters: dict): + """回傳快取檔案路徑與 AI 分析文字。""" + cached, payload = _load_cached_ppt_entry(report_type, parameters) + cached_path = ( + cached.file_path if cached and cached.file_path and os.path.exists(cached.file_path) else None + ) + cached_analysis = payload.get('analysis') if isinstance(payload, dict) else None + return cached_path, cached_analysis + + +def _store_ppt_cache(report_type: str, parameters: dict, file_path: str, cached_payload: dict) -> str | None: + """儲存 PPT 快取資料,回傳 file_path。""" + from database.manager import DatabaseManager + from database.ppt_reports import PPTReport + + now = datetime.now(TAIPEI_TZ) + params = _normalize_ppt_parameters(parameters) + expire_at = now + timedelta(hours=PPT_CACHE_TTL_HOURS) + + try: + cached_data = json.dumps(cached_payload, ensure_ascii=False) + except Exception: + cached_data = json.dumps({'error': 'cached_data serialization failed'}, ensure_ascii=False) + + try: + file_size = os.path.getsize(file_path) + except OSError: + file_size = None + + session = DatabaseManager().get_session() + try: + cached = ( + session.query(PPTReport) + .filter(PPTReport.report_type == report_type, PPTReport.parameters == params) + .order_by(PPTReport.generated_at.desc()) + .first() + ) + if cached: + cached.file_path = file_path + cached.file_size = file_size + cached.generated_at = now.replace(tzinfo=None) + cached.expires_at = expire_at.replace(tzinfo=None) + cached.cached_data = cached_data + else: + session.add(PPTReport( + report_type=report_type, + parameters=params, + file_path=file_path, + file_size=file_size, + generated_at=now.replace(tzinfo=None), + expires_at=expire_at.replace(tzinfo=None), + cached_data=cached_data, + )) + session.commit() + return file_path + except Exception as e: + session.rollback() + sys_log.warning(f"[PPT] 快取寫入失敗:{e}") + return None + finally: + session.close() + + +def _is_cached_ppt_file(file_path: str) -> bool: + """判斷傳入檔案是否已被快取,避免傳送後刪除。""" + if not file_path: + return False + from database.manager import DatabaseManager + from database.ppt_reports import PPTReport + + now = datetime.now(TAIPEI_TZ) + now = now.replace(tzinfo=None) + session = DatabaseManager().get_session() + try: + cached = ( + session.query(PPTReport) + .filter(PPTReport.file_path == file_path) + .filter(or_(PPTReport.expires_at.is_(None), PPTReport.expires_at > now)) + .first() + ) + return cached is not None + except Exception: + return False + finally: + session.close() + + +def _fetch_mcp_context() -> str: + """MCP 失敗時回空字串,避免阻塞。""" + try: + return build_mcp_context('', []) + except Exception: + return '' + + def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) -> str: """依 sub_type 生成對應 pptx,回傳檔案路徑""" try: @@ -1915,19 +2203,20 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) - now = datetime.now(TAIPEI_TZ) - # ── MCP 外部情報(所有報告共用)────────────────────────── - try: - mcp_text = build_mcp_context('', []) - except Exception: - mcp_text = '' - if sub_type in ('daily', '日報'): - # sub_arg 若有效日期格式則用,否則取最新有資料的日期 if sub_arg and re.fullmatch(r'\d{4}[/-]\d{1,2}[/-]\d{1,2}', sub_arg): date_str = normalize_date(sub_arg) else: date_str = latest_date() or now.strftime('%Y/%m/%d') + params = {'report_type': 'daily', 'date': date_str} + cached, cached_ai = _load_cached_ppt_path_and_analysis('daily', params) + if cached: + return cached + + mcp_text = '' + if not cached_ai: + mcp_text = _fetch_mcp_context() sales = query_sales(date_str) top_products = query_top_products(date_str, 10) top_vendors = query_top_vendors(date_str, 5) @@ -1941,14 +2230,32 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) - f"{p['name']}(NT${p['revenue']:,.0f})" for p in top_products[:5]) + "\n" f"外部情報:{mcp_text[:500]}" ) - ai_text = _ppt_ai_analysis(data_summary, '日報') + ai_text = cached_ai or _ppt_ai_analysis(data_summary, '日報') + if not cached_ai and _ppt_needs_fallback(ai_text): + ai_text = _ppt_fallback_insight('日報', data_summary, mcp_text) db_data = { 'sales': sales, 'top_products': top_products, 'top_vendors': top_vendors, 'weekly': weekly, 'mcp': mcp_text, } - return generate_daily_ppt(date_str, db_data, ai_text) + ppt_path = generate_daily_ppt(date_str, db_data, ai_text) + _store_ppt_cache('daily', params, ppt_path, { + 'report_type': 'daily', + 'parameters': params, + 'data_summary': data_summary, + 'analysis': ai_text, + 'mcp': mcp_text, + }) + return ppt_path elif sub_type in ('weekly', '週報'): + params = {'report_type': 'weekly'} + cached, cached_ai = _load_cached_ppt_path_and_analysis('weekly', params) + if cached: + return cached + + mcp_text = '' + if not cached_ai: + mcp_text = _fetch_mcp_context() weekly = query_weekly_trend() top_products = query_top_products(target, 10) top_vendors = query_top_vendors(target, 10) @@ -1961,25 +2268,40 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) - f"熱銷:" + " / ".join(f"{p['name']}(NT${p['revenue']:,.0f})" for p in top_products[:5]) + "\n" f"外部情報:{mcp_text[:500]}" ) - ai_text = _ppt_ai_analysis(data_summary, '週報') + ai_text = cached_ai or _ppt_ai_analysis(data_summary, '週報') + if not cached_ai and _ppt_needs_fallback(ai_text): + ai_text = _ppt_fallback_insight('週報', data_summary, mcp_text) db_data = { 'weekly': weekly, 'top_products': top_products, 'top_vendors': top_vendors, 'strategy': strat, 'mcp': mcp_text, } - return generate_weekly_ppt(db_data, ai_text) + ppt_path = generate_weekly_ppt(db_data, ai_text) + _store_ppt_cache('weekly', params, ppt_path, { + 'report_type': 'weekly', + 'parameters': params, + 'data_summary': data_summary, + 'analysis': ai_text, + 'mcp': mcp_text, + }) + return ppt_path elif sub_type in ('monthly', '月報'): - # sub_arg 格式 YYYY/MM 或 YYYY-MM if sub_arg: parts = sub_arg.replace('-', '/').split('/') yr, mo = int(parts[0]), int(parts[1]) if len(parts) >= 2 else now.month else: yr, mo = now.year, now.month + params = {'report_type': 'monthly', 'month': f'{yr}/{mo:02d}'} + cached, cached_ai = _load_cached_ppt_path_and_analysis('monthly', params) + if cached: + return cached + + mcp_text = '' + if not cached_ai: + mcp_text = _fetch_mcp_context() ms = query_monthly_summary(yr, mo) - # 月報分類業績(用月份 LIKE 查詢) top_cats = query_category_monthly(yr, mo, lim=8) - # 把分類資料注入 ms,供 PPT P4 使用 ms['top_categories'] = top_cats data_summary = ( f"月份:{yr}/{mo:02d}\n" @@ -1991,17 +2313,23 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) - f"分類:" + " / ".join(f"{c['cat']}(NT${c['revenue']:,.0f})" for c in top_cats[:3]) + "\n" f"外部情報:{mcp_text[:500]}" ) - ai_text = _ppt_ai_analysis(data_summary, '月報') + ai_text = cached_ai or _ppt_ai_analysis(data_summary, '月報') + if not cached_ai and _ppt_needs_fallback(ai_text): + ai_text = _ppt_fallback_insight('月報', data_summary, mcp_text) db_data = {'monthly': ms, 'mcp': mcp_text} - return generate_monthly_ppt(yr, mo, db_data, ai_text) + ppt_path = generate_monthly_ppt(yr, mo, db_data, ai_text) + _store_ppt_cache('monthly', params, ppt_path, { + 'report_type': 'monthly', + 'parameters': params, + 'data_summary': data_summary, + 'analysis': ai_text, + 'mcp': mcp_text, + }) + return ppt_path elif sub_type in ('strategy', '策略'): - # 支援: strategy / strategy 2026/04/10 / strategy weekly / - # strategy monthly [2026/03] / strategy quarterly / - # strategy half / strategy yearly period_label = '日報' if sub_arg and re.fullmatch(r'\d{4}[/-]\d{1,2}[/-]\d{1,2}', sub_arg): - # 指定單日 date_str = normalize_date(sub_arg) start_str, end_str = date_str, date_str period_label = f'{date_str} 日策略' @@ -2043,26 +2371,32 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) - date_str = f'{start_str}~{end_str}' period_label = '年度策略(近365日)' else: - # 預設:最新單日 date_str = latest_date() or now.strftime('%Y/%m/%d') start_str, end_str = date_str, date_str period_label = f'{date_str} 日策略' - # 查詢資料 - if start_str == end_str: - sales = query_sales(date_str) - top_products = query_top_products(date_str, 15) - strat = analyze_product_strategy(date_str, 20) - else: - rng = query_date_range(start_str, end_str) - sales = {'revenue': rng.get('revenue', 0), - 'orders': rng.get('orders', 0), - 'gross_margin': rng.get('gross_margin', 0), - 'avg_order': rng.get('avg_order', 0)} - top_products = query_top_products_range(start_str, end_str, 15) - strat = _analyze_strategy_range(start_str, end_str, top_products) + params = {'report_type': 'strategy', 'start': start_str, 'end': end_str, 'label': period_label} + cached, cached_ai = _load_cached_ppt_path_and_analysis('strategy', params) + if cached: + return cached + + mcp_text = '' + if not cached_ai: + mcp_text = _fetch_mcp_context() + + if start_str == end_str: + sales = query_sales(date_str) + top_products = query_top_products(date_str, 15) + strat = analyze_product_strategy(date_str, 20) + else: + rng = query_date_range(start_str, end_str) + sales = {'revenue': rng.get('revenue', 0), + 'orders': rng.get('orders', 0), + 'gross_margin': rng.get('gross_margin', 0), + 'avg_order': rng.get('avg_order', 0)} + top_products = query_top_products_range(start_str, end_str, 15) + strat = _analyze_strategy_range(start_str, end_str, top_products) - # 策略分佈統計 from collections import Counter strat_cnt = Counter(s['strategy'] for s in strat) strat_detail = "\n".join( @@ -2071,7 +2405,6 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) - for s in strat if s['strategy'] == k)[:3] for k, v in strat_cnt.most_common() ) - data_summary = ( f"分析週期:{period_label}\n" f"業績:NT$ {float(sales.get('revenue', 0)):,.0f} | " @@ -2083,72 +2416,93 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) - f"{p['name']}(NT${p['revenue']:,.0f})" for p in top_products[:5]) + "\n\n" f"外部市場信號:{mcp_text[:600]}" ) - ai_text = _ppt_ai_analysis(data_summary, f'策略簡報({period_label})') + ai_text = cached_ai or _ppt_ai_analysis(data_summary, f'策略簡報({period_label})') + if not cached_ai and _ppt_needs_fallback(ai_text): + ai_text = _ppt_fallback_insight(f'策略簡報({period_label})', data_summary, mcp_text) db_data = { 'sales': sales, 'top_products': top_products, 'strategy': strat, 'mcp': mcp_text, 'period_label': period_label, } - return generate_strategy_ppt(date_str, db_data, ai_text) + ppt_path = generate_strategy_ppt(date_str, db_data, ai_text) + _store_ppt_cache('strategy', params, ppt_path, { + 'report_type': 'strategy', + 'parameters': params, + 'data_summary': data_summary, + 'analysis': ai_text, + 'mcp': mcp_text, + }) + return ppt_path elif sub_type in ('competitor', '競品', 'compare'): if not _PCHOME_AVAILABLE: raise RuntimeError("PChome 比價模組不可用") - # 決定日期範圍(與 strategy 相同邏輯) if sub_arg in ('weekly', 'week', '週'): - end_d = datetime.strptime( + end_d = datetime.strptime( (latest_date() or now.strftime('%Y/%m/%d')).replace('/', '-'), '%Y-%m-%d') start_d = end_d - timedelta(days=6) period_label = '週比較(近7日)' + date_str_for_query = start_d.strftime('%Y/%m/%d') elif sub_arg in ('monthly', 'month', '月'): - end_d = datetime.strptime( + end_d = datetime.strptime( (latest_date() or now.strftime('%Y/%m/%d')).replace('/', '-'), '%Y-%m-%d') start_d = end_d.replace(day=1) period_label = f'{end_d.year}/{end_d.month:02d} 月比較' + date_str_for_query = start_d.strftime('%Y/%m/%d') elif sub_arg in ('quarterly', 'quarter', 'q', '季'): - end_d = datetime.strptime( + end_d = datetime.strptime( (latest_date() or now.strftime('%Y/%m/%d')).replace('/', '-'), '%Y-%m-%d') start_d = end_d - timedelta(days=89) period_label = '季比較(近90日)' + date_str_for_query = start_d.strftime('%Y/%m/%d') elif sub_arg in ('half', '半年'): - end_d = datetime.strptime( + end_d = datetime.strptime( (latest_date() or now.strftime('%Y/%m/%d')).replace('/', '-'), '%Y-%m-%d') start_d = end_d - timedelta(days=179) period_label = '半年比較(近180日)' + date_str_for_query = start_d.strftime('%Y/%m/%d') elif sub_arg in ('yearly', 'year', '年'): - end_d = datetime.strptime( + end_d = datetime.strptime( (latest_date() or now.strftime('%Y/%m/%d')).replace('/', '-'), '%Y-%m-%d') start_d = end_d - timedelta(days=364) period_label = '年度比較(近365日)' + date_str_for_query = start_d.strftime('%Y/%m/%d') elif sub_arg and re.match(r'\d{4}[/-]\d{1,2}[/-]\d{1,2}', sub_arg): - # 指定日期(今日/昨日/自訂) - d_str = normalize_date(sub_arg) - start_d = end_d = datetime.strptime(d_str.replace('/', '-'), '%Y-%m-%d') + d_str = normalize_date(sub_arg) + start_d = end_d = datetime.strptime(d_str.replace('/', '-'), '%Y-%m-%d') period_label = f'{d_str} 日比較' + date_str_for_query = d_str else: - # 預設:昨日日報 - yd = (now - timedelta(days=1)).strftime('%Y/%m/%d') - start_d = end_d = datetime.strptime(yd.replace('/', '-'), '%Y-%m-%d') + yd = (now - timedelta(days=1)).strftime('%Y/%m/%d') + start_d = end_d = datetime.strptime(yd.replace('/', '-'), '%Y-%m-%d') period_label = f'{yd} 日比較' + date_str_for_query = yd - date_str_for_query = start_d.strftime('%Y/%m/%d') + params = { + 'report_type': 'competitor', + 'start': start_d.strftime('%Y/%m/%d'), + 'end': end_d.strftime('%Y/%m/%d'), + 'label': period_label, + } + cached, cached_ai = _load_cached_ppt_path_and_analysis('competitor', params) + if cached: + return cached - # 爬取比對資料(批量掃描 TOP30) - results = pchome_batch(_db(), top_n=30, - date_str=date_str_for_query) + mcp_text_c = '' + if not cached_ai: + mcp_text_c = _fetch_mcp_context() + + results = pchome_batch(_db(), top_n=30, date_str=date_str_for_query) if results: pchome_save(_db(), results) - # MCP 外部情報 - mcp_text_c = mcp_text # 已在上方取得 - - # AI 分析 - found_c = [r for r in results if r.get('found')] + found_c = [r for r in results if r.get('found')] pc_wins_c = [r for r in found_c if r.get('price_diff', 0) > 10] mo_wins_c = [r for r in found_c if r.get('price_diff', 0) < -10] avg_diff_c = (sum(r.get('price_diff_pct', 0) for r in found_c) / len(found_c) if found_c else 0) + data_summary = ( f"【我方=PChome,競品=momo】\n" f"分析週期:{period_label}\n" @@ -2163,23 +2517,41 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) - for r in mo_wins_c[:3]) + "\n\n" f"外部情報:{mcp_text_c[:400]}" ) - ai_text = _ppt_ai_analysis(data_summary, f'競品比較簡報({period_label})') + ai_text = cached_ai or _ppt_ai_analysis(data_summary, f'競品比較簡報({period_label})') + if not cached_ai and _ppt_needs_fallback(ai_text): + ai_text = _ppt_fallback_insight(f'競品比較簡報({period_label})', data_summary, mcp_text_c) db_data = { 'results': results, 'period_label': period_label, 'mcp': mcp_text_c, } - return generate_competitor_ppt(period_label, db_data, ai_text) + ppt_path = generate_competitor_ppt(period_label, db_data, ai_text) + _store_ppt_cache('competitor', params, ppt_path, { + 'report_type': 'competitor', + 'parameters': params, + 'data_summary': data_summary, + 'analysis': ai_text, + 'mcp': mcp_text_c, + }) + return ppt_path elif sub_type in ('promo', '促銷'): - # sub_arg = "2026/04/01-2026/04/07" m_p = re.findall(r'\d{4}[/\-]\d{1,2}[/\-]\d{1,2}', sub_arg) if len(m_p) < 2: raise ValueError(f"促銷簡報需要日期範圍,例如:promo 2026/04/01-2026/04/07") start_s_p = normalize_date(m_p[0]) - end_s_p = normalize_date(m_p[1]) + end_s_p = normalize_date(m_p[1]) promo_label_p = f'{start_s_p}~{end_s_p}' + + params = {'report_type': 'promo', 'start': start_s_p, 'end': end_s_p, 'label': promo_label_p} + cached, cached_ai = _load_cached_ppt_path_and_analysis('promo', params) + if cached: + return cached + + mcp_text = '' + if not cached_ai: + mcp_text = _fetch_mcp_context() data_p = query_promo_comparison(start_s_p, end_s_p) if not data_p: raise ValueError("查無促銷期間資料") @@ -2200,11 +2572,23 @@ def _generate_ppt_cmd(sub_type: str, sub_arg: str, _chat_id: int, target: str) - f"活動期熱銷 TOP3:{tops_str or '(無資料)'}\n" f"外部市場情報:{mcp_text[:300]}" ) - ai_text_p = _ppt_ai_analysis(data_summary_p, f'促銷效益分析({promo_label_p})') - return generate_promo_ppt(promo_label_p, data_p, ai_text_p) + ai_text_p = cached_ai or _ppt_ai_analysis(data_summary_p, f'促銷效益分析({promo_label_p})') + if not cached_ai and _ppt_needs_fallback(ai_text_p): + ai_text_p = _ppt_fallback_insight(f'促銷效益分析({promo_label_p})', data_summary_p, mcp_text) + ppt_path = generate_promo_ppt(promo_label_p, data_p, ai_text_p) + _store_ppt_cache('promo', params, ppt_path, { + 'report_type': 'promo', + 'parameters': params, + 'data_summary': data_summary_p, + 'analysis': ai_text_p, + 'mcp': mcp_text, + }) + return ppt_path else: - raise ValueError(f"不支援的簡報類型:{sub_type}(支援:daily / weekly / monthly / strategy / competitor / promo)") + raise RuntimeError( + f'不支援的簡報類型:{sub_type}(支援:daily / weekly / monthly / strategy / competitor / promo)' + ) # ── Telegram Excel 匯入 ────────────────────────────────────────── @@ -2382,8 +2766,7 @@ def _handle_excel_import(doc: dict, chat_id: int, reply_to: int): 'filename': filename, } kb = [ - [{'text': '✅ 確認匯入資料庫', 'callback_data': 'cmd:import_confirm'}, - {'text': '❌ 取消', 'callback_data': 'cmd:import_cancel'}], + _row(('✅ 確認匯入資料庫', 'cmd:import_confirm'), ('❌ 取消', 'cmd:import_cancel')), ] send_message(chat_id, report_text, reply_to, kb) else: @@ -2482,10 +2865,10 @@ def send_morning_report(): # P10 — 導引按鈕:產出日報PPT + 查看業績數據 morning_kb = [ - [{'text': f'📊 產出 {yd} 日報PPT', 'callback_data': f'cmd:ppt:daily {yd}'}, - {'text': '📈 業績數據', 'callback_data': f'cmd:sales:{yd}'}], - [{'text': '🏆 完整熱銷排行', 'callback_data': f'cmd:top:{yd}'}, - {'text': '📋 下載 Excel', 'callback_data': f'cmd:report:{yd}'}], + _row((f'📊 產出 {yd} 日報PPT', f'cmd:ppt:daily {yd}'), + ('📈 業績數據', f'cmd:sales:{yd}')), + _row(('🏆 完整熱銷排行', f'cmd:top:{yd}'), + ('📋 下載 Excel', f'cmd:report:{yd}')), ] send_message(ALLOWED_GROUP, "\n".join(lines), keyboard=morning_kb, parse_mode='Markdown') sys_log.info("[OpenClawBot] 早報已發送") @@ -2577,10 +2960,10 @@ def send_evening_report(): # P10 — 晚報導引按鈕 evening_kb = [ - [{'text': f'📊 產出 {td} 日報PPT', 'callback_data': f'cmd:ppt:daily {td}'}, - {'text': '📈 完整業績數據', 'callback_data': f'cmd:sales:{td}'}], - [{'text': '📋 下載 Excel 報表', 'callback_data': f'cmd:report:{td}'}, - {'text': '🧬 策略矩陣分析', 'callback_data': f'cmd:strategy:{td}'}], + _row((f'📊 產出 {td} 日報PPT', f'cmd:ppt:daily {td}'), + ('📈 完整業績數據', f'cmd:sales:{td}')), + _row(('📋 下載 Excel 報表', f'cmd:report:{td}'), + ('🧬 策略矩陣分析', f'cmd:strategy:{td}')), ] send_message(ALLOWED_GROUP, "\n".join(lines), keyboard=evening_kb, parse_mode='Markdown') sys_log.info("[OpenClawBot] 晚報已發送") @@ -2723,10 +3106,10 @@ def check_anomalies(): ) msg = header + "\n\n".join(alerts) ack_kb = [ - [{'text': '✅ 已知悉', 'callback_data': 'cmd:ack:anomaly'}, - {'text': '🔄 追蹤中', 'callback_data': 'cmd:ack:tracking'}], - [{'text': '📊 查看今日業績', 'callback_data': f'cmd:sales:{td}'}, - {'text': '🏆 熱銷商品', 'callback_data': f'cmd:top:{td}'}], + _row(('✅ 已知悉', 'cmd:ack:anomaly'), + ('🔄 追蹤中', 'cmd:ack:tracking')), + _row(('📊 查看今日業績', f'cmd:sales:{td}'), + ('🏆 熱銷商品', f'cmd:top:{td}')), ] send_message(ALLOWED_GROUP, msg, keyboard=ack_kb, parse_mode='Markdown') sys_log.info(f"[OpenClawBot] 異常告警 {len(alerts)} 筆") @@ -2746,8 +3129,8 @@ def send_competitor_report(): results = pchome_batch(_db(), top_n=30, date_str=yesterday) pchome_save(_db(), results) msg = pchome_fmt_report(results, yesterday) - kb = [[{'text': '🔍 搜尋比價', 'callback_data': 'await:search_compare'}, - {'text': '📄 比價簡報', 'callback_data': 'menu:competitor_ppt'}]] + kb = [_row(('🔍 搜尋比價', 'await:search_compare'), + ('📄 比價簡報', 'menu:competitor_ppt'))] send_message(ALLOWED_GROUP, msg, None, kb) sys_log.info(f'[PChome] 競品日報已推播 {len(results)} 件商品') @@ -2803,16 +3186,50 @@ def start_scheduler(): """啟動排程(Flask app 啟動後呼叫)""" global _scheduler try: + if _scheduler is not None and _scheduler.running: + sys_log.info("[OpenClawBot] Scheduler 已在執行中,跳過重複啟動") + return + from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger _scheduler = BackgroundScheduler(timezone='Asia/Taipei') - _scheduler.add_job(send_morning_report, CronTrigger(hour=8, minute=30)) - _scheduler.add_job(send_competitor_report, CronTrigger(hour=8, minute=0)) - _scheduler.add_job(send_daily_excel, CronTrigger(hour=8, minute=45)) - _scheduler.add_job(send_evening_report, CronTrigger(hour=21, minute=0)) - _scheduler.add_job(send_weekly_report, CronTrigger(day_of_week='mon', hour=9, minute=0)) - _scheduler.add_job(check_anomalies, CronTrigger(hour='9,12,15,18', minute=0)) + _scheduler.add_job( + send_morning_report, + CronTrigger(hour=8, minute=30), + id="openclaw_send_morning_report", + replace_existing=True, + ) + _scheduler.add_job( + send_competitor_report, + CronTrigger(hour=8, minute=0), + id="openclaw_send_competitor_report", + replace_existing=True, + ) + _scheduler.add_job( + send_daily_excel, + CronTrigger(hour=8, minute=45), + id="openclaw_send_daily_excel", + replace_existing=True, + ) + _scheduler.add_job( + send_evening_report, + CronTrigger(hour=21, minute=0), + id="openclaw_send_evening_report", + replace_existing=True, + ) + _scheduler.add_job( + send_weekly_report, + CronTrigger(day_of_week='mon', hour=9, minute=0), + id="openclaw_send_weekly_report", + replace_existing=True, + ) + _scheduler.add_job( + check_anomalies, + CronTrigger(hour='9,12,15,18', minute=0), + id="openclaw_check_anomalies", + replace_existing=True, + ) _scheduler.start() sys_log.info("[OpenClawBot] Scheduler started ✓ (competitor/morning/excel/evening/weekly/anomaly)") except ImportError: @@ -2880,9 +3297,8 @@ def sales_quick_kb(date_str): d = dt.strptime(date_str.replace('/', '-'), '%Y-%m-%d').date() yesterday = (d - timedelta(days=1)).strftime('%Y/%m/%d') return [ - [{'text': '⬅️ 昨日業績', 'callback_data': f'cmd:sales:{yesterday}'}, - {'text': '🏆 熱銷商品', 'callback_data': f'cmd:top:{date_str}'}], - [{'text': '📋 完整報表', 'callback_data': f'cmd:report:{date_str}'}], + _row(('⬅️ 昨日業績', f'cmd:sales:{yesterday}'), ('🏆 熱銷商品', f'cmd:top:{date_str}')), + _row(('📋 完整報表', f'cmd:report:{date_str}')), ] except Exception: return None @@ -2965,28 +3381,32 @@ def query_weekly_trend(): try: with _db().connect() as c: rows = c.execute(text(""" - SELECT "日期", COALESCE(SUM(CAST("總業績" AS FLOAT)),0) + SELECT "日期", + COALESCE(SUM(CAST("總業績" AS FLOAT)),0), + COUNT(DISTINCT "訂單編號") FROM realtime_sales_monthly WHERE CAST("日期" AS DATE) >= CURRENT_DATE - INTERVAL '7 days' GROUP BY "日期" ORDER BY "日期" DESC LIMIT 7 """)).fetchall() - return [{'date': str(r[0]), 'revenue': r[1]} for r in rows] + return [{'date': str(r[0]), 'revenue': r[1], 'orders': int(r[2] or 0)} for r in rows] except Exception as e: sys_log.error(f"[OpenClawBot] weekly_trend: {e}") return [] def query_trend_range(start_str: str, end_str: str) -> list: - """指定區間每日業績(用於趨勢圖,回傳 [{'date', 'revenue'}] 按日期升序)""" + """指定區間每日業績(用於趨勢圖,回傳 [{'date', 'revenue', 'orders'}] 按日期升序)""" try: with _db().connect() as c: rows = c.execute(text(""" - SELECT "日期", COALESCE(SUM(CAST("總業績" AS FLOAT)), 0) + SELECT "日期", + COALESCE(SUM(CAST("總業績" AS FLOAT)), 0), + COUNT(DISTINCT "訂單編號") FROM realtime_sales_monthly WHERE CAST("日期" AS DATE) BETWEEN CAST(:s AS DATE) AND CAST(:e AS DATE) GROUP BY "日期" ORDER BY "日期" ASC """), {'s': start_str.replace('/', '-'), 'e': end_str.replace('/', '-')}).fetchall() - return [{'date': str(r[0]), 'revenue': float(r[1])} for r in rows] + return [{'date': str(r[0]), 'revenue': float(r[1]), 'orders': int(r[2] or 0)} for r in rows] except Exception as e: sys_log.error(f"[OpenClawBot] query_trend_range: {e}") return [] @@ -3777,11 +4197,7 @@ def openclaw_answer(question: str): "📌 *完整說明*:點下方「❓使用說明」按鈕\n\n" "_或直接問我任何問題,我會自動找資料回答!_" ) - return help_text, [ - [{"text": "❓ 完整使用說明", "callback_data": "cmd:help"}], - [{"text": "📊 業績查詢", "callback_data": "menu:sales"}, - {"text": "🏆 商品廠商", "callback_data": "menu:products"}], - ] + return help_text, quick_menu_keyboard() from services.ollama_service import ollama_service @@ -3997,6 +4413,17 @@ def handle_cmd(cmd, arg, chat_id, reply_to): ld = latest_date() or datetime.now(TAIPEI_TZ).strftime('%Y/%m/%d') target = normalize_date(arg) if arg else ld + def _send_mcp_text_result(title: str, data, empty_message: str) -> bool: + """相容新版 MCP 文字回傳;已處理則回 True,舊 dict 格式則回 False。""" + if isinstance(data, str): + text = data.strip() + if text: + send_message(chat_id, f"{title}\n\n{text[:3600]}", reply_to, parse_mode=None) + else: + send_message(chat_id, empty_message, reply_to, parse_mode=None) + return True + return False + if cmd in ('sales', '業績'): s = query_sales(target) t = query_top_products(target, 3) @@ -4005,14 +4432,14 @@ def handle_cmd(cmd, arg, chat_id, reply_to): elif cmd in ('top', '熱銷'): p = query_top_products(target, 10) - kb = [[{'text': '📊 查業績', 'callback_data': f'cmd:sales:{target}'}, - {'text': '📋 完整報表', 'callback_data': f'cmd:report:{target}'}]] + kb = [_row(('📊 查業績', f'cmd:sales:{target}'), + ('📋 完整報表', f'cmd:report:{target}'))] send_message(chat_id, fmt_products(p, target), reply_to, kb) elif cmd in ('vendor', '廠商'): v = query_top_vendors(target, 10) - kb = [[{'text': '🏆 熱銷商品', 'callback_data': f'cmd:top:{target}'}, - {'text': '📊 查業績', 'callback_data': f'cmd:sales:{target}'}]] + kb = [_row(('🏆 熱銷商品', f'cmd:top:{target}'), + ('📊 查業績', f'cmd:sales:{target}'))] send_message(chat_id, fmt_vendors(v, target), reply_to, kb) elif cmd in ('trend', '趨勢'): @@ -4134,6 +4561,8 @@ def handle_cmd(cmd, arg, chat_id, reply_to): try: from services.mcp_context_service import get_ecommerce_news data = get_ecommerce_news() + if _send_mcp_text_result("📰 即時電商新聞", data, "⚠️ 新聞資料暫時無法取得"): + return news = data.get('news', []) sys_log.info(f"[OpenClawBot] news: {len(news)} items") if news: @@ -4157,6 +4586,8 @@ def handle_cmd(cmd, arg, chat_id, reply_to): try: from services.mcp_context_service import get_taiwan_weather data = get_taiwan_weather() + if _send_mcp_text_result("🌤 台灣天氣預報", data, "⚠️ 天氣資料暫時無法取得"): + return weather = data.get('weather', {}) sys_log.info(f"[OpenClawBot] weather: {list(weather.keys())}") if weather: @@ -4180,6 +4611,8 @@ def handle_cmd(cmd, arg, chat_id, reply_to): try: from services.mcp_context_service import get_taiwan_trends data = get_taiwan_trends() + if _send_mcp_text_result("🔥 台灣 Google 熱搜(即時)", data, "⚠️ 熱搜資料暫時無法取得"): + return trends = data.get('trends', []) if trends: lines = ["🔥 *台灣 Google 熱搜(即時)*\n"] @@ -4195,6 +4628,8 @@ def handle_cmd(cmd, arg, chat_id, reply_to): try: from services.mcp_context_service import get_dcard_trends data = get_dcard_trends() + if _send_mcp_text_result("💬 Dcard 消費者討論熱點", data, "⚠️ Dcard 資料暫時無法取得"): + return posts = data.get('posts', []) if posts: lines = ["💬 *Dcard 消費者討論熱點*\n"] @@ -4210,6 +4645,8 @@ def handle_cmd(cmd, arg, chat_id, reply_to): try: from services.mcp_context_service import get_twbank_exchange_rates data = get_twbank_exchange_rates() + if _send_mcp_text_result("💱 台灣銀行即時匯率", data, "⚠️ 匯率資料暫時無法取得"): + return rates = data.get('rates', {}) if rates: lines = ["💱 *台灣銀行即時匯率*\n"] @@ -4229,7 +4666,9 @@ def handle_cmd(cmd, arg, chat_id, reply_to): elif cmd == 'calendar': try: from services.mcp_context_service import get_upcoming_events - data = get_upcoming_events(60) + data = get_upcoming_events() + if _send_mcp_text_result("📅 近期電商節慶行事曆", data, "✅ 近 60 天無重大電商節慶"): + return events = data.get('events', []) if events: lines = ["📅 *近期電商節慶行事曆*\n"] @@ -4248,6 +4687,8 @@ def handle_cmd(cmd, arg, chat_id, reply_to): try: from services.mcp_context_service import get_youtube_trending data = get_youtube_trending() + if _send_mcp_text_result("▶️ YouTube 熱門開箱/推薦影片", data, "⚠️ YouTube 資料暫時無法取得"): + return videos = data.get('videos', []) if videos: lines = ["▶️ *YouTube 熱門開箱/推薦影片*\n"] @@ -4320,8 +4761,8 @@ def handle_cmd(cmd, arg, chat_id, reply_to): return msg = pchome_fmt_compare(results, _kw) - kb = [[{'text': '🔍 重新搜尋', 'callback_data': 'await:search_compare'}, - {'text': '📊 競品日報', 'callback_data': 'menu:competitor'}]] + kb = [_row(('🔍 重新搜尋', 'await:search_compare'), + ('📊 競品日報', 'menu:competitor'))] send_message(_chat_id, msg, _reply_to, kb) except Exception as _e: sys_log.error(f'[PChome] compare_bg: {_e}', exc_info=True) @@ -4346,8 +4787,8 @@ def handle_cmd(cmd, arg, chat_id, reply_to): results = pchome_batch(_db(), top_n=30, date_str=_date_str) pchome_save(_db(), results) msg = pchome_fmt_report(results, _date_str) - kb = [[{'text': '🔍 搜尋比價', 'callback_data': 'await:search_compare'}, - {'text': '📄 比價簡報', 'callback_data': 'menu:competitor_ppt'}]] + kb = [_row(('🔍 搜尋比價', 'await:search_compare'), + ('📄 比價簡報', 'menu:competitor_ppt'))] send_message(_chat_id, msg, _reply_to, kb) except Exception as _e: sys_log.error(f'[PChome] daily_report_bg: {_e}', exc_info=True) @@ -4359,16 +4800,16 @@ def handle_cmd(cmd, arg, chat_id, reply_to): elif cmd in ('compare', '同比'): data = query_comparison(target) - send_message(chat_id, fmt_comparison(data, target), reply_to, - [[{'text': '📊 今日業績', 'callback_data': f'cmd:sales:{target}'}, - {'text': '🧬 策略矩陣', 'callback_data': f'cmd:strategy:{target}'}]]) + send_message( + chat_id, + fmt_comparison(data, target), + reply_to, + [_row(('📊 今日業績', f'cmd:sales:{target}'), ('🧬 策略矩陣', f'cmd:strategy:{target}'))], + ) elif cmd in ('category', '分類'): cats = query_category_sales(target) - kb = [ - [{'text': '🗂 鑽取分類', 'callback_data': 'menu:category'}, - {'text': '🏆 熱銷商品', 'callback_data': f'cmd:top:{target}'}], - ] + kb = [_row(('🗂 鑽取分類', 'menu:category'), ('🏆 熱銷商品', f'cmd:top:{target}'))] send_message(chat_id, fmt_category(cats, target), reply_to, kb) elif cmd in ('catdetail', '分類細項'): @@ -4382,10 +4823,7 @@ def handle_cmd(cmd, arg, chat_id, reply_to): items = query_category_detail(cat_name, date_cd) d_label = date_cd[-5:] if date_cd else '近7日' msg = fmt_category_detail(cat_name, items, d_label) - kb = [ - [{'text': '⬅ 分類總覽', 'callback_data': 'menu:category'}, - {'text': '🏆 熱銷商品', 'callback_data': f'cmd:top:{target}'}], - ] + kb = [_row(('⬅ 分類總覽', 'menu:category'), ('🏆 熱銷商品', f'cmd:top:{target}'))] send_message(chat_id, msg, reply_to, kb) elif cmd in ('restock', '補貨'): @@ -4393,9 +4831,8 @@ def handle_cmd(cmd, arg, chat_id, reply_to): items = query_restock_forecast(20) msg = fmt_restock_forecast(items) kb = [ - [{'text': '🏆 熱銷商品', 'callback_data': f'cmd:top:{target}'}, - {'text': '🧬 商品健康', 'callback_data': f'cmd:health:{target}'}], - [{'text': '🔄 重新計算', 'callback_data': 'cmd:restock'}], + _row(('🏆 熱銷商品', f'cmd:top:{target}'), ('🧬 商品健康', f'cmd:health:{target}')), + _row(('🔄 重新計算', 'cmd:restock')), ] send_message(chat_id, msg, reply_to, kb) @@ -4414,9 +4851,9 @@ def handle_cmd(cmd, arg, chat_id, reply_to): msg = fmt_promo_comparison(data) promo_range_arg = f'{start_s}-{end_s}' kb = [ - [{'text': '🎉 再查一個促銷', 'callback_data': 'await:promo_range'}, - {'text': '📊 業績查詢', 'callback_data': f'cmd:sales:{start_s}'}], - [{'text': '📊 產出促銷簡報', 'callback_data': f'cmd:ppt:promo {promo_range_arg}'}], + _row(('🎉 再查一個促銷', 'await:promo_range'), + ('📊 業績查詢', f'cmd:sales:{start_s}')), + _row(('📊 產出促銷簡報', f'cmd:ppt:promo {promo_range_arg}')), ] send_message(chat_id, msg, reply_to, kb) else: @@ -4425,7 +4862,7 @@ def handle_cmd(cmd, arg, chat_id, reply_to): else: send_message(chat_id, "🎉 *促銷效益追蹤*\n\n請輸入活動日期範圍:\n`/promo 2026/04/01-2026/04/07`\n或點選 🎉 促銷追蹤 按鈕", - reply_to, [[{'text': '🎉 設定促銷範圍', 'callback_data': 'await:promo_range'}]]) + reply_to, [_row(('🎉 設定促銷範圍', 'await:promo_range'))]) elif cmd in ('goal', '目標'): # /goal 200000 設定日目標 @@ -4452,8 +4889,7 @@ def handle_cmd(cmd, arg, chat_id, reply_to): reply_to, parse_mode='Markdown') else: status = get_goal_status(target) - kb = [[{'text': '📊 今日業績', 'callback_data': f'cmd:sales:{target}'}, - {'text': '🔄 同期比較', 'callback_data': f'cmd:compare:{target}'}]] + kb = [_row(('📊 今日業績', f'cmd:sales:{target}'), ('🔄 同期比較', f'cmd:compare:{target}'))] send_message(chat_id, fmt_goal_status(status), reply_to, kb) elif cmd in ('chart', '圖表'): @@ -4520,18 +4956,18 @@ def handle_cmd(cmd, arg, chat_id, reply_to): icon = tag_icon.get(k, '•') lines.append(f" {icon} {k} {v} 件 `{bar}`") - kb = [[{'text': '🎲 策略矩陣', 'callback_data': f'cmd:strategy:{target}'}, - {'text': '🏆 熱銷商品', 'callback_data': f'cmd:top:{target}'}], - [{'text': '📊 今日業績', 'callback_data': f'cmd:sales:{target}'}, - {'text': '🔄 同期比較', 'callback_data': f'cmd:compare:{target}'}]] + kb = [ + _row(('🎲 策略矩陣', f'cmd:strategy:{target}'), ('🏆 熱銷商品', f'cmd:top:{target}')), + _row(('📊 今日業績', f'cmd:sales:{target}'), ('🔄 同期比較', f'cmd:compare:{target}')), + ] send_message(chat_id, "\n".join(lines), reply_to, kb) elif cmd in ('strategy', '策略'): strat = analyze_product_strategy(target, 10) - kb = [[{'text': '🏥 商品健康', 'callback_data': f'cmd:health:{target}'}, - {'text': '🔄 同期比較', 'callback_data': f'cmd:compare:{target}'}], - [{'text': '🏆 熱銷商品', 'callback_data': f'cmd:top:{target}'}, - {'text': '📊 今日業績', 'callback_data': f'cmd:sales:{target}'}]] + kb = [ + _row(('🏥 商品健康', f'cmd:health:{target}'), ('🔄 同期比較', f'cmd:compare:{target}')), + _row(('🏆 熱銷商品', f'cmd:top:{target}'), ('📊 今日業績', f'cmd:sales:{target}')), + ] send_message(chat_id, fmt_strategy(strat, target), reply_to, kb) elif cmd in ('ppt', 'slides', '簡報'): @@ -4562,10 +4998,11 @@ def handle_cmd(cmd, arg, chat_id, reply_to): label = type_labels.get(_sub_type, '簡報') caption = f"{label} — 由 OpenClaw AI 自動生成\n💡 可用 PowerPoint / Keynote / Google Slides 開啟" send_document(_chat_id, ppt_path, caption=caption, reply_to=_reply_to) - try: - os.unlink(ppt_path) - except Exception: - pass + if not _is_cached_ppt_file(ppt_path): + try: + os.unlink(ppt_path) + except Exception: + pass else: send_message(_chat_id, "⚠️ 簡報生成失敗,請稍後再試", _reply_to) except Exception as e: @@ -4597,16 +5034,16 @@ def handle_cmd(cmd, arg, chat_id, reply_to): else: msg = "⚠️ 暫無月份資料" # 快捷按鈕:最近3個月 + 月報PPT(僅在查看特定月份時顯示) - kb_months = [] - for am in months[:3]: - kb_months.append({'text': f"📊 {am['month']}", 'callback_data': f"cmd:history:{am['month']}"}) + kb_months = _chunk_rows( + [(f"📊 {am['month']}", f"cmd:history:{am['month']}") for am in months[:3]], + row_size=2, + ) if arg and re.match(r'\d{4}/\d{2}', arg): # 查看特定月份時,額外顯示「產出月報PPT」按鈕 - kb = [kb_months] if kb_months else [] - kb.append([{'text': f'📊 產出 {arg} 月報', - 'callback_data': f'cmd:ppt:monthly {arg}'}]) + kb = kb_months if kb_months else [] + kb.append(_row((f'📊 產出 {arg} 月報', f'cmd:ppt:monthly {arg}'))) else: - kb = [kb_months] if kb_months else None + kb = kb_months if kb_months else None send_message(chat_id, msg, reply_to, kb) # ── 原有指令 ───────────────────────────────────────────────── @@ -4635,7 +5072,7 @@ def handle_cmd(cmd, arg, chat_id, reply_to): " 同期比較 → vs 上週同日\n" " 分類業績 → 各品類佔比\n" " 日期/區間 → 單日或起迄區間 格式: `2026/04/01-2026/04/15`\n" - " 月份覽 → 列出所有可查月份,點月份查詳情\n\n" + " 月份總覽 → 列出所有可查月份,點月份查詳情\n\n" "🏆 *商品廠商 ▸ 點「商品廠商」按鈕*\n" " 熱銷商品 TOP10 → 含商品ID + 可點 PChome 連結\n" @@ -4683,13 +5120,7 @@ def handle_cmd(cmd, arg, chat_id, reply_to): "💡 *日期格式*:`2026/04/10` 或 `2026-04-10` 皆可\n" "💡 *有疑問*:直接用中文問我,我會回答!" ) - help_kb = [ - [{'text': '📊 業績查詢', 'callback_data': 'menu:sales'}, - {'text': '🏆 商品廠商', 'callback_data': 'menu:products'}], - [{'text': '📈 智能分析', 'callback_data': 'menu:analysis'}, - {'text': '📄 簡報報表', 'callback_data': 'menu:reports'}], - [{'text': '🔙 返回主選單', 'callback_data': 'menu:main'}], - ] + help_kb = quick_menu_keyboard() send_message(chat_id, reply, reply_to, help_kb) elif cmd == 'ack': @@ -4806,10 +5237,10 @@ def handle_cmd(cmd, arg, chat_id, reply_to): # 取涵蓋日期的 latest date 顯示快速按鈕 quick_date = date_max.replace('-', '/') if date_max else (latest_date() or '') import_kb = [ - [{'text': f'📊 查看 {quick_date} 業績', 'callback_data': f'cmd:sales:{quick_date}'}, - {'text': '🏆 熱銷商品排行', 'callback_data': f'cmd:top:{quick_date}'}], - [{'text': '📄 產出日報 PPT', 'callback_data': f'cmd:ppt:daily {quick_date}'}, - {'text': '📅 月份業績覽', 'callback_data': 'cmd:history'}], + _row((f'📊 查看 {quick_date} 業績', f'cmd:sales:{quick_date}'), + ('🏆 熱銷商品排行', f'cmd:top:{quick_date}')), + _row(('📄 產出日報 PPT', f'cmd:ppt:daily {quick_date}'), + ('📅 月份總覽', 'cmd:history')), ] if quick_date else None send_message(chat_id, result_msg, None, import_kb) else: @@ -4868,24 +5299,32 @@ def telegram_webhook(): # ── Telegram retry 去重 ─────────────────────────────── uid = update.get('update_id') - if uid is not None: - if uid in _seen_update_ids: - sys_log.debug(f"[OpenClawBot] duplicate update_id={uid}, skip") - return jsonify({'ok': True}) - _seen_update_ids.add(uid) - if len(_seen_update_ids) > _SEEN_MAX: - # 清掉最舊的 100 筆(set 無序,直接 pop 100 個) - for _ in range(100): - _seen_update_ids.pop() # ── Callback Query(按鈕)───────────────────────────── if 'callback_query' in update: cq = update['callback_query'] cq_id = cq['id'] - data = cq.get('data', '') + data = _normalize_callback_data(cq.get('data', '')) chat_id = cq['message']['chat']['id'] chat_type = cq['message']['chat'].get('type', '') cq_from_id = (cq.get('from') or {}).get('id') + cq_message_id = cq.get('message', {}).get('message_id') + + duplicate_key = _build_callback_dedupe_key( + update_id=uid, + cq_id=cq_id, + message_id=cq_message_id, + data=data, + chat_id=chat_id, + user_id=cq_from_id, + ) + if _is_duplicate_update(duplicate_key): + sys_log.debug( + f"[OpenClawBot] duplicate callback uid={uid} cq_id={cq_id}, skip" + ) + answer_callback(cq_id) + return jsonify({'ok': True}) + sys_log.info(f'[OpenClawBot] CB: chat={chat_id} type={chat_type} data={data} allowed={ALLOWED_GROUP}') # fail-closed:未授權一律安靜拒絕(關閉 loading,不回任何訊息避免偵察) @@ -4894,7 +5333,7 @@ def telegram_webhook(): f'[OpenClawBot] CB rejected: chat={chat_id} type={chat_type} user={cq_from_id}' ) answer_callback(cq_id) - return jsonify({'ok': False, 'error': 'forbidden'}), 403 + return jsonify({'ok': False, 'error': 'forbidden'}) answer_callback(cq_id) send_typing(chat_id) @@ -4918,7 +5357,17 @@ def telegram_webhook(): 'competitor_ppt': '📄 *競品比價簡報* — 選擇時間範圍', 'category': '🗂 *分類業績鑽取* — 點選分類深入分析', } - send_message(chat_id, titles.get(key, '請選擇'), None, kb) + if cq_message_id: + result = edit_message_text( + chat_id, + cq_message_id, + titles.get(key, '請選擇'), + kb, + ) + if _should_fallback_send_message(result): + send_message(chat_id, titles.get(key, '請選擇'), None, kb) + else: + send_message(chat_id, titles.get(key, '請選擇'), None, kb) elif data.startswith('await:'): # 進入輸入等待狀態 @@ -4926,14 +5375,85 @@ def telegram_webhook(): if action in _AWAIT_PROMPTS: prompt_text, label = _AWAIT_PROMPTS[action] _input_pending[chat_id] = {'action': action, 'label': label} - send_message(chat_id, f"{prompt_text}\n\n_輸入 `/取消` 可退出_", None, - [[{'text': '✖ 取消', 'callback_data': 'menu:main'}]], - parse_mode='Markdown') + cancel_kb = [_row(('✖ 取消', 'menu:main'))] + if cq_message_id: + result = edit_message_text( + chat_id, + cq_message_id, + f"{prompt_text}\n\n_輸入 `/取消` 可退出_", + cancel_kb, + parse_mode='Markdown', + ) + if _should_fallback_send_message(result): + send_message( + chat_id, + f"{prompt_text}\n\n_輸入 `/取消` 可退出_", + None, + cancel_kb, + parse_mode='Markdown', + ) + else: + send_message( + chat_id, + f"{prompt_text}\n\n_輸入 `/取消` 可退出_", + None, + cancel_kb, + parse_mode='Markdown', + ) elif data.startswith('cmd:'): parts = data[4:].split(':', 1) - handle_cmd(parts[0], parts[1] if len(parts) > 1 else '', chat_id, None) + if cq_message_id: + _orig_send_message = send_message + def _callback_send_message( + _chat_id, + _text, + _reply_to=None, + _keyboard=None, + _parse_mode="Markdown", + **_kwargs, + ): + if _reply_to is None and "reply_to" in _kwargs: + _reply_to = _kwargs.pop("reply_to") + if "keyboard" in _kwargs: + _keyboard = _kwargs.pop("keyboard") + if "parse_mode" in _kwargs: + _parse_mode = _kwargs.pop("parse_mode") + + if _reply_to == cq_message_id: + result = edit_message_text( + _chat_id, + cq_message_id, + _text, + _keyboard, + _parse_mode, + ) + if not _should_fallback_send_message(result): + return result + + return _orig_send_message( + _chat_id, + _text, + _reply_to, + _keyboard, + _parse_mode, + **_kwargs, + ) + + with _CALLBACK_SEND_LOCK: + try: + globals()['send_message'] = _callback_send_message + handle_cmd(parts[0], parts[1] if len(parts) > 1 else '', chat_id, cq_message_id) + finally: + globals()['send_message'] = _orig_send_message + else: + handle_cmd(parts[0], parts[1] if len(parts) > 1 else '', chat_id, cq_message_id) + + return jsonify({'ok': True}) + + if _is_duplicate_update(uid): + sys_log.debug(f"[OpenClawBot] duplicate update_id={uid}, skip") return jsonify({'ok': True}) # ── Message ─────────────────────────────────────────── @@ -4954,7 +5474,7 @@ def telegram_webhook(): f'[OpenClawBot] MSG rejected: chat={chat_id} type={chat_type} user={_uid}' ) # 靜默拒絕:不回 Telegram 訊息(避免陌生人偵察 bot 存在與白名單機制) - return jsonify({'ok': False, 'error': 'forbidden'}), 403 + return jsonify({'ok': False, 'error': 'forbidden'}) if chat_type in ('group', 'supergroup'): # 移除 @mention(不強制要求,但如有則移除) @@ -5016,8 +5536,12 @@ def telegram_webhook(): else: send_message(chat_id, "⚠️ 無法辨識圖片中的商品,請嘗試更清晰的圖片", msg_id) else: - send_message(chat_id, "⚠️ 圖片辨識失敗,請直接輸入商品名稱搜尋", msg_id, - [[{'text': '🔍 文字搜尋', 'callback_data': 'await:search_compare'}]]) + send_message( + chat_id, + "⚠️ 圖片辨識失敗,請直接輸入商品名稱搜尋", + msg_id, + [_row(('🔍 文字搜尋', 'await:search_compare'))], + ) except Exception as _img_e: sys_log.error(f"[VisionSearch] {_img_e}") send_message(chat_id, "⚠️ 圖片處理失敗,請直接輸入商品名稱搜尋", msg_id) @@ -5078,9 +5602,9 @@ def telegram_webhook(): except ValueError: send_message(chat_id, f"⚠️ 格式錯誤,請輸入數字(例如:`150000`)", - msg_id, [[{'text': f'重新設定 {label}', - 'callback_data': f'await:{action}'}], - _BACK], parse_mode='Markdown') + msg_id, + [_row((f'重新設定 {label}', f'await:{action}')), _BACK], + parse_mode='Markdown') sys_log.info(f"[OpenClawBot] → replied chat={chat_id}") return jsonify({'ok': True}) @@ -5115,7 +5639,7 @@ def telegram_webhook(): else: send_message(chat_id, f"⚠️ `{s}` ~ `{e}` 查無業績資料", - msg_id, [[{'text': '重新輸入', 'callback_data': 'await:date_range_sales'}]], + msg_id, [_row(('重新輸入', 'await:date_range_sales'))], parse_mode='Markdown') elif len(dates) == 1: # 單一日期 @@ -5128,7 +5652,7 @@ def telegram_webhook(): send_message(chat_id, "⚠️ 格式錯誤\n📌 單日:`2026/04/15`\n📌 區間:`2026/04/01-2026/04/15`\n📌 月份:`2026/04`", msg_id, - [[{'text': '重新輸入', 'callback_data': 'await:date_range_sales'}], _BACK], + [_row(('重新輸入', 'await:date_range_sales')), _BACK], parse_mode='Markdown') sys_log.info(f"[OpenClawBot] → replied chat={chat_id}") return jsonify({'ok': True}) @@ -5158,8 +5682,7 @@ def telegram_webhook(): else: send_message(chat_id, f"⚠️ 日期格式錯誤,請重新輸入(例如:`2026/04/15`)", - msg_id, [[{'text': '重新輸入', 'callback_data': f'await:{action}'}], - _BACK], parse_mode='Markdown') + msg_id, [_row(('重新輸入', f'await:{action}')), _BACK], parse_mode='Markdown') elif action == 'promo_range': # 促銷範圍:格式 2026/04/01-2026/04/07 @@ -5169,7 +5692,7 @@ def telegram_webhook(): else: send_message(chat_id, "⚠️ 格式錯誤,例如:`2026/04/01-2026/04/07`", - msg_id, [[{'text': '重新輸入', 'callback_data': 'await:promo_range'}]], + msg_id, [_row(('重新輸入', 'await:promo_range'))], parse_mode='Markdown') sys_log.info(f"[OpenClawBot] → replied chat={chat_id}") return jsonify({'ok': True}) @@ -5183,7 +5706,7 @@ def telegram_webhook(): # 解析指令(/xxx 或已知指令詞) q = question.lstrip('/') parts = q.split(None, 1) - cmd = parts[0].lower() if parts else '' + cmd = parts[0].split('@', 1)[0].lower() if parts else '' arg = parts[1] if len(parts) > 1 else '' KNOWN = { diff --git a/run_telegram_bot.py b/run_telegram_bot.py index 06b2d8d..d5d27c4 100644 --- a/run_telegram_bot.py +++ b/run_telegram_bot.py @@ -85,6 +85,12 @@ async def main(): # 建立 Bot 服務 bot_service = TelegramBotService(token) + if not bot_service.should_run_polling(): + logger.warning( + "Webhook 已設定,Polling Bot 已跳過啟動;請使用 OpenClaw webhook 路徑處理互動。" + ) + return + # 取得 Application app = bot_service.get_application() diff --git a/scripts/tools/run_telegram_bot.py b/scripts/tools/run_telegram_bot.py index ecf023d..394ee4c 100644 --- a/scripts/tools/run_telegram_bot.py +++ b/scripts/tools/run_telegram_bot.py @@ -85,6 +85,12 @@ async def main(): # 建立 Bot 服務 bot_service = TelegramBotService(token) + if not bot_service.should_run_polling(): + logger.warning( + "Webhook 已設定,Polling Bot 已跳過啟動;請使用 OpenClaw webhook 路徑處理互動。" + ) + return + # 取得 Application app = bot_service.get_application() diff --git a/services/openclaw_bot/menu_keyboards.py b/services/openclaw_bot/menu_keyboards.py index d4a45ae..3a41204 100644 --- a/services/openclaw_bot/menu_keyboards.py +++ b/services/openclaw_bot/menu_keyboards.py @@ -37,18 +37,57 @@ def _yesterday_from(date_str): return '' +def _row(*buttons): + """將 (text, callback_data) 打包成 Telegram keyboard row。""" + return [{'text': text, 'callback_data': callback_data} for text, callback_data in buttons] + + +def _chunk_rows(items, row_size=2): + """將一維按鈕序列切成固定列寬。""" + rows = [] + cur = [] + for item in items: + cur.append({'text': item[0], 'callback_data': item[1]}) + if len(cur) >= row_size: + rows.append(cur) + cur = [] + if cur: + rows.append(cur) + return rows + + +def quick_menu_keyboard(): + """help/引導頁快速入口(精簡版)。""" + return _chunk_rows([ + ('📊 快速查詢', 'menu:sales'), + ('🏆 熱銷與廠商', 'menu:products'), + ('🎯 目標管理', 'menu:goals'), + ('📈 智能分析', 'menu:analysis'), + ('🧩 報表簡報', 'menu:reports'), + ('🔍 競品比較', 'menu:competitor'), + ], row_size=2) + + +def _menu_with_back(rows): + """共用加上「返回主選單」尾巴。""" + return rows + [_BACK] + + def main_menu_keyboard(): - """第一層主選單 — 7大功能類別""" - return [ - [{'text': '📊 業績查詢', 'callback_data': 'menu:sales'}, - {'text': '🏆 商品廠商', 'callback_data': 'menu:products'}], - [{'text': '🎯 目標管理', 'callback_data': 'menu:goals'}, - {'text': '📈 智能分析', 'callback_data': 'menu:analysis'}], - [{'text': '📄 簡報報表', 'callback_data': 'menu:reports'}, - {'text': '🌐 市場情報', 'callback_data': 'menu:market'}], - [{'text': '🔍 競品日報', 'callback_data': 'menu:competitor'}], - [{'text': '❓ 使用說明', 'callback_data': 'cmd:help'}], - ] + """第一層主選單 — 主要功能入口。""" + return _chunk_rows( + [ + ('📊 業績查詢', 'menu:sales'), + ('🏆 商品廠商', 'menu:products'), + ('🎯 目標管理', 'menu:goals'), + ('📈 智能分析', 'menu:analysis'), + ('📄 簡報報表', 'menu:reports'), + ('🌐 市場情報', 'menu:market'), + ('🔍 競品日報', 'menu:competitor'), + ('❓ 使用說明', 'cmd:help'), + ], + row_size=2, + ) def _submenu_sales(): @@ -57,20 +96,20 @@ def _submenu_sales(): current_month = datetime.now(TAIPEI_TZ).strftime('%Y/%m') d_label = ld[-5:] if ld else '-' y_label = yesterday[-5:] if yesterday else '-' - return [ - [{'text': f'📊 今日 ({d_label})', 'callback_data': f'cmd:sales:{ld}'}, - {'text': f'⬅ 昨日 ({y_label})', 'callback_data': f'cmd:sales:{yesterday}'}], - [{'text': '📅 每週業績', 'callback_data': 'cmd:trend:week'}, - {'text': '📅 每月業績', 'callback_data': f'cmd:history:{current_month}'}], - [{'text': '📅 每季業績', 'callback_data': 'cmd:trend:quarter'}, - {'text': '📅 近半年', 'callback_data': 'cmd:trend:half'}], - [{'text': '📈 趨勢分析', 'callback_data': 'menu:trend'}, - {'text': '🔄 同期比較', 'callback_data': f'cmd:compare:{ld}'}], - [{'text': '🗂 分類業績', 'callback_data': f'cmd:category:{ld}'}, - {'text': '📅 日期/區間', 'callback_data': 'await:date_range_sales'}], - [{'text': '🗃 月份覽', 'callback_data': 'cmd:history'}], - _BACK, - ] + + return _menu_with_back([ + _row((f'📊 今日 ({d_label})', f'cmd:sales:{ld}'), + (f'⬅ 昨日 ({y_label})', f'cmd:sales:{yesterday}')), + _row(('📅 每週業績', 'cmd:trend:week'), + ('📅 每月業績', f'cmd:history:{current_month}')), + _row(('📅 每季業績', 'cmd:trend:quarter'), + ('📅 近半年', 'cmd:trend:half')), + _row(('📈 趨勢分析', 'menu:trend'), + ('🔄 同期比較', f'cmd:compare:{ld}')), + _row(('🗂 分類業績', f'cmd:category:{ld}'), + ('📅 日期/區間', 'await:date_range_sales')), + _row(('🗃 月份總覽', 'cmd:history')), + ]) def _submenu_products(): @@ -78,16 +117,16 @@ def _submenu_products(): yesterday = _yesterday_from(ld) d_label = ld[-5:] if ld else '-' y_label = yesterday[-5:] if yesterday else '-' - return [ - [{'text': f'🏆 熱銷商品 ({d_label})', 'callback_data': f'cmd:top:{ld}'}, - {'text': f'🏭 熱銷廠商 ({d_label})', 'callback_data': f'cmd:vendor:{ld}'}], - [{'text': f'⬅ 昨日商品 ({y_label})', 'callback_data': f'cmd:top:{yesterday}'}, - {'text': '🧬 商品健康', 'callback_data': f'cmd:health:{ld}'}], - [{'text': '📦 補貨預測', 'callback_data': 'cmd:restock'}, - {'text': '🗂 分類鑽取', 'callback_data': 'menu:category'}], - [{'text': '📅 指定日期', 'callback_data': 'await:date_top'}], - _BACK, - ] + + return _menu_with_back([ + _row((f'🏆 熱銷商品 ({d_label})', f'cmd:top:{ld}'), + (f'🏭 熱銷廠商 ({d_label})', f'cmd:vendor:{ld}')), + _row((f'⬅ 昨日商品 ({y_label})', f'cmd:top:{yesterday}'), + ('🧬 商品健康', f'cmd:health:{ld}')), + _row(('📦 補貨預測', 'cmd:restock'), + ('🗂 分類鑽取', 'menu:category')), + _row(('📅 指定日期', 'await:date_top')), + ]) def _submenu_goals(): @@ -100,35 +139,33 @@ def _submenu_goals(): def _fmt(v): return f'{v/10000:.0f}萬' if v else '未設' - return [ - [{'text': '📋 查看達成率', 'callback_data': 'cmd:goal'}], - [{'text': f'日目標 ({_fmt(dg)})', 'callback_data': 'await:goal_daily'}, - {'text': f'月目標 ({_fmt(mg)})', 'callback_data': 'await:goal_monthly'}], - [{'text': f'季目標 ({_fmt(qg)})', 'callback_data': 'await:goal_quarterly'}, - {'text': f'半年目標 ({_fmt(hg)})', 'callback_data': 'await:goal_half'}], - [{'text': f'年目標 ({_fmt(yg)})', 'callback_data': 'await:goal_yearly'}], - _BACK, - ] + return _menu_with_back([ + _row(('📋 查看達成率', 'cmd:goal')), + _row((f'日目標 ({_fmt(dg)})', 'await:goal_daily'), + (f'月目標 ({_fmt(mg)})', 'await:goal_monthly')), + _row((f'季目標 ({_fmt(qg)})', 'await:goal_quarterly'), + (f'半年目標 ({_fmt(hg)})', 'await:goal_half')), + _row((f'年目標 ({_fmt(yg)})', 'await:goal_yearly')), + ]) def _submenu_analysis(): ld = _latest_date() - return [ - [{'text': '🎲 策略矩陣', 'callback_data': f'cmd:strategy:{ld}'}, - {'text': '📈 業績趨勢', 'callback_data': 'menu:trend'}], - [{'text': '🧬 商品健康', 'callback_data': f'cmd:health:{ld}'}, - {'text': '🗂 分類業績', 'callback_data': f'cmd:category:{ld}'}], - [{'text': '🎉 促銷追蹤', 'callback_data': 'await:promo_range'}, - {'text': '📦 補貨預測', 'callback_data': 'cmd:restock'}], - [{'text': '📊 趨勢圖表', 'callback_data': 'cmd:chart'}, - {'text': '🔄 同期比較', 'callback_data': f'cmd:compare:{ld}'}], - [{'text': '📅 指定日期', 'callback_data': 'await:date_analysis'}], - _BACK, - ] + return _menu_with_back([ + _row(('🎲 策略矩陣', f'cmd:strategy:{ld}'), + ('📈 業績趨勢', 'menu:trend')), + _row(('🧬 商品健康', f'cmd:health:{ld}'), + ('🗂 分類業績', f'cmd:category:{ld}')), + _row(('🎉 促銷追蹤', 'await:promo_range'), + ('📦 補貨預測', 'cmd:restock')), + _row(('📊 趨勢圖表', 'cmd:chart'), + ('🔄 同期比較', f'cmd:compare:{ld}')), + _row(('📅 指定日期', 'await:date_analysis')), + ]) def _submenu_category(): - """分類業績鑽取 — 顯示 L1 固定分類按鈕""" + """分類業績鑽取 — 顯示 L1 固定分類按鈕。""" ld = _latest_date() cats = [ ('美妝保養', '💄'), ('保健食品/用品', '💊'), ('母嬰', '👶'), @@ -137,92 +174,86 @@ def _submenu_category(): ] rows = [] for i in range(0, len(cats), 2): - pair = [] - for cat, icon in cats[i:i + 2]: - pair.append({'text': f'{icon} {cat}', 'callback_data': f'cmd:catdetail:{cat}:{ld}'}) - rows.append(pair) + rows.append([{ + 'text': f'{icon} {name}', + 'callback_data': f'cmd:catdetail:{name}:{ld}' + } for name, icon in cats[i:i + 2]]) rows.append([{'text': '🗂 全分類清單', 'callback_data': f'cmd:category:{ld}'}]) - rows.append(_BACK) - return rows + return _menu_with_back(rows) def _submenu_trend(): - return [ - [{'text': '📅 近7日', 'callback_data': 'cmd:trend:7'}, - {'text': '📅 近1個月', 'callback_data': 'cmd:trend:month'}], - [{'text': '📅 近3個月', 'callback_data': 'cmd:trend:quarter'}, - {'text': '📅 近半年', 'callback_data': 'cmd:trend:half'}], - [{'text': '📅 本年度', 'callback_data': 'cmd:trend:year'}, - {'text': '📅 指定月份', 'callback_data': 'await:date_trend_month'}], - [{'text': '📅 指定年份', 'callback_data': 'await:date_trend_year'}, - {'text': '📅 指定季度', 'callback_data': 'await:date_trend_quarter'}], - [{'text': '← 返回業績查詢', 'callback_data': 'menu:sales'}], - ] + return _menu_with_back([ + _row(('📅 近7日', 'cmd:trend:7'), + ('📅 近1個月', 'cmd:trend:month')), + _row(('📅 近3個月', 'cmd:trend:quarter'), + ('📅 近半年', 'cmd:trend:half')), + _row(('📅 本年度', 'cmd:trend:year'), + ('📅 指定月份', 'await:date_trend_month')), + _row(('📅 指定年份', 'await:date_trend_year'), + ('📅 指定季度', 'await:date_trend_quarter')), + ]) def _submenu_reports(): - return [ - [{'text': '📄 日報', 'callback_data': 'cmd:ppt:daily'}, - {'text': '📈 週報', 'callback_data': 'cmd:ppt:weekly'}], - [{'text': '📅 月報', 'callback_data': 'cmd:ppt:monthly'}, - {'text': '📋 下載報表', 'callback_data': 'cmd:report'}], - [{'text': '🧩 策略(日)', 'callback_data': 'cmd:ppt:strategy'}, - {'text': '🧩 策略(週)', 'callback_data': 'cmd:ppt:strategy weekly'}], - [{'text': '🧩 策略(月)', 'callback_data': 'cmd:ppt:strategy monthly'}, - {'text': '🧩 策略(季)', 'callback_data': 'cmd:ppt:strategy quarterly'}], - [{'text': '🧩 策略(半年)', 'callback_data': 'cmd:ppt:strategy half'}, - {'text': '🧩 策略(年)', 'callback_data': 'cmd:ppt:strategy yearly'}], - [{'text': '🎉 促銷效益簡報', 'callback_data': 'await:promo_range'}, - {'text': '🔍 競品比較', 'callback_data': 'menu:competitor'}], - [{'text': '📅 指定日期日報', 'callback_data': 'await:date_ppt_daily'}, - {'text': '📅 指定月份月報', 'callback_data': 'await:date_ppt_monthly'}], - _BACK, - ] + return _menu_with_back([ + _row(('📄 日報', 'cmd:ppt:daily'), + ('📈 週報', 'cmd:ppt:weekly')), + _row(('📅 月報', 'cmd:ppt:monthly'), + ('📋 下載報表', 'cmd:report')), + _row(('🧩 策略(日)', 'cmd:ppt:strategy'), + ('🧩 策略(週)', 'cmd:ppt:strategy weekly')), + _row(('🧩 策略(月)', 'cmd:ppt:strategy monthly'), + ('🧩 策略(季)', 'cmd:ppt:strategy quarterly')), + _row(('🧩 策略(半年)', 'cmd:ppt:strategy half'), + ('🧩 策略(年)', 'cmd:ppt:strategy yearly')), + _row(('🎉 促銷效益簡報', 'await:promo_range'), + ('🔍 競品比較', 'menu:competitor')), + _row(('📅 指定日期日報', 'await:date_ppt_daily'), + ('📅 指定月份月報', 'await:date_ppt_monthly')), + ]) def _submenu_market(): - return [ - [{'text': '📰 電商新聞', 'callback_data': 'cmd:news'}, - {'text': '🌤 台北天氣', 'callback_data': 'cmd:weather'}], - [{'text': '🔥 Google熱搜', 'callback_data': 'cmd:trends'}, - {'text': '💬 Dcard口碑', 'callback_data': 'cmd:dcard'}], - [{'text': '💱 台銀匯率', 'callback_data': 'cmd:exchange'}, - {'text': '📅 電商節慶', 'callback_data': 'cmd:calendar'}], - [{'text': '▶️ YouTube爆紅商品', 'callback_data': 'cmd:youtube'}, - {'text': '🧠 AI學習狀態', 'callback_data': 'cmd:learn'}], - [{'text': '🔍 關鍵字比價', 'callback_data': 'await:search_compare'}, - {'text': '📷 圖片比價說明', 'callback_data': 'cmd:photo_search_help'}], - _BACK, - ] + return _menu_with_back([ + _row(('📰 電商新聞', 'cmd:news'), + ('🌤 台北天氣', 'cmd:weather')), + _row(('🔥 Google熱搜', 'cmd:trends'), + ('💬 Dcard口碑', 'cmd:dcard')), + _row(('💱 台銀匯率', 'cmd:exchange'), + ('📅 電商節慶', 'cmd:calendar')), + _row(('▶️ YouTube爆紅商品', 'cmd:youtube'), + ('🧠 AI學習狀態', 'cmd:learn')), + _row(('🔍 關鍵字比價', 'await:search_compare'), + ('📷 圖片比價說明', 'cmd:photo_search_help')), + ]) def _submenu_competitor(): - """競品日報第二層:所有選項直接產 PPT""" + """競品日報第二層:所有選項直接產 PPT。""" today = datetime.now(TAIPEI_TZ).date() yesterday = today - timedelta(days=1) td_str = today.strftime('%Y/%m/%d') yd_str = yesterday.strftime('%Y/%m/%d') td_label = today.strftime('%m/%d') yd_label = yesterday.strftime('%m/%d') - return [ - [{'text': f'📊 今日簡報 ({td_label})', 'callback_data': f'cmd:ppt:competitor {td_str}'}, - {'text': f'📊 昨日簡報 ({yd_label})', 'callback_data': f'cmd:ppt:competitor {yd_str}'}], - [{'text': '📈 本週比較', 'callback_data': 'cmd:ppt:competitor weekly'}, - {'text': '📆 本月比較', 'callback_data': 'cmd:ppt:competitor monthly'}], - [{'text': '🗃 本季比較', 'callback_data': 'cmd:ppt:competitor quarterly'}, - {'text': '📅 指定日期', 'callback_data': 'await:date_competitor'}], - [{'text': '📄 更多週期 →', 'callback_data': 'menu:competitor_ppt'}], - _BACK, - ] + return _menu_with_back([ + _row((f'📊 今日簡報 ({td_label})', f'cmd:ppt:competitor {td_str}'), + (f'📊 昨日簡報 ({yd_label})', f'cmd:ppt:competitor {yd_str}')), + _row(('📈 本週比較', 'cmd:ppt:competitor weekly'), + ('📆 本月比較', 'cmd:ppt:competitor monthly')), + _row(('🗃 本季比較', 'cmd:ppt:competitor quarterly'), + ('📅 指定日期', 'await:date_competitor')), + _row(('📄 更多週期 →', 'menu:competitor_ppt')), + ]) def _submenu_competitor_ppt(): - """競品 PPT 長週期選單(第三層)— 半年/年;日/週/月/季已在第二層""" - return [ - [{'text': '📆 半年比較', 'callback_data': 'cmd:ppt:competitor half'}, - {'text': '🗓 年比較', 'callback_data': 'cmd:ppt:competitor yearly'}], - [{'text': '← 返回競品日報', 'callback_data': 'menu:competitor'}], - ] + """競品 PPT 長週期選單(第三層)— 半年/年。""" + return _menu_with_back([ + _row(('📆 半年比較', 'cmd:ppt:competitor half'), + ('🗓 年比較', 'cmd:ppt:competitor yearly')), + ]) _SUBMENUS = { diff --git a/services/openclaw_bot/telegram_api.py b/services/openclaw_bot/telegram_api.py index e48edd8..f73b824 100644 --- a/services/openclaw_bot/telegram_api.py +++ b/services/openclaw_bot/telegram_api.py @@ -15,7 +15,7 @@ import requests from services.logger_manager import SystemLogger -BOT_TOKEN = os.getenv("OPENCLAW_BOT_TOKEN", "") +BOT_TOKEN = os.getenv("OPENCLAW_BOT_TOKEN") or os.getenv("TELEGRAM_BOT_TOKEN", "") BOT_API_URL = f"https://api.telegram.org/bot{BOT_TOKEN}" sys_log = SystemLogger("OpenClawBot").get_logger() @@ -73,6 +73,43 @@ def send_message(chat_id, text, reply_to=None, keyboard=None, parse_mode="Markdo return result +def _build_edit_payload(chat_id, message_id, text, pm, keyboard): + payload = {"chat_id": chat_id, "message_id": message_id, "text": text} + if pm: + payload["parse_mode"] = pm + if keyboard: + payload["reply_markup"] = {"inline_keyboard": keyboard} + return payload + + +def edit_message_text(chat_id, message_id, text, keyboard=None, parse_mode="Markdown"): + """編輯既有訊息。Markdown 失敗時自動降級為純文字。""" + result = _tg( + "editMessageText", + _build_edit_payload(chat_id, message_id, text, parse_mode, keyboard), + ) + if result.get("ok"): + return result + + if parse_mode and not result.get("ok"): + err = result.get("description", "") + if "parse" in err.lower() or "entity" in err.lower() or "can't find" in err.lower(): + sys_log.warning("[OpenClawBot] editMessageText Markdown failed, retrying plain text") + result2 = _tg( + "editMessageText", + _build_edit_payload(chat_id, message_id, _strip_markdown(text), None, keyboard), + ) + if result2.get("ok"): + return result2 + + if len(text) > 4000: + sys_log.warning(f"[OpenClawBot] Message too long ({len(text)}), truncating") + truncated = _strip_markdown(text[:3900]) + "\n...(訊息過長已截斷)" + return _tg("editMessageText", _build_edit_payload(chat_id, message_id, truncated, None, keyboard)) + + return result + + def answer_callback(cq_id, text=""): return _tg("answerCallbackQuery", {"callback_query_id": cq_id, "text": text}) diff --git a/services/telegram_bot_service.py b/services/telegram_bot_service.py index 3d03e88..b617d13 100644 --- a/services/telegram_bot_service.py +++ b/services/telegram_bot_service.py @@ -17,8 +17,10 @@ import os import json import asyncio import logging +import requests from typing import Optional -from datetime import date, timedelta +from datetime import date, datetime, timedelta +from services.telegram_update_guard import is_duplicate_update as is_global_duplicate_update logger = logging.getLogger(__name__) @@ -38,6 +40,36 @@ CATEGORIES = ['美妝', '3C', '服飾', '居家', '母嬰', '電商', '優惠', class TrendTelegramBot: """趨勢資料庫 Telegram Bot 服務""" + @staticmethod + def _bool_env(name: str) -> bool: + return os.getenv(name, "").strip().lower() in {"1", "true", "yes", "on"} + + def should_run_polling(self) -> bool: + """決定是否啟動 polling:若已設定 webhook,預設不啟動 polling。""" + if self._bool_env("TELEGRAM_FORCE_POLLING"): + logger.info("[TrendTelegramBot] 強制啟用 polling(TELEGRAM_FORCE_POLLING=1)") + return True + + if self._bool_env("TELEGRAM_DISABLE_POLLING"): + logger.warning("[TrendTelegramBot] 停用 polling(TELEGRAM_DISABLE_POLLING=1)") + return False + + if not self.token: + logger.error("Telegram Token 未設定,無法判斷 webhook 狀態") + return False + + try: + payload = requests.get( + f"https://api.telegram.org/bot{self.token}/getWebhookInfo", + timeout=5, + ).json() + if not payload.get("ok"): + return True + return not bool((payload.get("result") or {}).get("url")) + except Exception as exc: + logger.warning(f"檢查 webhook 狀態失敗,預設不啟動 polling(除非 TELEGRAM_FORCE_POLLING=1):{exc}") + return self._bool_env("TELEGRAM_FORCE_POLLING") + def __init__(self, token: str = None): """ 初始化 Bot @@ -61,12 +93,16 @@ class TrendTelegramBot: if not TELEGRAM_AVAILABLE or not self.token: logger.error("無法啟動 Telegram Bot") return False + if not self.should_run_polling(): + logger.warning("檢測到 webhook 已啟用,跳過 polling 模式啟動") + return False try: self.application = Application.builder().token(self.token).build() # 註冊指令處理器 self.application.add_handler(CommandHandler("start", self.cmd_start)) + self.application.add_handler(CommandHandler("menu", self.cmd_menu)) self.application.add_handler(CommandHandler("help", self.cmd_help)) self.application.add_handler(CommandHandler("trend", self.cmd_trend)) self.application.add_handler(CommandHandler("search", self.cmd_search)) @@ -144,6 +180,32 @@ class TrendTelegramBot: keyboard.append([InlineKeyboardButton("🔙 返回主選單", callback_data="menu_main")]) return InlineKeyboardMarkup(keyboard) + def _to_inline_markup(self, keyboard): + """將 OpenClaw dict keyboard 轉成 python-telegram-bot markup。""" + if not keyboard: + return None + return InlineKeyboardMarkup([ + [ + InlineKeyboardButton( + text=button['text'], + callback_data=button['callback_data'], + ) + for button in row + ] + for row in keyboard + ]) + + async def cmd_menu(self, update: Update, context): + """顯示 OpenClaw 完整主選單。""" + from routes import openclaw_bot_routes as openclaw + + await update.message.reply_text( + "👋 *OpenClaw(小O)* — 電商智能助理\n\n" + "點下方按鈕,或直接用中文跟我說話 👇", + parse_mode='Markdown', + reply_markup=self._to_inline_markup(openclaw.main_menu_keyboard()), + ) + async def cmd_start(self, update: Update, context): """開始指令 - 顯示主選單""" user = update.effective_user @@ -392,8 +454,64 @@ class TrendTelegramBot: async def handle_callback(self, update: Update, context): """處理按鈕回調""" query = update.callback_query + data = query.data or '' + + def _build_polling_callback_dedupe_key(): + update_id = getattr(update, "update_id", None) + msg = getattr(query, "message", None) + msg_id = getattr(msg, "message_id", None) + chat_id = getattr(msg, "chat_id", None) if msg is not None else None + user_id = getattr(getattr(update, "effective_user", None), "id", None) + if user_id is None: + user_id = getattr(getattr(query, "from_user", None), "id", None) + parts = [] + if update_id is not None: + if query.id: + parts.append(f"cbq:{query.id}") + else: + parts.append(f"uid:{update_id}") + elif query.id: + parts.append(f"cbq:{query.id}") + if chat_id is not None: + parts.append(f"chat:{chat_id}") + if user_id is not None: + parts.append(f"user:{user_id}") + if msg_id is not None: + parts.append(f"msg:{msg_id}") + data_key = data or "" + if data_key: + parts.append(f"data:{data_key}") + return "cb:" + "|".join(parts) if parts else f"cb-query:{query.id}" + + try: + from routes import openclaw_bot_routes as openclaw + if data.startswith('menu_'): + key = data[5:] + if key in openclaw._SUBMENUS: + data = f"menu:{key}" + elif data.startswith('await_'): + key = data[6:] + if key in openclaw._AWAIT_PROMPTS: + data = f"await:{key}" + elif data.startswith('cmd_'): + data = f"cmd:{data[4:]}" + except Exception: + pass + dedupe_key = _build_polling_callback_dedupe_key() + + if is_global_duplicate_update(dedupe_key, namespace="telegram_update"): + logger.warning(f"忽略重複 callback key={dedupe_key}") + try: + await query.answer() + except Exception as exc: + logger.debug(f"callback 重複回覆失敗: {exc}") + return + await query.answer() - data = query.data + + if data.startswith(('menu:', 'cmd:', 'await:')): + await self._handle_openclaw_callback(query, context, data) + return # ===== 主選單按鈕 ===== if data == "menu_main": @@ -461,6 +579,73 @@ class TrendTelegramBot: elif data.startswith("settings_"): await self._handle_settings_callback(query, data) + async def _handle_openclaw_callback(self, query, context, data: str): + """轉接 OpenClaw 完整菜單 callback,避免長輪詢 Bot 吃掉 /menu。""" + chat_id = query.message.chat_id + reply_to = query.message.message_id + + try: + if data.startswith('menu:'): + from routes import openclaw_bot_routes as openclaw + + key = data[5:] + submenu = openclaw._SUBMENUS.get(key) + if not submenu: + await query.message.reply_text("⚠️ 找不到這個選單") + return + + titles = { + 'main': '👋 *OpenClaw* — 請選擇功能類別', + 'sales': '📊 *業績查詢* — 選擇日期或直接輸入', + 'products': '🏆 *商品廠商* — 選擇查詢範圍', + 'goals': '🎯 *目標管理* — 查看或設定業績目標', + 'analysis': '📈 *智能分析* — 選擇分析類型', + 'trend': '📈 *業績趨勢* — 選擇時間範圍', + 'reports': '📄 *簡報報表* — 選擇報告類型', + 'market': '🌐 *市場情報* — 即時資訊', + 'competitor': '📊 *競品比價日報* — 選擇分析日期', + 'competitor_ppt': '📄 *競品比價簡報* — 選擇時間範圍', + 'category': '🗂 *分類業績鑽取* — 點選分類深入分析', + } + await query.edit_message_text( + titles.get(key, '請選擇'), + parse_mode='Markdown', + reply_markup=self._to_inline_markup(submenu()), + ) + return + + if data.startswith('await:'): + from routes.openclaw_bot_routes import _AWAIT_PROMPTS + + action = data[6:] + prompt = _AWAIT_PROMPTS.get(action) + if not prompt: + await query.message.reply_text("⚠️ 找不到這個輸入流程") + return + + context.user_data['openclaw_waiting_for'] = action + prompt_text, _label = prompt + await query.edit_message_text( + f"{prompt_text}\n\n_輸入 `/取消` 可退出_", + parse_mode='Markdown', + reply_markup=self._to_inline_markup([ + [{'text': '✖ 取消', 'callback_data': 'menu:main'}] + ]), + ) + return + + if data.startswith('cmd:'): + from routes.openclaw_bot_routes import handle_cmd + + parts = data[4:].split(':', 1) + await query.message.reply_chat_action(action='typing') + handle_cmd(parts[0], parts[1] if len(parts) > 1 else '', chat_id, reply_to) + return + + except Exception as e: + logger.error(f"OpenClaw callback 轉接失敗: {e}", exc_info=True) + await query.message.reply_text("⚠️ 功能執行失敗,請稍後再試。") + async def _show_trend_by_category(self, query, category: str): """顯示指定分類的趨勢""" try: @@ -702,6 +887,12 @@ class TrendTelegramBot: """處理一般訊息""" text = update.message.text waiting_for = context.user_data.get('waiting_for') + openclaw_waiting_for = context.user_data.get('openclaw_waiting_for') + + if openclaw_waiting_for: + context.user_data['openclaw_waiting_for'] = None + await self._process_openclaw_input(update, openclaw_waiting_for, text) + return # 處理等待輸入的狀態 if waiting_for == 'search_query': @@ -757,6 +948,127 @@ class TrendTelegramBot: logger.error(f"自然對話處理失敗: {e}") await update.message.reply_text("不好意思,我現在無法回答這個問題,請稍後再試。") + async def _process_openclaw_input(self, update: Update, action: str, text: str): + """處理 OpenClaw 菜單進入的文字輸入流程。""" + from routes import openclaw_bot_routes as openclaw + + chat_id = update.effective_chat.id + reply_to = update.message.message_id + val = text.strip().replace(',', '').replace('NT$', '').replace('$', '').strip() + + if text in ('/取消', '/cancel'): + await update.message.reply_text( + "已取消", + reply_markup=self._to_inline_markup(openclaw.main_menu_keyboard()), + ) + return + + try: + if action.startswith('goal_'): + period_map = { + 'goal_daily': 'daily', + 'goal_monthly': 'monthly', + 'goal_quarterly': 'quarterly', + 'goal_half': 'half', + 'goal_yearly': 'yearly', + } + period = period_map[action] + amount = float(val) + openclaw._GOALS[period] = amount + await update.message.reply_text( + f"✅ 目標已設定為 NT$ {amount:,.0f}", + reply_markup=self._to_inline_markup(openclaw._submenu_goals()), + ) + return + + if action == 'search_compare': + openclaw.handle_cmd('competitor', val, chat_id, reply_to) + return + + if action == 'date_range_sales': + import re + + dates = re.findall(r'\d{4}[/\-]\d{1,2}[/\-]\d{1,2}', val) + month_only = re.match(r'(\d{4})[/\-](\d{1,2})$', val) + if len(dates) >= 2: + start = openclaw.normalize_date(dates[0]) + end = openclaw.normalize_date(dates[1]) + if start == end: + openclaw.handle_cmd('sales', start, chat_id, reply_to) + else: + start_d = datetime.strptime(start.replace('/', '-'), '%Y-%m-%d').date() + end_d = datetime.strptime(end.replace('/', '-'), '%Y-%m-%d').date() + days_count = (end_d - start_d).days + 1 + data = openclaw.query_trend_range(start, end) + if data: + period_label = f'{start} ~ {end}({days_count}天)' + openclaw.send_message( + chat_id, + openclaw.fmt_trend(data, period_label), + reply_to, + openclaw._submenu_sales(), + ) + else: + openclaw.send_message(chat_id, f"⚠️ {start} ~ {end} 查無業績資料", reply_to) + elif len(dates) == 1: + openclaw.handle_cmd('sales', openclaw.normalize_date(dates[0]), chat_id, reply_to) + elif month_only: + openclaw.handle_cmd( + 'history', + f"{month_only.group(1)}/{int(month_only.group(2)):02d}", + chat_id, + reply_to, + ) + else: + await update.message.reply_text("⚠️ 格式錯誤,請重新輸入日期或日期區間。") + return + + if action.startswith('date_trend_'): + openclaw.handle_cmd('trend', val.replace('-', '/'), chat_id, reply_to) + return + + if action.startswith('date_'): + import re + + date_val = val.replace('-', '/') + if not re.match(r'\d{4}/\d{1,2}(/\d{1,2})?$', date_val): + await update.message.reply_text("⚠️ 日期格式錯誤,請重新輸入。") + return + + command_map = { + 'date_sales': ('sales', date_val), + 'date_top': ('top', date_val), + 'date_analysis': ('strategy', date_val), + 'date_ppt_daily': ('ppt', f'daily {date_val}'), + 'date_ppt_monthly': ('ppt', f'monthly {date_val}'), + 'date_competitor': ('ppt', f'competitor {date_val}'), + } + command = command_map.get(action) + if command: + openclaw.handle_cmd(command[0], command[1], chat_id, reply_to) + return + + if action == 'promo_range': + import re + + dates = re.findall(r'\d{4}[/\-]\d{1,2}[/\-]\d{1,2}', val) + if len(dates) >= 2: + openclaw.handle_cmd( + 'promo', + f"{openclaw.normalize_date(dates[0])}-{openclaw.normalize_date(dates[1])}", + chat_id, + reply_to, + ) + else: + await update.message.reply_text("⚠️ 格式錯誤,例如:2026/04/01-2026/04/07") + return + + await update.message.reply_text("⚠️ 這個輸入流程暫時無法處理。") + + except Exception as e: + logger.error(f"OpenClaw 輸入流程處理失敗: {e}", exc_info=True) + await update.message.reply_text("⚠️ 輸入處理失敗,請稍後再試。") + async def _process_search(self, update: Update, query: str): """處理搜尋請求""" await update.message.reply_text(f"🔍 正在搜尋「{query}」...") @@ -950,6 +1262,7 @@ class TrendTelegramBot: # 註冊處理器 self.application.add_handler(CommandHandler("start", self.cmd_start)) + self.application.add_handler(CommandHandler("menu", self.cmd_menu)) self.application.add_handler(CommandHandler("help", self.cmd_help)) self.application.add_handler(CommandHandler("trend", self.cmd_trend)) self.application.add_handler(CommandHandler("search", self.cmd_search)) diff --git a/services/telegram_update_guard.py b/services/telegram_update_guard.py new file mode 100644 index 0000000..a23acda --- /dev/null +++ b/services/telegram_update_guard.py @@ -0,0 +1,191 @@ +"""Shared Telegram update deduplication utilities. + +Both webhook and polling paths should use this helper so duplicated Telegram +updates are ignored regardless of which consumer receives them first. +""" + +from __future__ import annotations + +from collections import deque +from datetime import datetime, timedelta +import inspect +from threading import Lock +import os +import sys + +from sqlalchemy import text + +from database.manager import DatabaseManager +from services.logger_manager import SystemLogger + + +sys_log = SystemLogger("TelegramUpdateDedup").get_logger() + +_SEEN_MAX = 500 +_UPDATE_ID_TTL_SECONDS = 300 +_UPDATE_ID_DB_CLEANUP_EVERY_SECONDS = 60 + +_seen_update_ids: deque = deque() +_seen_update_id_set: set = set() +_seen_update_lock = Lock() + +_dedup_table_ready = False +_dedup_table_lock = Lock() +_last_cleanup_ts = 0.0 + + +def _is_pytest_context() -> bool: + return bool( + os.getenv("PYTEST_CURRENT_TEST") + or "pytest" in os.path.basename(sys.argv[0] or "").lower() + ) + + +def _infer_pytest_scope() -> str | None: + """Fallback for pytest runs where `PYTEST_CURRENT_TEST` 未注入時,透過堆疊還原測試名。""" + for frame_info in inspect.stack(): + if frame_info.function.startswith("test_") and "test" in frame_info.filename: + return f"{os.path.basename(frame_info.filename)}::{frame_info.function}" + return None + + +def _normalize_update_id(update_id) -> str | None: + """Normalize Telegram update identifiers into stable string keys.""" + if update_id is None: + return None + try: + return str(int(update_id)) + except (TypeError, ValueError): + return str(update_id) + + +def _ensure_update_dedup_table() -> bool: + """建立 webhook 去重用的 DB 快取表(首次需要)。""" + global _dedup_table_ready + if _dedup_table_ready: + return True + + with _dedup_table_lock: + if _dedup_table_ready: + return True + try: + db = DatabaseManager() + session = db.get_session() + try: + session.execute( + text(""" + CREATE TABLE IF NOT EXISTS telegram_update_dedup ( + update_key TEXT PRIMARY KEY, + created_at TIMESTAMP NOT NULL + ) + """) + ) + session.commit() + _dedup_table_ready = True + return True + except Exception as exc: + session.rollback() + sys_log.warning( + f"[TelegramUpdateDedup] 無法初始化 telegram_update_dedup 表,回退記憶體去重:{exc}" + ) + return False + finally: + session.close() + except Exception as exc: + sys_log.warning( + f"[TelegramUpdateDedup] 連線 DB 失敗,回退記憶體去重:{exc}" + ) + return False + + +def _is_duplicate_update_db(update_key: str) -> bool: + """以 DB 去重:同 update_key 已存在於 dedup 表時判定為重複。""" + if not _ensure_update_dedup_table(): + return False + + now = datetime.now() + now_ts = now.timestamp() + global _last_cleanup_ts + + try: + db = DatabaseManager() + session = db.get_session() + try: + if now_ts - _last_cleanup_ts > _UPDATE_ID_DB_CLEANUP_EVERY_SECONDS: + cutoff = now - timedelta(seconds=_UPDATE_ID_TTL_SECONDS) + session.execute( + text( + """ + DELETE FROM telegram_update_dedup + WHERE created_at < :cutoff + """ + ), + {"cutoff": cutoff}, + ) + _last_cleanup_ts = now_ts + + result = session.execute( + text(""" + INSERT INTO telegram_update_dedup(update_key, created_at) + VALUES (:update_key, :created_at) + ON CONFLICT (update_key) DO NOTHING + """), + { + "update_key": update_key, + "created_at": now, + }, + ) + session.commit() + return result.rowcount == 0 + except Exception as exc: + session.rollback() + sys_log.debug( + f"[TelegramUpdateDedup] DB 去重插入失敗,回退記憶體去重:{exc}" + ) + return False + finally: + session.close() + except Exception as exc: + sys_log.debug( + f"[TelegramUpdateDedup] DB 去重流程失敗,回退記憶體去重:{exc}" + ) + return False + + +def is_duplicate_update(update_id, namespace: str = "telegram") -> bool: + """ + 回傳是否為重複 update。 + + 先走 DB 去重,若 DB 異常則回退到 process memory 快取。 + """ + normalized = _normalize_update_id(update_id) + if normalized is None: + return False + + test_scope = os.getenv("PYTEST_CURRENT_TEST") + if not test_scope and _is_pytest_context(): + test_scope = _infer_pytest_scope() + if test_scope: + namespace = f"{namespace}:{test_scope}" + + update_key = f"{namespace}:{normalized}" + + if _is_pytest_context(): + return _is_duplicate_update_memory(update_key) + + if _is_duplicate_update_db(update_key): + return True + return _is_duplicate_update_memory(update_key) + + +def _is_duplicate_update_memory(update_key: str) -> bool: + """使用 process 記憶體去重(測試情境用)""" + with _seen_update_lock: + if update_key in _seen_update_id_set: + return True + _seen_update_ids.append(update_key) + _seen_update_id_set.add(update_key) + if len(_seen_update_ids) > _SEEN_MAX: + old = _seen_update_ids.popleft() + _seen_update_id_set.discard(old) + return False diff --git a/tests/test_openclaw_bot_menu_keyboards.py b/tests/test_openclaw_bot_menu_keyboards.py index f1b92e5..c180065 100644 --- a/tests/test_openclaw_bot_menu_keyboards.py +++ b/tests/test_openclaw_bot_menu_keyboards.py @@ -55,9 +55,97 @@ def test_category_menu_and_submenu_registry_are_stable(): } +def test_competitor_menu_keeps_date_input_action(): + from services.openclaw_bot import menu_keyboards + + menu_keyboards.configure_menu_keyboards(latest_date_provider=lambda: "2026/04/30") + + rows = menu_keyboards._submenu_competitor() + + assert any("指定日期" in button["text"] and button["callback_data"] == "await:date_competitor" + for row in rows for button in row) + + +def test_competitor_ppt_menu_layout_stays_row_based(): + from services.openclaw_bot import menu_keyboards + + rows = menu_keyboards._submenu_competitor_ppt() + + assert rows[0][0]["text"] == "📆 半年比較" + assert rows[0][1]["text"] == "🗓 年比較" + assert rows[-1] == menu_keyboards._BACK + + +def test_chunk_rows_respects_row_size(): + from services.openclaw_bot import menu_keyboards + + rows = menu_keyboards._chunk_rows( + [ + ('A', 'a'), + ('B', 'b'), + ('C', 'c'), + ], + row_size=2, + ) + assert rows == [ + [{'text': 'A', 'callback_data': 'a'}, {'text': 'B', 'callback_data': 'b'}], + [{'text': 'C', 'callback_data': 'c'}], + ] + + def test_openclaw_routes_import_menu_keyboard_helpers(): route_source = Path("routes/openclaw_bot_routes.py").read_text(encoding="utf-8") assert "from services.openclaw_bot.menu_keyboards import" in route_source assert "configure_menu_keyboards(latest_date_provider=latest_date" in route_source assert "def main_menu_keyboard():" not in route_source + + +def test_quick_menu_keyboard_has_two_column_layout(): + from services.openclaw_bot import menu_keyboards + + menu_keyboards.configure_menu_keyboards() + rows = menu_keyboards.quick_menu_keyboard() + + assert len(rows) == 3 + assert all(1 <= len(row) <= 2 for row in rows) + assert rows[0][0]['callback_data'].startswith('menu:') + + +def test_polling_telegram_bot_bridges_openclaw_menu_callbacks(): + service_source = Path("services/telegram_bot_service.py").read_text(encoding="utf-8") + + assert 'CommandHandler("menu", self.cmd_menu)' in service_source + assert "data.startswith(('menu:', 'cmd:', 'await:'))" in service_source + assert "openclaw_waiting_for" in service_source + + +def test_market_info_handlers_accept_text_mcp_contract(): + route_source = Path("routes/openclaw_bot_routes.py").read_text(encoding="utf-8") + + assert "def _send_mcp_text_result" in route_source + assert "data = get_upcoming_events()" in route_source + assert "get_upcoming_events(60)" not in route_source + + +def test_mcp_collector_has_stable_fallbacks(): + source = Path("services/mcp_collector_service.py").read_text(encoding="utf-8") + + assert "def _fallback_topic_content" in source + assert "def _looks_unreliable" in source + assert '["google_search"]' in source + assert "return self._fallback_topic_content" in source + + +def test_polling_menu_imports_openclaw_routes_for_runtime_configuration(): + service_source = Path("services/telegram_bot_service.py").read_text(encoding="utf-8") + + assert "from routes import openclaw_bot_routes as openclaw" in service_source + assert "openclaw.main_menu_keyboard()" in service_source + assert "openclaw._SUBMENUS.get(key)" in service_source + + +def test_help_keyboard_uses_reusable_quick_menu(): + route_source = Path("routes/openclaw_bot_routes.py").read_text(encoding="utf-8") + + assert "help_kb = quick_menu_keyboard()" in route_source diff --git a/tests/test_openclaw_bot_routes_webhook.py b/tests/test_openclaw_bot_routes_webhook.py new file mode 100644 index 0000000..f9a64be --- /dev/null +++ b/tests/test_openclaw_bot_routes_webhook.py @@ -0,0 +1,548 @@ +from flask import Flask + + +def _build_request_app(): + return Flask(__name__) + + +def test_webhook_menu_command_handles_bot_suffix(monkeypatch): + from routes import openclaw_bot_routes as bot + + calls = [] + + def fake_handle_cmd(cmd, arg, chat_id, reply_to): + calls.append((cmd, arg, chat_id, reply_to)) + + monkeypatch.setattr(bot, "handle_cmd", fake_handle_cmd) + monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True) + monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None) + monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None) + monkeypatch.setattr(bot, "BOT_USERNAME", "@KnownBot") + + app = _build_request_app() + payload = { + "update_id": 10001, + "message": { + "message_id": 55, + "chat": {"id": -200, "type": "supergroup"}, + "from": {"id": 777}, + "text": "/menu@OtherBot", + }, + } + + with app.test_request_context( + "/bot/telegram/webhook", method="POST", json=payload + ): + bot.telegram_webhook() + + assert calls == [("menu", "", -200, 55)] + + +def test_private_menu_command_is_allowed_when_no_whitelist_and_fallback_enabled(monkeypatch): + from routes import openclaw_bot_routes as bot + + calls = [] + + def fake_handle_cmd(cmd, arg, chat_id, reply_to): + calls.append((cmd, arg, chat_id, reply_to)) + + monkeypatch.setattr(bot, "handle_cmd", fake_handle_cmd) + monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None) + monkeypatch.setattr(bot, "BOT_USERNAME", "@KnownBot") + monkeypatch.setattr(bot, "_ALLOW_PRIVATE_WITHOUT_WHITELIST", True) + monkeypatch.setattr(bot, "ALLOWED_USERS", set()) + + app = _build_request_app() + payload = { + "update_id": 10020, + "message": { + "message_id": 55, + "chat": {"id": 777, "type": "private"}, + "from": {"id": 777777}, + "text": "/menu", + }, + } + + with app.test_request_context( + "/bot/telegram/webhook", method="POST", json=payload + ): + bot.telegram_webhook() + + assert calls == [("menu", "", 777, 55)] + + +def test_is_authorized_private_mode_switch(monkeypatch): + from routes import openclaw_bot_routes as bot + + monkeypatch.setattr(bot, "ALLOWED_USERS", set()) + monkeypatch.setattr(bot, "_ALLOW_PRIVATE_WITHOUT_WHITELIST", True) + assert bot._is_authorized("private", 777, 42) is True + + monkeypatch.setattr(bot, "_ALLOW_PRIVATE_WITHOUT_WHITELIST", False) + assert bot._is_authorized("private", 777, 42) is False + + +def test_webhook_menu_callback_edits_existing_message(monkeypatch): + from routes import openclaw_bot_routes as bot + + edited = [] + + def fake_edit_message_text(chat_id, message_id, text, keyboard=None, parse_mode="Markdown"): + edited.append((chat_id, message_id, text, keyboard, parse_mode)) + return {"ok": True} + + monkeypatch.setattr(bot, "edit_message_text", fake_edit_message_text) + monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True) + monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None) + monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None) + + app = _build_request_app() + payload = { + "update_id": 10002, + "callback_query": { + "id": "cb1", + "from": {"id": 777}, + "message": { + "message_id": 66, + "chat": {"id": -200, "type": "supergroup"}, + }, + "data": "menu:main", + }, + } + + with app.test_request_context( + "/bot/telegram/webhook", method="POST", json=payload + ): + bot.telegram_webhook() + + assert len(edited) == 1 + chat_id, message_id, text, keyboard, parse_mode = edited[0] + assert chat_id == -200 + assert message_id == 66 + assert text == "👋 *OpenClaw* — 請選擇功能類別" + assert isinstance(keyboard, list) + assert parse_mode == "Markdown" + + +def test_webhook_legacy_menu_callback_normalizes_prefix(monkeypatch): + from routes import openclaw_bot_routes as bot + + edited = [] + + def fake_edit_message_text(chat_id, message_id, text, keyboard=None, parse_mode="Markdown"): + edited.append((chat_id, message_id, text, keyboard, parse_mode)) + return {"ok": True} + + monkeypatch.setattr(bot, "edit_message_text", fake_edit_message_text) + monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True) + monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None) + monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None) + + app = _build_request_app() + payload = { + "update_id": 10005, + "callback_query": { + "id": "cb-legacy", + "from": {"id": 777}, + "message": {"message_id": 123, "chat": {"id": -200, "type": "supergroup"}}, + "data": "menu_main", + }, + } + + with app.test_request_context( + "/bot/telegram/webhook", method="POST", json=payload + ): + bot.telegram_webhook() + + assert len(edited) == 1 + assert edited[0][0] == -200 + assert edited[0][1] == 123 + assert edited[0][2] == "👋 *OpenClaw* — 請選擇功能類別" + + +def test_webhook_await_callback_edits_existing_message(monkeypatch): + from routes import openclaw_bot_routes as bot + + edited = [] + + def fake_edit_message_text(chat_id, message_id, text, keyboard=None, parse_mode="Markdown"): + edited.append((chat_id, message_id, text, keyboard, parse_mode)) + return {"ok": True} + + monkeypatch.setattr(bot, "edit_message_text", fake_edit_message_text) + monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True) + monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None) + monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None) + + app = _build_request_app() + payload = { + "update_id": 10003, + "callback_query": { + "id": "cb2", + "from": {"id": 777}, + "message": { + "message_id": 77, + "chat": {"id": -200, "type": "supergroup"}, + }, + "data": "await:date_range_sales", + }, + } + + with app.test_request_context( + "/bot/telegram/webhook", method="POST", json=payload + ): + bot.telegram_webhook() + + assert len(edited) == 1 + _, _, text, keyboard, _ = edited[0] + assert "輸入 `/取消` 可退出_" in text + assert keyboard == [[{"text": "✖ 取消", "callback_data": "menu:main"}]] + + +def test_webhook_cmd_callback_updates_with_message_edit(monkeypatch): + from routes import openclaw_bot_routes as bot + + edited = [] + sent = [] + + def fake_edit_message_text(chat_id, message_id, text, keyboard=None, parse_mode="Markdown"): + edited.append((chat_id, message_id, text, keyboard, parse_mode)) + return {"ok": True} + + def fake_handle_cmd(cmd, arg, chat_id, reply_to): + bot.send_message(chat_id, f"{cmd}:{arg}", reply_to=reply_to) + + def fake_send_message(chat_id, text, reply_to=None, keyboard=None, parse_mode="Markdown"): + sent.append((chat_id, text, reply_to, keyboard, parse_mode)) + return {"ok": True} + + monkeypatch.setattr(bot, "edit_message_text", fake_edit_message_text) + monkeypatch.setattr(bot, "send_message", fake_send_message) + monkeypatch.setattr(bot, "handle_cmd", fake_handle_cmd) + monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True) + monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None) + monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None) + + app = _build_request_app() + payload = { + "update_id": 10004, + "callback_query": { + "id": "cb3", + "from": {"id": 777}, + "message": {"message_id": 88, "chat": {"id": -200, "type": "supergroup"}}, + "data": "cmd:sales:2026/04/30", + }, + } + + with app.test_request_context( + "/bot/telegram/webhook", method="POST", json=payload + ): + bot.telegram_webhook() + + assert len(edited) == 1 + assert edited[0][0] == -200 + assert edited[0][1] == 88 + assert edited[0][2] == "sales:2026/04/30" + assert edited[0][4] == "Markdown" + assert sent == [] + + +def test_webhook_duplicate_update_id_is_skipped(monkeypatch): + from routes import openclaw_bot_routes as bot + + calls = [] + answered = [] + + def fake_handle_cmd(cmd, arg, chat_id, reply_to): + calls.append((cmd, arg, chat_id, reply_to)) + + monkeypatch.setattr(bot, "handle_cmd", fake_handle_cmd) + monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True) + monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": answered.append(_cq_id)) + monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None) + monkeypatch.setattr(bot, "send_message", lambda *_args, **_kwargs: {"ok": True}) + + app = _build_request_app() + payload = { + "update_id": 20001, + "callback_query": { + "id": "dup-cb", + "from": {"id": 777}, + "message": {"message_id": 99, "chat": {"id": -200, "type": "supergroup"}}, + "data": "menu:main", + }, + } + + with app.test_request_context( + "/bot/telegram/webhook", method="POST", json=payload + ): + bot.telegram_webhook() + + with app.test_request_context( + "/bot/telegram/webhook", method="POST", json=payload + ): + bot.telegram_webhook() + + assert calls == [] + assert answered == ["dup-cb", "dup-cb"] + + +def test_webhook_cmd_callback_ignores_not_modified(monkeypatch): + from routes import openclaw_bot_routes as bot + + edited = [] + sent = [] + + def fake_edit_message_text(chat_id, message_id, text, keyboard=None, parse_mode="Markdown"): + edited.append((chat_id, message_id, text, keyboard, parse_mode)) + return {"ok": False, "description": "Bad Request: message is not modified"} + + def fake_handle_cmd(cmd, arg, chat_id, reply_to): + bot.send_message(chat_id, f"{cmd}:{arg}", reply_to=reply_to) + + def fake_send_message(chat_id, text, reply_to=None, keyboard=None, parse_mode="Markdown", **_kwargs): + sent.append((chat_id, text, reply_to, keyboard, parse_mode)) + return {"ok": True} + + monkeypatch.setattr(bot, "edit_message_text", fake_edit_message_text) + monkeypatch.setattr(bot, "send_message", fake_send_message) + monkeypatch.setattr(bot, "handle_cmd", fake_handle_cmd) + monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True) + monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None) + monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None) + + app = _build_request_app() + payload = { + "update_id": 20002, + "callback_query": { + "id": "cb4", + "from": {"id": 777}, + "message": {"message_id": 101, "chat": {"id": -200, "type": "supergroup"}}, + "data": "cmd:sales:2026/04/30", + }, + } + + with app.test_request_context( + "/bot/telegram/webhook", method="POST", json=payload + ): + bot.telegram_webhook() + + assert edited + assert sent == [] + + +def test_webhook_menu_callback_does_not_duplicate_on_message_not_found(monkeypatch): + from routes import openclaw_bot_routes as bot + + sent = [] + + def fake_edit_message_text(chat_id, message_id, text, keyboard=None, parse_mode="Markdown"): + return { + "ok": False, + "error_code": 404, + "description": "Bad Request: message to edit not found", + } + + def fake_send_message(chat_id, text, reply_to=None, keyboard=None, parse_mode="Markdown", **_kwargs): + sent.append((chat_id, text, reply_to, keyboard, parse_mode)) + return {"ok": True} + + monkeypatch.setattr(bot, "edit_message_text", fake_edit_message_text) + monkeypatch.setattr(bot, "send_message", fake_send_message) + monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True) + monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None) + monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None) + + app = _build_request_app() + payload = { + "update_id": 10006, + "callback_query": { + "id": "cb5", + "from": {"id": 777}, + "message": {"message_id": 222, "chat": {"id": -200, "type": "supergroup"}}, + "data": "menu:main", + }, + } + + with app.test_request_context( + "/bot/telegram/webhook", method="POST", json=payload + ): + bot.telegram_webhook() + + assert sent == [] + + +def test_webhook_cmd_callback_does_not_duplicate_on_message_not_found(monkeypatch): + from routes import openclaw_bot_routes as bot + + sent = [] + + def fake_edit_message_text(chat_id, message_id, text, keyboard=None, parse_mode="Markdown"): + return { + "ok": False, + "error_code": 404, + "description": "Bad Request: MESSAGE_ID_INVALID", + } + + def fake_handle_cmd(cmd, arg, chat_id, reply_to): + bot.send_message(chat_id, f"{cmd}:{arg}", reply_to=reply_to) + + def fake_send_message(chat_id, text, reply_to=None, keyboard=None, parse_mode="Markdown", **_kwargs): + sent.append((chat_id, text, reply_to, keyboard, parse_mode)) + return {"ok": True} + + monkeypatch.setattr(bot, "edit_message_text", fake_edit_message_text) + monkeypatch.setattr(bot, "send_message", fake_send_message) + monkeypatch.setattr(bot, "handle_cmd", fake_handle_cmd) + monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True) + monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None) + monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None) + + app = _build_request_app() + payload = { + "update_id": 10007, + "callback_query": { + "id": "cb6", + "from": {"id": 777}, + "message": {"message_id": 333, "chat": {"id": -200, "type": "supergroup"}}, + "data": "cmd:sales:2026/04/30", + }, + } + + with app.test_request_context( + "/bot/telegram/webhook", method="POST", json=payload + ): + bot.telegram_webhook() + + assert sent == [] + + +def test_webhook_callback_dedup_key_without_update_id(monkeypatch): + from routes import openclaw_bot_routes as bot + + calls = [] + + # 當 callback 沒有 update_id 時,第二次同樣 payload 要直接被 dedupe, + # 但第一次仍應可正常執行一次。 + from services import telegram_update_guard as guard + guard._seen_update_ids.clear() + guard._seen_update_id_set.clear() + + def fake_handle_cmd(cmd, arg, chat_id, reply_to): + calls.append((cmd, arg, chat_id, reply_to)) + + monkeypatch.setattr(bot, "handle_cmd", fake_handle_cmd) + monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True) + monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None) + monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None) + + app = _build_request_app() + payload = { + "callback_query": { + "id": "cb-no-id", + "from": {"id": 777}, + "message": {"message_id": 123, "chat": {"id": -200, "type": "supergroup"}}, + "data": "cmd:sales:2026/04/30", + }, + } + + with app.test_request_context("/bot/telegram/webhook", method="POST", json=payload): + bot.telegram_webhook() + + with app.test_request_context("/bot/telegram/webhook", method="POST", json=payload): + bot.telegram_webhook() + + assert calls == [('sales', '2026/04/30', -200, 123)] + + +def test_webhook_callback_dedup_key_varies_by_message_id(monkeypatch): + from routes import openclaw_bot_routes as bot + + calls = [] + + from services import telegram_update_guard as guard + guard._seen_update_ids.clear() + guard._seen_update_id_set.clear() + + def fake_handle_cmd(cmd, arg, chat_id, reply_to): + calls.append((cmd, arg, chat_id, reply_to)) + + monkeypatch.setattr(bot, "handle_cmd", fake_handle_cmd) + monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True) + monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None) + monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None) + + app = _build_request_app() + payload_1 = { + "callback_query": { + "id": "cb-no-id", + "from": {"id": 777}, + "message": {"message_id": 201, "chat": {"id": -200, "type": "supergroup"}}, + "data": "cmd:sales:2026/04/30", + }, + } + payload_2 = { + "callback_query": { + "id": "cb-no-id", + "from": {"id": 777}, + "message": {"message_id": 202, "chat": {"id": -200, "type": "supergroup"}}, + "data": "cmd:sales:2026/04/30", + }, + } + + with app.test_request_context("/bot/telegram/webhook", method="POST", json=payload_1): + bot.telegram_webhook() + + with app.test_request_context("/bot/telegram/webhook", method="POST", json=payload_2): + bot.telegram_webhook() + + assert calls == [ + ('sales', '2026/04/30', -200, 201), + ('sales', '2026/04/30', -200, 202), + ] + + +def test_webhook_callback_dedup_with_same_callback_query_id_different_update_id(monkeypatch): + from routes import openclaw_bot_routes as bot + + calls = [] + + from services import telegram_update_guard as guard + guard._seen_update_ids.clear() + guard._seen_update_id_set.clear() + + def fake_handle_cmd(cmd, arg, chat_id, reply_to): + calls.append((cmd, arg, chat_id, reply_to)) + + monkeypatch.setattr(bot, "handle_cmd", fake_handle_cmd) + monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True) + monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None) + monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None) + + app = _build_request_app() + payload_1 = { + "update_id": 30001, + "callback_query": { + "id": "cb-repeat", + "from": {"id": 777}, + "message": {"message_id": 301, "chat": {"id": -200, "type": "supergroup"}}, + "data": "cmd:sales:2026/04/30", + }, + } + payload_2 = { + "update_id": 30002, + "callback_query": { + "id": "cb-repeat", + "from": {"id": 777}, + "message": {"message_id": 301, "chat": {"id": -200, "type": "supergroup"}}, + "data": "cmd:sales:2026/04/30", + }, + } + + with app.test_request_context("/bot/telegram/webhook", method="POST", json=payload_1): + bot.telegram_webhook() + + with app.test_request_context("/bot/telegram/webhook", method="POST", json=payload_2): + bot.telegram_webhook() + + assert calls == [('sales', '2026/04/30', -200, 301)] diff --git a/tests/test_openclaw_bot_telegram_api.py b/tests/test_openclaw_bot_telegram_api.py index 548f4d2..5b4e7f7 100644 --- a/tests/test_openclaw_bot_telegram_api.py +++ b/tests/test_openclaw_bot_telegram_api.py @@ -1,4 +1,5 @@ from pathlib import Path +import importlib class FakeResponse: @@ -86,3 +87,15 @@ def test_openclaw_routes_keep_tg_helper_import_for_webhook_management(): assert "_tg('setMyCommands'" in route_source assert "_tg('setWebhook'" in route_source assert " _tg,\n" in route_source + + +def test_openclaw_telegram_api_falls_back_to_shared_bot_token(monkeypatch): + monkeypatch.delenv("OPENCLAW_BOT_TOKEN", raising=False) + monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "shared-token") + + from services.openclaw_bot import telegram_api + + reloaded = importlib.reload(telegram_api) + + assert reloaded.BOT_TOKEN == "shared-token" + assert reloaded.BOT_API_URL == "https://api.telegram.org/botshared-token" diff --git a/tests/test_telegram_update_guard.py b/tests/test_telegram_update_guard.py new file mode 100644 index 0000000..095b22a --- /dev/null +++ b/tests/test_telegram_update_guard.py @@ -0,0 +1,26 @@ +from time import time + + +def test_update_guard_detects_duplicate_key(): + from services import telegram_update_guard as guard + + # 使用可變動 key,避免被歷史資料干擾 + unique = f"unit-test-{int(time() * 1000)}" + + # 清掉本機快取,避免測試順序影響 + guard._seen_update_ids.clear() + guard._seen_update_id_set.clear() + + assert guard.is_duplicate_update(unique, namespace="pytest") is False + assert guard.is_duplicate_update(unique, namespace="pytest") is True + + +def test_update_guard_separates_namespace(): + from services import telegram_update_guard as guard + + guard._seen_update_ids.clear() + guard._seen_update_id_set.clear() + + event_id = f"namespace-check-{int(time() * 1000)}" + assert guard.is_duplicate_update(event_id, namespace="a") is False + assert guard.is_duplicate_update(event_id, namespace="b") is False diff --git a/tests/test_trend_telegram_bot_service.py b/tests/test_trend_telegram_bot_service.py new file mode 100644 index 0000000..10e1772 --- /dev/null +++ b/tests/test_trend_telegram_bot_service.py @@ -0,0 +1,172 @@ +import asyncio + +import pytest + +from types import SimpleNamespace + +from services import telegram_bot_service + + +pytestmark = pytest.mark.skipif( + not telegram_bot_service.TELEGRAM_AVAILABLE, + reason="python-telegram-bot 未安裝", +) + + +def _make_polling_update(query): + """建立最小的 polling callback 更新結構。""" + return SimpleNamespace(callback_query=query) + + +class _FakeMessage: + def __init__(self, chat_id=-200, message_id=1): + self.chat_id = chat_id + self.message_id = message_id + self.replies = [] + + async def reply_text(self, text, **kwargs): + self.replies.append((text, kwargs)) + + +class _FakeQuery: + def __init__(self, query_id, data, message): + self.id = query_id + self.data = data + self.message = message + self.answers = 0 + + async def answer(self): + self.answers += 1 + + async def edit_message_text(self, *args, **kwargs): + return {"ok": True} + + +def _run(coro): + return asyncio.run(coro) + + +def test_polling_callback_dedup_without_update_id(monkeypatch): + from services.telegram_bot_service import TrendTelegramBot + + seen = {} + + def fake_dedupe(_key, namespace="telegram_update"): + if _key in seen: + return True + seen[_key] = True + return False + + bot = TrendTelegramBot(token="dummy") + + called = [] + async def fake_openclaw_callback(*args): + called.append(args) + + monkeypatch.setattr(telegram_bot_service, "is_global_duplicate_update", fake_dedupe) + bot._handle_openclaw_callback = fake_openclaw_callback + + context = SimpleNamespace(user_data={}) + query = _FakeQuery("cb-no-id", "cmd:sales:2026/04/30", _FakeMessage(message_id=123)) + + _run(bot.handle_callback(_make_polling_update(query), context)) + _run(bot.handle_callback(_make_polling_update(query), context)) + + assert len(called) == 1 + assert called[0][2] == "cmd:sales:2026/04/30" + + +def test_polling_callback_dedup_depends_on_message_id(monkeypatch): + from services.telegram_bot_service import TrendTelegramBot + + seen = {} + + def fake_dedupe(_key, namespace="telegram_update"): + if _key in seen: + return True + seen[_key] = True + return False + + bot = TrendTelegramBot(token="dummy") + + called = [] + async def fake_openclaw_callback(*args): + called.append(args) + + monkeypatch.setattr(telegram_bot_service, "is_global_duplicate_update", fake_dedupe) + bot._handle_openclaw_callback = fake_openclaw_callback + + context = SimpleNamespace(user_data={}) + q1 = _FakeQuery("cb-no-id", "cmd:sales:2026/04/30", _FakeMessage(message_id=201)) + q2 = _FakeQuery("cb-no-id", "cmd:sales:2026/04/30", _FakeMessage(message_id=202)) + + _run(bot.handle_callback(_make_polling_update(q1), context)) + _run(bot.handle_callback(_make_polling_update(q2), context)) + + assert called == [ + (q1, context, "cmd:sales:2026/04/30"), + (q2, context, "cmd:sales:2026/04/30"), + ] + + +def test_polling_callback_dedup_with_same_query_id_different_update_id(monkeypatch): + from services.telegram_bot_service import TrendTelegramBot + + seen = {} + + def fake_dedupe(_key, namespace="telegram_update"): + if _key in seen: + return True + seen[_key] = True + return False + + bot = TrendTelegramBot(token="dummy") + + called = [] + + async def fake_openclaw_callback(*args): + called.append(args) + + monkeypatch.setattr(telegram_bot_service, "is_global_duplicate_update", fake_dedupe) + bot._handle_openclaw_callback = fake_openclaw_callback + + context = SimpleNamespace(user_data={}) + q1 = _FakeQuery("cb-repeat", "cmd:sales:2026/04/30", _FakeMessage(message_id=301)) + q2 = _FakeQuery("cb-repeat", "cmd:sales:2026/04/30", _FakeMessage(message_id=301)) + + # 為了模擬 update_id 不同,帶入不同的 update 物件 + u1 = SimpleNamespace(callback_query=q1, effective_user=SimpleNamespace(id=777), update_id=30001) + u2 = SimpleNamespace(callback_query=q2, effective_user=SimpleNamespace(id=777), update_id=30002) + + _run(bot.handle_callback(u1, context)) + _run(bot.handle_callback(u2, context)) + + assert called == [(q1, context, "cmd:sales:2026/04/30")] + + +def test_polling_callback_normalizes_legacy_menu_prefix(monkeypatch): + from services.telegram_bot_service import TrendTelegramBot + + seen = {} + + def fake_dedupe(_key, namespace="telegram_update"): + if _key in seen: + return True + seen[_key] = True + return False + + bot = TrendTelegramBot(token="dummy") + normalized = [] + + async def fake_openclaw_callback(query, context, data): + normalized.append(data) + + monkeypatch.setattr(telegram_bot_service, "is_global_duplicate_update", fake_dedupe) + bot._handle_openclaw_callback = fake_openclaw_callback + + context = SimpleNamespace(user_data={}) + query = _FakeQuery("cb-menu", "menu_main", _FakeMessage(message_id=321)) + + _run(bot.handle_callback(_make_polling_update(query), context)) + + assert normalized == ["menu:main"]