Files
ewoooc/routes/dashboard_routes.py
OoO 89407b054f
All checks were successful
CD Pipeline / deploy (push) Successful in 1m10s
feat: add pchome growth command center
2026-06-18 15:14:38 +08:00

2934 lines
123 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
商品看板路由模組
包含:首頁儀表板、商品列表、統計數據
"""
import os
import json
import math
import re
import time
import hashlib
import pickle
import threading
from datetime import datetime, timezone, timedelta
from types import SimpleNamespace
from flask import Blueprint, request, render_template, jsonify
from sqlalchemy import func, and_, text, bindparam
from sqlalchemy.orm import joinedload
from auth import login_required, get_current_user
from config import BASE_DIR, SYSTEM_VERSION, public_url
from database.manager import DatabaseManager
from database.models import Product, PriceRecord
from services.logger_manager import SystemLogger
from services.cache_manager import (
_DASHBOARD_DATA_CACHE,
_DASHBOARD_CACHE_TTL,
_DASHBOARD_SHARED_CACHE_FILE,
_DASHBOARD_STALE_CACHE_FILE,
)
from utils.momo_url_utils import build_momo_product_url, normalize_momo_product_url
# 時區設定
TAIPEI_TZ = timezone(timedelta(hours=8))
# Logger
sys_log = SystemLogger("DashboardRoutes").get_logger()
# Blueprint 定義
dashboard_bp = Blueprint('dashboard', __name__)
PRODUCT_PICK_LIST_LIMIT = 50
PCHOME_MATCH_SCORE_FLOOR = 0.76
REVIEW_STATUS_OPTIONS = [
{
'key': 'all',
'label': '全部待處理',
'statuses': (
'unit_comparable',
'refresh_unit_comparable',
'identity_veto',
'low_score',
'refresh_low_score',
'recoverable_low_score',
'true_low_confidence',
'protected_existing_match',
'expired_match',
'refresh_no_result',
'no_result',
'rescore_accepted_current',
),
},
{'key': 'rescore_accepted', 'label': '重算覆核', 'statuses': ('rescore_accepted_current',)},
{
'key': 'unit_comparable',
'label': '需單位價',
'statuses': ('unit_comparable', 'refresh_unit_comparable'),
},
{
'key': 'catalog_comparable',
'label': '型錄可比',
'statuses': ('true_low_confidence', 'catalog_variant_review', 'catalog_unit_review', 'catalog_identity_review'),
},
{'key': 'catalog_variant_review', 'label': '選項待核', 'statuses': ('true_low_confidence', 'catalog_variant_review')},
{'key': 'catalog_unit_review', 'label': '入數待核', 'statuses': ('true_low_confidence', 'catalog_unit_review')},
{'key': 'catalog_identity_review', 'label': '身份待核', 'statuses': ('true_low_confidence', 'catalog_identity_review')},
{'key': 'identity_veto', 'label': '已排除', 'statuses': ('identity_veto',)},
{'key': 'recoverable_low_score', 'label': '近門檻可救', 'statuses': ('recoverable_low_score',)},
{'key': 'true_low_confidence', 'label': '證據不足', 'statuses': ('true_low_confidence',)},
{'key': 'legacy_low_score', 'label': '低信心舊候選', 'statuses': ('low_score', 'refresh_low_score')},
{'key': 'protected_existing_match', 'label': '既有保護', 'statuses': ('protected_existing_match',)},
{'key': 'expired_match', 'label': '價格過期', 'statuses': ('expired_match',)},
{'key': 'no_result', 'label': '找不到同款', 'statuses': ('no_result', 'refresh_no_result')},
{
'key': 'manual_closed',
'label': '人工閉環',
'statuses': ('manual_rejected', 'manual_unit_price_required', 'manual_needs_research'),
},
]
def _build_pchome_product_url(product_id):
if not product_id:
return None
return f"https://24h.pchome.com.tw/prod/{str(product_id).strip()}"
def _build_momo_product_url(i_code):
return build_momo_product_url(i_code)
def _to_float(value):
if value is None:
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def _diagnostic_match_rejection_label(diagnostic_text, score_text, *, blocked=True):
diagnostic_text = diagnostic_text or ''
suffix = '已排除,不進入價格比較' if blocked else '暫不採用,等待補搜尋或人工補證據'
score_match = re.search(r"(\d+)%", score_text or "")
score_pct = int(score_match.group(1)) if score_match else None
if 'unit_comparable' in diagnostic_text:
return '需單位價比較', f'{score_text},同核心商品但販售組合不同,需轉換每 ml / 每入後再判讀'
if 'refill_pack_conflict' in diagnostic_text:
return '補充包不相容', f'{score_text},補充瓶/補充包與一般正裝不同,{suffix}'
if any(token in diagnostic_text for token in ('bundle_offer_conflict', 'multi_component_conflict')):
return '組合規格不相容', f'{score_text},組合包/多件組與單品不同,{suffix}'
if 'brand_conflict' in diagnostic_text:
return '品牌不符已排除', f'{score_text},品牌不一致,{suffix}'
if any(token in diagnostic_text for token in ('volume_conflict', 'weight_conflict', 'count_conflict', 'component_count_conflict')):
return '規格不符已排除', f'{score_text},容量/件數不一致,{suffix}'
if 'type_conflict' in diagnostic_text:
return '品類不符已排除', f'{score_text},品類不一致,{suffix}'
if any(token in diagnostic_text for token in ('product_line_conflict', 'model_line_conflict')):
return '系列不符已排除', f'{score_text},商品線/型號不一致,{suffix}'
if 'makeup_finish_conflict' in diagnostic_text:
return '妝效質地不同', f'{score_text},同品牌同系列但妝效質地不同,{suffix}'
if 'nail_tool_function_conflict' in diagnostic_text:
return '工具功能不同', f'{score_text},同品牌但指甲工具功能不同,{suffix}'
if 'schick_razor_line_conflict' in diagnostic_text:
return '除毛刀品線不同', f'{score_text},同品牌但除毛刀子系列不同,{suffix}'
if any(token in diagnostic_text for token in (
'lancome_line_conflict',
'dr_hsieh_labsmart_line_conflict',
'hoi_candle_line_conflict',
'sun_protection_line_conflict',
)):
return '商品線不符已排除', f'{score_text},同品牌但商品線或用途不同,{suffix}'
if any(token in diagnostic_text for token in (
'saugella_variant_conflict',
'lactacyd_variant_conflict',
)):
return '款式版本不符', f'{score_text},同品牌同容量但清潔/保養款式不同,{suffix}'
if 'aroma_scent_variant_conflict' in diagnostic_text:
return '香味款式不符', f'{score_text},香氛商品香味或款式不同,{suffix}'
if 'variant_selection_review' in diagnostic_text:
return '多款任選待確認', f'{score_text},一側是多款任選或缺少明確色號,需人工確認'
if not blocked and score_pct is not None and score_pct < 60:
return '未找到可信同款', f'{score_text},最佳候選相似度不足,需補搜尋詞或確認 PChome 無同款'
return '身份不符已排除' if blocked else '低信心待補強', f'{score_text}{suffix}'
def _build_pchome_match_status(attempt=None, ineligible=None):
if attempt:
status = attempt.get('attempt_status') or 'unknown'
if status == 'rescore_accepted_current':
score = _to_float(attempt.get('best_match_score'))
score_text = f"目前信心 {int(score * 100)}%" if score is not None else '目前信心待補'
return {
'label': '重算待人工覆核',
'tone': 'watch',
'score': score,
'summary': f'{score_text},最新版 matcher 已通過重算覆核門檻;仍需人工確認身份後才可寫入正式 PChome 價差',
'detail': score_text,
}
if status == 'manual_accepted':
score = _to_float(attempt.get('best_match_score'))
score_text = f"人工採用 {round(score * 100)}%" if score is not None else "人工已採用同款"
return {
'label': '人工已採用',
'tone': 'win',
'summary': '人工已確認這筆 PChome 候選為同款,下一輪 feeder 會以 manual_accept 標籤維持正式配對',
'detail': score_text,
}
if status == 'manual_rejected':
score = _to_float(attempt.get('best_match_score'))
score_text = f"否決候選 {round(score * 100)}%" if score is not None else "人工已否決候選"
return {
'label': '人工已否決',
'tone': 'neutral',
'blocks_price_gap': True,
'summary': '人工已否決這筆 PChome 候選;後續 feeder 命中同一候選時會跳過正式價差寫入',
'detail': score_text,
}
if status == 'manual_unit_price_required':
score = _to_float(attempt.get('best_match_score'))
score_text = f"候選 {round(score * 100)}%" if score is not None else "人工標記單位價"
unit_comparison = attempt.get('unit_comparison') or {}
unit_insight = attempt.get('unit_price_insight') or {}
summary = '人工已判定總價不可直接比較,需以每 ml / 每 g / 每入單位價與檔期條件判讀'
if unit_comparison.get('comparable') and unit_insight.get('summary'):
summary = f"{unit_insight.get('summary')};仍不寫入正式總價差"
elif unit_comparison.get('summary'):
summary = f"已換算單位價:{unit_comparison.get('summary')};仍不寫入正式總價差"
return {
'label': '人工標記單位價',
'tone': 'watch',
'blocks_price_gap': True,
'summary': summary,
'detail': score_text,
}
if status == 'manual_needs_research':
score = _to_float(attempt.get('best_match_score'))
score_text = f"原候選 {round(score * 100)}%" if score is not None else "需補搜尋"
return {
'label': '人工要求補搜尋',
'tone': 'neutral',
'blocks_price_gap': True,
'summary': '人工要求補搜尋詞或重新抓取,不會把目前候選寫入正式 PChome 價差',
'detail': score_text,
}
if status == 'matched':
score = _to_float(attempt.get('best_match_score'))
score_text = f"最佳候選 {round(score * 100)}%" if score is not None else "已完成身份比對"
return {
'label': '已配對待刷新',
'tone': 'watch',
'summary': '曾通過 identity_v2但目前沒有有效價格快取等待下一輪刷新',
'detail': score_text,
}
if status == 'expired_match':
score = _to_float(attempt.get('best_match_score'))
score_text = f"身份分數 {round(score * 100)}%" if score is not None else "已完成身份比對"
return {
'label': '價格過期待刷新',
'tone': 'watch',
'summary': '同款身份已確認,但 PChome 價格快取過期,不顯示舊價避免誤判',
'detail': score_text,
}
if status == 'identity_veto':
score = _to_float(attempt.get('best_match_score'))
score_text = f"最佳候選 {round(score * 100)}%" if score is not None else "已拒絕候選"
label, summary = _diagnostic_match_rejection_label(
attempt.get('error_message'),
score_text,
blocked=True,
)
return {
'label': label,
'tone': 'neutral',
'summary': summary,
'detail': score_text,
}
if status in {'unit_comparable', 'refresh_unit_comparable'}:
score = _to_float(attempt.get('best_match_score'))
score_text = f"最佳候選 {round(score * 100)}%" if score is not None else "已找到同核心候選"
unit_comparison = attempt.get('unit_comparison') or {}
if unit_comparison.get('comparable'):
return {
'label': '需單位價比較',
'tone': 'watch',
'summary': f"已換算單位價:{unit_comparison.get('summary')};仍需人工確認檔期與贈品條件",
'detail': score_text,
}
return {
'label': '需單位價比較',
'tone': 'watch',
'summary': '候選同核心商品,但販售組合/買送不同;不可直接比總價,需用單位價或人工覆核',
'detail': score_text,
}
if status in {'catalog_variant_review', 'catalog_unit_review', 'catalog_identity_review'}:
score = _to_float(attempt.get('best_match_score'))
score_text = f"候選信心 {round(score * 100)}%" if score is not None else "候選信心待補"
candidate_count = int(attempt.get('candidate_count') or 0)
catalog_labels = {
'catalog_variant_review': (
'選項 / 色號待核',
'同品線候選已找到,但色號、香味、款式或任選組合仍需人工確認,避免錯配成正式價差',
),
'catalog_unit_review': (
'單位價 / 入數待核',
'同核心商品可比,但容量、入數或買送組合不同;需先換算單位價與檔期條件',
),
'catalog_identity_review': (
'身份採用待核',
'候選具備高信心身份證據,需人工最後確認後才寫入正式 PChome 價差',
),
}
label, summary = catalog_labels[status]
return {
'label': label,
'tone': 'watch',
'blocks_price_gap': True,
'summary': summary,
'detail': f'{score_text} / {candidate_count} 筆候選' if candidate_count else score_text,
}
if ineligible:
reason = ineligible.get('reason') or 'not_eligible'
score = _to_float(ineligible.get('match_score'))
score_text = f"match {round(score * 100)}%" if score is not None else None
if reason == 'expired_match':
return {
'label': '價格過期待刷新',
'tone': 'watch',
'summary': '已有高信心同款配對,但 PChome 價格快取過期,等待補抓刷新',
'detail': score_text,
}
if reason == 'legacy_without_identity_v2':
return {
'label': '舊版配對待重驗',
'tone': 'neutral',
'summary': '舊版 PChome 配對尚未通過 identity_v2不進入正式決策',
'detail': score_text,
}
if reason == 'below_score_floor':
return {
'label': '低分配對待補強',
'tone': 'neutral',
'summary': '已有候選但低於高信心門檻,避免錯配所以暫不採用',
'detail': score_text,
}
if reason == 'invalid_price':
return {
'label': '價格無效待刷新',
'tone': 'watch',
'summary': 'PChome 配對缺少有效價格,等待下一輪補抓',
'detail': None,
}
if not attempt:
return {
'label': '尚未搜尋',
'tone': 'neutral',
'summary': '尚未進入 PChome 補抓佇列',
'detail': None,
}
status = attempt.get('attempt_status') or 'unknown'
score = _to_float(attempt.get('best_match_score'))
candidate_count = int(attempt.get('candidate_count') or 0)
score_text = f"最佳候選 {round(score * 100)}%" if score is not None else "尚無候選分數"
if status in {'low_score', 'refresh_low_score', 'recoverable_low_score', 'true_low_confidence'}:
diagnostic_text = attempt.get('error_message') or ''
label, summary = _diagnostic_match_rejection_label(
diagnostic_text,
score_text,
blocked=False,
)
if status == 'recoverable_low_score':
label = '近門檻可回收'
summary = '同品線證據已足夠,但分數仍略低於正式採用門檻;優先排入回刷或人工採用'
elif status == 'true_low_confidence':
label = '證據不足'
summary = '目前候選仍缺乏足夠身份證據,先保守不採用'
elif status in {'low_score', 'refresh_low_score'} and label != '未找到可信同款':
label = '低信心舊候選'
summary = '這批是舊分數或刷新後仍低分的候選,應先補搜尋詞或重算,不直接寫入正式價差'
return {
'label': label,
'tone': 'neutral',
'summary': summary,
'detail': f'{candidate_count} 筆候選',
}
if status in {'needs_review', 'refresh_needs_review', 'protected_existing_match'}:
return {
'label': '既有配對保護',
'tone': 'neutral',
'summary': '新候選合理,但正式環境已存在更強既有配對,需人工確認後才覆蓋',
'detail': f'{score_text} / {candidate_count} 筆候選',
}
if status in {'no_result', 'no_match', 'refresh_no_match'}:
return {
'label': '找不到同款',
'tone': 'neutral',
'summary': 'PChome 搜尋無可信候選,需補關鍵字或人工覆核',
'detail': f'{candidate_count} 筆候選',
}
if status in {'error', 'refresh_error'}:
return {
'label': '抓取異常',
'tone': 'risk',
'summary': 'PChome 比對流程發生異常,請補抓或查看後台嘗試紀錄',
'detail': f'{candidate_count} 筆候選' if candidate_count else '流程異常',
}
return {
'label': '狀態待釐清',
'tone': 'neutral',
'summary': '已有比對紀錄但尚未分類,需檢查補抓紀錄或重新排入覆核',
'detail': score_text,
}
def _build_competitor_decision(momo_price, pchome_price, match_status=None):
status = match_status or _build_pchome_match_status()
if status.get('blocks_price_gap') or not pchome_price:
return {
'label': status['label'],
'tone': status['tone'],
'gap_amount': None,
'gap_pct': None,
'summary': status['summary']
}
momo_price = float(momo_price or 0)
pchome_price = float(pchome_price)
gap_amount = momo_price - pchome_price
gap_pct = (gap_amount / pchome_price * 100) if pchome_price else 0
if gap_pct >= 5:
return {
'label': 'PChome 價格壓力',
'tone': 'risk',
'gap_amount': gap_amount,
'gap_pct': gap_pct,
'summary': 'PChome 較便宜,需評估 MOMO 價格、促銷或曝光策略'
}
if gap_pct <= -5:
return {
'label': 'MOMO 價格優勢',
'tone': 'win',
'gap_amount': gap_amount,
'gap_pct': gap_pct,
'summary': 'MOMO 較便宜,可優先檢查毛利與曝光機會'
}
return {
'label': '價格接近',
'tone': 'watch',
'gap_amount': gap_amount,
'gap_pct': gap_pct,
'summary': '價差有限,建議主打服務、到貨或回饋'
}
def _load_pchome_competitor_map(session, skus):
sku_list = [str(sku) for sku in skus if sku]
if not sku_list:
return {}
try:
stmt = text("""
SELECT DISTINCT ON (sku)
sku,
price,
original_price,
discount_pct,
competitor_product_id,
competitor_product_name,
competitor_product_url,
competitor_image_url,
competitor_stock,
match_score,
tags,
comparison_mode,
hard_veto,
diagnostic_codes,
crawled_at,
expires_at
FROM competitor_prices
WHERE source = 'pchome'
AND sku IN :skus
AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)
AND price IS NOT NULL
AND price > 0
AND COALESCE(match_score, 0) >= :match_score_floor
AND COALESCE(tags, '[]'::jsonb) ? 'identity_v2'
ORDER BY sku, crawled_at DESC NULLS LAST
""").bindparams(bindparam("skus", expanding=True))
rows = session.execute(
stmt,
{"skus": sku_list, "match_score_floor": PCHOME_MATCH_SCORE_FLOOR},
).mappings().all()
except Exception as exc:
sys_log.warning(f"[Dashboard] PChome 競品價格資料讀取略過: {exc}")
return {}
result = {}
for row in rows:
competitor_product_id = row.get('competitor_product_id')
result[str(row.get('sku'))] = {
'source': 'pchome',
'price': _to_float(row.get('price')),
'original_price': _to_float(row.get('original_price')),
'discount_pct': row.get('discount_pct'),
'product_id': competitor_product_id,
'product_name': row.get('competitor_product_name'),
'product_url': row.get('competitor_product_url') or _build_pchome_product_url(competitor_product_id),
'image_url': row.get('competitor_image_url'),
'stock': row.get('competitor_stock'),
'match_score': _to_float(row.get('match_score')),
'tags': row.get('tags'),
'comparison_mode': row.get('comparison_mode'),
'hard_veto': row.get('hard_veto'),
'diagnostic_codes': row.get('diagnostic_codes'),
'crawled_at': row.get('crawled_at'),
'expires_at': row.get('expires_at'),
}
return result
def _load_pchome_ineligible_competitor_map(session, skus):
"""Read non-decision PChome rows so the UI can explain why a SKU is pending."""
sku_list = [str(sku) for sku in skus if sku]
if not sku_list:
return {}
try:
stmt = text("""
WITH ineligible AS (
SELECT
sku,
price,
competitor_product_id,
competitor_product_name,
competitor_product_url,
competitor_image_url,
competitor_stock,
match_score,
tags,
comparison_mode,
hard_veto,
diagnostic_codes,
crawled_at,
expires_at,
CASE
WHEN price IS NULL OR price <= 0 THEN 'invalid_price'
WHEN (expires_at IS NOT NULL AND expires_at <= CURRENT_TIMESTAMP)
AND COALESCE(match_score, 0) >= :match_score_floor
AND COALESCE(tags, '[]'::jsonb) ? 'identity_v2'
THEN 'expired_match'
WHEN NOT (COALESCE(tags, '[]'::jsonb) ? 'identity_v2')
THEN 'legacy_without_identity_v2'
WHEN COALESCE(match_score, 0) < :match_score_floor
THEN 'below_score_floor'
ELSE 'not_eligible'
END AS reason,
ROW_NUMBER() OVER (
PARTITION BY sku
ORDER BY
CASE
WHEN (expires_at IS NOT NULL AND expires_at <= CURRENT_TIMESTAMP)
AND COALESCE(match_score, 0) >= :match_score_floor
AND COALESCE(tags, '[]'::jsonb) ? 'identity_v2'
THEN 0
WHEN NOT (COALESCE(tags, '[]'::jsonb) ? 'identity_v2') THEN 1
WHEN COALESCE(match_score, 0) < :match_score_floor THEN 2
ELSE 3
END,
crawled_at DESC NULLS LAST,
match_score DESC NULLS LAST
) AS rn
FROM competitor_prices
WHERE source = 'pchome'
AND sku IN :skus
AND NOT (
(expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)
AND price IS NOT NULL
AND price > 0
AND COALESCE(match_score, 0) >= :match_score_floor
AND COALESCE(tags, '[]'::jsonb) ? 'identity_v2'
)
)
SELECT *
FROM ineligible
WHERE rn = 1
""").bindparams(bindparam("skus", expanding=True))
rows = session.execute(
stmt,
{"skus": sku_list, "match_score_floor": PCHOME_MATCH_SCORE_FLOOR},
).mappings().all()
except Exception as exc:
sys_log.warning(f"[Dashboard] PChome 非有效配對原因讀取略過: {exc}")
return {}
result = {}
for row in rows:
result[str(row.get('sku'))] = {
'reason': row.get('reason'),
'price': _to_float(row.get('price')),
'product_id': row.get('competitor_product_id'),
'product_name': row.get('competitor_product_name'),
'product_url': row.get('competitor_product_url') or _build_pchome_product_url(row.get('competitor_product_id')),
'image_url': row.get('competitor_image_url'),
'stock': row.get('competitor_stock'),
'match_score': _to_float(row.get('match_score')),
'tags': row.get('tags'),
'comparison_mode': row.get('comparison_mode'),
'hard_veto': row.get('hard_veto'),
'diagnostic_codes': row.get('diagnostic_codes'),
'crawled_at': row.get('crawled_at'),
'expires_at': row.get('expires_at'),
}
return result
def _load_pchome_match_attempt_map(session, skus):
sku_list = [str(sku) for sku in skus if sku]
if not sku_list:
return {}
try:
stmt = text("""
WITH latest_momo AS (
SELECT
p.i_code AS sku,
p.name AS momo_product_name,
pr.price AS momo_price,
ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC, pr.id DESC) AS rn
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.i_code IN :skus
),
ranked AS (
SELECT
cma.sku,
cma.attempt_status,
cma.candidate_count,
cma.best_competitor_product_id,
cma.best_competitor_product_name,
cma.competitor_product_url,
cma.competitor_image_url,
cma.competitor_stock,
cma.best_competitor_price,
cma.best_match_score,
cma.match_diagnostic_json,
cma.comparison_mode,
cma.hard_veto,
cma.diagnostic_codes,
cma.error_message,
cma.attempted_at,
lm.momo_product_name,
lm.momo_price,
ROW_NUMBER() OVER (PARTITION BY cma.sku ORDER BY cma.attempted_at DESC) AS rn
FROM competitor_match_attempts cma
LEFT JOIN latest_momo lm
ON lm.sku = cma.sku
AND lm.rn = 1
WHERE cma.source = 'pchome'
AND cma.sku IN :skus
)
SELECT *
FROM ranked
WHERE rn = 1
""").bindparams(bindparam("skus", expanding=True))
rows = session.execute(stmt, {"skus": sku_list}).mappings().all()
except Exception as exc:
sys_log.warning(f"[Dashboard] PChome 比對嘗試資料讀取略過: {exc}")
return {}
result = {}
try:
from services.competitor_intel_repository import (
_build_unit_comparison_for_attempt,
_build_unit_price_business_insight,
_extract_match_diagnostic_reasons,
_parse_json_payload,
)
except Exception:
_build_unit_comparison_for_attempt = None
_build_unit_price_business_insight = None
_extract_match_diagnostic_reasons = None
_parse_json_payload = None
for row in rows:
item = dict(row)
if item.get('best_competitor_product_id') and not item.get('competitor_product_url'):
item['competitor_product_url'] = _build_pchome_product_url(item.get('best_competitor_product_id'))
if _extract_match_diagnostic_reasons and _parse_json_payload:
diagnostic_payload = _parse_json_payload(item.get('match_diagnostic_json'))
diagnostic_reasons = _extract_match_diagnostic_reasons(item.get('error_message'), diagnostic_payload)
item['diagnostic_reasons'] = diagnostic_reasons
item['diagnostic_reason_text'] = ''.join(reason['label'] for reason in diagnostic_reasons)
if item.get('attempt_status') in {'unit_comparable', 'refresh_unit_comparable', 'manual_unit_price_required'}:
try:
if _build_unit_comparison_for_attempt and _build_unit_price_business_insight:
item['unit_comparison'] = _build_unit_comparison_for_attempt(item)
item['unit_price_insight'] = _build_unit_price_business_insight(item.get('unit_comparison'), item)
else:
from services.marketplace_product_matcher import build_unit_price_comparison
item['unit_comparison'] = build_unit_price_comparison(
item.get('momo_product_name') or '',
item.get('best_competitor_product_name') or '',
item.get('momo_price'),
item.get('best_competitor_price'),
)
except Exception as exc:
sys_log.warning(f"[Dashboard] PChome 單位價比較資料建立略過: {exc}")
item['unit_comparison'] = {'comparable': False, 'reason': 'build_error'}
result[str(row.get('sku'))] = item
return result
def _format_dashboard_dt(value):
if not value:
return None
if hasattr(value, "strftime"):
return value.strftime("%Y-%m-%d %H:%M")
return str(value)
def _get_session_engine(session):
try:
return session.get_bind()
except Exception:
return getattr(session, 'bind', None)
def _load_competitor_review_context(session, limit=12):
try:
from services.competitor_intel_repository import (
fetch_competitor_coverage,
fetch_competitor_review_queue,
)
engine = _get_session_engine(session)
if not engine:
return {'coverage': {}, 'review_queue': []}
return {
'coverage': fetch_competitor_coverage(engine) or {},
'review_queue': fetch_competitor_review_queue(engine, limit=limit) or [],
}
except Exception as exc:
sys_log.warning(f"[Dashboard] PChome 覆核隊列讀取略過: {exc}")
return {'coverage': {}, 'review_queue': []}
def _merge_competitor_review_context(overview, review_context):
coverage = review_context.get('coverage') or {}
review_queue = review_context.get('review_queue') or []
attempt_status = coverage.get('attempt_status') or {}
explicit_catalog_count = sum(
int(attempt_status.get(status) or 0)
for status in ('catalog_variant_review', 'catalog_unit_review', 'catalog_identity_review')
)
legacy_catalog_count = max(int(coverage.get('catalog_comparable_count') or 0) - explicit_catalog_count, 0)
review_status_counts = {}
for option in REVIEW_STATUS_OPTIONS:
if option['key'] == 'catalog_comparable':
review_status_counts[option['key']] = int(coverage.get('catalog_comparable_count') or 0)
elif option['key'] == 'catalog_variant_review':
review_status_counts[option['key']] = int(coverage.get('catalog_variant_review_count') or 0)
elif option['key'] == 'catalog_unit_review':
review_status_counts[option['key']] = int(coverage.get('catalog_unit_review_count') or 0)
elif option['key'] == 'catalog_identity_review':
review_status_counts[option['key']] = int(coverage.get('catalog_identity_review_count') or 0)
elif option['key'] == 'true_low_confidence':
review_status_counts[option['key']] = max(
int(attempt_status.get('true_low_confidence') or 0)
- legacy_catalog_count,
0,
)
else:
review_status_counts[option['key']] = sum(
int(attempt_status.get(status) or 0)
for status in option['statuses']
)
overview.update({
'total_active': int(coverage.get('active_with_price') or overview.get('total_active') or 0),
'matched_count': int(coverage.get('valid_matches') or overview.get('matched_count') or 0),
'identity_coverage_count': int(
coverage.get('identity_coverage_matches')
or coverage.get('valid_matches')
or overview.get('identity_coverage_count')
or 0
),
'identity_coverage_rate': coverage.get(
'identity_coverage_rate',
overview.get('identity_coverage_rate') or coverage.get('match_rate', 0),
),
'match_rate': coverage.get('match_rate', overview.get('match_rate') or 0),
'fresh_match_count': int(coverage.get('fresh_matches') or 0),
'fresh_match_rate': coverage.get('fresh_match_rate', 0),
'decision_ready_count': int(coverage.get('decision_ready_matches') or coverage.get('fresh_matches') or 0),
'decision_ready_rate': coverage.get('decision_ready_rate', 0),
'decision_support_count': int(coverage.get('decision_support_count') or 0),
'decision_support_rate': coverage.get('decision_support_rate', 0),
'decision_support_non_exact_count': int(coverage.get('decision_support_non_exact_count') or 0),
'catalog_comparable_count': int(coverage.get('catalog_comparable_count') or 0),
'catalog_comparable_rate': coverage.get('catalog_comparable_rate', 0),
'catalog_variant_review_count': int(coverage.get('catalog_variant_review_count') or 0),
'catalog_unit_review_count': int(coverage.get('catalog_unit_review_count') or 0),
'catalog_identity_review_count': int(coverage.get('catalog_identity_review_count') or 0),
'catalog_review_plan': coverage.get('catalog_review_plan') or {},
'stale_match_count': int(coverage.get('stale_matches') or 0),
'unknown_freshness_count': int(coverage.get('unknown_freshness_matches') or 0),
'pending_match_count': int(coverage.get('pending') or overview.get('pending_match_count') or 0),
'review_queue_count': int(coverage.get('actionable_review_count') or len(review_queue) or 0),
'manual_closed_count': int(coverage.get('manual_closed_count') or 0),
'unit_comparable_count': int(coverage.get('unit_comparable_count') or 0),
'rescore_accepted_count': int(
coverage.get('rescore_accepted_count')
or review_status_counts.get('rescore_accepted')
or 0
),
'review_status_counts': review_status_counts,
'review_queue': review_queue[:3],
})
last_decision_ready = coverage.get('last_decision_ready_crawled_at')
if last_decision_ready:
overview['last_pchome_crawled'] = _format_dashboard_dt(last_decision_ready)
return overview
def _normalize_dashboard_category_filter(category_filter):
if not category_filter or category_filter == 'all':
return ''
if "(" in category_filter and "筆)" in category_filter:
return category_filter.rsplit(" (", 1)[0]
return category_filter
def _normalize_review_status_filter(review_status):
review_status = (review_status or '').strip()
if review_status == 'low_score':
review_status = 'legacy_low_score'
valid_keys = {option['key'] for option in REVIEW_STATUS_OPTIONS}
return review_status if review_status in valid_keys else 'all'
def _build_review_status_options(overview):
counts = (overview or {}).get('review_status_counts') or {}
return [
{
'key': option['key'],
'label': option['label'],
'count': int(counts.get(option['key']) or 0),
}
for option in REVIEW_STATUS_OPTIONS
]
def _load_competitor_review_page(
session,
page=1,
per_page=50,
search_query='',
category_filter='all',
review_status='all',
count_total=True,
):
try:
from services.competitor_intel_repository import fetch_competitor_review_queue_page
engine = _get_session_engine(session)
if not engine:
return {'items': [], 'total': 0, 'page': page, 'per_page': per_page}
review_status = _normalize_review_status_filter(review_status)
return fetch_competitor_review_queue_page(
engine,
page=page,
per_page=per_page,
search_query=search_query,
category=_normalize_dashboard_category_filter(category_filter),
status_filter='' if review_status == 'all' else review_status,
count_total=count_total,
)
except Exception as exc:
sys_log.warning(f"[Dashboard] PChome 覆核隊列分頁讀取略過: {exc}")
return {'items': [], 'total': 0, 'page': page, 'per_page': per_page}
def _parse_agent_footprint(value):
if not value:
return {}
if isinstance(value, str):
try:
value = json.loads(value)
except Exception:
return {}
if not isinstance(value, dict):
return {}
agent = value.get('agent')
return agent if isinstance(agent, dict) else {}
def _ai_pick_evidence_fields(model_footprint):
agent = _parse_agent_footprint(model_footprint)
missing_evidence = agent.get('missing_evidence') or []
if isinstance(missing_evidence, str):
missing_evidence = [missing_evidence]
missing_evidence = [str(item) for item in missing_evidence if item]
return {
'opportunity_score': _to_float(agent.get('opportunity_score')) or 0,
'evidence_quality': _to_float(agent.get('evidence_quality')) or 0,
'confidence_band': agent.get('confidence_band') or 'needs_evidence',
'missing_evidence': missing_evidence,
'missing_evidence_text': ''.join(missing_evidence),
'margin_rate': _to_float(agent.get('margin_rate')),
}
def _dashboard_decision_row(row, tone):
sku = str(row.get('sku') or '')
pchome_id = row.get('competitor_product_id')
momo_url = normalize_momo_product_url(row.get('momo_url'), sku) or _build_momo_product_url(sku)
return {
'sku': sku,
'name': row.get('name') or '',
'category': row.get('category') or '',
'momo_price': _to_float(row.get('momo_price')) or 0,
'pchome_price': _to_float(row.get('pchome_price')) or 0,
'gap_pct': _to_float(row.get('gap_pct')) or 0,
'gap_amount': _to_float(row.get('gap_amount')) or 0,
'confidence': _to_float(row.get('confidence')),
'reason': row.get('reason') or '',
'tone': tone,
'momo_url': momo_url,
'pchome_id': pchome_id,
'pchome_name': row.get('competitor_product_name') or '',
'pchome_url': _build_pchome_product_url(pchome_id),
'crawled_at': _format_dashboard_dt(row.get('crawled_at') or row.get('created_at')),
**_ai_pick_evidence_fields(row.get('model_footprint')),
}
def _load_competitor_decision_overview(session, latest_items=None):
"""讀取商品看板第一屏使用的 PChome 比價決策摘要。全部來自正式 DB。"""
cache_key = 'competitor_decision_overview'
cache_ts_key = 'competitor_decision_overview_timestamp'
cached = _DASHBOARD_DATA_CACHE.get(cache_key)
cached_ts = _DASHBOARD_DATA_CACHE.get(cache_ts_key)
if cached and cached_ts:
age = time.time() - cached_ts
if age < min(_DASHBOARD_CACHE_TTL, 300):
return cached
default = {
'total_active': 0,
'matched_count': 0,
'match_rate': 0,
'fresh_match_count': 0,
'fresh_match_rate': 0,
'decision_ready_count': 0,
'decision_ready_rate': 0,
'decision_support_count': 0,
'decision_support_rate': 0,
'decision_support_non_exact_count': 0,
'catalog_comparable_count': 0,
'catalog_comparable_rate': 0,
'catalog_variant_review_count': 0,
'catalog_unit_review_count': 0,
'catalog_identity_review_count': 0,
'catalog_review_plan': {},
'stale_match_count': 0,
'unknown_freshness_count': 0,
'pchome_advantage_count': 0,
'momo_threat_count': 0,
'near_count': 0,
'pending_match_count': 0,
'ai_pick_count': 0,
'avg_advantage_gap': 0,
'last_pchome_crawled': None,
'top_picks': [],
'top_pchome_advantages': [],
'top_momo_threats': [],
'pending_priority': [],
'review_queue_count': 0,
'unit_comparable_count': 0,
'rescore_accepted_count': 0,
'review_queue': [],
}
if latest_items:
try:
item_map = {}
for item in latest_items:
record = item.get('record')
product = getattr(record, 'product', None)
sku = str(getattr(product, 'i_code', '') or '')
if not sku:
continue
safe_product_url = normalize_momo_product_url(getattr(product, 'url', None), sku)
item_map[sku] = {
'sku': sku,
'name': getattr(product, 'name', '') or '',
'category': getattr(product, 'category', '') or '',
'momo_url': safe_product_url or _build_momo_product_url(sku),
'momo_price': _to_float(getattr(record, 'price', None)) or 0,
}
competitor_map = _load_pchome_competitor_map(session, item_map.keys())
compared = []
for sku, item in item_map.items():
competitor = competitor_map.get(sku)
pchome_price = _to_float(competitor.get('price')) if competitor else None
if not pchome_price:
continue
gap_amount = item['momo_price'] - pchome_price
gap_pct = gap_amount / pchome_price * 100 if pchome_price else 0
compared.append({
**item,
'pchome_price': pchome_price,
'competitor_product_id': competitor.get('product_id'),
'competitor_product_name': competitor.get('product_name'),
'match_score': competitor.get('match_score'),
'crawled_at': competitor.get('crawled_at'),
'gap_amount': gap_amount,
'gap_pct': gap_pct,
})
picks_rows = session.execute(text("""
SELECT
sku,
name,
momo_price,
pchome_price,
gap_pct,
confidence,
reason,
model_footprint,
created_at
FROM ai_price_recommendations
WHERE strategy = 'product_pick'
AND status = 'pending'
ORDER BY confidence DESC NULLS LAST, gap_pct DESC NULLS LAST, created_at DESC
LIMIT 3
""")).mappings().all()
ai_pick_count = session.execute(text("""
SELECT COUNT(*)
FROM ai_price_recommendations
WHERE strategy = 'product_pick'
AND status = 'pending'
""")).scalar() or 0
total_active = len(item_map)
matched_count = len(compared)
pchome_advantages = [row for row in compared if row['gap_pct'] >= 5]
momo_threats = [row for row in compared if row['gap_pct'] <= -5]
near_items = [row for row in compared if -5 < row['gap_pct'] < 5]
pending_items = [
row for sku, row in item_map.items()
if sku not in competitor_map
]
last_crawled = max(
(row.get('crawled_at') for row in compared if row.get('crawled_at')),
default=None,
)
overview = dict(default)
overview.update({
'total_active': total_active,
'matched_count': matched_count,
'match_rate': round(matched_count / max(total_active, 1) * 100, 1),
'pchome_advantage_count': len(pchome_advantages),
'momo_threat_count': len(momo_threats),
'near_count': len(near_items),
'pending_match_count': max(total_active - matched_count, 0),
'ai_pick_count': int(ai_pick_count),
'avg_advantage_gap': round(
sum(row['gap_pct'] for row in pchome_advantages) / len(pchome_advantages),
1,
) if pchome_advantages else 0,
'last_pchome_crawled': _format_dashboard_dt(last_crawled),
})
overview['top_pchome_advantages'] = [
_dashboard_decision_row(row, 'win')
for row in sorted(pchome_advantages, key=lambda row: row['gap_pct'], reverse=True)[:3]
]
overview['top_momo_threats'] = [
_dashboard_decision_row(row, 'risk')
for row in sorted(momo_threats, key=lambda row: row['gap_pct'])[:3]
]
overview['top_picks'] = []
for row in picks_rows:
pick = dict(row)
competitor = competitor_map.get(str(pick.get('sku') or '')) or {}
pick['competitor_product_id'] = competitor.get('product_id')
pick['competitor_product_name'] = competitor.get('product_name')
pick['crawled_at'] = competitor.get('crawled_at')
overview['top_picks'].append(_dashboard_decision_row(pick, 'pick'))
overview['pending_priority'] = [
{
'sku': row['sku'],
'name': row['name'],
'category': row['category'],
'momo_price': row['momo_price'],
'momo_url': normalize_momo_product_url(row.get('momo_url'), row.get('sku')) or _build_momo_product_url(row.get('sku')),
}
for row in sorted(pending_items, key=lambda row: row['momo_price'], reverse=True)[:3]
]
_merge_competitor_review_context(
overview,
_load_competitor_review_context(session, limit=12),
)
_DASHBOARD_DATA_CACHE[cache_key] = overview
_DASHBOARD_DATA_CACHE[cache_ts_key] = time.time()
return overview
except Exception as exc:
sys_log.warning(f"[Dashboard] PChome 比價快取摘要讀取略過,改用 SQL 後備: {exc}")
try:
session.rollback()
except Exception:
pass
latest_compared_cte = f"""
WITH latest_momo AS (
SELECT
p.id AS product_id,
p.i_code AS sku,
p.name,
p.url AS momo_url,
p.category,
pr.price AS momo_price,
ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC, pr.id DESC) AS rn
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.status = 'ACTIVE'
),
latest_products AS (
SELECT * FROM latest_momo WHERE rn = 1
),
valid_competitor AS (
SELECT DISTINCT ON (cp.sku)
cp.sku,
cp.price AS pchome_price,
cp.competitor_product_id,
cp.competitor_product_name,
cp.match_score,
cp.crawled_at
FROM competitor_prices cp
WHERE cp.source = 'pchome'
AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
AND cp.price IS NOT NULL
AND cp.price > 0
AND COALESCE(cp.match_score, 0) >= {PCHOME_MATCH_SCORE_FLOOR}
AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
ORDER BY cp.sku, cp.crawled_at DESC NULLS LAST
),
compared AS (
SELECT
lp.*,
vc.pchome_price,
vc.competitor_product_id,
vc.competitor_product_name,
vc.match_score,
vc.crawled_at,
(lp.momo_price - vc.pchome_price) AS gap_amount,
((lp.momo_price - vc.pchome_price) / vc.pchome_price * 100) AS gap_pct
FROM latest_products lp
JOIN valid_competitor vc ON vc.sku = lp.sku
)
"""
stats_sql = text(latest_compared_cte + """
SELECT
(SELECT COUNT(*) FROM products WHERE status = 'ACTIVE') AS total_active,
(SELECT COUNT(*) FROM compared) AS matched_count,
(SELECT COUNT(*) FROM compared WHERE gap_pct >= 5) AS pchome_advantage_count,
(SELECT COUNT(*) FROM compared WHERE gap_pct <= -5) AS momo_threat_count,
(SELECT COUNT(*) FROM compared WHERE gap_pct > -5 AND gap_pct < 5) AS near_count,
(SELECT COALESCE(ROUND(AVG(gap_pct)::numeric, 1), 0) FROM compared WHERE gap_pct >= 5) AS avg_advantage_gap,
(SELECT COUNT(*) FROM ai_price_recommendations WHERE strategy = 'product_pick' AND status = 'pending') AS ai_pick_count,
(SELECT MAX(crawled_at) FROM compared) AS last_pchome_crawled
""")
advantage_sql = text(latest_compared_cte + """
SELECT *
FROM compared
WHERE gap_pct >= 5
ORDER BY gap_pct DESC NULLS LAST, crawled_at DESC NULLS LAST
LIMIT 3
""")
threat_sql = text(latest_compared_cte + """
SELECT *
FROM compared
WHERE gap_pct <= -5
ORDER BY gap_pct ASC NULLS LAST, crawled_at DESC NULLS LAST
LIMIT 3
""")
pending_sql = text(f"""
WITH latest_momo AS (
SELECT
p.i_code AS sku,
p.name,
p.url AS momo_url,
p.category,
pr.price AS momo_price,
ROW_NUMBER() OVER (PARTITION BY p.id ORDER BY pr.timestamp DESC, pr.id DESC) AS rn
FROM products p
JOIN price_records pr ON pr.product_id = p.id
WHERE p.status = 'ACTIVE'
)
SELECT lm.*
FROM latest_momo lm
LEFT JOIN competitor_prices cp
ON cp.sku = lm.sku
AND cp.source = 'pchome'
AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
AND cp.price IS NOT NULL
AND cp.price > 0
AND COALESCE(cp.match_score, 0) >= {PCHOME_MATCH_SCORE_FLOOR}
AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
WHERE lm.rn = 1
AND cp.sku IS NULL
ORDER BY lm.momo_price DESC NULLS LAST
LIMIT 3
""")
picks_sql = text(f"""
WITH valid_competitor AS (
SELECT DISTINCT ON (cp.sku)
cp.sku,
cp.competitor_product_id,
cp.competitor_product_name,
cp.crawled_at
FROM competitor_prices cp
WHERE cp.source = 'pchome'
AND (cp.expires_at IS NULL OR cp.expires_at > CURRENT_TIMESTAMP)
AND cp.price IS NOT NULL
AND cp.price > 0
AND COALESCE(cp.match_score, 0) >= {PCHOME_MATCH_SCORE_FLOOR}
AND COALESCE(cp.tags, '[]'::jsonb) ? 'identity_v2'
ORDER BY cp.sku, cp.crawled_at DESC NULLS LAST
)
SELECT
ar.sku,
ar.name,
ar.momo_price,
ar.pchome_price,
ar.gap_pct,
ar.confidence,
ar.reason,
ar.model_footprint,
ar.created_at,
vc.competitor_product_id,
vc.competitor_product_name,
vc.crawled_at
FROM ai_price_recommendations ar
LEFT JOIN valid_competitor vc ON vc.sku = ar.sku
WHERE ar.strategy = 'product_pick'
AND ar.status = 'pending'
ORDER BY ar.confidence DESC NULLS LAST, ar.gap_pct DESC NULLS LAST, ar.created_at DESC
LIMIT 3
""")
try:
stats = session.execute(stats_sql).mappings().first()
overview = dict(default)
if stats:
total_active = int(stats.get('total_active') or 0)
matched_count = int(stats.get('matched_count') or 0)
overview.update({
'total_active': total_active,
'matched_count': matched_count,
'match_rate': round(matched_count / max(total_active, 1) * 100, 1),
'pchome_advantage_count': int(stats.get('pchome_advantage_count') or 0),
'momo_threat_count': int(stats.get('momo_threat_count') or 0),
'near_count': int(stats.get('near_count') or 0),
'pending_match_count': max(total_active - matched_count, 0),
'ai_pick_count': int(stats.get('ai_pick_count') or 0),
'avg_advantage_gap': _to_float(stats.get('avg_advantage_gap')) or 0,
'last_pchome_crawled': _format_dashboard_dt(stats.get('last_pchome_crawled')),
})
overview['top_pchome_advantages'] = [
_dashboard_decision_row(row, 'win')
for row in session.execute(advantage_sql).mappings().all()
]
overview['top_momo_threats'] = [
_dashboard_decision_row(row, 'risk')
for row in session.execute(threat_sql).mappings().all()
]
overview['top_picks'] = [
_dashboard_decision_row(row, 'pick')
for row in session.execute(picks_sql).mappings().all()
]
overview['pending_priority'] = [
{
'sku': str(row.get('sku') or ''),
'name': row.get('name') or '',
'category': row.get('category') or '',
'momo_price': _to_float(row.get('momo_price')) or 0,
'momo_url': normalize_momo_product_url(row.get('momo_url'), row.get('sku')) or _build_momo_product_url(row.get('sku')),
}
for row in session.execute(pending_sql).mappings().all()
]
_merge_competitor_review_context(
overview,
_load_competitor_review_context(session, limit=12),
)
_DASHBOARD_DATA_CACHE[cache_key] = overview
_DASHBOARD_DATA_CACHE[cache_ts_key] = time.time()
return overview
except Exception as exc:
sys_log.warning(f"[Dashboard] PChome 比價決策摘要讀取略過: {exc}")
try:
session.rollback()
except Exception:
pass
_merge_competitor_review_context(
default,
_load_competitor_review_context(session, limit=12),
)
_DASHBOARD_DATA_CACHE[cache_key] = default
_DASHBOARD_DATA_CACHE[cache_ts_key] = time.time()
return default
def _load_pchome_growth_command_center(session):
"""Build the first-screen PChome revenue command center from real sales and comparison data."""
cache_key = 'pchome_growth_command_center'
cache_ts_key = 'pchome_growth_command_center_timestamp'
cached = _DASHBOARD_DATA_CACHE.get(cache_key)
cached_ts = _DASHBOARD_DATA_CACHE.get(cache_ts_key)
if cached and cached_ts and time.time() - cached_ts < 120:
return cached
default = {
'success': False,
'latest_sales_date': None,
'sales_7d': 0,
'sales_prev_7d': 0,
'sales_delta_pct': None,
'sales_delta_label': '待匯入',
'sales_delta_tone': 'neutral',
'sales_current_width': 0,
'sales_prev_width': 0,
'qty_7d': 0,
'active_product_count': 0,
'declining_product_count': 0,
'top_category': '',
'top_category_sales_7d': 0,
'candidate_count': 0,
'mapped_count': 0,
'mapping_rate': 0,
'mapping_rate_width': 0,
'needs_mapping_count': 0,
'opportunity_sales_7d': 0,
'action_code_counts': {},
'action_counts': {},
'priority_tasks': [],
'strategy_lanes': [],
'top_opportunities': [],
'message': 'PChome 業績作戰資料整理中',
}
try:
from services.pchome_revenue_growth_service import build_pchome_growth_opportunities
engine = session.get_bind()
payload = build_pchome_growth_opportunities(engine, limit=16)
stats = payload.get('stats') or {}
opportunities = payload.get('opportunities') or []
sales_7d = _to_float(stats.get('overall_sales_7d')) or 0
sales_prev_7d = _to_float(stats.get('overall_sales_prev_7d')) or 0
sales_delta_pct = stats.get('overall_sales_delta_pct')
sales_delta_value = _to_float(sales_delta_pct)
max_sales = max(sales_7d, sales_prev_7d, 1)
mapping_rate = _to_float(stats.get('mapping_rate')) or 0
action_code_counts = stats.get('action_code_counts') or {}
if sales_delta_value is None:
sales_delta_label = '前期不足'
sales_delta_tone = 'neutral'
elif sales_delta_value < -10:
sales_delta_label = f'較前 7 天 {sales_delta_value:.1f}%'
sales_delta_tone = 'danger'
elif sales_delta_value < 0:
sales_delta_label = f'較前 7 天 {sales_delta_value:.1f}%'
sales_delta_tone = 'warning'
else:
sales_delta_label = f'較前 7 天 +{sales_delta_value:.1f}%'
sales_delta_tone = 'success'
priority_tasks = []
needs_mapping = int(stats.get('needs_mapping_count') or 0)
if needs_mapping:
priority_tasks.append({
'rank': 1,
'tone': 'danger' if mapping_rate < 25 else 'warning',
'title': f'先補 {needs_mapping} 個高業績商品對應',
'metric': f'比價覆蓋 {mapping_rate:.1f}%',
'action': 'backfill',
'button': '啟動補抓',
})
review_price_count = int(action_code_counts.get('review_price_or_promo') or 0)
if review_price_count:
priority_tasks.append({
'rank': len(priority_tasks) + 1,
'tone': 'danger',
'title': f'檢查 {review_price_count} 個 MOMO 低價壓力商品',
'metric': '調價 / 券 / 組合',
'action': 'price_review',
'button': '看價格壓力',
})
amplify_count = int(action_code_counts.get('amplify_price_advantage') or 0)
if amplify_count:
priority_tasks.append({
'rank': len(priority_tasks) + 1,
'tone': 'success',
'title': f'放大 {amplify_count} 個 PChome 價格優勢',
'metric': '曝光 / 文案 / 主推',
'action': 'ai_picks',
'button': '看主推清單',
})
recover_count = int(action_code_counts.get('recover_sales_momentum') or 0)
if recover_count:
priority_tasks.append({
'rank': len(priority_tasks) + 1,
'tone': 'warning',
'title': f'找回 {recover_count} 個下滑商品動能',
'metric': '庫存 / 頁面 / 活動',
'action': 'daily_sales',
'button': '看業績',
})
if not priority_tasks:
priority_tasks.append({
'rank': 1,
'tone': 'neutral',
'title': '先看今日高業績商品',
'metric': '業績穩定,持續監控',
'action': 'daily_sales',
'button': '看業績',
})
strategy_lanes = [
{
'key': 'price_pressure',
'label': 'MOMO 更便宜',
'value': review_price_count,
'action': '檢查售價 / 券 / 組合',
'tone': 'danger',
},
{
'key': 'price_advantage',
'label': 'PChome 有優勢',
'value': amplify_count,
'action': '拉曝光 / 強化文案',
'tone': 'success',
},
{
'key': 'bundle',
'label': '單品 / 組合待判斷',
'value': sum(1 for item in opportunities if (item.get('external_price') or {}).get('price_basis') == 'unit_price'),
'action': '看單位價,決定組合包',
'tone': 'warning',
},
{
'key': 'mapping',
'label': '找不到同款',
'value': needs_mapping,
'action': '補抓 MOMO 候選',
'tone': 'neutral',
},
]
command_center = dict(default)
command_center.update({
'success': bool(payload.get('success', True)),
'latest_sales_date': stats.get('overall_latest_sales_date') or stats.get('latest_sales_date'),
'sales_7d': sales_7d,
'sales_prev_7d': sales_prev_7d,
'sales_delta_pct': sales_delta_value,
'sales_delta_label': sales_delta_label,
'sales_delta_tone': sales_delta_tone,
'sales_current_width': round(sales_7d / max_sales * 100, 1),
'sales_prev_width': round(sales_prev_7d / max_sales * 100, 1),
'qty_7d': _to_float(stats.get('overall_qty_7d')) or 0,
'active_product_count': int(stats.get('active_product_count') or 0),
'declining_product_count': int(stats.get('declining_product_count') or 0),
'top_category': stats.get('top_category') or '',
'top_category_sales_7d': _to_float(stats.get('top_category_sales_7d')) or 0,
'candidate_count': int(stats.get('candidate_count') or 0),
'mapped_count': int(stats.get('mapped_count') or 0),
'mapping_rate': round(mapping_rate, 1),
'mapping_rate_width': round(max(0, min(100, mapping_rate)), 1),
'needs_mapping_count': needs_mapping,
'opportunity_sales_7d': _to_float(stats.get('opportunity_sales_7d') or stats.get('total_sales_7d')) or 0,
'action_code_counts': action_code_counts,
'action_counts': stats.get('action_counts') or {},
'priority_tasks': priority_tasks[:4],
'strategy_lanes': strategy_lanes,
'top_opportunities': opportunities[:6],
'message': payload.get('message') or default['message'],
})
_DASHBOARD_DATA_CACHE[cache_key] = command_center
_DASHBOARD_DATA_CACHE[cache_ts_key] = time.time()
return command_center
except Exception as exc:
sys_log.warning(f"[Dashboard] PChome 業績作戰台讀取略過: {exc}")
try:
session.rollback()
except Exception:
pass
_DASHBOARD_DATA_CACHE[cache_key] = default
_DASHBOARD_DATA_CACHE[cache_ts_key] = time.time()
return default
def _load_ai_pick_selection(session, limit=PRODUCT_PICK_LIST_LIMIT):
"""讀取商品看板 AI 挑品清單排序,供列表篩選使用。"""
sql = text("""
SELECT
sku,
confidence,
reason,
momo_price,
pchome_price,
gap_pct,
model_footprint,
created_at
FROM ai_price_recommendations
WHERE strategy = 'product_pick'
AND status = 'pending'
ORDER BY confidence DESC NULLS LAST, gap_pct DESC NULLS LAST, created_at DESC
LIMIT :limit
""")
try:
rows = session.execute(sql, {"limit": limit}).mappings().all()
except Exception as exc:
sys_log.warning(f"[Dashboard] AI 挑品清單讀取略過: {exc}")
try:
session.rollback()
except Exception:
pass
return [], {}
skus = []
pick_map = {}
for idx, row in enumerate(rows, start=1):
sku = str(row.get('sku') or '')
if not sku or sku in pick_map:
continue
skus.append(sku)
pick_map[sku] = {
'rank': idx,
'confidence': _to_float(row.get('confidence')) or 0,
'reason': row.get('reason') or '',
'momo_price': _to_float(row.get('momo_price')) or 0,
'pchome_price': _to_float(row.get('pchome_price')) or 0,
'gap_pct': _to_float(row.get('gap_pct')) or 0,
'created_at': _format_dashboard_dt(row.get('created_at')),
**_ai_pick_evidence_fields(row.get('model_footprint')),
}
return skus, pick_map
def _summarize_ai_pick_selection(ai_pick_map):
"""彙整目前 AI 挑品清單的可操作摘要,全部來自 ai_price_recommendations。"""
picks = list(ai_pick_map.values())
if not picks:
return {
'count': 0,
'avg_confidence': 0,
'avg_evidence_quality': 0,
'avg_opportunity_score': 0,
'avg_gap_pct': 0,
'max_gap_pct': 0,
'total_gap_amount': 0,
'high_confidence_count': 0,
'needs_evidence_count': 0,
'top_missing_evidence': [],
'generated_at': None,
}
confidence_values = [pick.get('confidence', 0) for pick in picks]
evidence_values = [pick.get('evidence_quality', 0) for pick in picks]
opportunity_values = [pick.get('opportunity_score', 0) for pick in picks]
gap_values = [pick.get('gap_pct', 0) for pick in picks]
missing_counts = {}
for pick in picks:
for label in pick.get('missing_evidence', []):
missing_counts[label] = missing_counts.get(label, 0) + 1
total_gap_amount = sum(
max((pick.get('momo_price') or 0) - (pick.get('pchome_price') or 0), 0)
for pick in picks
)
return {
'count': len(picks),
'avg_confidence': round(sum(confidence_values) / len(confidence_values), 3),
'avg_evidence_quality': round(sum(evidence_values) / len(evidence_values), 1),
'avg_opportunity_score': round(sum(opportunity_values) / len(opportunity_values), 1),
'avg_gap_pct': round(sum(gap_values) / len(gap_values), 1),
'max_gap_pct': round(max(gap_values), 1),
'total_gap_amount': round(total_gap_amount),
'high_confidence_count': sum(1 for value in confidence_values if value >= 0.65),
'needs_evidence_count': sum(1 for pick in picks if pick.get('confidence_band') == 'needs_evidence'),
'top_missing_evidence': [
{'label': label, 'count': count}
for label, count in sorted(missing_counts.items(), key=lambda item: item[1], reverse=True)[:3]
],
'generated_at': max(
(pick.get('created_at') for pick in picks if pick.get('created_at')),
default=None,
),
}
# ==========================================
# 快取與監控變數
# ==========================================
import fcntl
_DASHBOARD_LOCK_FILE = os.path.join(BASE_DIR, 'data', '.dashboard_cache.lock') # V-Opt: 檔案鎖(跨進程)
class FileLock:
"""簡單的檔案鎖,用於 gunicorn 多進程環境"""
def __init__(self, lock_file):
self.lock_file = lock_file
self.fd = None
def acquire(self, blocking=True):
"""取得鎖"""
try:
self.fd = open(self.lock_file, 'w')
if blocking:
fcntl.flock(self.fd, fcntl.LOCK_EX)
else:
fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
return True
except (IOError, OSError):
if self.fd:
self.fd.close()
self.fd = None
return False
def release(self):
"""釋放鎖"""
if self.fd:
fcntl.flock(self.fd, fcntl.LOCK_UN)
self.fd.close()
self.fd = None
_DASHBOARD_STALE_CACHE_MAX_AGE = 86400
_DASHBOARD_REFRESH_STATE = {
'started_at': 0,
'running': False,
}
def _new_dashboard_file_lock():
return FileLock(_DASHBOARD_LOCK_FILE)
def _load_dashboard_cache_file(now, cache_path, *, allow_stale=False, label='共享'):
"""讀取跨 worker 商品看板深度快取;必要時可讀舊快取救援首屏。"""
cache_file = str(cache_path)
if not os.path.exists(cache_file):
return None
try:
with open(cache_file, 'rb') as f:
payload = pickle.load(f)
full_timestamp = payload.get('full_timestamp')
full_data = payload.get('full_data')
if not full_timestamp or not full_data:
return None
age = now.timestamp() - full_timestamp
max_age = _DASHBOARD_STALE_CACHE_MAX_AGE if allow_stale else _DASHBOARD_CACHE_TTL
if age >= max_age:
return None
_DASHBOARD_DATA_CACHE['full_data'] = full_data
_DASHBOARD_DATA_CACHE['full_timestamp'] = full_timestamp
_DASHBOARD_DATA_CACHE['consolidated_data'] = payload.get('consolidated_data')
_DASHBOARD_DATA_CACHE['consolidated_timestamp'] = payload.get('consolidated_timestamp')
_DASHBOARD_DATA_CACHE['today_start'] = payload.get('today_start')
level = sys_log.info if allow_stale and age >= _DASHBOARD_CACHE_TTL else sys_log.debug
level(f"[Dashboard] [Cache] ✅ 使用{label}完整看板快取 | 快取年齡: {age:.0f}")
return full_data
except Exception as exc:
sys_log.warning(f"[Dashboard] [Cache] {label}快取讀取失敗,改走資料庫重建: {exc}")
return None
def _load_shared_full_dashboard_cache(now):
return _load_dashboard_cache_file(now, _DASHBOARD_SHARED_CACHE_FILE)
def _load_stale_full_dashboard_cache(now):
stale_data = _load_dashboard_cache_file(
now,
_DASHBOARD_STALE_CACHE_FILE,
allow_stale=True,
label='舊版救援',
)
if stale_data:
return stale_data
return _load_dashboard_cache_file(
now,
_DASHBOARD_SHARED_CACHE_FILE,
allow_stale=True,
label='過期救援',
)
def _load_expired_shared_full_dashboard_cache(now):
"""只讀原 shared cache 檔的過期版本;不讀 clear_cache 後搬走的 stale 檔。"""
return _load_dashboard_cache_file(
now,
_DASHBOARD_SHARED_CACHE_FILE,
allow_stale=True,
label='過期共享',
)
def _trigger_dashboard_background_refresh(reason):
"""使用過期 shared cache 先回首頁時,背景補一次 fresh cache避免使用者卡 10s。"""
now_ts = time.time()
if _DASHBOARD_REFRESH_STATE['running']:
return
if now_ts - _DASHBOARD_REFRESH_STATE['started_at'] < 60:
return
def _refresh():
_DASHBOARD_REFRESH_STATE['running'] = True
try:
warm_full_dashboard_cache(reason=reason, force_rebuild=True)
except Exception as exc:
sys_log.warning(f"[Dashboard] [Cache] 背景預熱失敗: {exc}")
finally:
_DASHBOARD_REFRESH_STATE['running'] = False
_DASHBOARD_REFRESH_STATE['started_at'] = now_ts
thread = threading.Thread(target=_refresh, name='dashboard-cache-refresh', daemon=True)
thread.start()
def _write_shared_full_dashboard_cache(full_data):
"""原子寫入跨 worker 共享的商品看板深度快取。"""
cache_file = str(_DASHBOARD_SHARED_CACHE_FILE)
tmp_file = f"{cache_file}.{os.getpid()}.tmp"
payload = {
'full_data': full_data,
'full_timestamp': _DASHBOARD_DATA_CACHE.get('full_timestamp'),
'consolidated_data': _DASHBOARD_DATA_CACHE.get('consolidated_data'),
'consolidated_timestamp': _DASHBOARD_DATA_CACHE.get('consolidated_timestamp'),
'today_start': _DASHBOARD_DATA_CACHE.get('today_start'),
}
try:
os.makedirs(os.path.dirname(cache_file), exist_ok=True)
with open(tmp_file, 'wb') as f:
pickle.dump(payload, f, protocol=pickle.HIGHEST_PROTOCOL)
os.replace(tmp_file, cache_file)
except Exception as exc:
sys_log.warning(f"[Dashboard] [Cache] 共享快取寫入失敗,仍保留記憶體快取: {exc}")
try:
if os.path.exists(tmp_file):
os.remove(tmp_file)
except OSError:
pass
def warm_full_dashboard_cache(reason='manual', force_rebuild=False):
"""供 API、排程與 Gunicorn worker 啟動時預熱商品看板完整快取。"""
started = time.time()
data = get_full_dashboard_data(force_rebuild=force_rebuild)
duration_ms = (time.time() - started) * 1000
sys_log.info(
f"[Dashboard] [Cache] ✅ 預熱完成 | reason={reason} | "
f"items={len(data.get('unique_items', [])) if data else 0} | 耗時={duration_ms:.0f}ms"
)
return data
# 慢查詢監控
_SLOW_QUERY_STATS = {
'total_queries': 0,
'slow_queries': 0,
'very_slow_queries': 0,
'total_query_time_ms': 0,
'last_slow_query': None,
'last_slow_query_time': None,
}
_SLOW_QUERY_THRESHOLD_MS = 1000
_VERY_SLOW_QUERY_THRESHOLD_MS = 5000
def track_query_time(query_name, duration_ms):
"""追蹤查詢時間,更新慢查詢統計"""
global _SLOW_QUERY_STATS
_SLOW_QUERY_STATS['total_queries'] += 1
_SLOW_QUERY_STATS['total_query_time_ms'] += duration_ms
if duration_ms >= _VERY_SLOW_QUERY_THRESHOLD_MS:
_SLOW_QUERY_STATS['very_slow_queries'] += 1
_SLOW_QUERY_STATS['slow_queries'] += 1
_SLOW_QUERY_STATS['last_slow_query'] = query_name
_SLOW_QUERY_STATS['last_slow_query_time'] = datetime.now(TAIPEI_TZ).isoformat()
elif duration_ms >= _SLOW_QUERY_THRESHOLD_MS:
_SLOW_QUERY_STATS['slow_queries'] += 1
_SLOW_QUERY_STATS['last_slow_query'] = query_name
_SLOW_QUERY_STATS['last_slow_query_time'] = datetime.now(TAIPEI_TZ).isoformat()
# ==========================================
# 輔助函數
# ==========================================
def get_color_for_string(s):
"""為字串生成一個穩定且美觀的 HSL 顏色"""
if not s:
return "hsl(0, 0%, 85%)"
hash_val = int(hashlib.md5(s.encode('utf-8'), usedforsecurity=False).hexdigest(), 16)
hue = hash_val % 360
return f"hsl({hue}, 60%, 88%)"
def load_scheduler_stats():
"""讀取排程統計資料"""
stats_path = os.path.join(BASE_DIR, 'data', 'scheduler_stats.json')
if os.path.exists(stats_path):
try:
with open(stats_path, 'r', encoding='utf-8') as f:
return json.load(f)
except (IOError, json.JSONDecodeError):
return {}
return {}
def _load_dashboard_system_status():
system_status = {"status": "UNKNOWN", "message": "尚無執行紀錄", "timestamp": "-"}
status_path = os.path.join(BASE_DIR, 'data/system_status.json')
if os.path.exists(status_path):
try:
with open(status_path, 'r', encoding='utf-8') as f:
system_status = json.load(f)
except Exception:
pass
return system_status
def _load_dashboard_categories(session):
try:
rows = session.execute(text("""
SELECT COALESCE(NULLIF(category, ''), '未分類') AS category, COUNT(*) AS count
FROM products
WHERE status = 'ACTIVE'
GROUP BY COALESCE(NULLIF(category, ''), '未分類')
ORDER BY COALESCE(NULLIF(category, ''), '未分類')
""")).mappings().all()
return [f"{row.get('category')} ({int(row.get('count') or 0)}筆)" for row in rows]
except Exception as exc:
sys_log.warning(f"[Dashboard] 分類清單讀取略過: {exc}")
return []
def _build_review_dashboard_items(session, review_queue, today_start):
"""只替 PChome 覆核當頁建立商品列,避免載入全站商品快取。"""
sku_order = [str(row.get('sku') or '') for row in review_queue if row.get('sku')]
if not sku_order:
return []
today_cutoff = today_start.replace(tzinfo=None) if getattr(today_start, 'tzinfo', None) else today_start
seven_day_cutoff = today_cutoff - timedelta(days=6)
stmt = text("""
SELECT
p.id,
p.i_code,
p.name,
p.category,
p.url,
p.image_url,
p.created_at,
latest_price.price AS current_price,
latest_price.timestamp AS current_timestamp,
yesterday_price.price AS yesterday_price,
week_price.price AS week_price
FROM products p
JOIN LATERAL (
SELECT pr.price, pr.timestamp
FROM price_records pr
WHERE pr.product_id = p.id
ORDER BY pr.timestamp DESC, pr.id DESC
LIMIT 1
) latest_price ON TRUE
LEFT JOIN LATERAL (
SELECT pr.price
FROM price_records pr
WHERE pr.product_id = p.id
AND pr.timestamp < :today_cutoff
ORDER BY pr.timestamp DESC, pr.id DESC
LIMIT 1
) yesterday_price ON TRUE
LEFT JOIN LATERAL (
SELECT pr.price
FROM price_records pr
WHERE pr.product_id = p.id
AND pr.timestamp < :seven_day_cutoff
ORDER BY pr.timestamp DESC, pr.id DESC
LIMIT 1
) week_price ON TRUE
WHERE p.i_code IN :skus
""").bindparams(bindparam("skus", expanding=True))
rows = session.execute(
stmt,
{
"skus": sku_order,
"today_cutoff": today_cutoff,
"seven_day_cutoff": seven_day_cutoff,
},
).mappings().all()
row_map = {str(row.get('i_code') or ''): row for row in rows}
review_map = {str(row.get('sku') or ''): row for row in review_queue if row.get('sku')}
items = []
for sku in sku_order:
row = row_map.get(sku)
review = review_map.get(sku) or {}
if not row:
continue
price = _to_float(review.get('momo_price')) or _to_float(row.get('current_price')) or 0
yesterday_price = _to_float(row.get('yesterday_price'))
week_price = _to_float(row.get('week_price'))
yesterday_diff = price - yesterday_price if yesterday_price is not None else 0
week_diff = price - week_price if week_price is not None else 0
product = SimpleNamespace(
id=row.get('id'),
i_code=sku,
name=row.get('name') or review.get('name') or '',
category=row.get('category') or review.get('category') or '',
url=row.get('url'),
image_url=row.get('image_url'),
created_at=row.get('created_at'),
)
record = SimpleNamespace(
product=product,
product_id=row.get('id'),
price=price,
timestamp=row.get('current_timestamp'),
)
status = "PRICE_UP" if yesterday_diff > 0 else ("PRICE_DOWN" if yesterday_diff < 0 else "NONE")
items.append({
'record': record,
'safe_product_url': normalize_momo_product_url(row.get('url'), sku) or _build_momo_product_url(sku),
'stats': {'7d_diff': week_diff, '30d_diff': 0, '1d_diff': 0},
'yesterday_diff': yesterday_diff,
'today_changes': [],
'status': status,
})
return items
def _enrich_dashboard_items_for_competitor_review(session, paged_items, review_queue_map, ai_pick_map=None):
ai_pick_map = ai_pick_map or {}
for item in paged_items:
item['safe_created_at'] = getattr(item['record'].product, 'created_at', None)
sku = str(item['record'].product.i_code)
item['ai_pick'] = ai_pick_map.get(sku)
item['pchome_review'] = review_queue_map.get(sku)
item['safe_momo_url'] = (
item.get('safe_product_url')
or normalize_momo_product_url(item['record'].product.url, sku)
or _build_momo_product_url(sku)
)
item['category_color'] = get_color_for_string(item['record'].product.category)
skus = [item['record'].product.i_code for item in paged_items]
pchome_map = _load_pchome_competitor_map(session, skus)
pchome_attempt_map = _load_pchome_match_attempt_map(session, skus)
pchome_ineligible_map = _load_pchome_ineligible_competitor_map(session, skus)
for item in paged_items:
product = item['record'].product
sku = str(product.i_code)
competitor = pchome_map.get(sku)
attempt = pchome_attempt_map.get(sku)
ineligible = pchome_ineligible_map.get(sku)
match_status = _build_pchome_match_status(attempt, ineligible=ineligible)
item['pchome_competitor'] = competitor
item['pchome_match_attempt'] = attempt
item['pchome_ineligible_competitor'] = ineligible
item['pchome_match_status'] = match_status
item['competitor_decision'] = _build_competitor_decision(
item['record'].price,
competitor.get('price') if competitor else None,
match_status=match_status,
)
return paged_items
def _load_cached_competitor_overview_for_review(now_taipei, review_queue, review_queue_total, review_status):
full_data = None
cached_data = _DASHBOARD_DATA_CACHE.get('full_data')
cached_ts = _DASHBOARD_DATA_CACHE.get('full_timestamp')
if cached_data and cached_ts and now_taipei.timestamp() - cached_ts < _DASHBOARD_STALE_CACHE_MAX_AGE:
full_data = cached_data
if not full_data:
full_data = _load_shared_full_dashboard_cache(now_taipei) or _load_stale_full_dashboard_cache(now_taipei)
overview = dict((full_data or {}).get('competitor_overview') or {})
if not overview:
overview = {
'total_active': 0,
'matched_count': 0,
'match_rate': 0,
'decision_ready_count': 0,
'decision_ready_rate': 0,
'decision_support_count': 0,
'decision_support_rate': 0,
'decision_support_non_exact_count': 0,
'catalog_comparable_count': 0,
'catalog_comparable_rate': 0,
'catalog_variant_review_count': 0,
'catalog_unit_review_count': 0,
'catalog_identity_review_count': 0,
'catalog_review_plan': {},
'unknown_freshness_count': 0,
'pchome_advantage_count': 0,
'momo_threat_count': 0,
'near_count': 0,
'pending_match_count': 0,
'ai_pick_count': 0,
'avg_advantage_gap': 0,
'last_pchome_crawled': None,
'top_picks': [],
'top_pchome_advantages': [],
'top_momo_threats': [],
'pending_priority': [],
'review_queue': [],
'review_status_counts': {},
}
status_counts = dict(overview.get('review_status_counts') or {})
status_counts['all'] = max(int(status_counts.get('all') or 0), int(review_queue_total or 0))
if review_status and review_status != 'all':
status_counts[review_status] = max(int(status_counts.get(review_status) or 0), int(review_queue_total or 0))
overview['review_status_counts'] = status_counts
overview['review_queue_count'] = max(
int(overview.get('review_queue_count') or 0),
int(review_queue_total or 0),
)
if not overview.get('review_queue'):
overview['review_queue'] = list(review_queue[:3])
overview.setdefault('unit_comparable_count', 0)
overview.setdefault('rescore_accepted_count', 0)
overview.setdefault('decision_support_count', overview.get('decision_ready_count') or 0)
overview.setdefault('decision_support_rate', overview.get('decision_ready_rate') or 0)
overview.setdefault('decision_support_non_exact_count', 0)
overview.setdefault('catalog_comparable_count', 0)
overview.setdefault('catalog_comparable_rate', 0)
overview.setdefault('catalog_variant_review_count', 0)
overview.setdefault('catalog_unit_review_count', 0)
overview.setdefault('catalog_identity_review_count', 0)
overview.setdefault('catalog_review_plan', {})
return overview
def _render_pchome_review_dashboard(
session,
*,
page,
per_page,
category_filter,
sort_by,
filter_type,
order,
review_status,
search_query,
now_taipei,
today_start_db,
):
fresh_review_context = _load_competitor_review_context(session, limit=12)
overview_hint = _load_cached_competitor_overview_for_review(
now_taipei,
[],
0,
review_status,
)
_merge_competitor_review_context(overview_hint, fresh_review_context)
count_total = (
review_status != 'all'
or bool(search_query)
or bool(_normalize_dashboard_category_filter(category_filter))
)
review_page = _load_competitor_review_page(
session,
page=page,
per_page=per_page,
search_query=search_query,
category_filter=category_filter,
review_status=review_status,
count_total=count_total,
)
review_queue = review_page.get('items') or []
review_queue_total = int(review_page.get('total') or 0)
review_total_is_estimated = False
if review_queue_total < 0:
review_queue_total = int(
(overview_hint.get('review_status_counts') or {}).get('all')
or overview_hint.get('review_queue_count')
or len(review_queue)
or 0
)
review_total_is_estimated = True
review_queue_map = {
str(row.get('sku') or ''): row
for row in review_queue
if row.get('sku')
}
paged_items = _build_review_dashboard_items(session, review_queue, today_start_db)
_enrich_dashboard_items_for_competitor_review(session, paged_items, review_queue_map)
competitor_overview = _load_cached_competitor_overview_for_review(
now_taipei,
review_queue,
review_queue_total,
review_status,
)
_merge_competitor_review_context(competitor_overview, fresh_review_context)
if review_queue:
competitor_overview['review_queue'] = review_queue[:3]
review_status_options = _build_review_status_options(competitor_overview)
total_pages = math.ceil(review_queue_total / per_page) if review_queue_total else 0
total_products_history = int(competitor_overview.get('total_active') or 0)
pchome_growth_command_center = _load_pchome_growth_command_center(session)
return render_template(
'dashboard_v2.html',
total_products=total_products_history,
today_new_products=0,
total_price_records=0,
cnt_increase=0,
cnt_decrease=0,
today_delisted_count=0,
today_delisted_items=[],
system_status=_load_dashboard_system_status(),
items=paged_items,
categories=_load_dashboard_categories(session),
current_page=page,
total_pages=total_pages,
total_items=review_queue_total,
datetime_now=now_taipei.strftime('%Y-%m-%d %H:%M:%S'),
today_date=now_taipei.strftime('%Y-%m-%d'),
public_url=public_url,
current_category=category_filter,
current_filter=filter_type,
current_review_status=review_status,
review_total_is_estimated=review_total_is_estimated,
review_status_options=review_status_options,
search_query=search_query,
current_sort=sort_by,
current_order=order,
ai_pick_summary=None,
scheduler_stats=load_scheduler_stats(),
avg_increase=0,
avg_decrease=0,
activity_rate=0,
active_count=0,
max_change_item=None,
max_change_value=0,
week_new_products=0,
stable_count=0,
most_active_category=None,
most_active_count=0,
competitor_overview=competitor_overview,
pchome_growth_command_center=pchome_growth_command_center,
ai_pick_list_limit=PRODUCT_PICK_LIST_LIMIT,
build_momo_product_url=_build_momo_product_url,
active_page='dashboard',
)
# ==========================================
# 核心數據函數
# ==========================================
def get_consolidated_data():
"""統一封裝:獲取全分類去重後的當前數據、昨日對比及差值 (帶快取)"""
global _DASHBOARD_DATA_CACHE
now = datetime.now(TAIPEI_TZ)
# V-Opt: 先檢查快取(無需鎖)
if (_DASHBOARD_DATA_CACHE['consolidated_data'] is not None and
_DASHBOARD_DATA_CACHE['consolidated_timestamp'] is not None):
cache_age = (now.timestamp() - _DASHBOARD_DATA_CACHE['consolidated_timestamp'])
if cache_age < _DASHBOARD_CACHE_TTL:
sys_log.debug(f"[Dashboard] [Cache] ✅ 使用快取資料 | 快取年齡: {cache_age:.1f}")
return _DASHBOARD_DATA_CACHE['consolidated_data'], _DASHBOARD_DATA_CACHE['today_start']
# V-Opt: 使用檔案鎖避免多 gunicorn worker 同時重建快取
# 注意: get_consolidated_data 通常由 get_full_dashboard_data 調用,
# 後者已持有 _DASHBOARD_FILE_LOCK因此這裡可以不重複鎖定
# 但為避免直接調用時的競爭問題,仍保留快取檢查邏輯
# 再次檢查快取(可能其他 worker 已經更新)
if (_DASHBOARD_DATA_CACHE['consolidated_data'] is not None and
_DASHBOARD_DATA_CACHE['consolidated_timestamp'] is not None):
cache_age = (now.timestamp() - _DASHBOARD_DATA_CACHE['consolidated_timestamp'])
if cache_age < _DASHBOARD_CACHE_TTL:
sys_log.debug(f"[Dashboard] [Cache] ✅ 使用快取資料 (其他 worker 已更新) | 快取年齡: {cache_age:.1f}")
return _DASHBOARD_DATA_CACHE['consolidated_data'], _DASHBOARD_DATA_CACHE['today_start']
sys_log.debug("[Dashboard] [Cache] 🔄 快取過期或不存在,重新查詢資料庫")
query_start_time = time.time()
db = DatabaseManager()
session = db.get_session()
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0) # 保持台北時區
seven_days_ago = today_start - timedelta(days=7)
thirty_days_ago = today_start - timedelta(days=30)
try:
# Query 1: Get the latest price record for every product
latest_price_subq = session.query(
func.max(PriceRecord.id).label('max_id')
).group_by(PriceRecord.product_id).subquery()
latest_records = session.query(PriceRecord).options(
joinedload(PriceRecord.product)
).join(latest_price_subq, PriceRecord.id == latest_price_subq.c.max_id).all()
product_ids = [r.product_id for r in latest_records]
if not product_ids:
session.close()
return [], today_start
# Query 2: Get yesterday's closing prices for all products
yesterday_prices_subq = session.query(
PriceRecord.product_id,
func.max(PriceRecord.id).label('max_id')
).filter(
PriceRecord.product_id.in_(product_ids),
PriceRecord.timestamp < today_start
).group_by(PriceRecord.product_id).subquery()
yesterday_prices_q = session.query(
PriceRecord.product_id, PriceRecord.price
).join(
yesterday_prices_subq,
PriceRecord.id == yesterday_prices_subq.c.max_id
)
yesterday_prices_map = {pid: price for pid, price in yesterday_prices_q}
# Query 3: Get specific historical price points (7 days ago and 30 days ago)
def get_price_map_before(target_date):
subq = session.query(
PriceRecord.product_id,
func.max(PriceRecord.timestamp).label('max_ts')
).filter(
PriceRecord.product_id.in_(product_ids),
PriceRecord.timestamp < target_date
).group_by(PriceRecord.product_id).subquery()
q = session.query(PriceRecord.product_id, PriceRecord.price).join(
subq,
and_(PriceRecord.product_id == subq.c.product_id, PriceRecord.timestamp == subq.c.max_ts)
)
return {pid: price for pid, price in q}
prices_7d_ago_map = get_price_map_before(seven_days_ago + timedelta(days=1))
prices_30d_ago_map = get_price_map_before(thirty_days_ago + timedelta(days=1))
# Query 4: Get TODAY's records only (for sparkline/intraday change)
today_records_q = session.query(PriceRecord).filter(
PriceRecord.product_id.in_(product_ids),
PriceRecord.timestamp >= today_start
).order_by(PriceRecord.product_id, PriceRecord.timestamp).all()
today_map = {}
for r in today_records_q:
if r.product_id not in today_map:
today_map[r.product_id] = []
today_map[r.product_id].append(r)
# Final Assembly
unique_items = []
for r in latest_records:
pid = r.product_id
product = r.product
safe_product_url = normalize_momo_product_url(getattr(product, 'url', None), getattr(product, 'i_code', ''))
price_7d = prices_7d_ago_map.get(pid)
price_30d = prices_30d_ago_map.get(pid)
stats_7d_diff = r.price - price_7d if price_7d is not None else 0
stats_30d_diff = r.price - price_30d if price_30d is not None else 0
today_records = today_map.get(pid, [])
today_diff = 0
today_changes = []
if len(today_records) > 1:
today_diff = today_records[-1].price - today_records[0].price
y_price = yesterday_prices_map.get(pid)
yesterday_diff = r.price - y_price if y_price is not None else 0
status = "NONE"
if yesterday_diff > 0:
status = "PRICE_UP"
elif yesterday_diff < 0:
status = "PRICE_DOWN"
last_p = y_price if y_price is not None else (today_records[0].price if today_records else r.price)
for tr in today_records:
if tr.price != last_p:
diff = tr.price - last_p
today_changes.append({
'time': tr.timestamp.strftime('%H:%M'),
'price': tr.price,
'diff': diff
})
last_p = tr.price
unique_items.append({
'record': r,
'safe_product_url': safe_product_url or _build_momo_product_url(getattr(product, 'i_code', '')),
'stats': {'7d_diff': stats_7d_diff, '30d_diff': stats_30d_diff, '1d_diff': today_diff},
'yesterday_diff': yesterday_diff,
'today_changes': today_changes,
'status': status
})
# 更新快取
_DASHBOARD_DATA_CACHE['consolidated_data'] = unique_items
_DASHBOARD_DATA_CACHE['consolidated_timestamp'] = now.timestamp()
_DASHBOARD_DATA_CACHE['today_start'] = today_start
query_duration_ms = (time.time() - query_start_time) * 1000
track_query_time('get_consolidated_data', query_duration_ms)
sys_log.debug(f"[Dashboard] [Cache] 快取已更新 | 商品數: {len(unique_items)} | 耗時: {query_duration_ms:.0f}ms")
return unique_items, today_start
finally:
session.close()
def get_full_dashboard_data(force_rebuild=False):
"""獲取完整的看板資料,包含快取清單與全部 KPIs (深度快取)"""
global _DASHBOARD_DATA_CACHE
now = datetime.now(TAIPEI_TZ)
# V-Opt: 先檢查快取(無需鎖)
if not force_rebuild and _DASHBOARD_DATA_CACHE.get('full_data') and _DASHBOARD_DATA_CACHE.get('full_timestamp'):
age = now.timestamp() - _DASHBOARD_DATA_CACHE['full_timestamp']
if age < _DASHBOARD_CACHE_TTL:
sys_log.debug(f"[Dashboard] [Cache] ✅ 使用完整看板快取 | 快取年齡: {age:.0f}")
return _DASHBOARD_DATA_CACHE['full_data']
shared_full_data = None if force_rebuild else _load_shared_full_dashboard_cache(now)
if shared_full_data:
return shared_full_data
expired_shared_data = None if force_rebuild else _load_expired_shared_full_dashboard_cache(now)
if expired_shared_data:
_trigger_dashboard_background_refresh('expired_shared_cache')
return expired_shared_data
# V-Opt: 使用檔案鎖避免多 gunicorn worker 同時計算
dashboard_lock = _new_dashboard_file_lock()
lock_acquired = dashboard_lock.acquire(blocking=False)
if not lock_acquired:
# 其他 worker 正在重建時,先用舊快取救首屏,避免使用者第一次打開卡住。
stale_full_data = None if force_rebuild else _load_stale_full_dashboard_cache(now)
if stale_full_data:
return stale_full_data
sys_log.info("[Dashboard] [Cache] ⏳ 等待其他 worker 重建快取...")
deadline = time.time() + 5
while time.time() < deadline:
time.sleep(0.25)
shared_full_data = None if force_rebuild else _load_shared_full_dashboard_cache(now)
if shared_full_data:
return shared_full_data
dashboard_lock = _new_dashboard_file_lock()
lock_acquired = dashboard_lock.acquire(blocking=False)
if lock_acquired:
break
shared_full_data = None if force_rebuild else _load_shared_full_dashboard_cache(now)
if shared_full_data:
return shared_full_data
if not lock_acquired:
stale_full_data = None if force_rebuild else _load_stale_full_dashboard_cache(now)
if stale_full_data:
return stale_full_data
dashboard_lock = _new_dashboard_file_lock()
lock_acquired = dashboard_lock.acquire(blocking=True)
try:
# 再次檢查快取(可能其他 worker 已經更新)
if not force_rebuild and _DASHBOARD_DATA_CACHE.get('full_data') and _DASHBOARD_DATA_CACHE.get('full_timestamp'):
age = now.timestamp() - _DASHBOARD_DATA_CACHE['full_timestamp']
if age < _DASHBOARD_CACHE_TTL:
sys_log.debug(f"[Dashboard] [Cache] ✅ 使用完整看板快取 (其他 worker 已更新) | 快取年齡: {age:.0f}")
return _DASHBOARD_DATA_CACHE['full_data']
shared_full_data = None if force_rebuild else _load_shared_full_dashboard_cache(now)
if shared_full_data:
return shared_full_data
sys_log.info("[Dashboard] [Cache] 🔄 完整快取過期,重新計算所有 KPIs 與統計數據...")
query_start_time = time.time()
unique_items, today_start = get_consolidated_data()
today_start_db = today_start # 保持台北時區
db = DatabaseManager()
session = db.get_session()
try:
# A. 基礎清單統計
increase_items = [item for item in unique_items if item['yesterday_diff'] > 0]
decrease_items = [item for item in unique_items if item['yesterday_diff'] < 0]
# B. 分類筆數統計
cat_counts = {}
for item in unique_items:
c = item['record'].product.category
if c:
cat_counts[c] = cat_counts.get(c, 0) + 1
all_categories = [f"{cat} ({count}筆)" for cat, count in sorted(cat_counts.items())]
# C. 核心 KPI 統計
total_products_history = session.query(Product).count()
total_price_records = session.query(PriceRecord).count()
today_updates = session.query(PriceRecord).filter(PriceRecord.timestamp >= today_start_db).count()
# 今日新增商品
new_pids_query = session.query(PriceRecord.product_id).group_by(
PriceRecord.product_id
).having(func.min(PriceRecord.timestamp) >= today_start_db)
new_product_ids = {r[0] for r in new_pids_query.all()}
today_new_products = len(new_product_ids)
# D. 今日下架商品處理
raw_delisted_items = session.query(Product).filter(
Product.status == 'INACTIVE',
Product.updated_at >= today_start_db
).all()
today_delisted_items = []
if raw_delisted_items:
delisted_ids = [p.id for p in raw_delisted_items]
last_prices_subq = session.query(
PriceRecord.product_id,
func.max(PriceRecord.id).label('max_id')
).filter(PriceRecord.product_id.in_(delisted_ids)).group_by(PriceRecord.product_id).subquery()
last_prices_q = session.query(PriceRecord.product_id, PriceRecord.price).join(
last_prices_subq, PriceRecord.id == last_prices_subq.c.max_id).all()
price_map = {pid: price for pid, price in last_prices_q}
for p in raw_delisted_items:
today_delisted_items.append({'product': p, 'last_price': price_map.get(p.id, 0)})
# E. 週增長
week_ago_db = now.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=7)
week_new_products = session.query(func.count(Product.id)).filter(
Product.id.in_(
session.query(PriceRecord.product_id)
.group_by(PriceRecord.product_id)
.having(func.min(PriceRecord.timestamp) >= week_ago_db)
)
).scalar() or 0
# F. 價格穩定商品數
try:
stable_count = session.query(PriceRecord.product_id).filter(
PriceRecord.timestamp >= week_ago_db
).group_by(PriceRecord.product_id).having(
func.count(func.distinct(PriceRecord.price)) == 1
).count()
except Exception:
stable_count = 0
# G. 最大變動計算
max_change_item = None
max_change_value = 0
for item in unique_items:
if abs(item['yesterday_diff']) > abs(max_change_value):
max_change_value = item['yesterday_diff']
max_change_item = item
# H. 最活躍分類
category_activity = {}
for item in increase_items + decrease_items:
cat = item['record'].product.category
if cat:
category_activity[cat] = category_activity.get(cat, 0) + 1
most_active_category_item = max(category_activity.items(), key=lambda x: x[1]) if category_activity else (None, 0)
# I. 組合結果
full_data = {
'unique_items': unique_items,
'today_start': today_start,
'today_start_db': today_start_db,
'increase_items_all': increase_items,
'decrease_items_all': decrease_items,
'all_categories': all_categories,
'new_product_ids': new_product_ids,
'total_products_history': total_products_history,
'total_price_records': total_price_records,
'today_updates': today_updates,
'today_new_products': today_new_products,
'today_delisted_count': len(raw_delisted_items),
'today_delisted_items': today_delisted_items,
'max_change_item': max_change_item,
'max_change_value': max_change_value,
'avg_increase': sum(item['yesterday_diff'] for item in increase_items) / len(increase_items) if increase_items else 0,
'avg_decrease': sum(item['yesterday_diff'] for item in decrease_items) / len(decrease_items) if decrease_items else 0,
'activity_rate': (len(increase_items) + len(decrease_items)) / total_products_history * 100 if total_products_history > 0 else 0,
'active_count': len(increase_items) + len(decrease_items),
'week_new_products': week_new_products,
'stable_count': stable_count,
'most_active_category': most_active_category_item[0],
'most_active_count': most_active_category_item[1]
}
full_data['competitor_overview'] = _load_competitor_decision_overview(session, unique_items)
# 更新快取
_DASHBOARD_DATA_CACHE['full_data'] = full_data
_DASHBOARD_DATA_CACHE['full_timestamp'] = now.timestamp()
_write_shared_full_dashboard_cache(full_data)
query_duration_ms = (time.time() - query_start_time) * 1000
track_query_time('get_full_dashboard_data', query_duration_ms)
sys_log.info(f"[Dashboard] [Cache] ✅ 完整看板快取已更新 | 耗時: {query_duration_ms:.0f}ms")
return full_data
except Exception as e:
sys_log.error(f"[Dashboard] KPI 計算失敗: {e}")
import traceback
traceback.print_exc()
return None
finally:
session.close()
finally:
# V-Opt: 確保釋放檔案鎖
if lock_acquired:
dashboard_lock.release()
def get_dashboard_stats():
"""計算看板統計數據 (供通知使用) — backward-compat wrapper"""
from services.dashboard_service import get_dashboard_stats as _get_dashboard_stats
return _get_dashboard_stats()
# ==========================================
# 頁面路由
# ==========================================
@dashboard_bp.route('/api/pchome-review/<sku>/decision', methods=['POST'])
@login_required
def record_pchome_review_decision(sku):
"""Record an operator decision for a PChome comparison candidate."""
payload = request.get_json(silent=True) or request.form or {}
action = payload.get('action') or ''
reason = payload.get('reason') or ''
user = get_current_user() or {}
reviewer = user.get('username') or user.get('client_ip') or 'dashboard'
db = DatabaseManager()
session = db.get_session()
try:
from services.cache_manager import clear_dashboard_cache
from services.competitor_intel_repository import clear_competitor_intel_cache
from services.competitor_match_review_service import record_competitor_match_review
result = record_competitor_match_review(
session.get_bind(),
sku=sku,
review_action=action,
reviewer_identity=reviewer,
review_reason=reason,
source='pchome',
)
if result.get('success'):
clear_dashboard_cache()
clear_competitor_intel_cache()
return jsonify(result)
return jsonify(result), 400
except Exception as exc:
sys_log.error(f"[Dashboard] PChome 覆核決策寫入失敗 | sku={sku} action={action} error={exc}")
return jsonify({'success': False, 'message': f'覆核寫入失敗:{exc}'}), 500
finally:
session.close()
@dashboard_bp.route('/api/pchome-review/queue')
@login_required
def get_pchome_review_queue_api():
"""Read-only PChome review queue API for smoke tests and operator tools."""
page = request.args.get('page', 1, type=int) or 1
per_page = request.args.get('per_page', type=int) or request.args.get('limit', type=int) or 50
category_filter = request.args.get('category', 'all')
review_status = _normalize_review_status_filter(
request.args.get('review_status') or request.args.get('status') or 'all'
)
search_query = request.args.get('q', request.args.get('search', '')).strip()
normalized_category = _normalize_dashboard_category_filter(category_filter)
count_total = (
review_status != 'all'
or bool(search_query)
or bool(normalized_category)
)
if str(request.args.get('count_total') or '').lower() in {'1', 'true', 'yes', 'on'}:
count_total = True
db = DatabaseManager()
session = db.get_session()
try:
payload = _load_competitor_review_page(
session,
page=page,
per_page=per_page,
search_query=search_query,
category_filter=category_filter,
review_status=review_status,
count_total=count_total,
)
total = int(payload.get('total') if payload.get('total') is not None else 0)
return jsonify({
'success': True,
'items': payload.get('items') or [],
'total': total,
'total_is_estimated': total < 0,
'page': int(payload.get('page') or page),
'per_page': int(payload.get('per_page') or per_page),
'review_status': review_status,
'category': normalized_category or 'all',
'q': search_query,
})
except Exception as exc:
sys_log.error(f"[Dashboard] PChome 覆核隊列 API 讀取失敗 | error={exc}")
return jsonify({'success': False, 'message': f'覆核隊列讀取失敗:{exc}'}), 500
finally:
session.close()
@dashboard_bp.route('/')
@login_required
def index():
"""商品看板首頁"""
db = DatabaseManager()
session = db.get_session()
page = request.args.get('page', 1, type=int)
category_filter = request.args.get('category', 'all')
sort_by = request.args.get('sort_by', 'timestamp')
filter_type = request.args.get('filter', 'all')
order = request.args.get('order', 'desc')
review_status = _normalize_review_status_filter(request.args.get('review_status', 'all'))
search_query = request.args.get('q', '').strip()
per_page = 50
now_taipei = datetime.now(TAIPEI_TZ)
today_start_db = now_taipei.replace(hour=0, minute=0, second=0, microsecond=0) # 保持台北時區
try:
if filter_type == 'pchome_review':
return _render_pchome_review_dashboard(
session,
page=page,
per_page=per_page,
category_filter=category_filter,
sort_by=sort_by,
filter_type=filter_type,
order=order,
review_status=review_status,
search_query=search_query,
now_taipei=now_taipei,
today_start_db=today_start_db,
)
# 使用深度快取獲取所有數據
data = get_full_dashboard_data()
if not data:
return render_template('index.html', error="無法載入數據,請檢查資料庫。")
unique_items = data['unique_items']
today_start = data['today_start']
today_start_db = data['today_start_db']
increase_items = data['increase_items_all']
decrease_items = data['decrease_items_all']
all_categories = data['all_categories']
new_product_ids = data['new_product_ids']
total_products_history = data['total_products_history']
today_new_products = data['today_new_products']
total_price_records = data['total_price_records']
today_updates = data['today_updates']
today_delisted_count = data['today_delisted_count']
today_delisted_items = data['today_delisted_items']
max_change_item = data['max_change_item']
max_change_value = data['max_change_value']
avg_increase = data['avg_increase']
avg_decrease = data['avg_decrease']
activity_rate = data['activity_rate']
week_new_products = data['week_new_products']
stable_count = data['stable_count']
most_active_category = data['most_active_category']
most_active_count = data['most_active_count']
active_count = data.get('active_count', 0)
# 讀取系統狀態
system_status = _load_dashboard_system_status()
# 後端篩選
scheduler_stats = load_scheduler_stats()
# Handle old scheduler stats format
if scheduler_stats.get('momo_task') and isinstance(scheduler_stats.get('momo_task'), dict):
scheduler_stats['momo_task'] = [scheduler_stats['momo_task']]
if scheduler_stats.get('edm_task') and isinstance(scheduler_stats.get('edm_task'), dict):
scheduler_stats['edm_task'] = [scheduler_stats['edm_task']]
filtered_items = []
ai_pick_skus = []
ai_pick_map = {}
ai_pick_summary = None
review_queue = []
review_queue_map = {}
review_queue_order = {}
review_queue_total = 0
if filter_type == 'ai_picks':
ai_pick_skus, ai_pick_map = _load_ai_pick_selection(session, PRODUCT_PICK_LIST_LIMIT)
ai_pick_summary = _summarize_ai_pick_selection(ai_pick_map)
elif filter_type == 'pchome_review':
review_page = _load_competitor_review_page(
session,
page=page,
per_page=per_page,
search_query=search_query,
category_filter=category_filter,
review_status=review_status,
)
review_queue = review_page.get('items') or []
review_queue_total = int(review_page.get('total') or len(review_queue))
review_queue_map = {
str(row.get('sku') or ''): row
for row in review_queue
if row.get('sku')
}
review_queue_order = {
sku: idx
for idx, sku in enumerate(review_queue_map.keys(), start=1)
}
# 先處理搜尋
if search_query:
search_lower = search_query.lower()
base_items = [
item for item in unique_items
if (item['record'].product.name and search_lower in item['record'].product.name.lower()) or
(item['record'].product.i_code and search_lower in str(item['record'].product.i_code))
]
else:
base_items = unique_items
# 處理狀態篩選
if filter_type == 'increase':
filtered_items = [i for i in base_items if i in increase_items]
elif filter_type == 'decrease':
filtered_items = [i for i in base_items if i in decrease_items]
elif filter_type == 'new':
filtered_items = [i for i in base_items if i['record'].product_id in new_product_ids]
elif filter_type == 'ai_picks':
pick_set = set(ai_pick_skus)
filtered_items = [
i for i in base_items
if str(i['record'].product.i_code) in pick_set
]
elif filter_type == 'pchome_review':
review_set = set(review_queue_map.keys())
filtered_items = [
i for i in base_items
if str(i['record'].product.i_code) in review_set
]
elif filter_type == 'delisted':
for item in today_delisted_items:
class DelistedRecord:
def __init__(self, p, price):
self.product = p
self.price = price
self.timestamp = p.updated_at
if not search_query or search_query.lower() in item['product'].name.lower():
filtered_items.append({
'record': DelistedRecord(item['product'], item['last_price']),
'stats': {'1d_diff': 0, '7d_diff': 0, '30d_diff': 0},
'yesterday_diff': 0,
'today_changes': [],
'status': 'DELISTED'
})
else:
if category_filter != 'all':
real_category = _normalize_dashboard_category_filter(category_filter)
filtered_items = [item for item in base_items if item['record'].product.category == real_category]
else:
filtered_items = base_items
# 後端排序
reverse = (order == 'desc')
def get_sort_key(item):
def safe_get(value, default=0):
return default if value is None else value
if sort_by == 'i_code':
return int(safe_get(item['record'].product.i_code, 0))
if sort_by == 'category':
return safe_get(item['record'].product.category, '')
if sort_by == 'name':
return safe_get(item['record'].product.name, '')
if sort_by == 'price':
return safe_get(item['record'].price, 0)
if sort_by == 'today_change':
return safe_get(item['stats']['1d_diff'], 0)
if sort_by == 'yesterday_change':
return safe_get(item['yesterday_diff'], 0)
if sort_by == 'week_change':
return safe_get(item['stats']['7d_diff'], 0)
if filter_type == 'ai_picks':
sku = str(item['record'].product.i_code)
return -ai_pick_map.get(sku, {}).get('rank', 9999)
if filter_type == 'pchome_review':
sku = str(item['record'].product.i_code)
return -review_queue_order.get(sku, 9999)
return item['record'].timestamp
sorted_items = sorted(filtered_items, key=get_sort_key, reverse=reverse)
# 分頁
if filter_type == 'pchome_review':
total_items = review_queue_total
total_pages = math.ceil(total_items / per_page)
paged_items = sorted_items
else:
total_items = len(sorted_items)
total_pages = math.ceil(total_items / per_page)
start_idx = (page - 1) * per_page
paged_items = sorted_items[start_idx: start_idx + per_page]
# 為前端準備安全的 created_at 屬性
for item in paged_items:
item['safe_created_at'] = getattr(item['record'].product, 'created_at', None)
sku = str(item['record'].product.i_code)
item['ai_pick'] = ai_pick_map.get(sku)
item['pchome_review'] = review_queue_map.get(sku)
item['safe_momo_url'] = (
item.get('safe_product_url')
or normalize_momo_product_url(item['record'].product.url, sku)
or _build_momo_product_url(sku)
)
# 為當前頁面項目添加顏色
for item in paged_items:
category_name = item['record'].product.category
item['category_color'] = get_color_for_string(category_name)
pchome_map = _load_pchome_competitor_map(
session,
[item['record'].product.i_code for item in paged_items]
)
pchome_attempt_map = _load_pchome_match_attempt_map(
session,
[item['record'].product.i_code for item in paged_items]
)
pchome_ineligible_map = _load_pchome_ineligible_competitor_map(
session,
[item['record'].product.i_code for item in paged_items]
)
for item in paged_items:
product = item['record'].product
sku = str(product.i_code)
competitor = pchome_map.get(sku)
attempt = pchome_attempt_map.get(sku)
ineligible = pchome_ineligible_map.get(sku)
match_status = _build_pchome_match_status(attempt, ineligible=ineligible)
item['pchome_competitor'] = competitor
item['pchome_match_attempt'] = attempt
item['pchome_ineligible_competitor'] = ineligible
item['pchome_match_status'] = match_status
item['competitor_decision'] = _build_competitor_decision(
item['record'].price,
competitor.get('price') if competitor else None,
match_status=match_status,
)
competitor_overview = data.get('competitor_overview')
if not competitor_overview:
competitor_overview = _load_competitor_decision_overview(session, unique_items)
else:
_merge_competitor_review_context(
competitor_overview,
_load_competitor_review_context(session, limit=12),
)
data['competitor_overview'] = competitor_overview
_DASHBOARD_DATA_CACHE['full_data'] = data
_write_shared_full_dashboard_cache(data)
review_status_options = _build_review_status_options(competitor_overview)
pchome_growth_command_center = _load_pchome_growth_command_center(session)
template_name = 'dashboard_v2.html'
return render_template(template_name,
total_products=total_products_history,
today_new_products=today_new_products,
total_price_records=total_price_records,
cnt_increase=len(increase_items),
cnt_decrease=len(decrease_items),
today_delisted_count=today_delisted_count,
today_delisted_items=today_delisted_items,
system_status=system_status,
items=paged_items,
categories=all_categories,
current_page=page,
total_pages=total_pages,
total_items=total_items,
datetime_now=now_taipei.strftime('%Y-%m-%d %H:%M:%S'),
today_date=now_taipei.strftime('%Y-%m-%d'),
public_url=public_url,
current_category=category_filter,
current_filter=filter_type,
current_review_status=review_status,
review_status_options=review_status_options,
search_query=search_query,
current_sort=sort_by,
current_order=order,
ai_pick_summary=ai_pick_summary,
scheduler_stats=scheduler_stats,
avg_increase=avg_increase,
avg_decrease=avg_decrease,
activity_rate=activity_rate,
active_count=active_count,
max_change_item=max_change_item,
max_change_value=max_change_value,
week_new_products=week_new_products,
stable_count=stable_count,
most_active_category=most_active_category,
most_active_count=most_active_count,
competitor_overview=competitor_overview,
pchome_growth_command_center=pchome_growth_command_center,
ai_pick_list_limit=PRODUCT_PICK_LIST_LIMIT,
build_momo_product_url=_build_momo_product_url,
active_page='dashboard')
except Exception as e:
sys_log.error(f"[Web] [Dashboard] 渲染錯誤 | Error: {e}")
return f"系統維護中,錯誤詳情:{e}"
finally:
session.close()