## P0-05 Callback Nonce 防偽造(ADR-116)
- security_interceptor.py:generate_callback_nonce() 新增 HMAC-SHA256[:16] 附加
- 新 5-part 格式:{action}:{short_id}:{ts}:{rand}:{hmac16}
- CALLBACK_HMAC_SECRET 未設定時降級 warning(向後相容)
- security_interceptor.py:parse_callback_data() 新增 5-part 分支 + HMAC 驗證
- config.py:新增 CALLBACK_HMAC_SECRET: str = Field(default="")
## P0-06 Webhook HMAC Replay 防護(ADR-116)
- security_interceptor.py:新增 check_webhook_nonce()(Service 層,get_redis 在此層合法)
- webhooks.py:verify_webhook_signature() 新增兩個可選 Header
- X-Webhook-Timestamp:±300s 窗口驗證(若提供)
- X-Webhook-Nonce:呼叫 check_webhook_nonce()(Redis NX dedup,fail open)
- 移除直接 get_redis import(leWOOOgo 積木化修正)
## P0-11 ollama:current_primary Redis key 遷移 Phase A(ADR-110)
- ollama_auto_recovery.py:_REDIS_PRIMARY_KEY = "platform:ollama:current_primary"
- 雙寫舊 key "ollama:current_primary"(Phase A 30 天)
- 讀取以新 key 為主,fallback 舊 key
## P0-12 consensus Redis key 加 project namespace Phase A
- consensus_engine.py:新增 _consensus_key() / _consensus_legacy_key() helper
- 新 key:{project_id}:consensus:{consensus_id}
- project_id=None 時 fallback __platform__:consensus:{consensus_id}
- Phase A 雙寫 + fallback 讀取,現有呼叫方零修改
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
631 lines
21 KiB
Python
631 lines
21 KiB
Python
"""
|
||
Security Interceptor - Telegram Gateway 守門員
|
||
===============================================
|
||
Phase 5.4.2: CISO 安全需求實作
|
||
|
||
Features:
|
||
- Telegram user_id 白名單驗證
|
||
- Nonce 防重放攻擊 (Redis + Memory fallback)
|
||
- HMAC 簽章二次驗證
|
||
|
||
安全鐵律:
|
||
- 只有白名單內的 user_id 可以簽核
|
||
- 每個 Nonce 只能使用一次
|
||
- 過期的 Nonce 自動清除
|
||
"""
|
||
|
||
import hashlib
|
||
import hmac
|
||
import time
|
||
from dataclasses import dataclass
|
||
from typing import Protocol, runtime_checkable
|
||
|
||
import structlog
|
||
|
||
from src.core.config import settings
|
||
|
||
logger = structlog.get_logger(__name__)
|
||
|
||
|
||
# =============================================================================
|
||
# Nonce Store - 防重放攻擊
|
||
# =============================================================================
|
||
|
||
class NonceStore:
    """
    One-shot nonce registry used to reject replayed requests.

    Two backends are supported:
      1. Redis, selected when ``initialize()`` manages to ping the server.
      2. An in-process dict, used when Redis has not been (or could not be)
         initialised.

    A nonce is accepted exactly once; entries are dropped after
    ``settings.WEBHOOK_NONCE_TTL`` seconds.
    """

    def __init__(self):
        # nonce -> time.time() value recorded when the nonce was consumed
        self._memory_store: dict[str, float] = {}
        self._redis_client = None
        self._use_redis = False

    async def initialize(self) -> bool:
        """
        Try to connect to Redis and switch the store to the Redis backend.

        Returns:
            bool: True when Redis answered the ping, False when the store
            stays on the in-memory backend.
        """
        try:
            import redis.asyncio as aioredis

            self._redis_client = aioredis.from_url(
                settings.REDIS_URL,
                decode_responses=True,
            )
            # Round-trip once so that a dead server is detected right away.
            await self._redis_client.ping()
        except Exception as exc:
            logger.warning(
                "nonce_store_redis_failed_fallback_memory",
                error=str(exc),
            )
            self._use_redis = False
            return False

        self._use_redis = True
        logger.info("nonce_store_redis_initialized")
        return True

    async def check_and_consume(self, nonce: str) -> bool:
        """
        Validate a nonce and, when valid, mark it as used.

        Args:
            nonce: Unique identifier of the request.

        Returns:
            bool: True on first use, False when the nonce was already seen.
        """
        if not self._use_redis:
            return self._check_memory(nonce)
        return await self._check_redis(nonce)

    async def _check_redis(self, nonce: str) -> bool:
        """Redis backend: a single ``SET key 1 NX EX ttl`` call."""
        redis_key = f"awoooi:nonce:{nonce}"
        lifetime = settings.WEBHOOK_NONCE_TTL

        # NX makes the write succeed only when the key is absent, EX attaches
        # the expiry in the same command.
        was_set = await self._redis_client.set(
            redis_key,
            "1",
            nx=True,
            ex=lifetime,
        )

        if not was_set:
            logger.warning("nonce_replay_detected_redis", nonce=f"{nonce[:16]}...")
            return False

        logger.info("nonce_consumed_redis", nonce=f"{nonce[:16]}...")
        return True

    def _check_memory(self, nonce: str) -> bool:
        """In-memory backend: dict of nonce -> consumption timestamp."""
        current = time.time()
        lifetime = settings.WEBHOOK_NONCE_TTL

        # Purge stale entries first so an expired nonce no longer blocks reuse.
        self._cleanup_expired(current, lifetime)

        if nonce in self._memory_store:
            logger.warning("nonce_replay_detected_memory", nonce=f"{nonce[:16]}...")
            return False

        self._memory_store[nonce] = current
        logger.info("nonce_consumed_memory", nonce=f"{nonce[:16]}...")
        return True

    def _cleanup_expired(self, now: float, ttl: int) -> None:
        """Drop in-memory nonces whose age is strictly greater than ``ttl`` seconds."""
        stale = [
            key
            for key, consumed_at in self._memory_store.items()
            if now - consumed_at > ttl
        ]
        if not stale:
            return

        for key in stale:
            del self._memory_store[key]

        logger.debug("nonce_cleanup", removed_count=len(stale))
