#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/cost_throttle_service.py
Operation Ollama-First v5.0 / Phase 20 — 成本自動節流(Cost Auto-Throttle)
設計原則:
- 主動節流(不只被動告警):當月預估成本超預算 110% → 自動 throttle
- throttle 機制:在記憶體 cache 標記 provider,主流程讀取後改路由
- claude throttled → fallback Gemini Flash
- gemini throttled → fallback Hermes Ollama
- nim throttled → fallback Hermes 規則引擎
- 每小時 cron 重評估
- Telegram 推播 throttle / unthrottle 事件
- feature flag COST_THROTTLE_ENABLED 預設 OFF(避免戰時誤節流)
對應 ai_call_budgets 表(migration 025):
daily / weekly / monthly × 各 provider × 80% alert_pct
本服務只看 monthly(線性外推月底成本)
"""
from __future__ import annotations
import os
import time
import logging
import threading
from datetime import datetime, timedelta
from calendar import monthrange
from typing import Dict, Any, Optional, List
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# Feature flag + configuration (all three values are captured at import time)
# ─────────────────────────────────────────────────────────────────────────────
_TRUTHY_ENV_VALUES = ('true', '1', 'yes', 'on')

# Import-time snapshot of the feature flag; an unset variable means disabled.
COST_THROTTLE_ENABLED = (
    os.environ.get('COST_THROTTLE_ENABLED', 'false').strip().lower()
    in _TRUTHY_ENV_VALUES
)

# Projected-spend / budget ratio used as the "start throttling" threshold
# (defaults to 1.10, i.e. 110% of the budget).
COST_THROTTLE_PROJECT_RATIO = float(
    os.environ.get('COST_THROTTLE_PROJECT_RATIO', '1.10')
)

# Ratio used as the "stop throttling" threshold for an already-throttled
# provider (defaults to 0.95, i.e. 95% of the budget).
COST_UNTHROTTLE_PROJECT_RATIO = float(
    os.environ.get('COST_UNTHROTTLE_PROJECT_RATIO', '0.95')
)
def is_cost_throttle_enabled() -> bool:
    """Return whether cost throttling is switched on right now.

    The ``COST_THROTTLE_ENABLED`` environment variable is re-read on every
    call, so the answer is not frozen at import time. The values ``true``,
    ``1``, ``yes`` and ``on`` enable the feature (case-insensitive,
    surrounding whitespace ignored); anything else, or an unset variable,
    means disabled.
    """
    raw_flag = os.environ.get('COST_THROTTLE_ENABLED', 'false')
    normalized = raw_flag.strip().lower()
    return normalized in {'true', '1', 'yes', 'on'}
# ─────────────────────────────────────────────────────────────────────────────
# Throttle state (in-memory only; replaced by each evaluate_throttle_status run)
# ─────────────────────────────────────────────────────────────────────────────
# Maps provider name -> info dict. evaluate_throttle_status() writes the keys
# 'throttled', 'spent', 'budget', 'projected', 'ratio', 'reason' and
# 'evaluated_at' for every provider it evaluates.
_throttle_state: Dict[str, Dict[str, Any]] = {}
# Guards reads and writes of _throttle_state. It is a plain (non-reentrant)
# Lock, so code already holding it must not call helpers that acquire it again.
_state_lock = threading.Lock()
# time.time() of the last completed evaluation; 0.0 before the first run and
# after reset_throttle_state(). Written but not read in the code visible here.
_state_ts = 0.0
def is_provider_throttled(provider: str) -> bool:
    """Public API: report whether *provider* is currently marked as throttled.

    Always returns ``False`` while the feature flag is off (checked at call
    time via ``is_cost_throttle_enabled``). Otherwise the in-memory state is
    consulted; a provider with no recorded entry counts as not throttled.

    Args:
        provider: Provider name used as the key in the throttle state.

    Returns:
        ``True`` only when the feature is enabled and the provider's stored
        ``'throttled'`` value is truthy.
    """
    if is_cost_throttle_enabled():
        with _state_lock:
            entry = _throttle_state.get(provider)
        if entry:
            return bool(entry.get('throttled'))
    return False
def get_throttle_state() -> Dict[str, Dict[str, Any]]:
    """Return an independent snapshot of the throttle state (debugging aid).

    Both the outer mapping and every per-provider info dict are copied while
    the state lock is held. A plain ``dict(_throttle_state)`` would share the
    inner dicts with the live state, so a caller editing the "snapshot"
    (for example flipping ``'throttled'``) would silently change what
    ``is_provider_throttled`` reports.

    Returns:
        ``{provider: info}`` where each ``info`` is a copy of the stored dict.
        Empty when no evaluation has populated the state.
    """
    with _state_lock:
        return {
            provider: dict(info)
            for provider, info in _throttle_state.items()
        }
# ─────────────────────────────────────────────────────────────────────────────
# 核心:每小時跑的 evaluate
# ─────────────────────────────────────────────────────────────────────────────
def _days_in_current_month(today: Optional[datetime] = None) -> int:
    """Return the number of days in the month that contains *today*.

    Args:
        today: Reference moment; the current local time is used when omitted.

    Returns:
        The length of that calendar month in days (leap years included).
    """
    reference = today if today else datetime.now()
    _first_weekday, day_count = monthrange(reference.year, reference.month)
    return day_count
def _month_start(today: Optional[datetime] = None) -> datetime:
    """Return midnight on the first day of the month that contains *today*.

    Args:
        today: Reference moment; the current local time is used when omitted.

    Returns:
        A naive ``datetime`` built from the reference's year and month only,
        so any time-of-day or tzinfo on *today* is not carried over.
    """
    reference = today if today else datetime.now()
    return datetime(year=reference.year, month=reference.month, day=1)
def evaluate_throttle_status() -> Dict[str, Dict[str, Any]]:
    """Re-evaluate every provider's monthly budget and refresh the throttle state.

    For each row of ``ai_call_budgets`` with ``period = 'monthly'`` and a
    non-NULL provider, the month-to-date spend from ``ai_calls`` is linearly
    extrapolated to the end of the month
    (``spent / day_of_month * days_in_month``) and compared with the budget:

    * a provider that is not throttled becomes throttled once the
      projected/budget ratio reaches ``COST_THROTTLE_PROJECT_RATIO``;
    * a provider that is already throttled stays throttled until the ratio
      drops below ``COST_UNTHROTTLE_PROJECT_RATIO`` (hysteresis).

    On success the in-memory state is replaced as a whole and a Telegram
    alert is pushed for every provider whose throttled flag changed.

    Returns:
        ``{provider: {throttled, spent, budget, projected, ratio, reason,
        evaluated_at}}``. An empty dict is returned when the DB layer cannot
        be imported, no session can be obtained, or a query fails; in those
        cases the previous throttle state is left untouched.
    """
    global _state_ts
    try:
        from sqlalchemy import text as sa_text
        from database.manager import get_session
    except Exception as exc:
        logger.warning('[CostThrottle] DB import failed: %s', exc)
        return {}

    today = datetime.now()
    days_elapsed = max(today.day, 1)
    days_in_month = _days_in_current_month(today)
    month_start = _month_start(today)
    # Render the configured threshold once so the reason text stays truthful
    # when the ratio is overridden through the environment ("95%" by default).
    unthrottle_pct = f'{COST_UNTHROTTLE_PROJECT_RATIO:.0%}'

    new_state: Dict[str, Dict[str, Any]] = {}
    session = None
    try:
        # Acquire the session inside the try so a connection failure is
        # reported like any other evaluation failure instead of escaping.
        session = get_session()

        # 1. Monthly budgets per provider (the NULL-provider row, i.e. the
        #    all-provider total, is excluded).
        budgets = session.execute(
            sa_text("""
