Files
ewoooc/scripts/audit_competitor_identity_jsonl.py
OoO 01c888c565
All checks were successful
CD Pipeline / deploy (push) Successful in 1m6s
V10.396 add offline competitor identity audit
2026-05-24 11:09:02 +08:00

184 lines
6.0 KiB
Python

#!/usr/bin/env python3
"""Offline audit for PChome competitor identity rows.
This script intentionally does not connect to the database. Export candidate
rows as JSONL, then run the current local matcher against that file.
"""
from __future__ import annotations
import argparse
import json
import sys
from collections import Counter
from pathlib import Path
from typing import Any, Iterable, Iterator
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from services.competitor_price_feeder import MIN_MATCH_SCORE # noqa: E402
from services.marketplace_product_matcher import score_marketplace_match # noqa: E402
# Row keys that may carry the competitor's price, listed in lookup priority
# order: _first_present() returns the value of the first key that is neither
# None nor an empty string.
PRICE_KEYS = ("pchome_price", "competitor_price", "best_competitor_price", "price")
# Row keys that may carry the competitor's product name, resolved with the
# same first-non-empty-wins rule as PRICE_KEYS.
COMPETITOR_NAME_KEYS = ("competitor_product_name", "best_competitor_product_name", "pchome_name")
def _first_present(row: dict[str, Any], keys: Iterable[str]) -> Any:
    """Return the first usable value found in ``row`` under any of ``keys``.

    Keys are tried in the order given. A value counts as usable unless it is
    ``None`` or the empty string, so other falsy values such as ``0`` or
    ``False`` are returned as-is. Returns ``None`` when no key qualifies.
    """
    # Lazy lookup: keys after the first usable one are never read.
    candidates = (row.get(name) for name in keys)
    return next((found for found in candidates if found not in (None, "")), None)
def _as_bool(value: Any) -> bool:
    """Coerce a loosely typed flag into a real ``bool``.

    Strings are true only when, after trimming and lower-casing, they equal
    one of ``1``, ``true``, ``t``, ``yes`` or ``y``. Booleans and numbers use
    their ordinary truthiness. ``None`` and every other type are ``False``.
    """
    if isinstance(value, str):
        return value.strip().lower() in {"1", "true", "t", "yes", "y"}
    # bool is a subclass of int, so this branch covers True/False as well.
    if isinstance(value, (int, float)):
        return bool(value)
    return False
def _read_jsonl(path: str) -> Iterator[dict[str, Any]]:
    """Yield one dict per non-blank line of a JSONL source.

    ``path`` is a filesystem path, or ``"-"`` to read from standard input.
    Files are decoded as UTF-8. Blank lines are skipped but still count
    towards line numbering.

    Malformed input never raises here. A line that is not valid JSON, or that
    parses to something other than a JSON object, is reported as a marker
    dict ``{"_invalid_json": True, "_line_no": <1-based line>, "_error": <text>}``
    so the caller can count it and keep going.

    A byte-order mark at the very start of the input is discarded. Without
    this, ``json.loads`` rejects the first line and a valid first row would
    be reported as invalid JSON.
    """
    reading_stdin = path == "-"
    handle = sys.stdin if reading_stdin else open(path, "r", encoding="utf-8")
    try:
        for line_no, line in enumerate(handle, start=1):
            # str.strip() does not treat U+FEFF as whitespace, so drop it
            # explicitly; this covers both the file and the stdin path.
            if line_no == 1 and line.startswith("\ufeff"):
                line = line[1:]
            line = line.strip()
            if not line:
                continue
            try:
                value = json.loads(line)
            except json.JSONDecodeError as exc:
                yield {"_invalid_json": True, "_line_no": line_no, "_error": str(exc)}
                continue
            if isinstance(value, dict):
                yield value
            else:
                yield {"_invalid_json": True, "_line_no": line_no, "_error": "line is not a JSON object"}
    finally:
        # Only close what this function opened; stdin belongs to the process.
        if not reading_stdin:
            handle.close()
def _sample(row: dict[str, Any], diagnostics: Any, status: str) -> dict[str, Any]:
    """Build the compact report entry for one audited row.

    Args:
        row: The exported JSONL row being audited.
        diagnostics: The matcher result for this row. Its ``score``,
            ``hard_veto``, ``comparison_mode`` and ``reasons`` attributes
            are read.
        status: The verdict assigned by the caller (for example
            ``"identity_veto"`` or ``"low_score"``).

    Returns:
        A dict pairing the previously stored score with the current matcher
        output, plus both product names (truncated to 140 characters),
        both prices and the crawl timestamp.
    """
    competitor_name = _first_present(row, COMPETITOR_NAME_KEYS)
    competitor_price = _first_present(row, PRICE_KEYS)
    # Use the "first non-empty" rule rather than an ``or`` chain: a stored
    # score of 0 is a real value and must not be reported as missing or
    # replaced by a lower-priority column.
    stored_score = _first_present(row, ("stored_score", "match_score", "best_match_score"))
    return {
        "sku": row.get("sku"),
        "status": status,
        "stored_score": stored_score,
        "current_score": diagnostics.score,
        "current_veto": diagnostics.hard_veto,
        "current_mode": diagnostics.comparison_mode,
        "reasons": list(diagnostics.reasons or ()),
        "momo": str(row.get("momo_name") or row.get("momo_product_name") or "")[:140],
        "competitor": str(competitor_name or "")[:140],
        "momo_price": row.get("momo_price"),
        "competitor_price": competitor_price,
        "crawled_at": row.get("crawled_at"),
    }
