Files
ewoooc/services/competitor_intel_repository.py
2026-06-01 12:19:48 +08:00

1811 lines
72 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""PChome / MOMO 競價情報共用資料出口。
短期 canonical source:
- competitor_prices: 目前有效配對
- competitor_price_history: 價格歷史趨勢
- competitor_match_attempts: 未配對與低信心診斷
"""
from __future__ import annotations

import json
import math
import os
import pickle
import re
import time
from datetime import date, datetime, timedelta
from pathlib import Path
from threading import Lock
from typing import Any, Optional, Union

from sqlalchemy import inspect, text
# Minimum competitor_prices.match_score for a PChome row to count as an identity
# match in the coverage SQL; the value is also embedded in the cache keys.
PCHOME_MATCH_SCORE_FLOOR = 0.76
# Attempt statuses whose candidate can only be compared on a per-unit price basis.
UNIT_COMPARABLE_STATUSES = {"unit_comparable", "refresh_unit_comparable"}
# Statuses that get a unit-price comparison: the automatic ones plus the manual
# "unit price required" decision.
UNIT_PRICE_DECISION_STATUSES = UNIT_COMPARABLE_STATUSES | {"manual_unit_price_required"}
# Statuses written by a human reviewer that close the current candidate.
MANUAL_CLOSED_ATTEMPT_STATUSES = {
    "manual_rejected",
    "manual_unit_price_required",
    "manual_needs_research",
}
# Automatic attempt statuses that are counted as open review work
# (summed into "actionable_review_count" by the coverage query).
ACTIONABLE_ATTEMPT_STATUSES = {
    "rescore_accepted_current",
    "unit_comparable",
    "refresh_unit_comparable",
    "identity_veto",
    "low_score",
    "refresh_low_score",
    "recoverable_low_score",
    "true_low_confidence",
    "protected_existing_match",
    "expired_match",
    "refresh_no_result",
    "no_result",
}
# Every status that may appear in the operator review queue.
REVIEW_QUEUE_ATTEMPT_STATUSES = ACTIONABLE_ATTEMPT_STATUSES | MANUAL_CLOSED_ATTEMPT_STATUSES
# Named filter group -> attempt statuses it matches (presumably used by the
# review-queue filter UI elsewhere; groups may overlap, e.g. "low_score").
REVIEW_STATUS_FILTER_GROUPS = {
    "rescore_accepted": ("rescore_accepted_current",),
    "unit_comparable": ("unit_comparable", "refresh_unit_comparable"),
    "identity_veto": ("identity_veto",),
    "low_score": ("low_score", "refresh_low_score", "recoverable_low_score", "true_low_confidence"),
    "recoverable_low_score": ("recoverable_low_score",),
    "true_low_confidence": ("true_low_confidence",),
    "legacy_low_score": ("low_score", "refresh_low_score"),
    "protected_existing_match": ("protected_existing_match",),
    "expired_match": ("expired_match",),
    "no_result": ("no_result", "refresh_no_result"),
    "manual_closed": ("manual_rejected", "manual_unit_price_required", "manual_needs_research"),
}
# Attempt status -> short operator-facing (zh-TW) status label.
ATTEMPT_STATUS_LABELS = {
    "rescore_accepted_current": "重算待人工覆核",
    "unit_comparable": "需單位價比較",
    "refresh_unit_comparable": "需單位價比較",
    "identity_veto": "身份否決",
    "low_score": "低信心舊候選",
    "refresh_low_score": "刷新後仍低信心舊候選",
    "recoverable_low_score": "近門檻可救回",
    "true_low_confidence": "證據不足待觀察",
    "protected_existing_match": "既有較強配對保護中",
    "expired_match": "價格過期待刷新",
    "refresh_no_result": "刷新找不到商品",
    "no_result": "找不到同款",
    "never_attempted": "尚未搜尋",
    "manual_accepted": "人工已採用",
    "manual_rejected": "人工已否決",
    "manual_unit_price_required": "人工標記單位價",
    "manual_needs_research": "人工要求補搜尋",
}
# Attempt status -> suggested next action for the operator (zh-TW).
ATTEMPT_ACTION_LABELS = {
    "rescore_accepted_current": "人工確認身份後才可採用",
    "unit_comparable": "人工確認檔期、贈品與單位價",
    "refresh_unit_comparable": "人工確認檔期、贈品與單位價",
    "identity_veto": "確認是否為不同商品線或規格",
    "low_score": "先補搜尋或重算,避免舊候選直接進正式價差",
    "refresh_low_score": "刷新後仍低分,需補搜尋詞或等待新證據",
    "recoverable_low_score": "優先回放這批近門檻同品線候選",
    "true_low_confidence": "保守保留,等待更明確的身份證據",
    "protected_existing_match": "比較新舊候選證據,避免覆蓋較強正式配對",
    "expired_match": "重新刷新 PChome 價格",
    "refresh_no_result": "調整搜尋詞後重抓",
    "no_result": "補充搜尋詞或品牌關鍵字",
    "never_attempted": "排入 PChome 補抓",
    "manual_accepted": "已寫入正式 PChome 同款配對",
    "manual_rejected": "已關閉此候選,等待下一輪新候選",
    "manual_unit_price_required": "維持單位價比較,不寫入正式總價差",
    "manual_needs_research": "補搜尋詞或重新抓取後再判斷",
}
# competitor_match_reviews.review_action -> label. The keys also define the
# counters of the manual-review summary.
MANUAL_REVIEW_ACTION_LABELS = {
    "accept_identity": "人工採用",
    "reject_identity": "人工否決",
    "unit_price_required": "人工單位價",
    "needs_research": "需補搜尋",
}
# Decision-envelope recommended_action code -> label.
DECISION_ACTION_LABELS = {
    "compare_existing_identity": "比較既有正式候選與新候選",
    "review_accept_identity": "人工確認身份後採用同款",
    "unit_price_required": "確認單位價 / 組合差異",
    "needs_research": "補搜尋詞或重新抓取",
    "verify_or_reject_identity": "確認身份或否決候選",
    "refresh_or_compare_identity": "刷新價格或比較候選",
    "human_review": "人工覆核",
}
# Decision-envelope guardrails.data_quality code -> label.
DATA_QUALITY_LABELS = {
    "complete": "證據完整",
    "partial": "證據部分完整",
    "missing": "缺少候選證據",
    "stale": "證據過期",
}
# Matcher diagnostic reason code -> short reason-chip label. Unknown codes are
# shown with underscores replaced by spaces.
MATCH_DIAGNOSTIC_REASON_LABELS = {
    "brand_conflict": "品牌不符",
    "product_line_conflict": "商品線不符",
    "type_conflict": "品類不符",
    "volume_conflict": "容量差異",
    "weight_conflict": "重量差異",
    "dosage_conflict": "劑量差異",
    "count_conflict": "件數差異",
    "component_count_conflict": "入數差異",
    "multi_component_conflict": "組合差異",
    "refill_pack_conflict": "補充包差異",
    "unit_comparable": "需單位價",
    "price_ratio_extreme": "價差極端",
    "price_ratio_wide": "價差過大",
    "catalog_count_omission": "目錄入數待確認",
    "makeup_usage_conflict": "彩妝用途不同",
    "makeup_finish_conflict": "妝效質地不同",
    "nail_tool_function_conflict": "工具功能不同",
    "schick_razor_line_conflict": "除毛刀品線不同",
    "lancome_line_conflict": "蘭蔻商品線不符",
    "dr_hsieh_labsmart_line_conflict": "達特醫 LabSmart 系列不符",
    "hoi_candle_line_conflict": "香氛蠟燭系列不符",
    "sun_protection_line_conflict": "防曬品線不符",
    "saugella_variant_conflict": "賽吉兒款式不符",
    "lactacyd_variant_conflict": "立朵舒款式不符",
    "aroma_scent_variant_conflict": "香氛香味款式不符",
    "variant_descriptor_conflict": "款式描述不同",
    "variant_selection_review": "多款任選待確認",
    "strong_exact_spec_match": "強規格同款",
    "strong_product_line_match": "商品線強吻合",
    "shared_identity_anchor_reordered_line": "身份詞順序同款",
    "focused_exact_identity_lab52_mouthwash": "Lab52 漱口水同款",
    "focused_exact_identity_derma_eco_skin_oil": "Derma 護膚油同款",
    "focused_exact_identity_pavaruni_40_scent_oil": "Pavaruni 40 香味精油同款",
    "focused_exact_identity_pavaruni_20_scent_candle": "Pavaruni 20 香味蠟燭同款",
    "focused_exact_identity_yuskin_classic_cream_30g_6pack": "悠斯晶 30g 六入同款",
    "focused_exact_identity_lush_sakura_body_spray": "LUSH 櫻之花噴霧同款",
    "focused_exact_identity_artmis_virile_gel": "ARTMIS 凝膠同款",
    "focused_exact_identity_johnsons_baby_lotion_variant_catalog": "嬌生乳液型錄款待確認",
    "shared_model_token": "型號一致",
}
# Matcher match_type -> label.
MATCH_TYPE_LABELS = {
    "exact": "高信心同款",
    "same_product_different_pack": "同商品不同包裝",
    "same_line_variant": "同系列不同款",
    "comparable": "可比但需覆核",
    "no_match": "非同款",
}
# Matcher price_basis -> label (how the two prices may be compared).
PRICE_BASIS_LABELS = {
    "total_price": "總價可比",
    "unit_price": "單位價可比",
    "manual_review": "人工覆核後可比",
    "none": "不可比",
}
# Matcher alert_tier -> label.
ALERT_TIER_LABELS = {
    "price_alert_exact": "可直接價格告警",
    "unit_price_review": "單位價覆核",
    "identity_review": "身份覆核",
    "suppress": "不告警",
}
COMPETITOR_INTEL_CACHE_TTL_SECONDS = int(os.getenv("COMPETITOR_INTEL_CACHE_TTL_SECONDS", "1800"))
_BASE_DIR = Path(__file__).resolve().parents[1]
_CACHE_FILE = _BASE_DIR / "data" / "competitor_intel_cache.pkl"
_CACHE_LOCK = Lock()
_MEM_CACHE: dict[str, dict[str, Any]] = {}
def _num(value: Any) -> float:
try:
return float(value or 0)
except (TypeError, ValueError):
return 0.0
def _date_label(value: Any) -> str:
if hasattr(value, "strftime"):
return value.strftime("%Y-%m-%d")
return str(value or "")
def _month_label(value: Any) -> str:
if hasattr(value, "strftime"):
return value.strftime("%Y-%m")
return str(value or "")[:7]
def _attempt_status_label(status: Any) -> str:
    """Return the operator label for an attempt status.

    Unknown statuses are shown as-is; a missing status becomes "狀態待釐清".
    """
    lookup_key = str(status or "")
    if lookup_key in ATTEMPT_STATUS_LABELS:
        return ATTEMPT_STATUS_LABELS[lookup_key]
    return str(status or "狀態待釐清")
def _attempt_action_label(status: Any) -> str:
    """Return the suggested operator action for an attempt status (generic hint if unknown)."""
    fallback_hint = "人工確認比對證據"
    return ATTEMPT_ACTION_LABELS.get(str(status or ""), fallback_hint)
def _parse_json_payload(value: Any) -> dict[str, Any]:
if isinstance(value, dict):
return value
if not value:
return {}
if isinstance(value, str):
try:
payload = json.loads(value)
return payload if isinstance(payload, dict) else {}
except Exception:
return {}
return {}
def _parse_tag_list(value: Any) -> list[str]:
if isinstance(value, list):
return [str(item) for item in value if item]
if isinstance(value, str):
try:
payload = json.loads(value)
if isinstance(payload, list):
return [str(item) for item in payload if item]
except Exception:
return []
return []
def _tag_suffix(tags: list[str], prefix: str) -> str:
marker = f"{prefix}_"
for tag in tags:
if tag.startswith(marker):
return tag.removeprefix(marker)
return ""
def _empty_manual_review_summary() -> dict[str, Any]:
    """Return a zeroed manual-review summary.

    Keys, in order: ``total``, one counter per MANUAL_REVIEW_ACTION_LABELS
    action, ``accept_rate`` and ``action_labels``. ``action_labels`` is a fresh
    copy, so callers that mutate a summary cannot alter the module-level table.
    """
    summary: dict[str, Any] = {"total": 0}
    # Counters follow the label table so new review actions are picked up automatically.
    summary.update(dict.fromkeys(MANUAL_REVIEW_ACTION_LABELS, 0))
    summary["accept_rate"] = 0
    summary["action_labels"] = dict(MANUAL_REVIEW_ACTION_LABELS)
    return summary
def _extract_match_diagnostic_reasons(
diagnostic_text: Any,
diagnostic_payload: Optional[dict[str, Any]] = None,
) -> list[dict[str, str]]:
"""Translate matcher diagnostics into short operator-facing reason chips."""
text_value = str(diagnostic_text or "")
raw_reasons: list[Any] = []
if isinstance(diagnostic_payload, dict):
payload_reasons = diagnostic_payload.get("reasons")
if isinstance(payload_reasons, list):
raw_reasons.extend(payload_reasons)
elif isinstance(payload_reasons, str):
raw_reasons.extend(payload_reasons.replace("|", ",").split(","))
if text_value:
reason_blob = ""
for part in text_value.split(";"):
key, _, value = part.strip().partition("=")
if key.strip() == "reasons":
reason_blob = value.strip()
break
if reason_blob:
raw_reasons.extend(reason_blob.replace("|", ",").split(","))
if not raw_reasons:
return []
reasons: list[dict[str, str]] = []
seen: set[str] = set()
for raw_reason in raw_reasons:
code = str(raw_reason or "").strip()
if not code or code in seen:
continue
seen.add(code)
reasons.append({
"code": code,
"label": MATCH_DIAGNOSTIC_REASON_LABELS.get(code, code.replace("_", " ")),
})
return reasons
def _build_unit_comparison_for_attempt(row: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Compute the unit-price comparison for attempts awaiting a unit-price decision.

    Returns None for statuses outside UNIT_PRICE_DECISION_STATUSES. Any error
    from the matcher helper, including failing to import it, is reported as
    ``{"comparable": False, "reason": "build_error"}`` instead of being raised.
    """
    if str(row.get("attempt_status") or "") not in UNIT_PRICE_DECISION_STATUSES:
        return None
    try:
        # Imported inside the function; an import failure falls into build_error below.
        from services.marketplace_product_matcher import build_unit_price_comparison

        momo_name = row.get("name") or row.get("momo_product_name") or ""
        competitor_name = row.get("best_competitor_product_name") or ""
        return build_unit_price_comparison(
            momo_name,
            competitor_name,
            row.get("momo_price"),
            row.get("best_competitor_price"),
        )
    except Exception:
        return {"comparable": False, "reason": "build_error"}
def _build_unit_price_business_insight(
unit_comparison: Optional[dict[str, Any]],
item: dict[str, Any],
) -> dict[str, Any]:
"""Turn unit-price math into an operator-facing business signal."""
if not isinstance(unit_comparison, dict) or not unit_comparison.get("comparable"):
return {}
unit_gap_pct = _num(unit_comparison.get("unit_gap_pct"))
unit_gap_amount = _num(unit_comparison.get("unit_gap_amount"))
momo_unit_price = _num(unit_comparison.get("momo_unit_price"))
competitor_unit_price = _num(unit_comparison.get("competitor_unit_price"))
unit_label = str(unit_comparison.get("unit_label") or "單位")
abs_gap = abs(unit_gap_pct)
if abs_gap < 3:
direction = "near_parity"
label = "單位價接近"
summary = f"單位價接近,差距 {unit_gap_pct:+.1f}%/{unit_label},先確認檔期、贈品與運費條件"
action_hint = "檢查檔期與贈品條件後再決定是否列入價格訊號"
elif unit_gap_pct > 0:
direction = "pchome_cheaper"
label = "PChome 單位價較低"
summary = f"PChome 單位價低 {abs_gap:.1f}%/{unit_label},屬於潛在價格壓力"
action_hint = "確認同品、同單位與檔期條件後,可納入競價壓力觀察"
else:
direction = "momo_cheaper"
label = "MOMO 單位價較低"
summary = f"MOMO 單位價低 {abs_gap:.1f}%/{unit_label},目前不應誤判為 PChome 價格壓力"
action_hint = "保留為單位價比較證據,不寫入總價型正式價差"
severity = "high" if abs_gap >= 15 else "medium" if abs_gap >= 5 else "low"
return {
"label": label,
"summary": summary,
"direction": direction,
"severity": severity,
"unit_label": unit_label,
"unit_gap_pct": round(unit_gap_pct, 2),
"unit_gap_amount": round(unit_gap_amount, 4),
"momo_unit_price": round(momo_unit_price, 4),
"competitor_unit_price": round(competitor_unit_price, 4),
"action_hint": action_hint,
"attempt_status": item.get("attempt_status") or "",
}
def _review_action_code(attempt_status: str) -> str:
if attempt_status == "rescore_accepted_current":
return "review_accept_identity"
if attempt_status in UNIT_PRICE_DECISION_STATUSES:
return "unit_price_required"
if attempt_status in {"no_result", "refresh_no_result", "manual_needs_research"}:
return "needs_research"
if attempt_status in {"identity_veto", "manual_rejected"}:
return "verify_or_reject_identity"
if attempt_status == "protected_existing_match":
return "compare_existing_identity"
if attempt_status == "expired_match":
return "refresh_or_compare_identity"
return "human_review"
def _review_data_quality(attempt_status: str, item: dict[str, Any]) -> str:
if attempt_status in {"no_result", "refresh_no_result", "never_attempted"}:
return "missing"
if not item.get("candidate_pc_id") or not item.get("candidate_pc_name"):
return "missing"
if item.get("candidate_pc_price", 0) <= 0 or item.get("momo_price", 0) <= 0:
return "partial"
if attempt_status == "rescore_accepted_current":
return "complete"
return "partial"
def _review_severity(attempt_status: str, item: dict[str, Any]) -> str:
    """Assign a P1-P4 priority to a review-queue item.

    Rescored-accepted candidates are P1 when MOMO is at least 10% above the
    candidate price, otherwise P2. Unit-price decisions are P2. Protected
    existing matches are P2 when the incoming score beats the existing one by
    >= 0.03, otherwise P3. Recoverable low scores and expired matches are P3,
    and everything else is P4.
    """
    momo = _num(item.get("momo_price"))
    candidate = _num(item.get("candidate_pc_price"))
    gap_pct = 0.0
    if momo > 0 and candidate > 0:
        gap_pct = (momo - candidate) / max(candidate, 1) * 100

    if attempt_status == "rescore_accepted_current":
        return "P1" if gap_pct >= 10 else "P2"
    if attempt_status in UNIT_PRICE_DECISION_STATUSES:
        return "P2"
    if attempt_status == "protected_existing_match":
        conflict = item.get("existing_match_conflict")
        incoming_clearly_stronger = (
            isinstance(conflict, dict) and _num(conflict.get("score_delta")) >= 0.03
        )
        return "P2" if incoming_clearly_stronger else "P3"
    if attempt_status in {"recoverable_low_score", "expired_match"}:
        return "P3"
    return "P4"
