1438 lines
57 KiB
Python
1438 lines
57 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
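"""外部市場來源與報價的正規化服務。

提供資料規格、來源狀態與只讀統計;本模組不主動向外部平台抓資料。
注意:同步函式(例如 sync_targeted_momo_candidates_to_external_offers)會寫入
external_market_sources 與 external_offers 資料表;dry_run 時不寫入報價,
但仍會補齊 external_market_sources 的來源設定。
"""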
|
||
|
||
from __future__ import annotations
|
||
|
||
import csv
import io
import json
import logging
import math
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any
|
||
|
||
from sqlalchemy import inspect, text
|
||
|
||
from services.pchome_growth_cache_state import mark_pchome_growth_cache_stale
|
||
|
||
logger = logging.getLogger(__name__)


# Contract for every external market source this module knows about.
# - "code" is the canonical source_code (ALLOWED_SOURCE_CODES is derived from it).
# - "status_code" ("active" / "paused") drives ACTIVE_SOURCE_CODES /
#   PAUSED_SOURCE_CODES and the ``enabled`` flag written by
#   _ensure_external_market_source_seeds ("active" -> enabled).
# - "input_methods" is stored as allowed_input_methods_json.
# The label / note strings are Chinese display text and are seeded verbatim.
SOURCE_CONTRACTS = [
    # Active: MOMO reference prices bridged from the existing comparison cache.
    {
        "code": "momo_reference",
        "display_name": "MOMO 外部價格參考",
        "platform_code": "momo",
        "status_code": "active",
        "status_label": "正在使用",
        "source_kind": "legacy_bridge",
        "input_methods": ["既有比價快取", "手動 CSV", "供應商 API"],
        "data_quality_label": "只採用已確認同款",
        "plain_note": "目前用已確認同款的 MOMO 參考價,協助判斷 PChome 商品是否需要調整售價或曝光。",
    },
    # Paused: connector contract reserved until a stable, lawful data source exists.
    {
        "code": "shopee",
        "display_name": "蝦皮",
        "platform_code": "shopee",
        "status_code": "paused",
        "status_label": "先暫停",
        "source_kind": "connector_contract",
        "input_methods": ["官方 API", "供應商 API", "手動 CSV"],
        "data_quality_label": "暫不進告警",
        "plain_note": "先保留資料接口,等有穩定合法來源後再啟用,不會影響目前作戰清單。",
    },
    # Paused: same status as shopee.
    {
        "code": "coupang",
        "display_name": "酷澎",
        "platform_code": "coupang",
        "status_code": "paused",
        "status_label": "先暫停",
        "source_kind": "connector_contract",
        "input_methods": ["官方 API", "供應商 API", "手動 CSV"],
        "data_quality_label": "暫不進告警",
        "plain_note": "先保留資料接口,等有穩定合法來源後再啟用,不會影響目前作戰清單。",
    },
]
|
||
|
||
# Field specification for one normalized external offer. Each entry gives the
# field name, its Chinese label, whether it is required, and a short usage note.
# The six required fields correspond to the "缺少..." checks performed in
# normalize_external_offer_payload.
NORMALIZED_OFFER_FIELDS = [
    {
        "name": "source_code",
        "label": "資料來源",
        "required": True,
        "plain_note": "例如 momo_reference、shopee、coupang。",
    },
    {
        "name": "source_product_id",
        "label": "外部商品 ID",
        "required": True,
        "plain_note": "外部平台或資料供應商給的商品編號。",
    },
    {
        "name": "title",
        "label": "商品名稱",
        "required": True,
        "plain_note": "用來做人工確認與名稱比對。",
    },
    {
        "name": "price",
        "label": "售價",
        "required": True,
        "plain_note": "只填可直接比較的成交或頁面售價。",
    },
    {
        "name": "observed_at",
        "label": "資料時間",
        "required": True,
        "plain_note": "這筆價格看到的時間。",
    },
    {
        "name": "ingestion_method",
        "label": "取得方式",
        "required": True,
        "plain_note": "official_api、provider_api、manual_csv 或 legacy_competitor_cache。",
    },
    # Optional: only filled once the offer is confirmed to be the same product.
    {
        "name": "pchome_product_id",
        "label": "PChome 商品 ID",
        "required": False,
        "plain_note": "若已確認同款才填,未確認就留空。",
    },
    # Optional 0-100 score; 76 is the threshold also used elsewhere in this module.
    {
        "name": "quality_score",
        "label": "資料可信度",
        "required": False,
        "plain_note": "0 到 100,低於 76 不進自動告警。",
    },
]
|
||
|
||
# Accepted CSV header spellings for each normalized offer field; each set also
# contains the canonical field name itself. Presumably consumed by the CSV
# import path (not visible here) to map arbitrary headers onto field names.
# Alias sets are kept disjoint so a header resolves to exactly one field.
CSV_HEADER_ALIASES = {
    "source_code": {"source_code", "資料來源", "來源", "平台來源"},
    "platform_code": {"platform_code", "平台", "平台代碼"},
    "source_product_id": {
        "source_product_id",
        "外部商品 ID",
        "外部商品ID",
        "平台商品編號",
        "商品ID",
        "商品編號",
    },
    "title": {"title", "商品名稱", "品名", "name"},
    "price": {"price", "售價", "價格", "成交價"},
    "observed_at": {"observed_at", "資料時間", "抓取時間", "看到時間", "時間"},
    "ingestion_method": {"ingestion_method", "取得方式", "匯入方式", "來源方式"},
    "currency": {"currency", "幣別"},
    "original_price": {"original_price", "原價", "牌價"},
    "product_url": {"product_url", "商品網址", "網址", "url"},
    "brand": {"brand", "品牌"},
    "category_text": {"category_text", "分類", "類別"},
    "pchome_product_id": {
        "pchome_product_id",
        "PChome 商品 ID",
        "PChome商品ID",
        "PChome商品編號",
        "pchome_id",
    },
    # The original literal listed "momo_sku" twice; a set keeps only one anyway.
    "momo_sku": {"momo_sku", "MOMO SKU", "momo_i_code"},
    "match_status": {"match_status", "同款狀態", "比對狀態", "是否同款"},
    "quality_score": {"quality_score", "資料可信度", "可信度", "品質分數"},
    "data_quality_status": {"data_quality_status", "資料狀態", "品質狀態"},
    "quality_note": {"quality_note", "備註", "品質備註"},
}
|
||
|
||
# Source-code lookups derived once from SOURCE_CONTRACTS at import time.
# Every known source code, regardless of status.
ALLOWED_SOURCE_CODES = {source["code"] for source in SOURCE_CONTRACTS}
# Sources whose contract status is "paused".
PAUSED_SOURCE_CODES = {
    source["code"] for source in SOURCE_CONTRACTS if source["status_code"] == "paused"
}
# Sources whose contract status is "active".
ACTIVE_SOURCE_CODES = {
    source["code"] for source in SOURCE_CONTRACTS if source["status_code"] == "active"
}
|
||
|
||
|
||
@dataclass(frozen=True)
class ExternalOfferPayload:
    """One normalized external offer, shaped after NORMALIZED_OFFER_FIELDS.

    Instances are immutable (frozen). ``to_record`` flattens an instance into
    a dict whose keys are bind-parameter names also used by
    ``_upsert_external_offer``.
    """

    # Required identity, price and provenance fields.
    source_code: str
    platform_code: str
    source_product_id: str
    title: str
    price: float | None
    observed_at: str
    ingestion_method: str
    # Optional descriptive fields.
    currency: str = "TWD"
    original_price: float | None = None
    product_url: str | None = None
    brand: str | None = None
    category_text: str | None = None
    # Cross-platform identifiers (PChome product ID / MOMO SKU).
    pchome_product_id: str | None = None
    momo_sku: str | None = None
    # Match / quality gating.
    match_status: str = "unmatched"
    quality_score: float = 0.0
    data_quality_status: str = "needs_review"
    quality_notes: list[str] = field(default_factory=list)

    def to_record(self) -> dict[str, Any]:
        """Return a flat, DB-ready dict for this offer.

        Adds ``source_offer_key`` ("<source_code>:<source_product_id>"), falls
        back to "TWD" for an empty currency, and serializes ``quality_notes``
        to JSON (non-ASCII kept) under ``quality_notes_json``. Key order is
        stable.
        """
        offer_key = f"{self.source_code}:{self.source_product_id}"
        notes_json = json.dumps(self.quality_notes, ensure_ascii=False)

        record: dict[str, Any] = {
            attr: getattr(self, attr)
            for attr in ("source_code", "platform_code", "source_product_id")
        }
        record["source_offer_key"] = offer_key
        record["title"] = self.title
        record["price"] = self.price
        record["currency"] = self.currency or "TWD"
        passthrough = (
            "original_price",
            "product_url",
            "brand",
            "category_text",
            "observed_at",
            "ingestion_method",
            "pchome_product_id",
            "momo_sku",
            "match_status",
            "quality_score",
            "data_quality_status",
        )
        for attr in passthrough:
            record[attr] = getattr(self, attr)
        record["quality_notes_json"] = notes_json
        return record
|
||
|
||
|
||
def _to_float(value: Any) -> float | None:
|
||
if value is None or value == "":
|
||
return None
|
||
try:
|
||
return float(value)
|
||
except (TypeError, ValueError):
|
||
return None
|
||
|
||
|
||
def _load_json_list(value: Any) -> list[Any]:
|
||
if not value:
|
||
return []
|
||
if isinstance(value, list):
|
||
return value
|
||
try:
|
||
parsed = json.loads(value)
|
||
return parsed if isinstance(parsed, list) else []
|
||
except Exception:
|
||
return []
|
||
|
||
|
||
def _load_json_dict(value: Any) -> dict[str, Any]:
|
||
if not value:
|
||
return {}
|
||
if isinstance(value, dict):
|
||
return value
|
||
try:
|
||
parsed = json.loads(value)
|
||
return parsed if isinstance(parsed, dict) else {}
|
||
except Exception:
|
||
return {}
|
||
|
||
|
||
def _has_table(conn, table_name: str) -> bool:
    """Report whether *table_name* exists in the database behind *conn*.

    Any error raised while inspecting the schema is logged as a warning (with
    traceback) and reported as "missing", so callers skip work rather than crash.
    """
    try:
        present = inspect(conn).has_table(table_name)
    except Exception:
        logger.warning("[ExternalMarket] table probe failed: %s", table_name, exc_info=True)
        return False
    return present
|
||
|
||
|
||
def _quality_score_from_match(value: Any) -> float:
    """Express a match score on the 0-100 quality scale.

    Values in [0, 1] are read as fractions and multiplied by 100; any other
    number is taken as already on the 0-100 scale. The result is clamped to
    [0, 100]; input that ``_to_float`` cannot parse gives 0.0.
    """
    parsed = _to_float(value)
    if parsed is None:
        return 0.0
    scaled = parsed * 100 if 0 <= parsed <= 1 else parsed
    return max(0.0, min(100.0, scaled))
|
||
|
||
|
||
def _normalize_source_code(value: Any) -> str:
|
||
raw = str(value or "").strip()
|
||
lower = raw.lower()
|
||
mapping = {
|
||
"momo": "momo_reference",
|
||
"momo_reference": "momo_reference",
|
||
"momo 外部價格參考": "momo_reference",
|
||
"蝦皮": "shopee",
|
||
"shopee": "shopee",
|
||
"酷澎": "coupang",
|
||
"coupang": "coupang",
|
||
}
|
||
return mapping.get(lower, mapping.get(raw, raw))
|
||
|
||
|
||
def _normalize_ingestion_method(value: Any) -> str:
|
||
raw = str(value or "").strip()
|
||
lower = raw.lower()
|
||
mapping = {
|
||
"備用資料": "manual_csv",
|
||
"備援資料": "manual_csv",
|
||
"手動資料": "manual_csv",
|
||
"手動 csv": "manual_csv",
|
||
"手動CSV": "manual_csv",
|
||
"manual_csv": "manual_csv",
|
||
"官方 api": "official_api",
|
||
"官方API": "official_api",
|
||
"official_api": "official_api",
|
||
"供應商 api": "provider_api",
|
||
"供應商API": "provider_api",
|
||
"provider_api": "provider_api",
|
||
}
|
||
return mapping.get(lower, mapping.get(raw, raw))
|
||
|
||
|
||
def _normalize_match_status(value: Any) -> str:
|
||
raw = str(value or "").strip()
|
||
lower = raw.lower()
|
||
mapping = {
|
||
"是": "verified",
|
||
"已確認": "verified",
|
||
"確認同款": "verified",
|
||
"同款": "verified",
|
||
"verified": "verified",
|
||
"否": "unmatched",
|
||
"不是": "unmatched",
|
||
"非同款": "unmatched",
|
||
"待確認": "unmatched",
|
||
"未確認": "unmatched",
|
||
"unmatched": "unmatched",
|
||
}
|
||
return mapping.get(lower, mapping.get(raw, raw))
|
||
|
||
|
||
def _blank_to_none(value: Any) -> str | None:
    """Return *value* as a stripped string, or None when it is missing or blank.

    Blank CSV cells arrive as "" (or whitespace). Optional fields such as
    pchome_product_id are documented as "leave empty when unconfirmed", so an
    empty cell has to mean "no value" rather than an empty-string ID.
    """
    if value is None:
        return None
    cleaned = str(value).strip()
    return cleaned or None


def normalize_external_offer_payload(payload: dict[str, Any]) -> tuple[ExternalOfferPayload | None, list[str]]:
    """Normalize one official-API / provider-API / manual-CSV row into ExternalOfferPayload.

    Args:
        payload: raw field mapping keyed like NORMALIZED_OFFER_FIELDS
            (``name`` is accepted as a fallback for ``title``).

    Returns:
        ``(record, [])`` on success, or ``(None, errors)`` where *errors* lists
        every validation problem found (Chinese, user-facing messages).
    """
    errors: list[str] = []
    source_code = _normalize_source_code(payload.get("source_code"))
    platform_code = str(payload.get("platform_code") or source_code or "").strip()
    source_product_id = str(payload.get("source_product_id") or "").strip()
    title = str(payload.get("title") or payload.get("name") or "").strip()
    ingestion_method = _normalize_ingestion_method(payload.get("ingestion_method"))
    observed_at = str(payload.get("observed_at") or "").strip()
    price = _to_float(payload.get("price"))

    required_values = {
        "資料來源": source_code,
        "外部商品 ID": source_product_id,
        "商品名稱": title,
        "售價": price,
        "資料時間": observed_at,
        "取得方式": ingestion_method,
    }
    for label, value in required_values.items():
        if value is None or value == "":
            errors.append(f"缺少{label}")

    # NaN passes a bare ``price <= 0`` check (all NaN comparisons are False) and
    # +inf is not a usable price, so insist on a finite, positive number.
    if price is not None and (not math.isfinite(price) or price <= 0):
        errors.append("售價必須大於 0")

    if observed_at:
        try:
            # fromisoformat() does not accept a trailing "Z" on older Pythons.
            datetime.fromisoformat(observed_at.replace("Z", "+00:00"))
        except ValueError:
            errors.append("資料時間格式需為 ISO 格式,例如 2026-06-15T10:00:00")

    if errors:
        return None, errors

    quality_score = _to_float(payload.get("quality_score"))
    # A NaN/inf score would otherwise clamp to 100.0 below (min(100.0, nan) == 100.0).
    if quality_score is None or not math.isfinite(quality_score):
        quality_score = 0.0
    quality_notes = _load_json_list(payload.get("quality_notes"))
    if not quality_notes and payload.get("quality_note"):
        quality_notes = [str(payload.get("quality_note"))]

    record = ExternalOfferPayload(
        source_code=source_code,
        platform_code=platform_code,
        source_product_id=source_product_id,
        title=title,
        price=price,
        observed_at=observed_at,
        ingestion_method=ingestion_method,
        currency=str(payload.get("currency") or "TWD").strip() or "TWD",
        original_price=_to_float(payload.get("original_price")),
        product_url=_blank_to_none(payload.get("product_url")),
        brand=_blank_to_none(payload.get("brand")),
        category_text=_blank_to_none(payload.get("category_text")),
        pchome_product_id=_blank_to_none(payload.get("pchome_product_id")),
        momo_sku=_blank_to_none(payload.get("momo_sku")),
        match_status=_normalize_match_status(payload.get("match_status") or "unmatched"),
        quality_score=max(0.0, min(100.0, quality_score)),
        data_quality_status=str(payload.get("data_quality_status") or "needs_review"),
        quality_notes=[str(item) for item in quality_notes],
    )
    return record, []
|
||
|
||
|
||
def _ensure_external_market_source_seeds(conn) -> None:
    """Upsert one external_market_sources row per entry in SOURCE_CONTRACTS.

    Does nothing when the table does not exist. On a ``code`` conflict, the
    descriptive fields, ``status``, ``enabled``, the allowed input methods and
    the quality policy are overwritten from SOURCE_CONTRACTS. ``write_enabled``
    is written only on first insert (always False) and is deliberately left
    out of the update list. Statements run on *conn*; no commit is issued here.

    Args:
        conn: an open SQLAlchemy connection.
    """
    if not _has_table(conn, "external_market_sources"):
        return

    for source in SOURCE_CONTRACTS:
        payload = {
            "code": source["code"],
            "display_name": source["display_name"],
            "platform_code": source["platform_code"],
            "source_kind": source["source_kind"],
            "status": source["status_code"],
            # Only "active" contracts are enabled.
            "enabled": source["status_code"] == "active",
            "write_enabled": False,
            "allowed_input_methods_json": json.dumps(source["input_methods"], ensure_ascii=False),
            # Same gate used elsewhere in this module: verified match + score >= 76.
            "quality_policy_json": json.dumps({
                "minimum_match_status": "verified",
                "minimum_quality_score": 76,
            }, ensure_ascii=False),
            "plain_note": source["plain_note"],
        }
        # The two branches differ only in the timestamp function (NOW() vs
        # CURRENT_TIMESTAMP) and the case of EXCLUDED. NOTE(review): the non-
        # PostgreSQL branch assumes the dialect supports ON CONFLICT ... DO UPDATE
        # (e.g. SQLite >= 3.24) — confirm for every deployed backend.
        if conn.dialect.name == "postgresql":
            conn.execute(text("""
                INSERT INTO external_market_sources (
                    code, display_name, platform_code, source_kind, status, enabled,
                    write_enabled, allowed_input_methods_json, quality_policy_json, plain_note
                )
                VALUES (
                    :code, :display_name, :platform_code, :source_kind, :status, :enabled,
                    :write_enabled, :allowed_input_methods_json, :quality_policy_json, :plain_note
                )
                ON CONFLICT (code) DO UPDATE SET
                    display_name = EXCLUDED.display_name,
                    platform_code = EXCLUDED.platform_code,
                    source_kind = EXCLUDED.source_kind,
                    status = EXCLUDED.status,
                    enabled = EXCLUDED.enabled,
                    allowed_input_methods_json = EXCLUDED.allowed_input_methods_json,
                    quality_policy_json = EXCLUDED.quality_policy_json,
                    plain_note = EXCLUDED.plain_note,
                    updated_at = NOW()
            """), payload)
        else:
            conn.execute(text("""
                INSERT INTO external_market_sources (
                    code, display_name, platform_code, source_kind, status, enabled,
                    write_enabled, allowed_input_methods_json, quality_policy_json, plain_note
                )
                VALUES (
                    :code, :display_name, :platform_code, :source_kind, :status, :enabled,
                    :write_enabled, :allowed_input_methods_json, :quality_policy_json, :plain_note
                )
                ON CONFLICT (code) DO UPDATE SET
                    display_name = excluded.display_name,
                    platform_code = excluded.platform_code,
                    source_kind = excluded.source_kind,
                    status = excluded.status,
                    enabled = excluded.enabled,
                    allowed_input_methods_json = excluded.allowed_input_methods_json,
                    quality_policy_json = excluded.quality_policy_json,
                    plain_note = excluded.plain_note,
                    updated_at = CURRENT_TIMESTAMP
            """), payload)
|
||
|
||
|
||
def _fetch_legacy_momo_reference_rows(conn, limit: int) -> list[dict[str, Any]]:
    """Read matched MOMO <-> PChome pairs from the legacy competitor-price cache.

    A pair qualifies when competitor_prices has a row with source 'pchome', a
    positive price, match_score >= 0.76 and an ``identity_v2`` tag, and the MOMO
    product (products.i_code = sku, status 'ACTIVE') has at least one positive
    price record. For each (sku, competitor_product_id) only the most recently
    crawled cache row is used, joined to the latest MOMO price record. Results
    are ordered newest first (MOMO price time, falling back to crawled_at) and
    capped at *limit*.

    Both dialect branches return the same columns: momo_sku, pchome_product_id,
    pchome_product_name, pchome_public_price, match_score, tags, crawled_at,
    expires_at, momo_name, product_url, image_url, momo_price, momo_price_at.
    Previously the non-PostgreSQL branch omitted pchome_public_price, even though
    _legacy_row_to_external_offer reads it.

    Returns [] when any of the three source tables is missing.
    """
    if not all(_has_table(conn, table) for table in {"competitor_prices", "products", "price_records"}):
        return []

    if conn.dialect.name == "postgresql":
        sql = """
            WITH valid_cp AS (
                SELECT DISTINCT ON (cp.sku, cp.competitor_product_id)
                    cp.sku AS momo_sku,
                    cp.competitor_product_id AS pchome_product_id,
                    cp.competitor_product_name AS pchome_product_name,
                    cp.price AS pchome_public_price,
                    cp.match_score,
                    cp.tags::text AS tags,
                    cp.crawled_at,
                    cp.expires_at
                FROM competitor_prices cp
                WHERE cp.source = 'pchome'
                  AND cp.sku IS NOT NULL
                  AND cp.competitor_product_id IS NOT NULL
                  AND cp.price IS NOT NULL
                  AND cp.price > 0
                  AND COALESCE(cp.match_score, 0) >= 0.76
                  AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
                  AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
                ORDER BY cp.sku, cp.competitor_product_id, cp.crawled_at DESC NULLS LAST
            )
            SELECT
                vc.*,
                lm.momo_name,
                lm.product_url,
                lm.image_url,
                lm.momo_price,
                lm.momo_price_at
            FROM valid_cp vc
            JOIN LATERAL (
                SELECT
                    p.name AS momo_name,
                    p.url AS product_url,
                    p.image_url AS image_url,
                    pr.price AS momo_price,
                    pr.timestamp AS momo_price_at
                FROM products p
                JOIN price_records pr ON pr.product_id = p.id
                WHERE p.i_code = vc.momo_sku
                  AND p.status = 'ACTIVE'
                  AND pr.price IS NOT NULL
                  AND pr.price > 0
                ORDER BY pr.timestamp DESC, pr.id DESC
                LIMIT 1
            ) lm ON TRUE
            ORDER BY COALESCE(lm.momo_price_at, vc.crawled_at) DESC NULLS LAST
            LIMIT :limit
        """
    else:
        # NOTE(review): unlike the PostgreSQL branch, this branch does not drop
        # expired cache rows (expires_at <= now). It is left as-is because the
        # stored expires_at format/timezone cannot be confirmed from here; align
        # the two branches once that is known.
        sql = """
            WITH latest_cp AS (
                SELECT
                    cp.sku AS momo_sku,
                    cp.competitor_product_id AS pchome_product_id,
                    cp.competitor_product_name AS pchome_product_name,
                    cp.price AS pchome_public_price,
                    cp.match_score,
                    cp.tags AS tags,
                    cp.crawled_at,
                    cp.expires_at,
                    ROW_NUMBER() OVER (
                        PARTITION BY cp.sku, cp.competitor_product_id
                        ORDER BY cp.crawled_at DESC
                    ) AS rn
                FROM competitor_prices cp
                WHERE cp.source = 'pchome'
                  AND cp.sku IS NOT NULL
                  AND cp.competitor_product_id IS NOT NULL
                  AND cp.price IS NOT NULL
                  AND cp.price > 0
                  AND COALESCE(cp.match_score, 0) >= 0.76
                  AND COALESCE(cp.tags, '') LIKE '%identity_v2%'
            ),
            latest_momo AS (
                SELECT
                    p.i_code AS momo_sku,
                    p.name AS momo_name,
                    p.url AS product_url,
                    p.image_url AS image_url,
                    pr.price AS momo_price,
                    pr.timestamp AS momo_price_at,
                    ROW_NUMBER() OVER (
                        PARTITION BY p.i_code
                        ORDER BY pr.timestamp DESC, pr.id DESC
                    ) AS rn
                FROM products p
                JOIN price_records pr ON pr.product_id = p.id
                WHERE p.status = 'ACTIVE'
                  AND pr.price IS NOT NULL
                  AND pr.price > 0
            )
            SELECT
                cp.momo_sku,
                cp.pchome_product_id,
                cp.pchome_product_name,
                cp.pchome_public_price,
                cp.match_score,
                cp.tags,
                cp.crawled_at,
                cp.expires_at,
                lm.momo_name,
                lm.product_url,
                lm.image_url,
                lm.momo_price,
                lm.momo_price_at
            FROM latest_cp cp
            JOIN latest_momo lm ON lm.momo_sku = cp.momo_sku AND lm.rn = 1
            WHERE cp.rn = 1
            ORDER BY COALESCE(lm.momo_price_at, cp.crawled_at) DESC
            LIMIT :limit
        """

    return [dict(row) for row in conn.execute(text(sql), {"limit": limit}).mappings().all()]
|
||
|
||
|
||
def _legacy_row_to_external_offer(row: dict[str, Any]) -> dict[str, Any]:
    """Convert one legacy competitor-cache row into an external_offers upsert payload.

    The MOMO SKU becomes the source product ID. observed_at is the MOMO price
    time (or crawled_at when that is empty). The title falls back from the MOMO
    name to the PChome name to the SKU. The row is always marked "verified" as a
    match, but data_quality_status is "verified" only when the 0-100 quality
    score is at least 76. Legacy details are preserved in raw_payload_json.
    """
    sku = str(row.get("momo_sku") or "").strip()
    pchome_id = str(row.get("pchome_product_id") or "").strip()
    seen_at = row.get("momo_price_at") or row.get("crawled_at")
    display_title = str(row.get("momo_name") or row.get("pchome_product_name") or sku).strip()
    score = _quality_score_from_match(row.get("match_score"))

    if score >= 76:
        quality_status = "verified"
    else:
        quality_status = "needs_review"

    sync_notes = [
        "由已確認同款的舊比價快取自動同步",
        "MOMO 最新價格作為外部參考價",
    ]
    legacy_trace = {
        "legacy_source": "competitor_prices",
        "pchome_public_price": _to_float(row.get("pchome_public_price")),
        "pchome_public_name": row.get("pchome_product_name"),
        "match_score": str(row.get("match_score") or ""),
        "tags": row.get("tags"),
        "crawled_at": str(row.get("crawled_at") or ""),
        "momo_price_at": str(row.get("momo_price_at") or ""),
    }

    offer: dict[str, Any] = {}
    offer["source_code"] = "momo_reference"
    offer["platform_code"] = "momo"
    offer["source_product_id"] = sku
    offer["source_offer_key"] = f"momo_reference:{sku}:{pchome_id}"
    offer["title"] = display_title or sku
    offer["brand"] = None
    offer["category_text"] = None
    offer["product_url"] = row.get("product_url")
    offer["image_url"] = row.get("image_url")
    offer["price"] = _to_float(row.get("momo_price"))
    offer["original_price"] = None
    offer["currency"] = "TWD"
    for unknown_metric in ("stock_status", "sold_count", "rating", "review_count"):
        offer[unknown_metric] = None
    offer["observed_at"] = seen_at
    offer["expires_at"] = row.get("expires_at")
    offer["ingestion_method"] = "legacy_competitor_cache"
    offer["connector_key"] = "competitor_prices"
    offer["pchome_product_id"] = pchome_id
    offer["momo_sku"] = sku
    offer["match_status"] = "verified"
    offer["quality_score"] = score
    offer["data_quality_status"] = quality_status
    offer["quality_notes_json"] = json.dumps(sync_notes, ensure_ascii=False)
    offer["raw_payload_json"] = json.dumps(legacy_trace, ensure_ascii=False)
    return offer
|
||
|
||
|
||
def _upsert_external_offer(conn, payload: dict[str, Any]) -> None:
    """Insert one external_offers row, or update it if it already exists.

    *payload* must supply every bind parameter in the INSERT (source_code ...
    raw_payload_json), since text() binds them by name. PostgreSQL resolves
    conflicts through the named constraint
    ``uq_external_offer_source_product_observed``. Other dialects use the column
    list (source_code, source_product_id, observed_at, ingestion_method).
    NOTE(review): presumably that constraint covers the same columns — confirm
    against the migration.

    On conflict, only the columns listed in DO UPDATE are refreshed. brand,
    category_text, currency, platform_code, stock_status, sold_count, rating
    and review_count keep their first-inserted values. NOTE(review): under
    standard SQL unique semantics a NULL observed_at never conflicts, so such
    payloads would insert a new row each time.
    """
    if conn.dialect.name == "postgresql":
        sql = """
            INSERT INTO external_offers (
                source_code, platform_code, source_product_id, source_offer_key, title,
                brand, category_text, product_url, image_url, price, original_price,
                currency, stock_status, sold_count, rating, review_count, observed_at,
                expires_at, ingestion_method, connector_key, pchome_product_id, momo_sku,
                match_status, quality_score, data_quality_status, quality_notes_json,
                raw_payload_json
            )
            VALUES (
                :source_code, :platform_code, :source_product_id, :source_offer_key, :title,
                :brand, :category_text, :product_url, :image_url, :price, :original_price,
                :currency, :stock_status, :sold_count, :rating, :review_count, :observed_at,
                :expires_at, :ingestion_method, :connector_key, :pchome_product_id, :momo_sku,
                :match_status, :quality_score, :data_quality_status, :quality_notes_json,
                :raw_payload_json
            )
            ON CONFLICT ON CONSTRAINT uq_external_offer_source_product_observed DO UPDATE SET
                source_offer_key = EXCLUDED.source_offer_key,
                title = EXCLUDED.title,
                product_url = EXCLUDED.product_url,
                image_url = EXCLUDED.image_url,
                price = EXCLUDED.price,
                original_price = EXCLUDED.original_price,
                expires_at = EXCLUDED.expires_at,
                connector_key = EXCLUDED.connector_key,
                pchome_product_id = EXCLUDED.pchome_product_id,
                momo_sku = EXCLUDED.momo_sku,
                match_status = EXCLUDED.match_status,
                quality_score = EXCLUDED.quality_score,
                data_quality_status = EXCLUDED.data_quality_status,
                quality_notes_json = EXCLUDED.quality_notes_json,
                raw_payload_json = EXCLUDED.raw_payload_json,
                updated_at = NOW()
        """
    else:
        # Same statement for non-PostgreSQL dialects, with an explicit conflict
        # column list and CURRENT_TIMESTAMP instead of NOW().
        sql = """
            INSERT INTO external_offers (
                source_code, platform_code, source_product_id, source_offer_key, title,
                brand, category_text, product_url, image_url, price, original_price,
                currency, stock_status, sold_count, rating, review_count, observed_at,
                expires_at, ingestion_method, connector_key, pchome_product_id, momo_sku,
                match_status, quality_score, data_quality_status, quality_notes_json,
                raw_payload_json
            )
            VALUES (
                :source_code, :platform_code, :source_product_id, :source_offer_key, :title,
                :brand, :category_text, :product_url, :image_url, :price, :original_price,
                :currency, :stock_status, :sold_count, :rating, :review_count, :observed_at,
                :expires_at, :ingestion_method, :connector_key, :pchome_product_id, :momo_sku,
                :match_status, :quality_score, :data_quality_status, :quality_notes_json,
                :raw_payload_json
            )
            ON CONFLICT (source_code, source_product_id, observed_at, ingestion_method) DO UPDATE SET
                source_offer_key = excluded.source_offer_key,
                title = excluded.title,
                product_url = excluded.product_url,
                image_url = excluded.image_url,
                price = excluded.price,
                original_price = excluded.original_price,
                expires_at = excluded.expires_at,
                connector_key = excluded.connector_key,
                pchome_product_id = excluded.pchome_product_id,
                momo_sku = excluded.momo_sku,
                match_status = excluded.match_status,
                quality_score = excluded.quality_score,
                data_quality_status = excluded.data_quality_status,
                quality_notes_json = excluded.quality_notes_json,
                raw_payload_json = excluded.raw_payload_json,
                updated_at = CURRENT_TIMESTAMP
        """
    conn.execute(text(sql), payload)
|
||
|
||
|
||
def _targeted_candidate_auto_type(candidate: dict[str, Any]) -> str:
|
||
explicit_type = str(candidate.get("auto_compare_type") or "").strip()
|
||
if explicit_type:
|
||
return explicit_type
|
||
if candidate.get("can_auto_compare"):
|
||
return "total_price"
|
||
return "manual_review"
|
||
|
||
|
||
def _targeted_candidate_needs_review(candidate: dict[str, Any]) -> bool:
|
||
"""總價自動同步前的最後防線,避免高分但款式待確認的候選進作戰清單。"""
|
||
if candidate.get("target_hard_veto") is True:
|
||
return True
|
||
price_basis = str(candidate.get("target_price_basis") or "").strip()
|
||
alert_tier = str(candidate.get("target_alert_tier") or "").strip()
|
||
if price_basis and price_basis != "total_price":
|
||
return True
|
||
if alert_tier and alert_tier != "price_alert_exact":
|
||
return True
|
||
review_reason_markers = {
|
||
"manual_review",
|
||
"identity_review",
|
||
"unit_price_review",
|
||
"variant_selection_review",
|
||
"variant_option_conflict",
|
||
"variant_descriptor_conflict",
|
||
"makeup_catalog_selection_gap",
|
||
"commercial_condition_gap",
|
||
"count_conflict",
|
||
"bundle_offer_conflict",
|
||
"multi_component_conflict",
|
||
"component_count_conflict",
|
||
}
|
||
reasons = {str(reason or "") for reason in (candidate.get("target_match_reasons") or [])}
|
||
return bool(reasons & review_reason_markers)
|
||
|
||
|
||
def _targeted_candidate_sync_rank(candidate: dict[str, Any]) -> tuple[float, float, float, float]:
    """Sort key for picking one candidate per PChome product (higher is better).

    When several candidates target the same PChome product, the one most useful
    for operations wins. Tuple elements, compared in order:
      1. auto-compare type: total_price 3.0 > unit_price 2.0 > anything else 0.0;
      2. 1.0 when the MOMO and PChome total quantities are equal, else 0.0;
      3. negative absolute quantity difference (closer quantities rank higher;
         -999999.0 when either quantity is unknown or non-positive);
      4. the raw ``target_match_score`` (0.0 when missing or unparseable).
    """
    auto_type = _targeted_candidate_auto_type(candidate)
    type_rank = 3.0 if auto_type == "total_price" else 2.0 if auto_type == "unit_price" else 0.0

    match_score = _to_float(candidate.get("target_match_score"))
    # A NaN score would make tuple comparisons between candidates meaningless.
    if match_score is None or not math.isfinite(match_score):
        match_score = 0.0

    unit_price_comparison = candidate.get("target_unit_price_comparison")
    if not isinstance(unit_price_comparison, dict):
        unit_price_comparison = {}
    momo_total = _to_float(unit_price_comparison.get("momo_total_quantity"))
    pchome_total = _to_float(unit_price_comparison.get("competitor_total_quantity"))

    quantity_delta = 999999.0
    same_quantity = 0.0
    # _to_float returns None for missing quantities (e.g. total-price candidates
    # with no unit comparison). Comparing None > 0 raised TypeError and aborted
    # the whole sync, so only compare when both quantities are present.
    if momo_total is not None and pchome_total is not None and momo_total > 0 and pchome_total > 0:
        quantity_delta = abs(momo_total - pchome_total)
        same_quantity = 1.0 if quantity_delta <= 0.0001 else 0.0

    return (type_rank, same_quantity, -quantity_delta, match_score)
|
||
|
||
|
||
def _targeted_candidate_to_external_offer(
    candidate: dict[str, Any],
    *,
    observed_at: datetime,
) -> tuple[dict[str, Any] | None, str]:
    """Turn an auto-comparable MOMO search candidate into an external_offers payload.

    Only candidates whose auto-compare type is "total_price" or "unit_price" are
    accepted. Total-price candidates must also pass
    _targeted_candidate_needs_review. Unit-price candidates must carry a
    ``target_unit_price_comparison`` dict with a truthy ``comparable`` flag.

    Args:
        candidate: one search-result dict. MOMO fields (``product_id`` /
            ``goodsCode`` / ``id``, ``price``, ``name`` ...) plus ``target_*``
            fields describing the PChome product it was matched against.
        observed_at: timestamp stored as the offer's ``observed_at``.

    Returns:
        ``(payload, "")`` when accepted, otherwise ``(None, reason)``, where
        *reason* is a Chinese skip message. Checks run in a fixed order, so the
        first failing check determines the reason.
    """
    auto_type = _targeted_candidate_auto_type(candidate)
    if auto_type not in {"total_price", "unit_price"}:
        return None, "不是可自動使用的候選"
    if auto_type == "total_price" and _targeted_candidate_needs_review(candidate):
        return None, "候選仍需人工確認"

    # The MOMO identifier may arrive under any of these keys depending on the search source.
    momo_sku = str(candidate.get("product_id") or candidate.get("goodsCode") or candidate.get("id") or "").strip()
    pchome_product_id = str(candidate.get("target_pchome_product_id") or "").strip()
    momo_price = _to_float(candidate.get("price"))
    pchome_price = _to_float(candidate.get("target_pchome_price"))
    if not momo_sku:
        return None, "缺少 MOMO 商品 ID"
    if not pchome_product_id:
        return None, "缺少 PChome 商品 ID"
    if not momo_price or momo_price <= 0:
        return None, "缺少 MOMO 售價"

    unit_price_comparison = (
        candidate.get("target_unit_price_comparison")
        if isinstance(candidate.get("target_unit_price_comparison"), dict)
        else {}
    )
    is_unit_price = auto_type == "unit_price"
    if is_unit_price and not unit_price_comparison.get("comparable"):
        return None, "單位價證據不足"

    match_score = _quality_score_from_match(candidate.get("target_match_score"))
    if is_unit_price:
        # Unit-price matches get a quality floor of 82, so they always clear
        # the 76 threshold checked just below.
        quality_score = max(match_score, 82.0)
        price_basis = "unit_price"
    else:
        quality_score = match_score
        price_basis = "total_price"
    if quality_score < 76:
        return None, "同款分數低於自動同步門檻"

    title = str(candidate.get("name") or candidate.get("title") or momo_sku).strip()
    notes = [
        "由 PChome 商品自動反查 MOMO 候選同步",
        "自動單位價比較" if is_unit_price else "可直接總價比價",
    ]
    # Match evidence kept for auditing; the identity_v2 tag mirrors the legacy cache filter.
    raw_payload = {
        "source": "pchome_targeted_momo_search",
        "auto_compare_type": auto_type,
        "price_basis": price_basis,
        "pchome_public_price": pchome_price,
        "pchome_public_name": candidate.get("target_pchome_name"),
        "match_score": candidate.get("target_match_score"),
        "match_reasons": candidate.get("target_match_reasons") or [],
        "comparison_mode": candidate.get("target_comparison_mode"),
        "hard_veto": bool(candidate.get("target_hard_veto")),
        "target_gap_pct": candidate.get("target_gap_pct"),
        "unit_price_comparison": unit_price_comparison,
        "search_term": candidate.get("target_search_term"),
        "tags": [
            "identity_v2",
            "source_targeted_momo_search",
            f"price_basis_{price_basis}",
            f"auto_compare_{auto_type}",
        ],
    }
    return {
        "source_code": "momo_reference",
        "platform_code": "momo",
        "source_product_id": momo_sku,
        # price_basis is part of the key, so total-price and unit-price offers
        # for the same MOMO/PChome pair get distinct keys.
        "source_offer_key": f"momo_reference:{momo_sku}:{pchome_product_id}:{price_basis}",
        "title": title or momo_sku,
        "brand": candidate.get("brand"),
        "category_text": candidate.get("category") or candidate.get("category_text"),
        "product_url": candidate.get("product_url") or candidate.get("url"),
        "image_url": candidate.get("image_url"),
        "price": momo_price,
        "original_price": _to_float(candidate.get("original_price")),
        "currency": "TWD",
        "stock_status": None,
        "sold_count": None,
        "rating": None,
        "review_count": None,
        "observed_at": observed_at,
        "expires_at": None,
        "ingestion_method": "targeted_momo_search",
        "connector_key": "pchome_targeted_momo_search",
        "pchome_product_id": pchome_product_id,
        "momo_sku": momo_sku,
        "match_status": "verified",
        "quality_score": round(quality_score, 2),
        "data_quality_status": "verified",
        "quality_notes_json": json.dumps(notes, ensure_ascii=False),
        "raw_payload_json": json.dumps(raw_payload, ensure_ascii=False),
    }, ""
|
||
|
||
|
||
def _targeted_review_candidate_to_external_offer(
    candidate: dict[str, Any],
    *,
    observed_at: datetime,
) -> tuple[dict[str, Any] | None, str]:
    """Build an external_offers payload for a candidate that still needs manual review.

    Rows produced here are marked match_status / data_quality_status
    "needs_review" and must not be used for price decisions.

    Args:
        candidate: one search-result dict (same shape as for
            _targeted_candidate_to_external_offer).
        observed_at: timestamp stored as the offer's ``observed_at``.

    Returns:
        ``(payload, "")`` when kept, otherwise ``(None, reason)`` with a Chinese
        skip message.
    """
    alert_tier = str(candidate.get("target_alert_tier") or "").strip()
    match_type = str(candidate.get("target_match_type") or "").strip()
    # Rejected only when the tier is not a review tier AND there is no usable
    # match type. So any candidate whose match type is non-empty and not
    # "no_match" is kept, whatever its tier. NOTE(review): confirm this
    # AND (rather than OR) gating is intended.
    if alert_tier not in {"identity_review", "unit_price_review"} and match_type in {"", "no_match"}:
        return None, "不是可人工確認候選"

    momo_sku = str(candidate.get("product_id") or candidate.get("goodsCode") or candidate.get("id") or "").strip()
    pchome_product_id = str(candidate.get("target_pchome_product_id") or "").strip()
    momo_price = _to_float(candidate.get("price"))
    if not momo_sku:
        return None, "缺少 MOMO 商品 ID"
    if not pchome_product_id:
        return None, "缺少 PChome 商品 ID"
    if not momo_price or momo_price <= 0:
        return None, "缺少 MOMO 售價"

    # Review candidates use a lower floor (60) than auto-sync (76).
    match_score = _quality_score_from_match(candidate.get("target_match_score"))
    if match_score < 60:
        return None, "人工確認候選分數過低"

    unit_price_comparison = (
        candidate.get("target_unit_price_comparison")
        if isinstance(candidate.get("target_unit_price_comparison"), dict)
        else {}
    )
    title = str(candidate.get("name") or candidate.get("title") or momo_sku).strip()
    price_basis = str(candidate.get("target_price_basis") or "manual_review").strip()
    raw_payload = {
        "source": "pchome_targeted_momo_search",
        "review_state": "needs_review",
        "auto_compare_type": "manual_review",
        "price_basis": price_basis,
        "pchome_public_price": _to_float(candidate.get("target_pchome_price")),
        "pchome_public_name": candidate.get("target_pchome_name"),
        "match_score": candidate.get("target_match_score"),
        "match_reasons": candidate.get("target_match_reasons") or [],
        "comparison_mode": candidate.get("target_comparison_mode"),
        "match_type": match_type,
        "alert_tier": alert_tier,
        "hard_veto": bool(candidate.get("target_hard_veto")),
        "target_gap_pct": candidate.get("target_gap_pct"),
        "unit_price_comparison": unit_price_comparison,
        "search_term": candidate.get("target_search_term"),
        "tags": [
            "identity_v2",
            "source_targeted_momo_search",
            "needs_review",
            f"review_{alert_tier or 'identity_review'}",
        ],
    }
    return {
        "source_code": "momo_reference",
        "platform_code": "momo",
        "source_product_id": momo_sku,
        # The ":needs_review" suffix keeps review rows keyed apart from auto-synced ones.
        "source_offer_key": f"momo_reference:{momo_sku}:{pchome_product_id}:needs_review",
        "title": title or momo_sku,
        "brand": candidate.get("brand"),
        "category_text": candidate.get("category") or candidate.get("category_text"),
        "product_url": candidate.get("product_url") or candidate.get("url"),
        "image_url": candidate.get("image_url"),
        "price": momo_price,
        "original_price": _to_float(candidate.get("original_price")),
        "currency": "TWD",
        "stock_status": None,
        "sold_count": None,
        "rating": None,
        "review_count": None,
        "observed_at": observed_at,
        "expires_at": None,
        "ingestion_method": "targeted_momo_review",
        "connector_key": "pchome_targeted_momo_search_review",
        "pchome_product_id": pchome_product_id,
        "momo_sku": momo_sku,
        "match_status": "needs_review",
        "quality_score": round(match_score, 2),
        "data_quality_status": "needs_review",
        "quality_notes_json": json.dumps(["候選已找到,需人工確認同款或色號"], ensure_ascii=False),
        "raw_payload_json": json.dumps(raw_payload, ensure_ascii=False),
    }, ""
|
||
|
||
|
||
def sync_targeted_momo_candidates_to_external_offers(
|
||
engine,
|
||
candidates: list[dict[str, Any]],
|
||
*,
|
||
dry_run: bool = False,
|
||
) -> dict[str, Any]:
|
||
"""把頁面自動找到的安全 MOMO 候選同步進 external_offers。
|
||
|
||
只接受 total_price 與 unit_price 自動候選;人工確認候選不寫入。
|
||
"""
|
||
generated_at = datetime.now().isoformat(timespec="seconds")
|
||
candidates = list(candidates or [])
|
||
required_tables = {"external_market_sources", "external_offers"}
|
||
|
||
with engine.begin() as conn:
|
||
missing_tables = sorted(table for table in required_tables if not _has_table(conn, table))
|
||
if missing_tables:
|
||
return {
|
||
"success": False,
|
||
"status": "skipped",
|
||
"generated_at": generated_at,
|
||
"candidate_count": len(candidates),
|
||
"written_count": 0,
|
||
"dry_run": dry_run,
|
||
"message": "外部報價同步暫時無法執行,缺少必要資料表。",
|
||
"missing_tables": missing_tables,
|
||
}
|
||
|
||
_ensure_external_market_source_seeds(conn)
|
||
base_observed_at = datetime.now()
|
||
ranked_offers: list[tuple[dict[str, Any], tuple[float, float, float, float]]] = []
|
||
skipped_reasons: dict[str, int] = {}
|
||
for index, candidate in enumerate(candidates):
|
||
offer, reason = _targeted_candidate_to_external_offer(
|
||
candidate,
|
||
observed_at=base_observed_at + timedelta(microseconds=index),
|
||
)
|
||
if offer:
|
||
ranked_offers.append((offer, _targeted_candidate_sync_rank(candidate)))
|
||
else:
|
||
skipped_reasons[reason] = skipped_reasons.get(reason, 0) + 1
|
||
selected_by_pchome: dict[str, tuple[dict[str, Any], tuple[float, float, float, float]]] = {}
|
||
for offer, rank in ranked_offers:
|
||
key = str(offer.get("pchome_product_id") or offer.get("source_offer_key") or "").strip()
|
||
existing = selected_by_pchome.get(key)
|
||
if existing is None or rank > existing[1]:
|
||
selected_by_pchome[key] = (offer, rank)
|
||
offers = [offer for offer, _ in selected_by_pchome.values()]
|
||
|
||
if not dry_run:
|
||
for offer in offers:
|
||
_upsert_external_offer(conn, offer)
|
||
if offers:
|
||
mark_pchome_growth_cache_stale()
|
||
|
||
unit_count = sum(
|
||
1
|
||
for offer in offers
|
||
if _load_json_dict(offer.get("raw_payload_json")).get("price_basis") == "unit_price"
|
||
)
|
||
total_count = len(offers) - unit_count
|
||
return {
|
||
"success": True,
|
||
"status": "dry_run" if dry_run else "synced",
|
||
"generated_at": generated_at,
|
||
"candidate_count": len(candidates),
|
||
"written_count": 0 if dry_run else len(offers),
|
||
"dry_run": dry_run,
|
||
"source_code": "momo_reference",
|
||
"total_price_count": total_count,
|
||
"unit_price_count": unit_count,
|
||
"skipped_reasons": skipped_reasons,
|
||
"message": (
|
||
"已把自動比價候選同步到外部價格參考。"
|
||
if not dry_run
|
||
else "已完成自動比價候選同步預檢,尚未寫入資料。"
|
||
),
|
||
}
|
||
|
||
|
||
def sync_targeted_momo_review_candidates_to_external_offers(
|
||
engine,
|
||
candidates: list[dict[str, Any]],
|
||
*,
|
||
dry_run: bool = False,
|
||
) -> dict[str, Any]:
|
||
"""把待確認 MOMO 候選保存起來,供人工審核;不進價格告警。"""
|
||
generated_at = datetime.now().isoformat(timespec="seconds")
|
||
candidates = list(candidates or [])
|
||
required_tables = {"external_market_sources", "external_offers"}
|
||
|
||
with engine.begin() as conn:
|
||
missing_tables = sorted(table for table in required_tables if not _has_table(conn, table))
|
||
if missing_tables:
|
||
return {
|
||
"success": False,
|
||
"status": "skipped",
|
||
"generated_at": generated_at,
|
||
"candidate_count": len(candidates),
|
||
"written_count": 0,
|
||
"dry_run": dry_run,
|
||
"message": "待確認候選暫時無法保存,缺少必要資料表。",
|
||
"missing_tables": missing_tables,
|
||
}
|
||
|
||
_ensure_external_market_source_seeds(conn)
|
||
base_observed_at = datetime.now()
|
||
ranked_offers: list[tuple[dict[str, Any], tuple[float, float, float, float]]] = []
|
||
skipped_reasons: dict[str, int] = {}
|
||
for index, candidate in enumerate(candidates):
|
||
offer, reason = _targeted_review_candidate_to_external_offer(
|
||
candidate,
|
||
observed_at=base_observed_at + timedelta(microseconds=index),
|
||
)
|
||
if offer:
|
||
ranked_offers.append((offer, _targeted_candidate_sync_rank(candidate)))
|
||
else:
|
||
skipped_reasons[reason] = skipped_reasons.get(reason, 0) + 1
|
||
|
||
selected_by_pchome: dict[str, tuple[dict[str, Any], tuple[float, float, float, float]]] = {}
|
||
for offer, rank in ranked_offers:
|
||
key = str(offer.get("pchome_product_id") or offer.get("source_offer_key") or "").strip()
|
||
existing = selected_by_pchome.get(key)
|
||
if existing is None or rank > existing[1]:
|
||
selected_by_pchome[key] = (offer, rank)
|
||
offers = [offer for offer, _ in selected_by_pchome.values()]
|
||
|
||
if not dry_run:
|
||
for offer in offers:
|
||
_upsert_external_offer(conn, offer)
|
||
if offers:
|
||
mark_pchome_growth_cache_stale()
|
||
|
||
return {
|
||
"success": True,
|
||
"status": "dry_run" if dry_run else "synced",
|
||
"generated_at": generated_at,
|
||
"candidate_count": len(candidates),
|
||
"written_count": 0 if dry_run else len(offers),
|
||
"dry_run": dry_run,
|
||
"source_code": "momo_reference",
|
||
"skipped_reasons": skipped_reasons,
|
||
"message": (
|
||
"已保存待人工確認的 MOMO 候選。"
|
||
if not dry_run
|
||
else "已完成待確認候選預檢,尚未寫入資料。"
|
||
),
|
||
}
|
||
|
||
|
||
def sync_legacy_momo_reference_offers(engine, *, limit: int = 500, dry_run: bool = False) -> dict[str, Any]:
|
||
"""把既有已確認同款的比價快取自動同步到 external_offers。"""
|
||
limit = max(1, min(int(limit or 500), 5000))
|
||
generated_at = datetime.now().isoformat(timespec="seconds")
|
||
required_tables = {"external_market_sources", "external_offers", "competitor_prices", "products", "price_records"}
|
||
|
||
with engine.begin() as conn:
|
||
missing_tables = sorted(table for table in required_tables if not _has_table(conn, table))
|
||
if missing_tables:
|
||
return {
|
||
"success": False,
|
||
"status": "skipped",
|
||
"generated_at": generated_at,
|
||
"candidate_count": 0,
|
||
"written_count": 0,
|
||
"dry_run": dry_run,
|
||
"message": "外部報價同步暫時無法執行,缺少必要資料表。",
|
||
"missing_tables": missing_tables,
|
||
}
|
||
|
||
_ensure_external_market_source_seeds(conn)
|
||
rows = _fetch_legacy_momo_reference_rows(conn, limit)
|
||
offers = [_legacy_row_to_external_offer(row) for row in rows]
|
||
offers = [
|
||
offer for offer in offers
|
||
if offer["source_product_id"] and offer["pchome_product_id"] and offer["price"]
|
||
]
|
||
|
||
if not dry_run:
|
||
for offer in offers:
|
||
_upsert_external_offer(conn, offer)
|
||
if offers:
|
||
mark_pchome_growth_cache_stale()
|
||
|
||
return {
|
||
"success": True,
|
||
"status": "dry_run" if dry_run else "synced",
|
||
"generated_at": generated_at,
|
||
"candidate_count": len(rows),
|
||
"written_count": 0 if dry_run else len(offers),
|
||
"dry_run": dry_run,
|
||
"source_code": "momo_reference",
|
||
"message": (
|
||
"已完成 MOMO 外部價格參考自動同步。"
|
||
if not dry_run
|
||
else "已完成 MOMO 外部價格參考同步預檢,尚未寫入資料。"
|
||
),
|
||
"sample_rows": [
|
||
{
|
||
"momo_sku": offer["momo_sku"],
|
||
"pchome_product_id": offer["pchome_product_id"],
|
||
"price": offer["price"],
|
||
"quality_score": offer["quality_score"],
|
||
}
|
||
for offer in offers[:5]
|
||
],
|
||
}
|
||
|
||
|
||
def _normalize_header(header: str) -> str:
|
||
cleaned = str(header or "").strip().replace("\ufeff", "")
|
||
for canonical, aliases in CSV_HEADER_ALIASES.items():
|
||
if cleaned in aliases:
|
||
return canonical
|
||
return cleaned
|
||
|
||
|
||
def _read_csv_rows(csv_text: str, limit: int) -> tuple[list[dict[str, Any]], list[str]]:
|
||
text_value = (csv_text or "").strip("\ufeff\n\r ")
|
||
if not text_value:
|
||
return [], ["CSV 內容是空的"]
|
||
|
||
sample = text_value[:4096]
|
||
try:
|
||
dialect = csv.Sniffer().sniff(sample, delimiters=",\t;")
|
||
except csv.Error:
|
||
dialect = csv.excel
|
||
|
||
reader = csv.DictReader(io.StringIO(text_value), dialect=dialect)
|
||
if not reader.fieldnames:
|
||
return [], ["找不到表頭列"]
|
||
|
||
raw_headers = [str(header or "").strip().replace("\ufeff", "") for header in reader.fieldnames]
|
||
normalized_headers = [_normalize_header(header) for header in raw_headers]
|
||
if len(set(normalized_headers)) != len(normalized_headers):
|
||
return [], ["表頭有重複欄位,請先合併或重新命名"]
|
||
|
||
rows = []
|
||
for index, raw_row in enumerate(reader, start=2):
|
||
if len(rows) >= limit:
|
||
break
|
||
normalized = {}
|
||
has_value = False
|
||
for raw_header, normalized_header in zip(raw_headers, normalized_headers):
|
||
value = raw_row.get(raw_header)
|
||
if value is not None and str(value).strip():
|
||
has_value = True
|
||
normalized[normalized_header] = str(value or "").strip()
|
||
if has_value:
|
||
normalized["_row_number"] = index
|
||
rows.append(normalized)
|
||
|
||
return rows, []
|
||
|
||
|
||
def _classify_offer_record(record: ExternalOfferPayload | None, errors: list[str]) -> dict[str, Any]:
|
||
if errors or record is None:
|
||
return {
|
||
"status_code": "blocked",
|
||
"status_label": "不能使用",
|
||
"can_enter_alerts": False,
|
||
"reasons": errors or ["資料格式需要修正"],
|
||
}
|
||
|
||
reasons: list[str] = []
|
||
source_code = record.source_code
|
||
match_status = (record.match_status or "").strip().lower()
|
||
is_verified_match = match_status in {"verified", "usable", "reviewed", "exact", "confirmed"}
|
||
has_pchome_id = bool(str(record.pchome_product_id or "").strip())
|
||
has_good_quality = record.quality_score >= 76
|
||
|
||
if source_code not in ALLOWED_SOURCE_CODES:
|
||
reasons.append("資料來源不在允許清單")
|
||
if source_code in PAUSED_SOURCE_CODES:
|
||
reasons.append("這個來源目前先暫停,不進告警")
|
||
if not is_verified_match:
|
||
reasons.append("尚未確認同款")
|
||
if not has_pchome_id:
|
||
reasons.append("缺少 PChome 商品 ID,無法連到業績")
|
||
if not has_good_quality:
|
||
reasons.append("資料可信度低於 76")
|
||
|
||
can_use = (
|
||
source_code in ACTIVE_SOURCE_CODES
|
||
and is_verified_match
|
||
and has_pchome_id
|
||
and has_good_quality
|
||
and not reasons
|
||
)
|
||
if can_use:
|
||
return {
|
||
"status_code": "ready",
|
||
"status_label": "可使用",
|
||
"can_enter_alerts": True,
|
||
"reasons": ["可進作戰清單"],
|
||
}
|
||
|
||
return {
|
||
"status_code": "review",
|
||
"status_label": "需人工確認",
|
||
"can_enter_alerts": False,
|
||
"reasons": reasons or ["需要人工確認"],
|
||
}
|
||
|
||
|
||
def dry_run_external_offer_csv(csv_text: str, *, limit: int = 200) -> dict[str, Any]:
|
||
"""檢查手動 CSV 是否能轉成外部報價格式;只讀,不寫 DB。"""
|
||
limit = max(1, min(int(limit or 200), 1000))
|
||
rows, parse_errors = _read_csv_rows(csv_text, limit=limit)
|
||
if parse_errors:
|
||
return {
|
||
"success": False,
|
||
"message": "CSV 預檢失敗,請先修正檔案格式。",
|
||
"summary": {
|
||
"total_rows": 0,
|
||
"ready_count": 0,
|
||
"review_count": 0,
|
||
"blocked_count": 0,
|
||
},
|
||
"errors": parse_errors,
|
||
"rows": [],
|
||
}
|
||
|
||
checked_rows = []
|
||
summary = {
|
||
"total_rows": len(rows),
|
||
"ready_count": 0,
|
||
"review_count": 0,
|
||
"blocked_count": 0,
|
||
}
|
||
for row in rows:
|
||
record, errors = normalize_external_offer_payload(row)
|
||
classification = _classify_offer_record(record, errors)
|
||
summary[f"{classification['status_code']}_count"] += 1
|
||
preview = record.to_record() if record else {}
|
||
checked_rows.append({
|
||
"row_number": row.get("_row_number"),
|
||
"status_code": classification["status_code"],
|
||
"status_label": classification["status_label"],
|
||
"can_enter_alerts": classification["can_enter_alerts"],
|
||
"reasons": classification["reasons"][:4],
|
||
"source_code": preview.get("source_code") or row.get("source_code") or "",
|
||
"source_product_id": preview.get("source_product_id") or row.get("source_product_id") or "",
|
||
"title": preview.get("title") or row.get("title") or "",
|
||
"price": preview.get("price"),
|
||
"pchome_product_id": preview.get("pchome_product_id") or "",
|
||
"quality_score": preview.get("quality_score") if preview else row.get("quality_score"),
|
||
})
|
||
|
||
return {
|
||
"success": True,
|
||
"message": "CSV 預檢完成,尚未寫入資料。",
|
||
"summary": summary,
|
||
"errors": [],
|
||
"rows": checked_rows,
|
||
"manual_csv": build_connector_contracts()["manual_csv"],
|
||
}
|
||
|
||
|
||
def _legacy_momo_reference_stats(conn) -> dict[str, Any]:
|
||
if not _has_table(conn, "competitor_prices"):
|
||
return {"usable_offer_count": 0, "last_seen_at": None}
|
||
|
||
if conn.dialect.name == "postgresql":
|
||
sql = """
|
||
SELECT COUNT(*) AS usable_offer_count, MAX(crawled_at) AS last_seen_at
|
||
FROM competitor_prices
|
||
WHERE source = 'pchome'
|
||
AND competitor_product_id IS NOT NULL
|
||
AND price IS NOT NULL
|
||
AND price > 0
|
||
AND COALESCE(match_score, 0) >= 0.76
|
||
AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)
|
||
AND COALESCE(tags, '[]'::jsonb) ? 'identity_v2'
|
||
"""
|
||
else:
|
||
sql = """
|
||
SELECT COUNT(*) AS usable_offer_count, MAX(crawled_at) AS last_seen_at
|
||
FROM competitor_prices
|
||
WHERE source = 'pchome'
|
||
AND competitor_product_id IS NOT NULL
|
||
AND price IS NOT NULL
|
||
AND price > 0
|
||
AND COALESCE(match_score, 0) >= 0.76
|
||
AND COALESCE(tags, '') LIKE '%identity_v2%'
|
||
"""
|
||
row = conn.execute(text(sql)).mappings().first() or {}
|
||
return {
|
||
"usable_offer_count": int(row.get("usable_offer_count") or 0),
|
||
"last_seen_at": str(row.get("last_seen_at") or "") or None,
|
||
}
|
||
|
||
|
||
def _normalized_offer_stats(conn) -> dict[str, dict[str, Any]]:
|
||
if not all(_has_table(conn, table) for table in {"external_market_sources", "external_offers"}):
|
||
return {}
|
||
|
||
rows = conn.execute(text("""
|
||
SELECT
|
||
s.code,
|
||
s.status,
|
||
s.enabled,
|
||
COUNT(o.id) AS offer_count,
|
||
SUM(CASE
|
||
WHEN o.data_quality_status IN ('verified', 'usable', 'reviewed')
|
||
AND COALESCE(o.quality_score, 0) >= 76
|
||
THEN 1 ELSE 0 END
|
||
) AS usable_offer_count,
|
||
SUM(CASE
|
||
WHEN o.match_status = 'needs_review'
|
||
OR o.data_quality_status = 'needs_review'
|
||
THEN 1 ELSE 0 END
|
||
) AS review_offer_count,
|
||
MAX(o.observed_at) AS last_seen_at
|
||
FROM external_market_sources s
|
||
LEFT JOIN external_offers o ON o.source_code = s.code
|
||
GROUP BY s.code, s.status, s.enabled
|
||
""")).mappings().all()
|
||
|
||
return {
|
||
str(row["code"]): {
|
||
"status": row.get("status"),
|
||
"enabled": bool(row.get("enabled")),
|
||
"offer_count": int(row.get("offer_count") or 0),
|
||
"usable_offer_count": int(row.get("usable_offer_count") or 0),
|
||
"review_offer_count": int(row.get("review_offer_count") or 0),
|
||
"last_seen_at": str(row.get("last_seen_at") or "") or None,
|
||
}
|
||
for row in rows
|
||
}
|
||
|
||
|
||
def build_connector_contracts() -> dict[str, Any]:
|
||
"""回傳 connector 與手動 CSV 共同遵守的欄位規格。"""
|
||
return {
|
||
"success": True,
|
||
"version": "2026-06-15",
|
||
"plain_summary": "所有外部市場資料都先轉成同一份商品報價格式,再進作戰清單。",
|
||
"sources": SOURCE_CONTRACTS,
|
||
"normalized_offer_fields": NORMALIZED_OFFER_FIELDS,
|
||
"manual_csv": {
|
||
"encoding": "utf-8-sig",
|
||
"required_headers": [
|
||
field["name"] for field in NORMALIZED_OFFER_FIELDS if field["required"]
|
||
],
|
||
"optional_headers": [
|
||
field["name"] for field in NORMALIZED_OFFER_FIELDS if not field["required"]
|
||
],
|
||
"plain_rule": "低可信度或未確認同款的資料只進待補資料清單,不自動發告警。",
|
||
},
|
||
}
|
||
|
||
|
||
def build_external_source_readiness(engine=None) -> dict[str, Any]:
|
||
"""建立畫面可用的外部資料來源狀態。"""
|
||
sources = [dict(source) for source in SOURCE_CONTRACTS]
|
||
normalized_stats: dict[str, dict[str, Any]] = {}
|
||
legacy_stats: dict[str, Any] = {"usable_offer_count": 0, "last_seen_at": None}
|
||
schema_ready = False
|
||
|
||
if engine is not None:
|
||
try:
|
||
with engine.connect() as conn:
|
||
schema_ready = all(
|
||
_has_table(conn, table)
|
||
for table in {"external_market_sources", "external_offers"}
|
||
)
|
||
normalized_stats = _normalized_offer_stats(conn)
|
||
legacy_stats = _legacy_momo_reference_stats(conn)
|
||
except Exception:
|
||
logger.warning("[ExternalMarket] source readiness failed", exc_info=True)
|
||
|
||
for source in sources:
|
||
stats = normalized_stats.get(source["code"], {})
|
||
if source["code"] == "momo_reference":
|
||
usable = max(
|
||
int(stats.get("usable_offer_count") or 0),
|
||
int(legacy_stats.get("usable_offer_count") or 0),
|
||
)
|
||
last_seen_at = stats.get("last_seen_at") or legacy_stats.get("last_seen_at")
|
||
else:
|
||
usable = int(stats.get("usable_offer_count") or 0)
|
||
last_seen_at = stats.get("last_seen_at")
|
||
|
||
source["schema_ready"] = schema_ready
|
||
source["usable_offer_count"] = usable
|
||
source["review_offer_count"] = int(stats.get("review_offer_count") or 0)
|
||
source["last_seen_at"] = last_seen_at
|
||
source["can_alert"] = source["status_code"] == "active" and usable > 0
|
||
if source["status_code"] == "active":
|
||
if usable:
|
||
source["plain_state"] = "已接入,可進作戰清單"
|
||
elif source["review_offer_count"]:
|
||
source["plain_state"] = "已有待確認候選"
|
||
else:
|
||
source["plain_state"] = "已接入,等待可用資料"
|
||
else:
|
||
source["plain_state"] = "先保留接口,不進告警"
|
||
|
||
active_count = sum(1 for source in sources if source["status_code"] == "active")
|
||
paused_count = sum(1 for source in sources if source["status_code"] == "paused")
|
||
usable_count = sum(int(source["usable_offer_count"]) for source in sources)
|
||
review_offer_count = sum(int(source.get("review_offer_count") or 0) for source in sources)
|
||
|
||
return {
|
||
"success": True,
|
||
"schema_ready": schema_ready,
|
||
"active_count": active_count,
|
||
"paused_count": paused_count,
|
||
"usable_offer_count": usable_count,
|
||
"review_offer_count": review_offer_count,
|
||
"sources": sources,
|
||
"connector_contract": build_connector_contracts(),
|
||
"plain_summary": "MOMO 先用;蝦皮與酷澎先保留接口,暫不進告警。",
|
||
}
|