Files
awoooi/apps/api/src/services/security_interceptor.py
OG T 0bf0a1cea2
All checks were successful
CD Pipeline (Dev) / build-and-deploy-dev (push) Successful in 2m39s
CD Pipeline / build-and-deploy (push) Successful in 7m1s
E2E Health Check / e2e-health (push) Successful in 17s
feat(telegram): ADR-050 P1 - 6鍵 Inline Keyboard + info actions 骨架
第一行: [ 批准] [ 拒絕] [🔕 靜默] (nonce 防重放)
第二行: [📋 詳情] [🔄 重診] [📊 歷史] (read-only, action:incident_id 格式)

- security_interceptor: parse_callback_data 支援 2-part info action 格式
- telegram_gateway: _build_inline_keyboard 新增 incident_id 參數
- telegram.py: info_action 短路,不觸發 DB 操作

P2 待實作: detail/reanalyze/history 回傳實際資料 (目前回傳「功能開發中」)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-01 18:34:26 +08:00

473 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Security Interceptor - Telegram Gateway 守門員
===============================================
Phase 5.4.2: CISO 安全需求實作
Features:
- Telegram user_id 白名單驗證
- Nonce 防重放攻擊 (Redis + Memory fallback)
- HMAC 簽章二次驗證
安全鐵律:
- 只有白名單內的 user_id 可以簽核
- 每個 Nonce 只能使用一次
- 過期的 Nonce 自動清除
"""
import time
from dataclasses import dataclass
from typing import Protocol, runtime_checkable
import structlog
from src.core.config import settings
logger = structlog.get_logger(__name__)
# =============================================================================
# Nonce Store - 防重放攻擊
# =============================================================================
class NonceStore:
    """
    One-shot nonce registry used to reject replayed requests.

    A nonce is accepted the first time it is seen and refused on every later
    sighting for as long as its record is retained. Two backends exist:

    1. Redis, selected when ``initialize()`` manages to ping the server.
    2. A dict held on this instance, used otherwise (and therefore not
       shared with any other instance or process).

    NOTE(review): in both backends a record is dropped once it is older than
    ``settings.WEBHOOK_NONCE_TTL`` seconds, after which the same nonce would
    be accepted again. Nothing in this class checks a nonce's own age.
    """

    def __init__(self):
        # Flipped to True only after a successful Redis ping.
        self._use_redis = False
        # Redis backend: stays None until initialize() creates a client.
        self._redis_client = None
        # Memory backend: nonce -> time.time() value recorded on first use.
        self._memory_store: dict[str, float] = {}

    async def initialize(self) -> bool:
        """
        Try to bring up the Redis backend.

        Returns:
            bool: True when Redis answered the ping and will be used; False
            when anything failed and the in-memory backend stays active.
        """
        try:
            import redis.asyncio as aioredis

            self._redis_client = aioredis.from_url(
                settings.REDIS_URL,
                decode_responses=True,
            )
            # Round-trip once so an unreachable server is detected here.
            await self._redis_client.ping()
            self._use_redis = True
            logger.info("nonce_store_redis_initialized")
            return True
        except Exception as exc:
            # Deliberately broad: a missing package, a bad URL and a
            # connection failure all lead to the same memory fallback.
            logger.warning(
                "nonce_store_redis_failed_fallback_memory",
                error=str(exc),
            )
            self._use_redis = False
            return False

    async def check_and_consume(self, nonce: str) -> bool:
        """
        Record ``nonce`` as used and report whether this was its first use.

        Args:
            nonce: Unique identifier of the request being checked.

        Returns:
            bool: True if the nonce had no retained record (it is now
            recorded); False if a record already existed.
        """
        if not self._use_redis:
            return self._check_memory(nonce)
        return await self._check_redis(nonce)

    async def _check_redis(self, nonce: str) -> bool:
        """Redis backend: a single ``SET key "1" NX EX ttl`` call."""
        was_set = await self._redis_client.set(
            f"awoooi:nonce:{nonce}",
            "1",
            nx=True,  # write only when the key is absent
            ex=settings.WEBHOOK_NONCE_TTL,  # key lifetime in seconds
        )
        # Only a prefix of the nonce is written to the log.
        preview = nonce[:16] + "..."
        if not was_set:
            logger.warning("nonce_replay_detected_redis", nonce=preview)
            return False
        logger.info("nonce_consumed_redis", nonce=preview)
        return True

    def _check_memory(self, nonce: str) -> bool:
        """Memory backend: purge old records, then test-and-insert."""
        current = time.time()
        # Purging first means a record older than the TTL no longer blocks.
        self._cleanup_expired(current, settings.WEBHOOK_NONCE_TTL)

        if nonce in self._memory_store:
            logger.warning("nonce_replay_detected_memory", nonce=nonce[:16] + "...")
            return False

        self._memory_store[nonce] = current
        logger.info("nonce_consumed_memory", nonce=nonce[:16] + "...")
        return True

    def _cleanup_expired(self, now: float, ttl: int) -> None:
        """
        Remove memory-backend records strictly older than ``ttl`` seconds.

        Args:
            now: Reference time, on the same clock as the stored values.
            ttl: Maximum record age in seconds; a record exactly ``ttl``
                seconds old is kept.
        """
        stale = []
        for key, seen_at in self._memory_store.items():
            if now - seen_at > ttl:
                stale.append(key)

        # Deletion happens after the scan so the dict is not resized while
        # it is being iterated.
        for key in stale:
            del self._memory_store[key]

        if stale:
            logger.debug("nonce_cleanup", removed_count=len(stale))
# =============================================================================
# Telegram Security Interceptor
# =============================================================================
@dataclass
class TelegramUser:
"""Telegram 使用者資訊"""
user_id: int
username: str | None = None
first_name: str | None = None
is_whitelisted: bool = False
class SecurityInterceptorError(Exception):
    """Base class for errors raised by the security interceptor."""
class UserNotWhitelistedError(SecurityInterceptorError):
    """Raised when a Telegram user_id is not in the whitelist."""
class NonceReplayError(SecurityInterceptorError):
    """Raised when a nonce that was already consumed is presented again."""
class SignatureVerificationError(SecurityInterceptorError):
    """
    Signals a failed signature verification.

    NOTE(review): nothing in the visible part of this module raises it.
    """
# =============================================================================
# Protocol Interface (Phase 17 P1 - 紅區治理)
# =============================================================================
@runtime_checkable
class ITelegramSecurityInterceptor(Protocol):
    """
    Structural interface of the Telegram security interceptor.

    Intended uses:
    - type constraint for dependency injection (DI)
    - type checking of mocks in tests
    - compliance with the leWOOOgo modular-building-block convention

    Tier 3 red-zone service: changes require sign-off by the chief
    architect.

    @see feedback_lewooogo_modular_enforcement.md
    @see docs/RED_ZONES.md
    """

    @property
    def whitelist(self) -> list[int]:
        """Return the list of whitelisted Telegram user IDs."""
        ...

    async def initialize(self) -> bool:
        """Initialize the interceptor."""
        ...

    def is_whitelisted(self, user_id: int) -> bool:
        """Report whether ``user_id`` is in the whitelist."""
        ...

    async def verify_callback(
        self,
        user_id: int,
        callback_id: str,
        nonce: str | None = None,
    ) -> TelegramUser:
        """Verify a Telegram callback request and return the verified user."""
        ...
class TelegramSecurityInterceptor:
    """
    Security gate for Telegram-originated requests.

    CISO requirements implemented here:
    1. user_id whitelist check (only whitelisted users may approve)
    2. nonce-based replay protection
    3. optional: Telegram Bot Token HMAC verification
       (NOTE(review): no HMAC check is implemented in this class)

    Every approval request is expected to pass through this interceptor.
    """

    # Read-only actions carried as "{action}:{incident_id}" (ADR-050).
    _INFO_ACTIONS = frozenset({"detail", "reanalyze", "history"})

    def __init__(self):
        self._nonce_store = NonceStore()
        # Set by initialize(); verify_callback() initializes lazily if unset.
        self._initialized = False

    async def initialize(self) -> bool:
        """
        Initialize the underlying nonce store and mark this instance ready.

        Returns:
            bool: Always True; the nonce store's own result is not inspected.
        """
        await self._nonce_store.initialize()
        self._initialized = True
        logger.info("telegram_security_interceptor_initialized")
        return True

    @property
    def whitelist(self) -> list[int]:
        """Return the whitelisted user IDs, read from settings on each access."""
        return settings.OPENCLAW_TG_USER_WHITELIST

    def is_whitelisted(self, user_id: int) -> bool:
        """
        Report whether ``user_id`` is in the whitelist.

        Args:
            user_id: Telegram user ID.

        Returns:
            bool: True if the ID is whitelisted. An empty whitelist denies
            everyone.
        """
        # Read the setting once so the emptiness check, the membership test
        # and the log line all see the same value.
        allowed = self.whitelist

        # Empty whitelist = deny all.
        if not allowed:
            logger.warning(
                "telegram_whitelist_empty",
                user_id=user_id,
                message="Whitelist is empty, all users denied",
            )
            return False

        is_allowed = user_id in allowed
        if is_allowed:
            logger.info("telegram_user_whitelisted", user_id=user_id)
        else:
            logger.warning(
                "telegram_user_not_whitelisted",
                user_id=user_id,
                whitelist=allowed,
            )
        return is_allowed

    async def intercept_telegram(self, user_id: int) -> None:
        """
        Gate a plain Telegram text message (ADR-044 Phase 22).

        Used for the whitelist check of ``_handle_chat_message``. Unlike
        ``verify_callback``, plain text messages get no nonce replay check.

        Args:
            user_id: Telegram user ID.

        Raises:
            UserNotWhitelistedError: ``user_id`` is not in the whitelist.
        """
        # 2026-03-31 ogt: Phase 22 fix - added the missing security gate for
        # chat messages.
        if not self.is_whitelisted(user_id):
            raise UserNotWhitelistedError(
                f"User {user_id} is not in the chat whitelist"
            )

    async def verify_callback(
        self,
        user_id: int,
        callback_id: str,
        nonce: str | None = None,
    ) -> TelegramUser:
        """
        Verify a Telegram callback request.

        Checks, in order:
        1. whitelist membership
        2. nonce replay protection (only when a nonce is supplied)

        Args:
            user_id: Telegram user ID.
            callback_id: Callback query ID (used for logging only).
            nonce: Optional nonce for replay protection.

        Returns:
            TelegramUser: The verified user, with ``is_whitelisted=True``.

        Raises:
            UserNotWhitelistedError: The user is not whitelisted.
            NonceReplayError: The nonce was already consumed.
        """
        if not self._initialized:
            await self.initialize()

        # Step 1: whitelist check.
        if not self.is_whitelisted(user_id):
            logger.warning(
                "telegram_callback_rejected_not_whitelisted",
                user_id=user_id,
                callback_id=callback_id,
            )
            raise UserNotWhitelistedError(
                f"User {user_id} is not in the approval whitelist"
            )

        # Step 2: nonce replay check (only when supplied).
        # NOTE(review): an empty-string nonce is falsy and therefore skips
        # this check just like None does.
        if nonce:
            is_valid = await self._nonce_store.check_and_consume(nonce)
            if not is_valid:
                logger.warning(
                    "telegram_callback_rejected_nonce_replay",
                    user_id=user_id,
                    callback_id=callback_id,
                    nonce=nonce[:16] + "...",
                )
                raise NonceReplayError(
                    f"Nonce replay detected: {nonce[:16]}..."
                )

        # Verification passed.
        logger.info(
            "telegram_callback_verified",
            user_id=user_id,
            callback_id=callback_id,
            nonce_checked=bool(nonce),
        )
        return TelegramUser(
            user_id=user_id,
            is_whitelisted=True,
        )

    async def verify_webhook_update(
        self,
        update_id: int,
        user_id: int,
    ) -> TelegramUser:
        """
        Verify a Telegram webhook Update.

        Delegates to ``verify_callback`` with ``tg_update_{update_id}`` as the
        nonce, so a repeated ``update_id`` is rejected as a replay.

        Args:
            update_id: Telegram Update ID (used to build the nonce).
            user_id: Telegram user ID.

        Returns:
            TelegramUser: The verified user.

        Raises:
            UserNotWhitelistedError: The user is not whitelisted.
            NonceReplayError: The update ID was already consumed.
        """
        nonce = f"tg_update_{update_id}"
        return await self.verify_callback(
            user_id=user_id,
            callback_id=str(update_id),
            nonce=nonce,
        )

    def generate_callback_nonce(self, approval_id: str, action: str) -> str:
        """
        Build a callback nonce to embed in ``callback_data``.

        Format: ``{action}:{approval_id}:{timestamp}:{random}`` where
        ``timestamp`` is the current Unix time in whole seconds and ``random``
        is 8 hex characters from ``secrets.token_hex(4)``.

        NOTE(review): ``parse_callback_data`` splits on ":", so an ``action``
        or ``approval_id`` that itself contains ":" yields a string it will
        reject. This method does not validate its inputs.

        Args:
            approval_id: Approval ticket ID.
            action: Action type (approve/reject).

        Returns:
            str: The generated nonce.
        """
        import secrets

        timestamp = int(time.time())
        random_part = secrets.token_hex(4)
        nonce = f"{action}:{approval_id}:{timestamp}:{random_part}"
        logger.debug(
            "callback_nonce_generated",
            approval_id=approval_id,
            action=action,
        )
        return nonce

    def parse_callback_data(self, callback_data: str) -> dict:
        """
        Parse a Telegram ``callback_data`` string.

        Format 1 (write action, nonce replay protection):
            ``{action}:{approval_id}:{timestamp}:{random}``
        Format 2 (read-only info action, ADR-050):
            ``{action}:{incident_id}`` with ``action`` one of
            detail / reanalyze / history

        Args:
            callback_data: The ``callback_data`` string.

        Returns:
            dict: Parsed fields.
            - Format 1: ``action``, ``approval_id``, ``timestamp`` (int),
              ``nonce`` (the whole input string), ``is_info_action=False``
            - Format 2: ``action``, ``incident_id``, ``approval_id`` (same
              value as ``incident_id``), ``is_info_action=True``

        Raises:
            ValueError: The input is not a string, matches neither format,
                contains an empty field, or has a non-integer timestamp.
        """
        # 2026-04-01 Claude Code (ADR-050): support read-only info actions
        # (2-part format).
        invalid = f"Invalid callback_data format: {callback_data}"

        # A missing payload must surface as ValueError like every other
        # malformed input, not as AttributeError from .split().
        if not isinstance(callback_data, str):
            raise ValueError(invalid)

        parts = callback_data.split(":")

        if len(parts) == 2 and parts[0] in self._INFO_ACTIONS:
            action, incident_id = parts
            # "detail:" carries no ID to look up; reject it here rather than
            # handing an empty ID to the caller.
            if not incident_id:
                raise ValueError(invalid)
            return {
                "action": action,
                "incident_id": incident_id,
                "approval_id": incident_id,  # kept for older call sites
                "is_info_action": True,
            }

        # Four fields are required and none of them may be empty.
        if len(parts) != 4 or not all(parts):
            raise ValueError(invalid)

        try:
            timestamp = int(parts[2])
        except ValueError as err:
            # Report a bad timestamp with the same message as any other
            # malformed payload instead of int()'s own message.
            raise ValueError(invalid) from err

        return {
            "action": parts[0],
            "approval_id": parts[1],
            "timestamp": timestamp,
            "nonce": callback_data,  # the whole string serves as the nonce
            "is_info_action": False,
        }
# =============================================================================
# Singleton
# =============================================================================
_interceptor: TelegramSecurityInterceptor | None = None


def get_security_interceptor() -> TelegramSecurityInterceptor:
    """
    Return the module-wide TelegramSecurityInterceptor instance.

    The instance is constructed on the first call and cached in the
    module-level ``_interceptor``; later calls return that same object.
    This function only constructs the instance; it does not call
    ``initialize()`` on it.
    """
    global _interceptor
    cached = _interceptor
    if cached is not None:
        return cached
    cached = TelegramSecurityInterceptor()
    _interceptor = cached
    return cached