""" Trust Engine - 信任引擎與漸進自治 Phase 3.2: Progressive Autonomy 核心理念: 當某種特定操作被人類連續批准多次後, 系統自動將該操作的風險等級降級,最終達成 Zero-Touch (免授權自動執行) 信任累積規則: - 每次 Approve: +1 分 - 每次 Reject: 歸零 (信任瞬間瓦解) 風險降級閾值: - score >= 5: medium → low (變成自動執行) - score >= 10: high → medium (雙簽變單簽) - critical: 永遠不准降級 (Drop Table 等毀滅性操作) """ import logging from dataclasses import dataclass, field from datetime import datetime from typing import Literal, Protocol, runtime_checkable # Phase 16 R2 (2026-03-25): RiskLevel 統一改從 models/approval.py 導入 # 原因: 消除重複定義,統一風險等級來源 # 執行者: Claude Code # 回滾: 取消註解下方 RiskLevel class 區塊,移除此 import from src.models.approval import RiskLevel logger = logging.getLogger(__name__) # ==================== Types ==================== # --- 以下為舊定義,已封存 (Phase 16 R2) --- # class RiskLevel(str, Enum): # """風險等級""" # LOW = "low" # MEDIUM = "medium" # HIGH = "high" # CRITICAL = "critical" # --- 封存結束 --- @dataclass class TrustRecord: """信任記錄""" action_pattern: str score: int = 0 total_approvals: int = 0 total_rejections: int = 0 last_approval_by: str | None = None last_approval_at: datetime | None = None last_rejection_by: str | None = None last_rejection_at: datetime | None = None created_at: datetime = field(default_factory=datetime.utcnow) @property def approval_rate(self) -> float: """批准率""" total = self.total_approvals + self.total_rejections if total == 0: return 0.0 return self.total_approvals / total @dataclass class RiskAdjustment: """風險調整結果""" original_risk: RiskLevel adjusted_risk: RiskLevel trust_score: int reason: str is_downgraded: bool def to_dict(self) -> dict: return { "originalRisk": self.original_risk.value, "adjustedRisk": self.adjusted_risk.value, "trustScore": self.trust_score, "reason": self.reason, "isDowngraded": self.is_downgraded, } # ==================== Configuration ==================== @dataclass class TrustThresholds: """信任閾值配置""" # 降級閾值 medium_to_low: int = 5 # medium → low (自動執行) high_to_medium: int = 10 # high → medium (雙簽→單簽) # Reject 懲罰 rejection_penalty: int = -5 # Reject 時直接扣分 (或歸零) reset_on_reject: bool = True # True = 歸零, False = 扣分 # 信任衰減 (可選,防止過時信任) decay_enabled: bool = False decay_days: int = 30 # 幾天沒操作後開始衰減 decay_rate: float = 0.1 # 每天衰減比例 # 預設閾值 DEFAULT_THRESHOLDS = TrustThresholds() # ==================== Protocol Interface (Phase 17 P1) ==================== @runtime_checkable class ITrustScoreManager(Protocol): """ TrustScoreManager 介面定義 用途: - 依賴注入 (DI) 時的型別約束 - 測試時 Mock 的型別檢查 - 符合 leWOOOgo 積木化規範 Tier 3 紅區服務: 修改需首席架構師簽核 @see feedback_lewooogo_modular_enforcement.md @see docs/RED_ZONES.md """ def record_approval( self, action_pattern: str, user_role: str, user_id: str | None = None, ) -> TrustRecord: """記錄人類批准""" ... def record_rejection( self, action_pattern: str, user_role: str, user_id: str | None = None, reason: str | None = None, ) -> TrustRecord: """記錄人類拒絕""" ... def evaluate_adjusted_risk( self, action_pattern: str, original_risk: str | RiskLevel, ) -> RiskAdjustment: """評估調整後的風險等級""" ... def get_trust_record(self, action_pattern: str) -> TrustRecord | None: """取得信任記錄""" ... # ==================== Trust Engine ==================== class TrustScoreManager: """ 信任分數管理器 追蹤每個 action_pattern 的信任分數, 根據人類批准/拒絕歷史動態調整風險等級 """ def __init__(self, thresholds: TrustThresholds | None = None): self.thresholds = thresholds or DEFAULT_THRESHOLDS # In-memory storage (Phase 4+ 換成 Redis/PostgreSQL) self._records: dict[str, TrustRecord] = {} def _get_or_create_record(self, action_pattern: str) -> TrustRecord: """取得或建立信任記錄""" if action_pattern not in self._records: self._records[action_pattern] = TrustRecord(action_pattern=action_pattern) return self._records[action_pattern] def record_approval( self, action_pattern: str, user_role: str, user_id: str | None = None, ) -> TrustRecord: """ 記錄人類批准 每次 Approve,該 pattern 的信任分數 +1 連續批准累積信任,最終達成 Zero-Touch Args: action_pattern: 操作模式 (例如: "delete_pod:nginx-*") user_role: 批准者角色 user_id: 批准者 ID (可選) Returns: 更新後的 TrustRecord """ record = self._get_or_create_record(action_pattern) # 累積信任 record.score += 1 record.total_approvals += 1 record.last_approval_by = user_id or user_role record.last_approval_at = datetime.utcnow() logger.info( f"[TrustEngine] Approval recorded: {action_pattern} " f"(score: {record.score}, by: {user_role})" ) return record def record_rejection( self, action_pattern: str, user_role: str, user_id: str | None = None, reason: str | None = None, ) -> TrustRecord: """ 記錄人類拒絕 ⚠️ 信任瞬間瓦解: Reject 會讓分數歸零或大幅扣分 這確保系統不會因為歷史批准而忽視人類當下的判斷 Args: action_pattern: 操作模式 user_role: 拒絕者角色 user_id: 拒絕者 ID (可選) reason: 拒絕原因 (可選) Returns: 更新後的 TrustRecord """ record = self._get_or_create_record(action_pattern) # 信任瓦解 old_score = record.score if self.thresholds.reset_on_reject: record.score = 0 # 歸零 else: record.score = max(0, record.score + self.thresholds.rejection_penalty) record.total_rejections += 1 record.last_rejection_by = user_id or user_role record.last_rejection_at = datetime.utcnow() logger.warning( f"[TrustEngine] Rejection recorded: {action_pattern} " f"(score: {old_score} → {record.score}, by: {user_role}, reason: {reason})" ) return record def evaluate_adjusted_risk( self, action_pattern: str, original_risk: str | RiskLevel, ) -> RiskAdjustment: """ 評估調整後的風險等級 根據信任分數決定是否降級風險 降級規則: - score >= 5: medium → low (自動執行) - score >= 10: high → medium (雙簽→單簽) - critical: 永遠不准降級 Args: action_pattern: 操作模式 original_risk: 原始風險等級 Returns: RiskAdjustment 包含調整後風險與原因 """ # 標準化 risk level if isinstance(original_risk, str): original_risk = RiskLevel(original_risk.lower()) record = self._get_or_create_record(action_pattern) score = record.score # ╔════════════════════════════════════════════════════╗ # ║ CRITICAL 永遠不准降級 - 企業鐵律 ║ # ║ Drop Table, Delete Namespace 等毀滅性操作 ║ # ║ 無論多少次批准,都必須人類雙簽 ║ # ╚════════════════════════════════════════════════════╝ if original_risk == RiskLevel.CRITICAL: return RiskAdjustment( original_risk=original_risk, adjusted_risk=RiskLevel.CRITICAL, trust_score=score, reason="CRITICAL operations never auto-downgrade (enterprise policy)", is_downgraded=False, ) adjusted_risk = original_risk reason = "No adjustment" is_downgraded = False # HIGH → MEDIUM (score >= 10) if original_risk == RiskLevel.HIGH and score >= self.thresholds.high_to_medium: adjusted_risk = RiskLevel.MEDIUM reason = f"Trust score {score} >= {self.thresholds.high_to_medium}: HIGH → MEDIUM (2-sig → 1-sig)" is_downgraded = True # MEDIUM → LOW (score >= 5) elif original_risk == RiskLevel.MEDIUM and score >= self.thresholds.medium_to_low: adjusted_risk = RiskLevel.LOW reason = f"Trust score {score} >= {self.thresholds.medium_to_low}: MEDIUM → LOW (auto-execute)" is_downgraded = True # HIGH 但未達降級閾值 elif original_risk == RiskLevel.HIGH and score < self.thresholds.high_to_medium: reason = f"Trust score {score} < {self.thresholds.high_to_medium}: HIGH maintained" # MEDIUM 但未達降級閾值 elif original_risk == RiskLevel.MEDIUM and score < self.thresholds.medium_to_low: reason = f"Trust score {score} < {self.thresholds.medium_to_low}: MEDIUM maintained" # LOW 已是最低 elif original_risk == RiskLevel.LOW: reason = "Already at lowest risk level" if is_downgraded: logger.info( f"[TrustEngine] Risk downgraded: {action_pattern} " f"({original_risk.value} → {adjusted_risk.value}, score: {score})" ) return RiskAdjustment( original_risk=original_risk, adjusted_risk=adjusted_risk, trust_score=score, reason=reason, is_downgraded=is_downgraded, ) def get_trust_record(self, action_pattern: str) -> TrustRecord | None: """取得信任記錄""" return self._records.get(action_pattern) def get_all_records(self) -> list[TrustRecord]: """取得所有信任記錄""" return list(self._records.values()) def reset_trust(self, action_pattern: str) -> None: """重置特定 pattern 的信任分數""" if action_pattern in self._records: self._records[action_pattern].score = 0 logger.info(f"[TrustEngine] Trust reset: {action_pattern}") def reset_all(self) -> None: """重置所有信任分數 (緊急用)""" for record in self._records.values(): record.score = 0 logger.warning("[TrustEngine] All trust scores reset!") # ==================== Pattern Matching Utilities ==================== def normalize_action_pattern( operation: str, parameters: dict, granularity: Literal["exact", "resource", "operation"] = "resource", ) -> str: """ 正規化操作為 pattern granularity 控制信任累積粒度: - exact: "delete_pod:nginx-frontend-7d4b8c9f5-xk2m3" (精確到實例) - resource: "delete_pod:nginx-frontend-*" (資源類型) - operation: "delete_pod:*" (操作類型) Args: operation: 操作名稱 parameters: 操作參數 granularity: 粒度 Returns: 正規化後的 pattern """ if granularity == "operation": return f"{operation}:*" # 嘗試從參數提取資源名稱 resource_name = ( parameters.get("pod_name") or parameters.get("deployment") or parameters.get("table_name") or parameters.get("resource") or parameters.get("name") or "*" ) if granularity == "exact": return f"{operation}:{resource_name}" # resource: 提取資源前綴 # nginx-frontend-7d4b8c9f5-xk2m3 → nginx-frontend-* if isinstance(resource_name, str) and resource_name != "*": parts = resource_name.rsplit("-", 2) if len(parts) >= 3: resource_name = f"{parts[0]}-*" return f"{operation}:{resource_name}" # 全域實例 trust_engine = TrustScoreManager() def get_trust_manager() -> TrustScoreManager: """取得 TrustScoreManager singleton (ADR-030 Phase 4)""" return trust_engine