V10.608 新增外部報價 CSV 預檢
All checks were successful
CD Pipeline / deploy (push) Successful in 1m7s

This commit is contained in:
OoO
2026-06-15 20:39:32 +08:00
parent 9260cc1740
commit df6714c3f7
9 changed files with 527 additions and 5 deletions

View File

@@ -9,6 +9,8 @@ from __future__ import annotations
import json
import logging
import csv
import io
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
@@ -105,6 +107,35 @@ NORMALIZED_OFFER_FIELDS = [
},
]
# Accepted CSV header spellings for each canonical offer field.
# Key: the canonical field name a header is mapped to.
# Value: the exact header texts (canonical key plus alternative English and
# Traditional Chinese labels) that are treated as that field.
# Every alias must be unique across the whole table, otherwise a header
# would map to whichever field happens to be checked first.
CSV_HEADER_ALIASES = {
    "source_code": {"source_code", "資料來源", "來源", "平台來源"},
    "platform_code": {"platform_code", "平台", "平台代碼"},
    "source_product_id": {"source_product_id", "外部商品 ID", "外部商品ID", "商品ID", "商品編號"},
    "title": {"title", "商品名稱", "品名", "name"},
    "price": {"price", "售價", "價格", "成交價"},
    "observed_at": {"observed_at", "資料時間", "抓取時間", "看到時間", "時間"},
    "ingestion_method": {"ingestion_method", "取得方式", "匯入方式", "來源方式"},
    "currency": {"currency", "幣別"},
    "original_price": {"original_price", "原價", "牌價"},
    "product_url": {"product_url", "商品網址", "網址", "url"},
    "brand": {"brand", "品牌"},
    "category_text": {"category_text", "分類", "類別"},
    "pchome_product_id": {"pchome_product_id", "PChome 商品 ID", "PChome商品ID", "pchome_id"},
    # The original literal listed "momo_sku" twice; a set keeps one copy
    # anyway, so the duplicate was dropped.
    "momo_sku": {"momo_sku", "MOMO SKU", "momo_i_code"},
    "match_status": {"match_status", "同款狀態", "比對狀態"},
    "quality_score": {"quality_score", "資料可信度", "可信度", "品質分數"},
    "data_quality_status": {"data_quality_status", "資料狀態", "品質狀態"},
    "quality_note": {"quality_note", "備註", "品質備註"},
}
# Source-code lookup sets derived from SOURCE_CONTRACTS.
# ALLOWED: every code that has a contract entry at all.
ALLOWED_SOURCE_CODES = {contract["code"] for contract in SOURCE_CONTRACTS}
# PAUSED: codes whose contract entry has status_code == "paused".
PAUSED_SOURCE_CODES = {
    contract["code"]
    for contract in SOURCE_CONTRACTS
    if contract["status_code"] == "paused"
}
# ACTIVE: codes whose contract entry has status_code == "active".
ACTIVE_SOURCE_CODES = {
    contract["code"]
    for contract in SOURCE_CONTRACTS
    if contract["status_code"] == "active"
}
@dataclass(frozen=True)
class ExternalOfferPayload:
@@ -245,6 +276,156 @@ def normalize_external_offer_payload(payload: dict[str, Any]) -> tuple[ExternalO
return record, []
def _normalize_header(header: str) -> str:
    """Map one CSV header cell to its canonical field name.

    Byte-order marks are removed and surrounding whitespace is stripped
    before matching. A header that equals one of the aliases in
    ``CSV_HEADER_ALIASES`` is replaced by the canonical key; if there is no
    exact match, a case-insensitive comparison is tried so that spellings
    such as ``"Price"`` or ``"URL"`` are still recognised.

    Args:
        header: Raw header text; ``None`` or other falsy values are treated
            as an empty header.

    Returns:
        The canonical field name when an alias matches, otherwise the
        cleaned header text unchanged.
    """
    # Drop BOMs first: stripping first would leave behind any whitespace
    # that sat directly next to a BOM.
    cleaned = str(header or "").replace("\ufeff", "").strip()

    # Exact matches win so existing, correctly spelled headers behave as before.
    for canonical, aliases in CSV_HEADER_ALIASES.items():
        if cleaned in aliases:
            return canonical

    # Fallback: ignore letter case (only affects the Latin-letter aliases).
    folded = cleaned.casefold()
    for canonical, aliases in CSV_HEADER_ALIASES.items():
        if any(folded == alias.casefold() for alias in aliases):
            return canonical

    return cleaned
def _read_csv_rows(csv_text: str, limit: int) -> tuple[list[dict[str, Any]], list[str]]:
    """Parse CSV text into rows keyed by canonical header names.

    Args:
        csv_text: Raw CSV content. BOM, newline, carriage-return and space
            characters at either end are removed before parsing.
        limit: Maximum number of non-empty data rows to return.

    Returns:
        A ``(rows, errors)`` pair. On success ``errors`` is empty and each
        row maps a normalized header to the stripped cell text, plus
        ``"_row_number"``: the record's position as yielded by the reader,
        counted from 2 for the first data record (the header being row 1).
        On failure ``rows`` is empty and ``errors`` holds one user-facing
        message (empty content, missing header row, or duplicate headers).
    """
    text_value = (csv_text or "").strip("\ufeff\n\r ")
    if not text_value:
        return [], ["CSV 內容是空的"]

    # Guess the delimiter from the start of the text; fall back to the
    # standard comma dialect when the sniffer cannot decide.
    try:
        dialect = csv.Sniffer().sniff(text_value[:4096], delimiters=",\t;")
    except csv.Error:
        dialect = csv.excel

    reader = csv.DictReader(io.StringIO(text_value), dialect=dialect)
    if not reader.fieldnames:
        return [], ["找不到表頭列"]

    # DictReader keys each row by the header text exactly as written in the
    # file. Keep those untouched names for value lookups; the cleaned names
    # are used only for alias matching. Looking values up by the cleaned
    # name would silently return nothing for a header such as " price ".
    original_headers = list(reader.fieldnames)
    cleaned_headers = [
        str(header or "").strip().replace("\ufeff", "") for header in original_headers
    ]
    normalized_headers = [_normalize_header(header) for header in cleaned_headers]
    if len(set(normalized_headers)) != len(normalized_headers):
        return [], ["表頭有重複欄位,請先合併或重新命名"]

    rows: list[dict[str, Any]] = []
    for index, raw_row in enumerate(reader, start=2):
        if len(rows) >= limit:
            break
        normalized: dict[str, Any] = {}
        has_value = False
        for original_header, normalized_header in zip(original_headers, normalized_headers):
            # Short rows yield None for the missing trailing cells.
            value = raw_row.get(original_header)
            text = "" if value is None else str(value).strip()
            if text:
                has_value = True
            normalized[normalized_header] = text
        # Rows whose cells are all blank are dropped, but they still consume
        # a row number.
        if has_value:
            normalized["_row_number"] = index
            rows.append(normalized)
    return rows, []
def _classify_offer_record(record: ExternalOfferPayload | None, errors: list[str]) -> dict[str, Any]:
    """Classify a normalized offer as ready, needing review, or blocked.

    Args:
        record: The normalized offer, or ``None`` when normalization failed.
        errors: Validation messages produced while normalizing the row.

    Returns:
        A dict with ``status_code`` (``"blocked"``, ``"ready"`` or
        ``"review"``), ``status_label``, ``can_enter_alerts`` and
        ``reasons`` (a list of user-facing messages).
    """
    # Anything that failed normalization is blocked outright.
    if record is None or errors:
        return {
            "status_code": "blocked",
            "status_label": "不能使用",
            "can_enter_alerts": False,
            "reasons": errors or ["資料格式需要修正"],
        }

    origin = record.source_code
    status_text = (record.match_status or "").strip().lower()
    verified = status_text in {"verified", "usable", "reviewed", "exact", "confirmed"}
    linked = bool(str(record.pchome_product_id or "").strip())
    trusted = record.quality_score >= 76

    # Each entry pairs "this check failed" with the message shown for it;
    # the order here is the order the reasons are reported in.
    findings = (
        (origin not in ALLOWED_SOURCE_CODES, "資料來源不在允許清單"),
        (origin in PAUSED_SOURCE_CODES, "這個來源目前先暫停,不進告警"),
        (not verified, "尚未確認同款"),
        (not linked, "缺少 PChome 商品 ID無法連到業績"),
        (not trusted, "資料可信度低於 76"),
    )
    problems = [message for failed, message in findings if failed]

    # An empty problem list already implies verified, linked and trusted,
    # so only the active-source condition remains to be checked.
    if not problems and origin in ACTIVE_SOURCE_CODES:
        return {
            "status_code": "ready",
            "status_label": "可使用",
            "can_enter_alerts": True,
            "reasons": ["可進作戰清單"],
        }

    return {
        "status_code": "review",
        "status_label": "需人工確認",
        "can_enter_alerts": False,
        "reasons": problems or ["需要人工確認"],
    }
def dry_run_external_offer_csv(csv_text: str, *, limit: int = 200) -> dict[str, Any]:
    """Check whether a manual CSV can be converted to the external offer format.

    Read-only by intent: rows are parsed, normalized and classified, and
    nothing in this function writes to the database.

    Args:
        csv_text: Raw CSV content to check.
        limit: Maximum number of rows to check. Falsy values become 200 and
            the result is clamped to the range 1..1000.

    Returns:
        A dict with ``success``, ``message``, ``summary`` (total and
        per-status counts), ``errors`` and ``rows`` (one preview entry per
        checked row). A successful run also carries ``manual_csv``, taken
        from ``build_connector_contracts()``.
    """
    row_cap = max(1, min(int(limit or 200), 1000))
    parsed_rows, parse_errors = _read_csv_rows(csv_text, limit=row_cap)

    counters = {
        "total_rows": 0,
        "ready_count": 0,
        "review_count": 0,
        "blocked_count": 0,
    }

    # File-level problems: report them with an all-zero summary.
    if parse_errors:
        return {
            "success": False,
            "message": "CSV 預檢失敗,請先修正檔案格式。",
            "summary": counters,
            "errors": parse_errors,
            "rows": [],
        }

    counters["total_rows"] = len(parsed_rows)
    results = []
    for source_row in parsed_rows:
        record, errors = normalize_external_offer_payload(source_row)
        verdict = _classify_offer_record(record, errors)
        status_code = verdict["status_code"]
        counters[f"{status_code}_count"] += 1

        # Prefer normalized values; fall back to the raw cell text when the
        # row could not be normalized.
        preview = record.to_record() if record else {}
        if preview:
            quality_score = preview.get("quality_score")
        else:
            quality_score = source_row.get("quality_score")

        results.append({
            "row_number": source_row.get("_row_number"),
            "status_code": status_code,
            "status_label": verdict["status_label"],
            "can_enter_alerts": verdict["can_enter_alerts"],
            # Only the first four reasons are included per row.
            "reasons": verdict["reasons"][:4],
            "source_code": preview.get("source_code") or source_row.get("source_code") or "",
            "source_product_id": (
                preview.get("source_product_id") or source_row.get("source_product_id") or ""
            ),
            "title": preview.get("title") or source_row.get("title") or "",
            "price": preview.get("price"),
            "pchome_product_id": preview.get("pchome_product_id") or "",
            "quality_score": quality_score,
        })

    return {
        "success": True,
        "message": "CSV 預檢完成,尚未寫入資料。",
        "summary": counters,
        "errors": [],
        "rows": results,
        "manual_csv": build_connector_contracts()["manual_csv"],
    }
def _legacy_momo_reference_stats(conn) -> dict[str, Any]:
if not _has_table(conn, "competitor_prices"):
return {"usable_offer_count": 0, "last_seen_at": None}