Files
ewoooc/tests/test_event_router.py
2026-05-24 22:49:46 +08:00

272 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
def test_dispatch_degrades_and_queues_when_ai_or_telegram_fails(tmp_path, monkeypatch):
    """Check dispatch output when the L1 handler raises and Telegram fails.

    The L1 handler is replaced by a stub that raises ``RuntimeError`` and the
    Telegram sender by a stub reporting one failed delivery. The test then
    inspects the dispatch result and the JSON record written to the patched
    queue file under ``tmp_path``.
    """
    import services.event_router as event_router

    failed_queue = tmp_path / "event_router_failed.jsonl"

    async def raise_hermes_unavailable(event, session_id):
        raise RuntimeError("hermes unavailable")

    def report_delivery_failure(*args, **kwargs):
        # A fresh dict per call, mirroring what a real send result looks like.
        return {
            "ok": False,
            "sent": 0,
            "failed": 1,
            "chat_ids": [123],
            "errors": ["123:HTTP 500"],
        }

    monkeypatch.setattr(event_router, "_QUEUE_PATH", str(failed_queue))
    monkeypatch.setattr(event_router, "_is_event_silenced", lambda event: False)
    monkeypatch.setattr(event_router, "_handle_l1", raise_hermes_unavailable)
    monkeypatch.setattr(event_router, "send_telegram_with_result", report_delivery_failure)

    timeout_event = {
        "source": "Scheduler.Test",
        "event_type": "crawler_timeout",
        "severity": "warning",
        "title": "測試任務異常",
        "summary": "timeout",
        "trace": "Traceback...",
    }
    outcome = asyncio.run(event_router.dispatch(timeout_event))

    assert outcome["tier"] == "L1"
    assert outcome["delivered"] is False
    assert outcome["queued"] is True
    assert outcome["payload"]["status"] == "degraded"

    # The whole file is parsed as one JSON document, so exactly one queued
    # record is expected.
    queue_text = failed_queue.read_text(encoding="utf-8")
    record = json.loads(queue_text.strip())
    assert record["reason"] == "telegram_delivery_failed"
    assert record["event_key"] == "Scheduler.Test:crawler_timeout"
def test_dispatch_executes_only_auto_safe_actions(monkeypatch):
    """Check that only the patched safe action from an L2 plan is executed.

    The stubbed L2 handler returns a two-step action plan. ``retry_task`` is
    registered in ``SAFE_ACTIONS`` by this test and must be called once with
    its params; the ``restart_container`` step must be reported as rejected.
    """
    import services.agent_actions as agent_actions
    import services.event_router as event_router

    retry_calls = []

    def record_retry(**params):
        retry_calls.append(params)
        return {"status": "scheduled"}

    async def plan_two_actions(event, session_id):
        plan = [
            {"action": "retry_task", "params": {"task_name": "run_momo_task"}},
            {"action": "restart_container", "params": {"container": "momo-db"}},
        ]
        return {
            "dispatch_to": "safe_action",
            "auto_execute": True,
            "action_plan": plan,
        }

    def report_delivery_ok(*args, **kwargs):
        return {"ok": True, "sent": 1, "failed": 0, "chat_ids": [123], "errors": []}

    monkeypatch.setattr(event_router, "_is_event_silenced", lambda event: False)
    monkeypatch.setitem(agent_actions.SAFE_ACTIONS, "retry_task", record_retry)
    monkeypatch.setattr(event_router, "_handle_l2", plan_two_actions)
    monkeypatch.setattr(event_router, "send_telegram_with_result", report_delivery_ok)

    failure_event = {
        "source": "Scheduler.MOMO",
        "event_type": "scheduler_task_failure",
        "severity": "alert",
        "title": "MOMO 任務異常",
        "summary": "boom",
        "payload": {"task_name": "run_momo_task"},
    }
    outcome = asyncio.run(event_router.dispatch(failure_event))

    assert outcome["tier"] == "L2"
    assert retry_calls == [{"task_name": "run_momo_task"}]

    executed = outcome["payload"]["executed_actions"]
    assert executed[0]["status"] == "ok"
    assert executed[1]["status"] == "rejected"
