Files
awoooi/apps/api/src/services/drift_interpreter.py
OG T 3455044457
Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 38s
Type Sync Check / check-type-sync (push) Failing after 35s
feat(phase25): Nemotron 主動防禦三方向 P0+P1+P2 完整實作
P0 - DIAGNOSE Privacy-First Routing:
- ai_router.py: _local_fallback_chain [NEMOTRON→OLLAMA→REJECT]
- DIAGNOSE 意圖 override 改為 NEMOTRON (原 OLLAMA)
- DIAGNOSE fallback 使用 local-only 鏈,不觸碰雲端
- 全部失敗時 REJECT + Telegram 通知
- config.py: NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS=30, OLLAMA_DIAGNOSE_TIMEOUT_SECONDS=60
- nemotron.py: 根據 context[task_type] 選擇 timeout

P1 - Knowledge Auto-Harvesting:
- models/knowledge.py: EntryType.AUTO_RUNBOOK + ANTI_PATTERN + symptoms_hash
- EntryStatus.PUBLISHED (ANTI_PATTERN 直接發布,無需審核)
- models/playbook.py: SymptomPattern.compute_hash() (16字元確定性 hash)
- services/runbook_generator.py: NemotronRunbookGenerator (v1.1)
  - generate_runbook() → AUTO_RUNBOOK (DRAFT) + Telegram 審核 card
  - generate_anti_pattern() → ANTI_PATTERN (PUBLISHED) + Telegram 通知
  - 使用 nvidia.chat() (正確介面),Nemotron 超時時 Minimal fallback
- knowledge_service.py: check_anti_pattern(symptoms_hash, days=7)
- db/models.py: symptoms_hash VARCHAR(16) + ix_knowledge_symptoms_hash
- repositories/knowledge_repository.py: create() 支援 symptoms_hash + status
- auto_repair_service.py: anti_pattern_gate 在 decide() + runbook hook 在 execute()
- migrations/phase8_symptoms_hash.sql: ALTER TABLE + partial index + PUBLISHED constraint

P2 - Config Drift Detection:
- models/drift.py: DriftItem/DriftReport/DriftLevel/DriftIntent/DriftStatus
- services/drift_detector.py: GitStateReader + K8sStateReader + DriftDetector
- services/drift_analyzer.py: 白名單過濾 + DriftLevel 分級
- services/drift_interpreter.py: NemotronDriftInterpreter(意圖分析,不生成修復指令)
- services/drift_remediator.py: rollback(kubectl apply) + adopt(git push gitea)
- api/v1/drift.py: POST /scan, GET /reports, POST /rollback, POST /adopt
- migrations/phase9_drift_reports.sql: drift_reports 表
- k8s/drift-cronjob.yaml: 每小時自動掃描 CronJob

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-04 12:35:05 +08:00

