Files
awoooi/apps/api/src/services/rag_service.py
OG T e26ea526b1 fix(api): lint errors in Rate Limiter + RAG services
- Remove unused imports (settings, uuid)
- Add 'from e' to exception raises (B904)
- Add strict=True to zip() (B905)
- Remove unused variable

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-26 16:03:16 +08:00

467 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
RAG Service - 維運手冊向量搜尋
==============================
Phase 13.2 #84 - Runbook RAG Tool
功能:
- 文檔分段 (Chunking)
- 向量索引 (Redis Stack FT.CREATE)
- 語義搜尋 (KNN Vector Search)
版本: v1.0
建立日期: 2026-03-26 20:45 (台北時區)
建立者: Claude Code
"""
import hashlib
import struct
from pathlib import Path
from typing import Protocol
import redis.asyncio as redis
import structlog
from src.services.embedding_service import IEmbeddingService, get_embedding_service
# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
# =============================================================================
# Configuration
# =============================================================================
# Tunables shared by chunking, indexing and key layout.
RAG_CONFIG = {
    # Target number of characters per chunk.
    "chunk_size": 500,
    # Number of characters shared between consecutive chunks.
    "chunk_overlap": 50,
    # Name of the Redis search index.
    "index_name": "idx:runbooks",
    # Key prefix of the hashes covered by the index.
    "prefix": "runbook:",
    # Lifetime of a stored chunk, in days.
    "ttl_days": 30,
}

# Glob patterns of the runbook sources, relative to the project root.
RUNBOOK_SOURCES = [
    "docs/operations/*.md",
    "docs/troubleshooting/*.md",
    "docs/adr/*.md",
    ".agents/skills/*.md",
]
# =============================================================================
# Interface
# =============================================================================
class IRAGService(Protocol):
    """Structural interface of the RAG service."""

    async def index_documents(self, base_path: Path) -> int:
        """Index the documents found under *base_path*; return how many were indexed."""
        ...

    async def search(self, query: str, top_k: int = 5) -> list[dict]:
        """Run a semantic search for *query*; return the matching passages."""
        ...

    async def get_index_stats(self) -> dict:
        """Return statistics about the index."""
        ...
# =============================================================================
# Implementation
# =============================================================================
class RAGService:
"""
RAG Service 實作
使用 Redis Stack 進行向量索引與搜尋。
支援維運手冊的語義搜尋。
"""
def __init__(
self,
redis_client: redis.Redis | None = None,
embedding_service: IEmbeddingService | None = None,
) -> None:
"""
初始化 RAG Service
Args:
redis_client: Redis 連線 (DI 注入)
embedding_service: Embedding 服務 (DI 注入)
"""
self._redis = redis_client
self._embedding_service = embedding_service
self._index_created = False
async def _get_redis(self) -> redis.Redis:
"""Lazy load Redis client"""
if self._redis is None:
from src.core.redis_client import get_redis
self._redis = get_redis()
return self._redis
async def _get_embedding_service(self) -> IEmbeddingService:
"""Lazy load Embedding service"""
if self._embedding_service is None:
self._embedding_service = get_embedding_service()
return self._embedding_service
# =========================================================================
# Document Processing
# =========================================================================
def _chunk_text(self, text: str, source: str) -> list[dict]:
"""
將文本分段
Args:
text: 原始文本
source: 來源檔案路徑
Returns:
list[dict]: 分段列表,每個包含 content, source, chunk_id
"""
chunk_size = RAG_CONFIG["chunk_size"]
overlap = RAG_CONFIG["chunk_overlap"]
chunks = []
start = 0
chunk_idx = 0
while start < len(text):
end = start + chunk_size
# 嘗試在句號/換行處斷開 (避免截斷句子)
if end < len(text):
# 往後找到最近的句號或換行
for sep in ["\n\n", "", "\n", ". ", ""]:
sep_pos = text.rfind(sep, start + chunk_size // 2, end + 50)
if sep_pos > start:
end = sep_pos + len(sep)
break
chunk_content = text[start:end].strip()
if chunk_content:
chunk_id = self._generate_chunk_id(source, chunk_idx)
chunks.append({
"chunk_id": chunk_id,
"content": chunk_content,
"source": source,
"chunk_index": chunk_idx,
})
chunk_idx += 1
start = end - overlap if end < len(text) else len(text)
return chunks
def _generate_chunk_id(self, source: str, chunk_idx: int) -> str:
"""生成唯一 Chunk ID"""
content = f"{source}:{chunk_idx}"
return hashlib.md5(content.encode()).hexdigest()[:12]
# =========================================================================
# Redis Vector Index
# =========================================================================
async def _ensure_index(self) -> None:
"""
確保向量索引存在
使用 FT.CREATE 建立 HNSW 向量索引
"""
if self._index_created:
return
r = await self._get_redis()
embedding_service = await self._get_embedding_service()
dim = embedding_service.dimension
index_name = RAG_CONFIG["index_name"]
prefix = RAG_CONFIG["prefix"]
try:
# 檢查索引是否存在
await r.execute_command("FT.INFO", index_name)
logger.info("rag_index_exists", index=index_name)
self._index_created = True
return
except redis.ResponseError as e:
if "Unknown index name" not in str(e):
raise
# 建立向量索引
# Schema: content (TEXT), source (TAG), embedding (VECTOR HNSW)
try:
await r.execute_command(
"FT.CREATE", index_name,
"ON", "HASH",
"PREFIX", "1", prefix,
"SCHEMA",
"content", "TEXT", "WEIGHT", "1.0",
"source", "TAG",
"chunk_index", "NUMERIC",
"embedding", "VECTOR", "HNSW", "6",
"TYPE", "FLOAT32",
"DIM", str(dim),
"DISTANCE_METRIC", "COSINE",
)
logger.info(
"rag_index_created",
index=index_name,
dimension=dim,
)
self._index_created = True
except redis.ResponseError as e:
if "Index already exists" in str(e):
self._index_created = True
else:
logger.error("rag_index_create_failed", error=str(e))
raise
async def _store_chunk(
self,
chunk: dict,
embedding: list[float],
) -> None:
"""
儲存分段到 Redis
Args:
chunk: 分段資料
embedding: 向量
"""
r = await self._get_redis()
prefix = RAG_CONFIG["prefix"]
key = f"{prefix}{chunk['chunk_id']}"
# 將 float list 轉換為 bytes (FLOAT32)
embedding_bytes = struct.pack(f"{len(embedding)}f", *embedding)
await r.hset(
key,
mapping={
"content": chunk["content"],
"source": chunk["source"],
"chunk_index": chunk["chunk_index"],
"embedding": embedding_bytes,
},
)
# 設定 TTL
ttl_seconds = RAG_CONFIG["ttl_days"] * 24 * 60 * 60
await r.expire(key, ttl_seconds)
# =========================================================================
# Public API
# =========================================================================
async def index_documents(self, base_path: Path) -> int:
"""
索引維運手冊
Args:
base_path: 專案根目錄
Returns:
int: 索引的分段數量
"""
await self._ensure_index()
embedding_service = await self._get_embedding_service()
total_chunks = 0
all_chunks: list[dict] = []
# 收集所有文檔
for pattern in RUNBOOK_SOURCES:
for file_path in base_path.glob(pattern):
if file_path.is_file():
try:
content = file_path.read_text(encoding="utf-8")
relative_path = str(file_path.relative_to(base_path))
chunks = self._chunk_text(content, relative_path)
all_chunks.extend(chunks)
logger.debug(
"rag_file_chunked",
file=relative_path,
chunks=len(chunks),
)
except Exception as e:
logger.warning(
"rag_file_read_error",
file=str(file_path),
error=str(e),
)
if not all_chunks:
logger.warning("rag_no_documents_found", patterns=RUNBOOK_SOURCES)
return 0
# 批次向量化
logger.info("rag_embedding_start", chunks=len(all_chunks))
texts = [c["content"] for c in all_chunks]
embeddings = await embedding_service.embed_batch(texts, concurrency=3)
# 儲存到 Redis
for chunk, embedding in zip(all_chunks, embeddings, strict=True):
await self._store_chunk(chunk, embedding)
total_chunks += 1
logger.info(
"rag_index_complete",
total_chunks=total_chunks,
sources=len(RUNBOOK_SOURCES),
)
return total_chunks
async def search(
self,
query: str,
top_k: int = 5,
) -> list[dict]:
"""
語義搜尋維運手冊
Args:
query: 自然語言查詢
top_k: 回傳數量 (預設 5)
Returns:
list[dict]: 相關段落列表
- content: 段落內容
- source: 來源檔案
- score: 相似度分數
"""
await self._ensure_index()
r = await self._get_redis()
embedding_service = await self._get_embedding_service()
index_name = RAG_CONFIG["index_name"]
# 向量化查詢
query_embedding = await embedding_service.embed_text(query)
query_bytes = struct.pack(f"{len(query_embedding)}f", *query_embedding)
# KNN 向量搜尋
# *=>[KNN 5 @embedding $vec AS score]
try:
results = await r.execute_command(
"FT.SEARCH", index_name,
f"*=>[KNN {top_k} @embedding $vec AS score]",
"PARAMS", "2", "vec", query_bytes,
"SORTBY", "score",
"RETURN", "3", "content", "source", "score",
"DIALECT", "2",
)
except redis.ResponseError as e:
logger.error("rag_search_error", error=str(e), query=query[:50])
return []
# 解析結果
# Results format: [total, key1, [field1, value1, ...], key2, ...]
if not results or results[0] == 0:
return []
parsed = []
i = 1
while i < len(results):
# results[i] is the Redis key, results[i+1] is the fields
fields = results[i + 1] if i + 1 < len(results) else []
# 將 fields list 轉為 dict
field_dict = {}
for j in range(0, len(fields), 2):
if j + 1 < len(fields):
field_dict[fields[j]] = fields[j + 1]
parsed.append({
"content": field_dict.get("content", ""),
"source": field_dict.get("source", ""),
"score": float(field_dict.get("score", 0)),
})
i += 2
logger.info(
"rag_search_complete",
query=query[:30],
results=len(parsed),
)
return parsed
async def get_index_stats(self) -> dict:
"""
取得索引統計
Returns:
dict: 索引資訊
"""
r = await self._get_redis()
index_name = RAG_CONFIG["index_name"]
try:
info = await r.execute_command("FT.INFO", index_name)
# 將 list 轉為 dict
info_dict = {}
for i in range(0, len(info), 2):
if i + 1 < len(info):
info_dict[info[i]] = info[i + 1]
return {
"index_name": index_name,
"num_docs": info_dict.get("num_docs", 0),
"num_terms": info_dict.get("num_terms", 0),
"indexing": info_dict.get("indexing", 0),
}
except redis.ResponseError:
return {
"index_name": index_name,
"num_docs": 0,
"error": "Index not found",
}
async def clear_index(self) -> bool:
"""
清除索引 (重建用)
Returns:
bool: 是否成功
"""
r = await self._get_redis()
index_name = RAG_CONFIG["index_name"]
try:
await r.execute_command("FT.DROPINDEX", index_name, "DD")
self._index_created = False
logger.info("rag_index_cleared", index=index_name)
return True
except redis.ResponseError:
return False
# =============================================================================
# Singleton Factory
# =============================================================================
# Process-wide instance, created on first request.
_rag_service: RAGService | None = None


def get_rag_service() -> RAGService:
    """
    Return the shared RAG service, creating it on first use.

    Returns:
        RAGService: The process-wide instance.
    """
    global _rag_service
    if _rag_service is not None:
        return _rag_service
    _rag_service = RAGService()
    return _rag_service