feat(knowledge): pgvector RAG — 語意搜尋 + 背景 Embedding 管線
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled

- repository: save_embedding (raw SQL pgvector cast) + semantic_search (cosine <=>)
- service: create_entry 背景 embed + semantic_search + embed_all_entries 批次補 embed
- router: GET /semantic-search (q/limit/threshold) + POST /embed-all 管理端點

向量模型: nomic-embed-text (Ollama 192.168.0.188, 768 dims)
索引: ivfflat cosine (knowledge_entries.embedding vector(768))

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-04 11:17:24 +08:00
parent 200c382ca4
commit 8960bba7fe
3 changed files with 173 additions and 2 deletions

View File

@@ -67,6 +67,36 @@ async def search_entries(
return await service.search(q, limit)
@router.get("/semantic-search")
async def semantic_search(
    q: str = Query(..., min_length=1, description="語意搜尋查詢"),
    limit: int = Query(10, ge=1, le=50),
    threshold: float = Query(0.5, ge=0.0, le=1.0, description="相似度門檻 (0-1)"),
) -> list[dict]:
    """
    Semantic search over knowledge entries (pgvector cosine similarity).

    Delegates to the knowledge service and returns each matching entry as a
    plain dict (``entry.model_dump()``) with an extra ``"score"`` key holding
    the similarity rounded to 4 decimal places.

    Args:
        q: Free-text query (at least one character).
        limit: Maximum number of results (1-50).
        threshold: Minimum similarity score (0-1) a result must reach.
    """
    knowledge_service = get_knowledge_service()
    matches = await knowledge_service.semantic_search(
        q, limit=limit, threshold=threshold
    )

    payload: list[dict] = []
    for entry, score in matches:
        item = entry.model_dump()
        # "score" is attached last, overriding any same-named model field.
        item["score"] = round(score, 4)
        payload.append(item)
    return payload
@router.post("/embed-all", status_code=200)
async def embed_all_entries() -> dict:
    """
    Admin endpoint: batch-generate embeddings for entries that have none.

    Returns:
        The summary produced by the knowledge service, of the form
        ``{"total": N, "success": N, "failed": N}``.
    """
    knowledge_service = get_knowledge_service()
    summary = await knowledge_service.embed_all_entries()
    return summary
@router.get("/categories")
async def get_categories() -> list[dict]:
"""取得分類樹 (含各類數量)"""

View File

@@ -184,6 +184,64 @@ class KnowledgeDBRepository:
)
return result.rowcount > 0
async def save_embedding(self, entry_id: str, embedding: list[float]) -> bool:
    """
    Store the embedding vector for a single knowledge entry.

    The vector is written with raw SQL: it is sent as its text form
    (``str(embedding)``, e.g. ``"[0.1, 0.2]"``) and cast to the pgvector
    ``vector`` type on the database side.

    Args:
        entry_id: Primary key of the row in ``knowledge_entries``.
        embedding: Embedding values (the column is expected to hold
            768 dimensions).

    Returns:
        True if at least one row was updated, False otherwise.
    """
    from sqlalchemy import text as sa_text

    # Use CAST(:emb AS vector) rather than ":emb::vector": SQLAlchemy's text()
    # bind-parameter parsing does not reliably handle a named parameter that is
    # immediately followed by PostgreSQL's "::" cast operator.
    stmt = sa_text(
        "UPDATE knowledge_entries "
        "SET embedding = CAST(:emb AS vector) "
        "WHERE id = :id"
    )
    result = await self.db.execute(stmt, {"emb": str(embedding), "id": entry_id})
    return result.rowcount > 0
