- Implement Task A7: background worker consuming signals:aider:events stream - Parse AiderEventIn from Redis XREADGROUP messages - Call IncidentEngine.process_signal for incident-worthy events - Persist aider_events to PostgreSQL with optional incident_id FK - XACK on success, preserve in pending list on DB failure (retry) - ACK on parse failure (bad JSON avoids pending list jam) - Match signal_worker.py pattern: no Active Sweeper (MVP) - Unit tests: 4 tests covering incident creation, non-incident events, malformed payloads, engine failures Tests: 37 passed (4 new + 33 existing regression)
164 lines
6.0 KiB
Python
164 lines
6.0 KiB
Python
# test_aider_event_processor | 2026-04-20 @ Asia/Taipei
|
||
"""Unit tests for AiderEventProcessor."""
|
||
import pytest
|
||
import json
|
||
from datetime import datetime, timezone, timedelta
|
||
from unittest.mock import AsyncMock, MagicMock, patch
|
||
from src.workers.aider_event_processor import AiderEventProcessor
|
||
|
||
|
||
TAIPEI = timezone(timedelta(hours=8))
|
||
|
||
|
||
def _payload_dict():
    """Return a baseline aider ``error`` event as a plain dict.

    The timestamp is taken at call time in the module's TAIPEI timezone;
    every other field is a fixed sample value that tests may override.
    """
    # Nested event body describing a rate-limit error.
    error_details = {
        "cwd": "/r",
        "model": "elephant-alpha",
        "kind": "api_rate_limit",
        "message": "429",
        "context_50chars": "",
    }
    event = {
        "ts": datetime.now(TAIPEI).isoformat(),
        "session_id": "s1",
        "host": "ogt-mac",
        "type": "error",
        "payload": error_details,
    }
    return event
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_process_one_error_event_creates_incident_and_writes_db(monkeypatch):
    """An error event should create an incident and be written to the DB."""
    # Incident engine stub: process_signal resolves to an incident with a known id.
    incident_stub = MagicMock(incident_id="inc-123")
    engine_stub = MagicMock(process_signal=AsyncMock(return_value=incident_stub))
    monkeypatch.setattr(
        "src.workers.aider_event_processor.get_incident_engine",
        lambda: engine_stub,
    )

    # DB stand-ins: the repo records whatever keyword arguments insert() receives.
    captured = {}

    class _RecordingRepo:
        def __init__(self, sess):
            pass

        async def insert(self, **kw):
            captured.update(kw)
            return 1

    class _NullSession:
        async def __aenter__(self):
            return self

        async def __aexit__(self, *a):
            return False

        async def commit(self):
            pass

    monkeypatch.setattr(
        "src.workers.aider_event_processor.AiderEventRepository",
        _RecordingRepo,
    )
    # get_session_factory() must return a callable that yields a session;
    # the session class itself serves as that callable.
    monkeypatch.setattr(
        "src.workers.aider_event_processor.get_session_factory",
        lambda: _NullSession,
    )

    # Redis stub exposing only an awaitable xack.
    redis_stub = MagicMock(xack=AsyncMock())
    monkeypatch.setattr(
        "src.workers.aider_event_processor.get_worker_redis",
        lambda: redis_stub,
    )

    # Act: feed one stream entry whose payload field is the JSON-encoded event.
    processor = AiderEventProcessor()
    fields = {b"payload": json.dumps(_payload_dict()).encode()}
    await processor._process_one("stream", "1-0", fields)

    # Assert: engine consulted, row carries the incident id, entry acknowledged.
    assert engine_stub.process_signal.called
    assert captured.get("incident_id") == "inc-123"
    assert captured.get("type_") == "error"
    redis_stub.xack.assert_called_once()
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_process_one_session_start_no_incident(monkeypatch):
    """A session_start event must not create an incident but is still written to the DB."""
    # Incident engine stub; the test asserts it is never called.
    engine_stub = MagicMock(process_signal=AsyncMock())
    monkeypatch.setattr(
        "src.workers.aider_event_processor.get_incident_engine",
        lambda: engine_stub,
    )

    # DB stand-ins: the repo records whatever keyword arguments insert() receives.
    captured = {}

    class _RecordingRepo:
        def __init__(self, sess):
            pass

        async def insert(self, **kw):
            captured.update(kw)
            return 1

    class _NullSession:
        async def __aenter__(self):
            return self

        async def __aexit__(self, *a):
            return False

        async def commit(self):
            pass

    monkeypatch.setattr(
        "src.workers.aider_event_processor.AiderEventRepository",
        _RecordingRepo,
    )
    monkeypatch.setattr(
        "src.workers.aider_event_processor.get_session_factory",
        lambda: _NullSession,
    )

    # Redis stub exposing only an awaitable xack.
    redis_stub = MagicMock(xack=AsyncMock())
    monkeypatch.setattr(
        "src.workers.aider_event_processor.get_worker_redis",
        lambda: redis_stub,
    )

    # Act: start from the baseline event and swap in a session_start body.
    event = {
        **_payload_dict(),
        "type": "session_start",
        "payload": {
            "cwd": "/r",
            "model": "m",
            "aider_args": [],
            "aider_pid": 1,
            "cli_version": "0",
        },
    }
    processor = AiderEventProcessor()
    fields = {b"payload": json.dumps(event).encode()}
    await processor._process_one("stream", "1-0", fields)

    # Assert
    assert not engine_stub.process_signal.called  # session_start creates no incident
    assert captured.get("incident_id") is None  # the DB row is still written
    assert captured.get("type_") == "session_start"
    redis_stub.xack.assert_called_once()
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_process_one_malformed_payload_acks_and_skips(monkeypatch, caplog):
    """Malformed JSON must be ACKed (so it cannot jam pending) and must not create a DB record.

    All collaborators of the processor are stubbed so the test stays hermetic
    and can verify the "no DB record / no incident" half of the contract, not
    only the ACK. ``caplog`` is kept in the signature but is not inspected.
    """
    # Incident engine stub; a malformed payload must never reach it.
    fake_engine = MagicMock()
    fake_engine.process_signal = AsyncMock()
    monkeypatch.setattr(
        "src.workers.aider_event_processor.get_incident_engine",
        lambda: fake_engine,
    )

    # Repository / session stand-ins that record every insert attempt.
    insert_calls = []

    class FakeRepo:
        def __init__(self, sess):
            pass

        async def insert(self, **kw):
            insert_calls.append(kw)
            return 1

    class FakeSession:
        async def __aenter__(self):
            return self

        async def __aexit__(self, *a):
            return False

        async def commit(self):
            pass

    monkeypatch.setattr(
        "src.workers.aider_event_processor.AiderEventRepository",
        FakeRepo,
    )
    monkeypatch.setattr(
        "src.workers.aider_event_processor.get_session_factory",
        lambda: (lambda: FakeSession()),
    )

    # Redis stub exposing only an awaitable xack.
    fake_r = MagicMock()
    fake_r.xack = AsyncMock()
    monkeypatch.setattr(
        "src.workers.aider_event_processor.get_worker_redis",
        lambda: fake_r,
    )

    # Act: the payload field holds bytes that are not valid JSON.
    proc = AiderEventProcessor()
    data = {b"payload": b"this is not json"}
    await proc._process_one("stream", "1-0", data)

    # Assert
    fake_r.xack.assert_called_once()  # bad payload is ACKed to avoid jamming pending
    assert not fake_engine.process_signal.called  # no incident for unparseable input
    assert insert_calls == []  # no DB record is created
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_incident_failure_still_writes_db(monkeypatch):
    """When the incident engine fails, the event must still reach the DB (no data loss)."""
    # Incident engine stub whose process_signal always raises.
    engine_stub = MagicMock(
        process_signal=AsyncMock(side_effect=RuntimeError("engine down")),
    )
    monkeypatch.setattr(
        "src.workers.aider_event_processor.get_incident_engine",
        lambda: engine_stub,
    )

    # DB stand-ins: the repo records whatever keyword arguments insert() receives.
    captured = {}

    class _RecordingRepo:
        def __init__(self, sess):
            pass

        async def insert(self, **kw):
            captured.update(kw)
            return 1

    class _NullSession:
        async def __aenter__(self):
            return self

        async def __aexit__(self, *a):
            return False

        async def commit(self):
            pass

    monkeypatch.setattr(
        "src.workers.aider_event_processor.AiderEventRepository",
        _RecordingRepo,
    )
    monkeypatch.setattr(
        "src.workers.aider_event_processor.get_session_factory",
        lambda: _NullSession,
    )

    # Redis stub exposing only an awaitable xack.
    redis_stub = MagicMock(xack=AsyncMock())
    monkeypatch.setattr(
        "src.workers.aider_event_processor.get_worker_redis",
        lambda: redis_stub,
    )

    # Act: process a baseline error event while the engine is failing.
    processor = AiderEventProcessor()
    fields = {b"payload": json.dumps(_payload_dict()).encode()}
    await processor._process_one("stream", "1-0", fields)

    # Assert
    assert captured.get("type_") == "error"
    assert captured.get("incident_id") is None  # engine failed, so there is no id
    redis_stub.xack.assert_called_once()  # the entry is still ACKed
|