440 lines
14 KiB
Python
440 lines
14 KiB
Python
"""既有資料來源橋接 preview。
|
||
|
||
本模組只讀取既有 EDM / PChome 資料表的摘要,產生未來導入 market_* 的
|
||
去重與映射計畫;不寫入 DB、不建立 ORM session、不掛 scheduler。
|
||
"""
|
||
|
||
from datetime import date, datetime
|
||
from decimal import Decimal
|
||
|
||
from sqlalchemy import create_engine, text
|
||
|
||
|
||
# Legacy tables inspected by this bridge.  Each descriptor is spread into the
# per-source summaries, so its keys (table, source_code, platform_code,
# description, planned_targets) appear verbatim in the preview output.
# Treat these entries as read-only.
LEGACY_SOURCE_TABLES = (
    {
        # Existing MOMO EDM / festival campaign product data; candidate import
        # source for market_campaigns and market_campaign_products.
        "table": "promo_products",
        "source_code": "momo_promo_products",
        "platform_code": "momo",
        "description": "既有 MOMO EDM / festival 活動商品資料,可作為 market_campaigns 與 market_campaign_products 的導入來源。",
        "planned_targets": ["market_campaigns", "market_campaign_products"],
    },
    {
        # Existing PChome latest-price comparison cache; usable as product
        # matching / price-threat context, never passed off as campaign-page
        # products.
        "table": "competitor_prices",
        "source_code": "pchome_competitor_prices",
        "platform_code": "pchome",
        "description": "既有 PChome 最新比價快取,可作為商品比對與價格威脅背景,不直接冒充活動頁商品。",
        "planned_targets": ["market_product_matches"],
    },
    {
        # Existing PChome price-comparison history; a price-trend reference
        # once market products and campaigns exist.
        "table": "competitor_price_history",
        "source_code": "pchome_competitor_price_history",
        "platform_code": "pchome",
        "description": "既有 PChome 比價歷史,可在 market 商品與 campaign 建立後作為價格趨勢參考。",
        "planned_targets": ["market_product_price_history"],
    },
)
|
||
|
||
|
||
# Planned source -> target mapping steps reported in the plan's
# "bridge_operations" field.  "write_status" is descriptive only: nothing in
# this module performs these writes.
BRIDGE_OPERATIONS = (
    {
        # Derive a MOMO campaign from page_type, batch and activity time.
        "source_table": "promo_products",
        "target_table": "market_campaigns",
        "operation": "derive_momo_campaign_from_page_type_batch_and_activity_time",
        "dedupe_key": "platform_code + campaign_key",
        "write_status": "preview_only",
    },
    {
        # Map i_code / price / discount / url into a campaign product row.
        "source_table": "promo_products",
        "target_table": "market_campaign_products",
        "operation": "map_i_code_price_discount_url_into_campaign_product",
        "dedupe_key": "campaign_id + platform_code + platform_product_id",
        "write_status": "preview_only",
    },
    {
        # Reuse the existing PChome match score as a seed for review.
        "source_table": "competitor_prices",
        "target_table": "market_product_matches",
        "operation": "reuse_pchome_match_score_as_review_seed",
        "dedupe_key": "market_product_id + momo_i_code",
        "write_status": "preview_only",
    },
    {
        # Price history cannot be attached until market products/campaigns
        # exist, hence the blocked status.
        "source_table": "competitor_price_history",
        "target_table": "market_product_price_history",
        "operation": "defer_until_market_product_and_campaign_exist",
        "dedupe_key": "platform_code + platform_product_id + crawled_at",
        "write_status": "blocked_until_campaign_product_exists",
    },
)
|
||
|
||
|
||
# Human-readable de-duplication rules reported in the plan's
# "duplicate_controls" field.
DUPLICATE_CONTROLS = (
    {
        # campaign_key = momo:{page_type}:{batch_id or activity_time_text}:{time_slot},
        # so repeated EDM batches do not create duplicate campaigns.
        "key": "momo_campaign_key",
        "rule": "campaign_key 使用 momo:{page_type}:{batch_id 或 activity_time_text}:{time_slot},避免 EDM 批次重複建立活動。",
    },
    {
        # platform_product_id = promo_products.i_code, deduplicated via the
        # market_campaign_products unique key.
        "key": "momo_product_key",
        "rule": "platform_product_id 使用 promo_products.i_code,並依 market_campaign_products unique key 去重。",
    },
    {
        # PChome comparison data only becomes match candidates, never campaign
        # products, to avoid clashing with a future PChome campaign crawler.
        "key": "pchome_match_key",
        "rule": "PChome 比價資料只進入比對候選,不直接建立活動商品,避免和未來 PChome 活動 crawler 重複。",
    },
)
|
||
|
||
|
||
def _safe_value(value):
|
||
if isinstance(value, (datetime, date)):
|
||
return value.isoformat()
|
||
if isinstance(value, Decimal):
|
||
return float(value)
|
||
return value
|
||
|
||
|
||
def _row_to_dict(row):
    """Convert a SQLAlchemy result row into a plain dict of JSON-friendly values.

    Column labels become keys; each value is normalized with ``_safe_value``.
    """
    plain = dict(row._mapping)
    return {column: _safe_value(plain[column]) for column in plain}
