Files
awoooi/apps/api/src/services/embedding_service.py
OG T bf32c4b1f2 feat(api): Phase 13.2 AI Rate Limiter + RAG 基礎設施 (#84)
Rate Limiter (防止 Gemini 用量暴衝):
- ai_rate_limiter.py: RPM/Daily/Token 三層閥值
- openclaw.py: 整合 rate limit 檢查,超限自動降級
- health.py: /health/ai-usage 監控端點

RAG Tool 基礎 (#84 進行中):
- embedding_service.py: Ollama embedding 封裝
- rag_service.py: Redis vector search 服務

閥值設定:
- Gemini: 10 RPM, 500/day, 100K tokens/day
- Claude: 5 RPM, 200/day, 50K tokens/day

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-26 15:52:57 +08:00

232 lines
6.4 KiB
Python

"""
Embedding Service - Ollama BGE-M3 替代方案
==========================================
使用 Ollama qwen2.5:7b-instruct 提供文本向量化功能。
雖非專用 embedding 模型,但支援多語言 (繁中/英文)。
Phase 13.2 #84 - RAG Tool 基礎設施
版本: v1.0
建立日期: 2026-03-26 20:30 (台北時區)
建立者: Claude Code
"""
import asyncio
from typing import Protocol
import httpx
import structlog
from src.core.config import settings
# Module-level structlog logger shared by the embedding service in this file.
logger = structlog.get_logger(__name__)
# =============================================================================
# Interface (DI Protocol)
# =============================================================================
class IEmbeddingService(Protocol):
    """
    Embedding service interface (dependency-injection protocol).

    Any object that provides these async methods and the ``dimension``
    property satisfies this protocol structurally; no inheritance needed.
    """

    @property
    def dimension(self) -> int:
        """Length of the vectors produced by this service."""
        ...

    async def embed_text(self, text: str) -> list[float]:
        """Embed a single text and return its vector."""
        ...

    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Embed multiple texts in one call."""
        ...
# =============================================================================
# Implementation
# =============================================================================
class OllamaEmbeddingService:
    """
    Ollama Embedding Service.

    Vectorizes text through the Ollama HTTP API (``POST /api/embeddings``).
    Defaults to the ``qwen2.5:7b-instruct`` model; ``dimension`` reports a
    3584 fallback until the first non-empty embedding reveals the real size.

    Usage:
        service = OllamaEmbeddingService()
        vector = await service.embed_text("維運手冊")
        await service.close()
    """

    # Reported by ``dimension`` before any embedding has been received
    # (the original author's figure for qwen2.5:7b-instruct).
    _DEFAULT_DIMENSION = 3584

    def __init__(
        self,
        model: str = "qwen2.5:7b-instruct",
        ollama_url: str | None = None,
        timeout: float = 30.0,
    ) -> None:
        """
        Initialize the embedding service.

        Args:
            model: Ollama model name (must support embeddings).
            ollama_url: Ollama API base URL; falls back to ``settings.OLLAMA_URL``.
            timeout: Per-request timeout in seconds.
        """
        self._model = model
        # str() accepts either a plain string or a URL object from settings;
        # stripping the trailing "/" keeps the endpoint join from producing "//".
        self._ollama_url = str(ollama_url or settings.OLLAMA_URL).rstrip("/")
        self._timeout = timeout
        # Set from the first non-empty embedding; None until then.
        self._dimension: int | None = None
        # Created lazily by _get_client() and reused for connection pooling.
        self._client: httpx.AsyncClient | None = None

    @property
    def dimension(self) -> int:
        """
        Vector dimension.

        Returns the length of the first non-empty embedding received by
        ``embed_text``. Before any such embedding, returns the 3584 fallback.
        """
        if self._dimension is None:
            return self._DEFAULT_DIMENSION
        return self._dimension

    async def _get_client(self) -> httpx.AsyncClient:
        """Return the shared HTTP client, (re)creating it if missing or closed."""
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                timeout=httpx.Timeout(self._timeout),
                limits=httpx.Limits(max_connections=10),
            )
        return self._client

    async def embed_text(self, text: str) -> list[float]:
        """
        Convert a single text into a vector.

        Args:
            text: Text to embed.

        Returns:
            list[float]: The embedding returned by Ollama. It may be empty if
            Ollama sends back an empty vector (presumably for an empty prompt).

        Raises:
            EmbeddingError: On timeout, HTTP error status, any other request or
                decoding failure, or a response without an ``embedding`` list.
        """
        client = await self._get_client()
        try:
            response = await client.post(
                f"{self._ollama_url}/api/embeddings",
                json={
                    "model": self._model,
                    "prompt": text,
                },
            )
            response.raise_for_status()
            data = response.json()
        except httpx.TimeoutException as e:
            logger.error("embedding_timeout", model=self._model, text_len=len(text))
            raise EmbeddingError(f"Embedding timeout after {self._timeout}s") from e
        except httpx.HTTPStatusError as e:
            logger.error(
                "embedding_http_error",
                status=e.response.status_code,
                model=self._model,
            )
            raise EmbeddingError(f"Ollama API error: {e.response.status_code}") from e
        except Exception as e:
            logger.error("embedding_error", error=str(e), model=self._model)
            raise EmbeddingError(f"Embedding failed: {e}") from e

        # A 2xx body without an "embedding" list is malformed. Fail loudly
        # rather than hand callers None or a fabricated empty vector.
        embedding = data.get("embedding") if isinstance(data, dict) else None
        if not isinstance(embedding, list):
            logger.error("embedding_invalid_response", model=self._model)
            raise EmbeddingError("Embedding failed: response has no 'embedding' list")

        # Record the dimension only from a non-empty vector.
        if self._dimension is None and embedding:
            self._dimension = len(embedding)
            logger.info(
                "embedding_dimension_detected",
                model=self._model,
                dimension=self._dimension,
            )
        return embedding

    async def embed_batch(
        self,
        texts: list[str],
        concurrency: int = 5,
    ) -> list[list[float]]:
        """
        Vectorize multiple texts concurrently.

        Args:
            texts: Texts to embed.
            concurrency: Maximum number of in-flight requests (limits Ollama load).

        Returns:
            list[list[float]]: One vector per input, in the same order as ``texts``.

        Raises:
            ValueError: If ``concurrency`` is less than 1. A zero-permit
                semaphore would otherwise block forever.
            EmbeddingError: Propagated from a failing ``embed_text`` call.
        """
        if not texts:
            return []
        if concurrency < 1:
            raise ValueError(f"concurrency must be >= 1, got {concurrency}")

        semaphore = asyncio.Semaphore(concurrency)

        async def embed_with_semaphore(text: str) -> list[float]:
            async with semaphore:
                return await self.embed_text(text)

        # gather() returns results in the order the awaitables were passed.
        results = await asyncio.gather(*(embed_with_semaphore(t) for t in texts))
        logger.info(
            "batch_embedding_complete",
            count=len(texts),
            model=self._model,
        )
        return list(results)

    async def close(self) -> None:
        """Close the shared HTTP client; the next request creates a new one."""
        if self._client is not None:
            await self._client.aclose()
            self._client = None
# =============================================================================
# Errors
# =============================================================================
class EmbeddingError(Exception):
    """Error raised when an embedding (vectorization) operation fails."""
# =============================================================================
# Singleton Factory
# =============================================================================
# Lazily-created shared instance returned by get_embedding_service().
_embedding_service: OllamaEmbeddingService | None = None


def get_embedding_service() -> OllamaEmbeddingService:
    """
    Return the shared Embedding Service instance.

    The instance is created with default arguments on the first call and
    reused on every later call.

    Returns:
        OllamaEmbeddingService: The shared instance.
    """
    global _embedding_service
    if _embedding_service is not None:
        return _embedding_service
    _embedding_service = OllamaEmbeddingService()
    return _embedding_service