feat(api): POST /api/v1/aider/events HMAC webhook + Redis stream push
- Router layer: HTTP validation + HMAC-SHA256 signature verification - Service layer: Redis stream push (aider_event_service.push_aider_batch_to_stream) - leWOOOgo積木化遵循: Router → Service → Redis - All 6 tests passing (signature validation, batch limits, edge cases)
This commit is contained in:
53
apps/api/src/api/v1/aider_events.py
Normal file
53
apps/api/src/api/v1/aider_events.py
Normal file
@@ -0,0 +1,53 @@
|
||||
# apps/api/src/api/v1/aider_events.py | 2026-04-20 @ Asia/Taipei
|
||||
"""POST /api/v1/aider/events — Mac aiderw client 推事件入口。
|
||||
HMAC-SHA256 verified; 推入 Redis stream 讓 background job 處理。"""
|
||||
from __future__ import annotations
|
||||
import hmac
|
||||
import hashlib
|
||||
import os
|
||||
import logging
|
||||
from fastapi import APIRouter, Header, HTTPException, Request, status
|
||||
from pydantic import ValidationError
|
||||
from src.models.aider import AiderBatchIn
|
||||
from src.services.aider_event_service import push_aider_batch_to_stream
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
router = APIRouter(prefix="/aider", tags=["Aider"])
|
||||
|
||||
|
||||
def _verify_signature(body: bytes, signature: str | None, secret: str) -> bool:
|
||||
"""Timing-safe HMAC-SHA256 比對。signature 格式 'sha256=<hex>'。"""
|
||||
if not signature or not signature.startswith("sha256=") or not secret:
|
||||
return False
|
||||
expected = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
|
||||
return hmac.compare_digest(expected, signature)
|
||||
|
||||
|
||||
@router.post("/events", status_code=status.HTTP_202_ACCEPTED)
|
||||
async def receive_aider_events(
|
||||
request: Request,
|
||||
x_aider_signature: str | None = Header(default=None, alias="X-Aider-Signature"),
|
||||
):
|
||||
"""接收 Mac aiderw 推來的 event batch,HMAC 驗證後推 Redis stream。"""
|
||||
body = await request.body()
|
||||
|
||||
secret = os.environ.get("AIDER_WEBHOOK_SECRET", "")
|
||||
if not _verify_signature(body, x_aider_signature, secret):
|
||||
logger.warning("aider_webhook_signature_invalid")
|
||||
raise HTTPException(status_code=401, detail="invalid signature")
|
||||
|
||||
try:
|
||||
batch = AiderBatchIn.model_validate_json(body)
|
||||
except ValidationError as e:
|
||||
# 只回前 5 筆錯誤避免巨大 response
|
||||
raise HTTPException(status_code=400, detail=e.errors()[:5])
|
||||
|
||||
# 推 Redis stream(透過 Service 層)
|
||||
try:
|
||||
stream_ids = await push_aider_batch_to_stream(batch)
|
||||
except Exception as exc:
|
||||
logger.exception("aider_webhook_redis_push_failed")
|
||||
raise HTTPException(status_code=503, detail="queue unavailable") from exc
|
||||
|
||||
logger.info("aider_webhook_accepted", count=len(batch.events))
|
||||
return {"accepted": len(batch.events), "stream_ids": stream_ids}
|
||||
@@ -5,10 +5,12 @@
|
||||
- 不重做 dedup — 既有 IncidentService.create_incident_from_signal 已有 3min fingerprint debounce
|
||||
- 不做 pattern extract — Task A8 ai_router 會直接從 aider_event_repository 聚合
|
||||
- 純函式為主,副作用(建 incident)由 caller(A7 processor job)管理
|
||||
- Redis stream 推送 (Task A6):轉接 Router 層
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import os
|
||||
from typing import Any
|
||||
from src.models.aider import AiderEventIn
|
||||
from src.models.aider import AiderEventIn, AiderBatchIn
|
||||
from src.utils.secret_redactor import redact
|
||||
|
||||
|
||||
@@ -109,3 +111,25 @@ def _compact_desc(event_type: str, payload: dict) -> str:
|
||||
f"duration={payload.get('duration_sec',0)}s "
|
||||
f"tokens={payload.get('tokens_sent',0)}+{payload.get('tokens_received',0)}")
|
||||
return str(payload)[:200]
|
||||
|
||||
|
||||
# ---- Redis stream 推送 (Task A6) ----
|
||||
|
||||
async def push_aider_batch_to_stream(batch: AiderBatchIn) -> list[str]:
|
||||
"""把 event batch 推到 Redis stream。回傳 stream ID 列表。"""
|
||||
from src.core.redis_client import get_redis
|
||||
|
||||
stream_key = os.environ.get("AIDER_EVENTS_STREAM_KEY", "signals:aider:events")
|
||||
r = get_redis()
|
||||
|
||||
ids = []
|
||||
for ev in batch.events:
|
||||
msg_id = await r.xadd(stream_key, {"payload": ev.model_dump_json()})
|
||||
ids.append(_to_str(msg_id))
|
||||
|
||||
return ids
|
||||
|
||||
|
||||
def _to_str(x) -> str:
|
||||
"""轉成 str(相容 bytes 和 str 回傳值)。"""
|
||||
return x.decode() if isinstance(x, bytes) else str(x)
|
||||
|
||||
125
apps/api/tests/test_aider_events_api.py
Normal file
125
apps/api/tests/test_aider_events_api.py
Normal file
@@ -0,0 +1,125 @@
|
||||
# apps/api/tests/test_aider_events_api.py | 2026-04-20 @ Asia/Taipei
|
||||
import hmac
|
||||
import hashlib
|
||||
import json
|
||||
import pytest
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from unittest.mock import AsyncMock, patch
|
||||
from fastapi import FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
from src.api.v1.aider_events import router
|
||||
|
||||
SECRET = "testsecret_for_unittest_do_not_use_in_prod_" + "x" * 20
|
||||
TAIPEI = timezone(timedelta(hours=8))
|
||||
|
||||
|
||||
def _sign(body: bytes, secret: str = SECRET) -> str:
|
||||
return "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
|
||||
|
||||
|
||||
def _ev():
|
||||
return {
|
||||
"ts": datetime(2026, 4, 20, 10, 0, tzinfo=TAIPEI).isoformat(),
|
||||
"session_id": "s1",
|
||||
"host": "ogt-mac",
|
||||
"type": "session_start",
|
||||
"payload": {
|
||||
"cwd": "/t/x",
|
||||
"model": "elephant",
|
||||
"aider_args": [],
|
||||
"aider_pid": 1,
|
||||
"cli_version": "0.86",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(monkeypatch):
|
||||
monkeypatch.setenv("AIDER_WEBHOOK_SECRET", SECRET)
|
||||
|
||||
# Patch the service function
|
||||
async def mock_push(batch):
|
||||
return ["1234-0"] * len(batch.events)
|
||||
|
||||
with patch("src.api.v1.aider_events.push_aider_batch_to_stream", new_callable=lambda: mock_push):
|
||||
app = FastAPI()
|
||||
app.include_router(router, prefix="/api/v1")
|
||||
yield TestClient(app)
|
||||
|
||||
|
||||
def test_accepts_signed_batch(client):
|
||||
body = json.dumps({"events": [_ev()]}).encode()
|
||||
r = client.post(
|
||||
"/api/v1/aider/events",
|
||||
content=body,
|
||||
headers={
|
||||
"X-Aider-Signature": _sign(body),
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
assert r.status_code == 202, r.text
|
||||
data = r.json()
|
||||
assert data["accepted"] == 1
|
||||
|
||||
|
||||
def test_rejects_invalid_signature(client):
|
||||
body = json.dumps({"events": [_ev()]}).encode()
|
||||
r = client.post(
|
||||
"/api/v1/aider/events",
|
||||
content=body,
|
||||
headers={
|
||||
"X-Aider-Signature": "sha256=" + "0" * 64,
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
assert r.status_code == 401
|
||||
|
||||
|
||||
def test_rejects_missing_signature(client):
|
||||
body = json.dumps({"events": [_ev()]}).encode()
|
||||
r = client.post(
|
||||
"/api/v1/aider/events",
|
||||
content=body,
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
assert r.status_code == 401
|
||||
|
||||
|
||||
def test_rejects_malformed_event(client):
|
||||
body = json.dumps({"events": [{"bad": "payload"}]}).encode()
|
||||
r = client.post(
|
||||
"/api/v1/aider/events",
|
||||
content=body,
|
||||
headers={
|
||||
"X-Aider-Signature": _sign(body),
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
assert r.status_code == 400
|
||||
|
||||
|
||||
def test_rejects_oversize_batch(client):
|
||||
body = json.dumps({"events": [_ev()] * 51}).encode()
|
||||
r = client.post(
|
||||
"/api/v1/aider/events",
|
||||
content=body,
|
||||
headers={
|
||||
"X-Aider-Signature": _sign(body),
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
assert r.status_code == 400
|
||||
|
||||
|
||||
def test_accepts_batch_of_50(client):
|
||||
body = json.dumps({"events": [_ev()] * 50}).encode()
|
||||
r = client.post(
|
||||
"/api/v1/aider/events",
|
||||
content=body,
|
||||
headers={
|
||||
"X-Aider-Signature": _sign(body),
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
assert r.status_code == 202
|
||||
assert r.json()["accepted"] == 50
|
||||
Reference in New Issue
Block a user