補齊比價覆核隊列分頁
All checks were successful
CD Pipeline / deploy (push) Successful in 1m3s

This commit is contained in:
OoO
2026-05-20 01:35:10 +08:00
parent b2f9ccf4f0
commit c9aa2efd8b
4 changed files with 244 additions and 32 deletions

View File

@@ -56,7 +56,7 @@ SQL漏斗(~300筆)
- 比對覆蓋率補強入口:`POST /api/ai/pchome-match/backfill`,優先補抓仍無有效 PChome 配對的高價 ACTIVE 商品,完成後自動重算 AI 挑品清單。
- 排程閉環:`run_pchome_match_backfill_task` 每日 10:30 執行,補抓 PChome 待比對商品、寫入歷史價格,再重算 `strategy='product_pick'` 清單。
- PChome / MOMO 競價摘要出口 `services/competitor_intel_repository.py` 使用 30 分鐘共享快取(`COMPETITOR_INTEL_CACHE_TTL_SECONDS` 可調),避免 `/growth_analysis``/daily_sales`、PPT/AI 報表每次請求重跑昂貴覆蓋率與價差趨勢查詢;`run_competitor_price_feeder_task` 與 PChome backfill 完成後會主動清除快取。快取只包摘要輸出,不改 matcher 的高信心門檻與 identity_v2 準確性規則。
- 商品看板第一屏:`/` 的 V2 看板直接以 `products``price_records``competitor_prices``competitor_match_attempts``ai_price_recommendations` 顯示比對覆蓋率、PChome 優勢、MOMO 威脅、AI 挑品、待比對優先清單與 PChome 覆核隊列;`filter=ai_picks` 可查看 50 品 AI 挑品列表,`filter=pchome_review` 可直接查看需人工處理的比價覆核 SKU列內顯示候選 PChome 商品、候選價、match score、單位價換算摘要與人工動作。商品看板深度快取同時寫入 `data/dashboard_full_cache.pkl`,供多個 Gunicorn worker 共用,避免部署後各 worker 重複重建 7,000+ 商品統計造成開頁變慢;所有資料異動與 AI 挑品重算都透過 `clear_dashboard_cache()` 同步清除記憶體與共享快取,手動重算 API 會立即預熱商品看板快取,避免第一位使用者承擔重建成本。
- 商品看板第一屏:`/` 的 V2 看板直接以 `products``price_records``competitor_prices``competitor_match_attempts``ai_price_recommendations` 顯示比對覆蓋率、PChome 優勢、MOMO 威脅、AI 挑品、待比對優先清單與 PChome 覆核隊列;`filter=ai_picks` 可查看 50 品 AI 挑品列表,`filter=pchome_review` 可直接查看需人工處理的比價覆核 SKU並以 DB 分頁支援 search/category 後的完整隊列,不得只截前 50 筆。列內顯示候選 PChome 商品、候選價、match score、單位價換算摘要與人工動作。商品看板深度快取同時寫入 `data/dashboard_full_cache.pkl`,供多個 Gunicorn worker 共用,避免部署後各 worker 重複重建 7,000+ 商品統計造成開頁變慢;所有資料異動與 AI 挑品重算都透過 `clear_dashboard_cache()` 同步清除記憶體與共享快取,手動重算 API 會立即預熱商品看板快取,避免第一位使用者承擔重建成本。
| 角色 | 模型 | 主機 | 成本 | 每日限額 |
|------|------|------|------|---------|

View File

