Files
ewoooc/services/competitor_intel_repository.py
OoO 756b01af66
All checks were successful
CD Pipeline / deploy (push) Successful in 1m7s
補齊 PChome 比價人工覆核閉環
2026-05-20 10:00:58 +08:00

1047 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
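"""Shared data access layer for PChome / MOMO competitive pricing intelligence.

Short-term canonical sources:
- competitor_prices: currently valid matches
- competitor_price_history: historical price trend
- competitor_match_attempts: diagnostics for unmatched and low-confidence attempts
"""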
from __future__ import annotations
import os
import pickle
import time
from datetime import date, datetime, timedelta
from pathlib import Path
from threading import Lock
from typing import Any, Optional, Union
from sqlalchemy import inspect, text
# Minimum match_score a PChome row must reach to be treated as a valid match
# by the SQL queries in this module.
PCHOME_MATCH_SCORE_FLOOR = 0.76
# Attempt statuses for which a unit-price comparison is built.
UNIT_COMPARABLE_STATUSES = {"unit_comparable", "refresh_unit_comparable"}
# Attempt statuses counted as "actionable" and shown in the review queue.
ACTIONABLE_ATTEMPT_STATUSES = {
    "unit_comparable",
    "refresh_unit_comparable",
    "identity_veto",
    "low_score",
    "expired_match",
    "refresh_no_result",
    "no_result",
}
# Accepted status_filter values for the paginated review queue, each mapped to
# the attempt statuses it selects.
REVIEW_STATUS_FILTER_GROUPS = {
    "unit_comparable": ("unit_comparable", "refresh_unit_comparable"),
    "identity_veto": ("identity_veto",),
    "low_score": ("low_score",),
    "expired_match": ("expired_match",),
    "no_result": ("no_result", "refresh_no_result"),
}
# Display label for each attempt / manual-review status.
ATTEMPT_STATUS_LABELS = {
    "unit_comparable": "需單位價比較",
    "refresh_unit_comparable": "需單位價比較",
    "identity_veto": "身份否決",
    "low_score": "低信心待審",
    "expired_match": "價格過期待刷新",
    "refresh_no_result": "刷新找不到商品",
    "no_result": "找不到同款",
    "never_attempted": "尚未搜尋",
    "manual_accepted": "人工已採用",
    "manual_rejected": "人工已否決",
    "manual_unit_price_required": "人工標記單位價",
    "manual_needs_research": "人工要求補搜尋",
}
# Suggested next action shown for each attempt / manual-review status.
ATTEMPT_ACTION_LABELS = {
    "unit_comparable": "人工確認檔期、贈品與單位價",
    "refresh_unit_comparable": "人工確認檔期、贈品與單位價",
    "identity_veto": "確認是否為不同商品線或規格",
    "low_score": "人工審核候選商品身份",
    "expired_match": "重新刷新 PChome 價格",
    "refresh_no_result": "調整搜尋詞後重抓",
    "no_result": "補充搜尋詞或品牌關鍵字",
    "never_attempted": "排入 PChome 補抓",
    "manual_accepted": "已寫入正式 PChome 同款配對",
    "manual_rejected": "已關閉此候選,等待下一輪新候選",
    "manual_unit_price_required": "維持單位價比較,不寫入正式總價差",
    "manual_needs_research": "補搜尋詞或重新抓取後再判斷",
}
# Display label for each reason code found in the "reasons=" part of a matcher
# diagnostic string.
MATCH_DIAGNOSTIC_REASON_LABELS = {
    "brand_conflict": "品牌不符",
    "product_line_conflict": "商品線不符",
    "type_conflict": "品類不符",
    "volume_conflict": "容量差異",
    "weight_conflict": "重量差異",
    "count_conflict": "件數差異",
    "component_count_conflict": "入數差異",
    "multi_component_conflict": "組合差異",
    "refill_pack_conflict": "補充包差異",
    "unit_comparable": "需單位價",
    "price_ratio_extreme": "價差極端",
    "price_ratio_wide": "價差過大",
}
# Default cache lifetime in seconds; overridable through the environment
# variable of the same name (default 1800).
COMPETITOR_INTEL_CACHE_TTL_SECONDS = int(os.getenv("COMPETITOR_INTEL_CACHE_TTL_SECONDS", "1800"))
# Directory one level above the directory that contains this file.
_BASE_DIR = Path(__file__).resolve().parents[1]
# Pickle file used as the on-disk cache.
_CACHE_FILE = _BASE_DIR / "data" / "competitor_intel_cache.pkl"
# Lock held while the in-memory cache and the cache file are read or written.
_CACHE_LOCK = Lock()
# In-process cache: cache key -> {"time": <epoch seconds>, "value": <payload>}.
_MEM_CACHE: dict[str, dict[str, Any]] = {}
def _num(value: Any) -> float:
try:
return float(value or 0)
except (TypeError, ValueError):
return 0.0
def _date_label(value: Any) -> str:
if hasattr(value, "strftime"):
return value.strftime("%Y-%m-%d")
return str(value or "")
def _month_label(value: Any) -> str:
if hasattr(value, "strftime"):
return value.strftime("%Y-%m")
return str(value or "")[:7]
def _attempt_status_label(status: Any) -> str:
    """Return the display label for an attempt status.

    Statuses missing from ``ATTEMPT_STATUS_LABELS`` are echoed back as text;
    a falsy status falls back to the generic "待比對" label.
    """
    lookup_key = str(status or "")
    fallback = str(status or "待比對")
    return ATTEMPT_STATUS_LABELS.get(lookup_key, fallback)
def _attempt_action_label(status: Any) -> str:
    """Return the suggested next action for an attempt status.

    Statuses missing from ``ATTEMPT_ACTION_LABELS`` get the generic
    "人工確認比對證據" action.
    """
    lookup_key = str(status or "")
    return ATTEMPT_ACTION_LABELS.get(lookup_key, "人工確認比對證據")
def _extract_match_diagnostic_reasons(diagnostic_text: Any) -> list[dict[str, str]]:
    """Turn a matcher diagnostic string into operator-facing reason chips.

    The diagnostic is read as ``;``-separated ``key=value`` segments. Only
    the first segment whose key is ``reasons`` is used; its value is a list
    of reason codes separated by ``,`` or ``|``.

    Returns one ``{"code", "label"}`` dict per distinct code, in first-seen
    order. Codes without an entry in ``MATCH_DIAGNOSTIC_REASON_LABELS`` are
    labelled with the code itself, underscores replaced by spaces.
    """
    raw_text = str(diagnostic_text or "")
    if not raw_text:
        return []

    segments = (segment.strip().partition("=") for segment in raw_text.split(";"))
    # Only the first "reasons" segment counts, even when its value is empty.
    reason_blob = next(
        (value.strip() for key, _, value in segments if key.strip() == "reasons"),
        "",
    )
    if not reason_blob:
        return []

    stripped_codes = (piece.strip() for piece in reason_blob.replace("|", ",").split(","))
    # dict.fromkeys drops duplicates while keeping first-seen order.
    unique_codes = dict.fromkeys(code for code in stripped_codes if code)
    return [
        {
            "code": code,
            "label": MATCH_DIAGNOSTIC_REASON_LABELS.get(code, code.replace("_", " ")),
        }
        for code in unique_codes
    ]
def _build_unit_comparison_for_attempt(row: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Build a unit-price comparison for a unit-comparable attempt row.

    Returns ``None`` when ``row["attempt_status"]`` is not one of
    ``UNIT_COMPARABLE_STATUSES``. Otherwise the work is delegated to
    ``build_unit_price_comparison``; any exception raised while importing or
    calling it is converted into
    ``{"comparable": False, "reason": "build_error"}``.
    """
    if str(row.get("attempt_status") or "") not in UNIT_COMPARABLE_STATUSES:
        return None

    try:
        # Imported lazily, inside the try, so an import failure is also
        # reported as a build error.
        from services.marketplace_product_matcher import build_unit_price_comparison

        momo_name = row.get("name") or row.get("momo_product_name") or ""
        candidate_name = row.get("best_competitor_product_name") or ""
        return build_unit_price_comparison(
            momo_name,
            candidate_name,
            row.get("momo_price"),
            row.get("best_competitor_price"),
        )
    except Exception:
        return {"comparable": False, "reason": "build_error"}
def _format_competitor_review_item(row: dict[str, Any]) -> dict[str, Any]:
    """Convert a review-queue row into the item dict returned to callers.

    Args:
        row: Mapping with the review-queue columns (``sku``, ``name``,
            ``category``, ``momo_price``, ``attempt_status``,
            ``candidate_count``, the ``best_competitor_*`` columns,
            ``best_match_score``, ``error_message``, ``attempted_at``).
            Missing keys fall back to empty or zero values.

    Returns:
        A dict with normalised numbers, status and action labels, the parsed
        diagnostic reasons and, for unit-comparable statuses, a unit-price
        comparison (``None`` otherwise).
    """
    item = dict(row)
    attempt_status = item.get("attempt_status")
    unit_comparison = _build_unit_comparison_for_attempt(item)
    # The matcher diagnostic is read from the attempt's error_message column.
    match_diagnostic = item.get("error_message") or ""
    diagnostic_reasons = _extract_match_diagnostic_reasons(match_diagnostic)
    return {
        "sku": str(item.get("sku") or ""),
        "name": item.get("name") or "",
        "category": item.get("category") or "",
        "momo_price": _num(item.get("momo_price")),
        "attempt_status": attempt_status or "",
        "status_label": _attempt_status_label(attempt_status),
        "action_label": _attempt_action_label(attempt_status),
        "candidate_count": int(item.get("candidate_count") or 0),
        "candidate_pc_id": item.get("best_competitor_product_id"),
        "candidate_pc_name": item.get("best_competitor_product_name") or "",
        "candidate_pc_price": _num(item.get("best_competitor_price")),
        "best_match_score": _num(item.get("best_match_score")),
        "match_diagnostic": match_diagnostic,
        "diagnostic_reasons": diagnostic_reasons,
        # Separate the labels with an enumeration comma; joining them with an
        # empty string ran several reasons together into one unreadable word.
        "diagnostic_reason_text": "、".join(reason["label"] for reason in diagnostic_reasons),
        "attempted_at": _date_label(item.get("attempted_at")),
        "unit_comparison": unit_comparison,
    }
def clear_competitor_intel_cache() -> None:
    """Drop all cached PChome/MOMO intelligence.

    Empties the in-memory cache and deletes the cache file, both under the
    cache lock. Intended to be called after crawler or import updates.
    Failing to remove the file is ignored.
    """
    with _CACHE_LOCK:
        _MEM_CACHE.clear()
        cache_file = _CACHE_FILE
        try:
            if cache_file.exists():
                cache_file.unlink()
        except OSError:
            # Best effort: a leftover file only means stale data is served
            # until its entries expire.
            pass
def _load_shared_cache() -> dict[str, dict[str, Any]]:
    """Read the on-disk cache, returning ``{}`` when it is missing or unusable.

    Any error while opening or unpickling the file, and any payload that is
    not a dict, results in an empty dict.
    """
    if not _CACHE_FILE.exists():
        return {}
    try:
        with _CACHE_FILE.open("rb") as cache_handle:
            # NOTE(security): pickle.load executes code embedded in the file,
            # so the cache file must only ever be written by this application.
            loaded = pickle.load(cache_handle)
    except Exception:
        return {}
    if isinstance(loaded, dict):
        return loaded
    return {}
def _write_shared_cache(payload: dict[str, dict[str, Any]]) -> None:
    """Persist *payload* to the cache file; failures are swallowed.

    The payload is pickled into a per-process temporary file next to the
    cache file and then moved into place with ``os.replace``. On any error
    the temporary file is removed if it exists.
    """
    scratch_file = None
    try:
        _CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
        scratch_file = _CACHE_FILE.with_suffix(f".{os.getpid()}.tmp")
        with scratch_file.open("wb") as scratch_handle:
            pickle.dump(payload, scratch_handle, protocol=pickle.HIGHEST_PROTOCOL)
        os.replace(scratch_file, _CACHE_FILE)
    except Exception:
        # Caching is best effort; just clean up the partial temporary file.
        try:
            if scratch_file is not None and scratch_file.exists():
                scratch_file.unlink()
        except OSError:
            pass
def _cache_entry_time(entry: Any) -> Optional[float]:
    """Return the timestamp stored in a cache entry, or ``None`` if it is malformed."""
    if not isinstance(entry, dict):
        return None
    try:
        return float(entry.get("time", 0))
    except (TypeError, ValueError):
        return None


def _cached_payload(cache_key: str, producer, ttl_seconds: int = COMPETITOR_INTEL_CACHE_TTL_SECONDS):
    """Return the cached value for *cache_key*, computing it when needed.

    Lookup order is the in-memory cache, then the on-disk cache, then
    ``producer()``. A freshly produced value is stored in both caches.

    Args:
        cache_key: Key identifying the payload.
        producer: Zero-argument callable that computes the payload.
        ttl_seconds: Maximum age in seconds of a reusable entry. A value
            of zero or less bypasses the cache entirely.

    Returns:
        The cached or freshly produced payload.
    """
    if ttl_seconds <= 0:
        return producer()

    now = time.time()

    def _is_fresh(entry: Any) -> bool:
        stored_at = _cache_entry_time(entry)
        return stored_at is not None and now - stored_at < ttl_seconds

    with _CACHE_LOCK:
        entry = _MEM_CACHE.get(cache_key)
        if _is_fresh(entry):
            return entry.get("value")
        # Entries come from an unpickled file, so their shape is validated
        # before use instead of assuming a well-formed dict.
        entry = _load_shared_cache().get(cache_key)
        if _is_fresh(entry):
            _MEM_CACHE[cache_key] = entry
            return entry.get("value")

    # The producer runs outside the lock so a slow query does not block other
    # cache readers; concurrent misses may compute the same value twice.
    value = producer()
    entry = {"time": now, "value": value}

    # Retention is based on the longest TTL in play, so a short-TTL caller
    # does not evict entries that are still valid for default-TTL callers.
    retention = max(max(ttl_seconds, COMPETITOR_INTEL_CACHE_TTL_SECONDS) * 4, 3600)
    stale_before = now - retention

    def _is_retained(item: Any) -> bool:
        stored_at = _cache_entry_time(item)
        return stored_at is not None and stored_at >= stale_before

    with _CACHE_LOCK:
        _MEM_CACHE[cache_key] = entry
        # Prune the in-memory cache too; keys can embed free-text search
        # terms, so it would otherwise grow for the life of the process.
        for stale_key in [key for key, item in _MEM_CACHE.items() if not _is_retained(item)]:
            del _MEM_CACHE[stale_key]
        shared = _load_shared_cache()
        shared[cache_key] = entry
        shared = {key: item for key, item in shared.items() if _is_retained(item)}
        _write_shared_cache(shared)
    return value
def fetch_competitor_coverage(engine) -> dict:
    """Return the PChome coverage summary, served through the shared cache.

    The cache key includes the match-score floor.
    """
    cache_key = f"coverage:v2:floor={PCHOME_MATCH_SCORE_FLOOR}"

    def _produce() -> dict:
        return _fetch_competitor_coverage_uncached(engine)

    return _cached_payload(cache_key, _produce)
def _fetch_competitor_coverage_uncached(engine) -> dict:
    """Compute current PChome price-comparison coverage and the pending breakdown.

    A competitor row is a valid match when it has ``source = 'pchome'``, is
    not expired, has a positive price, a ``match_score`` of at least
    ``PCHOME_MATCH_SCORE_FLOOR`` and carries the ``identity_v2`` tag.

    Returns:
        A dict with ``active_with_price`` (ACTIVE products that have a price
        record), ``valid_matches``, ``pending`` (active products without a
        valid match), ``match_rate`` (percent), ``attempt_status`` (pending
        count per latest attempt status, ``never_attempted`` when there is
        none), ``unit_comparable_count``, ``actionable_review_count`` and
        ``match_score_floor``. When a required table is missing, the same
        keys are returned with zero or empty values.
    """
    empty_payload = {
        "active_with_price": 0,
        "valid_matches": 0,
        "pending": 0,
        "match_rate": 0,
        "attempt_status": {},
        "unit_comparable_count": 0,
        "actionable_review_count": 0,
        # Present in the empty payload too, so consumers can read the same
        # keys on both paths.
        "match_score_floor": PCHOME_MATCH_SCORE_FLOOR,
    }
    inspector = inspect(engine)
    required_tables = ("products", "price_records", "competitor_prices")
    if not all(inspector.has_table(table) for table in required_tables):
        return empty_payload

    if inspector.has_table("competitor_match_attempts"):
        attempt_cte = """
            latest_attempt AS (
                SELECT DISTINCT ON (sku)
                    sku,
                    attempt_status
                FROM competitor_match_attempts
                WHERE source = 'pchome'
                ORDER BY sku, attempted_at DESC NULLS LAST
            )
        """
    else:
        # Empty stand-in so the status query keeps the same shape.
        attempt_cte = """
            latest_attempt AS (
                SELECT
                    NULL AS sku,
                    NULL AS attempt_status
                WHERE FALSE
            )
        """

    base_cte = f"""
        WITH latest_momo AS (
            SELECT
                p.id AS product_id,
                p.i_code AS sku,
                pr.price AS momo_price,
                ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC, pr.id DESC) AS rn
            FROM products p
            JOIN price_records pr ON pr.product_id = p.id
            WHERE p.status = 'ACTIVE'
        ),
        valid_competitor AS (
            SELECT DISTINCT ON (cp.sku)
                cp.sku
            FROM competitor_prices cp
            WHERE cp.source = 'pchome'
              AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
              AND cp.price IS NOT NULL
              AND cp.price > 0
              AND COALESCE(cp.match_score, 0) >= {PCHOME_MATCH_SCORE_FLOOR}
              AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
            ORDER BY cp.sku, cp.crawled_at DESC NULLS LAST
        )
    """

    # Totals are queried on their own so they always yield exactly one row.
    # When they were scalar subqueries on the grouped status query, having no
    # pending SKU produced zero rows and every total was reported as 0.
    totals_sql = text(base_cte + """
        SELECT
            (SELECT COUNT(*) FROM latest_momo WHERE rn = 1) AS active_with_price,
            (SELECT COUNT(*) FROM valid_competitor) AS valid_matches,
            (SELECT COUNT(*)
             FROM latest_momo lm
             LEFT JOIN valid_competitor vc ON vc.sku = lm.sku
             WHERE lm.rn = 1 AND vc.sku IS NULL) AS pending
    """)
    status_sql = text(base_cte + "," + attempt_cte + """
        SELECT
            COALESCE(la.attempt_status, 'never_attempted') AS attempt_status,
            COUNT(*) AS status_count
        FROM latest_momo lm
        LEFT JOIN valid_competitor vc ON vc.sku = lm.sku
        LEFT JOIN latest_attempt la ON la.sku = lm.sku
        WHERE lm.rn = 1
          AND vc.sku IS NULL
        GROUP BY COALESCE(la.attempt_status, 'never_attempted')
    """)

    with engine.connect() as conn:
        totals = conn.execute(totals_sql).mappings().first() or {}
        status_rows = conn.execute(status_sql).mappings().all()

    active = int(totals.get("active_with_price") or 0)
    valid = int(totals.get("valid_matches") or 0)
    pending = int(totals.get("pending") or 0)
    statuses = {
        str(row.get("attempt_status")): int(row.get("status_count") or 0)
        for row in status_rows
    }
    unit_count = sum(statuses.get(status, 0) for status in UNIT_COMPARABLE_STATUSES)
    actionable_count = sum(statuses.get(status, 0) for status in ACTIONABLE_ATTEMPT_STATUSES)
    return {
        "active_with_price": active,
        "valid_matches": valid,
        "pending": pending,
        # max(active, 1) avoids dividing by zero when nothing is active.
        "match_rate": round(valid / max(active, 1) * 100, 1),
        "attempt_status": statuses,
        "unit_comparable_count": unit_count,
        "actionable_review_count": actionable_count,
        "match_score_floor": PCHOME_MATCH_SCORE_FLOOR,
    }
def fetch_competitor_gap_trend(engine, days: int = 30) -> dict:
    """Return the daily PChome price-gap trend, served through the shared cache.

    ``days`` is clamped to the range 7..120 (falsy values mean 30) before it
    becomes part of the cache key.
    """
    days = max(7, min(int(days or 30), 120))
    cache_key = f"gap_trend:v2:days={days}:floor={PCHOME_MATCH_SCORE_FLOOR}"

    def _produce() -> dict:
        return _fetch_competitor_gap_trend_uncached(engine, days=days)

    return _cached_payload(cache_key, _produce)
def _fetch_competitor_gap_trend_uncached(engine, days: int = 30) -> dict:
    """Compute the PChome price-gap pressure trend for the last N days.

    For each day, only the latest ``competitor_price_history`` row per SKU
    is used. A row counts as a risk when the MOMO price is more than 5%
    above the PChome price, and as a MOMO advantage when it is more than 5%
    below.

    Returns:
        Parallel lists ``labels`` (``YYYY-MM-DD``), ``avg_gap_pct``,
        ``risk_count``, ``momo_advantage_count`` and ``match_count``; all
        empty when the history table does not exist.
    """
    history_exists = inspect(engine).has_table("competitor_price_history")
    if not history_exists:
        return {"labels": [], "avg_gap_pct": [], "risk_count": [], "momo_advantage_count": [], "match_count": []}

    days = max(7, min(int(days or 30), 120))
    trend_query = text(f"""
        WITH latest_history AS (
            SELECT
                date_trunc('day', cph.crawled_at)::date AS bucket_date,
                cph.sku,
                cph.momo_price,
                cph.price AS pchome_price,
                ROW_NUMBER() OVER (
                    PARTITION BY date_trunc('day', cph.crawled_at)::date, cph.sku
                    ORDER BY cph.crawled_at DESC
                ) AS rn
            FROM competitor_price_history cph
            WHERE cph.source = 'pchome'
              AND cph.crawled_at >= CURRENT_DATE - (:days * INTERVAL '1 day')
              AND cph.momo_price IS NOT NULL
              AND cph.momo_price > 0
              AND cph.price IS NOT NULL
              AND cph.price > 0
              AND COALESCE(cph.match_score, 0) >= {PCHOME_MATCH_SCORE_FLOOR}
              AND COALESCE(cph.tags, '[]'::jsonb) ? 'identity_v2'
        )
        SELECT
            bucket_date,
            COUNT(*) AS match_count,
            ROUND(AVG((momo_price - pchome_price) / pchome_price * 100)::numeric, 2) AS avg_gap_pct,
            SUM(CASE WHEN momo_price > pchome_price * 1.05 THEN 1 ELSE 0 END) AS risk_count,
            SUM(CASE WHEN momo_price < pchome_price * 0.95 THEN 1 ELSE 0 END) AS momo_advantage_count
        FROM latest_history
        WHERE rn = 1
        GROUP BY bucket_date
        ORDER BY bucket_date
    """)
    with engine.connect() as conn:
        daily_rows = conn.execute(trend_query, {"days": days}).mappings().all()

    labels: list = []
    avg_gap_pct: list = []
    risk_count: list = []
    momo_advantage_count: list = []
    match_count: list = []
    for daily in daily_rows:
        labels.append(_date_label(daily.get("bucket_date")))
        avg_gap_pct.append(_num(daily.get("avg_gap_pct")))
        risk_count.append(int(daily.get("risk_count") or 0))
        momo_advantage_count.append(int(daily.get("momo_advantage_count") or 0))
        match_count.append(int(daily.get("match_count") or 0))
    return {
        "labels": labels,
        "avg_gap_pct": avg_gap_pct,
        "risk_count": risk_count,
        "momo_advantage_count": momo_advantage_count,
        "match_count": match_count,
    }
def fetch_competitor_monthly_pressure(engine, months: int = 12) -> dict:
    """Return monthly competitor price pressure, served through the shared cache.

    ``months`` is clamped to the range 3..36 (falsy values mean 12) before
    it becomes part of the cache key.
    """
    months = max(3, min(int(months or 12), 36))
    cache_key = f"monthly_pressure:v2:months={months}:floor={PCHOME_MATCH_SCORE_FLOOR}"

    def _produce() -> dict:
        return _fetch_competitor_monthly_pressure_uncached(engine, months=months)

    return _cached_payload(cache_key, _produce)
def _fetch_competitor_monthly_pressure_uncached(engine, months: int = 12) -> dict:
    """Compute monthly competitor price pressure, used for growth analysis.

    For each calendar month, only the latest ``competitor_price_history``
    row per SKU is used. A row counts as a risk when the MOMO price is more
    than 5% above the PChome price.

    Returns:
        Parallel lists ``labels`` (``YYYY-MM``), ``avg_gap_pct``,
        ``risk_count`` and ``match_count``; all empty when the history table
        does not exist.
    """
    history_exists = inspect(engine).has_table("competitor_price_history")
    if not history_exists:
        return {"labels": [], "avg_gap_pct": [], "risk_count": [], "match_count": []}

    months = max(3, min(int(months or 12), 36))
    pressure_query = text(f"""
        WITH latest_history AS (
            SELECT
                date_trunc('month', cph.crawled_at)::date AS bucket_month,
                cph.sku,
                cph.momo_price,
                cph.price AS pchome_price,
                ROW_NUMBER() OVER (
                    PARTITION BY date_trunc('month', cph.crawled_at)::date, cph.sku
                    ORDER BY cph.crawled_at DESC
                ) AS rn
            FROM competitor_price_history cph
            WHERE cph.source = 'pchome'
              AND cph.crawled_at >= date_trunc('month', CURRENT_DATE) - (:months * INTERVAL '1 month')
              AND cph.momo_price IS NOT NULL
              AND cph.momo_price > 0
              AND cph.price IS NOT NULL
              AND cph.price > 0
              AND COALESCE(cph.match_score, 0) >= {PCHOME_MATCH_SCORE_FLOOR}
              AND COALESCE(cph.tags, '[]'::jsonb) ? 'identity_v2'
        )
        SELECT
            bucket_month,
            COUNT(*) AS match_count,
            ROUND(AVG((momo_price - pchome_price) / pchome_price * 100)::numeric, 2) AS avg_gap_pct,
            SUM(CASE WHEN momo_price > pchome_price * 1.05 THEN 1 ELSE 0 END) AS risk_count
        FROM latest_history
        WHERE rn = 1
        GROUP BY bucket_month
        ORDER BY bucket_month
    """)
    with engine.connect() as conn:
        monthly_rows = conn.execute(pressure_query, {"months": months}).mappings().all()

    labels: list = []
    avg_gap_pct: list = []
    risk_count: list = []
    match_count: list = []
    for monthly in monthly_rows:
        labels.append(_month_label(monthly.get("bucket_month")))
        avg_gap_pct.append(_num(monthly.get("avg_gap_pct")))
        risk_count.append(int(monthly.get("risk_count") or 0))
        match_count.append(int(monthly.get("match_count") or 0))
    return {
        "labels": labels,
        "avg_gap_pct": avg_gap_pct,
        "risk_count": risk_count,
        "match_count": match_count,
    }
def fetch_top_competitor_risks(engine, limit: int = 10) -> list[dict]:
    """Return the top price-risk products, served through the shared cache.

    ``limit`` is clamped to the range 1..50 (falsy values mean 10) before it
    becomes part of the cache key.
    """
    limit = max(1, min(int(limit or 10), 50))
    cache_key = f"top_risks:v2:limit={limit}:floor={PCHOME_MATCH_SCORE_FLOOR}"

    def _produce() -> list[dict]:
        return _fetch_top_competitor_risks_uncached(engine, limit=limit)

    return _cached_payload(cache_key, _produce)
def _fetch_top_competitor_risks_uncached(engine, limit: int = 10) -> list[dict]:
    """List ACTIVE products whose MOMO price is more than 5% above PChome.

    Each product's latest price record is compared with its most recently
    crawled valid PChome match. Results are ordered by gap percentage, then
    gap amount, both descending.

    Returns:
        Up to ``limit`` dicts (clamped to 1..50) with SKU, names, prices,
        gap, match score and crawl date; ``[]`` when ``competitor_prices``
        does not exist.
    """
    if not inspect(engine).has_table("competitor_prices"):
        return []

    limit = max(1, min(int(limit or 10), 50))
    risk_query = text(f"""
        WITH valid_competitor AS (
            SELECT DISTINCT ON (cp.sku)
                cp.sku,
                cp.price AS pchome_price,
                cp.competitor_product_id,
                cp.competitor_product_name,
                cp.match_score,
                cp.crawled_at
            FROM competitor_prices cp
            WHERE cp.source = 'pchome'
              AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
              AND cp.price IS NOT NULL
              AND cp.price > 0
              AND COALESCE(cp.match_score, 0) >= {PCHOME_MATCH_SCORE_FLOOR}
              AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
            ORDER BY cp.sku, cp.crawled_at DESC NULLS LAST
        )
        SELECT
            p.i_code AS sku,
            p.name,
            p.category,
            latest_price.momo_price,
            vc.pchome_price,
            vc.competitor_product_id,
            vc.competitor_product_name,
            vc.match_score,
            vc.crawled_at,
            (latest_price.momo_price - vc.pchome_price) AS gap_amount,
            ((latest_price.momo_price - vc.pchome_price) / vc.pchome_price * 100) AS gap_pct
        FROM valid_competitor vc
        JOIN products p
          ON p.i_code = vc.sku
         AND p.status = 'ACTIVE'
        JOIN LATERAL (
            SELECT pr.price AS momo_price
            FROM price_records pr
            WHERE pr.product_id = p.id
            ORDER BY pr.timestamp DESC, pr.id DESC
            LIMIT 1
        ) latest_price ON TRUE
        WHERE latest_price.momo_price > vc.pchome_price * 1.05
        ORDER BY gap_pct DESC NULLS LAST, gap_amount DESC NULLS LAST
        LIMIT :limit
    """)
    with engine.connect() as conn:
        risk_rows = conn.execute(risk_query, {"limit": limit}).mappings().all()

    return [
        {
            "sku": str(risk.get("sku") or ""),
            "name": risk.get("name") or "",
            "category": risk.get("category") or "",
            "momo_price": _num(risk.get("momo_price")),
            "pchome_price": _num(risk.get("pchome_price")),
            "gap_amount": _num(risk.get("gap_amount")),
            "gap_pct": _num(risk.get("gap_pct")),
            "match_score": _num(risk.get("match_score")),
            "pchome_id": risk.get("competitor_product_id"),
            "pchome_name": risk.get("competitor_product_name") or "",
            "crawled_at": _date_label(risk.get("crawled_at")),
        }
        for risk in risk_rows
    ]
def fetch_competitor_review_queue(engine, limit: int = 12) -> list[dict]:
    """Return the actionable PChome match-review queue through the shared cache.

    Shared by the Dashboard, AI and PPT consumers. ``limit`` is clamped to
    the range 1..50 (falsy values mean 12) before it becomes part of the
    cache key.
    """
    limit = max(1, min(int(limit or 12), 50))
    cache_key = f"review_queue:v1:limit={limit}:floor={PCHOME_MATCH_SCORE_FLOOR}"

    def _produce() -> list[dict]:
        return _fetch_competitor_review_queue_uncached(engine, limit=limit)

    return _cached_payload(cache_key, _produce)
def fetch_competitor_review_queue_page(
    engine,
    page: int = 1,
    per_page: int = 50,
    search_query: str = "",
    category: str = "",
    status_filter: str = "",
) -> dict:
    """Return one cached page of the PChome review queue for Dashboard pages.

    Args:
        engine: SQLAlchemy engine used for the query.
        page: 1-based page number; values below 1 become 1.
        per_page: Page size, clamped to the range 1..100.
        search_query: Case-insensitive text matched against product name
            and SKU.
        category: Exact product category, or empty for all categories.
        status_filter: A key of ``REVIEW_STATUS_FILTER_GROUPS``; any other
            value is treated as "no filter".

    Returns:
        A dict with ``items``, ``total``, ``page``, ``per_page`` and
        ``status_filter``. Pages are cached for at most 300 seconds.
    """
    page = max(1, int(page or 1))
    per_page = max(1, min(int(per_page or 50), 100))
    search_query = (search_query or "").strip()
    category = (category or "").strip()
    status_filter = (status_filter or "").strip()
    if status_filter not in REVIEW_STATUS_FILTER_GROUPS:
        status_filter = ""
    # The user-supplied parts are embedded with repr() so their boundaries are
    # unambiguous. With plain concatenation a query such as "a:cat=x" with no
    # category shared a key with query "a" and category "x:cat=", and one
    # request was served the other's cached page.
    cache_key = (
        "review_queue_page:v2:"
        f"page={page}:per={per_page}:q={search_query.lower()!r}:cat={category!r}:"
        f"status={status_filter}:"
        f"floor={PCHOME_MATCH_SCORE_FLOOR}"
    )
    return _cached_payload(
        cache_key,
        lambda: _fetch_competitor_review_queue_page_uncached(
            engine,
            page=page,
            per_page=per_page,
            search_query=search_query,
            category=category,
            status_filter=status_filter,
        ),
        # Operator pages must reflect review decisions quickly, so they are
        # never cached for longer than five minutes.
        ttl_seconds=min(COMPETITOR_INTEL_CACHE_TTL_SECONDS, 300),
    )
def _review_queue_cte_and_filter(
    search_query: str = "",
    category: str = "",
    status_filter: str = "",
) -> tuple[str, dict[str, Any]]:
    """Build the shared review-queue CTE and its bind parameters.

    The returned SQL defines a ``review_rows`` CTE holding every ACTIVE
    product that has no valid PChome match and whose latest PChome attempt
    has one of the selected statuses. It includes a ``priority_rank`` column
    (0 = unit comparable ... 4 = no result) for ordering.

    Args:
        search_query: Text matched literally and case-insensitively as a
            substring of the product name or SKU.
        category: Exact product category; empty means no category filter.
        status_filter: A key of ``REVIEW_STATUS_FILTER_GROUPS``; anything
            else selects every status in ``ACTIONABLE_ATTEMPT_STATUSES``.

    Returns:
        ``(cte_sql, params)``; the caller appends a ``SELECT`` over
        ``review_rows`` and passes ``params`` when executing.
    """
    params: dict[str, Any] = {}
    status_filter = (status_filter or "").strip()
    # Sorted so the generated SQL text is the same in every process; iterating
    # the set directly gave an arbitrary order.
    status_values = REVIEW_STATUS_FILTER_GROUPS.get(status_filter) or tuple(
        sorted(ACTIONABLE_ATTEMPT_STATUSES)
    )
    # Safe to inline: the values come only from module constants.
    status_sql = ", ".join(f"'{status}'" for status in status_values)
    filters = [
        "lm.rn = 1",
        "vc.sku IS NULL",
        f"la.attempt_status IN ({status_sql})",
    ]
    if search_query:
        # Escape LIKE metacharacters so "%" and "_" typed by the operator are
        # matched literally instead of acting as wildcards.
        escaped = (
            search_query.lower()
            .replace("!", "!!")
            .replace("%", "!%")
            .replace("_", "!_")
        )
        params["search_like"] = f"%{escaped}%"
        filters.append(
            "(LOWER(lm.name) LIKE :search_like ESCAPE '!'"
            " OR LOWER(lm.sku) LIKE :search_like ESCAPE '!')"
        )
    if category:
        params["category"] = category
        filters.append("lm.category = :category")
    where_sql = "\n              AND ".join(filters)
    cte = f"""
        WITH latest_momo AS (
            SELECT
                p.id AS product_id,
                p.i_code AS sku,
                p.name,
                p.category,
                pr.price AS momo_price,
                ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC, pr.id DESC) AS rn
            FROM products p
            JOIN price_records pr ON pr.product_id = p.id
            WHERE p.status = 'ACTIVE'
        ),
        valid_competitor AS (
            SELECT DISTINCT ON (cp.sku)
                cp.sku
            FROM competitor_prices cp
            WHERE cp.source = 'pchome'
              AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
              AND cp.price IS NOT NULL
              AND cp.price > 0
              AND COALESCE(cp.match_score, 0) >= {PCHOME_MATCH_SCORE_FLOOR}
              AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
            ORDER BY cp.sku, cp.crawled_at DESC NULLS LAST
        ),
        latest_attempt AS (
            SELECT DISTINCT ON (cma.sku)
                cma.sku,
                cma.attempt_status,
                cma.candidate_count,
                cma.best_competitor_product_id,
                cma.best_competitor_product_name,
                cma.best_competitor_price,
                cma.best_match_score,
                cma.error_message,
                cma.attempted_at
            FROM competitor_match_attempts cma
            WHERE cma.source = 'pchome'
            ORDER BY cma.sku, cma.attempted_at DESC NULLS LAST
        ),
        review_rows AS (
            SELECT
                lm.sku,
                lm.name,
                lm.category,
                lm.momo_price,
                la.attempt_status,
                la.candidate_count,
                la.best_competitor_product_id,
                la.best_competitor_product_name,
                la.best_competitor_price,
                la.best_match_score,
                la.error_message,
                la.attempted_at,
                CASE
                    WHEN la.attempt_status IN ('unit_comparable', 'refresh_unit_comparable') THEN 0
                    WHEN la.attempt_status = 'identity_veto' THEN 1
                    WHEN la.attempt_status = 'low_score' THEN 2
                    WHEN la.attempt_status = 'expired_match' THEN 3
                    ELSE 4
                END AS priority_rank
            FROM latest_momo lm
            JOIN latest_attempt la ON la.sku = lm.sku
            LEFT JOIN valid_competitor vc ON vc.sku = lm.sku
            WHERE {where_sql}
        )
    """
    return cte, params
def _fetch_competitor_review_queue_page_uncached(
    engine,
    page: int = 1,
    per_page: int = 50,
    search_query: str = "",
    category: str = "",
    status_filter: str = "",
) -> dict:
    """Query one page of the review queue straight from the database.

    Rows are ordered by priority rank, then MOMO price, best match score and
    attempt time, all descending with NULLs last.

    Returns:
        A dict with ``items`` (formatted review items), ``total`` (rows
        matching the filters), ``page``, ``per_page`` and ``status_filter``.
        When a required table is missing, ``items`` is empty and ``total``
        is 0.
    """
    # Normalise paging before anything else so both return paths echo the
    # same clamped values; the early return used to echo the raw per_page.
    page = max(1, int(page or 1))
    per_page = max(1, min(int(per_page or 50), 100))

    inspector = inspect(engine)
    required_tables = (
        "products",
        "price_records",
        "competitor_prices",
        "competitor_match_attempts",
    )
    if not all(inspector.has_table(table) for table in required_tables):
        return {
            "items": [],
            "total": 0,
            "page": page,
            "per_page": per_page,
            "status_filter": status_filter,
        }

    cte, params = _review_queue_cte_and_filter(
        search_query=search_query,
        category=category,
        status_filter=status_filter,
    )
    page_params = {
        **params,
        "limit": per_page,
        "offset": (page - 1) * per_page,
    }
    count_sql = text(cte + " SELECT COUNT(*) AS total FROM review_rows")
    page_sql = text(cte + """
        SELECT *
        FROM review_rows
        ORDER BY
            priority_rank ASC,
            momo_price DESC NULLS LAST,
            best_match_score DESC NULLS LAST,
            attempted_at DESC NULLS LAST
        LIMIT :limit OFFSET :offset
    """)
    with engine.connect() as conn:
        total = int(conn.execute(count_sql, params).scalar() or 0)
        rows = conn.execute(page_sql, page_params).mappings().all()
    return {
        "items": [_format_competitor_review_item(dict(row)) for row in rows],
        "total": total,
        "page": page,
        "per_page": per_page,
        "status_filter": status_filter,
    }
def _fetch_competitor_review_queue_uncached(engine, limit: int = 12) -> list[dict]:
    """Query the highest-priority actionable review items from the database.

    Selects ACTIVE products without a valid PChome match whose latest PChome
    attempt has an actionable status. Rows are ordered by priority rank,
    then MOMO price, best match score and attempt time (descending, NULLs
    last).

    Returns:
        Up to ``limit`` formatted review items (clamped to 1..50); ``[]``
        when a required table is missing.
    """
    inspector = inspect(engine)
    required_tables = (
        "products",
        "price_records",
        "competitor_prices",
        "competitor_match_attempts",
    )
    if not all(inspector.has_table(table) for table in required_tables):
        return []

    limit = max(1, min(int(limit or 12), 50))
    # Reuse the CTE builder used by the paginated queue instead of keeping a
    # second copy of the SQL and status list, so the two cannot drift. With
    # no filters it selects every actionable status.
    cte, params = _review_queue_cte_and_filter()
    sql = text(cte + """
        SELECT
            sku,
            name,
            category,
            momo_price,
            attempt_status,
            candidate_count,
            best_competitor_product_id,
            best_competitor_product_name,
            best_competitor_price,
            best_match_score,
            error_message,
            attempted_at
        FROM review_rows
        ORDER BY
            priority_rank ASC,
            momo_price DESC NULLS LAST,
            best_match_score DESC NULLS LAST,
            attempted_at DESC NULLS LAST
        LIMIT :limit
    """)
    with engine.connect() as conn:
        rows = conn.execute(sql, {**params, "limit": limit}).mappings().all()
    return [_format_competitor_review_item(dict(row)) for row in rows]
def fetch_competitor_comparison_results(
    engine,
    start_date: Optional[Union[date, datetime, str]] = None,
    end_date: Optional[Union[date, datetime, str]] = None,
    limit: int = 30,
) -> list[dict]:
    """Return comparison rows compatible with the legacy competitor PPT.

    Everything is read from stored tables; no live crawl is performed. Each
    ACTIVE product with a positive latest MOMO price is paired with its
    valid PChome match (if any) and its latest PChome match attempt (if the
    attempts table exists).

    Args:
        engine: SQLAlchemy engine used for the query.
        start_date: Optional lower bound for ``daily_sales`` revenue; only
            used when the ``daily_sales`` table exists.
        end_date: Optional upper bound for ``daily_sales`` revenue; only
            used when the ``daily_sales`` table exists.
        limit: Maximum number of rows, clamped to the range 1..100.

    Returns:
        A list of result dicts, ordered by revenue when ``daily_sales``
        exists and by MOMO price otherwise; ``[]`` when ``products``,
        ``price_records`` or ``competitor_prices`` is missing.
    """
    limit = max(1, min(int(limit or 30), 100))
    inspector = inspect(engine)
    core_tables = ("products", "price_records", "competitor_prices")
    if not all(inspector.has_table(table) for table in core_tables):
        return []

    has_daily_sales = inspector.has_table("daily_sales")
    has_match_attempts = inspector.has_table("competitor_match_attempts")
    bind_params: dict[str, Any] = {"limit": limit}

    # Secondary ordering shared by both variants: matched rows first, then
    # the largest absolute price gap.
    gap_ordering = (
        "(vc.pchome_price IS NULL), "
        "ABS((lm.momo_price - vc.pchome_price) / vc.pchome_price * 100) DESC NULLS LAST"
    )

    if has_daily_sales:
        date_filters = []
        if start_date:
            date_filters.append("DATE(s.date) >= DATE(:start_date)")
            bind_params["start_date"] = str(start_date).replace("/", "-")[:10]
        if end_date:
            date_filters.append("DATE(s.date) <= DATE(:end_date)")
            bind_params["end_date"] = str(end_date).replace("/", "-")[:10]
        if date_filters:
            sales_where = "WHERE " + " AND ".join(date_filters)
        else:
            sales_where = ""
        sales_cte = f""",
        sales_rank AS (
            SELECT
                s.product_id,
                SUM(COALESCE(s.revenue, 0)) AS momo_revenue
            FROM daily_sales s
            {sales_where}
            GROUP BY s.product_id
        )
        """
        sales_join = "LEFT JOIN sales_rank sr ON sr.product_id = lm.product_id"
        sales_select = "COALESCE(sr.momo_revenue, 0) AS momo_revenue,"
        order_expr = "COALESCE(sr.momo_revenue, 0) DESC, " + gap_ordering
    else:
        sales_cte = ""
        sales_join = ""
        sales_select = "0 AS momo_revenue,"
        order_expr = "lm.momo_price DESC NULLS LAST, " + gap_ordering

    if has_match_attempts:
        attempt_cte = """
        latest_attempt AS (
            SELECT DISTINCT ON (cma.sku)
                cma.sku,
                cma.attempt_status,
                cma.candidate_count,
                cma.best_competitor_product_id,
                cma.best_competitor_product_name,
                cma.best_competitor_price,
                cma.best_match_score,
                cma.error_message,
                cma.attempted_at
            FROM competitor_match_attempts cma
            WHERE cma.source = 'pchome'
            ORDER BY cma.sku, cma.attempted_at DESC NULLS LAST
        )
        """
    else:
        # Empty stand-in so the main query keeps the same shape when the
        # attempts table does not exist.
        attempt_cte = """
        latest_attempt AS (
            SELECT
                NULL AS sku,
                NULL AS attempt_status,
                NULL AS candidate_count,
                NULL AS best_competitor_product_id,
                NULL AS best_competitor_product_name,
                NULL AS best_competitor_price,
                NULL AS best_match_score,
                NULL AS error_message,
                NULL AS attempted_at
            WHERE FALSE
        )
        """

    comparison_query = text(f"""
        WITH latest_momo AS (
            SELECT
                p.id AS product_id,
                p.i_code AS sku,
                p.name,
                pr.price AS momo_price,
                ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC, pr.id DESC) AS rn
            FROM products p
            JOIN price_records pr ON pr.product_id = p.id
            WHERE p.status = 'ACTIVE'
        ),
        valid_competitor AS (
            SELECT DISTINCT ON (cp.sku)
                cp.sku,
                cp.price AS pchome_price,
                cp.competitor_product_id,
                cp.competitor_product_name,
                cp.match_score
            FROM competitor_prices cp
            WHERE cp.source = 'pchome'
              AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
              AND cp.price IS NOT NULL
              AND cp.price > 0
              AND COALESCE(cp.match_score, 0) >= {PCHOME_MATCH_SCORE_FLOOR}
              AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
            ORDER BY cp.sku, cp.crawled_at DESC NULLS LAST
        ),
        {attempt_cte}
        {sales_cte}
        SELECT
            lm.sku,
            lm.name,
            lm.momo_price,
            vc.pchome_price,
            vc.competitor_product_id,
            vc.competitor_product_name,
            vc.match_score,
            la.attempt_status,
            la.candidate_count,
            la.best_competitor_product_id,
            la.best_competitor_product_name,
            la.best_competitor_price,
            la.best_match_score,
            la.error_message,
            la.attempted_at,
            {sales_select}
            (vc.pchome_price - lm.momo_price) AS price_diff,
            ((vc.pchome_price - lm.momo_price) / lm.momo_price * 100) AS price_diff_pct
        FROM latest_momo lm
        LEFT JOIN valid_competitor vc ON vc.sku = lm.sku
        LEFT JOIN latest_attempt la ON la.sku = lm.sku
        {sales_join}
        WHERE lm.rn = 1
          AND lm.momo_price > 0
        ORDER BY {order_expr}
        LIMIT :limit
    """)
    with engine.connect() as conn:
        comparison_rows = conn.execute(comparison_query, bind_params).mappings().all()

    def _to_result(record) -> dict:
        """Map one query row to the legacy comparison result dict."""
        pchome_id = record.get("competitor_product_id")
        found = bool(record.get("pchome_price"))
        if found:
            match_status = "matched"
        else:
            # Without a valid match, report the latest attempt status.
            match_status = record.get("attempt_status") or "no_valid_match"
        unit_comparison = _build_unit_comparison_for_attempt({
            "attempt_status": match_status,
            "name": record.get("name") or "",
            "best_competitor_product_name": record.get("best_competitor_product_name") or "",
            "momo_price": record.get("momo_price"),
            "best_competitor_price": record.get("best_competitor_price"),
        })
        return {
            "found": found,
            "momo_icode": str(record.get("sku") or ""),
            "momo_name": record.get("name") or "",
            "momo_price": _num(record.get("momo_price")),
            "pc_name": record.get("competitor_product_name") or "",
            "pc_price": _num(record.get("pchome_price")),
            "pc_url": f"https://24h.pchome.com.tw/prod/{pchome_id}" if pchome_id else "",
            "candidate_pc_id": record.get("best_competitor_product_id"),
            "candidate_pc_name": record.get("best_competitor_product_name") or "",
            "candidate_pc_price": _num(record.get("best_competitor_price")),
            "price_diff": _num(record.get("price_diff")),
            "price_diff_pct": _num(record.get("price_diff_pct")),
            "match_score": _num(record.get("match_score")),
            "momo_revenue": _num(record.get("momo_revenue")),
            "match_status": match_status,
            "match_status_label": _attempt_status_label(match_status),
            "action_label": _attempt_action_label(match_status),
            "candidate_count": int(record.get("candidate_count") or 0),
            "best_match_score": _num(record.get("best_match_score")),
            "match_diagnostic": record.get("error_message") or "",
            "unit_comparison": unit_comparison,
        }

    return [_to_result(record) for record in comparison_rows]
def build_competitor_intel_payload(engine, days: int = 30) -> dict:
    """Assemble the summary payload shared by pages, AI and PPT consumers.

    Combines the coverage summary, the ``days``-day gap trend, the top 10
    price risks and the first 12 review-queue items, plus the match-score
    floor in effect.
    """
    payload: dict = {}
    payload["coverage"] = fetch_competitor_coverage(engine)
    payload["trend"] = fetch_competitor_gap_trend(engine, days=days)
    payload["top_risks"] = fetch_top_competitor_risks(engine, limit=10)
    payload["review_queue"] = fetch_competitor_review_queue(engine, limit=12)
    payload["match_score_floor"] = PCHOME_MATCH_SCORE_FLOOR
    return payload