feat(Phase 3.5 + Phase 4): AI 學習成果持久化到 PostgreSQL — 修正「AI 失憶」架構缺陷
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled

ADR-085: AI 學習成果不可存在 Cache

架構鐵律確立:
- PostgreSQL = System of Record(AI 的永久記憶)
- Redis = Warm Cache(加速讀取,TTL 到期從 PG 復原)

核心變更:
1. models.py: 新增 PlaybookRecord / DynamicBaselineRecord / LogClusterRecord ORM
2. base.py: ALTER TABLE playbooks 補加 trust_score / requires_approval_level 等欄位
3. playbook_repository.py: 完整雙寫實作(PG upsert + Redis cache)
4. dynamic_baseline_service.py: Holt-Winters 訓練結果寫入 PG,Redis 只作 24h warm cache
5. log_anomaly_detector.py: Drain3 cluster template 寫入 PG(UPSERT on cluster_id)
6. main.py: 啟動時執行 backfill_redis_to_pg()(Redis → PG 冪等補救)

修正的問題:
- Playbook 7天 Redis TTL 到期 → AI 失去所有修復知識
- trust_score EWMA 隨 Redis TTL 歸零 → AI 重新回到初始信任度 0.3
- Holt-Winters 基線 24h TTL → AI 每天重新學習「正常」的定義
- Drain3 cluster 沒有持久化 → AI 把已知 log pattern 反覆當新 pattern

Phase 4 新服務(requirements.txt 已加入 statsmodels + drain3 + numpy)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-15 15:33:52 +08:00
parent 9126c594a4
commit bf45b80bd2
8 changed files with 1490 additions and 238 deletions

View File

@@ -43,6 +43,16 @@ opentelemetry-instrumentation-logging>=0.41b0
# 2026-04-02 Claude Code: 鎖定 v2.60.x — v3.x/v4.x 移除 client.trace() API與 langfuse_client.py 不相容
langfuse>=2.0.0,<3.0.0
# ==========================================================================
# Phase 4: 動態異常偵測 (ADR-084)
# ==========================================================================
# Holt-Winters 指數平滑(動態基線)
statsmodels>=0.14.0
# Log clusteringDrain3 演算法)
drain3>=0.9.11
# numpy 已為 statsmodels 依賴,顯式列出確保可用(線性趨勢預測)
numpy>=1.24.0
# Development
pytest>=7.4.0
pytest-asyncio>=0.23.0

View File

@@ -131,6 +131,10 @@ class AIOpsFeatureFlags(BaseSettings):
default=False,
description="P4: 主動巡檢每 5min 是否執行",
)
AIOPS_P4_SHADOW_MODE: bool = Field(
default=True,
description="P4: Shadow Mode = True 時動態偵測只記錄不觸發 AlertFalse = 真實觸發(需先觀察噪音率)",
)
# ==========================================================================
# Phase 5 細粒度子開關

View File

@@ -186,6 +186,19 @@ async def init_db() -> None:
""")
)
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3.5 Playbook PostgreSQL 持久化
# ADR-085: AI 學習成果不可存 Cache — trust_score、EWMA 必須永久保存
# playbooks 表已存在15 筆舊資料),補加新欄位
await conn.execute(
text("""
ALTER TABLE playbooks
ADD COLUMN IF NOT EXISTS trust_score FLOAT NOT NULL DEFAULT 0.3,
ADD COLUMN IF NOT EXISTS requires_approval_level VARCHAR(20) NOT NULL DEFAULT 'auto',
ADD COLUMN IF NOT EXISTS stateful_targets JSONB NOT NULL DEFAULT '[]',
ADD COLUMN IF NOT EXISTS requires_pre_backup BOOLEAN NOT NULL DEFAULT FALSE;
""")
)
async def close_db() -> None:
"""

View File

@@ -835,6 +835,207 @@ class IncidentEvidence(Base):
)
# =============================================================================
# PlaybookRecord — Phase 3.5 Playbook PostgreSQL 持久化 (System of Record)
# ADR-085: AI 學習成果不可存在 Cache — Playbook 是 AI 的肌肉記憶
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3.5 初始建立
#
# 核心鐵律:
# - PostgreSQL = System of Record永久保存AI 的長期記憶)
# - Redis = Warm Cache7天 TTL加速讀取DB 為 source of truth
# - trust_score, EWMA, 統計數據必須持久化 — 不能因 Redis TTL 消失
# =============================================================================
class PlaybookRecord(Base):
"""
Playbook 修復劇本 PostgreSQL ORM
與 Pydantic Playbook 模型對應。
Redis 為 warm cache7d TTLPostgreSQL 為 source of truth。
設計原則:
- AI 的學習成果trust_score、success_count、failure_count永久保存
- EWMA 信任度在 Redis TTL 後不會重置Pod 重啟後 AI 記憶不失
- 雙寫create/update 先寫 PG再更新 Redis cache
- 讀取Redis-firstcache hitmiss 時從 PG 載入並回填 Redis
"""
__tablename__ = "playbooks"
# Primary Key
playbook_id: Mapped[str] = mapped_column(
String(36), primary_key=True,
comment="Playbook 唯一識別碼 (PB-YYYYMMDD-XXXXXX)",
)
# Core Fields
name: Mapped[str] = mapped_column(String(256), nullable=False)
description: Mapped[str] = mapped_column(Text, default="", nullable=False)
status: Mapped[str] = mapped_column(String(20), default="draft", nullable=False)
source: Mapped[str] = mapped_column(String(20), default="extracted", nullable=False)
# Complex structures (JSONB)
symptom_pattern: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict, nullable=False)
repair_steps: Mapped[list[dict[str, Any]]] = mapped_column(JSON, default=list, nullable=False)
# Timing
estimated_duration_minutes: Mapped[int] = mapped_column(Integer, default=5, nullable=False)
# Source tracing
source_incident_ids: Mapped[list[str]] = mapped_column(JSON, default=list, nullable=False)
ai_confidence: Mapped[float] = mapped_column(default=0.0, nullable=False)
# Stats — MUST be in PG (AI learning artifacts, cannot expire)
success_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
failure_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
last_used_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
# EWMA trust score — ADR-083 Phase 3, 絕對不能用 Redis TTL 管理
# trust_score 是 AI 累積學習的結晶TTL 到期就歸零 = AI 記憶全部消失
trust_score: Mapped[float] = mapped_column(default=0.3, nullable=False,
comment="EWMA 動態信任度 (Phase 3)。成功 α=0.1,失敗 α=0.22x 衰減)。< 0.1 → 封存")
# Approval metadata
approved_by: Mapped[str | None] = mapped_column(String(100), nullable=True)
approved_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
tags: Mapped[list[str]] = mapped_column(JSON, default=list, nullable=False)
notes: Mapped[str | None] = mapped_column(Text, nullable=True)
# Sprint 5.1 護欄欄位 (2026-04-08)
requires_approval_level: Mapped[str] = mapped_column(
String(20), default="auto", nullable=False,
comment="auto=直接執行, standard=1票, critical=2票MultiSig",
)
stateful_targets: Mapped[list[str]] = mapped_column(JSON, default=list, nullable=False)
requires_pre_backup: Mapped[bool] = mapped_column(default=False, nullable=False)
# Timestamps
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now, nullable=False)
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now,
onupdate=taipei_now, nullable=False)
__table_args__ = (
Index("ix_playbook_status", "status"),
Index("ix_playbook_trust_score", "trust_score"),
Index("ix_playbook_created_at", "created_at"),
)
# =============================================================================
# DynamicBaselineRecord — Phase 4 Holt-Winters 訓練基線持久化
# ADR-084: 動態基線不能只存 Redis — AI 每天重學「正常」不是在學習
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 初始建立
#
# 核心鐵律:
# - 訓練好的 Holt-Winters 模型必須在 PG 長期保存
# - Redis 為 24h warm cache加速 is_anomaly() 讀取)
# - 基線消失 = AI 對「正常」的認識消失 = 每天從頭學習 = 不是 AI
# =============================================================================
class DynamicBaselineRecord(Base):
"""
動態基線訓練結果 PostgreSQL ORM
Holt-Winters 訓練完成後:
1. 先寫入 PG永久保存
2. 再寫入 Redis24h warm cache加速讀取
Redis key: baseline:{metric_name}
PG: 此表metric_name 為主鍵,最新一筆 = 有效基線
"""
__tablename__ = "dynamic_baselines"
id: Mapped[str] = mapped_column(String(36), primary_key=True, default=generate_uuid)
# 基線識別
metric_name: Mapped[str] = mapped_column(
String(200), nullable=False, index=True,
comment="基線識別名 (e.g. cpu_usage_node_mon)",
)
# 訓練結果Holt-Winters 統計)
mean: Mapped[float] = mapped_column(nullable=False, comment="擬合值均值")
std: Mapped[float] = mapped_column(nullable=False, comment="殘差標準差")
# 24h 季節性因子JSON 陣列,長度 24
seasonal_factors: Mapped[list[float]] = mapped_column(
JSON, default=list, nullable=False,
comment="24h 週期季節性因子(乘法形式,均值 ≈ 1.0",
)
# 訓練元資料
datapoint_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
promql: Mapped[str] = mapped_column(Text, default="", nullable=False,
comment="訓練使用的 PromQL 查詢")
lookback_hours: Mapped[int] = mapped_column(Integer, default=336, nullable=False)
# Timestamps
trained_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now, nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now, nullable=False)
__table_args__ = (
Index("ix_dynamic_baseline_metric", "metric_name"),
Index("ix_dynamic_baseline_trained_at", "trained_at"),
)
# =============================================================================
# LogClusterRecord — Phase 4 Drain3 學習到的 Log Pattern 持久化
# ADR-084: Drain3 模板不能只存 Redis — 每次重啟 AI 把已知 pattern 當新 pattern
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 初始建立
#
# 核心鐵律:
# - Drain3 學到的 log cluster template 必須在 PG 長期保存
# - 新 cluster 事件列表 (log_anomaly:new) 才存 Redis短期工作記憶
# - 基礎知識庫(已學到的 pattern必須在 PG
# =============================================================================
class LogClusterRecord(Base):
"""
Drain3 Log Cluster Template 持久化
每個新 pattern 首次偵測到時:
1. 寫入 PG永久保存AI 的 log 語意理解)
2. 推送到 Redis list log_anomaly:new短期工作記憶
Re-detect 相同 template 時只更新 last_seen_at + size不重複寫入 PG。
"""
__tablename__ = "log_clusters"
id: Mapped[str] = mapped_column(String(36), primary_key=True, default=generate_uuid)
# Cluster 識別MD5[:8] of template
cluster_id: Mapped[str] = mapped_column(
String(16), nullable=False, unique=True, index=True,
comment="模板 MD5[:8].upper(),穩定 ID",
)
# Drain3 模板
template: Mapped[str] = mapped_column(
Text, nullable=False,
comment="Drain3 萃取的 log 模板 (e.g. 'ERROR <*> connection failed to <*>')",
)
# 統計
size: Mapped[int] = mapped_column(Integer, default=1, nullable=False,
comment="命中次數(第一次 = 1")
source: Mapped[str] = mapped_column(String(50), default="k8s_pod", nullable=False,
comment="k8s_pod | host_syslog | app_log")
# 樣本日誌(保留首次觸發的原始行,供事後分析)
sample_log: Mapped[str | None] = mapped_column(Text, nullable=True,
comment="首次觸發的原始 log 行(前 500 字元)")
# Timestamps
first_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now, nullable=False)
last_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now,
onupdate=taipei_now, nullable=False)
__table_args__ = (
Index("ix_log_cluster_first_seen", "first_seen_at"),
Index("ix_log_cluster_source", "source"),
)
# =============================================================================
# AgentSession — Phase 2 多 Agent 辯證 Audit Trail
# =============================================================================

