349 lines
12 KiB
Python
349 lines
12 KiB
Python
from pathlib import Path
|
|
from datetime import date
|
|
import pickle
|
|
from flask import Flask, session
|
|
|
|
|
|
# Project root used by the source-scanning tests: the parent of the directory
# that contains this file (parents[0] is that directory, parents[1] its parent).
ROOT = Path(__file__).resolve().parents[1]
|
|
|
|
|
|
def test_cache_service_reexports_cache_manager_state():
    """cache_service must expose the very objects that cache_manager owns."""
    from services import cache_manager as manager
    from services import cache_service as service

    # Identity, not equality: each name has to resolve to one shared container.
    shared_names = (
        "_SALES_DF_CACHE",
        "_SALES_PROCESSED_CACHE",
        "_SALES_OPTIONS_CACHE",
        "_SALES_ANALYSIS_RESULT_CACHE",
        "_DASHBOARD_DATA_CACHE",
    )
    for name in shared_names:
        assert getattr(service, name) is getattr(manager, name)

    # The TTL is published under a different name and is compared by value.
    assert service._SALES_RESULT_TTL == manager._SALES_CACHE_TTL
|
|
|
|
|
|
def test_routes_share_sales_cache_manager_state():
    """Route modules must hand out cache_manager's own sales cache objects."""
    from services import cache_manager
    from routes import export_routes, import_routes, sales_routes

    cache_refs = import_routes._get_cache_refs()
    df_cache, processed_cache, table_clearer = cache_refs

    owner_processed = cache_manager._SALES_PROCESSED_CACHE

    # sales_routes and export_routes each reach the processed cache differently.
    assert sales_routes._SALES_PROCESSED_CACHE is owner_processed
    assert export_routes._get_sales_cache() is owner_processed

    # import_routes returns a (df cache, processed cache, clear function) triple.
    assert df_cache is cache_manager._SALES_DF_CACHE
    assert processed_cache is owner_processed
    assert table_clearer is cache_manager.clear_sales_cache_for_table
|
|
|
|
|
|
def test_clear_sales_cache_for_table_removes_table_and_range_aliases(tmp_path, monkeypatch):
    """Clearing one table drops its base and derived keys and spares other tables.

    The test seeds both sales caches with entries for
    ``realtime_sales_monthly`` (base key, ``_1m`` range key and a
    ``_custom_<start>_<end>`` key) plus an unrelated ``other_table``, calls
    ``clear_sales_cache_for_table`` and checks which keys survive.
    """
    from services import cache_manager

    # clear_sales_cache() is expected to delete files found under
    # _SALES_ANALYSIS_PAGE_CACHE_DIR, so point that at a scratch directory
    # before resetting; otherwise the reset could remove real cache files.
    monkeypatch.setattr(cache_manager, "_SALES_ANALYSIS_PAGE_CACHE_DIR", tmp_path)
    cache_manager.clear_sales_cache()

    # Seed through monkeypatch.setitem so entries that survive the call under
    # test (the "other_table" ones) are removed again on teardown instead of
    # leaking into later tests.
    monkeypatch.setitem(cache_manager._SALES_DF_CACHE, "realtime_sales_monthly", object())
    monkeypatch.setitem(cache_manager._SALES_DF_CACHE, "other_table", object())

    processed_seed = {
        "realtime_sales_monthly": {"df": "base"},
        "realtime_sales_monthly_1m": {"df": "range"},
        "realtime_sales_monthly_custom_2026-01-01_2026-01-31": {"df": "custom"},
        "other_table": {"df": "other"},
    }
    for key, value in processed_seed.items():
        monkeypatch.setitem(cache_manager._SALES_PROCESSED_CACHE, key, value)

    cache_manager.clear_sales_cache_for_table("realtime_sales_monthly")

    assert "realtime_sales_monthly" not in cache_manager._SALES_DF_CACHE
    assert "other_table" in cache_manager._SALES_DF_CACHE
    assert "realtime_sales_monthly" not in cache_manager._SALES_PROCESSED_CACHE
    assert "realtime_sales_monthly_1m" not in cache_manager._SALES_PROCESSED_CACHE
    assert "realtime_sales_monthly_custom_2026-01-01_2026-01-31" not in cache_manager._SALES_PROCESSED_CACHE
    assert "other_table" in cache_manager._SALES_PROCESSED_CACHE
|
|
|
|
|
|
def test_set_sales_processed_cache_adds_timestamp_and_aliases(tmp_path, monkeypatch):
    """set_sales_processed_cache stores the entry itself, stamps it, and aliases it.

    Checks that the cached object is the same dict that was passed in, that
    it gained a positive ``"time"`` value, and that the alias key points at
    the same object.
    """
    from services import cache_manager

    # clear_sales_cache() is expected to delete files found under
    # _SALES_ANALYSIS_PAGE_CACHE_DIR, so point that at a scratch directory
    # before resetting; otherwise the reset could remove real cache files.
    monkeypatch.setattr(cache_manager, "_SALES_ANALYSIS_PAGE_CACHE_DIR", tmp_path)
    cache_manager.clear_sales_cache()

    primary_key = "realtime_sales_monthly_3m"
    alias_key = "realtime_sales_monthly"
    entry = {"df": "data", "cols": {"date": "日期"}}

    try:
        cache_manager.set_sales_processed_cache(
            primary_key,
            entry,
            aliases=(alias_key,),
        )

        cached = cache_manager._SALES_PROCESSED_CACHE[primary_key]
        assert cached is entry
        assert cached["time"] > 0
        assert cache_manager._SALES_PROCESSED_CACHE[alias_key] is entry
    finally:
        # Do not leave this test's entries behind in the process-wide cache.
        cache_manager._SALES_PROCESSED_CACHE.pop(primary_key, None)
        cache_manager._SALES_PROCESSED_CACHE.pop(alias_key, None)
|
|
|
|
|
|
def test_dashboard_cache_clear_restores_expected_shape(tmp_path, monkeypatch):
    """clear_dashboard_cache resets the in-memory dict and handles both cache files.

    After the call the dashboard cache must equal a dict of five ``None``
    slots, the shared cache file must be gone, and the stale cache file must
    exist.
    """
    from services import cache_manager

    live_file = tmp_path / "dashboard_full_cache.pkl"
    stale_file = tmp_path / "dashboard_full_cache_stale.pkl"
    live_file.write_bytes(b"stale")

    # Redirect both on-disk locations into the per-test scratch directory.
    for attr_name, target in (
        ("_DASHBOARD_SHARED_CACHE_FILE", live_file),
        ("_DASHBOARD_STALE_CACHE_FILE", stale_file),
    ):
        monkeypatch.setattr(cache_manager, attr_name, target)

    # Dirty two in-memory slots so the reset has something to undo.
    for slot in ("consolidated_data", "full_data"):
        cache_manager._DASHBOARD_DATA_CACHE[slot] = ["stale"]

    cache_manager.clear_dashboard_cache()

    expected_shape = dict.fromkeys(
        (
            "consolidated_data",
            "consolidated_timestamp",
            "today_start",
            "full_data",
            "full_timestamp",
        )
    )
    assert cache_manager._DASHBOARD_DATA_CACHE == expected_shape
    assert not live_file.exists()
    assert stale_file.exists()
|
|
|
|
|
|
def test_sales_analysis_shared_page_cache_roundtrip(tmp_path, monkeypatch):
    """A page context written to the shared page cache reads back equal."""
    from routes import sales_routes

    # Keep the cache directory inside the per-test scratch directory.
    monkeypatch.setattr(sales_routes, "_SALES_ANALYSIS_PAGE_CACHE_DIR", tmp_path)

    key = "sales_analysis:page_context:test"
    stored = {"kpi": {"revenue": 123}, "active_page": "sales"}
    sales_routes._set_sales_shared_page_context_cache(key, stored)

    loaded = sales_routes._get_sales_shared_page_context_cache(key)
    assert loaded == stored
|
|
|
|
|
|
def test_clear_sales_cache_removes_shared_page_cache_files(tmp_path, monkeypatch):
    """clear_sales_cache must delete files in the shared page-cache directory."""
    from services import cache_manager

    monkeypatch.setattr(cache_manager, "_SALES_ANALYSIS_PAGE_CACHE_DIR", tmp_path)
    tmp_path.mkdir(parents=True, exist_ok=True)

    # Plant a file that the clear call is expected to remove.
    leftover = tmp_path / "sales_analysis_page.pkl"
    leftover.write_bytes(b"stale")

    cache_manager.clear_sales_cache()

    assert not leftover.exists()
|
|
|
|
|
|
def test_growth_cache_shared_file_roundtrip(tmp_path, monkeypatch):
    """Growth data written by set_growth_cache is still served after the
    in-memory cache has been emptied, and the shared cache file exists.
    """
    from services import cache_service

    shared_cache = tmp_path / "growth_analysis_cache.pkl"
    monkeypatch.setattr(cache_service, "_GROWTH_SHARED_CACHE_FILE", shared_cache)

    fingerprint = ("2026-05-17", 10)
    cache_service.clear_growth_cache()
    cache_service.set_growth_cache(
        {"labels": ["2026-05"], "revenue": [1000]},
        {"ytd_revenue": 1000},
        source_fingerprint=fingerprint,
    )

    # Swap in an empty in-memory cache so the checks below presumably have to
    # be satisfied from the shared file. Going through monkeypatch (rather
    # than a plain assignment) restores the module's original object on
    # teardown, so the rebinding does not leak into later tests.
    monkeypatch.setattr(
        cache_service,
        "_GROWTH_ANALYSIS_CACHE",
        {
            "chart_data": None,
            "kpi": None,
            "timestamp": None,
            "source_fingerprint": None,
        },
    )

    assert cache_service.is_growth_cache_valid(fingerprint)
    assert cache_service.get_growth_cache()["chart_data"]["revenue"] == [1000]
    assert shared_cache.exists()
|
|
|
|
|
|
def test_clear_growth_cache_removes_shared_file(tmp_path, monkeypatch):
    """clear_growth_cache must delete the shared growth cache file."""
    from services import cache_service

    growth_file = tmp_path / "growth_analysis_cache.pkl"
    growth_file.write_bytes(b"stale")
    monkeypatch.setattr(cache_service, "_GROWTH_SHARED_CACHE_FILE", growth_file)

    cache_service.clear_growth_cache()

    assert not growth_file.exists()
|
|
|
|
|
|
def test_daily_sales_shared_view_cache_roundtrip(tmp_path, monkeypatch):
    """A view context written to the shared daily-sales cache reads back equal."""
    from routes import daily_sales_routes

    # Keep the cache directory inside the per-test scratch directory.
    monkeypatch.setattr(daily_sales_routes, "_DAILY_SALES_VIEW_CACHE_DIR", tmp_path)

    key = "daily_sales:view:test"
    stored = {"summary": {"revenue": 456}, "active_page": "daily_sales"}
    daily_sales_routes._set_shared_daily_view_cache(key, stored)

    loaded = daily_sales_routes._get_shared_daily_view_cache(key)
    assert loaded == stored
|
|
|
|
|
|
def test_clear_daily_sales_cache_removes_shared_view_cache_files(tmp_path, monkeypatch):
    """clear_daily_sales_cache must delete files in the shared view-cache directory."""
    from services import cache_manager

    monkeypatch.setattr(cache_manager, "_DAILY_SALES_VIEW_CACHE_DIR", tmp_path)
    tmp_path.mkdir(parents=True, exist_ok=True)

    # Plant a file that the clear call is expected to remove.
    leftover = tmp_path / "daily_sales_view.pkl"
    leftover.write_bytes(b"stale")

    cache_manager.clear_daily_sales_cache()

    assert not leftover.exists()
