From 5b9b36f30deb31429bbd2dc1b6093890d518c9d8 Mon Sep 17 00:00:00 2001
From: OG T
Date: Sun, 19 Apr 2026 16:08:34 +0800
Subject: [PATCH] fix(ci)+feat(aiops): cd.yaml shared network + rule_catalog_sync (ADR-090 E3)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

CI 修復 (c0f3509 第三次 fail 真因):
  c0f3509 log: 'Detected act task network: (none, will fall back to bridge)'
  → grep ACT_NET 在 CI 環境未 match → fallback bridge
  → default bridge 不支援 container name DNS → pg-test-b5 解析失敗

修復 (v3 — 主動創 shared network):
  - B5_NET=b5-test-net (idempotent docker network create)
  - ci-runner 自己 docker network connect $HOSTNAME
  - pg-test-b5 --network=$B5_NET
  - 兩邊同 user-defined network → container name DNS 正常

新增 rule_catalog_sync_job (ADR-090 § Phase 7 第 2 個 service):
  + apps/api/src/jobs/rule_catalog_sync_job.py (~330 行)
    - run_rule_catalog_sync_loop: 啟動延遲 90s,每 1h sync
    - sync_once: HTTP GET {PROMETHEUS_URL}/api/v1/rules (type=alert)
    - UPSERT alert_rule_catalog (rule_name 為 UNIQUE)
    - 只在實際 INSERT/UPDATE 發生時才寫 aol (避免 N 條 rule 污染)
  + main.py lifespan asyncio.create_task() wire

預期解鎖:
  - alert_rule_catalog: 從 0 → Prometheus active rules 數 (~68 條)
  - automation_operation_log: 新增 'rule_created' / 'rule_updated' op_type
  - E3 Hermes AI 終於有 baseline 可以提案規則修正

Refs: ADR-090 §4.2 E3, MASTER §3.3

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 .gitea/workflows/cd.yaml                   |  19 +-
 apps/api/src/jobs/rule_catalog_sync_job.py | 334 +++++++++++++++++++++
 apps/api/src/main.py                       |  11 +
 3 files changed, 357 insertions(+), 7 deletions(-)
 create mode 100644 apps/api/src/jobs/rule_catalog_sync_job.py

diff --git a/.gitea/workflows/cd.yaml b/.gitea/workflows/cd.yaml
index e71e3417..fc792ccf 100644
--- a/.gitea/workflows/cd.yaml
+++ b/.gitea/workflows/cd.yaml
@@ -158,15 +158,20 @@ jobs:
           if ! command -v psql &>/dev/null; then
             apt-get install -y -q postgresql-client
           fi
-          # 找 act runner 為本 task 創的 network (Gitea act 命名: GITEA-ACTIONS-TASK-XXX_*-network)
-          # 注意: act runner 用 'bash -e -o pipefail',grep 無 match 時 exit 1 會中斷整 step
-          # 必須 || echo "" 確保 grep 失敗時 ACT_NET 為空字串而非 abort
-          ACT_NET=$(docker network ls --format '{{.Name}}' 2>/dev/null | (grep -E '^GITEA-ACTIONS-TASK-[0-9]+_WORKFLOW-.*-network$' || echo "") | head -1)
-          echo "Detected act task network: ${ACT_NET:-(none, will fall back to bridge)}"
-          # 啟動測試 DB — 加入 act network,後續用 container name 'pg-test-b5' 連線
+          # 2026-04-19 ogt + Claude Opus 4.7 v3: create the shared network ourselves
+          # Previously the ACT_NET grep found no match in run c0f3509 → fell back to bridge → container-name DNS broke
+          # Root cause: the default bridge has no container-name DNS; a user-defined network is required
+          # Fix: create 'b5-test-net' (idempotent) and attach both ci-runner and pg-test-b5 to it
+          B5_NET="b5-test-net"
+          docker network create "$B5_NET" 2>/dev/null || true
+          # Attach the current ci-runner container (hostname == short container id) to this network
+          # Already attached → docker network connect exits 1; warn (do not abort) so real failures stay visible
+          docker network connect "$B5_NET" "$HOSTNAME" 2>/dev/null || echo "WARN: could not attach $HOSTNAME to $B5_NET (already attached, or runner is not a container)"
+          echo "B5 shared network: $B5_NET (ci-runner hostname: $HOSTNAME)"
+          # Start the test DB on the shared network; connect to it via container name 'pg-test-b5'
           docker rm -f pg-test-b5 2>/dev/null || true
           docker run -d --name pg-test-b5 \
