"""Shared Telegram update deduplication utilities.

Both webhook and polling paths should use this helper so duplicated Telegram
updates are ignored regardless of which consumer receives them first.
"""

from __future__ import annotations

from collections import deque
from datetime import datetime, timedelta
import inspect
from threading import Lock
import os
import sys

from sqlalchemy import text

from database.manager import DatabaseManager
from services.logger_manager import SystemLogger

sys_log = SystemLogger("TelegramUpdateDedup").get_logger()

# Maximum number of keys retained by the in-process fallback cache.
_SEEN_MAX = 500
# Rows in telegram_update_dedup older than this many seconds are purged.
_UPDATE_ID_TTL_SECONDS = 300
# Minimum interval (seconds) between purge passes over the dedup table.
_UPDATE_ID_DB_CLEANUP_EVERY_SECONDS = 60

# In-process cache: the deque keeps insertion order for FIFO eviction, the set
# gives O(1) membership checks. Both are guarded by _seen_update_lock.
_seen_update_ids: deque[str] = deque()
_seen_update_id_set: set[str] = set()
_seen_update_lock = Lock()

# Becomes True once the dedup table has been created successfully.
_dedup_table_ready = False
_dedup_table_lock = Lock()
# Epoch seconds of the last *committed* purge of expired dedup rows.
_last_cleanup_ts = 0.0

# Stage suffixes pytest appends to PYTEST_CURRENT_TEST, e.g. "... (call)".
_PYTEST_STAGE_SUFFIXES = ("setup)", "call)", "teardown)")


def _is_pytest_context() -> bool:
    """Return True when the current process appears to be running under pytest.

    Detection is heuristic: either ``PYTEST_CURRENT_TEST`` is set, or the
    basename of ``sys.argv[0]`` contains "pytest".
    """
    # sys.argv can be empty in embedded interpreters; avoid an IndexError.
    argv0 = sys.argv[0] if sys.argv else ""
    return bool(
        os.getenv("PYTEST_CURRENT_TEST")
        or "pytest" in os.path.basename(argv0 or "").lower()
    )


def _infer_pytest_scope() -> str | None:
    """Recover a ``<file>::<test function>`` scope from the call stack.

    Fallback for pytest runs where ``PYTEST_CURRENT_TEST`` has not been set.
    Returns the innermost frame whose function name starts with ``test_`` and
    whose file path contains ``test``, or None if no such frame exists.
    """
    # context=0: only function name and filename are needed, so skip loading
    # source lines for every frame on the stack.
    for frame_info in inspect.stack(context=0):
        if frame_info.function.startswith("test_") and "test" in frame_info.filename:
            return f"{os.path.basename(frame_info.filename)}::{frame_info.function}"
    return None


def _current_pytest_scope() -> str | None:
    """Return the identifier of the running pytest test, or None outside pytest.

    ``PYTEST_CURRENT_TEST`` has the form ``"<nodeid> (<stage>)"`` where stage
    is setup/call/teardown. The stage suffix is stripped so all phases of one
    test share a single dedup namespace. When the variable is absent but
    pytest is detected, the scope is inferred from the call stack.
    """
    scope = os.getenv("PYTEST_CURRENT_TEST")
    if scope:
        # rpartition keeps parametrized ids like "test_x[a (b)]" intact.
        node_id, sep, stage = scope.rpartition(" (")
        if sep and stage in _PYTEST_STAGE_SUFFIXES:
            return node_id
        return scope
    if _is_pytest_context():
        return _infer_pytest_scope()
    return None


def _normalize_update_id(update_id) -> str | None:
    """Normalize a Telegram update identifier into a stable string key.

    Values accepted by ``int()`` (e.g. ``42`` and ``"42"``) map to the same
    key; anything else falls back to ``str(update_id)``. Returns None when
    ``update_id`` is None.
    """
    if update_id is None:
        return None
    try:
        return str(int(update_id))
    except (TypeError, ValueError):
        return str(update_id)


def _ensure_update_dedup_table() -> bool:
    """Create the ``telegram_update_dedup`` table on first use.

    Returns True once the table is known to exist. On any DB error a warning
    is logged and False is returned so callers fall back to in-memory dedup;
    creation is retried on the next call.
    """
    global _dedup_table_ready
    # Unlocked fast path, re-checked under the lock (double-checked locking)
    # so concurrent callers don't all issue CREATE TABLE.
    if _dedup_table_ready:
        return True
    with _dedup_table_lock:
        if _dedup_table_ready:
            return True
        try:
            db = DatabaseManager()
            session = db.get_session()
            try:
                session.execute(
                    text(
                        """
                        CREATE TABLE IF NOT EXISTS telegram_update_dedup (
                            update_key TEXT PRIMARY KEY,
                            created_at TIMESTAMP NOT NULL
                        )
                        """
                    )
                )
                session.commit()
                _dedup_table_ready = True
                return True
            except Exception as exc:
                session.rollback()
                sys_log.warning(
                    f"[TelegramUpdateDedup] 無法初始化 telegram_update_dedup 表,回退記憶體去重:{exc}"
                )
                return False
            finally:
                session.close()
        except Exception as exc:
            sys_log.warning(
                f"[TelegramUpdateDedup] 連線 DB 失敗,回退記憶體去重:{exc}"
            )
            return False


def _is_duplicate_update_db(update_key: str) -> bool:
    """Deduplicate via the shared DB table.

    Inserts ``update_key`` with ``INSERT ... ON CONFLICT (update_key) DO
    NOTHING``; a zero row count means the key already existed, so the update
    is a duplicate. At most once every ``_UPDATE_ID_DB_CLEANUP_EVERY_SECONDS``
    it also deletes rows older than ``_UPDATE_ID_TTL_SECONDS`` in the same
    transaction.

    Returns False (i.e. "not known to be a duplicate") on any DB failure, so
    the caller can fall back to the in-memory cache.
    """
    global _last_cleanup_ts
    if not _ensure_update_dedup_table():
        return False
    now = datetime.now()
    now_ts = now.timestamp()
    try:
        db = DatabaseManager()
        session = db.get_session()
        try:
            ran_cleanup = False
            if now_ts - _last_cleanup_ts > _UPDATE_ID_DB_CLEANUP_EVERY_SECONDS:
                cutoff = now - timedelta(seconds=_UPDATE_ID_TTL_SECONDS)
                session.execute(
                    text(
                        """
                        DELETE FROM telegram_update_dedup
                        WHERE created_at < :cutoff
                        """
                    ),
                    {"cutoff": cutoff},
                )
                ran_cleanup = True
            result = session.execute(
                text(
                    """
                    INSERT INTO telegram_update_dedup(update_key, created_at)
                    VALUES (:update_key, :created_at)
                    ON CONFLICT (update_key) DO NOTHING
                    """
                ),
                {
                    "update_key": update_key,
                    "created_at": now,
                },
            )
            session.commit()
            if ran_cleanup:
                # Record the purge only after it is committed. A rollback would
                # undo the DELETE and must not postpone the next purge attempt.
                _last_cleanup_ts = now_ts
            return result.rowcount == 0
        except Exception as exc:
            session.rollback()
            sys_log.debug(
                f"[TelegramUpdateDedup] DB 去重插入失敗,回退記憶體去重:{exc}"
            )
            return False
        finally:
            session.close()
    except Exception as exc:
        sys_log.debug(
            f"[TelegramUpdateDedup] DB 去重流程失敗,回退記憶體去重:{exc}"
        )
        return False


def is_duplicate_update(update_id, namespace: str = "telegram") -> bool:
    """Return True if this Telegram update has already been seen.

    Args:
        update_id: Telegram update identifier, normalized via
            ``_normalize_update_id``. ``None`` is never treated as a duplicate.
        namespace: Key prefix separating independent dedup domains.

    Under pytest, the key is additionally scoped to the running test and only
    the in-process cache is used. Otherwise the shared DB table is checked
    first. If it does not report a duplicate (including when the DB is
    unavailable), the in-process cache is consulted and updated as well.
    """
    normalized = _normalize_update_id(update_id)
    if normalized is None:
        return False

    test_scope = _current_pytest_scope()
    if test_scope:
        # Isolate keys per test so update ids reused across tests don't collide.
        namespace = f"{namespace}:{test_scope}"
    update_key = f"{namespace}:{normalized}"

    if _is_pytest_context():
        return _is_duplicate_update_memory(update_key)
    if _is_duplicate_update_db(update_key):
        return True
    return _is_duplicate_update_memory(update_key)


def _is_duplicate_update_memory(update_key: str) -> bool:
    """Check and record ``update_key`` in the bounded in-process cache.

    Under pytest this is the only layer. In production it is a second layer
    after the DB check, and the only effective one when the DB is unavailable.
    At most ``_SEEN_MAX`` keys are kept, evicting the oldest first. Returns
    True if the key was already present; otherwise records it and returns
    False.
    """
    with _seen_update_lock:
        if update_key in _seen_update_id_set:
            return True
        _seen_update_ids.append(update_key)
        _seen_update_id_set.add(update_key)
        if len(_seen_update_ids) > _SEEN_MAX:
            _seen_update_id_set.discard(_seen_update_ids.popleft())
        return False