def audit_rows(
    rows: Iterable[dict[str, Any]],
    *,
    min_score: float = MIN_MATCH_SCORE,
    sample_limit: int = 20,
) -> dict[str, Any]:
    """Re-score exported rows with the current matcher and tally the outcomes.

    Args:
        rows: Parsed JSONL rows. Rows flagged with ``_invalid_json`` are
            counted under ``invalid_json`` and otherwise ignored.
        min_score: Scores below this value are classified as ``low_score``.
        sample_limit: Maximum number of example rows kept in ``samples``.

    Returns:
        A summary dict with these counters: ``scanned``, ``accepted_current``,
        ``veto_current``, ``low_score_current``, ``skipped`` (rows missing
        either product name), ``invalid_json``, ``expired`` (every scanned
        row whose ``is_expired`` flag is truthy, skipped rows included) and
        ``fresh_veto_or_low`` (non-expired rows that were not accepted).
        ``samples`` holds up to ``sample_limit`` of those fresh non-accepted
        rows, ``status_counts`` is the per-status tally and ``top_reasons``
        lists the 30 most frequent matcher reasons.
    """
    # Which summary counter each matcher verdict increments.
    counter_for_status = {
        "identity_veto": "veto_current",
        "low_score": "low_score_current",
        "accepted": "accepted_current",
    }
    by_status: Counter[str] = Counter()
    by_reason: Counter[str] = Counter()
    samples: list[dict[str, Any]] = []
    summary: dict[str, Any] = dict.fromkeys(
        (
            "scanned",
            "accepted_current",
            "veto_current",
            "low_score_current",
            "skipped",
            "invalid_json",
            "expired",
            "fresh_veto_or_low",
        ),
        0,
    )
    summary["samples"] = samples

    for record in rows:
        if record.get("_invalid_json"):
            summary["invalid_json"] += 1
            continue
        summary["scanned"] += 1

        expired = _as_bool(record.get("is_expired"))
        if expired:
            summary["expired"] += 1

        momo_text = record.get("momo_name") or record.get("momo_product_name")
        rival_text = _first_present(record, COMPETITOR_NAME_KEYS)
        if not (momo_text and rival_text):
            # Nothing to compare without both product names.
            by_status["skipped_missing_identity_text"] += 1
            summary["skipped"] += 1
            continue

        verdict = score_marketplace_match(
            str(momo_text),
            str(rival_text),
            momo_price=record.get("momo_price"),
            competitor_price=_first_present(record, PRICE_KEYS),
        )
        by_reason.update(verdict.reasons or ())

        # A hard veto takes precedence over the numeric score.
        if verdict.hard_veto:
            status = "identity_veto"
        elif verdict.score < min_score:
            status = "low_score"
        else:
            status = "accepted"
        summary[counter_for_status[status]] += 1
        by_status[status] += 1

        if status == "accepted" or expired:
            continue
        summary["fresh_veto_or_low"] += 1
        if len(samples) < sample_limit:
            samples.append(_sample(record, verdict, status))

    summary["status_counts"] = dict(by_status)
    summary["top_reasons"] = [
        {"reason": label, "count": hits}
        for label, hits in by_reason.most_common(30)
    ]
    return summary
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: audit a JSONL export and print the summary.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Returns:
        Always ``0``. The summary is written to standard output as indented
        JSON, with non-ASCII text left unescaped.
    """
    cli = argparse.ArgumentParser(
        description="Offline audit for competitor identity JSONL rows.",
        epilog=(
            "Example export SQL shape: SELECT row_to_json(t) FROM (...) t; "
            "Feed one JSON object per line into this script."
        ),
    )
    cli.add_argument("input", nargs="?", default="-", help="JSONL file path, or '-' for stdin.")
    cli.add_argument("--sample-limit", type=int, default=20)
    cli.add_argument("--min-score", type=float, default=MIN_MATCH_SCORE)
    options = cli.parse_args(argv)

    # A negative limit is treated as "keep no samples".
    sample_cap = options.sample_limit if options.sample_limit > 0 else 0
    report = audit_rows(
        _read_jsonl(options.input),
        min_score=options.min_score,
        sample_limit=sample_cap,
    )
    rendered = json.dumps(report, ensure_ascii=False, indent=2, default=str)
    print(rendered)
    return 0
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status.
    sys.exit(main())