Files
ewoooc/services/pchome_revenue_growth_service.py
ogt e6deaa4711
All checks were successful
CD Pipeline / deploy (push) Successful in 1m6s
feat: link growth dashboard metrics to details
2026-06-24 14:28:17 +08:00

931 lines
41 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""PChome 業績成長自動化作戰系統的只讀作戰清單。"""
from __future__ import annotations

import json
import logging
import math
from datetime import datetime
from typing import Any

from sqlalchemy import bindparam, inspect, text

from services.external_market_offer_service import build_external_source_readiness
# Module logger (e.g. _table_exists logs failed table probes here).
logger = logging.getLogger(__name__)
# Returned as "system_name" in every payload built by build_pchome_growth_opportunities.
SYSTEM_DISPLAY_NAME = "PChome 業績成長自動化作戰系統"
# The following three values are reported verbatim in the payload's "source_scope".
# Primary (internal) sales data source.
PRIMARY_SALES_SOURCE = "PChome 後台業績"
# External price sources currently used for comparisons.
ACTIVE_EXTERNAL_SOURCES = ("MOMO 外部價格參考",)
# External sources that are paused (listed for transparency only).
PAUSED_EXTERNAL_SOURCES = ("蝦皮", "酷澎")
def _to_float(value: Any, default: float = 0.0) -> float:
if value is None:
return default
try:
return float(value)
except (TypeError, ValueError):
return default
def _quote_identifier(identifier: str) -> str:
return '"' + identifier.replace('"', '""') + '"'
def _first_available(columns: set[str], candidates: list[str]) -> str | None:
return next((col for col in candidates if col in columns), None)
def _load_json_tags(value: Any) -> list[str]:
if not value:
return []
if isinstance(value, list):
return value
try:
parsed = json.loads(value)
return parsed if isinstance(parsed, list) else []
except Exception:
return []
def _table_exists(engine, table_name: str) -> bool:
    """Report whether *table_name* exists, treating any probe error as "absent".

    Probe failures are logged with a traceback rather than raised, so callers
    can degrade gracefully.
    """
    try:
        found = inspect(engine).has_table(table_name)
    except Exception:
        logger.warning("[PChomeGrowth] table probe failed: %s", table_name, exc_info=True)
        found = False
    return found
def _daily_sales_columns(conn) -> dict[str, str | None]:
    """Resolve logical sales fields to the real column names of ``daily_sales_snapshot``.

    Header names vary between Chinese and English variants. Each logical field
    ("sku", "name", "date", "revenue", "qty", "price", "category", "vendor") is
    mapped to the first candidate header that exists. Fields with no match map
    to ``None``.
    """
    dialect = conn.dialect.name
    if dialect == "postgresql":
        # Scope to the current schema. Otherwise a same-named table in any other
        # schema would merge its columns into the result. This also matches the
        # default-schema probe that _table_exists performs via has_table().
        rows = conn.execute(text("""
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'daily_sales_snapshot'
AND table_schema = current_schema()
""")).fetchall()
        columns = {row[0] for row in rows}
    elif dialect == "sqlite":
        # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk).
        rows = conn.execute(text("PRAGMA table_info(daily_sales_snapshot)")).fetchall()
        columns = {row[1] for row in rows}
    else:
        # Generic fallback: read the column names from an empty result set.
        result = conn.execute(text("SELECT * FROM daily_sales_snapshot LIMIT 0"))
        columns = set(result.keys())
    return {
        "sku": _first_available(columns, ["商品ID", "Product ID", "ID", "Item Code"]),
        "name": _first_available(columns, ["商品名稱", "商品名", "品名", "Product Name", "Name"]),
        "date": _first_available(columns, ["snapshot_date", "日期", "訂單日期", "交易日期", "Date"]),
        "revenue": _first_available(columns, ["總業績", "銷售金額", "業績", "金額", "Amount", "Sales", "Total"]),
        "qty": _first_available(columns, ["數量", "銷售數量", "銷量", "Qty", "Quantity"]),
        "price": _first_available(columns, ["商品單位售價", "單價", "售價", "Price", "Unit Price"]),
        "category": _first_available(columns, ["商品館", "館別", "分類", "Category"]),
        "vendor": _first_available(columns, ["廠商名稱", "供應商", "Vendor"]),
    }
def _as_text_expr(identifier_or_expr: str, dialect_name: str, *, raw: bool = False) -> str:
expr = identifier_or_expr if raw else _quote_identifier(identifier_or_expr)
if dialect_name == "postgresql":
return f"{expr}::text"
return f"CAST({expr} AS TEXT)"
def _numeric_expr(identifier: str, dialect_name: str) -> str:
    """Build a SQL expression that reads a loosely formatted number column as a number.

    The column is cast to text first, so numeric, text and mixed-content columns
    are all accepted. NULL and unparseable values evaluate to 0.

    * PostgreSQL: every character other than digits, ``.`` and ``-`` is removed
      (thousands separators, currency marks, spaces). The remainder is cast to
      ``numeric`` only when it is a well-formed decimal. Placeholder cells such as
      ``"-"`` and garbage like ``"1.2.3"`` therefore become 0 instead of failing
      the whole query with an invalid-input error.
    * Other dialects (SQLite): only ``,`` is removed, and ``CAST(... AS REAL)``
      keeps the longest numeric prefix (0.0 when there is none). For example,
      ``"NT$100"`` reads as 0 here but as 100 on PostgreSQL.
    """
    quoted = _quote_identifier(identifier)
    if dialect_name == "postgresql":
        cleaned = f"regexp_replace({quoted}::text, '[^0-9.-]', '', 'g')"
        # Matches 12, -12, 12.5, 12., .5 and -.5, i.e. exactly what ::numeric accepts
        # once the strip above has run.
        well_formed = r"'^-?([0-9]+(\.[0-9]*)?|\.[0-9]+)$'"
        return f"COALESCE(CASE WHEN {cleaned} ~ {well_formed} THEN ({cleaned})::numeric END, 0)"
    return f"COALESCE(CAST(REPLACE(CAST({quoted} AS TEXT), ',', '') AS REAL), 0)"
def _fetch_sales_rows(conn, limit: int) -> tuple[list[dict[str, Any]], str | None]:
    """Aggregate per-product PChome sales for the latest 7 days and the 7 days before.

    Windows are anchored on the most recent non-NULL sale date in
    ``daily_sales_snapshot``, not on today:

    * "current" covers latest-6 .. latest;
    * "previous" covers latest-13 .. latest-7.

    Only products with sales in either window are returned. Rows are ordered
    by the larger of the two window totals, then by current-window quantity.

    Args:
        conn: SQLAlchemy connection. PostgreSQL gets native casts and
            intervals; every other dialect uses SQLite ``date()`` syntax.
        limit: Requested opportunity count. ``max(limit * 4, 80)`` candidates
            are fetched so the caller can re-rank before truncating.

    Returns:
        ``(rows, latest_sales_date)``. ``latest_sales_date`` is the window
        anchor as an ISO string, or ``None`` when no row qualified.

    Raises:
        RuntimeError: a required column (sku, name, date, revenue) could not be
            resolved. The message lists the missing logical fields.
    """
    cols = _daily_sales_columns(conn)
    missing = [key for key in ("sku", "name", "date", "revenue") if not cols.get(key)]
    if missing:
        # Separate the field names so the message reads "...: name, date", not "namedate".
        raise RuntimeError("PChome 業績檔缺少必要欄位:" + ", ".join(missing))
    dialect = conn.dialect.name
    sku_col = _quote_identifier(cols["sku"])
    name_col = _quote_identifier(cols["name"])
    date_col = _quote_identifier(cols["date"])
    revenue_expr = _numeric_expr(cols["revenue"], dialect)
    qty_expr = _numeric_expr(cols["qty"], dialect) if cols.get("qty") else "0"
    price_expr = _numeric_expr(cols["price"], dialect) if cols.get("price") else "0"
    category_text = _as_text_expr(cols["category"], dialect) if cols.get("category") else "NULL"
    vendor_text = _as_text_expr(cols["vendor"], dialect) if cols.get("vendor") else "NULL"
    sku_text = _as_text_expr(cols["sku"], dialect)
    name_text = _as_text_expr(cols["name"], dialect)
    # Over-fetch so re-ranking by priority_score has enough candidates to choose from.
    candidate_limit = max(limit * 4, 80)
    if dialect == "postgresql":
        sale_date_expr = f"NULLIF({date_col}::text, '')::date"
        curr_window = "lw.latest_date - INTERVAL '6 days'"
        prev_window_start = "lw.latest_date - INTERVAL '13 days'"
        prev_window_end = "lw.latest_date - INTERVAL '6 days'"
    else:
        sale_date_expr = f"date({date_col})"
        curr_window = "date(lw.latest_date, '-6 days')"
        prev_window_start = "date(lw.latest_date, '-13 days')"
        prev_window_end = "date(lw.latest_date, '-6 days')"
    # Rank by whichever window sold more, so both rising and declining products surface.
    order_metric = "CASE WHEN sales_7d >= sales_prev_7d THEN sales_7d ELSE sales_prev_7d END"
    rows = conn.execute(text(f"""
