feat(p1.5): FailoverAlerter 整合點 3+4 + 6 個 testcase 補完
Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 1m32s

P1.5 收尾(status 文件 line 96-99 指定):

整合點 3 — failover_manager Gemini quota 告警觸發:
- ollama_failover_manager.py: _check_gemini_quota 返回 False 時呼叫
  alerter.alert_gemini_quota_exceeded({quota, current_count})
- 從 Redis 讀 ollama:gemini_daily_count:{date} 取 current_count(fail-soft)
- alerter 內 24h dedup(QUOTA_DEDUP_TTL_SEC=86400),每日只發一次
- try/except 包裹:告警失敗 fail-open,不阻斷 routing

整合點 4 — main.py lifespan 注入 Redis client:
- 在 _recovery_svc.start() 之後、yield 之前
- 呼叫 configure_alerter(get_redis()) 替換 singleton 注入 dedup 能力
- try/except 包裹:注入失敗 fail-open(alerter 仍可工作但 dedup 失效)

新測試 (174 行, 6/6 pass):
- test_alert_failover_dedup: 同 to_provider 第二次被 10min dedup 
- test_alert_recovery_send: 正常發送 + Markdown 訊息 + 連續 N 次 HEALTHY 
- test_no_telegram_chat_id_noop: chat_id 缺時 fail-soft 不 raise 
- test_quota_alert_dedup_24h: TTL=86400s,訊息含 quota+count 
- test_configure_alerter_replaces_singleton: lifespan 注入後 redis 可用 
- test_dedup_fail_open_when_no_redis: Redis None → 允許送出 

Mock 注意:_send() inline import telegram_gateway/get_settings,
mock target 必須是 src.services.telegram_gateway / src.core.config
而非 alerter module 自己。

回歸:原 37 ollama_failover_manager + 3 lifespan_wiring 測試全綠。

飛輪自主化分數:~75 → 預估 ~80(配額耗盡有告警,運維可見性 +5)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Your Name
2026-04-26 20:28:29 +08:00
parent fd40b79db4
commit dcf2750b2b
3 changed files with 209 additions and 0 deletions

View File

@@ -563,6 +563,17 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
# 啟動 recovery service(從 Redis bootstrap current_primary,並啟動背景監控)
await _recovery_svc.start()
# 2026-04-26 P1.5 整合點 4 by Claude Opus 4.7 — Failover Alerter 注入 Redis client
# 必須在 recovery_svc.start() 之後(確保 Redis pool 已可用)、yield 之前
try:
from src.services.failover_alerter import configure_alerter
from src.core.redis_client import get_redis
configure_alerter(get_redis())
logger.info("failover_alerter_configured")
except Exception as _alerter_err:
logger.warning("failover_alerter_configure_failed", error=str(_alerter_err))
logger.info("ollama_failover_system_started")
except Exception as e:
logger.warning("ollama_failover_system_start_failed", error=str(e))

View File

@@ -232,6 +232,30 @@ class OllamaFailoverManager:
url_111=url_111,
url_188=url_188,
)
# 2026-04-26 P1.5 整合點 3 by Claude Opus 4.7 — 配額耗盡 Telegram 告警
# alerter 內部 24h dedup(QUOTA_DEDUP_TTL_SEC):即使每次 quota exceeded
# 都呼叫,當日只會發送一次告警。失敗 fail-open,不阻擋 routing
try:
from src.services.failover_alerter import get_failover_alerter
from src.core.redis_client import get_redis
_current_count = quota # 預設為 quota 值(已超過則 ≥ quota
try:
_redis = get_redis()
if _redis is not None:
_key = f"ollama:gemini_daily_count:{datetime.date.today().isoformat()}"
_raw = await _redis.get(_key)
_current_count = int(_raw or 0)
except Exception:
pass
await get_failover_alerter().alert_gemini_quota_exceeded({
"quota": quota,
"current_count": _current_count,
})
except Exception as _alert_err:
logger.warning(
"gemini_quota_alert_dispatch_failed",
error=str(_alert_err),
)
# 寫入 audit_log(best-effort)
await self._write_failover_audit(result)

View File

