diff --git a/apps/api/src/services/awooop_truth_chain_service.py b/apps/api/src/services/awooop_truth_chain_service.py index 7f7ea8d0..434e6849 100644 --- a/apps/api/src/services/awooop_truth_chain_service.py +++ b/apps/api/src/services/awooop_truth_chain_service.py @@ -27,8 +27,8 @@ from src.services.awooop_ansible_check_mode_service import detect_ansible_transp from src.services.awooop_ansible_audit_service import build_ansible_truth from src.services.drift_repeat_state import build_drift_repeat_state from src.services.operator_summary_cache import ( - get_cached_operator_summary, - store_operator_summary, + get_cached_operator_summary_async, + store_operator_summary_async, ) logger = structlog.get_logger(__name__) @@ -1757,7 +1757,7 @@ async def fetch_automation_quality_summary( "limit": bounded_limit, } if not refresh: - cached_summary = get_cached_operator_summary( + cached_summary = await get_cached_operator_summary_async( "truth_chain_quality_summary", cache_key, ttl_seconds=_QUALITY_SUMMARY_CACHE_TTL_SECONDS, @@ -1876,7 +1876,7 @@ async def fetch_automation_quality_summary( cache_status="miss", cache_ttl_seconds=_QUALITY_SUMMARY_CACHE_TTL_SECONDS, ) - return store_operator_summary( + return await store_operator_summary_async( "truth_chain_quality_summary", cache_key, summary, diff --git a/apps/api/src/services/operator_summary_cache.py b/apps/api/src/services/operator_summary_cache.py index d8f76709..b5b94a3c 100644 --- a/apps/api/src/services/operator_summary_cache.py +++ b/apps/api/src/services/operator_summary_cache.py @@ -66,6 +66,30 @@ def _with_cache_meta(value: dict[str, Any], meta: dict[str, Any]) -> dict[str, A return response +def _redis_key(namespace: str, key_parts: dict[str, Any]) -> str: + return f"awooop:operator_summary:{_cache_key(namespace, key_parts)}" + + +def _json_default(value: Any) -> str: + if isinstance(value, datetime): + return value.isoformat() + return str(value) + + +def _decode_redis_value(raw: Any) -> dict[str, Any] | None: + if raw is None: + return None + if isinstance(raw, bytes): + raw = raw.decode("utf-8") + if not isinstance(raw, str): + return None + try: + payload = json.loads(raw) + except json.JSONDecodeError: + return None + return payload if isinstance(payload, dict) else None + + def get_cached_operator_summary( namespace: str, key_parts: dict[str, Any], @@ -92,6 +116,58 @@ def get_cached_operator_summary( ) +async def get_cached_operator_summary_async( + namespace: str, + key_parts: dict[str, Any], + *, + ttl_seconds: int, + now_monotonic: float | None = None, +) -> dict[str, Any] | None: + """Return a shared Redis cache hit, falling back to process memory.""" + redis_key = _redis_key(namespace, key_parts) + try: + from src.core.redis_client import get_redis + + redis_client = get_redis() + payload = _decode_redis_value(await redis_client.get(redis_key)) + if payload is not None and isinstance(payload.get("value"), dict): + stored_epoch = float(payload.get("stored_epoch") or 0.0) + now_epoch = time.time() + ttl_value = max(1, int(ttl_seconds)) + age_seconds = now_epoch - stored_epoch + if 0 <= age_seconds < ttl_value: + stored_at_raw = payload.get("stored_at") + try: + stored_at = datetime.fromisoformat(str(stored_at_raw)) + except ValueError: + stored_at = datetime.now(UTC) - timedelta(seconds=age_seconds) + record = _CacheRecord( + value=payload["value"], + stored_at=stored_at, + stored_monotonic=(now_monotonic or time.monotonic()) + - age_seconds, + ttl_seconds=ttl_value, + ) + return _with_cache_meta( + record.value, + _cache_meta( + status="hit", + record=record, + age_seconds=age_seconds, + ), + ) + await redis_client.delete(redis_key) + except Exception: + pass + + return get_cached_operator_summary( + namespace, + key_parts, + ttl_seconds=ttl_seconds, + now_monotonic=now_monotonic, + ) + + def store_operator_summary( namespace: str, key_parts: dict[str, Any], @@ -117,6 +193,46 @@ def store_operator_summary( ) +async def store_operator_summary_async( + namespace: str, + key_parts: dict[str, Any], + value: dict[str, Any], + *, + ttl_seconds: int, + now_monotonic: float | None = None, + now_utc: datetime | None = None, +) -> dict[str, Any]: + """Store a fresh summary in Redis and process memory.""" + stored_at = now_utc or datetime.now(UTC) + ttl_value = max(1, int(ttl_seconds)) + response = store_operator_summary( + namespace, + key_parts, + value, + ttl_seconds=ttl_value, + now_monotonic=now_monotonic, + now_utc=stored_at, + ) + payload = { + "value": deepcopy(value), + "stored_at": stored_at.isoformat(), + "stored_epoch": time.time(), + "ttl_seconds": ttl_value, + } + try: + from src.core.redis_client import get_redis + + redis_client = get_redis() + await redis_client.set( + _redis_key(namespace, key_parts), + json.dumps(payload, ensure_ascii=False, default=_json_default), + ex=ttl_value, + ) + except Exception: + pass + return response + + def clear_operator_summary_cache() -> None: """Clear process-local cache for tests and controlled operator refreshes.""" _CACHE.clear() diff --git a/apps/api/src/services/platform_operator_service.py b/apps/api/src/services/platform_operator_service.py index f26a9b11..c5a9f67e 100644 --- a/apps/api/src/services/platform_operator_service.py +++ b/apps/api/src/services/platform_operator_service.py @@ -61,8 +61,8 @@ from src.services.ollama_failover_manager import ( from src.services.ollama_health_monitor import HealthReport, HealthStatus from src.services.operator_outcome import build_operator_outcome from src.services.operator_summary_cache import ( - get_cached_operator_summary, - store_operator_summary, + get_cached_operator_summary_async, + store_operator_summary_async, ) from src.services.run_state_machine import transition @@ -337,7 +337,7 @@ async def list_callback_replies( "per_page": per_page, } if not refresh: - cached_response = get_cached_operator_summary( + cached_response = await get_cached_operator_summary_async( "callback_replies", cache_key, ttl_seconds=_CALLBACK_REPLY_CACHE_TTL_SECONDS, @@ -515,7 +515,7 @@ async def list_callback_replies( cache_status="miss", cache_ttl_seconds=_CALLBACK_REPLY_CACHE_TTL_SECONDS, ) - return store_operator_summary( + return await store_operator_summary_async( "callback_replies", cache_key, response, diff --git a/apps/api/tests/test_operator_summary_cache.py b/apps/api/tests/test_operator_summary_cache.py index 83621450..f9cb14ea 100644 --- a/apps/api/tests/test_operator_summary_cache.py +++ b/apps/api/tests/test_operator_summary_cache.py @@ -1,12 +1,33 @@ from datetime import UTC, datetime +import pytest + from src.services.operator_summary_cache import ( clear_operator_summary_cache, + get_cached_operator_summary_async, get_cached_operator_summary, + store_operator_summary_async, store_operator_summary, ) +class _FakeRedis: + def __init__(self) -> None: + self.values: dict[str, str] = {} + self.expirations: dict[str, int] = {} + + async def get(self, key: str) -> str | None: + return self.values.get(key) + + async def set(self, key: str, value: str, ex: int) -> bool: + self.values[key] = value + self.expirations[key] = ex + return True + + async def delete(self, key: str) -> int: + return 1 if self.values.pop(key, None) is not None else 0 + + def test_operator_summary_cache_returns_copy_with_hit_metadata() -> None: clear_operator_summary_cache() key = {"project_id": "awoooi", "limit": 30} @@ -47,6 +68,38 @@ def test_operator_summary_cache_expires_by_ttl() -> None: now_utc=datetime(2026, 6, 1, tzinfo=UTC), ) + +@pytest.mark.asyncio +async def test_operator_summary_cache_uses_shared_redis(monkeypatch) -> None: + clear_operator_summary_cache() + fake_redis = _FakeRedis() + monkeypatch.setattr("src.core.redis_client.get_redis", lambda: fake_redis) + key = {"project_id": "awoooi", "per_page": 1} + + stored = await store_operator_summary_async( + "callback_replies", + key, + {"total": 4, "items": []}, + ttl_seconds=20, + now_monotonic=300.0, + now_utc=datetime(2026, 6, 1, tzinfo=UTC), + ) + + assert stored["cache"]["status"] == "miss" + assert fake_redis.values + + clear_operator_summary_cache() + cached = await get_cached_operator_summary_async( + "callback_replies", + key, + ttl_seconds=20, + now_monotonic=302.0, + ) + + assert cached is not None + assert cached["cache"]["status"] == "hit" + assert cached["total"] == 4 + assert ( get_cached_operator_summary( "callback_replies",