Files
awoooi/apps/api/src/main.py
OG T 765ee39a90 feat(api): Phase 6.5 Statistics API + Y/n 按鈕修復
新增:
- /stats/incidents/summary - 事件總覽統計
- /stats/incidents/resolution - 解決時間 P50/P95
- /stats/ai-performance - AI 提案效能
- /stats/services/affected - 受影響服務排名

修復:
- Y/n 按鈕永久禁用問題 (decision.state=completed 但 incident 未解決)
- decision_manager.py: 只有當 incident 也已解決才返回已完成的 decision

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-24 09:50:03 +08:00

348 lines
12 KiB
Python

"""
AWOOOI API - BFF Gateway
========================
ADR-005: BFF Architecture
ADR-006: AI Fallback Strategy
Four Iron Laws:
1. Async-First - All handlers are async def
2. CORS Whitelist - Strict origin control (NO wildcards)
3. Pydantic Config - Type-safe settings with validation
4. structlog - Structured JSON logging
Version: 1.0.0
Date: 2026-03-20
"""
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
import structlog
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from src.api.v1 import agents as agents_v1 # Phase 9.5: Agent Teams API
from src.api.v1 import ai as ai_v1
from src.api.v1 import approvals as approvals_v1
from src.api.v1 import audit_logs as audit_logs_v1
from src.api.v1 import dashboard as dashboard_v1
# Import API routers
from src.api.v1 import health as health_v1
from src.api.v1 import incidents as incidents_v1 # Phase 6.4: Decision Proposal
from src.api.v1 import metrics as metrics_v1 # Phase 7: Gold Metrics (真實血脈)
from src.api.v1 import proposals as proposals_v1 # Phase 6.4h: Proposals CRUD API
from src.api.v1 import stats as stats_v1 # Phase 6.5: Statistics Analytics
from src.api.v1 import telegram as telegram_v1 # Phase 5.4: Telegram Gateway
from src.api.v1 import timeline as timeline_v1
from src.api.v1 import webhooks as webhooks_v1
from src.core.config import settings
from src.core.http_client import close_all_http_clients, init_all_http_clients
from src.core.logging import get_logger, setup_logging
from src.core.redis_client import close_redis_pool, init_redis_pool
from src.core.sse import get_publisher
from src.core.telemetry import setup_telemetry, shutdown_telemetry
# CTO-201: Database & Executor
from src.db.base import close_db, init_db
# Phase 6.4g: lewooogo-brain 積木路由
from src.routers import proposals as proposals_router
# Legacy route imports (to be migrated)
from src.routes import agent, notifications, pipelines, plugins
from src.services.executor import close_executor
# Phase 5: OpenClaw AI Engine
from src.services.openclaw import close_openclaw
from src.services.telegram_gateway import get_telegram_gateway
# Phase 6.1: Event Bus (Signal Worker)
from src.workers import close_signal_worker, init_signal_worker
# =============================================================================
# Initialize Logging (MUST be first)
# =============================================================================
# Configure logging before anything else in this module emits a log record.
setup_logging()
# Module-level logger used by the lifespan handler and the telemetry setup below.
logger = get_logger("awoooi.api")
# =============================================================================
# Application Lifespan
# =============================================================================
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
    """Run application startup before serving and shutdown afterwards.

    Startup order: database, HTTP clients, Redis pool, SSE publisher,
    Telegram gateway (optional long polling and heartbeat monitor), and
    finally the signal worker.

    Shutdown order: signal worker, SSE publisher, executor, OpenClaw,
    Telegram gateway, HTTP clients, Redis pool, database, telemetry.

    The shutdown sequence lives in a ``finally`` block so that it still runs
    when an exception or a cancellation is delivered at the ``yield``.

    Args:
        _app: The FastAPI application instance (unused).

    Yields:
        None. Control returns to the framework while the application serves.
    """
    # Startup
    logger.info(
        "api_startup",
        version=settings.VERSION,
        environment=settings.ENVIRONMENT,
        mock_mode=settings.MOCK_MODE,
        cors_origins=settings.CORS_ORIGINS,
        ai_fallback_order=settings.AI_FALLBACK_ORDER,
        four_hosts=settings.four_hosts,
        kubeconfig=settings.KUBECONFIG_PATH,
    )

    # CTO-201: initialize the PostgreSQL database (project rule: SQLite is forbidden).
    await init_db()
    db_url = settings.DATABASE_URL
    # Log only the text after the last "@" (presumably to keep credentials out
    # of the log). A URL without "@" is logged unchanged, because
    # ``split("@")[-1]`` then returns the whole string.
    logger.info("database_initialized", url=db_url.split("@")[-1])

    # Phase 5: initialize HTTP clients (ClickHouse, Ollama).
    # Project rule: connection pools are created at startup and released at shutdown.
    await init_all_http_clients()
    logger.info("http_clients_initialized")

    # Phase 6.1.1: initialize the Redis pool (persistence of multi-sig state).
    # Project rule: the Redis connection pool is created during lifespan startup.
    await init_redis_pool()
    logger.info("redis_pool_initialized", url=settings.REDIS_URL.split("@")[-1])

    # Start the SSE publisher.
    publisher = await get_publisher()
    logger.info("sse_publisher_initialized")

    # Phase 5: Telegram gateway initialization.
    # 2026-03-23 architecture fix: the AWOOOI API does not do long polling.
    # Reason: one bot token can have only one long-polling instance, and
    # OpenClaw (192.168.0.188) is that instance. The AWOOOI API only sends
    # messages; it does not receive them.
    telegram_gw = get_telegram_gateway()
    if settings.TELEGRAM_ENABLE_POLLING:
        await telegram_gw.start_long_polling()
        logger.info("telegram_long_polling_started")
    else:
        logger.info("telegram_polling_disabled", reason="OpenClaw 是唯一 Polling 實例")

    # Phase 6.5: Telegram heartbeat monitor (guards against a silent blind spot).
    # - Sends a heartbeat every 30 minutes to prove the alerting path works.
    # - Raises an alert when no message has been seen for more than 2 hours.
    if settings.OPENCLAW_TG_BOT_TOKEN:
        await telegram_gw.start_heartbeat_monitor(
            heartbeat_interval_minutes=30,
            silence_threshold_hours=2,
        )
        logger.info("telegram_heartbeat_monitor_started")

    # Phase 6.1: start the signal worker (Redis Streams consumer).
    # Project rule: the event bus decouples alert intake from alert processing.
    await init_signal_worker()
    logger.info("signal_worker_initialized")

    try:
        yield
    finally:
        # Shutdown
        # Phase 6.1: stop the signal worker first (close the consumer).
        await close_signal_worker()
        await publisher.stop()
        await close_executor()
        await close_openclaw()

        # Phase 5.4: close the Telegram gateway.
        telegram_gw = get_telegram_gateway()
        await telegram_gw.close()

        # Phase 5: close HTTP clients (project rule: release connection pools).
        await close_all_http_clients()

        # Phase 6.1.1: close the Redis pool (project rule: release the Redis pool).
        await close_redis_pool()
        await close_db()
        shutdown_telemetry()
        logger.info("api_shutdown", version=settings.VERSION)
