From 8fb0c5df338249d9e6ddd74cbd3784d73ecebb17 Mon Sep 17 00:00:00 2001
From: Your Name
Date: Sun, 3 May 2026 01:48:57 +0800
Subject: [PATCH] =?UTF-8?q?feat(heartbeat):=20noise=20reduction=20?=
 =?UTF-8?q?=E2=80=94=20silent=206h=20+=20warnings=20hash=20dedup?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

P0 #4 (徹底長期修系列) — 統帥鐵證:「INFO | AWOOOI 系統報告」每 30 分鐘
推一次,一天 48 條同樣內容,即使我修了 P0 #3 假警報,每天的「全系統正常」
重複推送本身就是噪音,讓統帥誤以為告警還在重複。

修法(不違反「監控工具必須被監控」鐵律 — 健康狀態仍每 6h 推 1 次「我活著」):

| 狀況 | 推送行為 |
|------|---------|
| 健康(無 warnings)| 6h 內最多 1 次「我活著」訊號 |
| 有 warnings 跟上次同 hash | 跳過 |
| 有 warnings 跟上次不同 | 立即推送(新狀況不漏)|
| 健康 ↔ 有事 切換 | 自動清掉相反 marker |

Redis keys:
- `heartbeat:silent_last_sent` — 健康狀態 silent marker, TTL=6h
- `heartbeat:warnings_hash` — 上次 warnings 的 md5[:12], TTL=24h

效果:統帥每天從 48 條 heartbeat → ~4 條(健康狀態 4×6h),有事立即推。

Tests: 6 passed (test_heartbeat_dedup_p0_4.py)
- healthy_first_send_goes_through
- healthy_second_send_within_6h_skipped
- warnings_unchanged_skipped
- warnings_changed_pushes
- warnings_to_healthy_clears_warnings_hash
- healthy_to_warnings_clears_silent_marker

Co-Authored-By: Claude Opus 4.7 (1M context)
---
Review notes (text between the "---" line and the first "diff --git" is not part of the commit):
* NOTE(review): line breaks and leading whitespace were restored from a whitespace-collapsed
  copy of this patch. Indentation of the telegram_gateway.py hunks is a best guess; confirm it
  against the target file before applying.
* NOTE(review): comments and docstrings on added lines were translated to English, so the
  post-image blob ids on the "index" lines no longer describe the added content.
* NOTE(review): both Redis markers are written before the report is rendered and sent; the send
  path is outside these hunks, so confirm that a failed send should still suppress later reports.
* NOTE(review): MagicMock is imported by the new test module but is not used in it.

 apps/api/src/services/telegram_gateway.py   |  50 +++++
 apps/api/tests/test_heartbeat_dedup_p0_4.py | 196 ++++++++++++++++++++
 2 files changed, 246 insertions(+)
 create mode 100644 apps/api/tests/test_heartbeat_dedup_p0_4.py

diff --git a/apps/api/src/services/telegram_gateway.py b/apps/api/src/services/telegram_gateway.py
index 6bb2ad8b..17e3fcd2 100644
--- a/apps/api/src/services/telegram_gateway.py
+++ b/apps/api/src/services/telegram_gateway.py
@@ -6211,6 +6211,55 @@ class TelegramGateway:
                 return True
 
             report = await HeartbeatReportService().collect()
+
+            # 2026-05-03 Claude Opus 4.7 + commander ogt: P0 #4 heartbeat noise reduction.
+            # Evidence: one report every 30 min = 48 identical messages a day, which reads as a repeated alert.
+            # Fix (keeps the "monitoring tools must themselves be monitored" rule):
+            #   healthy (no warnings)            -> at most one "I'm alive" signal per 6h
+            #   warnings identical to last time  -> skip (hash comparison)
+            #   warnings differ from last time   -> push immediately (new conditions are not dropped)
+            import hashlib
+            SILENT_REPORT_INTERVAL_HOURS = 6
+            WARNINGS_HASH_TTL = 24 * 3600
+            silent_key = "heartbeat:silent_last_sent"
+            warnings_hash_key = "heartbeat:warnings_hash"
+
+            warnings_str = "|".join(sorted(report.warnings))
+            warnings_hash = hashlib.md5(warnings_str.encode()).hexdigest()[:12]
+
+            if not report.warnings:
+                # Healthy: one "I'm alive" signal per 6h; the marker's TTL is the throttle window
+                if await redis_client.exists(silent_key):
+                    logger.debug(
+                        "telegram_heartbeat_skipped_silent_recent",
+                        slot_id=slot_id,
+                    )
+                    return True
+                await redis_client.setex(
+                    silent_key, SILENT_REPORT_INTERVAL_HOURS * 3600, "1",
+                )
+                # Drop the stale warnings hash (warnings -> healthy) so the next warning is pushed immediately
+                await redis_client.delete(warnings_hash_key)
+            else:
+                # Warnings present: skip when the hash equals the one stored by the last push
+                last_hash_raw = await redis_client.get(warnings_hash_key)
+                last_hash = (
+                    last_hash_raw.decode() if isinstance(last_hash_raw, bytes)
+                    else last_hash_raw
+                )
+                if last_hash == warnings_hash:
+                    logger.debug(
+                        "telegram_heartbeat_skipped_warnings_unchanged",
+                        slot_id=slot_id,
+                        warnings_hash=warnings_hash,
+                    )
+                    return True
+                await redis_client.setex(
+                    warnings_hash_key, WARNINGS_HASH_TTL, warnings_hash,
+                )
+                # Clear the silent marker (healthy -> warnings): the next healthy report is not throttled by it
+                await redis_client.delete(silent_key)
+
             text = report_to_telegram_html(report)
 
             # 只發到 SRE 戰情室群組