@@ -0,0 +1,174 @@
"""FailoverAlerter unit tests — P1.5 Telegram failover alerting.

Test cases in this module:
1. test_alert_failover_dedup — a second alert for the same to_provider is deduped
2. test_alert_recovery_send — normal send and Markdown message structure
3. test_no_telegram_chat_id_noop — nothing is sent when chat_id is missing (fail-soft)
4. test_quota_alert_dedup_24h — quota alert uses the QUOTA_DEDUP_TTL_SEC (86400 s) TTL
5. test_configure_alerter_replaces_singleton — configure_alerter() swaps the singleton
6. test_dedup_fail_open_when_no_redis — dedup allows sending when Redis is None

Added 2026-04-26 for P1.5 by Claude Opus 4.7.
"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.services.failover_alerter import (
DEDUP_TTL_SEC,
QUOTA_DEDUP_TTL_SEC,
FailoverAlerter,
configure_alerter,
get_failover_alerter,
reset_failover_alerter,
)
@pytest.fixture(autouse=True)
def _reset_singleton():
    """Reset the FailoverAlerter singleton around every test.

    Declared autouse so each test starts from, and leaves behind, a clean
    singleton; this keeps state from leaking between tests.
    """
    # Setup: clear whatever a previous test may have left behind.
    reset_failover_alerter()
    yield
    # Teardown: clear anything this test installed.
    reset_failover_alerter()
@pytest.fixture
def mock_redis():
    """Return a mock Redis client with a scripted async ``set``.

    Successive ``set`` calls return True, None, True, None. The tests read
    True as "NX succeeded" (key was new) and None as "key already exists".
    A fifth call would exhaust the side-effect list.
    """
    client = MagicMock()
    scripted_results = [True, None] * 2
    client.set = AsyncMock(side_effect=scripted_results)
    return client
@pytest.fixture
def mock_telegram_send():
    """Patch the Telegram gateway factory and settings; yield the fake gateway.

    The patch targets are the source modules
    (``src.services.telegram_gateway`` / ``src.core.config``) rather than the
    alerter module: per the original author's note, ``_send()`` imports them
    inline, so patching names on the alerter module would have no effect.

    Yields:
        A MagicMock gateway whose ``send_notification`` is an AsyncMock, with
        settings reporting ``OPENCLAW_TG_CHAT_ID="-100123"``.
    """
    gateway = MagicMock()
    gateway.send_notification = AsyncMock()
    fake_settings = MagicMock(OPENCLAW_TG_CHAT_ID="-100123")

    gateway_patch = patch(
        "src.services.telegram_gateway.get_telegram_gateway",
        return_value=gateway,
    )
    settings_patch = patch(
        "src.core.config.get_settings",
        return_value=fake_settings,
    )
    with gateway_patch, settings_patch:
        yield gateway
# =============================================================================
# Case 1: failover dedup(同 to_provider 第二次被攔)
# =============================================================================
@pytest.mark.asyncio
async def test_alert_failover_dedup(mock_redis, mock_telegram_send):
    """A repeated failover alert for the same provider is sent only once.

    ``mock_redis.set`` returns True on the first call and None on the second,
    so the first alert goes out and the second is suppressed. The first
    ``set`` call must carry ``ex=DEDUP_TTL_SEC`` and ``nx=True``.
    """
    alerter = FailoverAlerter(redis_client=mock_redis)
    payload = {
        "to_provider": "gemini",
        "reason": "111 unhealthy",
        "model": "qwen3:8b",
        "fallback_chain_str": "gemini → ollama_188",
    }
    send = mock_telegram_send.send_notification

    # First call: dedup key is new, so the notification is sent.
    await alerter.alert_failover(payload)
    assert send.await_count == 1

    # Second call: dedup key already exists, so nothing more is sent.
    await alerter.alert_failover(payload)
    assert send.await_count == 1

    # The dedup key was written with the failover TTL and NX semantics.
    first_set_kwargs = mock_redis.set.await_args_list[0].kwargs
    assert first_set_kwargs["ex"] == DEDUP_TTL_SEC
    assert first_set_kwargs["nx"] is True
# =============================================================================
# Case 2: recovery 正常發送
# =============================================================================
@pytest.mark.asyncio
async def test_alert_recovery_send(mock_redis, mock_telegram_send):
    """A recovery alert is sent once, as MarkdownV2, with the expected text."""
    alerter = FailoverAlerter(redis_client=mock_redis)
    recovery_event = {
        "from_provider": "gemini",
        "to_provider": "ollama_111",
        "stable_count": 3,
    }

    await alerter.alert_recovery(recovery_event)

    send = mock_telegram_send.send_notification
    assert send.await_count == 1

    call_kwargs = send.await_args.kwargs
    assert call_kwargs["parse_mode"] == "MarkdownV2"

    # The message must mention the recovery and the stable_count of 3.
    message = call_kwargs["text"]
    assert "Ollama 自動恢復" in message
    assert "連續 3" in message
# =============================================================================
# Case 3: chat_id 缺失 → fail-soft(不發送、不 raise)
# =============================================================================
@pytest.mark.asyncio
async def test_no_telegram_chat_id_noop(mock_redis):
    """With ``OPENCLAW_TG_CHAT_ID`` set to None, alerting neither sends nor raises.

    The dedup check passes (``mock_redis.set`` returns True on its first
    call), so the missing chat id is the only reason no notification goes out.
    """
    alerter = FailoverAlerter(redis_client=mock_redis)

    gateway = MagicMock()
    gateway.send_notification = AsyncMock()
    settings_without_chat = MagicMock(OPENCLAW_TG_CHAT_ID=None)

    gateway_patch = patch(
        "src.services.telegram_gateway.get_telegram_gateway",
        return_value=gateway,
    )
    settings_patch = patch(
        "src.core.config.get_settings",
        return_value=settings_without_chat,
    )
    with gateway_patch, settings_patch:
        # Must complete without raising.
        await alerter.alert_failover({"to_provider": "gemini"})

    assert gateway.send_notification.await_count == 0
# =============================================================================
# Case 4: quota 告警 24h dedup
# =============================================================================
@pytest.mark.asyncio
async def test_quota_alert_dedup_24h(mock_redis, mock_telegram_send):
    """The quota alert is sent once and a repeat on the same day is suppressed.

    Checks that:
    * the first alert is delivered and its text contains the headline, the
      quota and the current count;
    * a second alert is not delivered, because ``mock_redis.set`` returns
      None on its second call (key already exists);
    * the dedup key is written with ``ex=QUOTA_DEDUP_TTL_SEC``, which equals
      86400 seconds.
    """
    alerter = FailoverAlerter(redis_client=mock_redis)
    quota_event = {
        "quota": 1000,
        "current_count": 1003,
    }
    send = mock_telegram_send.send_notification

    # First alert: dedup key is new, so the message is delivered.
    await alerter.alert_gemini_quota_exceeded(quota_event)
    assert send.await_count == 1
    sent = send.await_args.kwargs["text"]
    assert "Gemini 每日配額耗盡" in sent
    assert "1000" in sent
    assert "1003" in sent

    # Second alert: dedup key exists, so the send count must stay at 1.
    # Without this step the test would pass even with dedup disabled.
    await alerter.alert_gemini_quota_exceeded(quota_event)
    assert send.await_count == 1

    # The dedup key was written with the 24h TTL.
    assert mock_redis.set.await_args_list[0].kwargs["ex"] == QUOTA_DEDUP_TTL_SEC
    assert QUOTA_DEDUP_TTL_SEC == 86400  # sanity check on the constant itself
# =============================================================================
# 額外:configure_alerter / get_failover_alerter 行為驗證
# =============================================================================
def test_configure_alerter_replaces_singleton(mock_redis):
    """``configure_alerter()`` installs a new singleton holding the given Redis client."""
    before = get_failover_alerter()
    # The default singleton has no Redis client.
    assert before._redis is None

    configure_alerter(mock_redis)

    after = get_failover_alerter()
    # The new singleton carries the injected client.
    assert after._redis is mock_redis
    # It is a fresh instance, not the old one mutated in place.
    assert before is not after
@pytest.mark.asyncio
async def test_dedup_fail_open_when_no_redis():
    """With no Redis client, ``_check_dedup`` fails open and returns True."""
    alerter = FailoverAlerter(redis_client=None)
    allowed = await alerter._check_dedup("any:key", ttl=600)
    # True means "go ahead and send".
    assert allowed is True