Files
awoooi/apps/api/src/workers/aider_event_processor.py
Your Name 479f8d8971
Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 35s
refactor(tests): 技術債清零 — 移除 FakeRepo/FakeSession Mock DB 違規
## ai_router.py
- 抽取 _aggregate_feedback_stats() 純函數,feedback_from_aider_events 呼叫它

## aider_event_processor.py
- _process_one 加 _session_factory=None DI 參數(預設 get_session_factory())
- 可注入測試 factory,不改既有生產邏輯

## test_ai_router_feedback.py(完全重寫)
- 移除 FakeRepo/FakeSession,改為直接測試 _aggregate_feedback_stats 純函數
- 新增 test_feedback_skips_missing_model 邊界條件
- DB 失敗降級行為 test 保留(只 patch get_session_factory,無 FakeRepo)

## test_aider_event_processor.py(完全重寫)
- 移除 FakeRepo/FakeSession,改用真實 PostgreSQL(real_factory fixture)
- Redis xack + IncidentEngine 保留 mock(外部 broker/AI 服務,符合例外)
- 每個測試後 rollback,不污染 dev DB

## setup_test_schema.sql
- 補入 aider_events_payload_gin GIN index(與 adr091 生產 migration 一致)

## integration/conftest.py
- 補注解說明密碼名稱 awoooi_prod_2026 的歷史混淆
- 修正 assert 邏輯:檢查 DB 名稱而非 URL 字串,避免密碼含 prod 觸發誤判

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 01:33:30 +08:00

208 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# aider_event_processor_job | 2026-04-20 @ Asia/Taipei
"""aider-watch 事件處理器
消費 Redis stream `signals:aider:events` (由 /api/v1/aider/events webhook 推入)
1. 解析 AiderEventIn
2. 若應該觸發 incident → build_signal_data → IncidentEngine.process_signal
3. 寫入 aider_events 表(含 incident_id 關聯)
4. XACK
仿 workers/signal_worker.py，但簡化：無 Active Sweeper
@author Claude Code
@date 2026-04-20
"""
from __future__ import annotations
import asyncio
import os
import socket
from typing import Any
import structlog
from src.core.redis_client import get_redis, get_worker_redis, init_worker_redis_pool
from src.db.base import get_session_factory
from src.models.aider import AiderEventIn
from src.repositories.aider_event_repository import AiderEventRepository
from src.services.aider_event_service import build_signal_data, should_create_incident
from src.services.incident_engine import get_incident_engine
# Module-level structlog logger; every event in this module is emitted through
# it as an event name plus keyword fields.
logger = structlog.get_logger(__name__)
# =========================================================================
# Constants
# =========================================================================
# Default Redis stream written by the /api/v1/aider/events webhook;
# _stream_key() allows AIDER_EVENTS_STREAM_KEY to override it.
STREAM_KEY_DEFAULT = "signals:aider:events"
# Consumer group name used by every processor instance.
CONSUMER_GROUP = "aider_processors"
# Consumer name within the group, derived from this host's name.
CONSUMER_NAME = "aider_" + socket.gethostname()
# Maximum number of entries fetched per XREADGROUP call.
BATCH_SIZE = 10
# XREADGROUP block timeout, in milliseconds.
BLOCK_MS = 5_000
# Seconds stop() waits for the consume loop before cancelling it.
GRACEFUL_SHUTDOWN_TIMEOUT_S = 30
def _stream_key() -> str:
"""取得 Redis stream key可覆蓋環境變數。"""
return os.environ.get("AIDER_EVENTS_STREAM_KEY", STREAM_KEY_DEFAULT)
# =========================================================================
# Processor
# =========================================================================
class AiderEventProcessor:
    """Redis Streams consumer for aider events.

    Reads the stream returned by ``_stream_key()`` as consumer
    ``CONSUMER_NAME`` of group ``CONSUMER_GROUP``. For each entry it optionally
    raises an incident, persists the event via ``AiderEventRepository`` and
    XACKs the entry only once the DB write has committed.
    """

    # How often (seconds) the consume loop re-reads this consumer's own
    # pending entries list (PEL), so entries left un-ACKed after a failed DB
    # write are retried without needing a process restart.
    PENDING_REPLAY_INTERVAL_S: float = 60.0

    def __init__(self) -> None:
        self._running = False
        self._task: asyncio.Task | None = None

    async def _ensure_consumer_group(self) -> None:
        """Create the consumer group (and the stream, via MKSTREAM) if missing.

        A ``BUSYGROUP`` error means the group already exists and is ignored;
        any other error is re-raised.
        """
        r = get_redis()
        try:
            # id="0": a newly created group is positioned at the start of the
            # stream, so entries already in it will be delivered too.
            await r.xgroup_create(_stream_key(), CONSUMER_GROUP,
                                  id="0", mkstream=True)
            logger.info("aider_consumer_group_created",
                        stream=_stream_key(), group=CONSUMER_GROUP)
        except Exception as e:
            if "BUSYGROUP" in str(e):
                logger.debug("aider_consumer_group_exists", group=CONSUMER_GROUP)
            else:
                raise

    async def start(self) -> None:
        """Start the background consume loop.

        Logs a warning and returns if already running. Exceptions from the
        Redis pool setup or consumer-group creation propagate, and the
        processor then stays stopped.
        """
        if self._running:
            logger.warning("aider_processor_already_running")
            return
        # Make sure the worker's dedicated long-lived connection pool is ready
        # (the original author notes this call is idempotent).
        await init_worker_redis_pool()
        await self._ensure_consumer_group()
        self._running = True
        self._task = asyncio.create_task(self._consume_loop())
        logger.info("aider_processor_started",
                    stream=_stream_key(), group=CONSUMER_GROUP, consumer=CONSUMER_NAME)

    async def stop(self) -> None:
        """Gracefully stop the consume loop.

        Clears ``_running`` so the loop exits after its current XREADGROUP
        call, then waits up to ``GRACEFUL_SHUTDOWN_TIMEOUT_S`` seconds for the
        task before cancelling it.
        """
        if not self._running:
            return
        self._running = False
        if self._task:
            try:
                await asyncio.wait_for(self._task, timeout=GRACEFUL_SHUTDOWN_TIMEOUT_S)
            except asyncio.TimeoutError:
                # Before Python 3.11, wait_for raises asyncio.TimeoutError, which
                # is NOT the builtin TimeoutError; on 3.11+ the two are aliases.
                self._task.cancel()
            except asyncio.CancelledError:
                pass
        logger.info("aider_processor_stopped")

    @staticmethod
    def _advance_read_id(read_id: Any, last_msg_id: Any) -> Any:
        """Return the stream id to pass to the next XREADGROUP call.

        Live mode (``">"``) stays live. In PEL-replay mode, continue after the
        last replayed entry, or switch to live once a replay read returned no
        entries (``last_msg_id is None``), i.e. the PEL has been fully walked.
        """
        if read_id == ">" or last_msg_id is None:
            return ">"
        return last_msg_id

    async def _consume_loop(self) -> None:
        """Main consume loop; runs until ``_running`` is cleared or the task is cancelled.

        Two read modes:

        * replay (read id ``"0"`` and onward): re-delivers this consumer's own
          pending entries, i.e. entries delivered earlier but never ACKed, such
          as after a failed DB write in ``_process_one``. Entered at startup
          and again every ``PENDING_REPLAY_INTERVAL_S`` seconds.
        * live (read id ``">"``): only entries never delivered to the group.

        Reading only with ``">"`` would leave un-ACKed entries in the PEL
        forever, defeating ``_process_one``'s "skip ACK to retry" policy.
        """
        r = get_worker_redis()
        stream_key = _stream_key()
        loop = asyncio.get_running_loop()
        read_id: Any = "0"  # begin by replaying our own pending entries
        next_replay_at = loop.time() + self.PENDING_REPLAY_INTERVAL_S
        while self._running:
            try:
                if read_id == ">" and loop.time() >= next_replay_at:
                    read_id = "0"
                    next_replay_at = loop.time() + self.PENDING_REPLAY_INTERVAL_S
                messages = await r.xreadgroup(
                    groupname=CONSUMER_GROUP, consumername=CONSUMER_NAME,
                    streams={stream_key: read_id},
                    count=BATCH_SIZE, block=BLOCK_MS,
                )
                last_msg_id: Any = None
                for _stream_name, entries in messages or ():
                    for message_id, data in entries:
                        last_msg_id = message_id
                        await self._process_one(stream_key, message_id, data)
                read_id = self._advance_read_id(read_id, last_msg_id)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.exception("aider_processor_loop_error", error=str(e))
                await asyncio.sleep(1.0)

    async def _process_one(
        self, stream_key: str, msg_id: Any, data: dict, _session_factory: Any = None
    ) -> None:
        """Handle one message: parse -> (maybe) incident -> DB write -> ACK.

        Args:
            stream_key: Stream the message was read from (used for XACK).
            msg_id: Redis stream entry id.
            data: Entry fields; the JSON event is read from ``payload``
                (bytes or str key).
            _session_factory: Optional injected session factory (for tests);
                defaults to ``get_session_factory()``.

        ACK policy:
            * unparseable payload -> ACK right away (a bad payload will never parse)
            * incident creation failure -> logged; the event is still persisted
            * DB write failure -> no ACK, so the entry stays pending and is
              replayed later by ``_consume_loop``

        NOTE(review): a replayed entry re-runs incident creation; whether
        ``IncidentEngine.process_signal`` de-duplicates repeated signals is
        not visible here — confirm.
        """
        try:
            raw = data.get(b"payload") or data.get("payload")
            if isinstance(raw, bytes):
                raw = raw.decode()
            ev = AiderEventIn.model_validate_json(raw)
        except Exception:
            logger.exception("aider_processor_parse_failed", msg_id=msg_id)
            # Do not retry parse failures: ACK the bad payload so it does not
            # sit in the pending list forever.
            await self._ack(stream_key, msg_id)
            return

        incident_id: str | None = None
        if should_create_incident(ev):
            try:
                sig_data = build_signal_data(ev)
                if sig_data is not None:
                    engine = get_incident_engine()
                    incident = await engine.process_signal(sig_data)
                    if incident:
                        incident_id = getattr(incident, "incident_id", None)
            except Exception:
                logger.exception("aider_processor_incident_failed",
                                 session_id=ev.session_id, type=ev.type)
                # Keep going: the event must be persisted even if the incident failed.

        try:
            session_factory = _session_factory or get_session_factory()
            async with session_factory() as session:
                repo = AiderEventRepository(session)
                await repo.insert(
                    session_id=ev.session_id, ts=ev.ts, type_=ev.type,
                    host=ev.host, payload=ev.payload, incident_id=incident_id,
                )
                await session.commit()
        except Exception:
            logger.exception("aider_processor_db_write_failed",
                             session_id=ev.session_id)
            # On DB failure skip the ACK; the entry stays in the pending list
            # and is retried by the replay phase of _consume_loop.
            return

        await self._ack(stream_key, msg_id)

    async def _ack(self, stream_key: str, msg_id: Any) -> None:
        """XACK one entry; failures are logged rather than raised."""
        try:
            r = get_worker_redis()
            await r.xack(stream_key, CONSUMER_GROUP, msg_id)
        except Exception:
            logger.exception("aider_processor_ack_failed", msg_id=msg_id)
# =========================================================================
# Lifespan entry (called from main.py in Task A9)
# =========================================================================
# Module-level singleton, created lazily by get_aider_event_processor().
_instance: AiderEventProcessor | None = None


def get_aider_event_processor() -> AiderEventProcessor:
    """Return the shared AiderEventProcessor, creating it on first use."""
    global _instance
    if _instance is not None:
        return _instance
    _instance = AiderEventProcessor()
    return _instance
async def run_aider_event_processor_loop() -> None:
    """Entry point for the main.py lifespan.

    Starts the singleton processor; a start failure is logged and the
    function returns normally. Otherwise it idles while the processor runs,
    and on cancellation stops the processor before re-raising.
    """
    processor = get_aider_event_processor()
    try:
        await processor.start()
    except Exception:
        logger.exception("aider_processor_start_failed")
        return

    poll_interval_s = 60
    try:
        while processor._running:
            await asyncio.sleep(poll_interval_s)
    except asyncio.CancelledError:
        # Shut the consumer down cleanly, then let the cancellation propagate.
        await processor.stop()
        raise