From 159f514f5586677e489fc0378c68de4b9d2cd359 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 1 Jun 2026 09:20:18 +0800 Subject: [PATCH] fix(awooop): cache heavy operator summaries --- apps/api/src/api/v1/platform/operator_runs.py | 13 ++ apps/api/src/api/v1/platform/truth_chain.py | 2 + .../services/awooop_truth_chain_service.py | 52 +++++++- .../src/services/operator_summary_cache.py | 122 ++++++++++++++++++ .../src/services/platform_operator_service.py | 61 ++++++++- apps/api/tests/test_operator_summary_cache.py | 58 +++++++++ apps/web/messages/en.json | 3 + apps/web/messages/zh-TW.json | 3 + .../web/src/app/[locale]/awooop/runs/page.tsx | 97 +++++++++++--- 9 files changed, 381 insertions(+), 30 deletions(-) create mode 100644 apps/api/src/services/operator_summary_cache.py create mode 100644 apps/api/tests/test_operator_summary_cache.py diff --git a/apps/api/src/api/v1/platform/operator_runs.py b/apps/api/src/api/v1/platform/operator_runs.py index fe616164..a3fc412e 100644 --- a/apps/api/src/api/v1/platform/operator_runs.py +++ b/apps/api/src/api/v1/platform/operator_runs.py @@ -74,6 +74,16 @@ class ListRunsResponse(BaseModel): per_page: int +class OperatorSummaryCacheInfo(BaseModel): + schema_version: str = "operator_summary_cache_v1" + status: str + source: str + ttl_seconds: int + age_seconds: float = 0.0 + stored_at: datetime + expires_at: datetime + + class CallbackReplyItem(BaseModel): message_id: UUID run_id: UUID @@ -168,6 +178,7 @@ class ListCallbackRepliesResponse(BaseModel): page: int per_page: int summary: CallbackReplyAuditSummary | None = None + cache: OperatorSummaryCacheInfo | None = None class CicdEventItem(BaseModel): @@ -304,6 +315,7 @@ async def list_callback_replies( incident_id: str | None = Query(None, description="關聯 Incident ID filter(可選)"), page: int = Query(1, ge=1, description="頁碼,從 1 開始"), per_page: int = Query(20, ge=1, le=_MAX_PER_PAGE, description="每頁筆數"), + refresh: bool = Query(False, description="略過短 TTL 快取並重新聚合"), ) -> dict[str, Any]: return await list_callback_replies_svc( project_id=project_id, @@ -312,6 +324,7 @@ async def list_callback_replies( incident_id=incident_id, page=page, per_page=per_page, + refresh=refresh, ) diff --git a/apps/api/src/api/v1/platform/truth_chain.py b/apps/api/src/api/v1/platform/truth_chain.py index aba193ce..a6be498e 100644 --- a/apps/api/src/api/v1/platform/truth_chain.py +++ b/apps/api/src/api/v1/platform/truth_chain.py @@ -31,11 +31,13 @@ async def get_automation_quality_summary( project_id: str = Query("awoooi", description="租戶 ID"), hours: int = Query(24, ge=1, le=168, description="回看小時數"), limit: int = Query(200, ge=1, le=500, description="最多評估 incident 數"), + refresh: bool = Query(False, description="略過短 TTL 快取並重新聚合"), ) -> dict[str, Any]: summary = await fetch_automation_quality_summary( project_id=project_id, hours=hours, limit=limit, + refresh=refresh, ) summary["examples"] = [] summary["visibility_note"] = ( diff --git a/apps/api/src/services/awooop_truth_chain_service.py b/apps/api/src/services/awooop_truth_chain_service.py index 82bf3db9..7f7ea8d0 100644 --- a/apps/api/src/services/awooop_truth_chain_service.py +++ b/apps/api/src/services/awooop_truth_chain_service.py @@ -26,12 +26,19 @@ from src.services.operator_outcome import build_operator_outcome from src.services.awooop_ansible_check_mode_service import detect_ansible_transport_blockers from src.services.awooop_ansible_audit_service import build_ansible_truth from src.services.drift_repeat_state import build_drift_repeat_state +from src.services.operator_summary_cache import ( + get_cached_operator_summary, + store_operator_summary, +) logger = structlog.get_logger(__name__) _MAX_ROWS = 100 _JSON_TEXT_FIELDS = {"gate_result", "source_envelope"} _QUALITY_SUMMARY_CONCURRENCY = 8 +_QUALITY_SUMMARY_CACHE_TTL_SECONDS = int( + os.getenv("AWOOOP_QUALITY_SUMMARY_CACHE_TTL_SECONDS", "30") +) def _clean(value: Any) -> Any: @@ -1738,13 +1745,36 @@ async def fetch_automation_quality_summary( project_id: str = "awoooi", hours: int = 24, limit: int = 200, + refresh: bool = False, ) -> dict[str, Any]: """Return a recent incident-level quality summary for the automation flywheel.""" bounded_hours = max(1, min(int(hours), 168)) bounded_limit = max(1, min(int(limit), 500)) + normalized_project_id = project_id or "awoooi" + cache_key = { + "project_id": normalized_project_id, + "hours": bounded_hours, + "limit": bounded_limit, + } + if not refresh: + cached_summary = get_cached_operator_summary( + "truth_chain_quality_summary", + cache_key, + ttl_seconds=_QUALITY_SUMMARY_CACHE_TTL_SECONDS, + ) + if cached_summary is not None: + logger.info( + "awooop_automation_quality_summary_cache_hit", + project_id=normalized_project_id, + window_hours=bounded_hours, + limit=bounded_limit, + ttl_seconds=_QUALITY_SUMMARY_CACHE_TTL_SECONDS, + ) + return cached_summary + cutoff = datetime.now(UTC) - timedelta(hours=bounded_hours) - async with get_db_context(project_id) as db: + async with get_db_context(normalized_project_id) as db: incidents = await _fetch_all( db, """ @@ -1800,7 +1830,7 @@ async def fetch_automation_quality_summary( LIMIT :limit """, { - "project_id": project_id, + "project_id": normalized_project_id, "cutoff": cutoff, "limit": bounded_limit, }, @@ -1813,7 +1843,10 @@ async def fetch_automation_quality_summary( if not incident_id: return None async with semaphore: - truth_chain = await fetch_truth_chain(source_id=incident_id, project_id=project_id) + truth_chain = await fetch_truth_chain( + source_id=incident_id, + project_id=normalized_project_id, + ) return { "incident": truth_chain.get("incident") or incident, "truth_status": truth_chain.get("truth_status") or {}, @@ -1828,17 +1861,24 @@ async def fetch_automation_quality_summary( ] summary = summarize_automation_quality_records( - project_id=project_id, + project_id=normalized_project_id, window_hours=bounded_hours, records=records, limit=bounded_limit, ) logger.info( "awooop_automation_quality_summary_fetched", - project_id=project_id, + project_id=normalized_project_id, window_hours=bounded_hours, incident_total=summary["incident_total"], evaluated_total=summary["evaluated_total"], can_claim_full_auto_repair=summary["production_claim"]["can_claim_full_auto_repair"], + cache_status="miss", + cache_ttl_seconds=_QUALITY_SUMMARY_CACHE_TTL_SECONDS, + ) + return store_operator_summary( + "truth_chain_quality_summary", + cache_key, + summary, + ttl_seconds=_QUALITY_SUMMARY_CACHE_TTL_SECONDS, ) - return summary diff --git a/apps/api/src/services/operator_summary_cache.py b/apps/api/src/services/operator_summary_cache.py new file mode 100644 index 00000000..d8f76709 --- /dev/null +++ b/apps/api/src/services/operator_summary_cache.py @@ -0,0 +1,122 @@ +"""Short TTL cache for read-only AwoooP operator summaries. + +This cache intentionally lives in the API pod memory. It reduces repeated heavy +operator-console reads without becoming a new source of truth. +""" + +from __future__ import annotations + +import hashlib +import json +import time +from copy import deepcopy +from dataclasses import dataclass +from datetime import UTC, datetime, timedelta +from typing import Any + + +_CACHE_SCHEMA_VERSION = "operator_summary_cache_v1" +_CACHE_SOURCE = "api_pod_memory" + + +@dataclass(slots=True) +class _CacheRecord: + value: dict[str, Any] + stored_at: datetime + stored_monotonic: float + ttl_seconds: int + + +_CACHE: dict[str, _CacheRecord] = {} + + +def _cache_key(namespace: str, key_parts: dict[str, Any]) -> str: + payload = json.dumps( + {"namespace": namespace, "key_parts": key_parts}, + ensure_ascii=False, + sort_keys=True, + default=str, + ) + digest = hashlib.sha256(payload.encode("utf-8")).hexdigest() + return f"{namespace}:{digest}" + + +def _cache_meta( + *, + status: str, + record: _CacheRecord, + age_seconds: float, +) -> dict[str, Any]: + ttl_seconds = max(1, int(record.ttl_seconds)) + expires_at = record.stored_at + timedelta(seconds=ttl_seconds) + return { + "schema_version": _CACHE_SCHEMA_VERSION, + "status": status, + "source": _CACHE_SOURCE, + "ttl_seconds": ttl_seconds, + "age_seconds": round(max(0.0, age_seconds), 3), + "stored_at": record.stored_at.isoformat(), + "expires_at": expires_at.isoformat(), + } + + +def _with_cache_meta(value: dict[str, Any], meta: dict[str, Any]) -> dict[str, Any]: + response = deepcopy(value) + response["cache"] = meta + return response + + +def get_cached_operator_summary( + namespace: str, + key_parts: dict[str, Any], + *, + ttl_seconds: int, + now_monotonic: float | None = None, +) -> dict[str, Any] | None: + """Return cached summary with hit metadata, or None if absent/expired.""" + cache_key = _cache_key(namespace, key_parts) + record = _CACHE.get(cache_key) + if record is None: + return None + + now_value = time.monotonic() if now_monotonic is None else now_monotonic + ttl_value = max(1, int(ttl_seconds)) + age_seconds = now_value - record.stored_monotonic + if age_seconds >= ttl_value: + _CACHE.pop(cache_key, None) + return None + + return _with_cache_meta( + record.value, + _cache_meta(status="hit", record=record, age_seconds=age_seconds), + ) + + +def store_operator_summary( + namespace: str, + key_parts: dict[str, Any], + value: dict[str, Any], + *, + ttl_seconds: int, + now_monotonic: float | None = None, + now_utc: datetime | None = None, +) -> dict[str, Any]: + """Store a fresh summary and return it with miss metadata.""" + cache_key = _cache_key(namespace, key_parts) + stored_at = now_utc or datetime.now(UTC) + record = _CacheRecord( + value=deepcopy(value), + stored_at=stored_at, + stored_monotonic=time.monotonic() if now_monotonic is None else now_monotonic, + ttl_seconds=max(1, int(ttl_seconds)), + ) + _CACHE[cache_key] = record + return _with_cache_meta( + record.value, + _cache_meta(status="miss", record=record, age_seconds=0.0), + ) + + +def clear_operator_summary_cache() -> None: + """Clear process-local cache for tests and controlled operator refreshes.""" + _CACHE.clear() diff --git a/apps/api/src/services/platform_operator_service.py b/apps/api/src/services/platform_operator_service.py index 5ac96d5b..f26a9b11 100644 --- a/apps/api/src/services/platform_operator_service.py +++ b/apps/api/src/services/platform_operator_service.py @@ -9,6 +9,7 @@ ADR-106(AwoooP Agent Platform) from __future__ import annotations import asyncio +import os import re import time import uuid @@ -59,6 +60,10 @@ from src.services.ollama_failover_manager import ( ) from src.services.ollama_health_monitor import HealthReport, HealthStatus from src.services.operator_outcome import build_operator_outcome +from src.services.operator_summary_cache import ( + get_cached_operator_summary, + store_operator_summary, +) from src.services.run_state_machine import transition logger = structlog.get_logger(__name__) @@ -74,6 +79,9 @@ _MAX_STEP_SUMMARY_CHARS = 128 _AI_ROUTE_STATUS_SELECT_TIMEOUT_SECONDS = 12.0 _AI_ROUTE_STATUS_CONNECTIVITY_TIMEOUT_SECONDS = 2.5 _REMEDIATION_HISTORY_LIMIT = 20 +_CALLBACK_REPLY_CACHE_TTL_SECONDS = int( + os.getenv("AWOOOP_CALLBACK_REPLY_CACHE_TTL_SECONDS", "20") +) _INCIDENT_ID_RE = re.compile(r"\bINC-\d{8}-[A-Z0-9]{4,}\b") _REMEDIATION_STATUS_FILTERS = { "mcp_observed", @@ -304,11 +312,13 @@ async def list_callback_replies( incident_id: str | None, page: int, per_page: int, + refresh: bool = False, ) -> dict[str, Any]: """列出 Telegram detail/history callback reply evidence,不改 runtime 狀態。""" _validate_callback_reply_status_filter(callback_reply_status) callback_action = _validate_callback_reply_action_filter(action) _validate_incident_id_filter(incident_id) + normalized_project_id = project_id or "awoooi" if callback_reply_status == "no_callback": return { @@ -318,6 +328,33 @@ async def list_callback_replies( "per_page": per_page, } + cache_key = { + "project_id": project_id or "__all__", + "callback_reply_status": callback_reply_status or "", + "action": callback_action or "", + "incident_id": incident_id or "", + "page": page, + "per_page": per_page, + } + if not refresh: + cached_response = get_cached_operator_summary( + "callback_replies", + cache_key, + ttl_seconds=_CALLBACK_REPLY_CACHE_TTL_SECONDS, + ) + if cached_response is not None: + logger.info( + "operator_callback_replies_cache_hit", + project_id=normalized_project_id, + callback_reply_status=callback_reply_status, + action=callback_action, + incident_id=incident_id, + page=page, + per_page=per_page, + ttl_seconds=_CALLBACK_REPLY_CACHE_TTL_SECONDS, + ) + return cached_response + where_clauses = [ "m.source_envelope ? 'callback_reply'", ] @@ -399,14 +436,14 @@ async def list_callback_replies( LIMIT :limit OFFSET :offset """) - async with get_db_context(project_id or "awoooi") as db: + async with get_db_context(normalized_project_id) as db: count_result = await db.execute(count_sql, params) total = count_result.scalar_one() rows_result = await db.execute(list_sql, params) rows = list(rows_result.mappings().all()) audit_summary = await _fetch_callback_reply_audit_summary( db, - project_id=project_id or "awoooi", + project_id=normalized_project_id, ) items = [_callback_reply_event_item(row) for row in rows] @@ -459,13 +496,31 @@ async def list_callback_replies( km_completion_summary_cache[summary_cache_key] = km_summary item["km_stale_completion_summary"] = km_summary - return { + response = { "items": items, "total": total, "page": page, "per_page": per_page, "summary": audit_summary, } + logger.info( + "operator_callback_replies_fetched", + project_id=normalized_project_id, + callback_reply_status=callback_reply_status, + action=callback_action, + incident_id=incident_id, + page=page, + per_page=per_page, + total=total, + cache_status="miss", + cache_ttl_seconds=_CALLBACK_REPLY_CACHE_TTL_SECONDS, + ) + return store_operator_summary( + "callback_replies", + cache_key, + response, + ttl_seconds=_CALLBACK_REPLY_CACHE_TTL_SECONDS, + ) async def _fetch_callback_reply_audit_summary( diff --git a/apps/api/tests/test_operator_summary_cache.py b/apps/api/tests/test_operator_summary_cache.py new file mode 100644 index 00000000..83621450 --- /dev/null +++ b/apps/api/tests/test_operator_summary_cache.py @@ -0,0 +1,58 @@ +from datetime import UTC, datetime + +from src.services.operator_summary_cache import ( + clear_operator_summary_cache, + get_cached_operator_summary, + store_operator_summary, +) + + +def test_operator_summary_cache_returns_copy_with_hit_metadata() -> None: + clear_operator_summary_cache() + key = {"project_id": "awoooi", "limit": 30} + stored = store_operator_summary( + "truth_chain_quality_summary", + key, + {"evaluated_total": 30, "nested": {"ok": True}}, + ttl_seconds=30, + now_monotonic=100.0, + now_utc=datetime(2026, 6, 1, tzinfo=UTC), + ) + + assert stored["cache"]["status"] == "miss" + stored["nested"]["ok"] = False + + cached = get_cached_operator_summary( + "truth_chain_quality_summary", + key, + ttl_seconds=30, + now_monotonic=105.5, + ) + + assert cached is not None + assert cached["cache"]["status"] == "hit" + assert cached["cache"]["age_seconds"] == 5.5 + assert cached["nested"]["ok"] is True + + +def test_operator_summary_cache_expires_by_ttl() -> None: + clear_operator_summary_cache() + key = {"project_id": "awoooi", "page": 1} + store_operator_summary( + "callback_replies", + key, + {"total": 4}, + ttl_seconds=20, + now_monotonic=200.0, + now_utc=datetime(2026, 6, 1, tzinfo=UTC), + ) + + assert ( + get_cached_operator_summary( + "callback_replies", + key, + ttl_seconds=20, + now_monotonic=220.0, + ) + is None + ) diff --git a/apps/web/messages/en.json b/apps/web/messages/en.json index 1f45b3e2..ed62458f 100644 --- a/apps/web/messages/en.json +++ b/apps/web/messages/en.json @@ -3699,6 +3699,9 @@ "title": "TG Callback Evidence", "subtitle": "詳情 / 歷史回覆證據來自 AwoooP outbound mirror", "total": "{count} 筆", + "loading": "正在同步 callback evidence;尚未回來前不判定為空資料。", + "cacheHit": "快取命中 {age}s / TTL {ttl}s", + "cacheMiss": "剛重新聚合 / TTL {ttl}s", "empty": "目前尚無 callback reply evidence。", "error": "Callback evidence 載入失敗:{error}", "summary": { diff --git a/apps/web/messages/zh-TW.json b/apps/web/messages/zh-TW.json index 1f45b3e2..ed62458f 100644 --- a/apps/web/messages/zh-TW.json +++ b/apps/web/messages/zh-TW.json @@ -3699,6 +3699,9 @@ "title": "TG Callback Evidence", "subtitle": "詳情 / 歷史回覆證據來自 AwoooP outbound mirror", "total": "{count} 筆", + "loading": "正在同步 callback evidence;尚未回來前不判定為空資料。", + "cacheHit": "快取命中 {age}s / TTL {ttl}s", + "cacheMiss": "剛重新聚合 / TTL {ttl}s", "empty": "目前尚無 callback reply evidence。", "error": "Callback evidence 載入失敗:{error}", "summary": { diff --git a/apps/web/src/app/[locale]/awooop/runs/page.tsx b/apps/web/src/app/[locale]/awooop/runs/page.tsx index 6892d20d..1f13bcca 100644 --- a/apps/web/src/app/[locale]/awooop/runs/page.tsx +++ b/apps/web/src/app/[locale]/awooop/runs/page.tsx @@ -203,6 +203,16 @@ interface CallbackReplyAuditSummary { latest_callback_at?: string | null; } +interface OperatorSummaryCacheInfo { + schema_version?: string; + status?: "hit" | "miss" | string; + source?: string; + ttl_seconds?: number; + age_seconds?: number; + stored_at?: string; + expires_at?: string; +} + interface Run { run_id: string; project_id: string; @@ -471,6 +481,7 @@ interface CallbackRepliesResponse { page: number; per_page: number; summary?: CallbackReplyAuditSummary | null; + cache?: OperatorSummaryCacheInfo | null; } interface AiRoutePolicyItem { @@ -2769,15 +2780,25 @@ function CallbackReplyEvidencePanel({ events, total, summary, + cache, + loading, error, }: { events: CallbackReplyEvent[]; total: number; summary?: CallbackReplyAuditSummary | null; + cache?: OperatorSummaryCacheInfo | null; + loading: boolean; error: string | null; }) { const t = useTranslations("awooop.callbackReply.events"); const tCallback = useTranslations("awooop.callbackReply"); + const cacheAge = Math.max(0, Math.round(cache?.age_seconds ?? 0)); + const cacheLabel = cache + ? cache.status === "hit" + ? t("cacheHit", { age: cacheAge, ttl: cache.ttl_seconds ?? 0 }) + : t("cacheMiss", { age: cacheAge, ttl: cache.ttl_seconds ?? 0 }) + : null; return (
@@ -2789,9 +2810,16 @@ function CallbackReplyEvidencePanel({

{t("subtitle")}

- - {t("total", { count: total })} - +
+ {cacheLabel ? ( + + {cacheLabel} + + ) : null} + + {t("total", { count: total })} + +
@@ -2800,11 +2828,7 @@ function CallbackReplyEvidencePanel({
{t("error", { error })}
- ) : events.length === 0 ? ( -
- {t("empty")} -
- ) : ( + ) : events.length > 0 ? (
{events.slice(0, 6).map((event) => { const status = normalizeCallbackReplyEventStatus(event.status); @@ -2882,6 +2906,14 @@ function CallbackReplyEvidencePanel({ ); })}
+ ) : loading ? ( +
+ {t("loading")} +
+ ) : ( +
+ {t("empty")} +
)}
); @@ -3302,6 +3334,8 @@ export default function RunsPage() { const [callbackEvents, setCallbackEvents] = useState([]); const [callbackEventsTotal, setCallbackEventsTotal] = useState(0); const [callbackAuditSummary, setCallbackAuditSummary] = useState(null); + const [callbackEventsLoading, setCallbackEventsLoading] = useState(true); + const [callbackCacheInfo, setCallbackCacheInfo] = useState(null); const [callbackEventsError, setCallbackEventsError] = useState(null); const [aiRouteStatus, setAiRouteStatus] = useState(null); const [aiRouteStatusError, setAiRouteStatusError] = useState(null); @@ -3346,7 +3380,7 @@ export default function RunsPage() { .catch(() => {}); }, []); - const fetchRuns = useCallback(async () => { + const fetchRuns = useCallback(async (options?: { refresh?: boolean }) => { try { setError(null); const params = new URLSearchParams(); @@ -3423,20 +3457,38 @@ export default function RunsPage() { if (INCIDENT_ID_FILTER_RE.test(normalizedIncidentFilter)) { callbackParams.set("incident_id", normalizedIncidentFilter); } - const callbackRes = await fetch( - `${API_BASE}/api/v1/platform/runs/callback-replies?${callbackParams.toString()}` - ); - if (callbackRes.ok) { - const callbackData: CallbackRepliesResponse = await callbackRes.json(); - setCallbackEvents(Array.isArray(callbackData.items) ? callbackData.items : []); - setCallbackEventsTotal(callbackData.total ?? 0); - setCallbackAuditSummary(callbackData.summary ?? null); - setCallbackEventsError(null); - } else { + if (options?.refresh) { + callbackParams.set("refresh", "true"); + } + setCallbackEventsLoading(true); + try { + const callbackRes = await fetch( + `${API_BASE}/api/v1/platform/runs/callback-replies?${callbackParams.toString()}` + ); + if (callbackRes.ok) { + const callbackData: CallbackRepliesResponse = await callbackRes.json(); + setCallbackEvents(Array.isArray(callbackData.items) ? callbackData.items : []); + setCallbackEventsTotal(callbackData.total ?? 0); + setCallbackAuditSummary(callbackData.summary ?? null); + setCallbackCacheInfo(callbackData.cache ?? null); + setCallbackEventsError(null); + } else { + setCallbackEvents([]); + setCallbackEventsTotal(0); + setCallbackAuditSummary(null); + setCallbackCacheInfo(null); + setCallbackEventsError(`HTTP ${callbackRes.status}`); + } + } catch (callbackError) { setCallbackEvents([]); setCallbackEventsTotal(0); setCallbackAuditSummary(null); - setCallbackEventsError(`HTTP ${callbackRes.status}`); + setCallbackCacheInfo(null); + setCallbackEventsError( + callbackError instanceof Error ? callbackError.message : "callback evidence failed" + ); + } finally { + setCallbackEventsLoading(false); } try { @@ -3461,6 +3513,7 @@ export default function RunsPage() { setLastRefresh(new Date()); } catch (err) { setError(err instanceof Error ? err.message : "載入失敗"); + setCallbackEventsLoading(false); } finally { setLoading(false); } @@ -3555,7 +3608,7 @@ export default function RunsPage() {
每 30 秒自動刷新