feat(worker): AiderEventProcessor — Redis stream consumer + incident + DB write
- Implement Task A7: background worker consuming signals:aider:events stream - Parse AiderEventIn from Redis XREADGROUP messages - Call IncidentEngine.process_signal for incident-worthy events - Persist aider_events to PostgreSQL with optional incident_id FK - XACK on success, preserve in pending list on DB failure (retry) - ACK on parse failure (bad JSON avoids pending list jam) - Match signal_worker.py pattern: no Active Sweeper (MVP) - Unit tests: 4 tests covering incident creation, non-incident events, malformed payloads, engine failures Tests: 37 passed (4 new + 33 existing regression)
This commit is contained in:
198
apps/api/src/workers/aider_event_processor.py
Normal file
198
apps/api/src/workers/aider_event_processor.py
Normal file
@@ -0,0 +1,198 @@
|
||||
# aider_event_processor_job | 2026-04-20 @ Asia/Taipei
|
||||
"""aider-watch 事件處理器
|
||||
|
||||
消費 Redis stream `signals:aider:events` (由 /api/v1/aider/events webhook 推入):
|
||||
1. 解析 AiderEventIn
|
||||
2. 若應該觸發 incident → build_signal_data → IncidentEngine.process_signal
|
||||
3. 寫入 aider_events 表(含 incident_id 關聯)
|
||||
4. XACK
|
||||
|
||||
仿 workers/signal_worker.py,但簡化(無 Active Sweeper)。
|
||||
|
||||
@author Claude Code
|
||||
@date 2026-04-20
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import asyncio
|
||||
import os
|
||||
import socket
|
||||
from typing import Any
|
||||
|
||||
import structlog
|
||||
|
||||
from src.core.redis_client import get_redis, get_worker_redis
|
||||
from src.db.base import get_session_factory
|
||||
from src.models.aider import AiderEventIn
|
||||
from src.repositories.aider_event_repository import AiderEventRepository
|
||||
from src.services.aider_event_service import build_signal_data, should_create_incident
|
||||
from src.services.incident_engine import get_incident_engine
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
# =========================================================================
|
||||
# Constants
|
||||
# =========================================================================
|
||||
|
||||
STREAM_KEY_DEFAULT = "signals:aider:events"
|
||||
CONSUMER_GROUP = "aider_processors"
|
||||
CONSUMER_NAME = f"aider_{socket.gethostname()}"
|
||||
BATCH_SIZE = 10
|
||||
BLOCK_MS = 5000
|
||||
GRACEFUL_SHUTDOWN_TIMEOUT_S = 30
|
||||
|
||||
|
||||
def _stream_key() -> str:
|
||||
"""取得 Redis stream key,可覆蓋環境變數。"""
|
||||
return os.environ.get("AIDER_EVENTS_STREAM_KEY", STREAM_KEY_DEFAULT)
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Processor
|
||||
# =========================================================================
|
||||
|
||||
class AiderEventProcessor:
|
||||
"""Redis Streams consumer for aider events."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._running = False
|
||||
self._task: asyncio.Task | None = None
|
||||
|
||||
async def _ensure_consumer_group(self) -> None:
|
||||
"""確保 Consumer Group 存在。"""
|
||||
r = get_redis()
|
||||
try:
|
||||
await r.xgroup_create(_stream_key(), CONSUMER_GROUP,
|
||||
id="0", mkstream=True)
|
||||
logger.info("aider_consumer_group_created",
|
||||
stream=_stream_key(), group=CONSUMER_GROUP)
|
||||
except Exception as e:
|
||||
if "BUSYGROUP" in str(e):
|
||||
logger.debug("aider_consumer_group_exists", group=CONSUMER_GROUP)
|
||||
else:
|
||||
raise
|
||||
|
||||
async def start(self) -> None:
|
||||
"""啟動消費循環。"""
|
||||
if self._running:
|
||||
logger.warning("aider_processor_already_running")
|
||||
return
|
||||
await self._ensure_consumer_group()
|
||||
self._running = True
|
||||
self._task = asyncio.create_task(self._consume_loop())
|
||||
logger.info("aider_processor_started",
|
||||
stream=_stream_key(), group=CONSUMER_GROUP, consumer=CONSUMER_NAME)
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""優雅關閉。"""
|
||||
if not self._running:
|
||||
return
|
||||
self._running = False
|
||||
if self._task:
|
||||
try:
|
||||
await asyncio.wait_for(self._task, timeout=GRACEFUL_SHUTDOWN_TIMEOUT_S)
|
||||
except TimeoutError:
|
||||
self._task.cancel()
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
logger.info("aider_processor_stopped")
|
||||
|
||||
async def _consume_loop(self) -> None:
|
||||
"""主消費循環。"""
|
||||
r = get_worker_redis()
|
||||
stream_key = _stream_key()
|
||||
while self._running:
|
||||
try:
|
||||
messages = await r.xreadgroup(
|
||||
groupname=CONSUMER_GROUP, consumername=CONSUMER_NAME,
|
||||
streams={stream_key: ">"},
|
||||
count=BATCH_SIZE, block=BLOCK_MS,
|
||||
)
|
||||
if not messages:
|
||||
continue
|
||||
for _stream_name, entries in messages:
|
||||
for message_id, data in entries:
|
||||
await self._process_one(stream_key, message_id, data)
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.exception("aider_processor_loop_error", error=str(e))
|
||||
await asyncio.sleep(1.0)
|
||||
|
||||
async def _process_one(self, stream_key: str, msg_id: Any, data: dict) -> None:
|
||||
"""處理單筆 message:parse → (maybe) incident → DB write → ACK。"""
|
||||
try:
|
||||
raw = data.get(b"payload") or data.get("payload")
|
||||
if isinstance(raw, bytes):
|
||||
raw = raw.decode()
|
||||
ev = AiderEventIn.model_validate_json(raw)
|
||||
except Exception:
|
||||
logger.exception("aider_processor_parse_failed", msg_id=msg_id)
|
||||
# Parse 失敗不重試(bad payload),直接 ACK 避免卡 pending
|
||||
await self._ack(stream_key, msg_id)
|
||||
return
|
||||
|
||||
incident_id: str | None = None
|
||||
if should_create_incident(ev):
|
||||
try:
|
||||
sig_data = build_signal_data(ev)
|
||||
if sig_data is not None:
|
||||
engine = get_incident_engine()
|
||||
incident = await engine.process_signal(sig_data)
|
||||
if incident:
|
||||
incident_id = getattr(incident, "incident_id", None)
|
||||
except Exception:
|
||||
logger.exception("aider_processor_incident_failed",
|
||||
session_id=ev.session_id, type=ev.type)
|
||||
# 不中斷 — 即使 incident 失敗,event 仍要持久化
|
||||
|
||||
try:
|
||||
session_factory = get_session_factory()
|
||||
async with session_factory() as session:
|
||||
repo = AiderEventRepository(session)
|
||||
await repo.insert(
|
||||
session_id=ev.session_id, ts=ev.ts, type_=ev.type,
|
||||
host=ev.host, payload=ev.payload, incident_id=incident_id,
|
||||
)
|
||||
await session.commit()
|
||||
except Exception:
|
||||
logger.exception("aider_processor_db_write_failed",
|
||||
session_id=ev.session_id)
|
||||
# DB 失敗就不 ACK,靠 Redis pending list 保留重試
|
||||
return
|
||||
|
||||
await self._ack(stream_key, msg_id)
|
||||
|
||||
async def _ack(self, stream_key: str, msg_id: Any) -> None:
|
||||
"""確認訊息已處理。"""
|
||||
try:
|
||||
r = get_worker_redis()
|
||||
await r.xack(stream_key, CONSUMER_GROUP, msg_id)
|
||||
except Exception:
|
||||
logger.exception("aider_processor_ack_failed", msg_id=msg_id)
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Lifespan entry (called from main.py in Task A9)
|
||||
# =========================================================================
|
||||
|
||||
_instance: AiderEventProcessor | None = None
|
||||
|
||||
|
||||
def get_aider_event_processor() -> AiderEventProcessor:
|
||||
"""取得 AiderEventProcessor singleton。"""
|
||||
global _instance
|
||||
if _instance is None:
|
||||
_instance = AiderEventProcessor()
|
||||
return _instance
|
||||
|
||||
|
||||
async def run_aider_event_processor_loop() -> None:
|
||||
"""main.py lifespan 呼叫此函式。"""
|
||||
proc = get_aider_event_processor()
|
||||
await proc.start()
|
||||
try:
|
||||
while proc._running:
|
||||
await asyncio.sleep(60)
|
||||
except asyncio.CancelledError:
|
||||
await proc.stop()
|
||||
raise
|
||||
163
apps/api/tests/test_aider_event_processor.py
Normal file
163
apps/api/tests/test_aider_event_processor.py
Normal file
@@ -0,0 +1,163 @@
|
||||
# test_aider_event_processor | 2026-04-20 @ Asia/Taipei
|
||||
"""Unit tests for AiderEventProcessor."""
|
||||
import pytest
|
||||
import json
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
from src.workers.aider_event_processor import AiderEventProcessor
|
||||
|
||||
|
||||
TAIPEI = timezone(timedelta(hours=8))
|
||||
|
||||
|
||||
def _payload_dict():
|
||||
"""基本的 aider event payload。"""
|
||||
return {
|
||||
"ts": datetime.now(TAIPEI).isoformat(),
|
||||
"session_id": "s1", "host": "ogt-mac",
|
||||
"type": "error",
|
||||
"payload": {"cwd": "/r", "model": "elephant-alpha",
|
||||
"kind": "api_rate_limit", "message": "429",
|
||||
"context_50chars": ""},
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_process_one_error_event_creates_incident_and_writes_db(monkeypatch):
|
||||
"""error event 應建 incident + 寫 DB。"""
|
||||
# Mock incident engine
|
||||
fake_incident = MagicMock()
|
||||
fake_incident.incident_id = "inc-123"
|
||||
fake_engine = MagicMock()
|
||||
fake_engine.process_signal = AsyncMock(return_value=fake_incident)
|
||||
monkeypatch.setattr("src.workers.aider_event_processor.get_incident_engine",
|
||||
lambda: fake_engine)
|
||||
|
||||
# Mock DB session factory + repo
|
||||
inserted = {}
|
||||
class FakeRepo:
|
||||
def __init__(self, sess): pass
|
||||
async def insert(self, **kw): inserted.update(kw); return 1
|
||||
class FakeSession:
|
||||
async def __aenter__(self): return self
|
||||
async def __aexit__(self, *a): return False
|
||||
async def commit(self): pass
|
||||
monkeypatch.setattr("src.workers.aider_event_processor.AiderEventRepository",
|
||||
FakeRepo)
|
||||
monkeypatch.setattr("src.workers.aider_event_processor.get_session_factory",
|
||||
lambda: (lambda: FakeSession()))
|
||||
|
||||
# Mock redis ack
|
||||
fake_r = MagicMock()
|
||||
fake_r.xack = AsyncMock()
|
||||
monkeypatch.setattr("src.workers.aider_event_processor.get_worker_redis",
|
||||
lambda: fake_r)
|
||||
|
||||
# Act
|
||||
proc = AiderEventProcessor()
|
||||
payload = _payload_dict()
|
||||
data = {b"payload": json.dumps(payload).encode()}
|
||||
await proc._process_one("stream", "1-0", data)
|
||||
|
||||
# Assert
|
||||
assert fake_engine.process_signal.called
|
||||
assert inserted.get("incident_id") == "inc-123"
|
||||
assert inserted.get("type_") == "error"
|
||||
fake_r.xack.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_process_one_session_start_no_incident(monkeypatch):
|
||||
"""session_start 不應建 incident,但應寫 DB。"""
|
||||
fake_engine = MagicMock()
|
||||
fake_engine.process_signal = AsyncMock()
|
||||
monkeypatch.setattr("src.workers.aider_event_processor.get_incident_engine",
|
||||
lambda: fake_engine)
|
||||
|
||||
inserted = {}
|
||||
class FakeRepo:
|
||||
def __init__(self, sess): pass
|
||||
async def insert(self, **kw): inserted.update(kw); return 1
|
||||
class FakeSession:
|
||||
async def __aenter__(self): return self
|
||||
async def __aexit__(self, *a): return False
|
||||
async def commit(self): pass
|
||||
monkeypatch.setattr("src.workers.aider_event_processor.AiderEventRepository",
|
||||
FakeRepo)
|
||||
monkeypatch.setattr("src.workers.aider_event_processor.get_session_factory",
|
||||
lambda: (lambda: FakeSession()))
|
||||
|
||||
fake_r = MagicMock()
|
||||
fake_r.xack = AsyncMock()
|
||||
monkeypatch.setattr("src.workers.aider_event_processor.get_worker_redis",
|
||||
lambda: fake_r)
|
||||
|
||||
# Act
|
||||
proc = AiderEventProcessor()
|
||||
payload = _payload_dict()
|
||||
payload["type"] = "session_start"
|
||||
payload["payload"] = {"cwd": "/r", "model": "m", "aider_args": [],
|
||||
"aider_pid": 1, "cli_version": "0"}
|
||||
data = {b"payload": json.dumps(payload).encode()}
|
||||
await proc._process_one("stream", "1-0", data)
|
||||
|
||||
# Assert
|
||||
assert not fake_engine.process_signal.called # session_start 不建 incident
|
||||
assert inserted.get("incident_id") is None # DB 依然寫入
|
||||
assert inserted.get("type_") == "session_start"
|
||||
fake_r.xack.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_process_one_malformed_payload_acks_and_skips(monkeypatch, caplog):
|
||||
"""malformed JSON 應 ACK 避免卡 pending,但不建 DB record。"""
|
||||
fake_r = MagicMock()
|
||||
fake_r.xack = AsyncMock()
|
||||
monkeypatch.setattr("src.workers.aider_event_processor.get_worker_redis",
|
||||
lambda: fake_r)
|
||||
|
||||
# Act
|
||||
proc = AiderEventProcessor()
|
||||
data = {b"payload": b"this is not json"}
|
||||
await proc._process_one("stream", "1-0", data)
|
||||
|
||||
# Assert
|
||||
fake_r.xack.assert_called_once() # 壞 payload ACK 避免卡 pending
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_incident_failure_still_writes_db(monkeypatch):
|
||||
"""incident engine 壞掉時,event 仍要進 DB(不丟資料)。"""
|
||||
fake_engine = MagicMock()
|
||||
fake_engine.process_signal = AsyncMock(side_effect=RuntimeError("engine down"))
|
||||
monkeypatch.setattr("src.workers.aider_event_processor.get_incident_engine",
|
||||
lambda: fake_engine)
|
||||
|
||||
inserted = {}
|
||||
class FakeRepo:
|
||||
def __init__(self, sess): pass
|
||||
async def insert(self, **kw): inserted.update(kw); return 1
|
||||
class FakeSession:
|
||||
async def __aenter__(self): return self
|
||||
async def __aexit__(self, *a): return False
|
||||
async def commit(self): pass
|
||||
monkeypatch.setattr("src.workers.aider_event_processor.AiderEventRepository",
|
||||
FakeRepo)
|
||||
monkeypatch.setattr("src.workers.aider_event_processor.get_session_factory",
|
||||
lambda: (lambda: FakeSession()))
|
||||
|
||||
fake_r = MagicMock()
|
||||
fake_r.xack = AsyncMock()
|
||||
monkeypatch.setattr("src.workers.aider_event_processor.get_worker_redis",
|
||||
lambda: fake_r)
|
||||
|
||||
# Act
|
||||
proc = AiderEventProcessor()
|
||||
payload = _payload_dict()
|
||||
data = {b"payload": json.dumps(payload).encode()}
|
||||
await proc._process_one("stream", "1-0", data)
|
||||
|
||||
# Assert
|
||||
assert inserted.get("type_") == "error"
|
||||
assert inserted.get("incident_id") is None # engine 壞,無 id
|
||||
fake_r.xack.assert_called_once() # 仍 ACK
|
||||
Reference in New Issue
Block a user