feat(awooop): persist inbound source envelopes
All checks were successful
Code Review / ai-code-review (push) Successful in 10s
CD Pipeline / tests (push) Successful in 1m23s
CD Pipeline / build-and-deploy (push) Successful in 3m37s
CD Pipeline / post-deploy-checks (push) Successful in 1m34s

This commit is contained in:
Your Name
2026-05-13 21:29:04 +08:00
parent c888444287
commit 795085170a
8 changed files with 562 additions and 6 deletions

View File

@@ -35,6 +35,7 @@ from src.models.approval import (
)
from src.services.anomaly_counter import get_anomaly_counter
from src.services.approval_db import get_approval_service
from src.services.channel_hub import record_external_alert_event
from src.services.openclaw_http_service import get_openclaw_http_service
from src.services.sentry_service import get_sentry_service
# 2026-04-27 P3.1-T2 by Claude — Tier-2 三服務感知強化:補 SentryWebhookService 簽章驗證
@@ -124,16 +125,60 @@ async def handle_sentry_error(
# 提取錯誤資訊
issue_data = payload.get("data", {}).get("issue", {})
event_data = payload.get("data", {}).get("event", {})
issue_id = issue_data.get("id")
source_url = (
issue_data.get("permalink")
or issue_data.get("web_url")
or issue_data.get("url")
)
background_tasks.add_task(
record_external_alert_event,
project_id="awoooi",
provider="sentry",
event_id=str(issue_id or issue_data.get("shortId") or "unknown"),
stage="received",
title=str(issue_data.get("title") or "Sentry issue"),
severity=str(issue_data.get("level") or "error"),
namespace="sentry",
target_resource=str(issue_data.get("culprit") or issue_data.get("project", {}).get("slug") or "unknown"),
fingerprint=f"sentry-{issue_id or issue_data.get('shortId') or 'unknown'}",
source_url=source_url,
labels={
"project": issue_data.get("project", {}),
"level": issue_data.get("level"),
"culprit": issue_data.get("culprit"),
},
annotations={"message": event_data.get("message")},
payload=payload,
)
# Phase 10.2.1: 去重檢查 (10 分鐘內不重複發送)
issue_id = issue_data.get("id")
sentry_service = get_sentry_service()
if not await sentry_service.check_dedup(issue_id, ttl=SENTRY_DEDUP_TTL):
background_tasks.add_task(
record_external_alert_event,
project_id="awoooi",
provider="sentry",
event_id=str(issue_id or issue_data.get("shortId") or "unknown"),
stage="deduplicated",
title=str(issue_data.get("title") or "Sentry issue"),
severity=str(issue_data.get("level") or "error"),
namespace="sentry",
target_resource=str(issue_data.get("culprit") or issue_data.get("project", {}).get("slug") or "unknown"),
fingerprint=f"sentry-{issue_id or issue_data.get('shortId') or 'unknown'}",
source_url=source_url,
labels={"project": issue_data.get("project", {}), "level": issue_data.get("level")},
annotations={"message": event_data.get("message")},
payload={"dedup_ttl": SENTRY_DEDUP_TTL},
is_duplicate=True,
)
return {"status": "deduplicated", "issue_id": issue_id, "ttl": SENTRY_DEDUP_TTL}
event_data = payload.get("data", {}).get("event", {})
error_context = {
"issue_id": issue_data.get("id"),
"source_url": source_url,
"title": issue_data.get("title"),
"culprit": issue_data.get("culprit"),
"level": issue_data.get("level"),
@@ -256,6 +301,29 @@ async def analyze_and_comment(
analysis=analysis,
anomaly_frequency=frequency_dict,
)
await record_external_alert_event(
project_id="awoooi",
provider="sentry",
event_id=str(issue_id or error_context.get("issue_id") or "unknown"),
stage="approval_linked",
title=str(error_context.get("title") or "Sentry issue"),
severity=str(error_context.get("level") or "error"),
namespace="sentry",
target_resource=str(error_context.get("culprit") or error_context.get("project") or "unknown"),
fingerprint=f"sentry-{issue_id or error_context.get('issue_id') or 'unknown'}",
approval_id=approval_id,
source_url=error_context.get("source_url"),
labels={
"project": error_context.get("project"),
"level": error_context.get("level"),
},
annotations={"message": error_context.get("message")},
payload={
"anomaly_frequency": frequency_dict,
"ai_analyzed": analysis is not None,
"ai_provider": analysis.analyzed_by if analysis else None,
},
)
# 4. 發送 Telegram 告警 (含頻率資訊)
await send_sentry_telegram_alert(

View File

@@ -18,6 +18,7 @@ AWOOOI API - SignOz Webhook Handler
"""
import uuid
from typing import TYPE_CHECKING
import structlog
from fastapi import APIRouter, BackgroundTasks, HTTPException, Request
@@ -37,10 +38,14 @@ from src.models.approval import (
)
from src.services.anomaly_counter import get_anomaly_counter
from src.services.approval_db import get_approval_service
from src.services.channel_hub import record_external_alert_event
from src.services.incident_service import get_incident_service
from src.services.telegram_gateway import get_telegram_gateway
from src.utils.timezone import now_taipei_iso
if TYPE_CHECKING:
from src.services.openclaw import LLMAnalysisResult
logger = structlog.get_logger(__name__)
router = APIRouter(prefix="/webhooks/signoz", tags=["SignOz Webhook"])
@@ -104,6 +109,26 @@ async def handle_signoz_alert(
labels = alert.get("labels", {})
annotations = alert.get("annotations", {})
severity = labels.get("severity", "warning")
source_url = alert.get("generatorURL")
service_name = labels.get("service_name", labels.get("service", "unknown"))
fingerprint = labels.get("fingerprint") or f"signoz-{alert_name}-{service_name}"
background_tasks.add_task(
record_external_alert_event,
project_id="awoooi",
provider="signoz",
event_id=str(fingerprint),
stage="received",
title=str(alert_name),
severity=str(severity),
namespace=str(labels.get("namespace", "signoz")),
target_resource=str(service_name),
fingerprint=str(fingerprint),
source_url=source_url,
labels=labels,
annotations=annotations,
payload=alert,
)
# 背景處理
background_tasks.add_task(
@@ -113,6 +138,8 @@ async def handle_signoz_alert(
annotations=annotations,
severity=severity,
starts_at=alert.get("startsAt"),
source_url=source_url,
raw_payload=alert,
)
results.append({
@@ -133,6 +160,8 @@ async def process_signoz_alert(
annotations: dict,
severity: str,
starts_at: str | None,
source_url: str | None = None,
raw_payload: dict | None = None,
):
"""
背景處理 SignOz 告警
@@ -190,6 +219,7 @@ async def process_signoz_alert(
"annotations": annotations,
"fingerprint": f"signoz-{alert_name}-{labels.get('service_name', 'unknown')}",
}
fingerprint = signal_data["fingerprint"]
# ADR-037: 傳遞頻率統計到 Incident
incident = await incident_service.create_incident_from_signal(
signal_data, frequency_stats=anomaly_frequency
@@ -229,6 +259,30 @@ async def process_signoz_alert(
anomaly_frequency=anomaly_frequency,
analysis_result=analysis_result, # 帶入 AI 結果
)
await record_external_alert_event(
project_id="awoooi",
provider="signoz",
event_id=str(fingerprint),
stage="incident_linked",
title=str(alert_name),
severity=str(severity),
namespace=str(labels.get("namespace", "signoz")),
target_resource=str(labels.get("service_name", labels.get("service", "unknown"))),
fingerprint=str(fingerprint),
incident_id=str(incident.incident_id),
approval_id=str(approval_id),
source_url=source_url or trace_url,
labels=labels,
annotations=annotations,
payload={
"raw_alert": raw_payload or {},
"trace_url": trace_url,
"has_signoz_metrics": bool(signoz_metrics),
"ai_provider": ai_provider,
"tokens": tokens,
"cost": cost,
},
)
# =================================================================
# Step 5: 發送 Telegram 告警

View File

@@ -1741,6 +1741,8 @@ async def _process_new_alert_background(
incident_id=incident_id,
approval_id=str(approval.id),
repeat_count=1,
labels=traced_alert_labels,
annotations=alert_context.get("annotations", {}),
)
if _cs2_auto_approval is not None and _cs2_exec_success is not None:
@@ -2017,6 +2019,8 @@ async def _process_new_alert_background(
incident_id=incident_id,
approval_id=str(approval.id),
repeat_count=1,
labels=traced_alert_labels,
annotations=alert_context.get("annotations", {}),
)
if _cs3_auto_approval is not None and _cs3_exec_success is not None:
@@ -2197,6 +2201,8 @@ async def _process_new_alert_background(
incident_id=fallback_incident_id,
approval_id=str(approval.id),
repeat_count=1,
labels=traced_alert_labels,
annotations=alert_context.get("annotations", {}),
)
await _push_to_telegram_background(
@@ -2457,6 +2463,9 @@ async def alertmanager_webhook(
stage="received",
notification_type=notification_type,
alert_category=alert_category,
source_url=alert.generatorURL,
labels=dict(alert.labels) if alert.labels else {},
annotations=dict(alert.annotations) if alert.annotations else {},
)
# ==========================================================================
@@ -2556,6 +2565,9 @@ async def alertmanager_webhook(
approval_id=str(updated_approval.id),
repeat_count=updated_approval.hit_count,
is_duplicate=True,
source_url=alert.generatorURL,
labels=dict(alert.labels) if alert.labels else {},
annotations=dict(alert.annotations) if alert.annotations else {},
)
return AlertResponse(
@@ -2601,6 +2613,9 @@ async def alertmanager_webhook(
notification_type="TYPE-1",
alert_category=alert_category,
incident_id=_info_incident_id,
source_url=alert.generatorURL,
labels={**alert.labels, "fingerprint": fingerprint, "alert_id": alert_id},
annotations=dict(alert.annotations) if alert.annotations else {},
)
# 2026-04-15 ogt: TYPE-1 純資訊告警建立後立即關閉
# 設計原則: backup/heartbeat/info 告警無需追蹤狀態,通知即完成
@@ -2646,6 +2661,9 @@ async def alertmanager_webhook(
notification_type=notification_type,
alert_category=alert_category,
is_duplicate=True,
source_url=alert.generatorURL,
labels=dict(alert.labels) if alert.labels else {},
annotations=dict(alert.annotations) if alert.annotations else {},
)
return AlertResponse(
success=True,

View File

@@ -635,6 +635,13 @@ class AwoooPConversationEvent(Base):
content_type: Mapped[str] = mapped_column(String(32), nullable=False, default="text")
content_hash: Mapped[str | None] = mapped_column(String(64), nullable=True)
content_preview: Mapped[str | None] = mapped_column(String(256), nullable=True)
content_redacted: Mapped[str | None] = mapped_column(Text, nullable=True)
redaction_version: Mapped[str] = mapped_column(
String(32), nullable=False, server_default=text("'audit_sink_v1'")
)
source_envelope: Mapped[dict[str, Any]] = mapped_column(
JSONB, nullable=False, server_default=text("'{}'::jsonb")
)
attachment_sha256: Mapped[str | None] = mapped_column(String(64), nullable=True)
is_duplicate: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
provider_ts: Mapped[datetime | None] = mapped_column(nullable=True)

View File

@@ -1177,6 +1177,7 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
if incident_fingerprints
else ""
)
fingerprint_value = incident_fingerprints[0] if incident_fingerprints else ""
inbound_rows = await _fetch_all(
db,
"""
@@ -1192,6 +1193,9 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
content_type,
content_hash,
content_preview,
content_redacted,
redaction_version,
source_envelope,
attachment_sha256,
is_duplicate,
provider_ts,
@@ -1202,11 +1206,18 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
run_id::text = :source_id
OR provider_event_id = :source_id
OR content_preview ILIKE :source_needle
OR coalesce(source_envelope #> '{source_refs,event_ids}', '[]'::jsonb) ? :source_id
OR coalesce(source_envelope #> '{source_refs,incident_ids}', '[]'::jsonb) ? :source_id
OR coalesce(source_envelope #> '{source_refs,approval_ids}', '[]'::jsonb) ? :source_id
OR coalesce(source_envelope #> '{source_refs,alert_ids}', '[]'::jsonb) ? :source_id
OR coalesce(source_envelope #> '{source_refs,sentry_issue_ids}', '[]'::jsonb) ? :source_id
OR coalesce(source_envelope #> '{source_refs,signoz_alerts}', '[]'::jsonb) ? :source_id
OR (
:fingerprint_needle != ''
AND (
provider_event_id ILIKE :fingerprint_needle
OR content_preview ILIKE :fingerprint_needle
OR coalesce(source_envelope #> '{source_refs,fingerprints}', '[]'::jsonb) ? :fingerprint_value
)
)
)
@@ -1218,6 +1229,7 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
"project_id": project_id,
"source_needle": f"%{source_id}%",
"fingerprint_needle": fingerprint_needle,
"fingerprint_value": fingerprint_value,
"limit": _MAX_ROWS,
},
)
@@ -1312,7 +1324,14 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
"project_id": project_id,
"source_id": source_id,
"source_type": source_type,
"found": incident is not None or drift is not None or bool(runs) or bool(gateway_mcp_rows),
"found": (
incident is not None
or drift is not None
or bool(runs)
or bool(gateway_mcp_rows)
or bool(inbound_rows)
or bool(outbound_rows)
),
"truth_status": truth_status,
"linked_ids": {
"incident_id": incident.get("incident_id") if incident else None,
@@ -1321,6 +1340,7 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
"drift_report_id": drift.get("report_id") if drift else None,
"operation_ids": _operation_ids(automation_ops),
"auto_repair_execution_ids": _auto_repair_ids(auto_repair_executions),
"conversation_event_ids": [row["event_id"] for row in inbound_rows],
},
"incident": incident,
"drift": {

View File

@@ -30,6 +30,7 @@ import asyncio
import hashlib
import html
import json
import re
from datetime import UTC, datetime
from typing import Any
from uuid import NAMESPACE_URL, UUID, uuid5
@@ -46,7 +47,9 @@ logger = structlog.get_logger(__name__)
# Progressive Feedback Policy等待超過此秒數才發 interim 訊息
_INTERIM_WAIT_SECONDS = 30
_INBOUND_REDACTION_VERSION = "audit_sink_v1"
_OUTBOUND_REDACTION_VERSION = "audit_sink_v1"
_INCIDENT_ID_RE = re.compile(r"\bINC-\d{8}-[A-Z0-9]{6}\b")
def _db_timestamp_now() -> datetime:
@@ -54,6 +57,75 @@ def _db_timestamp_now() -> datetime:
return datetime.now(UTC).replace(tzinfo=None)
def _compact_unique(values: list[str | None], *, limit: int = 20) -> list[str]:
"""Return stable non-empty values without leaking duplicate source refs."""
return sorted({str(value).strip() for value in values if str(value or "").strip()})[:limit]
def build_inbound_source_envelope(
*,
provider: str,
stage: str,
provider_event_id: str,
raw_event_id: str | None = None,
raw_content: str | None = None,
alertname: str | None = None,
severity: str | None = None,
namespace: str | None = None,
target_resource: str | None = None,
fingerprint: str | None = None,
incident_id: str | None = None,
approval_id: str | None = None,
source_url: str | None = None,
labels: dict[str, Any] | None = None,
annotations: dict[str, Any] | None = None,
extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Build a redaction-friendly inbound replay envelope for truth-chain use."""
content_sha256 = hashlib.sha256(raw_content.encode()).hexdigest() if raw_content else None
text_refs = _INCIDENT_ID_RE.findall(raw_content or "")
provider_name = str(provider or "unknown").strip().lower() or "unknown"
source_refs = {
"event_ids": _compact_unique([raw_event_id]),
"incident_ids": _compact_unique([incident_id, *text_refs]),
"approval_ids": _compact_unique([approval_id]),
"alert_ids": _compact_unique([provider_event_id, raw_event_id]),
"fingerprints": _compact_unique([fingerprint]),
"sentry_issue_ids": _compact_unique(
[raw_event_id, provider_event_id] if provider_name == "sentry" else []
),
"signoz_alerts": _compact_unique(
[raw_event_id, alertname] if provider_name == "signoz" else []
),
}
envelope: dict[str, Any] = {
"schema_version": "inbound_source_envelope_v1",
"redaction_version": _INBOUND_REDACTION_VERSION,
"adapter": f"{provider_name}_webhook",
"provider": provider_name,
"stage": stage,
"provider_event_id": provider_event_id,
"source_url": source_url,
"content_sha256": content_sha256,
"content_length": len(raw_content) if raw_content is not None else 0,
"source_refs": source_refs,
"log_correlation": {
"alertname": alertname,
"severity": severity,
"namespace": namespace,
"target_resource": target_resource,
"fingerprint": fingerprint,
},
"labels": labels or {},
"annotations": annotations or {},
}
if extra:
envelope["extra"] = extra
sanitized = sanitize(envelope)
sanitized["content_sha256"] = content_sha256
return sanitized
def _input_sha256(input_payload: dict[str, Any] | None) -> str | None:
"""計算 Run input 的穩定 hash讓 mirror run 也能保留最小完整性證據。"""
if not input_payload:
@@ -140,6 +212,19 @@ def build_alertmanager_run_id(project_id: str, provider_event_id: str) -> UUID:
return uuid5(NAMESPACE_URL, f"awooop:alertmanager:{project_id}:{provider_event_id}")
def build_external_alert_provider_event_id(provider: str, event_id: str, stage: str) -> str:
"""建立 Sentry/SignOz 等外部告警 inbound event 的冪等 provider_event_id。"""
safe_provider = str(provider).strip().lower()[:32] or "external"
safe_event_id = str(event_id).strip()[:96] or "unknown"
safe_stage = str(stage).strip()[:32] or "received"
return f"{safe_provider}:{safe_stage}:{safe_event_id}"
def build_external_alert_run_id(project_id: str, provider_event_id: str) -> UUID:
"""為外部告警 inbound mirror 建立穩定 shadow run_id。"""
return uuid5(NAMESPACE_URL, f"awooop:external-alert:{project_id}:{provider_event_id}")
# ─────────────────────────────────────────────────────────────────────────────
# 入站事件記錄
# ─────────────────────────────────────────────────────────────────────────────
@@ -155,6 +240,7 @@ async def mirror_inbound_event(
channel_chat_id: str | None = None,
content_type: str = "text",
raw_content: str | None = None,
source_envelope: dict[str, Any] | None = None,
attachment_sha256: str | None = None,
provider_ts: datetime | None = None,
run_id: UUID | None = None,
@@ -168,12 +254,32 @@ async def mirror_inbound_event(
"""
content_hash: str | None = None
content_preview: str | None = None
content_redacted: str | None = None
if raw_content is not None:
content_hash = hashlib.sha256(raw_content.encode()).hexdigest()
# previewredact 後截取前 256 字元
redacted = _redact_string(raw_content)
content_preview = redacted[:256] if len(redacted) > 256 else redacted
content_redacted = _redact_string(raw_content)
content_preview = (
content_redacted[:256] if len(content_redacted) > 256 else content_redacted
)
if source_envelope and source_envelope.get("schema_version") == "inbound_source_envelope_v1":
original_content_sha256 = source_envelope.get("content_sha256")
envelope = sanitize(source_envelope)
envelope.setdefault("redaction_version", _INBOUND_REDACTION_VERSION)
envelope["content_sha256"] = content_hash or original_content_sha256
envelope.setdefault("content_length", len(raw_content) if raw_content is not None else 0)
else:
envelope = build_inbound_source_envelope(
provider=channel_type,
stage="received",
provider_event_id=provider_event_id,
raw_event_id=provider_event_id,
raw_content=raw_content,
extra=source_envelope,
)
source_envelope_json = json.dumps(envelope, ensure_ascii=False, default=str)
result = await db.execute(
text("""
@@ -181,16 +287,28 @@ async def mirror_inbound_event(
project_id, channel_type, provider_event_id,
platform_subject_id, channel_user_id, channel_chat_id,
run_id, content_type, content_hash, content_preview,
content_redacted, redaction_version, source_envelope,
attachment_sha256, is_duplicate, provider_ts, received_at
) VALUES (
:project_id, :channel_type, :provider_event_id,
:platform_subject_id, :channel_user_id, :channel_chat_id,
:run_id, :content_type, :content_hash, :content_preview,
:content_redacted, :redaction_version, CAST(:source_envelope AS jsonb),
:attachment_sha256, :is_duplicate, :provider_ts, NOW()
)
ON CONFLICT (project_id, channel_type, provider_event_id) DO UPDATE SET
is_duplicate = TRUE,
run_id = COALESCE(EXCLUDED.run_id, awooop_conversation_event.run_id)
run_id = COALESCE(EXCLUDED.run_id, awooop_conversation_event.run_id),
content_redacted = COALESCE(
awooop_conversation_event.content_redacted,
EXCLUDED.content_redacted
),
redaction_version = EXCLUDED.redaction_version,
source_envelope = CASE
WHEN awooop_conversation_event.source_envelope = '{}'::jsonb
THEN EXCLUDED.source_envelope
ELSE awooop_conversation_event.source_envelope
END
RETURNING event_id
"""),
{
@@ -204,6 +322,9 @@ async def mirror_inbound_event(
"content_type": content_type,
"content_hash": content_hash,
"content_preview": content_preview,
"content_redacted": content_redacted,
"redaction_version": _INBOUND_REDACTION_VERSION,
"source_envelope": source_envelope_json,
"attachment_sha256": attachment_sha256,
"is_duplicate": is_duplicate,
"provider_ts": provider_ts,
@@ -514,6 +635,10 @@ async def record_alertmanager_event(
approval_id: str | None = None,
repeat_count: int | None = None,
is_duplicate: bool = False,
source_url: str | None = None,
labels: dict[str, Any] | None = None,
annotations: dict[str, Any] | None = None,
source_extra: dict[str, Any] | None = None,
) -> UUID | None:
"""
將 Alertmanager inbound alert 鏡像到 AwoooP conversation_event。
@@ -546,6 +671,29 @@ async def record_alertmanager_event(
approval_id=approval_ref,
repeat_count=repeat_count,
)
source_envelope = build_inbound_source_envelope(
provider="alertmanager",
stage=stage,
provider_event_id=provider_event_id,
raw_event_id=alert_id,
raw_content=content,
alertname=alertname,
severity=severity,
namespace=namespace,
target_resource=target_resource,
fingerprint=fingerprint,
incident_id=incident_ref,
approval_id=approval_ref,
source_url=source_url,
labels=labels,
annotations=annotations,
extra={
"notification_type": notification_type,
"alert_category": alert_category,
"repeat_count": repeat_count,
**(source_extra or {}),
},
)
async with get_db_context(project_id) as db:
run_id = build_alertmanager_run_id(project_id, provider_event_id)
@@ -581,6 +729,7 @@ async def record_alertmanager_event(
channel_chat_id=f"alertmanager:{namespace or 'default'}",
content_type="text",
raw_content=content,
source_envelope=source_envelope,
provider_ts=_db_timestamp_now(),
run_id=run_id,
is_duplicate=is_duplicate,
@@ -608,6 +757,129 @@ async def record_alertmanager_event(
return None
async def record_external_alert_event(
*,
project_id: str,
provider: str,
event_id: str,
stage: str,
title: str,
severity: str,
namespace: str | None = None,
target_resource: str | None = None,
fingerprint: str | None = None,
incident_id: str | None = None,
approval_id: str | None = None,
source_url: str | None = None,
labels: dict[str, Any] | None = None,
annotations: dict[str, Any] | None = None,
payload: dict[str, Any] | None = None,
is_duplicate: bool = False,
) -> UUID | None:
"""
將 Sentry / SignOz 等非 Alertmanager 告警鏡像到 conversation_event。
這是 truth-chain 的最低共用入口:只寫 redacted content + source_envelope
不改變原本 webhook 的通知、審批或自動化行為。
"""
provider_name = str(provider or "external").strip().lower() or "external"
provider_event_id = build_external_alert_provider_event_id(provider_name, event_id, stage)
content = "\n".join([
f"{provider_name} inbound {stage}",
f"Event ID: {event_id}",
f"Title: {title}",
f"Severity: {severity}",
f"Namespace: {namespace or '-'}",
f"Target: {target_resource or '-'}",
f"Fingerprint: {fingerprint or '-'}",
f"Incident: {incident_id or '-'}",
f"Approval: {approval_id or '-'}",
f"Source URL: {source_url or '-'}",
])
source_envelope = build_inbound_source_envelope(
provider=provider_name,
stage=stage,
provider_event_id=provider_event_id,
raw_event_id=event_id,
raw_content=content,
alertname=title,
severity=severity,
namespace=namespace,
target_resource=target_resource,
fingerprint=fingerprint,
incident_id=str(incident_id) if incident_id else None,
approval_id=str(approval_id) if approval_id else None,
source_url=source_url,
labels=labels,
annotations=annotations,
extra={
"payload": payload or {},
},
)
try:
from src.db.base import get_db_context
async with get_db_context(project_id) as db:
run_id = build_external_alert_run_id(project_id, provider_event_id)
await ensure_completed_shadow_run(
db,
project_id=project_id,
run_id=run_id,
agent_id=f"legacy-{provider_name}-webhook",
trigger_type=f"{provider_name}_inbound",
trigger_ref=provider_event_id,
input_payload={
"provider": provider_name,
"event_id": event_id,
"stage": stage,
"severity": severity,
"namespace": namespace,
"target_resource": target_resource,
"fingerprint": fingerprint,
"incident_id": str(incident_id) if incident_id else None,
"approval_id": str(approval_id) if approval_id else None,
},
)
event_uuid = await mirror_inbound_event(
db,
project_id=project_id,
channel_type="internal",
provider_event_id=provider_event_id,
platform_subject_id=provider_name,
channel_user_id=provider_name,
channel_chat_id=f"{provider_name}:{namespace or 'default'}",
content_type="text",
raw_content=content,
source_envelope=source_envelope,
provider_ts=_db_timestamp_now(),
run_id=run_id,
is_duplicate=is_duplicate,
)
logger.info(
"external_alert_event_recorded",
project_id=project_id,
provider=provider_name,
event_id=event_id,
stage=stage,
conversation_event_id=str(event_uuid),
incident_id=str(incident_id) if incident_id else None,
approval_id=str(approval_id) if approval_id else None,
)
return event_uuid
except Exception as exc:
logger.warning(
"external_alert_event_record_failed",
project_id=project_id,
provider=provider_name,
event_id=event_id,
stage=stage,
error=str(exc),
)
return None
# ─────────────────────────────────────────────────────────────────────────────
# 出站訊息記錄
# ─────────────────────────────────────────────────────────────────────────────

View File

@@ -59,6 +59,17 @@ def test_fetch_truth_chain_can_match_inbound_provider_event_id() -> None:
assert "provider_event_id = :source_id" in source
def test_fetch_truth_chain_returns_inbound_redacted_envelope_fields() -> None:
source = inspect.getsource(fetch_truth_chain)
assert "content_redacted" in source
assert "source_envelope" in source
assert "source_refs,event_ids" in source
assert "source_refs,incident_ids" in source
assert "source_refs,sentry_issue_ids" in source
assert "source_refs,signoz_alerts" in source
def test_truth_status_marks_no_action_approval_as_manual_required() -> None:
status = _truth_status(
incident={"incident_id": "INC-1", "status": "INVESTIGATING"},

View File

@@ -1,15 +1,20 @@
from __future__ import annotations
import json
from src.services.channel_hub import (
_db_timestamp_now,
build_alertmanager_provider_event_id,
build_alertmanager_run_id,
build_external_alert_provider_event_id,
build_grouped_alert_provider_event_id,
build_grouped_alert_run_id,
build_inbound_source_envelope,
ensure_completed_shadow_run,
format_alertmanager_event_content,
format_grouped_alert_digest_text,
format_grouped_alert_event_content,
mirror_inbound_event,
record_outbound_message,
)
@@ -68,6 +73,46 @@ def test_build_alertmanager_run_id_is_stable() -> None:
)
def test_build_external_alert_provider_event_id_is_stable() -> None:
event_id = build_external_alert_provider_event_id(
"Sentry",
"12345678901234567890",
"approval_linked",
)
assert event_id == "sentry:approval_linked:12345678901234567890"
assert len(event_id) < 256
def test_build_inbound_source_envelope_redacts_and_keeps_refs() -> None:
envelope = build_inbound_source_envelope(
provider="alertmanager",
stage="received",
provider_event_id="alertmanager:received:alert-1:fp-1",
raw_event_id="alert-1",
raw_content="ACTION REQUIRED INC-20260513-ABCDEF token 1234567890:abcdefghijklmnopqrstuvwxyzABCDEFGH",
alertname="DockerContainerUnhealthy",
severity="warning",
namespace="default",
target_resource="bitan-pharmacy-bitan-1",
fingerprint="fp-1",
incident_id="INC-20260513-ABCDEF",
approval_id="approval-1",
labels={"token": "should-not-leak", "container": "bitan-pharmacy-bitan-1"},
)
rendered = json.dumps(envelope, ensure_ascii=False)
assert envelope["schema_version"] == "inbound_source_envelope_v1"
assert envelope["source_refs"]["event_ids"] == ["alert-1"]
assert envelope["source_refs"]["incident_ids"] == ["INC-20260513-ABCDEF"]
assert envelope["source_refs"]["approval_ids"] == ["approval-1"]
assert envelope["source_refs"]["alert_ids"] == ["alert-1", "alertmanager:received:alert-1:fp-1"]
assert envelope["source_refs"]["fingerprints"] == ["fp-1"]
assert len(envelope["content_sha256"]) == 64
assert "1234567890:" not in rendered
assert "should-not-leak" not in rendered
def test_format_alertmanager_event_content_keeps_incident_first() -> None:
content = format_alertmanager_event_content(
stage="incident_linked",
@@ -168,6 +213,67 @@ async def test_completed_shadow_run_sets_run_state_not_null_defaults() -> None:
assert session.params["run_id"] == run_id
async def test_mirror_inbound_event_writes_redacted_replay_fields() -> None:
session = _FakeSession()
envelope = build_inbound_source_envelope(
provider="sentry",
stage="received",
provider_event_id="sentry:received:123",
raw_event_id="123",
raw_content="Sentry token 1234567890:abcdefghijklmnopqrstuvwxyzABCDEFGH",
alertname="Sentry issue",
severity="error",
namespace="sentry",
target_resource="frontend",
)
await mirror_inbound_event(
session, # type: ignore[arg-type]
project_id="awoooi",
channel_type="internal",
provider_event_id="sentry:received:123",
platform_subject_id="sentry",
raw_content="Sentry token 1234567890:abcdefghijklmnopqrstuvwxyzABCDEFGH",
source_envelope=envelope,
)
assert "content_redacted" in session.statement
assert "source_envelope" in session.statement
assert session.params["redaction_version"] == "audit_sink_v1"
assert len(json.loads(session.params["source_envelope"])["content_sha256"]) == 64
assert "1234567890:" not in session.params["content_redacted"]
assert "1234567890:" not in session.params["source_envelope"]
def test_sentry_and_signoz_source_refs_keep_raw_event_ids() -> None:
sentry_envelope = build_inbound_source_envelope(
provider="sentry",
stage="received",
provider_event_id="sentry:received:issue-123",
raw_event_id="issue-123",
raw_content="Sentry issue",
alertname="Sentry issue",
fingerprint="sentry-issue-123",
)
signoz_envelope = build_inbound_source_envelope(
provider="signoz",
stage="received",
provider_event_id="signoz:received:fp-456",
raw_event_id="fp-456",
raw_content="SignOz alert",
alertname="HighLatency",
fingerprint="fp-456",
)
assert sentry_envelope["source_refs"]["event_ids"] == ["issue-123"]
assert sentry_envelope["source_refs"]["sentry_issue_ids"] == [
"issue-123",
"sentry:received:issue-123",
]
assert signoz_envelope["source_refs"]["event_ids"] == ["fp-456"]
assert signoz_envelope["source_refs"]["signoz_alerts"] == ["HighLatency", "fp-456"]
async def test_record_outbound_message_sets_sent_at_for_sent_messages() -> None:
session = _FakeSession()
run_id = build_grouped_alert_run_id("awoooi", "telegram-message-13152")