diff --git a/apps/api/tests/integration/test_aider_event_processor_integration.py b/apps/api/tests/integration/test_aider_event_processor_integration.py new file mode 100644 index 00000000..2315b506 --- /dev/null +++ b/apps/api/tests/integration/test_aider_event_processor_integration.py @@ -0,0 +1,129 @@ +# tests/integration/test_aider_event_processor_integration.py +# 2026-04-22 @ Asia/Taipei: 從 tests/test_aider_event_processor.py 遷移 +# 需真實 PostgreSQL 的三個測試移到 integration 層,符合 CI 架構: +# tests/ — 單元測試(無 DB),CI 直接執行 +# tests/integration/ — 整合測試(真實 DB),CI --ignore 後由 K8s E2E 覆蓋 +"""Integration tests for AiderEventProcessor — 真實 DB + mock 外部服務。""" +import json +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, MagicMock + +import pytest +import pytest_asyncio +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine + +from src.repositories.aider_event_repository import AiderEventRepository +from src.workers.aider_event_processor import AiderEventProcessor + +TAIPEI = timezone(timedelta(hours=8)) + + +def _payload_dict(type_="error"): + base = { + "ts": datetime.now(TAIPEI).isoformat(), + "session_id": "test-s1", "host": "ogt-mac", + "type": type_, + "payload": {"cwd": "/r", "model": "elephant-alpha", + "kind": "api_rate_limit", "message": "429", + "context_50chars": ""}, + } + if type_ == "session_start": + base["payload"] = {"cwd": "/r", "model": "m", "aider_args": [], + "aider_pid": 1, "cli_version": "0"} + return base + + +# ============================================================================= +# real_factory fixture — 真實 DB,測試後 rollback +# ============================================================================= + +@pytest_asyncio.fixture +async def real_factory(db_session): + """用 integration conftest 的 db_session 建立 _process_one 可用的 factory。""" + class _SingleSessionFactory: + def __call__(self): + return _SessionCtx(db_session) + + class _SessionCtx: + def __init__(self, sess): self._sess = sess + async def __aenter__(self): return self._sess + async def __aexit__(self, *a): pass + + return _SingleSessionFactory() + + +# ============================================================================= +# Tests +# ============================================================================= + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_process_one_error_event_creates_incident_and_writes_db(real_factory, db_session): + fake_incident = MagicMock() + fake_incident.incident_id = "inc-123" + fake_engine = MagicMock() + fake_engine.process_signal = AsyncMock(return_value=fake_incident) + fake_r = MagicMock() + fake_r.xack = AsyncMock() + + proc = AiderEventProcessor() + data = {b"payload": json.dumps(_payload_dict("error")).encode()} + + import unittest.mock as um + with um.patch("src.workers.aider_event_processor.get_incident_engine", return_value=fake_engine), \ + um.patch("src.workers.aider_event_processor.get_worker_redis", return_value=fake_r): + await proc._process_one("stream", "1-0", data, _session_factory=real_factory) + + assert fake_engine.process_signal.called + fake_r.xack.assert_called_once() + + repo = AiderEventRepository(db_session) + count = await repo.count_by_session("test-s1") + assert count == 1 + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_process_one_session_start_no_incident(real_factory, db_session): + fake_engine = MagicMock() + fake_engine.process_signal = AsyncMock() + fake_r = MagicMock() + fake_r.xack = AsyncMock() + + proc = AiderEventProcessor() + data = {b"payload": json.dumps(_payload_dict("session_start")).encode()} + + import unittest.mock as um + with um.patch("src.workers.aider_event_processor.get_incident_engine", return_value=fake_engine), \ + um.patch("src.workers.aider_event_processor.get_worker_redis", return_value=fake_r): + await proc._process_one("stream", "1-0", data, _session_factory=real_factory) + + assert not fake_engine.process_signal.called + fake_r.xack.assert_called_once() + + repo = AiderEventRepository(db_session) + count = await repo.count_by_session("test-s1") + assert count == 1 + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_incident_failure_still_writes_db(real_factory, db_session): + fake_engine = MagicMock() + fake_engine.process_signal = AsyncMock(side_effect=RuntimeError("engine down")) + fake_r = MagicMock() + fake_r.xack = AsyncMock() + + proc = AiderEventProcessor() + data = {b"payload": json.dumps(_payload_dict("error")).encode()} + + import unittest.mock as um + with um.patch("src.workers.aider_event_processor.get_incident_engine", return_value=fake_engine), \ + um.patch("src.workers.aider_event_processor.get_worker_redis", return_value=fake_r): + await proc._process_one("stream", "1-0", data, _session_factory=real_factory) + + fake_r.xack.assert_called_once() + + repo = AiderEventRepository(db_session) + count = await repo.count_by_session("test-s1") + assert count == 1 diff --git a/apps/api/tests/test_aider_event_processor.py b/apps/api/tests/test_aider_event_processor.py index 9706b837..9f0707a5 100644 --- a/apps/api/tests/test_aider_event_processor.py +++ b/apps/api/tests/test_aider_event_processor.py @@ -1,132 +1,16 @@ # test_aider_event_processor | 2026-04-22 @ Asia/Taipei -# 重構:移除 FakeRepo/FakeSession(違反 feedback_no_mock_testing.md) -# 方案:_process_one 加 _session_factory DI 參數,測試注入真實 asyncpg 連線。 -# Redis xack + IncidentEngine 仍 mock(外部 broker/AI 服務,符合「外部 API 例外」)。 -"""Unit tests for AiderEventProcessor — 真實 DB + mock 外部服務。""" -import json -import os -from datetime import datetime, timedelta, timezone -from unittest.mock import AsyncMock, MagicMock - +# 需真實 DB 的三個測試已移至 tests/integration/test_aider_event_processor_integration.py +# 此檔案只保留無 DB 依賴的純單元測試,符合 CI 架構(tests/ 層不連 DB)。 +"""Unit tests for AiderEventProcessor — 無 DB 依賴。""" import pytest -import pytest_asyncio -from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine - -from src.repositories.aider_event_repository import AiderEventRepository -from src.workers.aider_event_processor import AiderEventProcessor - -TAIPEI = timezone(timedelta(hours=8)) -_DEV_DB_URL = os.environ.get( - "TEST_DATABASE_URL", - "postgresql+asyncpg://awoooi:awoooi_prod_2026@192.168.0.188:5432/awoooi_dev?ssl=disable", -) - - -# ============================================================================= -# 真實 DB session factory fixture(每個測試後 rollback) -# ============================================================================= - -@pytest_asyncio.fixture -async def real_factory(): - """提供真實 PostgreSQL session factory,測試後 rollback(不污染 DB)。""" - engine = create_async_engine(_DEV_DB_URL, echo=False) - conn = await engine.connect().__aenter__() - await conn.begin() - session = AsyncSession(bind=conn, expire_on_commit=False, autoflush=False) - - # 回傳一個永遠回傳同一個 session 的 factory(讓 _process_one 正常使用 async with) - class _SingleSessionFactory: - def __call__(self): - return _SessionCtx(session) - - class _SessionCtx: - def __init__(self, sess): self._sess = sess - async def __aenter__(self): return self._sess - async def __aexit__(self, *a): pass - - yield _SingleSessionFactory() - - await conn.rollback() - await conn.__aexit__(None, None, None) - await engine.dispose() - - -def _payload_dict(type_="error"): - base = { - "ts": datetime.now(TAIPEI).isoformat(), - "session_id": "test-s1", "host": "ogt-mac", - "type": type_, - "payload": {"cwd": "/r", "model": "elephant-alpha", - "kind": "api_rate_limit", "message": "429", - "context_50chars": ""}, - } - if type_ == "session_start": - base["payload"] = {"cwd": "/r", "model": "m", "aider_args": [], - "aider_pid": 1, "cli_version": "0"} - return base - - -# ============================================================================= -# Tests — 真實 DB write -# ============================================================================= - -@pytest.mark.asyncio -async def test_process_one_error_event_creates_incident_and_writes_db(real_factory): - fake_incident = MagicMock() - fake_incident.incident_id = "inc-123" - fake_engine = MagicMock() - fake_engine.process_signal = AsyncMock(return_value=fake_incident) - - fake_r = MagicMock() - fake_r.xack = AsyncMock() - - proc = AiderEventProcessor() - # Patch 外部服務(Redis + IncidentEngine),保留真實 DB - proc._redis = fake_r - data = {b"payload": json.dumps(_payload_dict("error")).encode()} - - import unittest.mock as um - with um.patch("src.workers.aider_event_processor.get_incident_engine", return_value=fake_engine), \ - um.patch("src.workers.aider_event_processor.get_worker_redis", return_value=fake_r): - await proc._process_one("stream", "1-0", data, _session_factory=real_factory) - - assert fake_engine.process_signal.called - fake_r.xack.assert_called_once() - - # 驗證真實 DB 寫入 - async with real_factory() as sess: - repo = AiderEventRepository(sess) - count = await repo.count_by_session("test-s1") - assert count == 1 - - -@pytest.mark.asyncio -async def test_process_one_session_start_no_incident(real_factory): - fake_engine = MagicMock() - fake_engine.process_signal = AsyncMock() - fake_r = MagicMock() - fake_r.xack = AsyncMock() - - proc = AiderEventProcessor() - data = {b"payload": json.dumps(_payload_dict("session_start")).encode()} - - import unittest.mock as um - with um.patch("src.workers.aider_event_processor.get_incident_engine", return_value=fake_engine), \ - um.patch("src.workers.aider_event_processor.get_worker_redis", return_value=fake_r): - await proc._process_one("stream", "1-0", data, _session_factory=real_factory) - - assert not fake_engine.process_signal.called # session_start 不建 incident - fake_r.xack.assert_called_once() - - async with real_factory() as sess: - repo = AiderEventRepository(sess) - count = await repo.count_by_session("test-s1") - assert count == 1 +from unittest.mock import AsyncMock, MagicMock @pytest.mark.asyncio async def test_process_one_malformed_payload_acks_and_skips(monkeypatch): """malformed JSON 應 ACK 避免卡 pending,且不觸碰 DB。""" + from src.workers.aider_event_processor import AiderEventProcessor + fake_r = MagicMock() fake_r.xack = AsyncMock() monkeypatch.setattr("src.workers.aider_event_processor.get_worker_redis", @@ -137,27 +21,3 @@ async def test_process_one_malformed_payload_acks_and_skips(monkeypatch): await proc._process_one("stream", "1-0", data) fake_r.xack.assert_called_once() - - -@pytest.mark.asyncio -async def test_incident_failure_still_writes_db(real_factory): - """incident engine 壞掉時,event 仍要進 DB(不丟資料)。""" - fake_engine = MagicMock() - fake_engine.process_signal = AsyncMock(side_effect=RuntimeError("engine down")) - fake_r = MagicMock() - fake_r.xack = AsyncMock() - - proc = AiderEventProcessor() - data = {b"payload": json.dumps(_payload_dict("error")).encode()} - - import unittest.mock as um - with um.patch("src.workers.aider_event_processor.get_incident_engine", return_value=fake_engine), \ - um.patch("src.workers.aider_event_processor.get_worker_redis", return_value=fake_r): - await proc._process_one("stream", "1-0", data, _session_factory=real_factory) - - fake_r.xack.assert_called_once() # 仍 ACK - - async with real_factory() as sess: - repo = AiderEventRepository(sess) - count = await repo.count_by_session("test-s1") - assert count == 1 # DB 依然寫入,即使 incident engine 壞了