-            ${ACT_NET:+--network=$ACT_NET} \
+            --network="$B5_NET" \
             -e POSTGRES_DB=awoooi_test \
             -e POSTGRES_USER=awoooi \
             -e POSTGRES_PASSWORD=awoooi_test_2026 \
diff --git a/apps/api/src/jobs/rule_catalog_sync_job.py b/apps/api/src/jobs/rule_catalog_sync_job.py
new file mode 100644
index 00000000..30655e93
--- /dev/null
+++ b/apps/api/src/jobs/rule_catalog_sync_job.py
@@ -0,0 +1,334 @@
+"""
+Alert Rule Catalog Sync Job — ADR-090 § E3 (unblocks Hermes rule creation)
+==========================================================================
+Every hour, pull the active rules from Prometheus /api/v1/rules and UPSERT them
+into alert_rule_catalog.
+
+Responsibilities:
+  ✅ Sync existing Prometheus rules into alert_rule_catalog (source='yaml_hardcoded')
+  ✅ Write automation_operation_log on insert/update (rule_created / rule_updated)
+  ✅ Report a run summary whenever the rule set changes
+  ❌ Does not generate new rules via AI (follow-up work for the Hermes agent)
+  ❌ Does not compute noise_rate (that belongs to the alert statistics job)
+
+Design rules:
+  - rule_name is the UNIQUE key (alert_rule_catalog_rule_name_key)
+  - Existing rule: UPDATE expr/labels/annotations + updated_at
+  - New rule: INSERT with review_status='approved' (existing yaml counts as approved)
+  - Prometheus API failure → log it, retry next cycle, never crash the main process
+
+Data source:
+  - Prometheus HTTP API: {PROMETHEUS_URL}/api/v1/rules (default 192.168.0.188:9090)
+  - Response shape: data.groups[].rules[] (only type='alerting' entries are alert
+    rules; recording rules are skipped)
+
+Schedule:
+  - 90s startup delay (wait for MCP/DB/Redis init and for Prometheus to be ready)
+  - Runs every 3600s (picks up changes after a Prometheus rule reload)
+
+2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
+ADR-090 monitoring blind-spot governance § Phase 7 Rule Catalog
+"""
+from __future__ import annotations
+
+import asyncio
+import json as _json
+import re as _re
+import time as _time
+from typing import Any
+
+import httpx
+import structlog
+
+from src.core.config import settings
+
+logger = structlog.get_logger(__name__)
+
+# ============================================================================
+# Scheduling parameters
+# ============================================================================
+_SYNC_INTERVAL_SEC = 3600   # once per hour
+_FIRST_DELAY_SEC = 90       # wait 90s after startup (lets Prometheus come up)
+_HTTP_TIMEOUT_SEC = 10
+_LOOP_BACKOFF_SEC = 300
+
+_PROM_RULES_ENDPOINT = "/api/v1/rules"
+
+# Seconds per Prometheus duration unit (Prometheus fixes d=24h, w=7d, y=365d).
+_DURATION_UNIT_SEC = {
+    "ms": 0.001,
+    "s": 1,
+    "m": 60,
+    "h": 3600,
+    "d": 86400,
+    "w": 604800,
+    "y": 31536000,
+}
+# One "<number><unit>" token; "ms" must be tried before the single-letter units.
+_DURATION_TOKEN_RE = _re.compile(r"(\d+(?:\.\d+)?)(ms|[smhdwy])")
+
+
+# ============================================================================
+# Public entry — called from the main.py lifespan
+# ============================================================================
+
+async def run_rule_catalog_sync_loop() -> None:
+    """
+    Run forever: sync Prometheus rules into alert_rule_catalog every
+    _SYNC_INTERVAL_SEC seconds, after an initial _FIRST_DELAY_SEC delay.
+
+    An exception escaping sync_once() is logged and the loop retries after
+    _LOOP_BACKOFF_SEC instead of terminating.
+    """
+    logger.info("rule_catalog_sync_loop_started", interval_sec=_SYNC_INTERVAL_SEC)
+    await asyncio.sleep(_FIRST_DELAY_SEC)
+
+    while True:
+        try:
+            await sync_once()
+        except Exception as e:
+            logger.exception("rule_catalog_sync_loop_error", error=str(e))
+            await asyncio.sleep(_LOOP_BACKOFF_SEC)
+            continue
+        await asyncio.sleep(_SYNC_INTERVAL_SEC)
+
+
+async def sync_once() -> dict[str, int]:
+    """
+    Run one Prometheus → alert_rule_catalog sync.
+
+    Returns:
+        {"total": N, "new": M, "updated": K, "unchanged": L, "failed": F}
+        If the Prometheus fetch fails every value is 0. Any failure (fetch or
+        per-rule upsert) is passed to _log_aol as the run's error.
+    """
+    # Monotonic clock: the measured duration is immune to wall-clock adjustments.
+    started = _time.monotonic()
+    stats = {"total": 0, "new": 0, "updated": 0, "unchanged": 0, "failed": 0}
+    error_msg: str | None = None
+
+    try:
+        rules = await _fetch_prometheus_rules()
+        stats["total"] = len(rules)
+        for r in rules:
+            outcome = await _upsert_rule(r)
+            # _upsert_rule only returns strings that are keys of stats.
+            stats[outcome] += 1
+    except Exception as e:
+        error_msg = f"{type(e).__name__}: {e}"[:1000]
+        logger.exception("rule_catalog_sync_once_failed", error=error_msg)
+
+    if error_msg is None and stats["failed"]:
+        # Surface partial failures instead of silently reporting success.
+        error_msg = f"{stats['failed']} of {stats['total']} rule upserts failed"
+
+    duration_ms = int((_time.monotonic() - started) * 1000)
+    await _log_aol(stats=stats, duration_ms=duration_ms, error=error_msg)
+
+    logger.info(
+        "rule_catalog_sync_once_done",
+        total=stats["total"],
+        new=stats["new"],
+        updated=stats["updated"],
+        unchanged=stats["unchanged"],
+        failed=stats["failed"],
+        duration_ms=duration_ms,
+    )
+    return stats
+
+
+# ============================================================================
+# Prometheus API fetch
+# ============================================================================
+
+async def _fetch_prometheus_rules() -> list[dict[str, Any]]:
+    """
+    GET {PROMETHEUS_URL}/api/v1/rules and return the alerting rules (recording rules excluded).
+
+    Returns: [{rule_name, expr, duration_seconds, severity, labels, annotations, group_name}, ...]
+
+    Raises:
+        httpx.HTTPError: transport failure, or a non-2xx status (raise_for_status).
+        RuntimeError: the response body's "status" field is not "success".
+    """
+    url = f"{settings.PROMETHEUS_URL.rstrip('/')}{_PROM_RULES_ENDPOINT}"
+    async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT_SEC, trust_env=False) as client:
+        resp = await client.get(url, params={"type": "alert"})
+        resp.raise_for_status()
+        data = resp.json()
+
+    if data.get("status") != "success":
+        raise RuntimeError(f"prometheus rules API non-success: {data.get('status')}")
+
+    groups = (data.get("data", {}) or {}).get("groups", []) or []
+    rules: list[dict[str, Any]] = []
+    for g in groups:
+        group_name = g.get("name") or ""
+        for r in g.get("rules", []) or []:
+            if r.get("type") != "alerting":
+                continue  # skip recording rules
+            rule_name = r.get("name") or ""
+            if not rule_name:
+                continue
+            labels = r.get("labels", {}) or {}
+            annotations = r.get("annotations", {}) or {}
+            rules.append({
+                "rule_name": rule_name,
+                "expr": r.get("query") or "",
+                "duration_seconds": _parse_duration(r.get("duration", 0)),
+                "severity": labels.get("severity") or "",
+                "labels": labels,
+                "annotations": annotations,
+                "group_name": group_name,
+            })
+    return rules
+
+
+def _parse_duration(d: Any) -> int:
+    """
+    Convert a Prometheus rule duration to whole seconds.
+
+    Accepts a number of seconds (int/float), a numeric string ("90"), or a
+    Prometheus duration string, including compound ones ("5m", "1h30m", "500ms").
+    Anything unparseable (None, bool, NaN/inf, malformed text) yields 0.
+    """
+    if isinstance(d, bool):  # bool is an int subclass; never a valid duration
+        return 0
+    if isinstance(d, (int, float)):
+        try:
+            return int(d)
+        except (ValueError, OverflowError):  # NaN / ±inf
+            return 0
+    if not isinstance(d, str):
+        return 0
+
+    text = d.strip()
+    try:
+        return int(float(text))  # bare number of seconds, e.g. "90" or "1.5"
+    except (ValueError, OverflowError):
+        pass
+
+    total = 0.0
+    pos = 0
+    for m in _DURATION_TOKEN_RE.finditer(text):
+        if m.start() != pos:  # unexpected characters between tokens
+            return 0
+        total += float(m.group(1)) * _DURATION_UNIT_SEC[m.group(2)]
+        pos = m.end()
+    # Reject empty input and trailing junk (the tokens must consume everything).
+    return int(total) if pos and pos == len(text) else 0
+
+
+# ============================================================================
+# DB writes
+# ============================================================================
+
+async def _upsert_rule(rule: dict[str, Any]) -> str:
+    """
+    UPSERT a single rule into alert_rule_catalog.
+
+    Returns: 'new' | 'updated' | 'unchanged' | 'failed'
+
+    rule_name is the UNIQUE key (alert_rule_catalog_rule_name_key). The
+    ON CONFLICT ... WHERE clause only updates when expr, duration_seconds,
+    severity, labels or annotations differ, so an identical rule is 'unchanged'.
+    A database error is logged and reported as 'failed' (never raised) so one
+    bad rule cannot abort the rest of the sync.
+    """
+    from sqlalchemy import text as _sql
+    from src.db.base import get_db_context
+
+    try:
+        async with get_db_context() as db:
+            row = await db.execute(
+                _sql("""
+                    INSERT INTO alert_rule_catalog (
+                        rule_name, source, expr, duration_seconds,
+                        severity, labels, annotations,
+                        created_by_agent, review_status,
+                        created_at, updated_at
+                    ) VALUES (
+                        :name, 'yaml_hardcoded', :expr, :dur,
+                        :sev, CAST(:labels AS jsonb), CAST(:ann AS jsonb),
+                        'prometheus_sync', 'approved',
+                        NOW(), NOW()
+                    )
+                    ON CONFLICT (rule_name) DO UPDATE
+                        SET expr = EXCLUDED.expr,
+                            duration_seconds = EXCLUDED.duration_seconds,
+                            severity = EXCLUDED.severity,
+                            labels = EXCLUDED.labels,
+                            annotations = EXCLUDED.annotations,
+                            updated_at = NOW()
+                        WHERE alert_rule_catalog.expr IS DISTINCT FROM EXCLUDED.expr
+                           OR alert_rule_catalog.duration_seconds IS DISTINCT FROM EXCLUDED.duration_seconds
+                           OR alert_rule_catalog.severity IS DISTINCT FROM EXCLUDED.severity
+                           OR alert_rule_catalog.labels IS DISTINCT FROM EXCLUDED.labels
+                           OR alert_rule_catalog.annotations IS DISTINCT FROM EXCLUDED.annotations
+                    RETURNING rule_id, (xmax = 0) AS inserted
+                """),
+                {
+                    "name": rule["rule_name"][:200],
+                    "expr": rule["expr"][:4000],
+                    "dur": rule["duration_seconds"],
+                    "sev": rule["severity"][:50],
+                    "labels": _json.dumps(rule["labels"], ensure_ascii=False),
+                    "ann": _json.dumps(rule["annotations"], ensure_ascii=False),
+                },
+            )
+            result = row.one_or_none()
+            if result is None:
+                # The DO UPDATE ... WHERE condition was false (nothing changed), so RETURNING yields no row.
+                return "unchanged"
+            _, inserted = result
+            return "new" if inserted else "updated"
+    except Exception as e:
+        logger.warning("rule_upsert_failed", rule_name=rule["rule_name"], error=str(e))
+        return "failed"
+
+
+async def _log_aol(stats: dict[str, int], duration_ms: int, error: str | None) -> None:
+    """
+    Write one summary row to automation_operation_log.
+
+    Only one row per sync is written (avoids N rows for N rules). operation_type is
+    'rule_created' when any rule was inserted, otherwise 'rule_updated'; a run
+    with no inserts, no updates and no error writes nothing. Write failures
+    are logged and swallowed.
+    """
+    if stats["new"] == 0 and stats["updated"] == 0 and not error:
+        # Everything unchanged — nothing worth recording.
+        return
+
+    op_type = "rule_created" if stats["new"] > 0 else "rule_updated"
+    aol_status = "failed" if error else "success"
+
+    try:
+        from sqlalchemy import text as _sql
+        from src.db.base import get_db_context
+
+        async with get_db_context() as db:
+            await db.execute(
+                _sql("""
+                    INSERT INTO automation_operation_log (
+                        operation_type, actor, status,
+                        input, output, duration_ms, error, tags
+                    ) VALUES (
+                        :op, 'rule_catalog_sync', :st,
+                        CAST(:input AS jsonb),
+                        CAST(:output AS jsonb),
+                        :dur, :err, :tags
+                    )
+                """),
+                {
+                    "op": op_type,
+                    "st": aol_status,
+                    "input": _json.dumps({"source": "prometheus_api"}, ensure_ascii=False),
+                    "output": _json.dumps(stats, ensure_ascii=False),
+                    "dur": duration_ms,
+                    "err": error[:2000] if error else None,
+                    "tags": ["rule_catalog", "prometheus_sync"],
+                },
+            )
+    except Exception as e:
+        logger.warning("rule_catalog_aol_write_failed", error=str(e))
diff --git a/apps/api/src/main.py b/apps/api/src/main.py
index f1f956d6..337c5854 100644
--- a/apps/api/src/main.py
+++ b/apps/api/src/main.py
@@ -380,6 +380,17 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
     except Exception as e:
         logger.warning("asset_scanner_loop_schedule_failed", error=str(e))
 
+    # ADR-090 § Rule Catalog Sync (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei)
+    # Every hour, pull active rules from Prometheus /api/v1/rules → UPSERT alert_rule_catalog
+    # Unblocks E3 Hermes rule creation: the AI needs alert_rule_catalog as a baseline to propose fixes
+    try:
+        from src.jobs.rule_catalog_sync_job import run_rule_catalog_sync_loop
+        # Keep a strong reference: the event loop only holds weak references to tasks.
+        _app.state.rule_catalog_sync_task = asyncio.create_task(run_rule_catalog_sync_loop())
+        logger.info("rule_catalog_sync_loop_scheduled", interval_sec=3600)
+    except Exception as e:
+        logger.warning("rule_catalog_sync_loop_schedule_failed", error=str(e))
+
     # ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告
     # 2026-04-14 Claude Haiku 4.5 Asia/Taipei
     try: