"""
AWOOOI API - BFF Gateway
========================

ADR-005: BFF Architecture
ADR-006: AI Fallback Strategy

Four Iron Laws:
1. Async-First - All handlers are async def
2. CORS Whitelist - Strict origin control (NO wildcards)
3. Pydantic Config - Type-safe settings with validation
4. structlog - Structured JSON logging

Observability Stack:
- OpenTelemetry → SigNoz (Traces + Logs + Metrics)
- Sentry SDK → Sentry Self-Hosted (Error Tracking + Stack Traces)

Version: 1.0.0
Date: 2026-03-20
"""

import os
import time
import uuid
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager

import sentry_sdk
import structlog
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.starlette import StarletteIntegration

from src.api.v1 import agents as agents_v1  # Phase 9.5: Agent Teams API
from src.api.v1 import ai as ai_v1
from src.api.v1 import approvals as approvals_v1
from src.api.v1 import audit_logs as audit_logs_v1
from src.api.v1 import dashboard as dashboard_v1

# Import API routers
from src.api.v1 import health as health_v1
from src.api.v1 import incidents as incidents_v1  # Phase 6.4: Decision Proposal
from src.api.v1 import metrics as metrics_v1  # Phase 7: Gold Metrics (the real lifeblood)
from src.api.v1 import proposals as proposals_v1  # Phase 6.4h: Proposals CRUD API
from src.api.v1 import stats as stats_v1  # Phase 6.5: Statistics Analytics
from src.api.v1 import telegram as telegram_v1  # Phase 5.4: Telegram Gateway
from src.api.v1 import timeline as timeline_v1
from src.api.v1 import webhooks as webhooks_v1
from src.core.config import settings
from src.core.http_client import close_all_http_clients, init_all_http_clients
from src.core.logging import get_logger, setup_logging
from src.core.redis_client import close_redis_pool, init_redis_pool
from src.core.sse import get_publisher
from src.core.telemetry import setup_telemetry, shutdown_telemetry

# CTO-201: Database & Executor
from src.db.base import close_db, init_db

# Phase 6.4g: lewooogo-brain building-block routes
from src.routers import proposals as proposals_router

# Legacy route imports (to be migrated)
from src.routes import agent, notifications, pipelines, plugins
from src.services.executor import close_executor

# Phase 5: OpenClaw AI Engine
from src.services.openclaw import close_openclaw
from src.services.telegram_gateway import get_telegram_gateway

# Phase 6.1: Event Bus (Signal Worker)
from src.workers import close_signal_worker, init_signal_worker

# =============================================================================
# Initialize Logging (MUST be first)
# =============================================================================

setup_logging()
logger = get_logger("awoooi.api")

# =============================================================================
# Sentry SDK Initialization (Error Tracking - complements SigNoz)
# Self-Hosted @ 192.168.0.110
# Division of labor: Sentry focuses on error tracking; SigNoz focuses on
# traces / logs / metrics.
# =============================================================================

SENTRY_DSN = os.getenv("SENTRY_DSN")
if SENTRY_DSN:
    sentry_sdk.init(
        dsn=SENTRY_DSN,
        environment=settings.ENVIRONMENT,
        release=f"awoooi-api@{settings.VERSION}",
        # Performance-monitoring sample rate (lowered in production)
        traces_sample_rate=0.1 if settings.ENVIRONMENT == "production" else 1.0,
        # Deep FastAPI integration
        integrations=[
            FastApiIntegration(transaction_style="endpoint"),
            StarletteIntegration(transaction_style="endpoint"),
        ],
        # Ignore common non-errors
        ignore_errors=[
            ConnectionRefusedError,
            TimeoutError,
        ],
        # Do not attach personally identifiable information to events
        send_default_pii=False,
    )
    # Log only the part of the DSN after "@" so the key is not written to logs.
    logger.info("sentry_initialized", dsn=SENTRY_DSN.split("@")[-1])
else:
    logger.info("sentry_disabled", reason="SENTRY_DSN not configured")


# =============================================================================
# Application Lifespan
# =============================================================================


@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
    """
    Application lifespan events.

    Startup (before ``yield``) initializes, in order: the database, the HTTP
    clients, the Redis pool, the SSE publisher, the Telegram gateway
    (optional long polling and heartbeat monitor) and the signal worker.

    Shutdown (after ``yield``) releases them. The shutdown sequence lives in
    a ``finally`` clause so it also runs when an exception or cancellation is
    raised into the generator at the ``yield`` point.
    """
    # Startup
    logger.info(
        "api_startup",
        version=settings.VERSION,
        environment=settings.ENVIRONMENT,
        mock_mode=settings.MOCK_MODE,
        cors_origins=settings.CORS_ORIGINS,
        ai_fallback_order=settings.AI_FALLBACK_ORDER,
        four_hosts=settings.four_hosts,
        kubeconfig=settings.KUBECONFIG_PATH,
    )

    # CTO-201: Initialize PostgreSQL database (iron law: SQLite is forbidden)
    await init_db()
    db_url = settings.DATABASE_URL
    # Strip everything up to "@" so credentials are not logged.
    logger.info(
        "database_initialized", url=db_url.split("@")[-1] if "@" in db_url else db_url
    )

    # Phase 5: Initialize HTTP Clients (ClickHouse, Ollama)
    # Iron law: connection pools are created at startup and reclaimed at shutdown
    await init_all_http_clients()
    logger.info("http_clients_initialized")

    # Phase 6.1.1: Initialize Redis Pool (Multi-Sig state persistence)
    # Iron law: the Redis connection pool is created during lifespan startup
    await init_redis_pool()
    logger.info("redis_pool_initialized", url=settings.REDIS_URL.split("@")[-1])

    # Start SSE publisher
    publisher = await get_publisher()
    logger.info("sse_publisher_initialized")

    # Phase 5: Telegram Gateway initialization
    # 2026-03-23 architecture fix: the AWOOOI API does not do long polling.
    # Reason: a single bot token can only have one long-polling instance.
    # OpenClaw (192.168.0.188) is the only polling instance.
    # The AWOOOI API only sends messages; it does not receive them.
    telegram_gw = get_telegram_gateway()
    if settings.TELEGRAM_ENABLE_POLLING:
        await telegram_gw.start_long_polling()
        logger.info("telegram_long_polling_started")
    else:
        logger.info("telegram_polling_disabled", reason="OpenClaw 是唯一 Polling 實例")

    # Phase 6.5: Telegram heartbeat monitor (prevents silent blind spots)
    # - Send a heartbeat every 30 minutes to prove the alert pipeline works
    # - Raise an alert when there has been no message for more than 2 hours
    if settings.OPENCLAW_TG_BOT_TOKEN:
        await telegram_gw.start_heartbeat_monitor(
            heartbeat_interval_minutes=30,
            silence_threshold_hours=2,
        )
        logger.info("telegram_heartbeat_monitor_started")

    # Phase 6.1: Start Signal Worker (Redis Streams Consumer)
    # Iron law: the event bus decouples alert ingestion from alert processing
    await init_signal_worker()
    logger.info("signal_worker_initialized")

    try:
        yield
    finally:
        # Shutdown
        # Phase 6.1: Close Signal Worker (stop the consumer first)
        await close_signal_worker()

        await publisher.stop()
        await close_executor()
        await close_openclaw()

        # Phase 5.4: Close Telegram Gateway
        telegram_gw = get_telegram_gateway()
        await telegram_gw.close()

        # Phase 5: Close HTTP Clients (iron law: reclaim connection pools)
        await close_all_http_clients()

        # Phase 6.1.1: Close Redis Pool (iron law: reclaim the Redis pool)
        await close_redis_pool()

        await close_db()
        shutdown_telemetry()
        logger.info("api_shutdown", version=settings.VERSION)


# =============================================================================
# FastAPI Application
# =============================================================================

app = FastAPI(
    title="AWOOOI API",
    description="AWOOOI 智能運維平台 API - 由 leWOOOgo Engine 驅動",
    version=settings.VERSION,
    docs_url="/api/v1/docs",
    redoc_url="/api/v1/redoc",
    openapi_url="/api/v1/openapi.json",
    lifespan=lifespan,
)

# =============================================================================
# OpenTelemetry Instrumentation (observability iron law)
# Must be initialized before the middleware to keep traces complete.
# Graceful degradation: a failure here does not block API startup.
# =============================================================================

otel_enabled = setup_telemetry(app)
if otel_enabled:
    logger.info(
        "otel_initialized",
        service=settings.OTEL_SERVICE_NAME,
        endpoint=settings.OTEL_EXPORTER_OTLP_ENDPOINT,
    )
else:
    logger.warning("otel_disabled", reason="initialization failed or disabled")

# =============================================================================
# Middleware
# =============================================================================

# CORS - Strict Whitelist (Iron Law #2)
# NO wildcards, NO UAT
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    allow_headers=["Authorization", "Content-Type", "X-Request-ID"],
    expose_headers=["X-Request-ID"],
)


@app.middleware("http")
async def request_logging_middleware(request: Request, call_next):
    """
    Structured request logging middleware

    Logs every request with:
    - Request ID (taken from the ``X-Request-ID`` header, or a freshly
      generated UUID4 when the header is missing or empty)
    - HTTP method and path
    - Response status code
    - Request duration

    The request ID is echoed back in the ``X-Request-ID`` response header.
    """
    # Generate an ID when the client did not send one, so that log lines of
    # different anonymous requests can still be told apart.
    request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
    start_time = time.perf_counter()

    # Bind request context for all logs in this request
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(
        request_id=request_id,
        method=request.method,
        path=request.url.path,
    )

    log = get_logger("awoooi.http")
    log.debug("request_start")

    response = await call_next(request)

    duration_ms = (time.perf_counter() - start_time) * 1000
    log.info(
        "request_complete",
        status_code=response.status_code,
        duration_ms=round(duration_ms, 2),
    )

    # Add request ID to response headers
    response.headers["X-Request-ID"] = request_id
    return response


# =============================================================================
# Exception Handlers
# =============================================================================


@app.exception_handler(Exception)
async def global_exception_handler(_request: Request, exc: Exception) -> JSONResponse:
    """
    Global exception handler with structured logging + Sentry

    Catches all unhandled exceptions and returns a safe error response.
    Full exception details are logged but not exposed to clients.
    The exception is also handed to the Sentry SDK for reporting.
    """
    # Report to Sentry (if it has been initialized)
    sentry_sdk.capture_exception(exc)

    log = get_logger("awoooi.error")
    log.exception(
        "unhandled_exception",
        exc_type=type(exc).__name__,
        exc_message=str(exc),
    )

    return JSONResponse(
        status_code=500,
        content={
            "code": "INTERNAL_ERROR",
            "message": "An internal error occurred",
        },
    )


# =============================================================================
# API Routers - Path-based routing (/api/v1/*)
# =============================================================================

# New v1 API routes
app.include_router(health_v1.router, prefix="/api/v1", tags=["Health"])
app.include_router(dashboard_v1.router, prefix="/api/v1", tags=["Dashboard"])
app.include_router(approvals_v1.router, prefix="/api/v1", tags=["HITL Approvals"])
app.include_router(ai_v1.router, prefix="/api/v1", tags=["AI Decision"])
app.include_router(webhooks_v1.router, prefix="/api/v1", tags=["Webhooks"])
app.include_router(timeline_v1.router, prefix="/api/v1", tags=["Timeline"])
app.include_router(audit_logs_v1.router, prefix="/api/v1", tags=["Audit Logs"])
app.include_router(
    telegram_v1.router, prefix="/api/v1", tags=["Telegram Gateway"]
)  # Phase 5.4
app.include_router(
    metrics_v1.router, prefix="/api/v1", tags=["Gold Metrics"]
)  # Phase 7: the real lifeblood
app.include_router(
    incidents_v1.router, prefix="/api/v1", tags=["Incidents"]
)  # Phase 6.4: Decision Proposal
app.include_router(
    proposals_v1.router, prefix="/api/v1", tags=["Proposals"]
)  # Phase 6.4h: Proposals CRUD
app.include_router(
    agents_v1.router, prefix="/api/v1", tags=["Agent Teams"]
)  # Phase 9.5: Agent Teams
app.include_router(
    stats_v1.router, prefix="/api/v1", tags=["Statistics"]
)  # Phase 6.5: Statistics Analytics
app.include_router(
    proposals_router.router, tags=["Proposals (Legacy)"]
)  # Phase 6.4g: lewooogo-brain (legacy version)

# Legacy routes (to be migrated to api/v1/)
app.include_router(plugins.router, prefix="/api/v1/plugins", tags=["Plugins"])
app.include_router(pipelines.router, prefix="/api/v1/pipelines", tags=["Pipelines"])
app.include_router(agent.router, prefix="/api/v1/agent", tags=["Agent"])
app.include_router(
    notifications.router, prefix="/api/v1/notifications", tags=["Notifications"]
)


# =============================================================================
# Root Endpoint
# =============================================================================


@app.get("/", include_in_schema=False)
async def root() -> dict:
    """Root endpoint with API info"""
    return {
        "name": "AWOOOI API",
        "version": settings.VERSION,
        "environment": settings.ENVIRONMENT,
        "docs": "/api/v1/docs",
        "health": "/api/v1/health",
        "dashboard": "/api/v1/dashboard",
        "stream": "/api/v1/dashboard/stream",
    }


# =============================================================================
# Entry Point
# =============================================================================

if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "src.main:app",
        host="0.0.0.0",
        port=8000,
        reload=settings.DEBUG,
        log_level=settings.LOG_LEVEL.lower(),
    )