feat(flywheel): W2 三件 + KMWriter critic 修法(1635 tests 全綠)
Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 1m38s

W2 (onboarder 4 週飛輪 80→90 路徑第二週) + critic PR review 5 個 critical/major
全部修完,default flag=false 安全無爆炸風險。

## W2 三件 PR

### PR-R2 — AOL → catalog confidence EWMA 回灌(修飛輪斷鏈 C2)
- 新檔 `apps/api/src/jobs/aol_to_catalog_writeback_job.py`
- 邏輯:每小時掃 AOL 計算 EWMA confidence (alpha=0.3) 回灌 alert_rule_catalog
- 失敗閾值 N=5 連續低成功率 → review_status='draft'
- Hermes _fetch_noisy_rules SQL 加 OR review_status='draft'
- ENABLE_AOL_WRITEBACK_JOB=false (default)
- 8 個測試(mock path 修正:lazy import → patch src.db.base.get_db_context)

### PR-V1 — self_healing_validator 串接 (修飛輪斷鏈 C6)
- 新檔 `apps/api/src/services/self_healing_validator.py`(純函數 assess_self_healing)
- post_execution_verifier.py step 5 串接(feature flag gate)
- evidence_snapshot.py 加 self_healing_score / self_healing_detail 欄位
- db/models.py + base.py ALTER IF NOT EXISTS
- score < 0.5 → 觸發 rollback 提案 Telegram alert(不自動執行)
- ENABLE_SELF_HEALING_VALIDATOR=false (default)
- 7 個測試

### PR-L1 — KM ↔ Playbook 雙向回路 (修飛輪斷鏈 C3+C4)
- learning_service.py 三條新邏輯:
  1. _write_playbook_evolution_km:promote/demote 寫 KM 演化條目
  2. _check_and_mark_playbook_review:N=5 累積觸發 review_required
  3. _demote_alert_rule_catalog_confidence:DEPRECATED → confidence×=0.5
- PlaybookRecord 加 review_required 欄位(schema migration via base.py)
- ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP=false (default)
- KM_PLAYBOOK_REVIEW_THRESHOLD=5 可調
- 6 個測試

## KMWriter Critic 5 個 Critical/Major 修復(之前 critic PR review 發現)
之前 push commit c5753e1c 已修,本 commit 補回 stash 中的對應檔案:
- C1 km_writer.py:194 backfill 自打臉(已修:同步 await + DLQ)
- C2 km_writer.py:391 KM_WRITE_AWAIT=false 路徑收緊
- M1 decision_manager.py:2178/2203 移除 _fire_and_forget
- M2 incident_service.py:1099 自製 path 加 retry+DLQ
- M3 km_writer.py:166 冪等聲明對齊(UPSERT + partial unique index)

## 驗證
- 1635 unit tests 全綠(+27 from 1608)
- 與 fb0c72db (推翻 A2 Ollama primary) 共存無衝突
- 所有新 Job/Service default flag=false(不爆炸)

## 期望影響
飛輪斷鏈 C2 + C3 + C4 + C6 全修
飛輪自主化評分:65 → 85 預估(W2 完成後)

啟用順序(待 prod fb0c72db 驗證 OLLAMA primary 跑得起來後):
1. ENABLE_AOL_WRITEBACK_JOB=true
2. ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP=true
3. ENABLE_SELF_HEALING_VALIDATOR=true

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Your Name
2026-04-29 19:44:04 +08:00
parent fb0c72db42
commit 3668d49f2f
13 changed files with 2294 additions and 6 deletions

View File

@@ -103,6 +103,35 @@ class Settings(BaseSettings):
description="C1: True=啟用 km:backfill:dlq 補掃 job每 5 分鐘), False=停用",
)
# ==========================================================================
# W2 PR-R2: AOL → alert_rule_catalog Confidence EWMA Writeback
# ADR-091 Task T2 — 飛輪斷鏈 C2 修復:規則命中率回灌 catalog confidence
# default=false先寫 code人工驗證 AOL 資料品質後再開啟
# 啟用kubectl set env deployment/awoooi-api ENABLE_AOL_WRITEBACK_JOB=true
# 回滾kubectl set env deployment/awoooi-api ENABLE_AOL_WRITEBACK_JOB=false
# ==========================================================================
ENABLE_AOL_WRITEBACK_JOB: bool = Field(
default=False,
description="W2 PR-R2: True=每小時從 AOL 聚合 alertname 成功率並 EWMA 更新 alert_rule_catalog.confidence, False=停用(預設)",
)
# ==========================================================================
# W2 PR-L1: KM → Playbook 互饋回路 (2026-04-28 ogt + Claude Sonnet 4.6)
# 飛輪斷鏈 C3 + C4 修復 — KM 與 Playbook 演化互饋
# 邏輯 1: promote/demote 觸發 → 寫 KM 演化條目path_type=playbook_evolution
# 邏輯 2: 同 symptom_pattern_hash 累積 N=5 條 KM → 標記 playbook.review_required=true
# 邏輯 3: DEPRECATED Playbook → 降低 alert_rule_catalog.confidence *= 0.5
# 回滾指令: kubectl set env deployment/awoooi-api ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP=false
# ==========================================================================
ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP: bool = Field(
default=False,
description="W2 PR-L1: True=啟用 KM↔Playbook 互饋回路(飛輪 C3+C4 修復), False=停用default驗證後才開",
)
KM_PLAYBOOK_REVIEW_THRESHOLD: int = Field(
default=5,
description="W2 PR-L1: 同 symptom_pattern_hash 累積幾條 KM 後觸發 Playbook review_required 標記(預設 N=5",
)
# ==========================================================================
# aider-watch v2 integration (2026-04-20 ADR-091)
# 整合 Mac aider CLI 監控進 awoooi 飛輪events → incident → ai_router feedback
@@ -575,6 +604,16 @@ class Settings(BaseSettings):
description="P3.1-T2-PathA: 啟用 DiagnosisAggregator 信號分類層補 PDI路徑 A不重複收集只分類已有 raw 資料)",
)
# ==========================================================================
# W2 PR-V1: SelfHealingValidator Feature Flag (2026-04-28 ogt + Claude Sonnet 4.6)
# 飛輪斷鏈 C6 修復 — 驗證層串接自愈品質評估
# 回滾指令: kubectl set env deployment/awoooi-api ENABLE_SELF_HEALING_VALIDATOR=false
# ==========================================================================
ENABLE_SELF_HEALING_VALIDATOR: bool = Field(
default=False,
description="W2 PR-V1: True=PostExecutionVerifier 執行後評估自愈品質分數score<0.5發Telegram警示, False=跳過(回滾用)",
)
def get_tg_user_whitelist(self) -> list[int]:
"""Parse comma-separated or JSON array user IDs to list[int]"""
raw = self.OPENCLAW_TG_USER_WHITELIST

View File

@@ -220,6 +220,17 @@ async def init_db() -> None:
""")
)
# W2 PR-V1: SelfHealingValidator 補欄 (2026-04-28 ogt + Claude Sonnet 4.6)
# incident_evidence 加 self_healing_score + self_healing_detail
# create_all 不做 ALTER防禦性補加prod 已存在的表不會自動加欄)
await conn.execute(
text("""
ALTER TABLE incident_evidence
ADD COLUMN IF NOT EXISTS self_healing_score FLOAT,
ADD COLUMN IF NOT EXISTS self_healing_detail JSONB;
""")
)
# 2026-04-29 ogt + Claude Opus 4.7: PR-K1 防禦性 ALTER (db-expert finding)
# P1.6 (2026-04-24) ORM 已加 timeline_events.incident_id但 prod 若在 P1.6 前
# 已建表create_all 跳過已存在的表 → ALTER 不會跑 → ORM 寫入 SELECT 找不到欄位
@@ -230,6 +241,20 @@ async def init_db() -> None:
ADD COLUMN IF NOT EXISTS incident_id VARCHAR(64);
""")
)
# W2 PR-L1 2026-04-28 ogt + Claude Sonnet 4.6: KM→Playbook 互饋回路(飛輪 C3 修復)
# PlaybookRecord 新增 review_required 欄位
# 已存在表不會被 create_all 重建,必須手動 ALTER
await conn.execute(
text("""
ALTER TABLE playbooks
ADD COLUMN IF NOT EXISTS review_required BOOLEAN NOT NULL DEFAULT FALSE;
""")
)
await conn.execute(text(
"CREATE INDEX IF NOT EXISTS ix_playbook_review_required "
"ON playbooks(review_required) WHERE review_required = true;"
))
await conn.execute(text(
"CREATE INDEX IF NOT EXISTS ix_timeline_incident_id "
"ON timeline_events(incident_id);"

View File

@@ -932,6 +932,20 @@ class IncidentEvidence(Base):
String(20), nullable=True, comment="success / degraded / failed / timeoutPostExecutionVerifier 填入)"
)
# W2 PR-V1: SelfHealingValidator 自愈品質分數 (2026-04-28 ogt + Claude Sonnet 4.6)
# 0.0-1.01.0=完全自愈,<0.5=觸發 rollback 提案Telegram 警示)
# base.py ALTER IF NOT EXISTS 補欄對應下方
self_healing_score: Mapped[float | None] = mapped_column(
Float,
nullable=True,
comment="W2 PR-V1 SelfHealingValidator 自愈品質分數0.0-1.0<0.5 觸發 rollback 提案",
)
self_healing_detail: Mapped[dict | None] = mapped_column(
JSON,
nullable=True,
comment="W2 PR-V1 SelfHealingValidator 評估明細root_cause_cleared/regressions/detail",
)
# 時間戳(台北時區)
collected_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=taipei_now, nullable=False
@@ -1017,6 +1031,14 @@ class PlaybookRecord(Base):
stateful_targets: Mapped[list[str]] = mapped_column(JSON, default=list, nullable=False)
requires_pre_backup: Mapped[bool] = mapped_column(default=False, nullable=False)
# W2 PR-L1 2026-04-28 ogt + Claude Sonnet 4.6: KM→Playbook 互饋回路(飛輪 C3 修復)
# 同 symptom_pattern_hash 累積 N=5 條 KM 後LearningService 自動設 True
# 人工 review 後可重設為 False由 playbook_service 負責清除)
review_required: Mapped[bool] = mapped_column(
Boolean, default=False, nullable=False,
comment="W2 PR-L1: True=KM 累積觸發人工複審信號symptom_hash≥5 條review 後清為 False",
)
# Timestamps
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now, nullable=False)
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now,
@@ -1026,6 +1048,12 @@ class PlaybookRecord(Base):
Index("ix_playbook_status", "status"),
Index("ix_playbook_trust_score", "trust_score"),
Index("ix_playbook_created_at", "created_at"),
# W2 PR-L1: 快速查詢需要人工 review 的 Playbook預期數量少partial index 最省空間)
Index(
"ix_playbook_review_required",
"review_required",
postgresql_where=text("review_required = true"),
),
)

