feat(rule-engine): 自動規則生成 — generic_fallback 觸發 AI 學習
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 11m25s

流程:
1. 告警命中 generic_fallback 規則
2. 背景觸發 auto_generate_rule()
3. Ollama (deepseek-r1:14b) 生成 YAML 規則片段
4. Ollama 失敗 → Gemini 備援
5. 驗證格式 → append alert_rules.yaml → 清除 lru_cache
6. 下次同類告警直接命中專屬規則,不再走兜底

去重: 同一 alertname 進程內只生成一次
手寫規則 priority 1-499,AI 生成 500-899,兜底 999

2026-04-09 ogt: AI 自學規則引擎

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-09 09:20:33 +08:00
parent f98be41517
commit 71437db0e9
2 changed files with 226 additions and 2 deletions

View File

@@ -9,16 +9,27 @@ OpenClaw 告警規則匹配引擎
- priority 越小越優先999 = 通用兜底 - priority 越小越優先999 = 通用兜底
- 變數替換: {target} {host} {container} {instance} {job} {namespace} - 變數替換: {target} {host} {container} {instance} {job} {namespace}
2026-04-09 ogt: 初版,從 openclaw.py _generate_mock_response if/elif 抽出 自動規則生成:
- 只有 generic_fallback 觸發時才生成(具體規則不觸發)
- 呼叫 Ollama (deepseek-r1:14b) 或 Gemini 生成 YAML 規則片段
- 生成後 append 到 alert_rules.yaml,清除 lru_cache 立即生效
- 同一 alertname 已有規則時跳過(去重)
2026-04-09 ogt: 初版 + 自動規則生成
""" """
from __future__ import annotations from __future__ import annotations
import asyncio
import json
import re import re
import time
from datetime import datetime
from functools import lru_cache from functools import lru_cache
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
import httpx
import structlog import structlog
import yaml import yaml
@@ -26,6 +37,9 @@ logger = structlog.get_logger(__name__)
RULES_FILE = Path(__file__).parent.parent.parent / "alert_rules.yaml" RULES_FILE = Path(__file__).parent.parent.parent / "alert_rules.yaml"
# Alert names whose rule generation is currently in flight. This is an
# in-process, in-memory guard: auto_generate_rule() adds the alertname before
# calling the LLM and discards it in its `finally`, so the same alertname is
# not generated twice concurrently within this process.
_generating: set[str] = set()
# ── 變數提取 ──────────────────────────────────────────────── # ── 變數提取 ────────────────────────────────────────────────
@@ -193,3 +207,199 @@ def match_rule(alert_context: dict) -> dict[str, Any] | None:
"affected_services": [vars["target"]], "affected_services": [vars["target"]],
"signoz_correlation": "", "signoz_correlation": "",
} }
# ── 自動規則生成 ─────────────────────────────────────────────
# Prompt template sent to the LLM by auto_generate_rule(). It is rendered with
# str.format(), which substitutes {alertname}, {alert_type}, {message},
# {labels}, {rule_id} and {priority}. The doubled braces ({{target}},
# {{namespace}}) are format-escapes: they reach the model as literal
# {target} / {namespace} placeholders that it is asked to keep in its output.
_AUTO_RULE_PROMPT = """\
你是 SRE 專家。根據以下 Prometheus 告警資訊,生成一條 YAML 規則供 OpenClaw 規則引擎使用。
告警資訊:
- alertname: {alertname}
- alert_type: {alert_type}
- message: {message}
- labels: {labels}
請嚴格輸出以下 YAML 格式,不要加任何說明文字,只輸出 YAML 區塊:
- id: {rule_id}
priority: {priority}
description: <一行中文描述>
match:
alertname:
- {alertname}
response:
action_title: <含 {{target}} 變數的動作標題>
description: "⚙️ 規則匹配: <含 {{target}} 的描述,說明根因>"
suggested_action: <RESTART_DEPLOYMENT|DELETE_POD|SCALE_DEPLOYMENT|NO_ACTION>
kubectl_command: "<含 {{target}} {{namespace}} 的具體指令>"
estimated_downtime: "<停機估計>"
risk: <low|medium|critical>
responsibility: <FE|BE|INFRA|DB|COLLAB>
responsibility_reasoning: "<責任歸屬說明>"
secondary_teams: []
optimization:
- type: <優化類型>
description: "<優化說明>"
command: "<含 {{target}} {{namespace}} 的指令>"
reasoning: "[規則匹配] <處置邏輯說明>"
規則 id 使用 snake_case不含特殊字符。\
"""
def _rule_id_exists(alertname: str) -> bool:
    """Return True when a non-generic rule already lists ``alertname``.

    Rules flagged by ``_is_generic`` (the catch-all fallback) are ignored, so
    only dedicated rules count as "already exists".

    Args:
        alertname: Alert name to look up in each rule's ``match.alertname``.

    Returns:
        True if some non-generic rule matches the name exactly, otherwise
        False. Also False when the rules cannot be loaded; that failure is
        logged instead of being silently discarded.
    """
    try:
        for rule in _load_rules() or []:
            if not isinstance(rule, dict) or _is_generic(rule):
                continue
            # `match:` may be present but empty (None) in hand-edited YAML.
            match = rule.get("match")
            names = match.get("alertname") if isinstance(match, dict) else None
            # A scalar `alertname: Foo` must be compared as a whole name;
            # `in` on a bare string would do a substring test.
            if isinstance(names, str):
                names = [names]
            if isinstance(names, (list, tuple, set)) and alertname in names:
                return True
    except Exception as exc:
        # Best effort: report "not found", but leave a trace of the failure.
        logger.warning(
            "auto_rule_exists_check_failed", alertname=alertname, error=str(exc)
        )
    return False
