58 lines
1.8 KiB
Python
58 lines
1.8 KiB
Python
#!/usr/bin/env python3
|
|
"""測試完整 76 欄位同步"""
|
|
|
|
import os
|
|
import sqlite3
|
|
|
|
import pytest
|
|
|
|
# Import the PostgreSQL driver through pytest so that the tests in this module
# are skipped (rather than erroring at import time) when it is not installed.
psycopg2 = pytest.importorskip("psycopg2")
|
|
|
|
|
|
def test_pg_sync():
    """Copy a sample of ``realtime_sales_monthly`` from SQLite into PostgreSQL.

    Reads the column names and up to 10 rows of ``realtime_sales_monthly``
    from ``data/momo_database.db``, creates a scratch table ``test_full76``
    in the ``momo_analytics`` PostgreSQL database with one TEXT column per
    source column, and inserts the sampled rows into it.

    Inserts are best-effort: a row that fails is skipped. Each row is wrapped
    in a SAVEPOINT so a failure undoes only that row instead of every row
    inserted before it. The test then asserts that the number of rows stored
    in ``test_full76`` equals the number of inserts counted as successful.

    The PostgreSQL password is read from the ``POSTGRES_PASSWORD`` environment
    variable. The scratch table is dropped afterwards, also when the test
    fails part-way through.
    """
    sqlite_conn = sqlite3.connect("data/momo_database.db")
    try:
        cursor = sqlite_conn.cursor()
        cursor.execute("PRAGMA table_info(realtime_sales_monthly)")
        # Field 1 of each table_info row is the column name.
        columns = [info[1] for info in cursor.fetchall()]
        assert len(columns) > 0

        cursor.execute("SELECT * FROM realtime_sales_monthly LIMIT 10")
        rows = cursor.fetchall()

        pg_conn = psycopg2.connect(
            host="localhost",
            port="5432",
            user="momo",
            password=os.environ.get("POSTGRES_PASSWORD"),
            database="momo_analytics",
        )
        try:
            pg_cursor = pg_conn.cursor()

            # Double any embedded quote so each name stays one quoted identifier.
            quoted = ['"' + name.replace('"', '""') + '"' for name in columns]
            cols_def = ", ".join(f"{name} TEXT" for name in quoted)
            cols_sql = ", ".join(quoted)
            placeholders = ", ".join(["%s"] * len(columns))
            sql = f"INSERT INTO test_full76 ({cols_sql}) VALUES ({placeholders})"

            pg_cursor.execute("DROP TABLE IF EXISTS test_full76 CASCADE")
            pg_cursor.execute(
                f"CREATE TABLE test_full76 (id SERIAL PRIMARY KEY, {cols_def})"
            )
            pg_conn.commit()

            try:
                success_count = 0
                for row in rows:
                    # A savepoint confines a failed insert to this row; a plain
                    # rollback would also discard the earlier successful rows.
                    pg_cursor.execute("SAVEPOINT row_insert")
                    try:
                        pg_cursor.execute(sql, tuple(row))
                    except Exception:
                        # Best-effort: skip the row but keep the transaction usable.
                        pg_cursor.execute("ROLLBACK TO SAVEPOINT row_insert")
                    else:
                        pg_cursor.execute("RELEASE SAVEPOINT row_insert")
                        success_count += 1
                pg_conn.commit()

                # Check that what was counted as inserted is what is stored.
                pg_cursor.execute("SELECT COUNT(*) FROM test_full76")
                stored_count = pg_cursor.fetchone()[0]
                assert stored_count == success_count
            finally:
                # Leave any open or aborted transaction before dropping the table.
                pg_conn.rollback()
                pg_cursor.execute("DROP TABLE test_full76 CASCADE")
                pg_conn.commit()
        finally:
            pg_conn.close()
    finally:
        sqlite_conn.close()
|