457 lines
17 KiB
Python
457 lines
17 KiB
Python
"""
|
||
Database Base Configuration
|
||
===========================
|
||
CTO-201: Async SQLAlchemy setup
|
||
|
||
Features:
|
||
- SQLAlchemy 2.0 async engine
|
||
- PostgreSQL (asyncpg) - 188 PostgreSQL
|
||
- Session dependency injection
|
||
|
||
統帥鐵律 2026-03-23:
|
||
- 絕對禁止 SQLite
|
||
- 所有 Episodic Memory 必須使用 PostgreSQL
|
||
"""
|
||
|
||
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from typing import NoReturn

from fastapi import HTTPException
from sqlalchemy import event, inspect, text
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase

from src.core.config import settings
from src.core.context import get_current_project_context
from src.core.logging import get_logger
|
||
|
||
# =============================================================================
|
||
# Base Model
|
||
# =============================================================================
|
||
|
||
class Base(DeclarativeBase):
    """Declarative base shared by the application's ORM models.

    Models subclass this so their tables register on ``Base.metadata``; the
    bootstrap DDL walks ``Base.metadata.sorted_tables`` to create any table
    that does not exist yet.
    """
|
||
|
||
|
||
# =============================================================================
|
||
# Engine & Session Factory
|
||
# =============================================================================
|
||
|
||
# Structured logger for this module's database events.
logger = get_logger("awoooi.db")

# Lazily-initialised process-wide singletons: populated on first use by
# get_engine() / get_session_factory(), reset to None by close_db().
_engine: AsyncEngine | None = None
_session_factory: async_sessionmaker[AsyncSession] | None = None
|
||
|
||
|
||
def _raise_unauthorized_db_context(msg: str) -> NoReturn:
    """Log a DB access attempted without a tenant and abort with HTTP 401.

    This is the fail-closed guard for RLS tenant isolation. The internal
    reason and the current project context (``project_id``, its ``source``,
    and ``request_id``) are logged server-side. The client only receives a
    generic detail string.

    Annotated ``NoReturn`` (not ``None``) because it never returns. This
    lets type checkers narrow the project id to non-empty after an
    ``if not pid: _raise_unauthorized_db_context(...)`` guard.

    Args:
        msg: Internal diagnostic reason; logged, never sent to the client.

    Raises:
        HTTPException: always, with ``status_code=401``.
    """
    context = get_current_project_context()
    logger.error(
        "db_context_missing",
        reason=msg,
        project_id=context.get("project_id"),
        project_id_source=context.get("source"),
        request_id=context.get("request_id"),
    )
    raise HTTPException(status_code=401, detail="Missing tenant context: project_id is required")
|
||
|
||
|
||
def get_engine() -> AsyncEngine:
    """Return the process-wide async engine, creating it on first call.

    Commander's iron rule (2026-03-23):
    - Use DATABASE_URL (PostgreSQL).
    - SQLite is absolutely forbidden.

    The engine echoes SQL when ``settings.DEBUG`` is set. It uses a
    connection pool of 10 with up to 20 overflow connections and pings
    connections before handing them out.

    Raises:
        ValueError: if DATABASE_URL refers to SQLite. There is deliberately no
            fallback; a misconfigured URL must stop the process.
    """
    global _engine
    if _engine is None:
        database_url = settings.DATABASE_URL

        # Iron rule: SQLite is forbidden (AWOOOI constitution).
        # A violation must fail immediately; falling back is not allowed.
        if "sqlite" in database_url.lower():
            # Use the module-level logger. The previous function-local
            # ``logger = structlog.get_logger(...)`` shadowed the module name
            # for the whole function and logged under a different logger.
            logger.error("sqlite_forbidden", url=database_url)
            raise ValueError(
                "SQLite is FORBIDDEN by AWOOOI Constitution. "
                "Set DATABASE_URL to PostgreSQL: postgresql+asyncpg://user:pass@host:5432/db"
            )

        _engine = create_async_engine(
            database_url,
            echo=settings.DEBUG,
            pool_size=10,
            max_overflow=20,
            pool_pre_ping=True,
        )
    return _engine
