#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Revalidate legacy PChome competitor rows with the identity_v2 matcher.

This module upgrades only rows that the current matcher can prove are the same
product. It does not relax the production match gate and it does not refresh
expired prices by default.
"""

from __future__ import annotations

import argparse
import json
import logging
from dataclasses import asdict, dataclass
from typing import Any, Iterable

from sqlalchemy import text

from services.competitor_price_feeder import MIN_MATCH_SCORE
from services.marketplace_product_matcher import MatchDiagnostics, score_marketplace_match

logger = logging.getLogger(__name__)

# Rows already carrying this tag are excluded by the fetch query.
MATCH_ALGORITHM_TAG = "identity_v2"
# Tags appended to every row that goes through revalidation scoring.
REVALIDATION_TAG = "legacy_revalidated"
REVALIDATION_SOURCE_TAG = "revalidated_from_competitor_prices"


@dataclass
class RevalidationDecision:
    """Outcome of re-scoring one legacy competitor_prices row."""

    sku: str
    competitor_product_id: str | None
    # One of: "matched", "expired_match", "identity_veto", "low_score",
    # "skipped_missing_identity_text".
    status: str
    accepted: bool
    score: float | None
    hard_veto: bool
    is_expired: bool
    tags: list[str]
    diagnostic: str


@dataclass
class RevalidationStats:
    """Counters (plus a bounded sample of decisions) for one revalidation run."""

    scanned: int = 0
    promoted_fresh: int = 0
    promoted_expired: int = 0
    rejected_low_score: int = 0
    rejected_veto: int = 0
    skipped: int = 0
    attempts_written: int = 0
    errors: int = 0
    samples: list[dict[str, Any]] | None = None


def _json_tags(value: Any) -> list[str]:
    """Normalize a stored ``tags`` value into a list of non-empty strings.

    Accepts an already-decoded list/tuple or a JSON-encoded string. Anything
    else, including unparsable JSON or JSON that is not a list, yields ``[]``.
    """
    if not value:
        return []
    if isinstance(value, (list, tuple)):
        return [str(item) for item in value if item]
    if isinstance(value, str):
        try:
            parsed = json.loads(value)
        except (ValueError, RecursionError):
            # json.JSONDecodeError is a ValueError; malformed tags are treated
            # as "no tags" rather than failing the row.
            return []
        if isinstance(parsed, list):
            return [str(item) for item in parsed if item]
    return []


def _dedupe(values: Iterable[str]) -> list[str]:
    """Return *values* without empties or duplicates, keeping first-seen order."""
    result: list[str] = []
    seen: set[str] = set()
    for value in values:
        if not value or value in seen:
            continue
        result.append(value)
        seen.add(value)
    return result


def _diagnostic_text(diagnostics: MatchDiagnostics | None) -> str:
    """Render matcher diagnostics as a single ``key=value; ...`` string."""
    if diagnostics is None:
        return ""
    reasons = ",".join(diagnostics.reasons or ())
    return (
        f"score={diagnostics.score}; brand={diagnostics.brand_score}; "
        f"token={diagnostics.token_score}; spec={diagnostics.spec_score}; "
        f"seq={diagnostics.sequence_score}; type={diagnostics.type_score}; "
        f"penalty={diagnostics.price_penalty}; veto={diagnostics.hard_veto}; "
        f"reasons={reasons}"
    )


def _build_revalidated_tags(existing_tags: Any, diagnostics: MatchDiagnostics) -> list[str]:
    """Merge a row's existing tags with matcher tags and revalidation markers.

    Order is: existing tags, matcher tags, the two revalidation tags, then one
    ``match_<reason>`` tag per matcher reason. Duplicates are dropped.
    """
    tags = _json_tags(existing_tags)
    # Guarded the same way as ``reasons`` so an empty/None tag collection on
    # the diagnostics does not fail the row.
    tags.extend(diagnostics.tags or ())
    tags.append(REVALIDATION_TAG)
    tags.append(REVALIDATION_SOURCE_TAG)
    tags.extend(f"match_{reason}" for reason in diagnostics.reasons or ())
    return _dedupe(tags)


def classify_legacy_competitor_row(row: dict[str, Any]) -> RevalidationDecision:
    """Classify a legacy competitor_prices row without mutating storage.

    The row is expected to carry the keys produced by ``_fetch_legacy_rows``
    (``sku``, ``momo_name``, ``competitor_product_name``, ``momo_price``,
    ``pchome_price``, ``competitor_product_id``, ``tags``, ``is_expired``).

    A row is accepted only when the matcher score reaches ``MIN_MATCH_SCORE``
    and the matcher did not raise a hard veto.
    """
    sku = str(row.get("sku") or "")
    momo_name = row.get("momo_name") or ""
    competitor_name = row.get("competitor_product_name") or ""
    competitor_product_id = row.get("competitor_product_id")
    is_expired = bool(row.get("is_expired"))

    # Without both product names there is nothing to score.
    if not sku or not momo_name or not competitor_name:
        return RevalidationDecision(
            sku=sku,
            competitor_product_id=competitor_product_id,
            status="skipped_missing_identity_text",
            accepted=False,
            score=None,
            hard_veto=False,
            is_expired=is_expired,
            tags=_json_tags(row.get("tags")),
            diagnostic="missing momo_name or competitor_product_name",
        )

    diagnostics = score_marketplace_match(
        momo_name,
        competitor_name,
        momo_price=row.get("momo_price"),
        competitor_price=row.get("pchome_price"),
    )
    tags = _build_revalidated_tags(row.get("tags"), diagnostics)
    diagnostic = _diagnostic_text(diagnostics)

    if diagnostics.score >= MIN_MATCH_SCORE and not diagnostics.hard_veto:
        return RevalidationDecision(
            sku=sku,
            competitor_product_id=competitor_product_id,
            status="expired_match" if is_expired else "matched",
            accepted=True,
            score=diagnostics.score,
            hard_veto=False,
            is_expired=is_expired,
            tags=tags,
            diagnostic=diagnostic,
        )

    return RevalidationDecision(
        sku=sku,
        competitor_product_id=competitor_product_id,
        status="identity_veto" if diagnostics.hard_veto else "low_score",
        accepted=False,
        score=diagnostics.score,
        hard_veto=diagnostics.hard_veto,
        is_expired=is_expired,
        tags=tags,
        diagnostic=diagnostic,
    )


def _fetch_legacy_rows(conn, source: str, limit: int, include_expired: bool) -> list[dict[str, Any]]:
    """Load competitor_prices rows for *source* that lack the identity_v2 tag.

    Each row is joined to the latest price record of its ACTIVE product.
    Non-expired rows are ordered first, then by most recent crawl and highest
    previous match score. ``limit`` is clamped to at least 1. The query uses
    PostgreSQL jsonb syntax.
    """
    # Static SQL fragment (never user input), so f-string interpolation is safe.
    expiry_filter = (
        ""
        if include_expired
        else "AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)"
    )
    sql = text(f"""
        WITH latest_momo AS (
            SELECT p.id AS product_id,
                   p.i_code AS sku,
                   p.name AS momo_name,
                   pr.price AS momo_price,
                   ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC, pr.id DESC) AS rn
            FROM products p
            JOIN price_records pr ON pr.product_id = p.id
            WHERE p.status = 'ACTIVE'
        )
        SELECT cp.id AS competitor_price_id,
               lm.product_id AS momo_product_id,
               lm.sku,
               lm.momo_name,
               lm.momo_price,
               cp.price AS pchome_price,
               cp.competitor_product_id,
               cp.competitor_product_name,
               cp.match_score AS previous_match_score,
               cp.tags,
               cp.crawled_at,
               cp.expires_at,
               CASE WHEN cp.expires_at IS NOT NULL AND cp.expires_at <= CURRENT_TIMESTAMP
                    THEN TRUE ELSE FALSE END AS is_expired
        FROM latest_momo lm
        JOIN competitor_prices cp ON cp.sku = lm.sku AND cp.source = :source
        WHERE lm.rn = 1
          AND cp.price IS NOT NULL
          AND cp.price > 0
          AND cp.competitor_product_name IS NOT NULL
          AND NOT (COALESCE(cp.tags, '[]'::jsonb) ? :identity_tag)
          {expiry_filter}
        ORDER BY CASE WHEN cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP THEN 0 ELSE 1 END,
                 cp.crawled_at DESC NULLS LAST,
                 cp.match_score DESC NULLS LAST,
                 lm.sku
        LIMIT :limit
    """)
    rows = conn.execute(
        sql,
        {
            "source": source,
            "identity_tag": MATCH_ALGORITHM_TAG,
            "limit": max(1, int(limit)),
        },
    ).mappings().all()
    return [dict(row) for row in rows]


def _ensure_attempt_table(conn) -> None:
    """Delegate creation of competitor_match_attempts to CompetitorPriceFeeder.

    NOTE(review): presumably the feeder helper creates the table if missing and
    only needs ``conn`` (the feeder is built with ``engine=None``) -- confirm.
    """
    # Imported lazily, as in the original module layout.
    from services.competitor_price_feeder import CompetitorPriceFeeder

    feeder = CompetitorPriceFeeder(engine=None)
    feeder._ensure_competitor_match_attempts_table(conn)


def _insert_attempt(conn, row: dict[str, Any], decision: RevalidationDecision, source: str) -> None:
    """Append one audit row to competitor_match_attempts for *decision*.

    The competitor name is truncated to 300 characters and the diagnostic text
    to 1000; empty strings are stored as NULL.
    """
    # Only PostgreSQL gets the explicit jsonb cast; other dialects bind the
    # JSON text as-is.
    search_terms_expr = (
        "CAST(:search_terms AS jsonb)" if conn.dialect.name == "postgresql" else ":search_terms"
    )
    conn.execute(
        text(f"""
            INSERT INTO competitor_match_attempts
                (sku, source, momo_product_id, momo_product_name, momo_price,
                 search_terms, candidate_count, attempt_status,
                 best_competitor_product_id, best_competitor_product_name,
                 best_competitor_price, best_match_score, error_message, attempted_at)
            VALUES
                (:sku, :source, :momo_product_id, :momo_product_name, :momo_price,
                 {search_terms_expr}, :candidate_count, :attempt_status,
                 :best_id, :best_name, :best_price, :best_score, :error_message,
                 CURRENT_TIMESTAMP)
        """),
        {
            "sku": decision.sku,
            "source": source,
            "momo_product_id": row.get("momo_product_id"),
            "momo_product_name": row.get("momo_name"),
            "momo_price": row.get("momo_price"),
            "search_terms": json.dumps(["legacy_competitor_prices_revalidation"], ensure_ascii=False),
            "candidate_count": 1,
            "attempt_status": decision.status,
            "best_id": decision.competitor_product_id,
            "best_name": (row.get("competitor_product_name") or "")[:300] or None,
            "best_price": row.get("pchome_price"),
            "best_score": decision.score,
            "error_message": (decision.diagnostic or "")[:1000] or None,
        },
    )


def _persist_promotion(conn, row: dict[str, Any], decision: RevalidationDecision) -> None:
    """Write the new match score and tag list onto an accepted competitor_prices row."""
    conn.execute(
        text("""
            UPDATE competitor_prices
            SET match_score = :match_score,
                tags = CAST(:tags AS jsonb)
            WHERE id = :competitor_price_id
        """),
        {
            "match_score": decision.score,
            "tags": json.dumps(decision.tags, ensure_ascii=False),
            "competitor_price_id": row.get("competitor_price_id"),
        },
    )


def _tally_decision(stats: RevalidationStats, decision: RevalidationDecision) -> None:
    """Increment the one outcome counter on *stats* that matches *decision*."""
    if decision.accepted:
        if decision.is_expired:
            stats.promoted_expired += 1
        else:
            stats.promoted_fresh += 1
    elif decision.status == "identity_veto":
        stats.rejected_veto += 1
    elif decision.status == "low_score":
        stats.rejected_low_score += 1
    else:
        stats.skipped += 1


def revalidate_existing_competitor_identities(
    engine,
    *,
    source: str = "pchome",
    limit: int = 500,
    dry_run: bool = True,
    include_expired: bool = True,
    write_attempts: bool = True,
    sample_limit: int = 10,
) -> RevalidationStats:
    """Re-score legacy competitor_prices rows and optionally persist safe upgrades.

    Args:
        engine: SQLAlchemy engine; the run happens inside one ``engine.begin()``
            transaction.
        source: ``competitor_prices.source`` value to revalidate.
        limit: Maximum number of rows to scan (clamped to at least 1).
        dry_run: When true, rows are only classified; nothing is written.
        include_expired: When false, expired competitor prices are not scanned.
        write_attempts: When true (and not ``dry_run``), one audit row per
            scanned row is appended to competitor_match_attempts.
        sample_limit: Maximum number of per-row samples kept in the result.

    Returns:
        RevalidationStats. A row whose processing raised is counted only in
        ``errors``; it is logged and the run continues with the next row.
    """
    samples: list[dict[str, Any]] = []
    stats = RevalidationStats(samples=samples)

    with engine.begin() as conn:
        rows = _fetch_legacy_rows(conn, source=source, limit=limit, include_expired=include_expired)
        stats.scanned = len(rows)

        persist_attempts = write_attempts and not dry_run
        if persist_attempts:
            _ensure_attempt_table(conn)

        for row in rows:
            try:
                decision = classify_legacy_competitor_row(row)
                if len(samples) < sample_limit:
                    samples.append({
                        "sku": decision.sku,
                        "status": decision.status,
                        "score": decision.score,
                        "is_expired": decision.is_expired,
                        "momo_name": (row.get("momo_name") or "")[:60],
                        "competitor_name": (row.get("competitor_product_name") or "")[:60],
                    })

                if not dry_run:
                    # SAVEPOINT per row: a failed statement would otherwise
                    # abort the whole outer transaction on PostgreSQL, making
                    # every later row fail and discarding the entire batch.
                    with conn.begin_nested():
                        if decision.accepted:
                            _persist_promotion(conn, row, decision)
                        if persist_attempts:
                            _insert_attempt(conn, row, decision, source=source)

                # Count only after the row's writes succeeded, so a rolled-back
                # row is never reported as promoted or audited.
                _tally_decision(stats, decision)
                if persist_attempts:
                    stats.attempts_written += 1
            except Exception as exc:
                # Best-effort batch: record the failure and keep going.
                stats.errors += 1
                logger.warning(
                    "[CompetitorIdentityRevalidator] row failed sku=%s: %s",
                    row.get("sku"),
                    exc,
                    exc_info=True,
                )

    return stats


def _main() -> int:
    """CLI entry point: run one revalidation pass and print the stats as JSON.

    The run is a dry run unless ``--apply`` is given.
    """
    parser = argparse.ArgumentParser(description="Revalidate legacy PChome competitor identities.")
    parser.add_argument("--limit", type=int, default=500)
    parser.add_argument("--source", default="pchome")
    parser.add_argument("--apply", action="store_true", help="Persist accepted tags and attempt audit rows.")
    parser.add_argument("--fresh-only", action="store_true", help="Skip expired competitor prices.")
    parser.add_argument("--no-attempts", action="store_true", help="Do not append competitor_match_attempts rows.")
    args = parser.parse_args()

    # Imported after argument parsing, as in the original module layout.
    from database.manager import DatabaseManager

    logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
    engine = DatabaseManager().engine
    stats = revalidate_existing_competitor_identities(
        engine,
        source=args.source,
        limit=args.limit,
        dry_run=not args.apply,
        include_expired=not args.fresh_only,
        write_attempts=not args.no_attempts,
    )
    print(json.dumps(asdict(stats), ensure_ascii=False, indent=2, default=str))
    return 0


if __name__ == "__main__":
    raise SystemExit(_main())