V10.609 自動同步外部報價資料
All checks were successful
CD Pipeline / deploy (push) Successful in 1m5s

This commit is contained in:
OoO
2026-06-15 21:34:48 +08:00
parent df6714c3f7
commit a3ace326c8
7 changed files with 593 additions and 5 deletions

View File

@@ -211,6 +211,15 @@ def _has_table(conn, table_name: str) -> bool:
return False
def _quality_score_from_match(value: Any) -> float:
score = _to_float(value)
if score is None:
return 0.0
if 0 <= score <= 1:
score *= 100
return max(0.0, min(100.0, score))
def normalize_external_offer_payload(payload: dict[str, Any]) -> tuple[ExternalOfferPayload | None, list[str]]:
"""把 official API / provider API / manual CSV 的資料轉成同一份欄位。"""
errors: list[str] = []
@@ -276,6 +285,368 @@ def normalize_external_offer_payload(payload: dict[str, Any]) -> tuple[ExternalO
return record, []
def _ensure_external_market_source_seeds(conn) -> None:
if not _has_table(conn, "external_market_sources"):
return
for source in SOURCE_CONTRACTS:
payload = {
"code": source["code"],
"display_name": source["display_name"],
"platform_code": source["platform_code"],
"source_kind": source["source_kind"],
"status": source["status_code"],
"enabled": source["status_code"] == "active",
"write_enabled": False,
"allowed_input_methods_json": json.dumps(source["input_methods"], ensure_ascii=False),
"quality_policy_json": json.dumps({
"minimum_match_status": "verified",
"minimum_quality_score": 76,
}, ensure_ascii=False),
"plain_note": source["plain_note"],
}
if conn.dialect.name == "postgresql":
conn.execute(text("""
INSERT INTO external_market_sources (
code, display_name, platform_code, source_kind, status, enabled,
write_enabled, allowed_input_methods_json, quality_policy_json, plain_note
)
VALUES (
:code, :display_name, :platform_code, :source_kind, :status, :enabled,
:write_enabled, :allowed_input_methods_json, :quality_policy_json, :plain_note
)
ON CONFLICT (code) DO UPDATE SET
display_name = EXCLUDED.display_name,
platform_code = EXCLUDED.platform_code,
source_kind = EXCLUDED.source_kind,
status = EXCLUDED.status,
enabled = EXCLUDED.enabled,
allowed_input_methods_json = EXCLUDED.allowed_input_methods_json,
quality_policy_json = EXCLUDED.quality_policy_json,
plain_note = EXCLUDED.plain_note,
updated_at = NOW()
"""), payload)
else:
conn.execute(text("""
INSERT INTO external_market_sources (
code, display_name, platform_code, source_kind, status, enabled,
write_enabled, allowed_input_methods_json, quality_policy_json, plain_note
)
VALUES (
:code, :display_name, :platform_code, :source_kind, :status, :enabled,
:write_enabled, :allowed_input_methods_json, :quality_policy_json, :plain_note
)
ON CONFLICT (code) DO UPDATE SET
display_name = excluded.display_name,
platform_code = excluded.platform_code,
source_kind = excluded.source_kind,
status = excluded.status,
enabled = excluded.enabled,
allowed_input_methods_json = excluded.allowed_input_methods_json,
quality_policy_json = excluded.quality_policy_json,
plain_note = excluded.plain_note,
updated_at = CURRENT_TIMESTAMP
"""), payload)
def _fetch_legacy_momo_reference_rows(conn, limit: int) -> list[dict[str, Any]]:
if not all(_has_table(conn, table) for table in {"competitor_prices", "products", "price_records"}):
return []
if conn.dialect.name == "postgresql":
sql = """
WITH valid_cp AS (
SELECT DISTINCT ON (cp.sku, cp.competitor_product_id)
cp.sku AS momo_sku,
cp.competitor_product_id AS pchome_product_id,
cp.competitor_product_name AS pchome_product_name,
cp.match_score,
cp.tags::text AS tags,
cp.crawled_at,
cp.expires_at
FROM competitor_prices cp
WHERE cp.source = 'pchome'
AND cp.sku IS NOT NULL
AND cp.competitor_product_id IS NOT NULL
AND cp.price IS NOT NULL
AND cp.price > 0
AND COALESCE(cp.match_score, 0) >= 0.76
AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
ORDER BY cp.sku, cp.competitor_product_id, cp.crawled_at DESC NULLS LAST
)
SELECT
vc.*,
lm.momo_name,
lm.product_url,
lm.image_url,
lm.momo_price,
lm.momo_price_at
FROM valid_cp vc
JOIN LATERAL (
SELECT
p.name AS momo_name,
p.url AS product_url,
p.image_url AS image_url,
pr.price AS momo_price,
pr.timestamp AS momo_price_at
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.i_code = vc.momo_sku
AND p.status = 'ACTIVE'
AND pr.price IS NOT NULL
AND pr.price > 0
ORDER BY pr.timestamp DESC, pr.id DESC
LIMIT 1
) lm ON TRUE
ORDER BY COALESCE(lm.momo_price_at, vc.crawled_at) DESC NULLS LAST
LIMIT :limit
"""
else:
sql = """
WITH latest_cp AS (
SELECT
cp.sku AS momo_sku,
cp.competitor_product_id AS pchome_product_id,
cp.competitor_product_name AS pchome_product_name,
cp.match_score,
cp.tags AS tags,
cp.crawled_at,
cp.expires_at,
ROW_NUMBER() OVER (
PARTITION BY cp.sku, cp.competitor_product_id
ORDER BY cp.crawled_at DESC
) AS rn
FROM competitor_prices cp
WHERE cp.source = 'pchome'
AND cp.sku IS NOT NULL
AND cp.competitor_product_id IS NOT NULL
AND cp.price IS NOT NULL
AND cp.price > 0
AND COALESCE(cp.match_score, 0) >= 0.76
AND COALESCE(cp.tags, '') LIKE '%identity_v2%'
),
latest_momo AS (
SELECT
p.i_code AS momo_sku,
p.name AS momo_name,
p.url AS product_url,
p.image_url AS image_url,
pr.price AS momo_price,
pr.timestamp AS momo_price_at,
ROW_NUMBER() OVER (
PARTITION BY p.i_code
ORDER BY pr.timestamp DESC, pr.id DESC
) AS rn
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.status = 'ACTIVE'
AND pr.price IS NOT NULL
AND pr.price > 0
)
SELECT
cp.momo_sku,
cp.pchome_product_id,
cp.pchome_product_name,
cp.match_score,
cp.tags,
cp.crawled_at,
cp.expires_at,
lm.momo_name,
lm.product_url,
lm.image_url,
lm.momo_price,
lm.momo_price_at
FROM latest_cp cp
JOIN latest_momo lm ON lm.momo_sku = cp.momo_sku AND lm.rn = 1
WHERE cp.rn = 1
ORDER BY COALESCE(lm.momo_price_at, cp.crawled_at) DESC
LIMIT :limit
"""
return [dict(row) for row in conn.execute(text(sql), {"limit": limit}).mappings().all()]
def _legacy_row_to_external_offer(row: dict[str, Any]) -> dict[str, Any]:
momo_sku = str(row.get("momo_sku") or "").strip()
pchome_product_id = str(row.get("pchome_product_id") or "").strip()
observed_at = row.get("momo_price_at") or row.get("crawled_at")
title = str(row.get("momo_name") or row.get("pchome_product_name") or momo_sku).strip()
quality_score = _quality_score_from_match(row.get("match_score"))
notes = [
"由已確認同款的舊比價快取自動同步",
"MOMO 最新價格作為外部參考價",
]
return {
"source_code": "momo_reference",
"platform_code": "momo",
"source_product_id": momo_sku,
"source_offer_key": f"momo_reference:{momo_sku}:{pchome_product_id}",
"title": title or momo_sku,
"brand": None,
"category_text": None,
"product_url": row.get("product_url"),
"image_url": row.get("image_url"),
"price": _to_float(row.get("momo_price")),
"original_price": None,
"currency": "TWD",
"stock_status": None,
"sold_count": None,
"rating": None,
"review_count": None,
"observed_at": observed_at,
"expires_at": row.get("expires_at"),
"ingestion_method": "legacy_competitor_cache",
"connector_key": "competitor_prices",
"pchome_product_id": pchome_product_id,
"momo_sku": momo_sku,
"match_status": "verified",
"quality_score": quality_score,
"data_quality_status": "verified" if quality_score >= 76 else "needs_review",
"quality_notes_json": json.dumps(notes, ensure_ascii=False),
"raw_payload_json": json.dumps({
"legacy_source": "competitor_prices",
"match_score": str(row.get("match_score") or ""),
"tags": row.get("tags"),
"crawled_at": str(row.get("crawled_at") or ""),
"momo_price_at": str(row.get("momo_price_at") or ""),
}, ensure_ascii=False),
}
def _upsert_external_offer(conn, payload: dict[str, Any]) -> None:
if conn.dialect.name == "postgresql":
sql = """
INSERT INTO external_offers (
source_code, platform_code, source_product_id, source_offer_key, title,
brand, category_text, product_url, image_url, price, original_price,
currency, stock_status, sold_count, rating, review_count, observed_at,
expires_at, ingestion_method, connector_key, pchome_product_id, momo_sku,
match_status, quality_score, data_quality_status, quality_notes_json,
raw_payload_json
)
VALUES (
:source_code, :platform_code, :source_product_id, :source_offer_key, :title,
:brand, :category_text, :product_url, :image_url, :price, :original_price,
:currency, :stock_status, :sold_count, :rating, :review_count, :observed_at,
:expires_at, :ingestion_method, :connector_key, :pchome_product_id, :momo_sku,
:match_status, :quality_score, :data_quality_status, :quality_notes_json,
:raw_payload_json
)
ON CONFLICT ON CONSTRAINT uq_external_offer_source_product_observed DO UPDATE SET
source_offer_key = EXCLUDED.source_offer_key,
title = EXCLUDED.title,
product_url = EXCLUDED.product_url,
image_url = EXCLUDED.image_url,
price = EXCLUDED.price,
original_price = EXCLUDED.original_price,
expires_at = EXCLUDED.expires_at,
connector_key = EXCLUDED.connector_key,
pchome_product_id = EXCLUDED.pchome_product_id,
momo_sku = EXCLUDED.momo_sku,
match_status = EXCLUDED.match_status,
quality_score = EXCLUDED.quality_score,
data_quality_status = EXCLUDED.data_quality_status,
quality_notes_json = EXCLUDED.quality_notes_json,
raw_payload_json = EXCLUDED.raw_payload_json,
updated_at = NOW()
"""
else:
sql = """
INSERT INTO external_offers (
source_code, platform_code, source_product_id, source_offer_key, title,
brand, category_text, product_url, image_url, price, original_price,
currency, stock_status, sold_count, rating, review_count, observed_at,
expires_at, ingestion_method, connector_key, pchome_product_id, momo_sku,
match_status, quality_score, data_quality_status, quality_notes_json,
raw_payload_json
)
VALUES (
:source_code, :platform_code, :source_product_id, :source_offer_key, :title,
:brand, :category_text, :product_url, :image_url, :price, :original_price,
:currency, :stock_status, :sold_count, :rating, :review_count, :observed_at,
:expires_at, :ingestion_method, :connector_key, :pchome_product_id, :momo_sku,
:match_status, :quality_score, :data_quality_status, :quality_notes_json,
:raw_payload_json
)
ON CONFLICT (source_code, source_product_id, observed_at, ingestion_method) DO UPDATE SET
source_offer_key = excluded.source_offer_key,
title = excluded.title,
product_url = excluded.product_url,
image_url = excluded.image_url,
price = excluded.price,
original_price = excluded.original_price,
expires_at = excluded.expires_at,
connector_key = excluded.connector_key,
pchome_product_id = excluded.pchome_product_id,
momo_sku = excluded.momo_sku,
match_status = excluded.match_status,
quality_score = excluded.quality_score,
data_quality_status = excluded.data_quality_status,
quality_notes_json = excluded.quality_notes_json,
raw_payload_json = excluded.raw_payload_json,
updated_at = CURRENT_TIMESTAMP
"""
conn.execute(text(sql), payload)
def sync_legacy_momo_reference_offers(engine, *, limit: int = 500, dry_run: bool = False) -> dict[str, Any]:
"""把既有已確認同款的比價快取自動同步到 external_offers。"""
limit = max(1, min(int(limit or 500), 5000))
generated_at = datetime.now().isoformat(timespec="seconds")
required_tables = {"external_market_sources", "external_offers", "competitor_prices", "products", "price_records"}
with engine.begin() as conn:
missing_tables = sorted(table for table in required_tables if not _has_table(conn, table))
if missing_tables:
return {
"success": False,
"status": "skipped",
"generated_at": generated_at,
"candidate_count": 0,
"written_count": 0,
"dry_run": dry_run,
"message": "外部報價同步暫時無法執行,缺少必要資料表。",
"missing_tables": missing_tables,
}
_ensure_external_market_source_seeds(conn)
rows = _fetch_legacy_momo_reference_rows(conn, limit)
offers = [_legacy_row_to_external_offer(row) for row in rows]
offers = [
offer for offer in offers
if offer["source_product_id"] and offer["pchome_product_id"] and offer["price"]
]
if not dry_run:
for offer in offers:
_upsert_external_offer(conn, offer)
return {
"success": True,
"status": "dry_run" if dry_run else "synced",
"generated_at": generated_at,
"candidate_count": len(rows),
"written_count": 0 if dry_run else len(offers),
"dry_run": dry_run,
"source_code": "momo_reference",
"message": (
"已完成 MOMO 外部價格參考自動同步。"
if not dry_run
else "已完成 MOMO 外部價格參考同步預檢,尚未寫入資料。"
),
"sample_rows": [
{
"momo_sku": offer["momo_sku"],
"pchome_product_id": offer["pchome_product_id"],
"price": offer["price"],
"quality_score": offer["quality_score"],
}
for offer in offers[:5]
],
}
def _normalize_header(header: str) -> str:
cleaned = str(header or "").strip().replace("\ufeff", "")
for canonical, aliases in CSV_HEADER_ALIASES.items():