Files
ewoooc/services/competitor_price_feeder.py
OoO 0cea70890a
All checks were successful
CD Pipeline / deploy (push) Successful in 1m21s
導入 browse.sh 比價診斷計畫
2026-05-21 18:40:49 +08:00

2150 lines
91 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
競品價格補給線 Worker (Competitor Price Feeder)
角色:獨立背景 Worker(生產者端)
架構位置:
[本 Worker — 每 4 小時跑一次] → competitor_prices DB 表(最新快取)
→ competitor_price_history DB 表(歷史快照)
[AI Pipeline] → fetch_candidates() LEFT JOIN competitor_prices(消費者端)
設計原則:
- 與 AI Pipeline 完全解耦:本 Worker 掛了不影響核心大腦
- 自帶重試機制,不阻塞主排程
- 語意化標籤 (tags) 讓 Hermes 獲得更豐富的情境
爬取邏輯:
MOMO 商品名稱 → PChome 關鍵字搜尋 → 模糊比對最佳匹配 → 寫入 competitor_prices + competitor_price_history
依賴:
services/pchome_crawler.py — 搜尋 + 批量 API
services/price_comparison.py — ProductNameParser + 模糊比對
"""
import json
import logging
import os
import re
import time
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional
from urllib.parse import quote_plus
logger = logging.getLogger(__name__)

# Values accepted as "enabled" for boolean environment switches (compared lower-cased).
_TRUTHY_ENV_VALUES = {"1", "true", "yes", "on"}


def _env_flag(name: str, default: str) -> bool:
    """Return True when env var ``name`` (or ``default`` if unset) is a truthy switch."""
    return os.getenv(name, default).lower() in _TRUTHY_ENV_VALUES


# ── Matching parameters ─────────────────────────────────────────
# Scores below this are never written: core price comparison would rather leave
# an item pending review than pair it with the wrong product.
MIN_MATCH_SCORE = 0.76
# Replacing an already-cached, different PChome product needs very high confidence.
REPLACE_DIFFERENT_PRODUCT_SCORE = 0.84
# The candidate search stops early only on a strong same-product hit, so a merely
# second-best candidate cannot block later, more precise search terms.
EARLY_STOP_MATCH_SCORE = 0.90
# Top-N PChome results requested per search term.
SEARCH_LIMIT = int(os.getenv("PCHOME_FEEDER_SEARCH_LIMIT", "20"))
# Maximum number of search terms tried per MOMO product.
MAX_SEARCH_TERMS = int(os.getenv("PCHOME_FEEDER_MAX_SEARCH_TERMS", "5"))
# Maximum PChome result pages scanned per search term.
SEARCH_MAX_PAGES = int(os.getenv("PCHOME_FEEDER_SEARCH_MAX_PAGES", "2"))
# Rows per DB write batch.
BATCH_SIZE = 30
# Delay between PChome requests, in seconds.
RATE_DELAY = float(os.getenv("PCHOME_FEEDER_RATE_DELAY", "1.0"))
# Lifetime of a competitor_prices cache row, in hours.
TTL_HOURS = 6
# Request timeout so a slow external search API cannot stall the schedule.
REQUEST_TIMEOUT = float(os.getenv("PCHOME_FEEDER_TIMEOUT", "12"))
# Extra PChome sort orders used by the variant-recall search plan.
VARIANT_RECALL_SORTS = ("sale/dc", "new/dc")
# Lowest score still treated as "recoverable": 0.03 under the match bar, never below 0.72.
RECOVERABLE_LOW_SCORE_FLOOR = max(MIN_MATCH_SCORE - 0.03, 0.72)

# ── browse.sh diagnostics ─────────────────────────────────────────
BROWSE_SH_DIAGNOSTIC_ENABLED = _env_flag("PCHOME_FEEDER_BROWSE_SH_DIAGNOSTIC_ENABLED", "true")
BROWSE_SH_EXECUTE_ENABLED = _env_flag("PCHOME_FEEDER_BROWSE_SH_EXECUTE_ENABLED", "false")
BROWSE_SH_TIMEOUT_SECONDS = int(os.getenv("PCHOME_FEEDER_BROWSE_SH_TIMEOUT", "20"))
BROWSE_SH_MAX_EXECUTIONS_PER_RUN = int(os.getenv("PCHOME_FEEDER_BROWSE_SH_MAX_PER_RUN", "3"))
BROWSE_SH_OUTPUT_PREVIEW_CHARS = int(os.getenv("PCHOME_FEEDER_BROWSE_SH_OUTPUT_PREVIEW_CHARS", "1200"))

# Matcher reason codes that mark a below-threshold candidate as worth recovering.
RECOVERABLE_DIAGNOSTIC_REASONS = {
    "strong_product_line_match",
    "strong_exact_spec_match",
    "shared_identity_anchor",
    "shared_identity_anchor_no_spec",
    "shared_identity_anchor_packaging_variant",
    "shared_identity_anchor_marketing_variant",
    "shared_identity_anchor_core_line",
    "shared_identity_anchor_variant_safe",
    "shared_model_token",
    "spec_name_alignment",
}
# ── Feeder result ───────────────────────────────────────
@dataclass
class FeederResult:
    """Summary counters for one feeder run.

    NOTE(review): presumably the value produced by ``CompetitorPriceFeeder.run``;
    the field meanings below are inferred from their names — confirm against run().
    """
    total_skus: int  # presumably: SKUs considered in the run
    matched: int  # presumably: SKUs written with an accepted PChome match
    skipped_no_result: int  # presumably: searches that produced no usable candidate
    skipped_low_score: int  # presumably: best candidate scored below MIN_MATCH_SCORE
    errors: int  # presumably: SKUs whose processing raised
    duration_sec: float  # presumably: wall-clock duration of the run, in seconds
    history_written: int = 0  # presumably: rows appended to competitor_price_history
    attempts_written: int = 0  # presumably: rows appended to competitor_match_attempts
def _has_recoverable_identity_signal(diagnostics) -> bool:
    """Tell whether a below-threshold match still shows same-product evidence.

    Evidence means either:
      * at least one reason code from RECOVERABLE_DIAGNOSTIC_REASONS, or
      * brand >= 0.95, token >= 0.56 and sequence >= 0.50 scores while the
        matcher stayed in ``exact_identity`` comparison mode.

    Missing diagnostics never count as evidence.
    """
    if not diagnostics:
        return False

    reason_codes = set(getattr(diagnostics, "reasons", ()) or ())
    if not reason_codes.isdisjoint(RECOVERABLE_DIAGNOSTIC_REASONS):
        return True

    score_floors = (
        ("brand_score", 0.95),
        ("token_score", 0.56),
        ("sequence_score", 0.50),
    )
    strong_scores = all(
        getattr(diagnostics, attribute, 0) >= floor
        for attribute, floor in score_floors
    )
    return strong_scores and getattr(diagnostics, "comparison_mode", "exact_identity") == "exact_identity"
def _classify_low_score_attempt(score: float, diagnostics) -> str:
    """Bucket a below-threshold match attempt into a status label.

    Returns:
        "identity_veto" when the matcher hard-vetoed the pair;
        "recoverable_low_score" when the score reaches RECOVERABLE_LOW_SCORE_FLOOR
        and the diagnostics still carry an identity signal;
        "true_low_confidence" otherwise.
    """
    if getattr(diagnostics, "hard_veto", False):
        return "identity_veto"
    recoverable = (
        score >= RECOVERABLE_LOW_SCORE_FLOOR
        and _has_recoverable_identity_signal(diagnostics)
    )
    return "recoverable_low_score" if recoverable else "true_low_confidence"
def _has_variant_selection_gap(
    momo_name: str,
    ranked_matches: list[tuple],
    best_score: float,
) -> bool:
    """Detect when top candidates need a variant choice the MOMO name lacks.

    Returns True when the MOMO name names no explicit variant option (ordinal
    numbers in a "任選N款" bundle do not count) while the strongest non-vetoed
    candidates (top 5, within 0.02 of ``best_score`` and at least
    RECOVERABLE_LOW_SCORE_FLOOR) expose two or more distinct variant options.
    Returns False if the marketplace matcher cannot be imported.
    """
    try:
        from services.marketplace_product_matcher import (
            _explicit_variant_option_tokens,
            parse_product_identity,
        )
    except Exception:
        # Without the matcher there is no way to judge variants: report no gap.
        return False

    wanted_options = set(_explicit_variant_option_tokens(parse_product_identity(momo_name)))
    if re.search(r"任選\s*[一二兩三四五六七八九十0-9]+\s*款", momo_name):
        # "Pick any N styles" bundles: the N itself is not a variant choice.
        ordinal_tokens = {str(n) for n in range(1, 11)} | {f"{n:02d}" for n in range(1, 11)}
        wanted_options -= ordinal_tokens
    if wanted_options:
        return False

    cutoff = max(best_score - 0.02, RECOVERABLE_LOW_SCORE_FLOOR)
    seen_options: set[str] = set()
    for candidate, candidate_score, candidate_diagnostics in ranked_matches[:5]:
        if getattr(candidate_diagnostics, "hard_veto", False) or candidate_score < cutoff:
            continue
        candidate_options = _explicit_variant_option_tokens(
            parse_product_identity(getattr(candidate, "name", "") or "")
        )
        if len(candidate_options) >= 2:
            return True
        seen_options.update(candidate_options)
        if len(seen_options) >= 2:
            return True
    return False
def _extract_tags(pchome_product) -> list:
    """Derive semantic tags from a PChomeProduct-like object.

    Tags, emitted in this order:
      - "on_sale"        — ``is_on_sale`` is truthy
      - "discount_10pct" — discount 10–19
      - "discount_20pct" — discount 20–29
      - "discount_30pct" — discount >= 30
      - "low_stock"      — 0 < stock < 10
      - "high_rating"    — rating >= 4.5

    ``discount`` and ``stock`` may be ints, floats or numeric strings
    (e.g. "25.5"); they are truncated to int before comparison. Missing,
    non-numeric or non-finite values (NaN/inf) count as absent instead of
    raising, and missing attributes are tolerated like elsewhere in this module.
    """
    import math

    def _as_int(value):
        """Truncate ``value`` to int, or return None if it is not a finite number."""
        if value is None:
            return None
        try:
            number = float(value)
        except (TypeError, ValueError):
            return None
        # int(inf) would raise OverflowError; treat non-finite as "unknown".
        return int(number) if math.isfinite(number) else None

    tags = []
    if getattr(pchome_product, "is_on_sale", False):
        tags.append("on_sale")

    discount = _as_int(getattr(pchome_product, "discount", None)) or 0
    if discount >= 30:
        tags.append("discount_30pct")
    elif discount >= 20:
        tags.append("discount_20pct")
    elif discount >= 10:
        tags.append("discount_10pct")

    stock = _as_int(getattr(pchome_product, "stock", None))
    # Zero stock is "sold out", not "low stock".
    if stock is not None and 0 < stock < 10:
        tags.append("low_stock")

    rating = getattr(pchome_product, "rating", None)
    try:
        if rating and float(rating) >= 4.5:
            tags.append("high_rating")
    except (ValueError, TypeError):
        pass
    return tags
def _extend_match_tags(tags: list, diagnostics, extra: list[str] = None) -> list:
    """Merge base tags with matcher evidence so every feeder lane tags alike.

    Order: the caller's ``tags``, then ``diagnostics.tags``, then one
    ``match_<reason>`` tag per diagnostic reason, then ``extra``. Empty values
    are dropped and a duplicate keeps its first position.
    """
    combined = [
        *(tags or []),
        *(getattr(diagnostics, "tags", []) or []),
        *(f"match_{reason}" for reason in getattr(diagnostics, "reasons", ()) or ()),
        *(extra or []),
    ]
    return list(dict.fromkeys(tag for tag in combined if tag))
def _clean_search_text(value: str) -> str:
    """Normalise free text into a single-spaced search string.

    Brackets are blanked out, every run of characters that is neither a word
    character nor a CJK ideograph becomes one space, whitespace is collapsed
    and the ends are trimmed. ``None`` or ``""`` yields ``""``.
    """
    text = value or ''
    for pattern in (r'[()]', r'[【】\[\]]', r'[^\w\u4e00-\u9fff]+'):
        text = re.sub(pattern, ' ', text)
    collapsed = re.sub(r'\s+', ' ', text)
    return collapsed.strip()
def _dedupe_terms(terms: list) -> list:
    """Clean, truncate and de-duplicate search terms, keeping first-seen order.

    Each term is passed through ``_clean_search_text`` and cut to 36
    characters. The *cut* form is what gets compared (case-insensitively) and
    returned, so two long terms that differ only past the cut yield a single
    entry, and a cut that lands on a space leaves no trailing whitespace.
    Terms shorter than 2 characters are dropped, and at most
    ``MAX_SEARCH_TERMS`` terms are returned. ``None`` is treated as empty.
    """
    result = []
    seen = set()
    for term in terms or ():
        # Truncate *before* de-duplicating: comparing full texts and truncating
        # afterwards could emit the same 36-char term twice.
        cleaned = _clean_search_text(term)[:36].rstrip()
        if len(cleaned) < 2:
            continue
        key = cleaned.lower()
        if key in seen:
            continue
        seen.add(key)
        result.append(cleaned)
        if len(result) >= MAX_SEARCH_TERMS:
            break
    return result
def _build_search_keywords(momo_name: str) -> list:
    """Build the PChome search terms for one MOMO product name.

    Several identity-based terms from the marketplace matcher widen recall;
    the matcher's score threshold still decides which hits are accepted. If
    the matcher is unavailable or fails, the cleaned name cut to 36 and 24
    characters is used instead. The cleaned original name is then offered as
    one extra term when it is not already present.
    """
    try:
        from services.marketplace_product_matcher import build_search_terms

        raw_terms = build_search_terms(momo_name, max_terms=MAX_SEARCH_TERMS)
        raw_terms.append(momo_name)
    except Exception:
        logger.debug(
            "[Feeder] marketplace matcher failed while building search keywords; "
            "fallback to cleaned product name",
            exc_info=True,
        )
        cleaned_name = _clean_search_text(momo_name)
        raw_terms = [cleaned_name[:36], cleaned_name[:24]]

    # Take at most MAX_SEARCH_TERMS - 1 terms, leaving room for the original name below.
    keywords = _dedupe_terms(raw_terms[: max(1, MAX_SEARCH_TERMS - 1)])
    known = {keyword.lower() for keyword in keywords}
    original_name_term = next(
        (term for term in _dedupe_terms([momo_name]) if term.lower() not in known),
        None,
    )
    if original_name_term is not None:
        keywords.append(original_name_term)
    return _dedupe_terms(keywords)
def _build_variant_recall_search_plan(momo_name: str, keywords: list[str]) -> list[tuple[str, str | None]]:
plan = [(keyword, None) for keyword in (keywords or [])]
try:
from services.marketplace_product_matcher import parse_product_identity
identity = parse_product_identity(momo_name)
except Exception:
return plan
brand_tokens = {token.lower() for token in getattr(identity, "brand_tokens", set())}
if not ({"dashing", "diva"} <= brand_tokens and "美甲片" in getattr(identity, "searchable_name", "")):
return plan
searchable_name = getattr(identity, "searchable_name", "")
broad_keywords = []
if "足部時尚潮流美甲片" in searchable_name:
broad_keywords.append("dashing diva 足部時尚潮流美甲片")
elif "頂級璀燦美甲片" in searchable_name:
broad_keywords.append("dashing diva 頂級璀燦美甲片")
elif "時尚潮流美甲片" in searchable_name:
broad_keywords.append("dashing diva 時尚潮流美甲片")
elif "薄型經典美甲片" in searchable_name:
broad_keywords.append("dashing diva 薄型經典美甲片")
broad_keywords.extend((
"dashing diva magicpress",
"dashing diva 美甲片",
))
seen = {(keyword.lower(), sort) for keyword, sort in plan}
def append(keyword: str, sort: str | None = None) -> None:
key = (keyword.lower(), sort)
if key in seen:
return
seen.add(key)
plan.append((keyword, sort))
for broad_keyword in broad_keywords:
append(broad_keyword, None)
if broad_keywords:
for sort in VARIANT_RECALL_SORTS:
append(broad_keywords[0], sort)
return plan
def _format_match_diagnostics(diagnostics) -> str:
    """Render matcher diagnostics as a one-line ``key=value; ...`` log string.

    The eight core score fields are read as required attributes. Mode,
    match type, price basis and alert tier fall back to defaults, and reason
    codes are comma-joined. Returns "" when ``diagnostics`` is falsy.
    """
    if not diagnostics:
        return ""
    reason_list = ",".join(getattr(diagnostics, "reasons", ()) or ())
    d = diagnostics
    parts = [
        ("score", d.score),
        ("brand", d.brand_score),
        ("token", d.token_score),
        ("spec", d.spec_score),
        ("seq", d.sequence_score),
        ("type", d.type_score),
        ("penalty", d.price_penalty),
        ("veto", d.hard_veto),
        ("mode", getattr(d, "comparison_mode", "exact_identity")),
        ("match_type", getattr(d, "match_type", "")),
        ("price_basis", getattr(d, "price_basis", "")),
        ("alert_tier", getattr(d, "alert_tier", "")),
        ("reasons", reason_list),
    ]
    return "; ".join(f"{label}={value}" for label, value in parts)
def _match_diagnostics_payload(diagnostics) -> dict:
    """Serialise matcher diagnostics into a JSON-friendly dict for review/report consumers.

    Missing attributes become None, except ``hard_veto`` (coerced to bool,
    default False), ``comparison_mode`` (default "exact_identity") and the
    ``evidence_flags``/``reasons`` sequences (converted to lists, default []).
    Key order is fixed. Returns {} when ``diagnostics`` is falsy.
    """
    if not diagnostics:
        return {}
    score_fields = (
        "score", "brand_score", "token_score", "spec_score",
        "sequence_score", "type_score", "price_penalty",
    )
    payload = {field: getattr(diagnostics, field, None) for field in score_fields}
    payload["hard_veto"] = bool(getattr(diagnostics, "hard_veto", False))
    payload["comparison_mode"] = getattr(diagnostics, "comparison_mode", "exact_identity")
    for field in ("match_type", "price_basis", "alert_tier"):
        payload[field] = getattr(diagnostics, field, None)
    for field in ("evidence_flags", "reasons"):
        payload[field] = list(getattr(diagnostics, field, ()) or ())
    return payload
def _pchome_search_url(keyword: str) -> str:
    """Return the PChome search-page URL for ``keyword`` (None gives an empty query)."""
    encoded_query = quote_plus(keyword or '')
    return "https://ecshweb.pchome.com.tw/search/v3.3/?q=" + encoded_query
def _build_browse_sh_diagnostic_payload(
    momo_name: str,
    search_terms: list[str] = None,
    reason: str = "unknown",
    best_product=None,
    best_score: float = None,
    diagnostics=None,
    candidate_count: int = 0,
) -> dict:
    """Build a read-only browse.sh probe plan for a low-confidence PChome case.

    The plan lists up to 3 de-duplicated search terms (``search_terms`` or
    freshly built keywords), their PChome search URLs plus the best
    candidate's product URL, two suggested browse.sh commands against the
    first URL, a snapshot of the best candidate, and the matcher's reason
    codes. ``execution`` starts as ``{"status": "disabled"}``; nothing is run
    here. Returns {} when BROWSE_SH_DIAGNOSTIC_ENABLED is off.
    """
    if not BROWSE_SH_DIAGNOSTIC_ENABLED:
        return {}

    probe_terms = _dedupe_terms(search_terms or _build_search_keywords(momo_name))[:3]
    candidate_urls = [_pchome_search_url(term) for term in probe_terms]
    product_url = getattr(best_product, "product_url", None)
    if product_url:
        candidate_urls.append(product_url)
    urls = list(dict.fromkeys(filter(None, candidate_urls)))
    # With neither terms nor a product URL, still probe a search for the raw name.
    primary_url = urls[0] if urls else _pchome_search_url(momo_name)
    matcher_evidence = _match_diagnostics_payload(diagnostics)

    best_candidate = None
    if best_product:
        best_candidate = {
            "product_id": getattr(best_product, "product_id", None),
            "name": (getattr(best_product, "name", None) or "")[:300] or None,
            "price": getattr(best_product, "price", None),
            "url": product_url,
            "score": best_score,
        }

    suggested_commands = [
        {"purpose": "static_fetch_first_page", "args": ["get", primary_url]},
        {"purpose": "manual_browser_probe", "args": ["open", primary_url]},
    ]
    return {
        "tool": "browse.sh",
        "mode": "execute_on_demand" if BROWSE_SH_EXECUTE_ENABLED else "plan_only",
        "reason": reason,
        "execute_enabled": BROWSE_SH_EXECUTE_ENABLED,
        "timeout_seconds": BROWSE_SH_TIMEOUT_SECONDS,
        "candidate_count": int(candidate_count or 0),
        "momo_name": (momo_name or "")[:300],
        "search_terms": probe_terms,
        "urls": urls,
        "suggested_commands": suggested_commands,
        "best_candidate": best_candidate,
        "diagnostic_codes": matcher_evidence.get("reasons") or [],
        "comparison_mode": matcher_evidence.get("comparison_mode"),
        "hard_veto": matcher_evidence.get("hard_veto"),
        "execution": {"status": "disabled"},
    }
def _product_snapshot_payload(product) -> dict:
    """Map a competitor product to the snapshot columns shared by the DB tables.

    Returns ``competitor_product_url``, ``competitor_image_url`` and
    ``competitor_stock`` (in that order), read from ``product_url``,
    ``image_url`` and ``stock``. Missing attributes, or a falsy product, give None.
    """
    column_sources = (
        ("competitor_product_url", "product_url"),
        ("competitor_image_url", "image_url"),
        ("competitor_stock", "stock"),
    )
    return {
        column: (getattr(product, attribute, None) if product else None)
        for column, attribute in column_sources
    }
def _product_id_key(product_id: str) -> str:
    """Return a canonical comparison key for a PChome product ID.

    The ID is upper-cased and everything except ASCII letters and digits is
    removed, so cached IDs and API-returned IDs compare equal despite case or
    separator differences. A falsy ID gives "".
    """
    upper_id = str(product_id or "").upper()
    return re.sub(r"[^A-Z0-9]", "", upper_id)
def _find_best_match_detail(
    momo_name: str,
    pchome_products: list,
    momo_price: float = None,
) -> Optional[tuple]:
    """Return the PChome candidate that best matches a MOMO product name.

    Args:
        momo_name: MOMO product name.
        pchome_products: PChomeProduct candidates to score.
        momo_price: Optional MOMO price forwarded to the matcher.

    Returns:
        ``(PChomeProduct, score, diagnostics)`` for the highest-scoring
        candidate, or None when there are no candidates.
    """
    for top_entry in _rank_match_details(momo_name, pchome_products, momo_price=momo_price):
        return top_entry
    return None
def _rank_match_details(
    momo_name: str,
    pchome_products: list,
    momo_price: float = None,
) -> list[tuple]:
    """Score every PChome candidate and sort them from strongest to weakest.

    Each candidate is scored on its own with ``score_marketplace_match``
    (MOMO name/price vs candidate name/price). Returns a list of
    ``(product, score, diagnostics)``; equal scores keep their input order.
    """
    from services.marketplace_product_matcher import score_marketplace_match

    def _score_candidate(candidate):
        evidence = score_marketplace_match(
            momo_name,
            candidate.name,
            momo_price=momo_price,
            competitor_price=getattr(candidate, "price", None),
        )
        return candidate, evidence.score, evidence

    scored = [_score_candidate(candidate) for candidate in pchome_products]
    scored.sort(key=lambda entry: entry[1], reverse=True)
    return scored
def _find_best_match(momo_name: str, pchome_products: list) -> Optional[tuple]:
    """Backward-compatible helper for smoke scripts.

    Returns ``(product, score)`` for the best candidate, or None.
    """
    detail = _find_best_match_detail(momo_name, pchome_products)
    if not detail:
        return None
    product, score, _unused_diagnostics = detail
    return product, score
def _search_pchome_candidates(crawler, momo_name: str, keywords: list = None, momo_price: float = None) -> list:
    """Widen the PChome candidate pool over several search steps.

    Runs each ``(keyword, sort)`` step of the variant-recall search plan
    (``keywords`` defaults to ``_build_search_keywords(momo_name)``). Failed or
    empty searches are skipped, and only the first occurrence of each
    ``product_id`` is kept. The search stops early once any candidate scores
    at least EARLY_STOP_MATCH_SCORE.

    Returns:
        The de-duplicated candidates, in discovery order.
    """
    candidates = []
    seen_ids = set()
    best_score = None
    search_limit = SEARCH_LIMIT * max(1, SEARCH_MAX_PAGES)
    active_keywords = keywords or _build_search_keywords(momo_name)
    search_plan = _build_variant_recall_search_plan(momo_name, active_keywords)
    for keyword, sort in search_plan:
        search_kwargs = {"limit": search_limit, "max_pages": SEARCH_MAX_PAGES}
        if sort:
            search_kwargs["sort"] = sort
        ok, _, products = crawler.search_products(keyword, **search_kwargs)
        if not ok or not products:
            continue

        new_products = []
        for product in products:
            if product.product_id in seen_ids:
                continue
            seen_ids.add(product.product_id)
            new_products.append(product)
        if not new_products:
            continue
        candidates.extend(new_products)

        # Candidates are scored independently, so only score the newcomers
        # and keep a running best instead of re-ranking the whole pool per step.
        batch_best = _find_best_match_detail(momo_name, new_products, momo_price=momo_price)
        if batch_best and (best_score is None or batch_best[1] > best_score):
            best_score = batch_best[1]
        if best_score is not None and best_score >= EARLY_STOP_MATCH_SCORE:
            break
    return candidates
def _recover_low_score_with_fresh_search(
    crawler,
    momo_name: str,
    momo_price: float = None,
    existing_product_id: str = "",
) -> tuple[Optional[tuple], list[str], int]:
    """Re-run a fresh keyword search when a legacy/known-ID candidate still scores low.

    The aim is to replace an old mismatch with a genuine same-product match.
    When ``existing_product_id`` is given, that product is excluded, compared
    via ``_product_id_key``, but only if other candidates remain.

    Returns:
        ``(best_detail_or_None, search_keywords, scored_candidate_count)``.
    """
    keywords = _build_search_keywords(momo_name)
    pool = _search_pchome_candidates(
        crawler,
        momo_name,
        keywords=keywords,
        momo_price=momo_price,
    )
    if existing_product_id:
        stale_key = _product_id_key(existing_product_id)
        alternatives = [
            candidate
            for candidate in pool
            if _product_id_key(getattr(candidate, "product_id", "")) != stale_key
        ]
        # Fall back to the full pool rather than scoring nothing.
        pool = alternatives or pool
    best = _find_best_match_detail(momo_name, pool, momo_price=momo_price)
    return best, keywords, len(pool)
def _structural_similarity(momo_p, pchome_p) -> float:
    """Score how structurally alike two parsed products are, rounded to 3 decimals.

    Components, summed in this order:
      * brand (max 0.4): identical 0.4; one contains the other 0.2; both
        missing 0.1 (not penalised); otherwise 0.
      * specs (max 0.3): 0.3 * (MOMO spec items whose value is equal on the
        PChome side) / (larger of the two spec counts); both empty 0.15;
        only one side has specs 0.
      * keywords (max 0.3): 0.3 * difflib.SequenceMatcher ratio of the
        space-joined, lower-cased keyword lists; 0 if either side is empty.

    Both arguments need ``brand``, ``specs`` (dict or None) and ``keywords``
    (iterable of str or None) attributes.
    """
    from difflib import SequenceMatcher

    brand_a, brand_b = momo_p.brand, pchome_p.brand
    if brand_a and brand_b:
        if brand_a == brand_b:
            brand_part = 0.4
        elif brand_a in brand_b or brand_b in brand_a:
            brand_part = 0.2
        else:
            brand_part = 0.0
    elif not brand_a and not brand_b:
        brand_part = 0.1
    else:
        brand_part = 0.0

    specs_a = momo_p.specs or {}
    specs_b = pchome_p.specs or {}
    if specs_a and specs_b:
        shared = sum(1 for key, value in specs_a.items() if specs_b.get(key) == value)
        spec_part = 0.3 * (shared / max(len(specs_a), len(specs_b), 1))
    elif not specs_a and not specs_b:
        spec_part = 0.15
    else:
        spec_part = 0.0

    words_a = " ".join(momo_p.keywords or [])
    words_b = " ".join(pchome_p.keywords or [])
    keyword_part = 0.0
    if words_a and words_b:
        keyword_part = 0.3 * SequenceMatcher(None, words_a.lower(), words_b.lower()).ratio()

    return round(0.0 + brand_part + spec_part + keyword_part, 3)
class CompetitorPriceFeeder:
"""
競品價格補給線 Worker
用法:
feeder = CompetitorPriceFeeder(engine=db_engine)
result = feeder.run(source="pchome")
"""
def __init__(self, engine=None):
self.engine = engine
self._history_table_ready = False
self._attempt_table_ready = False
self._price_table_columns_ready = False
self._browse_sh_executions = 0
def _ensure_table_columns(self, conn, table: str, column_specs: list[tuple[str, str]]) -> None:
    """Add any columns from ``column_specs`` missing on an existing table.

    Lets older production tables accept newer INSERT column lists. Does
    nothing if the table does not exist. ``table``, column names and types are
    interpolated into DDL verbatim, so they must be trusted constants.
    """
    from sqlalchemy import inspect, text

    inspector = inspect(conn)
    if not inspector.has_table(table):
        return
    present = {column["name"] for column in inspector.get_columns(table)}
    for column_name, column_type in column_specs:
        if column_name not in present:
            conn.execute(text(f"ALTER TABLE {table} ADD COLUMN {column_name} {column_type}"))
            present.add(column_name)
def _ensure_competitor_prices_columns(self, conn) -> None:
if self._price_table_columns_ready:
return
self._ensure_table_columns(conn, "competitor_prices", [
("competitor_product_url", "TEXT"),
("competitor_image_url", "TEXT"),
("competitor_stock", "INTEGER"),
("match_diagnostic_json", "JSONB" if conn.dialect.name == "postgresql" else "TEXT"),
("comparison_mode", "VARCHAR(40)"),
("hard_veto", "BOOLEAN"),
("diagnostic_codes", "JSONB" if conn.dialect.name == "postgresql" else "TEXT"),
])
self._price_table_columns_ready = True
def _ensure_competitor_price_history_table(self, conn):
"""確保競品價格歷史表存在;排程可自癒補表,不依賴手動 migration。"""
if self._history_table_ready:
return
from sqlalchemy import text
if conn.dialect.name == "postgresql":
conn.execute(text("""
CREATE TABLE IF NOT EXISTS competitor_price_history (
id BIGSERIAL PRIMARY KEY,
sku VARCHAR(50) NOT NULL,
source VARCHAR(30) NOT NULL DEFAULT 'pchome',
momo_product_id INTEGER,
momo_price NUMERIC(10,2),
price NUMERIC(10,2) NOT NULL,
original_price NUMERIC(10,2),
discount_pct INTEGER,
competitor_product_id VARCHAR(100),
competitor_product_name TEXT,
competitor_product_url TEXT,
competitor_image_url TEXT,
competitor_stock INTEGER,
match_score NUMERIC(4,3),
tags JSONB DEFAULT '[]'::jsonb,
match_diagnostic_json JSONB,
comparison_mode VARCHAR(40),
hard_veto BOOLEAN,
diagnostic_codes JSONB,
crawled_at TIMESTAMP NOT NULL DEFAULT NOW()
)
"""))
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_comp_price_history_sku_source_time
ON competitor_price_history (sku, source, crawled_at DESC)
"""))
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_comp_price_history_competitor_id
ON competitor_price_history (competitor_product_id)
"""))
else:
conn.execute(text("""
CREATE TABLE IF NOT EXISTS competitor_price_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sku VARCHAR(50) NOT NULL,
source VARCHAR(30) NOT NULL DEFAULT 'pchome',
momo_product_id INTEGER,
momo_price NUMERIC(10,2),
price NUMERIC(10,2) NOT NULL,
original_price NUMERIC(10,2),
discount_pct INTEGER,
competitor_product_id VARCHAR(100),
competitor_product_name TEXT,
competitor_product_url TEXT,
competitor_image_url TEXT,
competitor_stock INTEGER,
match_score NUMERIC(4,3),
tags TEXT DEFAULT '[]',
match_diagnostic_json TEXT,
comparison_mode VARCHAR(40),
hard_veto BOOLEAN,
diagnostic_codes TEXT,
crawled_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
)
"""))
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_comp_price_history_sku_source_time
ON competitor_price_history (sku, source, crawled_at DESC)
"""))
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_comp_price_history_competitor_id
ON competitor_price_history (competitor_product_id)
"""))
self._ensure_table_columns(conn, "competitor_price_history", [
("competitor_product_url", "TEXT"),
("competitor_image_url", "TEXT"),
("competitor_stock", "INTEGER"),
("match_diagnostic_json", "JSONB" if conn.dialect.name == "postgresql" else "TEXT"),
("comparison_mode", "VARCHAR(40)"),
("hard_veto", "BOOLEAN"),
("diagnostic_codes", "JSONB" if conn.dialect.name == "postgresql" else "TEXT"),
])
self._history_table_ready = True
def _ensure_competitor_match_attempts_table(self, conn):
"""確保 PChome 比對嘗試表存在;成功、低分、無結果與錯誤都要留痕。"""
if self._attempt_table_ready:
return
from sqlalchemy import text
if conn.dialect.name == "postgresql":
conn.execute(text("""
CREATE TABLE IF NOT EXISTS competitor_match_attempts (
id BIGSERIAL PRIMARY KEY,
sku VARCHAR(50) NOT NULL,
source VARCHAR(30) NOT NULL DEFAULT 'pchome',
momo_product_id INTEGER,
momo_product_name TEXT,
momo_price NUMERIC(10,2),
search_terms JSONB DEFAULT '[]'::jsonb,
candidate_count INTEGER DEFAULT 0,
attempt_status VARCHAR(30) NOT NULL,
best_competitor_product_id VARCHAR(100),
best_competitor_product_name TEXT,
competitor_product_url TEXT,
competitor_image_url TEXT,
competitor_stock INTEGER,
best_competitor_price NUMERIC(10,2),
best_match_score NUMERIC(4,3),
match_diagnostic_json JSONB,
comparison_mode VARCHAR(40),
hard_veto BOOLEAN,
diagnostic_codes JSONB,
browse_diagnostic_json JSONB,
error_message TEXT,
attempted_at TIMESTAMP NOT NULL DEFAULT NOW()
)
"""))
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_comp_match_attempts_sku_source_time
ON competitor_match_attempts (sku, source, attempted_at DESC)
"""))
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_comp_match_attempts_status_time
ON competitor_match_attempts (attempt_status, attempted_at DESC)
"""))
else:
conn.execute(text("""
CREATE TABLE IF NOT EXISTS competitor_match_attempts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sku VARCHAR(50) NOT NULL,
source VARCHAR(30) NOT NULL DEFAULT 'pchome',
momo_product_id INTEGER,
momo_product_name TEXT,
momo_price NUMERIC(10,2),
search_terms TEXT DEFAULT '[]',
candidate_count INTEGER DEFAULT 0,
attempt_status VARCHAR(30) NOT NULL,
best_competitor_product_id VARCHAR(100),
best_competitor_product_name TEXT,
competitor_product_url TEXT,
competitor_image_url TEXT,
competitor_stock INTEGER,
best_competitor_price NUMERIC(10,2),
best_match_score NUMERIC(4,3),
match_diagnostic_json TEXT,
comparison_mode VARCHAR(40),
hard_veto BOOLEAN,
diagnostic_codes TEXT,
browse_diagnostic_json TEXT,
error_message TEXT,
attempted_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
)
"""))
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_comp_match_attempts_sku_source_time
ON competitor_match_attempts (sku, source, attempted_at DESC)
"""))
conn.execute(text("""
CREATE INDEX IF NOT EXISTS idx_comp_match_attempts_status_time
ON competitor_match_attempts (attempt_status, attempted_at DESC)
"""))
self._ensure_table_columns(conn, "competitor_match_attempts", [
("competitor_product_url", "TEXT"),
("competitor_image_url", "TEXT"),
("competitor_stock", "INTEGER"),
("match_diagnostic_json", "JSONB" if conn.dialect.name == "postgresql" else "TEXT"),
("comparison_mode", "VARCHAR(40)"),
("hard_veto", "BOOLEAN"),
("diagnostic_codes", "JSONB" if conn.dialect.name == "postgresql" else "TEXT"),
("browse_diagnostic_json", "JSONB" if conn.dialect.name == "postgresql" else "TEXT"),
])
self._attempt_table_ready = True
def _prepare_browse_diagnostic(
    self,
    momo_name: str,
    search_terms: list = None,
    reason: str = "unknown",
    best_product=None,
    best_score: float = None,
    diagnostics=None,
    candidate_count: int = 0,
) -> dict:
    """Build browse.sh diagnostic evidence and, when allowed, run its first command.

    The payload comes from ``_build_browse_sh_diagnostic_payload``. It is
    returned untouched when empty or when BROWSE_SH_EXECUTE_ENABLED is off.
    Otherwise the first suggested command runs through ``BrowseShTool``, at
    most BROWSE_SH_MAX_EXECUTIONS_PER_RUN times per feeder instance, and
    ``payload["execution"]`` records the outcome: ok / failed / rate_limited /
    missing_command / error. Execution errors are recorded, not raised.
    """
    payload = _build_browse_sh_diagnostic_payload(
        momo_name,
        search_terms=search_terms,
        reason=reason,
        best_product=best_product,
        best_score=best_score,
        diagnostics=diagnostics,
        candidate_count=candidate_count,
    )
    should_execute = bool(payload) and BROWSE_SH_EXECUTE_ENABLED
    if not should_execute:
        return payload

    if self._browse_sh_executions >= BROWSE_SH_MAX_EXECUTIONS_PER_RUN:
        payload["execution"] = {"status": "rate_limited"}
        return payload

    commands = payload.get("suggested_commands") or [{}]
    command_args = tuple(commands[0].get("args") or ())
    if not command_args:
        payload["execution"] = {"status": "missing_command"}
        return payload

    try:
        from services.browse_sh_tool import BrowseShTool

        # Counted before running, so failed or erroring runs also use up the budget.
        self._browse_sh_executions += 1
        tool = BrowseShTool(timeout_seconds=BROWSE_SH_TIMEOUT_SECONDS)
        outcome = tool.run(command_args, timeout_seconds=BROWSE_SH_TIMEOUT_SECONDS)
        execution = {
            "status": "ok" if outcome.ok else "failed",
            "returncode": outcome.returncode,
            "timed_out": outcome.timed_out,
            "unavailable_reason": outcome.unavailable_reason,
            "stdout_preview": (outcome.stdout or "")[:BROWSE_SH_OUTPUT_PREVIEW_CHARS],
            "stderr_preview": (outcome.stderr or "")[:BROWSE_SH_OUTPUT_PREVIEW_CHARS],
        }
    except Exception as exc:
        execution = {"status": "error", "error": str(exc)[:500]}
    payload["execution"] = execution
    return payload
