diff --git a/apps/api/src/services/ai_router.py b/apps/api/src/services/ai_router.py index b960b9b8..2001f00c 100644 --- a/apps/api/src/services/ai_router.py +++ b/apps/api/src/services/ai_router.py @@ -689,6 +689,13 @@ class AIRouter: logger.debug("ai_router_feedback_aggregation_failed") return {} + return self._aggregate_feedback_stats(stats, repo=repo) + + @staticmethod + def _aggregate_feedback_stats( + stats: list[dict], repo: str | None = None + ) -> dict[str, float]: + """純函數:過濾 repo 並將 stats 轉換為 {model: success_rate}(可獨立單元測試)。""" out: dict[str, float] = {} for row in stats: if repo and row.get("repo") != repo: diff --git a/apps/api/src/workers/aider_event_processor.py b/apps/api/src/workers/aider_event_processor.py index 4ceaeb6c..6754755c 100644 --- a/apps/api/src/workers/aider_event_processor.py +++ b/apps/api/src/workers/aider_event_processor.py @@ -119,8 +119,12 @@ class AiderEventProcessor: logger.exception("aider_processor_loop_error", error=str(e)) await asyncio.sleep(1.0) - async def _process_one(self, stream_key: str, msg_id: Any, data: dict) -> None: - """處理單筆 message:parse → (maybe) incident → DB write → ACK。""" + async def _process_one( + self, stream_key: str, msg_id: Any, data: dict, _session_factory=None + ) -> None: + """處理單筆 message:parse → (maybe) incident → DB write → ACK。 + _session_factory: 可注入測試用 factory,預設使用 get_session_factory()。 + """ try: raw = data.get(b"payload") or data.get("payload") if isinstance(raw, bytes): @@ -147,7 +151,7 @@ class AiderEventProcessor: # 不中斷 — 即使 incident 失敗,event 仍要持久化 try: - session_factory = get_session_factory() + session_factory = _session_factory or get_session_factory() async with session_factory() as session: repo = AiderEventRepository(session) await repo.insert( diff --git a/apps/api/tests/integration/conftest.py b/apps/api/tests/integration/conftest.py index d74b4357..6654f9c3 100644 --- a/apps/api/tests/integration/conftest.py +++ b/apps/api/tests/integration/conftest.py @@ -23,12 +23,14 @@ from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_asyn DEV_DB_URL = os.environ.get( "TEST_DATABASE_URL", + # 注意:密碼 awoooi_prod_2026 是 PostgreSQL 帳號的實際密碼(歷史命名), + # 並非指向 prod DB — 連線目標為 awoooi_dev(開發資料庫)。 "postgresql+asyncpg://awoooi:awoooi_prod_2026@192.168.0.188:5432/awoooi_dev?ssl=disable", ) -# 確保不會誤打 prod -assert "prod" not in DEV_DB_URL or "awoooi_prod_2026" in DEV_DB_URL, ( - "TEST_DATABASE_URL 不可指向 prod DB" +# 確保不會誤打 prod(允許密碼含 "prod" 字串,檢查 DB 名稱) +assert "awoooi_prod" not in DEV_DB_URL.split("/")[-1], ( + "TEST_DATABASE_URL 不可指向 prod DB(awoooi_prod)" ) diff --git a/apps/api/tests/integration/setup_test_schema.sql b/apps/api/tests/integration/setup_test_schema.sql index bff76201..aa8c73f4 100644 --- a/apps/api/tests/integration/setup_test_schema.sql +++ b/apps/api/tests/integration/setup_test_schema.sql @@ -119,3 +119,4 @@ CREATE TABLE IF NOT EXISTS aider_events ( CREATE INDEX IF NOT EXISTS aider_events_session_idx ON aider_events(session_id); CREATE INDEX IF NOT EXISTS aider_events_type_ts_idx ON aider_events(type, ts DESC); CREATE INDEX IF NOT EXISTS aider_events_ts_idx ON aider_events(ts DESC); +CREATE INDEX IF NOT EXISTS aider_events_payload_gin ON aider_events USING GIN (payload); diff --git a/apps/api/tests/test_ai_router_feedback.py b/apps/api/tests/test_ai_router_feedback.py index a97f6ef1..ea6414e6 100644 --- a/apps/api/tests/test_ai_router_feedback.py +++ b/apps/api/tests/test_ai_router_feedback.py @@ -1,99 +1,65 @@ # apps/api/tests/test_ai_router_feedback.py | 2026-04-20 @ Asia/Taipei -# 2026-04-22 @ Asia/Taipei: FakeRepo / FakeSession 違反 feedback_no_mock_testing.md -# → DB 聚合查詢測試已遷移至 integration/test_ai_router_feedback_integration.py(真實 DB) -# 此檔案保留的測試驗證「DB 不可用時的降級行為」(fail_sf) — 此為錯誤路徑邏輯, -# 非正常 DB 查詢,可留作 unit 層覆蓋。 -# FakeRepo 測試(test_feedback_aggregates_by_model 等)已被 integration test 取代, -# 下方保留作參考,但實際 DB 行為請以 integration test 為準。 -"""Task A8: AIRouter.feedback_from_aider_events — 降級行為 + 邊界條件測試。""" +# 2026-04-22 @ Asia/Taipei: 重構移除 FakeRepo/FakeSession(違反 feedback_no_mock_testing.md) +# 方案:抽取 AIRouter._aggregate_feedback_stats 純函數,直接單元測試,零 DB 依賴。 +# DB 聚合查詢行為已由 integration/test_ai_router_feedback_integration.py 覆蓋。 +"""Unit tests for AIRouter._aggregate_feedback_stats — 純邏輯,無 DB。""" import pytest -from unittest.mock import AsyncMock, MagicMock from src.services.ai_router import AIRouter -@pytest.mark.asyncio -async def test_feedback_aggregates_by_model(monkeypatch): +# ============================================================================= +# _aggregate_feedback_stats 純函數測試(無 DB 依賴) +# ============================================================================= + +def test_feedback_aggregates_by_model(): stats = [ {"repo": "awoooi", "model": "elephant-alpha", "total": 10, "errors": 2, "success_rate": 0.8}, {"repo": "awoooi", "model": "gemini-pro", "total": 5, "errors": 0, "success_rate": 1.0}, ] - - class FakeRepo: - def __init__(self, sess): pass - async def model_stats_since(self, days): return stats - - class FakeSession: - async def __aenter__(self): return self - async def __aexit__(self, *a): return False - - monkeypatch.setattr("src.services.ai_router.get_session_factory", - lambda: (lambda: FakeSession()), raising=False) - monkeypatch.setattr("src.repositories.aider_event_repository.AiderEventRepository", - FakeRepo) - - r = AIRouter() - out = await r.feedback_from_aider_events(days=7) + out = AIRouter._aggregate_feedback_stats(stats) assert out["elephant-alpha"] == 0.8 assert out["gemini-pro"] == 1.0 -@pytest.mark.asyncio -async def test_feedback_filters_by_repo(monkeypatch): +def test_feedback_filters_by_repo(): stats = [ {"repo": "awoooi", "model": "elephant-alpha", "total": 5, "errors": 1, "success_rate": 0.8}, {"repo": "other-repo", "model": "elephant-alpha", "total": 3, "errors": 3, "success_rate": 0.0}, ] + out = AIRouter._aggregate_feedback_stats(stats, repo="awoooi") + assert out == {"elephant-alpha": 0.8} - class FakeRepo: - def __init__(self, sess): pass - async def model_stats_since(self, days): return stats - class FakeSession: - async def __aenter__(self): return self - async def __aexit__(self, *a): return False +def test_feedback_handles_empty_stats(): + out = AIRouter._aggregate_feedback_stats([]) + assert out == {} - monkeypatch.setattr("src.services.ai_router.get_session_factory", - lambda: (lambda: FakeSession()), raising=False) - monkeypatch.setattr("src.repositories.aider_event_repository.AiderEventRepository", - FakeRepo) - r = AIRouter() - out = await r.feedback_from_aider_events(repo="awoooi", days=7) - assert out == {"elephant-alpha": 0.8} # other-repo 過濾掉 +def test_feedback_skips_missing_model(): + stats = [ + {"repo": "awoooi", "model": None, "success_rate": 0.9}, + {"repo": "awoooi", "model": "gemini-pro", "success_rate": 0.7}, + ] + out = AIRouter._aggregate_feedback_stats(stats) + assert list(out.keys()) == ["gemini-pro"] +# ============================================================================= +# feedback_from_aider_events — DB 失敗降級行為(error path,無 FakeRepo) +# ============================================================================= + @pytest.mark.asyncio async def test_feedback_returns_empty_on_db_failure(monkeypatch): - def fail_sf(): - raise RuntimeError("DB unavailable") - - monkeypatch.setattr("src.services.ai_router.get_session_factory", - fail_sf, raising=False) + monkeypatch.setattr( + "src.services.ai_router.get_session_factory", + lambda: (_ for _ in ()).throw(RuntimeError("DB unavailable")), + raising=False, + ) r = AIRouter() out = await r.feedback_from_aider_events(days=7) - assert out == {} # 降級為空 dict,caller 不該崩 - - -@pytest.mark.asyncio -async def test_feedback_handles_empty_stats(monkeypatch): - class FakeRepo: - def __init__(self, sess): pass - async def model_stats_since(self, days): return [] - - class FakeSession: - async def __aenter__(self): return self - async def __aexit__(self, *a): return False - - monkeypatch.setattr("src.services.ai_router.get_session_factory", - lambda: (lambda: FakeSession()), raising=False) - monkeypatch.setattr("src.repositories.aider_event_repository.AiderEventRepository", - FakeRepo) - - r = AIRouter() - out = await r.feedback_from_aider_events() assert out == {} diff --git a/apps/api/tests/test_aider_event_processor.py b/apps/api/tests/test_aider_event_processor.py index 0423f12e..9706b837 100644 --- a/apps/api/tests/test_aider_event_processor.py +++ b/apps/api/tests/test_aider_event_processor.py @@ -1,169 +1,163 @@ -# test_aider_event_processor | 2026-04-20 @ Asia/Taipei -# 2026-04-22 @ Asia/Taipei: DB/Redis mock 違反 feedback_no_mock_testing.md -# - FakeRepo / FakeSession → 已遷移至 integration/test_aider_event_repository.py(真實 DB) -# - fake_r (Redis xack) → 屬外部 broker,保留 mock 符合「外部 API 例外」 -# - fake_engine (IncidentEngine) → 屬外部 AI 呼叫,保留 mock 符合「外部 API 例外」 -# 此檔案保留 _process_one 的 parse / ACK / incident routing 邏輯測試, -# DB 寫入行為已由 integration test 覆蓋。 -"""Unit tests for AiderEventProcessor — parse/ACK/incident routing 邏輯。""" -import pytest +# test_aider_event_processor | 2026-04-22 @ Asia/Taipei +# 重構:移除 FakeRepo/FakeSession(違反 feedback_no_mock_testing.md) +# 方案:_process_one 加 _session_factory DI 參數,測試注入真實 asyncpg 連線。 +# Redis xack + IncidentEngine 仍 mock(外部 broker/AI 服務,符合「外部 API 例外」)。 +"""Unit tests for AiderEventProcessor — 真實 DB + mock 外部服務。""" import json -from datetime import datetime, timezone, timedelta -from unittest.mock import AsyncMock, MagicMock, patch +import os +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, MagicMock + +import pytest +import pytest_asyncio +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine + +from src.repositories.aider_event_repository import AiderEventRepository from src.workers.aider_event_processor import AiderEventProcessor - TAIPEI = timezone(timedelta(hours=8)) +_DEV_DB_URL = os.environ.get( + "TEST_DATABASE_URL", + "postgresql+asyncpg://awoooi:awoooi_prod_2026@192.168.0.188:5432/awoooi_dev?ssl=disable", +) -def _payload_dict(): - """基本的 aider event payload。""" - return { +# ============================================================================= +# 真實 DB session factory fixture(每個測試後 rollback) +# ============================================================================= + +@pytest_asyncio.fixture +async def real_factory(): + """提供真實 PostgreSQL session factory,測試後 rollback(不污染 DB)。""" + engine = create_async_engine(_DEV_DB_URL, echo=False) + conn = await engine.connect().__aenter__() + await conn.begin() + session = AsyncSession(bind=conn, expire_on_commit=False, autoflush=False) + + # 回傳一個永遠回傳同一個 session 的 factory(讓 _process_one 正常使用 async with) + class _SingleSessionFactory: + def __call__(self): + return _SessionCtx(session) + + class _SessionCtx: + def __init__(self, sess): self._sess = sess + async def __aenter__(self): return self._sess + async def __aexit__(self, *a): pass + + yield _SingleSessionFactory() + + await conn.rollback() + await conn.__aexit__(None, None, None) + await engine.dispose() + + +def _payload_dict(type_="error"): + base = { "ts": datetime.now(TAIPEI).isoformat(), - "session_id": "s1", "host": "ogt-mac", - "type": "error", + "session_id": "test-s1", "host": "ogt-mac", + "type": type_, "payload": {"cwd": "/r", "model": "elephant-alpha", "kind": "api_rate_limit", "message": "429", "context_50chars": ""}, } + if type_ == "session_start": + base["payload"] = {"cwd": "/r", "model": "m", "aider_args": [], + "aider_pid": 1, "cli_version": "0"} + return base +# ============================================================================= +# Tests — 真實 DB write +# ============================================================================= + @pytest.mark.asyncio -async def test_process_one_error_event_creates_incident_and_writes_db(monkeypatch): - """error event 應建 incident + 寫 DB。""" - # Mock incident engine +async def test_process_one_error_event_creates_incident_and_writes_db(real_factory): fake_incident = MagicMock() fake_incident.incident_id = "inc-123" fake_engine = MagicMock() fake_engine.process_signal = AsyncMock(return_value=fake_incident) - monkeypatch.setattr("src.workers.aider_event_processor.get_incident_engine", - lambda: fake_engine) - # Mock DB session factory + repo - inserted = {} - class FakeRepo: - def __init__(self, sess): pass - async def insert(self, **kw): inserted.update(kw); return 1 - class FakeSession: - async def __aenter__(self): return self - async def __aexit__(self, *a): return False - async def commit(self): pass - monkeypatch.setattr("src.workers.aider_event_processor.AiderEventRepository", - FakeRepo) - monkeypatch.setattr("src.workers.aider_event_processor.get_session_factory", - lambda: (lambda: FakeSession())) - - # Mock redis ack fake_r = MagicMock() fake_r.xack = AsyncMock() - monkeypatch.setattr("src.workers.aider_event_processor.get_worker_redis", - lambda: fake_r) - # Act proc = AiderEventProcessor() - payload = _payload_dict() - data = {b"payload": json.dumps(payload).encode()} - await proc._process_one("stream", "1-0", data) + # Patch 外部服務(Redis + IncidentEngine),保留真實 DB + proc._redis = fake_r + data = {b"payload": json.dumps(_payload_dict("error")).encode()} + + import unittest.mock as um + with um.patch("src.workers.aider_event_processor.get_incident_engine", return_value=fake_engine), \ + um.patch("src.workers.aider_event_processor.get_worker_redis", return_value=fake_r): + await proc._process_one("stream", "1-0", data, _session_factory=real_factory) - # Assert assert fake_engine.process_signal.called - assert inserted.get("incident_id") == "inc-123" - assert inserted.get("type_") == "error" fake_r.xack.assert_called_once() + # 驗證真實 DB 寫入 + async with real_factory() as sess: + repo = AiderEventRepository(sess) + count = await repo.count_by_session("test-s1") + assert count == 1 + @pytest.mark.asyncio -async def test_process_one_session_start_no_incident(monkeypatch): - """session_start 不應建 incident,但應寫 DB。""" +async def test_process_one_session_start_no_incident(real_factory): fake_engine = MagicMock() fake_engine.process_signal = AsyncMock() - monkeypatch.setattr("src.workers.aider_event_processor.get_incident_engine", - lambda: fake_engine) - - inserted = {} - class FakeRepo: - def __init__(self, sess): pass - async def insert(self, **kw): inserted.update(kw); return 1 - class FakeSession: - async def __aenter__(self): return self - async def __aexit__(self, *a): return False - async def commit(self): pass - monkeypatch.setattr("src.workers.aider_event_processor.AiderEventRepository", - FakeRepo) - monkeypatch.setattr("src.workers.aider_event_processor.get_session_factory", - lambda: (lambda: FakeSession())) - fake_r = MagicMock() fake_r.xack = AsyncMock() - monkeypatch.setattr("src.workers.aider_event_processor.get_worker_redis", - lambda: fake_r) - # Act proc = AiderEventProcessor() - payload = _payload_dict() - payload["type"] = "session_start" - payload["payload"] = {"cwd": "/r", "model": "m", "aider_args": [], - "aider_pid": 1, "cli_version": "0"} - data = {b"payload": json.dumps(payload).encode()} - await proc._process_one("stream", "1-0", data) + data = {b"payload": json.dumps(_payload_dict("session_start")).encode()} + + import unittest.mock as um + with um.patch("src.workers.aider_event_processor.get_incident_engine", return_value=fake_engine), \ + um.patch("src.workers.aider_event_processor.get_worker_redis", return_value=fake_r): + await proc._process_one("stream", "1-0", data, _session_factory=real_factory) - # Assert assert not fake_engine.process_signal.called # session_start 不建 incident - assert inserted.get("incident_id") is None # DB 依然寫入 - assert inserted.get("type_") == "session_start" fake_r.xack.assert_called_once() + async with real_factory() as sess: + repo = AiderEventRepository(sess) + count = await repo.count_by_session("test-s1") + assert count == 1 + @pytest.mark.asyncio -async def test_process_one_malformed_payload_acks_and_skips(monkeypatch, caplog): - """malformed JSON 應 ACK 避免卡 pending,但不建 DB record。""" +async def test_process_one_malformed_payload_acks_and_skips(monkeypatch): + """malformed JSON 應 ACK 避免卡 pending,且不觸碰 DB。""" fake_r = MagicMock() fake_r.xack = AsyncMock() monkeypatch.setattr("src.workers.aider_event_processor.get_worker_redis", lambda: fake_r) - # Act proc = AiderEventProcessor() data = {b"payload": b"this is not json"} await proc._process_one("stream", "1-0", data) - # Assert - fake_r.xack.assert_called_once() # 壞 payload ACK 避免卡 pending + fake_r.xack.assert_called_once() @pytest.mark.asyncio -async def test_incident_failure_still_writes_db(monkeypatch): +async def test_incident_failure_still_writes_db(real_factory): """incident engine 壞掉時,event 仍要進 DB(不丟資料)。""" fake_engine = MagicMock() fake_engine.process_signal = AsyncMock(side_effect=RuntimeError("engine down")) - monkeypatch.setattr("src.workers.aider_event_processor.get_incident_engine", - lambda: fake_engine) - - inserted = {} - class FakeRepo: - def __init__(self, sess): pass - async def insert(self, **kw): inserted.update(kw); return 1 - class FakeSession: - async def __aenter__(self): return self - async def __aexit__(self, *a): return False - async def commit(self): pass - monkeypatch.setattr("src.workers.aider_event_processor.AiderEventRepository", - FakeRepo) - monkeypatch.setattr("src.workers.aider_event_processor.get_session_factory", - lambda: (lambda: FakeSession())) - fake_r = MagicMock() fake_r.xack = AsyncMock() - monkeypatch.setattr("src.workers.aider_event_processor.get_worker_redis", - lambda: fake_r) - # Act proc = AiderEventProcessor() - payload = _payload_dict() - data = {b"payload": json.dumps(payload).encode()} - await proc._process_one("stream", "1-0", data) + data = {b"payload": json.dumps(_payload_dict("error")).encode()} - # Assert - assert inserted.get("type_") == "error" - assert inserted.get("incident_id") is None # engine 壞,無 id - fake_r.xack.assert_called_once() # 仍 ACK + import unittest.mock as um + with um.patch("src.workers.aider_event_processor.get_incident_engine", return_value=fake_engine), \ + um.patch("src.workers.aider_event_processor.get_worker_redis", return_value=fake_r): + await proc._process_one("stream", "1-0", data, _session_factory=real_factory) + + fake_r.xack.assert_called_once() # 仍 ACK + + async with real_factory() as sess: + repo = AiderEventRepository(sess) + count = await repo.count_by_session("test-s1") + assert count == 1 # DB 依然寫入,即使 incident engine 壞了