feat(capacity_forecaster): Gap 3 LLM 升級 — 從 threshold 到 AI 決策
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
Audit 發現 9 個新 scanner 中有 8 個是純 threshold 判斷,只有 Hermes 1 個使用 LLM.
統帥指示「朝 AI 自主化方向」→ Gap 3 開始把 threshold 升級 LLM.
第 1 個升級: capacity_forecaster (戰略價值最高)
原邏輯 _derive_actions 是硬編 keyword → action mapping:
disk → "清理 /var/log, /var/lib/docker, PG WAL"
mem → "檢查 top mem consumer, 考慮加記憶體"
cpu → "分析 top CPU process, 考慮擴充 vCPU"
新增 _llm_analyze_risk (~60 行):
用 OpenClaw 對每個高風險 host 跑 LLM 分析
Prompt 含:
- host + findings (Prometheus predict_linear 結果)
- 主機架構說明 (110 Harbor / 120-121 K3s / 188 PG 等)
LLM JSON 輸出:
- root_causes (3 個候選真因,繁中)
- priority_actions (high/medium/low + 具體指令 hint)
- urgency_days (0-30)
- confidence (0-1)
3-path JSON parse fallback (直接 / NemoTron wrapper / description 巢狀)
_write_recommendation_aol: 加 llm_analysis 到 output_payload
_send_telegram_forecast: 含 AI 判定 (緊急天數 + 信心 + top 2 action)
LLM 失敗時 fallback _derive_actions 硬編建議
對齊統帥鐵律:
✅ AI 分析 + 人工決策 (仍 requires_human_decision=True)
✅ 不寫死修復動作 (LLM 根據 host 實際狀況產)
✅ root_causes 考慮 host 主機架構 context
Gap 3 進度: 1/8 service 升級 LLM (capacity_forecaster)
剩下 compliance_scanner / coverage_evaluator 等 7 個留後續
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -95,10 +95,13 @@ async def run_capacity_forecaster_loop() -> None:
|
||||
|
||||
|
||||
async def forecast_once() -> dict[str, Any]:
|
||||
"""跑一次預測,對每個高風險 host 留痕 + 推 Telegram."""
|
||||
"""跑一次預測,對每個高風險 host 留痕 + LLM 分析 + 推 Telegram."""
|
||||
started_ms = _time.time()
|
||||
stats: dict[str, Any] = {"queries_run": 0, "high_risk_hosts": 0, "recommendations": 0}
|
||||
risks: dict[str, list[dict[str, Any]]] = {} # host -> [{query, value, reason}]
|
||||
stats: dict[str, Any] = {
|
||||
"queries_run": 0, "high_risk_hosts": 0, "recommendations": 0, "llm_analyzed": 0,
|
||||
}
|
||||
risks: dict[str, list[dict[str, Any]]] = {}
|
||||
llm_analyses: dict[str, dict[str, Any]] = {}
|
||||
error_msg: str | None = None
|
||||
|
||||
try:
|
||||
@@ -114,13 +117,21 @@ async def forecast_once() -> dict[str, Any]:
|
||||
|
||||
stats["high_risk_hosts"] = len(risks)
|
||||
|
||||
# v2 Gap 3 LLM 升級: 對每個高風險 host 跑 LLM 分析產具體建議
|
||||
# (原 _derive_actions 是硬編 keyword mapping, LLM 能看完整 context 產客製建議)
|
||||
for host, findings in risks.items():
|
||||
ok = await _write_recommendation_aol(host, findings)
|
||||
analysis = await _llm_analyze_risk(host, findings)
|
||||
if analysis:
|
||||
llm_analyses[host] = analysis
|
||||
stats["llm_analyzed"] += 1
|
||||
|
||||
for host, findings in risks.items():
|
||||
ok = await _write_recommendation_aol(host, findings, llm_analyses.get(host))
|
||||
if ok:
|
||||
stats["recommendations"] += 1
|
||||
|
||||
if risks:
|
||||
await _send_telegram_forecast(risks)
|
||||
await _send_telegram_forecast(risks, llm_analyses)
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"{type(e).__name__}: {e}"[:1000]
|
||||
@@ -132,11 +143,85 @@ async def forecast_once() -> dict[str, Any]:
|
||||
queries=stats["queries_run"],
|
||||
hosts=stats["high_risk_hosts"],
|
||||
recommendations=stats["recommendations"],
|
||||
llm_analyzed=stats["llm_analyzed"],
|
||||
duration_ms=duration_ms,
|
||||
)
|
||||
return stats
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# v2 Gap 3 LLM 分析 — 統帥鐵律「朝 AI 自主化方向」
|
||||
# ============================================================================
|
||||
|
||||
_LLM_FORECAST_PROMPT = """你是 AWOOOI 容量規劃專家。以下 host 過去 7 天趨勢顯示高風險,請分析真因並給具體可執行建議。
|
||||
|
||||
## Host
|
||||
{host}
|
||||
|
||||
## Prometheus 預測命中
|
||||
{findings_json}
|
||||
|
||||
## 當前主機環境資訊
|
||||
- 主機架構: 110 (Harbor/Gitea/監控), 112 (Security), 120/121 (K3s), 125 (K3s backup), 188 (PG/Redis/Ollama/MinIO)
|
||||
- 判斷請考慮: 該主機上跑什麼服務、常見瓶頸模式
|
||||
|
||||
## 輸出規格 (必須是合法 JSON,純 JSON 無前後文字)
|
||||
{{
|
||||
"root_causes": ["3 個候選真因,繁中"],
|
||||
"priority_actions": [
|
||||
{{"priority": "high|medium|low", "action": "具體動作 (繁中)", "command_hint": "可執行指令 hint"}}
|
||||
],
|
||||
"urgency_days": 0-30,
|
||||
"confidence": 0.0-1.0
|
||||
}}
|
||||
|
||||
## 分析方向 (不要寫死 hardcoded reason)
|
||||
- disk_saturation: 查是哪類檔案增長 (container images / PG WAL / 日誌 / build cache)
|
||||
- mem: 查哪個 process 佔最多 (JVM / Redis / cache thrashing)
|
||||
- cpu: 看是 runtime 壓力還是 cron / batch job
|
||||
"""
|
||||
|
||||
|
||||
def _extract_json_object(raw: str) -> str:
    """Return the outermost ``{...}`` span of *raw*, or *raw* stripped if none.

    LLM replies often wrap the JSON in a Markdown fence (with or without a
    language tag) or add prose before/after it.  Slicing from the first ``{``
    to the last ``}`` handles all of those without relying on
    ``str.lstrip("json")``, which strips a *character set* rather than a
    prefix.
    """
    start = raw.find("{")
    end = raw.rfind("}")
    if start == -1 or end < start:
        return raw.strip()
    return raw[start : end + 1]


def _normalize_llm_analysis(candidate: Any, provider: Any) -> dict[str, Any] | None:
    """Validate one decoded candidate and coerce the fields consumers format.

    Returns ``None`` when *candidate* is not a dict carrying a list under
    ``priority_actions``.  Otherwise returns a shallow copy in which:

    - ``priority_actions`` keeps only dict entries (callers use ``.get``);
    - ``urgency_days`` is an int clamped to 0..30, ``confidence`` a float
      clamped to 0..1 (the ranges the prompt asks for); a value that cannot
      be read as a finite number is dropped so callers fall back to their
      own defaults instead of crashing on a format spec;
    - ``_llm_provider`` records which provider produced the answer.
    """
    import math

    if not isinstance(candidate, dict):
        return None
    actions = candidate.get("priority_actions")
    if not isinstance(actions, list):
        return None

    analysis: dict[str, Any] = dict(candidate)
    analysis["priority_actions"] = [a for a in actions if isinstance(a, dict)]

    for key, upper, as_int in (("urgency_days", 30.0, True), ("confidence", 1.0, False)):
        if key not in analysis:
            continue
        try:
            number = float(analysis[key])
        except (TypeError, ValueError):
            del analysis[key]
            continue
        if not math.isfinite(number):
            # json.loads accepts NaN/Infinity; neither is a meaningful score.
            del analysis[key]
            continue
        number = min(upper, max(0.0, number))
        analysis[key] = int(round(number)) if as_int else number

    analysis["_llm_provider"] = provider
    return analysis


async def _llm_analyze_risk(host: str, findings: list[dict[str, Any]]) -> dict[str, Any] | None:
    """Ask OpenClaw to analyse one high-risk host; never raises.

    Args:
        host: Host identifier substituted into the prompt.
        findings: Prediction hits for that host; serialised to JSON for the prompt.

    Returns:
        The decoded analysis dict (with ``_llm_provider`` added and numeric
        fields normalised), or ``None`` when the call fails, the reply is not
        parseable JSON, or it carries no ``priority_actions`` list.  Callers
        treat ``None`` as "use the hard-coded fallback".

    Parse paths tried, in order:
        1. the reply itself is the analysis object;
        2. the reply is a wrapper whose ``description`` is a JSON string;
        3. the reply is a wrapper whose ``description`` is already an object.
    """
    try:
        import json as _j

        from src.services.openclaw import get_openclaw

        prompt = _LLM_FORECAST_PROMPT.format(
            host=host,
            findings_json=_j.dumps(findings, ensure_ascii=False, indent=2),
        )
        text, provider, success = await get_openclaw().call(prompt)
        if not success or not text:
            return None

        raw = _extract_json_object(text)
        try:
            parsed = _j.loads(raw)
            analysis = _normalize_llm_analysis(parsed, provider)
            if analysis is None and isinstance(parsed, dict) and "description" in parsed:
                # Wrapper fallback: the real payload is nested under "description".
                inner = parsed["description"]
                if isinstance(inner, str):
                    inner = _j.loads(_extract_json_object(inner))
                analysis = _normalize_llm_analysis(inner, provider)
        except ValueError as e:  # json.JSONDecodeError is a ValueError subclass
            logger.warning("forecast_llm_parse_failed", host=host, error=str(e), raw=raw[:200])
            return None

        if analysis is None:
            # Valid JSON but not the requested shape: log it so the fallback
            # to hard-coded actions is visible rather than silent.
            logger.warning(
                "forecast_llm_parse_failed",
                host=host,
                error="no usable priority_actions in LLM output",
                raw=raw[:200],
            )
        return analysis
    except Exception as e:
        # Best-effort boundary: LLM analysis must not block the forecast run.
        logger.warning("forecast_llm_error", host=host, error=str(e))
        return None
