Files
ewoooc/tests/test_event_router.py
OoO 35ad29b04d
All checks were successful
CD Pipeline / deploy (push) Successful in 1m6s
V10.582 強化比價通知與身份證據
2026-06-04 14:03:21 +08:00

331 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import json
def test_dispatch_degrades_and_queues_when_ai_or_telegram_fails(tmp_path, monkeypatch):
    """Dispatch degrades and queues the event when L1 and Telegram both fail.

    The L1 handler is swapped for a coroutine that raises and the Telegram
    sender for a stub reporting a failed delivery.  The test then checks the
    dispatch result and the single JSON record written to the patched queue
    file under ``tmp_path``.
    """
    import services.event_router as event_router

    failed_queue_file = tmp_path / "event_router_failed.jsonl"

    def never_silenced(event):
        return False

    async def broken_l1(event, session_id):
        raise RuntimeError("hermes unavailable")

    def failing_telegram(*args, **kwargs):
        # Build a fresh result per call, exactly like the original lambda did.
        return {
            "ok": False,
            "sent": 0,
            "failed": 1,
            "chat_ids": [123],
            "errors": ["123:HTTP 500"],
        }

    patches = (
        ("_QUEUE_PATH", str(failed_queue_file)),
        ("_is_event_silenced", never_silenced),
        ("_handle_l1", broken_l1),
        ("send_telegram_with_result", failing_telegram),
    )
    for attr_name, replacement in patches:
        monkeypatch.setattr(event_router, attr_name, replacement)

    incoming_event = {
        "source": "Scheduler.Test",
        "event_type": "crawler_timeout",
        "severity": "warning",
        "title": "測試任務異常",
        "summary": "timeout",
        "trace": "Traceback...",
    }
    result = asyncio.run(event_router.dispatch(incoming_event))

    assert result["tier"] == "L1"
    assert result["delivered"] is False
    assert result["queued"] is True
    assert result["payload"]["status"] == "degraded"

    # The queue file is expected to hold exactly one JSON document.
    raw_queue_text = failed_queue_file.read_text(encoding="utf-8")
    queued = json.loads(raw_queue_text.strip())
    assert queued["reason"] == "telegram_delivery_failed"
    assert queued["event_key"] == "Scheduler.Test:crawler_timeout"
def test_dispatch_executes_only_auto_safe_actions(monkeypatch):
    """Only actions registered in ``SAFE_ACTIONS`` are executed by dispatch.

    The stubbed L2 handler returns a plan with two actions.  ``retry_task`` is
    registered as a safe action by this test; ``restart_container`` is not
    registered here and the test expects it to be reported as rejected.
    """
    import services.agent_actions as agent_actions
    import services.event_router as event_router

    calls = []

    def fake_retry_task(**params):
        # Record the keyword arguments the router passed to the action.
        calls.append(params)
        return {"status": "scheduled"}

    async def planned_l2(event, session_id):
        plan = [
            {"action": "retry_task", "params": {"task_name": "run_momo_task"}},
            {"action": "restart_container", "params": {"container": "momo-db"}},
        ]
        return {
            "dispatch_to": "safe_action",
            "auto_execute": True,
            "action_plan": plan,
        }

    def telegram_ok(*args, **kwargs):
        return {"ok": True, "sent": 1, "failed": 0, "chat_ids": [123], "errors": []}

    monkeypatch.setattr(event_router, "_is_event_silenced", lambda event: False)
    monkeypatch.setitem(agent_actions.SAFE_ACTIONS, "retry_task", fake_retry_task)
    monkeypatch.setattr(event_router, "_handle_l2", planned_l2)
    monkeypatch.setattr(event_router, "send_telegram_with_result", telegram_ok)

    failure_event = {
        "source": "Scheduler.MOMO",
        "event_type": "scheduler_task_failure",
        "severity": "alert",
        "title": "MOMO 任務異常",
        "summary": "boom",
        "payload": {"task_name": "run_momo_task"},
    }
    result = asyncio.run(event_router.dispatch(failure_event))

    assert result["tier"] == "L2"
    assert calls == [{"task_name": "run_momo_task"}]
    executed = result["payload"]["executed_actions"]
    assert executed[0]["status"] == "ok"
    assert executed[1]["status"] == "rejected"
