feat(dashboard): optimize cache and AI pick confidence
All checks were successful
CD Pipeline / deploy (push) Successful in 2m46s

This commit is contained in:
OoO
2026-05-01 16:01:52 +08:00
parent 0334051aa7
commit 3920701e1a
9 changed files with 198 additions and 27 deletions

View File

@@ -2,7 +2,7 @@
> 本文件定義專案開發的核心準則與不可違反的規範
> **建立日期**: 2026-01-12
> **當前版本**: V10.59 (Dashboard AI pick list can export 50-item action workbook)
> **當前版本**: V10.61 (Dashboard speed cache and evidence-based AI pick confidence)
> **最後更新**: 2026-05-01
---

4
app.py
View File

@@ -95,8 +95,8 @@ except Exception as e:
sys_log.error(f"無法檢測磁碟空間: {e}")
# 🚩 系統版本定義 (備份與顯示用)
# 🚩 2026-05-01 V10.59: Dashboard AI pick list can export 50-item action workbook
SYSTEM_VERSION = "V10.59"
# 🚩 2026-05-01 V10.61: Dashboard speed cache and evidence-based AI pick confidence
SYSTEM_VERSION = "V10.61"
# ==========================================
# 🔒 SQL Injection 防護函數

View File

@@ -254,7 +254,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
# ==========================================
# 系統版本與路徑
# ==========================================
SYSTEM_VERSION = "V10.59"
SYSTEM_VERSION = "V10.61"
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = PUBLIC_URL # 用於模板顯示

View File

@@ -30,14 +30,14 @@ SQL漏斗(~300筆)
`services/ai_product_pick_agent.py` 新增 PChome 銷售用挑品 Agent
- 只讀真實資料表:`products``price_records``competitor_prices``competitor_price_history`,若 `daily_sales_snapshot` 可用則納入近 7 天銷售額數量。
- 將 PChome 比 MOMO 有價格優勢、比對信心足夠、且有歷史快照或銷售動能的品項寫入 `ai_price_recommendations`
- 只讀真實資料表:`products`、`price_records`、`competitor_prices`、`competitor_price_history`,若 `daily_sales_snapshot` 可用則納入近 7 天銷售額數量、毛利或成本推算毛利率。
- 將 PChome 比 MOMO 有價格優勢、比對信心足夠、且有歷史快照或銷售動能的品項寫入 `ai_price_recommendations`。信心度不以固定倍率灌高,而是由商機分數與證據完整度共同決定,證據包含 PChome match score、歷史快照、銷售/毛利、PChome 商品 ID/名稱、抓取時間與促銷/評價/庫存標籤。
- 寫入策略使用 `strategy='product_pick'`,保留在既有 AI 決策表,不新增假頁面或暫存 JSON。
- 後台入口:`POST /api/ai/product-picks/generate``/ai_intelligence` 可手動產生清單。
- 配對來源仍以 PChome crawler 真實搜尋結果為準;無競品資料時不生成挑品。
- 比對覆蓋率補強入口:`POST /api/ai/pchome-match/backfill`,優先補抓仍無有效 PChome 配對的高價 ACTIVE 商品,完成後自動重算 AI 挑品清單。
- 排程閉環:`run_pchome_match_backfill_task` 每日 10:30 執行,補抓 PChome 待比對商品、寫入歷史價格,再重算 `strategy='product_pick'` 清單。
- 商品看板第一屏:`/` 的 V2 看板直接以 `products``price_records``competitor_prices``ai_price_recommendations` 顯示比對覆蓋率、PChome 優勢、MOMO 威脅、AI 挑品與待比對優先清單;`filter=ai_picks` 可查看 50 品 AI 挑品列表,並在列表上方顯示平均信心、平均價差、最大價差與估算總價差空間,列表列內顯示 AI 排名與建議理由,且可透過 `/api/export/excel/ai-picks` 匯出 50 品 Excel 操作清單。
- 商品看板第一屏:`/` 的 V2 看板直接以 `products`、`price_records`、`competitor_prices`、`ai_price_recommendations` 顯示比對覆蓋率、PChome 優勢、MOMO 威脅、AI 挑品與待比對優先清單;`filter=ai_picks` 可查看 50 品 AI 挑品列表,並在列表上方顯示平均信心、平均價差、最大價差與估算總價差空間,列表列內顯示 AI 排名與建議理由,且可透過 `/api/export/excel/ai-picks` 匯出 50 品 Excel 操作清單。商品看板深度快取同時寫入 `data/dashboard_full_cache.pkl`,供多個 Gunicorn worker 共用,避免部署後各 worker 重複重建 7,000+ 商品統計造成開頁變慢;所有資料異動仍透過 `clear_dashboard_cache()` 同步清除記憶體與共享快取。
| 角色 | 模型 | 主機 | 成本 | 每日限額 |
|------|------|------|------|---------|

