Files
ewoooc/services/market_intel/legacy_source_bridge.py
OoO bb6a862dbe
All checks were successful
CD Pipeline / deploy (push) Successful in 1m2s
feat(market-intel): 新增既有資料橋接預覽
2026-05-18 14:19:43 +08:00

440 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Preview bridge for the legacy (existing) data sources.

This module only reads summaries of the existing EDM / PChome tables and
produces a dedupe-and-mapping plan for a future import into the market_*
tables; it does not write to the DB, does not create an ORM session, and
does not attach a scheduler.
"""
from datetime import date, datetime
from decimal import Decimal
from sqlalchemy import create_engine, text
# Legacy tables the bridge summarizes, in the order they are probed.  Each
# entry carries the table name, the source/platform codes echoed into every
# summary, a human-readable description, and the market_* tables it is
# planned to feed.
LEGACY_SOURCE_TABLES = (
    dict(
        table="promo_products",
        source_code="momo_promo_products",
        platform_code="momo",
        description="既有 MOMO EDM / festival 活動商品資料,可作為 market_campaigns 與 market_campaign_products 的導入來源。",
        planned_targets=["market_campaigns", "market_campaign_products"],
    ),
    dict(
        table="competitor_prices",
        source_code="pchome_competitor_prices",
        platform_code="pchome",
        description="既有 PChome 最新比價快取,可作為商品比對與價格威脅背景,不直接冒充活動頁商品。",
        planned_targets=["market_product_matches"],
    ),
    dict(
        table="competitor_price_history",
        source_code="pchome_competitor_price_history",
        platform_code="pchome",
        description="既有 PChome 比價歷史,可在 market 商品與 campaign 建立後作為價格趨勢參考。",
        planned_targets=["market_product_price_history"],
    ),
)
# Planned mapping steps from each legacy table to its market_* target, with
# the dedupe key each step would use.  Every write_status is either
# "preview_only" or a "blocked_..." marker.
BRIDGE_OPERATIONS = (
    dict(
        source_table="promo_products",
        target_table="market_campaigns",
        operation="derive_momo_campaign_from_page_type_batch_and_activity_time",
        dedupe_key="platform_code + campaign_key",
        write_status="preview_only",
    ),
    dict(
        source_table="promo_products",
        target_table="market_campaign_products",
        operation="map_i_code_price_discount_url_into_campaign_product",
        dedupe_key="campaign_id + platform_code + platform_product_id",
        write_status="preview_only",
    ),
    dict(
        source_table="competitor_prices",
        target_table="market_product_matches",
        operation="reuse_pchome_match_score_as_review_seed",
        dedupe_key="market_product_id + momo_i_code",
        write_status="preview_only",
    ),
    dict(
        source_table="competitor_price_history",
        target_table="market_product_price_history",
        operation="defer_until_market_product_and_campaign_exist",
        dedupe_key="platform_code + platform_product_id + crawled_at",
        write_status="blocked_until_campaign_product_exists",
    ),
)
# Human-readable dedupe rules reported next to BRIDGE_OPERATIONS in every
# bridge result.  The braces in the first rule are literal text, not
# format placeholders.
DUPLICATE_CONTROLS = (
    dict(
        key="momo_campaign_key",
        rule="campaign_key 使用 momo:{page_type}:{batch_id 或 activity_time_text}:{time_slot},避免 EDM 批次重複建立活動。",
    ),
    dict(
        key="momo_product_key",
        rule="platform_product_id 使用 promo_products.i_code並依 market_campaign_products unique key 去重。",
    ),
    dict(
        key="pchome_match_key",
        rule="PChome 比價資料只進入比對候選,不直接建立活動商品,避免和未來 PChome 活動 crawler 重複。",
    ),
)
def _safe_value(value):
if isinstance(value, (datetime, date)):
return value.isoformat()
if isinstance(value, Decimal):
return float(value)
return value
def _row_to_dict(row):
    """Convert a SQLAlchemy result row into a plain ``{column: value}`` dict.

    Each value is passed through ``_safe_value`` so dates and Decimals come
    out as JSON-friendly types.
    """
    plain = dict(row._mapping)
    return {column: _safe_value(plain[column]) for column in plain}
def _planned_source_summaries():
    """Return placeholder summaries for every legacy source without a DB.

    Used for the planned (no-execute) result and for the error result.  Each
    entry is the source's static metadata from ``LEGACY_SOURCE_TABLES`` plus
    zeroed counters, empty breakdown/sample lists and
    ``read_status="planned_no_db_connection"``.
    """
    summaries = []
    for source in LEGACY_SOURCE_TABLES:
        # Copy list values (e.g. planned_targets) so the returned dicts never
        # share mutable objects with the module-level LEGACY_SOURCE_TABLES.
        summary = {
            key: list(value) if isinstance(value, list) else value
            for key, value in source.items()
        }
        summary.update(
            {
                "exists": False,
                "row_count": 0,
                "distinct_entity_count": 0,
                "last_seen_at": None,
                "breakdown": [],
                "sample_rows": [],
                "read_status": "planned_no_db_connection",
            }
        )
        summaries.append(summary)
    return summaries
def _probe_postgresql_table(conn, table_name):
    """Tell whether *table_name* exists in PostgreSQL's current search path.

    Only schemas returned by ``current_schemas(false)`` are searched, i.e.
    the effective search path without implicit schemas.  The table name is
    passed as a bound parameter.
    """
    lookup = text(
        """
        SELECT EXISTS (
            SELECT 1
            FROM information_schema.tables
            WHERE table_schema = ANY (current_schemas(false))
            AND table_name = :table_name
        )
        """
    )
    result = conn.execute(lookup, {"table_name": table_name})
    return bool(result.scalar())
def _probe_sqlite_table(conn, table_name):
    """Tell whether SQLite's ``sqlite_master`` lists a table named *table_name*.

    The table name is passed as a bound parameter; a matching row means the
    table exists.
    """
    lookup = text(
        """
        SELECT 1
        FROM sqlite_master
        WHERE type = 'table'
        AND name = :table_name
        LIMIT 1
        """
    )
    first_row = conn.execute(lookup, {"table_name": table_name}).fetchone()
    return bool(first_row)
def _table_exists(conn, table_name, database_type):
    """Check for *table_name* using the probe that matches *database_type*.

    ``"postgresql"`` selects the information_schema probe; any other value
    (including empty) falls back to the SQLite ``sqlite_master`` probe.
    """
    probe = (
        _probe_postgresql_table
        if database_type == "postgresql"
        else _probe_sqlite_table
    )
    return probe(conn, table_name)
def _query_promo_products_summary(conn, sample_limit):
    """Summarize the legacy ``promo_products`` table with read-only SELECTs.

    Args:
        conn: Open SQLAlchemy connection used to run the queries.
        sample_limit: Maximum number of ``page_type`` breakdown groups and of
            sample rows returned.

    Returns:
        ``(summary, breakdown, samples)``:

        - summary: total rows, distinct ``batch_id``, distinct ``i_code`` and
          latest ``crawled_at``;
        - breakdown: per-``page_type`` (NULL shown as ``'unknown'``) counts,
          largest groups first;
        - samples: the most recently crawled rows.

        Every row is converted with ``_row_to_dict``.
    """
    params = {"sample_limit": sample_limit}
    summary = _row_to_dict(
        conn.execute(
            text(
                """
                SELECT
                    COUNT(*) AS row_count,
                    COUNT(DISTINCT batch_id) AS batch_count,
                    COUNT(DISTINCT i_code) AS distinct_entity_count,
                    MAX(crawled_at) AS last_seen_at
                FROM promo_products
                """
            )
        ).fetchone()
    )
    # source_key breaks ties so equal-sized groups come back in a stable order.
    breakdown = [
        _row_to_dict(row)
        for row in conn.execute(
            text(
                """
                SELECT
                    COALESCE(page_type, 'unknown') AS source_key,
                    COUNT(*) AS row_count,
                    COUNT(DISTINCT i_code) AS distinct_entity_count,
                    MAX(crawled_at) AS last_seen_at
                FROM promo_products
                GROUP BY COALESCE(page_type, 'unknown')
                ORDER BY row_count DESC, source_key
                LIMIT :sample_limit
                """
            ),
            params,
        ).fetchall()
    ]
    # PostgreSQL sorts NULLs first under DESC while SQLite sorts them last;
    # ordering by "crawled_at IS NULL" first pushes rows without a timestamp
    # to the end on both backends, so the samples are the latest crawls.
    samples = [
        _row_to_dict(row)
        for row in conn.execute(
            text(
                """
                SELECT
                    page_type,
                    batch_id,
                    i_code,
                    name,
                    price,
                    discount_text,
                    url,
                    crawled_at
                FROM promo_products
                ORDER BY crawled_at IS NULL, crawled_at DESC
                LIMIT :sample_limit
                """
            ),
            params,
        ).fetchall()
    ]
    return summary, breakdown, samples
def _query_competitor_prices_summary(conn, table_name, sample_limit):
summary = _row_to_dict(
conn.execute(
text(
f"""
SELECT
COUNT(*) AS row_count,
COUNT(DISTINCT sku) AS sku_count,
COUNT(DISTINCT competitor_product_id) AS distinct_entity_count,
MAX(crawled_at) AS last_seen_at
FROM {table_name}
"""
)
).fetchone()
)
breakdown = [
_row_to_dict(row)
for row in conn.execute(
text(
f"""
SELECT
COALESCE(source, 'unknown') AS source_key,
COUNT(*) AS row_count,
COUNT(DISTINCT sku) AS sku_count,
COUNT(DISTINCT competitor_product_id) AS distinct_entity_count,
MAX(crawled_at) AS last_seen_at
FROM {table_name}
GROUP BY COALESCE(source, 'unknown')
ORDER BY row_count DESC
LIMIT :sample_limit
"""
),
{"sample_limit": sample_limit},
).fetchall()
]
samples = [
_row_to_dict(row)
for row in conn.execute(
text(
f"""
SELECT
source,
sku,
competitor_product_id,
competitor_product_name,
price,
original_price,
match_score,
crawled_at
FROM {table_name}
ORDER BY crawled_at DESC
LIMIT :sample_limit
"""
),
{"sample_limit": sample_limit},
).fetchall()
]
return summary, breakdown, samples
def _query_source_summary(conn, source, database_type, sample_limit):
    """Probe and summarize one legacy source table.

    Args:
        conn: Open SQLAlchemy connection.
        source: One ``LEGACY_SOURCE_TABLES`` entry; its ``"table"`` key names
            the table to read.
        database_type: Passed to ``_table_exists`` to pick the probe.
        sample_limit: Forwarded to the per-table summary query.

    Returns:
        The source metadata merged with ``exists``, counts, ``last_seen_at``,
        ``breakdown``, ``sample_rows`` and ``read_status``.  A missing table
        yields zeroed counters and ``read_status="missing_table"``; a present
        table also gets the raw ``metrics`` dict and
        ``read_status="read_only_loaded"``.
    """
    table_name = source["table"]
    # Copy list values (e.g. planned_targets) so the result never shares
    # mutable objects with the module-level source definitions.
    base = {
        key: list(value) if isinstance(value, list) else value
        for key, value in source.items()
    }
    if not _table_exists(conn, table_name, database_type):
        return {
            **base,
            "exists": False,
            "row_count": 0,
            "distinct_entity_count": 0,
            "last_seen_at": None,
            "breakdown": [],
            "sample_rows": [],
            "read_status": "missing_table",
        }
    if table_name == "promo_products":
        summary, breakdown, samples = _query_promo_products_summary(conn, sample_limit)
    else:
        summary, breakdown, samples = _query_competitor_prices_summary(
            conn,
            table_name,
            sample_limit,
        )
    return {
        **base,
        "exists": True,
        "row_count": int(summary.get("row_count") or 0),
        "distinct_entity_count": int(summary.get("distinct_entity_count") or 0),
        "last_seen_at": summary.get("last_seen_at"),
        "metrics": summary,
        "breakdown": breakdown,
        "sample_rows": samples,
        "read_status": "read_only_loaded",
    }
def _build_result(
    *,
    mode,
    execute_requested,
    read_only_query_executed,
    database_connection_opened,
    source_summaries,
    error_message=None,
):
    """Assemble the bridge preview payload returned to callers.

    Args:
        mode: Result mode label (planned / read_only / error).
        execute_requested: Whether the caller asked for DB reads.
        read_only_query_executed: Whether the summary queries actually ran;
            only then do ``exists`` flags reflect real tables.
        database_connection_opened: Whether a connection was opened.
        source_summaries: Per-source dicts with at least ``table``; ``exists``
            and ``row_count`` are read when present.
        error_message: Failure text for the error mode, else None.

    Returns:
        A dict with source readiness, row totals, the static bridge
        operations / duplicate controls, and ``blocked_reasons``.  The
        session/transaction/write/commit/network/scheduler flags are
        hard-coded to False.
    """
    existing_sources = [
        item["table"] for item in source_summaries
        if item.get("exists")
    ]
    # In planned/error modes this lists every table, because their placeholder
    # summaries are not confirmed to exist (they were never probed).
    missing_sources = [
        item["table"] for item in source_summaries
        if not item.get("exists")
    ]
    total_existing_rows = sum(int(item.get("row_count") or 0) for item in source_summaries)
    blocked_reasons = ["legacy_bridge_preview_only", "market_intel_write_still_blocked"]
    if not execute_requested:
        blocked_reasons.insert(0, "execute_false_planned_only")
    # Only claim tables are missing when they were actually probed, matching
    # the guard used for source_tables_ready below.
    if missing_sources and read_only_query_executed:
        blocked_reasons.insert(0, "legacy_source_tables_missing")
    if error_message:
        blocked_reasons.insert(0, "legacy_source_bridge_error")
    return {
        "mode": mode,
        "execute_requested": bool(execute_requested),
        "read_only_query_executed": bool(read_only_query_executed),
        "database_connection_opened": bool(database_connection_opened),
        "database_session_created": False,
        "explicit_transaction_opened": False,
        "database_write_executed": False,
        "database_commit_executed": False,
        "external_network_executed": False,
        "scheduler_attached": False,
        "source_count": len(source_summaries),
        "existing_source_count": len(existing_sources),
        "existing_sources": existing_sources,
        "missing_sources": missing_sources,
        "source_tables_ready": not missing_sources if read_only_query_executed else False,
        "total_existing_rows": total_existing_rows,
        "source_summaries": source_summaries,
        # Copy each entry so callers cannot mutate the module-level constants.
        "bridge_operations": [dict(operation) for operation in BRIDGE_OPERATIONS],
        "duplicate_controls": [dict(control) for control in DUPLICATE_CONTROLS],
        "writes_executed": False,
        "would_write_database": False,
        "blocked_reasons": blocked_reasons,
        "error_message": error_message,
    }
def build_legacy_source_bridge_plan(
    *,
    execute_requested=False,
    database_url=None,
    database_type=None,
    engine=None,
    sample_limit=5,
):
    """Build the legacy-source bridge plan; by default return a plan without connecting to the DB.

    Args:
        execute_requested: When false (the default) no connection is opened
            and a planned result with placeholder summaries is returned.
            When true, read-only summary queries run against the legacy
            tables.
        database_url: URL for a newly created engine; falls back to
            ``config.DATABASE_PATH``.  Ignored when ``engine`` is given.
        database_type: ``"postgresql"`` selects the PostgreSQL table probe
            and connection timeouts; any other value uses the SQLite probe.
            When omitted, the ``dialect.name`` of a supplied ``engine`` is
            used, then ``config.DATABASE_TYPE``.
        engine: Optional existing engine.  It is not disposed here; an engine
            created by this function is disposed before returning.
        sample_limit: Breakdown/sample size, clamped to 1..20; falsy values
            mean 5.

    Returns:
        The ``_build_result`` dict.  Exceptions raised while creating the
        engine or querying are caught and reported as a
        ``legacy_source_bridge_error`` result with placeholder summaries.
    """
    sample_limit = max(1, min(int(sample_limit or 5), 20))
    if not execute_requested:
        return _build_result(
            mode="legacy_source_bridge_planned",
            execute_requested=False,
            read_only_query_executed=False,
            database_connection_opened=False,
            source_summaries=_planned_source_summaries(),
        )
    # Imported after the planned early return, so that path never needs config.
    from config import DATABASE_PATH, DATABASE_TYPE

    # A caller-supplied engine knows its own dialect; prefer it over the
    # global config so the matching table probe is used.  Non-string names
    # (e.g. from test doubles) are ignored.
    engine_dialect = getattr(getattr(engine, "dialect", None), "name", None)
    if not isinstance(engine_dialect, str):
        engine_dialect = None
    effective_database_type = (
        database_type or engine_dialect or DATABASE_TYPE or ""
    ).lower()
    effective_database_url = database_url or DATABASE_PATH
    created_engine = False
    connection_opened = False
    try:
        if engine is None:
            connect_args = {}
            if effective_database_type == "postgresql":
                # Bound the connect wait (seconds) and each statement (ms).
                connect_args = {
                    "connect_timeout": 8,
                    "options": "-c statement_timeout=15000",
                }
            engine = create_engine(
                effective_database_url,
                # AUTOCOMMIT: the read-only SELECTs run without an explicit
                # transaction being opened.
                isolation_level="AUTOCOMMIT",
                pool_pre_ping=True,
                connect_args=connect_args,
            )
            created_engine = True
        with engine.connect() as conn:
            connection_opened = True
            source_summaries = [
                _query_source_summary(
                    conn,
                    source,
                    effective_database_type,
                    sample_limit,
                )
                for source in LEGACY_SOURCE_TABLES
            ]
            return _build_result(
                mode="legacy_source_bridge_read_only",
                execute_requested=True,
                read_only_query_executed=True,
                database_connection_opened=connection_opened,
                source_summaries=source_summaries,
            )
    except Exception as exc:
        # Deliberate best-effort boundary: report the failure in the payload
        # instead of raising to the caller.
        return _build_result(
            mode="legacy_source_bridge_error",
            execute_requested=True,
            read_only_query_executed=False,
            database_connection_opened=connection_opened,
            source_summaries=_planned_source_summaries(),
            error_message=str(exc),
        )
    finally:
        if created_engine:
            engine.dispose()