Files
awoooi/apps/api/src/db/base.py
OG T 628387de8c
Some checks failed
E2E Health Check / e2e-health (push) Successful in 17s
CD Pipeline / build-and-deploy (push) Has been cancelled
fix: risklevel migration 自動化 + Telegram Whitelist 注入
1. init_db() 啟動時自動確保 risklevel enum 包含 'high' 值
   (Phase 23 新增,避免舊 DB 缺值導致 InvalidTextRepresentation)

2. CD Pipeline 新增 OPENCLAW_TG_USER_WHITELIST 自動注入
   (之前為 CHANGE_ME,已更新為實際 user ID 5619078117)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 09:13:13 +08:00

179 lines
4.8 KiB
Python

"""
Database Base Configuration
===========================
CTO-201: Async SQLAlchemy setup
Features:
- SQLAlchemy 2.0 async engine
- PostgreSQL (asyncpg) - 188 PostgreSQL
- Session dependency injection
統帥鐵律 2026-03-23:
- 絕對禁止 SQLite
- 所有 Episodic Memory 必須使用 PostgreSQL
"""
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from sqlalchemy import text
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase
from src.core.config import settings
# =============================================================================
# Base Model
# =============================================================================
class Base(DeclarativeBase):
"""SQLAlchemy declarative base"""
pass
# =============================================================================
# Engine & Session Factory
# =============================================================================
_engine: AsyncEngine | None = None
_session_factory: async_sessionmaker[AsyncSession] | None = None
def get_engine() -> AsyncEngine:
"""
Get or create async engine
統帥鐵律 2026-03-23:
- 使用 DATABASE_URL (PostgreSQL)
- 絕對禁止 SQLite
"""
global _engine
if _engine is None:
database_url = settings.DATABASE_URL
# 統帥鐵律: 禁止 SQLite (AWOOOI 憲法)
# 🔴 違反此規則必須立即報錯,禁止 fallback
if "sqlite" in database_url.lower():
import structlog
logger = structlog.get_logger(__name__)
logger.error("sqlite_forbidden", url=database_url)
raise ValueError(
"SQLite is FORBIDDEN by AWOOOI Constitution. "
"Set DATABASE_URL to PostgreSQL: postgresql+asyncpg://user:pass@host:5432/db"
)
_engine = create_async_engine(
database_url,
echo=settings.DEBUG,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
)
return _engine
def get_session_factory() -> async_sessionmaker[AsyncSession]:
"""Get or create session factory"""
global _session_factory
if _session_factory is None:
_session_factory = async_sessionmaker(
bind=get_engine(),
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
)
return _session_factory
# =============================================================================
# Dependency Injection
# =============================================================================
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""
FastAPI dependency for database session
Usage:
@router.get("/items")
async def get_items(db: AsyncSession = Depends(get_db)):
...
"""
factory = get_session_factory()
async with factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
@asynccontextmanager
async def get_db_context() -> AsyncGenerator[AsyncSession, None]:
"""
Context manager for database session (non-FastAPI usage)
Usage:
async with get_db_context() as db:
...
"""
factory = get_session_factory()
async with factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# =============================================================================
# Initialization
# =============================================================================
async def init_db() -> None:
"""
Initialize database tables
Call this at application startup.
"""
engine = get_engine()
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# 2026-04-02 Claude Code: 確保 risklevel enum 包含 'high' 值
# Phase 23 新增,避免舊 DB 缺少此值導致 InvalidTextRepresentation
await conn.execute(
text("""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_enum
WHERE enumtypid = 'risklevel'::regtype
AND enumlabel = 'high'
) THEN
ALTER TYPE risklevel ADD VALUE 'high';
END IF;
END
$$;
""")
)
async def close_db() -> None:
"""
Close database connections
Call this at application shutdown.
"""
global _engine, _session_factory
if _engine is not None:
await _engine.dispose()
_engine = None
_session_factory = None