fix(ci)+feat(aiops): cd.yaml shared network + rule_catalog_sync (ADR-090 E3)
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 14m31s
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 14m31s
CI 修復 (c0f3509第三次 fail 真因):c0f3509log: 'Detected act task network: (none, will fall back to bridge)' → grep ACT_NET 在 CI 環境未 match → fallback bridge → default bridge 不支援 container name DNS → pg-test-b5 解析失敗 修復 (v3 — 主動創 shared network): - B5_NET=b5-test-net (idempotent docker network create) - ci-runner 自己 docker network connect $HOSTNAME - pg-test-b5 --network=$B5_NET - 兩邊同 user-defined network → container name DNS 正常 新增 rule_catalog_sync_job (ADR-090 § Phase 7 第 2 個 service): + apps/api/src/jobs/rule_catalog_sync_job.py (~230 行) - run_rule_catalog_sync_loop: 啟動延遲 90s,每 1h sync - sync_once: HTTP GET {PROMETHEUS_URL}/api/v1/rules (type=alert) - UPSERT alert_rule_catalog (rule_name 為 UNIQUE) - 只在實際 INSERT/UPDATE 發生時才寫 aol (避免 N 條 rule 污染) + main.py lifespan asyncio.create_task() wire 預期解鎖: - alert_rule_catalog: 從 0 → Prometheus active rules 數 (~68 條) - automation_operation_log: 新增 'rule_created' / 'rule_updated' op_type - E3 Hermes AI 終於有 baseline 可以提案規則修正 Refs: ADR-090 §4.2 E3, MASTER §3.3 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -158,15 +158,20 @@ jobs:
|
||||
if ! command -v psql &>/dev/null; then
|
||||
apt-get install -y -q postgresql-client
|
||||
fi
|
||||
# 找 act runner 為本 task 創的 network (Gitea act 命名: GITEA-ACTIONS-TASK-XXX_*-network)
|
||||
# 注意: act runner 用 'bash -e -o pipefail',grep 無 match 時 exit 1 會中斷整 step
|
||||
# 必須 || echo "" 確保 grep 失敗時 ACT_NET 為空字串而非 abort
|
||||
ACT_NET=$(docker network ls --format '{{.Name}}' 2>/dev/null | (grep -E '^GITEA-ACTIONS-TASK-[0-9]+_WORKFLOW-.*-network$' || echo "") | head -1)
|
||||
echo "Detected act task network: ${ACT_NET:-(none, will fall back to bridge)}"
|
||||
# 啟動測試 DB — 加入 act network,後續用 container name 'pg-test-b5' 連線
|
||||
# 2026-04-19 ogt + Claude Opus 4.7 v3: 主動創 shared network
|
||||
# 之前 grep ACT_NET 在 c0f3509 run 沒 match → fallback bridge → container name DNS 失效
|
||||
# 真因: default bridge 不支援 container name DNS,必須 user-defined network
|
||||
# 修法: 主動建 'b5-test-net' (idempotent),ci-runner + pg-test-b5 都加入
|
||||
B5_NET="b5-test-net"
|
||||
docker network create "$B5_NET" 2>/dev/null || true
|
||||
# 當前 ci-runner container (hostname == short container id) 連上此 network
|
||||
# 若已連 → docker network connect 回 error 1,用 || true 吞掉
|
||||
docker network connect "$B5_NET" "$HOSTNAME" 2>/dev/null || true
|
||||
echo "B5 shared network: $B5_NET (ci-runner hostname: $HOSTNAME)"
|
||||
# 啟動測試 DB 於 shared network,用 container name 'pg-test-b5' 連線
|
||||
docker rm -f pg-test-b5 2>/dev/null || true
|
||||
docker run -d --name pg-test-b5 \
|
||||
${ACT_NET:+--network=$ACT_NET} \
|
||||
--network="$B5_NET" \
|
||||
-e POSTGRES_DB=awoooi_test \
|
||||
-e POSTGRES_USER=awoooi \
|
||||
-e POSTGRES_PASSWORD=awoooi_test_2026 \
|
||||
|
||||
285
apps/api/src/jobs/rule_catalog_sync_job.py
Normal file
285
apps/api/src/jobs/rule_catalog_sync_job.py
Normal file
@@ -0,0 +1,285 @@
|
||||
"""
|
||||
Alert Rule Catalog Sync Job — ADR-090 § E3 Hermes 自動建規則解鎖
|
||||
=================================================================
|
||||
每 1 小時從 Prometheus /api/v1/rules 拉 active rules → UPSERT alert_rule_catalog.
|
||||
|
||||
職責邊界:
|
||||
✅ 同步 Prometheus 現有規則到 alert_rule_catalog (source='yaml_hardcoded')
|
||||
✅ 新增/更新時寫 automation_operation_log(rule_created / rule_updated)
|
||||
✅ 規則數變化時回寫 run summary
|
||||
❌ 不負責 AI 產生新規則 (那是 Hermes agent 的後續工作)
|
||||
❌ 不負責 noise_rate 計算 (那是告警統計 job)
|
||||
|
||||
設計鐵律:
|
||||
- 用 rule_name 為 UNIQUE 鍵 (alert_rule_catalog_rule_name_key)
|
||||
- 已存在的 rule: UPDATE expr/labels/annotations + updated_at
|
||||
- 新的 rule: INSERT review_status='approved' (既存 yaml 視為已批准)
|
||||
- Prometheus API 失敗 → log warning,下次重試,不 crash 主程序
|
||||
|
||||
資料來源:
|
||||
- Prometheus HTTP API: {PROMETHEUS_URL}/api/v1/rules (default 192.168.0.188:9090)
|
||||
- 回應結構: data.groups[].rules[] (type='alerting' 才是告警規則,recording rules 跳過)
|
||||
|
||||
排程:
|
||||
- 啟動延遲 90s (等 MCP/DB/Redis init + Prometheus 就緒)
|
||||
- 每 3600s 跑一次 (Prometheus rule reload 後自動同步)
|
||||
|
||||
2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
|
||||
ADR-090 監控盲區治理 § Phase 7 Rule Catalog
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json as _json
|
||||
import time as _time
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import structlog
|
||||
|
||||
from src.core.config import settings
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
# ============================================================================
|
||||
# 排程參數
|
||||
# ============================================================================
|
||||
_SYNC_INTERVAL_SEC = 3600 # 每 1 小時
|
||||
_FIRST_DELAY_SEC = 90 # 啟動後等 90s (等 Prometheus 就緒)
|
||||
_HTTP_TIMEOUT_SEC = 10
|
||||
_LOOP_BACKOFF_SEC = 300
|
||||
|
||||
_PROM_RULES_ENDPOINT = "/api/v1/rules"
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Public entry — main.py lifespan 呼叫
|
||||
# ============================================================================
|
||||
|
||||
async def run_rule_catalog_sync_loop() -> None:
|
||||
"""
|
||||
永久迴圈: 每 _SYNC_INTERVAL_SEC 秒從 Prometheus 同步規則到 alert_rule_catalog.
|
||||
"""
|
||||
logger.info("rule_catalog_sync_loop_started", interval_sec=_SYNC_INTERVAL_SEC)
|
||||
await asyncio.sleep(_FIRST_DELAY_SEC)
|
||||
|
||||
while True:
|
||||
try:
|
||||
await sync_once()
|
||||
except Exception as e:
|
||||
logger.exception("rule_catalog_sync_loop_error", error=str(e))
|
||||
await asyncio.sleep(_LOOP_BACKOFF_SEC)
|
||||
continue
|
||||
await asyncio.sleep(_SYNC_INTERVAL_SEC)
|
||||
|
||||
|
||||
async def sync_once() -> dict[str, int]:
|
||||
"""
|
||||
執行一次 Prometheus → alert_rule_catalog 同步.
|
||||
|
||||
Returns:
|
||||
{"total": N, "new": M, "updated": K, "unchanged": L}
|
||||
失敗時所有值為 0,error 會寫 aol.
|
||||
"""
|
||||
started_ms = _time.time()
|
||||
stats = {"total": 0, "new": 0, "updated": 0, "unchanged": 0}
|
||||
error_msg: str | None = None
|
||||
|
||||
try:
|
||||
rules = await _fetch_prometheus_rules()
|
||||
stats["total"] = len(rules)
|
||||
for r in rules:
|
||||
result = await _upsert_rule(r)
|
||||
if result == "new":
|
||||
stats["new"] += 1
|
||||
elif result == "updated":
|
||||
stats["updated"] += 1
|
||||
else:
|
||||
stats["unchanged"] += 1
|
||||
except Exception as e:
|
||||
error_msg = f"{type(e).__name__}: {e}"[:1000]
|
||||
logger.exception("rule_catalog_sync_once_failed", error=error_msg)
|
||||
|
||||
duration_ms = int((_time.time() - started_ms) * 1000)
|
||||
await _log_aol(stats=stats, duration_ms=duration_ms, error=error_msg)
|
||||
|
||||
logger.info(
|
||||
"rule_catalog_sync_once_done",
|
||||
total=stats["total"],
|
||||
new=stats["new"],
|
||||
updated=stats["updated"],
|
||||
unchanged=stats["unchanged"],
|
||||
duration_ms=duration_ms,
|
||||
)
|
||||
return stats
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Prometheus API 拉取
|
||||
# ============================================================================
|
||||
|
||||
async def _fetch_prometheus_rules() -> list[dict[str, Any]]:
|
||||
"""
|
||||
GET {PROMETHEUS_URL}/api/v1/rules → 解析回 alert rules (排除 recording rules).
|
||||
|
||||
回傳: [{rule_name, expr, duration_seconds, severity, labels, annotations, group_name}, ...]
|
||||
"""
|
||||
url = f"{settings.PROMETHEUS_URL.rstrip('/')}{_PROM_RULES_ENDPOINT}"
|
||||
async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT_SEC, trust_env=False) as client:
|
||||
resp = await client.get(url, params={"type": "alert"})
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
|
||||
if data.get("status") != "success":
|
||||
raise RuntimeError(f"prometheus rules API non-success: {data.get('status')}")
|
||||
|
||||
groups = (data.get("data", {}) or {}).get("groups", []) or []
|
||||
rules: list[dict[str, Any]] = []
|
||||
for g in groups:
|
||||
group_name = g.get("name") or ""
|
||||
for r in g.get("rules", []) or []:
|
||||
if r.get("type") != "alerting":
|
||||
continue # 跳過 recording rules
|
||||
rule_name = r.get("name") or ""
|
||||
if not rule_name:
|
||||
continue
|
||||
labels = r.get("labels", {}) or {}
|
||||
annotations = r.get("annotations", {}) or {}
|
||||
rules.append({
|
||||
"rule_name": rule_name,
|
||||
"expr": r.get("query") or "",
|
||||
"duration_seconds": _parse_duration(r.get("duration", 0)),
|
||||
"severity": labels.get("severity") or "",
|
||||
"labels": labels,
|
||||
"annotations": annotations,
|
||||
"group_name": group_name,
|
||||
})
|
||||
return rules
|
||||
|
||||
|
||||
def _parse_duration(d: Any) -> int:
|
||||
"""Prometheus 的 duration 可能是 秒 (int/float) 或 '5m' 字串."""
|
||||
if isinstance(d, (int, float)):
|
||||
return int(d)
|
||||
if isinstance(d, str) and d:
|
||||
try:
|
||||
if d.endswith("s"):
|
||||
return int(float(d[:-1]))
|
||||
if d.endswith("m"):
|
||||
return int(float(d[:-1]) * 60)
|
||||
if d.endswith("h"):
|
||||
return int(float(d[:-1]) * 3600)
|
||||
return int(float(d))
|
||||
except (ValueError, TypeError):
|
||||
return 0
|
||||
return 0
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# DB 寫入
|
||||
# ============================================================================
|
||||
|
||||
async def _upsert_rule(rule: dict[str, Any]) -> str:
|
||||
"""
|
||||
UPSERT 單一 rule 到 alert_rule_catalog.
|
||||
|
||||
Returns: 'new' | 'updated' | 'unchanged'
|
||||
|
||||
用 rule_name 為 UNIQUE 鍵 (alert_rule_catalog_rule_name_key).
|
||||
比較 expr/severity/labels/annotations,有變化 → updated,否則 unchanged.
|
||||
"""
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
try:
|
||||
async with get_db_context() as db:
|
||||
row = await db.execute(
|
||||
_sql("""
|
||||
INSERT INTO alert_rule_catalog (
|
||||
rule_name, source, expr, duration_seconds,
|
||||
severity, labels, annotations,
|
||||
created_by_agent, review_status,
|
||||
created_at, updated_at
|
||||
) VALUES (
|
||||
:name, 'yaml_hardcoded', :expr, :dur,
|
||||
:sev, CAST(:labels AS jsonb), CAST(:ann AS jsonb),
|
||||
'prometheus_sync', 'approved',
|
||||
NOW(), NOW()
|
||||
)
|
||||
ON CONFLICT (rule_name) DO UPDATE
|
||||
SET expr = EXCLUDED.expr,
|
||||
duration_seconds = EXCLUDED.duration_seconds,
|
||||
severity = EXCLUDED.severity,
|
||||
labels = EXCLUDED.labels,
|
||||
annotations = EXCLUDED.annotations,
|
||||
updated_at = NOW()
|
||||
WHERE alert_rule_catalog.expr IS DISTINCT FROM EXCLUDED.expr
|
||||
OR alert_rule_catalog.duration_seconds IS DISTINCT FROM EXCLUDED.duration_seconds
|
||||
OR alert_rule_catalog.severity IS DISTINCT FROM EXCLUDED.severity
|
||||
OR alert_rule_catalog.labels IS DISTINCT FROM EXCLUDED.labels
|
||||
OR alert_rule_catalog.annotations IS DISTINCT FROM EXCLUDED.annotations
|
||||
RETURNING rule_id, (xmax = 0) AS inserted
|
||||
"""),
|
||||
{
|
||||
"name": rule["rule_name"][:200],
|
||||
"expr": rule["expr"][:4000],
|
||||
"dur": rule["duration_seconds"],
|
||||
"sev": rule["severity"][:50],
|
||||
"labels": _json.dumps(rule["labels"], ensure_ascii=False),
|
||||
"ann": _json.dumps(rule["annotations"], ensure_ascii=False),
|
||||
},
|
||||
)
|
||||
result = row.one_or_none()
|
||||
if result is None:
|
||||
# UPDATE WHERE 條件不成立 (沒變化) — RETURNING 不回任何 row
|
||||
return "unchanged"
|
||||
_, inserted = result
|
||||
return "new" if inserted else "updated"
|
||||
except Exception as e:
|
||||
logger.warning("rule_upsert_failed", rule_name=rule["rule_name"], error=str(e))
|
||||
return "unchanged"
|
||||
|
||||
|
||||
async def _log_aol(stats: dict[str, int], duration_ms: int, error: str | None) -> None:
|
||||
"""
|
||||
寫 automation_operation_log.
|
||||
|
||||
每次 sync 只寫 1 筆 summary (避免 N 條規則 N 筆 log 污染).
|
||||
op_type 用 'rule_created' 若有新 rule,否則 'rule_updated',全 unchanged 就 skip.
|
||||
"""
|
||||
if stats["new"] == 0 and stats["updated"] == 0 and not error:
|
||||
# 全部 unchanged,無需留痕
|
||||
return
|
||||
|
||||
op_type = "rule_created" if stats["new"] > 0 else "rule_updated"
|
||||
aol_status = "failed" if error else "success"
|
||||
|
||||
try:
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
async with get_db_context() as db:
|
||||
await db.execute(
|
||||
_sql("""
|
||||
INSERT INTO automation_operation_log (
|
||||
operation_type, actor, status,
|
||||
input, output, duration_ms, error, tags
|
||||
) VALUES (
|
||||
:op, 'rule_catalog_sync', :st,
|
||||
CAST(:input AS jsonb),
|
||||
CAST(:output AS jsonb),
|
||||
:dur, :err, :tags
|
||||
)
|
||||
"""),
|
||||
{
|
||||
"op": op_type,
|
||||
"st": aol_status,
|
||||
"input": _json.dumps({"source": "prometheus_api"}, ensure_ascii=False),
|
||||
"output": _json.dumps(stats, ensure_ascii=False),
|
||||
"dur": duration_ms,
|
||||
"err": (error or "")[:2000] if error else None,
|
||||
"tags": ["rule_catalog", "prometheus_sync"],
|
||||
},
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("rule_catalog_aol_write_failed", error=str(e))
|
||||
@@ -380,6 +380,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
except Exception as e:
|
||||
logger.warning("asset_scanner_loop_schedule_failed", error=str(e))
|
||||
|
||||
# ADR-090 § Rule Catalog Sync (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei)
|
||||
# 每 1 小時從 Prometheus /api/v1/rules 拉 active rules → UPSERT alert_rule_catalog
|
||||
# 解鎖 E3 Hermes 自動建規則: AI 需要 alert_rule_catalog 作為 baseline 才能提案修正
|
||||
try:
|
||||
from src.jobs.rule_catalog_sync_job import run_rule_catalog_sync_loop
|
||||
asyncio.create_task(run_rule_catalog_sync_loop())
|
||||
logger.info("rule_catalog_sync_loop_scheduled", interval_sec=3600)
|
||||
except Exception as e:
|
||||
logger.warning("rule_catalog_sync_loop_schedule_failed", error=str(e))
|
||||
|
||||
# ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告
|
||||
# 2026-04-14 Claude Haiku 4.5 Asia/Taipei
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user