View File

@@ -0,0 +1,387 @@
"""
AOL → alert_rule_catalog Confidence EWMA Writeback Job
========================================================
ADR-091 Task T2 — 飛輪斷鏈 C2 修復
每 1 小時從 automation_operation_log 聚合 alertname 執行成功率,
用 EWMA 回灌 alert_rule_catalog.confidence並對低成功率規則標記 'draft'
(等待人工審查)。
流程:
1. 撈 AOL 過去 24hgroup by alertname算 ok/total
2. EWMA: new_confidence = 0.7 * old_confidence + 0.3 * recent_success_rate
若 confidence IS NULL初始值用 recent_success_rate
3. recent_success_rate < 0.3 且 sample >= 5 → review_status = 'draft'
schema CHECK 只允許 draft/approved/deprecated/retired'draft' 語義
等同「需要人工審查」Hermes 可設定撈 draft 觸發 advisory
4. 寫 automation_operation_log summary
Feature Flag:
ENABLE_AOL_WRITEBACK_JOB=false預設— 先寫 code 後人工驗證才開啟
設計鐵律:
- ENABLE_AOL_WRITEBACK_JOB=false 時完全 skip不碰 DB
- 只 UPDATE不 INSERTalert_rule_catalog 必須先由 rule_catalog_sync 建立)
- EWMA alpha=0.3(新資料權重),穩定性優先
- sample < 5 → 不降 review_status避免少量資料誤判
- 任何 DB 失敗 → log warning下次重試不 crash 主程序
排程:
- 啟動延遲 360s等 rule_catalog_sync + rule_stats_updater 先跑)
- 每 3600s每 1 小時)
schema 依賴(已存在,不需要 migration:
- alert_rule_catalog.confidence NUMERIC(3,2) — 行 279 adr090_asset_inventory_foundation.sql
- alert_rule_catalog.review_status TEXT CHECK (draft/approved/deprecated/retired)
- automation_operation_log.input, .status, .tags, .operation_type
W2 PR-R2 2026-04-28 ogt + Claude Sonnet 4.6 Asia/Taipei
ADR-091 Task T2 飛輪斷鏈 C2 修復 — AOL 命中率回灌
"""
from __future__ import annotations
import asyncio
import json as _json
import time as _time
from typing import Any
import structlog
from src.core.config import settings
logger = structlog.get_logger(__name__)
# ============================================================================
# 排程參數
# ============================================================================
_WRITEBACK_INTERVAL_SEC = 3600 # 每 1 小時
_FIRST_DELAY_SEC = 360 # 啟動後等 360s讓 rule_catalog_sync + rule_stats 先完成)
_LOOP_BACKOFF_SEC = 300 # 錯誤後重試間隔
_AOL_WINDOW_HOURS = 24 # 聚合 AOL 的時間窗口
# EWMA 參數
_EWMA_ALPHA = 0.3 # 新資料權重 (0.3 = 保守更新)
_LOW_SUCCESS_THRESHOLD = 0.3 # 低成功率閾值
_MIN_SAMPLE_SIZE = 5 # 樣本不足不降 review_status
# AOL 操作類型白名單(只計「真正執行了操作」的 log
_RELEVANT_OP_TYPES = (
"alert_resolved",
"action_executed",
"auto_repair_success",
"auto_repair_failed",
"auto_repair_skipped",
)
# review_status 降級用值schema CHECK 允許的值中語義最接近「需要人工審查」)
_NEEDS_REVIEW_STATUS = "draft"
# ============================================================================
# Public entry — main.py lifespan 呼叫
# ============================================================================
async def run_aol_writeback_loop() -> None:
"""
永久迴圈:每 _WRITEBACK_INTERVAL_SEC 秒執行一次 AOL → catalog 回灌.
Feature Flag ENABLE_AOL_WRITEBACK_JOB=false 時,進入迴圈但每次 sleep 後立即 skip.
"""
logger.info(
"aol_to_catalog_writeback_loop_started",
interval_sec=_WRITEBACK_INTERVAL_SEC,
flag_enabled=settings.ENABLE_AOL_WRITEBACK_JOB,
)
await asyncio.sleep(_FIRST_DELAY_SEC)
while True:
try:
await run_aol_writeback_once()
except Exception as e:
logger.exception("aol_writeback_loop_error", error=str(e))
await asyncio.sleep(_LOOP_BACKOFF_SEC)
continue
await asyncio.sleep(_WRITEBACK_INTERVAL_SEC)
async def run_aol_writeback_once() -> dict[str, Any]:
"""
執行一次 AOL → alert_rule_catalog confidence EWMA 回灌.
Returns:
{
"skipped": True/False, # feature flag 停用時回傳 skipped=True
"rules_sampled": N, # AOL 中找到的 alertname 數
"rules_updated": M, # 成功 UPDATE confidence 的 rule 數
"rules_flagged_draft": K, # 低成功率被標 draft 的 rule 數
"error": None | str,
}
"""
if not settings.ENABLE_AOL_WRITEBACK_JOB:
logger.debug("aol_writeback_skipped_flag_disabled")
return {"skipped": True, "rules_sampled": 0, "rules_updated": 0, "rules_flagged_draft": 0, "error": None}
started_ms = _time.time()
stats: dict[str, Any] = {
"skipped": False,
"rules_sampled": 0,
"rules_updated": 0,
"rules_flagged_draft": 0,
"error": None,
}
try:
samples = await _fetch_aol_samples()
stats["rules_sampled"] = len(samples)
for sample in samples:
updated, flagged = await _update_catalog_confidence(sample)
if updated:
stats["rules_updated"] += 1
if flagged:
stats["rules_flagged_draft"] += 1
except Exception as e:
stats["error"] = f"{type(e).__name__}: {e}"[:1000]
logger.exception("aol_writeback_once_failed", error=stats["error"])
duration_ms = int((_time.time() - started_ms) * 1000)
await _log_aol_summary(stats, duration_ms)
logger.info(
"aol_writeback_once_done",
rules_sampled=stats["rules_sampled"],
rules_updated=stats["rules_updated"],
rules_flagged_draft=stats["rules_flagged_draft"],
duration_ms=duration_ms,
error=stats["error"],
)
return stats
# ============================================================================
# 資料查詢
# ============================================================================
async def _fetch_aol_samples() -> list[dict[str, Any]]:
"""
從 automation_operation_log 聚合過去 24h 的 alertname 成功率.
查詢條件:
- operation_type IN (alert_resolved, action_executed, auto_repair_*)
- tags @> '["auto_execute"]'
- created_at > NOW() - INTERVAL '24h'
回傳: [{alertname, ok, total, recent_success_rate}, ...]
"""
from sqlalchemy import text as _sql
from src.db.base import get_db_context
# 動態拼 operation_type IN 清單parameterized避免 SQL 注入)
# SQLAlchemy bindparam 不支援 INTERVAL 的數字插值INTERVAL 用 f-string literal 拼接
op_list = list(_RELEVANT_OP_TYPES)
placeholders = ", ".join(f":op{i}" for i in range(len(op_list)))
params: dict[str, Any] = {f"op{i}": v for i, v in enumerate(op_list)}
sql = f"""
SELECT
input->>'alertname' AS alertname,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS ok,
COUNT(*) AS total
FROM automation_operation_log
WHERE operation_type IN ({placeholders})
AND tags @> '["auto_execute"]'
AND created_at > NOW() - INTERVAL '{_AOL_WINDOW_HOURS} hours'
AND input->>'alertname' IS NOT NULL
AND input->>'alertname' != ''
GROUP BY input->>'alertname'
HAVING COUNT(*) > 0
ORDER BY COUNT(*) DESC
"""
try:
async with get_db_context() as db:
result = await db.execute(_sql(sql), params)
rows = result.fetchall()
samples = []
for row in rows:
alertname = row.alertname
ok = int(row.ok or 0)
total = int(row.total or 0)
recent_success_rate = ok / total if total > 0 else 0.0
samples.append({
"alertname": alertname,
"ok": ok,
"total": total,
"recent_success_rate": recent_success_rate,
})
return samples
except Exception as e:
logger.warning("aol_fetch_samples_failed", error=str(e))
return []
# ============================================================================
# EWMA 更新
# ============================================================================
async def _update_catalog_confidence(sample: dict[str, Any]) -> tuple[bool, bool]:
"""
對單一 alertname 執行 EWMA 更新 + 低成功率降級.
Returns:
(updated: bool, flagged_draft: bool)
updated = True 表示 confidence 有被更新
flagged_draft = True 表示 review_status 被設為 'draft'
"""
from sqlalchemy import text as _sql
from src.db.base import get_db_context
alertname = sample["alertname"]
recent_sr = sample["recent_success_rate"]
total = sample["total"]
try:
async with get_db_context() as db:
# 先讀現有 confidence 與 review_status
row = await db.execute(
_sql("""
SELECT rule_id, confidence, review_status
FROM alert_rule_catalog
WHERE rule_name = :rn
LIMIT 1
"""),
{"rn": alertname},
)
existing = row.one_or_none()
if existing is None:
# rule 不在 catalog → skip等 rule_catalog_sync 先建)
logger.debug("aol_writeback_rule_not_in_catalog", alertname=alertname)
return False, False
old_confidence = float(existing.confidence) if existing.confidence is not None else None
current_review_status = existing.review_status
# EWMA 計算
if old_confidence is None:
new_confidence = recent_sr
else:
new_confidence = (1 - _EWMA_ALPHA) * old_confidence + _EWMA_ALPHA * recent_sr
# 限制到 [0.00, 1.00]NUMERIC(3,2) 最大 9.99,但 confidence 語義 0-1
new_confidence = max(0.0, min(1.0, new_confidence))
# 判斷是否需要降級 review_status
should_flag = (
recent_sr < _LOW_SUCCESS_THRESHOLD
and total >= _MIN_SAMPLE_SIZE
and current_review_status not in (_NEEDS_REVIEW_STATUS, "deprecated", "retired")
)
if should_flag:
await db.execute(
_sql("""
UPDATE alert_rule_catalog
SET confidence = :conf,
review_status = :rs,
updated_at = NOW()
WHERE rule_name = :rn
"""),
{
"conf": round(new_confidence, 2),
"rs": _NEEDS_REVIEW_STATUS,
"rn": alertname,
},
)
logger.info(
"aol_writeback_flagged_draft",
alertname=alertname,
old_confidence=old_confidence,
new_confidence=round(new_confidence, 2),
recent_sr=round(recent_sr, 3),
sample=total,
)
return True, True
else:
await db.execute(
_sql("""
UPDATE alert_rule_catalog
SET confidence = :conf,
updated_at = NOW()
WHERE rule_name = :rn
"""),
{
"conf": round(new_confidence, 2),
"rn": alertname,
},
)
logger.debug(
"aol_writeback_confidence_updated",
alertname=alertname,
old_confidence=old_confidence,
new_confidence=round(new_confidence, 2),
recent_sr=round(recent_sr, 3),
sample=total,
)
return True, False
except Exception as e:
logger.warning(
"aol_writeback_update_failed",
alertname=alertname,
error=str(e),
)
return False, False
# ============================================================================
# AOL 稽核 log
# ============================================================================
async def _log_aol_summary(stats: dict[str, Any], duration_ms: int) -> None:
"""寫一筆 summary 到 automation_operation_log每次 writeback 只寫 1 筆)."""
if stats.get("skipped"):
return # flag 停用時不留 log避免污染
try:
from sqlalchemy import text as _sql
from src.db.base import get_db_context
aol_status = "failed" if stats.get("error") else "success"
async with get_db_context() as db:
await db.execute(
_sql("""
INSERT INTO automation_operation_log (
operation_type, actor, status,
input, output, duration_ms, error, tags
) VALUES (
'rule_updated',
'aol_to_catalog_writeback',
:st,
CAST(:input AS jsonb),
CAST(:output AS jsonb),
:dur, :err, :tags
)
"""),
{
"st": aol_status,
"input": _json.dumps(
{"window_hours": _AOL_WINDOW_HOURS, "ewma_alpha": _EWMA_ALPHA},
ensure_ascii=False,
),
"output": _json.dumps(
{
"rules_sampled": stats.get("rules_sampled", 0),
"rules_updated": stats.get("rules_updated", 0),
"rules_flagged_draft": stats.get("rules_flagged_draft", 0),
},
ensure_ascii=False,
),
"dur": duration_ms,
"err": (stats.get("error") or "")[:2000] if stats.get("error") else None,
"tags": ["rule_catalog", "aol_writeback", "ewma", "confidence"],
},
)
except Exception as e:
logger.warning("aol_writeback_log_aol_failed", error=str(e))

