diff --git a/apps/api/src/jobs/coverage_evaluator_job.py b/apps/api/src/jobs/coverage_evaluator_job.py
index 03c83053..4e69a081 100644
--- a/apps/api/src/jobs/coverage_evaluator_job.py
+++ b/apps/api/src/jobs/coverage_evaluator_job.py
@@ -27,6 +27,7 @@ from __future__ import annotations
import asyncio
import json as _json
import time as _time
+from typing import Any
import httpx
import structlog
@@ -102,6 +103,17 @@ async def evaluate_once() -> dict[str, int]:
logger.exception("coverage_evaluate_once_failed", error=error_msg)
duration_ms = int((_time.time() - started_ms) * 1000)
+
+ # Gap 3.3 LLM 升級: 分析 red 分布產補覆蓋建議
+ # 只在有大量 red 時才跑 LLM (避免 well-covered 集群浪費 token)
+ red_summary = await _fetch_red_summary()
+ llm_analysis: dict[str, Any] | None = None
+ if red_summary and red_summary.get("total_red", 0) >= 20:
+ llm_analysis = await _llm_analyze_coverage_gaps(red_summary)
+ if llm_analysis:
+ stats["llm_analyzed"] = True
+ await _send_telegram_gaps(red_summary, llm_analysis)
+
await _log_aol(stats, duration_ms, error_msg)
logger.info(
@@ -113,11 +125,188 @@ async def evaluate_once() -> dict[str, int]:
remediation=stats["remediation_updated"],
rule_matching=stats["rule_matching_updated"],
rule_creation=stats["rule_creation_updated"],
+ llm_analyzed=bool(llm_analysis),
duration_ms=duration_ms,
)
return stats
+# ============================================================================
+# Gap 3.3 LLM 升級 — 覆蓋率缺口分析 + 補覆蓋建議
+# ============================================================================
+
+async def _fetch_red_summary() -> dict[str, Any] | None:
+ """撈最新 run 的 red 分佈 + top red asset type."""
+ from sqlalchemy import text as _sql
+ from src.db.base import get_db_context
+
+ try:
+ async with get_db_context() as db:
+ # 總覽: 每維度 red count
+ dim_rows = await db.execute(_sql("""
+ SELECT dimension, count(*) AS cnt
+ FROM asset_coverage_snapshot
+ WHERE run_id = (
+ SELECT run_id FROM asset_discovery_run
+ WHERE status='success' ORDER BY ended_at DESC LIMIT 1
+ )
+ AND coverage_status = 'red'
+ GROUP BY dimension
+ ORDER BY cnt DESC
+ """))
+ by_dim = [{"dimension": r.dimension, "red_count": int(r.cnt)} for r in dim_rows.fetchall()]
+ total_red = sum(d["red_count"] for d in by_dim)
+ if total_red == 0:
+ return None
+
+ # Top red asset: 哪些 asset 被標最多 red
+ asset_rows = await db.execute(_sql("""
+ SELECT ai.asset_key, ai.asset_type, count(*) AS red_dims
+ FROM asset_coverage_snapshot cs
+ JOIN asset_inventory ai ON cs.asset_id = ai.asset_id
+ WHERE cs.run_id = (
+ SELECT run_id FROM asset_discovery_run
+ WHERE status='success' ORDER BY ended_at DESC LIMIT 1
+ )
+ AND cs.coverage_status = 'red'
+ GROUP BY ai.asset_key, ai.asset_type
+ ORDER BY red_dims DESC
+ LIMIT 10
+ """))
+ top_assets = [
+ {"asset_key": r.asset_key, "asset_type": r.asset_type, "red_dims": int(r.red_dims)}
+ for r in asset_rows.fetchall()
+ ]
+ return {
+ "total_red": total_red,
+ "by_dimension": by_dim,
+ "top_red_assets": top_assets,
+ }
+ except Exception as e:
+ logger.warning("fetch_red_summary_failed", error=str(e))
+ return None
+
+
+_LLM_COVERAGE_PROMPT = """你是 AWOOOI 可觀察性覆蓋率專家。以下是最新 asset 覆蓋率掃描的 red 缺口,請分析並提出補覆蓋優先順序.
+
+## red 缺口分布
+各維度 red 數: {by_dim_json}
+總 red count: {total_red}
+
+## 最多 red 的 asset (top 10)
+{top_assets_json}
+
+## 7 維自動化意義
+ - auto_monitoring: 有無 Prometheus scrape
+ - auto_alerting: 有無 alert rule 覆蓋
+ - auto_rule_creation: 有無 AI 產生的規則
+ - auto_rule_matching: 過去 30d 是否被 alert 匹配
+ - auto_playbook: 有無 playbook
+ - auto_remediation: 過去 30d 有無 remediation
+ - auto_km_creation: 有無 knowledge_entries
+
+## 輸出規格 (純 JSON)
+{{
+ "worst_dimension": "哪個維度最該優先補",
+ "root_cause": "red 集中的真因 (繁中)",
+ "top_remediation_actions": [
+ {{"priority": 1, "target": "asset_key 或類型", "action": "具體動作", "effort": "low|medium|high"}}
+ ],
+ "estimated_weeks_to_close": 1-52,
+ "confidence": 0.0-1.0
+}}
+"""
+
+
+async def _llm_analyze_coverage_gaps(red_summary: dict[str, Any]) -> dict[str, Any] | None:
+ """LLM 分析 coverage 缺口. 失敗回 None."""
+ try:
+ import json as _j
+ from src.services.openclaw import get_openclaw
+
+ prompt = _LLM_COVERAGE_PROMPT.format(
+ by_dim_json=_j.dumps(red_summary.get("by_dimension", []), ensure_ascii=False),
+ total_red=red_summary.get("total_red", 0),
+ top_assets_json=_j.dumps(red_summary.get("top_red_assets", []), ensure_ascii=False, indent=2),
+ )
+ openclaw = get_openclaw()
+ text, provider, success = await openclaw.call(prompt)
+ if not success or not text:
+ return None
+
+ _raw = text.strip()
+ if _raw.startswith("```"):
+ _raw = _raw.strip("`").lstrip("json").strip()
+
+ try:
+ parsed = _j.loads(_raw)
+ if isinstance(parsed, dict) and "worst_dimension" in parsed:
+ parsed["_llm_provider"] = provider
+ return parsed
+ if isinstance(parsed, dict) and "description" in parsed:
+ desc = str(parsed["description"]).strip()
+ if desc.startswith("{"):
+ inner = _j.loads(desc)
+ if isinstance(inner, dict) and "worst_dimension" in inner:
+ inner["_llm_provider"] = provider
+ return inner
+ except (_j.JSONDecodeError, ValueError) as e:
+ logger.warning("coverage_llm_parse_failed", error=str(e), raw=_raw[:200])
+ return None
+ except Exception as e:
+ logger.warning("coverage_llm_error", error=str(e))
+ return None
+
+
+async def _send_telegram_gaps(
+ red_summary: dict[str, Any],
+ analysis: dict[str, Any],
+) -> None:
+ """推 coverage 缺口 Telegram 摘要."""
+ try:
+ import html
+ from src.core.config import settings
+ from src.services.telegram_gateway import get_telegram_gateway
+
+ if not settings.OPENCLAW_TG_CHAT_ID:
+ return
+
+ worst = html.escape(str(analysis.get("worst_dimension", "")))
+ cause = html.escape(str(analysis.get("root_cause", ""))[:200])
+ weeks = analysis.get("estimated_weeks_to_close", "?")
+ conf = analysis.get("confidence", 0.0)
+
+ lines = [
+ "📉 Coverage 缺口分析 (AI 升級)",
+ f"總 red: {red_summary.get('total_red', 0)} | 最嚴重維度: {worst}",
+ f"預計補齊週數: {weeks}w | AI 信心: {conf:.0%}",
+ "",
+ f"🔍 真因: {cause}",
+ "",
+ "Top Remediation Priorities:",
+ ]
+ for act in (analysis.get("top_remediation_actions") or [])[:3]:
+ pri = act.get("priority", "?")
+ target = html.escape(str(act.get("target", ""))[:60])
+ action = html.escape(str(act.get("action", ""))[:100])
+ effort = act.get("effort", "?")
+ lines.append(f" {pri}. {target} [{effort}]")
+ lines.append(f" ↳ {action}")
+ lines.append("")
+ lines.append("決策: 人工評估補覆蓋排程")
+
+ msg = "\n".join(lines)
+ tg = get_telegram_gateway()
+ await tg._send_request("sendMessage", { # type: ignore[attr-defined]
+ "chat_id": settings.OPENCLAW_TG_CHAT_ID,
+ "text": msg,
+ "parse_mode": "HTML",
+ "disable_web_page_preview": True,
+ })
+ except Exception as e:
+ logger.warning("coverage_telegram_failed", error=str(e))
+
+
# ============================================================================
# 查最新 run_id
# ============================================================================