def _record_match_attempt(
    self,
    sku: str,
    momo_name: str,
    momo_product_id: int = None,
    momo_price: float = None,
    search_terms: list = None,
    candidate_count: int = 0,
    attempt_status: str = "unknown",
    best_product=None,
    best_score: float = None,
    diagnostics=None,
    browse_diagnostic: dict = None,
    error_message: str = None,
    source: str = "pchome",
) -> None:
    """Append one row to competitor_match_attempts.

    Keeps pending and low-confidence SKUs traceable. JSON values are bound as
    JSON text and cast to ``jsonb`` on PostgreSQL; other dialects store the
    text as-is. The best candidate's name is cut to 300 characters and the
    error message to 1000. Runs in its own ``engine.begin()`` transaction.
    """
    from sqlalchemy import text

    with self.engine.begin() as conn:
        self._ensure_competitor_match_attempts_table(conn)
        is_postgres = conn.dialect.name == "postgresql"
        placeholder = {
            name: (f"CAST(:{name} AS jsonb)" if is_postgres else f":{name}")
            for name in (
                "search_terms",
                "match_diagnostic_json",
                "diagnostic_codes",
                "browse_diagnostic_json",
            )
        }
        matcher_evidence = _match_diagnostics_payload(diagnostics)
        reason_codes = matcher_evidence.get("reasons") or []
        snapshot = _product_snapshot_payload(best_product)
        browse_json = json.dumps(browse_diagnostic, ensure_ascii=False) if browse_diagnostic else None
        statement = text(f"""
INSERT INTO competitor_match_attempts
(sku, source, momo_product_id, momo_product_name, momo_price,
search_terms, candidate_count, attempt_status,
best_competitor_product_id, best_competitor_product_name,
competitor_product_url, competitor_image_url, competitor_stock,
best_competitor_price, best_match_score,
match_diagnostic_json, comparison_mode, hard_veto, diagnostic_codes,
browse_diagnostic_json,
error_message,
attempted_at)
VALUES
(:sku, :source, :momo_product_id, :momo_product_name, :momo_price,
{placeholder['search_terms']}, :candidate_count, :attempt_status,
:best_id, :best_name,
:competitor_product_url, :competitor_image_url, :competitor_stock,
:best_price, :best_score,
{placeholder['match_diagnostic_json']}, :comparison_mode, :hard_veto, {placeholder['diagnostic_codes']},
{placeholder['browse_diagnostic_json']},
:error_message,
CURRENT_TIMESTAMP)
""")
        params = {
            "sku": sku,
            "source": source,
            "momo_product_id": momo_product_id,
            "momo_product_name": momo_name,
            "momo_price": momo_price,
            "search_terms": json.dumps(search_terms or [], ensure_ascii=False),
            "candidate_count": candidate_count,
            "attempt_status": attempt_status,
            "best_id": getattr(best_product, "product_id", None),
            "best_name": (getattr(best_product, "name", None) or "")[:300] or None,
            **snapshot,
            "best_price": getattr(best_product, "price", None),
            "best_score": best_score,
            "match_diagnostic_json": (
                json.dumps(matcher_evidence, ensure_ascii=False) if matcher_evidence else None
            ),
            "comparison_mode": matcher_evidence.get("comparison_mode"),
            "hard_veto": matcher_evidence.get("hard_veto"),
            "diagnostic_codes": (
                json.dumps(reason_codes, ensure_ascii=False) if reason_codes else None
            ),
            "browse_diagnostic_json": browse_json,
            "error_message": (error_message or "")[:1000] or None,
        }
        conn.execute(statement, params)
def _fetch_active_skus(self) -> list:
"""
從 products 表取得待監控的 ACTIVE 商品清單
Returns:
list of {"sku": str, "name": str, "category": str}
"""
if self.engine is None:
raise RuntimeError("需要注入 SQLAlchemy engine")
from sqlalchemy import text
sql = text("""
SELECT
p.id AS product_id,
p.i_code AS sku,
p.name,
p.category,
(
SELECT pr.price
FROM price_records pr
WHERE pr.product_id = p.id
ORDER BY pr.timestamp DESC
LIMIT 1
) AS momo_price
FROM products p
WHERE p.status = 'ACTIVE'
AND EXISTS (
SELECT 1
FROM price_records pr
WHERE pr.product_id = p.id
)
ORDER BY p.i_code
""")
with self.engine.connect() as conn:
rows = conn.execute(sql).fetchall()
return [dict(r._mapping) for r in rows]
def _fetch_unmatched_priority_skus(self, limit: int = 80) -> list:
    """Return ACTIVE products that lack a valid PChome pairing, most expensive first.

    "Valid" means an unexpired competitor_prices row for source 'pchome' with
    match_score >= MIN_MATCH_SCORE and the 'identity_v2' tag. Rows contain
    ``product_id``, ``sku``, ``name``, ``category`` and the latest
    ``momo_price``. ``limit`` is clamped to 1..300. They are intended for the
    prioritised enrichment pass.

    NOTE: the SQL uses PostgreSQL-specific syntax (``::jsonb``, the jsonb
    ``?`` operator, ``NULLS LAST``).

    Raises:
        RuntimeError: if no SQLAlchemy engine was injected.
    """
    if self.engine is None:
        raise RuntimeError("需要注入 SQLAlchemy engine")
    from sqlalchemy import text

    query = text("""
WITH latest_momo AS (
SELECT
p.id AS product_id,
p.i_code AS sku,
p.name,
p.category,
pr.price AS momo_price,
ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC) AS rn
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.status = 'ACTIVE'
)
SELECT
lm.product_id,
lm.sku,
lm.name,
lm.category,
lm.momo_price
FROM latest_momo lm
LEFT JOIN competitor_prices cp
ON cp.sku = lm.sku
AND cp.source = 'pchome'
AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
AND COALESCE(cp.match_score, 0) >= :match_score_floor
AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
WHERE lm.rn = 1
AND cp.sku IS NULL
ORDER BY lm.momo_price DESC NULLS LAST, lm.sku
LIMIT :limit
""")
    with self.engine.connect() as conn:
        bind_params = {
            "limit": max(1, min(int(limit), 300)),
            "match_score_floor": MIN_MATCH_SCORE,
        }
        result_rows = conn.execute(query, bind_params).fetchall()
    return [dict(row._mapping) for row in result_rows]
