This commit is contained in:
@@ -25,6 +25,8 @@ _QUEUE_LOCK = threading.Lock()
|
||||
_DEDUP_LOCK = threading.Lock()
|
||||
_EVENT_DEDUP: Dict[str, float] = {}
|
||||
_DEFAULT_DEDUP_SEC = int(os.getenv("MOMO_EVENT_ROUTER_DEFAULT_DEDUP_SEC", "0"))
|
||||
_REPLAY_ON_SUCCESS = os.getenv("MOMO_EVENT_ROUTER_REPLAY_ON_SUCCESS", "true").lower() == "true"
|
||||
_REPLAY_LIMIT = int(os.getenv("MOMO_EVENT_ROUTER_REPLAY_LIMIT", "3"))
|
||||
|
||||
|
||||
async def _handle_l1(event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
|
||||
@@ -166,6 +168,69 @@ def _queue_failed_delivery(
|
||||
return False
|
||||
|
||||
|
||||
def _queued_record_message(record: Dict[str, Any]) -> str:
|
||||
message = record.get("message")
|
||||
if message:
|
||||
return str(message)
|
||||
event = record.get("event") if isinstance(record.get("event"), dict) else {}
|
||||
title = event.get("title") or record.get("event_key") or "EventRouter queued event"
|
||||
summary = event.get("summary") or event.get("status") or record.get("reason") or "queued delivery replay"
|
||||
return (
|
||||
f"♻️ <b>{title}</b>\n"
|
||||
f"━━━━━━━━━━━━━━━━━━━━\n"
|
||||
f"Queue replay: {summary}\n"
|
||||
f"event_key={record.get('event_key', 'unknown')}\n"
|
||||
f"queued_at={record.get('ts', 'unknown')}"
|
||||
)
|
||||
|
||||
|
||||
def replay_failed_deliveries(limit: int = 20, admin_chat_ids: Optional[list] = None) -> Dict[str, Any]:
    """Replay queued Telegram deliveries and keep failures in the JSONL queue.

    Reads the JSONL file at ``_QUEUE_PATH``, re-sends at most ``limit`` lines
    from its head through ``send_telegram_with_result`` and rewrites the file
    with everything that is still pending. Records that fail again are kept
    in place, annotated with ``last_replay_error`` / ``last_replay_at``.

    Args:
        limit: Maximum number of queue lines processed by this call. Values
            ``<= 0`` make the call a no-op.
        admin_chat_ids: Forwarded unchanged as ``chat_ids`` to
            ``send_telegram_with_result``.

    Returns:
        Counters ``{"attempted", "sent", "failed", "dropped"}``. ``dropped``
        counts lines discarded because they do not hold a JSON object.
    """
    stats = {"attempted": 0, "sent": 0, "failed": 0, "dropped": 0}
    if limit <= 0 or not os.path.exists(_QUEUE_PATH):
        return stats

    # The lock is held for the whole read -> send -> rewrite cycle so that the
    # rewrite cannot discard lines appended in between (assumes writers take
    # _QUEUE_LOCK as well -- confirm in _queue_failed_delivery).
    with _QUEUE_LOCK:
        try:
            with open(_QUEUE_PATH, "r", encoding="utf-8") as fh:
                lines = fh.readlines()
        except FileNotFoundError:
            # The file vanished between the exists() check and open().
            return stats

        remaining = []
        for line in lines[:limit]:
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                stats["dropped"] += 1
                continue
            if not isinstance(record, dict):
                # Valid JSON but not an object (e.g. a list or string): it can
                # never be rendered, so drop it instead of letting it raise
                # and block every later replay.
                stats["dropped"] += 1
                continue

            stats["attempted"] += 1
            try:
                result = send_telegram_with_result(
                    _queued_record_message(record), chat_ids=admin_chat_ids
                )
                delivered = bool(result["ok"])
                errors = result.get("errors", [])
            except Exception as exc:
                # An unexpected sender error must not abort the loop: records
                # already delivered above would stay queued and be sent twice.
                logger.exception("[EventRouter] queued delivery replay raised: %s", exc)
                delivered = False
                errors = [f"{type(exc).__name__}: {exc}"]

            if delivered:
                stats["sent"] += 1
                continue

            stats["failed"] += 1
            record["last_replay_error"] = errors
            record["last_replay_at"] = datetime.now().isoformat()
            remaining.append(json.dumps(record, ensure_ascii=False, default=str) + "\n")

        # Lines beyond ``limit`` are carried over untouched, after the retried
        # head. NOTE(review): failures stay at the head, so records that never
        # succeed can starve the rest when ``limit`` is small.
        remaining.extend(lines[limit:])

        if remaining:
            parent = os.path.dirname(_QUEUE_PATH)
            if parent:  # makedirs("") raises for a bare file name
                os.makedirs(parent, exist_ok=True)
            # Write a sibling file and swap it in, so a crash mid-write cannot
            # leave the queue truncated.
            tmp_path = f"{_QUEUE_PATH}.tmp"
            with open(tmp_path, "w", encoding="utf-8") as fh:
                fh.writelines(remaining)
            os.replace(tmp_path, _QUEUE_PATH)
        else:
            try:
                os.remove(_QUEUE_PATH)
            except FileNotFoundError:
                pass

    return stats
|
||||
|
||||
|
||||
def _execute_safe_actions(result: Dict[str, Any], event: Dict[str, Any]) -> list[Dict[str, Any]]:
|
||||
"""
|
||||
Execute only ADR-012 L2 SAFE_ACTIONS when NemoTron explicitly marks the plan auto-safe.
|
||||
@@ -247,8 +312,11 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
|
||||
message, reply_markup = _build_telegram_message(event, tier, result)
|
||||
send_result = send_telegram_with_result(message, chat_ids=admin_chat_ids, reply_markup=reply_markup)
|
||||
queued = False
|
||||
replayed = {"attempted": 0, "sent": 0, "failed": 0, "dropped": 0}
|
||||
if not send_result["ok"]:
|
||||
queued = _queue_failed_delivery(event, tier, message, send_result["errors"], "telegram_delivery_failed")
|
||||
elif _REPLAY_ON_SUCCESS:
|
||||
replayed = replay_failed_deliveries(limit=_REPLAY_LIMIT, admin_chat_ids=admin_chat_ids)
|
||||
latency_ms = int((time.perf_counter() - started_at) * 1000)
|
||||
|
||||
return {
|
||||
@@ -261,6 +329,7 @@ async def dispatch(event: Dict[str, Any], admin_chat_ids: Optional[list] = None)
|
||||
"silenced": False,
|
||||
"queued": queued,
|
||||
"deduped": False,
|
||||
"replayed": replayed,
|
||||
}
|
||||
except Exception as e:
|
||||
logger.exception(f"[EventRouter] dispatch failed: {e}")
|
||||
|
||||
@@ -115,3 +115,56 @@ def test_dispatch_dedupes_repeated_event(monkeypatch):
|
||||
assert second["deduped"] is True
|
||||
assert second["sent"] == 0
|
||||
assert len(sent) == 1
|
||||
|
||||
|
||||
def test_replay_failed_deliveries_removes_successful_records(tmp_path, monkeypatch):
    """A record that replays successfully is sent once and the emptied queue file is removed."""
    import services.event_router as event_router

    queued_record = {
        "ts": "2026-04-29T00:00:00",
        "reason": "telegram_delivery_failed",
        "tier": "L1",
        "event_key": "Scheduler.Test:test",
        "event": {"title": "舊告警", "summary": "需要回放"},
        "message": "queued message",
        "errors": ["HTTP 500"],
    }
    queue_path = tmp_path / "failed.jsonl"
    queue_path.write_text(
        json.dumps(queued_record, ensure_ascii=False) + "\n",
        encoding="utf-8",
    )
    monkeypatch.setattr(event_router, "_QUEUE_PATH", str(queue_path))

    sent = []

    def fake_send(message, **kwargs):
        # Record the outgoing text and report a successful delivery.
        sent.append(message)
        return {"ok": True, "sent": 1, "failed": 0, "chat_ids": [123], "errors": []}

    monkeypatch.setattr(event_router, "send_telegram_with_result", fake_send)

    result = event_router.replay_failed_deliveries(limit=10)

    assert result == {"attempted": 1, "sent": 1, "failed": 0, "dropped": 0}
    assert sent == ["queued message"]
    assert not queue_path.exists()
|
||||
|
||||
|
||||
def test_replay_failed_deliveries_keeps_failed_records(tmp_path, monkeypatch):
    """A record whose replay fails stays in the queue file, annotated with the replay error."""
    import services.event_router as event_router

    queued_record = {"event_key": "Scheduler.Test:test", "event": {"title": "舊告警"}, "message": "queued"}
    queue_path = tmp_path / "failed.jsonl"
    queue_path.write_text(json.dumps(queued_record) + "\n", encoding="utf-8")
    monkeypatch.setattr(event_router, "_QUEUE_PATH", str(queue_path))

    def failing_send(*args, **kwargs):
        # Every delivery attempt reports a failure.
        return {"ok": False, "sent": 0, "failed": 1, "chat_ids": [123], "errors": ["HTTP 500"]}

    monkeypatch.setattr(event_router, "send_telegram_with_result", failing_send)

    result = event_router.replay_failed_deliveries(limit=10)

    assert result["attempted"] == 1
    assert result["failed"] == 1
    queue_text = queue_path.read_text(encoding="utf-8")
    assert "last_replay_error" in queue_text
|
||||
|
||||
Reference in New Issue
Block a user