""" Trust Engine - 風險判定與 Multi-Sig 簽核邏輯 ========================================== CISO-101: 信任引擎核心實作 風險等級與簽核需求: - LOW: 0 人,自動放行 (如 scale up) - MEDIUM: 需 1 人簽核 (如 delete pod) - CRITICAL: 需 2 人 Multi-Sig 雙重簽核 (如 DROP TABLE) Features: - 自動風險分類 - 簽核數驗證 - 狀態轉換控制 """ from collections.abc import Callable from datetime import UTC, datetime from uuid import UUID from src.models.approval import ( ApprovalRequest, ApprovalRequestCreate, ApprovalStatus, BlastRadius, DataImpact, RiskLevel, Signature, ) # ============================================================================= # Risk Classification Rules # ============================================================================= # 危險關鍵字 - 用於動作分類 CRITICAL_KEYWORDS = [ "drop", "delete database", "truncate", "rm -rf", "destroy", "format", "wipe", "purge all", ] MEDIUM_KEYWORDS = [ "delete", "remove", "stop", "restart", "rollback", "downgrade", "migrate", ] LOW_KEYWORDS = [ "scale", "update config", "patch", "upgrade", "add", "create", ] # ============================================================================= # Signature Requirements # ============================================================================= SIGNATURE_REQUIREMENTS: dict[RiskLevel, int] = { RiskLevel.LOW: 0, # 自動放行 RiskLevel.MEDIUM: 1, # 單人簽核 RiskLevel.CRITICAL: 1, # 2026-04-02 ogt: 統帥決策 — 只需 1 層審核,Multi-Sig 停用 } def get_required_signatures(risk_level: RiskLevel) -> int: """根據風險等級取得所需簽核數""" return SIGNATURE_REQUIREMENTS.get(risk_level, 1) # ============================================================================= # Risk Classification # ============================================================================= def classify_risk_by_action(action: str) -> RiskLevel: """ 根據動作描述自動分類風險等級 優先順序: CRITICAL > MEDIUM > LOW """ action_lower = action.lower() # Check CRITICAL first for keyword in CRITICAL_KEYWORDS: if keyword in action_lower: return RiskLevel.CRITICAL # Check MEDIUM for keyword in MEDIUM_KEYWORDS: if keyword in action_lower: return RiskLevel.MEDIUM # Check LOW for keyword in LOW_KEYWORDS: if keyword in action_lower: return RiskLevel.LOW # Default to MEDIUM for unknown actions return RiskLevel.MEDIUM def classify_risk_by_blast_radius(blast_radius: BlastRadius) -> RiskLevel: """ 根據爆炸半徑分類風險等級 - DESTRUCTIVE 數據影響 → CRITICAL - 影響 > 10 pods 或多於 3 個關聯服務 → CRITICAL - 影響 > 3 pods 或有停機時間 → MEDIUM - 其他 → LOW """ # DESTRUCTIVE 資料影響直接升級為 CRITICAL if blast_radius.data_impact == DataImpact.DESTRUCTIVE: return RiskLevel.CRITICAL # WRITE 資料影響至少 MEDIUM if blast_radius.data_impact == DataImpact.WRITE: if blast_radius.affected_pods > 5 or len(blast_radius.related_services) > 2: return RiskLevel.CRITICAL return RiskLevel.MEDIUM # 根據影響範圍判定 if blast_radius.affected_pods > 10: return RiskLevel.CRITICAL if len(blast_radius.related_services) > 3: return RiskLevel.CRITICAL if blast_radius.affected_pods > 3: return RiskLevel.MEDIUM if blast_radius.estimated_downtime != "0": return RiskLevel.MEDIUM if len(blast_radius.related_services) > 1: return RiskLevel.MEDIUM return RiskLevel.LOW def classify_risk( action: str, blast_radius: BlastRadius | None = None, explicit_level: RiskLevel | None = None, ) -> RiskLevel: """ 綜合風險分類 - 取最高風險等級 Args: action: 動作描述 blast_radius: 爆炸半徑 explicit_level: 明確指定的風險等級 (優先) Returns: 最終風險等級 """ # 如果明確指定,直接使用 if explicit_level is not None: return explicit_level # 從動作分類 action_risk = classify_risk_by_action(action) # 從爆炸半徑分類 blast_risk = RiskLevel.LOW if blast_radius: blast_risk = classify_risk_by_blast_radius(blast_radius) # 取較高風險等級 risk_order = [RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.CRITICAL] action_idx = risk_order.index(action_risk) blast_idx = risk_order.index(blast_risk) return risk_order[max(action_idx, blast_idx)] # ============================================================================= # Approval State Machine # ============================================================================= class TrustEngine: """ 信任引擎 - 管理授權請求生命週期 狀態機: PENDING → APPROVED (當簽核數滿足) PENDING → REJECTED (當被拒絕) PENDING → EXPIRED (當過期) """ def __init__( self, on_approved: Callable[[ApprovalRequest], None] | None = None, on_rejected: Callable[[ApprovalRequest], None] | None = None, ): """ 初始化信任引擎 Args: on_approved: 當請求被批准時的回調 on_rejected: 當請求被拒絕時的回調 """ self._approvals: dict[UUID, ApprovalRequest] = {} self._on_approved = on_approved self._on_rejected = on_rejected def create_approval( self, request: ApprovalRequestCreate, ) -> ApprovalRequest: """ 建立新的授權請求 自動根據風險等級設定所需簽核數 LOW 風險自動批准 """ # 分類風險 risk_level = classify_risk( action=request.action, blast_radius=request.blast_radius, explicit_level=request.risk_level, ) # 取得所需簽核數 required_sigs = get_required_signatures(risk_level) # 建立完整請求 approval = ApprovalRequest( action=request.action, description=request.description, risk_level=risk_level, blast_radius=request.blast_radius, dry_run_checks=request.dry_run_checks, requested_by=request.requested_by, expires_at=request.expires_at, metadata=request.metadata, required_signatures=required_sigs, ) # LOW 風險自動批准 if risk_level == RiskLevel.LOW: approval.status = ApprovalStatus.APPROVED approval.resolved_at = datetime.now(UTC) if self._on_approved: self._on_approved(approval) # 儲存 self._approvals[approval.id] = approval return approval def get_approval(self, approval_id: UUID) -> ApprovalRequest | None: """取得授權請求""" return self._approvals.get(approval_id) def get_pending_approvals(self) -> list[ApprovalRequest]: """取得所有待簽核請求""" now = datetime.now(UTC) pending = [] for approval in self._approvals.values(): # 檢查是否過期 if approval.status == ApprovalStatus.PENDING: if approval.expires_at and approval.expires_at < now: approval.status = ApprovalStatus.EXPIRED approval.resolved_at = now else: pending.append(approval) # 按建立時間排序 (最新優先) pending.sort(key=lambda x: x.created_at, reverse=True) return pending def sign_approval( self, approval_id: UUID, signer_id: str, signer_name: str, comment: str | None = None, ) -> tuple[ApprovalRequest | None, str, bool]: """ 簽核授權請求 Returns: (approval, message, execution_triggered) - approval: 更新後的請求 (None 表示失敗) - message: 結果訊息 - execution_triggered: 是否觸發執行 """ approval = self._approvals.get(approval_id) if not approval: return None, "Approval not found", False if approval.status != ApprovalStatus.PENDING: return approval, f"Cannot sign: status is {approval.status.value}", False # 檢查是否已簽核 if approval.has_signer(signer_id): return approval, f"Signer {signer_id} has already signed", False # 新增簽核 signature = Signature( signer_id=signer_id, signer_name=signer_name, comment=comment, ) approval.signatures.append(signature) approval.updated_at = datetime.now(UTC) # 檢查是否滿足簽核數 _execution_triggered = False if approval.is_fully_signed: approval.status = ApprovalStatus.APPROVED approval.resolved_at = datetime.now(UTC) _execution_triggered = True if self._on_approved: self._on_approved(approval) return approval, "Approval completed - execution triggered", True remaining = approval.remaining_signatures return approval, f"Signed. {remaining} more signature(s) required", False def reject_approval( self, approval_id: UUID, rejector_id: str, rejector_name: str, reason: str, ) -> tuple[ApprovalRequest | None, str]: """ 拒絕授權請求 Returns: (approval, message) """ approval = self._approvals.get(approval_id) if not approval: return None, "Approval not found" if approval.status != ApprovalStatus.PENDING: return approval, f"Cannot reject: status is {approval.status.value}" # 更新狀態 approval.status = ApprovalStatus.REJECTED approval.rejection_reason = f"[{rejector_name}] {reason}" approval.resolved_at = datetime.now(UTC) approval.updated_at = datetime.now(UTC) if self._on_rejected: self._on_rejected(approval) return approval, "Approval rejected" def expire_stale_approvals(self) -> list[ApprovalRequest]: """ 過期所有超時的待簽核請求 Returns: 已過期的請求列表 """ now = datetime.now(UTC) expired = [] for approval in self._approvals.values(): if approval.status == ApprovalStatus.PENDING: if approval.expires_at and approval.expires_at < now: approval.status = ApprovalStatus.EXPIRED approval.resolved_at = now approval.updated_at = now expired.append(approval) return expired # ============================================================================= # Singleton Instance # ============================================================================= _trust_engine: TrustEngine | None = None def get_trust_engine() -> TrustEngine: """取得全域信任引擎實例""" global _trust_engine if _trust_engine is None: _trust_engine = TrustEngine() return _trust_engine def reset_trust_engine() -> None: """重置信任引擎 (僅供測試使用)""" global _trust_engine _trust_engine = None