All checks were successful
CD Pipeline / deploy (push) Successful in 2m44s
Operation Ollama-First v5.0 / Phase 20 + LLM 模型治理 services/cost_throttle_service.py (新檔, 200+ 行) - evaluate_throttle_status() 每小時 cron 跑 - 查 ai_call_budgets monthly × 累計 spent → 月底線性外推 - 推估 > 預算 110% → 標 throttled(hysteresis:降到 95% 才解除) - _push_throttle_alerts: 狀態變化推 Telegram - is_provider_throttled(provider) public API(給 anthropic/gemini caller 啟動 check) - COST_THROTTLE_ENABLED 預設 OFF(避免戰時誤節流) run_scheduler.py 加 2 cron + task wrapper - 每 1 小時:cost_throttle_evaluate - 每日 00:05:cost_throttle_reset_if_new_month docs/llm_model_full_evaluation_20260504.md (260+ 行) - 場景 × 模型對應矩陣(4 大層次) 戰術層 / 戰略層 / 多模態 / 雲端 API - 本次啟動的追加 4 模型(qwen2.5-coder:32b / deepseek-r1:14b / llava / gemma3:4b)— Primary + Secondary 並行拉 - Phase 21 路由優化建議(context size + complexity 動態選 model) - Phase 22 多供應商編排 + cost throttle 整合 - 儲存 / RAM / 延遲評估 - 模型治理 SOP(新增 / 替換 / 淘汰) - COST_TABLE 對齊(含 deepseek 直連價格) 啟用前置(待統帥): 1. Primary + Secondary 4 模型拉完(背景進行中) 2. .env: COST_THROTTLE_ENABLED=true(觀察 1 週後) 3. ANTHROPIC_API_KEY 設後 Code Review 自動切 Claude Opus 4.7 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
265 lines
11 KiB
Python
265 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
services/cost_throttle_service.py
|
||
Operation Ollama-First v5.0 / Phase 20 — 成本自動節流(Cost Auto-Throttle)
|
||
|
||
設計原則:
|
||
- 主動節流(不只被動告警):當月預估成本超預算 110% → 自動 throttle
|
||
- throttle 機制:在記憶體 cache 標記 provider,主流程讀取後改路由
|
||
- claude throttled → fallback Gemini Flash
|
||
- gemini throttled → fallback Hermes Ollama
|
||
- nim throttled → fallback Hermes 規則引擎
|
||
- 每小時 cron 重評估
|
||
- Telegram 推播 throttle / unthrottle 事件
|
||
- feature flag COST_THROTTLE_ENABLED 預設 OFF(避免戰時誤節流)
|
||
|
||
對應 ai_call_budgets 表(migration 025):
|
||
daily / weekly / monthly × 各 provider × 80% alert_pct
|
||
本服務只看 monthly(線性外推月底成本)
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
import os
|
||
import time
|
||
import logging
|
||
import threading
|
||
from datetime import datetime, timedelta
|
||
from calendar import monthrange
|
||
from typing import Dict, Any, Optional, List
|
||
|
||
logger = logging.getLogger(__name__)


# ─────────────────────────────────────────────────────────────────────────────
# Feature flag + configuration
# ─────────────────────────────────────────────────────────────────────────────
def _env_float(name: str, default: float) -> float:
    """Read a positive, finite float from env var *name*, else return *default*.

    A plain ``float(os.getenv(...))`` raises ValueError at import time for an
    empty or malformed value, which would break every module importing this
    service (e.g. callers of is_provider_throttled()). ``nan``/``inf`` or a
    non-positive ratio would silently disable or invert the throttle logic.
    All of those fall back to *default* with a warning instead.
    """
    import math

    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        value = float(raw)
    except ValueError:
        logger.warning('[CostThrottle] invalid %s=%r, using default %s', name, raw, default)
        return default
    if not math.isfinite(value) or value <= 0:
        logger.warning('[CostThrottle] out-of-range %s=%r, using default %s', name, raw, default)
        return default
    return value


# Snapshot taken once at import time; is_cost_throttle_enabled() re-reads the
# environment on every call instead.
COST_THROTTLE_ENABLED = os.getenv('COST_THROTTLE_ENABLED', 'false').strip().lower() in ('true', '1', 'yes', 'on')

# Start throttling once projected month-end spend reaches this multiple of the budget.
COST_THROTTLE_PROJECT_RATIO = _env_float('COST_THROTTLE_PROJECT_RATIO', 1.10)