_EXISTING_MATCH_FIELD_RE = re.compile(r"\b(existing_id|incoming_id|existing_score|incoming_score)=([^;]+)")
def _parse_existing_match_conflict(error_message: Any) -> dict[str, Any]:
text = str(error_message or "")
if "existing_match_conflict" not in text:
return {}
fields = {match.group(1): match.group(2).strip() for match in _EXISTING_MATCH_FIELD_RE.finditer(text)}
if not fields:
return {}
existing_score = _num(fields.get("existing_score"))
incoming_score = _num(fields.get("incoming_score"))
return {
"existing_product_id": fields.get("existing_id") or "",
"incoming_product_id": fields.get("incoming_id") or "",
"existing_score": round(existing_score, 3),
"incoming_score": round(incoming_score, 3),
"score_delta": round(incoming_score - existing_score, 3),
}
def _build_review_decision_envelope(item: dict[str, Any]) -> dict[str, Any]:
    """Build the shared evidence contract for an operator review queue item.

    *item* is a formatted review-queue item. The envelope always requires
    human-in-the-loop review (``requires_hitl`` True, ``can_auto_execute``
    False). It carries the subject, an evidence list (status, match score, and
    when available price gap, diagnostic reasons, unit-price insight and the
    existing-match conflict), the recommended action, expected impact,
    guardrails and a trace back to competitor_match_attempts.
    """
    status = str(item.get("attempt_status") or "")
    momo = _num(item.get("momo_price"))
    candidate = _num(item.get("candidate_pc_price"))
    score = round(_num(item.get("best_match_score")), 3)

    gap_amount: Optional[float] = None
    gap_pct: Optional[float] = None
    if momo > 0 and candidate > 0:
        diff = momo - candidate
        gap_amount = round(diff, 2)
        gap_pct = round(diff / max(candidate, 1) * 100, 1)

    match_basis = "/".join(
        filter(
            None,
            (
                item.get("match_type") or "unknown",
                item.get("price_basis") or "unknown",
                item.get("alert_tier") or "unknown",
            ),
        )
    )

    evidence: list[dict[str, Any]] = [
        {
            "type": "review_status",
            "metric": "attempt_status",
            "value": status,
            "basis": item.get("status_label") or _attempt_status_label(status),
        },
        {
            "type": "match",
            "metric": "match_score",
            "value": score,
            "basis": match_basis,
            # A zero score is reported as "no confidence" (None) rather than 0.
            "confidence": score or None,
        },
    ]

    if gap_pct is not None:
        evidence.append({
            "type": "price",
            "metric": "candidate_gap_pct",
            "value": f"{gap_pct:+.1f}%",
            "basis": "MOMO latest price + PChome review candidate",
        })

    reason_text = item.get("diagnostic_reason_text")
    if reason_text:
        evidence.append({
            "type": "diagnostic",
            "metric": "reasons",
            "value": reason_text,
            "basis": "match_diagnostic_json.reasons",
        })

    insight = item.get("unit_price_insight")
    if isinstance(insight, dict) and insight:
        evidence.append({
            "type": "unit_price",
            "metric": "unit_price_gap_pct",
            "value": f"{_num(insight.get('unit_gap_pct')):+.1f}%",
            "basis": insight.get("summary") or "unit price comparison",
        })

    conflict = item.get("existing_match_conflict")
    if isinstance(conflict, dict) and conflict:
        incoming_id = conflict.get("incoming_product_id") or "unknown"
        existing_id = conflict.get("existing_product_id") or "unknown"
        incoming_score = _num(conflict.get("incoming_score"))
        existing_score = _num(conflict.get("existing_score"))
        delta = _num(conflict.get("score_delta"))
        evidence.append({
            "type": "conflict",
            "metric": "existing_match_conflict",
            "value": (
                f"新候選 {incoming_id} {incoming_score:.3f} vs "
                f"既有候選 {existing_id} {existing_score:.3f}"
            ),
            "basis": f"score_delta={delta:+.3f}; overwrite_protection=on",
        })

    candidate_id = item.get("candidate_pc_id")
    return {
        "decision_id": (
            f"review_queue:{item.get('sku') or 'unknown'}:"
            f"{status or 'unknown'}:{candidate_id or 'no_candidate'}"
        ),
        "source_agent": "review_queue",
        "decision_type": "pchome_match_review",
        "severity": _review_severity(status, item),
        "subject": {
            "sku": str(item.get("sku") or ""),
            "name": item.get("name") or "",
            "event_type": "pchome_match_review",
            "competitor_product_id": candidate_id or "",
            "competitor_product_name": item.get("candidate_pc_name") or "",
        },
        "evidence": evidence,
        "recommended_action": {
            "action": _review_action_code(status),
            "owner": "營運",
            "requires_hitl": True,
        },
        "expected_impact": {
            "gap_amount": gap_amount,
            "candidate_gap_pct": gap_pct,
            "unit_price_insight": insight if isinstance(insight, dict) else {},
            "existing_match_conflict": conflict if isinstance(conflict, dict) else {},
            "risk_reduction": (
                "medium"
                if status in {"rescore_accepted_current", "recoverable_low_score"}
                else "watch"
            ),
        },
        "confidence": score,
        "guardrails": {
            "can_auto_execute": False,
            "blocked_reason": "PChome 候選需人工覆核;不得自動寫入正式 competitor_prices",
            "data_quality": _review_data_quality(status, item),
            "attempt_status": status,
            "existing_match_protected": bool(conflict),
            "match_type": item.get("match_type") or "",
            "price_basis": item.get("price_basis") or "",
            "alert_tier": item.get("alert_tier") or "",
        },
        "trace": {
            "source": "competitor_match_attempts",
            "attempted_at": item.get("attempted_at") or "",
        },
    }
def _decision_action_label(action_code: str) -> str:
    """Return the label for a decision action code (the code itself, or "人工覆核" if empty)."""
    key = action_code or ""
    return DECISION_ACTION_LABELS.get(key, key or "人工覆核")
def _data_quality_label(data_quality: str) -> str:
    """Return the label for a data-quality code (the code itself, or "證據部分完整" if empty)."""
    key = data_quality or ""
    return DATA_QUALITY_LABELS.get(key, key or "證據部分完整")
def summarize_review_decision_envelopes(
    review_queue: list[dict[str, Any]],
    limit: int = 5,
) -> dict[str, Any]:
    """Create a compact evidence brief for OpenClaw/PPT from shared envelopes.

    Args:
        review_queue: Formatted review-queue rows, each normally carrying a
            ``decision_envelope`` dict. Non-dict rows are skipped, and a missing
            or non-dict envelope (or envelope section) is treated as empty.
        limit: How many leading rows to summarise; clamped to 1..10.

    Returns:
        ``items`` (one dict per summarised row), numbered ``lines``, the joined
        ``text`` (or a placeholder when there is nothing to show), plus
        severity / data-quality / HITL / auto-execute-blocked counters over the
        summarised rows only.
    """
    limit = max(1, min(int(limit or 5), 10))

    def _as_dict(value: Any) -> dict[str, Any]:
        return value if isinstance(value, dict) else {}

    def _is_number(value: Any) -> bool:
        # bool is an int subclass; never format True/False as a percentage.
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    lines: list[str] = []
    items: list[dict[str, Any]] = []
    severity_counts: dict[str, int] = {}
    data_quality_counts: dict[str, int] = {}
    hitl_count = 0
    auto_execute_blocked_count = 0

    rows = [row for row in (review_queue or []) if isinstance(row, dict)]
    for idx, row in enumerate(rows[:limit], start=1):
        envelope = _as_dict(row.get("decision_envelope"))
        subject = _as_dict(envelope.get("subject"))
        guardrails = _as_dict(envelope.get("guardrails"))
        action = _as_dict(envelope.get("recommended_action"))
        expected = _as_dict(envelope.get("expected_impact"))
        evidence = envelope.get("evidence") if isinstance(envelope.get("evidence"), list) else []

        severity = str(envelope.get("severity") or "P4")
        data_quality = str(guardrails.get("data_quality") or "partial")
        action_code = str(action.get("action") or "human_review")
        action_label = _decision_action_label(action_code)
        quality_label = _data_quality_label(data_quality)
        # Missing flags default to the safe side: HITL required.
        requires_hitl = bool(action.get("requires_hitl", True))
        can_auto_execute = bool(guardrails.get("can_auto_execute"))
        sku = str(subject.get("sku") or row.get("sku") or "")
        name = str(subject.get("name") or row.get("name") or "")
        pchome_id = str(subject.get("competitor_product_id") or row.get("candidate_pc_id") or "")

        gap_pct = expected.get("candidate_gap_pct")
        gap_text = f"價差 {gap_pct:+.1f}%" if _is_number(gap_pct) else ""
        unit_insight = _as_dict(expected.get("unit_price_insight"))
        unit_gap_pct = unit_insight.get("unit_gap_pct")
        unit_label = unit_insight.get("unit_label") or ""
        unit_text = (
            f"單位價差 {unit_gap_pct:+.1f}%/{unit_label or '單位'}"
            if _is_number(unit_gap_pct)
            else ""
        )
        evidence_basis = next(
            (
                str(evidence_row.get("basis") or "")
                for evidence_row in evidence
                if isinstance(evidence_row, dict) and evidence_row.get("metric") == "match_score"
            ),
            "",
        )

        severity_counts[severity] = severity_counts.get(severity, 0) + 1
        data_quality_counts[data_quality] = data_quality_counts.get(data_quality, 0) + 1
        if requires_hitl:
            hitl_count += 1
        if not can_auto_execute:
            auto_execute_blocked_count += 1

        pchome_text = f"PChome {pchome_id}" if pchome_id else "無候選 ID"
        line_parts = [
            f"{idx}. [{severity}/{quality_label}{'/HITL' if requires_hitl else ''}]",
            f"SKU {sku}",
            name[:28],
            action_label,
            pchome_text,
            gap_text,
            unit_text,
            evidence_basis,
        ]
        line = " | ".join(part for part in line_parts if part)
        lines.append(line)
        items.append({
            "decision_id": envelope.get("decision_id") or "",
            "severity": severity,
            "sku": sku,
            "name": name,
            "competitor_product_id": pchome_id,
            "action": action_code,
            "action_label": action_label,
            "data_quality": data_quality,
            "data_quality_label": quality_label,
            "requires_hitl": requires_hitl,
            "can_auto_execute": can_auto_execute,
            "candidate_gap_pct": gap_pct,
            "unit_price_gap_pct": unit_gap_pct,
            "line": line,
        })

    return {
        "items": items,
        "lines": lines,
        "text": "\n".join(lines) if lines else "(目前沒有待覆核決策信封)",
        "severity_counts": severity_counts,
        "data_quality_counts": data_quality_counts,
        "hitl_count": hitl_count,
        "auto_execute_blocked_count": auto_execute_blocked_count,
    }
def _format_competitor_review_item(row: dict[str, Any]) -> dict[str, Any]:
    """Normalise one match-attempt row into the operator review-queue item shape.

    Adds the status/action labels, match type / price basis / alert tier (from
    ``match_diagnostic_json``, then ``match_type_`` / ``price_basis_`` /
    ``alert_tier_`` tags), diagnostic reason chips, any existing-match conflict
    parsed from ``error_message``, unit-price comparison data and, last, the
    shared ``decision_envelope`` built from the formatted item.
    """
    item = dict(row)  # work on a plain-dict copy of the incoming row
    attempt_status = item.get("attempt_status")
    unit_comparison = _build_unit_comparison_for_attempt(item)
    unit_price_insight = _build_unit_price_business_insight(unit_comparison, item)
    match_diagnostic = item.get("error_message") or ""
    diagnostic_payload = _parse_json_payload(item.get("match_diagnostic_json"))
    tags = _parse_tag_list(item.get("tags"))
    # Unit-comparable attempts default to "same pack differs" when nothing better is recorded.
    match_type = (
        diagnostic_payload.get("match_type")
        or _tag_suffix(tags, "match_type")
        or ("same_product_different_pack" if attempt_status in UNIT_COMPARABLE_STATUSES else "")
    )
    price_basis = diagnostic_payload.get("price_basis") or _tag_suffix(tags, "price_basis") or ""
    alert_tier = diagnostic_payload.get("alert_tier") or _tag_suffix(tags, "alert_tier") or ""
    evidence_flags = diagnostic_payload.get("evidence_flags") or []
    diagnostic_reasons = _extract_match_diagnostic_reasons(match_diagnostic, diagnostic_payload)
    existing_match_conflict = _parse_existing_match_conflict(match_diagnostic)
    formatted = {
        "sku": str(item.get("sku") or ""),
        "name": item.get("name") or "",
        "category": item.get("category") or "",
        "momo_price": _num(item.get("momo_price")),
        "attempt_status": attempt_status or "",
        "status_label": _attempt_status_label(attempt_status),
        "action_label": _attempt_action_label(attempt_status),
        "candidate_count": int(item.get("candidate_count") or 0),
        "candidate_pc_id": item.get("best_competitor_product_id"),
        "candidate_pc_name": item.get("best_competitor_product_name") or "",
        "candidate_pc_price": _num(item.get("best_competitor_price")),
        "best_match_score": _num(item.get("best_match_score")),
        "match_diagnostic": match_diagnostic,
        "match_type": match_type,
        "match_type_label": MATCH_TYPE_LABELS.get(match_type, match_type or "待判讀"),
        "price_basis": price_basis,
        "price_basis_label": PRICE_BASIS_LABELS.get(price_basis, price_basis or "待判讀"),
        "alert_tier": alert_tier,
        "alert_tier_label": ALERT_TIER_LABELS.get(alert_tier, alert_tier or "待判讀"),
        "evidence_flags": list(evidence_flags) if isinstance(evidence_flags, list) else [],
        "diagnostic_reasons": diagnostic_reasons,
        # Separate labels so several reasons stay readable (e.g. "品牌不符、容量差異").
        "diagnostic_reason_text": "、".join(reason["label"] for reason in diagnostic_reasons),
        "existing_match_conflict": existing_match_conflict,
        "attempted_at": _date_label(item.get("attempted_at")),
        "unit_comparison": unit_comparison,
        "unit_price_insight": unit_price_insight,
    }
    formatted["decision_envelope"] = _build_review_decision_envelope(formatted)
    return formatted
