Files
awoooi/apps/api/tests/test_aider_event_processor.py
Your Name df72da69e2 feat(worker): AiderEventProcessor — Redis stream consumer + incident + DB write
- Implement Task A7: background worker consuming signals:aider:events stream
- Parse AiderEventIn from Redis XREADGROUP messages
- Call IncidentEngine.process_signal for incident-worthy events
- Persist aider_events to PostgreSQL with optional incident_id FK
- XACK on success, preserve in pending list on DB failure (retry)
- ACK on parse failure (bad JSON avoids pending list jam)
- Match signal_worker.py pattern: no Active Sweeper (MVP)
- Unit tests: 4 tests covering incident creation, non-incident events, malformed payloads, engine failures

Tests: 37 passed (4 new + 33 existing regression)
2026-04-20 19:40:01 +08:00

164 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# test_aider_event_processor | 2026-04-20 @ Asia/Taipei
"""Unit tests for AiderEventProcessor."""
import pytest
import json
from datetime import datetime, timezone, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
from src.workers.aider_event_processor import AiderEventProcessor
TAIPEI = timezone(timedelta(hours=8))
def _payload_dict():
"""基本的 aider event payload。"""
return {
"ts": datetime.now(TAIPEI).isoformat(),
"session_id": "s1", "host": "ogt-mac",
"type": "error",
"payload": {"cwd": "/r", "model": "elephant-alpha",
"kind": "api_rate_limit", "message": "429",
"context_50chars": ""},
}
@pytest.mark.asyncio
async def test_process_one_error_event_creates_incident_and_writes_db(monkeypatch):
"""error event 應建 incident + 寫 DB。"""
# Mock incident engine
fake_incident = MagicMock()
fake_incident.incident_id = "inc-123"
fake_engine = MagicMock()
fake_engine.process_signal = AsyncMock(return_value=fake_incident)
monkeypatch.setattr("src.workers.aider_event_processor.get_incident_engine",
lambda: fake_engine)
# Mock DB session factory + repo
inserted = {}
class FakeRepo:
def __init__(self, sess): pass
async def insert(self, **kw): inserted.update(kw); return 1
class FakeSession:
async def __aenter__(self): return self
async def __aexit__(self, *a): return False
async def commit(self): pass
monkeypatch.setattr("src.workers.aider_event_processor.AiderEventRepository",
FakeRepo)
monkeypatch.setattr("src.workers.aider_event_processor.get_session_factory",
lambda: (lambda: FakeSession()))
# Mock redis ack
fake_r = MagicMock()
fake_r.xack = AsyncMock()
monkeypatch.setattr("src.workers.aider_event_processor.get_worker_redis",
lambda: fake_r)
# Act
proc = AiderEventProcessor()
payload = _payload_dict()
data = {b"payload": json.dumps(payload).encode()}
await proc._process_one("stream", "1-0", data)
# Assert
assert fake_engine.process_signal.called
assert inserted.get("incident_id") == "inc-123"
assert inserted.get("type_") == "error"
fake_r.xack.assert_called_once()
@pytest.mark.asyncio
async def test_process_one_session_start_no_incident(monkeypatch):
"""session_start 不應建 incident但應寫 DB。"""
fake_engine = MagicMock()
fake_engine.process_signal = AsyncMock()
monkeypatch.setattr("src.workers.aider_event_processor.get_incident_engine",
lambda: fake_engine)
inserted = {}
class FakeRepo:
def __init__(self, sess): pass
async def insert(self, **kw): inserted.update(kw); return 1
class FakeSession:
async def __aenter__(self): return self
async def __aexit__(self, *a): return False
async def commit(self): pass
monkeypatch.setattr("src.workers.aider_event_processor.AiderEventRepository",
FakeRepo)
monkeypatch.setattr("src.workers.aider_event_processor.get_session_factory",
lambda: (lambda: FakeSession()))
fake_r = MagicMock()
fake_r.xack = AsyncMock()
monkeypatch.setattr("src.workers.aider_event_processor.get_worker_redis",
lambda: fake_r)
# Act
proc = AiderEventProcessor()
payload = _payload_dict()
payload["type"] = "session_start"
payload["payload"] = {"cwd": "/r", "model": "m", "aider_args": [],
"aider_pid": 1, "cli_version": "0"}
data = {b"payload": json.dumps(payload).encode()}
await proc._process_one("stream", "1-0", data)
# Assert
assert not fake_engine.process_signal.called # session_start 不建 incident
assert inserted.get("incident_id") is None # DB 依然寫入
assert inserted.get("type_") == "session_start"
fake_r.xack.assert_called_once()
@pytest.mark.asyncio
async def test_process_one_malformed_payload_acks_and_skips(monkeypatch, caplog):
"""malformed JSON 應 ACK 避免卡 pending但不建 DB record。"""
fake_r = MagicMock()
fake_r.xack = AsyncMock()
monkeypatch.setattr("src.workers.aider_event_processor.get_worker_redis",
lambda: fake_r)
# Act
proc = AiderEventProcessor()
data = {b"payload": b"this is not json"}
await proc._process_one("stream", "1-0", data)
# Assert
fake_r.xack.assert_called_once() # 壞 payload ACK 避免卡 pending
@pytest.mark.asyncio
async def test_incident_failure_still_writes_db(monkeypatch):
"""incident engine 壞掉時event 仍要進 DB不丟資料"""
fake_engine = MagicMock()
fake_engine.process_signal = AsyncMock(side_effect=RuntimeError("engine down"))
monkeypatch.setattr("src.workers.aider_event_processor.get_incident_engine",
lambda: fake_engine)
inserted = {}
class FakeRepo:
def __init__(self, sess): pass
async def insert(self, **kw): inserted.update(kw); return 1
class FakeSession:
async def __aenter__(self): return self
async def __aexit__(self, *a): return False
async def commit(self): pass
monkeypatch.setattr("src.workers.aider_event_processor.AiderEventRepository",
FakeRepo)
monkeypatch.setattr("src.workers.aider_event_processor.get_session_factory",
lambda: (lambda: FakeSession()))
fake_r = MagicMock()
fake_r.xack = AsyncMock()
monkeypatch.setattr("src.workers.aider_event_processor.get_worker_redis",
lambda: fake_r)
# Act
proc = AiderEventProcessor()
payload = _payload_dict()
data = {b"payload": json.dumps(payload).encode()}
await proc._process_one("stream", "1-0", data)
# Assert
assert inserted.get("type_") == "error"
assert inserted.get("incident_id") is None # engine 壞,無 id
fake_r.xack.assert_called_once() # 仍 ACK