feat(awooop): surface source dossier coverage
This commit is contained in:
@@ -13,7 +13,10 @@ from uuid import UUID
|
||||
from fastapi import APIRouter, Query
|
||||
from pydantic import BaseModel
|
||||
|
||||
from src.services.channel_event_dossier_service import fetch_channel_event_dossier
|
||||
from src.services.channel_event_dossier_service import (
|
||||
fetch_channel_event_dossier,
|
||||
fetch_channel_event_dossier_coverage,
|
||||
)
|
||||
from src.services.platform_operator_service import list_recent_channel_events
|
||||
|
||||
router = APIRouter()
|
||||
@@ -77,6 +80,41 @@ class ChannelEventDossierResponse(BaseModel):
|
||||
summary: ChannelEventDossierSummary
|
||||
|
||||
|
||||
class ChannelEventProviderCoverage(BaseModel):
|
||||
provider: str
|
||||
total: int
|
||||
duplicate_total: int
|
||||
redacted_total: int
|
||||
source_ref_total: int
|
||||
missing_source_refs_total: int
|
||||
sentry_ref_total: int
|
||||
signoz_ref_total: int
|
||||
alert_ref_total: int
|
||||
latest_received_at: datetime | None
|
||||
|
||||
|
||||
class ChannelEventDossierCoverageSummary(BaseModel):
|
||||
source_count: int
|
||||
source_envelope_total: int
|
||||
missing_source_envelope_total: int
|
||||
with_source_refs_total: int
|
||||
missing_source_refs_total: int
|
||||
duplicate_total: int
|
||||
redacted_total: int
|
||||
source_ref_total: int
|
||||
sentry_ref_total: int
|
||||
signoz_ref_total: int
|
||||
alert_ref_total: int
|
||||
latest_received_at: datetime | None
|
||||
|
||||
|
||||
class ChannelEventDossierCoverageResponse(BaseModel):
|
||||
project_id: str
|
||||
limit: int
|
||||
summary: ChannelEventDossierCoverageSummary
|
||||
providers: list[ChannelEventProviderCoverage]
|
||||
|
||||
|
||||
@router.get(
|
||||
"/events/dossier",
|
||||
response_model=ChannelEventDossierResponse,
|
||||
@@ -100,6 +138,27 @@ async def get_event_dossier(
|
||||
)
|
||||
|
||||
|
||||
@router.get(
|
||||
"/events/dossier/coverage",
|
||||
response_model=ChannelEventDossierCoverageResponse,
|
||||
summary="查詢 Channel Event 來源卷宗覆蓋率",
|
||||
description=(
|
||||
"返回近期 inbound event 的 source_envelope / source_refs / 去重 / "
|
||||
"Sentry / SignOz 關聯覆蓋率,供 AwoooP Run List 顯示告警是否已入庫。"
|
||||
),
|
||||
)
|
||||
async def get_event_dossier_coverage(
|
||||
project_id: str | None = Query(None, description="租戶 ID(可選)"),
|
||||
provider: str | None = Query(None, description="provider(可選,如 sentry / signoz)"),
|
||||
limit: int = Query(100, ge=1, le=200, description="最多納入統計筆數"),
|
||||
) -> dict[str, Any]:
|
||||
return await fetch_channel_event_dossier_coverage(
|
||||
project_id=project_id,
|
||||
provider=provider,
|
||||
limit=limit,
|
||||
)
|
||||
|
||||
|
||||
@router.get(
|
||||
"/events/recent",
|
||||
response_model=RecentEventsResponse,
|
||||
|
||||
@@ -16,6 +16,7 @@ from sqlalchemy import text
|
||||
from src.db.base import get_db_context
|
||||
|
||||
_MAX_DOSSIER_EVENTS = 50
|
||||
_MAX_COVERAGE_EVENTS = 200
|
||||
|
||||
|
||||
def _as_dict(value: Any) -> dict[str, Any]:
|
||||
@@ -32,6 +33,106 @@ def _compact_ref_count(source_refs: dict[str, Any]) -> int:
|
||||
return total
|
||||
|
||||
|
||||
def _ref_count(source_refs: dict[str, Any], key: str) -> int:
|
||||
value = source_refs.get(key)
|
||||
if isinstance(value, list):
|
||||
return len(value)
|
||||
return 1 if value else 0
|
||||
|
||||
|
||||
def build_dossier_coverage(
|
||||
rows: list[dict[str, Any]],
|
||||
*,
|
||||
project_id: str,
|
||||
limit: int,
|
||||
) -> dict[str, Any]:
|
||||
"""Summarize recent inbound source envelopes for console coverage checks."""
|
||||
events = [build_dossier_event(row) for row in rows]
|
||||
provider_map: dict[str, dict[str, Any]] = {}
|
||||
source_envelope_total = 0
|
||||
with_source_refs_total = 0
|
||||
sentry_ref_total = 0
|
||||
signoz_ref_total = 0
|
||||
alert_ref_total = 0
|
||||
|
||||
for row, event in zip(rows, events, strict=False):
|
||||
envelope = _as_dict(row.get("source_envelope"))
|
||||
if envelope:
|
||||
source_envelope_total += 1
|
||||
|
||||
source_refs = _as_dict(event.get("source_refs"))
|
||||
source_ref_count = int(event.get("source_ref_count") or 0)
|
||||
if source_ref_count > 0:
|
||||
with_source_refs_total += 1
|
||||
|
||||
provider = str(event.get("provider") or row.get("channel_type") or "unknown")
|
||||
provider_item = provider_map.setdefault(
|
||||
provider,
|
||||
{
|
||||
"provider": provider,
|
||||
"total": 0,
|
||||
"duplicate_total": 0,
|
||||
"redacted_total": 0,
|
||||
"source_ref_total": 0,
|
||||
"missing_source_refs_total": 0,
|
||||
"sentry_ref_total": 0,
|
||||
"signoz_ref_total": 0,
|
||||
"alert_ref_total": 0,
|
||||
"latest_received_at": None,
|
||||
},
|
||||
)
|
||||
provider_item["total"] += 1
|
||||
provider_item["source_ref_total"] += source_ref_count
|
||||
if event.get("is_duplicate"):
|
||||
provider_item["duplicate_total"] += 1
|
||||
if event.get("has_redacted_content"):
|
||||
provider_item["redacted_total"] += 1
|
||||
if source_ref_count <= 0:
|
||||
provider_item["missing_source_refs_total"] += 1
|
||||
|
||||
event_sentry_refs = _ref_count(source_refs, "sentry_issue_ids")
|
||||
event_signoz_refs = _ref_count(source_refs, "signoz_alerts")
|
||||
event_alert_refs = _ref_count(source_refs, "alert_ids")
|
||||
sentry_ref_total += event_sentry_refs
|
||||
signoz_ref_total += event_signoz_refs
|
||||
alert_ref_total += event_alert_refs
|
||||
provider_item["sentry_ref_total"] += event_sentry_refs
|
||||
provider_item["signoz_ref_total"] += event_signoz_refs
|
||||
provider_item["alert_ref_total"] += event_alert_refs
|
||||
provider_item["latest_received_at"] = (
|
||||
provider_item["latest_received_at"] or event.get("received_at")
|
||||
)
|
||||
|
||||
duplicate_total = sum(1 for event in events if event.get("is_duplicate"))
|
||||
redacted_total = sum(1 for event in events if event.get("has_redacted_content"))
|
||||
source_ref_total = sum(int(event.get("source_ref_count") or 0) for event in events)
|
||||
missing_source_refs_total = len(events) - with_source_refs_total
|
||||
missing_source_envelope_total = len(events) - source_envelope_total
|
||||
|
||||
return {
|
||||
"project_id": project_id,
|
||||
"limit": limit,
|
||||
"summary": {
|
||||
"source_count": len(events),
|
||||
"source_envelope_total": source_envelope_total,
|
||||
"missing_source_envelope_total": missing_source_envelope_total,
|
||||
"with_source_refs_total": with_source_refs_total,
|
||||
"missing_source_refs_total": missing_source_refs_total,
|
||||
"duplicate_total": duplicate_total,
|
||||
"redacted_total": redacted_total,
|
||||
"source_ref_total": source_ref_total,
|
||||
"sentry_ref_total": sentry_ref_total,
|
||||
"signoz_ref_total": signoz_ref_total,
|
||||
"alert_ref_total": alert_ref_total,
|
||||
"latest_received_at": events[0].get("received_at") if events else None,
|
||||
},
|
||||
"providers": sorted(
|
||||
provider_map.values(),
|
||||
key=lambda item: (-int(item.get("total") or 0), str(item.get("provider") or "")),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
def build_dossier_event(row: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Normalize a DB row into the front-end event dossier shape."""
|
||||
envelope = _as_dict(row.get("source_envelope"))
|
||||
@@ -136,3 +237,56 @@ async def fetch_channel_event_dossier(
|
||||
"source_ref_total": sum(int(event.get("source_ref_count") or 0) for event in events),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
async def fetch_channel_event_dossier_coverage(
|
||||
*,
|
||||
project_id: str | None,
|
||||
provider: str | None,
|
||||
limit: int,
|
||||
) -> dict[str, Any]:
|
||||
"""Fetch a read-only coverage summary for recent inbound channel events."""
|
||||
effective_project_id = project_id or "awoooi"
|
||||
safe_limit = max(1, min(limit, _MAX_COVERAGE_EVENTS))
|
||||
where_clauses = ["project_id = :project_id"]
|
||||
params: dict[str, Any] = {
|
||||
"project_id": effective_project_id,
|
||||
"limit": safe_limit,
|
||||
}
|
||||
if provider:
|
||||
where_clauses.append(
|
||||
"COALESCE(NULLIF(source_envelope->>'provider', ''), "
|
||||
"split_part(provider_event_id, ':', 1), channel_type) = :provider"
|
||||
)
|
||||
params["provider"] = provider
|
||||
|
||||
async with get_db_context(effective_project_id) as db:
|
||||
result = await db.execute(
|
||||
text(f"""
|
||||
SELECT
|
||||
event_id,
|
||||
project_id,
|
||||
channel_type,
|
||||
provider_event_id,
|
||||
content_hash,
|
||||
content_preview,
|
||||
content_redacted,
|
||||
redaction_version,
|
||||
source_envelope,
|
||||
is_duplicate,
|
||||
provider_ts,
|
||||
received_at
|
||||
FROM awooop_conversation_event
|
||||
WHERE {" AND ".join(where_clauses)}
|
||||
ORDER BY received_at DESC
|
||||
LIMIT :limit
|
||||
"""),
|
||||
params,
|
||||
)
|
||||
rows = [dict(row) for row in result.mappings().all()]
|
||||
|
||||
return build_dossier_coverage(
|
||||
rows,
|
||||
project_id=effective_project_id,
|
||||
limit=safe_limit,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user