fix(tests): 移 DB 測試到 integration 層修復 CI asyncpg 密碼錯誤
Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 1m55s

test_aider_event_processor.py 的三個真實 DB 測試在 CI 單元測試層
(tests/)因連線 awoooi_dev DB 失敗(密碼不符)而中斷。

正確架構:
  tests/                  — 單元測試,CI 直接跑,無 DB
  tests/integration/      — 整合測試,CI --ignore,K8s E2E 覆蓋

修復:
- tests/test_aider_event_processor.py 只保留無 DB 的 malformed payload 測試
- 三個 DB 測試移至 tests/integration/test_aider_event_processor_integration.py
  改用 conftest db_session fixture,不自建 engine(避免密碼硬碼)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Your Name
2026-04-22 01:41:34 +08:00
parent 5e353407f7
commit a6788c2baa
2 changed files with 135 additions and 146 deletions

View File

@@ -0,0 +1,129 @@
# tests/integration/test_aider_event_processor_integration.py
# 2026-04-22 @ Asia/Taipei: 從 tests/test_aider_event_processor.py 遷移
# 需真實 PostgreSQL 的三個測試移到 integration 層,符合 CI 架構:
# tests/ — 單元測試(無 DBCI 直接執行
# tests/integration/ — 整合測試(真實 DBCI --ignore 後由 K8s E2E 覆蓋
"""Integration tests for AiderEventProcessor — 真實 DB + mock 外部服務。"""
import json
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, MagicMock
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from src.repositories.aider_event_repository import AiderEventRepository
from src.workers.aider_event_processor import AiderEventProcessor
TAIPEI = timezone(timedelta(hours=8))
def _payload_dict(type_="error"):
base = {
"ts": datetime.now(TAIPEI).isoformat(),
"session_id": "test-s1", "host": "ogt-mac",
"type": type_,
"payload": {"cwd": "/r", "model": "elephant-alpha",
"kind": "api_rate_limit", "message": "429",
"context_50chars": ""},
}
if type_ == "session_start":
base["payload"] = {"cwd": "/r", "model": "m", "aider_args": [],
"aider_pid": 1, "cli_version": "0"}
return base
# =============================================================================
# real_factory fixture — 真實 DB測試後 rollback
# =============================================================================
@pytest_asyncio.fixture
async def real_factory(db_session):
"""用 integration conftest 的 db_session 建立 _process_one 可用的 factory。"""
class _SingleSessionFactory:
def __call__(self):
return _SessionCtx(db_session)
class _SessionCtx:
def __init__(self, sess): self._sess = sess
async def __aenter__(self): return self._sess
async def __aexit__(self, *a): pass
return _SingleSessionFactory()
# =============================================================================
# Tests
# =============================================================================
@pytest.mark.asyncio
@pytest.mark.integration
async def test_process_one_error_event_creates_incident_and_writes_db(real_factory, db_session):
fake_incident = MagicMock()
fake_incident.incident_id = "inc-123"
fake_engine = MagicMock()
fake_engine.process_signal = AsyncMock(return_value=fake_incident)
fake_r = MagicMock()
fake_r.xack = AsyncMock()
proc = AiderEventProcessor()
data = {b"payload": json.dumps(_payload_dict("error")).encode()}
import unittest.mock as um
with um.patch("src.workers.aider_event_processor.get_incident_engine", return_value=fake_engine), \
um.patch("src.workers.aider_event_processor.get_worker_redis", return_value=fake_r):
await proc._process_one("stream", "1-0", data, _session_factory=real_factory)
assert fake_engine.process_signal.called
fake_r.xack.assert_called_once()
repo = AiderEventRepository(db_session)
count = await repo.count_by_session("test-s1")
assert count == 1
@pytest.mark.asyncio
@pytest.mark.integration
async def test_process_one_session_start_no_incident(real_factory, db_session):
fake_engine = MagicMock()
fake_engine.process_signal = AsyncMock()
fake_r = MagicMock()
fake_r.xack = AsyncMock()
proc = AiderEventProcessor()
data = {b"payload": json.dumps(_payload_dict("session_start")).encode()}
import unittest.mock as um
with um.patch("src.workers.aider_event_processor.get_incident_engine", return_value=fake_engine), \
um.patch("src.workers.aider_event_processor.get_worker_redis", return_value=fake_r):
await proc._process_one("stream", "1-0", data, _session_factory=real_factory)
assert not fake_engine.process_signal.called
fake_r.xack.assert_called_once()
repo = AiderEventRepository(db_session)
count = await repo.count_by_session("test-s1")
assert count == 1
@pytest.mark.asyncio
@pytest.mark.integration
async def test_incident_failure_still_writes_db(real_factory, db_session):
fake_engine = MagicMock()
fake_engine.process_signal = AsyncMock(side_effect=RuntimeError("engine down"))
fake_r = MagicMock()
fake_r.xack = AsyncMock()
proc = AiderEventProcessor()
data = {b"payload": json.dumps(_payload_dict("error")).encode()}
import unittest.mock as um
with um.patch("src.workers.aider_event_processor.get_incident_engine", return_value=fake_engine), \
um.patch("src.workers.aider_event_processor.get_worker_redis", return_value=fake_r):
await proc._process_one("stream", "1-0", data, _session_factory=real_factory)
fake_r.xack.assert_called_once()
repo = AiderEventRepository(db_session)
count = await repo.count_by_session("test-s1")
assert count == 1

