Files
awoooi/apps/api/src/services/nvidia_provider.py
OG T ae21ba2cc6 feat(ai): Phase 20 P3 優化 - Circuit Breaker + 指數退避 + Prometheus
P3-1: Circuit Breaker 狀態機 (CLOSED/OPEN/HALF_OPEN)
- 連續 3 次失敗觸發斷路
- 60 秒後自動嘗試恢復
- 防止連鎖故障

P3-2: 指數退避重試
- 基礎延遲 1s,最大 30s
- 含 10% jitter 避免雷鳴

P3-3: Prometheus Metrics
- nvidia_tool_call_requests_total (status, tool_name)
- nvidia_tool_call_latency_seconds (histogram)
- nvidia_circuit_breaker_state_changes_total

測試: 25 → 34 PASSED

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-29 01:49:08 +08:00

710 lines
23 KiB
Python

"""
NVIDIA Nemotron Provider - ADR-036
==================================
2026-03-29 ogt: Nemotron Tool Calling 整合 (83.3% 精準度)
專門處理 Tool Calling 任務,提供高精準度的 K8s 操作決策。
設計原則:
1. OpenAI 相容格式 - 與 Nemotron API 對接
2. Pydantic 強制驗證 - 所有回應必須通過 Schema 驗證
3. Fallback 機制 - 失敗時降級到 Gemini/Claude
4. HITL 高風險保護 - DELETE 等操作需人工審核
版本: v1.0
建立: 2026-03-29 (台北時區)
建立者: Claude Code
"""
from __future__ import annotations
import asyncio
import json
import random
import time
from enum import Enum
from typing import Any, Protocol, runtime_checkable # 2026-03-29 ogt: P2-1 Protocol
import httpx
import structlog
from prometheus_client import Counter, Histogram # 2026-03-29 ogt: P3-3 Prometheus
from src.core.config import get_settings
from src.core.telemetry import get_tracer # 2026-03-29 ogt: P1-2 OTEL 追蹤
from src.models.nvidia import (
NvidiaProviderResult,
NvidiaResponse,
NvidiaUsage,
ToolCallValidationResult,
ToolDefinition,
)
from src.services.langfuse_client import ( # 2026-03-29 ogt: P1-1 Langfuse 整合
LangfuseTraceContext,
)
# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
# Settings object, obtained once at import time via get_settings().
settings = get_settings()
# OTEL tracer (P1-2 fix); tool_call() opens its span from this tracer.
_tracer = get_tracer("nvidia_provider")
# =============================================================================
# Protocol 定義 (P2-1 修復)
# =============================================================================
@runtime_checkable
class INvidiaProvider(Protocol):
    """
    Structural interface of an NVIDIA tool-calling provider (P2-1 fix).

    2026-03-29 ogt: declares the NvidiaProvider interface so that callers can
    depend on it for dependency injection and swap in a test double.

    Because the protocol is ``runtime_checkable``, ``isinstance`` checks only
    verify that the four members exist, not their signatures.

    Usage:
    ```python
    async def process_tool_call(provider: INvidiaProvider):
        result = await provider.tool_call(messages, tools)
    ```
    """

    async def tool_call(
        self,
        messages: list[dict[str, Any]],
        tools: list[ToolDefinition | dict[str, Any]],
        model: str = ...,
        temperature: float = ...,
        max_tokens: int = ...,
    ) -> NvidiaProviderResult:
        """Run a tool-calling request and return its result."""
        ...

    def is_high_risk_tool(self, tool_name: str) -> bool:
        """Tell whether ``tool_name`` is a high-risk tool."""
        ...

    def get_high_risk_tools(
        self, tool_calls: list[ToolCallValidationResult]
    ) -> list[ToolCallValidationResult]:
        """Return only the high-risk entries of ``tool_calls``."""
        ...

    async def close(self) -> None:
        """Release any resources held by the provider."""
        ...