def clear_competitor_intel_cache() -> None:
    """Clear cached PChome/MOMO intelligence after crawler/import updates.

    Empties the in-process cache and deletes the shared pickle file. A missing
    file or an OS error while deleting it is ignored.
    """
    with _CACHE_LOCK:
        _MEM_CACHE.clear()
        try:
            _CACHE_FILE.unlink(missing_ok=True)
        except OSError:
            pass
def _load_shared_cache() -> dict[str, dict[str, Any]]:
    """Load the cross-process cache dict from the pickle file.

    Returns {} when the file is absent, unreadable, or does not contain a dict.
    """
    if _CACHE_FILE.exists():
        try:
            # NOTE(review): pickle.load can execute code embedded in the file; this
            # assumes only this service can write the data/ directory - confirm.
            with _CACHE_FILE.open("rb") as handle:
                loaded = pickle.load(handle)
        except Exception:
            return {}
        return loaded if isinstance(loaded, dict) else {}
    return {}
def _write_shared_cache(payload: dict[str, dict[str, Any]]) -> None:
    """Persist *payload* to the shared pickle cache file (best effort).

    The payload is pickled into a per-process temp file next to ``_CACHE_FILE``
    and then moved into place with ``os.replace``, so the cache file is only
    ever swapped in whole. Any failure is swallowed and the temp file removed.
    """
    # Built outside the try so the cleanup branch can always reference it.
    tmp_file = _CACHE_FILE.with_suffix(f".{os.getpid()}.tmp")
    try:
        tmp_file.parent.mkdir(parents=True, exist_ok=True)
        with tmp_file.open("wb") as handle:
            pickle.dump(payload, handle, protocol=pickle.HIGHEST_PROTOCOL)
        os.replace(tmp_file, _CACHE_FILE)
    except Exception:
        # Deliberate best effort: a failed cache write must not fail the caller.
        try:
            tmp_file.unlink(missing_ok=True)
        except OSError:
            pass
def _cached_payload(cache_key: str, producer, ttl_seconds: int = COMPETITOR_INTEL_CACHE_TTL_SECONDS):
    """Return ``producer()``'s value, cached in memory and in the shared pickle file.

    An entry is fresh while it is younger than *ttl_seconds*; ``ttl_seconds <= 0``
    bypasses caching entirely. On a miss, *producer* runs outside the lock, and
    the new entry is stored in both layers. Shared-file entries older than
    ``max(4 * ttl, 3600)`` seconds are pruned on each write.
    """
    if ttl_seconds <= 0:
        return producer()

    now = time.time()

    def is_fresh(entry) -> bool:
        return bool(entry) and now - float(entry.get("time", 0)) < ttl_seconds

    with _CACHE_LOCK:
        cached = _MEM_CACHE.get(cache_key)
        if is_fresh(cached):
            return cached.get("value")
        cached = _load_shared_cache().get(cache_key)
        if is_fresh(cached):
            _MEM_CACHE[cache_key] = cached
            return cached.get("value")

    value = producer()
    new_entry = {"time": now, "value": value}
    cutoff = now - max(ttl_seconds * 4, 3600)
    with _CACHE_LOCK:
        _MEM_CACHE[cache_key] = new_entry
        shared = _load_shared_cache()
        shared[cache_key] = new_entry
        retained = {
            key: entry
            for key, entry in shared.items()
            if isinstance(entry, dict) and float(entry.get("time", 0)) >= cutoff
        }
        _write_shared_cache(retained)
    return value
def fetch_competitor_coverage(engine) -> dict:
    """Return cached PChome coverage and review-queue statistics for *engine*.

    The cache key embeds the score floor and feature flags, so changing them
    naturally invalidates older cached payloads.
    """
    cache_key = (
        f"coverage:v9:floor={PCHOME_MATCH_SCORE_FLOOR}"
        ":manual_reviews=1:rescore=1:review_no_fresh=1:decision_ready=1:open_queue=1"
    )
    return _cached_payload(cache_key, lambda: _fetch_competitor_coverage_uncached(engine))
def _coverage_payload(
    *,
    active: int,
    valid: int,
    fresh: int,
    stale: int,
    pending: int,
    last_decision_ready_crawled_at: Any,
    statuses: dict[str, int],
    manual_review_summary: dict[str, Any],
) -> dict[str, Any]:
    """Shape raw coverage counters into the payload returned by the coverage fetch."""
    def pct(numerator: int, denominator: int) -> float:
        return round(numerator / max(denominator, 1) * 100, 1)

    return {
        "active_with_price": active,
        "valid_matches": valid,
        "fresh_matches": fresh,
        "stale_matches": stale,
        "pending": pending,
        "decision_ready_matches": fresh,
        "identity_coverage_matches": valid,
        "identity_coverage_rate": pct(valid, active),
        "pending_identity_count": pending,
        "stale_identity_count": stale,
        "match_rate": pct(valid, active),
        "fresh_match_rate": pct(fresh, valid),
        "decision_ready_rate": pct(fresh, active),
        "last_decision_ready_crawled_at": last_decision_ready_crawled_at,
        "attempt_status": statuses,
        "unit_comparable_count": sum(statuses.get(s, 0) for s in UNIT_COMPARABLE_STATUSES),
        "rescore_accepted_count": int(statuses.get("rescore_accepted_current") or 0),
        "actionable_review_count": sum(statuses.get(s, 0) for s in ACTIONABLE_ATTEMPT_STATUSES),
        "manual_closed_count": sum(statuses.get(s, 0) for s in MANUAL_CLOSED_ATTEMPT_STATUSES),
        "manual_review_summary": manual_review_summary,
        "manual_review_total": manual_review_summary["total"],
        "manual_accept_count": manual_review_summary["accept_identity"],
        "manual_reject_count": manual_review_summary["reject_identity"],
        "manual_unit_price_count": manual_review_summary["unit_price_required"],
        "manual_accept_rate": manual_review_summary["accept_rate"],
        "match_score_floor": PCHOME_MATCH_SCORE_FLOOR,
    }


def _fetch_competitor_coverage_uncached(engine) -> dict:
    """Read current PChome price-comparison coverage and the open review breakdown.

    Coverage is computed over ACTIVE products that have a latest MOMO price:
    identity matches (PChome rows with positive price, score >= floor and the
    ``identity_v2`` tag), fresh matches (not expired), stale matches and pending
    products. The ``attempt_status`` breakdown counts products without a fresh
    match by their latest PChome match attempt ("never_attempted" if none).
    Returns a zeroed payload when ``competitor_prices`` does not exist.
    """
    inspector = inspect(engine)
    manual_review_summary = _empty_manual_review_summary()
    if inspector.has_table("competitor_match_reviews"):
        manual_review_summary = _fetch_manual_review_summary(engine)
    if not inspector.has_table("competitor_prices"):
        return _coverage_payload(
            active=0,
            valid=0,
            fresh=0,
            stale=0,
            pending=0,
            last_decision_ready_crawled_at=None,
            statuses={},
            manual_review_summary=manual_review_summary,
        )

    if inspector.has_table("competitor_match_attempts"):
        attempt_cte = """
            latest_attempt AS (
                SELECT DISTINCT ON (sku)
                    sku,
                    attempt_status
                FROM competitor_match_attempts
                WHERE source = 'pchome'
                ORDER BY sku, attempted_at DESC NULLS LAST
            )
        """
    else:
        # Empty stand-in so the query shape is identical without the table.
        attempt_cte = """
            latest_attempt AS (
                SELECT
                    NULL AS sku,
                    NULL AS attempt_status
                WHERE FALSE
            )
        """

    # coverage_totals always yields exactly one row. LEFT JOINing the status
    # breakdown onto it keeps the totals even when no product lacks a fresh
    # match (a bare GROUP BY over an empty set would return zero rows).
    sql = text(f"""
        WITH latest_momo AS (
            SELECT
                p.id AS product_id,
                p.i_code AS sku,
                latest_price.price AS momo_price
            FROM products p
            JOIN LATERAL (
                SELECT pr.price
                FROM price_records pr
                WHERE pr.product_id = p.id
                ORDER BY pr.timestamp DESC, pr.id DESC
                LIMIT 1
            ) latest_price ON TRUE
            WHERE p.status = 'ACTIVE'
        ),
        identity_competitor AS (
            SELECT DISTINCT ON (cp.sku)
                cp.sku,
                cp.expires_at,
                cp.crawled_at
            FROM competitor_prices cp
            WHERE cp.source = 'pchome'
              AND cp.price IS NOT NULL
              AND cp.price > 0
              AND COALESCE(cp.match_score, 0) >= {PCHOME_MATCH_SCORE_FLOOR}
              AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
            ORDER BY cp.sku, cp.crawled_at DESC NULLS LAST
        ),
        fresh_competitor AS (
            SELECT sku, crawled_at
            FROM identity_competitor
            WHERE expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP
        ),
        {attempt_cte},
        coverage_totals AS (
            SELECT
                (SELECT COUNT(*) FROM latest_momo) AS active_with_price,
                (SELECT COUNT(*)
                   FROM latest_momo lm
                   JOIN identity_competitor ic ON ic.sku = lm.sku) AS valid_matches,
                (SELECT COUNT(*)
                   FROM latest_momo lm
                   JOIN fresh_competitor fc ON fc.sku = lm.sku) AS fresh_matches,
                (SELECT COUNT(*)
                   FROM latest_momo lm
                   JOIN identity_competitor ic ON ic.sku = lm.sku
                   LEFT JOIN fresh_competitor fc ON fc.sku = lm.sku
                  WHERE fc.sku IS NULL) AS stale_matches,
                (SELECT COUNT(*)
                   FROM latest_momo lm
                   LEFT JOIN identity_competitor ic ON ic.sku = lm.sku
                  WHERE ic.sku IS NULL) AS pending,
                (SELECT MAX(fc.crawled_at)
                   FROM latest_momo lm
                   JOIN fresh_competitor fc ON fc.sku = lm.sku) AS last_decision_ready_crawled_at
        ),
        open_status_counts AS (
            SELECT
                COALESCE(la.attempt_status, 'never_attempted') AS attempt_status,
                COUNT(*) AS status_count
            FROM latest_momo lm
            LEFT JOIN fresh_competitor fc ON fc.sku = lm.sku
            LEFT JOIN latest_attempt la ON la.sku = lm.sku
            WHERE fc.sku IS NULL
            GROUP BY COALESCE(la.attempt_status, 'never_attempted')
        )
        SELECT
            ct.active_with_price,
            ct.valid_matches,
            ct.fresh_matches,
            ct.stale_matches,
            ct.pending,
            ct.last_decision_ready_crawled_at,
            osc.attempt_status,
            osc.status_count
        FROM coverage_totals ct
        LEFT JOIN open_status_counts osc ON TRUE
    """)
    with engine.connect() as conn:
        rows = conn.execute(sql).mappings().all()

    first = rows[0] if rows else {}
    # A NULL attempt_status only appears on the placeholder row of an empty breakdown.
    statuses = {
        str(row.get("attempt_status")): int(row.get("status_count") or 0)
        for row in rows
        if row.get("attempt_status") is not None
    }
    return _coverage_payload(
        active=int(first.get("active_with_price") or 0),
        valid=int(first.get("valid_matches") or 0),
        fresh=int(first.get("fresh_matches") or 0),
        stale=int(first.get("stale_matches") or 0),
        pending=int(first.get("pending") or 0),
        last_decision_ready_crawled_at=first.get("last_decision_ready_crawled_at"),
        statuses=statuses,
        manual_review_summary=manual_review_summary,
    )
