feat(phase-6.4g-6.5b): API Synaptic Integration + Dual-State WarRoom UI

Phase 6.4g (API 突觸對接):
- lewooogo-brain dependency binding in apps/api/pyproject.toml
- POST /api/v1/incidents/{id}/propose route (proposals.py)
- Guardrails integration (8/8 tests passed)

Phase 6.5a (視覺皮層建置):
- DualStateIncidentCard.tsx with Nothing.tech visual compliance
- Ping radar animation for alert state
- Tier-based decision layer UI (AI 執行中 / 等待親核)

Phase 6.5b (神經網路串接):
- Main warroom page integration (page.tsx)
- IncidentResponse → DualState mapper function
- Empty state: "系統穩定。0 活躍異常。"

Tests:
- test_guardrails.py (8/8)
- test_incident_engine.py (6/6)
- test_skill_loader.py (6/6)
- Frontend build: 0 errors

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-23 11:58:28 +08:00
parent 8eaf2acb0d
commit cb5d0ecfe4
17 changed files with 2206 additions and 39 deletions

View File

@@ -24,8 +24,13 @@ dependencies = [
"opentelemetry-instrumentation-fastapi>=0.41b0",
"opentelemetry-instrumentation-httpx>=0.41b0",
"opentelemetry-instrumentation-logging>=0.41b0",
# Phase 6.4g: leWOOOgo Brain - 積木化決策引擎
"lewooogo-brain",
]
[tool.uv.sources]
lewooogo-brain = { path = "../../packages/lewooogo-brain", editable = true }
[project.optional-dependencies]
dev = [
"pytest>=7.4.0",

View File

@@ -53,6 +53,9 @@ from src.api.v1 import incidents as incidents_v1 # Phase 6.4: Decision Proposal
# Legacy route imports (to be migrated)
from src.routes import agent, plugins, pipelines, notifications
# Phase 6.4g: lewooogo-brain 積木路由
from src.routers import proposals as proposals_router
# =============================================================================
# Initialize Logging (MUST be first)
@@ -257,6 +260,7 @@ app.include_router(audit_logs_v1.router, prefix="/api/v1", tags=["Audit Logs"])
app.include_router(telegram_v1.router, prefix="/api/v1", tags=["Telegram Gateway"]) # Phase 5.4
app.include_router(metrics_v1.router, prefix="/api/v1", tags=["Gold Metrics"]) # Phase 7: 真實血脈
app.include_router(incidents_v1.router, prefix="/api/v1", tags=["Incidents"]) # Phase 6.4: Decision Proposal
app.include_router(proposals_router.router, tags=["Proposals (6.4g)"]) # Phase 6.4g: lewooogo-brain
# Legacy routes (to be migrated to api/v1/)
app.include_router(plugins.router, prefix="/api/v1/plugins", tags=["Plugins"])

View File

View File

@@ -0,0 +1,98 @@
"""
Proposals Router - Phase 6.4g 突觸對接
======================================
POST /api/v1/incidents/{incident_id}/propose
整合 lewooogo-brain 積木模組實現決策提案生成。
"""
from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel, Field
from typing import List
router = APIRouter(prefix="/api/v1/incidents", tags=["Proposals"])
class ProposalCreateRequest(BaseModel):
require_dry_run: bool = Field(
default=True,
description="強制要求演練模式,此參數將直接餵給 Guardrails 進行驗證"
)
class ProposalResponse(BaseModel):
proposal_id: str = Field(..., description="決策書唯一識別碼")
incident_id: str = Field(..., description="關聯的事件 ID")
actions: List[str] = Field(..., description="生成的具體作戰指令清單")
tier: int = Field(..., description="判定之授權級別 (1: 自主, 2: 授權, 3: 親核)")
guardrails_passed: bool = Field(..., description="是否完全通過防爆圈檢測")
rejection_reason: str | None = Field(default=None, description="若未通過防爆圈,顯示阻擋原因")
def get_proposal_engine():
"""Phase 6.4g 暫時性 Mock DI驗證路由暢通"""
from lewooogo_brain.interfaces.proposal_engine import Proposal, Guardrails
from uuid import uuid4
class MockEngine:
async def generate(self, incident_id: str) -> tuple[Proposal | None, str]:
return Proposal(
proposal_id=f"prop-{str(uuid4())[:8]}",
incident_id=incident_id,
action="kubectl get pods -n awoooi-prod",
description="Mock proposal for testing",
risk_level="low",
guardrails=self.get_default_guardrails().model_dump(),
metadata={"generated_by": "mock"},
), "Proposal generated (mock)"
async def generate_with_skill(self, incident_id: str, skill_id: str):
return await self.generate(incident_id)
def get_default_guardrails(self) -> Guardrails:
return Guardrails(require_dry_run=True)
return MockEngine()
@router.post(
"/{incident_id}/propose",
response_model=ProposalResponse,
status_code=status.HTTP_201_CREATED,
summary="生成決策提案 (Phase 6.4g)",
description="使用 lewooogo-brain 積木生成決策提案",
)
async def generate_decision_proposal(
incident_id: str,
request: ProposalCreateRequest,
engine=Depends(get_proposal_engine)
):
try:
# Guardrails 檢查: require_dry_run 必須為 True
if not request.require_dry_run:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="Guardrail triggered: require_dry_run must be True"
)
proposal, message = await engine.generate(incident_id=incident_id)
if proposal is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=message
)
# 計算 tier 基於 risk_level
tier_map = {"low": 1, "medium": 2, "high": 3}
tier = tier_map.get(proposal.risk_level, 2)
return ProposalResponse(
proposal_id=proposal.proposal_id,
incident_id=proposal.incident_id,
actions=[proposal.action],
tier=tier,
guardrails_passed=proposal.guardrails.get("require_dry_run", False),
rejection_reason=None
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Internal Error: {str(e)}")

View File

@@ -25,8 +25,58 @@ import { OpenClawStateMachine } from '@/components/ai/openclaw-state-machine'
import { GlobalPulseChart } from '@/components/charts/global-pulse-chart'
import { useGlobalPulseMetrics } from '@/hooks/useGlobalPulseMetrics'
import { useIncidents } from '@/hooks/useIncidents'
import { IncidentCard, IncidentCardGrid, IncidentEmptyState, ThinkingTerminal, DEMO_DECISION_CHAIN } from '@/components/incident'
import { Activity, AlertTriangle } from 'lucide-react'
import {
IncidentCard,
IncidentCardGrid,
IncidentEmptyState,
ThinkingTerminal,
DEMO_DECISION_CHAIN,
DualStateIncidentCard,
} from '@/components/incident'
import { AlertTriangle } from 'lucide-react'
import type { IncidentResponse } from '@/lib/api-client'
// =============================================================================
// Utility: Map IncidentResponse to DualStateIncidentCard props
// =============================================================================
function mapToDualState(incident: IncidentResponse): {
id: string
serviceName: string
status: 'normal' | 'alert'
tier?: 1 | 2 | 3
message: string
timestamp: string
} {
// P0/P1 視為異常 (alert)P2/P3 視為正常 (normal)
const isAlert = incident.severity === 'P0' || incident.severity === 'P1'
// Tier 判定: proposal_count > 0 且為 P0 = Tier 3, P1 = Tier 2, else Tier 1
let tier: 1 | 2 | 3 | undefined = undefined
if (isAlert && incident.proposal_count > 0) {
tier = incident.severity === 'P0' ? 3 : 2
} else if (isAlert) {
tier = 1
}
// 格式化時間
const date = new Date(incident.created_at)
const timestamp = date.toLocaleString('zh-TW', {
month: 'short',
day: 'numeric',
hour: '2-digit',
minute: '2-digit',
})
return {
id: incident.incident_id,
serviceName: incident.affected_services[0] || 'unknown',
status: isAlert ? 'alert' : 'normal',
tier,
message: `${incident.signal_count} 筆告警 | ${incident.status}`,
timestamp,
}
}
// =============================================================================
// Main Page
@@ -103,7 +153,7 @@ export default function Home({ params }: { params: { locale: string } }) {
<LiveDashboard locale={locale} />
</DataPincerPanel>
{/* Active Incidents Section (Phase 7: 真實血脈) */}
{/* Active Incidents Section (Phase 7: 真實血脈 + Phase 6.5b 雙態卡片) */}
<DataPincerPanel
title={t('incident.activeIncidents')}
status={incidents.length > 0 ? 'critical' : 'healthy'}
@@ -121,16 +171,31 @@ export default function Home({ params }: { params: { locale: string } }) {
<span className="font-mono text-sm">{incidentsError}</span>
</div>
) : incidents.length === 0 ? (
<IncidentEmptyState />
/* Nothing.tech 風格平靜態: 系統穩定 */
<div className="flex flex-col items-center justify-center py-12 text-center">
<div className="w-3 h-3 rounded-full bg-green-500 mb-4 animate-pulse" />
<p className="font-mono text-sm text-neutral-600">
{t('incident.systemStable', { defaultValue: '系統穩定' })}
</p>
<p className="font-mono text-xs text-neutral-400 mt-1">
0 {t('incident.activeAlerts', { defaultValue: '活躍異常' })}
</p>
</div>
) : (
<IncidentCardGrid>
{incidents.map((incident) => (
<IncidentCard
key={incident.incident_id}
incident={incident}
/>
))}
</IncidentCardGrid>
<div className="space-y-4">
{/* Phase 6.5b: 雙態戰情室卡片 (脈衝雷達 + Tier 決策層) */}
<div className="grid grid-cols-1 md:grid-cols-2 gap-3">
{incidents.map((incident) => {
const dualProps = mapToDualState(incident)
return (
<DualStateIncidentCard
key={`dual-${incident.incident_id}`}
{...dualProps}
/>
)
})}
</div>
</div>
)}
</DataPincerPanel>

View File

@@ -0,0 +1,97 @@
'use client'
/**
* DualStateIncidentCard - Phase 6.5a 雙態戰情室卡片
* ==================================================
*
* Nothing.tech 視覺憲法:
* - 純白極簡 (bg-white/90)
* - 無深色模式
* - 嚴禁陰影 (shadow-none)
* - 細邊框 (border-[0.5px])
*
* 雙態設計:
* - normal: 淺灰邊框,靜態
* - alert: 紅色邊框,脈衝雷達動畫
*
* 統帥鐵律: 禁止假數據!
*/
import React from 'react'
export interface DualStateIncidentCardProps {
id: string
serviceName: string
status: 'normal' | 'alert'
tier?: 1 | 2 | 3
message: string
timestamp: string
}
export const DualStateIncidentCard: React.FC<DualStateIncidentCardProps> = ({
id,
serviceName,
status,
tier,
message,
timestamp,
}) => {
const isAlert = status === 'alert'
return (
<div
className={`
relative p-4 w-full max-w-md font-mono text-sm transition-all duration-300
bg-white/90 backdrop-blur-md
${isAlert ? 'border border-red-500' : 'border-[0.5px] border-neutral-200'}
shadow-none
`}
>
{/* 異常脈衝雷達 (Ping Animation) */}
{isAlert && (
<span className="absolute top-4 right-4 flex h-2.5 w-2.5">
<span className="animate-ping absolute inline-flex h-full w-full rounded-full bg-red-400 opacity-75"></span>
<span className="relative inline-flex rounded-full h-2.5 w-2.5 bg-red-600"></span>
</span>
)}
{/* 標頭資訊 */}
<div className="flex justify-between items-center mb-3">
<span className="text-neutral-400 text-xs">{id}</span>
<span
className={`px-2 py-0.5 text-xs tracking-wider border-[0.5px] ${
isAlert
? 'bg-red-50 text-red-600 border-red-200'
: 'bg-neutral-50 text-neutral-500 border-neutral-200'
}`}
>
{serviceName}
</span>
</div>
{/* 核心數據與訊息 */}
<div
className={`mt-2 font-bold tracking-wide ${isAlert ? 'text-red-600' : 'text-neutral-800'}`}
>
{message}
</div>
<div className="mt-1 text-xs text-neutral-400">{timestamp}</div>
{/* 大腦決策層 (Proposal UI) */}
{isAlert && tier && (
<div className="mt-4 pt-3 border-t-[0.5px] border-red-200 flex justify-between items-center">
<span className="text-xs text-neutral-500">
{tier === 1 ? '>_ AI 執行中 (Tier 1)' : `>_ 等待統帥親核 (Tier ${tier})`}
</span>
{tier > 1 && (
<button className="px-3 py-1 bg-neutral-900 text-white text-xs hover:bg-black transition-colors cursor-pointer">
[ Y / n ]
</button>
)}
</div>
)}
</div>
)
}
export default DualStateIncidentCard

View File

@@ -1,8 +1,12 @@
/**
* Incident Components - Phase 7
* Incident Components - Phase 7 + 6.5a
*/
export { IncidentCard, IncidentCardGrid, IncidentEmptyState } from './incident-card'
export {
DualStateIncidentCard,
type DualStateIncidentCardProps,
} from './dual-state-incident-card'
export {
ThinkingTerminal,
DEMO_DECISION_CHAIN,

View File

@@ -27,10 +27,10 @@
| **6.4b** | **lewooogo-data 骨架** | `packages/` | 1h | ✅ 完成 |
| **6.4c** | **Interface 定義 (ABC)** | `packages/` | 2h | ✅ 完成 |
| **6.4d** | **MemoryProvider 實作** | `packages/` | 4h | 🔲 待辦 |
| **6.4e** | **Engine 搬遷** | `packages/` | 4h | 🔲 待辦 |
| **6.4f** | **SkillLoader** | `packages/` | 2h | 🔲 待辦 |
| **6.4g** | **apps/api 引用更新** | `apps/api` | 2h | 🔲 待辦 |
| **6.4h** | **Decision Proposal API** | .188 API | 4h | 🔲 待辦 |
| **6.4e** | **Engine 搬遷** | `packages/` | 4h | ✅ 完成 |
| **6.4f** | **SkillLoader** | `packages/` | 2h | ✅ 完成 |
| **6.4g** | **API 突觸對接 `/propose`** | `apps/api` | 2h | ✅ 完成 |
| **6.4h** | **真實 ProposalEngine DI** | .188 API | 4h | 🔲 **下一步** |
| 6.5 | Runner 整合 + 5+1 狀態機 | .188 API | 4h | 🔲 待辦 |
| 6.6 | Sensor Agent (各主機) | .110/.112/.120 | 2d | 🔲 待辦 |
@@ -40,6 +40,8 @@
| 時間 | 事件 | 負責人 |
|------|------|--------|
| 2026-03-23 11:50 | **🧠 Phase 6.4g API 突觸對接完成**: `/propose` 路由建立 + Guardrails 8/8 測試通過 + lewooogo-brain 積木綁定 | Claude Code |
| 2026-03-23 11:55 | **🎨 Phase 6.5a 視覺皮層啟動**: DualStateIncidentCard.tsx 雙態戰情室卡片 + Nothing.tech 視覺憲法 | Claude Code |
| 2026-03-23 09:30 | **🔧 NetworkPolicy 修復**: `allow-required-egress` podSelector 改為 `system=awoooi` (原本只允許 API pod) | Claude Code |
| 2026-03-23 09:20 | **🚨 生產修復 #2**: Worker CrashLoopBackOff 92次 + `init_redis``init_redis_pool` 函數名修正 + 7h 無告警根因 | Claude Code |
| 2026-03-23 09:15 | **🚨 生產修復 #1**: 簽核卡片閃爍消失 + Polling Race Condition + approval.store.ts 暫停/恢復機制 | Claude Code |

View File

@@ -1,19 +1,44 @@
"""
leWOOOgo Brain Engines - 推論引擎
leWOOOgo Brain Engines - 核心引擎
==================================
具體實作 IProposalEngine 和 IIncidentProcessor
Phase 6.4e: 引擎積木化完成
引擎列表:
- ProposalEngine: 決策提案引擎
- IncidentEngine: 事件處理引擎
- IncidentEngine: 事件處理引擎 (告警聚合、爆炸半徑分析)
- ProposalEngine: 決策提案引擎 (含 Guardrails)
- GuardrailsValidator: 獨立安全驗證器
"""
# TODO: Phase 6.4e 搬遷後啟用
# from lewooogo_brain.engines.proposal_engine import ProposalEngine
# from lewooogo_brain.engines.incident_engine import IncidentEngine
from lewooogo_brain.engines.incident_engine import (
IncidentEngine,
IIncidentMemory,
IBlastRadiusAnalyzer,
AGGREGATION_WINDOW_MINUTES,
WORKING_MEMORY_TTL,
)
__all__: list[str] = [
# "ProposalEngine",
# "IncidentEngine",
from lewooogo_brain.engines.proposal_engine import (
ProposalEngine,
GuardrailsValidator,
ILLMProvider,
FORBIDDEN_COMMANDS,
ALLOWED_NAMESPACES,
SYSTEM_NAMESPACES,
)
__all__ = [
# IncidentEngine
"IncidentEngine",
"IIncidentMemory",
"IBlastRadiusAnalyzer",
"AGGREGATION_WINDOW_MINUTES",
"WORKING_MEMORY_TTL",
# ProposalEngine
"ProposalEngine",
"GuardrailsValidator",
"ILLMProvider",
"FORBIDDEN_COMMANDS",
"ALLOWED_NAMESPACES",
"SYSTEM_NAMESPACES",
]

View File

@@ -0,0 +1,315 @@
"""
IncidentEngine - 事件處理引擎 (積木化版本)
==========================================
Phase 6.4e: 從 apps/api/src/services/incident_engine.py 搬遷
設計原則:
- 依賴注入: 透過建構子注入 IMemoryProvider
- 無外部耦合: 禁止直接引用 redis_client 或 db
- 可測試性: 可注入 Mock Provider 進行單元測試
統帥鐵律:
- 禁止告警風暴 (相關告警必須聚合)
- 禁止 O(N) 掃描 (所有查詢必須 O(1))
- 禁止 Race Condition (所有寫入必須原子操作)
"""
from datetime import datetime, timezone, timedelta
from typing import Any, Protocol, Callable
from uuid import uuid4
import hashlib
import json
from lewooogo_brain.interfaces.incident_processor import (
IIncidentProcessor,
Incident,
IncidentStatus,
Severity,
Signal,
)
# =============================================================================
# Memory Provider Protocol (依賴注入用)
# =============================================================================
class IIncidentMemory(Protocol):
"""Incident 專用記憶體提供者協定"""
async def load_incident(self, incident_id: str) -> Incident | None:
"""從 Working Memory 載入 Incident"""
...
async def save_incident(self, incident: Incident, ttl_seconds: int = 604800) -> bool:
"""儲存 Incident 到 Working Memory (預設 7 天 TTL)"""
...
async def persist_incident(self, incident: Incident) -> bool:
"""持久化到 Episodic Memory (PostgreSQL)"""
...
async def find_related_incident(
self,
namespace: str,
target: str,
window_minutes: int = 30,
) -> Incident | None:
"""尋找相關的活躍 Incident (用於聚合)"""
...
async def update_index(
self,
incident_id: str,
namespace: str,
target: str,
) -> bool:
"""更新反向索引 (namespace/target → incident_id)"""
...
class IBlastRadiusAnalyzer(Protocol):
"""爆炸半徑分析器協定"""
def analyze(self, target: str) -> list[str]:
"""分析受影響的服務列表"""
...
# =============================================================================
# Constants
# =============================================================================
AGGREGATION_WINDOW_MINUTES = 30
WORKING_MEMORY_TTL = 604800 # 7 days
# =============================================================================
# IncidentEngine Implementation
# =============================================================================
class IncidentEngine(IIncidentProcessor):
"""
事件處理引擎
職責:
1. 聚合相關告警到同一 Incident
2. 分析爆炸半徑
3. 雙層持久化 (Working + Episodic Memory)
使用方式:
memory = DualIncidentMemory(redis_client, db_session)
analyzer = GraphBlastRadiusAnalyzer(topology_graph)
engine = IncidentEngine(memory, analyzer)
incident = await engine.process_signal(signal_data)
"""
def __init__(
self,
memory: IIncidentMemory,
blast_analyzer: IBlastRadiusAnalyzer | None = None,
logger: Any | None = None,
):
"""
初始化 IncidentEngine
Args:
memory: 記憶體提供者 (Working + Episodic)
blast_analyzer: 爆炸半徑分析器 (可選)
logger: 日誌記錄器 (可選)
"""
self._memory = memory
self._blast_analyzer = blast_analyzer
self._logger = logger
def _log(self, event: str, **kwargs) -> None:
"""記錄日誌 (如果有 logger)"""
if self._logger:
self._logger.info(event, **kwargs)
async def process_signal(
self,
signal_data: dict[str, Any],
) -> Incident | None:
"""
處理告警信號
流程:
1. 解析 Signal
2. 計算 Fingerprint (去重用)
3. 查找相關 Incident (聚合)
4. 創建或更新 Incident
5. 分析爆炸半徑
6. 雙層持久化
"""
try:
# Step 1: 解析 Signal
signal = self._parse_signal(signal_data)
namespace = signal_data.get("namespace", "default")
target = signal_data.get("target", "unknown")
# Step 2: 計算 Fingerprint
fingerprint = self._compute_fingerprint(signal_data)
signal.fingerprint = fingerprint
# Step 3: 查找相關 Incident
existing = await self._memory.find_related_incident(
namespace=namespace,
target=target,
window_minutes=AGGREGATION_WINDOW_MINUTES,
)
if existing:
# 聚合到現有 Incident
incident = await self._aggregate_signal(existing, signal)
else:
# 創建新 Incident
incident = await self._create_incident(signal, namespace, target)
# Step 4: 分析爆炸半徑
if self._blast_analyzer and target not in incident.affected_services:
affected = self._blast_analyzer.analyze(target)
incident.affected_services = list(set(incident.affected_services + affected))
# Step 5: 雙層持久化
await self._memory.save_incident(incident, WORKING_MEMORY_TTL)
await self._memory.update_index(incident.incident_id, namespace, target)
persisted = await self._memory.persist_incident(incident)
self._log(
"signal_processed",
incident_id=incident.incident_id,
signal_count=len(incident.signals),
persisted_to_pg=persisted,
)
return incident
except Exception as e:
self._log("signal_processing_error", error=str(e))
return None
async def get_incident(self, incident_id: str) -> Incident | None:
"""取得 Incident"""
return await self._memory.load_incident(incident_id)
async def update_status(
self,
incident_id: str,
status: IncidentStatus,
) -> bool:
"""更新 Incident 狀態"""
incident = await self._memory.load_incident(incident_id)
if not incident:
return False
incident.status = status
incident.updated_at = datetime.now(timezone.utc)
if status == IncidentStatus.RESOLVED:
incident.resolved_at = datetime.now(timezone.utc)
elif status == IncidentStatus.CLOSED:
incident.closed_at = datetime.now(timezone.utc)
await self._memory.save_incident(incident, WORKING_MEMORY_TTL)
await self._memory.persist_incident(incident)
return True
# =========================================================================
# Private Methods
# =========================================================================
def _parse_signal(self, data: dict[str, Any]) -> Signal:
"""解析 Signal 資料"""
severity_map = {
"critical": Severity.P0,
"warning": Severity.P2,
"info": Severity.P3,
}
severity_str = data.get("severity", "warning")
severity = severity_map.get(severity_str, Severity.P2)
return Signal(
alert_name=data.get("alert_name", "Unknown"),
severity=severity,
source=data.get("source", "unknown"),
fired_at=datetime.now(timezone.utc),
labels=data.get("labels", {}) if isinstance(data.get("labels"), dict) else {},
annotations=data.get("annotations", {}) if isinstance(data.get("annotations"), dict) else {},
)
def _compute_fingerprint(self, data: dict[str, Any]) -> str:
"""計算 Signal Fingerprint (用於去重)"""
key_parts = [
data.get("source", ""),
data.get("alert_name", ""),
data.get("namespace", ""),
data.get("target", ""),
]
key_str = ":".join(key_parts)
return hashlib.sha256(key_str.encode()).hexdigest()[:16]
async def _create_incident(
self,
signal: Signal,
namespace: str,
target: str,
) -> Incident:
"""創建新 Incident"""
incident_id = f"INC-{datetime.now(timezone.utc).strftime('%Y%m%d')}-{uuid4().hex[:6].upper()}"
incident = Incident(
incident_id=incident_id,
status=IncidentStatus.INVESTIGATING,
severity=signal.severity,
signals=[signal],
affected_services=[target] if target != "unknown" else [],
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
self._log(
"incident_created",
incident_id=incident_id,
severity=signal.severity.value,
namespace=namespace,
target=target,
)
return incident
async def _aggregate_signal(
self,
incident: Incident,
signal: Signal,
) -> Incident:
"""聚合 Signal 到現有 Incident"""
# 檢查重複 (Fingerprint)
existing_fingerprints = {s.fingerprint for s in incident.signals if s.fingerprint}
if signal.fingerprint and signal.fingerprint in existing_fingerprints:
self._log(
"signal_deduplicated",
incident_id=incident.incident_id,
fingerprint=signal.fingerprint,
)
return incident
# 聚合
incident.signals.append(signal)
incident.updated_at = datetime.now(timezone.utc)
# 嚴重度升級 (取最高)
if signal.severity.value < incident.severity.value:
incident.severity = signal.severity
self._log(
"signal_aggregated",
incident_id=incident.incident_id,
signal_count=len(incident.signals),
severity=incident.severity.value,
)
return incident

View File

@@ -0,0 +1,516 @@
"""
ProposalEngine - 決策提案引擎 (積木化版本)
==========================================
Phase 6.4e: 從 apps/api/src/services/proposal_service.py 搬遷
設計原則:
- 依賴注入: 透過建構子注入 IMemoryProvider 與 ILLMProvider
- 無外部耦合: 禁止直接引用 redis_client 或 db
- Guardrails 強制: 所有提案必須通過安全檢查
統帥鐵律 + 首席架構師鐵律:
- 禁止毀滅性指令 (rm -rf, DROP DATABASE, kubectl delete ns)
- K8s 操作必須綁定 Namespace
- 所有提案必須 require_dry_run: true
"""
from datetime import datetime, timezone
from typing import Any, Protocol, Callable
from uuid import uuid4
import re
from lewooogo_brain.interfaces.proposal_engine import (
IProposalEngine,
Proposal,
Guardrails,
)
from lewooogo_brain.interfaces.incident_processor import (
Incident,
IncidentStatus,
)
# =============================================================================
# Provider Protocols (依賴注入用)
# =============================================================================
class IIncidentMemory(Protocol):
"""Incident 記憶體提供者協定"""
async def load_incident(self, incident_id: str) -> Incident | None:
"""載入 Incident"""
...
async def update_incident(
self,
incident_id: str,
updates: dict[str, Any],
) -> bool:
"""更新 Incident"""
...
class ILLMProvider(Protocol):
"""LLM 提供者協定"""
async def generate(
self,
prompt: str,
context: str | None = None,
max_tokens: int = 2048,
) -> str:
"""生成 LLM 回應"""
...
class ISkillLoader(Protocol):
"""Skill 載入器協定"""
def load(self, skill_id: str) -> str | None:
"""載入 Skill 內容"""
...
# =============================================================================
# Constants - Guardrails 黑名單
# =============================================================================
FORBIDDEN_COMMANDS = [
"rm -rf /",
"rm -rf /*",
"rm -rf .",
"drop database",
"drop table",
"truncate",
"delete from",
"kubectl delete namespace",
"kubectl delete ns",
"kubectl delete -A",
"> /dev/sda",
"mkfs",
":(){:|:&};:", # Fork bomb
"--no-preserve-root",
"dd if=/dev/zero",
]
ALLOWED_NAMESPACES = ["awoooi-prod", "awoooi-dev"]
SYSTEM_NAMESPACES = ["kube-system", "kube-public", "kube-node-lease", "default"]
# =============================================================================
# ProposalEngine Implementation
# =============================================================================
class ProposalEngine(IProposalEngine):
"""
決策提案引擎
職責:
1. 分析 Incident 生成修復建議
2. 評估風險等級
3. 強制 Guardrails 檢查
4. 更新 Incident 狀態
使用方式:
memory = IncidentMemoryAdapter(redis_client, db_session)
llm = OllamaProvider(base_url="http://192.168.0.188:11434")
skill_loader = SkillLoader(skills_dir=".agents/skills")
engine = ProposalEngine(memory, llm, skill_loader)
proposal, message = await engine.generate(incident_id)
"""
def __init__(
self,
memory: IIncidentMemory,
llm: ILLMProvider | None = None,
skill_loader: ISkillLoader | None = None,
logger: Any | None = None,
):
"""
初始化 ProposalEngine
Args:
memory: Incident 記憶體提供者
llm: LLM 提供者 (用於生成提案)
skill_loader: Skill 載入器 (可選)
logger: 日誌記錄器 (可選)
"""
self._memory = memory
self._llm = llm
self._skill_loader = skill_loader
self._logger = logger
def _log(self, event: str, **kwargs) -> None:
"""記錄日誌"""
if self._logger:
self._logger.info(event, **kwargs)
def get_default_guardrails(self) -> Guardrails:
"""取得預設安全護欄配置"""
return Guardrails(
require_dry_run=True,
allowed_namespace=ALLOWED_NAMESPACES.copy(),
forbidden_commands=FORBIDDEN_COMMANDS.copy(),
max_retries=1,
timeout_sec=60,
audit_log="mandatory",
rollback_window_sec=300,
)
async def generate(
self,
incident_id: str,
) -> tuple[Proposal | None, str]:
"""
生成決策提案
Args:
incident_id: 事件 ID
Returns:
(Proposal, message) 或 (None, error_message)
"""
return await self._generate_proposal(incident_id, skill_id=None)
async def generate_with_skill(
self,
incident_id: str,
skill_id: str,
) -> tuple[Proposal | None, str]:
"""
使用指定 Skill 生成決策提案
Args:
incident_id: 事件 ID
skill_id: Skill 識別碼 (e.g., "04-awoooi-devops-commander")
Returns:
(Proposal, message) 或 (None, error_message)
"""
return await self._generate_proposal(incident_id, skill_id=skill_id)
async def _generate_proposal(
self,
incident_id: str,
skill_id: str | None,
) -> tuple[Proposal | None, str]:
"""內部提案生成邏輯"""
try:
# Step 1: 載入 Incident
incident = await self._memory.load_incident(incident_id)
if not incident:
return None, f"Incident {incident_id} not found"
# Step 2: 載入 Skill (如果指定)
skill_context = None
if skill_id and self._skill_loader:
skill_context = self._skill_loader.load(skill_id)
if not skill_context:
self._log("skill_not_found", skill_id=skill_id)
# Step 3: 構建提案
if self._llm:
proposal = await self._generate_with_llm(incident, skill_context)
else:
proposal = self._generate_fallback(incident)
# Step 4: Guardrails 檢查
is_safe, violation = self._validate_guardrails(proposal)
if not is_safe:
self._log(
"guardrails_violation",
incident_id=incident_id,
violation=violation,
)
return None, f"Guardrails violation: {violation}"
# Step 5: 更新 Incident
await self._memory.update_incident(
incident_id,
{
"status": IncidentStatus.MITIGATING.value,
"proposal_ids": incident.proposal_ids + [proposal.proposal_id],
"updated_at": datetime.now(timezone.utc).isoformat(),
},
)
self._log(
"proposal_generated",
incident_id=incident_id,
proposal_id=proposal.proposal_id,
risk_level=proposal.risk_level,
)
return proposal, "Proposal generated successfully"
except Exception as e:
self._log("proposal_generation_error", error=str(e))
return None, f"Error generating proposal: {str(e)}"
async def _generate_with_llm(
self,
incident: Incident,
skill_context: str | None,
) -> Proposal:
"""使用 LLM 生成提案"""
# 構建 prompt
prompt = self._build_prompt(incident, skill_context)
# 調用 LLM
response = await self._llm.generate(prompt, context=skill_context)
# 解析 LLM 回應 (簡化版,實際應使用結構化輸出)
action = self._extract_action(response)
description = self._extract_description(response)
risk_level = self._assess_risk(incident, action)
return Proposal(
proposal_id=str(uuid4()),
incident_id=incident.incident_id,
action=action,
description=description,
risk_level=risk_level,
guardrails=self.get_default_guardrails().model_dump(),
metadata={
"generated_by": "llm",
"skill_used": skill_context is not None,
"signal_count": len(incident.signals),
},
)
def _generate_fallback(self, incident: Incident) -> Proposal:
"""備援提案生成 (無 LLM 時使用)"""
# 根據嚴重度和服務決定動作
if incident.severity.value in ["P0", "P1"]:
action = "kubectl rollout restart deployment/<service> -n awoooi-prod"
description = "重啟受影響的 Deployment 以快速恢復服務"
else:
action = "kubectl describe pod -l app=<service> -n awoooi-prod"
description = "檢查 Pod 狀態以診斷問題根因"
# 替換服務名稱
if incident.affected_services:
service = incident.affected_services[0]
action = action.replace("<service>", service)
return Proposal(
proposal_id=str(uuid4()),
incident_id=incident.incident_id,
action=action,
description=description,
risk_level="low" if incident.severity.value in ["P2", "P3"] else "medium",
guardrails=self.get_default_guardrails().model_dump(),
metadata={
"generated_by": "fallback",
"skill_used": False,
"signal_count": len(incident.signals),
},
)
def _build_prompt(self, incident: Incident, skill_context: str | None) -> str:
"""構建 LLM Prompt"""
signals_summary = "\n".join([
f"- [{s.severity.value}] {s.alert_name}: {s.source}"
for s in incident.signals[:5] # 限制 5 個
])
base_prompt = f"""你是 AWOOOI 智能運維系統的決策引擎。
## 事件資訊
- Incident ID: {incident.incident_id}
- 嚴重度: {incident.severity.value}
- 狀態: {incident.status.value}
- 受影響服務: {', '.join(incident.affected_services) or 'N/A'}
## 告警摘要
{signals_summary}
## 任務
請根據以上資訊,生成一個修復提案:
1. 建議的動作 (kubectl 指令或腳本)
2. 風險評估 (low/medium/high)
3. 預估影響時間
## 安全約束
- 所有 kubectl 指令必須包含 -n awoooi-prod
- 禁止使用 rm -rf、DROP DATABASE 等毀滅性指令
- 必須支援 dry-run 預覽
"""
if skill_context:
base_prompt = f"{skill_context}\n\n---\n\n{base_prompt}"
return base_prompt
def _extract_action(self, llm_response: str) -> str:
"""從 LLM 回應提取動作"""
# 簡化版:尋找 kubectl 或 shell 指令
lines = llm_response.split("\n")
for line in lines:
line = line.strip()
if line.startswith("kubectl") or line.startswith("bash"):
return line
if "kubectl" in line and "-n" in line:
# 提取 kubectl 指令
match = re.search(r'(kubectl\s+[^\n]+)', line)
if match:
return match.group(1)
return "kubectl get pods -n awoooi-prod" # 預設安全指令
def _extract_description(self, llm_response: str) -> str:
"""從 LLM 回應提取描述"""
# 取前 200 字符作為描述
clean = llm_response.replace("\n", " ").strip()
return clean[:200] if len(clean) > 200 else clean
def _assess_risk(self, incident: Incident, action: str) -> str:
"""評估風險等級"""
high_risk_keywords = ["delete", "scale 0", "drain", "cordon"]
medium_risk_keywords = ["restart", "rollout", "patch", "apply"]
action_lower = action.lower()
for keyword in high_risk_keywords:
if keyword in action_lower:
return "high"
for keyword in medium_risk_keywords:
if keyword in action_lower:
return "medium"
return "low"
# =========================================================================
# Guardrails Validation (首席架構師鐵律)
# =========================================================================
def _validate_guardrails(self, proposal: Proposal) -> tuple[bool, str | None]:
"""
驗證提案是否符合安全護欄
Returns:
(is_safe, violation_message)
"""
action = proposal.action.lower()
# 1. 檢查毀滅性指令
for forbidden in FORBIDDEN_COMMANDS:
if forbidden.lower() in action:
return False, f"Forbidden command detected: {forbidden}"
# 2. 檢查 K8s Namespace 綁定
if "kubectl" in action:
if not self._has_namespace(action):
return False, "kubectl command missing -n namespace flag"
# 檢查是否使用允許的 namespace
ns = self._extract_namespace(action)
if ns and ns in SYSTEM_NAMESPACES:
return False, f"Forbidden namespace: {ns} (system namespace)"
if ns and ns not in ALLOWED_NAMESPACES:
return False, f"Namespace {ns} not in allowed list: {ALLOWED_NAMESPACES}"
# 3. 確保 guardrails 配置正確
guardrails = proposal.guardrails
if not guardrails.get("require_dry_run", False):
return False, "require_dry_run must be true"
if not guardrails.get("allowed_namespace"):
return False, "allowed_namespace must be specified"
return True, None
def _has_namespace(self, action: str) -> bool:
"""檢查 kubectl 指令是否包含 namespace"""
return "-n " in action or "--namespace=" in action or "--namespace " in action
def _extract_namespace(self, action: str) -> str | None:
"""從 kubectl 指令提取 namespace"""
# 匹配 -n <namespace> 或 --namespace=<namespace>
patterns = [
r'-n\s+([a-zA-Z0-9_-]+)',
r'--namespace[=\s]+([a-zA-Z0-9_-]+)',
]
for pattern in patterns:
match = re.search(pattern, action)
if match:
return match.group(1)
return None
# =============================================================================
# Guardrails Validator (獨立使用)
# =============================================================================
class GuardrailsValidator:
"""
獨立的 Guardrails 驗證器
可在 ProposalEngine 外部使用,例如:
- API 層再次驗證
- 執行前最終檢查
"""
@staticmethod
def validate_script(script: str) -> tuple[bool, str | None]:
"""
驗證腳本是否安全
Args:
script: 要驗證的腳本內容
Returns:
(is_safe, violation_message)
"""
script_lower = script.lower()
for forbidden in FORBIDDEN_COMMANDS:
if forbidden.lower() in script_lower:
return False, f"Forbidden command: {forbidden}"
return True, None
@staticmethod
def validate_namespace(namespace: str) -> tuple[bool, str | None]:
"""
驗證 Namespace 是否允許
Args:
namespace: K8s namespace
Returns:
(is_allowed, error_message)
"""
if namespace in SYSTEM_NAMESPACES:
return False, f"System namespace forbidden: {namespace}"
if namespace not in ALLOWED_NAMESPACES:
return False, f"Namespace not allowed: {namespace}"
return True, None
@staticmethod
def enforce_dry_run(proposal: dict) -> dict:
"""
強制設定 dry-run 標記
Args:
proposal: 提案字典
Returns:
修改後的提案
"""
if "guardrails" not in proposal:
proposal["guardrails"] = {}
proposal["guardrails"]["require_dry_run"] = True
proposal["guardrails"]["allowed_namespace"] = ALLOWED_NAMESPACES.copy()
return proposal

View File

@@ -1,19 +1,27 @@
"""
leWOOOgo Brain Skills - Skill 動態載入
=======================================
leWOOOgo Brain Skills - 動態技能系統
=====================================
動態載入 .agents/skills/*.md 並注入到推論引擎
Phase 6.4f: Skill 動態載入完成
模組列表:
- SkillLoader: Skill 載入器
- SkillRegistry: Skill → Incident 類型對映
功能:
- SkillLoader: 載入 .agents/skills/*.md
- Skill: 技能資料結構
- 便捷函數: load_skill(), load_skill_context()
"""
# TODO: Phase 6.4f 實作後啟用
# from lewooogo_brain.skills.loader import SkillLoader
# from lewooogo_brain.skills.registry import SkillRegistry
from lewooogo_brain.skills.loader import (
SkillLoader,
Skill,
get_skill_loader,
load_skill,
load_skill_context,
)
__all__: list[str] = [
# "SkillLoader",
# "SkillRegistry",
__all__ = [
"SkillLoader",
"Skill",
"get_skill_loader",
"load_skill",
"load_skill_context",
]

View File

@@ -0,0 +1,337 @@
"""
SkillLoader - 動態技能載入器
==============================
Phase 6.4f: 實作 Skill 動態載入
功能:
1. 載入 .agents/skills/*.md 的技能定義
2. 解析 Frontmatter 提取 metadata
3. 提供 LLM Context 注入
使用方式:
loader = SkillLoader(skills_dir=".agents/skills")
content = loader.load("04-awoooi-devops-commander")
# 或批次載入
all_skills = loader.load_all()
"""
import os
import re
from pathlib import Path
from typing import Any
from dataclasses import dataclass, field
# =============================================================================
# Skill Data Structures
# =============================================================================
@dataclass
class Skill:
"""技能定義"""
skill_id: str
name: str
description: str
content: str
triggers: list[str] = field(default_factory=list)
metadata: dict[str, Any] = field(default_factory=dict)
def to_context(self) -> str:
"""轉換為 LLM Context 格式"""
return f"""## Skill: {self.name}
{self.description}
---
{self.content}
"""
# =============================================================================
# SkillLoader Implementation
# =============================================================================
class SkillLoader:
"""
技能載入器
職責:
1. 掃描 skills 目錄下的 .md 檔案
2. 解析 Frontmatter 提取 metadata
3. 提供按 ID 或批次載入
目錄結構:
.agents/skills/
├── 01-awoooi-frontend-aesthetics.md
├── 02-lewooogo-backend-core.md
├── 03-openclaw-cognitive-expert.md
├── 04-awoooi-devops-commander.md
├── 05-awoooi-sre-qa.md
└── 06-awoooi-monorepo-master.md
"""
def __init__(
self,
skills_dir: str | Path = ".agents/skills",
project_root: str | Path | None = None,
):
"""
初始化 SkillLoader
Args:
skills_dir: Skills 目錄相對路徑
project_root: 專案根目錄 (自動偵測如果未指定)
"""
if project_root:
self._skills_path = Path(project_root) / skills_dir
else:
# 嘗試從當前目錄向上尋找 .agents/skills
self._skills_path = self._find_skills_dir(skills_dir)
self._cache: dict[str, Skill] = {}
def _find_skills_dir(self, skills_dir: str) -> Path:
"""尋找 skills 目錄"""
current = Path.cwd()
# 向上搜尋最多 5 層
for _ in range(5):
candidate = current / skills_dir
if candidate.exists() and candidate.is_dir():
return candidate
current = current.parent
# 預設使用相對路徑
return Path(skills_dir)
def load(self, skill_id: str) -> str | None:
"""
載入單一技能內容
Args:
skill_id: 技能 ID (e.g., "04-awoooi-devops-commander""04")
Returns:
str | None: 技能內容 (Markdown) 或 None
"""
skill = self.load_skill(skill_id)
return skill.content if skill else None
def load_skill(self, skill_id: str) -> Skill | None:
"""
載入單一技能物件
Args:
skill_id: 技能 ID
Returns:
Skill | None: 技能物件或 None
"""
# 檢查快取
if skill_id in self._cache:
return self._cache[skill_id]
# 尋找匹配的檔案
file_path = self._find_skill_file(skill_id)
if not file_path:
return None
# 解析檔案
skill = self._parse_skill_file(file_path)
if skill:
self._cache[skill.skill_id] = skill
# 也用短 ID 快取
short_id = skill_id.split("-")[0] if "-" in skill_id else skill_id
self._cache[short_id] = skill
return skill
def load_all(self) -> list[Skill]:
"""
載入所有技能
Returns:
list[Skill]: 所有技能列表
"""
skills = []
if not self._skills_path.exists():
return skills
for file_path in sorted(self._skills_path.glob("*.md")):
skill = self._parse_skill_file(file_path)
if skill:
skills.append(skill)
self._cache[skill.skill_id] = skill
return skills
def get_context_for_incident(
self,
affected_services: list[str],
severity: str,
) -> str:
"""
根據 Incident 特徵自動選擇相關 Skills
Args:
affected_services: 受影響的服務列表
severity: 嚴重等級
Returns:
str: 組合的 Skills Context
"""
relevant_skills = []
# 載入所有技能
all_skills = self.load_all()
for skill in all_skills:
# 檢查觸發條件
for trigger in skill.triggers:
trigger_lower = trigger.lower()
# 服務名稱匹配
for service in affected_services:
if service.lower() in trigger_lower:
relevant_skills.append(skill)
break
# 嚴重度匹配 (P0/P1 → DevOps, SRE)
if severity in ["P0", "P1"]:
if "devops" in trigger_lower or "sre" in trigger_lower:
relevant_skills.append(skill)
break
# 去重
seen = set()
unique_skills = []
for skill in relevant_skills:
if skill.skill_id not in seen:
seen.add(skill.skill_id)
unique_skills.append(skill)
# 組合 Context
if not unique_skills:
# 預設使用 DevOps + SRE
devops = self.load_skill("04")
sre = self.load_skill("05")
unique_skills = [s for s in [devops, sre] if s]
return "\n\n---\n\n".join([s.to_context() for s in unique_skills])
def list_skills(self) -> list[dict[str, str]]:
"""
列出所有可用技能
Returns:
list[dict]: 技能摘要列表
"""
skills = self.load_all()
return [
{
"skill_id": s.skill_id,
"name": s.name,
"description": s.description,
}
for s in skills
]
# =========================================================================
# Private Methods
# =========================================================================
def _find_skill_file(self, skill_id: str) -> Path | None:
"""尋找技能檔案"""
if not self._skills_path.exists():
return None
# 完整 ID 匹配
for file_path in self._skills_path.glob("*.md"):
filename = file_path.stem
if filename == skill_id:
return file_path
# 短 ID 匹配 (e.g., "04" → "04-awoooi-devops-commander")
short_id = skill_id.split("-")[0] if "-" in skill_id else skill_id
for file_path in self._skills_path.glob("*.md"):
filename = file_path.stem
if filename.startswith(f"{short_id}-"):
return file_path
return None
def _parse_skill_file(self, file_path: Path) -> Skill | None:
"""解析技能檔案"""
try:
content = file_path.read_text(encoding="utf-8")
# 提取標題 (第一個 # 開頭的行)
name_match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
name = name_match.group(1) if name_match else file_path.stem
# 提取描述 (第二個 # 開頭的行,通常是中文標題)
desc_match = re.search(r'^#\s+(.+)\n#\s+(.+)$', content, re.MULTILINE)
description = desc_match.group(2) if desc_match else ""
# 提取觸發條件 (> **觸發條件**: ...)
trigger_match = re.search(r'觸發條件[:]\s*(.+?)(?:\n|$)', content)
triggers = []
if trigger_match:
trigger_text = trigger_match.group(1)
# 解析觸發條件 (逗號分隔或 `` 包裹的檔案模式)
triggers = re.findall(r'`([^`]+)`', trigger_text)
# 提取管轄範圍
scope_match = re.search(r'管轄範圍[:]\s*(.+?)(?:\n|$)', content)
scope = scope_match.group(1) if scope_match else ""
# 構建 Skill ID
skill_id = file_path.stem
return Skill(
skill_id=skill_id,
name=name,
description=description or scope,
content=content,
triggers=triggers,
metadata={
"file_path": str(file_path),
"scope": scope,
},
)
except Exception as e:
# 靜默處理錯誤
return None
# =============================================================================
# 便捷函數
# =============================================================================
_default_loader: SkillLoader | None = None
def get_skill_loader() -> SkillLoader:
"""取得預設 SkillLoader 實例"""
global _default_loader
if _default_loader is None:
_default_loader = SkillLoader()
return _default_loader
def load_skill(skill_id: str) -> str | None:
"""快速載入技能內容"""
return get_skill_loader().load(skill_id)
def load_skill_context(
affected_services: list[str],
severity: str,
) -> str:
"""根據 Incident 自動載入相關 Skill Context"""
return get_skill_loader().get_context_for_incident(affected_services, severity)

View File

@@ -0,0 +1 @@
"""leWOOOgo Brain Tests"""

View File

@@ -0,0 +1,209 @@
"""
Guardrails 單元測試
====================
Phase 6.4e 驗證點 3
確認 GuardrailsValidator 能正確攔截危險指令
"""
import sys
from pathlib import Path
# 添加 src 到 Python Path
src_path = Path(__file__).parent.parent / "src"
sys.path.insert(0, str(src_path))
def test_guardrails_validator_import():
"""測試:能正確 import GuardrailsValidator"""
from lewooogo_brain.engines.proposal_engine import GuardrailsValidator
print("✅ GuardrailsValidator import 成功")
def test_guardrails_block_rm_rf():
"""測試:攔截 rm -rf 指令"""
from lewooogo_brain.engines.proposal_engine import GuardrailsValidator
dangerous_scripts = [
"rm -rf /",
"rm -rf /*",
"sudo rm -rf /home",
"RM -RF /var/log", # 大小寫
]
for script in dangerous_scripts:
is_safe, violation = GuardrailsValidator.validate_script(script)
assert not is_safe, f"Should block: {script}"
assert violation is not None
print(f"✅ 攔截: {script[:30]}...")
def test_guardrails_block_drop_database():
"""測試:攔截 DROP DATABASE 指令"""
from lewooogo_brain.engines.proposal_engine import GuardrailsValidator
scripts = [
"DROP DATABASE awoooi_prod",
"drop table users",
"TRUNCATE incidents",
]
for script in scripts:
is_safe, violation = GuardrailsValidator.validate_script(script)
assert not is_safe, f"Should block: {script}"
print(f"✅ 攔截: {script}")
def test_guardrails_block_kubectl_delete_ns():
"""測試:攔截 kubectl delete namespace 指令"""
from lewooogo_brain.engines.proposal_engine import GuardrailsValidator
scripts = [
"kubectl delete namespace awoooi-prod",
"kubectl delete ns kube-system",
"kubectl delete -A pods",
]
for script in scripts:
is_safe, violation = GuardrailsValidator.validate_script(script)
assert not is_safe, f"Should block: {script}"
print(f"✅ 攔截: {script}")
def test_guardrails_allow_safe_commands():
"""測試:允許安全指令"""
from lewooogo_brain.engines.proposal_engine import GuardrailsValidator
safe_scripts = [
"kubectl get pods -n awoooi-prod",
"kubectl rollout restart deployment/awoooi-api -n awoooi-prod",
"kubectl describe pod abc -n awoooi-prod",
"kubectl logs -f deployment/awoooi-api -n awoooi-prod",
]
for script in safe_scripts:
is_safe, violation = GuardrailsValidator.validate_script(script)
assert is_safe, f"Should allow: {script}, violation: {violation}"
print(f"✅ 允許: {script[:50]}...")
def test_guardrails_namespace_validation():
"""測試Namespace 白名單驗證"""
from lewooogo_brain.engines.proposal_engine import GuardrailsValidator
# 允許的 namespace
is_ok, _ = GuardrailsValidator.validate_namespace("awoooi-prod")
assert is_ok, "awoooi-prod should be allowed"
print("✅ awoooi-prod 允許")
is_ok, _ = GuardrailsValidator.validate_namespace("awoooi-dev")
assert is_ok, "awoooi-dev should be allowed"
print("✅ awoooi-dev 允許")
# 禁止的 namespace
forbidden = ["kube-system", "kube-public", "default"]
for ns in forbidden:
is_ok, violation = GuardrailsValidator.validate_namespace(ns)
assert not is_ok, f"{ns} should be forbidden"
print(f"{ns} 禁止: {violation}")
def test_guardrails_enforce_dry_run():
"""測試:強制 dry-run 標記"""
from lewooogo_brain.engines.proposal_engine import GuardrailsValidator
proposal = {
"action": "kubectl apply -f config.yaml",
"guardrails": {},
}
result = GuardrailsValidator.enforce_dry_run(proposal)
assert result["guardrails"]["require_dry_run"] == True
assert "awoooi-prod" in result["guardrails"]["allowed_namespace"]
print("✅ 強制 dry-run 設定成功")
print(f" - require_dry_run: {result['guardrails']['require_dry_run']}")
print(f" - allowed_namespace: {result['guardrails']['allowed_namespace']}")
def test_proposal_engine_guardrails_integration():
"""測試ProposalEngine 整合 Guardrails"""
import asyncio
from lewooogo_brain.engines.proposal_engine import ProposalEngine
from lewooogo_brain.interfaces.proposal_engine import Proposal
class MockMemory:
async def load_incident(self, incident_id):
from lewooogo_brain.interfaces.incident_processor import (
Incident, IncidentStatus, Severity, Signal
)
from datetime import datetime, timezone
return Incident(
incident_id=incident_id,
status=IncidentStatus.INVESTIGATING,
severity=Severity.P1,
signals=[Signal(
alert_name="TestAlert",
severity=Severity.P1,
source="test",
fired_at=datetime.now(timezone.utc),
)],
affected_services=["test-service"],
)
async def update_incident(self, incident_id, updates):
return True
engine = ProposalEngine(memory=MockMemory())
# 取得預設 Guardrails
guardrails = engine.get_default_guardrails()
assert guardrails.require_dry_run == True
assert "awoooi-prod" in guardrails.allowed_namespace
assert any("rm -rf" in cmd.lower() for cmd in guardrails.forbidden_commands)
print("✅ ProposalEngine Guardrails 整合成功:")
print(f" - require_dry_run: {guardrails.require_dry_run}")
print(f" - allowed_namespace: {guardrails.allowed_namespace}")
print(f" - forbidden_commands: {len(guardrails.forbidden_commands)}")
if __name__ == "__main__":
print("=" * 60)
print("🧪 Guardrails 單元測試")
print("=" * 60)
tests = [
test_guardrails_validator_import,
test_guardrails_block_rm_rf,
test_guardrails_block_drop_database,
test_guardrails_block_kubectl_delete_ns,
test_guardrails_allow_safe_commands,
test_guardrails_namespace_validation,
test_guardrails_enforce_dry_run,
test_proposal_engine_guardrails_integration,
]
passed = 0
failed = 0
for test in tests:
print(f"\n🔬 {test.__name__}")
try:
test()
passed += 1
except AssertionError as e:
print(f"❌ FAILED: {e}")
failed += 1
except Exception as e:
print(f"❌ ERROR: {type(e).__name__}: {e}")
failed += 1
print("\n" + "=" * 60)
print(f"📊 結果: {passed} 通過, {failed} 失敗")
print("=" * 60)
if failed > 0:
sys.exit(1)

View File

@@ -0,0 +1,321 @@
"""
IncidentEngine 單元測試
========================
Phase 6.4e 驗證點 2
使用 Mock MemoryProvider 驗證 IncidentEngine 能正確處理告警信號
"""
import sys
from pathlib import Path
from datetime import datetime, timezone
from typing import Any
# 添加 src 到 Python Path
src_path = Path(__file__).parent.parent / "src"
sys.path.insert(0, str(src_path))
# =============================================================================
# Mock Memory Provider (完全隔離,不依賴外部)
# =============================================================================
class MockIncidentMemory:
"""Mock 記憶體提供者 - 純記憶體實作"""
def __init__(self):
self._incidents: dict[str, Any] = {}
self._ns_index: dict[str, str] = {} # namespace → incident_id
self._target_index: dict[str, str] = {} # target → incident_id
async def load_incident(self, incident_id: str):
"""載入 Incident"""
return self._incidents.get(incident_id)
async def save_incident(self, incident, ttl_seconds: int = 604800) -> bool:
"""儲存 Incident"""
self._incidents[incident.incident_id] = incident
return True
async def persist_incident(self, incident) -> bool:
"""持久化 (Mock 直接返回成功)"""
return True
async def find_related_incident(
self,
namespace: str,
target: str,
window_minutes: int = 30,
):
"""尋找相關 Incident"""
# 檢查 namespace 索引
if namespace in self._ns_index:
incident_id = self._ns_index[namespace]
incident = self._incidents.get(incident_id)
if incident and incident.status.value in ["investigating", "mitigating"]:
return incident
# 檢查 target 索引
if target in self._target_index:
incident_id = self._target_index[target]
incident = self._incidents.get(incident_id)
if incident and incident.status.value in ["investigating", "mitigating"]:
return incident
return None
async def update_index(
self,
incident_id: str,
namespace: str,
target: str,
) -> bool:
"""更新索引"""
self._ns_index[namespace] = incident_id
self._target_index[target] = incident_id
return True
class MockBlastRadiusAnalyzer:
"""Mock 爆炸半徑分析器"""
def analyze(self, target: str) -> list[str]:
"""返回受影響服務 (Mock 固定回應)"""
return [target, f"{target}-dependent"]
# =============================================================================
# 測試案例
# =============================================================================
def test_incident_engine_import():
"""測試:能正確 import IncidentEngine"""
from lewooogo_brain.engines.incident_engine import IncidentEngine
from lewooogo_brain.interfaces.incident_processor import IIncidentProcessor
assert issubclass(IncidentEngine, IIncidentProcessor)
print("✅ IncidentEngine import 成功,實作 IIncidentProcessor")
def test_incident_engine_create_incident():
"""測試:處理新告警時創建 Incident"""
import asyncio
from lewooogo_brain.engines.incident_engine import IncidentEngine
memory = MockIncidentMemory()
analyzer = MockBlastRadiusAnalyzer()
engine = IncidentEngine(memory=memory, blast_analyzer=analyzer)
signal_data = {
"source": "prometheus",
"alert_name": "HighCPUUsage",
"severity": "critical",
"namespace": "awoooi-prod",
"target": "awoooi-api",
"message": "CPU usage exceeded 90%",
"labels": {"app": "awoooi-api"},
}
async def run_test():
incident = await engine.process_signal(signal_data)
return incident
incident = asyncio.get_event_loop().run_until_complete(run_test())
assert incident is not None, "Failed to create incident"
assert incident.incident_id.startswith("INC-"), f"Invalid incident ID: {incident.incident_id}"
assert incident.severity.value == "P0", f"Expected P0, got {incident.severity.value}"
assert len(incident.signals) == 1, f"Expected 1 signal, got {len(incident.signals)}"
assert "awoooi-api" in incident.affected_services
print(f"✅ Incident 創建成功:")
print(f" - ID: {incident.incident_id}")
print(f" - Severity: {incident.severity.value}")
print(f" - Signals: {len(incident.signals)}")
print(f" - Affected: {incident.affected_services}")
def test_incident_engine_aggregate_signals():
"""測試:相關告警聚合到同一 Incident"""
import asyncio
from lewooogo_brain.engines.incident_engine import IncidentEngine
memory = MockIncidentMemory()
engine = IncidentEngine(memory=memory)
# 第一個告警
signal1 = {
"source": "prometheus",
"alert_name": "HighCPUUsage",
"severity": "warning",
"namespace": "awoooi-prod",
"target": "awoooi-api",
"message": "CPU at 80%",
}
# 相同 namespace/target 的第二個告警
signal2 = {
"source": "grafana",
"alert_name": "HighMemoryUsage",
"severity": "critical",
"namespace": "awoooi-prod",
"target": "awoooi-api",
"message": "Memory at 95%",
}
async def run_test():
incident1 = await engine.process_signal(signal1)
incident2 = await engine.process_signal(signal2)
return incident1, incident2
incident1, incident2 = asyncio.get_event_loop().run_until_complete(run_test())
assert incident1 is not None
assert incident2 is not None
assert incident1.incident_id == incident2.incident_id, "Signals should aggregate"
assert len(incident2.signals) == 2, f"Expected 2 signals, got {len(incident2.signals)}"
# 嚴重度應升級為 P0 (critical)
assert incident2.severity.value == "P0", f"Severity should escalate to P0"
print(f"✅ 告警聚合成功:")
print(f" - Incident ID: {incident2.incident_id}")
print(f" - Total Signals: {len(incident2.signals)}")
print(f" - Final Severity: {incident2.severity.value}")
def test_incident_engine_deduplication():
"""測試:相同 Fingerprint 的告警去重"""
import asyncio
from lewooogo_brain.engines.incident_engine import IncidentEngine
memory = MockIncidentMemory()
engine = IncidentEngine(memory=memory)
# 兩個完全相同的告警
signal = {
"source": "prometheus",
"alert_name": "PodCrashLooping",
"severity": "critical",
"namespace": "awoooi-prod",
"target": "awoooi-worker",
"message": "Pod restart count > 5",
}
async def run_test():
incident1 = await engine.process_signal(signal)
incident2 = await engine.process_signal(signal) # 重複
return incident1, incident2
incident1, incident2 = asyncio.get_event_loop().run_until_complete(run_test())
assert incident1 is not None
assert incident2 is not None
assert incident1.incident_id == incident2.incident_id
# 重複告警應被去重signal 數量仍為 1
assert len(incident2.signals) == 1, f"Expected 1 signal (dedup), got {len(incident2.signals)}"
print(f"✅ 告警去重成功:")
print(f" - Signals after dedup: {len(incident2.signals)}")
def test_incident_engine_update_status():
"""測試:更新 Incident 狀態"""
import asyncio
from lewooogo_brain.engines.incident_engine import IncidentEngine
from lewooogo_brain.interfaces.incident_processor import IncidentStatus
memory = MockIncidentMemory()
engine = IncidentEngine(memory=memory)
signal = {
"source": "test",
"alert_name": "TestAlert",
"severity": "warning",
"namespace": "test",
"target": "test-service",
}
async def run_test():
incident = await engine.process_signal(signal)
assert incident.status == IncidentStatus.INVESTIGATING
success = await engine.update_status(incident.incident_id, IncidentStatus.RESOLVED)
assert success, "Failed to update status"
updated = await engine.get_incident(incident.incident_id)
return updated
updated = asyncio.get_event_loop().run_until_complete(run_test())
assert updated is not None
assert updated.status == IncidentStatus.RESOLVED
assert updated.resolved_at is not None
print(f"✅ 狀態更新成功:")
print(f" - Status: {updated.status.value}")
print(f" - Resolved At: {updated.resolved_at}")
def test_incident_engine_no_external_deps():
"""測試IncidentEngine 不依賴任何外部模組"""
import importlib
import lewooogo_brain.engines.incident_engine as module
# 取得所有 import
source = Path(module.__file__).read_text()
# 禁止的 import patterns
forbidden = [
"from src.core",
"from src.db",
"from src.services",
"import redis",
"from redis",
"import sqlalchemy",
"from sqlalchemy",
]
violations = []
for pattern in forbidden:
if pattern in source:
violations.append(pattern)
assert len(violations) == 0, f"Found forbidden imports: {violations}"
print("✅ 無外部依賴,完全積木化")
if __name__ == "__main__":
print("=" * 60)
print("🧪 IncidentEngine 單元測試")
print("=" * 60)
tests = [
test_incident_engine_import,
test_incident_engine_create_incident,
test_incident_engine_aggregate_signals,
test_incident_engine_deduplication,
test_incident_engine_update_status,
test_incident_engine_no_external_deps,
]
passed = 0
failed = 0
for test in tests:
print(f"\n🔬 {test.__name__}")
try:
test()
passed += 1
except AssertionError as e:
print(f"❌ FAILED: {e}")
failed += 1
except Exception as e:
print(f"❌ ERROR: {type(e).__name__}: {e}")
failed += 1
print("\n" + "=" * 60)
print(f"📊 結果: {passed} 通過, {failed} 失敗")
print("=" * 60)
if failed > 0:
sys.exit(1)

View File

@@ -0,0 +1,160 @@
"""
SkillLoader 單元測試
====================
Phase 6.4f 驗證點 1
確認 SkillLoader 能從 .agents/skills/ 讀取 Markdown 內容
"""
import sys
from pathlib import Path
# 添加 src 到 Python Path
src_path = Path(__file__).parent.parent / "src"
sys.path.insert(0, str(src_path))
# 設定專案根目錄 (向上尋找 .agents/skills)
project_root = Path(__file__).parent.parent.parent.parent
def test_skill_loader_find_skills_dir():
"""測試:能找到 skills 目錄"""
from lewooogo_brain.skills.loader import SkillLoader
loader = SkillLoader(
skills_dir=".agents/skills",
project_root=project_root,
)
assert loader._skills_path.exists(), f"Skills dir not found: {loader._skills_path}"
print(f"✅ Skills 目錄找到: {loader._skills_path}")
def test_skill_loader_load_devops_skill():
"""測試:載入 04-awoooi-devops-commander.md"""
from lewooogo_brain.skills.loader import SkillLoader
loader = SkillLoader(
skills_dir=".agents/skills",
project_root=project_root,
)
# 用完整 ID 載入
content = loader.load("04-awoooi-devops-commander")
assert content is not None, "Failed to load skill by full ID"
assert "DevOps" in content or "devops" in content.lower(), "Content doesn't contain DevOps"
print(f"✅ 完整 ID 載入成功,內容長度: {len(content)} 字元")
# 用短 ID 載入
content_short = loader.load("04")
assert content_short is not None, "Failed to load skill by short ID"
assert content_short == content, "Short ID should return same content"
print("✅ 短 ID 載入成功")
def test_skill_loader_load_skill_object():
"""測試:載入 Skill 物件並解析 metadata"""
from lewooogo_brain.skills.loader import SkillLoader
loader = SkillLoader(
skills_dir=".agents/skills",
project_root=project_root,
)
skill = loader.load_skill("04-awoooi-devops-commander")
assert skill is not None, "Failed to load Skill object"
assert skill.skill_id == "04-awoooi-devops-commander"
assert skill.name != ""
assert skill.content != ""
print(f"✅ Skill 物件載入成功:")
print(f" - ID: {skill.skill_id}")
print(f" - Name: {skill.name}")
print(f" - Description: {skill.description[:50]}...")
def test_skill_loader_load_all():
"""測試:載入所有 Skills"""
from lewooogo_brain.skills.loader import SkillLoader
loader = SkillLoader(
skills_dir=".agents/skills",
project_root=project_root,
)
skills = loader.load_all()
assert len(skills) >= 6, f"Expected at least 6 skills, got {len(skills)}"
print(f"✅ 載入 {len(skills)} 個 Skills:")
for skill in skills:
print(f" - {skill.skill_id}: {skill.name}")
def test_skill_to_context():
"""測試Skill 轉換為 LLM Context"""
from lewooogo_brain.skills.loader import SkillLoader
loader = SkillLoader(
skills_dir=".agents/skills",
project_root=project_root,
)
skill = loader.load_skill("04")
assert skill is not None
context = skill.to_context()
assert "## Skill:" in context
assert skill.name in context
print(f"✅ Context 生成成功,長度: {len(context)} 字元")
def test_skill_loader_list_skills():
"""測試:列出所有可用 Skills"""
from lewooogo_brain.skills.loader import SkillLoader
loader = SkillLoader(
skills_dir=".agents/skills",
project_root=project_root,
)
skill_list = loader.list_skills()
assert len(skill_list) >= 6
print("✅ Skill 清單:")
for s in skill_list:
print(f" - {s['skill_id']}: {s['name']}")
if __name__ == "__main__":
print("=" * 60)
print("🧪 SkillLoader 單元測試")
print("=" * 60)
tests = [
test_skill_loader_find_skills_dir,
test_skill_loader_load_devops_skill,
test_skill_loader_load_skill_object,
test_skill_loader_load_all,
test_skill_to_context,
test_skill_loader_list_skills,
]
passed = 0
failed = 0
for test in tests:
print(f"\n🔬 {test.__name__}")
try:
test()
passed += 1
except AssertionError as e:
print(f"❌ FAILED: {e}")
failed += 1
except Exception as e:
print(f"❌ ERROR: {e}")
failed += 1
print("\n" + "=" * 60)
print(f"📊 結果: {passed} 通過, {failed} 失敗")
print("=" * 60)
if failed > 0:
sys.exit(1)