def test_dispatch_dedupes_repeated_event(monkeypatch):
    """Check that dispatching the same event twice sends only once.

    The module-level dedup store is cleared first, then one event dict
    carrying ``dedup_ttl_sec`` is dispatched twice. The second result must be
    flagged as deduped and the stubbed sender called exactly once.
    """
    import services.event_router as event_router

    # Start from an empty dedup store so earlier dispatches cannot interfere.
    event_router._EVENT_DEDUP.clear()

    send_calls = []

    def count_and_report_ok(*args, **kwargs):
        send_calls.append(True)
        return {"ok": True, "sent": 1, "failed": 0, "chat_ids": [123], "errors": []}

    monkeypatch.setattr(event_router, "_is_event_silenced", lambda event: False)
    monkeypatch.setattr(event_router, "send_telegram_with_result", count_and_report_ok)

    backup_alert = {
        "source": "Scheduler.BackupMonitor",
        "event_type": "backup_monitor_alert",
        "severity": "alert",
        "title": "備份健康告警",
        "summary": "stale backup",
        "dedup_ttl_sec": 60,
    }

    # The very same dict object is dispatched both times.
    first = asyncio.run(event_router.dispatch(backup_alert))
    second = asyncio.run(event_router.dispatch(backup_alert))

    assert first["deduped"] is False
    assert second["deduped"] is True
    assert second["sent"] == 0
    assert len(send_calls) == 1
def test_dispatch_decision_envelope_skips_ai_handler(monkeypatch):
    """Check that a decision-envelope event is delivered without L1/L2 handlers.

    Both AI-tier handlers are replaced by a stub that raises
    ``AssertionError``, so any call into them fails the test. The captured
    Telegram message must be built from the envelope rather than from the
    legacy raw summary, and must carry an inline keyboard whose first
    callback starts with ``momo:eig:``.
    """
    import services.event_router as event_router

    captured = []

    async def should_not_run(*args, **kwargs):
        raise AssertionError("decision envelope event should not enter AI tier")

    def capture_and_report_ok(message, **kwargs):
        captured.append((message, kwargs))
        return {
            "ok": True,
            "sent": 1,
            "failed": 0,
            "chat_ids": [123],
            "errors": [],
        }

    monkeypatch.setattr(event_router, "_is_event_silenced", lambda event: False)
    monkeypatch.setattr(event_router, "_handle_l1", should_not_run)
    monkeypatch.setattr(event_router, "_handle_l2", should_not_run)
    monkeypatch.setattr(event_router, "send_telegram_with_result", capture_and_report_ok)

    subject = {
        "sku": "SKU-1",
        "name": "測試精華液",
        "competitor_product_id": "PC-1",
        "competitor_product_name": "PChome 測試精華液",
    }
    match_evidence = {
        "type": "match",
        "metric": "match_score",
        "value": 0.93,
        "basis": "exact/total_price/price_alert_exact",
        "confidence": 0.93,
    }
    envelope = {
        "decision_id": "nemotron:price_alert:SKU-1:abcdef12",
        "source_agent": "nemotron",
        "decision_type": "price_alert",
        "severity": "P1",
        "confidence": 0.91,
        "subject": subject,
        "evidence": [match_evidence],
        "recommended_action": {"action": "price_follow_review", "owner": "營運", "requires_hitl": True},
        "guardrails": {
            "can_auto_execute": False,
            "data_quality": "complete",
            "blocked_reason": "價格調整需人工覆核",
        },
    }
    dispatcher_event = {
        "source": "NemoTron.Dispatcher",
        "event_type": "nemoton_dispatch_alert",
        "severity": "alert",
        "title": "NemoTron 派發器告警",
        "summary": "legacy raw message should not become the decision brief",
        "decision_envelope": envelope,
        "payload": {"raw_message": "legacy raw message should not become the decision brief"},
    }

    outcome = asyncio.run(event_router.dispatch(dispatcher_event))

    assert outcome["delivered"] is True
    assert outcome["payload"]["status"] == "decision_envelope_direct"
    assert len(captured) == 1

    text, send_kwargs = captured[0]
    expected_fragments = (
        "🧭 <b>決策信封</b>",
        "SKU<code>SKU-1</code>",
        "PChome<code>PC-1</code>",
        "exact/total_price/price_alert_exact",
    )
    for fragment in expected_fragments:
        assert fragment in text
    assert "legacy raw message" not in text

    first_button = send_kwargs["reply_markup"]["inline_keyboard"][0][0]
    assert first_button["callback_data"].startswith("momo:eig:")
