Files
awoooi/apps/web/src/stores/agent.store.ts
Your Name 6ccdf199ad
All checks were successful
CD Pipeline / tests (push) Successful in 1m23s
Code Review / ai-code-review (push) Successful in 13s
CD Pipeline / build-and-deploy (push) Successful in 4m18s
CD Pipeline / post-deploy-checks (push) Successful in 1m44s
chore(web): 清理 IwoooS D2 註解語氣
2026-06-05 01:11:44 +08:00

307 lines
9.0 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* Agent Store - OpenClaw 狀態管理 (企業級強化版)
* ADR-004: Zustand 狀態管理
*
* 封裝內容:
* - Agent 狀態 (idle/thinking/executing/waiting_approval)
* - 思考串流 (thinkingStream)
* - SSE 連線邏輯 (含 AbortController + Buffer)
* - 錯誤處理
*/
import { create } from 'zustand'
import { subscribeWithSelector } from 'zustand/middleware'
// ==================== Types ====================
/** Possible values of the `status` field kept in the agent store. */
export type AgentStatus = 'idle' | 'thinking' | 'executing' | 'waiting_approval' | 'error'
/**
 * One entry of the agent's thinking stream.
 *
 * Steps built by the SSE reader in this file only carry `type`, `content`
 * and `timestamp`; the optional structured payloads are never filled in there.
 */
export interface ThinkingStep {
// Kind of step. The SSE reader in this file casts the incoming `type` string to this union without validating it.
type: 'thinking' | 'result' | 'error' | 'graph_rag' | 'finops'
// Text carried by the step.
content: string
// The code in this file always sets this to `new Date()` at the moment the step is appended.
timestamp: Date
// GraphRAG structured data (optional)
graphData?: {
// Which analysis produced the payload.
analysisType: 'blast_radius' | 'root_cause'
targetService: string
// NOTE(review): presumably which of the optional lists below is present depends on `analysisType` — confirm with the producer.
affectedServices?: string[]
dependencyChain?: string[]
probableRootCauses?: string[]
criticalPath?: string[]
}
// FinOps structured data (optional)
finopsData?: {
// NOTE(review): the `Usd` suffix suggests amounts in US dollars — confirm unit and precision with the producer.
totalWastedUsd: number
realizableSavingsUsd: number
freedResourcesUsd: number
// Proposed actions; `savings` is presumably in the same unit as the totals above — TODO confirm.
topActions: Array<{
action: string
savings: number
risk: string
}>
}
}
/**
 * An approval request held in the store's `pendingApprovals` list.
 *
 * `removeApproval` in this file matches entries by `id`.
 */
export interface Approval {
// Identifier used by `removeApproval` to find the entry.
id: string
type: string
// Description of the operation awaiting approval.
action: {
pluginId: string
operation: string
// Free-form parameters; values are `unknown` and must be narrowed before use.
parameters: Record<string, unknown>
riskLevel: 'low' | 'medium' | 'high' | 'critical'
}
requestedAt: Date
// Optional; nothing in this file reads or enforces it.
expiresAt?: Date
}
/**
 * Full shape of the agent store: data fields followed by the actions that
 * mutate them. Fields prefixed with `_` are SSE bookkeeping used by the
 * stream actions.
 */
interface AgentState {
// ==================== State ====================
status: AgentStatus
currentTask: string | null
// Steps received so far, in arrival order (new steps are appended at the end).
thinkingStream: ThinkingStep[]
pendingApprovals: Approval[]
conversationId: string | null
// Last error message, or null when there is none.
error: string | null
// SSE connection control (internal use)
// Controller of the most recently started stream request; null after stop/reset.
_abortController: AbortController | null
// Number of reconnect attempts scheduled since the last successful connection or manual abort.
_sseRetryCount: number
// ==================== Actions ====================
setStatus: (status: AgentStatus) => void
setCurrentTask: (task: string | null) => void
setError: (error: string | null) => void
// Thinking Stream
appendThinking: (step: ThinkingStep) => void
clearThinking: () => void
// SSE stream control
// `apiUrl` overrides the default thinking-stream endpoint when provided.
startThinkingStream: (apiUrl?: string) => Promise<void>
stopThinkingStream: () => void
// Approvals
addApproval: (approval: Approval) => void
removeApproval: (approvalId: string) => void
// Conversation
setConversationId: (id: string | null) => void
// Reset
reset: () => void
}
// ==================== Constants ====================
// Project rule: never fall back to a hard-coded IP/host — the API origin must come from configuration.
/**
 * Resolves the versioned API base URL (`<origin>/api/v1`) from `NEXT_PUBLIC_API_URL`.
 *
 * @returns The base URL without a trailing slash, or `''` when running without
 *          a `window` (server side) or when the variable is missing/blank. A
 *          missing variable is also reported through `console.error`.
 */
const getApiBaseUrl = (): string => {
  if (typeof window === 'undefined') return ''
  // Keep the literal `process.env.NEXT_PUBLIC_API_URL` expression: build tools
  // that inline public env variables match on this exact member access.
  const url = process.env.NEXT_PUBLIC_API_URL?.trim()
  if (!url) {
    console.error('[AWOOOI ERROR] Missing NEXT_PUBLIC_API_URL')
    return ''
  }
  // Drop trailing slashes so "https://host/" does not become "https://host//api/v1".
  return `${url.replace(/\/+$/, '')}/api/v1`
}
// Evaluated once at module load; stays '' when the module is first loaded without a `window`.
const API_BASE_URL = getApiBaseUrl()
// Initial values of the store's data fields. Spread into the store when it is
// created and passed to `set` again by `reset`.
const initialState = {
// The cast widens the literal so the property is typed as AgentStatus rather than 'idle'.
status: 'idle' as AgentStatus,
currentTask: null,
thinkingStream: [],
pendingApprovals: [],
conversationId: null,
error: null,
// No stream request exists yet.
_abortController: null,
_sseRetryCount: 0,
}
// ==================== Store ====================
/**
 * Agent store (Zustand with `subscribeWithSelector`).
 *
 * Holds the agent status, the thinking stream, pending approvals and the
 * bookkeeping needed to read the thinking stream over a `fetch`-based SSE
 * connection (abort controller + retry counter).
 *
 * Stream ownership: every `startThinkingStream` call creates its own
 * `AbortController` and stores it in `_abortController`. A call only keeps
 * driving retries while its controller is still the stored one, so
 * `stopThinkingStream`, `reset` and newer `startThinkingStream` calls reliably
 * cancel pending reconnects.
 */
