"""市場情報正式 DB schema 只讀探針。 本模組只查詢系統 catalog,不使用 DatabaseManager、不呼叫 create_all、不寫入。 """ from sqlalchemy import bindparam, create_engine, text def _build_table_status(expected_tables, existing_tables): existing_set = set(existing_tables) return [ { "table": table_name, "exists": table_name in existing_set, } for table_name in expected_tables ] def _probe_postgresql(conn, expected_tables): existing_tables = [] for table_name in expected_tables: exists = conn.execute( text( """ SELECT EXISTS ( SELECT 1 FROM information_schema.tables WHERE table_schema = ANY (current_schemas(false)) AND table_name = :table_name ) """ ), {"table_name": table_name}, ).scalar() if exists: existing_tables.append(table_name) return existing_tables def _probe_sqlite(conn, expected_tables): rows = conn.execute( text( """ SELECT name FROM sqlite_master WHERE type = 'table' AND name IN :table_names """ ).bindparams(bindparam("table_names", expanding=True)), {"table_names": tuple(expected_tables)}, ).fetchall() return [row[0] for row in rows] def build_schema_db_probe_plan( expected_tables, *, execute_requested=False, database_url=None, database_type=None, engine=None, ): """建立 DB schema 探針結果;預設只回 planned,不連 DB。""" expected_tables = list(expected_tables) if not execute_requested: return { "mode": "schema_db_probe_planned", "execute_requested": False, "read_only_query_executed": False, "database_connection_opened": False, "database_session_created": False, "explicit_transaction_opened": False, "database_write_executed": False, "database_commit_executed": False, "migration_executed": False, "schema_tables_exist": False, "expected_tables": expected_tables, "existing_tables": [], "missing_tables": expected_tables, "table_statuses": _build_table_status(expected_tables, []), "blocked_reasons": [ "execute_false_planned_only", "migration_not_executed_by_this_probe", "seed_write_still_blocked", ], } from config import DATABASE_PATH, DATABASE_TYPE effective_database_type = (database_type or DATABASE_TYPE or "").lower() effective_database_url = database_url or DATABASE_PATH created_engine = False connection_opened = False try: if engine is None: connect_args = {} if effective_database_type == "postgresql": connect_args = { "connect_timeout": 8, "options": "-c statement_timeout=15000", } engine = create_engine( effective_database_url, isolation_level="AUTOCOMMIT", pool_pre_ping=True, connect_args=connect_args, ) created_engine = True with engine.connect() as conn: connection_opened = True if effective_database_type == "postgresql": existing_tables = _probe_postgresql(conn, expected_tables) else: existing_tables = _probe_sqlite(conn, expected_tables) missing_tables = [ table_name for table_name in expected_tables if table_name not in set(existing_tables) ] schema_tables_exist = not missing_tables blocked_reasons = [ "migration_not_executed_by_this_probe", "seed_write_still_blocked", ] if not schema_tables_exist: blocked_reasons.insert(0, "market_tables_missing") return { "mode": "schema_db_probe_read_only", "execute_requested": True, "read_only_query_executed": True, "database_connection_opened": connection_opened, "database_session_created": False, "explicit_transaction_opened": False, "database_write_executed": False, "database_commit_executed": False, "migration_executed": False, "schema_tables_exist": schema_tables_exist, "expected_tables": expected_tables, "existing_tables": existing_tables, "missing_tables": missing_tables, "table_statuses": _build_table_status(expected_tables, existing_tables), "blocked_reasons": blocked_reasons, } except Exception as exc: return { "mode": "schema_db_probe_error", "execute_requested": True, "read_only_query_executed": False, "database_connection_opened": connection_opened, "database_session_created": False, "explicit_transaction_opened": False, "database_write_executed": False, "database_commit_executed": False, "migration_executed": False, "schema_tables_exist": False, "expected_tables": expected_tables, "existing_tables": [], "missing_tables": expected_tables, "table_statuses": _build_table_status(expected_tables, []), "blocked_reasons": [ "schema_db_probe_error", "seed_write_still_blocked", ], "error_message": str(exc), } finally: if created_engine: engine.dispose()