@@ -509,6 +509,32 @@ def _merge_competitor_review_context(overview, review_context):
return overview
def _normalize_dashboard_category_filter(category_filter):
if not category_filter or category_filter == 'all':
return ''
if "(" in category_filter and "筆)" in category_filter:
return category_filter.rsplit(" (", 1)[0]
return category_filter
def _load_competitor_review_page(session, page=1, per_page=50, search_query='', category_filter='all'):
try:
from services.competitor_intel_repository import fetch_competitor_review_queue_page
engine = _get_session_engine(session)
if not engine:
return {'items': [], 'total': 0, 'page': page, 'per_page': per_page}
return fetch_competitor_review_queue_page(
engine,
page=page,
per_page=per_page,
search_query=search_query,
category=_normalize_dashboard_category_filter(category_filter),
)
except Exception as exc:
sys_log.warning(f"[Dashboard] PChome 覆核隊列分頁讀取略過: {exc}")
return {'items': [], 'total': 0, 'page': page, 'per_page': per_page}
def _parse_agent_footprint(value):
if not value:
return {}
@@ -1712,12 +1738,20 @@ def index():
review_queue = []
review_queue_map = {}
review_queue_order = {}
review_queue_total = 0
if filter_type == 'ai_picks':
ai_pick_skus, ai_pick_map = _load_ai_pick_selection(session, PRODUCT_PICK_LIST_LIMIT)
ai_pick_summary = _summarize_ai_pick_selection(ai_pick_map)
elif filter_type == 'pchome_review':
review_context = _load_competitor_review_context(session, limit=50)
review_queue = review_context.get('review_queue') or []
review_page = _load_competitor_review_page(
session,
page=page,
per_page=per_page,
search_query=search_query,
category_filter=category_filter,
)
review_queue = review_page.get('items') or []
review_queue_total = int(review_page.get('total') or len(review_queue))
review_queue_map = {
str(row.get('sku') or ''): row
for row in review_queue
@@ -1776,9 +1810,7 @@ def index():
})
else:
if category_filter != 'all':
real_category = category_filter
if "(" in category_filter and "筆)" in category_filter:
real_category = category_filter.rsplit(" (", 1)[0]
real_category = _normalize_dashboard_category_filter(category_filter)
filtered_items = [item for item in base_items if item['record'].product.category == real_category]
else:
filtered_items = base_items
@@ -1815,11 +1847,16 @@ def index():
sorted_items = sorted(filtered_items, key=get_sort_key, reverse=reverse)
# 分頁
total_items = len(sorted_items)
total_pages = math.ceil(total_items / per_page)
if filter_type == 'pchome_review':
total_items = review_queue_total
total_pages = math.ceil(total_items / per_page)
paged_items = sorted_items
else:
total_items = len(sorted_items)
total_pages = math.ceil(total_items / per_page)
start_idx = (page - 1) * per_page
paged_items = sorted_items[start_idx: start_idx + per_page]
start_idx = (page - 1) * per_page
paged_items = sorted_items[start_idx: start_idx + per_page]
# 為前端準備安全的 created_at 屬性
for item in paged_items:

View File