def test_dispatch_dedupes_repeated_event(monkeypatch):
    """Dispatching the same event twice within its TTL sends only once.

    The event carries ``dedup_ttl_sec`` of 60; the second dispatch is expected
    to be flagged as deduped and to send nothing.

    The module-level ``_EVENT_DEDUP`` store is cleared before the dispatches
    and again afterwards, so the entry recorded here cannot leak into other
    tests running in the same process.
    """
    import services.event_router as event_router

    sent = []

    def recording_send(*args, **kwargs):
        # Count every delivery attempt that reaches the Telegram sender.
        sent.append(True)
        return {"ok": True, "sent": 1, "failed": 0, "chat_ids": [123], "errors": []}

    monkeypatch.setattr(event_router, "_is_event_silenced", lambda event: False)
    monkeypatch.setattr(event_router, "send_telegram_with_result", recording_send)

    event = {
        "source": "Scheduler.BackupMonitor",
        "event_type": "backup_monitor_alert",
        "severity": "alert",
        "title": "備份健康告警",
        "summary": "stale backup",
        "dedup_ttl_sec": 60,
    }

    event_router._EVENT_DEDUP.clear()
    try:
        first = asyncio.run(event_router.dispatch(event))
        second = asyncio.run(event_router.dispatch(event))
    finally:
        # Shared state is not managed by monkeypatch; reset it explicitly even
        # when a dispatch raises.
        event_router._EVENT_DEDUP.clear()

    assert first["deduped"] is False
    assert second["deduped"] is True
    assert second["sent"] == 0
    assert len(sent) == 1
def test_dispatch_decision_envelope_skips_ai_handler(monkeypatch):
    """An event carrying a decision envelope is delivered without the AI tiers.

    Both ``_handle_l1`` and ``_handle_l2`` are replaced with a coroutine that
    fails the test if awaited.  The test then inspects the single Telegram
    message that was sent: it must contain the decision-envelope sections and
    price evidence, must not contain the legacy raw message, and must carry an
    inline keyboard whose first button's callback data starts with
    ``momo:eig:``.
    """
    import services.event_router as event_router

    sent = []

    async def should_not_run(*args, **kwargs):
        raise AssertionError("decision envelope event should not enter AI tier")

    def capture_send(message, **kwargs):
        # Keep both the rendered text and the keyword options for inspection.
        sent.append((message, kwargs))
        return {
            "ok": True,
            "sent": 1,
            "failed": 0,
            "chat_ids": [123],
            "errors": [],
        }

    monkeypatch.setattr(event_router, "_is_event_silenced", lambda event: False)
    for handler_name in ("_handle_l1", "_handle_l2"):
        monkeypatch.setattr(event_router, handler_name, should_not_run)
    monkeypatch.setattr(event_router, "send_telegram_with_result", capture_send)

    subject = {
        "sku": "SKU-1",
        "name": "測試精華液",
        "momo_price": 120,
        "competitor_price": 100,
        "competitor_product_id": "PC-1",
        "competitor_product_name": "PChome 測試精華液",
    }
    match_evidence = {
        "type": "match",
        "metric": "match_score",
        "value": 0.93,
        "basis": "exact/total_price/price_alert_exact",
        "confidence": 0.93,
    }
    envelope = {
        "decision_id": "nemotron:price_alert:SKU-1:abcdef12",
        "source_agent": "nemotron",
        "decision_type": "price_alert",
        "severity": "P1",
        "confidence": 0.91,
        "subject": subject,
        "evidence": [match_evidence],
        "recommended_action": {"action": "price_follow_review", "owner": "營運", "requires_hitl": True},
        "guardrails": {
            "can_auto_execute": False,
            "data_quality": "complete",
            "blocked_reason": "價格調整需人工覆核",
        },
        "expected_impact": {
            "gap_amount": 20,
            "candidate_gap_pct": 20,
        },
    }
    legacy_text = "legacy raw message should not become the decision brief"
    dispatch_event = {
        "source": "NemoTron.Dispatcher",
        "event_type": "nemoton_dispatch_alert",
        "severity": "alert",
        "title": "NemoTron 派發器告警",
        "summary": legacy_text,
        "decision_envelope": envelope,
        "payload": {"raw_message": legacy_text},
    }

    result = asyncio.run(event_router.dispatch(dispatch_event))

    assert result["delivered"] is True
    assert result["payload"]["status"] == "decision_envelope_direct"
    assert len(sent) == 1

    message, kwargs = sent[0]
    expected_fragments = (
        "🧭 <b>決策信封</b>",
        "🎯 <b>標的</b>",
        "📊 <b>價格證據</b>",
        "🧩 <b>比對證據</b>",
        "✅ <b>人工下一步</b>",
        "🚦 <b>通知分級</b>",
        "直接價格威脅",
        "高信心同款 / 總價可比 / 可直接價格告警",
        "SKU<code>SKU-1</code>",
        "MOMO<b>NT$ 120</b>",
        "PChome<b>NT$ 100</b>",
        "價差:<b>+20.0%</b> / NT$ 20",
        "PChome<code>PC-1</code>",
        "exact/total_price/price_alert_exact",
    )
    for fragment in expected_fragments:
        assert fragment in message
    assert "legacy raw message" not in message

    first_button = kwargs["reply_markup"]["inline_keyboard"][0][0]
    assert first_button["callback_data"].startswith("momo:eig:")
