diff --git a/apps/api/src/repositories/aider_event_repository.py b/apps/api/src/repositories/aider_event_repository.py new file mode 100644 index 00000000..c9cfda0b --- /dev/null +++ b/apps/api/src/repositories/aider_event_repository.py @@ -0,0 +1,151 @@ +"""aider_events CRUD + 聚合查詢。 + +2026-04-20 @ Asia/Taipei +Repository for aider-watch event stream (aider CLI activity tracking) +支援 AI Router feedback + symptom_pattern 抽取 +""" +from __future__ import annotations +from datetime import datetime, timedelta +from typing import Any +import json + +import structlog +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession + +from src.db.base import get_db_context + +logger = structlog.get_logger(__name__) + + +class AiderEventRepository: + """aider_events 資料庫操作""" + + def __init__(self, session: AsyncSession): + self.session = session + + async def insert(self, *, session_id: str, ts: datetime, type_: str, + host: str, payload: dict[str, Any], + incident_id: str | None = None) -> int: + """ + 插入單筆 aider event + + Args: + session_id: aider session ID + ts: event timestamp (timezone-aware) + type_: event type (session_start|file_edit|error|commit|silent_timeout|session_end|raw) + host: hostname (default: ogt-mac) + payload: type-specific JSON payload + incident_id: optional FK to incidents table + + Returns: + inserted record id + """ + r = await self.session.execute(text(""" + INSERT INTO aider_events (session_id, ts, type, host, payload, incident_id) + VALUES (:sid, :ts, :t, :h, CAST(:p AS JSONB), :inc) + RETURNING id + """), {"sid": session_id, "ts": ts, "t": type_, "h": host, + "p": _json(payload), "inc": incident_id}) + return r.scalar_one() + + async def link_incident(self, event_ids: list[int], incident_id: str) -> None: + """更新多筆 event 的 incident_id (batch update)""" + await self.session.execute(text(""" + UPDATE aider_events SET incident_id = :inc WHERE id = ANY(:ids) + """), {"inc": incident_id, "ids": event_ids}) + + async def count_by_session(self, session_id: str) -> int: + """統計特定 session 的 event 數量""" + r = await self.session.execute(text( + "SELECT count(*) FROM aider_events WHERE session_id = :sid" + ), {"sid": session_id}) + return r.scalar_one() + + async def model_stats_since(self, days: int = 7) -> list[dict]: + """ + 回傳 [{repo, model, total, errors, success_rate}, ...] + + 聚合查詢:按 repo + model 分組,統計: + - session 總數 (total) + - 含 error 或 非零 exit_code 的 session 數 (errors) + - 成功率 (success_rate = 1 - error_rate) + + 用途:AI Router feedback, model performance monitoring + """ + cutoff = datetime.now().astimezone() - timedelta(days=days) + r = await self.session.execute(text(""" + WITH session_models AS ( + SELECT session_id, + MAX(CASE WHEN type='session_start' + THEN payload->>'model' END) AS model, + MAX(CASE WHEN type='session_start' + THEN payload->>'cwd' END) AS repo, + COUNT(*) FILTER (WHERE type='error') AS err_count, + COUNT(*) FILTER (WHERE type='session_end' + AND (payload->>'exit_code')::int != 0) AS nonzero_exit + FROM aider_events + WHERE ts >= :cutoff + GROUP BY session_id + ) + SELECT repo, model, + COUNT(*) AS total, + SUM(CASE WHEN err_count>0 OR nonzero_exit>0 THEN 1 ELSE 0 END) AS errors, + 1.0 - AVG(CASE WHEN err_count>0 OR nonzero_exit>0 THEN 1.0 ELSE 0.0 END) AS success_rate + FROM session_models + WHERE model IS NOT NULL AND repo IS NOT NULL + GROUP BY repo, model + """), {"cutoff": cutoff}) + return [dict(row._mapping) for row in r.fetchall()] + + async def daily_pattern_candidates(self, *, days: int = 7, + threshold: float = 0.25, + min_samples: int = 3) -> list[dict]: + """ + 抽 symptom_pattern 候選:error_rate >= threshold, samples >= min_samples + + Returns: + [{repo, model, error_rate, samples}, ...] + + 用途: + - Task A5: 從高錯誤率模型自動生成 playbook + - 識別系統性問題:某個 repo+model 組合持續失敗 + """ + rows = await self.model_stats_since(days=days) + out = [] + for row in rows: + if row["total"] < min_samples: + continue + err_rate = (row["errors"] or 0) / row["total"] + if err_rate < threshold: + continue + out.append({ + "repo": row["repo"], + "model": row["model"], + "error_rate": round(err_rate, 3), + "samples": row["total"], + }) + return out + + +def _json(obj: Any) -> str: + """序列化 Python object 為 JSON string (JSONB 注入用)""" + return json.dumps(obj, ensure_ascii=False, default=str) + + +# ============================================================================= +# Singleton Factory +# ============================================================================= + +_aider_event_repo: AiderEventRepository | None = None + + +async def get_aider_event_repository() -> AiderEventRepository: + """取得 AiderEventRepository 實例 (async factory) + + 用法: + async with get_db_context() as db: + repo = AiderEventRepository(db) + event_id = await repo.insert(...) + """ + return AiderEventRepository(await get_db_context().__aenter__())