feat(ai): $5 USD 成本上限 + 自動切換 Ollama (2026-03-29 ogt)

統帥要求:
1. 累積成本超過 $5 USD → 自動停用 Gemini,切換回 Ollama
2. 發送 Telegram 告警通知統帥
3. $4 USD 時發送警告

實作:
- ai_rate_limiter.py: 新增 COST_LIMITS, record_cost(), reset_cost()
- openclaw.py: 每次成功呼叫後記錄成本
- 成本存入 Redis (不過期,手動重置)
- 重置指令: redis-cli DEL ai_rate:total_cost:gemini ai_rate:cost_alert_sent:gemini (需一併清除告警旗標,與 reset_cost() 行為一致;否則旗標 24 小時內仍存在,再次超限時不會重發告警)

API 端點: GET /api/v1/health/ai-usage
- 顯示 total_cost_usd.current/limit/remaining
- 顯示 cost_exceeded: true/false

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-28 22:34:51 +08:00
parent 863fc5a426
commit c76a10ad6e
2 changed files with 219 additions and 3 deletions

View File

@@ -8,10 +8,12 @@ AI Rate Limiter - Gemini API 用量閥值控制
- 每分鐘請求限制 (RPM)
- 每日請求限制
- 每日 Token 限制
- 超限自動降級
- 🔴 累積成本限制 ($5 USD) - 2026-03-29 ogt 新增
- 超限自動降級 + Telegram 告警
版本: v1.0
版本: v1.1
建立日期: 2026-03-26 21:00 (台北時區)
更新日期: 2026-03-29 22:45 (台北時區)
建立者: Claude Code
"""
@@ -37,11 +39,35 @@ RATE_LIMITS = {
},
}
# =============================================================================
# 2026-03-29 ogt: cumulative cost limits (requested by the commander)
# =============================================================================
# Per-provider spending caps in USD. Only providers listed here are subject to
# cost tracking; any other provider is skipped by the cost checks.
COST_LIMITS = {
"gemini": {
"total_cost_usd": 5.0, # 🔴 Hard cap: $5 USD total; once reached the provider is blocked
"alert_threshold_usd": 4.0, # Warning threshold: $4 USD
},
"claude": {
"total_cost_usd": 10.0,
"alert_threshold_usd": 8.0,
},
}
# Gemini 1.5 Flash pricing (USD per token)
GEMINI_PRICING = {
"input_per_token": 0.000000075, # $0.075 / 1M tokens
"output_per_token": 0.0000003, # $0.30 / 1M tokens
}
# Redis Keys
# The doubled braces survive the f-string as literal "{provider}" / "{date}"
# placeholders, so each template is filled in later with str.format().
REDIS_KEY_PREFIX = "ai_rate:"
RPM_KEY = f"{REDIS_KEY_PREFIX}rpm:{{provider}}"
DAILY_REQ_KEY = f"{REDIS_KEY_PREFIX}daily_req:{{provider}}:{{date}}"
DAILY_TOKEN_KEY = f"{REDIS_KEY_PREFIX}daily_token:{{provider}}:{{date}}"
# 2026-03-29 ogt: cumulative cost key (written without an expiry; reset manually)
TOTAL_COST_KEY = f"{REDIS_KEY_PREFIX}total_cost:{{provider}}"
# Flag recording that the over-limit alert has already been sent for a provider
COST_ALERT_SENT_KEY = f"{REDIS_KEY_PREFIX}cost_alert_sent:{{provider}}"
# =============================================================================
@@ -100,6 +126,25 @@ class AIRateLimiter:
r = await self._get_redis()
today = self._get_today()
# 0. 🔴 2026-03-29 ogt: 檢查累積成本 (最高優先級)
if provider in COST_LIMITS:
cost_limit = COST_LIMITS[provider]["total_cost_usd"]
total_cost_key = TOTAL_COST_KEY.format(provider=provider)
current_cost = await r.get(total_cost_key)
current_cost = float(current_cost) if current_cost else 0.0
if current_cost >= cost_limit:
logger.error(
"ai_cost_limit_exceeded_blocking",
provider=provider,
current_cost=f"${current_cost:.4f}",
limit=f"${cost_limit:.2f}",
action="AUTO_SWITCH_TO_OLLAMA",
)
# 發送告警 (只發一次)
await self._send_cost_alert(provider, current_cost, cost_limit)
return False, f"🔴 成本超限! ${current_cost:.2f} >= ${cost_limit:.2f},已自動切換到 Ollama"
# 1. 檢查 RPM
rpm_key = RPM_KEY.format(provider=provider)
current_rpm = await r.get(rpm_key)
@@ -169,6 +214,140 @@ class AIRateLimiter:
return True, None
async def record_cost(self, provider: str, cost_usd: float) -> None:
    """
    Add the cost of one call to a provider's cumulative spend.

    Providers that have no entry in COST_LIMITS, and non-positive costs,
    are ignored. Once the new running total reaches the provider's alert
    threshold, the near-limit warning is triggered.

    Args:
        provider: AI provider name.
        cost_usd: Cost of this call in USD.
    """
    if provider not in COST_LIMITS or cost_usd <= 0:
        return

    redis_client = await self._get_redis()

    # INCRBYFLOAT updates the counter atomically and returns the new total.
    running_total = await redis_client.incrbyfloat(
        TOTAL_COST_KEY.format(provider=provider),
        cost_usd,
    )

    logger.info(
        "ai_cost_recorded",
        provider=provider,
        cost_usd=f"${cost_usd:.6f}",
        total_cost=f"${running_total:.4f}",
    )

    # Below the warning threshold there is nothing more to do.
    warn_at = COST_LIMITS[provider]["alert_threshold_usd"]
    if running_total < warn_at:
        return

    await self._send_cost_warning(provider, running_total, warn_at)
async def _send_cost_alert(self, provider: str, current_cost: float, limit: float) -> None:
    """
    Send the "cost limit exceeded" alert to Telegram, at most once per 24 hours.

    The "already sent" flag is claimed atomically with SET NX EX so that
    concurrent callers cannot both send the alert. If delivery fails (network
    error or a non-200 reply from Telegram) the flag's TTL is shortened, so
    the alert is retried a few minutes later instead of being suppressed for
    the full 24 hours. Delivery errors are logged and never raised.

    Args:
        provider: AI provider name.
        current_cost: Cumulative cost in USD at the time of the check.
        limit: Configured cost cap in USD.
    """
    suppress_seconds = 86400  # one alert per 24 hours after a successful send
    retry_after_seconds = 300  # back-off before retrying a failed delivery

    r = await self._get_redis()
    alert_sent_key = COST_ALERT_SENT_KEY.format(provider=provider)

    # Atomic check-and-mark: SET ... NX returns a falsy value when the flag
    # already exists, i.e. another call has sent (or is sending) the alert.
    if not await r.set(alert_sent_key, "1", ex=suppress_seconds, nx=True):
        return

    try:
        from src.core.config import settings

        if not settings.OPENCLAW_TG_BOT_TOKEN or not settings.OPENCLAW_TG_CHAT_ID:
            # The flag stays set so this warning is not logged on every request.
            logger.warning("telegram_not_configured_for_cost_alert")
            return

        import httpx

        message = (
            f"🚨🚨🚨 <b>AI 成本超限警報</b> 🚨🚨🚨\n\n"
            f"Provider: <code>{provider.upper()}</code>\n"
            f"累積成本: <b>${current_cost:.2f}</b>\n"
            f"上限: <b>${limit:.2f}</b>\n\n"
            f"⚡ <b>已自動切換到 Ollama</b>\n\n"
            f"如需恢復 {provider.upper()},請執行:\n"
            f"<code>redis-cli DEL ai_rate:total_cost:{provider}</code>"
        )

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                f"https://api.telegram.org/bot{settings.OPENCLAW_TG_BOT_TOKEN}/sendMessage",
                json={
                    "chat_id": settings.OPENCLAW_TG_CHAT_ID,
                    "text": message,
                    "parse_mode": "HTML",
                },
            )

        # Raise our own error rather than response.raise_for_status(): the
        # httpx error text embeds the request URL, which contains the bot token.
        if response.status_code != 200:
            raise RuntimeError(
                f"Telegram sendMessage returned HTTP {response.status_code}"
            )

        logger.error(
            "ai_cost_alert_sent",
            provider=provider,
            current_cost=f"${current_cost:.2f}",
            limit=f"${limit:.2f}",
        )

    except Exception as e:
        logger.error("ai_cost_alert_failed", error=str(e))
        # The alert was not delivered: shorten the suppression window so a
        # later check retries it, without retrying on every single request.
        try:
            await r.expire(alert_sent_key, retry_after_seconds)
        except Exception as expire_error:
            logger.error("ai_cost_alert_failed", error=str(expire_error))
async def _send_cost_warning(self, provider: str, current_cost: float, threshold: float) -> None:
    """
    Send the "approaching the cost limit" warning to Telegram, at most once per hour.

    The "already sent" flag is claimed atomically with SET NX EX so that
    concurrent callers cannot both send the warning. If delivery fails
    (network error or a non-200 reply from Telegram) the flag's TTL is
    shortened so the warning is retried soon instead of being suppressed for
    the whole hour. Delivery errors are logged and never raised.

    Args:
        provider: AI provider name.
        current_cost: Cumulative cost in USD after the latest call.
        threshold: Warning threshold in USD that was reached.
    """
    suppress_seconds = 3600  # one warning per hour after a successful send
    retry_after_seconds = 300  # back-off before retrying a failed delivery

    r = await self._get_redis()
    warning_key = f"{REDIS_KEY_PREFIX}cost_warning_sent:{provider}"

    # Atomic check-and-mark: SET ... NX returns a falsy value when the flag
    # already exists, i.e. a warning was sent (or is being sent) this hour.
    if not await r.set(warning_key, "1", ex=suppress_seconds, nx=True):
        return

    try:
        from src.core.config import settings

        if not settings.OPENCLAW_TG_BOT_TOKEN or not settings.OPENCLAW_TG_CHAT_ID:
            return

        import httpx

        limit = COST_LIMITS[provider]["total_cost_usd"]
        remaining = limit - current_cost

        message = (
            f"⚠️ <b>AI 成本警告</b>\n\n"
            f"Provider: <code>{provider.upper()}</code>\n"
            f"累積成本: <b>${current_cost:.2f}</b> / ${limit:.2f}\n"
            f"剩餘額度: <b>${remaining:.2f}</b>\n\n"
            f"接近上限,請注意監控!"
        )

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                f"https://api.telegram.org/bot{settings.OPENCLAW_TG_BOT_TOKEN}/sendMessage",
                json={
                    "chat_id": settings.OPENCLAW_TG_CHAT_ID,
                    "text": message,
                    "parse_mode": "HTML",
                },
            )

        # Raise our own error rather than response.raise_for_status(): the
        # httpx error text embeds the request URL, which contains the bot token.
        if response.status_code != 200:
            raise RuntimeError(
                f"Telegram sendMessage returned HTTP {response.status_code}"
            )

        logger.warning(
            "ai_cost_warning_sent",
            provider=provider,
            current_cost=f"${current_cost:.2f}",
            threshold=f"${threshold:.2f}",
        )

    except Exception as e:
        logger.warning("ai_cost_warning_failed", error=str(e))
        # The warning was not delivered: shorten the suppression window so a
        # later call retries it, without retrying on every single request.
        try:
            await r.expire(warning_key, retry_after_seconds)
        except Exception as expire_error:
            logger.warning("ai_cost_warning_failed", error=str(expire_error))
async def record_tokens(self, provider: str, tokens: int) -> None:
"""
記錄 Token 用量 (回應後呼叫)
@@ -195,7 +374,7 @@ class AIRateLimiter:
async def get_usage_stats(self, provider: str) -> dict:
"""
取得用量統計
取得用量統計 (含成本)
Args:
provider: AI 提供者
@@ -213,10 +392,27 @@ class AIRateLimiter:
rpm_key = RPM_KEY.format(provider=provider)
daily_req_key = DAILY_REQ_KEY.format(provider=provider, date=today)
daily_token_key = DAILY_TOKEN_KEY.format(provider=provider, date=today)
total_cost_key = TOTAL_COST_KEY.format(provider=provider)
current_rpm = await r.get(rpm_key)
current_daily = await r.get(daily_req_key)
current_tokens = await r.get(daily_token_key)
current_cost = await r.get(total_cost_key)
# 2026-03-29 ogt: 加入成本資訊
cost_info = {}
if provider in COST_LIMITS:
cost_limit = COST_LIMITS[provider]
current_cost_float = float(current_cost) if current_cost else 0.0
cost_info = {
"total_cost_usd": {
"current": round(current_cost_float, 4),
"limit": cost_limit["total_cost_usd"],
"remaining": round(cost_limit["total_cost_usd"] - current_cost_float, 4),
"alert_threshold": cost_limit["alert_threshold_usd"],
},
"cost_exceeded": current_cost_float >= cost_limit["total_cost_usd"],
}
return {
"provider": provider,
@@ -233,8 +429,23 @@ class AIRateLimiter:
"current": int(current_tokens) if current_tokens else 0,
"limit": limits["daily_tokens"],
},
**cost_info,
}
async def reset_cost(self, provider: str) -> None:
    """
    Reset a provider's cumulative cost (to be used once the commander authorizes it).

    Removes the cumulative total together with both notification flags (the
    over-limit alert flag and the near-limit warning flag), so that alerts
    and warnings fire again if the provider reaches its thresholds after the
    reset.

    Args:
        provider: AI provider name.
    """
    r = await self._get_redis()

    total_cost_key = TOTAL_COST_KEY.format(provider=provider)
    alert_sent_key = COST_ALERT_SENT_KEY.format(provider=provider)
    # Same key that _send_cost_warning writes; leaving it behind would
    # suppress the first warning after a reset for up to an hour.
    warning_sent_key = f"{REDIS_KEY_PREFIX}cost_warning_sent:{provider}"

    await r.delete(total_cost_key, alert_sent_key, warning_sent_key)
    logger.info("ai_cost_reset", provider=provider)
async def reset_limits(self, provider: str) -> None:
"""
重置限制 (緊急用)

View File

@@ -908,6 +908,11 @@ class OpenClawService:
)
# Langfuse: 記錄成功評分
trace.score(name="provider_success", value=1.0, comment=f"Success via {provider}")
# 2026-03-29 ogt: 記錄累積成本 (Gemini/Claude)
if cost_usd > 0:
await rate_limiter.record_cost(provider, cost_usd)
return response, provider, True, total_tokens, cost_usd
logger.warning("ai_provider_failed_fallback", provider=provider, latency_ms=latency_ms)