|
||
|
||
|
||
def _planned_source_summaries():
    """Return placeholder summaries for every legacy source without any DB access.

    Used on the planned (``execute_requested=False``) path and as the fallback
    when a read-only run fails.  Each entry carries the descriptor keys from
    LEGACY_SOURCE_TABLES plus zeroed counts, empty breakdown/sample lists and
    ``read_status="planned_no_db_connection"``.
    """
    return [
        {
            **source,
            # Copy the list so a caller mutating this summary cannot alter the
            # module-level LEGACY_SOURCE_TABLES descriptor it came from.
            "planned_targets": list(source["planned_targets"]),
            "exists": False,
            "row_count": 0,
            "distinct_entity_count": 0,
            "last_seen_at": None,
            "breakdown": [],
            "sample_rows": [],
            "read_status": "planned_no_db_connection",
        }
        for source in LEGACY_SOURCE_TABLES
    ]
|
||
|
||
|
||
def _probe_postgresql_table(conn, table_name):
    """Return True when ``table_name`` exists in a schema on the PostgreSQL search path.

    ``current_schemas(false)`` restricts the lookup to the explicit search-path
    schemas (implicit ones such as pg_catalog are excluded).
    """
    probe = text(
        """
        SELECT EXISTS (
            SELECT 1
            FROM information_schema.tables
            WHERE table_schema = ANY (current_schemas(false))
              AND table_name = :table_name
        )
        """
    )
    found = conn.execute(probe, {"table_name": table_name}).scalar()
    return bool(found)
|
||
|
||
|
||
def _probe_sqlite_table(conn, table_name):
    """Return True when a table named ``table_name`` is listed in sqlite_master."""
    lookup = text(
        """
        SELECT 1
        FROM sqlite_master
        WHERE type = 'table'
          AND name = :table_name
        LIMIT 1
        """
    )
    first_row = conn.execute(lookup, {"table_name": table_name}).fetchone()
    return bool(first_row)
|
||
|
||
|
||
def _table_exists(conn, table_name, database_type):
    """Check whether ``table_name`` exists using the probe for ``database_type``.

    ``"postgresql"`` uses the information_schema probe; every other value
    falls back to the SQLite ``sqlite_master`` probe.
    """
    probe = (
        _probe_postgresql_table
        if database_type == "postgresql"
        else _probe_sqlite_table
    )
    return probe(conn, table_name)
