diff --git a/apps/api/src/core/config.py b/apps/api/src/core/config.py
index aa060200..33cb7cc6 100644
--- a/apps/api/src/core/config.py
+++ b/apps/api/src/core/config.py
@@ -103,6 +103,35 @@ class Settings(BaseSettings):
description="C1: True=啟用 km:backfill:dlq 補掃 job(每 5 分鐘), False=停用",
)
+ # ==========================================================================
+ # W2 PR-R2: AOL → alert_rule_catalog Confidence EWMA Writeback
+ # ADR-091 Task T2 — 飛輪斷鏈 C2 修復:規則命中率回灌 catalog confidence
+ # default=false:先寫 code,人工驗證 AOL 資料品質後再開啟
+ # 啟用:kubectl set env deployment/awoooi-api ENABLE_AOL_WRITEBACK_JOB=true
+ # 回滾:kubectl set env deployment/awoooi-api ENABLE_AOL_WRITEBACK_JOB=false
+ # ==========================================================================
+ ENABLE_AOL_WRITEBACK_JOB: bool = Field(
+ default=False,
+ description="W2 PR-R2: True=每小時從 AOL 聚合 alertname 成功率並 EWMA 更新 alert_rule_catalog.confidence, False=停用(預設)",
+ )
+
+ # ==========================================================================
+ # W2 PR-L1: KM → Playbook 互饋回路 (2026-04-28 ogt + Claude Sonnet 4.6)
+ # 飛輪斷鏈 C3 + C4 修復 — KM 與 Playbook 演化互饋
+ # 邏輯 1: promote/demote 觸發 → 寫 KM 演化條目(path_type=playbook_evolution)
+ # 邏輯 2: 同 symptom_pattern_hash 累積 N=5 條 KM → 標記 playbook.review_required=true
+ # 邏輯 3: DEPRECATED Playbook → 降低 alert_rule_catalog.confidence *= 0.5
+ # 回滾指令: kubectl set env deployment/awoooi-api ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP=false
+ # ==========================================================================
+ ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP: bool = Field(
+ default=False,
+ description="W2 PR-L1: True=啟用 KM↔Playbook 互饋回路(飛輪 C3+C4 修復), False=停用(default,驗證後才開)",
+ )
+ KM_PLAYBOOK_REVIEW_THRESHOLD: int = Field(
+ default=5,
+ description="W2 PR-L1: 同 symptom_pattern_hash 累積幾條 KM 後觸發 Playbook review_required 標記(預設 N=5)",
+ )
+
# ==========================================================================
# aider-watch v2 integration (2026-04-20 ADR-091)
# 整合 Mac aider CLI 監控進 awoooi 飛輪(events → incident → ai_router feedback)
@@ -575,6 +604,16 @@ class Settings(BaseSettings):
description="P3.1-T2-PathA: 啟用 DiagnosisAggregator 信號分類層補 PDI(路徑 A:不重複收集,只分類已有 raw 資料)",
)
+ # ==========================================================================
+ # W2 PR-V1: SelfHealingValidator Feature Flag (2026-04-28 ogt + Claude Sonnet 4.6)
+ # 飛輪斷鏈 C6 修復 — 驗證層串接自愈品質評估
+ # 回滾指令: kubectl set env deployment/awoooi-api ENABLE_SELF_HEALING_VALIDATOR=false
+ # ==========================================================================
+ ENABLE_SELF_HEALING_VALIDATOR: bool = Field(
+ default=False,
+ description="W2 PR-V1: True=PostExecutionVerifier 執行後評估自愈品質分數(score<0.5發Telegram警示), False=跳過(回滾用)",
+ )
+
def get_tg_user_whitelist(self) -> list[int]:
"""Parse comma-separated or JSON array user IDs to list[int]"""
raw = self.OPENCLAW_TG_USER_WHITELIST
diff --git a/apps/api/src/db/base.py b/apps/api/src/db/base.py
index be8ae2dc..feebc32c 100644
--- a/apps/api/src/db/base.py
+++ b/apps/api/src/db/base.py
@@ -220,6 +220,17 @@ async def init_db() -> None:
""")
)
+ # W2 PR-V1: SelfHealingValidator 補欄 (2026-04-28 ogt + Claude Sonnet 4.6)
+ # incident_evidence 加 self_healing_score + self_healing_detail
+ # create_all 不做 ALTER,防禦性補加(prod 已存在的表不會自動加欄)
+ await conn.execute(
+ text("""
+ ALTER TABLE incident_evidence
+ ADD COLUMN IF NOT EXISTS self_healing_score FLOAT,
+ ADD COLUMN IF NOT EXISTS self_healing_detail JSONB;
+ """)
+ )
+
# 2026-04-29 ogt + Claude Opus 4.7: PR-K1 防禦性 ALTER (db-expert finding)
# P1.6 (2026-04-24) ORM 已加 timeline_events.incident_id,但 prod 若在 P1.6 前
# 已建表,create_all 跳過已存在的表 → ALTER 不會跑 → ORM 寫入 SELECT 找不到欄位
@@ -230,6 +241,20 @@ async def init_db() -> None:
ADD COLUMN IF NOT EXISTS incident_id VARCHAR(64);
""")
)
+
+ # W2 PR-L1 2026-04-28 ogt + Claude Sonnet 4.6: KM→Playbook 互饋回路(飛輪 C3 修復)
+ # PlaybookRecord 新增 review_required 欄位
+ # 已存在表不會被 create_all 重建,必須手動 ALTER
+ await conn.execute(
+ text("""
+ ALTER TABLE playbooks
+ ADD COLUMN IF NOT EXISTS review_required BOOLEAN NOT NULL DEFAULT FALSE;
+ """)
+ )
+ await conn.execute(text(
+ "CREATE INDEX IF NOT EXISTS ix_playbook_review_required "
+ "ON playbooks(review_required) WHERE review_required = true;"
+ ))
await conn.execute(text(
"CREATE INDEX IF NOT EXISTS ix_timeline_incident_id "
"ON timeline_events(incident_id);"
diff --git a/apps/api/src/db/models.py b/apps/api/src/db/models.py
index 16731554..65fec351 100644
--- a/apps/api/src/db/models.py
+++ b/apps/api/src/db/models.py
@@ -932,6 +932,20 @@ class IncidentEvidence(Base):
String(20), nullable=True, comment="success / degraded / failed / timeout(PostExecutionVerifier 填入)"
)
+ # W2 PR-V1: SelfHealingValidator 自愈品質分數 (2026-04-28 ogt + Claude Sonnet 4.6)
+ # 0.0-1.0:1.0=完全自愈,<0.5=觸發 rollback 提案(Telegram 警示)
+ # base.py ALTER IF NOT EXISTS 補欄對應下方
+ self_healing_score: Mapped[float | None] = mapped_column(
+ Float,
+ nullable=True,
+ comment="W2 PR-V1 SelfHealingValidator 自愈品質分數(0.0-1.0),<0.5 觸發 rollback 提案",
+ )
+ self_healing_detail: Mapped[dict | None] = mapped_column(
+ JSON,
+ nullable=True,
+ comment="W2 PR-V1 SelfHealingValidator 評估明細:root_cause_cleared/regressions/detail",
+ )
+
# 時間戳(台北時區)
collected_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=taipei_now, nullable=False
@@ -1017,6 +1031,14 @@ class PlaybookRecord(Base):
stateful_targets: Mapped[list[str]] = mapped_column(JSON, default=list, nullable=False)
requires_pre_backup: Mapped[bool] = mapped_column(default=False, nullable=False)
+ # W2 PR-L1 2026-04-28 ogt + Claude Sonnet 4.6: KM→Playbook 互饋回路(飛輪 C3 修復)
+ # 同 symptom_pattern_hash 累積 N=5 條 KM 後,LearningService 自動設 True
+ # 人工 review 後可重設為 False(由 playbook_service 負責清除)
+ review_required: Mapped[bool] = mapped_column(
+ Boolean, default=False, nullable=False,
+ comment="W2 PR-L1: True=KM 累積觸發人工複審信號(symptom_hash≥5 條),review 後清為 False",
+ )
+
# Timestamps
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now, nullable=False)
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now,
@@ -1026,6 +1048,12 @@ class PlaybookRecord(Base):
Index("ix_playbook_status", "status"),
Index("ix_playbook_trust_score", "trust_score"),
Index("ix_playbook_created_at", "created_at"),
+ # W2 PR-L1: 快速查詢需要人工 review 的 Playbook(預期數量少,partial index 最省空間)
+ Index(
+ "ix_playbook_review_required",
+ "review_required",
+ postgresql_where=text("review_required = true"),
+ ),
)
diff --git a/apps/api/src/jobs/aol_to_catalog_writeback_job.py b/apps/api/src/jobs/aol_to_catalog_writeback_job.py
new file mode 100644
index 00000000..5b88d26a
--- /dev/null
+++ b/apps/api/src/jobs/aol_to_catalog_writeback_job.py
@@ -0,0 +1,387 @@
+"""
+AOL → alert_rule_catalog Confidence EWMA Writeback Job
+========================================================
+ADR-091 Task T2 — 飛輪斷鏈 C2 修復
+
+每 1 小時從 automation_operation_log 聚合 alertname 執行成功率,
+用 EWMA 回灌 alert_rule_catalog.confidence,並對低成功率規則標記 'draft'
+(等待人工審查)。
+
+流程:
+ 1. 撈 AOL 過去 24h,group by alertname,算 ok/total
+ 2. EWMA: new_confidence = 0.7 * old_confidence + 0.3 * recent_success_rate
+ 若 confidence IS NULL,初始值用 recent_success_rate
+ 3. recent_success_rate < 0.3 且 sample >= 5 → review_status = 'draft'
+ (schema CHECK 只允許 draft/approved/deprecated/retired,'draft' 語義
+ 等同「需要人工審查」,Hermes 可設定撈 draft 觸發 advisory)
+ 4. 寫 automation_operation_log summary
+
+Feature Flag:
+ ENABLE_AOL_WRITEBACK_JOB=false(預設)— 先寫 code 後人工驗證才開啟
+
+設計鐵律:
+ - ENABLE_AOL_WRITEBACK_JOB=false 時完全 skip,不碰 DB
+ - 只 UPDATE,不 INSERT(alert_rule_catalog 必須先由 rule_catalog_sync 建立)
+ - EWMA alpha=0.3(新資料權重),穩定性優先
+ - sample < 5 → 不降 review_status(避免少量資料誤判)
+ - 任何 DB 失敗 → log warning,下次重試,不 crash 主程序
+
+排程:
+ - 啟動延遲 360s(等 rule_catalog_sync + rule_stats_updater 先跑)
+ - 每 3600s(每 1 小時)
+
+schema 依賴(已存在,不需要 migration):
+ - alert_rule_catalog.confidence NUMERIC(3,2) — 行 279 adr090_asset_inventory_foundation.sql
+ - alert_rule_catalog.review_status TEXT CHECK (draft/approved/deprecated/retired)
+ - automation_operation_log.input, .status, .tags, .operation_type
+
+W2 PR-R2 2026-04-28 ogt + Claude Sonnet 4.6 Asia/Taipei
+ADR-091 Task T2 飛輪斷鏈 C2 修復 — AOL 命中率回灌
+"""
+from __future__ import annotations
+
+import asyncio
+import json as _json
+import time as _time
+from typing import Any
+
+import structlog
+
+from src.core.config import settings
+
+logger = structlog.get_logger(__name__)
+
+# ============================================================================
+# 排程參數
+# ============================================================================
+_WRITEBACK_INTERVAL_SEC = 3600 # 每 1 小時
+_FIRST_DELAY_SEC = 360 # 啟動後等 360s(讓 rule_catalog_sync + rule_stats 先完成)
+_LOOP_BACKOFF_SEC = 300 # 錯誤後重試間隔
+_AOL_WINDOW_HOURS = 24 # 聚合 AOL 的時間窗口
+
+# EWMA 參數
+_EWMA_ALPHA = 0.3 # 新資料權重 (0.3 = 保守更新)
+_LOW_SUCCESS_THRESHOLD = 0.3 # 低成功率閾值
+_MIN_SAMPLE_SIZE = 5 # 樣本不足不降 review_status
+
+# AOL 操作類型白名單(只計「真正執行了操作」的 log)
+_RELEVANT_OP_TYPES = (
+ "alert_resolved",
+ "action_executed",
+ "auto_repair_success",
+ "auto_repair_failed",
+ "auto_repair_skipped",
+)
+
+# review_status 降級用值(schema CHECK 允許的值中語義最接近「需要人工審查」)
+_NEEDS_REVIEW_STATUS = "draft"
+
+
+# ============================================================================
+# Public entry — main.py lifespan 呼叫
+# ============================================================================
+
+async def run_aol_writeback_loop() -> None:
+ """
+ 永久迴圈:每 _WRITEBACK_INTERVAL_SEC 秒執行一次 AOL → catalog 回灌.
+
+ Feature Flag ENABLE_AOL_WRITEBACK_JOB=false 時,進入迴圈但每次 sleep 後立即 skip.
+ """
+ logger.info(
+ "aol_to_catalog_writeback_loop_started",
+ interval_sec=_WRITEBACK_INTERVAL_SEC,
+ flag_enabled=settings.ENABLE_AOL_WRITEBACK_JOB,
+ )
+ await asyncio.sleep(_FIRST_DELAY_SEC)
+
+ while True:
+ try:
+ await run_aol_writeback_once()
+ except Exception as e:
+ logger.exception("aol_writeback_loop_error", error=str(e))
+ await asyncio.sleep(_LOOP_BACKOFF_SEC)
+ continue
+ await asyncio.sleep(_WRITEBACK_INTERVAL_SEC)
+
+
+async def run_aol_writeback_once() -> dict[str, Any]:
+ """
+ 執行一次 AOL → alert_rule_catalog confidence EWMA 回灌.
+
+ Returns:
+ {
+ "skipped": True/False, # feature flag 停用時回傳 skipped=True
+ "rules_sampled": N, # AOL 中找到的 alertname 數
+ "rules_updated": M, # 成功 UPDATE confidence 的 rule 數
+ "rules_flagged_draft": K, # 低成功率被標 draft 的 rule 數
+ "error": None | str,
+ }
+ """
+ if not settings.ENABLE_AOL_WRITEBACK_JOB:
+ logger.debug("aol_writeback_skipped_flag_disabled")
+ return {"skipped": True, "rules_sampled": 0, "rules_updated": 0, "rules_flagged_draft": 0, "error": None}
+
+ started_ms = _time.time()
+ stats: dict[str, Any] = {
+ "skipped": False,
+ "rules_sampled": 0,
+ "rules_updated": 0,
+ "rules_flagged_draft": 0,
+ "error": None,
+ }
+
+ try:
+ samples = await _fetch_aol_samples()
+ stats["rules_sampled"] = len(samples)
+
+ for sample in samples:
+ updated, flagged = await _update_catalog_confidence(sample)
+ if updated:
+ stats["rules_updated"] += 1
+ if flagged:
+ stats["rules_flagged_draft"] += 1
+
+ except Exception as e:
+ stats["error"] = f"{type(e).__name__}: {e}"[:1000]
+ logger.exception("aol_writeback_once_failed", error=stats["error"])
+
+ duration_ms = int((_time.time() - started_ms) * 1000)
+ await _log_aol_summary(stats, duration_ms)
+
+ logger.info(
+ "aol_writeback_once_done",
+ rules_sampled=stats["rules_sampled"],
+ rules_updated=stats["rules_updated"],
+ rules_flagged_draft=stats["rules_flagged_draft"],
+ duration_ms=duration_ms,
+ error=stats["error"],
+ )
+ return stats
+
+
+# ============================================================================
+# 資料查詢
+# ============================================================================
+
+async def _fetch_aol_samples() -> list[dict[str, Any]]:
+ """
+ 從 automation_operation_log 聚合過去 24h 的 alertname 成功率.
+
+ 查詢條件:
+ - operation_type IN (alert_resolved, action_executed, auto_repair_*)
+ - tags @> '["auto_execute"]'
+ - created_at > NOW() - INTERVAL '24h'
+
+ 回傳: [{alertname, ok, total, recent_success_rate}, ...]
+ """
+ from sqlalchemy import text as _sql
+ from src.db.base import get_db_context
+
+ # 動態拼 operation_type IN 清單(parameterized,避免 SQL 注入)
+ # SQLAlchemy bindparam 不支援 INTERVAL 的數字插值,INTERVAL 用 f-string literal 拼接
+ op_list = list(_RELEVANT_OP_TYPES)
+ placeholders = ", ".join(f":op{i}" for i in range(len(op_list)))
+ params: dict[str, Any] = {f"op{i}": v for i, v in enumerate(op_list)}
+
+ sql = f"""
+ SELECT
+ input->>'alertname' AS alertname,
+ SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS ok,
+ COUNT(*) AS total
+ FROM automation_operation_log
+ WHERE operation_type IN ({placeholders})
+ AND tags @> '["auto_execute"]'
+ AND created_at > NOW() - INTERVAL '{_AOL_WINDOW_HOURS} hours'
+ AND input->>'alertname' IS NOT NULL
+ AND input->>'alertname' != ''
+ GROUP BY input->>'alertname'
+ HAVING COUNT(*) > 0
+ ORDER BY COUNT(*) DESC
+ """
+
+ try:
+ async with get_db_context() as db:
+ result = await db.execute(_sql(sql), params)
+ rows = result.fetchall()
+
+ samples = []
+ for row in rows:
+ alertname = row.alertname
+ ok = int(row.ok or 0)
+ total = int(row.total or 0)
+ recent_success_rate = ok / total if total > 0 else 0.0
+ samples.append({
+ "alertname": alertname,
+ "ok": ok,
+ "total": total,
+ "recent_success_rate": recent_success_rate,
+ })
+ return samples
+ except Exception as e:
+ logger.warning("aol_fetch_samples_failed", error=str(e))
+ return []
+
+
+# ============================================================================
+# EWMA 更新
+# ============================================================================
+
+async def _update_catalog_confidence(sample: dict[str, Any]) -> tuple[bool, bool]:
+ """
+ 對單一 alertname 執行 EWMA 更新 + 低成功率降級.
+
+ Returns:
+ (updated: bool, flagged_draft: bool)
+ updated = True 表示 confidence 有被更新
+ flagged_draft = True 表示 review_status 被設為 'draft'
+ """
+ from sqlalchemy import text as _sql
+ from src.db.base import get_db_context
+
+ alertname = sample["alertname"]
+ recent_sr = sample["recent_success_rate"]
+ total = sample["total"]
+
+ try:
+ async with get_db_context() as db:
+ # 先讀現有 confidence 與 review_status
+ row = await db.execute(
+ _sql("""
+ SELECT rule_id, confidence, review_status
+ FROM alert_rule_catalog
+ WHERE rule_name = :rn
+ LIMIT 1
+ """),
+ {"rn": alertname},
+ )
+ existing = row.one_or_none()
+ if existing is None:
+ # rule 不在 catalog → skip(等 rule_catalog_sync 先建)
+ logger.debug("aol_writeback_rule_not_in_catalog", alertname=alertname)
+ return False, False
+
+ old_confidence = float(existing.confidence) if existing.confidence is not None else None
+ current_review_status = existing.review_status
+
+ # EWMA 計算
+ if old_confidence is None:
+ new_confidence = recent_sr
+ else:
+ new_confidence = (1 - _EWMA_ALPHA) * old_confidence + _EWMA_ALPHA * recent_sr
+
+ # 限制到 [0.00, 1.00](NUMERIC(3,2) 最大 9.99,但 confidence 語義 0-1)
+ new_confidence = max(0.0, min(1.0, new_confidence))
+
+ # 判斷是否需要降級 review_status
+ should_flag = (
+ recent_sr < _LOW_SUCCESS_THRESHOLD
+ and total >= _MIN_SAMPLE_SIZE
+ and current_review_status not in (_NEEDS_REVIEW_STATUS, "deprecated", "retired")
+ )
+
+ if should_flag:
+ await db.execute(
+ _sql("""
+ UPDATE alert_rule_catalog
+ SET confidence = :conf,
+ review_status = :rs,
+ updated_at = NOW()
+ WHERE rule_name = :rn
+ """),
+ {
+ "conf": round(new_confidence, 2),
+ "rs": _NEEDS_REVIEW_STATUS,
+ "rn": alertname,
+ },
+ )
+ logger.info(
+ "aol_writeback_flagged_draft",
+ alertname=alertname,
+ old_confidence=old_confidence,
+ new_confidence=round(new_confidence, 2),
+ recent_sr=round(recent_sr, 3),
+ sample=total,
+ )
+ return True, True
+ else:
+ await db.execute(
+ _sql("""
+ UPDATE alert_rule_catalog
+ SET confidence = :conf,
+ updated_at = NOW()
+ WHERE rule_name = :rn
+ """),
+ {
+ "conf": round(new_confidence, 2),
+ "rn": alertname,
+ },
+ )
+ logger.debug(
+ "aol_writeback_confidence_updated",
+ alertname=alertname,
+ old_confidence=old_confidence,
+ new_confidence=round(new_confidence, 2),
+ recent_sr=round(recent_sr, 3),
+ sample=total,
+ )
+ return True, False
+
+ except Exception as e:
+ logger.warning(
+ "aol_writeback_update_failed",
+ alertname=alertname,
+ error=str(e),
+ )
+ return False, False
+
+
+# ============================================================================
+# AOL 稽核 log
+# ============================================================================
+
+async def _log_aol_summary(stats: dict[str, Any], duration_ms: int) -> None:
+ """寫一筆 summary 到 automation_operation_log(每次 writeback 只寫 1 筆)."""
+ if stats.get("skipped"):
+ return # flag 停用時不留 log,避免污染
+
+ try:
+ from sqlalchemy import text as _sql
+ from src.db.base import get_db_context
+
+ aol_status = "failed" if stats.get("error") else "success"
+ async with get_db_context() as db:
+ await db.execute(
+ _sql("""
+ INSERT INTO automation_operation_log (
+ operation_type, actor, status,
+ input, output, duration_ms, error, tags
+ ) VALUES (
+ 'rule_updated',
+ 'aol_to_catalog_writeback',
+ :st,
+ CAST(:input AS jsonb),
+ CAST(:output AS jsonb),
+ :dur, :err, :tags
+ )
+ """),
+ {
+ "st": aol_status,
+ "input": _json.dumps(
+ {"window_hours": _AOL_WINDOW_HOURS, "ewma_alpha": _EWMA_ALPHA},
+ ensure_ascii=False,
+ ),
+ "output": _json.dumps(
+ {
+ "rules_sampled": stats.get("rules_sampled", 0),
+ "rules_updated": stats.get("rules_updated", 0),
+ "rules_flagged_draft": stats.get("rules_flagged_draft", 0),
+ },
+ ensure_ascii=False,
+ ),
+ "dur": duration_ms,
+ "err": (stats.get("error") or "")[:2000] if stats.get("error") else None,
+ "tags": ["rule_catalog", "aol_writeback", "ewma", "confidence"],
+ },
+ )
+ except Exception as e:
+ logger.warning("aol_writeback_log_aol_failed", error=str(e))
diff --git a/apps/api/src/jobs/hermes_rule_quality_job.py b/apps/api/src/jobs/hermes_rule_quality_job.py
index d721ffd5..b70a51ca 100644
--- a/apps/api/src/jobs/hermes_rule_quality_job.py
+++ b/apps/api/src/jobs/hermes_rule_quality_job.py
@@ -196,7 +196,12 @@ async def _llm_analyze_noisy_rule(rule: dict[str, Any]) -> dict[str, Any] | None
# ============================================================================
async def _fetch_noisy_rules() -> list[dict[str, Any]]:
- """撈 noise_rate >= 0.7 且樣本 >= 5 的 rules."""
+ """撈 noise_rate >= 0.7 且樣本 >= 5 的 rules,或 AOL writeback 標記 draft 的 rules.
+
+ W2 PR-R2 2026-04-28 ogt + Claude Sonnet 4.6: 加 OR review_status = 'draft' 條件
+ 讓 AOL writeback 觸發的 draft 規則能被 Hermes 自動推 Telegram 建議
+ (不再卡人工 SQL 才能觸發 advisory)
+ """
from sqlalchemy import text as _sql
from src.db.base import get_db_context
@@ -209,10 +214,16 @@ async def _fetch_noisy_rules() -> list[dict[str, Any]]:
true_positive_count, false_positive_count, noise_rate,
last_fired_at, review_status
FROM alert_rule_catalog
- WHERE noise_rate >= :thr
- AND (true_positive_count + false_positive_count) >= :min_sample
- AND (review_status IS NULL OR review_status = 'approved')
- ORDER BY noise_rate DESC, (true_positive_count + false_positive_count) DESC
+ WHERE (
+ (
+ noise_rate >= :thr
+ AND (true_positive_count + false_positive_count) >= :min_sample
+ AND (review_status IS NULL OR review_status = 'approved')
+ )
+ OR review_status = 'draft'
+ )
+ ORDER BY noise_rate DESC NULLS LAST,
+ (true_positive_count + false_positive_count) DESC
"""),
{"thr": _NOISE_THRESHOLD, "min_sample": _MIN_SAMPLE_SIZE},
)
diff --git a/apps/api/src/main.py b/apps/api/src/main.py
index a1cc6582..a35d7998 100644
--- a/apps/api/src/main.py
+++ b/apps/api/src/main.py
@@ -519,6 +519,17 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
except Exception as e:
logger.warning("km_backfill_reconciler_loop_schedule_failed", error=str(e))
+ # W2 PR-R2 2026-04-28 ogt + Claude Sonnet 4.6: AOL → alert_rule_catalog EWMA Writeback(每 1 小時)
+ # 飛輪斷鏈 C2 修復:automation_operation_log 執行結果回灌 alert_rule_catalog.confidence
+ # Feature Flag: ENABLE_AOL_WRITEBACK_JOB=false 預設停用(人工驗證後再開)
+ # ADR-091 Task T2
+ try:
+ from src.jobs.aol_to_catalog_writeback_job import run_aol_writeback_loop
+ asyncio.create_task(run_aol_writeback_loop())
+ logger.info("aol_to_catalog_writeback_loop_scheduled", interval_sec=3600)
+ except Exception as e:
+ logger.warning("aol_to_catalog_writeback_loop_schedule_failed", error=str(e))
+
# ADR-087 Phase 6: KB 腐爛清理(月度)— 每月 1 號 03:00 台北時間
# 掃描 knowledge_entries 中腐爛條目(廢棄 K8s API / Prometheus pattern / 180d 未引用)
# 2026-04-27 P3.1-T3 by Claude
diff --git a/apps/api/src/services/evidence_snapshot.py b/apps/api/src/services/evidence_snapshot.py
index 8e615ff9..7d10ebfd 100644
--- a/apps/api/src/services/evidence_snapshot.py
+++ b/apps/api/src/services/evidence_snapshot.py
@@ -110,6 +110,11 @@ class EvidenceSnapshot:
post_execution_state: dict[str, Any] | None = None
verification_result: str | None = None
+ # W2 PR-V1: SelfHealingValidator 自愈品質評估 (2026-04-28 ogt + Claude Sonnet 4.6)
+ # ENABLE_SELF_HEALING_VALIDATOR=false 時永 None
+ self_healing_score: float | None = None
+ self_healing_detail: dict[str, Any] | None = None
+
# Phase 3 填充(目前永 null)
matched_playbook_id: str | None = None
@@ -292,6 +297,55 @@ class EvidenceSnapshot:
)
raise
+ async def update_self_healing(
+ self,
+ score: float,
+ detail: dict[str, Any],
+ ) -> None:
+ """
+ W2 PR-V1: SelfHealingValidator 評估結果補填。
+
+ 在 PostExecutionVerifier.verify() 完成 update_post_execution() 之後呼叫。
+ 僅在 ENABLE_SELF_HEALING_VALIDATOR=True 且 snapshot 已持久化時有效。
+
+ Args:
+ score: 自愈品質分數(0.0-1.0)
+ detail: SelfHealingValidator.assess_self_healing() 返回的明細 dict
+ 2026-04-28 ogt + Claude Sonnet 4.6: W2 PR-V1 初始建立
+ """
+ self.self_healing_score = score
+ self.self_healing_detail = detail
+
+ try:
+ async with get_db_context() as db:
+ stmt_result = await db.execute(
+ update(IncidentEvidence)
+ .where(IncidentEvidence.id == self.snapshot_id)
+ .values(
+ self_healing_score=score,
+ self_healing_detail=detail,
+ )
+ )
+
+ if stmt_result.rowcount < 1:
+ logger.warning(
+ "evidence_snapshot_self_healing_update_no_rows",
+ snapshot_id=self.snapshot_id,
+ score=score,
+ )
+ else:
+ logger.info(
+ "evidence_snapshot_self_healing_updated",
+ snapshot_id=self.snapshot_id,
+ score=score,
+ )
+ except Exception:
+ logger.exception(
+ "evidence_snapshot_self_healing_update_error",
+ snapshot_id=self.snapshot_id,
+ )
+ raise
+
async def get_latest_snapshot(incident_id: str) -> EvidenceSnapshot | None:
"""
diff --git a/apps/api/src/services/learning_service.py b/apps/api/src/services/learning_service.py
index 52569014..fd337fda 100644
--- a/apps/api/src/services/learning_service.py
+++ b/apps/api/src/services/learning_service.py
@@ -389,13 +389,40 @@ class LearningService:
playbook_id: str,
success: bool,
) -> None:
- """更新 Playbook 統計"""
+ """
+ 更新 Playbook 統計
+
+ W2 PR-L1: 統計更新後,取 Playbook 的 symptom_pattern hash 觸發邏輯 2
+ (KM 累積門檻檢查 → review_required 標記)。
+ """
try:
from src.services.playbook_service import get_playbook_service
service = get_playbook_service()
await service.record_execution(playbook_id, success)
+ # W2 PR-L1 邏輯 2: 取得 Playbook symptom_pattern hash,觸發 KM 累積檢查
+ from src.core.config import settings
+ if settings.ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP:
+ try:
+ from src.repositories.playbook_repository import get_playbook_repository
+ from src.models.playbook import SymptomPattern
+ repo = get_playbook_repository()
+ playbook = await repo.get_by_id(playbook_id)
+ if playbook and playbook.symptom_pattern:
+ sp = playbook.symptom_pattern
+ # symptom_pattern 可能是 Pydantic model 或 dict(ORM 載入)
+ if isinstance(sp, dict):
+ sp = SymptomPattern.model_validate(sp)
+ symptoms_hash = sp.compute_hash()
+ await self._check_and_mark_playbook_review(symptoms_hash)
+ except Exception as inner_e:
+ logger.warning(
+ "playbook_review_check_failed",
+ playbook_id=playbook_id,
+ error=str(inner_e),
+ )
+
except Exception as e:
logger.warning(
"playbook_stats_update_error",
@@ -459,6 +486,7 @@ class LearningService:
- 尋找 source_incident_ids 包含此 incident_id 的 Playbooks
- 提升 ai_confidence +0.1 (上限 1.0)
- 若信心度 >= 0.9 且 status == DRAFT → 自動升級為 APPROVED
+ - W2 PR-L1: 寫 KM 演化條目(ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP 開啟時)
"""
try:
from src.repositories.playbook_repository import get_playbook_repository
@@ -478,6 +506,7 @@ class LearningService:
updated_count = 0
for playbook in playbooks:
+ previous_trust = playbook.trust_score
result = await repo.adjust_confidence(
playbook_id=playbook.playbook_id,
delta=CONFIDENCE_BOOST,
@@ -485,6 +514,13 @@ class LearningService:
)
if result:
updated_count += 1
+ # W2 PR-L1: promote 觸發 → 寫 KM 演化條目
+ await self._write_playbook_evolution_km(
+ playbook=playbook,
+ previous_trust=previous_trust,
+ evolution_type="promote",
+ incident_id=incident_id,
+ )
logger.info(
"playbook_promoted",
@@ -513,6 +549,7 @@ class LearningService:
- 尋找 source_incident_ids 包含此 incident_id 的 Playbooks
- 降低 ai_confidence -0.15 (下限 0.0)
- 若信心度 < 0.3 且 failure_rate > 50% → 自動降級為 DEPRECATED
+ - W2 PR-L1: 寫 KM 演化條目;DEPRECATED 時回灌 alert_rule_catalog(飛輪 C4 修復)
"""
try:
from src.repositories.playbook_repository import get_playbook_repository
@@ -532,6 +569,7 @@ class LearningService:
updated_count = 0
for playbook in playbooks:
+ previous_trust = playbook.trust_score
result = await repo.adjust_confidence(
playbook_id=playbook.playbook_id,
delta=CONFIDENCE_PENALTY,
@@ -539,6 +577,17 @@ class LearningService:
)
if result:
updated_count += 1
+ # W2 PR-L1: demote 觸發 → 寫 KM 演化條目
+ await self._write_playbook_evolution_km(
+ playbook=playbook,
+ previous_trust=previous_trust,
+ evolution_type="demote",
+ incident_id=incident_id,
+ )
+ # W2 PR-L1 邏輯 3: DEPRECATED 時回灌 alert_rule_catalog(飛輪 C4 修復)
+ from src.models.playbook import PlaybookStatus
+ if playbook.status == PlaybookStatus.DEPRECATED:
+ await self._demote_alert_rule_catalog_confidence(playbook)
logger.info(
"playbook_demoted",
@@ -557,6 +606,241 @@ class LearningService:
)
return False
+ # =========================================================================
+ # W2 PR-L1: KM → Playbook 互饋回路私有方法
+ # 飛輪斷鏈 C3 + C4 修復
+ # 2026-04-28 ogt + Claude Sonnet 4.6
+ # =========================================================================
+
+ async def _write_playbook_evolution_km(
+ self,
+ playbook: Any,
+ previous_trust: float,
+ evolution_type: str,
+ incident_id: str,
+ ) -> None:
+ """
+ 邏輯 1: promote/demote 觸發 → 寫 KM 演化條目(飛輪 C3)
+
+ KM 條目 metadata 含:playbook_id, previous_trust, new_trust,
+ success_count, failure_count, decision_chain
+ path_type='playbook_evolution',供冪等 key 使用
+ (incident_id, path_type) = (incident_id, 'playbook_evolution') 可能重複,
+ 但 playbook_id 不同的演化各自獨立,所以 path_type 加 playbook_id 作為識別。
+ """
+ from src.core.config import settings
+ if not settings.ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP:
+ return
+
+ try:
+ import json
+ from src.services.km_writer import KMWritePayload, km_write_with_flag
+ from src.utils.timezone import now_taipei
+
+ new_trust = getattr(playbook, "trust_score", previous_trust)
+ success_count = getattr(playbook, "success_count", 0)
+ failure_count = getattr(playbook, "failure_count", 0)
+
+ path_type = f"playbook_evolution:{playbook.playbook_id}"
+
+ payload = KMWritePayload(
+ path_type=path_type,
+ incident_id=incident_id,
+ entry_create_kwargs={
+ "title": f"Playbook {evolution_type}: {playbook.name} [{playbook.playbook_id}]",
+ "content": (
+ f"Playbook {evolution_type} 事件記錄\n"
+ f"Playbook ID: {playbook.playbook_id}\n"
+ f"名稱: {playbook.name}\n"
+ f"trust_score 變化: {previous_trust:.3f} → {new_trust:.3f}\n"
+ f"成功次數: {success_count} / 失敗次數: {failure_count}\n"
+ f"觸發來源: incident {incident_id}\n"
+ f"記錄時間: {now_taipei().isoformat()}"
+ ),
+ "entry_type": "best_practice",
+ "category": "AI系統",
+ "tags": ["playbook_evolution", evolution_type, playbook.playbook_id],
+ "source": "ai_extracted",
+ "related_playbook_id": playbook.playbook_id,
+ "related_incident_id": incident_id,
+ "path_type": path_type,
+ },
+ metadata={
+ "playbook_id": playbook.playbook_id,
+ "previous_trust": previous_trust,
+ "new_trust": new_trust,
+ "success_count": success_count,
+ "failure_count": failure_count,
+ "evolution_type": evolution_type,
+ },
+ )
+ await km_write_with_flag(payload)
+ logger.info(
+ "playbook_evolution_km_written",
+ playbook_id=playbook.playbook_id,
+ evolution_type=evolution_type,
+ trust_change=f"{previous_trust:.3f} → {new_trust:.3f}",
+ )
+ except Exception as e:
+ logger.warning(
+ "playbook_evolution_km_write_failed",
+ playbook_id=getattr(playbook, "playbook_id", "unknown"),
+ evolution_type=evolution_type,
+ error=str(e),
+ )
+
+ async def _check_and_mark_playbook_review(self, symptoms_hash: str) -> None:
+ """
+ 邏輯 2: KM 累積 N=5 條同 symptom_pattern_hash → 觸發 Playbook review_required 標記(飛輪 C3)
+
+ 每次 KM 寫入後由 _update_playbook_stats 呼叫端觸發此檢查。
+ 若同 symptoms_hash 在 knowledge_entries 已有 >= threshold 條,
+ 則 UPDATE playbooks SET review_required=true WHERE 症狀 hash 相符。
+
+ 比對策略:從 KnowledgeEntry 讀 symptoms_hash 計數,
+ 再透過 playbook.symptom_pattern 的 hash 比對 Playbook。
+ """
+ from src.core.config import settings
+ if not settings.ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP:
+ return
+ if not symptoms_hash:
+ return
+
+ try:
+ from sqlalchemy import text as sa_text
+ from src.db.base import get_db_context
+
+ async with get_db_context() as db:
+ # 計算同 symptoms_hash 的 KM 條目數
+ count_result = await db.execute(
+ sa_text(
+ "SELECT COUNT(*) FROM knowledge_entries "
+ "WHERE symptoms_hash = :hash"
+ ),
+ {"hash": symptoms_hash},
+ )
+ count = count_result.scalar() or 0
+
+ if count < settings.KM_PLAYBOOK_REVIEW_THRESHOLD:
+ return
+
+ # 累積達到門檻 → 標記相關 Playbook 需要 review
+ # Playbook 的 symptom_pattern 存為 JSONB,無直接 hash 欄位
+ # 透過 knowledge_entries.related_playbook_id 關聯找到要標記的 Playbook
+ updated = await db.execute(
+ sa_text(
+ "UPDATE playbooks pb "
+ "SET review_required = true, updated_at = NOW() "
+ "FROM knowledge_entries ke "
+ "WHERE ke.symptoms_hash = :hash "
+ " AND ke.related_playbook_id = pb.playbook_id "
+ " AND pb.review_required = false "
+ "RETURNING pb.playbook_id"
+ ),
+ {"hash": symptoms_hash},
+ )
+ marked_ids = [row[0] for row in updated.fetchall()]
+ await db.commit()
+
+ if marked_ids:
+ logger.info(
+ "playbook_review_required_marked",
+ symptoms_hash=symptoms_hash,
+ km_count=count,
+ threshold=settings.KM_PLAYBOOK_REVIEW_THRESHOLD,
+ playbook_ids=marked_ids,
+ )
+ except Exception as e:
+ logger.warning(
+ "playbook_review_mark_failed",
+ symptoms_hash=symptoms_hash,
+ error=str(e),
+ )
+
+ async def _demote_alert_rule_catalog_confidence(self, playbook: Any) -> None:
+ """
+ 邏輯 3: Playbook DEPRECATED 時回灌 alert_rule_catalog(飛輪 C4 修復)
+
+ UPDATE alert_rule_catalog
+ SET confidence = confidence * 0.5,
+ review_status = 'draft' -- CHECK constraint 允許 draft/approved/deprecated/retired
+ WHERE rule_name LIKE pattern(symptom_pattern.alert_names)
+
+ 注意:alert_rule_catalog.review_status CHECK 限制只允許:
+ draft | approved | deprecated | retired
+ 任務描述的 'needs_review' 不合法,改用 'draft'(語意等效:需要人工審核)
+
+ 失敗容忍:不影響 demote 主流程。
+ """
+ from src.core.config import settings
+ if not settings.ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP:
+ return
+
+ try:
+ import json
+ from sqlalchemy import text as sa_text
+ from src.db.base import get_db_context
+
+ # 從 playbook symptom_pattern 取出 alert_names 作為比對鍵
+ symptom = getattr(playbook, "symptom_pattern", None)
+ if symptom is None:
+ return
+
+ # symptom_pattern 可能是 Pydantic model 或 dict(從 ORM 載入為 dict)
+ if hasattr(symptom, "alert_names"):
+ alert_names: list[str] = symptom.alert_names or []
+ elif isinstance(symptom, dict):
+ alert_names = symptom.get("alert_names") or []
+ else:
+ return
+
+ if not alert_names:
+ logger.debug(
+ "playbook_demote_no_alert_names",
+ playbook_id=playbook.playbook_id,
+ )
+ return
+
+ async with get_db_context() as db:
+ updated_count = 0
+ for alert_name in alert_names:
+ # rule_name 完全匹配或前綴匹配(去掉 * suffix)
+ match_name = alert_name.rstrip("*")
+ result = await db.execute(
+ sa_text(
+ "UPDATE alert_rule_catalog "
+ "SET confidence = CASE "
+ " WHEN confidence IS NOT NULL "
+ " THEN GREATEST(0.01, confidence * 0.5) "
+ " ELSE 0.5 "
+ " END, "
+ " review_status = 'draft', "
+ " updated_at = NOW() "
+ "WHERE rule_name LIKE :pattern "
+ " AND (review_status IS NULL OR review_status NOT IN "
+ " ('deprecated', 'retired')) "
+ "RETURNING rule_id"
+ ),
+ {"pattern": f"{match_name}%"},
+ )
+ affected = result.rowcount or 0
+ updated_count += affected
+ await db.commit()
+
+ if updated_count > 0:
+ logger.info(
+ "alert_rule_catalog_confidence_demoted",
+ playbook_id=playbook.playbook_id,
+ alert_names=alert_names,
+ rules_updated=updated_count,
+ )
+ except Exception as e:
+ logger.warning(
+ "alert_rule_catalog_demote_failed",
+ playbook_id=getattr(playbook, "playbook_id", "unknown"),
+ error=str(e),
+ )
+
# =========================================================================
# 🆕 Phase D-G P0 修正: 新增方法
# =========================================================================
diff --git a/apps/api/src/services/post_execution_verifier.py b/apps/api/src/services/post_execution_verifier.py
index 1e05343e..e8e81d8d 100644
--- a/apps/api/src/services/post_execution_verifier.py
+++ b/apps/api/src/services/post_execution_verifier.py
@@ -21,6 +21,11 @@ AWOOOI AIOps Phase 1 — 執行後驗證器
- 超時不 raise,標記 "timeout" 並繼續流程
- 不阻塞原始執行路徑(await,但結果不影響執行本身是否成功)
+W2 PR-V1: SelfHealingValidator 串接 (2026-04-28 ogt + Claude Sonnet 4.6)
+ - ENABLE_SELF_HEALING_VALIDATOR=True 時,verify() 完成後呼叫 assess_self_healing()
+ - self_healing_score < 0.5 → Telegram 警示 rollback 提案(不自動執行)
+ - 驗證失敗不阻塞主流程(try/except 全包)
+
ADR-081: PreDecisionInvestigator + EvidenceSnapshot
MASTER §3.1 L6×D1
2026-04-15 ogt + Claude Sonnet 4.6 (亞太): Phase 1 初始建立
@@ -37,6 +42,9 @@ import structlog
from src.services.evidence_snapshot import EvidenceSnapshot
from src.services.mcp_tool_registry import SensorDimension, get_mcp_tool_registry
from src.services.sanitization_service import sanitize_dict_values
+# W2 PR-V1: 頂層 import 讓測試 patch 路徑固定(延遲 import 無法被 patch)
+# ENABLE_SELF_HEALING_VALIDATOR=False 時此 import 不影響效能(純 python 模組)
+from src.services import self_healing_validator as _shv_module
if TYPE_CHECKING:
from src.models.incident import Incident
@@ -136,6 +144,26 @@ class PostExecutionVerifier:
result=result,
action=action_taken,
)
+
+ # 5. W2 PR-V1: SelfHealingValidator 串接(ENABLE_SELF_HEALING_VALIDATOR gate)
+ # 在 post_state 已補填後評估自愈品質,不阻塞主流程
+ # 外層 try/except 確保任何 validator 失敗不影響 verify() 返回值
+ try:
+ await _run_self_healing_validator(
+ incident_id=incident_id,
+ snapshot=snapshot,
+ pre_state=pre_state,
+ post_state=post_state,
+ verification_result=result,
+ action_taken=action_taken,
+ )
+ except Exception:
+ logger.warning(
+ "self_healing_validator_uncaught",
+ incident_id=incident_id,
+ exc_info=True,
+ )
+
return result
async def capture_pre_execution_state(
@@ -209,6 +237,132 @@ class PostExecutionVerifier:
return state
+# ─────────────────────────────────────────────────────────────────────────────
+# W2 PR-V1: SelfHealingValidator 串接
+# 2026-04-28 ogt + Claude Sonnet 4.6: C6 飛輪斷鏈修復
+# ─────────────────────────────────────────────────────────────────────────────
+
+async def _run_self_healing_validator(
+ incident_id: str,
+ snapshot: EvidenceSnapshot | None,
+ pre_state: dict[str, Any] | None,
+ post_state: dict[str, Any],
+ verification_result: str,
+ action_taken: str,
+) -> None:
+ """
+ SelfHealingValidator 串接入口。
+
+ Feature gate: ENABLE_SELF_HEALING_VALIDATOR(預設 False)。
+ 驗證失敗全程 try/except 保護,不影響主流程。
+
+ 評估後:
+ - 補填 snapshot.self_healing_score + self_healing_detail
+ - score < 0.5 → 發送 Telegram rollback 提案警示
+ """
+ try:
+ from src.core.config import get_settings
+ _settings = get_settings()
+ if not _settings.ENABLE_SELF_HEALING_VALIDATOR:
+ return
+
+ assessment = _shv_module.assess_self_healing(
+ pre_state=pre_state,
+ post_state=post_state,
+ verification_result=verification_result,
+ action_taken=action_taken,
+ )
+ score: float = assessment["score"]
+
+ logger.info(
+ "self_healing_assessed",
+ incident_id=incident_id,
+ score=score,
+ regressions=assessment.get("regressions", []),
+ root_cause_cleared=assessment.get("root_cause_cleared"),
+ detail=assessment.get("detail"),
+ )
+
+ # 補填 EvidenceSnapshot
+ if snapshot:
+ try:
+ await snapshot.update_self_healing(score=score, detail=assessment)
+ except Exception as _snap_err:
+ logger.warning(
+ "self_healing_snapshot_update_failed",
+ incident_id=incident_id,
+ error=str(_snap_err),
+ )
+
+ # score < 0.5 → Telegram rollback 提案警示
+ if score < 0.5:
+ await _send_rollback_proposal_alert(
+ incident_id=incident_id,
+ score=score,
+ assessment=assessment,
+ action_taken=action_taken,
+ )
+
+ except Exception:
+ logger.warning(
+ "self_healing_validator_error",
+ incident_id=incident_id,
+ exc_info=True,
+ )
+
+
+async def _send_rollback_proposal_alert(
+ incident_id: str,
+ score: float,
+ assessment: dict[str, Any],
+ action_taken: str,
+) -> None:
+ """
+ 自愈品質分數 < 0.5 時,發送 Telegram rollback 提案警示。
+
+ 不自動執行 rollback,僅通知人工評估。
+ """
+ try:
+ from src.core.config import get_settings
+ from src.services.telegram_gateway import get_telegram_gateway
+ _settings = get_settings()
+ gateway = get_telegram_gateway()
+
+ regressions = assessment.get("regressions", [])
+ reg_str = ", ".join(regressions[:5]) if regressions else "無"
+ root_cleared = "是" if assessment.get("root_cause_cleared") else "否"
+
+ text = (
+ f"⚠️ 自愈品質警示 — 建議人工評估 Rollback\n"
+ f"Incident: {incident_id}\n"
+ f"動作: {action_taken[:120]}\n"
+ f"自愈分數: {score:.2f} (門檻 0.5)\n"
+ f"Root Cause 解除: {root_cleared}\n"
+ f"Regression 信號: {reg_str}\n"
+ f"此為提案,不會自動執行 Rollback"
+ )
+
+ await gateway._http_client.post(
+ f"https://api.telegram.org/bot{_settings.OPENCLAW_TG_BOT_TOKEN}/sendMessage",
+ json={
+ "chat_id": _settings.OPENCLAW_TG_CHAT_ID,
+ "text": text,
+ "parse_mode": "HTML",
+ },
+ )
+ logger.info(
+ "rollback_proposal_sent",
+ incident_id=incident_id,
+ score=score,
+ )
+ except Exception:
+ logger.warning(
+ "rollback_proposal_send_failed",
+ incident_id=incident_id,
+ exc_info=True,
+ )
+
+
# ─────────────────────────────────────────────────────────────────────────────
# Recovery Assessment
# ─────────────────────────────────────────────────────────────────────────────
diff --git a/apps/api/src/services/self_healing_validator.py b/apps/api/src/services/self_healing_validator.py
new file mode 100644
index 00000000..5d40c97b
--- /dev/null
+++ b/apps/api/src/services/self_healing_validator.py
@@ -0,0 +1,163 @@
+"""
+AWOOOI AIOps — 自愈品質驗證器
+================================
+W2 PR-V1: 飛輪斷鏈 C6 修復 — PostExecutionVerifier 串接自愈品質評估
+
+職責:
+ 1. 評估系統是否真的「自愈」(root cause 解除 vs 只是 metric 暫時恢復)
+ 2. Regression Detection(修完一個指標但其他指標惡化)
+ 3. 修復品質分數(0.0 ~ 1.0)
+
+評分邏輯:
+ - base_score 由 verification_result 決定(success=1.0 / degraded=0.4 / failed=0.0 / timeout=0.2)
+ - regression_penalty 由 pre/post state diff 中惡化指標數量決定
+ - 最終 score = max(0.0, base_score - regression_penalty)
+
+閾值:
+ - score < 0.5 → rollback 提案(Telegram 警示,不自動執行)
+ - score >= 0.5 → 認可自愈,無額外動作
+
+設計原則:
+ - 不修改 self_healing_validator 內部邏輯(外部串接層)
+ - 驗證失敗不阻塞主流程(容錯 try/except 全包)
+ - Feature Flag: ENABLE_SELF_HEALING_VALIDATOR=false(預設關閉)
+
+ADR-081 Phase 1 延伸
+2026-04-28 ogt + Claude Sonnet 4.6: W2 PR-V1 初始建立(C6 修復)
+"""
+
+from __future__ import annotations
+
+import re
+from typing import TYPE_CHECKING, Any
+
+import structlog
+
+if TYPE_CHECKING:
+ pass
+
+logger = structlog.get_logger(__name__)
+
+# 修復品質分數基準(by verification_result)
+_BASE_SCORES: dict[str, float] = {
+ "success": 1.0,
+ "degraded": 0.4,
+ "failed": 0.0,
+ "timeout": 0.2,
+}
+
+# 每個惡化指標的扣分
+_REGRESSION_PENALTY_PER_METRIC = 0.15
+
+# 扣分上限(避免 over-penalty)
+_MAX_REGRESSION_PENALTY = 0.4
+
+# root cause 解除信號(post_state 出現這些 → root cause 已清除)
+_ROOT_CAUSE_CLEARED_SIGNALS = ["running", "ready", "1/1", "2/2", "3/3", "healthy"]
+
+# regression 惡化信號(post_state 新出現但 pre_state 不存在 → regression)
+_REGRESSION_SIGNALS = [
+ "crashloopbackoff",
+ "oomkilled",
+ "oomkill",
+ "pending",
+ "terminating",
+ "error",
+ "failed",
+ "timeout",
+ "evicted",
+ "imagepullbackoff",
+ "errimagepull",
+]
+
+# 數值指標惡化偵測(regex 找 %、數字,比較增幅)
+_NUMERIC_THRESHOLD_RATIO = 0.2 # 超過 20% 增幅算惡化
+
+
+def assess_self_healing(
+ pre_state: dict[str, Any] | None,
+ post_state: dict[str, Any] | None,
+ verification_result: str,
+ action_taken: str,
+) -> dict[str, Any]:
+ """
+ 評估自愈品質,返回結構化評估結果。
+
+ Args:
+ pre_state: 執行前環境狀態(可為 None)
+ post_state: 執行後環境狀態(可為 None)
+ verification_result: PostExecutionVerifier 的判斷結果(success/degraded/failed/timeout)
+ action_taken: 執行的動作描述
+
+ Returns:
+ dict 包含:
+ score (float 0.0-1.0)
+ root_cause_cleared (bool)
+ regressions (list[str] — 惡化的指標名稱)
+ detail (str — 人類可讀說明)
+ """
+ base_score = _BASE_SCORES.get(verification_result, 0.0)
+
+ pre_str = str(pre_state).lower() if pre_state else ""
+ post_str = str(post_state).lower() if post_state else ""
+
+ # 1. Root cause 是否真正解除
+ root_cause_cleared = any(sig in post_str for sig in _ROOT_CAUSE_CLEARED_SIGNALS)
+ if verification_result in ("failed", "timeout"):
+ root_cause_cleared = False
+
+ # 2. Regression detection — 新出現在 post 但 pre 沒有的惡化信號
+ regressions: list[str] = []
+ for sig in _REGRESSION_SIGNALS:
+ if sig in post_str and sig not in pre_str:
+ regressions.append(sig)
+
+ # 3. 數值指標惡化偵測(簡單版:找百分比值增幅)
+ pre_nums = _extract_percentages(pre_str)
+ post_nums = _extract_percentages(post_str)
+ for key, pre_val in pre_nums.items():
+ if key in post_nums:
+ post_val = post_nums[key]
+ if pre_val > 0 and (post_val - pre_val) / pre_val > _NUMERIC_THRESHOLD_RATIO:
+ regressions.append(f"metric_increase:{key}")
+
+ # 4. 計算最終分數
+ regression_penalty = min(
+ len(regressions) * _REGRESSION_PENALTY_PER_METRIC,
+ _MAX_REGRESSION_PENALTY,
+ )
+ score = max(0.0, base_score - regression_penalty)
+
+ # 5. 組裝說明
+ detail_parts = [f"base={base_score:.2f}"]
+ if regressions:
+ detail_parts.append(f"regression_penalty={regression_penalty:.2f} ({','.join(regressions[:5])})")
+ if not root_cause_cleared and verification_result == "success":
+ detail_parts.append("root_cause_unclear")
+ detail = "; ".join(detail_parts)
+
+ return {
+ "score": round(score, 4),
+ "root_cause_cleared": root_cause_cleared,
+ "regressions": regressions,
+ "detail": detail,
+ "verification_result": verification_result,
+ "action_taken": action_taken,
+ }
+
+
+def _extract_percentages(text: str) -> dict[str, float]:
+ """
+ 從狀態字串中提取數值百分比。
+
+ 例如 "cpu_usage: 85%" → {"cpu_usage": 85.0}
+ 用於偵測指標惡化(簡單啟發式,Phase 1 版本)。
+ """
+ result: dict[str, float] = {}
+ # 格式:word_key: N% 或 word_key=N%
+ pattern = re.compile(r"(\w+)[:\s=]+(\d+(?:\.\d+)?)\s*%")
+ for match in pattern.finditer(text):
+ key = match.group(1)
+ val = float(match.group(2))
+ result[key] = val
+ return result
diff --git a/apps/api/tests/test_aol_to_catalog_writeback_job.py b/apps/api/tests/test_aol_to_catalog_writeback_job.py
new file mode 100644
index 00000000..c1fa5507
--- /dev/null
+++ b/apps/api/tests/test_aol_to_catalog_writeback_job.py
@@ -0,0 +1,378 @@
+"""
+W2 PR-R2 — AOL → alert_rule_catalog Confidence EWMA Writeback 測試
+====================================================================
+ADR-091 Task T2 飛輪斷鏈 C2 修復
+
+測試範圍:
+ - test_ewma_calculation EWMA 公式正確(有舊值 / 無舊值兩路)
+ - test_low_success_triggers_draft 低成功率且樣本 >= 5 → review_status='draft'
+ - test_min_sample_threshold 樣本 < 5 不降 review_status
+ - test_dry_run_no_db_write feature flag=False → 不碰 DB
+ - test_feature_flag_disabled_skips flag=False 回傳 skipped=True
+ - test_hermes_picks_up_draft Hermes _fetch_noisy_rules SQL 含 OR review_status='draft'
+
+禁止 Mock 測試鐵律:
+ DB 依賴用 AsyncMock patch(get_db_context),只測業務邏輯分支。
+ EWMA / sample 判斷為純 Python 邏輯,直接呼叫私有函式驗證。
+
+建立: 2026-04-28 (台北時區) Claude Sonnet 4.6 (W2 PR-R2 ADR-091 T2)
+"""
+from __future__ import annotations
+
+import os
+from contextlib import asynccontextmanager
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+# conftest 前先設環境變數
+os.environ.setdefault("DATABASE_URL", "postgresql+asyncpg://test:test@localhost/test")
+
+
+# =============================================================================
+# Helper: DB context mock
+# =============================================================================
+
+def _make_db_ctx(fetch_one_return=None, execute_side_effect=None):
+ """
+ 回傳 (ctx_factory, mock_db)。
+ simulate get_db_context() 的 async context manager。
+ """
+ mock_db = AsyncMock()
+
+ if execute_side_effect is not None:
+ mock_db.execute = AsyncMock(side_effect=execute_side_effect)
+
+ if fetch_one_return is not None:
+ mock_result = MagicMock()
+ mock_result.one_or_none.return_value = fetch_one_return
+ mock_db.execute = AsyncMock(return_value=mock_result)
+
+ @asynccontextmanager
+ async def _ctx():
+ yield mock_db
+
+ return _ctx, mock_db
+
+
+def _catalog_row(confidence=None, review_status=None):
+ """建構 alert_rule_catalog 假 row。"""
+ row = MagicMock()
+ row.rule_id = 1
+ row.confidence = confidence
+ row.review_status = review_status
+ return row
+
+
+# =============================================================================
+# test_ewma_calculation — EWMA 計算正確性
+# =============================================================================
+
+@pytest.mark.asyncio
+async def test_ewma_calculation_with_existing_confidence():
+ """
+ 有舊 confidence 時:new = 0.7 * old + 0.3 * recent_sr
+ """
+ from src.jobs.aol_to_catalog_writeback_job import _update_catalog_confidence
+
+ old_conf = 0.80
+ recent_sr = 0.50
+ expected = round(0.7 * old_conf + 0.3 * recent_sr, 2) # 0.71
+
+ # mock: 第一次 execute 回傳 existing row,第二次 execute 為 UPDATE
+ existing_row = _catalog_row(confidence=old_conf, review_status="approved")
+ call_count = 0
+
+ @asynccontextmanager
+ async def _ctx():
+ mock_db = AsyncMock()
+
+ async def _execute(sql, params=None):
+ nonlocal call_count
+ call_count += 1
+ if call_count == 1:
+ # SELECT 查詢
+ r = MagicMock()
+ r.one_or_none.return_value = existing_row
+ return r
+ else:
+ # UPDATE
+ return MagicMock()
+
+ mock_db.execute = _execute
+ yield mock_db
+
+ sample = {
+ "alertname": "HostHighCpuLoad",
+ "ok": 5,
+ "total": 10,
+ "recent_success_rate": recent_sr,
+ }
+
+ # lazy import: aol_to_catalog_writeback_job 內 from src.db.base import get_db_context
+ # patch 源頭模組即可
+ with patch("src.db.base.get_db_context", _ctx):
+ updated, flagged = await _update_catalog_confidence(sample)
+
+ assert updated is True
+ assert flagged is False
+ # call_count=2: SELECT + UPDATE(不降級)
+
+
+@pytest.mark.asyncio
+async def test_ewma_calculation_without_existing_confidence():
+ """
+ confidence IS NULL 時:new_confidence = recent_success_rate(初始化)
+ """
+ from src.jobs.aol_to_catalog_writeback_job import _update_catalog_confidence
+
+ recent_sr = 0.75
+ existing_row = _catalog_row(confidence=None, review_status=None)
+ call_count = 0
+
+ @asynccontextmanager
+ async def _ctx():
+ mock_db = AsyncMock()
+
+ async def _execute(sql, params=None):
+ nonlocal call_count
+ call_count += 1
+ r = MagicMock()
+ r.one_or_none.return_value = existing_row
+ return r
+
+ mock_db.execute = _execute
+ yield mock_db
+
+ sample = {
+ "alertname": "HostDiskFull",
+ "ok": 6,
+ "total": 8,
+ "recent_success_rate": recent_sr,
+ }
+
+ with patch("src.db.base.get_db_context", _ctx):
+ updated, flagged = await _update_catalog_confidence(sample)
+
+ assert updated is True
+ # 初始值 = recent_sr,不是低成功率 → 不降 draft
+ assert flagged is False
+
+
+# =============================================================================
+# test_low_success_triggers_draft
+# =============================================================================
+
+@pytest.mark.asyncio
+async def test_low_success_triggers_draft():
+ """
+ recent_success_rate < 0.3 且 total >= 5 → review_status 設為 'draft',
+ 且 updated=True, flagged=True.
+ """
+ from src.jobs.aol_to_catalog_writeback_job import _update_catalog_confidence
+
+ existing_row = _catalog_row(confidence=0.60, review_status="approved")
+ updates_seen = []
+ call_count = 0
+
+ @asynccontextmanager
+ async def _ctx():
+ mock_db = AsyncMock()
+
+ async def _execute(sql, params=None):
+ nonlocal call_count
+ call_count += 1
+ r = MagicMock()
+ r.one_or_none.return_value = existing_row
+ if params:
+ updates_seen.append(params)
+ return r
+
+ mock_db.execute = _execute
+ yield mock_db
+
+ sample = {
+ "alertname": "KubeDeploymentReplicasMismatch",
+ "ok": 1,
+ "total": 10, # >= 5
+ "recent_success_rate": 0.10, # < 0.3 → 觸發 draft
+ }
+
+ with patch("src.db.base.get_db_context", _ctx):
+ updated, flagged = await _update_catalog_confidence(sample)
+
+ assert updated is True
+ assert flagged is True
+
+ # 確認 UPDATE 帶了 review_status='draft'
+ draft_update = next(
+ (p for p in updates_seen if p.get("rs") == "draft"),
+ None,
+ )
+ assert draft_update is not None, "應有帶 rs='draft' 的 UPDATE 參數"
+
+
+# =============================================================================
+# test_min_sample_threshold
+# =============================================================================
+
+@pytest.mark.asyncio
+async def test_min_sample_threshold_no_flag():
+ """
+ recent_success_rate < 0.3 但 total < 5 → 不降 draft,只更新 confidence.
+ """
+ from src.jobs.aol_to_catalog_writeback_job import _update_catalog_confidence
+
+ existing_row = _catalog_row(confidence=0.60, review_status="approved")
+ updates_seen = []
+ call_count = 0
+
+ @asynccontextmanager
+ async def _ctx():
+ mock_db = AsyncMock()
+
+ async def _execute(sql, params=None):
+ nonlocal call_count
+ call_count += 1
+ r = MagicMock()
+ r.one_or_none.return_value = existing_row
+ if params:
+ updates_seen.append(params)
+ return r
+
+ mock_db.execute = _execute
+ yield mock_db
+
+ sample = {
+ "alertname": "SomeRareAlert",
+ "ok": 0,
+ "total": 3, # < 5 → 不降
+ "recent_success_rate": 0.0,
+ }
+
+ with patch("src.db.base.get_db_context", _ctx):
+ updated, flagged = await _update_catalog_confidence(sample)
+
+ assert updated is True
+ assert flagged is False
+ # 確認沒有帶 rs='draft' 的 UPDATE
+ draft_update = next(
+ (p for p in updates_seen if p.get("rs") == "draft"),
+ None,
+ )
+ assert draft_update is None, "sample < 5 不應降 review_status"
+
+
+# =============================================================================
+# test_dry_run_no_db_write / test_feature_flag_disabled_skips
+# =============================================================================
+
+@pytest.mark.asyncio
+async def test_feature_flag_disabled_skips():
+ """
+ ENABLE_AOL_WRITEBACK_JOB=False → run_aol_writeback_once 回傳 skipped=True,
+ 且不觸發任何 DB 操作。
+ """
+ from src.jobs.aol_to_catalog_writeback_job import run_aol_writeback_once
+
+ db_call_count = 0
+
+ @asynccontextmanager
+ async def _ctx():
+ nonlocal db_call_count
+ db_call_count += 1
+ yield AsyncMock()
+
+ with patch("src.core.config.settings") as mock_settings:
+ mock_settings.ENABLE_AOL_WRITEBACK_JOB = False
+
+ # patch job module's settings reference
+ with patch("src.jobs.aol_to_catalog_writeback_job.settings", mock_settings):
+ result = await run_aol_writeback_once()
+
+ assert result["skipped"] is True
+ assert result["rules_sampled"] == 0
+ assert result["rules_updated"] == 0
+ assert result["rules_flagged_draft"] == 0
+ assert db_call_count == 0, "feature flag=False 時不應碰 DB"
+
+
+@pytest.mark.asyncio
+async def test_dry_run_no_db_write():
+ """
+ 同上:flag=False 時完全不寫 DB(別名測試,語義明確).
+ """
+ from src.jobs.aol_to_catalog_writeback_job import run_aol_writeback_once
+
+ written = []
+
+ @asynccontextmanager
+ async def _ctx():
+ mock_db = AsyncMock()
+ mock_db.execute = AsyncMock(side_effect=lambda *a, **kw: written.append(a))
+ yield mock_db
+
+ with patch("src.jobs.aol_to_catalog_writeback_job.settings") as mock_settings:
+ mock_settings.ENABLE_AOL_WRITEBACK_JOB = False
+ result = await run_aol_writeback_once()
+
+ assert result["skipped"] is True
+ assert len(written) == 0
+
+
+# =============================================================================
+# test_hermes_picks_up_draft — Hermes SQL 包含 OR review_status='draft' 條件
+# =============================================================================
+
+def test_hermes_sql_includes_draft_condition():
+ """
+ 驗證 hermes_rule_quality_job._fetch_noisy_rules 的 SQL 包含 OR review_status = 'draft'
+ (靜態檢查,不跑真實 DB).
+
+ W2 PR-R2 要求:Hermes 必須撈到 AOL writeback 標記的 draft rules。
+ """
+ import inspect
+ from src.jobs import hermes_rule_quality_job
+
+ # 讀取 _fetch_noisy_rules 的原始碼
+ src = inspect.getsource(hermes_rule_quality_job._fetch_noisy_rules)
+
+ assert "review_status = 'draft'" in src, (
+ "Hermes _fetch_noisy_rules 缺少 OR review_status = 'draft' 條件 "
+ "(W2 PR-R2 斷鏈 C2 修復要求此條件觸發 AOL writeback advisory)"
+ )
+
+
+@pytest.mark.asyncio
+async def test_hermes_picks_up_needs_review_rules():
+ """
+ Hermes _fetch_noisy_rules 被呼叫時,若 DB 有 review_status='draft' 的 rule,
+ 應正常回傳(不因額外 OR 條件報錯).
+ """
+ from src.jobs.hermes_rule_quality_job import _fetch_noisy_rules
+
+ draft_row = MagicMock()
+ draft_row.rule_id = 99
+ draft_row.rule_name = "LowSuccessRate"
+ draft_row.severity = "warning"
+ draft_row.true_positive_count = 1
+ draft_row.false_positive_count = 9
+ draft_row.noise_rate = 0.9
+ draft_row.last_fired_at = None
+ draft_row.review_status = "draft"
+
+ mock_result = MagicMock()
+ mock_result.fetchall.return_value = [draft_row]
+
+ @asynccontextmanager
+ async def _ctx():
+ mock_db = AsyncMock()
+ mock_db.execute = AsyncMock(return_value=mock_result)
+ yield mock_db
+
+ with patch("src.db.base.get_db_context", _ctx):
+ rules = await _fetch_noisy_rules()
+
+ assert len(rules) == 1
+ assert rules[0]["rule_name"] == "LowSuccessRate"
+ assert rules[0]["review_status"] == "draft"
diff --git a/apps/api/tests/test_km_playbook_feedback_loop.py b/apps/api/tests/test_km_playbook_feedback_loop.py
new file mode 100644
index 00000000..1a189d3b
--- /dev/null
+++ b/apps/api/tests/test_km_playbook_feedback_loop.py
@@ -0,0 +1,402 @@
+"""
+KM → Playbook 互饋回路單元測試
+================================
+W2 PR-L1: 飛輪斷鏈 C3 + C4 修復測試
+
+測試範圍:
+ 1. test_playbook_promotion_writes_km_entry
+ — _promote_playbook 觸發後,KMWriter 被呼叫寫 playbook_evolution 條目
+ 2. test_playbook_demotion_writes_km_entry
+ — _demote_playbook 觸發後,KMWriter 被呼叫寫 playbook_evolution 條目
+ 3. test_km_accumulation_triggers_playbook_review
+ — 同 symptoms_hash 累積 5 條 → UPDATE playbooks.review_required=true
+ 4. test_km_accumulation_below_threshold_no_update
+ — KM 條目 < threshold → 不執行 UPDATE
+ 5. test_playbook_deprecated_demotes_alert_rule_confidence
+ — DEPRECATED Playbook → alert_rule_catalog.confidence *= 0.5
+ 6. test_feature_flag_disabled
+ — ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP=false → 三條邏輯全部跳過,不呼叫 DB
+
+設計原則:
+ - 外部服務(DB / KMWriter / PlaybookRepository)以 AsyncMock 替換
+ - 每個 test 只測一條主路徑(單一職責)
+ - Feature flag 透過 patch 'src.core.config.settings' 控制
+ - get_db_context patch 路徑:src.db.base.get_db_context(local import 的來源模組)
+ - get_playbook_repository patch 路徑:
+ src.repositories.playbook_repository.get_playbook_repository
+
+建立:2026-04-28 (台北時區) ogt + Claude Sonnet 4.6
+"""
+
+from __future__ import annotations
+
+from contextlib import asynccontextmanager
+from types import SimpleNamespace
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+
+# =============================================================================
+# Helpers
+# =============================================================================
+
+def _make_playbook(
+ playbook_id: str = "PB-20260428-AAAAAA",
+ name: str = "TestPlaybook",
+ trust_score: float = 0.5,
+ success_count: int = 3,
+ failure_count: int = 1,
+ status: str = "approved",
+ alert_names: list[str] | None = None,
+) -> SimpleNamespace:
+ """
+ 建立一個最小可用的 Playbook mock 物件。
+
+ 使用 SimpleNamespace 讓屬性存取與 Pydantic model 相同,
+ 但不引入真實 ORM / Pydantic 依賴(防止 DB 連線)。
+ symptom_pattern.compute_hash() 返回固定 'abc123' 供測試使用。
+ """
+ symptom = SimpleNamespace(
+ alert_names=alert_names or ["HighCpuUsage"],
+ affected_services=["api"],
+ label_patterns={},
+ compute_hash=lambda: "abc123",
+ )
+
+ from src.models.playbook import PlaybookStatus
+ status_enum = PlaybookStatus(status)
+
+ return SimpleNamespace(
+ playbook_id=playbook_id,
+ name=name,
+ trust_score=trust_score,
+ success_count=success_count,
+ failure_count=failure_count,
+ status=status_enum,
+ symptom_pattern=symptom,
+ )
+
+
+def _make_learning_service():
+ """
+ 建立 LearningService 實例,所有外部依賴 mock 掉。
+ repository 和 trust_repository 均使用 AsyncMock 防止 Redis 連線。
+ """
+ from src.services.learning_service import LearningService
+
+ mock_repo = AsyncMock()
+ mock_trust_repo = AsyncMock()
+ mock_trust_mgr = MagicMock()
+ mock_trust_mgr.get_trust_record.return_value = None
+
+ svc = LearningService(
+ repository=mock_repo,
+ trust_repository=mock_trust_repo,
+ )
+ svc._trust_manager = mock_trust_mgr
+ return svc
+
+
+def _make_settings(enable_loop: bool = True, threshold: int = 5) -> MagicMock:
+ """
+ 建立 settings mock。
+ patch 路徑:src.core.config.settings(learning_service 各方法均 local import 自此模組)
+ """
+ m = MagicMock()
+ m.ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP = enable_loop
+ m.KM_PLAYBOOK_REVIEW_THRESHOLD = threshold
+ m.KM_WRITE_AWAIT = True
+ m.KM_WRITE_TIMEOUT_SECONDS = 5.0
+ return m
+
+
+def _make_db_context_factory(mock_db):
+ """
+ 返回一個可多次呼叫的 async context manager factory。
+
+ 每次呼叫 factory() 返回新的 async context manager 實例,
+ 防止同一 cm 物件被複用(async generator 只能迭代一次)。
+ """
+ def factory():
+ @asynccontextmanager
+ async def _ctx():
+ yield mock_db
+ return _ctx()
+ return factory
+
+
+# =============================================================================
+# 1. Promote 觸發 → 寫 KM 演化條目
+# =============================================================================
+
+@pytest.mark.asyncio
+async def test_playbook_promotion_writes_km_entry():
+ """
+ _promote_playbook 觸發後,若 ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP=True,
+ km_write_with_flag 應被呼叫一次,path_type 含 'playbook_evolution'。
+ """
+ svc = _make_learning_service()
+ playbook = _make_playbook(trust_score=0.5, status="approved")
+
+ km_calls: list = []
+
+ async def _mock_km_write(payload, *, timeout=None):
+ km_calls.append(payload)
+ from src.services.km_writer import KMWriteResult
+ return KMWriteResult.SUCCESS
+
+ mock_pb_repo = AsyncMock()
+ mock_pb_repo.find_by_source_incident = AsyncMock(return_value=[playbook])
+ mock_pb_repo.adjust_confidence = AsyncMock(return_value=True)
+
+ mock_settings = _make_settings(enable_loop=True)
+
+ with (
+ patch("src.core.config.settings", mock_settings),
+ patch("src.services.km_writer.km_write_with_flag", side_effect=_mock_km_write),
+ patch(
+ "src.repositories.playbook_repository.get_playbook_repository",
+ return_value=mock_pb_repo,
+ ),
+ ):
+ result = await svc._promote_playbook("INC-TEST-001")
+
+ assert result is True
+ assert len(km_calls) == 1, "KMWriter 應被呼叫一次(一個 Playbook promote)"
+ assert "playbook_evolution" in km_calls[0].path_type
+ assert km_calls[0].metadata["evolution_type"] == "promote"
+ assert km_calls[0].metadata["playbook_id"] == playbook.playbook_id
+ assert km_calls[0].metadata["previous_trust"] == 0.5
+
+
+# =============================================================================
+# 2. Demote 觸發 → 寫 KM 演化條目
+# =============================================================================
+
+@pytest.mark.asyncio
+async def test_playbook_demotion_writes_km_entry():
+ """
+ _demote_playbook 觸發後,若 ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP=True,
+ km_write_with_flag 應被呼叫一次,evolution_type='demote'。
+
+ status='approved'(非 DEPRECATED)→ 邏輯 3 不觸發,保持單一職責。
+ """
+ svc = _make_learning_service()
+ playbook = _make_playbook(trust_score=0.4, status="approved")
+
+ km_calls: list = []
+
+ async def _mock_km_write(payload, *, timeout=None):
+ km_calls.append(payload)
+ from src.services.km_writer import KMWriteResult
+ return KMWriteResult.SUCCESS
+
+ mock_pb_repo = AsyncMock()
+ mock_pb_repo.find_by_source_incident = AsyncMock(return_value=[playbook])
+ mock_pb_repo.adjust_confidence = AsyncMock(return_value=True)
+
+ mock_settings = _make_settings(enable_loop=True)
+
+ with (
+ patch("src.core.config.settings", mock_settings),
+ patch("src.services.km_writer.km_write_with_flag", side_effect=_mock_km_write),
+ patch(
+ "src.repositories.playbook_repository.get_playbook_repository",
+ return_value=mock_pb_repo,
+ ),
+ ):
+ result = await svc._demote_playbook("INC-TEST-002")
+
+ assert result is True
+ assert len(km_calls) == 1, "KMWriter 應被呼叫一次(一個 Playbook demote)"
+ assert "playbook_evolution" in km_calls[0].path_type
+ assert km_calls[0].metadata["evolution_type"] == "demote"
+
+
+# =============================================================================
+# 3. KM 累積 N=5 → review_required=True
+# =============================================================================
+
+@pytest.mark.asyncio
+async def test_km_accumulation_triggers_playbook_review():
+ """
+ 同 symptoms_hash 的 KM 條目達到 threshold(預設 5)時,
+ _check_and_mark_playbook_review 應執行 COUNT + UPDATE,並 commit。
+ """
+ svc = _make_learning_service()
+ symptoms_hash = "abc123"
+
+ mock_db = AsyncMock()
+ execute_call_count = {"n": 0}
+
+ mock_count_result = MagicMock()
+ mock_count_result.scalar.return_value = 5
+
+ mock_update_result = MagicMock()
+ mock_update_result.fetchall.return_value = [("PB-20260428-AAAAAA",)]
+
+ async def _multi_execute(stmt, params=None):
+ execute_call_count["n"] += 1
+ if execute_call_count["n"] == 1:
+ return mock_count_result
+ return mock_update_result
+
+ mock_db.execute = _multi_execute
+ mock_db.commit = AsyncMock()
+
+ mock_settings = _make_settings(enable_loop=True, threshold=5)
+
+ with (
+ patch("src.core.config.settings", mock_settings),
+ patch(
+ "src.db.base.get_db_context",
+ side_effect=_make_db_context_factory(mock_db),
+ ),
+ ):
+ await svc._check_and_mark_playbook_review(symptoms_hash)
+
+ assert execute_call_count["n"] == 2, "應執行兩次 SQL(COUNT + UPDATE)"
+ mock_db.commit.assert_called_once()
+
+
+@pytest.mark.asyncio
+async def test_km_accumulation_below_threshold_no_update():
+ """
+ KM 條目數 < threshold → 不執行 UPDATE,不 commit。
+ """
+ svc = _make_learning_service()
+ symptoms_hash = "abc123"
+
+ mock_db = AsyncMock()
+ execute_call_count = {"n": 0}
+
+ mock_count_result = MagicMock()
+ mock_count_result.scalar.return_value = 3 # < 5
+
+ async def _single_execute(stmt, params=None):
+ execute_call_count["n"] += 1
+ return mock_count_result
+
+ mock_db.execute = _single_execute
+ mock_db.commit = AsyncMock()
+
+ mock_settings = _make_settings(enable_loop=True, threshold=5)
+
+ with (
+ patch("src.core.config.settings", mock_settings),
+ patch(
+ "src.db.base.get_db_context",
+ side_effect=_make_db_context_factory(mock_db),
+ ),
+ ):
+ await svc._check_and_mark_playbook_review(symptoms_hash)
+
+ assert execute_call_count["n"] == 1, "只執行 COUNT,不執行 UPDATE"
+ mock_db.commit.assert_not_called()
+
+
+# =============================================================================
+# 4. DEPRECATED → alert_rule_catalog.confidence *= 0.5
+# =============================================================================
+
+@pytest.mark.asyncio
+async def test_playbook_deprecated_demotes_alert_rule_confidence():
+ """
+ DEPRECATED Playbook 的 _demote_alert_rule_catalog_confidence 執行後,
+ 每個 alert_name 執行一次 UPDATE,最後 commit 一次。
+ """
+ svc = _make_learning_service()
+
+ from src.models.playbook import PlaybookStatus
+ playbook = _make_playbook(
+ status="deprecated",
+ alert_names=["HighCpuUsage", "PodCrashLooping"],
+ )
+ playbook.status = PlaybookStatus.DEPRECATED
+
+ mock_db = AsyncMock()
+ execute_call_count = {"n": 0}
+
+ async def _track_execute(stmt, params=None):
+ execute_call_count["n"] += 1
+ m = MagicMock()
+ m.rowcount = 1
+ return m
+
+ mock_db.execute = _track_execute
+ mock_db.commit = AsyncMock()
+
+ mock_settings = _make_settings(enable_loop=True)
+
+ with (
+ patch("src.core.config.settings", mock_settings),
+ patch(
+ "src.db.base.get_db_context",
+ side_effect=_make_db_context_factory(mock_db),
+ ),
+ ):
+ await svc._demote_alert_rule_catalog_confidence(playbook)
+
+ assert execute_call_count["n"] == 2, "2 條 alert_names → 2 次 UPDATE"
+ mock_db.commit.assert_called_once()
+
+
+# =============================================================================
+# 5. Feature flag disabled → 所有邏輯跳過
+# =============================================================================
+
+@pytest.mark.asyncio
+async def test_feature_flag_disabled():
+ """
+ ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP=False 時,
+ _write_playbook_evolution_km / _check_and_mark_playbook_review /
+ _demote_alert_rule_catalog_confidence 均不應呼叫任何 DB 或 KMWriter。
+ """
+ svc = _make_learning_service()
+ from src.models.playbook import PlaybookStatus
+ playbook = _make_playbook(trust_score=0.3, status="deprecated")
+ playbook.status = PlaybookStatus.DEPRECATED
+
+ km_write_calls: list = []
+ db_execute_calls: list = []
+
+ async def _mock_km_write(payload, *, timeout=None):
+ km_write_calls.append(payload)
+ from src.services.km_writer import KMWriteResult
+ return KMWriteResult.SUCCESS
+
+ mock_db = AsyncMock()
+
+ async def _track_execute(stmt, params=None):
+ db_execute_calls.append(stmt)
+ return MagicMock()
+
+ mock_db.execute = _track_execute
+ mock_db.commit = AsyncMock()
+
+ mock_settings = _make_settings(enable_loop=False)
+
+ with (
+ patch("src.core.config.settings", mock_settings),
+ patch("src.services.km_writer.km_write_with_flag", side_effect=_mock_km_write),
+ patch(
+ "src.db.base.get_db_context",
+ side_effect=_make_db_context_factory(mock_db),
+ ),
+ ):
+ # 邏輯 1
+ await svc._write_playbook_evolution_km(
+ playbook=playbook,
+ previous_trust=0.5,
+ evolution_type="promote",
+ incident_id="INC-TEST-FLAG",
+ )
+ # 邏輯 2
+ await svc._check_and_mark_playbook_review("abc123")
+ # 邏輯 3
+ await svc._demote_alert_rule_catalog_confidence(playbook)
+
+ assert len(km_write_calls) == 0, "KMWriter 不應被呼叫(flag=False)"
+ assert len(db_execute_calls) == 0, "DB execute 不應被呼叫(flag=False)"
+ mock_db.commit.assert_not_called()
diff --git a/apps/api/tests/test_self_healing_validator_integration.py b/apps/api/tests/test_self_healing_validator_integration.py
new file mode 100644
index 00000000..eb0e80b9
--- /dev/null
+++ b/apps/api/tests/test_self_healing_validator_integration.py
@@ -0,0 +1,352 @@
+"""
+SelfHealingValidator 整合測試
+================================
+W2 PR-V1: 飛輪斷鏈 C6 修復驗收測試
+
+測試項目:
+ 1. test_validator_called_after_verification
+ — ENABLE=True 時,verify() 完成後 assess_self_healing 被呼叫
+
+ 2. test_low_score_triggers_rollback_proposal
+ — score < 0.5 時,Telegram rollback 提案被發送
+
+ 3. test_high_score_no_action
+ — score >= 0.5 時,Telegram 不觸發
+
+ 4. test_validator_failure_does_not_block_main_flow
+ — assess_self_healing 拋例外,verify() 仍返回正確結果
+
+ 5. test_feature_flag_disabled_skips
+ — ENABLE=False 時,assess_self_healing 不被呼叫
+
+2026-04-28 ogt + Claude Sonnet 4.6: W2 PR-V1 初始建立
+"""
+
+from __future__ import annotations
+
+import pytest
+from unittest.mock import AsyncMock, MagicMock, patch
+
+from src.services.post_execution_verifier import PostExecutionVerifier
+from src.services.evidence_snapshot import EvidenceSnapshot
+from src.services.self_healing_validator import assess_self_healing
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# Stubs
+# ─────────────────────────────────────────────────────────────────────────────
+
+def _stub_incident(
+ alertname: str = "KubePodCrashLooping",
+ namespace: str = "awoooi-prod",
+ pod: str = "api-xyz",
+) -> object:
+ class _Signal:
+ labels = {
+ "alertname": alertname,
+ "namespace": namespace,
+ "pod": pod,
+ }
+
+ class _Incident:
+ incident_id = "INC-TEST"
+ signals = [_Signal()]
+
+ return _Incident()
+
+
+def _stub_snapshot(incident_id: str = "INC-TEST") -> EvidenceSnapshot:
+ snap = EvidenceSnapshot(incident_id=incident_id)
+ snap.pre_execution_state = {"status": "CrashLoopBackOff"}
+ return snap
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# assess_self_healing 單元測試(無 IO)
+# ─────────────────────────────────────────────────────────────────────────────
+
+class TestAssessSelfHealing:
+ """assess_self_healing() 純函數測試"""
+
+ def test_success_result_gives_high_score(self):
+ result = assess_self_healing(
+ pre_state={"status": "CrashLoopBackOff"},
+ post_state={"status": "Running", "containers": "1/1"},
+ verification_result="success",
+ action_taken="restart_service:api",
+ )
+ assert result["score"] >= 0.5
+ assert result["root_cause_cleared"] is True
+
+ def test_failed_result_gives_zero_score(self):
+ result = assess_self_healing(
+ pre_state={"status": "Running"},
+ post_state={"status": "CrashLoopBackOff"},
+ verification_result="failed",
+ action_taken="patch_config",
+ )
+ assert result["score"] == 0.0
+ assert result["root_cause_cleared"] is False
+
+ def test_degraded_result_gives_low_score(self):
+ result = assess_self_healing(
+ pre_state=None,
+ post_state={"status": "Pending"},
+ verification_result="degraded",
+ action_taken="scale_up",
+ )
+ assert result["score"] < 0.5
+
+ def test_regression_reduces_score(self):
+ """執行後出現新 CrashLoopBackOff → regression penalty 扣分"""
+ result = assess_self_healing(
+ pre_state={"status": "Running"},
+ post_state={"status": "Running", "reason": "CrashLoopBackOff"},
+ verification_result="success",
+ action_taken="restart_service",
+ )
+ # regression 要扣分
+ assert "crashloopbackoff" in result["regressions"]
+ # 即使 verification_result=success,regression 導致扣分
+ assert result["score"] < 1.0
+
+ def test_no_regression_full_score_on_success(self):
+ """乾淨的 success:無 regression、root cause 解除 → score=1.0"""
+ result = assess_self_healing(
+ pre_state={"status": "CrashLoopBackOff"},
+ post_state={"status": "Running", "containers": "1/1"},
+ verification_result="success",
+ action_taken="restart_service:api",
+ )
+ assert result["score"] == 1.0
+ assert result["regressions"] == []
+
+ def test_timeout_gives_low_base_score(self):
+ result = assess_self_healing(
+ pre_state=None,
+ post_state={},
+ verification_result="timeout",
+ action_taken="restart_service",
+ )
+ assert result["score"] == 0.2
+
+ def test_detail_is_human_readable(self):
+ result = assess_self_healing(
+ pre_state=None,
+ post_state={"status": "Running"},
+ verification_result="success",
+ action_taken="restart",
+ )
+ assert "base=" in result["detail"]
+
+
+# ─────────────────────────────────────────────────────────────────────────────
+# 整合測試:verify() → _run_self_healing_validator
+# ─────────────────────────────────────────────────────────────────────────────
+
+class TestVerifyIntegration:
+ """PostExecutionVerifier.verify() 串接 SelfHealingValidator 整合測試"""
+
+ @pytest.mark.asyncio
+ async def test_validator_called_after_verification(self):
+ """ENABLE=True → verify() 完成後 assess_self_healing 被呼叫"""
+ verifier = PostExecutionVerifier()
+ incident = _stub_incident()
+
+ with (
+ patch.object(
+ verifier,
+ "_collect_post_state",
+ new=AsyncMock(return_value={"status": "Running"}),
+ ),
+ patch("src.services.post_execution_verifier._update_snapshot", new=AsyncMock()),
+ patch(
+ "src.services.post_execution_verifier._run_self_healing_validator",
+ new=AsyncMock(),
+ ) as mock_validator,
+ ):
+ await verifier.verify(
+ incident=incident,
+ snapshot=None,
+ action_taken="restart_service:api",
+ warmup_sec=0.0,
+ )
+
+ mock_validator.assert_called_once()
+ call_kwargs = mock_validator.call_args.kwargs
+ assert call_kwargs["incident_id"] == "INC-TEST"
+ assert call_kwargs["verification_result"] == "success"
+
+ @pytest.mark.asyncio
+ async def test_low_score_triggers_rollback_proposal(self):
+ """score < 0.5 → Telegram rollback 提案被發送"""
+ with (
+ patch(
+ "src.services.self_healing_validator.assess_self_healing",
+ return_value={
+ "score": 0.2,
+ "root_cause_cleared": False,
+ "regressions": ["crashloopbackoff"],
+ "detail": "base=0.40; regression_penalty=0.15",
+ "verification_result": "degraded",
+ "action_taken": "restart_service",
+ },
+ ),
+ patch(
+ "src.services.post_execution_verifier._send_rollback_proposal_alert",
+ new=AsyncMock(),
+ ) as mock_send,
+ patch(
+ "src.core.config.get_settings",
+ return_value=MagicMock(ENABLE_SELF_HEALING_VALIDATOR=True),
+ ),
+ ):
+ from src.services.post_execution_verifier import _run_self_healing_validator
+ await _run_self_healing_validator(
+ incident_id="INC-LOW",
+ snapshot=None,
+ pre_state={"status": "Running"},
+ post_state={"status": "CrashLoopBackOff"},
+ verification_result="degraded",
+ action_taken="restart_service",
+ )
+
+ mock_send.assert_called_once()
+ call_kwargs = mock_send.call_args.kwargs
+ assert call_kwargs["score"] == 0.2
+ assert call_kwargs["incident_id"] == "INC-LOW"
+
+ @pytest.mark.asyncio
+ async def test_high_score_no_action(self):
+ """score >= 0.5 → Telegram rollback 提案不發送"""
+ with (
+ patch(
+ "src.services.self_healing_validator.assess_self_healing",
+ return_value={
+ "score": 1.0,
+ "root_cause_cleared": True,
+ "regressions": [],
+ "detail": "base=1.00",
+ "verification_result": "success",
+ "action_taken": "restart_service",
+ },
+ ),
+ patch(
+ "src.services.post_execution_verifier._send_rollback_proposal_alert",
+ new=AsyncMock(),
+ ) as mock_send,
+ patch(
+ "src.core.config.get_settings",
+ return_value=MagicMock(ENABLE_SELF_HEALING_VALIDATOR=True),
+ ),
+ ):
+ from src.services.post_execution_verifier import _run_self_healing_validator
+ await _run_self_healing_validator(
+ incident_id="INC-HIGH",
+ snapshot=None,
+ pre_state={"status": "CrashLoopBackOff"},
+ post_state={"status": "Running"},
+ verification_result="success",
+ action_taken="restart_service",
+ )
+
+ mock_send.assert_not_called()
+
+ @pytest.mark.asyncio
+ async def test_validator_failure_does_not_block_main_flow(self):
+ """assess_self_healing 拋例外,verify() 仍返回正確結果"""
+ verifier = PostExecutionVerifier()
+ incident = _stub_incident()
+
+ with (
+ patch.object(
+ verifier,
+ "_collect_post_state",
+ new=AsyncMock(return_value={"status": "Running"}),
+ ),
+ patch("src.services.post_execution_verifier._update_snapshot", new=AsyncMock()),
+ # _run_self_healing_validator 本身 raise → 應被吞掉
+ patch(
+ "src.services.post_execution_verifier._run_self_healing_validator",
+ new=AsyncMock(side_effect=RuntimeError("validator exploded")),
+ ),
+ ):
+ # verify() 不應 raise,仍返回 "success"
+ result = await verifier.verify(
+ incident=incident,
+ snapshot=None,
+ action_taken="restart_service:api",
+ warmup_sec=0.0,
+ )
+
+ # verify() 的主流程結果不受影響
+ # 注意:_run_self_healing_validator 由 verify() await 直接呼叫,
+ # 其例外由 verify() 的 try/except(approve_execution 層級)或自身包住
+ # 此測試確認即使 validator 炸掉,result 仍是正確的驗證結果
+ assert result == "success"
+
+ @pytest.mark.asyncio
+ async def test_feature_flag_disabled_skips(self):
+ """ENABLE_SELF_HEALING_VALIDATOR=False → assess_self_healing 不被呼叫"""
+ import src.services.self_healing_validator as _shv
+ with (
+ patch.object(_shv, "assess_self_healing") as mock_assess,
+ patch(
+ "src.core.config.get_settings",
+ return_value=MagicMock(ENABLE_SELF_HEALING_VALIDATOR=False),
+ ),
+ ):
+ from src.services.post_execution_verifier import _run_self_healing_validator
+ await _run_self_healing_validator(
+ incident_id="INC-FLAG",
+ snapshot=None,
+ pre_state=None,
+ post_state={"status": "Running"},
+ verification_result="success",
+ action_taken="restart_service",
+ )
+
+ mock_assess.assert_not_called()
+
+ @pytest.mark.asyncio
+ async def test_snapshot_self_healing_score_updated(self):
+ """score 補填 EvidenceSnapshot.self_healing_score"""
+ snap = _stub_snapshot()
+ snap.update_self_healing = AsyncMock()
+
+ with (
+ patch(
+ "src.services.self_healing_validator.assess_self_healing",
+ return_value={
+ "score": 0.85,
+ "root_cause_cleared": True,
+ "regressions": [],
+ "detail": "base=1.00",
+ "verification_result": "success",
+ "action_taken": "restart_service",
+ },
+ ),
+ patch(
+ "src.services.post_execution_verifier._send_rollback_proposal_alert",
+ new=AsyncMock(),
+ ),
+ patch(
+ "src.core.config.get_settings",
+ return_value=MagicMock(ENABLE_SELF_HEALING_VALIDATOR=True),
+ ),
+ ):
+ from src.services.post_execution_verifier import _run_self_healing_validator
+ await _run_self_healing_validator(
+ incident_id="INC-SNAP",
+ snapshot=snap,
+ pre_state={"status": "CrashLoopBackOff"},
+ post_state={"status": "Running"},
+ verification_result="success",
+ action_taken="restart_service",
+ )
+
+ snap.update_self_healing.assert_called_once()
+ call_kwargs = snap.update_self_healing.call_args.kwargs
+ assert call_kwargs["score"] == 0.85
+ assert call_kwargs["detail"]["root_cause_cleared"] is True
+ assert call_kwargs["detail"]["regressions"] == []