Files
ewoooc/services/openclaw_bot/telegram_api.py
OoO 1a886d962b
All checks were successful
CD Pipeline / deploy (push) Successful in 8m50s
fix(telegram): dedupe webhook+polling updates via shared DB guard
Webhook (Flask) and polling (momo-telegram-bot) consumed the same
Telegram update_id, causing /menu callbacks to fire twice. Add a
shared dedup module backed by telegram_update_dedup table (300s TTL,
60s cleanup) with in-memory fallback, wired into both paths.

Polling launcher now skips startup when webhook is configured to
prevent dual-consumption at the source.

38 tests across webhook, menu keyboards, telegram_api, dedup guard,
and trend bot service.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 12:01:04 +08:00

172 lines
6.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Telegram Bot API helpers for OpenClaw Bot.
Route modules should keep only request handling and command dispatch. Telegram
delivery, Markdown fallback, and file upload glue live here so they can be
reused and tested without importing the 5k-line Blueprint.
"""
from __future__ import annotations
import os
import re
import requests
from services.logger_manager import SystemLogger
# Token is resolved once at import time: OPENCLAW_BOT_TOKEN wins when it is set
# and non-empty, otherwise TELEGRAM_BOT_TOKEN is used (empty string if unset).
BOT_TOKEN = os.getenv("OPENCLAW_BOT_TOKEN") or os.getenv("TELEGRAM_BOT_TOKEN", "")
# Base URL for every Bot API call in this module; the method name is appended
# as a path segment. Note that the token is part of the URL.
BOT_API_URL = f"https://api.telegram.org/bot{BOT_TOKEN}"
# Shared logger for this module, obtained from the project's SystemLogger.
sys_log = SystemLogger("OpenClawBot").get_logger()
def _tg(method: str, payload: dict):
    """POST *payload* as JSON to the Bot API *method* and return the decoded reply.

    A reply with an error status is logged (first 200 characters of the body),
    but its decoded JSON is still returned so callers can inspect it. Any
    ``Exception`` raised along the way (network failure, timeout, undecodable
    body) is logged and converted into an empty dict.

    Args:
        method: Bot API method name appended to ``BOT_API_URL``.
        payload: JSON-serialisable request body.

    Returns:
        The decoded JSON reply, or ``{}`` when the call raised.
    """
    try:
        response = requests.post(f"{BOT_API_URL}/{method}", json=payload, timeout=10)
        if not response.ok:
            sys_log.warning(f"[OpenClawBot] {method} failed: {response.text[:200]}")
        return response.json()
    except Exception as exc:
        # Connection-level exception messages can embed the request URL, and
        # the URL carries the bot token -- scrub it before it reaches the logs.
        detail = str(exc)
        if BOT_TOKEN:
            detail = detail.replace(BOT_TOKEN, "<redacted>")
        sys_log.error(f"[OpenClawBot] {method} error: {detail}")
        return {}
def _strip_markdown(text: str) -> str:
"""移除 Telegram Markdown v1 格式符號轉為純文字send fallback 用)"""
text = re.sub(r"\*([^*]+)\*", r"\1", text)
text = re.sub(r"_([^_]+)_", r"\1", text)
text = re.sub(r"`([^`]+)`", r"\1", text)
text = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", text)
return text
def send_message(chat_id, text, reply_to=None, keyboard=None, parse_mode="Markdown"):
    """Send a Telegram message, degrading step by step when delivery fails.

    Attempts, in order:
      1. *text* with the requested *parse_mode*.
      2. If a parse mode was requested and the error description mentions a
         parsing/entity problem: the Markdown-stripped text with no parse mode.
      3. If *text* is longer than 4000 characters: the first 3900 characters,
         Markdown-stripped, plus a truncation notice, with no parse mode.

    Args:
        chat_id: Target chat identifier.
        text: Message body.
        reply_to: Optional message id to reply to.
        keyboard: Optional inline keyboard rows.
        parse_mode: Parse mode for the first attempt; falsy disables it.

    Returns:
        The reply of the first successful attempt, the reply of the truncated
        attempt when that step runs, otherwise the reply of the first attempt.
    """

    def _payload_for(body, mode):
        # Optional fields are added only when their value is truthy.
        optional = (
            ("parse_mode", mode),
            ("reply_to_message_id", reply_to),
            ("reply_markup", {"inline_keyboard": keyboard} if keyboard else None),
        )
        message = {"chat_id": chat_id, "text": body, "disable_web_page_preview": True}
        message.update({key: value for key, value in optional if value})
        return message

    first_attempt = _tg("sendMessage", _payload_for(text, parse_mode))
    if first_attempt.get("ok"):
        return first_attempt

    if parse_mode:
        reason = first_attempt.get("description", "").lower()
        if any(marker in reason for marker in ("parse", "entity", "can't find")):
            sys_log.warning("[OpenClawBot] Markdown parse failed, retrying as plain text")
            plain_attempt = _tg("sendMessage", _payload_for(_strip_markdown(text), None))
            if plain_attempt.get("ok"):
                return plain_attempt

    if len(text) <= 4000:
        return first_attempt

    sys_log.warning(f"[OpenClawBot] Message too long ({len(text)}), truncating")
    shortened = _strip_markdown(text[:3900]) + "\n...(訊息過長已截斷)"
    return _tg("sendMessage", _payload_for(shortened, None))
def _build_edit_payload(chat_id, message_id, text, pm, keyboard):
payload = {"chat_id": chat_id, "message_id": message_id, "text": text}
if pm:
payload["parse_mode"] = pm
if keyboard:
payload["reply_markup"] = {"inline_keyboard": keyboard}
return payload
def edit_message_text(chat_id, message_id, text, keyboard=None, parse_mode="Markdown"):
    """Edit an existing message, degrading step by step when the edit fails.

    Attempts, in order:
      1. *text* with the requested *parse_mode*.
      2. If a parse mode was requested and the error description mentions a
         parsing/entity problem: the Markdown-stripped text with no parse mode.
      3. If *text* is longer than 4000 characters: the first 3900 characters,
         Markdown-stripped, plus a truncation notice, with no parse mode.

    Args:
        chat_id: Chat that owns the message.
        message_id: Identifier of the message to edit.
        text: New message body.
        keyboard: Optional inline keyboard rows, sent with every attempt.
        parse_mode: Parse mode for the first attempt; falsy disables it.

    Returns:
        The reply of the first successful attempt, the reply of the truncated
        attempt when that step runs, otherwise the reply of the first attempt.
    """

    def _attempt(body, mode):
        return _tg(
            "editMessageText",
            _build_edit_payload(chat_id, message_id, body, mode, keyboard),
        )

    first_attempt = _attempt(text, parse_mode)
    if first_attempt.get("ok"):
        return first_attempt

    if parse_mode:
        reason = first_attempt.get("description", "").lower()
        if any(marker in reason for marker in ("parse", "entity", "can't find")):
            sys_log.warning("[OpenClawBot] editMessageText Markdown failed, retrying plain text")
            plain_attempt = _attempt(_strip_markdown(text), None)
            if plain_attempt.get("ok"):
                return plain_attempt

    if len(text) <= 4000:
        return first_attempt

    sys_log.warning(f"[OpenClawBot] Message too long ({len(text)}), truncating")
    shortened = _strip_markdown(text[:3900]) + "\n...(訊息過長已截斷)"
    return _attempt(shortened, None)
def answer_callback(cq_id, text=""):
    """Call ``answerCallbackQuery`` for *cq_id* with *text* and return the reply."""
    acknowledgement = {"callback_query_id": cq_id, "text": text}
    return _tg("answerCallbackQuery", acknowledgement)
def send_typing(chat_id):
    """Best-effort ``sendChatAction`` call with action ``typing`` for *chat_id*.

    The HTTP response is ignored. A failure is only logged at debug level and
    never propagates; the function returns ``None`` either way.
    """
    try:
        requests.post(
            f"{BOT_API_URL}/sendChatAction",
            json={"chat_id": chat_id, "action": "typing"},
            timeout=5,
        )
    except Exception as exc:
        # Connection-level exception messages can embed the request URL, and
        # the URL carries the bot token -- scrub it even for debug output.
        detail = str(exc)
        if BOT_TOKEN:
            detail = detail.replace(BOT_TOKEN, "<redacted>")
        sys_log.debug(f"[OpenClawBot] sendChatAction skipped: {detail}")
def _send_file(method, field, chat_id, file_path, caption, reply_to):
    """Upload the file at *file_path* through the Bot API *method*.

    The file is sent as multipart form data under *field*; ``caption`` and
    ``reply_to_message_id`` are added only when truthy. Any ``Exception``
    (including a failure to open the file) is logged and turned into ``{}``.
    """
    try:
        form = {"chat_id": chat_id}
        if caption:
            form["caption"] = caption
        if reply_to:
            form["reply_to_message_id"] = reply_to
        with open(file_path, "rb") as handle:
            response = requests.post(
                f"{BOT_API_URL}/{method}",
                data=form,
                files={field: handle},
                timeout=30,
            )
        if not response.ok:
            sys_log.warning(f"[OpenClawBot] {method} failed: {response.text[:200]}")
        return response.json()
    except Exception as exc:
        # Connection-level exception messages can embed the request URL, and
        # the URL carries the bot token -- scrub it before it reaches the logs.
        detail = str(exc)
        if BOT_TOKEN:
            detail = detail.replace(BOT_TOKEN, "<redacted>")
        sys_log.error(f"[OpenClawBot] {method} error: {detail}")
        return {}


def send_photo(chat_id, file_path, caption="", reply_to=None):
    """Send the image at *file_path* to *chat_id* via ``sendPhoto``.

    Args:
        chat_id: Target chat identifier.
        file_path: Path of the image file to upload.
        caption: Optional caption; omitted from the request when empty.
        reply_to: Optional message id to reply to.

    Returns:
        The decoded JSON reply, or ``{}`` when the upload raised.
    """
    return _send_file("sendPhoto", "photo", chat_id, file_path, caption, reply_to)


def send_document(chat_id, file_path, caption="", reply_to=None):
    """Send the file at *file_path* to *chat_id* via ``sendDocument``.

    Args:
        chat_id: Target chat identifier.
        file_path: Path of the file to upload.
        caption: Optional caption; omitted from the request when empty.
        reply_to: Optional message id to reply to.

    Returns:
        The decoded JSON reply, or ``{}`` when the upload raised.
    """
    return _send_file("sendDocument", "document", chat_id, file_path, caption, reply_to)