Files
ewoooc/services/cache_service.py
OoO b68125ca26
All checks were successful
CD Pipeline / deploy (push) Successful in 1m5s
[V10.329] 延長成長分析穩定快取
2026-05-20 13:32:57 +08:00

230 lines
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Legacy cache compatibility shim.
Sales/dashboard cache state lives in services.cache_manager. This module keeps
older imports working and only owns growth-analysis and slow-query helpers.
"""
import os
import pickle
import time
from datetime import datetime, timezone, timedelta
from pathlib import Path
from services.cache_manager import (
_SALES_DF_CACHE,
_SALES_PROCESSED_CACHE,
_SALES_OPTIONS_CACHE,
_SALES_ANALYSIS_RESULT_CACHE,
_SALES_CACHE_TTL,
_DASHBOARD_DATA_CACHE,
_DASHBOARD_CACHE_TTL,
clear_sales_cache,
clear_dashboard_cache,
)
# Taipei time zone (fixed UTC+8 offset).
TAIPEI_TZ = timezone(timedelta(hours=8))

# Cache TTL settings; the sales TTL is taken from cache_manager so that
# module stays the single source for it.
_SALES_OPTIONS_TTL = 6 * 60 * 60  # options cache: 6 hours, in seconds
_SALES_RESULT_TTL = _SALES_CACHE_TTL
# ==========================================
# Growth-analysis cache
# ==========================================
# In-process cache entry; every field starts out as None.
_GROWTH_ANALYSIS_CACHE = dict.fromkeys(
    ('chart_data', 'kpi', 'timestamp', 'source_fingerprint')
)

# Default lifetime of a growth-analysis entry: 30 minutes, in seconds.
_GROWTH_CACHE_TTL = 30 * 60
# Lifetime used when the source fingerprint is unchanged: 6 hours, in seconds,
# so cold workers do not keep re-scanning the detail tables.
_GROWTH_STABLE_SOURCE_CACHE_TTL = 6 * 60 * 60

# Pickle file that holds the shared copy of the growth-analysis cache.
_BASE_DIR = Path(__file__).resolve().parents[1]
_GROWTH_SHARED_CACHE_FILE = _BASE_DIR.joinpath("data", "growth_analysis_cache.pkl")

# Short-lived source-fingerprint cache, keyed by cache key; TTL in seconds.
_GROWTH_SOURCE_FINGERPRINT_CACHE = {}
_GROWTH_SOURCE_FINGERPRINT_TTL = 60
# ==========================================
# Slow-query monitoring
# ==========================================
# Running counters updated by track_query_time().
_SLOW_QUERY_STATS = dict(
    total_queries=0,
    slow_queries=0,
    very_slow_queries=0,
    total_query_time_ms=0,
    last_slow_query=None,
    last_slow_query_time=None,
)

_SLOW_QUERY_THRESHOLD_MS = 1_000  # slow-query threshold: 1 second
_VERY_SLOW_QUERY_THRESHOLD_MS = 5_000  # very-slow-query threshold: 5 seconds
def track_query_time(query_name, duration_ms):
    """Record one query's duration and update the slow-query counters.

    Args:
        query_name: Label stored as ``last_slow_query`` when the query
            reaches the slow threshold.
        duration_ms: Elapsed time of the query, in milliseconds.
    """
    stats = _SLOW_QUERY_STATS  # mutated in place; no rebinding needed
    stats['total_queries'] += 1
    stats['total_query_time_ms'] += duration_ms

    very_slow = duration_ms >= _VERY_SLOW_QUERY_THRESHOLD_MS
    slow = very_slow or duration_ms >= _SLOW_QUERY_THRESHOLD_MS
    if not slow:
        return

    # A very slow query is counted in both the very-slow and slow buckets.
    if very_slow:
        stats['very_slow_queries'] += 1
    stats['slow_queries'] += 1
    stats['last_slow_query'] = query_name
    stats['last_slow_query_time'] = datetime.now(TAIPEI_TZ).isoformat()
def get_slow_query_stats():
    """Return a shallow copy of the slow-query statistics dict."""
    snapshot = dict(_SLOW_QUERY_STATS)
    return snapshot
def clear_growth_cache():
    """Reset the growth-analysis cache in memory and on disk.

    Rebinds the in-process cache to an all-``None`` entry, empties the
    source-fingerprint cache, and removes the shared cache file if present.
    Failures while removing the file are ignored.
    """
    global _GROWTH_ANALYSIS_CACHE
    _GROWTH_ANALYSIS_CACHE = dict.fromkeys(
        ('chart_data', 'kpi', 'timestamp', 'source_fingerprint')
    )
    _GROWTH_SOURCE_FINGERPRINT_CACHE.clear()

    shared_file = _GROWTH_SHARED_CACHE_FILE
    try:
        if os.path.exists(shared_file):
            os.remove(shared_file)
    except OSError:
        # Best effort: an undeletable cache file must not break the caller.
        pass
def clear_all_cache():
    """Clear the sales, dashboard, and growth-analysis caches, in that order."""
    for reset in (clear_sales_cache, clear_dashboard_cache, clear_growth_cache):
        reset()
def is_cache_valid(timestamp, ttl_seconds):
    """Tell whether a cache entry is still within its time-to-live.

    Args:
        timestamp: ``datetime`` at which the entry was stored, or ``None``.
            A naive value is interpreted as Taipei time.
        ttl_seconds: Lifetime of the entry, in seconds.

    Returns:
        bool: ``True`` when the entry's age is strictly less than
        ``ttl_seconds``; ``False`` otherwise or when ``timestamp`` is ``None``.
    """
    if timestamp is None:
        return False

    current = datetime.now(TAIPEI_TZ)
    stored_at = (
        timestamp
        if timestamp.tzinfo is not None
        else timestamp.replace(tzinfo=TAIPEI_TZ)
    )
    return (current - stored_at).total_seconds() < ttl_seconds
def get_growth_cache():
    """Return the current in-process growth-analysis cache dict (not a copy)."""
    current_entry = _GROWTH_ANALYSIS_CACHE
    return current_entry
def get_growth_source_fingerprint_cache(cache_key):
    """Look up a short-TTL source fingerprint.

    Lets a page-cache hit skip the expensive COUNT over the large tables.

    Args:
        cache_key: Key under which the fingerprint was stored.

    Returns:
        The cached fingerprint, or ``None`` when there is no entry or the
        entry has expired. An expired entry is dropped as a side effect.
    """
    cached = _GROWTH_SOURCE_FINGERPRINT_CACHE.get(cache_key)
    if not cached:
        return None

    age = time.time() - cached.get('time', 0)
    expired = age >= _GROWTH_SOURCE_FINGERPRINT_TTL
    if expired:
        _GROWTH_SOURCE_FINGERPRINT_CACHE.pop(cache_key, None)
        return None
    return cached.get('fingerprint')
def set_growth_source_fingerprint_cache(cache_key, fingerprint):
    """Store a source fingerprint in the short-TTL cache.

    A ``None`` fingerprint is never cached, which keeps the lookup fail-open.

    Args:
        cache_key: Key under which to store the fingerprint.
        fingerprint: Fingerprint value to remember; ignored when ``None``.
    """
    if fingerprint is not None:
        entry = {
            'fingerprint': fingerprint,
            'time': time.time(),
        }
        _GROWTH_SOURCE_FINGERPRINT_CACHE[cache_key] = entry
def clear_growth_source_fingerprint_cache(cache_key=None):
    """Drop cached source fingerprints.

    Args:
        cache_key: Key of the single entry to drop. When ``None`` (the
            default) every entry is removed.
    """
    if cache_key is not None:
        _GROWTH_SOURCE_FINGERPRINT_CACHE.pop(cache_key, None)
        return
    _GROWTH_SOURCE_FINGERPRINT_CACHE.clear()
def _is_growth_payload_valid(payload, source_fingerprint=None):
    """Decide whether a growth-analysis payload may still be served.

    Args:
        payload: Cache entry with ``chart_data``, ``kpi``, ``timestamp`` and
            ``source_fingerprint`` keys.
        source_fingerprint: Fingerprint of the current source data, if known.

    Returns:
        bool: ``False`` for an empty payload or one whose ``chart_data`` or
        ``kpi`` is falsy. With a fingerprint, the payload must carry the same
        fingerprint and be younger than the stable-source TTL; without one,
        it must be younger than the default growth TTL.
    """
    if not payload:
        return False

    has_content = payload.get('chart_data') and payload.get('kpi')
    if not has_content:
        return False

    if source_fingerprint is None:
        # No fingerprint to compare against: fall back to the default TTL.
        return is_cache_valid(payload.get('timestamp'), _GROWTH_CACHE_TTL)

    if payload.get('source_fingerprint') != source_fingerprint:
        return False
    # Source unchanged: the longer stable-source TTL applies.
    return is_cache_valid(payload.get('timestamp'), _GROWTH_STABLE_SOURCE_CACHE_TTL)
def _load_growth_cache_file(source_fingerprint=None):
    """Load the cross-worker growth-analysis cache from disk.

    Lets a cold worker reuse a cached result instead of re-scanning the
    detail tables. On success the in-process cache is rebound to the loaded
    payload.

    Args:
        source_fingerprint: Fingerprint the loaded payload must match, if any.

    Returns:
        bool: ``True`` when a valid payload was loaded; ``False`` when the
        file is missing, the payload is invalid, or any error occurs.
    """
    global _GROWTH_ANALYSIS_CACHE
    shared_file = _GROWTH_SHARED_CACHE_FILE
    try:
        if os.path.exists(shared_file):
            # NOTE(review): pickle.load runs arbitrary code from the file;
            # this presumes the cache file is writable only by this
            # application — confirm the deployment's file permissions.
            with open(shared_file, 'rb') as fh:
                loaded = pickle.load(fh)
            if _is_growth_payload_valid(loaded, source_fingerprint):
                _GROWTH_ANALYSIS_CACHE = loaded
                return True
    except Exception:
        # Best effort: an unreadable or corrupt file counts as a cache miss.
        pass
    return False
def _write_growth_cache_file(payload):
    """Write the shared growth-analysis cache file via a temp file and rename.

    The payload is pickled to a per-process temporary file next to the
    target, which is then moved over the target with ``os.replace``. Any
    failure is swallowed and the temporary file is removed if it exists.

    Args:
        payload: Cache entry to persist.
    """
    target = str(_GROWTH_SHARED_CACHE_FILE)
    scratch = f"{target}.{os.getpid()}.tmp"

    def _discard_scratch():
        # Remove the leftover temp file; failures here are ignored too.
        try:
            if os.path.exists(scratch):
                os.remove(scratch)
        except OSError:
            pass

    try:
        os.makedirs(os.path.dirname(target), exist_ok=True)
        with open(scratch, 'wb') as fh:
            pickle.dump(payload, fh, protocol=pickle.HIGHEST_PROTOCOL)
        os.replace(scratch, target)
    except Exception:
        # Best effort: persisting the cache must never break the caller.
        _discard_scratch()
def set_growth_cache(chart_data, kpi, source_fingerprint=None):
    """Store a growth-analysis result in memory and in the shared cache file.

    Args:
        chart_data: Chart data to cache.
        kpi: KPI data to cache.
        source_fingerprint: Fingerprint of the source data the result was
            computed from, if known.
    """
    global _GROWTH_ANALYSIS_CACHE
    stored_at = datetime.now(TAIPEI_TZ)
    entry = {
        'chart_data': chart_data,
        'kpi': kpi,
        'timestamp': stored_at,
        'source_fingerprint': source_fingerprint,
    }
    _GROWTH_ANALYSIS_CACHE = entry
    # Persist the same entry so other workers can pick it up.
    _write_growth_cache_file(entry)
def is_growth_cache_valid(source_fingerprint=None):
    """Report whether a usable growth-analysis cache entry is available.

    Checks the in-process entry first; if that is not valid, tries to load
    the shared cache file, which replaces the in-process entry on success.

    Args:
        source_fingerprint: Fingerprint of the current source data, if known.

    Returns:
        bool: ``True`` when either the in-process or the file-backed entry
        is valid.
    """
    return (
        _is_growth_payload_valid(_GROWTH_ANALYSIS_CACHE, source_fingerprint)
        or _load_growth_cache_file(source_fingerprint)
    )