V10.606 正名 PChome 業績成長作戰系統
All checks were successful
CD Pipeline / deploy (push) Successful in 1m6s
All checks were successful
CD Pipeline / deploy (push) Successful in 1m6s
This commit is contained in:
@@ -14,6 +14,7 @@ AI 建議挑品 Agent
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List
|
||||
@@ -107,6 +108,16 @@ def _identity_match_condition(conn, alias: str = "cp") -> str:
|
||||
return f"AND COALESCE({alias}.tags, '') LIKE '%identity_v2%'"
|
||||
|
||||
|
||||
def _sales_join_by_momo_sku_enabled() -> bool:
|
||||
"""舊 MOMO SKU 直連銷售表的路徑預設關閉,避免把 PChome 業績 ID 誤當 MOMO SKU。"""
|
||||
return os.getenv("PCHOME_SALES_JOIN_BY_MOMO_SKU_ENABLED", "false").strip().lower() in {
|
||||
"1",
|
||||
"true",
|
||||
"yes",
|
||||
"on",
|
||||
}
|
||||
|
||||
|
||||
def _fetch_candidates_without_sales(conn, limit: int) -> List[Dict[str, Any]]:
|
||||
"""DB-portable fallback query used when sales-aware PostgreSQL SQL is unavailable."""
|
||||
from sqlalchemy import text
|
||||
@@ -172,7 +183,7 @@ def _fetch_candidates(conn, limit: int) -> List[Dict[str, Any]]:
|
||||
sales_join = ""
|
||||
sales_select = "0 AS sales_7d, 0 AS sales_prev_7d, 0 AS qty_7d, 0 AS profit_7d, 0 AS cost_7d"
|
||||
sales_cols = {}
|
||||
if _has_daily_sales_snapshot(conn):
|
||||
if _sales_join_by_momo_sku_enabled() and _has_daily_sales_snapshot(conn):
|
||||
try:
|
||||
sales_cols = _daily_sales_columns(conn)
|
||||
except Exception as exc:
|
||||
|
||||
468
services/pchome_revenue_growth_service.py
Normal file
468
services/pchome_revenue_growth_service.py
Normal file
@@ -0,0 +1,468 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""PChome 業績成長自動化作戰系統的只讀作戰清單。"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import bindparam, inspect, text
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
SYSTEM_DISPLAY_NAME = "PChome 業績成長自動化作戰系統"
|
||||
PRIMARY_SALES_SOURCE = "PChome 後台業績"
|
||||
ACTIVE_EXTERNAL_SOURCES = ("MOMO 外部價格參考",)
|
||||
PAUSED_EXTERNAL_SOURCES = ("蝦皮", "酷澎")
|
||||
|
||||
|
||||
def _to_float(value: Any, default: float = 0.0) -> float:
|
||||
if value is None:
|
||||
return default
|
||||
try:
|
||||
return float(value)
|
||||
except (TypeError, ValueError):
|
||||
return default
|
||||
|
||||
|
||||
def _quote_identifier(identifier: str) -> str:
|
||||
return '"' + identifier.replace('"', '""') + '"'
|
||||
|
||||
|
||||
def _first_available(columns: set[str], candidates: list[str]) -> str | None:
|
||||
return next((col for col in candidates if col in columns), None)
|
||||
|
||||
|
||||
def _load_json_tags(value: Any) -> list[str]:
|
||||
if not value:
|
||||
return []
|
||||
if isinstance(value, list):
|
||||
return value
|
||||
try:
|
||||
parsed = json.loads(value)
|
||||
return parsed if isinstance(parsed, list) else []
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
def _table_exists(engine, table_name: str) -> bool:
|
||||
try:
|
||||
return inspect(engine).has_table(table_name)
|
||||
except Exception:
|
||||
logger.warning("[PChomeGrowth] table probe failed: %s", table_name, exc_info=True)
|
||||
return False
|
||||
|
||||
|
||||
def _daily_sales_columns(conn) -> dict[str, str | None]:
|
||||
if conn.dialect.name == "postgresql":
|
||||
rows = conn.execute(text("""
|
||||
SELECT column_name
|
||||
FROM information_schema.columns
|
||||
WHERE table_name = 'daily_sales_snapshot'
|
||||
""")).fetchall()
|
||||
columns = {row[0] for row in rows}
|
||||
elif conn.dialect.name == "sqlite":
|
||||
rows = conn.execute(text("PRAGMA table_info(daily_sales_snapshot)")).fetchall()
|
||||
columns = {row[1] for row in rows}
|
||||
else:
|
||||
result = conn.execute(text("SELECT * FROM daily_sales_snapshot LIMIT 0"))
|
||||
columns = set(result.keys())
|
||||
|
||||
return {
|
||||
"sku": _first_available(columns, ["商品ID", "Product ID", "ID", "Item Code"]),
|
||||
"name": _first_available(columns, ["商品名稱", "商品名", "品名", "Product Name", "Name"]),
|
||||
"date": _first_available(columns, ["snapshot_date", "日期", "訂單日期", "交易日期", "Date"]),
|
||||
"revenue": _first_available(columns, ["總業績", "銷售金額", "業績", "金額", "Amount", "Sales", "Total"]),
|
||||
"qty": _first_available(columns, ["數量", "銷售數量", "銷量", "Qty", "Quantity"]),
|
||||
"category": _first_available(columns, ["商品館", "館別", "分類", "Category"]),
|
||||
"vendor": _first_available(columns, ["廠商名稱", "供應商", "Vendor"]),
|
||||
}
|
||||
|
||||
|
||||
def _as_text_expr(identifier_or_expr: str, dialect_name: str, *, raw: bool = False) -> str:
|
||||
expr = identifier_or_expr if raw else _quote_identifier(identifier_or_expr)
|
||||
if dialect_name == "postgresql":
|
||||
return f"{expr}::text"
|
||||
return f"CAST({expr} AS TEXT)"
|
||||
|
||||
|
||||
def _numeric_expr(identifier: str, dialect_name: str) -> str:
|
||||
quoted = _quote_identifier(identifier)
|
||||
if dialect_name == "postgresql":
|
||||
return f"COALESCE(NULLIF(regexp_replace({quoted}::text, '[^0-9.-]', '', 'g'), '')::numeric, 0)"
|
||||
return f"COALESCE(CAST(REPLACE(CAST({quoted} AS TEXT), ',', '') AS REAL), 0)"
|
||||
|
||||
|
||||
def _fetch_sales_rows(conn, limit: int) -> tuple[list[dict[str, Any]], str | None]:
|
||||
cols = _daily_sales_columns(conn)
|
||||
missing = [key for key in ["sku", "name", "date", "revenue"] if not cols.get(key)]
|
||||
if missing:
|
||||
raise RuntimeError("PChome 業績檔缺少必要欄位:" + "、".join(missing))
|
||||
|
||||
dialect = conn.dialect.name
|
||||
sku_col = _quote_identifier(cols["sku"])
|
||||
name_col = _quote_identifier(cols["name"])
|
||||
date_col = _quote_identifier(cols["date"])
|
||||
revenue_expr = _numeric_expr(cols["revenue"], dialect)
|
||||
qty_expr = _numeric_expr(cols["qty"], dialect) if cols.get("qty") else "0"
|
||||
category_text = _as_text_expr(cols["category"], dialect) if cols.get("category") else "NULL"
|
||||
vendor_text = _as_text_expr(cols["vendor"], dialect) if cols.get("vendor") else "NULL"
|
||||
sku_text = _as_text_expr(cols["sku"], dialect)
|
||||
name_text = _as_text_expr(cols["name"], dialect)
|
||||
candidate_limit = max(limit * 4, 80)
|
||||
|
||||
if dialect == "postgresql":
|
||||
sale_date_expr = f"NULLIF({date_col}::text, '')::date"
|
||||
curr_window = "lw.latest_date - INTERVAL '6 days'"
|
||||
prev_window_start = "lw.latest_date - INTERVAL '13 days'"
|
||||
prev_window_end = "lw.latest_date - INTERVAL '6 days'"
|
||||
else:
|
||||
sale_date_expr = f"date({date_col})"
|
||||
curr_window = "date(lw.latest_date, '-6 days')"
|
||||
prev_window_start = "date(lw.latest_date, '-13 days')"
|
||||
prev_window_end = "date(lw.latest_date, '-6 days')"
|
||||
|
||||
order_metric = "CASE WHEN sales_7d >= sales_prev_7d THEN sales_7d ELSE sales_prev_7d END"
|
||||
rows = conn.execute(text(f"""
|
||||
WITH sales_rows AS (
|
||||
SELECT
|
||||
NULLIF(TRIM({sku_text}), '') AS pchome_product_id,
|
||||
NULLIF(TRIM({name_text}), '') AS product_name,
|
||||
NULLIF(TRIM({_as_text_expr(category_text, dialect, raw=True)}), '') AS category,
|
||||
NULLIF(TRIM({_as_text_expr(vendor_text, dialect, raw=True)}), '') AS vendor,
|
||||
{sale_date_expr} AS sale_date,
|
||||
{revenue_expr} AS revenue,
|
||||
{qty_expr} AS qty
|
||||
FROM daily_sales_snapshot
|
||||
WHERE {sku_col} IS NOT NULL
|
||||
),
|
||||
latest_window AS (
|
||||
SELECT MAX(sale_date) AS latest_date
|
||||
FROM sales_rows
|
||||
WHERE sale_date IS NOT NULL
|
||||
),
|
||||
sales AS (
|
||||
SELECT
|
||||
sr.pchome_product_id,
|
||||
MAX(sr.product_name) AS product_name,
|
||||
MAX(sr.category) AS category,
|
||||
MAX(sr.vendor) AS vendor,
|
||||
SUM(CASE WHEN sr.sale_date >= {curr_window}
|
||||
THEN sr.revenue ELSE 0 END) AS sales_7d,
|
||||
SUM(CASE WHEN sr.sale_date >= {prev_window_start}
|
||||
AND sr.sale_date < {prev_window_end}
|
||||
THEN sr.revenue ELSE 0 END) AS sales_prev_7d,
|
||||
SUM(CASE WHEN sr.sale_date >= {curr_window}
|
||||
THEN sr.qty ELSE 0 END) AS qty_7d,
|
||||
MAX(sr.sale_date) AS last_sale_date,
|
||||
MAX(lw.latest_date) AS latest_sales_date
|
||||
FROM sales_rows sr
|
||||
CROSS JOIN latest_window lw
|
||||
WHERE sr.pchome_product_id IS NOT NULL
|
||||
GROUP BY sr.pchome_product_id
|
||||
)
|
||||
SELECT *
|
||||
FROM sales
|
||||
WHERE sales_7d > 0 OR sales_prev_7d > 0
|
||||
ORDER BY {order_metric} DESC, qty_7d DESC
|
||||
LIMIT :limit
|
||||
"""), {"limit": candidate_limit}).mappings().all()
|
||||
|
||||
mapped_rows = [dict(row) for row in rows]
|
||||
latest_date = None
|
||||
for row in mapped_rows:
|
||||
value = row.get("latest_sales_date")
|
||||
if value:
|
||||
latest_date = value.isoformat() if hasattr(value, "isoformat") else str(value)
|
||||
break
|
||||
return mapped_rows, latest_date
|
||||
|
||||
|
||||
def _fetch_external_price_map(conn, pchome_product_ids: list[str]) -> dict[str, dict[str, Any]]:
|
||||
inspector = inspect(conn)
|
||||
if not all(inspector.has_table(table) for table in {"competitor_prices", "products", "price_records"}):
|
||||
return {}
|
||||
ids = [str(item).strip() for item in pchome_product_ids if str(item or "").strip()]
|
||||
if not ids:
|
||||
return {}
|
||||
|
||||
if conn.dialect.name == "postgresql":
|
||||
sql = """
|
||||
WITH valid_cp AS (
|
||||
SELECT DISTINCT ON (cp.competitor_product_id)
|
||||
cp.competitor_product_id AS pchome_product_id,
|
||||
cp.competitor_product_name AS pchome_public_name,
|
||||
cp.sku AS momo_sku,
|
||||
cp.price AS pchome_price,
|
||||
cp.match_score,
|
||||
cp.tags,
|
||||
cp.crawled_at
|
||||
FROM competitor_prices cp
|
||||
WHERE cp.source = 'pchome'
|
||||
AND cp.competitor_product_id IS NOT NULL
|
||||
AND cp.price IS NOT NULL
|
||||
AND cp.price > 0
|
||||
AND COALESCE(cp.match_score, 0) >= 0.76
|
||||
AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
|
||||
AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
|
||||
AND cp.competitor_product_id IN :ids
|
||||
ORDER BY cp.competitor_product_id, cp.crawled_at DESC NULLS LAST
|
||||
)
|
||||
SELECT vc.*, lm.momo_name, lm.momo_price, lm.momo_price_at
|
||||
FROM valid_cp vc
|
||||
LEFT JOIN LATERAL (
|
||||
SELECT
|
||||
p.name AS momo_name,
|
||||
pr.price AS momo_price,
|
||||
pr.timestamp AS momo_price_at
|
||||
FROM products p
|
||||
JOIN price_records pr ON pr.product_id = p.id
|
||||
WHERE p.i_code = vc.momo_sku
|
||||
AND p.status = 'ACTIVE'
|
||||
ORDER BY pr.timestamp DESC, pr.id DESC
|
||||
LIMIT 1
|
||||
) lm ON TRUE
|
||||
"""
|
||||
else:
|
||||
sql = """
|
||||
WITH latest_cp AS (
|
||||
SELECT
|
||||
cp.competitor_product_id AS pchome_product_id,
|
||||
cp.competitor_product_name AS pchome_public_name,
|
||||
cp.sku AS momo_sku,
|
||||
cp.price AS pchome_price,
|
||||
cp.match_score,
|
||||
cp.tags,
|
||||
cp.crawled_at,
|
||||
ROW_NUMBER() OVER (
|
||||
PARTITION BY cp.competitor_product_id
|
||||
ORDER BY cp.crawled_at DESC
|
||||
) AS rn
|
||||
FROM competitor_prices cp
|
||||
WHERE cp.source = 'pchome'
|
||||
AND cp.competitor_product_id IS NOT NULL
|
||||
AND cp.price IS NOT NULL
|
||||
AND cp.price > 0
|
||||
AND COALESCE(cp.match_score, 0) >= 0.76
|
||||
AND COALESCE(cp.tags, '') LIKE '%identity_v2%'
|
||||
AND cp.competitor_product_id IN :ids
|
||||
),
|
||||
latest_momo AS (
|
||||
SELECT
|
||||
p.i_code AS momo_sku,
|
||||
p.name AS momo_name,
|
||||
pr.price AS momo_price,
|
||||
pr.timestamp AS momo_price_at,
|
||||
ROW_NUMBER() OVER (
|
||||
PARTITION BY p.i_code
|
||||
ORDER BY pr.timestamp DESC, pr.id DESC
|
||||
) AS rn
|
||||
FROM products p
|
||||
JOIN price_records pr ON pr.product_id = p.id
|
||||
WHERE p.status = 'ACTIVE'
|
||||
AND p.i_code IN (SELECT momo_sku FROM latest_cp)
|
||||
)
|
||||
SELECT cp.pchome_product_id, cp.pchome_public_name, cp.momo_sku,
|
||||
cp.pchome_price, cp.match_score, cp.tags, cp.crawled_at,
|
||||
lm.momo_name, lm.momo_price, lm.momo_price_at
|
||||
FROM latest_cp cp
|
||||
LEFT JOIN latest_momo lm ON lm.momo_sku = cp.momo_sku AND lm.rn = 1
|
||||
WHERE cp.rn = 1
|
||||
"""
|
||||
|
||||
stmt = text(sql).bindparams(bindparam("ids", expanding=True))
|
||||
rows = conn.execute(stmt, {"ids": ids}).mappings().all()
|
||||
result: dict[str, dict[str, Any]] = {}
|
||||
for row in rows:
|
||||
key = str(row.get("pchome_product_id") or "").strip()
|
||||
if key:
|
||||
result[key] = dict(row)
|
||||
return result
|
||||
|
||||
|
||||
def _score_opportunity(sales_row: dict[str, Any], external_row: dict[str, Any] | None) -> dict[str, Any]:
|
||||
sales_7d = _to_float(sales_row.get("sales_7d"))
|
||||
sales_prev_7d = _to_float(sales_row.get("sales_prev_7d"))
|
||||
qty_7d = _to_float(sales_row.get("qty_7d"))
|
||||
sales_delta_pct = None
|
||||
if sales_prev_7d > 0:
|
||||
sales_delta_pct = (sales_7d - sales_prev_7d) / sales_prev_7d * 100
|
||||
|
||||
volume_score = min(34, max(sales_7d, sales_prev_7d) / 80000 * 34)
|
||||
qty_score = min(10, qty_7d / 25 * 10)
|
||||
decline_score = min(24, abs(sales_delta_pct) / 45 * 24) if sales_delta_pct is not None and sales_delta_pct < 0 else 0
|
||||
data_quality_score = 54
|
||||
external_payload = None
|
||||
action_code = "map_external_product"
|
||||
action_label = "先補商品對應"
|
||||
action_message = "這項商品已有業績訊號,但還沒有可確認的 MOMO 對照商品。先補對應,後續才能判斷價格壓力。"
|
||||
reason_lines = []
|
||||
|
||||
if external_row:
|
||||
pchome_price = _to_float(external_row.get("pchome_price"))
|
||||
momo_price = _to_float(external_row.get("momo_price"))
|
||||
gap_pct = ((momo_price - pchome_price) / pchome_price * 100) if pchome_price else None
|
||||
tags = _load_json_tags(external_row.get("tags"))
|
||||
data_quality_score = 78 + min(12, _to_float(external_row.get("match_score")) * 12)
|
||||
external_payload = {
|
||||
"source": "MOMO",
|
||||
"momo_sku": external_row.get("momo_sku"),
|
||||
"momo_name": external_row.get("momo_name"),
|
||||
"momo_price": round(momo_price, 2) if momo_price else None,
|
||||
"pchome_price": round(pchome_price, 2) if pchome_price else None,
|
||||
"gap_pct": round(gap_pct, 1) if gap_pct is not None else None,
|
||||
"match_score": round(_to_float(external_row.get("match_score")), 3),
|
||||
"tags": tags,
|
||||
"updated_at": str(external_row.get("crawled_at") or ""),
|
||||
}
|
||||
|
||||
if gap_pct is not None and gap_pct < -5:
|
||||
action_code = "review_price_or_promo"
|
||||
action_label = "檢查售價與活動"
|
||||
action_message = "MOMO 外部參考價比較低,建議檢查 PChome 售價、活動組合或曝光策略。"
|
||||
elif gap_pct is not None and gap_pct > 5:
|
||||
action_code = "amplify_price_advantage"
|
||||
action_label = "放大價格優勢"
|
||||
action_message = "PChome 目前有價格優勢,適合檢查曝光、文案與活動位置。"
|
||||
elif sales_delta_pct is not None and sales_delta_pct < -10:
|
||||
action_code = "recover_sales_momentum"
|
||||
action_label = "找回銷售動能"
|
||||
action_message = "價格差距不大,但近 7 天業績轉弱,建議檢查曝光、庫存與商品頁內容。"
|
||||
else:
|
||||
action_code = "monitor"
|
||||
action_label = "持續觀察"
|
||||
action_message = "業績與外部價格暫無明顯異常,先保留在觀察清單。"
|
||||
|
||||
if gap_pct is not None:
|
||||
if gap_pct > 0:
|
||||
reason_lines.append(f"PChome 目前比 MOMO 低約 {abs(gap_pct):.1f}%。")
|
||||
elif gap_pct < 0:
|
||||
reason_lines.append(f"MOMO 目前比 PChome 低約 {abs(gap_pct):.1f}%。")
|
||||
else:
|
||||
reason_lines.append("PChome 與 MOMO 價格幾乎相同。")
|
||||
else:
|
||||
data_quality_score -= 12
|
||||
reason_lines.append("尚未找到可確認的 MOMO 對照商品。")
|
||||
|
||||
if sales_delta_pct is None:
|
||||
reason_lines.append("前 7 天沒有可比基準,先看近 7 天表現。")
|
||||
elif sales_delta_pct < 0:
|
||||
reason_lines.append(f"近 7 天業績比前 7 天少約 {abs(sales_delta_pct):.1f}%。")
|
||||
elif sales_delta_pct > 0:
|
||||
reason_lines.append(f"近 7 天業績比前 7 天多約 {sales_delta_pct:.1f}%。")
|
||||
else:
|
||||
reason_lines.append("近 7 天業績與前 7 天大致持平。")
|
||||
|
||||
if sales_7d > 0:
|
||||
reason_lines.append(f"近 7 天業績約 NT$ {sales_7d:,.0f},銷量 {qty_7d:,.0f}。")
|
||||
|
||||
mapping_gap_score = 18 if not external_row and max(sales_7d, sales_prev_7d) > 0 else 0
|
||||
priority_score = min(100, volume_score + qty_score + decline_score + mapping_gap_score + data_quality_score * 0.18)
|
||||
if external_payload and external_payload.get("gap_pct") is not None:
|
||||
gap = float(external_payload["gap_pct"])
|
||||
if gap < -5:
|
||||
priority_score = min(100, priority_score + min(18, abs(gap) / 20 * 18))
|
||||
elif gap > 5:
|
||||
priority_score = min(100, priority_score + min(10, gap / 25 * 10))
|
||||
|
||||
issues = []
|
||||
if not external_row:
|
||||
issues.append("需要補商品對應")
|
||||
if sales_delta_pct is None:
|
||||
issues.append("前期業績不足")
|
||||
|
||||
return {
|
||||
"pchome_product_id": str(sales_row.get("pchome_product_id") or ""),
|
||||
"product_name": sales_row.get("product_name") or "未命名商品",
|
||||
"category": sales_row.get("category") or "",
|
||||
"vendor": sales_row.get("vendor") or "",
|
||||
"sales_7d": round(sales_7d, 2),
|
||||
"sales_prev_7d": round(sales_prev_7d, 2),
|
||||
"sales_delta_pct": round(sales_delta_pct, 1) if sales_delta_pct is not None else None,
|
||||
"qty_7d": round(qty_7d, 2),
|
||||
"last_sale_date": str(sales_row.get("last_sale_date") or ""),
|
||||
"external_price": external_payload,
|
||||
"priority_score": round(priority_score, 1),
|
||||
"recommended_action": {
|
||||
"code": action_code,
|
||||
"label": action_label,
|
||||
"message": action_message,
|
||||
},
|
||||
"reason_lines": reason_lines[:4],
|
||||
"data_quality": {
|
||||
"label": "資料可直接判斷" if external_row else "需要補資料",
|
||||
"score": round(max(0, min(100, data_quality_score)), 1),
|
||||
"issues": issues,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def build_pchome_growth_opportunities(engine, limit: int = 20) -> dict[str, Any]:
|
||||
"""讀取 PChome 業績與已驗證外部價格,產生營運用作戰清單。"""
|
||||
limit = max(5, min(int(limit or 20), 50))
|
||||
generated_at = datetime.now().isoformat(timespec="seconds")
|
||||
source_scope = {
|
||||
"primary_goal": "提升 PChome 業績",
|
||||
"primary_sales_source": PRIMARY_SALES_SOURCE,
|
||||
"active_external_sources": list(ACTIVE_EXTERNAL_SOURCES),
|
||||
"paused_external_sources": list(PAUSED_EXTERNAL_SOURCES),
|
||||
"plain_note": "蝦皮與酷澎先暫停,不進作戰清單,也不發告警。",
|
||||
}
|
||||
|
||||
if not _table_exists(engine, "daily_sales_snapshot"):
|
||||
return {
|
||||
"success": True,
|
||||
"system_name": SYSTEM_DISPLAY_NAME,
|
||||
"generated_at": generated_at,
|
||||
"source_scope": source_scope,
|
||||
"stats": {
|
||||
"latest_sales_date": None,
|
||||
"candidate_count": 0,
|
||||
"mapped_count": 0,
|
||||
"mapping_rate": 0,
|
||||
"needs_mapping_count": 0,
|
||||
},
|
||||
"opportunities": [],
|
||||
"message": "目前還沒有 PChome 業績資料,請先完成業績匯入。",
|
||||
}
|
||||
|
||||
with engine.connect() as conn:
|
||||
sales_rows, latest_sales_date = _fetch_sales_rows(conn, limit=limit)
|
||||
sales_ids = [str(row.get("pchome_product_id") or "") for row in sales_rows]
|
||||
external_map = _fetch_external_price_map(conn, sales_ids)
|
||||
|
||||
opportunities = []
|
||||
for row in sales_rows:
|
||||
key = str(row.get("pchome_product_id") or "").strip()
|
||||
opportunities.append(_score_opportunity(row, external_map.get(key)))
|
||||
|
||||
opportunities.sort(key=lambda item: item["priority_score"], reverse=True)
|
||||
opportunities = opportunities[:limit]
|
||||
needs_mapping_count = sum(1 for item in opportunities if not item.get("external_price"))
|
||||
mapped_count = len(opportunities) - needs_mapping_count
|
||||
mapping_rate = round(mapped_count / max(len(opportunities), 1) * 100, 1)
|
||||
action_counts: dict[str, int] = {}
|
||||
for item in opportunities:
|
||||
label = item["recommended_action"]["label"]
|
||||
action_counts[label] = action_counts.get(label, 0) + 1
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"system_name": SYSTEM_DISPLAY_NAME,
|
||||
"generated_at": generated_at,
|
||||
"source_scope": source_scope,
|
||||
"stats": {
|
||||
"latest_sales_date": latest_sales_date,
|
||||
"candidate_count": len(opportunities),
|
||||
"mapped_count": mapped_count,
|
||||
"mapping_rate": mapping_rate,
|
||||
"needs_mapping_count": needs_mapping_count,
|
||||
"total_sales_7d": round(sum(_to_float(item.get("sales_7d")) for item in opportunities), 2),
|
||||
"action_counts": action_counts,
|
||||
},
|
||||
"opportunities": opportunities,
|
||||
"message": "已整理今日 PChome 業績成長作戰清單。",
|
||||
}
|
||||
Reference in New Issue
Block a user