feat(aiops): Hermes rule quality advisor — E3 AI 規則品質建議 (保守版)
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 8m22s

實證 rule_stats 跑完後發現 2 條 100% noise_rate 規則:
  - PostgreSQLDiskGrowthRate (tp=0 fp=2)
  - NoAlertsReceived2Hours   (tp=0 fp=1)
加上 MoWoooWorkDown (33%), KubePodCrashLooping (25%)

新增 hermes_rule_quality_job.py (~210 行):
  每日 04:00 Taipei 分析 alert_rule_catalog:
    - threshold: noise_rate >= 0.7 AND 樣本 >= 5
    - 為每條寫 aol('rule_rejected', proposed_action='review_or_deprecate')
    - 推 Telegram 摘要給 SRE group

統帥鐵律對齊:
   不自動改 review_status (人工決策 deprecate,AI 只推建議)
   threshold 作為「觸發討論」而非「最終決策」
   aol(rule_rejected) 留 trail,未來可升級 LLM 辯證

解鎖 E3 Hermes 基礎: 後續可加 LLM 分析假報真因 (expr 缺 for: window、
label match 太寬泛、metric 本身 noisy 等),產出具體改進建議.

Wire main.py lifespan asyncio.create_task()

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-19 18:11:14 +08:00
parent 691bdc6cc1
commit 6ab0ce9c75
2 changed files with 264 additions and 0 deletions

View File

@@ -0,0 +1,254 @@
"""
Hermes Rule Quality Advisor — ADR-090 § E3 AI 規則品質建議
==========================================================
每日 04:00 Taipei 分析 alert_rule_catalog,對 noise_rate >= 0.7 且樣本數 (tp + fp) >= 5 的 rule 推 Telegram
建議 + 寫 aol(rule_rejected) 稽核,人工決策是否 deprecate.
職責邊界:
✅ 讀 alert_rule_catalog WHERE noise_rate >= 0.7
✅ 為每條寫 aol(rule_rejected) + proposed_action='review_or_deprecate'
✅ 推 Telegram 通知 SRE group (格式化清單)
⏳ 不自動改 review_status (統帥鐵律: AI 不做最終決策)
⏳ TODO: LLM 分析每條 rule 的假報真因 (下一階段)
統帥鐵律對齊:
- 禁止寫死規則做最終決策 → 本 agent 只推建議,人工決策
- 朝 AI 自主化方向 → aol 留 trail,未來可升級為 LLM 判斷
- noise_rate threshold 0.7 是「觸發討論」而非「自動動作」
排程:
- 首次延遲 420s
- 每日 04:00 Taipei
2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
ADR-090 § E3 Hermes
"""
from __future__ import annotations
import asyncio
import json as _json
import time as _time
from datetime import datetime, timedelta, timezone
from typing import Any
import structlog
logger = structlog.get_logger(__name__)
# Seconds the loop sleeps after startup before its first analysis pass.
_FIRST_DELAY_SEC = 420
# Seconds the loop sleeps before retrying after it catches an exception.
_LOOP_BACKOFF_SEC = 1800
# Hour of day (UTC+8, Taipei) at which the daily analysis is scheduled.
_DAILY_TRIGGER_HOUR_TAIPEI = 4
# Noise-rate threshold that triggers a review discussion; the catalog query
# compares with `>=`, so a rule at exactly this value is included.
_NOISE_THRESHOLD = 0.7
# Minimum number of samples (true positives + false positives) required before
# an advisory is issued; avoids flagging a rule that has fired only once.
_MIN_SAMPLE_SIZE = 5
async def run_hermes_rule_quality_loop() -> None:
    """Run the rule-quality analysis forever on a daily schedule.

    Sleeps ``_FIRST_DELAY_SEC`` once after startup, then repeatedly calls
    ``analyze_once()``. After a pass that returns normally the loop sleeps
    until the next scheduled trigger; after a pass that raises it logs the
    exception and sleeps ``_LOOP_BACKOFF_SEC`` before trying again.

    The coroutine never returns on its own; it ends only when cancelled or
    when something outside the guarded call raises.
    """
    logger.info("hermes_rule_quality_loop_started")
    await asyncio.sleep(_FIRST_DELAY_SEC)

    while True:
        try:
            await analyze_once()
        except Exception as exc:
            # Failed pass: retry after the fixed backoff instead of waiting
            # for the next daily slot.
            logger.exception("hermes_rule_quality_loop_error", error=str(exc))
            pause = _LOOP_BACKOFF_SEC
        else:
            # Successful pass: wait for the next daily trigger.
            pause = _seconds_until_next_trigger()
            logger.info("hermes_rule_quality_next_tick", sleep_sec=pause)
        await asyncio.sleep(pause)