def _fetch_manual_review_summary(engine) -> dict[str, Any]:
    """Count the latest manual PChome review decision per candidate pairing.

    Only the newest review row per (sku, source, candidate_product_id) is
    counted, grouped by ``review_action``. Actions that are not keys of the
    empty summary template are ignored. ``total`` sums the counts of every
    action in ``MANUAL_REVIEW_ACTION_LABELS`` and ``accept_rate`` is the
    percentage of ``accept_identity`` over that total (one decimal).

    If the query raises for any reason, the empty summary is returned
    unchanged (best-effort; the dashboard should not break on it).
    """
    latest_review_counts_sql = text("""
        WITH latest_reviews AS (
            SELECT DISTINCT ON (sku, source, candidate_product_id)
                sku,
                source,
                candidate_product_id,
                review_action,
                reviewed_at
            FROM competitor_match_reviews
            WHERE source = 'pchome'
            ORDER BY sku, source, candidate_product_id, reviewed_at DESC, id DESC
        )
        SELECT
            review_action,
            COUNT(*) AS action_count
        FROM latest_reviews
        GROUP BY review_action
    """)
    try:
        with engine.connect() as conn:
            action_rows = conn.execute(latest_review_counts_sql).mappings().all()
    except Exception:
        # Deliberate best-effort fallback: any query failure yields the empty summary.
        return _empty_manual_review_summary()

    summary = _empty_manual_review_summary()
    for action_row in action_rows:
        review_action = str(action_row.get("review_action") or "")
        if review_action not in summary:
            continue
        summary[review_action] = int(action_row.get("action_count") or 0)

    summary["total"] = sum(
        int(summary.get(label_key) or 0) for label_key in MANUAL_REVIEW_ACTION_LABELS
    )
    accepted = summary["accept_identity"]
    denominator = max(summary["total"], 1)  # avoid division by zero when nothing was reviewed
    summary["accept_rate"] = round(accepted / denominator * 100, 1)
    return summary
def fetch_competitor_gap_trend(engine, days: int = 30) -> dict:
    """Return the cached daily PChome price-gap trend.

    *days* falls back to 30 when falsy and is clamped to the range 7..120;
    the clamped value and the match-score floor are part of the cache key.
    """
    window_days = int(days or 30)
    window_days = min(window_days, 120)
    window_days = max(7, window_days)
    cache_key = f"gap_trend:v2:days={window_days}:floor={PCHOME_MATCH_SCORE_FLOOR}"

    def _load() -> dict:
        return _fetch_competitor_gap_trend_uncached(engine, days=window_days)

    return _cached_payload(cache_key, _load)
def _fetch_competitor_gap_trend_uncached(engine, days: int = 30) -> dict:
    """Compute the PChome price-gap pressure trend for the last *days* days.

    For every day and SKU only the latest ``competitor_price_history`` row is
    used, and only rows with positive MOMO and PChome prices, a match score at
    or above ``PCHOME_MATCH_SCORE_FLOOR`` and the ``identity_v2`` tag qualify.
    Each day reports the match count, the average MOMO-vs-PChome gap in
    percent, how many SKUs are more than 5% pricier on MOMO (``risk_count``)
    and how many are more than 5% cheaper (``momo_advantage_count``).

    *days* is clamped to 7..120. Returns parallel lists; all of them are empty
    when the history table does not exist.
    """
    if not inspect(engine).has_table("competitor_price_history"):
        return {"labels": [], "avg_gap_pct": [], "risk_count": [], "momo_advantage_count": [], "match_count": []}
    window_days = max(7, min(int(days or 30), 120))
    trend_sql = text(f"""
        WITH latest_history AS (
            SELECT
                date_trunc('day', cph.crawled_at)::date AS bucket_date,
                cph.sku,
                cph.momo_price,
                cph.price AS pchome_price,
                ROW_NUMBER() OVER (
                    PARTITION BY date_trunc('day', cph.crawled_at)::date, cph.sku
                    ORDER BY cph.crawled_at DESC
                ) AS rn
            FROM competitor_price_history cph
            WHERE cph.source = 'pchome'
              AND cph.crawled_at >= CURRENT_DATE - (:days * INTERVAL '1 day')
              AND cph.momo_price IS NOT NULL
              AND cph.momo_price > 0
              AND cph.price IS NOT NULL
              AND cph.price > 0
              AND COALESCE(cph.match_score, 0) >= {PCHOME_MATCH_SCORE_FLOOR}
              AND COALESCE(cph.tags, '[]'::jsonb) ? 'identity_v2'
        )
        SELECT
            bucket_date,
            COUNT(*) AS match_count,
            ROUND(AVG((momo_price - pchome_price) / pchome_price * 100)::numeric, 2) AS avg_gap_pct,
            SUM(CASE WHEN momo_price > pchome_price * 1.05 THEN 1 ELSE 0 END) AS risk_count,
            SUM(CASE WHEN momo_price < pchome_price * 0.95 THEN 1 ELSE 0 END) AS momo_advantage_count
        FROM latest_history
        WHERE rn = 1
        GROUP BY bucket_date
        ORDER BY bucket_date
    """)
    with engine.connect() as conn:
        daily_rows = conn.execute(trend_sql, {"days": window_days}).mappings().all()

    labels: list = []
    avg_gap_pct: list = []
    risk_count: list = []
    momo_advantage_count: list = []
    match_count: list = []
    for daily in daily_rows:
        labels.append(_date_label(daily.get("bucket_date")))
        avg_gap_pct.append(_num(daily.get("avg_gap_pct")))
        risk_count.append(int(daily.get("risk_count") or 0))
        momo_advantage_count.append(int(daily.get("momo_advantage_count") or 0))
        match_count.append(int(daily.get("match_count") or 0))
    return {
        "labels": labels,
        "avg_gap_pct": avg_gap_pct,
        "risk_count": risk_count,
        "momo_advantage_count": momo_advantage_count,
        "match_count": match_count,
    }
def fetch_competitor_monthly_pressure(engine, months: int = 12) -> dict:
    """Return the cached monthly PChome price-pressure series.

    *months* falls back to 12 when falsy and is clamped to the range 3..36;
    the clamped value and the match-score floor are part of the cache key.
    """
    window_months = int(months or 12)
    window_months = min(window_months, 36)
    window_months = max(3, window_months)
    cache_key = f"monthly_pressure:v2:months={window_months}:floor={PCHOME_MATCH_SCORE_FLOOR}"

    def _load() -> dict:
        return _fetch_competitor_monthly_pressure_uncached(engine, months=window_months)

    return _cached_payload(cache_key, _load)
def _fetch_competitor_monthly_pressure_uncached(engine, months: int = 12) -> dict:
    """Compute monthly competitor price pressure, used by growth analysis.

    For every month and SKU only the latest qualifying
    ``competitor_price_history`` row is used. A row qualifies when it has
    positive MOMO and PChome prices, a match score at or above
    ``PCHOME_MATCH_SCORE_FLOOR`` and the ``identity_v2`` tag. Each month
    reports the match count, the average MOMO-vs-PChome gap in percent and
    how many SKUs are more than 5% pricier on MOMO (``risk_count``).

    *months* is clamped to 3..36. Returns parallel lists; all of them are empty
    when the history table does not exist.
    """
    if not inspect(engine).has_table("competitor_price_history"):
        return {"labels": [], "avg_gap_pct": [], "risk_count": [], "match_count": []}
    window_months = max(3, min(int(months or 12), 36))
    monthly_sql = text(f"""
        WITH latest_history AS (
            SELECT
                date_trunc('month', cph.crawled_at)::date AS bucket_month,
                cph.sku,
                cph.momo_price,
                cph.price AS pchome_price,
                ROW_NUMBER() OVER (
                    PARTITION BY date_trunc('month', cph.crawled_at)::date, cph.sku
                    ORDER BY cph.crawled_at DESC
                ) AS rn
            FROM competitor_price_history cph
            WHERE cph.source = 'pchome'
              AND cph.crawled_at >= date_trunc('month', CURRENT_DATE) - (:months * INTERVAL '1 month')
              AND cph.momo_price IS NOT NULL
              AND cph.momo_price > 0
              AND cph.price IS NOT NULL
              AND cph.price > 0
              AND COALESCE(cph.match_score, 0) >= {PCHOME_MATCH_SCORE_FLOOR}
              AND COALESCE(cph.tags, '[]'::jsonb) ? 'identity_v2'
        )
        SELECT
            bucket_month,
            COUNT(*) AS match_count,
            ROUND(AVG((momo_price - pchome_price) / pchome_price * 100)::numeric, 2) AS avg_gap_pct,
            SUM(CASE WHEN momo_price > pchome_price * 1.05 THEN 1 ELSE 0 END) AS risk_count
        FROM latest_history
        WHERE rn = 1
        GROUP BY bucket_month
        ORDER BY bucket_month
    """)
    with engine.connect() as conn:
        monthly_rows = conn.execute(monthly_sql, {"months": window_months}).mappings().all()

    labels: list = []
    avg_gap_pct: list = []
    risk_count: list = []
    match_count: list = []
    for monthly in monthly_rows:
        labels.append(_month_label(monthly.get("bucket_month")))
        avg_gap_pct.append(_num(monthly.get("avg_gap_pct")))
        risk_count.append(int(monthly.get("risk_count") or 0))
        match_count.append(int(monthly.get("match_count") or 0))
    return {
        "labels": labels,
        "avg_gap_pct": avg_gap_pct,
        "risk_count": risk_count,
        "match_count": match_count,
    }
def fetch_top_competitor_risks(engine, limit: int = 10) -> list[dict]:
    """Return the cached list of SKUs where MOMO is priced well above PChome.

    *limit* falls back to 10 when falsy and is clamped to 1..50; the clamped
    value and the match-score floor are part of the cache key.
    """
    row_limit = int(limit or 10)
    row_limit = min(row_limit, 50)
    row_limit = max(1, row_limit)
    cache_key = f"top_risks:v2:limit={row_limit}:floor={PCHOME_MATCH_SCORE_FLOOR}"

    def _load() -> list[dict]:
        return _fetch_top_competitor_risks_uncached(engine, limit=row_limit)

    return _cached_payload(cache_key, _load)
def _fetch_top_competitor_risks_uncached(engine, limit: int = 10) -> list[dict]:
    """List active products that are currently pricier on MOMO than on PChome.

    Each product's latest MOMO price (from ``price_records``) is compared with
    its most recent valid PChome match in ``competitor_prices``. A match is
    valid when it is not expired, has a positive price, a score at or above
    ``PCHOME_MATCH_SCORE_FLOOR`` and the ``identity_v2`` tag. Only products
    whose MOMO price exceeds the PChome price by more than 5% are returned,
    ordered by gap percentage and then gap amount (both descending).

    Args:
        engine: SQLAlchemy engine.
        limit: Maximum number of rows; clamped to 1..50.

    Returns:
        A list of dicts with prices, gap figures, PChome match details and
        labelled match/price-basis/alert-tier classifications. Returns ``[]``
        when any table the query reads is missing.
    """
    inspector = inspect(engine)
    # The query JOINs products and price_records as well, so all three tables
    # must exist (consistent with the review-queue queries); otherwise the
    # SQL would raise instead of degrading to an empty result.
    if not (
        inspector.has_table("competitor_prices")
        and inspector.has_table("products")
        and inspector.has_table("price_records")
    ):
        return []
    limit = max(1, min(int(limit or 10), 50))
    sql = text(f"""
        WITH valid_competitor AS (
            SELECT DISTINCT ON (cp.sku)
                cp.sku,
                cp.price AS pchome_price,
                cp.competitor_product_id,
                cp.competitor_product_name,
                cp.match_score,
                cp.tags,
                cp.match_diagnostic_json,
                cp.comparison_mode,
                cp.crawled_at
            FROM competitor_prices cp
            WHERE cp.source = 'pchome'
              AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
              AND cp.price IS NOT NULL
              AND cp.price > 0
              AND COALESCE(cp.match_score, 0) >= {PCHOME_MATCH_SCORE_FLOOR}
              AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
            ORDER BY cp.sku, cp.crawled_at DESC NULLS LAST
        )
        SELECT
            p.i_code AS sku,
            p.name,
            p.category,
            latest_price.momo_price,
            vc.pchome_price,
            vc.competitor_product_id,
            vc.competitor_product_name,
            vc.match_score,
            vc.tags,
            vc.match_diagnostic_json,
            vc.comparison_mode,
            vc.crawled_at,
            (latest_price.momo_price - vc.pchome_price) AS gap_amount,
            ((latest_price.momo_price - vc.pchome_price) / vc.pchome_price * 100) AS gap_pct
        FROM valid_competitor vc
        JOIN products p
          ON p.i_code = vc.sku
         AND p.status = 'ACTIVE'
        JOIN LATERAL (
            SELECT pr.price AS momo_price
            FROM price_records pr
            WHERE pr.product_id = p.id
            ORDER BY pr.timestamp DESC, pr.id DESC
            LIMIT 1
        ) latest_price ON TRUE
        WHERE latest_price.momo_price > vc.pchome_price * 1.05
        ORDER BY gap_pct DESC NULLS LAST, gap_amount DESC NULLS LAST
        LIMIT :limit
    """)
    with engine.connect() as conn:
        rows = conn.execute(sql, {"limit": limit}).mappings().all()
    result = []
    for row in rows:
        diagnostic_payload = _parse_json_payload(row.get("match_diagnostic_json"))
        tags = _parse_tag_list(row.get("tags"))
        # Prefer the structured diagnostic payload; fall back to "<prefix>:<value>"-style tags.
        match_type = diagnostic_payload.get("match_type") or _tag_suffix(tags, "match_type")
        price_basis = diagnostic_payload.get("price_basis") or _tag_suffix(tags, "price_basis")
        alert_tier = diagnostic_payload.get("alert_tier") or _tag_suffix(tags, "alert_tier")
        result.append({
            "sku": str(row.get("sku") or ""),
            "name": row.get("name") or "",
            "category": row.get("category") or "",
            "momo_price": _num(row.get("momo_price")),
            "pchome_price": _num(row.get("pchome_price")),
            "gap_amount": _num(row.get("gap_amount")),
            "gap_pct": _num(row.get("gap_pct")),
            "match_score": _num(row.get("match_score")),
            "pchome_id": row.get("competitor_product_id"),
            "pchome_name": row.get("competitor_product_name") or "",
            "match_type": match_type,
            "match_type_label": MATCH_TYPE_LABELS.get(match_type, match_type or "待判讀"),
            "price_basis": price_basis,
            "price_basis_label": PRICE_BASIS_LABELS.get(price_basis, price_basis or "待判讀"),
            "alert_tier": alert_tier,
            "alert_tier_label": ALERT_TIER_LABELS.get(alert_tier, alert_tier or "待判讀"),
            "crawled_at": _date_label(row.get("crawled_at")),
        })
    return result
def fetch_competitor_review_queue(engine, limit: int = 12) -> list[dict]:
    """Return the cached, actionable PChome match review queue.

    Shared by the Dashboard, AI summaries and PPT export. *limit* falls back
    to 12 when falsy and is clamped to 1..50.
    """
    row_limit = int(limit or 12)
    row_limit = min(row_limit, 50)
    row_limit = max(1, row_limit)
    cache_key = f"review_queue:v3:limit={row_limit}:floor={PCHOME_MATCH_SCORE_FLOOR}"

    def _load() -> list[dict]:
        return _fetch_competitor_review_queue_uncached(engine, limit=row_limit)

    return _cached_payload(cache_key, _load)
def fetch_competitor_review_queue_page(
    engine,
    page: int = 1,
    per_page: int = 50,
    search_query: str = "",
    category: str = "",
    status_filter: str = "",
    count_total: bool = True,
) -> dict:
    """Return one cached page of the PChome review queue for Dashboard pages.

    Inputs are normalized before caching:
    - *page* is at least 1.
    - *per_page* is clamped to 1..100.
    - Text filters are stripped.
    - An unknown *status_filter* is reset to "" (the default actionable set).

    Entries are cached for at most 300 seconds, or for
    ``COMPETITOR_INTEL_CACHE_TTL_SECONDS`` if that is shorter.
    """
    page = max(1, int(page or 1))
    per_page = max(1, min(int(per_page or 50), 100))
    search_query = (search_query or "").strip()
    category = (category or "").strip()
    status_filter = (status_filter or "").strip()
    status_filter = status_filter if status_filter in REVIEW_STATUS_FILTER_GROUPS else ""

    key_parts = [
        "review_queue_page:v3",
        f"page={page}",
        f"per={per_page}",
        f"q={search_query.lower()}",
        f"cat={category}",
        f"status={status_filter}",
        f"count={int(bool(count_total))}",
        f"floor={PCHOME_MATCH_SCORE_FLOOR}",
    ]
    cache_key = ":".join(key_parts)

    def _load() -> dict:
        return _fetch_competitor_review_queue_page_uncached(
            engine,
            page=page,
            per_page=per_page,
            search_query=search_query,
            category=category,
            status_filter=status_filter,
            count_total=count_total,
        )

    return _cached_payload(
        cache_key,
        _load,
        ttl_seconds=min(COMPETITOR_INTEL_CACHE_TTL_SECONDS, 300),
    )
