feat(repo): AiderEventRepository CRUD + model_stats + pattern candidates
This commit is contained in:
151
apps/api/src/repositories/aider_event_repository.py
Normal file
151
apps/api/src/repositories/aider_event_repository.py
Normal file
@@ -0,0 +1,151 @@
|
||||
"""aider_events CRUD + 聚合查詢。
|
||||
|
||||
2026-04-20 @ Asia/Taipei
|
||||
Repository for aider-watch event stream (aider CLI activity tracking)
|
||||
支援 AI Router feedback + symptom_pattern 抽取
|
||||
"""
|
||||
from __future__ import annotations
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any
|
||||
import json
|
||||
|
||||
import structlog
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from src.db.base import get_db_context
|
||||
|
||||
# Module-level structured logger (not referenced by the code visible in this file).
logger = structlog.get_logger(__name__)
|
||||
|
||||
|
||||
class AiderEventRepository:
    """Database operations for the ``aider_events`` table.

    Every statement runs on the ``AsyncSession`` supplied at construction.
    No method here commits, rolls back, or closes that session; transaction
    handling is left to whoever owns the session.
    """

    def __init__(self, session: AsyncSession):
        self.session = session

    async def insert(self, *, session_id: str, ts: datetime, type_: str,
                     host: str, payload: dict[str, Any],
                     incident_id: str | None = None) -> int:
        """Insert a single aider event and return its generated id.

        Args:
            session_id: aider session ID.
            ts: event timestamp (timezone-aware).
            type_: event type, one of ``session_start``, ``file_edit``,
                ``error``, ``commit``, ``silent_timeout``, ``session_end``,
                ``raw``.
            host: hostname. Required; this method applies no default.
            payload: type-specific JSON payload. It is serialized with
                ``_json`` and cast to JSONB by the statement.
            incident_id: optional FK to the incidents table.

        Returns:
            The ``id`` of the inserted row.
        """
        r = await self.session.execute(text("""
            INSERT INTO aider_events (session_id, ts, type, host, payload, incident_id)
            VALUES (:sid, :ts, :t, :h, CAST(:p AS JSONB), :inc)
            RETURNING id
        """), {"sid": session_id, "ts": ts, "t": type_, "h": host,
               "p": _json(payload), "inc": incident_id})
        return r.scalar_one()

    async def link_incident(self, event_ids: list[int], incident_id: str) -> None:
        """Set ``incident_id`` on every event whose id is in ``event_ids``.

        The update is issued as one batch statement. An empty ``event_ids``
        is a no-op: nothing would match, so the database is not contacted.
        """
        if not event_ids:
            return
        await self.session.execute(text("""
            UPDATE aider_events SET incident_id = :inc WHERE id = ANY(:ids)
        """), {"inc": incident_id, "ids": event_ids})

    async def count_by_session(self, session_id: str) -> int:
        """Return the number of events recorded for ``session_id``."""
        r = await self.session.execute(text(
            "SELECT count(*) FROM aider_events WHERE session_id = :sid"
        ), {"sid": session_id})
        return r.scalar_one()

    async def model_stats_since(self, days: int = 7) -> list[dict]:
        """Return per-(repo, model) session statistics for the last ``days`` days.

        Only events with ``ts`` at or after ``now - days`` are considered.
        Events are first grouped by session:

        - ``model`` and ``repo`` come from the session's ``session_start``
          payload (keys ``model`` and ``cwd``);
        - a session counts as failed when it has at least one ``error`` event
          or a ``session_end`` event whose ``exit_code`` is non-zero.

        Sessions without a model or repo inside the window (for example, the
        ``session_start`` event is older than the cutoff) are left out.

        Returns:
            A list of dicts with keys ``repo``, ``model``, ``total`` (number
            of sessions), ``errors`` (number of failed sessions) and
            ``success_rate`` (1 minus the failed fraction).

        Intended use: AI Router feedback and model performance monitoring.
        """
        # Local time made timezone-aware, so it compares against aware ``ts`` values.
        cutoff = datetime.now().astimezone() - timedelta(days=days)
        # NOTE(review): ``(payload->>'exit_code')::int`` raises if a payload
        # carries a non-integer exit_code string; confirm that producers always
        # emit integers.
        r = await self.session.execute(text("""
            WITH session_models AS (
                SELECT session_id,
                       MAX(CASE WHEN type='session_start'
                                THEN payload->>'model' END) AS model,
                       MAX(CASE WHEN type='session_start'
                                THEN payload->>'cwd' END) AS repo,
                       COUNT(*) FILTER (WHERE type='error') AS err_count,
                       COUNT(*) FILTER (WHERE type='session_end'
                                        AND (payload->>'exit_code')::int != 0) AS nonzero_exit
                FROM aider_events
                WHERE ts >= :cutoff
                GROUP BY session_id
            )
            SELECT repo, model,
                   COUNT(*) AS total,
                   SUM(CASE WHEN err_count>0 OR nonzero_exit>0 THEN 1 ELSE 0 END) AS errors,
                   1.0 - AVG(CASE WHEN err_count>0 OR nonzero_exit>0 THEN 1.0 ELSE 0.0 END) AS success_rate
            FROM session_models
            WHERE model IS NOT NULL AND repo IS NOT NULL
            GROUP BY repo, model
        """), {"cutoff": cutoff})
        return [dict(row._mapping) for row in r.fetchall()]

    async def daily_pattern_candidates(self, *, days: int = 7,
                                       threshold: float = 0.25,
                                       min_samples: int = 3) -> list[dict]:
        """Pick (repo, model) pairs that are candidates for a symptom_pattern.

        A pair qualifies when it has at least ``min_samples`` sessions in the
        window and its error rate (``errors / total``) is at least
        ``threshold``. The statistics come from ``model_stats_since``.

        Args:
            days: size of the look-back window, passed to ``model_stats_since``.
            threshold: minimum error rate (inclusive) for a pair to qualify.
            min_samples: minimum number of sessions (inclusive).

        Returns:
            A list of dicts with keys ``repo``, ``model``, ``error_rate``
            (rounded to 3 decimals) and ``samples``.

        Intended use:
            - Task A5: generate playbooks automatically from models with a
              high error rate.
            - Spot systemic problems, i.e. a repo+model combination that keeps
              failing.
        """
        rows = await self.model_stats_since(days=days)
        candidates = []
        for row in rows:
            total = row["total"]
            # ``not total`` keeps the division below safe even if a caller
            # passes min_samples <= 0.
            if not total or total < min_samples:
                continue
            err_rate = (row["errors"] or 0) / total
            if err_rate < threshold:
                continue
            candidates.append({
                "repo": row["repo"],
                "model": row["model"],
                "error_rate": round(err_rate, 3),
                "samples": total,
            })
        return candidates
|
||||
|
||||
|
||||
def _json(obj: Any) -> str:
    """Serialize a Python object to a JSON string for binding as a JSONB parameter.

    Non-ASCII characters are emitted as-is, and values the JSON encoder cannot
    handle natively are converted with ``str``.
    """
    encoded = json.dumps(obj, default=str, ensure_ascii=False)
    return encoded
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Repository Factory (not a singleton: a new instance is built on every call)
|
||||
# =============================================================================
|
||||
|
||||
# Module-level slot for a cached repository instance.
# NOTE(review): nothing visible in this file reads or assigns it; the factory
# function returns a new instance on every call. Confirm whether caching was
# intended.
_aider_event_repo: AiderEventRepository | None = None
|
||||
|
||||
|
||||
async def get_aider_event_repository() -> AiderEventRepository:
    """Create an ``AiderEventRepository`` bound to a newly entered DB context.

    The session is obtained by entering ``get_db_context()`` manually, and
    this factory never exits that context. The context manager is therefore
    stored on the returned repository as ``_db_context``. This keeps the
    manager alive for as long as the repository is (a discarded manager could
    otherwise be finalized while its session is still in use), and it gives
    the caller a handle to release the context::

        repo = await get_aider_event_repository()
        try:
            event_id = await repo.insert(...)
        finally:
            await repo._db_context.__aexit__(None, None, None)

    Preferred usage, which scopes the session automatically::

        async with get_db_context() as db:
            repo = AiderEventRepository(db)
            event_id = await repo.insert(...)

    Returns:
        A new repository instance; a separate one is built on every call.
    """
    # NOTE(review): presumably get_db_context() commits/closes the session on
    # exit; verify against its implementation.
    db_context = get_db_context()
    session = await db_context.__aenter__()
    repo = AiderEventRepository(session)
    # Hold a reference so the entered context is neither lost nor collected.
    repo._db_context = db_context
    return repo
|
||||
Reference in New Issue
Block a user