diff --git a/apps/api/scripts/migrate_rules_to_playbooks.py b/apps/api/scripts/migrate_rules_to_playbooks.py
new file mode 100644
index 00000000..ba70b290
--- /dev/null
+++ b/apps/api/scripts/migrate_rules_to_playbooks.py
@@ -0,0 +1,148 @@
+#!/usr/bin/env python3
+"""
+migrate_rules_to_playbooks.py — Rule → Playbook migration CLI
+=========================================================
+Migrates the 25 rules in alert_rules.yaml into DRAFT Playbooks so the flywheel RAG has data to query.
+
+Usage:
+    # Default: dry-run (print the plan only, no DB writes)
+    python scripts/migrate_rules_to_playbooks.py
+
+    # Use a specific yaml path
+    python scripts/migrate_rules_to_playbooks.py --yaml-path /path/to/alert_rules.yaml
+
+    # Really write to the DB
+    python scripts/migrate_rules_to_playbooks.py --commit
+
+    # All options
+    python scripts/migrate_rules_to_playbooks.py --yaml-path alert_rules.yaml --commit
+
+W1 PR-R1 — Rule → Playbook migration
+2026-04-28 ogt + Claude Sonnet 4.6
+"""
+from __future__ import annotations
+
+import argparse
+import asyncio
+import os
+import sys
+from pathlib import Path
+
+# Put apps/api on the import path so "src.*" resolves when run from scripts/
+_SCRIPT_DIR = Path(__file__).parent
+_API_ROOT = _SCRIPT_DIR.parent
+sys.path.insert(0, str(_API_ROOT))
+
+# Default yaml path: one level above scripts/ (apps/api/alert_rules.yaml)
+_DEFAULT_YAML_PATH = _API_ROOT / "alert_rules.yaml"
+
+
+def parse_args() -> argparse.Namespace:
+    """Parse the command-line options (--yaml-path, --commit, --disable-flag)."""
+    parser = argparse.ArgumentParser(
+        description="將 alert_rules.yaml 遷移為 DRAFT Playbook(飛輪 RAG 冷啟動)",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+範例:
+  python scripts/migrate_rules_to_playbooks.py # dry-run(預設)
+  python scripts/migrate_rules_to_playbooks.py --commit # 真實寫入
+  python scripts/migrate_rules_to_playbooks.py --yaml-path alert_rules.yaml --commit
+        """,
+    )
+    parser.add_argument(
+        "--yaml-path",
+        type=Path,
+        default=_DEFAULT_YAML_PATH,
+        help=f"alert_rules.yaml 路徑(預設: {_DEFAULT_YAML_PATH})",
+    )
+    parser.add_argument(
+        "--commit",
+        action="store_true",
+        default=False,
+        help="真實寫入 DB(預設 dry-run,僅印計畫)",
+    )
+    parser.add_argument(
+        "--disable-flag",
+        action="store_true",
+        default=False,
+        help="模擬 ENABLE_RULE_MIGRATION_DRAFT=false(測試 feature flag 關閉路徑)",
+    )
+    return parser.parse_args()
+
+
+async def _run(args: argparse.Namespace) -> int:
+    """
+    Async main flow: run the migration and print the report.
+
+    Migration is disabled by --disable-flag or by ENABLE_RULE_MIGRATION_DRAFT set
+    to false / 0 / no / off (case-insensitive); nothing on the command line can
+    re-enable it once the environment turns it off.
+
+    Returns:
+        Exit code: 0 = success, 1 = the yaml is missing, a rule failed, or the
+        report contains any error (e.g. the yaml could not be parsed).
+    """
+    from src.services.rule_to_playbook_migrator import migrate_yaml_rules_to_playbooks
+
+    yaml_path: Path = args.yaml_path
+    dry_run: bool = not args.commit
+    enable_migration: bool = not args.disable_flag
+
+    # Feature flag: the environment variable wins over the CLI flag
+    env_flag = os.environ.get("ENABLE_RULE_MIGRATION_DRAFT", "").strip().lower()
+    if env_flag in {"false", "0", "no", "off"}:
+        enable_migration = False
+
+    print(f"\n{'[DRY-RUN] ' if dry_run else ''}規則 → Playbook 遷移")
+    print(f" yaml_path: {yaml_path}")
+    print(f" enable_migration: {enable_migration}")
+    print(f" dry_run: {dry_run}")
+    print()
+
+    if not yaml_path.exists():
+        print(f"[ERROR] yaml 不存在: {yaml_path}", file=sys.stderr)
+        return 1
+
+    report = await migrate_yaml_rules_to_playbooks(
+        yaml_path=yaml_path,
+        dry_run=dry_run,
+        enable_migration=enable_migration,
+    )
+
+    # Print the report
+    print("=" * 60)
+    print(report.summary())
+    print("=" * 60)
+
+    if report.created_names:
+        action = "待建立" if dry_run else "已建立"
+        print(f"\n{action} ({len(report.created_names)} 條):")
+        for name in report.created_names:
+            print(f" + {name}")
+
+    if report.skipped_names:
+        print(f"\n已跳過(已存在)({len(report.skipped_names)} 條):")
+        for name in report.skipped_names:
+            print(f" ~ {name}")
+
+    if report.errors:
+        print(f"\n[ERROR] 失敗 ({len(report.errors)} 條):", file=sys.stderr)
+        for err in report.errors:
+            print(f" ! {err}", file=sys.stderr)
+
+    if dry_run and report.created > 0:
+        print(f"\n提示: 加 --commit 參數執行實際寫入(將建立 {report.created} 條 DRAFT Playbook)")
+
+    # report.errors also holds file-level failures that never bump report.failed
+    return 1 if (report.failed > 0 or report.errors) else 0
+
+
+def main() -> None:
+    """Script entry point: parse arguments, run the async flow, exit with its code."""
+    args = parse_args()
+    exit_code = asyncio.run(_run(args))
+    sys.exit(exit_code)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/apps/api/src/core/config.py b/apps/api/src/core/config.py
index 2ba31982..aa060200 100644
--- a/apps/api/src/core/config.py
+++ b/apps/api/src/core/config.py
@@ -72,6 +72,16 @@ class Settings(BaseSettings):
         description="W1 PR-P1: True=generate_proposal 時執行 Playbook RAG 匹配並填 matched_playbook_id, False=行為與修復前完全相同(回滾用)",
     )
 
+    # ==========================================================================
+    # W1 PR-R1: Rule → Playbook migration feature flag (2026-04-28 ogt + Claude Sonnet 4.6)
+    # Migrates the 25 rules in alert_rules.yaml into DRAFT Playbooks (flywheel RAG cold start)
+    # Rollback command: kubectl set env deployment/awoooi-api ENABLE_RULE_MIGRATION_DRAFT=false
+    # ==========================================================================
+    ENABLE_RULE_MIGRATION_DRAFT: bool = Field(
+        default=True,
+        description="W1 PR-R1: True=允許 migrate_rules_to_playbooks CLI 寫入 DB, False=停用寫入(回滾用)",
+    )
+
     # ==========================================================================
     # P1-1: KMWriter 統一契約 (2026-04-28 ogt + Claude Sonnet 4.6)
     # KM_WRITE_AWAIT=true → 強制 await asyncio.wait_for(timeout=KM_WRITE_TIMEOUT_SECONDS)
diff --git a/apps/api/src/db/base.py b/apps/api/src/db/base.py
index c3ab124c..be8ae2dc 100644
--- a/apps/api/src/db/base.py
+++ b/apps/api/src/db/base.py
@@ -220,6 +220,21 @@ async def init_db() -> None:
             """)
         )
 
