""" Complexity Scorer - Phase 13.3 #86 =================================== 複雜度評分,用於智能路由模型選擇 目標: < 10ms 延遲 (純規則引擎) 策略: 基於特徵提取的加權評分 Phase 13.3 (2026-03-26): 初始實作 Phase 13.3 (2026-03-26): 增強版 - 9 維度完整評分系統 (ADR-023) 版本: v2.0 建立: 2026-03-26 (台北時區) 建立者: Claude Code 最後修改: 2026-03-26 (台北時區) 修改者: Claude Code """ from dataclasses import dataclass, field from enum import Enum from typing import Protocol import structlog from src.services.model_registry import get_model_registry logger = structlog.get_logger(__name__) # ============================================================================= # Enums # ============================================================================= class DataImpact(Enum): """資料影響等級 (ADR-023)""" NONE = "none" # 無資料影響 READ_ONLY = "read_only" # 只讀操作 WRITE = "write" # 寫入操作 DESTRUCTIVE = "destructive" # 破壞性操作 (刪除、DROP) class BusinessCriticality(Enum): """業務關鍵度等級""" NON_CRITICAL = "non_critical" # 非關鍵服務 SUPPORTING = "supporting" # 支援服務 IMPORTANT = "important" # 重要服務 CRITICAL = "critical" # 核心服務 MISSION_CRITICAL = "mission_critical" # 業務命脈 # ============================================================================= # Interface (支援 DI 測試) # ============================================================================= class IComplexityScorer(Protocol): """Complexity Scorer Interface for DI""" def score(self, context: dict) -> "ComplexityScore": """計算複雜度分數""" ... def get_dimension_weights(self) -> dict[str, float]: """取得維度權重配置""" ... # ============================================================================= # Data Classes # ============================================================================= def _get_default_model() -> str: """取得預設模型 (從 ModelRegistry)""" return get_model_registry().get_model("ollama", "default") @dataclass class DimensionScore: """單一維度評分""" name: str # 維度名稱 raw_value: int | float | str | bool # 原始值 normalized_score: int # 正規化分數 (1-5) weight: float # 權重 weighted_score: float # 加權後分數 reason: str # 評分原因 @dataclass class ComplexityScore: """複雜度評分結果""" score: int # 1-5 (1=簡單, 5=極複雜) features: dict[str, int] = field(default_factory=dict) # 向後相容 recommended_model: str = "" # 由 ComplexityScorer 填入 reasoning: str = "" # v2.0 新增欄位 dimensions: list[DimensionScore] = field(default_factory=list) raw_weighted_sum: float = 0.0 # 加權總分 (正規化前) total_weight: float = 0.0 # 總權重 def __post_init__(self): """初始化後設定預設模型""" if not self.recommended_model: self.recommended_model = _get_default_model() def to_dict(self) -> dict: """轉換為字典 (API 回應用)""" return { "score": self.score, "recommended_model": self.recommended_model, "reasoning": self.reasoning, "dimensions": [ { "name": d.name, "raw_value": d.raw_value if not isinstance(d.raw_value, Enum) else d.raw_value.value, "normalized_score": d.normalized_score, "weight": d.weight, "weighted_score": round(d.weighted_score, 3), "reason": d.reason, } for d in self.dimensions ], "raw_weighted_sum": round(self.raw_weighted_sum, 3), "total_weight": round(self.total_weight, 3), } # ============================================================================= # Complexity Scorer Implementation # ============================================================================= class ComplexityScorer: """ 複雜度評分器 (v2.0) 基於規則的複雜度評估,無 LLM 依賴,確保 < 10ms 評分維度 (9 個,ADR-023): 1. 資源數量 (resource_count) 2. 跨命名空間 (cross_namespace) 3. 有狀態資源 (stateful_resource) 4. 資料影響 (data_impact) 5. 服務依賴 (service_dependencies) 6. 回滾難度 (rollback_difficulty) 7. 停機時間 (downtime_estimate) 8. 安全敏感度 (security_sensitivity) 9. 業務關鍵度 (business_criticality) 權重配置說明: - 權重越高,對最終分數影響越大 - 總權重 = 所有啟用維度權重之和 - 最終分數 = 加權平均 (1-5) """ # ========================================================================== # 權重配置 (可透過 models.json 覆寫) # ========================================================================== DEFAULT_WEIGHTS = { # 維度名稱: 權重 "resource_count": 1.0, # 資源數量 "cross_namespace": 1.5, # 跨命名空間 (風險較高) "stateful_resource": 2.0, # 有狀態資源 (最高風險) "data_impact": 2.0, # 資料影響 (最高風險) "service_dependencies": 1.0, # 服務依賴 "rollback_difficulty": 1.5, # 回滾難度 "downtime_estimate": 1.0, # 停機時間 "security_sensitivity": 1.5, # 安全敏感度 "business_criticality": 1.5, # 業務關鍵度 # 降低複雜度的維度 (負權重) "has_playbook": -0.5, # 有歷史 Playbook "has_history": -0.5, # 有歷史案例 } # ========================================================================== # 評分閾值 # ========================================================================== # 資源數量閾值 RESOURCE_COUNT_THRESHOLDS = { 1: 1, # 1 個資源 = 分數 1 2: 2, # 2 個資源 = 分數 2 3: 3, # 3-4 個資源 = 分數 3 5: 4, # 5-9 個資源 = 分數 4 10: 5, # 10+ 個資源 = 分數 5 } # 服務依賴閾值 SERVICE_DEPENDENCY_THRESHOLDS = { 0: 1, # 獨立服務 = 分數 1 1: 2, # 1 個依賴 = 分數 2 2: 3, # 2 個依賴 = 分數 3 4: 4, # 4 個依賴 = 分數 4 6: 5, # 6+ 個依賴 = 分數 5 } # 停機時間閾值 (分鐘) DOWNTIME_THRESHOLDS = { 0: 1, # 0 分鐘 = 分數 1 1: 2, # 1-4 分鐘 = 分數 2 5: 3, # 5-14 分鐘 = 分數 3 15: 4, # 15-29 分鐘 = 分數 4 30: 5, # 30+ 分鐘 = 分數 5 } # 資料影響對應分數 DATA_IMPACT_SCORES = { DataImpact.NONE: 1, DataImpact.READ_ONLY: 2, DataImpact.WRITE: 4, DataImpact.DESTRUCTIVE: 5, } # 業務關鍵度對應分數 BUSINESS_CRITICALITY_SCORES = { BusinessCriticality.NON_CRITICAL: 1, BusinessCriticality.SUPPORTING: 2, BusinessCriticality.IMPORTANT: 3, BusinessCriticality.CRITICAL: 4, BusinessCriticality.MISSION_CRITICAL: 5, } def __init__(self, weights: dict[str, float] | None = None): """ 初始化 ComplexityScorer Args: weights: 自訂權重配置,None 使用預設 """ self._weights = weights or self.DEFAULT_WEIGHTS.copy() def get_dimension_weights(self) -> dict[str, float]: """取得維度權重配置""" return self._weights.copy() def score(self, context: dict) -> ComplexityScore: """ 計算複雜度分數 Args: context: 上下文資訊,包含 (全部可選): # 基本維度 - resource_count: int (受影響資源數量) - affected_services: list[str] (受影響服務清單,向後相容) - metrics: list[str] (相關指標,向後相容) # 命名空間與資源類型 - namespaces: list[str] (涉及的命名空間) - cross_namespace: bool (是否跨命名空間) - stateful_resources: list[str] (有狀態資源清單) - has_statefulset: bool (是否有 StatefulSet) - has_pvc: bool (是否有 PVC) # 資料影響 - data_impact: str | DataImpact (資料影響等級) # 服務依賴 - service_dependencies: list[str] (服務依賴清單) - dependency_count: int (依賴數量) # 回滾 - rollback_difficulty: int (1-5) - can_rollback_immediately: bool (是否可立即回滾) - irreversible: bool (是否不可逆) # 停機時間 - downtime_minutes: int (預估停機時間) - zero_downtime: bool (是否零停機) # 安全 - involves_secrets: bool (是否涉及 Secret) - involves_rbac: bool (是否涉及 RBAC) - security_sensitive: bool (是否安全敏感) # 業務 - business_criticality: str | BusinessCriticality (業務關鍵度) - is_core_service: bool (是否核心服務) # 歷史 - has_playbook: bool (是否有 Playbook) - has_history: bool (是否有歷史案例) # 其他 (向後相容) - requires_code_analysis: bool - cross_system: bool - severity: str Returns: ComplexityScore: 評分結果 """ dimensions: list[DimensionScore] = [] features: dict[str, int] = {} # 向後相容 # ======================================================================= # 評估各維度 # ======================================================================= # 維度 1: 資源數量 dim1 = self._score_resource_count(context) if dim1: dimensions.append(dim1) features["resource_count"] = dim1.normalized_score # 維度 2: 跨命名空間 dim2 = self._score_cross_namespace(context) if dim2: dimensions.append(dim2) features["cross_namespace"] = dim2.normalized_score # 維度 3: 有狀態資源 dim3 = self._score_stateful_resource(context) if dim3: dimensions.append(dim3) features["stateful_resource"] = dim3.normalized_score # 維度 4: 資料影響 dim4 = self._score_data_impact(context) if dim4: dimensions.append(dim4) features["data_impact"] = dim4.normalized_score # 維度 5: 服務依賴 dim5 = self._score_service_dependencies(context) if dim5: dimensions.append(dim5) features["service_dependencies"] = dim5.normalized_score # 維度 6: 回滾難度 dim6 = self._score_rollback_difficulty(context) if dim6: dimensions.append(dim6) features["rollback_difficulty"] = dim6.normalized_score # 維度 7: 停機時間 dim7 = self._score_downtime(context) if dim7: dimensions.append(dim7) features["downtime_estimate"] = dim7.normalized_score # 維度 8: 安全敏感度 dim8 = self._score_security_sensitivity(context) if dim8: dimensions.append(dim8) features["security_sensitivity"] = dim8.normalized_score # 維度 9: 業務關鍵度 dim9 = self._score_business_criticality(context) if dim9: dimensions.append(dim9) features["business_criticality"] = dim9.normalized_score # 降低複雜度的維度 dim_playbook = self._score_has_playbook(context) if dim_playbook: dimensions.append(dim_playbook) features["has_playbook"] = 1 dim_history = self._score_has_history(context) if dim_history: dimensions.append(dim_history) features["has_history"] = 1 # ======================================================================= # 計算加權平均 # ======================================================================= if not dimensions: # 無維度資料,返回基本分數 final_score = 1 raw_weighted_sum = 1.0 total_weight = 1.0 reasoning = "基本複雜度 (無足夠資訊)" else: # 計算加權總分 weighted_sum = sum(d.weighted_score for d in dimensions) total_weight = sum(abs(d.weight) for d in dimensions) # 加權平均 if total_weight > 0: avg_score = weighted_sum / total_weight else: avg_score = 1.0 # 正規化到 1-5 final_score = max(1, min(5, round(avg_score))) raw_weighted_sum = weighted_sum # 生成 reasoning high_impact_dims = [d for d in dimensions if d.normalized_score >= 4] if high_impact_dims: reasons = [d.reason for d in high_impact_dims[:3]] # 最多 3 個 reasoning = "; ".join(reasons) else: reasons = [d.reason for d in dimensions if d.normalized_score >= 2][:3] reasoning = "; ".join(reasons) if reasons else "基本複雜度" # ======================================================================= # 從 ModelRegistry 取得推薦模型 # ======================================================================= registry = get_model_registry() recommended_model = registry.get_model_by_complexity(final_score) result = ComplexityScore( score=final_score, features=features, recommended_model=recommended_model, reasoning=reasoning, dimensions=dimensions, raw_weighted_sum=raw_weighted_sum, total_weight=total_weight, ) logger.debug( "complexity_scored", score=final_score, features=features, model=recommended_model, dimension_count=len(dimensions), ) return result # ========================================================================== # 維度評分方法 # ========================================================================== def _score_resource_count(self, context: dict) -> DimensionScore | None: """維度 1: 資源數量""" # 優先使用 resource_count,否則計算 affected_services count = context.get("resource_count") if count is None: services = context.get("affected_services", []) if not services: return None count = len(services) if count < 1: return None # 計算分數 score = 1 for threshold, s in sorted(self.RESOURCE_COUNT_THRESHOLDS.items()): if count >= threshold: score = s weight = self._weights.get("resource_count", 1.0) return DimensionScore( name="resource_count", raw_value=count, normalized_score=score, weight=weight, weighted_score=score * weight, reason=f"{count} 個資源" if count <= 5 else f"{count} 個資源 (大規模)", ) def _score_cross_namespace(self, context: dict) -> DimensionScore | None: """維度 2: 跨命名空間""" # 直接標記 cross_ns = context.get("cross_namespace", False) # 或從 namespaces 推斷 if not cross_ns: namespaces = context.get("namespaces", []) cross_ns = len(namespaces) > 1 # 或從 cross_system 推斷 (向後相容) if not cross_ns: cross_ns = context.get("cross_system", False) if not cross_ns: return None namespaces = context.get("namespaces", []) ns_count = len(namespaces) if namespaces else 2 # 跨命名空間基本分數 = 3,多個 = 4-5 score = 3 if ns_count <= 2 else (4 if ns_count <= 4 else 5) weight = self._weights.get("cross_namespace", 1.5) return DimensionScore( name="cross_namespace", raw_value=True, normalized_score=score, weight=weight, weighted_score=score * weight, reason=f"跨 {ns_count} 個命名空間" if ns_count > 1 else "跨命名空間操作", ) def _score_stateful_resource(self, context: dict) -> DimensionScore | None: """維度 3: 有狀態資源 (StatefulSet, PVC)""" stateful_resources = context.get("stateful_resources", []) has_sts = context.get("has_statefulset", False) has_pvc = context.get("has_pvc", False) if not stateful_resources and not has_sts and not has_pvc: return None # 計算分數 if has_pvc or "pvc" in str(stateful_resources).lower(): score = 5 # PVC 最高風險 reason = "涉及 PVC (資料持久化)" elif has_sts or "statefulset" in str(stateful_resources).lower(): score = 4 # StatefulSet 高風險 reason = "涉及 StatefulSet (有序部署)" else: score = 3 reason = f"涉及 {len(stateful_resources)} 個有狀態資源" weight = self._weights.get("stateful_resource", 2.0) return DimensionScore( name="stateful_resource", raw_value=stateful_resources or [has_sts, has_pvc], normalized_score=score, weight=weight, weighted_score=score * weight, reason=reason, ) def _score_data_impact(self, context: dict) -> DimensionScore | None: """維度 4: 資料影響""" impact = context.get("data_impact") if impact is None: return None # 轉換為 Enum if isinstance(impact, str): try: impact = DataImpact(impact.lower()) except ValueError: return None elif not isinstance(impact, DataImpact): return None if impact == DataImpact.NONE: return None # 無影響不計分 score = self.DATA_IMPACT_SCORES.get(impact, 1) weight = self._weights.get("data_impact", 2.0) reason_map = { DataImpact.READ_ONLY: "只讀操作", DataImpact.WRITE: "寫入操作 (資料變更)", DataImpact.DESTRUCTIVE: "破壞性操作 (不可恢復)", } return DimensionScore( name="data_impact", raw_value=impact, normalized_score=score, weight=weight, weighted_score=score * weight, reason=reason_map.get(impact, "資料影響"), ) def _score_service_dependencies(self, context: dict) -> DimensionScore | None: """維度 5: 服務依賴""" deps = context.get("service_dependencies", []) dep_count = context.get("dependency_count") if dep_count is None: dep_count = len(deps) if deps else 0 if dep_count == 0: return None # 計算分數 score = 1 for threshold, s in sorted(self.SERVICE_DEPENDENCY_THRESHOLDS.items()): if dep_count >= threshold: score = s weight = self._weights.get("service_dependencies", 1.0) return DimensionScore( name="service_dependencies", raw_value=dep_count, normalized_score=score, weight=weight, weighted_score=score * weight, reason=f"依賴 {dep_count} 個服務", ) def _score_rollback_difficulty(self, context: dict) -> DimensionScore | None: """維度 6: 回滾難度""" # 直接指定難度 difficulty = context.get("rollback_difficulty") if difficulty is None: # 從其他欄位推斷 if context.get("irreversible", False): difficulty = 5 elif context.get("can_rollback_immediately", True): return None # 可立即回滾,不加分 else: difficulty = 3 # 預設中等 if difficulty is None or difficulty < 2: return None score = max(1, min(5, difficulty)) weight = self._weights.get("rollback_difficulty", 1.5) reason_map = { 2: "回滾需要額外步驟", 3: "回滾難度中等", 4: "回滾困難 (需人工介入)", 5: "不可逆操作", } return DimensionScore( name="rollback_difficulty", raw_value=difficulty, normalized_score=score, weight=weight, weighted_score=score * weight, reason=reason_map.get(score, f"回滾難度 {score}"), ) def _score_downtime(self, context: dict) -> DimensionScore | None: """維度 7: 停機時間""" if context.get("zero_downtime", False): return None # 零停機不加分 downtime = context.get("downtime_minutes") if downtime is None or downtime == 0: return None # 計算分數 score = 1 for threshold, s in sorted(self.DOWNTIME_THRESHOLDS.items()): if downtime >= threshold: score = s weight = self._weights.get("downtime_estimate", 1.0) if downtime < 5: reason = f"預估停機 {downtime} 分鐘" elif downtime < 15: reason = f"預估停機 {downtime} 分鐘 (中等)" else: reason = f"預估停機 {downtime} 分鐘 (長時間)" return DimensionScore( name="downtime_estimate", raw_value=downtime, normalized_score=score, weight=weight, weighted_score=score * weight, reason=reason, ) def _score_security_sensitivity(self, context: dict) -> DimensionScore | None: """維度 8: 安全敏感度 (Secret/RBAC)""" involves_secrets = context.get("involves_secrets", False) involves_rbac = context.get("involves_rbac", False) security_sensitive = context.get("security_sensitive", False) if not involves_secrets and not involves_rbac and not security_sensitive: return None # 計算分數 if involves_rbac: score = 5 # RBAC 最敏感 reason = "涉及 RBAC 權限變更" elif involves_secrets: score = 4 # Secret 高敏感 reason = "涉及 Secret 操作" else: score = 3 reason = "安全敏感操作" weight = self._weights.get("security_sensitivity", 1.5) return DimensionScore( name="security_sensitivity", raw_value={"secrets": involves_secrets, "rbac": involves_rbac}, normalized_score=score, weight=weight, weighted_score=score * weight, reason=reason, ) def _score_business_criticality(self, context: dict) -> DimensionScore | None: """維度 9: 業務關鍵度""" criticality = context.get("business_criticality") if criticality is None: # 從 is_core_service 推斷 if context.get("is_core_service", False): criticality = BusinessCriticality.CRITICAL else: return None # 轉換為 Enum if isinstance(criticality, str): try: criticality = BusinessCriticality(criticality.lower()) except ValueError: # 嘗試映射常見值 mapping = { "low": BusinessCriticality.NON_CRITICAL, "medium": BusinessCriticality.IMPORTANT, "high": BusinessCriticality.CRITICAL, } criticality = mapping.get(criticality.lower()) if criticality is None: return None elif not isinstance(criticality, BusinessCriticality): return None if criticality == BusinessCriticality.NON_CRITICAL: return None # 非關鍵不加分 score = self.BUSINESS_CRITICALITY_SCORES.get(criticality, 1) weight = self._weights.get("business_criticality", 1.5) reason_map = { BusinessCriticality.SUPPORTING: "支援服務", BusinessCriticality.IMPORTANT: "重要服務", BusinessCriticality.CRITICAL: "核心服務", BusinessCriticality.MISSION_CRITICAL: "業務命脈 (最高優先)", } return DimensionScore( name="business_criticality", raw_value=criticality, normalized_score=score, weight=weight, weighted_score=score * weight, reason=reason_map.get(criticality, "業務關鍵度"), ) def _score_has_playbook(self, context: dict) -> DimensionScore | None: """降低複雜度: 有 Playbook""" if not context.get("has_playbook", False): return None weight = self._weights.get("has_playbook", -0.5) return DimensionScore( name="has_playbook", raw_value=True, normalized_score=1, # 正向降低 weight=weight, # 負權重 weighted_score=1 * weight, # 負分 reason="有歷史 Playbook (降低複雜度)", ) def _score_has_history(self, context: dict) -> DimensionScore | None: """降低複雜度: 有歷史案例""" if not context.get("has_history", False): return None weight = self._weights.get("has_history", -0.5) return DimensionScore( name="has_history", raw_value=True, normalized_score=1, weight=weight, weighted_score=1 * weight, reason="有歷史案例參考 (降低複雜度)", ) # ============================================================================= # Singleton # ============================================================================= _scorer: ComplexityScorer | None = None def get_complexity_scorer() -> ComplexityScorer: """取得 ComplexityScorer 單例""" global _scorer if _scorer is None: _scorer = ComplexityScorer() return _scorer def reset_complexity_scorer() -> None: """重置單例 (用於測試)""" global _scorer _scorer = None # ============================================================================= # Convenience Functions # ============================================================================= def score_complexity(context: dict) -> ComplexityScore: """便捷函數: 計算複雜度""" return get_complexity_scorer().score(context)