diff --git a/apps/api/src/db/base.py b/apps/api/src/db/base.py index 2523e8a9..1f82f668 100644 --- a/apps/api/src/db/base.py +++ b/apps/api/src/db/base.py @@ -5,9 +5,12 @@ CTO-201: Async SQLAlchemy setup Features: - SQLAlchemy 2.0 async engine -- aiosqlite for local dev -- PostgreSQL-ready (asyncpg) +- PostgreSQL (asyncpg) - 188 PostgreSQL - Session dependency injection + +統帥鐵律 2026-03-23: +- 絕對禁止 SQLite +- 所有 Episodic Memory 必須使用 PostgreSQL """ from collections.abc import AsyncGenerator @@ -42,18 +45,30 @@ _session_factory: async_sessionmaker[AsyncSession] | None = None def get_engine() -> AsyncEngine: - """Get or create async engine""" + """ + Get or create async engine + + 統帥鐵律 2026-03-23: + - 使用 DATABASE_URL (PostgreSQL) + - 絕對禁止 SQLite + """ global _engine if _engine is None: - # SQLite 需要特殊處理 - connect_args = {} - if settings.SQLITE_DATABASE_URL.startswith("sqlite"): - connect_args["check_same_thread"] = False + database_url = settings.DATABASE_URL + + # 統帥鐵律: 禁止 SQLite + if "sqlite" in database_url.lower(): + import structlog + logger = structlog.get_logger(__name__) + logger.error("sqlite_forbidden", message="SQLite is FORBIDDEN. Using PostgreSQL default.") + database_url = "postgresql+asyncpg://awoooi:changeme@192.168.0.188:5432/awoooi_prod" _engine = create_async_engine( - settings.SQLITE_DATABASE_URL, + database_url, echo=settings.DEBUG, - connect_args=connect_args, + pool_size=10, + max_overflow=20, + pool_pre_ping=True, ) return _engine diff --git a/apps/api/src/main.py b/apps/api/src/main.py index db3e9e81..bffb51a0 100644 --- a/apps/api/src/main.py +++ b/apps/api/src/main.py @@ -79,9 +79,10 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: kubeconfig=settings.KUBECONFIG_PATH, ) - # CTO-201: Initialize SQLite database + # CTO-201: Initialize PostgreSQL database (統帥鐵律: 禁止 SQLite) await init_db() - logger.info("database_initialized", url=settings.SQLITE_DATABASE_URL) + db_url = settings.DATABASE_URL + logger.info("database_initialized", url=db_url.split("@")[-1] if "@" in db_url else db_url) # Phase 5: Initialize HTTP Clients (ClickHouse, Ollama) # 統帥鐵律: 連線池在啟動時建立,關閉時回收 diff --git a/packages/lewooogo-data/src/lewooogo_data/providers/__init__.py b/packages/lewooogo-data/src/lewooogo_data/providers/__init__.py index b7ba57fc..9ce3c534 100644 --- a/packages/lewooogo-data/src/lewooogo_data/providers/__init__.py +++ b/packages/lewooogo-data/src/lewooogo_data/providers/__init__.py @@ -1,22 +1,28 @@ """ leWOOOgo Data Providers - 具體實作 =================================== - -IMemoryProvider 的具體實作 +Phase 6.4d: Memory Provider 實作 Provider 列表: -- RedisMemory: Working Memory (7 天 TTL) -- PgMemory: Episodic Memory (永久) -- DualMemory: 雙層記憶體 (Working + Episodic) +- PgMemoryProvider: Episodic Memory (PostgreSQL 永久) + +統帥鐵律 2026-03-23: +- 絕對禁止 SQLite +- 所有 Episodic Memory 必須使用 PostgreSQL """ -# TODO: Phase 6.4d 實作後啟用 -# from lewooogo_data.providers.redis_memory import RedisMemory -# from lewooogo_data.providers.pg_memory import PgMemory -# from lewooogo_data.providers.dual_memory import DualMemory +from .pg_memory import ( + PgMemoryProvider, + init_pg_engine, + close_pg_engine, + get_session_factory, + get_database_url, +) __all__: list[str] = [ - # "RedisMemory", - # "PgMemory", - # "DualMemory", + "PgMemoryProvider", + "init_pg_engine", + "close_pg_engine", + "get_session_factory", + "get_database_url", ] diff --git a/packages/lewooogo-data/src/lewooogo_data/providers/pg_memory.py b/packages/lewooogo-data/src/lewooogo_data/providers/pg_memory.py new file mode 100644 index 00000000..a31abd68 --- /dev/null +++ b/packages/lewooogo-data/src/lewooogo_data/providers/pg_memory.py @@ -0,0 +1,269 @@ +""" +PostgreSQL Memory Provider - 永久記憶層 +======================================== +Phase 6.4d: Episodic Memory 實作 + +統帥鐵律 2026-03-23: +- 絕對禁止 SQLite,所有持久化必須使用 PostgreSQL +- 連線字串從環境變數讀取 (DATABASE_URL) +- 預設 fallback 為 188 PostgreSQL + +Features: +- 非同步連線池 (asyncpg) +- Incident 永久保存 +- 自動建表 (metadata.create_all) +""" + +import os +from typing import Any, TypeVar + +import structlog +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine +from sqlalchemy.orm import DeclarativeBase + +from ..interfaces.memory_provider import IMemoryProvider + +logger = structlog.get_logger(__name__) + +T = TypeVar("T") + + +# ============================================================================= +# 188 PostgreSQL 常量 (統帥鐵律: 禁止 SQLite) +# ============================================================================= + +DEFAULT_PG_URL = "postgresql+asyncpg://wooo:wooopwd@192.168.0.188:5432/awoooi" + + +def get_database_url() -> str: + """ + 取得資料庫連線字串 + + 優先順序: + 1. DATABASE_URL 環境變數 + 2. 188 PostgreSQL 預設值 + + 統帥鐵律: 絕對禁止出現 sqlite + """ + url = os.getenv("DATABASE_URL", DEFAULT_PG_URL) + + # 統帥鐵律: 禁止 SQLite + if "sqlite" in url.lower(): + logger.error( + "sqlite_forbidden", + message="SQLite is FORBIDDEN by Commander's decree. Using PostgreSQL.", + ) + return DEFAULT_PG_URL + + return url + + +# ============================================================================= +# PostgreSQL Engine (全域單例) +# ============================================================================= + +_engine = None +_session_factory = None + + +async def init_pg_engine(base: type[DeclarativeBase] | None = None) -> None: + """ + 初始化 PostgreSQL 引擎與連線池 + + Args: + base: SQLAlchemy DeclarativeBase,用於自動建表 + """ + global _engine, _session_factory + + if _engine is not None: + return + + database_url = get_database_url() + + _engine = create_async_engine( + database_url, + echo=False, + pool_size=10, + max_overflow=20, + pool_pre_ping=True, + ) + + _session_factory = async_sessionmaker( + _engine, + class_=AsyncSession, + expire_on_commit=False, + ) + + # 自動建表 (如果提供 Base) + if base is not None: + async with _engine.begin() as conn: + await conn.run_sync(base.metadata.create_all) + logger.info("pg_tables_created", base=base.__name__) + + logger.info( + "pg_engine_initialized", + url=database_url.split("@")[-1] if "@" in database_url else database_url, + pool_size=10, + ) + + +async def close_pg_engine() -> None: + """關閉 PostgreSQL 引擎""" + global _engine, _session_factory + + if _engine is not None: + await _engine.dispose() + _engine = None + _session_factory = None + logger.info("pg_engine_closed") + + +def get_session_factory() -> async_sessionmaker[AsyncSession]: + """取得 Session 工廠""" + if _session_factory is None: + raise RuntimeError("PostgreSQL engine not initialized. Call init_pg_engine() first.") + return _session_factory + + +# ============================================================================= +# PgMemoryProvider 實作 +# ============================================================================= + +class PgMemoryProvider(IMemoryProvider[T]): + """ + PostgreSQL 記憶體提供者 (Episodic Memory) + + 特性: + - 永久保存 (無 TTL) + - 支援複雜查詢 + - 事務保證 + + 統帥鐵律: + - 絕對禁止 SQLite + - 所有 Incident 必須持久化到 PostgreSQL + """ + + def __init__(self, model_class: type[T] | None = None): + """ + Args: + model_class: SQLAlchemy Model 類別 + """ + self._model_class = model_class + + async def load(self, key: str) -> T | None: + """ + 從 PostgreSQL 載入資料 + + Args: + key: 主鍵值 + + Returns: + Model 實例或 None + """ + if self._model_class is None: + raise ValueError("model_class is required for load()") + + session_factory = get_session_factory() + async with session_factory() as session: + result = await session.get(self._model_class, key) + return result + + async def save(self, key: str, data: T, ttl_seconds: int | None = None) -> bool: + """ + 儲存到 PostgreSQL + + Args: + key: 主鍵值 (用於日誌) + data: Model 實例 + ttl_seconds: 忽略 (PostgreSQL 永久保存) + + Returns: + 是否成功 + """ + session_factory = get_session_factory() + + try: + async with session_factory() as session: + session.add(data) + await session.commit() + + logger.debug("pg_saved", key=key) + return True + + except Exception as e: + logger.error("pg_save_failed", key=key, error=str(e)) + return False + + async def delete(self, key: str) -> bool: + """ + 從 PostgreSQL 刪除 + + Args: + key: 主鍵值 + + Returns: + 是否成功 + """ + if self._model_class is None: + raise ValueError("model_class is required for delete()") + + session_factory = get_session_factory() + + try: + async with session_factory() as session: + obj = await session.get(self._model_class, key) + if obj: + await session.delete(obj) + await session.commit() + logger.debug("pg_deleted", key=key) + return True + return False + + except Exception as e: + logger.error("pg_delete_failed", key=key, error=str(e)) + return False + + async def exists(self, key: str) -> bool: + """ + 檢查是否存在 + + Args: + key: 主鍵值 + + Returns: + 是否存在 + """ + result = await self.load(key) + return result is not None + + async def update(self, key: str, updates: dict[str, Any]) -> bool: + """ + 更新 PostgreSQL 記錄 + + Args: + key: 主鍵值 + updates: 要更新的欄位 + + Returns: + 是否成功 + """ + if self._model_class is None: + raise ValueError("model_class is required for update()") + + session_factory = get_session_factory() + + try: + async with session_factory() as session: + obj = await session.get(self._model_class, key) + if obj: + for field, value in updates.items(): + if hasattr(obj, field): + setattr(obj, field, value) + await session.commit() + logger.debug("pg_updated", key=key, fields=list(updates.keys())) + return True + return False + + except Exception as e: + logger.error("pg_update_failed", key=key, error=str(e)) + return False