Files
awoooi/apps/api/src/api/v1/platform/events.py
Your Name ced36f2521
Some checks failed
CD Pipeline / tests (push) Failing after 6s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped
Code Review / ai-code-review (push) Failing after 8s
feat(awooop): add source provider freshness heartbeat
2026-05-20 19:32:22 +08:00

494 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
AwoooP Operator Console — Channel Events API
============================================
提供 Operator Console 讀取 Communication Hub / legacy mirror 的事件摘要。
"""
from __future__ import annotations
from datetime import UTC, datetime
from typing import Annotated, Any, Literal
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from src.core.awooop_operator_auth import (
AwoooPOperatorPrincipal,
verify_awooop_operator,
)
from src.services.channel_event_dossier_service import (
RecurrenceWorkItemHandoffKind,
RecurrenceWorkItemMode,
RecurrenceWorkItemNotFoundError,
fetch_channel_event_dossier,
fetch_channel_event_dossier_coverage,
fetch_channel_event_dossier_recurrence,
fetch_recurrence_work_item_dry_run,
fetch_recurrence_work_item_handoff,
fetch_recurrence_work_item_preview,
)
from src.services.channel_hub import record_external_alert_event
from src.services.platform_operator_service import list_recent_channel_events
# Router that collects every channel-event endpoint declared in this module.
router = APIRouter()
class ChannelEventItem(BaseModel):
    # One entry of the recent-events listing; this is the element type of
    # RecentEventsResponse.events.

    # --- identity -----------------------------------------------------
    event_id: UUID
    project_id: str
    channel_type: str
    provider_event_id: str
    channel_chat_id: str | None  # may be None

    # --- content / bookkeeping ----------------------------------------
    content_preview: str | None  # may be None
    is_duplicate: bool
    received_at: datetime
class RecentEventsResponse(BaseModel):
    # Response envelope declared as response_model of GET /events/recent.

    events: list[ChannelEventItem]

    # Counters accompanying the listing; their exact semantics are set by
    # list_recent_channel_events, which is not visible in this module.
    total: int
    limit: int
class ChannelEventDossierItem(BaseModel):
    # One source-dossier entry; element type of
    # ChannelEventDossierResponse.events.

    # --- identity / routing -------------------------------------------
    event_id: UUID
    project_id: str
    channel_type: str
    provider: str | None
    stage: str
    provider_event_id: str

    # --- content and redaction metadata -------------------------------
    content_preview: str | None
    content_redacted: str | None
    has_redacted_content: bool
    redaction_version: str | None
    source_url: str | None
    content_sha256: str | None
    content_length: int | None

    # --- source references / correlation (free-form mappings) ---------
    source_refs: dict[str, Any]
    source_ref_count: int
    log_correlation: dict[str, Any]

    # --- alert attributes ---------------------------------------------
    alertname: str | None
    severity: str | None
    namespace: str | None
    target_resource: str | None
    fingerprint: str | None

    # --- dedup flag and timestamps ------------------------------------
    is_duplicate: bool
    provider_ts: datetime | None
    received_at: datetime
class ChannelEventDossierSummary(BaseModel):
    # Aggregate counters exposed as ChannelEventDossierResponse.summary.
    # All four values are plain integers supplied by the dossier service.

    source_count: int
    duplicate_total: int
    redacted_total: int
    source_ref_total: int
class ChannelEventDossierResponse(BaseModel):
    # Response envelope declared as response_model of GET /events/dossier.

    events: list[ChannelEventDossierItem]
    total: int
    limit: int

    # Aggregates over the returned dossier entries.
    summary: ChannelEventDossierSummary
class ChannelEventProviderCoverage(BaseModel):
    # Coverage counters for a single provider; element type of
    # ChannelEventDossierCoverageResponse.providers.

    provider: str

    # --- event counters -----------------------------------------------
    total: int
    duplicate_total: int
    redacted_total: int

    # --- source-reference counters ------------------------------------
    source_ref_total: int
    missing_source_refs_total: int
    sentry_ref_total: int
    signoz_ref_total: int
    alert_ref_total: int

    # None is allowed by the annotation (no timestamp available).
    latest_received_at: datetime | None
class ChannelEventDossierCoverageSummary(BaseModel):
    # Overall coverage counters exposed as
    # ChannelEventDossierCoverageResponse.summary.

    source_count: int

    # --- source envelope presence -------------------------------------
    source_envelope_total: int
    missing_source_envelope_total: int

    # --- source refs presence -----------------------------------------
    with_source_refs_total: int
    missing_source_refs_total: int

    # --- dedup / redaction --------------------------------------------
    duplicate_total: int
    redacted_total: int

    # --- reference counters by kind -----------------------------------
    source_ref_total: int
    sentry_ref_total: int
    signoz_ref_total: int
    alert_ref_total: int

    # None is allowed by the annotation (no timestamp available).
    latest_received_at: datetime | None
class ChannelEventDossierCoverageResponse(BaseModel):
    # Response envelope declared as response_model of
    # GET /events/dossier/coverage.

    project_id: str
    limit: int

    # Overall counters followed by the per-provider breakdown.
    summary: ChannelEventDossierCoverageSummary
    providers: list[ChannelEventProviderCoverage]
# Closed set of provider names accepted by the provider-heartbeat models
# below; any other string fails request validation.
SourceProviderName = Literal["sentry", "signoz"]
class SourceProviderHeartbeatRequest(BaseModel):
    """Low-noise freshness heartbeat for external source-provider mirrors."""

    # Tenant the heartbeat is written for; 1-64 characters.
    project_id: str = Field(
        default="awoooi",
        min_length=1,
        max_length=64,
    )

    # Providers to record a heartbeat for. Defaults to both known providers.
    # The length bounds (1-2) do not enforce uniqueness.
    providers: list[SourceProviderName] = Field(
        default_factory=lambda: ["sentry", "signoz"],
        min_length=1,
        max_length=2,
    )

    # Free-text reason copied into the recorded event; 1-120 characters.
    reason: str = Field(
        default="scheduled_provider_freshness_smoke",
        min_length=1,
        max_length=120,
    )

    # Optional caller-supplied run reference, at most 120 characters.
    run_ref: str | None = Field(
        default=None,
        max_length=120,
    )
class SourceProviderHeartbeatItem(BaseModel):
    # Result of one recorded heartbeat; element type of
    # SourceProviderHeartbeatResponse.items.

    provider: SourceProviderName

    # String id generated by the heartbeat endpoint ("heartbeat-<timestamp>").
    event_id: str

    # UUID returned by record_external_alert_event for the stored event.
    conversation_event_id: UUID
class SourceProviderHeartbeatResponse(BaseModel):
    # Response envelope declared as response_model of
    # POST /events/dossier/provider-heartbeat.

    # The endpoint in this module always sets this to "recorded".
    status: str
    project_id: str

    # One entry per provider that was recorded.
    items: list[SourceProviderHeartbeatItem]
class ChannelEventRecurrenceSummary(BaseModel):
    # Aggregate counters exposed as ChannelEventRecurrenceResponse.summary.

    # --- counters without a default -----------------------------------
    source_event_total: int
    recurrence_group_total: int
    recurrent_group_total: int
    duplicate_event_total: int
    linked_run_total: int
    unlinked_event_total: int

    # --- counters that default to 0 when the payload omits them -------
    auto_repair_linked_total: int = 0
    verified_repair_group_total: int = 0
    open_work_item_group_total: int = 0
    manual_gate_group_total: int = 0
    automation_gap_group_total: int = 0
    failed_repair_group_total: int = 0

    # None is allowed by the annotation (no timestamp available).
    latest_received_at: datetime | None
class ChannelEventRecurrenceItem(BaseModel):
    # One recurrence group; element type of
    # ChannelEventRecurrenceResponse.items.

    # --- grouping key and alert attributes ----------------------------
    recurrence_key: str
    provider: str | None
    alertname: str | None
    severity: str | None
    namespace: str | None
    target_resource: str | None
    fingerprint: str | None

    # --- latest event in the group ------------------------------------
    latest_event_id: UUID | None
    latest_provider_event_id: str | None
    latest_content_preview: str | None

    # --- latest linked run --------------------------------------------
    latest_run_id: UUID | None
    latest_run_state: str | None
    latest_agent_id: str | None

    # --- optional extras (all have defaults) --------------------------
    latest_incident_id: str | None = None
    incident_ids: list[str] = Field(default_factory=list)
    repair_summary: dict[str, Any] | None = None
    work_item: dict[str, Any] | None = None

    # --- counters -----------------------------------------------------
    occurrence_total: int
    duplicate_total: int
    linked_run_total: int
    source_ref_total: int
    missing_source_refs_total: int
    sentry_ref_total: int
    signoz_ref_total: int
    alert_ref_total: int

    # Mapping of run state name to an integer count.
    run_state_counts: dict[str, int]

    # --- time window of the group -------------------------------------
    first_received_at: datetime | None
    latest_received_at: datetime | None
class ChannelEventRecurrenceResponse(BaseModel):
    # Response envelope declared as response_model of
    # GET /events/dossier/recurrence.

    project_id: str
    limit: int

    # Overall counters followed by the per-group entries.
    summary: ChannelEventRecurrenceSummary
    items: list[ChannelEventRecurrenceItem]
class RecurrenceWorkItemDryRunRequest(BaseModel):
    """AwoooP recurrence work item dry-run request."""

    # Optional tenant filter; when given it must be non-empty.
    project_id: str | None = Field(
        default=None,
        min_length=1,
    )

    # Required, non-empty identifier of the work item to dry-run.
    work_item_id: str = Field(min_length=1)

    mode: RecurrenceWorkItemMode = "auto"

    # Optional provider filter; when given it must be non-empty.
    provider: str | None = Field(
        default=None,
        min_length=1,
    )

    # Bounded to 1..300; the default is the upper bound.
    limit: int = Field(
        default=300,
        ge=1,
        le=300,
    )
class RecurrenceWorkItemHandoffRequest(BaseModel):
    """AwoooP recurrence work item handoff request."""

    # Optional tenant filter; when given it must be non-empty.
    project_id: str | None = Field(
        default=None,
        min_length=1,
    )

    # Required, non-empty identifier of the work item to hand off.
    work_item_id: str = Field(min_length=1)

    mode: RecurrenceWorkItemMode = "auto"
    handoff_kind: RecurrenceWorkItemHandoffKind = "ticket_proposal"

    # Optional provider filter; when given it must be non-empty.
    provider: str | None = Field(
        default=None,
        min_length=1,
    )

    # Bounded to 1..300; the default is the upper bound.
    limit: int = Field(
        default=300,
        ge=1,
        le=300,
    )
@router.get(
    "/events/dossier",
    response_model=ChannelEventDossierResponse,
    summary="查詢 Channel Event 來源卷宗",
    description=(
        "返回 redacted inbound source envelope供 AwoooP Run Detail 顯示"
        "告警來源、source refs、Sentry / SignOz / Alertmanager 關聯與去重狀態。"
    ),
)
async def get_event_dossier(
    project_id: str | None = Query(None, description="租戶 ID可選"),
    run_id: UUID | None = Query(None, description="Run ID可選"),
    provider_event_id: str | None = Query(
        None,
        description="provider_event_id可選",
    ),
    limit: int = Query(20, ge=1, le=50, description="最多返回筆數"),
) -> dict[str, Any]:
    """Return the source dossier for channel events.

    Thin HTTP adapter: forwards the query parameters unchanged to
    ``fetch_channel_event_dossier`` and returns its result, which FastAPI
    validates against ``ChannelEventDossierResponse``.

    Args:
        project_id: Optional tenant filter.
        run_id: Optional run filter.
        provider_event_id: Optional provider event id filter.
        limit: Maximum number of entries, validated to 1..50 (default 20).
    """
    dossier = await fetch_channel_event_dossier(
        project_id=project_id,
        run_id=run_id,
        provider_event_id=provider_event_id,
        limit=limit,
    )
    return dossier
@router.get(
    "/events/dossier/coverage",
    response_model=ChannelEventDossierCoverageResponse,
    summary="查詢 Channel Event 來源卷宗覆蓋率",
    description=(
        "返回近期 inbound event 的 source_envelope / source_refs / 去重 / "
        "Sentry / SignOz 關聯覆蓋率,供 AwoooP Run List 顯示告警是否已入庫。"
    ),
)
async def get_event_dossier_coverage(
    project_id: str | None = Query(None, description="租戶 ID可選"),
    provider: str | None = Query(
        None,
        description="provider可選如 sentry / signoz",
    ),
    limit: int = Query(100, ge=1, le=200, description="最多納入統計筆數"),
) -> dict[str, Any]:
    """Return source-dossier coverage statistics.

    Thin HTTP adapter: forwards the query parameters unchanged to
    ``fetch_channel_event_dossier_coverage`` and returns its result, which
    FastAPI validates against ``ChannelEventDossierCoverageResponse``.

    Args:
        project_id: Optional tenant filter.
        provider: Optional provider filter.
        limit: Maximum number of events considered, validated to 1..200
            (default 100).
    """
    coverage = await fetch_channel_event_dossier_coverage(
        project_id=project_id,
        provider=provider,
        limit=limit,
    )
    return coverage
@router.post(
    "/events/dossier/provider-heartbeat",
    response_model=SourceProviderHeartbeatResponse,
    summary="寫入 Sentry / SignOz 來源卷宗 freshness heartbeat",
    description=(
        "受 AwoooP operator key 保護的低噪音 smoke。只寫入來源卷宗與"
        "completed shadow run不建立 Incident、不送 Telegram、不宣稱真實上游告警。"
    ),
)
async def create_source_provider_heartbeat(
    payload: SourceProviderHeartbeatRequest,
    operator: Annotated[
        AwoooPOperatorPrincipal,
        Depends(verify_awooop_operator),
    ],
) -> dict[str, Any]:
    """Record one synthetic freshness heartbeat per requested provider.

    For every distinct provider in ``payload.providers`` (request order is
    kept) this calls ``record_external_alert_event`` with stage
    ``"heartbeat"`` and labels/payload that mark the event as synthetic.
    All providers of one request share the same ``heartbeat-<UTC timestamp>``
    event id.

    Args:
        payload: Validated heartbeat request (project, providers, reason,
            optional run reference).
        operator: Principal resolved by the ``verify_awooop_operator``
            dependency; its ``operator_id`` and ``auth_method`` are stored
            in the event payload.

    Returns:
        A dict with ``status`` set to ``"recorded"``, the ``project_id`` and
        one ``items`` entry per recorded provider.

    Raises:
        HTTPException: 500 when ``record_external_alert_event`` returns
            ``None`` for a provider. Providers recorded earlier in the same
            request are not rolled back by this function.
    """
    # The id does not depend on the provider, so build it once per request
    # instead of once per loop iteration. Resolution is one second.
    timestamp = datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")
    event_id = f"heartbeat-{timestamp}"

    # The request model bounds the list length but not uniqueness; without
    # this, ["sentry", "sentry"] would write the same provider/event_id pair
    # twice. dict.fromkeys de-duplicates while preserving request order.
    providers = list(dict.fromkeys(payload.providers))

    items: list[dict[str, Any]] = []
    for provider in providers:
        event_uuid = await record_external_alert_event(
            project_id=payload.project_id,
            provider=provider,
            event_id=event_id,
            stage="heartbeat",
            title="SourceProviderHeartbeat",
            severity="info",
            namespace="awoooi-prod",
            target_resource="source-provider-ingestion",
            fingerprint=f"source-provider-heartbeat:{provider}",
            labels={
                "provider": provider,
                "synthetic": "true",
                "alert_category": "alertchain_provider_freshness",
                "telegram": "not_sent",
                "incident": "not_created",
            },
            annotations={
                "summary": (
                    "Low-noise provider freshness smoke; verifies AwoooP "
                    "source dossier ingestion without creating an incident."
                ),
                "reason": payload.reason,
            },
            payload={
                "reason": payload.reason,
                "run_ref": payload.run_ref,
                # Audit trail: who triggered the heartbeat and how they
                # authenticated.
                "operator_id": operator.operator_id,
                "auth_method": operator.auth_method,
                "synthetic": True,
                "side_effects": {
                    "incident_created": False,
                    "telegram_sent": False,
                    "approval_created": False,
                },
            },
        )
        if event_uuid is None:
            # The service reported that nothing was stored; surface it as a
            # server error rather than returning a partial success.
            raise HTTPException(
                status_code=500,
                detail=f"{provider} provider heartbeat was not recorded",
            )
        items.append(
            {
                "provider": provider,
                "event_id": event_id,
                "conversation_event_id": event_uuid,
            }
        )

    return {
        "status": "recorded",
        "project_id": payload.project_id,
        "items": items,
    }
@router.get(
    "/events/dossier/recurrence",
    response_model=ChannelEventRecurrenceResponse,
    summary="查詢 Channel Event 重複發生與關聯 Run 狀態",
    description=(
        "將近期 inbound source events 依 fingerprint / alertname / namespace / target 分組,"
        "顯示重複發生次數、去重數、source refs 與最新 linked run 狀態。"
    ),
)
async def get_event_dossier_recurrence(
    project_id: str | None = Query(None, description="租戶 ID可選"),
    provider: str | None = Query(
        None,
        description="provider可選如 alertmanager / sentry / signoz",
    ),
    limit: int = Query(100, ge=1, le=300, description="最多納入統計筆數"),
) -> dict[str, Any]:
    """Return recurrence groups for recent channel events.

    Thin HTTP adapter: forwards the query parameters unchanged to
    ``fetch_channel_event_dossier_recurrence`` and returns its result, which
    FastAPI validates against ``ChannelEventRecurrenceResponse``.

    Args:
        project_id: Optional tenant filter.
        provider: Optional provider filter.
        limit: Maximum number of events considered, validated to 1..300
            (default 100).
    """
    recurrence = await fetch_channel_event_dossier_recurrence(
        project_id=project_id,
        provider=provider,
        limit=limit,
    )
    return recurrence
@router.get(
    "/events/dossier/recurrence/work-item/preview",
    summary="預覽重複告警工作項的安全處理計畫",
    description=(
        "依 recurrence read model 找出指定 work_item返回下一步、pre-flight checks "
        "與 read-only / no-write 保證;不修改 incident、auto-repair 或 ticket 狀態。"
    ),
)
async def preview_event_recurrence_work_item(
    work_item_id: str = Query(..., min_length=1, description="recurrence work_item_id"),
    project_id: str | None = Query(None, description="租戶 ID可選"),
    provider: str | None = Query(
        None,
        description="provider可選如 alertmanager / sentry / signoz",
    ),
    mode: RecurrenceWorkItemMode = Query("auto", description="預覽模式"),
    limit: int = Query(300, ge=1, le=300, description="最多納入統計筆數"),
) -> dict[str, Any]:
    """Return the preview for one recurrence work item.

    Delegates to ``fetch_recurrence_work_item_preview``. No response model is
    declared, so the service result is returned as-is.

    Args:
        work_item_id: Required, non-empty work item identifier.
        project_id: Optional tenant filter.
        provider: Optional provider filter.
        mode: Preview mode, default ``"auto"``.
        limit: Maximum number of events considered, validated to 1..300
            (default 300).

    Raises:
        HTTPException: 404 with detail ``recurrence_work_item_not_found``
            when the service raises ``RecurrenceWorkItemNotFoundError``.
    """
    try:
        preview = await fetch_recurrence_work_item_preview(
            project_id=project_id,
            work_item_id=work_item_id,
            mode=mode,
            provider=provider,
            limit=limit,
        )
    except RecurrenceWorkItemNotFoundError as exc:
        # Map the domain "not found" error onto an HTTP 404, keeping the
        # original exception as the cause.
        raise HTTPException(
            status_code=404,
            detail="recurrence_work_item_not_found",
        ) from exc
    else:
        return preview
@router.post(
    "/events/dossier/recurrence/work-item/dry-run",
    summary="乾跑重複告警工作項的安全處理流程",
    description=(
        "依 recurrence read model 產生 dry-run 結果並寫入 pre-flight history"
        "但不修改 incident、auto-repair 或 ticket 狀態。"
    ),
)
async def dry_run_event_recurrence_work_item(
    request: RecurrenceWorkItemDryRunRequest,
) -> dict[str, Any]:
    """Run the dry-run for one recurrence work item.

    Unpacks the request body and delegates to
    ``fetch_recurrence_work_item_dry_run``. No response model is declared, so
    the service result is returned as-is. What the service persists is not
    visible in this module.

    Args:
        request: Validated dry-run request body.

    Raises:
        HTTPException: 404 with detail ``recurrence_work_item_not_found``
            when the service raises ``RecurrenceWorkItemNotFoundError``.
    """
    # NOTE(review): unlike the provider-heartbeat route in this module, this
    # POST declares no verify_awooop_operator dependency - confirm that
    # authentication is enforced where the router is mounted.
    try:
        dry_run_result = await fetch_recurrence_work_item_dry_run(
            project_id=request.project_id,
            work_item_id=request.work_item_id,
            mode=request.mode,
            provider=request.provider,
            limit=request.limit,
        )
    except RecurrenceWorkItemNotFoundError as exc:
        # Map the domain "not found" error onto an HTTP 404, keeping the
        # original exception as the cause.
        raise HTTPException(
            status_code=404,
            detail="recurrence_work_item_not_found",
        ) from exc
    else:
        return dry_run_result
@router.post(
    "/events/dossier/recurrence/work-item/handoff",
    summary="記錄重複告警工作項的交接提案",
    description=(
        "依 recurrence read model 與 dry-run 結果記錄 ticket proposal / 人工接手歷史,"
        "但不修改 incident、auto-repair 或外部 ticket 狀態。"
    ),
)
async def handoff_event_recurrence_work_item(
    request: RecurrenceWorkItemHandoffRequest,
) -> dict[str, Any]:
    """Record a handoff proposal for one recurrence work item.

    Unpacks the request body and delegates to
    ``fetch_recurrence_work_item_handoff``. No response model is declared, so
    the service result is returned as-is. What the service persists is not
    visible in this module.

    Args:
        request: Validated handoff request body.

    Raises:
        HTTPException: 404 with detail ``recurrence_work_item_not_found``
            when the service raises ``RecurrenceWorkItemNotFoundError``.
    """
    # NOTE(review): unlike the provider-heartbeat route in this module, this
    # POST declares no verify_awooop_operator dependency - confirm that
    # authentication is enforced where the router is mounted.
    try:
        handoff_result = await fetch_recurrence_work_item_handoff(
            project_id=request.project_id,
            work_item_id=request.work_item_id,
            mode=request.mode,
            handoff_kind=request.handoff_kind,
            provider=request.provider,
            limit=request.limit,
        )
    except RecurrenceWorkItemNotFoundError as exc:
        # Map the domain "not found" error onto an HTTP 404, keeping the
        # original exception as the cause.
        raise HTTPException(
            status_code=404,
            detail="recurrence_work_item_not_found",
        ) from exc
    else:
        return handoff_result
@router.get(
    "/events/recent",
    response_model=RecentEventsResponse,
    summary="列出最近 Channel Events",
    description=(
        "返回 awooop_conversation_event 最近事件。"
        "可用 channel_type / provider_prefix 過濾,例如 alert-group 收斂事件。"
    ),
)
async def list_recent_events(
    project_id: str | None = Query(None, description="租戶 ID可選"),
    channel_type: str | None = Query(None, description="通道類型(可選)"),
    provider_prefix: str | None = Query(
        None,
        description="provider_event_id 前綴(可選)",
    ),
    limit: int = Query(20, ge=1, le=100, description="最多返回筆數"),
) -> dict[str, Any]:
    """List the most recent channel events.

    Thin HTTP adapter: forwards the query parameters unchanged to
    ``list_recent_channel_events`` and returns its result, which FastAPI
    validates against ``RecentEventsResponse``.

    Args:
        project_id: Optional tenant filter.
        channel_type: Optional channel type filter.
        provider_prefix: Optional prefix filter on ``provider_event_id``.
        limit: Maximum number of entries, validated to 1..100 (default 20).
    """
    recent = await list_recent_channel_events(
        project_id=project_id,
        channel_type=channel_type,
        provider_prefix=provider_prefix,
        limit=limit,
    )
    return recent