fix(daily_sales): cache 失效改 DB fingerprint + clear_cache 加 @login_required
All checks were successful
CD Pipeline / deploy (push) Successful in 1m12s

回應 critic 對 8fefea0 的 4 個 HIGH finding(debugger/web-researcher/migration-engineer 三方共識):

HIGH-3 修復:
- /api/daily_sales/clear_cache 加 @login_required,避免外部 DoS 攻擊清快取

HIGH-4 修復(核心):
- 新增 _get_data_fingerprint(engine):SELECT MAX(snapshot_date), COUNT(*) FROM daily_sales_snapshot
- _is_cache_valid 改雙閘:TTL(保險絲)+ DB fingerprint(強一致)
- 三個 cache 寫入點同步記錄 fingerprint
- 移除 services/import_service.py 的 4-worker N-POST hack(命中率僅 9.4%,
  coupon collector 機率:4!/4^4 = 0.094)
- 資料寫入後指紋自動跳號,4 worker 下次 request 各自偵測失效並重載 → 強一致

附帶修:
- 統一 export/export_marketing 兩處 cache 寫入結構(補 timestamp/fingerprint),
  解決 db-expert 標記的「export 端點 cache 缺 timestamp 導致主看板每次都重讀」瑕疵

未處理(留待後續):
- HIGH-2: app.py 仍有 43 處 _SALES_PROCESSED_CACHE(sales_analysis 等其他路由的獨立 cache)
  待後續 P9 統一抽 services/cache_manager.py

[P7-COMPLETION]
方案: critic + debugger + web-researcher + migration-engineer 四方共識的方案 B
影響: routes/daily_sales_routes.py + services/import_service.py 兩檔
Regression: 每 request 多 1 次 SELECT MAX/COUNT(< 5ms),其餘行為等價
This commit is contained in:
OoO
2026-04-29 09:10:23 +08:00
parent 8fefea05da
commit 3971fd4020
2 changed files with 45 additions and 30 deletions

View File

@@ -47,11 +47,9 @@ def clear_daily_sales_cache():
# === 清除緩存 API 端點 ===
@daily_sales_bp.route('/api/daily_sales/clear_cache', methods=['POST'])
@login_required
def api_clear_daily_sales_cache():
"""
API 端點:清除當日業績緩存
供匯入服務透過 HTTP 請求調用,解決跨進程緩存問題
"""
"""手動清除快取(保留為 ops escape hatch正常失效靠 DB fingerprint"""
global _SALES_PROCESSED_CACHE
cache_size = len(_SALES_PROCESSED_CACHE)
_SALES_PROCESSED_CACHE.clear()
@@ -59,15 +57,36 @@ def api_clear_daily_sales_cache():
return {'success': True, 'message': f'已清除 {cache_size} 個緩存項目'}
def _is_cache_valid(cache_key):
"""檢查緩存是否有效(未過期)"""
def _get_data_fingerprint(engine, table_name='daily_sales_snapshot'):
"""DB 指紋 (max_snapshot_date, row_count):資料一變動指紋就跳號,
讓 4 worker 的 in-memory cache 在下一次 request 自然失效。"""
try:
with engine.connect() as conn:
row = conn.execute(text(
f'SELECT MAX(snapshot_date)::text, COUNT(*) FROM "{table_name}"'
)).fetchone()
return (row[0], row[1]) if row else (None, 0)
except Exception as e:
sys_log.warning(f"[Cache] fingerprint 查詢失敗(保守維持現有 cache: {e}")
return None
def _is_cache_valid(cache_key, engine=None, table_name='daily_sales_snapshot'):
if cache_key not in _SALES_PROCESSED_CACHE:
return False
cache_data = _SALES_PROCESSED_CACHE[cache_key]
if 'timestamp' not in cache_data:
return False
elapsed = (datetime.now() - cache_data['timestamp']).total_seconds()
return elapsed < _CACHE_EXPIRY_SECONDS
if elapsed >= _CACHE_EXPIRY_SECONDS:
return False
if engine is not None:
current_fp = _get_data_fingerprint(engine, table_name)
if current_fp is None:
return True
if current_fp != cache_data.get('fingerprint'):
return False
return True
# ==========================================
@@ -355,8 +374,8 @@ def daily_sales():
datetime_now=datetime_now_str, active_page='daily_sales')
cache_key = f'{table_name}_daily'
# 使用帶過期時間的緩存檢查
if _is_cache_valid(cache_key):
# TTL + DB fingerprint 雙閘檢查(資料變動即自動失效,跨 worker 強一致)
if _is_cache_valid(cache_key, engine, table_name):
df = _SALES_PROCESSED_CACHE[cache_key]['df']
sys_log.debug(f"使用緩存數據,剩餘有效時間: {_CACHE_EXPIRY_SECONDS - (datetime.now() - _SALES_PROCESSED_CACHE[cache_key]['timestamp']).total_seconds():.0f}")
else:
@@ -369,7 +388,10 @@ def daily_sales():
datetime_now=datetime_now_str, active_page='daily_sales')
df = preprocess_daily_sales_data(df)
_SALES_PROCESSED_CACHE[cache_key] = {'df': df, 'timestamp': datetime.now()}
_SALES_PROCESSED_CACHE[cache_key] = {
'df': df, 'timestamp': datetime.now(),
'fingerprint': _get_data_fingerprint(engine, table_name),
}
sys_log.info(f"已重新載入數據並更新緩存,共 {len(df)} 筆記錄")
available_dates = sorted(df['snapshot_date'].unique(), reverse=True)
@@ -494,12 +516,15 @@ def export_daily_sales_category():
return "資料表不存在", 404
cache_key = f'{table_name}_daily'
if cache_key in _SALES_PROCESSED_CACHE:
if _is_cache_valid(cache_key, engine, table_name):
df = _SALES_PROCESSED_CACHE[cache_key]['df']
else:
df = safe_read_sql(table_name, engine=engine)
df = preprocess_daily_sales_data(df)
_SALES_PROCESSED_CACHE[cache_key] = {'df': df}
_SALES_PROCESSED_CACHE[cache_key] = {
'df': df, 'timestamp': datetime.now(),
'fingerprint': _get_data_fingerprint(engine, table_name),
}
selected_date = request.args.get('date')
if not selected_date:
@@ -580,12 +605,15 @@ def export_marketing_summary_excel():
table_name = 'daily_sales_snapshot'
cache_key = f'{table_name}_daily'
if cache_key in _SALES_PROCESSED_CACHE:
if _is_cache_valid(cache_key, engine, table_name):
df = _SALES_PROCESSED_CACHE[cache_key]['df']
else:
df = safe_read_sql(table_name, engine=engine)
df = preprocess_daily_sales_data(df)
_SALES_PROCESSED_CACHE[cache_key] = {'df': df}
_SALES_PROCESSED_CACHE[cache_key] = {
'df': df, 'timestamp': datetime.now(),
'fingerprint': _get_data_fingerprint(engine, table_name),
}
activity_type = request.args.get('type', 'all')
start_date = request.args.get('start_date')

View File

@@ -637,22 +637,9 @@ class ImportService:
session.close()
logger.info(f"任務 {job_id} 匯入成功: {total_rows}")
# 跨 worker 清 daily_sales cachegunicorn N worker 各持獨立 in-memory cache
# 需打 N 次 internal endpoint 確保每個 worker 的快取都被清掉)
try:
import requests
base = os.getenv('INTERNAL_BASE_URL', 'http://127.0.0.1:5000')
n = int(os.getenv('GUNICORN_WORKERS', '4'))
for _ in range(n):
try:
requests.post(f"{base}/api/daily_sales/clear_cache", timeout=2)
except Exception:
break
logger.info(f"任務 {job_id} 已清 daily_sales cache{n} workers")
except Exception as e:
logger.warning(f"任務 {job_id} 清 cache 失敗(不影響主流程): {e}")
# cache 失效改靠 _get_data_fingerprintDB max(snapshot_date)+count(*)
# 寫入後指紋自動跳號4 worker 下一次 request 時各自偵測失效
# 取代不可靠的 N-POST hack命中率僅 9.4%,見 web-researcher 報告)。
return True
except Exception as e: