feat(service): aider_event_service — classify + signal_data builder (uses existing debounce)

This commit is contained in:
Your Name
2026-04-20 04:27:51 +08:00
parent 6bcbd12f6c
commit 964427c5d4
2 changed files with 204 additions and 0 deletions

View File

@@ -0,0 +1,111 @@
# aider_event_service | 2026-04-20 @ Asia/Taipei
"""aider event 分類 + 轉成 awoooi signal_data 給 IncidentService。
設計原則:
- 不重做 dedup — 既有 IncidentService.create_incident_from_signal 已有 3min fingerprint debounce
- 不做 pattern extract — Task A8 ai_router 會直接從 aider_event_repository 聚合
- 純函式為主,副作用(建 incident由 callerA7 processor job管理
"""
from __future__ import annotations
from typing import Any
from src.models.aider import AiderEventIn
from src.utils.secret_redactor import redact
# ---- 分類 ----
def classify_severity(ev: AiderEventIn) -> str | None:
"""回傳對應 awoooi Signal 的 severity 字串_parse_severity 會轉 P0-P3
None = 不該建 incident。
映射:
error → warning (P2)
silent_timeout → info (P3)
session_end + nonzero exit + error_count>=3 → high (P1)
session_end + nonzero exit → warning (P2)
其他 → None
"""
t = ev.type
if t == "error":
return "warning"
if t == "silent_timeout":
return "info"
if t == "session_end":
exit_code = ev.payload.get("exit_code", 0)
err_count = ev.payload.get("error_count", 0)
if exit_code != 0 and err_count >= 3:
return "high"
if exit_code != 0 or err_count >= 1:
return "warning"
return None
def should_create_incident(ev: AiderEventIn) -> bool:
return classify_severity(ev) is not None
# ---- signal_data 構造 ----
def build_signal_data(ev: AiderEventIn) -> dict[str, Any] | None:
"""把 AiderEventIn 轉成 IncidentService.create_incident_from_signal 所需 dict。
不該建 incident 的 eventsession_start 等)回傳 None。
Fingerprint 設計:'aider:{session_id}:{type}' — 讓既有 3min debounce 自然發揮
(同 session 60s 內連續 5 個 error 會只建 1 個 incident
"""
sev = classify_severity(ev)
if sev is None:
return None
p = redact(ev.payload)
cwd = p.get("cwd") or ""
model = p.get("model") or "unknown"
repo = _repo_basename(cwd)
alert_name_map = {
"error": "AiderError",
"silent_timeout": "AiderSilentTimeout",
"session_end": "AiderSessionFailure",
}
alert_name = alert_name_map.get(ev.type, "AiderEvent")
return {
"alert_name": alert_name,
"severity": sev,
"source": "manual", # aider 不屬 prometheus/signoz/alertmanager/telegram
"fingerprint": f"aider:{ev.session_id}:{ev.type}",
"target": repo or "unknown",
"labels": {
"session_id": ev.session_id,
"host": ev.host,
"repo": repo,
"model": model,
"event_type": ev.type,
},
"annotations": {
"summary": f"[aider/{ev.type}] {repo} ({model})",
"description": _compact_desc(ev.type, p),
},
}
def _repo_basename(cwd: str) -> str:
"""/Users/ogt/awoooi → awoooi"""
if not cwd:
return ""
return cwd.rstrip("/").rsplit("/", 1)[-1]
def _compact_desc(event_type: str, payload: dict) -> str:
"""把 payload 壓成 200 字內描述secret 已 redacted。"""
if event_type == "error":
kind = payload.get("kind", "unknown")
msg = payload.get("message", "")[:150]
return f"kind={kind} message={msg}"
if event_type == "silent_timeout":
return f"idle_sec={payload.get('idle_sec')} tail={payload.get('last_output_tail','')[:50]}"
if event_type == "session_end":
return (f"exit_code={payload.get('exit_code')} "
f"errors={payload.get('error_count',0)} "
f"duration={payload.get('duration_sec',0)}s "
f"tokens={payload.get('tokens_sent',0)}+{payload.get('tokens_received',0)}")
return str(payload)[:200]

View File

@@ -0,0 +1,93 @@
# Test aider_event_service | 2026-04-20 @ Asia/Taipei
import pytest
from datetime import datetime, timezone, timedelta
from src.services.aider_event_service import (
classify_severity, should_create_incident, build_signal_data,
)
from src.models.aider import AiderEventIn
TAIPEI = timezone(timedelta(hours=8))
def _ev(t, p=None, sid="s1"):
return AiderEventIn(
ts=datetime.now(TAIPEI), session_id=sid,
host="ogt-mac", type=t, payload=p or {},
)
def test_classify_error_is_warning():
# aider "error" event → signal severity "warning" → Severity.P2
assert classify_severity(_ev("error")) == "warning"
def test_classify_silent_timeout_is_info():
# silent_timeout → "info" → P3
assert classify_severity(_ev("silent_timeout")) == "info"
def test_classify_session_end_nonzero_exit_is_warning():
assert classify_severity(_ev("session_end", {"exit_code": 1})) == "warning"
def test_classify_session_end_multi_error_is_high():
# 3+ errors + nonzero exit → "high" → P1
assert classify_severity(_ev("session_end", {"exit_code": 1, "error_count": 5})) == "high"
def test_classify_session_end_clean_is_none():
ev = _ev("session_end", {"exit_code": 0, "error_count": 0})
assert classify_severity(ev) is None
assert should_create_incident(ev) is False
def test_classify_session_start_is_none():
assert should_create_incident(_ev("session_start")) is False
def test_classify_error_triggers_incident():
assert should_create_incident(_ev("error")) is True
def test_build_signal_data_fingerprint_per_session_type():
# 同 session + 同 type → 同 fingerprint讓既有 3min debounce 生效)
sd1 = build_signal_data(_ev("error", {"cwd": "/a", "model": "m"}, sid="sx"))
sd2 = build_signal_data(_ev("error", {"cwd": "/a", "model": "m"}, sid="sx"))
assert sd1["fingerprint"] == sd2["fingerprint"]
assert sd1["fingerprint"].startswith("aider:sx:error")
def test_build_signal_data_different_sessions_different_fp():
sd_a = build_signal_data(_ev("error", sid="sA"))
sd_b = build_signal_data(_ev("error", sid="sB"))
assert sd_a["fingerprint"] != sd_b["fingerprint"]
def test_build_signal_data_shape():
ev = _ev("error", {"cwd": "/Users/ogt/awoooi", "model": "elephant-alpha",
"message": "api timeout", "kind": "api_rate_limit"})
sd = build_signal_data(ev)
# 必要 keys
for k in ("alert_name", "severity", "source", "fingerprint", "target",
"labels", "annotations"):
assert k in sd, f"missing {k}"
assert sd["source"] == "manual"
assert sd["target"] == "awoooi" # 從 cwd basename
assert sd["labels"]["session_id"] == "s1"
assert sd["labels"]["model"] == "elephant-alpha"
def test_build_signal_data_redacts_secrets_in_annotations():
"""secret 不可進 annotations/labels"""
ev = _ev("error", {"cwd": "/r", "model": "m",
"message": "key sk-or-v1-abcdef0123456789ABCDEFghijklmnopqrstuv fail"})
sd = build_signal_data(ev)
# annotations 可能含 message — 需已遮罩
annot_str = str(sd["annotations"])
assert "sk-or-v1-abcdef" not in annot_str
assert "<redacted:" in annot_str
def test_build_signal_data_returns_none_for_non_incident_types():
# session_start 不該建 signal
assert build_signal_data(_ev("session_start")) is None