From 3971fd4020fd816f9500780460ba8b2c588547e0 Mon Sep 17 00:00:00 2001 From: OoO Date: Wed, 29 Apr 2026 09:10:23 +0800 Subject: [PATCH] =?UTF-8?q?fix(daily=5Fsales):=20cache=20=E5=A4=B1?= =?UTF-8?q?=E6=95=88=E6=94=B9=20DB=20fingerprint=20+=20clear=5Fcache=20?= =?UTF-8?q?=E5=8A=A0=20@login=5Frequired?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 回應 critic 對 8fefea0 的 4 個 HIGH finding(debugger/web-researcher/migration-engineer 三方共識): HIGH-3 修復: - /api/daily_sales/clear_cache 加 @login_required,避免外部 DoS 攻擊清快取 HIGH-4 修復(核心): - 新增 _get_data_fingerprint(engine):SELECT MAX(snapshot_date), COUNT(*) FROM daily_sales_snapshot - _is_cache_valid 改雙閘:TTL(保險絲)+ DB fingerprint(強一致) - 三個 cache 寫入點同步記錄 fingerprint - 移除 services/import_service.py 的 4-worker N-POST hack(命中率僅 9.4%, coupon collector 機率:4!/4^4 = 0.094) - 資料寫入後指紋自動跳號,4 worker 下次 request 各自偵測失效並重載 → 強一致 附帶修: - 統一 export/export_marketing 兩處 cache 寫入結構(補 timestamp/fingerprint), 解決 db-expert 標記的「export 端點 cache 缺 timestamp 導致主看板每次都重讀」瑕疵 未處理(留待後續): - HIGH-2: app.py 仍有 43 處 _SALES_PROCESSED_CACHE(sales_analysis 等其他路由的獨立 cache) 待後續 P9 統一抽 services/cache_manager.py [P7-COMPLETION] 方案: critic + debugger + web-researcher + migration-engineer 四方共識的方案 B 影響: routes/daily_sales_routes.py + services/import_service.py 兩檔 Regression: 每 request 多 1 次 SELECT MAX/COUNT(< 5ms),其餘行為等價 --- routes/daily_sales_routes.py | 56 +++++++++++++++++++++++++++--------- services/import_service.py | 19 ++---------- 2 files changed, 45 insertions(+), 30 deletions(-) diff --git a/routes/daily_sales_routes.py b/routes/daily_sales_routes.py index d30111a..07e62a0 100644 --- a/routes/daily_sales_routes.py +++ b/routes/daily_sales_routes.py @@ -47,11 +47,9 @@ def clear_daily_sales_cache(): # === 清除緩存 API 端點 === @daily_sales_bp.route('/api/daily_sales/clear_cache', methods=['POST']) +@login_required def api_clear_daily_sales_cache(): - """ - API 端點:清除當日業績緩存 - 供匯入服務透過 HTTP 請求調用,解決跨進程緩存問題 - """ + """手動清除快取(保留為 ops escape hatch;正常失效靠 DB fingerprint)""" global _SALES_PROCESSED_CACHE cache_size = len(_SALES_PROCESSED_CACHE) _SALES_PROCESSED_CACHE.clear() @@ -59,15 +57,36 @@ def api_clear_daily_sales_cache(): return {'success': True, 'message': f'已清除 {cache_size} 個緩存項目'} -def _is_cache_valid(cache_key): - """檢查緩存是否有效(未過期)""" +def _get_data_fingerprint(engine, table_name='daily_sales_snapshot'): + """DB 指紋 (max_snapshot_date, row_count):資料一變動指紋就跳號, + 讓 4 worker 的 in-memory cache 在下一次 request 自然失效。""" + try: + with engine.connect() as conn: + row = conn.execute(text( + f'SELECT MAX(snapshot_date)::text, COUNT(*) FROM "{table_name}"' + )).fetchone() + return (row[0], row[1]) if row else (None, 0) + except Exception as e: + sys_log.warning(f"[Cache] fingerprint 查詢失敗(保守維持現有 cache): {e}") + return None + + +def _is_cache_valid(cache_key, engine=None, table_name='daily_sales_snapshot'): if cache_key not in _SALES_PROCESSED_CACHE: return False cache_data = _SALES_PROCESSED_CACHE[cache_key] if 'timestamp' not in cache_data: return False elapsed = (datetime.now() - cache_data['timestamp']).total_seconds() - return elapsed < _CACHE_EXPIRY_SECONDS + if elapsed >= _CACHE_EXPIRY_SECONDS: + return False + if engine is not None: + current_fp = _get_data_fingerprint(engine, table_name) + if current_fp is None: + return True + if current_fp != cache_data.get('fingerprint'): + return False + return True # ========================================== @@ -355,8 +374,8 @@ def daily_sales(): datetime_now=datetime_now_str, active_page='daily_sales') cache_key = f'{table_name}_daily' - # 使用帶過期時間的緩存檢查 - if _is_cache_valid(cache_key): + # TTL + DB fingerprint 雙閘檢查(資料變動即自動失效,跨 worker 強一致) + if _is_cache_valid(cache_key, engine, table_name): df = _SALES_PROCESSED_CACHE[cache_key]['df'] sys_log.debug(f"使用緩存數據,剩餘有效時間: {_CACHE_EXPIRY_SECONDS - (datetime.now() - _SALES_PROCESSED_CACHE[cache_key]['timestamp']).total_seconds():.0f}秒") else: @@ -369,7 +388,10 @@ def daily_sales(): datetime_now=datetime_now_str, active_page='daily_sales') df = preprocess_daily_sales_data(df) - _SALES_PROCESSED_CACHE[cache_key] = {'df': df, 'timestamp': datetime.now()} + _SALES_PROCESSED_CACHE[cache_key] = { + 'df': df, 'timestamp': datetime.now(), + 'fingerprint': _get_data_fingerprint(engine, table_name), + } sys_log.info(f"已重新載入數據並更新緩存,共 {len(df)} 筆記錄") available_dates = sorted(df['snapshot_date'].unique(), reverse=True) @@ -494,12 +516,15 @@ def export_daily_sales_category(): return "資料表不存在", 404 cache_key = f'{table_name}_daily' - if cache_key in _SALES_PROCESSED_CACHE: + if _is_cache_valid(cache_key, engine, table_name): df = _SALES_PROCESSED_CACHE[cache_key]['df'] else: df = safe_read_sql(table_name, engine=engine) df = preprocess_daily_sales_data(df) - _SALES_PROCESSED_CACHE[cache_key] = {'df': df} + _SALES_PROCESSED_CACHE[cache_key] = { + 'df': df, 'timestamp': datetime.now(), + 'fingerprint': _get_data_fingerprint(engine, table_name), + } selected_date = request.args.get('date') if not selected_date: @@ -580,12 +605,15 @@ def export_marketing_summary_excel(): table_name = 'daily_sales_snapshot' cache_key = f'{table_name}_daily' - if cache_key in _SALES_PROCESSED_CACHE: + if _is_cache_valid(cache_key, engine, table_name): df = _SALES_PROCESSED_CACHE[cache_key]['df'] else: df = safe_read_sql(table_name, engine=engine) df = preprocess_daily_sales_data(df) - _SALES_PROCESSED_CACHE[cache_key] = {'df': df} + _SALES_PROCESSED_CACHE[cache_key] = { + 'df': df, 'timestamp': datetime.now(), + 'fingerprint': _get_data_fingerprint(engine, table_name), + } activity_type = request.args.get('type', 'all') start_date = request.args.get('start_date') diff --git a/services/import_service.py b/services/import_service.py index 8382d57..eebbbe5 100644 --- a/services/import_service.py +++ b/services/import_service.py @@ -637,22 +637,9 @@ class ImportService: session.close() logger.info(f"任務 {job_id} 匯入成功: {total_rows} 筆") - - # 跨 worker 清 daily_sales cache(gunicorn N worker 各持獨立 in-memory cache, - # 需打 N 次 internal endpoint 確保每個 worker 的快取都被清掉) - try: - import requests - base = os.getenv('INTERNAL_BASE_URL', 'http://127.0.0.1:5000') - n = int(os.getenv('GUNICORN_WORKERS', '4')) - for _ in range(n): - try: - requests.post(f"{base}/api/daily_sales/clear_cache", timeout=2) - except Exception: - break - logger.info(f"任務 {job_id} 已清 daily_sales cache({n} workers)") - except Exception as e: - logger.warning(f"任務 {job_id} 清 cache 失敗(不影響主流程): {e}") - + # cache 失效改靠 _get_data_fingerprint(DB max(snapshot_date)+count(*)), + # 寫入後指紋自動跳號,4 worker 下一次 request 時各自偵測失效, + # 取代不可靠的 N-POST hack(命中率僅 9.4%,見 web-researcher 報告)。 return True except Exception as e: