Files
ewoooc/services/competitor_price_feeder.py
OoO 75390f8495
All checks were successful
CD Pipeline / deploy (push) Successful in 1m19s
收緊 PChome 同款比對門檻
2026-05-19 15:53:09 +08:00

905 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
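Competitor Price Feeder worker.
Role: standalone background worker (producer side).
Position in the architecture:
[this worker — runs every 4 hours] → competitor_prices DB table (latest cache)
→ competitor_price_history DB table (historical snapshots)
[AI Pipeline] → fetch_candidates() LEFT JOIN competitor_prices (consumer side)
Design principles:
- Fully decoupled from the AI Pipeline: if this worker goes down, the core brain is unaffected
- Has its own retry mechanism and does not block the main scheduler
- Semantic tags give Hermes richer context
Crawl logic:
MOMO product name → PChome keyword search → fuzzy-match the best candidate → write to competitor_prices + competitor_price_history
Dependencies:
services/pchome_crawler.py — search + batch API
services/marketplace_product_matcher — search-term building + match scoring (imported lazily by this module)
services/price_comparison.py — ProductNameParser + fuzzy matching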
"""
import json
import logging
import re
import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional
logger = logging.getLogger(__name__)
# ── Matching parameters ──────────────────────────────
MIN_MATCH_SCORE = 0.76 # Matches scoring below this are not written; for core price comparison a pending review beats a wrong match
REPLACE_DIFFERENT_PRODUCT_SCORE = 0.84 # When a different PChome product is already stored, only a very high-confidence match may overwrite it
SEARCH_LIMIT = 12 # Take the top N PChome results per search term
MAX_SEARCH_TERMS = 3 # Maximum number of search terms tried per MOMO product
BATCH_SIZE = 30 # Rows per DB write batch (not referenced in the visible part of this file)
RATE_DELAY = 0.8 # Delay between PChome requests, in seconds
TTL_HOURS = 6 # Validity period of a competitor_prices cache row
# ── Feeder result ────────────────────────────────────
@dataclass
class FeederResult:
    """Counters summarising one feeder run."""
    # Number of SKU items handed to the run.
    total_skus: int
    # SKUs whose best candidate was written to competitor_prices.
    matched: int
    # SKUs with no search results, or no candidate scoring above zero.
    skipped_no_result: int
    # SKUs skipped for a low score, or held back for manual review.
    skipped_low_score: int
    # SKUs whose processing raised (also 1 when the SKU list cannot be loaded).
    errors: int
    # Wall-clock duration of the run in seconds.
    duration_sec: float
    # History snapshots written (incremented together with ``matched``).
    history_written: int = 0
    # Rows recorded through ``_record_match_attempt``.
    attempts_written: int = 0
def _extract_tags(pchome_product) -> list:
"""
從 PChomeProduct 物件提取語意標籤
標籤設計:
- "on_sale" — is_on_sale = True
- "discount_10pct" — 折扣 10~19%
- "discount_20pct" — 折扣 20~29%
- "discount_30pct" — 折扣 ≥ 30%
- "low_stock" — 庫存 < 10
- "high_rating" — 評分 ≥ 4.5
"""
tags = []
if pchome_product.is_on_sale:
tags.append("on_sale")
try:
disc = int(pchome_product.discount or 0)
except (ValueError, TypeError):
disc = 0
if disc >= 30:
tags.append("discount_30pct")
elif disc >= 20:
tags.append("discount_20pct")
elif disc >= 10:
tags.append("discount_10pct")
try:
stock = int(pchome_product.stock) if pchome_product.stock is not None else None
if stock is not None and 0 < stock < 10:
tags.append("low_stock")
except (ValueError, TypeError):
pass
try:
if pchome_product.rating and float(pchome_product.rating) >= 4.5:
tags.append("high_rating")
except (ValueError, TypeError):
pass
return tags
def _clean_search_text(value: str) -> str:
value = re.sub(r'[(][^)]*[)]', ' ', value or '')
value = re.sub(r'[【\[].*?[】\]]', ' ', value)
value = re.sub(r'[^\w\u4e00-\u9fff]+', ' ', value)
return re.sub(r'\s+', ' ', value).strip()
def _dedupe_terms(terms: list) -> list:
    """
    Clean, truncate and de-duplicate candidate search terms.

    Each term is passed through ``_clean_search_text``; results shorter than
    two characters are dropped. Surviving terms are cut to 36 characters and
    compared case-insensitively *after* truncation, so two long terms that
    differ only beyond the cut are not sent as the same query twice. At most
    ``MAX_SEARCH_TERMS`` terms are returned, in first-seen order.

    ``None`` is treated as an empty list.
    """
    unique_terms = []
    seen = set()
    for term in terms or []:
        cleaned = _clean_search_text(term)
        if len(cleaned) < 2:
            continue
        # Cutting can land right after a separator; drop the dangling space.
        query = cleaned[:36].rstrip()
        key = query.lower()
        if key in seen:
            continue
        seen.add(key)
        unique_terms.append(query)
        if len(unique_terms) >= MAX_SEARCH_TERMS:
            break
    return unique_terms
def _build_search_keywords(momo_name: str) -> list:
    """
    Build the PChome search terms for one MOMO product name.

    Terms normally come from ``build_search_terms`` in
    ``services.marketplace_product_matcher``. If importing or calling it
    raises, the cleaned product name cut to 36 and to 24 characters is used
    instead. Either way the terms go through ``_dedupe_terms``. Wider search
    only improves recall; acceptance is still decided by the match threshold.
    """
    try:
        from services.marketplace_product_matcher import build_search_terms

        candidate_terms = build_search_terms(momo_name, max_terms=MAX_SEARCH_TERMS)
    except Exception:
        logger.debug(
            "[Feeder] marketplace matcher failed while building search keywords; "
            "fallback to cleaned product name",
            exc_info=True,
        )
        fallback = _clean_search_text(momo_name)
        candidate_terms = [fallback[:width] for width in (36, 24)]
    return _dedupe_terms(candidate_terms)
def _format_match_diagnostics(diagnostics) -> str:
if not diagnostics:
return ""
reasons = ",".join(getattr(diagnostics, "reasons", ()) or ())
return (
f"score={diagnostics.score}; brand={diagnostics.brand_score}; "
f"token={diagnostics.token_score}; spec={diagnostics.spec_score}; "
f"seq={diagnostics.sequence_score}; type={diagnostics.type_score}; "
f"penalty={diagnostics.price_penalty}; veto={diagnostics.hard_veto}; "
f"reasons={reasons}"
)
def _find_best_match_detail(
    momo_name: str,
    pchome_products: list,
    momo_price: float = None,
) -> Optional[tuple]:
    """
    Pick the PChome candidate that scores highest against a MOMO product.

    Args:
        momo_name: MOMO product name.
        pchome_products: candidate objects exposing ``name`` and, optionally,
            ``price``.
        momo_price: MOMO price forwarded to the scorer; may be None.

    Returns:
        ``(product, score, diagnostics)`` for the first candidate holding the
        strictly highest score, or None when no candidate scores above 0.
    """
    from services.marketplace_product_matcher import score_marketplace_match

    leader = None
    leader_score = 0.0
    leader_diagnostics = None
    for candidate in pchome_products:
        verdict = score_marketplace_match(
            momo_name,
            candidate.name,
            momo_price=momo_price,
            competitor_price=getattr(candidate, "price", None),
        )
        candidate_score = verdict.score
        # Strict comparison: on a tie the earlier candidate stays the leader.
        if candidate_score > leader_score:
            leader = candidate
            leader_score = candidate_score
            leader_diagnostics = verdict
    if not leader:
        return None
    return (leader, leader_score, leader_diagnostics)
def _find_best_match(momo_name: str, pchome_products: list) -> Optional[tuple]:
    """Backward-compatible wrapper for smoke scripts: returns ``(product, score)`` or None."""
    detail = _find_best_match_detail(momo_name, pchome_products)
    # The detail tuple is (product, score, diagnostics); callers here only
    # need the first two fields.
    return detail[:2] if detail else None
def _search_pchome_candidates(crawler, momo_name: str, keywords: list = None, momo_price: float = None) -> list:
    """
    Grow a pool of PChome candidates over several search terms.

    Terms are tried in order and results are de-duplicated by ``product_id``.
    The search stops early once the pool holds a candidate that reaches
    ``MIN_MATCH_SCORE``, the same threshold the caller uses to accept a match.

    Args:
        crawler: object whose ``search_products(keyword, limit=...)`` returns
            a 3-tuple; the first item is a success flag and the third is the
            product list.
        momo_name: MOMO product name used for scoring.
        keywords: search terms; when falsy they are built from ``momo_name``.
        momo_price: MOMO price forwarded to the scorer; may be None.

    Returns:
        Every distinct candidate collected, in discovery order (may be empty).
    """
    candidates = []
    seen_ids = set()
    for keyword in keywords or _build_search_keywords(momo_name):
        ok, _, products = crawler.search_products(keyword, limit=SEARCH_LIMIT)
        if not ok or not products:
            continue
        added = False
        for product in products:
            if product.product_id in seen_ids:
                continue
            seen_ids.add(product.product_id)
            candidates.append(product)
            added = True
        if not added:
            # Nothing new in the pool, so the best score cannot have changed
            # since the previous term; skip the redundant re-scoring.
            continue
        best = _find_best_match_detail(momo_name, candidates, momo_price=momo_price)
        if best and best[1] >= MIN_MATCH_SCORE:
            break
    return candidates
def _structural_similarity(momo_p, pchome_p) -> float:
"""
結構化相似度計算(品牌 + 規格 + 關鍵字)
權重:品牌匹配 0.4 + 規格匹配 0.3 + 關鍵字相似 0.3
"""
from difflib import SequenceMatcher
score = 0.0
# 品牌比對 (0.4)
if momo_p.brand and pchome_p.brand:
if momo_p.brand == pchome_p.brand:
score += 0.4
elif momo_p.brand in pchome_p.brand or pchome_p.brand in momo_p.brand:
score += 0.2
elif not momo_p.brand and not pchome_p.brand:
score += 0.1 # 都沒有品牌,不扣分
# 規格比對 (0.3) — 容量/克重
momo_specs = momo_p.specs or {}
pchome_specs = pchome_p.specs or {}
if momo_specs and pchome_specs:
matching_specs = sum(
1 for k, v in momo_specs.items()
if pchome_specs.get(k) == v
)
total_specs = max(len(momo_specs), len(pchome_specs), 1)
score += 0.3 * (matching_specs / total_specs)
elif not momo_specs and not pchome_specs:
score += 0.15
# 關鍵字相似度 (0.3)
momo_kws = " ".join(momo_p.keywords or [])
pchome_kws = " ".join(pchome_p.keywords or [])
if momo_kws and pchome_kws:
kw_sim = SequenceMatcher(None, momo_kws.lower(), pchome_kws.lower()).ratio()
score += 0.3 * kw_sim
return round(score, 3)
class CompetitorPriceFeeder:
    """
    Competitor price feeder worker.

    Usage:
        feeder = CompetitorPriceFeeder(engine=db_engine)
        result = feeder.run(source="pchome")

    ``engine`` is used through ``engine.begin()`` / ``engine.connect()`` with
    ``sqlalchemy.text`` statements.
    """

    def __init__(self, engine=None):
        self.engine = engine
        # Memo flags so the CREATE ... IF NOT EXISTS statements are normally
        # issued once per feeder object. Writers reset them when their
        # transaction fails (see _upsert_competitor_price).
        self._history_table_ready = False
        self._attempt_table_ready = False

    def _ensure_competitor_price_history_table(self, conn):
        """
        Create ``competitor_price_history`` and its indexes when missing.

        Lets the scheduled job self-heal instead of relying on a manual
        migration. Runs on the caller's connection, i.e. inside the caller's
        transaction, and is skipped once the ready flag is set.
        """
        if self._history_table_ready:
            return
        from sqlalchemy import text

        if conn.dialect.name == "postgresql":
            create_table = """
                CREATE TABLE IF NOT EXISTS competitor_price_history (
                    id BIGSERIAL PRIMARY KEY,
                    sku VARCHAR(50) NOT NULL,
                    source VARCHAR(30) NOT NULL DEFAULT 'pchome',
                    momo_product_id INTEGER,
                    momo_price NUMERIC(10,2),
                    price NUMERIC(10,2) NOT NULL,
                    original_price NUMERIC(10,2),
                    discount_pct INTEGER,
                    competitor_product_id VARCHAR(100),
                    competitor_product_name TEXT,
                    match_score NUMERIC(4,3),
                    tags JSONB DEFAULT '[]'::jsonb,
                    crawled_at TIMESTAMP NOT NULL DEFAULT NOW()
                )
            """
        else:
            create_table = """
                CREATE TABLE IF NOT EXISTS competitor_price_history (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    sku VARCHAR(50) NOT NULL,
                    source VARCHAR(30) NOT NULL DEFAULT 'pchome',
                    momo_product_id INTEGER,
                    momo_price NUMERIC(10,2),
                    price NUMERIC(10,2) NOT NULL,
                    original_price NUMERIC(10,2),
                    discount_pct INTEGER,
                    competitor_product_id VARCHAR(100),
                    competitor_product_name TEXT,
                    match_score NUMERIC(4,3),
                    tags TEXT DEFAULT '[]',
                    crawled_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
                )
            """
        # The index DDL is identical for both dialects.
        statements = (
            create_table,
            """
                CREATE INDEX IF NOT EXISTS idx_comp_price_history_sku_source_time
                ON competitor_price_history (sku, source, crawled_at DESC)
            """,
            """
                CREATE INDEX IF NOT EXISTS idx_comp_price_history_competitor_id
                ON competitor_price_history (competitor_product_id)
            """,
        )
        for ddl in statements:
            conn.execute(text(ddl))
        self._history_table_ready = True

    def _ensure_competitor_match_attempts_table(self, conn):
        """
        Create ``competitor_match_attempts`` and its indexes when missing.

        Every attempt (matched, low score, no result, error) is meant to leave
        a row there. Runs on the caller's connection, i.e. inside the caller's
        transaction, and is skipped once the ready flag is set.
        """
        if self._attempt_table_ready:
            return
        from sqlalchemy import text

        if conn.dialect.name == "postgresql":
            create_table = """
                CREATE TABLE IF NOT EXISTS competitor_match_attempts (
                    id BIGSERIAL PRIMARY KEY,
                    sku VARCHAR(50) NOT NULL,
                    source VARCHAR(30) NOT NULL DEFAULT 'pchome',
                    momo_product_id INTEGER,
                    momo_product_name TEXT,
                    momo_price NUMERIC(10,2),
                    search_terms JSONB DEFAULT '[]'::jsonb,
                    candidate_count INTEGER DEFAULT 0,
                    attempt_status VARCHAR(30) NOT NULL,
                    best_competitor_product_id VARCHAR(100),
                    best_competitor_product_name TEXT,
                    best_competitor_price NUMERIC(10,2),
                    best_match_score NUMERIC(4,3),
                    error_message TEXT,
                    attempted_at TIMESTAMP NOT NULL DEFAULT NOW()
                )
            """
        else:
            create_table = """
                CREATE TABLE IF NOT EXISTS competitor_match_attempts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    sku VARCHAR(50) NOT NULL,
                    source VARCHAR(30) NOT NULL DEFAULT 'pchome',
                    momo_product_id INTEGER,
                    momo_product_name TEXT,
                    momo_price NUMERIC(10,2),
                    search_terms TEXT DEFAULT '[]',
                    candidate_count INTEGER DEFAULT 0,
                    attempt_status VARCHAR(30) NOT NULL,
                    best_competitor_product_id VARCHAR(100),
                    best_competitor_product_name TEXT,
                    best_competitor_price NUMERIC(10,2),
                    best_match_score NUMERIC(4,3),
                    error_message TEXT,
                    attempted_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
                )
            """
        # The index DDL is identical for both dialects.
        statements = (
            create_table,
            """
                CREATE INDEX IF NOT EXISTS idx_comp_match_attempts_sku_source_time
                ON competitor_match_attempts (sku, source, attempted_at DESC)
            """,
            """
                CREATE INDEX IF NOT EXISTS idx_comp_match_attempts_status_time
                ON competitor_match_attempts (attempt_status, attempted_at DESC)
            """,
        )
        for ddl in statements:
            conn.execute(text(ddl))
        self._attempt_table_ready = True

    def _record_match_attempt(
        self,
        sku: str,
        momo_name: str,
        momo_product_id: int = None,
        momo_price: float = None,
        search_terms: list = None,
        candidate_count: int = 0,
        attempt_status: str = "unknown",
        best_product=None,
        best_score: float = None,
        error_message: str = None,
        source: str = "pchome",
    ) -> None:
        """
        Append one row to ``competitor_match_attempts``.

        Keeps pending and low-confidence attempts traceable. ``search_terms``
        is stored as JSON text; the best product's name is cut to 300
        characters and ``error_message`` to 1000, with empty strings stored
        as NULL.

        Raises:
            Whatever the database layer raises; the transaction is rolled
            back by ``engine.begin()``.
        """
        from sqlalchemy import text

        try:
            with self.engine.begin() as conn:
                self._ensure_competitor_match_attempts_table(conn)
                search_terms_expr = "CAST(:search_terms AS jsonb)" if conn.dialect.name == "postgresql" else ":search_terms"
                conn.execute(text(f"""
                    INSERT INTO competitor_match_attempts
                    (sku, source, momo_product_id, momo_product_name, momo_price,
                    search_terms, candidate_count, attempt_status,
                    best_competitor_product_id, best_competitor_product_name,
                    best_competitor_price, best_match_score, error_message,
                    attempted_at)
                    VALUES
                    (:sku, :source, :momo_product_id, :momo_product_name, :momo_price,
                    {search_terms_expr}, :candidate_count, :attempt_status,
                    :best_id, :best_name,
                    :best_price, :best_score, :error_message,
                    CURRENT_TIMESTAMP)
                """), {
                    "sku": sku,
                    "source": source,
                    "momo_product_id": momo_product_id,
                    "momo_product_name": momo_name,
                    "momo_price": momo_price,
                    "search_terms": json.dumps(search_terms or [], ensure_ascii=False),
                    "candidate_count": candidate_count,
                    "attempt_status": attempt_status,
                    "best_id": getattr(best_product, "product_id", None),
                    "best_name": (getattr(best_product, "name", None) or "")[:300] or None,
                    "best_price": getattr(best_product, "price", None),
                    "best_score": best_score,
                    "error_message": (error_message or "")[:1000] or None,
                })
        except Exception:
            # The ready flag is flipped inside this transaction. If the
            # transaction did not commit, the CREATE TABLE may have been
            # rolled back with it, so re-check the table on the next call.
            self._attempt_table_ready = False
            raise

    def _fetch_active_skus(self) -> list:
        """
        Load the ACTIVE products that have at least one price record.

        Returns:
            list of dicts with keys ``product_id``, ``sku``, ``name``,
            ``category`` and ``momo_price`` (price of the most recent
            ``price_records`` row), ordered by ``sku``.

        Raises:
            RuntimeError: no engine was injected.
        """
        if self.engine is None:
            raise RuntimeError("需要注入 SQLAlchemy engine")
        from sqlalchemy import text

        sql = text("""
            SELECT
                p.id AS product_id,
                p.i_code AS sku,
                p.name,
                p.category,
                (
                    SELECT pr.price
                    FROM price_records pr
                    WHERE pr.product_id = p.id
                    ORDER BY pr.timestamp DESC
                    LIMIT 1
                ) AS momo_price
            FROM products p
            WHERE p.status = 'ACTIVE'
            AND EXISTS (
                SELECT 1
                FROM price_records pr
                WHERE pr.product_id = p.id
            )
            ORDER BY p.i_code
        """)
        with self.engine.connect() as conn:
            rows = conn.execute(sql).fetchall()
        return [dict(r._mapping) for r in rows]

    def _fetch_unmatched_priority_skus(self, limit: int = 80) -> list:
        """
        Load ACTIVE products lacking a valid PChome match, priciest first.

        A match counts as valid when its ``competitor_prices`` row is
        unexpired, scores at least ``MIN_MATCH_SCORE`` and carries the
        ``identity_v2`` tag. ``limit`` is clamped to the range 1..300.

        NOTE(review): the ``'[]'::jsonb ? 'identity_v2'`` test looks
        PostgreSQL-specific, unlike the table helpers above which also
        handle another dialect. Confirm this path only runs on PostgreSQL.

        Returns:
            list of dicts with keys ``product_id``, ``sku``, ``name``,
            ``category`` and ``momo_price``.

        Raises:
            RuntimeError: no engine was injected.
        """
        if self.engine is None:
            raise RuntimeError("需要注入 SQLAlchemy engine")
        from sqlalchemy import text

        sql = text("""
            WITH latest_momo AS (
                SELECT
                    p.id AS product_id,
                    p.i_code AS sku,
                    p.name,
                    p.category,
                    pr.price AS momo_price,
                    ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC) AS rn
                FROM products p
                JOIN price_records pr ON pr.product_id = p.id
                WHERE p.status = 'ACTIVE'
            )
            SELECT
                lm.product_id,
                lm.sku,
                lm.name,
                lm.category,
                lm.momo_price
            FROM latest_momo lm
            LEFT JOIN competitor_prices cp
                ON cp.sku = lm.sku
                AND cp.source = 'pchome'
                AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
                AND COALESCE(cp.match_score, 0) >= :match_score_floor
                AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
            WHERE lm.rn = 1
            AND cp.sku IS NULL
            ORDER BY lm.momo_price DESC NULLS LAST, lm.sku
            LIMIT :limit
        """)
        with self.engine.connect() as conn:
            rows = conn.execute(
                sql,
                {"limit": max(1, min(int(limit), 300)), "match_score_floor": MIN_MATCH_SCORE},
            ).fetchall()
        return [dict(r._mapping) for r in rows]

    def _upsert_competitor_price(
        self,
        sku: str,
        product,  # PChomeProduct
        match_score: float,
        tags: list,
        momo_product_id: int = None,
        momo_price: float = None,
        source: str = "pchome",
    ):
        """
        Write one match to the latest cache and append a history snapshot.

        Both statements run in a single transaction: an upsert into
        ``competitor_prices`` keyed on ``(sku, source)``, then an insert into
        ``competitor_price_history``. The product name is cut to 200
        characters and ``tags`` is stored as JSON text.

        NOTE(review): ``expires_at`` is a naive UTC+8 wall-clock string while
        ``crawled_at`` uses the database's CURRENT_TIMESTAMP; the TTL is only
        exact if the database clock is also UTC+8. Confirm.

        Raises:
            Whatever the database layer raises; nothing is written then.
        """
        from sqlalchemy import text

        _taipei = timezone(timedelta(hours=8))
        expires_at = (datetime.now(_taipei) + timedelta(hours=TTL_HOURS)).strftime("%Y-%m-%d %H:%M:%S")
        tags_json = json.dumps(tags, ensure_ascii=False)
        # Values shared by the cache upsert and the history insert.
        common = {
            "sku": sku,
            "source": source,
            "price": product.price,
            "original_price": product.original_price,
            "discount_pct": product.discount,
            "comp_id": product.product_id,
            "comp_name": product.name[:200],
            "match_score": match_score,
            "tags": tags_json,
        }
        try:
            with self.engine.begin() as conn:
                self._ensure_competitor_price_history_table(conn)
                conn.execute(text("""
                    INSERT INTO competitor_prices
                    (sku, source, price, original_price, discount_pct,
                    competitor_product_id, competitor_product_name,
                    match_score, tags, crawled_at, expires_at)
                    VALUES
                    (:sku, :source, :price, :original_price, :discount_pct,
                    :comp_id, :comp_name,
                    :match_score, :tags, CURRENT_TIMESTAMP, :expires_at)
                    ON CONFLICT (sku, source) DO UPDATE
                    SET price = EXCLUDED.price,
                    original_price = EXCLUDED.original_price,
                    discount_pct = EXCLUDED.discount_pct,
                    competitor_product_id = EXCLUDED.competitor_product_id,
                    competitor_product_name = EXCLUDED.competitor_product_name,
                    match_score = EXCLUDED.match_score,
                    tags = EXCLUDED.tags,
                    crawled_at = CURRENT_TIMESTAMP,
                    expires_at = :expires_at
                """), {**common, "expires_at": expires_at})
                conn.execute(text("""
                    INSERT INTO competitor_price_history
                    (sku, source, momo_product_id, momo_price,
                    price, original_price, discount_pct,
                    competitor_product_id, competitor_product_name,
                    match_score, tags, crawled_at)
                    VALUES
                    (:sku, :source, :momo_product_id, :momo_price,
                    :price, :original_price, :discount_pct,
                    :comp_id, :comp_name,
                    :match_score, :tags, CURRENT_TIMESTAMP)
                """), {
                    **common,
                    "momo_product_id": momo_product_id,
                    "momo_price": momo_price,
                })
        except Exception:
            # The ready flag is flipped inside this transaction. If the
            # transaction did not commit, the CREATE TABLE may have been
            # rolled back with it, so re-check the table on the next call.
            self._history_table_ready = False
            raise

    def _should_upsert_competitor_price(
        self,
        sku: str,
        product,
        match_score: float,
        source: str = "pchome",
    ) -> tuple[bool, str]:
        """
        Decide whether a new match may overwrite the stored one.

        Protects ``competitor_prices``: when the stored match points at a
        different PChome product, only a very high-confidence match may
        replace it, so a new matcher cannot pollute the core data in one run.

        Rules, first hit wins:
        1. no stored row → allow (``new_match``)
        2. stored row lacks the ``identity_v2`` tag → allow
        3. stored product id empty or equal to the incoming id → allow
        4. stored score below ``MIN_MATCH_SCORE`` → allow
        5. incoming score at least ``REPLACE_DIFFERENT_PRODUCT_SCORE`` → allow
        6. otherwise → refuse, with a reason describing the conflict

        Returns:
            ``(allowed, reason)``.
        """
        from sqlalchemy import text

        with self.engine.connect() as conn:
            row = conn.execute(text("""
                SELECT competitor_product_id, match_score, tags
                FROM competitor_prices
                WHERE sku = :sku
                AND source = :source
                LIMIT 1
            """), {"sku": sku, "source": source}).mappings().first()
        if not row:
            return True, "new_match"

        existing_id = str(row.get("competitor_product_id") or "")
        incoming_id = str(getattr(product, "product_id", "") or "")
        try:
            existing_score = float(row.get("match_score") or 0)
        except (TypeError, ValueError):
            existing_score = 0.0

        # Tags may come back as a decoded list or as JSON text, depending on
        # the column type; undecodable text is treated as "no tags".
        existing_tags = row.get("tags") or []
        if isinstance(existing_tags, str):
            try:
                existing_tags = json.loads(existing_tags)
            except (TypeError, ValueError):
                existing_tags = []

        if "identity_v2" not in existing_tags:
            return True, "replace_legacy_unverified"
        if not existing_id or existing_id == incoming_id:
            return True, "same_or_empty_existing"
        if existing_score < MIN_MATCH_SCORE:
            return True, f"replace_low_existing_score={existing_score:.3f}"
        if match_score >= REPLACE_DIFFERENT_PRODUCT_SCORE:
            return True, f"replace_high_confidence_score={match_score:.3f}"
        return (
            False,
            f"existing_match_conflict;existing_id={existing_id};"
            f"incoming_id={incoming_id};existing_score={existing_score:.3f};"
            f"incoming_score={match_score:.3f}",
        )

    def _run_sku_items(self, skus: list, source: str = "pchome", label: str = "PChome 競品價格") -> "FeederResult":
        """
        Search, match and persist competitor prices for a list of SKU items.

        Per item: build search terms, collect PChome candidates, pick the best
        one, then either write it (``matched``) or record why not
        (``no_result``, ``no_match``, ``low_score``, ``needs_review``). An
        exception on one item is logged, recorded as an ``error`` attempt when
        possible, and the loop moves on.

        Args:
            skus: dicts with ``sku`` and ``name``; ``product_id`` and
                ``momo_price`` are optional.
            source: only ``"pchome"`` is supported; anything else returns an
                all-zero result.
            label: text used in the start log line.

        Returns:
            FeederResult with the counters for this run.
        """
        start = time.time()
        if source != "pchome":
            logger.warning(f"[Feeder] 尚未支援 source={source},跳過")
            return FeederResult(0, 0, 0, 0, 0, 0.0)
        from services.pchome_crawler import PChomeCrawler

        crawler = PChomeCrawler(timeout=30, delay=RATE_DELAY)
        logger.info(f"[Feeder] 開始抓取 {len(skus)} 支商品的 {label}")
        matched = 0
        skipped_no = 0
        skipped_low = 0
        errors = 0
        history_written = 0
        attempts_written = 0

        for item in skus:
            sku = item["sku"]
            momo_name = item["name"]
            momo_product_id = item.get("product_id")
            momo_price = item.get("momo_price")
            # Keyword arguments shared by every attempt record of this item.
            attempt_ctx = {
                "momo_product_id": momo_product_id,
                "momo_price": momo_price,
                "search_terms": [],
                "source": source,
            }
            try:
                # Built inside the try so one malformed item cannot abort the
                # whole batch.
                search_terms = _build_search_keywords(momo_name)
                attempt_ctx["search_terms"] = search_terms
                products = _search_pchome_candidates(crawler, momo_name, search_terms, momo_price=momo_price)
                if not products:
                    logger.debug(f"[Feeder] {sku} 無搜尋結果,跳過")
                    self._record_match_attempt(
                        sku,
                        momo_name,
                        candidate_count=0,
                        attempt_status="no_result",
                        **attempt_ctx,
                    )
                    attempts_written += 1
                    skipped_no += 1
                    continue

                result = _find_best_match_detail(momo_name, products, momo_price=momo_price)
                if not result:
                    self._record_match_attempt(
                        sku,
                        momo_name,
                        candidate_count=len(products),
                        attempt_status="no_match",
                        **attempt_ctx,
                    )
                    attempts_written += 1
                    skipped_no += 1
                    continue

                best_product, score, diagnostics = result
                diagnostics_text = _format_match_diagnostics(diagnostics)
                if score < MIN_MATCH_SCORE:
                    logger.debug(
                        f"[Feeder] {sku} 比對分數過低 ({score:.3f} < {MIN_MATCH_SCORE}) | "
                        f"{diagnostics_text}"
                    )
                    self._record_match_attempt(
                        sku,
                        momo_name,
                        candidate_count=len(products),
                        attempt_status="low_score",
                        best_product=best_product,
                        best_score=score,
                        error_message=diagnostics_text,
                        **attempt_ctx,
                    )
                    attempts_written += 1
                    skipped_low += 1
                    continue

                tags = _extract_tags(best_product)
                tags.extend(getattr(diagnostics, "tags", None) or [])
                for reason in getattr(diagnostics, "reasons", ()) or ():
                    tags.append(f"match_{reason}")
                tags = list(dict.fromkeys(tags))  # de-duplicate, keep order

                should_write, write_reason = self._should_upsert_competitor_price(
                    sku,
                    best_product,
                    score,
                    source=source,
                )
                if not should_write:
                    logger.info(f"[Feeder] {sku} 進入人工覆核,不覆蓋既有配對 | {write_reason}")
                    self._record_match_attempt(
                        sku,
                        momo_name,
                        candidate_count=len(products),
                        attempt_status="needs_review",
                        best_product=best_product,
                        best_score=score,
                        error_message=f"{write_reason}; {diagnostics_text}",
                        **attempt_ctx,
                    )
                    attempts_written += 1
                    # No dedicated counter exists; held-back matches are
                    # reported with the low-score skips.
                    skipped_low += 1
                    continue

                tags.append(write_reason)
                self._upsert_competitor_price(
                    sku,
                    best_product,
                    score,
                    tags,
                    momo_product_id=momo_product_id,
                    momo_price=momo_price,
                    source=source,
                )
                # Count as soon as the price is committed, so a failure in the
                # audit write below cannot turn a stored match into an error.
                matched += 1
                history_written += 1
                try:
                    self._record_match_attempt(
                        sku,
                        momo_name,
                        candidate_count=len(products),
                        attempt_status="matched",
                        best_product=best_product,
                        best_score=score,
                        **attempt_ctx,
                    )
                    attempts_written += 1
                except Exception as attempt_error:
                    logger.warning(f"[Feeder] {sku} 比對嘗試紀錄寫入失敗: {attempt_error}")
                logger.debug(
                    f"[Feeder] {sku} → PChome ${best_product.price} "
                    f"score={score:.3f} tags={tags}"
                )
            except Exception as e:
                logger.error(f"[Feeder] {sku} 處理失敗: {e}", exc_info=True)
                try:
                    self._record_match_attempt(
                        sku,
                        momo_name,
                        attempt_status="error",
                        error_message=str(e),
                        **attempt_ctx,
                    )
                    attempts_written += 1
                except Exception as attempt_error:
                    logger.warning(f"[Feeder] {sku} 比對嘗試紀錄寫入失敗: {attempt_error}")
                errors += 1

        duration = round(time.time() - start, 2)
        logger.info(
            f"[Feeder] 完成 matched={matched} skipped_no={skipped_no} "
            f"skipped_low={skipped_low} errors={errors} "
            f"history_written={history_written} attempts_written={attempts_written} 耗時={duration}s"
        )
        return FeederResult(
            total_skus=len(skus),
            matched=matched,
            skipped_no_result=skipped_no,
            skipped_low_score=skipped_low,
            errors=errors,
            duration_sec=duration,
            history_written=history_written,
            attempts_written=attempts_written,
        )

    def run(self, source: str = "pchome") -> "FeederResult":
        """
        Run one round of competitor price fetching and writing.

        Args:
            source: competitor source code (currently only ``'pchome'``).

        Returns:
            FeederResult; when the SKU list cannot be loaded, an otherwise
            zeroed result with ``errors=1``.
        """
        try:
            skus = self._fetch_active_skus()
        except Exception as e:
            logger.error(f"[Feeder] 讀取商品清單失敗: {e}", exc_info=True)
            return FeederResult(0, 0, 0, 0, 1, 0.0)
        return self._run_sku_items(skus, source=source, label="PChome 競品價格")

    def run_unmatched_priority(self, limit: int = 80, source: str = "pchome") -> "FeederResult":
        """
        Re-crawl high-priced products that still lack a valid PChome match.

        Args:
            limit: maximum number of products to process.
            source: competitor source code (currently only ``'pchome'``).

        Returns:
            FeederResult; when the SKU list cannot be loaded, an otherwise
            zeroed result with ``errors=1``.
        """
        try:
            skus = self._fetch_unmatched_priority_skus(limit=limit)
        except Exception as e:
            logger.error(f"[Feeder] 讀取待比對優先商品失敗: {e}", exc_info=True)
            return FeederResult(0, 0, 0, 0, 1, 0.0)
        return self._run_sku_items(skus, source=source, label="待比對優先補抓")
# ─────────────────────────────────────────────
# CLI smoke test (no DB needed: exercises the crawler + matching logic directly)
# python3 services/competitor_price_feeder.py
# NOTE(review): the `services.` import below needs the project root on
# sys.path — confirm the invocation above works in your environment.
# ─────────────────────────────────────────────
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    from services.pchome_crawler import PChomeCrawler

    # (sku, MOMO product name) samples used for the manual check.
    samples = (
        ("A003", "舒特膚AD乳液200ml"),
        ("A001", "玻尿酸面膜10片裝"),
        ("A009", "美白化妝水150ml"),
    )
    crawler = PChomeCrawler(delay=0.8)
    print("=== Competitor Price Feeder CLI 測試 ===\n")
    for sample_sku, sample_name in samples:
        # Search with the first 20 characters of the name only.
        ok, msg, products = crawler.search_products(sample_name[:20], limit=10)
        if not (ok and products):
            print(f"[{sample_sku}] 無結果: {msg}")
            continue
        outcome = _find_best_match(sample_name, products)
        if not outcome:
            print(f"[{sample_sku}] 無法比對")
            continue
        best, score = outcome
        tags = _extract_tags(best)
        # Scores under the write threshold get a low-score warning marker.
        if score >= MIN_MATCH_SCORE:
            symbol = ""
        else:
            symbol = "⚠️ 低分"
        print(
            f"{symbol} [{sample_sku}] {sample_name[:25]}\n"
            f" → PChome: {best.name[:40]}\n"
            f" → 售價 ${best.price} | 分數 {score:.3f} | 標籤 {tags}\n"
        )