@@ -6228,6 +6277,7 @@ class TelegramGateway:
             logger.info(
                 "telegram_heartbeat_sent",
                 warnings=len(report.warnings),
+                warnings_hash=warnings_hash,
                 has_sre_group=bool(settings.SRE_GROUP_CHAT_ID),
             )
 
diff --git a/apps/api/tests/test_heartbeat_dedup_p0_4.py b/apps/api/tests/test_heartbeat_dedup_p0_4.py
new file mode 100644
index 00000000..ceee594a
--- /dev/null
+++ b/apps/api/tests/test_heartbeat_dedup_p0_4.py
@@ -0,0 +1,196 @@
+"""
+Tests for the P0 #4 heartbeat noise reduction.
+2026-05-03 Claude Opus 4.7 + commander ogt
+
+Verifies the dedup logic in send_heartbeat():
+  healthy (no warnings) -> at most one push per 6h
+  warnings with the same hash as last time -> skipped
+  warnings that differ from last time -> pushed immediately
+  healthy <-> warnings transitions clear the opposite marker
+
+Exercises telegram_gateway.send_heartbeat() directly, mocking redis + report + send_to_group.
+"""
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+
+class FakeRedis:
+    """In-memory Redis stand-in; records set/setex and delete calls."""
+    def __init__(self):
+        self._store: dict[str, str] = {}
+        self.set_calls: list[tuple] = []
+        self.delete_calls: list[str] = []
+
+    async def set(self, key: str, value: str, nx: bool = False, ex: int | None = None):
+        if nx and key in self._store:
+            return False
+        self._store[key] = value
+        self.set_calls.append((key, value, nx, ex))
+        return True
+
+    async def setex(self, key: str, ttl: int, value: str):
+        self._store[key] = value
+        self.set_calls.append((key, value, False, ttl))
+        return True
+
+    async def get(self, key: str):
+        return self._store.get(key)
+
+    async def exists(self, key: str):
+        return key in self._store
+
+    async def delete(self, *keys):
+        for k in keys:
+            self._store.pop(k, None)
+            self.delete_calls.append(k)
+        return len(keys)
+
+    def preset(self, key: str, value: str = "1"):
+        """Seed a key before the test runs."""
+        self._store[key] = value
+
+
+def _make_report(warnings: list[str] | None = None):
+    """Build a HeartbeatReport with the given warnings (an empty list when none are given)."""
+    from datetime import datetime, timezone
+    from src.services.heartbeat_report_service import HeartbeatReport
+    return HeartbeatReport(
+        timestamp=datetime.now(timezone.utc),
+        warnings=warnings or [],
+    )
+
+
+@pytest.fixture
+def gateway_with_fake_redis():
+    """Return a TelegramGateway built without __init__ (send methods mocked) and a FakeRedis."""
+    from src.services.telegram_gateway import TelegramGateway
+    gw = TelegramGateway.__new__(TelegramGateway)  # bypass __init__
+    gw._initialized = True
+    gw._last_message_time = None
+    gw.send_to_group = AsyncMock()
+    gw.send_notification = AsyncMock()
+
+    fake_redis = FakeRedis()
+    return gw, fake_redis
+
+
+class TestHeartbeatDedup:
+    """P0 #4 heartbeat throttling logic."""
+
+    @pytest.mark.asyncio
+    async def test_healthy_first_send_goes_through(self, gateway_with_fake_redis):
+        """Healthy report with no silent marker present -> pushed."""
+        gw, fake_redis = gateway_with_fake_redis
+
+        with patch("src.core.redis_client.get_redis", return_value=fake_redis), \
+             patch("src.services.heartbeat_report_service.HeartbeatReportService") as MockSvc, \
+             patch("src.services.heartbeat_report_service.report_to_telegram_html",
+                   return_value="healthy"):
+            MockSvc.return_value.collect = AsyncMock(return_value=_make_report([]))
+
+            result = await gw.send_heartbeat()
+
+        assert result is True
+        assert "heartbeat:silent_last_sent" in fake_redis._store
+        # one of send_to_group / send_notification must have been called
+        assert gw.send_to_group.called or gw.send_notification.called
+
+    @pytest.mark.asyncio
+    async def test_healthy_second_send_within_6h_skipped(self, gateway_with_fake_redis):
+        """Healthy report while the silent marker still exists -> skipped."""
+        gw, fake_redis = gateway_with_fake_redis
+        fake_redis.preset("heartbeat:silent_last_sent")  # simulate an existing silent marker
+
+        with patch("src.core.redis_client.get_redis", return_value=fake_redis), \
+             patch("src.services.heartbeat_report_service.HeartbeatReportService") as MockSvc, \
+             patch("src.services.heartbeat_report_service.report_to_telegram_html",
+                   return_value="healthy"):
+            MockSvc.return_value.collect = AsyncMock(return_value=_make_report([]))
+
+            result = await gw.send_heartbeat()
+
+        assert result is True
+        # nothing may be sent (the report is skipped)
+        gw.send_to_group.assert_not_called()
+        gw.send_notification.assert_not_called()
+
+    @pytest.mark.asyncio
+    async def test_warnings_unchanged_skipped(self, gateway_with_fake_redis):
+        """Warnings whose hash equals the stored one -> skipped."""
+        gw, fake_redis = gateway_with_fake_redis
+        warnings = ["Pod api-x Failed", "Redis: down"]
+        # preset the previously stored hash: sorted, "|"-joined, md5 hex digest truncated to 12 chars
+        import hashlib
+        warnings_str = "|".join(sorted(warnings))
+        last_hash = hashlib.md5(warnings_str.encode()).hexdigest()[:12]
+        fake_redis.preset("heartbeat:warnings_hash", last_hash)
+
+        with patch("src.core.redis_client.get_redis", return_value=fake_redis), \
+             patch("src.services.heartbeat_report_service.HeartbeatReportService") as MockSvc, \
+             patch("src.services.heartbeat_report_service.report_to_telegram_html",
+                   return_value="warnings"):
+            MockSvc.return_value.collect = AsyncMock(return_value=_make_report(warnings))
+
+            result = await gw.send_heartbeat()
+
+        assert result is True
+        gw.send_to_group.assert_not_called()
+        gw.send_notification.assert_not_called()
+
+    @pytest.mark.asyncio
+    async def test_warnings_changed_pushes(self, gateway_with_fake_redis):
+        """Warnings whose hash differs from the stored one -> pushed immediately."""
+        gw, fake_redis = gateway_with_fake_redis
+        # preset an old hash that differs from the new warnings
+        fake_redis.preset("heartbeat:warnings_hash", "old1234567890")
+
+        with patch("src.core.redis_client.get_redis", return_value=fake_redis), \
+             patch("src.services.heartbeat_report_service.HeartbeatReportService") as MockSvc, \
+             patch("src.services.heartbeat_report_service.report_to_telegram_html",
+                   return_value="new warnings"):
+            MockSvc.return_value.collect = AsyncMock(
+                return_value=_make_report(["Pod api-y Failed"])  # new warnings
+            )
+
+            result = await gw.send_heartbeat()
+
+        assert result is True
+        # must be pushed
+        assert gw.send_to_group.called or gw.send_notification.called
+
+    @pytest.mark.asyncio
+    async def test_warnings_to_healthy_clears_warnings_hash(self, gateway_with_fake_redis):
+        """warnings -> healthy: the warnings_hash marker is deleted and the silent marker is set."""
+        gw, fake_redis = gateway_with_fake_redis
+        fake_redis.preset("heartbeat:warnings_hash", "old1234567890")
+
+        with patch("src.core.redis_client.get_redis", return_value=fake_redis), \
+             patch("src.services.heartbeat_report_service.HeartbeatReportService") as MockSvc, \
+             patch("src.services.heartbeat_report_service.report_to_telegram_html",
+                   return_value="healthy"):
+            MockSvc.return_value.collect = AsyncMock(return_value=_make_report([]))
+
+            await gw.send_heartbeat()
+
+        assert "heartbeat:warnings_hash" in fake_redis.delete_calls
+        assert "heartbeat:silent_last_sent" in fake_redis._store
+
+    @pytest.mark.asyncio
+    async def test_healthy_to_warnings_clears_silent_marker(self, gateway_with_fake_redis):
+        """healthy -> warnings: the silent marker is deleted and the new warnings hash is stored."""
+        gw, fake_redis = gateway_with_fake_redis
+        fake_redis.preset("heartbeat:silent_last_sent")
+
+        with patch("src.core.redis_client.get_redis", return_value=fake_redis), \
+             patch("src.services.heartbeat_report_service.HeartbeatReportService") as MockSvc, \
+             patch("src.services.heartbeat_report_service.report_to_telegram_html",
+                   return_value="warning"):
+            MockSvc.return_value.collect = AsyncMock(
+                return_value=_make_report(["Pod api-z Failed"])
+            )
+
+            await gw.send_heartbeat()
+
+        assert "heartbeat:silent_last_sent" in fake_redis.delete_calls
+        assert "heartbeat:warnings_hash" in fake_redis._store