# =============================================================================
# 常量定義
# =============================================================================
# NVIDIA NIM API endpoint (chat completions)
NVIDIA_API_URL = "https://integrate.api.nvidia.com/v1/chat/completions"
# Default model (2026-03-29 ogt: corrected to an available "mini" model)
NVIDIA_DEFAULT_MODEL = "nvidia/nemotron-mini-4b-instruct"
# Request timeout in seconds - Nemotron latency is 11-45s
NVIDIA_TIMEOUT = 60.0
# Number of retries; tool_call() makes MAX_RETRIES + 1 attempts in total
MAX_RETRIES = 2
# =============================================================================
# P3-1: Circuit Breaker configuration (2026-03-29 ogt)
# =============================================================================
# Circuit Breaker thresholds
CIRCUIT_BREAKER_FAILURE_THRESHOLD = 3  # consecutive failures that trip the breaker
CIRCUIT_BREAKER_RECOVERY_TIMEOUT = 60  # seconds to wait after tripping before attempting recovery
# NOTE(review): not referenced anywhere in the visible source - confirm whether
# the half-open request limit is actually enforced.
CIRCUIT_BREAKER_HALF_OPEN_REQUESTS = 1  # trial requests allowed in the half-open state
# P3-2: exponential backoff configuration
RETRY_BASE_DELAY = 1.0  # base delay (seconds)
RETRY_MAX_DELAY = 30.0  # maximum delay (seconds)
RETRY_EXPONENTIAL_BASE = 2  # exponent base
# =============================================================================
# P3-3: Prometheus Metrics (2026-03-29 ogt)
# =============================================================================
# Counter of tool-calling requests, labelled by outcome status and tool name.
# Failure paths in this module use an empty tool_name label.
NVIDIA_REQUESTS_TOTAL = Counter(
    "nvidia_tool_call_requests_total",
    "Total NVIDIA Tool Calling requests",
    ["status", "tool_name"],
)
# Histogram of tool-calling latency in seconds; explicit buckets from 1 to 60.
NVIDIA_LATENCY_HISTOGRAM = Histogram(
    "nvidia_tool_call_latency_seconds",
    "NVIDIA Tool Calling latency in seconds",
    buckets=[1, 5, 10, 15, 20, 30, 45, 60],
)
# Counter of circuit breaker state transitions, labelled by the state left and
# the state entered.
NVIDIA_CIRCUIT_BREAKER_STATE = Counter(
    "nvidia_circuit_breaker_state_changes_total",
    "Circuit breaker state changes",
    ["from_state", "to_state"],
)
# =============================================================================
# P3-1: Circuit Breaker 狀態機 (2026-03-29 ogt)
# =============================================================================
class CircuitState(Enum):
    """States of the circuit breaker state machine."""

    # Normal operation: requests pass through.
    CLOSED = "closed"
    # Tripped: requests are rejected.
    OPEN = "open"
    # Probing whether the service has recovered.
    HALF_OPEN = "half_open"
