import asyncio import json def test_dispatch_degrades_and_queues_when_ai_or_telegram_fails(tmp_path, monkeypatch): import services.event_router as event_router queue_path = tmp_path / "event_router_failed.jsonl" monkeypatch.setattr(event_router, "_QUEUE_PATH", str(queue_path)) monkeypatch.setattr(event_router, "_is_event_silenced", lambda event: False) async def broken_l1(event, session_id): raise RuntimeError("hermes unavailable") monkeypatch.setattr(event_router, "_handle_l1", broken_l1) monkeypatch.setattr( event_router, "send_telegram_with_result", lambda *args, **kwargs: { "ok": False, "sent": 0, "failed": 1, "chat_ids": [123], "errors": ["123:HTTP 500"], }, ) result = asyncio.run(event_router.dispatch({ "source": "Scheduler.Test", "event_type": "crawler_timeout", "severity": "warning", "title": "測試任務異常", "summary": "timeout", "trace": "Traceback...", })) assert result["tier"] == "L1" assert result["delivered"] is False assert result["queued"] is True assert result["payload"]["status"] == "degraded" queued = json.loads(queue_path.read_text(encoding="utf-8").strip()) assert queued["reason"] == "telegram_delivery_failed" assert queued["event_key"] == "Scheduler.Test:crawler_timeout" def test_dispatch_executes_only_auto_safe_actions(monkeypatch): import services.agent_actions as agent_actions import services.event_router as event_router calls = [] monkeypatch.setattr(event_router, "_is_event_silenced", lambda event: False) monkeypatch.setitem( agent_actions.SAFE_ACTIONS, "retry_task", lambda **params: calls.append(params) or {"status": "scheduled"}, ) async def planned_l2(event, session_id): return { "dispatch_to": "safe_action", "auto_execute": True, "action_plan": [ {"action": "retry_task", "params": {"task_name": "run_momo_task"}}, {"action": "restart_container", "params": {"container": "momo-db"}}, ], } monkeypatch.setattr(event_router, "_handle_l2", planned_l2) monkeypatch.setattr( event_router, "send_telegram_with_result", lambda *args, **kwargs: {"ok": True, "sent": 1, "failed": 0, "chat_ids": [123], "errors": []}, ) result = asyncio.run(event_router.dispatch({ "source": "Scheduler.MOMO", "event_type": "scheduler_task_failure", "severity": "alert", "title": "MOMO 任務異常", "summary": "boom", "payload": {"task_name": "run_momo_task"}, })) assert result["tier"] == "L2" assert calls == [{"task_name": "run_momo_task"}] assert result["payload"]["executed_actions"][0]["status"] == "ok" assert result["payload"]["executed_actions"][1]["status"] == "rejected" def test_dispatch_dedupes_repeated_event(monkeypatch): import services.event_router as event_router event_router._EVENT_DEDUP.clear() sent = [] monkeypatch.setattr(event_router, "_is_event_silenced", lambda event: False) monkeypatch.setattr( event_router, "send_telegram_with_result", lambda *args, **kwargs: sent.append(True) or {"ok": True, "sent": 1, "failed": 0, "chat_ids": [123], "errors": []}, ) event = { "source": "Scheduler.BackupMonitor", "event_type": "backup_monitor_alert", "severity": "alert", "title": "備份健康告警", "summary": "stale backup", "dedup_ttl_sec": 60, } first = asyncio.run(event_router.dispatch(event)) second = asyncio.run(event_router.dispatch(event)) assert first["deduped"] is False assert second["deduped"] is True assert second["sent"] == 0 assert len(sent) == 1 def test_dispatch_decision_envelope_skips_ai_handler(monkeypatch): import services.event_router as event_router sent = [] monkeypatch.setattr(event_router, "_is_event_silenced", lambda event: False) async def should_not_run(*args, **kwargs): raise AssertionError("decision envelope event should not enter AI tier") monkeypatch.setattr(event_router, "_handle_l1", should_not_run) monkeypatch.setattr(event_router, "_handle_l2", should_not_run) monkeypatch.setattr( event_router, "send_telegram_with_result", lambda message, **kwargs: sent.append((message, kwargs)) or { "ok": True, "sent": 1, "failed": 0, "chat_ids": [123], "errors": [], }, ) envelope = { "decision_id": "nemotron:price_alert:SKU-1:abcdef12", "source_agent": "nemotron", "decision_type": "price_alert", "severity": "P1", "confidence": 0.91, "subject": { "sku": "SKU-1", "name": "測試精華液", "momo_price": 120, "competitor_price": 100, "competitor_product_id": "PC-1", "competitor_product_name": "PChome 測試精華液", }, "evidence": [ { "type": "match", "metric": "match_score", "value": 0.93, "basis": "exact/total_price/price_alert_exact", "confidence": 0.93, } ], "recommended_action": {"action": "price_follow_review", "owner": "營運", "requires_hitl": True}, "guardrails": { "can_auto_execute": False, "data_quality": "complete", "blocked_reason": "價格調整需人工覆核", }, "expected_impact": { "gap_amount": 20, "candidate_gap_pct": 20, }, } result = asyncio.run(event_router.dispatch({ "source": "NemoTron.Dispatcher", "event_type": "nemoton_dispatch_alert", "severity": "alert", "title": "NemoTron 派發器告警", "summary": "legacy raw message should not become the decision brief", "decision_envelope": envelope, "payload": {"raw_message": "legacy raw message should not become the decision brief"}, })) assert result["delivered"] is True assert result["payload"]["status"] == "decision_envelope_direct" assert len(sent) == 1 message, kwargs = sent[0] assert "🧭 決策信封" in message assert "🎯 標的" in message assert "📊 價格證據" in message assert "🧩 比對證據" in message assert "✅ 人工下一步" in message assert "🚦 通知分級" in message assert "直接價格威脅" in message assert "高信心同款 / 總價可比 / 可直接價格告警" in message assert "SKU:SKU-1" in message assert "MOMO:NT$ 120" in message assert "PChome:NT$ 100" in message assert "價差:+20.0% / NT$ 20" in message assert "PChome:PC-1" in message assert "exact/total_price/price_alert_exact" in message assert "legacy raw message" not in message assert kwargs["reply_markup"]["inline_keyboard"][0][0]["callback_data"].startswith("momo:eig:") def test_price_decision_template_marks_unit_price_review_as_non_direct_alert(): from services.telegram_templates import _format_decision_envelope envelope = { "decision_id": "review_queue:SKU-UNIT", "decision_type": "pchome_match_review", "severity": "P2", "confidence": 0.82, "subject": { "sku": "SKU-UNIT", "name": "測試乳液 500ml x2", "momo_price": 1200, "competitor_price": 950, "competitor_product_id": "PCH-UNIT", "competitor_product_name": "測試乳液 1000ml", }, "evidence": [ { "type": "match", "metric": "match_score", "value": 0.82, "basis": "same_product_different_pack/unit_price/unit_price_review", } ], "recommended_action": {"action": "unit_price_required", "owner": "營運", "requires_hitl": True}, "guardrails": { "can_auto_execute": False, "data_quality": "partial", "match_type": "same_product_different_pack", "price_basis": "unit_price", "alert_tier": "unit_price_review", }, } message = "\n".join(_format_decision_envelope(envelope)) assert "🚦 通知分級" in message assert "單位價覆核" in message assert "同商品不同包裝 / 單位價可比 / 單位價覆核" in message assert "禁止用總價直接判定價格威脅" in message assert "改用單位價覆核,不寫總價型價差" in message def test_replay_failed_deliveries_removes_successful_records(tmp_path, monkeypatch): import services.event_router as event_router queue_path = tmp_path / "failed.jsonl" queue_path.write_text( json.dumps({ "ts": "2026-04-29T00:00:00", "reason": "telegram_delivery_failed", "tier": "L1", "event_key": "Scheduler.Test:test", "event": {"title": "舊告警", "summary": "需要回放"}, "message": "queued message", "errors": ["HTTP 500"], }, ensure_ascii=False) + "\n", encoding="utf-8", ) monkeypatch.setattr(event_router, "_QUEUE_PATH", str(queue_path)) sent = [] monkeypatch.setattr( event_router, "send_telegram_with_result", lambda message, **kwargs: sent.append(message) or {"ok": True, "sent": 1, "failed": 0, "chat_ids": [123], "errors": []}, ) result = event_router.replay_failed_deliveries(limit=10) assert result == {"attempted": 1, "sent": 1, "failed": 0, "dropped": 0} assert "EventRouter Queue Replay" in sent[0] assert "queued message" not in sent[0] assert not queue_path.exists() def test_queued_record_message_rebuilds_safe_compact_html(): import services.event_router as event_router record = { "ts": "2026-05-21T13:00:00", "reason": "telegram_delivery_failed", "event_key": "Scheduler.Festival:festival_task_failure", "event": { "title": "促銷爬蟲 <失敗>", "summary": "Alert Text: 很抱歉此EDM不存在 ", "trace": "#0 0xabc \n" * 200, }, "message": "
原始壞訊息 
", "errors": ["-1003940688311:HTTP 400 "], } message = event_router._queued_record_message(record) assert "EventRouter Queue Replay" in message assert "原始壞訊息" not in message assert "" not in message assert "<unknown>" in message assert "<失敗>" in message assert len(message) < 4096 def test_replay_failed_deliveries_keeps_failed_records(tmp_path, monkeypatch): import services.event_router as event_router queue_path = tmp_path / "failed.jsonl" queue_path.write_text( json.dumps({"event_key": "Scheduler.Test:test", "event": {"title": "舊告警"}, "message": "queued"}) + "\n", encoding="utf-8", ) monkeypatch.setattr(event_router, "_QUEUE_PATH", str(queue_path)) monkeypatch.setattr( event_router, "send_telegram_with_result", lambda *args, **kwargs: {"ok": False, "sent": 0, "failed": 1, "chat_ids": [123], "errors": ["HTTP 500"]}, ) result = event_router.replay_failed_deliveries(limit=10) assert result["attempted"] == 1 assert result["failed"] == 1 assert "last_replay_error" in queue_path.read_text(encoding="utf-8")