diff --git a/apps/api/src/api/v1/approvals.py b/apps/api/src/api/v1/approvals.py index c410cb8f..603a8d55 100644 --- a/apps/api/src/api/v1/approvals.py +++ b/apps/api/src/api/v1/approvals.py @@ -25,13 +25,23 @@ import re from typing import TYPE_CHECKING from uuid import UUID -from fastapi import APIRouter, BackgroundTasks, Depends, Header, HTTPException, status - if TYPE_CHECKING: from src.services.notifications import ExecutionStatus +from fastapi import ( + APIRouter, + BackgroundTasks, + Depends, + Header, + HTTPException, + Request, + status, +) +from fastapi.responses import StreamingResponse + from src.core.config import settings from src.core.logging import get_logger +from src.core.sse import EventType, SSEEvent, get_publisher from src.models.approval import ( ApprovalRequest, ApprovalRequestCreate, @@ -521,6 +531,9 @@ async def create_approval( required_signatures=approval.required_signatures, ) + # SSE: 發布建立事件 + asyncio.create_task(_publish_approval_event("created", approval)) + return ApprovalRequestResponse.from_approval(approval) @@ -667,6 +680,10 @@ async def sign_approval( note="Approval has no incident_id in metadata, cannot update Incident status", ) + # SSE: 發布簽核/核准事件 + event_action = "approved" if execution_triggered else "signed" + asyncio.create_task(_publish_approval_event(event_action, approval)) + return SignResponse( success=True, message=message, @@ -752,4 +769,129 @@ async def reject_approval( reason=request.reason, ) + # SSE: 發布拒絕事件 + asyncio.create_task(_publish_approval_event("rejected", approval)) + return ApprovalRequestResponse.from_approval(approval) + + +# ============================================================================= +# SSE Event Publishing (Phase 15: Polling → SSE) +# ============================================================================= + +async def _publish_approval_event( + action: str, + approval: ApprovalRequest, +) -> None: + """ + 發布 Approval SSE 事件 + + Args: + action: 事件動作 (created, signed, approved, rejected, executed) + approval: 授權請求物件 + """ + try: + pub = await get_publisher() + + event = SSEEvent( + type=EventType.APPROVAL, + data={ + "action": action, + "approval_id": str(approval.id), + "status": approval.status.value, + "risk_level": approval.risk_level.value, + "current_signatures": approval.current_signatures, + "required_signatures": approval.required_signatures, + "action_text": approval.action[:100], + }, + ) + + sent_count = await pub.publish(event, topic="approvals") + + if sent_count > 0: + logger.debug( + "approval_sse_published", + action=action, + approval_id=str(approval.id), + sent_count=sent_count, + ) + + except Exception as e: + logger.error( + "approval_sse_publish_failed", + action=action, + approval_id=str(approval.id), + error=str(e), + ) + + +# ============================================================================= +# GET /api/v1/approvals/stream (SSE) +# ============================================================================= + +@router.get( + "/stream", + summary="SSE 即時授權更新", + description="Server-Sent Events 即時推送授權狀態變更,取代輪詢機制", +) +async def stream_approvals(request: Request) -> StreamingResponse: + """ + SSE 即時授權更新 (Phase 15: Polling → SSE) + + 事件類型: + - connected: 連線建立 + - heartbeat: 心跳 (每 15 秒) + - approval: 授權狀態變更 (created, signed, approved, rejected, executed) + + Client Usage (JavaScript): + ```javascript + const es = new EventSource('/api/v1/approvals/stream'); + es.addEventListener('approval', (e) => { + const data = JSON.parse(e.data); + console.log('Approval update:', data.action, data.approval_id); + // Refresh approval list or update specific item + }); + es.addEventListener('heartbeat', () => { + console.log('Connection alive'); + }); + ``` + """ + logger.info( + "approval_stream_connect", + client_ip=request.client.host if request.client else "unknown", + ) + + pub = await get_publisher() + + # 訂閱 approvals topic + client = await pub.subscribe( + topics=["approvals"], + metadata={"ip": request.client.host if request.client else "unknown"}, + ) + + async def event_generator(): + """SSE 事件生成器,含斷線偵測""" + try: + async for data in pub.stream(client): + if await request.is_disconnected(): + logger.info("approval_stream_disconnected", client_id=client.id) + break + yield data + + except asyncio.CancelledError: + logger.info("approval_stream_cancelled", client_id=client.id) + raise + + finally: + logger.info("approval_stream_cleanup", client_id=client.id) + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache, no-store, must-revalidate", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + "Access-Control-Allow-Origin": "*", + }, + ) diff --git a/apps/web/src/components/ai/ai-command-panel.tsx b/apps/web/src/components/ai/ai-command-panel.tsx index 0f3682d8..5a204644 100644 --- a/apps/web/src/components/ai/ai-command-panel.tsx +++ b/apps/web/src/components/ai/ai-command-panel.tsx @@ -8,13 +8,12 @@ * Features: * - OpenClaw AI 視覺化 (頂部) * - 待授權卡片列表 (底部) - * - 即時輪詢後端待簽核 + * - **Phase 15: SSE 即時更新** (取代輪詢) * - Nothing.tech 純白極簡風格 * * i18n: 100% next-intl */ -import { useEffect } from 'react' import { useTranslations } from 'next-intl' import { cn } from '@/lib/utils' import { OpenClawPanel, type OpenClawStatus } from './openclaw-panel' @@ -24,6 +23,7 @@ import { usePendingApprovals, toFrontendApproval, } from '@/stores/approval.store' +import { useApprovalSSE } from '@/hooks/useApprovalSSE' import { ShieldCheck, Bell } from 'lucide-react' // ============================================================================= @@ -43,14 +43,11 @@ export function AICommandPanel({ className }: AICommandPanelProps) { const tApproval = useTranslations('approval') // Store - const { fetchPending, signApproval, rejectApproval, startPolling, stopPolling } = useApprovalStore() + const { fetchPending, signApproval, rejectApproval } = useApprovalStore() const pendingApprovals = usePendingApprovals() - // Start polling on mount - useEffect(() => { - startPolling(5000) // Poll every 5 seconds - return () => stopPolling() - }, [startPolling, stopPolling]) + // Phase 15: SSE 即時更新 (取代輪詢) + useApprovalSSE({ autoConnect: true }) // Handle approval const handleApprove = async (id: string) => { diff --git a/apps/web/src/components/ai/hitl-section.tsx b/apps/web/src/components/ai/hitl-section.tsx index b7a261c7..c11ac56bb 100644 --- a/apps/web/src/components/ai/hitl-section.tsx +++ b/apps/web/src/components/ai/hitl-section.tsx @@ -9,10 +9,10 @@ * - OpenClaw AI 思考流視覺化 * - 動態 Approval Card 渲染 (100% 後端資料) * - Slide Up 動畫過渡 - * - 即時輪詢後端待簽核列表 + * - **Phase 15: SSE 即時更新** (取代輪詢) */ -import { useState, useEffect, useCallback } from 'react' +import { useState, useCallback } from 'react' import { useTranslations } from 'next-intl' import { cn } from '@/lib/utils' import { OpenClawPanel, type OpenClawStatus } from './openclaw-panel' @@ -22,6 +22,7 @@ import { usePendingApprovals, toFrontendApproval, } from '@/stores/approval.store' +import { useApprovalSSE } from '@/hooks/useApprovalSSE' import { useTimelineStore } from '@/stores/timeline.store' import { ActionTimeline } from '@/components/timeline' import { GlassCard, GlassCardTitle, GlassCardContent, GlassCardHeader } from '@/components/ui/glass-card' @@ -84,10 +85,13 @@ export function HITLSection({ locale, className }: HITLSectionProps) { const tApproval = useTranslations('approval') // Store - const { fetchPending, signApproval, rejectApproval, startPolling, stopPolling } = useApprovalStore() + const { fetchPending, signApproval, rejectApproval } = useApprovalStore() const pendingApprovals = usePendingApprovals() const addTimelineEvent = useTimelineStore((state) => state.addEvent) + // Phase 15: SSE 即時更新 (取代輪詢) + useApprovalSSE({ autoConnect: true }) + // AI thinking state const [openclawStatus, setClawbotStatus] = useState('patrolling') const [currentAlertType, setCurrentAlertType] = useState() @@ -105,12 +109,6 @@ export function HITLSection({ locale, className }: HITLSectionProps) { requiredRoles: string } | null>(null) - // Start polling on mount - useEffect(() => { - startPolling(5000) // Poll every 5 seconds - return () => stopPolling() - }, [startPolling, stopPolling]) - // Simulate webhook alert (triggers AI thinking flow) const simulateAlert = useCallback(async (alertType: string, severity: 'critical' | 'warning') => { setIsSimulating(true) diff --git a/apps/web/src/components/approval/live-approval-panel.tsx b/apps/web/src/components/approval/live-approval-panel.tsx index b38aec2c..de4f7031 100644 --- a/apps/web/src/components/approval/live-approval-panel.tsx +++ b/apps/web/src/components/approval/live-approval-panel.tsx @@ -6,7 +6,7 @@ * 整合後端 API 的 Multi-Sig 授權流程 * * Features: - * - 輪詢 GET /api/v1/approvals/pending + * - **Phase 15: SSE 即時更新** (取代輪詢) * - 真實 API 簽核與拒絕 * - Multi-Sig 狀態即時更新 * - 簽核成功動畫 @@ -15,9 +15,10 @@ * - DevOps 角色長按時顯示 Access Denied */ -import { useEffect, useState, useCallback } from 'react' +import { useState, useCallback } from 'react' import { useTranslations } from 'next-intl' import { useApprovalStore, usePendingApprovals, toFrontendApproval } from '@/stores/approval.store' +import { useApprovalSSE } from '@/hooks/useApprovalSSE' import { ApprovalCard, type ApprovalRequest, type RiskLevel } from './approval-card' import { GlassCard, @@ -101,9 +102,12 @@ export function LiveApprovalPanel({ const t = useTranslations('approval') const tCommon = useTranslations('common') - const { startPolling, stopPolling, signApproval, rejectApproval, error } = useApprovalStore() + const { signApproval, rejectApproval, error } = useApprovalStore() const pendingApprovals = usePendingApprovals() + // Phase 15: SSE 即時更新 (取代 Polling) + const { isConnected, status: sseStatus } = useApprovalSSE({ autoConnect: true }) + // 模擬當前登入者 (Phase 3 權限擋板) const currentUser: CurrentUser = { id: signerId, @@ -123,12 +127,6 @@ export function LiveApprovalPanel({ requiredRoles: string } | null>(null) - // Start polling on mount - useEffect(() => { - startPolling(5000) - return () => stopPolling() - }, [startPolling, stopPolling]) - // Handle sign with permission check (Phase 3 權限擋板) const handleSign = useCallback(async (id: string, riskLevel: RiskLevel) => { // Phase 3: 權限檢查 diff --git a/apps/web/src/hooks/index.ts b/apps/web/src/hooks/index.ts index 53e01145..cf2b2660 100644 --- a/apps/web/src/hooks/index.ts +++ b/apps/web/src/hooks/index.ts @@ -1,5 +1,6 @@ export * from './use-agent' export * from './use-health' export * from './useSSE' +export * from './useApprovalSSE' export * from './useIncidents' export * from './useGlobalPulseMetrics' diff --git a/apps/web/src/hooks/useApprovalSSE.ts b/apps/web/src/hooks/useApprovalSSE.ts new file mode 100644 index 00000000..f9c5cc1b --- /dev/null +++ b/apps/web/src/hooks/useApprovalSSE.ts @@ -0,0 +1,86 @@ +'use client' + +/** + * useApprovalSSE - Approval SSE 連線 Hook + * ======================================== + * Phase 15: Polling → SSE + * + * 自動管理 SSE 連線生命週期: + * - 組件掛載時自動連線 + * - 組件卸載時自動斷線 + * - 支援 SSE 失敗後降級到 Polling + */ + +import { useEffect, useRef, useCallback } from 'react' +import { useApprovalStore } from '@/stores/approval.store' + +interface UseApprovalSSEOptions { + /** 自動連線 (預設 true) */ + autoConnect?: boolean + /** SSE 失敗後是否降級到 Polling (預設 true) */ + fallbackToPolling?: boolean +} + +interface UseApprovalSSEReturn { + /** SSE 連線狀態 */ + status: 'disconnected' | 'connecting' | 'connected' | 'reconnecting' | 'error' + /** 是否已連線 */ + isConnected: boolean + /** 是否重連中 */ + isReconnecting: boolean + /** 錯誤訊息 */ + error: string | null + /** 手動連線 */ + connect: () => void + /** 手動斷線 */ + disconnect: () => void +} + +export function useApprovalSSE(options: UseApprovalSSEOptions = {}): UseApprovalSSEReturn { + const { autoConnect = true, fallbackToPolling = true } = options + + const connectSSE = useApprovalStore((s) => s.connectSSE) + const disconnectSSE = useApprovalStore((s) => s.disconnectSSE) + const stopPolling = useApprovalStore((s) => s.stopPolling) + const sseStatus = useApprovalStore((s) => s.sseStatus) + const error = useApprovalStore((s) => s.error) + + const mountedRef = useRef(true) + + const handleConnect = useCallback(() => { + if (!mountedRef.current) return + console.log('[useApprovalSSE] Connecting...') + connectSSE() + }, [connectSSE]) + + const handleDisconnect = useCallback(() => { + console.log('[useApprovalSSE] Disconnecting...') + disconnectSSE() + stopPolling() + }, [disconnectSSE, stopPolling]) + + useEffect(() => { + mountedRef.current = true + + if (autoConnect) { + console.log('[useApprovalSSE] Auto-connecting...') + handleConnect() + } + + // Cleanup: 防止殭屍連線 + return () => { + console.log('[useApprovalSSE] Cleanup') + mountedRef.current = false + handleDisconnect() + } + }, [autoConnect, handleConnect, handleDisconnect]) + + return { + status: sseStatus, + isConnected: sseStatus === 'connected', + isReconnecting: sseStatus === 'reconnecting', + error, + connect: handleConnect, + disconnect: handleDisconnect, + } +} diff --git a/apps/web/src/stores/approval.store.ts b/apps/web/src/stores/approval.store.ts index dc7ac125..d98e7da0 100644 --- a/apps/web/src/stores/approval.store.ts +++ b/apps/web/src/stores/approval.store.ts @@ -74,6 +74,8 @@ export interface SignResponse { // Store State // ============================================================================= +type SSEConnectionStatus = 'disconnected' | 'connecting' | 'connected' | 'reconnecting' | 'error' + interface ApprovalState { // Data pendingApprovals: ApprovalRequest[] @@ -89,13 +91,21 @@ interface ApprovalState { recentlyApproved: Set recentlyRejected: Set - // Polling + // SSE Connection (Phase 15: Polling → SSE) + sseStatus: SSEConnectionStatus + sseReconnectAttempts: number + + // Polling (Legacy, kept for fallback) pollingInterval: number | null // Actions fetchPending: () => Promise signApproval: (id: string, signerId: string, signerName: string, comment?: string) => Promise rejectApproval: (id: string, rejectorId: string, rejectorName: string, reason: string) => Promise + // SSE Actions (Phase 15) + connectSSE: () => void + disconnectSSE: () => void + // Polling (Legacy) startPolling: (intervalMs?: number) => void stopPolling: () => void clearRecentlyResolved: (id: string) => void @@ -107,6 +117,9 @@ interface ApprovalState { // ============================================================================= const DEFAULT_POLLING_INTERVAL = 5000 // 5 seconds +const MAX_SSE_RECONNECT_ATTEMPTS = 10 +const BASE_RECONNECT_DELAY = 1000 // 1 second +const MAX_RECONNECT_DELAY = 30000 // 30 seconds // 統帥鐵律: 禁止任何 Fallback IP const getApiBaseUrl = (): string => { @@ -126,6 +139,8 @@ const API_BASE_URL = getApiBaseUrl() // ============================================================================= let pollingTimer: NodeJS.Timeout | null = null +let eventSource: EventSource | null = null +let sseReconnectTimeout: NodeJS.Timeout | null = null export const useApprovalStore = create()( subscribeWithSelector((set, get) => ({ @@ -138,6 +153,10 @@ export const useApprovalStore = create()( rejectingId: null, recentlyApproved: new Set(), recentlyRejected: new Set(), + // SSE state (Phase 15) + sseStatus: 'disconnected', + sseReconnectAttempts: 0, + // Polling (Legacy) pollingInterval: null, // ========================================================================== @@ -324,7 +343,122 @@ export const useApprovalStore = create()( }, // ========================================================================== - // Polling + // SSE Connection (Phase 15: Polling → SSE) + // ========================================================================== + connectSSE: () => { + const state = get() + + // Already connected or connecting + if (eventSource && state.sseStatus === 'connected') { + console.log('[Approval SSE] Already connected') + return + } + + // Clean up existing connection + if (eventSource) { + eventSource.close() + eventSource = null + } + + if (!API_BASE_URL) { + console.error('[Approval SSE] Missing API URL') + set({ sseStatus: 'error', error: 'Missing API URL' }) + return + } + + set({ sseStatus: 'connecting', error: null }) + console.log('[Approval SSE] Connecting to', `${API_BASE_URL}/api/v1/approvals/stream`) + + // Initial fetch + state.fetchPending() + + // Create EventSource + eventSource = new EventSource(`${API_BASE_URL}/api/v1/approvals/stream`) + + eventSource.onopen = () => { + console.log('[Approval SSE] Connected') + set({ + sseStatus: 'connected', + sseReconnectAttempts: 0, + error: null, + }) + } + + // Handle approval events + eventSource.addEventListener('approval', (e: MessageEvent) => { + try { + const data = JSON.parse(e.data) + console.log('[Approval SSE] Event:', data.action, data.approval_id) + + // Re-fetch pending approvals on any change + get().fetchPending() + + } catch (err) { + console.error('[Approval SSE] Failed to parse event:', err) + } + }) + + eventSource.addEventListener('heartbeat', () => { + console.log('[Approval SSE] Heartbeat') + }) + + eventSource.addEventListener('connected', () => { + console.log('[Approval SSE] Server confirmed connection') + }) + + // Error handling with exponential backoff + eventSource.onerror = () => { + const attempts = get().sseReconnectAttempts + + if (attempts >= MAX_SSE_RECONNECT_ATTEMPTS) { + console.error('[Approval SSE] Max reconnect attempts reached, falling back to polling') + set({ sseStatus: 'error', error: 'SSE connection failed' }) + eventSource?.close() + eventSource = null + // Fallback to polling + get().startPolling() + return + } + + set({ + sseStatus: 'reconnecting', + sseReconnectAttempts: attempts + 1, + }) + + const delay = Math.min( + BASE_RECONNECT_DELAY * Math.pow(2, attempts), + MAX_RECONNECT_DELAY + ) + + console.log(`[Approval SSE] Reconnecting in ${delay}ms (attempt ${attempts + 1})`) + + if (sseReconnectTimeout) clearTimeout(sseReconnectTimeout) + sseReconnectTimeout = setTimeout(() => { + eventSource?.close() + eventSource = null + get().connectSSE() + }, delay) + } + }, + + disconnectSSE: () => { + console.log('[Approval SSE] Disconnecting') + + if (eventSource) { + eventSource.close() + eventSource = null + } + + if (sseReconnectTimeout) { + clearTimeout(sseReconnectTimeout) + sseReconnectTimeout = null + } + + set({ sseStatus: 'disconnected', sseReconnectAttempts: 0 }) + }, + + // ========================================================================== + // Polling (Legacy Fallback) // ========================================================================== startPolling: (intervalMs = DEFAULT_POLLING_INTERVAL) => { const state = get() @@ -396,6 +530,13 @@ export const useIsRejectingApproval = (id: string) => export const useApprovalError = () => useApprovalStore((state) => state.error) +// SSE Status Hooks (Phase 15) +export const useApprovalSSEStatus = () => + useApprovalStore((state) => state.sseStatus) + +export const useApprovalSSEConnected = () => + useApprovalStore((state) => state.sseStatus === 'connected') + // ============================================================================= // Type Converters (Backend → Frontend) // =============================================================================