def _fetch_retryable_candidate_skus(self, limit: int = 80, min_score: float = 0.70) -> list:
    """Return near-threshold, non-vetoed PChome candidates worth re-scoring.

    A row qualifies when all of the following hold:
      * the SKU has no valid pairing (same rule as the unmatched query);
      * its latest 'pchome' match attempt is low_score / refresh_low_score /
        recoverable_low_score;
      * that attempt has a non-empty best candidate ID and a best_match_score
        >= ``min_score``;
      * the attempt was not hard-vetoed.

    Rows add ``competitor_product_id``, ``competitor_product_name``,
    ``best_match_score`` and ``attempt_status`` to the product fields. They
    are ordered by best score, then MOMO price. ``limit`` is clamped to 1..300.

    Intended use (per the original design): re-query the stored PChome
    product IDs in batch instead of searching again, so same-product
    candidates that older scorers left just under the bar can be promoted.
    True low-confidence and hard-veto cases are excluded to avoid churn.

    NOTE: the SQL uses PostgreSQL-specific syntax (``DISTINCT ON``,
    ``::jsonb``, the jsonb ``?`` operator, ``NULLS LAST``).

    Raises:
        RuntimeError: if no SQLAlchemy engine was injected.
    """
    if self.engine is None:
        raise RuntimeError("需要注入 SQLAlchemy engine")
    from sqlalchemy import text

    query = text("""
WITH latest_momo AS (
SELECT
p.id AS product_id,
p.i_code AS sku,
p.name,
p.category,
pr.price AS momo_price,
ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC) AS rn
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.status = 'ACTIVE'
),
latest_attempt AS (
SELECT DISTINCT ON (cma.sku)
cma.sku,
cma.best_competitor_product_id,
cma.best_competitor_product_name,
cma.best_match_score,
cma.attempt_status,
cma.hard_veto,
cma.attempted_at
FROM competitor_match_attempts cma
WHERE cma.source = 'pchome'
ORDER BY cma.sku, cma.attempted_at DESC, cma.id DESC
)
SELECT
lm.product_id,
lm.sku,
lm.name,
lm.category,
lm.momo_price,
la.best_competitor_product_id AS competitor_product_id,
la.best_competitor_product_name AS competitor_product_name,
la.best_match_score,
la.attempt_status
FROM latest_momo lm
JOIN latest_attempt la
ON la.sku = lm.sku
LEFT JOIN competitor_prices cp
ON cp.sku = lm.sku
AND cp.source = 'pchome'
AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
AND COALESCE(cp.match_score, 0) >= :match_score_floor
AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
WHERE lm.rn = 1
AND cp.sku IS NULL
AND la.attempt_status IN ('low_score', 'refresh_low_score', 'recoverable_low_score')
AND la.best_competitor_product_id IS NOT NULL
AND la.best_competitor_product_id <> ''
AND COALESCE(la.best_match_score, 0) >= :min_score
AND COALESCE(la.hard_veto, false) = false
ORDER BY la.best_match_score DESC NULLS LAST, lm.momo_price DESC NULLS LAST, lm.sku
LIMIT :limit
""")
    with self.engine.connect() as conn:
        bind_params = {
            "limit": max(1, min(int(limit), 300)),
            "min_score": float(min_score),
            "match_score_floor": MIN_MATCH_SCORE,
        }
        result_rows = conn.execute(query, bind_params).fetchall()
    return [dict(row._mapping) for row in result_rows]
