刷新過期 PChome identity 價格
All checks were successful
CD Pipeline / deploy (push) Successful in 1m2s

This commit is contained in:
OoO
2026-05-19 22:46:43 +08:00
parent cc3be7fd4f
commit d88d9b7326
5 changed files with 350 additions and 3 deletions

View File

@@ -341,6 +341,7 @@ LEFT JOIN competitor_prices cp
- `services/competitor_intel_repository.py` 是下游頁面、圖表、簡報的共用查詢出口;新增消費端不得各自硬寫不同 match threshold。
- competitor PPT 不可只輸出 matched rows 造成覆蓋率假象;`fetch_competitor_comparison_results()` 必須用 `LEFT JOIN valid_competitor` 保留高營收/高價但尚未有效配對的 MOMO 商品,並帶出 `match_status``candidate_count``best_match_score``match_diagnostic`,讓簡報與 AI 文案明確區分「高信心比對」與「待補身份/價格」。
- `services/competitor_identity_revalidator.py` 可對既有 `competitor_prices` legacy row 離線重跑 `identity_v2`:只有新版 matcher 分數 `>= 0.76` 且無 hard veto 才補 `identity_v2` / `legacy_revalidated` tags預設不刷新 `expires_at`,避免過期價格進入決策。
- `CompetitorPriceFeeder.run_expired_identity_refresh()` 會優先刷新已通過 `identity_v2` 但 TTL 過期的 PChome row直接用既有 `competitor_product_id` 批次呼叫 PChome 商品 API再用新版 matcher 重新驗證名稱/規格/價格 sanity通過後寫回 `competitor_prices``competitor_price_history`。這條路徑提升新鮮價格覆蓋率,但不降低 match threshold也不讓過期價格直接進入決策。
- Dashboard 必須把「待比對」拆成可診斷狀態:`價格過期待刷新``舊版配對待重驗``低分配對待審``身份否決``找不到同款``抓取異常``尚未搜尋`。不可再用單一「待比對」掩蓋資料品質原因。
### 執行方式

View File

@@ -2208,7 +2208,9 @@ def run_pchome_match_backfill_task():
include_expired=True,
write_attempts=True,
)
feeder_result = CompetitorPriceFeeder(engine=engine).run_unmatched_priority(limit=120)
feeder = CompetitorPriceFeeder(engine=engine)
refresh_result = feeder.run_expired_identity_refresh(limit=240)
feeder_result = feeder.run_unmatched_priority(limit=120)
pick_result = generate_product_pick_list(engine, limit=50)
clear_dashboard_cache()
clear_competitor_intel_cache()
@@ -2225,6 +2227,11 @@ def run_pchome_match_backfill_task():
"identity_revalidated_expired": revalidation_result.promoted_expired,
"identity_revalidation_rejected_low": revalidation_result.rejected_low_score,
"identity_revalidation_rejected_veto": revalidation_result.rejected_veto,
"expired_identity_refresh_total": refresh_result.total_skus,
"expired_identity_refresh_matched": refresh_result.matched,
"expired_identity_refresh_no_result": refresh_result.skipped_no_result,
"expired_identity_refresh_low_score": refresh_result.skipped_low_score,
"expired_identity_refresh_errors": refresh_result.errors,
"pick_candidates": pick_result.candidates,
"pick_written": pick_result.written,
"status": "Success",
@@ -2232,11 +2239,12 @@ def run_pchome_match_backfill_task():
logging.info(
f"[Scheduler] [PChomeBackfill] ✅ 完成 | "
f"revalidated={revalidation_result.promoted_fresh}+{revalidation_result.promoted_expired} "
f"refreshed={refresh_result.matched}/{refresh_result.total_skus} "
f"matched={feeder_result.matched}/{feeder_result.total_skus} "
f"history_written={feeder_result.history_written} "
f"pick_written={pick_result.written} "
f"errors={feeder_result.errors} "
f"耗時={feeder_result.duration_sec}s"
f"errors={feeder_result.errors + refresh_result.errors} "
f"耗時={feeder_result.duration_sec + refresh_result.duration_sec}s"
)
_save_stats('pchome_match_backfill', stats)

View File

