"""Short TTL cache for read-only AwoooP operator summaries. This cache intentionally lives in the API pod memory. It reduces repeated heavy operator-console reads without becoming a new source of truth. """ from __future__ import annotations import hashlib import json import time from copy import deepcopy from dataclasses import dataclass from datetime import UTC, datetime, timedelta from typing import Any import structlog _CACHE_SCHEMA_VERSION = "operator_summary_cache_v1" _CACHE_SOURCE = "api_pod_memory" logger = structlog.get_logger(__name__) @dataclass(slots=True) class _CacheRecord: value: dict[str, Any] stored_at: datetime stored_monotonic: float ttl_seconds: int _CACHE: dict[str, _CacheRecord] = {} def _cache_key(namespace: str, key_parts: dict[str, Any]) -> str: payload = json.dumps( {"namespace": namespace, "key_parts": key_parts}, ensure_ascii=False, sort_keys=True, default=str, ) digest = hashlib.sha256(payload.encode("utf-8")).hexdigest() return f"{namespace}:{digest}" def _cache_meta( *, status: str, record: _CacheRecord, age_seconds: float, ) -> dict[str, Any]: ttl_seconds = max(1, int(record.ttl_seconds)) expires_at = record.stored_at + timedelta(seconds=ttl_seconds) return { "schema_version": _CACHE_SCHEMA_VERSION, "status": status, "source": _CACHE_SOURCE, "ttl_seconds": ttl_seconds, "age_seconds": round(max(0.0, age_seconds), 3), "stored_at": record.stored_at.isoformat(), "expires_at": expires_at.isoformat(), } def _with_cache_meta(value: dict[str, Any], meta: dict[str, Any]) -> dict[str, Any]: response = deepcopy(value) response["cache"] = meta return response def _redis_key(namespace: str, key_parts: dict[str, Any]) -> str: return f"awooop:operator_summary:{_cache_key(namespace, key_parts)}" def _json_default(value: Any) -> str: if isinstance(value, datetime): return value.isoformat() return str(value) def _decode_redis_value(raw: Any) -> dict[str, Any] | None: if raw is None: return None if isinstance(raw, bytes): raw = raw.decode("utf-8") if not isinstance(raw, str): return None try: payload = json.loads(raw) except json.JSONDecodeError: return None return payload if isinstance(payload, dict) else None async def _get_redis_client() -> Any: from src.core.redis_client import get_redis, init_redis_pool try: return get_redis() except RuntimeError: return await init_redis_pool() def get_cached_operator_summary( namespace: str, key_parts: dict[str, Any], *, ttl_seconds: int, now_monotonic: float | None = None, ) -> dict[str, Any] | None: """Return cached summary with hit metadata, or None if absent/expired.""" cache_key = _cache_key(namespace, key_parts) record = _CACHE.get(cache_key) if record is None: return None now_value = time.monotonic() if now_monotonic is None else now_monotonic ttl_value = max(1, int(ttl_seconds)) age_seconds = now_value - record.stored_monotonic if age_seconds >= ttl_value: _CACHE.pop(cache_key, None) return None return _with_cache_meta( record.value, _cache_meta(status="hit", record=record, age_seconds=age_seconds), ) async def get_cached_operator_summary_async( namespace: str, key_parts: dict[str, Any], *, ttl_seconds: int, now_monotonic: float | None = None, ) -> dict[str, Any] | None: """Return a shared Redis cache hit, falling back to process memory.""" redis_key = _redis_key(namespace, key_parts) try: redis_client = await _get_redis_client() payload = _decode_redis_value(await redis_client.get(redis_key)) if payload is not None and isinstance(payload.get("value"), dict): stored_epoch = float(payload.get("stored_epoch") or 0.0) now_epoch = time.time() ttl_value = max(1, int(ttl_seconds)) age_seconds = now_epoch - stored_epoch if 0 <= age_seconds < ttl_value: stored_at_raw = payload.get("stored_at") try: stored_at = datetime.fromisoformat(str(stored_at_raw)) except ValueError: stored_at = datetime.now(UTC) - timedelta(seconds=age_seconds) record = _CacheRecord( value=payload["value"], stored_at=stored_at, stored_monotonic=(now_monotonic or time.monotonic()) - age_seconds, ttl_seconds=ttl_value, ) return _with_cache_meta( record.value, _cache_meta( status="hit", record=record, age_seconds=age_seconds, ), ) await redis_client.delete(redis_key) except Exception as exc: logger.debug( "operator_summary_redis_cache_read_failed", namespace=namespace, error=str(exc), ) return get_cached_operator_summary( namespace, key_parts, ttl_seconds=ttl_seconds, now_monotonic=now_monotonic, ) def store_operator_summary( namespace: str, key_parts: dict[str, Any], value: dict[str, Any], *, ttl_seconds: int, now_monotonic: float | None = None, now_utc: datetime | None = None, ) -> dict[str, Any]: """Store a fresh summary and return it with miss metadata.""" cache_key = _cache_key(namespace, key_parts) stored_at = now_utc or datetime.now(UTC) record = _CacheRecord( value=deepcopy(value), stored_at=stored_at, stored_monotonic=time.monotonic() if now_monotonic is None else now_monotonic, ttl_seconds=max(1, int(ttl_seconds)), ) _CACHE[cache_key] = record return _with_cache_meta( record.value, _cache_meta(status="miss", record=record, age_seconds=0.0), ) async def store_operator_summary_async( namespace: str, key_parts: dict[str, Any], value: dict[str, Any], *, ttl_seconds: int, now_monotonic: float | None = None, now_utc: datetime | None = None, ) -> dict[str, Any]: """Store a fresh summary in Redis and process memory.""" stored_at = now_utc or datetime.now(UTC) ttl_value = max(1, int(ttl_seconds)) response = store_operator_summary( namespace, key_parts, value, ttl_seconds=ttl_value, now_monotonic=now_monotonic, now_utc=stored_at, ) payload = { "value": deepcopy(value), "stored_at": stored_at.isoformat(), "stored_epoch": time.time(), "ttl_seconds": ttl_value, } try: redis_client = await _get_redis_client() await redis_client.set( _redis_key(namespace, key_parts), json.dumps(payload, ensure_ascii=False, default=_json_default), ex=ttl_value, ) except Exception as exc: logger.debug( "operator_summary_redis_cache_write_failed", namespace=namespace, error=str(exc), ) return response def clear_operator_summary_cache() -> None: """Clear process-local cache for tests and controlled operator refreshes.""" _CACHE.clear()