Files
ewoooc/tests/test_pg_sync.py
2026-05-13 11:51:45 +08:00

67 lines
2.2 KiB
Python

#!/usr/bin/env python3
"""Opt-in integration smoke for the legacy SQLite → PostgreSQL sync script."""
import os
import sqlite3
import pytest
# Skip this whole module at import time when psycopg2 is not installed.
psycopg2 = pytest.importorskip("psycopg2")
# Opt-in gate: the tests in this module are skipped unless the environment
# variable RUN_PG_SYNC_INTEGRATION is set to exactly "1".
pytestmark = pytest.mark.skipif(
os.environ.get("RUN_PG_SYNC_INTEGRATION") != "1",
reason="PG sync integration touches a local PostgreSQL database; set RUN_PG_SYNC_INTEGRATION=1 to run.",
)
def test_pg_sync():
    """Copy a sample of ``realtime_sales_monthly`` from SQLite into PostgreSQL.

    Reads the column names and up to 10 rows of ``realtime_sales_monthly``
    from ``data/momo_database.db``, recreates them in a scratch PostgreSQL
    table whose columns are all ``TEXT``, and checks that every sampled row
    was stored. The scratch table is dropped afterwards, even on failure.

    Skipped when ``POSTGRES_PASSWORD`` is unset or the SQLite file is missing.
    """
    if not os.environ.get("POSTGRES_PASSWORD"):
        pytest.skip("POSTGRES_PASSWORD is required for PG sync integration")

    sqlite_path = "data/momo_database.db"
    # sqlite3.connect() would silently create an empty database file if the
    # path did not exist, so bail out before touching the filesystem.
    if not os.path.exists(sqlite_path):
        pytest.skip(f"SQLite database not found: {sqlite_path}")

    test_table = "test_full76"

    # Read everything needed from SQLite up front so that connection can be
    # closed before any PostgreSQL work starts.
    sqlite_conn = sqlite3.connect(sqlite_path)
    try:
        cursor = sqlite_conn.cursor()
        cursor.execute("PRAGMA table_info(realtime_sales_monthly)")
        # Field 1 of each table_info row is the column name.
        columns = [info[1] for info in cursor.fetchall()]
        assert columns, "realtime_sales_monthly has no columns"
        cursor.execute("SELECT * FROM realtime_sales_monthly LIMIT 10")
        rows = cursor.fetchall()
    finally:
        sqlite_conn.close()

    drop_sql = f'DROP TABLE IF EXISTS "{test_table}" CASCADE'
    cols_def = ", ".join(f'"{c}" TEXT' for c in columns)
    cols_sql = ", ".join(f'"{c}"' for c in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    insert_sql = f'INSERT INTO "{test_table}" ({cols_sql}) VALUES ({placeholders})'

    pg_conn = psycopg2.connect(
        host="localhost",
        port="5432",
        user="momo",
        password=os.environ.get("POSTGRES_PASSWORD"),
        database="momo_analytics",
    )
    try:
        pg_cursor = pg_conn.cursor()
        try:
            pg_cursor.execute(drop_sql)
            pg_cursor.execute(
                f'CREATE TABLE "{test_table}" (id SERIAL PRIMARY KEY, {cols_def})'
            )
            pg_conn.commit()

            # Let insert errors propagate: swallowing them and rolling back
            # would also discard the earlier uncommitted rows and hide the
            # real cause of the failure.
            for row in rows:
                pg_cursor.execute(insert_sql, tuple(row))
            pg_conn.commit()

            # Verify against the database instead of a local counter.
            pg_cursor.execute(f'SELECT COUNT(*) FROM "{test_table}"')
            stored = pg_cursor.fetchone()[0]
            assert stored == len(rows), (
                f"expected {len(rows)} rows in {test_table}, found {stored}"
            )
        finally:
            # Clear any aborted transaction first so the cleanup DROP can run.
            pg_conn.rollback()
            pg_cursor.execute(drop_sql)
            pg_conn.commit()
    finally:
        pg_conn.close()