""" Config Drift Detection Models - Phase 25 P2 ============================================ GitOps 守門員:偵測 K8s 實際狀態 vs Git YAML 的漂移 設計原則: - DriftDetector: 只比對,輸出結構化 Diff,不判斷嚴重性 - DriftAnalyzer: 白名單過濾、DriftLevel 分級,不解釋意圖 - NemotronDriftInterpreter: 意圖分析(不生成修復指令) - DriftRemediator: 確定性修復(kubectl apply / git push),不使用 AI 判斷 版本: v1.0 建立: 2026-04-04 (台北時區) 建立者: ogt (首席架構師設計) + Claude Code (實作) 關聯設計: docs/superpowers/specs/2026-04-04-nemotron-active-defense-design.md 方向三 關聯 ADR: 待起草 ADR-057 """ from __future__ import annotations from datetime import datetime from enum import Enum from typing import Any from pydantic import BaseModel, Field from src.utils.timezone import now_taipei # ============================================================================= # Enums # ============================================================================= class DriftLevel(str, Enum): """漂移嚴重度分級""" INFO = "info" # 白名單欄位(replicas, resources)→ 靜默記錄 MEDIUM = "medium" # 非關鍵欄位 → Telegram 通知,無需緊急處理 HIGH = "high" # 關鍵欄位(image, env, ports)→ 立即通知,需確認 class DriftIntent(str, Enum): """Nemotron 意圖分析結果""" EMERGENCY_HOTFIX = "emergency_hotfix" # 繞過 CI 的緊急修補 HUMAN_ERROR = "human_error" # 誤操作 AUTOMATED_CHANGE = "automated_change" # 系統自動變更(HPA 等) UNKNOWN = "unknown" # 無法判斷 class DriftStatus(str, Enum): """漂移報告處理狀態""" PENDING = "pending" # 待處理 ACKNOWLEDGED = "acknowledged" # 已知悉(不需要處理) ROLLED_BACK = "rolled_back" # 已覆蓋回 Git 狀態 ADOPTED = "adopted" # 已承認(Git 已更新) IGNORED = "ignored" # 白名單忽略 # ============================================================================= # Core Models # ============================================================================= class DriftItem(BaseModel): """單一欄位的漂移記錄""" resource_kind: str = Field(..., description="K8s 資源類型(Deployment, Service 等)") resource_name: str = Field(..., description="K8s 資源名稱") namespace: str = Field(..., description="K8s namespace") field_path: str = Field(..., description="欄位路徑(如 spec.template.spec.containers[0].image)") git_value: Any = Field(None, description="Git YAML 中的值") actual_value: Any = Field(None, description="K8s 中的實際值") drift_level: DriftLevel = DriftLevel.MEDIUM is_allowlisted: bool = False # 是否為白名單欄位(靜默記錄) class DriftInterpretation(BaseModel): """Nemotron 意圖分析結果""" intent: DriftIntent = DriftIntent.UNKNOWN explanation: str = Field("", description="Nemotron 的意圖說明") risk: str = Field("MEDIUM", description="風險等級(HIGH/MEDIUM/LOW)") confidence: float = Field(0.0, ge=0.0, le=1.0, description="分析信心分數") class DriftReport(BaseModel): """單次漂移掃描的完整報告""" report_id: str = Field(..., description="報告 ID") scanned_at: datetime = Field(default_factory=now_taipei) namespace: str = Field(..., description="掃描的 namespace") # 漂移項目 items: list[DriftItem] = Field(default_factory=list) high_count: int = 0 medium_count: int = 0 info_count: int = 0 # Nemotron 分析 interpretation: DriftInterpretation | None = None # 處理狀態 status: DriftStatus = DriftStatus.PENDING # 觸發來源 triggered_by: str = Field("cron", description="觸發來源:cron / webhook / manual") # 時間軸 created_at: datetime = Field(default_factory=now_taipei) resolved_at: datetime | None = None @property def has_critical_drift(self) -> bool: """是否有需要立即處理的高嚴重度漂移""" return self.high_count > 0 @property def summary(self) -> str: """單行摘要""" parts = [] if self.high_count: parts.append(f"HIGH×{self.high_count}") if self.medium_count: parts.append(f"MEDIUM×{self.medium_count}") if self.info_count: parts.append(f"INFO×{self.info_count}") return ", ".join(parts) if parts else "無漂移" # ============================================================================= # API Request / Response # ============================================================================= class DriftScanRequest(BaseModel): """觸發漂移掃描 Request""" namespaces: list[str] = Field( default=["awoooi-prod"], description="要掃描的 namespace 列表", ) triggered_by: str = Field(default="api", description="觸發來源") class DriftScanResponse(BaseModel): """漂移掃描結果回應""" report_id: str summary: str high_count: int medium_count: int info_count: int has_critical_drift: bool interpretation: DriftInterpretation | None = None class DriftListResponse(BaseModel): """漂移報告列表回應""" items: list[DriftReport] total: int