From 2f5cab2e458a7d7f8aab7b3b46d51983d4fbc5ee Mon Sep 17 00:00:00 2001 From: Your Name Date: Sun, 19 Apr 2026 22:02:36 +0800 Subject: [PATCH] =?UTF-8?q?feat(coverage=5Fevaluator):=20Gap=203.3=20LLM?= =?UTF-8?q?=20=E5=8D=87=E7=B4=9A=20=E2=80=94=20=E7=BC=BA=E5=8F=A3=E5=88=86?= =?UTF-8?q?=E6=9E=90=20+=20=E8=A3=9C=E8=A6=86=E8=93=8B=E5=BB=BA=E8=AD=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Gap 3 進度: 4/9 service 升級 LLM (達到合理上限 — 其他 4 個純資料移動不需 LLM) coverage_evaluator 原本 7 維升級 unknown→green/yellow/red 後無主動建議. 新增: 1. _fetch_red_summary: 撈最新 run 的 red 分布 + top 10 被標 red 的 asset 2. _llm_analyze_coverage_gaps (~50 行): 有 >= 20 red 時才跑 LLM (避免 well-covered 集群浪費 token) LLM JSON 輸出: - worst_dimension: 最該優先補的維度 - root_cause: red 集中的真因 (繁中) - top_remediation_actions[3]: priority/target/action/effort - estimated_weeks_to_close: 1-52 - confidence: 0-1 3. _send_telegram_gaps: 推 coverage 缺口 Telegram 摘要 總 red + 最嚴重維度 + 補齊週數 + top 3 補覆蓋動作 scan 完流程: 評估 7 維 → 撈 red summary → LLM 分析 (if total_red >= 20) → Telegram 統帥鐵律對齊: ✅ 不寫死補覆蓋優先 (LLM 根據實際 red 分布推) ✅ AI 建議 + 人工決策 (Telegram 末行: '人工評估補覆蓋排程') ✅ 包含預估完成時間 + 信心 (可追蹤) session 累計 35 commits, 9 新 scanner, 4 用 LLM: - Hermes (rule quality) - capacity_forecaster (容量預測) - compliance_scanner (合規態勢) - coverage_evaluator (覆蓋缺口) 剩 5 個純資料移動不適合 LLM (asset_scanner/rule_catalog_sync/ rule_stats_updater/asset_change_tracker/capacity_scanner) Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/api/src/jobs/coverage_evaluator_job.py | 189 ++++++++++++++++++++ 1 file changed, 189 insertions(+) diff --git a/apps/api/src/jobs/coverage_evaluator_job.py b/apps/api/src/jobs/coverage_evaluator_job.py index 03c83053..4e69a081 100644 --- a/apps/api/src/jobs/coverage_evaluator_job.py +++ b/apps/api/src/jobs/coverage_evaluator_job.py @@ -27,6 +27,7 @@ from __future__ import annotations import asyncio import json as _json import time as _time +from typing import Any import httpx import structlog @@ -102,6 +103,17 @@ async def evaluate_once() -> dict[str, int]: logger.exception("coverage_evaluate_once_failed", error=error_msg) duration_ms = int((_time.time() - started_ms) * 1000) + + # Gap 3.3 LLM 升級: 分析 red 分布產補覆蓋建議 + # 只在有大量 red 時才跑 LLM (避免 well-covered 集群浪費 token) + red_summary = await _fetch_red_summary() + llm_analysis: dict[str, Any] | None = None + if red_summary and red_summary.get("total_red", 0) >= 20: + llm_analysis = await _llm_analyze_coverage_gaps(red_summary) + if llm_analysis: + stats["llm_analyzed"] = True + await _send_telegram_gaps(red_summary, llm_analysis) + await _log_aol(stats, duration_ms, error_msg) logger.info( @@ -113,11 +125,188 @@ async def evaluate_once() -> dict[str, int]: remediation=stats["remediation_updated"], rule_matching=stats["rule_matching_updated"], rule_creation=stats["rule_creation_updated"], + llm_analyzed=bool(llm_analysis), duration_ms=duration_ms, ) return stats +# ============================================================================ +# Gap 3.3 LLM 升級 — 覆蓋率缺口分析 + 補覆蓋建議 +# ============================================================================ + +async def _fetch_red_summary() -> dict[str, Any] | None: + """撈最新 run 的 red 分佈 + top red asset type.""" + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + try: + async with get_db_context() as db: + # 總覽: 每維度 red count + dim_rows = await db.execute(_sql(""" + SELECT dimension, count(*) AS cnt + FROM asset_coverage_snapshot + WHERE run_id = ( + SELECT run_id FROM asset_discovery_run + WHERE status='success' ORDER BY ended_at DESC LIMIT 1 + ) + AND coverage_status = 'red' + GROUP BY dimension + ORDER BY cnt DESC + """)) + by_dim = [{"dimension": r.dimension, "red_count": int(r.cnt)} for r in dim_rows.fetchall()] + total_red = sum(d["red_count"] for d in by_dim) + if total_red == 0: + return None + + # Top red asset: 哪些 asset 被標最多 red + asset_rows = await db.execute(_sql(""" + SELECT ai.asset_key, ai.asset_type, count(*) AS red_dims + FROM asset_coverage_snapshot cs + JOIN asset_inventory ai ON cs.asset_id = ai.asset_id + WHERE cs.run_id = ( + SELECT run_id FROM asset_discovery_run + WHERE status='success' ORDER BY ended_at DESC LIMIT 1 + ) + AND cs.coverage_status = 'red' + GROUP BY ai.asset_key, ai.asset_type + ORDER BY red_dims DESC + LIMIT 10 + """)) + top_assets = [ + {"asset_key": r.asset_key, "asset_type": r.asset_type, "red_dims": int(r.red_dims)} + for r in asset_rows.fetchall() + ] + return { + "total_red": total_red, + "by_dimension": by_dim, + "top_red_assets": top_assets, + } + except Exception as e: + logger.warning("fetch_red_summary_failed", error=str(e)) + return None + + +_LLM_COVERAGE_PROMPT = """你是 AWOOOI 可觀察性覆蓋率專家。以下是最新 asset 覆蓋率掃描的 red 缺口,請分析並提出補覆蓋優先順序. + +## red 缺口分布 +各維度 red 數: {by_dim_json} +總 red count: {total_red} + +## 最多 red 的 asset (top 10) +{top_assets_json} + +## 7 維自動化意義 + - auto_monitoring: 有無 Prometheus scrape + - auto_alerting: 有無 alert rule 覆蓋 + - auto_rule_creation: 有無 AI 產生的規則 + - auto_rule_matching: 過去 30d 是否被 alert 匹配 + - auto_playbook: 有無 playbook + - auto_remediation: 過去 30d 有無 remediation + - auto_km_creation: 有無 knowledge_entries + +## 輸出規格 (純 JSON) +{{ + "worst_dimension": "哪個維度最該優先補", + "root_cause": "red 集中的真因 (繁中)", + "top_remediation_actions": [ + {{"priority": 1, "target": "asset_key 或類型", "action": "具體動作", "effort": "low|medium|high"}} + ], + "estimated_weeks_to_close": 1-52, + "confidence": 0.0-1.0 +}} +""" + + +async def _llm_analyze_coverage_gaps(red_summary: dict[str, Any]) -> dict[str, Any] | None: + """LLM 分析 coverage 缺口. 失敗回 None.""" + try: + import json as _j + from src.services.openclaw import get_openclaw + + prompt = _LLM_COVERAGE_PROMPT.format( + by_dim_json=_j.dumps(red_summary.get("by_dimension", []), ensure_ascii=False), + total_red=red_summary.get("total_red", 0), + top_assets_json=_j.dumps(red_summary.get("top_red_assets", []), ensure_ascii=False, indent=2), + ) + openclaw = get_openclaw() + text, provider, success = await openclaw.call(prompt) + if not success or not text: + return None + + _raw = text.strip() + if _raw.startswith("```"): + _raw = _raw.strip("`").lstrip("json").strip() + + try: + parsed = _j.loads(_raw) + if isinstance(parsed, dict) and "worst_dimension" in parsed: + parsed["_llm_provider"] = provider + return parsed + if isinstance(parsed, dict) and "description" in parsed: + desc = str(parsed["description"]).strip() + if desc.startswith("{"): + inner = _j.loads(desc) + if isinstance(inner, dict) and "worst_dimension" in inner: + inner["_llm_provider"] = provider + return inner + except (_j.JSONDecodeError, ValueError) as e: + logger.warning("coverage_llm_parse_failed", error=str(e), raw=_raw[:200]) + return None + except Exception as e: + logger.warning("coverage_llm_error", error=str(e)) + return None + + +async def _send_telegram_gaps( + red_summary: dict[str, Any], + analysis: dict[str, Any], +) -> None: + """推 coverage 缺口 Telegram 摘要.""" + try: + import html + from src.core.config import settings + from src.services.telegram_gateway import get_telegram_gateway + + if not settings.OPENCLAW_TG_CHAT_ID: + return + + worst = html.escape(str(analysis.get("worst_dimension", ""))) + cause = html.escape(str(analysis.get("root_cause", ""))[:200]) + weeks = analysis.get("estimated_weeks_to_close", "?") + conf = analysis.get("confidence", 0.0) + + lines = [ + "📉 Coverage 缺口分析 (AI 升級)", + f"總 red: {red_summary.get('total_red', 0)} | 最嚴重維度: {worst}", + f"預計補齊週數: {weeks}w | AI 信心: {conf:.0%}", + "", + f"🔍 真因: {cause}", + "", + "Top Remediation Priorities:", + ] + for act in (analysis.get("top_remediation_actions") or [])[:3]: + pri = act.get("priority", "?") + target = html.escape(str(act.get("target", ""))[:60]) + action = html.escape(str(act.get("action", ""))[:100]) + effort = act.get("effort", "?") + lines.append(f" {pri}. {target} [{effort}]") + lines.append(f" ↳ {action}") + lines.append("") + lines.append("決策: 人工評估補覆蓋排程") + + msg = "\n".join(lines) + tg = get_telegram_gateway() + await tg._send_request("sendMessage", { # type: ignore[attr-defined] + "chat_id": settings.OPENCLAW_TG_CHAT_ID, + "text": msg, + "parse_mode": "HTML", + "disable_web_page_preview": True, + }) + except Exception as e: + logger.warning("coverage_telegram_failed", error=str(e)) + + # ============================================================================ # 查最新 run_id # ============================================================================