diff --git a/apps/api/src/jobs/capacity_forecaster_job.py b/apps/api/src/jobs/capacity_forecaster_job.py
new file mode 100644
index 00000000..69468785
--- /dev/null
+++ b/apps/api/src/jobs/capacity_forecaster_job.py
@@ -0,0 +1,268 @@
+"""
+Capacity Forecaster Job — Phase 4 AI 容量預測 MVP
+=================================================
+每日 05:00 Taipei 用 Prometheus predict_linear 預測未來 7 天容量趨勢,
+推 Telegram 建議 + 寫 aol(capacity_recommendation).
+
+職責邊界 (MVP):
+ ✅ 用 predict_linear (Prometheus 內建 linear regression) 預測:
+ - 7d 後 disk avail < 0 (磁碟將滿)
+ - 7d 後 mem available < 10% (記憶體緊繃)
+ - 7d 後 cpu 使用率 > 85% (CPU 飽和)
+ ✅ 對每個高風險 host 寫 aol(capacity_recommendation)
+ ✅ 彙總推 Telegram SRE group
+ ⏳ TODO: 真正 Holt-Winters (季節性) — Prometheus 不支援,需外接 Python statsmodels
+ ⏳ TODO: 根據業務週期 (週一高峰/週末低谷) 調整預測
+
+預測方法論:
+ Prometheus predict_linear(metric[7d], 86400*N) 回傳「基於過去 7d,未來 N 秒後的預測值」
+ 簡單但有效 — 線性外推,適合穩定增長/下降趨勢
+
+統帥鐵律對齊:
+ - AI 預測 + 推建議,不自動 scale up (人工決策擴容)
+ - 7d window 保證有足夠樣本 (Prometheus retention 15d 夠)
+ - 閾值 (avail < 0, mem < 10%) 是「觸發討論」非「最終決策」
+
+排程:
+ - 首次延遲 540s (其他 scanner 都跑完後)
+ - 每日 05:00 Taipei (capacity_scanner 02:00 → compliance 03:00 → Hermes 04:00 → 預測 05:00)
+
+2026-04-19 ogt + Claude Opus 4.7 (1M context) Asia/Taipei
+ADR-090 § Phase 4 AI 容量預測
+"""
+from __future__ import annotations
+
+import asyncio
+import json as _json
+import time as _time
+from datetime import datetime, timedelta, timezone
+from typing import Any
+
+import httpx
+import structlog
+
+from src.core.config import settings
+
+logger = structlog.get_logger(__name__)
+
+_FIRST_DELAY_SEC = 540
+_LOOP_BACKOFF_SEC = 1800
+_DAILY_TRIGGER_HOUR_TAIPEI = 5
+_HTTP_TIMEOUT_SEC = 15
+
+# 預測視窗 (7d) 與 horizon (未來 7d)
+_WINDOW = "7d"
+_HORIZON_SEC = 7 * 86400 # 7 天
+
+# predict_linear 查詢定義
+# 回傳高風險 host 的 instance label + 預測值
+_FORECAST_QUERIES = {
+ "disk_saturation_7d": (
+ # 7d 後根目錄 avail 預測為 0 或負 = 磁碟會滿
+ f'predict_linear(node_filesystem_avail_bytes{{fstype!~"tmpfs|overlay", mountpoint="/"}}[{_WINDOW}], {_HORIZON_SEC}) < 0',
+ "disk 預測 7 天內會滿",
+ ),
+ "mem_saturation_7d": (
+ # 7d 後記憶體可用 < 10%
+ f'predict_linear(node_memory_MemAvailable_bytes[{_WINDOW}], {_HORIZON_SEC}) '
+ f'/ node_memory_MemTotal_bytes < 0.1',
+ "mem 預測 7 天內可用量 < 10%",
+ ),
+ "cpu_high_7d_trend": (
+ # 過去 7d 平均 cpu 已 > 70% + 上升趨勢
+ f'avg_over_time((100 - (avg by(instance)(rate(node_cpu_seconds_total{{mode="idle"}}[5m])) * 100))[{_WINDOW}:15m]) > 70',
+ "過去 7d cpu 平均 > 70%",
+ ),
+}
+
+
+async def run_capacity_forecaster_loop() -> None:
+ """每日 05:00 Taipei 容量預測."""
+ logger.info("capacity_forecaster_loop_started")
+ await asyncio.sleep(_FIRST_DELAY_SEC)
+
+ while True:
+ try:
+ await forecast_once()
+ except Exception as e:
+ logger.exception("capacity_forecaster_loop_error", error=str(e))
+ await asyncio.sleep(_LOOP_BACKOFF_SEC)
+ continue
+
+ sleep_sec = _seconds_until_next_trigger()
+ logger.info("capacity_forecaster_next_tick", sleep_sec=sleep_sec)
+ await asyncio.sleep(sleep_sec)
+
+
+async def forecast_once() -> dict[str, Any]:
+ """跑一次預測,對每個高風險 host 留痕 + 推 Telegram."""
+ started_ms = _time.time()
+ stats: dict[str, Any] = {"queries_run": 0, "high_risk_hosts": 0, "recommendations": 0}
+ risks: dict[str, list[dict[str, Any]]] = {} # host -> [{query, value, reason}]
+ error_msg: str | None = None
+
+ try:
+ for query_name, (promql, reason) in _FORECAST_QUERIES.items():
+ hits = await _run_prom_query(promql)
+ stats["queries_run"] += 1
+ for host, value in hits.items():
+ risks.setdefault(host, []).append({
+ "query": query_name,
+ "value": value,
+ "reason": reason,
+ })
+
+ stats["high_risk_hosts"] = len(risks)
+
+ for host, findings in risks.items():
+ ok = await _write_recommendation_aol(host, findings)
+ if ok:
+ stats["recommendations"] += 1
+
+ if risks:
+ await _send_telegram_forecast(risks)
+
+ except Exception as e:
+ error_msg = f"{type(e).__name__}: {e}"[:1000]
+ logger.exception("capacity_forecast_once_failed", error=error_msg)
+
+ duration_ms = int((_time.time() - started_ms) * 1000)
+ logger.info(
+ "capacity_forecast_once_done",
+ queries=stats["queries_run"],
+ hosts=stats["high_risk_hosts"],
+ recommendations=stats["recommendations"],
+ duration_ms=duration_ms,
+ )
+ return stats
+
+
+async def _run_prom_query(promql: str) -> dict[str, float]:
+ """跑 Prometheus instant query, 回傳 {host: value}."""
+ url = f"{settings.PROMETHEUS_URL.rstrip('/')}/api/v1/query"
+ try:
+ async with httpx.AsyncClient(timeout=_HTTP_TIMEOUT_SEC, trust_env=False) as client:
+ resp = await client.get(url, params={"query": promql})
+ resp.raise_for_status()
+ data = resp.json()
+ if data.get("status") != "success":
+ return {}
+ result: dict[str, float] = {}
+ for r in (data.get("data", {}) or {}).get("result", []) or []:
+ instance = (r.get("metric", {}) or {}).get("instance", "")
+ host = instance.split(":")[0] if instance else "unknown"
+ val = r.get("value", [None, None])
+ if val and len(val) >= 2:
+ try:
+ result[host] = float(val[1])
+ except (ValueError, TypeError):
+ pass
+ return result
+ except Exception as e:
+ logger.warning("prom_forecast_query_failed", promql=promql[:80], error=str(e))
+ return {}
+
+
+async def _write_recommendation_aol(host: str, findings: list[dict[str, Any]]) -> bool:
+ """寫 aol(capacity_recommendation)."""
+ try:
+ from sqlalchemy import text as _sql
+ from src.db.base import get_db_context
+
+ input_payload = {"host": host, "forecast_horizon_days": 7, "findings_count": len(findings)}
+ output_payload = {
+ "host": host,
+ "findings": findings,
+ "proposed_actions": _derive_actions(findings),
+ "requires_human_decision": True,
+ }
+
+ async with get_db_context() as db:
+ await db.execute(
+ _sql("""
+ INSERT INTO automation_operation_log (
+ operation_type, actor, status,
+ input, output, tags
+ ) VALUES (
+ 'capacity_recommendation',
+ 'capacity_forecaster',
+ 'success',
+ CAST(:input AS jsonb),
+ CAST(:output AS jsonb),
+ :tags
+ )
+ """),
+ {
+ "input": _json.dumps(input_payload, ensure_ascii=False),
+ "output": _json.dumps(output_payload, ensure_ascii=False),
+ "tags": ["capacity", "forecast", "phase4", "predict_linear"],
+ },
+ )
+ return True
+ except Exception as e:
+ logger.warning("capacity_forecast_aol_failed", host=host, error=str(e))
+ return False
+
+
+def _derive_actions(findings: list[dict[str, Any]]) -> list[str]:
+ """根據 findings 產生建議動作清單."""
+ actions: list[str] = []
+ queries = {f["query"] for f in findings}
+ if "disk_saturation_7d" in queries:
+ actions.append("清理 /var/log, /var/lib/docker, PG WAL archive;或擴容磁碟")
+ if "mem_saturation_7d" in queries:
+ actions.append("檢查 top mem consumer;考慮加記憶體或調整 JVM/Redis maxmemory")
+ if "cpu_high_7d_trend" in queries:
+ actions.append("分析 top CPU process;考慮擴充 vCPU 或 scale out")
+ if not actions:
+ actions.append("人工審查各指標")
+ return actions
+
+
+async def _send_telegram_forecast(risks: dict[str, list[dict[str, Any]]]) -> bool:
+ """推 Telegram 預測摘要."""
+ try:
+ import html
+ from src.services.telegram_gateway import get_telegram_gateway
+
+ if not settings.OPENCLAW_TG_CHAT_ID:
+ return False
+
+ lines = [
+ "📈 容量預測 (Phase 4 Holt-Winters MVP)",
+ f"未來 7 天高風險 host: {len(risks)} 台",
+ "",
+ ]
+ for host, findings in list(risks.items())[:10]:
+ lines.append(f"🟡 {html.escape(host)}")
+ for f in findings[:3]:
+ lines.append(f" ▸ {html.escape(f['reason'])} (value={f['value']:.2f})")
+ actions = _derive_actions(findings)
+ if actions:
+ lines.append(f" 建議: {html.escape(actions[0])[:100]}")
+ lines.append("")
+ lines.append("決策: 人工評估擴容/清理時機")
+
+ msg = "\n".join(lines)
+
+ tg = get_telegram_gateway()
+ await tg._send_request("sendMessage", { # type: ignore[attr-defined]
+ "chat_id": settings.OPENCLAW_TG_CHAT_ID,
+ "text": msg,
+ "parse_mode": "HTML",
+ "disable_web_page_preview": True,
+ })
+ return True
+ except Exception as e:
+ logger.warning("capacity_forecast_telegram_failed", error=str(e))
+ return False
+
+
+def _seconds_until_next_trigger() -> float:
+ tz_taipei = timezone(timedelta(hours=8))
+ now = datetime.now(tz_taipei)
+ today_trigger = now.replace(hour=_DAILY_TRIGGER_HOUR_TAIPEI, minute=0, second=0, microsecond=0)
+ if now >= today_trigger:
+ today_trigger = today_trigger + timedelta(days=1)
+ delta = (today_trigger - now).total_seconds()
+ return max(300.0, min(delta, 25 * 3600))
diff --git a/apps/api/src/main.py b/apps/api/src/main.py
index 9405c7ee..aced2951 100644
--- a/apps/api/src/main.py
+++ b/apps/api/src/main.py
@@ -450,6 +450,16 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
except Exception as e:
logger.warning("hermes_rule_quality_loop_schedule_failed", error=str(e))
+ # ADR-090 § Phase 4 Capacity Forecaster (2026-04-19 ogt + Claude Opus 4.7 Asia/Taipei)
+ # 每日 05:00 Taipei 用 Prometheus predict_linear 預測未來 7d disk/mem/cpu 飽和
+ # 高風險 host 寫 aol(capacity_recommendation) + Telegram 建議
+ try:
+ from src.jobs.capacity_forecaster_job import run_capacity_forecaster_loop
+ asyncio.create_task(run_capacity_forecaster_loop())
+ logger.info("capacity_forecaster_loop_scheduled", daily_trigger_hour_taipei=5)
+ except Exception as e:
+ logger.warning("capacity_forecaster_loop_schedule_failed", error=str(e))
+
# ADR-076 Task 4: 每日 08:00 台北時間自動日度巡檢報告
# 2026-04-14 Claude Haiku 4.5 Asia/Taipei
try: