Files
ewoooc/tests/test_cache_manager.py
OoO 5a5f268358
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
強化 EA JSON fallback 與 EDM cache 自癒
2026-05-21 18:59:16 +08:00

418 lines
15 KiB
Python

from pathlib import Path
from datetime import date
import pickle
from flask import Flask, session
ROOT = Path(__file__).resolve().parents[1]
def test_cache_service_reexports_cache_manager_state():
from services import cache_manager, cache_service
assert cache_service._SALES_DF_CACHE is cache_manager._SALES_DF_CACHE
assert cache_service._SALES_PROCESSED_CACHE is cache_manager._SALES_PROCESSED_CACHE
assert cache_service._SALES_OPTIONS_CACHE is cache_manager._SALES_OPTIONS_CACHE
assert cache_service._SALES_ANALYSIS_RESULT_CACHE is cache_manager._SALES_ANALYSIS_RESULT_CACHE
assert cache_service._DASHBOARD_DATA_CACHE is cache_manager._DASHBOARD_DATA_CACHE
assert cache_service._SALES_RESULT_TTL == cache_manager._SALES_CACHE_TTL
def test_routes_share_sales_cache_manager_state():
from services import cache_manager
from routes import export_routes, import_routes, sales_routes
sales_df_cache, sales_processed_cache, clear_for_table = import_routes._get_cache_refs()
assert sales_routes._SALES_PROCESSED_CACHE is cache_manager._SALES_PROCESSED_CACHE
assert export_routes._get_sales_cache() is cache_manager._SALES_PROCESSED_CACHE
assert sales_df_cache is cache_manager._SALES_DF_CACHE
assert sales_processed_cache is cache_manager._SALES_PROCESSED_CACHE
assert clear_for_table is cache_manager.clear_sales_cache_for_table
def test_clear_sales_cache_for_table_removes_table_and_range_aliases():
from services import cache_manager
cache_manager.clear_sales_cache()
cache_manager._SALES_DF_CACHE["realtime_sales_monthly"] = object()
cache_manager._SALES_DF_CACHE["other_table"] = object()
cache_manager._SALES_PROCESSED_CACHE.update({
"realtime_sales_monthly": {"df": "base"},
"realtime_sales_monthly_1m": {"df": "range"},
"realtime_sales_monthly_custom_2026-01-01_2026-01-31": {"df": "custom"},
"other_table": {"df": "other"},
})
cache_manager.clear_sales_cache_for_table("realtime_sales_monthly")
assert "realtime_sales_monthly" not in cache_manager._SALES_DF_CACHE
assert "other_table" in cache_manager._SALES_DF_CACHE
assert "realtime_sales_monthly" not in cache_manager._SALES_PROCESSED_CACHE
assert "realtime_sales_monthly_1m" not in cache_manager._SALES_PROCESSED_CACHE
assert "realtime_sales_monthly_custom_2026-01-01_2026-01-31" not in cache_manager._SALES_PROCESSED_CACHE
assert "other_table" in cache_manager._SALES_PROCESSED_CACHE
def test_set_sales_processed_cache_adds_timestamp_and_aliases():
from services import cache_manager
cache_manager.clear_sales_cache()
entry = {"df": "data", "cols": {"date": "日期"}}
cache_manager.set_sales_processed_cache(
"realtime_sales_monthly_3m",
entry,
aliases=("realtime_sales_monthly",),
)
cached = cache_manager._SALES_PROCESSED_CACHE["realtime_sales_monthly_3m"]
assert cached is entry
assert cached["time"] > 0
assert cache_manager._SALES_PROCESSED_CACHE["realtime_sales_monthly"] is entry
def test_dashboard_cache_clear_restores_expected_shape(tmp_path, monkeypatch):
from services import cache_manager
shared_cache = tmp_path / "dashboard_full_cache.pkl"
stale_cache = tmp_path / "dashboard_full_cache_stale.pkl"
shared_cache.write_bytes(b"stale")
monkeypatch.setattr(cache_manager, "_DASHBOARD_SHARED_CACHE_FILE", shared_cache)
monkeypatch.setattr(cache_manager, "_DASHBOARD_STALE_CACHE_FILE", stale_cache)
cache_manager._DASHBOARD_DATA_CACHE["consolidated_data"] = ["stale"]
cache_manager._DASHBOARD_DATA_CACHE["full_data"] = ["stale"]
cache_manager.clear_dashboard_cache()
assert cache_manager._DASHBOARD_DATA_CACHE == {
"consolidated_data": None,
"consolidated_timestamp": None,
"today_start": None,
"full_data": None,
"full_timestamp": None,
}
assert not shared_cache.exists()
assert stale_cache.exists()
def test_sales_analysis_shared_page_cache_roundtrip(tmp_path, monkeypatch):
from routes import sales_routes
monkeypatch.setattr(sales_routes, "_SALES_ANALYSIS_PAGE_CACHE_DIR", tmp_path)
cache_key = "sales_analysis:page_context:test"
context = {"kpi": {"revenue": 123}, "active_page": "sales"}
sales_routes._set_sales_shared_page_context_cache(cache_key, context)
assert sales_routes._get_sales_shared_page_context_cache(cache_key) == context
def test_clear_sales_cache_removes_shared_page_cache_files(tmp_path, monkeypatch):
from services import cache_manager
monkeypatch.setattr(cache_manager, "_SALES_ANALYSIS_PAGE_CACHE_DIR", tmp_path)
tmp_path.mkdir(parents=True, exist_ok=True)
cache_file = tmp_path / "sales_analysis_page.pkl"
cache_file.write_bytes(b"stale")
cache_manager.clear_sales_cache()
assert not cache_file.exists()
def test_growth_cache_shared_file_roundtrip(tmp_path, monkeypatch):
from services import cache_service
shared_cache = tmp_path / "growth_analysis_cache.pkl"
monkeypatch.setattr(cache_service, "_GROWTH_SHARED_CACHE_FILE", shared_cache)
cache_service.clear_growth_cache()
cache_service.set_growth_cache(
{"labels": ["2026-05"], "revenue": [1000]},
{"ytd_revenue": 1000},
source_fingerprint=("2026-05-17", 10),
)
cache_service._GROWTH_ANALYSIS_CACHE = {
"chart_data": None,
"kpi": None,
"timestamp": None,
"source_fingerprint": None,
}
assert cache_service.is_growth_cache_valid(("2026-05-17", 10))
assert cache_service.get_growth_cache()["chart_data"]["revenue"] == [1000]
assert shared_cache.exists()
def test_growth_cache_reuses_expired_ttl_when_source_fingerprint_matches(tmp_path, monkeypatch):
import pickle
from datetime import timedelta
from services import cache_service
shared_cache = tmp_path / "growth_analysis_cache.pkl"
monkeypatch.setattr(cache_service, "_GROWTH_SHARED_CACHE_FILE", shared_cache)
cache_service.clear_growth_cache()
cache_service.set_growth_cache(
{"labels": ["2026-05"], "revenue": [1000]},
{"ytd_revenue": 1000},
source_fingerprint=("2026-05-17", 10),
)
stale_timestamp = (
cache_service.datetime.now(cache_service.TAIPEI_TZ)
- timedelta(seconds=cache_service._GROWTH_CACHE_TTL + 60)
)
cache_service._GROWTH_ANALYSIS_CACHE["timestamp"] = stale_timestamp
payload = pickle.loads(shared_cache.read_bytes())
payload["timestamp"] = stale_timestamp
shared_cache.write_bytes(pickle.dumps(payload, protocol=pickle.HIGHEST_PROTOCOL))
assert cache_service.is_growth_cache_valid(("2026-05-17", 10))
assert not cache_service.is_growth_cache_valid(("2026-05-18", 10))
assert not cache_service.is_growth_cache_valid()
def test_clear_growth_cache_removes_shared_file(tmp_path, monkeypatch):
from services import cache_service
shared_cache = tmp_path / "growth_analysis_cache.pkl"
shared_cache.write_bytes(b"stale")
monkeypatch.setattr(cache_service, "_GROWTH_SHARED_CACHE_FILE", shared_cache)
cache_service.clear_growth_cache()
assert not shared_cache.exists()
def test_growth_source_fingerprint_short_cache_clears_with_growth_cache(tmp_path, monkeypatch):
from services import cache_service
shared_cache = tmp_path / "growth_analysis_cache.pkl"
monkeypatch.setattr(cache_service, "_GROWTH_SHARED_CACHE_FILE", shared_cache)
cache_service.clear_growth_cache()
cache_service.set_growth_source_fingerprint_cache("growth:source", ("2026-05-17", 10))
assert cache_service.get_growth_source_fingerprint_cache("growth:source") == ("2026-05-17", 10)
cache_service.clear_growth_cache()
assert cache_service.get_growth_source_fingerprint_cache("growth:source") is None
def test_daily_sales_shared_view_cache_roundtrip(tmp_path, monkeypatch):
from routes import daily_sales_routes
monkeypatch.setattr(daily_sales_routes, "_DAILY_SALES_VIEW_CACHE_DIR", tmp_path)
cache_key = "daily_sales:view:test"
context = {"summary": {"revenue": 456}, "active_page": "daily_sales"}
daily_sales_routes._set_shared_daily_view_cache(cache_key, context)
assert daily_sales_routes._get_shared_daily_view_cache(cache_key) == context
def test_clear_daily_sales_cache_removes_shared_view_cache_files(tmp_path, monkeypatch):
from services import cache_manager
monkeypatch.setattr(cache_manager, "_DAILY_SALES_VIEW_CACHE_DIR", tmp_path)
tmp_path.mkdir(parents=True, exist_ok=True)
cache_file = tmp_path / "daily_sales_view.pkl"
cache_file.write_bytes(b"stale")
cache_manager.clear_daily_sales_cache()
assert not cache_file.exists()
def test_daily_sales_metadata_uses_single_postgres_query():
from routes import daily_sales_routes
class FakeDialect:
name = "postgresql"
class FakeResult:
def fetchone(self):
return ("2026-05-17", 85118, [date(2026, 5, 17), date(2026, 5, 16)])
class FakeConnection:
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def execute(self, query):
self.query = str(query)
return FakeResult()
class FakeEngine:
dialect = FakeDialect()
def connect(self):
return FakeConnection()
dates, fingerprint = daily_sales_routes._get_daily_sales_metadata(FakeEngine())
assert [d.strftime("%Y-%m-%d") for d in dates] == ["2026-05-17", "2026-05-16"]
assert fingerprint == ("2026-05-17", 85118)
def test_daily_sales_metadata_falls_back_for_sqlite(monkeypatch):
from routes import daily_sales_routes
class FakeDialect:
name = "sqlite"
class FakeEngine:
dialect = FakeDialect()
monkeypatch.setattr(daily_sales_routes, "_get_available_daily_dates", lambda engine, table_name: ["date-a"])
monkeypatch.setattr(daily_sales_routes, "_get_data_fingerprint", lambda engine, table_name: ("date-a", 1))
assert daily_sales_routes._get_daily_sales_metadata(FakeEngine()) == (["date-a"], ("date-a", 1))
def test_promo_dashboard_shared_cache_roundtrip(tmp_path, monkeypatch):
from routes import edm_routes
shared_cache = tmp_path / "promo_dashboard_cache.pkl"
monkeypatch.setattr(edm_routes, "_PROMO_SHARED_CACHE_FILE", shared_cache)
edm_routes._PROMO_DASHBOARD_CACHE.clear()
cache_key = ("edm", "default", "desc", "", "2026-05-19-12", (10, "2026-05-19T12:00:00", 30))
data = {
"sorted_grouped_items": {"11:00": []},
"slot_stats": {"11:00": {"on_shelf": 0}},
"items_in_batch": [],
"last_update_str": "2026-05-19 12:00",
"activity_time": "11:00",
"active_tab": "11:00",
"current_batch_id": "batch-1",
}
edm_routes._write_shared_promo_dashboard_cache(cache_key, data)
edm_routes._PROMO_DASHBOARD_CACHE.clear()
assert edm_routes._load_shared_promo_dashboard_cache(cache_key) == data
assert shared_cache.exists()
def test_promo_dashboard_shared_cache_ignores_other_versions(tmp_path, monkeypatch):
from routes import edm_routes
shared_cache = tmp_path / "promo_dashboard_cache.pkl"
cache_key = ("edm", "default", "desc", "", "bucket", (1, "ts", 1))
shared_cache.write_bytes(
pickle.dumps(
{"version": "older-version", "entries": {cache_key: {"stale": True}}},
protocol=pickle.HIGHEST_PROTOCOL,
)
)
monkeypatch.setattr(edm_routes, "_PROMO_SHARED_CACHE_FILE", shared_cache)
assert edm_routes._load_shared_promo_dashboard_cache(cache_key) is None
def test_promo_dashboard_shared_cache_discards_corrupt_payload(tmp_path, monkeypatch):
from routes import edm_routes
shared_cache = tmp_path / "promo_dashboard_cache.pkl"
shared_cache.write_bytes(b"\x96not-a-pickle")
monkeypatch.setattr(edm_routes, "_PROMO_SHARED_CACHE_FILE", shared_cache)
assert edm_routes._load_shared_promo_dashboard_cache(("edm",)) is None
assert not shared_cache.exists()
def test_promo_dashboard_shared_cache_write_recovers_from_corrupt_payload(tmp_path, monkeypatch):
from routes import edm_routes
shared_cache = tmp_path / "promo_dashboard_cache.pkl"
shared_cache.write_bytes(b"\x96not-a-pickle")
cache_key = ("edm", "default", "desc", "", "bucket", (1, "ts", 1))
data = {"items_in_batch": [{"sku": "SKU-1"}]}
monkeypatch.setattr(edm_routes, "_PROMO_SHARED_CACHE_FILE", shared_cache)
edm_routes._write_shared_promo_dashboard_cache(cache_key, data)
assert edm_routes._load_shared_promo_dashboard_cache(cache_key) == data
def test_sales_analysis_preview_context_cache_avoids_reloading_options(tmp_path, monkeypatch):
from routes import sales_routes
app = Flask(__name__)
app.secret_key = "test"
class FakeDatabaseManager:
engine = object()
class FakeInspector:
def has_table(self, table_name):
return table_name == "realtime_sales_monthly"
preview_calls = {"count": 0}
def fake_preview_options(_engine, _table_name):
preview_calls["count"] += 1
return {
"categories": ["美妝"],
"brands": ["品牌"],
"vendors": ["廠商"],
"activities": ["活動"],
"payments": ["付款"],
"months": ["2026-05"],
}
rendered = []
def fake_render_template(template_name, **context):
rendered.append((template_name, context))
return context
monkeypatch.setattr(sales_routes, "_SALES_ANALYSIS_PAGE_CACHE_DIR", tmp_path)
monkeypatch.setattr(sales_routes, "DatabaseManager", FakeDatabaseManager)
monkeypatch.setattr(sales_routes, "inspect", lambda _engine: FakeInspector())
monkeypatch.setattr(sales_routes, "_fetch_sales_data_range", lambda *_args: "2026/05/01 - 2026/05/13")
monkeypatch.setattr(sales_routes, "_preview_sales_filter_options", fake_preview_options)
monkeypatch.setattr(sales_routes, "render_template", fake_render_template)
sales_routes._SALES_ANALYSIS_RESULT_CACHE.clear()
for _ in range(2):
with app.test_request_context("/sales_analysis"):
session["logged_in"] = True
response = sales_routes.sales_analysis()
assert response["no_filter"] is True
assert response["all_categories"] == ["美妝"]
assert preview_calls["count"] == 1
assert len(rendered) == 2
def test_cache_dicts_are_only_defined_in_cache_manager():
assignments = []
for path in [ROOT / "app.py", *ROOT.glob("routes/*.py"), *ROOT.glob("services/*.py")]:
text = path.read_text(encoding="utf-8")
for marker in ("_SALES_DF_CACHE = {}", "_SALES_PROCESSED_CACHE = {}", "_DASHBOARD_DATA_CACHE = {"):
if marker in text:
assignments.append((path.relative_to(ROOT).as_posix(), marker))
assert assignments == [
("services/cache_manager.py", "_SALES_DF_CACHE = {}"),
("services/cache_manager.py", "_SALES_PROCESSED_CACHE = {}"),
("services/cache_manager.py", "_DASHBOARD_DATA_CACHE = {"),
]
def test_sales_cache_ttl_is_only_defined_in_cache_manager():
assignments = []
for path in [ROOT / "app.py", *ROOT.glob("routes/*.py"), *ROOT.glob("services/*.py")]:
text = path.read_text(encoding="utf-8")
if "_SALES_CACHE_TTL =" in text:
assignments.append(path.relative_to(ROOT).as_posix())
assert assignments == ["services/cache_manager.py"]