diff --git a/apps/api/src/api/v1/webhooks.py b/apps/api/src/api/v1/webhooks.py index 1fe6bf81..87fd1a16 100644 --- a/apps/api/src/api/v1/webhooks.py +++ b/apps/api/src/api/v1/webhooks.py @@ -59,6 +59,9 @@ from src.services.channel_hub import ( record_alertmanager_event, record_grouped_alert_event, ) +from src.services.converged_alert_recurrence_notifier import ( + notify_converged_alert_recurrence, +) # Phase 15.2: Trace Context (moved to SignalProducerService) # get_trace_context 已移至 Service 層 @@ -1148,15 +1151,29 @@ async def receive_alert( # 避免 Telegram 洗版,用戶可在 UI 查看聚合次數 # ================================================================= logger.info( - "alert_converged_telegram_skipped", + "alert_converged_telegram_recurrence_scheduled", approval_id=str(updated_approval.id), hit_count=updated_approval.hit_count, - reason="Converged alert - Telegram already sent for this fingerprint", + reason="Converged alert - scheduling throttled recurrence notice", + ) + background_tasks.add_task( + notify_converged_alert_recurrence, + source=alert.source, + fingerprint=fingerprint, + alertname=alert.alert_type, + severity=alert.severity, + namespace=alert.namespace, + target_resource=alert.target_resource, + hit_count=updated_approval.hit_count, + incident_id=getattr(updated_approval, "incident_id", None), + approval_id=str(updated_approval.id), + alert_category=alert.alert_type, + notification_type="generic", ) return AlertResponse( success=True, - message=f"🛡️ 告警收斂 (x{updated_approval.hit_count}) - Telegram 已發送,跳過重複通知", + message=f"🛡️ 告警收斂 (x{updated_approval.hit_count}) - 已排程節流再通知", alert_id=alert_id, approval_created=False, # 未建立新卡片 approval_id=str(updated_approval.id), @@ -2693,10 +2710,10 @@ async def alertmanager_webhook( # 2026-03-27 ogt: 收斂告警不重複發送 Telegram,只更新 hit_count # 用戶可在 UI 查看聚合次數,避免 Telegram 洗版 logger.info( - "alertmanager_converged_telegram_skipped", + "alertmanager_converged_telegram_recurrence_scheduled", approval_id=str(updated_approval.id), hit_count=updated_approval.hit_count, - reason="Converged alert - Telegram already sent for this fingerprint", + reason="Converged alert - scheduling throttled recurrence notice", ) background_tasks.add_task( record_alertmanager_event, @@ -2718,10 +2735,24 @@ async def alertmanager_webhook( labels=dict(alert.labels) if alert.labels else {}, annotations=dict(alert.annotations) if alert.annotations else {}, ) + background_tasks.add_task( + notify_converged_alert_recurrence, + source="alertmanager", + fingerprint=fingerprint, + alertname=alertname, + severity=severity, + namespace=namespace, + target_resource=target_resource, + hit_count=updated_approval.hit_count, + incident_id=getattr(updated_approval, "incident_id", None), + approval_id=str(updated_approval.id), + alert_category=alert_category, + notification_type=notification_type, + ) return AlertResponse( success=True, - message=f"🛡️ 告警收斂 (x{updated_approval.hit_count}) - Telegram 已發送,跳過重複通知", + message=f"🛡️ 告警收斂 (x{updated_approval.hit_count}) - 已排程節流再通知", alert_id=alert_id, approval_created=False, approval_id=str(updated_approval.id), diff --git a/apps/api/src/services/converged_alert_recurrence_notifier.py b/apps/api/src/services/converged_alert_recurrence_notifier.py new file mode 100644 index 00000000..b2b06d28 --- /dev/null +++ b/apps/api/src/services/converged_alert_recurrence_notifier.py @@ -0,0 +1,194 @@ +"""Throttled Telegram notices for converged alert fingerprints.""" + +import hashlib +import html + +from src.core.config import settings +from src.core.logging import get_logger +from src.core.redis_client import get_redis +from src.services.telegram_gateway import get_telegram_gateway + +logger = get_logger("awoooi.converged_alert_recurrence") + +CONVERGED_ALERT_RECURRENCE_REDIS_PREFIX = "awoooi:tg:converged_alert_recurrence:" +CONVERGED_ALERT_RECURRENCE_TTL_SECONDS = 30 * 60 +CONVERGED_ALERT_RECURRENCE_FALLBACK_MILESTONES = frozenset({2, 3, 5, 10, 20, 50, 100}) + + +def shorten_alert_text(value: str | None, *, limit: int = 80) -> str: + """Keep Telegram recurrence notices readable and HTML-safe.""" + + text = " ".join(str(value or "-").split()) + if len(text) <= limit: + return html.escape(text) + return html.escape(f"{text[: limit - 3]}...") + + +def is_converged_alert_recurrence_milestone(hit_count: int) -> bool: + """Fallback dedupe when Redis is unavailable.""" + + if hit_count <= 1: + return False + return hit_count in CONVERGED_ALERT_RECURRENCE_FALLBACK_MILESTONES or hit_count % 100 == 0 + + +def converged_alert_recurrence_key(fingerprint: str) -> str: + digest = hashlib.sha256(str(fingerprint or "missing").encode()).hexdigest()[:32] + return f"{CONVERGED_ALERT_RECURRENCE_REDIS_PREFIX}{digest}" + + +async def should_notify_converged_alert_recurrence( + *, + fingerprint: str, + hit_count: int, +) -> bool: + """Throttle converged alert notices without making active incidents disappear.""" + + if hit_count <= 1: + return False + + key = converged_alert_recurrence_key(fingerprint) + try: + redis = get_redis() + acquired = await redis.set( + key, + str(hit_count), + ex=CONVERGED_ALERT_RECURRENCE_TTL_SECONDS, + nx=True, + ) + return bool(acquired) + except Exception as exc: + logger.warning( + "converged_alert_recurrence_dedup_unavailable", + fingerprint_hash=key.rsplit(":", 1)[-1], + hit_count=hit_count, + fallback="milestone", + error=str(exc), + ) + return is_converged_alert_recurrence_milestone(hit_count) + + +def format_converged_alert_recurrence_message( + *, + source: str, + alertname: str, + severity: str, + namespace: str, + target_resource: str, + hit_count: int, + incident_id: str | None, + approval_id: str | None, + alert_category: str = "", + notification_type: str = "", +) -> str: + """Build a concise recurrence notice for an already-open alert fingerprint.""" + + return "\n".join( + [ + "告警仍在發生", + "同一指紋已收斂,系統保留去重,但不再完全靜音。", + "", + f"來源:{shorten_alert_text(source, limit=40)}", + f"告警:{shorten_alert_text(alertname, limit=80)}", + f"嚴重度:{shorten_alert_text(severity, limit=24)}", + f"目標:{shorten_alert_text(target_resource, limit=80)}", + f"命名空間:{shorten_alert_text(namespace, limit=48)}", + f"累計次數:{hit_count}", + f"事件:{shorten_alert_text(incident_id, limit=48)}", + f"簽核:{shorten_alert_text(approval_id, limit=48)}", + f"分類:{shorten_alert_text(alert_category or '-', limit=48)}", + f"通知型別:{shorten_alert_text(notification_type or '-', limit=48)}", + "", + "下一步:請查看 AwoooP 事件時間線;這不是新的自動修復授權。", + ] + ) + + +async def notify_converged_alert_recurrence( + *, + source: str, + fingerprint: str, + alertname: str, + severity: str, + namespace: str, + target_resource: str, + hit_count: int, + incident_id: str | None, + approval_id: str | None, + alert_category: str = "", + notification_type: str = "", +) -> None: + """Send a throttled recurrence notice for an already-open alert fingerprint.""" + + if not await should_notify_converged_alert_recurrence( + fingerprint=fingerprint, + hit_count=hit_count, + ): + logger.info( + "converged_alert_recurrence_throttled", + source=source, + hit_count=hit_count, + approval_id=approval_id, + ) + return + + text = format_converged_alert_recurrence_message( + source=source, + alertname=alertname, + severity=severity, + namespace=namespace, + target_resource=target_resource, + hit_count=hit_count, + incident_id=incident_id, + approval_id=approval_id, + alert_category=alert_category, + notification_type=notification_type, + ) + + gateway = get_telegram_gateway() + sent_count = 0 + failures: list[str] = [] + + try: + await gateway.send_alert_notification(text) + sent_count += 1 + except Exception as exc: + failures.append(f"primary:{type(exc).__name__}") + logger.warning( + "converged_alert_recurrence_primary_failed", + source=source, + approval_id=approval_id, + error=str(exc), + ) + + private_chat_id = settings.OPENCLAW_TG_CHAT_ID + if private_chat_id and private_chat_id != gateway.alert_chat_id: + try: + await gateway.send_notification(text, chat_id=private_chat_id) + sent_count += 1 + except Exception as exc: + failures.append(f"private:{type(exc).__name__}") + logger.warning( + "converged_alert_recurrence_private_mirror_failed", + source=source, + approval_id=approval_id, + error=str(exc), + ) + + if sent_count: + logger.info( + "converged_alert_recurrence_sent", + source=source, + hit_count=hit_count, + approval_id=approval_id, + mirrored_to_private=bool(private_chat_id and private_chat_id != gateway.alert_chat_id), + sent_count=sent_count, + ) + else: + logger.error( + "converged_alert_recurrence_failed", + source=source, + hit_count=hit_count, + approval_id=approval_id, + failures=failures, + ) diff --git a/apps/api/tests/test_alert_converged_recurrence.py b/apps/api/tests/test_alert_converged_recurrence.py new file mode 100644 index 00000000..317586b7 --- /dev/null +++ b/apps/api/tests/test_alert_converged_recurrence.py @@ -0,0 +1,136 @@ +import pytest + +from src.services import converged_alert_recurrence_notifier as notifier + + +class _FakeRedis: + def __init__(self, result): + self.result = result + self.calls = [] + + async def set(self, key, value, *, ex=None, nx=None): + self.calls.append({"key": key, "value": value, "ex": ex, "nx": nx}) + return self.result + + +class _FakeGateway: + alert_chat_id = "group-chat" + + def __init__(self): + self.primary_messages = [] + self.private_messages = [] + + async def send_alert_notification(self, text): + self.primary_messages.append(text) + return {"ok": True} + + async def send_notification(self, text, *, chat_id=None): + self.private_messages.append({"text": text, "chat_id": chat_id}) + return {"ok": True} + + +def test_converged_recurrence_message_escapes_html(): + text = notifier.format_converged_alert_recurrence_message( + source="alertmanager", + alertname="Disk", + severity="critical", + namespace="prod&ops", + target_resource="api