async def semantic_search(
    self,
    query_embedding: list[float],
    limit: int = 10,
    threshold: float = 0.5,
) -> list[tuple[KnowledgeEntry, float]]:
    """
    Semantic search using pgvector cosine distance.

    Similarity is computed as ``1 - (embedding <=> query)``. Rows with status
    ``'ARCHIVED'`` or without an embedding are excluded.

    Args:
        query_embedding: Embedding of the search query.
        limit: Maximum number of rows to return.
        threshold: Minimum similarity a row must reach to be included.

    Returns:
        List of ``(entry, similarity_score)`` tuples, ordered by ascending
        cosine distance (i.e. descending similarity).
    """
    from sqlalchemy import text as sa_text

    # CAST(:emb AS vector) is used instead of ":emb::vector": SQLAlchemy's
    # text() bind-parameter parsing does not reliably handle a named parameter
    # immediately followed by PostgreSQL's "::" cast operator.
    sql = sa_text("""
        SELECT id, 1 - (embedding <=> CAST(:emb AS vector)) AS score
        FROM knowledge_entries
        WHERE status != 'ARCHIVED'
          AND embedding IS NOT NULL
          AND 1 - (embedding <=> CAST(:emb AS vector)) >= :threshold
        ORDER BY embedding <=> CAST(:emb AS vector)
        LIMIT :limit
    """)
    hits = await self.db.execute(
        sql,
        {"emb": str(query_embedding), "threshold": threshold, "limit": limit},
    )
    rows = hits.fetchall()
    if not rows:
        return []

    # Fetch the full ORM records for all hits in one query.
    ids = [r[0] for r in rows]
    scores = {r[0]: float(r[1]) for r in rows}
    result = await self.db.execute(
        select(KnowledgeEntryRecord).where(KnowledgeEntryRecord.id.in_(ids))
    )
    records = {r.id: r for r in result.scalars().all()}

    # Iterate `ids` (not `records`) so the ranking from the first query is
    # preserved; ids that are missing from the second query are skipped.
    return [
        (self._to_model(records[entry_id]), scores[entry_id])
        for entry_id in ids
        if entry_id in records
    ]
