feat(aiops): rule_stats_updater — 計算 noise_rate + true/false positive
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 8m26s

Review 盲點 5: alert_rule_catalog 68 筆但 noise_rate/TP/FP/last_fired_at 全 NULL

新增 rule_stats_updater_job.py (~170 行):
  每 1h UPDATE 全表 alert_rule_catalog,從 incidents + approval_records 推算:
    - last_fired_at = max(incidents.created_at WHERE alertname=rule_name)
    - true_positive_count = count incidents.status='RESOLVED' past 30d
    - false_positive_count = count approval_records.status='EXPIRED' past 30d
      (EXPIRED = 48h 無人處理,視為假警報 proxy)
    - noise_rate = fp / (tp + fp)

窗口: 30 天 (目前為模組常數 _WINDOW_DAYS 固定值,未來可配置化)
使用單一 UPDATE + subquery,避免 N+1 (68 rule × 3 query = 204 queries → 1 query)

解鎖 E3 Hermes:
  後續 Hermes AI agent 讀 alert_rule_catalog WHERE noise_rate > 0.5
  提案 review_status='deprecated' 或 superseded_by_rule_id

Wire main.py lifespan asyncio.create_task()

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-19 17:05:30 +08:00
parent 505232336b
commit df71c9a37b
2 changed files with 190 additions and 0 deletions

View File

@@ -0,0 +1,180 @@
"""
Rule Stats Updater Job — ADR-090 § Rule Quality Analysis
=========================================================
每 1h 計算 alert_rule_catalog 的運作統計,解鎖 E3 Hermes 規則品質分析.
更新欄位:
- last_fired_at = max(incidents.created_at WHERE alertname=rule_name)
- true_positive_count = count incidents.status='RESOLVED' past 30d
- false_positive_count = count approval_records.status='EXPIRED' past 30d
(EXPIRED = 48h 無人處理,視為假警報 proxy)
- noise_rate = fp / (tp + fp) 若有資料,否則 NULL
職責邊界 (MVP):
✅ 從 incidents.alertname + approval_records.incident_id JOIN 計算
✅ UPDATE alert_rule_catalog 統計欄位
⏳ TODO: Hermes AI agent 基於 noise_rate > 0.5 提案 deprecate 規則 (下一階段)
設計鐵律:
- 只 UPDATE,不 INSERT (依賴 rule_catalog_sync 先建 rule)
- 計算窗口固定 30d (可未來配置化)
- 失敗 → log warning,下次重試
排程:
- 首次延遲 240s (rule_catalog_sync 先跑完)
- 每 1h
2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
ADR-090 § Rule Quality
"""
from __future__ import annotations
import asyncio
import json as _json
import time as _time
import structlog
logger = structlog.get_logger(__name__)
# Seconds the background loop sleeps after a pass that did not raise (1 hour).
_UPDATE_INTERVAL_SEC = 3600
# Seconds the background loop waits once at startup before the first pass
# (per the module docstring: gives rule_catalog_sync time to finish first).
_FIRST_DELAY_SEC = 240
# Seconds the background loop sleeps after catching an exception from a pass.
_LOOP_BACKOFF_SEC = 600
# Look-back window, in days, used by the stats SQL and recorded in the
# operation-log input payload.
_WINDOW_DAYS = 30
async def run_rule_stats_updater_loop() -> None:
    """Run the alert_rule_catalog statistics refresh as an endless loop.

    Sleeps ``_FIRST_DELAY_SEC`` once, then repeatedly awaits ``update_once()``.
    After a pass that returns normally the loop sleeps
    ``_UPDATE_INTERVAL_SEC``; if the pass raises, the exception is logged and
    the loop sleeps ``_LOOP_BACKOFF_SEC`` instead. The coroutine never returns
    on its own.
    """
    logger.info("rule_stats_updater_loop_started", interval_sec=_UPDATE_INTERVAL_SEC)
    await asyncio.sleep(_FIRST_DELAY_SEC)

    while True:
        # Default to the regular cadence; shortened only when the pass raises.
        pause_sec = _UPDATE_INTERVAL_SEC
        try:
            await update_once()
        except Exception as exc:
            logger.exception("rule_stats_updater_loop_error", error=str(exc))
            pause_sec = _LOOP_BACKOFF_SEC
        await asyncio.sleep(pause_sec)
async def update_once() -> dict[str, int]:
    """Run a single statistics refresh over the whole alert_rule_catalog table.

    Any exception raised by ``_do_update()`` is caught and logged here rather
    than propagated; in that case the zeroed default stats are returned and
    the error text is forwarded to the operation log.

    Returns:
        A dict with the keys ``rules_updated``, ``rules_with_fires`` and
        ``rules_noisy``.
    """
    # Monotonic clock: elapsed time must not be skewed by wall-clock
    # adjustments (NTP steps, manual changes). The value is in seconds.
    started = _time.monotonic()
    stats = {"rules_updated": 0, "rules_with_fires": 0, "rules_noisy": 0}
    error_msg: str | None = None
    try:
        stats = await _do_update()
    except Exception as e:
        # NOTE(review): swallowing the failure here means the caller loop's
        # exception/backoff branch is not triggered by update failures; the
        # next attempt happens after the regular interval. Confirm intended.
        error_msg = f"{type(e).__name__}: {e}"[:1000]
        logger.exception("rule_stats_update_once_failed", error=error_msg)
    duration_ms = int((_time.monotonic() - started) * 1000)
    await _log_aol(stats, duration_ms, error_msg)
    logger.info(
        "rule_stats_update_once_done",
        rules_updated=stats.get("rules_updated", 0),
        rules_with_fires=stats.get("rules_with_fires", 0),
        rules_noisy=stats.get("rules_noisy", 0),
        duration_ms=duration_ms,
    )
    return stats