# Lift an existing throttle only when the projection falls below this multiple
# (hysteresis band between the two ratios prevents hourly flapping).
COST_UNTHROTTLE_PROJECT_RATIO = _env_float('COST_UNTHROTTLE_PROJECT_RATIO', 0.95)

if COST_UNTHROTTLE_PROJECT_RATIO > COST_THROTTLE_PROJECT_RATIO:
    # An inverted band throttles at one evaluation and releases at the next.
    logger.warning(
        '[CostThrottle] COST_UNTHROTTLE_PROJECT_RATIO (%s) > COST_THROTTLE_PROJECT_RATIO (%s); '
        'throttle state may flap',
        COST_UNTHROTTLE_PROJECT_RATIO, COST_THROTTLE_PROJECT_RATIO,
    )
|
||
|
||
|
||
def is_cost_throttle_enabled() -> bool:
    """Return whether COST_THROTTLE_ENABLED is set to a truthy value right now.

    The variable is re-read on every call (rather than frozen at import time),
    so toggling it takes effect without reloading the module. Accepted truthy
    spellings are true/1/yes/on, case-insensitive, surrounding whitespace ignored.
    """
    raw_flag = os.getenv('COST_THROTTLE_ENABLED', 'false')
    normalized = raw_flag.strip().lower()
    return normalized in {'true', '1', 'yes', 'on'}
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Throttle 狀態(記憶體,由 cron 每小時更新)
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# In-process throttle state, replaced wholesale by evaluate_throttle_status().
# Shape: {provider: {throttled, spent, budget, projected, ratio, reason, evaluated_at}}
_throttle_state: Dict[str, Dict[str, Any]] = dict()
# Guards every read and write of _throttle_state and _state_ts.
_state_lock = threading.Lock()
# time.time() of the last successful evaluation; 0.0 means never evaluated / reset.
_state_ts = 0.0
|
||
|
||
|
||
def is_provider_throttled(provider: str) -> bool:
    """Public API: should *provider* be routed to its fallback right now?

    Meant for callers such as anthropic_service / gemini to check before a call.
    Always False while the COST_THROTTLE_ENABLED feature flag is off;
    otherwise reflects the flag stored by the last evaluate_throttle_status().
    """
    if is_cost_throttle_enabled():
        with _state_lock:
            entry = _throttle_state.get(provider) or {}
            return bool(entry.get('throttled'))
    return False
|
||
|
||
|
||
def get_throttle_state() -> Dict[str, Dict[str, Any]]:
    """Return a snapshot of the throttle state for debugging.

    Both the outer mapping and every per-provider dict are copied, so a caller
    that mutates the result cannot alter the shared state consulted by
    is_provider_throttled() (a shallow ``dict(...)`` copy would share the
    inner dicts).
    """
    with _state_lock:
        return {provider: dict(info) for provider, info in _throttle_state.items()}
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# 核心:每小時跑的 evaluate
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
def _days_in_current_month(today: Optional[datetime] = None) -> int:
    """Return the number of days in the month containing *today* (default: now)."""
    if today is None:
        today = datetime.now()
    _first_weekday, day_count = monthrange(today.year, today.month)
    return day_count
|
||
|
||
|
||
def _month_start(today: Optional[datetime] = None) -> datetime:
    """Return a naive datetime at 00:00 on the 1st of *today*'s month (default: now)."""
    reference = datetime.now() if today is None else today
    return datetime(year=reference.year, month=reference.month, day=1)
|
||
|
||
|
||
def _decide_throttle(
    prev_throttled: bool,
    ratio: float,
    budget: float,
    projected: float,
) -> tuple[bool, str]:
    """Apply the throttle hysteresis rule to a single provider.

    Args:
        prev_throttled: Whether the provider was throttled after the last run.
        ratio: Projected month-end spend divided by the monthly budget
            (0.0 when the budget is not positive).
        budget: Monthly budget in USD.
        projected: Projected month-end spend in USD.

    Returns:
        ``(should_throttle, reason)``; ``reason`` is ``''`` when the provider
        neither is nor becomes throttled.
    """
    # Render the configured threshold instead of a hard-coded "95%".
    unthrottle_pct = f'{COST_UNTHROTTLE_PROJECT_RATIO:.0%}'
    if prev_throttled:
        # Already throttled: hold until the projection drops below the lower
        # unthrottle ratio, so the flag does not flap around one threshold.
        if ratio >= COST_UNTHROTTLE_PROJECT_RATIO:
            return True, f'still over unthrottle threshold ({ratio:.2%} >= {unthrottle_pct})'
        return False, f'recovered ({ratio:.2%} < {unthrottle_pct}) — unthrottle'
    # Not throttled yet: start only at/above the higher throttle ratio and only
    # when a positive budget is configured.
    if ratio >= COST_THROTTLE_PROJECT_RATIO and budget > 0:
        return True, (
            f'projected ${projected:.2f} > budget ${budget:.2f} × '
            f'{COST_THROTTLE_PROJECT_RATIO} (ratio={ratio:.2%})'
        )
    return False, ''


