fix(telegram): digest grouped alert storms
All checks were successful
Code Review / ai-code-review (push) Successful in 11s
CD Pipeline / tests (push) Successful in 1m2s
CD Pipeline / build-and-deploy (push) Successful in 3m39s
CD Pipeline / post-deploy-checks (push) Successful in 1m18s

This commit is contained in:
Your Name
2026-05-07 01:51:31 +08:00
parent 968de38a94
commit 6ac61ab6d7
5 changed files with 251 additions and 2 deletions

View File

@@ -36,6 +36,17 @@ if TYPE_CHECKING:
logger = structlog.get_logger(__name__)
def _decode_redis_member(value: object, fallback: str) -> str:
"""Redis client 可能回 bytes 或 str統一成 str 供 DB / log 使用。"""
if isinstance(value, bytes):
return value.decode("utf-8", errors="replace")
if isinstance(value, str):
return value
if value is None:
return fallback
return str(value)
# =============================================================================
# Data Types
# =============================================================================
@@ -189,7 +200,10 @@ class AlertGroupingService:
count = results[2]
first_members = results[3]
parent_fingerprint = first_members[0] if first_members else fingerprint
parent_fingerprint = _decode_redis_member(
first_members[0] if first_members else None,
fallback=fingerprint,
)
# 是否為父告警(第一個)
is_parent = parent_fingerprint == fingerprint or count == 1

View File

@@ -28,6 +28,7 @@ from __future__ import annotations
import asyncio
import hashlib
import html
import json
from datetime import datetime, timezone
from typing import Any
@@ -167,6 +168,111 @@ def format_grouped_alert_event_content(
)
def format_grouped_alert_digest_text(
*,
alertname: str,
severity: str,
namespace: str,
target_resource: str,
group_key: str,
count: int,
) -> str:
"""格式化要回覆到父告警卡的短 digest。"""
safe_alert = html.escape(alertname or "unknown")
safe_severity = html.escape(severity or "unknown")
safe_namespace = html.escape(namespace or "default")
safe_target = html.escape(target_resource or "unknown")
safe_group = html.escape(group_key or "unknown")
return "\n".join(
[
"🧩 <b>告警已收斂到父卡</b>",
f"├ 類型:<code>{safe_alert}</code>",
f"├ 等級:<code>{safe_severity}</code>",
f"├ 範圍:<code>{safe_namespace}</code>",
f"├ 最新目標:<code>{safe_target}</code>",
f"├ 群組:<code>{safe_group}</code>",
f"└ 目前視窗:<b>{count}</b> 筆同組告警",
"",
"完整子告警請看 AwoooP Run 監控,不再逐筆發 Telegram。",
]
)
async def maybe_send_grouped_alert_digest(
*,
project_id: str,
alertname: str,
severity: str,
namespace: str,
target_resource: str,
group_key: str,
count: int,
parent_fingerprint: str | None,
) -> bool:
"""若父告警卡已存在,回覆一則低頻 digest找不到父卡則安靜降級。"""
if not parent_fingerprint:
return False
try:
from sqlalchemy import select
from src.db.base import get_db_context
from src.db.models import ApprovalRecord
from src.services.telegram_gateway import get_telegram_gateway
async with get_db_context(project_id) as db:
result = await db.execute(
select(ApprovalRecord.incident_id)
.where(ApprovalRecord.fingerprint == parent_fingerprint)
.where(ApprovalRecord.incident_id.is_not(None))
.order_by(ApprovalRecord.created_at.desc())
.limit(1)
)
incident_id = result.scalar_one_or_none()
if not incident_id:
logger.info(
"grouped_alert_digest_parent_not_ready",
project_id=project_id,
group_key=group_key,
parent_fingerprint=parent_fingerprint,
)
return False
digest_text = format_grouped_alert_digest_text(
alertname=alertname,
severity=severity,
namespace=namespace,
target_resource=target_resource,
group_key=group_key,
count=count,
)
sent = await get_telegram_gateway().append_grouped_alert_digest(
incident_id=str(incident_id),
group_key=group_key,
digest_text=digest_text,
)
logger.info(
"grouped_alert_digest_result",
project_id=project_id,
incident_id=str(incident_id),
group_key=group_key,
count=count,
sent=sent,
)
return sent
except Exception as exc:
logger.warning(
"grouped_alert_digest_failed",
project_id=project_id,
group_key=group_key,
parent_fingerprint=parent_fingerprint,
error=str(exc),
)
return False
async def record_grouped_alert_event(
*,
project_id: str,
@@ -226,6 +332,16 @@ async def record_grouped_alert_event(
group_key=group_key,
count=count,
)
await maybe_send_grouped_alert_digest(
project_id=project_id,
alertname=alertname,
severity=severity,
namespace=namespace,
target_resource=target_resource,
group_key=group_key,
count=count,
parent_fingerprint=parent_fingerprint,
)
return event_id
except Exception as exc:
logger.warning(

View File

@@ -56,6 +56,8 @@ INCIDENT_UPDATE_DEDUP_PREFIX = "awoooi:tg_update_dedup:" # {incident_id}:{statu
INCIDENT_UPDATE_DEDUP_TTL_SECONDS = 5 * 60 # 5 分鐘內相同狀態不重複洗版
INCIDENT_UPDATE_GLOBAL_FAILURE_DEDUP_PREFIX = "awoooi:tg_update_global_failure_dedup:"
INCIDENT_UPDATE_GLOBAL_FAILURE_DEDUP_TTL_SECONDS = 10 * 60 # 相同失敗摘要跨 incident 10 分鐘只推一次
GROUPED_ALERT_DIGEST_DEDUP_PREFIX = "awoooi:tg_group_digest:" # {group_key}
GROUPED_ALERT_DIGEST_DEDUP_TTL_SECONDS = 5 * 60 # 同一告警群組 5 分鐘只推一則 digest
# 2026-04-01 Claude Code: Long Polling 分散式 Leader Election
# 防止多 Pod 同時 getUpdates → 409 Conflict 互搶問題
@@ -4765,6 +4767,91 @@ class TelegramGateway:
)
return True
async def append_grouped_alert_digest(
self,
*,
incident_id: str,
group_key: str,
digest_text: str,
) -> bool:
"""
將同組告警收斂摘要回覆到父告警卡,不移除原卡按鈕。
與 append_incident_update 不同digest 是觀測訊息,不代表執行狀態改變,
因此不能動 approve/reject/silence 按鈕。
"""
redis = get_redis()
stored = await redis.get(f"tg_msg:{incident_id}")
if not stored:
logger.info(
"grouped_alert_digest_no_parent_message",
incident_id=incident_id,
group_key=group_key,
)
return False
try:
message_id = int(stored)
except (ValueError, TypeError):
logger.warning(
"grouped_alert_digest_invalid_parent_message",
incident_id=incident_id,
stored=stored,
)
return False
dedup_key = f"{GROUPED_ALERT_DIGEST_DEDUP_PREFIX}{group_key}"
try:
was_set = await redis.set(
dedup_key,
incident_id,
ex=GROUPED_ALERT_DIGEST_DEDUP_TTL_SECONDS,
nx=True,
)
if not was_set:
logger.info(
"grouped_alert_digest_dedup_suppressed",
incident_id=incident_id,
group_key=group_key,
)
return True
except Exception as exc:
logger.warning(
"grouped_alert_digest_dedup_failed",
incident_id=incident_id,
group_key=group_key,
error=str(exc),
)
try:
await self._send_request("sendMessage", {
"chat_id": self.alert_chat_id,
"text": digest_text[:1400],
"parse_mode": "HTML",
"reply_parameters": {
"message_id": message_id,
"allow_sending_without_reply": True,
},
"disable_web_page_preview": True,
})
except TelegramGatewayError as exc:
logger.warning(
"grouped_alert_digest_reply_failed",
incident_id=incident_id,
group_key=group_key,
message_id=message_id,
error=str(exc),
)
return False
logger.info(
"grouped_alert_digest_reply_sent",
incident_id=incident_id,
group_key=group_key,
message_id=message_id,
)
return True
async def _dispatch_category_action(
self,
callback_query_id: str,

View File

@@ -12,7 +12,11 @@ ADR-076: 告警聚合引擎 — 告警風暴防禦
import pytest
from src.services.alert_grouping_service import AlertGroupingService, GroupingResult
from src.services.alert_grouping_service import (
AlertGroupingService,
GroupingResult,
_decode_redis_member,
)
class TestBuildGroupKey:
@@ -116,6 +120,16 @@ class TestGroupingResultDataclass:
assert len(parts) == 2
class TestRedisMemberDecode:
"""測試 Redis zrange member 正規化。"""
def test_decode_bytes_member(self):
assert _decode_redis_member(b"fp-parent", "fallback") == "fp-parent"
def test_decode_none_uses_fallback(self):
assert _decode_redis_member(None, "fallback") == "fallback"
class TestAlertGroupingServiceConstants:
"""測試服務常量設定"""

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
from src.services.channel_hub import (
build_grouped_alert_provider_event_id,
format_grouped_alert_digest_text,
format_grouped_alert_event_content,
)
@@ -34,3 +35,20 @@ def test_format_grouped_alert_event_content_keeps_operator_context() -> None:
assert "Target: sentry-self-hosted-events-consumer-1" in content
assert "Group Count: 4" in content
assert "Parent Fingerprint: parent-fp" in content
def test_format_grouped_alert_digest_text_is_html_safe() -> None:
content = format_grouped_alert_digest_text(
alertname="Docker<Restart>",
severity="critical",
namespace="default",
target_resource="sentry&snuba",
group_key="Docker<Restart>:default",
count=7,
)
assert "告警已收斂到父卡" in content
assert "Docker&lt;Restart&gt;" in content
assert "sentry&amp;snuba" in content
assert "<b>7</b> 筆同組告警" in content
assert "AwoooP Run 監控" in content