Files
ewoooc/services/market_intel/live_db_inventory.py
OoO e880c91028
All checks were successful
CD Pipeline / deploy (push) Successful in 1m3s
新增市場情報 DB 庫存只讀總覽
2026-05-18 20:37:04 +08:00

373 lines
12 KiB
Python

"""市場情報正式 DB 只讀庫存總覽。
本模組只做 read-only count / group by 查詢,不使用 DatabaseManager、
不建立 ORM session、不寫入、不 commit。
"""
from sqlalchemy import bindparam, create_engine, text
def _table_status(expected_tables, existing_tables, table_counts):
    """Build one status record per expected table for an executed inventory run.

    Args:
        expected_tables: Table names to report on; output follows this order.
        existing_tables: Table names that were found in the database.
        table_counts: Mapping of table name -> row count. It is only consulted
            for tables that appear in ``existing_tables``.

    Returns:
        A list of dicts with keys ``table``, ``exists``, ``row_count`` and
        ``status``. ``status`` is ``"loaded"`` for present tables and
        ``"missing"`` otherwise; ``row_count`` is ``None`` for missing tables.
    """
    present = set(existing_tables)
    statuses = []
    for name in expected_tables:
        found = name in present
        statuses.append(
            {
                "table": name,
                "exists": found,
                "row_count": table_counts.get(name) if found else None,
                "status": "loaded" if found else "missing",
            }
        )
    return statuses
def _planned_table_status(expected_tables):
    """Return placeholder status records for when no database query was run.

    Every expected table is reported, in input order, with ``exists=False``,
    ``row_count=None`` and ``status="planned"``.
    """

    def _placeholder(name):
        return {"table": name, "exists": False, "row_count": None, "status": "planned"}

    return list(map(_placeholder, expected_tables))
def _json_safe(value):
    """Convert one database value into a JSON-serialisable equivalent.

    - Objects exposing ``isoformat`` (dates, datetimes, times) become ISO-8601
      strings.
    - ``decimal.Decimal`` becomes ``float``. Database drivers can return
      ``Decimal`` for PostgreSQL ``numeric`` aggregates (e.g. ``AVG(...)``),
      which the standard ``json`` encoder rejects. SQLite already yields
      ``float`` for the same queries, so this also keeps both backends'
      payloads consistent.
    - Any other value is returned unchanged.
    """
    from decimal import Decimal

    if hasattr(value, "isoformat"):
        return value.isoformat()
    if isinstance(value, Decimal):
        return float(value)
    return value
def _row_dict(row):
    """Convert a SQLAlchemy result row into a plain dict of JSON-safe values.

    Keys are the row's column labels (from ``row._mapping``); each value is
    passed through ``_json_safe``.
    """
    converted = {}
    for column, raw in dict(row._mapping).items():
        converted[column] = _json_safe(raw)
    return converted
def _probe_existing_postgresql(conn, expected_tables):
    """Return the expected tables that exist in the connection's current schemas.

    Runs one ``information_schema.tables`` EXISTS query per table name,
    restricted to the schemas returned by ``current_schemas(false)``. The
    result preserves the order of ``expected_tables``.
    """

    def _is_visible(table_name):
        probe = text(
            """
            SELECT EXISTS (
                SELECT 1
                FROM information_schema.tables
                WHERE table_schema = ANY (current_schemas(false))
                  AND table_name = :table_name
            )
            """
        )
        return conn.execute(probe, {"table_name": table_name}).scalar()

    return [name for name in expected_tables if _is_visible(name)]
def _probe_existing_sqlite(conn, expected_tables):
    """Return the expected tables that exist in a SQLite database.

    Looks the names up in ``sqlite_master`` (``type = 'table'``) with a single
    expanding ``IN`` query. The result follows the order of
    ``expected_tables``, matching ``_probe_existing_postgresql``, rather than
    ``sqlite_master`` row order. An empty input returns ``[]`` without
    issuing an empty ``IN ()`` query.
    """
    wanted = list(expected_tables)
    if not wanted:
        return []
    rows = conn.execute(
        text(
            """
            SELECT name
            FROM sqlite_master
            WHERE type = 'table'
              AND name IN :table_names
            """
        ).bindparams(bindparam("table_names", expanding=True)),
        {"table_names": tuple(wanted)},
    ).fetchall()
    found = {row[0] for row in rows}
    return [name for name in wanted if name in found]
def _count_rows(conn, table_name):
    """Return ``COUNT(*)`` for ``table_name``, or 0 if the query yields None/0.

    Identifiers cannot be bound as query parameters, so the name is embedded
    as a double-quoted identifier. Any embedded double quote is doubled
    (standard SQL identifier escaping, accepted by PostgreSQL and SQLite) so
    an unusual table name cannot terminate the quoted identifier early.
    """
    quoted_name = '"' + str(table_name).replace('"', '""') + '"'
    return conn.execute(text(f"SELECT COUNT(*) FROM {quoted_name}")).scalar() or 0
def _query_rows(conn, sql):
    """Execute a static SQL string and return every row as a JSON-safe dict."""
    result = conn.execute(text(sql))
    return list(map(_row_dict, result.fetchall()))
def _query_platform_breakdown(conn, existing_tables):
    """List ``market_platforms`` rows (code, name, enabled) ordered by code.

    Returns ``[]`` without touching the database when ``market_platforms`` is
    not in ``existing_tables``.
    """
    if "market_platforms" in existing_tables:
        platform_sql = """
            SELECT code, name, enabled
            FROM market_platforms
            ORDER BY code
        """
        return _query_rows(conn, platform_sql)
    return []
def _query_campaign_status_breakdown(conn, existing_tables):
    """Count ``market_campaigns`` rows per (platform_code, status).

    Returns ``[]`` without touching the database when ``market_campaigns`` is
    not in ``existing_tables``. Otherwise, returns one dict per group with keys
    ``platform_code``, ``status`` and ``campaign_count``, ordered by
    platform_code then status.
    """
    if "market_campaigns" in existing_tables:
        campaign_sql = """
            SELECT platform_code, status, COUNT(*) AS campaign_count
            FROM market_campaigns
            GROUP BY platform_code, status
            ORDER BY platform_code, status
        """
        return _query_rows(conn, campaign_sql)
    return []
def _query_product_activity_summary(conn, existing_tables):
    """Summarise ``market_campaign_products`` per (platform_code, is_active).

    Each group reports ``product_count``, the latest ``last_seen_at`` (as
    ``latest_seen_at``) and the mean ``discount_rate`` (as
    ``avg_discount_rate``). Groups are ordered by platform_code, then
    is_active descending. Returns ``[]`` without querying when the table is
    not in ``existing_tables``.
    """
    if "market_campaign_products" in existing_tables:
        activity_sql = """
            SELECT
                platform_code,
                is_active,
                COUNT(*) AS product_count,
                MAX(last_seen_at) AS latest_seen_at,
                AVG(discount_rate) AS avg_discount_rate
            FROM market_campaign_products
            GROUP BY platform_code, is_active
            ORDER BY platform_code, is_active DESC
        """
        return _query_rows(conn, activity_sql)
    return []
def _query_match_status_breakdown(conn, existing_tables):
    """Summarise ``market_product_matches`` per match_status.

    Each group reports ``match_count``, the mean ``match_score`` (as
    ``avg_match_score``) and the latest ``reviewed_at`` (as
    ``latest_reviewed_at``), ordered by match_status. Returns ``[]`` without
    querying when the table is not in ``existing_tables``.
    """
    if "market_product_matches" in existing_tables:
        match_sql = """
            SELECT
                match_status,
                COUNT(*) AS match_count,
                AVG(match_score) AS avg_match_score,
                MAX(reviewed_at) AS latest_reviewed_at
            FROM market_product_matches
            GROUP BY match_status
            ORDER BY match_status
        """
        return _query_rows(conn, match_sql)
    return []
def _query_alert_review_state_breakdown(conn, existing_tables):
    """Summarise ``market_alert_review_queue`` per (review_state, priority_lane).

    Each group reports ``alert_count``, the mean ``total_score`` (as
    ``avg_total_score``) and the latest ``updated_at`` (as
    ``latest_updated_at``), ordered by review_state then priority_lane.
    Returns ``[]`` without querying when the table is not in
    ``existing_tables``.
    """
    if "market_alert_review_queue" in existing_tables:
        alert_sql = """
            SELECT
                review_state,
                priority_lane,
                COUNT(*) AS alert_count,
                AVG(total_score) AS avg_total_score,
                MAX(updated_at) AS latest_updated_at
            FROM market_alert_review_queue
            GROUP BY review_state, priority_lane
            ORDER BY review_state, priority_lane
        """
        return _query_rows(conn, alert_sql)
    return []
def _query_crawler_run_summary(conn, existing_tables):
    """Summarise ``market_crawler_runs`` per (platform_code, status, dry_run).

    Each group reports ``run_count``, the latest ``started_at`` and
    ``finished_at`` (as ``latest_started_at`` / ``latest_finished_at``), and
    the summed ``error_count`` (as ``total_errors``). Groups are ordered by
    platform_code, status, then dry_run descending. Returns ``[]`` without
    querying when the table is not in ``existing_tables``.
    """
    if "market_crawler_runs" in existing_tables:
        crawler_sql = """
            SELECT
                platform_code,
                status,
                dry_run,
                COUNT(*) AS run_count,
                MAX(started_at) AS latest_started_at,
                MAX(finished_at) AS latest_finished_at,
                SUM(error_count) AS total_errors
            FROM market_crawler_runs
            GROUP BY platform_code, status, dry_run
            ORDER BY platform_code, status, dry_run DESC
        """
        return _query_rows(conn, crawler_sql)
    return []
def build_live_db_inventory_preview(
    expected_tables,
    *,
    execute_requested=False,
    database_url=None,
    database_type=None,
    engine=None,
):
    """Build a read-only inventory overview of the ``market_*`` database tables.

    By default (``execute_requested=False``) no database connection is opened
    and a "planned" payload is returned. When execution is requested, a single
    connection is used to:

    - detect which expected tables exist;
    - count their rows;
    - run a fixed set of GROUP BY summaries.

    No ORM session, write, commit or migration is performed.

    Args:
        expected_tables: Iterable of table names to report on.
        execute_requested: When False, return the planned payload without
            touching the database.
        database_url: SQLAlchemy URL used when this function creates the
            engine; falls back to ``config.DATABASE_PATH``.
        database_type: Backend hint; falls back to ``config.DATABASE_TYPE``.
            It selects PostgreSQL connect/statement timeouts when an engine is
            created here. It also selects the table-existence probe, but only
            when the engine does not expose a dialect name.
        engine: Optional pre-built SQLAlchemy engine. Engines created by this
            function are disposed before returning; a caller-supplied engine
            is left alone.

    Returns:
        A dict whose ``mode`` is one of:

        - ``"live_db_inventory_planned"``
        - ``"live_db_inventory_read_only"``
        - ``"live_db_inventory_error"``

        Exceptions raised while creating the engine or querying are captured
        into the error payload, including ``error_message``. A failure to
        import ``config`` is not caught.
    """
    expected_tables = list(expected_tables)
    if not execute_requested:
        return {
            "mode": "live_db_inventory_planned",
            "execute_requested": False,
            "read_only_query_executed": False,
            "database_connection_opened": False,
            "database_session_created": False,
            "explicit_transaction_opened": False,
            "database_write_executed": False,
            "database_commit_executed": False,
            "migration_executed": False,
            "external_network_executed": False,
            "scheduler_attached": False,
            "expected_tables": expected_tables,
            "existing_tables": [],
            # Copy so callers mutating one list do not silently change the other.
            "missing_tables": list(expected_tables),
            "table_statuses": _planned_table_status(expected_tables),
            "table_counts": {},
            "total_rows": 0,
            "summary_ready": False,
            "platform_breakdown": [],
            "campaign_status_breakdown": [],
            "product_activity_summary": [],
            "match_status_breakdown": [],
            "alert_review_state_breakdown": [],
            "crawler_run_summary": [],
            "safety_checks": {
                "planned_default_no_db_connection": True,
                "database_write_blocked": True,
                "migration_not_executed": True,
                "scheduler_detached": True,
                "external_network_blocked": True,
            },
            "blocked_reasons": [
                "execute_false_planned_only",
                "inventory_not_loaded",
                "database_write_still_blocked",
            ],
        }

    # Imported lazily so the planned (default) path never loads project config.
    from config import DATABASE_PATH, DATABASE_TYPE

    effective_database_type = (database_type or DATABASE_TYPE or "").lower()
    effective_database_url = database_url or DATABASE_PATH
    created_engine = False
    connection_opened = False
    try:
        if engine is None:
            connect_args = {}
            if effective_database_type == "postgresql":
                # Bound connection time and per-statement runtime for the probe.
                connect_args = {
                    "connect_timeout": 8,
                    "options": "-c statement_timeout=15000",
                }
            engine = create_engine(
                effective_database_url,
                isolation_level="AUTOCOMMIT",
                pool_pre_ping=True,
                connect_args=connect_args,
            )
            created_engine = True

        # Prefer the engine's real dialect so a caller-supplied engine (or a
        # URL that disagrees with DATABASE_TYPE) still gets the matching
        # existence probe. Fall back to the configured type when the engine
        # exposes no string dialect name.
        dialect_name = getattr(getattr(engine, "dialect", None), "name", None)
        if not isinstance(dialect_name, str) or not dialect_name:
            dialect_name = effective_database_type

        with engine.connect() as conn:
            connection_opened = True
            if dialect_name == "postgresql":
                existing_tables = _probe_existing_postgresql(conn, expected_tables)
            else:
                existing_tables = _probe_existing_sqlite(conn, expected_tables)
            existing_set = set(existing_tables)
            table_counts = {
                table_name: _count_rows(conn, table_name)
                for table_name in expected_tables
                if table_name in existing_set
            }
            platform_breakdown = _query_platform_breakdown(conn, existing_set)
            campaign_status_breakdown = _query_campaign_status_breakdown(conn, existing_set)
            product_activity_summary = _query_product_activity_summary(conn, existing_set)
            match_status_breakdown = _query_match_status_breakdown(conn, existing_set)
            alert_review_state_breakdown = _query_alert_review_state_breakdown(conn, existing_set)
            crawler_run_summary = _query_crawler_run_summary(conn, existing_set)

        # Reuse the set built above instead of rebuilding it per element.
        missing_tables = [
            table_name for table_name in expected_tables
            if table_name not in existing_set
        ]
        blocked_reasons = ["database_write_still_blocked"]
        if missing_tables:
            blocked_reasons.insert(0, "market_tables_missing")
        return {
            "mode": "live_db_inventory_read_only",
            "execute_requested": True,
            "read_only_query_executed": True,
            "database_connection_opened": connection_opened,
            "database_session_created": False,
            "explicit_transaction_opened": False,
            "database_write_executed": False,
            "database_commit_executed": False,
            "migration_executed": False,
            "external_network_executed": False,
            "scheduler_attached": False,
            "expected_tables": expected_tables,
            "existing_tables": existing_tables,
            "missing_tables": missing_tables,
            "table_statuses": _table_status(expected_tables, existing_tables, table_counts),
            "table_counts": table_counts,
            "total_rows": sum(table_counts.values()),
            "summary_ready": not missing_tables,
            "platform_breakdown": platform_breakdown,
            "campaign_status_breakdown": campaign_status_breakdown,
            "product_activity_summary": product_activity_summary,
            "match_status_breakdown": match_status_breakdown,
            "alert_review_state_breakdown": alert_review_state_breakdown,
            "crawler_run_summary": crawler_run_summary,
            "safety_checks": {
                "read_only_query_executed": True,
                "database_write_blocked": True,
                "migration_not_executed": True,
                "scheduler_detached": True,
                "external_network_blocked": True,
            },
            "blocked_reasons": blocked_reasons,
        }
    except Exception as exc:
        # Deliberate boundary: report the failure in the payload instead of
        # raising, so callers always receive a structured inventory result.
        return {
            "mode": "live_db_inventory_error",
            "execute_requested": True,
            "read_only_query_executed": False,
            "database_connection_opened": connection_opened,
            "database_session_created": False,
            "explicit_transaction_opened": False,
            "database_write_executed": False,
            "database_commit_executed": False,
            "migration_executed": False,
            "external_network_executed": False,
            "scheduler_attached": False,
            "expected_tables": expected_tables,
            "existing_tables": [],
            "missing_tables": list(expected_tables),
            "table_statuses": _planned_table_status(expected_tables),
            "table_counts": {},
            "total_rows": 0,
            "summary_ready": False,
            "platform_breakdown": [],
            "campaign_status_breakdown": [],
            "product_activity_summary": [],
            "match_status_breakdown": [],
            "alert_review_state_breakdown": [],
            "crawler_run_summary": [],
            "safety_checks": {
                "read_only_query_executed": False,
                "database_write_blocked": True,
                "migration_not_executed": True,
                "scheduler_detached": True,
                "external_network_blocked": True,
            },
            "blocked_reasons": [
                "live_db_inventory_probe_error",
                "database_write_still_blocked",
            ],
            "error_message": str(exc),
        }
    finally:
        # Only dispose engines this function created; never a caller's engine.
        if created_engine:
            engine.dispose()