#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/cost_throttle_service.py
Operation Ollama-First v5.0 / Phase 20 — Cost Auto-Throttle

Design principles:
  - Active throttling (not just passive alerting): when the projected
    month-end cost exceeds 110% of the budget → throttle automatically
  - Throttle mechanism: mark the provider in an in-memory cache; the main flow
    reads the mark and changes its routing
    - claude throttled → fallback Gemini Flash
    - gemini throttled → fallback Hermes Ollama
    - nim throttled    → fallback Hermes rule engine
  - Re-evaluated by an hourly cron
  - Telegram push on throttle / unthrottle events
  - Feature flag COST_THROTTLE_ENABLED defaults to OFF (avoids accidental
    throttling during critical periods)

Backed by the ai_call_budgets table (migration 025):
  daily / weekly / monthly × each provider × 80% alert_pct
  This service only looks at monthly budgets (linear extrapolation of the
  month-end cost).
"""

from __future__ import annotations

import os
import time
import logging
import threading
from datetime import datetime, timedelta
from calendar import monthrange
from typing import Dict, Any, Optional, List, Tuple

logger = logging.getLogger(__name__)

# ─────────────────────────────────────────────────────────────────────────────
# Feature flag + configuration
# ─────────────────────────────────────────────────────────────────────────────

# Env-var spellings (after strip + lower) that count as "enabled".
_TRUTHY_VALUES = ('true', '1', 'yes', 'on')


def _env_float(name: str, default: float) -> float:
    """Read a float from the environment, falling back to *default*.

    An unset, blank, or unparsable value yields *default* (with a warning for
    the unparsable case) instead of raising, so a typo in the configuration
    cannot break importing this module.
    """
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return float(raw)
    except ValueError:
        logger.warning(
            '[CostThrottle] invalid %s=%r, using default %s', name, raw, default,
        )
        return default


def _fmt_pct(ratio: float) -> str:
    """Render a ratio as a compact percentage, e.g. 0.95 → '95%', 0.925 → '92.5%'."""
    return f'{ratio * 100:g}%'


# Import-time snapshot of the flag; runtime code uses is_cost_throttle_enabled().
COST_THROTTLE_ENABLED = (
    os.getenv('COST_THROTTLE_ENABLED', 'false').strip().lower() in _TRUTHY_VALUES
)
# Start throttling when projected / budget reaches this ratio (default 110%).
COST_THROTTLE_PROJECT_RATIO = _env_float('COST_THROTTLE_PROJECT_RATIO', 1.10)
# Stop throttling once projected / budget drops below this ratio (default 95%).
COST_UNTHROTTLE_PROJECT_RATIO = _env_float('COST_UNTHROTTLE_PROJECT_RATIO', 0.95)


def is_cost_throttle_enabled() -> bool:
    """Return whether the feature flag is on, re-reading the env on every call.

    Reading at call time (rather than using the import-time constant) lets the
    flag change without re-importing the module.
    """
    return os.getenv('COST_THROTTLE_ENABLED', 'false').strip().lower() in _TRUTHY_VALUES


# ─────────────────────────────────────────────────────────────────────────────
# Throttle state (in memory, refreshed by the hourly cron)
# ─────────────────────────────────────────────────────────────────────────────

# {provider: {throttled, spent, budget, projected, ratio, reason, evaluated_at}}
_throttle_state: Dict[str, Dict[str, Any]] = {}
_state_lock = threading.Lock()
# time.time() of the last successful evaluation; 0.0 after a reset.
_state_ts = 0.0


def is_provider_throttled(provider: str) -> bool:
    """Public API — return True when *provider* is currently marked throttled.

    Intended for provider callers (anthropic_service / gemini) to check before
    routing. Always returns False while the feature flag is off.
    """
    if not is_cost_throttle_enabled():
        return False
    with _state_lock:
        info = _throttle_state.get(provider)
        return bool(info and info.get('throttled'))


def get_throttle_state() -> Dict[str, Dict[str, Any]]:
    """Debug helper: return a snapshot of the throttle state.

    The per-provider dicts are copied as well, so mutating the snapshot cannot
    alter the live state.
    """
    with _state_lock:
        return {provider: dict(info) for provider, info in _throttle_state.items()}


# ─────────────────────────────────────────────────────────────────────────────
# Core: the hourly evaluation
# ─────────────────────────────────────────────────────────────────────────────

def _days_in_current_month(today: Optional[datetime] = None) -> int:
    """Return the number of days in the month of *today* (default: now)."""
    today = today or datetime.now()
    return monthrange(today.year, today.month)[1]


def _month_start(today: Optional[datetime] = None) -> datetime:
    """Return midnight on the first day of the month of *today* (default: now)."""
    today = today or datetime.now()
    return datetime(today.year, today.month, 1)


def _decide_throttle(
    prev_throttled: bool,
    projected: float,
    budget: float,
    *,
    throttle_ratio: Optional[float] = None,
    unthrottle_ratio: Optional[float] = None,
) -> Tuple[bool, float, str]:
    """Apply the throttle hysteresis to one provider.

    Args:
        prev_throttled: whether the provider was throttled before this run.
        projected: projected month-end cost.
        budget: monthly budget; a non-positive budget yields ratio 0.0 and
            never starts a throttle.
        throttle_ratio: ratio at which throttling starts; defaults to
            COST_THROTTLE_PROJECT_RATIO.
        unthrottle_ratio: ratio below which throttling ends; defaults to
            COST_UNTHROTTLE_PROJECT_RATIO.

    Returns:
        (should_throttle, ratio, reason) where ratio = projected / budget and
        reason is a human-readable explanation ('' when nothing happens).
    """
    if throttle_ratio is None:
        throttle_ratio = COST_THROTTLE_PROJECT_RATIO
    if unthrottle_ratio is None:
        unthrottle_ratio = COST_UNTHROTTLE_PROJECT_RATIO

    ratio = projected / budget if budget > 0 else 0.0

    if prev_throttled:
        # Already throttled → stay throttled until the ratio falls below the
        # (lower) unthrottle threshold, so the state does not flap.
        if ratio >= unthrottle_ratio:
            return True, ratio, (
                f'still over unthrottle threshold '
                f'({ratio:.2%} >= {_fmt_pct(unthrottle_ratio)})'
            )
        return False, ratio, (
            f'recovered ({ratio:.2%} < {_fmt_pct(unthrottle_ratio)}) — unthrottle'
        )

    # Not throttled → start only once the (higher) throttle threshold is hit.
    if budget > 0 and ratio >= throttle_ratio:
        return True, ratio, (
            f'projected ${projected:.2f} > budget ${budget:.2f} × '
            f'{throttle_ratio} (ratio={ratio:.2%})'
        )
    return False, ratio, ''


def evaluate_throttle_status() -> Dict[str, Dict[str, Any]]:
    """Hourly cron entry: compare ai_call_budgets with this month's spend and
    project the month-end cost.

    The projection is linear: spent / days elapsed × days in the month.

    Returns:
        {provider: {throttled, spent, budget, projected, ratio, reason,
        evaluated_at}}. Returns {} — leaving the previous state untouched —
        when the DB layer cannot be imported or the evaluation fails.
    """
    global _state_ts

    # Imported lazily so this module still imports when the DB layer is absent.
    try:
        from sqlalchemy import text as sa_text
        from database.manager import get_session
    except Exception as exc:
        logger.warning('[CostThrottle] DB import failed: %s', exc)
        return {}

    today = datetime.now()
    days_elapsed = max(today.day, 1)
    days_in_month = _days_in_current_month(today)
    month_start = _month_start(today)

    # Snapshot the previous flags once instead of locking per provider.
    with _state_lock:
        prev_flags = {
            provider: bool(info.get('throttled'))
            for provider, info in _throttle_state.items()
        }

    new_state: Dict[str, Dict[str, Any]] = {}
    session = None
    try:
        # Opened inside the try so a connection failure is logged like any
        # other evaluation failure instead of escaping to the cron runner.
        session = get_session()

        # 1. This month's monthly budgets (rows with a NULL provider, i.e. the
        #    all-provider total, are excluded).
        budgets = session.execute(
            sa_text("""
                SELECT provider, budget_usd, alert_pct
                FROM ai_call_budgets
                WHERE period = 'monthly' AND provider IS NOT NULL
            """),
        ).fetchall()

        # 2. Accumulated cost_usd per provider since the start of the month.
        spent_rows = session.execute(
            sa_text("""
                SELECT provider, COALESCE(SUM(cost_usd), 0) AS spent
                FROM ai_calls
                WHERE called_at >= :ms
                GROUP BY provider
            """),
            {'ms': month_start},
        ).fetchall()
        spent_by_provider: Dict[str, float] = {
            row[0]: float(row[1] or 0) for row in spent_rows
        }

        # 3. Project each provider's month-end cost and decide on throttling.
        for row in budgets:
            provider = row[0]
            budget = float(row[1] or 0)
            spent = spent_by_provider.get(provider, 0.0)
            projected = spent / days_elapsed * days_in_month

            should_throttle, ratio, reason = _decide_throttle(
                prev_flags.get(provider, False), projected, budget,
            )

            new_state[provider] = {
                'throttled': should_throttle,
                'spent': spent,
                'budget': budget,
                'projected': round(projected, 4),
                'ratio': round(ratio, 4),
                'reason': reason,
                'evaluated_at': today.isoformat(),
            }
    except Exception as exc:
        # logger.exception keeps the traceback for the cron logs.
        logger.exception('[CostThrottle] evaluate failed: %s', exc)
        return {}
    finally:
        if session is not None:
            session.close()

    # 4. Detect state changes (must run before the state is overwritten).
    transitions = _diff_transitions(new_state)

    # 5. Swap in the new state under the lock.
    with _state_lock:
        _throttle_state.clear()
        _throttle_state.update(new_state)
        _state_ts = time.time()

    # 6. Push Telegram alerts only when something changed.
    if transitions:
        _push_throttle_alerts(transitions)

    return new_state


def _diff_transitions(new_state: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Detect providers whose throttled flag differs from the current state.

    Only providers present in *new_state* are compared; a provider missing
    from the current state counts as previously not throttled.

    Returns:
        A list of {'provider', 'from', 'to', 'info'} dicts, one per provider
        that became throttled or unthrottled.
    """
    transitions = []
    with _state_lock:
        for provider, info in new_state.items():
            prev = _throttle_state.get(provider, {})
            prev_throttled = prev.get('throttled', False)
            curr_throttled = info.get('throttled', False)
            if prev_throttled != curr_throttled:
                transitions.append({
                    'provider': provider,
                    'from': prev_throttled,
                    'to': curr_throttled,
                    'info': info,
                })
    return transitions


