V10.610 作戰清單優先讀外部報價層
All checks were successful
CD Pipeline / deploy (push) Successful in 1m11s

This commit is contained in:
OoO
2026-06-16 09:21:40 +08:00
parent a3ace326c8
commit b7cb807683
6 changed files with 217 additions and 6 deletions

View File

@@ -402,7 +402,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
# ==========================================
# 系統版本與路徑
# ==========================================
SYSTEM_VERSION = "V10.609"
SYSTEM_VERSION = "V10.610"
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = PUBLIC_URL # 用於模板顯示

View File

@@ -1,8 +1,8 @@
# PChome 業績成長自動化作戰系統 — AI 競價情報模組 Single Source of Truth
> **最後更新**: 2026-06-15 (台北時間)
> **狀態**: 🟢 四 AI Agent 自動化閉環已落地LLM 路由紅線升級為 Ollama-first 三主機級聯PChome 後台業績匯入韌性已補強產品定位正名為「PChome 業績成長自動化作戰系統」;外部市場來源正規化層、自動同步與 CSV 備援預檢已建立
> **適用版本**: V10.609
> **狀態**: 🟢 四 AI Agent 自動化閉環已落地LLM 路由紅線升級為 Ollama-first 三主機級聯PChome 後台業績匯入韌性已補強產品定位正名為「PChome 業績成長自動化作戰系統」;外部市場來源正規化層、自動同步、作戰清單優先讀取與 CSV 備援預檢已建立
> **適用版本**: V10.610
---
@@ -56,6 +56,7 @@
- V10.607 新增 `external_market_sources` / `external_offers` 正規化層與 `/api/ai/pchome-growth/source-contract` 只讀 API。MOMO 先以既有比價快取橋接進來源狀態;蝦皮與酷澎只保留 official API、provider API、manual CSV contract預設暫停且不進告警。
- V10.608 新增 `/api/ai/pchome-growth/external-offers/csv-dry-run` 與 AI 情報頁「外部報價預檢」。CSV 預檢只讀、不寫 DB逐列回報「可使用」「需人工確認」「不能使用」並支援中文表頭避免格式小錯造成整批匯入失敗。
- V10.609 明確把外部報價主路徑改為自動化:`run_external_offer_sync_task` 每 4 小時將已確認同款的既有比價快取同步進 `external_offers`。CSV 只保留為 API / crawler / provider 失敗時的備援預檢入口,不是日常營運主流程。
- V10.610 起 `/api/ai/pchome-growth/opportunities` 優先讀取 `external_offers` 的自動同步資料;只有新資料層缺資料時才 fallback 舊 `competitor_prices`。API stats 會回傳資料來源計數,方便確認作戰清單是否已走新資料層。
## 零之一、12 Agent 決策信封2026-05-24

View File

@@ -205,4 +205,10 @@
- 新增 `sync_legacy_momo_reference_offers()`,自動把已確認同款的既有比價快取同步進 `external_offers`
- 新增 `run_external_offer_sync_task`,每 4 小時自動執行;排在 competitor feeder 後,同步 MOMO 外部價格參考資料層。
- CSV 保留為 API / crawler / provider 故障時的救援預檢;日常目標是自動抓、自動同步、自動進作戰清單。
- 下一步:讓 PChome 成長作戰清單優先讀 `external_offers`,再 fallback 舊 `competitor_prices`,逐步把舊表降為 bridge。
## 13. 2026-06-16 V10.610 PChome 作戰清單優先讀新資料層
- `/api/ai/pchome-growth/opportunities` 已改成優先讀 `external_offers`,只有缺資料時才 fallback 舊 `competitor_prices`
- `external_offers.raw_payload_json` 會保留舊比價快取中的 PChome 公開價,讓新資料層仍可算出 MOMO / PChome 價差。
- API stats 新增 `external_data_source_counts`,可看到「自動同步資料層」與「舊比價快取」各有多少筆。
- 下一步:把其他比價報表與 AI 告警逐步改讀 `external_offers`,讓 `competitor_prices` 降為 bridge/cache。

View File

@@ -360,6 +360,7 @@ def _fetch_legacy_momo_reference_rows(conn, limit: int) -> list[dict[str, Any]]:
cp.sku AS momo_sku,
cp.competitor_product_id AS pchome_product_id,
cp.competitor_product_name AS pchome_product_name,
cp.price AS pchome_public_price,
cp.match_score,
cp.tags::text AS tags,
cp.crawled_at,
@@ -409,6 +410,7 @@ def _fetch_legacy_momo_reference_rows(conn, limit: int) -> list[dict[str, Any]]:
cp.sku AS momo_sku,
cp.competitor_product_id AS pchome_product_id,
cp.competitor_product_name AS pchome_product_name,
cp.price AS pchome_public_price,
cp.match_score,
cp.tags AS tags,
cp.crawled_at,
@@ -506,6 +508,8 @@ def _legacy_row_to_external_offer(row: dict[str, Any]) -> dict[str, Any]:
"quality_notes_json": json.dumps(notes, ensure_ascii=False),
"raw_payload_json": json.dumps({
"legacy_source": "competitor_prices",
"pchome_public_price": _to_float(row.get("pchome_public_price")),
"pchome_public_name": row.get("pchome_product_name"),
"match_score": str(row.get("match_score") or ""),
"tags": row.get("tags"),
"crawled_at": str(row.get("crawled_at") or ""),

View File

@@ -183,7 +183,118 @@ def _fetch_sales_rows(conn, limit: int) -> tuple[list[dict[str, Any]], str | Non
return mapped_rows, latest_date
def _fetch_external_price_map(conn, pchome_product_ids: list[str]) -> dict[str, dict[str, Any]]:
def _json_dict(value: Any) -> dict[str, Any]:
if not value:
return {}
if isinstance(value, dict):
return value
try:
parsed = json.loads(value)
return parsed if isinstance(parsed, dict) else {}
except Exception:
return {}
def _match_score_from_quality(value: Any) -> float:
score = _to_float(value)
if score > 1:
score = score / 100
return max(0, min(1, score))
def _fetch_normalized_external_price_map(conn, pchome_product_ids: list[str]) -> dict[str, dict[str, Any]]:
inspector = inspect(conn)
if not inspector.has_table("external_offers"):
return {}
ids = [str(item).strip() for item in pchome_product_ids if str(item or "").strip()]
if not ids:
return {}
if conn.dialect.name == "postgresql":
sql = """
WITH latest_offer AS (
SELECT DISTINCT ON (eo.pchome_product_id)
eo.pchome_product_id,
eo.source_product_id AS momo_sku,
eo.title AS momo_name,
eo.price AS momo_price,
eo.quality_score,
eo.quality_notes_json,
eo.raw_payload_json,
eo.observed_at,
eo.ingestion_method
FROM external_offers eo
WHERE eo.source_code = 'momo_reference'
AND eo.pchome_product_id IS NOT NULL
AND eo.pchome_product_id IN :ids
AND eo.price IS NOT NULL
AND eo.price > 0
AND COALESCE(eo.quality_score, 0) >= 76
AND eo.match_status IN ('verified', 'usable', 'reviewed', 'exact', 'confirmed')
AND eo.data_quality_status IN ('verified', 'usable', 'reviewed')
AND (eo.expires_at IS NULL OR eo.expires_at > CURRENT_TIMESTAMP)
ORDER BY eo.pchome_product_id, eo.observed_at DESC NULLS LAST, eo.id DESC
)
SELECT * FROM latest_offer
"""
else:
sql = """
WITH latest_offer AS (
SELECT
eo.pchome_product_id,
eo.source_product_id AS momo_sku,
eo.title AS momo_name,
eo.price AS momo_price,
eo.quality_score,
eo.quality_notes_json,
eo.raw_payload_json,
eo.observed_at,
eo.ingestion_method,
ROW_NUMBER() OVER (
PARTITION BY eo.pchome_product_id
ORDER BY eo.observed_at DESC, eo.id DESC
) AS rn
FROM external_offers eo
WHERE eo.source_code = 'momo_reference'
AND eo.pchome_product_id IS NOT NULL
AND eo.pchome_product_id IN :ids
AND eo.price IS NOT NULL
AND eo.price > 0
AND COALESCE(eo.quality_score, 0) >= 76
AND eo.match_status IN ('verified', 'usable', 'reviewed', 'exact', 'confirmed')
AND eo.data_quality_status IN ('verified', 'usable', 'reviewed')
)
SELECT *
FROM latest_offer
WHERE rn = 1
"""
stmt = text(sql).bindparams(bindparam("ids", expanding=True))
rows = conn.execute(stmt, {"ids": ids}).mappings().all()
result: dict[str, dict[str, Any]] = {}
for row in rows:
raw_payload = _json_dict(row.get("raw_payload_json"))
key = str(row.get("pchome_product_id") or "").strip()
if not key:
continue
result[key] = {
"pchome_product_id": key,
"pchome_public_name": raw_payload.get("pchome_public_name"),
"momo_sku": row.get("momo_sku"),
"momo_name": row.get("momo_name"),
"momo_price": row.get("momo_price"),
"pchome_price": raw_payload.get("pchome_public_price"),
"match_score": _match_score_from_quality(row.get("quality_score")),
"tags": ["external_offers", "verified"],
"crawled_at": row.get("observed_at"),
"momo_price_at": row.get("observed_at"),
"data_source": "external_offers",
"ingestion_method": row.get("ingestion_method"),
}
return result
def _fetch_legacy_external_price_map(conn, pchome_product_ids: list[str]) -> dict[str, dict[str, Any]]:
inspector = inspect(conn)
if not all(inspector.has_table(table) for table in {"competitor_prices", "products", "price_records"}):
return {}
@@ -281,10 +392,23 @@ def _fetch_external_price_map(conn, pchome_product_ids: list[str]) -> dict[str,
for row in rows:
key = str(row.get("pchome_product_id") or "").strip()
if key:
result[key] = dict(row)
item = dict(row)
item["data_source"] = "competitor_prices"
result[key] = item
return result
def _fetch_external_price_map(conn, pchome_product_ids: list[str]) -> dict[str, dict[str, Any]]:
normalized_map = _fetch_normalized_external_price_map(conn, pchome_product_ids)
missing_ids = [
str(item).strip()
for item in pchome_product_ids
if str(item or "").strip() and str(item).strip() not in normalized_map
]
legacy_map = _fetch_legacy_external_price_map(conn, missing_ids)
return {**legacy_map, **normalized_map}
def _score_opportunity(sales_row: dict[str, Any], external_row: dict[str, Any] | None) -> dict[str, Any]:
sales_7d = _to_float(sales_row.get("sales_7d"))
sales_prev_7d = _to_float(sales_row.get("sales_prev_7d"))
@@ -311,6 +435,10 @@ def _score_opportunity(sales_row: dict[str, Any], external_row: dict[str, Any] |
data_quality_score = 78 + min(12, _to_float(external_row.get("match_score")) * 12)
external_payload = {
"source": "MOMO",
"data_source": external_row.get("data_source") or "competitor_prices",
"data_source_label": "自動同步資料層"
if external_row.get("data_source") == "external_offers"
else "舊比價快取",
"momo_sku": external_row.get("momo_sku"),
"momo_name": external_row.get("momo_name"),
"momo_price": round(momo_price, 2) if momo_price else None,
@@ -449,9 +577,16 @@ def build_pchome_growth_opportunities(engine, limit: int = 20) -> dict[str, Any]
mapped_count = len(opportunities) - needs_mapping_count
mapping_rate = round(mapped_count / max(len(opportunities), 1) * 100, 1)
action_counts: dict[str, int] = {}
external_data_source_counts: dict[str, int] = {}
for item in opportunities:
label = item["recommended_action"]["label"]
action_counts[label] = action_counts.get(label, 0) + 1
external_price = item.get("external_price") or {}
data_source_label = external_price.get("data_source_label")
if data_source_label:
external_data_source_counts[data_source_label] = (
external_data_source_counts.get(data_source_label, 0) + 1
)
return {
"success": True,
@@ -466,6 +601,7 @@ def build_pchome_growth_opportunities(engine, limit: int = 20) -> dict[str, Any]
"needs_mapping_count": needs_mapping_count,
"total_sales_7d": round(sum(_to_float(item.get("sales_7d")) for item in opportunities), 2),
"action_counts": action_counts,
"external_data_source_counts": external_data_source_counts,
},
"opportunities": opportunities,
"message": "已整理今日 PChome 業績成長作戰清單。",

View File

@@ -47,6 +47,46 @@ def _seed_growth_tables(engine):
"""))
def _seed_growth_external_offers(engine):
with engine.begin() as conn:
conn.execute(text("""
CREATE TABLE external_offers (
id INTEGER PRIMARY KEY,
source_code TEXT,
platform_code TEXT,
source_product_id TEXT,
source_offer_key TEXT,
title TEXT,
price REAL,
observed_at TEXT,
expires_at TEXT,
ingestion_method TEXT,
pchome_product_id TEXT,
momo_sku TEXT,
match_status TEXT,
quality_score REAL,
data_quality_status TEXT,
quality_notes_json TEXT,
raw_payload_json TEXT
)
"""))
conn.execute(text("""
INSERT INTO external_offers (
id, source_code, platform_code, source_product_id, source_offer_key,
title, price, observed_at, expires_at, ingestion_method,
pchome_product_id, momo_sku, match_status, quality_score,
data_quality_status, quality_notes_json, raw_payload_json
)
VALUES (
1, 'momo_reference', 'momo', 'MOMO-NEW', 'momo_reference:MOMO-NEW:PCH-1',
'MOMO 新資料層商品', 870, '2026-06-14 12:00:00', NULL, 'legacy_competitor_cache',
'PCH-1', 'MOMO-NEW', 'verified', 92,
'verified', '["自動同步"]',
'{"pchome_public_price": 1000, "pchome_public_name": "PChome 公開商品"}'
)
"""))
def test_pchome_growth_opportunities_use_plain_language_and_pause_shopee_coupang():
from services.pchome_revenue_growth_service import build_pchome_growth_opportunities
@@ -71,9 +111,33 @@ def test_pchome_growth_opportunities_use_plain_language_and_pause_shopee_coupang
actions = {item["pchome_product_id"]: item["recommended_action"]["label"] for item in payload["opportunities"]}
assert actions["PCH-1"] == "檢查售價與活動"
assert actions["PCH-2"] == "先補商品對應"
pchome_1 = next(item for item in payload["opportunities"] if item["pchome_product_id"] == "PCH-1")
assert pchome_1["external_price"]["data_source"] == "competitor_prices"
assert payload["stats"]["external_data_source_counts"] == {"舊比價快取": 1}
assert all("identity" not in " ".join(item["reason_lines"]).lower() for item in payload["opportunities"])
def test_pchome_growth_prefers_external_offers_over_legacy_competitor_cache():
from services.pchome_revenue_growth_service import build_pchome_growth_opportunities
engine = create_engine("sqlite:///:memory:")
_seed_growth_tables(engine)
_seed_growth_external_offers(engine)
payload = build_pchome_growth_opportunities(engine, limit=5)
assert payload["success"] is True
pchome_1 = next(item for item in payload["opportunities"] if item["pchome_product_id"] == "PCH-1")
external_price = pchome_1["external_price"]
assert external_price["data_source"] == "external_offers"
assert external_price["data_source_label"] == "自動同步資料層"
assert external_price["momo_sku"] == "MOMO-NEW"
assert external_price["momo_price"] == 870
assert external_price["pchome_price"] == 1000
assert external_price["gap_pct"] == -13.0
assert payload["stats"]["external_data_source_counts"] == {"自動同步資料層": 1}
def test_ai_product_pick_sales_join_by_sku_disabled_by_default(monkeypatch):
from services.ai_product_pick_agent import _sales_join_by_momo_sku_enabled