Files
awoooi/apps/api/tests/test_heartbeat_dedup_p0_4.py
Your Name 2ec7f6f440
Some checks failed
Code Review / ai-code-review (push) Successful in 14s
Deploy Alert Rules / Deploy Prometheus Alert Rules (push) Successful in 31s
CD Pipeline / tests (push) Successful in 1m59s
CD Pipeline / build-and-deploy (push) Successful in 7m36s
CD Pipeline / post-deploy-checks (push) Failing after 43s
Ansible / Reboot Recovery Contract / validate (push) Has been cancelled
fix(ops): harden heartbeat and momo alert noise
2026-06-24 19:38:33 +08:00

258 lines
9.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
P0 #4 heartbeat 噪音降頻測試
2026-05-03 Claude Opus 4.7 + 統帥 ogt
驗證 send_heartbeat() 的低噪音邏輯:
健康(無 warnings)→ 不即時推 Telegram,只留 metrics/log/每日摘要
有 warnings 且跟上次同 hash → 跳過
有 warnings 且跟上次不同 → 立即推送
warnings 消失 → 只推一次恢復通知
直接測 telegram_gateway.send_heartbeat(),mock 掉 redis + report + send_to_group。
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
class FakeRedis:
"""模擬 Redis 行為,記錄 set/get/delete 呼叫"""
def __init__(self):
self._store: dict[str, str] = {}
self.set_calls: list[tuple] = []
self.delete_calls: list[str] = []
async def set(self, key: str, value: str, nx: bool = False, ex: int | None = None):
if nx and key in self._store:
return False
self._store[key] = value
self.set_calls.append((key, value, nx, ex))
return True
async def setex(self, key: str, ttl: int, value: str):
self._store[key] = value
self.set_calls.append((key, value, False, ttl))
return True
async def get(self, key: str):
return self._store.get(key)
async def exists(self, key: str):
return key in self._store
async def delete(self, *keys):
for k in keys:
self._store.pop(k, None)
self.delete_calls.append(k)
return len(keys)
def preset(self, key: str, value: str = "1"):
"""測試前預設 key"""
self._store[key] = value
def _make_report(warnings: list[str] | None = None):
    """Build a HeartbeatReport stamped with the current UTC time.

    ``warnings`` is passed through to the report; ``None`` (or an empty
    list) yields a report with no warnings.
    """
    from datetime import datetime, timezone

    from src.services.heartbeat_report_service import HeartbeatReport

    stamped_at = datetime.now(timezone.utc)
    report_warnings = warnings if warnings else []
    return HeartbeatReport(timestamp=stamped_at, warnings=report_warnings)
@pytest.fixture
def sre_group_configured(monkeypatch):
    """Set ``SRE_GROUP_CHAT_ID`` on the gateway settings for one test.

    Per the original author's note, a real heartbeat push is only valid
    when the AwoooI SRE war-room group is configured. monkeypatch restores
    the previous value when the test finishes.
    """
    from src.services.telegram_gateway import settings as gateway_settings

    war_room_chat_id = "-1003711974679"
    monkeypatch.setattr(gateway_settings, "SRE_GROUP_CHAT_ID", war_room_chat_id)
@pytest.fixture
def gateway_with_fake_redis():
    """Return a ``(gateway, fake_redis)`` pair for heartbeat tests.

    The gateway is created without running ``__init__`` and has its two
    outbound send methods replaced by ``AsyncMock`` objects. The fake Redis
    is returned alongside the gateway rather than attached to it.
    """
    from src.services.telegram_gateway import TelegramGateway

    redis_double = FakeRedis()

    # __new__ allocates the instance while skipping __init__.
    gateway = TelegramGateway.__new__(TelegramGateway)
    gateway._initialized = True
    gateway._last_message_time = None
    gateway.send_to_group = AsyncMock()
    gateway.send_notification = AsyncMock()

    return gateway, redis_double
