feat(phase25): Nemotron 主動防禦三方向 P0+P1+P2 完整實作
P0 - DIAGNOSE Privacy-First Routing: - ai_router.py: _local_fallback_chain [NEMOTRON→OLLAMA→REJECT] - DIAGNOSE 意圖 override 改為 NEMOTRON (原 OLLAMA) - DIAGNOSE fallback 使用 local-only 鏈,不觸碰雲端 - 全部失敗時 REJECT + Telegram 通知 - config.py: NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS=30, OLLAMA_DIAGNOSE_TIMEOUT_SECONDS=60 - nemotron.py: 根據 context[task_type] 選擇 timeout P1 - Knowledge Auto-Harvesting: - models/knowledge.py: EntryType.AUTO_RUNBOOK + ANTI_PATTERN + symptoms_hash - EntryStatus.PUBLISHED (ANTI_PATTERN 直接發布,無需審核) - models/playbook.py: SymptomPattern.compute_hash() (16字元確定性 hash) - services/runbook_generator.py: NemotronRunbookGenerator (v1.1) - generate_runbook() → AUTO_RUNBOOK (DRAFT) + Telegram 審核 card - generate_anti_pattern() → ANTI_PATTERN (PUBLISHED) + Telegram 通知 - 使用 nvidia.chat() (正確介面),Nemotron 超時時 Minimal fallback - knowledge_service.py: check_anti_pattern(symptoms_hash, days=7) - db/models.py: symptoms_hash VARCHAR(16) + ix_knowledge_symptoms_hash - repositories/knowledge_repository.py: create() 支援 symptoms_hash + status - auto_repair_service.py: anti_pattern_gate 在 decide() + runbook hook 在 execute() - migrations/phase8_symptoms_hash.sql: ALTER TABLE + partial index + PUBLISHED constraint P2 - Config Drift Detection: - models/drift.py: DriftItem/DriftReport/DriftLevel/DriftIntent/DriftStatus - services/drift_detector.py: GitStateReader + K8sStateReader + DriftDetector - services/drift_analyzer.py: 白名單過濾 + DriftLevel 分級 - services/drift_interpreter.py: NemotronDriftInterpreter(意圖分析,不生成修復指令) - services/drift_remediator.py: rollback(kubectl apply) + adopt(git push gitea) - api/v1/drift.py: POST /scan, GET /reports, POST /rollback, POST /adopt - migrations/phase9_drift_reports.sql: drift_reports 表 - k8s/drift-cronjob.yaml: 每小時自動掃描 CronJob Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
48
apps/api/migrations/phase8_symptoms_hash.sql
Normal file
48
apps/api/migrations/phase8_symptoms_hash.sql
Normal file
@@ -0,0 +1,48 @@
|
||||
-- Phase 25 P1: Knowledge Auto-Harvesting — symptoms_hash column
-- Deterministic symptom hash used by the Anti-Pattern closed-loop gate.
-- Created: 2026-04-04 (Asia/Taipei)
-- Author: Claude Code (Phase 25 P1)
--
-- Usage: psql -h 192.168.0.188 -U awoooi -d awoooi -f phase8_symptoms_hash.sql
-- Every statement is guarded (IF NOT EXISTS / catalog lookup), so the script
-- can be re-run safely.

-- 1. Add the symptoms_hash column to knowledge_entries.
ALTER TABLE knowledge_entries
    ADD COLUMN IF NOT EXISTS symptoms_hash VARCHAR(16);

-- 2. Indexes for the Anti-Pattern gate lookup.
--    Query shape: entry_type='anti_pattern' AND symptoms_hash=:hash AND created_at>=:cutoff
--    NOTE(review): assumes entry_type is stored as the lowercase enum value
--    ('anti_pattern'); confirm against how the ORM persists EntryType.
CREATE INDEX IF NOT EXISTS idx_knowledge_anti_pattern_hash
    ON knowledge_entries (entry_type, symptoms_hash, created_at)
    WHERE entry_type = 'anti_pattern' AND symptoms_hash IS NOT NULL;

--    The ORM model also declares a plain index named ix_knowledge_symptoms_hash
--    on symptoms_hash; create it here so migrated databases match the model.
CREATE INDEX IF NOT EXISTS ix_knowledge_symptoms_hash
    ON knowledge_entries (symptoms_hash);

-- 3. EntryStatus gains PUBLISHED (ANTI_PATTERN entries are published directly).
--    If the status column has no CHECK constraint, a VARCHAR column accepts any
--    value and nothing needs to change.
--
--    To inspect existing CHECK constraints manually (pg_constraint.consrc was
--    removed in PostgreSQL 12, so use pg_get_constraintdef):
--      SELECT conname, pg_get_constraintdef(oid) FROM pg_constraint
--      WHERE conrelid = 'knowledge_entries'::regclass AND contype = 'c';
--
--    Manual equivalent of the block below (confirm the constraint name first):
--      ALTER TABLE knowledge_entries DROP CONSTRAINT IF EXISTS knowledge_entries_status_check;
--      ALTER TABLE knowledge_entries ADD CONSTRAINT knowledge_entries_status_check
--          CHECK (status IN ('draft', 'review', 'approved', 'archived', 'published'));

-- Safe version: find every CHECK constraint whose only constrained column is
-- "status" (matched through conkey rather than by constraint name, so a
-- constraint on another column whose name merely contains "status" is left
-- alone), drop them, and recreate a single constraint that allows 'published'.
DO $$
DECLARE
    v_con   record;
    v_found boolean := false;
BEGIN
    FOR v_con IN
        SELECT c.conname
        FROM pg_constraint AS c
        INNER JOIN pg_attribute AS a
            ON a.attrelid = c.conrelid
           AND a.attname = 'status'
        WHERE c.conrelid = 'knowledge_entries'::regclass
          AND c.contype = 'c'
          AND c.conkey = ARRAY[a.attnum]
    LOOP
        EXECUTE format('ALTER TABLE knowledge_entries DROP CONSTRAINT %I', v_con.conname);
        RAISE NOTICE 'Dropped status CHECK constraint: %', v_con.conname;
        v_found := true;
    END LOOP;

    IF v_found THEN
        ALTER TABLE knowledge_entries ADD CONSTRAINT knowledge_entries_status_check
            CHECK (status IN ('draft', 'review', 'approved', 'archived', 'published'));
        RAISE NOTICE 'Updated status CHECK constraint: knowledge_entries_status_check → added published';
    ELSE
        RAISE NOTICE 'No status CHECK constraint found, skipping';
    END IF;
END $$;
|
||||
54
apps/api/migrations/phase9_drift_reports.sql
Normal file
54
apps/api/migrations/phase9_drift_reports.sql
Normal file
@@ -0,0 +1,54 @@
|
||||
-- Phase 25 P2: Config Drift Detection — drift_reports table
-- Created: 2026-04-04 (Asia/Taipei)
-- Author: Claude Code (Phase 25 P2)
-- Model: apps/api/src/models/drift.py
-- Design: docs/superpowers/specs/2026-04-04-nemotron-active-defense-design.md (direction 3)
--
-- Usage: psql -h 192.168.0.188 -U awoooi -d awoooi -f phase9_drift_reports.sql

CREATE TABLE IF NOT EXISTS drift_reports (
    -- Identity
    report_id       VARCHAR(32)     PRIMARY KEY,

    -- Scan information
    namespace       VARCHAR(128)    NOT NULL,
    triggered_by    VARCHAR(64)     NOT NULL DEFAULT 'cron',    -- cron / webhook / api
    scanned_at      TIMESTAMPTZ     NOT NULL DEFAULT CURRENT_TIMESTAMP,

    -- Counts (denormalized so reads do not need a JOIN)
    high_count      INTEGER         NOT NULL DEFAULT 0,
    medium_count    INTEGER         NOT NULL DEFAULT 0,
    info_count      INTEGER         NOT NULL DEFAULT 0,

    -- Drift items (JSONB list)
    items           JSONB           NOT NULL DEFAULT '[]'::jsonb,

    -- Nemotron intent analysis: a DriftInterpretation, NULL until analyzed
    interpretation  JSONB,

    -- Handling status: pending / acknowledged / rolled_back / adopted / ignored
    status          VARCHAR(32)     NOT NULL DEFAULT 'pending',

    -- Timeline
    created_at      TIMESTAMPTZ     NOT NULL DEFAULT CURRENT_TIMESTAMP,
    resolved_at     TIMESTAMPTZ
);

-- Indexes
CREATE INDEX IF NOT EXISTS idx_drift_reports_namespace  ON drift_reports (namespace);
CREATE INDEX IF NOT EXISTS idx_drift_reports_status     ON drift_reports (status);
CREATE INDEX IF NOT EXISTS idx_drift_reports_created_at ON drift_reports (created_at DESC);

-- Partial index: only reports that contain at least one HIGH drift item.
CREATE INDEX IF NOT EXISTS idx_drift_reports_high_count
    ON drift_reports (high_count)
    WHERE high_count > 0;

-- Notes:
--   The API currently keeps reports in an in-memory dict; this table exists for
--   future persistence. Once persistence is enabled, the _recent_reports
--   operations in drift.py must be switched to database writes.
|
||||
215
apps/api/src/api/v1/drift.py
Normal file
215
apps/api/src/api/v1/drift.py
Normal file
@@ -0,0 +1,215 @@
|
||||
"""
|
||||
Config Drift Detection API Router - Phase 25 P2
|
||||
================================================
|
||||
GitOps 守門員 HTTP 端點
|
||||
|
||||
leWOOOgo 積木化原則:
|
||||
- Router 層只做 HTTP 轉發
|
||||
- 不直接存取 Redis/DB
|
||||
- 業務邏輯委託給 Service 層
|
||||
|
||||
版本: v1.0
|
||||
建立: 2026-04-04 (台北時區)
|
||||
建立者: Claude Code (Phase 25 P2)
|
||||
"""
|
||||
|
||||
from fastapi import APIRouter, BackgroundTasks, HTTPException
|
||||
|
||||
from src.models.drift import (
|
||||
DriftListResponse,
|
||||
DriftReport,
|
||||
DriftScanRequest,
|
||||
DriftScanResponse,
|
||||
)
|
||||
from src.services.drift_analyzer import get_drift_analyzer
|
||||
from src.services.drift_detector import get_drift_detector
|
||||
from src.services.drift_interpreter import get_drift_interpreter
|
||||
from src.services.drift_remediator import get_drift_remediator
|
||||
|
||||
router = APIRouter(prefix="/drift", tags=["drift"])

# In-memory holding area for drift reports produced by this process.
# Production is expected to persist these to the database instead.
_recent_reports: dict[str, DriftReport] = {}
|
||||
|
||||
|
||||
@router.post("/scan", response_model=DriftScanResponse, summary="觸發漂移掃描")
async def trigger_drift_scan(
    request: DriftScanRequest,
    background_tasks: BackgroundTasks,
) -> DriftScanResponse:
    """
    Trigger a config drift scan.

    For every namespace in ``request.namespaces`` the detector output is
    classified by the analyzer and cached in ``_recent_reports`` (capped at 50
    entries, oldest inserted first out). Reports for which
    ``analyzer.needs_alert`` is true get a background ``_analyze_and_notify``
    task so the request is not blocked by intent analysis.

    Returns:
        ``report_id`` is the most recent alert-worthy report; the counts and
        ``has_critical_drift`` are summed over all scanned namespaces, so a
        HIGH drift in an earlier namespace is not hidden by a later one.
        When no report needs an alert, the fixed ``"no-drift"`` response is
        returned.

    Intended to be called by the Gitea CD webhook or manually.
    """
    detector = get_drift_detector()
    analyzer = get_drift_analyzer()

    high_total = 0
    medium_total = 0
    info_total = 0
    last_alerting: DriftReport | None = None

    for namespace in request.namespaces:
        raw_report = await detector.scan(namespace, triggered_by=request.triggered_by)
        classified_report = analyzer.classify(raw_report)

        high_total += classified_report.high_count
        medium_total += classified_report.medium_count
        info_total += classified_report.info_count

        if analyzer.needs_alert(classified_report):
            # Intent analysis runs in the background to avoid blocking the response.
            background_tasks.add_task(_analyze_and_notify, classified_report)
            last_alerting = classified_report

        # Cache the report, keeping at most 50 entries (dicts preserve insertion
        # order, so the first key is the oldest one).
        _recent_reports[classified_report.report_id] = classified_report
        while len(_recent_reports) > 50:
            del _recent_reports[next(iter(_recent_reports))]

    if last_alerting is None:
        return DriftScanResponse(
            report_id="no-drift",
            summary="無漂移",
            high_count=0,
            medium_count=0,
            info_count=0,
            has_critical_drift=False,
        )

    # Same one-line format as DriftReport.summary, built from the aggregated counts.
    labelled_counts = (("HIGH", high_total), ("MEDIUM", medium_total), ("INFO", info_total))
    parts = [f"{label}×{count}" for label, count in labelled_counts if count]

    return DriftScanResponse(
        report_id=last_alerting.report_id,
        summary=", ".join(parts) if parts else "無漂移",
        high_count=high_total,
        medium_count=medium_total,
        info_count=info_total,
        has_critical_drift=high_total > 0,
    )
|
||||
|
||||
|
||||
@router.get("/reports", response_model=DriftListResponse, summary="列出最近漂移報告")
async def list_drift_reports() -> DriftListResponse:
    """Return the cached drift reports, most recently inserted first."""
    newest_first = list(_recent_reports.values())[::-1]
    return DriftListResponse(items=newest_first, total=len(newest_first))
|
||||
|
||||
|
||||
@router.post("/reports/{report_id}/rollback", summary="覆蓋回 Git 狀態")
async def rollback_drift(report_id: str) -> dict:
    """
    Overwrite the K8s state with the Git YAML for a cached report.

    Meant to run only after a human has confirmed; the actual work is
    delegated to the drift remediator's ``rollback``.

    Raises:
        HTTPException: 404 when ``report_id`` is not in the in-memory cache.
    """
    cached = _recent_reports.get(report_id)
    if not cached:
        raise HTTPException(status_code=404, detail=f"Report {report_id} not found")

    return await get_drift_remediator().rollback(cached)
|
||||
|
||||
|
||||
@router.post("/reports/{report_id}/adopt", summary="承認變更並更新 Git")
async def adopt_drift(report_id: str) -> dict:
    """
    Accept the K8s drift of a cached report and update Git to match it.

    Meant to run only after a human has confirmed; the actual work is
    delegated to the drift remediator's ``adopt``.

    Raises:
        HTTPException: 404 when ``report_id`` is not in the in-memory cache.
    """
    cached = _recent_reports.get(report_id)
    if not cached:
        raise HTTPException(status_code=404, detail=f"Report {report_id} not found")

    return await get_drift_remediator().adopt(cached)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Internal endpoint(供 K8s CronJob 呼叫)
|
||||
# =============================================================================
|
||||
|
||||
@router.post("/internal/scan", include_in_schema=False, summary="CronJob 觸發掃描")
async def internal_scan(background_tasks: BackgroundTasks) -> dict:
    """
    Internal endpoint for the K8s CronJob: schedule a full drift scan.

    Namespaces come from the comma-separated ``DRIFT_SCAN_NAMESPACES`` setting
    (default ``"awoooi-prod"``). Entries are stripped and empty ones dropped,
    and the response reports exactly the list that is handed to the scan.
    """
    from src.core.config import get_settings

    configured = getattr(get_settings(), "DRIFT_SCAN_NAMESPACES", "awoooi-prod") or "awoooi-prod"
    # Normalize once so the scheduled scan and the response agree, and so a
    # trailing comma does not schedule a scan of the empty namespace "".
    namespaces = [ns.strip() for ns in configured.split(",") if ns.strip()]

    background_tasks.add_task(_run_full_scan, namespaces)
    return {"status": "scan_triggered", "namespaces": namespaces}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Background helpers
|
||||
# =============================================================================
|
||||
|
||||
async def _analyze_and_notify(report: DriftReport) -> None:
    """
    Background task: run intent analysis on a report and push it to Telegram.

    The interpretation is attached to a copy of the report, which replaces the
    cached entry in ``_recent_reports``. Failures are logged and never raised:
    a Telegram failure is a warning, anything else an error.

    The message uses HTML tags (``<b>``, ``<code>``), so the free-text values
    interpolated into it are HTML-escaped; an unescaped ``<`` or ``&`` in the
    model's explanation would otherwise produce invalid markup.
    NOTE(review): assumes ``send_text`` uses Telegram's HTML parse mode — confirm.
    """
    import html

    import structlog

    log = structlog.get_logger(__name__)

    try:
        interpreter = get_drift_interpreter()
        analyzer = get_drift_analyzer()

        interpretation = await interpreter.analyze(report)
        updated = report.model_copy(update={"interpretation": interpretation})
        _recent_reports[report.report_id] = updated

        # NOTE(review): diff_summary is sent as-is because format_diff_summary
        # may already emit markup; verify whether it escapes drifted values.
        diff_summary = analyzer.format_diff_summary(report)
        intent_label = {
            "emergency_hotfix": "🚨 緊急 Hotfix",
            "human_error": "⚠️ 人為誤操作",
            "automated_change": "🤖 系統自動變更",
            "unknown": "❓ 意圖不明",
        }.get(interpretation.intent.value, "❓ 意圖不明")

        # Only <, > and & need escaping in Telegram HTML text.
        namespace = html.escape(str(report.namespace), quote=False)
        explanation = html.escape(str(interpretation.explanation), quote=False)
        report_id = html.escape(str(report.report_id), quote=False)

        try:
            from src.services.telegram_gateway import get_telegram_gateway

            tg = get_telegram_gateway()
            await tg.send_text(
                f"🔍 <b>Config Drift 偵測</b>\n"
                f"Namespace: {namespace}\n"
                f"嚴重度: HIGH×{report.high_count} MEDIUM×{report.medium_count}\n\n"
                f"<b>意圖分析</b>: {intent_label}\n"
                f"{explanation}\n"
                f"信心: {interpretation.confidence:.0%}\n\n"
                f"<b>漂移詳情</b>:\n{diff_summary}\n\n"
                f"Report ID: <code>{report_id}</code>\n"
                f"POST /api/v1/drift/reports/{report_id}/rollback — 覆蓋回 Git\n"
                f"POST /api/v1/drift/reports/{report_id}/adopt — 承認變更"
            )
        except Exception as e:
            log.warning("drift_telegram_failed", error=str(e))

    except Exception as e:
        log.error("drift_analyze_notify_failed", error=str(e))
|
||||
|
||||
|
||||
async def _run_full_scan(namespaces: list[str]) -> None:
    """
    Background task: scan every namespace, cache the report and alert if needed.

    Each namespace is handled independently; a failure is logged and the loop
    continues with the next one. The cache is kept at no more than 50 entries,
    the same cap the ``/scan`` endpoint applies, so repeated cron runs cannot
    grow ``_recent_reports`` without bound.
    """
    detector = get_drift_detector()
    analyzer = get_drift_analyzer()

    for namespace in namespaces:
        try:
            raw = await detector.scan(namespace, triggered_by="cron")
            classified = analyzer.classify(raw)

            _recent_reports[classified.report_id] = classified
            # Dicts preserve insertion order, so the first key is the oldest.
            while len(_recent_reports) > 50:
                del _recent_reports[next(iter(_recent_reports))]

            if analyzer.needs_alert(classified):
                await _analyze_and_notify(classified)
        except Exception as e:
            import structlog

            structlog.get_logger(__name__).error(
                "full_scan_namespace_failed", namespace=namespace, error=str(e)
            )
|
||||
@@ -84,6 +84,15 @@ class Settings(BaseSettings):
|
||||
default=True,
|
||||
description="Phase 22: True=異步更新 (先推 OpenClaw), False=同步等待",
|
||||
)
|
||||
# 2026-04-04 ogt: Phase 25 P0 — DIAGNOSE Privacy-First 專用 timeout
|
||||
NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS: int = Field(
|
||||
default=30,
|
||||
description="Phase 25 P0: DIAGNOSE 任務 Nemotron timeout (秒),比 Tool Calling 短",
|
||||
)
|
||||
OLLAMA_DIAGNOSE_TIMEOUT_SECONDS: int = Field(
|
||||
default=60,
|
||||
description="Phase 25 P0: DIAGNOSE 任務 Ollama backup timeout (秒),Ollama 較慢",
|
||||
)
|
||||
|
||||
# ==========================================================================
|
||||
# CORS - 嚴格白名單 (無 UAT, 無 wildcard)
|
||||
|
||||
@@ -530,6 +530,12 @@ class KnowledgeEntryRecord(Base):
|
||||
nullable=True,
|
||||
comment="關聯 Playbook Redis Key",
|
||||
)
|
||||
# 2026-04-04 ogt: Phase 25 P1 — Anti-Pattern 閉環攔截用症狀 hash (SymptomPattern.compute_hash())
|
||||
symptoms_hash: Mapped[str | None] = mapped_column(
|
||||
String(16),
|
||||
nullable=True,
|
||||
comment="症狀模式 hash (16字元 SHA256 前綴),Anti-Pattern 閉環攔截使用",
|
||||
)
|
||||
|
||||
# Metrics
|
||||
view_count: Mapped[int] = mapped_column(
|
||||
@@ -556,4 +562,6 @@ class KnowledgeEntryRecord(Base):
|
||||
Index("ix_knowledge_category", "category"),
|
||||
Index("ix_knowledge_status", "status"),
|
||||
Index("ix_knowledge_created_at", "created_at"),
|
||||
# 2026-04-04 ogt: Phase 25 P1 — Anti-Pattern 快速查詢
|
||||
Index("ix_knowledge_symptoms_hash", "symptoms_hash"),
|
||||
)
|
||||
|
||||
@@ -57,6 +57,7 @@ from src.api.v1 import (
|
||||
from src.api.v1 import (
|
||||
signoz_webhook as signoz_webhook_v1, # Phase 21: SignOz → Telegram (ADR-037)
|
||||
)
|
||||
from src.api.v1 import drift as drift_v1 # Phase 25 P2: Config Drift Detection
|
||||
from src.api.v1 import monitoring as monitoring_v1 # 2026-04-03: 監控工具狀態
|
||||
from src.api.v1 import stats as stats_v1 # Phase 6.5: Statistics Analytics
|
||||
from src.api.v1 import telegram as telegram_v1 # Phase 5.4: Telegram Gateway
|
||||
@@ -422,6 +423,9 @@ app.include_router(
|
||||
app.include_router(
|
||||
auto_repair_v1.router, prefix="/api/v1", tags=["Auto Repair"]
|
||||
) # #8: 自動升級決策
|
||||
app.include_router(
|
||||
drift_v1.router, prefix="/api/v1", tags=["Drift Detection"]
|
||||
) # Phase 25 P2: Config Drift Detection
|
||||
app.include_router(
|
||||
errors_v1.router, prefix="/api/v1", tags=["Errors"]
|
||||
) # #40: Sentry 錯誤 BFF API
|
||||
|
||||
155
apps/api/src/models/drift.py
Normal file
155
apps/api/src/models/drift.py
Normal file
@@ -0,0 +1,155 @@
|
||||
"""
|
||||
Config Drift Detection Models - Phase 25 P2
|
||||
============================================
|
||||
GitOps 守門員:偵測 K8s 實際狀態 vs Git YAML 的漂移
|
||||
|
||||
設計原則:
|
||||
- DriftDetector: 只比對,輸出結構化 Diff,不判斷嚴重性
|
||||
- DriftAnalyzer: 白名單過濾、DriftLevel 分級,不解釋意圖
|
||||
- NemotronDriftInterpreter: 意圖分析(不生成修復指令)
|
||||
- DriftRemediator: 確定性修復(kubectl apply / git push),不使用 AI 判斷
|
||||
|
||||
版本: v1.0
|
||||
建立: 2026-04-04 (台北時區)
|
||||
建立者: ogt (首席架構師設計) + Claude Code (實作)
|
||||
關聯設計: docs/superpowers/specs/2026-04-04-nemotron-active-defense-design.md 方向三
|
||||
關聯 ADR: 待起草 ADR-057
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from src.utils.timezone import now_taipei
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Enums
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class DriftLevel(str, Enum):
    """Severity grade assigned to a drift item."""

    # Allowlisted fields (replicas, resources): recorded silently.
    INFO = "info"
    # Non-critical fields: Telegram notification, no urgent handling needed.
    MEDIUM = "medium"
    # Critical fields (image, env, ports): immediate notification, needs confirmation.
    HIGH = "high"
|
||||
|
||||
|
||||
class DriftIntent(str, Enum):
    """Intent category produced by the Nemotron analysis."""

    # Emergency patch that bypassed CI.
    EMERGENCY_HOTFIX = "emergency_hotfix"
    # Operator mistake.
    HUMAN_ERROR = "human_error"
    # Change made automatically by the system (HPA and the like).
    AUTOMATED_CHANGE = "automated_change"
    # Could not be determined.
    UNKNOWN = "unknown"
|
||||
|
||||
|
||||
class DriftStatus(str, Enum):
    """Handling status of a drift report."""

    # Waiting to be handled.
    PENDING = "pending"
    # Acknowledged; no action required.
    ACKNOWLEDGED = "acknowledged"
    # K8s state was overwritten with the Git state.
    ROLLED_BACK = "rolled_back"
    # Drift accepted; Git has been updated.
    ADOPTED = "adopted"
    # Ignored via the allowlist.
    IGNORED = "ignored"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Core Models
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class DriftItem(BaseModel):
    """Drift record for a single field of a single K8s resource."""

    resource_kind: str = Field(..., description="K8s 資源類型(Deployment, Service 等)")
    resource_name: str = Field(..., description="K8s 資源名稱")
    namespace: str = Field(..., description="K8s namespace")
    field_path: str = Field(..., description="欄位路徑(如 spec.template.spec.containers[0].image)")
    git_value: Any = Field(None, description="Git YAML 中的值")
    actual_value: Any = Field(None, description="K8s 中的實際值")
    drift_level: DriftLevel = DriftLevel.MEDIUM
    # True when the field is on the allowlist (recorded silently).
    is_allowlisted: bool = False
|
||||
|
||||
|
||||
class DriftInterpretation(BaseModel):
    """Result of the Nemotron intent analysis for a drift report."""

    intent: DriftIntent = DriftIntent.UNKNOWN
    explanation: str = Field("", description="Nemotron 的意圖說明")
    risk: str = Field("MEDIUM", description="風險等級(HIGH/MEDIUM/LOW)")
    # Confidence is validated to lie within [0.0, 1.0].
    confidence: float = Field(0.0, ge=0.0, le=1.0, description="分析信心分數")
|
||||
|
||||
|
||||
class DriftReport(BaseModel):
    """Complete report of one drift scan over one namespace."""

    report_id: str = Field(..., description="報告 ID")
    scanned_at: datetime = Field(default_factory=now_taipei)
    namespace: str = Field(..., description="掃描的 namespace")

    # Drift items and their per-level counts
    items: list[DriftItem] = Field(default_factory=list)
    high_count: int = 0
    medium_count: int = 0
    info_count: int = 0

    # Nemotron analysis (None until it has been attached)
    interpretation: DriftInterpretation | None = None

    # Handling status
    status: DriftStatus = DriftStatus.PENDING

    # Trigger source
    triggered_by: str = Field("cron", description="觸發來源:cron / webhook / manual")

    # Timeline
    created_at: datetime = Field(default_factory=now_taipei)
    resolved_at: datetime | None = None

    @property
    def has_critical_drift(self) -> bool:
        """True when the report contains at least one HIGH-level drift."""
        return self.high_count > 0

    @property
    def summary(self) -> str:
        """One-line summary listing each non-zero count, or the no-drift text."""
        labelled_counts = (
            ("HIGH", self.high_count),
            ("MEDIUM", self.medium_count),
            ("INFO", self.info_count),
        )
        parts = [f"{label}×{count}" for label, count in labelled_counts if count]
        return ", ".join(parts) if parts else "無漂移"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# API Request / Response
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class DriftScanRequest(BaseModel):
    """Request body for triggering a drift scan."""

    namespaces: list[str] = Field(
        default=["awoooi-prod"],
        description="要掃描的 namespace 列表",
    )
    triggered_by: str = Field(default="api", description="觸發來源")
|
||||
|
||||
|
||||
class DriftScanResponse(BaseModel):
    """Response body describing the outcome of a drift scan."""

    report_id: str
    summary: str
    high_count: int
    medium_count: int
    info_count: int
    has_critical_drift: bool
    # Optional; defaults to None when no interpretation is supplied.
    interpretation: DriftInterpretation | None = None
|
||||
|
||||
|
||||
class DriftListResponse(BaseModel):
    """Response body for listing drift reports."""

    items: list[DriftReport]
    total: int
|
||||
@@ -33,6 +33,9 @@ class EntryType(str, Enum):
|
||||
RUNBOOK = "runbook" # 手動建立的操作手冊
|
||||
BEST_PRACTICE = "best_practice" # 最佳實踐文章
|
||||
POSTMORTEM = "postmortem" # 事後分析報告
|
||||
# 2026-04-04 ogt: Phase 25 P1 — Knowledge Auto-Harvesting 新增類型
|
||||
AUTO_RUNBOOK = "auto_runbook" # Nemotron 自動生成的 Runbook(DRAFT 待人工審核)
|
||||
ANTI_PATTERN = "anti_pattern" # 修復失敗案例(直接 PUBLISHED,阻斷後續重蹈覆轍)
|
||||
|
||||
|
||||
class EntrySource(str, Enum):
|
||||
@@ -47,6 +50,8 @@ class EntryStatus(str, Enum):
|
||||
REVIEW = "review" # 審核中
|
||||
APPROVED = "approved" # 已批准
|
||||
ARCHIVED = "archived" # 已封存
|
||||
# 2026-04-04 Claude Code: Phase 25 P1 — ANTI_PATTERN 直接發布,無需審核
|
||||
PUBLISHED = "published" # 已發布(ANTI_PATTERN 用,無需人工審核)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
@@ -61,8 +66,11 @@ class KnowledgeEntryCreate(BaseModel):
|
||||
category: str = Field(..., min_length=1, max_length=100)
|
||||
tags: list[str] = Field(default_factory=list)
|
||||
source: EntrySource = EntrySource.HUMAN
|
||||
status: EntryStatus = EntryStatus.DRAFT
|
||||
related_incident_id: str | None = None
|
||||
related_playbook_id: str | None = None
|
||||
# 2026-04-04 ogt: Phase 25 P1 — Anti-Pattern 閉環用症狀 hash
|
||||
symptoms_hash: str | None = None
|
||||
created_by: str | None = None
|
||||
|
||||
|
||||
@@ -88,6 +96,8 @@ class KnowledgeEntry(BaseModel):
|
||||
status: EntryStatus = EntryStatus.DRAFT
|
||||
related_incident_id: str | None = None
|
||||
related_playbook_id: str | None = None
|
||||
# 2026-04-04 ogt: Phase 25 P1 — Anti-Pattern 閉環攔截用的症狀 hash(SymptomPattern.compute_hash())
|
||||
symptoms_hash: str | None = None
|
||||
view_count: int = 0
|
||||
created_by: str | None = None
|
||||
created_at: datetime = Field(default_factory=now_taipei)
|
||||
|
||||
@@ -97,6 +97,21 @@ class SymptomPattern(BaseModel):
|
||||
|
||||
model_config = ConfigDict(extra="ignore")
|
||||
|
||||
def compute_hash(self) -> str:
    """
    Deterministic 16-hex-character hash of this symptom pattern.

    Added 2026-04-04 (Phase 25 P1) for the Anti-Pattern closed-loop gate, which
    needs an exact-match key rather than fuzzy semantic search.

    The key is built from the sorted alert names, the sorted affected
    services and the key-sorted JSON of the label patterns, so the order of
    the inputs does not affect the result.

    Returns:
        The first 16 characters of the SHA-256 hex digest of the key.
    """
    import json
    from hashlib import sha256

    alert_part = "|".join(sorted(self.alert_names))
    service_part = "|".join(sorted(self.affected_services))
    label_part = json.dumps(self.label_patterns, sort_keys=True)

    digest_input = "||".join((alert_part, service_part, label_part))
    return sha256(digest_input.encode()).hexdigest()[:16]
|
||||
|
||||
|
||||
class RepairStep(BaseModel):
|
||||
"""
|
||||
|
||||
@@ -45,8 +45,12 @@ class KnowledgeDBRepository:
|
||||
category=data.category,
|
||||
tags=data.tags,
|
||||
source=data.source,
|
||||
# 2026-04-04 ogt: Phase 25 P1 — 支援指定 status(ANTI_PATTERN 直接 PUBLISHED)
|
||||
status=data.status,
|
||||
related_incident_id=data.related_incident_id,
|
||||
related_playbook_id=data.related_playbook_id,
|
||||
# 2026-04-04 ogt: Phase 25 P1 — Anti-Pattern 閉環用症狀 hash
|
||||
symptoms_hash=data.symptoms_hash,
|
||||
created_by=data.created_by,
|
||||
)
|
||||
self.db.add(record)
|
||||
@@ -268,6 +272,7 @@ class KnowledgeDBRepository:
|
||||
status=record.status,
|
||||
related_incident_id=record.related_incident_id,
|
||||
related_playbook_id=record.related_playbook_id,
|
||||
symptoms_hash=getattr(record, "symptoms_hash", None),
|
||||
view_count=record.view_count,
|
||||
created_by=record.created_by,
|
||||
created_at=record.created_at,
|
||||
|
||||
@@ -160,7 +160,13 @@ class NemotronProvider:
|
||||
"""
|
||||
|
||||
try:
|
||||
timeout = getattr(settings, "NEMOTRON_TIMEOUT_SECONDS", 30)
|
||||
# 2026-04-04 ogt: Phase 25 P0 — 根據 task_type 選擇 timeout
|
||||
# DIAGNOSE 用較短 timeout(30s),避免拖累整體 AutoRepair 流程
|
||||
task_type = context.get("task_type", "")
|
||||
if task_type == "diagnose":
|
||||
timeout = getattr(settings, "NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS", 30)
|
||||
else:
|
||||
timeout = getattr(settings, "NEMOTRON_TIMEOUT_SECONDS", 45)
|
||||
nvidia = self._get_nvidia()
|
||||
|
||||
result = await asyncio.wait_for(
|
||||
|
||||
@@ -30,6 +30,7 @@ AI Router - Phase 13.3 #87
|
||||
| v2.0 | 2026-03-26 | Claude Code | 支援 IntentResult + 新意圖類型 |
|
||||
| v3.0 | 2026-03-26 | Claude Code | Phase 13.3 #87 完整路由決策矩陣 |
|
||||
| v4.0 | 2026-04-02 | ogt (首席架構師) | Phase 24 AIProvider Registry + Executor; C1 Langfuse Trace; C2 AIRouter.route(); C3 型別 typo; I4 Protocol close |
|
||||
| v4.1 | 2026-04-04 | ogt (首席架構師) | Phase 25 P0: DIAGNOSE Privacy-First — _local_fallback_chain; DIAGNOSE→NEMOTRON; REJECT+Telegram |
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -246,13 +247,22 @@ class AIRouter:
|
||||
(AIProviderEnum.CLAUDE, self._claude_default),
|
||||
]
|
||||
|
||||
# 2026-04-04 ogt: Phase 25 P0 — DIAGNOSE/FORCE_LOCAL 專用鏈
|
||||
# 隱私邊界:絕不包含任何雲端 Provider,到 OLLAMA 為止
|
||||
self._local_fallback_chain: list[tuple[AIProviderEnum, str]] = [
|
||||
(AIProviderEnum.NEMOTRON, self._nemotron_default), # NIM 188,主力(零費用,高能力)
|
||||
(AIProviderEnum.OLLAMA, self._ollama_summary), # Ollama 188,備援(慢但可靠)
|
||||
]
|
||||
|
||||
# 意圖對應 Provider 強制覆寫 (None = 依複雜度決定)
|
||||
self._intent_provider_overrides: dict[IntentType, AIProviderEnum | None] = {
|
||||
# 四大核心意圖
|
||||
IntentType.RESTART: None, # 依複雜度
|
||||
IntentType.SCALE: None, # 依複雜度
|
||||
IntentType.CONFIG: None, # 依複雜度 (但 HIGH 會升級)
|
||||
IntentType.DIAGNOSE: AIProviderEnum.OLLAMA, # 診斷優先本地 (隱私)
|
||||
# 2026-04-04 ogt: Phase 25 P0 — DIAGNOSE 改為 NEMOTRON (NIM 188)
|
||||
# 原因: 零費用本地 NIM + 高能力; 搭配 _local_fallback_chain 保證不觸碰雲端
|
||||
IntentType.DIAGNOSE: AIProviderEnum.NEMOTRON, # 診斷優先 NIM 本地 (隱私)
|
||||
# 輔助意圖
|
||||
IntentType.DELETE: AIProviderEnum.CLAUDE, # CRITICAL → 強制 Claude
|
||||
IntentType.ROLLBACK: None, # 依複雜度
|
||||
@@ -308,7 +318,11 @@ class AIRouter:
|
||||
)
|
||||
|
||||
# Step 4: 建立 Fallback 鏈
|
||||
fallback_chain = self._build_fallback_chain(provider)
|
||||
# 2026-04-04 ogt: Phase 25 P0 — DIAGNOSE 使用 local-only 鏈(隱私邊界)
|
||||
if intent == IntentType.DIAGNOSE:
|
||||
fallback_chain = [fc for fc in self._local_fallback_chain if fc[0] != provider]
|
||||
else:
|
||||
fallback_chain = self._build_fallback_chain(provider)
|
||||
|
||||
# Step 5: 計算延遲預算
|
||||
latency_budget = PROVIDER_LATENCY_BUDGET.get(provider, 30000)
|
||||
@@ -398,10 +412,11 @@ class AIRouter:
|
||||
provider_override = self._intent_provider_overrides.get(intent)
|
||||
if provider_override is not None:
|
||||
provider = provider_override
|
||||
# 2026-04-03 ogt: DIAGNOSE/ALERT_TRIAGE 用 summary model (llama3.2:3b)
|
||||
# 2026-04-03 ogt: ALERT_TRIAGE/QUERY 用 Ollama summary model (llama3.2:3b)
|
||||
# 避免 qwen2.5:7b-instruct 90秒 timeout 導致全鏈路失敗 (Phase 24 A選項)
|
||||
# 2026-04-04 ogt: DIAGNOSE 已改為 NEMOTRON,不走這條分支
|
||||
if provider == AIProviderEnum.OLLAMA and intent in (
|
||||
IntentType.DIAGNOSE, IntentType.ALERT_TRIAGE, IntentType.QUERY
|
||||
IntentType.ALERT_TRIAGE, IntentType.QUERY
|
||||
):
|
||||
model = self._ollama_summary
|
||||
else:
|
||||
@@ -951,6 +966,29 @@ class AIRouterExecutor:
|
||||
_lf_trace_ctx.__exit__(None, None, None)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 2026-04-04 ogt: Phase 25 P0 — require_local 全部失敗時 Telegram 通知(隱私邊界)
|
||||
if require_local:
|
||||
try:
|
||||
from src.services.telegram_gateway import get_telegram_gateway
|
||||
tg = get_telegram_gateway()
|
||||
import asyncio as _asyncio
|
||||
_asyncio.create_task(
|
||||
tg.send_text(
|
||||
"⚠️ <b>DIAGNOSE 本地 Provider 不可用</b>\n"
|
||||
f"已嘗試: {', '.join(provider_order)}\n"
|
||||
"需要人工介入,雲端 Provider 不會被呼叫(隱私邊界)。"
|
||||
)
|
||||
)
|
||||
except Exception as _tg_e:
|
||||
logger.warning("diagnose_reject_telegram_failed", error=str(_tg_e))
|
||||
return AIResult(
|
||||
raw_response="",
|
||||
success=False,
|
||||
provider="none",
|
||||
error="local_providers_unavailable",
|
||||
)
|
||||
|
||||
return AIResult(
|
||||
raw_response="",
|
||||
success=False,
|
||||
|
||||
@@ -143,6 +143,9 @@ class AutoRepairService:
|
||||
# 2026-04-01 ogt: 注入 cooldown_checker 支援測試隔離 (DI 原則)
|
||||
self._playbook_service = playbook_service or get_playbook_service()
|
||||
self._cooldown_checker = cooldown_checker or check_global_repair_cooldown
|
||||
# 2026-04-04 Claude Code: Phase 25 P1 — 持有 runbook_generator task 引用,防 GC 回收
|
||||
import asyncio
|
||||
self._pending_tasks: set[asyncio.Task] = set()
|
||||
|
||||
async def evaluate_auto_repair(
|
||||
self,
|
||||
@@ -196,6 +199,33 @@ class AutoRepairService:
|
||||
# 2. 提取症狀模式
|
||||
symptoms = self._extract_symptoms(incident)
|
||||
|
||||
# 2.1 2026-04-04 Claude Code: Phase 25 P1 — Anti-Pattern 閘門
|
||||
# 根據確定性 hash 比對近 7 天失敗案例,避免 AI 在同一個坑重複摔倒
|
||||
try:
|
||||
from src.services.knowledge_service import get_knowledge_service
|
||||
symptoms_hash = symptoms.compute_hash()
|
||||
anti_patterns = await get_knowledge_service().check_anti_pattern(
|
||||
symptoms_hash, days=7
|
||||
)
|
||||
if anti_patterns:
|
||||
ap = anti_patterns[0]
|
||||
logger.warning(
|
||||
"auto_repair_blocked_anti_pattern",
|
||||
incident_id=incident.incident_id,
|
||||
symptoms_hash=symptoms_hash,
|
||||
anti_pattern_id=ap.id,
|
||||
anti_pattern_title=ap.title,
|
||||
)
|
||||
return AutoRepairDecision(
|
||||
can_auto_repair=False,
|
||||
reason=f"過去 7 天有失敗案例: {ap.title}",
|
||||
blocked_by="ANTI_PATTERN",
|
||||
)
|
||||
except Exception as _ap_e:
|
||||
# Anti-Pattern 閘門失敗不阻塞主流程(僅記錄)
|
||||
logger.warning("anti_pattern_gate_error", error=str(_ap_e))
|
||||
symptoms_hash = ""
|
||||
|
||||
# 3. 找匹配的 Playbook
|
||||
recommendations = await self._playbook_service.get_recommendations(
|
||||
symptoms=symptoms,
|
||||
@@ -324,7 +354,7 @@ class AutoRepairService:
|
||||
execution_time_ms=execution_time,
|
||||
)
|
||||
|
||||
return AutoRepairResult(
|
||||
repair_result = AutoRepairResult(
|
||||
success=True,
|
||||
playbook_id=playbook.playbook_id,
|
||||
incident_id=incident.incident_id,
|
||||
@@ -332,6 +362,25 @@ class AutoRepairService:
|
||||
execution_time_ms=execution_time,
|
||||
)
|
||||
|
||||
# 2026-04-04 Claude Code: Phase 25 P1 — 成功修復後 fire-and-forget 生成 AUTO_RUNBOOK
|
||||
try:
|
||||
from src.services.runbook_generator import get_runbook_generator
|
||||
symptoms = self._extract_symptoms(incident)
|
||||
symptoms_hash = symptoms.compute_hash()
|
||||
gen = get_runbook_generator()
|
||||
import asyncio as _asyncio
|
||||
task = _asyncio.create_task(
|
||||
gen.generate_runbook(incident, playbook, repair_result, symptoms_hash)
|
||||
)
|
||||
self._pending_tasks.add(task) if hasattr(self, "_pending_tasks") else None
|
||||
task.add_done_callback(
|
||||
lambda t: self._pending_tasks.discard(t) if hasattr(self, "_pending_tasks") else None
|
||||
)
|
||||
except Exception as _rg_e:
|
||||
logger.warning("runbook_generator_task_failed", error=str(_rg_e))
|
||||
|
||||
return repair_result
|
||||
|
||||
except Exception as e:
|
||||
# 更新失敗統計
|
||||
await self._playbook_service.record_execution(
|
||||
@@ -348,7 +397,7 @@ class AutoRepairService:
|
||||
error=str(e),
|
||||
)
|
||||
|
||||
return AutoRepairResult(
|
||||
fail_result = AutoRepairResult(
|
||||
success=False,
|
||||
playbook_id=playbook.playbook_id,
|
||||
incident_id=incident.incident_id,
|
||||
@@ -357,6 +406,21 @@ class AutoRepairService:
|
||||
execution_time_ms=execution_time,
|
||||
)
|
||||
|
||||
# 2026-04-04 Claude Code: Phase 25 P1 — 失敗修復後 fire-and-forget 生成 ANTI_PATTERN
|
||||
try:
|
||||
from src.services.runbook_generator import get_runbook_generator
|
||||
symptoms = self._extract_symptoms(incident)
|
||||
symptoms_hash = symptoms.compute_hash()
|
||||
gen = get_runbook_generator()
|
||||
import asyncio as _asyncio
|
||||
_asyncio.create_task(
|
||||
gen.generate_anti_pattern(incident, playbook, fail_result, symptoms_hash)
|
||||
)
|
||||
except Exception as _ap_e:
|
||||
logger.warning("anti_pattern_task_failed", error=str(_ap_e))
|
||||
|
||||
return fail_result
|
||||
|
||||
# === Private Helpers ===
|
||||
|
||||
def _extract_symptoms(self, incident: Incident) -> SymptomPattern:
|
||||
|
||||
106
apps/api/src/services/drift_analyzer.py
Normal file
106
apps/api/src/services/drift_analyzer.py
Normal file
@@ -0,0 +1,106 @@
|
||||
"""
|
||||
Drift Analyzer - Phase 25 P2 Config Drift Detection
|
||||
=====================================================
|
||||
職責:白名單過濾、DriftLevel 分級
|
||||
不解釋意圖,不生成修復指令
|
||||
|
||||
版本: v1.0
|
||||
建立: 2026-04-04 (台北時區)
|
||||
建立者: ogt (首席架構師設計) + Claude Code (實作)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import structlog
|
||||
|
||||
from src.models.drift import DriftItem, DriftLevel, DriftReport, DriftStatus
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
|
||||
class DriftAnalyzer:
    """
    Grades a DriftReport: decides which drift deserves an alert and which is
    only recorded silently.

    Scope: severity bookkeeping only. It neither interprets intent nor
    produces remediation commands.
    """

    def classify(self, report: DriftReport) -> DriftReport:
        """
        Recount the report's items per DriftLevel and settle the report status.

        - HIGH and MEDIUM items are tallied individually.
        - Every other level is tallied as INFO.
        - When no HIGH and no MEDIUM item exists (an empty report included),
          the status becomes IGNORED; otherwise the incoming status is kept.

        Returns:
            A copy of the report (``model_copy``) carrying the new counts and status.
        """
        tally = {"high": 0, "medium": 0, "info": 0}
        for entry in report.items:
            level = entry.drift_level
            if level == DriftLevel.HIGH:
                bucket = "high"
            elif level == DriftLevel.MEDIUM:
                bucket = "medium"
            else:
                bucket = "info"
            tally[bucket] += 1

        nothing_actionable = tally["high"] == 0 and tally["medium"] == 0
        status = DriftStatus.IGNORED if nothing_actionable else report.status

        # Only log when there really were allowlisted items to silence.
        if nothing_actionable and tally["info"] > 0:
            logger.info(
                "drift_all_allowlisted",
                report_id=report.report_id,
                info_count=tally["info"],
            )

        updates = {
            "high_count": tally["high"],
            "medium_count": tally["medium"],
            "info_count": tally["info"],
            "status": status,
        }
        return report.model_copy(update=updates)

    def needs_alert(self, report: DriftReport) -> bool:
        """Tell whether the report holds at least one HIGH or MEDIUM item (Telegram alert needed)."""
        return any(count > 0 for count in (report.high_count, report.medium_count))

    def format_diff_summary(self, report: DriftReport) -> str:
        """Render a short, Telegram-friendly summary of the non-allowlisted drift items."""
        if not report.items:
            return "無漂移"

        # HIGH items first, then ordered by field path; allowlisted items are never shown.
        shown = sorted(
            (entry for entry in report.items if not entry.is_allowlisted),
            key=lambda entry: (entry.drift_level != DriftLevel.HIGH, entry.field_path),
        )

        lines = []
        for entry in shown:
            marker = "🟡"
            if entry.drift_level == DriftLevel.HIGH:
                marker = "🔴"
            lines.append(
                f"{marker} {entry.resource_kind}/{entry.resource_name}.{entry.field_path}\n"
                f" Git: {str(entry.git_value)[:60]}\n"
                f" K8s: {str(entry.actual_value)[:60]}"
            )
            if len(lines) < 5:
                continue
            # Cap the message at 5 entries and mention how many were left out.
            remaining = report.high_count + report.medium_count - len(lines)
            if remaining > 0:
                lines.append(f"... 另有 {remaining} 項漂移")
            break

        if lines:
            return "\n".join(lines)
        return f"共 {report.info_count} 項白名單漂移(已靜默)"
|
||||
|
||||
|
||||
# Lazily created, module-wide DriftAnalyzer instance.
_analyzer: DriftAnalyzer | None = None


def get_drift_analyzer() -> DriftAnalyzer:
    """Return the shared DriftAnalyzer, building it on the first call."""
    global _analyzer
    if _analyzer is not None:
        return _analyzer
    _analyzer = DriftAnalyzer()
    return _analyzer
|
||||
328
apps/api/src/services/drift_detector.py
Normal file
328
apps/api/src/services/drift_detector.py
Normal file
@@ -0,0 +1,328 @@
|
||||
"""
|
||||
Drift Detector - Phase 25 P2 Config Drift Detection
|
||||
=====================================================
|
||||
職責:比對 Git YAML vs K8s 實際狀態,輸出結構化 DriftItem 列表
|
||||
不判斷嚴重性,不解釋意圖,只做事實比對
|
||||
|
||||
版本: v1.0
|
||||
建立: 2026-04-04 (台北時區)
|
||||
建立者: ogt (首席架構師設計) + Claude Code (實作)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import subprocess
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import structlog
|
||||
import yaml
|
||||
|
||||
from src.models.drift import DriftItem, DriftLevel, DriftReport
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
# 白名單欄位(靜默記錄,不告警)
|
||||
_DEFAULT_ALLOWLIST_FIELDS = frozenset([
|
||||
"spec.replicas",
|
||||
"spec.template.spec.containers[*].resources.requests",
|
||||
"spec.template.spec.containers[*].resources.limits",
|
||||
"metadata.annotations",
|
||||
"metadata.labels.pod-template-hash",
|
||||
"metadata.resourceVersion",
|
||||
"metadata.generation",
|
||||
"metadata.uid",
|
||||
"status",
|
||||
])
|
||||
|
||||
# 關鍵欄位(必須立即告警)
|
||||
_DEFAULT_CRITICAL_FIELDS = frozenset([
|
||||
"spec.template.spec.containers[*].image",
|
||||
"spec.template.spec.containers[*].env",
|
||||
"spec.template.spec.containers[*].ports",
|
||||
"spec.template.spec.volumes",
|
||||
"spec.template.spec.serviceAccountName",
|
||||
])
|
||||
|
||||
|
||||
class GitStateReader:
    """
    Loads the desired K8s state from the YAML manifests under ``k8s_dir``.

    NOTE(review): this reads the files currently on disk (recursively, ``*.yaml``);
    it never invokes git, so uncommitted local edits are part of the "Git" state.
    """

    def __init__(self, k8s_dir: str = "k8s"):
        self._k8s_dir = Path(k8s_dir)

    async def read(self, namespace: str) -> dict[str, Any]:
        """
        Read every manifest that belongs to ``namespace``.

        The blocking file work runs in the default executor.

        Returns:
            {resource_key: parsed_yaml_dict}, where resource_key is "{kind}/{name}".
            An empty dict when reading fails (the failure is logged, not raised).
        """
        try:
            # get_running_loop() is the supported way to reach the loop from a coroutine.
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(None, self._read_sync, namespace)
        except Exception as e:
            logger.warning("git_state_read_failed", namespace=namespace, error=str(e))
            return {}

    def _read_sync(self, namespace: str) -> dict[str, Any]:
        """
        Synchronous worker behind ``read``.

        Documents that declare a different namespace are skipped; documents that
        declare no namespace are kept. A file that cannot be parsed is skipped
        as a whole; a single malformed document no longer discards the rest of
        its file.
        """
        resources: dict[str, Any] = {}

        if not self._k8s_dir.exists():
            logger.warning("k8s_dir_not_found", path=str(self._k8s_dir))
            return resources

        # Sorted so that, when two files define the same key, the winner is deterministic.
        for yaml_file in sorted(self._k8s_dir.rglob("*.yaml")):
            try:
                # Manifests may carry non-ASCII comments; do not depend on the locale encoding.
                with open(yaml_file, encoding="utf-8") as f:
                    docs = list(yaml.safe_load_all(f))
            except Exception as e:
                logger.debug("yaml_parse_failed", file=str(yaml_file), error=str(e))
                continue

            for doc in docs:
                if not doc or not isinstance(doc, dict):
                    continue
                # "metadata:" with no value parses to None; treat it like a missing block.
                metadata = doc.get("metadata")
                if not isinstance(metadata, dict):
                    metadata = {}
                ns = metadata.get("namespace", "")
                if ns and ns != namespace:
                    continue
                kind = doc.get("kind", "")
                name = metadata.get("name", "")
                if kind and name:
                    resources[f"{kind}/{name}"] = doc

        return resources
|
||||
|
||||
|
||||
class K8sStateReader:
    """Reads the live state of a namespace through ``kubectl get ... -o yaml``."""

    # kubectl resource type -> canonical Kind. The Kind is only used as a fallback
    # when an item carries no "kind"; str.capitalize() would yield "Configmap",
    # which can never match the "ConfigMap/<name>" key built from the manifests.
    _RESOURCE_KINDS = {
        "deployment": "Deployment",
        "service": "Service",
        "configmap": "ConfigMap",
        "ingress": "Ingress",
    }

    async def read(self, namespace: str) -> dict[str, Any]:
        """
        Fetch the live resources of ``namespace``.

        The blocking kubectl calls run in the default executor.

        Returns:
            {resource_key: actual_resource_dict}, where resource_key is "{kind}/{name}".
            An empty dict when reading fails (the failure is logged, not raised).
        """
        try:
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(None, self._read_sync, namespace)
        except Exception as e:
            logger.warning("k8s_state_read_failed", namespace=namespace, error=str(e))
            return {}

    def _read_sync(self, namespace: str) -> dict[str, Any]:
        """
        Synchronous worker behind ``read``: one kubectl call per resource type.

        A failing, timing-out or unparsable resource type is logged and skipped;
        the remaining types are still collected.
        """
        resources: dict[str, Any] = {}

        for rtype, default_kind in self._RESOURCE_KINDS.items():
            try:
                proc = subprocess.run(
                    ["kubectl", "get", rtype, "-n", namespace, "-o", "yaml"],
                    capture_output=True,
                    text=True,
                    timeout=30,
                )
                if proc.returncode != 0:
                    logger.debug("kubectl_failed", type=rtype, stderr=proc.stderr[:200])
                    continue

                data = yaml.safe_load(proc.stdout)
                if not isinstance(data, dict) or data.get("kind") != "List":
                    continue

                # "items: null" parses to None; treat it as an empty list.
                for item in data.get("items") or []:
                    if not isinstance(item, dict):
                        continue
                    kind = item.get("kind") or default_kind
                    metadata = item.get("metadata")
                    name = metadata.get("name", "") if isinstance(metadata, dict) else ""
                    if name:
                        resources[f"{kind}/{name}"] = item

            except subprocess.TimeoutExpired:
                logger.warning("kubectl_timeout", type=rtype, namespace=namespace)
            except Exception as e:
                logger.warning("kubectl_error", type=rtype, error=str(e))

        return resources
|
||||
|
||||
|
||||
class DriftDetector:
    """
    Compares the manifests' state with the live K8s state and emits DriftItems.

    Scope: factual comparison plus the allowlist / critical-field grading that
    decides each item's DriftLevel. It does not interpret intent.

    NOTE(review): only the ``spec`` subtree is compared, so resources whose
    payload lives elsewhere (e.g. ConfigMap ``data``) never produce drift, and
    allowlist patterns outside ``spec`` (``metadata.*``, ``status``) cannot match.
    NOTE(review): fields present only on the live object (server-side defaults)
    are reported as drift with a Git value of None — confirm this is intended.
    """

    def __init__(
        self,
        k8s_dir: str = "k8s",
        allowlist_fields: frozenset | None = None,
        critical_fields: frozenset | None = None,
    ):
        """
        Args:
            k8s_dir: directory holding the manifests.
            allowlist_fields: field patterns graded INFO; None selects the defaults.
            critical_fields: field patterns graded HIGH; None selects the defaults.
        """
        self._git_reader = GitStateReader(k8s_dir)
        self._k8s_reader = K8sStateReader()
        # "is None" (not "or") so that an explicitly empty set really disables the list.
        self._allowlist = (
            allowlist_fields if allowlist_fields is not None else _DEFAULT_ALLOWLIST_FIELDS
        )
        self._critical_fields = (
            critical_fields if critical_fields is not None else _DEFAULT_CRITICAL_FIELDS
        )

    async def scan(self, namespace: str, triggered_by: str = "cron") -> DriftReport:
        """
        Scan one namespace for drift.

        Args:
            namespace: K8s namespace.
            triggered_by: what started the scan (cron / webhook / api).

        Returns:
            A DriftReport with its DriftItems and per-level counts (intent not analysed yet).
        """
        report_id = str(uuid.uuid4())[:8]

        logger.info("drift_scan_start", namespace=namespace, report_id=report_id)

        git_state, k8s_state = await asyncio.gather(
            self._git_reader.read(namespace),
            self._k8s_reader.read(namespace),
        )

        items: list[DriftItem] = []

        # Only resources declared in the manifests are compared.
        for resource_key, git_resource in git_state.items():
            actual_resource = k8s_state.get(resource_key)
            if actual_resource is None:
                # Declared but absent from the cluster (possibly not deployed yet).
                logger.debug("resource_missing_in_k8s", resource=resource_key)
                continue

            kind, name = resource_key.split("/", 1)
            items.extend(
                self._diff_resources(git_resource, actual_resource, kind, name, namespace)
            )

        high_count = sum(1 for i in items if i.drift_level == DriftLevel.HIGH)
        medium_count = sum(1 for i in items if i.drift_level == DriftLevel.MEDIUM)
        info_count = sum(1 for i in items if i.drift_level == DriftLevel.INFO)

        logger.info(
            "drift_scan_done",
            namespace=namespace,
            report_id=report_id,
            high=high_count,
            medium=medium_count,
            info=info_count,
        )

        return DriftReport(
            report_id=report_id,
            namespace=namespace,
            items=items,
            high_count=high_count,
            medium_count=medium_count,
            info_count=info_count,
            triggered_by=triggered_by,
        )

    def _diff_resources(
        self,
        git_res: dict,
        actual_res: dict,
        kind: str,
        name: str,
        namespace: str,
    ) -> list[DriftItem]:
        """Compare the ``spec`` of two resources field by field and grade every difference."""
        items: list[DriftItem] = []

        # metadata carries too many server-managed fields, so only spec is compared.
        git_spec = git_res.get("spec", {})
        actual_spec = actual_res.get("spec", {})

        diffs = self._flatten_diff("spec", git_spec, actual_spec)
        for field_path, (git_val, actual_val) in diffs.items():
            is_allowlisted = self._is_allowlisted(field_path)
            if is_allowlisted:
                level = DriftLevel.INFO
            elif self._is_critical(field_path):
                level = DriftLevel.HIGH
            else:
                level = DriftLevel.MEDIUM

            items.append(DriftItem(
                resource_kind=kind,
                resource_name=name,
                namespace=namespace,
                field_path=field_path,
                git_value=git_val,
                actual_value=actual_val,
                drift_level=level,
                is_allowlisted=is_allowlisted,
            ))

        return items

    def _flatten_diff(
        self,
        prefix: str,
        git_dict: Any,
        actual_dict: Any,
    ) -> dict[str, tuple[Any, Any]]:
        """
        Recursively compare two values and return {field_path: (git_val, actual_val)}.

        - dict vs dict: descend key by key ("a.b").
        - list vs list of equal length: descend element by element ("a[0].b"), so
          that per-container fields (image, env, resources, ...) get their own path
          and can be graded by the "[*]" patterns.
        - anything else that differs (scalars, type mismatch, lists of different
          length): one entry at ``prefix`` holding both whole values.
        """
        diffs: dict[str, tuple[Any, Any]] = {}

        if git_dict == actual_dict:
            return diffs

        if isinstance(git_dict, dict) and isinstance(actual_dict, dict):
            # Sorted for a stable item order across scans.
            for key in sorted(set(git_dict) | set(actual_dict), key=str):
                diffs.update(
                    self._flatten_diff(f"{prefix}.{key}", git_dict.get(key), actual_dict.get(key))
                )
            return diffs

        if (
            isinstance(git_dict, list)
            and isinstance(actual_dict, list)
            and len(git_dict) == len(actual_dict)
        ):
            for index, (git_val, actual_val) in enumerate(zip(git_dict, actual_dict)):
                diffs.update(self._flatten_diff(f"{prefix}[{index}]", git_val, actual_val))
            return diffs

        diffs[prefix] = (git_dict, actual_dict)
        return diffs

    @staticmethod
    def _strip_indices(field_path: str) -> str:
        """Drop list indices ("containers[0].image" -> "containers.image") for pattern matching."""
        import re

        return re.sub(r"\[\d+\]", "", field_path)

    def _is_allowlisted(self, field_path: str) -> bool:
        """Tell whether the field is allowlisted (recorded silently, no alert): prefix match."""
        stripped_path = self._strip_indices(field_path)
        for pattern in self._allowlist:
            # Patterns are matched with their wildcards removed, as a plain prefix.
            clean_pattern = pattern.replace("[*]", "").replace("*", "")
            if stripped_path.startswith(clean_pattern):
                return True
        return False

    def _is_critical(self, field_path: str) -> bool:
        """
        Tell whether the field is critical (HIGH level).

        Matches when a cleaned pattern occurs inside the path, or when the diff
        was recorded at an ancestor of a critical field (e.g. the whole
        ``containers`` list because a container was added or removed).
        """
        stripped_path = self._strip_indices(field_path)
        for pattern in self._critical_fields:
            clean_pattern = pattern.replace("[*]", "").replace("*", "")
            if clean_pattern in stripped_path:
                return True
            if clean_pattern.startswith(stripped_path + "."):
                return True
        return False
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Singleton
|
||||
# =============================================================================
|
||||
|
||||
# Lazily created, module-wide DriftDetector instance.
_detector: DriftDetector | None = None


def get_drift_detector() -> DriftDetector:
    """Return the shared DriftDetector (default settings), building it on the first call."""
    global _detector
    if _detector is not None:
        return _detector
    _detector = DriftDetector()
    return _detector
|
||||
173
apps/api/src/services/drift_interpreter.py
Normal file
173
apps/api/src/services/drift_interpreter.py
Normal file
@@ -0,0 +1,173 @@
|
||||
"""
|
||||
Drift Interpreter - Phase 25 P2 Config Drift Detection
|
||||
=======================================================
|
||||
職責:Nemotron 意圖分析(不生成修復指令)
|
||||
只回答「這是人為操作?Hotfix?系統自動變更?」
|
||||
|
||||
設計邊界(核心原則):
|
||||
- 只輸出意圖分析,不生成 kubectl 或 git 指令
|
||||
- 確定性修復由 DriftRemediator 負責
|
||||
- Nemotron 超時 → UNKNOWN,不阻塞主流程
|
||||
|
||||
版本: v1.0
|
||||
建立: 2026-04-04 (台北時區)
|
||||
建立者: ogt (首席架構師設計) + Claude Code (實作)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import structlog
|
||||
|
||||
from src.models.drift import DriftIntent, DriftInterpretation, DriftItem
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from src.models.drift import DriftReport
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
_INTENT_PROMPT_TEMPLATE = """你是 AWOOOI GitOps 守門員,請分析以下 K8s 配置漂移的意圖。
|
||||
|
||||
## 漂移詳情
|
||||
{diff_summary}
|
||||
|
||||
## 任務
|
||||
判斷這次漂移最可能的原因:
|
||||
- emergency_hotfix: 繞過 CI 的緊急修補(image tag 改變但無對應 Git commit)
|
||||
- human_error: 誤操作(非預期的隨機欄位改變)
|
||||
- automated_change: 系統自動變更(HPA replicas, 系統注入的 annotation 等)
|
||||
- unknown: 無法判斷
|
||||
|
||||
請以 JSON 回應:
|
||||
{{
|
||||
"intent": "emergency_hotfix|human_error|automated_change|unknown",
|
||||
"explanation": "用繁體中文解釋你的判斷理由(一句話)",
|
||||
"risk": "HIGH|MEDIUM|LOW",
|
||||
"confidence": 0.0到1.0之間的數字
|
||||
}}
|
||||
|
||||
只輸出 JSON,不要任何額外說明。
|
||||
"""
|
||||
|
||||
|
||||
class NemotronDriftInterpreter:
    """
    Asks Nemotron what most likely caused a drift.

    Scope:
      - produces an intent analysis only;
      - never generates remediation commands;
      - never calls kubectl or git.
    """

    # Upper bound on drift items sent to the model, to keep the prompt small.
    _MAX_PROMPT_ITEMS = 10

    async def analyze(self, report: "DriftReport") -> DriftInterpretation:
        """
        Analyse the intent behind a drift report.

        Args:
            report: an already classified DriftReport.

        Returns:
            A DriftInterpretation. UNKNOWN is returned when the report holds no
            HIGH/MEDIUM item, and when the model call times out or fails.
        """
        if not report.items or (report.high_count == 0 and report.medium_count == 0):
            return DriftInterpretation(
                intent=DriftIntent.UNKNOWN,
                explanation="無顯著漂移,不需要意圖分析",
                confidence=1.0,
            )

        diff_text = self._format_diff_for_prompt(report)
        prompt = _INTENT_PROMPT_TEMPLATE.format(diff_summary=diff_text)
        return await self._call_nemotron(prompt)

    def _format_diff_for_prompt(self, report: "DriftReport") -> str:
        """
        Render the non-allowlisted items as prompt lines.

        Allowlisted items are filtered out BEFORE the item cap is applied, so
        real drift is never crowded out by allowlisted entries that happen to
        come first in the report.
        """
        relevant = [item for item in report.items if not item.is_allowlisted]
        lines = [
            f"- {item.resource_kind}/{item.resource_name}: "
            f"{item.field_path} "
            f"Git={str(item.git_value)[:40]} → "
            f"K8s={str(item.actual_value)[:40]}"
            for item in relevant[: self._MAX_PROMPT_ITEMS]
        ]
        return "\n".join(lines) if lines else "(均為白名單欄位)"

    async def _call_nemotron(self, prompt: str) -> DriftInterpretation:
        """Send the prompt to Nemotron; any timeout or error degrades to an UNKNOWN result."""
        try:
            from src.core.config import get_settings
            from src.services.nvidia_provider import get_nvidia_provider

            settings = get_settings()
            nvidia = get_nvidia_provider()

            response_text, success, _tokens, _cost = await asyncio.wait_for(
                nvidia.chat(prompt=prompt),
                timeout=getattr(settings, "NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS", 30),
            )

            if not success or not response_text:
                return self._unknown_result("Nemotron 回傳空值")

            return self._parse_response(response_text)

        except asyncio.TimeoutError:
            logger.warning("drift_nemotron_timeout")
            return self._unknown_result("Nemotron 超時")
        except Exception as e:
            logger.warning("drift_nemotron_error", error=str(e))
            return self._unknown_result(str(e))

    def _parse_response(self, text: str) -> DriftInterpretation:
        """
        Parse the model's reply into a DriftInterpretation.

        The JSON object is looked for, in order, as: the whole reply, the content
        of a ``` fenced block, and the outermost ``{...}`` span (models often wrap
        the JSON in prose). Anything unusable degrades to an UNKNOWN result.
        """
        import re

        if not isinstance(text, str):
            return self._unknown_result("無法解析 JSON")

        candidates = [text]
        fenced = re.search(r"```(?:json)?\s*([\s\S]+?)```", text)
        if fenced:
            candidates.append(fenced.group(1))
        start, end = text.find("{"), text.rfind("}")
        if 0 <= start < end:
            candidates.append(text[start:end + 1])

        data = None
        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except ValueError:  # json.JSONDecodeError is a ValueError
                continue
            if isinstance(parsed, dict):
                data = parsed
                break

        if data is None:
            reason = "JSON 解析失敗" if len(candidates) > 1 else "無法解析 JSON"
            return self._unknown_result(reason)

        try:
            # Enum lookup by value; unknown values fall back to UNKNOWN.
            try:
                intent = DriftIntent(data.get("intent", "unknown"))
            except ValueError:
                intent = DriftIntent.UNKNOWN

            # The prompt asks for 0.0-1.0; clamp what the model actually sent.
            confidence = min(1.0, max(0.0, float(data.get("confidence", 0.0))))

            return DriftInterpretation(
                intent=intent,
                explanation=data.get("explanation", ""),
                risk=data.get("risk", "MEDIUM"),
                confidence=confidence,
            )
        except Exception as e:
            return self._unknown_result(f"模型解析失敗: {e}")

    def _unknown_result(self, reason: str) -> DriftInterpretation:
        """Build the fallback interpretation (UNKNOWN intent, MEDIUM risk, zero confidence)."""
        return DriftInterpretation(
            intent=DriftIntent.UNKNOWN,
            explanation=f"意圖分析失敗:{reason}",
            risk="MEDIUM",
            confidence=0.0,
        )
|
||||
|
||||
|
||||
# Lazily created, module-wide NemotronDriftInterpreter instance.
_interpreter: NemotronDriftInterpreter | None = None


def get_drift_interpreter() -> NemotronDriftInterpreter:
    """Return the shared NemotronDriftInterpreter, building it on the first call."""
    global _interpreter
    if _interpreter is not None:
        return _interpreter
    _interpreter = NemotronDriftInterpreter()
    return _interpreter
|
||||
233
apps/api/src/services/drift_remediator.py
Normal file
233
apps/api/src/services/drift_remediator.py
Normal file
@@ -0,0 +1,233 @@
|
||||
"""
|
||||
Drift Remediator - Phase 25 P2 Config Drift Detection
|
||||
======================================================
|
||||
職責:確定性修復執行
|
||||
- rollback():kubectl apply -f <git-yaml>(覆蓋回 Git 狀態)
|
||||
- adopt():git commit + git push gitea main(承認變更,更新 Git)
|
||||
|
||||
設計邊界(核心原則):
|
||||
- 不使用 AI 判斷如何修復
|
||||
- 只有人工確認按鈕後才執行
|
||||
- rollback 失敗只通知,不重試(避免重複操作)
|
||||
|
||||
版本: v1.0
|
||||
建立: 2026-04-04 (台北時區)
|
||||
建立者: ogt (首席架構師設計) + Claude Code (實作)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import subprocess
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import structlog
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from src.models.drift import DriftItem, DriftReport
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
|
||||
class DriftRemediator:
    """
    Deterministic executor for drift remediation.

    Scope:
      - rollback: ``kubectl apply`` the manifests over the live state;
      - adopt: ``git commit`` + ``git push gitea main`` for the manifests directory;
      - no AI involvement in choosing the strategy;
      - no automatic retry.
    """

    def __init__(self, k8s_dir: str = "k8s"):
        self._k8s_dir = k8s_dir

    async def rollback(
        self,
        report: "DriftReport",
        resource_key: str | None = None,
    ) -> dict:
        """
        Overwrite the live state with the manifests' state (kubectl apply).

        Args:
            report: the drift report being remediated.
            resource_key: a single resource ("Kind/Name"); None means the whole
                manifests directory. When given, ONLY that resource is applied;
                if it cannot be found in the manifests nothing is applied.

        Returns:
            {"success": bool, "message": str}
        """
        logger.info(
            "drift_rollback_start",
            report_id=report.report_id,
            resource=resource_key or "all",
        )

        try:
            manifest = None
            if resource_key:
                manifest = await self._load_git_manifest(report.namespace, resource_key)

            if resource_key and manifest is None:
                # Never widen a single-resource request into a whole-directory apply.
                result = {
                    "success": False,
                    "message": f"Git 中找不到資源 {resource_key},未執行 kubectl apply",
                }
            else:
                result = await asyncio.get_running_loop().run_in_executor(
                    None,
                    self._kubectl_apply,
                    report.namespace,
                    resource_key,
                    manifest,
                )

            if result["success"]:
                logger.info(
                    "drift_rollback_success",
                    report_id=report.report_id,
                    namespace=report.namespace,
                )
                await self._notify_telegram(
                    f"✅ 漂移已覆蓋回 Git 狀態\n"
                    f"Namespace: {report.namespace}\n"
                    f"資源: {resource_key or '全部'}"
                )
            else:
                logger.error(
                    "drift_rollback_failed",
                    report_id=report.report_id,
                    error=result.get("message"),
                )
                await self._notify_telegram(
                    f"❌ 漂移覆蓋失敗,需要人工介入\n"
                    f"Namespace: {report.namespace}\n"
                    f"錯誤: {result.get('message', '')[:200]}"
                )

            return result

        except Exception as e:
            msg = f"rollback 異常: {str(e)}"
            logger.error("drift_rollback_exception", error=str(e))
            await self._notify_telegram(
                f"❌ 漂移覆蓋異常\nNamespace: {report.namespace}\n錯誤: {str(e)[:200]}"
            )
            return {"success": False, "message": msg}

    async def adopt(
        self,
        report: "DriftReport",
        field_description: str = "",
    ) -> dict:
        """
        Accept the drift: git commit + git push gitea main.

        NOTE(review): nothing here writes the live K8s values into the manifests;
        only changes already present under the manifests directory get committed.
        Confirm that something upstream updates the files first.

        Args:
            report: the drift report being adopted.
            field_description: description of the drifted fields (used in the commit message).

        Returns:
            {"success": bool, "message": str}
        """
        logger.info(
            "drift_adopt_start",
            report_id=report.report_id,
            namespace=report.namespace,
        )

        commit_msg = (
            f"chore: adopt drift — {report.namespace} "
            f"{field_description or report.summary}"
        )

        try:
            result = await asyncio.get_running_loop().run_in_executor(
                None,
                self._git_push,
                commit_msg,
            )

            if result["success"]:
                logger.info("drift_adopt_success", report_id=report.report_id)
                await self._notify_telegram(
                    f"✅ 漂移已承認,Git 已更新\n"
                    f"Namespace: {report.namespace}\n"
                    f"Commit: {commit_msg[:80]}"
                )
            else:
                logger.error("drift_adopt_failed", error=result.get("message"))
                await self._notify_telegram(
                    f"❌ Git 更新失敗,需要人工處理\n"
                    f"錯誤: {result.get('message', '')[:200]}"
                )

            return result

        except Exception as e:
            logger.error("drift_adopt_exception", error=str(e))
            # Same as rollback(): an unexpected failure must reach a human too.
            await self._notify_telegram(
                f"❌ Git 更新失敗,需要人工處理\n"
                f"錯誤: {str(e)[:200]}"
            )
            return {"success": False, "message": str(e)}

    # =========================================================================
    # Private
    # =========================================================================

    async def _load_git_manifest(self, namespace: str, resource_key: str) -> str | None:
        """Return the manifest of one resource ("Kind/Name") as JSON text, or None if not declared."""
        import json

        from src.services.drift_detector import GitStateReader

        git_state = await GitStateReader(self._k8s_dir).read(namespace)
        resource = git_state.get(resource_key)
        if resource is None:
            return None
        # kubectl accepts JSON; default=str covers YAML-native values such as dates.
        return json.dumps(resource, default=str)

    def _kubectl_apply(
        self,
        namespace: str,
        resource_key: str | None,
        manifest: str | None = None,
    ) -> dict:
        """
        Run kubectl apply (synchronous).

        - resource_key is None: apply the whole manifests directory.
        - resource_key given: apply only ``manifest`` (fed through stdin); without
          a manifest the call is refused rather than applying everything.

        NOTE(review): the directory form is not recursive (no -R), while drift
        detection scans the directory recursively — confirm which is intended.
        """
        try:
            if resource_key is None:
                cmd = ["kubectl", "apply", "-f", self._k8s_dir, "-n", namespace, "--dry-run=none"]
                stdin_data = None
            elif manifest is not None:
                cmd = ["kubectl", "apply", "-f", "-", "-n", namespace, "--dry-run=none"]
                stdin_data = manifest
            else:
                return {
                    "success": False,
                    "message": f"缺少 {resource_key} 的 manifest,未執行 kubectl apply",
                }

            proc = subprocess.run(
                cmd,
                input=stdin_data,
                capture_output=True,
                text=True,
                timeout=60,
            )
            if proc.returncode == 0:
                return {"success": True, "message": proc.stdout[:500]}
            return {"success": False, "message": proc.stderr[:500]}
        except subprocess.TimeoutExpired:
            return {"success": False, "message": "kubectl apply 超時(60s)"}
        except Exception as e:
            return {"success": False, "message": str(e)}

    def _git_push(self, commit_msg: str) -> dict:
        """
        Stage, commit and push the manifests directory (synchronous).

        Staging and committing are limited to ``k8s_dir`` so that unrelated files
        in the working tree are never swept into the commit. Every step captures
        git's output so failures carry git's own error text.
        """
        def run_git(args: list[str], timeout: int):
            return subprocess.run(
                ["git", *args],
                capture_output=True,
                text=True,
                timeout=timeout,
            )

        try:
            added = run_git(["add", "--", self._k8s_dir], 10)
            if added.returncode != 0:
                return {"success": False, "message": f"git 操作失敗: {added.stderr[:500]}"}

            # --quiet: exit 0 = nothing staged, 1 = staged changes, anything else = error.
            staged = run_git(["diff", "--cached", "--quiet", "--", self._k8s_dir], 10)
            if staged.returncode == 0:
                return {
                    "success": False,
                    "message": f"git 操作失敗: {self._k8s_dir} 沒有可提交的變更",
                }
            if staged.returncode != 1:
                return {"success": False, "message": f"git 操作失敗: {staged.stderr[:500]}"}

            committed = run_git(["commit", "-m", commit_msg, "--", self._k8s_dir], 10)
            if committed.returncode != 0:
                return {"success": False, "message": f"git 操作失敗: {committed.stderr[:500]}"}

            pushed = run_git(["push", "gitea", "main"], 30)
            if pushed.returncode == 0:
                return {"success": True, "message": "已推送至 gitea main"}
            return {"success": False, "message": pushed.stderr[:500]}
        except subprocess.TimeoutExpired:
            return {"success": False, "message": "git push 超時"}
        except Exception as e:
            return {"success": False, "message": str(e)}

    async def _notify_telegram(self, message: str) -> None:
        """Push a notification to Telegram; a delivery failure is logged and swallowed."""
        try:
            from src.services.telegram_gateway import get_telegram_gateway
            tg = get_telegram_gateway()
            await tg.send_text(message)
        except Exception as e:
            logger.warning("drift_remediator_telegram_failed", error=str(e))
|
||||
|
||||
|
||||
# Lazily created, module-wide DriftRemediator instance.
_remediator: DriftRemediator | None = None


def get_drift_remediator() -> DriftRemediator:
    """Return the shared DriftRemediator (default manifests dir), building it on the first call."""
    global _remediator
    if _remediator is not None:
        return _remediator
    _remediator = DriftRemediator()
    return _remediator
|
||||
@@ -223,3 +223,56 @@ class KnowledgeService:
|
||||
|
||||
logger.info("embed_all_complete", total=len(rows), success=success, failed=failed)
|
||||
return {"total": len(rows), "success": success, "failed": failed}
|
||||
|
||||
async def check_anti_pattern(
    self,
    symptoms_hash: str,
    days: int = 7,
) -> list[KnowledgeEntry]:
    """
    Anti-pattern gate lookup (Phase 25 P1).

    Finds recent failure records carrying the same symptoms hash, so that the
    auto-repair decision can refuse to repeat a repair that already failed.

    Args:
        symptoms_hash: hash produced by SymptomPattern.compute_hash().
            An empty hash identifies nothing and therefore matches nothing.
        days: look-back window in days (default 7).

    Returns:
        Up to 5 non-archived ANTI_PATTERN entries, newest first; an empty list
        when no known failure exists.
    """
    # An empty hash would otherwise match every entry stored without a hash
    # and block unrelated incidents.
    if not symptoms_hash:
        return []

    from datetime import timedelta
    from sqlalchemy import text as sa_text
    from src.utils.timezone import now_taipei

    cutoff = now_taipei() - timedelta(days=days)

    async with get_db_context() as db:
        result = await db.execute(
            sa_text(
                "SELECT id FROM knowledge_entries "
                "WHERE entry_type = 'anti_pattern' "
                "AND symptoms_hash = :hash "
                "AND created_at >= :cutoff "
                "AND status != 'archived' "
                "ORDER BY created_at DESC LIMIT 5"
            ),
            {"hash": symptoms_hash, "cutoff": cutoff},
        )
        entry_ids = [row.id for row in result.fetchall()]

    if not entry_ids:
        return []

    # Hydrate through get_entry(); ids that no longer resolve are dropped.
    entries = []
    for eid in entry_ids:
        entry = await self.get_entry(eid)
        if entry:
            entries.append(entry)

    logger.info(
        "anti_pattern_check",
        symptoms_hash=symptoms_hash,
        days=days,
        found=len(entries),
    )
    return entries
|
||||
|
||||
343
apps/api/src/services/runbook_generator.py
Normal file
343
apps/api/src/services/runbook_generator.py
Normal file
@@ -0,0 +1,343 @@
|
||||
"""
|
||||
Runbook Generator - Phase 25 P1 Knowledge Auto-Harvesting
|
||||
==========================================================
|
||||
修復後自動生成 Runbook(成功)或 Anti-Pattern(失敗)
|
||||
透過 Nemotron NIM 生成,結果沉澱至 KM 知識庫
|
||||
|
||||
設計原則:
|
||||
- 非阻塞:asyncio.create_task() 呼叫,絕不影響 AutoRepair 主流程
|
||||
- 失敗靜默:生成失敗只記 log,不拋例外
|
||||
- DRAFT/PUBLISHED:成功 → DRAFT(需人工審核),失敗 → PUBLISHED(直接封鎖)
|
||||
|
||||
版本: v1.1
|
||||
建立: 2026-04-04 (台北時區)
|
||||
建立者: ogt (首席架構師設計) + Claude Code (實作)
|
||||
關聯設計: docs/superpowers/specs/2026-04-04-nemotron-active-defense-design.md 方向一
|
||||
|
||||
變更紀錄:
|
||||
| 版本 | 日期 | 執行者 | 變更內容 |
|
||||
|------|------|--------|----------|
|
||||
| v1.0 | 2026-04-04 | Claude Code | 初始佔位(使用 generate() 但介面不存在) |
|
||||
| v1.1 | 2026-04-04 | ogt (首席架構師) | 改用正確的 nvidia.chat() 介面;新增 Minimal fallback |
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import structlog
|
||||
|
||||
from src.models.knowledge import EntrySource, EntryStatus, EntryType, KnowledgeEntryCreate
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from src.models.incident import Incident
|
||||
from src.models.playbook import Playbook
|
||||
from src.services.auto_repair_service import AutoRepairResult
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
|
||||
class NemotronRunbookGenerator:
    """
    Nemotron-driven generator of knowledge-base entries from auto-repair outcomes.

    Responsibilities:
    - Successful repair -> AUTO_RUNBOOK entry (DRAFT) + Telegram review card
    - Failed repair     -> ANTI_PATTERN entry (PUBLISHED) + Telegram notification

    leWOOOgo building-block conventions:
    - Persists through KnowledgeService (never writes to the DB directly)
    - Calls NvidiaProvider.chat() (not AIRouter; runbook generation is a
      knowledge side effect)
    """

    _RUNBOOK_SYSTEM = (
        "你是 AWOOOI 平台的 SRE Runbook 撰寫專家。"
        "根據提供的 Incident 與修復結果,用繁體中文生成完整結構化 Runbook。"
    )

    _ANTI_PATTERN_SYSTEM = (
        "你是 AWOOOI 平台的故障分析專家。"
        "根據失敗的修復嘗試,用繁體中文生成失敗案例記錄,幫助未來避免重蹈覆轍。"
    )

    async def generate_runbook(
        self,
        incident: "Incident",
        playbook: "Playbook",
        result: "AutoRepairResult",
        symptoms_hash: str,
    ) -> None:
        """
        Create an AUTO_RUNBOOK entry after a successful repair.

        Intended to be fire-and-forget: every exception is caught and logged
        here, so nothing propagates to the caller.

        Args:
            incident: The incident that triggered the repair.
            playbook: The playbook that was executed.
            result: The execution result (expected to be a success).
            symptoms_hash: Hash produced by SymptomPattern.compute_hash().
        """
        try:
            content = await self._call_nemotron_for_runbook(incident, playbook, result)
            if not content:
                return

            from src.services.knowledge_service import get_knowledge_service

            ks = get_knowledge_service()

            entry_data = KnowledgeEntryCreate(
                title=f"[AUTO] {incident.incident_id} — {playbook.name}",
                content=content,
                entry_type=EntryType.AUTO_RUNBOOK,
                category="auto_generated",
                tags=list(incident.affected_services or []) + ["auto_runbook", "nemotron"],
                source=EntrySource.AI_EXTRACTED,
                status=EntryStatus.DRAFT,
                related_incident_id=incident.incident_id,
                related_playbook_id=playbook.playbook_id,
                symptoms_hash=symptoms_hash,
                created_by="nemotron_runbook_generator",
            )

            entry = await ks.create_entry(entry_data)

            logger.info(
                "auto_runbook_created",
                incident_id=incident.incident_id,
                entry_id=entry.id,
                playbook_id=playbook.playbook_id,
            )

            # Truncate BEFORE escaping (escaping happens in the formatter) so
            # an HTML entity is never cut in half.
            await self._push_runbook_review_card(incident, entry.id, content[:200])

        except Exception as e:
            logger.error(
                "runbook_generation_failed",
                incident_id=incident.incident_id,
                error=str(e),
                # str() of a timeout is empty; the type keeps the log useful.
                error_type=type(e).__name__,
            )

    async def generate_anti_pattern(
        self,
        incident: "Incident",
        playbook: "Playbook",
        result: "AutoRepairResult",
        symptoms_hash: str,
    ) -> None:
        """
        Create an ANTI_PATTERN entry after a failed repair.

        The entry is stored as PUBLISHED immediately (no review step).
        Intended to be fire-and-forget: every exception is caught and logged
        here, so nothing propagates to the caller.

        Args:
            incident: The incident that triggered the repair.
            playbook: The playbook that was attempted.
            result: The execution result (expected to be a failure).
            symptoms_hash: Hash produced by SymptomPattern.compute_hash().
        """
        try:
            content = await self._call_nemotron_for_anti_pattern(incident, playbook, result)
            if not content:
                return

            from src.services.knowledge_service import get_knowledge_service

            ks = get_knowledge_service()

            title = f"[FAIL] {incident.incident_id} — {playbook.name}"
            entry_data = KnowledgeEntryCreate(
                title=title,
                content=content,
                entry_type=EntryType.ANTI_PATTERN,
                category="failure_cases",
                tags=list(incident.affected_services or []) + ["anti_pattern", "failure"],
                source=EntrySource.AI_EXTRACTED,
                status=EntryStatus.PUBLISHED,  # published directly, no review
                related_incident_id=incident.incident_id,
                related_playbook_id=playbook.playbook_id,
                symptoms_hash=symptoms_hash,
                created_by="nemotron_runbook_generator",
            )

            entry = await ks.create_entry(entry_data)

            logger.info(
                "anti_pattern_created",
                incident_id=incident.incident_id,
                entry_id=entry.id,
                symptoms_hash=symptoms_hash,
            )

            await self._push_anti_pattern_notification(incident, title)

        except Exception as e:
            logger.error(
                "anti_pattern_generation_failed",
                incident_id=incident.incident_id,
                error=str(e),
                error_type=type(e).__name__,
            )

    # =========================================================================
    # Private
    # =========================================================================

    async def _call_nemotron_for_runbook(
        self,
        incident: "Incident",
        playbook: "Playbook",
        result: "AutoRepairResult",
    ) -> str:
        """
        Ask Nemotron (via chat()) for a 9-section Markdown runbook.

        Returns the model text when the call succeeds with non-empty output;
        otherwise returns the minimal runbook built locally from the inputs.
        """
        from src.core.config import get_settings
        from src.services.nvidia_provider import get_nvidia_provider

        settings = get_settings()
        # Only the first five steps go into the prompt; a missing step list is
        # treated as empty rather than crashing prompt construction.
        steps_block = "\n".join(f" {s}" for s in (result.executed_steps or [])[:5])
        prompt = (
            f"## Incident 資訊\n"
            f"- ID: {incident.incident_id}\n"
            f"- 受影響服務: {', '.join(incident.affected_services or [])}\n"
            f"- 嚴重度: {incident.severity.value if incident.severity else 'unknown'}\n\n"
            f"## 執行的 Playbook\n"
            f"- 名稱: {playbook.name}\n"
            f"- 執行步驟:\n"
            + steps_block
            + f"\n\n## 執行結果\n- 狀態: 成功,耗時 {result.execution_time_ms}ms\n\n"
            "請生成包含以下 9 段的 Runbook(Markdown 格式):\n"
            "1. ## 症狀描述\n2. ## 根因分析\n3. ## 執行步驟\n"
            "4. ## 驗證步驟\n5. ## 注意事項\n6. ## 影響範圍\n"
            "7. ## 相關 Incident\n8. ## 下次預防建議\n9. ## 適用條件"
        )

        try:
            nvidia = get_nvidia_provider()
            # Monotonic clock: wall-clock adjustments must not skew latency.
            started = time.monotonic()
            # chat() returns (response_text, success, total_tokens, cost_usd)
            response_text, success, _tokens, _cost = await asyncio.wait_for(
                nvidia.chat(prompt=f"[SYSTEM]{self._RUNBOOK_SYSTEM}\n\n{prompt}"),
                timeout=settings.NEMOTRON_TIMEOUT_SECONDS,
            )
            latency_ms = round((time.monotonic() - started) * 1000, 1)
            if success and response_text:
                logger.info("runbook_nemotron_call_ok", latency_ms=latency_ms)
                return response_text
            # The call returned, but without a usable answer: make the
            # fallback visible instead of logging it as "ok".
            logger.warning("runbook_nemotron_call_unsuccessful", latency_ms=latency_ms)
        except Exception as e:
            logger.warning(
                "runbook_nemotron_call_failed",
                error=str(e),
                error_type=type(e).__name__,
            )

        # Fallback: assemble a basic runbook locally.
        return self._build_minimal_runbook(incident, playbook, result)

    async def _call_nemotron_for_anti_pattern(
        self,
        incident: "Incident",
        playbook: "Playbook",
        result: "AutoRepairResult",
    ) -> str:
        """
        Ask Nemotron (via chat()) for a Markdown failure-case record.

        Returns the model text when the call succeeds with non-empty output;
        otherwise returns the minimal anti-pattern built locally.
        """
        from src.core.config import get_settings
        from src.services.nvidia_provider import get_nvidia_provider

        settings = get_settings()
        prompt = (
            f"## Incident 資訊\n"
            f"- ID: {incident.incident_id}\n"
            f"- 受影響服務: {', '.join(incident.affected_services or [])}\n\n"
            f"## 嘗試的 Playbook\n- 名稱: {playbook.name}\n\n"
            f"## 失敗原因\n{result.error or '執行中發生未知異常'}\n\n"
            "請生成失敗案例文件(Markdown 格式),包含:\n"
            "## 症狀描述\n## 嘗試的修復方案\n## 失敗原因分析\n"
            "## 已知不適用條件\n## 替代方案建議"
        )

        try:
            nvidia = get_nvidia_provider()
            # chat() returns (response_text, success, total_tokens, cost_usd)
            response_text, success, _tokens, _cost = await asyncio.wait_for(
                nvidia.chat(prompt=f"[SYSTEM]{self._ANTI_PATTERN_SYSTEM}\n\n{prompt}"),
                timeout=settings.NEMOTRON_TIMEOUT_SECONDS,
            )
            if success and response_text:
                return response_text
            logger.warning("anti_pattern_nemotron_call_unsuccessful")
        except Exception as e:
            logger.warning(
                "anti_pattern_nemotron_call_failed",
                error=str(e),
                error_type=type(e).__name__,
            )

        return self._build_minimal_anti_pattern(incident, playbook, result)

    def _build_minimal_runbook(
        self,
        incident: "Incident",
        playbook: "Playbook",
        result: "AutoRepairResult",
    ) -> str:
        """Build the basic runbook used when the Nemotron call times out or fails."""
        steps = "\n".join(f"- {s}" for s in (result.executed_steps or []))
        return (
            f"## 症狀描述\nIncident {incident.incident_id},"
            f"受影響服務:{', '.join(incident.affected_services or [])}\n\n"
            f"## 執行步驟\n{steps}\n\n"
            f"## 執行結果\n成功,耗時 {result.execution_time_ms}ms\n\n"
            "*本文件由系統自動生成(Nemotron fallback),建議人工補充完善。*"
        )

    def _build_minimal_anti_pattern(
        self,
        incident: "Incident",
        playbook: "Playbook",
        result: "AutoRepairResult",
    ) -> str:
        """Build the basic anti-pattern used when the Nemotron call times out or fails."""
        return (
            f"## 症狀描述\nIncident {incident.incident_id},"
            f"受影響服務:{', '.join(incident.affected_services or [])}\n\n"
            f"## 失敗原因\n{result.error or '執行中發生異常'}\n\n"
            f"## 已知不適用條件\nPlaybook '{playbook.name}' 在此症狀下失敗,請勿自動重試。\n\n"
            "*本文件由系統自動生成(Nemotron fallback)。*"
        )

    @staticmethod
    def _escape_html(value: object) -> str:
        """Escape `&`, `<` and `>` so a value can sit inside an HTML-tagged message."""
        import html

        return html.escape(str(value), quote=False)

    def _format_review_card(
        self,
        incident_id: object,
        entry_id: object,
        content_preview: str,
    ) -> str:
        """Render the review-card text; every interpolated value is HTML-escaped."""
        esc = self._escape_html
        return (
            f"📄 <b>Auto Runbook 待審核</b>\n"
            f"Incident: <code>{esc(incident_id)}</code>\n"
            f"Entry ID: <code>{esc(entry_id)}</code>\n\n"
            f"<i>{esc(content_preview)}...</i>\n\n"
            f"請至知識庫審核並發布。"
        )

    def _format_anti_pattern_notice(self, incident_id: object, title: str) -> str:
        """Render the anti-pattern notice; every interpolated value is HTML-escaped."""
        esc = self._escape_html
        return (
            f"⚠️ <b>已記錄失敗案例</b>\n"
            f"Incident: <code>{esc(incident_id)}</code>\n"
            f"標題: {esc(title)}\n\n"
            f"相同症狀的後續告警將阻斷自動修復,要求人工介入。"
        )

    async def _push_runbook_review_card(
        self,
        incident: "Incident",
        entry_id: str,
        content_preview: str,
    ) -> None:
        """
        Send the runbook review card to Telegram (best effort).

        The message uses HTML tags, so it is presumably sent in Telegram's HTML
        parse mode — confirm against the gateway. The preview is model-generated
        Markdown and is escaped so stray `<` / `&` cannot break the message.
        Failures are logged and swallowed.
        """
        try:
            from src.services.telegram_gateway import get_telegram_gateway

            tg = get_telegram_gateway()
            await tg.send_text(
                self._format_review_card(incident.incident_id, entry_id, content_preview)
            )
        except Exception as e:
            logger.warning("runbook_review_card_failed", error=str(e))

    async def _push_anti_pattern_notification(
        self,
        incident: "Incident",
        title: str,
    ) -> None:
        """
        Send the "anti-pattern recorded" notice to Telegram (best effort).

        The title embeds the playbook name, so it is escaped before being placed
        in the HTML-tagged message. Failures are logged and swallowed.
        """
        try:
            from src.services.telegram_gateway import get_telegram_gateway

            tg = get_telegram_gateway()
            await tg.send_text(
                self._format_anti_pattern_notice(incident.incident_id, title)
            )
        except Exception as e:
            logger.warning("anti_pattern_notification_failed", error=str(e))
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 單例管理
|
||||
# =============================================================================
|
||||
|
||||
# Module-level cache for the lazily created generator instance.
_generator: NemotronRunbookGenerator | None = None


def get_runbook_generator() -> NemotronRunbookGenerator:
    """Return the shared NemotronRunbookGenerator, creating it on first use."""
    global _generator
    if _generator is not None:
        return _generator
    _generator = NemotronRunbookGenerator()
    return _generator
|
||||
71
k8s/drift-cronjob.yaml
Normal file
71
k8s/drift-cronjob.yaml
Normal file
@@ -0,0 +1,71 @@
|
||||
# Config Drift Detection CronJob - Phase 25 P2
# Scans the awoooi-prod namespace for configuration drift once per hour.
#
# Created: 2026-04-04 (Taipei time)
# Author: Claude Code (Phase 25 P2)
# Design: docs/superpowers/specs/2026-04-04-nemotron-active-defense-design.md (direction 3)
# ADR: ADR-057 (to be drafted)
#
# Deploy:         kubectl apply -f k8s/drift-cronjob.yaml -n awoooi-prod
# Manual trigger: kubectl create job --from=cronjob/drift-scanner drift-scan-manual -n awoooi-prod
# View logs:      kubectl logs -l component=drift-scanner -n awoooi-prod
#   (Select on the pod-template label. The `job-name` label carries each Job's
#   own generated name, so `job-name=drift-scanner` matches nothing.)

apiVersion: batch/v1
kind: CronJob
metadata:
  name: drift-scanner
  namespace: awoooi-prod
  labels:
    app: awoooi
    component: drift-scanner
    phase: "25"
  annotations:
    # 2026-04-04 ogt: Phase 25 P2 — Config Drift Detection
    description: "每小時掃描 K8s 配置漂移,由 Nemotron 做意圖分析"
spec:
  # Runs at minute 0 of every hour. Taipei is UTC+8, a whole-hour offset, so
  # the time zone does not change when this fires.
  schedule: "0 * * * *"
  concurrencyPolicy: Forbid # no overlapping runs: skip if the previous one is still active
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 5
  startingDeadlineSeconds: 60 # skip a run that could not start within 60s of its slot
  jobTemplate:
    spec:
      backoffLimit: 0 # no retries: per the original note the scan is idempotent and the next hourly run covers it
      activeDeadlineSeconds: 300 # hard cap of 5 minutes per run
      template:
        metadata:
          labels:
            app: awoooi
            component: drift-scanner
        spec:
          restartPolicy: Never
          serviceAccountName: awoooi-api # reuses the API's ServiceAccount (original note: it has kubectl permissions)
          containers:
            - name: drift-scanner
              # Reuses the awoooi-api image (original note: it ships kubectl and a Python environment)
              image: harbor.wooo.work/awoooi/api:latest
              imagePullPolicy: Always
              command:
                - python
                - -c
                - |
                  import asyncio, httpx, os
                  API_URL = os.environ.get("INTERNAL_API_URL", "http://awoooi-api:8000")
                  async def run():
                      # 240s client timeout stays below the Job's 300s activeDeadlineSeconds.
                      async with httpx.AsyncClient(timeout=240) as c:
                          r = await c.post(f"{API_URL}/api/v1/drift/internal/scan")
                          print(f"status={r.status_code} body={r.text[:200]}")
                          # Exit non-zero on 4xx/5xx so the Job is recorded as failed
                          # instead of silently "succeeding" on a broken scan.
                          r.raise_for_status()
                  asyncio.run(run())
              env:
                - name: INTERNAL_API_URL
                  value: "http://awoooi-api.awoooi-prod.svc.cluster.local:8000"
                # NOTE(review): not read by the inline script above; confirm whether
                # the API side consumes it or it can be dropped.
                - name: DRIFT_SCAN_NAMESPACES
                  value: "awoooi-prod"
              resources:
                requests:
                  cpu: "50m"
                  memory: "64Mi"
                limits:
                  cpu: "200m"
                  memory: "256Mi"
|
||||
Reference in New Issue
Block a user