feat(capacity_forecaster): Gap 3 LLM 升級 — 從 threshold 到 AI 決策
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled

Audit 發現 8/9 個新 scanner 是純 threshold,只 Hermes 1 個用 LLM.
統帥指示「朝 AI 自主化方向」→ Gap 3 開始把 threshold 升級 LLM.

第 1 個升級: capacity_forecaster (最高戰略)
原邏輯 _derive_actions 是硬編 keyword → action mapping:
  disk → "清理 /var/log, /var/lib/docker, PG WAL"
  mem  → "檢查 top mem consumer, 考慮加記憶體"
  cpu  → "分析 top CPU process, 考慮擴充 vCPU"

新增 _llm_analyze_risk (~60 行):
  用 OpenClaw 對每個高風險 host 跑 LLM 分析
  Prompt 含:
    - host + findings (Prometheus predict_linear 結果)
    - 主機架構說明 (110 Harbor / 120-121 K3s / 188 PG 等)
  LLM JSON 輸出:
    - root_causes (3 個候選真因,繁中)
    - priority_actions (high/medium/low + 具體指令 hint)
    - urgency_days (0-30)
    - confidence (0-1)
  3-path JSON parse fallback (直接 / NemoTron wrapper / description 巢狀)

_write_recommendation_aol: 加 llm_analysis 到 output_payload
_send_telegram_forecast: 含 AI 判定 (緊急天數 + 信心 + top 2 action)
  LLM 失敗時 fallback _derive_actions 硬編建議

對齊統帥鐵律:
   AI 分析 + 人工決策 (仍 requires_human_decision=True)
   不寫死修復動作 (LLM 根據 host 實際狀況產)
   root_causes 考慮 host 主機架構 context

Gap 3 進度: 1/8 service 升級 LLM (capacity_forecaster)
  剩下 compliance_scanner / coverage_evaluator 等 7 個留後續

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Your Name
2026-04-19 21:52:34 +08:00
parent 97154d12fa
commit d6b854a25e

View File

@@ -95,10 +95,13 @@ async def run_capacity_forecaster_loop() -> None:
async def forecast_once() -> dict[str, Any]: async def forecast_once() -> dict[str, Any]:
"""跑一次預測,對每個高風險 host 留痕 + 推 Telegram.""" """跑一次預測,對每個高風險 host 留痕 + LLM 分析 + 推 Telegram."""
started_ms = _time.time() started_ms = _time.time()
stats: dict[str, Any] = {"queries_run": 0, "high_risk_hosts": 0, "recommendations": 0} stats: dict[str, Any] = {
risks: dict[str, list[dict[str, Any]]] = {} # host -> [{query, value, reason}] "queries_run": 0, "high_risk_hosts": 0, "recommendations": 0, "llm_analyzed": 0,
}
risks: dict[str, list[dict[str, Any]]] = {}
llm_analyses: dict[str, dict[str, Any]] = {}
error_msg: str | None = None error_msg: str | None = None
try: try:
@@ -114,13 +117,21 @@ async def forecast_once() -> dict[str, Any]:
stats["high_risk_hosts"] = len(risks) stats["high_risk_hosts"] = len(risks)
# v2 Gap 3 LLM 升級: 對每個高風險 host 跑 LLM 分析產具體建議
# (原 _derive_actions 是硬編 keyword mapping, LLM 能看完整 context 產客製建議)
for host, findings in risks.items(): for host, findings in risks.items():
ok = await _write_recommendation_aol(host, findings) analysis = await _llm_analyze_risk(host, findings)
if analysis:
llm_analyses[host] = analysis
stats["llm_analyzed"] += 1
for host, findings in risks.items():
ok = await _write_recommendation_aol(host, findings, llm_analyses.get(host))
if ok: if ok:
stats["recommendations"] += 1 stats["recommendations"] += 1
if risks: if risks:
await _send_telegram_forecast(risks) await _send_telegram_forecast(risks, llm_analyses)
except Exception as e: except Exception as e:
error_msg = f"{type(e).__name__}: {e}"[:1000] error_msg = f"{type(e).__name__}: {e}"[:1000]
@@ -132,11 +143,85 @@ async def forecast_once() -> dict[str, Any]:
queries=stats["queries_run"], queries=stats["queries_run"],
hosts=stats["high_risk_hosts"], hosts=stats["high_risk_hosts"],
recommendations=stats["recommendations"], recommendations=stats["recommendations"],
llm_analyzed=stats["llm_analyzed"],
duration_ms=duration_ms, duration_ms=duration_ms,
) )
return stats return stats
# ============================================================================
# v2 Gap 3 LLM 分析 — 統帥鐵律「朝 AI 自主化方向」
# ============================================================================
_LLM_FORECAST_PROMPT = """你是 AWOOOI 容量規劃專家。以下 host 過去 7 天趨勢顯示高風險,請分析真因並給具體可執行建議。
## Host
{host}
## Prometheus 預測命中
{findings_json}
## 當前主機環境資訊
- 主機架構: 110 (Harbor/Gitea/監控), 112 (Security), 120/121 (K3s), 125 (K3s backup), 188 (PG/Redis/Ollama/MinIO)
- 判斷請考慮: 該主機上跑什麼服務、常見瓶頸模式
## 輸出規格 (必須是合法 JSON,純 JSON 無前後文字)
{{
"root_causes": ["3 個候選真因,繁中"],
"priority_actions": [
{{"priority": "high|medium|low", "action": "具體動作 (繁中)", "command_hint": "可執行指令 hint"}}
],
"urgency_days": 0-30,
"confidence": 0.0-1.0
}}
## 分析方向 (不要寫死 hardcoded reason)
- disk_saturation: 查是哪類檔案增長 (container images / PG WAL / 日誌 / build cache)
- mem: 查哪個 process 佔最多 (JVM / Redis / cache thrashing)
- cpu: 看是 runtime 壓力還是 cron / batch job
"""
async def _llm_analyze_risk(host: str, findings: list[dict[str, Any]]) -> dict[str, Any] | None:
"""用 OpenClaw 分析高風險 host. 失敗回 None 不阻塞."""
try:
import json as _j
from src.services.openclaw import get_openclaw
prompt = _LLM_FORECAST_PROMPT.format(
host=host,
findings_json=_j.dumps(findings, ensure_ascii=False, indent=2),
)
openclaw = get_openclaw()
text, provider, success = await openclaw.call(prompt)
if not success or not text:
return None
_raw = text.strip()
if _raw.startswith("```"):
_raw = _raw.strip("`").lstrip("json").strip()
try:
parsed = _j.loads(_raw)
if isinstance(parsed, dict) and "priority_actions" in parsed:
parsed["_llm_provider"] = provider
return parsed
# NemoTron wrapper fallback
if isinstance(parsed, dict) and "description" in parsed:
desc = str(parsed["description"]).strip()
if desc.startswith("{"):
inner = _j.loads(desc)
if isinstance(inner, dict) and "priority_actions" in inner:
inner["_llm_provider"] = provider
return inner
except (_j.JSONDecodeError, ValueError) as e:
logger.warning("forecast_llm_parse_failed", host=host, error=str(e), raw=_raw[:200])
return None
except Exception as e:
logger.warning("forecast_llm_error", host=host, error=str(e))
return None
async def _run_prom_query(promql: str) -> dict[str, float]: async def _run_prom_query(promql: str) -> dict[str, float]:
"""跑 Prometheus instant query, 回傳 {host: value}.""" """跑 Prometheus instant query, 回傳 {host: value}."""
url = f"{settings.PROMETHEUS_URL.rstrip('/')}/api/v1/query" url = f"{settings.PROMETHEUS_URL.rstrip('/')}/api/v1/query"
@@ -163,19 +248,25 @@ async def _run_prom_query(promql: str) -> dict[str, float]:
return {} return {}
async def _write_recommendation_aol(host: str, findings: list[dict[str, Any]]) -> bool: async def _write_recommendation_aol(
"""寫 aol(capacity_recommendation).""" host: str,
findings: list[dict[str, Any]],
llm_analysis: dict[str, Any] | None = None,
) -> bool:
"""寫 aol(capacity_recommendation) + LLM 分析結果."""
try: try:
from sqlalchemy import text as _sql from sqlalchemy import text as _sql
from src.db.base import get_db_context from src.db.base import get_db_context
input_payload = {"host": host, "forecast_horizon_days": 7, "findings_count": len(findings)} input_payload = {"host": host, "forecast_horizon_days": 7, "findings_count": len(findings)}
output_payload = { output_payload: dict[str, Any] = {
"host": host, "host": host,
"findings": findings, "findings": findings,
"proposed_actions": _derive_actions(findings), "proposed_actions": _derive_actions(findings),
"requires_human_decision": True, "requires_human_decision": True,
} }
if llm_analysis:
output_payload["llm_analysis"] = llm_analysis
async with get_db_context() as db: async with get_db_context() as db:
await db.execute( await db.execute(
@@ -219,8 +310,11 @@ def _derive_actions(findings: list[dict[str, Any]]) -> list[str]:
return actions return actions
async def _send_telegram_forecast(risks: dict[str, list[dict[str, Any]]]) -> bool: async def _send_telegram_forecast(
"""推 Telegram 預測摘要.""" risks: dict[str, list[dict[str, Any]]],
llm_analyses: dict[str, dict[str, Any]] | None = None,
) -> bool:
"""推 Telegram 預測摘要 (含 LLM 分析)."""
try: try:
import html import html
from src.services.telegram_gateway import get_telegram_gateway from src.services.telegram_gateway import get_telegram_gateway
@@ -228,18 +322,31 @@ async def _send_telegram_forecast(risks: dict[str, list[dict[str, Any]]]) -> boo
if not settings.OPENCLAW_TG_CHAT_ID: if not settings.OPENCLAW_TG_CHAT_ID:
return False return False
llm_analyses = llm_analyses or {}
lines = [ lines = [
"📈 <b>容量預測 (Phase 4 Holt-Winters MVP)</b>", "📈 <b>容量預測 (Phase 4 AI 升級版)</b>",
f"未來 7 天高風險 host: {len(risks)}", f"未來 7 天高風險 host: {len(risks)}",
"", "",
] ]
for host, findings in list(risks.items())[:10]: for host, findings in list(risks.items())[:8]:
lines.append(f"🟡 <code>{html.escape(host)}</code>") lines.append(f"🟡 <code>{html.escape(host)}</code>")
for f in findings[:3]: for f in findings[:3]:
lines.append(f"{html.escape(f['reason'])} (value={f['value']:.2f})") lines.append(f"{html.escape(f['reason'])} (value={f['value']:.2f})")
actions = _derive_actions(findings)
if actions: ai = llm_analyses.get(host)
lines.append(f" 建議: {html.escape(actions[0])[:100]}") if ai:
urgency = ai.get("urgency_days", "?")
conf = ai.get("confidence", 0.0)
lines.append(f" 🤖 AI 判定: 緊急 {urgency}d, 信心 {conf:.0%}")
for act in (ai.get("priority_actions") or [])[:2]:
pri = act.get("priority", "")
detail = html.escape(str(act.get("action", ""))[:100])
lines.append(f" ▸ [{pri}] {detail}")
else:
# LLM fallback: 用 hardcoded _derive_actions
actions = _derive_actions(findings)
if actions:
lines.append(f" 建議: {html.escape(actions[0])[:100]}")
lines.append("") lines.append("")
lines.append("決策: 人工評估擴容/清理時機") lines.append("決策: 人工評估擴容/清理時機")