# ============================================================================
# Tunables
# ============================================================================

# Seconds to wait after process start before the first analysis run.
_FIRST_DELAY_SEC = 420
# Seconds to back off after an unexpected failure of a whole analysis run.
_LOOP_BACKOFF_SEC = 1800
# Hour of day (Asia/Taipei) at which the daily run is scheduled.
_DAILY_TRIGGER_HOUR_TAIPEI = 4

# Noise-rate threshold that opens a discussion; it never triggers an
# automatic change to the rule.
_NOISE_THRESHOLD = 0.7
# Minimum tp + fp sample size; avoids flagging a rule that fired only once.
_MIN_SAMPLE_SIZE = 5

# Fixed UTC+8 offset used for the daily schedule.
_TZ_TAIPEI = timezone(timedelta(hours=8))
# Clamp for the computed sleep: never spin faster than 5 minutes, never
# sleep longer than 25 hours.
_MIN_SLEEP_SEC = 300.0
_MAX_SLEEP_SEC = 25 * 3600
# Maximum number of rules listed in one Telegram message.
_MAX_RULES_IN_MESSAGE = 10


async def run_hermes_rule_quality_loop() -> None:
    """Run the rule-quality analysis forever on a daily schedule.

    Sleeps ``_FIRST_DELAY_SEC`` once, then alternates between
    :func:`analyze_once` and a sleep computed by
    :func:`_seconds_until_next_trigger`. If a run raises, the error is
    logged and the loop retries after ``_LOOP_BACKOFF_SEC`` instead of
    waiting for the next daily trigger.

    This coroutine never returns normally.
    """
    logger.info("hermes_rule_quality_loop_started")
    await asyncio.sleep(_FIRST_DELAY_SEC)

    while True:
        try:
            await analyze_once()
        except Exception as e:
            # Safety net: analyze_once() already swallows its own errors, so
            # this only fires on something truly unexpected.
            logger.exception("hermes_rule_quality_loop_error", error=str(e))
            await asyncio.sleep(_LOOP_BACKOFF_SEC)
            continue

        sleep_sec = _seconds_until_next_trigger()
        logger.info("hermes_rule_quality_next_tick", sleep_sec=sleep_sec)
        await asyncio.sleep(sleep_sec)


async def analyze_once() -> dict[str, int]:
    """Run one analysis pass: find noisy rules, record advisories, notify.

    Returns:
        A dict with three counters:
        ``noisy_rules`` (rules returned by the query),
        ``advisories_written`` (audit rows successfully inserted) and
        ``telegram_sent`` (1 if the summary message was sent, else 0).

    Any exception is logged and swallowed; the counters gathered so far are
    still returned.
    """
    stats = {"noisy_rules": 0, "advisories_written": 0, "telegram_sent": 0}
    error_msg: str | None = None
    # Monotonic clock: the duration must not be affected by wall-clock jumps.
    started = _time.monotonic()

    try:
        noisy = await _fetch_noisy_rules()
        stats["noisy_rules"] = len(noisy)

        for rule in noisy:
            if await _write_advisory_aol(rule):
                stats["advisories_written"] += 1

        if noisy:
            sent = await _send_telegram_summary(noisy)
            stats["telegram_sent"] = 1 if sent else 0

    except Exception as e:
        error_msg = f"{type(e).__name__}: {e}"[:1000]
        logger.exception("hermes_analyze_once_failed", error=error_msg)

    duration_ms = int((_time.monotonic() - started) * 1000)
    logger.info(
        "hermes_rule_quality_once_done",
        noisy=stats["noisy_rules"],
        advisories=stats["advisories_written"],
        telegram_sent=stats["telegram_sent"],
        duration_ms=duration_ms,
        # None on success; lets a failed pass be told apart from a quiet one.
        error=error_msg,
    )
    return stats


# ============================================================================
# Data access
# ============================================================================

async def _fetch_noisy_rules() -> list[dict[str, Any]]:
    """Fetch rules whose noise rate and sample size reach the thresholds.

    Selects rows from ``alert_rule_catalog`` with
    ``noise_rate >= _NOISE_THRESHOLD``, at least ``_MIN_SAMPLE_SIZE``
    combined true/false positives, and a ``review_status`` that is NULL or
    ``'approved'``; noisiest rules first.

    Returns:
        One dict per rule with keys ``rule_id``, ``rule_name``, ``severity``,
        ``tp``, ``fp``, ``noise_rate``, ``last_fired_at`` and
        ``review_status``. Returns an empty list if the query fails (the
        failure is logged as a warning).
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        async with get_db_context() as db:
            # Plain (non-f) string: all values go through bind parameters.
            result = await db.execute(
                _sql("""
                    SELECT
                        rule_id, rule_name, severity,
                        true_positive_count, false_positive_count, noise_rate,
                        last_fired_at, review_status
                    FROM alert_rule_catalog
                    WHERE noise_rate >= :thr
                      AND (true_positive_count + false_positive_count) >= :min_sample
                      AND (review_status IS NULL OR review_status = 'approved')
                    ORDER BY noise_rate DESC, (true_positive_count + false_positive_count) DESC
                """),
                {"thr": _NOISE_THRESHOLD, "min_sample": _MIN_SAMPLE_SIZE},
            )
            return [
                {
                    "rule_id": row.rule_id,
                    "rule_name": row.rule_name,
                    "severity": row.severity,
                    "tp": int(row.true_positive_count or 0),
                    "fp": int(row.false_positive_count or 0),
                    "noise_rate": (
                        float(row.noise_rate) if row.noise_rate is not None else 0.0
                    ),
                    "last_fired_at": row.last_fired_at,
                    "review_status": row.review_status,
                }
                for row in result.fetchall()
            ]
    except Exception as e:
        logger.warning("fetch_noisy_rules_failed", error=str(e))
        return []


# ============================================================================
# Advisory write (audit log only; the rule itself is never modified)
# ============================================================================

async def _write_advisory_aol(rule: dict[str, Any]) -> bool:
    """Insert one ``rule_rejected`` advisory row into the audit log.

    The row records the rule's statistics as ``input`` and a
    ``review_or_deprecate`` proposal flagged ``requires_human_decision`` as
    ``output``. The rule's ``review_status`` is not touched.

    Args:
        rule: A dict as produced by :func:`_fetch_noisy_rules`.

    Returns:
        True if the INSERT was executed without raising, False otherwise
        (the failure is logged as a warning).
    """
    try:
        from sqlalchemy import text as _sql
        from src.db.base import get_db_context

        # Carry the catalog key so the audit row can be tied back to exactly
        # one rule. Non-JSON-native ids (e.g. UUID) are stringified so that
        # json.dumps cannot fail on them.
        rule_id = rule.get("rule_id")
        if rule_id is not None and not isinstance(rule_id, (int, str)):
            rule_id = str(rule_id)

        input_payload = {
            "rule_id": rule_id,
            "rule_name": rule["rule_name"],
            "severity": rule["severity"],
            "noise_rate": rule["noise_rate"],
            "true_positive_count": rule["tp"],
            "false_positive_count": rule["fp"],
        }
        output_payload = {
            "proposed_action": "review_or_deprecate",
            # NOTE(review): the text claims a 30-day window, but nothing in
            # this module establishes the window behind noise_rate; confirm.
            "reason": (
                f"過去 30d noise_rate {rule['noise_rate']:.1%} "
                f"(tp={rule['tp']}, fp={rule['fp']}),"
                f"假報過多應考慮 deprecate 或改進 expr"
            ),
            "requires_human_decision": True,
        }

        # NOTE(review): no explicit commit here; presumably get_db_context()
        # commits on clean exit. Verify against its implementation.
        async with get_db_context() as db:
            await db.execute(
                _sql("""
                    INSERT INTO automation_operation_log (
                        operation_type, actor, status,
                        input, output, tags
                    ) VALUES (
                        'rule_rejected',
                        'hermes_rule_quality',
                        'success',
                        CAST(:input AS jsonb),
                        CAST(:output AS jsonb),
                        :tags
                    )
                """),
                {
                    "input": _json.dumps(input_payload, ensure_ascii=False),
                    "output": _json.dumps(output_payload, ensure_ascii=False),
                    "tags": ["hermes", "rule_quality", "advisory"],
                },
            )
        return True
    except Exception as e:
        logger.warning(
            "write_advisory_aol_failed",
            rule=rule.get("rule_name"),
            error=str(e),
        )
        return False


# ============================================================================
# Telegram notification
# ============================================================================

def _format_telegram_summary(noisy: list[dict[str, Any]]) -> str:
    """Build the HTML-mode Telegram text summarising the noisy rules.

    Lists at most ``_MAX_RULES_IN_MESSAGE`` rules and appends a "more" line
    when the list is longer. Every value that originates from the database
    and is a string (rule name, severity) is HTML-escaped, because the
    message is sent with ``parse_mode="HTML"`` and a stray ``<`` or ``&``
    would otherwise make Telegram reject the whole message.
    """
    import html

    lines = [
        "🔍 Hermes 規則品質檢測",
        f"檢測到 {len(noisy)} 條規則噪音率 ≥ {_NOISE_THRESHOLD:.0%},建議人工審查:",
        "",
    ]
    for r in noisy[:_MAX_RULES_IN_MESSAGE]:
        # A missing name must not abort the whole notification.
        safe_name = html.escape(str(r.get("rule_name") or "-"))
        safe_sev = html.escape(str(r.get("severity") or "-"))
        lines.append(
            f"🟡 {safe_name}\n"
            f" 噪音率 {r['noise_rate']:.1%} (tp={r['tp']} fp={r['fp']} sev={safe_sev})"
        )
    if len(noisy) > _MAX_RULES_IN_MESSAGE:
        lines.append(f"\n…還有 {len(noisy) - _MAX_RULES_IN_MESSAGE} 條")
    lines.append("\n人工決策: 確認 deprecate 或改 expr → 手動 UPDATE review_status")
    return "\n".join(lines)


async def _send_telegram_summary(noisy: list[dict[str, Any]]) -> bool:
    """Send the noisy-rule summary to the configured Telegram chat.

    Args:
        noisy: Rule dicts as produced by :func:`_fetch_noisy_rules`.

    Returns:
        True if the send call completed without raising; False if no chat id
        is configured (``settings.OPENCLAW_TG_CHAT_ID`` is falsy) or if
        anything raised (logged as a warning).
    """
    try:
        from src.core.config import settings
        from src.services.telegram_gateway import get_telegram_gateway

        if not settings.OPENCLAW_TG_CHAT_ID:
            logger.info("hermes_telegram_skip_no_chat_id")
            return False

        msg = _format_telegram_summary(noisy)

        tg = get_telegram_gateway()
        # Uses the gateway's private _send_request to post a plain message.
        # NOTE(review): its return value is ignored; confirm it raises on an
        # API-level failure, otherwise this reports success too eagerly.
        await tg._send_request("sendMessage", {  # type: ignore[attr-defined]
            "chat_id": settings.OPENCLAW_TG_CHAT_ID,
            "text": msg,
            "parse_mode": "HTML",
            "disable_web_page_preview": True,
        })
        return True
    except Exception as e:
        logger.warning("hermes_telegram_send_failed", error=str(e))
        return False


# ============================================================================
# Scheduling
# ============================================================================

def _seconds_until_next_trigger(now: datetime | None = None) -> float:
    """Return how long to sleep until the next daily trigger.

    The trigger is ``_DAILY_TRIGGER_HOUR_TAIPEI``:00 in UTC+8. If that time
    has already been reached today, tomorrow's trigger is used. The result
    is clamped to ``[_MIN_SLEEP_SEC, _MAX_SLEEP_SEC]``.

    Args:
        now: Timezone-aware reference instant; defaults to the current time.
            Exposed so the schedule arithmetic can be tested deterministically.

    Returns:
        Seconds to sleep, as a float.
    """
    current = datetime.now(_TZ_TAIPEI) if now is None else now.astimezone(_TZ_TAIPEI)
    trigger = current.replace(
        hour=_DAILY_TRIGGER_HOUR_TAIPEI, minute=0, second=0, microsecond=0
    )
    if current >= trigger:
        trigger += timedelta(days=1)
    delta = (trigger - current).total_seconds()
    return max(_MIN_SLEEP_SEC, min(delta, _MAX_SLEEP_SEC))
Claude Opus 4.7 Asia/Taipei) + # 每日 04:00 Taipei 分析 alert_rule_catalog.noise_rate,對高噪音規則推 Telegram 建議 + # 統帥鐵律: AI 只推建議不自動改 review_status,人工決策 deprecate + try: + from src.jobs.hermes_rule_quality_job import run_hermes_rule_quality_loop + asyncio.create_task(run_hermes_rule_quality_loop()) + logger.info("hermes_rule_quality_loop_scheduled", daily_trigger_hour_taipei=4) + except Exception as e: + logger.warning("hermes_rule_quality_loop_schedule_failed", error=str(e)) + # ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告 # 2026-04-14 Claude Haiku 4.5 Asia/Taipei try: