feat(api): Phase 6.5 Statistics API + Y/n 按鈕修復

新增:
- /stats/incidents/summary - 事件總覽統計
- /stats/incidents/resolution - 解決時間 P50/P95
- /stats/ai-performance - AI 提案效能
- /stats/services/affected - 受影響服務排名

修復:
- Y/n 按鈕永久禁用問題 (decision.state=completed 但 incident 未解決)
- decision_manager.py: 只有當 incident 也已解決才返回已完成的 decision

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-24 09:50:03 +08:00
parent ab7ad09ed6
commit 765ee39a90
3 changed files with 447 additions and 20 deletions

View File

@@ -0,0 +1,385 @@
# =============================================================================
# AWOOOI Statistics API - Phase 6.5
# =============================================================================
# 統計分析 API - 從 Episodic Memory 萃取洞察
#
# 核心價值:
# - 識別常見問題模式
# - 評估 AI 建議效能
# - 支援 Playbook 萃取
# =============================================================================
from datetime import datetime, timedelta
from typing import Any
from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel, Field
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from src.core.logging import get_logger
from src.db.database import get_db
from src.db.models import IncidentRecord
from src.models.incident import IncidentStatus
logger = get_logger(__name__)
router = APIRouter(prefix="/stats", tags=["Statistics"])
# =============================================================================
# Response Models
# =============================================================================
class StatusCount(BaseModel):
    """Number of incidents that share one status value."""

    # Status label (the summary endpoint fills it with str() of the DB value).
    status: str
    # How many incidents carry that status.
    count: int
class SeverityCount(BaseModel):
    """Number of incidents that share one severity value."""

    # Severity label (the summary endpoint fills it with str() of the DB value).
    severity: str
    # How many incidents carry that severity.
    count: int
class IncidentSummary(BaseModel):
    """Response body of the incident overview endpoint."""

    # Count of incidents inside the reporting window.
    total_incidents: int = Field(description="總事件數")
    # One entry per distinct status / severity found in the window.
    status_distribution: list[StatusCount] = Field(description="狀態分佈")
    severity_distribution: list[SeverityCount] = Field(description="嚴重度分佈")
    # Expressed as a percentage (0-100), not as a 0-1 fraction.
    resolved_rate: float = Field(description="解決率 (%)")
    # Mean number of signals aggregated into one incident.
    avg_signals_per_incident: float = Field(description="平均告警聚合數")
class ResolutionStats(BaseModel):
    """Resolution-time statistics, expressed in minutes.

    Every time field is nullable so that an empty sample can be reported
    as ``None`` together with ``sample_size == 0``.
    """

    avg_minutes: float | None = Field(description="平均解決時間 (分鐘)")
    p50_minutes: float | None = Field(description="P50 解決時間")
    p95_minutes: float | None = Field(description="P95 解決時間")
    fastest_minutes: float | None = Field(description="最快解決時間")
    slowest_minutes: float | None = Field(description="最慢解決時間")
    # Number of resolved incidents the figures above were computed from.
    sample_size: int = Field(description="樣本數")
class TrendPoint(BaseModel):
    """A single point of a trend series: a date label and a count."""

    # Date label, carried as a string.
    # NOTE(review): the expected date format is not fixed anywhere in this module.
    date: str
    # Value of the series at that date.
    count: int
class IncidentTrends(BaseModel):
    """Incident trend series for one aggregation period.

    NOTE(review): no endpoint in the visible part of this module returns
    this model yet.
    """

    # Aggregation period name; the description lists daily/weekly/monthly.
    period: str = Field(description="週期 (daily/weekly/monthly)")
    # The points of the series.
    data: list[TrendPoint]
class AIPerformance(BaseModel):
    """Response body of the AI proposal performance endpoint."""

    # Number of outcome records taken into account.
    total_proposals: int = Field(description="總提案數")
    # Proposals that were executed, and their share of the total in percent.
    executed_count: int = Field(description="已執行數")
    execution_rate: float = Field(description="執行率 (%)")
    # Executed proposals that succeeded, and their share of the executed ones.
    success_count: int = Field(description="成功數")
    success_rate: float = Field(description="成功率 (%)")
    # Mean of the recorded 1-5 ratings; None when nothing was rated.
    avg_effectiveness: float | None = Field(description="平均有效性評分 (1-5)")
    # Histogram keyed by rating value.
    effectiveness_distribution: dict[int, int] = Field(
        description="有效性評分分佈 {1: count, 2: count, ...}"
    )
