feat(#15): Approval Polling → SSE 即時更新

Phase 15: 將 Approval 輪詢機制改為 Server-Sent Events

後端變更:
- 新增 /api/v1/approvals/stream SSE 端點
- 建立/簽核/拒絕時發布 SSE 事件
- 使用現有 EventPublisher 基礎設施

前端變更:
- 新增 useApprovalSSE hook (自動連線/斷線管理)
- approval.store 新增 connectSSE/disconnectSSE actions
- 更新三個組件使用 SSE 取代 setInterval polling:
  - LiveApprovalPanel
  - AICommandPanel
  - HITLSection

效益:
- 即時推送 (延遲 ~0ms vs polling 5s)
- 減少 API 請求 (僅變更時推送)
- 自動重連 + Fallback to polling

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-25 09:54:02 +08:00
parent 749b8bc554
commit 170102a4ee
7 changed files with 393 additions and 30 deletions

View File

@@ -25,13 +25,23 @@ import re
from typing import TYPE_CHECKING
from uuid import UUID
from fastapi import APIRouter, BackgroundTasks, Depends, Header, HTTPException, status
if TYPE_CHECKING:
from src.services.notifications import ExecutionStatus
from fastapi import (
APIRouter,
BackgroundTasks,
Depends,
Header,
HTTPException,
Request,
status,
)
from fastapi.responses import StreamingResponse
from src.core.config import settings
from src.core.logging import get_logger
from src.core.sse import EventType, SSEEvent, get_publisher
from src.models.approval import (
ApprovalRequest,
ApprovalRequestCreate,
@@ -521,6 +531,9 @@ async def create_approval(
required_signatures=approval.required_signatures,
)
# SSE: 發布建立事件
asyncio.create_task(_publish_approval_event("created", approval))
return ApprovalRequestResponse.from_approval(approval)
@@ -667,6 +680,10 @@ async def sign_approval(
note="Approval has no incident_id in metadata, cannot update Incident status",
)
# SSE: 發布簽核/核准事件
event_action = "approved" if execution_triggered else "signed"
asyncio.create_task(_publish_approval_event(event_action, approval))
return SignResponse(
success=True,
message=message,
@@ -752,4 +769,129 @@ async def reject_approval(
reason=request.reason,
)
# SSE: 發布拒絕事件
asyncio.create_task(_publish_approval_event("rejected", approval))
return ApprovalRequestResponse.from_approval(approval)
# =============================================================================
# SSE Event Publishing (Phase 15: Polling → SSE)
# =============================================================================
async def _publish_approval_event(
action: str,
approval: ApprovalRequest,
) -> None:
"""
發布 Approval SSE 事件
Args:
action: 事件動作 (created, signed, approved, rejected, executed)
approval: 授權請求物件
"""
try:
pub = await get_publisher()
event = SSEEvent(
type=EventType.APPROVAL,
data={
"action": action,
"approval_id": str(approval.id),
"status": approval.status.value,
"risk_level": approval.risk_level.value,
"current_signatures": approval.current_signatures,
"required_signatures": approval.required_signatures,
"action_text": approval.action[:100],
},
)
sent_count = await pub.publish(event, topic="approvals")
if sent_count > 0:
logger.debug(
"approval_sse_published",
action=action,
approval_id=str(approval.id),
sent_count=sent_count,
)
except Exception as e:
logger.error(
"approval_sse_publish_failed",
action=action,
approval_id=str(approval.id),
error=str(e),
)
# =============================================================================
# GET /api/v1/approvals/stream (SSE)
# =============================================================================
@router.get(
"/stream",
summary="SSE 即時授權更新",
description="Server-Sent Events 即時推送授權狀態變更,取代輪詢機制",
)
async def stream_approvals(request: Request) -> StreamingResponse:
"""
SSE 即時授權更新 (Phase 15: Polling → SSE)
事件類型:
- connected: 連線建立
- heartbeat: 心跳 (每 15 秒)
- approval: 授權狀態變更 (created, signed, approved, rejected, executed)
Client Usage (JavaScript):
```javascript
const es = new EventSource('/api/v1/approvals/stream');
es.addEventListener('approval', (e) => {
const data = JSON.parse(e.data);
console.log('Approval update:', data.action, data.approval_id);
// Refresh approval list or update specific item
});
es.addEventListener('heartbeat', () => {
console.log('Connection alive');
});
```
"""
logger.info(
"approval_stream_connect",
client_ip=request.client.host if request.client else "unknown",
)
pub = await get_publisher()
# 訂閱 approvals topic
client = await pub.subscribe(
topics=["approvals"],
metadata={"ip": request.client.host if request.client else "unknown"},
)
async def event_generator():
"""SSE 事件生成器,含斷線偵測"""
try:
async for data in pub.stream(client):
if await request.is_disconnected():
logger.info("approval_stream_disconnected", client_id=client.id)
break
yield data
except asyncio.CancelledError:
logger.info("approval_stream_cancelled", client_id=client.id)
raise
finally:
logger.info("approval_stream_cleanup", client_id=client.id)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache, no-store, must-revalidate",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
"Access-Control-Allow-Origin": "*",
},
)

View File

@@ -8,13 +8,12 @@
* Features:
* - OpenClaw AI 視覺化 (頂部)
* - 待授權卡片列表 (底部)
* - 即時輪詢後端待簽核
* - **Phase 15: SSE 即時更新** (取代輪詢)
* - Nothing.tech 純白極簡風格
*
* i18n: 100% next-intl
*/
import { useEffect } from 'react'
import { useTranslations } from 'next-intl'
import { cn } from '@/lib/utils'
import { OpenClawPanel, type OpenClawStatus } from './openclaw-panel'
@@ -24,6 +23,7 @@ import {
usePendingApprovals,
toFrontendApproval,
} from '@/stores/approval.store'
import { useApprovalSSE } from '@/hooks/useApprovalSSE'
import { ShieldCheck, Bell } from 'lucide-react'
// =============================================================================
@@ -43,14 +43,11 @@ export function AICommandPanel({ className }: AICommandPanelProps) {
const tApproval = useTranslations('approval')
// Store
const { fetchPending, signApproval, rejectApproval, startPolling, stopPolling } = useApprovalStore()
const { fetchPending, signApproval, rejectApproval } = useApprovalStore()
const pendingApprovals = usePendingApprovals()
// Start polling on mount
useEffect(() => {
startPolling(5000) // Poll every 5 seconds
return () => stopPolling()
}, [startPolling, stopPolling])
// Phase 15: SSE 即時更新 (取代輪詢)
useApprovalSSE({ autoConnect: true })
// Handle approval
const handleApprove = async (id: string) => {

View File

@@ -9,10 +9,10 @@
* - OpenClaw AI 思考流視覺化
* - 動態 Approval Card 渲染 (100% 後端資料)
* - Slide Up 動畫過渡
* - 即時輪詢後端待簽核列表
* - **Phase 15: SSE 即時更新** (取代輪詢)
*/
import { useState, useEffect, useCallback } from 'react'
import { useState, useCallback } from 'react'
import { useTranslations } from 'next-intl'
import { cn } from '@/lib/utils'
import { OpenClawPanel, type OpenClawStatus } from './openclaw-panel'
@@ -22,6 +22,7 @@ import {
usePendingApprovals,
toFrontendApproval,
} from '@/stores/approval.store'
import { useApprovalSSE } from '@/hooks/useApprovalSSE'
import { useTimelineStore } from '@/stores/timeline.store'
import { ActionTimeline } from '@/components/timeline'
import { GlassCard, GlassCardTitle, GlassCardContent, GlassCardHeader } from '@/components/ui/glass-card'
@@ -84,10 +85,13 @@ export function HITLSection({ locale, className }: HITLSectionProps) {
const tApproval = useTranslations('approval')
// Store
const { fetchPending, signApproval, rejectApproval, startPolling, stopPolling } = useApprovalStore()
const { fetchPending, signApproval, rejectApproval } = useApprovalStore()
const pendingApprovals = usePendingApprovals()
const addTimelineEvent = useTimelineStore((state) => state.addEvent)
// Phase 15: SSE 即時更新 (取代輪詢)
useApprovalSSE({ autoConnect: true })
// AI thinking state
const [openclawStatus, setClawbotStatus] = useState<OpenClawStatus>('patrolling')
const [currentAlertType, setCurrentAlertType] = useState<string | undefined>()
@@ -105,12 +109,6 @@ export function HITLSection({ locale, className }: HITLSectionProps) {
requiredRoles: string
} | null>(null)
// Start polling on mount
useEffect(() => {
startPolling(5000) // Poll every 5 seconds
return () => stopPolling()
}, [startPolling, stopPolling])
// Simulate webhook alert (triggers AI thinking flow)
const simulateAlert = useCallback(async (alertType: string, severity: 'critical' | 'warning') => {
setIsSimulating(true)

View File

@@ -6,7 +6,7 @@
* 整合後端 API 的 Multi-Sig 授權流程
*
* Features:
* - 輪詢 GET /api/v1/approvals/pending
* - **Phase 15: SSE 即時更新** (取代輪詢)
* - 真實 API 簽核與拒絕
* - Multi-Sig 狀態即時更新
* - 簽核成功動畫
@@ -15,9 +15,10 @@
* - DevOps 角色長按時顯示 Access Denied
*/
import { useEffect, useState, useCallback } from 'react'
import { useState, useCallback } from 'react'
import { useTranslations } from 'next-intl'
import { useApprovalStore, usePendingApprovals, toFrontendApproval } from '@/stores/approval.store'
import { useApprovalSSE } from '@/hooks/useApprovalSSE'
import { ApprovalCard, type ApprovalRequest, type RiskLevel } from './approval-card'
import {
GlassCard,
@@ -101,9 +102,12 @@ export function LiveApprovalPanel({
const t = useTranslations('approval')
const tCommon = useTranslations('common')
const { startPolling, stopPolling, signApproval, rejectApproval, error } = useApprovalStore()
const { signApproval, rejectApproval, error } = useApprovalStore()
const pendingApprovals = usePendingApprovals()
// Phase 15: SSE 即時更新 (取代 Polling)
const { isConnected, status: sseStatus } = useApprovalSSE({ autoConnect: true })
// 模擬當前登入者 (Phase 3 權限擋板)
const currentUser: CurrentUser = {
id: signerId,
@@ -123,12 +127,6 @@ export function LiveApprovalPanel({
requiredRoles: string
} | null>(null)
// Start polling on mount
useEffect(() => {
startPolling(5000)
return () => stopPolling()
}, [startPolling, stopPolling])
// Handle sign with permission check (Phase 3 權限擋板)
const handleSign = useCallback(async (id: string, riskLevel: RiskLevel) => {
// Phase 3: 權限檢查

View File

@@ -1,5 +1,6 @@
export * from './use-agent'
export * from './use-health'
export * from './useSSE'
export * from './useApprovalSSE'
export * from './useIncidents'
export * from './useGlobalPulseMetrics'

View File

@@ -0,0 +1,86 @@
'use client'
/**
* useApprovalSSE - Approval SSE 連線 Hook
* ========================================
* Phase 15: Polling → SSE
*
* 自動管理 SSE 連線生命週期:
* - 組件掛載時自動連線
* - 組件卸載時自動斷線
* - 支援 SSE 失敗後降級到 Polling
*/
import { useEffect, useRef, useCallback } from 'react'
import { useApprovalStore } from '@/stores/approval.store'
interface UseApprovalSSEOptions {
/** 自動連線 (預設 true) */
autoConnect?: boolean
/** SSE 失敗後是否降級到 Polling (預設 true) */
fallbackToPolling?: boolean
}
interface UseApprovalSSEReturn {
/** SSE 連線狀態 */
status: 'disconnected' | 'connecting' | 'connected' | 'reconnecting' | 'error'
/** 是否已連線 */
isConnected: boolean
/** 是否重連中 */
isReconnecting: boolean
/** 錯誤訊息 */
error: string | null
/** 手動連線 */
connect: () => void
/** 手動斷線 */
disconnect: () => void
}
export function useApprovalSSE(options: UseApprovalSSEOptions = {}): UseApprovalSSEReturn {
const { autoConnect = true, fallbackToPolling = true } = options
const connectSSE = useApprovalStore((s) => s.connectSSE)
const disconnectSSE = useApprovalStore((s) => s.disconnectSSE)
const stopPolling = useApprovalStore((s) => s.stopPolling)
const sseStatus = useApprovalStore((s) => s.sseStatus)
const error = useApprovalStore((s) => s.error)
const mountedRef = useRef(true)
const handleConnect = useCallback(() => {
if (!mountedRef.current) return
console.log('[useApprovalSSE] Connecting...')
connectSSE()
}, [connectSSE])
const handleDisconnect = useCallback(() => {
console.log('[useApprovalSSE] Disconnecting...')
disconnectSSE()
stopPolling()
}, [disconnectSSE, stopPolling])
useEffect(() => {
mountedRef.current = true
if (autoConnect) {
console.log('[useApprovalSSE] Auto-connecting...')
handleConnect()
}
// Cleanup: 防止殭屍連線
return () => {
console.log('[useApprovalSSE] Cleanup')
mountedRef.current = false
handleDisconnect()
}
}, [autoConnect, handleConnect, handleDisconnect])
return {
status: sseStatus,
isConnected: sseStatus === 'connected',
isReconnecting: sseStatus === 'reconnecting',
error,
connect: handleConnect,
disconnect: handleDisconnect,
}
}

View File

@@ -74,6 +74,8 @@ export interface SignResponse {
// Store State
// =============================================================================
type SSEConnectionStatus = 'disconnected' | 'connecting' | 'connected' | 'reconnecting' | 'error'
interface ApprovalState {
// Data
pendingApprovals: ApprovalRequest[]
@@ -89,13 +91,21 @@ interface ApprovalState {
recentlyApproved: Set<string>
recentlyRejected: Set<string>
// Polling
// SSE Connection (Phase 15: Polling → SSE)
sseStatus: SSEConnectionStatus
sseReconnectAttempts: number
// Polling (Legacy, kept for fallback)
pollingInterval: number | null
// Actions
fetchPending: () => Promise<void>
signApproval: (id: string, signerId: string, signerName: string, comment?: string) => Promise<SignResponse | null>
rejectApproval: (id: string, rejectorId: string, rejectorName: string, reason: string) => Promise<boolean>
// SSE Actions (Phase 15)
connectSSE: () => void
disconnectSSE: () => void
// Polling (Legacy)
startPolling: (intervalMs?: number) => void
stopPolling: () => void
clearRecentlyResolved: (id: string) => void
@@ -107,6 +117,9 @@ interface ApprovalState {
// =============================================================================
const DEFAULT_POLLING_INTERVAL = 5000 // 5 seconds
const MAX_SSE_RECONNECT_ATTEMPTS = 10
const BASE_RECONNECT_DELAY = 1000 // 1 second
const MAX_RECONNECT_DELAY = 30000 // 30 seconds
// 統帥鐵律: 禁止任何 Fallback IP
const getApiBaseUrl = (): string => {
@@ -126,6 +139,8 @@ const API_BASE_URL = getApiBaseUrl()
// =============================================================================
let pollingTimer: NodeJS.Timeout | null = null
let eventSource: EventSource | null = null
let sseReconnectTimeout: NodeJS.Timeout | null = null
export const useApprovalStore = create<ApprovalState>()(
subscribeWithSelector((set, get) => ({
@@ -138,6 +153,10 @@ export const useApprovalStore = create<ApprovalState>()(
rejectingId: null,
recentlyApproved: new Set(),
recentlyRejected: new Set(),
// SSE state (Phase 15)
sseStatus: 'disconnected',
sseReconnectAttempts: 0,
// Polling (Legacy)
pollingInterval: null,
// ==========================================================================
@@ -324,7 +343,122 @@ export const useApprovalStore = create<ApprovalState>()(
},
// ==========================================================================
// Polling
// SSE Connection (Phase 15: Polling → SSE)
// ==========================================================================
connectSSE: () => {
const state = get()
// Already connected or connecting
if (eventSource && state.sseStatus === 'connected') {
console.log('[Approval SSE] Already connected')
return
}
// Clean up existing connection
if (eventSource) {
eventSource.close()
eventSource = null
}
if (!API_BASE_URL) {
console.error('[Approval SSE] Missing API URL')
set({ sseStatus: 'error', error: 'Missing API URL' })
return
}
set({ sseStatus: 'connecting', error: null })
console.log('[Approval SSE] Connecting to', `${API_BASE_URL}/api/v1/approvals/stream`)
// Initial fetch
state.fetchPending()
// Create EventSource
eventSource = new EventSource(`${API_BASE_URL}/api/v1/approvals/stream`)
eventSource.onopen = () => {
console.log('[Approval SSE] Connected')
set({
sseStatus: 'connected',
sseReconnectAttempts: 0,
error: null,
})
}
// Handle approval events
eventSource.addEventListener('approval', (e: MessageEvent) => {
try {
const data = JSON.parse(e.data)
console.log('[Approval SSE] Event:', data.action, data.approval_id)
// Re-fetch pending approvals on any change
get().fetchPending()
} catch (err) {
console.error('[Approval SSE] Failed to parse event:', err)
}
})
eventSource.addEventListener('heartbeat', () => {
console.log('[Approval SSE] Heartbeat')
})
eventSource.addEventListener('connected', () => {
console.log('[Approval SSE] Server confirmed connection')
})
// Error handling with exponential backoff
eventSource.onerror = () => {
const attempts = get().sseReconnectAttempts
if (attempts >= MAX_SSE_RECONNECT_ATTEMPTS) {
console.error('[Approval SSE] Max reconnect attempts reached, falling back to polling')
set({ sseStatus: 'error', error: 'SSE connection failed' })
eventSource?.close()
eventSource = null
// Fallback to polling
get().startPolling()
return
}
set({
sseStatus: 'reconnecting',
sseReconnectAttempts: attempts + 1,
})
const delay = Math.min(
BASE_RECONNECT_DELAY * Math.pow(2, attempts),
MAX_RECONNECT_DELAY
)
console.log(`[Approval SSE] Reconnecting in ${delay}ms (attempt ${attempts + 1})`)
if (sseReconnectTimeout) clearTimeout(sseReconnectTimeout)
sseReconnectTimeout = setTimeout(() => {
eventSource?.close()
eventSource = null
get().connectSSE()
}, delay)
}
},
disconnectSSE: () => {
console.log('[Approval SSE] Disconnecting')
if (eventSource) {
eventSource.close()
eventSource = null
}
if (sseReconnectTimeout) {
clearTimeout(sseReconnectTimeout)
sseReconnectTimeout = null
}
set({ sseStatus: 'disconnected', sseReconnectAttempts: 0 })
},
// ==========================================================================
// Polling (Legacy Fallback)
// ==========================================================================
startPolling: (intervalMs = DEFAULT_POLLING_INTERVAL) => {
const state = get()
@@ -396,6 +530,13 @@ export const useIsRejectingApproval = (id: string) =>
export const useApprovalError = () =>
useApprovalStore((state) => state.error)
// SSE Status Hooks (Phase 15)
export const useApprovalSSEStatus = () =>
useApprovalStore((state) => state.sseStatus)
export const useApprovalSSEConnected = () =>
useApprovalStore((state) => state.sseStatus === 'connected')
// =============================================================================
// Type Converters (Backend → Frontend)
// =============================================================================