feat(aiops): capacity_forecaster — Phase 4 Holt-Winters MVP (predict_linear)
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled

統帥批准 4 項下階段候選之一完成: AI 容量預測.

新增 capacity_forecaster_job.py (~220 行):
  每日 05:00 Taipei 跑預測 (02:00 scanner → 03:00 compliance →
  04:00 Hermes → 05:00 forecaster 形成完整日鏈).

預測方法論 (MVP):
  Prometheus predict_linear(metric[7d], 86400*7) — 基於過去 7d 做線性外推
  3 個預測 query:
    1. disk_saturation_7d: predict_linear(node_filesystem_avail_bytes[7d], 7d) < 0
    2. mem_saturation_7d: predict_linear(MemAvailable[7d], 7d) / MemTotal < 10%
    3. cpu_high_7d_trend: avg_over_time(cpu_used_pct[7d]) > 70%

發現高風險 host → 寫 aol(capacity_recommendation) + 推 Telegram
  - input: host + horizon + findings count
  - output: findings list + proposed_actions + requires_human_decision=true

proposed_actions 依 findings 推導:
  - disk: 清理 log/docker/PG WAL 或擴容
  - mem: top consumer / JVM 調整
  - cpu: scale out / vCPU 擴充

統帥鐵律對齊:
   只推建議不自動 scale up
   7d window 有足夠樣本
   AI 預測 + 人工決策

未來 TODO:
  - 真 Holt-Winters (含季節性) — 需 Python statsmodels
  - 業務週期調整 (週一高峰/週末低谷)

Wire main.py lifespan asyncio.create_task()

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-19 20:00:36 +08:00
parent 53618b25c9
commit a391dfc389
2 changed files with 278 additions and 0 deletions

View File