SELECT provider, budget_usd, alert_pct
FROM ai_call_budgets
WHERE period = 'monthly' AND provider IS NOT NULL
"""),
        ).fetchall()

        # 2. Month-to-date cost_usd per provider.
        spent_rows = session.execute(
            sa_text("""
SELECT provider, COALESCE(SUM(cost_usd), 0) AS spent
FROM ai_calls
WHERE called_at >= :ms
GROUP BY provider
"""),
            {'ms': month_start},
        ).fetchall()
        spent_by_provider: Dict[str, float] = {
            row[0]: float(row[1] or 0) for row in spent_rows
        }

        # 3. Project the end-of-month cost and decide per provider.
        for row in budgets:
            provider = row[0]
            budget = float(row[1] or 0)
            spent = spent_by_provider.get(provider, 0.0)
            projected = spent / days_elapsed * days_in_month
            # A zero/absent budget yields ratio 0 so it can never throttle.
            ratio = projected / budget if budget > 0 else 0.0

            # Previous flag drives the hysteresis branch below.
            with _state_lock:
                prev_throttled = _throttle_state.get(provider, {}).get('throttled', False)

            should_throttle = False
            reason = ''
            if prev_throttled:
                # Already throttled: release only below the unthrottle threshold.
                if ratio >= COST_UNTHROTTLE_PROJECT_RATIO:
                    should_throttle = True
                    reason = (
                        f'still over unthrottle threshold '
                        f'({ratio:.2%} >= {unthrottle_pct})'
                    )
                else:
                    reason = f'recovered ({ratio:.2%} < {unthrottle_pct}) — unthrottle'
            elif ratio >= COST_THROTTLE_PROJECT_RATIO and budget > 0:
                # Not throttled yet: start only at/above the throttle threshold.
                should_throttle = True
                reason = (
                    f'projected ${projected:.2f} > budget ${budget:.2f} × '
                    f'{COST_THROTTLE_PROJECT_RATIO} (ratio={ratio:.2%})'
                )

            new_state[provider] = {
                'throttled': should_throttle,
                'spent': spent,
                'budget': budget,
                'projected': round(projected, 4),
                'ratio': round(ratio, 4),
                'reason': reason,
                'evaluated_at': today.isoformat(),
            }
    except Exception as exc:
        # logger.exception keeps the original message and adds the traceback.
        logger.exception('[CostThrottle] evaluate failed: %s', exc)
        return {}
    finally:
        if session is not None:
            session.close()

    # 4. Detect flag changes against the state that is about to be replaced.
    transitions = _diff_transitions(new_state)

    # 5. Swap in the new state under the lock. Copies are stored so that a
    #    caller mutating the returned dict cannot alter the live state.
    with _state_lock:
        _throttle_state.clear()
        _throttle_state.update(
            {provider: dict(info) for provider, info in new_state.items()}
        )
        _state_ts = time.time()

    # 6. Notify via Telegram only when something changed.
    if transitions:
        _push_throttle_alerts(transitions)
    return new_state
def _diff_transitions(new_state: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]:
    """List the providers whose throttled flag differs from the stored state.

    Only providers present in *new_state* are examined; a provider without a
    stored entry is treated as previously not throttled.

    Args:
        new_state: Freshly computed ``{provider: info}`` mapping.

    Returns:
        One dict per changed provider with the keys ``'provider'``,
        ``'from'`` (previous flag), ``'to'`` (new flag) and ``'info'``
        (the provider's entry from *new_state*), in *new_state* order.
    """
    # Read every previous flag in one locked pass, then compare lock-free.
    with _state_lock:
        before = {
            name: _throttle_state.get(name, {}).get('throttled', False)
            for name in new_state
        }
    return [
        {
            'provider': name,
            'from': before[name],
            'to': current.get('throttled', False),
            'info': current,
        }
        for name, current in new_state.items()
        if before[name] != current.get('throttled', False)
    ]
def _push_throttle_alerts(transitions: List[Dict[str, Any]]) -> None:
    """Push one Telegram message per throttle / unthrottle transition.

    Delivery is best-effort: if the Telegram helper cannot be imported the
    alerts are dropped with a warning, and a failure while formatting or
    sending one message is logged without affecting the remaining ones.

    Args:
        transitions: Items shaped like the output of ``_diff_transitions``;
            each needs ``'provider'``, ``'to'`` and an ``'info'`` dict holding
            ``'spent'``, ``'budget'``, ``'projected'``, ``'ratio'`` and
            ``'reason'``.
    """
    try:
        from services.telegram_templates import _send_telegram_raw
    except Exception as exc:
        # Previously swallowed silently, which made lost alerts undiagnosable.
        logger.warning(
            '[CostThrottle] telegram import failed, %d alert(s) dropped: %s',
            len(transitions), exc,
        )
        return

    # Show the configured release threshold instead of a hard-coded "95%".
    unthrottle_pct = f'{COST_UNTHROTTLE_PROJECT_RATIO:.0%}'

    for t in transitions:
        try:
            # Formatting lives inside the try so one malformed transition
            # (e.g. a missing key) cannot abort the remaining alerts.
            provider = t['provider']
            info = t['info']
            if t['to']:  # newly throttled
                msg = (
                    f"⚠️ 成本自動節流啟動\n"
                    f"━━━━━━━━━━━━━━━━━━━━\n"
                    f"📊 Provider: {provider}\n"
                    f"💰 已花費: ${info['spent']:.2f} / 預算 ${info['budget']:.2f}\n"
                    f"📈 月底推估: ${info['projected']:.2f} (ratio {info['ratio']:.0%})\n"
                    f"🔧 原因: {info['reason']}\n\n"
                    f"自動切換 fallback 路由直到月底推估 < {unthrottle_pct}"
                )
            else:  # unthrottled
                msg = (
                    f"✅ 成本節流解除\n"
                    f"━━━━━━━━━━━━━━━━━━━━\n"
                    f"📊 Provider: {provider}\n"
                    f"📈 月底推估: ${info['projected']:.2f} / 預算 ${info['budget']:.2f}"
                    f" (ratio {info['ratio']:.0%})\n"
                    f"恢復正常路由"
                )
            _send_telegram_raw(msg)
        except Exception as exc:
            logger.warning('[CostThrottle] telegram push failed: %s', exc)
# ─────────────────────────────────────────────────────────────────────────────
# Public API
# ─────────────────────────────────────────────────────────────────────────────
def reset_throttle_state() -> None:
    """Forget all throttle decisions (meant for the start-of-month cron).

    Empties the in-memory state and zeroes the last-evaluation timestamp
    under the state lock, then logs the reset. Nothing is re-evaluated here;
    the state stays empty until ``evaluate_throttle_status`` runs again.
    """
    global _state_ts
    with _state_lock:
        _state_ts = 0.0
        _throttle_state.clear()
    logger.info('[CostThrottle] state reset (新月份)')
# Public surface of this module: the evaluation entry point, the throttle
# queries, the state reset, and the two threshold constants. The import-time
# COST_THROTTLE_ENABLED snapshot is not exported here; is_cost_throttle_enabled()
# is the exported way to read the flag.
__all__ = [
    'evaluate_throttle_status',
    'is_provider_throttled',
    'is_cost_throttle_enabled',
    'get_throttle_state',
    'reset_throttle_state',
    'COST_THROTTLE_PROJECT_RATIO',
    'COST_UNTHROTTLE_PROJECT_RATIO',
]