class ServiceImpact(BaseModel):
    """How often one service was affected, broken down by severity."""

    # Service name as it appears in an incident's affected_services list.
    service: str
    # Number of times the service was listed by an incident.
    incident_count: int
    # Maps a severity label to the number of occurrences at that severity.
    severity_breakdown: dict[str, int] = Field(description="{P0: 5, P1: 10, ...}")
class FeedbackSummary(BaseModel):
    """Summary of human feedback scores.

    NOTE(review): no endpoint in the visible part of this module returns
    this model yet.
    """

    total_feedback: int
    # Buckets follow the thresholds stated in the field descriptions:
    # positive is score >= 4, neutral is score == 3, negative is score <= 2.
    positive_count: int = Field(description="正面回饋 (score >= 4)")
    neutral_count: int = Field(description="中性回饋 (score == 3)")
    negative_count: int = Field(description="負面回饋 (score <= 2)")
    # Recurring themes; the description says they come from learning_notes.
    common_themes: list[str] = Field(description="常見主題 (從 learning_notes 萃取)")
# =============================================================================
# API Endpoints
# =============================================================================
@router.get(
    "/incidents/summary",
    response_model=IncidentSummary,
    summary="事件總覽統計",
)
async def get_incident_summary(
    days: int = Query(30, ge=1, le=365, description="統計區間 (天)"),
    db: AsyncSession = Depends(get_db),  # noqa: B008
) -> IncidentSummary:
    """
    Return an overview of the incidents created during the last ``days`` days.

    The response contains:
    - the total number of incidents
    - the distribution by status
    - the distribution by severity
    - the percentage of incidents whose status is RESOLVED or CLOSED
    - the average length of the ``signals`` JSON array per incident
    """
    # Naive UTC lower bound of the reporting window, shared by every query.
    window_start = datetime.utcnow() - timedelta(days=days)
    in_window = IncidentRecord.created_at >= window_start

    # Total number of incidents.
    total_query = select(func.count(IncidentRecord.incident_id)).where(in_window)
    total = (await db.execute(total_query)).scalar() or 0

    # Distribution by status.
    status_rows = await db.execute(
        select(IncidentRecord.status, func.count(IncidentRecord.incident_id))
        .where(in_window)
        .group_by(IncidentRecord.status)
    )
    status_dist = [
        StatusCount(status=str(label), count=hits) for label, hits in status_rows.all()
    ]

    # Distribution by severity.
    severity_rows = await db.execute(
        select(IncidentRecord.severity, func.count(IncidentRecord.incident_id))
        .where(in_window)
        .group_by(IncidentRecord.severity)
    )
    severity_dist = [
        SeverityCount(severity=str(label), count=hits)
        for label, hits in severity_rows.all()
    ]

    # Resolved rate: RESOLVED and CLOSED both count as resolved.
    done_states = [IncidentStatus.RESOLVED, IncidentStatus.CLOSED]
    resolved_query = select(func.count(IncidentRecord.incident_id)).where(
        in_window,
        IncidentRecord.status.in_(done_states),
    )
    resolved_count = (await db.execute(resolved_query)).scalar() or 0
    if total > 0:
        resolved_rate = resolved_count / total * 100
    else:
        # No incidents in the window: report 0% instead of dividing by zero.
        resolved_rate = 0.0

    # Average number of signals aggregated per incident.
    signals_query = select(
        func.avg(func.json_array_length(IncidentRecord.signals))
    ).where(in_window)
    avg_signals = (await db.execute(signals_query)).scalar() or 0.0

    logger.info(
        "stats_incident_summary",
        total=total,
        resolved_rate=resolved_rate,
        days=days,
    )

    return IncidentSummary(
        total_incidents=total,
        status_distribution=status_dist,
        severity_distribution=severity_dist,
        resolved_rate=round(resolved_rate, 2),
        avg_signals_per_incident=round(float(avg_signals), 2),
    )
@router.get(
    "/incidents/resolution",
    response_model=ResolutionStats,
    summary="解決時間統計",
)
async def get_resolution_stats(
    days: int = Query(30, ge=1, le=365, description="統計區間 (天)"),
    db: AsyncSession = Depends(get_db),  # noqa: B008
) -> ResolutionStats:
    """
    Return resolution-time statistics for incidents created in the last ``days`` days.

    Only incidents with a non-null ``resolved_at`` are sampled. The resolution
    time of an incident is ``resolved_at - created_at``, reported in minutes:
    - average
    - P50: the median (mean of the two middle samples when the count is even)
    - P95: nearest-rank percentile, i.e. the smallest sample that is greater
      than or equal to 95% of all samples
    - fastest / slowest

    Returns:
        A ``ResolutionStats``; when there is no sample every time field is
        ``None`` and ``sample_size`` is 0.
    """
    # Naive UTC lower bound of the reporting window.
    since = datetime.utcnow() - timedelta(days=days)

    result = await db.execute(
        select(
            IncidentRecord.created_at,
            IncidentRecord.resolved_at,
        ).where(
            IncidentRecord.created_at >= since,
            IncidentRecord.resolved_at.isnot(None),
        )
    )

    # Resolution time per incident, in minutes, sorted ascending. Rows with a
    # missing timestamp are skipped defensively.
    durations = sorted(
        (row.resolved_at - row.created_at).total_seconds() / 60
        for row in result.all()
        if row.resolved_at is not None and row.created_at is not None
    )
    n = len(durations)

    if n == 0:
        return ResolutionStats(
            avg_minutes=None,
            p50_minutes=None,
            p95_minutes=None,
            fastest_minutes=None,
            slowest_minutes=None,
            sample_size=0,
        )

    # Median: for an odd count both indexes point at the same middle sample;
    # for an even count this averages the two middle samples.
    median = (durations[(n - 1) // 2] + durations[n // 2]) / 2

    # Nearest-rank P95: rank = ceil(0.95 * n), computed with integers to avoid
    # float rounding. int(n * 0.95) used as an index is one position too high
    # whenever 0.95 * n is an integer (e.g. n = 20 would return the maximum).
    p95_rank = (95 * n + 99) // 100

    return ResolutionStats(
        avg_minutes=round(sum(durations) / n, 2),
        p50_minutes=round(median, 2),
        p95_minutes=round(durations[p95_rank - 1], 2),
        fastest_minutes=round(durations[0], 2),
        slowest_minutes=round(durations[-1], 2),
        sample_size=n,
    )
@router.get(
    "/ai-performance",
    response_model=AIPerformance,
    summary="AI 效能統計",
)
async def get_ai_performance(
    days: int = Query(30, ge=1, le=365, description="統計區間 (天)"),
    db: AsyncSession = Depends(get_db),  # noqa: B008
) -> AIPerformance:
    """
    Return performance statistics for AI proposals over the last ``days`` days.

    Every incident with a non-empty ``outcome`` counts as one proposal. The
    following keys of the outcome mapping are read:
    - ``proposal_executed``: truthy when the proposal was executed
    - ``execution_success``: truthy when that execution succeeded
    - ``effectiveness_score``: rating from 1 to 5

    Ratings that are not whole numbers in the 1-5 range (including booleans,
    strings and fractional values) are ignored instead of failing the request.
    """
    # Naive UTC lower bound of the reporting window.
    since = datetime.utcnow() - timedelta(days=days)

    result = await db.execute(
        select(IncidentRecord.outcome).where(
            IncidentRecord.created_at >= since,
            IncidentRecord.outcome.isnot(None),
        )
    )
    # Drop empty outcomes (e.g. an empty JSON object) as well as NULLs.
    outcomes = [row[0] for row in result.all() if row[0]]

    total = len(outcomes)
    executed = sum(1 for o in outcomes if o.get("proposal_executed"))
    success = sum(
        1 for o in outcomes if o.get("proposal_executed") and o.get("execution_success")
    )

    # Rating histogram; every bucket is present even when it stays at zero.
    effectiveness_dist: dict[int, int] = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0}
    scores: list[int] = []
    for o in outcomes:
        score = o.get("effectiveness_score")
        # JSON may deliver a whole-number rating as a float (4.0); normalise it
        # so it can be used as a histogram key.
        if isinstance(score, float) and score.is_integer():
            score = int(score)
        # Anything that is not a plain integer cannot be bucketed. bool is an
        # int subclass, so it is excluded explicitly.
        if isinstance(score, bool) or not isinstance(score, int):
            continue
        if 1 <= score <= 5:
            effectiveness_dist[score] += 1
            scores.append(score)

    avg_effectiveness = round(sum(scores) / len(scores), 2) if scores else None

    return AIPerformance(
        total_proposals=total,
        executed_count=executed,
        execution_rate=round((executed / total * 100) if total > 0 else 0, 2),
        success_count=success,
        success_rate=round((success / executed * 100) if executed > 0 else 0, 2),
        avg_effectiveness=avg_effectiveness,
        effectiveness_distribution=effectiveness_dist,
    )
@router.get(
    "/services/affected",
    response_model=list[ServiceImpact],
    summary="受影響服務排名",
)
async def get_affected_services(
    days: int = Query(30, ge=1, le=365, description="統計區間 (天)"),
    limit: int = Query(10, ge=1, le=50, description="返回數量"),
    db: AsyncSession = Depends(get_db),  # noqa: B008
) -> list[ServiceImpact]:
    """
    Return the services most frequently affected during the last ``days`` days.

    Each entry carries:
    - the number of times the service was listed by an incident
    - the breakdown of those occurrences by severity

    At most ``limit`` entries are returned, ordered by occurrence count,
    highest first.
    """
    # Naive UTC lower bound of the reporting window.
    window_start = datetime.utcnow() - timedelta(days=days)

    # Fetch the affected_services list and the severity of every incident.
    rows = await db.execute(
        select(
            IncidentRecord.affected_services,
            IncidentRecord.severity,
        ).where(IncidentRecord.created_at >= window_start)
    )

    # Tally per service: total occurrences plus a per-severity counter.
    tallies: dict[str, dict[str, Any]] = {}
    for listed_services, raw_severity in rows.all():
        severity_label = str(raw_severity)
        # A NULL affected_services column is treated as "no services".
        for name in listed_services or []:
            entry = tallies.setdefault(name, {"count": 0, "severity": {}})
            entry["count"] += 1
            breakdown = entry["severity"]
            breakdown[severity_label] = breakdown.get(severity_label, 0) + 1

    # Highest count first; the sort is stable, so ties keep first-seen order.
    ranked = sorted(tallies.items(), key=lambda item: item[1]["count"], reverse=True)

    return [
        ServiceImpact(
            service=name,
            incident_count=entry["count"],
            severity_breakdown=entry["severity"],
        )
        for name, entry in ranked[:limit]
    ]

View File

@@ -33,6 +33,7 @@ from src.api.v1 import health as health_v1
from src.api.v1 import incidents as incidents_v1 # Phase 6.4: Decision Proposal
from src.api.v1 import metrics as metrics_v1 # Phase 7: Gold Metrics (真實血脈)
from src.api.v1 import proposals as proposals_v1 # Phase 6.4h: Proposals CRUD API
from src.api.v1 import stats as stats_v1 # Phase 6.5: Statistics Analytics
from src.api.v1 import telegram as telegram_v1 # Phase 5.4: Telegram Gateway
from src.api.v1 import timeline as timeline_v1
from src.api.v1 import webhooks as webhooks_v1
@@ -88,7 +89,9 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
# CTO-201: Initialize PostgreSQL database (統帥鐵律: 禁止 SQLite)
await init_db()
db_url = settings.DATABASE_URL
logger.info("database_initialized", url=db_url.split("@")[-1] if "@" in db_url else db_url)
logger.info(
"database_initialized", url=db_url.split("@")[-1] if "@" in db_url else db_url
)
# Phase 5: Initialize HTTP Clients (ClickHouse, Ollama)
# 統帥鐵律: 連線池在啟動時建立,關閉時回收
@@ -242,6 +245,7 @@ async def request_logging_middleware(request: Request, call_next):
# Exception Handlers
# =============================================================================
@app.exception_handler(Exception)
async def global_exception_handler(_request: Request, exc: Exception) -> JSONResponse:
"""
@@ -277,24 +281,42 @@ app.include_router(ai_v1.router, prefix="/api/v1", tags=["AI Decision"])
app.include_router(webhooks_v1.router, prefix="/api/v1", tags=["Webhooks"])
app.include_router(timeline_v1.router, prefix="/api/v1", tags=["Timeline"])
app.include_router(audit_logs_v1.router, prefix="/api/v1", tags=["Audit Logs"])
app.include_router(telegram_v1.router, prefix="/api/v1", tags=["Telegram Gateway"]) # Phase 5.4
app.include_router(metrics_v1.router, prefix="/api/v1", tags=["Gold Metrics"]) # Phase 7: 真實血脈
app.include_router(incidents_v1.router, prefix="/api/v1", tags=["Incidents"]) # Phase 6.4: Decision Proposal
app.include_router(proposals_v1.router, prefix="/api/v1", tags=["Proposals"]) # Phase 6.4h: Proposals CRUD
app.include_router(agents_v1.router, prefix="/api/v1", tags=["Agent Teams"]) # Phase 9.5: Agent Teams
app.include_router(proposals_router.router, tags=["Proposals (Legacy)"]) # Phase 6.4g: lewooogo-brain (舊版)
app.include_router(
telegram_v1.router, prefix="/api/v1", tags=["Telegram Gateway"]
) # Phase 5.4
app.include_router(
metrics_v1.router, prefix="/api/v1", tags=["Gold Metrics"]
) # Phase 7: 真實血脈
app.include_router(
incidents_v1.router, prefix="/api/v1", tags=["Incidents"]
) # Phase 6.4: Decision Proposal
app.include_router(
proposals_v1.router, prefix="/api/v1", tags=["Proposals"]
) # Phase 6.4h: Proposals CRUD
app.include_router(
agents_v1.router, prefix="/api/v1", tags=["Agent Teams"]
) # Phase 9.5: Agent Teams
app.include_router(
stats_v1.router, prefix="/api/v1", tags=["Statistics"]
) # Phase 6.5: Statistics Analytics
app.include_router(
proposals_router.router, tags=["Proposals (Legacy)"]
) # Phase 6.4g: lewooogo-brain (舊版)
# Legacy routes (to be migrated to api/v1/)
app.include_router(plugins.router, prefix="/api/v1/plugins", tags=["Plugins"])
app.include_router(pipelines.router, prefix="/api/v1/pipelines", tags=["Pipelines"])
app.include_router(agent.router, prefix="/api/v1/agent", tags=["Agent"])
app.include_router(notifications.router, prefix="/api/v1/notifications", tags=["Notifications"])
app.include_router(
notifications.router, prefix="/api/v1/notifications", tags=["Notifications"]
)
# =============================================================================
# Root Endpoint
# =============================================================================
@app.get("/", include_in_schema=False)
async def root() -> dict:
"""Root endpoint with API info"""

View File

@@ -300,12 +300,23 @@ class DecisionManager:
# 1. 檢查現有 token
existing_token = await self._find_existing_token(incident.incident_id)
if existing_token and existing_token.state in (
DecisionState.READY,
DecisionState.EXECUTING,
DecisionState.COMPLETED,
):
return existing_token
if existing_token:
# READY 或 EXECUTING 狀態: 直接返回
if existing_token.state in (DecisionState.READY, DecisionState.EXECUTING):
return existing_token
# COMPLETED 狀態: 只有 incident 也已解決才返回,否則創建新 decision
# 修復: 避免 incident 未解決但 decision 已完成導致 Y/n 按鈕永久禁用
if existing_token.state == DecisionState.COMPLETED:
from src.models.incident import IncidentStatus
if incident.status in (IncidentStatus.RESOLVED, IncidentStatus.CLOSED):
return existing_token
# incident 仍在處理中,需要新的 decision
logger.info(
"decision_reset_for_active_incident",
token=existing_token.token,
incident_id=incident.incident_id,
incident_status=incident.status.value,
)
# 2. 建立新 token
token = DecisionToken(
@@ -539,12 +550,21 @@ class DecisionManager:
# 檢查現有 token
existing_token = await self._find_existing_token(incident.incident_id)
if existing_token and existing_token.state in (
DecisionState.READY,
DecisionState.EXECUTING,
DecisionState.COMPLETED,
):
return existing_token
if existing_token:
# READY 或 EXECUTING 狀態: 直接返回
if existing_token.state in (DecisionState.READY, DecisionState.EXECUTING):
return existing_token
# COMPLETED 狀態: 只有 incident 也已解決才返回
if existing_token.state == DecisionState.COMPLETED:
from src.models.incident import IncidentStatus
if incident.status in (IncidentStatus.RESOLVED, IncidentStatus.CLOSED):
return existing_token
logger.info(
"decision_reset_for_active_incident_consensus",
token=existing_token.token,
incident_id=incident.incident_id,
incident_status=incident.status.value,
)
# 建立新 token
token = DecisionToken(