WITH sales_rows AS (
SELECT
NULLIF(TRIM({sku_text}), '') AS pchome_product_id,
NULLIF(TRIM({name_text}), '') AS product_name,
NULLIF(TRIM({_as_text_expr(category_text, dialect, raw=True)}), '') AS category,
NULLIF(TRIM({_as_text_expr(vendor_text, dialect, raw=True)}), '') AS vendor,
{sale_date_expr} AS sale_date,
{revenue_expr} AS revenue,
{qty_expr} AS qty,
{price_expr} AS unit_price
FROM daily_sales_snapshot
WHERE {sku_col} IS NOT NULL
),
latest_window AS (
SELECT MAX(sale_date) AS latest_date
FROM sales_rows
WHERE sale_date IS NOT NULL
),
sales AS (
SELECT
sr.pchome_product_id,
MAX(sr.product_name) AS product_name,
MAX(sr.category) AS category,
MAX(sr.vendor) AS vendor,
SUM(CASE WHEN sr.sale_date >= {curr_window}
THEN sr.revenue ELSE 0 END) AS sales_7d,
SUM(CASE WHEN sr.sale_date >= {prev_window_start}
AND sr.sale_date < {prev_window_end}
THEN sr.revenue ELSE 0 END) AS sales_prev_7d,
SUM(CASE WHEN sr.sale_date >= {curr_window}
THEN sr.qty ELSE 0 END) AS qty_7d,
CASE
WHEN SUM(CASE WHEN sr.sale_date >= {curr_window} THEN sr.qty ELSE 0 END) > 0
THEN SUM(CASE WHEN sr.sale_date >= {curr_window} THEN sr.revenue ELSE 0 END)
/ NULLIF(SUM(CASE WHEN sr.sale_date >= {curr_window} THEN sr.qty ELSE 0 END), 0)
ELSE NULLIF(MAX(CASE WHEN sr.sale_date >= {curr_window} THEN sr.unit_price ELSE 0 END), 0)
END AS pchome_price,
MAX(sr.sale_date) AS last_sale_date,
MAX(lw.latest_date) AS latest_sales_date
FROM sales_rows sr
CROSS JOIN latest_window lw
WHERE sr.pchome_product_id IS NOT NULL
GROUP BY sr.pchome_product_id
)
SELECT *
FROM sales
WHERE sales_7d > 0 OR sales_prev_7d > 0
ORDER BY {order_metric} DESC, qty_7d DESC
LIMIT :limit
"""), {"limit": candidate_limit}).mappings().all()
    mapped_rows = [dict(row) for row in rows]
    # Every row carries the same anchor date; take the first non-empty one.
    anchor = next(
        (row.get("latest_sales_date") for row in mapped_rows if row.get("latest_sales_date")),
        None,
    )
    latest_date = None
    if anchor:
        latest_date = anchor.isoformat() if hasattr(anchor, "isoformat") else str(anchor)
    return mapped_rows, latest_date
def _fetch_sales_summary(conn) -> dict[str, Any]:
    """Summarise PChome sales for the latest 7 days versus the 7 days before.

    Uses the same windows as ``_fetch_sales_rows``, anchored on the most recent
    non-NULL sale date:

    * "current" = latest-6 .. latest;
    * "previous" = latest-13 .. latest-7.

    Unlike ``_fetch_sales_rows`` there is no row limit, so the totals cover
    every product with a non-empty id.

    Returns:
        ``{}`` when the sku, date or revenue column cannot be resolved, or the
        query returns no row. Otherwise a dict with:

        * the overall latest sales date (ISO string, or ``""`` when unknown);
        * 7-day and previous-7-day revenue totals and their percentage change
          (``None`` when the previous total is 0);
        * 7-day quantity;
        * counts of active and declining products;
        * the top category by 7-day revenue with that category's total
          (``""`` / 0 when no category sold).
    """
    cols = _daily_sales_columns(conn)
    # The product name is not needed for totals, so only sku/date/revenue are required here.
    missing = [key for key in ["sku", "date", "revenue"] if not cols.get(key)]
    if missing:
        return {}
    dialect = conn.dialect.name
    sku_col = _quote_identifier(cols["sku"])
    date_col = _quote_identifier(cols["date"])
    revenue_expr = _numeric_expr(cols["revenue"], dialect)
    qty_expr = _numeric_expr(cols["qty"], dialect) if cols.get("qty") else "0"
    category_text = _as_text_expr(cols["category"], dialect) if cols.get("category") else "NULL"
    sku_text = _as_text_expr(cols["sku"], dialect)
    # Dialect-specific window bounds, identical to those in _fetch_sales_rows.
    if dialect == "postgresql":
        sale_date_expr = f"NULLIF({date_col}::text, '')::date"
        curr_window = "lw.latest_date - INTERVAL '6 days'"
        prev_window_start = "lw.latest_date - INTERVAL '13 days'"
        prev_window_end = "lw.latest_date - INTERVAL '6 days'"
    else:
        sale_date_expr = f"date({date_col})"
        curr_window = "date(lw.latest_date, '-6 days')"
        prev_window_start = "date(lw.latest_date, '-13 days')"
        prev_window_end = "date(lw.latest_date, '-6 days')"
    # per_product: window totals per product.
    # category_sales: 7-day totals per category (blank -> '未分類').
    # The outer SELECT aggregates without GROUP BY, so it yields a single row.
    row = conn.execute(text(f"""