def test_price_decision_template_marks_unit_price_review_as_non_direct_alert():
    """A unit-price review envelope renders as a review, not a direct alert.

    Formats an envelope whose guardrails declare
    ``same_product_different_pack`` / ``unit_price`` / ``unit_price_review``
    and checks that the joined output contains the notification-tier section
    and the unit-price review wording.
    """
    from services.telegram_templates import _format_decision_envelope

    subject = {
        "sku": "SKU-UNIT",
        "name": "測試乳液 500ml x2",
        "momo_price": 1200,
        "competitor_price": 950,
        "competitor_product_id": "PCH-UNIT",
        "competitor_product_name": "測試乳液 1000ml",
    }
    guardrails = {
        "can_auto_execute": False,
        "data_quality": "partial",
        "match_type": "same_product_different_pack",
        "price_basis": "unit_price",
        "alert_tier": "unit_price_review",
    }
    envelope = {
        "decision_id": "review_queue:SKU-UNIT",
        "decision_type": "pchome_match_review",
        "severity": "P2",
        "confidence": 0.82,
        "subject": subject,
        "evidence": [
            {
                "type": "match",
                "metric": "match_score",
                "value": 0.82,
                "basis": "same_product_different_pack/unit_price/unit_price_review",
            }
        ],
        "recommended_action": {"action": "unit_price_required", "owner": "營運", "requires_hitl": True},
        "guardrails": guardrails,
    }

    rendered_lines = _format_decision_envelope(envelope)
    message = "\n".join(rendered_lines)

    for fragment in (
        "🚦 <b>通知分級</b>",
        "單位價覆核",
        "同商品不同包裝 / 單位價可比 / 單位價覆核",
        "禁止用總價直接判定價格威脅",
        "改用單位價覆核,不寫總價型價差",
    ):
        assert fragment in message
