diff --git a/apps/api/migrations/phase8_symptoms_hash.sql b/apps/api/migrations/phase8_symptoms_hash.sql
new file mode 100644
index 00000000..73e184e5
--- /dev/null
+++ b/apps/api/migrations/phase8_symptoms_hash.sql
@@ -0,0 +1,48 @@
+-- Phase 25 P1: Knowledge Auto-Harvesting — symptoms_hash 欄位
+-- 用於 Anti-Pattern 閉環攔截的確定性症狀 hash
+-- 建立時間: 2026-04-04 (台北時區)
+-- 建立者: Claude Code (Phase 25 P1)
+--
+-- 執行方式: psql -h 192.168.0.188 -U awoooi -d awoooi -f phase8_symptoms_hash.sql
+
+-- 1. Add the symptoms_hash column to knowledge_entries.
+--    It stores the 16-character hash used by the Anti-Pattern gate.
+ALTER TABLE knowledge_entries
+    ADD COLUMN IF NOT EXISTS symptoms_hash VARCHAR(16);
+
+-- 2. Index that serves the Anti-Pattern gate lookup.
+--    Query shape: entry_type='anti_pattern' AND symptoms_hash=:hash AND created_at>=:cutoff
+--    entry_type is pinned to a single value by the partial-index predicate below,
+--    so it is deliberately NOT repeated as a key column (it would only widen the index).
+CREATE INDEX IF NOT EXISTS idx_knowledge_anti_pattern_hash
+    ON knowledge_entries (symptoms_hash, created_at)
+    WHERE entry_type = 'anti_pattern' AND symptoms_hash IS NOT NULL;
+
+-- 3. EntryStatus 新增 PUBLISHED(用於 ANTI_PATTERN 直接發布)
+-- PostgreSQL CHECK constraint 需要重建(若有的話)
+-- 若無 constraint,PostgreSQL 的 VARCHAR 欄位可直接存入任意值,無需 ALTER。
+-- 確認 status 欄位是否有 CHECK constraint:
+-- SELECT conname, pg_get_constraintdef(oid) AS definition FROM pg_constraint
+-- WHERE conrelid = 'knowledge_entries'::regclass AND contype = 'c';
+
+-- 若有 CHECK constraint(如 status IN ('draft', 'review', 'approved', 'archived')),
+-- 需執行以下(請先確認 constraint 名稱):
+-- ALTER TABLE knowledge_entries DROP CONSTRAINT IF EXISTS knowledge_entries_status_check;
+-- ALTER TABLE knowledge_entries ADD CONSTRAINT knowledge_entries_status_check
+-- CHECK (status IN ('draft', 'review', 'approved', 'archived', 'published'));
+
+-- 安全執行版本(自動處理 CHECK constraint):
+DO $$
+DECLARE
+    v_conname text;
+    v_dropped integer := 0;  -- number of status CHECK constraints removed
+BEGIN
+    -- Find CHECK constraints by the column they constrain (conkey), not by name:
+    -- a name match such as LIKE '%status%' can hit unrelated constraints and
+    -- misses constraints that were given a non-default name.
+    -- Only single-column constraints on "status" are considered.
+    FOR v_conname IN
+        SELECT c.conname
+        FROM pg_constraint AS c
+        INNER JOIN pg_attribute AS a
+            ON a.attrelid = c.conrelid
+            AND c.conkey = ARRAY[a.attnum]
+        WHERE c.conrelid = 'knowledge_entries'::regclass
+            AND c.contype = 'c'
+            AND a.attname = 'status'
+        ORDER BY c.conname
+    LOOP
+        EXECUTE format('ALTER TABLE knowledge_entries DROP CONSTRAINT %I', v_conname);
+        v_dropped := v_dropped + 1;
+        RAISE NOTICE 'Dropped status CHECK constraint: %', v_conname;
+    END LOOP;
+
+    -- Re-create the constraint only when one existed before; a table without a
+    -- status CHECK constraint accepts 'published' already and is left untouched.
+    IF v_dropped > 0 THEN
+        ALTER TABLE knowledge_entries ADD CONSTRAINT knowledge_entries_status_check
+            CHECK (status IN ('draft', 'review', 'approved', 'archived', 'published'));
+        RAISE NOTICE 'Re-created knowledge_entries_status_check → added published';
+    ELSE
+        RAISE NOTICE 'No status CHECK constraint found, skipping';
+    END IF;
+END $$;
diff --git a/apps/api/migrations/phase9_drift_reports.sql b/apps/api/migrations/phase9_drift_reports.sql
new file mode 100644
index 00000000..47810bed
--- /dev/null
+++ b/apps/api/migrations/phase9_drift_reports.sql
@@ -0,0 +1,54 @@
+-- Phase 25 P2: Config Drift Detection — drift_reports 資料表
+-- 建立時間: 2026-04-04 (台北時區)
+-- 建立者: Claude Code (Phase 25 P2)
+-- 對應模型: apps/api/src/models/drift.py
+-- 對應設計: docs/superpowers/specs/2026-04-04-nemotron-active-defense-design.md 方向三
+--
+-- 執行方式: psql -h 192.168.0.188 -U awoooi -d awoooi -f phase9_drift_reports.sql
+
+CREATE TABLE IF NOT EXISTS drift_reports (
+    -- Identity
+    report_id       VARCHAR(32)  PRIMARY KEY,
+
+    -- Scan metadata
+    namespace       VARCHAR(128) NOT NULL,
+    triggered_by    VARCHAR(64)  NOT NULL DEFAULT 'cron',  -- cron / webhook / api
+    scanned_at      TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
+
+    -- Per-severity counters (denormalized to avoid a JOIN on every read)
+    high_count      INT          NOT NULL DEFAULT 0,
+    medium_count    INT          NOT NULL DEFAULT 0,
+    info_count      INT          NOT NULL DEFAULT 0,
+
+    -- Drift items (JSONB list)
+    items           JSONB        NOT NULL DEFAULT '[]',
+
+    -- Nemotron intent analysis (DriftInterpretation); NULL until analysed
+    interpretation  JSONB,
+
+    -- Handling status: pending / acknowledged / rolled_back / adopted / ignored
+    status          VARCHAR(32)  NOT NULL DEFAULT 'pending',
+
+    -- Timeline
+    created_at      TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
+    resolved_at     TIMESTAMPTZ,
+
+    -- Integrity rules: enforce the documented status set and non-negative counters
+    -- for every writer, not only the API model.
+    CONSTRAINT drift_reports_status_check
+        CHECK (status IN ('pending', 'acknowledged', 'rolled_back', 'adopted', 'ignored')),
+    CONSTRAINT drift_reports_counts_check
+        CHECK (high_count >= 0 AND medium_count >= 0 AND info_count >= 0)
+);
+
+-- Indexes
+CREATE INDEX IF NOT EXISTS idx_drift_reports_namespace
+    ON drift_reports (namespace);
+
+CREATE INDEX IF NOT EXISTS idx_drift_reports_status
+    ON drift_reports (status);
+
+CREATE INDEX IF NOT EXISTS idx_drift_reports_created_at
+    ON drift_reports (created_at DESC);
+
+-- Partial index: only rows that actually carry high-severity drift
+CREATE INDEX IF NOT EXISTS idx_drift_reports_high_count
+    ON drift_reports (high_count)
+    WHERE high_count > 0;
+
+-- 說明:
+-- 目前 API 使用 in-memory dict 暫存,此表供未來持久化使用
+-- 啟用持久化後,需在 drift.py 的 _recent_reports 操作改為 DB 寫入
diff --git a/apps/api/src/api/v1/drift.py b/apps/api/src/api/v1/drift.py
new file mode 100644
index 00000000..03e86d6d
--- /dev/null
+++ b/apps/api/src/api/v1/drift.py
@@ -0,0 +1,215 @@
+"""
+Config Drift Detection API Router - Phase 25 P2
+================================================
+GitOps 守門員 HTTP 端點
+
+leWOOOgo 積木化原則:
+- Router 層只做 HTTP 轉發
+- 不直接存取 Redis/DB
+- 業務邏輯委託給 Service 層
+
+版本: v1.0
+建立: 2026-04-04 (台北時區)
+建立者: Claude Code (Phase 25 P2)
+"""
+
+from fastapi import APIRouter, BackgroundTasks, HTTPException
+
+from src.models.drift import (
+ DriftListResponse,
+ DriftReport,
+ DriftScanRequest,
+ DriftScanResponse,
+)
+from src.services.drift_analyzer import get_drift_analyzer
+from src.services.drift_detector import get_drift_detector
+from src.services.drift_interpreter import get_drift_interpreter
+from src.services.drift_remediator import get_drift_remediator
+
+router = APIRouter(prefix="/drift", tags=["drift"])
+
+# 本次 session 的漂移報告暫存(prod 應存 DB)
+_recent_reports: dict[str, DriftReport] = {}
+
+
+@router.post("/scan", response_model=DriftScanResponse, summary="觸發漂移掃描")
+async def trigger_drift_scan(
+    request: DriftScanRequest,
+    background_tasks: BackgroundTasks,
+) -> DriftScanResponse:
+    """
+    Trigger a Config Drift scan for every namespace in the request.
+
+    - Each namespace is scanned by the drift detector and classified by the analyzer
+    - Reports that need an alert are handed to a background task
+      (intent analysis + Telegram push) so the request is not blocked
+    - Every classified report is kept in the in-memory store (capped at 50)
+
+    Suitable for a Gitea CD webhook or a manual call.
+
+    Returns:
+        The counts of the LAST scanned namespace whose report required an alert,
+        or a "no-drift" placeholder when no report required one.
+    """
+    detector = get_drift_detector()
+    analyzer = get_drift_analyzer()
+
+    last_report: DriftReport | None = None
+
+    for namespace in request.namespaces:
+        raw_report = await detector.scan(namespace, triggered_by=request.triggered_by)
+        classified_report = analyzer.classify(raw_report)
+
+        if analyzer.needs_alert(classified_report):
+            # Intent analysis runs in the background to avoid blocking the response
+            background_tasks.add_task(_analyze_and_notify, classified_report)
+            last_report = classified_report
+
+        # In-memory store, at most 50 entries. A loop (not a single delete) is used
+        # because other writers may have pushed the store past the cap.
+        _recent_reports[classified_report.report_id] = classified_report
+        while len(_recent_reports) > 50:
+            del _recent_reports[next(iter(_recent_reports))]
+
+    # With several namespaces, only the last alerting report is summarised
+    if last_report is not None:
+        return DriftScanResponse(
+            report_id=last_report.report_id,
+            summary=last_report.summary,
+            high_count=last_report.high_count,
+            medium_count=last_report.medium_count,
+            info_count=last_report.info_count,
+            has_critical_drift=last_report.has_critical_drift,
+        )
+
+    return DriftScanResponse(
+        report_id="no-drift",
+        summary="無漂移",
+        high_count=0,
+        medium_count=0,
+        info_count=0,
+        has_critical_drift=False,
+    )
+
+
+@router.get("/reports", response_model=DriftListResponse, summary="列出最近漂移報告")
+async def list_drift_reports() -> DriftListResponse:
+    """Return the stored drift reports, most recently inserted first."""
+    newest_first = list(_recent_reports.values())
+    newest_first.reverse()
+    return DriftListResponse(items=newest_first, total=len(newest_first))
+
+
+@router.post("/reports/{report_id}/rollback", summary="覆蓋回 Git 狀態")
+async def rollback_drift(report_id: str) -> dict:
+    """
+    Overwrite the K8s state with the Git YAML for a stored report.
+
+    Meant to be called after human confirmation; the actual work is
+    delegated to DriftRemediator.rollback().
+
+    Raises:
+        HTTPException: 404 when report_id is not in the in-memory store.
+    """
+    report = _recent_reports.get(report_id)
+    if report is None:
+        raise HTTPException(status_code=404, detail=f"Report {report_id} not found")
+
+    return await get_drift_remediator().rollback(report)
+
+
+@router.post("/reports/{report_id}/adopt", summary="承認變更並更新 Git")
+async def adopt_drift(report_id: str) -> dict:
+    """
+    Accept the K8s drift of a stored report and bring Git in line with it.
+
+    Meant to be called after human confirmation; the actual work is
+    delegated to DriftRemediator.adopt().
+
+    Raises:
+        HTTPException: 404 when report_id is not in the in-memory store.
+    """
+    report = _recent_reports.get(report_id)
+    if report is None:
+        raise HTTPException(status_code=404, detail=f"Report {report_id} not found")
+
+    return await get_drift_remediator().adopt(report)
+
+
+# =============================================================================
+# Internal endpoint(供 K8s CronJob 呼叫)
+# =============================================================================
+
+@router.post("/internal/scan", include_in_schema=False, summary="CronJob 觸發掃描")
+async def internal_scan(background_tasks: BackgroundTasks) -> dict:
+    """
+    Internal endpoint for the K8s CronJob: schedule a full scan in the background.
+
+    Namespaces come from the comma-separated DRIFT_SCAN_NAMESPACES setting
+    (falls back to "awoooi-prod" when the setting is absent).
+
+    Returns:
+        {"status": "scan_triggered", "namespaces": [...]} listing exactly the
+        namespaces that were scheduled.
+    """
+    from src.core.config import get_settings
+
+    raw_value = getattr(get_settings(), "DRIFT_SCAN_NAMESPACES", "awoooi-prod")
+    # Normalise once: strip whitespace and drop empty entries (e.g. a trailing comma),
+    # so the response reports the same names that are actually scanned.
+    namespaces = [ns.strip() for ns in raw_value.split(",") if ns.strip()]
+
+    background_tasks.add_task(_run_full_scan, namespaces)
+    return {"status": "scan_triggered", "namespaces": namespaces}
+
+
+# =============================================================================
+# Background helpers
+# =============================================================================
+
+async def _analyze_and_notify(report: DriftReport) -> None:
+    """
+    Background helper: run intent analysis on a report, store the enriched
+    report, then push a Telegram message.
+
+    Never raises: a Telegram failure is logged as a warning, any other
+    failure is logged as an error.
+    """
+    try:
+        drift_interpreter = get_drift_interpreter()
+        drift_analyzer = get_drift_analyzer()
+
+        interpretation = await drift_interpreter.analyze(report)
+        # Replace the stored report with a copy that carries the interpretation
+        _recent_reports[report.report_id] = report.model_copy(
+            update={"interpretation": interpretation}
+        )
+
+        diff_summary = drift_analyzer.format_diff_summary(report)
+        labels_by_intent = {
+            "emergency_hotfix": "🚨 緊急 Hotfix",
+            "human_error": "⚠️ 人為誤操作",
+            "automated_change": "🤖 系統自動變更",
+            "unknown": "❓ 意圖不明",
+        }
+        intent_label = labels_by_intent.get(interpretation.intent.value, "❓ 意圖不明")
+
+        try:
+            from src.services.telegram_gateway import get_telegram_gateway
+
+            gateway = get_telegram_gateway()
+            # The message is built inside this try so that a formatting failure
+            # is reported as a Telegram failure, like a send failure
+            message = (
+                f"🔍 Config Drift 偵測\n"
+                f"Namespace: {report.namespace}\n"
+                f"嚴重度: HIGH×{report.high_count} MEDIUM×{report.medium_count}\n\n"
+                f"意圖分析: {intent_label}\n"
+                f"{interpretation.explanation}\n"
+                f"信心: {interpretation.confidence:.0%}\n\n"
+                f"漂移詳情:\n{diff_summary}\n\n"
+                f"Report ID: {report.report_id}\n"
+                f"POST /api/v1/drift/reports/{report.report_id}/rollback — 覆蓋回 Git\n"
+                f"POST /api/v1/drift/reports/{report.report_id}/adopt — 承認變更"
+            )
+            await gateway.send_text(message)
+        except Exception as telegram_error:
+            import structlog
+
+            structlog.get_logger(__name__).warning(
+                "drift_telegram_failed", error=str(telegram_error)
+            )
+
+    except Exception as analyze_error:
+        import structlog
+
+        structlog.get_logger(__name__).error(
+            "drift_analyze_notify_failed", error=str(analyze_error)
+        )
+
+
+async def _run_full_scan(namespaces: list[str]) -> None:
+    """
+    Background helper: scan, classify and store a report for each namespace.
+
+    A failure in one namespace is logged and does not stop the remaining ones.
+    Reports that need an alert are analysed and pushed before the next
+    namespace is scanned.
+    """
+    detector = get_drift_detector()
+    analyzer = get_drift_analyzer()
+
+    for namespace in namespaces:
+        try:
+            raw = await detector.scan(namespace, triggered_by="cron")
+            classified = analyzer.classify(raw)
+
+            # Same 50-entry cap as the /scan endpoint; without it the scheduled
+            # scans would grow the in-memory store without bound.
+            _recent_reports[classified.report_id] = classified
+            while len(_recent_reports) > 50:
+                del _recent_reports[next(iter(_recent_reports))]
+
+            if analyzer.needs_alert(classified):
+                await _analyze_and_notify(classified)
+        except Exception as e:
+            import structlog
+
+            structlog.get_logger(__name__).error(
+                "full_scan_namespace_failed", namespace=namespace, error=str(e)
+            )
diff --git a/apps/api/src/core/config.py b/apps/api/src/core/config.py
index 6cd9bbfc..1346d185 100644
--- a/apps/api/src/core/config.py
+++ b/apps/api/src/core/config.py
@@ -84,6 +84,15 @@ class Settings(BaseSettings):
default=True,
description="Phase 22: True=異步更新 (先推 OpenClaw), False=同步等待",
)
+ # 2026-04-04 ogt: Phase 25 P0 — DIAGNOSE Privacy-First 專用 timeout
+ NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS: int = Field(
+ default=30,
+ description="Phase 25 P0: DIAGNOSE 任務 Nemotron timeout (秒),比 Tool Calling 短",
+ )
+ OLLAMA_DIAGNOSE_TIMEOUT_SECONDS: int = Field(
+ default=60,
+ description="Phase 25 P0: DIAGNOSE 任務 Ollama backup timeout (秒),Ollama 較慢",
+ )
# ==========================================================================
# CORS - 嚴格白名單 (無 UAT, 無 wildcard)
diff --git a/apps/api/src/db/models.py b/apps/api/src/db/models.py
index b2dd1987..7c2ff5b3 100644
--- a/apps/api/src/db/models.py
+++ b/apps/api/src/db/models.py
@@ -530,6 +530,12 @@ class KnowledgeEntryRecord(Base):
nullable=True,
comment="關聯 Playbook Redis Key",
)
+ # 2026-04-04 ogt: Phase 25 P1 — Anti-Pattern 閉環攔截用症狀 hash (SymptomPattern.compute_hash())
+ symptoms_hash: Mapped[str | None] = mapped_column(
+ String(16),
+ nullable=True,
+ comment="症狀模式 hash (16字元 SHA256 前綴),Anti-Pattern 閉環攔截使用",
+ )
# Metrics
view_count: Mapped[int] = mapped_column(
@@ -556,4 +562,6 @@ class KnowledgeEntryRecord(Base):
Index("ix_knowledge_category", "category"),
Index("ix_knowledge_status", "status"),
Index("ix_knowledge_created_at", "created_at"),
+ # 2026-04-04 ogt: Phase 25 P1 — Anti-Pattern 快速查詢
+ Index("ix_knowledge_symptoms_hash", "symptoms_hash"),
)
diff --git a/apps/api/src/main.py b/apps/api/src/main.py
index c806f682..dd1166bf 100644
--- a/apps/api/src/main.py
+++ b/apps/api/src/main.py
@@ -57,6 +57,7 @@ from src.api.v1 import (
from src.api.v1 import (
signoz_webhook as signoz_webhook_v1, # Phase 21: SignOz → Telegram (ADR-037)
)
+from src.api.v1 import drift as drift_v1 # Phase 25 P2: Config Drift Detection
from src.api.v1 import monitoring as monitoring_v1 # 2026-04-03: 監控工具狀態
from src.api.v1 import stats as stats_v1 # Phase 6.5: Statistics Analytics
from src.api.v1 import telegram as telegram_v1 # Phase 5.4: Telegram Gateway
@@ -422,6 +423,9 @@ app.include_router(
app.include_router(
auto_repair_v1.router, prefix="/api/v1", tags=["Auto Repair"]
) # #8: 自動升級決策
+app.include_router(
+ drift_v1.router, prefix="/api/v1", tags=["Drift Detection"]
+) # Phase 25 P2: Config Drift Detection
app.include_router(
errors_v1.router, prefix="/api/v1", tags=["Errors"]
) # #40: Sentry 錯誤 BFF API
diff --git a/apps/api/src/models/drift.py b/apps/api/src/models/drift.py
new file mode 100644
index 00000000..e4d354a5
--- /dev/null
+++ b/apps/api/src/models/drift.py
@@ -0,0 +1,155 @@
+"""
+Config Drift Detection Models - Phase 25 P2
+============================================
+GitOps 守門員:偵測 K8s 實際狀態 vs Git YAML 的漂移
+
+設計原則:
+- DriftDetector: 只比對,輸出結構化 Diff,不判斷嚴重性
+- DriftAnalyzer: 白名單過濾、DriftLevel 分級,不解釋意圖
+- NemotronDriftInterpreter: 意圖分析(不生成修復指令)
+- DriftRemediator: 確定性修復(kubectl apply / git push),不使用 AI 判斷
+
+版本: v1.0
+建立: 2026-04-04 (台北時區)
+建立者: ogt (首席架構師設計) + Claude Code (實作)
+關聯設計: docs/superpowers/specs/2026-04-04-nemotron-active-defense-design.md 方向三
+關聯 ADR: 待起草 ADR-057
+"""
+
+from __future__ import annotations
+
+from datetime import datetime
+from enum import Enum
+from typing import Any
+
+from pydantic import BaseModel, Field
+
+from src.utils.timezone import now_taipei
+
+
+# =============================================================================
+# Enums
+# =============================================================================
+
+
+class DriftLevel(str, Enum):
+    """Severity level of a drift item."""
+    INFO = "info"  # allowlisted fields (replicas, resources) → recorded silently
+    MEDIUM = "medium"  # non-critical fields → Telegram notification, not urgent
+    HIGH = "high"  # critical fields (image, env, ports) → immediate notification, needs confirmation
+
+
+class DriftIntent(str, Enum):
+    """Result of the Nemotron intent analysis."""
+    EMERGENCY_HOTFIX = "emergency_hotfix"  # emergency patch that bypassed CI
+    HUMAN_ERROR = "human_error"  # accidental manual change
+    AUTOMATED_CHANGE = "automated_change"  # change made by the system itself (HPA etc.)
+    UNKNOWN = "unknown"  # intent could not be determined
+
+
+class DriftStatus(str, Enum):
+    """Handling status of a drift report."""
+    PENDING = "pending"  # waiting to be handled
+    ACKNOWLEDGED = "acknowledged"  # noted, no action required
+    ROLLED_BACK = "rolled_back"  # K8s was overwritten with the Git state
+    ADOPTED = "adopted"  # drift accepted, Git was updated
+    IGNORED = "ignored"  # ignored via the allowlist
+
+
+# =============================================================================
+# Core Models
+# =============================================================================
+
+
+class DriftItem(BaseModel):
+    """Drift record for a single field of a single K8s resource."""
+    resource_kind: str = Field(..., description="K8s 資源類型(Deployment, Service 等)")
+    resource_name: str = Field(..., description="K8s 資源名稱")
+    namespace: str = Field(..., description="K8s namespace")
+    field_path: str = Field(..., description="欄位路徑(如 spec.template.spec.containers[0].image)")
+    git_value: Any = Field(None, description="Git YAML 中的值")
+    actual_value: Any = Field(None, description="K8s 中的實際值")
+    # Severity assigned to this item; defaults to MEDIUM when not set explicitly
+    drift_level: DriftLevel = DriftLevel.MEDIUM
+    is_allowlisted: bool = False  # whether the field is allowlisted (recorded silently)
+
+
+class DriftInterpretation(BaseModel):
+    """Intent analysis produced by Nemotron for a drift report."""
+    intent: DriftIntent = DriftIntent.UNKNOWN
+    explanation: str = Field("", description="Nemotron 的意圖說明")
+    risk: str = Field("MEDIUM", description="風險等級(HIGH/MEDIUM/LOW)")
+    # Constrained to the closed interval [0.0, 1.0] by the Field validators
+    confidence: float = Field(0.0, ge=0.0, le=1.0, description="分析信心分數")
+
+
+class DriftReport(BaseModel):
+    """Complete report of one drift scan of one namespace."""
+    report_id: str = Field(..., description="報告 ID")
+    scanned_at: datetime = Field(default_factory=now_taipei)
+    namespace: str = Field(..., description="掃描的 namespace")
+
+    # Drift items and their per-severity counters
+    items: list[DriftItem] = Field(default_factory=list)
+    high_count: int = 0
+    medium_count: int = 0
+    info_count: int = 0
+
+    # Nemotron analysis (None until it has been run)
+    interpretation: DriftInterpretation | None = None
+
+    # Handling status
+    status: DriftStatus = DriftStatus.PENDING
+
+    # Trigger source
+    triggered_by: str = Field("cron", description="觸發來源:cron / webhook / manual")
+
+    # Timeline
+    created_at: datetime = Field(default_factory=now_taipei)
+    resolved_at: datetime | None = None
+
+    @property
+    def has_critical_drift(self) -> bool:
+        """True when at least one HIGH severity drift was counted."""
+        return self.high_count > 0
+
+    @property
+    def summary(self) -> str:
+        """One-line summary listing every non-zero severity counter."""
+        counters = (
+            ("HIGH", self.high_count),
+            ("MEDIUM", self.medium_count),
+            ("INFO", self.info_count),
+        )
+        parts = [f"{label}×{count}" for label, count in counters if count]
+        if not parts:
+            return "無漂移"
+        return ", ".join(parts)
+
+
+# =============================================================================
+# API Request / Response
+# =============================================================================
+
+
+class DriftScanRequest(BaseModel):
+    """Request body for triggering a drift scan."""
+    # Namespaces to scan; a single default namespace is used when omitted
+    namespaces: list[str] = Field(
+        default=["awoooi-prod"],
+        description="要掃描的 namespace 列表",
+    )
+    triggered_by: str = Field(default="api", description="觸發來源")
+
+
+class DriftScanResponse(BaseModel):
+    """Response returned after a drift scan has been triggered."""
+    report_id: str
+    summary: str
+    high_count: int
+    medium_count: int
+    info_count: int
+    has_critical_drift: bool
+    # Optional intent analysis; defaults to None
+    interpretation: DriftInterpretation | None = None
+
+
+class DriftListResponse(BaseModel):
+    """Response carrying a list of drift reports and its length."""
+    items: list[DriftReport]
+    total: int
diff --git a/apps/api/src/models/knowledge.py b/apps/api/src/models/knowledge.py
index bfece7c8..6d1136d4 100644
--- a/apps/api/src/models/knowledge.py
+++ b/apps/api/src/models/knowledge.py
@@ -33,6 +33,9 @@ class EntryType(str, Enum):
RUNBOOK = "runbook" # 手動建立的操作手冊
BEST_PRACTICE = "best_practice" # 最佳實踐文章
POSTMORTEM = "postmortem" # 事後分析報告
+ # 2026-04-04 ogt: Phase 25 P1 — Knowledge Auto-Harvesting 新增類型
+ AUTO_RUNBOOK = "auto_runbook" # Nemotron 自動生成的 Runbook(DRAFT 待人工審核)
+ ANTI_PATTERN = "anti_pattern" # 修復失敗案例(直接 PUBLISHED,阻斷後續重蹈覆轍)
class EntrySource(str, Enum):
@@ -47,6 +50,8 @@ class EntryStatus(str, Enum):
REVIEW = "review" # 審核中
APPROVED = "approved" # 已批准
ARCHIVED = "archived" # 已封存
+ # 2026-04-04 Claude Code: Phase 25 P1 — ANTI_PATTERN 直接發布,無需審核
+ PUBLISHED = "published" # 已發布(ANTI_PATTERN 用,無需人工審核)
# =============================================================================
@@ -61,8 +66,11 @@ class KnowledgeEntryCreate(BaseModel):
category: str = Field(..., min_length=1, max_length=100)
tags: list[str] = Field(default_factory=list)
source: EntrySource = EntrySource.HUMAN
+ status: EntryStatus = EntryStatus.DRAFT
related_incident_id: str | None = None
related_playbook_id: str | None = None
+ # 2026-04-04 ogt: Phase 25 P1 — Anti-Pattern 閉環用症狀 hash
+ symptoms_hash: str | None = None
created_by: str | None = None
@@ -88,6 +96,8 @@ class KnowledgeEntry(BaseModel):
status: EntryStatus = EntryStatus.DRAFT
related_incident_id: str | None = None
related_playbook_id: str | None = None
+ # 2026-04-04 ogt: Phase 25 P1 — Anti-Pattern 閉環攔截用的症狀 hash(SymptomPattern.compute_hash())
+ symptoms_hash: str | None = None
view_count: int = 0
created_by: str | None = None
created_at: datetime = Field(default_factory=now_taipei)
diff --git a/apps/api/src/models/playbook.py b/apps/api/src/models/playbook.py
index d42b6a65..e2a35ad7 100644
--- a/apps/api/src/models/playbook.py
+++ b/apps/api/src/models/playbook.py
@@ -97,6 +97,21 @@ class SymptomPattern(BaseModel):
model_config = ConfigDict(extra="ignore")
+    def compute_hash(self) -> str:
+        """
+        Deterministic fingerprint of this symptom pattern (Phase 25 P1,
+        used by the Anti-Pattern gate for exact matching).
+
+        The fingerprint covers alert_names, affected_services (both sorted,
+        so list order does not matter) and label_patterns (JSON with sorted keys).
+
+        Returns:
+            The first 16 hex characters of the SHA-256 digest.
+        """
+        import hashlib
+        import json
+
+        alert_part = "|".join(sorted(self.alert_names))
+        service_part = "|".join(sorted(self.affected_services))
+        label_part = json.dumps(self.label_patterns, sort_keys=True)
+
+        fingerprint = "||".join((alert_part, service_part, label_part))
+        return hashlib.sha256(fingerprint.encode()).hexdigest()[:16]
+
class RepairStep(BaseModel):
"""
diff --git a/apps/api/src/repositories/knowledge_repository.py b/apps/api/src/repositories/knowledge_repository.py
index 0fbb0841..f40fe865 100644
--- a/apps/api/src/repositories/knowledge_repository.py
+++ b/apps/api/src/repositories/knowledge_repository.py
@@ -45,8 +45,12 @@ class KnowledgeDBRepository:
category=data.category,
tags=data.tags,
source=data.source,
+ # 2026-04-04 ogt: Phase 25 P1 — 支援指定 status(ANTI_PATTERN 直接 PUBLISHED)
+ status=data.status,
related_incident_id=data.related_incident_id,
related_playbook_id=data.related_playbook_id,
+ # 2026-04-04 ogt: Phase 25 P1 — Anti-Pattern 閉環用症狀 hash
+ symptoms_hash=data.symptoms_hash,
created_by=data.created_by,
)
self.db.add(record)
@@ -268,6 +272,7 @@ class KnowledgeDBRepository:
status=record.status,
related_incident_id=record.related_incident_id,
related_playbook_id=record.related_playbook_id,
+ symptoms_hash=getattr(record, "symptoms_hash", None),
view_count=record.view_count,
created_by=record.created_by,
created_at=record.created_at,
diff --git a/apps/api/src/services/ai_providers/nemotron.py b/apps/api/src/services/ai_providers/nemotron.py
index 18168eaf..accf328d 100644
--- a/apps/api/src/services/ai_providers/nemotron.py
+++ b/apps/api/src/services/ai_providers/nemotron.py
@@ -160,7 +160,13 @@ class NemotronProvider:
"""
try:
- timeout = getattr(settings, "NEMOTRON_TIMEOUT_SECONDS", 30)
+ # 2026-04-04 ogt: Phase 25 P0 — 根據 task_type 選擇 timeout
+ # DIAGNOSE 用較短 timeout(30s),避免拖累整體 AutoRepair 流程
+ task_type = context.get("task_type", "")
+ if task_type == "diagnose":
+ timeout = getattr(settings, "NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS", 30)
+ else:
+ timeout = getattr(settings, "NEMOTRON_TIMEOUT_SECONDS", 45)
nvidia = self._get_nvidia()
result = await asyncio.wait_for(
diff --git a/apps/api/src/services/ai_router.py b/apps/api/src/services/ai_router.py
index f4d14206..306161ce 100644
--- a/apps/api/src/services/ai_router.py
+++ b/apps/api/src/services/ai_router.py
@@ -30,6 +30,7 @@ AI Router - Phase 13.3 #87
| v2.0 | 2026-03-26 | Claude Code | 支援 IntentResult + 新意圖類型 |
| v3.0 | 2026-03-26 | Claude Code | Phase 13.3 #87 完整路由決策矩陣 |
| v4.0 | 2026-04-02 | ogt (首席架構師) | Phase 24 AIProvider Registry + Executor; C1 Langfuse Trace; C2 AIRouter.route(); C3 型別 typo; I4 Protocol close |
+| v4.1 | 2026-04-04 | ogt (首席架構師) | Phase 25 P0: DIAGNOSE Privacy-First — _local_fallback_chain; DIAGNOSE→NEMOTRON; REJECT+Telegram |
"""
from __future__ import annotations
@@ -246,13 +247,22 @@ class AIRouter:
(AIProviderEnum.CLAUDE, self._claude_default),
]
+ # 2026-04-04 ogt: Phase 25 P0 — DIAGNOSE/FORCE_LOCAL 專用鏈
+ # 隱私邊界:絕不包含任何雲端 Provider,到 OLLAMA 為止
+ self._local_fallback_chain: list[tuple[AIProviderEnum, str]] = [
+ (AIProviderEnum.NEMOTRON, self._nemotron_default), # NIM 188,主力(零費用,高能力)
+ (AIProviderEnum.OLLAMA, self._ollama_summary), # Ollama 188,備援(慢但可靠)
+ ]
+
# 意圖對應 Provider 強制覆寫 (None = 依複雜度決定)
self._intent_provider_overrides: dict[IntentType, AIProviderEnum | None] = {
# 四大核心意圖
IntentType.RESTART: None, # 依複雜度
IntentType.SCALE: None, # 依複雜度
IntentType.CONFIG: None, # 依複雜度 (但 HIGH 會升級)
- IntentType.DIAGNOSE: AIProviderEnum.OLLAMA, # 診斷優先本地 (隱私)
+ # 2026-04-04 ogt: Phase 25 P0 — DIAGNOSE 改為 NEMOTRON (NIM 188)
+ # 原因: 零費用本地 NIM + 高能力; 搭配 _local_fallback_chain 保證不觸碰雲端
+ IntentType.DIAGNOSE: AIProviderEnum.NEMOTRON, # 診斷優先 NIM 本地 (隱私)
# 輔助意圖
IntentType.DELETE: AIProviderEnum.CLAUDE, # CRITICAL → 強制 Claude
IntentType.ROLLBACK: None, # 依複雜度
@@ -308,7 +318,11 @@ class AIRouter:
)
# Step 4: 建立 Fallback 鏈
- fallback_chain = self._build_fallback_chain(provider)
+ # 2026-04-04 ogt: Phase 25 P0 — DIAGNOSE 使用 local-only 鏈(隱私邊界)
+ if intent == IntentType.DIAGNOSE:
+ fallback_chain = [fc for fc in self._local_fallback_chain if fc[0] != provider]
+ else:
+ fallback_chain = self._build_fallback_chain(provider)
# Step 5: 計算延遲預算
latency_budget = PROVIDER_LATENCY_BUDGET.get(provider, 30000)
@@ -398,10 +412,11 @@ class AIRouter:
provider_override = self._intent_provider_overrides.get(intent)
if provider_override is not None:
provider = provider_override
- # 2026-04-03 ogt: DIAGNOSE/ALERT_TRIAGE 用 summary model (llama3.2:3b)
+ # 2026-04-03 ogt: ALERT_TRIAGE/QUERY 用 Ollama summary model (llama3.2:3b)
# 避免 qwen2.5:7b-instruct 90秒 timeout 導致全鏈路失敗 (Phase 24 A選項)
+ # 2026-04-04 ogt: DIAGNOSE 已改為 NEMOTRON,不走這條分支
if provider == AIProviderEnum.OLLAMA and intent in (
- IntentType.DIAGNOSE, IntentType.ALERT_TRIAGE, IntentType.QUERY
+ IntentType.ALERT_TRIAGE, IntentType.QUERY
):
model = self._ollama_summary
else:
@@ -951,6 +966,29 @@ class AIRouterExecutor:
_lf_trace_ctx.__exit__(None, None, None)
except Exception:
pass
+
+ # 2026-04-04 ogt: Phase 25 P0 — require_local 全部失敗時 Telegram 通知(隱私邊界)
+ if require_local:
+ try:
+ from src.services.telegram_gateway import get_telegram_gateway
+ tg = get_telegram_gateway()
+ import asyncio as _asyncio
+ _asyncio.create_task(
+ tg.send_text(
+ "⚠️ DIAGNOSE 本地 Provider 不可用\n"
+ f"已嘗試: {', '.join(provider_order)}\n"
+ "需要人工介入,雲端 Provider 不會被呼叫(隱私邊界)。"
+ )
+ )
+ except Exception as _tg_e:
+ logger.warning("diagnose_reject_telegram_failed", error=str(_tg_e))
+ return AIResult(
+ raw_response="",
+ success=False,
+ provider="none",
+ error="local_providers_unavailable",
+ )
+
return AIResult(
raw_response="",
success=False,
diff --git a/apps/api/src/services/auto_repair_service.py b/apps/api/src/services/auto_repair_service.py
index 68ae7cae..3854b2bf 100644
--- a/apps/api/src/services/auto_repair_service.py
+++ b/apps/api/src/services/auto_repair_service.py
@@ -143,6 +143,9 @@ class AutoRepairService:
# 2026-04-01 ogt: 注入 cooldown_checker 支援測試隔離 (DI 原則)
self._playbook_service = playbook_service or get_playbook_service()
self._cooldown_checker = cooldown_checker or check_global_repair_cooldown
+ # 2026-04-04 Claude Code: Phase 25 P1 — 持有 runbook_generator task 引用,防 GC 回收
+ import asyncio
+ self._pending_tasks: set[asyncio.Task] = set()
async def evaluate_auto_repair(
self,
@@ -196,6 +199,33 @@ class AutoRepairService:
# 2. 提取症狀模式
symptoms = self._extract_symptoms(incident)
+ # 2.1 2026-04-04 Claude Code: Phase 25 P1 — Anti-Pattern 閘門
+ # 根據確定性 hash 比對近 7 天失敗案例,避免 AI 在同一個坑重複摔倒
+ try:
+ from src.services.knowledge_service import get_knowledge_service
+ symptoms_hash = symptoms.compute_hash()
+ anti_patterns = await get_knowledge_service().check_anti_pattern(
+ symptoms_hash, days=7
+ )
+ if anti_patterns:
+ ap = anti_patterns[0]
+ logger.warning(
+ "auto_repair_blocked_anti_pattern",
+ incident_id=incident.incident_id,
+ symptoms_hash=symptoms_hash,
+ anti_pattern_id=ap.id,
+ anti_pattern_title=ap.title,
+ )
+ return AutoRepairDecision(
+ can_auto_repair=False,
+ reason=f"過去 7 天有失敗案例: {ap.title}",
+ blocked_by="ANTI_PATTERN",
+ )
+ except Exception as _ap_e:
+ # Anti-Pattern 閘門失敗不阻塞主流程(僅記錄)
+ logger.warning("anti_pattern_gate_error", error=str(_ap_e))
+ symptoms_hash = ""
+
# 3. 找匹配的 Playbook
recommendations = await self._playbook_service.get_recommendations(
symptoms=symptoms,
@@ -324,7 +354,7 @@ class AutoRepairService:
execution_time_ms=execution_time,
)
- return AutoRepairResult(
+ repair_result = AutoRepairResult(
success=True,
playbook_id=playbook.playbook_id,
incident_id=incident.incident_id,
@@ -332,6 +362,25 @@ class AutoRepairService:
execution_time_ms=execution_time,
)
+ # 2026-04-04 Claude Code: Phase 25 P1 — 成功修復後 fire-and-forget 生成 AUTO_RUNBOOK
+ try:
+ from src.services.runbook_generator import get_runbook_generator
+ symptoms = self._extract_symptoms(incident)
+ symptoms_hash = symptoms.compute_hash()
+ gen = get_runbook_generator()
+ import asyncio as _asyncio
+ task = _asyncio.create_task(
+ gen.generate_runbook(incident, playbook, repair_result, symptoms_hash)
+ )
+ self._pending_tasks.add(task) if hasattr(self, "_pending_tasks") else None
+ task.add_done_callback(
+ lambda t: self._pending_tasks.discard(t) if hasattr(self, "_pending_tasks") else None
+ )
+ except Exception as _rg_e:
+ logger.warning("runbook_generator_task_failed", error=str(_rg_e))
+
+ return repair_result
+
except Exception as e:
# 更新失敗統計
await self._playbook_service.record_execution(
@@ -348,7 +397,7 @@ class AutoRepairService:
error=str(e),
)
- return AutoRepairResult(
+ fail_result = AutoRepairResult(
success=False,
playbook_id=playbook.playbook_id,
incident_id=incident.incident_id,
@@ -357,6 +406,21 @@ class AutoRepairService:
execution_time_ms=execution_time,
)
+            # 2026-04-04 Claude Code: Phase 25 P1 — after a failed repair,
+            # fire-and-forget generation of an ANTI_PATTERN entry
+            try:
+                from src.services.runbook_generator import get_runbook_generator
+                symptoms = self._extract_symptoms(incident)
+                symptoms_hash = symptoms.compute_hash()
+                gen = get_runbook_generator()
+                import asyncio as _asyncio
+                task = _asyncio.create_task(
+                    gen.generate_anti_pattern(incident, playbook, fail_result, symptoms_hash)
+                )
+                # The event loop keeps only a weak reference to tasks; hold a strong
+                # one until completion so the task cannot be garbage-collected
+                # mid-flight (same approach as the AUTO_RUNBOOK path).
+                pending = getattr(self, "_pending_tasks", None)
+                if pending is not None:
+                    pending.add(task)
+                    task.add_done_callback(pending.discard)
+            except Exception as _ap_e:
+                logger.warning("anti_pattern_task_failed", error=str(_ap_e))
+
+            return fail_result
+
# === Private Helpers ===
def _extract_symptoms(self, incident: Incident) -> SymptomPattern:
diff --git a/apps/api/src/services/drift_analyzer.py b/apps/api/src/services/drift_analyzer.py
new file mode 100644
index 00000000..4801a85c
--- /dev/null
+++ b/apps/api/src/services/drift_analyzer.py
@@ -0,0 +1,106 @@
+"""
+Drift Analyzer - Phase 25 P2 Config Drift Detection
+=====================================================
+職責:白名單過濾、DriftLevel 分級
+不解釋意圖,不生成修復指令
+
+版本: v1.0
+建立: 2026-04-04 (台北時區)
+建立者: ogt (首席架構師設計) + Claude Code (實作)
+"""
+
+from __future__ import annotations
+
+import structlog
+
+from src.models.drift import DriftItem, DriftLevel, DriftReport, DriftStatus
+
+logger = structlog.get_logger(__name__)
+
+
+class DriftAnalyzer:
+    """
+    Analyses a DriftReport and decides which drift needs an alert and which
+    is only recorded silently.
+
+    Responsibility boundary: severity counting only; it does not interpret
+    intent and does not produce remediation commands.
+    """
+
+    def classify(self, report: DriftReport) -> DriftReport:
+        """
+        Count the report's items per DriftLevel and derive the report status.
+
+        - HIGH / MEDIUM present → status is left unchanged
+        - only INFO items, or no items at all → status becomes IGNORED
+
+        Args:
+            report: Report whose items already carry a drift_level.
+
+        Returns:
+            A new DriftReport (the input is not mutated) with updated
+            high_count / medium_count / info_count / status.
+        """
+        high_count = 0
+        medium_count = 0
+        info_count = 0
+
+        for item in report.items:
+            if item.drift_level == DriftLevel.HIGH:
+                high_count += 1
+            elif item.drift_level == DriftLevel.MEDIUM:
+                medium_count += 1
+            else:
+                info_count += 1
+
+        status = report.status
+        if high_count == 0 and medium_count == 0:
+            # Nothing that needs a human: mark as IGNORED
+            status = DriftStatus.IGNORED
+            if info_count > 0:
+                logger.info(
+                    "drift_all_allowlisted",
+                    report_id=report.report_id,
+                    info_count=info_count,
+                )
+
+        return report.model_copy(update={
+            "high_count": high_count,
+            "medium_count": medium_count,
+            "info_count": info_count,
+            "status": status,
+        })
+
+    def needs_alert(self, report: DriftReport) -> bool:
+        """Return True when the report counts at least one HIGH or MEDIUM drift."""
+        return report.high_count > 0 or report.medium_count > 0
+
+    def format_diff_summary(self, report: DriftReport) -> str:
+        """
+        Format a human-readable drift summary (used in the Telegram message).
+
+        Allowlisted items are hidden. HIGH items come first, then the rest,
+        each group ordered by field_path. At most 5 items are listed; values
+        are truncated to 60 characters.
+
+        Returns:
+            The formatted text, or a fixed notice when there are no items or
+            every item is allowlisted.
+        """
+        if not report.items:
+            return "無漂移"
+
+        max_shown = 5  # cap the list so the message does not get too long
+
+        visible = sorted(
+            (item for item in report.items if not item.is_allowlisted),
+            key=lambda i: (i.drift_level != DriftLevel.HIGH, i.field_path),
+        )
+        if not visible:
+            return f"共 {report.info_count} 項白名單漂移(已靜默)"
+
+        lines = []
+        for item in visible[:max_shown]:
+            level_label = "🔴" if item.drift_level == DriftLevel.HIGH else "🟡"
+            lines.append(
+                f"{level_label} {item.resource_kind}/{item.resource_name}.{item.field_path}\n"
+                f" Git: {str(item.git_value)[:60]}\n"
+                f" K8s: {str(item.actual_value)[:60]}"
+            )
+
+        # Count the hidden remainder from the same filtered list that was shown,
+        # so the number always matches what the reader did not see.
+        remaining = len(visible) - max_shown
+        if remaining > 0:
+            lines.append(f"... 另有 {remaining} 項漂移")
+
+        return "\n".join(lines)
+
+
+_analyzer: DriftAnalyzer | None = None
+
+
+def get_drift_analyzer() -> DriftAnalyzer:
+    """Return the module-level DriftAnalyzer, creating it on first use."""
+    global _analyzer
+    if _analyzer is not None:
+        return _analyzer
+    _analyzer = DriftAnalyzer()
+    return _analyzer
diff --git a/apps/api/src/services/drift_detector.py b/apps/api/src/services/drift_detector.py
new file mode 100644
index 00000000..c873ecc1
--- /dev/null
+++ b/apps/api/src/services/drift_detector.py
@@ -0,0 +1,328 @@
+"""
+Drift Detector - Phase 25 P2 Config Drift Detection
+=====================================================
+職責:比對 Git YAML vs K8s 實際狀態,輸出結構化 DriftItem 列表
+不判斷嚴重性,不解釋意圖,只做事實比對
+
+版本: v1.0
+建立: 2026-04-04 (台北時區)
+建立者: ogt (首席架構師設計) + Claude Code (實作)
+"""
+
+from __future__ import annotations
+
+import asyncio
+import subprocess
+import uuid
+from pathlib import Path
+from typing import Any
+
+import structlog
+import yaml
+
+from src.models.drift import DriftItem, DriftLevel, DriftReport
+
+logger = structlog.get_logger(__name__)
+
+# Allowlisted field-path patterns: drift on these fields is recorded silently
+# and does not raise an alert.  Consumed by DriftDetector._is_allowlisted.
+# NOTE(review): DriftDetector only diffs the ``spec`` subtree, so the
+# ``metadata.*`` and ``status`` entries below cannot match any produced path.
+_DEFAULT_ALLOWLIST_FIELDS = frozenset([
+    "spec.replicas",
+    "spec.template.spec.containers[*].resources.requests",
+    "spec.template.spec.containers[*].resources.limits",
+    "metadata.annotations",
+    "metadata.labels.pod-template-hash",
+    "metadata.resourceVersion",
+    "metadata.generation",
+    "metadata.uid",
+    "status",
+])
+
+# Critical field-path patterns: drift on these fields must alert immediately.
+# Consumed by DriftDetector._is_critical.
+_DEFAULT_CRITICAL_FIELDS = frozenset([
+    "spec.template.spec.containers[*].image",
+    "spec.template.spec.containers[*].env",
+    "spec.template.spec.containers[*].ports",
+    "spec.template.spec.volumes",
+    "spec.template.spec.serviceAccountName",
+])
+
+
+class GitStateReader:
+    """Read the desired Kubernetes state from the manifests under ``k8s_dir``.
+
+    NOTE(review): the manifests are read from the working tree on disk, not
+    from ``git show HEAD``; uncommitted local edits are therefore included.
+    """
+
+    # Both spellings of the YAML extension are treated as manifests.
+    _MANIFEST_SUFFIXES = (".yaml", ".yml")
+
+    def __init__(self, k8s_dir: str = "k8s"):
+        """Remember the directory (relative to the process CWD) to scan."""
+        self._k8s_dir = Path(k8s_dir)
+
+    async def read(self, namespace: str) -> dict[str, Any]:
+        """Load every manifest that applies to ``namespace``.
+
+        The blocking file I/O runs in the default executor so the event loop
+        is not stalled.
+
+        Args:
+            namespace: Kubernetes namespace to collect manifests for.
+
+        Returns:
+            ``{resource_key: parsed_yaml_dict}`` where ``resource_key`` is
+            ``"{kind}/{name}"``.  An empty dict is returned (and a warning is
+            logged) if reading fails as a whole.
+        """
+        try:
+            return await asyncio.get_running_loop().run_in_executor(
+                None, self._read_sync, namespace
+            )
+        except Exception as e:
+            logger.warning("git_state_read_failed", namespace=namespace, error=str(e))
+            return {}
+
+    def _read_sync(self, namespace: str) -> dict[str, Any]:
+        """Synchronous worker behind :meth:`read`.
+
+        Documents without ``metadata.namespace`` are kept for every namespace;
+        documents naming a different namespace are skipped.  When two
+        documents share a key, the one from the later file (sorted by path)
+        wins.
+        """
+        resources: dict[str, Any] = {}
+
+        if not self._k8s_dir.exists():
+            logger.warning("k8s_dir_not_found", path=str(self._k8s_dir))
+            return resources
+
+        # Sorted so that duplicate keys resolve the same way on every run.
+        manifest_files = sorted(
+            path
+            for path in self._k8s_dir.rglob("*")
+            if path.is_file() and path.suffix in self._MANIFEST_SUFFIXES
+        )
+
+        for yaml_file in manifest_files:
+            try:
+                # Explicit UTF-8: manifests may carry non-ASCII comments and the
+                # locale default is not guaranteed to decode them.
+                with open(yaml_file, encoding="utf-8") as f:
+                    docs = list(yaml.safe_load_all(f))
+            except Exception as e:
+                logger.debug("yaml_parse_failed", file=str(yaml_file), error=str(e))
+                continue
+
+            for doc in docs:
+                if not doc or not isinstance(doc, dict):
+                    continue
+                # ``metadata: null`` parses to None; treat it like a missing
+                # block instead of aborting the rest of the file.
+                metadata = doc.get("metadata") or {}
+                if not isinstance(metadata, dict):
+                    continue
+                ns = metadata.get("namespace", "")
+                if ns and ns != namespace:
+                    continue
+                kind = doc.get("kind", "")
+                name = metadata.get("name", "")
+                if kind and name:
+                    resources[f"{kind}/{name}"] = doc
+
+        return resources
+
+
+class K8sStateReader:
+    """Read the live state of a namespace by shelling out to ``kubectl get``."""
+
+    # Resource types queried when the caller does not supply its own list.
+    _DEFAULT_RESOURCE_TYPES = ("deployment", "service", "configmap", "ingress")
+
+    # Canonical Kind spelling, used only when a list item carries no ``kind``
+    # (``str.capitalize`` would produce e.g. "Configmap").
+    _KIND_BY_TYPE = {
+        "deployment": "Deployment",
+        "service": "Service",
+        "configmap": "ConfigMap",
+        "ingress": "Ingress",
+    }
+
+    # Per-invocation kubectl timeout, in seconds.
+    _KUBECTL_TIMEOUT_SECONDS = 30
+
+    def __init__(self, resource_types: tuple[str, ...] | None = None):
+        """Configure which resource types are fetched.
+
+        Args:
+            resource_types: kubectl resource type names to query.  ``None``
+                selects deployment, service, configmap and ingress.
+        """
+        self._resource_types = (
+            self._DEFAULT_RESOURCE_TYPES
+            if resource_types is None
+            else tuple(resource_types)
+        )
+
+    async def read(self, namespace: str) -> dict[str, Any]:
+        """Fetch the live resources of ``namespace`` through kubectl.
+
+        The blocking subprocess calls run in the default executor.
+
+        Args:
+            namespace: Kubernetes namespace to query.
+
+        Returns:
+            ``{resource_key: actual_resource_dict}`` keyed by
+            ``"{kind}/{name}"``; an empty dict (with a warning logged) if the
+            read fails as a whole.
+        """
+        try:
+            return await asyncio.get_running_loop().run_in_executor(
+                None, self._read_sync, namespace
+            )
+        except Exception as e:
+            logger.warning("k8s_state_read_failed", namespace=namespace, error=str(e))
+            return {}
+
+    def _read_sync(self, namespace: str) -> dict[str, Any]:
+        """Synchronous worker behind :meth:`read`.
+
+        Each resource type is queried independently: a failure for one type
+        is logged and the remaining types are still fetched.
+        """
+        resources: dict[str, Any] = {}
+
+        for rtype in self._resource_types:
+            try:
+                proc = subprocess.run(
+                    ["kubectl", "get", rtype, "-n", namespace, "-o", "yaml"],
+                    capture_output=True,
+                    text=True,
+                    timeout=self._KUBECTL_TIMEOUT_SECONDS,
+                )
+                if proc.returncode != 0:
+                    # Warning, not debug: an unreadable cluster otherwise looks
+                    # exactly like a cluster with no drift.
+                    logger.warning(
+                        "kubectl_failed",
+                        type=rtype,
+                        namespace=namespace,
+                        stderr=proc.stderr[:200],
+                    )
+                    continue
+
+                data = yaml.safe_load(proc.stdout)
+                if not isinstance(data, dict) or data.get("kind") != "List":
+                    continue
+
+                fallback_kind = self._KIND_BY_TYPE.get(rtype, rtype.capitalize())
+                for item in data.get("items") or []:
+                    if not isinstance(item, dict):
+                        continue
+                    kind = item.get("kind", fallback_kind)
+                    name = (item.get("metadata") or {}).get("name", "")
+                    if name:
+                        resources[f"{kind}/{name}"] = item
+
+            except subprocess.TimeoutExpired:
+                logger.warning("kubectl_timeout", type=rtype, namespace=namespace)
+            except Exception as e:
+                logger.warning("kubectl_error", type=rtype, error=str(e))
+
+        return resources
+
+
+class DriftDetector:
+    """Compare Git manifests with the live cluster and emit ``DriftItem`` rows.
+
+    Scope: factual comparison plus a coarse level derived from the allowlist
+    and critical pattern sets; intent analysis happens elsewhere.
+
+    Pattern syntax: a dotted field path in which ``[*]`` stands for any list
+    index, e.g. ``spec.template.spec.containers[*].image``.  A pattern covers
+    the field it names and everything nested beneath it.
+
+    NOTE(review): only the ``spec`` subtree is compared, so resources without
+    a ``spec`` (e.g. ConfigMap ``data``) never produce drift items.
+    NOTE(review): fields present only on the live object (for instance values
+    defaulted by the API server) are reported as drift because the union of
+    both key sets is compared — confirm this is intended.
+    """
+
+    def __init__(
+        self,
+        k8s_dir: str = "k8s",
+        allowlist_fields: frozenset | None = None,
+        critical_fields: frozenset | None = None,
+    ):
+        """Wire the two state readers and the classification pattern sets.
+
+        Args:
+            k8s_dir: Directory holding the Git-managed manifests.
+            allowlist_fields: Patterns classified as INFO; ``None`` selects
+                the module defaults, an empty set disables allowlisting.
+            critical_fields: Patterns classified as HIGH; ``None`` selects
+                the module defaults, an empty set disables HIGH detection.
+        """
+        self._git_reader = GitStateReader(k8s_dir)
+        self._k8s_reader = K8sStateReader()
+        # ``is None`` rather than ``or``: an explicitly empty set must be
+        # honoured instead of silently falling back to the defaults.
+        self._allowlist = (
+            _DEFAULT_ALLOWLIST_FIELDS if allowlist_fields is None else allowlist_fields
+        )
+        self._critical_fields = (
+            _DEFAULT_CRITICAL_FIELDS if critical_fields is None else critical_fields
+        )
+
+    async def scan(self, namespace: str, triggered_by: str = "cron") -> DriftReport:
+        """Scan one namespace for drift between Git and the cluster.
+
+        Both states are read concurrently.  Resources present in Git but
+        absent from the cluster are skipped (logged at debug); resources that
+        exist only in the cluster are not examined.
+
+        Args:
+            namespace: Kubernetes namespace to scan.
+            triggered_by: Origin of the scan (cron / webhook / api).
+
+        Returns:
+            A ``DriftReport`` holding the drift items and per-level counts;
+            no intent analysis has been applied yet.
+        """
+        report_id = str(uuid.uuid4())[:8]
+
+        logger.info("drift_scan_start", namespace=namespace, report_id=report_id)
+
+        git_state, k8s_state = await asyncio.gather(
+            self._git_reader.read(namespace),
+            self._k8s_reader.read(namespace),
+        )
+
+        items: list[DriftItem] = []
+
+        for resource_key, git_resource in git_state.items():
+            actual_resource = k8s_state.get(resource_key)
+            if actual_resource is None:
+                # Declared in Git but not found in the cluster (possibly not
+                # deployed yet): nothing to compare against.
+                logger.debug("resource_missing_in_k8s", resource=resource_key)
+                continue
+
+            kind, name = resource_key.split("/", 1)
+            items.extend(
+                self._diff_resources(git_resource, actual_resource, kind, name, namespace)
+            )
+
+        high_count = sum(1 for i in items if i.drift_level == DriftLevel.HIGH)
+        medium_count = sum(1 for i in items if i.drift_level == DriftLevel.MEDIUM)
+        info_count = sum(1 for i in items if i.drift_level == DriftLevel.INFO)
+
+        logger.info(
+            "drift_scan_done",
+            namespace=namespace,
+            report_id=report_id,
+            high=high_count,
+            medium=medium_count,
+            info=info_count,
+        )
+
+        return DriftReport(
+            report_id=report_id,
+            namespace=namespace,
+            items=items,
+            high_count=high_count,
+            medium_count=medium_count,
+            info_count=info_count,
+            triggered_by=triggered_by,
+        )
+
+    def _diff_resources(
+        self,
+        git_res: dict,
+        actual_res: dict,
+        kind: str,
+        name: str,
+        namespace: str,
+    ) -> list[DriftItem]:
+        """Diff the ``spec`` of two resources and classify every difference.
+
+        Allowlisted paths become INFO, critical paths HIGH, anything else
+        MEDIUM.  The allowlist is consulted first.
+        """
+        items: list[DriftItem] = []
+
+        # Only ``spec`` is compared; ``metadata`` carries too many fields that
+        # the cluster rewrites on its own.
+        git_spec = git_res.get("spec", {})
+        actual_spec = actual_res.get("spec", {})
+
+        diffs = self._flatten_diff("spec", git_spec, actual_spec)
+        for field_path, (git_val, actual_val) in diffs.items():
+            is_allowlisted = self._is_allowlisted(field_path)
+            if is_allowlisted:
+                level = DriftLevel.INFO
+            elif self._is_critical(field_path):
+                level = DriftLevel.HIGH
+            else:
+                level = DriftLevel.MEDIUM
+
+            items.append(DriftItem(
+                resource_kind=kind,
+                resource_name=name,
+                namespace=namespace,
+                field_path=field_path,
+                git_value=git_val,
+                actual_value=actual_val,
+                drift_level=level,
+                is_allowlisted=is_allowlisted,
+            ))
+
+        return items
+
+    def _flatten_diff(
+        self,
+        prefix: str,
+        git_dict: Any,
+        actual_dict: Any,
+    ) -> dict[str, tuple[Any, Any]]:
+        """Recursively diff two values into ``{field_path: (git, actual)}``.
+
+        Dicts are descended key by key (``prefix.key``) and lists index by
+        index (``prefix[i]``), so that list-element patterns such as
+        ``containers[*].image`` can be matched.  A key or index missing on one
+        side is represented by ``None``.  Any other pair of unequal values is
+        reported as a single leaf difference at ``prefix``.
+        """
+        diffs: dict[str, tuple[Any, Any]] = {}
+
+        if git_dict == actual_dict:
+            return diffs
+
+        if isinstance(git_dict, dict) and isinstance(actual_dict, dict):
+            # Sorted (by string form, keys may be non-str) for stable output.
+            for key in sorted(set(git_dict) | set(actual_dict), key=str):
+                diffs.update(
+                    self._flatten_diff(
+                        f"{prefix}.{key}", git_dict.get(key), actual_dict.get(key)
+                    )
+                )
+        elif isinstance(git_dict, list) and isinstance(actual_dict, list):
+            for index in range(max(len(git_dict), len(actual_dict))):
+                git_item = git_dict[index] if index < len(git_dict) else None
+                actual_item = actual_dict[index] if index < len(actual_dict) else None
+                diffs.update(
+                    self._flatten_diff(f"{prefix}[{index}]", git_item, actual_item)
+                )
+        else:
+            diffs[prefix] = (git_dict, actual_dict)
+
+        return diffs
+
+    @staticmethod
+    def _canonical_path(field_path: str) -> str:
+        """Replace concrete list indices (``[3]``) with the wildcard ``[*]``."""
+        import re
+
+        return re.sub(r"\[\d+\]", "[*]", field_path)
+
+    @staticmethod
+    def _canonical_pattern(pattern: str) -> str:
+        """Drop a trailing ``.*``; a pattern already covers its descendants."""
+        return pattern[:-2] if pattern.endswith(".*") else pattern
+
+    @staticmethod
+    def _is_under(path: str, ancestor: str) -> bool:
+        """True when ``path`` equals ``ancestor`` or is nested beneath it.
+
+        The match must end on a segment boundary (``.`` or ``[``) so that
+        ``spec.replicas`` does not cover ``spec.replicasFoo``.
+        """
+        if not ancestor or not path.startswith(ancestor):
+            return False
+        return len(path) == len(ancestor) or path[len(ancestor)] in ".["
+
+    def _is_allowlisted(self, field_path: str) -> bool:
+        """Tell whether the field is covered by an allowlist pattern."""
+        path = self._canonical_path(field_path)
+        return any(
+            self._is_under(path, self._canonical_pattern(pattern))
+            for pattern in self._allowlist
+        )
+
+    def _is_critical(self, field_path: str) -> bool:
+        """Tell whether the field is critical (HIGH level).
+
+        A difference is critical when it lies on or beneath a critical
+        pattern, or when it replaces a whole parent of one (for example an
+        added container, which necessarily carries an image).
+        """
+        path = self._canonical_path(field_path)
+        for pattern in self._critical_fields:
+            target = self._canonical_pattern(pattern)
+            if self._is_under(path, target) or self._is_under(target, path):
+                return True
+        return False
+
+
+# =============================================================================
+# Singleton
+# =============================================================================
+
+# Module-level cache holding the lazily created DriftDetector.
+_detector: DriftDetector | None = None
+
+
+def get_drift_detector() -> DriftDetector:
+    """Return the shared DriftDetector, building it on the first call."""
+    global _detector
+    if _detector is not None:
+        return _detector
+    _detector = DriftDetector()
+    return _detector
diff --git a/apps/api/src/services/drift_interpreter.py b/apps/api/src/services/drift_interpreter.py
new file mode 100644
index 00000000..282aa3cc
--- /dev/null
+++ b/apps/api/src/services/drift_interpreter.py
@@ -0,0 +1,173 @@
+"""
+Drift Interpreter - Phase 25 P2 Config Drift Detection
+=======================================================
+職責:Nemotron 意圖分析(不生成修復指令)
+只回答「這是人為操作?Hotfix?系統自動變更?」
+
+設計邊界(核心原則):
+- 只輸出意圖分析,不生成 kubectl 或 git 指令
+- 確定性修復由 DriftRemediator 負責
+- Nemotron 超時 → UNKNOWN,不阻塞主流程
+
+版本: v1.0
+建立: 2026-04-04 (台北時區)
+建立者: ogt (首席架構師設計) + Claude Code (實作)
+"""
+
+from __future__ import annotations
+
+import asyncio
+import json
+from typing import TYPE_CHECKING
+
+import structlog
+
+from src.models.drift import DriftIntent, DriftInterpretation, DriftItem
+
+if TYPE_CHECKING:
+ from src.models.drift import DriftReport
+
+logger = structlog.get_logger(__name__)
+
+# Prompt sent to Nemotron by NemotronDriftInterpreter.analyze().
+# ``{diff_summary}`` is the only str.format() placeholder; the doubled braces
+# render as the literal braces of the JSON example.
+_INTENT_PROMPT_TEMPLATE = """你是 AWOOOI GitOps 守門員,請分析以下 K8s 配置漂移的意圖。
+
+## 漂移詳情
+{diff_summary}
+
+## 任務
+判斷這次漂移最可能的原因:
+- emergency_hotfix: 繞過 CI 的緊急修補(image tag 改變但無對應 Git commit)
+- human_error: 誤操作(非預期的隨機欄位改變)
+- automated_change: 系統自動變更(HPA replicas, 系統注入的 annotation 等)
+- unknown: 無法判斷
+
+請以 JSON 回應:
+{{
+ "intent": "emergency_hotfix|human_error|automated_change|unknown",
+ "explanation": "用繁體中文解釋你的判斷理由(一句話)",
+ "risk": "HIGH|MEDIUM|LOW",
+ "confidence": 0.0到1.0之間的數字
+}}
+
+只輸出 JSON,不要任何額外說明。
+"""
+
+
+class NemotronDriftInterpreter:
+    """Ask Nemotron *why* a drift happened.
+
+    Scope:
+      - produces an intent analysis only;
+      - never generates remediation commands;
+      - never calls kubectl or git itself.
+    """
+
+    # Upper bound on drift items placed in the prompt, to limit token usage.
+    _MAX_PROMPT_ITEMS = 10
+
+    # Risk labels the prompt asks the model to choose from.
+    _VALID_RISKS = ("HIGH", "MEDIUM", "LOW")
+
+    async def analyze(self, report: "DriftReport") -> DriftInterpretation:
+        """Analyse the intent behind the drift in ``report``.
+
+        Args:
+            report: A classified drift report.
+
+        Returns:
+            The model's interpretation.  Reports without HIGH or MEDIUM drift
+            are answered locally with UNKNOWN and no model call; a timeout or
+            failure also yields UNKNOWN.
+        """
+        if not report.items or (report.high_count == 0 and report.medium_count == 0):
+            return DriftInterpretation(
+                intent=DriftIntent.UNKNOWN,
+                explanation="無顯著漂移,不需要意圖分析",
+                confidence=1.0,
+            )
+
+        diff_text = self._format_diff_for_prompt(report)
+        prompt = _INTENT_PROMPT_TEMPLATE.format(diff_summary=diff_text)
+        return await self._call_nemotron(prompt)
+
+    def _format_diff_for_prompt(self, report: "DriftReport") -> str:
+        """Render the non-allowlisted drift items as prompt bullet lines.
+
+        Allowlisted items are filtered out *before* the item cap is applied,
+        so a report that starts with many allowlisted entries still exposes
+        its significant drift to the model.
+        """
+        significant = [item for item in report.items if not item.is_allowlisted]
+        lines = [
+            f"- {item.resource_kind}/{item.resource_name}: "
+            f"{item.field_path} "
+            f"Git={str(item.git_value)[:40]} → "
+            f"K8s={str(item.actual_value)[:40]}"
+            for item in significant[: self._MAX_PROMPT_ITEMS]
+        ]
+        return "\n".join(lines) if lines else "(均為白名單欄位)"
+
+    async def _call_nemotron(self, prompt: str) -> DriftInterpretation:
+        """Send ``prompt`` to Nemotron and parse the reply.
+
+        The call is bounded by ``NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS`` from the
+        settings (30 seconds when the setting is absent).  Every failure mode
+        is converted into an UNKNOWN interpretation rather than raised.
+        """
+        try:
+            from src.core.config import get_settings
+            from src.services.nvidia_provider import get_nvidia_provider
+
+            settings = get_settings()
+            nvidia = get_nvidia_provider()
+
+            response_text, success, _tokens, _cost = await asyncio.wait_for(
+                nvidia.chat(prompt=prompt),
+                timeout=getattr(settings, "NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS", 30),
+            )
+
+            if not success or not response_text:
+                return self._unknown_result("Nemotron 回傳空值")
+
+            return self._parse_response(response_text)
+
+        except asyncio.TimeoutError:
+            logger.warning("drift_nemotron_timeout")
+            return self._unknown_result("Nemotron 超時")
+        except Exception as e:
+            logger.warning("drift_nemotron_error", error=str(e))
+            return self._unknown_result(str(e))
+
+    def _parse_response(self, text: str) -> DriftInterpretation:
+        """Turn the model's reply into a ``DriftInterpretation``.
+
+        The reply is decoded as JSON directly; failing that, the content of a
+        Markdown code fence or, as a last resort, the outermost ``{...}`` span
+        is tried.  Field values are normalised so that one sloppy field does
+        not discard an otherwise usable answer: an unrecognised intent becomes
+        UNKNOWN, an unrecognised risk becomes MEDIUM, and confidence is
+        clamped to the 0.0-1.0 range the prompt asks for.
+        """
+        try:
+            data = json.loads(text)
+        except Exception:
+            import re
+
+            fenced = re.search(r"```(?:json)?\s*([\s\S]+?)```", text)
+            first_brace, last_brace = text.find("{"), text.rfind("}")
+            if fenced:
+                candidate = fenced.group(1)
+            elif 0 <= first_brace < last_brace:
+                # Model wrapped the JSON object in prose without a fence.
+                candidate = text[first_brace:last_brace + 1]
+            else:
+                return self._unknown_result("無法解析 JSON")
+            try:
+                data = json.loads(candidate)
+            except Exception:
+                return self._unknown_result("JSON 解析失敗")
+
+        if not isinstance(data, dict):
+            return self._unknown_result("模型解析失敗: 回應不是 JSON 物件")
+
+        try:
+            try:
+                intent = DriftIntent(str(data.get("intent", "unknown")).strip().lower())
+            except ValueError:
+                intent = DriftIntent.UNKNOWN
+
+            risk = str(data.get("risk") or "MEDIUM").strip().upper()
+            if risk not in self._VALID_RISKS:
+                risk = "MEDIUM"
+
+            try:
+                confidence = float(data.get("confidence", 0.0))
+            except (TypeError, ValueError):
+                confidence = 0.0
+            if confidence != confidence:  # NaN never equals itself
+                confidence = 0.0
+            confidence = min(1.0, max(0.0, confidence))
+
+            return DriftInterpretation(
+                intent=intent,
+                explanation=str(data.get("explanation") or ""),
+                risk=risk,
+                confidence=confidence,
+            )
+        except Exception as e:
+            return self._unknown_result(f"模型解析失敗: {e}")
+
+    def _unknown_result(self, reason: str) -> DriftInterpretation:
+        """Build the UNKNOWN interpretation used for every failure path."""
+        return DriftInterpretation(
+            intent=DriftIntent.UNKNOWN,
+            explanation=f"意圖分析失敗:{reason}",
+            risk="MEDIUM",
+            confidence=0.0,
+        )
+
+
+# Module-level cache holding the lazily created NemotronDriftInterpreter.
+_interpreter: NemotronDriftInterpreter | None = None
+
+
+def get_drift_interpreter() -> NemotronDriftInterpreter:
+    """Return the shared NemotronDriftInterpreter, building it on first use."""
+    global _interpreter
+    if _interpreter is not None:
+        return _interpreter
+    _interpreter = NemotronDriftInterpreter()
+    return _interpreter
diff --git a/apps/api/src/services/drift_remediator.py b/apps/api/src/services/drift_remediator.py
new file mode 100644
index 00000000..33ba64be
--- /dev/null
+++ b/apps/api/src/services/drift_remediator.py
@@ -0,0 +1,233 @@
+"""
+Drift Remediator - Phase 25 P2 Config Drift Detection
+======================================================
+職責:確定性修復執行
+- rollback():kubectl apply -f (覆蓋回 Git 狀態)
+- adopt():git commit + git push gitea main(承認變更,更新 Git)
+
+設計邊界(核心原則):
+- 不使用 AI 判斷如何修復
+- 只有人工確認按鈕後才執行
+- rollback 失敗只通知,不重試(避免重複操作)
+
+版本: v1.0
+建立: 2026-04-04 (台北時區)
+建立者: ogt (首席架構師設計) + Claude Code (實作)
+"""
+
+from __future__ import annotations
+
+import asyncio
+import subprocess
+from typing import TYPE_CHECKING
+
+import structlog
+
+if TYPE_CHECKING:
+ from src.models.drift import DriftItem, DriftReport
+
+logger = structlog.get_logger(__name__)
+
+
+class DriftRemediator:
+    """Deterministic executor for drift remediation.
+
+    Scope:
+      - ``rollback``: ``kubectl apply`` the Git manifests over the cluster;
+      - ``adopt``: ``git commit`` + ``git push gitea main``;
+      - no AI involvement in choosing the remediation;
+      - no automatic retries.
+    """
+
+    # Manifest file extensions considered when looking up a single resource.
+    _MANIFEST_SUFFIXES = (".yaml", ".yml")
+
+    def __init__(self, k8s_dir: str = "k8s"):
+        """Remember the manifest directory (relative to the process CWD)."""
+        self._k8s_dir = k8s_dir
+
+    async def rollback(
+        self,
+        report: "DriftReport",
+        resource_key: str | None = None,
+    ) -> dict:
+        """Overwrite the cluster with the Git state via ``kubectl apply``.
+
+        The outcome is also pushed to Telegram.  Failures are reported, never
+        retried.
+
+        Args:
+            report: Drift report; supplies the namespace and report id.
+            resource_key: ``"Kind/Name"`` of the single resource to restore,
+                or ``None`` to apply the whole manifest directory.
+
+        Returns:
+            ``{"success": bool, "message": str}``
+        """
+        logger.info(
+            "drift_rollback_start",
+            report_id=report.report_id,
+            resource=resource_key or "all",
+        )
+
+        try:
+            result = await asyncio.get_running_loop().run_in_executor(
+                None,
+                self._kubectl_apply,
+                report.namespace,
+                resource_key,
+            )
+
+            if result["success"]:
+                logger.info(
+                    "drift_rollback_success",
+                    report_id=report.report_id,
+                    namespace=report.namespace,
+                )
+                await self._notify_telegram(
+                    f"✅ 漂移已覆蓋回 Git 狀態\n"
+                    f"Namespace: {report.namespace}\n"
+                    f"資源: {resource_key or '全部'}"
+                )
+            else:
+                logger.error(
+                    "drift_rollback_failed",
+                    report_id=report.report_id,
+                    error=result.get("message"),
+                )
+                await self._notify_telegram(
+                    f"❌ 漂移覆蓋失敗,需要人工介入\n"
+                    f"Namespace: {report.namespace}\n"
+                    f"錯誤: {result.get('message', '')[:200]}"
+                )
+
+            return result
+
+        except Exception as e:
+            msg = f"rollback 異常: {str(e)}"
+            logger.error("drift_rollback_exception", error=str(e))
+            await self._notify_telegram(
+                f"❌ 漂移覆蓋異常\nNamespace: {report.namespace}\n錯誤: {str(e)[:200]}"
+            )
+            return {"success": False, "message": msg}
+
+    async def adopt(
+        self,
+        report: "DriftReport",
+        field_description: str = "",
+    ) -> dict:
+        """Accept the drift: commit the manifest directory and push it.
+
+        This method does not edit any manifest itself; it commits whatever
+        changes already exist under the manifest directory.  When there is
+        nothing to commit the result says so explicitly.
+
+        Args:
+            report: Drift report; supplies namespace, id and summary.
+            field_description: Description of the drifted fields for the
+                commit message; ``report.summary`` is used when empty.
+
+        Returns:
+            ``{"success": bool, "message": str}``
+        """
+        logger.info(
+            "drift_adopt_start",
+            report_id=report.report_id,
+            namespace=report.namespace,
+        )
+
+        commit_msg = (
+            f"chore: adopt drift — {report.namespace} "
+            f"{field_description or report.summary}"
+        )
+
+        try:
+            result = await asyncio.get_running_loop().run_in_executor(
+                None,
+                self._git_push,
+                commit_msg,
+            )
+
+            if result["success"]:
+                logger.info("drift_adopt_success", report_id=report.report_id)
+                await self._notify_telegram(
+                    f"✅ 漂移已承認,Git 已更新\n"
+                    f"Namespace: {report.namespace}\n"
+                    f"Commit: {commit_msg[:80]}"
+                )
+            else:
+                logger.error("drift_adopt_failed", error=result.get("message"))
+                await self._notify_telegram(
+                    f"❌ Git 更新失敗,需要人工處理\n"
+                    f"錯誤: {result.get('message', '')[:200]}"
+                )
+
+            return result
+
+        except Exception as e:
+            logger.error("drift_adopt_exception", error=str(e))
+            # Mirror rollback(): an unexpected failure is surfaced to the
+            # operators as well, not only written to the log.
+            await self._notify_telegram(
+                f"❌ Git 更新異常,需要人工處理\n"
+                f"錯誤: {str(e)[:200]}"
+            )
+            return {"success": False, "message": str(e)}
+
+    # =========================================================================
+    # Private
+    # =========================================================================
+
+    def _kubectl_apply(self, namespace: str, resource_key: str | None) -> dict:
+        """Run ``kubectl apply`` synchronously.
+
+        With ``resource_key`` set, only the matching manifest document is
+        applied (piped through stdin), so a targeted rollback cannot touch
+        other resources.  Without it the whole manifest directory is applied.
+        """
+        try:
+            manifest: str | None = None
+            if resource_key is None:
+                source = self._k8s_dir
+            else:
+                manifest = self._render_manifest(namespace, resource_key)
+                if not manifest:
+                    return {
+                        "success": False,
+                        "message": f"Git 中找不到資源 {resource_key}",
+                    }
+                source = "-"  # read the single document from stdin
+
+            cmd = ["kubectl", "apply", "-f", source, "-n", namespace, "--dry-run=none"]
+            proc = subprocess.run(
+                cmd,
+                input=manifest,
+                capture_output=True,
+                text=True,
+                timeout=60,
+            )
+            if proc.returncode == 0:
+                return {"success": True, "message": proc.stdout[:500]}
+            return {"success": False, "message": proc.stderr[:500]}
+        except subprocess.TimeoutExpired:
+            return {"success": False, "message": "kubectl apply 超時(60s)"}
+        except Exception as e:
+            return {"success": False, "message": str(e)}
+
+    def _render_manifest(self, namespace: str, resource_key: str) -> str:
+        """Return the YAML of the Git document identified by ``resource_key``.
+
+        Manifests are searched recursively under the manifest directory.
+        Documents that name a different namespace are ignored.  When several
+        documents match, the one from the last file (sorted by path) is used.
+        An empty string means no document matched.
+        """
+        from pathlib import Path
+
+        import yaml
+
+        kind, _, name = resource_key.partition("/")
+        selected = None
+
+        manifest_files = sorted(
+            path
+            for path in Path(self._k8s_dir).rglob("*")
+            if path.is_file() and path.suffix in self._MANIFEST_SUFFIXES
+        )
+        for manifest_file in manifest_files:
+            try:
+                with open(manifest_file, encoding="utf-8") as handle:
+                    docs = list(yaml.safe_load_all(handle))
+            except Exception as e:
+                logger.debug(
+                    "drift_rollback_manifest_unreadable",
+                    file=str(manifest_file),
+                    error=str(e),
+                )
+                continue
+
+            for doc in docs:
+                if not isinstance(doc, dict):
+                    continue
+                metadata = doc.get("metadata") or {}
+                if not isinstance(metadata, dict):
+                    continue
+                doc_namespace = metadata.get("namespace", "")
+                if doc_namespace and doc_namespace != namespace:
+                    continue
+                if doc.get("kind") == kind and metadata.get("name") == name:
+                    selected = doc
+
+        return yaml.safe_dump(selected) if selected is not None else ""
+
+    def _git_push(self, commit_msg: str) -> dict:
+        """Stage, commit and push the manifest directory (synchronous).
+
+        Only paths under the manifest directory are staged and committed, so
+        unrelated files in the working tree are never swept into the commit.
+        """
+        try:
+            subprocess.run(
+                ["git", "add", "-A", "--", self._k8s_dir],
+                check=True,
+                capture_output=True,
+                text=True,
+                timeout=10,
+            )
+
+            # ``git diff --cached --quiet`` exits 0 when nothing is staged and
+            # 1 when there are staged changes; anything else is an error.
+            staged = subprocess.run(
+                ["git", "diff", "--cached", "--quiet", "--", self._k8s_dir],
+                capture_output=True,
+                text=True,
+                timeout=10,
+            )
+            if staged.returncode == 0:
+                return {
+                    "success": False,
+                    "message": f"{self._k8s_dir} 沒有可提交的變更",
+                }
+            if staged.returncode != 1:
+                return {
+                    "success": False,
+                    "message": f"git 操作失敗: {staged.stderr[:300]}",
+                }
+
+            subprocess.run(
+                ["git", "commit", "-m", commit_msg, "--", self._k8s_dir],
+                check=True,
+                capture_output=True,
+                text=True,
+                timeout=10,
+            )
+
+            proc = subprocess.run(
+                ["git", "push", "gitea", "main"],
+                capture_output=True,
+                text=True,
+                timeout=30,
+            )
+            if proc.returncode == 0:
+                return {"success": True, "message": "已推送至 gitea main"}
+            return {"success": False, "message": proc.stderr[:500]}
+        except subprocess.CalledProcessError as e:
+            detail = (e.stderr or "").strip()[:300]
+            message = f"git 操作失敗: {e}"
+            if detail:
+                message = f"{message} — {detail}"
+            return {"success": False, "message": message}
+        except subprocess.TimeoutExpired as e:
+            # Name the step that actually timed out (add / diff / commit / push).
+            step = " ".join(str(part) for part in e.cmd[:2])
+            return {"success": False, "message": f"{step} 超時"}
+        except Exception as e:
+            return {"success": False, "message": str(e)}
+
+    async def _notify_telegram(self, message: str) -> None:
+        """Send ``message`` to Telegram; a delivery failure is only logged."""
+        try:
+            from src.services.telegram_gateway import get_telegram_gateway
+            tg = get_telegram_gateway()
+            await tg.send_text(message)
+        except Exception as e:
+            logger.warning("drift_remediator_telegram_failed", error=str(e))
+
+
+# Module-level cache holding the lazily created DriftRemediator.
+_remediator: DriftRemediator | None = None
+
+
+def get_drift_remediator() -> DriftRemediator:
+    """Return the shared DriftRemediator, building it on the first call."""
+    global _remediator
+    if _remediator is not None:
+        return _remediator
+    _remediator = DriftRemediator()
+    return _remediator
diff --git a/apps/api/src/services/knowledge_service.py b/apps/api/src/services/knowledge_service.py
index 6c50dfdd..a07f11d0 100644
--- a/apps/api/src/services/knowledge_service.py
+++ b/apps/api/src/services/knowledge_service.py
@@ -223,3 +223,56 @@ class KnowledgeService:
logger.info("embed_all_complete", total=len(rows), success=success, failed=failed)
return {"total": len(rows), "success": success, "failed": failed}
+
+    async def check_anti_pattern(
+        self,
+        symptoms_hash: str,
+        days: int = 7,
+    ) -> list[KnowledgeEntry]:
+        """Look up recent anti-pattern entries recorded for a symptoms hash.
+
+        Phase 25 P1 anti-pattern gate: the result lets the auto-repair
+        decision step refuse a remediation that recently failed for the same
+        symptoms.
+
+        Args:
+            symptoms_hash: Hash identifying the symptom pattern (16 characters
+                according to the migration's ``VARCHAR(16)`` column).
+            days: Look-back window in days (default 7).
+
+        Returns:
+            Up to five matching, non-archived ``anti_pattern`` entries, newest
+            first; an empty list when none is known or when ``symptoms_hash``
+            is empty.
+        """
+        # An empty hash identifies nothing.  Querying with it could match rows
+        # stored with an empty hash and block auto-repair for unrelated
+        # incidents, so it is answered without touching the database.
+        if not symptoms_hash:
+            return []
+
+        from datetime import timedelta
+        from sqlalchemy import text as sa_text
+        from src.utils.timezone import now_taipei
+
+        cutoff = now_taipei() - timedelta(days=days)
+
+        async with get_db_context() as db:
+            result = await db.execute(
+                sa_text(
+                    "SELECT id FROM knowledge_entries "
+                    "WHERE entry_type = 'anti_pattern' "
+                    "AND symptoms_hash = :hash "
+                    "AND created_at >= :cutoff "
+                    # NULL-safe: ``status != 'archived'`` alone evaluates to
+                    # UNKNOWN for a NULL status and would drop that row.
+                    "AND (status IS NULL OR status != 'archived') "
+                    "ORDER BY created_at DESC LIMIT 5"
+                ),
+                {"hash": symptoms_hash, "cutoff": cutoff},
+            )
+            entry_ids = [row.id for row in result.fetchall()]
+
+        if not entry_ids:
+            return []
+
+        entries = []
+        for eid in entry_ids:
+            entry = await self.get_entry(eid)
+            if entry:
+                entries.append(entry)
+
+        logger.info(
+            "anti_pattern_check",
+            symptoms_hash=symptoms_hash,
+            days=days,
+            found=len(entries),
+        )
+        return entries
diff --git a/apps/api/src/services/runbook_generator.py b/apps/api/src/services/runbook_generator.py
new file mode 100644
index 00000000..a70b1172
--- /dev/null
+++ b/apps/api/src/services/runbook_generator.py
@@ -0,0 +1,343 @@
+"""
+Runbook Generator - Phase 25 P1 Knowledge Auto-Harvesting
+==========================================================
+修復後自動生成 Runbook(成功)或 Anti-Pattern(失敗)
+透過 Nemotron NIM 生成,結果沉澱至 KM 知識庫
+
+設計原則:
+- 非阻塞:asyncio.create_task() 呼叫,絕不影響 AutoRepair 主流程
+- 失敗靜默:生成失敗只記 log,不拋例外
+- DRAFT/PUBLISHED:成功 → DRAFT(需人工審核),失敗 → PUBLISHED(直接封鎖)
+
+版本: v1.1
+建立: 2026-04-04 (台北時區)
+建立者: ogt (首席架構師設計) + Claude Code (實作)
+關聯設計: docs/superpowers/specs/2026-04-04-nemotron-active-defense-design.md 方向一
+
+變更紀錄:
+| 版本 | 日期 | 執行者 | 變更內容 |
+|------|------|--------|----------|
+| v1.0 | 2026-04-04 | Claude Code | 初始佔位(使用 generate() 但介面不存在) |
+| v1.1 | 2026-04-04 | ogt (首席架構師) | 改用正確的 nvidia.chat() 介面;新增 Minimal fallback |
+"""
+
+from __future__ import annotations
+
+import asyncio
+import time
+from typing import TYPE_CHECKING
+
+import structlog
+
+from src.models.knowledge import EntrySource, EntryStatus, EntryType, KnowledgeEntryCreate
+
+if TYPE_CHECKING:
+ from src.models.incident import Incident
+ from src.models.playbook import Playbook
+ from src.services.auto_repair_service import AutoRepairResult
+
+logger = structlog.get_logger(__name__)
+
+
+class NemotronRunbookGenerator:
+    """Generate knowledge-base entries from auto-repair outcomes via Nemotron.
+
+    Responsibilities:
+      - successful repair -> AUTO_RUNBOOK entry (DRAFT) + Telegram review card;
+      - failed repair -> ANTI_PATTERN entry (PUBLISHED) + Telegram notice.
+
+    Entries are stored through KnowledgeService and text is generated with
+    ``NvidiaProvider.chat()``.  Both public methods swallow every exception
+    (logging it) so they cannot disturb the caller.
+    """
+
+    _RUNBOOK_SYSTEM = (
+        "你是 AWOOOI 平台的 SRE Runbook 撰寫專家。"
+        "根據提供的 Incident 與修復結果,用繁體中文生成完整結構化 Runbook。"
+    )
+
+    _ANTI_PATTERN_SYSTEM = (
+        "你是 AWOOOI 平台的故障分析專家。"
+        "根據失敗的修復嘗試,用繁體中文生成失敗案例記錄,幫助未來避免重蹈覆轍。"
+    )
+
+    # Number of executed steps quoted in the runbook prompt.
+    _MAX_PROMPT_STEPS = 5
+
+    async def generate_runbook(
+        self,
+        incident: "Incident",
+        playbook: "Playbook",
+        result: "AutoRepairResult",
+        symptoms_hash: str,
+    ) -> None:
+        """Create an AUTO_RUNBOOK draft after a successful repair.
+
+        Args:
+            incident: Incident that triggered the repair.
+            playbook: Playbook that was executed.
+            result: Execution result of the repair.
+            symptoms_hash: Hash identifying the symptom pattern; stored on the
+                entry.
+        """
+        try:
+            content = await self._call_nemotron_for_runbook(incident, playbook, result)
+            if not content:
+                return
+
+            from src.services.knowledge_service import get_knowledge_service
+            ks = get_knowledge_service()
+
+            entry_data = KnowledgeEntryCreate(
+                title=f"[AUTO] {incident.incident_id} — {playbook.name}",
+                content=content,
+                entry_type=EntryType.AUTO_RUNBOOK,
+                category="auto_generated",
+                tags=list(incident.affected_services or []) + ["auto_runbook", "nemotron"],
+                source=EntrySource.AI_EXTRACTED,
+                status=EntryStatus.DRAFT,
+                related_incident_id=incident.incident_id,
+                related_playbook_id=playbook.playbook_id,
+                symptoms_hash=symptoms_hash,
+                created_by="nemotron_runbook_generator",
+            )
+
+            entry = await ks.create_entry(entry_data)
+
+            logger.info(
+                "auto_runbook_created",
+                incident_id=incident.incident_id,
+                entry_id=entry.id,
+                playbook_id=playbook.playbook_id,
+            )
+
+            await self._push_runbook_review_card(incident, entry.id, content[:200])
+
+        except Exception as e:
+            logger.error(
+                "runbook_generation_failed",
+                incident_id=incident.incident_id,
+                error=str(e),
+            )
+
+    async def generate_anti_pattern(
+        self,
+        incident: "Incident",
+        playbook: "Playbook",
+        result: "AutoRepairResult",
+        symptoms_hash: str,
+    ) -> None:
+        """Record an ANTI_PATTERN entry after a failed repair.
+
+        The entry is stored as PUBLISHED directly, without a review step.
+
+        Args:
+            incident: Incident that triggered the repair.
+            playbook: Playbook that was attempted.
+            result: Execution result of the failed repair.
+            symptoms_hash: Hash identifying the symptom pattern; stored on the
+                entry.
+        """
+        try:
+            content = await self._call_nemotron_for_anti_pattern(incident, playbook, result)
+            if not content:
+                return
+
+            from src.services.knowledge_service import get_knowledge_service
+            ks = get_knowledge_service()
+
+            title = f"[FAIL] {incident.incident_id} — {playbook.name}"
+            entry_data = KnowledgeEntryCreate(
+                title=title,
+                content=content,
+                entry_type=EntryType.ANTI_PATTERN,
+                category="failure_cases",
+                tags=list(incident.affected_services or []) + ["anti_pattern", "failure"],
+                source=EntrySource.AI_EXTRACTED,
+                status=EntryStatus.PUBLISHED,  # published directly, no review
+                related_incident_id=incident.incident_id,
+                related_playbook_id=playbook.playbook_id,
+                symptoms_hash=symptoms_hash,
+                created_by="nemotron_runbook_generator",
+            )
+
+            entry = await ks.create_entry(entry_data)
+
+            logger.info(
+                "anti_pattern_created",
+                incident_id=incident.incident_id,
+                entry_id=entry.id,
+                symptoms_hash=symptoms_hash,
+            )
+
+            await self._push_anti_pattern_notification(incident, title)
+
+        except Exception as e:
+            logger.error(
+                "anti_pattern_generation_failed",
+                incident_id=incident.incident_id,
+                error=str(e),
+            )
+
+    # =========================================================================
+    # Private
+    # =========================================================================
+
+    async def _ask_nemotron(
+        self,
+        system_prompt: str,
+        prompt: str,
+        ok_event: str,
+        failed_event: str,
+    ) -> str | None:
+        """Send one chat request to Nemotron, bounded by the configured timeout.
+
+        Returns the response text on success and ``None`` on any failure
+        (error, timeout, unsuccessful or empty response); failures are logged
+        under ``failed_event`` and never raised.
+        """
+        from src.core.config import get_settings
+        from src.services.nvidia_provider import get_nvidia_provider
+
+        settings = get_settings()
+
+        try:
+            nvidia = get_nvidia_provider()
+            started = time.monotonic()
+            # chat() returns (response_text, success, total_tokens, cost_usd)
+            response_text, success, _tokens, _cost = await asyncio.wait_for(
+                nvidia.chat(prompt=f"[SYSTEM]{system_prompt}\n\n{prompt}"),
+                timeout=settings.NEMOTRON_TIMEOUT_SECONDS,
+            )
+            latency_ms = (time.monotonic() - started) * 1000
+            if success and response_text:
+                logger.info(ok_event, latency_ms=round(latency_ms, 1))
+                return response_text
+            # The call returned, but without usable content: not a success.
+            logger.warning(failed_event, error="unsuccessful or empty response")
+        except Exception as e:
+            # str() of a timeout error is empty; fall back to the type name.
+            logger.warning(failed_event, error=str(e) or type(e).__name__)
+
+        return None
+
+    async def _call_nemotron_for_runbook(
+        self,
+        incident: "Incident",
+        playbook: "Playbook",
+        result: "AutoRepairResult",
+    ) -> str:
+        """Generate the nine-section runbook as Markdown.
+
+        Falls back to :meth:`_build_minimal_runbook` when Nemotron yields no
+        usable text.
+        """
+        steps = list(result.executed_steps or [])
+        prompt = (
+            f"## Incident 資訊\n"
+            f"- ID: {incident.incident_id}\n"
+            f"- 受影響服務: {', '.join(incident.affected_services or [])}\n"
+            f"- 嚴重度: {incident.severity.value if incident.severity else 'unknown'}\n\n"
+            f"## 執行的 Playbook\n"
+            f"- 名稱: {playbook.name}\n"
+            f"- 執行步驟:\n"
+            + "\n".join(f" {s}" for s in steps[: self._MAX_PROMPT_STEPS])
+            + f"\n\n## 執行結果\n- 狀態: 成功,耗時 {result.execution_time_ms}ms\n\n"
+            "請生成包含以下 9 段的 Runbook(Markdown 格式):\n"
+            "1. ## 症狀描述\n2. ## 根因分析\n3. ## 執行步驟\n"
+            "4. ## 驗證步驟\n5. ## 注意事項\n6. ## 影響範圍\n"
+            "7. ## 相關 Incident\n8. ## 下次預防建議\n9. ## 適用條件"
+        )
+
+        content = await self._ask_nemotron(
+            self._RUNBOOK_SYSTEM,
+            prompt,
+            ok_event="runbook_nemotron_call_ok",
+            failed_event="runbook_nemotron_call_failed",
+        )
+        return content or self._build_minimal_runbook(incident, playbook, result)
+
+    async def _call_nemotron_for_anti_pattern(
+        self,
+        incident: "Incident",
+        playbook: "Playbook",
+        result: "AutoRepairResult",
+    ) -> str:
+        """Generate the failure-case record as Markdown.
+
+        Falls back to :meth:`_build_minimal_anti_pattern` when Nemotron yields
+        no usable text.
+        """
+        prompt = (
+            f"## Incident 資訊\n"
+            f"- ID: {incident.incident_id}\n"
+            f"- 受影響服務: {', '.join(incident.affected_services or [])}\n\n"
+            f"## 嘗試的 Playbook\n- 名稱: {playbook.name}\n\n"
+            f"## 失敗原因\n{result.error or '執行中發生未知異常'}\n\n"
+            "請生成失敗案例文件(Markdown 格式),包含:\n"
+            "## 症狀描述\n## 嘗試的修復方案\n## 失敗原因分析\n"
+            "## 已知不適用條件\n## 替代方案建議"
+        )
+
+        content = await self._ask_nemotron(
+            self._ANTI_PATTERN_SYSTEM,
+            prompt,
+            ok_event="anti_pattern_nemotron_call_ok",
+            failed_event="anti_pattern_nemotron_call_failed",
+        )
+        return content or self._build_minimal_anti_pattern(incident, playbook, result)
+
+    def _build_minimal_runbook(
+        self,
+        incident: "Incident",
+        playbook: "Playbook",
+        result: "AutoRepairResult",
+    ) -> str:
+        """Assemble a basic runbook without the model (Nemotron fallback)."""
+        # ``executed_steps`` may be missing on the result; treat it as empty.
+        steps = "\n".join(f"- {s}" for s in (result.executed_steps or []))
+        return (
+            f"## 症狀描述\nIncident {incident.incident_id},"
+            f"受影響服務:{', '.join(incident.affected_services or [])}\n\n"
+            f"## 執行步驟\n{steps}\n\n"
+            f"## 執行結果\n成功,耗時 {result.execution_time_ms}ms\n\n"
+            "*本文件由系統自動生成(Nemotron fallback),建議人工補充完善。*"
+        )
+
+    def _build_minimal_anti_pattern(
+        self,
+        incident: "Incident",
+        playbook: "Playbook",
+        result: "AutoRepairResult",
+    ) -> str:
+        """Assemble a basic failure record without the model (Nemotron fallback)."""
+        return (
+            f"## 症狀描述\nIncident {incident.incident_id},"
+            f"受影響服務:{', '.join(incident.affected_services or [])}\n\n"
+            f"## 失敗原因\n{result.error or '執行中發生異常'}\n\n"
+            f"## 已知不適用條件\nPlaybook '{playbook.name}' 在此症狀下失敗,請勿自動重試。\n\n"
+            "*本文件由系統自動生成(Nemotron fallback)。*"
+        )
+
+    async def _push_runbook_review_card(
+        self,
+        incident: "Incident",
+        entry_id: str,
+        content_preview: str,
+    ) -> None:
+        """Send the runbook review request to Telegram; failures are only logged."""
+        try:
+            from src.services.telegram_gateway import get_telegram_gateway
+            tg = get_telegram_gateway()
+            await tg.send_text(
+                f"📄 Auto Runbook 待審核\n"
+                f"Incident: {incident.incident_id}\n"
+                f"Entry ID: {entry_id}\n\n"
+                f"{content_preview}...\n\n"
+                f"請至知識庫審核並發布。"
+            )
+        except Exception as e:
+            logger.warning("runbook_review_card_failed", error=str(e))
+
+    async def _push_anti_pattern_notification(
+        self,
+        incident: "Incident",
+        title: str,
+    ) -> None:
+        """Tell Telegram that a failure case was recorded; failures are only logged."""
+        try:
+            from src.services.telegram_gateway import get_telegram_gateway
+            tg = get_telegram_gateway()
+            await tg.send_text(
+                f"⚠️ 已記錄失敗案例\n"
+                f"Incident: {incident.incident_id}\n"
+                f"標題: {title}\n\n"
+                f"相同症狀的後續告警將阻斷自動修復,要求人工介入。"
+            )
+        except Exception as e:
+            logger.warning("anti_pattern_notification_failed", error=str(e))
+
+
+# =============================================================================
+# 單例管理
+# =============================================================================
+
+# Module-level cache holding the lazily created NemotronRunbookGenerator.
+_generator: NemotronRunbookGenerator | None = None
+
+
+def get_runbook_generator() -> NemotronRunbookGenerator:
+    """Return the shared NemotronRunbookGenerator, building it on first use."""
+    global _generator
+    if _generator is not None:
+        return _generator
+    _generator = NemotronRunbookGenerator()
+    return _generator
diff --git a/k8s/drift-cronjob.yaml b/k8s/drift-cronjob.yaml
new file mode 100644
index 00000000..23877bd0
--- /dev/null
+++ b/k8s/drift-cronjob.yaml
@@ -0,0 +1,71 @@
+# Config Drift Detection CronJob - Phase 25 P2
+# Scans the awoooi-prod namespace for configuration drift once per hour.
+#
+# Created: 2026-04-04 (Taipei time)
+# Author: Claude Code (Phase 25 P2)
+# Related design: docs/superpowers/specs/2026-04-04-nemotron-active-defense-design.md, direction 3
+# Related ADR: ADR-057 (to be drafted)
+# Deploy: kubectl apply -f k8s/drift-cronjob.yaml -n awoooi-prod
+# Manual trigger: kubectl create job --from=cronjob/drift-scanner drift-scan-manual -n awoooi-prod
+# View logs (select by pod label; job-name holds the generated Job name): kubectl logs -l component=drift-scanner -n awoooi-prod
+
+apiVersion: batch/v1
+kind: CronJob
+metadata:
+  name: drift-scanner
+  namespace: awoooi-prod
+  labels:
+    app: awoooi
+    component: drift-scanner
+    phase: "25"
+  annotations:
+    # 2026-04-04 ogt: Phase 25 P2 — Config Drift Detection
+    description: "每小時掃描 K8s 配置漂移,由 Nemotron 做意圖分析"
+spec:
+  # Run at the top of every hour (Taipei time is UTC+8; the schedule is written in UTC)
+  schedule: "0 * * * *"
+  concurrencyPolicy: Forbid  # no concurrent runs: skip this run if the previous one has not finished
+  successfulJobsHistoryLimit: 3
+  failedJobsHistoryLimit: 5
+  startingDeadlineSeconds: 60  # skip the run if it misses its scheduled time by more than 60s
+  jobTemplate:
+    spec:
+      backoffLimit: 0  # no retries on failure (the scan is idempotent; the next cron run rescans)
+      activeDeadlineSeconds: 300  # hard cap of 5 minutes; the HTTP client timeout below (240s) stays under it
+      template:
+        metadata:
+          labels:
+            app: awoooi
+            component: drift-scanner
+        spec:
+          restartPolicy: Never
+          serviceAccountName: awoooi-api  # reuse the API's ServiceAccount (which has kubectl permissions)
+          containers:
+            - name: drift-scanner
+              # Use the awoooi-api image (ships kubectl + a Python environment)
+              image: harbor.wooo.work/awoooi/api:latest
+              imagePullPolicy: Always
+              command:
+                - python
+                - -c
+                - |
+                  import asyncio, httpx, os
+                  API_URL = os.environ.get("INTERNAL_API_URL", "http://awoooi-api:8000")
+                  async def run():
+                      async with httpx.AsyncClient(timeout=240) as c:
+                          r = await c.post(f"{API_URL}/api/v1/drift/internal/scan")
+                          print(f"status={r.status_code} body={r.text[:200]}")
+                          r.raise_for_status()  # error status -> uncaught exception -> non-zero exit -> Job marked Failed
+                  asyncio.run(run())
+              env:
+                - name: INTERNAL_API_URL
+                  value: "http://awoooi-api.awoooi-prod.svc.cluster.local:8000"
+                - name: DRIFT_SCAN_NAMESPACES  # NOTE(review): not read by the inline script above; confirm a consumer exists
+                  value: "awoooi-prod"
+              resources:
+                requests:
+                  cpu: "50m"
+                  memory: "64Mi"
+                limits:
+                  cpu: "200m"
+                  memory: "256Mi"