Files
awoooi/apps/api/src/jobs/coverage_evaluator_job.py
Your Name ee2cc2bfc3
Some checks failed
CD Pipeline / tests (push) Failing after 1m23s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped
Code Review / ai-code-review (push) Successful in 15s
fix(alerts): 收斂 Telegram 告警到 SRE 戰情室
2026-06-12 11:06:16 +08:00

932 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Coverage Evaluator Job — ADR-090 § 覆蓋率評估
==============================================
把 asset_coverage_snapshot 從 'unknown' 升級為真實 green/yellow/red.
職責邊界 (MVP):
✅ auto_monitoring: 查 Prometheus /api/v1/targets 看 asset 是否有 scrape target
✅ auto_alerting: asset 的 host/namespace 是否 match alert_rule_catalog.labels
✅ auto_km_creation: asset_type 是否有對應 knowledge_entries (粗略)
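✅ auto_rule_matching: incidents in the last 30d that relate to the asset (added in v2)
✅ auto_playbook / auto_remediation / auto_rule_creation: evaluated against playbooks, remediation_events and alert_rule_catalog (added in v2)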
設計鐵律:
- 只 UPDATE 最新 run 的 coverage_snapshot (不創新 row)
- evidence JSONB 記錄 「為什麼 green/red」的證據
- 失敗 → log + 跳過該 dim,不 crash 整個 evaluator
排程:
- 首次延遲 300s (asset_scanner+rule_catalog 完成後)
- 每 1h 跑一次
2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
ADR-090 § Phase 7 Coverage Evaluator
"""
from __future__ import annotations
import asyncio
import json as _json
import time as _time
from typing import Any
import httpx
import structlog
from src.core.config import settings
logger = structlog.get_logger(__name__)
# ============================================================================
# 排程
# ============================================================================
# Seconds slept between two evaluation passes that completed without raising.
_EVAL_INTERVAL_SEC = 3600
# Seconds slept once at loop start, before the first evaluation pass.
_FIRST_DELAY_SEC = 300
# Timeout (seconds) handed to the httpx client used for the Prometheus query.
_HTTP_TIMEOUT_SEC = 10
# Seconds slept before retrying after an evaluation pass raised.
_LOOP_BACKOFF_SEC = 600
# ============================================================================
# Public entry
# ============================================================================
async def run_coverage_evaluator_loop() -> None:
    """Run the coverage evaluator forever on a fixed schedule.

    Sleeps ``_FIRST_DELAY_SEC`` once, then repeatedly calls ``evaluate_once``.
    After a clean pass the loop waits ``_EVAL_INTERVAL_SEC``; after a pass that
    raised, the error is logged and the loop waits ``_LOOP_BACKOFF_SEC`` instead.
    """
    logger.info("coverage_evaluator_loop_started", interval_sec=_EVAL_INTERVAL_SEC)
    await asyncio.sleep(_FIRST_DELAY_SEC)
    while True:
        # Assume the happy path; shorten the wait only when the pass failed.
        pause_sec = _EVAL_INTERVAL_SEC
        try:
            await evaluate_once()
        except Exception as exc:
            logger.exception("coverage_evaluator_loop_error", error=str(exc))
            pause_sec = _LOOP_BACKOFF_SEC
        await asyncio.sleep(pause_sec)
async def evaluate_once() -> dict[str, Any]:
    """Run one evaluation pass over the latest successful asset discovery run.

    Steps, in order:
      1. Acquire the hourly lock ``"coverage_evaluator"``; when it is not
         obtained the pass is skipped so that only one holder does the work.
      2. Run the seven dimension evaluators (monitoring, alerting, km,
         playbook, remediation, rule_matching, rule_creation) for the run.
      3. Fetch the red-status summary; when more than 30% of the snapshot rows
         are red AND at least 50 rows were scanned, ask the LLM for a gap
         analysis and push it to Telegram.
      4. When ``settings.COVERAGE_AUTO_RULE_ENABLED`` is truthy (default True),
         auto-create pending-review alert rules for uncovered assets.
      5. Write an automation_operation_log entry and log a summary.

    Returns:
        ``{"skipped": "not_leader"}`` when the lock was not acquired; otherwise
        the stats dict with one ``*_updated`` row count per dimension,
        ``rules_auto_created`` and, only when the LLM analysis succeeded,
        ``llm_analyzed=True``. When no successful run exists yet, the zeroed
        stats are returned immediately (no AOL entry is written).
    """
    from src.services.ai_advisory_helpers import try_acquire_hourly_lock

    if not await try_acquire_hourly_lock("coverage_evaluator"):
        logger.info("coverage_evaluate_skipped_not_leader")
        return {"skipped": "not_leader"}

    # Monotonic clock: the elapsed time must not jump when the wall clock is adjusted.
    started_at = _time.monotonic()
    stats: dict[str, Any] = {
        "monitoring_updated": 0, "alerting_updated": 0, "km_updated": 0,
        "playbook_updated": 0, "remediation_updated": 0,
        "rule_matching_updated": 0, "rule_creation_updated": 0,
        "rules_auto_created": 0,
    }
    error_msg: str | None = None
    # Bound before the try so the auto-rule step below can never hit an
    # unbound local if the lookup itself raises.
    run_id: str | None = None
    try:
        run_id = await _get_latest_run_id()
        if not run_id:
            logger.info("coverage_evaluator_no_run_yet")
            return stats
        # Original three dimensions.
        stats["monitoring_updated"] = await _evaluate_monitoring(run_id)
        stats["alerting_updated"] = await _evaluate_alerting(run_id)
        stats["km_updated"] = await _evaluate_km_coverage(run_id)
        # Four dimensions added in v2.
        stats["playbook_updated"] = await _evaluate_playbook_coverage(run_id)
        stats["remediation_updated"] = await _evaluate_remediation_coverage(run_id)
        stats["rule_matching_updated"] = await _evaluate_rule_matching_coverage(run_id)
        stats["rule_creation_updated"] = await _evaluate_rule_creation_coverage(run_id)
    except Exception as e:
        error_msg = f"{type(e).__name__}: {e}"[:1000]
        logger.exception("coverage_evaluate_once_failed", error=error_msg)
    duration_ms = int((_time.monotonic() - started_at) * 1000)

    # LLM gap analysis is gated on two conditions so that a first (bootstrap)
    # scan does not burn tokens: red ratio > 30% and at least 50 scanned rows.
    red_summary = await _fetch_red_summary()
    llm_analysis: dict[str, Any] | None = None
    if red_summary:
        total_red = red_summary.get("total_red", 0)
        total_scanned = red_summary.get("total_scanned", 0)
        red_ratio = (total_red / total_scanned) if total_scanned > 0 else 0.0
        if red_ratio > 0.3 and total_scanned >= 50:
            llm_analysis = await _llm_analyze_coverage_gaps(red_summary)
            if llm_analysis:
                stats["llm_analyzed"] = True
                await _send_telegram_gaps(red_summary, llm_analysis)

    # Coverage gap -> automatic rule generation for assets whose auto_alerting
    # is red; controlled by the COVERAGE_AUTO_RULE_ENABLED flag (default on).
    if getattr(settings, "COVERAGE_AUTO_RULE_ENABLED", True):
        created = await _auto_create_rules_for_uncovered_assets(run_id)
        stats["rules_auto_created"] = created

    await _log_aol(stats, duration_ms, error_msg)
    logger.info(
        "coverage_evaluate_once_done",
        monitoring=stats["monitoring_updated"],
        alerting=stats["alerting_updated"],
        km=stats["km_updated"],
        playbook=stats["playbook_updated"],
        remediation=stats["remediation_updated"],
        rule_matching=stats["rule_matching_updated"],
        rule_creation=stats["rule_creation_updated"],
        rules_auto_created=stats.get("rules_auto_created", 0),
        llm_analyzed=bool(llm_analysis),
        duration_ms=duration_ms,
    )
    return stats
# ============================================================================
# Gap 3.3 LLM 升級 — 覆蓋率缺口分析 + 補覆蓋建議
# ============================================================================
async def _fetch_red_summary() -> dict[str, Any] | None:
    """Summarise the red coverage rows of the latest successful discovery run.

    Returns:
        ``None`` when there are no red rows or when any query fails (the
        failure is logged as a warning). Otherwise a dict with:
        ``total_red`` (sum of red rows), ``total_scanned`` (all snapshot rows
        of the run, used by the caller to compute the red ratio),
        ``by_dimension`` (red count per dimension, descending) and
        ``top_red_assets`` (up to 10 assets with the most red dimensions).
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        async with get_db_context() as session:
            # Overview: red count per dimension.
            per_dimension = await session.execute(_sql("""
                SELECT dimension, count(*) AS cnt
                FROM asset_coverage_snapshot
                WHERE run_id = (
                SELECT run_id FROM asset_discovery_run
                WHERE status='success' ORDER BY ended_at DESC LIMIT 1
                )
                AND coverage_status = 'red'
                GROUP BY dimension
                ORDER BY cnt DESC
            """))
            by_dim: list[dict[str, Any]] = []
            total_red = 0
            for record in per_dimension.fetchall():
                red_count = int(record.cnt)
                by_dim.append({"dimension": record.dimension, "red_count": red_count})
                total_red += red_count
            if not total_red:
                return None

            # Total snapshot rows of the same run (denominator of the red ratio).
            scanned_result = await session.execute(_sql("""
                SELECT count(*) AS total
                FROM asset_coverage_snapshot
                WHERE run_id = (
                SELECT run_id FROM asset_discovery_run
                WHERE status='success' ORDER BY ended_at DESC LIMIT 1
                )
            """))
            total_scanned = int(scanned_result.scalar() or 0)

            # Assets carrying the most red dimensions.
            worst_assets = await session.execute(_sql("""
                SELECT ai.asset_key, ai.asset_type, count(*) AS red_dims
                FROM asset_coverage_snapshot cs
                JOIN asset_inventory ai ON cs.asset_id = ai.asset_id
                WHERE cs.run_id = (
                SELECT run_id FROM asset_discovery_run
                WHERE status='success' ORDER BY ended_at DESC LIMIT 1
                )
                AND cs.coverage_status = 'red'
                GROUP BY ai.asset_key, ai.asset_type
                ORDER BY red_dims DESC
                LIMIT 10
            """))
            top_assets = []
            for record in worst_assets.fetchall():
                top_assets.append({
                    "asset_key": record.asset_key,
                    "asset_type": record.asset_type,
                    "red_dims": int(record.red_dims),
                })

            summary: dict[str, Any] = {
                "total_red": total_red,
                "total_scanned": total_scanned,
                "by_dimension": by_dim,
                "top_red_assets": top_assets,
            }
            return summary
    except Exception as exc:
        logger.warning("fetch_red_summary_failed", error=str(exc))
        return None
# Prompt template for the coverage-gap LLM analysis. It is filled with
# str.format() in _llm_analyze_coverage_gaps using the placeholders
# by_dim_json, total_red and top_assets_json; the braces of the JSON output
# spec are doubled so that format() emits them literally.
_LLM_COVERAGE_PROMPT = """你是 AWOOOI 可觀察性覆蓋率專家。以下是最新 asset 覆蓋率掃描的 red 缺口,請分析並提出補覆蓋優先順序.
## red 缺口分布
各維度 red 數: {by_dim_json}
總 red count: {total_red}
## 最多 red 的 asset (top 10)
{top_assets_json}
## 7 維自動化意義
- auto_monitoring: 有無 Prometheus scrape
- auto_alerting: 有無 alert rule 覆蓋
- auto_rule_creation: 有無 AI 產生的規則
- auto_rule_matching: 過去 30d 是否被 alert 匹配
- auto_playbook: 有無 playbook
- auto_remediation: 過去 30d 有無 remediation
- auto_km_creation: 有無 knowledge_entries
## 輸出規格 (純 JSON)
{{
"worst_dimension": "哪個維度最該優先補",
"root_cause": "red 集中的真因 (繁中)",
"top_remediation_actions": [
{{"priority": 1, "target": "asset_key 或類型", "action": "具體動作", "effort": "low|medium|high"}}
],
"estimated_weeks_to_close": 1-52,
"confidence": 0.0-1.0
}}
"""
async def _llm_analyze_coverage_gaps(red_summary: dict[str, Any]) -> dict[str, Any] | None:
    """Ask the LLM to analyse the red coverage gaps.

    Args:
        red_summary: Summary dict; ``by_dimension``, ``total_red`` and
            ``top_red_assets`` are rendered into ``_LLM_COVERAGE_PROMPT``.

    Returns:
        The parsed JSON reply (with ``_llm_provider`` added when the parse
        result is truthy), or ``None`` when the call reports failure, returns
        no text, or any exception occurs (logged as a warning).
    """
    try:
        import json as _j
        from src.services.llm_json_parser import parse_llm_json_response
        from src.services.openclaw import get_openclaw

        dimensions_json = _j.dumps(red_summary.get("by_dimension", []), ensure_ascii=False)
        red_total = red_summary.get("total_red", 0)
        assets_json = _j.dumps(
            red_summary.get("top_red_assets", []), ensure_ascii=False, indent=2,
        )
        prompt = _LLM_COVERAGE_PROMPT.format(
            by_dim_json=dimensions_json,
            total_red=red_total,
            top_assets_json=assets_json,
        )

        reply, provider, succeeded = await get_openclaw().call(prompt)
        if not (succeeded and reply):
            return None

        analysis = parse_llm_json_response(
            reply, required_key="worst_dimension", logger_context="coverage",
        )
        if analysis:
            # Record which provider produced the answer.
            analysis["_llm_provider"] = provider
        return analysis
    except Exception as exc:
        logger.warning("coverage_llm_error", error=str(exc))
        return None
async def _send_telegram_gaps(
    red_summary: dict[str, Any],
    analysis: dict[str, Any],
) -> None:
    """Push the coverage-gap summary with interactive buttons to Telegram.

    Nothing is sent when ``settings.SRE_GROUP_CHAT_ID`` is empty or when the
    advisory for the worst dimension is snoozed. Every failure is logged as a
    warning and swallowed; this function never raises.

    Args:
        red_summary: Summary dict; only ``total_red`` is read here.
        analysis: LLM output. All of its fields are treated as untrusted: text
            is HTML-escaped, ``confidence`` is coerced to a float, and
            malformed ``top_remediation_actions`` entries are skipped, so a
            sloppy LLM reply cannot break the HTML message or abort the send.
    """
    try:
        import html
        from src.core.config import settings
        from src.services.ai_advisory_helpers import build_ai_advisory_keyboard, is_snoozed
        from src.services.telegram_gateway import get_telegram_gateway

        target_chat_id = settings.SRE_GROUP_CHAT_ID
        if not target_chat_id:
            return

        # Snooze check is keyed by the worst dimension.
        worst_dim = str(analysis.get("worst_dimension", "unknown"))
        if await is_snoozed("coverage_gap", worst_dim):
            logger.info("coverage_gap_snoozed", dim=worst_dim)
            return

        worst = html.escape(str(analysis.get("worst_dimension", "")))
        cause = html.escape(str(analysis.get("root_cause", ""))[:200])
        weeks = html.escape(str(analysis.get("estimated_weeks_to_close", "?")))
        # The LLM may return the confidence as a string or null; the ".0%"
        # format spec below only accepts numbers.
        try:
            conf = float(analysis.get("confidence") or 0.0)
        except (TypeError, ValueError):
            conf = 0.0

        lines = [
            "📉 <b>Coverage 缺口分析 (AI 升級)</b>",
            f"總 red: <b>{red_summary.get('total_red', 0)}</b> | 最嚴重維度: <code>{worst}</code>",
            f"預計補齊週數: {weeks}w | AI 信心: {conf:.0%}",
            "",
            f"🔍 真因: {cause}",
            "",
            "<b>Top Remediation Priorities</b>:",
        ]

        actions = analysis.get("top_remediation_actions")
        if not isinstance(actions, list):
            actions = []
        for act in actions[:3]:
            if not isinstance(act, dict):
                # Tolerate entries that are not objects (e.g. plain strings).
                continue
            pri = html.escape(str(act.get("priority", "?")))
            target = html.escape(str(act.get("target", ""))[:60])
            action = html.escape(str(act.get("action", ""))[:100])
            effort = html.escape(str(act.get("effort", "?")))
            lines.append(f" {pri}. <code>{target}</code> [{effort}]")
            lines.append(f"{action}")
        lines.append("")
        lines.append("決策: 人工評估補覆蓋排程")
        msg = "\n".join(lines)

        keyboard = build_ai_advisory_keyboard(
            advisory_type="coverage_gap",
            advisory_id=worst_dim,
            include_view=False,
            include_produce_cmd=False,
        )
        tg = get_telegram_gateway()
        await tg._send_request("sendMessage", {  # type: ignore[attr-defined]
            "chat_id": target_chat_id,
            "text": msg,
            "parse_mode": "HTML",
            "disable_web_page_preview": True,
            "reply_markup": keyboard,
        })
    except Exception as e:
        logger.warning("coverage_telegram_failed", error=str(e))
# ============================================================================
# 查最新 run_id
# ============================================================================
async def _get_latest_run_id() -> str | None:
    """Return the run_id of the most recently ended successful discovery run.

    Returns:
        The run_id as a string, or ``None`` when no such run exists or the
        query fails (the failure is logged as a warning).
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        async with get_db_context() as session:
            query = _sql("SELECT run_id FROM asset_discovery_run WHERE status='success' ORDER BY ended_at DESC LIMIT 1")
            latest = (await session.execute(query)).scalar()
            if not latest:
                return None
            return str(latest)
    except Exception as exc:
        logger.warning("get_latest_run_id_failed", error=str(exc))
        return None
# ============================================================================
# auto_monitoring: Prometheus targets
# ============================================================================
async def _evaluate_monitoring(run_id: str) -> int:
    """Evaluate the ``auto_monitoring`` dimension for host assets of a run.

    The instance addresses of the Prometheus scrape targets are fetched first;
    a host whose ``metadata.internal_ip`` is among them becomes green, any
    other host becomes red. Only rows of ``asset_type = 'host'`` are updated.

    Args:
        run_id: UUID (as text) of the discovery run to update.

    Returns:
        Number of updated rows; 0 when no target address could be fetched
        (the database is not touched then) or when the update fails.
    """
    scraped_ips = await _fetch_prometheus_target_ips()
    if not scraped_ips:
        return 0

    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        async with get_db_context() as session:
            # The WHERE clause restricts the update to host assets, so other
            # asset types keep whatever status they already have.
            statement = _sql("""
                UPDATE asset_coverage_snapshot cs
                SET coverage_status = CASE
                WHEN (ai.metadata->>'internal_ip')::text = ANY(:ips) THEN 'green'
                WHEN ai.asset_type = 'host' THEN 'red'
                ELSE cs.coverage_status
                END,
                evidence = CASE
                WHEN (ai.metadata->>'internal_ip')::text = ANY(:ips)
                THEN jsonb_build_object(
                'source', 'prometheus_targets',
                'matched_ip', ai.metadata->>'internal_ip'
                )
                WHEN ai.asset_type = 'host'
                THEN jsonb_build_object(
                'source', 'prometheus_targets',
                'reason', 'host IP not in scrape targets'
                )
                ELSE cs.evidence
                END
                FROM asset_inventory ai
                WHERE cs.asset_id = ai.asset_id
                AND cs.run_id = CAST(:rid AS uuid)
                AND cs.dimension = 'auto_monitoring'
                AND ai.asset_type = 'host'
            """)
            bind_params = {"rid": run_id, "ips": scraped_ips}
            outcome = await session.execute(statement, bind_params)
            return outcome.rowcount or 0
    except Exception as exc:
        logger.warning("evaluate_monitoring_failed", error=str(exc))
        return 0
async def _fetch_prometheus_target_ips() -> list[str]:
    """Return the sorted, de-duplicated host parts of active Prometheus targets.

    Queries ``GET {PROMETHEUS_URL}/api/v1/targets?state=active`` and takes the
    ``instance`` label of every active target with its port removed. Bracketed
    IPv6 instances (``[addr]:port``) yield ``addr``; everything else yields
    the text before the first ``:``.

    Returns:
        The host parts, or ``[]`` when the request, the response parsing or
        the URL construction fails (logged as a warning).
    """
    try:
        # Built inside the try so a missing/invalid PROMETHEUS_URL setting is
        # reported like any other fetch failure instead of escaping the caller.
        url = f"{settings.PROMETHEUS_URL.rstrip('/')}/api/v1/targets"
        async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT_SEC, trust_env=False) as client:
            resp = await client.get(url, params={"state": "active"})
            resp.raise_for_status()
            data = resp.json()

        ips: set[str] = set()
        for t in (data.get("data", {}) or {}).get("activeTargets", []) or []:
            instance = ((t.get("labels") or {}).get("instance") or "")
            if instance.startswith("["):
                # "[fe80::1]:9100" -> "fe80::1"; splitting on the first ":"
                # would return just "[".
                ip = instance[1:].partition("]")[0]
            else:
                ip = instance.split(":")[0]
            if ip:
                ips.add(ip)
        return sorted(ips)
    except Exception as e:
        logger.warning("prometheus_targets_fetch_failed", error=str(e))
        return []
# ============================================================================
# auto_alerting: alert_rule_catalog labels match
# ============================================================================
async def _evaluate_alerting(run_id: str) -> int:
    """Evaluate the ``auto_alerting`` dimension for a run.

    For every host / k8s_workload / container asset the row becomes green when
    at least one alert_rule_catalog entry matches it, red otherwise. A rule
    matches when:
      - its ``host`` label equals the asset host, or
      - its ``namespace`` label equals the asset namespace, or
      - its ``layer`` label contains the asset host (only when the asset has a
        non-empty host).

    Args:
        run_id: UUID (as text) of the discovery run to update.

    Returns:
        Number of updated rows, or 0 when the update fails (logged as warning).
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        async with get_db_context() as db:
            # The layer match is guarded by a non-empty host: without the
            # guard a NULL/empty host produces the pattern '%%', which matches
            # every rule carrying a layer label and turns every host-less
            # asset green.
            result = await db.execute(
                _sql("""
                    UPDATE asset_coverage_snapshot cs
                    SET coverage_status = CASE
                    WHEN EXISTS (
                    SELECT 1 FROM alert_rule_catalog arc
                    WHERE (arc.labels ? 'host' AND arc.labels->>'host' = ai.host)
                    OR (arc.labels ? 'namespace' AND arc.labels->>'namespace' = ai.namespace)
                    OR (arc.labels ? 'layer'
                    AND COALESCE(ai.host, '') <> ''
                    AND arc.labels->>'layer' LIKE '%' || ai.host || '%')
                    ) THEN 'green'
                    ELSE 'red'
                    END,
                    evidence = jsonb_build_object(
                    'source', 'alert_rule_catalog_label_match',
                    'asset_host', ai.host,
                    'asset_namespace', ai.namespace
                    )
                    FROM asset_inventory ai
                    WHERE cs.asset_id = ai.asset_id
                    AND cs.run_id = CAST(:rid AS uuid)
                    AND cs.dimension = 'auto_alerting'
                    AND ai.asset_type IN ('host', 'k8s_workload', 'container')
                """),
                {"rid": run_id},
            )
            return result.rowcount or 0
    except Exception as e:
        logger.warning("evaluate_alerting_failed", error=str(e))
        return 0
# ============================================================================
# auto_km_creation: knowledge_entries 覆蓋
# ============================================================================
async def _evaluate_km_coverage(run_id: str) -> int:
    """Evaluate the ``auto_km_creation`` dimension for k8s_workload assets.

    A workload becomes green when its name occurs (case-insensitively) in the
    ``content`` or ``title`` of any knowledge_entries row, yellow otherwise.

    Args:
        run_id: UUID (as text) of the discovery run to update.

    Returns:
        Number of updated rows, or 0 when the update fails (logged as warning).
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        async with get_db_context() as session:
            statement = _sql("""
                UPDATE asset_coverage_snapshot cs
                SET coverage_status = CASE
                WHEN ai.asset_type = 'k8s_workload' AND EXISTS (
                SELECT 1 FROM knowledge_entries ke
                WHERE ke.content ILIKE '%' || ai.name || '%'
                OR ke.title ILIKE '%' || ai.name || '%'
                ) THEN 'green'
                WHEN ai.asset_type = 'k8s_workload' THEN 'yellow'
                ELSE cs.coverage_status
                END,
                evidence = jsonb_build_object(
                'source', 'knowledge_entries_content_or_title_match',
                'asset_name', ai.name
                )
                FROM asset_inventory ai
                WHERE cs.asset_id = ai.asset_id
                AND cs.run_id = CAST(:rid AS uuid)
                AND cs.dimension = 'auto_km_creation'
                AND ai.asset_type = 'k8s_workload'
            """)
            outcome = await session.execute(statement, {"rid": run_id})
            return outcome.rowcount or 0
    except Exception as exc:
        logger.warning("evaluate_km_coverage_failed", error=str(exc))
        return 0
# ============================================================================
# v2 新增 4 維 evaluator
# ============================================================================
async def _evaluate_playbook_coverage(run_id: str) -> int:
    """Evaluate the ``auto_playbook`` dimension for k8s_workload assets.

    A workload becomes green when its name occurs (case-insensitively) in the
    ``description`` or the text form of ``symptom_pattern`` of an approved
    playbook, yellow otherwise.

    Args:
        run_id: UUID (as text) of the discovery run to update.

    Returns:
        Number of updated rows, or 0 when the update fails (logged as warning).
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        async with get_db_context() as session:
            statement = _sql("""
                UPDATE asset_coverage_snapshot cs
                SET coverage_status = CASE
                WHEN ai.asset_type = 'k8s_workload' AND EXISTS (
                SELECT 1 FROM playbooks pb
                WHERE pb.status = 'approved'
                AND (pb.description ILIKE '%' || ai.name || '%'
                OR pb.symptom_pattern::text ILIKE '%' || ai.name || '%')
                ) THEN 'green'
                WHEN ai.asset_type = 'k8s_workload' THEN 'yellow'
                ELSE cs.coverage_status
                END,
                evidence = jsonb_build_object(
                'source', 'playbooks_symptom_pattern_or_description_match',
                'asset_name', ai.name
                )
                FROM asset_inventory ai
                WHERE cs.asset_id = ai.asset_id
                AND cs.run_id = CAST(:rid AS uuid)
                AND cs.dimension = 'auto_playbook'
                AND ai.asset_type = 'k8s_workload'
            """)
            outcome = await session.execute(statement, {"rid": run_id})
            return outcome.rowcount or 0
    except Exception as exc:
        logger.warning("evaluate_playbook_coverage_failed", error=str(exc))
        return 0
async def _evaluate_remediation_coverage(run_id: str) -> int:
    """Evaluate the ``auto_remediation`` dimension for workloads and containers.

    A k8s_workload / container asset becomes green when a remediation_events
    row created within the last 30 days has a ``target_resource`` containing
    the asset name (case-insensitive); otherwise it becomes red.

    Args:
        run_id: UUID (as text) of the discovery run to update.

    Returns:
        Number of updated rows, or 0 when the update fails (logged as warning).
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        async with get_db_context() as session:
            statement = _sql("""
                UPDATE asset_coverage_snapshot cs
                SET coverage_status = CASE
                WHEN ai.asset_type IN ('k8s_workload', 'container') AND EXISTS (
                SELECT 1 FROM remediation_events re
                WHERE re.target_resource ILIKE '%' || ai.name || '%'
                AND re.created_at > NOW() - INTERVAL '30 days'
                ) THEN 'green'
                WHEN ai.asset_type IN ('k8s_workload', 'container') THEN 'red'
                ELSE cs.coverage_status
                END,
                evidence = jsonb_build_object(
                'source', 'remediation_events_target_match_30d',
                'asset_name', ai.name
                )
                FROM asset_inventory ai
                WHERE cs.asset_id = ai.asset_id
                AND cs.run_id = CAST(:rid AS uuid)
                AND cs.dimension = 'auto_remediation'
                AND ai.asset_type IN ('k8s_workload', 'container')
            """)
            outcome = await session.execute(statement, {"rid": run_id})
            return outcome.rowcount or 0
    except Exception as exc:
        logger.warning("evaluate_remediation_coverage_failed", error=str(exc))
        return 0
async def _evaluate_rule_matching_coverage(run_id: str) -> int:
    """Evaluate the ``auto_rule_matching`` dimension for a run.

    A host / k8s_workload / container asset becomes green when an incident
    created within the last 30 days relates to it, either because
    ``affected_services`` (as text) contains the asset name, or because the
    incident's ``alertname`` equals the ``rule_name`` of a catalog rule whose
    ``host`` or ``namespace`` label equals the asset's. Otherwise the asset
    becomes yellow (no incident is treated as neutral, not as a gap).

    Args:
        run_id: UUID (as text) of the discovery run to update.

    Returns:
        Number of updated rows, or 0 when the update fails (logged as warning).
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        async with get_db_context() as session:
            statement = _sql("""
                UPDATE asset_coverage_snapshot cs
                SET coverage_status = CASE
                WHEN EXISTS (
                SELECT 1 FROM incidents i
                WHERE i.created_at > NOW() - INTERVAL '30 days'
                AND (i.affected_services::text ILIKE '%' || ai.name || '%'
                OR (i.alertname IS NOT NULL AND EXISTS (
                SELECT 1 FROM alert_rule_catalog arc
                WHERE arc.rule_name = i.alertname
                AND (arc.labels->>'host' = ai.host
                OR arc.labels->>'namespace' = ai.namespace)
                )))
                ) THEN 'green'
                WHEN ai.asset_type IN ('host','k8s_workload','container') THEN 'yellow'
                ELSE cs.coverage_status
                END,
                evidence = jsonb_build_object(
                'source', 'incidents_match_30d',
                'asset_name', ai.name
                )
                FROM asset_inventory ai
                WHERE cs.asset_id = ai.asset_id
                AND cs.run_id = CAST(:rid AS uuid)
                AND cs.dimension = 'auto_rule_matching'
                AND ai.asset_type IN ('host', 'k8s_workload', 'container')
            """)
            outcome = await session.execute(statement, {"rid": run_id})
            return outcome.rowcount or 0
    except Exception as exc:
        logger.warning("evaluate_rule_matching_coverage_failed", error=str(exc))
        return 0
async def _evaluate_rule_creation_coverage(run_id: str) -> int:
    """Evaluate the ``auto_rule_creation`` dimension for a run.

    A host / k8s_workload / container asset becomes green when a catalog rule
    with ``source = 'ai_generated'`` has a ``host`` or ``namespace`` label
    equal to the asset's; otherwise it becomes red.

    Args:
        run_id: UUID (as text) of the discovery run to update.

    Returns:
        Number of updated rows, or 0 when the update fails (logged as warning).
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context

    try:
        async with get_db_context() as session:
            statement = _sql("""
                UPDATE asset_coverage_snapshot cs
                SET coverage_status = CASE
                WHEN EXISTS (
                SELECT 1 FROM alert_rule_catalog arc
                WHERE arc.source = 'ai_generated'
                AND (arc.labels->>'host' = ai.host
                OR arc.labels->>'namespace' = ai.namespace)
                ) THEN 'green'
                WHEN ai.asset_type IN ('host','k8s_workload','container') THEN 'red'
                ELSE cs.coverage_status
                END,
                evidence = jsonb_build_object(
                'source', 'alert_rule_catalog_ai_generated_match',
                'asset_name', ai.name,
                'note', 'AI 自主建規則尚未啟用,後續 Hermes 產出後此欄變 green'
                )
                FROM asset_inventory ai
                WHERE cs.asset_id = ai.asset_id
                AND cs.run_id = CAST(:rid AS uuid)
                AND cs.dimension = 'auto_rule_creation'
                AND ai.asset_type IN ('host', 'k8s_workload', 'container')
            """)
            outcome = await session.execute(statement, {"rid": run_id})
            return outcome.rowcount or 0
    except Exception as exc:
        logger.warning("evaluate_rule_creation_coverage_failed", error=str(exc))
        return 0
# ============================================================================
# AOL
# ============================================================================
async def _log_aol(stats: dict[str, int], duration_ms: int, error: str | None) -> None:
    """Insert one automation_operation_log row describing an evaluation pass.

    Args:
        stats: Pass statistics, stored as JSON in the ``output`` column.
        duration_ms: Duration of the pass in milliseconds.
        error: Error text of a failed pass (stored truncated to 2000
            characters), or ``None``/empty for a successful pass.

    Any failure while writing is logged as a warning and swallowed.
    """
    try:
        from sqlalchemy import text as _sql
        from src.db.base import get_db_context

        # A non-empty error marks the row as failed; otherwise it is a success.
        if error:
            aol_status, error_text = "failed", error[:2000]
        else:
            aol_status, error_text = "success", None

        insert_stmt = _sql("""
            INSERT INTO automation_operation_log (
            operation_type, actor, status,
            input, output, duration_ms, error, tags
            ) VALUES (
            'coverage_recalculated',
            'coverage_evaluator',
            :st,
            '{}'::jsonb,
            CAST(:output AS jsonb),
            :dur, :err, :tags
            )
        """)
        bind_params = {
            "st": aol_status,
            "output": _json.dumps(stats, ensure_ascii=False),
            "dur": duration_ms,
            "err": error_text,
            "tags": ["coverage_evaluator"],
        }
        async with get_db_context() as session:
            await session.execute(insert_stmt, bind_params)
    except Exception as exc:
        logger.warning("coverage_evaluator_aol_failed", error=str(exc))
# ============================================================================
# 2026-05-04 ogt + Claude Sonnet 4.6: Coverage Gap → AI 規則自動生成執行器
# ============================================================================
_COVERAGE_RULE_COOLDOWN_SEC = 86400  # 24h cooldown per asset, so rules are not re-created repeatedly
def _build_coverage_auto_rule(
    asset_key: str,
    asset_type: str,
    name: str,
    host: str,
    namespace: str,
    internal_ip: str,
) -> dict[str, Any] | None:
    """Build the templated alert rule for one uncovered asset.

    Returns a dict with ``rule_name``, ``expr``, ``severity``, ``labels``,
    ``annotations`` and ``duration_seconds``, or ``None`` when the asset type
    is not supported or a value that would be interpolated into PromQL fails
    the character whitelist (the skip reason is logged at debug level).
    """
    import re

    # Only hostname / IP / k8s-name characters may be interpolated into the
    # PromQL expression, so asset data cannot alter the query's meaning.
    safe_label_val = re.compile(r'^[a-zA-Z0-9._\-]+$')
    safe_key = re.sub(r"[^a-zA-Z0-9]", "_", asset_key)[:60]
    description = f"Coverage 缺口自動建規則 — asset_key={asset_key},請 SRE 複核 expr 後 approve"

    if asset_type == "host":
        ip_for_match = internal_ip or host
        if not ip_for_match or not safe_label_val.match(ip_for_match):
            logger.debug("coverage_auto_rule_skip_unsafe_ip", asset_key=asset_key, ip=ip_for_match)
            return None
        # Fall back to the address when the host name itself is not safe to show.
        display_host = host if safe_label_val.match(host) else ip_for_match
        return {
            "rule_name": f"CoverageAuto_HostDown_{safe_key}",
            "expr": f'up{{instance=~"{ip_for_match}:.*"}} == 0',
            "severity": "warning",
            "labels": {"host": display_host, "layer": "infrastructure", "source": "coverage_auto"},
            "annotations": {
                "summary": f"主機 {display_host} 無 Prometheus 探測響應",
                "description": description,
            },
            "duration_seconds": 120,
        }

    if asset_type == "k8s_workload":
        if not name or not safe_label_val.match(name):
            logger.debug("coverage_auto_rule_skip_unsafe_name", asset_key=asset_key, name=name)
            return None
        if namespace and not safe_label_val.match(namespace):
            logger.debug("coverage_auto_rule_skip_unsafe_ns", asset_key=asset_key, namespace=namespace)
            return None
        ns_selector = f',namespace="{namespace}"' if namespace else ""
        return {
            "rule_name": f"CoverageAuto_WorkloadDown_{safe_key}",
            "expr": f'kube_deployment_status_replicas_available{{deployment="{name}"{ns_selector}}} == 0',
            "severity": "warning",
            "labels": {"namespace": namespace or "default", "deployment": name, "source": "coverage_auto"},
            "annotations": {
                # namespace/name: the two values used to be concatenated with
                # no separator, which made the summary unreadable.
                "summary": f"{namespace or 'default'}/{name} 無可用副本",
                "description": description,
            },
            "duration_seconds": 180,
        }

    return None


async def _auto_create_rules_for_uncovered_assets(run_id: str | None) -> int:
    """Create pending-review alert rules for assets whose auto_alerting is red.

    Flow:
      1. Select up to 5 host / k8s_workload assets of the run whose
         ``auto_alerting`` row is red.
      2. Skip assets that are inside the 24h Redis cooldown.
      3. Build a templated PromQL rule per asset type.
      4. Insert it into alert_rule_catalog with ``source='ai_generated'`` and
         ``review_status='pending_review'``; an existing ``rule_name`` is left
         untouched (``ON CONFLICT DO NOTHING``).
      5. Set the Redis cooldown for every rule that was actually inserted.

    Design rules:
      - Rules are only ever created as pending_review, never auto-approved.
      - ``rule_name`` is the unique key: ``CoverageAuto_{type}_{safe_key}``.
      - Redis problems never interrupt the flow; the cooldown is best-effort.

    Args:
        run_id: UUID (as text) of the discovery run, or ``None``.

    Returns:
        Number of rules actually inserted; 0 when ``run_id`` is falsy.
        Database failures are logged as warnings and never raised.
    """
    from sqlalchemy import text as _sql
    from src.db.base import get_db_context
    import json as _j

    if not run_id:
        return 0

    created = 0
    try:
        async with get_db_context() as db:
            rows = await db.execute(
                _sql("""
                    SELECT ai.asset_id, ai.asset_key, ai.asset_type,
                    ai.name, ai.host, ai.namespace,
                    ai.metadata->>'internal_ip' AS internal_ip
                    FROM asset_coverage_snapshot cs
                    JOIN asset_inventory ai ON cs.asset_id = ai.asset_id
                    WHERE cs.run_id = CAST(:rid AS uuid)
                    AND cs.dimension = 'auto_alerting'
                    AND cs.coverage_status = 'red'
                    AND ai.asset_type IN ('host', 'k8s_workload')
                    ORDER BY ai.asset_type, ai.asset_key
                    LIMIT 5
                """),
                {"rid": run_id},
            )
            assets = rows.fetchall()
        # The read context is closed before the loop so it is not held open
        # (and its name not shadowed) while per-asset write contexts are used.

        for asset in assets:
            asset_key = str(asset.asset_key or "")
            asset_type = str(asset.asset_type or "")

            # Redis 24h cooldown; any Redis problem means "no cooldown known".
            cooldown_key = f"coverage_rule_created:{asset_key}"
            try:
                from src.core.redis_client import get_redis
                redis = get_redis()
                if await redis.get(cooldown_key):
                    logger.debug("coverage_auto_rule_cooldown", asset_key=asset_key)
                    continue
            except RuntimeError as e:
                logger.warning("coverage_auto_rule_redis_unavailable", asset_key=asset_key, error=str(e))
            except Exception as e:
                # Still best-effort, but no longer silent.
                logger.debug("coverage_auto_rule_cooldown_check_failed", asset_key=asset_key, error=str(e))

            rule = _build_coverage_auto_rule(
                asset_key=asset_key,
                asset_type=asset_type,
                name=str(asset.name or ""),
                host=str(asset.host or ""),
                namespace=str(asset.namespace or ""),
                internal_ip=str(asset.internal_ip or ""),
            )
            if rule is None:
                continue
            rule_name = rule["rule_name"]

            # RETURNING yields a row only when the INSERT really happened;
            # a conflict on rule_name returns nothing.
            try:
                async with get_db_context() as write_db:
                    row = await write_db.execute(
                        _sql("""
                            INSERT INTO alert_rule_catalog (
                            rule_name, source, expr, duration_seconds,
                            severity, labels, annotations,
                            created_by_agent, review_status,
                            created_at, updated_at
                            ) VALUES (
                            :rname, 'ai_generated', :expr, :dur,
                            :sev, CAST(:labels AS jsonb), CAST(:ann AS jsonb),
                            'coverage_evaluator', 'pending_review',
                            NOW(), NOW()
                            )
                            ON CONFLICT (rule_name) DO NOTHING
                            RETURNING rule_name
                        """),
                        {
                            "rname": rule_name[:200],
                            "expr": rule["expr"][:4000],
                            "dur": rule["duration_seconds"],
                            "sev": rule["severity"],
                            "labels": _j.dumps(rule["labels"], ensure_ascii=False),
                            "ann": _j.dumps(rule["annotations"], ensure_ascii=False),
                        },
                    )
                    actually_inserted = row.fetchone() is not None
                # NOTE(review): counting happens after the context exits, on
                # the assumption that get_db_context finalises the write on
                # exit — confirm against src.db.base.
            except Exception as e:
                logger.warning("coverage_auto_rule_upsert_failed", asset_key=asset_key, error=str(e))
                continue

            if not actually_inserted:
                logger.debug("coverage_auto_rule_conflict_skip", rule_name=rule_name)
                continue

            created += 1
            logger.info(
                "coverage_auto_rule_created",
                rule_name=rule_name,
                asset_key=asset_key,
                asset_type=asset_type,
            )
            # Start the cooldown only for rules that were really inserted.
            try:
                from src.core.redis_client import get_redis
                redis = get_redis()
                await redis.set(cooldown_key, "1", ex=_COVERAGE_RULE_COOLDOWN_SEC)
            except Exception as e:
                logger.debug("coverage_auto_rule_cooldown_set_failed", asset_key=asset_key, error=str(e))
    except Exception as e:
        logger.warning("coverage_auto_create_rules_failed", error=str(e))

    if created > 0:
        logger.info("coverage_auto_rules_summary", created=created)
    return created