|
||
|
||
|
||
def get_session_factory() -> async_sessionmaker[AsyncSession]:
    """Return the shared ``AsyncSession`` factory, building it on first call.

    The factory is bound to the engine from ``get_engine()``. Sessions keep
    loaded attribute values after commit (``expire_on_commit=False``) and do
    not flush automatically before queries (``autoflush=False``).
    """
    global _session_factory
    if _session_factory is not None:
        return _session_factory

    _session_factory = async_sessionmaker(
        bind=get_engine(),
        class_=AsyncSession,
        autoflush=False,
        expire_on_commit=False,
    )
    return _session_factory
|
||
|
||
|
||
# =============================================================================
|
||
# Dependency Injection
|
||
# =============================================================================
|
||
|
||
# Transaction-local (is_local=TRUE, i.e. SET LOCAL semantics) tenant setting
# that the RLS policies read.
_SET_PROJECT_ID_SQL = text("SELECT set_config('app.project_id', :pid, TRUE)")


async def _bind_session_to_tenant(session: AsyncSession, project_id: str) -> None:
    """Apply ``app.project_id`` to every transaction that *session* begins.

    ``set_config(..., TRUE)`` is discarded at COMMIT/ROLLBACK. Setting it
    only once therefore left any transaction started after a mid-session
    ``commit()``/``rollback()`` without a tenant, which defeats the RLS
    isolation this module promises. An ``after_begin`` listener, registered
    on this session instance only, re-applies the setting at the start of
    each transaction.

    The first transaction is begun eagerly, as the original explicit
    ``execute`` did. This means the tenant is set, and connectivity is
    checked, before the session is handed to the caller.
    """
    params = {"pid": project_id}

    def _apply_tenant(_sync_session, _transaction, connection) -> None:
        # SQLAlchemy requires SQL inside after_begin to go through the
        # provided Connection rather than the Session.
        connection.execute(_SET_PROJECT_ID_SQL, params)

    event.listen(session.sync_session, "after_begin", _apply_tenant)
    # Begins the first transaction, which fires _apply_tenant.
    await session.connection()


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """
    FastAPI dependency for database session

    AwoooP Phase 2.3 (2026-05-04 ogt): binds ``app.project_id`` so RLS
    policies take effect. Fail-closed: a missing project id aborts with
    HTTP 401 instead of falling back to "awoooi".

    The session is committed after the request handler finishes and rolled
    back if an exception propagates.

    Usage:
        @router.get("/items")
        async def get_items(db: AsyncSession = Depends(get_db)):
            ...

    Raises:
        HTTPException: 401 when no project id is present in the context.
    """
    # Kept as a function-local import, as in the original code.
    from src.core.context import get_current_project_id

    pid = get_current_project_id()
    if not pid:
        _raise_unauthorized_db_context(
            "Unauthorized: project_id is missing in context (Fail-Closed RLS)"
        )

    factory = get_session_factory()
    async with factory() as session:
        try:
            await _bind_session_to_tenant(session, pid)
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


@asynccontextmanager
async def get_db_context(project_id: str | None = None) -> AsyncGenerator[AsyncSession, None]:
    """
    Context manager for database session (non-FastAPI usage)

    AwoooP Phase 2.3/2.4: precedence is explicit argument > contextvar. If
    both are missing or empty, it fails closed.
    - Phase 2.3: enables RLS tenant isolation (``app.project_id``, re-applied
      on every transaction of the session).
    - Phase 2.4: reads the background loop's project_id from the asyncio
      contextvar.

    The session is committed when the ``async with`` body completes normally
    and rolled back if it raises.

    Usage:
        async with get_db_context() as db:  # inherit contextvar (fail-closed if missing)
            ...
        async with get_db_context("other-tenant") as db:  # explicit tenant
            ...

    Raises:
        HTTPException: 401 when no project id can be determined.
    """
    from src.core.context import get_current_project_id

    effective_pid = project_id if project_id is not None else get_current_project_id()
    if not effective_pid:
        _raise_unauthorized_db_context("Unauthorized: project_id is missing in context (Fail-Closed RLS)")

    factory = get_session_factory()
    async with factory() as session:
        try:
            await _bind_session_to_tenant(session, effective_pid)
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
|
||
|
||
|
||
# =============================================================================
|
||
# Initialization
|
||
# =============================================================================
|
||
|
||
# Key hashed (via PostgreSQL hashtext) into the advisory-lock id that
# serialises bootstrap DDL across replicas.
_DB_BOOTSTRAP_LOCK_NAME = "awoooi:init_db:ddl"


