強化 PChome legacy 配對重驗證
All checks were successful
CD Pipeline / deploy (push) Successful in 1m3s

This commit is contained in:
OoO
2026-05-19 22:18:32 +08:00
parent f8222006b8
commit 40ddf4eee0
5 changed files with 597 additions and 2 deletions

View File

@@ -338,6 +338,8 @@ LEFT JOIN competitor_prices cp
- Dashboard、AI pick、Hermes、Excel export、daily/growth 圖表與 competitor PPT 必須以 `competitor_prices + competitor_price_history + competitor_match_attempts` 為短期唯一生產真相源,且只消費 `identity_v2` matcher 驗證過的配對;舊版僅靠 `match_score` 的快取不可直接進入決策或簡報。
- `pchome_matches` 與 live `pchome_batch()` 僅保留 legacy compatibility不得作為新簡報或 AI 決策主來源。
- `services/competitor_intel_repository.py` 是下游頁面、圖表、簡報的共用查詢出口;新增消費端不得各自硬寫不同 match threshold。
- `services/competitor_identity_revalidator.py` 可對既有 `competitor_prices` legacy row 離線重跑 `identity_v2`:只有新版 matcher 分數 `>= 0.76` 且無 hard veto 才補 `identity_v2` / `legacy_revalidated` tags預設不刷新 `expires_at`,避免過期價格進入決策。
- Dashboard 必須把「待比對」拆成可診斷狀態:`價格過期待刷新``舊版配對待重驗``低分配對待審``身份否決``找不到同款``抓取異常``尚未搜尋`。不可再用單一「待比對」掩蓋資料品質原因。
### 執行方式
@@ -345,6 +347,12 @@ LEFT JOIN competitor_prices cp
# 手動觸發一輪抓取
python3 services/competitor_price_feeder.py
# 預覽 legacy PChome 快取 identity_v2 重驗證(不寫入)
python3 -m services.competitor_identity_revalidator --limit 500
# 寫入安全通過的 identity_v2 tag不刷新過期價格
python3 -m services.competitor_identity_revalidator --limit 500 --apply
# 未來整合為 K3s CronJob每 4 小時)
# k8s/jobs/competitor-price-feeder-cronjob.yaml
```

View File

@@ -62,7 +62,70 @@ def _to_float(value):
return None
def _build_pchome_match_status(attempt=None):
def _build_pchome_match_status(attempt=None, ineligible=None):
if attempt:
status = attempt.get('attempt_status') or 'unknown'
if status == 'matched':
score = _to_float(attempt.get('best_match_score'))
score_text = f"最佳候選 {round(score * 100)}%" if score is not None else "已完成身份比對"
return {
'label': '已配對待刷新',
'tone': 'watch',
'summary': '曾通過 identity_v2但目前沒有有效價格快取等待下一輪刷新',
'detail': score_text,
}
if status == 'expired_match':
score = _to_float(attempt.get('best_match_score'))
score_text = f"身份分數 {round(score * 100)}%" if score is not None else "已完成身份比對"
return {
'label': '價格過期待刷新',
'tone': 'watch',
'summary': '同款身份已確認,但 PChome 價格快取過期,不顯示舊價避免誤判',
'detail': score_text,
}
if status == 'identity_veto':
score = _to_float(attempt.get('best_match_score'))
score_text = f"最佳候選 {round(score * 100)}%" if score is not None else "已拒絕候選"
return {
'label': '身份否決',
'tone': 'neutral',
'summary': '新版 identity_v2 判定不是同款,已阻擋自動比價',
'detail': score_text,
}
if ineligible:
reason = ineligible.get('reason') or 'not_eligible'
score = _to_float(ineligible.get('match_score'))
score_text = f"match {round(score * 100)}%" if score is not None else None
if reason == 'expired_match':
return {
'label': '價格過期待刷新',
'tone': 'watch',
'summary': '已有高信心同款配對,但 PChome 價格快取過期,等待補抓刷新',
'detail': score_text,
}
if reason == 'legacy_without_identity_v2':
return {
'label': '舊版配對待重驗',
'tone': 'neutral',
'summary': '舊版 PChome 配對尚未通過 identity_v2不進入正式決策',
'detail': score_text,
}
if reason == 'below_score_floor':
return {
'label': '低分配對待審',
'tone': 'neutral',
'summary': '已有候選但低於高信心門檻,避免錯配所以暫不採用',
'detail': score_text,
}
if reason == 'invalid_price':
return {
'label': '價格無效待刷新',
'tone': 'watch',
'summary': 'PChome 配對缺少有效價格,等待下一輪補抓',
'detail': None,
}
if not attempt:
return {
'label': '尚未搜尋',
@@ -218,6 +281,89 @@ def _load_pchome_competitor_map(session, skus):
return result
def _load_pchome_ineligible_competitor_map(session, skus):
"""Read non-decision PChome rows so the UI can explain why a SKU is pending."""
sku_list = [str(sku) for sku in skus if sku]
if not sku_list:
return {}
try:
stmt = text("""
WITH ineligible AS (
SELECT
sku,
price,
competitor_product_id,
competitor_product_name,
match_score,
tags,
crawled_at,
expires_at,
CASE
WHEN price IS NULL OR price <= 0 THEN 'invalid_price'
WHEN (expires_at IS NOT NULL AND expires_at <= CURRENT_TIMESTAMP)
AND COALESCE(match_score, 0) >= :match_score_floor
AND COALESCE(tags, '[]'::jsonb) ? 'identity_v2'
THEN 'expired_match'
WHEN NOT (COALESCE(tags, '[]'::jsonb) ? 'identity_v2')
THEN 'legacy_without_identity_v2'
WHEN COALESCE(match_score, 0) < :match_score_floor
THEN 'below_score_floor'
ELSE 'not_eligible'
END AS reason,
ROW_NUMBER() OVER (
PARTITION BY sku
ORDER BY
CASE
WHEN (expires_at IS NOT NULL AND expires_at <= CURRENT_TIMESTAMP)
AND COALESCE(match_score, 0) >= :match_score_floor
AND COALESCE(tags, '[]'::jsonb) ? 'identity_v2'
THEN 0
WHEN NOT (COALESCE(tags, '[]'::jsonb) ? 'identity_v2') THEN 1
WHEN COALESCE(match_score, 0) < :match_score_floor THEN 2
ELSE 3
END,
crawled_at DESC NULLS LAST,
match_score DESC NULLS LAST
) AS rn
FROM competitor_prices
WHERE source = 'pchome'
AND sku IN :skus
AND NOT (
(expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)
AND price IS NOT NULL
AND price > 0
AND COALESCE(match_score, 0) >= :match_score_floor
AND COALESCE(tags, '[]'::jsonb) ? 'identity_v2'
)
)
SELECT *
FROM ineligible
WHERE rn = 1
""").bindparams(bindparam("skus", expanding=True))
rows = session.execute(
stmt,
{"skus": sku_list, "match_score_floor": PCHOME_MATCH_SCORE_FLOOR},
).mappings().all()
except Exception as exc:
sys_log.warning(f"[Dashboard] PChome 非有效配對原因讀取略過: {exc}")
return {}
result = {}
for row in rows:
result[str(row.get('sku'))] = {
'reason': row.get('reason'),
'price': _to_float(row.get('price')),
'product_id': row.get('competitor_product_id'),
'product_name': row.get('competitor_product_name'),
'match_score': _to_float(row.get('match_score')),
'tags': row.get('tags'),
'crawled_at': row.get('crawled_at'),
'expires_at': row.get('expires_at'),
}
return result
def _load_pchome_match_attempt_map(session, skus):
sku_list = [str(sku) for sku in skus if sku]
if not sku_list:
@@ -1558,14 +1704,20 @@ def index():
session,
[item['record'].product.i_code for item in paged_items]
)
pchome_ineligible_map = _load_pchome_ineligible_competitor_map(
session,
[item['record'].product.i_code for item in paged_items]
)
for item in paged_items:
product = item['record'].product
sku = str(product.i_code)
competitor = pchome_map.get(sku)
attempt = pchome_attempt_map.get(sku)
match_status = _build_pchome_match_status(attempt)
ineligible = pchome_ineligible_map.get(sku)
match_status = _build_pchome_match_status(attempt, ineligible=ineligible)
item['pchome_competitor'] = competitor
item['pchome_match_attempt'] = attempt
item['pchome_ineligible_competitor'] = ineligible
item['pchome_match_status'] = match_status
item['competitor_decision'] = _build_competitor_decision(
item['record'].price,

View File

@@ -2193,6 +2193,7 @@ def run_pchome_match_backfill_task():
from sqlalchemy import create_engine
from services.ai_product_pick_agent import generate_product_pick_list
from services.cache_manager import clear_dashboard_cache
from services.competitor_identity_revalidator import revalidate_existing_competitor_identities
from services.competitor_intel_repository import clear_competitor_intel_cache
from services.competitor_price_feeder import CompetitorPriceFeeder
@@ -2200,6 +2201,13 @@ def run_pchome_match_backfill_task():
logging.info(f"[Scheduler] [PChomeBackfill] 🚀 啟動待比對補抓任務 | {now_str}")
engine = create_engine(DATABASE_PATH)
revalidation_result = revalidate_existing_competitor_identities(
engine,
limit=500,
dry_run=False,
include_expired=True,
write_attempts=True,
)
feeder_result = CompetitorPriceFeeder(engine=engine).run_unmatched_priority(limit=120)
pick_result = generate_product_pick_list(engine, limit=50)
clear_dashboard_cache()
@@ -2213,12 +2221,17 @@ def run_pchome_match_backfill_task():
"errors": feeder_result.errors,
"duration_sec": feeder_result.duration_sec,
"history_written": feeder_result.history_written,
"identity_revalidated_fresh": revalidation_result.promoted_fresh,
"identity_revalidated_expired": revalidation_result.promoted_expired,
"identity_revalidation_rejected_low": revalidation_result.rejected_low_score,
"identity_revalidation_rejected_veto": revalidation_result.rejected_veto,
"pick_candidates": pick_result.candidates,
"pick_written": pick_result.written,
"status": "Success",
}
logging.info(
f"[Scheduler] [PChomeBackfill] ✅ 完成 | "
f"revalidated={revalidation_result.promoted_fresh}+{revalidation_result.promoted_expired} "
f"matched={feeder_result.matched}/{feeder_result.total_skus} "
f"history_written={feeder_result.history_written} "
f"pick_written={pick_result.written} "

View File

@@ -0,0 +1,357 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Revalidate legacy PChome competitor rows with the identity_v2 matcher.
This module upgrades only rows that the current matcher can prove are the same
product. It does not relax the production match gate and it does not refresh
expired prices by default.
"""
from __future__ import annotations
import argparse
import json
import logging
from dataclasses import asdict, dataclass
from typing import Any, Iterable
from sqlalchemy import text
from services.competitor_price_feeder import MIN_MATCH_SCORE
from services.marketplace_product_matcher import MatchDiagnostics, score_marketplace_match
logger = logging.getLogger(__name__)
MATCH_ALGORITHM_TAG = "identity_v2"
REVALIDATION_TAG = "legacy_revalidated"
REVALIDATION_SOURCE_TAG = "revalidated_from_competitor_prices"
@dataclass
class RevalidationDecision:
sku: str
competitor_product_id: str | None
status: str
accepted: bool
score: float | None
hard_veto: bool
is_expired: bool
tags: list[str]
diagnostic: str
@dataclass
class RevalidationStats:
scanned: int = 0
promoted_fresh: int = 0
promoted_expired: int = 0
rejected_low_score: int = 0
rejected_veto: int = 0
skipped: int = 0
attempts_written: int = 0
errors: int = 0
samples: list[dict[str, Any]] | None = None
def _json_tags(value: Any) -> list[str]:
if not value:
return []
if isinstance(value, list):
return [str(item) for item in value if item]
if isinstance(value, tuple):
return [str(item) for item in value if item]
if isinstance(value, str):
try:
parsed = json.loads(value)
except Exception:
return []
if isinstance(parsed, list):
return [str(item) for item in parsed if item]
return []
def _dedupe(values: Iterable[str]) -> list[str]:
result: list[str] = []
seen: set[str] = set()
for value in values:
if not value or value in seen:
continue
result.append(value)
seen.add(value)
return result
def _diagnostic_text(diagnostics: MatchDiagnostics | None) -> str:
if diagnostics is None:
return ""
reasons = ",".join(diagnostics.reasons or ())
return (
f"score={diagnostics.score}; brand={diagnostics.brand_score}; "
f"token={diagnostics.token_score}; spec={diagnostics.spec_score}; "
f"seq={diagnostics.sequence_score}; type={diagnostics.type_score}; "
f"penalty={diagnostics.price_penalty}; veto={diagnostics.hard_veto}; "
f"reasons={reasons}"
)
def _build_revalidated_tags(existing_tags: Any, diagnostics: MatchDiagnostics) -> list[str]:
tags = list(_json_tags(existing_tags))
tags.extend(diagnostics.tags)
tags.append(REVALIDATION_TAG)
tags.append(REVALIDATION_SOURCE_TAG)
for reason in diagnostics.reasons or ():
tags.append(f"match_{reason}")
return _dedupe(tags)
def classify_legacy_competitor_row(row: dict[str, Any]) -> RevalidationDecision:
"""Classify a legacy competitor_prices row without mutating storage."""
sku = str(row.get("sku") or "")
momo_name = row.get("momo_name") or ""
competitor_name = row.get("competitor_product_name") or ""
competitor_product_id = row.get("competitor_product_id")
is_expired = bool(row.get("is_expired"))
if not sku or not momo_name or not competitor_name:
return RevalidationDecision(
sku=sku,
competitor_product_id=competitor_product_id,
status="skipped_missing_identity_text",
accepted=False,
score=None,
hard_veto=False,
is_expired=is_expired,
tags=_json_tags(row.get("tags")),
diagnostic="missing momo_name or competitor_product_name",
)
diagnostics = score_marketplace_match(
momo_name,
competitor_name,
momo_price=row.get("momo_price"),
competitor_price=row.get("pchome_price"),
)
tags = _build_revalidated_tags(row.get("tags"), diagnostics)
diagnostic = _diagnostic_text(diagnostics)
if diagnostics.score >= MIN_MATCH_SCORE and not diagnostics.hard_veto:
return RevalidationDecision(
sku=sku,
competitor_product_id=competitor_product_id,
status="expired_match" if is_expired else "matched",
accepted=True,
score=diagnostics.score,
hard_veto=False,
is_expired=is_expired,
tags=tags,
diagnostic=diagnostic,
)
return RevalidationDecision(
sku=sku,
competitor_product_id=competitor_product_id,
status="identity_veto" if diagnostics.hard_veto else "low_score",
accepted=False,
score=diagnostics.score,
hard_veto=diagnostics.hard_veto,
is_expired=is_expired,
tags=tags,
diagnostic=diagnostic,
)
def _fetch_legacy_rows(conn, source: str, limit: int, include_expired: bool) -> list[dict[str, Any]]:
expiry_filter = "" if include_expired else "AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)"
sql = text(f"""
WITH latest_momo AS (
SELECT
p.id AS product_id,
p.i_code AS sku,
p.name AS momo_name,
pr.price AS momo_price,
ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC, pr.id DESC) AS rn
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.status = 'ACTIVE'
)
SELECT
cp.id AS competitor_price_id,
lm.product_id AS momo_product_id,
lm.sku,
lm.momo_name,
lm.momo_price,
cp.price AS pchome_price,
cp.competitor_product_id,
cp.competitor_product_name,
cp.match_score AS previous_match_score,
cp.tags,
cp.crawled_at,
cp.expires_at,
CASE
WHEN cp.expires_at IS NOT NULL AND cp.expires_at <= CURRENT_TIMESTAMP
THEN TRUE ELSE FALSE
END AS is_expired
FROM latest_momo lm
JOIN competitor_prices cp
ON cp.sku = lm.sku
AND cp.source = :source
WHERE lm.rn = 1
AND cp.price IS NOT NULL
AND cp.price > 0
AND cp.competitor_product_name IS NOT NULL
AND NOT (COALESCE(cp.tags, '[]'::jsonb) ? :identity_tag)
{expiry_filter}
ORDER BY
CASE WHEN cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP THEN 0 ELSE 1 END,
cp.crawled_at DESC NULLS LAST,
cp.match_score DESC NULLS LAST,
lm.sku
LIMIT :limit
""")
rows = conn.execute(
sql,
{
"source": source,
"identity_tag": MATCH_ALGORITHM_TAG,
"limit": max(1, int(limit)),
},
).mappings().all()
return [dict(row) for row in rows]
def _ensure_attempt_table(conn) -> None:
from services.competitor_price_feeder import CompetitorPriceFeeder
feeder = CompetitorPriceFeeder(engine=None)
feeder._ensure_competitor_match_attempts_table(conn)
def _insert_attempt(conn, row: dict[str, Any], decision: RevalidationDecision, source: str) -> None:
search_terms_expr = "CAST(:search_terms AS jsonb)" if conn.dialect.name == "postgresql" else ":search_terms"
conn.execute(text(f"""
INSERT INTO competitor_match_attempts
(sku, source, momo_product_id, momo_product_name, momo_price,
search_terms, candidate_count, attempt_status,
best_competitor_product_id, best_competitor_product_name,
best_competitor_price, best_match_score, error_message,
attempted_at)
VALUES
(:sku, :source, :momo_product_id, :momo_product_name, :momo_price,
{search_terms_expr}, :candidate_count, :attempt_status,
:best_id, :best_name,
:best_price, :best_score, :error_message,
CURRENT_TIMESTAMP)
"""), {
"sku": decision.sku,
"source": source,
"momo_product_id": row.get("momo_product_id"),
"momo_product_name": row.get("momo_name"),
"momo_price": row.get("momo_price"),
"search_terms": json.dumps(["legacy_competitor_prices_revalidation"], ensure_ascii=False),
"candidate_count": 1,
"attempt_status": decision.status,
"best_id": decision.competitor_product_id,
"best_name": (row.get("competitor_product_name") or "")[:300] or None,
"best_price": row.get("pchome_price"),
"best_score": decision.score,
"error_message": (decision.diagnostic or "")[:1000] or None,
})
def revalidate_existing_competitor_identities(
engine,
*,
source: str = "pchome",
limit: int = 500,
dry_run: bool = True,
include_expired: bool = True,
write_attempts: bool = True,
sample_limit: int = 10,
) -> RevalidationStats:
"""Re-score legacy competitor_prices rows and optionally persist safe upgrades."""
stats = RevalidationStats(samples=[])
with engine.begin() as conn:
rows = _fetch_legacy_rows(conn, source=source, limit=limit, include_expired=include_expired)
stats.scanned = len(rows)
if not dry_run and write_attempts:
_ensure_attempt_table(conn)
for row in rows:
try:
decision = classify_legacy_competitor_row(row)
if len(stats.samples or []) < sample_limit:
stats.samples.append({
"sku": decision.sku,
"status": decision.status,
"score": decision.score,
"is_expired": decision.is_expired,
"momo_name": (row.get("momo_name") or "")[:60],
"competitor_name": (row.get("competitor_product_name") or "")[:60],
})
if decision.accepted:
if decision.is_expired:
stats.promoted_expired += 1
else:
stats.promoted_fresh += 1
if not dry_run:
conn.execute(text("""
UPDATE competitor_prices
SET
match_score = :match_score,
tags = CAST(:tags AS jsonb)
WHERE id = :competitor_price_id
"""), {
"match_score": decision.score,
"tags": json.dumps(decision.tags, ensure_ascii=False),
"competitor_price_id": row.get("competitor_price_id"),
})
elif decision.status == "identity_veto":
stats.rejected_veto += 1
elif decision.status == "low_score":
stats.rejected_low_score += 1
else:
stats.skipped += 1
if not dry_run and write_attempts:
_insert_attempt(conn, row, decision, source=source)
stats.attempts_written += 1
except Exception as exc:
stats.errors += 1
logger.warning(
"[CompetitorIdentityRevalidator] row failed sku=%s: %s",
row.get("sku"),
exc,
exc_info=True,
)
return stats
def _main() -> int:
parser = argparse.ArgumentParser(description="Revalidate legacy PChome competitor identities.")
parser.add_argument("--limit", type=int, default=500)
parser.add_argument("--source", default="pchome")
parser.add_argument("--apply", action="store_true", help="Persist accepted tags and attempt audit rows.")
parser.add_argument("--fresh-only", action="store_true", help="Skip expired competitor prices.")
parser.add_argument("--no-attempts", action="store_true", help="Do not append competitor_match_attempts rows.")
args = parser.parse_args()
from database.manager import DatabaseManager
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
engine = DatabaseManager().engine
stats = revalidate_existing_competitor_identities(
engine,
source=args.source,
limit=args.limit,
dry_run=not args.apply,
include_expired=not args.fresh_only,
write_attempts=not args.no_attempts,
)
print(json.dumps(asdict(stats), ensure_ascii=False, indent=2, default=str))
return 0
if __name__ == "__main__":
raise SystemExit(_main())

View File

@@ -0,0 +1,65 @@
from datetime import datetime, timedelta
def test_revalidator_promotes_legacy_same_product_without_refreshing_expired_price():
from services.competitor_identity_revalidator import classify_legacy_competitor_row
row = {
"sku": "10950080",
"momo_name": "【台酒生技】黑酵母酒粕逆齡活膚青春露5入-(120ml/入)",
"momo_price": 999,
"pchome_price": 899,
"competitor_product_id": "PC-1",
"competitor_product_name": "【台酒生技】金粹黑酵母酒粕逆齡活膚青春露120ml_5入",
"tags": ["discount_10pct"],
"is_expired": True,
"expires_at": datetime.utcnow() - timedelta(hours=1),
}
decision = classify_legacy_competitor_row(row)
assert decision.accepted is True
assert decision.status == "expired_match"
assert decision.score >= 0.76
assert "identity_v2" in decision.tags
assert "legacy_revalidated" in decision.tags
def test_revalidator_rejects_legacy_brand_conflict():
from services.competitor_identity_revalidator import classify_legacy_competitor_row
row = {
"sku": "BAD-1",
"momo_name": "【蘭蔻】官方直營 玫瑰霜60ml+玫瑰精露150ml",
"momo_price": 18765,
"pchome_price": 1249,
"competitor_product_id": "PC-BAD",
"competitor_product_name": "LOREAL Paris 巴黎萊雅 金致臻顏花蜜奢養膠原輕盈乳霜_60ml",
"tags": [],
"is_expired": False,
}
decision = classify_legacy_competitor_row(row)
assert decision.accepted is False
assert decision.status == "identity_veto"
assert decision.hard_veto is True
assert "brand_conflict" in decision.diagnostic
def test_dashboard_match_status_distinguishes_expired_and_legacy_rows():
from routes.dashboard_routes import _build_pchome_match_status
expired = _build_pchome_match_status(
None,
ineligible={"reason": "expired_match", "match_score": 0.91},
)
legacy = _build_pchome_match_status(
None,
ineligible={"reason": "legacy_without_identity_v2", "match_score": 0.82},
)
assert expired["label"] == "價格過期待刷新"
assert expired["tone"] == "watch"
assert legacy["label"] == "舊版配對待重驗"
assert "identity_v2" in legacy["summary"]