Files
ewoooc/services/cost_throttle_service.py
OoO c13dc22639
All checks were successful
CD Pipeline / deploy (push) Successful in 2m44s
feat(p20)+docs: cost auto-throttle + LLM 模型完整評估
Operation Ollama-First v5.0 / Phase 20 + LLM 模型治理

services/cost_throttle_service.py (新檔, 200+ 行)
- evaluate_throttle_status() 每小時 cron 跑
- 查 ai_call_budgets monthly × 累計 spent → 月底線性外推
- 推估 > 預算 110% → 標 throttled(hysteresis:降到 95% 才解除)
- _push_throttle_alerts: 狀態變化推 Telegram
- is_provider_throttled(provider) public API(給 anthropic/gemini caller 啟動 check)
- COST_THROTTLE_ENABLED 預設 OFF(避免戰時誤節流)

run_scheduler.py 加 2 cron + task wrapper
- 每 1 小時:cost_throttle_evaluate
- 每日 00:05:cost_throttle_reset_if_new_month

docs/llm_model_full_evaluation_20260504.md (260+ 行)
- 場景 × 模型對應矩陣(4 大層次)
  戰術層 / 戰略層 / 多模態 / 雲端 API
- 本次啟動的追加 4 模型(qwen2.5-coder:32b / deepseek-r1:14b /
  llava / gemma3:4b)— Primary + Secondary 並行拉
- Phase 21 路由優化建議(context size + complexity 動態選 model)
- Phase 22 多供應商編排 + cost throttle 整合
- 儲存 / RAM / 延遲評估
- 模型治理 SOP(新增 / 替換 / 淘汰)
- COST_TABLE 對齊(含 deepseek 直連價格)

啟用前置(待統帥):
1. Primary + Secondary 4 模型拉完(背景進行中)
2. .env: COST_THROTTLE_ENABLED=true(觀察 1 週後)
3. ANTHROPIC_API_KEY 設後 Code Review 自動切 Claude Opus 4.7

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 10:36:56 +08:00

