feat(awooop): show callback trace recovery
All checks were successful
CD Pipeline / tests (push) Successful in 1m26s
Code Review / ai-code-review (push) Successful in 13s
CD Pipeline / build-and-deploy (push) Successful in 3m41s
CD Pipeline / post-deploy-checks (push) Successful in 1m41s

This commit is contained in:
Your Name
2026-05-25 21:47:40 +08:00
parent 6a379862e7
commit b2fc03d09f
7 changed files with 213 additions and 0 deletions

View File

@@ -129,6 +129,10 @@ class CallbackReplyAuditSummary(BaseModel):
outbound_reply_markup_missing_trace_ref_latest_sent_at: datetime | None = None
outbound_reply_markup_trace_ref_gap_status: str = "clean"
outbound_reply_markup_trace_ref_gap_next_action: str = "none"
outbound_reply_markup_trace_ref_after_gap_total: int = 0
outbound_reply_markup_trace_ref_after_gap_first_sent_at: datetime | None = None
outbound_reply_markup_trace_ref_after_gap_latest_sent_at: datetime | None = None
outbound_reply_markup_trace_ref_gap_recovery_status: str = "not_needed"
outbound_reply_markup_missing_incident_ref_top_prefixes: list[
OutboundReplyMarkupGapPrefix
] = Field(default_factory=list)

View File

@@ -499,6 +499,14 @@ async def _fetch_callback_reply_audit_summary(
FROM awooop_outbound_message m
WHERE m.project_id = :project_id
AND m.channel_type = 'telegram'
),
trace_gap_cutoff AS (
SELECT
MAX(COALESCE(sent_at, queued_at))
AS latest_missing_trace_ref_at
FROM outbound
WHERE source_envelope #>> '{reply_markup,present}' = 'true'
AND NOT has_trace_ref
)
SELECT
COUNT(*) AS outbound_total,
@@ -568,6 +576,30 @@ async def _fetch_callback_reply_audit_summary(
WHERE source_envelope #>> '{reply_markup,present}' = 'true'
AND NOT has_trace_ref
) AS outbound_reply_markup_missing_trace_ref_latest_sent_at,
COUNT(*) FILTER (
WHERE source_envelope #>> '{reply_markup,present}' = 'true'
AND has_trace_ref
AND trace_gap_cutoff.latest_missing_trace_ref_at
IS NOT NULL
AND COALESCE(sent_at, queued_at)
> trace_gap_cutoff.latest_missing_trace_ref_at
) AS outbound_reply_markup_trace_ref_after_gap_total,
MIN(COALESCE(sent_at, queued_at)) FILTER (
WHERE source_envelope #>> '{reply_markup,present}' = 'true'
AND has_trace_ref
AND trace_gap_cutoff.latest_missing_trace_ref_at
IS NOT NULL
AND COALESCE(sent_at, queued_at)
> trace_gap_cutoff.latest_missing_trace_ref_at
) AS outbound_reply_markup_trace_ref_after_gap_first_sent_at,
MAX(COALESCE(sent_at, queued_at)) FILTER (
WHERE source_envelope #>> '{reply_markup,present}' = 'true'
AND has_trace_ref
AND trace_gap_cutoff.latest_missing_trace_ref_at
IS NOT NULL
AND COALESCE(sent_at, queued_at)
> trace_gap_cutoff.latest_missing_trace_ref_at
) AS outbound_reply_markup_trace_ref_after_gap_latest_sent_at,
COALESCE((
SELECT jsonb_agg(
jsonb_build_object(
@@ -711,6 +743,7 @@ async def _fetch_callback_reply_audit_summary(
WHERE source_envelope ? 'callback_reply'
) AS latest_callback_at
FROM outbound
CROSS JOIN trace_gap_cutoff
"""),
{"project_id": project_id},
)
@@ -752,6 +785,13 @@ def _callback_reply_audit_summary_from_row(
recent_1h=missing_trace_recent_1h,
recent_24h=missing_trace_recent_24h,
)
trace_ref_after_gap_total = _safe_int(
row.get("outbound_reply_markup_trace_ref_after_gap_total")
)
trace_gap_recovery_status = _trace_ref_gap_recovery_status(
missing_total=missing_trace_total,
after_gap_total=trace_ref_after_gap_total,
)
if callback_total <= 0:
snapshot_status = "no_callback"
@@ -811,6 +851,18 @@ def _callback_reply_audit_summary_from_row(
),
"outbound_reply_markup_trace_ref_gap_status": trace_gap_status,
"outbound_reply_markup_trace_ref_gap_next_action": trace_gap_next_action,
"outbound_reply_markup_trace_ref_after_gap_total": (
trace_ref_after_gap_total
),
"outbound_reply_markup_trace_ref_after_gap_first_sent_at": row.get(
"outbound_reply_markup_trace_ref_after_gap_first_sent_at"
),
"outbound_reply_markup_trace_ref_after_gap_latest_sent_at": row.get(
"outbound_reply_markup_trace_ref_after_gap_latest_sent_at"
),
"outbound_reply_markup_trace_ref_gap_recovery_status": (
trace_gap_recovery_status
),
"outbound_reply_markup_missing_incident_ref_top_prefixes": (
top_missing_prefixes
),
@@ -852,6 +904,19 @@ def _trace_ref_gap_decision(
return "legacy_backlog", "backfill_or_archive_legacy_callbacks"
def _trace_ref_gap_recovery_status(
    *,
    missing_total: int,
    after_gap_total: int,
) -> str:
    """Describe whether traced reply_markup messages resumed after the last gap.

    Args:
        missing_total: Number of reply_markup outbound messages that lack a
            trace ref.
        after_gap_total: Number of reply_markup outbound messages that carry
            a trace ref and were sent (or queued) after the most recent
            message missing one.

    Returns:
        ``"not_needed"`` when nothing is missing a trace ref (checked first,
        so ``after_gap_total`` is ignored in that case);
        ``"recovered_after_gap"`` when at least one traced message follows
        the latest gap; otherwise ``"no_recovery_signal"``.
    """
    # No gap at all: there is nothing to recover from, regardless of how
    # many traced messages exist.
    if missing_total <= 0:
        return "not_needed"
    # A gap exists; any traced message after it counts as recovery.
    if after_gap_total > 0:
        return "recovered_after_gap"
    return "no_recovery_signal"
def _reply_markup_gap_prefixes_from_value(value: Any) -> list[dict[str, Any]]:
if not isinstance(value, list):
return []