WITH sales_rows AS (
SELECT
NULLIF(TRIM({sku_text}), '') AS pchome_product_id,
NULLIF(TRIM({_as_text_expr(category_text, dialect, raw=True)}), '') AS category,
{sale_date_expr} AS sale_date,
{revenue_expr} AS revenue,
{qty_expr} AS qty
FROM daily_sales_snapshot
WHERE {sku_col} IS NOT NULL
),
latest_window AS (
SELECT MAX(sale_date) AS latest_date
FROM sales_rows
WHERE sale_date IS NOT NULL
),
per_product AS (
SELECT
sr.pchome_product_id,
MAX(sr.category) AS category,
SUM(CASE WHEN sr.sale_date >= {curr_window}
THEN sr.revenue ELSE 0 END) AS sales_7d,
SUM(CASE WHEN sr.sale_date >= {prev_window_start}
AND sr.sale_date < {prev_window_end}
THEN sr.revenue ELSE 0 END) AS sales_prev_7d,
SUM(CASE WHEN sr.sale_date >= {curr_window}
THEN sr.qty ELSE 0 END) AS qty_7d,
MAX(lw.latest_date) AS latest_sales_date
FROM sales_rows sr
CROSS JOIN latest_window lw
WHERE sr.pchome_product_id IS NOT NULL
GROUP BY sr.pchome_product_id
),
category_sales AS (
SELECT
COALESCE(NULLIF(category, ''), '未分類') AS category,
SUM(sales_7d) AS sales_7d
FROM per_product
GROUP BY COALESCE(NULLIF(category, ''), '未分類')
)
SELECT
MAX(latest_sales_date) AS latest_sales_date,
COALESCE(SUM(sales_7d), 0) AS overall_sales_7d,
COALESCE(SUM(sales_prev_7d), 0) AS overall_sales_prev_7d,
COALESCE(SUM(qty_7d), 0) AS overall_qty_7d,
SUM(CASE WHEN sales_7d > 0 THEN 1 ELSE 0 END) AS active_product_count,
SUM(CASE WHEN sales_prev_7d > 0 AND sales_7d < sales_prev_7d THEN 1 ELSE 0 END) AS declining_product_count,
(
SELECT category
FROM category_sales
WHERE sales_7d > 0
ORDER BY sales_7d DESC
LIMIT 1
) AS top_category,
(
SELECT COALESCE(sales_7d, 0)
FROM category_sales
WHERE sales_7d > 0
ORDER BY sales_7d DESC
LIMIT 1
) AS top_category_sales_7d
FROM per_product
""")).mappings().first()
    if not row:
        return {}
    current = _to_float(row.get("overall_sales_7d"))
    previous = _to_float(row.get("overall_sales_prev_7d"))
    # Percentage change is undefined without a previous-window baseline.
    delta_pct = ((current - previous) / previous * 100) if previous else None
    latest_date = row.get("latest_sales_date")
    return {
        "overall_latest_sales_date": latest_date.isoformat() if hasattr(latest_date, "isoformat") else str(latest_date or ""),
        "overall_sales_7d": round(current, 2),
        "overall_sales_prev_7d": round(previous, 2),
        "overall_sales_delta_pct": round(delta_pct, 1) if delta_pct is not None else None,
        "overall_qty_7d": round(_to_float(row.get("overall_qty_7d")), 2),
        # COUNT-like SUMs are NULL when per_product is empty; coerce to 0.
        "active_product_count": int(row.get("active_product_count") or 0),
        "declining_product_count": int(row.get("declining_product_count") or 0),
        "top_category": row.get("top_category") or "",
        "top_category_sales_7d": round(_to_float(row.get("top_category_sales_7d")), 2),
    }
def _json_dict(value: Any) -> dict[str, Any]:
if not value:
return {}
if isinstance(value, dict):
return value
try:
parsed = json.loads(value)
return parsed if isinstance(parsed, dict) else {}
except Exception:
return {}
def _match_score_from_quality(value: Any) -> float:
    """Normalise a quality/match score to the closed range [0.0, 1.0].

    Values above 1 are treated as percentages (e.g. 82 -> 0.82). Unparseable
    input counts as 0. The result is always a float, so clamped values come
    back as 0.0 / 1.0 rather than the ints 0 / 1.
    """
    score = _to_float(value)
    if score > 1:
        score /= 100
    return max(0.0, min(1.0, score))
def _external_price_basis(raw_payload: dict[str, Any]) -> str:
price_basis = str(raw_payload.get("price_basis") or "").strip()
return price_basis if price_basis in {"total_price", "unit_price"} else "total_price"
def _external_price_basis_label(price_basis: str) -> str:
return "單位價" if price_basis == "unit_price" else "商品總價"
def _fetch_normalized_external_price_map(conn, pchome_product_ids: list[str]) -> dict[str, dict[str, Any]]:
    """Load the newest usable MOMO reference offer per PChome product from ``external_offers``.

    An offer is eligible when all of the following hold:

    * it is a ``momo_reference`` offer with a positive price;
    * its ``quality_score`` is at least 76;
    * its match status and data-quality status are in the accepted sets;
    * it has not expired.

    On both dialects the most recently observed eligible offer wins, with ties
    broken by the highest id.

    Returns:
        ``{pchome_product_id: payload}``, or ``{}`` when the table is missing or
        no usable ids were given. Each payload carries:

        * the MOMO offer;
        * the PChome public price and name from the raw payload;
        * the price basis and its label;
        * the optional unit-price comparison;
        * a 0-1 match score.
    """
    inspector = inspect(conn)
    if not inspector.has_table("external_offers"):
        return {}
    ids = [str(item).strip() for item in pchome_product_ids if str(item or "").strip()]
    if not ids:
        return {}
    if conn.dialect.name == "postgresql":
        sql = """
