""" Knowledge Service - 業務邏輯層 =============================== Knowledge Base Phase 1: CRUD + 狀態流轉 + 搜尋 建立時間: 2026-04-02 (台北時區) 建立者: Claude Code (Knowledge Base Phase 1) 遵循 leWOOOgo 積木化原則: - Service 層封裝業務邏輯 - 依賴 IKnowledgeRepository Protocol - Router 層禁止直接存取 DB """ import structlog from src.db.base import get_db_context from src.models.knowledge import ( CategoryCount, EntryStatus, EntryType, KnowledgeEntry, KnowledgeEntryCreate, KnowledgeEntryUpdate, KnowledgeListResponse, ) from src.repositories.interfaces import IKnowledgeRepository from src.repositories.knowledge_repository import KnowledgeDBRepository logger = structlog.get_logger(__name__) # ============================================================================= # Singleton # ============================================================================= _knowledge_service: "KnowledgeService | None" = None def get_knowledge_service() -> "KnowledgeService": """取得 Knowledge Service 實例""" global _knowledge_service if _knowledge_service is None: _knowledge_service = KnowledgeService() return _knowledge_service class KnowledgeService: """Knowledge Base 業務邏輯""" async def create_entry(self, data: KnowledgeEntryCreate) -> KnowledgeEntry: """建立知識條目""" async with get_db_context() as db: repo: IKnowledgeRepository = KnowledgeDBRepository(db) entry = await repo.create(data) logger.info( "knowledge_entry_created", entry_id=entry.id, entry_type=entry.entry_type, source=entry.source, ) return entry async def get_entry(self, entry_id: str) -> KnowledgeEntry | None: """取得知識條目 (view_count +1)""" async with get_db_context() as db: repo: IKnowledgeRepository = KnowledgeDBRepository(db) entry = await repo.get_by_id(entry_id) if entry: await repo.increment_view_count(entry_id) entry.view_count += 1 return entry async def update_entry( self, entry_id: str, data: KnowledgeEntryUpdate ) -> KnowledgeEntry | None: """更新知識條目""" update_data = data.model_dump(exclude_none=True) async with get_db_context() as db: repo: IKnowledgeRepository = KnowledgeDBRepository(db) if not update_data: return await repo.get_by_id(entry_id) return await repo.update(entry_id, update_data) async def approve_entry(self, entry_id: str) -> KnowledgeEntry | None: """審核通過 (draft/review → approved)""" async with get_db_context() as db: repo: IKnowledgeRepository = KnowledgeDBRepository(db) entry = await repo.get_by_id(entry_id) if not entry: return None if entry.status == EntryStatus.APPROVED: return entry return await repo.update(entry_id, {"status": EntryStatus.APPROVED}) async def archive_entry(self, entry_id: str) -> bool: """封存 (軟刪除)""" async with get_db_context() as db: repo: IKnowledgeRepository = KnowledgeDBRepository(db) return await repo.delete(entry_id) async def list_entries( self, category: str | None = None, entry_type: EntryType | None = None, status: EntryStatus | None = None, tags: list[str] | None = None, q: str | None = None, limit: int = 20, offset: int = 0, ) -> KnowledgeListResponse: """列出知識條目 + 分類統計""" async with get_db_context() as db: repo: IKnowledgeRepository = KnowledgeDBRepository(db) items, total = await repo.list_entries( category=category, entry_type=entry_type, status=status, tags=tags, q=q, limit=limit, offset=offset, ) categories_raw = await repo.get_categories() categories = [ CategoryCount(category=cat, count=cnt) for cat, cnt in categories_raw ] return KnowledgeListResponse( items=items, total=total, categories=categories ) async def get_categories(self) -> list[CategoryCount]: """取得分類統計(直接呼叫 repo,不走 list_entries)""" async with get_db_context() as db: repo: IKnowledgeRepository = KnowledgeDBRepository(db) categories_raw = await repo.get_categories() return [CategoryCount(category=cat, count=cnt) for cat, cnt in categories_raw] async def search(self, query: str, limit: int = 20) -> list[KnowledgeEntry]: """關鍵字搜尋""" async with get_db_context() as db: repo: IKnowledgeRepository = KnowledgeDBRepository(db) return await repo.search(query, limit)