Files
awoooi/apps/api/src/db/base.py
OG T 896bef94ee fix(web): pending-approvals-card 加防重複點擊 + loading 狀態
linter 自動強化: actioningId state 防止同一張卡重複操作
- disabled + opacity 0.6 + cursor not-allowed
- loading 時按鈕顯示 '...'
- finally() 確保 actioningId 清除

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-09 18:38:08 +08:00

189 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Database Base Configuration
===========================
CTO-201: Async SQLAlchemy setup
Features:
- SQLAlchemy 2.0 async engine
- PostgreSQL (asyncpg) - 188 PostgreSQL
- Session dependency injection
統帥鐵律 2026-03-23:
- 絕對禁止 SQLite
- 所有 Episodic Memory 必須使用 PostgreSQL
"""
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from sqlalchemy import text
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase
from src.core.config import settings
# =============================================================================
# Base Model
# =============================================================================
class Base(DeclarativeBase):
    """Declarative base for this package's SQLAlchemy models.

    Tables registered on ``Base.metadata`` are the ones ``init_db`` creates.
    """
# =============================================================================
# Engine & Session Factory
# =============================================================================
# Lazily created module-level singletons: filled in on first use by
# get_engine() / get_session_factory() and reset to None by close_db().
_engine: AsyncEngine | None = None
_session_factory: async_sessionmaker[AsyncSession] | None = None
def get_engine() -> AsyncEngine:
    """
    Return the shared async engine, creating it on first use.

    The engine is built from ``settings.DATABASE_URL`` and cached in the
    module-level ``_engine`` so that later calls reuse the same instance.

    Project rule (2026-03-23): SQLite is forbidden and the URL must point at
    PostgreSQL. A SQLite URL is rejected immediately; there is no fallback.

    Returns:
        The cached ``AsyncEngine``.

    Raises:
        ValueError: If the scheme (dialect/driver) of ``DATABASE_URL`` names
            SQLite.
    """
    global _engine
    if _engine is not None:
        return _engine

    database_url = settings.DATABASE_URL

    # Inspect only the scheme ("dialect+driver") in front of "://". Searching
    # the whole URL would wrongly reject a PostgreSQL URL whose password, host
    # or database name merely contains "sqlite" -- and would then write that
    # URL, credentials included, to the error log. If the URL has no "://",
    # partition() returns the whole string, so the check stays conservative.
    scheme = database_url.partition("://")[0].lower()
    if "sqlite" in scheme:
        # Imported lazily: only needed on this failure path.
        import structlog

        logger = structlog.get_logger(__name__)
        logger.error("sqlite_forbidden", url=database_url)
        raise ValueError(
            "SQLite is FORBIDDEN by AWOOOI Constitution. "
            "Set DATABASE_URL to PostgreSQL: postgresql+asyncpg://user:pass@host:5432/db"
        )

    _engine = create_async_engine(
        database_url,
        echo=settings.DEBUG,
        pool_size=10,
        max_overflow=20,
        pool_pre_ping=True,
    )
    return _engine
def get_session_factory() -> async_sessionmaker[AsyncSession]:
    """Return the shared session factory, building it on the first call.

    The factory is bound to the engine from ``get_engine()`` and produces
    ``AsyncSession`` objects with ``expire_on_commit`` and ``autoflush``
    both turned off.
    """
    global _session_factory
    if _session_factory is not None:
        return _session_factory

    engine = get_engine()
    _session_factory = async_sessionmaker(
        bind=engine,
        class_=AsyncSession,
        expire_on_commit=False,
        autoflush=False,
    )
    return _session_factory
# =============================================================================
# Dependency Injection
# =============================================================================
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """
    Yield a database session for use as a FastAPI dependency.

    The session is committed after the consumer finishes without error. If an
    ``Exception`` is raised while the session is in use, or by the commit
    itself, the session is rolled back and the exception is re-raised.

    Usage:
        @router.get("/items")
        async def get_items(db: AsyncSession = Depends(get_db)):
            ...
    """
    session_factory = get_session_factory()
    async with session_factory() as db:
        try:
            yield db
            await db.commit()
        except Exception:
            # Undo pending work, then let the error propagate to the caller.
            await db.rollback()
            raise
@asynccontextmanager
async def get_db_context() -> AsyncGenerator[AsyncSession, None]:
    """
    Provide a database session as an async context manager (non-FastAPI use).

    On a clean exit from the ``async with`` body the session is committed.
    If an ``Exception`` escapes the body, or the commit fails, the session is
    rolled back and the exception is re-raised.

    Usage:
        async with get_db_context() as db:
            ...
    """
    open_session = get_session_factory()
    async with open_session() as active_session:
        try:
            yield active_session
            await active_session.commit()
        except Exception:
            # Discard uncommitted changes before propagating the failure.
            await active_session.rollback()
            raise
# =============================================================================
# Initialization
# =============================================================================
async def init_db() -> None:
    """
    Create database tables and apply in-place schema patches.

    Call this at application startup. Every step runs on one connection
    inside a single ``engine.begin()`` transaction:

    1. Create the tables registered on ``Base.metadata``.
    2. Ensure the ``risklevel`` enum type has a ``'high'`` label.
    3. Ensure ``approval_records`` has the Telegram message columns, with
       ``telegram_chat_id`` stored as a 64-bit integer.
    """
    engine = get_engine()
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

        # 2026-04-02: make sure the risklevel enum contains 'high' (added in
        # Phase 23). Older databases that lack the label would otherwise fail
        # with InvalidTextRepresentation.
        await conn.execute(
            text("""
                DO $$
                BEGIN
                    IF NOT EXISTS (
                        SELECT 1 FROM pg_enum
                        WHERE enumtypid = 'risklevel'::regtype
                        AND enumlabel = 'high'
                    ) THEN
                        ALTER TYPE risklevel ADD VALUE 'high';
                    END IF;
                END
                $$;
            """)
        )

        # 2026-04-09 (Sprint 5R C1 fix): columns that persist the Telegram
        # message of the approve/execute loop. create_all does not ALTER
        # existing tables, so the columns are added by hand.
        #
        # telegram_chat_id is BIGINT because Telegram chat identifiers can
        # have more than 32 significant bits and overflow a 32-bit INTEGER.
        await conn.execute(
            text("""
                ALTER TABLE approval_records
                ADD COLUMN IF NOT EXISTS telegram_message_id INTEGER,
                ADD COLUMN IF NOT EXISTS telegram_chat_id BIGINT;
            """)
        )

        # Widen telegram_chat_id where an earlier run created it as a 32-bit
        # INTEGER. The guard keeps this a no-op once the column is BIGINT.
        # NOTE(review): the ORM model for approval_records is not visible
        # here -- confirm it declares this column as BigInteger as well.
        await conn.execute(
            text("""
                DO $$
                BEGIN
                    IF EXISTS (
                        SELECT 1 FROM information_schema.columns
                        WHERE table_schema = current_schema()
                        AND table_name = 'approval_records'
                        AND column_name = 'telegram_chat_id'
                        AND data_type = 'integer'
                    ) THEN
                        ALTER TABLE approval_records
                        ALTER COLUMN telegram_chat_id TYPE BIGINT;
                    END IF;
                END
                $$;
            """)
        )
async def close_db() -> None:
    """
    Dispose of the shared engine and clear the cached singletons.

    Call this at application shutdown. Does nothing when no engine has been
    created.
    """
    global _engine, _session_factory
    if _engine is None:
        return

    await _engine.dispose()
    # Clear both caches so a later get_engine() / get_session_factory()
    # builds fresh objects instead of reusing the disposed engine.
    _engine = None
    _session_factory = None