931 lines
41 KiB
Python
931 lines
41 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""PChome 業績成長自動化作戰系統的只讀作戰清單。"""
|
|
|
|
from __future__ import annotations

import json
import logging
import math
from datetime import datetime
from typing import Any

from sqlalchemy import bindparam, inspect, text

from services.external_market_offer_service import build_external_source_readiness
|
|
|
|
# Module-level logger for this service.
logger = logging.getLogger(__name__)

# Display name returned as ``system_name`` in every response payload.
SYSTEM_DISPLAY_NAME = "PChome 業績成長自動化作戰系統"
# Label of the first-party sales feed, reported in ``source_scope``.
PRIMARY_SALES_SOURCE = "PChome 後台業績"
# External price sources currently in use (reported in ``source_scope``).
ACTIVE_EXTERNAL_SOURCES: tuple[str, ...] = ("MOMO 外部價格參考",)
# External marketplaces reported as paused in ``source_scope``.
PAUSED_EXTERNAL_SOURCES: tuple[str, ...] = ("蝦皮", "酷澎")
|
|
|
|
|
|
def _to_float(value: Any, default: float | None = 0.0) -> float | None:
    """Convert *value* to a finite ``float``, falling back to *default*.

    Used for numbers read from database rows and JSON payloads, where a value
    may be ``None``, a ``Decimal``, a numeric string or garbage.

    Args:
        value: Anything ``float()`` might accept.
        default: Returned when *value* is ``None``, cannot be converted, or
            converts to NaN/±infinity. Callers may pass ``None`` to detect
            "no usable number".

    Returns:
        The converted finite float, or *default*.
    """
    if value is None:
        return default
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: ints too large for a float (e.g. 10**400).
        return default
    # float() accepts "nan"/"inf", and json.loads() yields NaN for a bare NaN
    # token. Such values would poison sums and comparisons and produce invalid
    # JSON in the response, so treat them as missing.
    if not math.isfinite(number):
        return default
    return number
|
|
|
|
|
|
def _quote_identifier(identifier: str) -> str:
    """Wrap *identifier* in double quotes for use as a SQL column reference.

    Embedded double quotes are doubled, so arbitrary spreadsheet headers
    (Chinese text, spaces, punctuation) can be referenced safely.
    """
    escaped = identifier.replace('"', '""')
    return f'"{escaped}"'
|
|
|
|
|
|
def _first_available(columns: set[str], candidates: list[str]) -> str | None:
    """Return the first entry of *candidates* present in *columns*, or ``None``.

    *candidates* is scanned in order, so it expresses priority.
    """
    for candidate in candidates:
        if candidate in columns:
            return candidate
    return None
|
|
|
|
|
|
def _load_json_tags(value: Any) -> list[str]:
    """Coerce a tags value into a list.

    Accepts an existing list (returned as-is) or JSON text/bytes encoding a
    list. Falsy input, undecodable input, or JSON that is not a list yields
    ``[]``. Element types are not checked.
    """
    if not value:
        return []
    if isinstance(value, list):
        return value
    try:
        decoded = json.loads(value)
    except Exception:
        return []
    return decoded if isinstance(decoded, list) else []
|
|
|
|
|
|
def _table_exists(engine, table_name: str) -> bool:
    """Report whether *table_name* exists, treating a failed probe as "no".

    Any exception raised while inspecting is logged with its traceback and
    converted to ``False``, so callers never fail on the probe itself.
    """
    try:
        found = inspect(engine).has_table(table_name)
    except Exception:
        logger.warning("[PChomeGrowth] table probe failed: %s", table_name, exc_info=True)
        return False
    return found
|
|
|
|
|
|
def _daily_sales_columns(conn) -> dict[str, str | None]:
    """Map logical sales fields to the real column names of ``daily_sales_snapshot``.

    Headers in the snapshot table vary, so each logical field has a list of
    candidate headers. The first one present wins; a field with no match maps
    to ``None``.

    Column discovery depends on the dialect:
    * PostgreSQL: ``information_schema.columns`` filtered by table name only.
      NOTE(review): there is no schema filter, so a same-named table in
      another schema would contribute columns too.
    * SQLite: ``PRAGMA table_info`` (column name is the second field).
    * Anything else: the result keys of an empty ``SELECT *``.
    """
    dialect_name = conn.dialect.name
    if dialect_name == "postgresql":
        schema_rows = conn.execute(text("""
            SELECT column_name
            FROM information_schema.columns
            WHERE table_name = 'daily_sales_snapshot'
        """)).fetchall()
        available = {schema_row[0] for schema_row in schema_rows}
    elif dialect_name == "sqlite":
        pragma_rows = conn.execute(text("PRAGMA table_info(daily_sales_snapshot)")).fetchall()
        available = {pragma_row[1] for pragma_row in pragma_rows}
    else:
        empty_result = conn.execute(text("SELECT * FROM daily_sales_snapshot LIMIT 0"))
        available = set(empty_result.keys())

    # Candidate headers per logical field, highest priority first.
    header_candidates: dict[str, list[str]] = {
        "sku": ["商品ID", "Product ID", "ID", "Item Code"],
        "name": ["商品名稱", "商品名", "品名", "Product Name", "Name"],
        "date": ["snapshot_date", "日期", "訂單日期", "交易日期", "Date"],
        "revenue": ["總業績", "銷售金額", "業績", "金額", "Amount", "Sales", "Total"],
        "qty": ["數量", "銷售數量", "銷量", "Qty", "Quantity"],
        "price": ["商品單位售價", "單價", "售價", "Price", "Unit Price"],
        "category": ["商品館", "館別", "分類", "Category"],
        "vendor": ["廠商名稱", "供應商", "Vendor"],
    }
    return {
        field: _first_available(available, headers)
        for field, headers in header_candidates.items()
    }
|
|
|
|
|
|
def _as_text_expr(identifier_or_expr: str, dialect_name: str, *, raw: bool = False) -> str:
    """Return SQL that casts a column, or a raw expression, to text.

    Args:
        identifier_or_expr: Column name to quote, or a ready-made SQL
            expression when *raw* is true.
        dialect_name: ``"postgresql"`` uses the ``::text`` shorthand; every
            other dialect gets ``CAST(... AS TEXT)``.
        raw: Skip identifier quoting and embed *identifier_or_expr* verbatim.
    """
    if raw:
        operand = identifier_or_expr
    else:
        operand = _quote_identifier(identifier_or_expr)
    if dialect_name != "postgresql":
        return f"CAST({operand} AS TEXT)"
    return f"{operand}::text"
|
|
|
|
|
|
def _numeric_expr(identifier: str, dialect_name: str) -> str:
    """Build a SQL expression that coerces a messy sales column to a number.

    Uploaded sales sheets often store amounts as text. The returned expression
    never yields NULL: anything that cannot be read as a number becomes ``0``.

    * PostgreSQL: every character except digits, ``.`` and ``-`` is stripped,
      and the remainder is cast to ``numeric`` only if it is a well-formed
      decimal literal.
    * Other dialects (SQLite): only ``,`` thousands separators are removed
      before ``CAST(... AS REAL)``. SQLite keeps the longest numeric prefix
      and yields 0 when there is none, so it never raises.

    Args:
        identifier: Raw (unquoted) column name; it is quoted here.
        dialect_name: SQLAlchemy dialect name, e.g. ``"postgresql"``.

    Returns:
        A SQL fragment suitable for a SELECT list.
    """
    quoted = _quote_identifier(identifier)
    if dialect_name == "postgresql":
        cleaned = f"regexp_replace({quoted}::text, '[^0-9.-]', '', 'g')"
        # Guard the cast. Cells such as "-" (a common zero placeholder), "."
        # or "1-2" survive the cleanup above, and ``::numeric`` would raise on
        # them and abort the whole report query. Empty/NULL also fails the
        # match and falls through to 0, as before.
        return (
            f"COALESCE(CASE WHEN {cleaned} ~ '^-?([0-9]+[.]?[0-9]*|[.][0-9]+)$' "
            f"THEN {cleaned}::numeric END, 0)"
        )
    return f"COALESCE(CAST(REPLACE(CAST({quoted} AS TEXT), ',', '') AS REAL), 0)"
|
|
|
|
|
|
def _fetch_sales_rows(conn, limit: int) -> tuple[list[dict[str, Any]], str | None]:
    """Aggregate ``daily_sales_snapshot`` into per-product 7-day sales candidates.

    Column names are discovered at runtime (``_daily_sales_columns``) because
    the snapshot headers vary. Only products with positive sales in either the
    current or the previous 7-day window are returned.

    Args:
        conn: Open SQLAlchemy connection. PostgreSQL gets ``::date``/INTERVAL
            arithmetic; every other dialect gets SQLite-style ``date()`` calls.
        limit: Requested number of opportunities. The query over-fetches
            ``max(limit * 4, 80)`` rows so the caller can re-rank and still fill
            the list.

    Returns:
        ``(rows, latest_date)``.
        * ``rows`` are dicts with keys ``pchome_product_id``, ``product_name``,
          ``category``, ``vendor``, ``sales_7d``, ``sales_prev_7d``, ``qty_7d``,
          ``pchome_price``, ``last_sale_date`` and ``latest_sales_date``.
        * ``latest_date`` is the newest sale date in the table as an ISO
          string, or ``None`` when no row carries one.

    Raises:
        RuntimeError: If the SKU, name, date or revenue column cannot be found.
    """
    cols = _daily_sales_columns(conn)
    missing = [key for key in ["sku", "name", "date", "revenue"] if not cols.get(key)]
    if missing:
        raise RuntimeError("PChome 業績檔缺少必要欄位:" + "、".join(missing))

    dialect = conn.dialect.name
    sku_col = _quote_identifier(cols["sku"])
    # NOTE: name_col is built but not referenced by the query below (name_text is used).
    name_col = _quote_identifier(cols["name"])
    date_col = _quote_identifier(cols["date"])
    revenue_expr = _numeric_expr(cols["revenue"], dialect)
    # Optional columns fall back to SQL literals so the SELECT list stays valid.
    qty_expr = _numeric_expr(cols["qty"], dialect) if cols.get("qty") else "0"
    price_expr = _numeric_expr(cols["price"], dialect) if cols.get("price") else "0"
    category_text = _as_text_expr(cols["category"], dialect) if cols.get("category") else "NULL"
    vendor_text = _as_text_expr(cols["vendor"], dialect) if cols.get("vendor") else "NULL"
    sku_text = _as_text_expr(cols["sku"], dialect)
    name_text = _as_text_expr(cols["name"], dialect)
    # Over-fetch so re-ranking by priority in the caller still has enough rows.
    candidate_limit = max(limit * 4, 80)

    # Both windows are anchored on the newest sale date in the table, not on today:
    #   current  window: sale_date >= latest - 6 days (7 days including latest)
    #   previous window: latest - 13 days <= sale_date < latest - 6 days
    if dialect == "postgresql":
        # NOTE(review): a non-empty date cell PostgreSQL cannot parse makes the
        # ::date cast raise and aborts the whole query.
        sale_date_expr = f"NULLIF({date_col}::text, '')::date"
        curr_window = "lw.latest_date - INTERVAL '6 days'"
        prev_window_start = "lw.latest_date - INTERVAL '13 days'"
        prev_window_end = "lw.latest_date - INTERVAL '6 days'"
    else:
        sale_date_expr = f"date({date_col})"
        curr_window = "date(lw.latest_date, '-6 days')"
        prev_window_start = "date(lw.latest_date, '-13 days')"
        prev_window_end = "date(lw.latest_date, '-6 days')"

    # Rank by the larger of the two weekly totals, so products whose sales
    # collapsed this week still surface near the top.
    order_metric = "CASE WHEN sales_7d >= sales_prev_7d THEN sales_7d ELSE sales_prev_7d END"
    # pchome_price = current-window revenue / qty; when qty is 0 it falls back
    # to the max unit price seen in the window, or NULL when that is 0.
    rows = conn.execute(text(f"""
        WITH sales_rows AS (
            SELECT
                NULLIF(TRIM({sku_text}), '') AS pchome_product_id,
                NULLIF(TRIM({name_text}), '') AS product_name,
                NULLIF(TRIM({_as_text_expr(category_text, dialect, raw=True)}), '') AS category,
                NULLIF(TRIM({_as_text_expr(vendor_text, dialect, raw=True)}), '') AS vendor,
                {sale_date_expr} AS sale_date,
                {revenue_expr} AS revenue,
                {qty_expr} AS qty,
                {price_expr} AS unit_price
            FROM daily_sales_snapshot
            WHERE {sku_col} IS NOT NULL
        ),
        latest_window AS (
            SELECT MAX(sale_date) AS latest_date
            FROM sales_rows
            WHERE sale_date IS NOT NULL
        ),
        sales AS (
            SELECT
                sr.pchome_product_id,
                MAX(sr.product_name) AS product_name,
                MAX(sr.category) AS category,
                MAX(sr.vendor) AS vendor,
                SUM(CASE WHEN sr.sale_date >= {curr_window}
                    THEN sr.revenue ELSE 0 END) AS sales_7d,
                SUM(CASE WHEN sr.sale_date >= {prev_window_start}
                    AND sr.sale_date < {prev_window_end}
                    THEN sr.revenue ELSE 0 END) AS sales_prev_7d,
                SUM(CASE WHEN sr.sale_date >= {curr_window}
                    THEN sr.qty ELSE 0 END) AS qty_7d,
                CASE
                    WHEN SUM(CASE WHEN sr.sale_date >= {curr_window} THEN sr.qty ELSE 0 END) > 0
                    THEN SUM(CASE WHEN sr.sale_date >= {curr_window} THEN sr.revenue ELSE 0 END)
                        / NULLIF(SUM(CASE WHEN sr.sale_date >= {curr_window} THEN sr.qty ELSE 0 END), 0)
                    ELSE NULLIF(MAX(CASE WHEN sr.sale_date >= {curr_window} THEN sr.unit_price ELSE 0 END), 0)
                END AS pchome_price,
                MAX(sr.sale_date) AS last_sale_date,
                MAX(lw.latest_date) AS latest_sales_date
            FROM sales_rows sr
            CROSS JOIN latest_window lw
            WHERE sr.pchome_product_id IS NOT NULL
            GROUP BY sr.pchome_product_id
        )
        SELECT *
        FROM sales
        WHERE sales_7d > 0 OR sales_prev_7d > 0
        ORDER BY {order_metric} DESC, qty_7d DESC
        LIMIT :limit
    """), {"limit": candidate_limit}).mappings().all()

    mapped_rows = [dict(row) for row in rows]
    latest_date = None
    # latest_window has a single row, so every result row carries the same
    # latest_sales_date; take the first non-empty one.
    for row in mapped_rows:
        value = row.get("latest_sales_date")
        if value:
            latest_date = value.isoformat() if hasattr(value, "isoformat") else str(value)
            break
    return mapped_rows, latest_date
|
|
|
|
|
|
def _fetch_sales_summary(conn) -> dict[str, Any]:
    """Compute catalogue-wide 7-day sales KPIs from ``daily_sales_snapshot``.

    The windows are anchored on the newest sale date in the table:
    current = ``[latest - 6 days, latest]``, previous =
    ``[latest - 13 days, latest - 6 days)``.

    Args:
        conn: Open SQLAlchemy connection (PostgreSQL, or SQLite-style SQL for
            every other dialect).

    Returns:
        ``{}`` when the SKU, date or revenue column is missing (or, defensively,
        when the query returns no row). Otherwise a dict with:
        * ``overall_latest_sales_date`` (ISO string, or ``""``),
        * ``overall_sales_7d`` and ``overall_sales_prev_7d``,
        * ``overall_sales_delta_pct`` (``None`` when the previous week is 0),
        * ``overall_qty_7d``,
        * ``active_product_count`` (current-week sales > 0),
        * ``declining_product_count`` (previous week > 0 and the current week lower),
        * ``top_category`` (``""`` if none) and ``top_category_sales_7d``.
    """
    cols = _daily_sales_columns(conn)
    missing = [key for key in ["sku", "date", "revenue"] if not cols.get(key)]
    if missing:
        return {}

    dialect = conn.dialect.name
    sku_col = _quote_identifier(cols["sku"])
    date_col = _quote_identifier(cols["date"])
    revenue_expr = _numeric_expr(cols["revenue"], dialect)
    # Optional columns fall back to SQL literals so the SELECT list stays valid.
    qty_expr = _numeric_expr(cols["qty"], dialect) if cols.get("qty") else "0"
    category_text = _as_text_expr(cols["category"], dialect) if cols.get("category") else "NULL"
    sku_text = _as_text_expr(cols["sku"], dialect)

    if dialect == "postgresql":
        sale_date_expr = f"NULLIF({date_col}::text, '')::date"
        curr_window = "lw.latest_date - INTERVAL '6 days'"
        prev_window_start = "lw.latest_date - INTERVAL '13 days'"
        prev_window_end = "lw.latest_date - INTERVAL '6 days'"
    else:
        sale_date_expr = f"date({date_col})"
        curr_window = "date(lw.latest_date, '-6 days')"
        prev_window_start = "date(lw.latest_date, '-13 days')"
        prev_window_end = "date(lw.latest_date, '-6 days')"

    # per_product holds one row per product over all history. category_sales
    # buckets empty/NULL categories under '未分類'. The final SELECT
    # aggregates per_product and picks the category with the highest positive
    # current-week sales.
    row = conn.execute(text(f"""
        WITH sales_rows AS (
            SELECT
                NULLIF(TRIM({sku_text}), '') AS pchome_product_id,
                NULLIF(TRIM({_as_text_expr(category_text, dialect, raw=True)}), '') AS category,
                {sale_date_expr} AS sale_date,
                {revenue_expr} AS revenue,
                {qty_expr} AS qty
            FROM daily_sales_snapshot
            WHERE {sku_col} IS NOT NULL
        ),
        latest_window AS (
            SELECT MAX(sale_date) AS latest_date
            FROM sales_rows
            WHERE sale_date IS NOT NULL
        ),
        per_product AS (
            SELECT
                sr.pchome_product_id,
                MAX(sr.category) AS category,
                SUM(CASE WHEN sr.sale_date >= {curr_window}
                    THEN sr.revenue ELSE 0 END) AS sales_7d,
                SUM(CASE WHEN sr.sale_date >= {prev_window_start}
                    AND sr.sale_date < {prev_window_end}
                    THEN sr.revenue ELSE 0 END) AS sales_prev_7d,
                SUM(CASE WHEN sr.sale_date >= {curr_window}
                    THEN sr.qty ELSE 0 END) AS qty_7d,
                MAX(lw.latest_date) AS latest_sales_date
            FROM sales_rows sr
            CROSS JOIN latest_window lw
            WHERE sr.pchome_product_id IS NOT NULL
            GROUP BY sr.pchome_product_id
        ),
        category_sales AS (
            SELECT
                COALESCE(NULLIF(category, ''), '未分類') AS category,
                SUM(sales_7d) AS sales_7d
            FROM per_product
            GROUP BY COALESCE(NULLIF(category, ''), '未分類')
        )
        SELECT
            MAX(latest_sales_date) AS latest_sales_date,
            COALESCE(SUM(sales_7d), 0) AS overall_sales_7d,
            COALESCE(SUM(sales_prev_7d), 0) AS overall_sales_prev_7d,
            COALESCE(SUM(qty_7d), 0) AS overall_qty_7d,
            SUM(CASE WHEN sales_7d > 0 THEN 1 ELSE 0 END) AS active_product_count,
            SUM(CASE WHEN sales_prev_7d > 0 AND sales_7d < sales_prev_7d THEN 1 ELSE 0 END) AS declining_product_count,
            (
                SELECT category
                FROM category_sales
                WHERE sales_7d > 0
                ORDER BY sales_7d DESC
                LIMIT 1
            ) AS top_category,
            (
                SELECT COALESCE(sales_7d, 0)
                FROM category_sales
                WHERE sales_7d > 0
                ORDER BY sales_7d DESC
                LIMIT 1
            ) AS top_category_sales_7d
        FROM per_product
    """)).mappings().first()

    if not row:
        return {}
    current = _to_float(row.get("overall_sales_7d"))
    previous = _to_float(row.get("overall_sales_prev_7d"))
    # Week-over-week change in percent; undefined when the previous week is 0.
    delta_pct = ((current - previous) / previous * 100) if previous else None
    latest_date = row.get("latest_sales_date")
    return {
        "overall_latest_sales_date": latest_date.isoformat() if hasattr(latest_date, "isoformat") else str(latest_date or ""),
        "overall_sales_7d": round(current, 2),
        "overall_sales_prev_7d": round(previous, 2),
        "overall_sales_delta_pct": round(delta_pct, 1) if delta_pct is not None else None,
        "overall_qty_7d": round(_to_float(row.get("overall_qty_7d")), 2),
        "active_product_count": int(row.get("active_product_count") or 0),
        "declining_product_count": int(row.get("declining_product_count") or 0),
        "top_category": row.get("top_category") or "",
        "top_category_sales_7d": round(_to_float(row.get("top_category_sales_7d")), 2),
    }
|
|
|
|
|
|
def _json_dict(value: Any) -> dict[str, Any]:
    """Coerce a JSON-object column (a dict, or JSON text/bytes) into a ``dict``.

    A dict input is returned as-is, not copied. Falsy input, undecodable
    input, and JSON that is not an object all yield ``{}``.
    """
    if not value:
        return {}
    if not isinstance(value, dict):
        try:
            value = json.loads(value)
        except Exception:
            return {}
        if not isinstance(value, dict):
            return {}
    return value
|
|
|
|
|
|
def _match_score_from_quality(value: Any) -> float:
    """Normalize a quality score to the 0..1 range.

    Values above 1 are read as percentages (e.g. 82 -> 0.82). Values at or
    below 1 are taken as already fractional. The result is clamped to
    [0, 1]; when the clamp applies it may be the int ``0`` or ``1``.
    Unparseable input becomes 0 via ``_to_float``.
    """
    raw_score = _to_float(value)
    fraction = raw_score / 100 if raw_score > 1 else raw_score
    return max(0, min(1, fraction))
|
|
|
|
|
|
def _external_price_basis(raw_payload: dict[str, Any]) -> str:
    """Return the payload's ``price_basis``, defaulting to ``"total_price"``.

    Only ``"total_price"`` and ``"unit_price"`` (after stripping whitespace)
    are accepted; anything else, including a missing key, maps to
    ``"total_price"``.
    """
    candidate = str(raw_payload.get("price_basis") or "").strip()
    if candidate in ("total_price", "unit_price"):
        return candidate
    return "total_price"
|
|
|
|
|
|
def _external_price_basis_label(price_basis: str) -> str:
    """Return the display label for a price basis.

    ``"unit_price"`` maps to "單位價"; every other value maps to "商品總價".
    """
    if price_basis == "unit_price":
        return "單位價"
    return "商品總價"
|
|
|
|
|
|
def _fetch_normalized_external_price_map(conn, pchome_product_ids: list[str]) -> dict[str, dict[str, Any]]:
    """Load the newest trusted MOMO reference offer for each PChome product id.

    Reads ``external_offers`` rows that meet all of these conditions:
    * ``source_code = 'momo_reference'``,
    * a positive price and ``quality_score >= 76``,
    * an accepted match status and data-quality status,
    * no ``expires_at`` in the past.

    For each product the newest ``observed_at`` wins, with ties broken by the
    highest ``id``.

    Args:
        conn: Open SQLAlchemy connection.
        pchome_product_ids: Candidate ids; blank/``None`` entries are ignored.

    Returns:
        Mapping of stripped PChome product id to a price-pair dict. It holds the
        MOMO/PChome prices, the price basis and its label, the unit-price
        comparison fields, a 0..1 ``match_score``, tags, timestamps and
        ``data_source = "external_offers"``. The mapping is empty when the
        table is missing or no usable ids are given.
    """
    inspector = inspect(conn)
    if not inspector.has_table("external_offers"):
        return {}
    ids = [str(item).strip() for item in pchome_product_ids if str(item or "").strip()]
    if not ids:
        return {}

    # One filter shared by both dialects, so they apply identical trust rules.
    # That includes the expiry check, which the SQLite query previously lacked,
    # letting expired offers pass as verified prices there.
    # NOTE(review): on SQLite the expiry check compares expires_at with
    # CURRENT_TIMESTAMP ('YYYY-MM-DD HH:MM:SS', UTC); confirm the stored
    # format/timezone.
    trusted_offer_filter = """
        eo.source_code = 'momo_reference'
        AND eo.pchome_product_id IS NOT NULL
        AND eo.pchome_product_id IN :ids
        AND eo.price IS NOT NULL
        AND eo.price > 0
        AND COALESCE(eo.quality_score, 0) >= 76
        AND eo.match_status IN ('verified', 'usable', 'reviewed', 'exact', 'confirmed')
        AND eo.data_quality_status IN ('verified', 'usable', 'reviewed')
        AND (eo.expires_at IS NULL OR eo.expires_at > CURRENT_TIMESTAMP)
    """

    if conn.dialect.name == "postgresql":
        sql = f"""
            WITH latest_offer AS (
                SELECT DISTINCT ON (eo.pchome_product_id)
                    eo.pchome_product_id,
                    eo.source_product_id AS momo_sku,
                    eo.title AS momo_name,
                    eo.price AS momo_price,
                    eo.quality_score,
                    eo.quality_notes_json,
                    eo.raw_payload_json,
                    eo.observed_at,
                    eo.ingestion_method
                FROM external_offers eo
                WHERE {trusted_offer_filter}
                ORDER BY eo.pchome_product_id, eo.observed_at DESC NULLS LAST, eo.id DESC
            )
            SELECT * FROM latest_offer
        """
    else:
        # SQLite sorts NULLs first ascending, i.e. last under DESC, which
        # matches PostgreSQL's explicit NULLS LAST above.
        sql = f"""
            WITH latest_offer AS (
                SELECT
                    eo.pchome_product_id,
                    eo.source_product_id AS momo_sku,
                    eo.title AS momo_name,
                    eo.price AS momo_price,
                    eo.quality_score,
                    eo.quality_notes_json,
                    eo.raw_payload_json,
                    eo.observed_at,
                    eo.ingestion_method,
                    ROW_NUMBER() OVER (
                        PARTITION BY eo.pchome_product_id
                        ORDER BY eo.observed_at DESC, eo.id DESC
                    ) AS rn
                FROM external_offers eo
                WHERE {trusted_offer_filter}
            )
            SELECT *
            FROM latest_offer
            WHERE rn = 1
        """

    stmt = text(sql).bindparams(bindparam("ids", expanding=True))
    rows = conn.execute(stmt, {"ids": ids}).mappings().all()
    result: dict[str, dict[str, Any]] = {}
    for row in rows:
        raw_payload = _json_dict(row.get("raw_payload_json"))
        price_basis = _external_price_basis(raw_payload)
        unit_price_comparison = _json_dict(raw_payload.get("unit_price_comparison"))
        key = str(row.get("pchome_product_id") or "").strip()
        if not key:
            continue
        result[key] = {
            "pchome_product_id": key,
            "pchome_public_name": raw_payload.get("pchome_public_name"),
            "momo_sku": row.get("momo_sku"),
            "momo_name": row.get("momo_name"),
            "momo_price": row.get("momo_price"),
            "pchome_price": raw_payload.get("pchome_public_price"),
            "price_basis": price_basis,
            "price_basis_label": _external_price_basis_label(price_basis),
            "unit_label": unit_price_comparison.get("unit_label"),
            "momo_unit_price": unit_price_comparison.get("momo_unit_price"),
            # In the stored comparison, PChome is the "competitor" side.
            "pchome_unit_price": unit_price_comparison.get("competitor_unit_price"),
            "unit_gap_pct": unit_price_comparison.get("unit_gap_pct"),
            "momo_total_quantity": unit_price_comparison.get("momo_total_quantity"),
            "pchome_total_quantity": unit_price_comparison.get("competitor_total_quantity"),
            "match_score": _match_score_from_quality(row.get("quality_score")),
            "tags": raw_payload.get("tags") or ["external_offers", "verified"],
            "crawled_at": row.get("observed_at"),
            "momo_price_at": row.get("observed_at"),
            "data_source": "external_offers",
            "ingestion_method": row.get("ingestion_method"),
        }
    return result
|
|
|
|
|
|
def _fetch_legacy_external_price_map(conn, pchome_product_ids: list[str]) -> dict[str, dict[str, Any]]:
    """Fallback MOMO/PChome price pairs from the legacy ``competitor_prices`` cache.

    For each PChome product id, picks the newest ``competitor_prices`` row that:
    * has source ``'pchome'`` and a positive price,
    * has ``match_score >= 0.76``,
    * is tagged ``identity_v2``,
    * is not expired.

    It then attaches the latest ``price_records`` entry of the ACTIVE
    ``products`` row whose ``i_code`` equals that row's ``sku``.

    Args:
        conn: Open SQLAlchemy connection.
        pchome_product_ids: Candidate ids; blank/``None`` entries are ignored.

    Returns:
        Mapping of PChome product id to a dict of all selected columns plus
        ``data_source = "competitor_prices"``. Empty when any of the three
        legacy tables is missing or no usable ids are given.
    """
    inspector = inspect(conn)
    if not all(inspector.has_table(table) for table in {"competitor_prices", "products", "price_records"}):
        return {}
    ids = [str(item).strip() for item in pchome_product_ids if str(item or "").strip()]
    if not ids:
        return {}

    if conn.dialect.name == "postgresql":
        sql = """
            WITH valid_cp AS (
                SELECT DISTINCT ON (cp.competitor_product_id)
                    cp.competitor_product_id AS pchome_product_id,
                    cp.competitor_product_name AS pchome_public_name,
                    cp.sku AS momo_sku,
                    cp.price AS pchome_price,
                    cp.match_score,
                    cp.tags,
                    cp.crawled_at
                FROM competitor_prices cp
                WHERE cp.source = 'pchome'
                    AND cp.competitor_product_id IS NOT NULL
                    AND cp.price IS NOT NULL
                    AND cp.price > 0
                    AND COALESCE(cp.match_score, 0) >= 0.76
                    AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
                    AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
                    AND cp.competitor_product_id IN :ids
                ORDER BY cp.competitor_product_id, cp.crawled_at DESC NULLS LAST
            )
            SELECT vc.*, lm.momo_name, lm.momo_price, lm.momo_price_at
            FROM valid_cp vc
            LEFT JOIN LATERAL (
                SELECT
                    p.name AS momo_name,
                    pr.price AS momo_price,
                    pr.timestamp AS momo_price_at
                FROM products p
                JOIN price_records pr ON pr.product_id = p.id
                WHERE p.i_code = vc.momo_sku
                    AND p.status = 'ACTIVE'
                ORDER BY pr.timestamp DESC, pr.id DESC
                LIMIT 1
            ) lm ON TRUE
        """
    else:
        # The expires_at check mirrors the PostgreSQL query above; it was
        # missing here, so expired cache rows were still served on SQLite.
        # NOTE(review): this is a text comparison against CURRENT_TIMESTAMP
        # ('YYYY-MM-DD HH:MM:SS', UTC); confirm the stored format/timezone.
        # The tag check uses a plain substring match on the stored tags text.
        sql = """
            WITH latest_cp AS (
                SELECT
                    cp.competitor_product_id AS pchome_product_id,
                    cp.competitor_product_name AS pchome_public_name,
                    cp.sku AS momo_sku,
                    cp.price AS pchome_price,
                    cp.match_score,
                    cp.tags,
                    cp.crawled_at,
                    ROW_NUMBER() OVER (
                        PARTITION BY cp.competitor_product_id
                        ORDER BY cp.crawled_at DESC
                    ) AS rn
                FROM competitor_prices cp
                WHERE cp.source = 'pchome'
                    AND cp.competitor_product_id IS NOT NULL
                    AND cp.price IS NOT NULL
                    AND cp.price > 0
                    AND COALESCE(cp.match_score, 0) >= 0.76
                    AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
                    AND COALESCE(cp.tags, '') LIKE '%identity_v2%'
                    AND cp.competitor_product_id IN :ids
            ),
            latest_momo AS (
                SELECT
                    p.i_code AS momo_sku,
                    p.name AS momo_name,
                    pr.price AS momo_price,
                    pr.timestamp AS momo_price_at,
                    ROW_NUMBER() OVER (
                        PARTITION BY p.i_code
                        ORDER BY pr.timestamp DESC, pr.id DESC
                    ) AS rn
                FROM products p
                JOIN price_records pr ON pr.product_id = p.id
                WHERE p.status = 'ACTIVE'
                    AND p.i_code IN (SELECT momo_sku FROM latest_cp)
            )
            SELECT cp.pchome_product_id, cp.pchome_public_name, cp.momo_sku,
                cp.pchome_price, cp.match_score, cp.tags, cp.crawled_at,
                lm.momo_name, lm.momo_price, lm.momo_price_at
            FROM latest_cp cp
            LEFT JOIN latest_momo lm ON lm.momo_sku = cp.momo_sku AND lm.rn = 1
            WHERE cp.rn = 1
        """

    stmt = text(sql).bindparams(bindparam("ids", expanding=True))
    rows = conn.execute(stmt, {"ids": ids}).mappings().all()
    result: dict[str, dict[str, Any]] = {}
    for row in rows:
        key = str(row.get("pchome_product_id") or "").strip()
        if key:
            item = dict(row)
            item["data_source"] = "competitor_prices"
            result[key] = item
    return result
|
|
|
|
|
|
def _fetch_external_price_map(conn, pchome_product_ids: list[str]) -> dict[str, dict[str, Any]]:
    """Merge trusted MOMO price pairs from the normalized and legacy sources.

    ``external_offers`` results are queried first. Only ids they do not cover
    are looked up in the legacy ``competitor_prices`` cache. When an id
    appears in both, the normalized entry wins.
    """
    normalized_map = _fetch_normalized_external_price_map(conn, pchome_product_ids)
    cleaned_ids = (
        str(item).strip()
        for item in pchome_product_ids
        if str(item or "").strip()
    )
    missing_ids = [product_id for product_id in cleaned_ids if product_id not in normalized_map]
    legacy_map = _fetch_legacy_external_price_map(conn, missing_ids)
    merged = dict(legacy_map)
    merged.update(normalized_map)
    return merged
|
|
|
|
|
|
def _fetch_review_candidate_map(conn, pchome_product_ids: list[str]) -> dict[str, dict[str, Any]]:
    """Find the newest MOMO candidate still awaiting human review for each PChome product.

    Only ``external_offers`` rows are considered that:
    * have ``source_code = 'momo_reference'``,
    * were ingested by ``targeted_momo_review``,
    * have ``match_status`` or ``data_quality_status`` equal to ``needs_review``.

    Unlike the trusted-price lookup there is no price, quality-score or expiry
    filter. For each product the newest ``observed_at`` wins, with ties broken
    by the highest ``id``.

    Args:
        conn: Open SQLAlchemy connection.
        pchome_product_ids: Candidate ids; blank/``None`` entries are ignored.

    Returns:
        Mapping of stripped PChome product id to a dict with keys ``id``,
        ``pchome_product_id``, ``pchome_product_name``, ``pchome_price``,
        ``momo_sku``, ``momo_name``, ``momo_price``, ``product_url``,
        ``quality_score`` (rounded to 2 dp), ``match_score`` (0..1),
        ``match_reasons`` (at most 5 strings), ``gap_pct`` and ``observed_at``
        (string). Empty when the table is missing or no usable ids are given.
    """
    inspector = inspect(conn)
    if not inspector.has_table("external_offers"):
        return {}
    ids = [str(item).strip() for item in pchome_product_ids if str(item or "").strip()]
    if not ids:
        return {}

    if conn.dialect.name == "postgresql":
        sql = """
            WITH latest_review AS (
                SELECT DISTINCT ON (eo.pchome_product_id)
                    eo.id,
                    eo.pchome_product_id,
                    eo.source_product_id AS momo_sku,
                    eo.title AS momo_name,
                    eo.product_url,
                    eo.price AS momo_price,
                    eo.quality_score,
                    eo.raw_payload_json,
                    eo.observed_at
                FROM external_offers eo
                WHERE eo.source_code = 'momo_reference'
                    AND eo.ingestion_method = 'targeted_momo_review'
                    AND eo.pchome_product_id IS NOT NULL
                    AND eo.pchome_product_id IN :ids
                    AND (
                        eo.match_status = 'needs_review'
                        OR eo.data_quality_status = 'needs_review'
                    )
                ORDER BY eo.pchome_product_id, eo.observed_at DESC NULLS LAST, eo.id DESC
            )
            SELECT * FROM latest_review
        """
    else:
        # Window-function equivalent of DISTINCT ON for SQLite and other dialects.
        sql = """
            WITH latest_review AS (
                SELECT
                    eo.id,
                    eo.pchome_product_id,
                    eo.source_product_id AS momo_sku,
                    eo.title AS momo_name,
                    eo.product_url,
                    eo.price AS momo_price,
                    eo.quality_score,
                    eo.raw_payload_json,
                    eo.observed_at,
                    ROW_NUMBER() OVER (
                        PARTITION BY eo.pchome_product_id
                        ORDER BY eo.observed_at DESC, eo.id DESC
                    ) AS rn
                FROM external_offers eo
                WHERE eo.source_code = 'momo_reference'
                    AND eo.ingestion_method = 'targeted_momo_review'
                    AND eo.pchome_product_id IS NOT NULL
                    AND eo.pchome_product_id IN :ids
                    AND (
                        eo.match_status = 'needs_review'
                        OR eo.data_quality_status = 'needs_review'
                    )
            )
            SELECT *
            FROM latest_review
            WHERE rn = 1
        """

    # expanding=True lets :ids bind a Python list for the IN clause.
    stmt = text(sql).bindparams(bindparam("ids", expanding=True))
    rows = conn.execute(stmt, {"ids": ids}).mappings().all()
    result: dict[str, dict[str, Any]] = {}
    for row in rows:
        key = str(row.get("pchome_product_id") or "").strip()
        if not key:
            continue
        raw_payload = _json_dict(row.get("raw_payload_json"))
        # match_reasons is used only when the payload stores it as a list.
        reasons = raw_payload.get("match_reasons") if isinstance(raw_payload.get("match_reasons"), list) else []
        result[key] = {
            "id": row.get("id"),
            "pchome_product_id": key,
            "pchome_product_name": raw_payload.get("pchome_public_name"),
            "pchome_price": raw_payload.get("pchome_public_price"),
            "momo_sku": row.get("momo_sku"),
            "momo_name": row.get("momo_name"),
            "momo_price": row.get("momo_price"),
            "product_url": row.get("product_url"),
            "quality_score": round(_to_float(row.get("quality_score")), 2),
            "match_score": _match_score_from_quality(row.get("quality_score")),
            "match_reasons": [str(reason) for reason in reasons[:5]],
            "gap_pct": raw_payload.get("target_gap_pct"),
            "observed_at": str(row.get("observed_at") or ""),
        }
    return result
|
|
|
|
|
|
def _score_opportunity(
    sales_row: dict[str, Any],
    external_row: dict[str, Any] | None,
    review_candidate: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Score one sales candidate and choose a recommended action.

    Args:
        sales_row: One aggregated product row (``sales_7d``, ``sales_prev_7d``,
            ``qty_7d``, ``pchome_price``, ``last_sale_date``, names, ...).
        external_row: Trusted MOMO price pair for the product, or ``None``.
        review_candidate: Unconfirmed MOMO candidate. It is consulted only when
            *external_row* is falsy.

    Returns:
        Dict with the product fields, ``external_price`` (or ``None``),
        ``review_candidate`` (or ``None``), ``priority_score`` (0..100),
        ``recommended_action`` ``{code, label, message}``, up to 4
        ``reason_lines``, and ``data_quality`` ``{label, score, issues}``.

    Scoring: ``priority_score`` is capped at 100 and is the sum of:
        * volume: up to 34, from the larger weekly total, saturating at 80,000;
        * quantity: up to 10, saturating at 25 units;
        * decline: up to 24, for a week-over-week drop saturating at 45%;
        * mapping gap: 18 when there is no external row but some sales;
        * 18% of the data-quality score;
        * a price-gap bonus: up to 18 when MOMO is >5% cheaper (saturating at
          20%), or up to 10 when PChome is >5% cheaper (saturating at 25%).

    ``gap_pct`` = (MOMO - PChome) / PChome * 100, on the total or unit price
    per ``price_basis``. A negative value means MOMO is cheaper.
    """
    sales_7d = _to_float(sales_row.get("sales_7d"))
    sales_prev_7d = _to_float(sales_row.get("sales_prev_7d"))
    qty_7d = _to_float(sales_row.get("qty_7d"))
    sales_delta_pct = None
    if sales_prev_7d > 0:
        sales_delta_pct = (sales_7d - sales_prev_7d) / sales_prev_7d * 100

    volume_score = min(34, max(sales_7d, sales_prev_7d) / 80000 * 34)
    qty_score = min(10, qty_7d / 25 * 10)
    decline_score = min(24, abs(sales_delta_pct) / 45 * 24) if sales_delta_pct is not None and sales_delta_pct < 0 else 0
    # Data-quality baseline: 78..90 with an external row, 62 with only a review
    # candidate, 42 with neither (adjusted below).
    data_quality_score = 54
    external_payload = None
    review_candidate_payload = None
    # Default action when no MOMO counterpart is known.
    action_code = "map_external_product"
    action_label = "先補商品對應"
    action_message = "這項商品已有業績訊號,但還沒有可確認的 MOMO 對照商品。先補對應,後續才能判斷價格壓力。"
    reason_lines = []

    if external_row:
        price_basis = str(external_row.get("price_basis") or "total_price")
        price_basis_label = external_row.get("price_basis_label") or _external_price_basis_label(price_basis)
        pchome_price = _to_float(external_row.get("pchome_price"))
        momo_price = _to_float(external_row.get("momo_price"))
        pchome_compare_price = pchome_price
        momo_compare_price = momo_price
        if price_basis == "unit_price":
            pchome_unit_price = _to_float(external_row.get("pchome_unit_price"))
            momo_unit_price = _to_float(external_row.get("momo_unit_price"))
            pchome_compare_price = pchome_unit_price
            momo_compare_price = momo_unit_price
            # Prefer the stored unit gap; compute it from unit prices only when absent.
            gap_pct = _to_float(external_row.get("unit_gap_pct"), default=None)
            if gap_pct is None and pchome_compare_price and momo_compare_price:
                gap_pct = (momo_compare_price - pchome_compare_price) / pchome_compare_price * 100
        else:
            pchome_unit_price = None
            momo_unit_price = None
            gap_pct = ((momo_price - pchome_price) / pchome_price * 100) if pchome_price else None
        tags = _load_json_tags(external_row.get("tags"))
        data_quality_score = 78 + min(12, _to_float(external_row.get("match_score")) * 12)
        external_payload = {
            "source": "MOMO",
            "data_source": external_row.get("data_source") or "competitor_prices",
            "data_source_label": "自動同步資料層"
            if external_row.get("data_source") == "external_offers"
            else "舊比價快取",
            "momo_sku": external_row.get("momo_sku"),
            "momo_name": external_row.get("momo_name"),
            "momo_price": round(momo_price, 2) if momo_price else None,
            "pchome_price": round(pchome_price, 2) if pchome_price else None,
            "price_basis": price_basis,
            "price_basis_label": price_basis_label,
            "unit_label": external_row.get("unit_label") or "",
            "momo_unit_price": round(momo_unit_price, 4) if momo_unit_price else None,
            "pchome_unit_price": round(pchome_unit_price, 4) if pchome_unit_price else None,
            "momo_total_quantity": external_row.get("momo_total_quantity"),
            "pchome_total_quantity": external_row.get("pchome_total_quantity"),
            "gap_pct": round(gap_pct, 1) if gap_pct is not None else None,
            "match_score": round(_to_float(external_row.get("match_score")), 3),
            "tags": tags,
            "updated_at": str(external_row.get("crawled_at") or ""),
        }

        # A price gap beyond ±5% decides the action first; otherwise a sales
        # drop of more than 10% does.
        if gap_pct is not None and gap_pct < -5:
            action_code = "review_price_or_promo"
            action_label = "檢查售價與活動"
            action_message = "MOMO 外部參考價比較低,建議檢查 PChome 售價、活動組合或曝光策略。"
        elif gap_pct is not None and gap_pct > 5:
            action_code = "amplify_price_advantage"
            action_label = "放大價格優勢"
            action_message = "PChome 目前有價格優勢,適合檢查曝光、文案與活動位置。"
        elif sales_delta_pct is not None and sales_delta_pct < -10:
            action_code = "recover_sales_momentum"
            action_label = "找回銷售動能"
            action_message = "價格差距不大,但近 7 天業績轉弱,建議檢查曝光、庫存與商品頁內容。"
        else:
            action_code = "monitor"
            action_label = "持續觀察"
            action_message = "業績與外部價格暫無明顯異常,先保留在觀察清單。"

        if gap_pct is not None:
            basis_prefix = "單位價" if price_basis == "unit_price" else "價格"
            if gap_pct > 0:
                reason_lines.append(f"PChome {basis_prefix}目前比 MOMO 低約 {abs(gap_pct):.1f}%。")
            elif gap_pct < 0:
                reason_lines.append(f"MOMO {basis_prefix}目前比 PChome 低約 {abs(gap_pct):.1f}%。")
            else:
                reason_lines.append(f"PChome 與 MOMO {basis_prefix}幾乎相同。")
    else:
        if review_candidate:
            review_candidate_payload = {
                "id": review_candidate.get("id"),
                "momo_sku": review_candidate.get("momo_sku"),
                "momo_name": review_candidate.get("momo_name"),
                "momo_price": review_candidate.get("momo_price"),
                "pchome_price": review_candidate.get("pchome_price"),
                "quality_score": review_candidate.get("quality_score"),
                "gap_pct": review_candidate.get("gap_pct"),
                "match_reasons": review_candidate.get("match_reasons") or [],
                "product_url": review_candidate.get("product_url"),
            }
            data_quality_score = max(data_quality_score, 62)
            action_code = "review_external_candidate"
            action_label = "確認候選"
            action_message = "已找到 MOMO 候選,但還要確認同款、色號或組合後才能進價格判斷。"
            reason_lines.append("已找到 MOMO 候選,先確認同款、色號或組合。")
        else:
            data_quality_score -= 12
            reason_lines.append("尚未找到可確認的 MOMO 對照商品。")

    if sales_delta_pct is None:
        reason_lines.append("前 7 天沒有可比基準,先看近 7 天表現。")
    elif sales_delta_pct < 0:
        reason_lines.append(f"近 7 天業績比前 7 天少約 {abs(sales_delta_pct):.1f}%。")
    elif sales_delta_pct > 0:
        reason_lines.append(f"近 7 天業績比前 7 天多約 {sales_delta_pct:.1f}%。")
    else:
        reason_lines.append("近 7 天業績與前 7 天大致持平。")

    if sales_7d > 0:
        reason_lines.append(f"近 7 天業績約 NT$ {sales_7d:,.0f},銷量 {qty_7d:,.0f}。")

    mapping_gap_score = 18 if not external_row and max(sales_7d, sales_prev_7d) > 0 else 0
    priority_score = min(100, volume_score + qty_score + decline_score + mapping_gap_score + data_quality_score * 0.18)
    # Price-gap bonus uses the gap already rounded to 1 dp in the payload.
    if external_payload and external_payload.get("gap_pct") is not None:
        gap = float(external_payload["gap_pct"])
        if gap < -5:
            priority_score = min(100, priority_score + min(18, abs(gap) / 20 * 18))
        elif gap > 5:
            priority_score = min(100, priority_score + min(10, gap / 25 * 10))

    issues = []
    if not external_row:
        issues.append("需要確認 MOMO 候選" if review_candidate_payload else "需要補商品對應")
    if sales_delta_pct is None:
        issues.append("前期業績不足")

    return {
        "pchome_product_id": str(sales_row.get("pchome_product_id") or ""),
        "product_name": sales_row.get("product_name") or "未命名商品",
        "category": sales_row.get("category") or "",
        "vendor": sales_row.get("vendor") or "",
        "sales_7d": round(sales_7d, 2),
        "sales_prev_7d": round(sales_prev_7d, 2),
        "sales_delta_pct": round(sales_delta_pct, 1) if sales_delta_pct is not None else None,
        "qty_7d": round(qty_7d, 2),
        "pchome_price": round(_to_float(sales_row.get("pchome_price")), 2)
        if _to_float(sales_row.get("pchome_price")) > 0
        else None,
        "last_sale_date": str(sales_row.get("last_sale_date") or ""),
        "external_price": external_payload,
        "review_candidate": review_candidate_payload,
        "priority_score": round(priority_score, 1),
        "recommended_action": {
            "code": action_code,
            "label": action_label,
            "message": action_message,
        },
        "reason_lines": reason_lines[:4],
        "data_quality": {
            # Chained conditional: unit-price pair > total-price pair >
            # review candidate > nothing.
            "label": (
                "資料可用單位價判斷"
                if external_payload and external_payload.get("price_basis") == "unit_price"
                else "資料可直接判斷"
                if external_row
                else "候選待確認"
                if review_candidate_payload
                else "需要補資料"
            ),
            "score": round(max(0, min(100, data_quality_score)), 1),
            "issues": issues,
        },
    }