|
||
|
||
|
||
async def check_webhook_nonce(nonce: str, ttl: int = 600) -> bool:
    """
    Webhook replay protection backed by a Redis ``SET ... NX EX`` call.

    Service-layer helper intended to be called from the router layer
    (webhooks.py), so that routers never import ``get_redis`` themselves.

    The function fails open: when Redis raises, the request is allowed and a
    warning is logged.

    P0-06 fix (ADR-116, 2026-05-04 ogt + Claude Sonnet 4.6)

    Args:
        nonce: Value taken from the webhook request.
        ttl: Seconds the nonce stays registered (default 600).

    Returns:
        bool: True when the nonce was registered for the first time or Redis
        was unavailable; False when the nonce had already been registered.
    """
    from src.core.redis_client import get_redis

    redis_key = f"webhook:nonce:{nonce}"
    try:
        client = get_redis()
        first_seen = await client.set(redis_key, "1", nx=True, ex=ttl)
        if first_seen:
            logger.debug("webhook_nonce_registered", nonce_key=redis_key)
            return True

        logger.warning("webhook_nonce_replay_detected", nonce_prefix=f"{nonce[:16]}...")
        return False
    except Exception as err:
        # Deliberate best-effort: availability wins over replay protection here.
        logger.warning(
            "webhook_nonce_redis_unavailable",
            error=str(err),
            note="fail open: request allowed despite Redis unavailability",
        )
        return True
|
||
|
||
|
||
# =============================================================================
|
||
# Telegram Security Interceptor
|
||
# =============================================================================
|
||
|
||
@dataclass
|
||
class TelegramUser:
|
||
"""Telegram 使用者資訊"""
|
||
user_id: int
|
||
username: str | None = None
|
||
first_name: str | None = None
|
||
is_whitelisted: bool = False
|
||
|
||
|
||
class SecurityInterceptorError(Exception):
    """Base class for every error raised by the security interceptor."""


