diff --git a/apps/api/src/api/v1/sentry_webhook.py b/apps/api/src/api/v1/sentry_webhook.py index 6d0c329b..1a0056b3 100644 --- a/apps/api/src/api/v1/sentry_webhook.py +++ b/apps/api/src/api/v1/sentry_webhook.py @@ -35,6 +35,7 @@ from src.models.approval import ( ) from src.services.anomaly_counter import get_anomaly_counter from src.services.approval_db import get_approval_service +from src.services.channel_hub import record_external_alert_event from src.services.openclaw_http_service import get_openclaw_http_service from src.services.sentry_service import get_sentry_service # 2026-04-27 P3.1-T2 by Claude — Tier-2 三服務感知強化:補 SentryWebhookService 簽章驗證 @@ -124,16 +125,60 @@ async def handle_sentry_error( # 提取錯誤資訊 issue_data = payload.get("data", {}).get("issue", {}) + event_data = payload.get("data", {}).get("event", {}) + issue_id = issue_data.get("id") + source_url = ( + issue_data.get("permalink") + or issue_data.get("web_url") + or issue_data.get("url") + ) + + background_tasks.add_task( + record_external_alert_event, + project_id="awoooi", + provider="sentry", + event_id=str(issue_id or issue_data.get("shortId") or "unknown"), + stage="received", + title=str(issue_data.get("title") or "Sentry issue"), + severity=str(issue_data.get("level") or "error"), + namespace="sentry", + target_resource=str(issue_data.get("culprit") or issue_data.get("project", {}).get("slug") or "unknown"), + fingerprint=f"sentry-{issue_id or issue_data.get('shortId') or 'unknown'}", + source_url=source_url, + labels={ + "project": issue_data.get("project", {}), + "level": issue_data.get("level"), + "culprit": issue_data.get("culprit"), + }, + annotations={"message": event_data.get("message")}, + payload=payload, + ) # Phase 10.2.1: 去重檢查 (10 分鐘內不重複發送) - issue_id = issue_data.get("id") sentry_service = get_sentry_service() if not await sentry_service.check_dedup(issue_id, ttl=SENTRY_DEDUP_TTL): + background_tasks.add_task( + record_external_alert_event, + project_id="awoooi", + 
provider="sentry", + event_id=str(issue_id or issue_data.get("shortId") or "unknown"), + stage="deduplicated", + title=str(issue_data.get("title") or "Sentry issue"), + severity=str(issue_data.get("level") or "error"), + namespace="sentry", + target_resource=str(issue_data.get("culprit") or issue_data.get("project", {}).get("slug") or "unknown"), + fingerprint=f"sentry-{issue_id or issue_data.get('shortId') or 'unknown'}", + source_url=source_url, + labels={"project": issue_data.get("project", {}), "level": issue_data.get("level")}, + annotations={"message": event_data.get("message")}, + payload={"dedup_ttl": SENTRY_DEDUP_TTL}, + is_duplicate=True, + ) return {"status": "deduplicated", "issue_id": issue_id, "ttl": SENTRY_DEDUP_TTL} - event_data = payload.get("data", {}).get("event", {}) error_context = { "issue_id": issue_data.get("id"), + "source_url": source_url, "title": issue_data.get("title"), "culprit": issue_data.get("culprit"), "level": issue_data.get("level"), @@ -256,6 +301,29 @@ async def analyze_and_comment( analysis=analysis, anomaly_frequency=frequency_dict, ) + await record_external_alert_event( + project_id="awoooi", + provider="sentry", + event_id=str(issue_id or error_context.get("issue_id") or "unknown"), + stage="approval_linked", + title=str(error_context.get("title") or "Sentry issue"), + severity=str(error_context.get("level") or "error"), + namespace="sentry", + target_resource=str(error_context.get("culprit") or error_context.get("project") or "unknown"), + fingerprint=f"sentry-{issue_id or error_context.get('issue_id') or 'unknown'}", + approval_id=approval_id, + source_url=error_context.get("source_url"), + labels={ + "project": error_context.get("project"), + "level": error_context.get("level"), + }, + annotations={"message": error_context.get("message")}, + payload={ + "anomaly_frequency": frequency_dict, + "ai_analyzed": analysis is not None, + "ai_provider": analysis.analyzed_by if analysis else None, + }, + ) # 4. 
發送 Telegram 告警 (含頻率資訊) await send_sentry_telegram_alert( diff --git a/apps/api/src/api/v1/signoz_webhook.py b/apps/api/src/api/v1/signoz_webhook.py index 44152a0f..e60d6219 100644 --- a/apps/api/src/api/v1/signoz_webhook.py +++ b/apps/api/src/api/v1/signoz_webhook.py @@ -18,6 +18,7 @@ AWOOOI API - SignOz Webhook Handler """ import uuid +from typing import TYPE_CHECKING import structlog from fastapi import APIRouter, BackgroundTasks, HTTPException, Request @@ -37,10 +38,14 @@ from src.models.approval import ( ) from src.services.anomaly_counter import get_anomaly_counter from src.services.approval_db import get_approval_service +from src.services.channel_hub import record_external_alert_event from src.services.incident_service import get_incident_service from src.services.telegram_gateway import get_telegram_gateway from src.utils.timezone import now_taipei_iso +if TYPE_CHECKING: + from src.services.openclaw import LLMAnalysisResult + logger = structlog.get_logger(__name__) router = APIRouter(prefix="/webhooks/signoz", tags=["SignOz Webhook"]) @@ -104,6 +109,26 @@ async def handle_signoz_alert( labels = alert.get("labels", {}) annotations = alert.get("annotations", {}) severity = labels.get("severity", "warning") + source_url = alert.get("generatorURL") + service_name = labels.get("service_name", labels.get("service", "unknown")) + fingerprint = labels.get("fingerprint") or f"signoz-{alert_name}-{service_name}" + + background_tasks.add_task( + record_external_alert_event, + project_id="awoooi", + provider="signoz", + event_id=str(fingerprint), + stage="received", + title=str(alert_name), + severity=str(severity), + namespace=str(labels.get("namespace", "signoz")), + target_resource=str(service_name), + fingerprint=str(fingerprint), + source_url=source_url, + labels=labels, + annotations=annotations, + payload=alert, + ) # 背景處理 background_tasks.add_task( @@ -113,6 +138,8 @@ async def handle_signoz_alert( annotations=annotations, severity=severity, 
starts_at=alert.get("startsAt"), + source_url=source_url, + raw_payload=alert, ) results.append({ @@ -133,6 +160,8 @@ async def process_signoz_alert( annotations: dict, severity: str, starts_at: str | None, + source_url: str | None = None, + raw_payload: dict | None = None, ): """ 背景處理 SignOz 告警 @@ -190,6 +219,7 @@ async def process_signoz_alert( "annotations": annotations, "fingerprint": f"signoz-{alert_name}-{labels.get('service_name', 'unknown')}", } + fingerprint = signal_data["fingerprint"] # ADR-037: 傳遞頻率統計到 Incident incident = await incident_service.create_incident_from_signal( signal_data, frequency_stats=anomaly_frequency @@ -229,6 +259,30 @@ async def process_signoz_alert( anomaly_frequency=anomaly_frequency, analysis_result=analysis_result, # 帶入 AI 結果 ) + await record_external_alert_event( + project_id="awoooi", + provider="signoz", + event_id=str(fingerprint), + stage="incident_linked", + title=str(alert_name), + severity=str(severity), + namespace=str(labels.get("namespace", "signoz")), + target_resource=str(labels.get("service_name", labels.get("service", "unknown"))), + fingerprint=str(fingerprint), + incident_id=str(incident.incident_id), + approval_id=str(approval_id), + source_url=source_url or trace_url, + labels=labels, + annotations=annotations, + payload={ + "raw_alert": raw_payload or {}, + "trace_url": trace_url, + "has_signoz_metrics": bool(signoz_metrics), + "ai_provider": ai_provider, + "tokens": tokens, + "cost": cost, + }, + ) # ================================================================= # Step 5: 發送 Telegram 告警 diff --git a/apps/api/src/api/v1/webhooks.py b/apps/api/src/api/v1/webhooks.py index 795dbbe6..2beafd26 100644 --- a/apps/api/src/api/v1/webhooks.py +++ b/apps/api/src/api/v1/webhooks.py @@ -1741,6 +1741,8 @@ async def _process_new_alert_background( incident_id=incident_id, approval_id=str(approval.id), repeat_count=1, + labels=traced_alert_labels, + annotations=alert_context.get("annotations", {}), ) if _cs2_auto_approval 
is not None and _cs2_exec_success is not None: @@ -2017,6 +2019,8 @@ async def _process_new_alert_background( incident_id=incident_id, approval_id=str(approval.id), repeat_count=1, + labels=traced_alert_labels, + annotations=alert_context.get("annotations", {}), ) if _cs3_auto_approval is not None and _cs3_exec_success is not None: @@ -2197,6 +2201,8 @@ async def _process_new_alert_background( incident_id=fallback_incident_id, approval_id=str(approval.id), repeat_count=1, + labels=traced_alert_labels, + annotations=alert_context.get("annotations", {}), ) await _push_to_telegram_background( @@ -2457,6 +2463,9 @@ async def alertmanager_webhook( stage="received", notification_type=notification_type, alert_category=alert_category, + source_url=alert.generatorURL, + labels=dict(alert.labels) if alert.labels else {}, + annotations=dict(alert.annotations) if alert.annotations else {}, ) # ========================================================================== @@ -2556,6 +2565,9 @@ async def alertmanager_webhook( approval_id=str(updated_approval.id), repeat_count=updated_approval.hit_count, is_duplicate=True, + source_url=alert.generatorURL, + labels=dict(alert.labels) if alert.labels else {}, + annotations=dict(alert.annotations) if alert.annotations else {}, ) return AlertResponse( @@ -2601,6 +2613,9 @@ async def alertmanager_webhook( notification_type="TYPE-1", alert_category=alert_category, incident_id=_info_incident_id, + source_url=alert.generatorURL, + labels={**alert.labels, "fingerprint": fingerprint, "alert_id": alert_id}, + annotations=dict(alert.annotations) if alert.annotations else {}, ) # 2026-04-15 ogt: TYPE-1 純資訊告警建立後立即關閉 # 設計原則: backup/heartbeat/info 告警無需追蹤狀態,通知即完成 @@ -2646,6 +2661,9 @@ async def alertmanager_webhook( notification_type=notification_type, alert_category=alert_category, is_duplicate=True, + source_url=alert.generatorURL, + labels=dict(alert.labels) if alert.labels else {}, + annotations=dict(alert.annotations) if alert.annotations else 
{}, ) return AlertResponse( success=True, diff --git a/apps/api/src/db/awooop_models.py b/apps/api/src/db/awooop_models.py index d04b1d57..50fda76a 100644 --- a/apps/api/src/db/awooop_models.py +++ b/apps/api/src/db/awooop_models.py @@ -635,6 +635,13 @@ class AwoooPConversationEvent(Base): content_type: Mapped[str] = mapped_column(String(32), nullable=False, default="text") content_hash: Mapped[str | None] = mapped_column(String(64), nullable=True) content_preview: Mapped[str | None] = mapped_column(String(256), nullable=True) + content_redacted: Mapped[str | None] = mapped_column(Text, nullable=True) + redaction_version: Mapped[str] = mapped_column( + String(32), nullable=False, server_default=text("'audit_sink_v1'") + ) + source_envelope: Mapped[dict[str, Any]] = mapped_column( + JSONB, nullable=False, server_default=text("'{}'::jsonb") + ) attachment_sha256: Mapped[str | None] = mapped_column(String(64), nullable=True) is_duplicate: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) provider_ts: Mapped[datetime | None] = mapped_column(nullable=True) diff --git a/apps/api/src/services/awooop_truth_chain_service.py b/apps/api/src/services/awooop_truth_chain_service.py index efbd546d..251dcef9 100644 --- a/apps/api/src/services/awooop_truth_chain_service.py +++ b/apps/api/src/services/awooop_truth_chain_service.py @@ -1177,6 +1177,7 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ if incident_fingerprints else "" ) + fingerprint_value = incident_fingerprints[0] if incident_fingerprints else "" inbound_rows = await _fetch_all( db, """ @@ -1192,6 +1193,9 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ content_type, content_hash, content_preview, + content_redacted, + redaction_version, + source_envelope, attachment_sha256, is_duplicate, provider_ts, @@ -1202,11 +1206,18 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ run_id::text = :source_id 
OR provider_event_id = :source_id OR content_preview ILIKE :source_needle + OR coalesce(source_envelope #> '{source_refs,event_ids}', '[]'::jsonb) ? :source_id + OR coalesce(source_envelope #> '{source_refs,incident_ids}', '[]'::jsonb) ? :source_id + OR coalesce(source_envelope #> '{source_refs,approval_ids}', '[]'::jsonb) ? :source_id + OR coalesce(source_envelope #> '{source_refs,alert_ids}', '[]'::jsonb) ? :source_id + OR coalesce(source_envelope #> '{source_refs,sentry_issue_ids}', '[]'::jsonb) ? :source_id + OR coalesce(source_envelope #> '{source_refs,signoz_alerts}', '[]'::jsonb) ? :source_id OR ( :fingerprint_needle != '' AND ( provider_event_id ILIKE :fingerprint_needle OR content_preview ILIKE :fingerprint_needle + OR coalesce(source_envelope #> '{source_refs,fingerprints}', '[]'::jsonb) ? :fingerprint_value ) ) ) @@ -1218,6 +1229,7 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ "project_id": project_id, "source_needle": f"%{source_id}%", "fingerprint_needle": fingerprint_needle, + "fingerprint_value": fingerprint_value, "limit": _MAX_ROWS, }, ) @@ -1312,7 +1324,14 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ "project_id": project_id, "source_id": source_id, "source_type": source_type, - "found": incident is not None or drift is not None or bool(runs) or bool(gateway_mcp_rows), + "found": ( + incident is not None + or drift is not None + or bool(runs) + or bool(gateway_mcp_rows) + or bool(inbound_rows) + or bool(outbound_rows) + ), "truth_status": truth_status, "linked_ids": { "incident_id": incident.get("incident_id") if incident else None, @@ -1321,6 +1340,7 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ "drift_report_id": drift.get("report_id") if drift else None, "operation_ids": _operation_ids(automation_ops), "auto_repair_execution_ids": _auto_repair_ids(auto_repair_executions), + "conversation_event_ids": [row["event_id"] for row in 
inbound_rows], }, "incident": incident, "drift": { diff --git a/apps/api/src/services/channel_hub.py b/apps/api/src/services/channel_hub.py index f270e123..8c9d656f 100644 --- a/apps/api/src/services/channel_hub.py +++ b/apps/api/src/services/channel_hub.py @@ -30,6 +30,7 @@ import asyncio import hashlib import html import json +import re from datetime import UTC, datetime from typing import Any from uuid import NAMESPACE_URL, UUID, uuid5 @@ -46,7 +47,9 @@ logger = structlog.get_logger(__name__) # Progressive Feedback Policy:等待超過此秒數才發 interim 訊息 _INTERIM_WAIT_SECONDS = 30 +_INBOUND_REDACTION_VERSION = "audit_sink_v1" _OUTBOUND_REDACTION_VERSION = "audit_sink_v1" +_INCIDENT_ID_RE = re.compile(r"\bINC-\d{8}-[A-Z0-9]{6}\b") def _db_timestamp_now() -> datetime: @@ -54,6 +57,75 @@ def _db_timestamp_now() -> datetime: return datetime.now(UTC).replace(tzinfo=None) +def _compact_unique(values: list[str | None], *, limit: int = 20) -> list[str]: + """Return stable non-empty values without leaking duplicate source refs.""" + return sorted({str(value).strip() for value in values if str(value or "").strip()})[:limit] + + +def build_inbound_source_envelope( + *, + provider: str, + stage: str, + provider_event_id: str, + raw_event_id: str | None = None, + raw_content: str | None = None, + alertname: str | None = None, + severity: str | None = None, + namespace: str | None = None, + target_resource: str | None = None, + fingerprint: str | None = None, + incident_id: str | None = None, + approval_id: str | None = None, + source_url: str | None = None, + labels: dict[str, Any] | None = None, + annotations: dict[str, Any] | None = None, + extra: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Build a redaction-friendly inbound replay envelope for truth-chain use.""" + content_sha256 = hashlib.sha256(raw_content.encode()).hexdigest() if raw_content else None + text_refs = _INCIDENT_ID_RE.findall(raw_content or "") + provider_name = str(provider or "unknown").strip().lower() 
or "unknown" + source_refs = { + "event_ids": _compact_unique([raw_event_id]), + "incident_ids": _compact_unique([incident_id, *text_refs]), + "approval_ids": _compact_unique([approval_id]), + "alert_ids": _compact_unique([provider_event_id, raw_event_id]), + "fingerprints": _compact_unique([fingerprint]), + "sentry_issue_ids": _compact_unique( + [raw_event_id, provider_event_id] if provider_name == "sentry" else [] + ), + "signoz_alerts": _compact_unique( + [raw_event_id, alertname] if provider_name == "signoz" else [] + ), + } + envelope: dict[str, Any] = { + "schema_version": "inbound_source_envelope_v1", + "redaction_version": _INBOUND_REDACTION_VERSION, + "adapter": f"{provider_name}_webhook", + "provider": provider_name, + "stage": stage, + "provider_event_id": provider_event_id, + "source_url": source_url, + "content_sha256": content_sha256, + "content_length": len(raw_content) if raw_content is not None else 0, + "source_refs": source_refs, + "log_correlation": { + "alertname": alertname, + "severity": severity, + "namespace": namespace, + "target_resource": target_resource, + "fingerprint": fingerprint, + }, + "labels": labels or {}, + "annotations": annotations or {}, + } + if extra: + envelope["extra"] = extra + sanitized = sanitize(envelope) + sanitized["content_sha256"] = content_sha256 + return sanitized + + def _input_sha256(input_payload: dict[str, Any] | None) -> str | None: """計算 Run input 的穩定 hash,讓 mirror run 也能保留最小完整性證據。""" if not input_payload: @@ -140,6 +212,19 @@ def build_alertmanager_run_id(project_id: str, provider_event_id: str) -> UUID: return uuid5(NAMESPACE_URL, f"awooop:alertmanager:{project_id}:{provider_event_id}") +def build_external_alert_provider_event_id(provider: str, event_id: str, stage: str) -> str: + """建立 Sentry/SignOz 等外部告警 inbound event 的冪等 provider_event_id。""" + safe_provider = str(provider).strip().lower()[:32] or "external" + safe_event_id = str(event_id).strip()[:96] or "unknown" + safe_stage = 
str(stage).strip()[:32] or "received" + return f"{safe_provider}:{safe_stage}:{safe_event_id}" + + +def build_external_alert_run_id(project_id: str, provider_event_id: str) -> UUID: + """為外部告警 inbound mirror 建立穩定 shadow run_id。""" + return uuid5(NAMESPACE_URL, f"awooop:external-alert:{project_id}:{provider_event_id}") + + # ───────────────────────────────────────────────────────────────────────────── # 入站事件記錄 # ───────────────────────────────────────────────────────────────────────────── @@ -155,6 +240,7 @@ async def mirror_inbound_event( channel_chat_id: str | None = None, content_type: str = "text", raw_content: str | None = None, + source_envelope: dict[str, Any] | None = None, attachment_sha256: str | None = None, provider_ts: datetime | None = None, run_id: UUID | None = None, @@ -168,12 +254,32 @@ async def mirror_inbound_event( """ content_hash: str | None = None content_preview: str | None = None + content_redacted: str | None = None if raw_content is not None: content_hash = hashlib.sha256(raw_content.encode()).hexdigest() # preview:redact 後截取前 256 字元 - redacted = _redact_string(raw_content) - content_preview = redacted[:256] if len(redacted) > 256 else redacted + content_redacted = _redact_string(raw_content) + content_preview = ( + content_redacted[:256] if len(content_redacted) > 256 else content_redacted + ) + + if source_envelope and source_envelope.get("schema_version") == "inbound_source_envelope_v1": + original_content_sha256 = source_envelope.get("content_sha256") + envelope = sanitize(source_envelope) + envelope.setdefault("redaction_version", _INBOUND_REDACTION_VERSION) + envelope["content_sha256"] = content_hash or original_content_sha256 + envelope.setdefault("content_length", len(raw_content) if raw_content is not None else 0) + else: + envelope = build_inbound_source_envelope( + provider=channel_type, + stage="received", + provider_event_id=provider_event_id, + raw_event_id=provider_event_id, + raw_content=raw_content, + 
extra=source_envelope, + ) + source_envelope_json = json.dumps(envelope, ensure_ascii=False, default=str) result = await db.execute( text(""" @@ -181,16 +287,28 @@ async def mirror_inbound_event( project_id, channel_type, provider_event_id, platform_subject_id, channel_user_id, channel_chat_id, run_id, content_type, content_hash, content_preview, + content_redacted, redaction_version, source_envelope, attachment_sha256, is_duplicate, provider_ts, received_at ) VALUES ( :project_id, :channel_type, :provider_event_id, :platform_subject_id, :channel_user_id, :channel_chat_id, :run_id, :content_type, :content_hash, :content_preview, + :content_redacted, :redaction_version, CAST(:source_envelope AS jsonb), :attachment_sha256, :is_duplicate, :provider_ts, NOW() ) ON CONFLICT (project_id, channel_type, provider_event_id) DO UPDATE SET is_duplicate = TRUE, - run_id = COALESCE(EXCLUDED.run_id, awooop_conversation_event.run_id) + run_id = COALESCE(EXCLUDED.run_id, awooop_conversation_event.run_id), + content_redacted = COALESCE( + awooop_conversation_event.content_redacted, + EXCLUDED.content_redacted + ), + redaction_version = EXCLUDED.redaction_version, + source_envelope = CASE + WHEN awooop_conversation_event.source_envelope = '{}'::jsonb + THEN EXCLUDED.source_envelope + ELSE awooop_conversation_event.source_envelope + END RETURNING event_id """), { @@ -204,6 +322,9 @@ async def mirror_inbound_event( "content_type": content_type, "content_hash": content_hash, "content_preview": content_preview, + "content_redacted": content_redacted, + "redaction_version": _INBOUND_REDACTION_VERSION, + "source_envelope": source_envelope_json, "attachment_sha256": attachment_sha256, "is_duplicate": is_duplicate, "provider_ts": provider_ts, @@ -514,6 +635,10 @@ async def record_alertmanager_event( approval_id: str | None = None, repeat_count: int | None = None, is_duplicate: bool = False, + source_url: str | None = None, + labels: dict[str, Any] | None = None, + annotations: dict[str, Any] 
async def record_external_alert_event(
    *,
    project_id: str,
    provider: str,
    event_id: str,
    stage: str,
    title: str,
    severity: str,
    namespace: str | None = None,
    target_resource: str | None = None,
    fingerprint: str | None = None,
    incident_id: str | None = None,
    approval_id: str | None = None,
    source_url: str | None = None,
    labels: dict[str, Any] | None = None,
    annotations: dict[str, Any] | None = None,
    payload: dict[str, Any] | None = None,
    is_duplicate: bool = False,
) -> UUID | None:
    """Mirror a non-Alertmanager alert (Sentry, SignOz, ...) into conversation_event.

    This is the shared minimal entry point for the truth chain: it ensures a
    completed shadow run exists, then stores the redacted content and a
    source envelope through ``mirror_inbound_event``. It is best-effort and
    must not change the webhook's own notification, approval or automation
    behaviour, so every step — including building the content and the
    envelope — runs inside the ``try`` and any failure is logged and swallowed.

    Returns:
        The conversation event id, or ``None`` when recording failed.
    """
    provider_name = str(provider or "external").strip().lower() or "external"
    try:
        from src.db.base import get_db_context

        incident_ref = str(incident_id) if incident_id else None
        approval_ref = str(approval_id) if approval_id else None
        provider_event_id = build_external_alert_provider_event_id(
            provider_name, event_id, stage
        )
        content = "\n".join([
            f"{provider_name} inbound {stage}",
            f"Event ID: {event_id}",
            f"Title: {title}",
            f"Severity: {severity}",
            f"Namespace: {namespace or '-'}",
            f"Target: {target_resource or '-'}",
            f"Fingerprint: {fingerprint or '-'}",
            f"Incident: {incident_id or '-'}",
            f"Approval: {approval_id or '-'}",
            f"Source URL: {source_url or '-'}",
        ])
        source_envelope = build_inbound_source_envelope(
            provider=provider_name,
            stage=stage,
            provider_event_id=provider_event_id,
            raw_event_id=event_id,
            raw_content=content,
            alertname=title,
            severity=severity,
            namespace=namespace,
            target_resource=target_resource,
            fingerprint=fingerprint,
            incident_id=incident_ref,
            approval_id=approval_ref,
            source_url=source_url,
            labels=labels,
            annotations=annotations,
            extra={
                "payload": payload or {},
            },
        )

        async with get_db_context(project_id) as db:
            run_id = build_external_alert_run_id(project_id, provider_event_id)
            # The shadow run is created before the event that references its run_id.
            await ensure_completed_shadow_run(
                db,
                project_id=project_id,
                run_id=run_id,
                agent_id=f"legacy-{provider_name}-webhook",
                trigger_type=f"{provider_name}_inbound",
                trigger_ref=provider_event_id,
                input_payload={
                    "provider": provider_name,
                    "event_id": event_id,
                    "stage": stage,
                    "severity": severity,
                    "namespace": namespace,
                    "target_resource": target_resource,
                    "fingerprint": fingerprint,
                    "incident_id": incident_ref,
                    "approval_id": approval_ref,
                },
            )
            event_uuid = await mirror_inbound_event(
                db,
                project_id=project_id,
                channel_type="internal",
                provider_event_id=provider_event_id,
                platform_subject_id=provider_name,
                channel_user_id=provider_name,
                channel_chat_id=f"{provider_name}:{namespace or 'default'}",
                content_type="text",
                raw_content=content,
                source_envelope=source_envelope,
                provider_ts=_db_timestamp_now(),
                run_id=run_id,
                is_duplicate=is_duplicate,
            )

        logger.info(
            "external_alert_event_recorded",
            project_id=project_id,
            provider=provider_name,
            event_id=event_id,
            stage=stage,
            conversation_event_id=str(event_uuid),
            incident_id=incident_ref,
            approval_id=approval_ref,
        )
        return event_uuid
    except Exception as exc:
        # Deliberate catch-all: audit mirroring must never break alert handling.
        logger.warning(
            "external_alert_event_record_failed",
            project_id=project_id,
            provider=provider_name,
            event_id=event_id,
            stage=stage,
            error=str(exc),
        )
        return None
def test_build_external_alert_provider_event_id_is_stable() -> None:
    """A short event id is embedded unchanged and the provider is lower-cased."""
    provider_event_id = build_external_alert_provider_event_id(
        "Sentry", "12345678901234567890", "approval_linked"
    )

    assert provider_event_id == "sentry:approval_linked:12345678901234567890"
    assert len(provider_event_id) < 256


def test_build_inbound_source_envelope_redacts_and_keeps_refs() -> None:
    """The envelope keeps every source ref while secrets are redacted."""
    built = build_inbound_source_envelope(
        provider="alertmanager",
        stage="received",
        provider_event_id="alertmanager:received:alert-1:fp-1",
        raw_event_id="alert-1",
        raw_content="ACTION REQUIRED INC-20260513-ABCDEF token 1234567890:abcdefghijklmnopqrstuvwxyzABCDEFGH",
        alertname="DockerContainerUnhealthy",
        severity="warning",
        namespace="default",
        target_resource="bitan-pharmacy-bitan-1",
        fingerprint="fp-1",
        incident_id="INC-20260513-ABCDEF",
        approval_id="approval-1",
        labels={"token": "should-not-leak", "container": "bitan-pharmacy-bitan-1"},
    )
    as_json = json.dumps(built, ensure_ascii=False)
    refs = built["source_refs"]

    assert built["schema_version"] == "inbound_source_envelope_v1"
    assert refs["event_ids"] == ["alert-1"]
    assert refs["incident_ids"] == ["INC-20260513-ABCDEF"]
    assert refs["approval_ids"] == ["approval-1"]
    assert refs["alert_ids"] == ["alert-1", "alertmanager:received:alert-1:fp-1"]
    assert refs["fingerprints"] == ["fp-1"]
    assert len(built["content_sha256"]) == 64
    # Neither the token-shaped string nor the secret label value may survive.
    assert "1234567890:" not in as_json
    assert "should-not-leak" not in as_json


async def test_mirror_inbound_event_writes_redacted_replay_fields() -> None:
    """mirror_inbound_event binds redacted content and the envelope as params."""
    raw_text = "Sentry token 1234567890:abcdefghijklmnopqrstuvwxyzABCDEFGH"
    fake_db = _FakeSession()
    source_envelope = build_inbound_source_envelope(
        provider="sentry",
        stage="received",
        provider_event_id="sentry:received:123",
        raw_event_id="123",
        raw_content=raw_text,
        alertname="Sentry issue",
        severity="error",
        namespace="sentry",
        target_resource="frontend",
    )

    await mirror_inbound_event(
        fake_db,  # type: ignore[arg-type]
        project_id="awoooi",
        channel_type="internal",
        provider_event_id="sentry:received:123",
        platform_subject_id="sentry",
        raw_content=raw_text,
        source_envelope=source_envelope,
    )

    bound = fake_db.params
    assert "content_redacted" in fake_db.statement
    assert "source_envelope" in fake_db.statement
    assert bound["redaction_version"] == "audit_sink_v1"
    assert len(json.loads(bound["source_envelope"])["content_sha256"]) == 64
    assert "1234567890:" not in bound["content_redacted"]
    assert "1234567890:" not in bound["source_envelope"]


def test_sentry_and_signoz_source_refs_keep_raw_event_ids() -> None:
    """Provider-specific ref lists are filled for Sentry and SignOz events."""
    sentry_refs = build_inbound_source_envelope(
        provider="sentry",
        stage="received",
        provider_event_id="sentry:received:issue-123",
        raw_event_id="issue-123",
        raw_content="Sentry issue",
        alertname="Sentry issue",
        fingerprint="sentry-issue-123",
    )["source_refs"]
    signoz_refs = build_inbound_source_envelope(
        provider="signoz",
        stage="received",
        provider_event_id="signoz:received:fp-456",
        raw_event_id="fp-456",
        raw_content="SignOz alert",
        alertname="HighLatency",
        fingerprint="fp-456",
    )["source_refs"]

    assert sentry_refs["event_ids"] == ["issue-123"]
    assert sentry_refs["sentry_issue_ids"] == [
        "issue-123",
        "sentry:received:issue-123",
    ]
    assert signoz_refs["event_ids"] == ["fp-456"]
    assert signoz_refs["signoz_alerts"] == ["HighLatency", "fp-456"]