#!/usr/bin/env python3
"""Offline audit for PChome competitor identity rows.

This script intentionally does not connect to the database. Export candidate rows
as JSONL, then run the current local matcher against that file.
"""

from __future__ import annotations

import argparse
import json
import sys
from collections import Counter
from pathlib import Path
from typing import Any, Iterable, Iterator

# Make the directory one level above this script's folder importable so the
# ``services`` package resolves when the script is run directly.
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

from services.competitor_price_feeder import MIN_MATCH_SCORE  # noqa: E402
from services.marketplace_product_matcher import score_marketplace_match  # noqa: E402

# Candidate keys, in priority order, tried when reading a value from a row.
PRICE_KEYS = ("pchome_price", "competitor_price", "best_competitor_price", "price")
COMPETITOR_NAME_KEYS = ("competitor_product_name", "best_competitor_product_name", "pchome_name")
STORED_SCORE_KEYS = ("stored_score", "match_score", "best_match_score")


def _first_present(row: dict[str, Any], keys: Iterable[str]) -> Any:
    """Return the first value in *row* under *keys* that is not ``None`` or ``""``.

    Falsy-but-meaningful values such as ``0`` or ``False`` count as present.
    Returns ``None`` when none of the keys carries a usable value.
    """
    for key in keys:
        value = row.get(key)
        if value not in (None, ""):
            return value
    return None


def _as_bool(value: Any) -> bool:
    """Coerce a loosely typed flag to ``bool``.

    Booleans pass through, ``None`` is ``False``, numbers use their truthiness,
    and strings are true only for ``1``/``true``/``t``/``yes``/``y`` (compared
    case-insensitively after stripping whitespace). Any other type is ``False``.
    """
    if isinstance(value, bool):
        return value
    if value is None:
        return False
    if isinstance(value, (int, float)):
        return bool(value)
    if isinstance(value, str):
        return value.strip().lower() in {"1", "true", "t", "yes", "y"}
    return False


