feat(flywheel): W1 PR-R1 規則→Playbook 遷移 + PR-K1 timeline 防禦 ALTER
W1 第二波:onboarder 飛輪 80→90 路徑剩餘兩件 PR。 ## PR-R1 — 25 條 yaml 規則 → DRAFT Playbook 遷移 斷鏈背景(onboarder C2):alert_rules.yaml 25 條規則 68% 寫死 RESTART, 沒有對應 Playbook → RAG 永遠 generic_fallback → 規則命中率沒回饋給 catalog。 修法: - 新建 services/rule_to_playbook_migrator.py - 自動從 alert_rules.yaml 解析每條 rule - 產生 PlaybookRecord(status=DRAFT, ai_confidence=0.3, source=YAML_RULE) - 誠實標示信心 0.3(非假 1.0,違反 feedback_confidence_truthfulness) - INSERT ON CONFLICT 冪等(name LIKE 'AutoMigrated: %' 去重,不擾動 seed) - 新建 scripts/migrate_rules_to_playbooks.py(CLI: --dry-run/--commit/--disable-flag) - ENABLE_RULE_MIGRATION_DRAFT=true(rollback flag) - 23 測試覆蓋(parse / build_dict / idempotent / dry_run / action_type / severity_map / feature_flag / wildcard_filter / partial_existing 等) ## PR-K1 — timeline_events 防禦性 ALTER(db-expert finding) 任務原前提錯誤:onboarder 報告的 C7 斷鏈(incident_id 欄位)在 2026-04-24 P1.6 已修復 ORM。但生產環境若在 P1.6 前已建表,create_all 跳過 已存在的表 → ORM 寫入 SELECT 仍可能找不到 column。 修法: - db/base.py:init_db() 補防禦性 ALTER: ALTER TABLE timeline_events ADD COLUMN IF NOT EXISTS incident_id VARCHAR(64); CREATE INDEX IF NOT EXISTS ix_timeline_incident_id ON timeline_events(incident_id); - IF NOT EXISTS 為 no-op 安全(已有 column 不做事) - stage 欄位是任務描述的幻覺(codebase 0 writer),不新增 未做: - alembic migration(專案不用 alembic,遵循既有 init_db ALTER pattern) - onboarder C7 在 ORM 層已修,本 commit 確保 prod schema 對齊 ## 驗證 - 1608 unit tests 全綠(+23 from 1585) - PR-R1 23 個測試獨立通過 ## 期望影響 - 飛輪 RAG 終於有 25 條 DRAFT Playbook 可查 → +5 分 - prod schema 對齊保險 → 防 ORM SELECT 失敗 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
140
apps/api/scripts/migrate_rules_to_playbooks.py
Normal file
140
apps/api/scripts/migrate_rules_to_playbooks.py
Normal file
@@ -0,0 +1,140 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
migrate_rules_to_playbooks.py — 規則 → Playbook 遷移 CLI
|
||||
=========================================================
|
||||
將 alert_rules.yaml 中的 25 條規則遷移為 DRAFT Playbook,讓飛輪 RAG 有資料可查。
|
||||
|
||||
用法:
|
||||
# 預設 dry-run(只印計畫,不寫 DB)
|
||||
python scripts/migrate_rules_to_playbooks.py
|
||||
|
||||
# 指定 yaml 路徑
|
||||
python scripts/migrate_rules_to_playbooks.py --yaml-path /path/to/alert_rules.yaml
|
||||
|
||||
# 真實寫入 DB
|
||||
python scripts/migrate_rules_to_playbooks.py --commit
|
||||
|
||||
# 完整選項
|
||||
python scripts/migrate_rules_to_playbooks.py --yaml-path alert_rules.yaml --commit
|
||||
|
||||
W1 PR-R1 — 規則 → Playbook 遷移
|
||||
2026-04-28 ogt + Claude Sonnet 4.6
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# 確保 apps/api/src 在 import path 中(從 scripts/ 執行時)
|
||||
_SCRIPT_DIR = Path(__file__).parent
|
||||
_API_ROOT = _SCRIPT_DIR.parent
|
||||
sys.path.insert(0, str(_API_ROOT))
|
||||
|
||||
# 預設 yaml 路徑:相對 scripts/ 的上一層(apps/api/alert_rules.yaml)
|
||||
_DEFAULT_YAML_PATH = _API_ROOT / "alert_rules.yaml"
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="將 alert_rules.yaml 遷移為 DRAFT Playbook(飛輪 RAG 冷啟動)",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog="""
|
||||
範例:
|
||||
python scripts/migrate_rules_to_playbooks.py # dry-run(預設)
|
||||
python scripts/migrate_rules_to_playbooks.py --commit # 真實寫入
|
||||
python scripts/migrate_rules_to_playbooks.py --yaml-path alert_rules.yaml --commit
|
||||
""",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--yaml-path",
|
||||
type=Path,
|
||||
default=_DEFAULT_YAML_PATH,
|
||||
help=f"alert_rules.yaml 路徑(預設: {_DEFAULT_YAML_PATH})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--commit",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="真實寫入 DB(預設 dry-run,僅印計畫)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--disable-flag",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="模擬 ENABLE_RULE_MIGRATION_DRAFT=false(測試 feature flag 關閉路徑)",
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
async def _run(args: argparse.Namespace) -> int:
|
||||
"""
|
||||
非同步主流程
|
||||
|
||||
Returns:
|
||||
exit code (0=成功, 1=有錯誤)
|
||||
"""
|
||||
from src.services.rule_to_playbook_migrator import migrate_yaml_rules_to_playbooks
|
||||
|
||||
yaml_path: Path = args.yaml_path
|
||||
dry_run: bool = not args.commit
|
||||
enable_migration: bool = not args.disable_flag
|
||||
|
||||
# 讀取 feature flag(環境變數優先,CLI flag 次之)
|
||||
env_flag = os.environ.get("ENABLE_RULE_MIGRATION_DRAFT", "").lower()
|
||||
if env_flag == "false":
|
||||
enable_migration = False
|
||||
|
||||
print(f"\n{'[DRY-RUN] ' if dry_run else ''}規則 → Playbook 遷移")
|
||||
print(f" yaml_path: {yaml_path}")
|
||||
print(f" enable_migration: {enable_migration}")
|
||||
print(f" dry_run: {dry_run}")
|
||||
print()
|
||||
|
||||
if not yaml_path.exists():
|
||||
print(f"[ERROR] yaml 不存在: {yaml_path}", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
report = await migrate_yaml_rules_to_playbooks(
|
||||
yaml_path=yaml_path,
|
||||
dry_run=dry_run,
|
||||
enable_migration=enable_migration,
|
||||
)
|
||||
|
||||
# 輸出報告
|
||||
print("=" * 60)
|
||||
print(report.summary())
|
||||
print("=" * 60)
|
||||
|
||||
if report.created_names:
|
||||
action = "待建立" if dry_run else "已建立"
|
||||
print(f"\n{action} ({len(report.created_names)} 條):")
|
||||
for name in report.created_names:
|
||||
print(f" + {name}")
|
||||
|
||||
if report.skipped_names:
|
||||
print(f"\n已跳過(已存在)({len(report.skipped_names)} 條):")
|
||||
for name in report.skipped_names:
|
||||
print(f" ~ {name}")
|
||||
|
||||
if report.errors:
|
||||
print(f"\n[ERROR] 失敗 ({len(report.errors)} 條):", file=sys.stderr)
|
||||
for err in report.errors:
|
||||
print(f" ! {err}", file=sys.stderr)
|
||||
|
||||
if dry_run and report.created > 0:
|
||||
print(f"\n提示: 加 --commit 參數執行實際寫入(將建立 {report.created} 條 DRAFT Playbook)")
|
||||
|
||||
return 1 if report.failed > 0 else 0
|
||||
|
||||
|
||||
def main() -> None:
|
||||
args = parse_args()
|
||||
exit_code = asyncio.run(_run(args))
|
||||
sys.exit(exit_code)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -72,6 +72,16 @@ class Settings(BaseSettings):
|
||||
description="W1 PR-P1: True=generate_proposal 時執行 Playbook RAG 匹配並填 matched_playbook_id, False=行為與修復前完全相同(回滾用)",
|
||||
)
|
||||
|
||||
# ==========================================================================
|
||||
# W1 PR-R1: 規則 → Playbook 遷移 Feature Flag (2026-04-28 ogt + Claude Sonnet 4.6)
|
||||
# 將 alert_rules.yaml 25 條規則遷移為 DRAFT Playbook(飛輪 RAG 冷啟動)
|
||||
# 回滾指令: kubectl set env deployment/awoooi-api ENABLE_RULE_MIGRATION_DRAFT=false
|
||||
# ==========================================================================
|
||||
ENABLE_RULE_MIGRATION_DRAFT: bool = Field(
|
||||
default=True,
|
||||
description="W1 PR-R1: True=允許 migrate_rules_to_playbooks CLI 寫入 DB, False=停用寫入(回滾用)",
|
||||
)
|
||||
|
||||
# ==========================================================================
|
||||
# P1-1: KMWriter 統一契約 (2026-04-28 ogt + Claude Sonnet 4.6)
|
||||
# KM_WRITE_AWAIT=true → 強制 await asyncio.wait_for(timeout=KM_WRITE_TIMEOUT_SECONDS)
|
||||
|
||||
@@ -220,6 +220,21 @@ async def init_db() -> None:
|
||||
""")
|
||||
)
|
||||
|
||||
# 2026-04-29 ogt + Claude Opus 4.7: PR-K1 防禦性 ALTER (db-expert finding)
|
||||
# P1.6 (2026-04-24) ORM 已加 timeline_events.incident_id,但 prod 若在 P1.6 前
|
||||
# 已建表,create_all 跳過已存在的表 → ALTER 不會跑 → ORM 寫入 SELECT 找不到欄位
|
||||
# 補防禦性 IF NOT EXISTS(已有 column 為 no-op,安全)
|
||||
await conn.execute(
|
||||
text("""
|
||||
ALTER TABLE timeline_events
|
||||
ADD COLUMN IF NOT EXISTS incident_id VARCHAR(64);
|
||||
""")
|
||||
)
|
||||
await conn.execute(text(
|
||||
"CREATE INDEX IF NOT EXISTS ix_timeline_incident_id "
|
||||
"ON timeline_events(incident_id);"
|
||||
))
|
||||
|
||||
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 自我治理閉環
|
||||
# ADR-087: ai_governance_events 不可變 Event Sourcing 表
|
||||
# asyncpg 不允許 prepared statement 內多條指令,必須分開 execute
|
||||
|
||||
405
apps/api/src/services/rule_to_playbook_migrator.py
Normal file
405
apps/api/src/services/rule_to_playbook_migrator.py
Normal file
@@ -0,0 +1,405 @@
|
||||
"""
|
||||
Rule → Playbook Migrator
|
||||
========================
|
||||
將 alert_rules.yaml 中的 25 條規則遷移為 DRAFT Playbook,讓飛輪 RAG 有料可查。
|
||||
|
||||
設計原則:
|
||||
- status=DRAFT(不直接 APPROVED — 違反「禁寫死」鐵律)
|
||||
- ai_confidence=0.3(誠實標示,非假 1.0 — 違反 feedback_confidence_truthfulness)
|
||||
- source=PlaybookSource.YAML_RULE(現有 enum,不新增 RULE_MIGRATED)
|
||||
- 冪等:name LIKE 'AutoMigrated: %' 已存在則跳過
|
||||
- INSERT ON CONFLICT → repo.create() UPSERT(playbook_id 唯一鍵)
|
||||
- 與 playbook_seed_service.py 完全解耦(不擾動既有 seed 機制)
|
||||
|
||||
name 格式: "AutoMigrated: {rule.id}" — 與 seed_service 用 description 作 name 的格式區隔
|
||||
|
||||
W1 PR-R1 — 規則 → Playbook 遷移
|
||||
2026-04-28 ogt + Claude Sonnet 4.6
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import structlog
|
||||
import yaml
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
# 告警 severity → risk 等級
|
||||
_SEVERITY_TO_RISK: dict[str, str] = {
|
||||
"low": "LOW",
|
||||
"medium": "MEDIUM",
|
||||
"high": "HIGH",
|
||||
"critical": "CRITICAL",
|
||||
}
|
||||
|
||||
# yaml risk 欄位允許 "high" 但 RiskLevel enum 有 HIGH;seed_service 用的 map 少了 high
|
||||
_YAML_RISK_MAP: dict[str, str] = {
|
||||
"low": "LOW",
|
||||
"medium": "MEDIUM",
|
||||
"high": "HIGH",
|
||||
"critical": "CRITICAL",
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class MigrationReport:
|
||||
"""遷移報告"""
|
||||
total_rules: int = 0
|
||||
created: int = 0
|
||||
skipped: int = 0
|
||||
failed: int = 0
|
||||
dry_run: bool = False
|
||||
errors: list[str] = field(default_factory=list)
|
||||
created_names: list[str] = field(default_factory=list)
|
||||
skipped_names: list[str] = field(default_factory=list)
|
||||
|
||||
def summary(self) -> str:
|
||||
mode = "[DRY-RUN] " if self.dry_run else ""
|
||||
return (
|
||||
f"{mode}遷移完成 — "
|
||||
f"總計 {self.total_rules} 條規則,"
|
||||
f"建立 {self.created},跳過 {self.skipped},失敗 {self.failed}"
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 命令類型判斷(不依賴 SPF-2 action_parser,用既有 regex 守門)
|
||||
# =============================================================================
|
||||
|
||||
def _infer_action_type(kubectl_command: str) -> str:
|
||||
"""
|
||||
從指令字串推斷 ActionType(字串形式,對應 ActionType enum 值)
|
||||
|
||||
規則:
|
||||
- 空字串 → "manual"
|
||||
- 以 "ssh " 開頭 → "ssh_command"
|
||||
- 其他有指令 → "kubectl"
|
||||
"""
|
||||
cmd = (kubectl_command or "").strip()
|
||||
if not cmd:
|
||||
return "manual"
|
||||
if cmd.startswith("ssh "):
|
||||
return "ssh_command"
|
||||
return "kubectl"
|
||||
|
||||
|
||||
def _infer_risk_level(risk_str: str) -> str:
|
||||
"""
|
||||
YAML risk 欄位 → RiskLevel 字串
|
||||
|
||||
alert_rules.yaml 的 risk 欄位值: low / medium / high / critical
|
||||
"""
|
||||
return _YAML_RISK_MAP.get((risk_str or "medium").lower(), "MEDIUM")
|
||||
|
||||
|
||||
def _build_symptom_pattern(rule: dict[str, Any]) -> dict[str, Any]:
|
||||
"""
|
||||
從規則 match block 推導 SymptomPattern dict
|
||||
|
||||
symptom_pattern 包含:
|
||||
- alert_names: match.alertname list
|
||||
- affected_services: 從 id/description 推導關鍵字(保守策略:留空,讓 RAG 學習)
|
||||
- severity_range: 從 risk 反推 ["P1"] / ["P2"] / ["P3"]
|
||||
- keywords: match.message list(部分匹配關鍵字)
|
||||
"""
|
||||
match_block = rule.get("match", {})
|
||||
alertnames: list[str] = match_block.get("alertname", [])
|
||||
messages: list[str] = match_block.get("message", [])
|
||||
alert_types: list[str] = match_block.get("alert_type", [])
|
||||
|
||||
# risk → severity_range 反推
|
||||
risk_str = (rule.get("response", {}).get("risk", "medium") or "medium").lower()
|
||||
if risk_str == "critical":
|
||||
severity_range = ["P1", "P2"]
|
||||
elif risk_str in ("high", "medium"):
|
||||
severity_range = ["P2", "P3"]
|
||||
else:
|
||||
severity_range = ["P3"]
|
||||
|
||||
# keywords: message + alert_type 列表合併(最多 15 個)
|
||||
keywords = list(messages) + list(alert_types)
|
||||
# 過濾萬用符(generic_fallback 有 "*")
|
||||
keywords = [k for k in keywords if k != "*"][:15]
|
||||
|
||||
return {
|
||||
"alert_names": alertnames if isinstance(alertnames, list) else [alertnames],
|
||||
"affected_services": [],
|
||||
"severity_range": severity_range,
|
||||
"keywords": keywords,
|
||||
"label_patterns": {},
|
||||
}
|
||||
|
||||
|
||||
def _build_repair_steps(rule: dict[str, Any]) -> list[dict[str, Any]]:
|
||||
"""
|
||||
從規則 response block 建立 RepairStep dict list
|
||||
|
||||
策略:
|
||||
- kubectl_command 存在且非空 → step 1
|
||||
- 若 optimization list 存在 → 每項追加為額外步驟
|
||||
- 若 kubectl_command 空 (NO_ACTION) → step 1 action_type=manual,command=描述文字
|
||||
"""
|
||||
resp = rule.get("response", {})
|
||||
kubectl_cmd = (resp.get("kubectl_command", "") or "").strip()
|
||||
risk_level = _infer_risk_level(resp.get("risk", "medium"))
|
||||
suggested_action = resp.get("suggested_action", "NO_ACTION") or "NO_ACTION"
|
||||
|
||||
steps: list[dict[str, Any]] = []
|
||||
|
||||
if kubectl_cmd:
|
||||
action_type = _infer_action_type(kubectl_cmd)
|
||||
steps.append({
|
||||
"step_number": 1,
|
||||
"action_type": action_type,
|
||||
"command": kubectl_cmd,
|
||||
"expected_result": resp.get("action_title", ""),
|
||||
"risk_level": risk_level,
|
||||
"requires_approval": risk_level == "CRITICAL" or suggested_action in ("RESTART_DEPLOYMENT", "DELETE_POD", "SCALE_DEPLOYMENT"),
|
||||
})
|
||||
else:
|
||||
# NO_ACTION — 記錄診斷描述為 manual step,讓 RAG 至少有症狀可查
|
||||
description_text = resp.get("description", rule.get("description", "人工診斷"))
|
||||
steps.append({
|
||||
"step_number": 1,
|
||||
"action_type": "manual",
|
||||
"command": description_text[:500],
|
||||
"expected_result": resp.get("action_title", ""),
|
||||
"risk_level": risk_level,
|
||||
"requires_approval": True,
|
||||
})
|
||||
|
||||
# 追加 optimization steps(最多 3 個,step_number 從 2 開始)
|
||||
for idx, opt in enumerate(resp.get("optimization", []) or [], start=2):
|
||||
opt_cmd = (opt.get("command", "") or "").strip()
|
||||
if not opt_cmd or opt_cmd.startswith("#"):
|
||||
continue
|
||||
steps.append({
|
||||
"step_number": idx,
|
||||
"action_type": _infer_action_type(opt_cmd),
|
||||
"command": opt_cmd,
|
||||
"expected_result": opt.get("description", ""),
|
||||
"risk_level": "LOW",
|
||||
"requires_approval": False,
|
||||
})
|
||||
if idx >= 4: # 最多 3 個 optimization steps
|
||||
break
|
||||
|
||||
return steps
|
||||
|
||||
|
||||
def _estimated_duration(risk_level: str, suggested_action: str) -> int:
|
||||
"""估算修復時間(分鐘)"""
|
||||
if suggested_action in ("NO_ACTION",):
|
||||
return 15
|
||||
if risk_level == "CRITICAL":
|
||||
return 5
|
||||
return 3
|
||||
|
||||
|
||||
def _build_tags(rule: dict[str, Any]) -> list[str]:
|
||||
"""從規則提取標籤"""
|
||||
tags: set[str] = {"yaml_rule", "auto_migrated"}
|
||||
|
||||
rule_id = rule.get("id", "")
|
||||
resp = rule.get("response", {})
|
||||
responsibility = resp.get("responsibility", "")
|
||||
if responsibility:
|
||||
tags.add(responsibility.lower())
|
||||
|
||||
# 從 alertname 推導類型標籤
|
||||
alertnames = rule.get("match", {}).get("alertname", [])
|
||||
for name in alertnames:
|
||||
name_lower = (name or "").lower()
|
||||
if "cpu" in name_lower:
|
||||
tags.add("cpu")
|
||||
if "memory" in name_lower or "oom" in name_lower:
|
||||
tags.add("memory")
|
||||
if "disk" in name_lower or "storage" in name_lower:
|
||||
tags.add("disk")
|
||||
if "pod" in name_lower or "k8s" in name_lower or "kube" in name_lower:
|
||||
tags.add("kubernetes")
|
||||
if "ssl" in name_lower or "cert" in name_lower:
|
||||
tags.add("ssl")
|
||||
if "backup" in name_lower:
|
||||
tags.add("backup")
|
||||
if "postgresql" in name_lower or "postgres" in name_lower:
|
||||
tags.add("database")
|
||||
if "redis" in name_lower:
|
||||
tags.add("cache")
|
||||
if "ollama" in name_lower:
|
||||
tags.add("ai")
|
||||
|
||||
return list(tags)[:10]
|
||||
|
||||
|
||||
def parse_yaml_rules(yaml_path: Path) -> list[dict[str, Any]]:
|
||||
"""
|
||||
讀取並解析 alert_rules.yaml,回傳 rules list
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: yaml 不存在
|
||||
yaml.YAMLError: yaml 格式錯誤
|
||||
"""
|
||||
data = yaml.safe_load(yaml_path.read_text(encoding="utf-8"))
|
||||
rules = data.get("rules", [])
|
||||
return [r for r in rules if isinstance(r, dict)]
|
||||
|
||||
|
||||
def build_playbook_dict(rule: dict[str, Any]) -> dict[str, Any]:
|
||||
"""
|
||||
從單條規則建立 Playbook 初始化 dict(不寫 DB)
|
||||
|
||||
Returns dict 可直接傳給 Playbook(**dict)
|
||||
"""
|
||||
rule_id = rule.get("id", "unknown")
|
||||
resp = rule.get("response", {})
|
||||
description = resp.get("description", rule.get("description", f"規則 {rule_id} 自動遷移"))
|
||||
risk_str = (resp.get("risk", "medium") or "medium").lower()
|
||||
suggested_action = resp.get("suggested_action", "NO_ACTION") or "NO_ACTION"
|
||||
|
||||
symptom_pattern = _build_symptom_pattern(rule)
|
||||
repair_steps = _build_repair_steps(rule)
|
||||
tags = _build_tags(rule)
|
||||
risk_level = _infer_risk_level(risk_str)
|
||||
duration = _estimated_duration(risk_level, suggested_action)
|
||||
|
||||
return {
|
||||
"name": f"AutoMigrated: {rule_id}",
|
||||
"description": description[:2000],
|
||||
"status": "draft",
|
||||
"source": "yaml_rule",
|
||||
"symptom_pattern": symptom_pattern,
|
||||
"repair_steps": repair_steps,
|
||||
"estimated_duration_minutes": duration,
|
||||
"ai_confidence": 0.3,
|
||||
"trust_score": 0.3,
|
||||
"tags": tags,
|
||||
"notes": f"自動從 alert_rules.yaml rule.id={rule_id} 遷移。priority={rule.get('priority', 999)}",
|
||||
"created_by_agent": "migrator",
|
||||
}
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 核心遷移函式(async,依賴 DB)
|
||||
# =============================================================================
|
||||
|
||||
async def migrate_yaml_rules_to_playbooks(
|
||||
yaml_path: Path,
|
||||
dry_run: bool = True,
|
||||
enable_migration: bool = True,
|
||||
) -> MigrationReport:
|
||||
"""
|
||||
將 alert_rules.yaml 遷移為 DRAFT Playbook
|
||||
|
||||
Args:
|
||||
yaml_path: alert_rules.yaml 路徑
|
||||
dry_run: True=只印計畫不寫 DB,False=真實寫入
|
||||
enable_migration: feature flag(ENABLE_RULE_MIGRATION_DRAFT),False 時直接 return
|
||||
|
||||
Returns:
|
||||
MigrationReport
|
||||
|
||||
設計:
|
||||
- 冪等:name LIKE 'AutoMigrated: %' 已存在任何狀態的 playbook 即跳過
|
||||
- 不依賴 seed_service(source=yaml_rule 但 name prefix 不同,互不干擾)
|
||||
- generic_fallback 規則(id=generic_fallback)也遷移,讓 RAG 能學到「兜底症狀」
|
||||
"""
|
||||
report = MigrationReport(dry_run=dry_run)
|
||||
|
||||
if not enable_migration:
|
||||
logger.info("rule_migration_disabled_by_flag")
|
||||
return report
|
||||
|
||||
if not yaml_path.exists():
|
||||
logger.error("rule_migration_yaml_not_found", path=str(yaml_path))
|
||||
report.errors.append(f"yaml 不存在: {yaml_path}")
|
||||
return report
|
||||
|
||||
# 1. 解析 yaml
|
||||
try:
|
||||
rules = parse_yaml_rules(yaml_path)
|
||||
except Exception as e:
|
||||
logger.error("rule_migration_parse_error", error=str(e))
|
||||
report.errors.append(f"yaml 解析失敗: {e}")
|
||||
return report
|
||||
|
||||
report.total_rules = len(rules)
|
||||
|
||||
if dry_run:
|
||||
# Dry-run:只建立 dict,不查 DB、不寫 DB
|
||||
for rule in rules:
|
||||
rule_id = rule.get("id", "unknown")
|
||||
try:
|
||||
pb_dict = build_playbook_dict(rule)
|
||||
report.created_names.append(pb_dict["name"])
|
||||
report.created += 1
|
||||
logger.info(
|
||||
"rule_migration_dry_run_would_create",
|
||||
rule_id=rule_id,
|
||||
name=pb_dict["name"],
|
||||
alert_names=pb_dict["symptom_pattern"]["alert_names"],
|
||||
)
|
||||
except Exception as e:
|
||||
report.failed += 1
|
||||
report.errors.append(f"rule_id={rule_id} 建立 dict 失敗: {e}")
|
||||
logger.warning("rule_migration_dry_run_error", rule_id=rule_id, error=str(e))
|
||||
return report
|
||||
|
||||
# 2. 查詢現有 AutoMigrated Playbook(冪等去重)
|
||||
from src.db.base import get_db_context
|
||||
from sqlalchemy import text as sa_text
|
||||
|
||||
async with get_db_context() as db:
|
||||
rows = await db.execute(
|
||||
sa_text("SELECT name FROM playbooks WHERE name LIKE 'AutoMigrated: %'")
|
||||
)
|
||||
existing_names: set[str] = {r[0] for r in rows.fetchall()}
|
||||
|
||||
# 3. 逐條遷移
|
||||
from src.models.playbook import Playbook
|
||||
from src.repositories.playbook_repository import get_playbook_repository
|
||||
|
||||
repo = get_playbook_repository()
|
||||
|
||||
for rule in rules:
|
||||
rule_id = rule.get("id", "unknown")
|
||||
try:
|
||||
pb_dict = build_playbook_dict(rule)
|
||||
name = pb_dict["name"]
|
||||
|
||||
if name in existing_names:
|
||||
report.skipped += 1
|
||||
report.skipped_names.append(name)
|
||||
logger.debug("rule_migration_skip_existing", rule_id=rule_id, name=name)
|
||||
continue
|
||||
|
||||
playbook = Playbook(**pb_dict)
|
||||
await repo.create(playbook)
|
||||
report.created += 1
|
||||
report.created_names.append(name)
|
||||
existing_names.add(name) # 防止同 session 重複建立
|
||||
|
||||
logger.info(
|
||||
"rule_migration_created",
|
||||
rule_id=rule_id,
|
||||
playbook_id=playbook.playbook_id,
|
||||
name=name,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
report.failed += 1
|
||||
report.errors.append(f"rule_id={rule_id} 失敗: {e}")
|
||||
logger.warning("rule_migration_create_error", rule_id=rule_id, error=str(e))
|
||||
|
||||
logger.info(
|
||||
"rule_migration_complete",
|
||||
total=report.total_rules,
|
||||
created=report.created,
|
||||
skipped=report.skipped,
|
||||
failed=report.failed,
|
||||
)
|
||||
return report
|
||||
454
apps/api/tests/test_rule_to_playbook_migrator.py
Normal file
454
apps/api/tests/test_rule_to_playbook_migrator.py
Normal file
@@ -0,0 +1,454 @@
|
||||
"""
|
||||
test_rule_to_playbook_migrator.py — 規則 → Playbook 遷移測試
|
||||
=============================================================
|
||||
W1 PR-R1: 覆蓋遷移邏輯,不真寫 DB
|
||||
|
||||
測試策略:
|
||||
- 所有測試用 conftest.py 設定 MOCK_MODE=true,避免 DB 連線
|
||||
- DB 寫入用 AsyncMock 模擬
|
||||
- yaml 解析用臨時 fixture 檔案(不依賴實際 alert_rules.yaml 避免路徑問題)
|
||||
|
||||
2026-04-28 ogt + Claude Sonnet 4.6
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
import yaml
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Fixtures
|
||||
# =============================================================================
|
||||
|
||||
MINIMAL_RULE_KUBECTL = {
|
||||
"id": "test_pod_crash",
|
||||
"priority": 60,
|
||||
"description": "Test Pod CrashLoopBackOff",
|
||||
"match": {
|
||||
"alertname": ["KubePodCrashLooping", "PodCrashLoopBackOff"],
|
||||
"alert_type": ["pod_crash"],
|
||||
"message": ["crashloop", "crash"],
|
||||
},
|
||||
"response": {
|
||||
"action_title": "診斷 CrashLoop 根因",
|
||||
"description": "⚙️ 規則匹配: Pod 進入 CrashLoopBackOff",
|
||||
"suggested_action": "NO_ACTION",
|
||||
"kubectl_command": "kubectl logs {target} -n {namespace} --previous --tail=50",
|
||||
"risk": "critical",
|
||||
"responsibility": "BE",
|
||||
"optimization": [
|
||||
{
|
||||
"type": "LIVENESS_PROBE",
|
||||
"description": "調整 liveness probe",
|
||||
"command": "# 調整 initialDelaySeconds",
|
||||
}
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
MINIMAL_RULE_SSH = {
|
||||
"id": "test_ollama_down",
|
||||
"priority": 90,
|
||||
"description": "Test Ollama Down",
|
||||
"match": {
|
||||
"alertname": ["OllamaDown"],
|
||||
"message": ["ollama"],
|
||||
},
|
||||
"response": {
|
||||
"action_title": "重啟 Ollama",
|
||||
"description": "⚙️ Ollama 下線",
|
||||
"suggested_action": "RESTART_DEPLOYMENT",
|
||||
"kubectl_command": "ssh {host} 'systemctl restart ollama'",
|
||||
"risk": "medium",
|
||||
"responsibility": "INFRA",
|
||||
"optimization": [],
|
||||
},
|
||||
}
|
||||
|
||||
MINIMAL_RULE_NO_ACTION = {
|
||||
"id": "test_no_action",
|
||||
"priority": 110,
|
||||
"description": "Test No Action Rule",
|
||||
"match": {
|
||||
"alertname": ["GiteaDown"],
|
||||
"message": ["gitea"],
|
||||
},
|
||||
"response": {
|
||||
"action_title": "Gitea 下線 — 人工確認",
|
||||
"description": "⚠️ Gitea 無法連線,不自動修復",
|
||||
"suggested_action": "NO_ACTION",
|
||||
"kubectl_command": "",
|
||||
"risk": "critical",
|
||||
"responsibility": "INFRA",
|
||||
"optimization": [],
|
||||
},
|
||||
}
|
||||
|
||||
SEVERITY_TEST_CASES = [
|
||||
("low", "LOW"),
|
||||
("medium", "MEDIUM"),
|
||||
("high", "HIGH"),
|
||||
("critical", "CRITICAL"),
|
||||
("", "MEDIUM"),
|
||||
(None, "MEDIUM"),
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def minimal_yaml(tmp_path: Path) -> Path:
|
||||
"""建立含 3 條規則的最小 yaml fixture"""
|
||||
data = {
|
||||
"version": "1.0.0",
|
||||
"updated_at": "2026-04-28",
|
||||
"rules": [
|
||||
MINIMAL_RULE_KUBECTL,
|
||||
MINIMAL_RULE_SSH,
|
||||
MINIMAL_RULE_NO_ACTION,
|
||||
],
|
||||
}
|
||||
yaml_file = tmp_path / "test_alert_rules.yaml"
|
||||
yaml_file.write_text(yaml.dump(data, allow_unicode=True), encoding="utf-8")
|
||||
return yaml_file
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def empty_yaml(tmp_path: Path) -> Path:
|
||||
"""建立空規則 yaml"""
|
||||
data = {"version": "1.0.0", "rules": []}
|
||||
yaml_file = tmp_path / "empty_rules.yaml"
|
||||
yaml_file.write_text(yaml.dump(data), encoding="utf-8")
|
||||
return yaml_file
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 1. test_parse_yaml_rule_extracts_alertname
|
||||
# =============================================================================
|
||||
|
||||
def test_parse_yaml_rule_extracts_alertname(minimal_yaml: Path) -> None:
|
||||
"""parse_yaml_rules 能正確讀取 alertname list"""
|
||||
from src.services.rule_to_playbook_migrator import parse_yaml_rules
|
||||
|
||||
rules = parse_yaml_rules(minimal_yaml)
|
||||
|
||||
assert len(rules) == 3
|
||||
# 第一條規則的 alertnames
|
||||
alertnames = rules[0]["match"]["alertname"]
|
||||
assert "KubePodCrashLooping" in alertnames
|
||||
assert "PodCrashLoopBackOff" in alertnames
|
||||
|
||||
|
||||
def test_parse_yaml_rule_all_fields(minimal_yaml: Path) -> None:
|
||||
"""parse_yaml_rules 保留所有欄位"""
|
||||
from src.services.rule_to_playbook_migrator import parse_yaml_rules
|
||||
|
||||
rules = parse_yaml_rules(minimal_yaml)
|
||||
|
||||
rule = rules[0]
|
||||
assert rule["id"] == "test_pod_crash"
|
||||
assert rule["priority"] == 60
|
||||
assert "response" in rule
|
||||
assert rule["response"]["risk"] == "critical"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 2. test_migration_creates_draft_playbook
|
||||
# =============================================================================
|
||||
|
||||
def test_build_playbook_dict_creates_draft() -> None:
|
||||
"""build_playbook_dict 產生 status=draft 的 dict"""
|
||||
from src.services.rule_to_playbook_migrator import build_playbook_dict
|
||||
|
||||
pb_dict = build_playbook_dict(MINIMAL_RULE_KUBECTL)
|
||||
|
||||
assert pb_dict["status"] == "draft"
|
||||
assert pb_dict["source"] == "yaml_rule"
|
||||
assert pb_dict["ai_confidence"] == 0.3
|
||||
assert pb_dict["trust_score"] == 0.3
|
||||
|
||||
|
||||
def test_build_playbook_dict_name_prefix() -> None:
|
||||
"""name 格式必須是 'AutoMigrated: {rule_id}'"""
|
||||
from src.services.rule_to_playbook_migrator import build_playbook_dict
|
||||
|
||||
pb_dict = build_playbook_dict(MINIMAL_RULE_KUBECTL)
|
||||
|
||||
assert pb_dict["name"] == "AutoMigrated: test_pod_crash"
|
||||
|
||||
|
||||
def test_build_playbook_dict_symptom_pattern_alertnames() -> None:
|
||||
"""symptom_pattern.alert_names 來自 match.alertname"""
|
||||
from src.services.rule_to_playbook_migrator import build_playbook_dict
|
||||
|
||||
pb_dict = build_playbook_dict(MINIMAL_RULE_KUBECTL)
|
||||
sp = pb_dict["symptom_pattern"]
|
||||
|
||||
assert "KubePodCrashLooping" in sp["alert_names"]
|
||||
assert "PodCrashLoopBackOff" in sp["alert_names"]
|
||||
|
||||
|
||||
def test_build_playbook_dict_kubectl_action_type() -> None:
|
||||
"""kubectl 指令 → action_type=kubectl"""
|
||||
from src.services.rule_to_playbook_migrator import build_playbook_dict
|
||||
|
||||
pb_dict = build_playbook_dict(MINIMAL_RULE_KUBECTL)
|
||||
step = pb_dict["repair_steps"][0]
|
||||
|
||||
assert step["action_type"] == "kubectl"
|
||||
assert "kubectl logs" in step["command"]
|
||||
|
||||
|
||||
def test_build_playbook_dict_ssh_action_type() -> None:
|
||||
"""ssh 指令 → action_type=ssh_command"""
|
||||
from src.services.rule_to_playbook_migrator import build_playbook_dict
|
||||
|
||||
pb_dict = build_playbook_dict(MINIMAL_RULE_SSH)
|
||||
step = pb_dict["repair_steps"][0]
|
||||
|
||||
assert step["action_type"] == "ssh_command"
|
||||
assert step["command"].startswith("ssh ")
|
||||
|
||||
|
||||
def test_build_playbook_dict_no_action_uses_manual() -> None:
|
||||
"""kubectl_command 為空時 → action_type=manual,command 為描述文字"""
|
||||
from src.services.rule_to_playbook_migrator import build_playbook_dict
|
||||
|
||||
pb_dict = build_playbook_dict(MINIMAL_RULE_NO_ACTION)
|
||||
step = pb_dict["repair_steps"][0]
|
||||
|
||||
assert step["action_type"] == "manual"
|
||||
assert len(step["command"]) > 0 # 有描述文字
|
||||
assert step["requires_approval"] is True
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 3. test_migration_idempotent_on_conflict
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_migration_idempotent_on_conflict(minimal_yaml: Path) -> None:
|
||||
"""已存在的 AutoMigrated: name → 跳過,不重複建立"""
|
||||
from src.services.rule_to_playbook_migrator import migrate_yaml_rules_to_playbooks
|
||||
|
||||
# 模擬 DB 已有全部 3 條規則
|
||||
existing_names = {
|
||||
"AutoMigrated: test_pod_crash",
|
||||
"AutoMigrated: test_ollama_down",
|
||||
"AutoMigrated: test_no_action",
|
||||
}
|
||||
|
||||
mock_result = MagicMock()
|
||||
mock_result.fetchall.return_value = [(name,) for name in existing_names]
|
||||
|
||||
mock_db = AsyncMock()
|
||||
mock_db.execute = AsyncMock(return_value=mock_result)
|
||||
|
||||
mock_cm = MagicMock()
|
||||
mock_cm.__aenter__ = AsyncMock(return_value=mock_db)
|
||||
mock_cm.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
mock_repo = MagicMock()
|
||||
mock_repo.create = AsyncMock(side_effect=AssertionError("不應呼叫 create"))
|
||||
|
||||
# patch src.db.base.get_db_context(lazy import 的攔截點)
|
||||
with (
|
||||
patch("src.db.base.get_db_context", return_value=mock_cm),
|
||||
patch("src.repositories.playbook_repository.get_playbook_repository", return_value=mock_repo),
|
||||
):
|
||||
report = await migrate_yaml_rules_to_playbooks(
|
||||
yaml_path=minimal_yaml,
|
||||
dry_run=False,
|
||||
enable_migration=True,
|
||||
)
|
||||
|
||||
assert report.created == 0
|
||||
assert report.skipped == 3
|
||||
assert report.failed == 0
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 4. test_migration_dry_run_no_db_write
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_migration_dry_run_no_db_write(minimal_yaml: Path) -> None:
|
||||
"""dry_run=True 時不查 DB、不寫 DB"""
|
||||
from src.services.rule_to_playbook_migrator import migrate_yaml_rules_to_playbooks
|
||||
|
||||
with (
|
||||
patch("src.db.base.get_db_context") as mock_db_ctx,
|
||||
patch("src.repositories.playbook_repository.get_playbook_repository") as mock_repo,
|
||||
):
|
||||
report = await migrate_yaml_rules_to_playbooks(
|
||||
yaml_path=minimal_yaml,
|
||||
dry_run=True,
|
||||
enable_migration=True,
|
||||
)
|
||||
|
||||
# 不呼叫 DB
|
||||
mock_db_ctx.assert_not_called()
|
||||
mock_repo.assert_not_called()
|
||||
|
||||
# dry-run 時 created = 規則總數(全部「待建立」)
|
||||
assert report.dry_run is True
|
||||
assert report.total_rules == 3
|
||||
assert report.created == 3
|
||||
assert report.failed == 0
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 5. test_kubectl_command_validation_via_regex
|
||||
# =============================================================================
|
||||
|
||||
def test_infer_action_type_kubectl() -> None:
|
||||
"""kubectl 指令 → kubectl"""
|
||||
from src.services.rule_to_playbook_migrator import _infer_action_type
|
||||
|
||||
assert _infer_action_type("kubectl delete pod foo -n default") == "kubectl"
|
||||
assert _infer_action_type("kubectl rollout restart deployment/api -n awoooi") == "kubectl"
|
||||
|
||||
|
||||
def test_infer_action_type_ssh() -> None:
|
||||
"""ssh 指令 → ssh_command"""
|
||||
from src.services.rule_to_playbook_migrator import _infer_action_type
|
||||
|
||||
assert _infer_action_type("ssh {host} 'docker restart minio'") == "ssh_command"
|
||||
assert _infer_action_type("ssh root@192.168.0.111 'systemctl restart ollama'") == "ssh_command"
|
||||
|
||||
|
||||
def test_infer_action_type_empty() -> None:
|
||||
"""空指令 → manual"""
|
||||
from src.services.rule_to_playbook_migrator import _infer_action_type
|
||||
|
||||
assert _infer_action_type("") == "manual"
|
||||
assert _infer_action_type(" ") == "manual"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 6. test_severity_to_risk_level_mapping
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.parametrize("risk_str,expected", SEVERITY_TEST_CASES)
|
||||
def test_severity_to_risk_level_mapping(risk_str: str | None, expected: str) -> None:
|
||||
"""YAML risk 欄位 → RiskLevel 字串映射"""
|
||||
from src.services.rule_to_playbook_migrator import _infer_risk_level
|
||||
|
||||
result = _infer_risk_level(risk_str)
|
||||
assert result == expected
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 7. test_feature_flag_disabled_skips_db_insert
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_feature_flag_disabled_skips_db_insert(minimal_yaml: Path) -> None:
|
||||
"""enable_migration=False 時直接 return 空報告,不查 DB"""
|
||||
from src.services.rule_to_playbook_migrator import migrate_yaml_rules_to_playbooks
|
||||
|
||||
with (
|
||||
patch("src.db.base.get_db_context") as mock_db_ctx,
|
||||
patch("src.repositories.playbook_repository.get_playbook_repository") as mock_repo,
|
||||
):
|
||||
report = await migrate_yaml_rules_to_playbooks(
|
||||
yaml_path=minimal_yaml,
|
||||
dry_run=False,
|
||||
enable_migration=False,
|
||||
)
|
||||
|
||||
mock_db_ctx.assert_not_called()
|
||||
mock_repo.assert_not_called()
|
||||
|
||||
assert report.total_rules == 0
|
||||
assert report.created == 0
|
||||
assert report.skipped == 0
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 8. 整合路徑:1 條已存在 + 2 條新建
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_migration_partial_existing(minimal_yaml: Path) -> None:
|
||||
"""1 條已存在 → skipped=1,2 條新建 → created=2"""
|
||||
from src.services.rule_to_playbook_migrator import migrate_yaml_rules_to_playbooks
|
||||
|
||||
# 只有第一條已存在
|
||||
existing_names = {"AutoMigrated: test_pod_crash"}
|
||||
|
||||
mock_result = MagicMock()
|
||||
mock_result.fetchall.return_value = [(name,) for name in existing_names]
|
||||
|
||||
mock_db = AsyncMock()
|
||||
mock_db.execute = AsyncMock(return_value=mock_result)
|
||||
|
||||
mock_cm = MagicMock()
|
||||
mock_cm.__aenter__ = AsyncMock(return_value=mock_db)
|
||||
mock_cm.__aexit__ = AsyncMock(return_value=False)
|
||||
|
||||
mock_repo = MagicMock()
|
||||
mock_repo.create = AsyncMock(side_effect=lambda pb: pb)
|
||||
|
||||
with (
|
||||
patch("src.db.base.get_db_context", return_value=mock_cm),
|
||||
patch("src.repositories.playbook_repository.PlaybookRepository", return_value=mock_repo),
|
||||
patch("src.repositories.playbook_repository.get_playbook_repository", return_value=mock_repo),
|
||||
):
|
||||
report = await migrate_yaml_rules_to_playbooks(
|
||||
yaml_path=minimal_yaml,
|
||||
dry_run=False,
|
||||
enable_migration=True,
|
||||
)
|
||||
|
||||
assert report.skipped == 1
|
||||
assert report.created == 2
|
||||
assert report.failed == 0
|
||||
assert mock_repo.create.call_count == 2
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 9. yaml 不存在 → 回傳有錯誤的報告
|
||||
# =============================================================================
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_migration_yaml_not_found(tmp_path: Path) -> None:
|
||||
"""yaml 不存在 → 回傳有錯誤的報告,不 raise"""
|
||||
from src.services.rule_to_playbook_migrator import migrate_yaml_rules_to_playbooks
|
||||
|
||||
nonexistent = tmp_path / "nonexistent.yaml"
|
||||
|
||||
report = await migrate_yaml_rules_to_playbooks(
|
||||
yaml_path=nonexistent,
|
||||
dry_run=False,
|
||||
enable_migration=True,
|
||||
)
|
||||
|
||||
assert report.failed == 0
|
||||
assert len(report.errors) == 1
|
||||
assert "不存在" in report.errors[0]
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# 10. symptom_pattern keywords 過濾萬用符
|
||||
# =============================================================================
|
||||
|
||||
def test_build_symptom_pattern_filters_wildcard() -> None:
|
||||
"""generic_fallback 的 alertname=['*'] 不應進入 keywords"""
|
||||
from src.services.rule_to_playbook_migrator import _build_symptom_pattern
|
||||
|
||||
generic_rule = {
|
||||
"id": "generic_fallback",
|
||||
"match": {
|
||||
"alertname": ["*"],
|
||||
"message": ["fallback"],
|
||||
},
|
||||
"response": {"risk": "medium"},
|
||||
}
|
||||
sp = _build_symptom_pattern(generic_rule)
|
||||
|
||||
# alertname=['*'] 進入 alert_names 沒問題(就是萬用符)
|
||||
# keywords 不應含 '*'
|
||||
assert "*" not in sp["keywords"]
|
||||
Reference in New Issue
Block a user