def _fetch_expired_identity_skus(self, limit: int = 120) -> list:
"""
取得 identity_v2 已確認、但 PChome 價格快取過期的商品。
這些商品不需重新 keyword search先用既有 PChome product_id 批次刷新價格。
"""
if self.engine is None:
raise RuntimeError("需要注入 SQLAlchemy engine")
from sqlalchemy import text
sql = text("""
WITH latest_momo AS (
SELECT
p.id AS product_id,
p.i_code AS sku,
p.name,
p.category,
pr.price AS momo_price,
ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC) AS rn
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.status = 'ACTIVE'
)
SELECT
lm.product_id,
lm.sku,
lm.name,
lm.category,
lm.momo_price,
cp.competitor_product_id,
cp.competitor_product_name,
cp.match_score,
cp.expires_at
FROM latest_momo lm
JOIN competitor_prices cp
ON cp.sku = lm.sku
AND cp.source = 'pchome'
AND cp.competitor_product_id IS NOT NULL
AND cp.competitor_product_id <> ''
AND cp.expires_at IS NOT NULL
AND cp.expires_at <= CURRENT_TIMESTAMP
AND COALESCE(cp.match_score, 0) >= :match_score_floor
AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
WHERE lm.rn = 1
ORDER BY cp.expires_at ASC, lm.momo_price DESC NULLS LAST, lm.sku
LIMIT :limit
""")
with self.engine.connect() as conn:
rows = conn.execute(
sql,
{"limit": max(1, min(int(limit), 500)), "match_score_floor": MIN_MATCH_SCORE},
).fetchall()
return [dict(r._mapping) for r in rows]
def _upsert_competitor_price(
    self,
    sku: str,
    product,  # PChomeProduct
    match_score: float,
    tags: list,
    momo_product_id: int = None,
    momo_price: float = None,
    diagnostics=None,
    source: str = "pchome",
):
    """
    Upsert the latest competitor price row and append one history snapshot.

    Both statements run inside a single ``engine.begin()`` block, so the
    competitor_prices row and its competitor_price_history snapshot are
    committed (or rolled back) together.

    Args:
        sku: MOMO SKU the competitor price belongs to.
        product: competitor product; price, original_price, discount,
            product_id and name are read from it.
        match_score: score to store with the match.
        tags: tag list, stored as a JSON string.
        momo_product_id: MOMO product id recorded on the history row only.
        momo_price: MOMO price recorded on the history row only.
        diagnostics: matcher diagnostics; converted via
            ``_match_diagnostics_payload``.
        source: competitor source code (part of the (sku, source) conflict key).
    """
    from sqlalchemy import text

    # expires_at is computed in UTC+8 and sent as a naive "%Y-%m-%d %H:%M:%S" string.
    taipei_tz = timezone(timedelta(hours=8))
    expires_at = (datetime.now(taipei_tz) + timedelta(hours=TTL_HOURS)).strftime("%Y-%m-%d %H:%M:%S")
    tags_json = json.dumps(tags, ensure_ascii=False)
    diagnostic_payload = _match_diagnostics_payload(diagnostics)
    diagnostic_codes = diagnostic_payload.get("reasons") or []
    product_payload = _product_snapshot_payload(product)

    cache_sql_template = """
INSERT INTO competitor_prices
(sku, source, price, original_price, discount_pct,
competitor_product_id, competitor_product_name,
competitor_product_url, competitor_image_url, competitor_stock,
match_score, tags, match_diagnostic_json,
comparison_mode, hard_veto, diagnostic_codes,
crawled_at, expires_at)
VALUES
(:sku, :source, :price, :original_price, :discount_pct,
:comp_id, :comp_name,
:competitor_product_url, :competitor_image_url, :competitor_stock,
:match_score, :tags, {json_expr},
:comparison_mode, :hard_veto, {codes_expr},
CURRENT_TIMESTAMP, :expires_at)
ON CONFLICT (sku, source) DO UPDATE
SET price = EXCLUDED.price,
original_price = EXCLUDED.original_price,
discount_pct = EXCLUDED.discount_pct,
competitor_product_id = EXCLUDED.competitor_product_id,
competitor_product_name = EXCLUDED.competitor_product_name,
competitor_product_url = EXCLUDED.competitor_product_url,
competitor_image_url = EXCLUDED.competitor_image_url,
competitor_stock = EXCLUDED.competitor_stock,
match_score = EXCLUDED.match_score,
tags = EXCLUDED.tags,
match_diagnostic_json = EXCLUDED.match_diagnostic_json,
comparison_mode = EXCLUDED.comparison_mode,
hard_veto = EXCLUDED.hard_veto,
diagnostic_codes = EXCLUDED.diagnostic_codes,
crawled_at = CURRENT_TIMESTAMP,
expires_at = :expires_at
"""
    history_sql_template = """
INSERT INTO competitor_price_history
(sku, source, momo_product_id, momo_price,
price, original_price, discount_pct,
competitor_product_id, competitor_product_name,
competitor_product_url, competitor_image_url, competitor_stock,
match_score, tags, match_diagnostic_json,
comparison_mode, hard_veto, diagnostic_codes,
crawled_at)
VALUES
(:sku, :source, :momo_product_id, :momo_price,
:price, :original_price, :discount_pct,
:comp_id, :comp_name,
:competitor_product_url, :competitor_image_url, :competitor_stock,
:match_score, :tags, {json_expr},
:comparison_mode, :hard_veto, {codes_expr},
CURRENT_TIMESTAMP)
"""

    with self.engine.begin() as conn:
        self._ensure_competitor_price_history_table(conn)
        self._ensure_competitor_prices_columns(conn)
        # PostgreSQL needs explicit jsonb casts for the JSON parameters; other
        # dialects receive the raw JSON string.
        on_postgres = conn.dialect.name == "postgresql"
        json_expr = "CAST(:match_diagnostic_json AS jsonb)" if on_postgres else ":match_diagnostic_json"
        codes_expr = "CAST(:diagnostic_codes AS jsonb)" if on_postgres else ":diagnostic_codes"

        # Parameters shared by both statements. Key order matters: product_payload
        # may override keys listed before it and is overridden by keys after it.
        shared_params = {
            "sku": sku,
            "source": source,
            "price": product.price,
            "original_price": product.original_price,
            "discount_pct": product.discount,
            "comp_id": product.product_id,
            "comp_name": product.name[:200],
            **product_payload,
            "match_score": match_score,
            "tags": tags_json,
            "match_diagnostic_json": (
                json.dumps(diagnostic_payload, ensure_ascii=False) if diagnostic_payload else None
            ),
            "comparison_mode": diagnostic_payload.get("comparison_mode"),
            "hard_veto": diagnostic_payload.get("hard_veto"),
            "diagnostic_codes": (
                json.dumps(diagnostic_codes, ensure_ascii=False) if diagnostic_codes else None
            ),
        }
        cache_sql = cache_sql_template.format(json_expr=json_expr, codes_expr=codes_expr)
        conn.execute(text(cache_sql), {**shared_params, "expires_at": expires_at})

        history_sql = history_sql_template.format(json_expr=json_expr, codes_expr=codes_expr)
        # momo_* come first so that, as before, later shared keys take precedence on collision.
        conn.execute(
            text(history_sql),
            {"momo_product_id": momo_product_id, "momo_price": momo_price, **shared_params},
        )