# =============================================================================
# FastAPI Application
# =============================================================================
# The ASGI application object. The interactive docs, ReDoc and the OpenAPI
# schema are all served under /api/v1/, and startup/shutdown is delegated to
# the lifespan context manager defined above.
app = FastAPI(
    title="AWOOOI API",
    description="AWOOOI 智能運維平台 API - 由 leWOOOgo Engine 驅動",
    version=settings.VERSION,
    docs_url="/api/v1/docs",
    redoc_url="/api/v1/redoc",
    openapi_url="/api/v1/openapi.json",
    lifespan=lifespan,
)
# =============================================================================
# OpenTelemetry Instrumentation (observability iron law)
# Must be initialized before the middleware so that traces are complete.
# Graceful degradation: a failure here does not prevent the API from starting.
# =============================================================================
otel_enabled = setup_telemetry(app)
if not otel_enabled:
    # Telemetry is optional: record why it is off and carry on.
    logger.warning("otel_disabled", reason="initialization failed or disabled")
else:
    logger.info(
        "otel_initialized",
        service=settings.OTEL_SERVICE_NAME,
        endpoint=settings.OTEL_EXPORTER_OTLP_ENDPOINT,
    )
# =============================================================================
# Middleware
# =============================================================================
# CORS - strict whitelist (Iron Law #2).
# Allowed origins come exclusively from settings.CORS_ORIGINS: no wildcards,
# no UAT. Methods and request headers are likewise listed explicitly, and
# X-Request-ID is exposed so that browser clients can read it from responses.
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    allow_headers=["Authorization", "Content-Type", "X-Request-ID"],
    expose_headers=["X-Request-ID"],
)
@app.middleware("http")
async def request_logging_middleware(request: Request, call_next):
    """Log every HTTP request in structured form and tag it with a request ID.

    Each request is logged with:
    - Request ID (taken from the ``X-Request-ID`` header, or generated when
      the header is missing or empty)
    - HTTP method and path
    - Response status code
    - Request duration in milliseconds

    Args:
        request: The incoming request.
        call_next: Callable that passes the request on to the rest of the
            application and returns its response.

    Returns:
        The downstream response, with ``X-Request-ID`` set to the ID used in
        the logs for this request.
    """
    import time
    import uuid

    # Honour a caller-supplied ID; otherwise generate one so that log lines
    # from different requests do not share the same placeholder ID.
    request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
    start_time = time.perf_counter()

    # Bind request context for all logs in this request. Clearing first drops
    # any context variables that are still bound from earlier work.
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(
        request_id=request_id,
        method=request.method,
        path=request.url.path,
    )

    log = get_logger("awoooi.http")
    log.debug("request_start")

    response = await call_next(request)

    duration_ms = (time.perf_counter() - start_time) * 1000
    log.info(
        "request_complete",
        status_code=response.status_code,
        duration_ms=round(duration_ms, 2),
    )

    # Echo the request ID so that clients can correlate responses with logs.
    response.headers["X-Request-ID"] = request_id
    return response
