Files
awoooi/apps/api/src/jobs/coverage_evaluator_job.py
OG T c1f23cfabe
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
feat(coverage_evaluator): 擴充 4 維 — playbook/remediation/rule_matching/rule_creation
Review 盲點: coverage 7 維中原只實作 3 維 (monitoring/alerting/km),其餘 4 維永遠 unknown

v2 擴充:
  + auto_playbook: asset.name 出現在 playbooks.symptom_pattern/description (approved 狀態) → green
     沒對應 playbook 但 type='k8s_workload' → yellow
  + auto_remediation: 過去 30d remediation_events.target_resource ILIKE asset.name → green
     沒 target 但 k8s_workload/container → red (應有修復能力但沒)
  + auto_rule_matching: 過去 30d incidents.affected_services ILIKE asset.name
     或 incidents.alertname match alert_rule.labels.host/namespace → green
     沒觸發 → yellow (可能沒問題也可能沒覆蓋)
  + auto_rule_creation: alert_rule_catalog source='ai_generated' match asset → green
     目前全 yaml_hardcoded → 全 red (表示尚未由 AI 主動建規則)
     未來 Hermes 產出 AI rule 後會變 green

解鎖: coverage 7 維完整 SLO KPI (MASTER §7.1)
  - red count = 真正的治理缺口
  - green ratio = 自動化成熟度
  - AI 可主動推薦 red asset 的補覆蓋動作

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 19:54:36 +08:00

518 lines
22 KiB
Python

"""
Coverage Evaluator Job — ADR-090 § 覆蓋率評估
==============================================
把 asset_coverage_snapshot 從 'unknown' 升級為真實 green/yellow/red.
職責邊界 (MVP):
✅ auto_monitoring: 查 Prometheus /api/v1/targets 看 asset 是否有 scrape target
✅ auto_alerting: asset 的 host/namespace 是否 match alert_rule_catalog.labels
✅ auto_km_creation: asset_type 是否有對應 knowledge_entries (粗略)
✅ auto_rule_matching: incidents from the past 30d that relate to the asset (v2)
✅ auto_playbook / auto_remediation / auto_rule_creation: playbooks, remediation_events, AI-generated alert rules (v2)
設計鐵律:
- 只 UPDATE 最新 run 的 coverage_snapshot (不創新 row)
- evidence JSONB 記錄 「為什麼 green/red」的證據
- 失敗 → log + 跳過該 dim,不 crash 整個 evaluator
排程:
- 首次延遲 300s (asset_scanner+rule_catalog 完成後)
- 每 1h 跑一次
2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
ADR-090 § Phase 7 Coverage Evaluator
"""
from __future__ import annotations
import asyncio
import json as _json
import time as _time
import httpx
import structlog
from src.core.config import settings
logger = structlog.get_logger(__name__)
# ============================================================================
# 排程
# ============================================================================
# Seconds slept between two successful evaluation passes (1 hour).
_EVAL_INTERVAL_SEC = 3600
# Seconds slept once at loop start, before the first evaluation pass.
_FIRST_DELAY_SEC = 300
# Timeout (seconds) passed to the httpx client used for the Prometheus request.
_HTTP_TIMEOUT_SEC = 10
# Seconds slept after a pass raised, before the loop retries.
_LOOP_BACKOFF_SEC = 600
# ============================================================================
# Public entry
# ============================================================================
async def run_coverage_evaluator_loop() -> None:
    """Run the coverage evaluator forever as a background loop.

    Sleeps ``_FIRST_DELAY_SEC`` once, then repeatedly awaits
    ``evaluate_once()``. After a pass that completes, the loop sleeps
    ``_EVAL_INTERVAL_SEC``; after a pass that raises, the exception is logged
    and the loop sleeps ``_LOOP_BACKOFF_SEC`` instead. The coroutine never
    returns on its own.
    """
    logger.info("coverage_evaluator_loop_started", interval_sec=_EVAL_INTERVAL_SEC)
    await asyncio.sleep(_FIRST_DELAY_SEC)
    while True:
        # Normal cadence unless this pass fails.
        pause_sec = _EVAL_INTERVAL_SEC
        try:
            await evaluate_once()
        except Exception as exc:
            logger.exception("coverage_evaluator_loop_error", error=str(exc))
            pause_sec = _LOOP_BACKOFF_SEC
        await asyncio.sleep(pause_sec)