@@ -155,6 +155,11 @@ def _format_match_diagnostics(diagnostics) -> str:
)
def _product_id_key(product_id: str) -> str:
"""Normalize PChome IDs for comparing cached IDs with API-returned IDs."""
return re.sub(r"[^A-Z0-9]", "", str(product_id or "").upper())
def _find_best_match_detail(
momo_name: str,
pchome_products: list,
@@ -529,6 +534,59 @@ class CompetitorPriceFeeder:
).fetchall()
return [dict(r._mapping) for r in rows]
def _fetch_expired_identity_skus(self, limit: int = 120) -> list:
"""
取得 identity_v2 已確認、但 PChome 價格快取過期的商品。
這些商品不需重新 keyword search先用既有 PChome product_id 批次刷新價格。
"""
if self.engine is None:
raise RuntimeError("需要注入 SQLAlchemy engine")
from sqlalchemy import text
sql = text("""
WITH latest_momo AS (
SELECT
p.id AS product_id,
p.i_code AS sku,
p.name,
p.category,
pr.price AS momo_price,
ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC) AS rn
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.status = 'ACTIVE'
)
SELECT
lm.product_id,
lm.sku,
lm.name,
lm.category,
lm.momo_price,
cp.competitor_product_id,
cp.competitor_product_name,
cp.match_score,
cp.expires_at
FROM latest_momo lm
JOIN competitor_prices cp
ON cp.sku = lm.sku
AND cp.source = 'pchome'
AND cp.competitor_product_id IS NOT NULL
AND cp.competitor_product_id <> ''
AND cp.expires_at IS NOT NULL
AND cp.expires_at <= CURRENT_TIMESTAMP
AND COALESCE(cp.match_score, 0) >= :match_score_floor
AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
WHERE lm.rn = 1
ORDER BY cp.expires_at ASC, lm.momo_price DESC NULLS LAST, lm.sku
LIMIT :limit
""")
with self.engine.connect() as conn:
rows = conn.execute(
sql,
{"limit": max(1, min(int(limit), 500)), "match_score_floor": MIN_MATCH_SCORE},
).fetchall()
return [dict(r._mapping) for r in rows]
def _upsert_competitor_price(
self,
sku: str,
@@ -834,6 +892,195 @@ class CompetitorPriceFeeder:
attempts_written=attempts_written,
)
def _run_known_identity_refresh_items(
self,
skus: list,
source: str = "pchome",
label: str = "已確認身份價格刷新",
) -> FeederResult:
start = time.time()
if source != "pchome":
logger.warning(f"[Feeder] 尚未支援 source={source},跳過")
return FeederResult(0, 0, 0, 0, 0, 0.0)
if not skus:
return FeederResult(0, 0, 0, 0, 0, 0.0)
from services.pchome_crawler import PChomeCrawler
crawler = PChomeCrawler(timeout=30, delay=RATE_DELAY)
requested_ids = [
str(item.get("competitor_product_id") or "").strip()
for item in skus
if str(item.get("competitor_product_id") or "").strip()
]
ok, message, products = crawler.fetch_product_details(requested_ids, batch_size=20)
product_map = {_product_id_key(product.product_id): product for product in products} if ok else {}
logger.info(
f"[Feeder] {label} product_id 批次查詢 | requested={len(requested_ids)} "
f"returned={len(product_map)} msg={message}"
)
matched = 0
skipped_no = 0
skipped_low = 0
errors = 0
history_written = 0
attempts_written = 0
for item in skus:
sku = item["sku"]
momo_name = item["name"]
momo_product_id = item.get("product_id")
momo_price = item.get("momo_price")
competitor_product_id = str(item.get("competitor_product_id") or "").strip()
search_terms = [f"known_product_id:{competitor_product_id}"] if competitor_product_id else []
try:
product = product_map.get(_product_id_key(competitor_product_id))
if not product:
self._record_match_attempt(
sku,
momo_name,
momo_product_id=momo_product_id,
momo_price=momo_price,
search_terms=search_terms,
candidate_count=0,
attempt_status="refresh_no_result",
error_message=f"PChome product_id not returned: {competitor_product_id}",
source=source,
)
skipped_no += 1
attempts_written += 1
continue
result = _find_best_match_detail(momo_name, [product], momo_price=momo_price)
if not result:
self._record_match_attempt(
sku,
momo_name,
momo_product_id=momo_product_id,
momo_price=momo_price,
search_terms=search_terms,
candidate_count=1,
attempt_status="refresh_no_match",
source=source,
)
skipped_no += 1
attempts_written += 1
continue
best_product, score, diagnostics = result
if score < MIN_MATCH_SCORE:
self._record_match_attempt(
sku,
momo_name,
momo_product_id=momo_product_id,
momo_price=momo_price,
search_terms=search_terms,
candidate_count=1,
attempt_status="refresh_low_score",
best_product=best_product,
best_score=score,
error_message=_format_match_diagnostics(diagnostics),
source=source,
)
skipped_low += 1
attempts_written += 1
continue
tags = _extract_tags(best_product)
tags.extend(getattr(diagnostics, "tags", []))
for reason in getattr(diagnostics, "reasons", ()) or ():
tags.append(f"match_{reason}")
tags.append("refresh_known_identity")
tags = list(dict.fromkeys(tags))
should_write, write_reason = self._should_upsert_competitor_price(
sku,
best_product,
score,
source=source,
)
if not should_write:
self._record_match_attempt(
sku,
momo_name,
momo_product_id=momo_product_id,
momo_price=momo_price,
search_terms=search_terms,
candidate_count=1,
attempt_status="refresh_needs_review",
best_product=best_product,
best_score=score,
error_message=f"{write_reason}; {_format_match_diagnostics(diagnostics)}",
source=source,
)
skipped_low += 1
attempts_written += 1
continue
tags.append(write_reason)
self._upsert_competitor_price(
sku,
best_product,
score,
tags,
momo_product_id=momo_product_id,
momo_price=momo_price,
source=source,
)
self._record_match_attempt(
sku,
momo_name,
momo_product_id=momo_product_id,
momo_price=momo_price,
search_terms=search_terms,
candidate_count=1,
attempt_status="matched",
best_product=best_product,
best_score=score,
source=source,
)
matched += 1
history_written += 1
attempts_written += 1
except Exception as e:
logger.error(f"[Feeder] {sku} 已確認身份刷新失敗: {e}")
try:
self._record_match_attempt(
sku,
momo_name,
momo_product_id=momo_product_id,
momo_price=momo_price,
search_terms=search_terms,
attempt_status="refresh_error",
error_message=str(e),
source=source,
)
attempts_written += 1
except Exception as attempt_error:
logger.warning(f"[Feeder] {sku} 刷新嘗試紀錄寫入失敗: {attempt_error}")
errors += 1
duration = round(time.time() - start, 2)
logger.info(
f"[Feeder] {label} 完成 matched={matched}/{len(skus)} "
f"skip_no={skipped_no} skip_low={skipped_low} errors={errors} "
f"history_written={history_written} attempts_written={attempts_written} 耗時={duration}s"
)
return FeederResult(
total_skus=len(skus),
matched=matched,
skipped_no_result=skipped_no,
skipped_low_score=skipped_low,
errors=errors,
duration_sec=duration,
history_written=history_written,
attempts_written=attempts_written,
)
def run(self, source: str = "pchome") -> FeederResult:
"""
執行一輪競品價格抓取與寫入
@@ -852,6 +1099,16 @@ class CompetitorPriceFeeder:
return self._run_sku_items(skus, source=source, label="PChome 競品價格")
def run_expired_identity_refresh(self, limit: int = 120, source: str = "pchome") -> FeederResult:
"""刷新已通過 identity_v2、但 PChome 價格快取過期的商品。"""
try:
skus = self._fetch_expired_identity_skus(limit=limit)
except Exception as e:
logger.error(f"[Feeder] 讀取過期 identity_v2 商品失敗: {e}")
return FeederResult(0, 0, 0, 0, 1, 0.0)
return self._run_known_identity_refresh_items(skus, source=source, label="identity_v2 過期價格刷新")
def run_unmatched_priority(self, limit: int = 80, source: str = "pchome") -> FeederResult:
"""優先補抓尚未有有效 PChome 配對的高價商品。"""
try:

View File

@@ -1,5 +1,6 @@
from pathlib import Path
import logging
from datetime import datetime
ROOT = Path(__file__).resolve().parents[1]
@@ -24,6 +25,9 @@ def test_competitor_feeder_persists_all_match_attempt_outcomes():
assert "_should_upsert_competitor_price" in source
assert "replace_legacy_unverified" in source
assert "identity_v2" in source
assert "_fetch_expired_identity_skus" in source
assert "run_expired_identity_refresh" in source
assert "refresh_known_identity" in source
assert "CREATE TABLE IF NOT EXISTS competitor_match_attempts" in migration
assert "attempt_status" in migration
@@ -47,3 +51,75 @@ def test_competitor_feeder_logs_keyword_parser_fallback(monkeypatch, caplog):
assert terms
assert "fallback to cleaned product name" in caplog.text
def test_competitor_feeder_refreshes_expired_identity_by_known_product_id(monkeypatch):
from services.competitor_price_feeder import CompetitorPriceFeeder
from services.pchome_crawler import PChomeProduct
requested = []
product = PChomeProduct(
product_id="DDAB01-1900ABCD",
name="舒特膚 AD 乳液 200ml",
price=899,
original_price=999,
discount=10,
image_url="",
product_url="https://24h.pchome.com.tw/prod/DDAB01-1900ABCD",
stock=50,
store="24h",
rating=4.8,
review_count=12,
is_on_sale=True,
crawled_at=datetime.now(),
)
class FakeCrawler:
def __init__(self, *_args, **_kwargs):
pass
def fetch_product_details(self, product_ids, batch_size=20):
requested.extend(product_ids)
return True, "ok", [product]
monkeypatch.setattr("services.pchome_crawler.PChomeCrawler", FakeCrawler)
feeder = CompetitorPriceFeeder(engine=object())
writes = []
attempts = []
monkeypatch.setattr(
feeder,
"_should_upsert_competitor_price",
lambda *_args, **_kwargs: (True, "same_or_empty_existing"),
)
monkeypatch.setattr(
feeder,
"_upsert_competitor_price",
lambda sku, product, score, tags, **kwargs: writes.append({
"sku": sku,
"product_id": product.product_id,
"score": score,
"tags": tags,
**kwargs,
}),
)
monkeypatch.setattr(
feeder,
"_record_match_attempt",
lambda *args, **kwargs: attempts.append(kwargs),
)
result = feeder._run_known_identity_refresh_items([{
"sku": "A001",
"name": "舒特膚 AD 乳液 200ml",
"product_id": 1,
"momo_price": 980,
"competitor_product_id": "DDAB01-1900ABCD",
}])
assert requested == ["DDAB01-1900ABCD"]
assert result.matched == 1
assert writes[0]["product_id"] == "DDAB01-1900ABCD"
assert "identity_v2" in writes[0]["tags"]
assert "refresh_known_identity" in writes[0]["tags"]
assert attempts[0]["attempt_status"] == "matched"
assert attempts[0]["search_terms"] == ["known_product_id:DDAB01-1900ABCD"]

View File

@@ -359,6 +359,7 @@ def test_ai_product_pick_agent_uses_real_competitor_data_and_dashboard_action():
feeder_source = (ROOT / "services/competitor_price_feeder.py").read_text(encoding="utf-8")
agent_source = (ROOT / "services/ai_product_pick_agent.py").read_text(encoding="utf-8")
route_source = (ROOT / "routes/ai_routes.py").read_text(encoding="utf-8")
scheduler_source = (ROOT / "scheduler.py").read_text(encoding="utf-8")
template = (ROOT / "templates/ai_intelligence.html").read_text(encoding="utf-8")
assert "MIN_MATCH_SCORE = 0.76" in feeder_source
@@ -369,7 +370,11 @@ def test_ai_product_pick_agent_uses_real_competitor_data_and_dashboard_action():
assert "_search_pchome_candidates" in feeder_source
assert "crawler.search_products(keyword, limit=SEARCH_LIMIT)" in feeder_source
assert "_fetch_unmatched_priority_skus" in feeder_source
assert "_fetch_expired_identity_skus" in feeder_source
assert "run_expired_identity_refresh" in feeder_source
assert "fetch_product_details(requested_ids" in feeder_source
assert "run_unmatched_priority" in feeder_source
assert "run_expired_identity_refresh(limit=240)" in scheduler_source
assert "generate_product_pick_list" in agent_source
assert "clear_dashboard_cache()" in route_source