|
||||
|
||||
|
||||
async def _run_prom_query(promql: str) -> dict[str, float]:
|
||||
"""跑 Prometheus instant query, 回傳 {host: value}."""
|
||||
url = f"{settings.PROMETHEUS_URL.rstrip('/')}/api/v1/query"
|
||||
@@ -163,19 +248,25 @@ async def _run_prom_query(promql: str) -> dict[str, float]:
|
||||
return {}
|
||||
|
||||
|
||||
async def _write_recommendation_aol(host: str, findings: list[dict[str, Any]]) -> bool:
|
||||
"""寫 aol(capacity_recommendation)."""
|
||||
async def _write_recommendation_aol(
|
||||
host: str,
|
||||
findings: list[dict[str, Any]],
|
||||
llm_analysis: dict[str, Any] | None = None,
|
||||
) -> bool:
|
||||
"""寫 aol(capacity_recommendation) + LLM 分析結果."""
|
||||
try:
|
||||
from sqlalchemy import text as _sql
|
||||
from src.db.base import get_db_context
|
||||
|
||||
input_payload = {"host": host, "forecast_horizon_days": 7, "findings_count": len(findings)}
|
||||
output_payload = {
|
||||
output_payload: dict[str, Any] = {
|
||||
"host": host,
|
||||
"findings": findings,
|
||||
"proposed_actions": _derive_actions(findings),
|
||||
"requires_human_decision": True,
|
||||
}
|
||||
if llm_analysis:
|
||||
output_payload["llm_analysis"] = llm_analysis
|
||||
|
||||
async with get_db_context() as db:
|
||||
await db.execute(
|
||||
@@ -219,8 +310,11 @@ def _derive_actions(findings: list[dict[str, Any]]) -> list[str]:
|
||||
return actions
|
||||
|
||||
|
||||
async def _send_telegram_forecast(risks: dict[str, list[dict[str, Any]]]) -> bool:
|
||||
"""推 Telegram 預測摘要."""
|
||||
async def _send_telegram_forecast(
|
||||
risks: dict[str, list[dict[str, Any]]],
|
||||
llm_analyses: dict[str, dict[str, Any]] | None = None,
|
||||
) -> bool:
|
||||
"""推 Telegram 預測摘要 (含 LLM 分析)."""
|
||||
try:
|
||||
import html
|
||||
from src.services.telegram_gateway import get_telegram_gateway
|
||||
@@ -228,18 +322,31 @@ async def _send_telegram_forecast(risks: dict[str, list[dict[str, Any]]]) -> boo
|
||||
if not settings.OPENCLAW_TG_CHAT_ID:
|
||||
return False
|
||||
|
||||
llm_analyses = llm_analyses or {}
|
||||
lines = [
|
||||
"📈 <b>容量預測 (Phase 4 Holt-Winters MVP)</b>",
|
||||
"📈 <b>容量預測 (Phase 4 AI 升級版)</b>",
|
||||
f"未來 7 天高風險 host: {len(risks)} 台",
|
||||
"",
|
||||
]
|
||||
for host, findings in list(risks.items())[:10]:
|
||||
for host, findings in list(risks.items())[:8]:
|
||||
lines.append(f"🟡 <code>{html.escape(host)}</code>")
|
||||
for f in findings[:3]:
|
||||
lines.append(f" ▸ {html.escape(f['reason'])} (value={f['value']:.2f})")
|
||||
actions = _derive_actions(findings)
|
||||
if actions:
|
||||
lines.append(f" 建議: {html.escape(actions[0])[:100]}")
|
||||
|
||||
ai = llm_analyses.get(host)
|
||||
if ai:
|
||||
urgency = ai.get("urgency_days", "?")
|
||||
conf = ai.get("confidence", 0.0)
|
||||
lines.append(f" 🤖 AI 判定: 緊急 {urgency}d, 信心 {conf:.0%}")
|
||||
for act in (ai.get("priority_actions") or [])[:2]:
|
||||
pri = act.get("priority", "")
|
||||
detail = html.escape(str(act.get("action", ""))[:100])
|
||||
lines.append(f" ▸ [{pri}] {detail}")
|
||||
else:
|
||||
# LLM fallback: 用 hardcoded _derive_actions
|
||||
actions = _derive_actions(findings)
|
||||
if actions:
|
||||
lines.append(f" 建議: {html.escape(actions[0])[:100]}")
|
||||
lines.append("")
|
||||
lines.append("決策: 人工評估擴容/清理時機")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user