@@ -102,6 +102,28 @@ def _build_unit_comparison_for_attempt(row: dict[str, Any]) -> Optional[dict[str
return {"comparable": False, "reason": "build_error"}
def _format_competitor_review_item(row: dict[str, Any]) -> dict[str, Any]:
item = dict(row)
unit_comparison = _build_unit_comparison_for_attempt(item)
return {
"sku": str(item.get("sku") or ""),
"name": item.get("name") or "",
"category": item.get("category") or "",
"momo_price": _num(item.get("momo_price")),
"attempt_status": item.get("attempt_status") or "",
"status_label": _attempt_status_label(item.get("attempt_status")),
"action_label": _attempt_action_label(item.get("attempt_status")),
"candidate_count": int(item.get("candidate_count") or 0),
"candidate_pc_id": item.get("best_competitor_product_id"),
"candidate_pc_name": item.get("best_competitor_product_name") or "",
"candidate_pc_price": _num(item.get("best_competitor_price")),
"best_match_score": _num(item.get("best_match_score")),
"match_diagnostic": item.get("error_message") or "",
"attempted_at": _date_label(item.get("attempted_at")),
"unit_comparison": unit_comparison,
}
def clear_competitor_intel_cache() -> None:
"""Clear cached PChome/MOMO intelligence after crawler/import updates."""
with _CACHE_LOCK:
@@ -478,6 +500,177 @@ def fetch_competitor_review_queue(engine, limit: int = 12) -> list[dict]:
)
def fetch_competitor_review_queue_page(
engine,
page: int = 1,
per_page: int = 50,
search_query: str = "",
category: str = "",
) -> dict:
"""Paginated PChome review queue for operator-facing Dashboard pages."""
page = max(1, int(page or 1))
per_page = max(1, min(int(per_page or 50), 100))
search_query = (search_query or "").strip()
category = (category or "").strip()
cache_key = (
"review_queue_page:v1:"
f"page={page}:per={per_page}:q={search_query.lower()}:cat={category}:"
f"floor={PCHOME_MATCH_SCORE_FLOOR}"
)
return _cached_payload(
cache_key,
lambda: _fetch_competitor_review_queue_page_uncached(
engine,
page=page,
per_page=per_page,
search_query=search_query,
category=category,
),
ttl_seconds=min(COMPETITOR_INTEL_CACHE_TTL_SECONDS, 300),
)
def _review_queue_cte_and_filter(search_query: str = "", category: str = "") -> tuple[str, dict[str, Any]]:
params: dict[str, Any] = {}
filters = [
"lm.rn = 1",
"vc.sku IS NULL",
"""la.attempt_status IN (
'unit_comparable',
'refresh_unit_comparable',
'identity_veto',
'low_score',
'expired_match',
'refresh_no_result',
'no_result'
)""",
]
if search_query:
params["search_like"] = f"%{search_query.lower()}%"
filters.append("(LOWER(lm.name) LIKE :search_like OR LOWER(lm.sku) LIKE :search_like)")
if category:
params["category"] = category
filters.append("lm.category = :category")
where_sql = "\n AND ".join(filters)
cte = f"""
WITH latest_momo AS (
SELECT
p.id AS product_id,
p.i_code AS sku,
p.name,
p.category,
pr.price AS momo_price,
ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC, pr.id DESC) AS rn
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.status = 'ACTIVE'
),
valid_competitor AS (
SELECT DISTINCT ON (cp.sku)
cp.sku
FROM competitor_prices cp
WHERE cp.source = 'pchome'
AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
AND cp.price IS NOT NULL
AND cp.price > 0
AND COALESCE(cp.match_score, 0) >= {PCHOME_MATCH_SCORE_FLOOR}
AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
ORDER BY cp.sku, cp.crawled_at DESC NULLS LAST
),
latest_attempt AS (
SELECT DISTINCT ON (cma.sku)
cma.sku,
cma.attempt_status,
cma.candidate_count,
cma.best_competitor_product_id,
cma.best_competitor_product_name,
cma.best_competitor_price,
cma.best_match_score,
cma.error_message,
cma.attempted_at
FROM competitor_match_attempts cma
WHERE cma.source = 'pchome'
ORDER BY cma.sku, cma.attempted_at DESC NULLS LAST
),
review_rows AS (
SELECT
lm.sku,
lm.name,
lm.category,
lm.momo_price,
la.attempt_status,
la.candidate_count,
la.best_competitor_product_id,
la.best_competitor_product_name,
la.best_competitor_price,
la.best_match_score,
la.error_message,
la.attempted_at,
CASE
WHEN la.attempt_status IN ('unit_comparable', 'refresh_unit_comparable') THEN 0
WHEN la.attempt_status = 'identity_veto' THEN 1
WHEN la.attempt_status = 'low_score' THEN 2
WHEN la.attempt_status = 'expired_match' THEN 3
ELSE 4
END AS priority_rank
FROM latest_momo lm
JOIN latest_attempt la ON la.sku = lm.sku
LEFT JOIN valid_competitor vc ON vc.sku = lm.sku
WHERE {where_sql}
)
"""
return cte, params
def _fetch_competitor_review_queue_page_uncached(
engine,
page: int = 1,
per_page: int = 50,
search_query: str = "",
category: str = "",
) -> dict:
inspector = inspect(engine)
if not (
inspector.has_table("products")
and inspector.has_table("price_records")
and inspector.has_table("competitor_prices")
and inspector.has_table("competitor_match_attempts")
):
return {"items": [], "total": 0, "page": max(1, int(page or 1)), "per_page": per_page}
page = max(1, int(page or 1))
per_page = max(1, min(int(per_page or 50), 100))
cte, params = _review_queue_cte_and_filter(search_query=search_query, category=category)
page_params = {
**params,
"limit": per_page,
"offset": (page - 1) * per_page,
}
count_sql = text(cte + " SELECT COUNT(*) AS total FROM review_rows")
page_sql = text(cte + """
SELECT *
FROM review_rows
ORDER BY
priority_rank ASC,
momo_price DESC NULLS LAST,
best_match_score DESC NULLS LAST,
attempted_at DESC NULLS LAST
LIMIT :limit OFFSET :offset
""")
with engine.connect() as conn:
total = int(conn.execute(count_sql, params).scalar() or 0)
rows = conn.execute(page_sql, page_params).mappings().all()
return {
"items": [_format_competitor_review_item(dict(row)) for row in rows],
"total": total,
"page": page,
"per_page": per_page,
}
def _fetch_competitor_review_queue_uncached(engine, limit: int = 12) -> list[dict]:
inspector = inspect(engine)
if not (
@@ -572,28 +765,7 @@ def _fetch_competitor_review_queue_uncached(engine, limit: int = 12) -> list[dic
with engine.connect() as conn:
rows = conn.execute(sql, {"limit": limit}).mappings().all()
queue = []
for row in rows:
item = dict(row)
unit_comparison = _build_unit_comparison_for_attempt(item)
queue.append({
"sku": str(item.get("sku") or ""),
"name": item.get("name") or "",
"category": item.get("category") or "",
"momo_price": _num(item.get("momo_price")),
"attempt_status": item.get("attempt_status") or "",
"status_label": _attempt_status_label(item.get("attempt_status")),
"action_label": _attempt_action_label(item.get("attempt_status")),
"candidate_count": int(item.get("candidate_count") or 0),
"candidate_pc_id": item.get("best_competitor_product_id"),
"candidate_pc_name": item.get("best_competitor_product_name") or "",
"candidate_pc_price": _num(item.get("best_competitor_price")),
"best_match_score": _num(item.get("best_match_score")),
"match_diagnostic": item.get("error_message") or "",
"attempted_at": _date_label(item.get("attempted_at")),
"unit_comparison": unit_comparison,
})
return queue
return [_format_competitor_review_item(dict(row)) for row in rows]
def fetch_competitor_comparison_results(

View File

@@ -131,6 +131,8 @@ def test_dashboard_v2_is_production_default_and_uses_real_dashboard_data():
assert "force_rebuild=False" in route_source
assert "def _load_competitor_decision_overview(session, latest_items=None)" in route_source
assert "fetch_competitor_review_queue" in route_source
assert "fetch_competitor_review_queue_page" in route_source
assert "_load_competitor_review_page(" in route_source
assert "_load_competitor_decision_overview(session, unique_items)" in route_source
assert "item_map = {}" in route_source
assert "competitor_map = _load_pchome_competitor_map(session, item_map.keys())" in route_source
@@ -139,6 +141,7 @@ def test_dashboard_v2_is_production_default_and_uses_real_dashboard_data():
assert "review_queue_count" in route_source
assert "unit_comparable_count" in route_source
assert "filter_type == 'pchome_review'" in route_source
assert "total_items = review_queue_total" in route_source
assert "MockRecord" not in route_source
assert "{% for item in items %}" in dashboard
assert "比價監控總覽" in dashboard