feat(aiops): Hermes rule quality advisor — E3 AI 規則品質建議 (保守版)
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 8m22s
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 8m22s
實證 rule_stats 跑完後發現 2 條 100% noise_rate 規則:
- PostgreSQLDiskGrowthRate (tp=0 fp=2)
- NoAlertsReceived2Hours (tp=0 fp=1)
加上 MoWoooWorkDown (33%), KubePodCrashLooping (25%)
新增 hermes_rule_quality_job.py (~210 行):
每日 04:00 Taipei 分析 alert_rule_catalog:
- threshold: noise_rate >= 0.7 AND 樣本 >= 5
- 為每條寫 aol('rule_rejected', proposed_action='review_or_deprecate')
- 推 Telegram 摘要給 SRE group
統帥鐵律對齊:
✅ 不自動改 review_status (人工決策 deprecate,AI 只推建議)
✅ threshold 作為「觸發討論」而非「最終決策」
✅ aol(rule_rejected) 留 trail,未來可升級 LLM 辯證
解鎖 E3 Hermes 基礎: 後續可加 LLM 分析假報真因 (expr 缺 for: window、
label match 太寬泛、metric 本身 noisy 等),產出具體改進建議.
Wire main.py lifespan asyncio.create_task()
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
254
apps/api/src/jobs/hermes_rule_quality_job.py
Normal file
254
apps/api/src/jobs/hermes_rule_quality_job.py
Normal file
@@ -0,0 +1,254 @@
|
||||
"""
|
||||
Hermes Rule Quality Advisor — ADR-090 § E3 AI 規則品質建議
|
||||
==========================================================
|
||||
每日 04:00 Taipei 分析 alert_rule_catalog,對 noise_rate >= 0.7 的 rule 推 Telegram
|
||||
建議 + 寫 aol(rule_rejected) 稽核,人工決策是否 deprecate.
|
||||
|
||||
職責邊界:
|
||||
✅ 讀 alert_rule_catalog WHERE noise_rate >= 0.7
|
||||
✅ 為每條寫 aol(rule_rejected) + proposed_action='review_or_deprecate'
|
||||
✅ 推 Telegram 通知 SRE group (格式化清單)
|
||||
⏳ 不自動改 review_status (統帥鐵律: AI 不做最終決策)
|
||||
⏳ TODO: LLM 分析每條 rule 的假報真因 (下一階段)
|
||||
|
||||
統帥鐵律對齊:
|
||||
- 禁止寫死規則做最終決策 → 本 agent 只推建議,人工決策
|
||||
- 朝 AI 自主化方向 → aol 留 trail,未來可升級為 LLM 判斷
|
||||
- noise_rate threshold 0.7 是「觸發討論」而非「自動動作」
|
||||
|
||||
排程:
|
||||
- 首次延遲 420s
|
||||
- 每日 04:00 Taipei
|
||||
|
||||
2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
|
||||
ADR-090 § E3 Hermes
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json as _json
|
||||
import time as _time
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any
|
||||
|
||||
import structlog
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
_FIRST_DELAY_SEC = 420
|
||||
_LOOP_BACKOFF_SEC = 1800
|
||||
_DAILY_TRIGGER_HOUR_TAIPEI = 4
|
||||
|
||||
# 觸發討論的噪音閾值
|
||||
_NOISE_THRESHOLD = 0.7
|
||||
# 樣本不足不發建議 (避免只 fire 1 次就標為噪音)
|
||||
_MIN_SAMPLE_SIZE = 5
|
||||
|
||||
|
||||
async def run_hermes_rule_quality_loop() -> None:
    """Run the rule-quality analysis forever.

    Sleeps ``_FIRST_DELAY_SEC`` once, then repeatedly awaits
    ``analyze_once()``. After a pass that returns normally the loop sleeps
    for the duration reported by ``_seconds_until_next_trigger()``; if the
    pass raises, the exception is logged and the loop retries after
    ``_LOOP_BACKOFF_SEC``. The coroutine never returns on its own.
    """
    logger.info("hermes_rule_quality_loop_started")
    await asyncio.sleep(_FIRST_DELAY_SEC)

    while True:
        try:
            await analyze_once()
        except Exception as exc:
            logger.exception("hermes_rule_quality_loop_error", error=str(exc))
            pause = _LOOP_BACKOFF_SEC
        else:
            # Only a clean pass is scheduled against the daily trigger time.
            pause = _seconds_until_next_trigger()
            logger.info("hermes_rule_quality_next_tick", sleep_sec=pause)

        await asyncio.sleep(pause)
|
||||
|
||||
|
||||
async def analyze_once() -> dict[str, int]:
    """Run one analysis pass: find noisy rules and publish advisories.

    Steps:
      1. Fetch the noisy rules via ``_fetch_noisy_rules()``.
      2. Write one advisory log row per rule via ``_write_advisory_aol()``.
      3. If at least one rule was found, send a single Telegram summary via
         ``_send_telegram_summary()``.

    Any exception raised by these steps is logged and swallowed, so the
    counters collected up to that point are still returned.

    Returns:
        A dict with the keys ``noisy_rules`` (rules found),
        ``advisories_written`` (advisory rows written successfully) and
        ``telegram_sent`` (1 if the summary was sent, otherwise 0).
    """
    # Monotonic clock: the elapsed time must not be distorted by wall-clock
    # adjustments (NTP step, manual change) that happen during the pass.
    started = _time.monotonic()
    stats = {"noisy_rules": 0, "advisories_written": 0, "telegram_sent": 0}

    try:
        noisy = await _fetch_noisy_rules()
        stats["noisy_rules"] = len(noisy)

        for rule in noisy:
            if await _write_advisory_aol(rule):
                stats["advisories_written"] += 1

        if noisy:
            sent = await _send_telegram_summary(noisy)
            stats["telegram_sent"] = 1 if sent else 0
    except Exception as e:
        # Cap the message length so one huge error cannot flood the log.
        error_msg = f"{type(e).__name__}: {e}"[:1000]
        logger.exception("hermes_analyze_once_failed", error=error_msg)

    duration_ms = int((_time.monotonic() - started) * 1000)
    logger.info(
        "hermes_rule_quality_once_done",
        noisy=stats["noisy_rules"],
        advisories=stats["advisories_written"],
        telegram_sent=stats["telegram_sent"],
        duration_ms=duration_ms,
    )
    return stats
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 資料查詢
|
||||
# ============================================================================
|
||||
|
||||
async def _fetch_noisy_rules() -> list[dict[str, Any]]:
    """Fetch the rules that currently qualify for a noise advisory.

    Selects rows from ``alert_rule_catalog`` where ``noise_rate`` is at least
    ``_NOISE_THRESHOLD``, the combined true/false positive count is at least
    ``_MIN_SAMPLE_SIZE``, and ``review_status`` is NULL or ``'approved'``.
    Rows are ordered by noise rate, then by sample size, both descending.

    Returns:
        One dict per rule with the keys ``rule_id``, ``rule_name``,
        ``severity``, ``tp``, ``fp``, ``noise_rate``, ``last_fired_at`` and
        ``review_status``. Returns an empty list when the query fails; the
        failure is logged as a warning rather than raised.
    """
    from sqlalchemy import text as _sql

    from src.db.base import get_db_context

    try:
        async with get_db_context() as db:
            # Plain (non-f) string on purpose: every value goes through bind
            # parameters, nothing is interpolated into the SQL text.
            result = await db.execute(
                _sql("""
                    SELECT
                        rule_id, rule_name, severity,
                        true_positive_count, false_positive_count, noise_rate,
                        last_fired_at, review_status
                    FROM alert_rule_catalog
                    WHERE noise_rate >= :thr
                      AND (true_positive_count + false_positive_count) >= :min_sample
                      AND (review_status IS NULL OR review_status = 'approved')
                    ORDER BY noise_rate DESC, (true_positive_count + false_positive_count) DESC
                """),
                {"thr": _NOISE_THRESHOLD, "min_sample": _MIN_SAMPLE_SIZE},
            )
            return [
                {
                    "rule_id": r.rule_id,
                    "rule_name": r.rule_name,
                    "severity": r.severity,
                    # Counts may be NULL in the catalog; treat NULL as zero.
                    "tp": int(r.true_positive_count or 0),
                    "fp": int(r.false_positive_count or 0),
                    "noise_rate": float(r.noise_rate) if r.noise_rate is not None else 0.0,
                    "last_fired_at": r.last_fired_at,
                    "review_status": r.review_status,
                }
                for r in result.fetchall()
            ]
    except Exception as e:
        logger.warning("fetch_noisy_rules_failed", error=str(e))
        return []
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 建議寫入 (aol only,不改 rule 本身)
|
||||
# ============================================================================
|
||||
|
||||
async def _write_advisory_aol(rule: dict[str, Any]) -> bool:
    """Record an advisory for one noisy rule in ``automation_operation_log``.

    Inserts a row with ``operation_type='rule_rejected'`` and
    ``actor='hermes_rule_quality'``. The rule's statistics go into the
    ``input`` JSON and the proposed action into the ``output`` JSON. The rule
    itself is not modified; the decision is left to a human reviewer.

    Args:
        rule: A rule dict providing ``rule_name``, ``severity``,
            ``noise_rate``, ``tp`` and ``fp``.

    Returns:
        True if the insert statement executed without raising, False on any
        failure. Failures are logged as warnings and never raised.
    """
    try:
        from sqlalchemy import text as _sql

        from src.db.base import get_db_context

        input_payload = {
            "rule_name": rule["rule_name"],
            "severity": rule["severity"],
            "noise_rate": rule["noise_rate"],
            "true_positive_count": rule["tp"],
            "false_positive_count": rule["fp"],
        }
        output_payload = {
            "proposed_action": "review_or_deprecate",
            "reason": (
                f"過去 30d noise_rate {rule['noise_rate']:.1%} "
                f"(tp={rule['tp']}, fp={rule['fp']}),"
                "假報過多應考慮 deprecate 或改進 expr"
            ),
            "requires_human_decision": True,
        }

        async with get_db_context() as db:
            await db.execute(
                _sql("""
                    INSERT INTO automation_operation_log (
                        operation_type, actor, status,
                        input, output, tags
                    ) VALUES (
                        'rule_rejected',
                        'hermes_rule_quality',
                        'success',
                        CAST(:input AS jsonb),
                        CAST(:output AS jsonb),
                        :tags
                    )
                """),
                {
                    # ensure_ascii=False keeps the CJK text readable in the log.
                    "input": _json.dumps(input_payload, ensure_ascii=False),
                    "output": _json.dumps(output_payload, ensure_ascii=False),
                    "tags": ["hermes", "rule_quality", "advisory"],
                },
            )
        return True
    except Exception as e:
        # Use .get(): if the failure above was a missing "rule_name" key,
        # indexing it again here would raise out of the handler and break the
        # "always returns a bool" contract.
        logger.warning("write_advisory_aol_failed", rule=rule.get("rule_name"), error=str(e))
        return False
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Telegram 推送
|
||||
# ============================================================================
|
||||
|
||||
async def _send_telegram_summary(noisy: list[dict[str, Any]]) -> bool:
    """Send one HTML-formatted Telegram summary of the noisy rules.

    The message lists at most the first 10 rules and, when there are more,
    appends a line with the number of rules left out. It is sent to the chat
    configured in ``settings.OPENCLAW_TG_CHAT_ID``.

    Args:
        noisy: Rule dicts providing ``rule_name``, ``noise_rate``, ``tp``,
            ``fp`` and ``severity``.

    Returns:
        True if the send call completed without raising. False if no chat id
        is configured or if anything fails; failures are logged, not raised.
    """
    try:
        import html

        from src.core.config import settings
        from src.services.telegram_gateway import get_telegram_gateway

        if not settings.OPENCLAW_TG_CHAT_ID:
            logger.info("hermes_telegram_skip_no_chat_id")
            return False

        max_listed = 10  # cap the list so the message does not get too long

        lines = [
            "🔍 <b>Hermes 規則品質檢測</b>",
            f"檢測到 {len(noisy)} 條規則噪音率 ≥ {_NOISE_THRESHOLD:.0%},建議人工審查:",
            "",
        ]
        for r in noisy[:max_listed]:
            # Both values come from the rule catalog and are placed inside an
            # HTML-parsed message, so both are escaped. str() keeps a NULL
            # name from raising and discarding the whole summary.
            safe_name = html.escape(str(r["rule_name"]))
            safe_severity = html.escape(str(r["severity"] or "-"))
            lines.append(
                f"🟡 <code>{safe_name}</code>\n"
                f" 噪音率 <b>{r['noise_rate']:.1%}</b> (tp={r['tp']} fp={r['fp']} sev={safe_severity})"
            )
        if len(noisy) > max_listed:
            lines.append(f"\n…還有 {len(noisy) - max_listed} 條")
        lines.append("\n人工決策: 確認 deprecate 或改 expr → 手動 UPDATE review_status")

        msg = "\n".join(lines)

        tg = get_telegram_gateway()
        # Goes through the gateway's private _send_request to post a plain
        # sendMessage call.
        await tg._send_request("sendMessage", {  # type: ignore[attr-defined]
            "chat_id": settings.OPENCLAW_TG_CHAT_ID,
            "text": msg,
            "parse_mode": "HTML",
            "disable_web_page_preview": True,
        })
        return True
    except Exception as e:
        logger.warning("hermes_telegram_send_failed", error=str(e))
        return False
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 時間
|
||||
# ============================================================================
|
||||
|
||||
def _seconds_until_next_trigger() -> float:
    """Return the number of seconds to sleep until the next daily trigger.

    The trigger is ``_DAILY_TRIGGER_HOUR_TAIPEI``:00:00 at a fixed UTC+8
    offset. If that time has already been reached today, tomorrow's is used.
    The result is clamped to the range [300 seconds, 25 hours].
    """
    taipei = timezone(timedelta(hours=8))
    current = datetime.now(taipei)

    target = datetime(
        current.year,
        current.month,
        current.day,
        _DAILY_TRIGGER_HOUR_TAIPEI,
        tzinfo=taipei,
    )
    if target <= current:
        target += timedelta(days=1)

    remaining = (target - current).total_seconds()
    return max(300.0, min(remaining, 25 * 3600))
|
||||
@@ -440,6 +440,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
except Exception as e:
|
||||
logger.warning("asset_change_tracker_loop_schedule_failed", error=str(e))
|
||||
|
||||
# ADR-090 § Hermes Rule Quality Advisor (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei)
|
||||
# 每日 04:00 Taipei 分析 alert_rule_catalog.noise_rate,對高噪音規則推 Telegram 建議
|
||||
# 統帥鐵律: AI 只推建議不自動改 review_status,人工決策 deprecate
|
||||
try:
|
||||
from src.jobs.hermes_rule_quality_job import run_hermes_rule_quality_loop
|
||||
asyncio.create_task(run_hermes_rule_quality_loop())
|
||||
logger.info("hermes_rule_quality_loop_scheduled", daily_trigger_hour_taipei=4)
|
||||
except Exception as e:
|
||||
logger.warning("hermes_rule_quality_loop_schedule_failed", error=str(e))
|
||||
|
||||
# ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告
|
||||
# 2026-04-14 Claude Haiku 4.5 Asia/Taipei
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user