From bf45b80bd28d2701177485cb5cab346823c79ea7 Mon Sep 17 00:00:00 2001 From: OG T Date: Wed, 15 Apr 2026 15:33:52 +0800 Subject: [PATCH] =?UTF-8?q?feat(Phase=203.5=20+=20Phase=204):=20AI=20?= =?UTF-8?q?=E5=AD=B8=E7=BF=92=E6=88=90=E6=9E=9C=E6=8C=81=E4=B9=85=E5=8C=96?= =?UTF-8?q?=E5=88=B0=20PostgreSQL=20=E2=80=94=20=E4=BF=AE=E6=AD=A3?= =?UTF-8?q?=E3=80=8CAI=20=E5=A4=B1=E6=86=B6=E3=80=8D=E6=9E=B6=E6=A7=8B?= =?UTF-8?q?=E7=BC=BA=E9=99=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ADR-085: AI 學習成果不可存在 Cache 架構鐵律確立: - PostgreSQL = System of Record(AI 的永久記憶) - Redis = Warm Cache(加速讀取,TTL 到期從 PG 復原) 核心變更: 1. models.py: 新增 PlaybookRecord / DynamicBaselineRecord / LogClusterRecord ORM 2. base.py: ALTER TABLE playbooks 補加 trust_score / requires_approval_level 等欄位 3. playbook_repository.py: 完整雙寫實作(PG upsert + Redis cache) 4. dynamic_baseline_service.py: Holt-Winters 訓練結果寫入 PG,Redis 只作 24h warm cache 5. log_anomaly_detector.py: Drain3 cluster template 寫入 PG(UPSERT on cluster_id) 6. 
main.py: 啟動時執行 backfill_redis_to_pg()(Redis → PG 冪等補救) 修正的問題: - Playbook 7天 Redis TTL 到期 → AI 失去所有修復知識 - trust_score EWMA 隨 Redis TTL 歸零 → AI 重新回到初始信任度 0.3 - Holt-Winters 基線 24h TTL → AI 每天重新學習「正常」的定義 - Drain3 cluster 沒有持久化 → AI 把已知 log pattern 反覆當新 pattern Phase 4 新服務(requirements.txt 已加入 statsmodels + drain3 + numpy) Co-Authored-By: Claude Sonnet 4.6 --- apps/api/requirements.txt | 10 + apps/api/src/core/feature_flags.py | 4 + apps/api/src/db/base.py | 13 + apps/api/src/db/models.py | 201 ++++++ apps/api/src/main.py | 10 + .../src/repositories/playbook_repository.py | 616 +++++++++++------- .../src/services/dynamic_baseline_service.py | 489 ++++++++++++++ apps/api/src/services/log_anomaly_detector.py | 385 +++++++++++ 8 files changed, 1490 insertions(+), 238 deletions(-) create mode 100644 apps/api/src/services/dynamic_baseline_service.py create mode 100644 apps/api/src/services/log_anomaly_detector.py diff --git a/apps/api/requirements.txt b/apps/api/requirements.txt index 3b181874..a4bc70a0 100644 --- a/apps/api/requirements.txt +++ b/apps/api/requirements.txt @@ -43,6 +43,16 @@ opentelemetry-instrumentation-logging>=0.41b0 # 2026-04-02 Claude Code: 鎖定 v2.60.x — v3.x/v4.x 移除 client.trace() API,與 langfuse_client.py 不相容 langfuse>=2.0.0,<3.0.0 +# ========================================================================== +# Phase 4: 動態異常偵測 (ADR-084) +# ========================================================================== +# Holt-Winters 指數平滑(動態基線) +statsmodels>=0.14.0 +# Log clustering(Drain3 演算法) +drain3>=0.9.11 +# numpy 已為 statsmodels 依賴,顯式列出確保可用(線性趨勢預測) +numpy>=1.24.0 + # Development pytest>=7.4.0 pytest-asyncio>=0.23.0 diff --git a/apps/api/src/core/feature_flags.py b/apps/api/src/core/feature_flags.py index 69c61496..22c481c6 100644 --- a/apps/api/src/core/feature_flags.py +++ b/apps/api/src/core/feature_flags.py @@ -131,6 +131,10 @@ class AIOpsFeatureFlags(BaseSettings): default=False, description="P4: 主動巡檢每 5min 是否執行", ) + AIOPS_P4_SHADOW_MODE: bool = 
Field( + default=True, + description="P4: Shadow Mode = True 時動態偵測只記錄不觸發 Alert;False = 真實觸發(需先觀察噪音率)", + ) # ========================================================================== # Phase 5 細粒度子開關 diff --git a/apps/api/src/db/base.py b/apps/api/src/db/base.py index 12082876..ede5270f 100644 --- a/apps/api/src/db/base.py +++ b/apps/api/src/db/base.py @@ -186,6 +186,19 @@ async def init_db() -> None: """) ) + # 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3.5 Playbook PostgreSQL 持久化 + # ADR-085: AI 學習成果不可存 Cache — trust_score、EWMA 必須永久保存 + # playbooks 表已存在(15 筆舊資料),補加新欄位 + await conn.execute( + text(""" + ALTER TABLE playbooks + ADD COLUMN IF NOT EXISTS trust_score FLOAT NOT NULL DEFAULT 0.3, + ADD COLUMN IF NOT EXISTS requires_approval_level VARCHAR(20) NOT NULL DEFAULT 'auto', + ADD COLUMN IF NOT EXISTS stateful_targets JSONB NOT NULL DEFAULT '[]', + ADD COLUMN IF NOT EXISTS requires_pre_backup BOOLEAN NOT NULL DEFAULT FALSE; + """) + ) + async def close_db() -> None: """ diff --git a/apps/api/src/db/models.py b/apps/api/src/db/models.py index 99de3b0f..3f53260e 100644 --- a/apps/api/src/db/models.py +++ b/apps/api/src/db/models.py @@ -835,6 +835,207 @@ class IncidentEvidence(Base): ) +# ============================================================================= +# PlaybookRecord — Phase 3.5 Playbook PostgreSQL 持久化 (System of Record) +# ADR-085: AI 學習成果不可存在 Cache — Playbook 是 AI 的肌肉記憶 +# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3.5 初始建立 +# +# 核心鐵律: +# - PostgreSQL = System of Record(永久保存,AI 的長期記憶) +# - Redis = Warm Cache(7天 TTL,加速讀取,DB 為 source of truth) +# - trust_score, EWMA, 統計數據必須持久化 — 不能因 Redis TTL 消失 +# ============================================================================= + +class PlaybookRecord(Base): + """ + Playbook 修復劇本 PostgreSQL ORM + + 與 Pydantic Playbook 模型對應。 + Redis 為 warm cache(7d TTL),PostgreSQL 為 source of truth。 + + 設計原則: + - AI 的學習成果(trust_score、success_count、failure_count)永久保存 + - EWMA 信任度在 Redis TTL 後不會重置,Pod 重啟後 AI 記憶不失 + 
- 雙寫:create/update 先寫 PG,再更新 Redis cache + - 讀取:Redis-first(cache hit),miss 時從 PG 載入並回填 Redis + """ + __tablename__ = "playbooks" + + # Primary Key + playbook_id: Mapped[str] = mapped_column( + String(36), primary_key=True, + comment="Playbook 唯一識別碼 (PB-YYYYMMDD-XXXXXX)", + ) + + # Core Fields + name: Mapped[str] = mapped_column(String(256), nullable=False) + description: Mapped[str] = mapped_column(Text, default="", nullable=False) + status: Mapped[str] = mapped_column(String(20), default="draft", nullable=False) + source: Mapped[str] = mapped_column(String(20), default="extracted", nullable=False) + + # Complex structures (JSONB) + symptom_pattern: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict, nullable=False) + repair_steps: Mapped[list[dict[str, Any]]] = mapped_column(JSON, default=list, nullable=False) + + # Timing + estimated_duration_minutes: Mapped[int] = mapped_column(Integer, default=5, nullable=False) + + # Source tracing + source_incident_ids: Mapped[list[str]] = mapped_column(JSON, default=list, nullable=False) + ai_confidence: Mapped[float] = mapped_column(default=0.0, nullable=False) + + # Stats — MUST be in PG (AI learning artifacts, cannot expire) + success_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False) + failure_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False) + last_used_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + + # EWMA trust score — ADR-083 Phase 3, 絕對不能用 Redis TTL 管理 + # trust_score 是 AI 累積學習的結晶,TTL 到期就歸零 = AI 記憶全部消失 + trust_score: Mapped[float] = mapped_column(default=0.3, nullable=False, + comment="EWMA 動態信任度 (Phase 3)。成功 α=0.1,失敗 α=0.2(2x 衰減)。< 0.1 → 封存") + + # Approval metadata + approved_by: Mapped[str | None] = mapped_column(String(100), nullable=True) + approved_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + tags: Mapped[list[str]] = mapped_column(JSON, default=list, nullable=False) + notes: 
Mapped[str | None] = mapped_column(Text, nullable=True) + + # Sprint 5.1 護欄欄位 (2026-04-08) + requires_approval_level: Mapped[str] = mapped_column( + String(20), default="auto", nullable=False, + comment="auto=直接執行, standard=1票, critical=2票MultiSig", + ) + stateful_targets: Mapped[list[str]] = mapped_column(JSON, default=list, nullable=False) + requires_pre_backup: Mapped[bool] = mapped_column(default=False, nullable=False) + + # Timestamps + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now, nullable=False) + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now, + onupdate=taipei_now, nullable=False) + + __table_args__ = ( + Index("ix_playbook_status", "status"), + Index("ix_playbook_trust_score", "trust_score"), + Index("ix_playbook_created_at", "created_at"), + ) + + +# ============================================================================= +# DynamicBaselineRecord — Phase 4 Holt-Winters 訓練基線持久化 +# ADR-084: 動態基線不能只存 Redis — AI 每天重學「正常」不是在學習 +# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 初始建立 +# +# 核心鐵律: +# - 訓練好的 Holt-Winters 模型必須在 PG 長期保存 +# - Redis 為 24h warm cache(加速 is_anomaly() 讀取) +# - 基線消失 = AI 對「正常」的認識消失 = 每天從頭學習 = 不是 AI +# ============================================================================= + +class DynamicBaselineRecord(Base): + """ + 動態基線訓練結果 PostgreSQL ORM + + Holt-Winters 訓練完成後: + 1. 先寫入 PG(永久保存) + 2. 再寫入 Redis(24h warm cache,加速讀取) + + Redis key: baseline:{metric_name} + PG: 此表,metric_name 為主鍵,最新一筆 = 有效基線 + """ + __tablename__ = "dynamic_baselines" + + id: Mapped[str] = mapped_column(String(36), primary_key=True, default=generate_uuid) + + # 基線識別 + metric_name: Mapped[str] = mapped_column( + String(200), nullable=False, index=True, + comment="基線識別名 (e.g. 
cpu_usage_node_mon)", + ) + + # 訓練結果(Holt-Winters 統計) + mean: Mapped[float] = mapped_column(nullable=False, comment="擬合值均值") + std: Mapped[float] = mapped_column(nullable=False, comment="殘差標準差") + + # 24h 季節性因子(JSON 陣列,長度 24) + seasonal_factors: Mapped[list[float]] = mapped_column( + JSON, default=list, nullable=False, + comment="24h 週期季節性因子(乘法形式,均值 ≈ 1.0)", + ) + + # 訓練元資料 + datapoint_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False) + promql: Mapped[str] = mapped_column(Text, default="", nullable=False, + comment="訓練使用的 PromQL 查詢") + lookback_hours: Mapped[int] = mapped_column(Integer, default=336, nullable=False) + + # Timestamps + trained_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now, nullable=False) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now, nullable=False) + + __table_args__ = ( + Index("ix_dynamic_baseline_metric", "metric_name"), + Index("ix_dynamic_baseline_trained_at", "trained_at"), + ) + + +# ============================================================================= +# LogClusterRecord — Phase 4 Drain3 學習到的 Log Pattern 持久化 +# ADR-084: Drain3 模板不能只存 Redis — 每次重啟 AI 把已知 pattern 當新 pattern +# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 初始建立 +# +# 核心鐵律: +# - Drain3 學到的 log cluster template 必須在 PG 長期保存 +# - 新 cluster 事件列表 (log_anomaly:new) 才存 Redis(短期工作記憶) +# - 基礎知識庫(已學到的 pattern)必須在 PG +# ============================================================================= + +class LogClusterRecord(Base): + """ + Drain3 Log Cluster Template 持久化 + + 每個新 pattern 首次偵測到時: + 1. 寫入 PG(永久保存,AI 的 log 語意理解) + 2. 
推送到 Redis list log_anomaly:new(短期工作記憶) + + Re-detect 相同 template 時只更新 last_seen_at + size,不重複寫入 PG。 + """ + __tablename__ = "log_clusters" + + id: Mapped[str] = mapped_column(String(36), primary_key=True, default=generate_uuid) + + # Cluster 識別(MD5[:8] of template) + cluster_id: Mapped[str] = mapped_column( + String(16), nullable=False, unique=True, index=True, + comment="模板 MD5[:8].upper(),穩定 ID", + ) + + # Drain3 模板 + template: Mapped[str] = mapped_column( + Text, nullable=False, + comment="Drain3 萃取的 log 模板 (e.g. 'ERROR <*> connection failed to <*>')", + ) + + # 統計 + size: Mapped[int] = mapped_column(Integer, default=1, nullable=False, + comment="命中次數(第一次 = 1)") + source: Mapped[str] = mapped_column(String(50), default="k8s_pod", nullable=False, + comment="k8s_pod | host_syslog | app_log") + + # 樣本日誌(保留首次觸發的原始行,供事後分析) + sample_log: Mapped[str | None] = mapped_column(Text, nullable=True, + comment="首次觸發的原始 log 行(前 500 字元)") + + # Timestamps + first_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now, nullable=False) + last_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now, + onupdate=taipei_now, nullable=False) + + __table_args__ = ( + Index("ix_log_cluster_first_seen", "first_seen_at"), + Index("ix_log_cluster_source", "source"), + ) + + # ============================================================================= # AgentSession — Phase 2 多 Agent 辯證 Audit Trail # ============================================================================= diff --git a/apps/api/src/main.py b/apps/api/src/main.py index ab6aa989..449c2900 100644 --- a/apps/api/src/main.py +++ b/apps/api/src/main.py @@ -296,6 +296,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: except Exception as e: logger.warning("playbook_seed_schedule_failed", error=str(e)) + # Phase 3.5 ADR-085: Playbook Redis → PG 補寫(一次性遷移 + 啟動時冪等補救) + # 確保 Redis 中存在但 PG 中缺少的 Playbook 不因 TTL 消失而永久丟失 + # 2026-04-15 ogt + Claude 
Sonnet 4.6(亞太): Phase 3.5 AI 學習成果持久化 + try: + from src.repositories.playbook_repository import get_playbook_repository + asyncio.create_task(get_playbook_repository().backfill_redis_to_pg()) + logger.info("playbook_pg_backfill_scheduled") + except Exception as e: + logger.warning("playbook_pg_backfill_schedule_failed", error=str(e)) + try: from src.services.playbook_embedding_service import ensure_playbook_embeddings_indexed asyncio.create_task(ensure_playbook_embeddings_indexed()) diff --git a/apps/api/src/repositories/playbook_repository.py b/apps/api/src/repositories/playbook_repository.py index 54b612b2..e0b6d22a 100644 --- a/apps/api/src/repositories/playbook_repository.py +++ b/apps/api/src/repositories/playbook_repository.py @@ -1,26 +1,31 @@ """ -Playbook Repository - #7 Playbook 萃取 -====================================== -Playbook CRUD 操作 (Redis + PostgreSQL) +Playbook Repository — Phase 3.5 雙寫持久化 +========================================== +ADR-085: AI 學習成果必須儲存到 PostgreSQL + +儲存策略(Phase 3.5 架構升級): +- PostgreSQL = System of Record(永久保存,source of truth) +- Redis = Warm Cache(7天 TTL,加速讀取,PG 為準) + +雙寫規則: +- create/update → 先寫 PG,再更新 Redis cache +- 讀取 → Redis-first(cache hit),miss 時從 PG 載入並回填 Redis +- trust_score / EWMA 統計 → PG 永久保存,Redis TTL 到期後從 PG 復原 Phase 7.2: Repository 實作 -建立時間: 2026-03-26 (台北時區) -建立者: Claude Code (#7 Playbook 萃取) - -遵循 leWOOOgo 積木化原則: -- 實作 IPlaybookRepository Protocol -- Redis 為 Working Memory (7天 TTL) -- PostgreSQL 為 Episodic Memory - Phase 22 P2: 相似度計算邏輯移至 utils -2026-03-31 Claude Code (首席架構師技術債修復) +Phase 3.5 (ADR-085): PostgreSQL dual-write + Redis warm cache +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3.5 重寫,修正「AI 失憶」架構問題 """ import json import structlog +from sqlalchemy import select from src.core.redis_client import get_redis +from src.db.base import get_session_factory +from src.db.models import PlaybookRecord from src.models.playbook import ( Playbook, PlaybookStatus, @@ -32,7 +37,7 @@ from src.utils.timezone import 
now_taipei logger = structlog.get_logger(__name__) -# Redis TTL: 7 天 +# Redis TTL: 7 天(warm cache only,PG 為 source of truth) PLAYBOOK_TTL_SECONDS = 7 * 24 * 60 * 60 # Redis Key 前綴 @@ -41,129 +46,175 @@ PLAYBOOK_INDEX_ALERT_PREFIX = "playbook:index:alert:" PLAYBOOK_INDEX_SERVICE_PREFIX = "playbook:index:service:" +# ============================================================================= +# ORM ↔ Pydantic 轉換 Helpers +# ============================================================================= + +def _pydantic_to_orm(playbook: Playbook) -> PlaybookRecord: + """Pydantic Playbook → PlaybookRecord ORM""" + return PlaybookRecord( + playbook_id=playbook.playbook_id, + name=playbook.name, + description=playbook.description, + status=playbook.status.value, + source=playbook.source.value, + symptom_pattern=playbook.symptom_pattern.model_dump(), + repair_steps=[s.model_dump() for s in playbook.repair_steps], + estimated_duration_minutes=playbook.estimated_duration_minutes, + source_incident_ids=playbook.source_incident_ids, + ai_confidence=playbook.ai_confidence, + success_count=playbook.success_count, + failure_count=playbook.failure_count, + last_used_at=playbook.last_used_at, + trust_score=playbook.trust_score, + approved_by=playbook.approved_by, + approved_at=playbook.approved_at, + tags=playbook.tags, + notes=playbook.notes, + requires_approval_level=playbook.requires_approval_level, + stateful_targets=playbook.stateful_targets, + requires_pre_backup=playbook.requires_pre_backup, + created_at=playbook.created_at, + updated_at=playbook.updated_at, + ) + + +def _orm_to_pydantic(record: PlaybookRecord) -> Playbook: + """PlaybookRecord ORM → Pydantic Playbook""" + return Playbook.model_validate({ + "playbook_id": record.playbook_id, + "name": record.name, + "description": record.description, + "status": record.status, + "source": record.source, + "symptom_pattern": record.symptom_pattern, + "repair_steps": record.repair_steps, + "estimated_duration_minutes": 
record.estimated_duration_minutes, + "source_incident_ids": record.source_incident_ids, + "ai_confidence": float(record.ai_confidence), + "success_count": record.success_count, + "failure_count": record.failure_count, + "last_used_at": record.last_used_at, + "trust_score": float(record.trust_score), + "approved_by": record.approved_by, + "approved_at": record.approved_at, + "tags": record.tags, + "notes": record.notes, + "requires_approval_level": record.requires_approval_level, + "stateful_targets": record.stateful_targets, + "requires_pre_backup": record.requires_pre_backup, + "created_at": record.created_at, + "updated_at": record.updated_at, + }) + + class PlaybookRepository: """ - Playbook Repository 實作 + Playbook Repository — Phase 3.5 雙寫實作 儲存策略: - - Redis: Working Memory (快速讀取,7天 TTL) - - PostgreSQL: Episodic Memory (持久化,待實作) + - PostgreSQL: source of truth(所有 create/update 必須先寫 PG) + - Redis: warm cache(加速讀取,7d TTL,到期後從 PG 復原) - Phase 7.2 先實作 Redis 層,PostgreSQL 待 #7.5 整合 + 這保證: + - trust_score / EWMA 永久不丟失 + - Pod 重啟後 AI 學習成果完整保留 + - Redis 失效只影響讀取速度,不影響資料完整性 """ - # === CRUD Operations === + # ========================================================================= + # CRUD Operations + # ========================================================================= async def create(self, playbook: Playbook) -> Playbook: """ 建立新的 Playbook - 1. 儲存到 Redis - 2. 建立索引 (alert_names, services) + 順序: + 1. 寫入 PostgreSQL(source of truth) + 2. 寫入 Redis cache(warm cache) + 3. 更新索引 """ - try: - redis_client = get_redis() + if not playbook.created_at: + playbook.created_at = now_taipei() + playbook.updated_at = now_taipei() - # 確保有建立時間 - if not playbook.created_at: - playbook.created_at = now_taipei() - playbook.updated_at = now_taipei() + # 1. 
寫入 PostgreSQL(先寫,確保資料不丟失) + await self._pg_upsert(playbook) - # 儲存 Playbook - key = f"{PLAYBOOK_KEY_PREFIX}{playbook.playbook_id}" - await redis_client.set( - key, - json.dumps(playbook.to_redis_dict(), ensure_ascii=False), - ex=PLAYBOOK_TTL_SECONDS, - ) + # 2. 寫入 Redis(warm cache) + await self._redis_set(playbook) - # 建立索引 - await self._update_indexes(playbook) + # 3. 建立索引 + await self._update_indexes(playbook) - logger.info( - "playbook_created", - playbook_id=playbook.playbook_id, - name=playbook.name, - ) - return playbook - - except Exception as e: - logger.error("playbook_create_failed", error=str(e)) - raise + logger.info("playbook_created", playbook_id=playbook.playbook_id, name=playbook.name) + return playbook async def get_by_id(self, playbook_id: str) -> Playbook | None: - """根據 ID 取得 Playbook""" - try: - redis_client = get_redis() - key = f"{PLAYBOOK_KEY_PREFIX}{playbook_id}" - data = await redis_client.get(key) + """ + 根據 ID 取得 Playbook - if data: - return Playbook.from_redis_dict(json.loads(data)) - return None + Redis-first → miss 時從 PG 載入並回填 Redis + """ + # 1. Redis cache hit + cached = await self._redis_get(playbook_id) + if cached is not None: + return cached - except Exception as e: - logger.error("playbook_get_failed", playbook_id=playbook_id, error=str(e)) - return None + # 2. 
PG fallback(cache miss) + playbook = await self._pg_get(playbook_id) + if playbook is not None: + # 回填 Redis cache + await self._redis_set(playbook) + return playbook async def update(self, playbook: Playbook) -> Playbook | None: - """更新 Playbook""" - try: - existing = await self.get_by_id(playbook.playbook_id) - if not existing: - return None + """ + 更新 Playbook - playbook.updated_at = now_taipei() - - redis_client = get_redis() - key = f"{PLAYBOOK_KEY_PREFIX}{playbook.playbook_id}" - await redis_client.set( - key, - json.dumps(playbook.to_redis_dict(), ensure_ascii=False), - ex=PLAYBOOK_TTL_SECONDS, - ) - - # 更新索引 - await self._update_indexes(playbook) - - logger.info("playbook_updated", playbook_id=playbook.playbook_id) - return playbook - - except Exception as e: - logger.error( - "playbook_update_failed", + 先確認 PG 存在 → 寫入 PG → 更新 Redis + """ + existing = await self._pg_get(playbook.playbook_id) + if existing is None: + # PG 找不到,可能是老資料只在 Redis:先建立 PG 記錄 + logger.warning( + "playbook_pg_not_found_creating", playbook_id=playbook.playbook_id, - error=str(e), ) - return None + + playbook.updated_at = now_taipei() + + # 1. 寫入 PG + await self._pg_upsert(playbook) + + # 2. 更新 Redis + await self._redis_set(playbook) + + # 3. 
更新索引 + await self._update_indexes(playbook) + + logger.info("playbook_updated", playbook_id=playbook.playbook_id) + return playbook async def delete(self, playbook_id: str) -> bool: """ - 刪除 Playbook (軟刪除 → DEPRECATED) - - 不真正刪除,而是將狀態改為 DEPRECATED + 軟刪除 Playbook(狀態改為 DEPRECATED) """ - try: - playbook = await self.get_by_id(playbook_id) - if not playbook: - return False - - playbook.status = PlaybookStatus.DEPRECATED - playbook.updated_at = now_taipei() - await self.update(playbook) - - logger.info("playbook_deprecated", playbook_id=playbook_id) - return True - - except Exception as e: - logger.error( - "playbook_delete_failed", - playbook_id=playbook_id, - error=str(e), - ) + playbook = await self.get_by_id(playbook_id) + if not playbook: return False - # === Query Operations === + playbook.status = PlaybookStatus.DEPRECATED + playbook.updated_at = now_taipei() + await self.update(playbook) + + logger.info("playbook_deprecated", playbook_id=playbook_id) + return True + + # ========================================================================= + # Query Operations + # ========================================================================= async def list_playbooks( self, @@ -173,42 +224,32 @@ class PlaybookRepository: offset: int = 0, ) -> tuple[list[Playbook], int]: """ - 列出 Playbooks + 列出 Playbooks(從 PostgreSQL 查詢,不走 Redis scan) - 注意: Redis 實作效率較低,後續需遷移到 PostgreSQL + Phase 3.5:改用 PG 查詢,效率更高,資料更完整 """ try: - redis_client = get_redis() + factory = get_session_factory() + async with factory() as session: + stmt = select(PlaybookRecord) + if status is not None: + stmt = stmt.where(PlaybookRecord.status == status.value) + stmt = stmt.order_by(PlaybookRecord.updated_at.desc()) - # 掃描所有 Playbook keys - pattern = f"{PLAYBOOK_KEY_PREFIX}PB-*" - keys = [] - async for key in redis_client.scan_iter(match=pattern, count=100): - keys.append(key) + result = await session.execute(stmt) + all_records = result.scalars().all() - # 讀取並過濾 - all_playbooks: list[Playbook] = [] - for 
key in keys: - data = await redis_client.get(key) - if data: - playbook = Playbook.from_redis_dict(json.loads(data)) + all_playbooks = [_orm_to_pydantic(r) for r in all_records] - # 狀態過濾 - if status and playbook.status != status: - continue - - # 標籤過濾 - if tags and not set(tags).intersection(set(playbook.tags)): - continue - - all_playbooks.append(playbook) - - # 排序: 按 updated_at 降序 - all_playbooks.sort(key=lambda p: p.updated_at, reverse=True) + # 標籤過濾(PG JSONB 過濾後續可移到 SQL,目前 Python 過濾) + if tags: + all_playbooks = [ + p for p in all_playbooks + if set(tags).intersection(set(p.tags)) + ] total = len(all_playbooks) - items = all_playbooks[offset : offset + limit] - + items = all_playbooks[offset: offset + limit] return items, total except Exception as e: @@ -225,7 +266,7 @@ class PlaybookRepository: 根據症狀模式找相似 Playbook 策略: - 1. 從索引快速過濾候選 + 1. 從 Redis 索引快速過濾候選 2. 計算詳細相似度 3. 返回 Top K """ @@ -235,24 +276,19 @@ class PlaybookRepository: # 1. 使用索引找候選 Playbook IDs candidate_ids: set[str] = set() - # 從 alert_names 索引查詢 for alert_name in symptoms.alert_names: index_key = f"{PLAYBOOK_INDEX_ALERT_PREFIX}{alert_name}" members = await redis_client.smembers(index_key) candidate_ids.update(m.decode() if isinstance(m, bytes) else m for m in members) - # 從 services 索引查詢 for service in symptoms.affected_services: index_key = f"{PLAYBOOK_INDEX_SERVICE_PREFIX}{service}" members = await redis_client.smembers(index_key) candidate_ids.update(m.decode() if isinstance(m, bytes) else m for m in members) - # 如果沒有索引命中,掃描所有 APPROVED Playbooks + # 索引無命中 → 從 PG 掃 APPROVED if not candidate_ids: - playbooks, _ = await self.list_playbooks( - status=PlaybookStatus.APPROVED, - limit=100, - ) + playbooks, _ = await self.list_playbooks(status=PlaybookStatus.APPROVED, limit=100) candidate_ids = {p.playbook_id for p in playbooks} # 2. 
計算相似度 @@ -260,26 +296,16 @@ class PlaybookRepository: for playbook_id in candidate_ids: playbook = await self.get_by_id(playbook_id) - if not playbook: + if not playbook or playbook.status != PlaybookStatus.APPROVED: continue - # 只考慮 APPROVED 狀態 - if playbook.status != PlaybookStatus.APPROVED: - continue - - similarity = calculate_symptom_similarity( - symptoms, - playbook.symptom_pattern, - ) - - # alert_names 完全匹配時,保證通過(不因其他維度拉低分數) + similarity = calculate_symptom_similarity(symptoms, playbook.symptom_pattern) alert_exact_match = bool( set(symptoms.alert_names) & set(playbook.symptom_pattern.alert_names) ) if alert_exact_match or similarity >= min_similarity: results.append((playbook, similarity)) - # 3. 排序並返回 Top K results.sort(key=lambda x: x[1], reverse=True) return results[:top_k] @@ -293,13 +319,13 @@ class PlaybookRepository: success: bool, ) -> bool: """ - 更新執行統計 + EWMA 信任度 + 更新執行統計 + EWMA 信任度(雙寫到 PG + Redis) ADR-083 Phase 3: 負向 2x 強化 EWMA 公式 成功: trust_new = 0.9 * trust_old + 0.1 * 1.0 失敗: trust_new = 0.8 * trust_old + 0.2 * 0.0(衰減速度 2x) trust < 0.1 → 記錄警告,由 Evolver Agent 封存 - 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3 EWMA 實裝 + 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3.5 EWMA 雙寫 PG """ try: playbook = await self.get_by_id(playbook_id) @@ -308,16 +334,15 @@ class PlaybookRepository: if success: playbook.success_count += 1 - # 正向 EWMA:alpha=0.1,正向結果權重較小(保守更新) playbook.trust_score = 0.9 * playbook.trust_score + 0.1 * 1.0 else: playbook.failure_count += 1 - # 負向 EWMA:alpha=0.2,失敗懲罰 2x(快速衰退) playbook.trust_score = 0.8 * playbook.trust_score + 0.2 * 0.0 - # 邊界保護 playbook.trust_score = max(0.0, min(1.0, playbook.trust_score)) playbook.last_used_at = now_taipei() + + # 雙寫(PG + Redis) await self.update(playbook) if playbook.trust_score < 0.1: @@ -338,70 +363,25 @@ class PlaybookRepository: return True except Exception as e: - logger.error( - "playbook_stats_update_failed", - playbook_id=playbook_id, - error=str(e), - ) + 
logger.error("playbook_stats_update_failed", playbook_id=playbook_id, error=str(e)) return False - # === Index Management === - - async def _update_indexes(self, playbook: Playbook) -> None: - """更新索引""" + async def find_by_source_incident(self, incident_id: str) -> list[Playbook]: + """根據來源 Incident ID 找 Playbook(從 PG 查詢)""" try: - redis_client = get_redis() - - # Alert names 索引 - for alert_name in playbook.symptom_pattern.alert_names: - index_key = f"{PLAYBOOK_INDEX_ALERT_PREFIX}{alert_name}" - await redis_client.sadd(index_key, playbook.playbook_id) - await redis_client.expire(index_key, PLAYBOOK_TTL_SECONDS) - - # Services 索引 - for service in playbook.symptom_pattern.affected_services: - index_key = f"{PLAYBOOK_INDEX_SERVICE_PREFIX}{service}" - await redis_client.sadd(index_key, playbook.playbook_id) - await redis_client.expire(index_key, PLAYBOOK_TTL_SECONDS) - + factory = get_session_factory() + async with factory() as session: + # PG JSONB contains 查詢 + stmt = select(PlaybookRecord).where( + PlaybookRecord.source_incident_ids.contains([incident_id]) + ) + result = await session.execute(stmt) + records = result.scalars().all() + return [_orm_to_pydantic(r) for r in records] except Exception as e: - logger.warning("playbook_index_update_failed", error=str(e)) - - # === Learning Service 信心度調整 (2026-03-30 Claude Code) === - - async def find_by_source_incident( - self, - incident_id: str, - ) -> list[Playbook]: - """ - 根據來源 Incident ID 找 Playbook - - 2026-03-30 Claude Code: Learning Service 信心度調整用 - 尋找 source_incident_ids 包含此 incident_id 的 Playbooks - """ - try: - redis_client = get_redis() - - # 掃描所有 Playbook keys - pattern = f"{PLAYBOOK_KEY_PREFIX}PB-*" - results: list[Playbook] = [] - - async for key in redis_client.scan_iter(match=pattern, count=100): - data = await redis_client.get(key) - if data: - playbook = Playbook.from_redis_dict(json.loads(data)) - if incident_id in playbook.source_incident_ids: - results.append(playbook) - - return results - - except 
Exception as e: - logger.error( - "playbook_find_by_incident_failed", - incident_id=incident_id, - error=str(e), - ) - return [] + logger.error("playbook_find_by_incident_failed", incident_id=incident_id, error=str(e)) + # Fallback: Redis scan(維持向下相容) + return await self._redis_find_by_incident(incident_id) async def adjust_confidence( self, @@ -410,9 +390,7 @@ class PlaybookRepository: reason: str, ) -> Playbook | None: """ - 調整 Playbook 信心度 - - 2026-03-30 Claude Code: Learning Service 信心度調整用 + 調整 Playbook 信心度(雙寫到 PG + Redis) 邏輯: - ai_confidence += delta (clamp 到 0.0~1.0) @@ -425,25 +403,16 @@ class PlaybookRepository: return None old_confidence = playbook.ai_confidence - - # 調整信心度 (clamp 到 0.0~1.0) playbook.ai_confidence = max(0.0, min(1.0, playbook.ai_confidence + delta)) - - # 狀態自動轉換 old_status = playbook.status - # 高信心度自動升級 - if ( - playbook.ai_confidence >= 0.9 - and playbook.status == PlaybookStatus.DRAFT - ): + if playbook.ai_confidence >= 0.9 and playbook.status == PlaybookStatus.DRAFT: playbook.status = PlaybookStatus.APPROVED playbook.approved_by = "auto_learning" playbook.approved_at = now_taipei() note = f"\n[Auto-approved: confidence {playbook.ai_confidence:.2f}]" playbook.notes = (playbook.notes or "") + note - # 低信心度 + 高失敗率 → 棄用 elif ( playbook.ai_confidence < 0.3 and playbook.total_executions >= 5 @@ -455,7 +424,6 @@ class PlaybookRepository: note = f"\n[Auto-deprecated: conf={conf:.2f}, fail={fail:.0%}]" playbook.notes = (playbook.notes or "") + note - # 儲存 updated = await self.update(playbook) logger.info( @@ -467,18 +435,190 @@ class PlaybookRepository: new_status=playbook.status.value, reason=reason, ) - return updated except Exception as e: - logger.error( - "playbook_confidence_adjust_failed", - playbook_id=playbook_id, - delta=delta, - error=str(e), - ) + logger.error("playbook_confidence_adjust_failed", playbook_id=playbook_id, delta=delta, error=str(e)) return None + # 
========================================================================= + # Startup Backfill — Redis → PG 遷移 + # ========================================================================= + + async def backfill_redis_to_pg(self) -> int: + """ + 啟動時將 Redis 中存在但 PG 中沒有的 Playbook 寫入 PG。 + + 用途:Phase 3.5 上線前後,確保舊 Redis 資料不丟失。 + 返回:補寫的 Playbook 數量。 + """ + count = 0 + try: + redis_client = get_redis() + pattern = f"{PLAYBOOK_KEY_PREFIX}PB-*" + + async for key in redis_client.scan_iter(match=pattern, count=100): + data = await redis_client.get(key) + if not data: + continue + + try: + playbook = Playbook.from_redis_dict(json.loads(data)) + existing = await self._pg_get(playbook.playbook_id) + if existing is None: + await self._pg_upsert(playbook) + count += 1 + logger.info("playbook_backfilled_to_pg", playbook_id=playbook.playbook_id) + except Exception as e: + logger.warning("playbook_backfill_item_failed", key=str(key), error=str(e)) + + except Exception as e: + logger.warning("playbook_backfill_failed", error=str(e)) + + logger.info("playbook_backfill_complete", count=count) + return count + + # ========================================================================= + # Private — PostgreSQL Helpers + # ========================================================================= + + async def _pg_upsert(self, playbook: Playbook) -> None: + """Upsert Playbook 到 PostgreSQL(INSERT ON CONFLICT UPDATE)""" + try: + from sqlalchemy.dialects.postgresql import insert as pg_insert + + factory = get_session_factory() + async with factory() as session: + stmt = pg_insert(PlaybookRecord).values( + playbook_id=playbook.playbook_id, + name=playbook.name, + description=playbook.description, + status=playbook.status.value, + source=playbook.source.value, + symptom_pattern=playbook.symptom_pattern.model_dump(), + repair_steps=[s.model_dump() for s in playbook.repair_steps], + estimated_duration_minutes=playbook.estimated_duration_minutes, + 
source_incident_ids=playbook.source_incident_ids, + ai_confidence=playbook.ai_confidence, + success_count=playbook.success_count, + failure_count=playbook.failure_count, + last_used_at=playbook.last_used_at, + trust_score=playbook.trust_score, + approved_by=playbook.approved_by, + approved_at=playbook.approved_at, + tags=playbook.tags, + notes=playbook.notes, + requires_approval_level=playbook.requires_approval_level, + stateful_targets=playbook.stateful_targets, + requires_pre_backup=playbook.requires_pre_backup, + created_at=playbook.created_at, + updated_at=playbook.updated_at, + ).on_conflict_do_update( + index_elements=["playbook_id"], + set_={ + "name": playbook.name, + "description": playbook.description, + "status": playbook.status.value, + "source": playbook.source.value, + "symptom_pattern": playbook.symptom_pattern.model_dump(), + "repair_steps": [s.model_dump() for s in playbook.repair_steps], + "estimated_duration_minutes": playbook.estimated_duration_minutes, + "source_incident_ids": playbook.source_incident_ids, + "ai_confidence": playbook.ai_confidence, + "success_count": playbook.success_count, + "failure_count": playbook.failure_count, + "last_used_at": playbook.last_used_at, + "trust_score": playbook.trust_score, + "approved_by": playbook.approved_by, + "approved_at": playbook.approved_at, + "tags": playbook.tags, + "notes": playbook.notes, + "requires_approval_level": playbook.requires_approval_level, + "stateful_targets": playbook.stateful_targets, + "requires_pre_backup": playbook.requires_pre_backup, + "updated_at": playbook.updated_at, + }, + ) + await session.execute(stmt) + await session.commit() + except Exception as e: + logger.error("playbook_pg_upsert_failed", playbook_id=playbook.playbook_id, error=str(e)) + raise + + async def _pg_get(self, playbook_id: str) -> Playbook | None: + """從 PostgreSQL 載入 Playbook""" + try: + factory = get_session_factory() + async with factory() as session: + result = await session.get(PlaybookRecord, 
playbook_id) + if result is None: + return None + return _orm_to_pydantic(result) + except Exception as e: + logger.warning("playbook_pg_get_failed", playbook_id=playbook_id, error=str(e)) + return None + + # ========================================================================= + # Private — Redis Cache Helpers + # ========================================================================= + + async def _redis_set(self, playbook: Playbook) -> None: + """寫入 Redis warm cache(TTL 7天)""" + try: + redis_client = get_redis() + key = f"{PLAYBOOK_KEY_PREFIX}{playbook.playbook_id}" + await redis_client.set( + key, + json.dumps(playbook.to_redis_dict(), ensure_ascii=False), + ex=PLAYBOOK_TTL_SECONDS, + ) + except Exception as e: + # Redis 寫入失敗不 raise(PG 已成功為主) + logger.warning("playbook_redis_set_failed", playbook_id=playbook.playbook_id, error=str(e)) + + async def _redis_get(self, playbook_id: str) -> Playbook | None: + """從 Redis cache 讀取 Playbook""" + try: + redis_client = get_redis() + key = f"{PLAYBOOK_KEY_PREFIX}{playbook_id}" + data = await redis_client.get(key) + if data: + return Playbook.from_redis_dict(json.loads(data)) + return None + except Exception as e: + logger.warning("playbook_redis_get_failed", playbook_id=playbook_id, error=str(e)) + return None + + async def _redis_find_by_incident(self, incident_id: str) -> list[Playbook]: + """Redis fallback: 掃描所有 Playbook 找包含 incident_id 的""" + try: + redis_client = get_redis() + results: list[Playbook] = [] + async for key in redis_client.scan_iter(match=f"{PLAYBOOK_KEY_PREFIX}PB-*", count=100): + data = await redis_client.get(key) + if data: + playbook = Playbook.from_redis_dict(json.loads(data)) + if incident_id in playbook.source_incident_ids: + results.append(playbook) + return results + except Exception: + return [] + + async def _update_indexes(self, playbook: Playbook) -> None: + """更新 Redis 索引(供快速 symptom 過濾)""" + try: + redis_client = get_redis() + for alert_name in playbook.symptom_pattern.alert_names: 
+ index_key = f"{PLAYBOOK_INDEX_ALERT_PREFIX}{alert_name}" + await redis_client.sadd(index_key, playbook.playbook_id) + await redis_client.expire(index_key, PLAYBOOK_TTL_SECONDS) + for service in playbook.symptom_pattern.affected_services: + index_key = f"{PLAYBOOK_INDEX_SERVICE_PREFIX}{service}" + await redis_client.sadd(index_key, playbook.playbook_id) + await redis_client.expire(index_key, PLAYBOOK_TTL_SECONDS) + except Exception as e: + logger.warning("playbook_index_update_failed", error=str(e)) + # ============================================================================= # Singleton diff --git a/apps/api/src/services/dynamic_baseline_service.py b/apps/api/src/services/dynamic_baseline_service.py new file mode 100644 index 00000000..e30279dd --- /dev/null +++ b/apps/api/src/services/dynamic_baseline_service.py @@ -0,0 +1,489 @@ +""" +AWOOOI AIOps Phase 4 — Dynamic Baseline Service(動態基線服務) +============================================================= +職責:Holt-Winters 指數平滑,偵測 Prometheus metric 異常偏離 + +核心 API: + is_anomaly(metric_name, current_value) -> AnomalyResult + update_baseline(metric_name, datapoints) + +設計原則: +- Shadow Mode(AIOPS_P4_SHADOW_MODE=True):只記錄,不觸發 Alert +- 熔斷保護:statsmodels 失敗 → fallback 到滑動平均 +- 7 天歷史資料最少訓練量(低於此閾值 → skip,不誤判) +- 基線持久化到 Redis(key: baseline:{metric_name},TTL 24h) +- 訓練在 background worker 執行,not in webhook handler + +ADR-084: Phase 4 動態異常偵測源頭升級 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 初始建立 +""" + +from __future__ import annotations + +import json +import math +from dataclasses import dataclass, field +from typing import Any + +import structlog + +from src.utils.timezone import now_taipei + +logger = structlog.get_logger(__name__) + +# ── 常數 ──────────────────────────────────────────────────────────────────── +MIN_DATAPOINTS = 168 # 7 天 × 24h 最少樣本數(才能訓練季節性模型) +SIGMA_THRESHOLD = 3.0 # 偏差 ≥ 3σ → 異常 +REDIS_TTL_SEC = 86400 # 基線 Redis TTL = 24h +REDIS_KEY_PREFIX = "baseline:" +HISTORY_WINDOW_HOURS = 336 # 保留 14 天歷史 + + +# 
───────────────────────────────────────────────────────────────────────────── +# Data Types +# ───────────────────────────────────────────────────────────────────────────── + +@dataclass +class MetricDatapoint: + """單一 metric 時序資料點""" + timestamp: float # Unix epoch + value: float + + +@dataclass +class BaselineState: + """Holt-Winters 訓練後的基線狀態(Redis 持久化)""" + metric_name: str + mean: float + std: float + seasonal_factors: list[float] = field(default_factory=list) # 24h 週期 + last_trained_at: str = "" + datapoint_count: int = 0 + + def to_dict(self) -> dict[str, Any]: + return { + "metric_name": self.metric_name, + "mean": self.mean, + "std": self.std, + "seasonal_factors": self.seasonal_factors, + "last_trained_at": self.last_trained_at, + "datapoint_count": self.datapoint_count, + } + + @classmethod + def from_dict(cls, d: dict[str, Any]) -> "BaselineState": + return cls( + metric_name=d["metric_name"], + mean=d["mean"], + std=d["std"], + seasonal_factors=d.get("seasonal_factors", []), + last_trained_at=d.get("last_trained_at", ""), + datapoint_count=d.get("datapoint_count", 0), + ) + + +@dataclass +class AnomalyResult: + """is_anomaly() 回傳結果""" + metric_name: str + current_value: float + is_anomaly: bool + deviation_sigma: float # 偏差 σ 數(>3 = 異常) + expected_mean: float + expected_std: float + direction: str = "none" # "up" / "down" / "none" + shadow_mode: bool = True # True = 只記錄,不觸發 + reason: str = "" + + +# ───────────────────────────────────────────────────────────────────────────── +# Main Service +# ───────────────────────────────────────────────────────────────────────────── + +class DynamicBaselineService: + """ + 動態基線服務 + + 兩大功能: + 1. train_baseline() — 從 Prometheus 抓歷史資料,用 Holt-Winters 訓練 + 2. is_anomaly() — 即時判斷當前值是否偏離基線 ≥ 3σ + """ + + async def train_baseline( + self, + metric_name: str, + promql: str, + lookback_hours: int = HISTORY_WINDOW_HOURS, + ) -> BaselineState | None: + """ + 從 Prometheus 抓取歷史資料並訓練基線。 + + Args: + metric_name: 基線識別名(e.g. 
"cpu_usage_node_mon") + promql: Prometheus query(e.g. "avg(rate(node_cpu_seconds_total[5m]))") + lookback_hours: 歷史視窗(預設 14 天) + + Returns: + BaselineState(已存 Redis);資料不足 → None + """ + try: + datapoints = await self._fetch_prometheus_history(promql, lookback_hours) + if len(datapoints) < MIN_DATAPOINTS: + logger.info( + "baseline_insufficient_data", + metric=metric_name, + count=len(datapoints), + required=MIN_DATAPOINTS, + ) + return None + + state = self._fit_holt_winters(metric_name, datapoints) + await self._save_baseline(state) + logger.info( + "baseline_trained", + metric=metric_name, + mean=f"{state.mean:.4f}", + std=f"{state.std:.4f}", + datapoints=len(datapoints), + ) + return state + + except Exception: + logger.exception("baseline_train_failed", metric=metric_name) + return None + + async def is_anomaly( + self, + metric_name: str, + current_value: float, + hour_of_day: int | None = None, + ) -> AnomalyResult: + """ + 即時異常判斷。 + + Args: + metric_name: 基線識別名 + current_value: 當前觀測值 + hour_of_day: 當前小時(0-23),用於套用 seasonal factor;None = 不套用 + + Returns: + AnomalyResult + """ + from src.core.feature_flags import aiops_flags + + shadow_mode = aiops_flags.AIOPS_P4_SHADOW_MODE + + try: + state = await self._load_baseline(metric_name) + if state is None: + return AnomalyResult( + metric_name=metric_name, + current_value=current_value, + is_anomaly=False, + deviation_sigma=0.0, + expected_mean=current_value, + expected_std=0.0, + reason="no_baseline_available", + shadow_mode=shadow_mode, + ) + + # 套用 seasonal factor(如果有 24h 週期資料) + expected_mean = state.mean + if hour_of_day is not None and len(state.seasonal_factors) == 24: + expected_mean *= state.seasonal_factors[hour_of_day] + + expected_std = state.std if state.std > 0 else 1e-9 + deviation = abs(current_value - expected_mean) + sigma = deviation / expected_std + + anomaly = sigma >= SIGMA_THRESHOLD + direction = "none" + if anomaly: + direction = "up" if current_value > expected_mean else "down" + + result = 
AnomalyResult( + metric_name=metric_name, + current_value=current_value, + is_anomaly=anomaly, + deviation_sigma=round(sigma, 2), + expected_mean=round(expected_mean, 4), + expected_std=round(expected_std, 4), + direction=direction, + shadow_mode=shadow_mode, + reason=f"deviation {sigma:.1f}σ from baseline" if anomaly else "within_normal_range", + ) + + if anomaly: + logger.info( + "dynamic_anomaly_detected", + metric=metric_name, + value=current_value, + expected=expected_mean, + sigma=sigma, + direction=direction, + shadow_mode=shadow_mode, + ) + + return result + + except Exception as e: + logger.warning("baseline_anomaly_check_failed", metric=metric_name, error=str(e)) + return AnomalyResult( + metric_name=metric_name, + current_value=current_value, + is_anomaly=False, + deviation_sigma=0.0, + expected_mean=0.0, + expected_std=0.0, + reason=f"check_error:{e}", + shadow_mode=shadow_mode, + ) + + # ────────────────────────────────────────────────────────────────────────── + # Private Helpers + # ────────────────────────────────────────────────────────────────────────── + + async def _fetch_prometheus_history( + self, + promql: str, + lookback_hours: int, + ) -> list[MetricDatapoint]: + """從 Prometheus query_range API 抓取歷史資料(1h 步進)。""" + import httpx + from src.core.config import settings + + end_ts = now_taipei().timestamp() + start_ts = end_ts - lookback_hours * 3600 + + try: + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.get( + f"{settings.PROMETHEUS_URL}/api/v1/query_range", + params={ + "query": promql, + "start": start_ts, + "end": end_ts, + "step": "3600", # 1h 步進 + }, + ) + resp.raise_for_status() + data = resp.json() + + results = data.get("data", {}).get("result", []) + if not results: + return [] + + # 取第一個 time series + values = results[0].get("values", []) + return [ + MetricDatapoint(timestamp=float(ts), value=float(v)) + for ts, v in values + if v != "NaN" + ] + except Exception as e: + 
logger.warning("prometheus_history_fetch_failed", error=str(e)) + return [] + + def _fit_holt_winters( + self, + metric_name: str, + datapoints: list[MetricDatapoint], + ) -> BaselineState: + """ + 用 statsmodels Holt-Winters 訓練基線。 + Fallback:若 statsmodels 不可用 → 滑動統計。 + """ + values = [dp.value for dp in datapoints] + + try: + import numpy as np + from statsmodels.tsa.holtwinters import ExponentialSmoothing + + arr = np.array(values, dtype=float) + # 確保無 NaN / Inf + arr = arr[np.isfinite(arr)] + + if len(arr) < MIN_DATAPOINTS: + return self._fit_simple_stats(metric_name, values) + + # Holt-Winters:加法趨勢 + 加法季節性(24h 週期) + seasonal_periods = min(24, len(arr) // 2) + model = ExponentialSmoothing( + arr, + trend="add", + seasonal="add" if len(arr) >= seasonal_periods * 2 else None, + seasonal_periods=seasonal_periods, + initialization_method="estimated", + ).fit(optimized=True, disp=False) + + fitted = model.fittedvalues + residuals = arr - fitted + mean_val = float(np.mean(fitted)) + std_val = float(np.std(residuals)) + + # 24h seasonal factors(正規化為相對倍數) + seasonal_factors = [1.0] * 24 + if hasattr(model, "season") and model.season is not None: + s = model.season + if len(s) >= 24: + s_arr = np.array(s[-24:]) + # 轉為乘法因子(mean-centered) + s_mean = abs(np.mean(s_arr)) or 1.0 + sf = (s_arr / s_mean).tolist() + seasonal_factors = [max(0.1, min(10.0, f)) for f in sf] + + return BaselineState( + metric_name=metric_name, + mean=mean_val, + std=max(std_val, mean_val * 0.01), # 最小 std = 1% mean + seasonal_factors=seasonal_factors, + last_trained_at=now_taipei().isoformat(), + datapoint_count=len(arr), + ) + + except Exception as e: + logger.warning("holt_winters_failed_fallback_to_stats", error=str(e)) + return self._fit_simple_stats(metric_name, values) + + def _fit_simple_stats( + self, + metric_name: str, + values: list[float], + ) -> BaselineState: + """Fallback:純滑動平均 + 標準差基線。""" + if not values: + return BaselineState(metric_name=metric_name, mean=0.0, std=1.0) + + n = 
len(values) + mean_val = sum(values) / n + variance = sum((v - mean_val) ** 2 for v in values) / n + std_val = math.sqrt(variance) + + return BaselineState( + metric_name=metric_name, + mean=mean_val, + std=max(std_val, mean_val * 0.01), + seasonal_factors=[1.0] * 24, + last_trained_at=now_taipei().isoformat(), + datapoint_count=n, + ) + + async def _save_baseline(self, state: BaselineState, promql: str = "", lookback_hours: int = HISTORY_WINDOW_HOURS) -> None: + """ + 儲存基線狀態: + 1. 先寫 PostgreSQL(永久保存,source of truth) + 2. 再寫 Redis(24h warm cache,加速讀取) + + Phase 4 ADR-084 架構鐵律:訓練好的 Holt-Winters 模型不能只存 Redis。 + Redis 24h TTL 到期 = AI 每天重新學習「正常」的定義 = 不是在學習。 + 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 改為 PG source of truth + """ + # 1. 寫入 PostgreSQL(主要持久化) + await self._pg_upsert_baseline(state, promql, lookback_hours) + + # 2. 寫入 Redis warm cache(加速讀取,到期後從 PG 復原) + try: + from src.core.redis_client import get_redis + r = get_redis() + key = f"{REDIS_KEY_PREFIX}{state.metric_name}" + await r.set(key, json.dumps(state.to_dict()), ex=REDIS_TTL_SEC) + except Exception as e: + logger.warning("baseline_redis_cache_failed", metric=state.metric_name, error=str(e)) + + async def _load_baseline(self, metric_name: str) -> BaselineState | None: + """ + 載入基線:Redis-first → miss 時從 PG 載入並回填 Redis。 + + Phase 4 ADR-084: Redis 只是 warm cache,PG 才是 source of truth。 + """ + # 1. Redis warm cache hit + try: + from src.core.redis_client import get_redis + r = get_redis() + key = f"{REDIS_KEY_PREFIX}{metric_name}" + data = await r.get(key) + if data is not None: + return BaselineState.from_dict(json.loads(data)) + except Exception as e: + logger.warning("baseline_redis_load_failed", metric=metric_name, error=str(e)) + + # 2. 
PG fallback(cache miss) + state = await self._pg_load_latest_baseline(metric_name) + if state is not None: + # 回填 Redis cache + try: + from src.core.redis_client import get_redis + r = get_redis() + key = f"{REDIS_KEY_PREFIX}{metric_name}" + await r.set(key, json.dumps(state.to_dict()), ex=REDIS_TTL_SEC) + except Exception: + pass # cache 回填失敗不影響讀取 + return state + + async def _pg_upsert_baseline(self, state: BaselineState, promql: str, lookback_hours: int) -> None: + """寫入 DynamicBaselineRecord 到 PostgreSQL(INSERT,不更新舊記錄)""" + try: + from src.db.base import get_session_factory + from src.db.models import DynamicBaselineRecord + + factory = get_session_factory() + async with factory() as session: + record = DynamicBaselineRecord( + metric_name=state.metric_name, + mean=state.mean, + std=state.std, + seasonal_factors=state.seasonal_factors, + datapoint_count=state.datapoint_count, + promql=promql, + lookback_hours=lookback_hours, + ) + session.add(record) + await session.commit() + logger.info("baseline_pg_saved", metric=state.metric_name, datapoints=state.datapoint_count) + except Exception as e: + logger.warning("baseline_pg_save_failed", metric=state.metric_name, error=str(e)) + + async def _pg_load_latest_baseline(self, metric_name: str) -> BaselineState | None: + """從 PostgreSQL 載入最新一筆基線記錄""" + try: + from sqlalchemy import select + from src.db.base import get_session_factory + from src.db.models import DynamicBaselineRecord + + factory = get_session_factory() + async with factory() as session: + stmt = ( + select(DynamicBaselineRecord) + .where(DynamicBaselineRecord.metric_name == metric_name) + .order_by(DynamicBaselineRecord.trained_at.desc()) + .limit(1) + ) + result = await session.execute(stmt) + record = result.scalar_one_or_none() + if record is None: + return None + return BaselineState( + metric_name=record.metric_name, + mean=record.mean, + std=record.std, + seasonal_factors=record.seasonal_factors, + last_trained_at=record.trained_at.isoformat(), + 
datapoint_count=record.datapoint_count, + ) + except Exception as e: + logger.warning("baseline_pg_load_failed", metric=metric_name, error=str(e)) + return None + + +# ───────────────────────────────────────────────────────────────────────────── +# Singleton +# ───────────────────────────────────────────────────────────────────────────── + +_baseline_service: DynamicBaselineService | None = None + + +def get_dynamic_baseline_service() -> DynamicBaselineService: + global _baseline_service + if _baseline_service is None: + _baseline_service = DynamicBaselineService() + return _baseline_service diff --git a/apps/api/src/services/log_anomaly_detector.py b/apps/api/src/services/log_anomaly_detector.py new file mode 100644 index 00000000..c58ec02a --- /dev/null +++ b/apps/api/src/services/log_anomaly_detector.py @@ -0,0 +1,385 @@ +""" +AWOOOI AIOps Phase 4 — Log Anomaly Detector(日誌異常偵測) +========================================================== +職責:Drain3 log clustering,即時偵測新 pattern + +核心 API: + process_log_line(line) -> LogAnomalyEvent | None + get_new_patterns(since_ts) -> list[LogCluster] + +設計原則: +- Shadow Mode:新 pattern 只記錄 logger.info,不觸發 Alert +- 狀態持久化到 Redis:cluster tree 序列化(JSON) +- 熔斷:Drain3 失敗 → 僅記錄,不 raise +- 非同步:所有 Redis I/O 非同步;Drain3 計算在同步 helper 中執行 + +ADR-084: Phase 4 動態異常偵測源頭升級 +2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 初始建立 +""" + +from __future__ import annotations + +import hashlib +import json +from dataclasses import dataclass +from typing import Any + +import structlog + +from src.utils.timezone import now_taipei + +logger = structlog.get_logger(__name__) + +# ── 常數 ──────────────────────────────────────────────────────────────────── +REDIS_KEY_CLUSTERS = "log_anomaly:clusters" # hash: cluster_id → cluster data +REDIS_KEY_NEW_PATTERNS = "log_anomaly:new" # list: 新 pattern 事件(最新在前) +REDIS_TTL_CLUSTERS = 86400 * 7 # 7 天 +MAX_NEW_PATTERNS = 200 # 保留最近 200 個新 pattern 事件 +DRAIN_DEPTH = 4 # Drain3 tree depth +DRAIN_SIM_THRESHOLD = 0.4 # 相似度 < 此值 
# ...→ a new cluster is created (tail of the DRAIN_SIM_THRESHOLD comment on the previous line)
DRAIN_MAX_CHILDREN = 100  # max children per node

# change_type values reported by Drain3 that are treated as "a pattern we have not seen before".
# NOTE(review): current drain3 releases emit "cluster_created" / "cluster_template_changed" / "none";
# "template_created" is kept only because the original code accepted it — confirm whether it is needed.
_NEW_PATTERN_CHANGE_TYPES = ("cluster_created", "template_created")


# ─────────────────────────────────────────────────────────────────────────────
# Data Types
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class LogCluster:
    """One Drain3 log cluster (a mined template plus hit bookkeeping)."""

    cluster_id: str
    template: str  # mined template, e.g. "ERROR <*> connection failed to <*>"
    size: int = 1  # number of hits
    first_seen_at: str = ""
    last_seen_at: str = ""
    is_new: bool = False  # True when the cluster was just created

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a JSON-friendly dict. ``is_new`` is deliberately not included."""
        return {
            "cluster_id": self.cluster_id,
            "template": self.template,
            "size": self.size,
            "first_seen_at": self.first_seen_at,
            "last_seen_at": self.last_seen_at,
        }

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "LogCluster":
        """Rebuild a cluster from ``to_dict()`` output; missing optional keys get defaults."""
        return cls(
            cluster_id=d["cluster_id"],
            template=d["template"],
            size=d.get("size", 1),
            first_seen_at=d.get("first_seen_at", ""),
            last_seen_at=d.get("last_seen_at", ""),
        )