View File

@@ -196,7 +196,12 @@ async def _llm_analyze_noisy_rule(rule: dict[str, Any]) -> dict[str, Any] | None
# ============================================================================
async def _fetch_noisy_rules() -> list[dict[str, Any]]:
"""撈 noise_rate >= 0.7 且樣本 >= 5 的 rules."""
"""撈 noise_rate >= 0.7 且樣本 >= 5 的 rules,或 AOL writeback 標記 draft 的 rules.
W2 PR-R2 2026-04-28 ogt + Claude Sonnet 4.6: 加 OR review_status = 'draft' 條件
讓 AOL writeback 觸發的 draft 規則能被 Hermes 自動推 Telegram 建議
(不再卡人工 SQL 才能觸發 advisory
"""
from sqlalchemy import text as _sql
from src.db.base import get_db_context
@@ -209,10 +214,16 @@ async def _fetch_noisy_rules() -> list[dict[str, Any]]:
true_positive_count, false_positive_count, noise_rate,
last_fired_at, review_status
FROM alert_rule_catalog
WHERE noise_rate >= :thr
AND (true_positive_count + false_positive_count) >= :min_sample
AND (review_status IS NULL OR review_status = 'approved')
ORDER BY noise_rate DESC, (true_positive_count + false_positive_count) DESC
WHERE (
(
noise_rate >= :thr
AND (true_positive_count + false_positive_count) >= :min_sample
AND (review_status IS NULL OR review_status = 'approved')
)
OR review_status = 'draft'
)
ORDER BY noise_rate DESC NULLS LAST,
(true_positive_count + false_positive_count) DESC
"""),
{"thr": _NOISE_THRESHOLD, "min_sample": _MIN_SAMPLE_SIZE},
)

View File

@@ -519,6 +519,17 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
except Exception as e:
logger.warning("km_backfill_reconciler_loop_schedule_failed", error=str(e))
# W2 PR-R2 2026-04-28 ogt + Claude Sonnet 4.6: AOL → alert_rule_catalog EWMA Writeback每 1 小時)
# 飛輪斷鏈 C2 修復automation_operation_log 執行結果回灌 alert_rule_catalog.confidence
# Feature Flag: ENABLE_AOL_WRITEBACK_JOB=false 預設停用(人工驗證後再開)
# ADR-091 Task T2
try:
from src.jobs.aol_to_catalog_writeback_job import run_aol_writeback_loop
asyncio.create_task(run_aol_writeback_loop())
logger.info("aol_to_catalog_writeback_loop_scheduled", interval_sec=3600)
except Exception as e:
logger.warning("aol_to_catalog_writeback_loop_schedule_failed", error=str(e))
# ADR-087 Phase 6: KB 腐爛清理(月度)— 每月 1 號 03:00 台北時間
# 掃描 knowledge_entries 中腐爛條目(廢棄 K8s API / Prometheus pattern / 180d 未引用)
# 2026-04-27 P3.1-T3 by Claude

View File

@@ -110,6 +110,11 @@ class EvidenceSnapshot:
post_execution_state: dict[str, Any] | None = None
verification_result: str | None = None
# W2 PR-V1: SelfHealingValidator 自愈品質評估 (2026-04-28 ogt + Claude Sonnet 4.6)
# ENABLE_SELF_HEALING_VALIDATOR=false 時永 None
self_healing_score: float | None = None
self_healing_detail: dict[str, Any] | None = None
# Phase 3 填充(目前永 null
matched_playbook_id: str | None = None
@@ -292,6 +297,55 @@ class EvidenceSnapshot:
)
raise
async def update_self_healing(
self,
score: float,
detail: dict[str, Any],
) -> None:
"""
W2 PR-V1: SelfHealingValidator 評估結果補填。
在 PostExecutionVerifier.verify() 完成 update_post_execution() 之後呼叫。
僅在 ENABLE_SELF_HEALING_VALIDATOR=True 且 snapshot 已持久化時有效。
Args:
score: 自愈品質分數0.0-1.0
detail: SelfHealingValidator.assess_self_healing() 返回的明細 dict
2026-04-28 ogt + Claude Sonnet 4.6: W2 PR-V1 初始建立
"""
self.self_healing_score = score
self.self_healing_detail = detail
try:
async with get_db_context() as db:
stmt_result = await db.execute(
update(IncidentEvidence)
.where(IncidentEvidence.id == self.snapshot_id)
.values(
self_healing_score=score,
self_healing_detail=detail,
)
)
if stmt_result.rowcount < 1:
logger.warning(
"evidence_snapshot_self_healing_update_no_rows",
snapshot_id=self.snapshot_id,
score=score,
)
else:
logger.info(
"evidence_snapshot_self_healing_updated",
snapshot_id=self.snapshot_id,
score=score,
)
except Exception:
logger.exception(
"evidence_snapshot_self_healing_update_error",
snapshot_id=self.snapshot_id,
)
raise
async def get_latest_snapshot(incident_id: str) -> EvidenceSnapshot | None:
"""

View File