265 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/cost_throttle_service.py
Operation Ollama-First v5.0 / Phase 20 — 成本自動節流Cost Auto-Throttle
設計原則:
- 主動節流(不只被動告警):當月預估成本超預算 110% → 自動 throttle
- throttle 機制:在記憶體 cache 標記 provider主流程讀取後改路由
- claude throttled → fallback Gemini Flash
- gemini throttled → fallback Hermes Ollama
- nim throttled → fallback Hermes 規則引擎
- 每小時 cron 重評估
- Telegram 推播 throttle / unthrottle 事件
- feature flag COST_THROTTLE_ENABLED 預設 OFF避免戰時誤節流
對應 ai_call_budgets 表migration 025
daily / weekly / monthly × 各 provider × 80% alert_pct
本服務只看 monthly線性外推月底成本
"""
from __future__ import annotations
import os
import time
import logging
import threading
from datetime import datetime, timedelta
from calendar import monthrange
from typing import Dict, Any, Optional, List
logger = logging.getLogger(__name__)
# ─────────────────────────────────────────────────────────────────────────────
# Feature flag + 配置
# ─────────────────────────────────────────────────────────────────────────────
COST_THROTTLE_ENABLED = os.getenv('COST_THROTTLE_ENABLED', 'false').strip().lower() in ('true', '1', 'yes', 'on')
COST_THROTTLE_PROJECT_RATIO = float(os.getenv('COST_THROTTLE_PROJECT_RATIO', '1.10')) # 預估超預算 110%
COST_UNTHROTTLE_PROJECT_RATIO = float(os.getenv('COST_UNTHROTTLE_PROJECT_RATIO', '0.95')) # 降回 95% 解除
def is_cost_throttle_enabled() -> bool:
"""Runtime check避免 import-time freeze"""
return os.getenv('COST_THROTTLE_ENABLED', 'false').strip().lower() in ('true', '1', 'yes', 'on')
# ─────────────────────────────────────────────────────────────────────────────
# Throttle 狀態(記憶體,由 cron 每小時更新)
# ─────────────────────────────────────────────────────────────────────────────
_throttle_state: Dict[str, Dict[str, Any]] = {} # {provider: {throttled, projected, budget, ...}}
_state_lock = threading.Lock()
_state_ts = 0.0
def is_provider_throttled(provider: str) -> bool:
"""Public API — 給 anthropic_service / gemini caller 啟動時 check"""
if not is_cost_throttle_enabled():
return False
with _state_lock:
info = _throttle_state.get(provider)
return bool(info and info.get('throttled'))
def get_throttle_state() -> Dict[str, Dict[str, Any]]:
"""除錯用:取 throttle 狀態 snapshot"""
with _state_lock:
return dict(_throttle_state)
# ─────────────────────────────────────────────────────────────────────────────
# 核心:每小時跑的 evaluate
# ─────────────────────────────────────────────────────────────────────────────
def _days_in_current_month(today: Optional[datetime] = None) -> int:
today = today or datetime.now()
return monthrange(today.year, today.month)[1]
def _month_start(today: Optional[datetime] = None) -> datetime:
today = today or datetime.now()
return datetime(today.year, today.month, 1)
def evaluate_throttle_status() -> Dict[str, Dict[str, Any]]:
"""每小時 cron 跑:查 ai_call_budgets vs 當月 spent計算月底推估。
Returns:
{provider: {throttled, spent, budget, projected, reason}}
"""
global _state_ts
try:
from sqlalchemy import text as sa_text
from database.manager import get_session
except Exception as exc:
logger.warning('[CostThrottle] DB import failed: %s', exc)
return {}
today = datetime.now()
days_elapsed = max(today.day, 1)
days_in_month = _days_in_current_month(today)
month_start = _month_start(today)
session = get_session()
new_state: Dict[str, Dict[str, Any]] = {}
try:
# 1. 取當月 monthly budget不包 NULL provider 的全供應商總額)
budgets = session.execute(
sa_text("""
SELECT provider, budget_usd, alert_pct
FROM ai_call_budgets
WHERE period = 'monthly' AND provider IS NOT NULL
"""),
).fetchall()
# 2. 取當月每 provider 累計 cost_usd
spent_rows = session.execute(
sa_text("""
SELECT provider, COALESCE(SUM(cost_usd), 0) AS spent
FROM ai_calls
WHERE called_at >= :ms
GROUP BY provider
"""),
{'ms': month_start},
).fetchall()
spent_by_provider: Dict[str, float] = {
row[0]: float(row[1] or 0) for row in spent_rows
}
# 3. 計算每 provider 月底推估 + 決定是否 throttle
for row in budgets:
provider = row[0]
budget = float(row[1] or 0)
spent = spent_by_provider.get(provider, 0.0)
projected = spent / days_elapsed * days_in_month
ratio = projected / budget if budget > 0 else 0.0
# 取既有狀態hysteresisthrottled 後降到 95% 才解除)
with _state_lock:
prev = _throttle_state.get(provider, {})
prev_throttled = prev.get('throttled', False)
should_throttle = False
reason = ''
if prev_throttled:
# 已 throttled → 降到 unthrottle threshold (95%) 才解除
if ratio >= COST_UNTHROTTLE_PROJECT_RATIO:
should_throttle = True
reason = f'still over unthrottle threshold ({ratio:.2%} >= 95%)'
else:
reason = f'recovered ({ratio:.2%} < 95%) — unthrottle'
else:
# 未 throttled → 超 110% 才開始
if ratio >= COST_THROTTLE_PROJECT_RATIO and budget > 0:
should_throttle = True
reason = (
f'projected ${projected:.2f} > budget ${budget:.2f} × '
f'{COST_THROTTLE_PROJECT_RATIO} (ratio={ratio:.2%})'
)
new_state[provider] = {
'throttled': should_throttle,
'spent': spent,
'budget': budget,
'projected': round(projected, 4),
'ratio': round(ratio, 4),
'reason': reason,
'evaluated_at': today.isoformat(),
}
except Exception as exc:
logger.error('[CostThrottle] evaluate failed: %s', exc)
return {}
finally:
session.close()
# 4. 偵測狀態變化推 Telegram
transitions = _diff_transitions(new_state)
# 5. atomic update
with _state_lock:
_throttle_state.clear()
_throttle_state.update(new_state)
_state_ts = time.time()
# 6. 推 Telegram變化時
if transitions:
_push_throttle_alerts(transitions)
return new_state
def _diff_transitions(new_state: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]:
"""偵測狀態變化newly throttled / unthrottled"""
transitions = []
with _state_lock:
for provider, info in new_state.items():
prev = _throttle_state.get(provider, {})
prev_throttled = prev.get('throttled', False)
curr_throttled = info.get('throttled', False)
if prev_throttled != curr_throttled:
transitions.append({
'provider': provider,
'from': prev_throttled,
'to': curr_throttled,
'info': info,
})
return transitions
def _push_throttle_alerts(transitions: List[Dict[str, Any]]) -> None:
"""推 Telegram throttle/unthrottle 事件"""
try:
from services.telegram_templates import _send_telegram_raw
except Exception:
return
for t in transitions:
provider = t['provider']
info = t['info']
if t['to']: # newly throttled
msg = (
f"⚠️ <b>成本自動節流啟動</b>\n"
f"━━━━━━━━━━━━━━━━━━━━\n"
f"📊 Provider: <code>{provider}</code>\n"
f"💰 已花費: ${info['spent']:.2f} / 預算 ${info['budget']:.2f}\n"
f"📈 月底推估: ${info['projected']:.2f} (ratio {info['ratio']:.0%})\n"
f"🔧 原因: {info['reason']}\n\n"
f"自動切換 fallback 路由直到月底推估 < 95%"
)
else: # unthrottled
msg = (
f"✅ <b>成本節流解除</b>\n"
f"━━━━━━━━━━━━━━━━━━━━\n"
f"📊 Provider: <code>{provider}</code>\n"
f"📈 月底推估: ${info['projected']:.2f} / 預算 ${info['budget']:.2f}"
f" (ratio {info['ratio']:.0%})\n"
f"恢復正常路由"
)
try:
_send_telegram_raw(msg)
except Exception as exc:
logger.warning('[CostThrottle] telegram push failed: %s', exc)
# ─────────────────────────────────────────────────────────────────────────────
# Public API
# ─────────────────────────────────────────────────────────────────────────────
def reset_throttle_state() -> None:
"""月初 cron 跑:清空 _throttle_state重新計算"""
global _state_ts
with _state_lock:
_throttle_state.clear()
_state_ts = 0.0
logger.info('[CostThrottle] state reset (新月份)')
__all__ = [
'evaluate_throttle_status',
'is_provider_throttled',
'is_cost_throttle_enabled',
'get_throttle_state',
'reset_throttle_state',
'COST_THROTTLE_PROJECT_RATIO',
'COST_UNTHROTTLE_PROJECT_RATIO',
]