class CircuitBreaker:
    """
    Circuit breaker (P3-1 optimization).

    Prevents cascading failures by rejecting requests once the NVIDIA API has
    failed repeatedly.

    State transitions:
        CLOSED    -> (failure count >= failure_threshold) -> OPEN
        OPEN      -> (recovery_timeout seconds elapsed)   -> HALF_OPEN
        HALF_OPEN -> (success)                            -> CLOSED
        HALF_OPEN -> (failure)                            -> OPEN

    The failure count is reset only by ``record_success()``.

    Elapsed time is measured with ``time.monotonic()`` rather than the wall
    clock, so system clock adjustments cannot shorten or extend the time the
    breaker stays OPEN.
    """

    def __init__(
        self,
        failure_threshold: int = CIRCUIT_BREAKER_FAILURE_THRESHOLD,
        recovery_timeout: float = CIRCUIT_BREAKER_RECOVERY_TIMEOUT,
    ) -> None:
        """
        Create a breaker in the CLOSED state.

        Args:
            failure_threshold: Failure count at which the breaker trips.
            recovery_timeout: Seconds to stay OPEN before moving to HALF_OPEN.
        """
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        # Monotonic-clock timestamp of the most recent failure; only
        # meaningful when compared against time.monotonic().
        self._last_failure_time: float = 0
        self._failure_threshold = failure_threshold
        self._recovery_timeout = recovery_timeout

    @property
    def state(self) -> CircuitState:
        """
        Return the current state.

        Reading this property has a side effect: an OPEN breaker whose
        recovery timeout has elapsed is moved to HALF_OPEN first.
        """
        if self._state == CircuitState.OPEN:
            elapsed = time.monotonic() - self._last_failure_time
            if elapsed >= self._recovery_timeout:
                self._transition_to(CircuitState.HALF_OPEN)
        return self._state

    def _transition_to(self, new_state: CircuitState) -> None:
        """Switch to ``new_state``; real changes are counted in Prometheus and logged."""
        old_state = self._state
        if old_state != new_state:
            NVIDIA_CIRCUIT_BREAKER_STATE.labels(
                from_state=old_state.value, to_state=new_state.value
            ).inc()
            logger.info(
                "circuit_breaker_state_change",
                from_state=old_state.value,
                to_state=new_state.value,
            )
        self._state = new_state

    def can_execute(self) -> bool:
        """
        Tell whether a request may be sent right now.

        Returns:
            True in CLOSED and HALF_OPEN (trial requests are allowed),
            False in OPEN.
        """
        # Reading .state performs the OPEN -> HALF_OPEN timeout check.
        return self.state is not CircuitState.OPEN

    def record_success(self) -> None:
        """Record a successful request: close a HALF_OPEN breaker and reset the failure count."""
        if self._state == CircuitState.HALF_OPEN:
            self._transition_to(CircuitState.CLOSED)
        self._failure_count = 0

    def record_failure(self) -> None:
        """Record a failed request and trip the breaker when warranted."""
        self._failure_count += 1
        self._last_failure_time = time.monotonic()
        if self._state == CircuitState.HALF_OPEN:
            # The trial request failed: trip again.
            self._transition_to(CircuitState.OPEN)
        elif self._failure_count >= self._failure_threshold:
            # Failure count reached the threshold: trip.
            self._transition_to(CircuitState.OPEN)