async def evaluate_once() -> dict[str, int]:
    """Upgrade the coverage snapshot rows of the latest successful discovery run.

    Runs the seven per-dimension evaluators in order (monitoring, alerting,
    km, then the v2 additions playbook, remediation, rule_matching,
    rule_creation) and records how many rows each one reported as updated.

    Returns:
        A dict mapping ``"<dimension>_updated"`` to the row count returned by
        that dimension's evaluator. All counts stay 0 when no successful run
        exists yet; in that case the function returns early and writes no
        automation-operation-log entry.

    Any exception raised while evaluating is logged and recorded in the
    automation operation log rather than propagated; dimensions that had not
    run yet keep a count of 0.
    """
    # Monotonic clock: the duration must not be skewed by wall-clock adjustments.
    started = _time.monotonic()
    stats = {
        "monitoring_updated": 0, "alerting_updated": 0, "km_updated": 0,
        "playbook_updated": 0, "remediation_updated": 0,
        "rule_matching_updated": 0, "rule_creation_updated": 0,
    }
    error_msg: str | None = None
    try:
        run_id = await _get_latest_run_id()
        if not run_id:
            logger.info("coverage_evaluator_no_run_yet")
            return stats
        # Original three dimensions.
        stats["monitoring_updated"] = await _evaluate_monitoring(run_id)
        stats["alerting_updated"] = await _evaluate_alerting(run_id)
        stats["km_updated"] = await _evaluate_km_coverage(run_id)
        # Four dimensions added in v2.
        stats["playbook_updated"] = await _evaluate_playbook_coverage(run_id)
        stats["remediation_updated"] = await _evaluate_remediation_coverage(run_id)
        stats["rule_matching_updated"] = await _evaluate_rule_matching_coverage(run_id)
        stats["rule_creation_updated"] = await _evaluate_rule_creation_coverage(run_id)
    except Exception as e:
        error_msg = f"{type(e).__name__}: {e}"[:1000]
        logger.exception("coverage_evaluate_once_failed", error=error_msg)
    duration_ms = int((_time.monotonic() - started) * 1000)
    await _log_aol(stats, duration_ms, error_msg)
    # Report every dimension, not only the original three, so the v2
    # evaluators are visible in the structured log as well.
    logger.info(
        "coverage_evaluate_once_done",
        monitoring=stats["monitoring_updated"],
        alerting=stats["alerting_updated"],
        km=stats["km_updated"],
        playbook=stats["playbook_updated"],
        remediation=stats["remediation_updated"],
        rule_matching=stats["rule_matching_updated"],
        rule_creation=stats["rule_creation_updated"],
        duration_ms=duration_ms,
    )
    return stats
