Files
ewoooc/services/pchome_revenue_growth_service.py
OoO 9260cc1740
All checks were successful
CD Pipeline / deploy (push) Successful in 1m4s
V10.607 建立外部市場來源正規化層
2026-06-15 16:19:03 +08:00

473 lines
20 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""PChome 業績成長自動化作戰系統的只讀作戰清單。"""
from __future__ import annotations

import json
import logging
import math
from datetime import datetime
from typing import Any

from sqlalchemy import bindparam, inspect, text

from services.external_market_offer_service import build_external_source_readiness
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Display name returned as "system_name" in the response payload.
SYSTEM_DISPLAY_NAME = "PChome 業績成長自動化作戰系統"
# Label of the primary sales source, exposed under "source_scope".
PRIMARY_SALES_SOURCE = "PChome 後台業績"
# External sources listed as active under "source_scope".
ACTIVE_EXTERNAL_SOURCES = ("MOMO 外部價格參考",)
# External sources listed as paused under "source_scope".
PAUSED_EXTERNAL_SOURCES = ("蝦皮", "酷澎")
def _to_float(value: Any, default: float = 0.0) -> float:
if value is None:
return default
try:
return float(value)
except (TypeError, ValueError):
return default
def _quote_identifier(identifier: str) -> str:
return '"' + identifier.replace('"', '""') + '"'
def _first_available(columns: set[str], candidates: list[str]) -> str | None:
return next((col for col in candidates if col in columns), None)
def _load_json_tags(value: Any) -> list[str]:
if not value:
return []
if isinstance(value, list):
return value
try:
parsed = json.loads(value)
return parsed if isinstance(parsed, list) else []
except Exception:
return []
def _table_exists(engine, table_name: str) -> bool:
    """Report whether *table_name* exists, treating a failed probe as absent.

    Any exception raised while inspecting is logged as a warning (with
    traceback) and reported as ``False`` instead of propagating.
    """
    try:
        exists = inspect(engine).has_table(table_name)
    except Exception:
        logger.warning("[PChomeGrowth] table probe failed: %s", table_name, exc_info=True)
        exists = False
    return exists
def _daily_sales_columns(conn) -> dict[str, str | None]:
    """Map logical sales fields to actual ``daily_sales_snapshot`` column names.

    Column names are discovered per dialect: ``information_schema`` on
    PostgreSQL, ``PRAGMA table_info`` on SQLite, and the keys of an empty
    ``SELECT *`` result on anything else.

    Args:
        conn: Open SQLAlchemy connection.

    Returns:
        A dict with the keys ``sku``, ``name``, ``date``, ``revenue``,
        ``qty``, ``category`` and ``vendor``. Each value is the first
        matching candidate column name, or ``None`` when no candidate
        exists in the table.
    """
    dialect_name = conn.dialect.name
    if dialect_name == "postgresql":
        # information_schema.columns lists tables from every schema, so
        # restrict the lookup to the schemas on the effective search path --
        # the same ones the unqualified "FROM daily_sales_snapshot" resolves
        # against. Explicit text casts keep the comparison type-safe.
        rows = conn.execute(text("""
            SELECT column_name
            FROM information_schema.columns
            WHERE table_name = 'daily_sales_snapshot'
              AND table_schema::text = ANY (current_schemas(true)::text[])
        """)).fetchall()
        columns = {row[0] for row in rows}
    elif dialect_name == "sqlite":
        rows = conn.execute(text("PRAGMA table_info(daily_sales_snapshot)")).fetchall()
        # PRAGMA table_info rows carry the column name at index 1.
        columns = {row[1] for row in rows}
    else:
        result = conn.execute(text("SELECT * FROM daily_sales_snapshot LIMIT 0"))
        columns = set(result.keys())

    # Candidate names per logical field, in priority order.
    candidates: dict[str, list[str]] = {
        "sku": ["商品ID", "Product ID", "ID", "Item Code"],
        "name": ["商品名稱", "商品名", "品名", "Product Name", "Name"],
        "date": ["snapshot_date", "日期", "訂單日期", "交易日期", "Date"],
        "revenue": ["總業績", "銷售金額", "業績", "金額", "Amount", "Sales", "Total"],
        "qty": ["數量", "銷售數量", "銷量", "Qty", "Quantity"],
        "category": ["商品館", "館別", "分類", "Category"],
        "vendor": ["廠商名稱", "供應商", "Vendor"],
    }
    return {
        field: _first_available(columns, names)
        for field, names in candidates.items()
    }
def _as_text_expr(identifier_or_expr: str, dialect_name: str, *, raw: bool = False) -> str:
expr = identifier_or_expr if raw else _quote_identifier(identifier_or_expr)
if dialect_name == "postgresql":
return f"{expr}::text"
return f"CAST({expr} AS TEXT)"
def _numeric_expr(identifier: str, dialect_name: str) -> str:
    """Build a SQL fragment that reads column *identifier* as a number.

    PostgreSQL: strips every character other than digits, ``.`` and ``-``
    from the text form, then casts to ``numeric``. Other dialects: removes
    commas and casts to ``REAL``. In both cases NULL becomes 0.
    """
    column = _quote_identifier(identifier)
    if dialect_name != "postgresql":
        return f"COALESCE(CAST(REPLACE(CAST({column} AS TEXT), ',', '') AS REAL), 0)"
    return f"COALESCE(NULLIF(regexp_replace({column}::text, '[^0-9.-]', '', 'g'), '')::numeric, 0)"
def _fetch_sales_rows(conn, limit: int) -> tuple[list[dict[str, Any]], str | None]:
    """Aggregate per-product sales for the latest 7 days and the 7 days before.

    The windows are anchored on the most recent parsable date found in
    ``daily_sales_snapshot`` (not on today's date):

    * current window:  ``latest - 6 days`` .. ``latest`` (inclusive)
    * previous window: ``latest - 13 days`` .. ``latest - 7 days`` (inclusive)

    Args:
        conn: Open SQLAlchemy connection.
        limit: Requested list size. The query fetches ``max(limit * 4, 80)``
            candidate rows so the caller can re-rank before truncating.

    Returns:
        ``(rows, latest_sales_date)``. Each row is a dict with
        ``pchome_product_id``, ``product_name``, ``category``, ``vendor``,
        ``sales_7d``, ``sales_prev_7d``, ``qty_7d``, ``last_sale_date`` and
        ``latest_sales_date``. ``latest_sales_date`` is an ISO string when
        the value supports ``isoformat()``, otherwise ``str(value)``, and
        ``None`` when no row carries a date.

    Raises:
        RuntimeError: If the table lacks a usable sku, name, date or
            revenue column.
    """
    cols = _daily_sales_columns(conn)
    missing = [key for key in ("sku", "name", "date", "revenue") if not cols.get(key)]
    if missing:
        # Separate the keys so the message stays readable ("sku, date").
        raise RuntimeError("PChome 業績檔缺少必要欄位:" + ", ".join(missing))

    dialect = conn.dialect.name

    def optional_text(key: str) -> str:
        """Text-cast the optional column *key*, or a typed NULL if absent."""
        column = cols.get(key)
        if column:
            return _as_text_expr(column, dialect)
        return _as_text_expr("NULL", dialect, raw=True)

    sku_col = _quote_identifier(cols["sku"])
    date_col = _quote_identifier(cols["date"])
    sku_text = _as_text_expr(cols["sku"], dialect)
    name_text = _as_text_expr(cols["name"], dialect)
    revenue_expr = _numeric_expr(cols["revenue"], dialect)
    qty_expr = _numeric_expr(cols["qty"], dialect) if cols.get("qty") else "0"
    category_text = optional_text("category")
    vendor_text = optional_text("vendor")

    # Over-fetch: the final ranking happens in Python on a different score.
    candidate_limit = max(limit * 4, 80)

    if dialect == "postgresql":
        sale_date_expr = f"NULLIF({date_col}::text, '')::date"
        curr_window = "lw.latest_date - INTERVAL '6 days'"
        prev_window_start = "lw.latest_date - INTERVAL '13 days'"
        prev_window_end = "lw.latest_date - INTERVAL '6 days'"
    else:
        sale_date_expr = f"date({date_col})"
        curr_window = "date(lw.latest_date, '-6 days')"
        prev_window_start = "date(lw.latest_date, '-13 days')"
        prev_window_end = "date(lw.latest_date, '-6 days')"

    # Rank by the larger of the two windows so declining products still surface.
    order_metric = "CASE WHEN sales_7d >= sales_prev_7d THEN sales_7d ELSE sales_prev_7d END"

    rows = conn.execute(text(f"""
        WITH sales_rows AS (
            SELECT
                NULLIF(TRIM({sku_text}), '') AS pchome_product_id,
                NULLIF(TRIM({name_text}), '') AS product_name,
                NULLIF(TRIM({category_text}), '') AS category,
                NULLIF(TRIM({vendor_text}), '') AS vendor,
                {sale_date_expr} AS sale_date,
                {revenue_expr} AS revenue,
                {qty_expr} AS qty
            FROM daily_sales_snapshot
            WHERE {sku_col} IS NOT NULL
        ),
        latest_window AS (
            SELECT MAX(sale_date) AS latest_date
            FROM sales_rows
            WHERE sale_date IS NOT NULL
        ),
        sales AS (
            SELECT
                sr.pchome_product_id,
                MAX(sr.product_name) AS product_name,
                MAX(sr.category) AS category,
                MAX(sr.vendor) AS vendor,
                SUM(CASE WHEN sr.sale_date >= {curr_window}
                         THEN sr.revenue ELSE 0 END) AS sales_7d,
                SUM(CASE WHEN sr.sale_date >= {prev_window_start}
                          AND sr.sale_date < {prev_window_end}
                         THEN sr.revenue ELSE 0 END) AS sales_prev_7d,
                SUM(CASE WHEN sr.sale_date >= {curr_window}
                         THEN sr.qty ELSE 0 END) AS qty_7d,
                MAX(sr.sale_date) AS last_sale_date,
                MAX(lw.latest_date) AS latest_sales_date
            FROM sales_rows sr
            CROSS JOIN latest_window lw
            WHERE sr.pchome_product_id IS NOT NULL
            GROUP BY sr.pchome_product_id
        )
        SELECT *
        FROM sales
        WHERE sales_7d > 0 OR sales_prev_7d > 0
        ORDER BY {order_metric} DESC, qty_7d DESC
        LIMIT :limit
    """), {"limit": candidate_limit}).mappings().all()

    mapped_rows = [dict(row) for row in rows]

    # Take the first populated latest_sales_date among the returned rows.
    latest_value = next(
        (row.get("latest_sales_date") for row in mapped_rows if row.get("latest_sales_date")),
        None,
    )
    if latest_value is None:
        latest_date = None
    elif hasattr(latest_value, "isoformat"):
        latest_date = latest_value.isoformat()
    else:
        latest_date = str(latest_value)
    return mapped_rows, latest_date
def _fetch_external_price_map(conn, pchome_product_ids: list[str]) -> dict[str, dict[str, Any]]:
    """Look up the latest verified PChome/MOMO price pairing per product id.

    Args:
        conn: Open SQLAlchemy connection.
        pchome_product_ids: Product ids to look up; blank entries are dropped
            and the rest are stripped before being bound to the query.

    Returns:
        A dict keyed by stripped ``pchome_product_id`` whose values are the
        result rows as plain dicts. Empty when any of the tables
        ``competitor_prices``, ``products`` or ``price_records`` is missing,
        or when no usable id was supplied.
    """
    required_tables = ("competitor_prices", "products", "price_records")
    inspector = inspect(conn)
    if any(not inspector.has_table(table) for table in required_tables):
        return {}

    ids = []
    for item in pchome_product_ids:
        if str(item or "").strip():
            ids.append(str(item).strip())
    if not ids:
        return {}

    # NOTE(review): only the PostgreSQL query filters on cp.expires_at; the
    # fallback query has no such condition -- confirm this is intentional.
    postgres_sql = """
WITH valid_cp AS (
SELECT DISTINCT ON (cp.competitor_product_id)
cp.competitor_product_id AS pchome_product_id,
cp.competitor_product_name AS pchome_public_name,
cp.sku AS momo_sku,
cp.price AS pchome_price,
cp.match_score,
cp.tags,
cp.crawled_at
FROM competitor_prices cp
WHERE cp.source = 'pchome'
AND cp.competitor_product_id IS NOT NULL
AND cp.price IS NOT NULL
AND cp.price > 0
AND COALESCE(cp.match_score, 0) >= 0.76
AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
AND cp.competitor_product_id IN :ids
ORDER BY cp.competitor_product_id, cp.crawled_at DESC NULLS LAST
)
SELECT vc.*, lm.momo_name, lm.momo_price, lm.momo_price_at
FROM valid_cp vc
LEFT JOIN LATERAL (
SELECT
p.name AS momo_name,
pr.price AS momo_price,
pr.timestamp AS momo_price_at
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.i_code = vc.momo_sku
AND p.status = 'ACTIVE'
ORDER BY pr.timestamp DESC, pr.id DESC
LIMIT 1
) lm ON TRUE
"""
    fallback_sql = """
WITH latest_cp AS (
SELECT
cp.competitor_product_id AS pchome_product_id,
cp.competitor_product_name AS pchome_public_name,
cp.sku AS momo_sku,
cp.price AS pchome_price,
cp.match_score,
cp.tags,
cp.crawled_at,
ROW_NUMBER() OVER (
PARTITION BY cp.competitor_product_id
ORDER BY cp.crawled_at DESC
) AS rn
FROM competitor_prices cp
WHERE cp.source = 'pchome'
AND cp.competitor_product_id IS NOT NULL
AND cp.price IS NOT NULL
AND cp.price > 0
AND COALESCE(cp.match_score, 0) >= 0.76
AND COALESCE(cp.tags, '') LIKE '%identity_v2%'
AND cp.competitor_product_id IN :ids
),
latest_momo AS (
SELECT
p.i_code AS momo_sku,
p.name AS momo_name,
pr.price AS momo_price,
pr.timestamp AS momo_price_at,
ROW_NUMBER() OVER (
PARTITION BY p.i_code
ORDER BY pr.timestamp DESC, pr.id DESC
) AS rn
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.status = 'ACTIVE'
AND p.i_code IN (SELECT momo_sku FROM latest_cp)
)
SELECT cp.pchome_product_id, cp.pchome_public_name, cp.momo_sku,
cp.pchome_price, cp.match_score, cp.tags, cp.crawled_at,
lm.momo_name, lm.momo_price, lm.momo_price_at
FROM latest_cp cp
LEFT JOIN latest_momo lm ON lm.momo_sku = cp.momo_sku AND lm.rn = 1
WHERE cp.rn = 1
"""
    sql = postgres_sql if conn.dialect.name == "postgresql" else fallback_sql

    # ":ids" is an expanding bind so the list renders as an IN (...) clause.
    statement = text(sql).bindparams(bindparam("ids", expanding=True))
    fetched = conn.execute(statement, {"ids": ids}).mappings().all()

    keyed_rows = (
        (str(record.get("pchome_product_id") or "").strip(), record)
        for record in fetched
    )
    # Rows with a blank id are skipped; a later duplicate id overwrites an earlier one.
    return {key: dict(record) for key, record in keyed_rows if key}
def _score_opportunity(sales_row: dict[str, Any], external_row: dict[str, Any] | None) -> dict[str, Any]:
    """Score one product and choose its recommended action.

    The priority score (capped at 100) is the sum of:

    * volume: up to 34, from the larger of the two 7-day revenue figures
    * quantity: up to 10, from the current 7-day quantity
    * decline: up to 24, only when revenue fell versus the previous window
    * mapping gap: 18 when there is no external row but there are sales
    * 18% of the data-quality score
    * a price-gap bonus: up to 18 when MOMO is more than 5% cheaper, or up
      to 10 when PChome is more than 5% cheaper

    Args:
        sales_row: One aggregated sales row; reads ``sales_7d``,
            ``sales_prev_7d``, ``qty_7d``, ``pchome_product_id``,
            ``product_name``, ``category``, ``vendor``, ``last_sale_date``.
        external_row: Matched external price row, or ``None`` / empty when
            the product has no verified MOMO counterpart.

    Returns:
        A dict describing the opportunity: product fields, sales figures,
        ``external_price`` (or ``None``), ``priority_score``,
        ``recommended_action``, up to four ``reason_lines`` and a
        ``data_quality`` summary.
    """
    sales_7d = _to_float(sales_row.get("sales_7d"))
    sales_prev_7d = _to_float(sales_row.get("sales_prev_7d"))
    qty_7d = _to_float(sales_row.get("qty_7d"))

    # Week-over-week change is undefined without a positive baseline.
    sales_delta_pct = None
    if sales_prev_7d > 0:
        sales_delta_pct = (sales_7d - sales_prev_7d) / sales_prev_7d * 100

    volume_score = min(34, max(sales_7d, sales_prev_7d) / 80000 * 34)
    qty_score = min(10, qty_7d / 25 * 10)
    decline_score = 0
    if sales_delta_pct is not None and sales_delta_pct < 0:
        decline_score = min(24, abs(sales_delta_pct) / 45 * 24)

    # Defaults describe the "no external match" case.
    data_quality_score = 54
    external_payload = None
    action_code = "map_external_product"
    action_label = "先補商品對應"
    action_message = "這項商品已有業績訊號,但還沒有可確認的 MOMO 對照商品。先補對應,後續才能判斷價格壓力。"
    reason_lines = []

    if external_row:
        pchome_price = _to_float(external_row.get("pchome_price"))
        momo_price = _to_float(external_row.get("momo_price"))
        # A missing MOMO price arrives here as 0.0. Computing a gap from it
        # would report MOMO as 100% cheaper, so require both prices.
        if pchome_price > 0 and momo_price > 0:
            gap_pct = (momo_price - pchome_price) / pchome_price * 100
        else:
            gap_pct = None
        tags = _load_json_tags(external_row.get("tags"))
        data_quality_score = 78 + min(12, _to_float(external_row.get("match_score")) * 12)
        external_payload = {
            "source": "MOMO",
            "momo_sku": external_row.get("momo_sku"),
            "momo_name": external_row.get("momo_name"),
            "momo_price": round(momo_price, 2) if momo_price else None,
            "pchome_price": round(pchome_price, 2) if pchome_price else None,
            "gap_pct": round(gap_pct, 1) if gap_pct is not None else None,
            "match_score": round(_to_float(external_row.get("match_score")), 3),
            "tags": tags,
            "updated_at": str(external_row.get("crawled_at") or ""),
        }

        # Negative gap: MOMO is cheaper. Positive gap: PChome is cheaper.
        if gap_pct is not None and gap_pct < -5:
            action_code = "review_price_or_promo"
            action_label = "檢查售價與活動"
            action_message = "MOMO 外部參考價比較低,建議檢查 PChome 售價、活動組合或曝光策略。"
        elif gap_pct is not None and gap_pct > 5:
            action_code = "amplify_price_advantage"
            action_label = "放大價格優勢"
            action_message = "PChome 目前有價格優勢,適合檢查曝光、文案與活動位置。"
        elif sales_delta_pct is not None and sales_delta_pct < -10:
            action_code = "recover_sales_momentum"
            action_label = "找回銷售動能"
            action_message = "價格差距不大,但近 7 天業績轉弱,建議檢查曝光、庫存與商品頁內容。"
        else:
            action_code = "monitor"
            action_label = "持續觀察"
            action_message = "業績與外部價格暫無明顯異常,先保留在觀察清單。"

        if gap_pct is not None:
            if gap_pct > 0:
                reason_lines.append(f"PChome 目前比 MOMO 低約 {abs(gap_pct):.1f}%。")
            elif gap_pct < 0:
                reason_lines.append(f"MOMO 目前比 PChome 低約 {abs(gap_pct):.1f}%。")
            else:
                reason_lines.append("PChome 與 MOMO 價格幾乎相同。")
    else:
        data_quality_score -= 12
        reason_lines.append("尚未找到可確認的 MOMO 對照商品。")

    if sales_delta_pct is None:
        reason_lines.append("前 7 天沒有可比基準,先看近 7 天表現。")
    elif sales_delta_pct < 0:
        reason_lines.append(f"近 7 天業績比前 7 天少約 {abs(sales_delta_pct):.1f}%。")
    elif sales_delta_pct > 0:
        reason_lines.append(f"近 7 天業績比前 7 天多約 {sales_delta_pct:.1f}%。")
    else:
        reason_lines.append("近 7 天業績與前 7 天大致持平。")
    if sales_7d > 0:
        reason_lines.append(f"近 7 天業績約 NT$ {sales_7d:,.0f},銷量 {qty_7d:,.0f}")

    # Selling products without an external match get a boost so the mapping
    # work surfaces near the top of the list.
    mapping_gap_score = 18 if not external_row and max(sales_7d, sales_prev_7d) > 0 else 0
    priority_score = min(
        100,
        volume_score + qty_score + decline_score + mapping_gap_score + data_quality_score * 0.18,
    )
    if external_payload and external_payload.get("gap_pct") is not None:
        gap = float(external_payload["gap_pct"])
        if gap < -5:
            priority_score = min(100, priority_score + min(18, abs(gap) / 20 * 18))
        elif gap > 5:
            priority_score = min(100, priority_score + min(10, gap / 25 * 10))

    issues = []
    if not external_row:
        issues.append("需要補商品對應")
    if sales_delta_pct is None:
        issues.append("前期業績不足")

    return {
        "pchome_product_id": str(sales_row.get("pchome_product_id") or ""),
        "product_name": sales_row.get("product_name") or "未命名商品",
        "category": sales_row.get("category") or "",
        "vendor": sales_row.get("vendor") or "",
        "sales_7d": round(sales_7d, 2),
        "sales_prev_7d": round(sales_prev_7d, 2),
        "sales_delta_pct": round(sales_delta_pct, 1) if sales_delta_pct is not None else None,
        "qty_7d": round(qty_7d, 2),
        "last_sale_date": str(sales_row.get("last_sale_date") or ""),
        "external_price": external_payload,
        "priority_score": round(priority_score, 1),
        "recommended_action": {
            "code": action_code,
            "label": action_label,
            "message": action_message,
        },
        "reason_lines": reason_lines[:4],
        "data_quality": {
            "label": "資料可直接判斷" if external_row else "需要補資料",
            "score": round(max(0, min(100, data_quality_score)), 1),
            "issues": issues,
        },
    }
def build_pchome_growth_opportunities(engine, limit: int = 20) -> dict[str, Any]:
    """Build the read-only PChome growth action list.

    Reads aggregated PChome sales and verified external prices, scores each
    product, and returns the highest-priority items.

    Args:
        engine: SQLAlchemy engine used for the table probe and the queries.
        limit: Desired number of opportunities; falsy values mean 20 and the
            result is clamped to the range 5..50.

    Returns:
        A dict with ``success``, ``system_name``, ``generated_at`` (naive
        local time, second precision), ``source_scope``, ``stats``,
        ``opportunities`` and ``message``. ``stats`` always carries the same
        keys, including when the sales table does not exist yet.
    """
    limit = max(5, min(int(limit or 20), 50))
    generated_at = datetime.now().isoformat(timespec="seconds")
    source_readiness = build_external_source_readiness(engine)
    source_scope = {
        "primary_goal": "提升 PChome 業績",
        "primary_sales_source": PRIMARY_SALES_SOURCE,
        "active_external_sources": list(ACTIVE_EXTERNAL_SOURCES),
        "paused_external_sources": list(PAUSED_EXTERNAL_SOURCES),
        "plain_note": "蝦皮與酷澎先暫停,不進作戰清單,也不發告警。",
        "source_readiness": source_readiness,
    }
    # Fields shared by the empty-state and the normal response.
    envelope = {
        "success": True,
        "system_name": SYSTEM_DISPLAY_NAME,
        "generated_at": generated_at,
        "source_scope": source_scope,
    }

    if not _table_exists(engine, "daily_sales_snapshot"):
        return {
            **envelope,
            "stats": {
                "latest_sales_date": None,
                "candidate_count": 0,
                "mapped_count": 0,
                "mapping_rate": 0,
                "needs_mapping_count": 0,
                # Kept in step with the normal response so consumers can
                # read these keys unconditionally.
                "total_sales_7d": 0,
                "action_counts": {},
            },
            "opportunities": [],
            "message": "目前還沒有 PChome 業績資料,請先完成業績匯入。",
        }

    with engine.connect() as conn:
        sales_rows, latest_sales_date = _fetch_sales_rows(conn, limit=limit)
        sales_ids = [str(row.get("pchome_product_id") or "") for row in sales_rows]
        external_map = _fetch_external_price_map(conn, sales_ids)

    opportunities = [
        _score_opportunity(
            row,
            external_map.get(str(row.get("pchome_product_id") or "").strip()),
        )
        for row in sales_rows
    ]
    # Re-rank by priority score, then trim the over-fetched candidates.
    opportunities.sort(key=lambda item: item["priority_score"], reverse=True)
    opportunities = opportunities[:limit]

    needs_mapping_count = sum(1 for item in opportunities if not item.get("external_price"))
    mapped_count = len(opportunities) - needs_mapping_count
    # max(..., 1) avoids dividing by zero when the list is empty.
    mapping_rate = round(mapped_count / max(len(opportunities), 1) * 100, 1)

    action_counts: dict[str, int] = {}
    for item in opportunities:
        label = item["recommended_action"]["label"]
        action_counts[label] = action_counts.get(label, 0) + 1

    return {
        **envelope,
        "stats": {
            "latest_sales_date": latest_sales_date,
            "candidate_count": len(opportunities),
            "mapped_count": mapped_count,
            "mapping_rate": mapping_rate,
            "needs_mapping_count": needs_mapping_count,
            "total_sales_7d": round(sum(_to_float(item.get("sales_7d")) for item in opportunities), 2),
            "action_counts": action_counts,
        },
        "opportunities": opportunities,
        "message": "已整理今日 PChome 業績成長作戰清單。",
    }