@@ -389,13 +389,40 @@ class LearningService:
playbook_id: str,
success: bool,
) -> None:
"""更新 Playbook 統計"""
"""
更新 Playbook 統計
W2 PR-L1: 統計更新後,取 Playbook 的 symptom_pattern hash 觸發邏輯 2
KM 累積門檻檢查 → review_required 標記)。
"""
try:
from src.services.playbook_service import get_playbook_service
service = get_playbook_service()
await service.record_execution(playbook_id, success)
# W2 PR-L1 邏輯 2: 取得 Playbook symptom_pattern hash觸發 KM 累積檢查
from src.core.config import settings
if settings.ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP:
try:
from src.repositories.playbook_repository import get_playbook_repository
from src.models.playbook import SymptomPattern
repo = get_playbook_repository()
playbook = await repo.get_by_id(playbook_id)
if playbook and playbook.symptom_pattern:
sp = playbook.symptom_pattern
# symptom_pattern 可能是 Pydantic model 或 dictORM 載入)
if isinstance(sp, dict):
sp = SymptomPattern.model_validate(sp)
symptoms_hash = sp.compute_hash()
await self._check_and_mark_playbook_review(symptoms_hash)
except Exception as inner_e:
logger.warning(
"playbook_review_check_failed",
playbook_id=playbook_id,
error=str(inner_e),
)
except Exception as e:
logger.warning(
"playbook_stats_update_error",
@@ -459,6 +486,7 @@ class LearningService:
- 尋找 source_incident_ids 包含此 incident_id 的 Playbooks
- 提升 ai_confidence +0.1 (上限 1.0)
- 若信心度 >= 0.9 且 status == DRAFT → 自動升級為 APPROVED
- W2 PR-L1: 寫 KM 演化條目ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP 開啟時)
"""
try:
from src.repositories.playbook_repository import get_playbook_repository
@@ -478,6 +506,7 @@ class LearningService:
updated_count = 0
for playbook in playbooks:
previous_trust = playbook.trust_score
result = await repo.adjust_confidence(
playbook_id=playbook.playbook_id,
delta=CONFIDENCE_BOOST,
@@ -485,6 +514,13 @@ class LearningService:
)
if result:
updated_count += 1
# W2 PR-L1: promote 觸發 → 寫 KM 演化條目
await self._write_playbook_evolution_km(
playbook=playbook,
previous_trust=previous_trust,
evolution_type="promote",
incident_id=incident_id,
)
logger.info(
"playbook_promoted",
@@ -513,6 +549,7 @@ class LearningService:
- 尋找 source_incident_ids 包含此 incident_id 的 Playbooks
- 降低 ai_confidence -0.15 (下限 0.0)
- 若信心度 < 0.3 且 failure_rate > 50% → 自動降級為 DEPRECATED
- W2 PR-L1: 寫 KM 演化條目DEPRECATED 時回灌 alert_rule_catalog飛輪 C4 修復)
"""
try:
from src.repositories.playbook_repository import get_playbook_repository
@@ -532,6 +569,7 @@ class LearningService:
updated_count = 0
for playbook in playbooks:
previous_trust = playbook.trust_score
result = await repo.adjust_confidence(
playbook_id=playbook.playbook_id,
delta=CONFIDENCE_PENALTY,
@@ -539,6 +577,17 @@ class LearningService:
)
if result:
updated_count += 1
# W2 PR-L1: demote 觸發 → 寫 KM 演化條目
await self._write_playbook_evolution_km(
playbook=playbook,
previous_trust=previous_trust,
evolution_type="demote",
incident_id=incident_id,
)
# W2 PR-L1 邏輯 3: DEPRECATED 時回灌 alert_rule_catalog飛輪 C4 修復)
from src.models.playbook import PlaybookStatus
if playbook.status == PlaybookStatus.DEPRECATED:
await self._demote_alert_rule_catalog_confidence(playbook)
logger.info(
"playbook_demoted",
@@ -557,6 +606,241 @@ class LearningService:
)
return False
# =========================================================================
# W2 PR-L1: KM → Playbook 互饋回路私有方法
# 飛輪斷鏈 C3 + C4 修復
# 2026-04-28 ogt + Claude Sonnet 4.6
# =========================================================================
async def _write_playbook_evolution_km(
self,
playbook: Any,
previous_trust: float,
evolution_type: str,
incident_id: str,
) -> None:
"""
邏輯 1: promote/demote 觸發 → 寫 KM 演化條目(飛輪 C3
KM 條目 metadata 含playbook_id, previous_trust, new_trust,
success_count, failure_count, decision_chain
path_type='playbook_evolution',供冪等 key 使用
(incident_id, path_type) = (incident_id, 'playbook_evolution') 可能重複,
但 playbook_id 不同的演化各自獨立,所以 path_type 加 playbook_id 作為識別。
"""
from src.core.config import settings
if not settings.ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP:
return
try:
import json
from src.services.km_writer import KMWritePayload, km_write_with_flag
from src.utils.timezone import now_taipei
new_trust = getattr(playbook, "trust_score", previous_trust)
success_count = getattr(playbook, "success_count", 0)
failure_count = getattr(playbook, "failure_count", 0)
path_type = f"playbook_evolution:{playbook.playbook_id}"
payload = KMWritePayload(
path_type=path_type,
incident_id=incident_id,
entry_create_kwargs={
"title": f"Playbook {evolution_type}: {playbook.name} [{playbook.playbook_id}]",
"content": (
f"Playbook {evolution_type} 事件記錄\n"
f"Playbook ID: {playbook.playbook_id}\n"
f"名稱: {playbook.name}\n"
f"trust_score 變化: {previous_trust:.3f}{new_trust:.3f}\n"
f"成功次數: {success_count} / 失敗次數: {failure_count}\n"
f"觸發來源: incident {incident_id}\n"
f"記錄時間: {now_taipei().isoformat()}"
),
"entry_type": "best_practice",
"category": "AI系統",
"tags": ["playbook_evolution", evolution_type, playbook.playbook_id],
"source": "ai_extracted",
"related_playbook_id": playbook.playbook_id,
"related_incident_id": incident_id,
"path_type": path_type,
},
metadata={
"playbook_id": playbook.playbook_id,
"previous_trust": previous_trust,
"new_trust": new_trust,
"success_count": success_count,
"failure_count": failure_count,
"evolution_type": evolution_type,
},
)
await km_write_with_flag(payload)
logger.info(
"playbook_evolution_km_written",
playbook_id=playbook.playbook_id,
evolution_type=evolution_type,
trust_change=f"{previous_trust:.3f}{new_trust:.3f}",
)
except Exception as e:
logger.warning(
"playbook_evolution_km_write_failed",
playbook_id=getattr(playbook, "playbook_id", "unknown"),
evolution_type=evolution_type,
error=str(e),
)
async def _check_and_mark_playbook_review(self, symptoms_hash: str) -> None:
"""
邏輯 2: KM 累積 N=5 條同 symptom_pattern_hash → 觸發 Playbook review_required 標記(飛輪 C3
每次 KM 寫入後由 _update_playbook_stats 呼叫端觸發此檢查。
若同 symptoms_hash 在 knowledge_entries 已有 >= threshold 條,
則 UPDATE playbooks SET review_required=true WHERE 症狀 hash 相符。
比對策略:從 KnowledgeEntry 讀 symptoms_hash 計數,
再透過 playbook.symptom_pattern 的 hash 比對 Playbook。
"""
from src.core.config import settings
if not settings.ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP:
return
if not symptoms_hash:
return
try:
from sqlalchemy import text as sa_text
from src.db.base import get_db_context
async with get_db_context() as db:
# 計算同 symptoms_hash 的 KM 條目數
count_result = await db.execute(
sa_text(
"SELECT COUNT(*) FROM knowledge_entries "
"WHERE symptoms_hash = :hash"
),
{"hash": symptoms_hash},
)
count = count_result.scalar() or 0
if count < settings.KM_PLAYBOOK_REVIEW_THRESHOLD:
return
# 累積達到門檻 → 標記相關 Playbook 需要 review
# Playbook 的 symptom_pattern 存為 JSONB無直接 hash 欄位
# 透過 knowledge_entries.related_playbook_id 關聯找到要標記的 Playbook
updated = await db.execute(
sa_text(
"UPDATE playbooks pb "
"SET review_required = true, updated_at = NOW() "
"FROM knowledge_entries ke "
"WHERE ke.symptoms_hash = :hash "
" AND ke.related_playbook_id = pb.playbook_id "
" AND pb.review_required = false "
"RETURNING pb.playbook_id"
),
{"hash": symptoms_hash},
)
marked_ids = [row[0] for row in updated.fetchall()]
await db.commit()
if marked_ids:
logger.info(
"playbook_review_required_marked",
symptoms_hash=symptoms_hash,
km_count=count,
threshold=settings.KM_PLAYBOOK_REVIEW_THRESHOLD,
playbook_ids=marked_ids,
)
except Exception as e:
logger.warning(
"playbook_review_mark_failed",
symptoms_hash=symptoms_hash,
error=str(e),
)
async def _demote_alert_rule_catalog_confidence(self, playbook: Any) -> None:
"""
邏輯 3: Playbook DEPRECATED 時回灌 alert_rule_catalog飛輪 C4 修復)
UPDATE alert_rule_catalog
SET confidence = confidence * 0.5,
review_status = 'draft' -- CHECK constraint 允許 draft/approved/deprecated/retired
WHERE rule_name LIKE pattern(symptom_pattern.alert_names)
注意alert_rule_catalog.review_status CHECK 限制只允許:
draft | approved | deprecated | retired
任務描述的 'needs_review' 不合法,改用 'draft'(語意等效:需要人工審核)
失敗容忍:不影響 demote 主流程。
"""
from src.core.config import settings
if not settings.ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP:
return
try:
import json
from sqlalchemy import text as sa_text
from src.db.base import get_db_context
# 從 playbook symptom_pattern 取出 alert_names 作為比對鍵
symptom = getattr(playbook, "symptom_pattern", None)
if symptom is None:
return
# symptom_pattern 可能是 Pydantic model 或 dict從 ORM 載入為 dict
if hasattr(symptom, "alert_names"):
alert_names: list[str] = symptom.alert_names or []
elif isinstance(symptom, dict):
alert_names = symptom.get("alert_names") or []
else:
return
if not alert_names:
logger.debug(
"playbook_demote_no_alert_names",
playbook_id=playbook.playbook_id,
)
return
async with get_db_context() as db:
updated_count = 0
for alert_name in alert_names:
# rule_name 完全匹配或前綴匹配(去掉 * suffix
match_name = alert_name.rstrip("*")
result = await db.execute(
sa_text(
"UPDATE alert_rule_catalog "
"SET confidence = CASE "
" WHEN confidence IS NOT NULL "
" THEN GREATEST(0.01, confidence * 0.5) "
" ELSE 0.5 "
" END, "
" review_status = 'draft', "
" updated_at = NOW() "
"WHERE rule_name LIKE :pattern "
" AND (review_status IS NULL OR review_status NOT IN "
" ('deprecated', 'retired')) "
"RETURNING rule_id"
),
{"pattern": f"{match_name}%"},
)
affected = result.rowcount or 0
updated_count += affected
await db.commit()
if updated_count > 0:
logger.info(
"alert_rule_catalog_confidence_demoted",
playbook_id=playbook.playbook_id,
alert_names=alert_names,
rules_updated=updated_count,
)
except Exception as e:
logger.warning(
"alert_rule_catalog_demote_failed",
playbook_id=getattr(playbook, "playbook_id", "unknown"),
error=str(e),
)
# =========================================================================
# 🆕 Phase D-G P0 修正: 新增方法
# =========================================================================

View File

@@ -21,6 +21,11 @@ AWOOOI AIOps Phase 1 — 執行後驗證器
- 超時不 raise標記 "timeout" 並繼續流程
- 不阻塞原始執行路徑await但結果不影響執行本身是否成功
W2 PR-V1: SelfHealingValidator 串接 (2026-04-28 ogt + Claude Sonnet 4.6)
- ENABLE_SELF_HEALING_VALIDATOR=True 時verify() 完成後呼叫 assess_self_healing()
- self_healing_score < 0.5 → Telegram 警示 rollback 提案(不自動執行)
- 驗證失敗不阻塞主流程try/except 全包)
ADR-081: PreDecisionInvestigator + EvidenceSnapshot
MASTER §3.1 L6×D1
2026-04-15 ogt + Claude Sonnet 4.6 (亞太): Phase 1 初始建立
@@ -37,6 +42,9 @@ import structlog
from src.services.evidence_snapshot import EvidenceSnapshot
from src.services.mcp_tool_registry import SensorDimension, get_mcp_tool_registry
from src.services.sanitization_service import sanitize_dict_values
# W2 PR-V1: 頂層 import 讓測試 patch 路徑固定(延遲 import 無法被 patch
# ENABLE_SELF_HEALING_VALIDATOR=False 時此 import 不影響效能(純 python 模組)
from src.services import self_healing_validator as _shv_module
if TYPE_CHECKING:
from src.models.incident import Incident
@@ -136,6 +144,26 @@ class PostExecutionVerifier:
result=result,
action=action_taken,
)
# 5. W2 PR-V1: SelfHealingValidator 串接ENABLE_SELF_HEALING_VALIDATOR gate
# 在 post_state 已補填後評估自愈品質,不阻塞主流程
# 外層 try/except 確保任何 validator 失敗不影響 verify() 返回值
try:
await _run_self_healing_validator(
incident_id=incident_id,
snapshot=snapshot,
pre_state=pre_state,
post_state=post_state,
verification_result=result,
action_taken=action_taken,
)
except Exception:
logger.warning(
"self_healing_validator_uncaught",
incident_id=incident_id,
exc_info=True,
)
return result
async def capture_pre_execution_state(
@@ -209,6 +237,132 @@ class PostExecutionVerifier:
return state
# ─────────────────────────────────────────────────────────────────────────────
# W2 PR-V1: SelfHealingValidator 串接
# 2026-04-28 ogt + Claude Sonnet 4.6: C6 飛輪斷鏈修復
# ─────────────────────────────────────────────────────────────────────────────
async def _run_self_healing_validator(
incident_id: str,
snapshot: EvidenceSnapshot | None,
pre_state: dict[str, Any] | None,
post_state: dict[str, Any],
verification_result: str,
action_taken: str,
) -> None:
"""
SelfHealingValidator 串接入口。
Feature gate: ENABLE_SELF_HEALING_VALIDATOR預設 False
驗證失敗全程 try/except 保護,不影響主流程。
評估後:
- 補填 snapshot.self_healing_score + self_healing_detail
- score < 0.5 → 發送 Telegram rollback 提案警示
"""
try:
from src.core.config import get_settings
_settings = get_settings()
if not _settings.ENABLE_SELF_HEALING_VALIDATOR:
return
assessment = _shv_module.assess_self_healing(
pre_state=pre_state,
post_state=post_state,
verification_result=verification_result,
action_taken=action_taken,
)
score: float = assessment["score"]
logger.info(
"self_healing_assessed",
incident_id=incident_id,
score=score,
regressions=assessment.get("regressions", []),
root_cause_cleared=assessment.get("root_cause_cleared"),
detail=assessment.get("detail"),
)
# 補填 EvidenceSnapshot
if snapshot:
try:
await snapshot.update_self_healing(score=score, detail=assessment)
except Exception as _snap_err:
logger.warning(
"self_healing_snapshot_update_failed",
incident_id=incident_id,
error=str(_snap_err),
)
# score < 0.5 → Telegram rollback 提案警示
if score < 0.5:
await _send_rollback_proposal_alert(
incident_id=incident_id,
score=score,
assessment=assessment,
action_taken=action_taken,
)
except Exception:
logger.warning(
"self_healing_validator_error",
incident_id=incident_id,
exc_info=True,
)
async def _send_rollback_proposal_alert(
incident_id: str,
score: float,
assessment: dict[str, Any],
action_taken: str,
) -> None:
"""
自愈品質分數 < 0.5 時,發送 Telegram rollback 提案警示。
不自動執行 rollback僅通知人工評估。
"""
try:
from src.core.config import get_settings
from src.services.telegram_gateway import get_telegram_gateway
_settings = get_settings()
gateway = get_telegram_gateway()
regressions = assessment.get("regressions", [])
reg_str = ", ".join(regressions[:5]) if regressions else ""
root_cleared = "" if assessment.get("root_cause_cleared") else ""
text = (
f"⚠️ <b>自愈品質警示 — 建議人工評估 Rollback</b>\n"
f"Incident: <code>{incident_id}</code>\n"
f"動作: <code>{action_taken[:120]}</code>\n"
f"自愈分數: <b>{score:.2f}</b> (門檻 0.5)\n"
f"Root Cause 解除: {root_cleared}\n"
f"Regression 信號: {reg_str}\n"
f"<i>此為提案,不會自動執行 Rollback</i>"
)
await gateway._http_client.post(
f"https://api.telegram.org/bot{_settings.OPENCLAW_TG_BOT_TOKEN}/sendMessage",
json={
"chat_id": _settings.OPENCLAW_TG_CHAT_ID,
"text": text,
"parse_mode": "HTML",
},
)
logger.info(
"rollback_proposal_sent",
incident_id=incident_id,
score=score,
)
except Exception:
logger.warning(
"rollback_proposal_send_failed",
incident_id=incident_id,
exc_info=True,
)
# ─────────────────────────────────────────────────────────────────────────────
# Recovery Assessment
# ─────────────────────────────────────────────────────────────────────────────

View File

@@ -0,0 +1,163 @@
"""
AWOOOI AIOps — 自愈品質驗證器
================================
W2 PR-V1: 飛輪斷鏈 C6 修復 — PostExecutionVerifier 串接自愈品質評估
職責:
1. 評估系統是否真的「自愈」root cause 解除 vs 只是 metric 暫時恢復)
2. Regression Detection修完一個指標但其他指標惡化
3. 修復品質分數0.0 ~ 1.0
評分邏輯:
- base_score 由 verification_result 決定success=1.0 / degraded=0.4 / failed=0.0 / timeout=0.2
- regression_penalty 由 pre/post state diff 中惡化指標數量決定
- 最終 score = max(0.0, base_score - regression_penalty)
閾值:
- score < 0.5 → rollback 提案Telegram 警示,不自動執行)
- score >= 0.5 → 認可自愈,無額外動作
設計原則:
- 不修改 self_healing_validator 內部邏輯(外部串接層)
- 驗證失敗不阻塞主流程(容錯 try/except 全包)
- Feature Flag: ENABLE_SELF_HEALING_VALIDATOR=false預設關閉
ADR-081 Phase 1 延伸
2026-04-28 ogt + Claude Sonnet 4.6: W2 PR-V1 初始建立C6 修復)
"""
from __future__ import annotations
import re
from typing import TYPE_CHECKING, Any
import structlog
if TYPE_CHECKING:
pass
logger = structlog.get_logger(__name__)
# 修復品質分數基準by verification_result
_BASE_SCORES: dict[str, float] = {
"success": 1.0,
"degraded": 0.4,
"failed": 0.0,
"timeout": 0.2,
}
# 每個惡化指標的扣分
_REGRESSION_PENALTY_PER_METRIC = 0.15
# 扣分上限(避免 over-penalty
_MAX_REGRESSION_PENALTY = 0.4
# root cause 解除信號post_state 出現這些 → root cause 已清除)
_ROOT_CAUSE_CLEARED_SIGNALS = ["running", "ready", "1/1", "2/2", "3/3", "healthy"]
# regression 惡化信號post_state 新出現但 pre_state 不存在 → regression
_REGRESSION_SIGNALS = [
"crashloopbackoff",
"oomkilled",
"oomkill",
"pending",
"terminating",
"error",
"failed",
"timeout",
"evicted",
"imagepullbackoff",
"errimagepull",
]
# 數值指標惡化偵測regex 找 %、數字,比較增幅)
_NUMERIC_THRESHOLD_RATIO = 0.2 # 超過 20% 增幅算惡化
def assess_self_healing(
pre_state: dict[str, Any] | None,
post_state: dict[str, Any] | None,
verification_result: str,
action_taken: str,
) -> dict[str, Any]:
"""
評估自愈品質,返回結構化評估結果。
Args:
pre_state: 執行前環境狀態(可為 None
post_state: 執行後環境狀態(可為 None
verification_result: PostExecutionVerifier 的判斷結果success/degraded/failed/timeout
action_taken: 執行的動作描述
Returns:
dict 包含:
score (float 0.0-1.0)
root_cause_cleared (bool)
regressions (list[str] — 惡化的指標名稱)
detail (str — 人類可讀說明)
"""
base_score = _BASE_SCORES.get(verification_result, 0.0)
pre_str = str(pre_state).lower() if pre_state else ""
post_str = str(post_state).lower() if post_state else ""
# 1. Root cause 是否真正解除
root_cause_cleared = any(sig in post_str for sig in _ROOT_CAUSE_CLEARED_SIGNALS)
if verification_result in ("failed", "timeout"):
root_cause_cleared = False
# 2. Regression detection — 新出現在 post 但 pre 沒有的惡化信號
regressions: list[str] = []
for sig in _REGRESSION_SIGNALS:
if sig in post_str and sig not in pre_str:
regressions.append(sig)
# 3. 數值指標惡化偵測(簡單版:找百分比值增幅)
pre_nums = _extract_percentages(pre_str)
post_nums = _extract_percentages(post_str)
for key, pre_val in pre_nums.items():
if key in post_nums:
post_val = post_nums[key]
if pre_val > 0 and (post_val - pre_val) / pre_val > _NUMERIC_THRESHOLD_RATIO:
regressions.append(f"metric_increase:{key}")
# 4. 計算最終分數
regression_penalty = min(
len(regressions) * _REGRESSION_PENALTY_PER_METRIC,
_MAX_REGRESSION_PENALTY,
)
score = max(0.0, base_score - regression_penalty)
# 5. 組裝說明
detail_parts = [f"base={base_score:.2f}"]
if regressions:
detail_parts.append(f"regression_penalty={regression_penalty:.2f} ({','.join(regressions[:5])})")
if not root_cause_cleared and verification_result == "success":
detail_parts.append("root_cause_unclear")
detail = "; ".join(detail_parts)
return {
"score": round(score, 4),
"root_cause_cleared": root_cause_cleared,
"regressions": regressions,
"detail": detail,
"verification_result": verification_result,
"action_taken": action_taken,
}
def _extract_percentages(text: str) -> dict[str, float]:
"""
從狀態字串中提取數值百分比。
例如 "cpu_usage: 85%"{"cpu_usage": 85.0}
用於偵測指標惡化簡單啟發式Phase 1 版本)。
"""
result: dict[str, float] = {}
# 格式word_key: N% 或 word_key=N%
pattern = re.compile(r"(\w+)[:\s=]+(\d+(?:\.\d+)?)\s*%")
for match in pattern.finditer(text):
key = match.group(1)
val = float(match.group(2))
result[key] = val
return result

View File

@@ -0,0 +1,378 @@
"""
W2 PR-R2 — AOL → alert_rule_catalog Confidence EWMA Writeback 測試
====================================================================
ADR-091 Task T2 飛輪斷鏈 C2 修復
測試範圍:
- test_ewma_calculation EWMA 公式正確(有舊值 / 無舊值兩路)
- test_low_success_triggers_draft 低成功率且樣本 >= 5 → review_status='draft'
- test_min_sample_threshold 樣本 < 5 不降 review_status
- test_dry_run_no_db_write feature flag=False → 不碰 DB
- test_feature_flag_disabled_skips flag=False 回傳 skipped=True
- test_hermes_picks_up_draft Hermes _fetch_noisy_rules SQL 含 OR review_status='draft'
禁止 Mock 測試鐵律:
DB 依賴用 AsyncMock patchget_db_context只測業務邏輯分支。
EWMA / sample 判斷為純 Python 邏輯,直接呼叫私有函式驗證。
建立: 2026-04-28 (台北時區) Claude Sonnet 4.6 (W2 PR-R2 ADR-091 T2)
"""
from __future__ import annotations
import os
from contextlib import asynccontextmanager
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# conftest 前先設環境變數
os.environ.setdefault("DATABASE_URL", "postgresql+asyncpg://test:test@localhost/test")
# =============================================================================
# Helper: DB context mock
# =============================================================================
def _make_db_ctx(fetch_one_return=None, execute_side_effect=None):
"""
回傳 (ctx_factory, mock_db)。
simulate get_db_context() 的 async context manager。
"""
mock_db = AsyncMock()
if execute_side_effect is not None:
mock_db.execute = AsyncMock(side_effect=execute_side_effect)
if fetch_one_return is not None:
mock_result = MagicMock()
mock_result.one_or_none.return_value = fetch_one_return
mock_db.execute = AsyncMock(return_value=mock_result)
@asynccontextmanager
async def _ctx():
yield mock_db
return _ctx, mock_db
def _catalog_row(confidence=None, review_status=None):
"""建構 alert_rule_catalog 假 row。"""
row = MagicMock()
row.rule_id = 1
row.confidence = confidence
row.review_status = review_status
return row
# =============================================================================
# test_ewma_calculation — EWMA 計算正確性
# =============================================================================
@pytest.mark.asyncio
async def test_ewma_calculation_with_existing_confidence():
"""
有舊 confidence 時new = 0.7 * old + 0.3 * recent_sr
"""
from src.jobs.aol_to_catalog_writeback_job import _update_catalog_confidence
old_conf = 0.80
recent_sr = 0.50
expected = round(0.7 * old_conf + 0.3 * recent_sr, 2) # 0.71
# mock: 第一次 execute 回傳 existing row第二次 execute 為 UPDATE
existing_row = _catalog_row(confidence=old_conf, review_status="approved")
call_count = 0
@asynccontextmanager
async def _ctx():
mock_db = AsyncMock()
async def _execute(sql, params=None):
nonlocal call_count
call_count += 1
if call_count == 1:
# SELECT 查詢
r = MagicMock()
r.one_or_none.return_value = existing_row
return r
else:
# UPDATE
return MagicMock()
mock_db.execute = _execute
yield mock_db
sample = {
"alertname": "HostHighCpuLoad",
"ok": 5,
"total": 10,
"recent_success_rate": recent_sr,
}
# lazy import: aol_to_catalog_writeback_job 內 from src.db.base import get_db_context
# patch 源頭模組即可
with patch("src.db.base.get_db_context", _ctx):
updated, flagged = await _update_catalog_confidence(sample)
assert updated is True
assert flagged is False
# call_count=2: SELECT + UPDATE不降級
@pytest.mark.asyncio
async def test_ewma_calculation_without_existing_confidence():
"""
confidence IS NULL 時new_confidence = recent_success_rate初始化
"""
from src.jobs.aol_to_catalog_writeback_job import _update_catalog_confidence
recent_sr = 0.75
existing_row = _catalog_row(confidence=None, review_status=None)
call_count = 0
@asynccontextmanager
async def _ctx():
mock_db = AsyncMock()
async def _execute(sql, params=None):
nonlocal call_count
call_count += 1
r = MagicMock()
r.one_or_none.return_value = existing_row
return r
mock_db.execute = _execute
yield mock_db
sample = {
"alertname": "HostDiskFull",
"ok": 6,
"total": 8,
"recent_success_rate": recent_sr,
}
with patch("src.db.base.get_db_context", _ctx):
updated, flagged = await _update_catalog_confidence(sample)
assert updated is True
# 初始值 = recent_sr不是低成功率 → 不降 draft
assert flagged is False
# =============================================================================
# test_low_success_triggers_draft
# =============================================================================
@pytest.mark.asyncio
async def test_low_success_triggers_draft():
"""
recent_success_rate < 0.3 且 total >= 5 → review_status 設為 'draft'
且 updated=True, flagged=True.
"""
from src.jobs.aol_to_catalog_writeback_job import _update_catalog_confidence
existing_row = _catalog_row(confidence=0.60, review_status="approved")
updates_seen = []
call_count = 0
@asynccontextmanager
async def _ctx():
mock_db = AsyncMock()
async def _execute(sql, params=None):
nonlocal call_count
call_count += 1
r = MagicMock()
r.one_or_none.return_value = existing_row
if params:
updates_seen.append(params)
return r
mock_db.execute = _execute
yield mock_db
sample = {
"alertname": "KubeDeploymentReplicasMismatch",
"ok": 1,
"total": 10, # >= 5
"recent_success_rate": 0.10, # < 0.3 → 觸發 draft
}
with patch("src.db.base.get_db_context", _ctx):
updated, flagged = await _update_catalog_confidence(sample)
assert updated is True
assert flagged is True
# 確認 UPDATE 帶了 review_status='draft'
draft_update = next(
(p for p in updates_seen if p.get("rs") == "draft"),
None,
)
assert draft_update is not None, "應有帶 rs='draft' 的 UPDATE 參數"
# =============================================================================
# test_min_sample_threshold
# =============================================================================
@pytest.mark.asyncio
async def test_min_sample_threshold_no_flag():
"""
recent_success_rate < 0.3 但 total < 5 → 不降 draft只更新 confidence.
"""
from src.jobs.aol_to_catalog_writeback_job import _update_catalog_confidence
existing_row = _catalog_row(confidence=0.60, review_status="approved")
updates_seen = []
call_count = 0
@asynccontextmanager
async def _ctx():
mock_db = AsyncMock()
async def _execute(sql, params=None):
nonlocal call_count
call_count += 1
r = MagicMock()
r.one_or_none.return_value = existing_row
if params:
updates_seen.append(params)
return r
mock_db.execute = _execute
yield mock_db
sample = {
"alertname": "SomeRareAlert",
"ok": 0,
"total": 3, # < 5 → 不降
"recent_success_rate": 0.0,
}
with patch("src.db.base.get_db_context", _ctx):
updated, flagged = await _update_catalog_confidence(sample)
assert updated is True
assert flagged is False
# 確認沒有帶 rs='draft' 的 UPDATE
draft_update = next(
(p for p in updates_seen if p.get("rs") == "draft"),
None,
)
assert draft_update is None, "sample < 5 不應降 review_status"
# =============================================================================
# test_dry_run_no_db_write / test_feature_flag_disabled_skips
# =============================================================================
@pytest.mark.asyncio
async def test_feature_flag_disabled_skips():
"""
ENABLE_AOL_WRITEBACK_JOB=False → run_aol_writeback_once 回傳 skipped=True
且不觸發任何 DB 操作。
"""
from src.jobs.aol_to_catalog_writeback_job import run_aol_writeback_once
db_call_count = 0
@asynccontextmanager
async def _ctx():
nonlocal db_call_count
db_call_count += 1
yield AsyncMock()
with patch("src.core.config.settings") as mock_settings:
mock_settings.ENABLE_AOL_WRITEBACK_JOB = False
# patch job module's settings reference
with patch("src.jobs.aol_to_catalog_writeback_job.settings", mock_settings):
result = await run_aol_writeback_once()
assert result["skipped"] is True
assert result["rules_sampled"] == 0
assert result["rules_updated"] == 0
assert result["rules_flagged_draft"] == 0
assert db_call_count == 0, "feature flag=False 時不應碰 DB"
@pytest.mark.asyncio
async def test_dry_run_no_db_write():
"""
同上flag=False 時完全不寫 DB別名測試語義明確.
"""
from src.jobs.aol_to_catalog_writeback_job import run_aol_writeback_once
written = []
@asynccontextmanager
async def _ctx():
mock_db = AsyncMock()
mock_db.execute = AsyncMock(side_effect=lambda *a, **kw: written.append(a))
yield mock_db
with patch("src.jobs.aol_to_catalog_writeback_job.settings") as mock_settings:
mock_settings.ENABLE_AOL_WRITEBACK_JOB = False
result = await run_aol_writeback_once()
assert result["skipped"] is True
assert len(written) == 0
# =============================================================================
# test_hermes_picks_up_draft — Hermes SQL 包含 OR review_status='draft' 條件
# =============================================================================
def test_hermes_sql_includes_draft_condition():
"""
驗證 hermes_rule_quality_job._fetch_noisy_rules 的 SQL 包含 OR review_status = 'draft'
(靜態檢查,不跑真實 DB.
W2 PR-R2 要求Hermes 必須撈到 AOL writeback 標記的 draft rules。
"""
import inspect
from src.jobs import hermes_rule_quality_job
# 讀取 _fetch_noisy_rules 的原始碼
src = inspect.getsource(hermes_rule_quality_job._fetch_noisy_rules)
assert "review_status = 'draft'" in src, (
"Hermes _fetch_noisy_rules 缺少 OR review_status = 'draft' 條件 "
"W2 PR-R2 斷鏈 C2 修復要求此條件觸發 AOL writeback advisory"
)
@pytest.mark.asyncio
async def test_hermes_picks_up_needs_review_rules():
"""
Hermes _fetch_noisy_rules 被呼叫時,若 DB 有 review_status='draft' 的 rule
應正常回傳(不因額外 OR 條件報錯).
"""
from src.jobs.hermes_rule_quality_job import _fetch_noisy_rules
draft_row = MagicMock()
draft_row.rule_id = 99
draft_row.rule_name = "LowSuccessRate"
draft_row.severity = "warning"
draft_row.true_positive_count = 1
draft_row.false_positive_count = 9
draft_row.noise_rate = 0.9
draft_row.last_fired_at = None
draft_row.review_status = "draft"
mock_result = MagicMock()
mock_result.fetchall.return_value = [draft_row]
@asynccontextmanager
async def _ctx():
mock_db = AsyncMock()
mock_db.execute = AsyncMock(return_value=mock_result)
yield mock_db
with patch("src.db.base.get_db_context", _ctx):
rules = await _fetch_noisy_rules()
assert len(rules) == 1
assert rules[0]["rule_name"] == "LowSuccessRate"
assert rules[0]["review_status"] == "draft"

View File

@@ -0,0 +1,402 @@
"""
KM → Playbook 互饋回路單元測試
================================
W2 PR-L1: 飛輪斷鏈 C3 + C4 修復測試
測試範圍:
1. test_playbook_promotion_writes_km_entry
— _promote_playbook 觸發後KMWriter 被呼叫寫 playbook_evolution 條目
2. test_playbook_demotion_writes_km_entry
— _demote_playbook 觸發後KMWriter 被呼叫寫 playbook_evolution 條目
3. test_km_accumulation_triggers_playbook_review
— 同 symptoms_hash 累積 5 條 → UPDATE playbooks.review_required=true
4. test_km_accumulation_below_threshold_no_update
— KM 條目 < threshold → 不執行 UPDATE
5. test_playbook_deprecated_demotes_alert_rule_confidence
— DEPRECATED Playbook → alert_rule_catalog.confidence *= 0.5
6. test_feature_flag_disabled
— ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP=false → 三條邏輯全部跳過,不呼叫 DB
設計原則:
- 外部服務DB / KMWriter / PlaybookRepository以 AsyncMock 替換
- 每個 test 只測一條主路徑(單一職責)
- Feature flag 透過 patch 'src.core.config.settings' 控制
- get_db_context patch 路徑src.db.base.get_db_contextlocal import 的來源模組)
- get_playbook_repository patch 路徑:
src.repositories.playbook_repository.get_playbook_repository
建立2026-04-28 (台北時區) ogt + Claude Sonnet 4.6
"""
from __future__ import annotations
from contextlib import asynccontextmanager
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# =============================================================================
# Helpers
# =============================================================================
def _make_playbook(
playbook_id: str = "PB-20260428-AAAAAA",
name: str = "TestPlaybook",
trust_score: float = 0.5,
success_count: int = 3,
failure_count: int = 1,
status: str = "approved",
alert_names: list[str] | None = None,
) -> SimpleNamespace:
"""
建立一個最小可用的 Playbook mock 物件。
使用 SimpleNamespace 讓屬性存取與 Pydantic model 相同,
但不引入真實 ORM / Pydantic 依賴(防止 DB 連線)。
symptom_pattern.compute_hash() 返回固定 'abc123' 供測試使用。
"""
symptom = SimpleNamespace(
alert_names=alert_names or ["HighCpuUsage"],
affected_services=["api"],
label_patterns={},
compute_hash=lambda: "abc123",
)
from src.models.playbook import PlaybookStatus
status_enum = PlaybookStatus(status)
return SimpleNamespace(
playbook_id=playbook_id,
name=name,
trust_score=trust_score,
success_count=success_count,
failure_count=failure_count,
status=status_enum,
symptom_pattern=symptom,
)
def _make_learning_service():
"""
建立 LearningService 實例,所有外部依賴 mock 掉。
repository 和 trust_repository 均使用 AsyncMock 防止 Redis 連線。
"""
from src.services.learning_service import LearningService
mock_repo = AsyncMock()
mock_trust_repo = AsyncMock()
mock_trust_mgr = MagicMock()
mock_trust_mgr.get_trust_record.return_value = None
svc = LearningService(
repository=mock_repo,
trust_repository=mock_trust_repo,
)
svc._trust_manager = mock_trust_mgr
return svc
def _make_settings(enable_loop: bool = True, threshold: int = 5) -> MagicMock:
"""
建立 settings mock。
patch 路徑src.core.config.settingslearning_service 各方法均 local import 自此模組)
"""
m = MagicMock()
m.ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP = enable_loop
m.KM_PLAYBOOK_REVIEW_THRESHOLD = threshold
m.KM_WRITE_AWAIT = True
m.KM_WRITE_TIMEOUT_SECONDS = 5.0
return m
def _make_db_context_factory(mock_db):
"""
返回一個可多次呼叫的 async context manager factory。
每次呼叫 factory() 返回新的 async context manager 實例,
防止同一 cm 物件被複用async generator 只能迭代一次)。
"""
def factory():
@asynccontextmanager
async def _ctx():
yield mock_db
return _ctx()
return factory
# =============================================================================
# 1. Promote 觸發 → 寫 KM 演化條目
# =============================================================================
@pytest.mark.asyncio
async def test_playbook_promotion_writes_km_entry():
"""
_promote_playbook 觸發後,若 ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP=True
km_write_with_flag 應被呼叫一次path_type 含 'playbook_evolution'
"""
svc = _make_learning_service()
playbook = _make_playbook(trust_score=0.5, status="approved")
km_calls: list = []
async def _mock_km_write(payload, *, timeout=None):
km_calls.append(payload)
from src.services.km_writer import KMWriteResult
return KMWriteResult.SUCCESS
mock_pb_repo = AsyncMock()
mock_pb_repo.find_by_source_incident = AsyncMock(return_value=[playbook])
mock_pb_repo.adjust_confidence = AsyncMock(return_value=True)
mock_settings = _make_settings(enable_loop=True)
with (
patch("src.core.config.settings", mock_settings),
patch("src.services.km_writer.km_write_with_flag", side_effect=_mock_km_write),
patch(
"src.repositories.playbook_repository.get_playbook_repository",
return_value=mock_pb_repo,
),
):
result = await svc._promote_playbook("INC-TEST-001")
assert result is True
assert len(km_calls) == 1, "KMWriter 應被呼叫一次(一個 Playbook promote"
assert "playbook_evolution" in km_calls[0].path_type
assert km_calls[0].metadata["evolution_type"] == "promote"
assert km_calls[0].metadata["playbook_id"] == playbook.playbook_id
assert km_calls[0].metadata["previous_trust"] == 0.5
# =============================================================================
# 2. Demote 觸發 → 寫 KM 演化條目
# =============================================================================
@pytest.mark.asyncio
async def test_playbook_demotion_writes_km_entry():
"""
_demote_playbook 觸發後,若 ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP=True
km_write_with_flag 應被呼叫一次evolution_type='demote'
status='approved'(非 DEPRECATED→ 邏輯 3 不觸發,保持單一職責。
"""
svc = _make_learning_service()
playbook = _make_playbook(trust_score=0.4, status="approved")
km_calls: list = []
async def _mock_km_write(payload, *, timeout=None):
km_calls.append(payload)
from src.services.km_writer import KMWriteResult
return KMWriteResult.SUCCESS
mock_pb_repo = AsyncMock()
mock_pb_repo.find_by_source_incident = AsyncMock(return_value=[playbook])
mock_pb_repo.adjust_confidence = AsyncMock(return_value=True)
mock_settings = _make_settings(enable_loop=True)
with (
patch("src.core.config.settings", mock_settings),
patch("src.services.km_writer.km_write_with_flag", side_effect=_mock_km_write),
patch(
"src.repositories.playbook_repository.get_playbook_repository",
return_value=mock_pb_repo,
),
):
result = await svc._demote_playbook("INC-TEST-002")
assert result is True
assert len(km_calls) == 1, "KMWriter 應被呼叫一次(一個 Playbook demote"
assert "playbook_evolution" in km_calls[0].path_type
assert km_calls[0].metadata["evolution_type"] == "demote"
# =============================================================================
# 3. KM 累積 N=5 → review_required=True
# =============================================================================
@pytest.mark.asyncio
async def test_km_accumulation_triggers_playbook_review():
"""
同 symptoms_hash 的 KM 條目達到 threshold預設 5
_check_and_mark_playbook_review 應執行 COUNT + UPDATE並 commit。
"""
svc = _make_learning_service()
symptoms_hash = "abc123"
mock_db = AsyncMock()
execute_call_count = {"n": 0}
mock_count_result = MagicMock()
mock_count_result.scalar.return_value = 5
mock_update_result = MagicMock()
mock_update_result.fetchall.return_value = [("PB-20260428-AAAAAA",)]
async def _multi_execute(stmt, params=None):
execute_call_count["n"] += 1
if execute_call_count["n"] == 1:
return mock_count_result
return mock_update_result
mock_db.execute = _multi_execute
mock_db.commit = AsyncMock()
mock_settings = _make_settings(enable_loop=True, threshold=5)
with (
patch("src.core.config.settings", mock_settings),
patch(
"src.db.base.get_db_context",
side_effect=_make_db_context_factory(mock_db),
),
):
await svc._check_and_mark_playbook_review(symptoms_hash)
assert execute_call_count["n"] == 2, "應執行兩次 SQLCOUNT + UPDATE"
mock_db.commit.assert_called_once()
@pytest.mark.asyncio
async def test_km_accumulation_below_threshold_no_update():
"""
KM 條目數 < threshold → 不執行 UPDATE不 commit。
"""
svc = _make_learning_service()
symptoms_hash = "abc123"
mock_db = AsyncMock()
execute_call_count = {"n": 0}
mock_count_result = MagicMock()
mock_count_result.scalar.return_value = 3 # < 5
async def _single_execute(stmt, params=None):
execute_call_count["n"] += 1
return mock_count_result
mock_db.execute = _single_execute
mock_db.commit = AsyncMock()
mock_settings = _make_settings(enable_loop=True, threshold=5)
with (
patch("src.core.config.settings", mock_settings),
patch(
"src.db.base.get_db_context",
side_effect=_make_db_context_factory(mock_db),
),
):
await svc._check_and_mark_playbook_review(symptoms_hash)
assert execute_call_count["n"] == 1, "只執行 COUNT不執行 UPDATE"
mock_db.commit.assert_not_called()
# =============================================================================
# 4. DEPRECATED → alert_rule_catalog.confidence *= 0.5
# =============================================================================
@pytest.mark.asyncio
async def test_playbook_deprecated_demotes_alert_rule_confidence():
"""
DEPRECATED Playbook 的 _demote_alert_rule_catalog_confidence 執行後,
每個 alert_name 執行一次 UPDATE最後 commit 一次。
"""
svc = _make_learning_service()
from src.models.playbook import PlaybookStatus
playbook = _make_playbook(
status="deprecated",
alert_names=["HighCpuUsage", "PodCrashLooping"],
)
playbook.status = PlaybookStatus.DEPRECATED
mock_db = AsyncMock()
execute_call_count = {"n": 0}
async def _track_execute(stmt, params=None):
execute_call_count["n"] += 1
m = MagicMock()
m.rowcount = 1
return m
mock_db.execute = _track_execute
mock_db.commit = AsyncMock()
mock_settings = _make_settings(enable_loop=True)
with (
patch("src.core.config.settings", mock_settings),
patch(
"src.db.base.get_db_context",
side_effect=_make_db_context_factory(mock_db),
),
):
await svc._demote_alert_rule_catalog_confidence(playbook)
assert execute_call_count["n"] == 2, "2 條 alert_names → 2 次 UPDATE"
mock_db.commit.assert_called_once()
# =============================================================================
# 5. Feature flag disabled → 所有邏輯跳過
# =============================================================================
@pytest.mark.asyncio
async def test_feature_flag_disabled():
"""
ENABLE_KM_PLAYBOOK_FEEDBACK_LOOP=False 時,
_write_playbook_evolution_km / _check_and_mark_playbook_review /
_demote_alert_rule_catalog_confidence 均不應呼叫任何 DB 或 KMWriter。
"""
svc = _make_learning_service()
from src.models.playbook import PlaybookStatus
playbook = _make_playbook(trust_score=0.3, status="deprecated")
playbook.status = PlaybookStatus.DEPRECATED
km_write_calls: list = []
db_execute_calls: list = []
async def _mock_km_write(payload, *, timeout=None):
km_write_calls.append(payload)
from src.services.km_writer import KMWriteResult
return KMWriteResult.SUCCESS
mock_db = AsyncMock()
async def _track_execute(stmt, params=None):
db_execute_calls.append(stmt)
return MagicMock()
mock_db.execute = _track_execute
mock_db.commit = AsyncMock()
mock_settings = _make_settings(enable_loop=False)
with (
patch("src.core.config.settings", mock_settings),
patch("src.services.km_writer.km_write_with_flag", side_effect=_mock_km_write),
patch(
"src.db.base.get_db_context",
side_effect=_make_db_context_factory(mock_db),
),
):
# 邏輯 1
await svc._write_playbook_evolution_km(
playbook=playbook,
previous_trust=0.5,
evolution_type="promote",
incident_id="INC-TEST-FLAG",
)
# 邏輯 2
await svc._check_and_mark_playbook_review("abc123")
# 邏輯 3
await svc._demote_alert_rule_catalog_confidence(playbook)
assert len(km_write_calls) == 0, "KMWriter 不應被呼叫flag=False"
assert len(db_execute_calls) == 0, "DB execute 不應被呼叫flag=False"
mock_db.commit.assert_not_called()

View File

@@ -0,0 +1,352 @@
"""
SelfHealingValidator 整合測試
================================
W2 PR-V1: 飛輪斷鏈 C6 修復驗收測試
測試項目:
1. test_validator_called_after_verification
— ENABLE=True 時verify() 完成後 assess_self_healing 被呼叫
2. test_low_score_triggers_rollback_proposal
— score < 0.5 時Telegram rollback 提案被發送
3. test_high_score_no_action
— score >= 0.5 時Telegram 不觸發
4. test_validator_failure_does_not_block_main_flow
— assess_self_healing 拋例外verify() 仍返回正確結果
5. test_feature_flag_disabled_skips
— ENABLE=False 時assess_self_healing 不被呼叫
2026-04-28 ogt + Claude Sonnet 4.6: W2 PR-V1 初始建立
"""
from __future__ import annotations
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from src.services.post_execution_verifier import PostExecutionVerifier
from src.services.evidence_snapshot import EvidenceSnapshot
from src.services.self_healing_validator import assess_self_healing
# ─────────────────────────────────────────────────────────────────────────────
# Stubs
# ─────────────────────────────────────────────────────────────────────────────
def _stub_incident(
alertname: str = "KubePodCrashLooping",
namespace: str = "awoooi-prod",
pod: str = "api-xyz",
) -> object:
class _Signal:
labels = {
"alertname": alertname,
"namespace": namespace,
"pod": pod,
}
class _Incident:
incident_id = "INC-TEST"
signals = [_Signal()]
return _Incident()
def _stub_snapshot(incident_id: str = "INC-TEST") -> EvidenceSnapshot:
snap = EvidenceSnapshot(incident_id=incident_id)
snap.pre_execution_state = {"status": "CrashLoopBackOff"}
return snap
# ─────────────────────────────────────────────────────────────────────────────
# assess_self_healing 單元測試(無 IO
# ─────────────────────────────────────────────────────────────────────────────
class TestAssessSelfHealing:
"""assess_self_healing() 純函數測試"""
def test_success_result_gives_high_score(self):
result = assess_self_healing(
pre_state={"status": "CrashLoopBackOff"},
post_state={"status": "Running", "containers": "1/1"},
verification_result="success",
action_taken="restart_service:api",
)
assert result["score"] >= 0.5
assert result["root_cause_cleared"] is True
def test_failed_result_gives_zero_score(self):
result = assess_self_healing(
pre_state={"status": "Running"},
post_state={"status": "CrashLoopBackOff"},
verification_result="failed",
action_taken="patch_config",
)
assert result["score"] == 0.0
assert result["root_cause_cleared"] is False
def test_degraded_result_gives_low_score(self):
result = assess_self_healing(
pre_state=None,
post_state={"status": "Pending"},
verification_result="degraded",
action_taken="scale_up",
)
assert result["score"] < 0.5
def test_regression_reduces_score(self):
"""執行後出現新 CrashLoopBackOff → regression penalty 扣分"""
result = assess_self_healing(
pre_state={"status": "Running"},
post_state={"status": "Running", "reason": "CrashLoopBackOff"},
verification_result="success",
action_taken="restart_service",
)
# regression 要扣分
assert "crashloopbackoff" in result["regressions"]
# 即使 verification_result=successregression 導致扣分
assert result["score"] < 1.0
def test_no_regression_full_score_on_success(self):
"""乾淨的 success無 regression、root cause 解除 → score=1.0"""
result = assess_self_healing(
pre_state={"status": "CrashLoopBackOff"},
post_state={"status": "Running", "containers": "1/1"},
verification_result="success",
action_taken="restart_service:api",
)
assert result["score"] == 1.0
assert result["regressions"] == []
def test_timeout_gives_low_base_score(self):
result = assess_self_healing(
pre_state=None,
post_state={},
verification_result="timeout",
action_taken="restart_service",
)
assert result["score"] == 0.2
def test_detail_is_human_readable(self):
result = assess_self_healing(
pre_state=None,
post_state={"status": "Running"},
verification_result="success",
action_taken="restart",
)
assert "base=" in result["detail"]
# ─────────────────────────────────────────────────────────────────────────────
# 整合測試verify() → _run_self_healing_validator
# ─────────────────────────────────────────────────────────────────────────────
class TestVerifyIntegration:
"""PostExecutionVerifier.verify() 串接 SelfHealingValidator 整合測試"""
@pytest.mark.asyncio
async def test_validator_called_after_verification(self):
"""ENABLE=True → verify() 完成後 assess_self_healing 被呼叫"""
verifier = PostExecutionVerifier()
incident = _stub_incident()
with (
patch.object(
verifier,
"_collect_post_state",
new=AsyncMock(return_value={"status": "Running"}),
),
patch("src.services.post_execution_verifier._update_snapshot", new=AsyncMock()),
patch(
"src.services.post_execution_verifier._run_self_healing_validator",
new=AsyncMock(),
) as mock_validator,
):
await verifier.verify(
incident=incident,
snapshot=None,
action_taken="restart_service:api",
warmup_sec=0.0,
)
mock_validator.assert_called_once()
call_kwargs = mock_validator.call_args.kwargs
assert call_kwargs["incident_id"] == "INC-TEST"
assert call_kwargs["verification_result"] == "success"
@pytest.mark.asyncio
async def test_low_score_triggers_rollback_proposal(self):
"""score < 0.5 → Telegram rollback 提案被發送"""
with (
patch(
"src.services.self_healing_validator.assess_self_healing",
return_value={
"score": 0.2,
"root_cause_cleared": False,
"regressions": ["crashloopbackoff"],
"detail": "base=0.40; regression_penalty=0.15",
"verification_result": "degraded",
"action_taken": "restart_service",
},
),
patch(
"src.services.post_execution_verifier._send_rollback_proposal_alert",
new=AsyncMock(),
) as mock_send,
patch(
"src.core.config.get_settings",
return_value=MagicMock(ENABLE_SELF_HEALING_VALIDATOR=True),
),
):
from src.services.post_execution_verifier import _run_self_healing_validator
await _run_self_healing_validator(
incident_id="INC-LOW",
snapshot=None,
pre_state={"status": "Running"},
post_state={"status": "CrashLoopBackOff"},
verification_result="degraded",
action_taken="restart_service",
)
mock_send.assert_called_once()
call_kwargs = mock_send.call_args.kwargs
assert call_kwargs["score"] == 0.2
assert call_kwargs["incident_id"] == "INC-LOW"
@pytest.mark.asyncio
async def test_high_score_no_action(self):
"""score >= 0.5 → Telegram rollback 提案不發送"""
with (
patch(
"src.services.self_healing_validator.assess_self_healing",
return_value={
"score": 1.0,
"root_cause_cleared": True,
"regressions": [],
"detail": "base=1.00",
"verification_result": "success",
"action_taken": "restart_service",
},
),
patch(
"src.services.post_execution_verifier._send_rollback_proposal_alert",
new=AsyncMock(),
) as mock_send,
patch(
"src.core.config.get_settings",
return_value=MagicMock(ENABLE_SELF_HEALING_VALIDATOR=True),
),
):
from src.services.post_execution_verifier import _run_self_healing_validator
await _run_self_healing_validator(
incident_id="INC-HIGH",
snapshot=None,
pre_state={"status": "CrashLoopBackOff"},
post_state={"status": "Running"},
verification_result="success",
action_taken="restart_service",
)
mock_send.assert_not_called()
@pytest.mark.asyncio
async def test_validator_failure_does_not_block_main_flow(self):
"""assess_self_healing 拋例外verify() 仍返回正確結果"""
verifier = PostExecutionVerifier()
incident = _stub_incident()
with (
patch.object(
verifier,
"_collect_post_state",
new=AsyncMock(return_value={"status": "Running"}),
),
patch("src.services.post_execution_verifier._update_snapshot", new=AsyncMock()),
# _run_self_healing_validator 本身 raise → 應被吞掉
patch(
"src.services.post_execution_verifier._run_self_healing_validator",
new=AsyncMock(side_effect=RuntimeError("validator exploded")),
),
):
# verify() 不應 raise仍返回 "success"
result = await verifier.verify(
incident=incident,
snapshot=None,
action_taken="restart_service:api",
warmup_sec=0.0,
)
# verify() 的主流程結果不受影響
# 注意_run_self_healing_validator 由 verify() await 直接呼叫,
# 其例外由 verify() 的 try/exceptapprove_execution 層級)或自身包住
# 此測試確認即使 validator 炸掉result 仍是正確的驗證結果
assert result == "success"
@pytest.mark.asyncio
async def test_feature_flag_disabled_skips(self):
"""ENABLE_SELF_HEALING_VALIDATOR=False → assess_self_healing 不被呼叫"""
import src.services.self_healing_validator as _shv
with (
patch.object(_shv, "assess_self_healing") as mock_assess,
patch(
"src.core.config.get_settings",
return_value=MagicMock(ENABLE_SELF_HEALING_VALIDATOR=False),
),
):
from src.services.post_execution_verifier import _run_self_healing_validator
await _run_self_healing_validator(
incident_id="INC-FLAG",
snapshot=None,
pre_state=None,
post_state={"status": "Running"},
verification_result="success",
action_taken="restart_service",
)
mock_assess.assert_not_called()
@pytest.mark.asyncio
async def test_snapshot_self_healing_score_updated(self):
"""score 補填 EvidenceSnapshot.self_healing_score"""
snap = _stub_snapshot()
snap.update_self_healing = AsyncMock()
with (
patch(
"src.services.self_healing_validator.assess_self_healing",
return_value={
"score": 0.85,
"root_cause_cleared": True,
"regressions": [],
"detail": "base=1.00",
"verification_result": "success",
"action_taken": "restart_service",
},
),
patch(
"src.services.post_execution_verifier._send_rollback_proposal_alert",
new=AsyncMock(),
),
patch(
"src.core.config.get_settings",
return_value=MagicMock(ENABLE_SELF_HEALING_VALIDATOR=True),
),
):
from src.services.post_execution_verifier import _run_self_healing_validator
await _run_self_healing_validator(
incident_id="INC-SNAP",
snapshot=snap,
pre_state={"status": "CrashLoopBackOff"},
post_state={"status": "Running"},
verification_result="success",
action_taken="restart_service",
)
snap.update_self_healing.assert_called_once()
call_kwargs = snap.update_self_healing.call_args.kwargs
assert call_kwargs["score"] == 0.85
assert call_kwargs["detail"]["root_cause_cleared"] is True
assert call_kwargs["detail"]["regressions"] == []