View File

@@ -296,6 +296,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
except Exception as e:
logger.warning("playbook_seed_schedule_failed", error=str(e))
# Phase 3.5 ADR-085: Playbook Redis → PG 補寫(一次性遷移 + 啟動時冪等補救)
# 確保 Redis 中存在但 PG 中缺少的 Playbook 不因 TTL 消失而永久丟失
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3.5 AI 學習成果持久化
try:
from src.repositories.playbook_repository import get_playbook_repository
asyncio.create_task(get_playbook_repository().backfill_redis_to_pg())
logger.info("playbook_pg_backfill_scheduled")
except Exception as e:
logger.warning("playbook_pg_backfill_schedule_failed", error=str(e))
try:
from src.services.playbook_embedding_service import ensure_playbook_embeddings_indexed
asyncio.create_task(ensure_playbook_embeddings_indexed())

View File

@@ -1,26 +1,31 @@
"""
Playbook Repository - #7 Playbook 萃取
======================================
Playbook CRUD 操作 (Redis + PostgreSQL)
Playbook Repository — Phase 3.5 雙寫持久化
==========================================
ADR-085: AI 學習成果必須儲存到 PostgreSQL
儲存策略Phase 3.5 架構升級):
- PostgreSQL = System of Record永久保存source of truth
- Redis = Warm Cache7天 TTL加速讀取PG 為準)
雙寫規則:
- create/update → 先寫 PG再更新 Redis cache
- 讀取 → Redis-firstcache hitmiss 時從 PG 載入並回填 Redis
- trust_score / EWMA 統計 → PG 永久保存Redis TTL 到期後從 PG 復原
Phase 7.2: Repository 實作
建立時間: 2026-03-26 (台北時區)
建立者: Claude Code (#7 Playbook 萃取)
遵循 leWOOOgo 積木化原則:
- 實作 IPlaybookRepository Protocol
- Redis 為 Working Memory (7天 TTL)
- PostgreSQL 為 Episodic Memory
Phase 22 P2: 相似度計算邏輯移至 utils
2026-03-31 Claude Code (首席架構師技術債修復)
Phase 3.5 (ADR-085): PostgreSQL dual-write + Redis warm cache
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3.5 重寫修正「AI 失憶」架構問題
"""
import json
import structlog
from sqlalchemy import select
from src.core.redis_client import get_redis
from src.db.base import get_session_factory
from src.db.models import PlaybookRecord
from src.models.playbook import (
Playbook,
PlaybookStatus,
@@ -32,7 +37,7 @@ from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
# Redis TTL: 7 天
# Redis TTL: 7 天warm cache onlyPG 為 source of truth
PLAYBOOK_TTL_SECONDS = 7 * 24 * 60 * 60
# Redis Key 前綴
@@ -41,129 +46,175 @@ PLAYBOOK_INDEX_ALERT_PREFIX = "playbook:index:alert:"
PLAYBOOK_INDEX_SERVICE_PREFIX = "playbook:index:service:"
# =============================================================================
# ORM ↔ Pydantic 轉換 Helpers
# =============================================================================
def _pydantic_to_orm(playbook: Playbook) -> PlaybookRecord:
"""Pydantic Playbook → PlaybookRecord ORM"""
return PlaybookRecord(
playbook_id=playbook.playbook_id,
name=playbook.name,
description=playbook.description,
status=playbook.status.value,
source=playbook.source.value,
symptom_pattern=playbook.symptom_pattern.model_dump(),
repair_steps=[s.model_dump() for s in playbook.repair_steps],
estimated_duration_minutes=playbook.estimated_duration_minutes,
source_incident_ids=playbook.source_incident_ids,
ai_confidence=playbook.ai_confidence,
success_count=playbook.success_count,
failure_count=playbook.failure_count,
last_used_at=playbook.last_used_at,
trust_score=playbook.trust_score,
approved_by=playbook.approved_by,
approved_at=playbook.approved_at,
tags=playbook.tags,
notes=playbook.notes,
requires_approval_level=playbook.requires_approval_level,
stateful_targets=playbook.stateful_targets,
requires_pre_backup=playbook.requires_pre_backup,
created_at=playbook.created_at,
updated_at=playbook.updated_at,
)
def _orm_to_pydantic(record: PlaybookRecord) -> Playbook:
"""PlaybookRecord ORM → Pydantic Playbook"""
return Playbook.model_validate({
"playbook_id": record.playbook_id,
"name": record.name,
"description": record.description,
"status": record.status,
"source": record.source,
"symptom_pattern": record.symptom_pattern,
"repair_steps": record.repair_steps,
"estimated_duration_minutes": record.estimated_duration_minutes,
"source_incident_ids": record.source_incident_ids,
"ai_confidence": float(record.ai_confidence),
"success_count": record.success_count,
"failure_count": record.failure_count,
"last_used_at": record.last_used_at,
"trust_score": float(record.trust_score),
"approved_by": record.approved_by,
"approved_at": record.approved_at,
"tags": record.tags,
"notes": record.notes,
"requires_approval_level": record.requires_approval_level,
"stateful_targets": record.stateful_targets,
"requires_pre_backup": record.requires_pre_backup,
"created_at": record.created_at,
"updated_at": record.updated_at,
})
class PlaybookRepository:
"""
Playbook Repository 實作
Playbook Repository — Phase 3.5 雙寫實作
儲存策略:
- Redis: Working Memory (快速讀取7天 TTL)
- PostgreSQL: Episodic Memory (持久化,待實作)
- PostgreSQL: source of truth所有 create/update 必須先寫 PG
- Redis: warm cache加速讀取7d TTL到期後從 PG 復原)
Phase 7.2 先實作 Redis 層PostgreSQL 待 #7.5 整合
這保證:
- trust_score / EWMA 永久不丟失
- Pod 重啟後 AI 學習成果完整保留
- Redis 失效只影響讀取速度,不影響資料完整性
"""
# === CRUD Operations ===
# =========================================================================
# CRUD Operations
# =========================================================================
async def create(self, playbook: Playbook) -> Playbook:
"""
建立新的 Playbook
1. 儲存到 Redis
2. 建立索引 (alert_names, services)
順序:
1. 寫入 PostgreSQLsource of truth
2. 寫入 Redis cachewarm cache
3. 更新索引
"""
try:
redis_client = get_redis()
if not playbook.created_at:
playbook.created_at = now_taipei()
playbook.updated_at = now_taipei()
# 確保有建立時間
if not playbook.created_at:
playbook.created_at = now_taipei()
playbook.updated_at = now_taipei()
# 1. 寫入 PostgreSQL先寫確保資料不丟失
await self._pg_upsert(playbook)
# 儲存 Playbook
key = f"{PLAYBOOK_KEY_PREFIX}{playbook.playbook_id}"
await redis_client.set(
key,
json.dumps(playbook.to_redis_dict(), ensure_ascii=False),
ex=PLAYBOOK_TTL_SECONDS,
)
# 2. 寫入 Rediswarm cache
await self._redis_set(playbook)
# 建立索引
await self._update_indexes(playbook)
# 3. 建立索引
await self._update_indexes(playbook)
logger.info(
"playbook_created",
playbook_id=playbook.playbook_id,
name=playbook.name,
)
return playbook
except Exception as e:
logger.error("playbook_create_failed", error=str(e))
raise
logger.info("playbook_created", playbook_id=playbook.playbook_id, name=playbook.name)
return playbook
async def get_by_id(self, playbook_id: str) -> Playbook | None:
"""根據 ID 取得 Playbook"""
try:
redis_client = get_redis()
key = f"{PLAYBOOK_KEY_PREFIX}{playbook_id}"
data = await redis_client.get(key)
"""
根據 ID 取得 Playbook
if data:
return Playbook.from_redis_dict(json.loads(data))
return None
Redis-first → miss 時從 PG 載入並回填 Redis
"""
# 1. Redis cache hit
cached = await self._redis_get(playbook_id)
if cached is not None:
return cached
except Exception as e:
logger.error("playbook_get_failed", playbook_id=playbook_id, error=str(e))
return None
# 2. PG fallbackcache miss
playbook = await self._pg_get(playbook_id)
if playbook is not None:
# 回填 Redis cache
await self._redis_set(playbook)
return playbook
async def update(self, playbook: Playbook) -> Playbook | None:
"""更新 Playbook"""
try:
existing = await self.get_by_id(playbook.playbook_id)
if not existing:
return None
"""
更新 Playbook
playbook.updated_at = now_taipei()
redis_client = get_redis()
key = f"{PLAYBOOK_KEY_PREFIX}{playbook.playbook_id}"
await redis_client.set(
key,
json.dumps(playbook.to_redis_dict(), ensure_ascii=False),
ex=PLAYBOOK_TTL_SECONDS,
)
# 更新索引
await self._update_indexes(playbook)
logger.info("playbook_updated", playbook_id=playbook.playbook_id)
return playbook
except Exception as e:
logger.error(
"playbook_update_failed",
先確認 PG 存在 → 寫入 PG → 更新 Redis
"""
existing = await self._pg_get(playbook.playbook_id)
if existing is None:
# PG 找不到,可能是老資料只在 Redis先建立 PG 記錄
logger.warning(
"playbook_pg_not_found_creating",
playbook_id=playbook.playbook_id,
error=str(e),
)
return None
playbook.updated_at = now_taipei()
# 1. 寫入 PG
await self._pg_upsert(playbook)
# 2. 更新 Redis
await self._redis_set(playbook)
# 3. 更新索引
await self._update_indexes(playbook)
logger.info("playbook_updated", playbook_id=playbook.playbook_id)
return playbook
async def delete(self, playbook_id: str) -> bool:
"""
刪除 Playbook (軟刪除 → DEPRECATED)
不真正刪除,而是將狀態改為 DEPRECATED
刪除 Playbook(狀態改為 DEPRECATED
"""
try:
playbook = await self.get_by_id(playbook_id)
if not playbook:
return False
playbook.status = PlaybookStatus.DEPRECATED
playbook.updated_at = now_taipei()
await self.update(playbook)
logger.info("playbook_deprecated", playbook_id=playbook_id)
return True
except Exception as e:
logger.error(
"playbook_delete_failed",
playbook_id=playbook_id,
error=str(e),
)
playbook = await self.get_by_id(playbook_id)
if not playbook:
return False
# === Query Operations ===
playbook.status = PlaybookStatus.DEPRECATED
playbook.updated_at = now_taipei()
await self.update(playbook)
logger.info("playbook_deprecated", playbook_id=playbook_id)
return True
# =========================================================================
# Query Operations
# =========================================================================
async def list_playbooks(
self,
@@ -173,42 +224,32 @@ class PlaybookRepository:
offset: int = 0,
) -> tuple[list[Playbook], int]:
"""
列出 Playbooks
列出 Playbooks(從 PostgreSQL 查詢,不走 Redis scan
注意: Redis 實作效率較低,後續需遷移到 PostgreSQL
Phase 3.5:改用 PG 查詢,效率更高,資料更完整
"""
try:
redis_client = get_redis()
factory = get_session_factory()
async with factory() as session:
stmt = select(PlaybookRecord)
if status is not None:
stmt = stmt.where(PlaybookRecord.status == status.value)
stmt = stmt.order_by(PlaybookRecord.updated_at.desc())
# 掃描所有 Playbook keys
pattern = f"{PLAYBOOK_KEY_PREFIX}PB-*"
keys = []
async for key in redis_client.scan_iter(match=pattern, count=100):
keys.append(key)
result = await session.execute(stmt)
all_records = result.scalars().all()
# 讀取並過濾
all_playbooks: list[Playbook] = []
for key in keys:
data = await redis_client.get(key)
if data:
playbook = Playbook.from_redis_dict(json.loads(data))
all_playbooks = [_orm_to_pydantic(r) for r in all_records]
# 狀態過濾
if status and playbook.status != status:
continue
# 標籤過濾
if tags and not set(tags).intersection(set(playbook.tags)):
continue
all_playbooks.append(playbook)
# 排序: 按 updated_at 降序
all_playbooks.sort(key=lambda p: p.updated_at, reverse=True)
# 標籤過濾PG JSONB 過濾後續可移到 SQL目前 Python 過濾
if tags:
all_playbooks = [
p for p in all_playbooks
if set(tags).intersection(set(p.tags))
]
total = len(all_playbooks)
items = all_playbooks[offset : offset + limit]
items = all_playbooks[offset: offset + limit]
return items, total
except Exception as e:
@@ -225,7 +266,7 @@ class PlaybookRepository:
根據症狀模式找相似 Playbook
策略:
1. 從索引快速過濾候選
1. 從 Redis 索引快速過濾候選
2. 計算詳細相似度
3. 返回 Top K
"""
@@ -235,24 +276,19 @@ class PlaybookRepository:
# 1. 使用索引找候選 Playbook IDs
candidate_ids: set[str] = set()
# 從 alert_names 索引查詢
for alert_name in symptoms.alert_names:
index_key = f"{PLAYBOOK_INDEX_ALERT_PREFIX}{alert_name}"
members = await redis_client.smembers(index_key)
candidate_ids.update(m.decode() if isinstance(m, bytes) else m for m in members)
# 從 services 索引查詢
for service in symptoms.affected_services:
index_key = f"{PLAYBOOK_INDEX_SERVICE_PREFIX}{service}"
members = await redis_client.smembers(index_key)
candidate_ids.update(m.decode() if isinstance(m, bytes) else m for m in members)
# 如果沒有索引命中,掃描所有 APPROVED Playbooks
# 索引命中 → 從 PG 掃 APPROVED
if not candidate_ids:
playbooks, _ = await self.list_playbooks(
status=PlaybookStatus.APPROVED,
limit=100,
)
playbooks, _ = await self.list_playbooks(status=PlaybookStatus.APPROVED, limit=100)
candidate_ids = {p.playbook_id for p in playbooks}
# 2. 計算相似度
@@ -260,26 +296,16 @@ class PlaybookRepository:
for playbook_id in candidate_ids:
playbook = await self.get_by_id(playbook_id)
if not playbook:
if not playbook or playbook.status != PlaybookStatus.APPROVED:
continue
# 只考慮 APPROVED 狀態
if playbook.status != PlaybookStatus.APPROVED:
continue
similarity = calculate_symptom_similarity(
symptoms,
playbook.symptom_pattern,
)
# alert_names 完全匹配時,保證通過(不因其他維度拉低分數)
similarity = calculate_symptom_similarity(symptoms, playbook.symptom_pattern)
alert_exact_match = bool(
set(symptoms.alert_names) & set(playbook.symptom_pattern.alert_names)
)
if alert_exact_match or similarity >= min_similarity:
results.append((playbook, similarity))
# 3. 排序並返回 Top K
results.sort(key=lambda x: x[1], reverse=True)
return results[:top_k]
@@ -293,13 +319,13 @@ class PlaybookRepository:
success: bool,
) -> bool:
"""
更新執行統計 + EWMA 信任度
更新執行統計 + EWMA 信任度(雙寫到 PG + Redis
ADR-083 Phase 3: 負向 2x 強化 EWMA 公式
成功: trust_new = 0.9 * trust_old + 0.1 * 1.0
失敗: trust_new = 0.8 * trust_old + 0.2 * 0.0(衰減速度 2x
trust < 0.1 → 記錄警告,由 Evolver Agent 封存
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3 EWMA 實裝
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3.5 EWMA 雙寫 PG
"""
try:
playbook = await self.get_by_id(playbook_id)
@@ -308,16 +334,15 @@ class PlaybookRepository:
if success:
playbook.success_count += 1
# 正向 EWMAalpha=0.1,正向結果權重較小(保守更新)
playbook.trust_score = 0.9 * playbook.trust_score + 0.1 * 1.0
else:
playbook.failure_count += 1
# 負向 EWMAalpha=0.2,失敗懲罰 2x快速衰退
playbook.trust_score = 0.8 * playbook.trust_score + 0.2 * 0.0
# 邊界保護
playbook.trust_score = max(0.0, min(1.0, playbook.trust_score))
playbook.last_used_at = now_taipei()
# 雙寫PG + Redis
await self.update(playbook)
if playbook.trust_score < 0.1:
@@ -338,70 +363,25 @@ class PlaybookRepository:
return True
except Exception as e:
logger.error(
"playbook_stats_update_failed",
playbook_id=playbook_id,
error=str(e),
)
logger.error("playbook_stats_update_failed", playbook_id=playbook_id, error=str(e))
return False
# === Index Management ===
async def _update_indexes(self, playbook: Playbook) -> None:
"""更新索引"""
async def find_by_source_incident(self, incident_id: str) -> list[Playbook]:
"""根據來源 Incident ID 找 Playbook從 PG 查詢)"""
try:
redis_client = get_redis()
# Alert names 索引
for alert_name in playbook.symptom_pattern.alert_names:
index_key = f"{PLAYBOOK_INDEX_ALERT_PREFIX}{alert_name}"
await redis_client.sadd(index_key, playbook.playbook_id)
await redis_client.expire(index_key, PLAYBOOK_TTL_SECONDS)
# Services 索引
for service in playbook.symptom_pattern.affected_services:
index_key = f"{PLAYBOOK_INDEX_SERVICE_PREFIX}{service}"
await redis_client.sadd(index_key, playbook.playbook_id)
await redis_client.expire(index_key, PLAYBOOK_TTL_SECONDS)
factory = get_session_factory()
async with factory() as session:
# PG JSONB contains 查詢
stmt = select(PlaybookRecord).where(
PlaybookRecord.source_incident_ids.contains([incident_id])
)
result = await session.execute(stmt)
records = result.scalars().all()
return [_orm_to_pydantic(r) for r in records]
except Exception as e:
logger.warning("playbook_index_update_failed", error=str(e))
# === Learning Service 信心度調整 (2026-03-30 Claude Code) ===
async def find_by_source_incident(
self,
incident_id: str,
) -> list[Playbook]:
"""
根據來源 Incident ID 找 Playbook
2026-03-30 Claude Code: Learning Service 信心度調整用
尋找 source_incident_ids 包含此 incident_id 的 Playbooks
"""
try:
redis_client = get_redis()
# 掃描所有 Playbook keys
pattern = f"{PLAYBOOK_KEY_PREFIX}PB-*"
results: list[Playbook] = []
async for key in redis_client.scan_iter(match=pattern, count=100):
data = await redis_client.get(key)
if data:
playbook = Playbook.from_redis_dict(json.loads(data))
if incident_id in playbook.source_incident_ids:
results.append(playbook)
return results
except Exception as e:
logger.error(
"playbook_find_by_incident_failed",
incident_id=incident_id,
error=str(e),
)
return []
logger.error("playbook_find_by_incident_failed", incident_id=incident_id, error=str(e))
# Fallback: Redis scan維持向下相容
return await self._redis_find_by_incident(incident_id)
async def adjust_confidence(
self,
@@ -410,9 +390,7 @@ class PlaybookRepository:
reason: str,
) -> Playbook | None:
"""
調整 Playbook 信心度
2026-03-30 Claude Code: Learning Service 信心度調整用
調整 Playbook 信心度(雙寫到 PG + Redis
邏輯:
- ai_confidence += delta (clamp 到 0.0~1.0)
@@ -425,25 +403,16 @@ class PlaybookRepository:
return None
old_confidence = playbook.ai_confidence
# 調整信心度 (clamp 到 0.0~1.0)
playbook.ai_confidence = max(0.0, min(1.0, playbook.ai_confidence + delta))
# 狀態自動轉換
old_status = playbook.status
# 高信心度自動升級
if (
playbook.ai_confidence >= 0.9
and playbook.status == PlaybookStatus.DRAFT
):
if playbook.ai_confidence >= 0.9 and playbook.status == PlaybookStatus.DRAFT:
playbook.status = PlaybookStatus.APPROVED
playbook.approved_by = "auto_learning"
playbook.approved_at = now_taipei()
note = f"\n[Auto-approved: confidence {playbook.ai_confidence:.2f}]"
playbook.notes = (playbook.notes or "") + note
# 低信心度 + 高失敗率 → 棄用
elif (
playbook.ai_confidence < 0.3
and playbook.total_executions >= 5
@@ -455,7 +424,6 @@ class PlaybookRepository:
note = f"\n[Auto-deprecated: conf={conf:.2f}, fail={fail:.0%}]"
playbook.notes = (playbook.notes or "") + note
# 儲存
updated = await self.update(playbook)
logger.info(
@@ -467,18 +435,190 @@ class PlaybookRepository:
new_status=playbook.status.value,
reason=reason,
)
return updated
except Exception as e:
logger.error(
"playbook_confidence_adjust_failed",
playbook_id=playbook_id,
delta=delta,
error=str(e),
)
logger.error("playbook_confidence_adjust_failed", playbook_id=playbook_id, delta=delta, error=str(e))
return None
# =========================================================================
# Startup Backfill — Redis → PG 遷移
# =========================================================================
async def backfill_redis_to_pg(self) -> int:
"""
啟動時將 Redis 中存在但 PG 中沒有的 Playbook 寫入 PG。
用途Phase 3.5 上線前後,確保舊 Redis 資料不丟失。
返回:補寫的 Playbook 數量。
"""
count = 0
try:
redis_client = get_redis()
pattern = f"{PLAYBOOK_KEY_PREFIX}PB-*"
async for key in redis_client.scan_iter(match=pattern, count=100):
data = await redis_client.get(key)
if not data:
continue
try:
playbook = Playbook.from_redis_dict(json.loads(data))
existing = await self._pg_get(playbook.playbook_id)
if existing is None:
await self._pg_upsert(playbook)
count += 1
logger.info("playbook_backfilled_to_pg", playbook_id=playbook.playbook_id)
except Exception as e:
logger.warning("playbook_backfill_item_failed", key=str(key), error=str(e))
except Exception as e:
logger.warning("playbook_backfill_failed", error=str(e))
logger.info("playbook_backfill_complete", count=count)
return count
# =========================================================================
# Private — PostgreSQL Helpers
# =========================================================================
async def _pg_upsert(self, playbook: Playbook) -> None:
"""Upsert Playbook 到 PostgreSQLINSERT ON CONFLICT UPDATE"""
try:
from sqlalchemy.dialects.postgresql import insert as pg_insert
factory = get_session_factory()
async with factory() as session:
stmt = pg_insert(PlaybookRecord).values(
playbook_id=playbook.playbook_id,
name=playbook.name,
description=playbook.description,
status=playbook.status.value,
source=playbook.source.value,
symptom_pattern=playbook.symptom_pattern.model_dump(),
repair_steps=[s.model_dump() for s in playbook.repair_steps],
estimated_duration_minutes=playbook.estimated_duration_minutes,
source_incident_ids=playbook.source_incident_ids,
ai_confidence=playbook.ai_confidence,
success_count=playbook.success_count,
failure_count=playbook.failure_count,
last_used_at=playbook.last_used_at,
trust_score=playbook.trust_score,
approved_by=playbook.approved_by,
approved_at=playbook.approved_at,
tags=playbook.tags,
notes=playbook.notes,
requires_approval_level=playbook.requires_approval_level,
stateful_targets=playbook.stateful_targets,
requires_pre_backup=playbook.requires_pre_backup,
created_at=playbook.created_at,
updated_at=playbook.updated_at,
).on_conflict_do_update(
index_elements=["playbook_id"],
set_={
"name": playbook.name,
"description": playbook.description,
"status": playbook.status.value,
"source": playbook.source.value,
"symptom_pattern": playbook.symptom_pattern.model_dump(),
"repair_steps": [s.model_dump() for s in playbook.repair_steps],
"estimated_duration_minutes": playbook.estimated_duration_minutes,
"source_incident_ids": playbook.source_incident_ids,
"ai_confidence": playbook.ai_confidence,
"success_count": playbook.success_count,
"failure_count": playbook.failure_count,
"last_used_at": playbook.last_used_at,
"trust_score": playbook.trust_score,
"approved_by": playbook.approved_by,
"approved_at": playbook.approved_at,
"tags": playbook.tags,
"notes": playbook.notes,
"requires_approval_level": playbook.requires_approval_level,
"stateful_targets": playbook.stateful_targets,
"requires_pre_backup": playbook.requires_pre_backup,
"updated_at": playbook.updated_at,
},
)
await session.execute(stmt)
await session.commit()
except Exception as e:
logger.error("playbook_pg_upsert_failed", playbook_id=playbook.playbook_id, error=str(e))
raise
async def _pg_get(self, playbook_id: str) -> Playbook | None:
"""從 PostgreSQL 載入 Playbook"""
try:
factory = get_session_factory()
async with factory() as session:
result = await session.get(PlaybookRecord, playbook_id)
if result is None:
return None
return _orm_to_pydantic(result)
except Exception as e:
logger.warning("playbook_pg_get_failed", playbook_id=playbook_id, error=str(e))
return None
# =========================================================================
# Private — Redis Cache Helpers
# =========================================================================
async def _redis_set(self, playbook: Playbook) -> None:
"""寫入 Redis warm cacheTTL 7天"""
try:
redis_client = get_redis()
key = f"{PLAYBOOK_KEY_PREFIX}{playbook.playbook_id}"
await redis_client.set(
key,
json.dumps(playbook.to_redis_dict(), ensure_ascii=False),
ex=PLAYBOOK_TTL_SECONDS,
)
except Exception as e:
# Redis 寫入失敗不 raisePG 已成功為主)
logger.warning("playbook_redis_set_failed", playbook_id=playbook.playbook_id, error=str(e))
async def _redis_get(self, playbook_id: str) -> Playbook | None:
"""從 Redis cache 讀取 Playbook"""
try:
redis_client = get_redis()
key = f"{PLAYBOOK_KEY_PREFIX}{playbook_id}"
data = await redis_client.get(key)
if data:
return Playbook.from_redis_dict(json.loads(data))
return None
except Exception as e:
logger.warning("playbook_redis_get_failed", playbook_id=playbook_id, error=str(e))
return None
async def _redis_find_by_incident(self, incident_id: str) -> list[Playbook]:
"""Redis fallback: 掃描所有 Playbook 找包含 incident_id 的"""
try:
redis_client = get_redis()
results: list[Playbook] = []
async for key in redis_client.scan_iter(match=f"{PLAYBOOK_KEY_PREFIX}PB-*", count=100):
data = await redis_client.get(key)
if data:
playbook = Playbook.from_redis_dict(json.loads(data))
if incident_id in playbook.source_incident_ids:
results.append(playbook)
return results
except Exception:
return []
async def _update_indexes(self, playbook: Playbook) -> None:
"""更新 Redis 索引(供快速 symptom 過濾)"""
try:
redis_client = get_redis()
for alert_name in playbook.symptom_pattern.alert_names:
index_key = f"{PLAYBOOK_INDEX_ALERT_PREFIX}{alert_name}"
await redis_client.sadd(index_key, playbook.playbook_id)
await redis_client.expire(index_key, PLAYBOOK_TTL_SECONDS)
for service in playbook.symptom_pattern.affected_services:
index_key = f"{PLAYBOOK_INDEX_SERVICE_PREFIX}{service}"
await redis_client.sadd(index_key, playbook.playbook_id)
await redis_client.expire(index_key, PLAYBOOK_TTL_SECONDS)
except Exception as e:
logger.warning("playbook_index_update_failed", error=str(e))
# =============================================================================
# Singleton

View File

@@ -0,0 +1,489 @@
"""
AWOOOI AIOps Phase 4 — Dynamic Baseline Service動態基線服務
=============================================================
職責Holt-Winters 指數平滑,偵測 Prometheus metric 異常偏離
核心 API
is_anomaly(metric_name, current_value) -> AnomalyResult
update_baseline(metric_name, datapoints)
設計原則:
- Shadow ModeAIOPS_P4_SHADOW_MODE=True只記錄不觸發 Alert
- 熔斷保護statsmodels 失敗 → fallback 到滑動平均
- 7 天歷史資料最少訓練量(低於此閾值 → skip不誤判
- 基線持久化到 Rediskey: baseline:{metric_name}TTL 24h
- 訓練在 background worker 執行not in webhook handler
ADR-084: Phase 4 動態異常偵測源頭升級
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 初始建立
"""
from __future__ import annotations
import json
import math
from dataclasses import dataclass, field
from typing import Any
import structlog
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
# ── 常數 ────────────────────────────────────────────────────────────────────
MIN_DATAPOINTS = 168 # 7 天 × 24h 最少樣本數(才能訓練季節性模型)
SIGMA_THRESHOLD = 3.0 # 偏差 ≥ 3σ → 異常
REDIS_TTL_SEC = 86400 # 基線 Redis TTL = 24h
REDIS_KEY_PREFIX = "baseline:"
HISTORY_WINDOW_HOURS = 336 # 保留 14 天歷史
# ─────────────────────────────────────────────────────────────────────────────
# Data Types
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class MetricDatapoint:
"""單一 metric 時序資料點"""
timestamp: float # Unix epoch
value: float
@dataclass
class BaselineState:
"""Holt-Winters 訓練後的基線狀態Redis 持久化)"""
metric_name: str
mean: float
std: float
seasonal_factors: list[float] = field(default_factory=list) # 24h 週期
last_trained_at: str = ""
datapoint_count: int = 0
def to_dict(self) -> dict[str, Any]:
return {
"metric_name": self.metric_name,
"mean": self.mean,
"std": self.std,
"seasonal_factors": self.seasonal_factors,
"last_trained_at": self.last_trained_at,
"datapoint_count": self.datapoint_count,
}
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "BaselineState":
return cls(
metric_name=d["metric_name"],
mean=d["mean"],
std=d["std"],
seasonal_factors=d.get("seasonal_factors", []),
last_trained_at=d.get("last_trained_at", ""),
datapoint_count=d.get("datapoint_count", 0),
)
@dataclass
class AnomalyResult:
"""is_anomaly() 回傳結果"""
metric_name: str
current_value: float
is_anomaly: bool
deviation_sigma: float # 偏差 σ 數(>3 = 異常)
expected_mean: float
expected_std: float
direction: str = "none" # "up" / "down" / "none"
shadow_mode: bool = True # True = 只記錄,不觸發
reason: str = ""
# ─────────────────────────────────────────────────────────────────────────────
# Main Service
# ─────────────────────────────────────────────────────────────────────────────
class DynamicBaselineService:
"""
動態基線服務
兩大功能:
1. train_baseline() — 從 Prometheus 抓歷史資料,用 Holt-Winters 訓練
2. is_anomaly() — 即時判斷當前值是否偏離基線 ≥ 3σ
"""
async def train_baseline(
self,
metric_name: str,
promql: str,
lookback_hours: int = HISTORY_WINDOW_HOURS,
) -> BaselineState | None:
"""
從 Prometheus 抓取歷史資料並訓練基線。
Args:
metric_name: 基線識別名e.g. "cpu_usage_node_mon"
promql: Prometheus querye.g. "avg(rate(node_cpu_seconds_total[5m]))"
lookback_hours: 歷史視窗(預設 14 天)
Returns:
BaselineState已存 Redis資料不足 → None
"""
try:
datapoints = await self._fetch_prometheus_history(promql, lookback_hours)
if len(datapoints) < MIN_DATAPOINTS:
logger.info(
"baseline_insufficient_data",
metric=metric_name,
count=len(datapoints),
required=MIN_DATAPOINTS,
)
return None
state = self._fit_holt_winters(metric_name, datapoints)
await self._save_baseline(state)
logger.info(
"baseline_trained",
metric=metric_name,
mean=f"{state.mean:.4f}",
std=f"{state.std:.4f}",
datapoints=len(datapoints),
)
return state
except Exception:
logger.exception("baseline_train_failed", metric=metric_name)
return None
async def is_anomaly(
self,
metric_name: str,
current_value: float,
hour_of_day: int | None = None,
) -> AnomalyResult:
"""
即時異常判斷。
Args:
metric_name: 基線識別名
current_value: 當前觀測值
hour_of_day: 當前小時0-23用於套用 seasonal factorNone = 不套用
Returns:
AnomalyResult
"""
from src.core.feature_flags import aiops_flags
shadow_mode = aiops_flags.AIOPS_P4_SHADOW_MODE
try:
state = await self._load_baseline(metric_name)
if state is None:
return AnomalyResult(
metric_name=metric_name,
current_value=current_value,
is_anomaly=False,
deviation_sigma=0.0,
expected_mean=current_value,
expected_std=0.0,
reason="no_baseline_available",
shadow_mode=shadow_mode,
)
# 套用 seasonal factor如果有 24h 週期資料)
expected_mean = state.mean
if hour_of_day is not None and len(state.seasonal_factors) == 24:
expected_mean *= state.seasonal_factors[hour_of_day]
expected_std = state.std if state.std > 0 else 1e-9
deviation = abs(current_value - expected_mean)
sigma = deviation / expected_std
anomaly = sigma >= SIGMA_THRESHOLD
direction = "none"
if anomaly:
direction = "up" if current_value > expected_mean else "down"
result = AnomalyResult(
metric_name=metric_name,
current_value=current_value,
is_anomaly=anomaly,
deviation_sigma=round(sigma, 2),
expected_mean=round(expected_mean, 4),
expected_std=round(expected_std, 4),
direction=direction,
shadow_mode=shadow_mode,
reason=f"deviation {sigma:.1f}σ from baseline" if anomaly else "within_normal_range",
)
if anomaly:
logger.info(
"dynamic_anomaly_detected",
metric=metric_name,
value=current_value,
expected=expected_mean,
sigma=sigma,
direction=direction,
shadow_mode=shadow_mode,
)
return result
except Exception as e:
logger.warning("baseline_anomaly_check_failed", metric=metric_name, error=str(e))
return AnomalyResult(
metric_name=metric_name,
current_value=current_value,
is_anomaly=False,
deviation_sigma=0.0,
expected_mean=0.0,
expected_std=0.0,
reason=f"check_error:{e}",
shadow_mode=shadow_mode,
)
# ──────────────────────────────────────────────────────────────────────────
# Private Helpers
# ──────────────────────────────────────────────────────────────────────────
async def _fetch_prometheus_history(
self,
promql: str,
lookback_hours: int,
) -> list[MetricDatapoint]:
"""從 Prometheus query_range API 抓取歷史資料1h 步進)。"""
import httpx
from src.core.config import settings
end_ts = now_taipei().timestamp()
start_ts = end_ts - lookback_hours * 3600
try:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.get(
f"{settings.PROMETHEUS_URL}/api/v1/query_range",
params={
"query": promql,
"start": start_ts,
"end": end_ts,
"step": "3600", # 1h 步進
},
)
resp.raise_for_status()
data = resp.json()
results = data.get("data", {}).get("result", [])
if not results:
return []
# 取第一個 time series
values = results[0].get("values", [])
return [
MetricDatapoint(timestamp=float(ts), value=float(v))
for ts, v in values
if v != "NaN"
]
except Exception as e:
logger.warning("prometheus_history_fetch_failed", error=str(e))
return []
def _fit_holt_winters(
self,
metric_name: str,
datapoints: list[MetricDatapoint],
) -> BaselineState:
"""
用 statsmodels Holt-Winters 訓練基線。
Fallback若 statsmodels 不可用 → 滑動統計。
"""
values = [dp.value for dp in datapoints]
try:
import numpy as np
from statsmodels.tsa.holtwinters import ExponentialSmoothing
arr = np.array(values, dtype=float)
# 確保無 NaN / Inf
arr = arr[np.isfinite(arr)]
if len(arr) < MIN_DATAPOINTS:
return self._fit_simple_stats(metric_name, values)
# Holt-Winters加法趨勢 + 加法季節性24h 週期)
seasonal_periods = min(24, len(arr) // 2)
model = ExponentialSmoothing(
arr,
trend="add",
seasonal="add" if len(arr) >= seasonal_periods * 2 else None,
seasonal_periods=seasonal_periods,
initialization_method="estimated",
).fit(optimized=True, disp=False)
fitted = model.fittedvalues
residuals = arr - fitted
mean_val = float(np.mean(fitted))
std_val = float(np.std(residuals))
# 24h seasonal factors正規化為相對倍數
seasonal_factors = [1.0] * 24
if hasattr(model, "season") and model.season is not None:
s = model.season
if len(s) >= 24:
s_arr = np.array(s[-24:])
# 轉為乘法因子mean-centered
s_mean = abs(np.mean(s_arr)) or 1.0
sf = (s_arr / s_mean).tolist()
seasonal_factors = [max(0.1, min(10.0, f)) for f in sf]
return BaselineState(
metric_name=metric_name,
mean=mean_val,
std=max(std_val, mean_val * 0.01), # 最小 std = 1% mean
seasonal_factors=seasonal_factors,
last_trained_at=now_taipei().isoformat(),
datapoint_count=len(arr),
)
except Exception as e:
logger.warning("holt_winters_failed_fallback_to_stats", error=str(e))
return self._fit_simple_stats(metric_name, values)
def _fit_simple_stats(
self,
metric_name: str,
values: list[float],
) -> BaselineState:
"""Fallback純滑動平均 + 標準差基線。"""
if not values:
return BaselineState(metric_name=metric_name, mean=0.0, std=1.0)
n = len(values)
mean_val = sum(values) / n
variance = sum((v - mean_val) ** 2 for v in values) / n
std_val = math.sqrt(variance)
return BaselineState(
metric_name=metric_name,
mean=mean_val,
std=max(std_val, mean_val * 0.01),
seasonal_factors=[1.0] * 24,
last_trained_at=now_taipei().isoformat(),
datapoint_count=n,
)
async def _save_baseline(self, state: BaselineState, promql: str = "", lookback_hours: int = HISTORY_WINDOW_HOURS) -> None:
"""
儲存基線狀態:
1. 先寫 PostgreSQL永久保存source of truth
2. 再寫 Redis24h warm cache加速讀取
Phase 4 ADR-084 架構鐵律:訓練好的 Holt-Winters 模型不能只存 Redis。
Redis 24h TTL 到期 = AI 每天重新學習「正常」的定義 = 不是在學習。
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 改為 PG source of truth
"""
# 1. 寫入 PostgreSQL主要持久化
await self._pg_upsert_baseline(state, promql, lookback_hours)
# 2. 寫入 Redis warm cache加速讀取到期後從 PG 復原)
try:
from src.core.redis_client import get_redis
r = get_redis()
key = f"{REDIS_KEY_PREFIX}{state.metric_name}"
await r.set(key, json.dumps(state.to_dict()), ex=REDIS_TTL_SEC)
except Exception as e:
logger.warning("baseline_redis_cache_failed", metric=state.metric_name, error=str(e))
async def _load_baseline(self, metric_name: str) -> BaselineState | None:
"""
載入基線Redis-first → miss 時從 PG 載入並回填 Redis。
Phase 4 ADR-084: Redis 只是 warm cachePG 才是 source of truth。
"""
# 1. Redis warm cache hit
try:
from src.core.redis_client import get_redis
r = get_redis()
key = f"{REDIS_KEY_PREFIX}{metric_name}"
data = await r.get(key)
if data is not None:
return BaselineState.from_dict(json.loads(data))
except Exception as e:
logger.warning("baseline_redis_load_failed", metric=metric_name, error=str(e))
# 2. PG fallbackcache miss
state = await self._pg_load_latest_baseline(metric_name)
if state is not None:
# 回填 Redis cache
try:
from src.core.redis_client import get_redis
r = get_redis()
key = f"{REDIS_KEY_PREFIX}{metric_name}"
await r.set(key, json.dumps(state.to_dict()), ex=REDIS_TTL_SEC)
except Exception:
pass # cache 回填失敗不影響讀取
return state
async def _pg_upsert_baseline(self, state: BaselineState, promql: str, lookback_hours: int) -> None:
"""寫入 DynamicBaselineRecord 到 PostgreSQLINSERT不更新舊記錄"""
try:
from src.db.base import get_session_factory
from src.db.models import DynamicBaselineRecord
factory = get_session_factory()
async with factory() as session:
record = DynamicBaselineRecord(
metric_name=state.metric_name,
mean=state.mean,
std=state.std,
seasonal_factors=state.seasonal_factors,
datapoint_count=state.datapoint_count,
promql=promql,
lookback_hours=lookback_hours,
)
session.add(record)
await session.commit()
logger.info("baseline_pg_saved", metric=state.metric_name, datapoints=state.datapoint_count)
except Exception as e:
logger.warning("baseline_pg_save_failed", metric=state.metric_name, error=str(e))
async def _pg_load_latest_baseline(self, metric_name: str) -> BaselineState | None:
"""從 PostgreSQL 載入最新一筆基線記錄"""
try:
from sqlalchemy import select
from src.db.base import get_session_factory
from src.db.models import DynamicBaselineRecord
factory = get_session_factory()
async with factory() as session:
stmt = (
select(DynamicBaselineRecord)
.where(DynamicBaselineRecord.metric_name == metric_name)
.order_by(DynamicBaselineRecord.trained_at.desc())
.limit(1)
)
result = await session.execute(stmt)
record = result.scalar_one_or_none()
if record is None:
return None
return BaselineState(
metric_name=record.metric_name,
mean=record.mean,
std=record.std,
seasonal_factors=record.seasonal_factors,
last_trained_at=record.trained_at.isoformat(),
datapoint_count=record.datapoint_count,
)
except Exception as e:
logger.warning("baseline_pg_load_failed", metric=metric_name, error=str(e))
return None
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_baseline_service: DynamicBaselineService | None = None
def get_dynamic_baseline_service() -> DynamicBaselineService:
global _baseline_service
if _baseline_service is None:
_baseline_service = DynamicBaselineService()
return _baseline_service

View File

@@ -0,0 +1,385 @@
"""
AWOOOI AIOps Phase 4 — Log Anomaly Detector日誌異常偵測
==========================================================
職責Drain3 log clustering即時偵測新 pattern
核心 API
process_log_line(line) -> LogAnomalyEvent | None
get_new_patterns(since_ts) -> list[LogCluster]
設計原則:
- Shadow Mode新 pattern 只記錄 logger.info不觸發 Alert
- 狀態持久化到 Rediscluster tree 序列化JSON
- 熔斷Drain3 失敗 → 僅記錄,不 raise
- 非同步:所有 Redis I/O 非同步Drain3 計算在同步 helper 中執行
ADR-084: Phase 4 動態異常偵測源頭升級
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 初始建立
"""
from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass
from typing import Any
import structlog
from src.utils.timezone import now_taipei
logger = structlog.get_logger(__name__)
# ── 常數 ────────────────────────────────────────────────────────────────────
REDIS_KEY_CLUSTERS = "log_anomaly:clusters" # hash: cluster_id → cluster data
REDIS_KEY_NEW_PATTERNS = "log_anomaly:new" # list: 新 pattern 事件(最新在前)
REDIS_TTL_CLUSTERS = 86400 * 7 # 7 天
MAX_NEW_PATTERNS = 200 # 保留最近 200 個新 pattern 事件
DRAIN_DEPTH = 4 # Drain3 tree depth
DRAIN_SIM_THRESHOLD = 0.4 # 相似度 < 此值 → 新 cluster
DRAIN_MAX_CHILDREN = 100 # max children per node
# ─────────────────────────────────────────────────────────────────────────────
# Data Types
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class LogCluster:
"""Drain3 log cluster"""
cluster_id: str
template: str # 模板e.g. "ERROR <*> connection failed to <*>"
size: int = 1 # 命中次數
first_seen_at: str = ""
last_seen_at: str = ""
is_new: bool = False # 首次出現 → True
def to_dict(self) -> dict[str, Any]:
return {
"cluster_id": self.cluster_id,
"template": self.template,
"size": self.size,
"first_seen_at": self.first_seen_at,
"last_seen_at": self.last_seen_at,
}
@classmethod
def from_dict(cls, d: dict[str, Any]) -> "LogCluster":
return cls(
cluster_id=d["cluster_id"],
template=d["template"],
size=d.get("size", 1),
first_seen_at=d.get("first_seen_at", ""),
last_seen_at=d.get("last_seen_at", ""),
)
@dataclass
class LogAnomalyEvent:
"""新 pattern 偵測事件"""
cluster_id: str
template: str
sample_log: str
detected_at: str
shadow_mode: bool = True
source: str = "k8s_pod" # k8s_pod | host_syslog | app_log
# ─────────────────────────────────────────────────────────────────────────────
# Main Service
# ─────────────────────────────────────────────────────────────────────────────
class LogAnomalyDetector:
"""
Drain3 日誌異常偵測服務
工作流程:
1. process_log_line() — 即時 clustering
2. 新 cluster → 記錄到 Redis list
3. ProactiveInspector 定期呼叫 get_new_patterns() 聚合
"""
def __init__(self) -> None:
self._drain: Any = None # lazy-init Drain3 instance
self._initialized = False
def _get_drain(self) -> Any:
"""Lazy-init Drain3避免 import 在啟動時失敗)。"""
if self._drain is not None:
return self._drain
try:
from drain3 import TemplateMiner
from drain3.template_miner_config import TemplateMinerConfig
config = TemplateMinerConfig()
config.drain_depth = DRAIN_DEPTH
config.drain_sim_th = DRAIN_SIM_THRESHOLD
config.drain_max_children = DRAIN_MAX_CHILDREN
config.parametrize_numeric_tokens = True
self._drain = TemplateMiner(config=config)
self._initialized = True
logger.info("drain3_initialized")
return self._drain
except ImportError:
logger.warning("drain3_not_available", reason="package not installed")
return None
except Exception as e:
logger.warning("drain3_init_failed", error=str(e))
return None
async def process_log_line(
self,
log_line: str,
source: str = "k8s_pod",
) -> LogAnomalyEvent | None:
"""
處理單行日誌,若為新 pattern 回傳 LogAnomalyEvent。
Args:
log_line: 原始日誌行
source: 來源標籤
Returns:
LogAnomalyEvent新 pattern或 None已知 pattern
"""
from src.core.feature_flags import aiops_flags
if not aiops_flags.AIOPS_P4_LOG_ANOMALY:
return None
shadow_mode = aiops_flags.AIOPS_P4_SHADOW_MODE
try:
drain = self._get_drain()
if drain is None:
return None
# Drain3 clustering同步計算輕量
result = drain.add_log_message(log_line)
if result is None:
return None
cluster = result.get("cluster", None)
if cluster is None:
return None
change_type = result.get("change_type", "none")
is_new = change_type in ("cluster_created", "template_created")
if not is_new:
# 已知 pattern更新 last_seen
await self._update_cluster_hit(str(cluster.cluster_id))
return None
# 新 pattern
template = cluster.get_template()
cluster_id = self._make_cluster_id(template)
now_str = now_taipei().isoformat()
log_cluster = LogCluster(
cluster_id=cluster_id,
template=template,
size=1,
first_seen_at=now_str,
last_seen_at=now_str,
is_new=True,
)
await self._save_new_cluster(log_cluster)
event = LogAnomalyEvent(
cluster_id=cluster_id,
template=template,
sample_log=log_line[:500], # 限制長度
detected_at=now_str,
shadow_mode=shadow_mode,
source=source,
)
logger.info(
"log_new_pattern_detected",
cluster_id=cluster_id,
template=template[:200],
shadow_mode=shadow_mode,
source=source,
)
return event
except Exception as e:
logger.warning("log_anomaly_process_failed", error=str(e))
return None
async def process_pod_logs(
self,
namespace: str = "awoooi-prod",
tail_lines: int = 100,
) -> list[LogAnomalyEvent]:
"""
批次掃描 K8s Pod 日誌(供 ProactiveInspector 呼叫)。
Returns:
新 pattern 事件列表Shadow Mode 時只記錄不觸發)
"""
from src.core.feature_flags import aiops_flags
if not aiops_flags.AIOPS_P4_LOG_ANOMALY:
return []
events: list[LogAnomalyEvent] = []
try:
logs = await self._fetch_pod_logs(namespace, tail_lines)
for line in logs:
if len(line.strip()) < 10:
continue
event = await self.process_log_line(line)
if event:
events.append(event)
except Exception as e:
logger.warning("pod_log_scan_failed", error=str(e))
return events
async def get_recent_new_patterns(
self,
limit: int = 10,
) -> list[dict[str, Any]]:
"""取得最近偵測到的新 pattern供 ProactiveInspector 聚合報告)。"""
try:
from src.core.redis_client import get_redis
r = get_redis()
raw = await r.lrange(REDIS_KEY_NEW_PATTERNS, 0, limit - 1)
return [json.loads(item) for item in raw]
except Exception:
return []
# ──────────────────────────────────────────────────────────────────────────
# Private Helpers
# ──────────────────────────────────────────────────────────────────────────
def _make_cluster_id(self, template: str) -> str:
"""根據模板產生穩定 ID。"""
return hashlib.md5(template.encode()).hexdigest()[:8].upper()
async def _save_new_cluster(self, cluster: LogCluster, sample_log: str = "") -> None:
"""
儲存新 cluster
1. 先寫 PostgreSQL永久保存AI 的 log 語意理解庫)
2. 推送到 Redis list短期工作記憶供 ProactiveInspector 聚合)
Phase 4 ADR-084 架構鐵律Drain3 學到的模板不能只存 Redis。
Redis TTL 到期 = AI 把已知 pattern 再次當成新 pattern = 永遠不會學習。
2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 改為 PG source of truth
"""
# 1. 寫入 PostgreSQL主要持久化UPSERT 防重複)
await self._pg_upsert_cluster(cluster, sample_log)
# 2. 推送到 Redis list短期工作記憶
try:
from src.core.redis_client import get_redis
r = get_redis()
payload = json.dumps({
**cluster.to_dict(),
"detected_at": cluster.first_seen_at,
})
await r.lpush(REDIS_KEY_NEW_PATTERNS, payload)
await r.ltrim(REDIS_KEY_NEW_PATTERNS, 0, MAX_NEW_PATTERNS - 1)
await r.expire(REDIS_KEY_NEW_PATTERNS, REDIS_TTL_CLUSTERS)
except Exception as e:
logger.warning("log_cluster_redis_push_failed", error=str(e))
async def _pg_upsert_cluster(self, cluster: LogCluster, sample_log: str) -> None:
"""
寫入或更新 LogClusterRecordUPSERT on cluster_id
同一 cluster_id 再次出現時只更新 last_seen_at 和 size不重複 INSERT。
"""
try:
from sqlalchemy.dialects.postgresql import insert as pg_insert
from src.db.base import get_session_factory
from src.db.models import LogClusterRecord
from src.utils.timezone import now_taipei
factory = get_session_factory()
async with factory() as session:
stmt = pg_insert(LogClusterRecord).values(
cluster_id=cluster.cluster_id,
template=cluster.template,
size=cluster.size,
source="k8s_pod",
sample_log=sample_log[:500] if sample_log else None,
).on_conflict_do_update(
index_elements=["cluster_id"],
set_={
"size": LogClusterRecord.size + 1,
"last_seen_at": now_taipei(),
},
)
await session.execute(stmt)
await session.commit()
except Exception as e:
logger.warning("log_cluster_pg_upsert_failed", cluster_id=cluster.cluster_id, error=str(e))
async def _update_cluster_hit(self, cluster_id: str) -> None:
"""更新已知 cluster 的命中次數best-effort"""
try:
from src.core.redis_client import get_redis
r = get_redis()
key = f"{REDIS_KEY_CLUSTERS}:{cluster_id}"
await r.hincrby(key, "size", 1)
await r.hset(key, "last_seen_at", now_taipei().isoformat())
except Exception:
pass # best-effort
async def _fetch_pod_logs(
self,
namespace: str,
tail_lines: int,
) -> list[str]:
"""
透過 kubectl API server 抓取 Pod 日誌。
使用 K8s in-cluster configAPI server: https://kubernetes.default.svc
或本地 kubeconfig。
"""
import asyncio
import subprocess
try:
# 抓取 awoooi-api deploy 的日誌(最新 Pod
result = await asyncio.get_event_loop().run_in_executor(
None,
lambda: subprocess.run(
[
"kubectl", "logs",
f"deploy/awoooi-api",
"-n", namespace,
f"--tail={tail_lines}",
"--timestamps=false",
],
capture_output=True,
text=True,
timeout=15,
),
)
if result.returncode == 0:
return result.stdout.splitlines()
logger.warning("kubectl_logs_failed", stderr=result.stderr[:200])
return []
except Exception as e:
logger.warning("pod_log_fetch_failed", error=str(e))
return []
# ─────────────────────────────────────────────────────────────────────────────
# Singleton
# ─────────────────────────────────────────────────────────────────────────────
_detector: LogAnomalyDetector | None = None
def get_log_anomaly_detector() -> LogAnomalyDetector:
global _detector
if _detector is None:
_detector = LogAnomalyDetector()
return _detector