#!/usr/bin/env python3
"""Test the full 76-column sync from SQLite into PostgreSQL."""

import os
import sqlite3
from contextlib import closing

import pytest

psycopg2 = pytest.importorskip("psycopg2")

SQLITE_PATH = "data/momo_database.db"
SOURCE_TABLE = "realtime_sales_monthly"
SCRATCH_TABLE = "test_full76"
SAMPLE_ROWS = 10


def _quote_ident(name):
    """Return *name* as a double-quoted SQL identifier with embedded quotes doubled."""
    return '"' + name.replace('"', '""') + '"'


def _read_sample():
    """Read the column names and a small sample of rows from the SQLite source table.

    Returns:
        A ``(columns, rows)`` pair: the column names reported by
        ``PRAGMA table_info`` and up to ``SAMPLE_ROWS`` rows of the table.

    Raises:
        sqlite3.OperationalError: if the database file cannot be opened.
        AssertionError: if the source table has no columns (e.g. it is missing).
    """
    # Open read-only: a plain connect() would silently create an empty
    # database file when SQLITE_PATH does not exist.
    with closing(sqlite3.connect(f"file:{SQLITE_PATH}?mode=ro", uri=True)) as conn:
        cursor = conn.cursor()
        cursor.execute(f"PRAGMA table_info({SOURCE_TABLE})")
        # Index 1 of each table_info row is the column name.
        columns = [info[1] for info in cursor.fetchall()]
        assert len(columns) > 0, f"{SOURCE_TABLE} has no columns in {SQLITE_PATH}"
        cursor.execute(f"SELECT * FROM {SOURCE_TABLE} LIMIT {SAMPLE_ROWS}")
        return columns, cursor.fetchall()


def test_pg_sync():
    """Copy a sample of SQLite rows into a scratch PostgreSQL table and verify them.

    Every column of the source table is mirrored as a TEXT column in the
    scratch table.  The test fails if any sampled row cannot be inserted or
    if the number of rows stored in PostgreSQL differs from the number read
    from SQLite.  The scratch table is dropped again even when the test fails.
    """
    columns, rows = _read_sample()

    pg_conn = psycopg2.connect(
        host="localhost",
        port="5432",
        user="momo",
        password=os.environ.get("POSTGRES_PASSWORD"),
        database="momo_analytics",
    )
    try:
        pg_cursor = pg_conn.cursor()
        pg_cursor.execute(f"DROP TABLE IF EXISTS {SCRATCH_TABLE} CASCADE")
        cols_def = ", ".join(f"{_quote_ident(c)} TEXT" for c in columns)
        pg_cursor.execute(
            f"CREATE TABLE {SCRATCH_TABLE} (id SERIAL PRIMARY KEY, {cols_def})"
        )
        pg_conn.commit()

        try:
            cols_sql = ", ".join(_quote_ident(c) for c in columns)
            placeholders = ", ".join(["%s"] * len(columns))
            insert_sql = (
                f"INSERT INTO {SCRATCH_TABLE} ({cols_sql}) VALUES ({placeholders})"
            )

            failures = []
            for index, row in enumerate(rows):
                try:
                    pg_cursor.execute(insert_sql, tuple(row))
                except psycopg2.Error as exc:
                    # Leave the aborted transaction; earlier rows are already
                    # committed, so only the failing row is discarded.
                    pg_conn.rollback()
                    failures.append((index, str(exc).strip()))
                else:
                    # Commit per row so a later rollback cannot undo this insert.
                    pg_conn.commit()

            assert not failures, (
                f"{len(failures)} of {len(rows)} rows failed to insert: {failures}"
            )

            pg_cursor.execute(f"SELECT COUNT(*) FROM {SCRATCH_TABLE}")
            stored = pg_cursor.fetchone()[0]
            assert stored == len(rows), (
                f"expected {len(rows)} rows in {SCRATCH_TABLE}, found {stored}"
            )
        finally:
            # Clear any aborted transaction first, then always remove the
            # scratch table so a failing run leaves nothing behind.
            pg_conn.rollback()
            pg_cursor.execute(f"DROP TABLE IF EXISTS {SCRATCH_TABLE} CASCADE")
            pg_conn.commit()
    finally:
        pg_conn.close()