WITH latest_offer AS (
SELECT DISTINCT ON (eo.pchome_product_id)
eo.pchome_product_id,
eo.source_product_id AS momo_sku,
eo.title AS momo_name,
eo.price AS momo_price,
eo.quality_score,
eo.quality_notes_json,
eo.raw_payload_json,
eo.observed_at,
eo.ingestion_method
FROM external_offers eo
WHERE eo.source_code = 'momo_reference'
AND eo.pchome_product_id IS NOT NULL
AND eo.pchome_product_id IN :ids
AND eo.price IS NOT NULL
AND eo.price > 0
AND COALESCE(eo.quality_score, 0) >= 76
AND eo.match_status IN ('verified', 'usable', 'reviewed', 'exact', 'confirmed')
AND eo.data_quality_status IN ('verified', 'usable', 'reviewed')
AND (eo.expires_at IS NULL OR eo.expires_at > CURRENT_TIMESTAMP)
ORDER BY eo.pchome_product_id, eo.observed_at DESC NULLS LAST, eo.id DESC
)
SELECT * FROM latest_offer
"""
    else:
        # The PostgreSQL branch always excludes expired offers; apply the same rule
        # here, guarded by a column check in case a schema lacks expires_at.
        # NOTE(review): SQLite's CURRENT_TIMESTAMP is UTC text 'YYYY-MM-DD HH:MM:SS'
        # and the comparison is textual, so this assumes expires_at is stored in
        # that layout and timezone; confirm against the writer.
        offer_columns = {column["name"] for column in inspector.get_columns("external_offers")}
        expiry_filter = (
            "AND (eo.expires_at IS NULL OR eo.expires_at > CURRENT_TIMESTAMP)"
            if "expires_at" in offer_columns
            else ""
        )
        sql = f"""
WITH latest_offer AS (
SELECT
eo.pchome_product_id,
eo.source_product_id AS momo_sku,
eo.title AS momo_name,
eo.price AS momo_price,
eo.quality_score,
eo.quality_notes_json,
eo.raw_payload_json,
eo.observed_at,
eo.ingestion_method,
ROW_NUMBER() OVER (
PARTITION BY eo.pchome_product_id
ORDER BY eo.observed_at DESC, eo.id DESC
) AS rn
FROM external_offers eo
WHERE eo.source_code = 'momo_reference'
AND eo.pchome_product_id IS NOT NULL
AND eo.pchome_product_id IN :ids
AND eo.price IS NOT NULL
AND eo.price > 0
AND COALESCE(eo.quality_score, 0) >= 76
AND eo.match_status IN ('verified', 'usable', 'reviewed', 'exact', 'confirmed')
AND eo.data_quality_status IN ('verified', 'usable', 'reviewed')
{expiry_filter}
)
SELECT *
FROM latest_offer
WHERE rn = 1
"""
    # expanding=True renders ":ids" as one bound parameter per list element.
    stmt = text(sql).bindparams(bindparam("ids", expanding=True))
    rows = conn.execute(stmt, {"ids": ids}).mappings().all()
    result: dict[str, dict[str, Any]] = {}
    for row in rows:
        raw_payload = _json_dict(row.get("raw_payload_json"))
        price_basis = _external_price_basis(raw_payload)
        unit_price_comparison = _json_dict(raw_payload.get("unit_price_comparison"))
        key = str(row.get("pchome_product_id") or "").strip()
        if not key:
            continue
        result[key] = {
            "pchome_product_id": key,
            "pchome_public_name": raw_payload.get("pchome_public_name"),
            "momo_sku": row.get("momo_sku"),
            "momo_name": row.get("momo_name"),
            "momo_price": row.get("momo_price"),
            "pchome_price": raw_payload.get("pchome_public_price"),
            "price_basis": price_basis,
            "price_basis_label": _external_price_basis_label(price_basis),
            "unit_label": unit_price_comparison.get("unit_label"),
            "momo_unit_price": unit_price_comparison.get("momo_unit_price"),
            # In the unit-price comparison, "competitor" refers to the PChome side.
            "pchome_unit_price": unit_price_comparison.get("competitor_unit_price"),
            "unit_gap_pct": unit_price_comparison.get("unit_gap_pct"),
            "momo_total_quantity": unit_price_comparison.get("momo_total_quantity"),
            "pchome_total_quantity": unit_price_comparison.get("competitor_total_quantity"),
            "match_score": _match_score_from_quality(row.get("quality_score")),
            "tags": raw_payload.get("tags") or ["external_offers", "verified"],
            "crawled_at": row.get("observed_at"),
            "momo_price_at": row.get("observed_at"),
            "data_source": "external_offers",
            "ingestion_method": row.get("ingestion_method"),
        }
    return result
def _fetch_legacy_external_price_map(conn, pchome_product_ids: list[str]) -> dict[str, dict[str, Any]]:
    """Load verified PChome/MOMO pairs from the legacy ``competitor_prices`` cache.

    A ``competitor_prices`` row is eligible when all of the following hold:

    * it comes from source ``'pchome'`` with a positive price;
    * its ``match_score`` is at least 0.76;
    * it is tagged ``identity_v2``;
    * it has not expired.

    The newest row per PChome product (by ``crawled_at``) is kept and
    left-joined to the latest MOMO price record of the active MOMO product
    with the same ``i_code``.

    Returns:
        ``{pchome_product_id: row_dict}``, with ``data_source`` set to
        ``"competitor_prices"``. Returns ``{}`` when any of the three tables is
        missing or no usable ids were given.
    """
    inspector = inspect(conn)
    if not all(inspector.has_table(table) for table in {"competitor_prices", "products", "price_records"}):
        return {}
    ids = [str(item).strip() for item in pchome_product_ids if str(item or "").strip()]
    if not ids:
        return {}
    if conn.dialect.name == "postgresql":
        sql = """