def _push_throttle_alerts(transitions: List[Dict[str, Any]]) -> None:
    """Push one Telegram message per throttle / unthrottle transition.

    Best effort: a missing Telegram helper or a failed send is logged and
    never raised to the caller.
    """
    try:
        from services.telegram_templates import _send_telegram_raw
    except Exception as exc:
        logger.warning('[CostThrottle] telegram import failed: %s', exc)
        return

    for t in transitions:
        provider = t['provider']
        info = t['info']
        if t['to']:
            # Newly throttled.
            msg = (
                f"⚠️ 成本自動節流啟動\n"
                f"━━━━━━━━━━━━━━━━━━━━\n"
                f"📊 Provider: {provider}\n"
                f"💰 已花費: ${info['spent']:.2f} / 預算 ${info['budget']:.2f}\n"
                f"📈 月底推估: ${info['projected']:.2f} (ratio {info['ratio']:.0%})\n"
                f"🔧 原因: {info['reason']}\n\n"
                f"自動切換 fallback 路由直到月底推估 < "
                f"{_fmt_pct(COST_UNTHROTTLE_PROJECT_RATIO)}"
            )
        else:
            # Unthrottled.
            msg = (
                f"✅ 成本節流解除\n"
                f"━━━━━━━━━━━━━━━━━━━━\n"
                f"📊 Provider: {provider}\n"
                f"📈 月底推估: ${info['projected']:.2f} / 預算 ${info['budget']:.2f}"
                f" (ratio {info['ratio']:.0%})\n"
                f"恢復正常路由"
            )
        try:
            _send_telegram_raw(msg)
        except Exception as exc:
            logger.warning('[CostThrottle] telegram push failed: %s', exc)


# ─────────────────────────────────────────────────────────────────────────────
# Public API
# ─────────────────────────────────────────────────────────────────────────────

def reset_throttle_state() -> None:
    """Start-of-month cron entry: clear the throttle state so the next
    evaluation starts from scratch (no hysteresis carried over)."""
    global _state_ts
    with _state_lock:
        _throttle_state.clear()
        _state_ts = 0.0
    logger.info('[CostThrottle] state reset (新月份)')


__all__ = [
    'evaluate_throttle_status',
    'is_provider_throttled',
    'is_cost_throttle_enabled',
    'get_throttle_state',
    'reset_throttle_state',
    'COST_THROTTLE_PROJECT_RATIO',
    'COST_UNTHROTTLE_PROJECT_RATIO',
]