# test_aider_event_processor | 2026-04-22 @ Asia/Taipei
# Refactor: removed FakeRepo/FakeSession (they violated feedback_no_mock_testing.md).
# Approach: _process_one takes a _session_factory DI parameter, so the tests
# inject a session backed by a real asyncpg connection.
# Redis xack + IncidentEngine are still mocked (external broker / AI service,
# which falls under the "external API exception").
"""Unit tests for AiderEventProcessor — real DB plus mocked external services."""

import json
import os
from contextlib import asynccontextmanager, contextmanager
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine

from src.repositories.aider_event_repository import AiderEventRepository
from src.workers.aider_event_processor import AiderEventProcessor

TAIPEI = timezone(timedelta(hours=8))

# NOTE(review): the fallback URL embeds a username/password and an internal
# host in source control. Prefer supplying TEST_DATABASE_URL from the
# environment and rotating this credential — confirm with the DB owner.
_DEV_DB_URL = os.environ.get(
    "TEST_DATABASE_URL",
    "postgresql+asyncpg://awoooi:awoooi_prod_2026@192.168.0.188:5432/awoooi_dev?ssl=disable",
)

# session_id carried by every payload built by _payload_dict and used to count
# the rows written during a test.
_SESSION_ID = "test-s1"


# =============================================================================
# Real DB session factory fixture (rolled back after every test)
# =============================================================================

@pytest_asyncio.fixture
async def real_factory():
    """Yield a session factory bound to a real PostgreSQL connection.

    Every call to the yielded factory returns an async context manager that
    hands out the *same* ``AsyncSession``, so code written as
    ``async with factory() as session:`` works unchanged and a test can read
    back what the code under test wrote. The session is bound to a connection
    on which a transaction was begun here; that transaction is rolled back on
    teardown.
    """
    engine = create_async_engine(_DEV_DB_URL, echo=False)
    try:
        # ``async with`` guarantees the connection is released even when
        # ``begin()`` or any later setup step raises.
        async with engine.connect() as conn:
            await conn.begin()
            session = AsyncSession(bind=conn, expire_on_commit=False, autoflush=False)

            @asynccontextmanager
            async def _shared_session():
                # Leaving the ``async with`` block must NOT close the session:
                # the same session is reused for the test's verification query.
                yield session

            try:
                yield _shared_session
            finally:
                # Close the session first, then undo everything on the
                # connection (SQLAlchemy's external-transaction test recipe).
                await session.close()
                await conn.rollback()
    finally:
        await engine.dispose()


def _payload_dict(type_="error"):
    """Build an aider event dict of the given ``type_``.

    The default inner payload describes an ``api_rate_limit`` error; for
    ``"session_start"`` it is replaced with session-start metadata.
    """
    base = {
        "ts": datetime.now(TAIPEI).isoformat(),
        "session_id": _SESSION_ID,
        "host": "ogt-mac",
        "type": type_,
        "payload": {
            "cwd": "/r",
            "model": "elephant-alpha",
            "kind": "api_rate_limit",
            "message": "429",
            "context_50chars": "",
        },
    }
    if type_ == "session_start":
        base["payload"] = {
            "cwd": "/r",
            "model": "m",
            "aider_args": [],
            "aider_pid": 1,
            "cli_version": "0",
        }
    return base


# =============================================================================
# Test helpers
# =============================================================================

def _fake_redis():
    """Return a Redis stand-in whose ``xack`` is an ``AsyncMock``."""
    client = MagicMock()
    client.xack = AsyncMock()
    return client


def _fake_engine(**process_signal_kwargs):
    """Return an incident-engine stand-in.

    ``process_signal_kwargs`` are forwarded to the ``AsyncMock`` that replaces
    ``process_signal`` (e.g. ``return_value=...`` or ``side_effect=...``).
    """
    engine = MagicMock()
    engine.process_signal = AsyncMock(**process_signal_kwargs)
    return engine


def _stream_entry(type_="error"):
    """Return the stream fields dict: JSON-encoded event under ``b"payload"``."""
    return {b"payload": json.dumps(_payload_dict(type_)).encode()}


@contextmanager
def _patched_externals(engine, redis_client):
    """Patch the processor module's incident-engine and Redis getters."""
    with patch("src.workers.aider_event_processor.get_incident_engine", return_value=engine), \
            patch("src.workers.aider_event_processor.get_worker_redis", return_value=redis_client):
        yield


async def _stored_event_count(factory):
    """Count the rows stored for ``_SESSION_ID`` through the given factory."""
    async with factory() as sess:
        return await AiderEventRepository(sess).count_by_session(_SESSION_ID)


# =============================================================================
# Tests — real DB writes
# =============================================================================

@pytest.mark.asyncio
async def test_process_one_error_event_creates_incident_and_writes_db(real_factory):
    """An ``error`` event calls the incident engine, is ACKed, and is stored."""
    incident = MagicMock()
    incident.incident_id = "inc-123"
    engine = _fake_engine(return_value=incident)
    redis_client = _fake_redis()

    proc = AiderEventProcessor()
    # Patch external services (Redis + IncidentEngine); the DB stays real.
    proc._redis = redis_client
    data = _stream_entry("error")

    with _patched_externals(engine, redis_client):
        await proc._process_one("stream", "1-0", data, _session_factory=real_factory)

    engine.process_signal.assert_called()
    redis_client.xack.assert_called_once()

    # Verify the row really reached the database.
    assert await _stored_event_count(real_factory) == 1


@pytest.mark.asyncio
async def test_process_one_session_start_no_incident(real_factory):
    """A ``session_start`` event is stored and ACKed without an incident."""
    engine = _fake_engine()
    redis_client = _fake_redis()

    proc = AiderEventProcessor()
    data = _stream_entry("session_start")

    with _patched_externals(engine, redis_client):
        await proc._process_one("stream", "1-0", data, _session_factory=real_factory)

    engine.process_signal.assert_not_called()  # session_start creates no incident
    redis_client.xack.assert_called_once()

    assert await _stored_event_count(real_factory) == 1


@pytest.mark.asyncio
async def test_process_one_malformed_payload_acks_and_skips(monkeypatch):
    """Malformed JSON must be ACKed so it does not stay pending; no DB access."""
    redis_client = _fake_redis()
    monkeypatch.setattr(
        "src.workers.aider_event_processor.get_worker_redis",
        lambda: redis_client,
    )

    proc = AiderEventProcessor()
    data = {b"payload": b"this is not json"}

    await proc._process_one("stream", "1-0", data)

    redis_client.xack.assert_called_once()


@pytest.mark.asyncio
async def test_incident_failure_still_writes_db(real_factory):
    """When the incident engine fails, the event must still be stored (no data loss)."""
    engine = _fake_engine(side_effect=RuntimeError("engine down"))
    redis_client = _fake_redis()

    proc = AiderEventProcessor()
    data = _stream_entry("error")

    with _patched_externals(engine, redis_client):
        await proc._process_one("stream", "1-0", data, _session_factory=real_factory)

    redis_client.xack.assert_called_once()  # still ACKed

    # The row is written even though the incident engine raised.
    assert await _stored_event_count(real_factory) == 1