223 lines
6.5 KiB
Python
223 lines
6.5 KiB
Python
import asyncio
|
|
|
|
import pytest
|
|
|
|
from types import SimpleNamespace
|
|
|
|
from services import telegram_bot_service
|
|
|
|
|
|
# Module-level pytest mark: skip the tests in this module when the service
# module reports TELEGRAM_AVAILABLE as falsy (the reason string says that
# python-telegram-bot is not installed).
pytestmark = pytest.mark.skipif(
    not telegram_bot_service.TELEGRAM_AVAILABLE,
    reason="python-telegram-bot 未安裝",
)
|
|
|
|
|
|
def _make_polling_update(query, **fields):
    """Build a minimal stand-in for a polling callback update.

    Args:
        query: Object exposed as the update's ``callback_query``.
        **fields: Optional extra attributes to set on the update, for
            example ``update_id`` or ``effective_user``. When omitted, the
            update carries only ``callback_query``.

    Returns:
        A ``SimpleNamespace`` holding ``callback_query`` plus any extra
        fields that were supplied.
    """
    return SimpleNamespace(callback_query=query, **fields)
|
|
|
|
|
|
class _FakeMessage:
    """In-memory message double that records the calls made on it."""

    def __init__(self, chat_id=-200, message_id=1):
        # Identifiers come from the constructor; both call logs start empty.
        self.chat_id, self.message_id = chat_id, message_id
        self.replies, self.edits = [], []

    async def reply_text(self, text, **kwargs):
        """Log a text reply as a ``(text, kwargs)`` pair in ``replies``."""
        entry = (text, kwargs)
        self.replies.append(entry)

    async def reply_chat_action(self, action=None, **kwargs):
        """Log a chat action as ``("typing", action, kwargs)`` in ``edits``."""
        entry = ("typing", action, kwargs)
        self.edits.append(entry)
|
|
|
|
|
|
class _FakeQuery:
    """Minimal callback-query double used to drive ``handle_callback``.

    Attributes are set from the constructor: ``id`` (query id), ``data``
    (callback payload) and ``message`` (the message the query belongs to).
    ``answers`` counts how many times ``answer`` was awaited.
    """

    def __init__(self, query_id, data, message):
        self.id = query_id
        self.data = data
        self.message = message
        self.answers = 0

    async def answer(self, *args, **kwargs):
        """Count an acknowledgement of the query.

        Any positional or keyword arguments are accepted and ignored, so a
        handler that passes optional answer arguments (such as a text) does
        not fail with a ``TypeError`` unrelated to what the test checks.
        """
        self.answers += 1

    async def edit_message_text(self, *args, **kwargs):
        """Accept any arguments and return a fixed ``{"ok": True}`` result."""
        return {"ok": True}
|
|
|
|
|
|
def _run(coro):
    """Drive ``coro`` to completion with ``asyncio.run`` and return its result."""
    outcome = asyncio.run(coro)
    return outcome
|
|
|
|
|
|
def test_default_category_keyboard_uses_handled_trend_prefix():
    """The category keyboard emits ``trend_`` callback data and no ``cat_`` data."""
    from services.telegram_bot_service import CATEGORIES, TrendTelegramBot

    bot = TrendTelegramBot(token="dummy")
    keyboard = bot._get_category_keyboard()

    # Flatten every row of buttons into one list of callback payloads.
    callback_values = []
    for row in keyboard.inline_keyboard:
        callback_values.extend(button.callback_data for button in row)

    expected_first = f"trend_{CATEGORIES[0]}"
    assert expected_first in callback_values
    assert all(not value.startswith("cat_") for value in callback_values)
|
|
|
|
|
|
def test_polling_callback_dedup_without_update_id(monkeypatch):
    """Handling the same callback query twice dispatches the handler once.

    The update objects used here carry only ``callback_query`` and no
    ``update_id``.
    """
    from services.telegram_bot_service import TrendTelegramBot

    payload = "cmd:sales:2026/04/30"
    seen_keys = set()

    def fake_dedupe(_key, namespace="telegram_update"):
        # The first sighting of a key is not a duplicate; later ones are.
        already_seen = _key in seen_keys
        seen_keys.add(_key)
        return already_seen

    bot = TrendTelegramBot(token="dummy")
    dispatched = []

    async def fake_openclaw_callback(*args):
        dispatched.append(args)

    monkeypatch.setattr(telegram_bot_service, "is_global_duplicate_update", fake_dedupe)
    bot._handle_openclaw_callback = fake_openclaw_callback

    context = SimpleNamespace(user_data={})
    query = _FakeQuery("cb-no-id", payload, _FakeMessage(message_id=123))

    # Deliver the identical query twice, each time in a fresh update object.
    for _ in range(2):
        _run(bot.handle_callback(_make_polling_update(query), context))

    assert len(dispatched) == 1
    assert dispatched[0][2] == payload
|
|
|
|
|
|
def test_polling_callback_dedup_depends_on_message_id(monkeypatch):
    """Queries sharing an id and payload but not a message id are both dispatched."""
    from services.telegram_bot_service import TrendTelegramBot

    payload = "cmd:sales:2026/04/30"
    seen_keys = set()

    def fake_dedupe(_key, namespace="telegram_update"):
        # The first sighting of a key is not a duplicate; later ones are.
        already_seen = _key in seen_keys
        seen_keys.add(_key)
        return already_seen

    bot = TrendTelegramBot(token="dummy")
    dispatched = []

    async def fake_openclaw_callback(*args):
        dispatched.append(args)

    monkeypatch.setattr(telegram_bot_service, "is_global_duplicate_update", fake_dedupe)
    bot._handle_openclaw_callback = fake_openclaw_callback

    context = SimpleNamespace(user_data={})
    first = _FakeQuery("cb-no-id", payload, _FakeMessage(message_id=201))
    second = _FakeQuery("cb-no-id", payload, _FakeMessage(message_id=202))

    for pending in (first, second):
        _run(bot.handle_callback(_make_polling_update(pending), context))

    expected = [(first, context, payload), (second, context, payload)]
    assert dispatched == expected
|
|
|
|
|
|
def test_polling_callback_dedup_with_same_query_id_different_update_id(monkeypatch):
    """Only the first of two updates is dispatched when just ``update_id`` differs.

    Both updates carry queries with the same id, payload and message id.
    """
    from services.telegram_bot_service import TrendTelegramBot

    payload = "cmd:sales:2026/04/30"
    seen_keys = set()

    def fake_dedupe(_key, namespace="telegram_update"):
        # The first sighting of a key is not a duplicate; later ones are.
        already_seen = _key in seen_keys
        seen_keys.add(_key)
        return already_seen

    bot = TrendTelegramBot(token="dummy")
    dispatched = []

    async def fake_openclaw_callback(*args):
        dispatched.append(args)

    monkeypatch.setattr(telegram_bot_service, "is_global_duplicate_update", fake_dedupe)
    bot._handle_openclaw_callback = fake_openclaw_callback

    context = SimpleNamespace(user_data={})
    first = _FakeQuery("cb-repeat", payload, _FakeMessage(message_id=301))
    second = _FakeQuery("cb-repeat", payload, _FakeMessage(message_id=301))

    # To simulate differing update_ids, wrap each query in its own update object.
    first_update = SimpleNamespace(
        callback_query=first,
        effective_user=SimpleNamespace(id=777),
        update_id=30001,
    )
    second_update = SimpleNamespace(
        callback_query=second,
        effective_user=SimpleNamespace(id=777),
        update_id=30002,
    )

    for incoming in (first_update, second_update):
        _run(bot.handle_callback(incoming, context))

    assert dispatched == [(first, context, payload)]
|
|
|
|
|
|
def test_polling_callback_normalizes_legacy_menu_prefix(monkeypatch):
    """Callback data ``menu_main`` reaches the handler as ``menu:main``."""
    from services.telegram_bot_service import TrendTelegramBot

    seen_keys = set()

    def fake_dedupe(_key, namespace="telegram_update"):
        # The first sighting of a key is not a duplicate; later ones are.
        already_seen = _key in seen_keys
        seen_keys.add(_key)
        return already_seen

    bot = TrendTelegramBot(token="dummy")
    received = []

    async def fake_openclaw_callback(query, context, data):
        # Only the payload that reaches the handler matters for this test.
        received.append(data)

    monkeypatch.setattr(telegram_bot_service, "is_global_duplicate_update", fake_dedupe)
    bot._handle_openclaw_callback = fake_openclaw_callback

    context = SimpleNamespace(user_data={})
    legacy_query = _FakeQuery("cb-menu", "menu_main", _FakeMessage(message_id=321))
    update = _make_polling_update(legacy_query)

    _run(bot.handle_callback(update, context))

    assert received == ["menu:main"]
|
|
|
|
|
|
def test_polling_cmd_callback_uses_callback_context(monkeypatch):
    """A ``cmd:`` callback calls ``handle_cmd`` with the callback flag set.

    The patched ``handle_cmd`` records its arguments together with the value
    of ``_CMD_FROM_CALLBACK_CTX`` at call time; the expected record is
    ``("sales", "2026/04/30", -200, 401, True)``.
    """
    from services.telegram_bot_service import TrendTelegramBot
    from routes import openclaw_bot_routes as openclaw

    seen_keys = set()

    def fake_dedupe(_key, namespace="telegram_update"):
        # The first sighting of a key is not a duplicate; later ones are.
        already_seen = _key in seen_keys
        seen_keys.add(_key)
        return already_seen

    recorded = []

    def fake_handle_cmd(cmd, arg, chat_id, reply_to):
        # Capture the context variable while the call is in progress.
        from_callback = openclaw._CMD_FROM_CALLBACK_CTX.get()
        recorded.append((cmd, arg, chat_id, reply_to, from_callback))
        return None

    bot = TrendTelegramBot(token="dummy")

    context = SimpleNamespace(user_data={})
    message = _FakeMessage(message_id=401)
    query = _FakeQuery("cb-context", "cmd:sales:2026/04/30", message)

    monkeypatch.setattr(telegram_bot_service, "is_global_duplicate_update", fake_dedupe)
    monkeypatch.setattr(openclaw, "handle_cmd", fake_handle_cmd)

    _run(bot.handle_callback(_make_polling_update(query), context))

    expected = ("sales", "2026/04/30", -200, 401, True)
    assert recorded == [expected]
|