View File

@@ -10,6 +10,7 @@ import json
import math
import time
import hashlib
import pickle
from datetime import datetime, timezone, timedelta
from flask import Blueprint, request, render_template
from sqlalchemy import func, and_, text, bindparam
@@ -20,7 +21,11 @@ from config import BASE_DIR, SYSTEM_VERSION, public_url
from database.manager import DatabaseManager
from database.models import Product, PriceRecord
from services.logger_manager import SystemLogger
from services.cache_manager import _DASHBOARD_DATA_CACHE, _DASHBOARD_CACHE_TTL
from services.cache_manager import (
_DASHBOARD_DATA_CACHE,
_DASHBOARD_CACHE_TTL,
_DASHBOARD_SHARED_CACHE_FILE,
)
# 時區設定
TAIPEI_TZ = timezone(timedelta(hours=8))
@@ -515,6 +520,62 @@ class FileLock:
_DASHBOARD_FILE_LOCK = FileLock(_DASHBOARD_LOCK_FILE)
def _load_shared_full_dashboard_cache(now):
    """Load the cross-worker dashboard cache from the shared pickle file.

    On a usable hit the payload is also copied into the in-process
    ``_DASHBOARD_DATA_CACHE`` so later calls can be answered from memory.

    Args:
        now: Object exposing ``timestamp()``; used to compute the age of the
            cached payload (presumably a ``datetime`` -- confirm at callers).

    Returns:
        The cached ``full_data`` when the file exists, unpickles cleanly,
        carries truthy ``full_timestamp`` / ``full_data`` entries and is
        younger than ``_DASHBOARD_CACHE_TTL``; otherwise ``None``.
        This function never raises: any read/parse problem is logged and
        reported as a miss so the caller can rebuild from the database.
    """
    cache_file = str(_DASHBOARD_SHARED_CACHE_FILE)
    try:
        # EAFP instead of os.path.exists(): the file may be removed by
        # another worker between a check and the open() call.
        # NOTE(security): pickle.load executes arbitrary code from the file;
        # this is only acceptable while the cache file is written exclusively
        # by this application and its directory is not writable by others.
        with open(cache_file, 'rb') as f:
            payload = pickle.load(f)

        full_timestamp = payload.get('full_timestamp')
        full_data = payload.get('full_data')
        if not full_timestamp or not full_data:
            return None

        age = now.timestamp() - full_timestamp
        if age >= _DASHBOARD_CACHE_TTL:
            return None

        # Mirror the shared payload into this worker's memory cache.
        _DASHBOARD_DATA_CACHE['full_data'] = full_data
        _DASHBOARD_DATA_CACHE['full_timestamp'] = full_timestamp
        _DASHBOARD_DATA_CACHE['consolidated_data'] = payload.get('consolidated_data')
        _DASHBOARD_DATA_CACHE['consolidated_timestamp'] = payload.get('consolidated_timestamp')
        _DASHBOARD_DATA_CACHE['today_start'] = payload.get('today_start')
        sys_log.debug(f"[Dashboard] [Cache] ✅ 使用共享完整看板快取 | 快取年齡: {age:.0f}")
        return full_data
    except FileNotFoundError:
        # A missing file is the normal "no shared cache yet / just cleared"
        # case, not a failure worth a warning.
        return None
    except Exception as exc:
        # Deliberate best-effort boundary: a corrupt or incompatible cache
        # file must never break the dashboard, only force a rebuild.
        sys_log.warning(f"[Dashboard] [Cache] 共享快取讀取失敗,改走資料庫重建: {exc}")
        return None