async def _do_update(window_days: int = _WINDOW_DAYS) -> dict[str, int]:
    """Refresh the statistics columns of every alert_rule_catalog row in one UPDATE.

    For each catalog row (matched on ``rule_name`` = ``incidents.alertname``):

    - ``true_positive_count``: incidents with status ``'RESOLVED'`` created
      within the window.
    - ``false_positive_count``: approval_records with status ``'EXPIRED'``
      created within the window, joined to incidents on ``incident_id``.
    - ``last_fired_at``: latest ``incidents.created_at`` for the rule (not
      limited to the window; NULL when the rule has no incidents).
    - ``noise_rate``: ``fp / (tp + fp)``, or NULL when ``tp + fp`` is zero.

    Args:
        window_days: Look-back window in days. Bound as a query parameter
            rather than interpolated into the SQL text.

    Returns:
        A dict with ``rules_updated`` (row count reported by the UPDATE),
        ``rules_with_fires`` (rows whose ``last_fired_at`` lies within the
        window) and ``rules_noisy`` (rows with ``noise_rate`` > 0.5).
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    params = {"window_days": int(window_days)}

    # NOTE(review): no explicit commit here — assumes get_db_context() commits
    # on clean exit; confirm against src.db.base.
    async with get_db_context() as db:
        # Single statement for the whole table: the per-rule counts are
        # correlated scalar subqueries evaluated inside the derived table
        # ``s``, so no per-rule round trips are issued from Python.
        result = await db.execute(
            _sql("""
                UPDATE alert_rule_catalog arc
                SET true_positive_count = COALESCE(s.tp_count, 0),
                    false_positive_count = COALESCE(s.fp_count, 0),
                    last_fired_at = s.last_fired,
                    noise_rate = CASE
                        WHEN COALESCE(s.tp_count, 0) + COALESCE(s.fp_count, 0) > 0
                        THEN COALESCE(s.fp_count, 0)::numeric
                             / (COALESCE(s.tp_count, 0) + COALESCE(s.fp_count, 0))::numeric
                        ELSE NULL
                    END,
                    updated_at = NOW()
                FROM (
                    SELECT
                        arc_inner.rule_name,
                        (SELECT count(*) FROM incidents i
                          WHERE i.alertname = arc_inner.rule_name
                            AND i.status = 'RESOLVED'
                            AND i.created_at > NOW() - make_interval(days => :window_days)
                        ) AS tp_count,
                        (SELECT count(*) FROM approval_records ar
                           JOIN incidents i2 ON ar.incident_id = i2.incident_id
                          WHERE i2.alertname = arc_inner.rule_name
                            AND ar.status = 'EXPIRED'
                            AND ar.created_at > NOW() - make_interval(days => :window_days)
                        ) AS fp_count,
                        (SELECT max(created_at) FROM incidents i3
                          WHERE i3.alertname = arc_inner.rule_name
                        ) AS last_fired
                    FROM alert_rule_catalog arc_inner
                ) s
                WHERE arc.rule_name = s.rule_name
            """),
            params,
        )
        rules_updated = result.rowcount or 0

        # Follow-up read of the freshly written columns for the summary.
        row = await db.execute(
            _sql("""
                SELECT
                    count(*) FILTER (
                        WHERE last_fired_at > NOW() - make_interval(days => :window_days)
                    ) AS with_fires,
                    count(*) FILTER (
                        WHERE noise_rate IS NOT NULL AND noise_rate > 0.5
                    ) AS noisy
                FROM alert_rule_catalog
            """),
            params,
        )
        r = row.one()
        return {
            "rules_updated": rules_updated,
            "rules_with_fires": int(r.with_fires or 0),
            "rules_noisy": int(r.noisy or 0),
        }
async def _log_aol(stats: dict[str, int], duration_ms: int, error: str | None) -> None:
    """Write one row describing this pass to automation_operation_log.

    Best-effort: any exception (including import or DB failures) is logged as
    a warning and not propagated.

    Args:
        stats: Counters produced by the pass; stored as the JSON ``output``.
        duration_ms: Elapsed time of the pass in milliseconds.
        error: Error text when the pass failed, otherwise ``None``/empty; a
            truthy value marks the row as ``failed``.
    """
    try:
        from sqlalchemy import text as _sql
        from src.db.base import get_db_context

        outcome = "success"
        if error:
            outcome = "failed"

        async with get_db_context() as session:
            insert_stmt = _sql("""
                INSERT INTO automation_operation_log (
                    operation_type, actor, status,
                    input, output, duration_ms, error, tags
                ) VALUES (
                    'rule_updated',
                    'rule_stats_updater',
                    :st,
                    CAST(:input AS jsonb),
                    CAST(:output AS jsonb),
                    :dur, :err, :tags
                )
            """)
            bind_values = {
                "st": outcome,
                "input": _json.dumps({"window_days": _WINDOW_DAYS}, ensure_ascii=False),
                "output": _json.dumps(stats, ensure_ascii=False),
                "dur": duration_ms,
                # Empty or missing error text is stored as NULL.
                "err": error[:2000] if error else None,
                "tags": ["rule_catalog", "stats", "quality"],
            }
            await session.execute(insert_stmt, bind_values)
    except Exception as exc:
        logger.warning("rule_stats_aol_write_failed", error=str(exc))

View File

@@ -420,6 +420,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
except Exception as e:
logger.warning("coverage_evaluator_loop_schedule_failed", error=str(e))
# ADR-090 § Rule Stats Updater (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei)
# 每 1h 從 incidents + approval_records 計算 rule 統計
# 解鎖 E3 Hermes: noise_rate > 0.5 的 rule 可被 AI 提案 deprecate
try:
from src.jobs.rule_stats_updater_job import run_rule_stats_updater_loop
asyncio.create_task(run_rule_stats_updater_loop())
logger.info("rule_stats_updater_loop_scheduled", interval_sec=3600)
except Exception as e:
logger.warning("rule_stats_updater_loop_schedule_failed", error=str(e))
# ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告
# 2026-04-14 Claude Haiku 4.5 Asia/Taipei
try: