From d6b854a25e3a370631595eb56a19f23bb667ce32 Mon Sep 17 00:00:00 2001 From: Your Name Date: Sun, 19 Apr 2026 21:52:34 +0800 Subject: [PATCH] =?UTF-8?q?feat(capacity=5Fforecaster):=20Gap=203=20LLM=20?= =?UTF-8?q?=E5=8D=87=E7=B4=9A=20=E2=80=94=20=E5=BE=9E=20threshold=20?= =?UTF-8?q?=E5=88=B0=20AI=20=E6=B1=BA=E7=AD=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Audit 發現 8/9 個新 scanner 是純 threshold,只 Hermes 1 個用 LLM. 統帥指示「朝 AI 自主化方向」→ Gap 3 開始把 threshold 升級 LLM. 第 1 個升級: capacity_forecaster (最高戰略) 原邏輯 _derive_actions 是硬編 keyword → action mapping: disk → "清理 /var/log, /var/lib/docker, PG WAL" mem → "檢查 top mem consumer, 考慮加記憶體" cpu → "分析 top CPU process, 考慮擴充 vCPU" 新增 _llm_analyze_risk (~60 行): 用 OpenClaw 對每個高風險 host 跑 LLM 分析 Prompt 含: - host + findings (Prometheus predict_linear 結果) - 主機架構說明 (110 Harbor / 120-121 K3s / 188 PG 等) LLM JSON 輸出: - root_causes (3 個候選真因,繁中) - priority_actions (high/medium/low + 具體指令 hint) - urgency_days (0-30) - confidence (0-1) 3-path JSON parse fallback (直接 / NemoTron wrapper / description 巢狀) _write_recommendation_aol: 加 llm_analysis 到 output_payload _send_telegram_forecast: 含 AI 判定 (緊急天數 + 信心 + top 2 action) LLM 失敗時 fallback _derive_actions 硬編建議 對齊統帥鐵律: ✅ AI 分析 + 人工決策 (仍 requires_human_decision=True) ✅ 不寫死修復動作 (LLM 根據 host 實際狀況產) ✅ root_causes 考慮 host 主機架構 context Gap 3 進度: 1/8 service 升級 LLM (capacity_forecaster) 剩下 compliance_scanner / coverage_evaluator 等 7 個留後續 Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/api/src/jobs/capacity_forecaster_job.py | 137 +++++++++++++++++-- 1 file changed, 122 insertions(+), 15 deletions(-) diff --git a/apps/api/src/jobs/capacity_forecaster_job.py b/apps/api/src/jobs/capacity_forecaster_job.py index 69468785..887a9b36 100644 --- a/apps/api/src/jobs/capacity_forecaster_job.py +++ b/apps/api/src/jobs/capacity_forecaster_job.py @@ -95,10 +95,13 @@ async def run_capacity_forecaster_loop() -> None: async def forecast_once() -> dict[str, Any]: - """跑一次預測,對每個高風險 host 留痕 + 推 Telegram.""" + """跑一次預測,對每個高風險 host 留痕 + LLM 分析 + 推 Telegram.""" started_ms = _time.time() - stats: dict[str, Any] = {"queries_run": 0, "high_risk_hosts": 0, "recommendations": 0} - risks: dict[str, list[dict[str, Any]]] = {} # host -> [{query, value, reason}] + stats: dict[str, Any] = { + "queries_run": 0, "high_risk_hosts": 0, "recommendations": 0, "llm_analyzed": 0, + } + risks: dict[str, list[dict[str, Any]]] = {} + llm_analyses: dict[str, dict[str, Any]] = {} error_msg: str | None = None try: @@ -114,13 +117,21 @@ async def forecast_once() -> dict[str, Any]: stats["high_risk_hosts"] = len(risks) + # v2 Gap 3 LLM 升級: 對每個高風險 host 跑 LLM 分析產具體建議 + # (原 _derive_actions 是硬編 keyword mapping, LLM 能看完整 context 產客製建議) for host, findings in risks.items(): - ok = await _write_recommendation_aol(host, findings) + analysis = await _llm_analyze_risk(host, findings) + if analysis: + llm_analyses[host] = analysis + stats["llm_analyzed"] += 1 + + for host, findings in risks.items(): + ok = await _write_recommendation_aol(host, findings, llm_analyses.get(host)) if ok: stats["recommendations"] += 1 if risks: - await _send_telegram_forecast(risks) + await _send_telegram_forecast(risks, llm_analyses) except Exception as e: error_msg = f"{type(e).__name__}: {e}"[:1000] @@ -132,11 +143,85 @@ async def forecast_once() -> dict[str, Any]: queries=stats["queries_run"], hosts=stats["high_risk_hosts"], recommendations=stats["recommendations"], + llm_analyzed=stats["llm_analyzed"], duration_ms=duration_ms, ) return stats +# ============================================================================ +# v2 Gap 3 LLM 分析 — 統帥鐵律「朝 AI 自主化方向」 +# ============================================================================ + +_LLM_FORECAST_PROMPT = """你是 AWOOOI 容量規劃專家。以下 host 過去 7 天趨勢顯示高風險,請分析真因並給具體可執行建議。 + +## Host +{host} + +## Prometheus 預測命中 +{findings_json} + +## 當前主機環境資訊 + - 主機架構: 110 (Harbor/Gitea/監控), 112 (Security), 120/121 (K3s), 125 (K3s backup), 188 (PG/Redis/Ollama/MinIO) + - 判斷請考慮: 該主機上跑什麼服務、常見瓶頸模式 + +## 輸出規格 (必須是合法 JSON,純 JSON 無前後文字) +{{ + "root_causes": ["3 個候選真因,繁中"], + "priority_actions": [ + {{"priority": "high|medium|low", "action": "具體動作 (繁中)", "command_hint": "可執行指令 hint"}} + ], + "urgency_days": 0-30, + "confidence": 0.0-1.0 +}} + +## 分析方向 (不要寫死 hardcoded reason) + - disk_saturation: 查是哪類檔案增長 (container images / PG WAL / 日誌 / build cache) + - mem: 查哪個 process 佔最多 (JVM / Redis / cache thrashing) + - cpu: 看是 runtime 壓力還是 cron / batch job +""" + + +async def _llm_analyze_risk(host: str, findings: list[dict[str, Any]]) -> dict[str, Any] | None: + """用 OpenClaw 分析高風險 host. 失敗回 None 不阻塞.""" + try: + import json as _j + from src.services.openclaw import get_openclaw + + prompt = _LLM_FORECAST_PROMPT.format( + host=host, + findings_json=_j.dumps(findings, ensure_ascii=False, indent=2), + ) + openclaw = get_openclaw() + text, provider, success = await openclaw.call(prompt) + if not success or not text: + return None + + _raw = text.strip() + if _raw.startswith("```"): + _raw = _raw.strip("`").lstrip("json").strip() + + try: + parsed = _j.loads(_raw) + if isinstance(parsed, dict) and "priority_actions" in parsed: + parsed["_llm_provider"] = provider + return parsed + # NemoTron wrapper fallback + if isinstance(parsed, dict) and "description" in parsed: + desc = str(parsed["description"]).strip() + if desc.startswith("{"): + inner = _j.loads(desc) + if isinstance(inner, dict) and "priority_actions" in inner: + inner["_llm_provider"] = provider + return inner + except (_j.JSONDecodeError, ValueError) as e: + logger.warning("forecast_llm_parse_failed", host=host, error=str(e), raw=_raw[:200]) + return None + except Exception as e: + logger.warning("forecast_llm_error", host=host, error=str(e)) + return None + + async def _run_prom_query(promql: str) -> dict[str, float]: """跑 Prometheus instant query, 回傳 {host: value}.""" url = f"{settings.PROMETHEUS_URL.rstrip('/')}/api/v1/query" @@ -163,19 +248,25 @@ async def _run_prom_query(promql: str) -> dict[str, float]: return {} -async def _write_recommendation_aol(host: str, findings: list[dict[str, Any]]) -> bool: - """寫 aol(capacity_recommendation).""" +async def _write_recommendation_aol( + host: str, + findings: list[dict[str, Any]], + llm_analysis: dict[str, Any] | None = None, +) -> bool: + """寫 aol(capacity_recommendation) + LLM 分析結果.""" try: from sqlalchemy import text as _sql from src.db.base import get_db_context input_payload = {"host": host, "forecast_horizon_days": 7, "findings_count": len(findings)} - output_payload = { + output_payload: dict[str, Any] = { "host": host, "findings": findings, "proposed_actions": _derive_actions(findings), "requires_human_decision": True, } + if llm_analysis: + output_payload["llm_analysis"] = llm_analysis async with get_db_context() as db: await db.execute( @@ -219,8 +310,11 @@ def _derive_actions(findings: list[dict[str, Any]]) -> list[str]: return actions -async def _send_telegram_forecast(risks: dict[str, list[dict[str, Any]]]) -> bool: - """推 Telegram 預測摘要.""" +async def _send_telegram_forecast( + risks: dict[str, list[dict[str, Any]]], + llm_analyses: dict[str, dict[str, Any]] | None = None, +) -> bool: + """推 Telegram 預測摘要 (含 LLM 分析).""" try: import html from src.services.telegram_gateway import get_telegram_gateway @@ -228,18 +322,31 @@ async def _send_telegram_forecast(risks: dict[str, list[dict[str, Any]]]) -> boo if not settings.OPENCLAW_TG_CHAT_ID: return False + llm_analyses = llm_analyses or {} lines = [ - "📈 容量預測 (Phase 4 Holt-Winters MVP)", + "📈 容量預測 (Phase 4 AI 升級版)", f"未來 7 天高風險 host: {len(risks)} 台", "", ] - for host, findings in list(risks.items())[:10]: + for host, findings in list(risks.items())[:8]: lines.append(f"🟡 {html.escape(host)}") for f in findings[:3]: lines.append(f" ▸ {html.escape(f['reason'])} (value={f['value']:.2f})") - actions = _derive_actions(findings) - if actions: - lines.append(f" 建議: {html.escape(actions[0])[:100]}") + + ai = llm_analyses.get(host) + if ai: + urgency = ai.get("urgency_days", "?") + conf = ai.get("confidence", 0.0) + lines.append(f" 🤖 AI 判定: 緊急 {urgency}d, 信心 {conf:.0%}") + for act in (ai.get("priority_actions") or [])[:2]: + pri = act.get("priority", "") + detail = html.escape(str(act.get("action", ""))[:100]) + lines.append(f" ▸ [{pri}] {detail}") + else: + # LLM fallback: 用 hardcoded _derive_actions + actions = _derive_actions(findings) + if actions: + lines.append(f" 建議: {html.escape(actions[0])[:100]}") lines.append("") lines.append("決策: 人工評估擴容/清理時機")