Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 1m32s
P1.5 收尾(status 文件 line 96-99 指定):
整合點 3 — failover_manager Gemini quota 告警觸發:
- ollama_failover_manager.py: _check_gemini_quota 返回 False 時呼叫
alerter.alert_gemini_quota_exceeded({quota, current_count})
- 從 Redis 讀 ollama:gemini_daily_count:{date} 取 current_count(fail-soft)
- alerter 內 24h dedup(QUOTA_DEDUP_TTL_SEC=86400),每日只發一次
- try/except 包裹:告警失敗 fail-open,不阻斷 routing
整合點 4 — main.py lifespan 注入 Redis client:
- 在 _recovery_svc.start() 之後、yield 之前
- 呼叫 configure_alerter(get_redis()) 替換 singleton 注入 dedup 能力
- try/except 包裹:注入失敗 fail-open(alerter 仍可工作但 dedup 失效)
新測試 (174 行, 6/6 pass):
- test_alert_failover_dedup: 同 to_provider 第二次被 10min dedup ✅
- test_alert_recovery_send: 正常發送 + Markdown 訊息 + 連續 N 次 HEALTHY ✅
- test_no_telegram_chat_id_noop: chat_id 缺時 fail-soft 不 raise ✅
- test_quota_alert_dedup_24h: TTL=86400s,訊息含 quota+count ✅
- test_configure_alerter_replaces_singleton: lifespan 注入後 redis 可用 ✅
- test_dedup_fail_open_when_no_redis: Redis None → 允許送出 ✅
Mock 注意:_send() inline import telegram_gateway/get_settings,
mock target 必須是 src.services.telegram_gateway / src.core.config
而非 alerter module 自己。
回歸:原 37 ollama_failover_manager + 3 lifespan_wiring 測試全綠。
飛輪自主化分數:~75 → 預估 ~80(配額耗盡有告警,運維可見性 +5)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
175 lines
6.2 KiB
Python
175 lines
6.2 KiB
Python
"""FailoverAlerter 單元測試 — P1.5 Telegram 容災告警
|
||
|
||
四大 testcase(覆蓋 status 文件 line 99 指定範圍):
|
||
1. test_alert_failover_dedup — 同 to_provider 第二次被 10min dedup
|
||
2. test_alert_recovery_send — 正常發送 + Markdown 訊息結構
|
||
3. test_no_telegram_chat_id_noop — chat_id 缺失時不發送(fail-soft)
|
||
4. test_quota_alert_dedup_24h — quota 告警 86400s TTL(每日一次)
|
||
|
||
2026-04-26 P1.5 補測 by Claude Opus 4.7
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from unittest.mock import AsyncMock, MagicMock, patch
|
||
|
||
import pytest
|
||
|
||
from src.services.failover_alerter import (
|
||
DEDUP_TTL_SEC,
|
||
QUOTA_DEDUP_TTL_SEC,
|
||
FailoverAlerter,
|
||
configure_alerter,
|
||
get_failover_alerter,
|
||
reset_failover_alerter,
|
||
)
|
||
|
||
|
||
@pytest.fixture(autouse=True)
def _reset_singleton():
    """Reset the alerter singleton before and after every test.

    Applied automatically to each test in this module so that singleton
    state set up by one test does not leak into another.
    """
    reset_failover_alerter()  # setup: start from a clean singleton
    yield
    reset_failover_alerter()  # teardown: discard whatever the test installed
|
||
|
||
|
||
@pytest.fixture
def mock_redis():
    """Provide a fake Redis client whose async ``set`` has scripted results.

    Successive awaits of ``set`` yield ``True``, ``None``, ``True``, ``None``:
    ``True`` stands for a successful NX write, ``None`` for a key that
    already exists.
    """
    scripted_results = [True, None, True, None]
    client = MagicMock()
    client.set = AsyncMock(side_effect=scripted_results)
    return client
|
||
|
||
|
||
@pytest.fixture
def mock_telegram_send():
    """Patch the Telegram gateway factory and settings; yield the fake gateway.

    The patch targets are the source modules (``src.services.telegram_gateway``
    and ``src.core.config``) rather than the alerter module, because
    ``_send()`` imports them inline inside the function body.

    Yields:
        A ``MagicMock`` gateway whose ``send_notification`` is an ``AsyncMock``.
        While the fixture is active the patched settings expose
        ``OPENCLAW_TG_CHAT_ID="-100123"``.
    """
    fake_gateway = MagicMock()
    fake_gateway.send_notification = AsyncMock()
    fake_settings = MagicMock(OPENCLAW_TG_CHAT_ID="-100123")

    with patch("src.services.telegram_gateway.get_telegram_gateway") as gw_factory:
        with patch("src.core.config.get_settings") as settings_factory:
            gw_factory.return_value = fake_gateway
            settings_factory.return_value = fake_settings
            yield fake_gateway
|
||
|
||
|
||
# =============================================================================
# Case 1: failover dedup(同 to_provider 第二次被攔)
# =============================================================================
|
||
|
||
@pytest.mark.asyncio
async def test_alert_failover_dedup(mock_redis, mock_telegram_send):
    """A repeated failover alert for the same event is sent only once.

    Also checks that the first dedup write uses ``DEDUP_TTL_SEC`` as its
    expiry and is issued with ``nx=True``.
    """
    send = mock_telegram_send.send_notification
    failover_event = {
        "to_provider": "gemini",
        "reason": "111 unhealthy",
        "model": "qwen3:8b",
        "fallback_chain_str": "gemini → ollama_188",
    }
    alerter = FailoverAlerter(redis_client=mock_redis)

    # First alert: the mocked redis.set returns True, so the message goes out.
    await alerter.alert_failover(failover_event)
    assert send.await_count == 1

    # Second alert: the mocked redis.set returns None, so nothing new is sent.
    await alerter.alert_failover(failover_event)
    assert send.await_count == 1

    # The dedup key is written with the failover TTL and NX semantics.
    first_set_call = mock_redis.set.await_args_list[0]
    assert first_set_call.kwargs["ex"] == DEDUP_TTL_SEC
    assert first_set_call.kwargs["nx"] is True
|
||
|
||
|
||
# =============================================================================
# Case 2: recovery 正常發送
# =============================================================================
|
||
|
||
@pytest.mark.asyncio
async def test_alert_recovery_send(mock_redis, mock_telegram_send):
    """A recovery alert is sent once as MarkdownV2 with the expected wording.

    The message text must contain the recovery headline and the
    consecutive-healthy count taken from ``stable_count``.
    """
    recovery_event = {
        "from_provider": "gemini",
        "to_provider": "ollama_111",
        "stable_count": 3,
    }
    alerter = FailoverAlerter(redis_client=mock_redis)

    await alerter.alert_recovery(recovery_event)

    send = mock_telegram_send.send_notification
    assert send.await_count == 1

    call_kwargs = send.await_args.kwargs
    assert call_kwargs["parse_mode"] == "MarkdownV2"

    # The text mentions the recovery and the 3 consecutive HEALTHY checks.
    message = call_kwargs["text"]
    assert "Ollama 自動恢復" in message
    assert "連續 3" in message
|
||
|
||
|
||
# =============================================================================
# Case 3: chat_id 缺失 → fail-soft(不發送,不 raise)
# =============================================================================
|
||
|
||
@pytest.mark.asyncio
async def test_no_telegram_chat_id_noop(mock_redis):
    """With no chat id configured, alerting neither raises nor sends.

    The settings mock exposes ``OPENCLAW_TG_CHAT_ID=None``; the failover
    alert must complete without awaiting ``send_notification``.
    """
    fake_gateway = MagicMock()
    fake_gateway.send_notification = AsyncMock()
    settings_without_chat = MagicMock(OPENCLAW_TG_CHAT_ID=None)
    alerter = FailoverAlerter(redis_client=mock_redis)

    with patch("src.services.telegram_gateway.get_telegram_gateway") as gw_factory:
        with patch("src.core.config.get_settings") as settings_factory:
            gw_factory.return_value = fake_gateway
            settings_factory.return_value = settings_without_chat

            # Must not raise: dedup passes, but the send step is expected to
            # return early because the chat id is missing.
            await alerter.alert_failover({"to_provider": "gemini"})
            assert fake_gateway.send_notification.await_count == 0
|
||
|
||
|
||
# =============================================================================
# Case 4: quota 告警 24h dedup
# =============================================================================
|
||
|
||
@pytest.mark.asyncio
async def test_quota_alert_dedup_24h(mock_redis, mock_telegram_send):
    """The Gemini quota alert is sent once and deduplicated with a 24h TTL.

    The first call must send a message that mentions both the quota and the
    current count, and must write the dedup key with ``QUOTA_DEDUP_TTL_SEC``.
    A second call, for which the mocked ``redis.set`` returns ``None`` (key
    already present), must not send another message. Without that second
    call the test would only cover the TTL value, not the dedup behaviour
    its name promises.
    """
    quota_event = {
        "quota": 1000,
        "current_count": 1003,
    }
    send = mock_telegram_send.send_notification
    alerter = FailoverAlerter(redis_client=mock_redis)

    # First call: the message goes out.
    await alerter.alert_gemini_quota_exceeded(quota_event)
    assert send.await_count == 1
    sent = send.await_args.kwargs["text"]
    assert "Gemini 每日配額耗盡" in sent
    assert "1000" in sent
    assert "1003" in sent

    # The dedup key is written with the 24h TTL.
    assert mock_redis.set.await_args_list[0].kwargs["ex"] == QUOTA_DEDUP_TTL_SEC
    assert QUOTA_DEDUP_TTL_SEC == 86400  # sanity check on the constant itself

    # Second call: the mocked redis.set reports the key already exists, so the
    # alert must be suppressed and the send count must stay at 1.
    await alerter.alert_gemini_quota_exceeded(quota_event)
    assert send.await_count == 1
|
||
|
||
|
||
# =============================================================================
# 額外:configure_alerter / get_failover_alerter 行為驗證
# =============================================================================
|
||
|
||
def test_configure_alerter_replaces_singleton(mock_redis):
    """``configure_alerter`` swaps in a new singleton wired to the given Redis."""
    before = get_failover_alerter()
    assert before._redis is None  # the default singleton has no Redis client

    configure_alerter(mock_redis)
    after = get_failover_alerter()

    assert after._redis is mock_redis  # the injected client is now in use
    assert before is not after  # a fresh instance replaced the old one
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_dedup_fail_open_when_no_redis():
    """Without a Redis client the dedup check fails open (sending is allowed)."""
    redisless_alerter = FailoverAlerter(redis_client=None)

    # _check_dedup is expected to answer True, i.e. "go ahead and send".
    allowed = await redisless_alerter._check_dedup("any:key", ttl=600)
    assert allowed is True
|