|
|
|
|
|
|
def _empty_growth_stats(review_candidate_count: int, latest_sales_date: str | None = None) -> dict[str, Any]:
    """Return a zeroed ``stats`` payload with the same keys as a populated one."""
    return {
        "latest_sales_date": latest_sales_date,
        "candidate_count": 0,
        "mapped_count": 0,
        "mapping_rate": 0,
        "needs_mapping_count": 0,
        "review_candidate_count": review_candidate_count,
        "total_sales_7d": 0,
        "opportunity_sales_7d": 0,
        "action_counts": {},
        "action_code_counts": {},
        "external_data_source_counts": {},
    }


def build_pchome_growth_opportunities(engine, limit: int = 20) -> dict[str, Any]:
    """Build the PChome growth "battle list" from sales data and verified external prices.

    Reads PChome sales (``daily_sales_snapshot``) and verified external prices,
    then produces a ranked list of opportunities for operations. The query
    helpers in this module only issue SELECT/PRAGMA statements.

    Args:
        engine: SQLAlchemy engine.
        limit: Number of opportunities to return. It is clamped to [5, 50];
            a falsy value means 20.

    Returns:
        Dict with ``success`` (always ``True``; problems are reported in
        ``message``), ``system_name``, ``generated_at`` (local time, ISO
        seconds), ``source_scope``, ``stats``, ``opportunities`` and
        ``message``. ``stats`` exposes the same base keys on every path.
    """
    limit = max(5, min(int(limit or 20), 50))
    generated_at = datetime.now().isoformat(timespec="seconds")
    source_readiness = build_external_source_readiness(engine)
    review_candidate_count = int(source_readiness.get("review_offer_count") or 0)
    source_scope = {
        "primary_goal": "提升 PChome 業績",
        "primary_sales_source": PRIMARY_SALES_SOURCE,
        "active_external_sources": list(ACTIVE_EXTERNAL_SOURCES),
        "paused_external_sources": list(PAUSED_EXTERNAL_SOURCES),
        "plain_note": "蝦皮與酷澎先暫停,不進作戰清單,也不發告警。",
        "source_readiness": source_readiness,
    }

    if not _table_exists(engine, "daily_sales_snapshot"):
        # Same stats shape as the other responses, so consumers can read
        # total_sales_7d / action_counts / ... unconditionally.
        return {
            "success": True,
            "system_name": SYSTEM_DISPLAY_NAME,
            "generated_at": generated_at,
            "source_scope": source_scope,
            "stats": _empty_growth_stats(review_candidate_count),
            "opportunities": [],
            "message": "目前還沒有 PChome 業績資料,請先完成業績匯入。",
        }

    with engine.connect() as conn:
        sales_summary = _fetch_sales_summary(conn)
        try:
            sales_rows, latest_sales_date = _fetch_sales_rows(conn, limit=limit)
        except RuntimeError as exc:
            # Required sales columns are missing: report it instead of failing.
            stats = _empty_growth_stats(
                review_candidate_count,
                sales_summary.get("overall_latest_sales_date"),
            )
            stats.update(sales_summary)
            return {
                "success": True,
                "system_name": SYSTEM_DISPLAY_NAME,
                "generated_at": generated_at,
                "source_scope": source_scope,
                "stats": stats,
                "opportunities": [],
                "message": str(exc),
            }

        sales_ids = [str(row.get("pchome_product_id") or "") for row in sales_rows]
        external_map = _fetch_external_price_map(conn, sales_ids)
        review_candidate_map = _fetch_review_candidate_map(conn, sales_ids)

    # Scoring is pure Python, so it runs after the connection is released.
    opportunities = []
    for row in sales_rows:
        key = str(row.get("pchome_product_id") or "").strip()
        opportunities.append(_score_opportunity(row, external_map.get(key), review_candidate_map.get(key)))

    opportunities.sort(key=lambda item: item["priority_score"], reverse=True)
    opportunities = opportunities[:limit]
    needs_mapping_count = sum(1 for item in opportunities if not item.get("external_price"))
    mapped_count = len(opportunities) - needs_mapping_count
    mapping_rate = round(mapped_count / max(len(opportunities), 1) * 100, 1)

    action_counts: dict[str, int] = {}
    action_code_counts: dict[str, int] = {}
    external_data_source_counts: dict[str, int] = {}
    for item in opportunities:
        label = item["recommended_action"]["label"]
        code = item["recommended_action"]["code"]
        action_counts[label] = action_counts.get(label, 0) + 1
        action_code_counts[code] = action_code_counts.get(code, 0) + 1
        external_price = item.get("external_price") or {}
        data_source_label = external_price.get("data_source_label")
        if data_source_label:
            external_data_source_counts[data_source_label] = (
                external_data_source_counts.get(data_source_label, 0) + 1
            )

    # NOTE(review): total_sales_7d and opportunity_sales_7d are both the sum
    # over the returned opportunities. The catalogue-wide figure is
    # overall_sales_7d (from sales_summary). Both keys are kept identical for
    # backward compatibility.
    opportunity_sales_7d = round(sum(_to_float(item.get("sales_7d")) for item in opportunities), 2)

    return {
        "success": True,
        "system_name": SYSTEM_DISPLAY_NAME,
        "generated_at": generated_at,
        "source_scope": source_scope,
        "stats": {
            "latest_sales_date": latest_sales_date or sales_summary.get("overall_latest_sales_date"),
            "candidate_count": len(opportunities),
            "mapped_count": mapped_count,
            "mapping_rate": mapping_rate,
            "needs_mapping_count": needs_mapping_count,
            "review_candidate_count": review_candidate_count,
            "total_sales_7d": opportunity_sales_7d,
            "opportunity_sales_7d": opportunity_sales_7d,
            "action_counts": action_counts,
            "action_code_counts": action_code_counts,
            "external_data_source_counts": external_data_source_counts,
            **sales_summary,
        },
        "opportunities": opportunities,
        "message": "已整理今日 PChome 業績成長作戰清單。",
    }
|