fix(db): eliminate SQLite brain-split, force PostgreSQL

Root cause: Worker used SQLITE_DATABASE_URL causing "no such table: incidents"
because each Pod had isolated SQLite file, not shared PostgreSQL.

Fixes:
- db/base.py: Use DATABASE_URL (PostgreSQL) instead of SQLITE_DATABASE_URL
- Added SQLite prohibition guard with logging
- Added pool_size and pool_pre_ping for production stability

New: packages/lewooogo-data PgMemoryProvider (Phase 6.4d)
- Episodic Memory implementation for PostgreSQL
- init_pg_engine() with auto table creation
- SQLite forbidden by Commander's decree

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-23 10:02:43 +08:00
parent 9f353343c9
commit 1576f2ab20
4 changed files with 314 additions and 23 deletions

View File

@@ -5,9 +5,12 @@ CTO-201: Async SQLAlchemy setup
Features:
- SQLAlchemy 2.0 async engine
- aiosqlite for local dev
- PostgreSQL-ready (asyncpg)
- PostgreSQL (asyncpg) - 188 PostgreSQL
- Session dependency injection
統帥鐵律 2026-03-23:
- 絕對禁止 SQLite
- 所有 Episodic Memory 必須使用 PostgreSQL
"""
from collections.abc import AsyncGenerator
@@ -42,18 +45,30 @@ _session_factory: async_sessionmaker[AsyncSession] | None = None
def get_engine() -> AsyncEngine:
"""Get or create async engine"""
"""
Get or create async engine
統帥鐵律 2026-03-23:
- 使用 DATABASE_URL (PostgreSQL)
- 絕對禁止 SQLite
"""
global _engine
if _engine is None:
# SQLite 需要特殊處理
connect_args = {}
if settings.SQLITE_DATABASE_URL.startswith("sqlite"):
connect_args["check_same_thread"] = False
database_url = settings.DATABASE_URL
# 統帥鐵律: 禁止 SQLite
if "sqlite" in database_url.lower():
import structlog
logger = structlog.get_logger(__name__)
logger.error("sqlite_forbidden", message="SQLite is FORBIDDEN. Using PostgreSQL default.")
database_url = "postgresql+asyncpg://awoooi:changeme@192.168.0.188:5432/awoooi_prod"
_engine = create_async_engine(
settings.SQLITE_DATABASE_URL,
database_url,
echo=settings.DEBUG,
connect_args=connect_args,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
)
return _engine

View File

@@ -79,9 +79,10 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
kubeconfig=settings.KUBECONFIG_PATH,
)
# CTO-201: Initialize SQLite database
# CTO-201: Initialize PostgreSQL database (統帥鐵律: 禁止 SQLite)
await init_db()
logger.info("database_initialized", url=settings.SQLITE_DATABASE_URL)
db_url = settings.DATABASE_URL
logger.info("database_initialized", url=db_url.split("@")[-1] if "@" in db_url else db_url)
# Phase 5: Initialize HTTP Clients (ClickHouse, Ollama)
# 統帥鐵律: 連線池在啟動時建立,關閉時回收

View File

@@ -1,22 +1,28 @@
"""
leWOOOgo Data Providers - 具體實作
===================================
IMemoryProvider 的具體實作
Phase 6.4d: Memory Provider 實作
Provider 列表:
- RedisMemory: Working Memory (7 天 TTL)
- PgMemory: Episodic Memory (永久)
- DualMemory: 雙層記憶體 (Working + Episodic)
- PgMemoryProvider: Episodic Memory (PostgreSQL 永久)
統帥鐵律 2026-03-23:
- 絕對禁止 SQLite
- 所有 Episodic Memory 必須使用 PostgreSQL
"""
# TODO: Phase 6.4d 實作後啟用
# from lewooogo_data.providers.redis_memory import RedisMemory
# from lewooogo_data.providers.pg_memory import PgMemory
# from lewooogo_data.providers.dual_memory import DualMemory
from .pg_memory import (
PgMemoryProvider,
init_pg_engine,
close_pg_engine,
get_session_factory,
get_database_url,
)
__all__: list[str] = [
# "RedisMemory",
# "PgMemory",
# "DualMemory",
"PgMemoryProvider",
"init_pg_engine",
"close_pg_engine",
"get_session_factory",
"get_database_url",
]

View File

@@ -0,0 +1,269 @@
"""
PostgreSQL Memory Provider - 永久記憶層
========================================
Phase 6.4d: Episodic Memory 實作
統帥鐵律 2026-03-23:
- 絕對禁止 SQLite所有持久化必須使用 PostgreSQL
- 連線字串從環境變數讀取 (DATABASE_URL)
- 預設 fallback 為 188 PostgreSQL
Features:
- 非同步連線池 (asyncpg)
- Incident 永久保存
- 自動建表 (metadata.create_all)
"""
import os
from typing import Any, TypeVar
import structlog
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase
from ..interfaces.memory_provider import IMemoryProvider
logger = structlog.get_logger(__name__)
T = TypeVar("T")
# =============================================================================
# 188 PostgreSQL 常量 (統帥鐵律: 禁止 SQLite)
# =============================================================================
DEFAULT_PG_URL = "postgresql+asyncpg://wooo:wooopwd@192.168.0.188:5432/awoooi"
def get_database_url() -> str:
"""
取得資料庫連線字串
優先順序:
1. DATABASE_URL 環境變數
2. 188 PostgreSQL 預設值
統帥鐵律: 絕對禁止出現 sqlite
"""
url = os.getenv("DATABASE_URL", DEFAULT_PG_URL)
# 統帥鐵律: 禁止 SQLite
if "sqlite" in url.lower():
logger.error(
"sqlite_forbidden",
message="SQLite is FORBIDDEN by Commander's decree. Using PostgreSQL.",
)
return DEFAULT_PG_URL
return url
# =============================================================================
# PostgreSQL Engine (全域單例)
# =============================================================================
_engine = None
_session_factory = None
async def init_pg_engine(base: type[DeclarativeBase] | None = None) -> None:
"""
初始化 PostgreSQL 引擎與連線池
Args:
base: SQLAlchemy DeclarativeBase用於自動建表
"""
global _engine, _session_factory
if _engine is not None:
return
database_url = get_database_url()
_engine = create_async_engine(
database_url,
echo=False,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
)
_session_factory = async_sessionmaker(
_engine,
class_=AsyncSession,
expire_on_commit=False,
)
# 自動建表 (如果提供 Base)
if base is not None:
async with _engine.begin() as conn:
await conn.run_sync(base.metadata.create_all)
logger.info("pg_tables_created", base=base.__name__)
logger.info(
"pg_engine_initialized",
url=database_url.split("@")[-1] if "@" in database_url else database_url,
pool_size=10,
)
async def close_pg_engine() -> None:
"""關閉 PostgreSQL 引擎"""
global _engine, _session_factory
if _engine is not None:
await _engine.dispose()
_engine = None
_session_factory = None
logger.info("pg_engine_closed")
def get_session_factory() -> async_sessionmaker[AsyncSession]:
"""取得 Session 工廠"""
if _session_factory is None:
raise RuntimeError("PostgreSQL engine not initialized. Call init_pg_engine() first.")
return _session_factory
# =============================================================================
# PgMemoryProvider 實作
# =============================================================================
class PgMemoryProvider(IMemoryProvider[T]):
"""
PostgreSQL 記憶體提供者 (Episodic Memory)
特性:
- 永久保存 (無 TTL)
- 支援複雜查詢
- 事務保證
統帥鐵律:
- 絕對禁止 SQLite
- 所有 Incident 必須持久化到 PostgreSQL
"""
def __init__(self, model_class: type[T] | None = None):
"""
Args:
model_class: SQLAlchemy Model 類別
"""
self._model_class = model_class
async def load(self, key: str) -> T | None:
"""
從 PostgreSQL 載入資料
Args:
key: 主鍵值
Returns:
Model 實例或 None
"""
if self._model_class is None:
raise ValueError("model_class is required for load()")
session_factory = get_session_factory()
async with session_factory() as session:
result = await session.get(self._model_class, key)
return result
async def save(self, key: str, data: T, ttl_seconds: int | None = None) -> bool:
"""
儲存到 PostgreSQL
Args:
key: 主鍵值 (用於日誌)
data: Model 實例
ttl_seconds: 忽略 (PostgreSQL 永久保存)
Returns:
是否成功
"""
session_factory = get_session_factory()
try:
async with session_factory() as session:
session.add(data)
await session.commit()
logger.debug("pg_saved", key=key)
return True
except Exception as e:
logger.error("pg_save_failed", key=key, error=str(e))
return False
async def delete(self, key: str) -> bool:
"""
從 PostgreSQL 刪除
Args:
key: 主鍵值
Returns:
是否成功
"""
if self._model_class is None:
raise ValueError("model_class is required for delete()")
session_factory = get_session_factory()
try:
async with session_factory() as session:
obj = await session.get(self._model_class, key)
if obj:
await session.delete(obj)
await session.commit()
logger.debug("pg_deleted", key=key)
return True
return False
except Exception as e:
logger.error("pg_delete_failed", key=key, error=str(e))
return False
async def exists(self, key: str) -> bool:
"""
檢查是否存在
Args:
key: 主鍵值
Returns:
是否存在
"""
result = await self.load(key)
return result is not None
async def update(self, key: str, updates: dict[str, Any]) -> bool:
"""
更新 PostgreSQL 記錄
Args:
key: 主鍵值
updates: 要更新的欄位
Returns:
是否成功
"""
if self._model_class is None:
raise ValueError("model_class is required for update()")
session_factory = get_session_factory()
try:
async with session_factory() as session:
obj = await session.get(self._model_class, key)
if obj:
for field, value in updates.items():
if hasattr(obj, field):
setattr(obj, field, value)
await session.commit()
logger.debug("pg_updated", key=key, fields=list(updates.keys()))
return True
return False
except Exception as e:
logger.error("pg_update_failed", key=key, error=str(e))
return False