Files
ewoooc/services/ai_product_pick_agent.py
OoO 2ac7410d40
All checks were successful
CD Pipeline / deploy (push) Successful in 2m20s
fix(dashboard): prewarm cache and expose pick evidence
2026-05-01 16:34:13 +08:00

499 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AI 建議挑品 Agent
以真實 DB 資料建立可操作的 PChome 銷售挑品清單:
- MOMO 最新價格
- PChome 最新競品價格與商品 ID
- PChome 歷史快照
- 近 7 天銷售資料(若 daily_sales_snapshot 可用)
此 Agent 不補假資料;資料不足的欄位只降低分數或略過。
"""
import json
import logging
import math
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List
logger = logging.getLogger(__name__)
@dataclass
class ProductPickResult:
    """Summary returned by one run of the product-pick generation."""

    # Number of candidate rows fetched from the database before scoring.
    candidates: int
    # Number of picks handed to the persistence step.
    written: int
    # The scored pick dictionaries, ordered by descending pick score.
    picks: List[Dict[str, Any]]
    # ISO-8601 timestamp (seconds precision) taken when the run started.
    generated_at: str
def _to_float(value, default=0.0) -> float:
if value is None:
return default
try:
return float(value)
except (TypeError, ValueError):
return default
def _load_json_tags(value) -> List[str]:
if not value:
return []
if isinstance(value, list):
return value
try:
parsed = json.loads(value)
return parsed if isinstance(parsed, list) else []
except Exception:
return []
def _has_daily_sales_snapshot(conn) -> bool:
    """Return whether a ``daily_sales_snapshot`` table is visible on ``conn``.

    On PostgreSQL the table is resolved with ``to_regclass``; for any other
    dialect the SQLite catalog ``sqlite_master`` is queried.

    Args:
        conn: A SQLAlchemy connection exposing ``dialect`` and ``execute``.

    Returns:
        ``True`` when the table was found. ``False`` when it was not found or
        the probe itself failed; the failure is logged instead of being
        swallowed silently, so a broken connection is distinguishable from a
        genuinely missing table.
    """
    from sqlalchemy import text

    try:
        if conn.dialect.name == "postgresql":
            row = conn.execute(
                text("SELECT to_regclass('daily_sales_snapshot') AS table_name")
            ).mappings().first()
            return bool(row and row.get("table_name"))
        row = conn.execute(text("""
            SELECT name FROM sqlite_master
            WHERE type='table' AND name='daily_sales_snapshot'
        """)).first()
        return bool(row)
    except Exception as exc:
        # Best-effort probe: sales data is optional, so never raise from here.
        logger.warning("[ProductPickAgent] daily_sales_snapshot probe failed: %s", exc)
        return False
def _daily_sales_columns(conn) -> Dict[str, str]:
    """Pick usable column names from the actual columns of the imported table.

    The ``daily_sales_snapshot`` table is an import whose column names vary,
    so each logical field is mapped to the first candidate name that exists.

    Args:
        conn: A SQLAlchemy connection exposing ``dialect`` and ``execute``.

    Returns:
        A mapping with the keys ``sku``, ``date``, ``revenue``, ``qty``,
        ``profit`` and ``cost``. Each value is the matching column name, or
        ``None`` when none of the candidate names exists.
    """
    from sqlalchemy import text

    if conn.dialect.name == "sqlite":
        # SQLite has no information_schema. PRAGMA table_info yields rows of
        # (cid, name, type, notnull, dflt_value, pk), so the name is index 1.
        rows = conn.execute(text("PRAGMA table_info('daily_sales_snapshot')")).fetchall()
        columns = {row[1] for row in rows}
    else:
        rows = conn.execute(text("""
            SELECT column_name
            FROM information_schema.columns
            WHERE table_name = 'daily_sales_snapshot'
        """)).fetchall()
        columns = {row[0] for row in rows}

    def first_available(candidates):
        return next((col for col in candidates if col in columns), None)

    return {
        "sku": first_available(["商品ID", "Product ID", "ID", "i_code", "Item Code"]),
        "date": first_available(["snapshot_date", "日期", "訂單日期", "交易日期", "Date"]),
        "revenue": first_available(["總業績", "銷售金額", "業績", "金額", "Amount", "Sales", "Total"]),
        "qty": first_available(["數量", "銷售數量", "銷量", "Qty", "Quantity"]),
        "profit": first_available(["毛利", "Profit", "利潤"]),
        "cost": first_available(["總成本", "成本", "Cost", "進價"]),
    }
def _quote_identifier(identifier: str) -> str:
return '"' + identifier.replace('"', '""') + '"'
def _fetch_candidates(conn, limit: int) -> List[Dict[str, Any]]:
    """Fetch candidate rows for product-pick scoring.

    The primary query joins, per ACTIVE product, the most recent
    ``price_records`` price with non-expired ``competitor_prices`` rows whose
    source is 'pchome' and whose ``match_score`` is at least 0.42. It adds
    30-day statistics from ``competitor_price_history`` and, when a usable
    ``daily_sales_snapshot`` table exists, recent sales aggregates.

    If the primary query raises, a simpler fallback query is run that returns
    the same column set with history and sales columns filled with 0 / NULL.

    Args:
        conn: SQLAlchemy connection used for all queries.
        limit: Requested pick count; the SQL ``LIMIT`` is
            ``max(limit * 6, 100)`` so that scoring has spare candidates.

    Returns:
        One plain ``dict`` per result row, ordered by ``match_score`` and then
        ``crawled_at``, both descending.
    """
    from sqlalchemy import text
    # Defaults used when no usable sales table is found: no join, and
    # zero-valued sales columns so the result shape stays the same.
    sales_join = ""
    sales_select = "0 AS sales_7d, 0 AS sales_prev_7d, 0 AS qty_7d, 0 AS profit_7d, 0 AS cost_7d"
    sales_cols = {}
    if _has_daily_sales_snapshot(conn):
        sales_cols = _daily_sales_columns(conn)
        # sku, date, revenue and qty are mandatory; profit and cost are optional.
        if not all([sales_cols.get("sku"), sales_cols.get("date"), sales_cols.get("revenue"), sales_cols.get("qty")]):
            sales_cols = {}
    if sales_cols:
        # Column names are discovered at runtime (and may be non-ASCII), so
        # they are interpolated as quoted identifiers rather than bound values.
        sku_col = _quote_identifier(sales_cols["sku"])
        date_col = _quote_identifier(sales_cols["date"])
        revenue_col = _quote_identifier(sales_cols["revenue"])
        qty_col = _quote_identifier(sales_cols["qty"])
        profit_col = _quote_identifier(sales_cols["profit"]) if sales_cols.get("profit") else None
        cost_col = _quote_identifier(sales_cols["cost"]) if sales_cols.get("cost") else None
        # Missing optional columns aggregate as a constant 0.
        profit_expr = f"COALESCE({profit_col}::numeric, 0)" if profit_col else "0"
        cost_expr = f"COALESCE({cost_col}::numeric, 0)" if cost_col else "0"
        # Per-SKU aggregates: the current window is date >= CURRENT_DATE - 7,
        # the previous window is CURRENT_DATE - 14 <= date < CURRENT_DATE - 7.
        sales_join = """
        LEFT JOIN (
            SELECT
                {sku_col} AS sku,
                SUM(CASE WHEN {date_col}::date >= CURRENT_DATE - 7
                    THEN COALESCE({revenue_col}::numeric, 0) ELSE 0 END) AS sales_7d,
                SUM(CASE WHEN {date_col}::date >= CURRENT_DATE - 14
                    AND {date_col}::date < CURRENT_DATE - 7
                    THEN COALESCE({revenue_col}::numeric, 0) ELSE 0 END) AS sales_prev_7d,
                SUM(CASE WHEN {date_col}::date >= CURRENT_DATE - 7
                    THEN COALESCE({qty_col}::numeric, 0) ELSE 0 END) AS qty_7d,
                SUM(CASE WHEN {date_col}::date >= CURRENT_DATE - 7
                    THEN {profit_expr} ELSE 0 END) AS profit_7d,
                SUM(CASE WHEN {date_col}::date >= CURRENT_DATE - 7
                    THEN {cost_expr} ELSE 0 END) AS cost_7d
            FROM daily_sales_snapshot
            GROUP BY {sku_col}
        ) sales ON sales.sku = lm.sku
        """.format(
            sku_col=sku_col,
            date_col=date_col,
            revenue_col=revenue_col,
            qty_col=qty_col,
            profit_expr=profit_expr,
            cost_expr=cost_expr,
        )
        # Products without any sales row still appear, with zeros.
        sales_select = """
            COALESCE(sales.sales_7d, 0) AS sales_7d,
            COALESCE(sales.sales_prev_7d, 0) AS sales_prev_7d,
            COALESCE(sales.qty_7d, 0) AS qty_7d,
            COALESCE(sales.profit_7d, 0) AS profit_7d,
            COALESCE(sales.cost_7d, 0) AS cost_7d
        """
    # Primary query: latest MOMO price per product (rn = 1), current PChome
    # competitor price, 30-day PChome history stats, optional sales aggregates.
    sql = text(f"""
        WITH latest_momo AS (
            SELECT
                p.id AS product_id,
                p.i_code AS sku,
                p.name,
                p.url,
                p.category,
                pr.price AS momo_price,
                ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC) AS rn
            FROM products p
            JOIN price_records pr ON pr.product_id = p.id
            WHERE p.status = 'ACTIVE'
        ),
        history_stats AS (
            SELECT
                sku,
                source,
                COUNT(*) AS history_points,
                MIN(price) AS min_pchome_price,
                MAX(price) AS max_pchome_price
            FROM competitor_price_history
            WHERE source = 'pchome'
              AND crawled_at >= CURRENT_TIMESTAMP - INTERVAL '30 days'
            GROUP BY sku, source
        )
        SELECT
            lm.product_id,
            lm.sku,
            lm.name,
            lm.url,
            lm.category,
            lm.momo_price,
            cp.price AS pchome_price,
            cp.original_price,
            cp.discount_pct,
            cp.competitor_product_id,
            cp.competitor_product_name,
            cp.match_score,
            cp.tags,
            cp.crawled_at,
            COALESCE(hs.history_points, 0) AS history_points,
            hs.min_pchome_price,
            hs.max_pchome_price,
            {sales_select}
        FROM latest_momo lm
        JOIN competitor_prices cp
          ON cp.sku = lm.sku
         AND cp.source = 'pchome'
         AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
         AND cp.match_score >= 0.42
        LEFT JOIN history_stats hs
          ON hs.sku = lm.sku
         AND hs.source = cp.source
        {sales_join}
        WHERE lm.rn = 1
        ORDER BY cp.match_score DESC, cp.crawled_at DESC
        LIMIT :limit
    """)
    try:
        return [dict(row) for row in conn.execute(sql, {"limit": max(limit * 6, 100)}).mappings().all()]
    except Exception as exc:
        logger.warning("[ProductPickAgent] sales-aware query failed, fallback without sales: %s", exc)
        # Best-effort rollback so the fallback query does not run inside a
        # failed transaction; errors from rollback itself are ignored.
        try:
            conn.rollback()
        except Exception:
            pass
    # Fallback query: same output columns, but no history CTE and no sales
    # join; those columns are emitted as constants.
    fallback = text("""
        WITH latest_momo AS (
            SELECT
                p.id AS product_id,
                p.i_code AS sku,
                p.name,
                p.url,
                p.category,
                pr.price AS momo_price,
                ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC) AS rn
            FROM products p
            JOIN price_records pr ON pr.product_id = p.id
            WHERE p.status = 'ACTIVE'
        )
        SELECT
            lm.product_id,
            lm.sku,
            lm.name,
            lm.url,
            lm.category,
            lm.momo_price,
            cp.price AS pchome_price,
            cp.original_price,
            cp.discount_pct,
            cp.competitor_product_id,
            cp.competitor_product_name,
            cp.match_score,
            cp.tags,
            cp.crawled_at,
            0 AS history_points,
            NULL AS min_pchome_price,
            NULL AS max_pchome_price,
            0 AS sales_7d,
            0 AS sales_prev_7d,
            0 AS qty_7d,
            0 AS profit_7d,
            0 AS cost_7d
        FROM latest_momo lm
        JOIN competitor_prices cp
          ON cp.sku = lm.sku
         AND cp.source = 'pchome'
         AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
         AND cp.match_score >= 0.42
        WHERE lm.rn = 1
        ORDER BY cp.match_score DESC, cp.crawled_at DESC
        LIMIT :limit
    """)
    return [dict(row) for row in conn.execute(fallback, {"limit": max(limit * 6, 100)}).mappings().all()]
def _score_candidate(row: Dict[str, Any]) -> Dict[str, Any]:
    """Score one candidate row and build its human-readable recommendation.

    Two sub-scores are computed from the row:

    * ``opportunity_score`` (capped at 100): price gap vs. MOMO (up to 40),
      recent sales / quantity / growth (up to 20), margin (up to 10),
      promotion tags (-4 to +8) and closeness to the 30-day PChome low
      (up to 6).
    * ``evidence_quality`` (capped at 100): match score (up to 30), history
      depth (up to 12), plus fixed bonuses for available sales, margin,
      competitor identity and crawl timestamp.

    ``pick_score`` is ``opportunity_score + 0.35 * evidence_quality`` capped
    at 100, and ``confidence`` blends score and evidence into [0.45, 0.98].

    Args:
        row: A candidate mapping as produced by the candidate query. Missing
            or non-numeric values are treated as 0 / absent.

    Returns:
        A new dict containing every key of ``row`` plus ``gap_pct``,
        ``sales_7d_delta``, ``pick_score``, ``confidence``,
        ``evidence_quality``, ``opportunity_score``, ``margin_rate``,
        ``confidence_band``, ``missing_evidence`` and ``reason``.
    """
    momo_price = _to_float(row.get("momo_price"))
    pchome_price = _to_float(row.get("pchome_price"))
    match_score = _to_float(row.get("match_score"))
    sales_7d = _to_float(row.get("sales_7d"))
    sales_prev_7d = _to_float(row.get("sales_prev_7d"))
    qty_7d = _to_float(row.get("qty_7d"))
    profit_7d = _to_float(row.get("profit_7d"))
    cost_7d = _to_float(row.get("cost_7d"))
    history_points = int(_to_float(row.get("history_points")))
    min_pchome_price = _to_float(row.get("min_pchome_price"))
    tags = _load_json_tags(row.get("tags"))

    # Positive gap means MOMO is more expensive than PChome.
    gap_pct = ((momo_price - pchome_price) / pchome_price * 100) if pchome_price else 0
    # Week-over-week growth is undefined without a previous-week baseline.
    sales_delta = ((sales_7d - sales_prev_7d) / sales_prev_7d * 100) if sales_prev_7d else None
    # Derive profit from cost when no explicit profit figure is available.
    if not profit_7d and cost_7d and sales_7d:
        profit_7d = sales_7d - cost_7d
    margin_rate = (profit_7d / sales_7d * 100) if sales_7d and profit_7d else None

    price_score = max(0, min(40, gap_pct * 1.9 + 8))
    match_component = max(0, min(30, match_score * 30))

    sales_component = 0
    if sales_7d > 0:
        sales_component += min(9, sales_7d / 30000 * 9)
    if qty_7d > 0:
        sales_component += min(4, qty_7d / 20 * 4)
    if sales_delta is not None and sales_delta > 0:
        sales_component += min(7, sales_delta / 40 * 7)

    margin_component = 0
    if margin_rate is not None:
        margin_component = max(0, min(10, margin_rate / 35 * 10))

    history_component = min(12, history_points * 2.4)

    promo_component = 0
    if any(tag in tags for tag in ["on_sale", "discount_10pct", "discount_20pct", "discount_30pct"]):
        promo_component += 5
    if "high_rating" in tags:
        promo_component += 3
    if "low_stock" in tags:
        promo_component -= 4

    # Reward a current PChome price within 3% / 8% of its 30-day minimum.
    price_position_component = 0
    if min_pchome_price and pchome_price:
        if pchome_price <= min_pchome_price * 1.03:
            price_position_component = 6
        elif pchome_price <= min_pchome_price * 1.08:
            price_position_component = 3

    opportunity_score = min(
        100,
        price_score + sales_component + margin_component + promo_component + price_position_component,
    )
    evidence_quality = min(
        100,
        match_component
        + history_component
        + (12 if sales_7d > 0 else 0)
        + (8 if margin_rate is not None else 0)
        + (8 if row.get("competitor_product_id") and row.get("competitor_product_name") else 0)
        + (6 if row.get("crawled_at") else 0),
    )
    score = round(min(100, opportunity_score + evidence_quality * 0.35), 1)
    confidence = round(max(0.45, min(0.98, (score * 0.65 + evidence_quality * 0.35) / 100)), 3)

    missing_evidence = []
    if history_points < 3:
        missing_evidence.append("PChome 價格歷史不足 3 筆")
    if sales_7d <= 0:
        missing_evidence.append("近 7 天銷售額缺口")
    if qty_7d <= 0:
        missing_evidence.append("近 7 天銷量缺口")
    if margin_rate is None:
        missing_evidence.append("毛利/成本缺口")
    if not row.get("competitor_product_id"):
        missing_evidence.append("PChome 商品 ID 缺口")

    if confidence >= 0.78 and evidence_quality >= 70:
        confidence_band = "high"
    elif confidence >= 0.65 and evidence_quality >= 55:
        confidence_band = "medium"
    else:
        confidence_band = "needs_evidence"

    if gap_pct >= 10:
        angle = "PChome 價格優勢明顯"
    elif gap_pct >= 3:
        angle = "PChome 小幅價格優勢"
    elif sales_7d > 0:
        angle = "近期有銷售動能,可搭配內容或檔期測試"
    else:
        angle = "比對信心足夠,可列入觀察型挑品"

    # The parts below are joined with explicit separators; without them the
    # reason collapses into one unreadable run-on string.
    reason_parts = [
        f"{angle}:PChome ${pchome_price:,.0f} vs MOMO ${momo_price:,.0f}",
        f"價差 {gap_pct:+.1f}%",
        f"比對信心 {match_score:.2f}",
    ]
    if sales_7d > 0:
        reason_parts.append(f"近 7 天銷售額 ${sales_7d:,.0f}")
    if margin_rate is not None:
        reason_parts.append(f"近 7 天毛利率 {margin_rate:.1f}%")
    if history_points:
        reason_parts.append(f"已有 {history_points} 筆 PChome 歷史快照")
    if price_position_component:
        reason_parts.append("目前 PChome 價格接近 30 天低點")
    if "high_rating" in tags:
        reason_parts.append("PChome 商品評價訊號佳")
    if "low_stock" in tags:
        reason_parts.append("PChome 庫存偏低,需留意供貨")
    if missing_evidence:
        # Only the first three gaps are shown to keep the reason short.
        reason_parts.append("待補證據:" + "、".join(missing_evidence[:3]))

    return {
        **row,
        "gap_pct": round(gap_pct, 1),
        "sales_7d_delta": round(sales_delta, 1) if sales_delta is not None else 0,
        "pick_score": score,
        "confidence": confidence,
        "evidence_quality": round(evidence_quality, 1),
        "opportunity_score": round(opportunity_score, 1),
        "margin_rate": round(margin_rate, 1) if margin_rate is not None else None,
        "confidence_band": confidence_band,
        "missing_evidence": missing_evidence,
        "reason": ";".join(reason_parts),
    }
def _write_pick(conn, pick: Dict[str, Any]) -> None:
    """Upsert one scored pick into ``ai_price_recommendations``.

    The row is keyed by ``sku``; on conflict the existing row is refreshed
    with the new values and its status is reset to 'pending'. A JSON
    "footprint" describing the agent run and the matched competitor product
    is stored in ``model_footprint``.
    """
    from sqlalchemy import text

    agent_meta = {
        "name": "PChomeProductPickAgent",
        "version": "v1",
        "generated_at": datetime.now().isoformat(timespec="seconds"),
        "inputs": ["products", "price_records", "competitor_prices", "competitor_price_history", "daily_sales_snapshot"],
        "score": pick["pick_score"],
        "opportunity_score": pick.get("opportunity_score"),
        "evidence_quality": pick.get("evidence_quality"),
        "margin_rate": pick.get("margin_rate"),
        "confidence_band": pick.get("confidence_band"),
        "missing_evidence": pick.get("missing_evidence", []),
    }
    competitor_meta = {
        "source": "pchome",
        "product_id": pick.get("competitor_product_id"),
        "product_name": pick.get("competitor_product_name"),
        "match_score": _to_float(pick.get("match_score")),
    }
    model_footprint = {"agent": agent_meta, "competitor": competitor_meta}

    upsert_sql = text("""
        INSERT INTO ai_price_recommendations
            (sku, name, reason, strategy, confidence,
             momo_price, pchome_price, gap_pct, sales_7d_delta,
             model_footprint, status, created_at, updated_at)
        VALUES
            (:sku, :name, :reason, 'product_pick', :confidence,
             :momo_price, :pchome_price, :gap_pct, :sales_7d_delta,
             :footprint, 'pending', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
        ON CONFLICT (sku) DO UPDATE
        SET reason = EXCLUDED.reason,
            strategy = 'product_pick',
            confidence = EXCLUDED.confidence,
            momo_price = EXCLUDED.momo_price,
            pchome_price = EXCLUDED.pchome_price,
            gap_pct = EXCLUDED.gap_pct,
            sales_7d_delta = EXCLUDED.sales_7d_delta,
            model_footprint = EXCLUDED.model_footprint,
            status = 'pending',
            updated_at = CURRENT_TIMESTAMP
    """)

    # Columns copied straight from the pick, followed by the serialized footprint.
    direct_fields = (
        "sku", "name", "reason", "confidence",
        "momo_price", "pchome_price", "gap_pct", "sales_7d_delta",
    )
    params = {field: pick[field] for field in direct_fields}
    params["footprint"] = json.dumps(model_footprint, ensure_ascii=False)
    conn.execute(upsert_sql, params)
def _supersede_old_picks(conn, current_skus: List[str]) -> None:
    """Mark pending 'product_pick' rows not in ``current_skus`` as superseded.

    With an empty ``current_skus`` every pending 'product_pick' row is
    superseded; otherwise only rows whose SKU is absent from the list are.
    """
    from sqlalchemy import bindparam, text

    if current_skus:
        # An expanding bind parameter renders the list as an IN (...) clause.
        supersede_stale = text("""
            UPDATE ai_price_recommendations
            SET status = 'superseded',
                updated_at = CURRENT_TIMESTAMP
            WHERE strategy = 'product_pick'
              AND status = 'pending'
              AND sku NOT IN :current_skus
        """).bindparams(bindparam("current_skus", expanding=True))
        kept_skus = [str(sku) for sku in current_skus]
        conn.execute(supersede_stale, {"current_skus": kept_skus})
        return

    supersede_all = text("""
        UPDATE ai_price_recommendations
        SET status = 'superseded',
            updated_at = CURRENT_TIMESTAMP
        WHERE strategy = 'product_pick'
          AND status = 'pending'
    """)
    conn.execute(supersede_all)
def generate_product_pick_list(engine, limit: int = 50) -> ProductPickResult:
    """Generate and persist the AI product-pick list.

    Candidates are fetched on a read connection, scored in memory, filtered
    (``pick_score >= 45`` and ``match_score >= 0.42``), de-duplicated by SKU,
    truncated to ``limit`` and then written in a single transaction. Pending
    picks from earlier runs that are not in the new list are superseded.

    Args:
        engine: SQLAlchemy engine used for both the read and the write.
        limit: Maximum number of picks to keep and persist.

    Returns:
        A ``ProductPickResult`` with the number of fetched candidate rows,
        the number of persisted picks, the picks themselves and the run
        timestamp.
    """
    generated_at = datetime.now().isoformat(timespec="seconds")
    with engine.connect() as conn:
        rows = _fetch_candidates(conn, limit)

    # Score before opening the write transaction so it stays as short as possible.
    scored = [_score_candidate(row) for row in rows if _to_float(row.get("pchome_price")) > 0]
    eligible = [
        pick for pick in scored
        if pick["pick_score"] >= 45 and (_to_float(pick.get("match_score")) >= 0.42)
    ]
    eligible.sort(key=lambda item: item["pick_score"], reverse=True)

    # The recommendation table holds one row per SKU (upsert on sku). If the
    # candidate query yields several rows for one SKU, keep only the
    # best-scored one; otherwise a weaker duplicate written later would
    # overwrite it and the reported `written` count would be inflated.
    picks = []
    seen_skus = set()
    for pick in eligible:
        sku = pick["sku"]
        if sku in seen_skus:
            continue
        seen_skus.add(sku)
        picks.append(pick)
    picks = picks[:limit]

    with engine.begin() as conn:
        for pick in picks:
            _write_pick(conn, pick)
        _supersede_old_picks(conn, [pick["sku"] for pick in picks])

    return ProductPickResult(
        candidates=len(rows),
        written=len(picks),
        picks=picks,
        generated_at=generated_at,
    )