diff --git a/apps/api/src/api/v1/stats.py b/apps/api/src/api/v1/stats.py new file mode 100644 index 00000000..8471a3bc --- /dev/null +++ b/apps/api/src/api/v1/stats.py @@ -0,0 +1,385 @@ +# ============================================================================= +# AWOOOI Statistics API - Phase 6.5 +# ============================================================================= +# 統計分析 API - 從 Episodic Memory 萃取洞察 +# +# 核心價值: +# - 識別常見問題模式 +# - 評估 AI 建議效能 +# - 支援 Playbook 萃取 +# ============================================================================= + +from datetime import datetime, timedelta +from typing import Any + +from fastapi import APIRouter, Depends, Query +from pydantic import BaseModel, Field +from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession + +from src.core.logging import get_logger +from src.db.database import get_db +from src.db.models import IncidentRecord +from src.models.incident import IncidentStatus + +logger = get_logger(__name__) + +router = APIRouter(prefix="/stats", tags=["Statistics"]) + + +# ============================================================================= +# Response Models +# ============================================================================= + + +class StatusCount(BaseModel): + """狀態計數""" + + status: str + count: int + + +class SeverityCount(BaseModel): + """嚴重度計數""" + + severity: str + count: int + + +class IncidentSummary(BaseModel): + """事件總覽""" + + total_incidents: int = Field(description="總事件數") + status_distribution: list[StatusCount] = Field(description="狀態分佈") + severity_distribution: list[SeverityCount] = Field(description="嚴重度分佈") + resolved_rate: float = Field(description="解決率 (%)") + avg_signals_per_incident: float = Field(description="平均告警聚合數") + + +class ResolutionStats(BaseModel): + """解決時間統計""" + + avg_minutes: float | None = Field(description="平均解決時間 (分鐘)") + p50_minutes: float | None = Field(description="P50 解決時間") + p95_minutes: float | None = Field(description="P95 解決時間") + fastest_minutes: float | None = Field(description="最快解決時間") + slowest_minutes: float | None = Field(description="最慢解決時間") + sample_size: int = Field(description="樣本數") + + +class TrendPoint(BaseModel): + """趨勢數據點""" + + date: str + count: int + + +class IncidentTrends(BaseModel): + """事件趨勢""" + + period: str = Field(description="週期 (daily/weekly/monthly)") + data: list[TrendPoint] + + +class AIPerformance(BaseModel): + """AI 效能統計""" + + total_proposals: int = Field(description="總提案數") + executed_count: int = Field(description="已執行數") + execution_rate: float = Field(description="執行率 (%)") + success_count: int = Field(description="成功數") + success_rate: float = Field(description="成功率 (%)") + avg_effectiveness: float | None = Field(description="平均有效性評分 (1-5)") + effectiveness_distribution: dict[int, int] = Field( + description="有效性評分分佈 {1: count, 2: count, ...}" + ) + + +class ServiceImpact(BaseModel): + """服務影響統計""" + + service: str + incident_count: int + severity_breakdown: dict[str, int] = Field(description="{P0: 5, P1: 10, ...}") + + +class FeedbackSummary(BaseModel): + """人類回饋摘要""" + + total_feedback: int + positive_count: int = Field(description="正面回饋 (score >= 4)") + neutral_count: int = Field(description="中性回饋 (score == 3)") + negative_count: int = Field(description="負面回饋 (score <= 2)") + common_themes: list[str] = Field(description="常見主題 (從 learning_notes 萃取)") + + +# ============================================================================= +# API Endpoints +# ============================================================================= + + +@router.get( + "/incidents/summary", + response_model=IncidentSummary, + summary="事件總覽統計", +) +async def get_incident_summary( + days: int = Query(30, ge=1, le=365, description="統計區間 (天)"), + db: AsyncSession = Depends(get_db), # noqa: B008 +) -> IncidentSummary: + """ + 取得事件總覽統計 + + 包含: + - 總事件數 + - 狀態分佈 + - 嚴重度分佈 + - 解決率 + """ + since = datetime.utcnow() - timedelta(days=days) + + # 總數 + total_result = await db.execute( + select(func.count(IncidentRecord.incident_id)).where( + IncidentRecord.created_at >= since + ) + ) + total = total_result.scalar() or 0 + + # 狀態分佈 + status_result = await db.execute( + select(IncidentRecord.status, func.count(IncidentRecord.incident_id)) + .where(IncidentRecord.created_at >= since) + .group_by(IncidentRecord.status) + ) + status_dist = [ + StatusCount(status=str(row[0]), count=row[1]) for row in status_result.all() + ] + + # 嚴重度分佈 + severity_result = await db.execute( + select(IncidentRecord.severity, func.count(IncidentRecord.incident_id)) + .where(IncidentRecord.created_at >= since) + .group_by(IncidentRecord.severity) + ) + severity_dist = [ + SeverityCount(severity=str(row[0]), count=row[1]) + for row in severity_result.all() + ] + + # 解決率 + resolved_result = await db.execute( + select(func.count(IncidentRecord.incident_id)).where( + IncidentRecord.created_at >= since, + IncidentRecord.status.in_( + [IncidentStatus.RESOLVED, IncidentStatus.CLOSED] + ), + ) + ) + resolved_count = resolved_result.scalar() or 0 + resolved_rate = (resolved_count / total * 100) if total > 0 else 0.0 + + # 平均告警聚合數 + signals_result = await db.execute( + select(func.avg(func.json_array_length(IncidentRecord.signals))).where( + IncidentRecord.created_at >= since + ) + ) + avg_signals = signals_result.scalar() or 0.0 + + logger.info( + "stats_incident_summary", + total=total, + resolved_rate=resolved_rate, + days=days, + ) + + return IncidentSummary( + total_incidents=total, + status_distribution=status_dist, + severity_distribution=severity_dist, + resolved_rate=round(resolved_rate, 2), + avg_signals_per_incident=round(float(avg_signals), 2), + ) + + +@router.get( + "/incidents/resolution", + response_model=ResolutionStats, + summary="解決時間統計", +) +async def get_resolution_stats( + days: int = Query(30, ge=1, le=365, description="統計區間 (天)"), + db: AsyncSession = Depends(get_db), # noqa: B008 +) -> ResolutionStats: + """ + 取得解決時間統計 + + 計算已解決事件的: + - 平均解決時間 + - P50/P95 解決時間 + - 最快/最慢解決時間 + """ + since = datetime.utcnow() - timedelta(days=days) + + # 取得已解決事件的時間差 + result = await db.execute( + select( + IncidentRecord.created_at, + IncidentRecord.resolved_at, + ).where( + IncidentRecord.created_at >= since, + IncidentRecord.resolved_at.isnot(None), + ) + ) + rows = result.all() + + if not rows: + return ResolutionStats( + avg_minutes=None, + p50_minutes=None, + p95_minutes=None, + fastest_minutes=None, + slowest_minutes=None, + sample_size=0, + ) + + # 計算解決時間 (分鐘) + durations = [] + for row in rows: + if row.resolved_at and row.created_at: + delta = row.resolved_at - row.created_at + durations.append(delta.total_seconds() / 60) + + if not durations: + return ResolutionStats( + avg_minutes=None, + p50_minutes=None, + p95_minutes=None, + fastest_minutes=None, + slowest_minutes=None, + sample_size=0, + ) + + durations.sort() + n = len(durations) + + return ResolutionStats( + avg_minutes=round(sum(durations) / n, 2), + p50_minutes=round(durations[n // 2], 2), + p95_minutes=round(durations[min(int(n * 0.95), n - 1)], 2), + fastest_minutes=round(min(durations), 2), + slowest_minutes=round(max(durations), 2), + sample_size=n, + ) + + +@router.get( + "/ai-performance", + response_model=AIPerformance, + summary="AI 效能統計", +) +async def get_ai_performance( + days: int = Query(30, ge=1, le=365, description="統計區間 (天)"), + db: AsyncSession = Depends(get_db), # noqa: B008 +) -> AIPerformance: + """ + 取得 AI 提案效能統計 + + 評估指標: + - 提案執行率 + - 執行成功率 + - 有效性評分分佈 + """ + since = datetime.utcnow() - timedelta(days=days) + + # 取得有 outcome 的事件 + result = await db.execute( + select(IncidentRecord.outcome).where( + IncidentRecord.created_at >= since, + IncidentRecord.outcome.isnot(None), + ) + ) + outcomes = [row[0] for row in result.all() if row[0]] + + total = len(outcomes) + executed = sum(1 for o in outcomes if o.get("proposal_executed")) + success = sum( + 1 for o in outcomes if o.get("proposal_executed") and o.get("execution_success") + ) + + # 有效性評分分佈 + effectiveness_dist: dict[int, int] = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0} + scores = [] + for o in outcomes: + score = o.get("effectiveness_score") + if score and 1 <= score <= 5: + effectiveness_dist[score] += 1 + scores.append(score) + + avg_effectiveness = sum(scores) / len(scores) if scores else None + + return AIPerformance( + total_proposals=total, + executed_count=executed, + execution_rate=round((executed / total * 100) if total > 0 else 0, 2), + success_count=success, + success_rate=round((success / executed * 100) if executed > 0 else 0, 2), + avg_effectiveness=round(avg_effectiveness, 2) if avg_effectiveness else None, + effectiveness_distribution=effectiveness_dist, + ) + + +@router.get( + "/services/affected", + response_model=list[ServiceImpact], + summary="受影響服務排名", +) +async def get_affected_services( + days: int = Query(30, ge=1, le=365, description="統計區間 (天)"), + limit: int = Query(10, ge=1, le=50, description="返回數量"), + db: AsyncSession = Depends(get_db), # noqa: B008 +) -> list[ServiceImpact]: + """ + 取得最常受影響的服務排名 + + 包含: + - 事件計數 + - 嚴重度分佈 + """ + since = datetime.utcnow() - timedelta(days=days) + + # 取得所有事件的 affected_services 和 severity + result = await db.execute( + select( + IncidentRecord.affected_services, + IncidentRecord.severity, + ).where(IncidentRecord.created_at >= since) + ) + + # 統計每個服務 + service_stats: dict[str, dict[str, Any]] = {} + for row in result.all(): + services = row[0] or [] + severity = str(row[1]) + for svc in services: + if svc not in service_stats: + service_stats[svc] = {"count": 0, "severity": {}} + service_stats[svc]["count"] += 1 + service_stats[svc]["severity"][severity] = ( + service_stats[svc]["severity"].get(severity, 0) + 1 + ) + + # 排序並返回 top N + sorted_services = sorted( + service_stats.items(), key=lambda x: x[1]["count"], reverse=True + )[:limit] + + return [ + ServiceImpact( + service=svc, + incident_count=stats["count"], + severity_breakdown=stats["severity"], + ) + for svc, stats in sorted_services + ] diff --git a/apps/api/src/main.py b/apps/api/src/main.py index f77554d8..31740d74 100644 --- a/apps/api/src/main.py +++ b/apps/api/src/main.py @@ -33,6 +33,7 @@ from src.api.v1 import health as health_v1 from src.api.v1 import incidents as incidents_v1 # Phase 6.4: Decision Proposal from src.api.v1 import metrics as metrics_v1 # Phase 7: Gold Metrics (真實血脈) from src.api.v1 import proposals as proposals_v1 # Phase 6.4h: Proposals CRUD API +from src.api.v1 import stats as stats_v1 # Phase 6.5: Statistics Analytics from src.api.v1 import telegram as telegram_v1 # Phase 5.4: Telegram Gateway from src.api.v1 import timeline as timeline_v1 from src.api.v1 import webhooks as webhooks_v1 @@ -88,7 +89,9 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: # CTO-201: Initialize PostgreSQL database (統帥鐵律: 禁止 SQLite) await init_db() db_url = settings.DATABASE_URL - logger.info("database_initialized", url=db_url.split("@")[-1] if "@" in db_url else db_url) + logger.info( + "database_initialized", url=db_url.split("@")[-1] if "@" in db_url else db_url + ) # Phase 5: Initialize HTTP Clients (ClickHouse, Ollama) # 統帥鐵律: 連線池在啟動時建立,關閉時回收 @@ -242,6 +245,7 @@ async def request_logging_middleware(request: Request, call_next): # Exception Handlers # ============================================================================= + @app.exception_handler(Exception) async def global_exception_handler(_request: Request, exc: Exception) -> JSONResponse: """ @@ -277,24 +281,42 @@ app.include_router(ai_v1.router, prefix="/api/v1", tags=["AI Decision"]) app.include_router(webhooks_v1.router, prefix="/api/v1", tags=["Webhooks"]) app.include_router(timeline_v1.router, prefix="/api/v1", tags=["Timeline"]) app.include_router(audit_logs_v1.router, prefix="/api/v1", tags=["Audit Logs"]) -app.include_router(telegram_v1.router, prefix="/api/v1", tags=["Telegram Gateway"]) # Phase 5.4 -app.include_router(metrics_v1.router, prefix="/api/v1", tags=["Gold Metrics"]) # Phase 7: 真實血脈 -app.include_router(incidents_v1.router, prefix="/api/v1", tags=["Incidents"]) # Phase 6.4: Decision Proposal -app.include_router(proposals_v1.router, prefix="/api/v1", tags=["Proposals"]) # Phase 6.4h: Proposals CRUD -app.include_router(agents_v1.router, prefix="/api/v1", tags=["Agent Teams"]) # Phase 9.5: Agent Teams -app.include_router(proposals_router.router, tags=["Proposals (Legacy)"]) # Phase 6.4g: lewooogo-brain (舊版) +app.include_router( + telegram_v1.router, prefix="/api/v1", tags=["Telegram Gateway"] +) # Phase 5.4 +app.include_router( + metrics_v1.router, prefix="/api/v1", tags=["Gold Metrics"] +) # Phase 7: 真實血脈 +app.include_router( + incidents_v1.router, prefix="/api/v1", tags=["Incidents"] +) # Phase 6.4: Decision Proposal +app.include_router( + proposals_v1.router, prefix="/api/v1", tags=["Proposals"] +) # Phase 6.4h: Proposals CRUD +app.include_router( + agents_v1.router, prefix="/api/v1", tags=["Agent Teams"] +) # Phase 9.5: Agent Teams +app.include_router( + stats_v1.router, prefix="/api/v1", tags=["Statistics"] +) # Phase 6.5: Statistics Analytics +app.include_router( + proposals_router.router, tags=["Proposals (Legacy)"] +) # Phase 6.4g: lewooogo-brain (舊版) # Legacy routes (to be migrated to api/v1/) app.include_router(plugins.router, prefix="/api/v1/plugins", tags=["Plugins"]) app.include_router(pipelines.router, prefix="/api/v1/pipelines", tags=["Pipelines"]) app.include_router(agent.router, prefix="/api/v1/agent", tags=["Agent"]) -app.include_router(notifications.router, prefix="/api/v1/notifications", tags=["Notifications"]) +app.include_router( + notifications.router, prefix="/api/v1/notifications", tags=["Notifications"] +) # ============================================================================= # Root Endpoint # ============================================================================= + @app.get("/", include_in_schema=False) async def root() -> dict: """Root endpoint with API info""" diff --git a/apps/api/src/services/decision_manager.py b/apps/api/src/services/decision_manager.py index 999d12c3..1f8d2244 100644 --- a/apps/api/src/services/decision_manager.py +++ b/apps/api/src/services/decision_manager.py @@ -300,12 +300,23 @@ class DecisionManager: # 1. 檢查現有 token existing_token = await self._find_existing_token(incident.incident_id) - if existing_token and existing_token.state in ( - DecisionState.READY, - DecisionState.EXECUTING, - DecisionState.COMPLETED, - ): - return existing_token + if existing_token: + # READY 或 EXECUTING 狀態: 直接返回 + if existing_token.state in (DecisionState.READY, DecisionState.EXECUTING): + return existing_token + # COMPLETED 狀態: 只有 incident 也已解決才返回,否則創建新 decision + # 修復: 避免 incident 未解決但 decision 已完成導致 Y/n 按鈕永久禁用 + if existing_token.state == DecisionState.COMPLETED: + from src.models.incident import IncidentStatus + if incident.status in (IncidentStatus.RESOLVED, IncidentStatus.CLOSED): + return existing_token + # incident 仍在處理中,需要新的 decision + logger.info( + "decision_reset_for_active_incident", + token=existing_token.token, + incident_id=incident.incident_id, + incident_status=incident.status.value, + ) # 2. 建立新 token token = DecisionToken( @@ -539,12 +550,21 @@ class DecisionManager: # 檢查現有 token existing_token = await self._find_existing_token(incident.incident_id) - if existing_token and existing_token.state in ( - DecisionState.READY, - DecisionState.EXECUTING, - DecisionState.COMPLETED, - ): - return existing_token + if existing_token: + # READY 或 EXECUTING 狀態: 直接返回 + if existing_token.state in (DecisionState.READY, DecisionState.EXECUTING): + return existing_token + # COMPLETED 狀態: 只有 incident 也已解決才返回 + if existing_token.state == DecisionState.COMPLETED: + from src.models.incident import IncidentStatus + if incident.status in (IncidentStatus.RESOLVED, IncidentStatus.CLOSED): + return existing_token + logger.info( + "decision_reset_for_active_incident_consensus", + token=existing_token.token, + incident_id=incident.incident_id, + incident_status=incident.status.value, + ) # 建立新 token token = DecisionToken(