def evaluate_throttle_status() -> Dict[str, Dict[str, Any]]:
    """Recompute per-provider throttle flags from this month's spend (hourly cron).

    For every provider with a ``monthly`` row in ``ai_call_budgets``, sums
    ``ai_calls.cost_usd`` since the 1st of the month and extrapolates
    linearly to month end::

        projected = spent / today.day * days_in_month

    The current day counts as fully elapsed, so projections made early in a
    day lean low. A provider becomes throttled when ``projected / budget``
    reaches COST_THROTTLE_PROJECT_RATIO. It is released only once that ratio
    falls below COST_UNTHROTTLE_PROJECT_RATIO (hysteresis).

    On success the in-memory state is replaced wholesale, and a Telegram
    message is pushed for every provider whose flag changed. This function
    does not consult the COST_THROTTLE_ENABLED flag itself;
    is_provider_throttled() does.

    Returns:
        ``{provider: {throttled, spent, budget, projected, ratio, reason,
        evaluated_at}}``, or ``{}`` when the DB layer cannot be imported,
        the session cannot be opened, or a query fails. In those cases the
        previous state is left untouched.
    """
    global _state_ts

    try:
        from sqlalchemy import text as sa_text
        from database.manager import get_session
    except Exception as exc:
        logger.warning('[CostThrottle] DB import failed: %s', exc)
        return {}

    today = datetime.now()
    days_elapsed = max(today.day, 1)
    days_in_month = _days_in_current_month(today)
    month_start = _month_start(today)

    # Opening the session can raise too; handle it like a query failure
    # (log, keep the previous state) instead of letting it escape the cron task.
    try:
        session = get_session()
    except Exception as exc:
        logger.exception('[CostThrottle] get_session failed: %s', exc)
        return {}

    # Snapshot the previous throttled flags once, under the lock, for the
    # hysteresis decision below.
    with _state_lock:
        prev_flags = {
            name: bool(info.get('throttled', False))
            for name, info in _throttle_state.items()
        }

    new_state: Dict[str, Dict[str, Any]] = {}
    try:
        # 1. Monthly per-provider budgets; rows with a NULL provider (the
        #    all-provider total) are excluded.
        budgets = session.execute(
            sa_text("""
                SELECT provider, budget_usd, alert_pct
                FROM ai_call_budgets
                WHERE period = 'monthly' AND provider IS NOT NULL
            """),
        ).fetchall()

        # 2. Month-to-date spend per provider.
        spent_rows = session.execute(
            sa_text("""
                SELECT provider, COALESCE(SUM(cost_usd), 0) AS spent
                FROM ai_calls
                WHERE called_at >= :ms
                GROUP BY provider
            """),
            {'ms': month_start},
        ).fetchall()
        spent_by_provider: Dict[str, float] = {
            row[0]: float(row[1] or 0) for row in spent_rows
        }

        # 3. Project month-end spend and decide each provider's flag.
        for row in budgets:
            provider = row[0]
            budget = float(row[1] or 0)
            spent = spent_by_provider.get(provider, 0.0)
            projected = spent / days_elapsed * days_in_month
            ratio = projected / budget if budget > 0 else 0.0

            should_throttle, reason = _decide_throttle(
                prev_flags.get(provider, False), ratio, budget, projected,
            )

            new_state[provider] = {
                'throttled': should_throttle,
                'spent': spent,
                'budget': budget,
                'projected': round(projected, 4),
                'ratio': round(ratio, 4),
                'reason': reason,
                'evaluated_at': today.isoformat(),
            }

    except Exception as exc:
        # logger.exception keeps the traceback, which a bare message loses.
        logger.exception('[CostThrottle] evaluate failed: %s', exc)
        return {}
    finally:
        session.close()

    # 4. Detect flag changes against the still-current state.
    transitions = _diff_transitions(new_state)

    # 5. Swap in the new state.
    with _state_lock:
        _throttle_state.clear()
        _throttle_state.update(new_state)
        _state_ts = time.time()

    # 6. Push Telegram alerts outside the lock (network I/O).
    if transitions:
        _push_throttle_alerts(transitions)

    return new_state
