Some checks failed
Code Review / ai-code-review (push) Successful in 14s
Deploy Alert Rules / Deploy Prometheus Alert Rules (push) Successful in 31s
CD Pipeline / tests (push) Successful in 1m59s
CD Pipeline / build-and-deploy (push) Successful in 7m36s
CD Pipeline / post-deploy-checks (push) Failing after 43s
Ansible / Reboot Recovery Contract / validate (push) Has been cancelled
258 lines
9.8 KiB
Python
258 lines
9.8 KiB
Python
"""
|
||
P0 #4 heartbeat 噪音降頻測試
|
||
2026-05-03 Claude Opus 4.7 + 統帥 ogt
|
||
|
||
驗證 send_heartbeat() 的低噪音邏輯:
|
||
健康(無 warnings)→ 不即時推 Telegram,只留 metrics/log/每日摘要
|
||
有 warnings 且跟上次同 hash → 跳過
|
||
有 warnings 且跟上次不同 → 立即推送
|
||
warnings 消失 → 只推一次恢復通知
|
||
|
||
直接測 telegram_gateway.send_heartbeat(),mock 掉 redis + report + send_to_group。
|
||
"""
|
||
from unittest.mock import AsyncMock, MagicMock, patch
|
||
|
||
import pytest
|
||
|
||
|
||
class FakeRedis:
|
||
"""模擬 Redis 行為,記錄 set/get/delete 呼叫"""
|
||
def __init__(self):
|
||
self._store: dict[str, str] = {}
|
||
self.set_calls: list[tuple] = []
|
||
self.delete_calls: list[str] = []
|
||
|
||
async def set(self, key: str, value: str, nx: bool = False, ex: int | None = None):
|
||
if nx and key in self._store:
|
||
return False
|
||
self._store[key] = value
|
||
self.set_calls.append((key, value, nx, ex))
|
||
return True
|
||
|
||
async def setex(self, key: str, ttl: int, value: str):
|
||
self._store[key] = value
|
||
self.set_calls.append((key, value, False, ttl))
|
||
return True
|
||
|
||
async def get(self, key: str):
|
||
return self._store.get(key)
|
||
|
||
async def exists(self, key: str):
|
||
return key in self._store
|
||
|
||
async def delete(self, *keys):
|
||
for k in keys:
|
||
self._store.pop(k, None)
|
||
self.delete_calls.append(k)
|
||
return len(keys)
|
||
|
||
def preset(self, key: str, value: str = "1"):
|
||
"""測試前預設 key"""
|
||
self._store[key] = value
|
||
|
||
|
||
def _make_report(warnings: list[str] | None = None):
|
||
"""構造 fake HeartbeatReport"""
|
||
from datetime import datetime, timezone
|
||
from src.services.heartbeat_report_service import HeartbeatReport
|
||
return HeartbeatReport(
|
||
timestamp=datetime.now(timezone.utc),
|
||
warnings=warnings or [],
|
||
)
|
||
|
||
|
||
@pytest.fixture
|
||
def sre_group_configured(monkeypatch):
|
||
"""Heartbeat 正式推送只能在 AwoooI SRE 戰情室設定存在時成立。"""
|
||
from src.services.telegram_gateway import settings
|
||
|
||
monkeypatch.setattr(settings, "SRE_GROUP_CHAT_ID", "-1003711974679")
|
||
|
||
|
||
@pytest.fixture
|
||
def gateway_with_fake_redis():
|
||
"""構造 telegram gateway 實例 + 注入 fake redis"""
|
||
from src.services.telegram_gateway import TelegramGateway
|
||
gw = TelegramGateway.__new__(TelegramGateway) # 跳過 __init__
|
||
gw._initialized = True
|
||
gw._last_message_time = None
|
||
gw.send_to_group = AsyncMock()
|
||
gw.send_notification = AsyncMock()
|
||
|
||
fake_redis = FakeRedis()
|
||
return gw, fake_redis
|
||
|
||
|
||
class TestHeartbeatDedup:
|
||
"""P0 #4 heartbeat 降頻邏輯"""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_healthy_first_send_is_suppressed(
|
||
self,
|
||
gateway_with_fake_redis,
|
||
sre_group_configured,
|
||
):
|
||
"""健康狀態第一次檢查 → 不推 Telegram,只記錄 suppressed marker"""
|
||
gw, fake_redis = gateway_with_fake_redis
|
||
|
||
with patch("src.core.redis_client.get_redis", return_value=fake_redis), \
|
||
patch("src.services.heartbeat_report_service.HeartbeatReportService") as MockSvc, \
|
||
patch("src.services.heartbeat_report_service.report_to_telegram_html",
|
||
return_value="<b>healthy</b>"):
|
||
MockSvc.return_value.collect = AsyncMock(return_value=_make_report([]))
|
||
|
||
result = await gw.send_heartbeat()
|
||
|
||
assert result is True
|
||
assert "heartbeat:healthy_suppressed_last_seen" in fake_redis._store
|
||
gw.send_to_group.assert_not_called()
|
||
gw.send_notification.assert_not_called()
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_healthy_second_send_stays_suppressed(
|
||
self,
|
||
gateway_with_fake_redis,
|
||
sre_group_configured,
|
||
):
|
||
"""健康狀態第二次檢查 → 仍不推 Telegram"""
|
||
gw, fake_redis = gateway_with_fake_redis
|
||
fake_redis.preset("heartbeat:healthy_suppressed_last_seen")
|
||
|
||
with patch("src.core.redis_client.get_redis", return_value=fake_redis), \
|
||
patch("src.services.heartbeat_report_service.HeartbeatReportService") as MockSvc, \
|
||
patch("src.services.heartbeat_report_service.report_to_telegram_html",
|
||
return_value="<b>healthy</b>"):
|
||
MockSvc.return_value.collect = AsyncMock(return_value=_make_report([]))
|
||
|
||
result = await gw.send_heartbeat()
|
||
|
||
assert result is True
|
||
gw.send_to_group.assert_not_called()
|
||
gw.send_notification.assert_not_called()
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_warnings_unchanged_skipped(
|
||
self,
|
||
gateway_with_fake_redis,
|
||
sre_group_configured,
|
||
):
|
||
"""有 warnings 跟上次同 hash → 跳過"""
|
||
gw, fake_redis = gateway_with_fake_redis
|
||
warnings = ["Pod api-x Failed", "Redis: down"]
|
||
# 預設上次 hash
|
||
from src.services.telegram_gateway import _heartbeat_warnings_hash
|
||
|
||
last_hash = _heartbeat_warnings_hash(warnings)
|
||
fake_redis.preset("heartbeat:warnings_hash", last_hash)
|
||
|
||
with patch("src.core.redis_client.get_redis", return_value=fake_redis), \
|
||
patch("src.services.heartbeat_report_service.HeartbeatReportService") as MockSvc, \
|
||
patch("src.services.heartbeat_report_service.report_to_telegram_html",
|
||
return_value="<b>warnings</b>"):
|
||
MockSvc.return_value.collect = AsyncMock(return_value=_make_report(warnings))
|
||
|
||
result = await gw.send_heartbeat()
|
||
|
||
assert result is True
|
||
gw.send_to_group.assert_not_called()
|
||
gw.send_notification.assert_not_called()
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_warnings_with_same_actionable_condition_are_skipped(
|
||
self,
|
||
gateway_with_fake_redis,
|
||
sre_group_configured,
|
||
):
|
||
"""同一可處置 warning 即使 HTTP / timeout / latency 變動也不重複洗版"""
|
||
gw, fake_redis = gateway_with_fake_redis
|
||
from src.services.telegram_gateway import _heartbeat_warnings_hash
|
||
|
||
fake_redis.preset(
|
||
"heartbeat:warnings_hash",
|
||
_heartbeat_warnings_hash(["Ollama 111 異常: ❌ HTTP 502 124ms"]),
|
||
)
|
||
|
||
with patch("src.core.redis_client.get_redis", return_value=fake_redis), \
|
||
patch("src.services.heartbeat_report_service.HeartbeatReportService") as MockSvc, \
|
||
patch("src.services.heartbeat_report_service.report_to_telegram_html",
|
||
return_value="<b>warnings</b>"):
|
||
MockSvc.return_value.collect = AsyncMock(
|
||
return_value=_make_report(["Ollama 111 異常: ❌ HTTP 504 236ms"])
|
||
)
|
||
|
||
result = await gw.send_heartbeat()
|
||
|
||
assert result is True
|
||
gw.send_to_group.assert_not_called()
|
||
gw.send_notification.assert_not_called()
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_warnings_changed_pushes(
|
||
self,
|
||
gateway_with_fake_redis,
|
||
sre_group_configured,
|
||
):
|
||
"""有 warnings 但跟上次不同 → 立即推送"""
|
||
gw, fake_redis = gateway_with_fake_redis
|
||
# 預設舊的 hash(跟新 warnings 不同)
|
||
fake_redis.preset("heartbeat:warnings_hash", "old1234567890")
|
||
|
||
with patch("src.core.redis_client.get_redis", return_value=fake_redis), \
|
||
patch("src.services.heartbeat_report_service.HeartbeatReportService") as MockSvc, \
|
||
patch("src.services.heartbeat_report_service.report_to_telegram_html",
|
||
return_value="<b>new warnings</b>"):
|
||
MockSvc.return_value.collect = AsyncMock(
|
||
return_value=_make_report(["Pod api-y Failed"]) # 新 warnings
|
||
)
|
||
|
||
result = await gw.send_heartbeat()
|
||
|
||
assert result is True
|
||
gw.send_to_group.assert_called_once()
|
||
gw.send_notification.assert_not_called()
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_warnings_to_healthy_clears_warnings_hash(
|
||
self,
|
||
gateway_with_fake_redis,
|
||
sre_group_configured,
|
||
):
|
||
"""從有事 → 健康:清掉 warnings_hash marker,下次有事可立即推"""
|
||
gw, fake_redis = gateway_with_fake_redis
|
||
fake_redis.preset("heartbeat:warnings_hash", "old1234567890")
|
||
|
||
with patch("src.core.redis_client.get_redis", return_value=fake_redis), \
|
||
patch("src.services.heartbeat_report_service.HeartbeatReportService") as MockSvc, \
|
||
patch("src.services.heartbeat_report_service.report_to_telegram_html",
|
||
return_value="<b>healthy</b>"):
|
||
MockSvc.return_value.collect = AsyncMock(return_value=_make_report([]))
|
||
|
||
await gw.send_heartbeat()
|
||
|
||
assert "heartbeat:warnings_hash" in fake_redis.delete_calls
|
||
gw.send_to_group.assert_called_once()
|
||
gw.send_notification.assert_not_called()
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_healthy_to_warnings_clears_suppressed_marker(
|
||
self,
|
||
gateway_with_fake_redis,
|
||
sre_group_configured,
|
||
):
|
||
"""從健康 → 有事:清掉 healthy suppressed marker,並推送新 warning"""
|
||
gw, fake_redis = gateway_with_fake_redis
|
||
fake_redis.preset("heartbeat:healthy_suppressed_last_seen")
|
||
|
||
with patch("src.core.redis_client.get_redis", return_value=fake_redis), \
|
||
patch("src.services.heartbeat_report_service.HeartbeatReportService") as MockSvc, \
|
||
patch("src.services.heartbeat_report_service.report_to_telegram_html",
|
||
return_value="<b>warning</b>"):
|
||
MockSvc.return_value.collect = AsyncMock(
|
||
return_value=_make_report(["Pod api-z Failed"])
|
||
)
|
||
|
||
await gw.send_heartbeat()
|
||
|
||
assert "heartbeat:healthy_suppressed_last_seen" in fake_redis.delete_calls
|
||
assert "heartbeat:warnings_hash" in fake_redis._store
|