def _read_jsonl(path: str) -> Iterator[dict[str, Any]]:
    """Yield one dict per non-blank line of a JSONL file.

    *path* is a file path, or ``"-"`` to read from standard input. Lines that
    fail to parse, or that parse to something other than a JSON object, are
    not raised; they are yielded as a marker dict carrying ``_invalid_json``,
    ``_line_no`` and ``_error`` so the caller can count them.
    """
    handle = sys.stdin if path == "-" else open(path, "r", encoding="utf-8")
    try:
        for line_no, line in enumerate(handle, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                value = json.loads(line)
            except json.JSONDecodeError as exc:
                yield {"_invalid_json": True, "_line_no": line_no, "_error": str(exc)}
                continue
            if isinstance(value, dict):
                yield value
            else:
                yield {
                    "_invalid_json": True,
                    "_line_no": line_no,
                    "_error": "line is not a JSON object",
                }
    finally:
        # Never close stdin; only close handles this function opened itself.
        if handle is not sys.stdin:
            handle.close()


def _sample(row: dict[str, Any], diagnostics: Any, status: str) -> dict[str, Any]:
    """Build the compact sample record reported for one audited row.

    *diagnostics* is the object returned by ``score_marketplace_match``; its
    ``score``, ``hard_veto``, ``comparison_mode`` and ``reasons`` attributes are
    copied into the record. Product names are truncated to 140 characters.
    """
    competitor_name = _first_present(row, COMPETITOR_NAME_KEYS)
    competitor_price = _first_present(row, PRICE_KEYS)
    return {
        "sku": row.get("sku"),
        "status": status,
        # Use _first_present rather than an ``or`` chain so that a stored
        # score of 0 is reported as 0 instead of being treated as missing.
        "stored_score": _first_present(row, STORED_SCORE_KEYS),
        "current_score": diagnostics.score,
        "current_veto": diagnostics.hard_veto,
        "current_mode": diagnostics.comparison_mode,
        "reasons": list(diagnostics.reasons or ()),
        "momo": str(row.get("momo_name") or row.get("momo_product_name") or "")[:140],
        "competitor": str(competitor_name or "")[:140],
        "momo_price": row.get("momo_price"),
        "competitor_price": competitor_price,
        "crawled_at": row.get("crawled_at"),
    }


def audit_rows(
    rows: Iterable[dict[str, Any]],
    *,
    min_score: float = MIN_MATCH_SCORE,
    sample_limit: int = 20,
) -> dict[str, Any]:
    """Re-score exported rows with the current matcher and summarise the outcome.

    Args:
        rows: Row dicts, typically produced by ``_read_jsonl``. Rows flagged
            with ``_invalid_json`` are counted and otherwise ignored.
        min_score: Scores strictly below this value are classified ``low_score``.
        sample_limit: Maximum number of sample records kept in the summary.

    Returns:
        A dict of counters (``scanned``, ``accepted_current``, ``veto_current``,
        ``low_score_current``, ``skipped``, ``invalid_json``, ``expired``,
        ``fresh_veto_or_low``), a ``samples`` list, per-status counts under
        ``status_counts`` and the 30 most common matcher reasons under
        ``top_reasons``.
    """
    status_counts: Counter[str] = Counter()
    reason_counts: Counter[str] = Counter()
    summary: dict[str, Any] = {
        "scanned": 0,
        "accepted_current": 0,
        "veto_current": 0,
        "low_score_current": 0,
        "skipped": 0,
        "invalid_json": 0,
        "expired": 0,
        "fresh_veto_or_low": 0,
        "samples": [],
    }

    for row in rows:
        if row.get("_invalid_json"):
            summary["invalid_json"] += 1
            continue

        summary["scanned"] += 1
        momo_name = row.get("momo_name") or row.get("momo_product_name")
        competitor_name = _first_present(row, COMPETITOR_NAME_KEYS)
        competitor_price = _first_present(row, PRICE_KEYS)
        is_expired = _as_bool(row.get("is_expired"))
        if is_expired:
            summary["expired"] += 1

        # Without both product names there is nothing to score.
        if not momo_name or not competitor_name:
            status_counts["skipped_missing_identity_text"] += 1
            summary["skipped"] += 1
            continue

        diagnostics = score_marketplace_match(
            str(momo_name),
            str(competitor_name),
            momo_price=row.get("momo_price"),
            competitor_price=competitor_price,
        )
        reason_counts.update(diagnostics.reasons or ())

        # A hard veto takes precedence over the numeric score.
        if diagnostics.hard_veto:
            status = "identity_veto"
            summary["veto_current"] += 1
        elif diagnostics.score < min_score:
            status = "low_score"
            summary["low_score_current"] += 1
        else:
            status = "accepted"
            summary["accepted_current"] += 1
        status_counts[status] += 1

        if status != "accepted" and not is_expired:
            summary["fresh_veto_or_low"] += 1
            # NOTE(review): sampling is scoped to fresh, non-accepted rows; this
            # nesting was reconstructed from a flattened source - confirm.
            if len(summary["samples"]) < sample_limit:
                summary["samples"].append(_sample(row, diagnostics, status))

    summary["status_counts"] = dict(status_counts)
    summary["top_reasons"] = [
        {"reason": reason, "count": count}
        for reason, count in reason_counts.most_common(30)
    ]
    return summary


def main(argv: list[str] | None = None) -> int:
    """Parse CLI arguments, audit the JSONL input and print the summary as JSON.

    *argv* defaults to the process arguments when ``None``. A negative
    ``--sample-limit`` is clamped to 0. Always returns exit code 0.
    """
    parser = argparse.ArgumentParser(
        description="Offline audit for competitor identity JSONL rows.",
        epilog=(
            "Example export SQL shape: SELECT row_to_json(t) FROM (...) t; "
            "Feed one JSON object per line into this script."
        ),
    )
    parser.add_argument("input", nargs="?", default="-", help="JSONL file path, or '-' for stdin.")
    parser.add_argument("--sample-limit", type=int, default=20)
    parser.add_argument("--min-score", type=float, default=MIN_MATCH_SCORE)
    args = parser.parse_args(argv)

    summary = audit_rows(
        _read_jsonl(args.input),
        min_score=args.min_score,
        sample_limit=max(0, args.sample_limit),
    )
    # default=str keeps the dump from failing on values json cannot encode natively.
    print(json.dumps(summary, ensure_ascii=False, indent=2, default=str))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())