async def init_db() -> None:
    """Create/patch database schema at application startup.

    The whole bootstrap runs while holding a PostgreSQL session-level
    advisory lock on a dedicated connection.

    2026-05-24 ogt + Codex: when two API replicas start together, PostgreSQL
    deadlocked them against each other on ``ALTER TABLE ... IF NOT EXISTS``.
    As a result one pod crash-looped and the rollout stalled at 1/2 ready.
    Serialising all bootstrap DDL prevents that.
    """
    engine = get_engine()
    lock_params = {"lock_name": _DB_BOOTSTRAP_LOCK_NAME}

    async with engine.connect() as lock_conn:
        # Blocks until any other replica's bootstrap has released the lock.
        await lock_conn.execute(
            text("SELECT pg_advisory_lock(hashtext(:lock_name))"),
            lock_params,
        )
        try:
            await _run_init_db_ddl(engine)
        finally:
            # Release explicitly: a session-level advisory lock survives the
            # end of the transaction.
            await lock_conn.execute(
                text("SELECT pg_advisory_unlock(hashtext(:lock_name))"),
                lock_params,
            )
|
||
|
||
|
||
# Additive, idempotent schema patches run on every startup, in this exact
# order, inside ONE transaction. Table creation never ALTERs a table that
# already exists, so columns/indexes added to ORM models after a table was
# first created in prod must be patched here. asyncpg rejects multiple
# commands in one prepared statement, so each entry must be a SINGLE statement.
_SCHEMA_PATCH_STATEMENTS: tuple[str, ...] = (
    # 2026-04-02 Claude Code: ensure the risklevel enum contains 'high'
    # (added in Phase 23); older DBs lack it -> InvalidTextRepresentation.
    """
    DO $$
    BEGIN
        IF NOT EXISTS (
            SELECT 1 FROM pg_enum
            WHERE enumtypid = 'risklevel'::regtype
            AND enumlabel = 'high'
        ) THEN
            ALTER TYPE risklevel ADD VALUE 'high';
        END IF;
    END
    $$;
    """,
    # 2026-04-09 Claude Sonnet 4.6: Sprint 5R C1 — persist the Telegram message
    # of the approval-execution loop.
    # NOTE(review): Telegram chat ids for supergroups/channels can exceed 32
    # bits, so INTEGER may be too narrow for telegram_chat_id. Confirm against
    # the ORM model; BIGINT is likely needed (left unchanged here).
    """
    ALTER TABLE approval_records
        ADD COLUMN IF NOT EXISTS telegram_message_id INTEGER,
        ADD COLUMN IF NOT EXISTS telegram_chat_id INTEGER;
    """,
    # 2026-04-15 ogt + Claude Sonnet 4.6: Phase 3.5 Playbook persistence in
    # PostgreSQL. ADR-085: AI learning results (trust_score, EWMA) must be
    # stored permanently, not in a cache. The playbooks table already exists
    # with legacy rows, so add the new columns.
    """
    ALTER TABLE playbooks
        ADD COLUMN IF NOT EXISTS trust_score FLOAT NOT NULL DEFAULT 0.3,
        ADD COLUMN IF NOT EXISTS requires_approval_level VARCHAR(20) NOT NULL DEFAULT 'auto',
        ADD COLUMN IF NOT EXISTS stateful_targets JSONB NOT NULL DEFAULT '[]',
        ADD COLUMN IF NOT EXISTS requires_pre_backup BOOLEAN NOT NULL DEFAULT FALSE,
        ADD COLUMN IF NOT EXISTS version INTEGER NOT NULL DEFAULT 1,
        ADD COLUMN IF NOT EXISTS parent_playbook_id VARCHAR(36),
        ADD COLUMN IF NOT EXISTS supersedes_playbook_id VARCHAR(36),
        ADD COLUMN IF NOT EXISTS version_reason TEXT;
    """,
    "CREATE INDEX IF NOT EXISTS ix_playbook_lineage "
    "ON playbooks(parent_playbook_id, version);",
    "CREATE INDEX IF NOT EXISTS ix_playbook_supersedes "
    "ON playbooks(supersedes_playbook_id) WHERE supersedes_playbook_id IS NOT NULL;",
    "UPDATE playbooks SET parent_playbook_id = playbook_id WHERE parent_playbook_id IS NULL;",
    # 2026-04-15 ogt + Claude Sonnet 4.6: Phase 4 sensing upgrade.
    # ADR-084: EvidenceSnapshot gains Phase 4 dynamic anomaly context.
    """
    ALTER TABLE incident_evidence
        ADD COLUMN IF NOT EXISTS anomaly_context JSONB;
    """,
    # W2 PR-V1 (2026-04-28 ogt + Claude Sonnet 4.6): SelfHealingValidator
    # columns on incident_evidence.
    """
    ALTER TABLE incident_evidence
        ADD COLUMN IF NOT EXISTS self_healing_score FLOAT,
        ADD COLUMN IF NOT EXISTS self_healing_detail JSONB;
    """,
    # 2026-04-29 ogt + Claude Opus 4.7: PR-K1 defensive ALTER. The ORM gained
    # timeline_events.incident_id in P1.6 (2026-04-24); tables created before
    # that lack the column.
    """
    ALTER TABLE timeline_events
        ADD COLUMN IF NOT EXISTS incident_id VARCHAR(64);
    """,
    # 2026-04-29 ogt + Claude Opus 4.7: M4 KMWriter reverse lookup + idempotency.
    # Root cause of CD #1115-1117: commit c22e5f33 added ORM columns without a
    # matching ALTER ("column related_approval_id ... does not exist").
    """
    ALTER TABLE knowledge_entries
        ADD COLUMN IF NOT EXISTS related_approval_id VARCHAR(64),
        ADD COLUMN IF NOT EXISTS path_type VARCHAR(50);
    """,
    # M3 idempotency unique index (incident_id + path_type).
    "CREATE UNIQUE INDEX IF NOT EXISTS uix_knowledge_incident_path "
    "ON knowledge_entries(related_incident_id, path_type) "
    "WHERE related_incident_id IS NOT NULL AND path_type IS NOT NULL;",
    # M4 reverse-lookup partial index (approval_id -> KM entry).
    "CREATE INDEX IF NOT EXISTS ix_knowledge_related_approval "
    "ON knowledge_entries(related_approval_id) "
    "WHERE related_approval_id IS NOT NULL;",
    # W2 PR-L1 2026-04-28 ogt + Claude Sonnet 4.6: KM -> Playbook feedback
    # loop; PlaybookRecord gains review_required.
    """
    ALTER TABLE playbooks
        ADD COLUMN IF NOT EXISTS review_required BOOLEAN NOT NULL DEFAULT FALSE;
    """,
    "CREATE INDEX IF NOT EXISTS ix_playbook_review_required "
    "ON playbooks(review_required) WHERE review_required = true;",
    "CREATE INDEX IF NOT EXISTS ix_timeline_incident_id "
    "ON timeline_events(incident_id);",
    # AwoooP Phase 2.6 (2026-05-04 ogt): budget_ledger table
    # (ADR-120 Token Budget Hard Kill).
    """
    CREATE TABLE IF NOT EXISTS budget_ledger (
        id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
        project_id VARCHAR(64) NOT NULL DEFAULT 'awoooi',
        agent_id VARCHAR(128),
        run_id UUID,
        model VARCHAR(64),
        provider VARCHAR(32),
        prompt_tokens INT,
        completion_tokens INT,
        cost_usd NUMERIC(10, 4) NOT NULL DEFAULT 0.0000,
        recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );
    """,
    "CREATE INDEX IF NOT EXISTS idx_budget_ledger_project_date "
    "ON budget_ledger(project_id, recorded_at DESC);",
    # AwoooP Phase 2.3 (2026-05-04 ogt): project_id on four tables for RLS
    # multi-tenant isolation. ADD COLUMN IF NOT EXISTS is a no-op when present.
    "ALTER TABLE incidents "
    "ADD COLUMN IF NOT EXISTS project_id VARCHAR(64) NOT NULL DEFAULT 'awoooi';",
    "CREATE INDEX IF NOT EXISTS idx_incidents_project_id "
    "ON incidents (project_id);",
    "ALTER TABLE knowledge_entries "
    "ADD COLUMN IF NOT EXISTS project_id VARCHAR(64) NOT NULL DEFAULT 'awoooi';",
    "CREATE INDEX IF NOT EXISTS idx_knowledge_entries_project_id "
    "ON knowledge_entries (project_id);",
    "ALTER TABLE playbooks "
    "ADD COLUMN IF NOT EXISTS project_id VARCHAR(64) NOT NULL DEFAULT 'awoooi';",
    "CREATE INDEX IF NOT EXISTS idx_playbooks_project_id "
    "ON playbooks (project_id);",
    "ALTER TABLE audit_logs "
    "ADD COLUMN IF NOT EXISTS project_id VARCHAR(64) NOT NULL DEFAULT 'awoooi';",
    "CREATE INDEX IF NOT EXISTS idx_audit_logs_project_id "
    "ON audit_logs (project_id);",
    # 2026-04-15 ogt + Claude Sonnet 4.6: Phase 6 self-governance loop.
    # ADR-087: ai_governance_events immutable event-sourcing table indexes.
    "CREATE INDEX IF NOT EXISTS ix_ai_governance_event_type "
    "ON ai_governance_events (event_type);",
    "CREATE INDEX IF NOT EXISTS ix_ai_governance_triggered_at "
    "ON ai_governance_events (triggered_at);",
    "CREATE INDEX IF NOT EXISTS ix_ai_governance_resolved "
    "ON ai_governance_events (resolved);",
)


