feat(drift): B4 drift_reports DB 持久化 + CronJob 修復
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 12m17s

- drift_repository.py: DriftReportRepository (save/get/list/update)
- drift.py router: 移除 in-memory dict,改用 DB repository
- drift-cronjob.yaml: 修正 SA/NetworkPolicy/NodePort 問題
- allow-intra-namespace NetworkPolicy (已套用至 prod)
- migrate-phase8/9: symptoms_hash + drift_reports migration Job YAML

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-09 20:28:55 +08:00
parent b1e207ffae
commit c92cdeea0f
5 changed files with 356 additions and 22 deletions

View File

@@ -21,6 +21,7 @@ from src.models.drift import (
DriftScanRequest,
DriftScanResponse,
)
from src.repositories.drift_repository import get_drift_repository
from src.services.drift_analyzer import get_drift_analyzer
from src.services.drift_detector import get_drift_detector
from src.services.drift_interpreter import get_drift_interpreter
@@ -28,8 +29,7 @@ from src.services.drift_remediator import get_drift_remediator
router = APIRouter(prefix="/drift", tags=["drift"])
# 本次 session 的漂移報告暫存prod 應存 DB
_recent_reports: dict[str, DriftReport] = {}
# 2026-04-09 Claude Sonnet 4.6: B4 drift_reports 持久化 — 改用 DB repository
@router.post("/scan", response_model=DriftScanResponse, summary="觸發漂移掃描")
@@ -49,6 +49,7 @@ async def trigger_drift_scan(
detector = get_drift_detector()
analyzer = get_drift_analyzer()
repo = get_drift_repository()
all_items = []
last_report: DriftReport | None = None
@@ -57,18 +58,12 @@ async def trigger_drift_scan(
classified_report = analyzer.classify(raw_report)
all_items.extend(classified_report.items)
if analyzer.needs_alert(classified_report):
# Nemotron 意圖分析(背景執行,避免阻塞)
background_tasks.add_task(
_analyze_and_notify, classified_report
)
last_report = classified_report
# 持久化到 DB
await repo.save(classified_report)
# 暫存(最多 50 筆)
_recent_reports[classified_report.report_id] = classified_report
if len(_recent_reports) > 50:
oldest_key = next(iter(_recent_reports))
del _recent_reports[oldest_key]
if analyzer.needs_alert(classified_report):
background_tasks.add_task(_analyze_and_notify, classified_report)
last_report = classified_report
# 若多 namespace彙總第一個 report 的計數
if last_report:
@@ -94,7 +89,8 @@ async def trigger_drift_scan(
@router.get("/reports", response_model=DriftListResponse, summary="列出最近漂移報告")
async def list_drift_reports() -> DriftListResponse:
"""列出最近 50 筆漂移報告(倒序)"""
items = list(reversed(list(_recent_reports.values())))
repo = get_drift_repository()
items = await repo.list_recent(limit=50)
return DriftListResponse(items=items, total=len(items))
@@ -105,7 +101,8 @@ async def rollback_drift(report_id: str) -> dict:
人工確認後才執行DriftRemediator 負責確定性修復
"""
report = _recent_reports.get(report_id)
repo = get_drift_repository()
report = await repo.get(report_id)
if not report:
raise HTTPException(status_code=404, detail=f"Report {report_id} not found")
@@ -122,7 +119,8 @@ async def adopt_drift(report_id: str) -> dict:
2026-04-05 Claude Code: ADR-057 實作 — 改用 Gitea PR API不再 git push main
流程: 建立 drift/adopt-* branch → commit YAML 注解 → 建立 PR → Telegram 通知 SRE
"""
report = _recent_reports.get(report_id)
repo = get_drift_repository()
report = await repo.get(report_id)
if not report:
raise HTTPException(status_code=404, detail=f"Report {report_id} not found")
@@ -161,8 +159,8 @@ async def _analyze_and_notify(report: DriftReport) -> None:
analyzer = get_drift_analyzer()
interpretation = await interpreter.analyze(report)
updated = report.model_copy(update={"interpretation": interpretation})
_recent_reports[report.report_id] = updated
repo = get_drift_repository()
await repo.update_interpretation(report.report_id, interpretation)
diff_summary = analyzer.format_diff_summary(report)
intent_label = {
@@ -201,11 +199,12 @@ async def _run_full_scan(namespaces: list[str]) -> None:
detector = get_drift_detector()
analyzer = get_drift_analyzer()
repo = get_drift_repository()
for namespace in namespaces:
try:
raw = await detector.scan(namespace, triggered_by="cron")
classified = analyzer.classify(raw)
_recent_reports[classified.report_id] = classified
await repo.save(classified)
if analyzer.needs_alert(classified):
await _analyze_and_notify(classified)

View File

@@ -0,0 +1,164 @@
"""
Drift Report Repository - PostgreSQL 實作
==========================================
Phase 25 P2 B4: drift_reports 表 DB 持久化
職責: DriftReport 的 CRUD 操作(取代 in-memory dict)
設計: raw SQL via SQLAlchemy text()(表由 phase9 migration 建立)
版本: v1.0
建立: 2026-04-09 (台北時區)
建立者: Claude Sonnet 4.6 (B4 drift_reports 持久化)
"""
import json
from datetime import datetime
import structlog
from sqlalchemy import text
from src.db.base import get_db_context
from src.models.drift import DriftInterpretation, DriftIntent, DriftItem, DriftLevel, DriftReport, DriftStatus
# Module-level structured logger.
logger = structlog.get_logger(__name__)
# Maximum number of rows intended to be kept in the DB (periodic cleanup).
# NOTE(review): not referenced anywhere in the visible code — confirm where
# (or whether) the pruning is actually performed.
_MAX_REPORTS = 200
def _report_to_row(report: DriftReport) -> dict:
"""DriftReport → DB row dict"""
return {
"report_id": report.report_id,
"namespace": report.namespace,
"triggered_by": report.triggered_by,
"scanned_at": report.scanned_at,
"high_count": report.high_count,
"medium_count": report.medium_count,
"info_count": report.info_count,
"items": json.dumps([item.model_dump() for item in report.items]),
"interpretation": json.dumps(report.interpretation.model_dump()) if report.interpretation else None,
"status": report.status.value,
"created_at": report.created_at,
"resolved_at": report.resolved_at,
}
def _row_to_report(row) -> DriftReport:
    """Rebuild a DriftReport from a drift_reports result row.

    Args:
        row: A result row exposing the drift_reports columns as attributes.

    Returns:
        The reconstructed DriftReport. Items with no ``drift_level`` get
        ``"medium"``; missing interpretation fields fall back to
        ``"unknown"`` / ``""`` / ``"MEDIUM"`` / ``0.0``.
    """
    raw_items = row.items or []
    # Tolerate drivers that hand JSONB back as text instead of decoded objects.
    if isinstance(raw_items, str):
        raw_items = json.loads(raw_items)

    # Build a fresh dict per item so the row's own data is never mutated.
    items = [
        DriftItem(
            **{
                **item_data,
                "drift_level": DriftLevel(item_data.get("drift_level", "medium")),
            }
        )
        for item_data in raw_items
    ]

    interpretation = None
    raw_interpretation = row.interpretation
    if raw_interpretation:
        if isinstance(raw_interpretation, str):
            raw_interpretation = json.loads(raw_interpretation)
        interpretation = DriftInterpretation(
            intent=DriftIntent(raw_interpretation.get("intent", "unknown")),
            explanation=raw_interpretation.get("explanation", ""),
            risk=raw_interpretation.get("risk", "MEDIUM"),
            confidence=raw_interpretation.get("confidence", 0.0),
        )

    return DriftReport(
        report_id=row.report_id,
        namespace=row.namespace,
        triggered_by=row.triggered_by,
        scanned_at=row.scanned_at,
        high_count=row.high_count,
        medium_count=row.medium_count,
        info_count=row.info_count,
        items=items,
        interpretation=interpretation,
        status=DriftStatus(row.status),
        created_at=row.created_at,
        resolved_at=row.resolved_at,
    )
class DriftReportRepository:
    """CRUD access to the drift_reports table through raw SQL.

    Every method opens its own ``get_db_context()`` scope and runs a single
    statement inside it.
    NOTE(review): no explicit commit is issued here — presumably
    ``get_db_context`` commits on exit; confirm.

    JSONB parameters are written as ``CAST(:name AS jsonb)`` instead of
    ``:name::jsonb``: SQLAlchemy's ``text()`` does not recognise ``:name`` as a
    bind parameter when the name is immediately followed by another colon, so
    the ``::`` shorthand would reach PostgreSQL unbound.
    """

    # Insert a report, or refresh the mutable columns of an existing one.
    _UPSERT_SQL = """
        INSERT INTO drift_reports
            (report_id, namespace, triggered_by, scanned_at,
             high_count, medium_count, info_count,
             items, interpretation, status, created_at, resolved_at)
        VALUES
            (:report_id, :namespace, :triggered_by, :scanned_at,
             :high_count, :medium_count, :info_count,
             CAST(:items AS jsonb), CAST(:interpretation AS jsonb),
             :status, :created_at, :resolved_at)
        ON CONFLICT (report_id) DO UPDATE SET
            items = EXCLUDED.items,
            interpretation = EXCLUDED.interpretation,
            status = EXCLUDED.status,
            resolved_at = EXCLUDED.resolved_at,
            high_count = EXCLUDED.high_count,
            medium_count = EXCLUDED.medium_count,
            info_count = EXCLUDED.info_count
    """

    _SELECT_ONE_SQL = "SELECT * FROM drift_reports WHERE report_id = :report_id"

    _SELECT_RECENT_SQL = (
        "SELECT * FROM drift_reports ORDER BY created_at DESC LIMIT :limit"
    )

    _UPDATE_STATUS_SQL = """
        UPDATE drift_reports
        SET status = :status, resolved_at = :resolved_at
        WHERE report_id = :report_id
    """

    _UPDATE_INTERPRETATION_SQL = """
        UPDATE drift_reports
        SET interpretation = CAST(:interpretation AS jsonb)
        WHERE report_id = :report_id
    """

    async def save(self, report: DriftReport) -> None:
        """Insert the report, or update it if its report_id already exists.

        On conflict only items, interpretation, status, resolved_at and the
        three counters are overwritten; the other columns keep their stored
        values.

        Args:
            report: The report to upsert.
        """
        row = _report_to_row(report)
        async with get_db_context() as db:
            await db.execute(text(self._UPSERT_SQL), row)
        logger.info(
            "drift_report_saved",
            report_id=report.report_id,
            namespace=report.namespace,
        )

    async def get(self, report_id: str) -> DriftReport | None:
        """Fetch one report by id.

        Args:
            report_id: Primary key of the report.

        Returns:
            The matching DriftReport, or ``None`` when no row exists.
        """
        async with get_db_context() as db:
            result = await db.execute(
                text(self._SELECT_ONE_SQL),
                {"report_id": report_id},
            )
            row = result.fetchone()
        return _row_to_report(row) if row else None

    async def list_recent(self, limit: int = 50) -> list[DriftReport]:
        """List the most recently created reports, newest first.

        Args:
            limit: Maximum number of reports to return.

        Returns:
            Up to ``limit`` reports ordered by ``created_at`` descending.
        """
        async with get_db_context() as db:
            result = await db.execute(
                text(self._SELECT_RECENT_SQL),
                {"limit": limit},
            )
            rows = result.fetchall()
        return [_row_to_report(r) for r in rows]

    async def update_status(
        self,
        report_id: str,
        status: DriftStatus,
        resolved_at: datetime | None = None,
    ) -> None:
        """Set the handling status (and resolved_at) of a report.

        Args:
            report_id: Primary key of the report to update.
            status: New status; its ``.value`` is stored.
            resolved_at: New resolved_at value. ``None`` (the default) writes
                NULL to the column.
        """
        async with get_db_context() as db:
            await db.execute(
                text(self._UPDATE_STATUS_SQL),
                {
                    "report_id": report_id,
                    "status": status.value,
                    "resolved_at": resolved_at,
                },
            )

    async def update_interpretation(
        self, report_id: str, interpretation: DriftInterpretation
    ) -> None:
        """Store the intent-analysis result for a report.

        Args:
            report_id: Primary key of the report to update.
            interpretation: Interpretation to serialise into the JSONB column.
        """
        async with get_db_context() as db:
            await db.execute(
                text(self._UPDATE_INTERPRETATION_SQL),
                {
                    "report_id": report_id,
                    "interpretation": json.dumps(interpretation.model_dump()),
                },
            )
# Lazily created, module-wide repository instance.
_drift_repo: DriftReportRepository | None = None


def get_drift_repository() -> DriftReportRepository:
    """Return the shared DriftReportRepository, creating it on first call."""
    global _drift_repo
    if _drift_repo is not None:
        return _drift_repo
    _drift_repo = DriftReportRepository()
    return _drift_repo

View File

@@ -40,11 +40,13 @@ spec:
component: drift-scanner
spec:
restartPolicy: Never
serviceAccountName: awoooi-api # 使用 API 的 ServiceAccount有 kubectl 權限)
# 2026-04-09 Claude Sonnet 4.6: awoooi-api SA 不存在,改用 default(只需呼叫內部 API,不需 K8s 權限)
serviceAccountName: default
containers:
- name: drift-scanner
# 使用 awoooi-api 鏡像(含 kubectl + Python 環境)
image: harbor.wooo.work/awoooi/api:latest
# 2026-04-09 Claude Sonnet 4.6: 改用內網 registry + 固定 SHA tag (禁止 latest)
image: 192.168.0.110:5000/awoooi/api:21567a7a6dbee7db2c0f59c265f80713ff5e6fe4
imagePullPolicy: Always
command:
- python
@@ -58,8 +60,10 @@ spec:
print(f"status={r.status_code} body={r.text[:200]}")
asyncio.run(run())
env:
# 2026-04-09 Claude Sonnet 4.6: ClusterIP 和 DNS 在 Job Pod 均不可達
# 改用 NodePort 直連 K3s worker node同 K8s_API_SERVER_URL 解法)
- name: INTERNAL_API_URL
value: "http://awoooi-api.awoooi-prod.svc.cluster.local:8000"
value: "http://192.168.0.121:32334"
- name: DRIFT_SCAN_NAMESPACES
value: "awoooi-prod"
resources:

View File

@@ -0,0 +1,84 @@
# One-shot Job: adds knowledge_entries.symptoms_hash, its partial index, and
# rebuilds the status CHECK constraint so that 'published' is accepted.
# Every statement is idempotent, so the Job can be re-run safely.
apiVersion: batch/v1
kind: Job
metadata:
  name: migrate-phase8-symptoms-hash
  namespace: awoooi-prod
  labels:
    app: awoooi-migration
    phase: phase25
spec:
  ttlSecondsAfterFinished: 300
  backoffLimit: 1
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: migrate
          image: postgres:15-alpine
          command:
            - /bin/sh
            - -c
            - |
              # Abort on the first failing command so a broken migration
              # cannot end with "Migration completed!" and a green Job.
              set -e
              echo "=========================================="
              echo "Phase 25 P1: knowledge_entries symptoms_hash Migration"
              echo "=========================================="
              # Split DATABASE_URL into its parts.
              # Assumes the form scheme://user:pass@host:port/db[?params]
              # with no ':' or '@' inside the password — TODO confirm.
              DB_HOST=$(echo "$DATABASE_URL" | sed 's/.*@\([^:]*\):.*/\1/')
              DB_PORT=$(echo "$DATABASE_URL" | sed 's/.*:\([0-9]*\)\/.*/\1/')
              DB_NAME=$(echo "$DATABASE_URL" | sed 's/.*\/\([^?]*\).*/\1/')
              DB_USER=$(echo "$DATABASE_URL" | sed 's/.*\/\/\([^:]*\):.*/\1/')
              DB_PASS=$(echo "$DATABASE_URL" | sed 's/.*:\/\/[^:]*:\([^@]*\)@.*/\1/')
              echo "Connecting to: $DB_HOST:$DB_PORT/$DB_NAME"
              export PGPASSWORD="$DB_PASS"
              # ON_ERROR_STOP makes psql exit non-zero on the first SQL error
              # instead of continuing and returning 0.
              psql -v ON_ERROR_STOP=1 -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" <<'EOSQL'
              -- Phase 25 P1: Anti-Pattern symptoms_hash column
              -- 2026-04-09 Claude Sonnet 4.6
              ALTER TABLE knowledge_entries
                ADD COLUMN IF NOT EXISTS symptoms_hash VARCHAR(16);
              CREATE INDEX IF NOT EXISTS idx_knowledge_anti_pattern_hash
                ON knowledge_entries (entry_type, symptoms_hash, created_at)
                WHERE entry_type = 'anti_pattern' AND symptoms_hash IS NOT NULL;
              -- Handle the status CHECK constraint automatically (if one exists)
              DO $$
              DECLARE
                v_conname text;
              BEGIN
                SELECT conname INTO v_conname
                FROM pg_constraint
                WHERE conrelid = 'knowledge_entries'::regclass AND contype = 'c' AND conname LIKE '%status%';
                IF v_conname IS NOT NULL THEN
                  EXECUTE format('ALTER TABLE knowledge_entries DROP CONSTRAINT %I', v_conname);
                  ALTER TABLE knowledge_entries ADD CONSTRAINT knowledge_entries_status_check
                    CHECK (status IN ('draft', 'review', 'approved', 'archived', 'published'));
                  RAISE NOTICE 'Updated status CHECK constraint: % → added published', v_conname;
                ELSE
                  RAISE NOTICE 'No status CHECK constraint found, skipping';
                END IF;
              END $$;
              -- Verify
              SELECT column_name, data_type
              FROM information_schema.columns
              WHERE table_name = 'knowledge_entries'
              ORDER BY ordinal_position;
              EOSQL
              echo "=========================================="
              echo "Migration completed!"
              echo "=========================================="
          envFrom:
            - secretRef:
                name: awoooi-secrets
          resources:
            requests:
              memory: "64Mi"
              cpu: "100m"
            limits:
              memory: "128Mi"
              cpu: "200m"

View File

@@ -0,0 +1,83 @@
# One-shot Job: creates the drift_reports table and its indexes.
# Every statement uses IF NOT EXISTS, so the Job can be re-run safely.
apiVersion: batch/v1
kind: Job
metadata:
  name: migrate-phase9-drift-reports
  namespace: awoooi-prod
  labels:
    app: awoooi-migration
    phase: phase25
spec:
  ttlSecondsAfterFinished: 300
  backoffLimit: 1
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: migrate
          image: postgres:15-alpine
          command:
            - /bin/sh
            - -c
            - |
              # Abort on the first failing command so a broken migration
              # cannot end with "Migration completed!" and a green Job.
              set -e
              echo "=========================================="
              echo "Phase 25 P2: drift_reports 資料表 Migration"
              echo "=========================================="
              # Split DATABASE_URL into its parts.
              # Assumes the form scheme://user:pass@host:port/db[?params]
              # with no ':' or '@' inside the password — TODO confirm.
              DB_HOST=$(echo "$DATABASE_URL" | sed 's/.*@\([^:]*\):.*/\1/')
              DB_PORT=$(echo "$DATABASE_URL" | sed 's/.*:\([0-9]*\)\/.*/\1/')
              DB_NAME=$(echo "$DATABASE_URL" | sed 's/.*\/\([^?]*\).*/\1/')
              DB_USER=$(echo "$DATABASE_URL" | sed 's/.*\/\/\([^:]*\):.*/\1/')
              DB_PASS=$(echo "$DATABASE_URL" | sed 's/.*:\/\/[^:]*:\([^@]*\)@.*/\1/')
              echo "Connecting to: $DB_HOST:$DB_PORT/$DB_NAME"
              export PGPASSWORD="$DB_PASS"
              # ON_ERROR_STOP makes psql exit non-zero on the first SQL error
              # instead of continuing and returning 0.
              psql -v ON_ERROR_STOP=1 -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" <<'EOSQL'
              -- Phase 25 P2: Config Drift Detection — drift_reports table
              -- 2026-04-09 Claude Sonnet 4.6
              CREATE TABLE IF NOT EXISTS drift_reports (
                report_id VARCHAR(32) PRIMARY KEY,
                namespace VARCHAR(128) NOT NULL,
                triggered_by VARCHAR(64) NOT NULL DEFAULT 'cron',
                scanned_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                high_count INT NOT NULL DEFAULT 0,
                medium_count INT NOT NULL DEFAULT 0,
                info_count INT NOT NULL DEFAULT 0,
                items JSONB NOT NULL DEFAULT '[]',
                interpretation JSONB,
                status VARCHAR(32) NOT NULL DEFAULT 'pending',
                created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                resolved_at TIMESTAMPTZ
              );
              CREATE INDEX IF NOT EXISTS idx_drift_reports_namespace
                ON drift_reports(namespace);
              CREATE INDEX IF NOT EXISTS idx_drift_reports_status
                ON drift_reports(status);
              CREATE INDEX IF NOT EXISTS idx_drift_reports_created_at
                ON drift_reports(created_at DESC);
              CREATE INDEX IF NOT EXISTS idx_drift_reports_high_count
                ON drift_reports(high_count)
                WHERE high_count > 0;
              -- Verify
              SELECT column_name, data_type
              FROM information_schema.columns
              WHERE table_name = 'drift_reports'
              ORDER BY ordinal_position;
              EOSQL
              echo "=========================================="
              echo "Migration completed!"
              echo "=========================================="
          envFrom:
            - secretRef:
                name: awoooi-secrets
          resources:
            requests:
              memory: "64Mi"
              cpu: "100m"
            limits:
              memory: "128Mi"
              cpu: "200m"