174 lines
5.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Drift Interpreter - Phase 25 P2 Config Drift Detection
=======================================================
職責Nemotron 意圖分析(不生成修復指令)
只回答「這是人為操作Hotfix系統自動變更
設計邊界(核心原則):
- 只輸出意圖分析,不生成 kubectl 或 git 指令
- 確定性修復由 DriftRemediator 負責
- Nemotron 超時 → UNKNOWN不阻塞主流程
版本: v1.0
建立: 2026-04-04 (台北時區)
建立者: ogt (首席架構師設計) + Claude Code (實作)
"""
from __future__ import annotations
import asyncio
import json
from typing import TYPE_CHECKING
import structlog
from src.models.drift import DriftIntent, DriftInterpretation, DriftItem
if TYPE_CHECKING:
from src.models.drift import DriftReport
logger = structlog.get_logger(__name__)
_INTENT_PROMPT_TEMPLATE = """你是 AWOOOI GitOps 守門員,請分析以下 K8s 配置漂移的意圖。
## 漂移詳情
{diff_summary}
## 任務
判斷這次漂移最可能的原因:
- emergency_hotfix: 繞過 CI 的緊急修補image tag 改變但無對應 Git commit
- human_error: 誤操作(非預期的隨機欄位改變)
- automated_change: 系統自動變更HPA replicas, 系統注入的 annotation 等)
- unknown: 無法判斷
請以 JSON 回應:
{{
"intent": "emergency_hotfix|human_error|automated_change|unknown",
"explanation": "用繁體中文解釋你的判斷理由(一句話)",
"risk": "HIGH|MEDIUM|LOW",
"confidence": 0.0到1.0之間的數字
}}
只輸出 JSON不要任何額外說明。
"""
class NemotronDriftInterpreter:
    """Classify the intent behind a configuration drift using Nemotron.

    Responsibility boundary:
      - Produces an intent analysis only.
      - Never generates remediation commands.
      - Never invokes kubectl or git directly.

    Every failure path (timeout, provider error, unparsable reply) is turned
    into an ``UNKNOWN`` interpretation instead of an exception.
    """

    # Maximum number of drift items rendered into the prompt (keeps token usage bounded).
    _MAX_PROMPT_ITEMS = 10
    # Maximum number of characters of each Git / K8s value shown in the prompt.
    _MAX_VALUE_CHARS = 40

    async def analyze(self, report: "DriftReport") -> DriftInterpretation:
        """Analyze the intent behind the drift described by *report*.

        Args:
            report: An already-classified drift report. This method reads its
                ``items``, ``high_count`` and ``medium_count`` attributes.

        Returns:
            The interpretation produced from the model's reply. When the report
            has no items, or has neither high nor medium findings, the model is
            not called and an ``UNKNOWN`` interpretation with confidence 1.0 is
            returned. Timeouts and failures also yield ``UNKNOWN``.
        """
        if not report.items or (report.high_count == 0 and report.medium_count == 0):
            return DriftInterpretation(
                intent=DriftIntent.UNKNOWN,
                explanation="無顯著漂移,不需要意圖分析",
                confidence=1.0,
            )

        diff_text = self._format_diff_for_prompt(report)
        prompt = _INTENT_PROMPT_TEMPLATE.format(diff_summary=diff_text)
        return await self._call_nemotron(prompt)

    def _format_diff_for_prompt(self, report: "DriftReport") -> str:
        """Render the non-allowlisted drift items as a bullet list for the prompt.

        Allowlisted items are filtered out *before* the item cap is applied, so
        a run of allowlisted entries at the head of the report cannot hide real
        drift that appears later.

        Args:
            report: Object exposing ``items``; each item provides
                ``resource_kind``, ``resource_name``, ``field_path``,
                ``git_value``, ``actual_value`` and ``is_allowlisted``.

        Returns:
            One line per item (at most ``_MAX_PROMPT_ITEMS``), or a fixed
            placeholder string when no item remains after filtering.
        """
        relevant = [item for item in report.items if not item.is_allowlisted]

        lines = []
        for item in relevant[: self._MAX_PROMPT_ITEMS]:
            git_preview = str(item.git_value)[: self._MAX_VALUE_CHARS]
            k8s_preview = str(item.actual_value)[: self._MAX_VALUE_CHARS]
            # The ", " separator keeps the Git value from running into "K8s=".
            lines.append(
                f"- {item.resource_kind}/{item.resource_name}: "
                f"{item.field_path} "
                f"Git={git_preview}, "
                f"K8s={k8s_preview}"
            )

        return "\n".join(lines) if lines else "(均為白名單欄位)"

    async def _call_nemotron(self, prompt: str) -> DriftInterpretation:
        """Send *prompt* to Nemotron and convert the reply into an interpretation.

        Args:
            prompt: The fully rendered intent-analysis prompt.

        Returns:
            The parsed interpretation, or an ``UNKNOWN`` interpretation when the
            call times out, reports failure, returns no text, or raises.
        """
        try:
            # Imported lazily inside the try block so that an import failure is
            # handled like any other provider error.
            from src.core.config import get_settings
            from src.services.nvidia_provider import get_nvidia_provider

            settings = get_settings()
            nvidia = get_nvidia_provider()
            response_text, success, _tokens, _cost = await asyncio.wait_for(
                nvidia.chat(prompt=prompt),
                # Falls back to 30 seconds when the setting is not defined.
                timeout=getattr(settings, "NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS", 30),
            )
            if not success or not response_text:
                return self._unknown_result("Nemotron 回傳空值")
            return self._parse_response(response_text)
        except asyncio.TimeoutError:
            logger.warning("drift_nemotron_timeout")
            return self._unknown_result("Nemotron 超時")
        except Exception as e:
            # Deliberate best-effort boundary: intent analysis must never
            # propagate an exception to the caller.
            logger.warning("drift_nemotron_error", error=str(e))
            return self._unknown_result(str(e))

    def _parse_response(self, text: str) -> DriftInterpretation:
        """Parse the model's reply into a ``DriftInterpretation``.

        The reply is first parsed as raw JSON; if that fails, the contents of
        the first fenced code block (optionally tagged ``json``) are tried.

        Args:
            text: Raw text returned by the model.

        Returns:
            The parsed interpretation. Unrecognized intent values map to
            ``UNKNOWN``; any parsing or construction failure yields the
            ``UNKNOWN`` fallback produced by ``_unknown_result``.
        """
        try:
            # First attempt: the whole reply is a JSON document.
            data = json.loads(text)
        except Exception:
            try:
                import re

                # Second attempt: JSON wrapped in a Markdown code fence.
                fenced = re.search(r"```(?:json)?\s*([\s\S]+?)```", text)
                if fenced is None:
                    return self._unknown_result("無法解析 JSON")
                data = json.loads(fenced.group(1))
            except Exception:
                return self._unknown_result("JSON 解析失敗")

        # Valid JSON that is not an object (list, string, number, ...) cannot
        # carry the expected keys; reject it explicitly instead of relying on
        # an AttributeError from ``.get``.
        if not isinstance(data, dict):
            return self._unknown_result("JSON 回應不是物件")

        try:
            intent_str = data.get("intent", "unknown")
            if intent_str in DriftIntent._value2member_map_:
                intent = DriftIntent(intent_str)
            else:
                intent = DriftIntent.UNKNOWN
            return DriftInterpretation(
                intent=intent,
                explanation=data.get("explanation", ""),
                risk=data.get("risk", "MEDIUM"),
                confidence=float(data.get("confidence", 0.0)),
            )
        except Exception as e:
            # Covers non-numeric confidence, unhashable intent values and any
            # error raised while constructing the result.
            return self._unknown_result(f"模型解析失敗: {e}")

    def _unknown_result(self, reason: str) -> DriftInterpretation:
        """Build the fallback interpretation used whenever analysis fails.

        Args:
            reason: Short failure description embedded in the explanation.

        Returns:
            An ``UNKNOWN`` interpretation with ``MEDIUM`` risk and zero confidence.
        """
        return DriftInterpretation(
            intent=DriftIntent.UNKNOWN,
            explanation=f"意圖分析失敗:{reason}",
            risk="MEDIUM",
            confidence=0.0,
        )
# Module-wide interpreter instance; stays None until first requested.
_interpreter: NemotronDriftInterpreter | None = None


def get_drift_interpreter() -> NemotronDriftInterpreter:
    """Return the shared ``NemotronDriftInterpreter``, creating it on first use."""
    global _interpreter
    instance = _interpreter
    if instance is None:
        instance = NemotronDriftInterpreter()
        _interpreter = instance
    return instance