fix(telegram): dedupe webhook+polling updates via shared DB guard
All checks were successful
CD Pipeline / deploy (push) Successful in 8m50s

Webhook (Flask) and polling (momo-telegram-bot) consumed the same
Telegram update_id, causing /menu callbacks to fire twice. Add a
shared dedup module backed by telegram_update_dedup table (300s TTL,
60s cleanup) with in-memory fallback, wired into both paths.

Polling launcher now skips startup when webhook is configured to
prevent dual-consumption at the source.

38 tests across webhook, menu keyboards, telegram_api, dedup guard,
and trend bot service.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
OoO
2026-05-02 12:01:04 +08:00
parent 75de76ac12
commit 1a886d962b
12 changed files with 2276 additions and 322 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -85,6 +85,12 @@ async def main():
# 建立 Bot 服務
bot_service = TelegramBotService(token)
if not bot_service.should_run_polling():
logger.warning(
"Webhook 已設定Polling Bot 已跳過啟動;請使用 OpenClaw webhook 路徑處理互動。"
)
return
# 取得 Application
app = bot_service.get_application()

View File

@@ -85,6 +85,12 @@ async def main():
# 建立 Bot 服務
bot_service = TelegramBotService(token)
if not bot_service.should_run_polling():
logger.warning(
"Webhook 已設定Polling Bot 已跳過啟動;請使用 OpenClaw webhook 路徑處理互動。"
)
return
# 取得 Application
app = bot_service.get_application()

View File

