diff --git a/apps/api/src/api/v1/platform/operator_runs.py b/apps/api/src/api/v1/platform/operator_runs.py index 60972270..6bc03af8 100644 --- a/apps/api/src/api/v1/platform/operator_runs.py +++ b/apps/api/src/api/v1/platform/operator_runs.py @@ -95,6 +95,7 @@ class CallbackReplyItem(BaseModel): run_created_at: datetime | None = None callback_reply: dict[str, Any] awooop_status_chain: dict[str, Any] | None = None + km_stale_completion_summary: dict[str, Any] | None = None run_detail_href: str | None = None diff --git a/apps/api/src/services/platform_operator_service.py b/apps/api/src/services/platform_operator_service.py index 4b5db7d3..83d58484 100644 --- a/apps/api/src/services/platform_operator_service.py +++ b/apps/api/src/services/platform_operator_service.py @@ -42,6 +42,9 @@ from src.services.awooop_truth_chain_service import ( _summarize_mcp, fetch_truth_chain, ) +from src.services.governance_km_stale_review_service import ( + query_km_stale_owner_review_completion_queue, +) from src.services.ollama_endpoint_resolver import ( OllamaEndpointSelection, OllamaWorkloadType, @@ -100,6 +103,9 @@ _SOURCE_CORRELATION_PROVIDERS = ("sentry", "signoz") _SOURCE_CORRELATION_EVENT_LIMIT = 200 _SOURCE_CORRELATION_LOOKBACK_DAYS = 7 _SOURCE_CORRELATION_PRE_WINDOW_HOURS = 2 +_KM_STALE_COMPLETION_CALLBACK_SCHEMA_VERSION = ( + "km_stale_owner_review_completion_callback_summary_v1" +) # ============================================================================= # Tenants @@ -387,32 +393,53 @@ async def list_callback_replies( items = [_callback_reply_event_item(row) for row in rows] status_chain_cache: dict[tuple[str, str], dict[str, Any]] = {} + km_completion_queue_cache: dict[str, Any] = {} + km_completion_summary_cache: dict[tuple[str, str | None], dict[str, Any]] = {} for item in items: incident = item.get("incident_id") + item_project_id = str(item.get("project_id") or project_id or "awoooi") if not incident: item["awooop_status_chain"] = _build_awooop_status_chain( incident_ids=[], source_id=None, ) + item["km_stale_completion_summary"] = ( + _empty_km_stale_completion_summary( + project_id=item_project_id, + incident_id=None, + status_value="no_incident", + reason="callback_reply_missing_incident_id", + ) + ) continue incident_id = str(incident) - item_project_id = str(item.get("project_id") or project_id or "awoooi") cache_key = (item_project_id, incident_id) cached = status_chain_cache.get(cache_key) if cached is not None: item["awooop_status_chain"] = cached - continue - remediation_history = await _fetch_run_remediation_history( - [incident_id], - limit=5, - ) - chain = await _fetch_awooop_status_chain( - incident_ids=[incident_id], - project_id=item_project_id, - remediation_history=remediation_history, - ) - status_chain_cache[cache_key] = chain - item["awooop_status_chain"] = chain + else: + remediation_history = await _fetch_run_remediation_history( + [incident_id], + limit=5, + ) + chain = await _fetch_awooop_status_chain( + incident_ids=[incident_id], + project_id=item_project_id, + remediation_history=remediation_history, + ) + status_chain_cache[cache_key] = chain + item["awooop_status_chain"] = chain + + summary_cache_key = (item_project_id, incident_id) + summary = km_completion_summary_cache.get(summary_cache_key) + if summary is None: + summary = await _fetch_km_stale_completion_summary_for_incident( + project_id=item_project_id, + incident_id=incident_id, + queue_cache=km_completion_queue_cache, + ) + km_completion_summary_cache[summary_cache_key] = summary + item["km_stale_completion_summary"] = summary return { "items": items, @@ -422,6 +449,54 @@ async def list_callback_replies( } +async def _fetch_km_stale_completion_summary_for_incident( + *, + project_id: str, + incident_id: str | None, + queue_cache: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Fetch read-only KM owner-review completion context for callback evidence.""" + normalized_project_id = project_id or "awoooi" + normalized_incident_id = str(incident_id or "").strip() or None + if not normalized_incident_id: + return _empty_km_stale_completion_summary( + project_id=normalized_project_id, + incident_id=None, + status_value="no_incident", + reason="callback_reply_missing_incident_id", + ) + + cache = queue_cache if queue_cache is not None else {} + queue = cache.get(normalized_project_id) + if queue is None: + try: + queue = await query_km_stale_owner_review_completion_queue( + project_id=normalized_project_id, + status_bucket="all", + limit=100, + ) + except Exception as exc: + logger.warning( + "operator_km_stale_completion_summary_fetch_failed", + project_id=normalized_project_id, + incident_id=normalized_incident_id, + error=str(exc), + ) + return _empty_km_stale_completion_summary( + project_id=normalized_project_id, + incident_id=normalized_incident_id, + status_value="fetch_failed", + reason="km_stale_completion_queue_fetch_failed", + ) + cache[normalized_project_id] = queue + + return _build_km_stale_completion_summary( + queue=queue, + project_id=normalized_project_id, + incident_id=normalized_incident_id, + ) + + async def list_cicd_events( *, project_id: str | None, @@ -985,6 +1060,106 @@ def _callback_reply_event_item(row: Mapping[str, Any]) -> dict[str, Any]: } +def _empty_km_stale_completion_summary( + *, + project_id: str, + incident_id: str | None, + status_value: str, + reason: str | None = None, +) -> dict[str, Any]: + """Build the nullable KM owner-review summary shape for callback evidence.""" + return { + "schema_version": _KM_STALE_COMPLETION_CALLBACK_SCHEMA_VERSION, + "project_id": project_id, + "incident_id": incident_id, + "status": status_value, + "missing_reason": reason, + "total": 0, + "returned": 0, + "pending_count": 0, + "ready_count": 0, + "blocked_count": 0, + "completed_count": 0, + "failed_count": 0, + "writes_on_read": False, + "manual_review_required": True, + "batch_writes_allowed": False, + "items_truncated": False, + "related_total": 0, + "related_items": [], + } + + +def _object_field(payload: Any, name: str, default: Any = None) -> Any: + if isinstance(payload, Mapping): + return payload.get(name, default) + return getattr(payload, name, default) + + +def _object_int_field(payload: Any, name: str) -> int: + try: + return int(_object_field(payload, name, 0) or 0) + except (TypeError, ValueError): + return 0 + + +def _build_km_stale_completion_summary( + *, + queue: Any, + project_id: str, + incident_id: str, +) -> dict[str, Any]: + """Summarize KM owner-review completion queue state for one incident.""" + related_items: list[dict[str, Any]] = [] + for item in list(_object_field(queue, "items", []) or []): + if str(_object_field(item, "related_incident_id") or "").strip() != incident_id: + continue + related_items.append({ + "entry_id": _object_field(item, "entry_id"), + "title": _object_field(item, "title"), + "dispatch_id": _object_field(item, "dispatch_id"), + "governance_event_id": _object_field(item, "governance_event_id"), + "readiness": _object_field(item, "readiness"), + "workflow_stage": _object_field(item, "workflow_stage"), + "next_action": _object_field(item, "next_action"), + "priority_tier": _object_field(item, "priority_tier"), + "recommended_completion_outcome": _object_field( + item, + "recommended_completion_outcome", + ), + "can_preview": bool(_object_field(item, "can_preview", False)), + }) + + total = _object_int_field(queue, "total") + returned = _object_int_field(queue, "returned") + return { + "schema_version": _KM_STALE_COMPLETION_CALLBACK_SCHEMA_VERSION, + "project_id": project_id, + "incident_id": incident_id, + "status": "matched_owner_review" + if related_items + else "no_related_owner_review", + "missing_reason": None if related_items else "no_matching_completion_item", + "total": total, + "returned": returned, + "pending_count": _object_int_field(queue, "pending_count"), + "ready_count": _object_int_field(queue, "ready_count"), + "blocked_count": _object_int_field(queue, "blocked_count"), + "completed_count": _object_int_field(queue, "completed_count"), + "failed_count": _object_int_field(queue, "failed_count"), + "writes_on_read": bool(_object_field(queue, "writes_on_read", False)), + "manual_review_required": bool( + _object_field(queue, "manual_review_required", True) + ), + "batch_writes_allowed": bool( + _object_field(queue, "batch_writes_allowed", False) + ), + "items_truncated": total > returned, + "related_total": len(related_items), + "related_items": related_items[:3], + } + + def _outbound_timeline_status( send_status: str, callback_reply: dict[str, Any] | None, diff --git a/apps/api/tests/test_awooop_operator_timeline_labels.py b/apps/api/tests/test_awooop_operator_timeline_labels.py index bdd47dc1..32fc5a95 100644 --- a/apps/api/tests/test_awooop_operator_timeline_labels.py +++ b/apps/api/tests/test_awooop_operator_timeline_labels.py @@ -421,6 +421,28 @@ def test_list_callback_replies_response_preserves_callback_evidence() -> None: "repair_state": "read_only_dry_run", "needs_human": True, }, + "km_stale_completion_summary": { + "schema_version": ( + "km_stale_owner_review_completion_callback_summary_v1" + ), + "project_id": "awoooi", + "incident_id": "INC-20260513-79ED5E", + "status": "matched_owner_review", + "ready_count": 3, + "blocked_count": 1, + "completed_count": 2, + "failed_count": 0, + "batch_writes_allowed": False, + "manual_review_required": True, + "related_total": 1, + "related_items": [ + { + "entry_id": "km-1", + "readiness": "ready", + "next_action": "preview_stale_km_review_completion", + } + ], + }, "run_detail_href": ( "/awooop/runs/5c0306e0-591a-5445-9a33-80f499426b38" "?project_id=awoooi" @@ -438,9 +460,77 @@ def test_list_callback_replies_response_preserves_callback_evidence() -> None: assert dumped["items"][0]["awooop_status_chain"]["repair_state"] == ( "read_only_dry_run" ) + assert dumped["items"][0]["km_stale_completion_summary"]["ready_count"] == 3 + assert dumped["items"][0]["km_stale_completion_summary"]["related_total"] == 1 assert dumped["items"][0]["run_detail_href"].endswith("project_id=awoooi") +@pytest.mark.asyncio +async def test_km_stale_completion_summary_matches_callback_incident( + monkeypatch, +) -> None: + async def fake_query_km_stale_completion_queue(**kwargs): + assert kwargs["project_id"] == "awoooi" + assert kwargs["status_bucket"] == "all" + assert kwargs["limit"] == 100 + return SimpleNamespace( + project_id="awoooi", + total=2, + returned=2, + pending_count=2, + ready_count=1, + blocked_count=1, + completed_count=0, + failed_count=0, + writes_on_read=False, + manual_review_required=True, + batch_writes_allowed=False, + items=[ + SimpleNamespace( + entry_id="km-1", + title="Bitan pharmacy status drift", + dispatch_id="dispatch-1", + governance_event_id="event-1", + readiness="ready", + workflow_stage="waiting_owner_review", + next_action="preview_stale_km_review_completion", + priority_tier="P0", + recommended_completion_outcome="refresh_with_evidence", + can_preview=True, + related_incident_id="INC-20260513-79ED5E", + ), + SimpleNamespace( + entry_id="km-2", + title="Other stale KM", + related_incident_id="INC-20260513-OTHER", + readiness="blocked", + ), + ], + ) + + monkeypatch.setattr( + platform_operator_service, + "query_km_stale_owner_review_completion_queue", + fake_query_km_stale_completion_queue, + ) + + summary = await platform_operator_service._fetch_km_stale_completion_summary_for_incident( + project_id="awoooi", + incident_id="INC-20260513-79ED5E", + queue_cache={}, + ) + + assert summary["schema_version"] == ( + "km_stale_owner_review_completion_callback_summary_v1" + ) + assert summary["status"] == "matched_owner_review" + assert summary["ready_count"] == 1 + assert summary["blocked_count"] == 1 + assert summary["batch_writes_allowed"] is False + assert summary["related_items"][0]["entry_id"] == "km-1" + assert summary["related_items"][0]["can_preview"] is True + + def test_list_approvals_response_preserves_status_chain() -> None: run_id = UUID("5c0306e0-591a-5445-9a33-80f499426b38") response = ListApprovalsResponse.model_validate({ diff --git a/apps/web/messages/en.json b/apps/web/messages/en.json index 385f307f..ebc3e4f3 100644 --- a/apps/web/messages/en.json +++ b/apps/web/messages/en.json @@ -2782,7 +2782,23 @@ "sendStatus": "Send status: {status}", "providerMessage": "Message: {messageId}", "previewEmpty": "No preview", - "openRun": "Open Run" + "openRun": "Open Run", + "kmCompletion": { + "title": "KM Owner Review", + "status": "Status: {status}", + "counts": "ready {ready} / blocked {blocked} / completed {completed} / failed {failed}", + "guardrail": "Guardrail: writes_on_read={writesOnRead}; batch_writes_allowed={batchWrite}; manual_review_required={manualReview}", + "related": "{entryId} · {readiness} · {nextAction}", + "noRelated": "This incident has no matching owner-review completion item yet.", + "fetchFailed": "KM owner-review summary failed to load: {reason}", + "statuses": { + "matched_owner_review": "Matched owner review", + "no_related_owner_review": "No matched owner review", + "fetch_failed": "Fetch failed", + "no_incident": "Missing incident", + "observed": "Recorded" + } + } } }, "aiRouteStatus": { diff --git a/apps/web/messages/zh-TW.json b/apps/web/messages/zh-TW.json index 24a15356..9729c6c3 100644 --- a/apps/web/messages/zh-TW.json +++ b/apps/web/messages/zh-TW.json @@ -2783,7 +2783,23 @@ "sendStatus": "送訊狀態:{status}", "providerMessage": "Message:{messageId}", "previewEmpty": "無摘要", - "openRun": "開啟 Run" + "openRun": "開啟 Run", + "kmCompletion": { + "title": "KM Owner Review", + "status": "狀態:{status}", + "counts": "ready {ready} / blocked {blocked} / completed {completed} / failed {failed}", + "guardrail": "Guardrail:writes_on_read={writesOnRead};batch_writes_allowed={batchWrite};manual_review_required={manualReview}", + "related": "{entryId} · {readiness} · {nextAction}", + "noRelated": "本 Incident 尚未對到 owner-review completion item。", + "fetchFailed": "KM owner-review 摘要讀取失敗:{reason}", + "statuses": { + "matched_owner_review": "已匹配 owner review", + "no_related_owner_review": "未匹配 owner review", + "fetch_failed": "讀取失敗", + "no_incident": "缺少 Incident", + "observed": "已記錄" + } + } } }, "aiRouteStatus": { diff --git a/apps/web/src/app/[locale]/awooop/runs/page.tsx b/apps/web/src/app/[locale]/awooop/runs/page.tsx index 0fff4fd1..fb51b2b5 100644 --- a/apps/web/src/app/[locale]/awooop/runs/page.tsx +++ b/apps/web/src/app/[locale]/awooop/runs/page.tsx @@ -281,6 +281,40 @@ interface EventRecurrenceResponse { items: EventRecurrenceItem[]; } +interface KmStaleCompletionSummaryItem { + entry_id?: string | null; + title?: string | null; + dispatch_id?: string | null; + governance_event_id?: string | null; + readiness?: string | null; + workflow_stage?: string | null; + next_action?: string | null; + priority_tier?: string | null; + recommended_completion_outcome?: string | null; + can_preview?: boolean | null; +} + +interface KmStaleCompletionSummary { + schema_version?: string; + project_id?: string | null; + incident_id?: string | null; + status?: string | null; + missing_reason?: string | null; + total?: number; + returned?: number; + pending_count?: number; + ready_count?: number; + blocked_count?: number; + completed_count?: number; + failed_count?: number; + writes_on_read?: boolean | null; + manual_review_required?: boolean | null; + batch_writes_allowed?: boolean | null; + items_truncated?: boolean | null; + related_total?: number; + related_items?: KmStaleCompletionSummaryItem[]; +} + interface CallbackReplyEvent { message_id: string; run_id: string; @@ -301,6 +335,7 @@ interface CallbackReplyEvent { agent_id?: string | null; run_detail_href?: string | null; awooop_status_chain?: AwoooPStatusChain | null; + km_stale_completion_summary?: KmStaleCompletionSummary | null; } interface CallbackRepliesResponse { @@ -660,6 +695,18 @@ function normalizeCallbackReplyEventStatus(statusValue?: string | null): Callbac return "observed"; } +function normalizeKmCompletionStatus(statusValue?: string | null) { + if ( + statusValue === "matched_owner_review" || + statusValue === "no_related_owner_review" || + statusValue === "fetch_failed" || + statusValue === "no_incident" + ) { + return statusValue; + } + return "observed"; +} + function RemediationEvidenceCell({ summary }: { summary?: RemediationSummary | null }) { const t = useTranslations("awooop.listEvidence"); const status = normalizeRemediationStatus(summary); @@ -1409,6 +1456,66 @@ function GroupedAlertEventsPanel({ events }: { events: PlatformEvent[] }) { ); } +function CallbackKmCompletionSummary({ + summary, +}: { + summary?: KmStaleCompletionSummary | null; +}) { + const t = useTranslations("awooop.callbackReply.events.kmCompletion"); + if (!summary) return null; + + const statusKey = normalizeKmCompletionStatus(summary.status); + const related = Array.isArray(summary.related_items) + ? summary.related_items[0] + : null; + + return ( +
+
+
+
+

+ {t("status", { + status: t(`statuses.${statusKey}` as never), + })} +

+

+ {t("counts", { + ready: summary.ready_count ?? 0, + blocked: summary.blocked_count ?? 0, + completed: summary.completed_count ?? 0, + failed: summary.failed_count ?? 0, + })} +

+

+ {t("guardrail", { + writesOnRead: String(summary.writes_on_read ?? false), + batchWrite: String(summary.batch_writes_allowed ?? false), + manualReview: String(summary.manual_review_required ?? true), + })} +

+ {related ? ( +

+ {t("related", { + entryId: related.entry_id ?? "--", + readiness: related.readiness ?? "--", + nextAction: related.next_action ?? "--", + })} +

+ ) : statusKey === "fetch_failed" ? ( +

+ {t("fetchFailed", { reason: summary.missing_reason ?? "--" })} +

+ ) : ( +

{t("noRelated")}

+ )} +
+
+ ); +} + function CallbackReplyEvidencePanel({ events, total, @@ -1499,6 +1606,9 @@ function CallbackReplyEvidencePanel({ compact className="mt-3" /> +