67 lines
2.2 KiB
Python
67 lines
2.2 KiB
Python
#!/usr/bin/env python3
|
|
"""Opt-in integration smoke for the legacy SQLite → PostgreSQL sync script."""
|
|
|
|
import os
|
|
import sqlite3
|
|
|
|
import pytest
|
|
|
|
psycopg2 = pytest.importorskip("psycopg2")
|
|
|
|
pytestmark = pytest.mark.skipif(
|
|
os.environ.get("RUN_PG_SYNC_INTEGRATION") != "1",
|
|
reason="PG sync integration touches a local PostgreSQL database; set RUN_PG_SYNC_INTEGRATION=1 to run.",
|
|
)
|
|
|
|
|
|
def test_pg_sync():
|
|
if not os.environ.get("POSTGRES_PASSWORD"):
|
|
pytest.skip("POSTGRES_PASSWORD is required for PG sync integration")
|
|
|
|
sqlite_conn = sqlite3.connect("data/momo_database.db")
|
|
test_table = "test_full76"
|
|
try:
|
|
cursor = sqlite_conn.cursor()
|
|
cursor.execute("PRAGMA table_info(realtime_sales_monthly)")
|
|
col_info = cursor.fetchall()
|
|
columns = [c[1] for c in col_info]
|
|
assert len(columns) > 0
|
|
|
|
cursor.execute("SELECT * FROM realtime_sales_monthly LIMIT 10")
|
|
rows = cursor.fetchall()
|
|
|
|
pg_conn = psycopg2.connect(
|
|
host="localhost",
|
|
port="5432",
|
|
user="momo",
|
|
password=os.environ.get("POSTGRES_PASSWORD"),
|
|
database="momo_analytics",
|
|
)
|
|
try:
|
|
pg_cursor = pg_conn.cursor()
|
|
pg_cursor.execute(f'DROP TABLE IF EXISTS "{test_table}" CASCADE')
|
|
cols_def = ", ".join([f'"{c}" TEXT' for c in columns])
|
|
pg_cursor.execute(f'CREATE TABLE "{test_table}" (id SERIAL PRIMARY KEY, {cols_def})')
|
|
pg_conn.commit()
|
|
|
|
cols_sql = ", ".join([f'"{c}"' for c in columns])
|
|
placeholders = ", ".join(["%s"] * len(columns))
|
|
sql = f'INSERT INTO "{test_table}" ({cols_sql}) VALUES ({placeholders})'
|
|
|
|
success_count = 0
|
|
for row in rows:
|
|
try:
|
|
pg_cursor.execute(sql, tuple(row))
|
|
success_count += 1
|
|
except Exception:
|
|
pg_conn.rollback()
|
|
|
|
pg_conn.commit()
|
|
assert success_count >= 0
|
|
pg_cursor.execute(f'DROP TABLE IF EXISTS "{test_table}" CASCADE')
|
|
pg_conn.commit()
|
|
finally:
|
|
pg_conn.close()
|
|
finally:
|
|
sqlite_conn.close()
|