From 5b25f55340aaee4b6cf41756bb40abef20d1f777 Mon Sep 17 00:00:00 2001
From: OoO
Date: Wed, 29 Apr 2026 23:26:02 +0800
Subject: [PATCH] =?UTF-8?q?=E8=A3=9C=E9=BD=8A=20EventRouter=20=E5=A4=B1?=
 =?UTF-8?q?=E6=95=97=E9=80=9A=E7=9F=A5=E5=9B=9E=E6=94=BE?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 services/event_router.py   | 94 ++++++++++++++++++++++++++++++++++++++
 tests/test_event_router.py | 73 +++++++++++++++++++++++++++++
 2 files changed, 167 insertions(+)

diff --git a/services/event_router.py b/services/event_router.py
index 71eb4b8..fcf2914 100644
--- a/services/event_router.py
+++ b/services/event_router.py
@@ -25,6 +25,8 @@ _QUEUE_LOCK = threading.Lock()
 _DEDUP_LOCK = threading.Lock()
 _EVENT_DEDUP: Dict[str, float] = {}
 _DEFAULT_DEDUP_SEC = int(os.getenv("MOMO_EVENT_ROUTER_DEFAULT_DEDUP_SEC", "0"))
+_REPLAY_ON_SUCCESS = os.getenv("MOMO_EVENT_ROUTER_REPLAY_ON_SUCCESS", "true").lower() == "true"
+_REPLAY_LIMIT = int(os.getenv("MOMO_EVENT_ROUTER_REPLAY_LIMIT", "3"))
 
 
 async def _handle_l1(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
@@ -166,6 +168,94 @@ def _queue_failed_delivery(
         return False
 
 
+def _queued_record_message(record: Dict[str, Any]) -> str:
+    """Return the Telegram text for a queued record.
+
+    The stored ``message`` is reused when present; otherwise a fallback
+    summary is built from the record's event fields.
+    """
+    message = record.get("message")
+    if message:
+        return str(message)
+    event = record.get("event") if isinstance(record.get("event"), dict) else {}
+    title = event.get("title") or record.get("event_key") or "EventRouter queued event"
+    summary = event.get("summary") or event.get("status") or record.get("reason") or "queued delivery replay"
+    return (
+        f"♻️ {title}\n"
+        f"━━━━━━━━━━━━━━━━━━━━\n"
+        f"Queue replay: {summary}\n"
+        f"event_key={record.get('event_key', 'unknown')}\n"
+        f"queued_at={record.get('ts', 'unknown')}"
+    )
+
+
+def replay_failed_deliveries(limit: int = 20, admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
+    """Replay queued Telegram deliveries and keep failures in the JSONL queue.
+
+    At most ``limit`` queue lines are examined per call; later lines stay
+    queued untouched. Lines that are not a JSON object are dropped because
+    they can never be delivered. A send that raises is counted as failed and
+    its record stays queued, so earlier successes are still removed.
+
+    Returns the counters ``attempted``, ``sent``, ``failed`` and ``dropped``.
+    """
+    stats = {"attempted": 0, "sent": 0, "failed": 0, "dropped": 0}
+    if limit <= 0:
+        return stats
+
+    with _QUEUE_LOCK:
+        try:
+            with open(_QUEUE_PATH, "r", encoding="utf-8") as fh:
+                lines = fh.readlines()
+        except FileNotFoundError:
+            return stats
+
+        remaining = []
+        for line in lines[:limit]:
+            try:
+                record = json.loads(line)
+            except json.JSONDecodeError:
+                record = None
+            if not isinstance(record, dict):
+                # Valid JSON that is not an object would crash the message builder.
+                stats["dropped"] += 1
+                continue
+
+            stats["attempted"] += 1
+            try:
+                result = send_telegram_with_result(_queued_record_message(record), chat_ids=admin_chat_ids)
+                delivered = bool(result["ok"])
+                errors = result.get("errors", [])
+            except Exception as exc:
+                # Keep going: aborting here would leave already-sent records queued.
+                logger.exception("[EventRouter] queue replay send failed")
+                delivered = False
+                errors = [str(exc)]
+            if delivered:
+                stats["sent"] += 1
+                continue
+
+            stats["failed"] += 1
+            record["last_replay_error"] = errors
+            record["last_replay_at"] = datetime.now().isoformat()
+            remaining.append(json.dumps(record, ensure_ascii=False, default=str) + "\n")
+
+        remaining.extend(lines[limit:])
+        if remaining:
+            # Write a sibling temp file and swap it in so a crash cannot truncate the queue.
+            tmp_path = f"{_QUEUE_PATH}.tmp"
+            with open(tmp_path, "w", encoding="utf-8") as fh:
+                fh.writelines(remaining)
+            os.replace(tmp_path, _QUEUE_PATH)
+        else:
+            try:
+                os.remove(_QUEUE_PATH)
+            except FileNotFoundError:
+                pass
+
+    return stats
+
+
 def _execute_safe_actions(result: Dict[str, Any], event: Dict[str, Any]) -> list[Dict[str, Any]]:
     """
     Execute only ADR-012 L2 SAFE_ACTIONS when NemoTron explicitly marks the plan auto-safe.
@@ -247,8 +337,11 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
         message, reply_markup = _build_telegram_message(event, tier, result)
         send_result = send_telegram_with_result(message, chat_ids=admin_chat_ids, reply_markup=reply_markup)
         queued = False
+        replayed = {"attempted": 0, "sent": 0, "failed": 0, "dropped": 0}
         if not send_result["ok"]:
             queued = _queue_failed_delivery(event, tier, message, send_result["errors"], "telegram_delivery_failed")
+        elif _REPLAY_ON_SUCCESS:
+            replayed = replay_failed_deliveries(limit=_REPLAY_LIMIT, admin_chat_ids=admin_chat_ids)
         latency_ms = int((time.perf_counter() - started_at) * 1000)
 
         return {
@@ -261,6 +354,7 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
             "silenced": False,
             "queued": queued,
             "deduped": False,
+            "replayed": replayed,
         }
     except Exception as e:
         logger.exception(f"[EventRouter] dispatch failed: {e}")
diff --git a/tests/test_event_router.py b/tests/test_event_router.py
index 07a2dee..a456d7f 100644
--- a/tests/test_event_router.py
+++ b/tests/test_event_router.py
@@ -115,3 +115,76 @@ def test_dispatch_dedupes_repeated_event(monkeypatch):
     assert second["deduped"] is True
     assert second["sent"] == 0
     assert len(sent) == 1
+
+
+def test_replay_failed_deliveries_removes_successful_records(tmp_path, monkeypatch):
+    import services.event_router as event_router
+
+    queue_path = tmp_path / "failed.jsonl"
+    queue_path.write_text(
+        json.dumps({
+            "ts": "2026-04-29T00:00:00",
+            "reason": "telegram_delivery_failed",
+            "tier": "L1",
+            "event_key": "Scheduler.Test:test",
+            "event": {"title": "舊告警", "summary": "需要回放"},
+            "message": "queued message",
+            "errors": ["HTTP 500"],
+        }, ensure_ascii=False) + "\n",
+        encoding="utf-8",
+    )
+    monkeypatch.setattr(event_router, "_QUEUE_PATH", str(queue_path))
+    sent = []
+    monkeypatch.setattr(
+        event_router,
+        "send_telegram_with_result",
+        lambda message, **kwargs: sent.append(message) or {"ok": True, "sent": 1, "failed": 0, "chat_ids": [123], "errors": []},
+    )
+
+    result = event_router.replay_failed_deliveries(limit=10)
+
+    assert result == {"attempted": 1, "sent": 1, "failed": 0, "dropped": 0}
+    assert sent == ["queued message"]
+    assert not queue_path.exists()
+
+
+def test_replay_failed_deliveries_keeps_failed_records(tmp_path, monkeypatch):
+    import services.event_router as event_router
+
+    queue_path = tmp_path / "failed.jsonl"
+    queue_path.write_text(
+        json.dumps({"event_key": "Scheduler.Test:test", "event": {"title": "舊告警"}, "message": "queued"}) + "\n",
+        encoding="utf-8",
+    )
+    monkeypatch.setattr(event_router, "_QUEUE_PATH", str(queue_path))
+    monkeypatch.setattr(
+        event_router,
+        "send_telegram_with_result",
+        lambda *args, **kwargs: {"ok": False, "sent": 0, "failed": 1, "chat_ids": [123], "errors": ["HTTP 500"]},
+    )
+
+    result = event_router.replay_failed_deliveries(limit=10)
+
+    assert result["attempted"] == 1
+    assert result["failed"] == 1
+    assert "last_replay_error" in queue_path.read_text(encoding="utf-8")
+
+
+def test_replay_failed_deliveries_drops_undeliverable_lines(tmp_path, monkeypatch):
+    import services.event_router as event_router
+
+    queue_path = tmp_path / "failed.jsonl"
+    queue_path.write_text('not json\n["not", "an", "object"]\n', encoding="utf-8")
+    monkeypatch.setattr(event_router, "_QUEUE_PATH", str(queue_path))
+    sent = []
+    monkeypatch.setattr(
+        event_router,
+        "send_telegram_with_result",
+        lambda message, **kwargs: sent.append(message) or {"ok": True, "sent": 1, "failed": 0, "chat_ids": [123], "errors": []},
+    )
+
+    result = event_router.replay_failed_deliveries(limit=10)
+
+    assert result == {"attempted": 0, "sent": 0, "failed": 0, "dropped": 2}
+    assert sent == []
+    assert not queue_path.exists()