|
|
|
|
|
|
def test_daily_sales_metadata_uses_single_postgres_query():
    """On a PostgreSQL engine the metadata lookup must issue exactly one query.

    A fake engine reports the ``postgresql`` dialect and returns one row of
    ``(latest date string, row count, list of dates)``. The test checks the
    returned dates and fingerprint and that ``execute`` ran exactly once.
    """
    from routes import daily_sales_routes

    # Shared across every FakeConnection so queries are counted even if the
    # code under test opens more than one connection.
    executed_queries = []

    class FakeDialect:
        name = "postgresql"

    class FakeResult:
        def fetchone(self):
            return ("2026-05-17", 85118, [date(2026, 5, 17), date(2026, 5, 16)])

    class FakeConnection:
        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            # Returning False lets exceptions raised inside the block propagate.
            return False

        def execute(self, query):
            self.query = str(query)
            executed_queries.append(self.query)
            return FakeResult()

    class FakeEngine:
        dialect = FakeDialect()

        def connect(self):
            return FakeConnection()

    dates, fingerprint = daily_sales_routes._get_daily_sales_metadata(FakeEngine())

    assert [d.strftime("%Y-%m-%d") for d in dates] == ["2026-05-17", "2026-05-16"]
    assert fingerprint == ("2026-05-17", 85118)
    # The property the test is named after: a single round trip to the database.
    assert len(executed_queries) == 1
|
|
|
|
|
|
def test_daily_sales_metadata_falls_back_for_sqlite(monkeypatch):
    """On a SQLite engine the metadata comes from the two stubbed helper functions."""
    from routes import daily_sales_routes

    class FakeDialect:
        name = "sqlite"

    class FakeEngine:
        dialect = FakeDialect()

    # Parameter names match the original stubs in case the caller uses keywords.
    def stub_available_dates(engine, table_name):
        return ["date-a"]

    def stub_fingerprint(engine, table_name):
        return ("date-a", 1)

    monkeypatch.setattr(daily_sales_routes, "_get_available_daily_dates", stub_available_dates)
    monkeypatch.setattr(daily_sales_routes, "_get_data_fingerprint", stub_fingerprint)

    metadata = daily_sales_routes._get_daily_sales_metadata(FakeEngine())

    assert metadata == (["date-a"], ("date-a", 1))
|
|
|
|
|
|
def test_promo_dashboard_shared_cache_roundtrip(tmp_path, monkeypatch):
    """Promo dashboard data written to the shared cache file loads back equal.

    The in-memory promo cache is emptied before the write and again before
    the read.
    """
    from routes import edm_routes

    promo_file = tmp_path / "promo_dashboard_cache.pkl"
    monkeypatch.setattr(edm_routes, "_PROMO_SHARED_CACHE_FILE", promo_file)
    edm_routes._PROMO_DASHBOARD_CACHE.clear()

    source_state = (10, "2026-05-19T12:00:00", 30)
    cache_key = ("edm", "default", "desc", "", "2026-05-19-12", source_state)
    payload = {
        "sorted_grouped_items": {"11:00": []},
        "slot_stats": {"11:00": {"on_shelf": 0}},
        "items_in_batch": [],
        "last_update_str": "2026-05-19 12:00",
        "activity_time": "11:00",
        "active_tab": "11:00",
        "current_batch_id": "batch-1",
    }

    edm_routes._write_shared_promo_dashboard_cache(cache_key, payload)
    # Empty the in-memory cache again before loading.
    edm_routes._PROMO_DASHBOARD_CACHE.clear()

    loaded = edm_routes._load_shared_promo_dashboard_cache(cache_key)
    assert loaded == payload
    assert promo_file.exists()
|
|
|
|
|
|
def test_promo_dashboard_shared_cache_ignores_other_versions(tmp_path, monkeypatch):
    """A shared cache file tagged with a different version yields no entry."""
    from routes import edm_routes

    promo_file = tmp_path / "promo_dashboard_cache.pkl"
    cache_key = ("edm", "default", "desc", "", "bucket", (1, "ts", 1))

    # Hand-write a file that contains the key but carries a foreign version tag.
    foreign_payload = {
        "version": "older-version",
        "entries": {cache_key: {"stale": True}},
    }
    serialized = pickle.dumps(foreign_payload, protocol=pickle.HIGHEST_PROTOCOL)
    promo_file.write_bytes(serialized)
    monkeypatch.setattr(edm_routes, "_PROMO_SHARED_CACHE_FILE", promo_file)

    loaded = edm_routes._load_shared_promo_dashboard_cache(cache_key)
    assert loaded is None
