Files
ewoooc/tests/test_bot_api_telegram_delivery.py
OoO 4d62731730
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
[V10.322] 修正 Telegram 決策審核推播入口
2026-05-20 12:35:12 +08:00

134 lines
4.1 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Bot API Telegram delivery contract tests."""
from flask import Flask
def test_price_decision_notify_uses_central_telegram_sender(monkeypatch):
    """Notify endpoint must deliver through the shared Telegram sender.

    The database manager is replaced with a stub whose every query returns
    the rows ("101",) and ("202",), and the sender in
    ``services.telegram_templates`` is replaced with a recorder. The test
    then checks the recipients, the parse mode, the first inline button's
    callback data, and that markup in the submitted reason is neutralised
    in the outgoing text.
    """
    from routes import bot_api_routes

    api_token = "test-api-token"
    captured = {}

    class AdminRows:
        """Query result stub holding two single-column rows."""

        def fetchall(self):
            return [("101",), ("202",)]

    class StubConnection:
        """Context-manager connection that answers any statement."""

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            # Falsy return value: exceptions from the with-body propagate.
            return False

        def execute(self, _statement):
            return AdminRows()

    class StubEngine:
        def connect(self):
            return StubConnection()

    class StubDatabaseManager:
        engine = StubEngine()

    def make_database_manager():
        return StubDatabaseManager()

    def record_send(text, chat_ids=None, reply_markup=None, parse_mode=None):
        """Remember the call arguments and report every recipient as sent."""
        captured.update(
            text=text,
            chat_ids=chat_ids,
            reply_markup=reply_markup,
            parse_mode=parse_mode,
        )
        recipients = chat_ids or []
        return {"ok": True, "sent": len(recipients), "failed": 0, "errors": []}

    monkeypatch.setattr(bot_api_routes, "BOT_API_TOKEN", api_token)
    monkeypatch.setattr(bot_api_routes, "DatabaseManager", make_database_manager)
    monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "test-telegram-token")
    monkeypatch.setattr(
        "services.telegram_templates.send_telegram_with_result",
        record_send,
    )

    app = Flask(__name__)
    app.register_blueprint(bot_api_routes.bot_api_bp)

    # SKU, name, reason and URL all carry HTML-significant characters on
    # purpose so the escaping assertions below have something to bite on.
    payload = {
        "product_sku": "SKU<001>",
        "product_name": "精華 <script>",
        "current_price": 1200,
        "suggested_price": 990,
        "reason": "第一行<br>第二行<script>alert(1)</script>",
        "insight_id": 42,
        "report_url": "https://mo.wooo.work/report?a=1&b=<x>",
    }
    auth_headers = {"X-API-Token": api_token}

    response = app.test_client().post(
        "/bot/api/price-decision/notify",
        headers=auth_headers,
        json=payload,
    )
    data = response.get_json()

    assert response.status_code == 200
    assert data["success"] is True
    assert data["sent"] == 2

    assert captured["chat_ids"] == ["101", "202"]
    assert captured["parse_mode"] == "HTML"

    first_button = captured["reply_markup"]["inline_keyboard"][0][0]
    assert first_button["callback_data"] == "momo:pa:42"

    message = captured["text"]
    assert "<br" not in message.lower()
    assert "<script>" not in message
    assert "第一行\n第二行&lt;script&gt;alert(1)&lt;/script&gt;" in message
def test_price_decision_notify_does_not_fallback_when_no_admins(monkeypatch):
    """Notify endpoint must not send anything when the query yields no rows.

    The database manager is replaced with a stub whose every query returns
    an empty list, and the sender in ``services.telegram_templates`` is
    replaced with a function that raises if it is ever invoked. The
    response must still be a success reporting zero sends, zero admins and
    no errors.
    """
    from routes import bot_api_routes

    api_token = "test-api-token"

    class NoRows:
        """Query result stub with nothing in it."""

        def fetchall(self):
            return []

    class StubConnection:
        """Context-manager connection that answers any statement."""

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            # Falsy return value: exceptions from the with-body propagate.
            return False

        def execute(self, _statement):
            return NoRows()

    class StubEngine:
        def connect(self):
            return StubConnection()

    class StubDatabaseManager:
        engine = StubEngine()

    def make_database_manager():
        return StubDatabaseManager()

    def fail_send(*_args, **_kwargs):
        """Tripwire: any attempt to send is a test failure."""
        raise AssertionError("empty admin list must not fall back to default chat")

    monkeypatch.setattr(bot_api_routes, "BOT_API_TOKEN", api_token)
    monkeypatch.setattr(bot_api_routes, "DatabaseManager", make_database_manager)
    monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "test-telegram-token")
    monkeypatch.setattr(
        "services.telegram_templates.send_telegram_with_result",
        fail_send,
    )

    app = Flask(__name__)
    app.register_blueprint(bot_api_routes.bot_api_bp)

    payload = {
        "product_sku": "SKU001",
        "product_name": "精華",
        "current_price": 1200,
        "suggested_price": 990,
        "reason": "測試",
        "insight_id": 42,
    }
    auth_headers = {"X-API-Token": api_token}

    response = app.test_client().post(
        "/bot/api/price-decision/notify",
        headers=auth_headers,
        json=payload,
    )
    data = response.get_json()

    assert response.status_code == 200
    assert data["success"] is True
    assert data["sent"] == 0
    assert data["total_admins"] == 0
    assert data["errors"] == []