""" Security Interceptor - Telegram Gateway 守門員 =============================================== Phase 5.4.2: CISO 安全需求實作 Features: - Telegram user_id 白名單驗證 - Nonce 防重放攻擊 (Redis + Memory fallback) - HMAC 簽章二次驗證 安全鐵律: - 只有白名單內的 user_id 可以簽核 - 每個 Nonce 只能使用一次 - 過期的 Nonce 自動清除 """ import time from dataclasses import dataclass from typing import Protocol, runtime_checkable import structlog from src.core.config import settings logger = structlog.get_logger(__name__) # ============================================================================= # Nonce Store - 防重放攻擊 # ============================================================================= class NonceStore: """ Nonce 儲存器 - 防止 Replay Attack 實作策略: 1. 優先使用 Redis (生產環境) 2. 降級使用 Memory (開發環境) 每個 Nonce 只能使用一次,過期後自動清除 """ def __init__(self): self._memory_store: dict[str, float] = {} self._redis_client = None self._use_redis = False async def initialize(self) -> bool: """初始化 Redis 連線""" try: import redis.asyncio as redis self._redis_client = redis.from_url( settings.REDIS_URL, decode_responses=True, ) # 測試連線 await self._redis_client.ping() self._use_redis = True logger.info("nonce_store_redis_initialized") return True except Exception as e: logger.warning( "nonce_store_redis_failed_fallback_memory", error=str(e), ) self._use_redis = False return False async def check_and_consume(self, nonce: str) -> bool: """ 檢查 Nonce 是否有效,若有效則消費 (標記為已使用) Args: nonce: 唯一識別碼 Returns: bool: True = 有效 (首次使用), False = 無效 (重複或過期) """ if self._use_redis: return await self._check_redis(nonce) else: return self._check_memory(nonce) async def _check_redis(self, nonce: str) -> bool: """Redis 實作: 使用 SETNX + TTL""" key = f"awoooi:nonce:{nonce}" ttl = settings.WEBHOOK_NONCE_TTL # SETNX: 只有 key 不存在時才設定成功 result = await self._redis_client.set( key, "1", nx=True, # Only set if not exists ex=ttl, # Expire after TTL seconds ) if result: logger.info("nonce_consumed_redis", nonce=nonce[:16] + "...") return True else: logger.warning("nonce_replay_detected_redis", nonce=nonce[:16] + "...") return False def _check_memory(self, nonce: str) -> bool: """Memory 實作: 使用 dict + timestamp""" now = time.time() ttl = settings.WEBHOOK_NONCE_TTL # 清理過期 Nonce self._cleanup_expired(now, ttl) # 檢查是否已存在 if nonce in self._memory_store: logger.warning("nonce_replay_detected_memory", nonce=nonce[:16] + "...") return False # 記錄 Nonce self._memory_store[nonce] = now logger.info("nonce_consumed_memory", nonce=nonce[:16] + "...") return True def _cleanup_expired(self, now: float, ttl: int) -> None: """清理過期的 Nonce (Memory 模式)""" expired = [ nonce for nonce, ts in self._memory_store.items() if now - ts > ttl ] for nonce in expired: del self._memory_store[nonce] if expired: logger.debug("nonce_cleanup", removed_count=len(expired)) # ============================================================================= # Telegram Security Interceptor # ============================================================================= @dataclass class TelegramUser: """Telegram 使用者資訊""" user_id: int username: str | None = None first_name: str | None = None is_whitelisted: bool = False class SecurityInterceptorError(Exception): """Security Interceptor 錯誤""" pass class UserNotWhitelistedError(SecurityInterceptorError): """使用者不在白名單內""" pass class NonceReplayError(SecurityInterceptorError): """Nonce 重放攻擊""" pass class SignatureVerificationError(SecurityInterceptorError): """簽章驗證失敗""" pass # ============================================================================= # Protocol Interface (Phase 17 P1 - 紅區治理) # ============================================================================= @runtime_checkable class ITelegramSecurityInterceptor(Protocol): """ TelegramSecurityInterceptor 介面定義 用途: - 依賴注入 (DI) 時的型別約束 - 測試時 Mock 的型別檢查 - 符合 leWOOOgo 積木化規範 Tier 3 紅區服務: 修改需首席架構師簽核 @see feedback_lewooogo_modular_enforcement.md @see docs/RED_ZONES.md """ async def initialize(self) -> bool: """初始化攔截器""" ... def is_whitelisted(self, user_id: int) -> bool: """檢查 user_id 是否在白名單內""" ... async def verify_callback( self, user_id: int, callback_id: str, nonce: str | None = None, ) -> TelegramUser: """驗證 Telegram Callback 請求""" ... @property def whitelist(self) -> list[int]: """取得白名單 user_id 列表""" ... class TelegramSecurityInterceptor: """ Telegram 安全攔截器 CISO 安全要求: 1. user_id 白名單驗證 (只有統帥可以簽核) 2. Nonce 防重放攻擊 3. 可選: Telegram Bot Token HMAC 驗證 所有簽核請求必須通過此攔截器 """ def __init__(self): self._nonce_store = NonceStore() self._initialized = False async def initialize(self) -> bool: """初始化攔截器""" await self._nonce_store.initialize() self._initialized = True logger.info("telegram_security_interceptor_initialized") return True @property def whitelist(self) -> list[int]: """取得白名單 user_id 列表""" return settings.OPENCLAW_TG_USER_WHITELIST def is_whitelisted(self, user_id: int) -> bool: """ 檢查 user_id 是否在白名單內 Args: user_id: Telegram user ID Returns: bool: True = 在白名單內 """ # 空白名單 = 禁止所有人 if not self.whitelist: logger.warning( "telegram_whitelist_empty", user_id=user_id, message="Whitelist is empty, all users denied", ) return False is_allowed = user_id in self.whitelist if is_allowed: logger.info("telegram_user_whitelisted", user_id=user_id) else: logger.warning( "telegram_user_not_whitelisted", user_id=user_id, whitelist=self.whitelist, ) return is_allowed async def verify_callback( self, user_id: int, callback_id: str, nonce: str | None = None, ) -> TelegramUser: """ 驗證 Telegram Callback 請求 安全檢查流程: 1. 白名單驗證 2. Nonce 防重放 (如果提供) Args: user_id: Telegram user ID callback_id: Callback Query ID nonce: 可選的 Nonce (防重放) Returns: TelegramUser: 驗證通過的使用者資訊 Raises: UserNotWhitelistedError: 使用者不在白名單 NonceReplayError: Nonce 重放攻擊 """ if not self._initialized: await self.initialize() # ======================================================================= # Step 1: 白名單驗證 # ======================================================================= if not self.is_whitelisted(user_id): logger.warning( "telegram_callback_rejected_not_whitelisted", user_id=user_id, callback_id=callback_id, ) raise UserNotWhitelistedError( f"User {user_id} is not in the approval whitelist" ) # ======================================================================= # Step 2: Nonce 防重放 (如果提供) # ======================================================================= if nonce: is_valid = await self._nonce_store.check_and_consume(nonce) if not is_valid: logger.warning( "telegram_callback_rejected_nonce_replay", user_id=user_id, callback_id=callback_id, nonce=nonce[:16] + "...", ) raise NonceReplayError( f"Nonce replay detected: {nonce[:16]}..." ) # ======================================================================= # 驗證通過 # ======================================================================= logger.info( "telegram_callback_verified", user_id=user_id, callback_id=callback_id, nonce_checked=bool(nonce), ) return TelegramUser( user_id=user_id, is_whitelisted=True, ) async def verify_webhook_update( self, update_id: int, user_id: int, ) -> TelegramUser: """ 驗證 Telegram Webhook Update 用於驗證來自 Telegram Bot API 的 Update 請求 Args: update_id: Telegram Update ID (作為 Nonce) user_id: Telegram user ID Returns: TelegramUser: 驗證通過的使用者資訊 Raises: UserNotWhitelistedError: 使用者不在白名單 NonceReplayError: Update ID 重放 """ # 使用 update_id 作為 Nonce nonce = f"tg_update_{update_id}" return await self.verify_callback( user_id=user_id, callback_id=str(update_id), nonce=nonce, ) def generate_callback_nonce(self, approval_id: str, action: str) -> str: """ 產生 Callback Nonce (嵌入到 callback_data) 格式: {action}:{approval_id}:{timestamp}:{random} Args: approval_id: 簽核單 ID action: 操作類型 (approve/reject) Returns: str: 唯一的 Nonce """ import secrets timestamp = int(time.time()) random_part = secrets.token_hex(4) nonce = f"{action}:{approval_id}:{timestamp}:{random_part}" logger.debug( "callback_nonce_generated", approval_id=approval_id, action=action, ) return nonce def parse_callback_data(self, callback_data: str) -> dict: """ 解析 Callback Data 格式: {action}:{approval_id}:{timestamp}:{random} Args: callback_data: Telegram callback_data 字串 Returns: dict: 解析結果 {action, approval_id, timestamp, nonce} """ parts = callback_data.split(":") if len(parts) != 4: raise ValueError(f"Invalid callback_data format: {callback_data}") return { "action": parts[0], "approval_id": parts[1], "timestamp": int(parts[2]), "nonce": callback_data, # 整個字串作為 nonce } # ============================================================================= # Singleton # ============================================================================= _interceptor: TelegramSecurityInterceptor | None = None def get_security_interceptor() -> TelegramSecurityInterceptor: """取得全域 TelegramSecurityInterceptor 實例""" global _interceptor if _interceptor is None: _interceptor = TelegramSecurityInterceptor() return _interceptor