fix(api): share operator summary cache through redis
Some checks failed
CD Pipeline / tests (push) Successful in 1m20s
Code Review / ai-code-review (push) Successful in 13s
CD Pipeline / build-and-deploy (push) Successful in 3m35s
CD Pipeline / post-deploy-checks (push) Failing after 20s

This commit is contained in:
Your Name
2026-06-01 09:38:08 +08:00
parent 8938706062
commit d4483e730e
4 changed files with 177 additions and 8 deletions

View File

@@ -27,8 +27,8 @@ from src.services.awooop_ansible_check_mode_service import detect_ansible_transp
from src.services.awooop_ansible_audit_service import build_ansible_truth
from src.services.drift_repeat_state import build_drift_repeat_state
from src.services.operator_summary_cache import (
get_cached_operator_summary,
store_operator_summary,
get_cached_operator_summary_async,
store_operator_summary_async,
)
logger = structlog.get_logger(__name__)
@@ -1757,7 +1757,7 @@ async def fetch_automation_quality_summary(
"limit": bounded_limit,
}
if not refresh:
cached_summary = get_cached_operator_summary(
cached_summary = await get_cached_operator_summary_async(
"truth_chain_quality_summary",
cache_key,
ttl_seconds=_QUALITY_SUMMARY_CACHE_TTL_SECONDS,
@@ -1876,7 +1876,7 @@ async def fetch_automation_quality_summary(
cache_status="miss",
cache_ttl_seconds=_QUALITY_SUMMARY_CACHE_TTL_SECONDS,
)
return store_operator_summary(
return await store_operator_summary_async(
"truth_chain_quality_summary",
cache_key,
summary,

View File

@@ -66,6 +66,30 @@ def _with_cache_meta(value: dict[str, Any], meta: dict[str, Any]) -> dict[str, A
return response
def _redis_key(namespace: str, key_parts: dict[str, Any]) -> str:
    """Build the Redis key used for a shared operator-summary entry.

    The key is the fixed ``awooop:operator_summary:`` prefix followed by
    whatever ``_cache_key`` yields for the same namespace and key parts.
    """
    suffix = _cache_key(namespace, key_parts)
    return f"awooop:operator_summary:{suffix}"
def _json_default(value: Any) -> str:
    """Fallback serialiser for ``json.dumps``.

    ``datetime`` instances are rendered in ISO-8601 form; any other object
    the encoder cannot handle natively is reduced to its ``str()`` form.
    """
    return value.isoformat() if isinstance(value, datetime) else str(value)
def _decode_redis_value(raw: Any) -> dict[str, Any] | None:
if raw is None:
return None
if isinstance(raw, bytes):
raw = raw.decode("utf-8")
if not isinstance(raw, str):
return None
try:
payload = json.loads(raw)
except json.JSONDecodeError:
return None
return payload if isinstance(payload, dict) else None
def get_cached_operator_summary(
namespace: str,
key_parts: dict[str, Any],
@@ -92,6 +116,58 @@ def get_cached_operator_summary(
)
async def get_cached_operator_summary_async(
    namespace: str,
    key_parts: dict[str, Any],
    *,
    ttl_seconds: int,
    now_monotonic: float | None = None,
) -> dict[str, Any] | None:
    """Return a shared Redis cache hit, falling back to process memory.

    Args:
        namespace: Cache namespace, forwarded to the key builders.
        key_parts: Request parameters that identify the cached summary.
        ttl_seconds: Maximum accepted age of an entry; values below 1 are
            clamped to 1 for the Redis freshness check.
        now_monotonic: Optional injected monotonic clock reading. ``0.0`` is
            honoured as a real reading; only ``None`` means "read the clock".

    Returns:
        The cached summary with "hit" cache metadata when Redis holds a
        fresh entry; otherwise whatever ``get_cached_operator_summary``
        returns for the same arguments.

    Redis is best-effort: any failure while talking to it is logged and the
    process-local cache is consulted instead.
    """
    redis_key = _redis_key(namespace, key_parts)
    try:
        # Imported inside the try so an import failure is handled like any
        # other Redis problem and still falls back to process memory.
        from src.core.redis_client import get_redis

        redis_client = get_redis()
        raw = await redis_client.get(redis_key)
        if raw is not None:
            payload = _decode_redis_value(raw)
            if payload is not None and isinstance(payload.get("value"), dict):
                try:
                    stored_epoch = float(payload.get("stored_epoch") or 0.0)
                except (TypeError, ValueError):
                    # A non-numeric timestamp is treated as "infinitely old"
                    # so the corrupt entry is evicted below.
                    stored_epoch = 0.0
                ttl_value = max(1, int(ttl_seconds))
                age_seconds = time.time() - stored_epoch
                # A negative age means the writer's clock is ahead of ours;
                # such entries are not trusted.
                if 0 <= age_seconds < ttl_value:
                    try:
                        stored_at = datetime.fromisoformat(
                            str(payload.get("stored_at"))
                        )
                    except ValueError:
                        # Reconstruct the timestamp from the entry's age.
                        stored_at = datetime.now(UTC) - timedelta(
                            seconds=age_seconds
                        )
                    monotonic_now = (
                        time.monotonic() if now_monotonic is None else now_monotonic
                    )
                    record = _CacheRecord(
                        value=payload["value"],
                        stored_at=stored_at,
                        stored_monotonic=monotonic_now - age_seconds,
                        ttl_seconds=ttl_value,
                    )
                    return _with_cache_meta(
                        record.value,
                        _cache_meta(
                            status="hit",
                            record=record,
                            age_seconds=age_seconds,
                        ),
                    )
            # The key exists but is corrupt, stale, or from the future:
            # drop it. A missing key needs no delete round trip.
            await redis_client.delete(redis_key)
    except Exception:
        import logging

        logging.getLogger(__name__).warning(
            "operator summary redis read failed; falling back to process cache",
            exc_info=True,
        )
    return get_cached_operator_summary(
        namespace,
        key_parts,
        ttl_seconds=ttl_seconds,
        now_monotonic=now_monotonic,
    )
def store_operator_summary(
namespace: str,
key_parts: dict[str, Any],
@@ -117,6 +193,46 @@ def store_operator_summary(
)
async def store_operator_summary_async(
    namespace: str,
    key_parts: dict[str, Any],
    value: dict[str, Any],
    *,
    ttl_seconds: int,
    now_monotonic: float | None = None,
    now_utc: datetime | None = None,
) -> dict[str, Any]:
    """Store a fresh summary in Redis and process memory.

    Args:
        namespace: Cache namespace, forwarded to the key builders.
        key_parts: Request parameters that identify the cached summary.
        value: The summary to cache.
        ttl_seconds: Lifetime of the entry; values below 1 are clamped to 1.
        now_monotonic: Optional injected monotonic clock reading, forwarded
            to ``store_operator_summary``.
        now_utc: Optional injected wall-clock timestamp recorded as
            ``stored_at``; defaults to the current UTC time.

    Returns:
        The response produced by ``store_operator_summary``; the Redis write
        never changes it.

    The process-local store happens first and unconditionally. The Redis
    write is best-effort: a failure is logged and otherwise ignored. Values
    are stored as JSON, so anything the encoder cannot represent natively is
    written in the string form produced by ``_json_default``.
    """
    stored_at = now_utc or datetime.now(UTC)
    ttl_value = max(1, int(ttl_seconds))
    response = store_operator_summary(
        namespace,
        key_parts,
        value,
        ttl_seconds=ttl_value,
        now_monotonic=now_monotonic,
        now_utc=stored_at,
    )
    try:
        from src.core.redis_client import get_redis

        # No defensive copy of ``value`` is needed: it is serialised right
        # here, before the first await, so nothing can mutate it in between.
        serialized = json.dumps(
            {
                "value": value,
                "stored_at": stored_at.isoformat(),
                # Wall-clock epoch; the reader compares it to time.time().
                "stored_epoch": time.time(),
                "ttl_seconds": ttl_value,
            },
            ensure_ascii=False,
            default=_json_default,
        )
        redis_client = get_redis()
        await redis_client.set(
            _redis_key(namespace, key_parts),
            serialized,
            ex=ttl_value,
        )
    except Exception:
        import logging

        logging.getLogger(__name__).warning(
            "operator summary redis write failed; entry kept in process cache only",
            exc_info=True,
        )
    return response
def clear_operator_summary_cache() -> None:
    """Clear process-local cache for tests and controlled operator refreshes.

    Only the in-process ``_CACHE`` mapping is emptied by the code shown here.
    NOTE(review): entries written to Redis by ``store_operator_summary_async``
    do not appear to be removed by this call, so they may keep being served
    until their Redis expiry -- confirm that this is intended.
    """
    _CACHE.clear()

View File

@@ -61,8 +61,8 @@ from src.services.ollama_failover_manager import (
from src.services.ollama_health_monitor import HealthReport, HealthStatus
from src.services.operator_outcome import build_operator_outcome
from src.services.operator_summary_cache import (
get_cached_operator_summary,
store_operator_summary,
get_cached_operator_summary_async,
store_operator_summary_async,
)
from src.services.run_state_machine import transition
@@ -337,7 +337,7 @@ async def list_callback_replies(
"per_page": per_page,
}
if not refresh:
cached_response = get_cached_operator_summary(
cached_response = await get_cached_operator_summary_async(
"callback_replies",
cache_key,
ttl_seconds=_CALLBACK_REPLY_CACHE_TTL_SECONDS,
@@ -515,7 +515,7 @@ async def list_callback_replies(
cache_status="miss",
cache_ttl_seconds=_CALLBACK_REPLY_CACHE_TTL_SECONDS,
)
return store_operator_summary(
return await store_operator_summary_async(
"callback_replies",
cache_key,
response,