Files
ewoooc/services/market_intel/schema_db_probe.py

173 lines
5.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
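"""Read-only probe for the official market-intelligence DB schema.

This module only queries the system catalog; it does not use DatabaseManager,
does not call create_all, and performs no writes.
"""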
from sqlalchemy import bindparam, create_engine, text
def _build_table_status(expected_tables, existing_tables):
existing_set = set(existing_tables)
return [
{
"table": table_name,
"exists": table_name in existing_set,
}
for table_name in expected_tables
]
def _probe_postgresql(conn, expected_tables):
    """Return the expected tables that PostgreSQL reports as existing.

    Issues one ``SELECT EXISTS`` against ``information_schema.tables`` per
    expected name, limited to the schemas returned by
    ``current_schemas(false)``. The result keeps the order of
    ``expected_tables``.
    """
    # The statement is identical for every table; only the bound name changes.
    exists_query = text(
        """
        SELECT EXISTS (
            SELECT 1
            FROM information_schema.tables
            WHERE table_schema = ANY (current_schemas(false))
              AND table_name = :table_name
        )
        """
    )

    def _is_present(name):
        return conn.execute(exists_query, {"table_name": name}).scalar()

    return [name for name in expected_tables if _is_present(name)]
def _probe_sqlite(conn, expected_tables):
    """Return the expected tables that are listed in ``sqlite_master``.

    Runs a single query with an expanding ``IN`` parameter, so all expected
    names are checked by one statement. Names come back in whatever order
    the database returns them (the query has no ``ORDER BY``), which need
    not match ``expected_tables``.
    """
    lookup = text(
        """
        SELECT name
        FROM sqlite_master
        WHERE type = 'table'
          AND name IN :table_names
        """
    ).bindparams(bindparam("table_names", expanding=True))
    result = conn.execute(lookup, {"table_names": tuple(expected_tables)})
    found = []
    for row in result.fetchall():
        found.append(row[0])
    return found
def build_schema_db_probe_plan(
    expected_tables,
    *,
    execute_requested=False,
    database_url=None,
    database_type=None,
    engine=None,
):
    """Build the DB schema probe report; by default only plan, never connect.

    Args:
        expected_tables: Iterable of table names the schema should contain.
            It is copied into a list, so a one-shot iterator is fine.
        execute_requested: When false (the default) a "planned" report is
            returned without importing ``config`` or touching any database.
            When true, the catalog is queried read-only.
        database_url: Overrides ``config.DATABASE_PATH`` when an engine has
            to be created.
        database_type: Overrides ``config.DATABASE_TYPE``. ``"postgresql"``
            (case-insensitive) selects the PostgreSQL probe; every other
            value falls through to the SQLite probe.
        engine: Optional ready-made engine. A caller-supplied engine is used
            as-is and is never disposed here; an engine created by this
            function is always disposed before returning.

    Returns:
        A dict whose ``"mode"`` is one of ``"schema_db_probe_planned"``,
        ``"schema_db_probe_read_only"`` or ``"schema_db_probe_error"``. All
        three share the same keys; the error report additionally carries
        ``"error_message"``. ``"existing_tables"`` and ``"missing_tables"``
        follow the order of ``expected_tables`` and are independent lists.

    Raises:
        ImportError: If ``config`` cannot be imported while
            ``execute_requested`` is true. Any other failure during the
            probe is reported through the error-mode dict instead of raised.
    """
    expected_tables = list(expected_tables)
    if not execute_requested:
        return _assemble_schema_probe_report(
            mode="schema_db_probe_planned",
            execute_requested=False,
            read_only_query_executed=False,
            database_connection_opened=False,
            schema_tables_exist=False,
            expected_tables=expected_tables,
            existing_tables=[],
            # Copy so the report's lists never alias each other.
            missing_tables=list(expected_tables),
            blocked_reasons=[
                "execute_false_planned_only",
                "migration_not_executed_by_this_probe",
                "seed_write_still_blocked",
            ],
        )

    # Imported lazily so the planned-only path has no dependency on config.
    from config import DATABASE_PATH, DATABASE_TYPE

    effective_database_type = (database_type or DATABASE_TYPE or "").lower()
    effective_database_url = database_url or DATABASE_PATH
    created_engine = False
    connection_opened = False
    try:
        if engine is None:
            connect_args = {}
            if effective_database_type == "postgresql":
                # Bound both connecting and each statement so a probe against
                # an unreachable or busy server fails fast instead of hanging.
                connect_args = {
                    "connect_timeout": 8,
                    "options": "-c statement_timeout=15000",
                }
            # AUTOCOMMIT keeps the probe from opening a transaction block.
            engine = create_engine(
                effective_database_url,
                isolation_level="AUTOCOMMIT",
                pool_pre_ping=True,
                connect_args=connect_args,
            )
            created_engine = True
        with engine.connect() as conn:
            connection_opened = True
            if effective_database_type == "postgresql":
                probed_tables = _probe_postgresql(conn, expected_tables)
            else:
                probed_tables = _probe_sqlite(conn, expected_tables)

        # Build the lookup set once, and report both lists in expected order
        # so the output does not depend on which backend answered.
        found = set(probed_tables)
        existing_tables = [name for name in expected_tables if name in found]
        missing_tables = [name for name in expected_tables if name not in found]
        schema_tables_exist = not missing_tables

        blocked_reasons = [
            "migration_not_executed_by_this_probe",
            "seed_write_still_blocked",
        ]
        if not schema_tables_exist:
            blocked_reasons.insert(0, "market_tables_missing")

        return _assemble_schema_probe_report(
            mode="schema_db_probe_read_only",
            execute_requested=True,
            read_only_query_executed=True,
            database_connection_opened=connection_opened,
            schema_tables_exist=schema_tables_exist,
            expected_tables=expected_tables,
            existing_tables=existing_tables,
            missing_tables=missing_tables,
            blocked_reasons=blocked_reasons,
        )
    except Exception as exc:
        # Deliberate catch-all boundary: the probe reports failures as data
        # so callers always receive a report dict in execute mode.
        return _assemble_schema_probe_report(
            mode="schema_db_probe_error",
            execute_requested=True,
            read_only_query_executed=False,
            database_connection_opened=connection_opened,
            schema_tables_exist=False,
            expected_tables=expected_tables,
            existing_tables=[],
            missing_tables=list(expected_tables),
            blocked_reasons=[
                "schema_db_probe_error",
                "seed_write_still_blocked",
            ],
            error_message=str(exc),
        )
    finally:
        # Only tear down what this function built; a caller-supplied engine
        # stays under the caller's control.
        if created_engine:
            engine.dispose()


def _assemble_schema_probe_report(
    *,
    mode,
    execute_requested,
    read_only_query_executed,
    database_connection_opened,
    schema_tables_exist,
    expected_tables,
    existing_tables,
    missing_tables,
    blocked_reasons,
    error_message=None,
):
    """Assemble the report dict shared by the planned, read-only and error outcomes.

    The session/transaction/write/commit/migration flags are constant
    ``False`` because the probe never does any of those. ``"error_message"``
    is added only when ``error_message`` is not ``None``.
    """
    report = {
        "mode": mode,
        "execute_requested": execute_requested,
        "read_only_query_executed": read_only_query_executed,
        "database_connection_opened": database_connection_opened,
        "database_session_created": False,
        "explicit_transaction_opened": False,
        "database_write_executed": False,
        "database_commit_executed": False,
        "migration_executed": False,
        "schema_tables_exist": schema_tables_exist,
        "expected_tables": expected_tables,
        "existing_tables": existing_tables,
        "missing_tables": missing_tables,
        "table_statuses": _build_table_status(expected_tables, existing_tables),
        "blocked_reasons": blocked_reasons,
    }
    if error_message is not None:
        report["error_message"] = error_message
    return report