#!/usr/bin/env python3 """Opt-in integration smoke for the legacy SQLite → PostgreSQL sync script.""" import os import sqlite3 import pytest psycopg2 = pytest.importorskip("psycopg2") pytestmark = pytest.mark.skipif( os.environ.get("RUN_PG_SYNC_INTEGRATION") != "1", reason="PG sync integration touches a local PostgreSQL database; set RUN_PG_SYNC_INTEGRATION=1 to run.", ) def test_pg_sync(): if not os.environ.get("POSTGRES_PASSWORD"): pytest.skip("POSTGRES_PASSWORD is required for PG sync integration") sqlite_conn = sqlite3.connect("data/momo_database.db") test_table = "test_full76" try: cursor = sqlite_conn.cursor() cursor.execute("PRAGMA table_info(realtime_sales_monthly)") col_info = cursor.fetchall() columns = [c[1] for c in col_info] assert len(columns) > 0 cursor.execute("SELECT * FROM realtime_sales_monthly LIMIT 10") rows = cursor.fetchall() pg_conn = psycopg2.connect( host="localhost", port="5432", user="momo", password=os.environ.get("POSTGRES_PASSWORD"), database="momo_analytics", ) try: pg_cursor = pg_conn.cursor() pg_cursor.execute(f'DROP TABLE IF EXISTS "{test_table}" CASCADE') cols_def = ", ".join([f'"{c}" TEXT' for c in columns]) pg_cursor.execute(f'CREATE TABLE "{test_table}" (id SERIAL PRIMARY KEY, {cols_def})') pg_conn.commit() cols_sql = ", ".join([f'"{c}"' for c in columns]) placeholders = ", ".join(["%s"] * len(columns)) sql = f'INSERT INTO "{test_table}" ({cols_sql}) VALUES ({placeholders})' success_count = 0 for row in rows: try: pg_cursor.execute(sql, tuple(row)) success_count += 1 except Exception: pg_conn.rollback() pg_conn.commit() assert success_count >= 0 pg_cursor.execute(f'DROP TABLE IF EXISTS "{test_table}" CASCADE') pg_conn.commit() finally: pg_conn.close() finally: sqlite_conn.close()