def _review_queue_cte_and_filter(
    search_query: str = "",
    category: str = "",
    status_filter: str = "",
) -> tuple[str, dict[str, Any]]:
    """Build the shared review-queue CTE SQL and its bind parameters.

    The returned SQL defines two CTEs. ``latest_attempt`` holds the newest
    PChome match attempt per SKU. ``review_rows`` holds the rows that pass all
    of these filters:
    - the SKU belongs to an ACTIVE product;
    - the product's latest MOMO price comes from ``price_records``;
    - the attempt status is in the selected status group;
    - the SKU has no currently valid ``identity_v2`` PChome match.

    Each row also gets a ``priority_rank``. Callers append the final SELECT.

    Args:
        search_query: Optional case-insensitive substring matched against the
            product name or i_code (bound as ``:search_like``).
        category: Optional exact category (bound as ``:category``).
        status_filter: Key of ``REVIEW_STATUS_FILTER_GROUPS``. An unknown or
            empty key falls back to ``ACTIONABLE_ATTEMPT_STATUSES``.

    Returns:
        ``(cte_sql, bind_params)``.
    """
    bind_params: dict[str, Any] = {}
    group_key = (status_filter or "").strip()
    wanted_statuses = REVIEW_STATUS_FILTER_GROUPS.get(group_key) or tuple(ACTIONABLE_ATTEMPT_STATUSES)
    # Status values come from module constants, never from the caller, so
    # inlining them as SQL literals does not expose user input.
    quoted_statuses = ", ".join(f"'{value}'" for value in wanted_statuses)
    # Exclude SKUs that already have a currently valid PChome match.
    no_valid_match_clause = f"""NOT EXISTS (
            SELECT 1
            FROM competitor_prices cp
            WHERE cp.source = 'pchome'
              AND cp.sku = la.sku
              AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
              AND cp.price IS NOT NULL
              AND cp.price > 0
              AND COALESCE(cp.match_score, 0) >= {PCHOME_MATCH_SCORE_FLOOR}
              AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
        )"""
    conditions = [f"la.attempt_status IN ({quoted_statuses})", no_valid_match_clause]
    if search_query:
        bind_params["search_like"] = f"%{search_query.lower()}%"
        conditions.append("(LOWER(p.name) LIKE :search_like OR LOWER(p.i_code) LIKE :search_like)")
    if category:
        bind_params["category"] = category
        conditions.append("p.category = :category")
    where_clause = "\n              AND ".join(conditions)
    cte_sql = f"""
        WITH latest_attempt AS (
            SELECT DISTINCT ON (cma.sku)
                cma.sku,
                cma.attempt_status,
                cma.candidate_count,
                cma.best_competitor_product_id,
                cma.best_competitor_product_name,
                cma.best_competitor_price,
                cma.best_match_score,
                cma.match_diagnostic_json,
                cma.error_message,
                cma.attempted_at
            FROM competitor_match_attempts cma
            WHERE cma.source = 'pchome'
            ORDER BY cma.sku, cma.attempted_at DESC NULLS LAST
        ),
        review_rows AS (
            SELECT
                p.i_code AS sku,
                p.name,
                p.category,
                latest_price.price AS momo_price,
                la.attempt_status,
                la.candidate_count,
                la.best_competitor_product_id,
                la.best_competitor_product_name,
                la.best_competitor_price,
                la.best_match_score,
                la.match_diagnostic_json,
                la.error_message,
                la.attempted_at,
                CASE
                    WHEN la.attempt_status = 'rescore_accepted_current' THEN 0
                    WHEN la.attempt_status IN ('unit_comparable', 'refresh_unit_comparable') THEN 1
                    WHEN la.attempt_status = 'identity_veto' THEN 2
                    WHEN la.attempt_status IN ('recoverable_low_score', 'low_score', 'refresh_low_score') THEN 3
                    WHEN la.attempt_status = 'protected_existing_match' THEN 4
                    WHEN la.attempt_status = 'true_low_confidence' THEN 5
                    WHEN la.attempt_status = 'expired_match' THEN 6
                    ELSE 7
                END AS priority_rank
            FROM latest_attempt la
            JOIN products p
              ON p.i_code = la.sku
             AND p.status = 'ACTIVE'
            JOIN LATERAL (
                SELECT pr.price
                FROM price_records pr
                WHERE pr.product_id = p.id
                ORDER BY pr.timestamp DESC, pr.id DESC
                LIMIT 1
            ) latest_price ON TRUE
            WHERE {where_clause}
        )
    """
    return cte_sql, bind_params
def _fetch_competitor_review_queue_page_uncached(
    engine,
    page: int = 1,
    per_page: int = 50,
    search_query: str = "",
    category: str = "",
    status_filter: str = "",
    count_total: bool = True,
) -> dict:
    """Run one page of the PChome review-queue query without caching.

    Args:
        engine: SQLAlchemy engine.
        page: 1-based page number (values below 1 become 1).
        per_page: Page size, clamped to 1..100.
        search_query: Optional name/i_code substring filter.
        category: Optional exact category filter.
        status_filter: Status group key (see ``_review_queue_cte_and_filter``).
        count_total: When True, also compute the total number of matching rows.

    Returns:
        A dict with ``items``, ``total``, ``page``, ``per_page`` and
        ``status_filter``.
        - ``total`` is -1 when ``count_total`` is False.
        - ``total`` is 0 when a required table is missing.
    """
    # Normalize paging before any return so every path reports the clamped
    # values (previously the missing-table path echoed the raw per_page).
    page = max(1, int(page or 1))
    per_page = max(1, min(int(per_page or 50), 100))
    inspector = inspect(engine)
    if not (
        inspector.has_table("products")
        and inspector.has_table("price_records")
        and inspector.has_table("competitor_prices")
        and inspector.has_table("competitor_match_attempts")
    ):
        return {
            "items": [],
            "total": 0,
            "page": page,
            "per_page": per_page,
            "status_filter": status_filter,
        }
    cte, params = _review_queue_cte_and_filter(
        search_query=search_query,
        category=category,
        status_filter=status_filter,
    )
    page_params = {
        **params,
        "limit": per_page,
        "offset": (page - 1) * per_page,
    }
    if count_total:
        # total_rows always produces exactly one row. The LEFT JOIN therefore
        # returns the count even when the requested page is past the end; in
        # that case the row carries a NULL sku and is dropped below.
        page_sql = text(cte + """
            , total_rows AS (
                SELECT COUNT(*) AS total_count
                FROM review_rows
            ),
            paged_rows AS (
                SELECT *
                FROM review_rows
                ORDER BY
                    priority_rank ASC,
                    momo_price DESC NULLS LAST,
                    best_match_score DESC NULLS LAST,
                    attempted_at DESC NULLS LAST
                LIMIT :limit OFFSET :offset
            )
            SELECT paged_rows.*, total_rows.total_count
            FROM total_rows
            LEFT JOIN paged_rows ON TRUE
        """)
    else:
        page_sql = text(cte + """
            SELECT *
            FROM review_rows
            ORDER BY
                priority_rank ASC,
                momo_price DESC NULLS LAST,
                best_match_score DESC NULLS LAST,
                attempted_at DESC NULLS LAST
            LIMIT :limit OFFSET :offset
        """)
    with engine.connect() as conn:
        rows = conn.execute(page_sql, page_params).mappings().all()
    total = int(rows[0].get("total_count") or 0) if count_total and rows else -1
    # Drop the NULL padding row produced by the total_rows LEFT JOIN.
    item_rows = [dict(row) for row in rows if row.get("sku")]
    return {
        "items": [_format_competitor_review_item(row) for row in item_rows],
        "total": total,
        "page": page,
        "per_page": per_page,
        "status_filter": status_filter,
    }
def _fetch_competitor_review_queue_uncached(engine, limit: int = 12) -> list[dict]:
    """Return the top actionable PChome review items without caching.

    The query reuses the shared ``_review_queue_cte_and_filter`` CTE with its
    default (actionable) status group, so the status list and priority
    ranking stay identical to the paginated review queue instead of being
    duplicated here. Rows are ordered by priority rank, then MOMO price, best
    match score and attempt time (all descending, NULLs last).

    Args:
        engine: SQLAlchemy engine.
        limit: Maximum number of items; clamped to 1..50.

    Returns:
        A list of formatted review items, or ``[]`` when a required table is
        missing.
    """
    inspector = inspect(engine)
    if not (
        inspector.has_table("products")
        and inspector.has_table("price_records")
        and inspector.has_table("competitor_prices")
        and inspector.has_table("competitor_match_attempts")
    ):
        return []
    limit = max(1, min(int(limit or 12), 50))
    cte, params = _review_queue_cte_and_filter()
    # Select the same column set the previous standalone query returned (i.e.
    # without priority_rank) so _format_competitor_review_item sees the same row shape.
    sql = text(cte + """
        SELECT
            sku,
            name,
            category,
            momo_price,
            attempt_status,
            candidate_count,
            best_competitor_product_id,
            best_competitor_product_name,
            best_competitor_price,
            best_match_score,
            match_diagnostic_json,
            error_message,
            attempted_at
        FROM review_rows
        ORDER BY
            priority_rank ASC,
            momo_price DESC NULLS LAST,
            best_match_score DESC NULLS LAST,
            attempted_at DESC NULLS LAST
        LIMIT :limit
    """)
    with engine.connect() as conn:
        rows = conn.execute(sql, {**params, "limit": limit}).mappings().all()
    return [_format_competitor_review_item(dict(row)) for row in rows]
