Files
awoooi/apps/api/src/services/alert_rule_engine.py
Your Name 6878e62af7 feat(flywheel): W1 PR-P1 + ADR-091 T1 — 飛輪 80→90 第一波
依 onboarder 端到端閉環審計挖出的 10 條斷鏈 + critic 鐵律違反全景,
W1 第一波修復飛輪鐵證 1 + 2 的核心斷鏈 C1。

## W1 PR-P1 — matched_playbook_id 四斷點守門 (C1 修復)
fullstack 探勘發現 4 斷點之前 session 已修,本 PR 補:
- ENABLE_PLAYBOOK_MATCHING feature flag (default=true)
  rollback: kubectl set env deployment/awoooi-api ENABLE_PLAYBOOK_MATCHING=false
- proposal_service._try_playbook_match_id 入口加 flag check
- 7 個 e2e 測試補上保護網(之前無測試覆蓋)

斷鏈 C1 證據鏈:proposal_service.generate_proposal() → matched_playbook_id
→ approval_db → approval_repository → learning_service._update_playbook_stats
24h 後 playbooks.trust_score 應有真實 EWMA 更新。

## ADR-091 T1 — auto_generate_rule 雙寫 DB (鐵證 1 第一步)
飛輪鐵證 1:alert_rule_catalog.source='ai_generated' 全 codebase 0 筆。
auto_generate_rule() 寫 alert_rules.yaml 但不寫 DB → AI 自學成果與 catalog 雙軌脫鉤。

修法(依 ADR-091 §1 D1):
- 新增 _insert_catalog_ai_generated():YAML 寫入成功後雙寫
  source='ai_generated', confidence=0.5, review_status='draft', created_by_agent
- 新增 _parse_for_to_seconds() helper("30s"/"5m"/"2h" → seconds)
- ON CONFLICT (rule_name) DO NOTHING 冪等保證
- transaction 策略:YAML + DB 不在同一 transaction(YAML 已成 SoT,DB 失敗只 log)
- ENABLE_AI_RULE_CATALOG_WRITE feature flag (default=true)
  rollback: kubectl set env deployment/awoooi-api ENABLE_AI_RULE_CATALOG_WRITE=false

13 個測試覆蓋:parse helper 8 + 業務邏輯 5(success/db_fail/idempotent/flag/SQL_lit)

## 驗證
1572 unit tests 全綠(+20 新增:PR-P1 7 + ADR-091 T1 13)

## 期望影響
飛輪自主化評分:42 → 65(+23 = C1 +3 + 鐵證 1 +20)

## 已知債(critic PR review 揭示,下一個 commit 處理)
- KMWriter 統一契約 3 條 caller 路徑被旁路(C1/M1/M2)
- KMWriter 冪等聲明與實作不符(M3 缺 ON CONFLICT)
- Alertmanager equal:[] 爆炸抑制 + 版本未驗(M4/M5)
- drift checker regex 脆弱(M7 應改 AST)
- governance health score skipped 失真(M6)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 10:44:39 +08:00

901 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
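OpenClaw alert rule matching engine
============================================================
Loads rules from alert_rules.yaml, replacing the hard-coded if/elif rule
matching that used to live in openclaw.py.

Design principles:
- Rules are defined in YAML; adding one requires no Python change.
- Matching order: exact alertname match > partial alert_type match > message keyword.
- A smaller priority wins; 999 is the generic fallback.
- Template variables: {target} {host} {container} {instance} {job} {namespace}

Automatic rule generation:
- Only triggered when generic_fallback fires (specific rules never trigger it).
- Calls Ollama (deepseek-r1:14b) or Gemini to generate a YAML rule fragment.
- The fragment is appended to alert_rules.yaml and the lru_cache is cleared so it takes effect immediately.
- Skipped when the alertname already has a rule (de-duplication).

2026-04-09 ogt: initial version + automatic rule generation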
"""
from __future__ import annotations
import json
import re
from datetime import datetime
from functools import lru_cache
from pathlib import Path
from typing import Any
import httpx
import structlog
import yaml
from src.constants.alert_types import ALERTNAME_TO_TYPE
# Module-level structured logger.
logger = structlog.get_logger(__name__)
# Rule file location: alert_rules.yaml in the third ancestor directory of this file.
RULES_FILE = Path(__file__).parent.parent.parent / "alert_rules.yaml"
# Process-level de-duplication (kept as the fallback when Redis is unavailable).
_generating: set[str] = set()
# TTL in seconds of the Redis distributed lock; chosen to cover the longest
# Ollama + Gemini generation time.
_RULE_GEN_LOCK_TTL = 120
# ── kubectl injection guard (Task 2.3, ADR-076, 2026-04-14) ─────────
# Modelled on auto_approve._DESTRUCTIVE_PATTERNS and
# decision_manager._ALLOWED_KUBECTL_PATTERN.
# NOTE(review): those patterns live outside this module — keep them in sync.
# Goal: when a rule's kubectl_command contains one of these destructive
# patterns after variable substitution, the caller blanks it and logs a warning.
_RULE_ENGINE_DESTRUCTIVE_RE = re.compile(
    # Destructive Kubernetes deletes, including kubectl's short resource names.
    r"(kubectl\s+delete\s+(pvc|persistentvolumeclaim|namespace|statefulset|deployment"
    r"|(?:ns|sts|deploy)\b)"
    r"|kubectl\s+(drain|cordon)"  # node eviction / cordon
    r"|--replicas(?:=\s*|\s+)0\b"  # scale to zero: "--replicas=0" and "--replicas 0"
    r"|rm\s+-[rf]{1,2}\s"  # rm -rf
    r"|\bdrop\s+(table|database)\b"  # destructive SQL DDL
    r"|\$\([^)]{0,200}\)"  # shell command substitution $(...)
    r"|`[^`]{0,200}`"  # backtick substitution
    r")",
    re.IGNORECASE,
)


# ── kubectl injection guard: public API ─────────────────────
def validate_kubectl_command(command: str) -> bool:
    """Return whether a rule command is free of destructive patterns.

    Args:
        command: The command string after template variables were substituted.

    Returns:
        True when the command may be executed; False when it contains a
        destructive pattern, in which case the caller should blank the
        command and log a warning.

    Accepted immediately (True):
        - empty string: a rule without an action
        - commands starting with "ssh ": handled by the SSH layer, not the
          kubectl path (NOTE(review): the remote command is not inspected)

    Rejected (False):
        - kubectl delete of pvc / namespace / statefulset / deployment,
          including the short names ns / sts / deploy
        - kubectl drain / cordon
        - scaling to zero via "--replicas=0" or "--replicas 0"
        - rm -rf
        - DROP TABLE / DROP DATABASE
        - $(...) or backtick shell substitution
    """
    if not command:
        return True
    if command.strip().startswith("ssh "):
        return True
    return _RULE_ENGINE_DESTRUCTIVE_RE.search(command) is None
# ── 變數提取 ────────────────────────────────────────────────
_POD_SUFFIX_DEPLOYMENT_RE = __import__("re").compile(
r"-[a-z0-9]{5,10}-[a-z0-9]{5}$"
)
_POD_SUFFIX_LEGACY_RE = __import__("re").compile(
r"-[a-z0-9]{5}$"
)
_POD_SUFFIX_STATEFULSET_RE = __import__("re").compile(
r"-\d+$"
)
def _strip_pod_suffix(pod_name: str) -> str:
"""
由 Pod 名稱推斷 Deployment/StatefulSet base name。
優先順序(由嚴格到寬鬆):
1. Deployment: {name}-{rs_hash 5-10 chars}-{pod_hash 5 chars}
範例: awoooi-api-7d6b776f78-4sgjl → awoooi-api
2. StatefulSet: {name}-{ordinal}
範例: postgresql-0 → postgresql
3. Legacy single-hash Pod: {name}-{hash 5 chars}
範例: my-job-x2m4k → my-job
GAP-A4 (2026-04-14 Claude Sonnet 4.6): Placeholder 解析缺漏修復。
"""
# 先試 Deployment 格式(最常見)
stripped = _POD_SUFFIX_DEPLOYMENT_RE.sub("", pod_name)
if stripped != pod_name and stripped:
return stripped
# 再試 StatefulSet
stripped = _POD_SUFFIX_STATEFULSET_RE.sub("", pod_name)
if stripped != pod_name and stripped:
return stripped
# 最後試 legacy single-hash
stripped = _POD_SUFFIX_LEGACY_RE.sub("", pod_name)
if stripped != pod_name and stripped and "-" in stripped:
return stripped
return pod_name
def _is_bad_target(target: str, alertname: str) -> bool:
"""
判斷 target 是否為「垃圾值」,不得組合成 kubectl 指令。
垃圾值:
- 空字串 / "unknown"
- 包含空白、冒號IP:port、括號、引號
- 等於 alertname 本身LLM/規則填錯)
- 純數字或 IP 格式
"""
if not target or target in ("unknown", "none", "null", ""):
return True
# 2026-04-15 Claude Sonnet 4.6 (GAP-A4 Phase 3): 擴充 fallback magic string
# 截圖實證Telegram 卡顯示 target=unknown-service 通過 _is_bad_target
# _resolve_target_from_k8s fallback 也會產 unknown-pod/unknown-container
_BAD_MAGIC_PREFIXES = ("unknown-", "none-", "null-", "undefined-")
if any(target.startswith(p) for p in _BAD_MAGIC_PREFIXES):
return True
if target == alertname:
return True
if any(c in target for c in (" ", ":", "(", ")", '"', "'", "<", ">", "{", "}")):
return True
# 純 IP 格式
if target.replace(".", "").isdigit() and target.count(".") == 3:
return True
return False
def _extract_vars(alert_context: dict) -> dict[str, str]:
    """Build the template variables for a rule from ``alert_context``.

    GAP-A4 (2026-04-14 Claude Sonnet 4.6): the target is resolved through
    several labels, from most to least authoritative:
      1. labels.deployment
      2. labels.app / labels["app.kubernetes.io/name"]
      3. labels.statefulset
      4. labels.pod, with the replicaset/pod hash suffix stripped
      5. labels.container / labels.name
      6. labels.service (demoted 2026-04-20: a K8s Service name is not a
         Deployment name; a trailing "-service" is stripped)
      7. alert_context["target_resource"], if it passes ``_is_bad_target``

    ``target_source`` records which of these supplied the target so later
    layers can trace it. When every step fails the target is "unknown".

    Args:
        alert_context: Alert payload; reads "labels", "alert_type",
            "target_resource" and "namespace".

    Returns:
        Dict with keys target, target_source, host, container, instance,
        job and namespace.
    """
    labels = alert_context.get("labels", {})
    alertname = labels.get("alertname", alert_context.get("alert_type", ""))
    raw_target = alert_context.get("target_resource", "unknown")
    instance = labels.get("instance", raw_target)
    # host is the part of instance before the first ":" (drops a port).
    host = instance
    if ":" in instance:
        host = instance.split(":")[0]
    job = labels.get("job", "exporter")
    namespace = alert_context.get("namespace", "awoooi-prod")

    def _usable(candidate: str) -> bool:
        # A label value counts only when non-empty and not a junk target.
        return bool(candidate) and not _is_bad_target(candidate, alertname)

    def _resolve_target() -> tuple[str, str]:
        # Workload labels, most authoritative first.
        for key in ("deployment", "app", "app.kubernetes.io/name", "statefulset"):
            value = labels.get(key, "")
            if _usable(value):
                return value, f"label.{key}"
        # Pod label: strip the hash suffix to get back the workload name.
        pod = labels.get("pod", "")
        if _usable(pod):
            base = _strip_pod_suffix(pod)
            if base:
                return base, "label.pod(stripped)"
        # container / name come next.
        for key in ("container", "name"):
            value = labels.get(key, "")
            if _usable(value):
                return value, f"label.{key}"
        # service label last among labels (2026-04-20 P0.3): a "-service"
        # suffix is removed and the source string marks the value as weak.
        svc = labels.get("service", "")
        if _usable(svc):
            if svc.endswith("-service"):
                return (
                    svc[: -len("-service")] or svc,
                    "label.service(stripped -service suffix)",
                )
            return svc, "label.service(demoted)"
        # raw target_resource, only when it passes the junk check.
        if not _is_bad_target(raw_target, alertname):
            return raw_target, "alert_context.target_resource"
        # Everything failed: keep "unknown" so the later validation rejects it.
        return "unknown", "none(fallback)"

    target, target_source = _resolve_target()
    container = labels.get("name", labels.get("container", "")) or target
    return {
        "target": target,
        "target_source": target_source,  # added 2026-04-20 P0.3
        "host": host,
        "container": container,
        "instance": instance,
        "job": job,
        "namespace": namespace,
    }
def _fill(template: str, vars: dict[str, str]) -> str:
"""填充模板變數,保留未知變數原樣"""
try:
return template.format_map(vars)
except (KeyError, ValueError):
return template
# ── 規則載入 ────────────────────────────────────────────────
@lru_cache(maxsize=1)
def _load_rules() -> list[dict]:
    """Load the rules from ``RULES_FILE``, sorted by ascending priority, and cache them.

    The result is cached for the process until ``_load_rules.cache_clear()``
    is called. A missing file, an empty file, a document without a ``rules``
    list, and non-mapping entries all degrade to fewer rules instead of
    raising. Rules whose ``priority`` is missing or not a number sort as 999.

    Returns:
        The list of rule dicts in priority order (the same cached list
        object on every call).
    """
    if not RULES_FILE.exists():
        logger.warning("alert_rules_file_not_found", path=str(RULES_FILE))
        return []
    with RULES_FILE.open("r", encoding="utf-8") as f:
        # safe_load returns None for an empty document.
        data = yaml.safe_load(f)
    raw_rules = data.get("rules") if isinstance(data, dict) else None
    if isinstance(raw_rules, list):
        candidates = [r for r in raw_rules if isinstance(r, dict)]
    else:
        candidates = []

    def _priority(rule: dict) -> float:
        # "priority: null" or a non-numeric value would make the sort raise.
        value = rule.get("priority", 999)
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return 999
        return value

    rules = sorted(candidates, key=_priority)
    logger.info("alert_rules_loaded", count=len(rules), path=str(RULES_FILE))
    return rules
# ── 匹配邏輯 ────────────────────────────────────────────────
def _matches(rule: dict, alertname: str, alert_type: str, message: str, instance: str = "") -> bool:
"""判斷規則是否匹配。通用兜底規則alertname=["*"])永遠回傳 False由 match_rule 單獨處理。"""
match = rule.get("match", {})
# S1-3 修正: 通用兜底規則不參與 _matches防止其 alert_type/message 關鍵字意外命中
alertnames = match.get("alertname", [])
if alertnames == ["*"]:
return False
# instance/target 前綴匹配 (最高優先,用於 E2E test 告警識別)
# 2026-04-09 Claude Sonnet 4.6: 支援 instance_prefix 匹配,讓 E2E test 告警不走 generic_fallback
if instance and match.get("instance_prefix"):
for prefix in match["instance_prefix"]:
if instance.lower().startswith(prefix.lower()):
return True
# alertname 完全匹配
if alertnames and alertname in alertnames:
return True
# alert_type 部分匹配
for kw in match.get("alert_type", []):
if kw.lower() in alert_type.lower():
return True
# message 關鍵字匹配(不分大小寫)
msg_lower = message.lower()
for kw in match.get("message", []):
if kw.lower() in msg_lower:
return True
return False
def _is_generic(rule: dict) -> bool:
alertnames = rule.get("match", {}).get("alertname", [])
return alertnames == ["*"]
# ── 公開 API ────────────────────────────────────────────────
def get_risk_for_alertname(alertname: str) -> str | None:
    """Look up the risk level that alert_rules.yaml assigns to ``alertname``.

    ADR-073 Phase 3-6: used by decision_manager for the YAML risk override.

    Args:
        alertname: Alert name to look up; alert_type, message and instance
            are passed to the matcher as empty strings.

    Returns:
        The ``response.risk`` value of the first matching rule (per the
        original note "low" / "medium" / "critical"), or None when no rule
        matches or the matching rule defines no risk.
    """
    hit = next(
        (candidate for candidate in _load_rules() if _matches(candidate, alertname, "", "", "")),
        None,
    )
    if hit is None:
        return None
    return hit.get("response", {}).get("risk")
def match_rule(alert_context: dict) -> dict[str, Any] | None:
    """Match an alert against the rule set and return the filled-in response.

    Specific rules are tried in priority order; the generic fallback rule is
    used only when none of them matches.

    Args:
        alert_context: Alert payload; reads "labels", "alert_type",
            "message", "severity" and "target_resource".

    Returns:
        The response dict of the selected rule with template variables
        substituted, or None when not even a generic rule exists. The caller
        handles None itself (AI analysis or its own fallback).
        ``kubectl_command`` is blanked when it fails the destructive-pattern
        check, when the target is junk, or when placeholders remain; in the
        last two cases ``blocked_reason`` starts with "INVALID_TARGET".
    """
    labels = alert_context.get("labels", {})
    alertname = labels.get("alertname", alert_context.get("alert_type", "custom"))
    alert_type = alert_context.get("alert_type", "custom")
    message = alert_context.get("message", "")
    severity = alert_context.get("severity", "warning")
    rules = _load_rules()
    tpl_vars = _extract_vars(alert_context)
    instance = labels.get("instance", alert_context.get("target_resource", ""))

    # Specific rules first; the generic fallback is considered only afterwards.
    chosen = next(
        (
            candidate
            for candidate in rules
            if not _is_generic(candidate)
            and _matches(candidate, alertname, alert_type, message, instance)
        ),
        None,
    )
    if chosen is None:
        chosen = next((candidate for candidate in rules if _is_generic(candidate)), None)
    if chosen is None:
        return None

    resp = chosen["response"]
    risk = resp.get("risk", "medium")
    # A critical alert upgrades a medium-risk rule to critical.
    if risk == "medium" and severity == "critical":
        risk = "critical"

    optimization = []
    for item in resp.get("optimization", []):
        optimization.append(
            {
                "type": item["type"],
                "description": _fill(item.get("description", ""), tpl_vars),
                "kubectl_or_config": _fill(item.get("command", ""), tpl_vars),
            }
        )

    # Task 2.3: kubectl injection guard, applied after variable substitution.
    kubectl_command = _fill(resp.get("kubectl_command", ""), tpl_vars)
    if not validate_kubectl_command(kubectl_command):
        logger.warning(
            "rule_kubectl_command_blocked",
            rule_id=chosen["id"],
            reason="destructive_pattern_detected",
            command_snippet=kubectl_command[:80],
        )
        kubectl_command = ""

    # GAP-A4 (2026-04-14 Claude Sonnet 4.6): drop the command when the target
    # is junk, so commands like `kubectl rollout restart deployment unknown`
    # are never produced; the original note says decision_manager then falls
    # back to the LLM.
    target_invalid = False
    if kubectl_command and _is_bad_target(tpl_vars["target"], alertname):
        logger.warning(
            "rule_kubectl_command_discarded_bad_target",
            rule_id=chosen["id"],
            target=tpl_vars["target"],
            alertname=alertname,
            reason="target 未解析為真實 deployment/app拒絕組裝指令 → fallback LLM",
            original_command=kubectl_command[:120],
        )
        kubectl_command = ""
        target_invalid = True
    # A leftover brace means a placeholder was not filled.
    if kubectl_command and any(brace in kubectl_command for brace in ("{", "}")):
        logger.warning(
            "rule_kubectl_command_discarded_unfilled_placeholder",
            rule_id=chosen["id"],
            command=kubectl_command[:120],
        )
        kubectl_command = ""
        target_invalid = True

    # 2026-04-16 ogt + Claude Sonnet 4.6: blocked_reason tells downstream code
    # why the command was blanked.
    if target_invalid:
        blocked_reason = (
            f"INVALID_TARGET - target='{tpl_vars['target']}' 未解析為有效 Deployment需 SRE 人工確認"
        )
    else:
        blocked_reason = ""

    return {
        "rule_id": chosen["id"],
        "action_title": _fill(resp["action_title"], tpl_vars),
        "description": _fill(resp["description"], tpl_vars),
        "suggested_action": resp["suggested_action"],
        "kubectl_command": kubectl_command,
        "target_resource": tpl_vars["target"],
        "target_source": tpl_vars.get("target_source", ""),  # 2026-04-20 P0.3 trace
        "namespace": tpl_vars["namespace"],
        "risk_level": risk,
        "blast_radius": {
            "affected_pods": 1,
            "estimated_downtime": resp.get("estimated_downtime", "unknown"),
            "related_services": [tpl_vars["target"]],
            "data_impact": "NONE",
        },
        "primary_responsibility": resp.get("responsibility", "COLLAB"),
        "responsibility_reasoning": resp.get("responsibility_reasoning", ""),
        "secondary_teams": resp.get("secondary_teams", []),
        "optimization_suggestions": optimization,
        "reasoning": _fill(resp.get("reasoning", ""), tpl_vars),
        "deviation_analysis": "規則引擎觸發,監控指標偏離正常基準",
        "confidence": 0.0,  # rule matches always report 0.0; never fabricate a confidence
        "affected_services": [tpl_vars["target"]],
        "signoz_correlation": "",
        "blocked_reason": blocked_reason,  # "" normally | "INVALID_TARGET - ..." on target failure
    }
# ── 自動規則生成 ─────────────────────────────────────────────
_AUTO_RULE_PROMPT = """\
你是 SRE 專家。根據以下 Prometheus 告警資訊,生成一條 YAML 規則供 OpenClaw 規則引擎使用。
告警資訊:
- alertname: {alertname}
- alert_type: {alert_type}
- message: {message}
- labels: {labels}
請嚴格輸出以下 YAML 格式,不要加任何說明文字,只輸出 YAML 區塊:
- id: {rule_id}
priority: {priority}
description: <一行中文描述>
match:
alertname:
- {alertname}
response:
action_title: <含 {{target}} 變數的動作標題>
description: "⚙️ 規則匹配: <含 {{target}} 的描述,說明根因>"
suggested_action: <RESTART_DEPLOYMENT|DELETE_POD|SCALE_DEPLOYMENT|NO_ACTION>
kubectl_command: "<含 {{target}} {{namespace}} 的具體指令>"
estimated_downtime: "<停機估計>"
risk: <low|medium|critical>
responsibility: <FE|BE|INFRA|DB|COLLAB>
responsibility_reasoning: "<責任歸屬說明>"
secondary_teams: []
optimization:
- type: <優化類型>
description: "<優化說明>"
command: "<含 {{target}} {{namespace}} 的指令>"
reasoning: "[規則匹配] <處置邏輯說明>"
規則 id 使用 snake_case不含特殊字符。\
"""
def get_incident_type(alertname: str) -> str:
    """Resolve the incident_type for ``alertname``.

    I1, ADR-064 Rule Engine integration (2026-04-11): the type is derived
    from the YAML rules instead of a static dict in webhooks.py.

    Resolution order:
      1. A non-generic rule whose ``match.alertname`` contains ``alertname``
         and that defines ``incident_type``.
      2. The static ``ALERTNAME_TO_TYPE`` mapping.
      3. "custom".

    Only the first rule listing the alertname is consulted. A rule's ``id``
    is a rule identifier, not an incident type, and is never used as one.

    Args:
        alertname: Alert name to classify.

    Returns:
        The incident type string; never raises on rule-loading problems.
    """
    try:
        for rule in _load_rules():
            if _is_generic(rule):
                continue
            if alertname in rule.get("match", {}).get("alertname", []):
                # Use the YAML value only when explicitly set; otherwise fall
                # through to the static mapping.
                incident_type = rule.get("incident_type")
                if incident_type:
                    return incident_type
                break
    except Exception as exc:
        # Best effort: a broken rule file must not break classification, but
        # the failure is logged so it is visible.
        logger.warning(
            "incident_type_rule_lookup_failed",
            alertname=alertname,
            error=str(exc),
        )
    return ALERTNAME_TO_TYPE.get(alertname, "custom")
def _rule_id_exists(alertname: str) -> bool:
    """Check whether a non-generic rule already lists ``alertname``.

    Args:
        alertname: Alert name to look for in each rule's ``match.alertname``.

    Returns:
        True when such a rule exists. False when none exists or the rules
        could not be loaded; the load failure is logged as a warning.
    """
    try:
        return any(
            alertname in rule.get("match", {}).get("alertname", [])
            for rule in _load_rules()
            if not _is_generic(rule)
        )
    except Exception as exc:
        # Same result as before (treated as "no rule"), but no longer silent.
        logger.warning(
            "auto_rule_exists_check_failed",
            alertname=alertname,
            error=str(exc),
        )
        return False
def _append_rule_to_yaml(rule_yaml: str, alertname: str) -> bool:
    """Validate an LLM-generated rule and append it to ``RULES_FILE``.

    The fragment must parse as a non-empty YAML list whose first item is a
    mapping with ``id``, ``match`` and ``response``, and whose response has
    non-empty ``kubectl_command`` and ``suggested_action``. Before writing,
    the existing file plus the new text is parsed as a whole; the write is
    refused unless the new rules end up at the tail of the top-level
    ``rules`` list exactly as validated. This keeps a bad fragment from
    corrupting the rule file.

    Args:
        rule_yaml: YAML fragment, expected to start with ``- id:``.
        alertname: Alert the rule was generated for (used in logs and in the
            marker comment written to the file).

    Returns:
        True when the rule was written and the rule cache cleared; False on
        any validation or I/O failure (logged).
    """
    import textwrap

    try:
        parsed = yaml.safe_load(rule_yaml)
        if not isinstance(parsed, list) or not parsed:
            logger.warning("auto_rule_invalid_yaml", alertname=alertname, raw=rule_yaml[:200])
            return False
        rule = parsed[0]
        if not isinstance(rule, dict):
            logger.warning("auto_rule_invalid_yaml", alertname=alertname, raw=rule_yaml[:200])
            return False
        required = {"id", "match", "response"}
        if not required.issubset(rule.keys()):
            logger.warning("auto_rule_missing_fields", alertname=alertname, keys=list(rule.keys()))
            return False
        resp = rule["response"] if isinstance(rule["response"], dict) else {}
        if not resp.get("kubectl_command") or not resp.get("suggested_action"):
            logger.warning("auto_rule_empty_response", alertname=alertname)
            return False

        # S1-2: normalise leading whitespace the LLM may have emitted, then
        # indent by 2 spaces so the items sit under the top-level "rules:" key.
        normalized = textwrap.dedent(rule_yaml.strip())
        indent = "  "
        now = datetime.now().strftime("%Y-%m-%d %H:%M")
        addition = f"\n{indent}# AUTO-GENERATED {now} — alertname={alertname}\n" + "".join(
            f"{indent}{line}\n" for line in normalized.splitlines()
        )

        # Parse "existing file + addition" before touching the file.
        existing = RULES_FILE.read_text(encoding="utf-8") if RULES_FILE.exists() else ""
        merged = yaml.safe_load(existing + addition)
        merged_rules = merged.get("rules") if isinstance(merged, dict) else None
        if not isinstance(merged_rules, list) or merged_rules[-len(parsed):] != parsed:
            logger.warning(
                "auto_rule_append_rejected_merge_check",
                alertname=alertname,
                rule_id=rule.get("id"),
            )
            return False

        with RULES_FILE.open("a", encoding="utf-8") as f:
            f.write(addition)
        # Clear the cache so the new rule takes effect immediately.
        _load_rules.cache_clear()
        logger.info("auto_rule_written", rule_id=rule.get("id"), alertname=alertname)
        return True
    except Exception as e:
        logger.error("auto_rule_append_failed", alertname=alertname, error=str(e))
        return False
def _parse_for_to_seconds(for_str: str) -> int | None:
"""將 Prometheus 'for' 字串(如 '5m', '30s', '1h')轉換為整數秒數。
無法解析時回傳 None。
"""
if not for_str:
return None
for_str = for_str.strip()
mapping = {"s": 1, "m": 60, "h": 3600, "d": 86400}
m = re.fullmatch(r"(\d+)([smhd]?)", for_str)
if not m:
return None
value = int(m.group(1))
unit = m.group(2) or "s"
return value * mapping.get(unit, 1)
async def _insert_catalog_ai_generated(
    rule_dict: dict,
    llm_source: str,
    rule_id: str,
    alertname_safe: str,
) -> None:
    """Insert the generated rule into alert_rule_catalog with source='ai_generated'.

    Idempotent through ``INSERT ... ON CONFLICT (rule_name) DO NOTHING``
    (the original note says rule_name has a unique index,
    alert_rule_catalog_rule_name_key).

    Transaction strategy: the YAML write and this insert are not in one
    transaction. The YAML write has already succeeded, so any failure here,
    including malformed rule fields, is only logged and nothing is rolled back.

    The row is written with review_status='draft' and confidence=0.50 (per
    the original note: a new, unverified rule gets a conservative value).

    Does nothing when ``settings.ENABLE_AI_RULE_CATALOG_WRITE`` is falsy.

    ADR-091 Task T1 — 2026-04-28 ogt + Claude Sonnet 4.6 (Asia/Taipei)

    Args:
        rule_dict: Parsed rule mapping from the LLM output; anything that is
            not a mapping is treated as empty.
        llm_source: Which LLM produced the rule; stored as created_by_agent.
        rule_id: Rule id, used for logging only.
        alertname_safe: Sanitised alert name; stored as rule_name.
    """
    from src.core.config import settings as _settings

    if not _settings.ENABLE_AI_RULE_CATALOG_WRITE:
        logger.debug(
            "ai_rule_catalog_write_skipped_flag_disabled",
            alertname=alertname_safe,
        )
        return

    from sqlalchemy import text as _sql
    import src.db.base as _db_base
    import json as _json

    try:
        # Field extraction sits inside the try: LLM output may carry null or
        # wrongly-typed fields, and this coroutine runs as a background task.
        fields = rule_dict if isinstance(rule_dict, dict) else {}
        # OpenClaw YAML rules have no 'expr' (they are not PromQL); the
        # alertname serves as a placeholder instead.
        expr = str(fields.get("expr") or f'alertname="{alertname_safe}"')
        for_str = fields.get("for", "0s")
        duration_seconds = _parse_for_to_seconds(str(for_str))
        labels = fields.get("labels")
        if not isinstance(labels, dict):
            labels = {}
        annotations = fields.get("annotations")
        if not isinstance(annotations, dict):
            annotations = {}
        # Carry an LLM-provided incident_type in annotations for later lookups.
        incident_type = fields.get("incident_type")
        if incident_type and "incident_type" not in annotations:
            annotations = {**annotations, "incident_type": str(incident_type)}
        # severity comes from labels; empty or missing becomes NULL.
        severity = str(labels.get("severity") or "").strip()[:50] or None

        async with _db_base.get_db_context() as db:
            await db.execute(
                _sql("""
INSERT INTO alert_rule_catalog (
rule_name, source, expr, duration_seconds,
severity, labels, annotations,
created_by_agent, confidence, review_status,
created_at, updated_at
) VALUES (
:rule_name, 'ai_generated', :expr, :duration_seconds,
:severity,
CAST(:labels AS jsonb),
CAST(:annotations AS jsonb),
:created_by_agent, 0.50, 'draft',
NOW(), NOW()
)
ON CONFLICT (rule_name) DO NOTHING
"""),
                {
                    "rule_name": alertname_safe[:200],
                    "expr": expr[:4000],
                    "duration_seconds": duration_seconds,
                    "severity": severity,
                    "labels": _json.dumps(labels, ensure_ascii=False),
                    "annotations": _json.dumps(annotations, ensure_ascii=False),
                    "created_by_agent": llm_source,
                },
            )
        logger.info(
            "ai_rule_catalog_insert_success",
            alertname=alertname_safe,
            rule_id=rule_id,
            llm_source=llm_source,
            outcome="success",
        )
    except Exception as db_err:
        # A failure here only warns; it never affects the finished YAML write.
        logger.warning(
            "ai_rule_catalog_insert_failed",
            alertname=alertname_safe,
            rule_id=rule_id,
            error=str(db_err),
            outcome="failed",
        )
async def _call_ollama(prompt: str, ollama_url: str, model: str) -> str | None:
    """Ask Ollama to generate the rule YAML.

    Args:
        prompt: Full prompt text.
        ollama_url: Base URL of the Ollama server; "/api/generate" is appended.
        model: Ollama model name.

    Returns:
        The "response" field of the reply ("" when the field is absent), or
        None when the request or decoding fails (logged as a warning).
    """
    endpoint = f"{ollama_url}/api/generate"
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": 0.1},
    }
    try:
        async with httpx.AsyncClient(timeout=60) as client:
            reply = await client.post(endpoint, json=payload)
            reply.raise_for_status()
            body = reply.json()
            return body.get("response", "")
    except Exception as e:
        logger.warning("auto_rule_ollama_failed", error=str(e))
        return None
async def _call_gemini(prompt: str, api_key: str) -> str | None:
    """Ask Gemini to generate the rule YAML (fallback when Ollama fails).

    The API key is sent in the ``x-goog-api-key`` header rather than as a
    ``key`` query parameter. httpx error messages include the request URL,
    so a key in the query string would be written to the warning log below.

    Args:
        prompt: Full prompt text.
        api_key: Gemini API key.

    Returns:
        The text of the first candidate's first part, or None when the
        request fails or the reply has an unexpected shape (logged as a
        warning).
    """
    url = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent"
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.post(
                url,
                headers={"x-goog-api-key": api_key},
                json={
                    "contents": [{"parts": [{"text": prompt}]}],
                    "generationConfig": {"temperature": 0.1},
                },
            )
            resp.raise_for_status()
            data = resp.json()
            return data["candidates"][0]["content"]["parts"][0]["text"]
    except Exception as e:
        logger.warning("auto_rule_gemini_failed", error=str(e))
        return None
def _extract_yaml_block(text: str) -> str:
"""從 LLM 回應中提取 YAML 區塊"""
# 去掉 markdown code fence
text = re.sub(r"```(?:yaml)?\n?", "", text).strip()
# 確保以 `- id:` 開頭
match = re.search(r"(- id:.+)", text, re.DOTALL)
return match.group(1).strip() if match else text
# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an unreferenced task can be garbage-collected mid-execution.
_BACKGROUND_TASKS: set = set()


def _spawn_background(coro: Any) -> None:
    """Schedule ``coro`` as a task and keep it referenced until it finishes."""
    import asyncio

    task = asyncio.create_task(coro)
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)


async def auto_generate_rule(
    alert_context: dict,
    ollama_url: str,
    model: str,
    gemini_api_key: str = "",
) -> None:
    """Background task: have an LLM generate a rule for an unknown alert.

    Trigger: match_rule() hit the generic fallback.
    Flow: Ollama -> on failure Gemini -> validate -> append to
    alert_rules.yaml -> clear the rule cache -> schedule the follow-ups
    (daily stats counter, playbook seeding, alert_rule_catalog dual-write).

    Args:
        alert_context: Alert payload; reads "labels", "alert_type", "message".
        ollama_url: Ollama endpoint, injected by the caller (S2-1 DI fix).
        model: Ollama model name.
        gemini_api_key: Gemini API key; an empty string skips the fallback.

    De-duplication:
        - A Redis ``SET NX EX`` lock (TTL ``_RULE_GEN_LOCK_TTL``) is tried
          first; when Redis is unavailable the process-level ``_generating``
          set is the fallback.
        - Per the original note, the cache clear takes effect in this Pod
          only; other Pods need a restart.

    Never raises: all failures are logged.
    """
    labels = alert_context.get("labels", {})
    alertname = labels.get("alertname", alert_context.get("alert_type", "custom"))
    # S3-3: strip braces so the alertname cannot break str.format() below.
    alertname_safe = re.sub(r"[{}]", "", alertname)

    # Only one generation per alertname at a time.
    if alertname_safe in _generating:
        return
    if _rule_id_exists(alertname_safe):
        logger.debug("auto_rule_skip_exists", alertname=alertname_safe)
        return

    # ADR-064 L1: try the Redis distributed lock first.
    lock_key = f"rule_generating:{alertname_safe}"
    redis_lock_acquired = False
    try:
        from src.core.redis_client import get_redis

        redis = get_redis()
        # SET NX EX: only the first caller succeeds.
        redis_lock_acquired = bool(
            await redis.set(lock_key, "1", nx=True, ex=_RULE_GEN_LOCK_TTL)
        )
        if not redis_lock_acquired:
            logger.info("auto_rule_skip_redis_lock", alertname=alertname_safe)
            return
    except Exception as redis_err:
        # Redis unavailable: degrade to process-level de-duplication.
        logger.warning("auto_rule_redis_lock_unavailable", error=str(redis_err))
        if alertname_safe in _generating:
            return

    _generating.add(alertname_safe)
    try:
        rule_id = re.sub(r"[^a-z0-9_]", "_", alertname_safe.lower()).strip("_")
        # S3-2: AI-generated rules take priorities in [500, 900), capped at 890.
        existing = [r.get("priority", 0) for r in _load_rules() if not _is_generic(r)]
        next_priority = max((p for p in existing if 500 <= p < 900), default=499) + 10
        priority = min(next_priority, 890)
        prompt = _AUTO_RULE_PROMPT.format(
            alertname=alertname_safe,
            alert_type=alert_context.get("alert_type", "custom"),
            message=alert_context.get("message", "")[:200],
            labels=json.dumps(
                {
                    k: v
                    for k, v in labels.items()
                    if k in ("job", "instance", "severity", "namespace", "container", "name")
                },
                ensure_ascii=False,
            ),
            rule_id=rule_id,
            priority=priority,
        )
        logger.info("auto_rule_generating", alertname=alertname_safe, rule_id=rule_id)

        # 1. Ollama first.
        raw = await _call_ollama(prompt, ollama_url, model)
        llm_source = "ollama" if raw else None
        # 2. Gemini when Ollama gave nothing and a key is configured.
        if not raw and gemini_api_key:
            raw = await _call_gemini(prompt, gemini_api_key)
            llm_source = "gemini" if raw else None
        if not raw:
            logger.warning(
                "auto_rule_auto_generate_failed",
                alertname=alertname_safe,
                reason="llm_no_response",
            )
            return

        yaml_block = _extract_yaml_block(raw)
        success = _append_rule_to_yaml(yaml_block, alertname_safe)
        if not success:
            logger.warning(
                "auto_rule_auto_generate_failed",
                alertname=alertname_safe,
                reason="yaml_validation_failed",
            )
            return

        logger.info(
            "alert_rule_auto_generated",
            alertname=alertname_safe,
            rule_id=rule_id,
            source=llm_source,
        )

        # Count today's generated rules (the original note: shown in the system report).
        try:
            from src.core.redis_client import get_redis as _get_redis
            from src.utils.timezone import now_taipei as _now_taipei

            _today = _now_taipei().strftime("%Y%m%d")
            _key = f"stats:auto_rule_generated:{_today}"
            _redis = _get_redis()

            async def _incr_rule_stat() -> None:
                # INCR + EXPIRE in one pipeline (the original note: avoids a
                # race between the two commands).
                async with _redis.pipeline() as _p:
                    _p.incr(_key)
                    _p.expire(_key, 86400 * 7)
                    await _p.execute()

            _spawn_background(_incr_rule_stat())
        except Exception as _redis_err:
            logger.debug("auto_rule_stats_redis_failed", error=str(_redis_err))

        # Seed playbooks for the new rule now instead of at the next restart.
        # Isolated so a failure here cannot skip the catalog dual-write below.
        try:
            from src.services.playbook_seed_service import seed_playbooks_from_rules

            _spawn_background(seed_playbooks_from_rules())
        except Exception as _seed_err:
            logger.warning(
                "auto_rule_playbook_seed_schedule_failed",
                alertname=alertname_safe,
                error=str(_seed_err),
            )

        # ADR-091 T1: dual-write to alert_rule_catalog (source='ai_generated').
        # Separate try/except: a failure never rolls back the YAML write.
        try:
            parsed_rules = yaml.safe_load(yaml_block)
            rule_dict = (
                parsed_rules[0] if isinstance(parsed_rules, list) and parsed_rules else {}
            )
            _spawn_background(
                _insert_catalog_ai_generated(
                    rule_dict=rule_dict,
                    llm_source=llm_source or "unknown",
                    rule_id=rule_id,
                    alertname_safe=alertname_safe,
                )
            )
        except Exception as _catalog_err:
            logger.warning(
                "ai_rule_catalog_task_create_failed",
                alertname=alertname_safe,
                error=str(_catalog_err),
            )
    except Exception as e:
        logger.error("auto_rule_exception", alertname=alertname_safe, error=str(e))
    finally:
        _generating.discard(alertname_safe)
        # Release the Redis lock once generation is finished.
        if redis_lock_acquired:
            try:
                from src.core.redis_client import get_redis

                await get_redis().delete(lock_key)
            except Exception:
                pass  # The lock expires by TTL anyway.