def _write_shared_full_dashboard_cache(full_data):
    """Persist the dashboard cache to the file shared between workers.

    The payload is pickled into a per-process temporary file and then moved
    onto the final path with ``os.replace``, so the final path is only ever
    swapped in one step rather than written incrementally.

    Args:
        full_data: The freshly computed dashboard data to store under the
            ``full_data`` key; the remaining payload entries are copied from
            the in-process ``_DASHBOARD_DATA_CACHE``.

    Returns:
        None. Failures are logged as warnings and otherwise ignored; the
        in-memory cache is left untouched either way.
    """
    target_path = str(_DASHBOARD_SHARED_CACHE_FILE)
    scratch_path = f"{target_path}.{os.getpid()}.tmp"

    # Build the payload in the same key order as it is read back.
    snapshot = {'full_data': full_data}
    for mirrored_key in (
        'full_timestamp',
        'consolidated_data',
        'consolidated_timestamp',
        'today_start',
    ):
        snapshot[mirrored_key] = _DASHBOARD_DATA_CACHE.get(mirrored_key)

    try:
        os.makedirs(os.path.dirname(target_path), exist_ok=True)
        with open(scratch_path, 'wb') as handle:
            pickle.dump(snapshot, handle, protocol=pickle.HIGHEST_PROTOCOL)
        os.replace(scratch_path, target_path)
    except Exception as exc:
        sys_log.warning(f"[Dashboard] [Cache] 共享快取寫入失敗,仍保留記憶體快取: {exc}")
        # Best-effort cleanup of a leftover temp file; a missing file raises
        # FileNotFoundError, which is an OSError and is ignored like any
        # other removal problem.
        try:
            os.remove(scratch_path)
        except OSError:
            pass
# 慢查詢監控
_SLOW_QUERY_STATS = {
'total_queries': 0,
@@ -744,15 +805,31 @@ def get_full_dashboard_data():
sys_log.debug(f"[Dashboard] [Cache] ✅ 使用完整看板快取 | 快取年齡: {age:.0f}")
return _DASHBOARD_DATA_CACHE['full_data']
shared_full_data = _load_shared_full_dashboard_cache(now)
if shared_full_data:
return shared_full_data
# V-Opt: 使用檔案鎖避免多 gunicorn worker 同時計算
if not _DASHBOARD_FILE_LOCK.acquire(blocking=False):
lock_acquired = _DASHBOARD_FILE_LOCK.acquire(blocking=False)
if not lock_acquired:
# 如果無法取得鎖,表示其他 worker 正在重建,等待並使用更新後的快取
sys_log.debug("[Dashboard] [Cache] ⏳ 等待其他 worker 重建快取...")
_DASHBOARD_FILE_LOCK.acquire() # 等待取得鎖
_DASHBOARD_FILE_LOCK.release() # 立即釋放
# 返回更新後的快取
if _DASHBOARD_DATA_CACHE.get('full_data'):
return _DASHBOARD_DATA_CACHE['full_data']
shared_full_data = _load_shared_full_dashboard_cache(now)
if shared_full_data:
return shared_full_data
if _DASHBOARD_DATA_CACHE.get('full_data') and _DASHBOARD_DATA_CACHE.get('full_timestamp'):
age = now.timestamp() - _DASHBOARD_DATA_CACHE['full_timestamp']
if age < _DASHBOARD_CACHE_TTL:
return _DASHBOARD_DATA_CACHE['full_data']
lock_acquired = _DASHBOARD_FILE_LOCK.acquire()
if not lock_acquired:
sys_log.warning("[Dashboard] [Cache] 共享鎖取得失敗,改用無鎖重建")
shared_full_data = _load_shared_full_dashboard_cache(now)
if shared_full_data:
return shared_full_data
try:
# 再次檢查快取(可能其他 worker 已經更新)
@@ -762,6 +839,10 @@ def get_full_dashboard_data():
sys_log.debug(f"[Dashboard] [Cache] ✅ 使用完整看板快取 (其他 worker 已更新) | 快取年齡: {age:.0f}")
return _DASHBOARD_DATA_CACHE['full_data']
shared_full_data = _load_shared_full_dashboard_cache(now)
if shared_full_data:
return shared_full_data
sys_log.info("[Dashboard] [Cache] 🔄 完整快取過期,重新計算所有 KPIs 與統計數據...")
query_start_time = time.time()
@@ -883,6 +964,7 @@ def get_full_dashboard_data():
# 更新快取
_DASHBOARD_DATA_CACHE['full_data'] = full_data
_DASHBOARD_DATA_CACHE['full_timestamp'] = now.timestamp()
_write_shared_full_dashboard_cache(full_data)
query_duration_ms = (time.time() - query_start_time) * 1000
track_query_time('get_full_dashboard_data', query_duration_ms)
@@ -898,7 +980,8 @@ def get_full_dashboard_data():
session.close()
finally:
# V-Opt: 確保釋放檔案鎖
_DASHBOARD_FILE_LOCK.release()
if lock_acquired:
_DASHBOARD_FILE_LOCK.release()
def get_dashboard_stats():

View File

@@ -85,6 +85,8 @@ def _daily_sales_columns(conn) -> Dict[str, str]:
"date": first_available(["snapshot_date", "日期", "訂單日期", "交易日期", "Date"]),
"revenue": first_available(["總業績", "銷售金額", "業績", "金額", "Amount", "Sales", "Total"]),
"qty": first_available(["數量", "銷售數量", "銷量", "Qty", "Quantity"]),
"profit": first_available(["毛利", "Profit", "利潤"]),
"cost": first_available(["總成本", "成本", "Cost", "進價"]),
}
@@ -96,7 +98,7 @@ def _fetch_candidates(conn, limit: int) -> List[Dict[str, Any]]:
from sqlalchemy import text
sales_join = ""
sales_select = "0 AS sales_7d, 0 AS sales_prev_7d, 0 AS qty_7d"
sales_select = "0 AS sales_7d, 0 AS sales_prev_7d, 0 AS qty_7d, 0 AS profit_7d, 0 AS cost_7d"
sales_cols = {}
if _has_daily_sales_snapshot(conn):
sales_cols = _daily_sales_columns(conn)
@@ -108,6 +110,10 @@ def _fetch_candidates(conn, limit: int) -> List[Dict[str, Any]]:
date_col = _quote_identifier(sales_cols["date"])
revenue_col = _quote_identifier(sales_cols["revenue"])
qty_col = _quote_identifier(sales_cols["qty"])
profit_col = _quote_identifier(sales_cols["profit"]) if sales_cols.get("profit") else None
cost_col = _quote_identifier(sales_cols["cost"]) if sales_cols.get("cost") else None
profit_expr = f"COALESCE({profit_col}::numeric, 0)" if profit_col else "0"
cost_expr = f"COALESCE({cost_col}::numeric, 0)" if cost_col else "0"
sales_join = """
LEFT JOIN (
SELECT
@@ -118,7 +124,11 @@ def _fetch_candidates(conn, limit: int) -> List[Dict[str, Any]]:
AND {date_col}::date < CURRENT_DATE - 7
THEN COALESCE({revenue_col}::numeric, 0) ELSE 0 END) AS sales_prev_7d,
SUM(CASE WHEN {date_col}::date >= CURRENT_DATE - 7
THEN COALESCE({qty_col}::numeric, 0) ELSE 0 END) AS qty_7d
THEN COALESCE({qty_col}::numeric, 0) ELSE 0 END) AS qty_7d,
SUM(CASE WHEN {date_col}::date >= CURRENT_DATE - 7
THEN {profit_expr} ELSE 0 END) AS profit_7d,
SUM(CASE WHEN {date_col}::date >= CURRENT_DATE - 7
THEN {cost_expr} ELSE 0 END) AS cost_7d
FROM daily_sales_snapshot
GROUP BY {sku_col}
) sales ON sales.sku = lm.sku
@@ -127,11 +137,15 @@ def _fetch_candidates(conn, limit: int) -> List[Dict[str, Any]]:
date_col=date_col,
revenue_col=revenue_col,
qty_col=qty_col,
profit_expr=profit_expr,
cost_expr=cost_expr,
)
sales_select = """
COALESCE(sales.sales_7d, 0) AS sales_7d,
COALESCE(sales.sales_prev_7d, 0) AS sales_prev_7d,
COALESCE(sales.qty_7d, 0) AS qty_7d
COALESCE(sales.qty_7d, 0) AS qty_7d,
COALESCE(sales.profit_7d, 0) AS profit_7d,
COALESCE(sales.cost_7d, 0) AS cost_7d
"""
sql = text(f"""
@@ -236,7 +250,9 @@ def _fetch_candidates(conn, limit: int) -> List[Dict[str, Any]]:
NULL AS max_pchome_price,
0 AS sales_7d,
0 AS sales_prev_7d,
0 AS qty_7d
0 AS qty_7d,
0 AS profit_7d,
0 AS cost_7d
FROM latest_momo lm
JOIN competitor_prices cp
ON cp.sku = lm.sku
@@ -257,24 +273,60 @@ def _score_candidate(row: Dict[str, Any]) -> Dict[str, Any]:
sales_7d = _to_float(row.get("sales_7d"))
sales_prev_7d = _to_float(row.get("sales_prev_7d"))
qty_7d = _to_float(row.get("qty_7d"))
profit_7d = _to_float(row.get("profit_7d"))
cost_7d = _to_float(row.get("cost_7d"))
history_points = int(_to_float(row.get("history_points")))
min_pchome_price = _to_float(row.get("min_pchome_price"))
tags = _load_json_tags(row.get("tags"))
gap_pct = ((momo_price - pchome_price) / pchome_price * 100) if pchome_price else 0
sales_delta = ((sales_7d - sales_prev_7d) / sales_prev_7d * 100) if sales_prev_7d else None
if not profit_7d and cost_7d and sales_7d:
profit_7d = sales_7d - cost_7d
margin_rate = (profit_7d / sales_7d * 100) if sales_7d and profit_7d else None
price_score = max(0, min(38, gap_pct * 1.8 + 8))
match_component = max(0, min(24, match_score * 24))
price_score = max(0, min(40, gap_pct * 1.9 + 8))
match_component = max(0, min(30, match_score * 30))
sales_component = 0
if sales_7d > 0:
sales_component += min(10, sales_7d / 30000 * 10)
sales_component += min(9, sales_7d / 30000 * 9)
if qty_7d > 0:
sales_component += min(5, qty_7d / 20 * 5)
sales_component += min(4, qty_7d / 20 * 4)
if sales_delta is not None and sales_delta > 0:
sales_component += min(8, sales_delta / 40 * 8)
history_component = min(10, history_points * 2)
promo_component = 5 if any(tag in tags for tag in ["on_sale", "discount_10pct", "discount_20pct", "discount_30pct"]) else 0
score = round(min(100, price_score + match_component + sales_component + history_component + promo_component), 1)
sales_component += min(7, sales_delta / 40 * 7)
margin_component = 0
if margin_rate is not None:
margin_component = max(0, min(10, margin_rate / 35 * 10))
history_component = min(12, history_points * 2.4)
promo_component = 0
if any(tag in tags for tag in ["on_sale", "discount_10pct", "discount_20pct", "discount_30pct"]):
promo_component += 5
if "high_rating" in tags:
promo_component += 3
if "low_stock" in tags:
promo_component -= 4
price_position_component = 0
if min_pchome_price and pchome_price:
if pchome_price <= min_pchome_price * 1.03:
price_position_component = 6
elif pchome_price <= min_pchome_price * 1.08:
price_position_component = 3
opportunity_score = min(
100,
price_score + sales_component + margin_component + promo_component + price_position_component,
)
evidence_quality = min(
100,
match_component
+ history_component
+ (12 if sales_7d > 0 else 0)
+ (8 if margin_rate is not None else 0)
+ (8 if row.get("competitor_product_id") and row.get("competitor_product_name") else 0)
+ (6 if row.get("crawled_at") else 0),
)
score = round(min(100, opportunity_score + evidence_quality * 0.35), 1)
confidence = round(max(0.45, min(0.98, (score * 0.65 + evidence_quality * 0.35) / 100)), 3)
if gap_pct >= 10:
angle = "PChome 價格優勢明顯"
@@ -292,15 +344,26 @@ def _score_candidate(row: Dict[str, Any]) -> Dict[str, Any]:
]
if sales_7d > 0:
reason_parts.append(f"近 7 天銷售額 ${sales_7d:,.0f}")
if margin_rate is not None:
reason_parts.append(f"近 7 天毛利率 {margin_rate:.1f}%")
if history_points:
reason_parts.append(f"已有 {history_points} 筆 PChome 歷史快照")
if price_position_component:
reason_parts.append("目前 PChome 價格接近 30 天低點")
if "high_rating" in tags:
reason_parts.append("PChome 商品評價訊號佳")
if "low_stock" in tags:
reason_parts.append("PChome 庫存偏低,需留意供貨")
return {
**row,
"gap_pct": round(gap_pct, 1),
"sales_7d_delta": round(sales_delta, 1) if sales_delta is not None else 0,
"pick_score": score,
"confidence": round(max(0.45, min(0.98, score / 100)), 3),
"confidence": confidence,
"evidence_quality": round(evidence_quality, 1),
"opportunity_score": round(opportunity_score, 1),
"margin_rate": round(margin_rate, 1) if margin_rate is not None else None,
"reason": "".join(reason_parts),
}
@@ -315,6 +378,9 @@ def _write_pick(conn, pick: Dict[str, Any]) -> None:
"generated_at": datetime.now().isoformat(timespec="seconds"),
"inputs": ["products", "price_records", "competitor_prices", "competitor_price_history", "daily_sales_snapshot"],
"score": pick["pick_score"],
"opportunity_score": pick.get("opportunity_score"),
"evidence_quality": pick.get("evidence_quality"),
"margin_rate": pick.get("margin_rate"),
},
"competitor": {
"source": "pchome",

View File

@@ -7,7 +7,9 @@ ADR-017 Phase 3f-2: 將 sales/import/export/daily 會共同碰到的
module-level cache 收斂到這裡,避免各 route 各自持有一份 dict。
"""
import os
import time
from pathlib import Path
class FingerprintCache:
@@ -59,6 +61,8 @@ _DASHBOARD_DATA_CACHE = {
'full_timestamp': None,
}
_DASHBOARD_CACHE_TTL = 1800
_BASE_DIR = Path(__file__).resolve().parents[1]
_DASHBOARD_SHARED_CACHE_FILE = _BASE_DIR / "data" / "dashboard_full_cache.pkl"
def cleanup_sales_cache():
@@ -123,3 +127,9 @@ def clear_dashboard_cache():
'full_data': None,
'full_timestamp': None,
})
try:
os.remove(_DASHBOARD_SHARED_CACHE_FILE)
except FileNotFoundError:
pass
except OSError:
pass

View File

@@ -68,9 +68,13 @@ def test_set_sales_processed_cache_adds_timestamp_and_aliases():
assert cache_manager._SALES_PROCESSED_CACHE["realtime_sales_monthly"] is entry
def test_dashboard_cache_clear_restores_expected_shape():
def test_dashboard_cache_clear_restores_expected_shape(tmp_path, monkeypatch):
from services import cache_manager
shared_cache = tmp_path / "dashboard_full_cache.pkl"
shared_cache.write_bytes(b"stale")
monkeypatch.setattr(cache_manager, "_DASHBOARD_SHARED_CACHE_FILE", shared_cache)
cache_manager._DASHBOARD_DATA_CACHE["consolidated_data"] = ["stale"]
cache_manager._DASHBOARD_DATA_CACHE["full_data"] = ["stale"]
@@ -83,6 +87,7 @@ def test_dashboard_cache_clear_restores_expected_shape():
"full_data": None,
"full_timestamp": None,
}
assert not shared_cache.exists()
def test_cache_dicts_are_only_defined_in_cache_manager():

View File

@@ -47,6 +47,8 @@ def test_dashboard_v2_is_production_default_and_uses_real_dashboard_data():
assert "request.args.get('ui') == 'legacy'" in route_source
assert "template_name = 'dashboard.html' if request.args.get('ui') == 'legacy' else 'dashboard_v2.html'" in route_source
assert "get_full_dashboard_data()" in route_source
assert "_load_shared_full_dashboard_cache(now)" in route_source
assert "_write_shared_full_dashboard_cache(full_data)" in route_source
assert "_load_competitor_decision_overview(session)" in route_source
assert "ai_price_recommendations" in route_source
assert "pending_match_count" in route_source
@@ -187,6 +189,11 @@ def test_ai_product_pick_agent_uses_real_competitor_data_and_dashboard_action():
assert "PChome 價格優勢" in agent_source
assert "_daily_sales_columns" in agent_source
assert '"總業績"' in agent_source
assert '"毛利"' in agent_source
assert '"總成本"' in agent_source
assert "evidence_quality" in agent_source
assert "opportunity_score" in agent_source
assert "margin_rate" in agent_source
assert "{date_col}::date" in agent_source
assert "conn.rollback()" in agent_source