def test_replay_failed_deliveries_removes_successful_records(tmp_path, monkeypatch):
    """Check that a successfully replayed record is removed from the queue.

    One failed-delivery record is written to a temporary queue file and the
    sender is stubbed to succeed. The replay summary must count one sent
    record, the sent text must be a rebuilt replay message rather than the
    stored ``message`` field, and the queue file must no longer exist.
    """
    import services.event_router as event_router

    queued_record = {
        "ts": "2026-04-29T00:00:00",
        "reason": "telegram_delivery_failed",
        "tier": "L1",
        "event_key": "Scheduler.Test:test",
        "event": {"title": "舊告警", "summary": "需要回放"},
        "message": "queued message",
        "errors": ["HTTP 500"],
    }
    failed_queue = tmp_path / "failed.jsonl"
    serialized = json.dumps(queued_record, ensure_ascii=False)
    failed_queue.write_text(serialized + "\n", encoding="utf-8")
    monkeypatch.setattr(event_router, "_QUEUE_PATH", str(failed_queue))

    delivered = []

    def capture_and_report_ok(message, **kwargs):
        delivered.append(message)
        return {"ok": True, "sent": 1, "failed": 0, "chat_ids": [123], "errors": []}

    monkeypatch.setattr(event_router, "send_telegram_with_result", capture_and_report_ok)

    summary = event_router.replay_failed_deliveries(limit=10)

    assert summary == {"attempted": 1, "sent": 1, "failed": 0, "dropped": 0}
    replayed_text = delivered[0]
    assert "EventRouter Queue Replay" in replayed_text
    assert "queued message" not in replayed_text
    assert not failed_queue.exists()
def test_queued_record_message_rebuilds_safe_compact_html():
    """Check that a queued record is rendered as escaped, length-bounded text.

    The record contains angle brackets in its title, summary, trace and
    errors, plus a long repeated trace. The rebuilt message must not reuse the
    stored ``message`` field, must contain the brackets only in escaped form,
    and must be shorter than 4096 characters.
    """
    import services.event_router as event_router

    failed_event = {
        "title": "促銷爬蟲 <失敗>",
        "summary": "Alert Text: 很抱歉此EDM不存在 <unknown>",
        "trace": "#0 0xabc <unknown>\n" * 200,
    }
    record = {
        "ts": "2026-05-21T13:00:00",
        "reason": "telegram_delivery_failed",
        "event_key": "Scheduler.Festival:festival_task_failure",
        "event": failed_event,
        "message": "<pre>原始壞訊息 <unknown></pre>",
        "errors": ["-1003940688311:HTTP 400 <bad html>"],
    }

    rebuilt = event_router._queued_record_message(record)

    assert "EventRouter Queue Replay" in rebuilt
    for forbidden in ("原始壞訊息", "<unknown>"):
        assert forbidden not in rebuilt
    for escaped in ("&lt;unknown&gt;", "&lt;失敗&gt;"):
        assert escaped in rebuilt
    assert len(rebuilt) < 4096
def test_replay_failed_deliveries_keeps_failed_records(tmp_path, monkeypatch):
    """Check that a record stays queued when its replay fails again.

    The sender is stubbed to report a failed delivery. The replay summary
    must count one attempt and one failure, and the queue file must then
    contain the text ``last_replay_error``.
    """
    import services.event_router as event_router

    queued_record = {
        "event_key": "Scheduler.Test:test",
        "event": {"title": "舊告警"},
        "message": "queued",
    }
    failed_queue = tmp_path / "failed.jsonl"
    failed_queue.write_text(json.dumps(queued_record) + "\n", encoding="utf-8")
    monkeypatch.setattr(event_router, "_QUEUE_PATH", str(failed_queue))

    def report_delivery_failure(*args, **kwargs):
        return {"ok": False, "sent": 0, "failed": 1, "chat_ids": [123], "errors": ["HTTP 500"]}

    monkeypatch.setattr(event_router, "send_telegram_with_result", report_delivery_failure)

    summary = event_router.replay_failed_deliveries(limit=10)

    assert summary["attempted"] == 1
    assert summary["failed"] == 1
    remaining_text = failed_queue.read_text(encoding="utf-8")
    assert "last_replay_error" in remaining_text