Files
ewoooc/services/external_market_offer_service.py

1886 lines
74 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
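"""Normalization service for external market sources and offers.

Defines the source contracts and the normalized offer schema, and provides
helpers that read legacy comparison data and upsert normalized offers.

NOTE(review): the original header described a first version that only defined
data specs, source status and read-only statistics, with no active fetching
and no DB writes. The module now also calls the PChome public product API and
writes to ``external_market_sources`` / ``external_offers``.
"""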
from __future__ import annotations

import csv
import io
import json
import logging
import math
import os
import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any
from urllib.parse import quote
from urllib.request import Request, urlopen

from sqlalchemy import inspect, text

from services.pchome_growth_cache_state import mark_pchome_growth_cache_stale

logger = logging.getLogger(__name__)
# Endpoint of the PChome public product API queried for product images.
PCHOME_PUBLIC_PRODUCT_API = "https://ecapi.pchome.com.tw/ecshop/prodapi/v2/prod"
# Host prepended to relative image paths returned by that API.
PCHOME_IMAGE_CDN_BASE = "https://cs-a.ecimg.tw"
# Accepted product ID shape: 6 and 9 uppercase alphanumerics plus a 3-digit
# suffix, separated by dashes.
PCHOME_PRODUCT_ID_RE = re.compile(r"^[A-Z0-9]{6}-[A-Z0-9]{9}-[0-9]{3}$")
# Declarative list of every external market source this module knows about.
# Each entry carries the source code, display name, platform code, status
# ("active" or "paused"), source kind, accepted input methods and user-facing
# notes. The status drives ALLOWED/PAUSED/ACTIVE_SOURCE_CODES and the rows
# seeded into the external_market_sources table.
SOURCE_CONTRACTS = [
    {
        "code": "momo_reference",
        "display_name": "MOMO 外部價格參考",
        "platform_code": "momo",
        "status_code": "active",
        "status_label": "正在使用",
        "source_kind": "legacy_bridge",
        "input_methods": ["既有比價快取", "手動 CSV", "供應商 API"],
        "data_quality_label": "只採用已確認同款",
        "plain_note": "目前用已確認同款的 MOMO 參考價,協助判斷 PChome 商品是否需要調整售價或曝光。",
    },
    {
        "code": "shopee",
        "display_name": "蝦皮",
        "platform_code": "shopee",
        "status_code": "paused",
        "status_label": "先暫停",
        "source_kind": "connector_contract",
        "input_methods": ["官方 API", "供應商 API", "手動 CSV"],
        "data_quality_label": "暫不進告警",
        "plain_note": "先保留資料接口,等有穩定合法來源後再啟用,不會影響目前作戰清單。",
    },
    {
        "code": "lazada",
        "display_name": "Lazada",
        "platform_code": "lazada",
        "status_code": "paused",
        "status_label": "待接入",
        "source_kind": "connector_contract",
        "input_methods": ["官方 API", "供應商 API", "手動 CSV"],
        "data_quality_label": "暫不進告警",
        "plain_note": "先建立跨境平台資料欄位,等合法穩定來源後才啟用價格與活動監控。",
    },
    {
        "code": "amazon",
        "display_name": "Amazon",
        "platform_code": "amazon",
        "status_code": "paused",
        "status_label": "待接入",
        "source_kind": "connector_contract",
        "input_methods": ["官方 API", "供應商 API", "手動 CSV"],
        "data_quality_label": "暫不進告警",
        "plain_note": "保留 SKU / ASIN / Marketplace 維度,未接正式來源前不參與自動判斷。",
    },
    {
        "code": "google_merchant",
        "display_name": "Google Merchant / Shopping",
        "platform_code": "google_merchant",
        "status_code": "paused",
        "status_label": "待接入",
        "source_kind": "connector_contract",
        "input_methods": ["官方報表", "官方 API", "手動 CSV"],
        "data_quality_label": "暫不進告警",
        "plain_note": "用於價格競爭力、商品資料完整度與成效 benchmark待正式資料源接入。",
    },
    {
        "code": "tiktok_shop",
        "display_name": "TikTok Shop",
        "platform_code": "tiktok_shop",
        "status_code": "paused",
        "status_label": "待接入",
        "source_kind": "connector_contract",
        "input_methods": ["官方 API", "供應商 API", "手動 CSV"],
        "data_quality_label": "暫不進告警",
        "plain_note": "保留短影音電商的商品、促銷與內容導流欄位;未接合法穩定來源前不參與自動判斷。",
    },
    {
        "code": "line_shopping",
        "display_name": "LINE 購物",
        "platform_code": "line_shopping",
        "status_code": "paused",
        "status_label": "待接入",
        "source_kind": "connector_contract",
        "input_methods": ["官方報表", "供應商 API", "手動 CSV"],
        "data_quality_label": "暫不進告警",
        "plain_note": "保留本地導購、點數回饋與活動檔期訊號;接入前只列為待補來源。",
    },
    {
        "code": "rakuten",
        "display_name": "Rakuten",
        "platform_code": "rakuten",
        "status_code": "paused",
        "status_label": "待接入",
        "source_kind": "connector_contract",
        "input_methods": ["官方 API", "供應商 API", "手動 CSV"],
        "data_quality_label": "暫不進告警",
        "plain_note": "保留日本與海外平台比價欄位,未確認來源前只列入待接入清單。",
    },
    {
        "code": "yahoo_shopping",
        "display_name": "Yahoo 購物",
        "platform_code": "yahoo_shopping",
        "status_code": "paused",
        "status_label": "待接入",
        "source_kind": "connector_contract",
        "input_methods": ["官方 API", "供應商 API", "手動 CSV"],
        "data_quality_label": "暫不進告警",
        "plain_note": "保留本地平台促銷與價格監控欄位,接入前不影響現有作戰清單。",
    },
    {
        "code": "ruten",
        "display_name": "露天",
        "platform_code": "ruten",
        "status_code": "paused",
        "status_label": "待接入",
        "source_kind": "connector_contract",
        "input_methods": ["官方 API", "供應商 API", "手動 CSV"],
        "data_quality_label": "暫不進告警",
        "plain_note": "保留拍賣與長尾賣場價格訊號;正式來源未確認前不進入自動告警。",
    },
    {
        "code": "shopify_brand_store",
        "display_name": "品牌官網 / Shopify",
        "platform_code": "shopify_brand_store",
        "status_code": "paused",
        "status_label": "待接入",
        "source_kind": "connector_contract",
        "input_methods": ["官方 API", "商品 Feed", "手動 CSV"],
        "data_quality_label": "暫不進告警",
        "plain_note": "保留品牌官網售價、組合與活動檔期,用來和 PChome 主推品對照。",
    },
    {
        "code": "meta_commerce",
        "display_name": "Meta Commerce",
        "platform_code": "meta_commerce",
        "status_code": "paused",
        "status_label": "待接入",
        "source_kind": "connector_contract",
        "input_methods": ["官方 Catalog API", "手動 CSV"],
        "data_quality_label": "暫不進告警",
        "plain_note": "保留廣告商品目錄與商品連結欄位,用於後續曝光與導流成效對照。",
    },
    {
        "code": "coupang",
        "display_name": "酷澎",
        "platform_code": "coupang",
        "status_code": "paused",
        "status_label": "先暫停",
        "source_kind": "connector_contract",
        "input_methods": ["官方 API", "供應商 API", "手動 CSV"],
        "data_quality_label": "暫不進告警",
        "plain_note": "先保留資料接口,等有穩定合法來源後再啟用,不會影響目前作戰清單。",
    },
]
# Human-readable description of the normalized offer schema: field name,
# display label, whether the field is required, and a plain-language note.
# The six required fields here are the same ones that
# normalize_external_offer_payload rejects a payload for omitting.
NORMALIZED_OFFER_FIELDS = [
    {
        "name": "source_code",
        "label": "資料來源",
        "required": True,
        "plain_note": "例如 momo_reference、shopee、coupang。",
    },
    {
        "name": "source_product_id",
        "label": "外部商品 ID",
        "required": True,
        "plain_note": "外部平台或資料供應商給的商品編號。",
    },
    {
        "name": "title",
        "label": "商品名稱",
        "required": True,
        "plain_note": "用來做人工確認與名稱比對。",
    },
    {
        "name": "price",
        "label": "售價",
        "required": True,
        "plain_note": "只填可直接比較的成交或頁面售價。",
    },
    {
        "name": "observed_at",
        "label": "資料時間",
        "required": True,
        "plain_note": "這筆價格看到的時間。",
    },
    {
        "name": "ingestion_method",
        "label": "取得方式",
        "required": True,
        "plain_note": "official_api、provider_api、manual_csv 或 legacy_competitor_cache。",
    },
    {
        "name": "pchome_product_id",
        "label": "PChome 商品 ID",
        "required": False,
        "plain_note": "若已確認同款才填,未確認就留空。",
    },
    {
        "name": "quality_score",
        "label": "資料可信度",
        "required": False,
        "plain_note": "0 到 100低於 76 不進自動告警。",
    },
]
# Maps each normalized field name to the set of header spellings accepted for
# it (English keys plus Chinese labels).
# NOTE(review): presumably consumed by a CSV import path outside this view.
CSV_HEADER_ALIASES = {
    "source_code": {"source_code", "資料來源", "來源", "平台來源"},
    "platform_code": {"platform_code", "平台", "平台代碼"},
    "source_product_id": {
        "source_product_id",
        "外部商品 ID",
        "外部商品ID",
        "平台商品編號",
        "商品ID",
        "商品編號",
    },
    "title": {"title", "商品名稱", "品名", "name"},
    "price": {"price", "售價", "價格", "成交價"},
    "observed_at": {"observed_at", "資料時間", "抓取時間", "看到時間", "時間"},
    "ingestion_method": {"ingestion_method", "取得方式", "匯入方式", "來源方式"},
    "currency": {"currency", "幣別"},
    "original_price": {"original_price", "原價", "牌價"},
    "product_url": {"product_url", "商品網址", "網址", "url"},
    "brand": {"brand", "品牌"},
    "category_text": {"category_text", "分類", "類別"},
    "pchome_product_id": {
        "pchome_product_id",
        "PChome 商品 ID",
        "PChome商品ID",
        "PChome商品編號",
        "pchome_id",
    },
    # NOTE(review): "momo_sku" appears twice in this set literal as displayed;
    # one of them may originally have been a different spelling — confirm.
    "momo_sku": {"momo_sku", "MOMO SKU", "momo_sku", "momo_i_code"},
    "match_status": {"match_status", "同款狀態", "比對狀態", "是否同款"},
    "quality_score": {"quality_score", "資料可信度", "可信度", "品質分數"},
    "data_quality_status": {"data_quality_status", "資料狀態", "品質狀態"},
    "quality_note": {"quality_note", "備註", "品質備註"},
}
# Source-code sets derived from SOURCE_CONTRACTS: every declared code, then the
# subsets whose status_code is "paused" and "active" respectively.
ALLOWED_SOURCE_CODES = set(contract["code"] for contract in SOURCE_CONTRACTS)
PAUSED_SOURCE_CODES = set(
    contract["code"]
    for contract in SOURCE_CONTRACTS
    if contract["status_code"] == "paused"
)
ACTIVE_SOURCE_CODES = set(
    contract["code"]
    for contract in SOURCE_CONTRACTS
    if contract["status_code"] == "active"
)
@dataclass(frozen=True)
class ExternalOfferPayload:
source_code: str
platform_code: str
source_product_id: str
title: str
price: float | None
observed_at: str
ingestion_method: str
currency: str = "TWD"
original_price: float | None = None
product_url: str | None = None
brand: str | None = None
category_text: str | None = None
pchome_product_id: str | None = None
momo_sku: str | None = None
match_status: str = "unmatched"
quality_score: float = 0.0
data_quality_status: str = "needs_review"
quality_notes: list[str] = field(default_factory=list)
def to_record(self) -> dict[str, Any]:
return {
"source_code": self.source_code,
"platform_code": self.platform_code,
"source_product_id": self.source_product_id,
"source_offer_key": f"{self.source_code}:{self.source_product_id}",
"title": self.title,
"price": self.price,
"currency": self.currency or "TWD",
"original_price": self.original_price,
"product_url": self.product_url,
"brand": self.brand,
"category_text": self.category_text,
"observed_at": self.observed_at,
"ingestion_method": self.ingestion_method,
"pchome_product_id": self.pchome_product_id,
"momo_sku": self.momo_sku,
"match_status": self.match_status,
"quality_score": self.quality_score,
"data_quality_status": self.data_quality_status,
"quality_notes_json": json.dumps(self.quality_notes, ensure_ascii=False),
}
def _to_float(value: Any) -> float | None:
if value is None or value == "":
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def _load_json_list(value: Any) -> list[Any]:
if not value:
return []
if isinstance(value, list):
return value
try:
parsed = json.loads(value)
return parsed if isinstance(parsed, list) else []
except Exception:
return []
def _load_json_dict(value: Any) -> dict[str, Any]:
if not value:
return {}
if isinstance(value, dict):
return value
try:
parsed = json.loads(value)
return parsed if isinstance(parsed, dict) else {}
except Exception:
return {}
def _looks_like_pchome_product_id(product_id: Any) -> bool:
    """Tell whether *product_id*, stringified and stripped, matches PCHOME_PRODUCT_ID_RE."""
    candidate = str(product_id or "").strip()
    return PCHOME_PRODUCT_ID_RE.match(candidate) is not None
def _absolute_pchome_image_url(path: Any) -> str:
image_path = str(path or "").strip()
if not image_path:
return ""
if image_path.startswith("http://") or image_path.startswith("https://"):
return image_path
if not image_path.startswith("/"):
image_path = f"/{image_path}"
return f"{PCHOME_IMAGE_CDN_BASE}{image_path}"
def _pchome_api_timeout_seconds() -> float:
try:
return max(1.0, min(float(os.getenv("PCHOME_PUBLIC_API_TIMEOUT_SECONDS", "4")), 10.0))
except (TypeError, ValueError):
return 4.0
def _parse_pchome_jsonp(payload: str) -> dict[str, Any]:
match = re.search(r"jsonp\((.*)\);\s*}\s*catch", payload or "", re.DOTALL)
if not match:
match = re.search(r"jsonp\((.*)\);", payload or "", re.DOTALL)
if not match:
return {}
parsed = json.loads(match.group(1))
return parsed if isinstance(parsed, dict) else {}
def _fetch_pchome_public_image_map(product_ids: list[str]) -> dict[str, str]:
"""Fetch PChome product images from the public product API without blocking the list."""
ids = [
str(product_id or "").strip()
for product_id in product_ids
if _looks_like_pchome_product_id(product_id)
]
ids = list(dict.fromkeys(ids))
if not ids:
return {}
url = (
f"{PCHOME_PUBLIC_PRODUCT_API}"
f"?id={quote(','.join(ids), safe=',')}"
"&fields=Id,Name,Nick,Pic,Price"
"&_callback=jsonp"
)
try:
request = Request(url, headers={"User-Agent": "Mozilla/5.0"})
with urlopen(request, timeout=_pchome_api_timeout_seconds()) as response:
raw = response.read(512000).decode("utf-8", "ignore")
products = _parse_pchome_jsonp(raw)
except Exception as exc:
logger.warning("[ExternalOffer] PChome image API failed: %s", exc)
return {}
image_map: dict[str, str] = {}
for product_id, product in products.items():
if not isinstance(product, dict):
continue
pic = product.get("Pic") if isinstance(product.get("Pic"), dict) else {}
image_url = _absolute_pchome_image_url(pic.get("B") or pic.get("S") or pic.get("W"))
if image_url:
image_map[str(product_id)] = image_url
return image_map
# Prefix used to build a PChome product page URL from a product ID.
_PCHOME_PRODUCT_URL_BASE = "https://24h.pchome.com.tw/prod/"
# Exact-match lookup from a review reason code to its user-facing label.
# Codes not listed here fall back to keyword heuristics in _review_reason_label.
_REVIEW_REASON_LABELS = {
    "makeup_catalog_selection_gap": "色號或款式需確認",
    "variant_selection_review": "款式、色號或組合需確認",
    "focused_exact_identity_ysl_blush_catalog": "品名接近,請確認色號",
    "strong_product_line_match": "商品系列接近",
    "strong_exact_spec_match": "規格看起來接近",
    "identity_review": "同款證據待確認",
    "manual_review": "需要人工比對賣場",
    "unit_price_review": "容量或單位價需確認",
    "unit_price_gap": "容量或單位價需確認",
    "catalog_selection_gap": "任選或型錄款需確認",
    "commercial_condition_gap": "活動條件或組合內容需確認",
    "bundle_review": "組合件數需確認",
    "count_review": "件數需確認",
}
def _build_pchome_product_url(product_id: Any) -> str | None:
product_id = str(product_id or "").strip()
if not product_id:
return None
if product_id.startswith(("http://", "https://")):
return product_id
return f"{_PCHOME_PRODUCT_URL_BASE}{product_id}"
def _review_reason_label(reason: Any) -> str:
key = str(reason or "").strip()
if not key:
return ""
label = _REVIEW_REASON_LABELS.get(key)
if label:
return label
lowered = key.lower()
if any(token in lowered for token in ("variant", "catalog", "selection", "color", "shade")):
return "款式、色號或組合需確認"
if any(token in lowered for token in ("unit", "capacity", "spec")):
return "容量或規格需確認"
if any(token in lowered for token in ("bundle", "count", "set")):
return "組合或件數需確認"
if any(token in lowered for token in ("exact", "identity", "match")):
return "品名或規格接近,仍需人工確認"
if any(token in lowered for token in ("gap", "conflict", "condition")):
return "候選資訊有差異,請比對賣場"
return "候選需要人工確認"
def _humanize_review_reasons(reasons: list[Any]) -> list[str]:
labels: list[str] = []
for reason in reasons:
label = _review_reason_label(reason)
if label and label not in labels:
labels.append(label)
return labels[:3] or ["請比對兩個賣場的品名、容量、色號與組合"]
def _has_table(conn, table_name: str) -> bool:
    """Report whether *table_name* exists on *conn*.

    A failing probe is logged with its traceback and reported as ``False``.
    """
    try:
        exists = inspect(conn).has_table(table_name)
    except Exception:
        logger.warning("[ExternalMarket] table probe failed: %s", table_name, exc_info=True)
        return False
    return exists
def _quality_score_from_match(value: Any) -> float:
    """Convert a match score into a 0-100 quality score.

    Unparsable input gives 0.0. Values within 0..1 (inclusive) are treated as
    ratios and multiplied by 100; the result is clamped to 0.0-100.0.
    """
    parsed = _to_float(value)
    if parsed is None:
        return 0.0
    scaled = parsed * 100 if 0 <= parsed <= 1 else parsed
    return max(0.0, min(100.0, scaled))
def _normalize_source_code(value: Any) -> str:
raw = str(value or "").strip()
lower = raw.lower()
mapping = {
"momo": "momo_reference",
"momo_reference": "momo_reference",
"momo 外部價格參考": "momo_reference",
"蝦皮": "shopee",
"shopee": "shopee",
"酷澎": "coupang",
"coupang": "coupang",
}
return mapping.get(lower, mapping.get(raw, raw))
def _normalize_ingestion_method(value: Any) -> str:
raw = str(value or "").strip()
lower = raw.lower()
mapping = {
"備用資料": "manual_csv",
"備援資料": "manual_csv",
"手動資料": "manual_csv",
"手動 csv": "manual_csv",
"手動CSV": "manual_csv",
"manual_csv": "manual_csv",
"官方 api": "official_api",
"官方API": "official_api",
"official_api": "official_api",
"供應商 api": "provider_api",
"供應商API": "provider_api",
"provider_api": "provider_api",
}
return mapping.get(lower, mapping.get(raw, raw))
def _normalize_match_status(value: Any) -> str:
raw = str(value or "").strip()
lower = raw.lower()
mapping = {
"": "verified",
"已確認": "verified",
"確認同款": "verified",
"同款": "verified",
"verified": "verified",
"": "unmatched",
"不是": "unmatched",
"非同款": "unmatched",
"待確認": "unmatched",
"未確認": "unmatched",
"unmatched": "unmatched",
}
return mapping.get(lower, mapping.get(raw, raw))
def normalize_external_offer_payload(payload: dict[str, Any]) -> tuple[ExternalOfferPayload | None, list[str]]:
    """Normalize an official-API / provider-API / manual-CSV row into one schema.

    Args:
        payload: Raw field mapping. ``source_code``, ``source_product_id``,
            ``title`` (or ``name``), ``price``, ``observed_at`` and
            ``ingestion_method`` are required.

    Returns:
        ``(record, [])`` on success, or ``(None, errors)`` where *errors* is a
        list of user-facing messages describing every problem found.
    """
    errors: list[str] = []
    source_code = _normalize_source_code(payload.get("source_code"))
    # The platform code defaults to the source code when not supplied.
    platform_code = str(payload.get("platform_code") or source_code or "").strip()
    source_product_id = str(payload.get("source_product_id") or "").strip()
    title = str(payload.get("title") or payload.get("name") or "").strip()
    ingestion_method = _normalize_ingestion_method(payload.get("ingestion_method"))
    observed_at = str(payload.get("observed_at") or "").strip()
    price = _to_float(payload.get("price"))

    required_values = {
        "資料來源": source_code,
        "外部商品 ID": source_product_id,
        "商品名稱": title,
        "售價": price,
        "資料時間": observed_at,
        "取得方式": ingestion_method,
    }
    for label, value in required_values.items():
        if value is None or value == "":
            errors.append(f"缺少{label}")

    if price is not None:
        if not math.isfinite(price):
            # NaN compares False against everything, so `price <= 0` alone
            # would let "nan" through as a valid price.
            errors.append("售價必須是有效數字")
        elif price <= 0:
            errors.append("售價必須大於 0")

    if observed_at:
        try:
            # A trailing "Z" is rewritten so older fromisoformat versions accept it.
            datetime.fromisoformat(observed_at.replace("Z", "+00:00"))
        except ValueError:
            errors.append("資料時間格式需為 ISO 格式,例如 2026-06-15T10:00:00")
    if errors:
        return None, errors

    quality_score = _to_float(payload.get("quality_score"))
    # `min(100.0, nan)` evaluates to 100.0, so an unguarded NaN would be
    # promoted to the maximum score; treat non-finite scores as missing.
    if quality_score is None or not math.isfinite(quality_score):
        quality_score = 0.0

    quality_notes = _load_json_list(payload.get("quality_notes"))
    if not quality_notes and payload.get("quality_note"):
        # Fall back to the single free-text note.
        quality_notes = [str(payload.get("quality_note"))]

    record = ExternalOfferPayload(
        source_code=source_code,
        platform_code=platform_code,
        source_product_id=source_product_id,
        title=title,
        price=price,
        observed_at=observed_at,
        ingestion_method=ingestion_method,
        currency=str(payload.get("currency") or "TWD").strip() or "TWD",
        original_price=_to_float(payload.get("original_price")),
        product_url=payload.get("product_url"),
        brand=payload.get("brand"),
        category_text=payload.get("category_text"),
        pchome_product_id=payload.get("pchome_product_id"),
        momo_sku=payload.get("momo_sku"),
        match_status=_normalize_match_status(payload.get("match_status") or "unmatched"),
        quality_score=max(0.0, min(100.0, quality_score)),
        data_quality_status=str(payload.get("data_quality_status") or "needs_review"),
        quality_notes=[str(item) for item in quality_notes],
    )
    return record, []
def _ensure_external_market_source_seeds(conn) -> None:
    """Upsert one external_market_sources row per SOURCE_CONTRACTS entry.

    Does nothing when the table does not exist. On conflict by ``code`` the
    descriptive columns are refreshed from the contract; ``write_enabled`` is
    inserted as False but is not in the update list, so an existing row keeps
    its stored value.
    """
    if not _has_table(conn, "external_market_sources"):
        return
    for source in SOURCE_CONTRACTS:
        payload = {
            "code": source["code"],
            "display_name": source["display_name"],
            "platform_code": source["platform_code"],
            "source_kind": source["source_kind"],
            "status": source["status_code"],
            # Only sources whose contract status is "active" are enabled.
            "enabled": source["status_code"] == "active",
            "write_enabled": False,
            "allowed_input_methods_json": json.dumps(source["input_methods"], ensure_ascii=False),
            "quality_policy_json": json.dumps({
                "minimum_match_status": "verified",
                "minimum_quality_score": 76,
            }, ensure_ascii=False),
            "plain_note": source["plain_note"],
        }
        # The two statements differ only in the spelling of EXCLUDED/excluded
        # and in NOW() versus CURRENT_TIMESTAMP. SQL text is kept verbatim.
        if conn.dialect.name == "postgresql":
            conn.execute(text("""
INSERT INTO external_market_sources (
code, display_name, platform_code, source_kind, status, enabled,
write_enabled, allowed_input_methods_json, quality_policy_json, plain_note
)
VALUES (
:code, :display_name, :platform_code, :source_kind, :status, :enabled,
:write_enabled, :allowed_input_methods_json, :quality_policy_json, :plain_note
)
ON CONFLICT (code) DO UPDATE SET
display_name = EXCLUDED.display_name,
platform_code = EXCLUDED.platform_code,
source_kind = EXCLUDED.source_kind,
status = EXCLUDED.status,
enabled = EXCLUDED.enabled,
allowed_input_methods_json = EXCLUDED.allowed_input_methods_json,
quality_policy_json = EXCLUDED.quality_policy_json,
plain_note = EXCLUDED.plain_note,
updated_at = NOW()
"""), payload)
        else:
            conn.execute(text("""
INSERT INTO external_market_sources (
code, display_name, platform_code, source_kind, status, enabled,
write_enabled, allowed_input_methods_json, quality_policy_json, plain_note
)
VALUES (
:code, :display_name, :platform_code, :source_kind, :status, :enabled,
:write_enabled, :allowed_input_methods_json, :quality_policy_json, :plain_note
)
ON CONFLICT (code) DO UPDATE SET
display_name = excluded.display_name,
platform_code = excluded.platform_code,
source_kind = excluded.source_kind,
status = excluded.status,
enabled = excluded.enabled,
allowed_input_methods_json = excluded.allowed_input_methods_json,
quality_policy_json = excluded.quality_policy_json,
plain_note = excluded.plain_note,
updated_at = CURRENT_TIMESTAMP
"""), payload)
def _fetch_legacy_momo_reference_rows(conn, limit: int) -> list[dict[str, Any]]:
    """Read MOMO reference prices for PChome products from the legacy tables.

    For each (sku, competitor product) pair in ``competitor_prices`` with
    source 'pchome', a positive price, match_score >= 0.76 and the
    ``identity_v2`` tag, the newest row is joined with the newest positive
    price of the ACTIVE MOMO product having the same ``i_code``.

    Args:
        conn: SQLAlchemy connection; its dialect selects the SQL variant.
        limit: Maximum number of rows returned, newest first.

    Returns:
        A list of plain dict rows, or ``[]`` when a required table is missing.
    """
    required_tables = ("competitor_prices", "products", "price_records")
    if not all(_has_table(conn, table) for table in required_tables):
        return []
    if conn.dialect.name == "postgresql":
        sql = """
WITH valid_cp AS (
SELECT DISTINCT ON (cp.sku, cp.competitor_product_id)
cp.sku AS momo_sku,
cp.competitor_product_id AS pchome_product_id,
cp.competitor_product_name AS pchome_product_name,
cp.price AS pchome_public_price,
cp.match_score,
cp.tags::text AS tags,
cp.crawled_at,
cp.expires_at
FROM competitor_prices cp
WHERE cp.source = 'pchome'
AND cp.sku IS NOT NULL
AND cp.competitor_product_id IS NOT NULL
AND cp.price IS NOT NULL
AND cp.price > 0
AND COALESCE(cp.match_score, 0) >= 0.76
AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
ORDER BY cp.sku, cp.competitor_product_id, cp.crawled_at DESC NULLS LAST
)
SELECT
vc.*,
lm.momo_name,
lm.product_url,
lm.image_url,
lm.momo_price,
lm.momo_price_at
FROM valid_cp vc
JOIN LATERAL (
SELECT
p.name AS momo_name,
p.url AS product_url,
p.image_url AS image_url,
pr.price AS momo_price,
pr.timestamp AS momo_price_at
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.i_code = vc.momo_sku
AND p.status = 'ACTIVE'
AND pr.price IS NOT NULL
AND pr.price > 0
ORDER BY pr.timestamp DESC, pr.id DESC
LIMIT 1
) lm ON TRUE
ORDER BY COALESCE(lm.momo_price_at, vc.crawled_at) DESC NULLS LAST
LIMIT :limit
"""
    else:
        # Fallback for other dialects, using ROW_NUMBER() in place of
        # DISTINCT ON / LATERAL. `cp.pchome_public_price` is selected
        # explicitly so this branch returns the same column the PostgreSQL
        # branch exposes through `vc.*`; it was previously missing, which left
        # the PChome price empty downstream.
        # NOTE(review): unlike the PostgreSQL branch, this variant does not
        # filter out rows whose expires_at has passed — confirm whether that
        # is intentional.
        sql = """
WITH latest_cp AS (
SELECT
cp.sku AS momo_sku,
cp.competitor_product_id AS pchome_product_id,
cp.competitor_product_name AS pchome_product_name,
cp.price AS pchome_public_price,
cp.match_score,
cp.tags AS tags,
cp.crawled_at,
cp.expires_at,
ROW_NUMBER() OVER (
PARTITION BY cp.sku, cp.competitor_product_id
ORDER BY cp.crawled_at DESC
) AS rn
FROM competitor_prices cp
WHERE cp.source = 'pchome'
AND cp.sku IS NOT NULL
AND cp.competitor_product_id IS NOT NULL
AND cp.price IS NOT NULL
AND cp.price > 0
AND COALESCE(cp.match_score, 0) >= 0.76
AND COALESCE(cp.tags, '') LIKE '%identity_v2%'
),
latest_momo AS (
SELECT
p.i_code AS momo_sku,
p.name AS momo_name,
p.url AS product_url,
p.image_url AS image_url,
pr.price AS momo_price,
pr.timestamp AS momo_price_at,
ROW_NUMBER() OVER (
PARTITION BY p.i_code
ORDER BY pr.timestamp DESC, pr.id DESC
) AS rn
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.status = 'ACTIVE'
AND pr.price IS NOT NULL
AND pr.price > 0
)
SELECT
cp.momo_sku,
cp.pchome_product_id,
cp.pchome_product_name,
cp.pchome_public_price,
cp.match_score,
cp.tags,
cp.crawled_at,
cp.expires_at,
lm.momo_name,
lm.product_url,
lm.image_url,
lm.momo_price,
lm.momo_price_at
FROM latest_cp cp
JOIN latest_momo lm ON lm.momo_sku = cp.momo_sku AND lm.rn = 1
WHERE cp.rn = 1
ORDER BY COALESCE(lm.momo_price_at, cp.crawled_at) DESC
LIMIT :limit
"""
    result = conn.execute(text(sql), {"limit": limit})
    return [dict(row) for row in result.mappings().all()]
def _legacy_row_to_external_offer(row: dict[str, Any]) -> dict[str, Any]:
    """Convert one legacy comparison row into an external_offers payload.

    The MOMO price is the offer price, and the PChome side of the legacy row
    is preserved inside ``raw_payload_json``. The offer is always marked
    ``match_status="verified"``; ``data_quality_status`` is "verified" only
    when the derived quality score is at least 76.
    """
    sku = str(row.get("momo_sku") or "").strip()
    pchome_id = str(row.get("pchome_product_id") or "").strip()
    crawled_at = row.get("crawled_at")
    momo_price_at = row.get("momo_price_at")
    match_score = row.get("match_score")
    pchome_name = row.get("pchome_product_name")

    score = _quality_score_from_match(match_score)
    display_title = str(row.get("momo_name") or pchome_name or sku).strip()
    if score >= 76:
        quality_status = "verified"
    else:
        quality_status = "needs_review"

    notes_json = json.dumps(
        [
            "由已確認同款的舊比價快取自動同步",
            "MOMO 最新價格作為外部參考價",
        ],
        ensure_ascii=False,
    )
    raw_json = json.dumps({
        "legacy_source": "competitor_prices",
        "pchome_public_price": _to_float(row.get("pchome_public_price")),
        "pchome_public_name": pchome_name,
        "match_score": str(match_score or ""),
        "tags": row.get("tags"),
        "crawled_at": str(crawled_at or ""),
        "momo_price_at": str(momo_price_at or ""),
    }, ensure_ascii=False)

    offer: dict[str, Any] = {
        "source_code": "momo_reference",
        "platform_code": "momo",
        "source_product_id": sku,
        "source_offer_key": f"momo_reference:{sku}:{pchome_id}",
        "title": display_title or sku,
        "brand": None,
        "category_text": None,
        "product_url": row.get("product_url"),
        "image_url": row.get("image_url"),
        "price": _to_float(row.get("momo_price")),
        "original_price": None,
        "currency": "TWD",
        "stock_status": None,
        "sold_count": None,
        "rating": None,
        "review_count": None,
        # Prefer the time of the MOMO price, then the crawl time.
        "observed_at": momo_price_at or crawled_at,
        "expires_at": row.get("expires_at"),
        "ingestion_method": "legacy_competitor_cache",
        "connector_key": "competitor_prices",
        "pchome_product_id": pchome_id,
        "momo_sku": sku,
        "match_status": "verified",
        "quality_score": score,
        "data_quality_status": quality_status,
        "quality_notes_json": notes_json,
        "raw_payload_json": raw_json,
    }
    return offer
def _upsert_external_offer(conn, payload: dict[str, Any]) -> None:
    """Insert one offer into external_offers, updating it on conflict.

    PostgreSQL targets the named constraint
    ``uq_external_offer_source_product_observed``; other dialects target the
    column list (source_code, source_product_id, observed_at,
    ingestion_method). NOTE(review): presumably the named constraint covers
    those same columns — confirm against the schema.

    On conflict only the columns in the DO UPDATE SET list are refreshed.
    brand, category_text, currency, stock_status, sold_count, rating and
    review_count are inserted but not updated, so an existing row keeps its
    stored values for them.

    Args:
        conn: SQLAlchemy connection; its dialect selects the SQL variant.
        payload: Mapping providing every bind parameter named in the VALUES list.
    """
    # SQL text is kept verbatim; the variants differ in the conflict target,
    # EXCLUDED/excluded spelling and NOW() versus CURRENT_TIMESTAMP.
    if conn.dialect.name == "postgresql":
        sql = """
INSERT INTO external_offers (
source_code, platform_code, source_product_id, source_offer_key, title,
brand, category_text, product_url, image_url, price, original_price,
currency, stock_status, sold_count, rating, review_count, observed_at,
expires_at, ingestion_method, connector_key, pchome_product_id, momo_sku,
match_status, quality_score, data_quality_status, quality_notes_json,
raw_payload_json
)
VALUES (
:source_code, :platform_code, :source_product_id, :source_offer_key, :title,
:brand, :category_text, :product_url, :image_url, :price, :original_price,
:currency, :stock_status, :sold_count, :rating, :review_count, :observed_at,
:expires_at, :ingestion_method, :connector_key, :pchome_product_id, :momo_sku,
:match_status, :quality_score, :data_quality_status, :quality_notes_json,
:raw_payload_json
)
ON CONFLICT ON CONSTRAINT uq_external_offer_source_product_observed DO UPDATE SET
source_offer_key = EXCLUDED.source_offer_key,
title = EXCLUDED.title,
product_url = EXCLUDED.product_url,
image_url = EXCLUDED.image_url,
price = EXCLUDED.price,
original_price = EXCLUDED.original_price,
expires_at = EXCLUDED.expires_at,
connector_key = EXCLUDED.connector_key,
pchome_product_id = EXCLUDED.pchome_product_id,
momo_sku = EXCLUDED.momo_sku,
match_status = EXCLUDED.match_status,
quality_score = EXCLUDED.quality_score,
data_quality_status = EXCLUDED.data_quality_status,
quality_notes_json = EXCLUDED.quality_notes_json,
raw_payload_json = EXCLUDED.raw_payload_json,
updated_at = NOW()
"""
    else:
        sql = """
INSERT INTO external_offers (
source_code, platform_code, source_product_id, source_offer_key, title,
brand, category_text, product_url, image_url, price, original_price,
currency, stock_status, sold_count, rating, review_count, observed_at,
expires_at, ingestion_method, connector_key, pchome_product_id, momo_sku,
match_status, quality_score, data_quality_status, quality_notes_json,
raw_payload_json
)
VALUES (
:source_code, :platform_code, :source_product_id, :source_offer_key, :title,
:brand, :category_text, :product_url, :image_url, :price, :original_price,
:currency, :stock_status, :sold_count, :rating, :review_count, :observed_at,
:expires_at, :ingestion_method, :connector_key, :pchome_product_id, :momo_sku,
:match_status, :quality_score, :data_quality_status, :quality_notes_json,
:raw_payload_json
)
ON CONFLICT (source_code, source_product_id, observed_at, ingestion_method) DO UPDATE SET
source_offer_key = excluded.source_offer_key,
title = excluded.title,
product_url = excluded.product_url,
image_url = excluded.image_url,
price = excluded.price,
original_price = excluded.original_price,
expires_at = excluded.expires_at,
connector_key = excluded.connector_key,
pchome_product_id = excluded.pchome_product_id,
momo_sku = excluded.momo_sku,
match_status = excluded.match_status,
quality_score = excluded.quality_score,
data_quality_status = excluded.data_quality_status,
quality_notes_json = excluded.quality_notes_json,
raw_payload_json = excluded.raw_payload_json,
updated_at = CURRENT_TIMESTAMP
"""
    conn.execute(text(sql), payload)
def _targeted_candidate_auto_type(candidate: dict[str, Any]) -> str:
explicit_type = str(candidate.get("auto_compare_type") or "").strip()
if explicit_type:
return explicit_type
if candidate.get("can_auto_compare"):
return "total_price"
return "manual_review"
def _targeted_candidate_needs_review(candidate: dict[str, Any]) -> bool:
"""總價自動同步前的最後防線,避免高分但款式待確認的候選進作戰清單。"""
if candidate.get("target_hard_veto") is True:
return True
price_basis = str(candidate.get("target_price_basis") or "").strip()
alert_tier = str(candidate.get("target_alert_tier") or "").strip()
if price_basis and price_basis != "total_price":
return True
if alert_tier and alert_tier != "price_alert_exact":
return True
review_reason_markers = {
"manual_review",
"identity_review",
"unit_price_review",
"variant_selection_review",
"variant_option_conflict",
"variant_descriptor_conflict",
"makeup_catalog_selection_gap",
"commercial_condition_gap",
"count_conflict",
"bundle_offer_conflict",
"multi_component_conflict",
"component_count_conflict",
}
reasons = {str(reason or "") for reason in (candidate.get("target_match_reasons") or [])}
return bool(reasons & review_reason_markers)
def _targeted_candidate_sync_rank(candidate: dict[str, Any]) -> tuple[float, float, float, float]:
    """Rank key for choosing among several candidates of one PChome product.

    Returns ``(type_rank, same_quantity, -quantity_delta, match_score)``:
    total-price candidates rank above unit-price ones, then equal quantities,
    then the smallest quantity difference, then the match score. When either
    total quantity is missing or not positive, the delta is 999999.0.
    """
    type_rank = {"total_price": 3.0, "unit_price": 2.0}.get(
        _targeted_candidate_auto_type(candidate), 0.0
    )

    try:
        match_score = float(candidate.get("target_match_score") or 0.0)
    except (TypeError, ValueError):
        match_score = 0.0

    comparison = candidate.get("target_unit_price_comparison")
    if not isinstance(comparison, dict):
        comparison = {}
    momo_quantity = _to_float(comparison.get("momo_total_quantity")) or 0.0
    pchome_quantity = _to_float(comparison.get("competitor_total_quantity")) or 0.0

    if momo_quantity > 0 and pchome_quantity > 0:
        quantity_delta = abs(momo_quantity - pchome_quantity)
        same_quantity = 1.0 if quantity_delta <= 0.0001 else 0.0
    else:
        quantity_delta = 999999.0
        same_quantity = 0.0
    return (type_rank, same_quantity, -quantity_delta, match_score)
def _targeted_candidate_to_external_offer(
    candidate: dict[str, Any],
    *,
    observed_at: datetime,
) -> tuple[dict[str, Any] | None, str]:
    """Convert an auto-comparable MOMO candidate into an external_offers payload.

    Args:
        candidate: Candidate produced by the PChome-targeted MOMO search.
        observed_at: Timestamp stored as the offer's observation time.

    Returns:
        ``(offer, "")`` on success, or ``(None, reason)`` with a user-facing
        reason. Rejections are checked in this order: compare type, pending
        review (total price only), MOMO ID, PChome ID, MOMO price, unit-price
        evidence, and a quality score below 76.
    """
    compare_type = _targeted_candidate_auto_type(candidate)
    if compare_type not in ("total_price", "unit_price"):
        return None, "不是可自動使用的候選"
    unit_mode = compare_type == "unit_price"
    if not unit_mode and _targeted_candidate_needs_review(candidate):
        return None, "候選仍需人工確認"

    sku = str(
        candidate.get("product_id") or candidate.get("goodsCode") or candidate.get("id") or ""
    ).strip()
    pchome_id = str(candidate.get("target_pchome_product_id") or "").strip()
    momo_price = _to_float(candidate.get("price"))
    if not sku:
        return None, "缺少 MOMO 商品 ID"
    if not pchome_id:
        return None, "缺少 PChome 商品 ID"
    if not momo_price or momo_price <= 0:
        return None, "缺少 MOMO 售價"

    comparison = candidate.get("target_unit_price_comparison")
    if not isinstance(comparison, dict):
        comparison = {}
    if unit_mode and not comparison.get("comparable"):
        return None, "單位價證據不足"

    base_score = _quality_score_from_match(candidate.get("target_match_score"))
    # Unit-price offers get a floor of 82, so only total-price offers can fall
    # below the 76 threshold checked next.
    score = max(base_score, 82.0) if unit_mode else base_score
    basis = "unit_price" if unit_mode else "total_price"
    if score < 76:
        return None, "同款分數低於自動同步門檻"

    display_title = str(candidate.get("name") or candidate.get("title") or sku).strip()
    notes_json = json.dumps(
        [
            "由 PChome 商品自動反查 MOMO 候選同步",
            "自動單位價比較" if unit_mode else "可直接總價比價",
        ],
        ensure_ascii=False,
    )
    raw_json = json.dumps({
        "source": "pchome_targeted_momo_search",
        "auto_compare_type": compare_type,
        "price_basis": basis,
        "pchome_public_price": _to_float(candidate.get("target_pchome_price")),
        "pchome_public_name": candidate.get("target_pchome_name"),
        "match_score": candidate.get("target_match_score"),
        "match_reasons": candidate.get("target_match_reasons") or [],
        "comparison_mode": candidate.get("target_comparison_mode"),
        "hard_veto": bool(candidate.get("target_hard_veto")),
        "target_gap_pct": candidate.get("target_gap_pct"),
        "unit_price_comparison": comparison,
        "search_term": candidate.get("target_search_term"),
        "tags": [
            "identity_v2",
            "source_targeted_momo_search",
            f"price_basis_{basis}",
            f"auto_compare_{compare_type}",
        ],
    }, ensure_ascii=False)

    offer: dict[str, Any] = {
        "source_code": "momo_reference",
        "platform_code": "momo",
        "source_product_id": sku,
        "source_offer_key": f"momo_reference:{sku}:{pchome_id}:{basis}",
        "title": display_title or sku,
        "brand": candidate.get("brand"),
        "category_text": candidate.get("category") or candidate.get("category_text"),
        "product_url": candidate.get("product_url") or candidate.get("url"),
        "image_url": candidate.get("image_url"),
        "price": momo_price,
        "original_price": _to_float(candidate.get("original_price")),
        "currency": "TWD",
        "stock_status": None,
        "sold_count": None,
        "rating": None,
        "review_count": None,
        "observed_at": observed_at,
        "expires_at": None,
        "ingestion_method": "targeted_momo_search",
        "connector_key": "pchome_targeted_momo_search",
        "pchome_product_id": pchome_id,
        "momo_sku": sku,
        "match_status": "verified",
        "quality_score": round(score, 2),
        "data_quality_status": "verified",
        "quality_notes_json": notes_json,
        "raw_payload_json": raw_json,
    }
    return offer, ""
def _targeted_review_candidate_to_external_offer(
    candidate: dict[str, Any],
    *,
    observed_at: datetime,
) -> tuple[dict[str, Any] | None, str]:
    """Turn a targeted-search MOMO candidate into a ``needs_review`` offer row.

    Rows built here are stored for manual confirmation only; this kind of
    data must not be used for price decisions.

    Args:
        candidate: Candidate dict from the targeted MOMO search.
        observed_at: Timestamp written to the offer's ``observed_at`` field.

    Returns:
        ``(offer, "")`` when the candidate qualifies, otherwise
        ``(None, reason)`` where ``reason`` is the user-facing skip message.
    """
    field_of = candidate.get
    alert_tier = str(field_of("target_alert_tier") or "").strip()
    match_type = str(field_of("target_match_type") or "").strip()

    # A candidate is reviewable if it is in a review tier OR carries a real
    # match type; only when neither holds is it dropped.
    in_review_tier = alert_tier in {"identity_review", "unit_price_review"}
    has_real_match_type = match_type not in {"", "no_match"}
    if not (in_review_tier or has_real_match_type):
        return None, "不是可人工確認候選"

    momo_sku = str(
        field_of("product_id") or field_of("goodsCode") or field_of("id") or ""
    ).strip()
    pchome_product_id = str(field_of("target_pchome_product_id") or "").strip()
    momo_price = _to_float(field_of("price"))

    if not momo_sku:
        return None, "缺少 MOMO 商品 ID"
    if not pchome_product_id:
        return None, "缺少 PChome 商品 ID"
    if not momo_price or momo_price <= 0:
        return None, "缺少 MOMO 售價"

    match_score = _quality_score_from_match(field_of("target_match_score"))
    if match_score < 60:
        return None, "人工確認候選分數過低"

    # Anything that is not a dict is replaced by an empty comparison payload.
    unit_price_comparison = field_of("target_unit_price_comparison")
    if not isinstance(unit_price_comparison, dict):
        unit_price_comparison = {}

    title = str(field_of("name") or field_of("title") or momo_sku).strip()
    price_basis = str(field_of("target_price_basis") or "manual_review").strip()
    review_tag = f"review_{alert_tier or 'identity_review'}"

    raw_payload = {
        "source": "pchome_targeted_momo_search",
        "review_state": "needs_review",
        "auto_compare_type": "manual_review",
        "price_basis": price_basis,
        "pchome_public_price": _to_float(field_of("target_pchome_price")),
        "pchome_public_name": field_of("target_pchome_name"),
        "match_score": field_of("target_match_score"),
        "match_reasons": field_of("target_match_reasons") or [],
        "comparison_mode": field_of("target_comparison_mode"),
        "match_type": match_type,
        "alert_tier": alert_tier,
        "hard_veto": bool(field_of("target_hard_veto")),
        "target_gap_pct": field_of("target_gap_pct"),
        "unit_price_comparison": unit_price_comparison,
        "search_term": field_of("target_search_term"),
        "tags": [
            "identity_v2",
            "source_targeted_momo_search",
            "needs_review",
            review_tag,
        ],
    }
    quality_notes = ["候選已找到,需人工確認同款或色號"]

    offer = {
        "source_code": "momo_reference",
        "platform_code": "momo",
        "source_product_id": momo_sku,
        "source_offer_key": f"momo_reference:{momo_sku}:{pchome_product_id}:needs_review",
        "title": title or momo_sku,
        "brand": field_of("brand"),
        "category_text": field_of("category") or field_of("category_text"),
        "product_url": field_of("product_url") or field_of("url"),
        "image_url": field_of("image_url"),
        "price": momo_price,
        "original_price": _to_float(field_of("original_price")),
        "currency": "TWD",
        "stock_status": None,
        "sold_count": None,
        "rating": None,
        "review_count": None,
        "observed_at": observed_at,
        "expires_at": None,
        "ingestion_method": "targeted_momo_review",
        "connector_key": "pchome_targeted_momo_search_review",
        "pchome_product_id": pchome_product_id,
        "momo_sku": momo_sku,
        "match_status": "needs_review",
        "quality_score": round(match_score, 2),
        "data_quality_status": "needs_review",
        "quality_notes_json": json.dumps(quality_notes, ensure_ascii=False),
        "raw_payload_json": json.dumps(raw_payload, ensure_ascii=False),
    }
    return offer, ""
def sync_targeted_momo_candidates_to_external_offers(
    engine,
    candidates: list[dict[str, Any]],
    *,
    dry_run: bool = False,
) -> dict[str, Any]:
    """Sync the safe MOMO candidates found by the page into external_offers.

    Only automatic ``total_price`` and ``unit_price`` candidates are accepted;
    manual-review candidates are not written here. When several candidates
    resolve to the same PChome product, only the highest-ranked one is kept
    (the earliest candidate wins a tie).

    Args:
        engine: SQLAlchemy engine; the work runs inside ``engine.begin()``.
        candidates: Candidate dicts from the targeted MOMO search.
        dry_run: When true, nothing is upserted and ``written_count`` is 0.

    Returns:
        A summary dict with status, counts per price basis and the skip
        reasons; ``success`` is False when a required table is missing.
    """
    generated_at = datetime.now().isoformat(timespec="seconds")
    candidates = list(candidates or [])

    with engine.begin() as conn:
        missing_tables = sorted(
            name
            for name in ("external_market_sources", "external_offers")
            if not _has_table(conn, name)
        )
        if missing_tables:
            return {
                "success": False,
                "status": "skipped",
                "generated_at": generated_at,
                "candidate_count": len(candidates),
                "written_count": 0,
                "dry_run": dry_run,
                "message": "外部報價同步暫時無法執行,缺少必要資料表。",
                "missing_tables": missing_tables,
            }
        _ensure_external_market_source_seeds(conn)

        started_at = datetime.now()
        skipped_reasons: dict[str, int] = {}
        best_by_pchome: dict[str, tuple[dict[str, Any], tuple[float, float, float, float]]] = {}
        for offset, candidate in enumerate(candidates):
            # Each candidate gets a distinct observed_at (one microsecond apart).
            offer, reason = _targeted_candidate_to_external_offer(
                candidate,
                observed_at=started_at + timedelta(microseconds=offset),
            )
            if not offer:
                skipped_reasons[reason] = skipped_reasons.get(reason, 0) + 1
                continue
            rank = _targeted_candidate_sync_rank(candidate)
            dedupe_key = str(
                offer.get("pchome_product_id") or offer.get("source_offer_key") or ""
            ).strip()
            incumbent = best_by_pchome.get(dedupe_key)
            # Strictly greater: an equal rank never displaces the earlier offer.
            if incumbent is None or rank > incumbent[1]:
                best_by_pchome[dedupe_key] = (offer, rank)
        offers = [offer for offer, _ in best_by_pchome.values()]

        if not dry_run:
            for offer in offers:
                _upsert_external_offer(conn, offer)
            # NOTE(review): nesting reconstructed from a flattened source;
            # confirm the cache is meant to be invalidated only on real writes.
            if offers:
                mark_pchome_growth_cache_stale()

    unit_count = sum(
        _load_json_dict(offer.get("raw_payload_json")).get("price_basis") == "unit_price"
        for offer in offers
    )
    if dry_run:
        status, written_count = "dry_run", 0
        message = "已完成自動比價候選同步預檢,尚未寫入資料。"
    else:
        status, written_count = "synced", len(offers)
        message = "已把自動比價候選同步到外部價格參考。"
    return {
        "success": True,
        "status": status,
        "generated_at": generated_at,
        "candidate_count": len(candidates),
        "written_count": written_count,
        "dry_run": dry_run,
        "source_code": "momo_reference",
        "total_price_count": len(offers) - unit_count,
        "unit_price_count": unit_count,
        "skipped_reasons": skipped_reasons,
        "message": message,
    }
def sync_targeted_momo_review_candidates_to_external_offers(
    engine,
    candidates: list[dict[str, Any]],
    *,
    dry_run: bool = False,
) -> dict[str, Any]:
    """Persist MOMO candidates awaiting manual review; they do not enter price alerts.

    When several candidates resolve to the same PChome product, only the
    highest-ranked one is kept (the earliest candidate wins a tie).

    Args:
        engine: SQLAlchemy engine; the work runs inside ``engine.begin()``.
        candidates: Candidate dicts from the targeted MOMO search.
        dry_run: When true, nothing is upserted and ``written_count`` is 0.

    Returns:
        A summary dict with status, counts and skip reasons; ``success`` is
        False when a required table is missing.
    """
    generated_at = datetime.now().isoformat(timespec="seconds")
    candidates = list(candidates or [])

    with engine.begin() as conn:
        missing_tables = sorted(
            name
            for name in ("external_market_sources", "external_offers")
            if not _has_table(conn, name)
        )
        if missing_tables:
            return {
                "success": False,
                "status": "skipped",
                "generated_at": generated_at,
                "candidate_count": len(candidates),
                "written_count": 0,
                "dry_run": dry_run,
                "message": "待確認候選暫時無法保存,缺少必要資料表。",
                "missing_tables": missing_tables,
            }
        _ensure_external_market_source_seeds(conn)

        started_at = datetime.now()
        skipped_reasons: dict[str, int] = {}
        best_by_pchome: dict[str, tuple[dict[str, Any], tuple[float, float, float, float]]] = {}
        for offset, candidate in enumerate(candidates):
            # Each candidate gets a distinct observed_at (one microsecond apart).
            offer, reason = _targeted_review_candidate_to_external_offer(
                candidate,
                observed_at=started_at + timedelta(microseconds=offset),
            )
            if not offer:
                skipped_reasons[reason] = skipped_reasons.get(reason, 0) + 1
                continue
            rank = _targeted_candidate_sync_rank(candidate)
            dedupe_key = str(
                offer.get("pchome_product_id") or offer.get("source_offer_key") or ""
            ).strip()
            incumbent = best_by_pchome.get(dedupe_key)
            # Strictly greater: an equal rank never displaces the earlier offer.
            if incumbent is None or rank > incumbent[1]:
                best_by_pchome[dedupe_key] = (offer, rank)
        offers = [offer for offer, _ in best_by_pchome.values()]

        if not dry_run:
            for offer in offers:
                _upsert_external_offer(conn, offer)
            # NOTE(review): nesting reconstructed from a flattened source;
            # confirm the cache is meant to be invalidated only on real writes.
            if offers:
                mark_pchome_growth_cache_stale()

    if dry_run:
        status, written_count = "dry_run", 0
        message = "已完成待確認候選預檢,尚未寫入資料。"
    else:
        status, written_count = "synced", len(offers)
        message = "已保存待人工確認的 MOMO 候選。"
    return {
        "success": True,
        "status": status,
        "generated_at": generated_at,
        "candidate_count": len(candidates),
        "written_count": written_count,
        "dry_run": dry_run,
        "source_code": "momo_reference",
        "skipped_reasons": skipped_reasons,
        "message": message,
    }
def sync_legacy_momo_reference_offers(engine, *, limit: int = 500, dry_run: bool = False) -> dict[str, Any]:
    """Sync already-confirmed matches from the legacy comparison cache into external_offers.

    Args:
        engine: SQLAlchemy engine; the work runs inside ``engine.begin()``.
        limit: Maximum legacy rows to fetch; clamped to the range 1..5000.
        dry_run: When true, nothing is upserted and ``written_count`` is 0.

    Returns:
        A summary dict with status, counts and up to five sample rows;
        ``success`` is False when a required table is missing.
    """
    limit = max(1, min(int(limit or 500), 5000))
    generated_at = datetime.now().isoformat(timespec="seconds")
    required_tables = (
        "external_market_sources",
        "external_offers",
        "competitor_prices",
        "products",
        "price_records",
    )

    with engine.begin() as conn:
        missing_tables = sorted(
            name for name in required_tables if not _has_table(conn, name)
        )
        if missing_tables:
            return {
                "success": False,
                "status": "skipped",
                "generated_at": generated_at,
                "candidate_count": 0,
                "written_count": 0,
                "dry_run": dry_run,
                "message": "外部報價同步暫時無法執行,缺少必要資料表。",
                "missing_tables": missing_tables,
            }
        _ensure_external_market_source_seeds(conn)

        rows = _fetch_legacy_momo_reference_rows(conn, limit)
        # Keep only offers that carry both product IDs and a truthy price.
        mandatory_keys = ("source_product_id", "pchome_product_id", "price")
        offers = []
        for row in rows:
            offer = _legacy_row_to_external_offer(row)
            if all(offer[key] for key in mandatory_keys):
                offers.append(offer)

        if not dry_run:
            for offer in offers:
                _upsert_external_offer(conn, offer)
            # NOTE(review): nesting reconstructed from a flattened source;
            # confirm the cache is meant to be invalidated only on real writes.
            if offers:
                mark_pchome_growth_cache_stale()

    if dry_run:
        status, written_count = "dry_run", 0
        message = "已完成 MOMO 外部價格參考同步預檢,尚未寫入資料。"
    else:
        status, written_count = "synced", len(offers)
        message = "已完成 MOMO 外部價格參考自動同步。"
    sample_keys = ("momo_sku", "pchome_product_id", "price", "quality_score")
    return {
        "success": True,
        "status": status,
        "generated_at": generated_at,
        "candidate_count": len(rows),
        "written_count": written_count,
        "dry_run": dry_run,
        "source_code": "momo_reference",
        "message": message,
        "sample_rows": [
            {key: offer[key] for key in sample_keys} for offer in offers[:5]
        ],
    }
def _normalize_header(header: str) -> str:
    """Map a CSV header to its canonical field name.

    The header is whitespace-trimmed and stripped of BOM characters, then
    looked up in ``CSV_HEADER_ALIASES``; the first canonical name whose alias
    collection contains it is returned. Unknown headers come back cleaned
    but otherwise unchanged.
    """
    cleaned = str(header or "").strip().replace("\ufeff", "")
    return next(
        (
            canonical
            for canonical, aliases in CSV_HEADER_ALIASES.items()
            if cleaned in aliases
        ),
        cleaned,
    )
def _read_csv_rows(csv_text: str, limit: int) -> tuple[list[dict[str, Any]], list[str]]:
    """Parse CSV text into row dicts keyed by canonical header names.

    The delimiter is sniffed from the first 4096 characters (comma, tab or
    semicolon), falling back to the Excel dialect. Rows with no non-blank
    value are dropped. Each kept row gets a ``_row_number`` entry, counted
    from 2 for the first record after the header.

    Args:
        csv_text: Raw CSV content; a leading BOM and surrounding blank
            characters are ignored.
        limit: Maximum number of non-empty rows to return.

    Returns:
        ``(rows, errors)``. ``errors`` is non-empty (and ``rows`` empty) when
        the text is empty, has no header row, or has headers that collapse to
        the same canonical name.
    """
    text_value = (csv_text or "").strip("\ufeff\n\r ")
    if not text_value:
        return [], ["CSV 內容是空的"]

    try:
        dialect = csv.Sniffer().sniff(text_value[:4096], delimiters=",\t;")
    except csv.Error:
        dialect = csv.excel

    reader = csv.DictReader(io.StringIO(text_value), dialect=dialect)
    # DictReader keys every row by the header text exactly as it appears in
    # the file, so keep those raw keys for lookups. Looking values up by the
    # cleaned header (as before) silently returned nothing for any column
    # whose header carried padding or a stray BOM, e.g. "price ".
    source_headers = list(reader.fieldnames or [])
    if not source_headers:
        return [], ["找不到表頭列"]

    normalized_headers = [
        _normalize_header(str(header or "").strip().replace("\ufeff", ""))
        for header in source_headers
    ]
    if len(set(normalized_headers)) != len(normalized_headers):
        return [], ["表頭有重複欄位,請先合併或重新命名"]

    rows: list[dict[str, Any]] = []
    for index, raw_row in enumerate(reader, start=2):
        if len(rows) >= limit:
            break
        normalized: dict[str, Any] = {}
        has_value = False
        for source_header, normalized_header in zip(source_headers, normalized_headers):
            value = raw_row.get(source_header)
            if value is not None and str(value).strip():
                has_value = True
            normalized[normalized_header] = str(value or "").strip()
        if has_value:
            normalized["_row_number"] = index
            rows.append(normalized)
    return rows, []
def _classify_offer_record(record: ExternalOfferPayload | None, errors: list[str]) -> dict[str, Any]:
    """Classify a normalized offer as ``blocked``, ``ready`` or ``review``.

    Args:
        record: The normalized payload, or None when normalization failed.
        errors: Normalization errors; any error blocks the record.

    Returns:
        A dict with ``status_code``, ``status_label``, ``can_enter_alerts``
        and the list of ``reasons`` behind the verdict.
    """
    if errors or record is None:
        return {
            "status_code": "blocked",
            "status_label": "不能使用",
            "can_enter_alerts": False,
            "reasons": errors or ["資料格式需要修正"],
        }

    source_code = record.source_code
    normalized_status = (record.match_status or "").strip().lower()
    verified_statuses = {"verified", "usable", "reviewed", "exact", "confirmed"}

    # Each entry is (check failed?, message shown to the user).
    checks = (
        (source_code not in ALLOWED_SOURCE_CODES, "資料來源不在允許清單"),
        (source_code in PAUSED_SOURCE_CODES, "這個來源目前先暫停,不進告警"),
        (normalized_status not in verified_statuses, "尚未確認同款"),
        (not str(record.pchome_product_id or "").strip(), "缺少 PChome 商品 ID無法連到業績"),
        (not (record.quality_score >= 76), "資料可信度低於 76"),
    )
    reasons = [message for failed, message in checks if failed]

    # An empty reason list already implies a verified match, a PChome ID and
    # sufficient quality, so only the active-source test remains.
    if not reasons and source_code in ACTIVE_SOURCE_CODES:
        return {
            "status_code": "ready",
            "status_label": "可使用",
            "can_enter_alerts": True,
            "reasons": ["可進作戰清單"],
        }
    return {
        "status_code": "review",
        "status_label": "需人工確認",
        "can_enter_alerts": False,
        "reasons": reasons or ["需要人工確認"],
    }
def dry_run_external_offer_csv(csv_text: str, *, limit: int = 200) -> dict[str, Any]:
    """Check whether a manual CSV can be converted to the external-offer format.

    Read-only: nothing is written to the database.

    Args:
        csv_text: Raw CSV content.
        limit: Maximum rows to inspect; clamped to the range 1..1000.

    Returns:
        A dict with ``success``, a per-status ``summary``, parse ``errors``
        and one preview entry per checked row. On success it also carries the
        ``manual_csv`` section of the connector contract.
    """
    limit = max(1, min(int(limit or 200), 1000))
    rows, parse_errors = _read_csv_rows(csv_text, limit=limit)

    summary = dict.fromkeys(
        ("total_rows", "ready_count", "review_count", "blocked_count"), 0
    )
    if parse_errors:
        return {
            "success": False,
            "message": "CSV 預檢失敗,請先修正檔案格式。",
            "summary": summary,
            "errors": parse_errors,
            "rows": [],
        }

    summary["total_rows"] = len(rows)
    checked_rows = []
    for row in rows:
        record, errors = normalize_external_offer_payload(row)
        verdict = _classify_offer_record(record, errors)
        status_code = verdict["status_code"]
        summary[f"{status_code}_count"] += 1

        preview = record.to_record() if record else {}
        # Fall back to the raw CSV value only when there is no preview at all.
        if preview:
            quality_score = preview.get("quality_score")
        else:
            quality_score = row.get("quality_score")
        checked_rows.append({
            "row_number": row.get("_row_number"),
            "status_code": status_code,
            "status_label": verdict["status_label"],
            "can_enter_alerts": verdict["can_enter_alerts"],
            "reasons": verdict["reasons"][:4],
            "source_code": preview.get("source_code") or row.get("source_code") or "",
            "source_product_id": preview.get("source_product_id") or row.get("source_product_id") or "",
            "title": preview.get("title") or row.get("title") or "",
            "price": preview.get("price"),
            "pchome_product_id": preview.get("pchome_product_id") or "",
            "quality_score": quality_score,
        })

    return {
        "success": True,
        "message": "CSV 預檢完成,尚未寫入資料。",
        "summary": summary,
        "errors": [],
        "rows": checked_rows,
        "manual_csv": build_connector_contracts()["manual_csv"],
    }
def _legacy_momo_reference_stats(conn) -> dict[str, Any]:
    """Count usable legacy MOMO reference rows in ``competitor_prices``.

    A row counts when it has source ``pchome``, a competitor product ID, a
    positive price, a match score of at least 0.76 and the ``identity_v2``
    tag. The PostgreSQL query additionally excludes expired rows and tests
    the tag with the jsonb ``?`` operator; other dialects use a ``LIKE``
    match on the tags column.

    Returns:
        ``{"usable_offer_count": int, "last_seen_at": str | None}``; zero and
        None when the table does not exist.
    """
    if not _has_table(conn, "competitor_prices"):
        return {"usable_offer_count": 0, "last_seen_at": None}

    postgres_sql = """
        SELECT COUNT(*) AS usable_offer_count, MAX(crawled_at) AS last_seen_at
        FROM competitor_prices
        WHERE source = 'pchome'
        AND competitor_product_id IS NOT NULL
        AND price IS NOT NULL
        AND price > 0
        AND COALESCE(match_score, 0) >= 0.76
        AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)
        AND COALESCE(tags, '[]'::jsonb) ? 'identity_v2'
    """
    fallback_sql = """
        SELECT COUNT(*) AS usable_offer_count, MAX(crawled_at) AS last_seen_at
        FROM competitor_prices
        WHERE source = 'pchome'
        AND competitor_product_id IS NOT NULL
        AND price IS NOT NULL
        AND price > 0
        AND COALESCE(match_score, 0) >= 0.76
        AND COALESCE(tags, '') LIKE '%identity_v2%'
    """
    sql = postgres_sql if conn.dialect.name == "postgresql" else fallback_sql

    row = conn.execute(text(sql)).mappings().first() or {}
    usable_offer_count = int(row.get("usable_offer_count") or 0)
    # Empty or missing timestamps are reported as None, not "".
    last_seen_at = str(row.get("last_seen_at") or "") or None
    return {
        "usable_offer_count": usable_offer_count,
        "last_seen_at": last_seen_at,
    }
def _normalized_offer_stats(conn) -> dict[str, dict[str, Any]]:
    """Aggregate per-source offer counts from the normalized tables.

    Every row of ``external_market_sources`` is reported, including sources
    with no offers (LEFT JOIN). An offer is "usable" when its data quality
    status is verified/usable/reviewed and its quality score is at least 76;
    it is a "review" offer when either status column says ``needs_review``.

    Returns:
        A mapping of source code to its status, enabled flag, counts and
        last-seen timestamp; empty when either table is missing.
    """
    for table in ("external_market_sources", "external_offers"):
        if not _has_table(conn, table):
            return {}

    query = text("""
        SELECT
        s.code,
        s.status,
        s.enabled,
        COUNT(o.id) AS offer_count,
        SUM(CASE
        WHEN o.data_quality_status IN ('verified', 'usable', 'reviewed')
        AND COALESCE(o.quality_score, 0) >= 76
        THEN 1 ELSE 0 END
        ) AS usable_offer_count,
        SUM(CASE
        WHEN o.match_status = 'needs_review'
        OR o.data_quality_status = 'needs_review'
        THEN 1 ELSE 0 END
        ) AS review_offer_count,
        MAX(o.observed_at) AS last_seen_at
        FROM external_market_sources s
        LEFT JOIN external_offers o ON o.source_code = s.code
        GROUP BY s.code, s.status, s.enabled
    """)

    stats_by_code: dict[str, dict[str, Any]] = {}
    for row in conn.execute(query).mappings().all():
        stats_by_code[str(row["code"])] = {
            "status": row.get("status"),
            "enabled": bool(row.get("enabled")),
            "offer_count": int(row.get("offer_count") or 0),
            "usable_offer_count": int(row.get("usable_offer_count") or 0),
            "review_offer_count": int(row.get("review_offer_count") or 0),
            # Empty or missing timestamps are reported as None, not "".
            "last_seen_at": str(row.get("last_seen_at") or "") or None,
        }
    return stats_by_code
def build_connector_contracts() -> dict[str, Any]:
    """Return the field contract shared by connectors and the manual CSV.

    The manual-CSV header lists are derived from ``NORMALIZED_OFFER_FIELDS``:
    entries with a truthy ``required`` flag become required headers, the rest
    become optional headers.
    """
    required_headers: list[str] = []
    optional_headers: list[str] = []
    for spec in NORMALIZED_OFFER_FIELDS:
        bucket = required_headers if spec["required"] else optional_headers
        bucket.append(spec["name"])

    manual_csv = {
        "encoding": "utf-8-sig",
        "required_headers": required_headers,
        "optional_headers": optional_headers,
        "plain_rule": "低可信度或未確認同款的資料只進待補資料清單,不自動發告警。",
    }
    return {
        "success": True,
        "version": "2026-06-15",
        "plain_summary": "所有外部市場資料都先轉成同一份商品報價格式,再進作戰清單。",
        "sources": SOURCE_CONTRACTS,
        "normalized_offer_fields": NORMALIZED_OFFER_FIELDS,
        "manual_csv": manual_csv,
    }
def build_external_source_readiness(engine=None) -> dict[str, Any]:
    """Build the per-source readiness state shown on screen.

    Each entry of ``SOURCE_CONTRACTS`` is copied and enriched with schema
    readiness, usable/review counts, last-seen time, an alert flag and a
    plain-language state. For ``momo_reference`` the usable count is the
    larger of the normalized and legacy counts.

    Args:
        engine: Optional SQLAlchemy engine. Without it, or when the lookup
            fails (the failure is logged), all counts stay at zero.

    Returns:
        A dict with the enriched ``sources``, overall totals and the
        connector contract.
    """
    sources = list(map(dict, SOURCE_CONTRACTS))
    normalized_stats: dict[str, dict[str, Any]] = {}
    legacy_stats: dict[str, Any] = {"usable_offer_count": 0, "last_seen_at": None}
    schema_ready = False

    if engine is not None:
        try:
            with engine.connect() as conn:
                schema_ready = all(
                    _has_table(conn, table)
                    for table in ("external_market_sources", "external_offers")
                )
                normalized_stats = _normalized_offer_stats(conn)
                legacy_stats = _legacy_momo_reference_stats(conn)
        except Exception:
            # Best effort: the screen still renders with whatever was gathered.
            logger.warning("[ExternalMarket] source readiness failed", exc_info=True)

    for source in sources:
        code = source["code"]
        stats = normalized_stats.get(code, {})
        usable = int(stats.get("usable_offer_count") or 0)
        last_seen_at = stats.get("last_seen_at")
        if code == "momo_reference":
            usable = max(usable, int(legacy_stats.get("usable_offer_count") or 0))
            last_seen_at = last_seen_at or legacy_stats.get("last_seen_at")
        review_count = int(stats.get("review_offer_count") or 0)
        is_active = source["status_code"] == "active"

        if not is_active:
            plain_state = "先保留接口,不進告警"
        elif usable:
            plain_state = "已接入,可進作戰清單"
        elif review_count:
            plain_state = "已有待確認候選"
        else:
            plain_state = "已接入,等待可用資料"

        source.update({
            "schema_ready": schema_ready,
            "usable_offer_count": usable,
            "review_offer_count": review_count,
            "last_seen_at": last_seen_at,
            "can_alert": is_active and usable > 0,
            "plain_state": plain_state,
        })

    status_codes = [source["status_code"] for source in sources]
    return {
        "success": True,
        "schema_ready": schema_ready,
        "active_count": status_codes.count("active"),
        "paused_count": status_codes.count("paused"),
        "usable_offer_count": sum(int(source["usable_offer_count"]) for source in sources),
        "review_offer_count": sum(int(source.get("review_offer_count") or 0) for source in sources),
        "sources": sources,
        "connector_contract": build_connector_contracts(),
        "plain_summary": "MOMO 先用;其他主流平台已列管,未接合法穩定來源前不進告警。",
    }
def list_momo_review_candidates(engine, *, limit: int = 20) -> dict[str, Any]:
    """List MOMO candidates awaiting manual review, ready for the front end.

    Rows are read newest first. Only the first row per
    (PChome product ID, source product ID) pair is returned.

    Args:
        engine: SQLAlchemy engine used for a read-only connection.
        limit: Maximum items to return; clamped to the range 1..50.

    Returns:
        A dict with ``success``, ``rows``, ``count`` and a message;
        ``success`` is False when ``external_offers`` is missing.
    """
    limit = max(1, min(int(limit or 20), 50))
    generated_at = datetime.now().isoformat(timespec="seconds")

    with engine.connect() as conn:
        missing_tables = sorted(
            name for name in ("external_offers",) if not _has_table(conn, name)
        )
        if missing_tables:
            return {
                "success": False,
                "generated_at": generated_at,
                "rows": [],
                "count": 0,
                "missing_tables": missing_tables,
                "message": "待確認候選暫時無法讀取,缺少必要資料表。",
            }
        # Over-fetch (4x) so that dropping duplicate pairs below can still
        # fill the requested page.
        rows = conn.execute(text("""
            SELECT
            id,
            source_product_id,
            title,
            product_url,
            image_url,
            price,
            pchome_product_id,
            momo_sku,
            match_status,
            quality_score,
            data_quality_status,
            quality_notes_json,
            raw_payload_json,
            observed_at,
            updated_at
            FROM external_offers
            WHERE source_code = 'momo_reference'
            AND ingestion_method = 'targeted_momo_review'
            AND (
            match_status = 'needs_review'
            OR data_quality_status = 'needs_review'
            )
            ORDER BY observed_at DESC, id DESC
            LIMIT :limit
        """), {"limit": limit * 4}).mappings().all()

    # NOTE(review): placement relative to the connection block was
    # reconstructed from a flattened source; the lookup only needs the
    # already-materialized rows.
    pchome_image_map = _fetch_pchome_public_image_map([
        str(row.get("pchome_product_id") or "").strip()
        for row in rows
    ])

    seen_pairs: set[tuple[str, str]] = set()
    items: list[dict[str, Any]] = []
    for row in rows:
        pair = (
            str(row.get("pchome_product_id") or "").strip(),
            str(row.get("source_product_id") or "").strip(),
        )
        if pair in seen_pairs:
            continue
        seen_pairs.add(pair)

        raw_payload = _load_json_dict(row.get("raw_payload_json"))
        reasons = [
            str(reason)
            for reason in (raw_payload.get("match_reasons") or [])
            if str(reason or "").strip()
        ]
        if not reasons:
            # No match reasons stored: fall back to the quality notes.
            quality_notes = _load_json_list(row.get("quality_notes_json"))
            reasons = [str(note) for note in quality_notes if str(note or "").strip()]
        reason_labels = _humanize_review_reasons(reasons)

        pchome_product_id = row.get("pchome_product_id")
        momo_url = row.get("product_url")
        momo_image_url = row.get("image_url")
        pchome_image_url = (
            raw_payload.get("pchome_image_url")
            or raw_payload.get("pchome_public_image_url")
            or pchome_image_map.get(str(pchome_product_id or "").strip())
            or ""
        )

        items.append({
            "id": int(row.get("id")),
            "pchome_product_id": pchome_product_id,
            "pchome_product_name": raw_payload.get("pchome_public_name") or "",
            "pchome_url": _build_pchome_product_url(pchome_product_id),
            "pchome_image_url": pchome_image_url,
            "pchome_price": _to_float(raw_payload.get("pchome_public_price")),
            "momo_sku": row.get("momo_sku") or row.get("source_product_id"),
            "momo_title": row.get("title"),
            "momo_price": _to_float(row.get("price")),
            "momo_url": momo_url,
            "product_url": momo_url,
            "image_url": momo_image_url,
            "momo_image_url": momo_image_url,
            "quality_score": round(_to_float(row.get("quality_score")) or 0.0, 2),
            "alert_tier": raw_payload.get("alert_tier") or "identity_review",
            "price_basis": raw_payload.get("price_basis") or "manual_review",
            "gap_pct": _to_float(raw_payload.get("target_gap_pct")),
            "match_reasons": reason_labels,
            "match_reason_labels": reason_labels,
            "reason_summary": "".join(reason_labels),
            "observed_at": str(row.get("observed_at") or ""),
            "updated_at": str(row.get("updated_at") or ""),
            "plain_status": "待確認同款或色號",
            "suggested_next_action": "確認同款後才進入價格判斷;不是同款就排除。",
        })
        if len(items) >= limit:
            break

    return {
        "success": True,
        "generated_at": generated_at,
        "rows": items,
        "count": len(items),
        "message": "已整理 MOMO 待確認候選。",
    }
def update_momo_review_candidate(engine, offer_id: int, action: str, *, note: str = "") -> dict[str, Any]:
    """Confirm or reject one MOMO candidate that is awaiting review.

    Args:
        engine: SQLAlchemy engine; the update runs inside ``engine.begin()``.
        offer_id: ID of the ``external_offers`` row; must convert to int.
        action: ``"confirm"`` or ``"reject"`` (case-insensitive, trimmed).
        note: Optional reviewer note; truncated to 240 characters.

    Returns:
        A result dict. ``success`` is False for an invalid ID or action, a
        missing table, or an ID that is not a targeted-review MOMO offer.
    """
    try:
        offer_id = int(offer_id)
    except (TypeError, ValueError):
        return {"success": False, "message": "缺少有效的候選編號。"}

    action = str(action or "").strip().lower()
    if action not in {"confirm", "reject"}:
        return {"success": False, "message": "請選擇確認同款或排除候選。"}

    confirmed = action == "confirm"
    generated_at = datetime.now().isoformat(timespec="seconds")
    # Both status columns always move together.
    new_match_status = new_quality_status = "verified" if confirmed else "rejected"
    label = "人工確認同款" if confirmed else "人工排除候選"
    tag_to_add = "manual_verified" if confirmed else "manual_rejected"
    review_note = str(note or "").strip()[:240]

    with engine.begin() as conn:
        if not _has_table(conn, "external_offers"):
            return {
                "success": False,
                "generated_at": generated_at,
                "message": "待確認候選暫時無法更新,缺少必要資料表。",
            }
        row = conn.execute(text("""
            SELECT id, match_status, data_quality_status, quality_notes_json, raw_payload_json
            FROM external_offers
            WHERE id = :offer_id
            AND source_code = 'momo_reference'
            AND ingestion_method = 'targeted_momo_review'
            LIMIT 1
        """), {"offer_id": offer_id}).mappings().first()
        if not row:
            return {
                "success": False,
                "generated_at": generated_at,
                "message": "找不到這筆待確認候選。",
            }

        raw_payload = _load_json_dict(row.get("raw_payload_json"))
        raw_payload["review_state"] = new_match_status
        raw_payload["reviewed_at"] = generated_at
        raw_payload["review_action"] = action
        if review_note:
            raw_payload["review_note"] = review_note
        tags = raw_payload.get("tags")
        if not isinstance(tags, list):
            tags = []
        if tag_to_add not in tags:
            tags = [*tags, tag_to_add]
        raw_payload["tags"] = tags

        notes = [
            str(item)
            for item in _load_json_list(row.get("quality_notes_json"))
            if str(item or "").strip()
        ]
        notes.append(f"{label}{review_note}" if review_note else label)

        conn.execute(text("""
            UPDATE external_offers
            SET match_status = :match_status,
            data_quality_status = :data_quality_status,
            quality_notes_json = :quality_notes_json,
            raw_payload_json = :raw_payload_json,
            updated_at = CURRENT_TIMESTAMP
            WHERE id = :offer_id
        """), {
            "offer_id": offer_id,
            "match_status": new_match_status,
            "data_quality_status": new_quality_status,
            # Only the six most recent notes are kept.
            "quality_notes_json": json.dumps(notes[-6:], ensure_ascii=False),
            "raw_payload_json": json.dumps(raw_payload, ensure_ascii=False),
        })

    # NOTE(review): placement relative to the transaction block was
    # reconstructed from a flattened source; confirm against the repository.
    mark_pchome_growth_cache_stale()

    if confirmed:
        message = "已確認同款,會進入作戰清單。"
    else:
        message = "已排除候選,不會再進入待確認。"
    return {
        "success": True,
        "generated_at": generated_at,
        "id": offer_id,
        "action": action,
        "match_status": new_match_status,
        "data_quality_status": new_quality_status,
        "message": message,
    }