def test_replay_failed_deliveries_removes_successful_records(tmp_path, monkeypatch):
    """A queued record that replays successfully is removed from the queue.

    Seeds the patched queue file with one failed-delivery record, stubs the
    Telegram sender to succeed, and checks the replay counters, the replayed
    message text, and that the queue file no longer exists afterwards.
    """
    import services.event_router as event_router

    stored_record = {
        "ts": "2026-04-29T00:00:00",
        "reason": "telegram_delivery_failed",
        "tier": "L1",
        "event_key": "Scheduler.Test:test",
        "event": {"title": "舊告警", "summary": "需要回放"},
        "message": "queued message",
        "errors": ["HTTP 500"],
    }
    queue_file = tmp_path / "failed.jsonl"
    serialized = json.dumps(stored_record, ensure_ascii=False)
    queue_file.write_text(serialized + "\n", encoding="utf-8")
    monkeypatch.setattr(event_router, "_QUEUE_PATH", str(queue_file))

    sent = []

    def capture_send(message, **kwargs):
        sent.append(message)
        return {"ok": True, "sent": 1, "failed": 0, "chat_ids": [123], "errors": []}

    monkeypatch.setattr(event_router, "send_telegram_with_result", capture_send)

    result = event_router.replay_failed_deliveries(limit=10)

    assert result == {"attempted": 1, "sent": 1, "failed": 0, "dropped": 0}
    replayed_text = sent[0]
    assert "EventRouter Queue Replay" in replayed_text
    # The stored "message" field must not be forwarded verbatim.
    assert "queued message" not in replayed_text
    assert not queue_file.exists()
def test_queued_record_message_rebuilds_safe_compact_html():
    """The replay message is rebuilt from the record, escaped and bounded.

    The record holds angle brackets in its title, summary, trace and errors,
    plus a long repeated trace.  The rebuilt message must not reuse the stored
    ``message`` field, must show the brackets only in HTML-escaped form, and
    must be shorter than 4096 characters.
    """
    import services.event_router as event_router

    long_trace = "#0 0xabc <unknown>\n" * 200
    queued_event = {
        "title": "促銷爬蟲 <失敗>",
        "summary": "Alert Text: 很抱歉此EDM不存在 <unknown>",
        "trace": long_trace,
    }
    record = {
        "ts": "2026-05-21T13:00:00",
        "reason": "telegram_delivery_failed",
        "event_key": "Scheduler.Festival:festival_task_failure",
        "event": queued_event,
        "message": "<pre>原始壞訊息 <unknown></pre>",
        "errors": ["-1003940688311:HTTP 400 <bad html>"],
    }

    message = event_router._queued_record_message(record)

    assert "EventRouter Queue Replay" in message
    for forbidden in ("原始壞訊息", "<unknown>"):
        assert forbidden not in message
    for escaped in ("&lt;unknown&gt;", "&lt;失敗&gt;"):
        assert escaped in message
    assert len(message) < 4096
def test_replay_failed_deliveries_keeps_failed_records(tmp_path, monkeypatch):
    """A record whose replay fails stays in the queue with an error marker.

    Seeds the patched queue file with one record, stubs the Telegram sender to
    report failure, and checks the replay counters and that the queue file
    text contains ``last_replay_error``.
    """
    import services.event_router as event_router

    pending_record = {"event_key": "Scheduler.Test:test", "event": {"title": "舊告警"}, "message": "queued"}
    queue_file = tmp_path / "failed.jsonl"
    # Default json.dumps settings (ASCII-escaped output) are kept on purpose.
    queue_file.write_text(json.dumps(pending_record) + "\n", encoding="utf-8")
    monkeypatch.setattr(event_router, "_QUEUE_PATH", str(queue_file))

    def failing_send(*args, **kwargs):
        return {"ok": False, "sent": 0, "failed": 1, "chat_ids": [123], "errors": ["HTTP 500"]}

    monkeypatch.setattr(event_router, "send_telegram_with_result", failing_send)

    result = event_router.replay_failed_deliveries(limit=10)

    assert result["attempted"] == 1
    assert result["failed"] == 1
    remaining_text = queue_file.read_text(encoding="utf-8")
    assert "last_replay_error" in remaining_text