1886 lines
74 KiB
Python
1886 lines
74 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""外部市場來源與報價的正規化服務。
|
||
|
||
第一版只做資料規格、來源狀態與只讀統計,不主動抓資料、不寫 DB。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import csv
|
||
import io
|
||
import os
|
||
import re
|
||
from urllib.parse import quote
|
||
from urllib.request import Request, urlopen
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime, timedelta
|
||
from typing import Any
|
||
|
||
from sqlalchemy import inspect, text
|
||
|
||
from services.pchome_growth_cache_state import mark_pchome_growth_cache_stale
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
PCHOME_PUBLIC_PRODUCT_API = "https://ecapi.pchome.com.tw/ecshop/prodapi/v2/prod"
|
||
PCHOME_IMAGE_CDN_BASE = "https://cs-a.ecimg.tw"
|
||
PCHOME_PRODUCT_ID_RE = re.compile(r"^[A-Z0-9]{6}-[A-Z0-9]{9}-[0-9]{3}$")
|
||
|
||
|
||
SOURCE_CONTRACTS = [
|
||
{
|
||
"code": "momo_reference",
|
||
"display_name": "MOMO 外部價格參考",
|
||
"platform_code": "momo",
|
||
"status_code": "active",
|
||
"status_label": "正在使用",
|
||
"source_kind": "legacy_bridge",
|
||
"input_methods": ["既有比價快取", "手動 CSV", "供應商 API"],
|
||
"data_quality_label": "只採用已確認同款",
|
||
"plain_note": "目前用已確認同款的 MOMO 參考價,協助判斷 PChome 商品是否需要調整售價或曝光。",
|
||
},
|
||
{
|
||
"code": "shopee",
|
||
"display_name": "蝦皮",
|
||
"platform_code": "shopee",
|
||
"status_code": "paused",
|
||
"status_label": "先暫停",
|
||
"source_kind": "connector_contract",
|
||
"input_methods": ["官方 API", "供應商 API", "手動 CSV"],
|
||
"data_quality_label": "暫不進告警",
|
||
"plain_note": "先保留資料接口,等有穩定合法來源後再啟用,不會影響目前作戰清單。",
|
||
},
|
||
{
|
||
"code": "lazada",
|
||
"display_name": "Lazada",
|
||
"platform_code": "lazada",
|
||
"status_code": "paused",
|
||
"status_label": "待接入",
|
||
"source_kind": "connector_contract",
|
||
"input_methods": ["官方 API", "供應商 API", "手動 CSV"],
|
||
"data_quality_label": "暫不進告警",
|
||
"plain_note": "先建立跨境平台資料欄位,等合法穩定來源後才啟用價格與活動監控。",
|
||
},
|
||
{
|
||
"code": "amazon",
|
||
"display_name": "Amazon",
|
||
"platform_code": "amazon",
|
||
"status_code": "paused",
|
||
"status_label": "待接入",
|
||
"source_kind": "connector_contract",
|
||
"input_methods": ["官方 API", "供應商 API", "手動 CSV"],
|
||
"data_quality_label": "暫不進告警",
|
||
"plain_note": "保留 SKU / ASIN / Marketplace 維度,未接正式來源前不參與自動判斷。",
|
||
},
|
||
{
|
||
"code": "google_merchant",
|
||
"display_name": "Google Merchant / Shopping",
|
||
"platform_code": "google_merchant",
|
||
"status_code": "paused",
|
||
"status_label": "待接入",
|
||
"source_kind": "connector_contract",
|
||
"input_methods": ["官方報表", "官方 API", "手動 CSV"],
|
||
"data_quality_label": "暫不進告警",
|
||
"plain_note": "用於價格競爭力、商品資料完整度與成效 benchmark,待正式資料源接入。",
|
||
},
|
||
{
|
||
"code": "tiktok_shop",
|
||
"display_name": "TikTok Shop",
|
||
"platform_code": "tiktok_shop",
|
||
"status_code": "paused",
|
||
"status_label": "待接入",
|
||
"source_kind": "connector_contract",
|
||
"input_methods": ["官方 API", "供應商 API", "手動 CSV"],
|
||
"data_quality_label": "暫不進告警",
|
||
"plain_note": "保留短影音電商的商品、促銷與內容導流欄位;未接合法穩定來源前不參與自動判斷。",
|
||
},
|
||
{
|
||
"code": "line_shopping",
|
||
"display_name": "LINE 購物",
|
||
"platform_code": "line_shopping",
|
||
"status_code": "paused",
|
||
"status_label": "待接入",
|
||
"source_kind": "connector_contract",
|
||
"input_methods": ["官方報表", "供應商 API", "手動 CSV"],
|
||
"data_quality_label": "暫不進告警",
|
||
"plain_note": "保留本地導購、點數回饋與活動檔期訊號;接入前只列為待補來源。",
|
||
},
|
||
{
|
||
"code": "rakuten",
|
||
"display_name": "Rakuten",
|
||
"platform_code": "rakuten",
|
||
"status_code": "paused",
|
||
"status_label": "待接入",
|
||
"source_kind": "connector_contract",
|
||
"input_methods": ["官方 API", "供應商 API", "手動 CSV"],
|
||
"data_quality_label": "暫不進告警",
|
||
"plain_note": "保留日本與海外平台比價欄位,未確認來源前只列入待接入清單。",
|
||
},
|
||
{
|
||
"code": "yahoo_shopping",
|
||
"display_name": "Yahoo 購物",
|
||
"platform_code": "yahoo_shopping",
|
||
"status_code": "paused",
|
||
"status_label": "待接入",
|
||
"source_kind": "connector_contract",
|
||
"input_methods": ["官方 API", "供應商 API", "手動 CSV"],
|
||
"data_quality_label": "暫不進告警",
|
||
"plain_note": "保留本地平台促銷與價格監控欄位,接入前不影響現有作戰清單。",
|
||
},
|
||
{
|
||
"code": "ruten",
|
||
"display_name": "露天",
|
||
"platform_code": "ruten",
|
||
"status_code": "paused",
|
||
"status_label": "待接入",
|
||
"source_kind": "connector_contract",
|
||
"input_methods": ["官方 API", "供應商 API", "手動 CSV"],
|
||
"data_quality_label": "暫不進告警",
|
||
"plain_note": "保留拍賣與長尾賣場價格訊號;正式來源未確認前不進入自動告警。",
|
||
},
|
||
{
|
||
"code": "shopify_brand_store",
|
||
"display_name": "品牌官網 / Shopify",
|
||
"platform_code": "shopify_brand_store",
|
||
"status_code": "paused",
|
||
"status_label": "待接入",
|
||
"source_kind": "connector_contract",
|
||
"input_methods": ["官方 API", "商品 Feed", "手動 CSV"],
|
||
"data_quality_label": "暫不進告警",
|
||
"plain_note": "保留品牌官網售價、組合與活動檔期,用來和 PChome 主推品對照。",
|
||
},
|
||
{
|
||
"code": "meta_commerce",
|
||
"display_name": "Meta Commerce",
|
||
"platform_code": "meta_commerce",
|
||
"status_code": "paused",
|
||
"status_label": "待接入",
|
||
"source_kind": "connector_contract",
|
||
"input_methods": ["官方 Catalog API", "手動 CSV"],
|
||
"data_quality_label": "暫不進告警",
|
||
"plain_note": "保留廣告商品目錄與商品連結欄位,用於後續曝光與導流成效對照。",
|
||
},
|
||
{
|
||
"code": "coupang",
|
||
"display_name": "酷澎",
|
||
"platform_code": "coupang",
|
||
"status_code": "paused",
|
||
"status_label": "先暫停",
|
||
"source_kind": "connector_contract",
|
||
"input_methods": ["官方 API", "供應商 API", "手動 CSV"],
|
||
"data_quality_label": "暫不進告警",
|
||
"plain_note": "先保留資料接口,等有穩定合法來源後再啟用,不會影響目前作戰清單。",
|
||
},
|
||
]
|
||
|
||
NORMALIZED_OFFER_FIELDS = [
|
||
{
|
||
"name": "source_code",
|
||
"label": "資料來源",
|
||
"required": True,
|
||
"plain_note": "例如 momo_reference、shopee、coupang。",
|
||
},
|
||
{
|
||
"name": "source_product_id",
|
||
"label": "外部商品 ID",
|
||
"required": True,
|
||
"plain_note": "外部平台或資料供應商給的商品編號。",
|
||
},
|
||
{
|
||
"name": "title",
|
||
"label": "商品名稱",
|
||
"required": True,
|
||
"plain_note": "用來做人工確認與名稱比對。",
|
||
},
|
||
{
|
||
"name": "price",
|
||
"label": "售價",
|
||
"required": True,
|
||
"plain_note": "只填可直接比較的成交或頁面售價。",
|
||
},
|
||
{
|
||
"name": "observed_at",
|
||
"label": "資料時間",
|
||
"required": True,
|
||
"plain_note": "這筆價格看到的時間。",
|
||
},
|
||
{
|
||
"name": "ingestion_method",
|
||
"label": "取得方式",
|
||
"required": True,
|
||
"plain_note": "official_api、provider_api、manual_csv 或 legacy_competitor_cache。",
|
||
},
|
||
{
|
||
"name": "pchome_product_id",
|
||
"label": "PChome 商品 ID",
|
||
"required": False,
|
||
"plain_note": "若已確認同款才填,未確認就留空。",
|
||
},
|
||
{
|
||
"name": "quality_score",
|
||
"label": "資料可信度",
|
||
"required": False,
|
||
"plain_note": "0 到 100,低於 76 不進自動告警。",
|
||
},
|
||
]
|
||
|
||
CSV_HEADER_ALIASES = {
|
||
"source_code": {"source_code", "資料來源", "來源", "平台來源"},
|
||
"platform_code": {"platform_code", "平台", "平台代碼"},
|
||
"source_product_id": {
|
||
"source_product_id",
|
||
"外部商品 ID",
|
||
"外部商品ID",
|
||
"平台商品編號",
|
||
"商品ID",
|
||
"商品編號",
|
||
},
|
||
"title": {"title", "商品名稱", "品名", "name"},
|
||
"price": {"price", "售價", "價格", "成交價"},
|
||
"observed_at": {"observed_at", "資料時間", "抓取時間", "看到時間", "時間"},
|
||
"ingestion_method": {"ingestion_method", "取得方式", "匯入方式", "來源方式"},
|
||
"currency": {"currency", "幣別"},
|
||
"original_price": {"original_price", "原價", "牌價"},
|
||
"product_url": {"product_url", "商品網址", "網址", "url"},
|
||
"brand": {"brand", "品牌"},
|
||
"category_text": {"category_text", "分類", "類別"},
|
||
"pchome_product_id": {
|
||
"pchome_product_id",
|
||
"PChome 商品 ID",
|
||
"PChome商品ID",
|
||
"PChome商品編號",
|
||
"pchome_id",
|
||
},
|
||
"momo_sku": {"momo_sku", "MOMO SKU", "momo_sku", "momo_i_code"},
|
||
"match_status": {"match_status", "同款狀態", "比對狀態", "是否同款"},
|
||
"quality_score": {"quality_score", "資料可信度", "可信度", "品質分數"},
|
||
"data_quality_status": {"data_quality_status", "資料狀態", "品質狀態"},
|
||
"quality_note": {"quality_note", "備註", "品質備註"},
|
||
}
|
||
|
||
ALLOWED_SOURCE_CODES = {source["code"] for source in SOURCE_CONTRACTS}
|
||
PAUSED_SOURCE_CODES = {
|
||
source["code"] for source in SOURCE_CONTRACTS if source["status_code"] == "paused"
|
||
}
|
||
ACTIVE_SOURCE_CODES = {
|
||
source["code"] for source in SOURCE_CONTRACTS if source["status_code"] == "active"
|
||
}
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class ExternalOfferPayload:
|
||
source_code: str
|
||
platform_code: str
|
||
source_product_id: str
|
||
title: str
|
||
price: float | None
|
||
observed_at: str
|
||
ingestion_method: str
|
||
currency: str = "TWD"
|
||
original_price: float | None = None
|
||
product_url: str | None = None
|
||
brand: str | None = None
|
||
category_text: str | None = None
|
||
pchome_product_id: str | None = None
|
||
momo_sku: str | None = None
|
||
match_status: str = "unmatched"
|
||
quality_score: float = 0.0
|
||
data_quality_status: str = "needs_review"
|
||
quality_notes: list[str] = field(default_factory=list)
|
||
|
||
def to_record(self) -> dict[str, Any]:
|
||
return {
|
||
"source_code": self.source_code,
|
||
"platform_code": self.platform_code,
|
||
"source_product_id": self.source_product_id,
|
||
"source_offer_key": f"{self.source_code}:{self.source_product_id}",
|
||
"title": self.title,
|
||
"price": self.price,
|
||
"currency": self.currency or "TWD",
|
||
"original_price": self.original_price,
|
||
"product_url": self.product_url,
|
||
"brand": self.brand,
|
||
"category_text": self.category_text,
|
||
"observed_at": self.observed_at,
|
||
"ingestion_method": self.ingestion_method,
|
||
"pchome_product_id": self.pchome_product_id,
|
||
"momo_sku": self.momo_sku,
|
||
"match_status": self.match_status,
|
||
"quality_score": self.quality_score,
|
||
"data_quality_status": self.data_quality_status,
|
||
"quality_notes_json": json.dumps(self.quality_notes, ensure_ascii=False),
|
||
}
|
||
|
||
|
||
def _to_float(value: Any) -> float | None:
|
||
if value is None or value == "":
|
||
return None
|
||
try:
|
||
return float(value)
|
||
except (TypeError, ValueError):
|
||
return None
|
||
|
||
|
||
def _load_json_list(value: Any) -> list[Any]:
|
||
if not value:
|
||
return []
|
||
if isinstance(value, list):
|
||
return value
|
||
try:
|
||
parsed = json.loads(value)
|
||
return parsed if isinstance(parsed, list) else []
|
||
except Exception:
|
||
return []
|
||
|
||
|
||
def _load_json_dict(value: Any) -> dict[str, Any]:
|
||
if not value:
|
||
return {}
|
||
if isinstance(value, dict):
|
||
return value
|
||
try:
|
||
parsed = json.loads(value)
|
||
return parsed if isinstance(parsed, dict) else {}
|
||
except Exception:
|
||
return {}
|
||
|
||
|
||
def _looks_like_pchome_product_id(product_id: Any) -> bool:
|
||
return bool(PCHOME_PRODUCT_ID_RE.match(str(product_id or "").strip()))
|
||
|
||
|
||
def _absolute_pchome_image_url(path: Any) -> str:
|
||
image_path = str(path or "").strip()
|
||
if not image_path:
|
||
return ""
|
||
if image_path.startswith("http://") or image_path.startswith("https://"):
|
||
return image_path
|
||
if not image_path.startswith("/"):
|
||
image_path = f"/{image_path}"
|
||
return f"{PCHOME_IMAGE_CDN_BASE}{image_path}"
|
||
|
||
|
||
def _pchome_api_timeout_seconds() -> float:
|
||
try:
|
||
return max(1.0, min(float(os.getenv("PCHOME_PUBLIC_API_TIMEOUT_SECONDS", "4")), 10.0))
|
||
except (TypeError, ValueError):
|
||
return 4.0
|
||
|
||
|
||
def _parse_pchome_jsonp(payload: str) -> dict[str, Any]:
|
||
match = re.search(r"jsonp\((.*)\);\s*}\s*catch", payload or "", re.DOTALL)
|
||
if not match:
|
||
match = re.search(r"jsonp\((.*)\);", payload or "", re.DOTALL)
|
||
if not match:
|
||
return {}
|
||
parsed = json.loads(match.group(1))
|
||
return parsed if isinstance(parsed, dict) else {}
|
||
|
||
|
||
def _fetch_pchome_public_image_map(product_ids: list[str]) -> dict[str, str]:
|
||
"""Fetch PChome product images from the public product API without blocking the list."""
|
||
ids = [
|
||
str(product_id or "").strip()
|
||
for product_id in product_ids
|
||
if _looks_like_pchome_product_id(product_id)
|
||
]
|
||
ids = list(dict.fromkeys(ids))
|
||
if not ids:
|
||
return {}
|
||
|
||
url = (
|
||
f"{PCHOME_PUBLIC_PRODUCT_API}"
|
||
f"?id={quote(','.join(ids), safe=',')}"
|
||
"&fields=Id,Name,Nick,Pic,Price"
|
||
"&_callback=jsonp"
|
||
)
|
||
try:
|
||
request = Request(url, headers={"User-Agent": "Mozilla/5.0"})
|
||
with urlopen(request, timeout=_pchome_api_timeout_seconds()) as response:
|
||
raw = response.read(512000).decode("utf-8", "ignore")
|
||
products = _parse_pchome_jsonp(raw)
|
||
except Exception as exc:
|
||
logger.warning("[ExternalOffer] PChome image API failed: %s", exc)
|
||
return {}
|
||
|
||
image_map: dict[str, str] = {}
|
||
for product_id, product in products.items():
|
||
if not isinstance(product, dict):
|
||
continue
|
||
pic = product.get("Pic") if isinstance(product.get("Pic"), dict) else {}
|
||
image_url = _absolute_pchome_image_url(pic.get("B") or pic.get("S") or pic.get("W"))
|
||
if image_url:
|
||
image_map[str(product_id)] = image_url
|
||
return image_map
|
||
|
||
|
||
_PCHOME_PRODUCT_URL_BASE = "https://24h.pchome.com.tw/prod/"
|
||
|
||
_REVIEW_REASON_LABELS = {
|
||
"makeup_catalog_selection_gap": "色號或款式需確認",
|
||
"variant_selection_review": "款式、色號或組合需確認",
|
||
"focused_exact_identity_ysl_blush_catalog": "品名接近,請確認色號",
|
||
"strong_product_line_match": "商品系列接近",
|
||
"strong_exact_spec_match": "規格看起來接近",
|
||
"identity_review": "同款證據待確認",
|
||
"manual_review": "需要人工比對賣場",
|
||
"unit_price_review": "容量或單位價需確認",
|
||
"unit_price_gap": "容量或單位價需確認",
|
||
"catalog_selection_gap": "任選或型錄款需確認",
|
||
"commercial_condition_gap": "活動條件或組合內容需確認",
|
||
"bundle_review": "組合件數需確認",
|
||
"count_review": "件數需確認",
|
||
}
|
||
|
||
|
||
def _build_pchome_product_url(product_id: Any) -> str | None:
|
||
product_id = str(product_id or "").strip()
|
||
if not product_id:
|
||
return None
|
||
if product_id.startswith(("http://", "https://")):
|
||
return product_id
|
||
return f"{_PCHOME_PRODUCT_URL_BASE}{product_id}"
|
||
|
||
|
||
def _review_reason_label(reason: Any) -> str:
|
||
key = str(reason or "").strip()
|
||
if not key:
|
||
return ""
|
||
label = _REVIEW_REASON_LABELS.get(key)
|
||
if label:
|
||
return label
|
||
lowered = key.lower()
|
||
if any(token in lowered for token in ("variant", "catalog", "selection", "color", "shade")):
|
||
return "款式、色號或組合需確認"
|
||
if any(token in lowered for token in ("unit", "capacity", "spec")):
|
||
return "容量或規格需確認"
|
||
if any(token in lowered for token in ("bundle", "count", "set")):
|
||
return "組合或件數需確認"
|
||
if any(token in lowered for token in ("exact", "identity", "match")):
|
||
return "品名或規格接近,仍需人工確認"
|
||
if any(token in lowered for token in ("gap", "conflict", "condition")):
|
||
return "候選資訊有差異,請比對賣場"
|
||
return "候選需要人工確認"
|
||
|
||
|
||
def _humanize_review_reasons(reasons: list[Any]) -> list[str]:
|
||
labels: list[str] = []
|
||
for reason in reasons:
|
||
label = _review_reason_label(reason)
|
||
if label and label not in labels:
|
||
labels.append(label)
|
||
return labels[:3] or ["請比對兩個賣場的品名、容量、色號與組合"]
|
||
|
||
|
||
def _has_table(conn, table_name: str) -> bool:
|
||
try:
|
||
return inspect(conn).has_table(table_name)
|
||
except Exception:
|
||
logger.warning("[ExternalMarket] table probe failed: %s", table_name, exc_info=True)
|
||
return False
|
||
|
||
|
||
def _quality_score_from_match(value: Any) -> float:
|
||
score = _to_float(value)
|
||
if score is None:
|
||
return 0.0
|
||
if 0 <= score <= 1:
|
||
score *= 100
|
||
return max(0.0, min(100.0, score))
|
||
|
||
|
||
def _normalize_source_code(value: Any) -> str:
|
||
raw = str(value or "").strip()
|
||
lower = raw.lower()
|
||
mapping = {
|
||
"momo": "momo_reference",
|
||
"momo_reference": "momo_reference",
|
||
"momo 外部價格參考": "momo_reference",
|
||
"蝦皮": "shopee",
|
||
"shopee": "shopee",
|
||
"酷澎": "coupang",
|
||
"coupang": "coupang",
|
||
}
|
||
return mapping.get(lower, mapping.get(raw, raw))
|
||
|
||
|
||
def _normalize_ingestion_method(value: Any) -> str:
|
||
raw = str(value or "").strip()
|
||
lower = raw.lower()
|
||
mapping = {
|
||
"備用資料": "manual_csv",
|
||
"備援資料": "manual_csv",
|
||
"手動資料": "manual_csv",
|
||
"手動 csv": "manual_csv",
|
||
"手動CSV": "manual_csv",
|
||
"manual_csv": "manual_csv",
|
||
"官方 api": "official_api",
|
||
"官方API": "official_api",
|
||
"official_api": "official_api",
|
||
"供應商 api": "provider_api",
|
||
"供應商API": "provider_api",
|
||
"provider_api": "provider_api",
|
||
}
|
||
return mapping.get(lower, mapping.get(raw, raw))
|
||
|
||
|
||
def _normalize_match_status(value: Any) -> str:
|
||
raw = str(value or "").strip()
|
||
lower = raw.lower()
|
||
mapping = {
|
||
"是": "verified",
|
||
"已確認": "verified",
|
||
"確認同款": "verified",
|
||
"同款": "verified",
|
||
"verified": "verified",
|
||
"否": "unmatched",
|
||
"不是": "unmatched",
|
||
"非同款": "unmatched",
|
||
"待確認": "unmatched",
|
||
"未確認": "unmatched",
|
||
"unmatched": "unmatched",
|
||
}
|
||
return mapping.get(lower, mapping.get(raw, raw))
|
||
|
||
|
||
def normalize_external_offer_payload(payload: dict[str, Any]) -> tuple[ExternalOfferPayload | None, list[str]]:
|
||
"""把 official API / provider API / manual CSV 的資料轉成同一份欄位。"""
|
||
errors: list[str] = []
|
||
source_code = _normalize_source_code(payload.get("source_code"))
|
||
platform_code = str(payload.get("platform_code") or source_code or "").strip()
|
||
source_product_id = str(payload.get("source_product_id") or "").strip()
|
||
title = str(payload.get("title") or payload.get("name") or "").strip()
|
||
ingestion_method = _normalize_ingestion_method(payload.get("ingestion_method"))
|
||
observed_at = str(payload.get("observed_at") or "").strip()
|
||
price = _to_float(payload.get("price"))
|
||
|
||
required_values = {
|
||
"資料來源": source_code,
|
||
"外部商品 ID": source_product_id,
|
||
"商品名稱": title,
|
||
"售價": price,
|
||
"資料時間": observed_at,
|
||
"取得方式": ingestion_method,
|
||
}
|
||
for label, value in required_values.items():
|
||
if value is None or value == "":
|
||
errors.append(f"缺少{label}")
|
||
|
||
if price is not None and price <= 0:
|
||
errors.append("售價必須大於 0")
|
||
|
||
if observed_at:
|
||
try:
|
||
datetime.fromisoformat(observed_at.replace("Z", "+00:00"))
|
||
except ValueError:
|
||
errors.append("資料時間格式需為 ISO 格式,例如 2026-06-15T10:00:00")
|
||
|
||
if errors:
|
||
return None, errors
|
||
|
||
quality_score = _to_float(payload.get("quality_score"))
|
||
if quality_score is None:
|
||
quality_score = 0.0
|
||
quality_notes = _load_json_list(payload.get("quality_notes"))
|
||
if not quality_notes and payload.get("quality_note"):
|
||
quality_notes = [str(payload.get("quality_note"))]
|
||
|
||
record = ExternalOfferPayload(
|
||
source_code=source_code,
|
||
platform_code=platform_code,
|
||
source_product_id=source_product_id,
|
||
title=title,
|
||
price=price,
|
||
observed_at=observed_at,
|
||
ingestion_method=ingestion_method,
|
||
currency=str(payload.get("currency") or "TWD").strip() or "TWD",
|
||
original_price=_to_float(payload.get("original_price")),
|
||
product_url=payload.get("product_url"),
|
||
brand=payload.get("brand"),
|
||
category_text=payload.get("category_text"),
|
||
pchome_product_id=payload.get("pchome_product_id"),
|
||
momo_sku=payload.get("momo_sku"),
|
||
match_status=_normalize_match_status(payload.get("match_status") or "unmatched"),
|
||
quality_score=max(0.0, min(100.0, quality_score)),
|
||
data_quality_status=str(payload.get("data_quality_status") or "needs_review"),
|
||
quality_notes=[str(item) for item in quality_notes],
|
||
)
|
||
return record, []
|
||
|
||
|
||
def _ensure_external_market_source_seeds(conn) -> None:
|
||
if not _has_table(conn, "external_market_sources"):
|
||
return
|
||
|
||
for source in SOURCE_CONTRACTS:
|
||
payload = {
|
||
"code": source["code"],
|
||
"display_name": source["display_name"],
|
||
"platform_code": source["platform_code"],
|
||
"source_kind": source["source_kind"],
|
||
"status": source["status_code"],
|
||
"enabled": source["status_code"] == "active",
|
||
"write_enabled": False,
|
||
"allowed_input_methods_json": json.dumps(source["input_methods"], ensure_ascii=False),
|
||
"quality_policy_json": json.dumps({
|
||
"minimum_match_status": "verified",
|
||
"minimum_quality_score": 76,
|
||
}, ensure_ascii=False),
|
||
"plain_note": source["plain_note"],
|
||
}
|
||
if conn.dialect.name == "postgresql":
|
||
conn.execute(text("""
|
||
INSERT INTO external_market_sources (
|
||
code, display_name, platform_code, source_kind, status, enabled,
|
||
write_enabled, allowed_input_methods_json, quality_policy_json, plain_note
|
||
)
|
||
VALUES (
|
||
:code, :display_name, :platform_code, :source_kind, :status, :enabled,
|
||
:write_enabled, :allowed_input_methods_json, :quality_policy_json, :plain_note
|
||
)
|
||
ON CONFLICT (code) DO UPDATE SET
|
||
display_name = EXCLUDED.display_name,
|
||
platform_code = EXCLUDED.platform_code,
|
||
source_kind = EXCLUDED.source_kind,
|
||
status = EXCLUDED.status,
|
||
enabled = EXCLUDED.enabled,
|
||
allowed_input_methods_json = EXCLUDED.allowed_input_methods_json,
|
||
quality_policy_json = EXCLUDED.quality_policy_json,
|
||
plain_note = EXCLUDED.plain_note,
|
||
updated_at = NOW()
|
||
"""), payload)
|
||
else:
|
||
conn.execute(text("""
|
||
INSERT INTO external_market_sources (
|
||
code, display_name, platform_code, source_kind, status, enabled,
|
||
write_enabled, allowed_input_methods_json, quality_policy_json, plain_note
|
||
)
|
||
VALUES (
|
||
:code, :display_name, :platform_code, :source_kind, :status, :enabled,
|
||
:write_enabled, :allowed_input_methods_json, :quality_policy_json, :plain_note
|
||
)
|
||
ON CONFLICT (code) DO UPDATE SET
|
||
display_name = excluded.display_name,
|
||
platform_code = excluded.platform_code,
|
||
source_kind = excluded.source_kind,
|
||
status = excluded.status,
|
||
enabled = excluded.enabled,
|
||
allowed_input_methods_json = excluded.allowed_input_methods_json,
|
||
quality_policy_json = excluded.quality_policy_json,
|
||
plain_note = excluded.plain_note,
|
||
updated_at = CURRENT_TIMESTAMP
|
||
"""), payload)
|
||
|
||
|
||
def _fetch_legacy_momo_reference_rows(conn, limit: int) -> list[dict[str, Any]]:
|
||
if not all(_has_table(conn, table) for table in {"competitor_prices", "products", "price_records"}):
|
||
return []
|
||
|
||
if conn.dialect.name == "postgresql":
|
||
sql = """
|
||
WITH valid_cp AS (
|
||
SELECT DISTINCT ON (cp.sku, cp.competitor_product_id)
|
||
cp.sku AS momo_sku,
|
||
cp.competitor_product_id AS pchome_product_id,
|
||
cp.competitor_product_name AS pchome_product_name,
|
||
cp.price AS pchome_public_price,
|
||
cp.match_score,
|
||
cp.tags::text AS tags,
|
||
cp.crawled_at,
|
||
cp.expires_at
|
||
FROM competitor_prices cp
|
||
WHERE cp.source = 'pchome'
|
||
AND cp.sku IS NOT NULL
|
||
AND cp.competitor_product_id IS NOT NULL
|
||
AND cp.price IS NOT NULL
|
||
AND cp.price > 0
|
||
AND COALESCE(cp.match_score, 0) >= 0.76
|
||
AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
|
||
AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
|
||
ORDER BY cp.sku, cp.competitor_product_id, cp.crawled_at DESC NULLS LAST
|
||
)
|
||
SELECT
|
||
vc.*,
|
||
lm.momo_name,
|
||
lm.product_url,
|
||
lm.image_url,
|
||
lm.momo_price,
|
||
lm.momo_price_at
|
||
FROM valid_cp vc
|
||
JOIN LATERAL (
|
||
SELECT
|
||
p.name AS momo_name,
|
||
p.url AS product_url,
|
||
p.image_url AS image_url,
|
||
pr.price AS momo_price,
|
||
pr.timestamp AS momo_price_at
|
||
FROM products p
|
||
JOIN price_records pr ON pr.product_id = p.id
|
||
WHERE p.i_code = vc.momo_sku
|
||
AND p.status = 'ACTIVE'
|
||
AND pr.price IS NOT NULL
|
||
AND pr.price > 0
|
||
ORDER BY pr.timestamp DESC, pr.id DESC
|
||
LIMIT 1
|
||
) lm ON TRUE
|
||
ORDER BY COALESCE(lm.momo_price_at, vc.crawled_at) DESC NULLS LAST
|
||
LIMIT :limit
|
||
"""
|
||
else:
|
||
sql = """
|
||
WITH latest_cp AS (
|
||
SELECT
|
||
cp.sku AS momo_sku,
|
||
cp.competitor_product_id AS pchome_product_id,
|
||
cp.competitor_product_name AS pchome_product_name,
|
||
cp.price AS pchome_public_price,
|
||
cp.match_score,
|
||
cp.tags AS tags,
|
||
cp.crawled_at,
|
||
cp.expires_at,
|
||
ROW_NUMBER() OVER (
|
||
PARTITION BY cp.sku, cp.competitor_product_id
|
||
ORDER BY cp.crawled_at DESC
|
||
) AS rn
|
||
FROM competitor_prices cp
|
||
WHERE cp.source = 'pchome'
|
||
AND cp.sku IS NOT NULL
|
||
AND cp.competitor_product_id IS NOT NULL
|
||
AND cp.price IS NOT NULL
|
||
AND cp.price > 0
|
||
AND COALESCE(cp.match_score, 0) >= 0.76
|
||
AND COALESCE(cp.tags, '') LIKE '%identity_v2%'
|
||
),
|
||
latest_momo AS (
|
||
SELECT
|
||
p.i_code AS momo_sku,
|
||
p.name AS momo_name,
|
||
p.url AS product_url,
|
||
p.image_url AS image_url,
|
||
pr.price AS momo_price,
|
||
pr.timestamp AS momo_price_at,
|
||
ROW_NUMBER() OVER (
|
||
PARTITION BY p.i_code
|
||
ORDER BY pr.timestamp DESC, pr.id DESC
|
||
) AS rn
|
||
FROM products p
|
||
JOIN price_records pr ON pr.product_id = p.id
|
||
WHERE p.status = 'ACTIVE'
|
||
AND pr.price IS NOT NULL
|
||
AND pr.price > 0
|
||
)
|
||
SELECT
|
||
cp.momo_sku,
|
||
cp.pchome_product_id,
|
||
cp.pchome_product_name,
|
||
cp.match_score,
|
||
cp.tags,
|
||
cp.crawled_at,
|
||
cp.expires_at,
|
||
lm.momo_name,
|
||
lm.product_url,
|
||
lm.image_url,
|
||
lm.momo_price,
|
||
lm.momo_price_at
|
||
FROM latest_cp cp
|
||
JOIN latest_momo lm ON lm.momo_sku = cp.momo_sku AND lm.rn = 1
|
||
WHERE cp.rn = 1
|
||
ORDER BY COALESCE(lm.momo_price_at, cp.crawled_at) DESC
|
||
LIMIT :limit
|
||
"""
|
||
|
||
return [dict(row) for row in conn.execute(text(sql), {"limit": limit}).mappings().all()]
|
||
|
||
|
||
def _legacy_row_to_external_offer(row: dict[str, Any]) -> dict[str, Any]:
|
||
momo_sku = str(row.get("momo_sku") or "").strip()
|
||
pchome_product_id = str(row.get("pchome_product_id") or "").strip()
|
||
observed_at = row.get("momo_price_at") or row.get("crawled_at")
|
||
title = str(row.get("momo_name") or row.get("pchome_product_name") or momo_sku).strip()
|
||
quality_score = _quality_score_from_match(row.get("match_score"))
|
||
notes = [
|
||
"由已確認同款的舊比價快取自動同步",
|
||
"MOMO 最新價格作為外部參考價",
|
||
]
|
||
return {
|
||
"source_code": "momo_reference",
|
||
"platform_code": "momo",
|
||
"source_product_id": momo_sku,
|
||
"source_offer_key": f"momo_reference:{momo_sku}:{pchome_product_id}",
|
||
"title": title or momo_sku,
|
||
"brand": None,
|
||
"category_text": None,
|
||
"product_url": row.get("product_url"),
|
||
"image_url": row.get("image_url"),
|
||
"price": _to_float(row.get("momo_price")),
|
||
"original_price": None,
|
||
"currency": "TWD",
|
||
"stock_status": None,
|
||
"sold_count": None,
|
||
"rating": None,
|
||
"review_count": None,
|
||
"observed_at": observed_at,
|
||
"expires_at": row.get("expires_at"),
|
||
"ingestion_method": "legacy_competitor_cache",
|
||
"connector_key": "competitor_prices",
|
||
"pchome_product_id": pchome_product_id,
|
||
"momo_sku": momo_sku,
|
||
"match_status": "verified",
|
||
"quality_score": quality_score,
|
||
"data_quality_status": "verified" if quality_score >= 76 else "needs_review",
|
||
"quality_notes_json": json.dumps(notes, ensure_ascii=False),
|
||
"raw_payload_json": json.dumps({
|
||
"legacy_source": "competitor_prices",
|
||
"pchome_public_price": _to_float(row.get("pchome_public_price")),
|
||
"pchome_public_name": row.get("pchome_product_name"),
|
||
"match_score": str(row.get("match_score") or ""),
|
||
"tags": row.get("tags"),
|
||
"crawled_at": str(row.get("crawled_at") or ""),
|
||
"momo_price_at": str(row.get("momo_price_at") or ""),
|
||
}, ensure_ascii=False),
|
||
}
|
||
|
||
|
||
def _upsert_external_offer(conn, payload: dict[str, Any]) -> None:
|
||
if conn.dialect.name == "postgresql":
|
||
sql = """
|
||
INSERT INTO external_offers (
|
||
source_code, platform_code, source_product_id, source_offer_key, title,
|
||
brand, category_text, product_url, image_url, price, original_price,
|
||
currency, stock_status, sold_count, rating, review_count, observed_at,
|
||
expires_at, ingestion_method, connector_key, pchome_product_id, momo_sku,
|
||
match_status, quality_score, data_quality_status, quality_notes_json,
|
||
raw_payload_json
|
||
)
|
||
VALUES (
|
||
:source_code, :platform_code, :source_product_id, :source_offer_key, :title,
|
||
:brand, :category_text, :product_url, :image_url, :price, :original_price,
|
||
:currency, :stock_status, :sold_count, :rating, :review_count, :observed_at,
|
||
:expires_at, :ingestion_method, :connector_key, :pchome_product_id, :momo_sku,
|
||
:match_status, :quality_score, :data_quality_status, :quality_notes_json,
|
||
:raw_payload_json
|
||
)
|
||
ON CONFLICT ON CONSTRAINT uq_external_offer_source_product_observed DO UPDATE SET
|
||
source_offer_key = EXCLUDED.source_offer_key,
|
||
title = EXCLUDED.title,
|
||
product_url = EXCLUDED.product_url,
|
||
image_url = EXCLUDED.image_url,
|
||
price = EXCLUDED.price,
|
||
original_price = EXCLUDED.original_price,
|
||
expires_at = EXCLUDED.expires_at,
|
||
connector_key = EXCLUDED.connector_key,
|
||
pchome_product_id = EXCLUDED.pchome_product_id,
|
||
momo_sku = EXCLUDED.momo_sku,
|
||
match_status = EXCLUDED.match_status,
|
||
quality_score = EXCLUDED.quality_score,
|
||
data_quality_status = EXCLUDED.data_quality_status,
|
||
quality_notes_json = EXCLUDED.quality_notes_json,
|
||
raw_payload_json = EXCLUDED.raw_payload_json,
|
||
updated_at = NOW()
|
||
"""
|
||
else:
|
||
sql = """
|
||
INSERT INTO external_offers (
|
||
source_code, platform_code, source_product_id, source_offer_key, title,
|
||
brand, category_text, product_url, image_url, price, original_price,
|
||
currency, stock_status, sold_count, rating, review_count, observed_at,
|
||
expires_at, ingestion_method, connector_key, pchome_product_id, momo_sku,
|
||
match_status, quality_score, data_quality_status, quality_notes_json,
|
||
raw_payload_json
|
||
)
|
||
VALUES (
|
||
:source_code, :platform_code, :source_product_id, :source_offer_key, :title,
|
||
:brand, :category_text, :product_url, :image_url, :price, :original_price,
|
||
:currency, :stock_status, :sold_count, :rating, :review_count, :observed_at,
|
||
:expires_at, :ingestion_method, :connector_key, :pchome_product_id, :momo_sku,
|
||
:match_status, :quality_score, :data_quality_status, :quality_notes_json,
|
||
:raw_payload_json
|
||
)
|
||
ON CONFLICT (source_code, source_product_id, observed_at, ingestion_method) DO UPDATE SET
|
||
source_offer_key = excluded.source_offer_key,
|
||
title = excluded.title,
|
||
product_url = excluded.product_url,
|
||
image_url = excluded.image_url,
|
||
price = excluded.price,
|
||
original_price = excluded.original_price,
|
||
expires_at = excluded.expires_at,
|
||
connector_key = excluded.connector_key,
|
||
pchome_product_id = excluded.pchome_product_id,
|
||
momo_sku = excluded.momo_sku,
|
||
match_status = excluded.match_status,
|
||
quality_score = excluded.quality_score,
|
||
data_quality_status = excluded.data_quality_status,
|
||
quality_notes_json = excluded.quality_notes_json,
|
||
raw_payload_json = excluded.raw_payload_json,
|
||
updated_at = CURRENT_TIMESTAMP
|
||
"""
|
||
conn.execute(text(sql), payload)
|
||
|
||
|
||
def _targeted_candidate_auto_type(candidate: dict[str, Any]) -> str:
|
||
explicit_type = str(candidate.get("auto_compare_type") or "").strip()
|
||
if explicit_type:
|
||
return explicit_type
|
||
if candidate.get("can_auto_compare"):
|
||
return "total_price"
|
||
return "manual_review"
|
||
|
||
|
||
def _targeted_candidate_needs_review(candidate: dict[str, Any]) -> bool:
|
||
"""總價自動同步前的最後防線,避免高分但款式待確認的候選進作戰清單。"""
|
||
if candidate.get("target_hard_veto") is True:
|
||
return True
|
||
price_basis = str(candidate.get("target_price_basis") or "").strip()
|
||
alert_tier = str(candidate.get("target_alert_tier") or "").strip()
|
||
if price_basis and price_basis != "total_price":
|
||
return True
|
||
if alert_tier and alert_tier != "price_alert_exact":
|
||
return True
|
||
review_reason_markers = {
|
||
"manual_review",
|
||
"identity_review",
|
||
"unit_price_review",
|
||
"variant_selection_review",
|
||
"variant_option_conflict",
|
||
"variant_descriptor_conflict",
|
||
"makeup_catalog_selection_gap",
|
||
"commercial_condition_gap",
|
||
"count_conflict",
|
||
"bundle_offer_conflict",
|
||
"multi_component_conflict",
|
||
"component_count_conflict",
|
||
}
|
||
reasons = {str(reason or "") for reason in (candidate.get("target_match_reasons") or [])}
|
||
return bool(reasons & review_reason_markers)
|
||
|
||
|
||
def _targeted_candidate_sync_rank(candidate: dict[str, Any]) -> tuple[float, float, float, float]:
|
||
"""同一個 PChome 商品有多個候選時,挑最適合營運判斷的一筆。"""
|
||
auto_type = _targeted_candidate_auto_type(candidate)
|
||
type_rank = 3.0 if auto_type == "total_price" else 2.0 if auto_type == "unit_price" else 0.0
|
||
try:
|
||
match_score = float(candidate.get("target_match_score") or 0.0)
|
||
except (TypeError, ValueError):
|
||
match_score = 0.0
|
||
|
||
unit_price_comparison = (
|
||
candidate.get("target_unit_price_comparison")
|
||
if isinstance(candidate.get("target_unit_price_comparison"), dict)
|
||
else {}
|
||
)
|
||
momo_total = _to_float(unit_price_comparison.get("momo_total_quantity")) or 0.0
|
||
pchome_total = _to_float(unit_price_comparison.get("competitor_total_quantity")) or 0.0
|
||
quantity_delta = 999999.0
|
||
same_quantity = 0.0
|
||
if momo_total > 0 and pchome_total > 0:
|
||
quantity_delta = abs(momo_total - pchome_total)
|
||
same_quantity = 1.0 if quantity_delta <= 0.0001 else 0.0
|
||
|
||
return (type_rank, same_quantity, -quantity_delta, match_score)
|
||
|
||
|
||
def _targeted_candidate_to_external_offer(
|
||
candidate: dict[str, Any],
|
||
*,
|
||
observed_at: datetime,
|
||
) -> tuple[dict[str, Any] | None, str]:
|
||
auto_type = _targeted_candidate_auto_type(candidate)
|
||
if auto_type not in {"total_price", "unit_price"}:
|
||
return None, "不是可自動使用的候選"
|
||
if auto_type == "total_price" and _targeted_candidate_needs_review(candidate):
|
||
return None, "候選仍需人工確認"
|
||
|
||
momo_sku = str(candidate.get("product_id") or candidate.get("goodsCode") or candidate.get("id") or "").strip()
|
||
pchome_product_id = str(candidate.get("target_pchome_product_id") or "").strip()
|
||
momo_price = _to_float(candidate.get("price"))
|
||
pchome_price = _to_float(candidate.get("target_pchome_price"))
|
||
if not momo_sku:
|
||
return None, "缺少 MOMO 商品 ID"
|
||
if not pchome_product_id:
|
||
return None, "缺少 PChome 商品 ID"
|
||
if not momo_price or momo_price <= 0:
|
||
return None, "缺少 MOMO 售價"
|
||
|
||
unit_price_comparison = (
|
||
candidate.get("target_unit_price_comparison")
|
||
if isinstance(candidate.get("target_unit_price_comparison"), dict)
|
||
else {}
|
||
)
|
||
is_unit_price = auto_type == "unit_price"
|
||
if is_unit_price and not unit_price_comparison.get("comparable"):
|
||
return None, "單位價證據不足"
|
||
|
||
match_score = _quality_score_from_match(candidate.get("target_match_score"))
|
||
if is_unit_price:
|
||
quality_score = max(match_score, 82.0)
|
||
price_basis = "unit_price"
|
||
else:
|
||
quality_score = match_score
|
||
price_basis = "total_price"
|
||
if quality_score < 76:
|
||
return None, "同款分數低於自動同步門檻"
|
||
|
||
title = str(candidate.get("name") or candidate.get("title") or momo_sku).strip()
|
||
notes = [
|
||
"由 PChome 商品自動反查 MOMO 候選同步",
|
||
"自動單位價比較" if is_unit_price else "可直接總價比價",
|
||
]
|
||
raw_payload = {
|
||
"source": "pchome_targeted_momo_search",
|
||
"auto_compare_type": auto_type,
|
||
"price_basis": price_basis,
|
||
"pchome_public_price": pchome_price,
|
||
"pchome_public_name": candidate.get("target_pchome_name"),
|
||
"match_score": candidate.get("target_match_score"),
|
||
"match_reasons": candidate.get("target_match_reasons") or [],
|
||
"comparison_mode": candidate.get("target_comparison_mode"),
|
||
"hard_veto": bool(candidate.get("target_hard_veto")),
|
||
"target_gap_pct": candidate.get("target_gap_pct"),
|
||
"unit_price_comparison": unit_price_comparison,
|
||
"search_term": candidate.get("target_search_term"),
|
||
"tags": [
|
||
"identity_v2",
|
||
"source_targeted_momo_search",
|
||
f"price_basis_{price_basis}",
|
||
f"auto_compare_{auto_type}",
|
||
],
|
||
}
|
||
return {
|
||
"source_code": "momo_reference",
|
||
"platform_code": "momo",
|
||
"source_product_id": momo_sku,
|
||
"source_offer_key": f"momo_reference:{momo_sku}:{pchome_product_id}:{price_basis}",
|
||
"title": title or momo_sku,
|
||
"brand": candidate.get("brand"),
|
||
"category_text": candidate.get("category") or candidate.get("category_text"),
|
||
"product_url": candidate.get("product_url") or candidate.get("url"),
|
||
"image_url": candidate.get("image_url"),
|
||
"price": momo_price,
|
||
"original_price": _to_float(candidate.get("original_price")),
|
||
"currency": "TWD",
|
||
"stock_status": None,
|
||
"sold_count": None,
|
||
"rating": None,
|
||
"review_count": None,
|
||
"observed_at": observed_at,
|
||
"expires_at": None,
|
||
"ingestion_method": "targeted_momo_search",
|
||
"connector_key": "pchome_targeted_momo_search",
|
||
"pchome_product_id": pchome_product_id,
|
||
"momo_sku": momo_sku,
|
||
"match_status": "verified",
|
||
"quality_score": round(quality_score, 2),
|
||
"data_quality_status": "verified",
|
||
"quality_notes_json": json.dumps(notes, ensure_ascii=False),
|
||
"raw_payload_json": json.dumps(raw_payload, ensure_ascii=False),
|
||
}, ""
|
||
|
||
|
||
def _targeted_review_candidate_to_external_offer(
|
||
candidate: dict[str, Any],
|
||
*,
|
||
observed_at: datetime,
|
||
) -> tuple[dict[str, Any] | None, str]:
|
||
"""保存待人工確認候選;這類資料不得進價格判斷。"""
|
||
alert_tier = str(candidate.get("target_alert_tier") or "").strip()
|
||
match_type = str(candidate.get("target_match_type") or "").strip()
|
||
if alert_tier not in {"identity_review", "unit_price_review"} and match_type in {"", "no_match"}:
|
||
return None, "不是可人工確認候選"
|
||
|
||
momo_sku = str(candidate.get("product_id") or candidate.get("goodsCode") or candidate.get("id") or "").strip()
|
||
pchome_product_id = str(candidate.get("target_pchome_product_id") or "").strip()
|
||
momo_price = _to_float(candidate.get("price"))
|
||
if not momo_sku:
|
||
return None, "缺少 MOMO 商品 ID"
|
||
if not pchome_product_id:
|
||
return None, "缺少 PChome 商品 ID"
|
||
if not momo_price or momo_price <= 0:
|
||
return None, "缺少 MOMO 售價"
|
||
|
||
match_score = _quality_score_from_match(candidate.get("target_match_score"))
|
||
if match_score < 60:
|
||
return None, "人工確認候選分數過低"
|
||
|
||
unit_price_comparison = (
|
||
candidate.get("target_unit_price_comparison")
|
||
if isinstance(candidate.get("target_unit_price_comparison"), dict)
|
||
else {}
|
||
)
|
||
title = str(candidate.get("name") or candidate.get("title") or momo_sku).strip()
|
||
price_basis = str(candidate.get("target_price_basis") or "manual_review").strip()
|
||
raw_payload = {
|
||
"source": "pchome_targeted_momo_search",
|
||
"review_state": "needs_review",
|
||
"auto_compare_type": "manual_review",
|
||
"price_basis": price_basis,
|
||
"pchome_public_price": _to_float(candidate.get("target_pchome_price")),
|
||
"pchome_public_name": candidate.get("target_pchome_name"),
|
||
"match_score": candidate.get("target_match_score"),
|
||
"match_reasons": candidate.get("target_match_reasons") or [],
|
||
"comparison_mode": candidate.get("target_comparison_mode"),
|
||
"match_type": match_type,
|
||
"alert_tier": alert_tier,
|
||
"hard_veto": bool(candidate.get("target_hard_veto")),
|
||
"target_gap_pct": candidate.get("target_gap_pct"),
|
||
"unit_price_comparison": unit_price_comparison,
|
||
"search_term": candidate.get("target_search_term"),
|
||
"tags": [
|
||
"identity_v2",
|
||
"source_targeted_momo_search",
|
||
"needs_review",
|
||
f"review_{alert_tier or 'identity_review'}",
|
||
],
|
||
}
|
||
return {
|
||
"source_code": "momo_reference",
|
||
"platform_code": "momo",
|
||
"source_product_id": momo_sku,
|
||
"source_offer_key": f"momo_reference:{momo_sku}:{pchome_product_id}:needs_review",
|
||
"title": title or momo_sku,
|
||
"brand": candidate.get("brand"),
|
||
"category_text": candidate.get("category") or candidate.get("category_text"),
|
||
"product_url": candidate.get("product_url") or candidate.get("url"),
|
||
"image_url": candidate.get("image_url"),
|
||
"price": momo_price,
|
||
"original_price": _to_float(candidate.get("original_price")),
|
||
"currency": "TWD",
|
||
"stock_status": None,
|
||
"sold_count": None,
|
||
"rating": None,
|
||
"review_count": None,
|
||
"observed_at": observed_at,
|
||
"expires_at": None,
|
||
"ingestion_method": "targeted_momo_review",
|
||
"connector_key": "pchome_targeted_momo_search_review",
|
||
"pchome_product_id": pchome_product_id,
|
||
"momo_sku": momo_sku,
|
||
"match_status": "needs_review",
|
||
"quality_score": round(match_score, 2),
|
||
"data_quality_status": "needs_review",
|
||
"quality_notes_json": json.dumps(["候選已找到,需人工確認同款或色號"], ensure_ascii=False),
|
||
"raw_payload_json": json.dumps(raw_payload, ensure_ascii=False),
|
||
}, ""
|
||
|
||
|
||
def sync_targeted_momo_candidates_to_external_offers(
|
||
engine,
|
||
candidates: list[dict[str, Any]],
|
||
*,
|
||
dry_run: bool = False,
|
||
) -> dict[str, Any]:
|
||
"""把頁面自動找到的安全 MOMO 候選同步進 external_offers。
|
||
|
||
只接受 total_price 與 unit_price 自動候選;人工確認候選不寫入。
|
||
"""
|
||
generated_at = datetime.now().isoformat(timespec="seconds")
|
||
candidates = list(candidates or [])
|
||
required_tables = {"external_market_sources", "external_offers"}
|
||
|
||
with engine.begin() as conn:
|
||
missing_tables = sorted(table for table in required_tables if not _has_table(conn, table))
|
||
if missing_tables:
|
||
return {
|
||
"success": False,
|
||
"status": "skipped",
|
||
"generated_at": generated_at,
|
||
"candidate_count": len(candidates),
|
||
"written_count": 0,
|
||
"dry_run": dry_run,
|
||
"message": "外部報價同步暫時無法執行,缺少必要資料表。",
|
||
"missing_tables": missing_tables,
|
||
}
|
||
|
||
_ensure_external_market_source_seeds(conn)
|
||
base_observed_at = datetime.now()
|
||
ranked_offers: list[tuple[dict[str, Any], tuple[float, float, float, float]]] = []
|
||
skipped_reasons: dict[str, int] = {}
|
||
for index, candidate in enumerate(candidates):
|
||
offer, reason = _targeted_candidate_to_external_offer(
|
||
candidate,
|
||
observed_at=base_observed_at + timedelta(microseconds=index),
|
||
)
|
||
if offer:
|
||
ranked_offers.append((offer, _targeted_candidate_sync_rank(candidate)))
|
||
else:
|
||
skipped_reasons[reason] = skipped_reasons.get(reason, 0) + 1
|
||
selected_by_pchome: dict[str, tuple[dict[str, Any], tuple[float, float, float, float]]] = {}
|
||
for offer, rank in ranked_offers:
|
||
key = str(offer.get("pchome_product_id") or offer.get("source_offer_key") or "").strip()
|
||
existing = selected_by_pchome.get(key)
|
||
if existing is None or rank > existing[1]:
|
||
selected_by_pchome[key] = (offer, rank)
|
||
offers = [offer for offer, _ in selected_by_pchome.values()]
|
||
|
||
if not dry_run:
|
||
for offer in offers:
|
||
_upsert_external_offer(conn, offer)
|
||
if offers:
|
||
mark_pchome_growth_cache_stale()
|
||
|
||
unit_count = sum(
|
||
1
|
||
for offer in offers
|
||
if _load_json_dict(offer.get("raw_payload_json")).get("price_basis") == "unit_price"
|
||
)
|
||
total_count = len(offers) - unit_count
|
||
return {
|
||
"success": True,
|
||
"status": "dry_run" if dry_run else "synced",
|
||
"generated_at": generated_at,
|
||
"candidate_count": len(candidates),
|
||
"written_count": 0 if dry_run else len(offers),
|
||
"dry_run": dry_run,
|
||
"source_code": "momo_reference",
|
||
"total_price_count": total_count,
|
||
"unit_price_count": unit_count,
|
||
"skipped_reasons": skipped_reasons,
|
||
"message": (
|
||
"已把自動比價候選同步到外部價格參考。"
|
||
if not dry_run
|
||
else "已完成自動比價候選同步預檢,尚未寫入資料。"
|
||
),
|
||
}
|
||
|
||
|
||
def sync_targeted_momo_review_candidates_to_external_offers(
|
||
engine,
|
||
candidates: list[dict[str, Any]],
|
||
*,
|
||
dry_run: bool = False,
|
||
) -> dict[str, Any]:
|
||
"""把待確認 MOMO 候選保存起來,供人工審核;不進價格告警。"""
|
||
generated_at = datetime.now().isoformat(timespec="seconds")
|
||
candidates = list(candidates or [])
|
||
required_tables = {"external_market_sources", "external_offers"}
|
||
|
||
with engine.begin() as conn:
|
||
missing_tables = sorted(table for table in required_tables if not _has_table(conn, table))
|
||
if missing_tables:
|
||
return {
|
||
"success": False,
|
||
"status": "skipped",
|
||
"generated_at": generated_at,
|
||
"candidate_count": len(candidates),
|
||
"written_count": 0,
|
||
"dry_run": dry_run,
|
||
"message": "待確認候選暫時無法保存,缺少必要資料表。",
|
||
"missing_tables": missing_tables,
|
||
}
|
||
|
||
_ensure_external_market_source_seeds(conn)
|
||
base_observed_at = datetime.now()
|
||
ranked_offers: list[tuple[dict[str, Any], tuple[float, float, float, float]]] = []
|
||
skipped_reasons: dict[str, int] = {}
|
||
for index, candidate in enumerate(candidates):
|
||
offer, reason = _targeted_review_candidate_to_external_offer(
|
||
candidate,
|
||
observed_at=base_observed_at + timedelta(microseconds=index),
|
||
)
|
||
if offer:
|
||
ranked_offers.append((offer, _targeted_candidate_sync_rank(candidate)))
|
||
else:
|
||
skipped_reasons[reason] = skipped_reasons.get(reason, 0) + 1
|
||
|
||
selected_by_pchome: dict[str, tuple[dict[str, Any], tuple[float, float, float, float]]] = {}
|
||
for offer, rank in ranked_offers:
|
||
key = str(offer.get("pchome_product_id") or offer.get("source_offer_key") or "").strip()
|
||
existing = selected_by_pchome.get(key)
|
||
if existing is None or rank > existing[1]:
|
||
selected_by_pchome[key] = (offer, rank)
|
||
offers = [offer for offer, _ in selected_by_pchome.values()]
|
||
|
||
if not dry_run:
|
||
for offer in offers:
|
||
_upsert_external_offer(conn, offer)
|
||
if offers:
|
||
mark_pchome_growth_cache_stale()
|
||
|
||
return {
|
||
"success": True,
|
||
"status": "dry_run" if dry_run else "synced",
|
||
"generated_at": generated_at,
|
||
"candidate_count": len(candidates),
|
||
"written_count": 0 if dry_run else len(offers),
|
||
"dry_run": dry_run,
|
||
"source_code": "momo_reference",
|
||
"skipped_reasons": skipped_reasons,
|
||
"message": (
|
||
"已保存待人工確認的 MOMO 候選。"
|
||
if not dry_run
|
||
else "已完成待確認候選預檢,尚未寫入資料。"
|
||
),
|
||
}
|
||
|
||
|
||
def sync_legacy_momo_reference_offers(engine, *, limit: int = 500, dry_run: bool = False) -> dict[str, Any]:
|
||
"""把既有已確認同款的比價快取自動同步到 external_offers。"""
|
||
limit = max(1, min(int(limit or 500), 5000))
|
||
generated_at = datetime.now().isoformat(timespec="seconds")
|
||
required_tables = {"external_market_sources", "external_offers", "competitor_prices", "products", "price_records"}
|
||
|
||
with engine.begin() as conn:
|
||
missing_tables = sorted(table for table in required_tables if not _has_table(conn, table))
|
||
if missing_tables:
|
||
return {
|
||
"success": False,
|
||
"status": "skipped",
|
||
"generated_at": generated_at,
|
||
"candidate_count": 0,
|
||
"written_count": 0,
|
||
"dry_run": dry_run,
|
||
"message": "外部報價同步暫時無法執行,缺少必要資料表。",
|
||
"missing_tables": missing_tables,
|
||
}
|
||
|
||
_ensure_external_market_source_seeds(conn)
|
||
rows = _fetch_legacy_momo_reference_rows(conn, limit)
|
||
offers = [_legacy_row_to_external_offer(row) for row in rows]
|
||
offers = [
|
||
offer for offer in offers
|
||
if offer["source_product_id"] and offer["pchome_product_id"] and offer["price"]
|
||
]
|
||
|
||
if not dry_run:
|
||
for offer in offers:
|
||
_upsert_external_offer(conn, offer)
|
||
if offers:
|
||
mark_pchome_growth_cache_stale()
|
||
|
||
return {
|
||
"success": True,
|
||
"status": "dry_run" if dry_run else "synced",
|
||
"generated_at": generated_at,
|
||
"candidate_count": len(rows),
|
||
"written_count": 0 if dry_run else len(offers),
|
||
"dry_run": dry_run,
|
||
"source_code": "momo_reference",
|
||
"message": (
|
||
"已完成 MOMO 外部價格參考自動同步。"
|
||
if not dry_run
|
||
else "已完成 MOMO 外部價格參考同步預檢,尚未寫入資料。"
|
||
),
|
||
"sample_rows": [
|
||
{
|
||
"momo_sku": offer["momo_sku"],
|
||
"pchome_product_id": offer["pchome_product_id"],
|
||
"price": offer["price"],
|
||
"quality_score": offer["quality_score"],
|
||
}
|
||
for offer in offers[:5]
|
||
],
|
||
}
|
||
|
||
|
||
def _normalize_header(header: str) -> str:
|
||
cleaned = str(header or "").strip().replace("\ufeff", "")
|
||
for canonical, aliases in CSV_HEADER_ALIASES.items():
|
||
if cleaned in aliases:
|
||
return canonical
|
||
return cleaned
|
||
|
||
|
||
def _read_csv_rows(csv_text: str, limit: int) -> tuple[list[dict[str, Any]], list[str]]:
|
||
text_value = (csv_text or "").strip("\ufeff\n\r ")
|
||
if not text_value:
|
||
return [], ["CSV 內容是空的"]
|
||
|
||
sample = text_value[:4096]
|
||
try:
|
||
dialect = csv.Sniffer().sniff(sample, delimiters=",\t;")
|
||
except csv.Error:
|
||
dialect = csv.excel
|
||
|
||
reader = csv.DictReader(io.StringIO(text_value), dialect=dialect)
|
||
if not reader.fieldnames:
|
||
return [], ["找不到表頭列"]
|
||
|
||
raw_headers = [str(header or "").strip().replace("\ufeff", "") for header in reader.fieldnames]
|
||
normalized_headers = [_normalize_header(header) for header in raw_headers]
|
||
if len(set(normalized_headers)) != len(normalized_headers):
|
||
return [], ["表頭有重複欄位,請先合併或重新命名"]
|
||
|
||
rows = []
|
||
for index, raw_row in enumerate(reader, start=2):
|
||
if len(rows) >= limit:
|
||
break
|
||
normalized = {}
|
||
has_value = False
|
||
for raw_header, normalized_header in zip(raw_headers, normalized_headers):
|
||
value = raw_row.get(raw_header)
|
||
if value is not None and str(value).strip():
|
||
has_value = True
|
||
normalized[normalized_header] = str(value or "").strip()
|
||
if has_value:
|
||
normalized["_row_number"] = index
|
||
rows.append(normalized)
|
||
|
||
return rows, []
|
||
|
||
|
||
def _classify_offer_record(record: ExternalOfferPayload | None, errors: list[str]) -> dict[str, Any]:
|
||
if errors or record is None:
|
||
return {
|
||
"status_code": "blocked",
|
||
"status_label": "不能使用",
|
||
"can_enter_alerts": False,
|
||
"reasons": errors or ["資料格式需要修正"],
|
||
}
|
||
|
||
reasons: list[str] = []
|
||
source_code = record.source_code
|
||
match_status = (record.match_status or "").strip().lower()
|
||
is_verified_match = match_status in {"verified", "usable", "reviewed", "exact", "confirmed"}
|
||
has_pchome_id = bool(str(record.pchome_product_id or "").strip())
|
||
has_good_quality = record.quality_score >= 76
|
||
|
||
if source_code not in ALLOWED_SOURCE_CODES:
|
||
reasons.append("資料來源不在允許清單")
|
||
if source_code in PAUSED_SOURCE_CODES:
|
||
reasons.append("這個來源目前先暫停,不進告警")
|
||
if not is_verified_match:
|
||
reasons.append("尚未確認同款")
|
||
if not has_pchome_id:
|
||
reasons.append("缺少 PChome 商品 ID,無法連到業績")
|
||
if not has_good_quality:
|
||
reasons.append("資料可信度低於 76")
|
||
|
||
can_use = (
|
||
source_code in ACTIVE_SOURCE_CODES
|
||
and is_verified_match
|
||
and has_pchome_id
|
||
and has_good_quality
|
||
and not reasons
|
||
)
|
||
if can_use:
|
||
return {
|
||
"status_code": "ready",
|
||
"status_label": "可使用",
|
||
"can_enter_alerts": True,
|
||
"reasons": ["可進作戰清單"],
|
||
}
|
||
|
||
return {
|
||
"status_code": "review",
|
||
"status_label": "需人工確認",
|
||
"can_enter_alerts": False,
|
||
"reasons": reasons or ["需要人工確認"],
|
||
}
|
||
|
||
|
||
def dry_run_external_offer_csv(csv_text: str, *, limit: int = 200) -> dict[str, Any]:
|
||
"""檢查手動 CSV 是否能轉成外部報價格式;只讀,不寫 DB。"""
|
||
limit = max(1, min(int(limit or 200), 1000))
|
||
rows, parse_errors = _read_csv_rows(csv_text, limit=limit)
|
||
if parse_errors:
|
||
return {
|
||
"success": False,
|
||
"message": "CSV 預檢失敗,請先修正檔案格式。",
|
||
"summary": {
|
||
"total_rows": 0,
|
||
"ready_count": 0,
|
||
"review_count": 0,
|
||
"blocked_count": 0,
|
||
},
|
||
"errors": parse_errors,
|
||
"rows": [],
|
||
}
|
||
|
||
checked_rows = []
|
||
summary = {
|
||
"total_rows": len(rows),
|
||
"ready_count": 0,
|
||
"review_count": 0,
|
||
"blocked_count": 0,
|
||
}
|
||
for row in rows:
|
||
record, errors = normalize_external_offer_payload(row)
|
||
classification = _classify_offer_record(record, errors)
|
||
summary[f"{classification['status_code']}_count"] += 1
|
||
preview = record.to_record() if record else {}
|
||
checked_rows.append({
|
||
"row_number": row.get("_row_number"),
|
||
"status_code": classification["status_code"],
|
||
"status_label": classification["status_label"],
|
||
"can_enter_alerts": classification["can_enter_alerts"],
|
||
"reasons": classification["reasons"][:4],
|
||
"source_code": preview.get("source_code") or row.get("source_code") or "",
|
||
"source_product_id": preview.get("source_product_id") or row.get("source_product_id") or "",
|
||
"title": preview.get("title") or row.get("title") or "",
|
||
"price": preview.get("price"),
|
||
"pchome_product_id": preview.get("pchome_product_id") or "",
|
||
"quality_score": preview.get("quality_score") if preview else row.get("quality_score"),
|
||
})
|
||
|
||
return {
|
||
"success": True,
|
||
"message": "CSV 預檢完成,尚未寫入資料。",
|
||
"summary": summary,
|
||
"errors": [],
|
||
"rows": checked_rows,
|
||
"manual_csv": build_connector_contracts()["manual_csv"],
|
||
}
|
||
|
||
|
||
def _legacy_momo_reference_stats(conn) -> dict[str, Any]:
|
||
if not _has_table(conn, "competitor_prices"):
|
||
return {"usable_offer_count": 0, "last_seen_at": None}
|
||
|
||
if conn.dialect.name == "postgresql":
|
||
sql = """
|
||
SELECT COUNT(*) AS usable_offer_count, MAX(crawled_at) AS last_seen_at
|
||
FROM competitor_prices
|
||
WHERE source = 'pchome'
|
||
AND competitor_product_id IS NOT NULL
|
||
AND price IS NOT NULL
|
||
AND price > 0
|
||
AND COALESCE(match_score, 0) >= 0.76
|
||
AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)
|
||
AND COALESCE(tags, '[]'::jsonb) ? 'identity_v2'
|
||
"""
|
||
else:
|
||
sql = """
|
||
SELECT COUNT(*) AS usable_offer_count, MAX(crawled_at) AS last_seen_at
|
||
FROM competitor_prices
|
||
WHERE source = 'pchome'
|
||
AND competitor_product_id IS NOT NULL
|
||
AND price IS NOT NULL
|
||
AND price > 0
|
||
AND COALESCE(match_score, 0) >= 0.76
|
||
AND COALESCE(tags, '') LIKE '%identity_v2%'
|
||
"""
|
||
row = conn.execute(text(sql)).mappings().first() or {}
|
||
return {
|
||
"usable_offer_count": int(row.get("usable_offer_count") or 0),
|
||
"last_seen_at": str(row.get("last_seen_at") or "") or None,
|
||
}
|
||
|
||
|
||
def _normalized_offer_stats(conn) -> dict[str, dict[str, Any]]:
|
||
if not all(_has_table(conn, table) for table in {"external_market_sources", "external_offers"}):
|
||
return {}
|
||
|
||
rows = conn.execute(text("""
|
||
SELECT
|
||
s.code,
|
||
s.status,
|
||
s.enabled,
|
||
COUNT(o.id) AS offer_count,
|
||
SUM(CASE
|
||
WHEN o.data_quality_status IN ('verified', 'usable', 'reviewed')
|
||
AND COALESCE(o.quality_score, 0) >= 76
|
||
THEN 1 ELSE 0 END
|
||
) AS usable_offer_count,
|
||
SUM(CASE
|
||
WHEN o.match_status = 'needs_review'
|
||
OR o.data_quality_status = 'needs_review'
|
||
THEN 1 ELSE 0 END
|
||
) AS review_offer_count,
|
||
MAX(o.observed_at) AS last_seen_at
|
||
FROM external_market_sources s
|
||
LEFT JOIN external_offers o ON o.source_code = s.code
|
||
GROUP BY s.code, s.status, s.enabled
|
||
""")).mappings().all()
|
||
|
||
return {
|
||
str(row["code"]): {
|
||
"status": row.get("status"),
|
||
"enabled": bool(row.get("enabled")),
|
||
"offer_count": int(row.get("offer_count") or 0),
|
||
"usable_offer_count": int(row.get("usable_offer_count") or 0),
|
||
"review_offer_count": int(row.get("review_offer_count") or 0),
|
||
"last_seen_at": str(row.get("last_seen_at") or "") or None,
|
||
}
|
||
for row in rows
|
||
}
|
||
|
||
|
||
def build_connector_contracts() -> dict[str, Any]:
|
||
"""回傳 connector 與手動 CSV 共同遵守的欄位規格。"""
|
||
return {
|
||
"success": True,
|
||
"version": "2026-06-15",
|
||
"plain_summary": "所有外部市場資料都先轉成同一份商品報價格式,再進作戰清單。",
|
||
"sources": SOURCE_CONTRACTS,
|
||
"normalized_offer_fields": NORMALIZED_OFFER_FIELDS,
|
||
"manual_csv": {
|
||
"encoding": "utf-8-sig",
|
||
"required_headers": [
|
||
field["name"] for field in NORMALIZED_OFFER_FIELDS if field["required"]
|
||
],
|
||
"optional_headers": [
|
||
field["name"] for field in NORMALIZED_OFFER_FIELDS if not field["required"]
|
||
],
|
||
"plain_rule": "低可信度或未確認同款的資料只進待補資料清單,不自動發告警。",
|
||
},
|
||
}
|
||
|
||
|
||
def build_external_source_readiness(engine=None) -> dict[str, Any]:
|
||
"""建立畫面可用的外部資料來源狀態。"""
|
||
sources = [dict(source) for source in SOURCE_CONTRACTS]
|
||
normalized_stats: dict[str, dict[str, Any]] = {}
|
||
legacy_stats: dict[str, Any] = {"usable_offer_count": 0, "last_seen_at": None}
|
||
schema_ready = False
|
||
|
||
if engine is not None:
|
||
try:
|
||
with engine.connect() as conn:
|
||
schema_ready = all(
|
||
_has_table(conn, table)
|
||
for table in {"external_market_sources", "external_offers"}
|
||
)
|
||
normalized_stats = _normalized_offer_stats(conn)
|
||
legacy_stats = _legacy_momo_reference_stats(conn)
|
||
except Exception:
|
||
logger.warning("[ExternalMarket] source readiness failed", exc_info=True)
|
||
|
||
for source in sources:
|
||
stats = normalized_stats.get(source["code"], {})
|
||
if source["code"] == "momo_reference":
|
||
usable = max(
|
||
int(stats.get("usable_offer_count") or 0),
|
||
int(legacy_stats.get("usable_offer_count") or 0),
|
||
)
|
||
last_seen_at = stats.get("last_seen_at") or legacy_stats.get("last_seen_at")
|
||
else:
|
||
usable = int(stats.get("usable_offer_count") or 0)
|
||
last_seen_at = stats.get("last_seen_at")
|
||
|
||
source["schema_ready"] = schema_ready
|
||
source["usable_offer_count"] = usable
|
||
source["review_offer_count"] = int(stats.get("review_offer_count") or 0)
|
||
source["last_seen_at"] = last_seen_at
|
||
source["can_alert"] = source["status_code"] == "active" and usable > 0
|
||
if source["status_code"] == "active":
|
||
if usable:
|
||
source["plain_state"] = "已接入,可進作戰清單"
|
||
elif source["review_offer_count"]:
|
||
source["plain_state"] = "已有待確認候選"
|
||
else:
|
||
source["plain_state"] = "已接入,等待可用資料"
|
||
else:
|
||
source["plain_state"] = "先保留接口,不進告警"
|
||
|
||
active_count = sum(1 for source in sources if source["status_code"] == "active")
|
||
paused_count = sum(1 for source in sources if source["status_code"] == "paused")
|
||
usable_count = sum(int(source["usable_offer_count"]) for source in sources)
|
||
review_offer_count = sum(int(source.get("review_offer_count") or 0) for source in sources)
|
||
|
||
return {
|
||
"success": True,
|
||
"schema_ready": schema_ready,
|
||
"active_count": active_count,
|
||
"paused_count": paused_count,
|
||
"usable_offer_count": usable_count,
|
||
"review_offer_count": review_offer_count,
|
||
"sources": sources,
|
||
"connector_contract": build_connector_contracts(),
|
||
"plain_summary": "MOMO 先用;其他主流平台已列管,未接合法穩定來源前不進告警。",
|
||
}
|
||
|
||
|
||
def list_momo_review_candidates(engine, *, limit: int = 20) -> dict[str, Any]:
|
||
"""列出待人工確認的 MOMO 候選,供前台直接處理。"""
|
||
limit = max(1, min(int(limit or 20), 50))
|
||
generated_at = datetime.now().isoformat(timespec="seconds")
|
||
required_tables = {"external_offers"}
|
||
|
||
with engine.connect() as conn:
|
||
missing_tables = sorted(table for table in required_tables if not _has_table(conn, table))
|
||
if missing_tables:
|
||
return {
|
||
"success": False,
|
||
"generated_at": generated_at,
|
||
"rows": [],
|
||
"count": 0,
|
||
"missing_tables": missing_tables,
|
||
"message": "待確認候選暫時無法讀取,缺少必要資料表。",
|
||
}
|
||
|
||
rows = conn.execute(text("""
|
||
SELECT
|
||
id,
|
||
source_product_id,
|
||
title,
|
||
product_url,
|
||
image_url,
|
||
price,
|
||
pchome_product_id,
|
||
momo_sku,
|
||
match_status,
|
||
quality_score,
|
||
data_quality_status,
|
||
quality_notes_json,
|
||
raw_payload_json,
|
||
observed_at,
|
||
updated_at
|
||
FROM external_offers
|
||
WHERE source_code = 'momo_reference'
|
||
AND ingestion_method = 'targeted_momo_review'
|
||
AND (
|
||
match_status = 'needs_review'
|
||
OR data_quality_status = 'needs_review'
|
||
)
|
||
ORDER BY observed_at DESC, id DESC
|
||
LIMIT :limit
|
||
"""), {"limit": limit * 4}).mappings().all()
|
||
|
||
pchome_image_map = _fetch_pchome_public_image_map([
|
||
str(row.get("pchome_product_id") or "").strip()
|
||
for row in rows
|
||
])
|
||
|
||
seen: set[tuple[str, str]] = set()
|
||
items: list[dict[str, Any]] = []
|
||
for row in rows:
|
||
raw_payload = _load_json_dict(row.get("raw_payload_json"))
|
||
quality_notes = _load_json_list(row.get("quality_notes_json"))
|
||
key = (
|
||
str(row.get("pchome_product_id") or "").strip(),
|
||
str(row.get("source_product_id") or "").strip(),
|
||
)
|
||
if key in seen:
|
||
continue
|
||
seen.add(key)
|
||
|
||
reasons = [
|
||
str(reason)
|
||
for reason in (raw_payload.get("match_reasons") or [])
|
||
if str(reason or "").strip()
|
||
]
|
||
if not reasons:
|
||
reasons = [str(note) for note in quality_notes if str(note or "").strip()]
|
||
reason_labels = _humanize_review_reasons(reasons)
|
||
pchome_product_id = row.get("pchome_product_id")
|
||
momo_url = row.get("product_url")
|
||
pchome_price = _to_float(raw_payload.get("pchome_public_price"))
|
||
momo_price = _to_float(row.get("price"))
|
||
gap_pct = _to_float(raw_payload.get("target_gap_pct"))
|
||
pchome_image_url = (
|
||
raw_payload.get("pchome_image_url")
|
||
or raw_payload.get("pchome_public_image_url")
|
||
or pchome_image_map.get(str(pchome_product_id or "").strip())
|
||
or ""
|
||
)
|
||
momo_image_url = row.get("image_url")
|
||
|
||
items.append({
|
||
"id": int(row.get("id")),
|
||
"pchome_product_id": pchome_product_id,
|
||
"pchome_product_name": raw_payload.get("pchome_public_name") or "",
|
||
"pchome_url": _build_pchome_product_url(pchome_product_id),
|
||
"pchome_image_url": pchome_image_url,
|
||
"pchome_price": pchome_price,
|
||
"momo_sku": row.get("momo_sku") or row.get("source_product_id"),
|
||
"momo_title": row.get("title"),
|
||
"momo_price": momo_price,
|
||
"momo_url": momo_url,
|
||
"product_url": momo_url,
|
||
"image_url": momo_image_url,
|
||
"momo_image_url": momo_image_url,
|
||
"quality_score": round(_to_float(row.get("quality_score")) or 0.0, 2),
|
||
"alert_tier": raw_payload.get("alert_tier") or "identity_review",
|
||
"price_basis": raw_payload.get("price_basis") or "manual_review",
|
||
"gap_pct": gap_pct,
|
||
"match_reasons": reason_labels,
|
||
"match_reason_labels": reason_labels,
|
||
"reason_summary": "、".join(reason_labels),
|
||
"observed_at": str(row.get("observed_at") or ""),
|
||
"updated_at": str(row.get("updated_at") or ""),
|
||
"plain_status": "待確認同款或色號",
|
||
"suggested_next_action": "確認同款後才進入價格判斷;不是同款就排除。",
|
||
})
|
||
if len(items) >= limit:
|
||
break
|
||
|
||
return {
|
||
"success": True,
|
||
"generated_at": generated_at,
|
||
"rows": items,
|
||
"count": len(items),
|
||
"message": "已整理 MOMO 待確認候選。",
|
||
}
|
||
|
||
|
||
def update_momo_review_candidate(engine, offer_id: int, action: str, *, note: str = "") -> dict[str, Any]:
|
||
"""確認或排除一筆 MOMO 待確認候選。"""
|
||
try:
|
||
offer_id = int(offer_id)
|
||
except (TypeError, ValueError):
|
||
return {"success": False, "message": "缺少有效的候選編號。"}
|
||
action = str(action or "").strip().lower()
|
||
if action not in {"confirm", "reject"}:
|
||
return {"success": False, "message": "請選擇確認同款或排除候選。"}
|
||
|
||
generated_at = datetime.now().isoformat(timespec="seconds")
|
||
new_match_status = "verified" if action == "confirm" else "rejected"
|
||
new_quality_status = "verified" if action == "confirm" else "rejected"
|
||
label = "人工確認同款" if action == "confirm" else "人工排除候選"
|
||
review_note = str(note or "").strip()[:240]
|
||
|
||
with engine.begin() as conn:
|
||
if not _has_table(conn, "external_offers"):
|
||
return {
|
||
"success": False,
|
||
"generated_at": generated_at,
|
||
"message": "待確認候選暫時無法更新,缺少必要資料表。",
|
||
}
|
||
|
||
row = conn.execute(text("""
|
||
SELECT id, match_status, data_quality_status, quality_notes_json, raw_payload_json
|
||
FROM external_offers
|
||
WHERE id = :offer_id
|
||
AND source_code = 'momo_reference'
|
||
AND ingestion_method = 'targeted_momo_review'
|
||
LIMIT 1
|
||
"""), {"offer_id": offer_id}).mappings().first()
|
||
if not row:
|
||
return {
|
||
"success": False,
|
||
"generated_at": generated_at,
|
||
"message": "找不到這筆待確認候選。",
|
||
}
|
||
|
||
raw_payload = _load_json_dict(row.get("raw_payload_json"))
|
||
raw_payload["review_state"] = new_match_status
|
||
raw_payload["reviewed_at"] = generated_at
|
||
raw_payload["review_action"] = action
|
||
if review_note:
|
||
raw_payload["review_note"] = review_note
|
||
tags = raw_payload.get("tags") if isinstance(raw_payload.get("tags"), list) else []
|
||
tag_to_add = "manual_verified" if action == "confirm" else "manual_rejected"
|
||
raw_payload["tags"] = [*tags, tag_to_add] if tag_to_add not in tags else tags
|
||
|
||
notes = [str(item) for item in _load_json_list(row.get("quality_notes_json")) if str(item or "").strip()]
|
||
notes.append(label if not review_note else f"{label}:{review_note}")
|
||
|
||
conn.execute(text("""
|
||
UPDATE external_offers
|
||
SET match_status = :match_status,
|
||
data_quality_status = :data_quality_status,
|
||
quality_notes_json = :quality_notes_json,
|
||
raw_payload_json = :raw_payload_json,
|
||
updated_at = CURRENT_TIMESTAMP
|
||
WHERE id = :offer_id
|
||
"""), {
|
||
"offer_id": offer_id,
|
||
"match_status": new_match_status,
|
||
"data_quality_status": new_quality_status,
|
||
"quality_notes_json": json.dumps(notes[-6:], ensure_ascii=False),
|
||
"raw_payload_json": json.dumps(raw_payload, ensure_ascii=False),
|
||
})
|
||
mark_pchome_growth_cache_stale()
|
||
|
||
return {
|
||
"success": True,
|
||
"generated_at": generated_at,
|
||
"id": offer_id,
|
||
"action": action,
|
||
"match_status": new_match_status,
|
||
"data_quality_status": new_quality_status,
|
||
"message": "已確認同款,會進入作戰清單。" if action == "confirm" else "已排除候選,不會再進入待確認。",
|
||
}
|