59 lines
1.5 KiB
Python
59 lines
1.5 KiB
Python
from datetime import UTC, datetime
|
|
|
|
from src.services.operator_summary_cache import (
|
|
clear_operator_summary_cache,
|
|
get_cached_operator_summary,
|
|
store_operator_summary,
|
|
)
|
|
|
|
|
|
def test_operator_summary_cache_returns_copy_with_hit_metadata() -> None:
|
|
clear_operator_summary_cache()
|
|
key = {"project_id": "awoooi", "limit": 30}
|
|
stored = store_operator_summary(
|
|
"truth_chain_quality_summary",
|
|
key,
|
|
{"evaluated_total": 30, "nested": {"ok": True}},
|
|
ttl_seconds=30,
|
|
now_monotonic=100.0,
|
|
now_utc=datetime(2026, 6, 1, tzinfo=UTC),
|
|
)
|
|
|
|
assert stored["cache"]["status"] == "miss"
|
|
stored["nested"]["ok"] = False
|
|
|
|
cached = get_cached_operator_summary(
|
|
"truth_chain_quality_summary",
|
|
key,
|
|
ttl_seconds=30,
|
|
now_monotonic=105.5,
|
|
)
|
|
|
|
assert cached is not None
|
|
assert cached["cache"]["status"] == "hit"
|
|
assert cached["cache"]["age_seconds"] == 5.5
|
|
assert cached["nested"]["ok"] is True
|
|
|
|
|
|
def test_operator_summary_cache_expires_by_ttl() -> None:
|
|
clear_operator_summary_cache()
|
|
key = {"project_id": "awoooi", "page": 1}
|
|
store_operator_summary(
|
|
"callback_replies",
|
|
key,
|
|
{"total": 4},
|
|
ttl_seconds=20,
|
|
now_monotonic=200.0,
|
|
now_utc=datetime(2026, 6, 1, tzinfo=UTC),
|
|
)
|
|
|
|
assert (
|
|
get_cached_operator_summary(
|
|
"callback_replies",
|
|
key,
|
|
ttl_seconds=20,
|
|
now_monotonic=220.0,
|
|
)
|
|
is None
|
|
)
|