async def analyze_once() -> dict[str, int]:
    """Run one analysis pass: find noisy rules and publish advisories.

    Steps:
      1. Fetch the noisy rules from the catalog.
      2. Write one audit-log advisory per rule.
      3. If at least one rule was found, send a single Telegram summary.

    Any exception raised by those steps is caught and logged here, so this
    coroutine always returns the counters gathered so far.

    Returns:
        A dict with three integer counters:
        ``noisy_rules`` (rules found), ``advisories_written`` (audit rows
        successfully written) and ``telegram_sent`` (1 if the summary was
        sent, else 0).
    """
    # Monotonic clock: the elapsed time must not be skewed by wall-clock
    # adjustments (NTP steps, manual changes) during the pass.
    started = _time.monotonic()
    stats = {"noisy_rules": 0, "advisories_written": 0, "telegram_sent": 0}
    error_msg: str | None = None

    try:
        noisy = await _fetch_noisy_rules()
        stats["noisy_rules"] = len(noisy)

        for rule in noisy:
            if await _write_advisory_aol(rule):
                stats["advisories_written"] += 1

        if noisy:
            sent = await _send_telegram_summary(noisy)
            stats["telegram_sent"] = 1 if sent else 0
    except Exception as e:
        # Truncate so an oversized exception text cannot bloat the log line.
        error_msg = f"{type(e).__name__}: {e}"[:1000]
        logger.exception("hermes_analyze_once_failed", error=error_msg)

    duration_ms = int((_time.monotonic() - started) * 1000)
    logger.info(
        "hermes_rule_quality_once_done",
        noisy=stats["noisy_rules"],
        advisories=stats["advisories_written"],
        telegram_sent=stats["telegram_sent"],
        duration_ms=duration_ms,
        # None on a clean pass; lets the summary line distinguish
        # "nothing noisy" from "the pass failed part-way".
        error=error_msg,
    )
    return stats
