feat(coverage_evaluator): Gap 3.3 LLM 升級 — 缺口分析 + 補覆蓋建議
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 10m14s
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 10m14s
Gap 3 進度: 4/9 service 升級 LLM (達到合理上限 — 其他 4 個純資料移動不需 LLM)
coverage_evaluator 原本 7 維升級 unknown→green/yellow/red 後無主動建議.
新增:
1. _fetch_red_summary: 撈最新 run 的 red 分布 + top 10 被標 red 的 asset
2. _llm_analyze_coverage_gaps (~50 行):
有 >= 20 red 時才跑 LLM (避免 well-covered 集群浪費 token)
LLM JSON 輸出:
- worst_dimension: 最該優先補的維度
- root_cause: red 集中的真因 (繁中)
- top_remediation_actions[3]: priority/target/action/effort
- estimated_weeks_to_close: 1-52
- confidence: 0-1
3. _send_telegram_gaps: 推 coverage 缺口 Telegram 摘要
總 red + 最嚴重維度 + 補齊週數 + top 3 補覆蓋動作
scan 完流程:
評估 7 維 → 撈 red summary → LLM 分析 (if total_red >= 20) → Telegram
統帥鐵律對齊:
✅ 不寫死補覆蓋優先 (LLM 根據實際 red 分布推)
✅ AI 建議 + 人工決策 (Telegram 末行: '人工評估補覆蓋排程')
✅ 包含預估完成時間 + 信心 (可追蹤)
session 累計 35 commits, 9 新 scanner, 4 用 LLM:
- Hermes (rule quality)
- capacity_forecaster (容量預測)
- compliance_scanner (合規態勢)
- coverage_evaluator (覆蓋缺口)
剩 5 個純資料移動不適合 LLM (asset_scanner/rule_catalog_sync/
rule_stats_updater/asset_change_tracker/capacity_scanner)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -27,6 +27,7 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import json as _json
|
||||
import time as _time
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import structlog
|
||||
@@ -102,6 +103,17 @@ async def evaluate_once() -> dict[str, int]:
|
||||
logger.exception("coverage_evaluate_once_failed", error=error_msg)
|
||||
|
||||
duration_ms = int((_time.time() - started_ms) * 1000)
|
||||
|
||||
# Gap 3.3 LLM 升級: 分析 red 分布產補覆蓋建議
|
||||
# 只在有大量 red 時才跑 LLM (避免 well-covered 集群浪費 token)
|
||||
red_summary = await _fetch_red_summary()
|
||||
llm_analysis: dict[str, Any] | None = None
|
||||
if red_summary and red_summary.get("total_red", 0) >= 20:
|
||||
llm_analysis = await _llm_analyze_coverage_gaps(red_summary)
|
||||
if llm_analysis:
|
||||
stats["llm_analyzed"] = True
|
||||
await _send_telegram_gaps(red_summary, llm_analysis)
|
||||
|
||||
await _log_aol(stats, duration_ms, error_msg)
|
||||
|
||||
logger.info(
|
||||
@@ -113,11 +125,188 @@ async def evaluate_once() -> dict[str, int]:
|
||||
remediation=stats["remediation_updated"],
|
||||
rule_matching=stats["rule_matching_updated"],
|
||||
rule_creation=stats["rule_creation_updated"],
|
||||
llm_analyzed=bool(llm_analysis),
|
||||
duration_ms=duration_ms,
|
||||
)
|
||||
return stats
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Gap 3.3 LLM 升級 — 覆蓋率缺口分析 + 補覆蓋建議
|
||||
# ============================================================================
|
||||
|
||||
async def _fetch_red_summary() -> dict[str, Any] | None:
|
||||
"""撈最新 run 的 red 分佈 + top red asset type."""
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
try:
|
||||
async with get_db_context() as db:
|
||||
# 總覽: 每維度 red count
|
||||
dim_rows = await db.execute(_sql("""
|
||||
SELECT dimension, count(*) AS cnt
|
||||
FROM asset_coverage_snapshot
|
||||
WHERE run_id = (
|
||||
SELECT run_id FROM asset_discovery_run
|
||||
WHERE status='success' ORDER BY ended_at DESC LIMIT 1
|
||||
)
|
||||
AND coverage_status = 'red'
|
||||
GROUP BY dimension
|
||||
ORDER BY cnt DESC
|
||||
"""))
|
||||
by_dim = [{"dimension": r.dimension, "red_count": int(r.cnt)} for r in dim_rows.fetchall()]
|
||||
total_red = sum(d["red_count"] for d in by_dim)
|
||||
if total_red == 0:
|
||||
return None
|
||||
|
||||
# Top red asset: 哪些 asset 被標最多 red
|
||||
asset_rows = await db.execute(_sql("""
|
||||
SELECT ai.asset_key, ai.asset_type, count(*) AS red_dims
|
||||
FROM asset_coverage_snapshot cs
|
||||
JOIN asset_inventory ai ON cs.asset_id = ai.asset_id
|
||||
WHERE cs.run_id = (
|
||||
SELECT run_id FROM asset_discovery_run
|
||||
WHERE status='success' ORDER BY ended_at DESC LIMIT 1
|
||||
)
|
||||
AND cs.coverage_status = 'red'
|
||||
GROUP BY ai.asset_key, ai.asset_type
|
||||
ORDER BY red_dims DESC
|
||||
LIMIT 10
|
||||
"""))
|
||||
top_assets = [
|
||||
{"asset_key": r.asset_key, "asset_type": r.asset_type, "red_dims": int(r.red_dims)}
|
||||
for r in asset_rows.fetchall()
|
||||
]
|
||||
return {
|
||||
"total_red": total_red,
|
||||
"by_dimension": by_dim,
|
||||
"top_red_assets": top_assets,
|
||||
}
|
||||
except Exception as e:
|
||||
logger.warning("fetch_red_summary_failed", error=str(e))
|
||||
return None
|
||||
|
||||
|
||||
_LLM_COVERAGE_PROMPT = """你是 AWOOOI 可觀察性覆蓋率專家。以下是最新 asset 覆蓋率掃描的 red 缺口,請分析並提出補覆蓋優先順序.
|
||||
|
||||
## red 缺口分布
|
||||
各維度 red 數: {by_dim_json}
|
||||
總 red count: {total_red}
|
||||
|
||||
## 最多 red 的 asset (top 10)
|
||||
{top_assets_json}
|
||||
|
||||
## 7 維自動化意義
|
||||
- auto_monitoring: 有無 Prometheus scrape
|
||||
- auto_alerting: 有無 alert rule 覆蓋
|
||||
- auto_rule_creation: 有無 AI 產生的規則
|
||||
- auto_rule_matching: 過去 30d 是否被 alert 匹配
|
||||
- auto_playbook: 有無 playbook
|
||||
- auto_remediation: 過去 30d 有無 remediation
|
||||
- auto_km_creation: 有無 knowledge_entries
|
||||
|
||||
## 輸出規格 (純 JSON)
|
||||
{{
|
||||
"worst_dimension": "哪個維度最該優先補",
|
||||
"root_cause": "red 集中的真因 (繁中)",
|
||||
"top_remediation_actions": [
|
||||
{{"priority": 1, "target": "asset_key 或類型", "action": "具體動作", "effort": "low|medium|high"}}
|
||||
],
|
||||
"estimated_weeks_to_close": 1-52,
|
||||
"confidence": 0.0-1.0
|
||||
}}
|
||||
"""
|
||||
|
||||
|
||||
async def _llm_analyze_coverage_gaps(red_summary: dict[str, Any]) -> dict[str, Any] | None:
|
||||
"""LLM 分析 coverage 缺口. 失敗回 None."""
|
||||
try:
|
||||
import json as _j
|
||||
from src.services.openclaw import get_openclaw
|
||||
|
||||
prompt = _LLM_COVERAGE_PROMPT.format(
|
||||
by_dim_json=_j.dumps(red_summary.get("by_dimension", []), ensure_ascii=False),
|
||||
total_red=red_summary.get("total_red", 0),
|
||||
top_assets_json=_j.dumps(red_summary.get("top_red_assets", []), ensure_ascii=False, indent=2),
|
||||
)
|
||||
openclaw = get_openclaw()
|
||||
text, provider, success = await openclaw.call(prompt)
|
||||
if not success or not text:
|
||||
return None
|
||||
|
||||
_raw = text.strip()
|
||||
if _raw.startswith("```"):
|
||||
_raw = _raw.strip("`").lstrip("json").strip()
|
||||
|
||||
try:
|
||||
parsed = _j.loads(_raw)
|
||||
if isinstance(parsed, dict) and "worst_dimension" in parsed:
|
||||
parsed["_llm_provider"] = provider
|
||||
return parsed
|
||||
if isinstance(parsed, dict) and "description" in parsed:
|
||||
desc = str(parsed["description"]).strip()
|
||||
if desc.startswith("{"):
|
||||
inner = _j.loads(desc)
|
||||
if isinstance(inner, dict) and "worst_dimension" in inner:
|
||||
inner["_llm_provider"] = provider
|
||||
return inner
|
||||
except (_j.JSONDecodeError, ValueError) as e:
|
||||
logger.warning("coverage_llm_parse_failed", error=str(e), raw=_raw[:200])
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.warning("coverage_llm_error", error=str(e))
|
||||
return None
|
||||
|
||||
|
||||
async def _send_telegram_gaps(
|
||||
red_summary: dict[str, Any],
|
||||
analysis: dict[str, Any],
|
||||
) -> None:
|
||||
"""推 coverage 缺口 Telegram 摘要."""
|
||||
try:
|
||||
import html
|
||||
from src.core.config import settings
|
||||
from src.services.telegram_gateway import get_telegram_gateway
|
||||
|
||||
if not settings.OPENCLAW_TG_CHAT_ID:
|
||||
return
|
||||
|
||||
worst = html.escape(str(analysis.get("worst_dimension", "")))
|
||||
cause = html.escape(str(analysis.get("root_cause", ""))[:200])
|
||||
weeks = analysis.get("estimated_weeks_to_close", "?")
|
||||
conf = analysis.get("confidence", 0.0)
|
||||
|
||||
lines = [
|
||||
"📉 <b>Coverage 缺口分析 (AI 升級)</b>",
|
||||
f"總 red: <b>{red_summary.get('total_red', 0)}</b> | 最嚴重維度: <code>{worst}</code>",
|
||||
f"預計補齊週數: {weeks}w | AI 信心: {conf:.0%}",
|
||||
"",
|
||||
f"🔍 真因: {cause}",
|
||||
"",
|
||||
"<b>Top Remediation Priorities</b>:",
|
||||
]
|
||||
for act in (analysis.get("top_remediation_actions") or [])[:3]:
|
||||
pri = act.get("priority", "?")
|
||||
target = html.escape(str(act.get("target", ""))[:60])
|
||||
action = html.escape(str(act.get("action", ""))[:100])
|
||||
effort = act.get("effort", "?")
|
||||
lines.append(f" {pri}. <code>{target}</code> [{effort}]")
|
||||
lines.append(f" ↳ {action}")
|
||||
lines.append("")
|
||||
lines.append("決策: 人工評估補覆蓋排程")
|
||||
|
||||
msg = "\n".join(lines)
|
||||
tg = get_telegram_gateway()
|
||||
await tg._send_request("sendMessage", { # type: ignore[attr-defined]
|
||||
"chat_id": settings.OPENCLAW_TG_CHAT_ID,
|
||||
"text": msg,
|
||||
"parse_mode": "HTML",
|
||||
"disable_web_page_preview": True,
|
||||
})
|
||||
except Exception as e:
|
||||
logger.warning("coverage_telegram_failed", error=str(e))
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# 查最新 run_id
|
||||
# ============================================================================
|
||||
|
||||
Reference in New Issue
Block a user