feat(aiops): coverage_evaluator — 把 coverage_snapshot 從 unknown 升為真實 status
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
Review 盲點 4: asset_coverage_snapshot 546 筆全是 'unknown',沒實際意義
新增 coverage_evaluator_job.py (~270 行):
每 1h 針對最新 asset_discovery_run 的 coverage_snapshot 做 3 維升級:
✅ auto_monitoring: Prometheus /api/v1/targets 看 host asset IP
→ green (有 target) / red (無 target)
✅ auto_alerting: alert_rule_catalog.labels 是否 match asset
→ host/namespace/layer 三種 match 策略, green/red
✅ auto_km_creation: knowledge_entries.body ILIKE asset.name
→ green (有 KM) / yellow (無 KM)
evidence JSONB 記錄升級依據,供 AI 後續稽核
未實作 (留 unknown):
⏳ auto_rule_matching (需 alert history 統計)
⏳ auto_playbook / auto_remediation / auto_rule_creation (需 playbook 表)
預期效果 (下次 evaluator 跑 + coverage_snapshot UPDATE):
- 546 筆 coverage 從 100% unknown → 30-50% green/red/yellow
- 真正可以算 "覆蓋率 SLO" KPI (MASTER §7.1)
- AI 可從 coverage_snapshot 看出 red asset,主動推 remediation
Wire main.py lifespan asyncio.create_task()
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
327
apps/api/src/jobs/coverage_evaluator_job.py
Normal file
327
apps/api/src/jobs/coverage_evaluator_job.py
Normal file
@@ -0,0 +1,327 @@
|
||||
"""
|
||||
Coverage Evaluator Job — ADR-090 § 覆蓋率評估
|
||||
==============================================
|
||||
把 asset_coverage_snapshot 從 'unknown' 升級為真實 green/yellow/red.
|
||||
|
||||
職責邊界 (MVP):
|
||||
✅ auto_monitoring: 查 Prometheus /api/v1/targets 看 asset 是否有 scrape target
|
||||
✅ auto_alerting: asset 的 host/namespace 是否 match alert_rule_catalog.labels
|
||||
✅ auto_km_creation: asset_type 是否有對應 knowledge_entries (粗略)
|
||||
⏳ TODO: auto_rule_matching (需 alert history 統計)
|
||||
⏳ TODO: auto_playbook / auto_remediation / auto_rule_creation (需 playbook 表)
|
||||
|
||||
設計鐵律:
|
||||
- 只 UPDATE 最新 run 的 coverage_snapshot (不創新 row)
|
||||
- evidence JSONB 記錄 「為什麼 green/red」的證據
|
||||
- 失敗 → log + 跳過該 dim,不 crash 整個 evaluator
|
||||
|
||||
排程:
|
||||
- 首次延遲 300s (asset_scanner+rule_catalog 完成後)
|
||||
- 每 1h 跑一次
|
||||
|
||||
2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
|
||||
ADR-090 § Phase 7 Coverage Evaluator
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json as _json
|
||||
import time as _time
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import structlog
|
||||
|
||||
from src.core.config import settings
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
# ============================================================================
|
||||
# 排程
|
||||
# ============================================================================
|
||||
_EVAL_INTERVAL_SEC = 3600
|
||||
_FIRST_DELAY_SEC = 300
|
||||
_HTTP_TIMEOUT_SEC = 10
|
||||
_LOOP_BACKOFF_SEC = 600
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Public entry
|
||||
# ============================================================================
|
||||
|
||||
async def run_coverage_evaluator_loop() -> None:
|
||||
"""每 1h 把最新 run 的 coverage_snapshot 從 unknown 升級成真實 status."""
|
||||
logger.info("coverage_evaluator_loop_started", interval_sec=_EVAL_INTERVAL_SEC)
|
||||
await asyncio.sleep(_FIRST_DELAY_SEC)
|
||||
|
||||
while True:
|
||||
try:
|
||||
await evaluate_once()
|
||||
except Exception as e:
|
||||
logger.exception("coverage_evaluator_loop_error", error=str(e))
|
||||
await asyncio.sleep(_LOOP_BACKOFF_SEC)
|
||||
continue
|
||||
await asyncio.sleep(_EVAL_INTERVAL_SEC)
|
||||
|
||||
|
||||
async def evaluate_once() -> dict[str, int]:
|
||||
"""針對最新 asset_discovery_run 的 coverage_snapshot 升級 status."""
|
||||
started_ms = _time.time()
|
||||
stats = {"monitoring_updated": 0, "alerting_updated": 0, "km_updated": 0}
|
||||
error_msg: str | None = None
|
||||
|
||||
try:
|
||||
run_id = await _get_latest_run_id()
|
||||
if not run_id:
|
||||
logger.info("coverage_evaluator_no_run_yet")
|
||||
return stats
|
||||
|
||||
# 1. auto_monitoring: Prometheus targets
|
||||
stats["monitoring_updated"] = await _evaluate_monitoring(run_id)
|
||||
|
||||
# 2. auto_alerting: alert_rule_catalog labels match
|
||||
stats["alerting_updated"] = await _evaluate_alerting(run_id)
|
||||
|
||||
# 3. auto_km_creation: knowledge_entries 覆蓋
|
||||
stats["km_updated"] = await _evaluate_km_coverage(run_id)
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"{type(e).__name__}: {e}"[:1000]
|
||||
logger.exception("coverage_evaluate_once_failed", error=error_msg)
|
||||
|
||||
duration_ms = int((_time.time() - started_ms) * 1000)
|
||||
await _log_aol(stats, duration_ms, error_msg)
|
||||
|
||||
logger.info(
|
||||
"coverage_evaluate_once_done",
|
||||
monitoring=stats["monitoring_updated"],
|
||||
alerting=stats["alerting_updated"],
|
||||
km=stats["km_updated"],
|
||||
duration_ms=duration_ms,
|
||||
)
|
||||
return stats
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 查最新 run_id
|
||||
# ============================================================================
|
||||
|
||||
async def _get_latest_run_id() -> str | None:
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
try:
|
||||
async with get_db_context() as db:
|
||||
row = await db.execute(
|
||||
_sql("SELECT run_id FROM asset_discovery_run WHERE status='success' ORDER BY ended_at DESC LIMIT 1"),
|
||||
)
|
||||
rid = row.scalar()
|
||||
return str(rid) if rid else None
|
||||
except Exception as e:
|
||||
logger.warning("get_latest_run_id_failed", error=str(e))
|
||||
return None
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# auto_monitoring: Prometheus targets
|
||||
# ============================================================================
|
||||
|
||||
async def _evaluate_monitoring(run_id: str) -> int:
|
||||
"""
|
||||
Prometheus /api/v1/targets 拿所有 scrape targets 的 instance IP,
|
||||
然後 UPDATE asset_coverage_snapshot dim='auto_monitoring':
|
||||
- host asset 的 IP 在 targets 內 → green
|
||||
- 不在 → red
|
||||
"""
|
||||
targets_ips = await _fetch_prometheus_target_ips()
|
||||
if not targets_ips:
|
||||
return 0
|
||||
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
try:
|
||||
async with get_db_context() as db:
|
||||
# host asset: 看 metadata.internal_ip 是否在 targets
|
||||
# 其他 asset type: 留 unknown (Prometheus 不直接 scrape)
|
||||
result = await db.execute(
|
||||
_sql("""
|
||||
UPDATE asset_coverage_snapshot cs
|
||||
SET coverage_status = CASE
|
||||
WHEN (ai.metadata->>'internal_ip')::text = ANY(:ips) THEN 'green'
|
||||
WHEN ai.asset_type = 'host' THEN 'red'
|
||||
ELSE cs.coverage_status
|
||||
END,
|
||||
evidence = CASE
|
||||
WHEN (ai.metadata->>'internal_ip')::text = ANY(:ips)
|
||||
THEN jsonb_build_object(
|
||||
'source', 'prometheus_targets',
|
||||
'matched_ip', ai.metadata->>'internal_ip'
|
||||
)
|
||||
WHEN ai.asset_type = 'host'
|
||||
THEN jsonb_build_object(
|
||||
'source', 'prometheus_targets',
|
||||
'reason', 'host IP not in scrape targets'
|
||||
)
|
||||
ELSE cs.evidence
|
||||
END
|
||||
FROM asset_inventory ai
|
||||
WHERE cs.asset_id = ai.asset_id
|
||||
AND cs.run_id = CAST(:rid AS uuid)
|
||||
AND cs.dimension = 'auto_monitoring'
|
||||
AND ai.asset_type = 'host'
|
||||
"""),
|
||||
{"rid": run_id, "ips": targets_ips},
|
||||
)
|
||||
return result.rowcount or 0
|
||||
except Exception as e:
|
||||
logger.warning("evaluate_monitoring_failed", error=str(e))
|
||||
return 0
|
||||
|
||||
|
||||
async def _fetch_prometheus_target_ips() -> list[str]:
|
||||
"""GET Prometheus /api/v1/targets 回傳 scrape target IPs."""
|
||||
url = f"{settings.PROMETHEUS_URL.rstrip('/')}/api/v1/targets"
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT_SEC, trust_env=False) as client:
|
||||
resp = await client.get(url, params={"state": "active"})
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
ips: set[str] = set()
|
||||
for t in (data.get("data", {}) or {}).get("activeTargets", []) or []:
|
||||
instance = ((t.get("labels") or {}).get("instance") or "")
|
||||
ip = instance.split(":")[0] if instance else ""
|
||||
if ip:
|
||||
ips.add(ip)
|
||||
return sorted(ips)
|
||||
except Exception as e:
|
||||
logger.warning("prometheus_targets_fetch_failed", error=str(e))
|
||||
return []
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# auto_alerting: alert_rule_catalog labels match
|
||||
# ============================================================================
|
||||
|
||||
async def _evaluate_alerting(run_id: str) -> int:
|
||||
"""
|
||||
每個 host/k8s_workload asset:
|
||||
- 看 alert_rule_catalog.labels.host 是否 match asset.host → green
|
||||
- 或 alert_rule_catalog.labels.namespace match asset.namespace → green
|
||||
- 無任何 match → red
|
||||
"""
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
try:
|
||||
async with get_db_context() as db:
|
||||
result = await db.execute(
|
||||
_sql("""
|
||||
UPDATE asset_coverage_snapshot cs
|
||||
SET coverage_status = CASE
|
||||
WHEN EXISTS (
|
||||
SELECT 1 FROM alert_rule_catalog arc
|
||||
WHERE (arc.labels ? 'host' AND arc.labels->>'host' = ai.host)
|
||||
OR (arc.labels ? 'namespace' AND arc.labels->>'namespace' = ai.namespace)
|
||||
OR (arc.labels ? 'layer' AND arc.labels->>'layer' LIKE '%' || COALESCE(ai.host, '') || '%')
|
||||
) THEN 'green'
|
||||
ELSE 'red'
|
||||
END,
|
||||
evidence = jsonb_build_object(
|
||||
'source', 'alert_rule_catalog_label_match',
|
||||
'asset_host', ai.host,
|
||||
'asset_namespace', ai.namespace
|
||||
)
|
||||
FROM asset_inventory ai
|
||||
WHERE cs.asset_id = ai.asset_id
|
||||
AND cs.run_id = CAST(:rid AS uuid)
|
||||
AND cs.dimension = 'auto_alerting'
|
||||
AND ai.asset_type IN ('host', 'k8s_workload', 'container')
|
||||
"""),
|
||||
{"rid": run_id},
|
||||
)
|
||||
return result.rowcount or 0
|
||||
except Exception as e:
|
||||
logger.warning("evaluate_alerting_failed", error=str(e))
|
||||
return 0
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# auto_km_creation: knowledge_entries 覆蓋
|
||||
# ============================================================================
|
||||
|
||||
async def _evaluate_km_coverage(run_id: str) -> int:
|
||||
"""
|
||||
asset 有對應 knowledge_entries → green
|
||||
粗略: k8s_workload 看 app/namespace 是否有出現在 knowledge_entries.body.
|
||||
"""
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
try:
|
||||
async with get_db_context() as db:
|
||||
# 簡化: 只看 k8s_workload 是否有 app:name 字樣出現在 KM 裡
|
||||
result = await db.execute(
|
||||
_sql("""
|
||||
UPDATE asset_coverage_snapshot cs
|
||||
SET coverage_status = CASE
|
||||
WHEN ai.asset_type = 'k8s_workload' AND EXISTS (
|
||||
SELECT 1 FROM knowledge_entries ke
|
||||
WHERE ke.body ILIKE '%' || ai.name || '%'
|
||||
) THEN 'green'
|
||||
WHEN ai.asset_type = 'k8s_workload' THEN 'yellow'
|
||||
ELSE cs.coverage_status
|
||||
END,
|
||||
evidence = jsonb_build_object(
|
||||
'source', 'knowledge_entries_substring_match',
|
||||
'asset_name', ai.name
|
||||
)
|
||||
FROM asset_inventory ai
|
||||
WHERE cs.asset_id = ai.asset_id
|
||||
AND cs.run_id = CAST(:rid AS uuid)
|
||||
AND cs.dimension = 'auto_km_creation'
|
||||
AND ai.asset_type = 'k8s_workload'
|
||||
"""),
|
||||
{"rid": run_id},
|
||||
)
|
||||
return result.rowcount or 0
|
||||
except Exception as e:
|
||||
logger.warning("evaluate_km_coverage_failed", error=str(e))
|
||||
return 0
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# AOL
|
||||
# ============================================================================
|
||||
|
||||
async def _log_aol(stats: dict[str, int], duration_ms: int, error: str | None) -> None:
|
||||
try:
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
aol_status = "failed" if error else "success"
|
||||
async with get_db_context() as db:
|
||||
await db.execute(
|
||||
_sql("""
|
||||
INSERT INTO automation_operation_log (
|
||||
operation_type, actor, status,
|
||||
input, output, duration_ms, error, tags
|
||||
) VALUES (
|
||||
'coverage_recalculated',
|
||||
'coverage_evaluator',
|
||||
:st,
|
||||
'{}'::jsonb,
|
||||
CAST(:output AS jsonb),
|
||||
:dur, :err, :tags
|
||||
)
|
||||
"""),
|
||||
{
|
||||
"st": aol_status,
|
||||
"output": _json.dumps(stats, ensure_ascii=False),
|
||||
"dur": duration_ms,
|
||||
"err": (error or "")[:2000] if error else None,
|
||||
"tags": ["coverage_evaluator"],
|
||||
},
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("coverage_evaluator_aol_failed", error=str(e))
|
||||
@@ -410,6 +410,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
except Exception as e:
|
||||
logger.warning("compliance_scanner_loop_schedule_failed", error=str(e))
|
||||
|
||||
# ADR-090 § Coverage Evaluator (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei)
|
||||
# 每 1h 把 asset_coverage_snapshot 從 'unknown' 升級成 green/yellow/red
|
||||
# 依據: Prometheus targets / alert_rule_catalog labels / knowledge_entries 覆蓋
|
||||
try:
|
||||
from src.jobs.coverage_evaluator_job import run_coverage_evaluator_loop
|
||||
asyncio.create_task(run_coverage_evaluator_loop())
|
||||
logger.info("coverage_evaluator_loop_scheduled", interval_sec=3600)
|
||||
except Exception as e:
|
||||
logger.warning("coverage_evaluator_loop_schedule_failed", error=str(e))
|
||||
|
||||
# ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告
|
||||
# 2026-04-14 Claude Haiku 4.5 Asia/Taipei
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user