feat(phase25): Nemotron 主動防禦三方向 P0+P1+P2 完整實作
Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 38s
Type Sync Check / check-type-sync (push) Failing after 35s

P0 - DIAGNOSE Privacy-First Routing:
- ai_router.py: _local_fallback_chain [NEMOTRON→OLLAMA→REJECT]
- DIAGNOSE 意圖 override 改為 NEMOTRON (原 OLLAMA)
- DIAGNOSE fallback 使用 local-only 鏈,不觸碰雲端
- 全部失敗時 REJECT + Telegram 通知
- config.py: NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS=30, OLLAMA_DIAGNOSE_TIMEOUT_SECONDS=60
- nemotron.py: 根據 context[task_type] 選擇 timeout

P1 - Knowledge Auto-Harvesting:
- models/knowledge.py: EntryType.AUTO_RUNBOOK + ANTI_PATTERN + symptoms_hash
- EntryStatus.PUBLISHED (ANTI_PATTERN 直接發布,無需審核)
- models/playbook.py: SymptomPattern.compute_hash() (16字元確定性 hash)
- services/runbook_generator.py: NemotronRunbookGenerator (v1.1)
  - generate_runbook() → AUTO_RUNBOOK (DRAFT) + Telegram 審核 card
  - generate_anti_pattern() → ANTI_PATTERN (PUBLISHED) + Telegram 通知
  - 使用 nvidia.chat() (正確介面),Nemotron 超時時 Minimal fallback
- knowledge_service.py: check_anti_pattern(symptoms_hash, days=7)
- db/models.py: symptoms_hash VARCHAR(16) + ix_knowledge_symptoms_hash
- repositories/knowledge_repository.py: create() 支援 symptoms_hash + status
- auto_repair_service.py: anti_pattern_gate 在 decide() + runbook hook 在 execute()
- migrations/phase8_symptoms_hash.sql: ALTER TABLE + partial index + PUBLISHED constraint

P2 - Config Drift Detection:
- models/drift.py: DriftItem/DriftReport/DriftLevel/DriftIntent/DriftStatus
- services/drift_detector.py: GitStateReader + K8sStateReader + DriftDetector
- services/drift_analyzer.py: 白名單過濾 + DriftLevel 分級
- services/drift_interpreter.py: NemotronDriftInterpreter(意圖分析,不生成修復指令)
- services/drift_remediator.py: rollback(kubectl apply) + adopt(git push gitea)
- api/v1/drift.py: POST /scan, GET /reports, POST /rollback, POST /adopt
- migrations/phase9_drift_reports.sql: drift_reports 表
- k8s/drift-cronjob.yaml: 每小時自動掃描 CronJob

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-04 12:35:05 +08:00
parent 0b41df45d6
commit 3455044457
20 changed files with 1945 additions and 7 deletions

View File

@@ -0,0 +1,48 @@
-- Phase 25 P1: Knowledge Auto-Harvesting — symptoms_hash 欄位
-- 用於 Anti-Pattern 閉環攔截的確定性症狀 hash
-- 建立時間: 2026-04-04 (台北時區)
-- 建立者: Claude Code (Phase 25 P1)
--
-- 執行方式: psql -h 192.168.0.188 -U awoooi -d awoooi -f phase8_symptoms_hash.sql
-- 1. knowledge_entries 表新增 symptoms_hash 欄位
ALTER TABLE knowledge_entries
ADD COLUMN IF NOT EXISTS symptoms_hash VARCHAR(16);
-- 2. 建立 index 加速 Anti-Pattern 閘門查詢
-- 查詢條件: entry_type='anti_pattern' AND symptoms_hash=:hash AND created_at>=:cutoff
CREATE INDEX IF NOT EXISTS idx_knowledge_anti_pattern_hash
ON knowledge_entries (entry_type, symptoms_hash, created_at)
WHERE entry_type = 'anti_pattern' AND symptoms_hash IS NOT NULL;
-- 3. EntryStatus 新增 PUBLISHED用於 ANTI_PATTERN 直接發布)
-- PostgreSQL CHECK constraint 需要重建(若有的話)
-- 若無 constraintPostgreSQL 的 VARCHAR 欄位可直接存入任意值,無需 ALTER。
-- 確認 status 欄位是否有 CHECK constraint:
-- SELECT conname, consrc FROM pg_constraint
-- WHERE conrelid = 'knowledge_entries'::regclass AND contype = 'c';
-- 若有 CHECK constraint如 status IN ('draft', 'review', 'approved', 'archived')
-- 需執行以下(請先確認 constraint 名稱):
-- ALTER TABLE knowledge_entries DROP CONSTRAINT IF EXISTS knowledge_entries_status_check;
-- ALTER TABLE knowledge_entries ADD CONSTRAINT knowledge_entries_status_check
-- CHECK (status IN ('draft', 'review', 'approved', 'archived', 'published'));
-- 安全執行版本(自動處理 CHECK constraint
DO $$
DECLARE
v_conname text;
BEGIN
SELECT conname INTO v_conname
FROM pg_constraint
WHERE conrelid = 'knowledge_entries'::regclass AND contype = 'c' AND conname LIKE '%status%';
IF v_conname IS NOT NULL THEN
EXECUTE format('ALTER TABLE knowledge_entries DROP CONSTRAINT %I', v_conname);
ALTER TABLE knowledge_entries ADD CONSTRAINT knowledge_entries_status_check
CHECK (status IN ('draft', 'review', 'approved', 'archived', 'published'));
RAISE NOTICE 'Updated status CHECK constraint: % → added published', v_conname;
ELSE
RAISE NOTICE 'No status CHECK constraint found, skipping';
END IF;
END $$;

View File

@@ -0,0 +1,54 @@
-- Phase 25 P2: Config Drift Detection — drift_reports 資料表
-- 建立時間: 2026-04-04 (台北時區)
-- 建立者: Claude Code (Phase 25 P2)
-- 對應模型: apps/api/src/models/drift.py
-- 對應設計: docs/superpowers/specs/2026-04-04-nemotron-active-defense-design.md 方向三
--
-- 執行方式: psql -h 192.168.0.188 -U awoooi -d awoooi -f phase9_drift_reports.sql
CREATE TABLE IF NOT EXISTS drift_reports (
-- 識別
report_id VARCHAR(32) PRIMARY KEY,
-- 掃描資訊
namespace VARCHAR(128) NOT NULL,
triggered_by VARCHAR(64) NOT NULL DEFAULT 'cron', -- cron / webhook / api
scanned_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- 計數(非正規化,避免每次 JOIN
high_count INT NOT NULL DEFAULT 0,
medium_count INT NOT NULL DEFAULT 0,
info_count INT NOT NULL DEFAULT 0,
-- 漂移項目JSONB 列表)
items JSONB NOT NULL DEFAULT '[]',
-- Nemotron 意圖分析
interpretation JSONB, -- DriftInterpretation可為 NULL尚未分析
-- 處理狀態
status VARCHAR(32) NOT NULL DEFAULT 'pending',
-- pending / acknowledged / rolled_back / adopted / ignored
-- 時間軸
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
resolved_at TIMESTAMPTZ
);
-- 索引
CREATE INDEX IF NOT EXISTS idx_drift_reports_namespace
ON drift_reports(namespace);
CREATE INDEX IF NOT EXISTS idx_drift_reports_status
ON drift_reports(status);
CREATE INDEX IF NOT EXISTS idx_drift_reports_created_at
ON drift_reports(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_drift_reports_high_count
ON drift_reports(high_count)
WHERE high_count > 0;
-- 說明:
-- 目前 API 使用 in-memory dict 暫存,此表供未來持久化使用
-- 啟用持久化後,需在 drift.py 的 _recent_reports 操作改為 DB 寫入

View File

@@ -0,0 +1,215 @@
"""
Config Drift Detection API Router - Phase 25 P2
================================================
GitOps 守門員 HTTP 端點
leWOOOgo 積木化原則:
- Router 層只做 HTTP 轉發
- 不直接存取 Redis/DB
- 業務邏輯委託給 Service 層
版本: v1.0
建立: 2026-04-04 (台北時區)
建立者: Claude Code (Phase 25 P2)
"""
from fastapi import APIRouter, BackgroundTasks, HTTPException
from src.models.drift import (
DriftListResponse,
DriftReport,
DriftScanRequest,
DriftScanResponse,
)
from src.services.drift_analyzer import get_drift_analyzer
from src.services.drift_detector import get_drift_detector
from src.services.drift_interpreter import get_drift_interpreter
from src.services.drift_remediator import get_drift_remediator
router = APIRouter(prefix="/drift", tags=["drift"])
# 本次 session 的漂移報告暫存prod 應存 DB
_recent_reports: dict[str, DriftReport] = {}
@router.post("/scan", response_model=DriftScanResponse, summary="觸發漂移掃描")
async def trigger_drift_scan(
request: DriftScanRequest,
background_tasks: BackgroundTasks,
) -> DriftScanResponse:
"""
觸發 Config Drift 掃描
- 比對 Git YAML vs K8s 實際狀態
- Nemotron 分析漂移意圖
- 高/中嚴重度漂移自動推送 Telegram
適合由 Gitea CD Webhook 或手動呼叫觸發
"""
detector = get_drift_detector()
analyzer = get_drift_analyzer()
interpreter = get_drift_interpreter()
all_items = []
last_report: DriftReport | None = None
for namespace in request.namespaces:
raw_report = await detector.scan(namespace, triggered_by=request.triggered_by)
classified_report = analyzer.classify(raw_report)
all_items.extend(classified_report.items)
if analyzer.needs_alert(classified_report):
# Nemotron 意圖分析(背景執行,避免阻塞)
background_tasks.add_task(
_analyze_and_notify, classified_report
)
last_report = classified_report
# 暫存(最多 50 筆)
_recent_reports[classified_report.report_id] = classified_report
if len(_recent_reports) > 50:
oldest_key = next(iter(_recent_reports))
del _recent_reports[oldest_key]
# 若多 namespace彙總第一個 report 的計數
if last_report:
return DriftScanResponse(
report_id=last_report.report_id,
summary=last_report.summary,
high_count=last_report.high_count,
medium_count=last_report.medium_count,
info_count=last_report.info_count,
has_critical_drift=last_report.has_critical_drift,
)
return DriftScanResponse(
report_id="no-drift",
summary="無漂移",
high_count=0,
medium_count=0,
info_count=0,
has_critical_drift=False,
)
@router.get("/reports", response_model=DriftListResponse, summary="列出最近漂移報告")
async def list_drift_reports() -> DriftListResponse:
"""列出最近 50 筆漂移報告(倒序)"""
items = list(reversed(list(_recent_reports.values())))
return DriftListResponse(items=items, total=len(items))
@router.post("/reports/{report_id}/rollback", summary="覆蓋回 Git 狀態")
async def rollback_drift(report_id: str) -> dict:
"""
將 K8s 狀態覆蓋回 Git YAMLkubectl apply
人工確認後才執行DriftRemediator 負責確定性修復
"""
report = _recent_reports.get(report_id)
if not report:
raise HTTPException(status_code=404, detail=f"Report {report_id} not found")
remediator = get_drift_remediator()
result = await remediator.rollback(report)
return result
@router.post("/reports/{report_id}/adopt", summary="承認變更並更新 Git")
async def adopt_drift(report_id: str) -> dict:
"""
承認 K8s 漂移,更新 Git 使其與實際狀態一致
人工確認後才執行git commit + push gitea main
"""
report = _recent_reports.get(report_id)
if not report:
raise HTTPException(status_code=404, detail=f"Report {report_id} not found")
remediator = get_drift_remediator()
result = await remediator.adopt(report)
return result
# =============================================================================
# Internal endpoint供 K8s CronJob 呼叫)
# =============================================================================
@router.post("/internal/scan", include_in_schema=False, summary="CronJob 觸發掃描")
async def internal_scan(background_tasks: BackgroundTasks) -> dict:
"""內部 CronJob 端點,每小時自動掃描 awoooi-prod"""
from src.core.config import get_settings
settings = get_settings()
namespaces = getattr(settings, "DRIFT_SCAN_NAMESPACES", "awoooi-prod").split(",")
background_tasks.add_task(
_run_full_scan,
[ns.strip() for ns in namespaces],
)
return {"status": "scan_triggered", "namespaces": namespaces}
# =============================================================================
# Background helpers
# =============================================================================
async def _analyze_and_notify(report: DriftReport) -> None:
"""背景Nemotron 意圖分析 + Telegram 推送"""
try:
interpreter = get_drift_interpreter()
analyzer = get_drift_analyzer()
interpretation = await interpreter.analyze(report)
updated = report.model_copy(update={"interpretation": interpretation})
_recent_reports[report.report_id] = updated
diff_summary = analyzer.format_diff_summary(report)
intent_label = {
"emergency_hotfix": "🚨 緊急 Hotfix",
"human_error": "⚠️ 人為誤操作",
"automated_change": "🤖 系統自動變更",
"unknown": "❓ 意圖不明",
}.get(interpretation.intent.value, "❓ 意圖不明")
try:
from src.services.telegram_gateway import get_telegram_gateway
tg = get_telegram_gateway()
await tg.send_text(
f"🔍 <b>Config Drift 偵測</b>\n"
f"Namespace: {report.namespace}\n"
f"嚴重度: HIGH×{report.high_count} MEDIUM×{report.medium_count}\n\n"
f"<b>意圖分析</b>: {intent_label}\n"
f"{interpretation.explanation}\n"
f"信心: {interpretation.confidence:.0%}\n\n"
f"<b>漂移詳情</b>:\n{diff_summary}\n\n"
f"Report ID: <code>{report.report_id}</code>\n"
f"POST /api/v1/drift/reports/{report.report_id}/rollback — 覆蓋回 Git\n"
f"POST /api/v1/drift/reports/{report.report_id}/adopt — 承認變更"
)
except Exception as e:
import structlog
structlog.get_logger(__name__).warning("drift_telegram_failed", error=str(e))
except Exception as e:
import structlog
structlog.get_logger(__name__).error("drift_analyze_notify_failed", error=str(e))
async def _run_full_scan(namespaces: list[str]) -> None:
"""背景:完整漂移掃描"""
detector = get_drift_detector()
analyzer = get_drift_analyzer()
for namespace in namespaces:
try:
raw = await detector.scan(namespace, triggered_by="cron")
classified = analyzer.classify(raw)
_recent_reports[classified.report_id] = classified
if analyzer.needs_alert(classified):
await _analyze_and_notify(classified)
except Exception as e:
import structlog
structlog.get_logger(__name__).error(
"full_scan_namespace_failed", namespace=namespace, error=str(e)
)

View File

@@ -84,6 +84,15 @@ class Settings(BaseSettings):
default=True,
description="Phase 22: True=異步更新 (先推 OpenClaw), False=同步等待",
)
# 2026-04-04 ogt: Phase 25 P0 — DIAGNOSE Privacy-First 專用 timeout
NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS: int = Field(
default=30,
description="Phase 25 P0: DIAGNOSE 任務 Nemotron timeout (秒),比 Tool Calling 短",
)
OLLAMA_DIAGNOSE_TIMEOUT_SECONDS: int = Field(
default=60,
description="Phase 25 P0: DIAGNOSE 任務 Ollama backup timeout (秒)Ollama 較慢",
)
# ==========================================================================
# CORS - 嚴格白名單 (無 UAT, 無 wildcard)

View File

@@ -530,6 +530,12 @@ class KnowledgeEntryRecord(Base):
nullable=True,
comment="關聯 Playbook Redis Key",
)
# 2026-04-04 ogt: Phase 25 P1 — Anti-Pattern 閉環攔截用症狀 hash (SymptomPattern.compute_hash())
symptoms_hash: Mapped[str | None] = mapped_column(
String(16),
nullable=True,
comment="症狀模式 hash (16字元 SHA256 前綴)Anti-Pattern 閉環攔截使用",
)
# Metrics
view_count: Mapped[int] = mapped_column(
@@ -556,4 +562,6 @@ class KnowledgeEntryRecord(Base):
Index("ix_knowledge_category", "category"),
Index("ix_knowledge_status", "status"),
Index("ix_knowledge_created_at", "created_at"),
# 2026-04-04 ogt: Phase 25 P1 — Anti-Pattern 快速查詢
Index("ix_knowledge_symptoms_hash", "symptoms_hash"),
)

View File

@@ -57,6 +57,7 @@ from src.api.v1 import (
from src.api.v1 import (
signoz_webhook as signoz_webhook_v1, # Phase 21: SignOz → Telegram (ADR-037)
)
from src.api.v1 import drift as drift_v1 # Phase 25 P2: Config Drift Detection
from src.api.v1 import monitoring as monitoring_v1 # 2026-04-03: 監控工具狀態
from src.api.v1 import stats as stats_v1 # Phase 6.5: Statistics Analytics
from src.api.v1 import telegram as telegram_v1 # Phase 5.4: Telegram Gateway
@@ -422,6 +423,9 @@ app.include_router(
app.include_router(
auto_repair_v1.router, prefix="/api/v1", tags=["Auto Repair"]
) # #8: 自動升級決策
app.include_router(
drift_v1.router, prefix="/api/v1", tags=["Drift Detection"]
) # Phase 25 P2: Config Drift Detection
app.include_router(
errors_v1.router, prefix="/api/v1", tags=["Errors"]
) # #40: Sentry 錯誤 BFF API

View File

@@ -0,0 +1,155 @@
"""
Config Drift Detection Models - Phase 25 P2
============================================
GitOps 守門員:偵測 K8s 實際狀態 vs Git YAML 的漂移
設計原則:
- DriftDetector: 只比對,輸出結構化 Diff不判斷嚴重性
- DriftAnalyzer: 白名單過濾、DriftLevel 分級,不解釋意圖
- NemotronDriftInterpreter: 意圖分析(不生成修復指令)
- DriftRemediator: 確定性修復kubectl apply / git push不使用 AI 判斷
版本: v1.0
建立: 2026-04-04 (台北時區)
建立者: ogt (首席架構師設計) + Claude Code (實作)
關聯設計: docs/superpowers/specs/2026-04-04-nemotron-active-defense-design.md 方向三
關聯 ADR: 待起草 ADR-057
"""
from __future__ import annotations
from datetime import datetime
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
from src.utils.timezone import now_taipei
# =============================================================================
# Enums
# =============================================================================
class DriftLevel(str, Enum):
"""漂移嚴重度分級"""
INFO = "info" # 白名單欄位replicas, resources→ 靜默記錄
MEDIUM = "medium" # 非關鍵欄位 → Telegram 通知,無需緊急處理
HIGH = "high" # 關鍵欄位image, env, ports→ 立即通知,需確認
class DriftIntent(str, Enum):
"""Nemotron 意圖分析結果"""
EMERGENCY_HOTFIX = "emergency_hotfix" # 繞過 CI 的緊急修補
HUMAN_ERROR = "human_error" # 誤操作
AUTOMATED_CHANGE = "automated_change" # 系統自動變更HPA 等)
UNKNOWN = "unknown" # 無法判斷
class DriftStatus(str, Enum):
"""漂移報告處理狀態"""
PENDING = "pending" # 待處理
ACKNOWLEDGED = "acknowledged" # 已知悉(不需要處理)
ROLLED_BACK = "rolled_back" # 已覆蓋回 Git 狀態
ADOPTED = "adopted" # 已承認Git 已更新)
IGNORED = "ignored" # 白名單忽略
# =============================================================================
# Core Models
# =============================================================================
class DriftItem(BaseModel):
"""單一欄位的漂移記錄"""
resource_kind: str = Field(..., description="K8s 資源類型Deployment, Service 等)")
resource_name: str = Field(..., description="K8s 資源名稱")
namespace: str = Field(..., description="K8s namespace")
field_path: str = Field(..., description="欄位路徑(如 spec.template.spec.containers[0].image")
git_value: Any = Field(None, description="Git YAML 中的值")
actual_value: Any = Field(None, description="K8s 中的實際值")
drift_level: DriftLevel = DriftLevel.MEDIUM
is_allowlisted: bool = False # 是否為白名單欄位(靜默記錄)
class DriftInterpretation(BaseModel):
"""Nemotron 意圖分析結果"""
intent: DriftIntent = DriftIntent.UNKNOWN
explanation: str = Field("", description="Nemotron 的意圖說明")
risk: str = Field("MEDIUM", description="風險等級HIGH/MEDIUM/LOW")
confidence: float = Field(0.0, ge=0.0, le=1.0, description="分析信心分數")
class DriftReport(BaseModel):
"""單次漂移掃描的完整報告"""
report_id: str = Field(..., description="報告 ID")
scanned_at: datetime = Field(default_factory=now_taipei)
namespace: str = Field(..., description="掃描的 namespace")
# 漂移項目
items: list[DriftItem] = Field(default_factory=list)
high_count: int = 0
medium_count: int = 0
info_count: int = 0
# Nemotron 分析
interpretation: DriftInterpretation | None = None
# 處理狀態
status: DriftStatus = DriftStatus.PENDING
# 觸發來源
triggered_by: str = Field("cron", description="觸發來源cron / webhook / manual")
# 時間軸
created_at: datetime = Field(default_factory=now_taipei)
resolved_at: datetime | None = None
@property
def has_critical_drift(self) -> bool:
"""是否有需要立即處理的高嚴重度漂移"""
return self.high_count > 0
@property
def summary(self) -> str:
"""單行摘要"""
parts = []
if self.high_count:
parts.append(f"HIGH×{self.high_count}")
if self.medium_count:
parts.append(f"MEDIUM×{self.medium_count}")
if self.info_count:
parts.append(f"INFO×{self.info_count}")
return ", ".join(parts) if parts else "無漂移"
# =============================================================================
# API Request / Response
# =============================================================================
class DriftScanRequest(BaseModel):
"""觸發漂移掃描 Request"""
namespaces: list[str] = Field(
default=["awoooi-prod"],
description="要掃描的 namespace 列表",
)
triggered_by: str = Field(default="api", description="觸發來源")
class DriftScanResponse(BaseModel):
"""漂移掃描結果回應"""
report_id: str
summary: str
high_count: int
medium_count: int
info_count: int
has_critical_drift: bool
interpretation: DriftInterpretation | None = None
class DriftListResponse(BaseModel):
"""漂移報告列表回應"""
items: list[DriftReport]
total: int

View File

@@ -33,6 +33,9 @@ class EntryType(str, Enum):
RUNBOOK = "runbook" # 手動建立的操作手冊
BEST_PRACTICE = "best_practice" # 最佳實踐文章
POSTMORTEM = "postmortem" # 事後分析報告
# 2026-04-04 ogt: Phase 25 P1 — Knowledge Auto-Harvesting 新增類型
AUTO_RUNBOOK = "auto_runbook" # Nemotron 自動生成的 RunbookDRAFT 待人工審核)
ANTI_PATTERN = "anti_pattern" # 修復失敗案例(直接 PUBLISHED阻斷後續重蹈覆轍
class EntrySource(str, Enum):
@@ -47,6 +50,8 @@ class EntryStatus(str, Enum):
REVIEW = "review" # 審核中
APPROVED = "approved" # 已批准
ARCHIVED = "archived" # 已封存
# 2026-04-04 Claude Code: Phase 25 P1 — ANTI_PATTERN 直接發布,無需審核
PUBLISHED = "published" # 已發布ANTI_PATTERN 用,無需人工審核)
# =============================================================================
@@ -61,8 +66,11 @@ class KnowledgeEntryCreate(BaseModel):
category: str = Field(..., min_length=1, max_length=100)
tags: list[str] = Field(default_factory=list)
source: EntrySource = EntrySource.HUMAN
status: EntryStatus = EntryStatus.DRAFT
related_incident_id: str | None = None
related_playbook_id: str | None = None
# 2026-04-04 ogt: Phase 25 P1 — Anti-Pattern 閉環用症狀 hash
symptoms_hash: str | None = None
created_by: str | None = None
@@ -88,6 +96,8 @@ class KnowledgeEntry(BaseModel):
status: EntryStatus = EntryStatus.DRAFT
related_incident_id: str | None = None
related_playbook_id: str | None = None
# 2026-04-04 ogt: Phase 25 P1 — Anti-Pattern 閉環攔截用的症狀 hashSymptomPattern.compute_hash()
symptoms_hash: str | None = None
view_count: int = 0
created_by: str | None = None
created_at: datetime = Field(default_factory=now_taipei)

View File

@@ -97,6 +97,21 @@ class SymptomPattern(BaseModel):
model_config = ConfigDict(extra="ignore")
def compute_hash(self) -> str:
"""
2026-04-04 Claude Code: Phase 25 P1 — Anti-Pattern 閉環攔截用
確定性 hashalert_names + affected_services + label_patterns
目的O(1) 精確比對,避免純語意搜尋的模糊性
"""
import hashlib
import json
key = (
"|".join(sorted(self.alert_names)) + "||"
+ "|".join(sorted(self.affected_services)) + "||"
+ json.dumps(self.label_patterns, sort_keys=True)
)
return hashlib.sha256(key.encode()).hexdigest()[:16]
class RepairStep(BaseModel):
"""

View File

@@ -45,8 +45,12 @@ class KnowledgeDBRepository:
category=data.category,
tags=data.tags,
source=data.source,
# 2026-04-04 ogt: Phase 25 P1 — 支援指定 statusANTI_PATTERN 直接 PUBLISHED
status=data.status,
related_incident_id=data.related_incident_id,
related_playbook_id=data.related_playbook_id,
# 2026-04-04 ogt: Phase 25 P1 — Anti-Pattern 閉環用症狀 hash
symptoms_hash=data.symptoms_hash,
created_by=data.created_by,
)
self.db.add(record)
@@ -268,6 +272,7 @@ class KnowledgeDBRepository:
status=record.status,
related_incident_id=record.related_incident_id,
related_playbook_id=record.related_playbook_id,
symptoms_hash=getattr(record, "symptoms_hash", None),
view_count=record.view_count,
created_by=record.created_by,
created_at=record.created_at,

View File

@@ -160,7 +160,13 @@ class NemotronProvider:
"""
try:
timeout = getattr(settings, "NEMOTRON_TIMEOUT_SECONDS", 30)
# 2026-04-04 ogt: Phase 25 P0 — 根據 task_type 選擇 timeout
# DIAGNOSE 用較短 timeout30s避免拖累整體 AutoRepair 流程
task_type = context.get("task_type", "")
if task_type == "diagnose":
timeout = getattr(settings, "NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS", 30)
else:
timeout = getattr(settings, "NEMOTRON_TIMEOUT_SECONDS", 45)
nvidia = self._get_nvidia()
result = await asyncio.wait_for(

View File

@@ -30,6 +30,7 @@ AI Router - Phase 13.3 #87
| v2.0 | 2026-03-26 | Claude Code | 支援 IntentResult + 新意圖類型 |
| v3.0 | 2026-03-26 | Claude Code | Phase 13.3 #87 完整路由決策矩陣 |
| v4.0 | 2026-04-02 | ogt (首席架構師) | Phase 24 AIProvider Registry + Executor; C1 Langfuse Trace; C2 AIRouter.route(); C3 型別 typo; I4 Protocol close |
| v4.1 | 2026-04-04 | ogt (首席架構師) | Phase 25 P0: DIAGNOSE Privacy-First — _local_fallback_chain; DIAGNOSE→NEMOTRON; REJECT+Telegram |
"""
from __future__ import annotations
@@ -246,13 +247,22 @@ class AIRouter:
(AIProviderEnum.CLAUDE, self._claude_default),
]
# 2026-04-04 ogt: Phase 25 P0 — DIAGNOSE/FORCE_LOCAL 專用鏈
# 隱私邊界:絕不包含任何雲端 Provider到 OLLAMA 為止
self._local_fallback_chain: list[tuple[AIProviderEnum, str]] = [
(AIProviderEnum.NEMOTRON, self._nemotron_default), # NIM 188主力零費用高能力
(AIProviderEnum.OLLAMA, self._ollama_summary), # Ollama 188備援慢但可靠
]
# 意圖對應 Provider 強制覆寫 (None = 依複雜度決定)
self._intent_provider_overrides: dict[IntentType, AIProviderEnum | None] = {
# 四大核心意圖
IntentType.RESTART: None, # 依複雜度
IntentType.SCALE: None, # 依複雜度
IntentType.CONFIG: None, # 依複雜度 (但 HIGH 會升級)
IntentType.DIAGNOSE: AIProviderEnum.OLLAMA, # 診斷優先本地 (隱私)
# 2026-04-04 ogt: Phase 25 P0 — DIAGNOSE 改為 NEMOTRON (NIM 188)
# 原因: 零費用本地 NIM + 高能力; 搭配 _local_fallback_chain 保證不觸碰雲端
IntentType.DIAGNOSE: AIProviderEnum.NEMOTRON, # 診斷優先 NIM 本地 (隱私)
# 輔助意圖
IntentType.DELETE: AIProviderEnum.CLAUDE, # CRITICAL → 強制 Claude
IntentType.ROLLBACK: None, # 依複雜度
@@ -308,7 +318,11 @@ class AIRouter:
)
# Step 4: 建立 Fallback 鏈
fallback_chain = self._build_fallback_chain(provider)
# 2026-04-04 ogt: Phase 25 P0 — DIAGNOSE 使用 local-only 鏈(隱私邊界)
if intent == IntentType.DIAGNOSE:
fallback_chain = [fc for fc in self._local_fallback_chain if fc[0] != provider]
else:
fallback_chain = self._build_fallback_chain(provider)
# Step 5: 計算延遲預算
latency_budget = PROVIDER_LATENCY_BUDGET.get(provider, 30000)
@@ -398,10 +412,11 @@ class AIRouter:
provider_override = self._intent_provider_overrides.get(intent)
if provider_override is not None:
provider = provider_override
# 2026-04-03 ogt: DIAGNOSE/ALERT_TRIAGE summary model (llama3.2:3b)
# 2026-04-03 ogt: ALERT_TRIAGE/QUERY 用 Ollama summary model (llama3.2:3b)
# 避免 qwen2.5:7b-instruct 90秒 timeout 導致全鏈路失敗 (Phase 24 A選項)
# 2026-04-04 ogt: DIAGNOSE 已改為 NEMOTRON不走這條分支
if provider == AIProviderEnum.OLLAMA and intent in (
IntentType.DIAGNOSE, IntentType.ALERT_TRIAGE, IntentType.QUERY
IntentType.ALERT_TRIAGE, IntentType.QUERY
):
model = self._ollama_summary
else:
@@ -951,6 +966,29 @@ class AIRouterExecutor:
_lf_trace_ctx.__exit__(None, None, None)
except Exception:
pass
# 2026-04-04 ogt: Phase 25 P0 — require_local 全部失敗時 Telegram 通知(隱私邊界)
if require_local:
try:
from src.services.telegram_gateway import get_telegram_gateway
tg = get_telegram_gateway()
import asyncio as _asyncio
_asyncio.create_task(
tg.send_text(
"⚠️ <b>DIAGNOSE 本地 Provider 不可用</b>\n"
f"已嘗試: {', '.join(provider_order)}\n"
"需要人工介入,雲端 Provider 不會被呼叫(隱私邊界)。"
)
)
except Exception as _tg_e:
logger.warning("diagnose_reject_telegram_failed", error=str(_tg_e))
return AIResult(
raw_response="",
success=False,
provider="none",
error="local_providers_unavailable",
)
return AIResult(
raw_response="",
success=False,

View File

@@ -143,6 +143,9 @@ class AutoRepairService:
# 2026-04-01 ogt: 注入 cooldown_checker 支援測試隔離 (DI 原則)
self._playbook_service = playbook_service or get_playbook_service()
self._cooldown_checker = cooldown_checker or check_global_repair_cooldown
# 2026-04-04 Claude Code: Phase 25 P1 — 持有 runbook_generator task 引用,防 GC 回收
import asyncio
self._pending_tasks: set[asyncio.Task] = set()
async def evaluate_auto_repair(
self,
@@ -196,6 +199,33 @@ class AutoRepairService:
# 2. 提取症狀模式
symptoms = self._extract_symptoms(incident)
# 2.1 2026-04-04 Claude Code: Phase 25 P1 — Anti-Pattern 閘門
# 根據確定性 hash 比對近 7 天失敗案例,避免 AI 在同一個坑重複摔倒
try:
from src.services.knowledge_service import get_knowledge_service
symptoms_hash = symptoms.compute_hash()
anti_patterns = await get_knowledge_service().check_anti_pattern(
symptoms_hash, days=7
)
if anti_patterns:
ap = anti_patterns[0]
logger.warning(
"auto_repair_blocked_anti_pattern",
incident_id=incident.incident_id,
symptoms_hash=symptoms_hash,
anti_pattern_id=ap.id,
anti_pattern_title=ap.title,
)
return AutoRepairDecision(
can_auto_repair=False,
reason=f"過去 7 天有失敗案例: {ap.title}",
blocked_by="ANTI_PATTERN",
)
except Exception as _ap_e:
# Anti-Pattern 閘門失敗不阻塞主流程(僅記錄)
logger.warning("anti_pattern_gate_error", error=str(_ap_e))
symptoms_hash = ""
# 3. 找匹配的 Playbook
recommendations = await self._playbook_service.get_recommendations(
symptoms=symptoms,
@@ -324,7 +354,7 @@ class AutoRepairService:
execution_time_ms=execution_time,
)
return AutoRepairResult(
repair_result = AutoRepairResult(
success=True,
playbook_id=playbook.playbook_id,
incident_id=incident.incident_id,
@@ -332,6 +362,25 @@ class AutoRepairService:
execution_time_ms=execution_time,
)
# 2026-04-04 Claude Code: Phase 25 P1 — 成功修復後 fire-and-forget 生成 AUTO_RUNBOOK
try:
from src.services.runbook_generator import get_runbook_generator
symptoms = self._extract_symptoms(incident)
symptoms_hash = symptoms.compute_hash()
gen = get_runbook_generator()
import asyncio as _asyncio
task = _asyncio.create_task(
gen.generate_runbook(incident, playbook, repair_result, symptoms_hash)
)
self._pending_tasks.add(task) if hasattr(self, "_pending_tasks") else None
task.add_done_callback(
lambda t: self._pending_tasks.discard(t) if hasattr(self, "_pending_tasks") else None
)
except Exception as _rg_e:
logger.warning("runbook_generator_task_failed", error=str(_rg_e))
return repair_result
except Exception as e:
# 更新失敗統計
await self._playbook_service.record_execution(
@@ -348,7 +397,7 @@ class AutoRepairService:
error=str(e),
)
return AutoRepairResult(
fail_result = AutoRepairResult(
success=False,
playbook_id=playbook.playbook_id,
incident_id=incident.incident_id,
@@ -357,6 +406,21 @@ class AutoRepairService:
execution_time_ms=execution_time,
)
# 2026-04-04 Claude Code: Phase 25 P1 — 失敗修復後 fire-and-forget 生成 ANTI_PATTERN
try:
from src.services.runbook_generator import get_runbook_generator
symptoms = self._extract_symptoms(incident)
symptoms_hash = symptoms.compute_hash()
gen = get_runbook_generator()
import asyncio as _asyncio
_asyncio.create_task(
gen.generate_anti_pattern(incident, playbook, fail_result, symptoms_hash)
)
except Exception as _ap_e:
logger.warning("anti_pattern_task_failed", error=str(_ap_e))
return fail_result
# === Private Helpers ===
def _extract_symptoms(self, incident: Incident) -> SymptomPattern:

View File

@@ -0,0 +1,106 @@
"""
Drift Analyzer - Phase 25 P2 Config Drift Detection
=====================================================
職責白名單過濾、DriftLevel 分級
不解釋意圖,不生成修復指令
版本: v1.0
建立: 2026-04-04 (台北時區)
建立者: ogt (首席架構師設計) + Claude Code (實作)
"""
from __future__ import annotations
import structlog
from src.models.drift import DriftItem, DriftLevel, DriftReport, DriftStatus
logger = structlog.get_logger(__name__)
class DriftAnalyzer:
"""
分析 DriftReport決定哪些漂移需要告警、哪些靜默記錄
職責邊界:只分級,不解釋意圖,不生成修復指令
"""
def classify(self, report: DriftReport) -> DriftReport:
"""
根據 DriftLevel 分類漂移項目,更新計數
- INFO白名單→ 靜默記錄status 保持 PENDING
- MEDIUM → 需通知,但非緊急
- HIGH → 立即告警
Returns:
更新後的 DriftReportimmutable-friendly回傳新 report
"""
high_count = 0
medium_count = 0
info_count = 0
for item in report.items:
if item.drift_level == DriftLevel.HIGH:
high_count += 1
elif item.drift_level == DriftLevel.MEDIUM:
medium_count += 1
else:
info_count += 1
# 若只有 INFO 漂移,直接標記為 IGNORED不需人工處理
status = report.status
if high_count == 0 and medium_count == 0 and info_count > 0:
status = DriftStatus.IGNORED
logger.info(
"drift_all_allowlisted",
report_id=report.report_id,
info_count=info_count,
)
elif high_count == 0 and medium_count == 0:
status = DriftStatus.IGNORED
return report.model_copy(update={
"high_count": high_count,
"medium_count": medium_count,
"info_count": info_count,
"status": status,
})
def needs_alert(self, report: DriftReport) -> bool:
"""是否需要 Telegram 告警"""
return report.high_count > 0 or report.medium_count > 0
def format_diff_summary(self, report: DriftReport) -> str:
"""格式化漂移差異摘要(給 Telegram 用)"""
if not report.items:
return "無漂移"
lines = []
# HIGH 優先顯示
for item in sorted(report.items, key=lambda i: (i.drift_level != DriftLevel.HIGH, i.field_path)):
if item.is_allowlisted:
continue
level_label = "🔴" if item.drift_level == DriftLevel.HIGH else "🟡"
lines.append(
f"{level_label} {item.resource_kind}/{item.resource_name}.{item.field_path}\n"
f" Git: {str(item.git_value)[:60]}\n"
f" K8s: {str(item.actual_value)[:60]}"
)
if len(lines) >= 5: # 最多顯示 5 項,避免訊息過長
remaining = report.high_count + report.medium_count - len(lines)
if remaining > 0:
lines.append(f"... 另有 {remaining} 項漂移")
break
return "\n".join(lines) if lines else f"{report.info_count} 項白名單漂移(已靜默)"
_analyzer: DriftAnalyzer | None = None
def get_drift_analyzer() -> DriftAnalyzer:
global _analyzer
if _analyzer is None:
_analyzer = DriftAnalyzer()
return _analyzer

View File

@@ -0,0 +1,328 @@
"""
Drift Detector - Phase 25 P2 Config Drift Detection
=====================================================
職責:比對 Git YAML vs K8s 實際狀態,輸出結構化 DriftItem 列表
不判斷嚴重性,不解釋意圖,只做事實比對
版本: v1.0
建立: 2026-04-04 (台北時區)
建立者: ogt (首席架構師設計) + Claude Code (實作)
"""
from __future__ import annotations
import asyncio
import subprocess
import uuid
from pathlib import Path
from typing import Any
import structlog
import yaml
from src.models.drift import DriftItem, DriftLevel, DriftReport
logger = structlog.get_logger(__name__)
# 白名單欄位(靜默記錄,不告警)
_DEFAULT_ALLOWLIST_FIELDS = frozenset([
"spec.replicas",
"spec.template.spec.containers[*].resources.requests",
"spec.template.spec.containers[*].resources.limits",
"metadata.annotations",
"metadata.labels.pod-template-hash",
"metadata.resourceVersion",
"metadata.generation",
"metadata.uid",
"status",
])
# 關鍵欄位(必須立即告警)
_DEFAULT_CRITICAL_FIELDS = frozenset([
"spec.template.spec.containers[*].image",
"spec.template.spec.containers[*].env",
"spec.template.spec.containers[*].ports",
"spec.template.spec.volumes",
"spec.template.spec.serviceAccountName",
])
class GitStateReader:
"""從 Git HEAD 讀取 K8s YAML 狀態"""
def __init__(self, k8s_dir: str = "k8s"):
self._k8s_dir = Path(k8s_dir)
async def read(self, namespace: str) -> dict[str, Any]:
"""
讀取 Git HEAD 中指定 namespace 的所有 K8s YAML
Returns:
{resource_key: parsed_yaml_dict}
resource_key 格式: "{kind}/{name}"
"""
try:
result = await asyncio.get_event_loop().run_in_executor(
None, self._read_sync, namespace
)
return result
except Exception as e:
logger.warning("git_state_read_failed", namespace=namespace, error=str(e))
return {}
def _read_sync(self, namespace: str) -> dict[str, Any]:
resources: dict[str, Any] = {}
if not self._k8s_dir.exists():
logger.warning("k8s_dir_not_found", path=str(self._k8s_dir))
return resources
for yaml_file in self._k8s_dir.rglob("*.yaml"):
try:
with open(yaml_file) as f:
docs = list(yaml.safe_load_all(f))
for doc in docs:
if not doc or not isinstance(doc, dict):
continue
metadata = doc.get("metadata", {})
ns = metadata.get("namespace", "")
if ns and ns != namespace:
continue
kind = doc.get("kind", "")
name = metadata.get("name", "")
if kind and name:
key = f"{kind}/{name}"
resources[key] = doc
except Exception as e:
logger.debug("yaml_parse_failed", file=str(yaml_file), error=str(e))
return resources
class K8sStateReader:
"""從 kubectl 讀取 K8s 實際狀態"""
async def read(self, namespace: str) -> dict[str, Any]:
"""
透過 kubectl 取得指定 namespace 的實際狀態
Returns:
{resource_key: actual_resource_dict}
"""
try:
result = await asyncio.get_event_loop().run_in_executor(
None, self._read_sync, namespace
)
return result
except Exception as e:
logger.warning("k8s_state_read_failed", namespace=namespace, error=str(e))
return {}
def _read_sync(self, namespace: str) -> dict[str, Any]:
resources: dict[str, Any] = {}
resource_types = ["deployment", "service", "configmap", "ingress"]
for rtype in resource_types:
try:
proc = subprocess.run(
["kubectl", "get", rtype, "-n", namespace, "-o", "yaml"],
capture_output=True,
text=True,
timeout=30,
)
if proc.returncode != 0:
logger.debug("kubectl_failed", type=rtype, stderr=proc.stderr[:200])
continue
data = yaml.safe_load(proc.stdout)
if not data or data.get("kind") != "List":
continue
for item in data.get("items", []):
kind = item.get("kind", rtype.capitalize())
name = item.get("metadata", {}).get("name", "")
if name:
key = f"{kind}/{name}"
resources[key] = item
except subprocess.TimeoutExpired:
logger.warning("kubectl_timeout", type=rtype, namespace=namespace)
except Exception as e:
logger.warning("kubectl_error", type=rtype, error=str(e))
return resources
class DriftDetector:
"""
比對 Git vs K8s 實際狀態,輸出 DriftItem 列表
職責邊界:只做事實比對,不判斷嚴重性,不解釋意圖
"""
def __init__(
self,
k8s_dir: str = "k8s",
allowlist_fields: frozenset | None = None,
critical_fields: frozenset | None = None,
):
self._git_reader = GitStateReader(k8s_dir)
self._k8s_reader = K8sStateReader()
self._allowlist = allowlist_fields or _DEFAULT_ALLOWLIST_FIELDS
self._critical_fields = critical_fields or _DEFAULT_CRITICAL_FIELDS
async def scan(self, namespace: str, triggered_by: str = "cron") -> DriftReport:
"""
掃描指定 namespace 的漂移
Args:
namespace: K8s namespace
triggered_by: 觸發來源cron / webhook / api
Returns:
DriftReport含 DriftItem 列表,尚未分析 intent
"""
report_id = str(uuid.uuid4())[:8]
logger.info("drift_scan_start", namespace=namespace, report_id=report_id)
git_state, k8s_state = await asyncio.gather(
self._git_reader.read(namespace),
self._k8s_reader.read(namespace),
)
items: list[DriftItem] = []
# 比對 Git 中有的資源
for resource_key, git_resource in git_state.items():
actual_resource = k8s_state.get(resource_key)
if actual_resource is None:
# 資源在 Git 中存在但 K8s 中不存在(可能尚未部署)
logger.debug("resource_missing_in_k8s", resource=resource_key)
continue
kind, name = resource_key.split("/", 1)
diffs = self._diff_resources(git_resource, actual_resource, kind, name, namespace)
items.extend(diffs)
high_count = sum(1 for i in items if i.drift_level == DriftLevel.HIGH)
medium_count = sum(1 for i in items if i.drift_level == DriftLevel.MEDIUM)
info_count = sum(1 for i in items if i.drift_level == DriftLevel.INFO)
logger.info(
"drift_scan_done",
namespace=namespace,
report_id=report_id,
high=high_count,
medium=medium_count,
info=info_count,
)
return DriftReport(
report_id=report_id,
namespace=namespace,
items=items,
high_count=high_count,
medium_count=medium_count,
info_count=info_count,
triggered_by=triggered_by,
)
def _diff_resources(
self,
git_res: dict,
actual_res: dict,
kind: str,
name: str,
namespace: str,
) -> list[DriftItem]:
"""逐欄位比對兩個資源,回傳差異列表"""
items: list[DriftItem] = []
# 只比對 spec 層metadata 的動態欄位太多)
git_spec = git_res.get("spec", {})
actual_spec = actual_res.get("spec", {})
diffs = self._flatten_diff("spec", git_spec, actual_spec)
for field_path, (git_val, actual_val) in diffs.items():
is_allowlisted = self._is_allowlisted(field_path)
if is_allowlisted:
level = DriftLevel.INFO
elif self._is_critical(field_path):
level = DriftLevel.HIGH
else:
level = DriftLevel.MEDIUM
items.append(DriftItem(
resource_kind=kind,
resource_name=name,
namespace=namespace,
field_path=field_path,
git_value=git_val,
actual_value=actual_val,
drift_level=level,
is_allowlisted=is_allowlisted,
))
return items
def _flatten_diff(
self,
prefix: str,
git_dict: Any,
actual_dict: Any,
) -> dict[str, tuple[Any, Any]]:
"""遞迴展開並比對兩個 dict回傳 {field_path: (git_val, actual_val)}"""
diffs: dict[str, tuple[Any, Any]] = {}
if not isinstance(git_dict, dict) or not isinstance(actual_dict, dict):
if git_dict != actual_dict:
diffs[prefix] = (git_dict, actual_dict)
return diffs
all_keys = set(git_dict.keys()) | set(actual_dict.keys())
for key in all_keys:
path = f"{prefix}.{key}"
git_val = git_dict.get(key)
actual_val = actual_dict.get(key)
if git_val == actual_val:
continue
if isinstance(git_val, dict) and isinstance(actual_val, dict):
diffs.update(self._flatten_diff(path, git_val, actual_val))
else:
diffs[path] = (git_val, actual_val)
return diffs
def _is_allowlisted(self, field_path: str) -> bool:
"""判斷欄位是否在白名單(靜默記錄不告警)"""
for pattern in self._allowlist:
# 簡單前綴匹配(*替換為粗略包含)
clean_pattern = pattern.replace("[*]", "")
if field_path.startswith(clean_pattern.replace("*", "")):
return True
return False
def _is_critical(self, field_path: str) -> bool:
"""判斷欄位是否為關鍵欄位HIGH 等級)"""
for pattern in self._critical_fields:
clean_pattern = pattern.replace("[*]", "")
if clean_pattern.replace("*", "") in field_path:
return True
return False
# =============================================================================
# Singleton
# =============================================================================
_detector: DriftDetector | None = None
def get_drift_detector() -> DriftDetector:
global _detector
if _detector is None:
_detector = DriftDetector()
return _detector

View File

@@ -0,0 +1,173 @@
"""
Drift Interpreter - Phase 25 P2 Config Drift Detection
=======================================================
職責Nemotron 意圖分析(不生成修復指令)
只回答「這是人為操作Hotfix系統自動變更
設計邊界(核心原則):
- 只輸出意圖分析,不生成 kubectl 或 git 指令
- 確定性修復由 DriftRemediator 負責
- Nemotron 超時 → UNKNOWN不阻塞主流程
版本: v1.0
建立: 2026-04-04 (台北時區)
建立者: ogt (首席架構師設計) + Claude Code (實作)
"""
from __future__ import annotations
import asyncio
import json
from typing import TYPE_CHECKING
import structlog
from src.models.drift import DriftIntent, DriftInterpretation, DriftItem
if TYPE_CHECKING:
from src.models.drift import DriftReport
logger = structlog.get_logger(__name__)
_INTENT_PROMPT_TEMPLATE = """你是 AWOOOI GitOps 守門員,請分析以下 K8s 配置漂移的意圖。
## 漂移詳情
{diff_summary}
## 任務
判斷這次漂移最可能的原因:
- emergency_hotfix: 繞過 CI 的緊急修補image tag 改變但無對應 Git commit
- human_error: 誤操作(非預期的隨機欄位改變)
- automated_change: 系統自動變更HPA replicas, 系統注入的 annotation 等)
- unknown: 無法判斷
請以 JSON 回應:
{{
"intent": "emergency_hotfix|human_error|automated_change|unknown",
"explanation": "用繁體中文解釋你的判斷理由(一句話)",
"risk": "HIGH|MEDIUM|LOW",
"confidence": 0.0到1.0之間的數字
}}
只輸出 JSON不要任何額外說明。
"""
class NemotronDriftInterpreter:
"""
使用 Nemotron 分析漂移意圖
職責邊界:
✅ 輸出意圖分析
❌ 不生成修復指令
❌ 不直接呼叫 kubectl 或 git
"""
async def analyze(self, report: "DriftReport") -> DriftInterpretation:
"""
分析漂移意圖
Args:
report: 已分類的 DriftReport
Returns:
DriftInterpretation超時或失敗時回傳 UNKNOWN
"""
if not report.items or (report.high_count == 0 and report.medium_count == 0):
return DriftInterpretation(
intent=DriftIntent.UNKNOWN,
explanation="無顯著漂移,不需要意圖分析",
confidence=1.0,
)
diff_text = self._format_diff_for_prompt(report)
prompt = _INTENT_PROMPT_TEMPLATE.format(diff_summary=diff_text)
result = await self._call_nemotron(prompt)
return result
def _format_diff_for_prompt(self, report: "DriftReport") -> str:
"""格式化 diff 給 Nemotron 分析用"""
lines = []
for item in report.items[:10]: # 最多 10 項避免 token 過多
if item.is_allowlisted:
continue
lines.append(
f"- {item.resource_kind}/{item.resource_name}: "
f"{item.field_path} "
f"Git={str(item.git_value)[:40]}"
f"K8s={str(item.actual_value)[:40]}"
)
return "\n".join(lines) if lines else "(均為白名單欄位)"
async def _call_nemotron(self, prompt: str) -> DriftInterpretation:
"""呼叫 Nemotron 進行意圖分析"""
try:
from src.core.config import get_settings
from src.services.nvidia_provider import get_nvidia_provider
settings = get_settings()
nvidia = get_nvidia_provider()
response_text, success, _tokens, _cost = await asyncio.wait_for(
nvidia.chat(prompt=prompt),
timeout=getattr(settings, "NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS", 30),
)
if not success or not response_text:
return self._unknown_result("Nemotron 回傳空值")
return self._parse_response(response_text)
except asyncio.TimeoutError:
logger.warning("drift_nemotron_timeout")
return self._unknown_result("Nemotron 超時")
except Exception as e:
logger.warning("drift_nemotron_error", error=str(e))
return self._unknown_result(str(e))
def _parse_response(self, text: str) -> DriftInterpretation:
"""解析 Nemotron JSON 回應"""
try:
# 嘗試直接解析
data = json.loads(text)
except Exception:
try:
import re
match = re.search(r"```(?:json)?\s*([\s\S]+?)```", text)
if match:
data = json.loads(match.group(1))
else:
return self._unknown_result("無法解析 JSON")
except Exception:
return self._unknown_result("JSON 解析失敗")
try:
intent_str = data.get("intent", "unknown")
intent = DriftIntent(intent_str) if intent_str in DriftIntent._value2member_map_ else DriftIntent.UNKNOWN
return DriftInterpretation(
intent=intent,
explanation=data.get("explanation", ""),
risk=data.get("risk", "MEDIUM"),
confidence=float(data.get("confidence", 0.0)),
)
except Exception as e:
return self._unknown_result(f"模型解析失敗: {e}")
def _unknown_result(self, reason: str) -> DriftInterpretation:
return DriftInterpretation(
intent=DriftIntent.UNKNOWN,
explanation=f"意圖分析失敗:{reason}",
risk="MEDIUM",
confidence=0.0,
)
_interpreter: NemotronDriftInterpreter | None = None
def get_drift_interpreter() -> NemotronDriftInterpreter:
global _interpreter
if _interpreter is None:
_interpreter = NemotronDriftInterpreter()
return _interpreter

View File

@@ -0,0 +1,233 @@
"""
Drift Remediator - Phase 25 P2 Config Drift Detection
======================================================
職責:確定性修復執行
- rollback()kubectl apply -f <git-yaml>(覆蓋回 Git 狀態)
- adopt()git commit + git push gitea main承認變更更新 Git
設計邊界(核心原則):
- 不使用 AI 判斷如何修復
- 只有人工確認按鈕後才執行
- rollback 失敗只通知,不重試(避免重複操作)
版本: v1.0
建立: 2026-04-04 (台北時區)
建立者: ogt (首席架構師設計) + Claude Code (實作)
"""
from __future__ import annotations
import asyncio
import subprocess
from typing import TYPE_CHECKING
import structlog
if TYPE_CHECKING:
from src.models.drift import DriftItem, DriftReport
logger = structlog.get_logger(__name__)
class DriftRemediator:
"""
確定性漂移修復執行器
職責邊界:
✅ kubectl apply覆蓋回 Git 狀態)
✅ git commit + push承認變更
❌ 不使用 AI 決定修復策略
❌ 不自動重試
"""
def __init__(self, k8s_dir: str = "k8s"):
self._k8s_dir = k8s_dir
async def rollback(
self,
report: "DriftReport",
resource_key: str | None = None,
) -> dict:
"""
覆蓋回 Git 狀態kubectl apply
Args:
report: 漂移報告
resource_key: 指定資源Kind/NameNone 表示全部
Returns:
{"success": bool, "message": str}
"""
logger.info(
"drift_rollback_start",
report_id=report.report_id,
resource=resource_key or "all",
)
try:
result = await asyncio.get_event_loop().run_in_executor(
None,
self._kubectl_apply,
report.namespace,
resource_key,
)
if result["success"]:
logger.info(
"drift_rollback_success",
report_id=report.report_id,
namespace=report.namespace,
)
await self._notify_telegram(
f"✅ 漂移已覆蓋回 Git 狀態\n"
f"Namespace: {report.namespace}\n"
f"資源: {resource_key or '全部'}"
)
else:
logger.error(
"drift_rollback_failed",
report_id=report.report_id,
error=result.get("message"),
)
await self._notify_telegram(
f"❌ 漂移覆蓋失敗,需要人工介入\n"
f"Namespace: {report.namespace}\n"
f"錯誤: {result.get('message', '')[:200]}"
)
return result
except Exception as e:
msg = f"rollback 異常: {str(e)}"
logger.error("drift_rollback_exception", error=str(e))
await self._notify_telegram(
f"❌ 漂移覆蓋異常\nNamespace: {report.namespace}\n錯誤: {str(e)[:200]}"
)
return {"success": False, "message": msg}
async def adopt(
self,
report: "DriftReport",
field_description: str = "",
) -> dict:
"""
承認變更git commit + git push gitea main
Args:
report: 漂移報告
field_description: 漂移欄位說明(用於 commit message
Returns:
{"success": bool, "message": str}
"""
logger.info(
"drift_adopt_start",
report_id=report.report_id,
namespace=report.namespace,
)
# 這裡不直接修改 git需要人工決定具體的值
# 而是提示用戶需要在本地執行 git 操作
# 在實際部署場景中,可透過 Gitea API 建立 PR 或直接 push
commit_msg = (
f"chore: adopt drift — {report.namespace} "
f"{field_description or report.summary}"
)
try:
result = await asyncio.get_event_loop().run_in_executor(
None,
self._git_push,
commit_msg,
)
if result["success"]:
logger.info("drift_adopt_success", report_id=report.report_id)
await self._notify_telegram(
f"✅ 漂移已承認Git 已更新\n"
f"Namespace: {report.namespace}\n"
f"Commit: {commit_msg[:80]}"
)
else:
logger.error("drift_adopt_failed", error=result.get("message"))
await self._notify_telegram(
f"❌ Git 更新失敗,需要人工處理\n"
f"錯誤: {result.get('message', '')[:200]}"
)
return result
except Exception as e:
logger.error("drift_adopt_exception", error=str(e))
return {"success": False, "message": str(e)}
# =========================================================================
# Private
# =========================================================================
def _kubectl_apply(self, namespace: str, resource_key: str | None) -> dict:
"""執行 kubectl apply同步"""
try:
cmd = ["kubectl", "apply", "-f", self._k8s_dir, "-n", namespace, "--dry-run=none"]
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=60,
)
if proc.returncode == 0:
return {"success": True, "message": proc.stdout[:500]}
else:
return {"success": False, "message": proc.stderr[:500]}
except subprocess.TimeoutExpired:
return {"success": False, "message": "kubectl apply 超時60s"}
except Exception as e:
return {"success": False, "message": str(e)}
def _git_push(self, commit_msg: str) -> dict:
"""執行 git add + commit + push gitea同步"""
try:
# git add
subprocess.run(["git", "add", "-A"], check=True, timeout=10)
# git commit
subprocess.run(
["git", "commit", "-m", commit_msg],
check=True,
timeout=10,
)
# git push gitea main
proc = subprocess.run(
["git", "push", "gitea", "main"],
capture_output=True,
text=True,
timeout=30,
)
if proc.returncode == 0:
return {"success": True, "message": "已推送至 gitea main"}
else:
return {"success": False, "message": proc.stderr[:500]}
except subprocess.CalledProcessError as e:
return {"success": False, "message": f"git 操作失敗: {e}"}
except subprocess.TimeoutExpired:
return {"success": False, "message": "git push 超時"}
except Exception as e:
return {"success": False, "message": str(e)}
async def _notify_telegram(self, message: str) -> None:
"""推送通知到 Telegram"""
try:
from src.services.telegram_gateway import get_telegram_gateway
tg = get_telegram_gateway()
await tg.send_text(message)
except Exception as e:
logger.warning("drift_remediator_telegram_failed", error=str(e))
_remediator: DriftRemediator | None = None
def get_drift_remediator() -> DriftRemediator:
global _remediator
if _remediator is None:
_remediator = DriftRemediator()
return _remediator

View File

@@ -223,3 +223,56 @@ class KnowledgeService:
logger.info("embed_all_complete", total=len(rows), success=success, failed=failed)
return {"total": len(rows), "success": success, "failed": failed}
async def check_anti_pattern(
self,
symptoms_hash: str,
days: int = 7,
) -> list[KnowledgeEntry]:
"""
2026-04-04 Claude Code: Phase 25 P1 — Anti-Pattern 閉環閘門
根據 symptoms_hash 查找近期失敗案例,供 auto_repair decide() 攔截用
Args:
symptoms_hash: SymptomPattern.compute_hash() 的 16 字元 hash
days: 查找幾天內的記錄(預設 7 天)
Returns:
list[KnowledgeEntry] — ANTI_PATTERN 條目,空表示無已知失敗案例
"""
from datetime import timedelta
from sqlalchemy import text as sa_text
from src.utils.timezone import now_taipei
cutoff = now_taipei() - timedelta(days=days)
async with get_db_context() as db:
result = await db.execute(
sa_text(
"SELECT id FROM knowledge_entries "
"WHERE entry_type = 'anti_pattern' "
"AND symptoms_hash = :hash "
"AND created_at >= :cutoff "
"AND status != 'archived' "
"ORDER BY created_at DESC LIMIT 5"
),
{"hash": symptoms_hash, "cutoff": cutoff},
)
entry_ids = [row.id for row in result.fetchall()]
if not entry_ids:
return []
entries = []
for eid in entry_ids:
entry = await self.get_entry(eid)
if entry:
entries.append(entry)
logger.info(
"anti_pattern_check",
symptoms_hash=symptoms_hash,
days=days,
found=len(entries),
)
return entries

View File

@@ -0,0 +1,343 @@
"""
Runbook Generator - Phase 25 P1 Knowledge Auto-Harvesting
==========================================================
修復後自動生成 Runbook成功或 Anti-Pattern失敗
透過 Nemotron NIM 生成,結果沉澱至 KM 知識庫
設計原則:
- 非阻塞asyncio.create_task() 呼叫,絕不影響 AutoRepair 主流程
- 失敗靜默:生成失敗只記 log不拋例外
- DRAFT/PUBLISHED成功 → DRAFT需人工審核失敗 → PUBLISHED直接封鎖
版本: v1.1
建立: 2026-04-04 (台北時區)
建立者: ogt (首席架構師設計) + Claude Code (實作)
關聯設計: docs/superpowers/specs/2026-04-04-nemotron-active-defense-design.md 方向一
變更紀錄:
| 版本 | 日期 | 執行者 | 變更內容 |
|------|------|--------|----------|
| v1.0 | 2026-04-04 | Claude Code | 初始佔位(使用 generate() 但介面不存在) |
| v1.1 | 2026-04-04 | ogt (首席架構師) | 改用正確的 nvidia.chat() 介面;新增 Minimal fallback |
"""
from __future__ import annotations
import asyncio
import time
from typing import TYPE_CHECKING
import structlog
from src.models.knowledge import EntrySource, EntryStatus, EntryType, KnowledgeEntryCreate
if TYPE_CHECKING:
from src.models.incident import Incident
from src.models.playbook import Playbook
from src.services.auto_repair_service import AutoRepairResult
logger = structlog.get_logger(__name__)
class NemotronRunbookGenerator:
"""
Nemotron 驅動的 Runbook 自動生成器
職責:
- 成功修復 → AUTO_RUNBOOK (DRAFT) + Telegram 審核 card
- 失敗修復 → ANTI_PATTERN (PUBLISHED) + Telegram 通知
leWOOOgo 積木化:
- 呼叫 KnowledgeService不直接存 DB
- 呼叫 NvidiaProvider.chat()(非 AIRouterRunbook 是知識副作用)
"""
_RUNBOOK_SYSTEM = (
"你是 AWOOOI 平台的 SRE Runbook 撰寫專家。"
"根據提供的 Incident 與修復結果,用繁體中文生成完整結構化 Runbook。"
)
_ANTI_PATTERN_SYSTEM = (
"你是 AWOOOI 平台的故障分析專家。"
"根據失敗的修復嘗試,用繁體中文生成失敗案例記錄,幫助未來避免重蹈覆轍。"
)
async def generate_runbook(
self,
incident: "Incident",
playbook: "Playbook",
result: "AutoRepairResult",
symptoms_hash: str,
) -> None:
"""
成功修復後生成 AUTO_RUNBOOKfire-and-forget呼叫方不等待
Args:
incident: 觸發的 Incident
playbook: 執行的 Playbook
result: 執行結果success=True
symptoms_hash: SymptomPattern.compute_hash() 的 hash
"""
try:
content = await self._call_nemotron_for_runbook(incident, playbook, result)
if not content:
return
from src.services.knowledge_service import get_knowledge_service
ks = get_knowledge_service()
entry_data = KnowledgeEntryCreate(
title=f"[AUTO] {incident.incident_id}{playbook.name}",
content=content,
entry_type=EntryType.AUTO_RUNBOOK,
category="auto_generated",
tags=list(incident.affected_services or []) + ["auto_runbook", "nemotron"],
source=EntrySource.AI_EXTRACTED,
status=EntryStatus.DRAFT,
related_incident_id=incident.incident_id,
related_playbook_id=playbook.playbook_id,
symptoms_hash=symptoms_hash,
created_by="nemotron_runbook_generator",
)
entry = await ks.create_entry(entry_data)
logger.info(
"auto_runbook_created",
incident_id=incident.incident_id,
entry_id=entry.id,
playbook_id=playbook.playbook_id,
)
await self._push_runbook_review_card(incident, entry.id, content[:200])
except Exception as e:
logger.error(
"runbook_generation_failed",
incident_id=incident.incident_id,
error=str(e),
)
async def generate_anti_pattern(
self,
incident: "Incident",
playbook: "Playbook",
result: "AutoRepairResult",
symptoms_hash: str,
) -> None:
"""
失敗修復後生成 ANTI_PATTERNfire-and-forget直接 PUBLISHED
Args:
incident: 觸發的 Incident
playbook: 嘗試執行的 Playbook
result: 執行結果success=False
symptoms_hash: SymptomPattern.compute_hash() 的 hash
"""
try:
content = await self._call_nemotron_for_anti_pattern(incident, playbook, result)
if not content:
return
from src.services.knowledge_service import get_knowledge_service
ks = get_knowledge_service()
title = f"[FAIL] {incident.incident_id}{playbook.name}"
entry_data = KnowledgeEntryCreate(
title=title,
content=content,
entry_type=EntryType.ANTI_PATTERN,
category="failure_cases",
tags=list(incident.affected_services or []) + ["anti_pattern", "failure"],
source=EntrySource.AI_EXTRACTED,
status=EntryStatus.PUBLISHED, # 直接發布,無需審核
related_incident_id=incident.incident_id,
related_playbook_id=playbook.playbook_id,
symptoms_hash=symptoms_hash,
created_by="nemotron_runbook_generator",
)
entry = await ks.create_entry(entry_data)
logger.info(
"anti_pattern_created",
incident_id=incident.incident_id,
entry_id=entry.id,
symptoms_hash=symptoms_hash,
)
await self._push_anti_pattern_notification(incident, title)
except Exception as e:
logger.error(
"anti_pattern_generation_failed",
incident_id=incident.incident_id,
error=str(e),
)
# =========================================================================
# Private
# =========================================================================
async def _call_nemotron_for_runbook(
self,
incident: "Incident",
playbook: "Playbook",
result: "AutoRepairResult",
) -> str:
"""呼叫 Nemotron chat() 生成 9 段 Runbook回傳 Markdown 字串"""
from src.core.config import get_settings
from src.services.nvidia_provider import get_nvidia_provider
settings = get_settings()
prompt = (
f"## Incident 資訊\n"
f"- ID: {incident.incident_id}\n"
f"- 受影響服務: {', '.join(incident.affected_services or [])}\n"
f"- 嚴重度: {incident.severity.value if incident.severity else 'unknown'}\n\n"
f"## 執行的 Playbook\n"
f"- 名稱: {playbook.name}\n"
f"- 執行步驟:\n"
+ "\n".join(f" {s}" for s in result.executed_steps[:5])
+ f"\n\n## 執行結果\n- 狀態: 成功,耗時 {result.execution_time_ms}ms\n\n"
"請生成包含以下 9 段的 RunbookMarkdown 格式):\n"
"1. ## 症狀描述\n2. ## 根因分析\n3. ## 執行步驟\n"
"4. ## 驗證步驟\n5. ## 注意事項\n6. ## 影響範圍\n"
"7. ## 相關 Incident\n8. ## 下次預防建議\n9. ## 適用條件"
)
try:
nvidia = get_nvidia_provider()
start = time.time()
# chat() 回傳 (response_text, success, total_tokens, cost_usd)
response_text, success, _tokens, _cost = await asyncio.wait_for(
nvidia.chat(prompt=f"[SYSTEM]{self._RUNBOOK_SYSTEM}\n\n{prompt}"),
timeout=settings.NEMOTRON_TIMEOUT_SECONDS,
)
latency_ms = (time.time() - start) * 1000
logger.info("runbook_nemotron_call_ok", latency_ms=round(latency_ms, 1))
if success and response_text:
return response_text
except Exception as e:
logger.warning("runbook_nemotron_call_failed", error=str(e))
# Fallback組裝基本 Runbook
return self._build_minimal_runbook(incident, playbook, result)
async def _call_nemotron_for_anti_pattern(
self,
incident: "Incident",
playbook: "Playbook",
result: "AutoRepairResult",
) -> str:
"""呼叫 Nemotron chat() 生成失敗案例記錄,回傳 Markdown 字串"""
from src.core.config import get_settings
from src.services.nvidia_provider import get_nvidia_provider
settings = get_settings()
prompt = (
f"## Incident 資訊\n"
f"- ID: {incident.incident_id}\n"
f"- 受影響服務: {', '.join(incident.affected_services or [])}\n\n"
f"## 嘗試的 Playbook\n- 名稱: {playbook.name}\n\n"
f"## 失敗原因\n{result.error or '執行中發生未知異常'}\n\n"
"請生成失敗案例文件Markdown 格式),包含:\n"
"## 症狀描述\n## 嘗試的修復方案\n## 失敗原因分析\n"
"## 已知不適用條件\n## 替代方案建議"
)
try:
nvidia = get_nvidia_provider()
response_text, success, _tokens, _cost = await asyncio.wait_for(
nvidia.chat(prompt=f"[SYSTEM]{self._ANTI_PATTERN_SYSTEM}\n\n{prompt}"),
timeout=settings.NEMOTRON_TIMEOUT_SECONDS,
)
if success and response_text:
return response_text
except Exception as e:
logger.warning("anti_pattern_nemotron_call_failed", error=str(e))
return self._build_minimal_anti_pattern(incident, playbook, result)
def _build_minimal_runbook(
self,
incident: "Incident",
playbook: "Playbook",
result: "AutoRepairResult",
) -> str:
"""Nemotron 超時/失敗時的基本 Runbook fallback"""
steps = "\n".join(f"- {s}" for s in result.executed_steps)
return (
f"## 症狀描述\nIncident {incident.incident_id}"
f"受影響服務:{', '.join(incident.affected_services or [])}\n\n"
f"## 執行步驟\n{steps}\n\n"
f"## 執行結果\n成功,耗時 {result.execution_time_ms}ms\n\n"
"*本文件由系統自動生成Nemotron fallback建議人工補充完善。*"
)
def _build_minimal_anti_pattern(
self,
incident: "Incident",
playbook: "Playbook",
result: "AutoRepairResult",
) -> str:
"""Nemotron 超時/失敗時的基本 Anti-Pattern fallback"""
return (
f"## 症狀描述\nIncident {incident.incident_id}"
f"受影響服務:{', '.join(incident.affected_services or [])}\n\n"
f"## 失敗原因\n{result.error or '執行中發生異常'}\n\n"
f"## 已知不適用條件\nPlaybook '{playbook.name}' 在此症狀下失敗,請勿自動重試。\n\n"
"*本文件由系統自動生成Nemotron fallback。*"
)
async def _push_runbook_review_card(
self,
incident: "Incident",
entry_id: str,
content_preview: str,
) -> None:
"""推送 Runbook 審核 card 到 Telegram"""
try:
from src.services.telegram_gateway import get_telegram_gateway
tg = get_telegram_gateway()
await tg.send_text(
f"📄 <b>Auto Runbook 待審核</b>\n"
f"Incident: <code>{incident.incident_id}</code>\n"
f"Entry ID: <code>{entry_id}</code>\n\n"
f"<i>{content_preview}...</i>\n\n"
f"請至知識庫審核並發布。"
)
except Exception as e:
logger.warning("runbook_review_card_failed", error=str(e))
async def _push_anti_pattern_notification(
self,
incident: "Incident",
title: str,
) -> None:
"""推送 Anti-Pattern 已記錄通知到 Telegram"""
try:
from src.services.telegram_gateway import get_telegram_gateway
tg = get_telegram_gateway()
await tg.send_text(
f"⚠️ <b>已記錄失敗案例</b>\n"
f"Incident: <code>{incident.incident_id}</code>\n"
f"標題: {title}\n\n"
f"相同症狀的後續告警將阻斷自動修復,要求人工介入。"
)
except Exception as e:
logger.warning("anti_pattern_notification_failed", error=str(e))
# =============================================================================
# 單例管理
# =============================================================================
_generator: NemotronRunbookGenerator | None = None
def get_runbook_generator() -> NemotronRunbookGenerator:
global _generator
if _generator is None:
_generator = NemotronRunbookGenerator()
return _generator

71
k8s/drift-cronjob.yaml Normal file
View File

@@ -0,0 +1,71 @@
# Config Drift Detection CronJob - Phase 25 P2
# 每小時掃描 awoooi-prod namespace 的配置漂移
#
# 建立時間: 2026-04-04 (台北時區)
# 建立者: Claude Code (Phase 25 P2)
# 關聯設計: docs/superpowers/specs/2026-04-04-nemotron-active-defense-design.md 方向三
# 關聯 ADR: 待起草 ADR-057
#
# 部署: kubectl apply -f k8s/drift-cronjob.yaml -n awoooi-prod
# 手動觸發: kubectl create job --from=cronjob/drift-scanner drift-scan-manual -n awoooi-prod
# 查看 log: kubectl logs -l job-name=drift-scanner -n awoooi-prod
apiVersion: batch/v1
kind: CronJob
metadata:
name: drift-scanner
namespace: awoooi-prod
labels:
app: awoooi
component: drift-scanner
phase: "25"
annotations:
# 2026-04-04 ogt: Phase 25 P2 — Config Drift Detection
description: "每小時掃描 K8s 配置漂移,由 Nemotron 做意圖分析"
spec:
# 每小時整點執行(台北時間 = UTC+8schedule 用 UTC
schedule: "0 * * * *"
concurrencyPolicy: Forbid # 禁止並發:上次未完成則跳過
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 5
startingDeadlineSeconds: 60 # 錯過時間窗口超過 60s 則跳過
jobTemplate:
spec:
backoffLimit: 0 # 失敗不重試(漂移掃描冪等,下次 cron 自動補掃)
activeDeadlineSeconds: 300 # 最長 5 分鐘
template:
metadata:
labels:
app: awoooi
component: drift-scanner
spec:
restartPolicy: Never
serviceAccountName: awoooi-api # 使用 API 的 ServiceAccount有 kubectl 權限)
containers:
- name: drift-scanner
# 使用 awoooi-api 鏡像(含 kubectl + Python 環境)
image: harbor.wooo.work/awoooi/api:latest
imagePullPolicy: Always
command:
- python
- -c
- |
import asyncio, httpx, os
API_URL = os.environ.get("INTERNAL_API_URL", "http://awoooi-api:8000")
async def run():
async with httpx.AsyncClient(timeout=240) as c:
r = await c.post(f"{API_URL}/api/v1/drift/internal/scan")
print(f"status={r.status_code} body={r.text[:200]}")
asyncio.run(run())
env:
- name: INTERNAL_API_URL
value: "http://awoooi-api.awoooi-prod.svc.cluster.local:8000"
- name: DRIFT_SCAN_NAMESPACES
value: "awoooi-prod"
resources:
requests:
cpu: "50m"
memory: "64Mi"
limits:
cpu: "200m"
memory: "256Mi"