From a391dfc3897f4ef21dddf7bbcb8759ba76aef46f Mon Sep 17 00:00:00 2001 From: OG T Date: Sun, 19 Apr 2026 20:00:36 +0800 Subject: [PATCH] =?UTF-8?q?feat(aiops):=20capacity=5Fforecaster=20?= =?UTF-8?q?=E2=80=94=20Phase=204=20Holt-Winters=20MVP=20(predict=5Flinear)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 統帥批准 4 項下階段候選之一完成: AI 容量預測. 新增 capacity_forecaster_job.py (~220 行): 每日 05:00 Taipei 跑預測 (02:00 scanner → 03:00 compliance → 04:00 Hermes → 05:00 forecaster 形成完整日鏈). 預測方法論 (MVP): Prometheus predict_linear(metric[7d], 86400*7) — 基於過去 7d 做線性外推 3 個預測 query: 1. disk_saturation_7d: predict_linear(node_filesystem_avail_bytes[7d], 7d) < 0 2. mem_saturation_7d: predict_linear(MemAvailable[7d], 7d) / MemTotal < 10% 3. cpu_high_7d_trend: avg_over_time(cpu_used_pct[7d]) > 70% 發現高風險 host → 寫 aol(capacity_recommendation) + 推 Telegram - input: host + horizon + findings count - output: findings list + proposed_actions + requires_human_decision=true proposed_actions 依 findings 推導: - disk: 清理 log/docker/PG WAL 或擴容 - mem: top consumer / JVM 調整 - cpu: scale out / vCPU 擴充 統帥鐵律對齊: ✅ 只推建議不自動 scale up ✅ 7d window 有足夠樣本 ✅ AI 預測 + 人工決策 未來 TODO: - 真 Holt-Winters (含季節性) — 需 Python statsmodels - 業務週期調整 (週一高峰/週末低谷) Wire main.py lifespan asyncio.create_task() Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/api/src/jobs/capacity_forecaster_job.py | 268 +++++++++++++++++++ apps/api/src/main.py | 10 + 2 files changed, 278 insertions(+) create mode 100644 apps/api/src/jobs/capacity_forecaster_job.py diff --git a/apps/api/src/jobs/capacity_forecaster_job.py b/apps/api/src/jobs/capacity_forecaster_job.py new file mode 100644 index 00000000..69468785 --- /dev/null +++ b/apps/api/src/jobs/capacity_forecaster_job.py @@ -0,0 +1,268 @@ +""" +Capacity Forecaster Job — Phase 4 AI 容量預測 MVP +================================================= +每日 05:00 Taipei 用 Prometheus predict_linear 預測未來 7 天容量趨勢, +推 Telegram 建議 + 寫 aol(capacity_recommendation). + +職責邊界 (MVP): + ✅ 用 predict_linear (Prometheus 內建 linear regression) 預測: + - 7d 後 disk avail < 0 (磁碟將滿) + - 7d 後 mem available < 10% (記憶體緊繃) + - 7d 後 cpu 使用率 > 85% (CPU 飽和) + ✅ 對每個高風險 host 寫 aol(capacity_recommendation) + ✅ 彙總推 Telegram SRE group + ⏳ TODO: 真正 Holt-Winters (季節性) — Prometheus 不支援,需外接 Python statsmodels + ⏳ TODO: 根據業務週期 (週一高峰/週末低谷) 調整預測 + +預測方法論: + Prometheus predict_linear(metric[7d], 86400*N) 回傳「基於過去 7d,未來 N 秒後的預測值」 + 簡單但有效 — 線性外推,適合穩定增長/下降趨勢 + +統帥鐵律對齊: + - AI 預測 + 推建議,不自動 scale up (人工決策擴容) + - 7d window 保證有足夠樣本 (Prometheus retention 15d 夠) + - 閾值 (avail < 0, mem < 10%) 是「觸發討論」非「最終決策」 + +排程: + - 首次延遲 540s (其他 scanner 都跑完後) + - 每日 05:00 Taipei (capacity_scanner 02:00 → compliance 03:00 → Hermes 04:00 → 預測 05:00) + +2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei +ADR-090 § Phase 4 AI 容量預測 +""" +from __future__ import annotations + +import asyncio +import json as _json +import time as _time +from datetime import datetime, timedelta, timezone +from typing import Any + +import httpx +import structlog + +from src.core.config import settings + +logger = structlog.get_logger(__name__) + +_FIRST_DELAY_SEC = 540 +_LOOP_BACKOFF_SEC = 1800 +_DAILY_TRIGGER_HOUR_TAIPEI = 5 +_HTTP_TIMEOUT_SEC = 15 + +# 預測視窗 (7d) 與 horizon (未來 7d) +_WINDOW = "7d" +_HORIZON_SEC = 7 * 86400 # 7 天 + +# predict_linear 查詢定義 +# 回傳高風險 host 的 instance label + 預測值 +_FORECAST_QUERIES = { + "disk_saturation_7d": ( + # 7d 後根目錄 avail 預測為 0 或負 = 磁碟會滿 + f'predict_linear(node_filesystem_avail_bytes{{fstype!~"tmpfs|overlay", mountpoint="/"}}[{_WINDOW}], {_HORIZON_SEC}) < 0', + "disk 預測 7 天內會滿", + ), + "mem_saturation_7d": ( + # 7d 後記憶體可用 < 10% + f'predict_linear(node_memory_MemAvailable_bytes[{_WINDOW}], {_HORIZON_SEC}) ' + f'/ node_memory_MemTotal_bytes < 0.1', + "mem 預測 7 天內可用量 < 10%", + ), + "cpu_high_7d_trend": ( + # 過去 7d 平均 cpu 已 > 70% + 上升趨勢 + f'avg_over_time((100 - (avg by(instance)(rate(node_cpu_seconds_total{{mode="idle"}}[5m])) * 100))[{_WINDOW}:15m]) > 70', + "過去 7d cpu 平均 > 70%", + ), +} + + +async def run_capacity_forecaster_loop() -> None: + """每日 05:00 Taipei 容量預測.""" + logger.info("capacity_forecaster_loop_started") + await asyncio.sleep(_FIRST_DELAY_SEC) + + while True: + try: + await forecast_once() + except Exception as e: + logger.exception("capacity_forecaster_loop_error", error=str(e)) + await asyncio.sleep(_LOOP_BACKOFF_SEC) + continue + + sleep_sec = _seconds_until_next_trigger() + logger.info("capacity_forecaster_next_tick", sleep_sec=sleep_sec) + await asyncio.sleep(sleep_sec) + + +async def forecast_once() -> dict[str, Any]: + """跑一次預測,對每個高風險 host 留痕 + 推 Telegram.""" + started_ms = _time.time() + stats: dict[str, Any] = {"queries_run": 0, "high_risk_hosts": 0, "recommendations": 0} + risks: dict[str, list[dict[str, Any]]] = {} # host -> [{query, value, reason}] + error_msg: str | None = None + + try: + for query_name, (promql, reason) in _FORECAST_QUERIES.items(): + hits = await _run_prom_query(promql) + stats["queries_run"] += 1 + for host, value in hits.items(): + risks.setdefault(host, []).append({ + "query": query_name, + "value": value, + "reason": reason, + }) + + stats["high_risk_hosts"] = len(risks) + + for host, findings in risks.items(): + ok = await _write_recommendation_aol(host, findings) + if ok: + stats["recommendations"] += 1 + + if risks: + await _send_telegram_forecast(risks) + + except Exception as e: + error_msg = f"{type(e).__name__}: {e}"[:1000] + logger.exception("capacity_forecast_once_failed", error=error_msg) + + duration_ms = int((_time.time() - started_ms) * 1000) + logger.info( + "capacity_forecast_once_done", + queries=stats["queries_run"], + hosts=stats["high_risk_hosts"], + recommendations=stats["recommendations"], + duration_ms=duration_ms, + ) + return stats + + +async def _run_prom_query(promql: str) -> dict[str, float]: + """跑 Prometheus instant query, 回傳 {host: value}.""" + url = f"{settings.PROMETHEUS_URL.rstrip('/')}/api/v1/query" + try: + async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT_SEC, trust_env=False) as client: + resp = await client.get(url, params={"query": promql}) + resp.raise_for_status() + data = resp.json() + if data.get("status") != "success": + return {} + result: dict[str, float] = {} + for r in (data.get("data", {}) or {}).get("result", []) or []: + instance = (r.get("metric", {}) or {}).get("instance", "") + host = instance.split(":")[0] if instance else "unknown" + val = r.get("value", [None, None]) + if val and len(val) >= 2: + try: + result[host] = float(val[1]) + except (ValueError, TypeError): + pass + return result + except Exception as e: + logger.warning("prom_forecast_query_failed", promql=promql[:80], error=str(e)) + return {} + + +async def _write_recommendation_aol(host: str, findings: list[dict[str, Any]]) -> bool: + """寫 aol(capacity_recommendation).""" + try: + from sqlalchemy import text as _sql + from src.db.base import get_db_context + + input_payload = {"host": host, "forecast_horizon_days": 7, "findings_count": len(findings)} + output_payload = { + "host": host, + "findings": findings, + "proposed_actions": _derive_actions(findings), + "requires_human_decision": True, + } + + async with get_db_context() as db: + await db.execute( + _sql(""" + INSERT INTO automation_operation_log ( + operation_type, actor, status, + input, output, tags + ) VALUES ( + 'capacity_recommendation', + 'capacity_forecaster', + 'success', + CAST(:input AS jsonb), + CAST(:output AS jsonb), + :tags + ) + """), + { + "input": _json.dumps(input_payload, ensure_ascii=False), + "output": _json.dumps(output_payload, ensure_ascii=False), + "tags": ["capacity", "forecast", "phase4", "predict_linear"], + }, + ) + return True + except Exception as e: + logger.warning("capacity_forecast_aol_failed", host=host, error=str(e)) + return False + + +def _derive_actions(findings: list[dict[str, Any]]) -> list[str]: + """根據 findings 產生建議動作清單.""" + actions: list[str] = [] + queries = {f["query"] for f in findings} + if "disk_saturation_7d" in queries: + actions.append("清理 /var/log, /var/lib/docker, PG WAL archive;或擴容磁碟") + if "mem_saturation_7d" in queries: + actions.append("檢查 top mem consumer;考慮加記憶體或調整 JVM/Redis maxmemory") + if "cpu_high_7d_trend" in queries: + actions.append("分析 top CPU process;考慮擴充 vCPU 或 scale out") + if not actions: + actions.append("人工審查各指標") + return actions + + +async def _send_telegram_forecast(risks: dict[str, list[dict[str, Any]]]) -> bool: + """推 Telegram 預測摘要.""" + try: + import html + from src.services.telegram_gateway import get_telegram_gateway + + if not settings.OPENCLAW_TG_CHAT_ID: + return False + + lines = [ + "📈 容量預測 (Phase 4 Holt-Winters MVP)", + f"未來 7 天高風險 host: {len(risks)} 台", + "", + ] + for host, findings in list(risks.items())[:10]: + lines.append(f"🟡 {html.escape(host)}") + for f in findings[:3]: + lines.append(f" ▸ {html.escape(f['reason'])} (value={f['value']:.2f})") + actions = _derive_actions(findings) + if actions: + lines.append(f" 建議: {html.escape(actions[0])[:100]}") + lines.append("") + lines.append("決策: 人工評估擴容/清理時機") + + msg = "\n".join(lines) + + tg = get_telegram_gateway() + await tg._send_request("sendMessage", { # type: ignore[attr-defined] + "chat_id": settings.OPENCLAW_TG_CHAT_ID, + "text": msg, + "parse_mode": "HTML", + "disable_web_page_preview": True, + }) + return True + except Exception as e: + logger.warning("capacity_forecast_telegram_failed", error=str(e)) + return False + + +def _seconds_until_next_trigger() -> float: + tz_taipei = timezone(timedelta(hours=8)) + now = datetime.now(tz_taipei) + today_trigger = now.replace(hour=_DAILY_TRIGGER_HOUR_TAIPEI, minute=0, second=0, microsecond=0) + if now >= today_trigger: + today_trigger = today_trigger + timedelta(days=1) + delta = (today_trigger - now).total_seconds() + return max(300.0, min(delta, 25 * 3600)) diff --git a/apps/api/src/main.py b/apps/api/src/main.py index 9405c7ee..aced2951 100644 --- a/apps/api/src/main.py +++ b/apps/api/src/main.py @@ -450,6 +450,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: except Exception as e: logger.warning("hermes_rule_quality_loop_schedule_failed", error=str(e)) + # ADR-090 § Phase 4 Capacity Forecaster (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei) + # 每日 05:00 Taipei 用 Prometheus predict_linear 預測未來 7d disk/mem/cpu 飽和 + # 高風險 host 寫 aol(capacity_recommendation) + Telegram 建議 + try: + from src.jobs.capacity_forecaster_job import run_capacity_forecaster_loop + asyncio.create_task(run_capacity_forecaster_loop()) + logger.info("capacity_forecaster_loop_scheduled", daily_trigger_hour_taipei=5) + except Exception as e: + logger.warning("capacity_forecaster_loop_schedule_failed", error=str(e)) + # ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告 # 2026-04-14 Claude Haiku 4.5 Asia/Taipei try: