""" Security Interceptor - Telegram Gateway 守門員 =============================================== Phase 5.4.2: CISO 安全需求實作 Features: - Telegram user_id 白名單驗證 - Nonce 防重放攻擊 (Redis + Memory fallback) - HMAC 簽章二次驗證 安全鐵律: - 只有白名單內的 user_id 可以簽核 - 每個 Nonce 只能使用一次 - 過期的 Nonce 自動清除 """ import hashlib import hmac import time from dataclasses import dataclass from typing import Protocol, runtime_checkable import structlog from src.core.config import settings logger = structlog.get_logger(__name__) # ============================================================================= # Nonce Store - 防重放攻擊 # ============================================================================= class NonceStore: """ Nonce 儲存器 - 防止 Replay Attack 實作策略: 1. 優先使用 Redis (生產環境) 2. 降級使用 Memory (開發環境) 每個 Nonce 只能使用一次,過期後自動清除 """ def __init__(self): self._memory_store: dict[str, float] = {} self._redis_client = None self._use_redis = False async def initialize(self) -> bool: """初始化 Redis 連線""" try: import redis.asyncio as redis self._redis_client = redis.from_url( settings.REDIS_URL, decode_responses=True, ) # 測試連線 await self._redis_client.ping() self._use_redis = True logger.info("nonce_store_redis_initialized") return True except Exception as e: logger.warning( "nonce_store_redis_failed_fallback_memory", error=str(e), ) self._use_redis = False return False async def check_and_consume(self, nonce: str) -> bool: """ 檢查 Nonce 是否有效,若有效則消費 (標記為已使用) Args: nonce: 唯一識別碼 Returns: bool: True = 有效 (首次使用), False = 無效 (重複或過期) """ if self._use_redis: return await self._check_redis(nonce) else: return self._check_memory(nonce) async def _check_redis(self, nonce: str) -> bool: """Redis 實作: 使用 SETNX + TTL""" key = f"awoooi:nonce:{nonce}" ttl = settings.WEBHOOK_NONCE_TTL # SETNX: 只有 key 不存在時才設定成功 result = await self._redis_client.set( key, "1", nx=True, # Only set if not exists ex=ttl, # Expire after TTL seconds ) if result: logger.info("nonce_consumed_redis", nonce=nonce[:16] + "...") return True else: logger.warning("nonce_replay_detected_redis", nonce=nonce[:16] + "...") return False def _check_memory(self, nonce: str) -> bool: """Memory 實作: 使用 dict + timestamp""" now = time.time() ttl = settings.WEBHOOK_NONCE_TTL # 清理過期 Nonce self._cleanup_expired(now, ttl) # 檢查是否已存在 if nonce in self._memory_store: logger.warning("nonce_replay_detected_memory", nonce=nonce[:16] + "...") return False # 記錄 Nonce self._memory_store[nonce] = now logger.info("nonce_consumed_memory", nonce=nonce[:16] + "...") return True def _cleanup_expired(self, now: float, ttl: int) -> None: """清理過期的 Nonce (Memory 模式)""" expired = [ nonce for nonce, ts in self._memory_store.items() if now - ts > ttl ] for nonce in expired: del self._memory_store[nonce] if expired: logger.debug("nonce_cleanup", removed_count=len(expired)) async def check_webhook_nonce(nonce: str, ttl: int = 600) -> bool: """ Webhook replay 防護:用 Redis NX 記錄 nonce(TTL=600s),重複使用回傳 False。 Service 層 helper,供 Router 層(webhooks.py)呼叫;禁止 Router 直接用 get_redis。 Redis 不可用時 fail open(回傳 True + 記錄 warning)。 P0-06 修正(ADR-116,2026-05-04 ogt + Claude Sonnet 4.6) """ from src.core.redis_client import get_redis nonce_key = f"webhook:nonce:{nonce}" try: redis = get_redis() stored = await redis.set(nonce_key, "1", nx=True, ex=ttl) if not stored: logger.warning("webhook_nonce_replay_detected", nonce_prefix=nonce[:16] + "...") return False logger.debug("webhook_nonce_registered", nonce_key=nonce_key) return True except Exception as exc: logger.warning( "webhook_nonce_redis_unavailable", error=str(exc), note="fail open: request allowed despite Redis unavailability", ) return True # ============================================================================= # Telegram Security Interceptor # ============================================================================= @dataclass class TelegramUser: """Telegram 使用者資訊""" user_id: int username: str | None = None first_name: str | None = None is_whitelisted: bool = False class SecurityInterceptorError(Exception): """Security Interceptor 錯誤""" pass class UserNotWhitelistedError(SecurityInterceptorError): """使用者不在白名單內""" pass class NonceReplayError(SecurityInterceptorError): """Nonce 重放攻擊""" pass class SignatureVerificationError(SecurityInterceptorError): """簽章驗證失敗""" pass # ============================================================================= # Protocol Interface (Phase 17 P1 - 紅區治理) # ============================================================================= @runtime_checkable class ITelegramSecurityInterceptor(Protocol): """ TelegramSecurityInterceptor 介面定義 用途: - 依賴注入 (DI) 時的型別約束 - 測試時 Mock 的型別檢查 - 符合 leWOOOgo 積木化規範 Tier 3 紅區服務: 修改需首席架構師簽核 @see feedback_lewooogo_modular_enforcement.md @see docs/RED_ZONES.md """ async def initialize(self) -> bool: """初始化攔截器""" ... def is_whitelisted(self, user_id: int) -> bool: """檢查 user_id 是否在白名單內""" ... async def verify_callback( self, user_id: int, callback_id: str, nonce: str | None = None, ) -> TelegramUser: """驗證 Telegram Callback 請求""" ... @property def whitelist(self) -> list[int]: """取得白名單 user_id 列表""" ... class TelegramSecurityInterceptor: """ Telegram 安全攔截器 CISO 安全要求: 1. user_id 白名單驗證 (只有統帥可以簽核) 2. Nonce 防重放攻擊 3. 可選: Telegram Bot Token HMAC 驗證 所有簽核請求必須通過此攔截器 """ def __init__(self): self._nonce_store = NonceStore() self._initialized = False async def initialize(self) -> bool: """初始化攔截器""" await self._nonce_store.initialize() self._initialized = True logger.info("telegram_security_interceptor_initialized") return True @property def whitelist(self) -> list[int]: """取得白名單 user_id 列表""" return settings.get_tg_user_whitelist() def is_whitelisted(self, user_id: int) -> bool: """ 檢查 user_id 是否在白名單內 Args: user_id: Telegram user ID Returns: bool: True = 在白名單內 """ # 空白名單 = 禁止所有人 if not self.whitelist: logger.warning( "telegram_whitelist_empty", user_id=user_id, message="Whitelist is empty, all users denied", ) return False is_allowed = user_id in self.whitelist if is_allowed: logger.info("telegram_user_whitelisted", user_id=user_id) else: logger.warning( "telegram_user_not_whitelisted", user_id=user_id, whitelist=self.whitelist, ) return is_allowed async def intercept_telegram(self, user_id: int) -> None: """ 攔截 Telegram 文字訊息 (ADR-044 Phase 22) 用於 _handle_chat_message 的白名單驗證。 與 verify_callback 不同,純文字訊息不需要 Nonce 防重放。 Args: user_id: Telegram user ID Raises: UserNotWhitelistedError: user_id 不在白名單內 """ # 2026-03-31 ogt: Phase 22 修復 - 補齊對話訊息的安全攔截方法 if not self.is_whitelisted(user_id): raise UserNotWhitelistedError( f"User {user_id} is not in the chat whitelist" ) async def verify_callback( self, user_id: int, callback_id: str, nonce: str | None = None, ) -> TelegramUser: """ 驗證 Telegram Callback 請求 安全檢查流程: 1. 白名單驗證 2. Nonce 防重放 (如果提供) Args: user_id: Telegram user ID callback_id: Callback Query ID nonce: 可選的 Nonce (防重放) Returns: TelegramUser: 驗證通過的使用者資訊 Raises: UserNotWhitelistedError: 使用者不在白名單 NonceReplayError: Nonce 重放攻擊 """ if not self._initialized: await self.initialize() # ======================================================================= # Step 0: ADR-093 Bound User Check(群組 CSRF 防護) # 若 Redis 中存有 cb_bind:{nonce},則嚴格比對 user_id; # 若 key 不存在(舊格式 nonce 或 Redis 暫時不可用)→ 跳過,繼續走 whitelist。 # 2026-04-24 Claude Sonnet 4.6 (ADR-093 WS3) # ======================================================================= if nonce: try: redis = self._nonce_store._redis_client if redis is not None and self._nonce_store._use_redis: bind_key = f"cb_bind:{nonce}" bound_raw = await redis.get(bind_key) if bound_raw is not None: bound_id = int(bound_raw) if user_id != bound_id: logger.warning( "telegram_callback_rejected_wrong_user", user_id=user_id, bound_user_id=bound_id, callback_id=callback_id, ) raise UserNotWhitelistedError( f"User {user_id} not bound to this approval (bound={bound_id})" ) except UserNotWhitelistedError: raise except Exception as exc: # Redis 暫時不可用 → 降級繼續走 whitelist logger.warning("callback_bound_check_failed", error=str(exc)) # ======================================================================= # Step 1: 白名單驗證 # ======================================================================= if not self.is_whitelisted(user_id): logger.warning( "telegram_callback_rejected_not_whitelisted", user_id=user_id, callback_id=callback_id, ) raise UserNotWhitelistedError( f"User {user_id} is not in the approval whitelist" ) # ======================================================================= # Step 2: Nonce 防重放 (如果提供) # ======================================================================= if nonce: is_valid = await self._nonce_store.check_and_consume(nonce) if not is_valid: logger.warning( "telegram_callback_rejected_nonce_replay", user_id=user_id, callback_id=callback_id, nonce=nonce[:16] + "...", ) raise NonceReplayError( f"Nonce replay detected: {nonce[:16]}..." ) # ======================================================================= # 驗證通過 # ======================================================================= logger.info( "telegram_callback_verified", user_id=user_id, callback_id=callback_id, nonce_checked=bool(nonce), ) return TelegramUser( user_id=user_id, is_whitelisted=True, ) async def verify_webhook_update( self, update_id: int, user_id: int, ) -> TelegramUser: """ 驗證 Telegram Webhook Update 用於驗證來自 Telegram Bot API 的 Update 請求 Args: update_id: Telegram Update ID (作為 Nonce) user_id: Telegram user ID Returns: TelegramUser: 驗證通過的使用者資訊 Raises: UserNotWhitelistedError: 使用者不在白名單 NonceReplayError: Update ID 重放 """ # 使用 update_id 作為 Nonce nonce = f"tg_update_{update_id}" return await self.verify_callback( user_id=user_id, callback_id=str(update_id), nonce=nonce, ) async def bind_callback_user(self, nonce: str, user_id: int) -> None: """ 非同步綁定 callback nonce 到指定 user_id(ADR-093 群組 CSRF 防護) 在 SRE 群組場景中,為指定人員綁定此 nonce, handler 驗證時會確認 caller == bound_user_id。 Redis 不可用時優雅降級(繼續走 whitelist check)。 Args: nonce: generate_callback_nonce 產生的 4-part nonce user_id: 被綁定的 Telegram user_id(僅此人可點按) """ # 2026-04-24 Claude Sonnet 4.6 (ADR-093 WS3): bound user binding try: redis = self._nonce_store._redis_client if redis is not None and self._nonce_store._use_redis: bind_key = f"cb_bind:{nonce}" await redis.setex(bind_key, 172800, str(user_id)) # TTL=48h logger.debug( "callback_user_bound", user_id=user_id, nonce_prefix=nonce[:16], ) except Exception as exc: # Redis unavailable → 降級無 binding,依然走 whitelist check logger.warning("callback_user_bind_failed", error=str(exc)) def generate_callback_nonce(self, approval_id: str, action: str) -> str: """ 產生 Callback Nonce (嵌入到 callback_data) 格式: {action}:{approval_id}:{timestamp}:{random} Args: approval_id: 簽核單 ID action: 操作類型 (approve/reject) Returns: str: 唯一的 Nonce """ import secrets import base64, uuid as _uuid timestamp = int(time.time()) random_part = secrets.token_hex(4) # UUID (36 chars) → base64url (22 chars) to keep nonce ≤ 63 bytes for all action names. # Longest known action: host_restart_service (20) + 22 + ts(10) + rand(8) + 3 colons = 63 bytes. try: short_id = base64.urlsafe_b64encode( _uuid.UUID(approval_id).bytes ).rstrip(b"=").decode() except (ValueError, AttributeError): # Not a valid UUID (e.g. legacy format) — use as-is, may exceed limit but won't crash short_id = approval_id nonce_body = f"{action}:{short_id}:{timestamp}:{random_part}" # ADR-116 P0-05: 附加 HMAC-SHA256[:16] 防偽造 # 2026-05-04 Claude Sonnet 4.6 (ADR-116): 若 CALLBACK_HMAC_SECRET 未設定則 warning + 降級 if settings.CALLBACK_HMAC_SECRET: hmac_hex = hmac.new( settings.CALLBACK_HMAC_SECRET.encode(), nonce_body.encode(), hashlib.sha256, ).hexdigest() nonce = f"{nonce_body}:{hmac_hex[:16]}" else: logger.warning( "callback_hmac_secret_missing", note="CALLBACK_HMAC_SECRET not configured; nonce generated without HMAC (transition mode)", ) nonce = nonce_body logger.debug( "callback_nonce_generated", approval_id=approval_id, action=action, nonce_len=len(nonce.encode()), hmac_appended=bool(settings.CALLBACK_HMAC_SECRET), ) return nonce def parse_callback_data(self, callback_data: str) -> dict: """ 解析 Callback Data 格式一 (寫操作,nonce 防重放): {action}:{approval_id}:{timestamp}:{random} 格式二 (讀操作,ADR-050): {action}:{incident_id} (2 parts) Args: callback_data: Telegram callback_data 字串 Returns: dict: 解析結果 - 格式一: {action, approval_id, timestamp, nonce, is_info_action: False} - 格式二: {action, incident_id, is_info_action: True} """ # 2026-04-01 Claude Code (ADR-050): 支援 read-only info actions (2-part format) # 2026-04-20 P0.1 ogt + Claude Opus 4.7: drift_view_page 納入 INFO_ACTIONS # payload 格式: drift_view_page:{report_id}_{page}(底線分隔,不跟冒號衝突) INFO_ACTIONS = {"detail", "reanalyze", "history", "drift_view_page"} parts = callback_data.split(":") if len(parts) == 2 and parts[0] in INFO_ACTIONS: return { "action": parts[0], "incident_id": parts[1], "approval_id": parts[1], # 相容舊版呼叫 "is_info_action": True, } # ADR-116 P0-05: 支援 5-part 格式(含 HMAC) # 2026-05-04 Claude Sonnet 4.6 (ADR-116): {action}:{short_id}:{ts}:{rand}:{hmac16} if len(parts) == 5: # 5-part:驗證 HMAC,然後還原成 4-part 格式繼續解析 embedded_hmac = parts[4] nonce_body = ":".join(parts[:4]) if settings.CALLBACK_HMAC_SECRET: expected_hmac = hmac.new( settings.CALLBACK_HMAC_SECRET.encode(), nonce_body.encode(), hashlib.sha256, ).hexdigest()[:16] if not hmac.compare_digest(embedded_hmac, expected_hmac): logger.warning( "callback_nonce_hmac_mismatch", nonce_prefix=callback_data[:20] + "...", ) raise ValueError(f"Callback nonce HMAC verification failed") else: logger.warning( "callback_hmac_secret_missing", note="CALLBACK_HMAC_SECRET not configured; skipping nonce HMAC verification (transition mode)", ) # 以 4-part nonce_body 繼續解析(以下邏輯共用) parts = parts[:4] elif len(parts) != 4: raise ValueError(f"Invalid callback_data format: {callback_data}") import base64, uuid as _uuid raw_id = parts[1] # Decode base64url-encoded UUID (22 chars) back to full UUID string. # Legacy nonces with full UUID (36 chars) pass through unchanged. if len(raw_id) == 22: try: decoded = _uuid.UUID(bytes=base64.urlsafe_b64decode(raw_id + "==")) approval_id = str(decoded) except Exception: approval_id = raw_id else: approval_id = raw_id return { "action": parts[0], "approval_id": approval_id, "timestamp": int(parts[2]), "nonce": callback_data, "is_info_action": False, } # ============================================================================= # Singleton # ============================================================================= _interceptor: TelegramSecurityInterceptor | None = None def get_security_interceptor() -> TelegramSecurityInterceptor: """取得全域 TelegramSecurityInterceptor 實例""" global _interceptor if _interceptor is None: _interceptor = TelegramSecurityInterceptor() return _interceptor