class UserNotWhitelistedError(SecurityInterceptorError):
    """The Telegram user is not in the whitelist."""


class NonceReplayError(SecurityInterceptorError):
    """A nonce was presented more than once (replay attack)."""


class SignatureVerificationError(SecurityInterceptorError):
    """Signature verification failed."""
|
||
|
||
|
||
# =============================================================================
|
||
# Protocol Interface (Phase 17 P1 - 紅區治理)
|
||
# =============================================================================
|
||
|
||
@runtime_checkable
class ITelegramSecurityInterceptor(Protocol):
    """
    Structural interface of TelegramSecurityInterceptor.

    Purpose:
    - Type constraint for dependency injection (DI)
    - Type checking of mocks in tests
    - Compliance with the leWOOOgo modular-building-block convention

    Tier 3 red-zone service: modifications require sign-off by the chief
    architect.

    @see feedback_lewooogo_modular_enforcement.md
    @see docs/RED_ZONES.md
    """

    async def initialize(self) -> bool:
        """Initialise the interceptor."""
        ...

    def is_whitelisted(self, user_id: int) -> bool:
        """Tell whether ``user_id`` is in the whitelist."""
        ...

    async def verify_callback(
        self, user_id: int, callback_id: str, nonce: str | None = None
    ) -> TelegramUser:
        """Verify a Telegram callback request."""
        ...

    @property
    def whitelist(self) -> list[int]:
        """List of whitelisted user IDs."""
        ...