def fetch_competitor_comparison_results(
    engine,
    start_date: Optional[Union[date, datetime, str]] = None,
    end_date: Optional[Union[date, datetime, str]] = None,
    limit: int = 30,
) -> list[dict]:
    """Return PChome comparison rows compatible with the legacy competitor PPT.

    Everything is read from the database; no live crawl is performed.

    Args:
        engine: SQLAlchemy engine used for table inspection and the query.
        start_date: Optional window start (date/datetime/str). It is
            stringified, "/" is replaced with "-", and only the first 10
            characters are kept.
        end_date: Optional window end, normalized like ``start_date``; treated
            as inclusive (``< end_date + 1 day``).
        limit: Maximum number of rows; clamped to 1..100.

    Returns:
        Dicts for active MOMO products whose latest price is > 0.
        - With ``daily_sales``: ordered by MOMO revenue in the window.
        - Without it: ordered by MOMO price.
        - Ties put matched rows first, then order by absolute gap percentage.

        Returns ``[]`` when a required table is missing. This includes a
        requested date window when ``competitor_price_history`` does not exist.
    """
    limit = max(1, min(int(limit or 30), 100))
    inspector = inspect(engine)
    if not (
        inspector.has_table("products")
        and inspector.has_table("price_records")
    ):
        return []
    start_date_param = str(start_date).replace("/", "-")[:10] if start_date else ""
    end_date_param = str(end_date).replace("/", "-")[:10] if end_date else ""
    # Any date bound switches the competitor side from the current-match table
    # (competitor_prices) to the history table; without history we return nothing
    # rather than silently mixing in current prices.
    requested_historical_prices = bool(start_date_param or end_date_param)
    use_history_prices = bool(requested_historical_prices and inspector.has_table("competitor_price_history"))
    if requested_historical_prices and not use_history_prices:
        return []
    if not (use_history_prices or inspector.has_table("competitor_prices")):
        return []
    has_daily_sales = inspector.has_table("daily_sales")
    has_match_attempts = inspector.has_table("competitor_match_attempts")
    # Optional SQL fragments, filled in below depending on which tables/params exist.
    sales_cte = ""
    sales_join = ""
    sales_select = "0 AS momo_revenue,"
    momo_price_cutoff = ""
    # Placeholder CTE with the same column names and no rows, so the final
    # LEFT JOIN on latest_attempt still works when competitor_match_attempts is absent.
    attempt_cte = """
        latest_attempt AS (
            SELECT
                NULL AS sku,
                NULL AS attempt_status,
                NULL AS candidate_count,
                NULL AS best_competitor_product_id,
                NULL AS best_competitor_product_name,
                NULL AS best_competitor_price,
                NULL AS best_match_score,
                NULL AS match_diagnostic_json,
                NULL AS error_message,
                NULL AS attempted_at
            WHERE FALSE
        )
    """
    # Default ordering: MOMO price, then matched rows first, then largest absolute gap.
    order_expr = (
        "lm.momo_price DESC NULLS LAST, "
        "(vc.pchome_price IS NULL), "
        "ABS((lm.momo_price - vc.pchome_price) / vc.pchome_price * 100) DESC NULLS LAST"
    )
    params: dict[str, Any] = {"limit": limit}
    if end_date_param:
        # With an end date, the "latest" MOMO price is the latest one recorded
        # on or before that day.
        params["end_date"] = end_date_param
        momo_price_cutoff = "AND pr.timestamp < DATE(:end_date) + INTERVAL '1 day'"
    if has_daily_sales:
        # Revenue per product inside the requested window (all-time when no dates).
        where = []
        if start_date_param:
            where.append("DATE(s.date) >= DATE(:start_date)")
            params["start_date"] = start_date_param
        if end_date_param:
            where.append("DATE(s.date) <= DATE(:end_date)")
        sales_where = "WHERE " + " AND ".join(where) if where else ""
        sales_cte = f""",
        sales_rank AS (
            SELECT
                s.product_id,
                SUM(COALESCE(s.revenue, 0)) AS momo_revenue
            FROM daily_sales s
            {sales_where}
            GROUP BY s.product_id
        )
        """
        sales_join = "LEFT JOIN sales_rank sr ON sr.product_id = lm.product_id"
        sales_select = "COALESCE(sr.momo_revenue, 0) AS momo_revenue,"
        # With sales data, rank primarily by MOMO revenue instead of MOMO price.
        order_expr = (
            "COALESCE(sr.momo_revenue, 0) DESC, "
            "(vc.pchome_price IS NULL), "
            "ABS((lm.momo_price - vc.pchome_price) / vc.pchome_price * 100) DESC NULLS LAST"
        )
    if has_match_attempts:
        # Latest PChome match attempt per SKU, used for diagnostics on unmatched rows.
        attempt_cte = """
        latest_attempt AS (
            SELECT DISTINCT ON (cma.sku)
                cma.sku,
                cma.attempt_status,
                cma.candidate_count,
                cma.best_competitor_product_id,
                cma.best_competitor_product_name,
                cma.best_competitor_price,
                cma.best_match_score,
                cma.match_diagnostic_json,
                cma.error_message,
                cma.attempted_at
            FROM competitor_match_attempts cma
            WHERE cma.source = 'pchome'
            ORDER BY cma.sku, cma.attempted_at DESC NULLS LAST
        )
        """
    if use_history_prices:
        # Latest qualifying history row per SKU crawled inside the date window.
        history_filters = [
            "cph.source = 'pchome'",
            "cph.price IS NOT NULL",
            "cph.price > 0",
            f"COALESCE(cph.match_score, 0) >= {PCHOME_MATCH_SCORE_FLOOR}",
            "COALESCE(cph.tags, '[]'::jsonb) ? 'identity_v2'",
        ]
        if start_date_param:
            params["start_date"] = start_date_param
            history_filters.append("cph.crawled_at >= DATE(:start_date)")
        if end_date_param:
            history_filters.append("cph.crawled_at < DATE(:end_date) + INTERVAL '1 day'")
        history_where = "\n              AND ".join(history_filters)
        valid_competitor_cte = f"""
        valid_competitor AS (
            SELECT DISTINCT ON (cph.sku)
                cph.sku,
                cph.price AS pchome_price,
                cph.competitor_product_id,
                cph.competitor_product_name,
                cph.match_score,
                cph.tags,
                cph.match_diagnostic_json,
                cph.comparison_mode,
                cph.crawled_at,
                'competitor_price_history' AS competitor_source
            FROM competitor_price_history cph
            WHERE {history_where}
            ORDER BY cph.sku, cph.crawled_at DESC NULLS LAST
        )
        """
    else:
        # No date window: most recent currently valid (non-expired) match per SKU.
        valid_competitor_cte = f"""
        valid_competitor AS (
            SELECT DISTINCT ON (cp.sku)
                cp.sku,
                cp.price AS pchome_price,
                cp.competitor_product_id,
                cp.competitor_product_name,
                cp.match_score,
                cp.tags,
                cp.match_diagnostic_json,
                cp.comparison_mode,
                cp.crawled_at,
                'competitor_prices' AS competitor_source
            FROM competitor_prices cp
            WHERE cp.source = 'pchome'
              AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
              AND cp.price IS NOT NULL
              AND cp.price > 0
              AND COALESCE(cp.match_score, 0) >= {PCHOME_MATCH_SCORE_FLOOR}
              AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
            ORDER BY cp.sku, cp.crawled_at DESC NULLS LAST
        )
        """
    # Assemble the final query. latest_momo ranks each active product's price
    # records newest-first; rn = 1 in the outer WHERE keeps only the latest one.
    # The fragments above are built from module constants and fixed strings;
    # user-supplied dates are only passed as bind parameters.
    sql = text(f"""
        WITH latest_momo AS (
            SELECT
                p.id AS product_id,
                p.i_code AS sku,
                p.name,
                pr.price AS momo_price,
                ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC, pr.id DESC) AS rn
            FROM products p
            JOIN price_records pr ON pr.product_id = p.id
            WHERE p.status = 'ACTIVE'
            {momo_price_cutoff}
        ),
        {valid_competitor_cte},
        {attempt_cte}
        {sales_cte}
        SELECT
            lm.sku,
            lm.name,
            lm.momo_price,
            vc.pchome_price,
            vc.competitor_product_id,
            vc.competitor_product_name,
            vc.match_score,
            vc.tags,
            vc.match_diagnostic_json,
            vc.comparison_mode,
            vc.crawled_at AS competitor_crawled_at,
            vc.competitor_source,
            la.attempt_status,
            la.candidate_count,
            la.best_competitor_product_id,
            la.best_competitor_product_name,
            la.best_competitor_price,
            la.best_match_score,
            la.match_diagnostic_json AS attempt_match_diagnostic_json,
            la.error_message,
            la.attempted_at,
            {sales_select}
            (lm.momo_price - vc.pchome_price) AS price_diff,
            ((lm.momo_price - vc.pchome_price) / vc.pchome_price * 100) AS price_diff_pct
        FROM latest_momo lm
        LEFT JOIN valid_competitor vc ON vc.sku = lm.sku
        LEFT JOIN latest_attempt la ON la.sku = lm.sku
        {sales_join}
        WHERE lm.rn = 1
          AND lm.momo_price > 0
        ORDER BY {order_expr}
        LIMIT :limit
    """)
    with engine.connect() as conn:
        rows = conn.execute(sql, params).mappings().all()
    results = []
    for row in rows:
        pchome_id = row.get("competitor_product_id")
        # A row counts as matched when a (positive) PChome price was joined in.
        found = bool(row.get("pchome_price"))
        match_status = "matched" if found else (row.get("attempt_status") or "no_valid_match")
        # Prefer the matched row's diagnostic; fall back to the latest attempt's.
        diagnostic_payload = _parse_json_payload(
            row.get("match_diagnostic_json") or row.get("attempt_match_diagnostic_json")
        )
        tags = _parse_tag_list(row.get("tags"))
        match_type = diagnostic_payload.get("match_type") or _tag_suffix(tags, "match_type")
        price_basis = diagnostic_payload.get("price_basis") or _tag_suffix(tags, "price_basis")
        alert_tier = diagnostic_payload.get("alert_tier") or _tag_suffix(tags, "alert_tier")
        # Unit comparison is computed from the attempt's best candidate, not the matched row.
        unit_comparison = _build_unit_comparison_for_attempt({
            "attempt_status": match_status,
            "name": row.get("name") or "",
            "best_competitor_product_name": row.get("best_competitor_product_name") or "",
            "momo_price": row.get("momo_price"),
            "best_competitor_price": row.get("best_competitor_price"),
        })
        # Keys below mirror the legacy competitor PPT payload.
        results.append({
            "found": found,
            "momo_icode": str(row.get("sku") or ""),
            "momo_name": row.get("name") or "",
            "momo_price": _num(row.get("momo_price")),
            "pc_name": row.get("competitor_product_name") or "",
            "pc_price": _num(row.get("pchome_price")),
            "pc_url": f"https://24h.pchome.com.tw/prod/{pchome_id}" if pchome_id else "",
            "candidate_pc_id": row.get("best_competitor_product_id"),
            "candidate_pc_name": row.get("best_competitor_product_name") or "",
            "candidate_pc_price": _num(row.get("best_competitor_price")),
            "price_diff": _num(row.get("price_diff")),
            "price_diff_pct": _num(row.get("price_diff_pct")),
            "match_score": _num(row.get("match_score")),
            "momo_revenue": _num(row.get("momo_revenue")),
            "competitor_source": row.get("competitor_source") or "",
            "pc_crawled_at": _date_label(row.get("competitor_crawled_at")),
            "match_status": match_status,
            "match_status_label": _attempt_status_label(match_status),
            "action_label": _attempt_action_label(match_status),
            "match_type": match_type,
            "match_type_label": MATCH_TYPE_LABELS.get(match_type, match_type or "待判讀"),
            "price_basis": price_basis,
            "price_basis_label": PRICE_BASIS_LABELS.get(price_basis, price_basis or "待判讀"),
            "alert_tier": alert_tier,
            "alert_tier_label": ALERT_TIER_LABELS.get(alert_tier, alert_tier or "待判讀"),
            "candidate_count": int(row.get("candidate_count") or 0),
            "best_match_score": _num(row.get("best_match_score")),
            "match_diagnostic": row.get("error_message") or "",
            "unit_comparison": unit_comparison,
        })
    return results
def build_competitor_intel_payload(engine, days: int = 30) -> dict:
    """Assemble the shared competitor-intel summary for pages, AI and PPT.

    Combines coverage, the *days*-day gap trend, the top 10 price risks, the
    top 12 review-queue items plus a 5-item decision brief derived from them,
    and the match-score floor in effect.
    """
    queue = fetch_competitor_review_queue(engine, limit=12)
    payload: dict = {}
    payload["coverage"] = fetch_competitor_coverage(engine)
    payload["trend"] = fetch_competitor_gap_trend(engine, days=days)
    payload["top_risks"] = fetch_top_competitor_risks(engine, limit=10)
    payload["review_queue"] = queue
    payload["review_decision_brief"] = summarize_review_decision_envelopes(queue, limit=5)
    payload["match_score_floor"] = PCHOME_MATCH_SCORE_FLOOR
    return payload