# test_aider_event_processor | 2026-04-20 @ Asia/Taipei
"""Unit tests for AiderEventProcessor."""
import json
from datetime import datetime, timezone, timedelta
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from src.workers.aider_event_processor import AiderEventProcessor

TAIPEI = timezone(timedelta(hours=8))

# Dotted path of the module under test; every collaborator is patched on it.
_MODULE = "src.workers.aider_event_processor"


def _payload_dict():
    """Return a baseline aider event payload (an ``error`` event)."""
    return {
        "ts": datetime.now(TAIPEI).isoformat(),
        "session_id": "s1",
        "host": "ogt-mac",
        "type": "error",
        "payload": {
            "cwd": "/r",
            "model": "elephant-alpha",
            "kind": "api_rate_limit",
            "message": "429",
            "context_50chars": "",
        },
    }


def _encode(payload):
    """Wrap *payload* in the stream-entry dict shape passed to ``_process_one``."""
    return {b"payload": json.dumps(payload).encode()}


def _fake_engine(**process_signal_kwargs):
    """Build an incident-engine double whose ``process_signal`` is an AsyncMock.

    Keyword arguments are forwarded to ``AsyncMock`` (for example
    ``return_value=...`` or ``side_effect=...``).
    """
    engine = MagicMock()
    engine.process_signal = AsyncMock(**process_signal_kwargs)
    return engine


def _install_fakes(monkeypatch, engine):
    """Patch the processor module's collaborators with in-memory doubles.

    Replaces ``get_incident_engine``, ``AiderEventRepository``,
    ``get_session_factory`` and ``get_worker_redis`` on the module under test.

    Args:
        monkeypatch: The pytest ``monkeypatch`` fixture.
        engine: Object returned by the patched ``get_incident_engine``.

    Returns:
        A ``(inserted, fake_redis)`` tuple. ``inserted`` is a dict that
        collects the keyword arguments of every ``FakeRepo.insert`` call;
        ``fake_redis`` is a ``MagicMock`` whose ``xack`` is an ``AsyncMock``.
    """
    inserted = {}

    class FakeRepo:
        def __init__(self, sess):
            pass

        async def insert(self, **kw):
            inserted.update(kw)
            return 1

    class FakeSession:
        async def __aenter__(self):
            return self

        async def __aexit__(self, *a):
            return False

        async def commit(self):
            pass

    fake_r = MagicMock()
    fake_r.xack = AsyncMock()

    monkeypatch.setattr(f"{_MODULE}.get_incident_engine", lambda: engine)
    monkeypatch.setattr(f"{_MODULE}.AiderEventRepository", FakeRepo)
    monkeypatch.setattr(
        f"{_MODULE}.get_session_factory", lambda: (lambda: FakeSession())
    )
    monkeypatch.setattr(f"{_MODULE}.get_worker_redis", lambda: fake_r)
    return inserted, fake_r


@pytest.mark.asyncio
async def test_process_one_error_event_creates_incident_and_writes_db(monkeypatch):
    """An error event should create an incident and be written to the DB."""
    fake_incident = MagicMock()
    fake_incident.incident_id = "inc-123"
    fake_engine = _fake_engine(return_value=fake_incident)
    inserted, fake_r = _install_fakes(monkeypatch, fake_engine)

    # Act
    proc = AiderEventProcessor()
    await proc._process_one("stream", "1-0", _encode(_payload_dict()))

    # Assert
    # assert_awaited (not just .called) proves the coroutine actually ran.
    fake_engine.process_signal.assert_awaited()
    assert inserted.get("incident_id") == "inc-123"
    assert inserted.get("type_") == "error"
    fake_r.xack.assert_called_once()


@pytest.mark.asyncio
async def test_process_one_session_start_no_incident(monkeypatch):
    """session_start should not create an incident but should still be written to the DB."""
    fake_engine = _fake_engine()
    inserted, fake_r = _install_fakes(monkeypatch, fake_engine)

    # Act
    proc = AiderEventProcessor()
    payload = _payload_dict()
    payload["type"] = "session_start"
    payload["payload"] = {
        "cwd": "/r",
        "model": "m",
        "aider_args": [],
        "aider_pid": 1,
        "cli_version": "0",
    }
    await proc._process_one("stream", "1-0", _encode(payload))

    # Assert
    fake_engine.process_signal.assert_not_called()  # session_start creates no incident
    assert inserted.get("incident_id") is None
    assert inserted.get("type_") == "session_start"  # the DB row is still written
    fake_r.xack.assert_called_once()


@pytest.mark.asyncio
async def test_process_one_malformed_payload_acks_and_skips(monkeypatch, caplog):
    """Malformed JSON should be ACKed (so it does not stay pending) without creating a DB record."""
    # Install the same doubles as the other tests so the "no DB record" and
    # "no incident" expectations are actually checked, and so a regression
    # cannot reach the real engine or database.
    # NOTE: caplog is kept in the signature but not asserted on; the expected
    # log output is not pinned by this test.
    fake_engine = _fake_engine()
    inserted, fake_r = _install_fakes(monkeypatch, fake_engine)

    # Act
    proc = AiderEventProcessor()
    data = {b"payload": b"this is not json"}
    await proc._process_one("stream", "1-0", data)

    # Assert
    fake_r.xack.assert_called_once()  # bad payload is ACKed to avoid a stuck pending entry
    assert inserted == {}  # nothing was written to the DB
    fake_engine.process_signal.assert_not_called()


@pytest.mark.asyncio
async def test_incident_failure_still_writes_db(monkeypatch):
    """When the incident engine fails, the event must still reach the DB (no data loss)."""
    fake_engine = _fake_engine(side_effect=RuntimeError("engine down"))
    inserted, fake_r = _install_fakes(monkeypatch, fake_engine)

    # Act
    proc = AiderEventProcessor()
    await proc._process_one("stream", "1-0", _encode(_payload_dict()))

    # Assert
    assert inserted.get("type_") == "error"
    assert inserted.get("incident_id") is None  # engine failed, so there is no id
    fake_r.xack.assert_called_once()  # still ACKed