Files
awoooi/apps/api/scripts/run_migration.py
OG T d89f0520f9 fix(api): 修復 34 個 Ruff lint 錯誤
- 自動修復 import 排序、unused imports
- 手動修復 raise from、isinstance union、unused variable
- scripts/ 暫時保留 (非 CI 阻擋)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-29 15:27:49 +08:00

137 lines
3.9 KiB
Python

#!/usr/bin/env python3
"""
Phase 18 AuditLog Migration Script
===================================
執行 Phase 18 新增字段的數據庫遷移
使用方式:
cd apps/api && python scripts/run_migration.py
"""
import asyncio
import os

from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
# Database connection URL.
# Set the DATABASE_URL environment variable to point the script at a database;
# an unset or empty variable falls back to the historical hard-coded target.
# NOTE(review): the fallback keeps credentials in source control — rotate them
# and remove the literal once every caller supplies DATABASE_URL.
DATABASE_URL = (
    os.environ.get("DATABASE_URL")
    or "postgresql+asyncpg://awoooi:changeme@192.168.0.188:5432/awoooi_prod"
)
# Ordered, idempotent statements for the Phase 18 audit_logs migration.
#
# Columns are added with ADD COLUMN IF NOT EXISTS rather than a DO-block that
# looks the column up in information_schema.columns by table_name alone: that
# lookup is not restricted to a schema, so a same-named table in another schema
# could make the guard skip the ALTER. IF NOT EXISTS checks the very table that
# is being altered. Each entry is one step; the order and count are unchanged.
MIGRATION_SQLS = [
    # 1. authorization_channel
    "ALTER TABLE audit_logs ADD COLUMN IF NOT EXISTS authorization_channel VARCHAR(20);",
    # 2. retry_count
    "ALTER TABLE audit_logs ADD COLUMN IF NOT EXISTS retry_count INTEGER DEFAULT 0 NOT NULL;",
    # 3. failure_classification
    "ALTER TABLE audit_logs ADD COLUMN IF NOT EXISTS failure_classification VARCHAR(50);",
    # 4. source_approval_id
    "ALTER TABLE audit_logs ADD COLUMN IF NOT EXISTS source_approval_id VARCHAR(36);",
    # 5. auto_repair_attempted
    "ALTER TABLE audit_logs ADD COLUMN IF NOT EXISTS auto_repair_attempted BOOLEAN DEFAULT FALSE NOT NULL;",
    # 6. auto_repair_result
    "ALTER TABLE audit_logs ADD COLUMN IF NOT EXISTS auto_repair_result TEXT;",
    # Indexes on the new lookup columns
    "CREATE INDEX IF NOT EXISTS ix_audit_authorization_channel ON audit_logs(authorization_channel);",
    "CREATE INDEX IF NOT EXISTS ix_audit_failure_classification ON audit_logs(failure_classification);",
    "CREATE INDEX IF NOT EXISTS ix_audit_source_approval_id ON audit_logs(source_approval_id);",
]
async def run_migration() -> None:
    """Run the Phase 18 ``audit_logs`` migration and print the resulting columns.

    Every statement in ``MIGRATION_SQLS`` is executed in order inside one
    transaction opened with ``engine.begin()``. Afterwards the columns of
    ``audit_logs`` are read back from ``information_schema.columns`` and
    printed so the operator can verify the result.

    Raises:
        Exception: The error raised by the first failing statement. The
            failing step is reported on stdout and the error is re-raised, so
            the ``engine.begin()`` block rolls the whole migration back.
    """
    print("=" * 60)
    print("Phase 18 AuditLog Migration")
    print("=" * 60)

    engine = create_async_engine(DATABASE_URL, echo=False)
    total_steps = len(MIGRATION_SQLS)
    try:
        async with engine.begin() as conn:
            # Apply the migration steps.
            for i, sql in enumerate(MIGRATION_SQLS, 1):
                try:
                    await conn.execute(text(sql))
                except Exception as e:
                    # Report which step broke, then propagate. Swallowing the
                    # error here cannot work on PostgreSQL: a failed statement
                    # aborts the enclosing transaction, so every later step
                    # (and the verification query) would fail as well.
                    print(f"❌ Step {i} failed: {e}")
                    raise
                print(f"✅ Step {i}/{total_steps} completed")

            # Verify the result by listing the table's columns.
            print("\n" + "=" * 60)
            print("驗證欄位:")
            print("=" * 60)
            result = await conn.execute(text("""
                SELECT column_name, data_type, is_nullable, column_default
                FROM information_schema.columns
                WHERE table_name = 'audit_logs'
                ORDER BY ordinal_position
            """))
            for row in result:
                print(f" {row[0]}: {row[1]} (nullable={row[2]}, default={row[3]})")
    finally:
        # Release pooled connections whether the migration succeeded or failed.
        await engine.dispose()

    # Only reached when every step and the verification query succeeded.
    print("\n✅ Migration completed!")
if __name__ == "__main__":
    # Script entry point: build the migration coroutine and run it to completion.
    migration = run_migration()
    asyncio.run(migration)