# ============================================================================
# 查最新 run_id
# ============================================================================
async def _get_latest_run_id() -> str | None:
    """Return the run_id of the most recently ended successful discovery run.

    Returns:
        The run id as a string, or ``None`` when the query yields no value or
        the lookup fails (the failure is logged as a warning).
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        query = _sql("SELECT run_id FROM asset_discovery_run WHERE status='success' ORDER BY ended_at DESC LIMIT 1")
        async with get_db_context() as session:
            latest = (await session.execute(query)).scalar()
            if latest:
                return str(latest)
            return None
    except Exception as exc:
        logger.warning("get_latest_run_id_failed", error=str(exc))
        return None
# ============================================================================
# auto_monitoring: Prometheus targets
# ============================================================================
async def _evaluate_monitoring(run_id: str) -> int:
    """Grade the ``auto_monitoring`` dimension of host assets against Prometheus.

    Fetches the scrape-target hosts from Prometheus and updates the
    ``auto_monitoring`` snapshot rows of ``host`` assets for ``run_id``:
    green when the asset's ``metadata.internal_ip`` is among the targets,
    red otherwise. Evidence records the matched IP or the reason.

    Args:
        run_id: UUID (as text) of the discovery run whose rows are updated.

    Returns:
        The row count reported by the UPDATE, or 0 when the target list is
        empty or the update fails (the failure is logged as a warning).
    """
    scraped_hosts = await _fetch_prometheus_target_ips()
    if not scraped_hosts:
        return 0

    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        # Host assets: check whether metadata.internal_ip is among the targets.
        # Other asset types are excluded by the WHERE clause and stay as they are.
        stmt = _sql("""
                UPDATE asset_coverage_snapshot cs
                SET coverage_status = CASE
                WHEN (ai.metadata->>'internal_ip')::text = ANY(:ips) THEN 'green'
                WHEN ai.asset_type = 'host' THEN 'red'
                ELSE cs.coverage_status
                END,
                evidence = CASE
                WHEN (ai.metadata->>'internal_ip')::text = ANY(:ips)
                THEN jsonb_build_object(
                'source', 'prometheus_targets',
                'matched_ip', ai.metadata->>'internal_ip'
                )
                WHEN ai.asset_type = 'host'
                THEN jsonb_build_object(
                'source', 'prometheus_targets',
                'reason', 'host IP not in scrape targets'
                )
                ELSE cs.evidence
                END
                FROM asset_inventory ai
                WHERE cs.asset_id = ai.asset_id
                AND cs.run_id = CAST(:rid AS uuid)
                AND cs.dimension = 'auto_monitoring'
                AND ai.asset_type = 'host'
                """)
        bind = {"rid": run_id, "ips": scraped_hosts}
        async with get_db_context() as session:
            outcome = await session.execute(stmt, bind)
            updated = outcome.rowcount
            return updated if updated else 0
    except Exception as exc:
        logger.warning("evaluate_monitoring_failed", error=str(exc))
        return 0
def _instance_host(instance: str) -> str:
    """Return the host part of a Prometheus ``instance`` label value.

    Handles ``host:port``, a bare ``host``, a bracketed IPv6 literal
    (``[addr]:port`` or ``[addr]``) and an unbracketed IPv6 literal.
    """
    if instance.startswith("["):
        closing = instance.find("]")
        if closing != -1:
            # "[fd00::1]:9100" -> "fd00::1"
            return instance[1:closing]
    if instance.count(":") > 1:
        # More than one colon without brackets cannot be "host:port";
        # treat the whole value as a bare IPv6 address.
        return instance
    return instance.split(":", 1)[0]


async def _fetch_prometheus_target_ips() -> list[str]:
    """Fetch the hosts of all active Prometheus scrape targets.

    Calls ``GET <PROMETHEUS_URL>/api/v1/targets?state=active`` and collects
    the host part of every target's ``instance`` label.

    Returns:
        A sorted list of distinct hosts; an empty list when the request or
        the response parsing fails (the failure is logged as a warning).
    """
    url = f"{settings.PROMETHEUS_URL.rstrip('/')}/api/v1/targets"
    try:
        async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT_SEC, trust_env=False) as client:
            resp = await client.get(url, params={"state": "active"})
            resp.raise_for_status()
            data = resp.json()
        hosts: set[str] = set()
        for target in (data.get("data", {}) or {}).get("activeTargets", []) or []:
            instance = (target.get("labels") or {}).get("instance") or ""
            if not instance:
                continue
            # Previously ``instance.split(":")[0]``, which reduced IPv6
            # instances such as "[fd00::1]:9100" to "[".
            host = _instance_host(instance)
            if host:
                hosts.add(host)
        return sorted(hosts)
    except Exception as e:
        logger.warning("prometheus_targets_fetch_failed", error=str(e))
        return []
# ============================================================================
# auto_alerting: alert_rule_catalog labels match
# ============================================================================
async def _evaluate_alerting(run_id: str) -> int:
    """Grade the ``auto_alerting`` dimension from alert_rule_catalog labels.

    For every ``host`` / ``k8s_workload`` / ``container`` asset of ``run_id``
    the row becomes green when some alert rule matches it, red otherwise.
    A rule matches when:

    - its ``host`` label equals the asset's host, or
    - its ``namespace`` label equals the asset's namespace, or
    - its ``layer`` label contains the asset's host as a substring.

    The ``layer`` clause is applied only when the asset has a non-empty host.
    Without that guard a NULL or empty host produced the pattern ``'%%'``,
    which matches every rule carrying a ``layer`` label and marked such
    assets green regardless of real coverage.

    Args:
        run_id: UUID (as text) of the discovery run whose rows are updated.

    Returns:
        The row count reported by the UPDATE, or 0 when the update fails
        (the failure is logged as a warning).
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        async with get_db_context() as db:
            result = await db.execute(
                _sql("""
                UPDATE asset_coverage_snapshot cs
                SET coverage_status = CASE
                WHEN EXISTS (
                SELECT 1 FROM alert_rule_catalog arc
                WHERE (arc.labels ? 'host' AND arc.labels->>'host' = ai.host)
                OR (arc.labels ? 'namespace' AND arc.labels->>'namespace' = ai.namespace)
                OR (arc.labels ? 'layer'
                AND COALESCE(ai.host, '') <> ''
                AND arc.labels->>'layer' LIKE '%' || ai.host || '%')
                ) THEN 'green'
                ELSE 'red'
                END,
                evidence = jsonb_build_object(
                'source', 'alert_rule_catalog_label_match',
                'asset_host', ai.host,
                'asset_namespace', ai.namespace
                )
                FROM asset_inventory ai
                WHERE cs.asset_id = ai.asset_id
                AND cs.run_id = CAST(:rid AS uuid)
                AND cs.dimension = 'auto_alerting'
                AND ai.asset_type IN ('host', 'k8s_workload', 'container')
                """),
                {"rid": run_id},
            )
            return result.rowcount or 0
    except Exception as e:
        logger.warning("evaluate_alerting_failed", error=str(e))
        return 0
# ============================================================================
# auto_km_creation: knowledge_entries 覆蓋
# ============================================================================
async def _evaluate_km_coverage(run_id: str) -> int:
    """Grade the ``auto_km_creation`` dimension of k8s_workload assets.

    A ``k8s_workload`` asset becomes green when its name occurs
    (case-insensitively) in the ``content`` or ``title`` of any
    knowledge_entries row, yellow otherwise. The column is ``content``;
    an earlier version queried a non-existent ``body`` column.

    Args:
        run_id: UUID (as text) of the discovery run whose rows are updated.

    Returns:
        The row count reported by the UPDATE, or 0 when the update fails
        (the failure is logged as a warning).
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        stmt = _sql("""
                UPDATE asset_coverage_snapshot cs
                SET coverage_status = CASE
                WHEN ai.asset_type = 'k8s_workload' AND EXISTS (
                SELECT 1 FROM knowledge_entries ke
                WHERE ke.content ILIKE '%' || ai.name || '%'
                OR ke.title ILIKE '%' || ai.name || '%'
                ) THEN 'green'
                WHEN ai.asset_type = 'k8s_workload' THEN 'yellow'
                ELSE cs.coverage_status
                END,
                evidence = jsonb_build_object(
                'source', 'knowledge_entries_content_or_title_match',
                'asset_name', ai.name
                )
                FROM asset_inventory ai
                WHERE cs.asset_id = ai.asset_id
                AND cs.run_id = CAST(:rid AS uuid)
                AND cs.dimension = 'auto_km_creation'
                AND ai.asset_type = 'k8s_workload'
                """)
        async with get_db_context() as session:
            outcome = await session.execute(stmt, {"rid": run_id})
            updated = outcome.rowcount
            return updated if updated else 0
    except Exception as exc:
        logger.warning("evaluate_km_coverage_failed", error=str(exc))
        return 0
# ============================================================================
# v2 新增 4 維 evaluator
# ============================================================================
async def _evaluate_playbook_coverage(run_id: str) -> int:
    """Grade the ``auto_playbook`` dimension of k8s_workload assets.

    A ``k8s_workload`` asset becomes green when its name occurs
    (case-insensitively) in the ``description`` or in the text form of
    ``symptom_pattern`` of any playbook with status ``approved``; otherwise
    it becomes yellow. Other asset types are not touched.

    Args:
        run_id: UUID (as text) of the discovery run whose rows are updated.

    Returns:
        The row count reported by the UPDATE, or 0 when the update fails
        (the failure is logged as a warning).
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        stmt = _sql("""
                UPDATE asset_coverage_snapshot cs
                SET coverage_status = CASE
                WHEN ai.asset_type = 'k8s_workload' AND EXISTS (
                SELECT 1 FROM playbooks pb
                WHERE pb.status = 'approved'
                AND (pb.description ILIKE '%' || ai.name || '%'
                OR pb.symptom_pattern::text ILIKE '%' || ai.name || '%')
                ) THEN 'green'
                WHEN ai.asset_type = 'k8s_workload' THEN 'yellow'
                ELSE cs.coverage_status
                END,
                evidence = jsonb_build_object(
                'source', 'playbooks_symptom_pattern_or_description_match',
                'asset_name', ai.name
                )
                FROM asset_inventory ai
                WHERE cs.asset_id = ai.asset_id
                AND cs.run_id = CAST(:rid AS uuid)
                AND cs.dimension = 'auto_playbook'
                AND ai.asset_type = 'k8s_workload'
                """)
        async with get_db_context() as session:
            outcome = await session.execute(stmt, {"rid": run_id})
            updated = outcome.rowcount
            return updated if updated else 0
    except Exception as exc:
        logger.warning("evaluate_playbook_coverage_failed", error=str(exc))
        return 0
