""" Drift Analyzer - Phase 25 P2 Config Drift Detection ===================================================== 職責:白名單過濾、DriftLevel 分級 不解釋意圖,不生成修復指令 版本: v1.0 建立: 2026-04-04 (台北時區) 建立者: ogt (首席架構師設計) + Claude Code (實作) """ from __future__ import annotations import structlog from src.models.drift import DriftItem, DriftLevel, DriftReport, DriftStatus logger = structlog.get_logger(__name__) class DriftAnalyzer: """ 分析 DriftReport,決定哪些漂移需要告警、哪些靜默記錄 職責邊界:只分級,不解釋意圖,不生成修復指令 """ def classify(self, report: DriftReport) -> DriftReport: """ 根據 DriftLevel 分類漂移項目,更新計數 - INFO(白名單)→ 靜默記錄,status 保持 PENDING - MEDIUM → 需通知,但非緊急 - HIGH → 立即告警 Returns: 更新後的 DriftReport(immutable-friendly:回傳新 report) """ high_count = 0 medium_count = 0 info_count = 0 for item in report.items: if item.drift_level == DriftLevel.HIGH: high_count += 1 elif item.drift_level == DriftLevel.MEDIUM: medium_count += 1 else: info_count += 1 # 若只有 INFO 漂移,直接標記為 IGNORED(不需人工處理) status = report.status if high_count == 0 and medium_count == 0 and info_count > 0: status = DriftStatus.IGNORED logger.info( "drift_all_allowlisted", report_id=report.report_id, info_count=info_count, ) elif high_count == 0 and medium_count == 0: status = DriftStatus.IGNORED return report.model_copy(update={ "high_count": high_count, "medium_count": medium_count, "info_count": info_count, "status": status, }) def needs_alert(self, report: DriftReport) -> bool: """是否需要 Telegram 告警""" return report.high_count > 0 or report.medium_count > 0 def format_diff_summary(self, report: DriftReport) -> str: """格式化漂移差異摘要(給 Telegram 用)""" if not report.items: return "無漂移" lines = [] # HIGH 優先顯示 for item in sorted(report.items, key=lambda i: (i.drift_level != DriftLevel.HIGH, i.field_path)): if item.is_allowlisted: continue level_label = "🔴" if item.drift_level == DriftLevel.HIGH else "🟡" lines.append( f"{level_label} {item.resource_kind}/{item.resource_name}.{item.field_path}\n" f" Git: {str(item.git_value)[:60]}\n" f" K8s: {str(item.actual_value)[:60]}" ) if len(lines) >= 5: # 最多顯示 5 項,避免訊息過長 remaining = report.high_count + report.medium_count - len(lines) if remaining > 0: lines.append(f"... 另有 {remaining} 項漂移") break return "\n".join(lines) if lines else f"共 {report.info_count} 項白名單漂移(已靜默)" _analyzer: DriftAnalyzer | None = None def get_drift_analyzer() -> DriftAnalyzer: global _analyzer if _analyzer is None: _analyzer = DriftAnalyzer() return _analyzer