|
||
|
||
|
||
def _diff_transitions(new_state: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]:
    """List providers whose throttled flag differs between current and *new_state*.

    Must be called before *new_state* is swapped in. Each item is
    ``{'provider', 'from', 'to', 'info'}``, where ``from``/``to`` are the old
    and new throttled flags and ``info`` is the provider's new state dict.
    A provider that is absent from the current state counts as not throttled.
    Items are returned in *new_state* iteration order.
    """
    with _state_lock:
        flag_pairs = [
            (
                name,
                _throttle_state.get(name, {}).get('throttled', False),
                info.get('throttled', False),
                info,
            )
            for name, info in new_state.items()
        ]
    return [
        {'provider': name, 'from': was, 'to': now, 'info': info}
        for name, was, now, info in flag_pairs
        if was != now
    ]
|
||
|
||
|
||
def _push_throttle_alerts(transitions: List[Dict[str, Any]]) -> None:
    """Send one Telegram message per throttle / unthrottle transition.

    Best-effort: returns silently when the Telegram helper cannot be
    imported. A failed send is logged and does not abort the remaining
    messages.

    Args:
        transitions: Items produced by _diff_transitions(), each carrying
            ``provider``, ``to`` (the new throttled flag) and ``info`` (the
            provider's new state dict with spent/budget/projected/ratio/reason).
    """
    import html

    try:
        from services.telegram_templates import _send_telegram_raw
    except Exception:
        return

    # The messages use Telegram HTML markup (<b>, <code>). In that mode, '<',
    # '>' and '&' outside tags must be sent as entities, and the throttle reason
    # contains a bare '>', so dynamic text is escaped and the literal
    # comparison is written as '&lt;'.
    # NOTE(review): assumes _send_telegram_raw uses parse_mode=HTML, as the
    # tags imply — confirm.
    unthrottle_pct = f'{COST_UNTHROTTLE_PROJECT_RATIO:.0%}'
    # With the feature flag off, is_provider_throttled() always returns False,
    # so state explicitly that routing was not actually changed.
    observe_only = not is_cost_throttle_enabled()

    for t in transitions:
        provider = html.escape(str(t['provider']), quote=False)
        info = t['info']
        if t['to']:  # newly throttled
            reason = html.escape(str(info['reason']), quote=False)
            msg = (
                f"⚠️ <b>成本自動節流啟動</b>\n"
                f"━━━━━━━━━━━━━━━━━━━━\n"
                f"📊 Provider: <code>{provider}</code>\n"
                f"💰 已花費: ${info['spent']:.2f} / 預算 ${info['budget']:.2f}\n"
                f"📈 月底推估: ${info['projected']:.2f} (ratio {info['ratio']:.0%})\n"
                f"🔧 原因: {reason}\n\n"
                f"自動切換 fallback 路由直到月底推估 &lt; {unthrottle_pct}"
            )
        else:  # unthrottled
            msg = (
                f"✅ <b>成本節流解除</b>\n"
                f"━━━━━━━━━━━━━━━━━━━━\n"
                f"📊 Provider: <code>{provider}</code>\n"
                f"📈 月底推估: ${info['projected']:.2f} / 預算 ${info['budget']:.2f}"
                f" (ratio {info['ratio']:.0%})\n"
                f"恢復正常路由"
            )
        if observe_only:
            msg += "\n\nℹ️ COST_THROTTLE_ENABLED is off — observe only, routing unchanged"
        try:
            _send_telegram_raw(msg)
        except Exception as exc:
            logger.warning('[CostThrottle] telegram push failed: %s', exc)
|
||
|
||
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
# Public API
|
||
# ─────────────────────────────────────────────────────────────────────────────
|
||
def reset_throttle_state() -> None:
    """Drop every in-memory throttle flag (run by the start-of-month cron).

    The next evaluate_throttle_status() rebuilds the state from scratch.
    No Telegram message is sent for providers implicitly unthrottled here.
    """
    global _state_ts
    with _state_lock:
        _state_ts = 0.0
        _throttle_state.clear()
    logger.info('[CostThrottle] state reset (新月份)')
|
||
|
||
|
||
# Public surface: cron entry points, runtime checks for LLM callers,
# a debug snapshot, and the two hysteresis thresholds.
__all__ = [
    'evaluate_throttle_status', 'is_provider_throttled', 'is_cost_throttle_enabled',
    'get_throttle_state', 'reset_throttle_state',
    'COST_THROTTLE_PROJECT_RATIO', 'COST_UNTHROTTLE_PROJECT_RATIO',
]
|