# ============================================================================
# 資料查詢
# ============================================================================
async def _fetch_noisy_rules() -> list[dict[str, Any]]:
    """Fetch catalog rules whose noise rate and sample size warrant review.

    Selects rows from ``alert_rule_catalog`` where ``noise_rate`` is at least
    ``_NOISE_THRESHOLD``, the combined true/false positive count is at least
    ``_MIN_SAMPLE_SIZE``, and ``review_status`` is NULL or ``'approved'``.
    Rows are ordered by noise rate, then by sample count, both descending.

    Returns:
        One dict per rule with keys ``rule_id``, ``rule_name``, ``severity``,
        ``tp``, ``fp``, ``noise_rate``, ``last_fired_at`` and
        ``review_status``. Returns an empty list when the query fails; the
        failure is logged as a warning, so callers cannot tell a database
        error apart from "no noisy rules".
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        async with get_db_context() as db:
            # Plain (non-f) string on purpose: every value goes through bound
            # parameters, nothing is interpolated into the SQL text.
            result = await db.execute(
                _sql("""
                    SELECT
                        rule_id, rule_name, severity,
                        true_positive_count, false_positive_count, noise_rate,
                        last_fired_at, review_status
                    FROM alert_rule_catalog
                    WHERE noise_rate >= :thr
                      AND (true_positive_count + false_positive_count) >= :min_sample
                      AND (review_status IS NULL OR review_status = 'approved')
                    ORDER BY noise_rate DESC, (true_positive_count + false_positive_count) DESC
                """),
                {"thr": _NOISE_THRESHOLD, "min_sample": _MIN_SAMPLE_SIZE},
            )
            return [
                {
                    "rule_id": row.rule_id,
                    "rule_name": row.rule_name,
                    "severity": row.severity,
                    # Counts may be NULL in the catalog; treat NULL as zero.
                    "tp": int(row.true_positive_count or 0),
                    "fp": int(row.false_positive_count or 0),
                    "noise_rate": (
                        float(row.noise_rate) if row.noise_rate is not None else 0.0
                    ),
                    "last_fired_at": row.last_fired_at,
                    "review_status": row.review_status,
                }
                for row in result.fetchall()
            ]
    except Exception as e:
        # Best-effort job: a failed query must not crash the scheduler loop.
        logger.warning("fetch_noisy_rules_failed", error=str(e))
        return []
# ============================================================================
# 建議寫入 (aol only,不改 rule 本身)
# ============================================================================
async def _write_advisory_aol(rule: dict[str, Any]) -> bool:
    """Record an advisory for one noisy rule in ``automation_operation_log``.

    Inserts a ``rule_rejected`` row whose ``input`` holds the rule's
    statistics and whose ``output`` proposes ``review_or_deprecate`` with
    ``requires_human_decision`` set. The rule itself is never modified.

    Args:
        rule: A rule dict providing the keys ``rule_name``, ``severity``,
            ``noise_rate``, ``tp`` and ``fp``.

    Returns:
        True when the insert statement executed without raising, False on any
        failure (which is logged as a warning).
    """
    try:
        from sqlalchemy import text as _sql
        from src.db.base import get_db_context

        # Note: only rule_name identifies the rule in the audit row; rule_id
        # is not part of the payload.
        input_payload = {
            "rule_name": rule["rule_name"],
            "severity": rule["severity"],
            "noise_rate": rule["noise_rate"],
            "true_positive_count": rule["tp"],
            "false_positive_count": rule["fp"],
        }
        output_payload = {
            "proposed_action": "review_or_deprecate",
            "reason": (
                f"過去 30d noise_rate {rule['noise_rate']:.1%} "
                f"(tp={rule['tp']}, fp={rule['fp']}),"
                "假報過多應考慮 deprecate 或改進 expr"
            ),
            "requires_human_decision": True,
        }

        # NOTE(review): no explicit commit is issued here; presumably
        # get_db_context() commits on clean exit — confirm.
        async with get_db_context() as db:
            await db.execute(
                _sql("""
                    INSERT INTO automation_operation_log (
                        operation_type, actor, status,
                        input, output, tags
                    ) VALUES (
                        'rule_rejected',
                        'hermes_rule_quality',
                        'success',
                        CAST(:input AS jsonb),
                        CAST(:output AS jsonb),
                        :tags
                    )
                """),
                {
                    # ensure_ascii=False keeps the Chinese reason text readable
                    # in the stored JSON.
                    "input": _json.dumps(input_payload, ensure_ascii=False),
                    "output": _json.dumps(output_payload, ensure_ascii=False),
                    "tags": ["hermes", "rule_quality", "advisory"],
                },
            )
        return True
    except Exception as e:
        # .get(): if the failure was a missing "rule_name" key, indexing here
        # would raise again from inside the handler and break the bool contract.
        logger.warning(
            "write_advisory_aol_failed", rule=rule.get("rule_name"), error=str(e)
        )
        return False
# ============================================================================
# Telegram 推送
# ============================================================================
async def _send_telegram_summary(noisy: list[dict[str, Any]]) -> bool:
    """Send one HTML-formatted Telegram summary of the noisy rules.

    The message lists at most the first 10 rules (with a trailing count of
    the remainder) and is sent to ``settings.OPENCLAW_TG_CHAT_ID``.

    Args:
        noisy: Rule dicts providing ``rule_name``, ``noise_rate``, ``tp``,
            ``fp`` and ``severity``.

    Returns:
        True when the send call completed without raising; False when no chat
        id is configured or when anything fails (logged as a warning).
    """
    import html  # stdlib; imported once instead of on every loop iteration

    max_listed = 10  # cap the list so the message stays short

    try:
        from src.core.config import settings
        from src.services.telegram_gateway import get_telegram_gateway

        if not settings.OPENCLAW_TG_CHAT_ID:
            logger.info("hermes_telegram_skip_no_chat_id")
            return False

        lines = [
            "🔍 <b>Hermes 規則品質檢測</b>",
            f"檢測到 {len(noisy)} 條規則噪音率 ≥ {_NOISE_THRESHOLD:.0%},建議人工審查:",
            "",
        ]
        for r in noisy[:max_listed]:
            # Both values come from the database and are embedded in an HTML
            # parse-mode message, so both must be escaped.
            safe_name = html.escape(r["rule_name"])
            safe_severity = html.escape(str(r["severity"] or "-"))
            lines.append(
                f"🟡 <code>{safe_name}</code>\n"
                f" 噪音率 <b>{r['noise_rate']:.1%}</b> (tp={r['tp']} fp={r['fp']} sev={safe_severity})"
            )
        if len(noisy) > max_listed:
            lines.append(f"\n…還有 {len(noisy) - max_listed}")
        lines.append("\n人工決策: 確認 deprecate 或改 expr → 手動 UPDATE review_status")
        msg = "\n".join(lines)

        tg = get_telegram_gateway()
        # Uses the gateway's private _send_request to post a plain message.
        # NOTE(review): its return value is ignored, so an API-level rejection
        # that does not raise would still be reported as sent — confirm.
        await tg._send_request("sendMessage", {  # type: ignore[attr-defined]
            "chat_id": settings.OPENCLAW_TG_CHAT_ID,
            "text": msg,
            "parse_mode": "HTML",
            "disable_web_page_preview": True,
        })
        return True
    except Exception as e:
        # Best-effort notification: never let a Telegram failure propagate.
        logger.warning("hermes_telegram_send_failed", error=str(e))
        return False
# ============================================================================
# 時間
# ============================================================================
def _seconds_until_next_trigger() -> float:
tz_taipei = timezone(timedelta(hours=8))
now = datetime.now(tz_taipei)
today_trigger = now.replace(hour=_DAILY_TRIGGER_HOUR_TAIPEI, minute=0, second=0, microsecond=0)
if now >= today_trigger:
today_trigger = today_trigger + timedelta(days=1)
delta = (today_trigger - now).total_seconds()
return max(300.0, min(delta, 25 * 3600))

View File

@@ -440,6 +440,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
except Exception as e: except Exception as e:
logger.warning("asset_change_tracker_loop_schedule_failed", error=str(e)) logger.warning("asset_change_tracker_loop_schedule_failed", error=str(e))
# ADR-090 § Hermes Rule Quality Advisor (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei)
# 每日 04:00 Taipei 分析 alert_rule_catalog.noise_rate,對高噪音規則推 Telegram 建議
# 統帥鐵律: AI 只推建議不自動改 review_status,人工決策 deprecate
try:
from src.jobs.hermes_rule_quality_job import run_hermes_rule_quality_loop
asyncio.create_task(run_hermes_rule_quality_loop())
logger.info("hermes_rule_quality_loop_scheduled", daily_trigger_hour_taipei=4)
except Exception as e:
logger.warning("hermes_rule_quality_loop_schedule_failed", error=str(e))
# ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告 # ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告
# 2026-04-14 Claude Haiku 4.5 Asia/Taipei # 2026-04-14 Claude Haiku 4.5 Asia/Taipei
try: try: