feat(coverage_evaluator): Gap 3.3 LLM 升級 — 缺口分析 + 補覆蓋建議
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 10m14s

Gap 3 進度: 4/9 service 升級 LLM (達到合理上限 — 其他 4 個純資料移動不需 LLM)

coverage_evaluator 原本 7 維升級 unknown→green/yellow/red 後無主動建議.
新增:

1. _fetch_red_summary: 撈最新 run 的 red 分布 + top 10 被標 red 的 asset
2. _llm_analyze_coverage_gaps (~50 行):
   有 >= 20 red 時才跑 LLM (避免 well-covered 集群浪費 token)
   LLM JSON 輸出:
     - worst_dimension: 最該優先補的維度
     - root_cause: red 集中的真因 (繁中)
     - top_remediation_actions[3]: priority/target/action/effort
     - estimated_weeks_to_close: 1-52
     - confidence: 0-1
3. _send_telegram_gaps: 推 coverage 缺口 Telegram 摘要
   總 red + 最嚴重維度 + 補齊週數 + top 3 補覆蓋動作

scan 完流程:
  評估 7 維 → 撈 red summary → LLM 分析 (if total_red >= 20) → Telegram

統帥鐵律對齊:
   不寫死補覆蓋優先 (LLM 根據實際 red 分布推)
   AI 建議 + 人工決策 (Telegram 末行: '人工評估補覆蓋排程')
   包含預估完成時間 + 信心 (可追蹤)

session 累計 35 commits, 9 新 scanner, 4 用 LLM:
  - Hermes (rule quality)
  - capacity_forecaster (容量預測)
  - compliance_scanner (合規態勢)
  - coverage_evaluator (覆蓋缺口)
剩 5 個純資料移動不適合 LLM (asset_scanner/rule_catalog_sync/
                           rule_stats_updater/asset_change_tracker/capacity_scanner)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Your Name
2026-04-19 22:02:36 +08:00
parent f6cb938dc3
commit 2f5cab2e45

View File

@@ -27,6 +27,7 @@ from __future__ import annotations
import asyncio
import json as _json
import time as _time
from typing import Any
import httpx
import structlog
@@ -102,6 +103,17 @@ async def evaluate_once() -> dict[str, int]:
logger.exception("coverage_evaluate_once_failed", error=error_msg)
duration_ms = int((_time.time() - started_ms) * 1000)
# Gap 3.3 LLM 升級: 分析 red 分布產補覆蓋建議
# 只在有大量 red 時才跑 LLM (避免 well-covered 集群浪費 token)
red_summary = await _fetch_red_summary()
llm_analysis: dict[str, Any] | None = None
if red_summary and red_summary.get("total_red", 0) >= 20:
llm_analysis = await _llm_analyze_coverage_gaps(red_summary)
if llm_analysis:
stats["llm_analyzed"] = True
await _send_telegram_gaps(red_summary, llm_analysis)
await _log_aol(stats, duration_ms, error_msg)
logger.info(
@@ -113,11 +125,188 @@ async def evaluate_once() -> dict[str, int]:
remediation=stats["remediation_updated"],
rule_matching=stats["rule_matching_updated"],
rule_creation=stats["rule_creation_updated"],
llm_analyzed=bool(llm_analysis),
duration_ms=duration_ms,
)
return stats
# ============================================================================
# Gap 3.3 LLM 升級 — 覆蓋率缺口分析 + 補覆蓋建議
# ============================================================================
async def _fetch_red_summary() -> dict[str, Any] | None:
"""撈最新 run 的 red 分佈 + top red asset type."""
from sqlalchemy import text as _sql
from src.db.base import get_db_context
try:
async with get_db_context() as db:
# 總覽: 每維度 red count
dim_rows = await db.execute(_sql("""
SELECT dimension, count(*) AS cnt
FROM asset_coverage_snapshot
WHERE run_id = (
SELECT run_id FROM asset_discovery_run
WHERE status='success' ORDER BY ended_at DESC LIMIT 1
)
AND coverage_status = 'red'
GROUP BY dimension
ORDER BY cnt DESC
"""))
by_dim = [{"dimension": r.dimension, "red_count": int(r.cnt)} for r in dim_rows.fetchall()]
total_red = sum(d["red_count"] for d in by_dim)
if total_red == 0:
return None
# Top red asset: 哪些 asset 被標最多 red
asset_rows = await db.execute(_sql("""
SELECT ai.asset_key, ai.asset_type, count(*) AS red_dims
FROM asset_coverage_snapshot cs
JOIN asset_inventory ai ON cs.asset_id = ai.asset_id
WHERE cs.run_id = (
SELECT run_id FROM asset_discovery_run
WHERE status='success' ORDER BY ended_at DESC LIMIT 1
)
AND cs.coverage_status = 'red'
GROUP BY ai.asset_key, ai.asset_type
ORDER BY red_dims DESC
LIMIT 10
"""))
top_assets = [
{"asset_key": r.asset_key, "asset_type": r.asset_type, "red_dims": int(r.red_dims)}
for r in asset_rows.fetchall()
]
return {
"total_red": total_red,
"by_dimension": by_dim,
"top_red_assets": top_assets,
}
except Exception as e:
logger.warning("fetch_red_summary_failed", error=str(e))
return None
_LLM_COVERAGE_PROMPT = """你是 AWOOOI 可觀察性覆蓋率專家。以下是最新 asset 覆蓋率掃描的 red 缺口,請分析並提出補覆蓋優先順序.
## red 缺口分布
各維度 red 數: {by_dim_json}
總 red count: {total_red}
## 最多 red 的 asset (top 10)
{top_assets_json}
## 7 維自動化意義
- auto_monitoring: 有無 Prometheus scrape
- auto_alerting: 有無 alert rule 覆蓋
- auto_rule_creation: 有無 AI 產生的規則
- auto_rule_matching: 過去 30d 是否被 alert 匹配
- auto_playbook: 有無 playbook
- auto_remediation: 過去 30d 有無 remediation
- auto_km_creation: 有無 knowledge_entries
## 輸出規格 (純 JSON)
{{
"worst_dimension": "哪個維度最該優先補",
"root_cause": "red 集中的真因 (繁中)",
"top_remediation_actions": [
{{"priority": 1, "target": "asset_key 或類型", "action": "具體動作", "effort": "low|medium|high"}}
],
"estimated_weeks_to_close": 1-52,
"confidence": 0.0-1.0
}}
"""
async def _llm_analyze_coverage_gaps(red_summary: dict[str, Any]) -> dict[str, Any] | None:
"""LLM 分析 coverage 缺口. 失敗回 None."""
try:
import json as _j
from src.services.openclaw import get_openclaw
prompt = _LLM_COVERAGE_PROMPT.format(
by_dim_json=_j.dumps(red_summary.get("by_dimension", []), ensure_ascii=False),
total_red=red_summary.get("total_red", 0),
top_assets_json=_j.dumps(red_summary.get("top_red_assets", []), ensure_ascii=False, indent=2),
)
openclaw = get_openclaw()
text, provider, success = await openclaw.call(prompt)
if not success or not text:
return None
_raw = text.strip()
if _raw.startswith("```"):
_raw = _raw.strip("`").lstrip("json").strip()
try:
parsed = _j.loads(_raw)
if isinstance(parsed, dict) and "worst_dimension" in parsed:
parsed["_llm_provider"] = provider
return parsed
if isinstance(parsed, dict) and "description" in parsed:
desc = str(parsed["description"]).strip()
if desc.startswith("{"):
inner = _j.loads(desc)
if isinstance(inner, dict) and "worst_dimension" in inner:
inner["_llm_provider"] = provider
return inner
except (_j.JSONDecodeError, ValueError) as e:
logger.warning("coverage_llm_parse_failed", error=str(e), raw=_raw[:200])
return None
except Exception as e:
logger.warning("coverage_llm_error", error=str(e))
return None
async def _send_telegram_gaps(
red_summary: dict[str, Any],
analysis: dict[str, Any],
) -> None:
"""推 coverage 缺口 Telegram 摘要."""
try:
import html
from src.core.config import settings
from src.services.telegram_gateway import get_telegram_gateway
if not settings.OPENCLAW_TG_CHAT_ID:
return
worst = html.escape(str(analysis.get("worst_dimension", "")))
cause = html.escape(str(analysis.get("root_cause", ""))[:200])
weeks = analysis.get("estimated_weeks_to_close", "?")
conf = analysis.get("confidence", 0.0)
lines = [
"📉 <b>Coverage 缺口分析 (AI 升級)</b>",
f"總 red: <b>{red_summary.get('total_red', 0)}</b> | 最嚴重維度: <code>{worst}</code>",
f"預計補齊週數: {weeks}w | AI 信心: {conf:.0%}",
"",
f"🔍 真因: {cause}",
"",
"<b>Top Remediation Priorities</b>:",
]
for act in (analysis.get("top_remediation_actions") or [])[:3]:
pri = act.get("priority", "?")
target = html.escape(str(act.get("target", ""))[:60])
action = html.escape(str(act.get("action", ""))[:100])
effort = act.get("effort", "?")
lines.append(f" {pri}. <code>{target}</code> [{effort}]")
lines.append(f"{action}")
lines.append("")
lines.append("決策: 人工評估補覆蓋排程")
msg = "\n".join(lines)
tg = get_telegram_gateway()
await tg._send_request("sendMessage", { # type: ignore[attr-defined]
"chat_id": settings.OPENCLAW_TG_CHAT_ID,
"text": msg,
"parse_mode": "HTML",
"disable_web_page_preview": True,
})
except Exception as e:
logger.warning("coverage_telegram_failed", error=str(e))
# ============================================================================
# 查最新 run_id
# ============================================================================