# 高風險 Tool 清單 (需要 HITL 審核)
HIGH_RISK_TOOLS: set[str] = {
"delete_pod",
"delete_deployment",
"delete_namespace",
"delete_service",
"delete_configmap",
"delete_secret",
"scale_to_zero",
"drain_node",
"cordon_node",
"delete_pvc",
"delete_pv",
}
# =============================================================================
# NvidiaProvider 類別
# =============================================================================
class NvidiaProvider:
    """
    NVIDIA Nemotron provider.

    Dedicated to tool-calling tasks; described by its authors as giving
    83.3% accuracy on K8s operation decisions.

    Usage:
    ```python
    provider = NvidiaProvider()
    result = await provider.tool_call(
        messages=[{"role": "user", "content": "restart the awoooi-api pod"}],
        tools=[restart_tool, scale_tool],
    )
    if result.success:
        for tc in result.tool_calls:
            if tc.valid:
                execute_tool(tc.tool_name, tc.arguments)
    ```
    """

    def __init__(self, api_key: str | None = None):
        """
        Initialise the provider.

        Args:
            api_key: NVIDIA API key; when falsy, settings.NVIDIA_API_KEY is used.

        2026-03-29 ogt: P3-1 added the circuit breaker.
        """
        self._api_key = api_key or settings.NVIDIA_API_KEY
        # Created lazily by _get_client().
        self._client: httpx.AsyncClient | None = None
        self._circuit_breaker = CircuitBreaker()  # P3-1: circuit breaker

    async def _get_client(self) -> httpx.AsyncClient:
        """Return the shared HTTP client, creating it if missing or closed."""
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                timeout=httpx.Timeout(NVIDIA_TIMEOUT, connect=10.0),
                limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
            )
        return self._client

    async def close(self) -> None:
        """Close the HTTP client if one is open."""
        if self._client and not self._client.is_closed:
            await self._client.aclose()
            self._client = None

    async def tool_call(
        self,
        messages: list[dict[str, Any]],
        tools: list[ToolDefinition | dict[str, Any]],
        model: str = NVIDIA_DEFAULT_MODEL,
        temperature: float = 0.0,
        max_tokens: int = 1024,
    ) -> NvidiaProviderResult:
        """
        Run a tool-calling request.

        Request and response-parse failures are reported through the returned
        result (``success=False``, ``fallback_triggered=True``), not raised.

        Args:
            messages: Conversation messages.
            tools: Available tool definitions (models or plain dicts).
            model: Model name.
            temperature: Sampling temperature (0.0 is the most deterministic).
            max_tokens: Maximum number of output tokens.

        Returns:
            NvidiaProviderResult carrying the validated tool calls.

        2026-03-29 ogt: P1-1/P1-2 fix - added OTEL + Langfuse tracing
        2026-03-29 ogt: P3-1/P3-2/P3-3 - circuit breaker + exponential backoff + Prometheus
        """
        start_time = time.perf_counter()
        # P1-2: one OTEL span wraps the whole tool-calling flow.
        with _tracer.start_as_current_span("nvidia_tool_call") as span:
            span.set_attribute("ai.provider", "nvidia")
            span.set_attribute("ai.model", model)
            span.set_attribute("ai.tool_count", len(tools))

            # P3-1: circuit breaker check
            if not self._circuit_breaker.can_execute():
                span.set_attribute("ai.error", "circuit_breaker_open")
                NVIDIA_REQUESTS_TOTAL.labels(status="circuit_open", tool_name="").inc()
                logger.warning(
                    "nvidia_circuit_breaker_open",
                    state=self._circuit_breaker.state.value,
                )
                return NvidiaProviderResult(
                    success=False,
                    error="Circuit Breaker OPEN - NVIDIA API 暫時不可用",
                    fallback_triggered=True,
                )

            # API key check
            if not self._api_key:
                span.set_attribute("ai.error", "api_key_not_set")
                NVIDIA_REQUESTS_TOTAL.labels(status="error", tool_name="").inc()
                return NvidiaProviderResult(
                    success=False,
                    error="NVIDIA_API_KEY 未設定",
                    fallback_triggered=True,
                )

            # Convert tools to plain dicts and collect their names.
            tools_data = []
            tool_names = []
            for tool in tools:
                if isinstance(tool, ToolDefinition):
                    tools_data.append(tool.model_dump())
                    tool_names.append(tool.function.get("name", "unknown"))
                else:
                    tools_data.append(tool)
                    tool_names.append(tool.get("function", {}).get("name", "unknown"))
            span.set_attribute("ai.tool_names", ",".join(tool_names))

            # Build the request body.
            request_body = {
                "model": model,
                "messages": messages,
                "tools": tools_data,
                "tool_choice": "auto",
                "temperature": temperature,
                "max_tokens": max_tokens,
            }

            # P1-1: Langfuse tracing
            with LangfuseTraceContext(
                name="nvidia_tool_call",
                metadata={"model": model, "tool_count": len(tools)},
            ) as langfuse_ctx:
                # Send the request (with P3-2 exponential backoff retries).
                response_data: dict | None = None
                last_error: str | None = None
                for attempt in range(MAX_RETRIES + 1):
                    try:
                        response_data = await self._send_request(request_body)
                        self._circuit_breaker.record_success()  # P3-1
                        break
                    except Exception as e:
                        last_error = str(e)
                        span.set_attribute(f"ai.retry.{attempt}", last_error)
                        logger.warning(
                            "nvidia_request_retry",
                            attempt=attempt + 1,
                            max_retries=MAX_RETRIES,
                            error=last_error,
                        )
                        if attempt == MAX_RETRIES:
                            # Only the exhausted call is reported, so one
                            # tool_call() adds at most one breaker failure.
                            self._circuit_breaker.record_failure()  # P3-1
                            break
                        # P3-2: exponential backoff (with jitter)
                        delay = min(
                            RETRY_BASE_DELAY * (RETRY_EXPONENTIAL_BASE ** attempt),
                            RETRY_MAX_DELAY,
                        )
                        jitter = random.uniform(0, delay * 0.1)  # 10% jitter
                        await asyncio.sleep(delay + jitter)

                # Measured from method entry, so it includes backoff sleeps.
                latency_ms = (time.perf_counter() - start_time) * 1000
                latency_seconds = latency_ms / 1000
                span.set_attribute("ai.latency_ms", round(latency_ms, 2))
                NVIDIA_LATENCY_HISTOGRAM.observe(latency_seconds)  # P3-3

                # Request failed on every attempt.
                if response_data is None:
                    span.set_attribute("ai.success", False)
                    span.set_attribute("ai.error", last_error or "unknown")
                    NVIDIA_REQUESTS_TOTAL.labels(status="error", tool_name="").inc()
                    logger.error(
                        "nvidia_request_failed",
                        error=last_error,
                        latency_ms=round(latency_ms, 2),
                    )
                    return NvidiaProviderResult(
                        success=False,
                        error=last_error,
                        latency_ms=latency_ms,
                        fallback_triggered=True,
                    )

                # Parse the response.
                try:
                    nvidia_response = NvidiaResponse.model_validate(response_data)
                except Exception as e:
                    span.set_attribute("ai.success", False)
                    span.set_attribute("ai.error", f"parse_failed: {e}")
                    # Count this failure like the other failure paths so that
                    # unparseable responses show up in the error metric.
                    NVIDIA_REQUESTS_TOTAL.labels(status="error", tool_name="").inc()
                    logger.error(
                        "nvidia_response_parse_failed",
                        error=str(e),
                        raw_response=str(response_data)[:500],
                    )
                    return NvidiaProviderResult(
                        success=False,
                        error=f"回應解析失敗: {e}",
                        latency_ms=latency_ms,
                        fallback_triggered=True,
                    )

                # Validate tool calls.
                tool_calls = self._validate_tool_calls(nvidia_response)
                valid_count = sum(1 for tc in tool_calls if tc.valid)

                # Token statistics (0 when the response carries no usage).
                usage = nvidia_response.usage
                prompt_tokens = usage.prompt_tokens if usage else 0
                completion_tokens = usage.completion_tokens if usage else 0
                total_tokens = usage.total_tokens if usage else 0

                # P1-2: OTEL attributes
                span.set_attribute("ai.success", True)
                span.set_attribute("ai.tool_call_count", len(tool_calls))
                span.set_attribute("ai.valid_count", valid_count)
                span.set_attribute("ai.prompt_tokens", prompt_tokens)
                span.set_attribute("ai.completion_tokens", completion_tokens)
                span.set_attribute("ai.total_tokens", total_tokens)

                # P1-1: Langfuse generation record (valid tool calls only)
                langfuse_ctx.generation(
                    name="nvidia_nemotron",
                    model=model,
                    input={"messages": messages, "tools": tool_names},
                    output={
                        "tool_calls": [
                            {"name": tc.tool_name, "args": tc.arguments}
                            for tc in tool_calls
                            if tc.valid
                        ]
                    },
                    usage={"input": prompt_tokens, "output": completion_tokens},
                    metadata={
                        "latency_ms": round(latency_ms, 2),
                        "valid_count": valid_count,
                    },
                )

                # P3-3: Prometheus success metric, one increment per valid tool call
                for tc in tool_calls:
                    if tc.valid and tc.tool_name:
                        NVIDIA_REQUESTS_TOTAL.labels(
                            status="success", tool_name=tc.tool_name
                        ).inc()

                logger.info(
                    "nvidia_tool_call_completed",
                    success=True,
                    tool_call_count=len(tool_calls),
                    valid_count=valid_count,
                    latency_ms=round(latency_ms, 2),
                    prompt_tokens=prompt_tokens,
                    completion_tokens=completion_tokens,
                )
                return NvidiaProviderResult(
                    success=True,
                    tool_calls=tool_calls,
                    usage=usage,
                    latency_ms=latency_ms,
                    fallback_triggered=False,
                )

    async def _send_request(self, request_body: dict) -> dict:
        """
        POST the request to the NVIDIA API.

        Args:
            request_body: JSON-serialisable request payload.

        Returns:
            The decoded JSON response body.

        Raises:
            Exception: When the status code is not 200; the message carries the
                status code and the first 500 characters of the response text.
                Errors raised by the HTTP client or JSON decoding propagate too.
        """
        client = await self._get_client()
        headers = {
            "Authorization": f"Bearer {self._api_key}",
            "Content-Type": "application/json",
        }
        response = await client.post(
            NVIDIA_API_URL,
            headers=headers,
            json=request_body,
        )
        if response.status_code != 200:
            error_text = response.text[:500]
            raise Exception(
                f"NVIDIA API 錯誤: {response.status_code} - {error_text}"
            )
        return response.json()

    def _validate_tool_calls(
        self, response: NvidiaResponse
    ) -> list[ToolCallValidationResult]:
        """
        Validate the tool calls of the first choice in ``response``.

        Args:
            response: Parsed NVIDIA API response.

        Returns:
            One result per tool call. A call is valid when its arguments string
            decodes as JSON and the result object can be built; otherwise the
            result is invalid and carries an error message. Empty when there
            are no choices or no tool calls.
        """
        results: list[ToolCallValidationResult] = []
        if not response.choices:
            return results
        message = response.choices[0].message
        if not message.tool_calls:
            return results
        for tc in message.tool_calls:
            try:
                # Decode the arguments JSON.
                arguments = json.loads(tc.function.arguments)
                results.append(
                    ToolCallValidationResult(
                        valid=True,
                        tool_name=tc.function.name,
                        arguments=arguments,
                    )
                )
            except json.JSONDecodeError as e:
                results.append(
                    ToolCallValidationResult(
                        valid=False,
                        tool_name=tc.function.name,
                        error=f"Arguments JSON 解析失敗: {e}",
                        raw_response=tc.function.arguments,
                    )
                )
            except Exception as e:
                # Any other failure is recorded without a tool name.
                results.append(
                    ToolCallValidationResult(
                        valid=False,
                        error=f"驗證失敗: {e}",
                    )
                )
        return results

    def is_high_risk_tool(self, tool_name: str) -> bool:
        """
        Tell whether a tool is high-risk.

        Args:
            tool_name: Tool name; compared case-insensitively.

        Returns:
            True when the lower-cased name is in HIGH_RISK_TOOLS, i.e. the
            tool requires HITL review.
        """
        return tool_name.lower() in HIGH_RISK_TOOLS

    def get_high_risk_tools(
        self, tool_calls: list[ToolCallValidationResult]
    ) -> list[ToolCallValidationResult]:
        """
        Filter the high-risk tool calls.

        Args:
            tool_calls: Validation results to inspect.

        Returns:
            The entries that are valid, have a tool name, and name a
            high-risk tool.
        """
        return [
            tc
            for tc in tool_calls
            if tc.valid and tc.tool_name and self.is_high_risk_tool(tc.tool_name)
        ]
# =============================================================================
# 單例與工廠函數
# =============================================================================
# Lazily created module-wide NvidiaProvider instance.
_provider: NvidiaProvider | None = None


def get_nvidia_provider() -> NvidiaProvider:
    """Return the shared NvidiaProvider, creating it on first use."""
    global _provider
    if _provider is not None:
        return _provider
    _provider = NvidiaProvider()
    return _provider
def reset_nvidia_provider() -> None:
    """
    Forget the shared provider instance (intended for tests).

    The discarded instance is not closed here; only the module-level
    reference is cleared.
    """
    global _provider
    _provider = None
# =============================================================================
# 便捷函數
# =============================================================================
async def nvidia_tool_call(
    messages: list[dict[str, Any]],
    tools: list[ToolDefinition | dict[str, Any]],
    **kwargs,
) -> NvidiaProviderResult:
    """
    Convenience wrapper: run tool calling on the shared provider.

    Args:
        messages: Conversation messages.
        tools: Available tool definitions.
        **kwargs: Forwarded unchanged to ``tool_call``
            (model, temperature, max_tokens).

    Returns:
        The NvidiaProviderResult produced by the provider.
    """
    return await get_nvidia_provider().tool_call(messages, tools, **kwargs)
def create_tool_definition(
    name: str,
    description: str,
    parameters: dict[str, Any],
) -> ToolDefinition:
    """
    Build a function-type tool definition.

    Args:
        name: Tool name.
        description: Tool description.
        parameters: JSON Schema describing the tool's parameters.

    Returns:
        A ToolDefinition of type "function" wrapping the three values.
    """
    function_spec = {
        "name": name,
        "description": description,
        "parameters": parameters,
    }
    return ToolDefinition(type="function", function=function_spec)