async def _evaluate_remediation_coverage(run_id: str) -> int:
    """Grade the ``auto_remediation`` dimension of workload/container assets.

    A ``k8s_workload`` or ``container`` asset becomes green when a
    remediation_events row created within the last 30 days has a
    ``target_resource`` containing the asset name (case-insensitive);
    otherwise it becomes red, i.e. remediation was expected but not seen.

    Args:
        run_id: UUID (as text) of the discovery run whose rows are updated.

    Returns:
        The row count reported by the UPDATE, or 0 when the update fails
        (the failure is logged as a warning).
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        stmt = _sql("""
                UPDATE asset_coverage_snapshot cs
                SET coverage_status = CASE
                WHEN ai.asset_type IN ('k8s_workload', 'container') AND EXISTS (
                SELECT 1 FROM remediation_events re
                WHERE re.target_resource ILIKE '%' || ai.name || '%'
                AND re.created_at > NOW() - INTERVAL '30 days'
                ) THEN 'green'
                WHEN ai.asset_type IN ('k8s_workload', 'container') THEN 'red'
                ELSE cs.coverage_status
                END,
                evidence = jsonb_build_object(
                'source', 'remediation_events_target_match_30d',
                'asset_name', ai.name
                )
                FROM asset_inventory ai
                WHERE cs.asset_id = ai.asset_id
                AND cs.run_id = CAST(:rid AS uuid)
                AND cs.dimension = 'auto_remediation'
                AND ai.asset_type IN ('k8s_workload', 'container')
                """)
        async with get_db_context() as session:
            outcome = await session.execute(stmt, {"rid": run_id})
            updated = outcome.rowcount
            return updated if updated else 0
    except Exception as exc:
        logger.warning("evaluate_remediation_coverage_failed", error=str(exc))
        return 0
async def _evaluate_rule_matching_coverage(run_id: str) -> int:
    """Grade the ``auto_rule_matching`` dimension from recent incidents.

    A ``host`` / ``k8s_workload`` / ``container`` asset becomes green when an
    incident created within the last 30 days relates to it, either because

    - the text form of ``affected_services`` contains the asset name
      (case-insensitive), or
    - the incident's ``alertname`` equals the ``rule_name`` of an alert rule
      whose ``host`` label equals the asset's host or whose ``namespace``
      label equals the asset's namespace.

    Without such an incident the asset becomes yellow: a neutral grade, since
    no trigger may mean either no problem or no coverage.

    Args:
        run_id: UUID (as text) of the discovery run whose rows are updated.

    Returns:
        The row count reported by the UPDATE, or 0 when the update fails
        (the failure is logged as a warning).
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        stmt = _sql("""
                UPDATE asset_coverage_snapshot cs
                SET coverage_status = CASE
                WHEN EXISTS (
                SELECT 1 FROM incidents i
                WHERE i.created_at > NOW() - INTERVAL '30 days'
                AND (i.affected_services::text ILIKE '%' || ai.name || '%'
                OR (i.alertname IS NOT NULL AND EXISTS (
                SELECT 1 FROM alert_rule_catalog arc
                WHERE arc.rule_name = i.alertname
                AND (arc.labels->>'host' = ai.host
                OR arc.labels->>'namespace' = ai.namespace)
                )))
                ) THEN 'green'
                WHEN ai.asset_type IN ('host','k8s_workload','container') THEN 'yellow'
                ELSE cs.coverage_status
                END,
                evidence = jsonb_build_object(
                'source', 'incidents_match_30d',
                'asset_name', ai.name
                )
                FROM asset_inventory ai
                WHERE cs.asset_id = ai.asset_id
                AND cs.run_id = CAST(:rid AS uuid)
                AND cs.dimension = 'auto_rule_matching'
                AND ai.asset_type IN ('host', 'k8s_workload', 'container')
                """)
        async with get_db_context() as session:
            outcome = await session.execute(stmt, {"rid": run_id})
            updated = outcome.rowcount
            return updated if updated else 0
    except Exception as exc:
        logger.warning("evaluate_rule_matching_coverage_failed", error=str(exc))
        return 0