async def _run_init_db_ddl(engine: AsyncEngine) -> None:
    """
    Run idempotent DB bootstrap DDL while caller holds the bootstrap advisory lock.

    Phase 1 creates every ORM table that is missing, one transaction per
    table. Phase 2 applies ``_SCHEMA_PATCH_STATEMENTS`` in order, in a single
    transaction.
    """
    await _create_missing_tables(engine)
    await _apply_schema_patches(engine)


async def _create_missing_tables(engine: AsyncEngine) -> None:
    """Create each ``Base.metadata`` table absent from the DB, one transaction per table."""
    # 2026-04-15 ogt: multi-replica startup race fix. With one big
    # transaction, a concurrent pod could make one CREATE INDEX fail. In
    # PostgreSQL any error rolls back the whole transaction, so every table
    # and index vanished and the next restart hit the same problem (endless
    # CrashLoop). Fix: one transaction per table, drop orphaned indexes left
    # by a previous partial attempt, and treat "already exists" as "another
    # pod won the race".
    async with engine.connect() as probe_conn:
        existing = set(
            await probe_conn.run_sync(lambda sync_conn: inspect(sync_conn).get_table_names())
        )

    # Dialect quoting escapes the identifier instead of interpolating it raw.
    quote = engine.dialect.identifier_preparer.quote

    for table in Base.metadata.sorted_tables:
        if table.name in existing:
            continue
        try:
            async with engine.begin() as conn:
                # Drop orphaned indexes left behind by an earlier partial run.
                for index in table.indexes:
                    if index.name:
                        await conn.execute(text(f"DROP INDEX IF EXISTS {quote(index.name)}"))
                await conn.run_sync(table.create)
        except Exception as exc:
            if "already exists" not in str(exc).lower():
                raise
            # A concurrent pod created it first; skip, but leave a trace.
            logger.info("init_db_table_already_exists", table=table.name)


async def _apply_schema_patches(engine: AsyncEngine) -> None:
    """Execute ``_SCHEMA_PATCH_STATEMENTS`` in order inside one transaction."""
    async with engine.begin() as conn:
        for statement in _SCHEMA_PATCH_STATEMENTS:
            await conn.execute(text(statement))
|
||
|
||
|
||
async def close_db() -> None:
    """Dispose of the engine's pooled connections at application shutdown.

    When an engine exists, it is disposed and both module singletons
    (engine and session factory) are reset to ``None``. This lets a later
    ``get_engine()`` build fresh ones. Without an engine this is a no-op.
    """
    global _engine, _session_factory
    if _engine is None:
        return

    await _engine.dispose()
    _engine = None
    _session_factory = None
|