@dataclass
class LogAnomalyEvent:
    """Event emitted when a previously unseen log pattern is detected."""

    cluster_id: str
    template: str
    sample_log: str
    detected_at: str
    shadow_mode: bool = True
    source: str = "k8s_pod"  # k8s_pod | host_syslog | app_log


# ─────────────────────────────────────────────────────────────────────────────
# Main Service
# ─────────────────────────────────────────────────────────────────────────────

class LogAnomalyDetector:
    """
    Drain3-based log anomaly detector.

    Workflow:
    1. process_log_line() clusters each line as it arrives.
    2. A new cluster is written to PostgreSQL and pushed onto a Redis list.
    3. get_recent_new_patterns() exposes that Redis list for aggregation.
    """

    def __init__(self) -> None:
        self._drain: Any = None  # lazily created Drain3 TemplateMiner
        self._initialized = False
        # Latched when the drain3 package cannot be imported, so we do not retry the
        # import (and emit a warning) for every single log line.
        self._drain_unavailable = False

    def _get_drain(self) -> Any:
        """
        Return the shared TemplateMiner, creating it on first use.

        Returns None when drain3 is not installed or construction fails. A missing
        package is remembered; other construction errors are retried on the next call.
        """
        if self._drain is not None:
            return self._drain
        if self._drain_unavailable:
            return None

        try:
            from drain3 import TemplateMiner
            from drain3.template_miner_config import TemplateMinerConfig

            config = TemplateMinerConfig()
            config.drain_depth = DRAIN_DEPTH
            config.drain_sim_th = DRAIN_SIM_THRESHOLD
            config.drain_max_children = DRAIN_MAX_CHILDREN
            config.parametrize_numeric_tokens = True

            self._drain = TemplateMiner(config=config)
            self._initialized = True
            logger.info("drain3_initialized")
            return self._drain

        except ImportError:
            self._drain_unavailable = True
            logger.warning("drain3_not_available", reason="package not installed")
            return None
        except Exception as e:
            logger.warning("drain3_init_failed", error=str(e))
            return None

    @staticmethod
    def _parse_drain_result(result: Any) -> tuple[str, bool] | None:
        """
        Extract ``(template, is_new)`` from a TemplateMiner.add_log_message() result.

        drain3 returns a flat mapping (change_type, cluster_id, cluster_size,
        template_mined, cluster_count). A result carrying a ``cluster`` object with
        ``get_template()`` is also accepted, which is the shape the original code
        expected. Returns None when no template can be determined.
        """
        if not result:
            return None

        template = result.get("template_mined")
        if template is None:
            cluster = result.get("cluster")
            if cluster is None:
                return None
            template = cluster.get_template()
        if template is None:
            return None

        is_new = result.get("change_type", "none") in _NEW_PATTERN_CHANGE_TYPES
        return template, is_new

    async def process_log_line(
        self,
        log_line: str,
        source: str = "k8s_pod",
    ) -> LogAnomalyEvent | None:
        """
        Cluster one log line and report it when it forms a new pattern.

        Args:
            log_line: raw log line
            source: origin label stored with the cluster and the event

        Returns:
            LogAnomalyEvent for a new pattern; None for a known pattern, when the
            AIOPS_P4_LOG_ANOMALY flag is off, when Drain3 is unavailable, or on any
            internal error (errors are logged, never raised).
        """
        from src.core.feature_flags import aiops_flags

        if not aiops_flags.AIOPS_P4_LOG_ANOMALY:
            return None

        shadow_mode = aiops_flags.AIOPS_P4_SHADOW_MODE

        try:
            drain = self._get_drain()
            if drain is None:
                return None

            # Drain3 clustering is synchronous and runs inline.
            parsed = self._parse_drain_result(drain.add_log_message(log_line))
            if parsed is None:
                return None
            template, is_new = parsed

            # Use the template-derived id everywhere so Redis hit counters, the Redis
            # new-pattern list and the PG rows all refer to the same identifier.
            cluster_id = self._make_cluster_id(template)

            if not is_new:
                await self._update_cluster_hit(cluster_id)
                return None

            now_str = now_taipei().isoformat()
            log_cluster = LogCluster(
                cluster_id=cluster_id,
                template=template,
                size=1,
                first_seen_at=now_str,
                last_seen_at=now_str,
                is_new=True,
            )
            # Forward the sample line and source so the persisted row is not left blank.
            await self._save_new_cluster(log_cluster, sample_log=log_line, source=source)

            event = LogAnomalyEvent(
                cluster_id=cluster_id,
                template=template,
                sample_log=log_line[:500],  # cap stored sample length
                detected_at=now_str,
                shadow_mode=shadow_mode,
                source=source,
            )

            logger.info(
                "log_new_pattern_detected",
                cluster_id=cluster_id,
                template=template[:200],
                shadow_mode=shadow_mode,
                source=source,
            )
            return event

        except Exception as e:
            logger.warning("log_anomaly_process_failed", error=str(e))
            return None

    async def process_pod_logs(
        self,
        namespace: str = "awoooi-prod",
        tail_lines: int = 100,
    ) -> list[LogAnomalyEvent]:
        """
        Fetch recent pod logs and run every non-trivial line through process_log_line().

        Lines shorter than 10 characters after stripping are ignored.

        Returns:
            Events for the new patterns found; empty when the feature flag is off
            or fetching fails.
        """
        from src.core.feature_flags import aiops_flags

        if not aiops_flags.AIOPS_P4_LOG_ANOMALY:
            return []

        events: list[LogAnomalyEvent] = []
        try:
            for line in await self._fetch_pod_logs(namespace, tail_lines):
                if len(line.strip()) < 10:
                    continue
                event = await self.process_log_line(line)
                if event:
                    events.append(event)
        except Exception as e:
            logger.warning("pod_log_scan_failed", error=str(e))

        return events

    async def get_recent_new_patterns(
        self,
        limit: int = 10,
    ) -> list[dict[str, Any]]:
        """
        Return up to ``limit`` most recent new-pattern records from the Redis list.

        A non-positive ``limit`` yields an empty list (without this guard, limit=0
        would become LRANGE 0 -1 and return the entire list). Redis or decoding
        failures are logged and also yield an empty list.
        """
        if limit <= 0:
            return []
        try:
            from src.core.redis_client import get_redis

            r = get_redis()
            raw = await r.lrange(REDIS_KEY_NEW_PATTERNS, 0, limit - 1)
            return [json.loads(item) for item in raw]
        except Exception as e:
            logger.warning("log_new_patterns_read_failed", error=str(e))
            return []

    # ──────────────────────────────────────────────────────────────────────────
    # Private Helpers
    # ──────────────────────────────────────────────────────────────────────────

    def _make_cluster_id(self, template: str) -> str:
        """Derive a deterministic id from the template: first 8 hex chars of its MD5, upper-cased."""
        # MD5 is used purely as a fingerprint here, not for security.
        return hashlib.md5(template.encode()).hexdigest()[:8].upper()

    async def _save_new_cluster(
        self,
        cluster: LogCluster,
        sample_log: str = "",
        source: str = "k8s_pod",
    ) -> None:
        """
        Persist a newly discovered cluster.

        1. Upsert into PostgreSQL first (the durable record).
        2. Push onto the Redis new-pattern list (trimmed to MAX_NEW_PATTERNS, with TTL).

        A Redis failure is logged and does not propagate.
        """
        # 1. PostgreSQL (upsert guards against duplicates)
        await self._pg_upsert_cluster(cluster, sample_log, source)

        # 2. Redis list used as short-term working memory
        try:
            from src.core.redis_client import get_redis

            r = get_redis()
            payload = json.dumps({
                **cluster.to_dict(),
                "detected_at": cluster.first_seen_at,
            })
            await r.lpush(REDIS_KEY_NEW_PATTERNS, payload)
            await r.ltrim(REDIS_KEY_NEW_PATTERNS, 0, MAX_NEW_PATTERNS - 1)
            await r.expire(REDIS_KEY_NEW_PATTERNS, REDIS_TTL_CLUSTERS)
        except Exception as e:
            logger.warning("log_cluster_redis_push_failed", error=str(e))

    async def _pg_upsert_cluster(
        self,
        cluster: LogCluster,
        sample_log: str,
        source: str = "k8s_pod",
    ) -> None:
        """
        Insert a LogClusterRecord, or on a cluster_id conflict bump ``size`` by one
        and refresh ``last_seen_at``.

        ``sample_log`` is truncated to 500 characters; an empty value is stored as
        NULL. Failures are logged and swallowed.
        """
        try:
            from sqlalchemy.dialects.postgresql import insert as pg_insert
            from src.db.base import get_session_factory
            from src.db.models import LogClusterRecord

            factory = get_session_factory()
            async with factory() as session:
                stmt = pg_insert(LogClusterRecord).values(
                    cluster_id=cluster.cluster_id,
                    template=cluster.template,
                    size=cluster.size,
                    source=source,
                    sample_log=sample_log[:500] if sample_log else None,
                ).on_conflict_do_update(
                    index_elements=["cluster_id"],
                    set_={
                        "size": LogClusterRecord.size + 1,
                        "last_seen_at": now_taipei(),
                    },
                )
                await session.execute(stmt)
                await session.commit()
        except Exception as e:
            logger.warning("log_cluster_pg_upsert_failed", cluster_id=cluster.cluster_id, error=str(e))

    async def _update_cluster_hit(self, cluster_id: str) -> None:
        """
        Best-effort hit counter for a known cluster, kept in a Redis hash.

        Increments ``size``, refreshes ``last_seen_at`` and renews the key TTL so
        counters for patterns that stop appearing eventually expire.
        """
        try:
            from src.core.redis_client import get_redis

            r = get_redis()
            key = f"{REDIS_KEY_CLUSTERS}:{cluster_id}"
            await r.hincrby(key, "size", 1)
            await r.hset(key, "last_seen_at", now_taipei().isoformat())
            await r.expire(key, REDIS_TTL_CLUSTERS)
        except Exception as e:
            # Still best-effort: never raise, but leave a trace for debugging.
            logger.debug("log_cluster_hit_update_failed", cluster_id=cluster_id, error=str(e))

    async def _fetch_pod_logs(
        self,
        namespace: str,
        tail_lines: int,
    ) -> list[str]:
        """
        Fetch the last ``tail_lines`` log lines of ``deploy/awoooi-api`` by running
        the ``kubectl logs`` CLI in a worker thread (15 s timeout).

        Returns the stdout lines, or an empty list on a non-zero exit code or any
        error (a missing kubectl binary or a timeout included).
        """
        import asyncio
        import subprocess

        command = [
            "kubectl", "logs",
            "deploy/awoooi-api",
            "-n", namespace,
            f"--tail={tail_lines}",
            "--timestamps=false",
        ]

        try:
            # get_running_loop() is the supported way to reach the loop from a coroutine.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None,
                lambda: subprocess.run(
                    command,
                    capture_output=True,
                    text=True,
                    timeout=15,
                ),
            )
            if result.returncode == 0:
                return result.stdout.splitlines()
            logger.warning("kubectl_logs_failed", stderr=result.stderr[:200])
            return []
        except Exception as e:
            logger.warning("pod_log_fetch_failed", error=str(e))
            return []


# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────

_detector: LogAnomalyDetector | None = None


def get_log_anomaly_detector() -> LogAnomalyDetector:
    """Return the process-wide LogAnomalyDetector, creating it on first call."""
    global _detector
    if _detector is None:
        _detector = LogAnomalyDetector()
    return _detector