def _to_model(self, record: KnowledgeEntryRecord) -> KnowledgeEntry:
"""ORM Record → Pydantic Model"""
return KnowledgeEntry(

View File

@@ -12,6 +12,8 @@ Knowledge Base Phase 1: CRUD + 狀態流轉 + 搜尋
- Router 層禁止直接存取 DB
"""
import asyncio
import structlog
from src.db.base import get_db_context
@@ -26,6 +28,7 @@ from src.models.knowledge import (
)
from src.repositories.interfaces import IKnowledgeRepository
from src.repositories.knowledge_repository import KnowledgeDBRepository
from src.services.embedding_service import OllamaEmbeddingService
logger = structlog.get_logger(__name__)
@@ -48,7 +51,7 @@ class KnowledgeService:
"""Knowledge Base 業務邏輯"""
async def create_entry(self, data: KnowledgeEntryCreate) -> KnowledgeEntry:
"""建立知識條目"""
"""建立知識條目,建立後背景自動產生 embedding"""
async with get_db_context() as db:
repo: IKnowledgeRepository = KnowledgeDBRepository(db)
entry = await repo.create(data)
@@ -58,7 +61,25 @@ class KnowledgeService:
entry_type=entry.entry_type,
source=entry.source,
)
return entry
# 背景產生 embedding (不阻塞回應)
asyncio.create_task(self._embed_entry(entry.id, data.title, data.content))
return entry
async def _embed_entry(self, entry_id: str, title: str, content: str) -> None:
    """
    Background task: generate and store the embedding for one entry.

    The text sent to the embedding model is the title plus the first 2000
    characters of the content, prefixed with ``search_document:``.
    This is best-effort: every failure is logged and swallowed so the task
    never raises.

    Args:
        entry_id: ID of the entry whose embedding is stored.
        title: Entry title.
        content: Entry body; only the first 2000 characters are embedded.
    """
    try:
        svc = OllamaEmbeddingService(model="nomic-embed-text", timeout=15.0)
        text = f"search_document: {title}\n\n{content[:2000]}"
        embedding = await svc.embed_text(text)
        if not embedding:
            # Log instead of returning silently, matching how the query path
            # reports an empty embedding.
            logger.warning(
                "knowledge_embedding_failed",
                entry_id=entry_id,
                error="empty embedding",
            )
            return

        async with get_db_context() as db:
            repo = KnowledgeDBRepository(db)
            saved = await repo.save_embedding(entry_id, embedding)

        # save_embedding returns False when no row was updated; only report
        # success when the write actually touched a row.
        if saved:
            logger.info("knowledge_embedding_saved", entry_id=entry_id)
        else:
            logger.warning(
                "knowledge_embedding_failed",
                entry_id=entry_id,
                error="no row updated",
            )
    except Exception as e:
        # Deliberate catch-all: a background task must not propagate errors.
        logger.warning("knowledge_embedding_failed", entry_id=entry_id, error=str(e))
async def get_entry(self, entry_id: str) -> KnowledgeEntry | None:
"""取得知識條目 (view_count +1)"""
@@ -140,3 +161,65 @@ class KnowledgeService:
async with get_db_context() as db:
repo: IKnowledgeRepository = KnowledgeDBRepository(db)
return await repo.search(query, limit)
async def semantic_search(
    self,
    query: str,
    limit: int = 10,
    threshold: float = 0.5,
) -> list[tuple[KnowledgeEntry, float]]:
    """
    Semantic search (pgvector cosine similarity).

    Embeds the query (prefixed with ``search_query:``) and hands the vector
    to the repository. If no embedding comes back, a warning is logged and an
    empty list is returned.

    Returns:
        List of ``(entry, score)`` tuples ordered by descending similarity.
    """
    embedder = OllamaEmbeddingService(model="nomic-embed-text", timeout=15.0)
    query_vector = await embedder.embed_text(f"search_query: {query}")

    if query_vector:
        async with get_db_context() as db:
            return await KnowledgeDBRepository(db).semantic_search(
                query_vector, limit=limit, threshold=threshold
            )

    logger.warning("semantic_search_embedding_failed", query=query)
    return []
async def embed_all_entries(self) -> dict[str, int]:
    """
    Batch-generate embeddings for every entry that has none (admin use).

    Selects all non-archived rows whose ``embedding`` is NULL, embeds each one
    sequentially (title plus the first 2000 characters of content, prefixed
    with ``search_document:``) and stores the result. One failing entry does
    not stop the batch.

    Returns:
        ``{"total": N, "success": N, "failed": N}`` where ``success`` counts
        only entries whose embedding was actually written to a row.
    """
    from sqlalchemy import text as sa_text

    svc = OllamaEmbeddingService(model="nomic-embed-text", timeout=15.0)
    success = failed = 0

    async with get_db_context() as db:
        result = await db.execute(
            sa_text(
                "SELECT id, title, content FROM knowledge_entries "
                "WHERE embedding IS NULL AND status != 'ARCHIVED'"
            )
        )
        rows = result.fetchall()

    for entry_id, title, content in rows:
        try:
            text = f"search_document: {title}\n\n{content[:2000]}"
            embedding = await svc.embed_text(text)
            if not embedding:
                failed += 1
                continue
            async with get_db_context() as db:
                repo = KnowledgeDBRepository(db)
                saved = await repo.save_embedding(entry_id, embedding)
        except Exception as e:
            # Best-effort batch: record the failure and move on.
            logger.warning("embed_all_failed", entry_id=entry_id, error=str(e))
            failed += 1
            continue

        # save_embedding returns False when no row was updated; count that as
        # a failure rather than a success.
        if saved:
            success += 1
        else:
            logger.warning(
                "embed_all_failed", entry_id=entry_id, error="no row updated"
            )
            failed += 1

    logger.info("embed_all_complete", total=len(rows), success=success, failed=failed)
    return {"total": len(rows), "success": success, "failed": failed}