+        # 2026-04-29 ogt + Claude Opus 4.7: PR-K1 defensive ALTER (db-expert finding)
+        # P1.6 (2026-04-24) added timeline_events.incident_id to the ORM, but if prod created
+        # the table before P1.6, create_all skips the existing table → no ALTER runs → ORM
+        # queries fail on the missing column. Add a defensive IF NOT EXISTS (no-op when present).
+        await conn.execute(
+            text("""
+                ALTER TABLE timeline_events
+                ADD COLUMN IF NOT EXISTS incident_id VARCHAR(64);
+            """)
+        )
+        await conn.execute(text(
+            "CREATE INDEX IF NOT EXISTS ix_timeline_incident_id "
+            "ON timeline_events(incident_id);"
+        ))
+
         # 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 自我治理閉環
         # ADR-087: ai_governance_events 不可變 Event Sourcing 表
         # asyncpg 不允許 prepared statement 內多條指令,必須分開 execute
diff --git a/apps/api/src/services/rule_to_playbook_migrator.py b/apps/api/src/services/rule_to_playbook_migrator.py
new file mode 100644
index 00000000..2e6cb98b
--- /dev/null
+++ b/apps/api/src/services/rule_to_playbook_migrator.py
@@ -0,0 +1,450 @@
+"""
+Rule → Playbook Migrator
+========================
+Migrate the rules in alert_rules.yaml (25 at the time of writing) into DRAFT
+Playbooks so the flywheel RAG has data to query.
+
+Design principles:
+- status=DRAFT (never APPROVED directly — that would break the "no hard-coding" rule)
+- ai_confidence=0.3 (honest labelling, not a fake 1.0 — see feedback_confidence_truthfulness)
+- source=PlaybookSource.YAML_RULE (existing enum member; no new RULE_MIGRATED)
+- Idempotent: a rule whose "AutoMigrated: ..." name already exists is skipped
+- INSERT ON CONFLICT → repo.create() UPSERT (playbook_id unique key)
+- Fully decoupled from playbook_seed_service.py (existing seeding is untouched)
+
+Name format: "AutoMigrated: {rule.id}" — distinct from seed_service, which uses
+the rule description as the name.
+
+W1 PR-R1 — Rule → Playbook migration
+2026-04-28 ogt + Claude Sonnet 4.6
+"""
+from __future__ import annotations
+
+import re
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Any
+
+import structlog
+import yaml
+
+logger = structlog.get_logger(__name__)
+
+# Alert severity → risk level
+_SEVERITY_TO_RISK: dict[str, str] = {
+    "low": "LOW",
+    "medium": "MEDIUM",
+    "high": "HIGH",
+    "critical": "CRITICAL",
+}
+
+# The yaml risk field allows "high" and the RiskLevel enum has HIGH; the map used
+# by seed_service lacks "high".
+_YAML_RISK_MAP: dict[str, str] = {
+    "low": "LOW",
+    "medium": "MEDIUM",
+    "high": "HIGH",
+    "critical": "CRITICAL",
+}
+
+# Upper bound on optimization steps appended after the primary repair step.
+_MAX_OPTIMIZATION_STEPS = 3
+
+# Substrings of a lower-cased alert name → tag they imply (checked in this order).
+_ALERTNAME_TAG_KEYWORDS: tuple[tuple[tuple[str, ...], str], ...] = (
+    (("cpu",), "cpu"),
+    (("memory", "oom"), "memory"),
+    (("disk", "storage"), "disk"),
+    (("pod", "k8s", "kube"), "kubernetes"),
+    (("ssl", "cert"), "ssl"),
+    (("backup",), "backup"),
+    (("postgresql", "postgres"), "database"),
+    (("redis",), "cache"),
+    (("ollama",), "ai"),
+)
+
+
+@dataclass
+class MigrationReport:
+    """Outcome counters and per-playbook name lists for one migration run."""
+    total_rules: int = 0  # rules parsed from the yaml file
+    created: int = 0  # created (or, in dry-run, would be created)
+    skipped: int = 0  # skipped because the playbook name already exists
+    failed: int = 0  # rules that raised while being converted or written
+    dry_run: bool = False
+    errors: list[str] = field(default_factory=list)
+    created_names: list[str] = field(default_factory=list)
+    skipped_names: list[str] = field(default_factory=list)
+
+    def summary(self) -> str:
+        """Return the one-line summary (prefixed with "[DRY-RUN] " in dry-run mode)."""
+        mode = "[DRY-RUN] " if self.dry_run else ""
+        return (
+            f"{mode}遷移完成 — "
+            f"總計 {self.total_rules} 條規則,"
+            f"建立 {self.created},跳過 {self.skipped},失敗 {self.failed}"
+        )
+
+
+# =============================================================================
+# Value normalisation and command-type inference (no dependency on the SPF-2 action_parser)
+# =============================================================================
+
+def _as_list(value: Any) -> list[Any]:
+    """Normalise a yaml scalar-or-list field to a list (None → empty list)."""
+    if value is None:
+        return []
+    if isinstance(value, (list, tuple)):
+        return list(value)
+    return [value]
+
+
+def _infer_action_type(kubectl_command: str) -> str:
+    """
+    Infer the ActionType (as its string value) from a command string.
+
+    Rules:
+    - empty / whitespace-only → "manual"
+    - starts with "ssh " → "ssh_command"
+    - any other command → "kubectl"
+    """
+    cmd = (kubectl_command or "").strip()
+    if not cmd:
+        return "manual"
+    if cmd.startswith("ssh "):
+        return "ssh_command"
+    return "kubectl"
+
+
+def _infer_risk_level(risk_str: str) -> str:
+    """
+    Map the yaml risk field to a RiskLevel string.
+
+    Known values are low / medium / high / critical (case-insensitive); a missing
+    or unrecognised value maps to "MEDIUM".
+    """
+    return _YAML_RISK_MAP.get((risk_str or "medium").lower(), "MEDIUM")
+
+
+def _build_symptom_pattern(rule: dict[str, Any]) -> dict[str, Any]:
+    """
+    Derive a SymptomPattern dict from the rule's match block.
+
+    Keys of the result:
+    - alert_names: match.alertname
+    - affected_services: always empty (conservative; left for the RAG to learn)
+    - severity_range: derived from response.risk (critical → P1/P2,
+      high/medium → P2/P3, anything else → P3)
+    - keywords: match.message + match.alert_type, wildcard "*" removed, max 15
+    - label_patterns: always empty
+
+    match.* fields may be a list or a single scalar; `match: null` and
+    `response: null` are treated as empty blocks.
+    """
+    match_block = rule.get("match") or {}
+    alertnames = _as_list(match_block.get("alertname"))
+    messages = _as_list(match_block.get("message"))
+    alert_types = _as_list(match_block.get("alert_type"))
+
+    # Derive severity_range from the risk level
+    risk_str = ((rule.get("response") or {}).get("risk") or "medium").lower()
+    if risk_str == "critical":
+        severity_range = ["P1", "P2"]
+    elif risk_str in ("high", "medium"):
+        severity_range = ["P2", "P3"]
+    else:
+        severity_range = ["P3"]
+
+    # Keywords: message + alert_type, minus the "*" wildcard used by
+    # generic_fallback, capped at 15 entries.
+    keywords = [k for k in messages + alert_types if k != "*"][:15]
+
+    return {
+        "alert_names": alertnames,
+        "affected_services": [],
+        "severity_range": severity_range,
+        "keywords": keywords,
+        "label_patterns": {},
+    }
+
+
+def _build_repair_steps(rule: dict[str, Any]) -> list[dict[str, Any]]:
+    """
+    Build the list of RepairStep dicts from the rule's response block.
+
+    Strategy:
+    - kubectl_command non-empty → step 1 runs that command
+    - kubectl_command empty (NO_ACTION) → step 1 is a manual step whose command is
+      the description text (first 500 chars) and which always requires approval
+    - each usable response.optimization entry is appended as a further step; at
+      most _MAX_OPTIMIZATION_STEPS are kept and step numbers stay contiguous
+      (entries that are not mappings, have no command, or whose command is a
+      "#" comment are skipped without consuming a number)
+    """
+    resp = rule.get("response") or {}
+    kubectl_cmd = (resp.get("kubectl_command", "") or "").strip()
+    risk_level = _infer_risk_level(resp.get("risk", "medium"))
+    suggested_action = resp.get("suggested_action", "NO_ACTION") or "NO_ACTION"
+
+    steps: list[dict[str, Any]] = []
+
+    if kubectl_cmd:
+        steps.append({
+            "step_number": 1,
+            "action_type": _infer_action_type(kubectl_cmd),
+            "command": kubectl_cmd,
+            "expected_result": resp.get("action_title", ""),
+            "risk_level": risk_level,
+            # Critical risk and state-changing actions always need a human sign-off
+            "requires_approval": risk_level == "CRITICAL" or suggested_action in (
+                "RESTART_DEPLOYMENT", "DELETE_POD", "SCALE_DEPLOYMENT",
+            ),
+        })
+    else:
+        # NO_ACTION — record the diagnostic text as a manual step so the RAG
+        # still has something to match on
+        description_text = str(
+            resp.get("description") or rule.get("description") or "人工診斷"
+        )
+        steps.append({
+            "step_number": 1,
+            "action_type": "manual",
+            "command": description_text[:500],
+            "expected_result": resp.get("action_title", ""),
+            "risk_level": risk_level,
+            "requires_approval": True,
+        })
+
+    # Append optimization steps, numbered contiguously after the primary step
+    optimization_count = 0
+    for opt in _as_list(resp.get("optimization")):
+        if optimization_count >= _MAX_OPTIMIZATION_STEPS:
+            break
+        if not isinstance(opt, dict):
+            continue
+        opt_cmd = (opt.get("command", "") or "").strip()
+        if not opt_cmd or opt_cmd.startswith("#"):
+            continue
+        optimization_count += 1
+        steps.append({
+            "step_number": len(steps) + 1,
+            "action_type": _infer_action_type(opt_cmd),
+            "command": opt_cmd,
+            "expected_result": opt.get("description", ""),
+            "risk_level": "LOW",
+            "requires_approval": False,
+        })
+
+    return steps
+
+
+def _estimated_duration(risk_level: str, suggested_action: str) -> int:
+    """Estimate repair time in minutes: 15 for NO_ACTION, 5 for CRITICAL, else 3."""
+    if suggested_action == "NO_ACTION":
+        return 15
+    if risk_level == "CRITICAL":
+        return 5
+    return 3
+
+
+def _build_tags(rule: dict[str, Any]) -> list[str]:
+    """
+    Derive tags for a rule, in a stable order, capped at 10.
+
+    Order: the fixed "yaml_rule" / "auto_migrated" markers, the lower-cased
+    response.responsibility (if any), then category tags implied by the alert
+    names. The fixed markers can therefore never be dropped by the cap.
+    """
+    tags: list[str] = ["yaml_rule", "auto_migrated"]
+
+    responsibility = (rule.get("response") or {}).get("responsibility", "")
+    if responsibility:
+        tags.append(str(responsibility).lower())
+
+    # Derive category tags from the alert names
+    alertnames = _as_list((rule.get("match") or {}).get("alertname"))
+    lowered = [str(name or "").lower() for name in alertnames]
+    for needles, tag in _ALERTNAME_TAG_KEYWORDS:
+        if any(needle in name for name in lowered for needle in needles):
+            tags.append(tag)
+
+    # dict.fromkeys de-duplicates while keeping first-seen order
+    return list(dict.fromkeys(tags))[:10]
+
+
+def parse_yaml_rules(yaml_path: Path) -> list[dict[str, Any]]:
+    """
+    Read and parse alert_rules.yaml and return its list of rule mappings.
+
+    Entries under "rules" that are not mappings are dropped; a missing or null
+    "rules" key yields an empty list.
+
+    Raises:
+        FileNotFoundError: the yaml file does not exist
+        yaml.YAMLError: the file is not valid yaml
+        ValueError: the document is not a mapping, or "rules" is not a list
+    """
+    data = yaml.safe_load(yaml_path.read_text(encoding="utf-8"))
+    if not isinstance(data, dict):
+        raise ValueError(
+            f"top-level yaml must be a mapping, got {type(data).__name__}"
+        )
+    rules = data.get("rules") or []
+    if not isinstance(rules, list):
+        raise ValueError(f'"rules" must be a list, got {type(rules).__name__}')
+    return [r for r in rules if isinstance(r, dict)]
+
+
+def build_playbook_dict(rule: dict[str, Any]) -> dict[str, Any]:
+    """
+    Build the Playbook initialisation dict for one rule (no DB access).
+
+    The result is passed as Playbook(**dict) by migrate_yaml_rules_to_playbooks.
+    """
+    rule_id = rule.get("id", "unknown")
+    resp = rule.get("response") or {}
+    description = str(
+        resp.get("description") or rule.get("description") or f"規則 {rule_id} 自動遷移"
+    )
+    suggested_action = resp.get("suggested_action", "NO_ACTION") or "NO_ACTION"
+    risk_level = _infer_risk_level(resp.get("risk"))
+
+    return {
+        "name": f"AutoMigrated: {rule_id}",
+        "description": description[:2000],
+        "status": "draft",
+        "source": "yaml_rule",
+        "symptom_pattern": _build_symptom_pattern(rule),
+        "repair_steps": _build_repair_steps(rule),
+        "estimated_duration_minutes": _estimated_duration(risk_level, suggested_action),
+        "ai_confidence": 0.3,
+        "trust_score": 0.3,
+        "tags": _build_tags(rule),
+        "notes": f"自動從 alert_rules.yaml rule.id={rule_id} 遷移。priority={rule.get('priority', 999)}",
+        "created_by_agent": "migrator",
+    }
+
+
+# =============================================================================
+# Core migration function (async, depends on the DB)
+# =============================================================================
+
+async def migrate_yaml_rules_to_playbooks(
+    yaml_path: Path,
+    dry_run: bool = True,
+    enable_migration: bool = True,
+) -> MigrationReport:
+    """
+    Migrate alert_rules.yaml into DRAFT Playbooks.
+
+    Args:
+        yaml_path: path to alert_rules.yaml
+        dry_run: True = only build the plan (no DB read or write), False = write to the DB
+        enable_migration: feature flag (ENABLE_RULE_MIGRATION_DRAFT); when False an
+            empty report is returned immediately
+
+    Returns:
+        MigrationReport. A missing or unparsable yaml file is recorded in
+        report.errors only; report.failed counts per-rule failures.
+
+    Design:
+    - Idempotent: a playbook named "AutoMigrated: ..." in any status is skipped
+    - Independent of seed_service (same source=yaml_rule, different name prefix)
+    - The generic_fallback rule is migrated too, so the RAG can learn the
+      "catch-all" symptom
+    """
+    report = MigrationReport(dry_run=dry_run)
+
+    if not enable_migration:
+        logger.info("rule_migration_disabled_by_flag")
+        return report
+
+    if not yaml_path.exists():
+        logger.error("rule_migration_yaml_not_found", path=str(yaml_path))
+        report.errors.append(f"yaml 不存在: {yaml_path}")
+        return report
+
+    # 1. Parse the yaml
+    try:
+        rules = parse_yaml_rules(yaml_path)
+    except Exception as e:
+        logger.error("rule_migration_parse_error", error=str(e))
+        report.errors.append(f"yaml 解析失敗: {e}")
+        return report
+
+    report.total_rules = len(rules)
+
+    if dry_run:
+        # Dry-run: only build the dicts; no DB read, no DB write
+        for rule in rules:
+            rule_id = rule.get("id", "unknown")
+            try:
+                pb_dict = build_playbook_dict(rule)
+                report.created_names.append(pb_dict["name"])
+                report.created += 1
+                logger.info(
+                    "rule_migration_dry_run_would_create",
+                    rule_id=rule_id,
+                    name=pb_dict["name"],
+                    alert_names=pb_dict["symptom_pattern"]["alert_names"],
+                )
+            except Exception as e:
+                report.failed += 1
+                report.errors.append(f"rule_id={rule_id} 建立 dict 失敗: {e}")
+                logger.warning("rule_migration_dry_run_error", rule_id=rule_id, error=str(e))
+        return report
+
+    # 2. Look up existing AutoMigrated playbooks (idempotent de-duplication)
+    from src.db.base import get_db_context
+    from sqlalchemy import text as sa_text
+
+    async with get_db_context() as db:
+        rows = await db.execute(
+            sa_text("SELECT name FROM playbooks WHERE name LIKE 'AutoMigrated: %'")
+        )
+        existing_names: set[str] = {r[0] for r in rows.fetchall()}
+
+    # 3. Migrate rule by rule
+    from src.models.playbook import Playbook
+    from src.repositories.playbook_repository import get_playbook_repository
+
+    repo = get_playbook_repository()
+
+    for rule in rules:
+        rule_id = rule.get("id", "unknown")
+        try:
+            pb_dict = build_playbook_dict(rule)
+            name = pb_dict["name"]
+
+            if name in existing_names:
+                report.skipped += 1
+                report.skipped_names.append(name)
+                logger.debug("rule_migration_skip_existing", rule_id=rule_id, name=name)
+                continue
+
+            playbook = Playbook(**pb_dict)
+            await repo.create(playbook)
+            report.created += 1
+            report.created_names.append(name)
+            existing_names.add(name)  # a duplicate rule id later in this run is skipped
+
+            logger.info(
+                "rule_migration_created",
+                rule_id=rule_id,
+                playbook_id=playbook.playbook_id,
+                name=name,
+            )
+
+        except Exception as e:
+            report.failed += 1
+            report.errors.append(f"rule_id={rule_id} 失敗: {e}")
+            logger.warning("rule_migration_create_error", rule_id=rule_id, error=str(e))
+
+    logger.info(
+        "rule_migration_complete",
+        total=report.total_rules,
+        created=report.created,
+        skipped=report.skipped,
+        failed=report.failed,
+    )
+    return report
diff --git a/apps/api/tests/test_rule_to_playbook_migrator.py b/apps/api/tests/test_rule_to_playbook_migrator.py
new file mode 100644
index 00000000..ece26846
--- /dev/null
+++ b/apps/api/tests/test_rule_to_playbook_migrator.py
@@ -0,0 +1,454 @@
+"""
+test_rule_to_playbook_migrator.py — Rule → Playbook migration tests
+=============================================================
+W1 PR-R1: covers the migration logic without really writing to the DB
+
+Test strategy:
+- All tests use the MOCK_MODE=true set by conftest.py, avoiding DB connections
+- DB writes are simulated with AsyncMock
+- yaml parsing uses temporary fixture files (not the real alert_rules.yaml, to avoid path issues)
+
+2026-04-28 ogt + Claude Sonnet 4.6
+"""
+from __future__ import annotations
+
+import textwrap
+from pathlib import Path
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+import yaml
+
+
+# =============================================================================
+# Fixtures
+# 
=============================================================================
+
+MINIMAL_RULE_KUBECTL = {
+    "id": "test_pod_crash",
+    "priority": 60,
+    "description": "Test Pod CrashLoopBackOff",
+    "match": {
+        "alertname": ["KubePodCrashLooping", "PodCrashLoopBackOff"],
+        "alert_type": ["pod_crash"],
+        "message": ["crashloop", "crash"],
+    },
+    "response": {
+        "action_title": "診斷 CrashLoop 根因",
+        "description": "⚙️ 規則匹配: Pod 進入 CrashLoopBackOff",
+        "suggested_action": "NO_ACTION",
+        "kubectl_command": "kubectl logs {target} -n {namespace} --previous --tail=50",
+        "risk": "critical",
+        "responsibility": "BE",
+        "optimization": [
+            {
+                "type": "LIVENESS_PROBE",
+                "description": "調整 liveness probe",
+                "command": "# 調整 initialDelaySeconds",
+            }
+        ],
+    },
+}
+
+MINIMAL_RULE_SSH = {
+    "id": "test_ollama_down",
+    "priority": 90,
+    "description": "Test Ollama Down",
+    "match": {
+        "alertname": ["OllamaDown"],
+        "message": ["ollama"],
+    },
+    "response": {
+        "action_title": "重啟 Ollama",
+        "description": "⚙️ Ollama 下線",
+        "suggested_action": "RESTART_DEPLOYMENT",
+        "kubectl_command": "ssh {host} 'systemctl restart ollama'",
+        "risk": "medium",
+        "responsibility": "INFRA",
+        "optimization": [],
+    },
+}
+
+MINIMAL_RULE_NO_ACTION = {
+    "id": "test_no_action",
+    "priority": 110,
+    "description": "Test No Action Rule",
+    "match": {
+        "alertname": ["GiteaDown"],
+        "message": ["gitea"],
+    },
+    "response": {
+        "action_title": "Gitea 下線 — 人工確認",
+        "description": "⚠️ Gitea 無法連線,不自動修復",
+        "suggested_action": "NO_ACTION",
+        "kubectl_command": "",
+        "risk": "critical",
+        "responsibility": "INFRA",
+        "optimization": [],
+    },
+}
+
+SEVERITY_TEST_CASES = [
+    ("low", "LOW"),
+    ("medium", "MEDIUM"),
+    ("high", "HIGH"),
+    ("critical", "CRITICAL"),
+    ("", "MEDIUM"),
+    (None, "MEDIUM"),
+]
+
+
+@pytest.fixture
+def minimal_yaml(tmp_path: Path) -> Path:
+    """Write a minimal yaml fixture containing the 3 rules above and return its path."""
+    data = {
+        "version": "1.0.0",
+        "updated_at": "2026-04-28",
+        "rules": [
+            MINIMAL_RULE_KUBECTL,
+            MINIMAL_RULE_SSH,
+            MINIMAL_RULE_NO_ACTION,
+        ],
+    }
+    yaml_file = tmp_path / "test_alert_rules.yaml"
+    yaml_file.write_text(yaml.dump(data, allow_unicode=True), encoding="utf-8")
+    return yaml_file
+
+
+@pytest.fixture
+def empty_yaml(tmp_path: Path) -> Path:
+    """Write a yaml file whose rules list is empty and return its path."""
+    data = {"version": "1.0.0", "rules": []}
+    yaml_file = tmp_path / "empty_rules.yaml"
+    yaml_file.write_text(yaml.dump(data), encoding="utf-8")
+    return yaml_file
+
+
+# =============================================================================
+# 1. test_parse_yaml_rule_extracts_alertname
+# =============================================================================
+
+def test_parse_yaml_rule_extracts_alertname(minimal_yaml: Path) -> None:
+    """parse_yaml_rules reads the alertname list correctly."""
+    from src.services.rule_to_playbook_migrator import parse_yaml_rules
+
+    rules = parse_yaml_rules(minimal_yaml)
+
+    assert len(rules) == 3
+    # alertnames of the first rule
+    alertnames = rules[0]["match"]["alertname"]
+    assert "KubePodCrashLooping" in alertnames
+    assert "PodCrashLoopBackOff" in alertnames
+
+
+def test_parse_yaml_rule_all_fields(minimal_yaml: Path) -> None:
+    """parse_yaml_rules keeps the id, priority and response fields of a rule."""
+    from src.services.rule_to_playbook_migrator import parse_yaml_rules
+
+    rules = parse_yaml_rules(minimal_yaml)
+
+    rule = rules[0]
+    assert rule["id"] == "test_pod_crash"
+    assert rule["priority"] == 60
+    assert "response" in rule
+    assert rule["response"]["risk"] == "critical"
+
+
+# =============================================================================
+# 2. 
test_migration_creates_draft_playbook
+# =============================================================================
+
+def test_build_playbook_dict_creates_draft() -> None:
+    """build_playbook_dict produces a dict with status=draft."""
+    from src.services.rule_to_playbook_migrator import build_playbook_dict
+
+    pb_dict = build_playbook_dict(MINIMAL_RULE_KUBECTL)
+
+    assert pb_dict["status"] == "draft"
+    assert pb_dict["source"] == "yaml_rule"
+    assert pb_dict["ai_confidence"] == 0.3
+    assert pb_dict["trust_score"] == 0.3
+
+
+def test_build_playbook_dict_name_prefix() -> None:
+    """The name must have the form 'AutoMigrated: {rule_id}'."""
+    from src.services.rule_to_playbook_migrator import build_playbook_dict
+
+    pb_dict = build_playbook_dict(MINIMAL_RULE_KUBECTL)
+
+    assert pb_dict["name"] == "AutoMigrated: test_pod_crash"
+
+
+def test_build_playbook_dict_symptom_pattern_alertnames() -> None:
+    """symptom_pattern.alert_names comes from match.alertname."""
+    from src.services.rule_to_playbook_migrator import build_playbook_dict
+
+    pb_dict = build_playbook_dict(MINIMAL_RULE_KUBECTL)
+    sp = pb_dict["symptom_pattern"]
+
+    assert "KubePodCrashLooping" in sp["alert_names"]
+    assert "PodCrashLoopBackOff" in sp["alert_names"]
+
+
+def test_build_playbook_dict_kubectl_action_type() -> None:
+    """A kubectl command → action_type=kubectl."""
+    from src.services.rule_to_playbook_migrator import build_playbook_dict
+
+    pb_dict = build_playbook_dict(MINIMAL_RULE_KUBECTL)
+    step = pb_dict["repair_steps"][0]
+
+    assert step["action_type"] == "kubectl"
+    assert "kubectl logs" in step["command"]
+
+
+def test_build_playbook_dict_ssh_action_type() -> None:
+    """An ssh command → action_type=ssh_command."""
+    from src.services.rule_to_playbook_migrator import build_playbook_dict
+
+    pb_dict = build_playbook_dict(MINIMAL_RULE_SSH)
+    step = pb_dict["repair_steps"][0]
+
+    assert step["action_type"] == "ssh_command"
+    assert step["command"].startswith("ssh ")
+
+
+def test_build_playbook_dict_no_action_uses_manual() -> None:
+    """An empty kubectl_command → action_type=manual with the description text as command."""
+    from src.services.rule_to_playbook_migrator import build_playbook_dict
+
+    pb_dict = build_playbook_dict(MINIMAL_RULE_NO_ACTION)
+    step = pb_dict["repair_steps"][0]
+
+    assert step["action_type"] == "manual"
+    assert len(step["command"]) > 0  # the description text is present
+    assert step["requires_approval"] is True
+
+
+# =============================================================================
+# 3. test_migration_idempotent_on_conflict
+# =============================================================================
+
+@pytest.mark.asyncio
+async def test_migration_idempotent_on_conflict(minimal_yaml: Path) -> None:
+    """An existing 'AutoMigrated: ...' name is skipped, never created again."""
+    from src.services.rule_to_playbook_migrator import migrate_yaml_rules_to_playbooks
+
+    # Simulate a DB that already holds all 3 rules
+    existing_names = {
+        "AutoMigrated: test_pod_crash",
+        "AutoMigrated: test_ollama_down",
+        "AutoMigrated: test_no_action",
+    }
+
+    mock_result = MagicMock()
+    mock_result.fetchall.return_value = [(name,) for name in existing_names]
+
+    mock_db = AsyncMock()
+    mock_db.execute = AsyncMock(return_value=mock_result)
+
+    mock_cm = MagicMock()
+    mock_cm.__aenter__ = AsyncMock(return_value=mock_db)
+    mock_cm.__aexit__ = AsyncMock(return_value=False)
+
+    mock_repo = MagicMock()
+    mock_repo.create = AsyncMock(side_effect=AssertionError("不應呼叫 create"))
+
+    # Patch src.db.base.get_db_context (the interception point for the lazy import)
+    with (
+        patch("src.db.base.get_db_context", return_value=mock_cm),
+        patch("src.repositories.playbook_repository.get_playbook_repository", return_value=mock_repo),
+    ):
+        report = await migrate_yaml_rules_to_playbooks(
+            yaml_path=minimal_yaml,
+            dry_run=False,
+            enable_migration=True,
+        )
+
+    assert report.created == 0
+    assert report.skipped == 3
+    assert report.failed == 0
+
+
+# =============================================================================
+# 4. 
test_migration_dry_run_no_db_write
+# =============================================================================
+
+@pytest.mark.asyncio
+async def test_migration_dry_run_no_db_write(minimal_yaml: Path) -> None:
+    """With dry_run=True the DB is neither queried nor written."""
+    from src.services.rule_to_playbook_migrator import migrate_yaml_rules_to_playbooks
+
+    with (
+        patch("src.db.base.get_db_context") as mock_db_ctx,
+        patch("src.repositories.playbook_repository.get_playbook_repository") as mock_repo,
+    ):
+        report = await migrate_yaml_rules_to_playbooks(
+            yaml_path=minimal_yaml,
+            dry_run=True,
+            enable_migration=True,
+        )
+
+        # The DB is never touched
+        mock_db_ctx.assert_not_called()
+        mock_repo.assert_not_called()
+
+    # In dry-run, created equals the total number of rules (all "to be created")
+    assert report.dry_run is True
+    assert report.total_rules == 3
+    assert report.created == 3
+    assert report.failed == 0
+
+
+# =============================================================================
+# 5. test_kubectl_command_validation_via_regex
+# =============================================================================
+
+def test_infer_action_type_kubectl() -> None:
+    """A kubectl command → kubectl."""
+    from src.services.rule_to_playbook_migrator import _infer_action_type
+
+    assert _infer_action_type("kubectl delete pod foo -n default") == "kubectl"
+    assert _infer_action_type("kubectl rollout restart deployment/api -n awoooi") == "kubectl"
+
+
+def test_infer_action_type_ssh() -> None:
+    """An ssh command → ssh_command."""
+    from src.services.rule_to_playbook_migrator import _infer_action_type
+
+    assert _infer_action_type("ssh {host} 'docker restart minio'") == "ssh_command"
+    assert _infer_action_type("ssh root@192.168.0.111 'systemctl restart ollama'") == "ssh_command"
+
+
+def test_infer_action_type_empty() -> None:
+    """An empty or whitespace-only command → manual."""
+    from src.services.rule_to_playbook_migrator import _infer_action_type
+
+    assert _infer_action_type("") == "manual"
+    assert _infer_action_type(" ") == "manual"
+
+
+# 
=============================================================================
+# 6. test_severity_to_risk_level_mapping
+# =============================================================================
+
+@pytest.mark.parametrize("risk_str,expected", SEVERITY_TEST_CASES)
+def test_severity_to_risk_level_mapping(risk_str: str | None, expected: str) -> None:
+    """The yaml risk field maps to the expected RiskLevel string."""
+    from src.services.rule_to_playbook_migrator import _infer_risk_level
+
+    result = _infer_risk_level(risk_str)
+    assert result == expected
+
+
+# =============================================================================
+# 7. test_feature_flag_disabled_skips_db_insert
+# =============================================================================
+
+@pytest.mark.asyncio
+async def test_feature_flag_disabled_skips_db_insert(minimal_yaml: Path) -> None:
+    """With enable_migration=False an empty report is returned and the DB is not queried."""
+    from src.services.rule_to_playbook_migrator import migrate_yaml_rules_to_playbooks
+
+    with (
+        patch("src.db.base.get_db_context") as mock_db_ctx,
+        patch("src.repositories.playbook_repository.get_playbook_repository") as mock_repo,
+    ):
+        report = await migrate_yaml_rules_to_playbooks(
+            yaml_path=minimal_yaml,
+            dry_run=False,
+            enable_migration=False,
+        )
+
+        mock_db_ctx.assert_not_called()
+        mock_repo.assert_not_called()
+
+    assert report.total_rules == 0
+    assert report.created == 0
+    assert report.skipped == 0
+
+
+# =============================================================================
+# 8. 
Integration path: 1 already existing + 2 newly created
+# =============================================================================
+
+@pytest.mark.asyncio
+async def test_migration_partial_existing(minimal_yaml: Path) -> None:
+    """1 rule already exists → skipped=1; the other 2 are new → created=2."""
+    from src.services.rule_to_playbook_migrator import migrate_yaml_rules_to_playbooks
+
+    # Only the first rule already exists
+    existing_names = {"AutoMigrated: test_pod_crash"}
+
+    mock_result = MagicMock()
+    mock_result.fetchall.return_value = [(name,) for name in existing_names]
+
+    mock_db = AsyncMock()
+    mock_db.execute = AsyncMock(return_value=mock_result)
+
+    mock_cm = MagicMock()
+    mock_cm.__aenter__ = AsyncMock(return_value=mock_db)
+    mock_cm.__aexit__ = AsyncMock(return_value=False)
+
+    mock_repo = MagicMock()
+    mock_repo.create = AsyncMock(side_effect=lambda pb: pb)
+
+    with (
+        patch("src.db.base.get_db_context", return_value=mock_cm),
+        patch("src.repositories.playbook_repository.PlaybookRepository", return_value=mock_repo),
+        patch("src.repositories.playbook_repository.get_playbook_repository", return_value=mock_repo),
+    ):
+        report = await migrate_yaml_rules_to_playbooks(
+            yaml_path=minimal_yaml,
+            dry_run=False,
+            enable_migration=True,
+        )
+
+    assert report.skipped == 1
+    assert report.created == 2
+    assert report.failed == 0
+    assert mock_repo.create.call_count == 2
+
+
+# =============================================================================
+# 9. 
yaml missing → a report carrying an error is returned
+# =============================================================================
+
+@pytest.mark.asyncio
+async def test_migration_yaml_not_found(tmp_path: Path) -> None:
+    """A missing yaml file yields a report with one error and does not raise."""
+    from src.services.rule_to_playbook_migrator import migrate_yaml_rules_to_playbooks
+
+    nonexistent = tmp_path / "nonexistent.yaml"
+
+    report = await migrate_yaml_rules_to_playbooks(
+        yaml_path=nonexistent,
+        dry_run=False,
+        enable_migration=True,
+    )
+
+    assert report.failed == 0
+    assert len(report.errors) == 1
+    assert "不存在" in report.errors[0]
+
+
+# =============================================================================
+# 10. symptom_pattern keywords drop the wildcard
+# =============================================================================
+
+def test_build_symptom_pattern_filters_wildcard() -> None:
+    """A '*' in match.message (as generic_fallback uses) must not reach keywords."""
+    from src.services.rule_to_playbook_migrator import _build_symptom_pattern
+
+    generic_rule = {
+        "id": "generic_fallback",
+        "match": {
+            "alertname": ["*"],
+            "message": ["*", "fallback"],
+        },
+        "response": {"risk": "medium"},
+    }
+    sp = _build_symptom_pattern(generic_rule)
+
+    # '*' stays in alert_names (it is the wildcard) but is filtered out of keywords
+    assert sp["alert_names"] == ["*"]
+    assert sp["keywords"] == ["fallback"]