feat(heartbeat): noise reduction — silent 6h + warnings hash dedup
P0 #4 (徹底長期修系列) — 統帥鐵證:「INFO | AWOOOI 系統報告」每 30 分鐘推一次,一天 48 條同樣內容。即使我修了 P0 #3 假警報,每天的「全系統正常」重複推送本身就是噪音,讓統帥誤以為告警還在重複。

修法(不違反「監控工具必須被監控」鐵律 — 健康狀態仍每 6h 推 1 次「我活著」):

| 狀況 | 推送行為 |
|------|---------|
| 健康(無 warnings)| 6h 內最多 1 次「我活著」訊號 |
| 有 warnings 跟上次同 hash | 跳過 |
| 有 warnings 跟上次不同 | 立即推送(新狀況不漏)|
| 健康 ↔ 有事 切換 | 自動清掉相反 marker |

Redis keys:
- `heartbeat:silent_last_sent` — 健康狀態 silent marker, TTL=6h
- `heartbeat:warnings_hash` — 上次 warnings 的 md5[:12], TTL=24h

效果:統帥每天從 48 條 heartbeat → ~4 條(健康狀態 4×6h),有事立即推。

Tests: 6 passed (test_heartbeat_dedup_p0_4.py)
- healthy_first_send_goes_through
- healthy_second_send_within_6h_skipped
- warnings_unchanged_skipped
- warnings_changed_pushes
- warnings_to_healthy_clears_warnings_hash
- healthy_to_warnings_clears_silent_marker

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -6211,6 +6211,55 @@ class TelegramGateway:
|
||||
return True
|
||||
|
||||
report = await HeartbeatReportService().collect()
|
||||
|
||||
# 2026-05-03 Claude Opus 4.7 + 統帥 ogt:P0 #4 heartbeat 噪音降頻
|
||||
# 鐵證:原本 30min/次 = 一天 48 條,統帥每天看相同內容 = 變相重複告警
|
||||
# 修法(不違反「監控工具必須被監控」鐵律):
|
||||
# 健康(無 warnings)→ 6h 內最多 1 次「我活著」訊號
|
||||
# 有 warnings 跟上次相同 → 跳過(hash 對比)
|
||||
# 有 warnings 跟上次不同 → 立即推送(新狀況不漏)
|
||||
import hashlib
|
||||
SILENT_REPORT_INTERVAL_HOURS = 6
|
||||
WARNINGS_HASH_TTL = 24 * 3600
|
||||
silent_key = "heartbeat:silent_last_sent"
|
||||
warnings_hash_key = "heartbeat:warnings_hash"
|
||||
|
||||
warnings_str = "|".join(sorted(report.warnings))
|
||||
warnings_hash = hashlib.md5(warnings_str.encode()).hexdigest()[:12]
|
||||
|
||||
if not report.warnings:
|
||||
# 健康狀態:6h 1 次「我活著」訊號
|
||||
if await redis_client.exists(silent_key):
|
||||
logger.debug(
|
||||
"telegram_heartbeat_skipped_silent_recent",
|
||||
slot_id=slot_id,
|
||||
)
|
||||
return True
|
||||
await redis_client.setex(
|
||||
silent_key, SILENT_REPORT_INTERVAL_HOURS * 3600, "1",
|
||||
)
|
||||
# 清掉舊的 warnings hash(從有事 → 健康,下次有事要立即推)
|
||||
await redis_client.delete(warnings_hash_key)
|
||||
else:
|
||||
# 有事:跟上次同 hash 跳過
|
||||
last_hash_raw = await redis_client.get(warnings_hash_key)
|
||||
last_hash = (
|
||||
last_hash_raw.decode() if isinstance(last_hash_raw, bytes)
|
||||
else last_hash_raw
|
||||
)
|
||||
if last_hash == warnings_hash:
|
||||
logger.debug(
|
||||
"telegram_heartbeat_skipped_warnings_unchanged",
|
||||
slot_id=slot_id,
|
||||
warnings_hash=warnings_hash,
|
||||
)
|
||||
return True
|
||||
await redis_client.setex(
|
||||
warnings_hash_key, WARNINGS_HASH_TTL, warnings_hash,
|
||||
)
|
||||
# 清掉 silent marker(從健康 → 有事,下次健康要過 6h 才再推)
|
||||
await redis_client.delete(silent_key)
|
||||
|
||||
text = report_to_telegram_html(report)
|
||||
|
||||
# 只發到 SRE 戰情室群組
|
||||
@@ -6228,6 +6277,7 @@ class TelegramGateway:
|
||||
logger.info(
|
||||
"telegram_heartbeat_sent",
|
||||
warnings=len(report.warnings),
|
||||
warnings_hash=warnings_hash,
|
||||
has_sre_group=bool(settings.SRE_GROUP_CHAT_ID),
|
||||
)
|
||||
|
||||
|
||||
196
apps/api/tests/test_heartbeat_dedup_p0_4.py
Normal file
196
apps/api/tests/test_heartbeat_dedup_p0_4.py
Normal file
@@ -0,0 +1,196 @@
|
||||
"""
|
||||
P0 #4 heartbeat 噪音降頻測試
|
||||
2026-05-03 Claude Opus 4.7 + 統帥 ogt
|
||||
|
||||
驗證 send_heartbeat() 的 dedup 邏輯:
|
||||
健康(無 warnings)→ 6h 內最多 1 次
|
||||
有 warnings 且跟上次同 hash → 跳過
|
||||
有 warnings 且跟上次不同 → 立即推送
|
||||
健康 ↔ 有事 切換時清掉相反 marker
|
||||
|
||||
直接測 telegram_gateway.send_heartbeat(),mock 掉 redis + report + send_to_group。
|
||||
"""
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class FakeRedis:
|
||||
"""模擬 Redis 行為,記錄 set/get/delete 呼叫"""
|
||||
def __init__(self):
|
||||
self._store: dict[str, str] = {}
|
||||
self.set_calls: list[tuple] = []
|
||||
self.delete_calls: list[str] = []
|
||||
|
||||
async def set(self, key: str, value: str, nx: bool = False, ex: int | None = None):
|
||||
if nx and key in self._store:
|
||||
return False
|
||||
self._store[key] = value
|
||||
self.set_calls.append((key, value, nx, ex))
|
||||
return True
|
||||
|
||||
async def setex(self, key: str, ttl: int, value: str):
|
||||
self._store[key] = value
|
||||
self.set_calls.append((key, value, False, ttl))
|
||||
return True
|
||||
|
||||
async def get(self, key: str):
|
||||
return self._store.get(key)
|
||||
|
||||
async def exists(self, key: str):
|
||||
return key in self._store
|
||||
|
||||
async def delete(self, *keys):
|
||||
for k in keys:
|
||||
self._store.pop(k, None)
|
||||
self.delete_calls.append(k)
|
||||
return len(keys)
|
||||
|
||||
def preset(self, key: str, value: str = "1"):
|
||||
"""測試前預設 key"""
|
||||
self._store[key] = value
|
||||
|
||||
|
||||
def _make_report(warnings: list[str] | None = None):
    """Build a HeartbeatReport stamped with the current UTC time.

    Args:
        warnings: Warning strings to attach. ``None`` (or any falsy value)
            yields a report with an empty warnings list.

    Returns:
        A ``HeartbeatReport`` constructed with only ``timestamp`` and
        ``warnings``; other fields are left to the type's own defaults.
    """
    # Imported lazily so the project module is only loaded when a test runs.
    from datetime import datetime, timezone

    from src.services.heartbeat_report_service import HeartbeatReport

    now_utc = datetime.now(timezone.utc)
    warning_list = warnings if warnings else []
    return HeartbeatReport(timestamp=now_utc, warnings=warning_list)
|
||||
|
||||
|
||||
@pytest.fixture
def gateway_with_fake_redis():
    """Provide a ``(gateway, fake_redis)`` pair for heartbeat tests.

    The gateway is allocated with ``__new__`` so ``TelegramGateway.__init__``
    never runs. Both outbound send methods are replaced with ``AsyncMock`` so
    tests can check whether a message would have been pushed. The returned
    ``FakeRedis`` starts empty.
    """
    from src.services.telegram_gateway import TelegramGateway

    gateway = TelegramGateway.__new__(TelegramGateway)

    # Attributes __init__ would normally set; presumably read by
    # send_heartbeat() — confirm against TelegramGateway.
    stubbed_attrs = {
        "_initialized": True,
        "_last_message_time": None,
        "send_to_group": AsyncMock(),
        "send_notification": AsyncMock(),
    }
    for attr_name, attr_value in stubbed_attrs.items():
        setattr(gateway, attr_name, attr_value)

    return gateway, FakeRedis()
|
||||
|
||||
|
||||
class TestHeartbeatDedup:
    """Dedup / rate-limit behaviour of ``send_heartbeat()`` (P0 #4).

    Rules under test:
      * healthy report (no warnings): at most one push per 6h window
      * warnings with the same hash as last time: skipped
      * warnings with a different hash: pushed immediately
      * switching healthy <-> warnings clears the opposite marker
    """

    SILENT_KEY = "heartbeat:silent_last_sent"
    WARNINGS_HASH_KEY = "heartbeat:warnings_hash"
    SILENT_TTL_SECONDS = 6 * 3600
    WARNINGS_HASH_TTL_SECONDS = 24 * 3600

    @staticmethod
    def _hash_of(warnings):
        """Return the 12-char md5 prefix over the sorted, '|'-joined warnings."""
        import hashlib

        joined = "|".join(sorted(warnings))
        return hashlib.md5(joined.encode()).hexdigest()[:12]

    @staticmethod
    async def _send(gw, fake_redis, warnings, html):
        """Run ``gw.send_heartbeat()`` with redis, report service and renderer patched.

        ``warnings`` becomes the warnings list of the collected report and
        ``html`` is what the patched renderer returns.
        """
        with (
            patch("src.core.redis_client.get_redis", return_value=fake_redis),
            patch("src.services.heartbeat_report_service.HeartbeatReportService") as mock_svc,
            patch(
                "src.services.heartbeat_report_service.report_to_telegram_html",
                return_value=html,
            ),
        ):
            mock_svc.return_value.collect = AsyncMock(return_value=_make_report(warnings))
            return await gw.send_heartbeat()

    @staticmethod
    def _was_pushed(gw):
        """True when either outbound send method was invoked."""
        return gw.send_to_group.called or gw.send_notification.called

    @pytest.mark.asyncio
    async def test_healthy_first_send_goes_through(self, gateway_with_fake_redis):
        """Healthy report with no silent marker yet is pushed and sets the 6h marker."""
        gw, fake_redis = gateway_with_fake_redis

        result = await self._send(gw, fake_redis, [], "<b>healthy</b>")

        assert result is True
        assert self.SILENT_KEY in fake_redis._store
        # The marker must carry the 6h TTL, otherwise the window is wrong.
        assert (self.SILENT_KEY, "1", False, self.SILENT_TTL_SECONDS) in fake_redis.set_calls
        assert self._was_pushed(gw)

    @pytest.mark.asyncio
    async def test_healthy_second_send_within_6h_skipped(self, gateway_with_fake_redis):
        """Healthy report while the silent marker exists is skipped."""
        gw, fake_redis = gateway_with_fake_redis
        fake_redis.preset(self.SILENT_KEY)  # simulate a push earlier in the window

        result = await self._send(gw, fake_redis, [], "<b>healthy</b>")

        assert result is True
        gw.send_to_group.assert_not_called()
        gw.send_notification.assert_not_called()

    @pytest.mark.asyncio
    async def test_warnings_unchanged_skipped(self, gateway_with_fake_redis):
        """Warnings whose hash equals the stored one are skipped."""
        gw, fake_redis = gateway_with_fake_redis
        warnings = ["Pod api-x Failed", "Redis: down"]
        last_hash = self._hash_of(warnings)
        fake_redis.preset(self.WARNINGS_HASH_KEY, last_hash)

        result = await self._send(gw, fake_redis, warnings, "<b>warnings</b>")

        assert result is True
        gw.send_to_group.assert_not_called()
        gw.send_notification.assert_not_called()
        # A skipped report must leave the stored hash untouched.
        assert fake_redis._store[self.WARNINGS_HASH_KEY] == last_hash

    @pytest.mark.asyncio
    async def test_warnings_changed_pushes(self, gateway_with_fake_redis):
        """Warnings whose hash differs from the stored one are pushed immediately."""
        gw, fake_redis = gateway_with_fake_redis
        fake_redis.preset(self.WARNINGS_HASH_KEY, "old1234567890")
        new_warnings = ["Pod api-y Failed"]

        result = await self._send(gw, fake_redis, new_warnings, "<b>new warnings</b>")

        assert result is True
        assert self._was_pushed(gw)
        # The new hash replaces the old one (24h TTL) so the next identical
        # report is deduplicated.
        new_hash = self._hash_of(new_warnings)
        assert fake_redis._store[self.WARNINGS_HASH_KEY] == new_hash
        assert (
            self.WARNINGS_HASH_KEY, new_hash, False, self.WARNINGS_HASH_TTL_SECONDS,
        ) in fake_redis.set_calls

    @pytest.mark.asyncio
    async def test_warnings_to_healthy_clears_warnings_hash(self, gateway_with_fake_redis):
        """Warnings -> healthy drops the stored hash so the next incident pushes at once."""
        gw, fake_redis = gateway_with_fake_redis
        fake_redis.preset(self.WARNINGS_HASH_KEY, "old1234567890")

        await self._send(gw, fake_redis, [], "<b>healthy</b>")

        assert self.WARNINGS_HASH_KEY in fake_redis.delete_calls
        assert self.WARNINGS_HASH_KEY not in fake_redis._store
        assert self.SILENT_KEY in fake_redis._store

    @pytest.mark.asyncio
    async def test_healthy_to_warnings_clears_silent_marker(self, gateway_with_fake_redis):
        """Healthy -> warnings drops the silent marker and stores the warnings hash.

        With the marker gone, the next healthy report is not suppressed by the
        6h window.
        """
        gw, fake_redis = gateway_with_fake_redis
        fake_redis.preset(self.SILENT_KEY)

        await self._send(gw, fake_redis, ["Pod api-z Failed"], "<b>warning</b>")

        assert self.SILENT_KEY in fake_redis.delete_calls
        assert self.SILENT_KEY not in fake_redis._store
        assert self.WARNINGS_HASH_KEY in fake_redis._store
|
||||
Reference in New Issue
Block a user