feat(awooop): mirror alertmanager events into truth chain
All checks were successful
Code Review / ai-code-review (push) Successful in 19s
CD Pipeline / tests (push) Successful in 2m10s
CD Pipeline / build-and-deploy (push) Successful in 3m22s
CD Pipeline / post-deploy-checks (push) Successful in 1m17s

This commit is contained in:
Your Name
2026-05-13 20:16:42 +08:00
parent 21042ad0e7
commit c2d01eb6f1
5 changed files with 423 additions and 7 deletions

View File

@@ -55,7 +55,10 @@ from src.services.alertmanager_llm_guard import (
from src.services.approval_db import get_approval_service
from src.services.auto_approve import get_auto_approve_policy
from src.services.auto_repair_service import AutoRepairService
from src.services.channel_hub import record_grouped_alert_event
from src.services.channel_hub import (
record_alertmanager_event,
record_grouped_alert_event,
)
# Phase 15.2: Trace Context (moved to SignalProducerService)
# get_trace_context 已移至 Service 層
@@ -1509,6 +1512,11 @@ async def _process_new_alert_background(
try:
service = get_approval_service()
openclaw = get_openclaw()
traced_alert_labels = {
**(alert_labels or {}),
"fingerprint": fingerprint,
"alert_id": alert_id,
}
rule_response = match_rule(alert_context)
should_bypass_llm = _should_use_alertmanager_rule_first(rule_response, alert_category)
@@ -1703,7 +1711,7 @@ async def _process_new_alert_background(
message=message,
source="alertmanager",
alertname=alertname,
alert_labels=alert_labels,
alert_labels=traced_alert_labels,
notification_type=notification_type,
alert_category=alert_category,
)
@@ -1719,6 +1727,22 @@ async def _process_new_alert_background(
error=str(_meta_err),
)
await record_alertmanager_event(
project_id="awoooi",
alert_id=alert_id,
alertname=alertname,
severity=severity,
namespace=namespace,
target_resource=target_resource,
fingerprint=fingerprint,
stage="incident_linked",
notification_type=notification_type,
alert_category=alert_category,
incident_id=incident_id,
approval_id=str(approval.id),
repeat_count=1,
)
if _cs2_auto_approval is not None and _cs2_exec_success is not None:
try:
_cs2_auto_approval.incident_id = incident_id
@@ -1963,7 +1987,7 @@ async def _process_new_alert_background(
message=message,
source="alertmanager",
alertname=alertname,
alert_labels=alert_labels,
alert_labels=traced_alert_labels,
notification_type=notification_type,
alert_category=alert_category,
)
@@ -1979,6 +2003,22 @@ async def _process_new_alert_background(
error=str(_meta_err),
)
await record_alertmanager_event(
project_id="awoooi",
alert_id=alert_id,
alertname=alertname,
severity=severity,
namespace=namespace,
target_resource=target_resource,
fingerprint=fingerprint,
stage="incident_linked",
notification_type=notification_type,
alert_category=alert_category,
incident_id=incident_id,
approval_id=str(approval.id),
repeat_count=1,
)
if _cs3_auto_approval is not None and _cs3_exec_success is not None:
try:
_cs3_auto_approval.incident_id = incident_id
@@ -2127,7 +2167,7 @@ async def _process_new_alert_background(
message=message,
source="alertmanager",
alertname=alertname,
alert_labels=alert_labels,
alert_labels=traced_alert_labels,
notification_type=notification_type,
alert_category=alert_category,
)
@@ -2143,6 +2183,22 @@ async def _process_new_alert_background(
error=str(_meta_err),
)
await record_alertmanager_event(
project_id="awoooi",
alert_id=alert_id,
alertname=alertname,
severity=severity,
namespace=namespace,
target_resource=target_resource,
fingerprint=fingerprint,
stage="incident_linked",
notification_type=notification_type,
alert_category=alert_category,
incident_id=fallback_incident_id,
approval_id=str(approval.id),
repeat_count=1,
)
await _push_to_telegram_background(
approval_id=str(approval.id),
risk_level="medium",
@@ -2389,6 +2445,19 @@ async def alertmanager_webhook(
target=target_resource,
fingerprint=fingerprint,
)
background_tasks.add_task(
record_alertmanager_event,
project_id="awoooi",
alert_id=alert_id,
alertname=alertname,
severity=severity,
namespace=namespace,
target_resource=target_resource,
fingerprint=fingerprint,
stage="received",
notification_type=notification_type,
alert_category=alert_category,
)
# ==========================================================================
# ADR-076: 告警聚合引擎 — 5 分鐘滑動視窗,防止告警風暴
@@ -2471,6 +2540,23 @@ async def alertmanager_webhook(
hit_count=updated_approval.hit_count,
reason="Converged alert - Telegram already sent for this fingerprint",
)
background_tasks.add_task(
record_alertmanager_event,
project_id="awoooi",
alert_id=alert_id,
alertname=alertname,
severity=severity,
namespace=namespace,
target_resource=target_resource,
fingerprint=fingerprint,
stage="converged",
notification_type=notification_type,
alert_category=alert_category,
incident_id=getattr(updated_approval, "incident_id", None),
approval_id=str(updated_approval.id),
repeat_count=updated_approval.hit_count,
is_duplicate=True,
)
return AlertResponse(
success=True,
@@ -2498,10 +2584,24 @@ async def alertmanager_webhook(
message=message,
source="alertmanager",
alertname=alertname,
alert_labels=alert.labels,
alert_labels={**alert.labels, "fingerprint": fingerprint, "alert_id": alert_id},
notification_type="TYPE-1",
alert_category=alert_category,
)
background_tasks.add_task(
record_alertmanager_event,
project_id="awoooi",
alert_id=alert_id,
alertname=alertname,
severity=severity,
namespace=namespace,
target_resource=target_resource,
fingerprint=fingerprint,
stage="incident_linked",
notification_type="TYPE-1",
alert_category=alert_category,
incident_id=_info_incident_id,
)
# 2026-04-15 ogt: TYPE-1 純資訊告警建立後立即關閉
# 設計原則: backup/heartbeat/info 告警無需追蹤狀態,通知即完成
# 防止 incidents 表無限累積 INVESTIGATING 記錄ADR-073 漏洞修補)
@@ -2533,6 +2633,20 @@ async def alertmanager_webhook(
fingerprint=fingerprint,
ttl_seconds=ALERTMANAGER_LLM_INFLIGHT_LOCK_TTL_SECONDS,
)
background_tasks.add_task(
record_alertmanager_event,
project_id="awoooi",
alert_id=alert_id,
alertname=alertname,
severity=severity,
namespace=namespace,
target_resource=target_resource,
fingerprint=fingerprint,
stage="llm_inflight_suppressed",
notification_type=notification_type,
alert_category=alert_category,
is_duplicate=True,
)
return AlertResponse(
success=True,
message="🛡️ 告警已由同指紋背景 AI 分析處理中,跳過重複 LLM 呼叫",

View File

@@ -101,6 +101,29 @@ def _auto_repair_ids(auto_repair_executions: list[dict[str, Any]]) -> list[str]:
return [str(row["id"]) for row in auto_repair_executions if row.get("id")]
def _incident_fingerprints(incident: dict[str, Any] | None) -> list[str]:
"""Extract durable alert fingerprints from incident signals, when present."""
if not incident:
return []
signals = incident.get("signals")
if not isinstance(signals, list):
return []
fingerprints: set[str] = set()
for signal in signals:
if not isinstance(signal, dict):
continue
direct = signal.get("fingerprint")
if direct:
fingerprints.add(str(direct))
labels = signal.get("labels")
if isinstance(labels, dict):
value = labels.get("fingerprint")
if value:
fingerprints.add(str(value))
return sorted(fingerprints)
def _looks_like_no_action(value: Any) -> bool:
text = str(value or "").upper()
return (
@@ -837,6 +860,7 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
resolved_at,
verification_result,
frequency_snapshot,
signals,
decision_chain
FROM incidents
WHERE incident_id = :source_id
@@ -901,6 +925,9 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
automation_ops: list[dict[str, Any]] = []
auto_repair_executions: list[dict[str, Any]] = []
km_entries: list[dict[str, Any]] = []
inbound_rows: list[dict[str, Any]] = []
gateway_mcp_rows: list[dict[str, Any]] = []
outbound_rows: list[dict[str, Any]] = []
if incident is not None:
incident_id = str(incident["incident_id"])
@@ -1144,6 +1171,55 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
""",
{"source_id": source_id, "project_id": project_id, "limit": _MAX_ROWS},
)
incident_fingerprints = _incident_fingerprints(incident)
fingerprint_needle = (
f"%{incident_fingerprints[0]}%"
if incident_fingerprints
else ""
)
inbound_rows = await _fetch_all(
db,
"""
SELECT
event_id,
project_id,
channel_type,
provider_event_id,
platform_subject_id,
channel_user_id,
channel_chat_id,
run_id,
content_type,
content_hash,
content_preview,
attachment_sha256,
is_duplicate,
provider_ts,
received_at
FROM awooop_conversation_event
WHERE project_id = :project_id
AND (
run_id::text = :source_id
OR content_preview ILIKE :source_needle
OR (
:fingerprint_needle != ''
AND (
provider_event_id ILIKE :fingerprint_needle
OR content_preview ILIKE :fingerprint_needle
)
)
)
ORDER BY received_at DESC
LIMIT :limit
""",
{
"source_id": source_id,
"project_id": project_id,
"source_needle": f"%{source_id}%",
"fingerprint_needle": fingerprint_needle,
"limit": _MAX_ROWS,
},
)
outbound_rows = await _fetch_all(
db,
"""
@@ -1277,11 +1353,14 @@ async def fetch_truth_chain(source_id: str, project_id: str = "awoooi") -> dict[
},
"timeline_events": timeline_events,
"channel": {
"inbound_events_visible": len(inbound_rows),
"inbound_events": inbound_rows,
"outbound_messages_visible": len(outbound_rows),
"outbound_messages": outbound_rows,
"visibility_note": (
"If this is zero while Telegram delivered a card, the outbound mirror "
"or RLS project context is not a reliable source of truth yet."
"If inbound is zero while Alertmanager fired, or outbound is zero while "
"Telegram delivered a card, channel mirrors or RLS project context are "
"not reliable sources of truth yet."
),
},
}

View File

@@ -122,6 +122,19 @@ def build_grouped_alert_run_id(project_id: str, provider_event_id: str) -> UUID:
return uuid5(NAMESPACE_URL, f"awooop:grouped-alert:{project_id}:{provider_event_id}")
def build_alertmanager_provider_event_id(alert_id: str, fingerprint: str, stage: str) -> str:
"""建立 Alertmanager inbound event 的冪等 provider_event_id。"""
safe_alert_id = str(alert_id).strip() or "unknown"
safe_fingerprint = str(fingerprint).strip()[:32] or "no-fingerprint"
safe_stage = str(stage).strip()[:32] or "received"
return f"alertmanager:{safe_stage}:{safe_alert_id}:{safe_fingerprint}"
def build_alertmanager_run_id(project_id: str, provider_event_id: str) -> UUID:
"""為 Alertmanager inbound mirror 建立穩定 shadow run_id。"""
return uuid5(NAMESPACE_URL, f"awooop:alertmanager:{project_id}:{provider_event_id}")
# ─────────────────────────────────────────────────────────────────────────────
# 入站事件記錄
# ─────────────────────────────────────────────────────────────────────────────
@@ -210,6 +223,41 @@ def build_grouped_alert_provider_event_id(alert_id: str, fingerprint: str) -> st
return f"alert-group:{safe_alert_id}:{safe_fingerprint}"
def format_alertmanager_event_content(
*,
stage: str,
alert_id: str,
alertname: str,
severity: str,
namespace: str,
target_resource: str,
fingerprint: str,
notification_type: str | None = None,
alert_category: str | None = None,
incident_id: str | None = None,
approval_id: str | None = None,
repeat_count: int | None = None,
) -> str:
"""格式化 Alertmanager inbound mirror 摘要,讓 truth-chain 可回查。"""
head = f"Incident: {incident_id}" if incident_id else f"Fingerprint: {fingerprint}"
return "\n".join(
[
f"Alertmanager inbound {stage}",
head,
f"Alert ID: {alert_id}",
f"Approval: {approval_id or '-'}",
f"Alert: {alertname}",
f"Severity: {severity}",
f"Namespace: {namespace or 'default'}",
f"Target: {target_resource or '-'}",
f"Fingerprint: {fingerprint}",
f"Notification Type: {notification_type or '-'}",
f"Alert Category: {alert_category or '-'}",
f"Repeat Count: {repeat_count if repeat_count is not None else '-'}",
]
)
def format_grouped_alert_event_content(
*,
alert_id: str,
@@ -445,6 +493,116 @@ async def record_grouped_alert_event(
return None
async def record_alertmanager_event(
*,
project_id: str,
alert_id: str,
alertname: str,
severity: str,
namespace: str,
target_resource: str,
fingerprint: str,
stage: str,
notification_type: str | None = None,
alert_category: str | None = None,
incident_id: str | None = None,
approval_id: str | None = None,
repeat_count: int | None = None,
is_duplicate: bool = False,
) -> UUID | None:
"""
將 Alertmanager inbound alert 鏡像到 AwoooP conversation_event。
Telegram 不應是唯一事實來源;每個 firing alert 至少要有 received
event建立 incident/approval 後再補 incident_linked event 供 truth-chain
依 incident_id 回查。DB 失敗 fail-open不影響 Alertmanager ACK。
"""
try:
from src.db.base import get_db_context
incident_ref = str(incident_id) if incident_id else None
approval_ref = str(approval_id) if approval_id else None
provider_event_id = build_alertmanager_provider_event_id(
alert_id=alert_id,
fingerprint=fingerprint,
stage=stage,
)
content = format_alertmanager_event_content(
stage=stage,
alert_id=alert_id,
alertname=alertname,
severity=severity,
namespace=namespace,
target_resource=target_resource,
fingerprint=fingerprint,
notification_type=notification_type,
alert_category=alert_category,
incident_id=incident_ref,
approval_id=approval_ref,
repeat_count=repeat_count,
)
async with get_db_context(project_id) as db:
run_id = build_alertmanager_run_id(project_id, provider_event_id)
await ensure_completed_shadow_run(
db,
project_id=project_id,
run_id=run_id,
agent_id="legacy-alertmanager-webhook",
trigger_type="alertmanager_inbound",
trigger_ref=provider_event_id,
input_payload={
"stage": stage,
"alert_id": alert_id,
"alertname": alertname,
"severity": severity,
"namespace": namespace,
"target_resource": target_resource,
"fingerprint": fingerprint,
"notification_type": notification_type,
"alert_category": alert_category,
"incident_id": incident_ref,
"approval_id": approval_ref,
"repeat_count": repeat_count,
},
)
event_id = await mirror_inbound_event(
db,
project_id=project_id,
channel_type="internal",
provider_event_id=provider_event_id,
platform_subject_id="alertmanager",
channel_user_id="alertmanager",
channel_chat_id=f"alertmanager:{namespace or 'default'}",
content_type="text",
raw_content=content,
provider_ts=datetime.now(UTC),
run_id=run_id,
is_duplicate=is_duplicate,
)
logger.info(
"alertmanager_event_recorded",
project_id=project_id,
alert_id=alert_id,
event_id=str(event_id),
stage=stage,
incident_id=incident_ref,
fingerprint=fingerprint,
)
return event_id
except Exception as exc:
logger.warning(
"alertmanager_event_record_failed",
project_id=project_id,
alert_id=alert_id,
stage=stage,
fingerprint=fingerprint,
error=str(exc),
)
return None
# ─────────────────────────────────────────────────────────────────────────────
# 出站訊息記錄
# ─────────────────────────────────────────────────────────────────────────────

View File

@@ -12,6 +12,7 @@ from src.services.awooop_truth_chain_service import (
build_automation_quality,
build_incident_reconciliation,
_clean_row,
_incident_fingerprints,
_summarize_gateway_mcp,
_truth_status,
summarize_automation_quality_records,
@@ -37,6 +38,19 @@ def test_clean_row_parses_json_text_fields_for_gateway_visibility() -> None:
assert cleaned["plain_text"] == '{"not":"parsed"}'
def test_incident_fingerprints_reads_signal_labels() -> None:
fingerprints = _incident_fingerprints({
"incident_id": "INC-1",
"signals": [
{"labels": {"fingerprint": "fp-label"}},
{"fingerprint": "fp-direct", "labels": {}},
{"labels": {"fingerprint": "fp-label"}},
],
})
assert fingerprints == ["fp-direct", "fp-label"]
def test_truth_status_marks_no_action_approval_as_manual_required() -> None:
status = _truth_status(
incident={"incident_id": "INC-1", "status": "INVESTIGATING"},

View File

@@ -1,9 +1,12 @@
from __future__ import annotations
from src.services.channel_hub import (
build_alertmanager_provider_event_id,
build_alertmanager_run_id,
build_grouped_alert_provider_event_id,
build_grouped_alert_run_id,
ensure_completed_shadow_run,
format_alertmanager_event_content,
format_grouped_alert_digest_text,
format_grouped_alert_event_content,
record_outbound_message,
@@ -34,6 +37,54 @@ def test_build_grouped_alert_run_id_is_stable() -> None:
assert first != other_project
def test_build_alertmanager_provider_event_id_keeps_fingerprint() -> None:
event_id = build_alertmanager_provider_event_id(
"alert-20260513213000",
"abcdef1234567890" * 4,
"incident_linked",
)
assert event_id == "alertmanager:incident_linked:alert-20260513213000:abcdef1234567890abcdef1234567890"
assert len(event_id) < 256
def test_build_alertmanager_run_id_is_stable() -> None:
provider_event_id = build_alertmanager_provider_event_id(
"alert-20260513213000",
"abcdef1234567890" * 4,
"received",
)
assert build_alertmanager_run_id("awoooi", provider_event_id) == build_alertmanager_run_id(
"awoooi", provider_event_id
)
assert build_alertmanager_run_id("awoooi", provider_event_id) != build_alertmanager_run_id(
"ewoooc", provider_event_id
)
def test_format_alertmanager_event_content_keeps_incident_first() -> None:
content = format_alertmanager_event_content(
stage="incident_linked",
alert_id="alert-20260513213000",
alertname="DockerContainerExited",
severity="warning",
namespace="default",
target_resource="bitan-pharmacy-bitan-1",
fingerprint="fp-123",
notification_type="TYPE-3",
alert_category="host_resource",
incident_id="INC-20260513-ABCDEF",
approval_id="approval-1",
repeat_count=3,
)
assert content.startswith("Alertmanager inbound incident_linked\nIncident: INC-20260513-ABCDEF")
assert "Approval: approval-1" in content
assert "Fingerprint: fp-123" in content
assert "Repeat Count: 3" in content
def test_format_grouped_alert_event_content_keeps_operator_context() -> None:
content = format_grouped_alert_event_content(
alert_id="INC-20260507-ABCD12",