awoooi/apps/api/src/services/drift_analyzer.py
OG T 3455044457
feat(phase25): Nemotron proactive defense, full implementation of all three tracks (P0 + P1 + P2)
P0 - DIAGNOSE Privacy-First Routing:
- ai_router.py: _local_fallback_chain [NEMOTRON→OLLAMA→REJECT]
- DIAGNOSE intent override changed to NEMOTRON (previously OLLAMA)
- DIAGNOSE fallback uses the local-only chain and never touches the cloud
- If every provider fails: REJECT + Telegram notification
- config.py: NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS=30, OLLAMA_DIAGNOSE_TIMEOUT_SECONDS=60
- nemotron.py: selects the timeout based on context[task_type]
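
The local-only fallback described in the P0 items could look roughly like the sketch below. It is an illustration only, not the code in ai_router.py: `Provider`, `route_diagnose`, the handler callables, and the `notify` callback (standing in for the Telegram notification) are hypothetical names, and the timeouts simply mirror the two config values listed above.

```python
from __future__ import annotations

from enum import Enum
from typing import Callable


class Provider(str, Enum):
    NEMOTRON = "nemotron"
    OLLAMA = "ollama"
    REJECT = "reject"


# Assumed per-provider timeouts in seconds, mirroring the config values above.
DIAGNOSE_TIMEOUTS = {Provider.NEMOTRON: 30, Provider.OLLAMA: 60}

# Local-only chain: no cloud provider ever appears in it.
LOCAL_FALLBACK_CHAIN = [Provider.NEMOTRON, Provider.OLLAMA]


def route_diagnose(
    prompt: str,
    handlers: dict[Provider, Callable[[str, int], str]],
    notify: Callable[[str], None],
) -> tuple[Provider, str | None]:
    """Try each local provider in order; reject and notify if all of them fail."""
    for provider in LOCAL_FALLBACK_CHAIN:
        try:
            answer = handlers[provider](prompt, DIAGNOSE_TIMEOUTS[provider])
            return provider, answer
        except Exception as exc:  # a real router would catch narrower errors
            notify(f"{provider.value} failed: {exc}")
    notify("DIAGNOSE rejected: all local providers failed")
    return Provider.REJECT, None
```

Keeping the chain as a plain list makes the privacy property easy to audit: a cloud provider can only be reached if someone adds it to `LOCAL_FALLBACK_CHAIN`.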

P1 - Knowledge Auto-Harvesting:
- models/knowledge.py: EntryType.AUTO_RUNBOOK + ANTI_PATTERN + symptoms_hash
- EntryStatus.PUBLISHED (ANTI_PATTERN entries are published directly, without review)
- models/playbook.py: SymptomPattern.compute_hash() (16-character deterministic hash)
- services/runbook_generator.py: NemotronRunbookGenerator (v1.1)
  - generate_runbook() → AUTO_RUNBOOK (DRAFT) + Telegram review card
  - generate_anti_pattern() → ANTI_PATTERN (PUBLISHED) + Telegram notification
  - Uses nvidia.chat() (the correct interface); falls back to a minimal entry when Nemotron times out
- knowledge_service.py: check_anti_pattern(symptoms_hash, days=7)
- db/models.py: symptoms_hash VARCHAR(16) + ix_knowledge_symptoms_hash
- repositories/knowledge_repository.py: create() 支援 symptoms_hash + status
- auto_repair_service.py: anti_pattern_gate 在 decide() + runbook hook 在 execute()
- migrations/phase8_symptoms_hash.sql: ALTER TABLE + partial index + PUBLISHED constraint
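
The commit message does not show how SymptomPattern.compute_hash() derives its 16-character hash or how check_anti_pattern() queries it. The sketch below is one plausible shape under stated assumptions: a truncated SHA-256 over normalized, sorted symptom strings, and an in-memory dict standing in for the knowledge table. Both function bodies are hypothetical.

```python
from __future__ import annotations

import hashlib
from datetime import datetime, timedelta


def compute_symptoms_hash(symptoms: list[str]) -> str:
    """Return a 16-character hex digest that ignores ordering, case, and padding."""
    normalized = sorted({s.strip().lower() for s in symptoms if s.strip()})
    payload = "\n".join(normalized).encode("utf-8")
    # Assumption: SHA-256 truncated to 16 hex characters to fit VARCHAR(16).
    return hashlib.sha256(payload).hexdigest()[:16]


def check_anti_pattern(
    published: dict[str, datetime],
    symptoms_hash: str,
    now: datetime,
    days: int = 7,
) -> bool:
    """Return True if an anti-pattern with this hash was published within `days`."""
    seen_at = published.get(symptoms_hash)
    return seen_at is not None and now - seen_at <= timedelta(days=days)
```

Normalizing before hashing is what makes the hash usable as a lookup key: two incidents that report the same symptoms in a different order map to the same row.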

P2 - Config Drift Detection:
- models/drift.py: DriftItem/DriftReport/DriftLevel/DriftIntent/DriftStatus
- services/drift_detector.py: GitStateReader + K8sStateReader + DriftDetector
- services/drift_analyzer.py: allowlist filtering + DriftLevel classification
- services/drift_interpreter.py: NemotronDriftInterpreter (intent analysis only; does not generate remediation commands)
- services/drift_remediator.py: rollback (kubectl apply) + adopt (git push gitea)
- api/v1/drift.py: POST /scan, GET /reports, POST /rollback, POST /adopt
- migrations/phase9_drift_reports.sql: drift_reports table
- k8s/drift-cronjob.yaml: CronJob that scans automatically every hour
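
To make the detect-then-classify flow in the P2 items concrete, here is a minimal sketch of comparing a Git-declared manifest against the live cluster state and assigning a level per field. It does not reproduce drift_detector.py: `flatten`, `diff_states`, and both prefix tuples are hypothetical, and the real allowlist and HIGH rules are not shown in this commit.

```python
from __future__ import annotations

from typing import Any

# Hypothetical rules; the real allowlist and HIGH criteria are not in the source.
ALLOWLIST_PREFIXES = ("metadata.annotations", "status")
HIGH_RISK_PREFIXES = ("spec.template.spec.containers",)


def flatten(obj: Any, prefix: str = "") -> dict[str, Any]:
    """Flatten nested dicts into dotted field paths; lists are kept as leaf values."""
    if isinstance(obj, dict):
        flat: dict[str, Any] = {}
        for key, value in obj.items():
            path = f"{prefix}.{key}" if prefix else str(key)
            flat.update(flatten(value, path))
        return flat
    return {prefix: obj}


def diff_states(git_state: dict, k8s_state: dict) -> list[dict[str, Any]]:
    """Return one record per field whose Git value differs from the live value."""
    git_flat, k8s_flat = flatten(git_state), flatten(k8s_state)
    drifts = []
    for path in sorted(set(git_flat) | set(k8s_flat)):
        git_value, actual_value = git_flat.get(path), k8s_flat.get(path)
        if git_value == actual_value:
            continue
        if path.startswith(ALLOWLIST_PREFIXES):
            level = "INFO"
        elif path.startswith(HIGH_RISK_PREFIXES):
            level = "HIGH"
        else:
            level = "MEDIUM"
        drifts.append(
            {
                "field_path": path,
                "git_value": git_value,
                "actual_value": actual_value,
                "drift_level": level,
            }
        )
    return drifts
```

Flattening to dotted paths keeps the allowlist a simple prefix match, which is easy to review when deciding which fields a controller is allowed to mutate.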

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-04 12:35:05 +08:00

107 lines
3.5 KiB
Python
"""
Drift Analyzer - Phase 25 P2 Config Drift Detection
=====================================================
Responsibilities: allowlist filtering and DriftLevel classification.
Does not interpret intent and does not generate remediation commands.

Version: v1.0
Created: 2026-04-04 (Taipei time zone)
Authors: ogt (design, chief architect) + Claude Code (implementation)
"""
from __future__ import annotations

import structlog

from src.models.drift import DriftItem, DriftLevel, DriftReport, DriftStatus

logger = structlog.get_logger(__name__)


class DriftAnalyzer:
    """
    Analyze a DriftReport and decide which drifts need an alert and which are only logged.

    Scope: classification only. No intent interpretation, no remediation commands.
    """

    def classify(self, report: DriftReport) -> DriftReport:
        """
        Count drift items by DriftLevel and update the report counters.

        - INFO (allowlisted) → logged silently; a report with no HIGH or MEDIUM
          items is marked IGNORED
        - MEDIUM → needs a notification, but is not urgent
        - HIGH → alert immediately

        Returns:
            The updated DriftReport (immutable-friendly: a new report is returned).
        """
        high_count = 0
        medium_count = 0
        info_count = 0
        for item in report.items:
            if item.drift_level == DriftLevel.HIGH:
                high_count += 1
            elif item.drift_level == DriftLevel.MEDIUM:
                medium_count += 1
            else:
                info_count += 1

        # With nothing above INFO (including an empty report), mark the report
        # IGNORED: no manual handling is needed.
        status = report.status
        if high_count == 0 and medium_count == 0:
            status = DriftStatus.IGNORED
            if info_count > 0:
                logger.info(
                    "drift_all_allowlisted",
                    report_id=report.report_id,
                    info_count=info_count,
                )

        return report.model_copy(update={
            "high_count": high_count,
            "medium_count": medium_count,
            "info_count": info_count,
            "status": status,
        })

    def needs_alert(self, report: DriftReport) -> bool:
        """Return True if the report warrants a Telegram alert."""
        return report.high_count > 0 or report.medium_count > 0

    def format_diff_summary(self, report: DriftReport) -> str:
        """Format a drift summary for Telegram. Expects counters set by classify()."""
        if not report.items:
            return "No drift"

        lines = []
        # Show HIGH items first, then order by field path.
        for item in sorted(report.items, key=lambda i: (i.drift_level != DriftLevel.HIGH, i.field_path)):
            if item.is_allowlisted:
                continue
            level_label = "🔴" if item.drift_level == DriftLevel.HIGH else "🟡"
            lines.append(
                f"{level_label} {item.resource_kind}/{item.resource_name}.{item.field_path}\n"
                f" Git: {str(item.git_value)[:60]}\n"
                f" K8s: {str(item.actual_value)[:60]}"
            )
            if len(lines) >= 5:  # Show at most 5 items to keep the message short
                remaining = report.high_count + report.medium_count - len(lines)
                if remaining > 0:
                    lines.append(f"... and {remaining} more drift item(s)")
                break

        return "\n".join(lines) if lines else f"{report.info_count} allowlisted drift item(s) (silenced)"


_analyzer: DriftAnalyzer | None = None


def get_drift_analyzer() -> DriftAnalyzer:
    """Return the process-wide DriftAnalyzer, creating it on first use."""
    global _analyzer
    if _analyzer is None:
        _analyzer = DriftAnalyzer()
    return _analyzer
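
The status rule in classify() can be exercised without the project's models. The sketch below is a stand-in, not the project code: `Item` and `Report` are hypothetical frozen dataclasses that carry only the fields this file reads, and `dataclasses.replace` takes the place of `model_copy`.

```python
from __future__ import annotations

from collections import Counter
from dataclasses import dataclass, replace
from enum import Enum


class DriftLevel(str, Enum):
    HIGH = "high"
    MEDIUM = "medium"
    INFO = "info"


class DriftStatus(str, Enum):
    PENDING = "pending"
    IGNORED = "ignored"


@dataclass(frozen=True)
class Item:  # hypothetical stand-in for DriftItem
    drift_level: DriftLevel


@dataclass(frozen=True)
class Report:  # hypothetical stand-in for DriftReport
    items: tuple[Item, ...] = ()
    high_count: int = 0
    medium_count: int = 0
    info_count: int = 0
    status: DriftStatus = DriftStatus.PENDING


def classify(report: Report) -> Report:
    """Same counting and status rule as DriftAnalyzer.classify, minus logging."""
    counts = Counter(item.drift_level for item in report.items)
    high, medium = counts[DriftLevel.HIGH], counts[DriftLevel.MEDIUM]
    info = len(report.items) - high - medium
    status = DriftStatus.IGNORED if high == 0 and medium == 0 else report.status
    return replace(
        report, high_count=high, medium_count=medium, info_count=info, status=status
    )
```

A report holding only INFO items comes back IGNORED, while a single HIGH item leaves the status untouched so the alert path can pick it up.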