Files
awoooi/scripts/batch_vectorize_km.py
Your Name 8c4dc7a5a8
Some checks failed
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 1m5s
CD Pipeline / build-and-deploy (push) Failing after 10m6s
CD Pipeline / post-deploy-checks (push) Has been skipped
chore(rls): 新增 manual script gate 與 canary wave1
2026-05-12 20:23:27 +08:00

92 lines
2.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
batch_vectorize_km.py — ADR-073 Phase 1 Step 9
批次向量化所有 embedding IS NULL 的 KnowledgeEntry。
執行方式(在 K8s Pod 內):
# 透過 AWOOOI API embed-all endpoint
python3 scripts/batch_vectorize_km.py --via-api
# 直接呼叫 Service(需在 Pod 內)
python3 scripts/batch_vectorize_km.py
2026-04-12 Claude Sonnet 4.6 (ADR-073 Phase 1)
"""
import asyncio
import asyncpg
import os
import sys
import httpx
# --- Runtime configuration, evaluated once at import time ---

# Command-line switches: a flag is on when the exact token appears anywhere
# in sys.argv (no argparse, no value parsing).
VIA_API = any(arg == "--via-api" for arg in sys.argv)
DRY_RUN = any(arg == "--dry-run" for arg in sys.argv)

# Connection string for asyncpg. A SQLAlchemy-style "postgresql+asyncpg://"
# prefix is rewritten to plain "postgresql://"; an unset variable yields "".
# NOTE(review): presumably needed because asyncpg.connect() only accepts the
# plain scheme — confirm against the asyncpg documentation.
DATABASE_URL = os.getenv("DATABASE_URL", "").replace(
    "postgresql+asyncpg://", "postgresql://"
)

# Base URL of the HTTP API used in --via-api mode.
API_BASE = os.getenv("API_BASE", "http://localhost:8000")

# Value written to the "app.project_id" database setting before counting.
PROJECT_ID = os.getenv("AWOOOP_PROJECT_ID", "awoooi")
async def check_status():
    """Count knowledge entries and how many of them still have no embedding.

    Opens a dedicated asyncpg connection to ``DATABASE_URL``, sets the
    ``app.project_id`` setting to ``PROJECT_ID`` and then runs two
    ``count(*)`` queries against ``knowledge_entries``. The connection is
    always closed, even when a query fails.

    Returns:
        A ``(total, null_emb)`` tuple: the number of rows visible in
        ``knowledge_entries`` and the number of those whose ``embedding``
        column is NULL.
    """
    count_all_sql = "SELECT count(*) FROM knowledge_entries"
    count_missing_sql = "SELECT count(*) FROM knowledge_entries WHERE embedding IS NULL"

    db = await asyncpg.connect(DATABASE_URL)
    try:
        # Third argument FALSE makes the setting session-scoped rather than
        # transaction-local, so it applies to both counts below.
        # NOTE(review): presumably consumed by row-level security policies —
        # confirm against the database schema.
        await db.execute("SELECT set_config('app.project_id', $1, FALSE)", PROJECT_ID)
        entry_total = await db.fetchval(count_all_sql)
        missing_total = await db.fetchval(count_missing_sql)
    finally:
        await db.close()
    return entry_total, missing_total
async def via_api():
    """Trigger vectorization through the HTTP embed-all endpoint.

    Sends a body-less POST to ``{API_BASE}/api/v1/knowledge/embed-all`` with a
    300-second client timeout and prints the outcome. A 200 response has its
    JSON body printed; any other status is printed together with the raw
    response text.

    Returns:
        None. A non-200 status is only reported on stdout; nothing is raised
        or returned for it, so callers cannot detect the failure from the
        return value.
    """
    print(f"呼叫 {API_BASE}/api/v1/knowledge/embed-all ...")
    async with httpx.AsyncClient(timeout=300.0) as http:
        response = await http.post(f"{API_BASE}/api/v1/knowledge/embed-all")
        if response.status_code != 200:
            # Failure path: report and stop; the client is still closed by
            # the context manager on the way out.
            print(f"❌ API 錯誤 {response.status_code}: {response.text}")
            return
        payload = response.json()
        print(f"✅ API 回應: {payload}")
async def via_service():
    """Trigger vectorization by calling the knowledge service in-process.

    Prepends ``/app`` to ``sys.path``, imports ``get_knowledge_service`` from
    ``src.services.knowledge_service``, awaits ``embed_all_entries()`` on the
    service it returns, and prints the result.

    Returns:
        None. The result of ``embed_all_entries()`` is only printed.
    """
    # The application root is hard-coded; the import below depends on it
    # being first on the module search path.
    sys.path.insert(0, "/app")
    # Deferred import: only resolvable after the path adjustment above.
    from src.services.knowledge_service import get_knowledge_service

    outcome = await get_knowledge_service().embed_all_entries()
    print(f"✅ 向量化結果: {outcome}")
async def main():
    """Run the batch vectorization according to the command-line flags.

    Flow:
      1. Exit with status 1 when ``DATABASE_URL`` is empty and ``--via-api``
         was not given (service mode cannot run without a database).
      2. Service mode only: print the current counts and return early when
         no entry has a NULL embedding.
      3. ``--dry-run``: report what would happen and return without
         triggering any vectorization, in BOTH service and API mode.
      4. Otherwise run ``via_api()`` or ``via_service()``.
      5. When ``DATABASE_URL`` is set, query and print the counts again.

    Returns:
        None.

    Raises:
        SystemExit: with code 1 when neither a database URL nor API mode is
            available.
    """
    if not DATABASE_URL and not VIA_API:
        print("ERROR: DATABASE_URL 未設定,請加 --via-api 使用 API 模式")
        sys.exit(1)

    if not VIA_API:
        # DATABASE_URL is guaranteed non-empty here by the guard above.
        total, null_emb = await check_status()
        print(f"knowledge_entries: total={total}, embedding IS NULL={null_emb}")
        if null_emb == 0:
            print("✅ 所有條目已向量化,無需執行")
            return
        if DRY_RUN:
            print(f"[DRY RUN] 將向量化 {null_emb} 筆 KM 條目")
            print("使用 --via-api 透過 API 執行,或不加參數直接執行 Service")
            return
    elif DRY_RUN:
        # Fix: --dry-run used to be honoured only in service mode, so
        # "--via-api --dry-run" still issued the real embed-all request.
        # API mode performs no status pre-check, so just name the endpoint
        # that would have been called and stop.
        print(f"[DRY RUN] 將透過 API 呼叫 {API_BASE}/api/v1/knowledge/embed-all (未實際執行)")
        return

    if VIA_API:
        await via_api()
    else:
        await via_service()

    # Post-run verification: re-count so the operator sees what changed.
    if DATABASE_URL:
        total, null_emb = await check_status()
        print(f"\n完成後狀態: total={total}, embedding IS NULL={null_emb}, vectorized={total - null_emb}")
# Script entry point: drive the async main() coroutine to completion only
# when this file is executed directly, not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())