diff --git a/apps/api/src/api/v1/webhooks.py b/apps/api/src/api/v1/webhooks.py index e8107db5..795dbbe6 100644 --- a/apps/api/src/api/v1/webhooks.py +++ b/apps/api/src/api/v1/webhooks.py @@ -55,7 +55,10 @@ from src.services.alertmanager_llm_guard import ( from src.services.approval_db import get_approval_service from src.services.auto_approve import get_auto_approve_policy from src.services.auto_repair_service import AutoRepairService -from src.services.channel_hub import record_grouped_alert_event +from src.services.channel_hub import ( + record_alertmanager_event, + record_grouped_alert_event, +) # Phase 15.2: Trace Context (moved to SignalProducerService) # get_trace_context 已移至 Service 層 @@ -1509,6 +1512,11 @@ async def _process_new_alert_background( try: service = get_approval_service() openclaw = get_openclaw() + traced_alert_labels = { + **(alert_labels or {}), + "fingerprint": fingerprint, + "alert_id": alert_id, + } rule_response = match_rule(alert_context) should_bypass_llm = _should_use_alertmanager_rule_first(rule_response, alert_category) @@ -1703,7 +1711,7 @@ async def _process_new_alert_background( message=message, source="alertmanager", alertname=alertname, - alert_labels=alert_labels, + alert_labels=traced_alert_labels, notification_type=notification_type, alert_category=alert_category, ) @@ -1719,6 +1727,22 @@ async def _process_new_alert_background( error=str(_meta_err), ) + await record_alertmanager_event( + project_id="awoooi", + alert_id=alert_id, + alertname=alertname, + severity=severity, + namespace=namespace, + target_resource=target_resource, + fingerprint=fingerprint, + stage="incident_linked", + notification_type=notification_type, + alert_category=alert_category, + incident_id=incident_id, + approval_id=str(approval.id), + repeat_count=1, + ) + if _cs2_auto_approval is not None and _cs2_exec_success is not None: try: _cs2_auto_approval.incident_id = incident_id @@ -1963,7 +1987,7 @@ async def _process_new_alert_background( message=message, source="alertmanager", alertname=alertname, - alert_labels=alert_labels, + alert_labels=traced_alert_labels, notification_type=notification_type, alert_category=alert_category, ) @@ -1979,6 +2003,22 @@ async def _process_new_alert_background( error=str(_meta_err), ) + await record_alertmanager_event( + project_id="awoooi", + alert_id=alert_id, + alertname=alertname, + severity=severity, + namespace=namespace, + target_resource=target_resource, + fingerprint=fingerprint, + stage="incident_linked", + notification_type=notification_type, + alert_category=alert_category, + incident_id=incident_id, + approval_id=str(approval.id), + repeat_count=1, + ) + if _cs3_auto_approval is not None and _cs3_exec_success is not None: try: _cs3_auto_approval.incident_id = incident_id @@ -2127,7 +2167,7 @@ async def _process_new_alert_background( message=message, source="alertmanager", alertname=alertname, - alert_labels=alert_labels, + alert_labels=traced_alert_labels, notification_type=notification_type, alert_category=alert_category, ) @@ -2143,6 +2183,22 @@ async def _process_new_alert_background( error=str(_meta_err), ) + await record_alertmanager_event( + project_id="awoooi", + alert_id=alert_id, + alertname=alertname, + severity=severity, + namespace=namespace, + target_resource=target_resource, + fingerprint=fingerprint, + stage="incident_linked", + notification_type=notification_type, + alert_category=alert_category, + incident_id=fallback_incident_id, + approval_id=str(approval.id), + repeat_count=1, + ) + await _push_to_telegram_background( approval_id=str(approval.id), risk_level="medium", @@ -2389,6 +2445,19 @@ async def alertmanager_webhook( target=target_resource, fingerprint=fingerprint, ) + background_tasks.add_task( + record_alertmanager_event, + project_id="awoooi", + alert_id=alert_id, + alertname=alertname, + severity=severity, + namespace=namespace, + target_resource=target_resource, + fingerprint=fingerprint, + stage="received", + notification_type=notification_type, + alert_category=alert_category, + ) # ========================================================================== # ADR-076: 告警聚合引擎 — 5 分鐘滑動視窗,防止告警風暴 @@ -2471,6 +2540,23 @@ async def alertmanager_webhook( hit_count=updated_approval.hit_count, reason="Converged alert - Telegram already sent for this fingerprint", ) + background_tasks.add_task( + record_alertmanager_event, + project_id="awoooi", + alert_id=alert_id, + alertname=alertname, + severity=severity, + namespace=namespace, + target_resource=target_resource, + fingerprint=fingerprint, + stage="converged", + notification_type=notification_type, + alert_category=alert_category, + incident_id=getattr(updated_approval, "incident_id", None), + approval_id=str(updated_approval.id), + repeat_count=updated_approval.hit_count, + is_duplicate=True, + ) return AlertResponse( success=True, @@ -2498,10 +2584,24 @@ async def alertmanager_webhook( message=message, source="alertmanager", alertname=alertname, - alert_labels=alert.labels, + alert_labels={**alert.labels, "fingerprint": fingerprint, "alert_id": alert_id}, notification_type="TYPE-1", alert_category=alert_category, ) + background_tasks.add_task( + record_alertmanager_event, + project_id="awoooi", + alert_id=alert_id, + alertname=alertname, + severity=severity, + namespace=namespace, + target_resource=target_resource, + fingerprint=fingerprint, + stage="incident_linked", + notification_type="TYPE-1", + alert_category=alert_category, + incident_id=_info_incident_id, + ) # 2026-04-15 ogt: TYPE-1 純資訊告警建立後立即關閉 # 設計原則: backup/heartbeat/info 告警無需追蹤狀態,通知即完成 # 防止 incidents 表無限累積 INVESTIGATING 記錄(ADR-073 漏洞修補) @@ -2533,6 +2633,20 @@ async def alertmanager_webhook( fingerprint=fingerprint, ttl_seconds=ALERTMANAGER_LLM_INFLIGHT_LOCK_TTL_SECONDS, ) + background_tasks.add_task( + record_alertmanager_event, + project_id="awoooi", + alert_id=alert_id, + alertname=alertname, + severity=severity, + namespace=namespace, + target_resource=target_resource, + fingerprint=fingerprint, + stage="llm_inflight_suppressed", + notification_type=notification_type, + alert_category=alert_category, + is_duplicate=True, + ) return AlertResponse( success=True, message="🛡️ 告警已由同指紋背景 AI 分析處理中,跳過重複 LLM 呼叫", diff --git a/apps/api/src/services/awooop_truth_chain_service.py b/apps/api/src/services/awooop_truth_chain_service.py index f296ce87..dfe733de 100644 --- a/apps/api/src/services/awooop_truth_chain_service.py +++ b/apps/api/src/services/awooop_truth_chain_service.py @@ -101,6 +101,29 @@ def _auto_repair_ids(auto_repair_executions: list[dict[str, Any]]) -> list[str]: return [str(row["id"]) for row in auto_repair_executions if row.get("id")] +def _incident_fingerprints(incident: dict[str, Any] | None) -> list[str]: + """Extract durable alert fingerprints from incident signals, when present.""" + if not incident: + return [] + signals = incident.get("signals") + if not isinstance(signals, list): + return [] + + fingerprints: set[str] = set() + for signal in signals: + if not isinstance(signal, dict): + continue + direct = signal.get("fingerprint") + if direct: + fingerprints.add(str(direct)) + labels = signal.get("labels") + if isinstance(labels, dict): + value = labels.get("fingerprint") + if value: + fingerprints.add(str(value)) + return sorted(fingerprints) + + def _looks_like_no_action(value: Any) -> bool: text = str(value or "").upper() return ( @@ -837,6 +860,7 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ resolved_at, verification_result, frequency_snapshot, + signals, decision_chain FROM incidents WHERE incident_id = :source_id @@ -901,6 +925,9 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ automation_ops: list[dict[str, Any]] = [] auto_repair_executions: list[dict[str, Any]] = [] km_entries: list[dict[str, Any]] = [] + inbound_rows: list[dict[str, Any]] = [] + gateway_mcp_rows: list[dict[str, Any]] = [] + outbound_rows: list[dict[str, Any]] = [] if incident is not None: incident_id = str(incident["incident_id"]) @@ -1144,6 +1171,55 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ """, {"source_id": source_id, "project_id": project_id, "limit": _MAX_ROWS}, ) + incident_fingerprints = _incident_fingerprints(incident) + fingerprint_needle = ( + f"%{incident_fingerprints[0]}%" + if incident_fingerprints + else "" + ) + inbound_rows = await _fetch_all( + db, + """ + SELECT + event_id, + project_id, + channel_type, + provider_event_id, + platform_subject_id, + channel_user_id, + channel_chat_id, + run_id, + content_type, + content_hash, + content_preview, + attachment_sha256, + is_duplicate, + provider_ts, + received_at + FROM awooop_conversation_event + WHERE project_id = :project_id + AND ( + run_id::text = :source_id + OR content_preview ILIKE :source_needle + OR ( + :fingerprint_needle != '' + AND ( + provider_event_id ILIKE :fingerprint_needle + OR content_preview ILIKE :fingerprint_needle + ) + ) + ) + ORDER BY received_at DESC + LIMIT :limit + """, + { + "source_id": source_id, + "project_id": project_id, + "source_needle": f"%{source_id}%", + "fingerprint_needle": fingerprint_needle, + "limit": _MAX_ROWS, + }, + ) outbound_rows = await _fetch_all( db, """ @@ -1277,11 +1353,14 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[ }, "timeline_events": timeline_events, "channel": { + "inbound_events_visible": len(inbound_rows), + "inbound_events": inbound_rows, "outbound_messages_visible": len(outbound_rows), "outbound_messages": outbound_rows, "visibility_note": ( - "If this is zero while Telegram delivered a card, the outbound mirror " - "or RLS project context is not a reliable source of truth yet." + "If inbound is zero while Alertmanager fired, or outbound is zero while " + "Telegram delivered a card, channel mirrors or RLS project context are " + "not reliable sources of truth yet." ), }, } diff --git a/apps/api/src/services/channel_hub.py b/apps/api/src/services/channel_hub.py index 5b8e7c65..68a46743 100644 --- a/apps/api/src/services/channel_hub.py +++ b/apps/api/src/services/channel_hub.py @@ -122,6 +122,19 @@ def build_grouped_alert_run_id(project_id: str, provider_event_id: str) -> UUID: return uuid5(NAMESPACE_URL, f"awooop:grouped-alert:{project_id}:{provider_event_id}") +def build_alertmanager_provider_event_id(alert_id: str, fingerprint: str, stage: str) -> str: + """建立 Alertmanager inbound event 的冪等 provider_event_id。""" + safe_alert_id = str(alert_id).strip() or "unknown" + safe_fingerprint = str(fingerprint).strip()[:32] or "no-fingerprint" + safe_stage = str(stage).strip()[:32] or "received" + return f"alertmanager:{safe_stage}:{safe_alert_id}:{safe_fingerprint}" + + +def build_alertmanager_run_id(project_id: str, provider_event_id: str) -> UUID: + """為 Alertmanager inbound mirror 建立穩定 shadow run_id。""" + return uuid5(NAMESPACE_URL, f"awooop:alertmanager:{project_id}:{provider_event_id}") + + # ───────────────────────────────────────────────────────────────────────────── # 入站事件記錄 # ───────────────────────────────────────────────────────────────────────────── @@ -210,6 +223,41 @@ def build_grouped_alert_provider_event_id(alert_id: str, fingerprint: str) -> st return f"alert-group:{safe_alert_id}:{safe_fingerprint}" +def format_alertmanager_event_content( + *, + stage: str, + alert_id: str, + alertname: str, + severity: str, + namespace: str, + target_resource: str, + fingerprint: str, + notification_type: str | None = None, + alert_category: str | None = None, + incident_id: str | None = None, + approval_id: str | None = None, + repeat_count: int | None = None, +) -> str: + """格式化 Alertmanager inbound mirror 摘要,讓 truth-chain 可回查。""" + head = f"Incident: {incident_id}" if incident_id else f"Fingerprint: {fingerprint}" + return "\n".join( + [ + f"Alertmanager inbound {stage}", + head, + f"Alert ID: {alert_id}", + f"Approval: {approval_id or '-'}", + f"Alert: {alertname}", + f"Severity: {severity}", + f"Namespace: {namespace or 'default'}", + f"Target: {target_resource or '-'}", + f"Fingerprint: {fingerprint}", + f"Notification Type: {notification_type or '-'}", + f"Alert Category: {alert_category or '-'}", + f"Repeat Count: {repeat_count if repeat_count is not None else '-'}", + ] + ) + + def format_grouped_alert_event_content( *, alert_id: str, @@ -445,6 +493,116 @@ async def record_grouped_alert_event( return None +async def record_alertmanager_event( + *, + project_id: str, + alert_id: str, + alertname: str, + severity: str, + namespace: str, + target_resource: str, + fingerprint: str, + stage: str, + notification_type: str | None = None, + alert_category: str | None = None, + incident_id: str | None = None, + approval_id: str | None = None, + repeat_count: int | None = None, + is_duplicate: bool = False, +) -> UUID | None: + """ + 將 Alertmanager inbound alert 鏡像到 AwoooP conversation_event。 + + Telegram 不應是唯一事實來源;每個 firing alert 至少要有 received + event,建立 incident/approval 後再補 incident_linked event 供 truth-chain + 依 incident_id 回查。DB 失敗 fail-open,不影響 Alertmanager ACK。 + """ + try: + from src.db.base import get_db_context + + incident_ref = str(incident_id) if incident_id else None + approval_ref = str(approval_id) if approval_id else None + provider_event_id = build_alertmanager_provider_event_id( + alert_id=alert_id, + fingerprint=fingerprint, + stage=stage, + ) + content = format_alertmanager_event_content( + stage=stage, + alert_id=alert_id, + alertname=alertname, + severity=severity, + namespace=namespace, + target_resource=target_resource, + fingerprint=fingerprint, + notification_type=notification_type, + alert_category=alert_category, + incident_id=incident_ref, + approval_id=approval_ref, + repeat_count=repeat_count, + ) + + async with get_db_context(project_id) as db: + run_id = build_alertmanager_run_id(project_id, provider_event_id) + await ensure_completed_shadow_run( + db, + project_id=project_id, + run_id=run_id, + agent_id="legacy-alertmanager-webhook", + trigger_type="alertmanager_inbound", + trigger_ref=provider_event_id, + input_payload={ + "stage": stage, + "alert_id": alert_id, + "alertname": alertname, + "severity": severity, + "namespace": namespace, + "target_resource": target_resource, + "fingerprint": fingerprint, + "notification_type": notification_type, + "alert_category": alert_category, + "incident_id": incident_ref, + "approval_id": approval_ref, + "repeat_count": repeat_count, + }, + ) + event_id = await mirror_inbound_event( + db, + project_id=project_id, + channel_type="internal", + provider_event_id=provider_event_id, + platform_subject_id="alertmanager", + channel_user_id="alertmanager", + channel_chat_id=f"alertmanager:{namespace or 'default'}", + content_type="text", + raw_content=content, + provider_ts=datetime.now(UTC), + run_id=run_id, + is_duplicate=is_duplicate, + ) + + logger.info( + "alertmanager_event_recorded", + project_id=project_id, + alert_id=alert_id, + event_id=str(event_id), + stage=stage, + incident_id=incident_ref, + fingerprint=fingerprint, + ) + return event_id + except Exception as exc: + logger.warning( + "alertmanager_event_record_failed", + project_id=project_id, + alert_id=alert_id, + stage=stage, + fingerprint=fingerprint, + error=str(exc), + ) + return None + + # ───────────────────────────────────────────────────────────────────────────── # 出站訊息記錄 # ───────────────────────────────────────────────────────────────────────────── diff --git a/apps/api/tests/test_awooop_truth_chain_service.py b/apps/api/tests/test_awooop_truth_chain_service.py index 53f5c3c0..48315a50 100644 --- a/apps/api/tests/test_awooop_truth_chain_service.py +++ b/apps/api/tests/test_awooop_truth_chain_service.py @@ -12,6 +12,7 @@ from src.services.awooop_truth_chain_service import ( build_automation_quality, build_incident_reconciliation, _clean_row, + _incident_fingerprints, _summarize_gateway_mcp, _truth_status, summarize_automation_quality_records, @@ -37,6 +38,19 @@ def test_clean_row_parses_json_text_fields_for_gateway_visibility() -> None: assert cleaned["plain_text"] == '{"not":"parsed"}' +def test_incident_fingerprints_reads_signal_labels() -> None: + fingerprints = _incident_fingerprints({ + "incident_id": "INC-1", + "signals": [ + {"labels": {"fingerprint": "fp-label"}}, + {"fingerprint": "fp-direct", "labels": {}}, + {"labels": {"fingerprint": "fp-label"}}, + ], + }) + + assert fingerprints == ["fp-direct", "fp-label"] + + def test_truth_status_marks_no_action_approval_as_manual_required() -> None: status = _truth_status( incident={"incident_id": "INC-1", "status": "INVESTIGATING"}, diff --git a/apps/api/tests/test_channel_hub_grouped_alert_events.py b/apps/api/tests/test_channel_hub_grouped_alert_events.py index 02f08ea4..e6e70ab4 100644 --- a/apps/api/tests/test_channel_hub_grouped_alert_events.py +++ b/apps/api/tests/test_channel_hub_grouped_alert_events.py @@ -1,9 +1,12 @@ from __future__ import annotations from src.services.channel_hub import ( + build_alertmanager_provider_event_id, + build_alertmanager_run_id, build_grouped_alert_provider_event_id, build_grouped_alert_run_id, ensure_completed_shadow_run, + format_alertmanager_event_content, format_grouped_alert_digest_text, format_grouped_alert_event_content, record_outbound_message, @@ -34,6 +37,54 @@ def test_build_grouped_alert_run_id_is_stable() -> None: assert first != other_project +def test_build_alertmanager_provider_event_id_keeps_fingerprint() -> None: + event_id = build_alertmanager_provider_event_id( + "alert-20260513213000", + "abcdef1234567890" * 4, + "incident_linked", + ) + + assert event_id == "alertmanager:incident_linked:alert-20260513213000:abcdef1234567890abcdef1234567890" + assert len(event_id) < 256 + + +def test_build_alertmanager_run_id_is_stable() -> None: + provider_event_id = build_alertmanager_provider_event_id( + "alert-20260513213000", + "abcdef1234567890" * 4, + "received", + ) + + assert build_alertmanager_run_id("awoooi", provider_event_id) == build_alertmanager_run_id( + "awoooi", provider_event_id + ) + assert build_alertmanager_run_id("awoooi", provider_event_id) != build_alertmanager_run_id( + "ewoooc", provider_event_id + ) + + +def test_format_alertmanager_event_content_keeps_incident_first() -> None: + content = format_alertmanager_event_content( + stage="incident_linked", + alert_id="alert-20260513213000", + alertname="DockerContainerExited", + severity="warning", + namespace="default", + target_resource="bitan-pharmacy-bitan-1", + fingerprint="fp-123", + notification_type="TYPE-3", + alert_category="host_resource", + incident_id="INC-20260513-ABCDEF", + approval_id="approval-1", + repeat_count=3, + ) + + assert content.startswith("Alertmanager inbound incident_linked\nIncident: INC-20260513-ABCDEF") + assert "Approval: approval-1" in content + assert "Fingerprint: fp-123" in content + assert "Repeat Count: 3" in content + + def test_format_grouped_alert_event_content_keeps_operator_context() -> None: content = format_grouped_alert_event_content( alert_id="INC-20260507-ABCD12",