fix(api): share operator summary cache through redis
This commit is contained in:
@@ -27,8 +27,8 @@ from src.services.awooop_ansible_check_mode_service import detect_ansible_transp
|
||||
from src.services.awooop_ansible_audit_service import build_ansible_truth
|
||||
from src.services.drift_repeat_state import build_drift_repeat_state
|
||||
from src.services.operator_summary_cache import (
|
||||
get_cached_operator_summary,
|
||||
store_operator_summary,
|
||||
get_cached_operator_summary_async,
|
||||
store_operator_summary_async,
|
||||
)
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
@@ -1757,7 +1757,7 @@ async def fetch_automation_quality_summary(
|
||||
"limit": bounded_limit,
|
||||
}
|
||||
if not refresh:
|
||||
cached_summary = get_cached_operator_summary(
|
||||
cached_summary = await get_cached_operator_summary_async(
|
||||
"truth_chain_quality_summary",
|
||||
cache_key,
|
||||
ttl_seconds=_QUALITY_SUMMARY_CACHE_TTL_SECONDS,
|
||||
@@ -1876,7 +1876,7 @@ async def fetch_automation_quality_summary(
|
||||
cache_status="miss",
|
||||
cache_ttl_seconds=_QUALITY_SUMMARY_CACHE_TTL_SECONDS,
|
||||
)
|
||||
return store_operator_summary(
|
||||
return await store_operator_summary_async(
|
||||
"truth_chain_quality_summary",
|
||||
cache_key,
|
||||
summary,
|
||||
|
||||
@@ -66,6 +66,30 @@ def _with_cache_meta(value: dict[str, Any], meta: dict[str, Any]) -> dict[str, A
|
||||
return response
|
||||
|
||||
|
||||
def _redis_key(namespace: str, key_parts: dict[str, Any]) -> str:
|
||||
return f"awooop:operator_summary:{_cache_key(namespace, key_parts)}"
|
||||
|
||||
|
||||
def _json_default(value: Any) -> str:
|
||||
if isinstance(value, datetime):
|
||||
return value.isoformat()
|
||||
return str(value)
|
||||
|
||||
|
||||
def _decode_redis_value(raw: Any) -> dict[str, Any] | None:
|
||||
if raw is None:
|
||||
return None
|
||||
if isinstance(raw, bytes):
|
||||
raw = raw.decode("utf-8")
|
||||
if not isinstance(raw, str):
|
||||
return None
|
||||
try:
|
||||
payload = json.loads(raw)
|
||||
except json.JSONDecodeError:
|
||||
return None
|
||||
return payload if isinstance(payload, dict) else None
|
||||
|
||||
|
||||
def get_cached_operator_summary(
|
||||
namespace: str,
|
||||
key_parts: dict[str, Any],
|
||||
@@ -92,6 +116,58 @@ def get_cached_operator_summary(
|
||||
)
|
||||
|
||||
|
||||
async def get_cached_operator_summary_async(
|
||||
namespace: str,
|
||||
key_parts: dict[str, Any],
|
||||
*,
|
||||
ttl_seconds: int,
|
||||
now_monotonic: float | None = None,
|
||||
) -> dict[str, Any] | None:
|
||||
"""Return a shared Redis cache hit, falling back to process memory."""
|
||||
redis_key = _redis_key(namespace, key_parts)
|
||||
try:
|
||||
from src.core.redis_client import get_redis
|
||||
|
||||
redis_client = get_redis()
|
||||
payload = _decode_redis_value(await redis_client.get(redis_key))
|
||||
if payload is not None and isinstance(payload.get("value"), dict):
|
||||
stored_epoch = float(payload.get("stored_epoch") or 0.0)
|
||||
now_epoch = time.time()
|
||||
ttl_value = max(1, int(ttl_seconds))
|
||||
age_seconds = now_epoch - stored_epoch
|
||||
if 0 <= age_seconds < ttl_value:
|
||||
stored_at_raw = payload.get("stored_at")
|
||||
try:
|
||||
stored_at = datetime.fromisoformat(str(stored_at_raw))
|
||||
except ValueError:
|
||||
stored_at = datetime.now(UTC) - timedelta(seconds=age_seconds)
|
||||
record = _CacheRecord(
|
||||
value=payload["value"],
|
||||
stored_at=stored_at,
|
||||
stored_monotonic=(now_monotonic or time.monotonic())
|
||||
- age_seconds,
|
||||
ttl_seconds=ttl_value,
|
||||
)
|
||||
return _with_cache_meta(
|
||||
record.value,
|
||||
_cache_meta(
|
||||
status="hit",
|
||||
record=record,
|
||||
age_seconds=age_seconds,
|
||||
),
|
||||
)
|
||||
await redis_client.delete(redis_key)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return get_cached_operator_summary(
|
||||
namespace,
|
||||
key_parts,
|
||||
ttl_seconds=ttl_seconds,
|
||||
now_monotonic=now_monotonic,
|
||||
)
|
||||
|
||||
|
||||
def store_operator_summary(
|
||||
namespace: str,
|
||||
key_parts: dict[str, Any],
|
||||
@@ -117,6 +193,46 @@ def store_operator_summary(
|
||||
)
|
||||
|
||||
|
||||
async def store_operator_summary_async(
|
||||
namespace: str,
|
||||
key_parts: dict[str, Any],
|
||||
value: dict[str, Any],
|
||||
*,
|
||||
ttl_seconds: int,
|
||||
now_monotonic: float | None = None,
|
||||
now_utc: datetime | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Store a fresh summary in Redis and process memory."""
|
||||
stored_at = now_utc or datetime.now(UTC)
|
||||
ttl_value = max(1, int(ttl_seconds))
|
||||
response = store_operator_summary(
|
||||
namespace,
|
||||
key_parts,
|
||||
value,
|
||||
ttl_seconds=ttl_value,
|
||||
now_monotonic=now_monotonic,
|
||||
now_utc=stored_at,
|
||||
)
|
||||
payload = {
|
||||
"value": deepcopy(value),
|
||||
"stored_at": stored_at.isoformat(),
|
||||
"stored_epoch": time.time(),
|
||||
"ttl_seconds": ttl_value,
|
||||
}
|
||||
try:
|
||||
from src.core.redis_client import get_redis
|
||||
|
||||
redis_client = get_redis()
|
||||
await redis_client.set(
|
||||
_redis_key(namespace, key_parts),
|
||||
json.dumps(payload, ensure_ascii=False, default=_json_default),
|
||||
ex=ttl_value,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return response
|
||||
|
||||
|
||||
def clear_operator_summary_cache() -> None:
|
||||
"""Clear process-local cache for tests and controlled operator refreshes."""
|
||||
_CACHE.clear()
|
||||
|
||||
@@ -61,8 +61,8 @@ from src.services.ollama_failover_manager import (
|
||||
from src.services.ollama_health_monitor import HealthReport, HealthStatus
|
||||
from src.services.operator_outcome import build_operator_outcome
|
||||
from src.services.operator_summary_cache import (
|
||||
get_cached_operator_summary,
|
||||
store_operator_summary,
|
||||
get_cached_operator_summary_async,
|
||||
store_operator_summary_async,
|
||||
)
|
||||
from src.services.run_state_machine import transition
|
||||
|
||||
@@ -337,7 +337,7 @@ async def list_callback_replies(
|
||||
"per_page": per_page,
|
||||
}
|
||||
if not refresh:
|
||||
cached_response = get_cached_operator_summary(
|
||||
cached_response = await get_cached_operator_summary_async(
|
||||
"callback_replies",
|
||||
cache_key,
|
||||
ttl_seconds=_CALLBACK_REPLY_CACHE_TTL_SECONDS,
|
||||
@@ -515,7 +515,7 @@ async def list_callback_replies(
|
||||
cache_status="miss",
|
||||
cache_ttl_seconds=_CALLBACK_REPLY_CACHE_TTL_SECONDS,
|
||||
)
|
||||
return store_operator_summary(
|
||||
return await store_operator_summary_async(
|
||||
"callback_replies",
|
||||
cache_key,
|
||||
response,
|
||||
|
||||
@@ -1,12 +1,33 @@
|
||||
from datetime import UTC, datetime
|
||||
|
||||
import pytest
|
||||
|
||||
from src.services.operator_summary_cache import (
|
||||
clear_operator_summary_cache,
|
||||
get_cached_operator_summary_async,
|
||||
get_cached_operator_summary,
|
||||
store_operator_summary_async,
|
||||
store_operator_summary,
|
||||
)
|
||||
|
||||
|
||||
class _FakeRedis:
|
||||
def __init__(self) -> None:
|
||||
self.values: dict[str, str] = {}
|
||||
self.expirations: dict[str, int] = {}
|
||||
|
||||
async def get(self, key: str) -> str | None:
|
||||
return self.values.get(key)
|
||||
|
||||
async def set(self, key: str, value: str, ex: int) -> bool:
|
||||
self.values[key] = value
|
||||
self.expirations[key] = ex
|
||||
return True
|
||||
|
||||
async def delete(self, key: str) -> int:
|
||||
return 1 if self.values.pop(key, None) is not None else 0
|
||||
|
||||
|
||||
def test_operator_summary_cache_returns_copy_with_hit_metadata() -> None:
|
||||
clear_operator_summary_cache()
|
||||
key = {"project_id": "awoooi", "limit": 30}
|
||||
@@ -47,6 +68,38 @@ def test_operator_summary_cache_expires_by_ttl() -> None:
|
||||
now_utc=datetime(2026, 6, 1, tzinfo=UTC),
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_operator_summary_cache_uses_shared_redis(monkeypatch) -> None:
|
||||
clear_operator_summary_cache()
|
||||
fake_redis = _FakeRedis()
|
||||
monkeypatch.setattr("src.core.redis_client.get_redis", lambda: fake_redis)
|
||||
key = {"project_id": "awoooi", "per_page": 1}
|
||||
|
||||
stored = await store_operator_summary_async(
|
||||
"callback_replies",
|
||||
key,
|
||||
{"total": 4, "items": []},
|
||||
ttl_seconds=20,
|
||||
now_monotonic=300.0,
|
||||
now_utc=datetime(2026, 6, 1, tzinfo=UTC),
|
||||
)
|
||||
|
||||
assert stored["cache"]["status"] == "miss"
|
||||
assert fake_redis.values
|
||||
|
||||
clear_operator_summary_cache()
|
||||
cached = await get_cached_operator_summary_async(
|
||||
"callback_replies",
|
||||
key,
|
||||
ttl_seconds=20,
|
||||
now_monotonic=302.0,
|
||||
)
|
||||
|
||||
assert cached is not None
|
||||
assert cached["cache"]["status"] == "hit"
|
||||
assert cached["total"] == 4
|
||||
|
||||
assert (
|
||||
get_cached_operator_summary(
|
||||
"callback_replies",
|
||||
|
||||
Reference in New Issue
Block a user