class TestHeartbeatDedup:
    """P0 #4: noise-reduction behaviour of ``send_heartbeat()``."""

    @staticmethod
    async def _send_heartbeat(gw, fake_redis, warnings, rendered_html):
        """Call ``gw.send_heartbeat()`` with its collaborators patched.

        ``get_redis`` is patched to return ``fake_redis``, the report
        service's ``collect`` resolves to a report carrying ``warnings``,
        and the HTML renderer returns ``rendered_html``. Returns whatever
        ``send_heartbeat()`` returns.
        """
        with (
            patch("src.core.redis_client.get_redis", return_value=fake_redis),
            patch(
                "src.services.heartbeat_report_service.HeartbeatReportService"
            ) as mock_service,
            patch(
                "src.services.heartbeat_report_service.report_to_telegram_html",
                return_value=rendered_html,
            ),
        ):
            mock_service.return_value.collect = AsyncMock(
                return_value=_make_report(warnings)
            )
            return await gw.send_heartbeat()

    @pytest.mark.asyncio
    async def test_healthy_first_send_is_suppressed(
        self,
        gateway_with_fake_redis,
        sre_group_configured,
    ):
        """First healthy check: no Telegram push, only the suppressed marker is stored."""
        gw, fake_redis = gateway_with_fake_redis

        result = await self._send_heartbeat(gw, fake_redis, [], "<b>healthy</b>")

        assert result is True
        assert "heartbeat:healthy_suppressed_last_seen" in fake_redis._store
        gw.send_to_group.assert_not_called()
        gw.send_notification.assert_not_called()

    @pytest.mark.asyncio
    async def test_healthy_second_send_stays_suppressed(
        self,
        gateway_with_fake_redis,
        sre_group_configured,
    ):
        """Second healthy check: still no Telegram push."""
        gw, fake_redis = gateway_with_fake_redis
        fake_redis.preset("heartbeat:healthy_suppressed_last_seen")

        result = await self._send_heartbeat(gw, fake_redis, [], "<b>healthy</b>")

        assert result is True
        gw.send_to_group.assert_not_called()
        gw.send_notification.assert_not_called()

    @pytest.mark.asyncio
    async def test_warnings_unchanged_skipped(
        self,
        gateway_with_fake_redis,
        sre_group_configured,
    ):
        """Warnings whose hash matches the previous run are skipped."""
        gw, fake_redis = gateway_with_fake_redis
        warnings = ["Pod api-x Failed", "Redis: down"]

        # Seed the hash of the previous run with the very same warnings.
        from src.services.telegram_gateway import _heartbeat_warnings_hash

        last_hash = _heartbeat_warnings_hash(warnings)
        fake_redis.preset("heartbeat:warnings_hash", last_hash)

        result = await self._send_heartbeat(
            gw, fake_redis, warnings, "<b>warnings</b>"
        )

        assert result is True
        gw.send_to_group.assert_not_called()
        gw.send_notification.assert_not_called()

    @pytest.mark.asyncio
    async def test_warnings_with_same_actionable_condition_are_skipped(
        self,
        gateway_with_fake_redis,
        sre_group_configured,
    ):
        """The same actionable warning is not re-sent when only HTTP status / timeout / latency differ."""
        gw, fake_redis = gateway_with_fake_redis
        from src.services.telegram_gateway import _heartbeat_warnings_hash

        fake_redis.preset(
            "heartbeat:warnings_hash",
            _heartbeat_warnings_hash(["Ollama 111 異常: ❌ HTTP 502 124ms"]),
        )

        result = await self._send_heartbeat(
            gw,
            fake_redis,
            ["Ollama 111 異常: ❌ HTTP 504 236ms"],
            "<b>warnings</b>",
        )

        assert result is True
        gw.send_to_group.assert_not_called()
        gw.send_notification.assert_not_called()

    @pytest.mark.asyncio
    async def test_warnings_changed_pushes(
        self,
        gateway_with_fake_redis,
        sre_group_configured,
    ):
        """Warnings that differ from the previous run are pushed immediately."""
        gw, fake_redis = gateway_with_fake_redis
        # Seed an old hash that cannot match the new warnings.
        fake_redis.preset("heartbeat:warnings_hash", "old1234567890")

        result = await self._send_heartbeat(
            gw,
            fake_redis,
            ["Pod api-y Failed"],  # the new warnings
            "<b>new warnings</b>",
        )

        assert result is True
        gw.send_to_group.assert_called_once()
        gw.send_notification.assert_not_called()

    @pytest.mark.asyncio
    async def test_warnings_to_healthy_clears_warnings_hash(
        self,
        gateway_with_fake_redis,
        sre_group_configured,
    ):
        """Warnings -> healthy: the warnings_hash marker is cleared and one message is sent to the group."""
        gw, fake_redis = gateway_with_fake_redis
        fake_redis.preset("heartbeat:warnings_hash", "old1234567890")

        await self._send_heartbeat(gw, fake_redis, [], "<b>healthy</b>")

        assert "heartbeat:warnings_hash" in fake_redis.delete_calls
        gw.send_to_group.assert_called_once()
        gw.send_notification.assert_not_called()

    @pytest.mark.asyncio
    async def test_healthy_to_warnings_clears_suppressed_marker(
        self,
        gateway_with_fake_redis,
        sre_group_configured,
    ):
        """Healthy -> warnings: the healthy suppressed marker is cleared and the new warning is pushed."""
        gw, fake_redis = gateway_with_fake_redis
        fake_redis.preset("heartbeat:healthy_suppressed_last_seen")

        await self._send_heartbeat(
            gw, fake_redis, ["Pod api-z Failed"], "<b>warning</b>"
        )

        assert "heartbeat:healthy_suppressed_last_seen" in fake_redis.delete_calls
        assert "heartbeat:warnings_hash" in fake_redis._store
        # The scenario promises a push for the new warning; pin it explicitly.
        gw.send_to_group.assert_called_once()