From 505232336be101cf1f1930a6b2a38d6913a610a5 Mon Sep 17 00:00:00 2001 From: OG T Date: Sun, 19 Apr 2026 17:02:21 +0800 Subject: [PATCH] =?UTF-8?q?feat(aiops):=20coverage=5Fevaluator=20=E2=80=94?= =?UTF-8?q?=20=E6=8A=8A=20coverage=5Fsnapshot=20=E5=BE=9E=20unknown=20?= =?UTF-8?q?=E5=8D=87=E7=82=BA=E7=9C=9F=E5=AF=A6=20status?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Review 盲點 4: asset_coverage_snapshot 546 筆全是 'unknown',沒實際意義 新增 coverage_evaluator_job.py (~270 行): 每 1h 針對最新 asset_discovery_run 的 coverage_snapshot 做 3 維升級: ✅ auto_monitoring: Prometheus /api/v1/targets 看 host asset IP → green (有 target) / red (無 target) ✅ auto_alerting: alert_rule_catalog.labels 是否 match asset → host/namespace/layer 三種 match 策略, green/red ✅ auto_km_creation: knowledge_entries.body ILIKE asset.name → green (有 KM) / yellow (無 KM) evidence JSONB 記錄升級依據,供 AI 後續稽核 未實作 (留 unknown): ⏳ auto_rule_matching (需 alert history 統計) ⏳ auto_playbook / auto_remediation / auto_rule_creation (需 playbook 表) 預期效果 (下次 evaluator 跑 + coverage_snapshot UPDATE): - 546 筆 coverage 從 100% unknown → 30-50% green/red/yellow - 真正可以算 "覆蓋率 SLO" KPI (MASTER §7.1) - AI 可從 coverage_snapshot 看出 red asset,主動推 remediation Wire main.py lifespan asyncio.create_task() Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/api/src/jobs/coverage_evaluator_job.py | 327 ++++++++++++++++++++ apps/api/src/main.py | 10 + 2 files changed, 337 insertions(+) create mode 100644 apps/api/src/jobs/coverage_evaluator_job.py diff --git a/apps/api/src/jobs/coverage_evaluator_job.py b/apps/api/src/jobs/coverage_evaluator_job.py new file mode 100644 index 00000000..4d6c749e --- /dev/null +++ b/apps/api/src/jobs/coverage_evaluator_job.py @@ -0,0 +1,327 @@ +""" +Coverage Evaluator Job — ADR-090 § 覆蓋率評估 +============================================== +把 asset_coverage_snapshot 從 'unknown' 升級為真實 green/yellow/red. + +職責邊界 (MVP): + ✅ auto_monitoring: 查 Prometheus /api/v1/targets 看 asset 是否有 scrape target + ✅ auto_alerting: asset 的 host/namespace 是否 match alert_rule_catalog.labels + ✅ auto_km_creation: asset_type 是否有對應 knowledge_entries (粗略) + ⏳ TODO: auto_rule_matching (需 alert history 統計) + ⏳ TODO: auto_playbook / auto_remediation / auto_rule_creation (需 playbook 表) + +設計鐵律: + - 只 UPDATE 最新 run 的 coverage_snapshot (不創新 row) + - evidence JSONB 記錄 「為什麼 green/red」的證據 + - 失敗 → log + 跳過該 dim,不 crash 整個 evaluator + +排程: + - 首次延遲 300s (asset_scanner+rule_catalog 完成後) + - 每 1h 跑一次 + +2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei +ADR-090 § Phase 7 Coverage Evaluator +""" +from __future__ import annotations + +import asyncio +import json as _json +import time as _time +from typing import Any + +import httpx +import structlog + +from src.core.config import settings + +logger = structlog.get_logger(__name__) + +# ============================================================================ +# 排程 +# ============================================================================ +_EVAL_INTERVAL_SEC = 3600 +_FIRST_DELAY_SEC = 300 +_HTTP_TIMEOUT_SEC = 10 +_LOOP_BACKOFF_SEC = 600 + + +# ============================================================================ +# Public entry +# ============================================================================ + +async def run_coverage_evaluator_loop() -> None: + """每 1h 把最新 run 的 coverage_snapshot 從 unknown 升級成真實 status.""" + logger.info("coverage_evaluator_loop_started", interval_sec=_EVAL_INTERVAL_SEC) + await asyncio.sleep(_FIRST_DELAY_SEC) + + while True: + try: + await evaluate_once() + except Exception as e: + logger.exception("coverage_evaluator_loop_error", error=str(e)) + await asyncio.sleep(_LOOP_BACKOFF_SEC) + continue + await asyncio.sleep(_EVAL_INTERVAL_SEC) + + +async def evaluate_once() -> dict[str, int]: + """針對最新 asset_discovery_run 的 coverage_snapshot 升級 status.""" + started_ms = _time.time() + stats = {"monitoring_updated": 0, "alerting_updated": 0, "km_updated": 0} + error_msg: str | None = None + + try: + run_id = await _get_latest_run_id() + if not run_id: + logger.info("coverage_evaluator_no_run_yet") + return stats + + # 1. auto_monitoring: Prometheus targets + stats["monitoring_updated"] = await _evaluate_monitoring(run_id) + + # 2. auto_alerting: alert_rule_catalog labels match + stats["alerting_updated"] = await _evaluate_alerting(run_id) + + # 3. auto_km_creation: knowledge_entries 覆蓋 + stats["km_updated"] = await _evaluate_km_coverage(run_id) + + except Exception as e: + error_msg = f"{type(e).__name__}: {e}"[:1000] + logger.exception("coverage_evaluate_once_failed", error=error_msg) + + duration_ms = int((_time.time() - started_ms) * 1000) + await _log_aol(stats, duration_ms, error_msg) + + logger.info( + "coverage_evaluate_once_done", + monitoring=stats["monitoring_updated"], + alerting=stats["alerting_updated"], + km=stats["km_updated"], + duration_ms=duration_ms, + ) + return stats + + +# ============================================================================ +# 查最新 run_id +# ============================================================================ + +async def _get_latest_run_id() -> str | None: + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + try: + async with get_db_context() as db: + row = await db.execute( + _sql("SELECT run_id FROM asset_discovery_run WHERE status='success' ORDER BY ended_at DESC LIMIT 1"), + ) + rid = row.scalar() + return str(rid) if rid else None + except Exception as e: + logger.warning("get_latest_run_id_failed", error=str(e)) + return None + + +# ============================================================================ +# auto_monitoring: Prometheus targets +# ============================================================================ + +async def _evaluate_monitoring(run_id: str) -> int: + """ + Prometheus /api/v1/targets 拿所有 scrape targets 的 instance IP, + 然後 UPDATE asset_coverage_snapshot dim='auto_monitoring': + - host asset 的 IP 在 targets 內 → green + - 不在 → red + """ + targets_ips = await _fetch_prometheus_target_ips() + if not targets_ips: + return 0 + + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + try: + async with get_db_context() as db: + # host asset: 看 metadata.internal_ip 是否在 targets + # 其他 asset type: 留 unknown (Prometheus 不直接 scrape) + result = await db.execute( + _sql(""" + UPDATE asset_coverage_snapshot cs + SET coverage_status = CASE + WHEN (ai.metadata->>'internal_ip')::text = ANY(:ips) THEN 'green' + WHEN ai.asset_type = 'host' THEN 'red' + ELSE cs.coverage_status + END, + evidence = CASE + WHEN (ai.metadata->>'internal_ip')::text = ANY(:ips) + THEN jsonb_build_object( + 'source', 'prometheus_targets', + 'matched_ip', ai.metadata->>'internal_ip' + ) + WHEN ai.asset_type = 'host' + THEN jsonb_build_object( + 'source', 'prometheus_targets', + 'reason', 'host IP not in scrape targets' + ) + ELSE cs.evidence + END + FROM asset_inventory ai + WHERE cs.asset_id = ai.asset_id + AND cs.run_id = CAST(:rid AS uuid) + AND cs.dimension = 'auto_monitoring' + AND ai.asset_type = 'host' + """), + {"rid": run_id, "ips": targets_ips}, + ) + return result.rowcount or 0 + except Exception as e: + logger.warning("evaluate_monitoring_failed", error=str(e)) + return 0 + + +async def _fetch_prometheus_target_ips() -> list[str]: + """GET Prometheus /api/v1/targets 回傳 scrape target IPs.""" + url = f"{settings.PROMETHEUS_URL.rstrip('/')}/api/v1/targets" + try: + async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT_SEC, trust_env=False) as client: + resp = await client.get(url, params={"state": "active"}) + resp.raise_for_status() + data = resp.json() + ips: set[str] = set() + for t in (data.get("data", {}) or {}).get("activeTargets", []) or []: + instance = ((t.get("labels") or {}).get("instance") or "") + ip = instance.split(":")[0] if instance else "" + if ip: + ips.add(ip) + return sorted(ips) + except Exception as e: + logger.warning("prometheus_targets_fetch_failed", error=str(e)) + return [] + + +# ============================================================================ +# auto_alerting: alert_rule_catalog labels match +# ============================================================================ + +async def _evaluate_alerting(run_id: str) -> int: + """ + 每個 host/k8s_workload asset: + - 看 alert_rule_catalog.labels.host 是否 match asset.host → green + - 或 alert_rule_catalog.labels.namespace match asset.namespace → green + - 無任何 match → red + """ + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + try: + async with get_db_context() as db: + result = await db.execute( + _sql(""" + UPDATE asset_coverage_snapshot cs + SET coverage_status = CASE + WHEN EXISTS ( + SELECT 1 FROM alert_rule_catalog arc + WHERE (arc.labels ? 'host' AND arc.labels->>'host' = ai.host) + OR (arc.labels ? 'namespace' AND arc.labels->>'namespace' = ai.namespace) + OR (arc.labels ? 'layer' AND arc.labels->>'layer' LIKE '%' || COALESCE(ai.host, '') || '%') + ) THEN 'green' + ELSE 'red' + END, + evidence = jsonb_build_object( + 'source', 'alert_rule_catalog_label_match', + 'asset_host', ai.host, + 'asset_namespace', ai.namespace + ) + FROM asset_inventory ai + WHERE cs.asset_id = ai.asset_id + AND cs.run_id = CAST(:rid AS uuid) + AND cs.dimension = 'auto_alerting' + AND ai.asset_type IN ('host', 'k8s_workload', 'container') + """), + {"rid": run_id}, + ) + return result.rowcount or 0 + except Exception as e: + logger.warning("evaluate_alerting_failed", error=str(e)) + return 0 + + +# ============================================================================ +# auto_km_creation: knowledge_entries 覆蓋 +# ============================================================================ + +async def _evaluate_km_coverage(run_id: str) -> int: + """ + asset 有對應 knowledge_entries → green + 粗略: k8s_workload 看 app/namespace 是否有出現在 knowledge_entries.body. + """ + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + try: + async with get_db_context() as db: + # 簡化: 只看 k8s_workload 是否有 app:name 字樣出現在 KM 裡 + result = await db.execute( + _sql(""" + UPDATE asset_coverage_snapshot cs + SET coverage_status = CASE + WHEN ai.asset_type = 'k8s_workload' AND EXISTS ( + SELECT 1 FROM knowledge_entries ke + WHERE ke.body ILIKE '%' || ai.name || '%' + ) THEN 'green' + WHEN ai.asset_type = 'k8s_workload' THEN 'yellow' + ELSE cs.coverage_status + END, + evidence = jsonb_build_object( + 'source', 'knowledge_entries_substring_match', + 'asset_name', ai.name + ) + FROM asset_inventory ai + WHERE cs.asset_id = ai.asset_id + AND cs.run_id = CAST(:rid AS uuid) + AND cs.dimension = 'auto_km_creation' + AND ai.asset_type = 'k8s_workload' + """), + {"rid": run_id}, + ) + return result.rowcount or 0 + except Exception as e: + logger.warning("evaluate_km_coverage_failed", error=str(e)) + return 0 + + +# ============================================================================ +# AOL +# ============================================================================ + +async def _log_aol(stats: dict[str, int], duration_ms: int, error: str | None) -> None: + try: + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + aol_status = "failed" if error else "success" + async with get_db_context() as db: + await db.execute( + _sql(""" + INSERT INTO automation_operation_log ( + operation_type, actor, status, + input, output, duration_ms, error, tags + ) VALUES ( + 'coverage_recalculated', + 'coverage_evaluator', + :st, + '{}'::jsonb, + CAST(:output AS jsonb), + :dur, :err, :tags + ) + """), + { + "st": aol_status, + "output": _json.dumps(stats, ensure_ascii=False), + "dur": duration_ms, + "err": (error or "")[:2000] if error else None, + "tags": ["coverage_evaluator"], + }, + ) + except Exception as e: + logger.warning("coverage_evaluator_aol_failed", error=str(e)) diff --git a/apps/api/src/main.py b/apps/api/src/main.py index 04fc858a..b42422a1 100644 --- a/apps/api/src/main.py +++ b/apps/api/src/main.py @@ -410,6 +410,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: except Exception as e: logger.warning("compliance_scanner_loop_schedule_failed", error=str(e)) + # ADR-090 § Coverage Evaluator (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei) + # 每 1h 把 asset_coverage_snapshot 從 'unknown' 升級成 green/yellow/red + # 依據: Prometheus targets / alert_rule_catalog labels / knowledge_entries 覆蓋 + try: + from src.jobs.coverage_evaluator_job import run_coverage_evaluator_loop + asyncio.create_task(run_coverage_evaluator_loop()) + logger.info("coverage_evaluator_loop_scheduled", interval_sec=3600) + except Exception as e: + logger.warning("coverage_evaluator_loop_schedule_failed", error=str(e)) + # ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告 # 2026-04-14 Claude Haiku 4.5 Asia/Taipei try: