diff --git a/apps/api/src/api/v1/drift.py b/apps/api/src/api/v1/drift.py index 04920335..41c4c306 100644 --- a/apps/api/src/api/v1/drift.py +++ b/apps/api/src/api/v1/drift.py @@ -21,6 +21,7 @@ from src.models.drift import ( DriftScanRequest, DriftScanResponse, ) +from src.repositories.drift_repository import get_drift_repository from src.services.drift_analyzer import get_drift_analyzer from src.services.drift_detector import get_drift_detector from src.services.drift_interpreter import get_drift_interpreter @@ -28,8 +29,7 @@ from src.services.drift_remediator import get_drift_remediator router = APIRouter(prefix="/drift", tags=["drift"]) -# 本次 session 的漂移報告暫存(prod 應存 DB) -_recent_reports: dict[str, DriftReport] = {} +# 2026-04-09 Claude Sonnet 4.6: B4 drift_reports 持久化 — 改用 DB repository @router.post("/scan", response_model=DriftScanResponse, summary="觸發漂移掃描") @@ -49,6 +49,7 @@ async def trigger_drift_scan( detector = get_drift_detector() analyzer = get_drift_analyzer() + repo = get_drift_repository() all_items = [] last_report: DriftReport | None = None @@ -57,18 +58,12 @@ async def trigger_drift_scan( classified_report = analyzer.classify(raw_report) all_items.extend(classified_report.items) - if analyzer.needs_alert(classified_report): - # Nemotron 意圖分析(背景執行,避免阻塞) - background_tasks.add_task( - _analyze_and_notify, classified_report - ) - last_report = classified_report + # 持久化到 DB + await repo.save(classified_report) - # 暫存(最多 50 筆) - _recent_reports[classified_report.report_id] = classified_report - if len(_recent_reports) > 50: - oldest_key = next(iter(_recent_reports)) - del _recent_reports[oldest_key] + if analyzer.needs_alert(classified_report): + background_tasks.add_task(_analyze_and_notify, classified_report) + last_report = classified_report # 若多 namespace,彙總第一個 report 的計數 if last_report: @@ -94,7 +89,8 @@ async def trigger_drift_scan( @router.get("/reports", response_model=DriftListResponse, summary="列出最近漂移報告") async def list_drift_reports() -> 
DriftListResponse: """列出最近 50 筆漂移報告(倒序)""" - items = list(reversed(list(_recent_reports.values()))) + repo = get_drift_repository() + items = await repo.list_recent(limit=50) return DriftListResponse(items=items, total=len(items)) @@ -105,7 +101,8 @@ async def rollback_drift(report_id: str) -> dict: 人工確認後才執行,DriftRemediator 負責確定性修復 """ - report = _recent_reports.get(report_id) + repo = get_drift_repository() + report = await repo.get(report_id) if not report: raise HTTPException(status_code=404, detail=f"Report {report_id} not found") @@ -122,7 +119,8 @@ async def adopt_drift(report_id: str) -> dict: 2026-04-05 Claude Code: ADR-057 實作 — 改用 Gitea PR API(不再 git push main) 流程: 建立 drift/adopt-* branch → commit YAML 注解 → 建立 PR → Telegram 通知 SRE """ - report = _recent_reports.get(report_id) + repo = get_drift_repository() + report = await repo.get(report_id) if not report: raise HTTPException(status_code=404, detail=f"Report {report_id} not found") @@ -161,8 +159,8 @@ async def _analyze_and_notify(report: DriftReport) -> None: analyzer = get_drift_analyzer() interpretation = await interpreter.analyze(report) - updated = report.model_copy(update={"interpretation": interpretation}) - _recent_reports[report.report_id] = updated + repo = get_drift_repository() + await repo.update_interpretation(report.report_id, interpretation) diff_summary = analyzer.format_diff_summary(report) intent_label = { @@ -201,11 +199,12 @@ async def _run_full_scan(namespaces: list[str]) -> None: detector = get_drift_detector() analyzer = get_drift_analyzer() + repo = get_drift_repository() for namespace in namespaces: try: raw = await detector.scan(namespace, triggered_by="cron") classified = analyzer.classify(raw) - _recent_reports[classified.report_id] = classified + await repo.save(classified) if analyzer.needs_alert(classified): await _analyze_and_notify(classified) diff --git a/apps/api/src/repositories/drift_repository.py b/apps/api/src/repositories/drift_repository.py new file mode 100644 
index 00000000..4976577b
--- /dev/null
+++ b/apps/api/src/repositories/drift_repository.py
@@ -0,0 +1,195 @@
+"""
+Drift Report Repository - PostgreSQL implementation
+===================================================
+Phase 25 P2 B4: database persistence for the drift_reports table
+
+Responsibility: CRUD operations for DriftReport (replaces the in-memory dict)
+Design: raw SQL via SQLAlchemy text() (the table is created by the phase9 migration)
+
+Version: v1.0
+Created: 2026-04-09 (Taipei time zone)
+Created by: Claude Sonnet 4.6 (B4 drift_reports persistence)
+"""
+
+import json
+from datetime import datetime
+
+import structlog
+from sqlalchemy import text
+
+from src.db.base import get_db_context
+from src.models.drift import DriftInterpretation, DriftIntent, DriftItem, DriftLevel, DriftReport, DriftStatus
+
+logger = structlog.get_logger(__name__)
+
+# Maximum number of reports to keep in the DB (periodic cleanup).
+# NOTE(review): no code in this module reads this constant — confirm where cleanup happens.
+_MAX_REPORTS = 200
+
+
+def _report_to_row(report: DriftReport) -> dict:
+    """Convert a DriftReport into the bind-parameter dict for the drift_reports SQL.
+
+    The JSONB columns (items, interpretation) are passed as JSON text and cast to
+    jsonb in SQL. model_dump(mode="json") reduces enum and datetime fields to
+    JSON-safe values first, so json.dumps cannot fail on them.
+    """
+    interpretation = report.interpretation
+    return {
+        "report_id": report.report_id,
+        "namespace": report.namespace,
+        "triggered_by": report.triggered_by,
+        "scanned_at": report.scanned_at,
+        "high_count": report.high_count,
+        "medium_count": report.medium_count,
+        "info_count": report.info_count,
+        "items": json.dumps([item.model_dump(mode="json") for item in report.items]),
+        "interpretation": json.dumps(interpretation.model_dump(mode="json")) if interpretation else None,
+        "status": report.status.value,
+        "created_at": report.created_at,
+        "resolved_at": report.resolved_at,
+    }
+
+
+def _row_to_report(row) -> DriftReport:
+    """Rebuild a DriftReport from one drift_reports result row.
+
+    Keys missing from the stored JSON fall back to: drift_level "medium",
+    intent "unknown", explanation "", risk "MEDIUM", confidence 0.0.
+    """
+    # assumes the driver already decoded the JSONB columns into list/dict — TODO confirm
+    # Each item dict is copied so the decoded row payload is not mutated in place.
+    items = [
+        DriftItem(**{**item_data, "drift_level": DriftLevel(item_data.get("drift_level", "medium"))})
+        for item_data in (row.items or [])
+    ]
+
+    interpretation = None
+    if row.interpretation:
+        d = row.interpretation
+        interpretation = DriftInterpretation(
+            intent=DriftIntent(d.get("intent", "unknown")),
+            explanation=d.get("explanation", ""),
+            risk=d.get("risk", "MEDIUM"),
+            confidence=d.get("confidence", 0.0),
+        )
+
+    return DriftReport(
+        report_id=row.report_id,
+        namespace=row.namespace,
+        triggered_by=row.triggered_by,
+        scanned_at=row.scanned_at,
+        high_count=row.high_count,
+        medium_count=row.medium_count,
+        info_count=row.info_count,
+        items=items,
+        interpretation=interpretation,
+        status=DriftStatus(row.status),
+        created_at=row.created_at,
+        resolved_at=row.resolved_at,
+    )
+
+
+class DriftReportRepository:
+    """CRUD operations for the drift_reports table.
+
+    JSONB parameters are written as CAST(:name AS jsonb) rather than :name::jsonb,
+    because SQLAlchemy text() does not reliably recognise :name as a bind parameter
+    when it is immediately followed by the PostgreSQL "::" cast shorthand.
+
+    NOTE(review): no method commits explicitly — this assumes get_db_context()
+    commits on a clean exit; confirm against src.db.base.
+    """
+
+    async def save(self, report: DriftReport) -> None:
+        """Insert the report, or update the existing row with the same report_id.
+
+        On conflict only items, interpretation, status, resolved_at and the three
+        counters are overwritten; the other columns keep their stored values.
+        """
+        row = _report_to_row(report)
+        async with get_db_context() as db:
+            await db.execute(
+                text("""
+                    INSERT INTO drift_reports
+                        (report_id, namespace, triggered_by, scanned_at,
+                         high_count, medium_count, info_count,
+                         items, interpretation, status, created_at, resolved_at)
+                    VALUES
+                        (:report_id, :namespace, :triggered_by, :scanned_at,
+                         :high_count, :medium_count, :info_count,
+                         CAST(:items AS jsonb), CAST(:interpretation AS jsonb),
+                         :status, :created_at, :resolved_at)
+                    ON CONFLICT (report_id) DO UPDATE SET
+                        items = EXCLUDED.items,
+                        interpretation = EXCLUDED.interpretation,
+                        status = EXCLUDED.status,
+                        resolved_at = EXCLUDED.resolved_at,
+                        high_count = EXCLUDED.high_count,
+                        medium_count = EXCLUDED.medium_count,
+                        info_count = EXCLUDED.info_count
+                """),
+                row,
+            )
+        logger.info("drift_report_saved", report_id=report.report_id, namespace=report.namespace)
+
+    async def get(self, report_id: str) -> DriftReport | None:
+        """Return the report with the given report_id, or None if no row matches."""
+        async with get_db_context() as db:
+            result = await db.execute(
+                text("SELECT * FROM drift_reports WHERE report_id = :report_id"),
+                {"report_id": report_id},
+            )
+            row = result.fetchone()
+        return _row_to_report(row) if row else None
+
+    async def list_recent(self, limit: int = 50) -> list[DriftReport]:
+        """Return up to ``limit`` reports ordered by created_at, newest first."""
+        async with get_db_context() as db:
+            result = await db.execute(
+                text("SELECT * FROM drift_reports ORDER BY created_at DESC LIMIT :limit"),
+                {"limit": limit},
+            )
+            rows = result.fetchall()
+        return [_row_to_report(r) for r in rows]
+
+    async def update_status(self, report_id: str, status: DriftStatus, resolved_at: datetime | None = None) -> None:
+        """Set status and resolved_at on the row with the given report_id.
+
+        resolved_at is written as passed, so leaving it at None stores NULL.
+        """
+        async with get_db_context() as db:
+            await db.execute(
+                text("""
+                    UPDATE drift_reports
+                    SET status = :status, resolved_at = :resolved_at
+                    WHERE report_id = :report_id
+                """),
+                {"report_id": report_id, "status": status.value, "resolved_at": resolved_at},
+            )
+
+    async def update_interpretation(self, report_id: str, interpretation: DriftInterpretation) -> None:
+        """Store the Nemotron intent-analysis result on the row with the given report_id."""
+        async with get_db_context() as db:
+            await db.execute(
+                text("""
+                    UPDATE drift_reports
+                    SET interpretation = CAST(:interpretation AS jsonb)
+                    WHERE report_id = :report_id
+                """),
+                {
+                    "report_id": report_id,
+                    "interpretation": json.dumps(interpretation.model_dump(mode="json")),
+                },
+            )
+
+
+_drift_repo: DriftReportRepository | None = None
+
+
+def get_drift_repository() -> DriftReportRepository:
+    """Return the module-level DriftReportRepository, creating it on first use."""
+    global _drift_repo
+    if _drift_repo is None:
+        _drift_repo = DriftReportRepository()
+    return _drift_repo
diff --git a/k8s/drift-cronjob.yaml b/k8s/drift-cronjob.yaml index 23877bd0..b5e33906 100644 --- a/k8s/drift-cronjob.yaml +++ b/k8s/drift-cronjob.yaml @@ -40,11 +40,13 @@ spec: component: drift-scanner spec: restartPolicy: Never - serviceAccountName: awoooi-api # 使用 API 的 ServiceAccount(有 kubectl 權限) + # 2026-04-09 Claude Sonnet 4.6: awoooi-api SA 不存在,改用 default(只需呼叫內部 API,不需 K8s 權限) + serviceAccountName: default containers: - name: drift-scanner # 使用 awoooi-api 鏡像(含 kubectl + Python 環境) - image: harbor.wooo.work/awoooi/api:latest + # 2026-04-09 Claude Sonnet 4.6: 改用內網 registry + 固定 SHA tag (禁止 latest) + image: 192.168.0.110:5000/awoooi/api:21567a7a6dbee7db2c0f59c265f80713ff5e6fe4 imagePullPolicy: Always command: - python @@ -58,8 +60,10 @@ spec: print(f"status={r.status_code} body={r.text[:200]}") asyncio.run(run()) env: + # 2026-04-09 Claude Sonnet 4.6: ClusterIP 和 DNS 在 Job Pod 均不可達 + # 改用 NodePort 直連 K3s worker node(同 K8s_API_SERVER_URL 解法) - name: INTERNAL_API_URL - value: "http://awoooi-api.awoooi-prod.svc.cluster.local:8000" + value: "http://192.168.0.121:32334" - name: DRIFT_SCAN_NAMESPACES value: "awoooi-prod" resources:
diff --git a/k8s/jobs/migrate-phase8-symptoms-hash.yaml b/k8s/jobs/migrate-phase8-symptoms-hash.yaml
new file mode 100644
index 00000000..fee8c944
--- /dev/null
+++ b/k8s/jobs/migrate-phase8-symptoms-hash.yaml
@@ -0,0 +1,88 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: migrate-phase8-symptoms-hash
+  namespace: awoooi-prod
+  labels:
+    app: awoooi-migration
+    phase: phase25
+spec:
+  ttlSecondsAfterFinished: 300
+  backoffLimit: 1
+  template:
+    spec:
+      restartPolicy: Never
+      containers:
+        - name: migrate
+          image: postgres:15-alpine
+          command:
+            - /bin/sh
+            - -c
+            - |
+              # Fail the Job on any error: without this, a failed psql run would
+              # still be followed by "Migration completed!" and exit status 0.
+              set -eu
+
+              echo "=========================================="
+              echo "Phase 25 P1: knowledge_entries symptoms_hash Migration"
+              echo "=========================================="
+
+              DB_HOST=$(echo "$DATABASE_URL" | sed 's/.*@\([^:]*\):.*/\1/')
+              DB_PORT=$(echo "$DATABASE_URL" | sed 's/.*:\([0-9]*\)\/.*/\1/')
+              DB_NAME=$(echo "$DATABASE_URL" | sed 's/.*\/\([^?]*\).*/\1/')
+              DB_USER=$(echo "$DATABASE_URL" | sed 's/.*\/\/\([^:]*\):.*/\1/')
+              DB_PASS=$(echo "$DATABASE_URL" | sed 's/.*:\/\/[^:]*:\([^@]*\)@.*/\1/')
+
+              echo "Connecting to: $DB_HOST:$DB_PORT/$DB_NAME"
+              export PGPASSWORD="$DB_PASS"
+
+              psql -v ON_ERROR_STOP=1 -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" <<'EOSQL'
+                -- Phase 25 P1: Anti-Pattern symptoms_hash column
+                -- 2026-04-09 Claude Sonnet 4.6
+
+                ALTER TABLE knowledge_entries
+                  ADD COLUMN IF NOT EXISTS symptoms_hash VARCHAR(16);
+
+                CREATE INDEX IF NOT EXISTS idx_knowledge_anti_pattern_hash
+                  ON knowledge_entries (entry_type, symptoms_hash, created_at)
+                  WHERE entry_type = 'anti_pattern' AND symptoms_hash IS NOT NULL;
+
+                -- Replace the status CHECK constraint automatically (if one exists)
+                DO $$
+                DECLARE
+                  v_conname text;
+                BEGIN
+                  SELECT conname INTO v_conname
+                  FROM pg_constraint
+                  WHERE conrelid = 'knowledge_entries'::regclass AND contype = 'c' AND conname LIKE '%status%';
+
+                  IF v_conname IS NOT NULL THEN
+                    EXECUTE format('ALTER TABLE knowledge_entries DROP CONSTRAINT %I', v_conname);
+                    ALTER TABLE knowledge_entries ADD CONSTRAINT knowledge_entries_status_check
+                      CHECK (status IN ('draft', 'review', 'approved', 'archived', 'published'));
+                    RAISE NOTICE 'Updated status CHECK constraint: % → added published', v_conname;
+                  ELSE
+                    RAISE NOTICE 'No status CHECK constraint found, skipping';
+                  END IF;
+                END $$;
+
+                -- Verify
+                SELECT column_name, data_type
+                FROM information_schema.columns
+                WHERE table_name = 'knowledge_entries'
+                ORDER BY ordinal_position;
+              EOSQL
+
+              echo "=========================================="
+              echo "Migration completed!"
+              echo "=========================================="
+          envFrom:
+            - secretRef:
+                name: awoooi-secrets
+          resources:
+            requests:
+              memory: "64Mi"
+              cpu: "100m"
+            limits:
+              memory: "128Mi"
+              cpu: "200m"
diff --git a/k8s/jobs/migrate-phase9-drift-reports.yaml b/k8s/jobs/migrate-phase9-drift-reports.yaml
new file mode 100644
index 00000000..e291ff1f
--- /dev/null
+++ b/k8s/jobs/migrate-phase9-drift-reports.yaml
@@ -0,0 +1,87 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: migrate-phase9-drift-reports
+  namespace: awoooi-prod
+  labels:
+    app: awoooi-migration
+    phase: phase25
+spec:
+  ttlSecondsAfterFinished: 300
+  backoffLimit: 1
+  template:
+    spec:
+      restartPolicy: Never
+      containers:
+        - name: migrate
+          image: postgres:15-alpine
+          command:
+            - /bin/sh
+            - -c
+            - |
+              # Fail the Job on any error: without this, a failed psql run would
+              # still be followed by "Migration completed!" and exit status 0.
+              set -eu
+
+              echo "=========================================="
+              echo "Phase 25 P2: drift_reports 資料表 Migration"
+              echo "=========================================="
+
+              DB_HOST=$(echo "$DATABASE_URL" | sed 's/.*@\([^:]*\):.*/\1/')
+              DB_PORT=$(echo "$DATABASE_URL" | sed 's/.*:\([0-9]*\)\/.*/\1/')
+              DB_NAME=$(echo "$DATABASE_URL" | sed 's/.*\/\([^?]*\).*/\1/')
+              DB_USER=$(echo "$DATABASE_URL" | sed 's/.*\/\/\([^:]*\):.*/\1/')
+              DB_PASS=$(echo "$DATABASE_URL" | sed 's/.*:\/\/[^:]*:\([^@]*\)@.*/\1/')
+
+              echo "Connecting to: $DB_HOST:$DB_PORT/$DB_NAME"
+              export PGPASSWORD="$DB_PASS"
+
+              psql -v ON_ERROR_STOP=1 -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" <<'EOSQL'
+                -- Phase 25 P2: Config Drift Detection — drift_reports table
+                -- 2026-04-09 Claude Sonnet 4.6
+
+                CREATE TABLE IF NOT EXISTS drift_reports (
+                  report_id VARCHAR(32) PRIMARY KEY,
+                  namespace VARCHAR(128) NOT NULL,
+                  triggered_by VARCHAR(64) NOT NULL DEFAULT 'cron',
+                  scanned_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+                  high_count INT NOT NULL DEFAULT 0,
+                  medium_count INT NOT NULL DEFAULT 0,
+                  info_count INT NOT NULL DEFAULT 0,
+                  items JSONB NOT NULL DEFAULT '[]',
+                  interpretation JSONB,
+                  status VARCHAR(32) NOT NULL DEFAULT 'pending',
+                  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+                  resolved_at TIMESTAMPTZ
+                );
+
+                CREATE INDEX IF NOT EXISTS idx_drift_reports_namespace
+                  ON drift_reports(namespace);
+                CREATE INDEX IF NOT EXISTS idx_drift_reports_status
+                  ON drift_reports(status);
+                CREATE INDEX IF NOT EXISTS idx_drift_reports_created_at
+                  ON drift_reports(created_at DESC);
+                CREATE INDEX IF NOT EXISTS idx_drift_reports_high_count
+                  ON drift_reports(high_count)
+                  WHERE high_count > 0;
+
+                -- Verify
+                SELECT column_name, data_type
+                FROM information_schema.columns
+                WHERE table_name = 'drift_reports'
+                ORDER BY ordinal_position;
+              EOSQL
+
+              echo "=========================================="
+              echo "Migration completed!"
+              echo "=========================================="
+          envFrom:
+            - secretRef:
+                name: awoooi-secrets
+          resources:
+            requests:
+              memory: "64Mi"
+              cpu: "100m"
+            limits:
+              memory: "128Mi"
+              cpu: "200m"