View File

@@ -1,132 +1,16 @@
# test_aider_event_processor | 2026-04-22 @ Asia/Taipei
# 重構:移除 FakeRepo/FakeSession違反 feedback_no_mock_testing.md
# 方案_process_one 加 _session_factory DI 參數,測試注入真實 asyncpg 連線
# Redis xack + IncidentEngine 仍 mock外部 broker/AI 服務,符合「外部 API 例外」)。
"""Unit tests for AiderEventProcessor — 真實 DB + mock 外部服務。"""
import json
import os
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, MagicMock
# 需真實 DB 的三個測試已移至 tests/integration/test_aider_event_processor_integration.py
# 此檔案只保留無 DB 依賴的純單元測試,符合 CI 架構tests/ 層不連 DB
"""Unit tests for AiderEventProcessor — 無 DB 依賴。"""
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from src.repositories.aider_event_repository import AiderEventRepository
from src.workers.aider_event_processor import AiderEventProcessor
TAIPEI = timezone(timedelta(hours=8))
_DEV_DB_URL = os.environ.get(
"TEST_DATABASE_URL",
"postgresql+asyncpg://awoooi:awoooi_prod_2026@192.168.0.188:5432/awoooi_dev?ssl=disable",
)
# =============================================================================
# 真實 DB session factory fixture每個測試後 rollback
# =============================================================================
@pytest_asyncio.fixture
async def real_factory():
"""提供真實 PostgreSQL session factory測試後 rollback不污染 DB"""
engine = create_async_engine(_DEV_DB_URL, echo=False)
conn = await engine.connect().__aenter__()
await conn.begin()
session = AsyncSession(bind=conn, expire_on_commit=False, autoflush=False)
# 回傳一個永遠回傳同一個 session 的 factory讓 _process_one 正常使用 async with
class _SingleSessionFactory:
def __call__(self):
return _SessionCtx(session)
class _SessionCtx:
def __init__(self, sess): self._sess = sess
async def __aenter__(self): return self._sess
async def __aexit__(self, *a): pass
yield _SingleSessionFactory()
await conn.rollback()
await conn.__aexit__(None, None, None)
await engine.dispose()
def _payload_dict(type_="error"):
base = {
"ts": datetime.now(TAIPEI).isoformat(),
"session_id": "test-s1", "host": "ogt-mac",
"type": type_,
"payload": {"cwd": "/r", "model": "elephant-alpha",
"kind": "api_rate_limit", "message": "429",
"context_50chars": ""},
}
if type_ == "session_start":
base["payload"] = {"cwd": "/r", "model": "m", "aider_args": [],
"aider_pid": 1, "cli_version": "0"}
return base
# =============================================================================
# Tests — 真實 DB write
# =============================================================================
@pytest.mark.asyncio
async def test_process_one_error_event_creates_incident_and_writes_db(real_factory):
fake_incident = MagicMock()
fake_incident.incident_id = "inc-123"
fake_engine = MagicMock()
fake_engine.process_signal = AsyncMock(return_value=fake_incident)
fake_r = MagicMock()
fake_r.xack = AsyncMock()
proc = AiderEventProcessor()
# Patch 外部服務Redis + IncidentEngine保留真實 DB
proc._redis = fake_r
data = {b"payload": json.dumps(_payload_dict("error")).encode()}
import unittest.mock as um
with um.patch("src.workers.aider_event_processor.get_incident_engine", return_value=fake_engine), \
um.patch("src.workers.aider_event_processor.get_worker_redis", return_value=fake_r):
await proc._process_one("stream", "1-0", data, _session_factory=real_factory)
assert fake_engine.process_signal.called
fake_r.xack.assert_called_once()
# 驗證真實 DB 寫入
async with real_factory() as sess:
repo = AiderEventRepository(sess)
count = await repo.count_by_session("test-s1")
assert count == 1
@pytest.mark.asyncio
async def test_process_one_session_start_no_incident(real_factory):
fake_engine = MagicMock()
fake_engine.process_signal = AsyncMock()
fake_r = MagicMock()
fake_r.xack = AsyncMock()
proc = AiderEventProcessor()
data = {b"payload": json.dumps(_payload_dict("session_start")).encode()}
import unittest.mock as um
with um.patch("src.workers.aider_event_processor.get_incident_engine", return_value=fake_engine), \
um.patch("src.workers.aider_event_processor.get_worker_redis", return_value=fake_r):
await proc._process_one("stream", "1-0", data, _session_factory=real_factory)
assert not fake_engine.process_signal.called # session_start 不建 incident
fake_r.xack.assert_called_once()
async with real_factory() as sess:
repo = AiderEventRepository(sess)
count = await repo.count_by_session("test-s1")
assert count == 1
from unittest.mock import AsyncMock, MagicMock
@pytest.mark.asyncio
async def test_process_one_malformed_payload_acks_and_skips(monkeypatch):
"""malformed JSON 應 ACK 避免卡 pending且不觸碰 DB。"""
from src.workers.aider_event_processor import AiderEventProcessor
fake_r = MagicMock()
fake_r.xack = AsyncMock()
monkeypatch.setattr("src.workers.aider_event_processor.get_worker_redis",
@@ -137,27 +21,3 @@ async def test_process_one_malformed_payload_acks_and_skips(monkeypatch):
await proc._process_one("stream", "1-0", data)
fake_r.xack.assert_called_once()
@pytest.mark.asyncio
async def test_incident_failure_still_writes_db(real_factory):
"""incident engine 壞掉時event 仍要進 DB不丟資料"""
fake_engine = MagicMock()
fake_engine.process_signal = AsyncMock(side_effect=RuntimeError("engine down"))
fake_r = MagicMock()
fake_r.xack = AsyncMock()
proc = AiderEventProcessor()
data = {b"payload": json.dumps(_payload_dict("error")).encode()}
import unittest.mock as um
with um.patch("src.workers.aider_event_processor.get_incident_engine", return_value=fake_engine), \
um.patch("src.workers.aider_event_processor.get_worker_redis", return_value=fake_r):
await proc._process_one("stream", "1-0", data, _session_factory=real_factory)
fake_r.xack.assert_called_once() # 仍 ACK
async with real_factory() as sess:
repo = AiderEventRepository(sess)
count = await repo.count_by_session("test-s1")
assert count == 1 # DB 依然寫入,即使 incident engine 壞了