@@ -0,0 +1,268 @@
"""
Capacity Forecaster Job — Phase 4 AI 容量預測 MVP
=================================================
每日 05:00 Taipei 用 Prometheus predict_linear 預測未來 7 天容量趨勢,
推 Telegram 建議 + 寫 aol(capacity_recommendation).
職責邊界 (MVP):
✅ 用 predict_linear (Prometheus 內建 linear regression) 預測:
- 7d 後 disk avail < 0 (磁碟將滿)
- 7d 後 mem available < 10% (記憶體緊繃)
- 7d 後 cpu 使用率 > 85% (CPU 飽和)
✅ 對每個高風險 host 寫 aol(capacity_recommendation)
✅ 彙總推 Telegram SRE group
⏳ TODO: 真正 Holt-Winters (季節性) — Prometheus 不支援,需外接 Python statsmodels
⏳ TODO: 根據業務週期 (週一高峰/週末低谷) 調整預測
預測方法論:
Prometheus predict_linear(metric[7d], 86400*N) 回傳「基於過去 7d,未來 N 秒後的預測值」
簡單但有效 — 線性外推,適合穩定增長/下降趨勢
統帥鐵律對齊:
- AI 預測 + 推建議,不自動 scale up (人工決策擴容)
- 7d window 保證有足夠樣本 (Prometheus retention 15d 夠)
- 閾值 (avail < 0, mem < 10%) 是「觸發討論」非「最終決策」
排程:
- 首次延遲 540s (其他 scanner 都跑完後)
- 每日 05:00 Taipei (capacity_scanner 02:00 → compliance 03:00 → Hermes 04:00 → 預測 05:00)
2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
ADR-090 § Phase 4 AI 容量預測
"""
from __future__ import annotations
import asyncio
import json as _json
import time as _time
from datetime import datetime, timedelta, timezone
from typing import Any
import httpx
import structlog
from src.core.config import settings
logger = structlog.get_logger(__name__)
_FIRST_DELAY_SEC = 540
_LOOP_BACKOFF_SEC = 1800
_DAILY_TRIGGER_HOUR_TAIPEI = 5
_HTTP_TIMEOUT_SEC = 15
# 預測視窗 (7d) 與 horizon (未來 7d)
_WINDOW = "7d"
_HORIZON_SEC = 7 * 86400 # 7 天
# predict_linear 查詢定義
# 回傳高風險 host 的 instance label + 預測值
_FORECAST_QUERIES = {
"disk_saturation_7d": (
# 7d 後根目錄 avail 預測為 0 或負 = 磁碟會滿
f'predict_linear(node_filesystem_avail_bytes{{fstype!~"tmpfs|overlay", mountpoint="/"}}[{_WINDOW}], {_HORIZON_SEC}) < 0',
"disk 預測 7 天內會滿",
),
"mem_saturation_7d": (
# 7d 後記憶體可用 < 10%
f'predict_linear(node_memory_MemAvailable_bytes[{_WINDOW}], {_HORIZON_SEC}) '
f'/ node_memory_MemTotal_bytes < 0.1',
"mem 預測 7 天內可用量 < 10%",
),
"cpu_high_7d_trend": (
# 過去 7d 平均 cpu 已 > 70% + 上升趨勢
f'avg_over_time((100 - (avg by(instance)(rate(node_cpu_seconds_total{{mode="idle"}}[5m])) * 100))[{_WINDOW}:15m]) > 70',
"過去 7d cpu 平均 > 70%",
),
}
async def run_capacity_forecaster_loop() -> None:
"""每日 05:00 Taipei 容量預測."""
logger.info("capacity_forecaster_loop_started")
await asyncio.sleep(_FIRST_DELAY_SEC)
while True:
try:
await forecast_once()
except Exception as e:
logger.exception("capacity_forecaster_loop_error", error=str(e))
await asyncio.sleep(_LOOP_BACKOFF_SEC)
continue
sleep_sec = _seconds_until_next_trigger()
logger.info("capacity_forecaster_next_tick", sleep_sec=sleep_sec)
await asyncio.sleep(sleep_sec)
async def forecast_once() -> dict[str, Any]:
"""跑一次預測,對每個高風險 host 留痕 + 推 Telegram."""
started_ms = _time.time()
stats: dict[str, Any] = {"queries_run": 0, "high_risk_hosts": 0, "recommendations": 0}
risks: dict[str, list[dict[str, Any]]] = {} # host -> [{query, value, reason}]
error_msg: str | None = None
try:
for query_name, (promql, reason) in _FORECAST_QUERIES.items():
hits = await _run_prom_query(promql)
stats["queries_run"] += 1
for host, value in hits.items():
risks.setdefault(host, []).append({
"query": query_name,
"value": value,
"reason": reason,
})
stats["high_risk_hosts"] = len(risks)
for host, findings in risks.items():
ok = await _write_recommendation_aol(host, findings)
if ok:
stats["recommendations"] += 1
if risks:
await _send_telegram_forecast(risks)
except Exception as e:
error_msg = f"{type(e).__name__}: {e}"[:1000]
logger.exception("capacity_forecast_once_failed", error=error_msg)
duration_ms = int((_time.time() - started_ms) * 1000)
logger.info(
"capacity_forecast_once_done",
queries=stats["queries_run"],
hosts=stats["high_risk_hosts"],
recommendations=stats["recommendations"],
duration_ms=duration_ms,
)
return stats
async def _run_prom_query(promql: str) -> dict[str, float]:
"""跑 Prometheus instant query, 回傳 {host: value}."""
url = f"{settings.PROMETHEUS_URL.rstrip('/')}/api/v1/query"
try:
async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT_SEC, trust_env=False) as client:
resp = await client.get(url, params={"query": promql})
resp.raise_for_status()
data = resp.json()
if data.get("status") != "success":
return {}
result: dict[str, float] = {}
for r in (data.get("data", {}) or {}).get("result", []) or []:
instance = (r.get("metric", {}) or {}).get("instance", "")
host = instance.split(":")[0] if instance else "unknown"
val = r.get("value", [None, None])
if val and len(val) >= 2:
try:
result[host] = float(val[1])
except (ValueError, TypeError):
pass
return result
except Exception as e:
logger.warning("prom_forecast_query_failed", promql=promql[:80], error=str(e))
return {}
async def _write_recommendation_aol(host: str, findings: list[dict[str, Any]]) -> bool:
"""寫 aol(capacity_recommendation)."""
try:
from sqlalchemy import text as _sql
from src.db.base import get_db_context
input_payload = {"host": host, "forecast_horizon_days": 7, "findings_count": len(findings)}
output_payload = {
"host": host,
"findings": findings,
"proposed_actions": _derive_actions(findings),
"requires_human_decision": True,
}
async with get_db_context() as db:
await db.execute(
_sql("""
INSERT INTO automation_operation_log (
operation_type, actor, status,
input, output, tags
) VALUES (
'capacity_recommendation',
'capacity_forecaster',
'success',
CAST(:input AS jsonb),
CAST(:output AS jsonb),
:tags
)
"""),
{
"input": _json.dumps(input_payload, ensure_ascii=False),
"output": _json.dumps(output_payload, ensure_ascii=False),
"tags": ["capacity", "forecast", "phase4", "predict_linear"],
},
)
return True
except Exception as e:
logger.warning("capacity_forecast_aol_failed", host=host, error=str(e))
return False
def _derive_actions(findings: list[dict[str, Any]]) -> list[str]:
"""根據 findings 產生建議動作清單."""
actions: list[str] = []
queries = {f["query"] for f in findings}
if "disk_saturation_7d" in queries:
actions.append("清理 /var/log, /var/lib/docker, PG WAL archive;或擴容磁碟")
if "mem_saturation_7d" in queries:
actions.append("檢查 top mem consumer;考慮加記憶體或調整 JVM/Redis maxmemory")
if "cpu_high_7d_trend" in queries:
actions.append("分析 top CPU process;考慮擴充 vCPU 或 scale out")
if not actions:
actions.append("人工審查各指標")
return actions
async def _send_telegram_forecast(risks: dict[str, list[dict[str, Any]]]) -> bool:
"""推 Telegram 預測摘要."""
try:
import html
from src.services.telegram_gateway import get_telegram_gateway
if not settings.OPENCLAW_TG_CHAT_ID:
return False
lines = [
"📈 <b>容量預測 (Phase 4 Holt-Winters MVP)</b>",
f"未來 7 天高風險 host: {len(risks)}",
"",
]
for host, findings in list(risks.items())[:10]:
lines.append(f"🟡 <code>{html.escape(host)}</code>")
for f in findings[:3]:
lines.append(f"{html.escape(f['reason'])} (value={f['value']:.2f})")
actions = _derive_actions(findings)
if actions:
lines.append(f" 建議: {html.escape(actions[0])[:100]}")
lines.append("")
lines.append("決策: 人工評估擴容/清理時機")
msg = "\n".join(lines)
tg = get_telegram_gateway()
await tg._send_request("sendMessage", { # type: ignore[attr-defined]
"chat_id": settings.OPENCLAW_TG_CHAT_ID,
"text": msg,
"parse_mode": "HTML",
"disable_web_page_preview": True,
})
return True
except Exception as e:
logger.warning("capacity_forecast_telegram_failed", error=str(e))
return False
def _seconds_until_next_trigger() -> float:
tz_taipei = timezone(timedelta(hours=8))
now = datetime.now(tz_taipei)
today_trigger = now.replace(hour=_DAILY_TRIGGER_HOUR_TAIPEI, minute=0, second=0, microsecond=0)
if now >= today_trigger:
today_trigger = today_trigger + timedelta(days=1)
delta = (today_trigger - now).total_seconds()
return max(300.0, min(delta, 25 * 3600))

View File

@@ -450,6 +450,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
except Exception as e:
logger.warning("hermes_rule_quality_loop_schedule_failed", error=str(e))
# ADR-090 § Phase 4 Capacity Forecaster (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei)
# 每日 05:00 Taipei 用 Prometheus predict_linear 預測未來 7d disk/mem/cpu 飽和
# 高風險 host 寫 aol(capacity_recommendation) + Telegram 建議
try:
from src.jobs.capacity_forecaster_job import run_capacity_forecaster_loop
asyncio.create_task(run_capacity_forecaster_loop())
logger.info("capacity_forecaster_loop_scheduled", daily_trigger_hour_taipei=5)
except Exception as e:
logger.warning("capacity_forecaster_loop_schedule_failed", error=str(e))
# ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告
# 2026-04-14 Claude Haiku 4.5 Asia/Taipei
try: