快取 PChome 競價摘要查詢
All checks were successful
CD Pipeline / deploy (push) Successful in 1m2s

This commit is contained in:
OoO
2026-05-19 21:48:39 +08:00
parent 2a7916a73f
commit 2c02e8d691
4 changed files with 147 additions and 2 deletions

View File

@@ -320,7 +320,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
# ==========================================
# 系統版本與路徑
# ==========================================
SYSTEM_VERSION = "V10.276"
SYSTEM_VERSION = "V10.277"
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
public_url = PUBLIC_URL # 用於模板顯示

View File

@@ -2,7 +2,7 @@
> **最後更新**: 2026-05-19 (台北時間)
> **狀態**: 🟢 四 AI Agent 自動化閉環已落地;LLM 路由紅線升級為 Ollama-first 三主機級聯,Gemini 僅備援 / 鎖定場景
> **適用版本**: V10.276
> **適用版本**: V10.277
---
@@ -52,6 +52,7 @@ SQL漏斗(~300筆)
- 配對來源仍以 PChome crawler 真實搜尋結果為準;無競品資料時不生成挑品。
- 比對覆蓋率補強入口:`POST /api/ai/pchome-match/backfill`,優先補抓仍無有效 PChome 配對的高價 ACTIVE 商品,完成後自動重算 AI 挑品清單。
- 排程閉環:`run_pchome_match_backfill_task` 每日 10:30 執行,補抓 PChome 待比對商品、寫入歷史價格,再重算 `strategy='product_pick'` 清單。
- PChome / MOMO 競價摘要出口 `services/competitor_intel_repository.py` 使用 5 分鐘共享快取(`COMPETITOR_INTEL_CACHE_TTL_SECONDS` 可調),避免 `/growth_analysis`、`/daily_sales`、PPT/AI 報表每次請求重跑昂貴覆蓋率與價差趨勢查詢;快取只包摘要輸出,不改 matcher 的高信心門檻與 identity_v2 準確性規則。
- 商品看板第一屏:`/` 的 V2 看板直接以 `products`、`price_records`、`competitor_prices`、`ai_price_recommendations` 顯示比對覆蓋率、PChome 優勢、MOMO 威脅、AI 挑品與待比對優先清單;`filter=ai_picks` 可查看 50 品 AI 挑品列表,並在列表上方顯示平均信心、平均價差、最大價差與估算總價差空間,列表列內顯示 AI 排名與建議理由,且可透過 `/api/export/excel/ai-picks` 匯出 50 品 Excel 操作清單。商品看板深度快取同時寫入 `data/dashboard_full_cache.pkl`,供多個 Gunicorn worker 共用,避免部署後各 worker 重複重建 7,000+ 商品統計造成開頁變慢;所有資料異動與 AI 挑品重算都透過 `clear_dashboard_cache()` 同步清除記憶體與共享快取,手動重算 API 會立即預熱商品看板快取,避免第一位使用者承擔重建成本。
| 角色 | 模型 | 主機 | 成本 | 每日限額 |

View File

@@ -10,13 +10,23 @@
from __future__ import annotations
import os
import pickle
import time
from datetime import date, datetime, timedelta
from pathlib import Path
from threading import Lock
from typing import Any, Optional, Union
from sqlalchemy import inspect, text
# Score floor constant. Its value is embedded in every cache key built by the
# public fetch_* wrappers, so changing it yields different cache keys.
# NOTE(review): presumably the minimum PChome match score treated as a valid
# match -- confirm against the matcher.
PCHOME_MATCH_SCORE_FLOOR = 0.76
# Cache lifetime in seconds, read once at import time from the environment
# variable of the same name (default "300"). A value <= 0 makes
# _cached_payload() bypass caching entirely. A non-integer value raises
# ValueError at import.
COMPETITOR_INTEL_CACHE_TTL_SECONDS = int(os.getenv("COMPETITOR_INTEL_CACHE_TTL_SECONDS", "300"))
# Parent of the directory that contains this module.
_BASE_DIR = Path(__file__).resolve().parents[1]
# Pickle file holding the cache on disk so it can outlive the in-memory copy.
_CACHE_FILE = _BASE_DIR / "data" / "competitor_intel_cache.pkl"
# In-process lock taken around _MEM_CACHE mutations and cache-file access.
_CACHE_LOCK = Lock()
# In-memory cache: cache_key -> {"time": <epoch seconds>, "value": <payload>}.
_MEM_CACHE: dict[str, dict[str, Any]] = {}
def _num(value: Any) -> float:
@@ -38,7 +48,81 @@ def _month_label(value: Any) -> str:
return str(value or "")[:7]
def clear_competitor_intel_cache() -> None:
    """Drop every cached PChome/MOMO summary, in memory and on disk.

    Intended to be called after crawler/import updates. Removing the shared
    cache file is best-effort: a missing file or any other OS-level failure
    is ignored.
    """
    with _CACHE_LOCK:
        _MEM_CACHE.clear()
        try:
            # EAFP: a missing file raises FileNotFoundError, which is an
            # OSError and is therefore ignored like any other removal failure.
            _CACHE_FILE.unlink()
        except OSError:
            pass
def _load_shared_cache() -> dict[str, dict[str, Any]]:
    """Read the on-disk cache and return its top-level dict.

    Returns an empty dict when the file does not exist, cannot be read or
    unpickled, or does not contain a dict at the top level. Individual
    entries are not validated here.
    """
    if not _CACHE_FILE.exists():
        return {}
    try:
        # NOTE(review): pickle.load executes arbitrary code from the file;
        # this is only safe while the cache file is writable solely by this
        # application.
        with _CACHE_FILE.open("rb") as stream:
            loaded = pickle.load(stream)
    except Exception:
        # Corrupt or half-written cache: treat it as empty.
        return {}
    if isinstance(loaded, dict):
        return loaded
    return {}
def _write_shared_cache(payload: dict[str, dict[str, Any]]) -> None:
    """Persist *payload* to the shared cache file, best-effort.

    The data is pickled to a per-process temporary file next to the cache
    file and then moved into place with os.replace, so readers never see a
    partially written file. Any failure is swallowed; the temporary file is
    removed if it was created.
    """
    scratch = None  # stays None until the temp path has been computed
    try:
        _CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
        scratch = _CACHE_FILE.with_suffix(f".{os.getpid()}.tmp")
        with scratch.open("wb") as sink:
            pickle.dump(payload, sink, protocol=pickle.HIGHEST_PROTOCOL)
        os.replace(scratch, _CACHE_FILE)
    except Exception:
        # Caching is optional: never let a write failure reach the caller.
        try:
            if scratch is not None and scratch.exists():
                scratch.unlink()
        except OSError:
            pass
def _entry_timestamp(entry: Any) -> Optional[float]:
    """Return the creation time of a cache entry, or None if it is malformed.

    An entry is well-formed when it is a dict whose "time" item (default 0)
    converts to float. Anything else -- for example data unpickled from a
    damaged or foreign cache file -- yields None so callers can treat it as
    a cache miss instead of raising.
    """
    if not isinstance(entry, dict):
        return None
    try:
        return float(entry.get("time", 0))
    except (TypeError, ValueError):
        return None


def _cached_payload(cache_key: str, producer, ttl_seconds: int = COMPETITOR_INTEL_CACHE_TTL_SECONDS):
    """Return the value for *cache_key*, computing it with *producer* on a miss.

    Lookup order is the in-memory cache, then the shared pickle file. A hit
    in the shared file is copied into memory. On a miss, ``producer()`` is
    called outside the lock and its result is stored in both layers.

    Args:
        cache_key: Identifier of the cached payload.
        producer: Zero-argument callable that computes the payload.
        ttl_seconds: Maximum age of a usable entry. A value <= 0 disables
            caching and simply returns ``producer()``.

    Returns:
        The cached or freshly produced payload.

    Malformed entries (non-dict, or with a non-numeric "time") are treated as
    misses on lookup and are dropped when the shared file is rewritten.
    """
    if ttl_seconds <= 0:
        return producer()

    now = time.time()
    with _CACHE_LOCK:
        entry = _MEM_CACHE.get(cache_key)
        stamp = _entry_timestamp(entry)
        if stamp is not None and now - stamp < ttl_seconds:
            return entry.get("value")

        entry = _load_shared_cache().get(cache_key)
        stamp = _entry_timestamp(entry)
        if stamp is not None and now - stamp < ttl_seconds:
            _MEM_CACHE[cache_key] = entry
            return entry.get("value")

    # Run the producer without holding the lock so other lookups are not
    # blocked while it works.
    value = producer()
    # The entry is stamped with the time taken before the producer ran.
    entry = {"time": now, "value": value}

    with _CACHE_LOCK:
        _MEM_CACHE[cache_key] = entry
        shared = _load_shared_cache()
        shared[cache_key] = entry
        # Prune entries older than four TTLs (at least one hour) so the file
        # does not grow without bound; malformed entries are dropped as well.
        stale_before = now - max(ttl_seconds * 4, 3600)
        kept = {}
        for key, item in shared.items():
            item_stamp = _entry_timestamp(item)
            if item_stamp is not None and item_stamp >= stale_before:
                kept[key] = item
        _write_shared_cache(kept)
    return value
def fetch_competitor_coverage(engine) -> dict:
    """Return the competitor coverage summary through the shared cache.

    On a cache miss the summary is computed by
    ``_fetch_competitor_coverage_uncached(engine)``. The cache key includes
    the current ``PCHOME_MATCH_SCORE_FLOOR``.
    """
    cache_key = f"coverage:v2:floor={PCHOME_MATCH_SCORE_FLOOR}"

    def _produce() -> dict:
        return _fetch_competitor_coverage_uncached(engine)

    return _cached_payload(cache_key, _produce)
def _fetch_competitor_coverage_uncached(engine) -> dict:
"""讀取目前 PChome 比價覆蓋率與待審分類。"""
if not inspect(engine).has_table("competitor_prices"):
return {
@@ -117,6 +201,14 @@ def fetch_competitor_coverage(engine) -> dict:
def fetch_competitor_gap_trend(engine, days: int = 30) -> dict:
    """Return the price-gap trend for the last *days* days through the shared cache.

    *days* is coerced to int (a falsy value becomes 30) and clamped to the
    range 7..120 before it is used in the cache key and passed to
    ``_fetch_competitor_gap_trend_uncached``.
    """
    requested = int(days or 30)
    days = min(max(requested, 7), 120)
    cache_key = f"gap_trend:v2:days={days}:floor={PCHOME_MATCH_SCORE_FLOOR}"

    def _produce() -> dict:
        return _fetch_competitor_gap_trend_uncached(engine, days=days)

    return _cached_payload(cache_key, _produce)
def _fetch_competitor_gap_trend_uncached(engine, days: int = 30) -> dict:
"""近 N 天 PChome 價差壓力趨勢。"""
if not inspect(engine).has_table("competitor_price_history"):
return {"labels": [], "avg_gap_pct": [], "risk_count": [], "momo_advantage_count": [], "match_count": []}
@@ -167,6 +259,14 @@ def fetch_competitor_gap_trend(engine, days: int = 30) -> dict:
def fetch_competitor_monthly_pressure(engine, months: int = 12) -> dict:
    """Return the monthly competitor price pressure through the shared cache.

    *months* is coerced to int (a falsy value becomes 12) and clamped to the
    range 3..36 before it is used in the cache key and passed to
    ``_fetch_competitor_monthly_pressure_uncached``.
    """
    requested = int(months or 12)
    months = min(max(requested, 3), 36)
    cache_key = f"monthly_pressure:v2:months={months}:floor={PCHOME_MATCH_SCORE_FLOOR}"

    def _produce() -> dict:
        return _fetch_competitor_monthly_pressure_uncached(engine, months=months)

    return _cached_payload(cache_key, _produce)
def _fetch_competitor_monthly_pressure_uncached(engine, months: int = 12) -> dict:
"""月度競品價格壓力,用於 growth analysis。"""
if not inspect(engine).has_table("competitor_price_history"):
return {"labels": [], "avg_gap_pct": [], "risk_count": [], "match_count": []}
@@ -215,6 +315,14 @@ def fetch_competitor_monthly_pressure(engine, months: int = 12) -> dict:
def fetch_top_competitor_risks(engine, limit: int = 10) -> list[dict]:
    """Return the top competitor-risk rows through the shared cache.

    *limit* is coerced to int (a falsy value becomes 10) and clamped to the
    range 1..50 before it is used in the cache key and passed to
    ``_fetch_top_competitor_risks_uncached``.
    """
    requested = int(limit or 10)
    limit = min(max(requested, 1), 50)
    cache_key = f"top_risks:v2:limit={limit}:floor={PCHOME_MATCH_SCORE_FLOOR}"

    def _produce() -> list[dict]:
        return _fetch_top_competitor_risks_uncached(engine, limit=limit)

    return _cached_payload(cache_key, _produce)
def _fetch_top_competitor_risks_uncached(engine, limit: int = 10) -> list[dict]:
"""目前 MOMO 比 PChome 貴的高風險商品。"""
if not inspect(engine).has_table("competitor_prices"):
return []

View File

@@ -0,0 +1,36 @@
from pathlib import Path
def test_competitor_intel_cache_reuses_memory_and_shared_file(tmp_path, monkeypatch):
    """The producer runs once; later reads hit memory, then the shared file."""
    from services import competitor_intel_repository as repo

    # Point the module at a throwaway cache file and start with empty memory.
    monkeypatch.setattr(repo, "_CACHE_FILE", Path(tmp_path) / "competitor_intel_cache.pkl")
    repo._MEM_CACHE.clear()

    expected = {"valid_matches": 7, "match_rate": 0.1}
    invocations = []

    def producer():
        invocations.append(1)
        return {"valid_matches": 7, "match_rate": 0.1}

    first = repo._cached_payload("coverage:test", producer, ttl_seconds=60)
    # Second call is served from the in-memory cache.
    second = repo._cached_payload("coverage:test", producer, ttl_seconds=60)
    # With memory wiped, the third call must fall back to the shared file.
    repo._MEM_CACHE.clear()
    third = repo._cached_payload("coverage:test", producer, ttl_seconds=60)

    assert first == second == third == expected
    assert len(invocations) == 1
def test_clear_competitor_intel_cache_removes_shared_file(tmp_path, monkeypatch):
    """Clearing the cache empties memory and deletes the shared file."""
    from services import competitor_intel_repository as repo

    shared_file = Path(tmp_path) / "competitor_intel_cache.pkl"
    monkeypatch.setattr(repo, "_CACHE_FILE", shared_file)

    # Seed both layers with leftover data.
    repo._MEM_CACHE["x"] = {"time": 1, "value": {"ok": True}}
    shared_file.write_bytes(b"stale")

    repo.clear_competitor_intel_cache()

    assert repo._MEM_CACHE == {}
    assert not shared_file.exists()