From 08a75f4b5a437ead23bff15b5ad141ab19331d49 Mon Sep 17 00:00:00 2001 From: Your Name Date: Mon, 18 May 2026 16:17:05 +0800 Subject: [PATCH] feat(awooop): search callback reply evidence --- apps/api/src/api/v1/platform/operator_runs.py | 63 ++++++ .../src/services/platform_operator_service.py | 188 +++++++++++++++++- .../test_awooop_operator_timeline_labels.py | 104 +++++++++- apps/web/messages/en.json | 13 ++ apps/web/messages/zh-TW.json | 13 ++ .../web/src/app/[locale]/awooop/runs/page.tsx | 174 ++++++++++++++++ 6 files changed, 548 insertions(+), 7 deletions(-) diff --git a/apps/api/src/api/v1/platform/operator_runs.py b/apps/api/src/api/v1/platform/operator_runs.py index 2028ce49..3c536973 100644 --- a/apps/api/src/api/v1/platform/operator_runs.py +++ b/apps/api/src/api/v1/platform/operator_runs.py @@ -31,6 +31,9 @@ from src.services.platform_operator_service import ( from src.services.platform_operator_service import ( list_approvals as list_approvals_svc, ) +from src.services.platform_operator_service import ( + list_callback_replies as list_callback_replies_svc, +) from src.services.platform_operator_service import ( list_runs as list_runs_svc, ) @@ -62,6 +65,36 @@ class ListRunsResponse(BaseModel): per_page: int +class CallbackReplyItem(BaseModel): + message_id: UUID + run_id: UUID + project_id: str + status: str + needs_human: bool + action: str | None = None + incident_id: str | None = None + event_at: datetime | None = None + channel_type: str + message_type: str + send_status: str + send_error: str | None = None + provider_message_id: str | None = None + triggered_by_state: str | None = None + content_preview: str | None = None + run_state: str | None = None + agent_id: str | None = None + run_created_at: datetime | None = None + callback_reply: dict[str, Any] + run_detail_href: str | None = None + + +class ListCallbackRepliesResponse(BaseModel): + items: list[CallbackReplyItem] + total: int + page: int + per_page: int + + class ApprovalItem(BaseModel): run_id: UUID project_id: str @@ -130,6 +163,36 @@ async def list_runs( ) +@router.get( + "/runs/callback-replies", + response_model=ListCallbackRepliesResponse, + summary="列出 Telegram Callback Reply Evidence", + description=( + "從 AwoooP outbound mirror 查詢 Telegram 詳情 / 歷史 callback reply 的" + "送達、fallback、救援與失敗證據;只讀,不修改 incident、run 或 Telegram 狀態。" + ), +) +async def list_callback_replies( + project_id: str | None = Query(None, description="租戶 ID(可選)"), + callback_reply_status: str | None = Query( + None, + description="Telegram callback reply 狀態 filter(sent/fallback_sent/rescue_sent/failed/observed/no_callback)", + ), + action: str | None = Query(None, description="Callback action filter(例如 detail/history)"), + incident_id: str | None = Query(None, description="關聯 Incident ID filter(可選)"), + page: int = Query(1, ge=1, description="頁碼,從 1 開始"), + per_page: int = Query(20, ge=1, le=_MAX_PER_PAGE, description="每頁筆數"), +) -> dict[str, Any]: + return await list_callback_replies_svc( + project_id=project_id, + callback_reply_status=callback_reply_status, + action=action, + incident_id=incident_id, + page=page, + per_page=per_page, + ) + + @router.get( "/runs/{run_id}/detail", summary="查詢 Run 詳細時間線", diff --git a/apps/api/src/services/platform_operator_service.py b/apps/api/src/services/platform_operator_service.py index 3b491f9f..b4f08414 100644 --- a/apps/api/src/services/platform_operator_service.py +++ b/apps/api/src/services/platform_operator_service.py @@ -10,6 +10,7 @@ from __future__ import annotations import re import uuid +from collections.abc import Mapping from collections import defaultdict from datetime import UTC, datetime from typing import Any @@ -65,6 +66,13 @@ _CALLBACK_REPLY_STATUS_FILTERS = { "failed", "observed", } +_CALLBACK_REPLY_RAW_STATUS_BY_FILTER = { + "sent": "callback_reply_sent", + "fallback_sent": "callback_reply_fallback_sent", + "rescue_sent": "callback_reply_rescue_sent", + "failed": "callback_reply_failed", +} +_CALLBACK_REPLY_ACTION_RE = re.compile(r"^[a-z0-9_:-]{1,64}$", re.IGNORECASE) # ============================================================================= # Tenants @@ -246,6 +254,116 @@ async def list_runs( return {"runs": runs, "total": total, "page": page, "per_page": per_page} +async def list_callback_replies( + project_id: str | None, + callback_reply_status: str | None, + action: str | None, + incident_id: str | None, + page: int, + per_page: int, +) -> dict[str, Any]: + """列出 Telegram detail/history callback reply evidence,不改 runtime 狀態。""" + _validate_callback_reply_status_filter(callback_reply_status) + callback_action = _validate_callback_reply_action_filter(action) + _validate_incident_id_filter(incident_id) + + if callback_reply_status == "no_callback": + return { + "items": [], + "total": 0, + "page": page, + "per_page": per_page, + } + + where_clauses = [ + "m.source_envelope ? 'callback_reply'", + "(:project_id IS NULL OR m.project_id = :project_id)", + ] + params: dict[str, Any] = { + "project_id": project_id, + "limit": per_page, + "offset": (page - 1) * per_page, + } + + raw_status = _CALLBACK_REPLY_RAW_STATUS_BY_FILTER.get( + str(callback_reply_status or "") + ) + if raw_status: + where_clauses.append( + "m.source_envelope #>> '{callback_reply,status}' = :raw_status" + ) + params["raw_status"] = raw_status + elif callback_reply_status == "observed": + where_clauses.append( + """ + COALESCE(m.source_envelope #>> '{callback_reply,status}', '') + NOT IN ( + 'callback_reply_sent', + 'callback_reply_fallback_sent', + 'callback_reply_rescue_sent', + 'callback_reply_failed' + ) + """ + ) + + if callback_action: + where_clauses.append( + "LOWER(m.source_envelope #>> '{callback_reply,action}') = :callback_action" + ) + params["callback_action"] = callback_action + if incident_id: + where_clauses.append( + "m.source_envelope #>> '{callback_reply,incident_id}' = :incident_id" + ) + params["incident_id"] = incident_id + + where_sql = " AND ".join(where_clauses) + count_sql = text(f""" + SELECT COUNT(*) AS total + FROM awooop_outbound_message m + WHERE {where_sql} + """) + list_sql = text(f""" + SELECT + m.message_id, + m.project_id, + m.run_id, + m.channel_type, + m.message_type, + m.content_preview, + m.provider_message_id, + m.send_status, + m.send_error, + m.queued_at, + m.sent_at, + m.triggered_by_state, + m.source_envelope -> 'callback_reply' AS callback_reply, + r.agent_id, + r.state AS run_state, + r.created_at AS run_created_at + FROM awooop_outbound_message m + LEFT JOIN awooop_run_state r + ON r.project_id = m.project_id + AND r.run_id = m.run_id + WHERE {where_sql} + ORDER BY COALESCE(m.sent_at, m.queued_at) DESC, m.message_id DESC + LIMIT :limit OFFSET :offset + """) + + async with get_db_context(project_id or "awoooi") as db: + count_result = await db.execute(count_sql, params) + total = count_result.scalar_one() + rows_result = await db.execute(list_sql, params) + rows = list(rows_result.mappings().all()) + + return { + "items": [_callback_reply_event_item(row) for row in rows], + "total": total, + "page": page, + "per_page": per_page, + } + + def _timeline_item( *, ts: Any, @@ -346,6 +464,55 @@ def _outbound_callback_reply(source_envelope: Any) -> dict[str, Any] | None: return callback_reply if isinstance(callback_reply, dict) else None +def _callback_reply_public_status(callback_reply: dict[str, Any]) -> str: + """Map raw Telegram callback reply result into the Operator Console filter value.""" + raw_status = str(callback_reply.get("status") or "") + return { + "callback_reply_sent": "sent", + "callback_reply_fallback_sent": "fallback_sent", + "callback_reply_rescue_sent": "rescue_sent", + "callback_reply_failed": "failed", + }.get(raw_status, "observed") + + +def _callback_reply_event_item(row: Mapping[str, Any]) -> dict[str, Any]: + """Convert one callback reply outbound row into a read-only evidence item.""" + callback_reply = _as_dict(row.get("callback_reply")) + action = str(callback_reply.get("action") or "").strip() or None + incident_id = str(callback_reply.get("incident_id") or "").strip() or None + project_id = str(row.get("project_id") or "") + run_id = row.get("run_id") + status_value = _callback_reply_public_status(callback_reply) + event_at = row.get("sent_at") or row.get("queued_at") + + return { + "message_id": row.get("message_id"), + "run_id": run_id, + "project_id": project_id, + "status": status_value, + "needs_human": status_value == "failed", + "action": action, + "incident_id": incident_id, + "event_at": event_at, + "channel_type": row.get("channel_type"), + "message_type": row.get("message_type"), + "send_status": row.get("send_status"), + "send_error": row.get("send_error"), + "provider_message_id": row.get("provider_message_id"), + "triggered_by_state": row.get("triggered_by_state"), + "content_preview": row.get("content_preview"), + "run_state": row.get("run_state"), + "agent_id": row.get("agent_id"), + "run_created_at": row.get("run_created_at"), + "callback_reply": callback_reply, + "run_detail_href": ( + f"/awooop/runs/{run_id}?project_id={project_id}" + if run_id and project_id + else None + ), + } + + def _outbound_timeline_status( send_status: str, callback_reply: dict[str, Any] | None, @@ -444,12 +611,7 @@ def _run_callback_reply_summary( ] failed = statuses.count("callback_reply_failed") latest_status = str(latest_callback.get("status") or "") - summary_status = { - "callback_reply_sent": "sent", - "callback_reply_fallback_sent": "fallback_sent", - "callback_reply_rescue_sent": "rescue_sent", - "callback_reply_failed": "failed", - }.get(latest_status, "observed") + summary_status = _callback_reply_public_status(latest_callback) return { "schema_version": "awooop_run_callback_reply_summary_v1", @@ -769,6 +931,20 @@ def _validate_callback_reply_status_filter(value: str | None) -> None: ) +def _validate_callback_reply_action_filter(value: str | None) -> str | None: + if value is None: + return None + normalized = value.strip().lower() + if not normalized: + return None + if not _CALLBACK_REPLY_ACTION_RE.fullmatch(normalized): + raise HTTPException( + status_code=422, + detail="callback action 格式錯誤,僅允許 a-z、0-9、底線、冒號與短橫線", + ) + return normalized + + def _validate_incident_id_filter(value: str | None) -> None: if value is None: return diff --git a/apps/api/tests/test_awooop_operator_timeline_labels.py b/apps/api/tests/test_awooop_operator_timeline_labels.py index cc4280de..c197fa81 100644 --- a/apps/api/tests/test_awooop_operator_timeline_labels.py +++ b/apps/api/tests/test_awooop_operator_timeline_labels.py @@ -6,10 +6,14 @@ from uuid import UUID import pytest from fastapi import HTTPException -from src.api.v1.platform.operator_runs import ListRunsResponse +from src.api.v1.platform.operator_runs import ( + ListCallbackRepliesResponse, + ListRunsResponse, +) from src.services.platform_operator_service import ( _callback_reply_summary_matches_status, _collect_run_incident_ids, + _callback_reply_event_item, _legacy_mcp_timeline_status, _legacy_mcp_timeline_summary, _list_filter_context_limit, @@ -22,6 +26,7 @@ from src.services.platform_operator_service import ( _run_callback_reply_summary, _run_remediation_list_summary, _timeline_sort_key, + _validate_callback_reply_action_filter, _validate_callback_reply_status_filter, ) @@ -276,6 +281,103 @@ def test_list_runs_response_preserves_callback_reply_summary() -> None: assert dumped["runs"][0]["callback_reply_summary"]["needs_human"] is True +def test_callback_reply_event_item_surfaces_run_link_and_human_flag() -> None: + run_id = UUID("5c0306e0-591a-5445-9a33-80f499426b38") + message_id = UUID("56cdb6ad-46a4-48f5-9d3b-b1ac9c0b2e92") + + item = _callback_reply_event_item({ + "message_id": message_id, + "run_id": run_id, + "project_id": "awoooi", + "channel_type": "telegram", + "message_type": "error", + "send_status": "failed", + "send_error": "HTTP error: 400", + "provider_message_id": "telegram_callback_reply:failed", + "queued_at": datetime(2026, 5, 18, 7, 31, 37), + "sent_at": None, + "triggered_by_state": "callback_reply", + "content_preview": "無法取得歷史統計", + "run_state": "completed", + "agent_id": "legacy-telegram-gateway", + "run_created_at": datetime(2026, 5, 18, 7, 30, 0), + "callback_reply": { + "status": "callback_reply_failed", + "action": "history", + "incident_id": "INC-20260513-79ED5E", + "error": "HTTP error: 400", + }, + }) + + assert item["status"] == "failed" + assert item["needs_human"] is True + assert item["action"] == "history" + assert item["incident_id"] == "INC-20260513-79ED5E" + assert item["event_at"] == datetime(2026, 5, 18, 7, 31, 37) + assert item["run_detail_href"] == ( + "/awooop/runs/5c0306e0-591a-5445-9a33-80f499426b38?project_id=awoooi" + ) + + +def test_list_callback_replies_response_preserves_callback_evidence() -> None: + run_id = UUID("5c0306e0-591a-5445-9a33-80f499426b38") + message_id = UUID("56cdb6ad-46a4-48f5-9d3b-b1ac9c0b2e92") + response = ListCallbackRepliesResponse.model_validate({ + "items": [ + { + "message_id": message_id, + "run_id": run_id, + "project_id": "awoooi", + "status": "fallback_sent", + "needs_human": False, + "action": "detail", + "incident_id": "INC-20260513-79ED5E", + "event_at": datetime(2026, 5, 18, 7, 31, 37), + "channel_type": "telegram", + "message_type": "final", + "send_status": "sent", + "send_error": None, + "provider_message_id": "123", + "triggered_by_state": "callback_reply", + "content_preview": "事件詳情", + "run_state": "completed", + "agent_id": "legacy-telegram-gateway", + "run_created_at": datetime(2026, 5, 18, 7, 30, 0), + "callback_reply": { + "status": "callback_reply_fallback_sent", + "action": "detail", + "incident_id": "INC-20260513-79ED5E", + }, + "run_detail_href": ( + "/awooop/runs/5c0306e0-591a-5445-9a33-80f499426b38" + "?project_id=awoooi" + ), + } + ], + "total": 1, + "page": 1, + "per_page": 20, + }) + + dumped = response.model_dump(mode="json") + assert dumped["items"][0]["status"] == "fallback_sent" + assert dumped["items"][0]["callback_reply"]["action"] == "detail" + assert dumped["items"][0]["run_detail_href"].endswith("project_id=awoooi") + + +def test_callback_reply_action_filter_normalizes_safe_actions() -> None: + assert _validate_callback_reply_action_filter(" History ") == "history" + assert _validate_callback_reply_action_filter("incident:detail-2") == ( + "incident:detail-2" + ) + assert _validate_callback_reply_action_filter("") is None + + +def test_callback_reply_action_filter_rejects_unsafe_values() -> None: + with pytest.raises(HTTPException): + _validate_callback_reply_action_filter("detail;drop") + + def test_remediation_timeline_summary_surfaces_route_and_write_flags() -> None: summary = _remediation_timeline_summary({ "incident_id": "INC-20260514-F85F21", diff --git a/apps/web/messages/en.json b/apps/web/messages/en.json index 1037d303..953a4b52 100644 --- a/apps/web/messages/en.json +++ b/apps/web/messages/en.json @@ -1895,6 +1895,19 @@ "rescueSent": "The Telegram fallback also failed, then rescue plain text was delivered.", "failed": "The Telegram callback reply ultimately failed to deliver and needs human review.", "observed": "The Telegram callback reply was recorded with a non-standard status." + }, + "events": { + "title": "TG Callback Evidence", + "subtitle": "Detail / history reply evidence from the AwoooP outbound mirror", + "total": "{count} items", + "empty": "No callback reply evidence yet.", + "error": "Callback evidence failed to load: {error}", + "action": "Action: {action}", + "incident": "Incident: {incidentId}", + "sendStatus": "Send status: {status}", + "providerMessage": "Message: {messageId}", + "previewEmpty": "No preview", + "openRun": "Open Run" } }, "incidentEvidence": { diff --git a/apps/web/messages/zh-TW.json b/apps/web/messages/zh-TW.json index f3f3c047..bf61da60 100644 --- a/apps/web/messages/zh-TW.json +++ b/apps/web/messages/zh-TW.json @@ -1896,6 +1896,19 @@ "rescueSent": "Telegram fallback 仍失敗後,已用救援純文字送達。", "failed": "Telegram callback reply 最終送達失敗,需人工確認。", "observed": "Telegram callback reply 已記錄,但狀態不屬於標準分類。" + }, + "events": { + "title": "TG Callback Evidence", + "subtitle": "詳情 / 歷史回覆證據來自 AwoooP outbound mirror", + "total": "{count} 筆", + "empty": "目前尚無 callback reply evidence。", + "error": "Callback evidence 載入失敗:{error}", + "action": "動作:{action}", + "incident": "Incident:{incidentId}", + "sendStatus": "送訊狀態:{status}", + "providerMessage": "Message:{messageId}", + "previewEmpty": "無摘要", + "openRun": "開啟 Run" } }, "incidentEvidence": { diff --git a/apps/web/src/app/[locale]/awooop/runs/page.tsx b/apps/web/src/app/[locale]/awooop/runs/page.tsx index 284dffbf..09704c82 100644 --- a/apps/web/src/app/[locale]/awooop/runs/page.tsx +++ b/apps/web/src/app/[locale]/awooop/runs/page.tsx @@ -141,6 +141,34 @@ interface RecentEventsResponse { limit: number; } +interface CallbackReplyEvent { + message_id: string; + run_id: string; + project_id: string; + status: CallbackReplyStatus | string; + needs_human: boolean; + action?: string | null; + incident_id?: string | null; + event_at?: string | null; + channel_type: string; + message_type: string; + send_status: string; + send_error?: string | null; + provider_message_id?: string | null; + triggered_by_state?: string | null; + content_preview?: string | null; + run_state?: string | null; + agent_id?: string | null; + run_detail_href?: string | null; +} + +interface CallbackRepliesResponse { + items?: CallbackReplyEvent[]; + total: number; + page: number; + per_page: number; +} + // ============================================================================= // 常數 // ============================================================================= @@ -442,6 +470,19 @@ function normalizeCallbackReplyStatus(summary?: CallbackReplySummary | null): Ca return "no_callback"; } +function normalizeCallbackReplyEventStatus(statusValue?: string | null): CallbackReplyStatus { + if ( + statusValue === "sent" || + statusValue === "fallback_sent" || + statusValue === "rescue_sent" || + statusValue === "failed" || + statusValue === "observed" + ) { + return statusValue; + } + return "observed"; +} + function RemediationEvidenceCell({ summary }: { summary?: RemediationSummary | null }) { const t = useTranslations("awooop.listEvidence"); const status = normalizeRemediationStatus(summary); @@ -716,6 +757,107 @@ function GroupedAlertEventsPanel({ events }: { events: PlatformEvent[] }) { ); } +function CallbackReplyEvidencePanel({ + events, + total, + error, +}: { + events: CallbackReplyEvent[]; + total: number; + error: string | null; +}) { + const t = useTranslations("awooop.callbackReply.events"); + const tCallback = useTranslations("awooop.callbackReply"); + + return ( +
+
+
+
+ + {t("total", { count: total })} + +
+ + {error ? ( +
+ {t("error", { error })} +
+ ) : events.length === 0 ? ( +
+ {t("empty")} +
+ ) : ( +
+ {events.slice(0, 6).map((event) => { + const status = normalizeCallbackReplyEventStatus(event.status); + const config = CALLBACK_REPLY_CONFIG[status]; + const eventTime = event.event_at + ? new Date(event.event_at).toLocaleTimeString("zh-TW", { + hour: "2-digit", + minute: "2-digit", + }) + : "--"; + const runHref = event.run_detail_href + ?? `/awooop/runs/${event.run_id}?project_id=${encodeURIComponent(event.project_id)}`; + + return ( +
+
+
+

+ {event.run_id.slice(0, 8)} +

+

+ {event.project_id} · {eventTime} +

+
+ + {tCallback(config.labelKey)} + +
+
+

{t("action", { action: event.action ?? "--" })}

+

+ {t("incident", { incidentId: event.incident_id ?? "--" })} +

+

{t("sendStatus", { status: event.send_status })}

+ {event.provider_message_id && ( +

+ {t("providerMessage", { messageId: event.provider_message_id })} +

+ )} +
+

+ {event.content_preview || t("previewEmpty")} +

+ +
+ ); + })} +
+ )} +
+ ); +} + // ============================================================================= // Main Component // ============================================================================= @@ -725,6 +867,9 @@ export default function RunsPage() { const tCallback = useTranslations("awooop.callbackReply"); const [runs, setRuns] = useState([]); const [groupedEvents, setGroupedEvents] = useState([]); + const [callbackEvents, setCallbackEvents] = useState([]); + const [callbackEventsTotal, setCallbackEventsTotal] = useState(0); + const [callbackEventsError, setCallbackEventsError] = useState(null); const [tenants, setTenants] = useState([]); const [total, setTotal] = useState(0); const [loading, setLoading] = useState(true); @@ -804,6 +949,29 @@ export default function RunsPage() { setGroupedEvents(Array.isArray(eventsData.events) ? eventsData.events : []); } + const callbackParams = new URLSearchParams(); + callbackParams.set("per_page", "6"); + if (projectFilter) callbackParams.set("project_id", projectFilter); + if (callbackFilter) { + callbackParams.set("callback_reply_status", callbackFilter); + } + if (INCIDENT_ID_FILTER_RE.test(normalizedIncidentFilter)) { + callbackParams.set("incident_id", normalizedIncidentFilter); + } + const callbackRes = await fetch( + `${API_BASE}/api/v1/platform/runs/callback-replies?${callbackParams.toString()}` + ); + if (callbackRes.ok) { + const callbackData: CallbackRepliesResponse = await callbackRes.json(); + setCallbackEvents(Array.isArray(callbackData.items) ? callbackData.items : []); + setCallbackEventsTotal(callbackData.total ?? 0); + setCallbackEventsError(null); + } else { + setCallbackEvents([]); + setCallbackEventsTotal(0); + setCallbackEventsError(`HTTP ${callbackRes.status}`); + } + setLastRefresh(new Date()); } catch (err) { setError(err instanceof Error ? err.message : "載入失敗"); @@ -995,6 +1163,12 @@ export default function RunsPage() { + + {/* Filters */}