WITH valid_cp AS (
SELECT DISTINCT ON (cp.competitor_product_id)
cp.competitor_product_id AS pchome_product_id,
cp.competitor_product_name AS pchome_public_name,
cp.sku AS momo_sku,
cp.price AS pchome_price,
cp.match_score,
cp.tags,
cp.crawled_at
FROM competitor_prices cp
WHERE cp.source = 'pchome'
AND cp.competitor_product_id IS NOT NULL
AND cp.price IS NOT NULL
AND cp.price > 0
AND COALESCE(cp.match_score, 0) >= 0.76
AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
AND cp.competitor_product_id IN :ids
ORDER BY cp.competitor_product_id, cp.crawled_at DESC NULLS LAST
)
SELECT vc.*, lm.momo_name, lm.momo_price, lm.momo_price_at
FROM valid_cp vc
LEFT JOIN LATERAL (
SELECT
p.name AS momo_name,
pr.price AS momo_price,
pr.timestamp AS momo_price_at
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.i_code = vc.momo_sku
AND p.status = 'ACTIVE'
ORDER BY pr.timestamp DESC, pr.id DESC
LIMIT 1
) lm ON TRUE
"""
    else:
        # The PostgreSQL branch always excludes expired rows; apply the same rule
        # here, guarded by a column check in case a schema lacks expires_at.
        # NOTE(review): SQLite's CURRENT_TIMESTAMP is UTC text 'YYYY-MM-DD HH:MM:SS'
        # and the comparison is textual; confirm expires_at uses that layout/timezone.
        cp_columns = {column["name"] for column in inspector.get_columns("competitor_prices")}
        expiry_filter = (
            "AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)"
            if "expires_at" in cp_columns
            else ""
        )
        # SQLite has no jsonb "?" operator, so the identity tag is matched textually.
        sql = f"""
WITH latest_cp AS (
SELECT
cp.competitor_product_id AS pchome_product_id,
cp.competitor_product_name AS pchome_public_name,
cp.sku AS momo_sku,
cp.price AS pchome_price,
cp.match_score,
cp.tags,
cp.crawled_at,
ROW_NUMBER() OVER (
PARTITION BY cp.competitor_product_id
ORDER BY cp.crawled_at DESC
) AS rn
FROM competitor_prices cp
WHERE cp.source = 'pchome'
AND cp.competitor_product_id IS NOT NULL
AND cp.price IS NOT NULL
AND cp.price > 0
AND COALESCE(cp.match_score, 0) >= 0.76
AND COALESCE(cp.tags, '') LIKE '%identity_v2%'
AND cp.competitor_product_id IN :ids
{expiry_filter}
),
latest_momo AS (
SELECT
p.i_code AS momo_sku,
p.name AS momo_name,
pr.price AS momo_price,
pr.timestamp AS momo_price_at,
ROW_NUMBER() OVER (
PARTITION BY p.i_code
ORDER BY pr.timestamp DESC, pr.id DESC
) AS rn
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.status = 'ACTIVE'
AND p.i_code IN (SELECT momo_sku FROM latest_cp)
)
SELECT cp.pchome_product_id, cp.pchome_public_name, cp.momo_sku,
cp.pchome_price, cp.match_score, cp.tags, cp.crawled_at,
lm.momo_name, lm.momo_price, lm.momo_price_at
FROM latest_cp cp
LEFT JOIN latest_momo lm ON lm.momo_sku = cp.momo_sku AND lm.rn = 1
WHERE cp.rn = 1
"""
    stmt = text(sql).bindparams(bindparam("ids", expanding=True))
    rows = conn.execute(stmt, {"ids": ids}).mappings().all()
    result: dict[str, dict[str, Any]] = {}
    for row in rows:
        key = str(row.get("pchome_product_id") or "").strip()
        if key:
            item = dict(row)
            item["data_source"] = "competitor_prices"
            result[key] = item
    return result
def _fetch_external_price_map(conn, pchome_product_ids: list[str]) -> dict[str, dict[str, Any]]:
    """Return confirmed MOMO price data keyed by PChome product id.

    The normalized ``external_offers`` layer is queried first. The legacy
    ``competitor_prices`` cache is consulted only for ids the normalized layer
    did not cover, and normalized entries win on any key collision.
    """
    normalized_map = _fetch_normalized_external_price_map(conn, pchome_product_ids)
    uncovered_ids: list[str] = []
    for raw_id in pchome_product_ids:
        cleaned = str(raw_id or "").strip()
        if cleaned and cleaned not in normalized_map:
            uncovered_ids.append(cleaned)
    merged = dict(_fetch_legacy_external_price_map(conn, uncovered_ids))
    merged.update(normalized_map)
    return merged
def _fetch_review_candidate_map(conn, pchome_product_ids: list[str]) -> dict[str, dict[str, Any]]:
    """Load the newest MOMO candidate awaiting manual review for each PChome product.

    Only ``momo_reference`` offers ingested via ``targeted_momo_review`` whose
    match status or data-quality status is ``needs_review`` are considered. Per
    product, the most recently observed offer wins, with ties broken by the
    highest id.

    Returns:
        ``{pchome_product_id: candidate}``, or ``{}`` when the table is missing
        or no usable ids were given. At most five match reasons are kept per
        candidate.
    """
    if not inspect(conn).has_table("external_offers"):
        return {}
    wanted_ids: list[str] = []
    for raw_id in pchome_product_ids:
        cleaned = str(raw_id or "").strip()
        if cleaned:
            wanted_ids.append(cleaned)
    if not wanted_ids:
        return {}
    if conn.dialect.name == "postgresql":
        query = """
WITH latest_review AS (
SELECT DISTINCT ON (eo.pchome_product_id)
eo.id,
eo.pchome_product_id,
eo.source_product_id AS momo_sku,
eo.title AS momo_name,
eo.product_url,
eo.price AS momo_price,
eo.quality_score,
eo.raw_payload_json,
eo.observed_at
FROM external_offers eo
WHERE eo.source_code = 'momo_reference'
AND eo.ingestion_method = 'targeted_momo_review'
AND eo.pchome_product_id IS NOT NULL
AND eo.pchome_product_id IN :ids
AND (
eo.match_status = 'needs_review'
OR eo.data_quality_status = 'needs_review'
)
ORDER BY eo.pchome_product_id, eo.observed_at DESC NULLS LAST, eo.id DESC
)
SELECT * FROM latest_review
"""
    else:
        query = """
