""" Privacy Shield - BFF 脫敏攔截器 Phase 2.4: 資料清理引擎 在送給 LLM 之前,自動脫敏機敏資料: - IPv4/IPv6 地址 → [IP_1], [IP_2], ... - Email 信箱 → [EMAIL_1], [EMAIL_2], ... - UUIDs/Tokens → [SECRET_1], [SECRET_2], ... - API Keys (sk-*) → [SECRET_1], [SECRET_2], ... 特色:一致性雜湊 (Consistent Hashing) - 同一段 Log 裡的同一個 IP,會被替換成同一個標籤 - AI 仍能辨識「這兩個 IP 是同一個」 """ import re from collections.abc import Callable from dataclasses import dataclass, field from enum import Enum # ==================== Types ==================== class SensitiveDataType(str, Enum): """機敏資料類型""" IP_ADDRESS = "IP" EMAIL = "EMAIL" SECRET = "SECRET" # UUID, Token, API Key CREDIT_CARD = "CC" # 未來擴充 PHONE = "PHONE" # 未來擴充 ID_NUMBER = "ID" # 未來擴充 @dataclass class RedactionMatch: """單次脫敏匹配""" original: str redacted: str data_type: SensitiveDataType start: int end: int @dataclass class RedactionResult: """脫敏結果""" original_text: str redacted_text: str matches: list[RedactionMatch] mapping: dict[str, str] # 原始值 → 脫敏標籤 (可逆映射) @property def has_sensitive_data(self) -> bool: return len(self.matches) > 0 @property def stats(self) -> dict[str, int]: """各類型脫敏統計""" stats: dict[str, int] = {} for match in self.matches: key = match.data_type.value stats[key] = stats.get(key, 0) + 1 return stats # ==================== Regex Patterns ==================== # IPv4: 192.168.1.1 PATTERN_IPV4 = re.compile( r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}' r'(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b' ) # IPv6: 2001:0db8:85a3::8a2e:0370:7334 (簡化版) PATTERN_IPV6 = re.compile( r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b|' # 完整格式 r'\b(?:[0-9a-fA-F]{1,4}:){1,7}:\b|' # 壓縮格式 r'\b(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}\b|' r'\b(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}\b|' r'\b(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}\b|' r'\b(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}\b|' r'\b(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}\b|' r'\b[0-9a-fA-F]{1,4}:(?::[0-9a-fA-F]{1,4}){1,6}\b|' r'\b::(?:[0-9a-fA-F]{1,4}:){0,5}[0-9a-fA-F]{1,4}\b|' r'\b::1\b' # localhost ) # Email: user@example.com PATTERN_EMAIL = re.compile( r'\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b' ) # UUID: 550e8400-e29b-41d4-a716-446655440000 PATTERN_UUID = re.compile( r'\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-' r'[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b' ) # API Keys: sk-xxx, pk-xxx, key-xxx, token-xxx PATTERN_API_KEY = re.compile( r'\b(?:sk|pk|api|key|token|bearer|secret|password|pwd|auth)[-_]?' r'[a-zA-Z0-9]{16,}\b', re.IGNORECASE ) # Generic long tokens (32+ hex/alphanumeric) PATTERN_LONG_TOKEN = re.compile( r'\b[a-zA-Z0-9]{32,}\b' ) # JWT-like tokens (xxx.xxx.xxx) PATTERN_JWT = re.compile( r'\beyJ[a-zA-Z0-9_-]*\.[a-zA-Z0-9_-]*\.[a-zA-Z0-9_-]*\b' ) # ==================== Privacy Shield Engine ==================== @dataclass class ConsistentMapper: """ 一致性映射器 確保同一個值在同一個上下文中被映射到同一個標籤 例如:192.168.1.1 總是映射到 [IP_1] """ prefix: str _counter: int = 0 _mapping: dict[str, str] = field(default_factory=dict) _reverse: dict[str, str] = field(default_factory=dict) def get_label(self, value: str) -> str: """取得或建立標籤""" if value not in self._mapping: self._counter += 1 label = f"[{self.prefix}_{self._counter}]" self._mapping[value] = label self._reverse[label] = value return self._mapping[value] def get_original(self, label: str) -> str | None: """反查原始值 (用於還原)""" return self._reverse.get(label) @property def mapping(self) -> dict[str, str]: return self._mapping.copy() class PrivacyShield: """ Privacy Shield 脫敏引擎 BFF 層攔截器,在送給 LLM 前自動脫敏機敏資料 使用一致性雜湊確保同值同標籤,AI 仍能辨識上下文關係 """ def __init__(self): # 預設啟用的規則 (可動態配置) self.rules: list[tuple[re.Pattern, SensitiveDataType]] = [ (PATTERN_API_KEY, SensitiveDataType.SECRET), # API Key 優先 (PATTERN_JWT, SensitiveDataType.SECRET), # JWT Token (PATTERN_UUID, SensitiveDataType.SECRET), # UUID (PATTERN_EMAIL, SensitiveDataType.EMAIL), # Email (PATTERN_IPV6, SensitiveDataType.IP_ADDRESS), # IPv6 先於 IPv4 (PATTERN_IPV4, SensitiveDataType.IP_ADDRESS), # IPv4 (PATTERN_LONG_TOKEN, SensitiveDataType.SECRET), # 長 Token (最後) ] def redact(self, text: str) -> RedactionResult: """ 執行脫敏 Args: text: 原始文字 (Log、錯誤訊息、使用者輸入等) Returns: RedactionResult 包含脫敏後文字、匹配列表、映射表 """ # 每次 redact 使用獨立的 mapper,確保同一批文字內一致 mappers: dict[SensitiveDataType, ConsistentMapper] = { SensitiveDataType.IP_ADDRESS: ConsistentMapper(prefix="IP"), SensitiveDataType.EMAIL: ConsistentMapper(prefix="EMAIL"), SensitiveDataType.SECRET: ConsistentMapper(prefix="SECRET"), } matches: list[RedactionMatch] = [] redacted_positions: set[tuple[int, int]] = set() # 1. 收集所有匹配 (避免重疊) all_matches: list[tuple[re.Match, SensitiveDataType]] = [] for pattern, data_type in self.rules: for match in pattern.finditer(text): # 檢查是否與已匹配區域重疊 start, end = match.start(), match.end() overlaps = any( not (end <= s or start >= e) for s, e in redacted_positions ) if not overlaps: all_matches.append((match, data_type)) redacted_positions.add((start, end)) # 2. 按位置排序 (從後往前替換,避免位移) all_matches.sort(key=lambda x: x[0].start(), reverse=True) # 3. 執行替換 result_text = text for match, data_type in all_matches: original = match.group() mapper = mappers[data_type] label = mapper.get_label(original) # 記錄匹配 matches.append(RedactionMatch( original=original, redacted=label, data_type=data_type, start=match.start(), end=match.end(), )) # 替換文字 result_text = ( result_text[:match.start()] + label + result_text[match.end():] ) # 反轉 matches 順序 (恢復正序) matches.reverse() # 合併所有映射 combined_mapping: dict[str, str] = {} for mapper in mappers.values(): combined_mapping.update(mapper.mapping) return RedactionResult( original_text=text, redacted_text=result_text, matches=matches, mapping=combined_mapping, ) def redact_batch(self, texts: list[str]) -> list[RedactionResult]: """批次脫敏 (每個文字獨立映射)""" return [self.redact(text) for text in texts] def redact_with_shared_context(self, texts: list[str]) -> tuple[list[str], dict[str, str]]: """ 共享上下文批次脫敏 多段文字共用同一個映射器,確保跨文字的同值同標籤 適用於:多行 Log、對話歷史等 """ mappers: dict[SensitiveDataType, ConsistentMapper] = { SensitiveDataType.IP_ADDRESS: ConsistentMapper(prefix="IP"), SensitiveDataType.EMAIL: ConsistentMapper(prefix="EMAIL"), SensitiveDataType.SECRET: ConsistentMapper(prefix="SECRET"), } results: list[str] = [] for text in texts: result_text = text redacted_positions: set[tuple[int, int]] = set() all_matches: list[tuple[re.Match, SensitiveDataType]] = [] for pattern, data_type in self.rules: for match in pattern.finditer(text): start, end = match.start(), match.end() overlaps = any( not (end <= s or start >= e) for s, e in redacted_positions ) if not overlaps: all_matches.append((match, data_type)) redacted_positions.add((start, end)) all_matches.sort(key=lambda x: x[0].start(), reverse=True) for match, data_type in all_matches: original = match.group() label = mappers[data_type].get_label(original) result_text = ( result_text[:match.start()] + label + result_text[match.end():] ) results.append(result_text) # 合併映射 combined_mapping: dict[str, str] = {} for mapper in mappers.values(): combined_mapping.update(mapper.mapping) return results, combined_mapping def restore(self, text: str, mapping: dict[str, str]) -> str: """ 還原脫敏文字 (用於除錯或日誌記錄) ⚠️ 警告:只應在 BFF 內部使用,絕不可還原後送給外部系統 """ result = text # 反轉映射 reverse_mapping = {v: k for k, v in mapping.items()} for label, original in reverse_mapping.items(): result = result.replace(label, original) return result # ==================== FastAPI Middleware Integration ==================== def create_privacy_middleware(shield: "PrivacyShield"): """ 建立 FastAPI 中間件 用於自動脫敏請求/回應中的機敏資料 """ from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.responses import Response class PrivacyShieldMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next: Callable) -> Response: # TODO: 實作請求/回應脫敏 # 目前僅作為範例骨架 response = await call_next(request) return response return PrivacyShieldMiddleware # 全域引擎實例 privacy_shield = PrivacyShield()