def _should_upsert_competitor_price(
    self,
    sku: str,
    product,
    match_score: float,
    source: str = "pchome",
) -> tuple[bool, str]:
    """
    Decide whether a new match may overwrite the current competitor_prices row.

    Protects the formal competitor_prices table. When the stored row points at a
    different PChome product, replacement is allowed only with very high
    confidence, so a new matcher cannot pollute core price comparisons in one run.

    Args:
        sku: MOMO SKU (``competitor_prices.sku``).
        product: incoming competitor product; ``product_id`` and ``name`` are read.
        match_score: score of the incoming match.
        source: competitor source code.

    Returns:
        ``(should_write, reason)``. ``reason`` is a short machine-readable string
        explaining the decision; callers append it to tags or attempt logs.
    """
    from sqlalchemy import text
    with self.engine.connect() as conn:
        row = conn.execute(text("""
SELECT competitor_product_id, competitor_product_name, match_score, tags
FROM competitor_prices
WHERE sku = :sku
AND source = :source
LIMIT 1
"""), {"sku": sku, "source": source}).mappings().first()
    if not row:
        return True, "new_match"
    existing_id = str(row.get("competitor_product_id") or "")
    existing_name = str(row.get("competitor_product_name") or "")
    incoming_id = str(getattr(product, "product_id", "") or "")
    incoming_name = str(getattr(product, "name", "") or "")
    try:
        existing_score = float(row.get("match_score") or 0)
    except (TypeError, ValueError):
        existing_score = 0.0

    raw_tags = row.get("tags") or []
    if isinstance(raw_tags, str):
        # Non-jsonb backends hand the column back as JSON text.
        try:
            raw_tags = json.loads(raw_tags)
        except (TypeError, ValueError):
            raw_tags = []
    # Only a JSON array is a tag list. A decoded scalar or object would either
    # crash the membership test (``null`` -> None) or turn it into a substring
    # match (e.g. '"identity_v2_pending"' -> str containing "identity_v2").
    existing_tags = raw_tags if isinstance(raw_tags, (list, tuple, set)) else []

    if "identity_v2" not in existing_tags:
        return True, "replace_legacy_unverified"
    if not existing_id or existing_id == incoming_id:
        return True, "same_or_empty_existing"
    if existing_score < MIN_MATCH_SCORE:
        return True, f"replace_low_existing_score={existing_score:.3f}"
    if existing_name and incoming_name:
        try:
            from services.marketplace_product_matcher import score_marketplace_match
            candidate_equivalence = score_marketplace_match(existing_name, incoming_name)
        except Exception as exc:
            # Best-effort: if the equivalence scorer is unavailable or fails, fall
            # through to the stricter REPLACE_DIFFERENT_PRODUCT_SCORE rule below.
            logger.debug(f"[Feeder] {sku} score_marketplace_match failed: {exc}")
            candidate_equivalence = None
        # Both names describe the same exact identity and the new score is
        # meaningfully better -> allow swapping to the better-scored listing.
        if (
            candidate_equivalence
            and candidate_equivalence.score >= 0.88
            and not candidate_equivalence.hard_veto
            and candidate_equivalence.comparison_mode == "exact_identity"
            and match_score >= max(MIN_MATCH_SCORE, existing_score + 0.015)
        ):
            return (
                True,
                "replace_same_identity_better_score="
                f"{existing_score:.3f}->{match_score:.3f}",
            )
    if match_score >= REPLACE_DIFFERENT_PRODUCT_SCORE:
        return True, f"replace_high_confidence_score={match_score:.3f}"
    return (
        False,
        f"existing_match_conflict;existing_id={existing_id};"
        f"incoming_id={incoming_id};existing_score={existing_score:.3f};"
        f"incoming_score={match_score:.3f}",
    )
def _fetch_latest_manual_review_for_candidate(
self,
sku: str,
competitor_product_id: str,
source: str = "pchome",
) -> Optional[dict]:
"""Read the latest human review for this exact candidate, if the table exists."""
if not competitor_product_id:
return None
from sqlalchemy import text
try:
with self.engine.connect() as conn:
row = conn.execute(text("""
SELECT review_action, review_reason, reviewer_identity, reviewed_at
FROM competitor_match_reviews
WHERE sku = :sku
AND source = :source
AND candidate_product_id = :candidate_id
ORDER BY reviewed_at DESC, id DESC
LIMIT 1
"""), {
"sku": sku,
"source": source,
"candidate_id": competitor_product_id,
}).mappings().first()
except Exception:
return None
return dict(row) if row else None
def _run_sku_items(self, skus: list, source: str = "pchome", label: str = "PChome 競品價格") -> FeederResult:
    """
    Keyword-search PChome for each MOMO item, pick a match, and write it if trusted.

    Each item is a dict with ``sku`` and ``name``; ``product_id`` and
    ``momo_price`` are optional. For every item a match attempt is recorded via
    ``_record_match_attempt`` with one of these statuses: no_result, no_match,
    manual_rejected, manual_unit_price_required, unit_comparable, a low-score
    class from ``_classify_low_score_attempt`` (or true_low_confidence),
    protected_existing_match, matched, or error. Only ``matched`` writes to
    competitor_prices / competitor_price_history (via ``_upsert_competitor_price``).

    Manual review results per candidate are honoured:

    - ``reject_identity`` skips that candidate.
    - ``unit_price_required`` blocks the formal write.
    - ``accept_identity`` overrides the unit_comparable / low-score gates and
      the existing-match protection.

    Args:
        skus: list of item dicts to process.
        source: competitor source code; only "pchome" is handled.
        label: human-readable label used in the start log line.

    Returns:
        FeederResult with per-outcome counters and elapsed seconds.
    """
    start = time.time()
    if source != "pchome":
        logger.warning(f"[Feeder] 尚未支援 source={source},跳過")
        return FeederResult(0, 0, 0, 0, 0, 0.0)
    from services.pchome_crawler import PChomeCrawler
    crawler = PChomeCrawler(timeout=REQUEST_TIMEOUT, delay=RATE_DELAY)
    logger.info(f"[Feeder] 開始抓取 {len(skus)} 支商品的 {label}")
    matched = 0
    skipped_no = 0
    skipped_low = 0
    errors = 0
    history_written = 0
    attempts_written = 0
    for item in skus:
        # NOTE(review): these lookups run outside the per-item try, so a malformed
        # item (missing "sku"/"name") or a failing keyword builder aborts the loop.
        sku = item["sku"]
        momo_name = item["name"]
        momo_product_id = item.get("product_id")
        momo_price = item.get("momo_price")
        search_terms = _build_search_keywords(momo_name)
        try:
            products = _search_pchome_candidates(crawler, momo_name, search_terms, momo_price=momo_price)
            if not products:
                # Every search term returned nothing.
                logger.debug(f"[Feeder] {sku} 無搜尋結果,跳過")
                browse_diagnostic = self._prepare_browse_diagnostic(
                    momo_name,
                    search_terms=search_terms,
                    reason="no_result",
                    candidate_count=0,
                )
                self._record_match_attempt(
                    sku,
                    momo_name,
                    momo_product_id=momo_product_id,
                    momo_price=momo_price,
                    search_terms=search_terms,
                    candidate_count=0,
                    attempt_status="no_result",
                    browse_diagnostic=browse_diagnostic,
                    source=source,
                )
                attempts_written += 1
                skipped_no += 1
                continue
            ranked_matches = _rank_match_details(momo_name, products, momo_price=momo_price)
            if not ranked_matches:
                # Candidates exist but none could be scored as a match.
                browse_diagnostic = self._prepare_browse_diagnostic(
                    momo_name,
                    search_terms=search_terms,
                    reason="no_match",
                    candidate_count=len(products),
                )
                self._record_match_attempt(
                    sku,
                    momo_name,
                    momo_product_id=momo_product_id,
                    momo_price=momo_price,
                    search_terms=search_terms,
                    candidate_count=len(products),
                    attempt_status="no_match",
                    browse_diagnostic=browse_diagnostic,
                    source=source,
                )
                attempts_written += 1
                skipped_no += 1
                continue
            # Walk ranked candidates in order, skipping any a human has rejected;
            # the first non-rejected one is selected together with its review (if any).
            selected_match = None
            manually_rejected_ids: list[str] = []
            for candidate_product, candidate_score, candidate_diagnostics in ranked_matches:
                candidate_review = self._fetch_latest_manual_review_for_candidate(
                    sku,
                    getattr(candidate_product, "product_id", None),
                    source=source,
                )
                if (candidate_review or {}).get("review_action") == "reject_identity":
                    manually_rejected_ids.append(str(getattr(candidate_product, "product_id", "") or ""))
                    continue
                selected_match = (
                    candidate_product,
                    candidate_score,
                    candidate_diagnostics,
                    candidate_review,
                )
                break
            if not selected_match:
                # All ranked candidates were manually rejected: record the top one for context.
                best_product, score, diagnostics = ranked_matches[0]
                rejected_note = ",".join(product_id for product_id in manually_rejected_ids if product_id)
                logger.info(
                    f"[Feeder] {sku} 所有可信候選都已被人工否決,跳過正式寫入 | "
                    f"rejected_candidates={rejected_note}"
                )
                self._record_match_attempt(
                    sku,
                    momo_name,
                    momo_product_id=momo_product_id,
                    momo_price=momo_price,
                    search_terms=search_terms,
                    candidate_count=len(products),
                    attempt_status="manual_rejected",
                    best_product=best_product,
                    best_score=score,
                    diagnostics=diagnostics,
                    error_message=(
                        f"manual_review_rejected; rejected_candidates={rejected_note}; "
                        f"{_format_match_diagnostics(diagnostics)}"
                    ),
                    source=source,
                )
                attempts_written += 1
                skipped_low += 1
                continue
            best_product, score, diagnostics, manual_review = selected_match
            manual_action = (manual_review or {}).get("review_action")
            if manual_action == "unit_price_required":
                # A human flagged this pair as comparable only per unit: never write a total price gap.
                logger.info(
                    f"[Feeder] {sku} 候選已被人工標記為單位價比較,不寫正式總價差 | "
                    f"candidate={getattr(best_product, 'product_id', None)}"
                )
                self._record_match_attempt(
                    sku,
                    momo_name,
                    momo_product_id=momo_product_id,
                    momo_price=momo_price,
                    search_terms=search_terms,
                    candidate_count=len(products),
                    attempt_status="manual_unit_price_required",
                    best_product=best_product,
                    best_score=score,
                    diagnostics=diagnostics,
                    error_message=f"manual_review_unit_price_required; {_format_match_diagnostics(diagnostics)}",
                    source=source,
                )
                attempts_written += 1
                skipped_low += 1
                continue
            # A human "accept_identity" review bypasses the automatic gates below.
            manual_accept_override = manual_action == "accept_identity"
            if getattr(diagnostics, "comparison_mode", "") == "unit_comparable" and not manual_accept_override:
                # Same product but different selling bundle: record only, no formal price gap.
                logger.info(
                    f"[Feeder] {sku} 候選屬單位價可比但非同販售組合,"
                    f"不寫入正式價差 | {_format_match_diagnostics(diagnostics)}"
                )
                browse_diagnostic = self._prepare_browse_diagnostic(
                    momo_name,
                    search_terms=search_terms,
                    reason="unit_comparable",
                    best_product=best_product,
                    best_score=score,
                    diagnostics=diagnostics,
                    candidate_count=len(products),
                )
                self._record_match_attempt(
                    sku,
                    momo_name,
                    momo_product_id=momo_product_id,
                    momo_price=momo_price,
                    search_terms=search_terms,
                    candidate_count=len(products),
                    attempt_status="unit_comparable",
                    best_product=best_product,
                    best_score=score,
                    diagnostics=diagnostics,
                    browse_diagnostic=browse_diagnostic,
                    error_message=_format_match_diagnostics(diagnostics),
                    source=source,
                )
                attempts_written += 1
                skipped_low += 1
                continue
            if score < MIN_MATCH_SCORE and not manual_accept_override:
                attempt_status = _classify_low_score_attempt(score, diagnostics)
                # A "recoverable" low score is downgraded when the ranking shows a
                # variant-selection gap (per _has_variant_selection_gap), so it is not retried.
                if (
                    attempt_status == "recoverable_low_score"
                    and _has_variant_selection_gap(momo_name, ranked_matches, score)
                ):
                    attempt_status = "true_low_confidence"
                logger.debug(
                    f"[Feeder] {sku} 比對分數過低 ({score:.3f} < {MIN_MATCH_SCORE})"
                    f"{_format_match_diagnostics(diagnostics)}"
                )
                browse_diagnostic = self._prepare_browse_diagnostic(
                    momo_name,
                    search_terms=search_terms,
                    reason=attempt_status,
                    best_product=best_product,
                    best_score=score,
                    diagnostics=diagnostics,
                    candidate_count=len(products),
                )
                self._record_match_attempt(
                    sku,
                    momo_name,
                    momo_product_id=momo_product_id,
                    momo_price=momo_price,
                    search_terms=search_terms,
                    candidate_count=len(products),
                    attempt_status=attempt_status,
                    best_product=best_product,
                    best_score=score,
                    diagnostics=diagnostics,
                    browse_diagnostic=browse_diagnostic,
                    error_message=_format_match_diagnostics(diagnostics),
                    source=source,
                )
                attempts_written += 1
                skipped_low += 1
                continue
            if manual_accept_override:
                # Lift a manually accepted match to at least the write threshold.
                score = max(score, MIN_MATCH_SCORE)
            tags = _extend_match_tags(_extract_tags(best_product), diagnostics)
            if manual_accept_override:
                tags.extend(["manual_review", "manual_accept"])
                # NOTE(review): the two lines below are assumed to belong to this
                # manual-override branch (source indentation was lost) — confirm upstream.
                tags = [tag for tag in tags if tag != "identity_veto"]
                tags = list(dict.fromkeys(tags))
            should_write, write_reason = self._should_upsert_competitor_price(
                sku,
                best_product,
                score,
                source=source,
            )
            if manual_accept_override and not should_write:
                # Human acceptance wins over the existing-match protection.
                should_write = True
                write_reason = "manual_accept_override"
            if not should_write:
                # Existing verified match is protected: route to manual review instead of overwriting.
                logger.info(f"[Feeder] {sku} 進入人工覆核,不覆蓋既有配對 | {write_reason}")
                browse_diagnostic = self._prepare_browse_diagnostic(
                    momo_name,
                    search_terms=search_terms,
                    reason="protected_existing_match",
                    best_product=best_product,
                    best_score=score,
                    diagnostics=diagnostics,
                    candidate_count=len(products),
                )
                self._record_match_attempt(
                    sku,
                    momo_name,
                    momo_product_id=momo_product_id,
                    momo_price=momo_price,
                    search_terms=search_terms,
                    candidate_count=len(products),
                    attempt_status="protected_existing_match",
                    best_product=best_product,
                    best_score=score,
                    diagnostics=diagnostics,
                    browse_diagnostic=browse_diagnostic,
                    error_message=f"{write_reason}; {_format_match_diagnostics(diagnostics)}",
                    source=source,
                )
                attempts_written += 1
                skipped_low += 1
                continue
            tags.append(write_reason)
            self._upsert_competitor_price(
                sku,
                best_product,
                score,
                tags,
                momo_product_id=momo_product_id,
                momo_price=momo_price,
                diagnostics=diagnostics,
                source=source,
            )
            self._record_match_attempt(
                sku,
                momo_name,
                momo_product_id=momo_product_id,
                momo_price=momo_price,
                search_terms=search_terms,
                candidate_count=len(products),
                attempt_status="matched",
                best_product=best_product,
                best_score=score,
                diagnostics=diagnostics,
                source=source,
            )
            matched += 1
            history_written += 1
            attempts_written += 1
            logger.debug(
                f"[Feeder] {sku} → PChome ${best_product.price} "
                f"score={score:.3f} tags={tags}"
            )
        except Exception as e:
            # Per-item failure: log, try to record an "error" attempt, and keep going.
            logger.error(f"[Feeder] {sku} 處理失敗: {e}")
            try:
                browse_diagnostic = self._prepare_browse_diagnostic(
                    momo_name,
                    search_terms=search_terms,
                    reason="crawler_error",
                    candidate_count=0,
                )
                self._record_match_attempt(
                    sku,
                    momo_name,
                    momo_product_id=momo_product_id,
                    momo_price=momo_price,
                    search_terms=search_terms,
                    attempt_status="error",
                    browse_diagnostic=browse_diagnostic,
                    error_message=str(e),
                    source=source,
                )
                attempts_written += 1
            except Exception as attempt_error:
                logger.warning(f"[Feeder] {sku} 比對嘗試紀錄寫入失敗: {attempt_error}")
            errors += 1
    duration = round(time.time() - start, 2)
    logger.info(
        f"[Feeder] 完成 matched={matched} skipped_no={skipped_no} "
        f"skipped_low={skipped_low} errors={errors} "
        f"history_written={history_written} attempts_written={attempts_written} 耗時={duration}s"
    )
    return FeederResult(
        total_skus=len(skus),
        matched=matched,
        skipped_no_result=skipped_no,
        skipped_low_score=skipped_low,
        errors=errors,
        duration_sec=duration,
        history_written=history_written,
        attempts_written=attempts_written,
    )
