Files
awoooi/apps/api/src/db/base.py
OG T f9ba200638
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
fix(db): Phase 6 migration 三條 CREATE INDEX 拆開各自 execute
asyncpg 不支援 prepared statement 內多條 SQL 指令,
原本一個 text("""...""") 包含三條 CREATE INDEX 導致 CrashLoopBackOff。
拆成三個獨立 conn.execute() 呼叫。

2026-04-15 ogt + Claude Sonnet 4.6(亞太)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-15 19:37:58 +08:00

251 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Database Base Configuration
===========================
CTO-201: Async SQLAlchemy setup
Features:
- SQLAlchemy 2.0 async engine
- PostgreSQL (asyncpg) - 188 PostgreSQL
- Session dependency injection
統帥鐵律 2026-03-23:
- 絕對禁止 SQLite
- 所有 Episodic Memory 必須使用 PostgreSQL
"""
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from sqlalchemy import text
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase
from src.core.config import settings
# =============================================================================
# Base Model
# =============================================================================
class Base(DeclarativeBase):
    """Declarative base for the ORM models of this package.

    Tables registered on ``Base.metadata`` are the ones ``init_db`` creates
    at application startup.
    """
# =============================================================================
# Engine & Session Factory
# =============================================================================
# Lazily created engine singleton: populated by get_engine() on first use and
# reset to None by close_db().
_engine: AsyncEngine | None = None
# Lazily created session factory singleton: populated by get_session_factory()
# on first use and reset to None by close_db().
_session_factory: async_sessionmaker[AsyncSession] | None = None
def get_engine() -> AsyncEngine:
    """
    Return the process-wide async engine, creating it on first use.

    The engine is built from ``settings.DATABASE_URL`` and cached in the
    module-level ``_engine``; subsequent calls return the same instance.

    Project rule (2026-03-23): the database must be PostgreSQL and SQLite is
    strictly forbidden. A SQLite URL is therefore rejected with an error
    instead of being accepted or replaced by a fallback.

    Returns:
        The shared ``AsyncEngine``.

    Raises:
        ValueError: if ``settings.DATABASE_URL`` selects the SQLite backend.
    """
    global _engine
    if _engine is not None:
        return _engine

    # Function-scope import, same style as the structlog import below.
    from sqlalchemy.engine import make_url

    database_url = settings.DATABASE_URL
    parsed_url = make_url(database_url)

    # Decide on the parsed backend name rather than on a substring of the
    # whole URL: a PostgreSQL URL whose password, host or database name merely
    # contains "sqlite" must not be rejected.
    if parsed_url.get_backend_name().lower() == "sqlite":
        import structlog

        logger = structlog.get_logger(__name__)
        # Never write credentials to the log: render the URL with the
        # password masked.
        logger.error(
            "sqlite_forbidden",
            url=parsed_url.render_as_string(hide_password=True),
        )
        raise ValueError(
            "SQLite is FORBIDDEN by AWOOOI Constitution. "
            "Set DATABASE_URL to PostgreSQL: postgresql+asyncpg://user:pass@host:5432/db"
        )

    _engine = create_async_engine(
        database_url,
        echo=settings.DEBUG,
        pool_size=10,
        max_overflow=20,
        pool_pre_ping=True,
    )
    return _engine
def get_session_factory() -> async_sessionmaker[AsyncSession]:
    """
    Return the shared session factory, building it on the first call.

    The factory is bound to the engine from ``get_engine()`` and produces
    ``AsyncSession`` objects with ``expire_on_commit`` and ``autoflush``
    both disabled. It is cached in the module-level ``_session_factory``.
    """
    global _session_factory
    if _session_factory is not None:
        return _session_factory

    _session_factory = async_sessionmaker(
        bind=get_engine(),
        class_=AsyncSession,
        autoflush=False,
        expire_on_commit=False,
    )
    return _session_factory
# =============================================================================
# Dependency Injection
# =============================================================================
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """
    Yield a database session for use as a FastAPI dependency.

    The session is committed once the consumer finishes without error. If an
    ``Exception`` is raised (by the consumer or by the commit itself) the
    session is rolled back and the exception is re-raised.

    Usage:
        @router.get("/items")
        async def get_items(db: AsyncSession = Depends(get_db)):
            ...
    """
    make_session = get_session_factory()
    async with make_session() as db:
        try:
            yield db
            # Kept inside the try so that a failing commit is rolled back too.
            await db.commit()
        except Exception:
            await db.rollback()
            raise
@asynccontextmanager
async def get_db_context() -> AsyncGenerator[AsyncSession, None]:
    """
    Provide a database session as an async context manager (non-FastAPI code).

    On normal exit of the ``async with`` body the session is committed. If an
    ``Exception`` is raised (by the body or by the commit itself) the session
    is rolled back and the exception is re-raised.

    Usage:
        async with get_db_context() as db:
            ...
    """
    make_session = get_session_factory()
    async with make_session() as db:
        try:
            yield db
            # Kept inside the try so that a failing commit is rolled back too.
            await db.commit()
        except Exception:
            await db.rollback()
            raise
# =============================================================================
# Initialization
# =============================================================================
async def init_db() -> None:
    """
    Initialize the database schema. Call this at application startup.

    Two steps run in order:

    1. Every table in ``Base.metadata`` that is missing from the database is
       created, each in its own transaction.
    2. A fixed list of idempotent schema patches (enum value, added columns,
       indexes) is applied in a single transaction.

    Raises:
        Exception: any database error is propagated, except errors whose
            message contains "already exists" while creating a table (a
            parallel replica created it first).
    """
    engine = get_engine()
    await _create_missing_tables(engine)
    await _apply_schema_patches(engine)


async def _create_missing_tables(engine: AsyncEngine) -> None:
    """Create each table of ``Base.metadata`` that the database lacks."""
    # 2026-04-15 ogt: fix for the race between replicas starting in parallel.
    # Problem: with one big transaction, two pods create the tables at the
    #   same time and a CREATE INDEX fails in one of them. In PostgreSQL any
    #   error inside a transaction rolls the whole transaction back, so the
    #   tables and indexes all disappear, the next restart hits the same
    #   problem, and the pod ends up in an endless CrashLoop.
    # Fix: one transaction per table; first DROP INDEX IF EXISTS to clear
    #   orphan indexes left behind, and catch "already exists" so a parallel
    #   pod skips the table gracefully.
    from sqlalchemy import inspect as sa_inspect

    def _list_table_names(sync_conn):
        # Inspection is a synchronous API, so it is executed via run_sync.
        return sa_inspect(sync_conn).get_table_names()

    async with engine.connect() as probe_conn:
        existing = set(await probe_conn.run_sync(_list_table_names))

    for table in Base.metadata.sorted_tables:
        if table.name in existing:
            continue
        try:
            async with engine.begin() as conn:
                # Clear orphan indexes first (partial state left by an
                # earlier CrashLoop).
                for index in table.indexes:
                    await conn.execute(text(f'DROP INDEX IF EXISTS "{index.name}"'))
                await conn.run_sync(table.create)
        except Exception as exc:
            # A parallel pod already created the object: ignore. Anything
            # else is a real failure and must surface.
            # NOTE(review): the race is detected purely by message text;
            # other symptoms of a concurrent CREATE may not contain this
            # phrase — verify against the errors actually seen in production.
            if "already exists" not in str(exc).lower():
                raise


# Idempotent schema patches, executed in order inside one transaction.
# Table creation does not ALTER existing tables, so new columns, enum values
# and indexes have to be added by hand here.
# asyncpg does not allow several commands in one prepared statement, so every
# entry must hold exactly one statement.
_SCHEMA_PATCHES: tuple[str, ...] = (
    # 2026-04-02 Claude Code: make sure the risklevel enum contains 'high'.
    # Added in Phase 23; avoids InvalidTextRepresentation on older databases
    # that lack this value.
    """
    DO $$
    BEGIN
        IF NOT EXISTS (
            SELECT 1 FROM pg_enum
            WHERE enumtypid = 'risklevel'::regtype
            AND enumlabel = 'high'
        ) THEN
            ALTER TYPE risklevel ADD VALUE 'high';
        END IF;
    END
    $$;
    """,
    # 2026-04-09 Claude Sonnet 4.6: Sprint 5R C1 fix — columns that persist
    # the Telegram message of the approve-and-execute loop.
    # NOTE(review): Telegram chat ids can exceed 32 bits according to the Bot
    # API docs, so INTEGER may be too narrow for telegram_chat_id — confirm
    # against the ORM model before changing.
    """
    ALTER TABLE approval_records
    ADD COLUMN IF NOT EXISTS telegram_message_id INTEGER,
    ADD COLUMN IF NOT EXISTS telegram_chat_id INTEGER;
    """,
    # 2026-04-15 ogt + Claude Sonnet 4.6 (APAC): Phase 3.5 Playbook PostgreSQL
    # persistence.
    # ADR-085: AI learning results must not live in a cache — trust_score and
    # EWMA must be stored permanently.
    # The playbooks table already exists (15 legacy rows); add the new columns.
    """
    ALTER TABLE playbooks
    ADD COLUMN IF NOT EXISTS trust_score FLOAT NOT NULL DEFAULT 0.3,
    ADD COLUMN IF NOT EXISTS requires_approval_level VARCHAR(20) NOT NULL DEFAULT 'auto',
    ADD COLUMN IF NOT EXISTS stateful_targets JSONB NOT NULL DEFAULT '[]',
    ADD COLUMN IF NOT EXISTS requires_pre_backup BOOLEAN NOT NULL DEFAULT FALSE;
    """,
    # 2026-04-15 ogt + Claude Sonnet 4.6 (APAC): Phase 4 8D sensing upgrade.
    # ADR-084: EvidenceSnapshot gains the Phase 4 dynamic anomaly context
    # (anomaly_context).
    """
    ALTER TABLE incident_evidence
    ADD COLUMN IF NOT EXISTS anomaly_context JSONB;
    """,
    # 2026-04-15 ogt + Claude Sonnet 4.6 (APAC): Phase 6 self-governance loop.
    # ADR-087: ai_governance_events is an immutable event-sourcing table.
    "CREATE INDEX IF NOT EXISTS ix_ai_governance_event_type "
    "ON ai_governance_events (event_type);",
    "CREATE INDEX IF NOT EXISTS ix_ai_governance_triggered_at "
    "ON ai_governance_events (triggered_at);",
    "CREATE INDEX IF NOT EXISTS ix_ai_governance_resolved "
    "ON ai_governance_events (resolved);",
)


async def _apply_schema_patches(engine: AsyncEngine) -> None:
    """Run every statement of ``_SCHEMA_PATCHES`` in a single transaction."""
    async with engine.begin() as conn:
        for statement in _SCHEMA_PATCHES:
            await conn.execute(text(statement))
async def close_db() -> None:
    """
    Dispose of the cached engine. Call this at application shutdown.

    When an engine exists it is disposed, and both the engine and the
    session-factory singletons are reset to ``None``. When no engine was
    ever created the call does nothing.
    """
    global _engine, _session_factory
    if _engine is None:
        return

    await _engine.dispose()
    _engine = None
    _session_factory = None