WITH latest_review AS (
SELECT
eo.id,
eo.pchome_product_id,
eo.source_product_id AS momo_sku,
eo.title AS momo_name,
eo.product_url,
eo.price AS momo_price,
eo.quality_score,
eo.raw_payload_json,
eo.observed_at,
ROW_NUMBER() OVER (
PARTITION BY eo.pchome_product_id
ORDER BY eo.observed_at DESC, eo.id DESC
) AS rn
FROM external_offers eo
WHERE eo.source_code = 'momo_reference'
AND eo.ingestion_method = 'targeted_momo_review'
AND eo.pchome_product_id IS NOT NULL
AND eo.pchome_product_id IN :ids
AND (
eo.match_status = 'needs_review'
OR eo.data_quality_status = 'needs_review'
)
)
SELECT *
FROM latest_review
WHERE rn = 1
"""
    statement = text(query).bindparams(bindparam("ids", expanding=True))
    fetched = conn.execute(statement, {"ids": wanted_ids}).mappings().all()
    candidates: dict[str, dict[str, Any]] = {}
    for record in fetched:
        product_id = str(record.get("pchome_product_id") or "").strip()
        if not product_id:
            continue
        payload = _json_dict(record.get("raw_payload_json"))
        raw_reasons = payload.get("match_reasons")
        reasons = raw_reasons if isinstance(raw_reasons, list) else []
        quality = record.get("quality_score")
        candidates[product_id] = {
            "id": record.get("id"),
            "pchome_product_id": product_id,
            "pchome_product_name": payload.get("pchome_public_name"),
            "pchome_price": payload.get("pchome_public_price"),
            "momo_sku": record.get("momo_sku"),
            "momo_name": record.get("momo_name"),
            "momo_price": record.get("momo_price"),
            "product_url": record.get("product_url"),
            "quality_score": round(_to_float(quality), 2),
            "match_score": _match_score_from_quality(quality),
            "match_reasons": [str(reason) for reason in reasons[:5]],
            "gap_pct": payload.get("target_gap_pct"),
            "observed_at": str(record.get("observed_at") or ""),
        }
    return candidates
def _score_opportunity(
    sales_row: dict[str, Any],
    external_row: dict[str, Any] | None,
    review_candidate: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Turn one aggregated sales row into a ranked, explained growth opportunity.

    Args:
        sales_row: A row from ``_fetch_sales_rows``. It provides sales_7d,
            sales_prev_7d, qty_7d and pchome_price, plus product id, name,
            category, vendor and last sale date.
        external_row: Confirmed MOMO price data for the product, or ``None``
            when the product is unmapped.
        review_candidate: An unconfirmed MOMO candidate. Only consulted when
            ``external_row`` is missing.

    Returns:
        A dict containing:

        * rounded sales figures;
        * the external-price payload or the review-candidate payload;
        * a ``priority_score`` capped at 100;
        * the recommended action (code/label/message);
        * up to four reason lines;
        * a data-quality label, score (clamped to 0-100) and issue list.
    """
    sales_7d = _to_float(sales_row.get("sales_7d"))
    sales_prev_7d = _to_float(sales_row.get("sales_prev_7d"))
    qty_7d = _to_float(sales_row.get("qty_7d"))
    # Week-over-week change; None when there is no previous-window baseline.
    sales_delta_pct = None
    if sales_prev_7d > 0:
        sales_delta_pct = (sales_7d - sales_prev_7d) / sales_prev_7d * 100
    # Score components, each linear up to a saturation point:
    # - volume: max 34 pts at NT$80,000 in the better of the two windows;
    # - quantity: max 10 pts at 25 units;
    # - decline: max 24 pts at a 45% drop (only when sales fell).
    volume_score = min(34, max(sales_7d, sales_prev_7d) / 80000 * 34)
    qty_score = min(10, qty_7d / 25 * 10)
    decline_score = min(24, abs(sales_delta_pct) / 45 * 24) if sales_delta_pct is not None and sales_delta_pct < 0 else 0
    # Data-quality baseline.
    # Final values: mapped -> 78..90, review candidate -> 62, nothing -> 42.
    data_quality_score = 54
    external_payload = None
    review_candidate_payload = None
    # Default action for products without a confirmed MOMO mapping.
    action_code = "map_external_product"
    action_label = "先補商品對應"
    action_message = "這項商品已有業績訊號,但還沒有可確認的 MOMO 對照商品。先補對應,後續才能判斷價格壓力。"
    reason_lines = []
    if external_row:
        price_basis = str(external_row.get("price_basis") or "total_price")
        price_basis_label = external_row.get("price_basis_label") or _external_price_basis_label(price_basis)
        pchome_price = _to_float(external_row.get("pchome_price"))
        momo_price = _to_float(external_row.get("momo_price"))
        pchome_compare_price = pchome_price
        momo_compare_price = momo_price
        if price_basis == "unit_price":
            pchome_unit_price = _to_float(external_row.get("pchome_unit_price"))
            momo_unit_price = _to_float(external_row.get("momo_unit_price"))
            pchome_compare_price = pchome_unit_price
            momo_compare_price = momo_unit_price
            # Prefer the stored unit gap. Derive it from the unit prices only when
            # it is missing or unparseable.
            # NOTE(review): assumes unit_gap_pct uses the same sign as the derived
            # value (positive = MOMO more expensive); confirm on the ingestion side.
            gap_pct = _to_float(external_row.get("unit_gap_pct"), default=None)
            if gap_pct is None and pchome_compare_price and momo_compare_price:
                gap_pct = (momo_compare_price - pchome_compare_price) / pchome_compare_price * 100
        else:
            pchome_unit_price = None
            momo_unit_price = None
            # Positive gap = MOMO priced above PChome (PChome has the price advantage).
            gap_pct = ((momo_price - pchome_price) / pchome_price * 100) if pchome_price else None
        tags = _load_json_tags(external_row.get("tags"))
        # Mapped products: 78 plus up to 12 points scaled by the match score.
        data_quality_score = 78 + min(12, _to_float(external_row.get("match_score")) * 12)
        external_payload = {
            "source": "MOMO",
            "data_source": external_row.get("data_source") or "competitor_prices",
            "data_source_label": "自動同步資料層"
            if external_row.get("data_source") == "external_offers"
            else "舊比價快取",
            "momo_sku": external_row.get("momo_sku"),
            "momo_name": external_row.get("momo_name"),
            "momo_price": round(momo_price, 2) if momo_price else None,
            "pchome_price": round(pchome_price, 2) if pchome_price else None,
            "price_basis": price_basis,
            "price_basis_label": price_basis_label,
            "unit_label": external_row.get("unit_label") or "",
            "momo_unit_price": round(momo_unit_price, 4) if momo_unit_price else None,
            "pchome_unit_price": round(pchome_unit_price, 4) if pchome_unit_price else None,
            "momo_total_quantity": external_row.get("momo_total_quantity"),
            "pchome_total_quantity": external_row.get("pchome_total_quantity"),
            "gap_pct": round(gap_pct, 1) if gap_pct is not None else None,
            "match_score": round(_to_float(external_row.get("match_score")), 3),
            "tags": tags,
            "updated_at": str(external_row.get("crawled_at") or ""),
        }
        # Action selection, in order:
        # 1. a price gap beyond +-5% drives a pricing decision;
        # 2. otherwise a drop of more than 10% in sales triggers momentum recovery;
        # 3. otherwise keep monitoring.
        if gap_pct is not None and gap_pct < -5:
            action_code = "review_price_or_promo"
            action_label = "檢查售價與活動"
            action_message = "MOMO 外部參考價比較低,建議檢查 PChome 售價、活動組合或曝光策略。"
        elif gap_pct is not None and gap_pct > 5:
            action_code = "amplify_price_advantage"
            action_label = "放大價格優勢"
            action_message = "PChome 目前有價格優勢,適合檢查曝光、文案與活動位置。"
        elif sales_delta_pct is not None and sales_delta_pct < -10:
            action_code = "recover_sales_momentum"
            action_label = "找回銷售動能"
            action_message = "價格差距不大,但近 7 天業績轉弱,建議檢查曝光、庫存與商品頁內容。"
        else:
            action_code = "monitor"
            action_label = "持續觀察"
            action_message = "業績與外部價格暫無明顯異常,先保留在觀察清單。"
        if gap_pct is not None:
            basis_prefix = "單位價" if price_basis == "unit_price" else "價格"
            if gap_pct > 0:
                reason_lines.append(f"PChome {basis_prefix}目前比 MOMO 低約 {abs(gap_pct):.1f}%。")
            elif gap_pct < 0:
                reason_lines.append(f"MOMO {basis_prefix}目前比 PChome 低約 {abs(gap_pct):.1f}%。")
            else:
                reason_lines.append(f"PChome 與 MOMO {basis_prefix}幾乎相同。")
    else:
        if review_candidate:
            review_candidate_payload = {
                "id": review_candidate.get("id"),
                "momo_sku": review_candidate.get("momo_sku"),
                "momo_name": review_candidate.get("momo_name"),
                "momo_price": review_candidate.get("momo_price"),
                "pchome_price": review_candidate.get("pchome_price"),
                "quality_score": review_candidate.get("quality_score"),
                "gap_pct": review_candidate.get("gap_pct"),
                "match_reasons": review_candidate.get("match_reasons") or [],
                "product_url": review_candidate.get("product_url"),
            }
            data_quality_score = max(data_quality_score, 62)
            action_code = "review_external_candidate"
            action_label = "確認候選"
            action_message = "已找到 MOMO 候選,但還要確認同款、色號或組合後才能進價格判斷。"
            reason_lines.append("已找到 MOMO 候選,先確認同款、色號或組合。")
        else:
            # No external data at all: penalise data quality (54 -> 42).
            data_quality_score -= 12
            reason_lines.append("尚未找到可確認的 MOMO 對照商品。")
    # Sales-trend reason line, added for every product.
    if sales_delta_pct is None:
        reason_lines.append("前 7 天沒有可比基準,先看近 7 天表現。")
    elif sales_delta_pct < 0:
        reason_lines.append(f"近 7 天業績比前 7 天少約 {abs(sales_delta_pct):.1f}%。")
    elif sales_delta_pct > 0:
        reason_lines.append(f"近 7 天業績比前 7 天多約 {sales_delta_pct:.1f}%。")
    else:
        reason_lines.append("近 7 天業績與前 7 天大致持平。")
    if sales_7d > 0:
        # NOTE(review): unlike the other reason lines, this one has no trailing "。".
        reason_lines.append(f"近 7 天業績約 NT$ {sales_7d:,.0f},銷量 {qty_7d:,.0f}")
    # Unmapped products with any sales get +18 so mapping work rises in the list.
    mapping_gap_score = 18 if not external_row and max(sales_7d, sales_prev_7d) > 0 else 0
    priority_score = min(100, volume_score + qty_score + decline_score + mapping_gap_score + data_quality_score * 0.18)
    # Price-gap bonus, based on the rounded gap stored in the payload:
    # - up to +18 when MOMO is more than 5% cheaper (full bonus at 20%);
    # - up to +10 when PChome is more than 5% cheaper (full bonus at 25%).
    if external_payload and external_payload.get("gap_pct") is not None:
        gap = float(external_payload["gap_pct"])
        if gap < -5:
            priority_score = min(100, priority_score + min(18, abs(gap) / 20 * 18))
        elif gap > 5:
            priority_score = min(100, priority_score + min(10, gap / 25 * 10))
    issues = []
    if not external_row:
        issues.append("需要確認 MOMO 候選" if review_candidate_payload else "需要補商品對應")
    if sales_delta_pct is None:
        issues.append("前期業績不足")
    return {
        "pchome_product_id": str(sales_row.get("pchome_product_id") or ""),
        "product_name": sales_row.get("product_name") or "未命名商品",
        "category": sales_row.get("category") or "",
        "vendor": sales_row.get("vendor") or "",
        "sales_7d": round(sales_7d, 2),
        "sales_prev_7d": round(sales_prev_7d, 2),
        "sales_delta_pct": round(sales_delta_pct, 1) if sales_delta_pct is not None else None,
        "qty_7d": round(qty_7d, 2),
        # PChome price derived from sales (revenue / qty, or max unit price); None if not positive.
        "pchome_price": round(_to_float(sales_row.get("pchome_price")), 2)
        if _to_float(sales_row.get("pchome_price")) > 0
        else None,
        "last_sale_date": str(sales_row.get("last_sale_date") or ""),
        "external_price": external_payload,
        "review_candidate": review_candidate_payload,
        "priority_score": round(priority_score, 1),
        "recommended_action": {
            "code": action_code,
            "label": action_label,
            "message": action_message,
        },
        "reason_lines": reason_lines[:4],
        "data_quality": {
            # Label precedence: unit-price mapping > mapping > review candidate > nothing.
            "label": (
                "資料可用單位價判斷"
                if external_payload and external_payload.get("price_basis") == "unit_price"
                else "資料可直接判斷"
                if external_row
                else "候選待確認"
                if review_candidate_payload
                else "需要補資料"
            ),
            "score": round(max(0, min(100, data_quality_score)), 1),
            "issues": issues,
        },
    }