def _run_known_identity_refresh_items(
    self,
    skus: list,
    source: str = "pchome",
    label: str = "已確認身份價格刷新",
) -> FeederResult:
    """
    Re-price items that already carry a known PChome product id (no keyword search first).

    Each item is a dict with ``sku`` and ``name``. ``product_id``,
    ``momo_price`` and ``competitor_product_id`` are optional. All known ids are
    fetched in one batch, and then each item is re-scored against its fresh
    PChome record:

    - Id not returned by PChome: try fresh-search recovery. Write only a
      recovered result that is not unit_comparable, not hard-vetoed and at or
      above MIN_MATCH_SCORE; otherwise record ``refresh_no_result``.
    - Re-score is unit_comparable: record ``refresh_unit_comparable`` and never
      write a price gap.
    - Re-score is below MIN_MATCH_SCORE: try fresh-search recovery unless the
      candidate is hard-vetoed. A recovered candidate is adopted only if it
      scores higher AND is itself neither unit_comparable nor hard-vetoed, the
      same gates the missing-id path applies.
    - Otherwise: write through ``_commit_known_identity_match``, which still
      honours ``_should_upsert_competitor_price`` protection.

    A failing batch lookup degrades to the ok=False path (per-item recovery)
    instead of aborting the run. Per-item failures are recorded as
    ``refresh_error`` attempts.

    Args:
        skus: rows to refresh.
        source: competitor source code; only "pchome" is supported.
        label: human-readable run label used in log lines.

    Returns:
        FeederResult with per-outcome counters and elapsed seconds.
    """
    start = time.time()
    if source != "pchome":
        logger.warning(f"[Feeder] 尚未支援 source={source},跳過")
        return FeederResult(0, 0, 0, 0, 0, 0.0)
    if not skus:
        return FeederResult(0, 0, 0, 0, 0, 0.0)
    from services.pchome_crawler import PChomeCrawler

    def _merge_terms(base_terms: list, extra_terms: list) -> list:
        """Return base_terms followed by the extra_terms not already present."""
        return base_terms + [term for term in extra_terms if term not in base_terms]

    crawler = PChomeCrawler(timeout=REQUEST_TIMEOUT, delay=RATE_DELAY)
    requested_ids = [
        str(item.get("competitor_product_id") or "").strip()
        for item in skus
        if str(item.get("competitor_product_id") or "").strip()
    ]
    try:
        ok, message, products = crawler.fetch_product_details(requested_ids, batch_size=20)
    except Exception as exc:
        # Behave like an ok=False response (empty product_map -> per-item
        # fresh-search recovery) rather than aborting the whole refresh run.
        logger.warning(f"[Feeder] {label} product_id 批次查詢失敗: {exc}")
        ok, message, products = False, str(exc), []
    product_map = {_product_id_key(product.product_id): product for product in products} if ok else {}
    logger.info(
        f"[Feeder] {label} product_id 批次查詢 | requested={len(requested_ids)} "
        f"returned={len(product_map)} msg={message}"
    )
    matched = 0
    skipped_no = 0
    skipped_low = 0
    errors = 0
    history_written = 0
    attempts_written = 0
    for item in skus:
        sku = item["sku"]
        momo_name = item["name"]
        momo_product_id = item.get("product_id")
        momo_price = item.get("momo_price")
        competitor_product_id = str(item.get("competitor_product_id") or "").strip()
        search_terms = [f"known_product_id:{competitor_product_id}"] if competitor_product_id else []
        try:
            product = product_map.get(_product_id_key(competitor_product_id))
            if not product:
                # Known id was not returned by PChome: fall back to a fresh keyword search.
                recovered, recovery_terms, recovery_candidate_count = _recover_low_score_with_fresh_search(
                    crawler,
                    momo_name,
                    momo_price=momo_price,
                    existing_product_id=competitor_product_id,
                )
                if recovered:
                    best_product, score, diagnostics = recovered
                    attempt_terms = _merge_terms(search_terms, recovery_terms)
                    if getattr(diagnostics, "comparison_mode", "") == "unit_comparable":
                        self._record_match_attempt(
                            sku,
                            momo_name,
                            momo_product_id=momo_product_id,
                            momo_price=momo_price,
                            search_terms=attempt_terms,
                            candidate_count=max(1, recovery_candidate_count),
                            attempt_status="refresh_unit_comparable",
                            best_product=best_product,
                            best_score=score,
                            diagnostics=diagnostics,
                            error_message=_format_match_diagnostics(diagnostics),
                            source=source,
                        )
                        skipped_low += 1
                        attempts_written += 1
                        continue
                    if score >= MIN_MATCH_SCORE and not getattr(diagnostics, "hard_veto", False):
                        tags = _extend_match_tags(
                            _extract_tags(best_product),
                            diagnostics,
                            ["refresh_known_identity", "fresh_search_recovery", "missing_known_product_id"],
                        )
                        if self._commit_known_identity_match(
                            sku,
                            momo_name,
                            momo_product_id=momo_product_id,
                            momo_price=momo_price,
                            search_terms=attempt_terms,
                            candidate_count=max(1, recovery_candidate_count),
                            best_product=best_product,
                            score=score,
                            diagnostics=diagnostics,
                            tags=tags,
                            source=source,
                        ):
                            matched += 1
                            history_written += 1
                        else:
                            skipped_low += 1
                        attempts_written += 1
                        continue
                self._record_match_attempt(
                    sku,
                    momo_name,
                    momo_product_id=momo_product_id,
                    momo_price=momo_price,
                    search_terms=_merge_terms(search_terms, recovery_terms),
                    candidate_count=max(0, recovery_candidate_count),
                    attempt_status="refresh_no_result",
                    error_message=f"PChome product_id not returned: {competitor_product_id}",
                    source=source,
                )
                skipped_no += 1
                attempts_written += 1
                continue

            result = _find_best_match_detail(momo_name, [product], momo_price=momo_price)
            if not result:
                self._record_match_attempt(
                    sku,
                    momo_name,
                    momo_product_id=momo_product_id,
                    momo_price=momo_price,
                    search_terms=search_terms,
                    candidate_count=1,
                    attempt_status="refresh_no_match",
                    source=source,
                )
                skipped_no += 1
                attempts_written += 1
                continue
            best_product, score, diagnostics = result
            if getattr(diagnostics, "comparison_mode", "") == "unit_comparable":
                self._record_match_attempt(
                    sku,
                    momo_name,
                    momo_product_id=momo_product_id,
                    momo_price=momo_price,
                    search_terms=search_terms,
                    candidate_count=1,
                    attempt_status="refresh_unit_comparable",
                    best_product=best_product,
                    best_score=score,
                    diagnostics=diagnostics,
                    error_message=_format_match_diagnostics(diagnostics),
                    source=source,
                )
                skipped_low += 1
                attempts_written += 1
                continue
            if score < MIN_MATCH_SCORE:
                recovery_terms: list[str] = []
                recovery_candidate_count = 0
                if not getattr(diagnostics, "hard_veto", False):
                    recovered, recovery_terms, recovery_candidate_count = _recover_low_score_with_fresh_search(
                        crawler,
                        momo_name,
                        momo_price=momo_price,
                        existing_product_id=competitor_product_id,
                    )
                    if recovered:
                        recovered_product, recovered_score, recovered_diagnostics = recovered
                        # Apply the same gates as the missing-id path: a unit-comparable
                        # or hard-vetoed recovery must never become a formal price gap.
                        if (
                            recovered_score > score
                            and getattr(recovered_diagnostics, "comparison_mode", "") != "unit_comparable"
                            and not getattr(recovered_diagnostics, "hard_veto", False)
                        ):
                            best_product, score, diagnostics = recovered_product, recovered_score, recovered_diagnostics
                attempt_terms = _merge_terms(search_terms, recovery_terms)
                candidate_count = max(1, recovery_candidate_count or 1)
                if score >= MIN_MATCH_SCORE:
                    extras = ["refresh_known_identity"]
                    if recovery_terms:
                        extras.append("fresh_search_recovery")
                    tags = _extend_match_tags(_extract_tags(best_product), diagnostics, extras)
                    if self._commit_known_identity_match(
                        sku,
                        momo_name,
                        momo_product_id=momo_product_id,
                        momo_price=momo_price,
                        search_terms=attempt_terms,
                        candidate_count=candidate_count,
                        best_product=best_product,
                        score=score,
                        diagnostics=diagnostics,
                        tags=tags,
                        source=source,
                    ):
                        matched += 1
                        history_written += 1
                    else:
                        skipped_low += 1
                    attempts_written += 1
                    continue
                attempt_status = _classify_low_score_attempt(score, diagnostics)
                self._record_match_attempt(
                    sku,
                    momo_name,
                    momo_product_id=momo_product_id,
                    momo_price=momo_price,
                    search_terms=attempt_terms,
                    candidate_count=candidate_count,
                    attempt_status=attempt_status,
                    best_product=best_product,
                    best_score=score,
                    diagnostics=diagnostics,
                    error_message=_format_match_diagnostics(diagnostics),
                    source=source,
                )
                skipped_low += 1
                attempts_written += 1
                continue

            tags = _extend_match_tags(_extract_tags(best_product), diagnostics, ["refresh_known_identity"])
            if self._commit_known_identity_match(
                sku,
                momo_name,
                momo_product_id=momo_product_id,
                momo_price=momo_price,
                search_terms=search_terms,
                candidate_count=1,
                best_product=best_product,
                score=score,
                diagnostics=diagnostics,
                tags=tags,
                source=source,
            ):
                matched += 1
                history_written += 1
            else:
                skipped_low += 1
            attempts_written += 1
        except Exception as e:
            logger.error(f"[Feeder] {sku} 已確認身份刷新失敗: {e}")
            try:
                self._record_match_attempt(
                    sku,
                    momo_name,
                    momo_product_id=momo_product_id,
                    momo_price=momo_price,
                    search_terms=search_terms,
                    attempt_status="refresh_error",
                    error_message=str(e),
                    source=source,
                )
                attempts_written += 1
            except Exception as attempt_error:
                logger.warning(f"[Feeder] {sku} 刷新嘗試紀錄寫入失敗: {attempt_error}")
            errors += 1
    duration = round(time.time() - start, 2)
    logger.info(
        f"[Feeder] {label} 完成 matched={matched}/{len(skus)} "
        f"skip_no={skipped_no} skip_low={skipped_low} errors={errors} "
        f"history_written={history_written} attempts_written={attempts_written} 耗時={duration}s"
    )
    return FeederResult(
        total_skus=len(skus),
        matched=matched,
        skipped_no_result=skipped_no,
        skipped_low_score=skipped_low,
        errors=errors,
        duration_sec=duration,
        history_written=history_written,
        attempts_written=attempts_written,
    )

def _commit_known_identity_match(
    self,
    sku: str,
    momo_name: str,
    *,
    momo_product_id,
    momo_price,
    search_terms: list,
    candidate_count: int,
    best_product,
    score: float,
    diagnostics,
    tags: list,
    source: str,
) -> bool:
    """
    Write one refreshed match unless the existing competitor_prices row is protected.

    Returns True after upserting the price and recording a ``matched`` attempt.
    Returns False, after recording a ``protected_existing_match`` attempt,
    when ``_should_upsert_competitor_price`` refuses the overwrite. ``tags`` is
    mutated: the write reason is appended before the upsert.
    """
    should_write, write_reason = self._should_upsert_competitor_price(
        sku,
        best_product,
        score,
        source=source,
    )
    if not should_write:
        self._record_match_attempt(
            sku,
            momo_name,
            momo_product_id=momo_product_id,
            momo_price=momo_price,
            search_terms=search_terms,
            candidate_count=candidate_count,
            attempt_status="protected_existing_match",
            best_product=best_product,
            best_score=score,
            diagnostics=diagnostics,
            error_message=f"{write_reason}; {_format_match_diagnostics(diagnostics)}",
            source=source,
        )
        return False
    tags.append(write_reason)
    self._upsert_competitor_price(
        sku,
        best_product,
        score,
        tags,
        momo_product_id=momo_product_id,
        momo_price=momo_price,
        diagnostics=diagnostics,
        source=source,
    )
    self._record_match_attempt(
        sku,
        momo_name,
        momo_product_id=momo_product_id,
        momo_price=momo_price,
        search_terms=search_terms,
        candidate_count=candidate_count,
        attempt_status="matched",
        best_product=best_product,
        best_score=score,
        diagnostics=diagnostics,
        source=source,
    )
    return True
def run(self, source: str = "pchome") -> FeederResult:
    """
    Run one full competitor-price pass over every active SKU.

    Args:
        source: competitor source code (currently only 'pchome' is handled).

    Returns:
        The FeederResult produced by ``_run_sku_items``. If loading the SKU list
        raises, the error is logged and ``FeederResult(0, 0, 0, 0, 1, 0.0)`` is
        returned instead (the 1 presumably being the errors slot).
    """
    try:
        active_items = self._fetch_active_skus()
    except Exception as exc:
        logger.error(f"[Feeder] 讀取商品清單失敗: {exc}")
        return FeederResult(0, 0, 0, 0, 1, 0.0)
    else:
        return self._run_sku_items(active_items, source=source, label="PChome 競品價格")
def run_expired_identity_refresh(self, limit: int = 120, source: str = "pchome") -> FeederResult:
    """
    Refresh prices for identity_v2-confirmed matches whose PChome cache has expired.

    Args:
        limit: maximum number of expired rows to load.
        source: competitor source code.

    Returns:
        FeederResult from the known-identity refresh, or
        ``FeederResult(0, 0, 0, 0, 1, 0.0)`` when loading the rows fails.
    """
    try:
        expired_items = self._fetch_expired_identity_skus(limit=limit)
    except Exception as exc:
        logger.error(f"[Feeder] 讀取過期 identity_v2 商品失敗: {exc}")
        return FeederResult(0, 0, 0, 0, 1, 0.0)
    else:
        return self._run_known_identity_refresh_items(
            expired_items,
            source=source,
            label="identity_v2 過期價格刷新",
        )
def run_retryable_candidate_revalidation(
    self,
    limit: int = 80,
    min_score: float = 0.70,
    source: str = "pchome",
) -> FeederResult:
    """
    Re-score near-threshold candidates so matcher upgrades are not stuck behind old low_score rows.

    Args:
        limit: maximum number of retryable candidates to load.
        min_score: lowest previous best_match_score eligible for re-scoring.
        source: competitor source code.

    Returns:
        FeederResult from the known-identity refresh, or
        ``FeederResult(0, 0, 0, 0, 1, 0.0)`` when loading the candidates fails.
    """
    try:
        retry_items = self._fetch_retryable_candidate_skus(limit=limit, min_score=min_score)
    except Exception as exc:
        logger.error(f"[Feeder] 讀取近門檻候選失敗: {exc}")
        return FeederResult(0, 0, 0, 0, 1, 0.0)
    else:
        return self._run_known_identity_refresh_items(
            retry_items,
            source=source,
            label="近門檻候選重新評分",
        )
def run_unmatched_priority(self, limit: int = 80, source: str = "pchome") -> FeederResult:
    """
    Prioritise keyword matching for high-value products without a valid PChome match.

    Args:
        limit: maximum number of unmatched products to load.
        source: competitor source code.

    Returns:
        FeederResult from ``_run_sku_items``, or
        ``FeederResult(0, 0, 0, 0, 1, 0.0)`` when loading the products fails.
    """
    try:
        pending_items = self._fetch_unmatched_priority_skus(limit=limit)
    except Exception as exc:
        logger.error(f"[Feeder] 讀取待比對優先商品失敗: {exc}")
        return FeederResult(0, 0, 0, 0, 1, 0.0)
    else:
        return self._run_sku_items(pending_items, source=source, label="待比對優先補抓")
# ─────────────────────────────────────────────
# CLI smoke test (no DB): exercises the PChome crawler + name matching directly.
# Run it as a module from the repository root so the `services.*` imports resolve:
#   python3 -m services.competitor_price_feeder
# Running `python3 services/competitor_price_feeder.py` puts services/ itself, not
# the repository root, on sys.path, so `from services.pchome_crawler import ...`
# fails unless PYTHONPATH already contains the repository root.
# ─────────────────────────────────────────────
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    from services.pchome_crawler import PChomeCrawler
    test_items = [
        {"sku": "A003", "name": "舒特膚AD乳液200ml"},
        {"sku": "A001", "name": "玻尿酸面膜10片裝"},
        {"sku": "A009", "name": "美白化妝水150ml"},
    ]
    crawler = PChomeCrawler(delay=0.8)
    print("=== Competitor Price Feeder CLI 測試 ===\n")
    for item in test_items:
        # Single simplified keyword (first 20 chars); production uses
        # _build_search_keywords + _search_pchome_candidates instead.
        keyword = item["name"][:20]
        ok, msg, products = crawler.search_products(keyword, limit=10)
        if not ok or not products:
            print(f"[{item['sku']}] 無結果: {msg}")
            continue
        result = _find_best_match(item["name"], products)
        if not result:
            print(f"[{item['sku']}] 無法比對")
            continue
        best, score = result
        tags = _extract_tags(best)
        symbol = "" if score >= MIN_MATCH_SCORE else "⚠️ 低分"
        print(
            f"{symbol} [{item['sku']}] {item['name'][:25]}\n"
            f"  → PChome: {best.name[:40]}\n"
            f"  → 售價 ${best.price} | 分數 {score:.3f} | 標籤 {tags}\n"
        )