fix(api): expose recent event source summaries
All checks were successful
CD Pipeline / tests (push) Successful in 1m30s
Code Review / ai-code-review (push) Successful in 12s
CD Pipeline / build-and-deploy (push) Successful in 4m27s
CD Pipeline / post-deploy-checks (push) Successful in 1m32s

This commit is contained in:
Your Name
2026-06-04 21:25:35 +08:00
parent 062e890a5e
commit 87fe932b45
3 changed files with 189 additions and 15 deletions

View File

@@ -43,9 +43,12 @@ class ChannelEventItem(BaseModel):
channel_type: str
provider_event_id: str
channel_chat_id: str | None
run_id: UUID | None = None
content_type: str | None = None
content_preview: str | None
is_duplicate: bool
received_at: datetime
source_summary: dict[str, Any] = Field(default_factory=dict)
class RecentEventsResponse(BaseModel):
@@ -279,7 +282,10 @@ class SourceCorrelationApplyRequest(BaseModel):
)
async def get_event_dossier(
project_id: str | None = Query(None, description="租戶 ID可選"),
run_id: UUID | None = Query(None, description="Run ID可選"),
run_id: Annotated[
UUID | None,
Query(description="Run ID可選"),
] = None,
provider_event_id: str | None = Query(
None, description="provider_event_id可選"
),
@@ -431,7 +437,10 @@ async def preview_event_recurrence_work_item(
provider: str | None = Query(
None, description="provider可選如 alertmanager / sentry / signoz"
),
mode: RecurrenceWorkItemMode = Query("auto", description="預覽模式"),
mode: Annotated[
RecurrenceWorkItemMode,
Query(description="預覽模式"),
] = "auto",
limit: int = Query(300, ge=1, le=300, description="最多納入統計筆數"),
) -> dict[str, Any]:
try:

View File

@@ -1649,6 +1649,82 @@ def _source_ref_count(envelope: Any) -> int:
return total
def _recent_event_source_summary(row: AwoooPConversationEvent) -> dict[str, Any]:
"""Return redaction-safe source context for recent channel events."""
envelope = _as_dict(row.source_envelope)
extra = _as_dict(envelope.get("extra"))
telegram_callback = _as_dict(
extra.get("telegram_callback_query")
or envelope.get("telegram_callback_query")
)
log_correlation = _as_dict(envelope.get("log_correlation"))
provider = (
envelope.get("provider")
or str(row.provider_event_id or "").split(":", 1)[0]
or row.channel_type
)
summary: dict[str, Any] = {
"schema_version": "awooop_recent_event_source_summary_v1",
"provider": provider,
"stage": envelope.get("stage"),
"provider_event_id": row.provider_event_id,
"source_ref_count": _source_ref_count(envelope),
"redaction_version": envelope.get("redaction_version"),
}
if telegram_callback:
summary["telegram_callback_query"] = {
"action": _string_or_none(
telegram_callback.get("callback_action")
),
"callback_ref": _string_or_none(
telegram_callback.get("callback_ref")
),
"incident_id": _string_or_none(
telegram_callback.get("incident_id")
),
"approval_id": _string_or_none(
telegram_callback.get("approval_id")
),
"message_id": _string_or_none(
telegram_callback.get("message_id")
),
"username_present": _bool_or_none(
telegram_callback.get("username_present")
),
}
if log_correlation:
summary["log_correlation"] = {
"alertname": _string_or_none(log_correlation.get("alertname")),
"severity": _string_or_none(log_correlation.get("severity")),
"namespace": _string_or_none(log_correlation.get("namespace")),
"target_resource": _string_or_none(
log_correlation.get("target_resource")
),
"fingerprint": _string_or_none(
log_correlation.get("fingerprint")
),
}
return summary
def _recent_channel_event_item(row: AwoooPConversationEvent) -> dict[str, Any]:
"""Project one recent channel event into the Operator Console DTO."""
return {
"event_id": row.event_id,
"project_id": row.project_id,
"channel_type": row.channel_type,
"provider_event_id": row.provider_event_id,
"channel_chat_id": row.channel_chat_id,
"run_id": row.run_id,
"content_type": row.content_type,
"content_preview": row.content_preview,
"is_duplicate": row.is_duplicate,
"received_at": row.received_at,
"source_summary": _recent_event_source_summary(row),
}
def _ai_route_repair_work_item(evidence: Mapping[str, Any]) -> dict[str, Any]:
target = str(evidence.get("target_resource") or "unknown").strip()
blockers = _as_string_list(evidence.get("access_blockers"))
@@ -4555,19 +4631,7 @@ async def list_recent_channel_events(
result = await db.execute(stmt.limit(safe_limit))
rows = list(result.scalars().all())
events = [
{
"event_id": r.event_id,
"project_id": r.project_id,
"channel_type": r.channel_type,
"provider_event_id": r.provider_event_id,
"channel_chat_id": r.channel_chat_id,
"content_preview": r.content_preview,
"is_duplicate": r.is_duplicate,
"received_at": r.received_at,
}
for r in rows
]
events = [_recent_channel_event_item(r) for r in rows]
return {"events": events, "total": len(events), "limit": safe_limit}

View File

@@ -39,6 +39,8 @@ from src.services.platform_operator_service import (
_outbound_timeline_status,
_outbound_timeline_summary,
_outbound_timeline_title,
_recent_channel_event_item,
_recent_event_source_summary,
_remediation_summary_matches_incident_id,
_remediation_summary_matches_status,
_remediation_timeline_summary,
@@ -85,6 +87,105 @@ def test_outbound_timeline_title_labels_cicd_status() -> None:
assert title == "TELEGRAMCI/CD 狀態通知"
def test_recent_event_source_summary_projects_telegram_callback_safely() -> None:
row = SimpleNamespace(
event_id=UUID("11111111-1111-4111-8111-111111111111"),
project_id="awoooi",
channel_type="telegram",
provider_event_id="telegram_callback:503475699",
channel_chat_id=None,
run_id=None,
content_type="callback_query",
content_preview=(
"Telegram callback_query received; action=approve; "
"incident_id=INC-20260602-5734BE"
),
is_duplicate=False,
received_at=datetime(2026, 6, 4, 7, 16, 33),
source_envelope={
"schema_version": "inbound_source_envelope_v1",
"provider": "telegram",
"stage": "received",
"provider_event_id": "telegram_callback:503475699",
"redaction_version": "audit_sink_v1",
"extra": {
"telegram_callback_query": {
"callback_query_id_sha256": "q" * 64,
"callback_data_sha256": "d" * 64,
"callback_action": "approve",
"callback_ref": "INC-20260602-5734BE",
"incident_id": "INC-20260602-5734BE",
"approval_id": None,
"message_id": "30972",
"user_id_sha256": "u" * 64,
"username_present": True,
}
},
},
)
summary = _recent_event_source_summary(row)
assert summary["schema_version"] == "awooop_recent_event_source_summary_v1"
assert summary["provider"] == "telegram"
assert summary["stage"] == "received"
assert summary["source_ref_count"] == 0
assert summary["telegram_callback_query"] == {
"action": "approve",
"callback_ref": "INC-20260602-5734BE",
"incident_id": "INC-20260602-5734BE",
"approval_id": None,
"message_id": "30972",
"username_present": True,
}
dumped = str(summary)
assert "sha256" not in dumped
assert "callback_data" not in dumped
assert "user_id" not in dumped
def test_recent_channel_event_item_includes_content_type_and_source_summary() -> None:
run_id = UUID("22222222-2222-4222-8222-222222222222")
row = SimpleNamespace(
event_id=UUID("33333333-3333-4333-8333-333333333333"),
project_id="awoooi",
channel_type="internal",
provider_event_id="alertmanager:received:alert-1",
channel_chat_id=None,
run_id=run_id,
content_type="text",
content_preview="Alertmanager inbound received",
is_duplicate=True,
received_at=datetime(2026, 6, 4, 13, 15, 2),
source_envelope={
"provider": "alertmanager",
"stage": "received",
"source_refs": {
"alert_ids": ["alert-1"],
"incident_ids": ["INC-20260603-9B2535"],
},
"log_correlation": {
"alertname": "DockerContainerUnhealthy",
"severity": "warning",
"namespace": "default",
"target_resource": "bitan-pharmacy-bitan-1",
"fingerprint": "be6a1821f6336fa44b5ec33855b9f23d",
},
},
)
item = _recent_channel_event_item(row)
assert item["run_id"] == run_id
assert item["content_type"] == "text"
assert item["source_summary"]["provider"] == "alertmanager"
assert item["source_summary"]["stage"] == "received"
assert item["source_summary"]["source_ref_count"] == 2
assert item["source_summary"]["log_correlation"]["alertname"] == (
"DockerContainerUnhealthy"
)
def test_cicd_event_item_preserves_rollout_risk_summary() -> None:
item = _cicd_event_item_from_row(
{