feat(flywheel): W1 PR-R1 規則→Playbook 遷移 + PR-K1 timeline 防禦 ALTER
Some checks failed
run-migration / migrate (push) Failing after 12s
Type Sync Check / check-type-sync (push) Successful in 1m25s
CD Pipeline / build-and-deploy (push) Failing after 1m48s

W1 第二波:onboarder 飛輪 80→90 路徑剩餘兩件 PR。

## PR-R1 — 25 條 yaml 規則 → DRAFT Playbook 遷移

斷鏈背景(onboarder C2):alert_rules.yaml 25 條規則 68% 寫死 RESTART,
沒有對應 Playbook → RAG 永遠 generic_fallback → 規則命中率沒回饋給 catalog。

修法:
- 新建 services/rule_to_playbook_migrator.py
  - 自動從 alert_rules.yaml 解析每條 rule
  - 產生 PlaybookRecord(status=DRAFT, ai_confidence=0.3, source=YAML_RULE)
  - 誠實標示信心 0.3(非假 1.0,違反 feedback_confidence_truthfulness)
  - INSERT ON CONFLICT 冪等(name LIKE 'AutoMigrated: %' 去重,不擾動 seed)
- 新建 scripts/migrate_rules_to_playbooks.py(CLI: --dry-run/--commit/--disable-flag)
- ENABLE_RULE_MIGRATION_DRAFT=true(rollback flag)
- 23 測試覆蓋(parse / build_dict / idempotent / dry_run / action_type /
  severity_map / feature_flag / wildcard_filter / partial_existing 等)

## PR-K1 — timeline_events 防禦性 ALTER(db-expert finding)

任務原前提錯誤:onboarder 報告的 C7 斷鏈(incident_id 欄位)在
2026-04-24 P1.6 已修復 ORM。但生產環境若在 P1.6 前已建表,create_all 跳過
已存在的表 → ORM 寫入 SELECT 仍可能找不到 column。

修法:
- db/base.py:init_db() 補防禦性 ALTER:
  ALTER TABLE timeline_events ADD COLUMN IF NOT EXISTS incident_id VARCHAR(64);
  CREATE INDEX IF NOT EXISTS ix_timeline_incident_id ON timeline_events(incident_id);
- IF NOT EXISTS 為 no-op 安全(已有 column 不做事)
- stage 欄位是任務描述的幻覺(codebase 0 writer),不新增

未做:
- alembic migration(專案不用 alembic,遵循既有 init_db ALTER pattern)
- onboarder C7 在 ORM 層已修,本 commit 確保 prod schema 對齊

## 驗證
- 1608 unit tests 全綠(+23 from 1585)
- PR-R1 23 個測試獨立通過

## 期望影響
- 飛輪 RAG 終於有 25 條 DRAFT Playbook 可查 → +5 分
- prod schema 對齊保險 → 防 ORM SELECT 失敗

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Your Name
2026-04-29 10:49:25 +08:00
parent c5753e1c57
commit 681b5ac949
5 changed files with 1024 additions and 0 deletions

View File

@@ -0,0 +1,140 @@
#!/usr/bin/env python3
"""
migrate_rules_to_playbooks.py — 規則 → Playbook 遷移 CLI
=========================================================
將 alert_rules.yaml 中的 25 條規則遷移為 DRAFT Playbook讓飛輪 RAG 有資料可查。
用法:
# 預設 dry-run只印計畫不寫 DB
python scripts/migrate_rules_to_playbooks.py
# 指定 yaml 路徑
python scripts/migrate_rules_to_playbooks.py --yaml-path /path/to/alert_rules.yaml
# 真實寫入 DB
python scripts/migrate_rules_to_playbooks.py --commit
# 完整選項
python scripts/migrate_rules_to_playbooks.py --yaml-path alert_rules.yaml --commit
W1 PR-R1 — 規則 → Playbook 遷移
2026-04-28 ogt + Claude Sonnet 4.6
"""
from __future__ import annotations
import argparse
import asyncio
import os
import sys
from pathlib import Path
# 確保 apps/api/src 在 import path 中(從 scripts/ 執行時)
_SCRIPT_DIR = Path(__file__).parent
_API_ROOT = _SCRIPT_DIR.parent
sys.path.insert(0, str(_API_ROOT))
# 預設 yaml 路徑:相對 scripts/ 的上一層apps/api/alert_rules.yaml
_DEFAULT_YAML_PATH = _API_ROOT / "alert_rules.yaml"
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="將 alert_rules.yaml 遷移為 DRAFT Playbook飛輪 RAG 冷啟動)",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
範例:
python scripts/migrate_rules_to_playbooks.py # dry-run預設
python scripts/migrate_rules_to_playbooks.py --commit # 真實寫入
python scripts/migrate_rules_to_playbooks.py --yaml-path alert_rules.yaml --commit
""",
)
parser.add_argument(
"--yaml-path",
type=Path,
default=_DEFAULT_YAML_PATH,
help=f"alert_rules.yaml 路徑(預設: {_DEFAULT_YAML_PATH}",
)
parser.add_argument(
"--commit",
action="store_true",
default=False,
help="真實寫入 DB預設 dry-run僅印計畫",
)
parser.add_argument(
"--disable-flag",
action="store_true",
default=False,
help="模擬 ENABLE_RULE_MIGRATION_DRAFT=false測試 feature flag 關閉路徑)",
)
return parser.parse_args()
async def _run(args: argparse.Namespace) -> int:
"""
非同步主流程
Returns:
exit code (0=成功, 1=有錯誤)
"""
from src.services.rule_to_playbook_migrator import migrate_yaml_rules_to_playbooks
yaml_path: Path = args.yaml_path
dry_run: bool = not args.commit
enable_migration: bool = not args.disable_flag
# 讀取 feature flag環境變數優先CLI flag 次之)
env_flag = os.environ.get("ENABLE_RULE_MIGRATION_DRAFT", "").lower()
if env_flag == "false":
enable_migration = False
print(f"\n{'[DRY-RUN] ' if dry_run else ''}規則 → Playbook 遷移")
print(f" yaml_path: {yaml_path}")
print(f" enable_migration: {enable_migration}")
print(f" dry_run: {dry_run}")
print()
if not yaml_path.exists():
print(f"[ERROR] yaml 不存在: {yaml_path}", file=sys.stderr)
return 1
report = await migrate_yaml_rules_to_playbooks(
yaml_path=yaml_path,
dry_run=dry_run,
enable_migration=enable_migration,
)
# 輸出報告
print("=" * 60)
print(report.summary())
print("=" * 60)
if report.created_names:
action = "待建立" if dry_run else "已建立"
print(f"\n{action} ({len(report.created_names)} 條):")
for name in report.created_names:
print(f" + {name}")
if report.skipped_names:
print(f"\n已跳過(已存在)({len(report.skipped_names)} 條):")
for name in report.skipped_names:
print(f" ~ {name}")
if report.errors:
print(f"\n[ERROR] 失敗 ({len(report.errors)} 條):", file=sys.stderr)
for err in report.errors:
print(f" ! {err}", file=sys.stderr)
if dry_run and report.created > 0:
print(f"\n提示: 加 --commit 參數執行實際寫入(將建立 {report.created} 條 DRAFT Playbook")
return 1 if report.failed > 0 else 0
def main() -> None:
args = parse_args()
exit_code = asyncio.run(_run(args))
sys.exit(exit_code)
if __name__ == "__main__":
main()

View File

@@ -72,6 +72,16 @@ class Settings(BaseSettings):
description="W1 PR-P1: True=generate_proposal 時執行 Playbook RAG 匹配並填 matched_playbook_id, False=行為與修復前完全相同(回滾用)",
)
# ==========================================================================
# W1 PR-R1: 規則 → Playbook 遷移 Feature Flag (2026-04-28 ogt + Claude Sonnet 4.6)
# 將 alert_rules.yaml 25 條規則遷移為 DRAFT Playbook飛輪 RAG 冷啟動)
# 回滾指令: kubectl set env deployment/awoooi-api ENABLE_RULE_MIGRATION_DRAFT=false
# ==========================================================================
ENABLE_RULE_MIGRATION_DRAFT: bool = Field(
default=True,
description="W1 PR-R1: True=允許 migrate_rules_to_playbooks CLI 寫入 DB, False=停用寫入(回滾用)",
)
# ==========================================================================
# P1-1: KMWriter 統一契約 (2026-04-28 ogt + Claude Sonnet 4.6)
# KM_WRITE_AWAIT=true → 強制 await asyncio.wait_for(timeout=KM_WRITE_TIMEOUT_SECONDS)

View File

@@ -220,6 +220,21 @@ async def init_db() -> None:
""")
)
# 2026-04-29 ogt + Claude Opus 4.7: PR-K1 防禦性 ALTER (db-expert finding)
# P1.6 (2026-04-24) ORM 已加 timeline_events.incident_id但 prod 若在 P1.6 前
# 已建表create_all 跳過已存在的表 → ALTER 不會跑 → ORM 寫入 SELECT 找不到欄位
# 補防禦性 IF NOT EXISTS已有 column 為 no-op安全
await conn.execute(
text("""
ALTER TABLE timeline_events
ADD COLUMN IF NOT EXISTS incident_id VARCHAR(64);
""")
)
await conn.execute(text(
"CREATE INDEX IF NOT EXISTS ix_timeline_incident_id "
"ON timeline_events(incident_id);"
))
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 自我治理閉環
# ADR-087: ai_governance_events 不可變 Event Sourcing 表
# asyncpg 不允許 prepared statement 內多條指令,必須分開 execute

View File

@@ -0,0 +1,405 @@
"""
Rule → Playbook Migrator
========================
將 alert_rules.yaml 中的 25 條規則遷移為 DRAFT Playbook讓飛輪 RAG 有料可查。
設計原則:
- status=DRAFT不直接 APPROVED — 違反「禁寫死」鐵律)
- ai_confidence=0.3(誠實標示,非假 1.0 — 違反 feedback_confidence_truthfulness
- source=PlaybookSource.YAML_RULE現有 enum不新增 RULE_MIGRATED
- 冪等name LIKE 'AutoMigrated: %' 已存在則跳過
- INSERT ON CONFLICT → repo.create() UPSERTplaybook_id 唯一鍵)
- 與 playbook_seed_service.py 完全解耦(不擾動既有 seed 機制)
name 格式: "AutoMigrated: {rule.id}" — 與 seed_service 用 description 作 name 的格式區隔
W1 PR-R1 — 規則 → Playbook 遷移
2026-04-28 ogt + Claude Sonnet 4.6
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import structlog
import yaml
logger = structlog.get_logger(__name__)
# 告警 severity → risk 等級
_SEVERITY_TO_RISK: dict[str, str] = {
"low": "LOW",
"medium": "MEDIUM",
"high": "HIGH",
"critical": "CRITICAL",
}
# yaml risk 欄位允許 "high" 但 RiskLevel enum 有 HIGHseed_service 用的 map 少了 high
_YAML_RISK_MAP: dict[str, str] = {
"low": "LOW",
"medium": "MEDIUM",
"high": "HIGH",
"critical": "CRITICAL",
}
@dataclass
class MigrationReport:
"""遷移報告"""
total_rules: int = 0
created: int = 0
skipped: int = 0
failed: int = 0
dry_run: bool = False
errors: list[str] = field(default_factory=list)
created_names: list[str] = field(default_factory=list)
skipped_names: list[str] = field(default_factory=list)
def summary(self) -> str:
mode = "[DRY-RUN] " if self.dry_run else ""
return (
f"{mode}遷移完成 — "
f"總計 {self.total_rules} 條規則,"
f"建立 {self.created},跳過 {self.skipped},失敗 {self.failed}"
)
# =============================================================================
# 命令類型判斷(不依賴 SPF-2 action_parser用既有 regex 守門)
# =============================================================================
def _infer_action_type(kubectl_command: str) -> str:
"""
從指令字串推斷 ActionType字串形式對應 ActionType enum 值)
規則:
- 空字串 → "manual"
- 以 "ssh " 開頭 → "ssh_command"
- 其他有指令 → "kubectl"
"""
cmd = (kubectl_command or "").strip()
if not cmd:
return "manual"
if cmd.startswith("ssh "):
return "ssh_command"
return "kubectl"
def _infer_risk_level(risk_str: str) -> str:
"""
YAML risk 欄位 → RiskLevel 字串
alert_rules.yaml 的 risk 欄位值: low / medium / high / critical
"""
return _YAML_RISK_MAP.get((risk_str or "medium").lower(), "MEDIUM")
def _build_symptom_pattern(rule: dict[str, Any]) -> dict[str, Any]:
"""
從規則 match block 推導 SymptomPattern dict
symptom_pattern 包含:
- alert_names: match.alertname list
- affected_services: 從 id/description 推導關鍵字(保守策略:留空,讓 RAG 學習)
- severity_range: 從 risk 反推 ["P1"] / ["P2"] / ["P3"]
- keywords: match.message list部分匹配關鍵字
"""
match_block = rule.get("match", {})
alertnames: list[str] = match_block.get("alertname", [])
messages: list[str] = match_block.get("message", [])
alert_types: list[str] = match_block.get("alert_type", [])
# risk → severity_range 反推
risk_str = (rule.get("response", {}).get("risk", "medium") or "medium").lower()
if risk_str == "critical":
severity_range = ["P1", "P2"]
elif risk_str in ("high", "medium"):
severity_range = ["P2", "P3"]
else:
severity_range = ["P3"]
# keywords: message + alert_type 列表合併(最多 15 個)
keywords = list(messages) + list(alert_types)
# 過濾萬用符generic_fallback 有 "*"
keywords = [k for k in keywords if k != "*"][:15]
return {
"alert_names": alertnames if isinstance(alertnames, list) else [alertnames],
"affected_services": [],
"severity_range": severity_range,
"keywords": keywords,
"label_patterns": {},
}
def _build_repair_steps(rule: dict[str, Any]) -> list[dict[str, Any]]:
"""
從規則 response block 建立 RepairStep dict list
策略:
- kubectl_command 存在且非空 → step 1
- 若 optimization list 存在 → 每項追加為額外步驟
- 若 kubectl_command 空 (NO_ACTION) → step 1 action_type=manualcommand=描述文字
"""
resp = rule.get("response", {})
kubectl_cmd = (resp.get("kubectl_command", "") or "").strip()
risk_level = _infer_risk_level(resp.get("risk", "medium"))
suggested_action = resp.get("suggested_action", "NO_ACTION") or "NO_ACTION"
steps: list[dict[str, Any]] = []
if kubectl_cmd:
action_type = _infer_action_type(kubectl_cmd)
steps.append({
"step_number": 1,
"action_type": action_type,
"command": kubectl_cmd,
"expected_result": resp.get("action_title", ""),
"risk_level": risk_level,
"requires_approval": risk_level == "CRITICAL" or suggested_action in ("RESTART_DEPLOYMENT", "DELETE_POD", "SCALE_DEPLOYMENT"),
})
else:
# NO_ACTION — 記錄診斷描述為 manual step讓 RAG 至少有症狀可查
description_text = resp.get("description", rule.get("description", "人工診斷"))
steps.append({
"step_number": 1,
"action_type": "manual",
"command": description_text[:500],
"expected_result": resp.get("action_title", ""),
"risk_level": risk_level,
"requires_approval": True,
})
# 追加 optimization steps最多 3 個step_number 從 2 開始)
for idx, opt in enumerate(resp.get("optimization", []) or [], start=2):
opt_cmd = (opt.get("command", "") or "").strip()
if not opt_cmd or opt_cmd.startswith("#"):
continue
steps.append({
"step_number": idx,
"action_type": _infer_action_type(opt_cmd),
"command": opt_cmd,
"expected_result": opt.get("description", ""),
"risk_level": "LOW",
"requires_approval": False,
})
if idx >= 4: # 最多 3 個 optimization steps
break
return steps
def _estimated_duration(risk_level: str, suggested_action: str) -> int:
"""估算修復時間(分鐘)"""
if suggested_action in ("NO_ACTION",):
return 15
if risk_level == "CRITICAL":
return 5
return 3
def _build_tags(rule: dict[str, Any]) -> list[str]:
"""從規則提取標籤"""
tags: set[str] = {"yaml_rule", "auto_migrated"}
rule_id = rule.get("id", "")
resp = rule.get("response", {})
responsibility = resp.get("responsibility", "")
if responsibility:
tags.add(responsibility.lower())
# 從 alertname 推導類型標籤
alertnames = rule.get("match", {}).get("alertname", [])
for name in alertnames:
name_lower = (name or "").lower()
if "cpu" in name_lower:
tags.add("cpu")
if "memory" in name_lower or "oom" in name_lower:
tags.add("memory")
if "disk" in name_lower or "storage" in name_lower:
tags.add("disk")
if "pod" in name_lower or "k8s" in name_lower or "kube" in name_lower:
tags.add("kubernetes")
if "ssl" in name_lower or "cert" in name_lower:
tags.add("ssl")
if "backup" in name_lower:
tags.add("backup")
if "postgresql" in name_lower or "postgres" in name_lower:
tags.add("database")
if "redis" in name_lower:
tags.add("cache")
if "ollama" in name_lower:
tags.add("ai")
return list(tags)[:10]
def parse_yaml_rules(yaml_path: Path) -> list[dict[str, Any]]:
"""
讀取並解析 alert_rules.yaml回傳 rules list
Raises:
FileNotFoundError: yaml 不存在
yaml.YAMLError: yaml 格式錯誤
"""
data = yaml.safe_load(yaml_path.read_text(encoding="utf-8"))
rules = data.get("rules", [])
return [r for r in rules if isinstance(r, dict)]
def build_playbook_dict(rule: dict[str, Any]) -> dict[str, Any]:
"""
從單條規則建立 Playbook 初始化 dict不寫 DB
Returns dict 可直接傳給 Playbook(**dict)
"""
rule_id = rule.get("id", "unknown")
resp = rule.get("response", {})
description = resp.get("description", rule.get("description", f"規則 {rule_id} 自動遷移"))
risk_str = (resp.get("risk", "medium") or "medium").lower()
suggested_action = resp.get("suggested_action", "NO_ACTION") or "NO_ACTION"
symptom_pattern = _build_symptom_pattern(rule)
repair_steps = _build_repair_steps(rule)
tags = _build_tags(rule)
risk_level = _infer_risk_level(risk_str)
duration = _estimated_duration(risk_level, suggested_action)
return {
"name": f"AutoMigrated: {rule_id}",
"description": description[:2000],
"status": "draft",
"source": "yaml_rule",
"symptom_pattern": symptom_pattern,
"repair_steps": repair_steps,
"estimated_duration_minutes": duration,
"ai_confidence": 0.3,
"trust_score": 0.3,
"tags": tags,
"notes": f"自動從 alert_rules.yaml rule.id={rule_id} 遷移。priority={rule.get('priority', 999)}",
"created_by_agent": "migrator",
}
# =============================================================================
# 核心遷移函式async依賴 DB
# =============================================================================
async def migrate_yaml_rules_to_playbooks(
yaml_path: Path,
dry_run: bool = True,
enable_migration: bool = True,
) -> MigrationReport:
"""
將 alert_rules.yaml 遷移為 DRAFT Playbook
Args:
yaml_path: alert_rules.yaml 路徑
dry_run: True=只印計畫不寫 DBFalse=真實寫入
enable_migration: feature flagENABLE_RULE_MIGRATION_DRAFTFalse 時直接 return
Returns:
MigrationReport
設計:
- 冪等name LIKE 'AutoMigrated: %' 已存在任何狀態的 playbook 即跳過
- 不依賴 seed_servicesource=yaml_rule 但 name prefix 不同,互不干擾)
- generic_fallback 規則id=generic_fallback也遷移讓 RAG 能學到「兜底症狀」
"""
report = MigrationReport(dry_run=dry_run)
if not enable_migration:
logger.info("rule_migration_disabled_by_flag")
return report
if not yaml_path.exists():
logger.error("rule_migration_yaml_not_found", path=str(yaml_path))
report.errors.append(f"yaml 不存在: {yaml_path}")
return report
# 1. 解析 yaml
try:
rules = parse_yaml_rules(yaml_path)
except Exception as e:
logger.error("rule_migration_parse_error", error=str(e))
report.errors.append(f"yaml 解析失敗: {e}")
return report
report.total_rules = len(rules)
if dry_run:
# Dry-run只建立 dict不查 DB、不寫 DB
for rule in rules:
rule_id = rule.get("id", "unknown")
try:
pb_dict = build_playbook_dict(rule)
report.created_names.append(pb_dict["name"])
report.created += 1
logger.info(
"rule_migration_dry_run_would_create",
rule_id=rule_id,
name=pb_dict["name"],
alert_names=pb_dict["symptom_pattern"]["alert_names"],
)
except Exception as e:
report.failed += 1
report.errors.append(f"rule_id={rule_id} 建立 dict 失敗: {e}")
logger.warning("rule_migration_dry_run_error", rule_id=rule_id, error=str(e))
return report
# 2. 查詢現有 AutoMigrated Playbook冪等去重
from src.db.base import get_db_context
from sqlalchemy import text as sa_text
async with get_db_context() as db:
rows = await db.execute(
sa_text("SELECT name FROM playbooks WHERE name LIKE 'AutoMigrated: %'")
)
existing_names: set[str] = {r[0] for r in rows.fetchall()}
# 3. 逐條遷移
from src.models.playbook import Playbook
from src.repositories.playbook_repository import get_playbook_repository
repo = get_playbook_repository()
for rule in rules:
rule_id = rule.get("id", "unknown")
try:
pb_dict = build_playbook_dict(rule)
name = pb_dict["name"]
if name in existing_names:
report.skipped += 1
report.skipped_names.append(name)
logger.debug("rule_migration_skip_existing", rule_id=rule_id, name=name)
continue
playbook = Playbook(**pb_dict)
await repo.create(playbook)
report.created += 1
report.created_names.append(name)
existing_names.add(name) # 防止同 session 重複建立
logger.info(
"rule_migration_created",
rule_id=rule_id,
playbook_id=playbook.playbook_id,
name=name,
)
except Exception as e:
report.failed += 1
report.errors.append(f"rule_id={rule_id} 失敗: {e}")
logger.warning("rule_migration_create_error", rule_id=rule_id, error=str(e))
logger.info(
"rule_migration_complete",
total=report.total_rules,
created=report.created,
skipped=report.skipped,
failed=report.failed,
)
return report

View File

@@ -0,0 +1,454 @@
"""
test_rule_to_playbook_migrator.py — 規則 → Playbook 遷移測試
=============================================================
W1 PR-R1: 覆蓋遷移邏輯,不真寫 DB
測試策略:
- 所有測試用 conftest.py 設定 MOCK_MODE=true避免 DB 連線
- DB 寫入用 AsyncMock 模擬
- yaml 解析用臨時 fixture 檔案(不依賴實際 alert_rules.yaml 避免路徑問題)
2026-04-28 ogt + Claude Sonnet 4.6
"""
from __future__ import annotations
import textwrap
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import yaml
# =============================================================================
# Fixtures
# =============================================================================
MINIMAL_RULE_KUBECTL = {
"id": "test_pod_crash",
"priority": 60,
"description": "Test Pod CrashLoopBackOff",
"match": {
"alertname": ["KubePodCrashLooping", "PodCrashLoopBackOff"],
"alert_type": ["pod_crash"],
"message": ["crashloop", "crash"],
},
"response": {
"action_title": "診斷 CrashLoop 根因",
"description": "⚙️ 規則匹配: Pod 進入 CrashLoopBackOff",
"suggested_action": "NO_ACTION",
"kubectl_command": "kubectl logs {target} -n {namespace} --previous --tail=50",
"risk": "critical",
"responsibility": "BE",
"optimization": [
{
"type": "LIVENESS_PROBE",
"description": "調整 liveness probe",
"command": "# 調整 initialDelaySeconds",
}
],
},
}
MINIMAL_RULE_SSH = {
"id": "test_ollama_down",
"priority": 90,
"description": "Test Ollama Down",
"match": {
"alertname": ["OllamaDown"],
"message": ["ollama"],
},
"response": {
"action_title": "重啟 Ollama",
"description": "⚙️ Ollama 下線",
"suggested_action": "RESTART_DEPLOYMENT",
"kubectl_command": "ssh {host} 'systemctl restart ollama'",
"risk": "medium",
"responsibility": "INFRA",
"optimization": [],
},
}
MINIMAL_RULE_NO_ACTION = {
"id": "test_no_action",
"priority": 110,
"description": "Test No Action Rule",
"match": {
"alertname": ["GiteaDown"],
"message": ["gitea"],
},
"response": {
"action_title": "Gitea 下線 — 人工確認",
"description": "⚠️ Gitea 無法連線,不自動修復",
"suggested_action": "NO_ACTION",
"kubectl_command": "",
"risk": "critical",
"responsibility": "INFRA",
"optimization": [],
},
}
SEVERITY_TEST_CASES = [
("low", "LOW"),
("medium", "MEDIUM"),
("high", "HIGH"),
("critical", "CRITICAL"),
("", "MEDIUM"),
(None, "MEDIUM"),
]
@pytest.fixture
def minimal_yaml(tmp_path: Path) -> Path:
"""建立含 3 條規則的最小 yaml fixture"""
data = {
"version": "1.0.0",
"updated_at": "2026-04-28",
"rules": [
MINIMAL_RULE_KUBECTL,
MINIMAL_RULE_SSH,
MINIMAL_RULE_NO_ACTION,
],
}
yaml_file = tmp_path / "test_alert_rules.yaml"
yaml_file.write_text(yaml.dump(data, allow_unicode=True), encoding="utf-8")
return yaml_file
@pytest.fixture
def empty_yaml(tmp_path: Path) -> Path:
"""建立空規則 yaml"""
data = {"version": "1.0.0", "rules": []}
yaml_file = tmp_path / "empty_rules.yaml"
yaml_file.write_text(yaml.dump(data), encoding="utf-8")
return yaml_file
# =============================================================================
# 1. test_parse_yaml_rule_extracts_alertname
# =============================================================================
def test_parse_yaml_rule_extracts_alertname(minimal_yaml: Path) -> None:
"""parse_yaml_rules 能正確讀取 alertname list"""
from src.services.rule_to_playbook_migrator import parse_yaml_rules
rules = parse_yaml_rules(minimal_yaml)
assert len(rules) == 3
# 第一條規則的 alertnames
alertnames = rules[0]["match"]["alertname"]
assert "KubePodCrashLooping" in alertnames
assert "PodCrashLoopBackOff" in alertnames
def test_parse_yaml_rule_all_fields(minimal_yaml: Path) -> None:
"""parse_yaml_rules 保留所有欄位"""
from src.services.rule_to_playbook_migrator import parse_yaml_rules
rules = parse_yaml_rules(minimal_yaml)
rule = rules[0]
assert rule["id"] == "test_pod_crash"
assert rule["priority"] == 60
assert "response" in rule
assert rule["response"]["risk"] == "critical"
# =============================================================================
# 2. test_migration_creates_draft_playbook
# =============================================================================
def test_build_playbook_dict_creates_draft() -> None:
"""build_playbook_dict 產生 status=draft 的 dict"""
from src.services.rule_to_playbook_migrator import build_playbook_dict
pb_dict = build_playbook_dict(MINIMAL_RULE_KUBECTL)
assert pb_dict["status"] == "draft"
assert pb_dict["source"] == "yaml_rule"
assert pb_dict["ai_confidence"] == 0.3
assert pb_dict["trust_score"] == 0.3
def test_build_playbook_dict_name_prefix() -> None:
"""name 格式必須是 'AutoMigrated: {rule_id}'"""
from src.services.rule_to_playbook_migrator import build_playbook_dict
pb_dict = build_playbook_dict(MINIMAL_RULE_KUBECTL)
assert pb_dict["name"] == "AutoMigrated: test_pod_crash"
def test_build_playbook_dict_symptom_pattern_alertnames() -> None:
"""symptom_pattern.alert_names 來自 match.alertname"""
from src.services.rule_to_playbook_migrator import build_playbook_dict
pb_dict = build_playbook_dict(MINIMAL_RULE_KUBECTL)
sp = pb_dict["symptom_pattern"]
assert "KubePodCrashLooping" in sp["alert_names"]
assert "PodCrashLoopBackOff" in sp["alert_names"]
def test_build_playbook_dict_kubectl_action_type() -> None:
"""kubectl 指令 → action_type=kubectl"""
from src.services.rule_to_playbook_migrator import build_playbook_dict
pb_dict = build_playbook_dict(MINIMAL_RULE_KUBECTL)
step = pb_dict["repair_steps"][0]
assert step["action_type"] == "kubectl"
assert "kubectl logs" in step["command"]
def test_build_playbook_dict_ssh_action_type() -> None:
"""ssh 指令 → action_type=ssh_command"""
from src.services.rule_to_playbook_migrator import build_playbook_dict
pb_dict = build_playbook_dict(MINIMAL_RULE_SSH)
step = pb_dict["repair_steps"][0]
assert step["action_type"] == "ssh_command"
assert step["command"].startswith("ssh ")
def test_build_playbook_dict_no_action_uses_manual() -> None:
"""kubectl_command 為空時 → action_type=manualcommand 為描述文字"""
from src.services.rule_to_playbook_migrator import build_playbook_dict
pb_dict = build_playbook_dict(MINIMAL_RULE_NO_ACTION)
step = pb_dict["repair_steps"][0]
assert step["action_type"] == "manual"
assert len(step["command"]) > 0 # 有描述文字
assert step["requires_approval"] is True
# =============================================================================
# 3. test_migration_idempotent_on_conflict
# =============================================================================
@pytest.mark.asyncio
async def test_migration_idempotent_on_conflict(minimal_yaml: Path) -> None:
"""已存在的 AutoMigrated: name → 跳過,不重複建立"""
from src.services.rule_to_playbook_migrator import migrate_yaml_rules_to_playbooks
# 模擬 DB 已有全部 3 條規則
existing_names = {
"AutoMigrated: test_pod_crash",
"AutoMigrated: test_ollama_down",
"AutoMigrated: test_no_action",
}
mock_result = MagicMock()
mock_result.fetchall.return_value = [(name,) for name in existing_names]
mock_db = AsyncMock()
mock_db.execute = AsyncMock(return_value=mock_result)
mock_cm = MagicMock()
mock_cm.__aenter__ = AsyncMock(return_value=mock_db)
mock_cm.__aexit__ = AsyncMock(return_value=False)
mock_repo = MagicMock()
mock_repo.create = AsyncMock(side_effect=AssertionError("不應呼叫 create"))
# patch src.db.base.get_db_contextlazy import 的攔截點)
with (
patch("src.db.base.get_db_context", return_value=mock_cm),
patch("src.repositories.playbook_repository.get_playbook_repository", return_value=mock_repo),
):
report = await migrate_yaml_rules_to_playbooks(
yaml_path=minimal_yaml,
dry_run=False,
enable_migration=True,
)
assert report.created == 0
assert report.skipped == 3
assert report.failed == 0
# =============================================================================
# 4. test_migration_dry_run_no_db_write
# =============================================================================
@pytest.mark.asyncio
async def test_migration_dry_run_no_db_write(minimal_yaml: Path) -> None:
"""dry_run=True 時不查 DB、不寫 DB"""
from src.services.rule_to_playbook_migrator import migrate_yaml_rules_to_playbooks
with (
patch("src.db.base.get_db_context") as mock_db_ctx,
patch("src.repositories.playbook_repository.get_playbook_repository") as mock_repo,
):
report = await migrate_yaml_rules_to_playbooks(
yaml_path=minimal_yaml,
dry_run=True,
enable_migration=True,
)
# 不呼叫 DB
mock_db_ctx.assert_not_called()
mock_repo.assert_not_called()
# dry-run 時 created = 規則總數(全部「待建立」)
assert report.dry_run is True
assert report.total_rules == 3
assert report.created == 3
assert report.failed == 0
# =============================================================================
# 5. test_kubectl_command_validation_via_regex
# =============================================================================
def test_infer_action_type_kubectl() -> None:
"""kubectl 指令 → kubectl"""
from src.services.rule_to_playbook_migrator import _infer_action_type
assert _infer_action_type("kubectl delete pod foo -n default") == "kubectl"
assert _infer_action_type("kubectl rollout restart deployment/api -n awoooi") == "kubectl"
def test_infer_action_type_ssh() -> None:
"""ssh 指令 → ssh_command"""
from src.services.rule_to_playbook_migrator import _infer_action_type
assert _infer_action_type("ssh {host} 'docker restart minio'") == "ssh_command"
assert _infer_action_type("ssh root@192.168.0.111 'systemctl restart ollama'") == "ssh_command"
def test_infer_action_type_empty() -> None:
"""空指令 → manual"""
from src.services.rule_to_playbook_migrator import _infer_action_type
assert _infer_action_type("") == "manual"
assert _infer_action_type(" ") == "manual"
# =============================================================================
# 6. test_severity_to_risk_level_mapping
# =============================================================================
@pytest.mark.parametrize("risk_str,expected", SEVERITY_TEST_CASES)
def test_severity_to_risk_level_mapping(risk_str: str | None, expected: str) -> None:
"""YAML risk 欄位 → RiskLevel 字串映射"""
from src.services.rule_to_playbook_migrator import _infer_risk_level
result = _infer_risk_level(risk_str)
assert result == expected
# =============================================================================
# 7. test_feature_flag_disabled_skips_db_insert
# =============================================================================
@pytest.mark.asyncio
async def test_feature_flag_disabled_skips_db_insert(minimal_yaml: Path) -> None:
"""enable_migration=False 時直接 return 空報告,不查 DB"""
from src.services.rule_to_playbook_migrator import migrate_yaml_rules_to_playbooks
with (
patch("src.db.base.get_db_context") as mock_db_ctx,
patch("src.repositories.playbook_repository.get_playbook_repository") as mock_repo,
):
report = await migrate_yaml_rules_to_playbooks(
yaml_path=minimal_yaml,
dry_run=False,
enable_migration=False,
)
mock_db_ctx.assert_not_called()
mock_repo.assert_not_called()
assert report.total_rules == 0
assert report.created == 0
assert report.skipped == 0
# =============================================================================
# 8. 整合路徑1 條已存在 + 2 條新建
# =============================================================================
@pytest.mark.asyncio
async def test_migration_partial_existing(minimal_yaml: Path) -> None:
"""1 條已存在 → skipped=12 條新建 → created=2"""
from src.services.rule_to_playbook_migrator import migrate_yaml_rules_to_playbooks
# 只有第一條已存在
existing_names = {"AutoMigrated: test_pod_crash"}
mock_result = MagicMock()
mock_result.fetchall.return_value = [(name,) for name in existing_names]
mock_db = AsyncMock()
mock_db.execute = AsyncMock(return_value=mock_result)
mock_cm = MagicMock()
mock_cm.__aenter__ = AsyncMock(return_value=mock_db)
mock_cm.__aexit__ = AsyncMock(return_value=False)
mock_repo = MagicMock()
mock_repo.create = AsyncMock(side_effect=lambda pb: pb)
with (
patch("src.db.base.get_db_context", return_value=mock_cm),
patch("src.repositories.playbook_repository.PlaybookRepository", return_value=mock_repo),
patch("src.repositories.playbook_repository.get_playbook_repository", return_value=mock_repo),
):
report = await migrate_yaml_rules_to_playbooks(
yaml_path=minimal_yaml,
dry_run=False,
enable_migration=True,
)
assert report.skipped == 1
assert report.created == 2
assert report.failed == 0
assert mock_repo.create.call_count == 2
# =============================================================================
# 9. yaml 不存在 → 回傳有錯誤的報告
# =============================================================================
@pytest.mark.asyncio
async def test_migration_yaml_not_found(tmp_path: Path) -> None:
"""yaml 不存在 → 回傳有錯誤的報告,不 raise"""
from src.services.rule_to_playbook_migrator import migrate_yaml_rules_to_playbooks
nonexistent = tmp_path / "nonexistent.yaml"
report = await migrate_yaml_rules_to_playbooks(
yaml_path=nonexistent,
dry_run=False,
enable_migration=True,
)
assert report.failed == 0
assert len(report.errors) == 1
assert "不存在" in report.errors[0]
# =============================================================================
# 10. symptom_pattern keywords 過濾萬用符
# =============================================================================
def test_build_symptom_pattern_filters_wildcard() -> None:
"""generic_fallback 的 alertname=['*'] 不應進入 keywords"""
from src.services.rule_to_playbook_migrator import _build_symptom_pattern
generic_rule = {
"id": "generic_fallback",
"match": {
"alertname": ["*"],
"message": ["fallback"],
},
"response": {"risk": "medium"},
}
sp = _build_symptom_pattern(generic_rule)
# alertname=['*'] 進入 alert_names 沒問題(就是萬用符)
# keywords 不應含 '*'
assert "*" not in sp["keywords"]