|
|
|
|
|
|
def test_sales_analysis_preview_context_cache_avoids_reloading_options(tmp_path, monkeypatch):
    """Two unfiltered requests load the preview filter options only once.

    The database manager, inspector, data-range lookup, option loader and
    template renderer of ``sales_routes`` are replaced with fakes; the view
    is then called twice and the option loader must have run a single time
    while the renderer ran for both requests.
    """
    from routes import sales_routes

    flask_app = Flask(__name__)
    flask_app.secret_key = "test"

    class FakeDatabaseManager:
        engine = object()

    class FakeInspector:
        def has_table(self, table_name):
            return table_name == "realtime_sales_monthly"

    option_loads = []  # one element per call to the option loader
    rendered = []  # (template name, context) per render call

    def fake_preview_options(_engine, _table_name):
        option_loads.append(1)
        return {
            "categories": ["美妝"],
            "brands": ["品牌"],
            "vendors": ["廠商"],
            "activities": ["活動"],
            "payments": ["付款"],
            "months": ["2026-05"],
        }

    def fake_render_template(template_name, **context):
        # Return the context itself so the test can inspect what was rendered.
        rendered.append((template_name, context))
        return context

    replacements = (
        ("_SALES_ANALYSIS_PAGE_CACHE_DIR", tmp_path),
        ("DatabaseManager", FakeDatabaseManager),
        ("inspect", lambda _engine: FakeInspector()),
        ("_fetch_sales_data_range", lambda *_args: "2026/05/01 - 2026/05/13"),
        ("_preview_sales_filter_options", fake_preview_options),
        ("render_template", fake_render_template),
    )
    for attr_name, fake in replacements:
        monkeypatch.setattr(sales_routes, attr_name, fake)
    sales_routes._SALES_ANALYSIS_RESULT_CACHE.clear()

    for _attempt in range(2):
        with flask_app.test_request_context("/sales_analysis"):
            session["logged_in"] = True
            page = sales_routes.sales_analysis()
            assert page["no_filter"] is True
            assert page["all_categories"] == ["美妝"]

    assert len(option_loads) == 1
    assert len(rendered) == 2
|
|
|
|
|
|
def test_cache_dicts_are_only_defined_in_cache_manager():
    """Only services/cache_manager.py may contain the cache-dict assignment text.

    Scans ``app.py`` plus every ``routes/*.py`` and ``services/*.py`` file
    under ROOT for three literal assignment markers.
    """
    markers = (
        "_SALES_DF_CACHE = {}",
        "_SALES_PROCESSED_CACHE = {}",
        "_DASHBOARD_DATA_CACHE = {",
    )
    candidates = [
        ROOT / "app.py",
        *ROOT.glob("routes/*.py"),
        *ROOT.glob("services/*.py"),
    ]

    found = []
    for source_file in candidates:
        content = source_file.read_text(encoding="utf-8")
        relative_name = source_file.relative_to(ROOT).as_posix()
        found.extend((relative_name, marker) for marker in markers if marker in content)

    owner = "services/cache_manager.py"
    assert found == [(owner, marker) for marker in markers]
|
|
|
|
|
|
def test_sales_cache_ttl_is_only_defined_in_cache_manager():
    """Only services/cache_manager.py may contain the text ``_SALES_CACHE_TTL =``.

    Scans ``app.py`` plus every ``routes/*.py`` and ``services/*.py`` file
    under ROOT.
    """
    candidates = [
        ROOT / "app.py",
        *ROOT.glob("routes/*.py"),
        *ROOT.glob("services/*.py"),
    ]

    definers = [
        source_file.relative_to(ROOT).as_posix()
        for source_file in candidates
        if "_SALES_CACHE_TTL =" in source_file.read_text(encoding="utf-8")
    ]

    assert definers == ["services/cache_manager.py"]
|