diff --git a/docs/AI_INTELLIGENCE_MODULE_SOT.md b/docs/AI_INTELLIGENCE_MODULE_SOT.md index 3aebb48..7f092c5 100644 --- a/docs/AI_INTELLIGENCE_MODULE_SOT.md +++ b/docs/AI_INTELLIGENCE_MODULE_SOT.md @@ -338,6 +338,8 @@ LEFT JOIN competitor_prices cp - Dashboard、AI pick、Hermes、Excel export、daily/growth 圖表與 competitor PPT 必須以 `competitor_prices + competitor_price_history + competitor_match_attempts` 為短期唯一生產真相源,且只消費 `identity_v2` matcher 驗證過的配對;舊版僅靠 `match_score` 的快取不可直接進入決策或簡報。 - `pchome_matches` 與 live `pchome_batch()` 僅保留 legacy compatibility,不得作為新簡報或 AI 決策主來源。 - `services/competitor_intel_repository.py` 是下游頁面、圖表、簡報的共用查詢出口;新增消費端不得各自硬寫不同 match threshold。 +- `services/competitor_identity_revalidator.py` 可對既有 `competitor_prices` legacy row 離線重跑 `identity_v2`:只有新版 matcher 分數 `>= 0.76` 且無 hard veto 才補 `identity_v2` / `legacy_revalidated` tags;預設不刷新 `expires_at`,避免過期價格進入決策。 +- Dashboard 必須把「待比對」拆成可診斷狀態:`價格過期待刷新`、`舊版配對待重驗`、`低分配對待審`、`身份否決`、`找不到同款`、`抓取異常`、`尚未搜尋`。不可再用單一「待比對」掩蓋資料品質原因。 ### 執行方式 @@ -345,6 +347,12 @@ LEFT JOIN competitor_prices cp # 手動觸發一輪抓取 python3 services/competitor_price_feeder.py +# 預覽 legacy PChome 快取 identity_v2 重驗證(不寫入) +python3 -m services.competitor_identity_revalidator --limit 500 + +# 寫入安全通過的 identity_v2 tag;不刷新過期價格 +python3 -m services.competitor_identity_revalidator --limit 500 --apply + # 未來整合為 K3s CronJob(每 4 小時) # k8s/jobs/competitor-price-feeder-cronjob.yaml ``` diff --git a/routes/dashboard_routes.py b/routes/dashboard_routes.py index 981bee3..4e62a5f 100644 --- a/routes/dashboard_routes.py +++ b/routes/dashboard_routes.py @@ -62,7 +62,70 @@ def _to_float(value): return None -def _build_pchome_match_status(attempt=None): +def _build_pchome_match_status(attempt=None, ineligible=None): + if attempt: + status = attempt.get('attempt_status') or 'unknown' + if status == 'matched': + score = _to_float(attempt.get('best_match_score')) + score_text = f"最佳候選 {round(score * 100)}%" if score is not None else "已完成身份比對" + return { + 'label': '已配對待刷新', + 
'tone': 'watch', + 'summary': '曾通過 identity_v2,但目前沒有有效價格快取,等待下一輪刷新', + 'detail': score_text, + } + if status == 'expired_match': + score = _to_float(attempt.get('best_match_score')) + score_text = f"身份分數 {round(score * 100)}%" if score is not None else "已完成身份比對" + return { + 'label': '價格過期待刷新', + 'tone': 'watch', + 'summary': '同款身份已確認,但 PChome 價格快取過期,不顯示舊價避免誤判', + 'detail': score_text, + } + if status == 'identity_veto': + score = _to_float(attempt.get('best_match_score')) + score_text = f"最佳候選 {round(score * 100)}%" if score is not None else "已拒絕候選" + return { + 'label': '身份否決', + 'tone': 'neutral', + 'summary': '新版 identity_v2 判定不是同款,已阻擋自動比價', + 'detail': score_text, + } + + if ineligible: + reason = ineligible.get('reason') or 'not_eligible' + score = _to_float(ineligible.get('match_score')) + score_text = f"match {round(score * 100)}%" if score is not None else None + if reason == 'expired_match': + return { + 'label': '價格過期待刷新', + 'tone': 'watch', + 'summary': '已有高信心同款配對,但 PChome 價格快取過期,等待補抓刷新', + 'detail': score_text, + } + if reason == 'legacy_without_identity_v2': + return { + 'label': '舊版配對待重驗', + 'tone': 'neutral', + 'summary': '舊版 PChome 配對尚未通過 identity_v2,不進入正式決策', + 'detail': score_text, + } + if reason == 'below_score_floor': + return { + 'label': '低分配對待審', + 'tone': 'neutral', + 'summary': '已有候選但低於高信心門檻,避免錯配所以暫不採用', + 'detail': score_text, + } + if reason == 'invalid_price': + return { + 'label': '價格無效待刷新', + 'tone': 'watch', + 'summary': 'PChome 配對缺少有效價格,等待下一輪補抓', + 'detail': None, + } + if not attempt: return { 'label': '尚未搜尋', @@ -218,6 +281,89 @@ def _load_pchome_competitor_map(session, skus): return result +def _load_pchome_ineligible_competitor_map(session, skus): + """Read non-decision PChome rows so the UI can explain why a SKU is pending.""" + sku_list = [str(sku) for sku in skus if sku] + if not sku_list: + return {} + + try: + stmt = text(""" + WITH ineligible AS ( + SELECT + sku, + price, + competitor_product_id, + 
competitor_product_name, + match_score, + tags, + crawled_at, + expires_at, + CASE + WHEN price IS NULL OR price <= 0 THEN 'invalid_price' + WHEN (expires_at IS NOT NULL AND expires_at <= CURRENT_TIMESTAMP) + AND COALESCE(match_score, 0) >= :match_score_floor + AND COALESCE(tags, '[]'::jsonb) ? 'identity_v2' + THEN 'expired_match' + WHEN NOT (COALESCE(tags, '[]'::jsonb) ? 'identity_v2') + THEN 'legacy_without_identity_v2' + WHEN COALESCE(match_score, 0) < :match_score_floor + THEN 'below_score_floor' + ELSE 'not_eligible' + END AS reason, + ROW_NUMBER() OVER ( + PARTITION BY sku + ORDER BY + CASE + WHEN (expires_at IS NOT NULL AND expires_at <= CURRENT_TIMESTAMP) + AND COALESCE(match_score, 0) >= :match_score_floor + AND COALESCE(tags, '[]'::jsonb) ? 'identity_v2' + THEN 0 + WHEN NOT (COALESCE(tags, '[]'::jsonb) ? 'identity_v2') THEN 1 + WHEN COALESCE(match_score, 0) < :match_score_floor THEN 2 + ELSE 3 + END, + crawled_at DESC NULLS LAST, + match_score DESC NULLS LAST + ) AS rn + FROM competitor_prices + WHERE source = 'pchome' + AND sku IN :skus + AND NOT ( + (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP) + AND price IS NOT NULL + AND price > 0 + AND COALESCE(match_score, 0) >= :match_score_floor + AND COALESCE(tags, '[]'::jsonb) ? 
'identity_v2' + ) + ) + SELECT * + FROM ineligible + WHERE rn = 1 + """).bindparams(bindparam("skus", expanding=True)) + rows = session.execute( + stmt, + {"skus": sku_list, "match_score_floor": PCHOME_MATCH_SCORE_FLOOR}, + ).mappings().all() + except Exception as exc: + sys_log.warning(f"[Dashboard] PChome 非有效配對原因讀取略過: {exc}") + return {} + + result = {} + for row in rows: + result[str(row.get('sku'))] = { + 'reason': row.get('reason'), + 'price': _to_float(row.get('price')), + 'product_id': row.get('competitor_product_id'), + 'product_name': row.get('competitor_product_name'), + 'match_score': _to_float(row.get('match_score')), + 'tags': row.get('tags'), + 'crawled_at': row.get('crawled_at'), + 'expires_at': row.get('expires_at'), + } + return result + + def _load_pchome_match_attempt_map(session, skus): sku_list = [str(sku) for sku in skus if sku] if not sku_list: @@ -1558,14 +1704,20 @@ def index(): session, [item['record'].product.i_code for item in paged_items] ) + pchome_ineligible_map = _load_pchome_ineligible_competitor_map( + session, + [item['record'].product.i_code for item in paged_items] + ) for item in paged_items: product = item['record'].product sku = str(product.i_code) competitor = pchome_map.get(sku) attempt = pchome_attempt_map.get(sku) - match_status = _build_pchome_match_status(attempt) + ineligible = pchome_ineligible_map.get(sku) + match_status = _build_pchome_match_status(attempt, ineligible=ineligible) item['pchome_competitor'] = competitor item['pchome_match_attempt'] = attempt + item['pchome_ineligible_competitor'] = ineligible item['pchome_match_status'] = match_status item['competitor_decision'] = _build_competitor_decision( item['record'].price, diff --git a/scheduler.py b/scheduler.py index 7ef2983..d189578 100644 --- a/scheduler.py +++ b/scheduler.py @@ -2193,6 +2193,7 @@ def run_pchome_match_backfill_task(): from sqlalchemy import create_engine from services.ai_product_pick_agent import generate_product_pick_list from 
services.cache_manager import clear_dashboard_cache + from services.competitor_identity_revalidator import revalidate_existing_competitor_identities from services.competitor_intel_repository import clear_competitor_intel_cache from services.competitor_price_feeder import CompetitorPriceFeeder @@ -2200,6 +2201,13 @@ def run_pchome_match_backfill_task(): logging.info(f"[Scheduler] [PChomeBackfill] 🚀 啟動待比對補抓任務 | {now_str}") engine = create_engine(DATABASE_PATH) + revalidation_result = revalidate_existing_competitor_identities( + engine, + limit=500, + dry_run=False, + include_expired=True, + write_attempts=True, + ) feeder_result = CompetitorPriceFeeder(engine=engine).run_unmatched_priority(limit=120) pick_result = generate_product_pick_list(engine, limit=50) clear_dashboard_cache() @@ -2213,12 +2221,17 @@ def run_pchome_match_backfill_task(): "errors": feeder_result.errors, "duration_sec": feeder_result.duration_sec, "history_written": feeder_result.history_written, + "identity_revalidated_fresh": revalidation_result.promoted_fresh, + "identity_revalidated_expired": revalidation_result.promoted_expired, + "identity_revalidation_rejected_low": revalidation_result.rejected_low_score, + "identity_revalidation_rejected_veto": revalidation_result.rejected_veto, "pick_candidates": pick_result.candidates, "pick_written": pick_result.written, "status": "Success", } logging.info( f"[Scheduler] [PChomeBackfill] ✅ 完成 | " + f"revalidated={revalidation_result.promoted_fresh}+{revalidation_result.promoted_expired} " f"matched={feeder_result.matched}/{feeder_result.total_skus} " f"history_written={feeder_result.history_written} " f"pick_written={pick_result.written} " diff --git a/services/competitor_identity_revalidator.py b/services/competitor_identity_revalidator.py new file mode 100644 index 0000000..79e4140 --- /dev/null +++ b/services/competitor_identity_revalidator.py @@ -0,0 +1,357 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""Revalidate legacy PChome competitor 
rows with the identity_v2 matcher. + +This module upgrades only rows that the current matcher can prove are the same +product. It does not relax the production match gate and it does not refresh +expired prices by default. +""" + +from __future__ import annotations + +import argparse +import json +import logging +from dataclasses import asdict, dataclass +from typing import Any, Iterable + +from sqlalchemy import text + +from services.competitor_price_feeder import MIN_MATCH_SCORE +from services.marketplace_product_matcher import MatchDiagnostics, score_marketplace_match + +logger = logging.getLogger(__name__) + +MATCH_ALGORITHM_TAG = "identity_v2" +REVALIDATION_TAG = "legacy_revalidated" +REVALIDATION_SOURCE_TAG = "revalidated_from_competitor_prices" + + +@dataclass +class RevalidationDecision: + sku: str + competitor_product_id: str | None + status: str + accepted: bool + score: float | None + hard_veto: bool + is_expired: bool + tags: list[str] + diagnostic: str + + +@dataclass +class RevalidationStats: + scanned: int = 0 + promoted_fresh: int = 0 + promoted_expired: int = 0 + rejected_low_score: int = 0 + rejected_veto: int = 0 + skipped: int = 0 + attempts_written: int = 0 + errors: int = 0 + samples: list[dict[str, Any]] | None = None + + +def _json_tags(value: Any) -> list[str]: + if not value: + return [] + if isinstance(value, list): + return [str(item) for item in value if item] + if isinstance(value, tuple): + return [str(item) for item in value if item] + if isinstance(value, str): + try: + parsed = json.loads(value) + except Exception: + return [] + if isinstance(parsed, list): + return [str(item) for item in parsed if item] + return [] + + +def _dedupe(values: Iterable[str]) -> list[str]: + result: list[str] = [] + seen: set[str] = set() + for value in values: + if not value or value in seen: + continue + result.append(value) + seen.add(value) + return result + + +def _diagnostic_text(diagnostics: MatchDiagnostics | None) -> str: + if diagnostics is 
None: + return "" + reasons = ",".join(diagnostics.reasons or ()) + return ( + f"score={diagnostics.score}; brand={diagnostics.brand_score}; " + f"token={diagnostics.token_score}; spec={diagnostics.spec_score}; " + f"seq={diagnostics.sequence_score}; type={diagnostics.type_score}; " + f"penalty={diagnostics.price_penalty}; veto={diagnostics.hard_veto}; " + f"reasons={reasons}" + ) + + +def _build_revalidated_tags(existing_tags: Any, diagnostics: MatchDiagnostics) -> list[str]: + tags = list(_json_tags(existing_tags)) + tags.extend(diagnostics.tags) + tags.append(REVALIDATION_TAG) + tags.append(REVALIDATION_SOURCE_TAG) + for reason in diagnostics.reasons or (): + tags.append(f"match_{reason}") + return _dedupe(tags) + + +def classify_legacy_competitor_row(row: dict[str, Any]) -> RevalidationDecision: + """Classify a legacy competitor_prices row without mutating storage.""" + sku = str(row.get("sku") or "") + momo_name = row.get("momo_name") or "" + competitor_name = row.get("competitor_product_name") or "" + competitor_product_id = row.get("competitor_product_id") + is_expired = bool(row.get("is_expired")) + + if not sku or not momo_name or not competitor_name: + return RevalidationDecision( + sku=sku, + competitor_product_id=competitor_product_id, + status="skipped_missing_identity_text", + accepted=False, + score=None, + hard_veto=False, + is_expired=is_expired, + tags=_json_tags(row.get("tags")), + diagnostic="missing momo_name or competitor_product_name", + ) + + diagnostics = score_marketplace_match( + momo_name, + competitor_name, + momo_price=row.get("momo_price"), + competitor_price=row.get("pchome_price"), + ) + tags = _build_revalidated_tags(row.get("tags"), diagnostics) + diagnostic = _diagnostic_text(diagnostics) + + if diagnostics.score >= MIN_MATCH_SCORE and not diagnostics.hard_veto: + return RevalidationDecision( + sku=sku, + competitor_product_id=competitor_product_id, + status="expired_match" if is_expired else "matched", + accepted=True, + 
score=diagnostics.score, + hard_veto=False, + is_expired=is_expired, + tags=tags, + diagnostic=diagnostic, + ) + + return RevalidationDecision( + sku=sku, + competitor_product_id=competitor_product_id, + status="identity_veto" if diagnostics.hard_veto else "low_score", + accepted=False, + score=diagnostics.score, + hard_veto=diagnostics.hard_veto, + is_expired=is_expired, + tags=tags, + diagnostic=diagnostic, + ) + + +def _fetch_legacy_rows(conn, source: str, limit: int, include_expired: bool) -> list[dict[str, Any]]: + expiry_filter = "" if include_expired else "AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)" + sql = text(f""" + WITH latest_momo AS ( + SELECT + p.id AS product_id, + p.i_code AS sku, + p.name AS momo_name, + pr.price AS momo_price, + ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC, pr.id DESC) AS rn + FROM products p + JOIN price_records pr ON pr.product_id = p.id + WHERE p.status = 'ACTIVE' + ) + SELECT + cp.id AS competitor_price_id, + lm.product_id AS momo_product_id, + lm.sku, + lm.momo_name, + lm.momo_price, + cp.price AS pchome_price, + cp.competitor_product_id, + cp.competitor_product_name, + cp.match_score AS previous_match_score, + cp.tags, + cp.crawled_at, + cp.expires_at, + CASE + WHEN cp.expires_at IS NOT NULL AND cp.expires_at <= CURRENT_TIMESTAMP + THEN TRUE ELSE FALSE + END AS is_expired + FROM latest_momo lm + JOIN competitor_prices cp + ON cp.sku = lm.sku + AND cp.source = :source + WHERE lm.rn = 1 + AND cp.price IS NOT NULL + AND cp.price > 0 + AND cp.competitor_product_name IS NOT NULL + AND NOT (COALESCE(cp.tags, '[]'::jsonb) ? 
def _fetch_legacy_rows(conn, source: str, limit: int, include_expired: bool) -> list[dict[str, Any]]:
    """Fetch competitor rows for ``source`` that do not yet carry the identity tag.

    Each row is joined to the latest price record of its ACTIVE product. Only
    rows with a positive price and a competitor product name are returned.
    Unexpired rows come first, then newest ``crawled_at``, then highest stored
    ``match_score``. At most ``max(1, limit)`` rows are returned.
    """
    # The filter text is chosen from two fixed literals, never from user
    # input, so interpolating it into the statement is safe.
    expiry_filter = "" if include_expired else "AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)"
    sql = text(f"""
        WITH latest_momo AS (
            SELECT
                p.id AS product_id,
                p.i_code AS sku,
                p.name AS momo_name,
                pr.price AS momo_price,
                ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC, pr.id DESC) AS rn
            FROM products p
            JOIN price_records pr ON pr.product_id = p.id
            WHERE p.status = 'ACTIVE'
        )
        SELECT
            cp.id AS competitor_price_id,
            lm.product_id AS momo_product_id,
            lm.sku,
            lm.momo_name,
            lm.momo_price,
            cp.price AS pchome_price,
            cp.competitor_product_id,
            cp.competitor_product_name,
            cp.match_score AS previous_match_score,
            cp.tags,
            cp.crawled_at,
            cp.expires_at,
            CASE
                WHEN cp.expires_at IS NOT NULL AND cp.expires_at <= CURRENT_TIMESTAMP
                THEN TRUE ELSE FALSE
            END AS is_expired
        FROM latest_momo lm
        JOIN competitor_prices cp
          ON cp.sku = lm.sku
         AND cp.source = :source
        WHERE lm.rn = 1
          AND cp.price IS NOT NULL
          AND cp.price > 0
          AND cp.competitor_product_name IS NOT NULL
          AND NOT (COALESCE(cp.tags, '[]'::jsonb) ? :identity_tag)
          {expiry_filter}
        ORDER BY
            CASE WHEN cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP THEN 0 ELSE 1 END,
            cp.crawled_at DESC NULLS LAST,
            cp.match_score DESC NULLS LAST,
            lm.sku
        LIMIT :limit
    """)
    rows = conn.execute(
        sql,
        {
            "source": source,
            "identity_tag": MATCH_ALGORITHM_TAG,
            "limit": max(1, int(limit)),
        },
    ).mappings().all()
    return [dict(row) for row in rows]


def _ensure_attempt_table(conn) -> None:
    """Create ``competitor_match_attempts`` if needed, reusing the feeder's DDL."""
    # Imported lazily, as in the original layout of this module.
    from services.competitor_price_feeder import CompetitorPriceFeeder

    feeder = CompetitorPriceFeeder(engine=None)
    feeder._ensure_competitor_match_attempts_table(conn)


def _insert_attempt(conn, row: dict[str, Any], decision: RevalidationDecision, source: str) -> None:
    """Append one audit row to ``competitor_match_attempts`` for this decision."""
    # Only PostgreSQL gets an explicit jsonb cast; other dialects bind the
    # JSON text as-is.
    search_terms_expr = "CAST(:search_terms AS jsonb)" if conn.dialect.name == "postgresql" else ":search_terms"
    conn.execute(text(f"""
        INSERT INTO competitor_match_attempts
            (sku, source, momo_product_id, momo_product_name, momo_price,
             search_terms, candidate_count, attempt_status,
             best_competitor_product_id, best_competitor_product_name,
             best_competitor_price, best_match_score, error_message,
             attempted_at)
        VALUES
            (:sku, :source, :momo_product_id, :momo_product_name, :momo_price,
             {search_terms_expr}, :candidate_count, :attempt_status,
             :best_id, :best_name,
             :best_price, :best_score, :error_message,
             CURRENT_TIMESTAMP)
    """), {
        "sku": decision.sku,
        "source": source,
        "momo_product_id": row.get("momo_product_id"),
        "momo_product_name": row.get("momo_name"),
        "momo_price": row.get("momo_price"),
        "search_terms": json.dumps(["legacy_competitor_prices_revalidation"], ensure_ascii=False),
        "candidate_count": 1,
        "attempt_status": decision.status,
        "best_id": decision.competitor_product_id,
        "best_name": (row.get("competitor_product_name") or "")[:300] or None,
        "best_price": row.get("pchome_price"),
        "best_score": decision.score,
        "error_message": (decision.diagnostic or "")[:1000] or None,
    })


def _persist_decision(
    conn,
    row: dict[str, Any],
    decision: RevalidationDecision,
    *,
    source: str,
    write_attempts: bool,
) -> bool:
    """Write the tag upgrade (accepted rows only) and the optional audit row.

    Returns True when an audit row was inserted.
    """
    if decision.accepted:
        # Only match_score and tags change; expires_at is left untouched so an
        # expired price stays expired.
        conn.execute(text("""
            UPDATE competitor_prices
            SET
                match_score = :match_score,
                tags = CAST(:tags AS jsonb)
            WHERE id = :competitor_price_id
        """), {
            "match_score": decision.score,
            "tags": json.dumps(decision.tags, ensure_ascii=False),
            "competitor_price_id": row.get("competitor_price_id"),
        })
    if write_attempts:
        _insert_attempt(conn, row, decision, source=source)
        return True
    return False


def _tally_decision(stats: RevalidationStats, decision: RevalidationDecision) -> None:
    """Increment the counter on ``stats`` that matches the decision outcome."""
    if decision.accepted:
        if decision.is_expired:
            stats.promoted_expired += 1
        else:
            stats.promoted_fresh += 1
    elif decision.status == "identity_veto":
        stats.rejected_veto += 1
    elif decision.status == "low_score":
        stats.rejected_low_score += 1
    else:
        stats.skipped += 1


def revalidate_existing_competitor_identities(
    engine,
    *,
    source: str = "pchome",
    limit: int = 500,
    dry_run: bool = True,
    include_expired: bool = True,
    write_attempts: bool = True,
    sample_limit: int = 10,
) -> RevalidationStats:
    """Re-score legacy competitor_prices rows and optionally persist safe upgrades.

    Args:
        engine: SQLAlchemy engine; one transaction covers the whole run.
        source: Competitor source to revalidate.
        limit: Maximum number of legacy rows to scan.
        dry_run: When True nothing is written; counters describe what would
            be promoted or rejected.
        include_expired: When False, expired rows are not scanned.
        write_attempts: When True (and not a dry run) one audit row per
            scanned row is appended to ``competitor_match_attempts``.
        sample_limit: Maximum number of decisions kept in ``stats.samples``.

    Returns:
        Counters for the run. A row is counted as promoted, rejected or
        skipped only after its writes succeeded; a row whose classification
        or writes raised is counted in ``errors`` only.
    """
    # NOTE(review): rejected rows are never tagged, so the fetch filter selects
    # them again on the next run (and, with write_attempts, logs another audit
    # row). With a fixed limit this can keep rows beyond the limit from ever
    # being scanned - confirm whether that is acceptable.
    samples: list[dict[str, Any]] = []
    stats = RevalidationStats(samples=samples)
    persist = not dry_run

    with engine.begin() as conn:
        rows = _fetch_legacy_rows(conn, source=source, limit=limit, include_expired=include_expired)
        stats.scanned = len(rows)
        if persist and write_attempts:
            _ensure_attempt_table(conn)

        for row in rows:
            try:
                decision = classify_legacy_competitor_row(row)
                if len(samples) < sample_limit:
                    samples.append({
                        "sku": decision.sku,
                        "status": decision.status,
                        "score": decision.score,
                        "is_expired": decision.is_expired,
                        "momo_name": (row.get("momo_name") or "")[:60],
                        "competitor_name": (row.get("competitor_product_name") or "")[:60],
                    })

                attempt_written = False
                if persist:
                    # One SAVEPOINT per row: on PostgreSQL a failed statement
                    # aborts the enclosing transaction, which would make every
                    # later row fail and turn the final commit into a
                    # rollback. Rolling back to the savepoint keeps the rows
                    # already processed.
                    with conn.begin_nested():
                        attempt_written = _persist_decision(
                            conn,
                            row,
                            decision,
                            source=source,
                            write_attempts=write_attempts,
                        )
            except Exception as exc:
                stats.errors += 1
                logger.warning(
                    "[CompetitorIdentityRevalidator] row failed sku=%s: %s",
                    row.get("sku"),
                    exc,
                    exc_info=True,
                )
                continue

            # Count only after the writes went through, so the reported totals
            # match what was actually stored.
            _tally_decision(stats, decision)
            if attempt_written:
                stats.attempts_written += 1

    return stats


def _main() -> int:
    """CLI entry point: run a revalidation pass and print the stats as JSON."""
    parser = argparse.ArgumentParser(description="Revalidate legacy PChome competitor identities.")
    parser.add_argument("--limit", type=int, default=500)
    parser.add_argument("--source", default="pchome")
    parser.add_argument("--apply", action="store_true", help="Persist accepted tags and attempt audit rows.")
    parser.add_argument("--fresh-only", action="store_true", help="Skip expired competitor prices.")
    parser.add_argument("--no-attempts", action="store_true", help="Do not append competitor_match_attempts rows.")
    args = parser.parse_args()

    # Imported after argument parsing, as in the original layout.
    from database.manager import DatabaseManager

    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    engine = DatabaseManager().engine
    stats = revalidate_existing_competitor_identities(
        engine,
        source=args.source,
        limit=args.limit,
        dry_run=not args.apply,  # preview unless --apply is given
        include_expired=not args.fresh_only,
        write_attempts=not args.no_attempts,
    )
    print(json.dumps(asdict(stats), ensure_ascii=False, indent=2, default=str))
    return 0


if __name__ == "__main__":
    raise SystemExit(_main())
def test_revalidator_promotes_legacy_same_product_without_refreshing_expired_price():
    """An expired legacy row for the same product is accepted as ``expired_match``."""
    # timezone is imported locally so this function does not depend on the
    # module-level import list.
    from datetime import datetime, timedelta, timezone

    from services.competitor_identity_revalidator import classify_legacy_competitor_row

    row = {
        "sku": "10950080",
        "momo_name": "【台酒生技】黑酵母酒粕逆齡活膚青春露5入-(120ml/入)",
        "momo_price": 999,
        "pchome_price": 899,
        "competitor_product_id": "PC-1",
        "competitor_product_name": "【台酒生技】金粹黑酵母酒粕逆齡活膚青春露120ml_5入",
        "tags": ["discount_10pct"],
        "is_expired": True,
        # Timezone-aware timestamp; datetime.utcnow() is deprecated and naive.
        "expires_at": datetime.now(timezone.utc) - timedelta(hours=1),
    }

    decision = classify_legacy_competitor_row(row)

    assert decision.accepted is True
    assert decision.status == "expired_match"
    assert decision.score >= 0.76
    assert "identity_v2" in decision.tags
    assert "legacy_revalidated" in decision.tags


def test_revalidator_rejects_legacy_brand_conflict():
    """A legacy row pairing two different brands is rejected with a hard veto."""
    from services.competitor_identity_revalidator import classify_legacy_competitor_row

    row = {
        "sku": "BAD-1",
        "momo_name": "【蘭蔻】官方直營 玫瑰霜60ml+玫瑰精露150ml",
        "momo_price": 18765,
        "pchome_price": 1249,
        "competitor_product_id": "PC-BAD",
        "competitor_product_name": "LOREAL Paris 巴黎萊雅 金致臻顏花蜜奢養膠原輕盈乳霜_60ml",
        "tags": [],
        "is_expired": False,
    }

    decision = classify_legacy_competitor_row(row)

    assert decision.accepted is False
    assert decision.status == "identity_veto"
    assert decision.hard_veto is True
    assert "brand_conflict" in decision.diagnostic


def test_dashboard_match_status_distinguishes_expired_and_legacy_rows():
    """Expired and legacy ineligible rows get distinct dashboard labels."""
    from routes.dashboard_routes import _build_pchome_match_status

    expired = _build_pchome_match_status(
        None,
        ineligible={"reason": "expired_match", "match_score": 0.91},
    )
    legacy = _build_pchome_match_status(
        None,
        ineligible={"reason": "legacy_without_identity_v2", "match_score": 0.82},
    )

    assert expired["label"] == "價格過期待刷新"
    assert expired["tone"] == "watch"
    assert legacy["label"] == "舊版配對待重驗"
    assert "identity_v2" in legacy["summary"]