async def _evaluate_rule_creation_coverage(run_id: str) -> int:
    """Grade the ``auto_rule_creation`` dimension from AI-generated alert rules.

    A ``host`` / ``k8s_workload`` / ``container`` asset becomes green when an
    alert_rule_catalog row with ``source = 'ai_generated'`` has a ``host``
    label equal to the asset's host or a ``namespace`` label equal to the
    asset's namespace; otherwise it becomes red. Per the original author's
    note, all rules were ``yaml_hardcoded`` at the time of writing, so every
    asset is expected to be red until AI-generated rules exist.

    Args:
        run_id: UUID (as text) of the discovery run whose rows are updated.

    Returns:
        The row count reported by the UPDATE, or 0 when the update fails
        (the failure is logged as a warning).
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        stmt = _sql("""
                UPDATE asset_coverage_snapshot cs
                SET coverage_status = CASE
                WHEN EXISTS (
                SELECT 1 FROM alert_rule_catalog arc
                WHERE arc.source = 'ai_generated'
                AND (arc.labels->>'host' = ai.host
                OR arc.labels->>'namespace' = ai.namespace)
                ) THEN 'green'
                WHEN ai.asset_type IN ('host','k8s_workload','container') THEN 'red'
                ELSE cs.coverage_status
                END,
                evidence = jsonb_build_object(
                'source', 'alert_rule_catalog_ai_generated_match',
                'asset_name', ai.name,
                'note', 'AI 自主建規則尚未啟用,後續 Hermes 產出後此欄變 green'
                )
                FROM asset_inventory ai
                WHERE cs.asset_id = ai.asset_id
                AND cs.run_id = CAST(:rid AS uuid)
                AND cs.dimension = 'auto_rule_creation'
                AND ai.asset_type IN ('host', 'k8s_workload', 'container')
                """)
        async with get_db_context() as session:
            outcome = await session.execute(stmt, {"rid": run_id})
            updated = outcome.rowcount
            return updated if updated else 0
    except Exception as exc:
        logger.warning("evaluate_rule_creation_coverage_failed", error=str(exc))
        return 0
# ============================================================================
# AOL
# ============================================================================
async def _log_aol(stats: dict[str, int], duration_ms: int, error: str | None) -> None:
    """Record one evaluation pass in ``automation_operation_log``.

    Inserts a ``coverage_recalculated`` row for actor ``coverage_evaluator``
    with ``stats`` serialized as the JSON output. The status is ``failed``
    when ``error`` is non-empty and ``success`` otherwise.

    Args:
        stats: Per-dimension update counts to store as the output payload.
        duration_ms: Duration of the pass in milliseconds.
        error: Error description; stored truncated to 2000 characters, or as
            NULL when empty or ``None``.

    Best-effort: any failure (including a failed import) is logged as a
    warning and never raised to the caller.
    """
    try:
        from sqlalchemy import text as _sql
        from src.db.base import get_db_context

        if error:
            aol_status = "failed"
            stored_error = error[:2000]
        else:
            aol_status = "success"
            stored_error = None

        insert_stmt = _sql("""
                INSERT INTO automation_operation_log (
                operation_type, actor, status,
                input, output, duration_ms, error, tags
                ) VALUES (
                'coverage_recalculated',
                'coverage_evaluator',
                :st,
                '{}'::jsonb,
                CAST(:output AS jsonb),
                :dur, :err, :tags
                )
                """)
        async with get_db_context() as session:
            bind = {
                "st": aol_status,
                "output": _json.dumps(stats, ensure_ascii=False),
                "dur": duration_ms,
                "err": stored_error,
                "tags": ["coverage_evaluator"],
            }
            await session.execute(insert_stmt, bind)
    except Exception as exc:
        logger.warning("coverage_evaluator_aol_failed", error=str(exc))