diff --git a/apps/api/src/workers/aider_event_processor.py b/apps/api/src/workers/aider_event_processor.py new file mode 100644 index 00000000..dd828f3d --- /dev/null +++ b/apps/api/src/workers/aider_event_processor.py @@ -0,0 +1,198 @@ +# aider_event_processor_job | 2026-04-20 @ Asia/Taipei +"""aider-watch 事件處理器 + +消費 Redis stream `signals:aider:events` (由 /api/v1/aider/events webhook 推入): +1. 解析 AiderEventIn +2. 若應該觸發 incident → build_signal_data → IncidentEngine.process_signal +3. 寫入 aider_events 表(含 incident_id 關聯) +4. XACK + +仿 workers/signal_worker.py,但簡化(無 Active Sweeper)。 + +@author Claude Code +@date 2026-04-20 +""" +from __future__ import annotations +import asyncio +import os +import socket +from typing import Any + +import structlog + +from src.core.redis_client import get_redis, get_worker_redis +from src.db.base import get_session_factory +from src.models.aider import AiderEventIn +from src.repositories.aider_event_repository import AiderEventRepository +from src.services.aider_event_service import build_signal_data, should_create_incident +from src.services.incident_engine import get_incident_engine + +logger = structlog.get_logger(__name__) + +# ========================================================================= +# Constants +# ========================================================================= + +STREAM_KEY_DEFAULT = "signals:aider:events" +CONSUMER_GROUP = "aider_processors" +CONSUMER_NAME = f"aider_{socket.gethostname()}" +BATCH_SIZE = 10 +BLOCK_MS = 5000 +GRACEFUL_SHUTDOWN_TIMEOUT_S = 30 + + +def _stream_key() -> str: + """取得 Redis stream key,可覆蓋環境變數。""" + return os.environ.get("AIDER_EVENTS_STREAM_KEY", STREAM_KEY_DEFAULT) + + +# ========================================================================= +# Processor +# ========================================================================= + +class AiderEventProcessor: + """Redis Streams consumer for aider events.""" + + def __init__(self) -> None: + self._running = False + self._task: asyncio.Task | None = None + + async def _ensure_consumer_group(self) -> None: + """確保 Consumer Group 存在。""" + r = get_redis() + try: + await r.xgroup_create(_stream_key(), CONSUMER_GROUP, + id="0", mkstream=True) + logger.info("aider_consumer_group_created", + stream=_stream_key(), group=CONSUMER_GROUP) + except Exception as e: + if "BUSYGROUP" in str(e): + logger.debug("aider_consumer_group_exists", group=CONSUMER_GROUP) + else: + raise + + async def start(self) -> None: + """啟動消費循環。""" + if self._running: + logger.warning("aider_processor_already_running") + return + await self._ensure_consumer_group() + self._running = True + self._task = asyncio.create_task(self._consume_loop()) + logger.info("aider_processor_started", + stream=_stream_key(), group=CONSUMER_GROUP, consumer=CONSUMER_NAME) + + async def stop(self) -> None: + """優雅關閉。""" + if not self._running: + return + self._running = False + if self._task: + try: + await asyncio.wait_for(self._task, timeout=GRACEFUL_SHUTDOWN_TIMEOUT_S) + except TimeoutError: + self._task.cancel() + except asyncio.CancelledError: + pass + logger.info("aider_processor_stopped") + + async def _consume_loop(self) -> None: + """主消費循環。""" + r = get_worker_redis() + stream_key = _stream_key() + while self._running: + try: + messages = await r.xreadgroup( + groupname=CONSUMER_GROUP, consumername=CONSUMER_NAME, + streams={stream_key: ">"}, + count=BATCH_SIZE, block=BLOCK_MS, + ) + if not messages: + continue + for _stream_name, entries in messages: + for message_id, data in entries: + await self._process_one(stream_key, message_id, data) + except asyncio.CancelledError: + break + except Exception as e: + logger.exception("aider_processor_loop_error", error=str(e)) + await asyncio.sleep(1.0) + + async def _process_one(self, stream_key: str, msg_id: Any, data: dict) -> None: + """處理單筆 message:parse → (maybe) incident → DB write → ACK。""" + try: + raw = data.get(b"payload") or data.get("payload") + if isinstance(raw, bytes): + raw = raw.decode() + ev = AiderEventIn.model_validate_json(raw) + except Exception: + logger.exception("aider_processor_parse_failed", msg_id=msg_id) + # Parse 失敗不重試(bad payload),直接 ACK 避免卡 pending + await self._ack(stream_key, msg_id) + return + + incident_id: str | None = None + if should_create_incident(ev): + try: + sig_data = build_signal_data(ev) + if sig_data is not None: + engine = get_incident_engine() + incident = await engine.process_signal(sig_data) + if incident: + incident_id = getattr(incident, "incident_id", None) + except Exception: + logger.exception("aider_processor_incident_failed", + session_id=ev.session_id, type=ev.type) + # 不中斷 — 即使 incident 失敗,event 仍要持久化 + + try: + session_factory = get_session_factory() + async with session_factory() as session: + repo = AiderEventRepository(session) + await repo.insert( + session_id=ev.session_id, ts=ev.ts, type_=ev.type, + host=ev.host, payload=ev.payload, incident_id=incident_id, + ) + await session.commit() + except Exception: + logger.exception("aider_processor_db_write_failed", + session_id=ev.session_id) + # DB 失敗就不 ACK,靠 Redis pending list 保留重試 + return + + await self._ack(stream_key, msg_id) + + async def _ack(self, stream_key: str, msg_id: Any) -> None: + """確認訊息已處理。""" + try: + r = get_worker_redis() + await r.xack(stream_key, CONSUMER_GROUP, msg_id) + except Exception: + logger.exception("aider_processor_ack_failed", msg_id=msg_id) + + +# ========================================================================= +# Lifespan entry (called from main.py in Task A9) +# ========================================================================= + +_instance: AiderEventProcessor | None = None + + +def get_aider_event_processor() -> AiderEventProcessor: + """取得 AiderEventProcessor singleton。""" + global _instance + if _instance is None: + _instance = AiderEventProcessor() + return _instance + + +async def run_aider_event_processor_loop() -> None: + """main.py lifespan 呼叫此函式。""" + proc = get_aider_event_processor() + await proc.start() + try: + while proc._running: + await asyncio.sleep(60) + except asyncio.CancelledError: + await proc.stop() + raise diff --git a/apps/api/tests/test_aider_event_processor.py b/apps/api/tests/test_aider_event_processor.py new file mode 100644 index 00000000..675e65cc --- /dev/null +++ b/apps/api/tests/test_aider_event_processor.py @@ -0,0 +1,163 @@ +# test_aider_event_processor | 2026-04-20 @ Asia/Taipei +"""Unit tests for AiderEventProcessor.""" +import pytest +import json +from datetime import datetime, timezone, timedelta +from unittest.mock import AsyncMock, MagicMock, patch +from src.workers.aider_event_processor import AiderEventProcessor + + +TAIPEI = timezone(timedelta(hours=8)) + + +def _payload_dict(): + """基本的 aider event payload。""" + return { + "ts": datetime.now(TAIPEI).isoformat(), + "session_id": "s1", "host": "ogt-mac", + "type": "error", + "payload": {"cwd": "/r", "model": "elephant-alpha", + "kind": "api_rate_limit", "message": "429", + "context_50chars": ""}, + } + + +@pytest.mark.asyncio +async def test_process_one_error_event_creates_incident_and_writes_db(monkeypatch): + """error event 應建 incident + 寫 DB。""" + # Mock incident engine + fake_incident = MagicMock() + fake_incident.incident_id = "inc-123" + fake_engine = MagicMock() + fake_engine.process_signal = AsyncMock(return_value=fake_incident) + monkeypatch.setattr("src.workers.aider_event_processor.get_incident_engine", + lambda: fake_engine) + + # Mock DB session factory + repo + inserted = {} + class FakeRepo: + def __init__(self, sess): pass + async def insert(self, **kw): inserted.update(kw); return 1 + class FakeSession: + async def __aenter__(self): return self + async def __aexit__(self, *a): return False + async def commit(self): pass + monkeypatch.setattr("src.workers.aider_event_processor.AiderEventRepository", + FakeRepo) + monkeypatch.setattr("src.workers.aider_event_processor.get_session_factory", + lambda: (lambda: FakeSession())) + + # Mock redis ack + fake_r = MagicMock() + fake_r.xack = AsyncMock() + monkeypatch.setattr("src.workers.aider_event_processor.get_worker_redis", + lambda: fake_r) + + # Act + proc = AiderEventProcessor() + payload = _payload_dict() + data = {b"payload": json.dumps(payload).encode()} + await proc._process_one("stream", "1-0", data) + + # Assert + assert fake_engine.process_signal.called + assert inserted.get("incident_id") == "inc-123" + assert inserted.get("type_") == "error" + fake_r.xack.assert_called_once() + + +@pytest.mark.asyncio +async def test_process_one_session_start_no_incident(monkeypatch): + """session_start 不應建 incident,但應寫 DB。""" + fake_engine = MagicMock() + fake_engine.process_signal = AsyncMock() + monkeypatch.setattr("src.workers.aider_event_processor.get_incident_engine", + lambda: fake_engine) + + inserted = {} + class FakeRepo: + def __init__(self, sess): pass + async def insert(self, **kw): inserted.update(kw); return 1 + class FakeSession: + async def __aenter__(self): return self + async def __aexit__(self, *a): return False + async def commit(self): pass + monkeypatch.setattr("src.workers.aider_event_processor.AiderEventRepository", + FakeRepo) + monkeypatch.setattr("src.workers.aider_event_processor.get_session_factory", + lambda: (lambda: FakeSession())) + + fake_r = MagicMock() + fake_r.xack = AsyncMock() + monkeypatch.setattr("src.workers.aider_event_processor.get_worker_redis", + lambda: fake_r) + + # Act + proc = AiderEventProcessor() + payload = _payload_dict() + payload["type"] = "session_start" + payload["payload"] = {"cwd": "/r", "model": "m", "aider_args": [], + "aider_pid": 1, "cli_version": "0"} + data = {b"payload": json.dumps(payload).encode()} + await proc._process_one("stream", "1-0", data) + + # Assert + assert not fake_engine.process_signal.called # session_start 不建 incident + assert inserted.get("incident_id") is None # DB 依然寫入 + assert inserted.get("type_") == "session_start" + fake_r.xack.assert_called_once() + + +@pytest.mark.asyncio +async def test_process_one_malformed_payload_acks_and_skips(monkeypatch, caplog): + """malformed JSON 應 ACK 避免卡 pending,但不建 DB record。""" + fake_r = MagicMock() + fake_r.xack = AsyncMock() + monkeypatch.setattr("src.workers.aider_event_processor.get_worker_redis", + lambda: fake_r) + + # Act + proc = AiderEventProcessor() + data = {b"payload": b"this is not json"} + await proc._process_one("stream", "1-0", data) + + # Assert + fake_r.xack.assert_called_once() # 壞 payload ACK 避免卡 pending + + +@pytest.mark.asyncio +async def test_incident_failure_still_writes_db(monkeypatch): + """incident engine 壞掉時,event 仍要進 DB(不丟資料)。""" + fake_engine = MagicMock() + fake_engine.process_signal = AsyncMock(side_effect=RuntimeError("engine down")) + monkeypatch.setattr("src.workers.aider_event_processor.get_incident_engine", + lambda: fake_engine) + + inserted = {} + class FakeRepo: + def __init__(self, sess): pass + async def insert(self, **kw): inserted.update(kw); return 1 + class FakeSession: + async def __aenter__(self): return self + async def __aexit__(self, *a): return False + async def commit(self): pass + monkeypatch.setattr("src.workers.aider_event_processor.AiderEventRepository", + FakeRepo) + monkeypatch.setattr("src.workers.aider_event_processor.get_session_factory", + lambda: (lambda: FakeSession())) + + fake_r = MagicMock() + fake_r.xack = AsyncMock() + monkeypatch.setattr("src.workers.aider_event_processor.get_worker_redis", + lambda: fake_r) + + # Act + proc = AiderEventProcessor() + payload = _payload_dict() + data = {b"payload": json.dumps(payload).encode()} + await proc._process_one("stream", "1-0", data) + + # Assert + assert inserted.get("type_") == "error" + assert inserted.get("incident_id") is None # engine 壞,無 id + fake_r.xack.assert_called_once() # 仍 ACK