Files
awoooi/apps/api/src/db/base.py
Your Name 9b01f1fa46
All checks were successful
CD Pipeline / tests (push) Successful in 5m29s
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / build-and-deploy (push) Successful in 4m9s
CD Pipeline / post-deploy-checks (push) Successful in 1m57s
fix(api): serialize startup bootstrap ddl
2026-05-24 17:10:26 +08:00

432 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Database Base Configuration
===========================
CTO-201: Async SQLAlchemy setup
Features:
- SQLAlchemy 2.0 async engine
- PostgreSQL (asyncpg) - 188 PostgreSQL
- Session dependency injection
統帥鐵律 2026-03-23:
- 絕對禁止 SQLite
- 所有 Episodic Memory 必須使用 PostgreSQL
"""
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from sqlalchemy import text
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase
from src.core.config import settings
# =============================================================================
# Base Model
# =============================================================================
class Base(DeclarativeBase):
"""SQLAlchemy declarative base"""
pass
# =============================================================================
# Engine & Session Factory
# =============================================================================
_engine: AsyncEngine | None = None
_session_factory: async_sessionmaker[AsyncSession] | None = None
def get_engine() -> AsyncEngine:
"""
Get or create async engine
統帥鐵律 2026-03-23:
- 使用 DATABASE_URL (PostgreSQL)
- 絕對禁止 SQLite
"""
global _engine
if _engine is None:
database_url = settings.DATABASE_URL
# 統帥鐵律: 禁止 SQLite (AWOOOI 憲法)
# 🔴 違反此規則必須立即報錯,禁止 fallback
if "sqlite" in database_url.lower():
import structlog
logger = structlog.get_logger(__name__)
logger.error("sqlite_forbidden", url=database_url)
raise ValueError(
"SQLite is FORBIDDEN by AWOOOI Constitution. "
"Set DATABASE_URL to PostgreSQL: postgresql+asyncpg://user:pass@host:5432/db"
)
_engine = create_async_engine(
database_url,
echo=settings.DEBUG,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
)
return _engine
def get_session_factory() -> async_sessionmaker[AsyncSession]:
"""Get or create session factory"""
global _session_factory
if _session_factory is None:
_session_factory = async_sessionmaker(
bind=get_engine(),
class_=AsyncSession,
expire_on_commit=False,
autoflush=False,
)
return _session_factory
# =============================================================================
# Dependency Injection
# =============================================================================
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""
FastAPI dependency for database session
Usage:
@router.get("/items")
async def get_items(db: AsyncSession = Depends(get_db)):
...
"""
factory = get_session_factory()
async with factory() as session:
try:
from src.core.context import get_current_project_id
# AwoooP Phase 2.3 (2026-05-04 ogt): SET LOCAL app.project_id 讓 RLS Policy 生效
# 預設 'awoooi',多租戶路由將透過 contextvar 注入實際 project_id
await session.execute(
text("SELECT set_config('app.project_id', :pid, TRUE)"),
{"pid": get_current_project_id()},
)
yield session
await session.commit()
except Exception:
await session.rollback()
raise
@asynccontextmanager
async def get_db_context(project_id: str | None = None) -> AsyncGenerator[AsyncSession, None]:
"""
Context manager for database session (non-FastAPI usage)
AwoooP Phase 2.3/2.4: 優先序 — 明確參數 > contextvar > "awoooi"
- Phase 2.3: 啟用 RLS tenant isolationSET LOCAL app.project_id
- Phase 2.4: 從 asyncio contextvar 讀取 background loop 的 project_id
Usage:
async with get_db_context() as db: # 繼承 contextvar 或預設 awoooi
...
async with get_db_context("other-tenant") as db: # 明確指定 tenant
...
"""
from src.core.context import get_current_project_id
effective_pid = project_id if project_id is not None else get_current_project_id()
factory = get_session_factory()
async with factory() as session:
try:
await session.execute(
text("SELECT set_config('app.project_id', :pid, TRUE)"),
{"pid": effective_pid},
)
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# =============================================================================
# Initialization
# =============================================================================
_DB_BOOTSTRAP_LOCK_NAME = "awoooi:init_db:ddl"
async def init_db() -> None:
"""
Initialize database tables
Call this at application startup.
"""
engine = get_engine()
async with engine.connect() as lock_conn:
# 2026-05-24 ogt + Codex: 兩個 API replica 同時啟動時PostgreSQL 會在
# ALTER TABLE ... IF NOT EXISTS 上互相等待並 deadlock。整段 bootstrap
# DDL 必須序列化,避免 rollout 因一個 pod CrashLoop 變成 1/2 ready。
await lock_conn.execute(
text("SELECT pg_advisory_lock(hashtext(:lock_name))"),
{"lock_name": _DB_BOOTSTRAP_LOCK_NAME},
)
try:
await _run_init_db_ddl(engine)
finally:
await lock_conn.execute(
text("SELECT pg_advisory_unlock(hashtext(:lock_name))"),
{"lock_name": _DB_BOOTSTRAP_LOCK_NAME},
)
async def _run_init_db_ddl(engine: AsyncEngine) -> None:
"""
Run idempotent DB bootstrap DDL while caller holds the bootstrap advisory lock.
"""
# 2026-04-15 ogt: 多 replica 並行啟動競爭修復
# 問題:單一大 transaction 裡兩個 pod 同時建 table → 其中一個 CREATE INDEX 失敗
# PostgreSQL 中 transaction 內任何錯誤導致整個 transaction ROLLBACK
# → table + index 全消失 → 下次重啟同樣問題 → 無限 CrashLoop
# 修法:每個 table 獨立 transaction先 DROP INDEX IF EXISTS 清殘留孤兒 index
# 捕捉 "already exists" 讓並行 pod 優雅跳過
async with engine.connect() as probe_conn:
existing = set(await probe_conn.run_sync(
lambda c: __import__('sqlalchemy', fromlist=['inspect']).inspect(c).get_table_names()
))
for table in Base.metadata.sorted_tables:
if table.name not in existing:
try:
async with engine.begin() as conn:
# 先清殘留孤兒 index前次 CrashLoop 留下的部分狀態)
for index in table.indexes:
await conn.execute(text(f'DROP INDEX IF EXISTS "{index.name}"'))
await conn.run_sync(table.create)
except Exception as exc:
if "already exists" in str(exc).lower():
pass # 並行 pod 已建好,忽略
else:
raise
async with engine.begin() as conn:
# 2026-04-02 Claude Code: 確保 risklevel enum 包含 'high' 值
# Phase 23 新增,避免舊 DB 缺少此值導致 InvalidTextRepresentation
await conn.execute(
text("""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM pg_enum
WHERE enumtypid = 'risklevel'::regtype
AND enumlabel = 'high'
) THEN
ALTER TYPE risklevel ADD VALUE 'high';
END IF;
END
$$;
""")
)
# 2026-04-09 Claude Sonnet 4.6: Sprint 5R C1 修復 — 批准執行閉環 Telegram 訊息持久化欄位
# create_all 不做 ALTER需手動補欄位
await conn.execute(
text("""
ALTER TABLE approval_records
ADD COLUMN IF NOT EXISTS telegram_message_id INTEGER,
ADD COLUMN IF NOT EXISTS telegram_chat_id INTEGER;
""")
)
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 3.5 Playbook PostgreSQL 持久化
# ADR-085: AI 學習成果不可存 Cache — trust_score、EWMA 必須永久保存
# playbooks 表已存在15 筆舊資料),補加新欄位
await conn.execute(
text("""
ALTER TABLE playbooks
ADD COLUMN IF NOT EXISTS trust_score FLOAT NOT NULL DEFAULT 0.3,
ADD COLUMN IF NOT EXISTS requires_approval_level VARCHAR(20) NOT NULL DEFAULT 'auto',
ADD COLUMN IF NOT EXISTS stateful_targets JSONB NOT NULL DEFAULT '[]',
ADD COLUMN IF NOT EXISTS requires_pre_backup BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS version INTEGER NOT NULL DEFAULT 1,
ADD COLUMN IF NOT EXISTS parent_playbook_id VARCHAR(36),
ADD COLUMN IF NOT EXISTS supersedes_playbook_id VARCHAR(36),
ADD COLUMN IF NOT EXISTS version_reason TEXT;
""")
)
await conn.execute(text(
"CREATE INDEX IF NOT EXISTS ix_playbook_lineage "
"ON playbooks(parent_playbook_id, version);"
))
await conn.execute(text(
"CREATE INDEX IF NOT EXISTS ix_playbook_supersedes "
"ON playbooks(supersedes_playbook_id) WHERE supersedes_playbook_id IS NOT NULL;"
))
await conn.execute(text(
"UPDATE playbooks SET parent_playbook_id = playbook_id WHERE parent_playbook_id IS NULL;"
))
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 4 8D 感官升級
# ADR-084: EvidenceSnapshot 加入 Phase 4 動態異常上下文anomaly_context
await conn.execute(
text("""
ALTER TABLE incident_evidence
ADD COLUMN IF NOT EXISTS anomaly_context JSONB;
""")
)
# W2 PR-V1: SelfHealingValidator 補欄 (2026-04-28 ogt + Claude Sonnet 4.6)
# incident_evidence 加 self_healing_score + self_healing_detail
# create_all 不做 ALTER防禦性補加prod 已存在的表不會自動加欄)
await conn.execute(
text("""
ALTER TABLE incident_evidence
ADD COLUMN IF NOT EXISTS self_healing_score FLOAT,
ADD COLUMN IF NOT EXISTS self_healing_detail JSONB;
""")
)
# 2026-04-29 ogt + Claude Opus 4.7: PR-K1 防禦性 ALTER (db-expert finding)
# P1.6 (2026-04-24) ORM 已加 timeline_events.incident_id但 prod 若在 P1.6 前
# 已建表create_all 跳過已存在的表 → ALTER 不會跑 → ORM 寫入 SELECT 找不到欄位
# 補防禦性 IF NOT EXISTS已有 column 為 no-op安全
await conn.execute(
text("""
ALTER TABLE timeline_events
ADD COLUMN IF NOT EXISTS incident_id VARCHAR(64);
""")
)
# 2026-04-29 ogt + Claude Opus 4.7: M4 KMWriter 反查鏈 + 冪等補欄
# CD #1115-1117 全 failure 根因commit c22e5f33 加 ORM 欄位但無對應 ALTER
# 錯誤column "related_approval_id" of relation "knowledge_entries" does not exist
# 補防禦性 ALTER同 timeline_events 模式)+ 對應 index
await conn.execute(
text("""
ALTER TABLE knowledge_entries
ADD COLUMN IF NOT EXISTS related_approval_id VARCHAR(64),
ADD COLUMN IF NOT EXISTS path_type VARCHAR(50);
""")
)
# M3 冪等 unique index (incident_id + path_type)
await conn.execute(text(
"CREATE UNIQUE INDEX IF NOT EXISTS uix_knowledge_incident_path "
"ON knowledge_entries(related_incident_id, path_type) "
"WHERE related_incident_id IS NOT NULL AND path_type IS NOT NULL;"
))
# M4 反查鏈 partial indexapproval_id → KM 反查)
await conn.execute(text(
"CREATE INDEX IF NOT EXISTS ix_knowledge_related_approval "
"ON knowledge_entries(related_approval_id) "
"WHERE related_approval_id IS NOT NULL;"
))
# W2 PR-L1 2026-04-28 ogt + Claude Sonnet 4.6: KM→Playbook 互饋回路(飛輪 C3 修復)
# PlaybookRecord 新增 review_required 欄位
# 已存在表不會被 create_all 重建,必須手動 ALTER
await conn.execute(
text("""
ALTER TABLE playbooks
ADD COLUMN IF NOT EXISTS review_required BOOLEAN NOT NULL DEFAULT FALSE;
""")
)
await conn.execute(text(
"CREATE INDEX IF NOT EXISTS ix_playbook_review_required "
"ON playbooks(review_required) WHERE review_required = true;"
))
await conn.execute(text(
"CREATE INDEX IF NOT EXISTS ix_timeline_incident_id "
"ON timeline_events(incident_id);"
))
# AwoooP Phase 2.6 (2026-05-04 ogt): budget_ledger 建表ADR-120 Token Budget Hard Kill
await conn.execute(text("""
CREATE TABLE IF NOT EXISTS budget_ledger (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
project_id VARCHAR(64) NOT NULL DEFAULT 'awoooi',
agent_id VARCHAR(128),
run_id UUID,
model VARCHAR(64),
provider VARCHAR(32),
prompt_tokens INT,
completion_tokens INT,
cost_usd NUMERIC(10, 4) NOT NULL DEFAULT 0.0000,
recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
"""))
await conn.execute(text(
"CREATE INDEX IF NOT EXISTS idx_budget_ledger_project_date "
"ON budget_ledger(project_id, recorded_at DESC);"
))
# AwoooP Phase 2.3 (2026-05-04 ogt): 四表加 project_idRLS 多租戶隔離)
# 防禦性 ALTER — 已存在欄位為 no-op安全。
# Batch 1 RLS migration 執行後app.project_id 由 get_db_context() 自動設置。
await conn.execute(text(
"ALTER TABLE incidents "
"ADD COLUMN IF NOT EXISTS project_id VARCHAR(64) NOT NULL DEFAULT 'awoooi';"
))
await conn.execute(text(
"CREATE INDEX IF NOT EXISTS idx_incidents_project_id "
"ON incidents (project_id);"
))
await conn.execute(text(
"ALTER TABLE knowledge_entries "
"ADD COLUMN IF NOT EXISTS project_id VARCHAR(64) NOT NULL DEFAULT 'awoooi';"
))
await conn.execute(text(
"CREATE INDEX IF NOT EXISTS idx_knowledge_entries_project_id "
"ON knowledge_entries (project_id);"
))
await conn.execute(text(
"ALTER TABLE playbooks "
"ADD COLUMN IF NOT EXISTS project_id VARCHAR(64) NOT NULL DEFAULT 'awoooi';"
))
await conn.execute(text(
"CREATE INDEX IF NOT EXISTS idx_playbooks_project_id "
"ON playbooks (project_id);"
))
await conn.execute(text(
"ALTER TABLE audit_logs "
"ADD COLUMN IF NOT EXISTS project_id VARCHAR(64) NOT NULL DEFAULT 'awoooi';"
))
await conn.execute(text(
"CREATE INDEX IF NOT EXISTS idx_audit_logs_project_id "
"ON audit_logs (project_id);"
))
# 2026-04-15 ogt + Claude Sonnet 4.6(亞太): Phase 6 自我治理閉環
# ADR-087: ai_governance_events 不可變 Event Sourcing 表
# asyncpg 不允許 prepared statement 內多條指令,必須分開 execute
await conn.execute(text(
"CREATE INDEX IF NOT EXISTS ix_ai_governance_event_type "
"ON ai_governance_events (event_type);"
))
await conn.execute(text(
"CREATE INDEX IF NOT EXISTS ix_ai_governance_triggered_at "
"ON ai_governance_events (triggered_at);"
))
await conn.execute(text(
"CREATE INDEX IF NOT EXISTS ix_ai_governance_resolved "
"ON ai_governance_events (resolved);"
))
async def close_db() -> None:
"""
Close database connections
Call this at application shutdown.
"""
global _engine, _session_factory
if _engine is not None:
await _engine.dispose()
_engine = None
_session_factory = None