|
||
|
||
|
||
def _query_promo_products_summary(conn, sample_limit):
    """Read overall stats, a per-page_type breakdown and recent samples from promo_products.

    Args:
        conn: An open SQLAlchemy connection.
        sample_limit: Maximum number of breakdown groups and sample rows.

    Returns:
        ``(summary, breakdown, samples)``: ``summary`` is a dict with
        row_count / batch_count / distinct_entity_count / last_seen_at, the
        other two are lists of dicts.  All rows pass through ``_row_to_dict``.
    """
    summary = _row_to_dict(
        conn.execute(
            text(
                """
                SELECT
                    COUNT(*) AS row_count,
                    COUNT(DISTINCT batch_id) AS batch_count,
                    COUNT(DISTINCT i_code) AS distinct_entity_count,
                    MAX(crawled_at) AS last_seen_at
                FROM promo_products
                """
            )
        ).fetchone()
    )
    # Secondary sort on source_key makes the LIMITed set of groups stable when
    # several page types share the same row_count.
    breakdown = [
        _row_to_dict(row)
        for row in conn.execute(
            text(
                """
                SELECT
                    COALESCE(page_type, 'unknown') AS source_key,
                    COUNT(*) AS row_count,
                    COUNT(DISTINCT i_code) AS distinct_entity_count,
                    MAX(crawled_at) AS last_seen_at
                FROM promo_products
                GROUP BY COALESCE(page_type, 'unknown')
                ORDER BY row_count DESC, source_key
                LIMIT :sample_limit
                """
            ),
            {"sample_limit": sample_limit},
        ).fetchall()
    ]
    # `crawled_at IS NULL` pushes rows without a timestamp to the end on both
    # backends; plain DESC puts NULLs first on PostgreSQL but last on SQLite.
    samples = [
        _row_to_dict(row)
        for row in conn.execute(
            text(
                """
                SELECT
                    page_type,
                    batch_id,
                    i_code,
                    name,
                    price,
                    discount_text,
                    url,
                    crawled_at
                FROM promo_products
                ORDER BY crawled_at IS NULL, crawled_at DESC
                LIMIT :sample_limit
                """
            ),
            {"sample_limit": sample_limit},
        ).fetchall()
    ]
    return summary, breakdown, samples
|
||
|
||
|
||
def _query_competitor_prices_summary(conn, table_name, sample_limit):
    """Read stats, a per-source breakdown and recent samples from a PChome price table.

    The same column set (source, sku, competitor_product_id,
    competitor_product_name, price, original_price, match_score, crawled_at)
    is queried for every table passed in.

    Args:
        conn: An open SQLAlchemy connection.
        table_name: Table to read.  It is formatted into the SQL text, so it
            must be a plain identifier.
        sample_limit: Maximum number of breakdown groups and sample rows.

    Returns:
        ``(summary, breakdown, samples)``: a stats dict and two lists of dicts.

    Raises:
        ValueError: If ``table_name`` is not a plain identifier.
    """
    # Identifiers cannot be bound parameters, so the name is interpolated into
    # the SQL; refuse anything that could carry extra SQL with it.
    if not isinstance(table_name, str) or not table_name.isidentifier():
        raise ValueError(f"invalid legacy table name: {table_name!r}")

    summary = _row_to_dict(
        conn.execute(
            text(
                f"""
                SELECT
                    COUNT(*) AS row_count,
                    COUNT(DISTINCT sku) AS sku_count,
                    COUNT(DISTINCT competitor_product_id) AS distinct_entity_count,
                    MAX(crawled_at) AS last_seen_at
                FROM {table_name}
                """
            )
        ).fetchone()
    )
    # Secondary sort on source_key keeps the LIMITed groups deterministic on ties.
    breakdown = [
        _row_to_dict(row)
        for row in conn.execute(
            text(
                f"""
                SELECT
                    COALESCE(source, 'unknown') AS source_key,
                    COUNT(*) AS row_count,
                    COUNT(DISTINCT sku) AS sku_count,
                    COUNT(DISTINCT competitor_product_id) AS distinct_entity_count,
                    MAX(crawled_at) AS last_seen_at
                FROM {table_name}
                GROUP BY COALESCE(source, 'unknown')
                ORDER BY row_count DESC, source_key
                LIMIT :sample_limit
                """
            ),
            {"sample_limit": sample_limit},
        ).fetchall()
    ]
    # `crawled_at IS NULL` sorts undated rows last on both PostgreSQL (where
    # DESC otherwise puts NULLs first) and SQLite.
    samples = [
        _row_to_dict(row)
        for row in conn.execute(
            text(
                f"""
                SELECT
                    source,
                    sku,
                    competitor_product_id,
                    competitor_product_name,
                    price,
                    original_price,
                    match_score,
                    crawled_at
                FROM {table_name}
                ORDER BY crawled_at IS NULL, crawled_at DESC
                LIMIT :sample_limit
                """
            ),
            {"sample_limit": sample_limit},
        ).fetchall()
    ]
    return summary, breakdown, samples
|
||
|
||
|
||
def _query_source_summary(conn, source, database_type, sample_limit):
    """Summarize one legacy source table using read-only queries.

    Args:
        conn: An open SQLAlchemy connection.
        source: A descriptor dict (must contain "table"), e.g. an entry of
            LEGACY_SOURCE_TABLES.
        database_type: Passed to ``_table_exists`` to pick the probe dialect.
        sample_limit: Forwarded to the per-table query helper.

    Returns:
        The descriptor's keys plus exists / row_count / distinct_entity_count /
        last_seen_at / breakdown / sample_rows / read_status.  Existing tables
        also carry ``metrics`` (the raw summary row); missing tables get zeroed
        counts and ``read_status="missing_table"``.
    """
    table_name = source["table"]
    # Copy the descriptor and its planned_targets list so the returned summary
    # never aliases the module-level descriptor.
    descriptor = dict(source)
    if isinstance(descriptor.get("planned_targets"), list):
        descriptor["planned_targets"] = list(descriptor["planned_targets"])

    if not _table_exists(conn, table_name, database_type):
        return {
            **descriptor,
            "exists": False,
            "row_count": 0,
            "distinct_entity_count": 0,
            "last_seen_at": None,
            "breakdown": [],
            "sample_rows": [],
            "read_status": "missing_table",
        }

    if table_name == "promo_products":
        summary, breakdown, samples = _query_promo_products_summary(conn, sample_limit)
    else:
        # Assumes every non-promo legacy table uses the PChome competitor
        # price column layout (true for the current LEGACY_SOURCE_TABLES).
        summary, breakdown, samples = _query_competitor_prices_summary(
            conn,
            table_name,
            sample_limit,
        )

    return {
        **descriptor,
        "exists": True,
        "row_count": int(summary.get("row_count") or 0),
        "distinct_entity_count": int(summary.get("distinct_entity_count") or 0),
        "last_seen_at": summary.get("last_seen_at"),
        "metrics": summary,
        "breakdown": breakdown,
        "sample_rows": samples,
        "read_status": "read_only_loaded",
    }
|
||
|
||
|
||
def _build_result(
    *,
    mode,
    execute_requested,
    read_only_query_executed,
    database_connection_opened,
    source_summaries,
    error_message=None,
):
    """Assemble the bridge-plan response dict.

    Args:
        mode: Result mode label (planned / read_only / error).
        execute_requested: Whether the caller asked for a read-only DB run.
        read_only_query_executed: Whether the SELECT queries actually ran.
        database_connection_opened: Whether a DB connection was opened.
        source_summaries: Per-source summary dicts (each has "table" and
            optionally "exists" / "row_count").
        error_message: Error text when the run failed, else None.

    Returns:
        A dict of status flags (write/commit/network/scheduler flags are
        always False), source readiness, aggregated row counts, the per-source
        summaries, copies of BRIDGE_OPERATIONS / DUPLICATE_CONTROLS, and
        ``blocked_reasons`` ordered from most to least specific.
    """
    existing_sources = [item["table"] for item in source_summaries if item.get("exists")]
    missing_sources = [item["table"] for item in source_summaries if not item.get("exists")]
    total_existing_rows = sum(int(item.get("row_count") or 0) for item in source_summaries)

    # Most specific reason first, then the always-present preview blockers.
    blocked_reasons = []
    if error_message:
        blocked_reasons.append("legacy_source_bridge_error")
    if missing_sources:
        blocked_reasons.append("legacy_source_tables_missing")
    if not execute_requested:
        blocked_reasons.append("execute_false_planned_only")
    blocked_reasons.extend(["legacy_bridge_preview_only", "market_intel_write_still_blocked"])

    return {
        "mode": mode,
        "execute_requested": bool(execute_requested),
        "read_only_query_executed": bool(read_only_query_executed),
        "database_connection_opened": bool(database_connection_opened),
        "database_session_created": False,
        "explicit_transaction_opened": False,
        "database_write_executed": False,
        "database_commit_executed": False,
        "external_network_executed": False,
        "scheduler_attached": False,
        "source_count": len(source_summaries),
        "existing_source_count": len(existing_sources),
        "existing_sources": existing_sources,
        "missing_sources": missing_sources,
        # Readiness is only meaningful when the tables were actually probed.
        "source_tables_ready": bool(read_only_query_executed) and not missing_sources,
        "total_existing_rows": total_existing_rows,
        "source_summaries": source_summaries,
        # Copy each dict so callers mutating the response cannot alter the
        # module-level constants shared by every later call.
        "bridge_operations": [dict(operation) for operation in BRIDGE_OPERATIONS],
        "duplicate_controls": [dict(control) for control in DUPLICATE_CONTROLS],
        "writes_executed": False,
        "would_write_database": False,
        "blocked_reasons": blocked_reasons,
        "error_message": error_message,
    }
|
||
|
||
|
||
def build_legacy_source_bridge_plan(
    *,
    execute_requested=False,
    database_url=None,
    database_type=None,
    engine=None,
    sample_limit=5,
):
    """Build the legacy-source bridge plan; by default returns "planned" without touching the DB.

    Args:
        execute_requested: When false (default) no connection is opened and
            every source is reported as ``planned_no_db_connection``.  When
            true, the legacy tables are inspected with read-only SELECTs.
        database_url: SQLAlchemy URL used when ``engine`` is not supplied;
            falls back to ``config.DATABASE_PATH``.
        database_type: ``"postgresql"`` selects the information_schema table
            probe; any other value uses the SQLite ``sqlite_master`` probe.
            When omitted it is taken from ``engine.dialect.name`` if an engine
            is supplied, otherwise from ``config.DATABASE_TYPE``.
        engine: Optional pre-built SQLAlchemy engine.  Only engines created
            here are disposed.
        sample_limit: Rows per breakdown/sample list; falsy values mean 5 and
            the result is clamped to 1..20.

    Returns:
        The dict built by ``_build_result``.  Exceptions raised while
        creating the engine, connecting or querying are not propagated; they
        produce mode ``legacy_source_bridge_error`` with the text in
        ``error_message``.
    """
    sample_limit = max(1, min(int(sample_limit or 5), 20))
    if not execute_requested:
        return _build_result(
            mode="legacy_source_bridge_planned",
            execute_requested=False,
            read_only_query_executed=False,
            database_connection_opened=False,
            source_summaries=_planned_source_summaries(),
        )

    effective_database_type = (database_type or "").lower()
    effective_database_url = database_url
    if not effective_database_type and engine is not None:
        # Probe with the dialect the supplied engine actually speaks; the
        # configured default may describe a different backend.
        dialect = getattr(engine, "dialect", None)
        effective_database_type = (getattr(dialect, "name", None) or "").lower()
    if not effective_database_type or (engine is None and not effective_database_url):
        # Deferred import: config is only needed when the caller did not
        # supply enough connection details (and never on the planned path).
        from config import DATABASE_PATH, DATABASE_TYPE

        effective_database_type = effective_database_type or (DATABASE_TYPE or "").lower()
        effective_database_url = effective_database_url or DATABASE_PATH

    created_engine = False
    connection_opened = False

    try:
        if engine is None:
            connect_args = {}
            if effective_database_type == "postgresql":
                # Fail fast instead of hanging a preview on a slow/unreachable DB.
                connect_args = {
                    "connect_timeout": 8,
                    "options": "-c statement_timeout=15000",
                }
            engine = create_engine(
                effective_database_url,
                isolation_level="AUTOCOMMIT",
                pool_pre_ping=True,
                connect_args=connect_args,
            )
            created_engine = True

        with engine.connect() as conn:
            connection_opened = True
            source_summaries = [
                _query_source_summary(
                    conn,
                    source,
                    effective_database_type,
                    sample_limit,
                )
                for source in LEGACY_SOURCE_TABLES
            ]

        return _build_result(
            mode="legacy_source_bridge_read_only",
            execute_requested=True,
            read_only_query_executed=True,
            database_connection_opened=connection_opened,
            source_summaries=source_summaries,
        )
    except Exception as exc:
        # Top-level boundary of a preview endpoint: report the failure in the
        # result instead of raising.
        return _build_result(
            mode="legacy_source_bridge_error",
            execute_requested=True,
            read_only_query_executed=False,
            database_connection_opened=connection_opened,
            source_summaries=_planned_source_summaries(),
            error_message=str(exc),
        )
    finally:
        if created_engine:
            engine.dispose()
|