def _append_rule_to_yaml(rule_yaml: str, alertname: str) -> bool:
    """Validate an LLM-generated rule and append it to ``alert_rules.yaml``.

    Only the first list item of ``rule_yaml`` is validated, and only that item
    is written. It is re-serialised from the parsed object, so extra rules or
    stray text in the raw LLM output never reach the rules file. On success
    the ``_load_rules`` cache is cleared so the new rule is picked up on the
    next load.

    Args:
        rule_yaml: YAML text expected to be a list whose first item is a rule
            mapping with ``id``, ``match`` and ``response`` keys.
        alertname: Alert name that triggered generation. The rule's
            ``match.alertname`` must contain it, otherwise the rule could
            never match that alert and generation would repeat.

    Returns:
        True if the rule was written, False if validation or the write failed.
        Failures are logged and never raised.
    """
    try:
        parsed = yaml.safe_load(rule_yaml)
        if not isinstance(parsed, list) or not parsed or not isinstance(parsed[0], dict):
            logger.warning("auto_rule_invalid_yaml", alertname=alertname, raw=rule_yaml[:200])
            return False

        rule = parsed[0]
        required = {"id", "match", "response"}
        if not required.issubset(rule.keys()):
            logger.warning("auto_rule_missing_fields", alertname=alertname, keys=list(rule.keys()))
            return False

        resp = rule["response"]
        if (
            not isinstance(resp, dict)
            or not resp.get("kubectl_command")
            or not resp.get("suggested_action")
        ):
            logger.warning("auto_rule_empty_response", alertname=alertname)
            return False

        # The rule must match the alert that triggered it, or the dedup check
        # will never find it and every later alert re-triggers generation.
        match = rule["match"]
        names = match.get("alertname") if isinstance(match, dict) else None
        if isinstance(names, str):
            names = [names]
        if not isinstance(names, list) or alertname not in names:
            logger.warning("auto_rule_match_mismatch", alertname=alertname, match=match)
            return False

        # Serialise only the validated rule, not the raw model output.
        rule_text = yaml.safe_dump(
            [rule], allow_unicode=True, sort_keys=False, default_flow_style=False
        )
        now = datetime.now().strftime("%Y-%m-%d %H:%M")
        # Collapse whitespace so the name cannot break out of the comment line.
        safe_name = " ".join(str(alertname).split())
        # Each line is prefixed so the item nests under the file's `rules:`
        # list (assumes the file ends inside that list — TODO confirm).
        out_lines = [f"\n # AUTO-GENERATED {now} — alertname={safe_name}\n"]
        out_lines.extend(f" {line}\n" for line in rule_text.strip().splitlines())

        # One writelines call keeps the append as a single buffered write.
        with RULES_FILE.open("a", encoding="utf-8") as f:
            f.writelines(out_lines)

        # Clear the lru_cache so the new rule takes effect immediately.
        _load_rules.cache_clear()
        logger.info("auto_rule_written", rule_id=rule.get("id"), alertname=alertname)
        return True
    except Exception as e:
        logger.error("auto_rule_append_failed", alertname=alertname, error=str(e))
        return False