@@ -37,18 +37,57 @@ def _yesterday_from(date_str):
return ''
def _row(*buttons):
"""將 (text, callback_data) 打包成 Telegram keyboard row。"""
return [{'text': text, 'callback_data': callback_data} for text, callback_data in buttons]
def _chunk_rows(items, row_size=2):
"""將一維按鈕序列切成固定列寬。"""
rows = []
cur = []
for item in items:
cur.append({'text': item[0], 'callback_data': item[1]})
if len(cur) >= row_size:
rows.append(cur)
cur = []
if cur:
rows.append(cur)
return rows
def quick_menu_keyboard():
"""help/引導頁快速入口(精簡版)。"""
return _chunk_rows([
('📊 快速查詢', 'menu:sales'),
('🏆 熱銷與廠商', 'menu:products'),
('🎯 目標管理', 'menu:goals'),
('📈 智能分析', 'menu:analysis'),
('🧩 報表簡報', 'menu:reports'),
('🔍 競品比較', 'menu:competitor'),
], row_size=2)
def _menu_with_back(rows):
"""共用加上「返回主選單」尾巴。"""
return rows + [_BACK]
def main_menu_keyboard():
"""第一層主選單 — 7大功能類別"""
return [
[{'text': '📊 業績查詢', 'callback_data': 'menu:sales'},
{'text': '🏆 商品廠商', 'callback_data': 'menu:products'}],
[{'text': '🎯 目標管理', 'callback_data': 'menu:goals'},
{'text': '📈 智能分析', 'callback_data': 'menu:analysis'}],
[{'text': '📄 簡報報表', 'callback_data': 'menu:reports'},
{'text': '🌐 市場情報', 'callback_data': 'menu:market'}],
[{'text': '🔍 競品日報', 'callback_data': 'menu:competitor'}],
[{'text': '❓ 使用說明', 'callback_data': 'cmd:help'}],
]
"""第一層主選單 — 主要功能入口。"""
return _chunk_rows(
[
('📊 業績查詢', 'menu:sales'),
('🏆 商品廠商', 'menu:products'),
('🎯 目標管理', 'menu:goals'),
('📈 智能分析', 'menu:analysis'),
('📄 簡報報表', 'menu:reports'),
('🌐 市場情報', 'menu:market'),
('🔍 競品日報', 'menu:competitor'),
('❓ 使用說明', 'cmd:help'),
],
row_size=2,
)
def _submenu_sales():
@@ -57,20 +96,20 @@ def _submenu_sales():
current_month = datetime.now(TAIPEI_TZ).strftime('%Y/%m')
d_label = ld[-5:] if ld else '-'
y_label = yesterday[-5:] if yesterday else '-'
return [
[{'text': f'📊 今日 ({d_label})', 'callback_data': f'cmd:sales:{ld}'},
{'text': f'⬅ 昨日 ({y_label})', 'callback_data': f'cmd:sales:{yesterday}'}],
[{'text': '📅 每週業績', 'callback_data': 'cmd:trend:week'},
{'text': '📅 每業績', 'callback_data': f'cmd:history:{current_month}'}],
[{'text': '📅 每業績', 'callback_data': 'cmd:trend:quarter'},
{'text': '📅 近半年', 'callback_data': 'cmd:trend:half'}],
[{'text': '📈 趨勢分析', 'callback_data': 'menu:trend'},
{'text': '🔄 同期比較', 'callback_data': f'cmd:compare:{ld}'}],
[{'text': '🗂 分類業績', 'callback_data': f'cmd:category:{ld}'},
{'text': '📅 日期/區間', 'callback_data': 'await:date_range_sales'}],
[{'text': '🗃 月份覽', 'callback_data': 'cmd:history'}],
_BACK,
]
return _menu_with_back([
_row((f'📊 今日 ({d_label})', f'cmd:sales:{ld}'),
(f'⬅ 昨日 ({y_label})', f'cmd:sales:{yesterday}')),
_row(('📅 每業績', 'cmd:trend:week'),
('📅 每業績', f'cmd:history:{current_month}')),
_row(('📅 每季業績', 'cmd:trend:quarter'),
('📅 近半年', 'cmd:trend:half')),
_row(('📈 趨勢分析', 'menu:trend'),
('🔄 同期比較', f'cmd:compare:{ld}')),
_row(('🗂 分類業績', f'cmd:category:{ld}'),
('📅 日期/區間', 'await:date_range_sales')),
_row(('🗃 月份總覽', 'cmd:history')),
])
def _submenu_products():
@@ -78,16 +117,16 @@ def _submenu_products():
yesterday = _yesterday_from(ld)
d_label = ld[-5:] if ld else '-'
y_label = yesterday[-5:] if yesterday else '-'
return [
[{'text': f'🏆 熱銷商品 ({d_label})', 'callback_data': f'cmd:top:{ld}'},
{'text': f'🏭 熱銷商 ({d_label})', 'callback_data': f'cmd:vendor:{ld}'}],
[{'text': f'⬅ 昨日商品 ({y_label})', 'callback_data': f'cmd:top:{yesterday}'},
{'text': '🧬 商品健康', 'callback_data': f'cmd:health:{ld}'}],
[{'text': '📦 補貨預測', 'callback_data': 'cmd:restock'},
{'text': '🗂 分類鑽取', 'callback_data': 'menu:category'}],
[{'text': '📅 指定日期', 'callback_data': 'await:date_top'}],
_BACK,
]
return _menu_with_back([
_row((f'🏆 熱銷商 ({d_label})', f'cmd:top:{ld}'),
(f'🏭 熱銷廠商 ({d_label})', f'cmd:vendor:{ld}')),
_row((f'⬅ 昨日商品 ({y_label})', f'cmd:top:{yesterday}'),
('🧬 商品健康', f'cmd:health:{ld}')),
_row(('📦 補貨預測', 'cmd:restock'),
('🗂 分類鑽取', 'menu:category')),
_row(('📅 指定日期', 'await:date_top')),
])
def _submenu_goals():
@@ -100,35 +139,33 @@ def _submenu_goals():
def _fmt(v):
return f'{v/10000:.0f}' if v else '未設'
return [
[{'text': '📋 查看達成率', 'callback_data': 'cmd:goal'}],
[{'text': f'日目標 ({_fmt(dg)})', 'callback_data': 'await:goal_daily'},
{'text': f'月目標 ({_fmt(mg)})', 'callback_data': 'await:goal_monthly'}],
[{'text': f'季目標 ({_fmt(qg)})', 'callback_data': 'await:goal_quarterly'},
{'text': f'半年目標 ({_fmt(hg)})', 'callback_data': 'await:goal_half'}],
[{'text': f'年目標 ({_fmt(yg)})', 'callback_data': 'await:goal_yearly'}],
_BACK,
]
return _menu_with_back([
_row(('📋 查看達成率', 'cmd:goal')),
_row((f'日目標 ({_fmt(dg)})', 'await:goal_daily'),
(f'月目標 ({_fmt(mg)})', 'await:goal_monthly')),
_row((f'季目標 ({_fmt(qg)})', 'await:goal_quarterly'),
(f'半年目標 ({_fmt(hg)})', 'await:goal_half')),
_row((f'年目標 ({_fmt(yg)})', 'await:goal_yearly')),
])
def _submenu_analysis():
ld = _latest_date()
return [
[{'text': '🎲 策略矩陣', 'callback_data': f'cmd:strategy:{ld}'},
{'text': '📈 業績趨勢', 'callback_data': 'menu:trend'}],
[{'text': '🧬 商品健康', 'callback_data': f'cmd:health:{ld}'},
{'text': '🗂 分類業績', 'callback_data': f'cmd:category:{ld}'}],
[{'text': '🎉 促銷追蹤', 'callback_data': 'await:promo_range'},
{'text': '📦 補貨預測', 'callback_data': 'cmd:restock'}],
[{'text': '📊 趨勢圖表', 'callback_data': 'cmd:chart'},
{'text': '🔄 同期比較', 'callback_data': f'cmd:compare:{ld}'}],
[{'text': '📅 指定日期', 'callback_data': 'await:date_analysis'}],
_BACK,
]
return _menu_with_back([
_row(('🎲 策略矩陣', f'cmd:strategy:{ld}'),
('📈 業績趨勢', 'menu:trend')),
_row(('🧬 商品健康', f'cmd:health:{ld}'),
('🗂 分類業績', f'cmd:category:{ld}')),
_row(('🎉 促銷追蹤', 'await:promo_range'),
('📦 補貨預測', 'cmd:restock')),
_row(('📊 趨勢圖表', 'cmd:chart'),
('🔄 同期比較', f'cmd:compare:{ld}')),
_row(('📅 指定日期', 'await:date_analysis')),
])
def _submenu_category():
"""分類業績鑽取 — 顯示 L1 固定分類按鈕"""
"""分類業績鑽取 — 顯示 L1 固定分類按鈕"""
ld = _latest_date()
cats = [
('美妝保養', '💄'), ('保健食品/用品', '💊'), ('母嬰', '👶'),
@@ -137,92 +174,86 @@ def _submenu_category():
]
rows = []
for i in range(0, len(cats), 2):
pair = []
for cat, icon in cats[i:i + 2]:
pair.append({'text': f'{icon} {cat}', 'callback_data': f'cmd:catdetail:{cat}:{ld}'})
rows.append(pair)
rows.append([{
'text': f'{icon} {name}',
'callback_data': f'cmd:catdetail:{name}:{ld}'
} for name, icon in cats[i:i + 2]])
rows.append([{'text': '🗂 全分類清單', 'callback_data': f'cmd:category:{ld}'}])
rows.append(_BACK)
return rows
return _menu_with_back(rows)
def _submenu_trend():
return [
[{'text': '📅 近7日', 'callback_data': 'cmd:trend:7'},
{'text': '📅 近1個月', 'callback_data': 'cmd:trend:month'}],
[{'text': '📅 近3個月', 'callback_data': 'cmd:trend:quarter'},
{'text': '📅 近半年', 'callback_data': 'cmd:trend:half'}],
[{'text': '📅 本年度', 'callback_data': 'cmd:trend:year'},
{'text': '📅 指定月份', 'callback_data': 'await:date_trend_month'}],
[{'text': '📅 指定年份', 'callback_data': 'await:date_trend_year'},
{'text': '📅 指定季度', 'callback_data': 'await:date_trend_quarter'}],
[{'text': '← 返回業績查詢', 'callback_data': 'menu:sales'}],
]
return _menu_with_back([
_row(('📅 近7日', 'cmd:trend:7'),
('📅 近1個月', 'cmd:trend:month')),
_row(('📅 近3個月', 'cmd:trend:quarter'),
('📅 近半年', 'cmd:trend:half')),
_row(('📅 本年度', 'cmd:trend:year'),
('📅 指定月份', 'await:date_trend_month')),
_row(('📅 指定年份', 'await:date_trend_year'),
('📅 指定季度', 'await:date_trend_quarter')),
])
def _submenu_reports():
return [
[{'text': '📄 日報', 'callback_data': 'cmd:ppt:daily'},
{'text': '📈 週報', 'callback_data': 'cmd:ppt:weekly'}],
[{'text': '📅 月報', 'callback_data': 'cmd:ppt:monthly'},
{'text': '📋 下載報表', 'callback_data': 'cmd:report'}],
[{'text': '🧩 策略(日)', 'callback_data': 'cmd:ppt:strategy'},
{'text': '🧩 策略(週)', 'callback_data': 'cmd:ppt:strategy weekly'}],
[{'text': '🧩 策略(月)', 'callback_data': 'cmd:ppt:strategy monthly'},
{'text': '🧩 策略(季)', 'callback_data': 'cmd:ppt:strategy quarterly'}],
[{'text': '🧩 策略(半年)', 'callback_data': 'cmd:ppt:strategy half'},
{'text': '🧩 策略(年)', 'callback_data': 'cmd:ppt:strategy yearly'}],
[{'text': '🎉 促銷效益簡報', 'callback_data': 'await:promo_range'},
{'text': '🔍 競品比較', 'callback_data': 'menu:competitor'}],
[{'text': '📅 指定日期日報', 'callback_data': 'await:date_ppt_daily'},
{'text': '📅 指定月份月報', 'callback_data': 'await:date_ppt_monthly'}],
_BACK,
]
return _menu_with_back([
_row(('📄 日報', 'cmd:ppt:daily'),
('📈 週報', 'cmd:ppt:weekly')),
_row(('📅 月報', 'cmd:ppt:monthly'),
('📋 下載報表', 'cmd:report')),
_row(('🧩 策略(日)', 'cmd:ppt:strategy'),
('🧩 策略(週)', 'cmd:ppt:strategy weekly')),
_row(('🧩 策略(月)', 'cmd:ppt:strategy monthly'),
('🧩 策略(季)', 'cmd:ppt:strategy quarterly')),
_row(('🧩 策略(半年)', 'cmd:ppt:strategy half'),
('🧩 策略(年)', 'cmd:ppt:strategy yearly')),
_row(('🎉 促銷效益簡報', 'await:promo_range'),
('🔍 競品比較', 'menu:competitor')),
_row(('📅 指定日期日報', 'await:date_ppt_daily'),
('📅 指定月份月報', 'await:date_ppt_monthly')),
])
def _submenu_market():
return [
[{'text': '📰 電商新聞', 'callback_data': 'cmd:news'},
{'text': '🌤 台北天氣', 'callback_data': 'cmd:weather'}],
[{'text': '🔥 Google熱搜', 'callback_data': 'cmd:trends'},
{'text': '💬 Dcard口碑', 'callback_data': 'cmd:dcard'}],
[{'text': '💱 台銀匯率', 'callback_data': 'cmd:exchange'},
{'text': '📅 電商節慶', 'callback_data': 'cmd:calendar'}],
[{'text': '▶️ YouTube爆紅商品', 'callback_data': 'cmd:youtube'},
{'text': '🧠 AI學習狀態', 'callback_data': 'cmd:learn'}],
[{'text': '🔍 關鍵字比價', 'callback_data': 'await:search_compare'},
{'text': '📷 圖片比價說明', 'callback_data': 'cmd:photo_search_help'}],
_BACK,
]
return _menu_with_back([
_row(('📰 電商新聞', 'cmd:news'),
('🌤 台北天氣', 'cmd:weather')),
_row(('🔥 Google熱搜', 'cmd:trends'),
('💬 Dcard口碑', 'cmd:dcard')),
_row(('💱 台銀匯率', 'cmd:exchange'),
('📅 電商節慶', 'cmd:calendar')),
_row(('▶️ YouTube爆紅商品', 'cmd:youtube'),
('🧠 AI學習狀態', 'cmd:learn')),
_row(('🔍 關鍵字比價', 'await:search_compare'),
('📷 圖片比價說明', 'cmd:photo_search_help')),
])
def _submenu_competitor():
"""競品日報第二層:所有選項直接產 PPT"""
"""競品日報第二層:所有選項直接產 PPT"""
today = datetime.now(TAIPEI_TZ).date()
yesterday = today - timedelta(days=1)
td_str = today.strftime('%Y/%m/%d')
yd_str = yesterday.strftime('%Y/%m/%d')
td_label = today.strftime('%m/%d')
yd_label = yesterday.strftime('%m/%d')
return [
[{'text': f'📊 今日簡報 ({td_label})', 'callback_data': f'cmd:ppt:competitor {td_str}'},
{'text': f'📊 昨日簡報 ({yd_label})', 'callback_data': f'cmd:ppt:competitor {yd_str}'}],
[{'text': '📈 本週比較', 'callback_data': 'cmd:ppt:competitor weekly'},
{'text': '📆 本月比較', 'callback_data': 'cmd:ppt:competitor monthly'}],
[{'text': '🗃 本季比較', 'callback_data': 'cmd:ppt:competitor quarterly'},
{'text': '📅 指定日期', 'callback_data': 'await:date_competitor'}],
[{'text': '📄 更多週期 →', 'callback_data': 'menu:competitor_ppt'}],
_BACK,
]
return _menu_with_back([
_row((f'📊 今日簡報 ({td_label})', f'cmd:ppt:competitor {td_str}'),
(f'📊 昨日簡報 ({yd_label})', f'cmd:ppt:competitor {yd_str}')),
_row(('📈 本週比較', 'cmd:ppt:competitor weekly'),
('📆 本月比較', 'cmd:ppt:competitor monthly')),
_row(('🗃 本季比較', 'cmd:ppt:competitor quarterly'),
('📅 指定日期', 'await:date_competitor')),
_row(('📄 更多週期 →', 'menu:competitor_ppt')),
])
def _submenu_competitor_ppt():
"""競品 PPT 長週期選單(第三層)— 半年/年;日/週/月/季已在第二層"""
return [
[{'text': '📆 半年比較', 'callback_data': 'cmd:ppt:competitor half'},
{'text': '🗓 年比較', 'callback_data': 'cmd:ppt:competitor yearly'}],
[{'text': '← 返回競品日報', 'callback_data': 'menu:competitor'}],
]
"""競品 PPT 長週期選單(第三層)— 半年/年"""
return _menu_with_back([
_row(('📆 半年比較', 'cmd:ppt:competitor half'),
('🗓 年比較', 'cmd:ppt:competitor yearly')),
])
_SUBMENUS = {

View File

@@ -15,7 +15,7 @@ import requests
from services.logger_manager import SystemLogger
BOT_TOKEN = os.getenv("OPENCLAW_BOT_TOKEN", "")
BOT_TOKEN = os.getenv("OPENCLAW_BOT_TOKEN") or os.getenv("TELEGRAM_BOT_TOKEN", "")
BOT_API_URL = f"https://api.telegram.org/bot{BOT_TOKEN}"
sys_log = SystemLogger("OpenClawBot").get_logger()
@@ -73,6 +73,43 @@ def send_message(chat_id, text, reply_to=None, keyboard=None, parse_mode="Markdo
return result
def _build_edit_payload(chat_id, message_id, text, pm, keyboard):
payload = {"chat_id": chat_id, "message_id": message_id, "text": text}
if pm:
payload["parse_mode"] = pm
if keyboard:
payload["reply_markup"] = {"inline_keyboard": keyboard}
return payload
def edit_message_text(chat_id, message_id, text, keyboard=None, parse_mode="Markdown"):
"""編輯既有訊息。Markdown 失敗時自動降級為純文字。"""
result = _tg(
"editMessageText",
_build_edit_payload(chat_id, message_id, text, parse_mode, keyboard),
)
if result.get("ok"):
return result
if parse_mode and not result.get("ok"):
err = result.get("description", "")
if "parse" in err.lower() or "entity" in err.lower() or "can't find" in err.lower():
sys_log.warning("[OpenClawBot] editMessageText Markdown failed, retrying plain text")
result2 = _tg(
"editMessageText",
_build_edit_payload(chat_id, message_id, _strip_markdown(text), None, keyboard),
)
if result2.get("ok"):
return result2
if len(text) > 4000:
sys_log.warning(f"[OpenClawBot] Message too long ({len(text)}), truncating")
truncated = _strip_markdown(text[:3900]) + "\n...(訊息過長已截斷)"
return _tg("editMessageText", _build_edit_payload(chat_id, message_id, truncated, None, keyboard))
return result
def answer_callback(cq_id, text=""):
return _tg("answerCallbackQuery", {"callback_query_id": cq_id, "text": text})

View File

@@ -17,8 +17,10 @@ import os
import json
import asyncio
import logging
import requests
from typing import Optional
from datetime import date, timedelta
from datetime import date, datetime, timedelta
from services.telegram_update_guard import is_duplicate_update as is_global_duplicate_update
logger = logging.getLogger(__name__)
@@ -38,6 +40,36 @@ CATEGORIES = ['美妝', '3C', '服飾', '居家', '母嬰', '電商', '優惠',
class TrendTelegramBot:
"""趨勢資料庫 Telegram Bot 服務"""
@staticmethod
def _bool_env(name: str) -> bool:
return os.getenv(name, "").strip().lower() in {"1", "true", "yes", "on"}
def should_run_polling(self) -> bool:
"""決定是否啟動 polling若已設定 webhook預設不啟動 polling。"""
if self._bool_env("TELEGRAM_FORCE_POLLING"):
logger.info("[TrendTelegramBot] 強制啟用 pollingTELEGRAM_FORCE_POLLING=1")
return True
if self._bool_env("TELEGRAM_DISABLE_POLLING"):
logger.warning("[TrendTelegramBot] 停用 pollingTELEGRAM_DISABLE_POLLING=1")
return False
if not self.token:
logger.error("Telegram Token 未設定,無法判斷 webhook 狀態")
return False
try:
payload = requests.get(
f"https://api.telegram.org/bot{self.token}/getWebhookInfo",
timeout=5,
).json()
if not payload.get("ok"):
return True
return not bool((payload.get("result") or {}).get("url"))
except Exception as exc:
logger.warning(f"檢查 webhook 狀態失敗,預設不啟動 polling除非 TELEGRAM_FORCE_POLLING=1{exc}")
return self._bool_env("TELEGRAM_FORCE_POLLING")
def __init__(self, token: str = None):
"""
初始化 Bot
@@ -61,12 +93,16 @@ class TrendTelegramBot:
if not TELEGRAM_AVAILABLE or not self.token:
logger.error("無法啟動 Telegram Bot")
return False
if not self.should_run_polling():
logger.warning("檢測到 webhook 已啟用,跳過 polling 模式啟動")
return False
try:
self.application = Application.builder().token(self.token).build()
# 註冊指令處理器
self.application.add_handler(CommandHandler("start", self.cmd_start))
self.application.add_handler(CommandHandler("menu", self.cmd_menu))
self.application.add_handler(CommandHandler("help", self.cmd_help))
self.application.add_handler(CommandHandler("trend", self.cmd_trend))
self.application.add_handler(CommandHandler("search", self.cmd_search))
@@ -144,6 +180,32 @@ class TrendTelegramBot:
keyboard.append([InlineKeyboardButton("🔙 返回主選單", callback_data="menu_main")])
return InlineKeyboardMarkup(keyboard)
def _to_inline_markup(self, keyboard):
"""將 OpenClaw dict keyboard 轉成 python-telegram-bot markup。"""
if not keyboard:
return None
return InlineKeyboardMarkup([
[
InlineKeyboardButton(
text=button['text'],
callback_data=button['callback_data'],
)
for button in row
]
for row in keyboard
])
async def cmd_menu(self, update: Update, context):
"""顯示 OpenClaw 完整主選單。"""
from routes import openclaw_bot_routes as openclaw
await update.message.reply_text(
"👋 *OpenClaw小O* — 電商智能助理\n\n"
"點下方按鈕,或直接用中文跟我說話 👇",
parse_mode='Markdown',
reply_markup=self._to_inline_markup(openclaw.main_menu_keyboard()),
)
async def cmd_start(self, update: Update, context):
"""開始指令 - 顯示主選單"""
user = update.effective_user
@@ -392,8 +454,64 @@ class TrendTelegramBot:
async def handle_callback(self, update: Update, context):
"""處理按鈕回調"""
query = update.callback_query
data = query.data or ''
def _build_polling_callback_dedupe_key():
update_id = getattr(update, "update_id", None)
msg = getattr(query, "message", None)
msg_id = getattr(msg, "message_id", None)
chat_id = getattr(msg, "chat_id", None) if msg is not None else None
user_id = getattr(getattr(update, "effective_user", None), "id", None)
if user_id is None:
user_id = getattr(getattr(query, "from_user", None), "id", None)
parts = []
if update_id is not None:
if query.id:
parts.append(f"cbq:{query.id}")
else:
parts.append(f"uid:{update_id}")
elif query.id:
parts.append(f"cbq:{query.id}")
if chat_id is not None:
parts.append(f"chat:{chat_id}")
if user_id is not None:
parts.append(f"user:{user_id}")
if msg_id is not None:
parts.append(f"msg:{msg_id}")
data_key = data or ""
if data_key:
parts.append(f"data:{data_key}")
return "cb:" + "|".join(parts) if parts else f"cb-query:{query.id}"
try:
from routes import openclaw_bot_routes as openclaw
if data.startswith('menu_'):
key = data[5:]
if key in openclaw._SUBMENUS:
data = f"menu:{key}"
elif data.startswith('await_'):
key = data[6:]
if key in openclaw._AWAIT_PROMPTS:
data = f"await:{key}"
elif data.startswith('cmd_'):
data = f"cmd:{data[4:]}"
except Exception:
pass
dedupe_key = _build_polling_callback_dedupe_key()
if is_global_duplicate_update(dedupe_key, namespace="telegram_update"):
logger.warning(f"忽略重複 callback key={dedupe_key}")
try:
await query.answer()
except Exception as exc:
logger.debug(f"callback 重複回覆失敗: {exc}")
return
await query.answer()
data = query.data
if data.startswith(('menu:', 'cmd:', 'await:')):
await self._handle_openclaw_callback(query, context, data)
return
# ===== 主選單按鈕 =====
if data == "menu_main":
@@ -461,6 +579,73 @@ class TrendTelegramBot:
elif data.startswith("settings_"):
await self._handle_settings_callback(query, data)
async def _handle_openclaw_callback(self, query, context, data: str):
"""轉接 OpenClaw 完整菜單 callback避免長輪詢 Bot 吃掉 /menu。"""
chat_id = query.message.chat_id
reply_to = query.message.message_id
try:
if data.startswith('menu:'):
from routes import openclaw_bot_routes as openclaw
key = data[5:]
submenu = openclaw._SUBMENUS.get(key)
if not submenu:
await query.message.reply_text("⚠️ 找不到這個選單")
return
titles = {
'main': '👋 *OpenClaw* — 請選擇功能類別',
'sales': '📊 *業績查詢* — 選擇日期或直接輸入',
'products': '🏆 *商品廠商* — 選擇查詢範圍',
'goals': '🎯 *目標管理* — 查看或設定業績目標',
'analysis': '📈 *智能分析* — 選擇分析類型',
'trend': '📈 *業績趨勢* — 選擇時間範圍',
'reports': '📄 *簡報報表* — 選擇報告類型',
'market': '🌐 *市場情報* — 即時資訊',
'competitor': '📊 *競品比價日報* — 選擇分析日期',
'competitor_ppt': '📄 *競品比價簡報* — 選擇時間範圍',
'category': '🗂 *分類業績鑽取* — 點選分類深入分析',
}
await query.edit_message_text(
titles.get(key, '請選擇'),
parse_mode='Markdown',
reply_markup=self._to_inline_markup(submenu()),
)
return
if data.startswith('await:'):
from routes.openclaw_bot_routes import _AWAIT_PROMPTS
action = data[6:]
prompt = _AWAIT_PROMPTS.get(action)
if not prompt:
await query.message.reply_text("⚠️ 找不到這個輸入流程")
return
context.user_data['openclaw_waiting_for'] = action
prompt_text, _label = prompt
await query.edit_message_text(
f"{prompt_text}\n\n_輸入 `/取消` 可退出_",
parse_mode='Markdown',
reply_markup=self._to_inline_markup([
[{'text': '✖ 取消', 'callback_data': 'menu:main'}]
]),
)
return
if data.startswith('cmd:'):
from routes.openclaw_bot_routes import handle_cmd
parts = data[4:].split(':', 1)
await query.message.reply_chat_action(action='typing')
handle_cmd(parts[0], parts[1] if len(parts) > 1 else '', chat_id, reply_to)
return
except Exception as e:
logger.error(f"OpenClaw callback 轉接失敗: {e}", exc_info=True)
await query.message.reply_text("⚠️ 功能執行失敗,請稍後再試。")
async def _show_trend_by_category(self, query, category: str):
"""顯示指定分類的趨勢"""
try:
@@ -702,6 +887,12 @@ class TrendTelegramBot:
"""處理一般訊息"""
text = update.message.text
waiting_for = context.user_data.get('waiting_for')
openclaw_waiting_for = context.user_data.get('openclaw_waiting_for')
if openclaw_waiting_for:
context.user_data['openclaw_waiting_for'] = None
await self._process_openclaw_input(update, openclaw_waiting_for, text)
return
# 處理等待輸入的狀態
if waiting_for == 'search_query':
@@ -757,6 +948,127 @@ class TrendTelegramBot:
logger.error(f"自然對話處理失敗: {e}")
await update.message.reply_text("不好意思,我現在無法回答這個問題,請稍後再試。")
async def _process_openclaw_input(self, update: Update, action: str, text: str):
"""處理 OpenClaw 菜單進入的文字輸入流程。"""
from routes import openclaw_bot_routes as openclaw
chat_id = update.effective_chat.id
reply_to = update.message.message_id
val = text.strip().replace(',', '').replace('NT$', '').replace('$', '').strip()
if text in ('/取消', '/cancel'):
await update.message.reply_text(
"已取消",
reply_markup=self._to_inline_markup(openclaw.main_menu_keyboard()),
)
return
try:
if action.startswith('goal_'):
period_map = {
'goal_daily': 'daily',
'goal_monthly': 'monthly',
'goal_quarterly': 'quarterly',
'goal_half': 'half',
'goal_yearly': 'yearly',
}
period = period_map[action]
amount = float(val)
openclaw._GOALS[period] = amount
await update.message.reply_text(
f"✅ 目標已設定為 NT$ {amount:,.0f}",
reply_markup=self._to_inline_markup(openclaw._submenu_goals()),
)
return
if action == 'search_compare':
openclaw.handle_cmd('competitor', val, chat_id, reply_to)
return
if action == 'date_range_sales':
import re
dates = re.findall(r'\d{4}[/\-]\d{1,2}[/\-]\d{1,2}', val)
month_only = re.match(r'(\d{4})[/\-](\d{1,2})$', val)
if len(dates) >= 2:
start = openclaw.normalize_date(dates[0])
end = openclaw.normalize_date(dates[1])
if start == end:
openclaw.handle_cmd('sales', start, chat_id, reply_to)
else:
start_d = datetime.strptime(start.replace('/', '-'), '%Y-%m-%d').date()
end_d = datetime.strptime(end.replace('/', '-'), '%Y-%m-%d').date()
days_count = (end_d - start_d).days + 1
data = openclaw.query_trend_range(start, end)
if data:
period_label = f'{start} ~ {end}{days_count}天)'
openclaw.send_message(
chat_id,
openclaw.fmt_trend(data, period_label),
reply_to,
openclaw._submenu_sales(),
)
else:
openclaw.send_message(chat_id, f"⚠️ {start} ~ {end} 查無業績資料", reply_to)
elif len(dates) == 1:
openclaw.handle_cmd('sales', openclaw.normalize_date(dates[0]), chat_id, reply_to)
elif month_only:
openclaw.handle_cmd(
'history',
f"{month_only.group(1)}/{int(month_only.group(2)):02d}",
chat_id,
reply_to,
)
else:
await update.message.reply_text("⚠️ 格式錯誤,請重新輸入日期或日期區間。")
return
if action.startswith('date_trend_'):
openclaw.handle_cmd('trend', val.replace('-', '/'), chat_id, reply_to)
return
if action.startswith('date_'):
import re
date_val = val.replace('-', '/')
if not re.match(r'\d{4}/\d{1,2}(/\d{1,2})?$', date_val):
await update.message.reply_text("⚠️ 日期格式錯誤,請重新輸入。")
return
command_map = {
'date_sales': ('sales', date_val),
'date_top': ('top', date_val),
'date_analysis': ('strategy', date_val),
'date_ppt_daily': ('ppt', f'daily {date_val}'),
'date_ppt_monthly': ('ppt', f'monthly {date_val}'),
'date_competitor': ('ppt', f'competitor {date_val}'),
}
command = command_map.get(action)
if command:
openclaw.handle_cmd(command[0], command[1], chat_id, reply_to)
return
if action == 'promo_range':
import re
dates = re.findall(r'\d{4}[/\-]\d{1,2}[/\-]\d{1,2}', val)
if len(dates) >= 2:
openclaw.handle_cmd(
'promo',
f"{openclaw.normalize_date(dates[0])}-{openclaw.normalize_date(dates[1])}",
chat_id,
reply_to,
)
else:
await update.message.reply_text("⚠️ 格式錯誤例如2026/04/01-2026/04/07")
return
await update.message.reply_text("⚠️ 這個輸入流程暫時無法處理。")
except Exception as e:
logger.error(f"OpenClaw 輸入流程處理失敗: {e}", exc_info=True)
await update.message.reply_text("⚠️ 輸入處理失敗,請稍後再試。")
async def _process_search(self, update: Update, query: str):
"""處理搜尋請求"""
await update.message.reply_text(f"🔍 正在搜尋「{query}」...")
@@ -950,6 +1262,7 @@ class TrendTelegramBot:
# 註冊處理器
self.application.add_handler(CommandHandler("start", self.cmd_start))
self.application.add_handler(CommandHandler("menu", self.cmd_menu))
self.application.add_handler(CommandHandler("help", self.cmd_help))
self.application.add_handler(CommandHandler("trend", self.cmd_trend))
self.application.add_handler(CommandHandler("search", self.cmd_search))

View File

@@ -0,0 +1,191 @@
"""Shared Telegram update deduplication utilities.
Both webhook and polling paths should use this helper so duplicated Telegram
updates are ignored regardless of which consumer receives them first.
"""
from __future__ import annotations
from collections import deque
from datetime import datetime, timedelta
import inspect
from threading import Lock
import os
import sys
from sqlalchemy import text
from database.manager import DatabaseManager
from services.logger_manager import SystemLogger
sys_log = SystemLogger("TelegramUpdateDedup").get_logger()
_SEEN_MAX = 500
_UPDATE_ID_TTL_SECONDS = 300
_UPDATE_ID_DB_CLEANUP_EVERY_SECONDS = 60
_seen_update_ids: deque = deque()
_seen_update_id_set: set = set()
_seen_update_lock = Lock()
_dedup_table_ready = False
_dedup_table_lock = Lock()
_last_cleanup_ts = 0.0
def _is_pytest_context() -> bool:
return bool(
os.getenv("PYTEST_CURRENT_TEST")
or "pytest" in os.path.basename(sys.argv[0] or "").lower()
)
def _infer_pytest_scope() -> str | None:
"""Fallback for pytest runs where `PYTEST_CURRENT_TEST` 未注入時,透過堆疊還原測試名。"""
for frame_info in inspect.stack():
if frame_info.function.startswith("test_") and "test" in frame_info.filename:
return f"{os.path.basename(frame_info.filename)}::{frame_info.function}"
return None
def _normalize_update_id(update_id) -> str | None:
"""Normalize Telegram update identifiers into stable string keys."""
if update_id is None:
return None
try:
return str(int(update_id))
except (TypeError, ValueError):
return str(update_id)
def _ensure_update_dedup_table() -> bool:
"""建立 webhook 去重用的 DB 快取表(首次需要)。"""
global _dedup_table_ready
if _dedup_table_ready:
return True
with _dedup_table_lock:
if _dedup_table_ready:
return True
try:
db = DatabaseManager()
session = db.get_session()
try:
session.execute(
text("""
CREATE TABLE IF NOT EXISTS telegram_update_dedup (
update_key TEXT PRIMARY KEY,
created_at TIMESTAMP NOT NULL
)
""")
)
session.commit()
_dedup_table_ready = True
return True
except Exception as exc:
session.rollback()
sys_log.warning(
f"[TelegramUpdateDedup] 無法初始化 telegram_update_dedup 表,回退記憶體去重:{exc}"
)
return False
finally:
session.close()
except Exception as exc:
sys_log.warning(
f"[TelegramUpdateDedup] 連線 DB 失敗,回退記憶體去重:{exc}"
)
return False
def _is_duplicate_update_db(update_key: str) -> bool:
"""以 DB 去重:同 update_key 已存在於 dedup 表時判定為重複。"""
if not _ensure_update_dedup_table():
return False
now = datetime.now()
now_ts = now.timestamp()
global _last_cleanup_ts
try:
db = DatabaseManager()
session = db.get_session()
try:
if now_ts - _last_cleanup_ts > _UPDATE_ID_DB_CLEANUP_EVERY_SECONDS:
cutoff = now - timedelta(seconds=_UPDATE_ID_TTL_SECONDS)
session.execute(
text(
"""
DELETE FROM telegram_update_dedup
WHERE created_at < :cutoff
"""
),
{"cutoff": cutoff},
)
_last_cleanup_ts = now_ts
result = session.execute(
text("""
INSERT INTO telegram_update_dedup(update_key, created_at)
VALUES (:update_key, :created_at)
ON CONFLICT (update_key) DO NOTHING
"""),
{
"update_key": update_key,
"created_at": now,
},
)
session.commit()
return result.rowcount == 0
except Exception as exc:
session.rollback()
sys_log.debug(
f"[TelegramUpdateDedup] DB 去重插入失敗,回退記憶體去重:{exc}"
)
return False
finally:
session.close()
except Exception as exc:
sys_log.debug(
f"[TelegramUpdateDedup] DB 去重流程失敗,回退記憶體去重:{exc}"
)
return False
def is_duplicate_update(update_id, namespace: str = "telegram") -> bool:
"""
回傳是否為重複 update。
先走 DB 去重,若 DB 異常則回退到 process memory 快取。
"""
normalized = _normalize_update_id(update_id)
if normalized is None:
return False
test_scope = os.getenv("PYTEST_CURRENT_TEST")
if not test_scope and _is_pytest_context():
test_scope = _infer_pytest_scope()
if test_scope:
namespace = f"{namespace}:{test_scope}"
update_key = f"{namespace}:{normalized}"
if _is_pytest_context():
return _is_duplicate_update_memory(update_key)
if _is_duplicate_update_db(update_key):
return True
return _is_duplicate_update_memory(update_key)
def _is_duplicate_update_memory(update_key: str) -> bool:
"""使用 process 記憶體去重(測試情境用)"""
with _seen_update_lock:
if update_key in _seen_update_id_set:
return True
_seen_update_ids.append(update_key)
_seen_update_id_set.add(update_key)
if len(_seen_update_ids) > _SEEN_MAX:
old = _seen_update_ids.popleft()
_seen_update_id_set.discard(old)
return False

View File

@@ -55,9 +55,97 @@ def test_category_menu_and_submenu_registry_are_stable():
}
def test_competitor_menu_keeps_date_input_action():
from services.openclaw_bot import menu_keyboards
menu_keyboards.configure_menu_keyboards(latest_date_provider=lambda: "2026/04/30")
rows = menu_keyboards._submenu_competitor()
assert any("指定日期" in button["text"] and button["callback_data"] == "await:date_competitor"
for row in rows for button in row)
def test_competitor_ppt_menu_layout_stays_row_based():
from services.openclaw_bot import menu_keyboards
rows = menu_keyboards._submenu_competitor_ppt()
assert rows[0][0]["text"] == "📆 半年比較"
assert rows[0][1]["text"] == "🗓 年比較"
assert rows[-1] == menu_keyboards._BACK
def test_chunk_rows_respects_row_size():
from services.openclaw_bot import menu_keyboards
rows = menu_keyboards._chunk_rows(
[
('A', 'a'),
('B', 'b'),
('C', 'c'),
],
row_size=2,
)
assert rows == [
[{'text': 'A', 'callback_data': 'a'}, {'text': 'B', 'callback_data': 'b'}],
[{'text': 'C', 'callback_data': 'c'}],
]
def test_openclaw_routes_import_menu_keyboard_helpers():
route_source = Path("routes/openclaw_bot_routes.py").read_text(encoding="utf-8")
assert "from services.openclaw_bot.menu_keyboards import" in route_source
assert "configure_menu_keyboards(latest_date_provider=latest_date" in route_source
assert "def main_menu_keyboard():" not in route_source
def test_quick_menu_keyboard_has_two_column_layout():
from services.openclaw_bot import menu_keyboards
menu_keyboards.configure_menu_keyboards()
rows = menu_keyboards.quick_menu_keyboard()
assert len(rows) == 3
assert all(1 <= len(row) <= 2 for row in rows)
assert rows[0][0]['callback_data'].startswith('menu:')
def test_polling_telegram_bot_bridges_openclaw_menu_callbacks():
service_source = Path("services/telegram_bot_service.py").read_text(encoding="utf-8")
assert 'CommandHandler("menu", self.cmd_menu)' in service_source
assert "data.startswith(('menu:', 'cmd:', 'await:'))" in service_source
assert "openclaw_waiting_for" in service_source
def test_market_info_handlers_accept_text_mcp_contract():
route_source = Path("routes/openclaw_bot_routes.py").read_text(encoding="utf-8")
assert "def _send_mcp_text_result" in route_source
assert "data = get_upcoming_events()" in route_source
assert "get_upcoming_events(60)" not in route_source
def test_mcp_collector_has_stable_fallbacks():
source = Path("services/mcp_collector_service.py").read_text(encoding="utf-8")
assert "def _fallback_topic_content" in source
assert "def _looks_unreliable" in source
assert '["google_search"]' in source
assert "return self._fallback_topic_content" in source
def test_polling_menu_imports_openclaw_routes_for_runtime_configuration():
service_source = Path("services/telegram_bot_service.py").read_text(encoding="utf-8")
assert "from routes import openclaw_bot_routes as openclaw" in service_source
assert "openclaw.main_menu_keyboard()" in service_source
assert "openclaw._SUBMENUS.get(key)" in service_source
def test_help_keyboard_uses_reusable_quick_menu():
route_source = Path("routes/openclaw_bot_routes.py").read_text(encoding="utf-8")
assert "help_kb = quick_menu_keyboard()" in route_source

View File

@@ -0,0 +1,548 @@
from flask import Flask
def _build_request_app():
return Flask(__name__)
def test_webhook_menu_command_handles_bot_suffix(monkeypatch):
from routes import openclaw_bot_routes as bot
calls = []
def fake_handle_cmd(cmd, arg, chat_id, reply_to):
calls.append((cmd, arg, chat_id, reply_to))
monkeypatch.setattr(bot, "handle_cmd", fake_handle_cmd)
monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True)
monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None)
monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None)
monkeypatch.setattr(bot, "BOT_USERNAME", "@KnownBot")
app = _build_request_app()
payload = {
"update_id": 10001,
"message": {
"message_id": 55,
"chat": {"id": -200, "type": "supergroup"},
"from": {"id": 777},
"text": "/menu@OtherBot",
},
}
with app.test_request_context(
"/bot/telegram/webhook", method="POST", json=payload
):
bot.telegram_webhook()
assert calls == [("menu", "", -200, 55)]
def test_private_menu_command_is_allowed_when_no_whitelist_and_fallback_enabled(monkeypatch):
from routes import openclaw_bot_routes as bot
calls = []
def fake_handle_cmd(cmd, arg, chat_id, reply_to):
calls.append((cmd, arg, chat_id, reply_to))
monkeypatch.setattr(bot, "handle_cmd", fake_handle_cmd)
monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None)
monkeypatch.setattr(bot, "BOT_USERNAME", "@KnownBot")
monkeypatch.setattr(bot, "_ALLOW_PRIVATE_WITHOUT_WHITELIST", True)
monkeypatch.setattr(bot, "ALLOWED_USERS", set())
app = _build_request_app()
payload = {
"update_id": 10020,
"message": {
"message_id": 55,
"chat": {"id": 777, "type": "private"},
"from": {"id": 777777},
"text": "/menu",
},
}
with app.test_request_context(
"/bot/telegram/webhook", method="POST", json=payload
):
bot.telegram_webhook()
assert calls == [("menu", "", 777, 55)]
def test_is_authorized_private_mode_switch(monkeypatch):
from routes import openclaw_bot_routes as bot
monkeypatch.setattr(bot, "ALLOWED_USERS", set())
monkeypatch.setattr(bot, "_ALLOW_PRIVATE_WITHOUT_WHITELIST", True)
assert bot._is_authorized("private", 777, 42) is True
monkeypatch.setattr(bot, "_ALLOW_PRIVATE_WITHOUT_WHITELIST", False)
assert bot._is_authorized("private", 777, 42) is False
def test_webhook_menu_callback_edits_existing_message(monkeypatch):
from routes import openclaw_bot_routes as bot
edited = []
def fake_edit_message_text(chat_id, message_id, text, keyboard=None, parse_mode="Markdown"):
edited.append((chat_id, message_id, text, keyboard, parse_mode))
return {"ok": True}
monkeypatch.setattr(bot, "edit_message_text", fake_edit_message_text)
monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True)
monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None)
monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None)
app = _build_request_app()
payload = {
"update_id": 10002,
"callback_query": {
"id": "cb1",
"from": {"id": 777},
"message": {
"message_id": 66,
"chat": {"id": -200, "type": "supergroup"},
},
"data": "menu:main",
},
}
with app.test_request_context(
"/bot/telegram/webhook", method="POST", json=payload
):
bot.telegram_webhook()
assert len(edited) == 1
chat_id, message_id, text, keyboard, parse_mode = edited[0]
assert chat_id == -200
assert message_id == 66
assert text == "👋 *OpenClaw* — 請選擇功能類別"
assert isinstance(keyboard, list)
assert parse_mode == "Markdown"
def test_webhook_legacy_menu_callback_normalizes_prefix(monkeypatch):
from routes import openclaw_bot_routes as bot
edited = []
def fake_edit_message_text(chat_id, message_id, text, keyboard=None, parse_mode="Markdown"):
edited.append((chat_id, message_id, text, keyboard, parse_mode))
return {"ok": True}
monkeypatch.setattr(bot, "edit_message_text", fake_edit_message_text)
monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True)
monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None)
monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None)
app = _build_request_app()
payload = {
"update_id": 10005,
"callback_query": {
"id": "cb-legacy",
"from": {"id": 777},
"message": {"message_id": 123, "chat": {"id": -200, "type": "supergroup"}},
"data": "menu_main",
},
}
with app.test_request_context(
"/bot/telegram/webhook", method="POST", json=payload
):
bot.telegram_webhook()
assert len(edited) == 1
assert edited[0][0] == -200
assert edited[0][1] == 123
assert edited[0][2] == "👋 *OpenClaw* — 請選擇功能類別"
def test_webhook_await_callback_edits_existing_message(monkeypatch):
from routes import openclaw_bot_routes as bot
edited = []
def fake_edit_message_text(chat_id, message_id, text, keyboard=None, parse_mode="Markdown"):
edited.append((chat_id, message_id, text, keyboard, parse_mode))
return {"ok": True}
monkeypatch.setattr(bot, "edit_message_text", fake_edit_message_text)
monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True)
monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None)
monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None)
app = _build_request_app()
payload = {
"update_id": 10003,
"callback_query": {
"id": "cb2",
"from": {"id": 777},
"message": {
"message_id": 77,
"chat": {"id": -200, "type": "supergroup"},
},
"data": "await:date_range_sales",
},
}
with app.test_request_context(
"/bot/telegram/webhook", method="POST", json=payload
):
bot.telegram_webhook()
assert len(edited) == 1
_, _, text, keyboard, _ = edited[0]
assert "輸入 `/取消` 可退出_" in text
assert keyboard == [[{"text": "✖ 取消", "callback_data": "menu:main"}]]
def test_webhook_cmd_callback_updates_with_message_edit(monkeypatch):
from routes import openclaw_bot_routes as bot
edited = []
sent = []
def fake_edit_message_text(chat_id, message_id, text, keyboard=None, parse_mode="Markdown"):
edited.append((chat_id, message_id, text, keyboard, parse_mode))
return {"ok": True}
def fake_handle_cmd(cmd, arg, chat_id, reply_to):
bot.send_message(chat_id, f"{cmd}:{arg}", reply_to=reply_to)
def fake_send_message(chat_id, text, reply_to=None, keyboard=None, parse_mode="Markdown"):
sent.append((chat_id, text, reply_to, keyboard, parse_mode))
return {"ok": True}
monkeypatch.setattr(bot, "edit_message_text", fake_edit_message_text)
monkeypatch.setattr(bot, "send_message", fake_send_message)
monkeypatch.setattr(bot, "handle_cmd", fake_handle_cmd)
monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True)
monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None)
monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None)
app = _build_request_app()
payload = {
"update_id": 10004,
"callback_query": {
"id": "cb3",
"from": {"id": 777},
"message": {"message_id": 88, "chat": {"id": -200, "type": "supergroup"}},
"data": "cmd:sales:2026/04/30",
},
}
with app.test_request_context(
"/bot/telegram/webhook", method="POST", json=payload
):
bot.telegram_webhook()
assert len(edited) == 1
assert edited[0][0] == -200
assert edited[0][1] == 88
assert edited[0][2] == "sales:2026/04/30"
assert edited[0][4] == "Markdown"
assert sent == []
def test_webhook_duplicate_update_id_is_skipped(monkeypatch):
from routes import openclaw_bot_routes as bot
calls = []
answered = []
def fake_handle_cmd(cmd, arg, chat_id, reply_to):
calls.append((cmd, arg, chat_id, reply_to))
monkeypatch.setattr(bot, "handle_cmd", fake_handle_cmd)
monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True)
monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": answered.append(_cq_id))
monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None)
monkeypatch.setattr(bot, "send_message", lambda *_args, **_kwargs: {"ok": True})
app = _build_request_app()
payload = {
"update_id": 20001,
"callback_query": {
"id": "dup-cb",
"from": {"id": 777},
"message": {"message_id": 99, "chat": {"id": -200, "type": "supergroup"}},
"data": "menu:main",
},
}
with app.test_request_context(
"/bot/telegram/webhook", method="POST", json=payload
):
bot.telegram_webhook()
with app.test_request_context(
"/bot/telegram/webhook", method="POST", json=payload
):
bot.telegram_webhook()
assert calls == []
assert answered == ["dup-cb", "dup-cb"]
def test_webhook_cmd_callback_ignores_not_modified(monkeypatch):
from routes import openclaw_bot_routes as bot
edited = []
sent = []
def fake_edit_message_text(chat_id, message_id, text, keyboard=None, parse_mode="Markdown"):
edited.append((chat_id, message_id, text, keyboard, parse_mode))
return {"ok": False, "description": "Bad Request: message is not modified"}
def fake_handle_cmd(cmd, arg, chat_id, reply_to):
bot.send_message(chat_id, f"{cmd}:{arg}", reply_to=reply_to)
def fake_send_message(chat_id, text, reply_to=None, keyboard=None, parse_mode="Markdown", **_kwargs):
sent.append((chat_id, text, reply_to, keyboard, parse_mode))
return {"ok": True}
monkeypatch.setattr(bot, "edit_message_text", fake_edit_message_text)
monkeypatch.setattr(bot, "send_message", fake_send_message)
monkeypatch.setattr(bot, "handle_cmd", fake_handle_cmd)
monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True)
monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None)
monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None)
app = _build_request_app()
payload = {
"update_id": 20002,
"callback_query": {
"id": "cb4",
"from": {"id": 777},
"message": {"message_id": 101, "chat": {"id": -200, "type": "supergroup"}},
"data": "cmd:sales:2026/04/30",
},
}
with app.test_request_context(
"/bot/telegram/webhook", method="POST", json=payload
):
bot.telegram_webhook()
assert edited
assert sent == []
def test_webhook_menu_callback_does_not_duplicate_on_message_not_found(monkeypatch):
from routes import openclaw_bot_routes as bot
sent = []
def fake_edit_message_text(chat_id, message_id, text, keyboard=None, parse_mode="Markdown"):
return {
"ok": False,
"error_code": 404,
"description": "Bad Request: message to edit not found",
}
def fake_send_message(chat_id, text, reply_to=None, keyboard=None, parse_mode="Markdown", **_kwargs):
sent.append((chat_id, text, reply_to, keyboard, parse_mode))
return {"ok": True}
monkeypatch.setattr(bot, "edit_message_text", fake_edit_message_text)
monkeypatch.setattr(bot, "send_message", fake_send_message)
monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True)
monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None)
monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None)
app = _build_request_app()
payload = {
"update_id": 10006,
"callback_query": {
"id": "cb5",
"from": {"id": 777},
"message": {"message_id": 222, "chat": {"id": -200, "type": "supergroup"}},
"data": "menu:main",
},
}
with app.test_request_context(
"/bot/telegram/webhook", method="POST", json=payload
):
bot.telegram_webhook()
assert sent == []
def test_webhook_cmd_callback_does_not_duplicate_on_message_not_found(monkeypatch):
from routes import openclaw_bot_routes as bot
sent = []
def fake_edit_message_text(chat_id, message_id, text, keyboard=None, parse_mode="Markdown"):
return {
"ok": False,
"error_code": 404,
"description": "Bad Request: MESSAGE_ID_INVALID",
}
def fake_handle_cmd(cmd, arg, chat_id, reply_to):
bot.send_message(chat_id, f"{cmd}:{arg}", reply_to=reply_to)
def fake_send_message(chat_id, text, reply_to=None, keyboard=None, parse_mode="Markdown", **_kwargs):
sent.append((chat_id, text, reply_to, keyboard, parse_mode))
return {"ok": True}
monkeypatch.setattr(bot, "edit_message_text", fake_edit_message_text)
monkeypatch.setattr(bot, "send_message", fake_send_message)
monkeypatch.setattr(bot, "handle_cmd", fake_handle_cmd)
monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True)
monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None)
monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None)
app = _build_request_app()
payload = {
"update_id": 10007,
"callback_query": {
"id": "cb6",
"from": {"id": 777},
"message": {"message_id": 333, "chat": {"id": -200, "type": "supergroup"}},
"data": "cmd:sales:2026/04/30",
},
}
with app.test_request_context(
"/bot/telegram/webhook", method="POST", json=payload
):
bot.telegram_webhook()
assert sent == []
def test_webhook_callback_dedup_key_without_update_id(monkeypatch):
from routes import openclaw_bot_routes as bot
calls = []
# 當 callback 沒有 update_id 時,第二次同樣 payload 要直接被 dedupe
# 但第一次仍應可正常執行一次。
from services import telegram_update_guard as guard
guard._seen_update_ids.clear()
guard._seen_update_id_set.clear()
def fake_handle_cmd(cmd, arg, chat_id, reply_to):
calls.append((cmd, arg, chat_id, reply_to))
monkeypatch.setattr(bot, "handle_cmd", fake_handle_cmd)
monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True)
monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None)
monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None)
app = _build_request_app()
payload = {
"callback_query": {
"id": "cb-no-id",
"from": {"id": 777},
"message": {"message_id": 123, "chat": {"id": -200, "type": "supergroup"}},
"data": "cmd:sales:2026/04/30",
},
}
with app.test_request_context("/bot/telegram/webhook", method="POST", json=payload):
bot.telegram_webhook()
with app.test_request_context("/bot/telegram/webhook", method="POST", json=payload):
bot.telegram_webhook()
assert calls == [('sales', '2026/04/30', -200, 123)]
def test_webhook_callback_dedup_key_varies_by_message_id(monkeypatch):
from routes import openclaw_bot_routes as bot
calls = []
from services import telegram_update_guard as guard
guard._seen_update_ids.clear()
guard._seen_update_id_set.clear()
def fake_handle_cmd(cmd, arg, chat_id, reply_to):
calls.append((cmd, arg, chat_id, reply_to))
monkeypatch.setattr(bot, "handle_cmd", fake_handle_cmd)
monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True)
monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None)
monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None)
app = _build_request_app()
payload_1 = {
"callback_query": {
"id": "cb-no-id",
"from": {"id": 777},
"message": {"message_id": 201, "chat": {"id": -200, "type": "supergroup"}},
"data": "cmd:sales:2026/04/30",
},
}
payload_2 = {
"callback_query": {
"id": "cb-no-id",
"from": {"id": 777},
"message": {"message_id": 202, "chat": {"id": -200, "type": "supergroup"}},
"data": "cmd:sales:2026/04/30",
},
}
with app.test_request_context("/bot/telegram/webhook", method="POST", json=payload_1):
bot.telegram_webhook()
with app.test_request_context("/bot/telegram/webhook", method="POST", json=payload_2):
bot.telegram_webhook()
assert calls == [
('sales', '2026/04/30', -200, 201),
('sales', '2026/04/30', -200, 202),
]
def test_webhook_callback_dedup_with_same_callback_query_id_different_update_id(monkeypatch):
from routes import openclaw_bot_routes as bot
calls = []
from services import telegram_update_guard as guard
guard._seen_update_ids.clear()
guard._seen_update_id_set.clear()
def fake_handle_cmd(cmd, arg, chat_id, reply_to):
calls.append((cmd, arg, chat_id, reply_to))
monkeypatch.setattr(bot, "handle_cmd", fake_handle_cmd)
monkeypatch.setattr(bot, "_is_authorized", lambda _chat_type, _chat_id, _uid: True)
monkeypatch.setattr(bot, "answer_callback", lambda _cq_id, text="": None)
monkeypatch.setattr(bot, "send_typing", lambda _chat_id: None)
app = _build_request_app()
payload_1 = {
"update_id": 30001,
"callback_query": {
"id": "cb-repeat",
"from": {"id": 777},
"message": {"message_id": 301, "chat": {"id": -200, "type": "supergroup"}},
"data": "cmd:sales:2026/04/30",
},
}
payload_2 = {
"update_id": 30002,
"callback_query": {
"id": "cb-repeat",
"from": {"id": 777},
"message": {"message_id": 301, "chat": {"id": -200, "type": "supergroup"}},
"data": "cmd:sales:2026/04/30",
},
}
with app.test_request_context("/bot/telegram/webhook", method="POST", json=payload_1):
bot.telegram_webhook()
with app.test_request_context("/bot/telegram/webhook", method="POST", json=payload_2):
bot.telegram_webhook()
assert calls == [('sales', '2026/04/30', -200, 301)]

View File

@@ -1,4 +1,5 @@
from pathlib import Path
import importlib
class FakeResponse:
@@ -86,3 +87,15 @@ def test_openclaw_routes_keep_tg_helper_import_for_webhook_management():
assert "_tg('setMyCommands'" in route_source
assert "_tg('setWebhook'" in route_source
assert " _tg,\n" in route_source
def test_openclaw_telegram_api_falls_back_to_shared_bot_token(monkeypatch):
monkeypatch.delenv("OPENCLAW_BOT_TOKEN", raising=False)
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "shared-token")
from services.openclaw_bot import telegram_api
reloaded = importlib.reload(telegram_api)
assert reloaded.BOT_TOKEN == "shared-token"
assert reloaded.BOT_API_URL == "https://api.telegram.org/botshared-token"

View File

@@ -0,0 +1,26 @@
from time import time
def test_update_guard_detects_duplicate_key():
from services import telegram_update_guard as guard
# 使用可變動 key避免被歷史資料干擾
unique = f"unit-test-{int(time() * 1000)}"
# 清掉本機快取,避免測試順序影響
guard._seen_update_ids.clear()
guard._seen_update_id_set.clear()
assert guard.is_duplicate_update(unique, namespace="pytest") is False
assert guard.is_duplicate_update(unique, namespace="pytest") is True
def test_update_guard_separates_namespace():
from services import telegram_update_guard as guard
guard._seen_update_ids.clear()
guard._seen_update_id_set.clear()
event_id = f"namespace-check-{int(time() * 1000)}"
assert guard.is_duplicate_update(event_id, namespace="a") is False
assert guard.is_duplicate_update(event_id, namespace="b") is False

View File

@@ -0,0 +1,172 @@
import asyncio
import pytest
from types import SimpleNamespace
from services import telegram_bot_service
pytestmark = pytest.mark.skipif(
not telegram_bot_service.TELEGRAM_AVAILABLE,
reason="python-telegram-bot 未安裝",
)
def _make_polling_update(query):
"""建立最小的 polling callback 更新結構。"""
return SimpleNamespace(callback_query=query)
class _FakeMessage:
def __init__(self, chat_id=-200, message_id=1):
self.chat_id = chat_id
self.message_id = message_id
self.replies = []
async def reply_text(self, text, **kwargs):
self.replies.append((text, kwargs))
class _FakeQuery:
def __init__(self, query_id, data, message):
self.id = query_id
self.data = data
self.message = message
self.answers = 0
async def answer(self):
self.answers += 1
async def edit_message_text(self, *args, **kwargs):
return {"ok": True}
def _run(coro):
return asyncio.run(coro)
def test_polling_callback_dedup_without_update_id(monkeypatch):
from services.telegram_bot_service import TrendTelegramBot
seen = {}
def fake_dedupe(_key, namespace="telegram_update"):
if _key in seen:
return True
seen[_key] = True
return False
bot = TrendTelegramBot(token="dummy")
called = []
async def fake_openclaw_callback(*args):
called.append(args)
monkeypatch.setattr(telegram_bot_service, "is_global_duplicate_update", fake_dedupe)
bot._handle_openclaw_callback = fake_openclaw_callback
context = SimpleNamespace(user_data={})
query = _FakeQuery("cb-no-id", "cmd:sales:2026/04/30", _FakeMessage(message_id=123))
_run(bot.handle_callback(_make_polling_update(query), context))
_run(bot.handle_callback(_make_polling_update(query), context))
assert len(called) == 1
assert called[0][2] == "cmd:sales:2026/04/30"
def test_polling_callback_dedup_depends_on_message_id(monkeypatch):
from services.telegram_bot_service import TrendTelegramBot
seen = {}
def fake_dedupe(_key, namespace="telegram_update"):
if _key in seen:
return True
seen[_key] = True
return False
bot = TrendTelegramBot(token="dummy")
called = []
async def fake_openclaw_callback(*args):
called.append(args)
monkeypatch.setattr(telegram_bot_service, "is_global_duplicate_update", fake_dedupe)
bot._handle_openclaw_callback = fake_openclaw_callback
context = SimpleNamespace(user_data={})
q1 = _FakeQuery("cb-no-id", "cmd:sales:2026/04/30", _FakeMessage(message_id=201))
q2 = _FakeQuery("cb-no-id", "cmd:sales:2026/04/30", _FakeMessage(message_id=202))
_run(bot.handle_callback(_make_polling_update(q1), context))
_run(bot.handle_callback(_make_polling_update(q2), context))
assert called == [
(q1, context, "cmd:sales:2026/04/30"),
(q2, context, "cmd:sales:2026/04/30"),
]
def test_polling_callback_dedup_with_same_query_id_different_update_id(monkeypatch):
from services.telegram_bot_service import TrendTelegramBot
seen = {}
def fake_dedupe(_key, namespace="telegram_update"):
if _key in seen:
return True
seen[_key] = True
return False
bot = TrendTelegramBot(token="dummy")
called = []
async def fake_openclaw_callback(*args):
called.append(args)
monkeypatch.setattr(telegram_bot_service, "is_global_duplicate_update", fake_dedupe)
bot._handle_openclaw_callback = fake_openclaw_callback
context = SimpleNamespace(user_data={})
q1 = _FakeQuery("cb-repeat", "cmd:sales:2026/04/30", _FakeMessage(message_id=301))
q2 = _FakeQuery("cb-repeat", "cmd:sales:2026/04/30", _FakeMessage(message_id=301))
# 為了模擬 update_id 不同,帶入不同的 update 物件
u1 = SimpleNamespace(callback_query=q1, effective_user=SimpleNamespace(id=777), update_id=30001)
u2 = SimpleNamespace(callback_query=q2, effective_user=SimpleNamespace(id=777), update_id=30002)
_run(bot.handle_callback(u1, context))
_run(bot.handle_callback(u2, context))
assert called == [(q1, context, "cmd:sales:2026/04/30")]
def test_polling_callback_normalizes_legacy_menu_prefix(monkeypatch):
from services.telegram_bot_service import TrendTelegramBot
seen = {}
def fake_dedupe(_key, namespace="telegram_update"):
if _key in seen:
return True
seen[_key] = True
return False
bot = TrendTelegramBot(token="dummy")
normalized = []
async def fake_openclaw_callback(query, context, data):
normalized.append(data)
monkeypatch.setattr(telegram_bot_service, "is_global_duplicate_update", fake_dedupe)
bot._handle_openclaw_callback = fake_openclaw_callback
context = SimpleNamespace(user_data={})
query = _FakeQuery("cb-menu", "menu_main", _FakeMessage(message_id=321))
_run(bot.handle_callback(_make_polling_update(query), context))
assert normalized == ["menu:main"]