def _clamp_growth_limit(limit: Any) -> int:
    """Parse the requested opportunity count and clamp it to 5..50 (unparseable/falsy -> 20)."""
    try:
        requested = int(limit or 20)
    except (TypeError, ValueError, OverflowError):
        requested = 20
    return max(5, min(requested, 50))


def _empty_growth_stats(latest_sales_date: Any, review_candidate_count: int) -> dict[str, Any]:
    """Return the zero-valued stats skeleton shared by every response shape."""
    return {
        "latest_sales_date": latest_sales_date,
        "candidate_count": 0,
        "mapped_count": 0,
        "mapping_rate": 0,
        "needs_mapping_count": 0,
        "review_candidate_count": review_candidate_count,
        "total_sales_7d": 0,
        "opportunity_sales_7d": 0,
        "action_counts": {},
        "action_code_counts": {},
        "external_data_source_counts": {},
    }


def build_pchome_growth_opportunities(engine, limit: int = 20) -> dict[str, Any]:
    """Build the read-only PChome growth action list from sales and verified external prices.

    Args:
        engine: SQLAlchemy engine holding the sales snapshot and external-offer tables.
        limit: Requested number of opportunities. Non-numeric or falsy input
            falls back to 20; the result is clamped to 5..50.

    Returns:
        A dict with the keys ``success``, ``system_name``, ``generated_at``,
        ``source_scope`` (including external source readiness), ``stats``,
        ``opportunities`` (ranked by ``priority_score``) and a user-facing
        ``message``. ``stats`` always carries the same counter keys, including
        when the sales table is missing or required sales columns cannot be
        resolved.
    """
    limit = _clamp_growth_limit(limit)
    generated_at = datetime.now().isoformat(timespec="seconds")
    source_readiness = build_external_source_readiness(engine)
    review_candidate_count = int(source_readiness.get("review_offer_count") or 0)
    source_scope = {
        "primary_goal": "提升 PChome 業績",
        "primary_sales_source": PRIMARY_SALES_SOURCE,
        "active_external_sources": list(ACTIVE_EXTERNAL_SOURCES),
        "paused_external_sources": list(PAUSED_EXTERNAL_SOURCES),
        "plain_note": "蝦皮與酷澎先暫停,不進作戰清單,也不發告警。",
        "source_readiness": source_readiness,
    }

    def respond(stats: dict[str, Any], opportunities: list[dict[str, Any]], message: str) -> dict[str, Any]:
        # Single place for the envelope shared by all three response shapes.
        return {
            "success": True,
            "system_name": SYSTEM_DISPLAY_NAME,
            "generated_at": generated_at,
            "source_scope": source_scope,
            "stats": stats,
            "opportunities": opportunities,
            "message": message,
        }

    if not _table_exists(engine, "daily_sales_snapshot"):
        return respond(
            _empty_growth_stats(None, review_candidate_count),
            [],
            "目前還沒有 PChome 業績資料,請先完成業績匯入。",
        )

    with engine.connect() as conn:
        sales_summary = _fetch_sales_summary(conn)
        try:
            sales_rows, latest_sales_date = _fetch_sales_rows(conn, limit=limit)
        except RuntimeError as exc:
            # Required sales columns are missing: still report whatever summary exists.
            stats = _empty_growth_stats(sales_summary.get("overall_latest_sales_date"), review_candidate_count)
            stats.update(sales_summary)
            return respond(stats, [], str(exc))
        sales_ids = [str(row.get("pchome_product_id") or "") for row in sales_rows]
        external_map = _fetch_external_price_map(conn, sales_ids)
        review_candidate_map = _fetch_review_candidate_map(conn, sales_ids)

    # Scoring is pure Python, so the connection is released before it runs.
    opportunities = []
    for row in sales_rows:
        key = str(row.get("pchome_product_id") or "").strip()
        opportunities.append(_score_opportunity(row, external_map.get(key), review_candidate_map.get(key)))
    # Stable sort: ties keep the SQL ranking (larger window sales first).
    opportunities.sort(key=lambda item: item["priority_score"], reverse=True)
    opportunities = opportunities[:limit]

    needs_mapping_count = sum(1 for item in opportunities if not item.get("external_price"))
    mapped_count = len(opportunities) - needs_mapping_count
    mapping_rate = round(mapped_count / max(len(opportunities), 1) * 100, 1)
    action_counts: dict[str, int] = {}
    action_code_counts: dict[str, int] = {}
    external_data_source_counts: dict[str, int] = {}
    for item in opportunities:
        label = item["recommended_action"]["label"]
        code = item["recommended_action"]["code"]
        action_counts[label] = action_counts.get(label, 0) + 1
        action_code_counts[code] = action_code_counts.get(code, 0) + 1
        data_source_label = (item.get("external_price") or {}).get("data_source_label")
        if data_source_label:
            external_data_source_counts[data_source_label] = external_data_source_counts.get(data_source_label, 0) + 1
    opportunity_sales_7d = round(sum(_to_float(item.get("sales_7d")) for item in opportunities), 2)

    # Updating the skeleton keeps the historical key order; sales_summary is merged last.
    stats = _empty_growth_stats(
        latest_sales_date or sales_summary.get("overall_latest_sales_date"),
        review_candidate_count,
    )
    stats.update(
        {
            "candidate_count": len(opportunities),
            "mapped_count": mapped_count,
            "mapping_rate": mapping_rate,
            "needs_mapping_count": needs_mapping_count,
            # Kept as an alias of opportunity_sales_7d for existing consumers.
            "total_sales_7d": opportunity_sales_7d,
            "opportunity_sales_7d": opportunity_sales_7d,
            "action_counts": action_counts,
            "action_code_counts": action_code_counts,
            "external_data_source_counts": external_data_source_counts,
        }
    )
    stats.update(sales_summary)
    return respond(stats, opportunities, "已整理今日 PChome 業績成長作戰清單。")