|
||
|
||
|
||
class TelegramSecurityInterceptor:
    """
    Telegram security interceptor.

    CISO requirements implemented here:
    1. user_id whitelist verification (only whitelisted users may approve)
    2. Nonce-based replay protection
    3. Optional HMAC tag on callback nonces (ADR-116)

    Every approval request is expected to pass through this interceptor.
    """

    # Read-only actions that use the 2-part ``{action}:{id}`` callback format.
    # 2026-04-01 (ADR-050): read-only info actions
    # 2026-04-20 P0.1: drift_view_page added; its payload is
    # ``drift_view_page:{report_id}_{page}`` (underscore separated, so it does
    # not clash with the colon separator).
    _INFO_ACTIONS = frozenset({"detail", "reanalyze", "history", "drift_view_page"})

    # NOTE(review): Telegram documents callback_data as 1-64 bytes; confirm
    # against the Bot API reference before relying on this value.
    _CALLBACK_DATA_MAX_BYTES = 64

    def __init__(self):
        self._nonce_store = NonceStore()
        self._initialized = False

    async def initialize(self) -> bool:
        """Initialise the nonce store and mark the interceptor as ready."""
        await self._nonce_store.initialize()
        self._initialized = True
        logger.info("telegram_security_interceptor_initialized")
        return True

    @property
    def whitelist(self) -> list[int]:
        """Whitelisted user IDs, read from settings on every access."""
        return settings.get_tg_user_whitelist()

    def is_whitelisted(self, user_id: int) -> bool:
        """
        Tell whether ``user_id`` is in the whitelist.

        Args:
            user_id: Telegram user ID

        Returns:
            bool: True when the user is whitelisted. An empty whitelist
            denies everybody.
        """
        # Snapshot once: the property re-reads settings on each access, and the
        # emptiness check, the membership test and the log line must agree.
        allowed_ids = self.whitelist

        if not allowed_ids:
            logger.warning(
                "telegram_whitelist_empty",
                user_id=user_id,
                message="Whitelist is empty, all users denied",
            )
            return False

        is_allowed = user_id in allowed_ids

        if is_allowed:
            logger.info("telegram_user_whitelisted", user_id=user_id)
        else:
            logger.warning(
                "telegram_user_not_whitelisted",
                user_id=user_id,
                whitelist=allowed_ids,
            )

        return is_allowed

    async def intercept_telegram(self, user_id: int) -> None:
        """
        Gate a plain Telegram text message (ADR-044 Phase 22).

        Only the whitelist is checked; unlike ``verify_callback`` no nonce is
        involved.

        Args:
            user_id: Telegram user ID

        Raises:
            UserNotWhitelistedError: user_id is not in the whitelist
        """
        # 2026-03-31 ogt: Phase 22 fix - security gate for chat messages
        if not self.is_whitelisted(user_id):
            raise UserNotWhitelistedError(
                f"User {user_id} is not in the chat whitelist"
            )

    async def _check_bound_user(
        self, nonce: str, user_id: int, callback_id: str
    ) -> None:
        """
        ADR-093 bound-user check (group CSRF protection).

        When Redis holds ``cb_bind:{nonce}``, the caller must be exactly the
        bound user. A missing key, a disabled Redis backend or any Redis error
        skips the check, and the caller then goes on to the whitelist check.

        Raises:
            UserNotWhitelistedError: the nonce is bound to a different user
        """
        # 2026-04-24 Claude Sonnet 4.6 (ADR-093 WS3)
        try:
            redis = self._nonce_store._redis_client
            if redis is not None and self._nonce_store._use_redis:
                bind_key = f"cb_bind:{nonce}"
                bound_raw = await redis.get(bind_key)
                if bound_raw is not None:
                    bound_id = int(bound_raw)
                    if user_id != bound_id:
                        logger.warning(
                            "telegram_callback_rejected_wrong_user",
                            user_id=user_id,
                            bound_user_id=bound_id,
                            callback_id=callback_id,
                        )
                        raise UserNotWhitelistedError(
                            f"User {user_id} not bound to this approval (bound={bound_id})"
                        )
        except UserNotWhitelistedError:
            raise
        except Exception as exc:
            # Redis temporarily unavailable -> degrade to the whitelist check.
            logger.warning("callback_bound_check_failed", error=str(exc))

    async def verify_callback(
        self,
        user_id: int,
        callback_id: str,
        nonce: str | None = None,
    ) -> TelegramUser:
        """
        Verify a Telegram callback request.

        Checks, in order:
        0. Bound-user check (only when a nonce is given and a binding exists)
        1. Whitelist verification
        2. Nonce replay protection (only when a nonce is given)

        Args:
            user_id: Telegram user ID
            callback_id: Callback query ID (used for logging)
            nonce: Optional nonce for replay protection

        Returns:
            TelegramUser: the verified user, flagged as whitelisted

        Raises:
            UserNotWhitelistedError: user is not whitelisted, or the nonce is
                bound to another user
            NonceReplayError: the nonce was already consumed
        """
        if not self._initialized:
            await self.initialize()

        # Step 0: bound-user check
        if nonce:
            await self._check_bound_user(nonce, user_id, callback_id)

        # Step 1: whitelist verification
        if not self.is_whitelisted(user_id):
            logger.warning(
                "telegram_callback_rejected_not_whitelisted",
                user_id=user_id,
                callback_id=callback_id,
            )
            raise UserNotWhitelistedError(
                f"User {user_id} is not in the approval whitelist"
            )

        # Step 2: nonce replay protection
        if nonce:
            is_valid = await self._nonce_store.check_and_consume(nonce)
            if not is_valid:
                logger.warning(
                    "telegram_callback_rejected_nonce_replay",
                    user_id=user_id,
                    callback_id=callback_id,
                    nonce=nonce[:16] + "...",
                )
                raise NonceReplayError(
                    f"Nonce replay detected: {nonce[:16]}..."
                )

        logger.info(
            "telegram_callback_verified",
            user_id=user_id,
            callback_id=callback_id,
            nonce_checked=bool(nonce),
        )

        return TelegramUser(
            user_id=user_id,
            is_whitelisted=True,
        )

    async def verify_webhook_update(
        self,
        update_id: int,
        user_id: int,
    ) -> TelegramUser:
        """
        Verify a Telegram webhook update.

        The update ID is turned into the nonce ``tg_update_{update_id}`` and
        the request is then handled by ``verify_callback``.

        Args:
            update_id: Telegram update ID (used as the nonce)
            user_id: Telegram user ID

        Returns:
            TelegramUser: the verified user

        Raises:
            UserNotWhitelistedError: user is not whitelisted
            NonceReplayError: the update ID was already processed
        """
        nonce = f"tg_update_{update_id}"

        return await self.verify_callback(
            user_id=user_id,
            callback_id=str(update_id),
            nonce=nonce,
        )

    async def bind_callback_user(self, nonce: str, user_id: int) -> None:
        """
        Bind a callback nonce to one user (ADR-093 group CSRF protection).

        The binding is stored in Redis under ``cb_bind:{nonce}`` for 48 hours
        and is compared against the caller in ``verify_callback``. When Redis
        is unavailable no binding is written and verification relies on the
        whitelist alone.

        Args:
            nonce: Nonce produced by ``generate_callback_nonce`` (4-part, or
                5-part when an HMAC tag is appended)
            user_id: Telegram user ID that is allowed to press the button
        """
        # 2026-04-24 Claude Sonnet 4.6 (ADR-093 WS3): bound user binding
        try:
            redis = self._nonce_store._redis_client
            if redis is not None and self._nonce_store._use_redis:
                bind_key = f"cb_bind:{nonce}"
                await redis.setex(bind_key, 172800, str(user_id))  # TTL=48h
                logger.debug(
                    "callback_user_bound",
                    user_id=user_id,
                    nonce_prefix=nonce[:16],
                )
        except Exception as exc:
            # Redis unavailable -> no binding, whitelist check still applies.
            logger.warning("callback_user_bind_failed", error=str(exc))

    def generate_callback_nonce(self, approval_id: str, action: str) -> str:
        """
        Build the callback nonce that is embedded into callback_data.

        Format with ``CALLBACK_HMAC_SECRET`` configured (ADR-116 P0-05):
            {action}:{short_id}:{timestamp}:{random}:{hmac16}
        Format without the secret (transition mode):
            {action}:{short_id}:{timestamp}:{random}

        ``short_id`` is the base64url form (22 chars) of a UUID approval_id;
        a non-UUID approval_id is embedded unchanged.

        Args:
            approval_id: Approval ID
            action: Action name (e.g. approve / reject)

        Returns:
            str: the nonce
        """
        import base64
        import secrets
        import uuid as _uuid

        timestamp = int(time.time())
        random_part = secrets.token_hex(4)

        # UUID (36 chars) -> base64url (22 chars) to keep the nonce short.
        # Size of the 4-part body: len(action) + 22 + 10 (ts) + 8 (rand) + 3 colons.
        # The HMAC suffix adds another 17 bytes (colon + 16 hex chars).
        try:
            short_id = base64.urlsafe_b64encode(
                _uuid.UUID(approval_id).bytes
            ).rstrip(b"=").decode()
        except (ValueError, AttributeError):
            # Not a valid UUID (e.g. legacy format): use as-is; this may
            # exceed the size limit but does not crash.
            short_id = approval_id

        nonce_body = f"{action}:{short_id}:{timestamp}:{random_part}"

        # ADR-116 P0-05: append HMAC-SHA256[:16] as an anti-forgery tag.
        # 2026-05-04 Claude Sonnet 4.6 (ADR-116): warn and degrade when
        # CALLBACK_HMAC_SECRET is not configured.
        secret = settings.CALLBACK_HMAC_SECRET
        if secret:
            hmac_hex = hmac.new(
                secret.encode(),
                nonce_body.encode(),
                hashlib.sha256,
            ).hexdigest()
            nonce = f"{nonce_body}:{hmac_hex[:16]}"
        else:
            logger.warning(
                "callback_hmac_secret_missing",
                note="CALLBACK_HMAC_SECRET not configured; nonce generated without HMAC (transition mode)",
            )
            nonce = nonce_body

        nonce_len = len(nonce.encode())

        # The 5-part format is 17 bytes longer than the budget the 4-part
        # format was designed for, so surface oversize nonces instead of
        # letting the Telegram API reject the keyboard later.
        if nonce_len > self._CALLBACK_DATA_MAX_BYTES:
            logger.warning(
                "callback_nonce_exceeds_telegram_limit",
                action=action,
                nonce_len=nonce_len,
                limit=self._CALLBACK_DATA_MAX_BYTES,
            )

        logger.debug(
            "callback_nonce_generated",
            approval_id=approval_id,
            action=action,
            nonce_len=nonce_len,
            hmac_appended=bool(secret),
        )

        return nonce

    def parse_callback_data(self, callback_data: str) -> dict:
        """
        Parse Telegram callback_data.

        Accepted formats:
        - Write action, 5 parts (ADR-116): {action}:{short_id}:{ts}:{rand}:{hmac16}
        - Write action, 4 parts (no HMAC): {action}:{approval_id}:{ts}:{rand}
        - Read action, 2 parts (ADR-050):  {action}:{incident_id}

        Args:
            callback_data: callback_data string received from Telegram

        Returns:
            dict:
            - write action: {action, approval_id, timestamp, nonce, is_info_action: False}
              where ``nonce`` is the complete callback_data string
            - read action: {action, incident_id, approval_id, is_info_action: True}

        Raises:
            ValueError: unknown format, non-integer timestamp, or HMAC
                mismatch while CALLBACK_HMAC_SECRET is configured
        """
        import base64
        import uuid as _uuid

        parts = callback_data.split(":")

        if len(parts) == 2 and parts[0] in self._INFO_ACTIONS:
            return {
                "action": parts[0],
                "incident_id": parts[1],
                "approval_id": parts[1],  # kept for older callers
                "is_info_action": True,
            }

        # ADR-116 P0-05: 5-part format carrying an HMAC tag.
        # 2026-05-04 Claude Sonnet 4.6 (ADR-116)
        if len(parts) == 5:
            embedded_hmac = parts[4]
            nonce_body = ":".join(parts[:4])
            secret = settings.CALLBACK_HMAC_SECRET
            if secret:
                expected_hmac = hmac.new(
                    secret.encode(),
                    nonce_body.encode(),
                    hashlib.sha256,
                ).hexdigest()[:16]
                # Compare as bytes: compare_digest() on str raises TypeError
                # for non-ASCII input, and callback_data is untrusted.
                if not hmac.compare_digest(
                    embedded_hmac.encode(), expected_hmac.encode()
                ):
                    logger.warning(
                        "callback_nonce_hmac_mismatch",
                        nonce_prefix=callback_data[:20] + "...",
                    )
                    raise ValueError("Callback nonce HMAC verification failed")
            else:
                logger.warning(
                    "callback_hmac_secret_missing",
                    note="CALLBACK_HMAC_SECRET not configured; skipping nonce HMAC verification (transition mode)",
                )
            # Continue with the 4-part body; the code below is shared.
            parts = parts[:4]
        elif len(parts) != 4:
            raise ValueError(f"Invalid callback_data format: {callback_data}")

        # NOTE(review): a 4-part nonce is accepted without HMAC even when the
        # secret is configured (transition mode), so the HMAC does not yet
        # prevent forgery of 4-part data - confirm the planned cut-over.

        raw_id = parts[1]
        approval_id = raw_id
        # A 22-char ID is treated as a base64url-encoded UUID and expanded to
        # the canonical UUID string; anything else (e.g. a legacy 36-char
        # UUID) passes through unchanged.
        if len(raw_id) == 22:
            try:
                approval_id = str(
                    _uuid.UUID(bytes=base64.urlsafe_b64decode(raw_id + "=="))
                )
            except ValueError:
                # binascii.Error is a ValueError subclass; keep the raw ID.
                approval_id = raw_id

        return {
            "action": parts[0],
            "approval_id": approval_id,
            "timestamp": int(parts[2]),
            "nonce": callback_data,
            "is_info_action": False,
        }
|
||
|
||
|
||
# =============================================================================
|
||
# Singleton
|
||
# =============================================================================
|
||
|
||
# Process-wide instance, created lazily by get_security_interceptor().
_interceptor: TelegramSecurityInterceptor | None = None


def get_security_interceptor() -> TelegramSecurityInterceptor:
    """Return the shared TelegramSecurityInterceptor, creating it on first use."""
    global _interceptor
    if _interceptor is not None:
        return _interceptor

    _interceptor = TelegramSecurityInterceptor()
    return _interceptor
|