feat(awooop): add AI alert card delivery readback

This commit is contained in:
ogt
2026-06-25 09:27:16 +08:00
parent dc91dc76e4
commit b4d9cbb69d
6 changed files with 504 additions and 3 deletions

View File

@@ -43,6 +43,9 @@ from src.services.platform_operator_service import (
from src.services.platform_operator_service import (
list_callback_replies as list_callback_replies_svc,
)
from src.services.platform_operator_service import (
list_ai_alert_card_delivery_readback as list_ai_alert_card_delivery_readback_svc,
)
from src.services.platform_operator_service import (
list_runs as list_runs_svc,
)
@@ -112,6 +115,59 @@ class CallbackReplyItem(BaseModel):
run_detail_href: str | None = None
class AiAlertCardDeliveryItem(BaseModel):
message_id: UUID
run_id: UUID
project_id: str
event_at: datetime | None = None
channel_type: str
message_type: str
send_status: str
send_error: str | None = None
provider_message_id: str | None = None
triggered_by_state: str | None = None
event_type: str
lane: str
target: str
gates: list[str]
runtime_write_gate_count: int
runtime_write_allowed: bool
candidate_only: bool
delivery_receipt_readback_required: bool
source_refs: dict[str, Any]
run_state: str | None = None
agent_id: str | None = None
run_created_at: datetime | None = None
run_detail_href: str | None = None
class AiAlertCardDeliverySummary(BaseModel):
schema_version: str
project_id: str
event_type: str | None = None
lane: str | None = None
status: str
total: int
sent_total: int
failed_total: int
pending_total: int
shadow_total: int
delivery_receipt_required_total: int
runtime_write_gate_open_count: int
runtime_write_allowed: bool
latest_sent_at: datetime | None = None
latest_queued_at: datetime | None = None
production_write_count: int = 0
class ListAiAlertCardsResponse(BaseModel):
items: list[AiAlertCardDeliveryItem]
total: int
page: int
per_page: int
summary: AiAlertCardDeliverySummary
class OutboundReplyMarkupGapPrefix(BaseModel):
prefix: str
total: int
@@ -331,6 +387,33 @@ async def list_callback_replies(
)
@router.get(
"/runs/ai-alert-cards",
response_model=ListAiAlertCardsResponse,
summary="列出 AI 自動化事件卡送達讀回",
description=(
"從 AwoooP outbound mirror 查詢 ai_automation_alert_card_v1 的"
"結構化送達讀回;只讀,不送 Telegram、不修改 incident、run 或 Wazuh 狀態。"
),
)
async def list_ai_alert_card_delivery_readback(
project_id: str | None = Query("awoooi", description="租戶 ID"),
event_type: str | None = Query(None, description="事件類型 filter"),
lane: str | None = Query(None, description="AIOps lane filter"),
page: int = Query(1, ge=1, description="頁碼,從 1 開始"),
per_page: int = Query(20, ge=1, le=_MAX_PER_PAGE, description="每頁筆數"),
refresh: bool = Query(False, description="略過短 TTL 快取並重新聚合"),
) -> dict[str, Any]:
return await list_ai_alert_card_delivery_readback_svc(
project_id=project_id,
event_type=event_type,
lane=lane,
page=page,
per_page=per_page,
refresh=refresh,
)
@router.get(
"/cicd/events",
response_model=ListCicdEventsResponse,

View File

@@ -86,6 +86,9 @@ _ADR100_GATE5_PROJECTION_TRIGGER = "adr100_runtime_replay_gate5"
_CALLBACK_REPLY_CACHE_TTL_SECONDS = int(
os.getenv("AWOOOP_CALLBACK_REPLY_CACHE_TTL_SECONDS", "20")
)
_AI_ALERT_CARD_CACHE_TTL_SECONDS = int(
os.getenv("AWOOOP_AI_ALERT_CARD_CACHE_TTL_SECONDS", "20")
)
_INCIDENT_ID_RE = re.compile(r"\bINC-\d{8}-[A-Z0-9]{4,}\b")
_REMEDIATION_STATUS_FILTERS = {
"mcp_observed",
@@ -1271,6 +1274,247 @@ async def list_callback_replies(
)
async def list_ai_alert_card_delivery_readback(
*,
project_id: str | None = None,
event_type: str | None = None,
lane: str | None = None,
page: int = 1,
per_page: int = 20,
refresh: bool = False,
) -> dict[str, Any]:
"""Read-only AwoooP delivery readback for AI automation alert cards."""
normalized_project_id = project_id or "awoooi"
normalized_event_type = str(event_type or "").strip()
normalized_lane = str(lane or "").strip()
normalized_page = max(int(page or 1), 1)
normalized_per_page = min(max(int(per_page or 20), 1), _MAX_PER_PAGE)
cache_key = {
"project_id": normalized_project_id,
"event_type": normalized_event_type,
"lane": normalized_lane,
"page": normalized_page,
"per_page": normalized_per_page,
}
if not refresh:
cached_response = await get_cached_operator_summary_async(
"ai_alert_card_delivery_readback",
cache_key,
ttl_seconds=_AI_ALERT_CARD_CACHE_TTL_SECONDS,
)
if cached_response is not None:
logger.info(
"operator_ai_alert_card_delivery_readback_cache_hit",
project_id=normalized_project_id,
event_type=normalized_event_type,
lane=normalized_lane,
page=normalized_page,
per_page=normalized_per_page,
ttl_seconds=_AI_ALERT_CARD_CACHE_TTL_SECONDS,
)
return cached_response
where_clauses = [
"m.project_id = :project_id",
"m.channel_type = 'telegram'",
"m.source_envelope ? 'ai_automation_alert_card'",
]
params: dict[str, Any] = {
"project_id": normalized_project_id,
"limit": normalized_per_page,
"offset": (normalized_page - 1) * normalized_per_page,
}
if normalized_event_type:
where_clauses.append(
"m.source_envelope #>> '{ai_automation_alert_card,event_type}' = :event_type"
)
params["event_type"] = normalized_event_type
if normalized_lane:
where_clauses.append(
"m.source_envelope #>> '{ai_automation_alert_card,lane}' = :lane"
)
params["lane"] = normalized_lane
where_sql = " AND ".join(where_clauses)
summary_sql = text(f"""
SELECT
COUNT(*) AS total,
COUNT(*) FILTER (WHERE m.send_status = 'sent') AS sent_total,
COUNT(*) FILTER (WHERE m.send_status = 'failed') AS failed_total,
COUNT(*) FILTER (WHERE m.send_status = 'pending') AS pending_total,
COUNT(*) FILTER (WHERE m.send_status = 'shadow') AS shadow_total,
COUNT(*) FILTER (
WHERE COALESCE(
m.source_envelope #>>
'{{ai_automation_alert_card,delivery_receipt_readback_required}}',
''
) = 'true'
) AS delivery_receipt_required_total,
COUNT(*) FILTER (
WHERE COALESCE(
m.source_envelope #>>
'{{ai_automation_alert_card,runtime_write_gate_count}}',
'0'
) <> '0'
) AS runtime_write_gate_open_count,
MAX(m.sent_at) AS latest_sent_at,
MAX(m.queued_at) AS latest_queued_at
FROM awooop_outbound_message m
WHERE {where_sql}
""")
list_sql = text(f"""
SELECT
m.message_id,
m.project_id,
m.run_id,
m.channel_type,
m.message_type,
m.provider_message_id,
m.send_status,
m.send_error,
m.queued_at,
m.sent_at,
m.triggered_by_state,
m.source_envelope -> 'ai_automation_alert_card' AS alert_card,
m.source_envelope -> 'source_refs' AS source_refs,
r.agent_id,
r.state AS run_state,
r.created_at AS run_created_at
FROM awooop_outbound_message m
LEFT JOIN awooop_run_state r
ON r.project_id = m.project_id
AND r.run_id = m.run_id
WHERE {where_sql}
ORDER BY COALESCE(m.sent_at, m.queued_at) DESC, m.message_id DESC
LIMIT :limit OFFSET :offset
""")
async with get_db_context(normalized_project_id) as db:
summary_result = await db.execute(summary_sql, params)
summary_row = summary_result.mappings().first() or {}
rows_result = await db.execute(list_sql, params)
rows = list(rows_result.mappings().all())
summary = _ai_alert_card_delivery_summary_from_row(
summary_row,
project_id=normalized_project_id,
event_type=normalized_event_type or None,
lane=normalized_lane or None,
)
response = {
"items": [_ai_alert_card_delivery_item(row) for row in rows],
"total": summary["total"],
"page": normalized_page,
"per_page": normalized_per_page,
"summary": summary,
}
logger.info(
"operator_ai_alert_card_delivery_readback_fetched",
project_id=normalized_project_id,
event_type=normalized_event_type,
lane=normalized_lane,
page=normalized_page,
per_page=normalized_per_page,
total=summary["total"],
cache_status="miss",
cache_ttl_seconds=_AI_ALERT_CARD_CACHE_TTL_SECONDS,
)
return await store_operator_summary_async(
"ai_alert_card_delivery_readback",
cache_key,
response,
ttl_seconds=_AI_ALERT_CARD_CACHE_TTL_SECONDS,
)
def _ai_alert_card_delivery_summary_from_row(
row: Mapping[str, Any],
*,
project_id: str,
event_type: str | None,
lane: str | None,
) -> dict[str, Any]:
"""Normalize AI alert card delivery summary counts."""
total = _safe_int(row.get("total"))
sent_total = _safe_int(row.get("sent_total"))
failed_total = _safe_int(row.get("failed_total"))
pending_total = _safe_int(row.get("pending_total"))
shadow_total = _safe_int(row.get("shadow_total"))
runtime_write_gate_open_count = _safe_int(
row.get("runtime_write_gate_open_count")
)
status_value = "no_delivery_receipt" if total == 0 else "observed"
if failed_total > 0:
status_value = "delivery_failure_observed"
elif pending_total > 0:
status_value = "delivery_pending_observed"
return {
"schema_version": "awooop_ai_alert_card_delivery_readback_v1",
"project_id": project_id,
"event_type": event_type,
"lane": lane,
"status": status_value,
"total": total,
"sent_total": sent_total,
"failed_total": failed_total,
"pending_total": pending_total,
"shadow_total": shadow_total,
"delivery_receipt_required_total": _safe_int(
row.get("delivery_receipt_required_total")
),
"runtime_write_gate_open_count": runtime_write_gate_open_count,
"runtime_write_allowed": runtime_write_gate_open_count > 0,
"latest_sent_at": row.get("latest_sent_at"),
"latest_queued_at": row.get("latest_queued_at"),
"production_write_count": 0,
}
def _ai_alert_card_delivery_item(row: Mapping[str, Any]) -> dict[str, Any]:
"""Convert one AI alert-card outbound mirror row into delivery evidence."""
alert_card = _as_dict(row.get("alert_card"))
source_refs = _as_dict(row.get("source_refs"))
run_id = row.get("run_id")
project_id = str(row.get("project_id") or "")
runtime_write_gate_count = _safe_int(
alert_card.get("runtime_write_gate_count")
)
event_at = row.get("sent_at") or row.get("queued_at")
return {
"message_id": row.get("message_id"),
"run_id": run_id,
"project_id": project_id,
"event_at": event_at,
"channel_type": row.get("channel_type"),
"message_type": row.get("message_type"),
"send_status": row.get("send_status"),
"send_error": row.get("send_error"),
"provider_message_id": row.get("provider_message_id"),
"triggered_by_state": row.get("triggered_by_state"),
"event_type": str(alert_card.get("event_type") or ""),
"lane": str(alert_card.get("lane") or ""),
"target": str(alert_card.get("target") or ""),
"gates": alert_card.get("gates") if isinstance(alert_card.get("gates"), list) else [],
"runtime_write_gate_count": runtime_write_gate_count,
"runtime_write_allowed": runtime_write_gate_count > 0,
"candidate_only": bool(alert_card.get("candidate_only")),
"delivery_receipt_readback_required": bool(
alert_card.get("delivery_receipt_readback_required")
),
"source_refs": source_refs,
"run_state": row.get("run_state"),
"agent_id": row.get("agent_id"),
"run_created_at": row.get("run_created_at"),
"run_detail_href": (
f"/awooop/runs/{run_id}?project_id={project_id}"
if run_id and project_id
else None
),
}
async def _fetch_callback_reply_audit_summary(
db: Any,
*,