#!/usr/bin/env python3 """ Phase 18 AuditLog Migration Script =================================== 執行 Phase 18 新增字段的數據庫遷移 使用方式: cd apps/api && python scripts/run_migration.py """ import asyncio import os from sqlalchemy import text from sqlalchemy.ext.asyncio import create_async_engine # 2026-04-22 ogt: 移除硬碼 changeme,改為讀取環境變數(強制要求設定)。 # 執行前: export DATABASE_URL="postgresql+asyncpg://awoooi:@192.168.0.188:5432/awoooi_prod" DATABASE_URL = os.environ["DATABASE_URL"] MIGRATION_SQLS = [ # 1. authorization_channel """ DO $$ BEGIN IF NOT EXISTS ( SELECT 1 FROM information_schema.columns WHERE table_name = 'audit_logs' AND column_name = 'authorization_channel' ) THEN ALTER TABLE audit_logs ADD COLUMN authorization_channel VARCHAR(20); END IF; END $$; """, # 2. retry_count """ DO $$ BEGIN IF NOT EXISTS ( SELECT 1 FROM information_schema.columns WHERE table_name = 'audit_logs' AND column_name = 'retry_count' ) THEN ALTER TABLE audit_logs ADD COLUMN retry_count INTEGER DEFAULT 0 NOT NULL; END IF; END $$; """, # 3. failure_classification """ DO $$ BEGIN IF NOT EXISTS ( SELECT 1 FROM information_schema.columns WHERE table_name = 'audit_logs' AND column_name = 'failure_classification' ) THEN ALTER TABLE audit_logs ADD COLUMN failure_classification VARCHAR(50); END IF; END $$; """, # 4. source_approval_id """ DO $$ BEGIN IF NOT EXISTS ( SELECT 1 FROM information_schema.columns WHERE table_name = 'audit_logs' AND column_name = 'source_approval_id' ) THEN ALTER TABLE audit_logs ADD COLUMN source_approval_id VARCHAR(36); END IF; END $$; """, # 5. auto_repair_attempted """ DO $$ BEGIN IF NOT EXISTS ( SELECT 1 FROM information_schema.columns WHERE table_name = 'audit_logs' AND column_name = 'auto_repair_attempted' ) THEN ALTER TABLE audit_logs ADD COLUMN auto_repair_attempted BOOLEAN DEFAULT FALSE NOT NULL; END IF; END $$; """, # 6. auto_repair_result """ DO $$ BEGIN IF NOT EXISTS ( SELECT 1 FROM information_schema.columns WHERE table_name = 'audit_logs' AND column_name = 'auto_repair_result' ) THEN ALTER TABLE audit_logs ADD COLUMN auto_repair_result TEXT; END IF; END $$; """, # 創建索引 "CREATE INDEX IF NOT EXISTS ix_audit_authorization_channel ON audit_logs(authorization_channel);", "CREATE INDEX IF NOT EXISTS ix_audit_failure_classification ON audit_logs(failure_classification);", "CREATE INDEX IF NOT EXISTS ix_audit_source_approval_id ON audit_logs(source_approval_id);", ] async def run_migration(): """執行遷移""" print("=" * 60) print("Phase 18 AuditLog Migration") print("=" * 60) engine = create_async_engine(DATABASE_URL, echo=False) async with engine.begin() as conn: # 執行遷移 for i, sql in enumerate(MIGRATION_SQLS, 1): try: await conn.execute(text(sql)) print(f"✅ Step {i}/{len(MIGRATION_SQLS)} completed") except Exception as e: print(f"❌ Step {i} failed: {e}") # 驗證結果 print("\n" + "=" * 60) print("驗證欄位:") print("=" * 60) result = await conn.execute(text(""" SELECT column_name, data_type, is_nullable, column_default FROM information_schema.columns WHERE table_name = 'audit_logs' ORDER BY ordinal_position """)) for row in result: print(f" {row[0]}: {row[1]} (nullable={row[2]}, default={row[3]})") await engine.dispose() print("\n✅ Migration completed!") if __name__ == "__main__": asyncio.run(run_migration())