async def _call_ollama(prompt: str, ollama_url: str, model: str) -> str | None:
    """Request a rule YAML from an Ollama server via ``/api/generate``.

    Args:
        prompt: Fully rendered prompt text.
        ollama_url: Base URL of the Ollama server (no trailing path).
        model: Name of the Ollama model to run.

    Returns:
        The ``response`` field of the JSON reply (``""`` if absent), or None
        when the request fails for any reason; the failure is logged.
    """
    try:
        endpoint = f"{ollama_url}/api/generate"
        payload = {
            "model": model,
            "prompt": prompt,
            "stream": False,  # single JSON reply instead of a token stream
            "options": {"temperature": 0.1},
        }
        async with httpx.AsyncClient(timeout=60) as session:
            reply = await session.post(endpoint, json=payload)
            reply.raise_for_status()
            body = reply.json()
        return body.get("response", "")
    except Exception as err:
        logger.warning("auto_rule_ollama_failed", error=str(err))
        return None
async def _call_gemini(prompt: str, api_key: str) -> str | None:
    """Request a rule YAML from Gemini (gemini-2.0-flash); the Ollama fallback.

    The API key is sent in the ``x-goog-api-key`` header, not as a ``?key=``
    query parameter. httpx includes the request URL in the message of the
    ``HTTPStatusError`` raised by ``raise_for_status()``, and that message is
    logged below, so a key in the query string would be written to the logs.

    Args:
        prompt: Fully rendered prompt text.
        api_key: Gemini API key.

    Returns:
        Text of the first part of the first candidate, or None on any failure
        (HTTP error, timeout, unexpected response shape); failures are logged.
    """
    url = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash:generateContent"
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.post(
                url,
                headers={"x-goog-api-key": api_key},
                json={
                    "contents": [{"parts": [{"text": prompt}]}],
                    "generationConfig": {"temperature": 0.1},
                },
            )
            resp.raise_for_status()
            data = resp.json()
        # A missing key or index here is caught below and reported as a failure.
        return data["candidates"][0]["content"]["parts"][0]["text"]
    except Exception as e:
        logger.warning("auto_rule_gemini_failed", error=str(e))
        return None
def _extract_yaml_block(text: str) -> str:
"""從 LLM 回應中提取 YAML 區塊"""
# 去掉 markdown code fence
text = re.sub(r"```(?:yaml)?\n?", "", text).strip()
# 確保以 `- id:` 開頭
match = re.search(r"(- id:.+)", text, re.DOTALL)
return match.group(1).strip() if match else text
async def auto_generate_rule(alert_context: dict) -> None:
    """Background task: have an LLM write a dedicated rule for an unknown alert.

    Intended to run when ``match_rule()`` hit the generic fallback rule.
    Flow: Ollama -> (if that yields nothing) Gemini -> extract the YAML ->
    validate and append to ``alert_rules.yaml`` (which clears the rule cache).

    De-duplication: returns immediately if the same alertname is already being
    generated in this process, or if a non-generic rule already lists it.

    Args:
        alert_context: Alert payload. Reads ``labels`` (dict, optional),
            ``alert_type`` and ``message``.

    Returns:
        None. Every failure is logged; nothing is raised to the caller.
    """
    from src.core.config import settings

    # `labels` may be present but None; treat that like an empty mapping.
    labels = alert_context.get("labels") or {}
    alertname = labels.get("alertname") or alert_context.get("alert_type") or "custom"

    # De-dup: one generation per alertname at a time. There is no `await`
    # between this check and the add below, so the check cannot interleave
    # with another task on the same event loop.
    if alertname in _generating:
        return
    if _rule_id_exists(alertname):
        logger.debug("auto_rule_skip_exists", alertname=alertname)
        return

    _generating.add(alertname)
    try:
        rule_id = re.sub(r"[^a-z0-9_]", "_", alertname.lower()).strip("_")

        # Priority band 500-899 is for AI-generated rules (hand-written rules
        # use 1-499, the fallback is 999). Non-integer priorities are skipped
        # so one malformed rule cannot block generation, and the result is
        # clamped so it never leaves the band.
        ai_priorities = [
            p
            for p in (r.get("priority") for r in _load_rules() if not _is_generic(r))
            if isinstance(p, int) and 500 <= p < 900
        ]
        priority = min(max(ai_priorities, default=499) + 10, 899)

        # Only a fixed set of labels is forwarded to the model.
        prompt_labels = {
            k: v
            for k, v in labels.items()
            if k in ("job", "instance", "severity", "namespace", "container", "name")
        }
        prompt = _AUTO_RULE_PROMPT.format(
            alertname=alertname,
            alert_type=alert_context.get("alert_type", "custom"),
            # Tolerate a missing/None/non-string message; cap the length.
            message=str(alert_context.get("message") or "")[:200],
            labels=json.dumps(prompt_labels, ensure_ascii=False),
            rule_id=rule_id,
            priority=priority,
        )

        logger.info("auto_rule_generating", alertname=alertname, rule_id=rule_id)

        # 1. Try Ollama first.
        raw = await _call_ollama(prompt, settings.OLLAMA_URL, settings.OPENCLAW_DEFAULT_MODEL)

        # 2. Nothing from Ollama -> fall back to Gemini if a key is configured.
        if not raw and settings.GEMINI_API_KEY:
            raw = await _call_gemini(prompt, settings.GEMINI_API_KEY)

        if not raw:
            logger.warning("auto_rule_no_response", alertname=alertname)
            return

        yaml_block = _extract_yaml_block(raw)
        if _append_rule_to_yaml(yaml_block, alertname):
            logger.info("auto_rule_success", alertname=alertname, rule_id=rule_id)
        else:
            logger.warning("auto_rule_failed_validation", alertname=alertname)

    except Exception as e:
        # Top-level boundary of a fire-and-forget task: log, never propagate.
        logger.error("auto_rule_exception", alertname=alertname, error=str(e))
    finally:
        # Always release the guard so a failed attempt can be retried later.
        _generating.discard(alertname)

View File

@@ -628,9 +628,10 @@ class OpenClawService:
if signoz_metrics: if signoz_metrics:
mock_response["description"] += f" {signoz_metrics.to_summary()}" mock_response["description"] += f" {signoz_metrics.to_summary()}"
rule_id = mock_response.get("rule_id", "unknown")
logger.info( logger.info(
"mock_llm_response_generated", "mock_llm_response_generated",
rule_id=mock_response.get("rule_id", "unknown"), rule_id=rule_id,
action_title=mock_response["action_title"], action_title=mock_response["action_title"],
risk_level=mock_response["risk_level"], risk_level=mock_response["risk_level"],
primary_responsibility=mock_response["primary_responsibility"], primary_responsibility=mock_response["primary_responsibility"],
@@ -639,6 +640,19 @@ class OpenClawService:
is_mock=True, is_mock=True,
) )
# 2026-04-09 ogt: 命中通用兜底時,背景自動生成專屬規則
if rule_id == "generic_fallback":
from src.services.alert_rule_engine import auto_generate_rule
import asyncio
try:
loop = asyncio.get_event_loop()
if loop.is_running():
loop.create_task(auto_generate_rule(alert_context))
else:
asyncio.run(auto_generate_rule(alert_context))
except Exception as _e:
logger.warning("auto_rule_trigger_failed", error=str(_e))
return json.dumps(mock_response) return json.dumps(mock_response)
# ========================================================================= # =========================================================================