# =============================================================================
# Exception Handlers
# =============================================================================
@app.exception_handler(Exception)
async def global_exception_handler(_request: Request, exc: Exception) -> JSONResponse:
    """Turn any unhandled exception into a generic HTTP 500 JSON response.

    The exception type, message and traceback are written to the
    ``awoooi.error`` logger; the client only receives a fixed error code and
    message, so no internal details are exposed.

    Args:
        _request: The request that triggered the exception (unused).
        exc: The unhandled exception.

    Returns:
        A ``JSONResponse`` with status 500 and a fixed error payload.
    """
    error_log = get_logger("awoooi.error")
    error_log.exception(
        "unhandled_exception",
        exc_type=type(exc).__name__,
        exc_message=str(exc),
    )

    # Fixed payload: nothing derived from the exception is sent to the client.
    safe_payload = {
        "code": "INTERNAL_ERROR",
        "message": "An internal error occurred",
    }
    return JSONResponse(status_code=500, content=safe_payload)
# =============================================================================
# API Routers - Path-based routing (/api/v1/*)
# =============================================================================
# New v1 API routes. Every router in this group is mounted under /api/v1.
app.include_router(health_v1.router, prefix="/api/v1", tags=["Health"])
app.include_router(dashboard_v1.router, prefix="/api/v1", tags=["Dashboard"])
app.include_router(approvals_v1.router, prefix="/api/v1", tags=["HITL Approvals"])
app.include_router(ai_v1.router, prefix="/api/v1", tags=["AI Decision"])
app.include_router(webhooks_v1.router, prefix="/api/v1", tags=["Webhooks"])
app.include_router(timeline_v1.router, prefix="/api/v1", tags=["Timeline"])
app.include_router(audit_logs_v1.router, prefix="/api/v1", tags=["Audit Logs"])
app.include_router(
    telegram_v1.router, prefix="/api/v1", tags=["Telegram Gateway"]
)  # Phase 5.4
app.include_router(
    metrics_v1.router, prefix="/api/v1", tags=["Gold Metrics"]
)  # Phase 7: Gold Metrics (real data)
app.include_router(
    incidents_v1.router, prefix="/api/v1", tags=["Incidents"]
)  # Phase 6.4: Decision Proposal
app.include_router(
    proposals_v1.router, prefix="/api/v1", tags=["Proposals"]
)  # Phase 6.4h: Proposals CRUD
app.include_router(
    agents_v1.router, prefix="/api/v1", tags=["Agent Teams"]
)  # Phase 9.5: Agent Teams
app.include_router(
    stats_v1.router, prefix="/api/v1", tags=["Statistics"]
)  # Phase 6.5: Statistics Analytics
# NOTE: the legacy proposals router is the only one included without a prefix
# argument here, so its paths are whatever the router itself defines.
app.include_router(
    proposals_router.router, tags=["Proposals (Legacy)"]
)  # Phase 6.4g: lewooogo-brain (legacy version)
# Legacy routes (to be migrated to api/v1/)
app.include_router(plugins.router, prefix="/api/v1/plugins", tags=["Plugins"])
app.include_router(pipelines.router, prefix="/api/v1/pipelines", tags=["Pipelines"])
app.include_router(agent.router, prefix="/api/v1/agent", tags=["Agent"])
app.include_router(
    notifications.router, prefix="/api/v1/notifications", tags=["Notifications"]
)
# =============================================================================
# Root Endpoint
# =============================================================================
@app.get("/", include_in_schema=False)
async def root() -> dict:
    """Describe the API: name, version, environment and key entry-point paths.

    Returns:
        A dict with the API name, the configured version and environment, and
        the paths of the docs, health, dashboard and dashboard stream
        endpoints.
    """
    # Identity of the running service, taken from settings.
    api_info = {
        "name": "AWOOOI API",
        "version": settings.VERSION,
        "environment": settings.ENVIRONMENT,
    }
    # Well-known paths a client can follow from the root.
    api_info.update(
        {
            "docs": "/api/v1/docs",
            "health": "/api/v1/health",
            "dashboard": "/api/v1/dashboard",
            "stream": "/api/v1/dashboard/stream",
        }
    )
    return api_info
# =============================================================================
# Entry Point
# =============================================================================
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn. uvicorn is imported here
    # so that it is only needed when the module is run as a script.
    import uvicorn

    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        # Auto-reload follows the DEBUG setting.
        "reload": settings.DEBUG,
        # The configured level is lower-cased before being passed to uvicorn.
        "log_level": settings.LOG_LEVEL.lower(),
    }
    uvicorn.run("src.main:app", **server_options)