export const useAgentStore = create<AgentState>()(
  subscribeWithSelector((set, get) => ({
    ...initialState,

    // ==================== Basic Setters ====================
    // Moving to any status other than 'error' clears the stored error message.
    setStatus: (status) => set({ status, error: status === 'error' ? get().error : null }),
    setCurrentTask: (task) => set({ currentTask: task }),
    // A non-empty message forces the 'error' status; clearing it leaves the status as is.
    setError: (error) => set({ error, status: error ? 'error' : get().status }),

    // ==================== Thinking Stream ====================
    appendThinking: (step) =>
      set((state) => ({
        thinkingStream: [...state.thinkingStream, step],
      })),
    clearThinking: () => set({ thinkingStream: [] }),

    // ==================== SSE Stream Control ====================
    /**
     * Opens the thinking stream and appends every received step to
     * `thinkingStream` until the server sends `[DONE]` or closes the body.
     *
     * Failures other than a manual abort are retried with exponential backoff
     * (1s, 2s, 4s, ... capped at 30s) up to 5 times; after that the store
     * switches to the 'error' status. The returned promise never rejects.
     *
     * @param apiUrl Optional endpoint overriding `${API_BASE_URL}/agent/thinking`.
     */
    startThinkingStream: async (apiUrl?: string) => {
      const state = get()
      // Abort the previous request if it is still running.
      state._abortController?.abort()
      const abortController = new AbortController()
      set({
        _abortController: abortController,
        status: 'thinking',
        error: null,
        // On a reconnect keep the steps collected so far; otherwise start with an empty stream.
        thinkingStream: state._sseRetryCount > 0 ? state.thinkingStream : [],
      })
      try {
        const url = apiUrl || `${API_BASE_URL}/agent/thinking`
        const response = await fetch(url, {
          signal: abortController.signal,
        })
        if (!response.ok) {
          throw new Error(`HTTP ${response.status}: ${response.statusText}`)
        }
        // Connected successfully: reset the retry counter.
        set({ _sseRetryCount: 0 })
        const reader = response.body?.getReader()
        if (!reader) {
          throw new Error('無法建立串流通道')
        }
        const decoder = new TextDecoder()
        // Accumulates text across chunks so an event split by the transport is not parsed half-way.
        let buffer = ''
        // eslint-disable-next-line no-constant-condition -- SSE read loop, exits through `done`
        while (true) {
          const { done, value } = await reader.read()
          if (done) break
          buffer += decoder.decode(value, { stream: true })
          // SSE: events are separated by a blank line (\n\n).
          const events = buffer.split('\n\n')
          buffer = events.pop() || '' // keep the incomplete tail for the next chunk
          for (const event of events) {
            if (event.startsWith('data: ')) {
              const data = event.slice(6).trim()
              // End-of-stream marker.
              if (data === '[DONE]') {
                set({ status: 'idle' })
                return
              }
              // Parse defensively: a malformed payload is logged and skipped.
              try {
                const parsed = JSON.parse(data) as { type: string; content: string }
                get().appendThinking({
                  type: parsed.type as ThinkingStep['type'],
                  content: parsed.content,
                  timestamp: new Date(),
                })
              } catch {
                console.warn('JSON 解析錯誤,跳過片段:', data)
              }
            }
          }
        }
        set({ status: 'idle' })
      } catch (err: unknown) {
        // Any failure after our own signal was aborted counts as an interruption, not a network error.
        const wasAborted =
          abortController.signal.aborted || (err instanceof Error && err.name === 'AbortError')
        if (wasAborted) {
          console.log('SSE 串流已手動中斷')
          const activeController = get()._abortController
          // A newer startThinkingStream call replaced this one: it owns status and
          // retry state now, so this stale handler must not overwrite them.
          if (activeController !== null && activeController !== abortController) return
          set({ status: 'idle', _sseRetryCount: 0 })
        } else {
          const message = err instanceof Error ? err.message : '未知錯誤'
          // L2 network self-healing: exponential backoff retry.
          const maxRetries = 5
          // Read from the snapshot taken when this attempt started, so a connection
          // that succeeded and then dropped continues the backoff sequence instead
          // of restarting it.
          const currentRetries = state._sseRetryCount
          if (currentRetries < maxRetries) {
            const delay = Math.min(1000 * Math.pow(2, currentRetries), 30000)
            console.log(`[AWOOOI L2 Healing] SSE Error: ${message}. Retrying in ${delay}ms (Attempt ${currentRetries + 1}/${maxRetries})...`)
            get().appendThinking({
              type: 'error',
              content: `連線中斷: ${message}。將在 ${delay / 1000} 秒後自動重連 (嘗試 ${currentRetries + 1}/${maxRetries})...`,
              timestamp: new Date(),
            })
            set({ _sseRetryCount: currentRetries + 1 })
            setTimeout(() => {
              // Skip the reconnect if the stream was stopped, reset or restarted
              // while we were waiting.
              if (get()._abortController !== abortController) return
              void get().startThinkingStream(apiUrl)
            }, delay)
          } else {
            console.error('[AWOOOI L2 Healing] SSE Max retries reached. Escalating to L3 AIOps.')
            set({
              status: 'error',
              error: `Maximum SSE reconnect attempts reached: ${message}`,
            })
            get().appendThinking({
              type: 'error',
              content: `嚴重錯誤: 無法建立串流連線,已達最大重試次數。`,
              timestamp: new Date(),
            })
          }
        }
      }
    },

    /**
     * Aborts the running stream (if any) and cancels pending reconnects.
     * The retry counter is cleared so the next start is not treated as a reconnect.
     */
    stopThinkingStream: () => {
      get()._abortController?.abort()
      set({
        _abortController: null,
        status: 'idle',
        _sseRetryCount: 0,
      })
    },

    // ==================== Approvals ====================
    // Adding an approval always switches the store to 'waiting_approval'.
    addApproval: (approval) =>
      set((state) => ({
        pendingApprovals: [...state.pendingApprovals, approval],
        status: 'waiting_approval',
      })),
    // Removes by id; the status becomes 'idle' once the list is empty.
    removeApproval: (approvalId) =>
      set((state) => {
        const pendingApprovals = state.pendingApprovals.filter(
          (a) => a.id !== approvalId
        )
        return {
          pendingApprovals,
          status: pendingApprovals.length === 0 ? 'idle' : 'waiting_approval',
        }
      }),

    // ==================== Conversation ====================
    setConversationId: (id) => set({ conversationId: id }),

    // ==================== Reset ====================
    // Aborts the running stream and restores every data field to its initial value.
    reset: () => {
      get()._abortController?.abort()
      set(initialState)
    },
  }))
)
// ==================== Selectors (performance optimization) ====================
// Module-level selectors keep a stable identity, so they can be passed to the
// store hook without creating a new function on every call.

/** Selects the current agent status. */
export function selectAgentStatus({ status }: AgentState) {
  return status
}

/** Selects the list of thinking steps. */
export function selectThinkingStream({ thinkingStream }: AgentState) {
  return thinkingStream
}

/** True while the status is exactly 'thinking'. */
export function selectIsThinking({ status }: AgentState) {
  return status === 'thinking'
}

/** True while the status is exactly 'error'. */
export function selectHasError({ status }: AgentState) {
  return status === 'error'
}

/** Selects the stored error message (null when there is none). */
export function selectError({ error }: AgentState) {
  return error
}