Files
awoooi/apps/api/src/services/openclaw.py
OG T 1ac8965a7a feat(api): Phase 15.1 Langfuse LLMOps 整合 + 模型升級
## 新功能
- Langfuse 自建部署 (192.168.0.110:3100)
- langfuse_client.py - LLM 呼叫追蹤包裝
- OpenClaw 整合 Langfuse trace

## 模型升級 (統帥批准)
- 生產預設: llama3.2:3b → qwen2.5:7b-instruct
- 摘要任務: llama3.2:3b (速度優先)

## 配置更新
- requirements.txt: +langfuse>=2.0.0
- config.py: +LANGFUSE_* 設定
- models.json: 更新 Ollama 模型配置
- K8s: Secret + ConfigMap 更新

## 審查通過
- 模組化檢查 
- 核心測試 31/31 

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-26 00:32:19 +08:00

1319 lines
52 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
OpenClaw AI Decision Engine - True LLM + SignOz Integration
============================================================
Phase 5: OpenClaw 實體化升級 (2026-03-21)
統帥校正: SignOz 為唯一全能視力中心
Features:
- 真實 LLM SDK 整合 (Ollama → Gemini → Claude)
- SignOz Gold Metrics 即時擷取 (P99/Error/RPS)
- AIOps Agent 專業人格 (K8s 維運 + SRE RCA 專精)
- 強制結構化 JSON 輸出 (符合 API 契約)
- 動態告警上下文注入 + SignOz 數據
- Shadow Mode 調優指令生成 (日誌輸出,不執行)
防禦性工程鐵律:
- Zero Trust: 預設不信任 LLM 輸出,必須通過 Pydantic 驗證
- Edge Case: 網路失敗、解析失敗、超時處理
- SignOz 失敗時優雅降級 (不阻塞主流程)
"""
import hashlib
import json
import random
import re
import time
from datetime import datetime
import httpx
import structlog
from src.core.config import settings
from src.core.redis_client import get_redis
from src.models.ai import (
OpenClawDecision,
)
from src.services.langfuse_client import langfuse_trace
from src.services.signoz_client import GoldMetrics, get_signoz_client
from src.utils.timezone import now_taipei_iso
logger = structlog.get_logger(__name__)
# =============================================================================
# AIOps Agent System Prompt (專業人格 + 仲裁邏輯 + SignOz 數據)
# =============================================================================
# 責任矩陣定義
RESPONSIBILITY_MATRIX = {
"FE": "前端團隊 (Frontend)",
"BE": "後端團隊 (Backend)",
"INFRA": "基礎設施團隊 (Infrastructure/SRE)",
"DB": "資料庫團隊 (Database/DBA)",
"COLLAB": "協同處理 (需多團隊會診)",
}
# 信心度閾值
CONFIDENCE_THRESHOLD_COLLAB = 0.70 # 低於此閾值自動標記為 COLLAB
OPENCLAW_SYSTEM_PROMPT = """# OpenClaw v7.0 - AWOOOI AI 仲裁官 + SignOz 視力
You are OpenClaw, a senior Site Reliability Engineer (SRE) AI arbitrator with SignOz observability integration.
## 🔬 SignOz Gold Metrics Available
You will receive real-time SignOz metrics for the affected service:
- **RPS (Requests Per Second)**: Current traffic volume and trend
- **Error Rate**: Percentage of 4xx/5xx responses
- **P99 Latency**: 99th percentile response time in ms
Use these metrics to:
1. **Correlate** symptoms with actual traffic patterns
2. **Identify** if it's a traffic spike, degradation, or anomaly
3. **Recommend** data-driven scaling/tuning actions
## 🎯 Your PRIMARY Mission
You are NOT a summarizer. You are an ARBITRATOR who must:
1. **JUDGE** which team is primarily responsible (FE/BE/INFRA/DB)
2. **ANALYZE** root cause with technical depth + SignOz data correlation
3. **RECOMMEND** preventive actions (HPA tuning, cache strategies, circuit breakers)
4. **GENERATE** kubectl commands for auto-tuning (Shadow Mode will log, not execute)
5. **SCORE** your confidence honestly - if unsure, mark as COLLAB
## 📊 Responsibility Definitions
- **FE**: Frontend issues (JS errors, rendering, CDN, static assets)
- **BE**: Backend issues (API errors, business logic, microservices)
- **INFRA**: Infrastructure (K8s, networking, load balancers, certificates)
- **DB**: Database (queries, connections, replication, migrations)
- **COLLAB**: Multiple teams needed OR confidence < 70%
## ⚙️ Auto-Tuning Commands (Shadow Mode)
For each optimization suggestion, provide EXECUTABLE kubectl commands:
- Resource tuning: `kubectl set resources deployment/X --limits=cpu=2,memory=1Gi -n Y`
- HPA: `kubectl autoscale deployment X --cpu-percent=70 --min=2 --max=10 -n Y`
- Scale: `kubectl scale deployment X --replicas=N -n Y`
- Patch: `kubectl patch deployment X -p '{"spec":...}' -n Y`
## ⚠️ Output Rules
- You MUST respond with ONLY valid JSON
- confidence MUST be between 0.0 and 1.0
- If confidence < 0.70, set primary_responsibility to "COLLAB"
- optimization_suggestions MUST contain executable kubectl commands
- Each suggestion needs: type, description, kubectl_or_config (REQUIRED)
## 📋 JSON Schema (REQUIRED)
```json
{
"action_title": "string - 操作標題 (繁體中文)",
"description": "string - 根因分析含 SignOz 數據關聯 (繁體中文)",
"suggested_action": "RESTART_DEPLOYMENT|DELETE_POD|SCALE_DEPLOYMENT|APPLY_HPA|TUNE_RESOURCES|NO_ACTION",
"kubectl_command": "string - 具體的 kubectl 指令",
"target_resource": "string - 目標資源名稱",
"namespace": "string - K8s namespace",
"risk_level": "low|medium|critical",
"blast_radius": {
"affected_pods": "number",
"estimated_downtime": "string",
"related_services": ["array"],
"data_impact": "NONE|READ_ONLY|WRITE|DESTRUCTIVE"
},
"primary_responsibility": "FE|BE|INFRA|DB|COLLAB",
"responsibility_reasoning": "string - 為何判定此團隊負責 (繁體中文)",
"secondary_teams": ["array - 需協助的其他團隊"],
"optimization_suggestions": [
{
"type": "HPA|RESOURCE_LIMIT|CACHE|CIRCUIT_BREAKER|INDEX|CONNECTION_POOL|SCALE",
"description": "string - 預防性建議描述",
"kubectl_or_config": "string - 可執行的 kubectl 指令或配置"
}
],
"reasoning": "string - 決策理由含 SignOz 數據分析",
"deviation_analysis": "string - 基準線偏差分析",
"confidence": "number - 0.0 to 1.0",
"affected_services": ["array"],
"signoz_correlation": "string - SignOz 指標與告警的關聯分析"
}
```
## 🔥 Example: High CPU with SignOz Data
Given SignOz metrics: RPS=150 (↑), Error=0.5%, P99=450ms (↑)
```json
{
"action_title": "擴展副本數 + 配置 HPA 自動擴展",
"description": "api-gateway CPU 飆高SignOz 顯示 RPS 從 80 飆升至 150 (+87%)P99 從 200ms 升至 450ms。流量突增導致資源不足。",
"suggested_action": "SCALE_DEPLOYMENT",
"kubectl_command": "kubectl scale deployment/api-gateway --replicas=4 -n production",
"target_resource": "api-gateway",
"namespace": "production",
"risk_level": "medium",
"blast_radius": {
"affected_pods": 0,
"estimated_downtime": "0",
"related_services": [],
"data_impact": "NONE"
},
"primary_responsibility": "INFRA",
"responsibility_reasoning": "流量突增但 HPA 未配置,屬基礎設施團隊責任",
"secondary_teams": ["BE"],
"optimization_suggestions": [
{
"type": "HPA",
"description": "配置 CPU 基準 HPA閾值 70%,基於 SignOz RPS 趨勢",
"kubectl_or_config": "kubectl autoscale deployment api-gateway --cpu-percent=70 --min=2 --max=10 -n production"
},
{
"type": "RESOURCE_LIMIT",
"description": "增加 CPU limit 以應對流量峰值",
"kubectl_or_config": "kubectl set resources deployment/api-gateway --requests=cpu=500m --limits=cpu=2000m -n production"
}
],
"reasoning": "SignOz 數據顯示流量突增為主因,非代碼問題。先擴容緩解,再配置 HPA 防止復發。",
"deviation_analysis": "RPS +87%P99 延遲 +125%,超出基準線達 +4.2σ",
"confidence": 0.91,
"affected_services": ["api-gateway"],
"signoz_correlation": "RPS 與 CPU 使用率高度相關 (r=0.94)P99 上升為資源競爭導致"
}
```
Now analyze the following alert with SignOz data:
"""
# =============================================================================
# LLM Analysis Result - Using Pydantic for Schema Enforcement
# =============================================================================
# We use OpenClawDecision from models/ai.py for Pydantic validation
# This alias is for backwards compatibility
LLMAnalysisResult = OpenClawDecision
# =============================================================================
# OpenClaw Service
# =============================================================================
class OpenClawService:
"""
OpenClaw AI 決策服務 - True LLM + SignOz Integration
實作 AI_FALLBACK_ORDER 備援機制:
Ollama → Gemini → Claude → Mock
新增 SignOz 整合:
- 自動擷取 Gold Metrics
- 數據驅動的 RCA 分析
- 動態 Trace URL 生成
"""
def __init__(self):
self._http_client: httpx.AsyncClient | None = None
self._signoz = get_signoz_client()
async def _get_client(self) -> httpx.AsyncClient:
"""取得 HTTP 客戶端"""
if self._http_client is None or self._http_client.is_closed:
self._http_client = httpx.AsyncClient(
timeout=httpx.Timeout(120.0, connect=10.0),
)
return self._http_client
async def close(self) -> None:
"""關閉連線"""
if self._http_client:
await self._http_client.aclose()
self._http_client = None
# =========================================================================
# SignOz Integration
# =========================================================================
async def get_signoz_context(
self,
service_name: str,
namespace: str = "default",
alert_timestamp: datetime | None = None,
) -> tuple[GoldMetrics | None, str]:
"""
擷取 SignOz 上下文數據
Returns:
(GoldMetrics, trace_url) or (None, fallback_url)
"""
try:
metrics = await self._signoz.get_gold_metrics(
service_name=service_name,
namespace=namespace,
time_window_minutes=10,
)
trace_url = self._signoz.generate_trace_url(
service_name=service_name,
alert_timestamp=alert_timestamp,
window_minutes=5,
)
logger.info(
"signoz_context_fetched",
service=service_name,
rps=metrics.rps,
error_rate=metrics.error_rate,
p99_latency=metrics.p99_latency_ms,
)
return metrics, trace_url
except Exception as e:
logger.warning(
"signoz_context_fetch_failed",
service=service_name,
error=str(e),
)
# 降級: 返回 None 和靜態 URL
fallback_url = f"{settings.SIGNOZ_URL}/traces?service={service_name}"
return None, fallback_url
def generate_auto_tuning_command(
self,
alert_type: str,
target_resource: str,
namespace: str,
metrics: GoldMetrics | None = None,
) -> dict:
"""
根據告警類型和 SignOz 數據生成調優指令
Shadow Mode: 僅生成指令,不執行
Returns:
{command: str, description: str, type: str}
"""
# 根據告警類型選擇調優策略
if "cpu" in alert_type.lower() or "high_cpu" in alert_type.lower():
# CPU 高 → 擴容或調整 limit
if metrics and metrics.rps > 100:
# 高流量場景 → HPA
return {
"type": "HPA",
"command": f"kubectl autoscale deployment {target_resource} --cpu-percent=70 --min=2 --max=10 -n {namespace}",
"description": f"SignOz RPS={metrics.rps:.0f},配置 HPA 應對流量波動",
}
else:
# 低流量但 CPU 高 → 調整資源
return {
"type": "RESOURCE_LIMIT",
"command": f"kubectl set resources deployment/{target_resource} --limits=cpu=2000m -n {namespace}",
"description": "增加 CPU limit 緩解資源競爭",
}
elif "memory" in alert_type.lower() or "oom" in alert_type.lower():
return {
"type": "RESOURCE_LIMIT",
"command": f"kubectl set resources deployment/{target_resource} --limits=memory=1Gi -n {namespace}",
"description": "增加 Memory limit 防止 OOM",
}
elif "pod_crash" in alert_type.lower() or "crash" in alert_type.lower():
return {
"type": "RESTART",
"command": f"kubectl rollout restart deployment/{target_resource} -n {namespace}",
"description": "滾動重啟清除異常狀態",
}
elif "latency" in alert_type.lower() or "slow" in alert_type.lower():
if metrics and metrics.p99_latency_ms > 500:
return {
"type": "SCALE",
"command": f"kubectl scale deployment {target_resource} --replicas=+2 -n {namespace}",
"description": f"SignOz P99={metrics.p99_latency_ms:.0f}ms擴容分散負載",
}
else:
return {
"type": "CACHE",
"command": "# 檢查 Redis 連線池配置",
"description": "建議增加緩存層減少後端壓力",
}
else:
# 通用: 滾動重啟
return {
"type": "RESTART",
"command": f"kubectl rollout restart deployment/{target_resource} -n {namespace}",
"description": "滾動重啟恢復服務",
}
# =========================================================================
# AI Provider Implementations - Enhanced with Structured Output
# =========================================================================
async def _call_ollama(self, prompt: str) -> tuple[str, bool]:
"""
呼叫本機 Ollama (支援 JSON Mode)
"""
try:
client = await self._get_client()
logger.info(
"ollama_request_start",
url=f"{settings.OLLAMA_URL}/api/generate",
prompt_length=len(prompt),
)
response = await client.post(
f"{settings.OLLAMA_URL}/api/generate",
json={
"model": "qwen2.5:7b-instruct", # 使用更大的模型提高品質
"prompt": prompt,
"stream": False,
"format": "json", # 強制 JSON 輸出
"options": {
"num_predict": 1024, # 增加輸出長度
"temperature": 0.1, # 低溫度確保穩定輸出
"top_p": 0.9,
},
},
timeout=httpx.Timeout(float(settings.OPENCLAW_TIMEOUT), connect=10.0),
)
logger.info(
"ollama_response_received",
status_code=response.status_code,
)
response.raise_for_status()
data = response.json()
result = data.get("response", "")
logger.info(
"ollama_response_parsed",
response_length=len(result),
)
return result, True
except httpx.TimeoutException as e:
logger.warning("ollama_timeout", error=str(e))
return f"Timeout: {e}", False
except Exception as e:
logger.warning(
"ollama_call_failed",
error=str(e),
error_type=type(e).__name__,
)
return str(e), False
async def _call_gemini(self, prompt: str) -> tuple[str, bool]:
"""
呼叫 Google Gemini (支援 JSON Mode)
"""
if not settings.GEMINI_API_KEY:
return "GEMINI_API_KEY not configured", False
try:
client = await self._get_client()
# Gemini 1.5 Flash 支援 JSON Mode
response = await client.post(
f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent?key={settings.GEMINI_API_KEY}",
json={
"contents": [{"parts": [{"text": prompt}]}],
"generationConfig": {
"temperature": 0.1,
"maxOutputTokens": 2048,
"responseMimeType": "application/json", # 強制 JSON 輸出
},
},
timeout=30.0,
)
response.raise_for_status()
data = response.json()
text = data["candidates"][0]["content"]["parts"][0]["text"]
logger.info("gemini_response_received", response_length=len(text))
return text, True
except Exception as e:
logger.warning("gemini_call_failed", error=str(e))
return str(e), False
async def _call_claude(self, prompt: str) -> tuple[str, bool]:
"""
呼叫 Anthropic Claude (使用 Tool Use 強制 JSON)
"""
if not settings.CLAUDE_API_KEY:
return "CLAUDE_API_KEY not configured", False
try:
client = await self._get_client()
# Claude 使用 Tool Use 強制結構化輸出
response = await client.post(
"https://api.anthropic.com/v1/messages",
headers={
"x-api-key": settings.CLAUDE_API_KEY,
"anthropic-version": "2023-06-01",
"content-type": "application/json",
},
json={
"model": "claude-3-haiku-20240307",
"max_tokens": 2048,
"messages": [{"role": "user", "content": prompt}],
"tools": [{
"name": "submit_analysis",
"description": "Submit the RCA analysis result in structured format",
"input_schema": {
"type": "object",
"properties": {
"action_title": {"type": "string"},
"description": {"type": "string"},
"suggested_action": {"type": "string", "enum": ["RESTART_DEPLOYMENT", "DELETE_POD", "SCALE_DEPLOYMENT", "NO_ACTION"]},
"kubectl_command": {"type": "string"},
"target_resource": {"type": "string"},
"namespace": {"type": "string"},
"risk_level": {"type": "string", "enum": ["low", "medium", "critical"]},
"blast_radius": {
"type": "object",
"properties": {
"affected_pods": {"type": "integer"},
"estimated_downtime": {"type": "string"},
"related_services": {"type": "array", "items": {"type": "string"}},
"data_impact": {"type": "string", "enum": ["NONE", "READ_ONLY", "WRITE", "DESTRUCTIVE"]}
},
"required": ["affected_pods", "estimated_downtime", "related_services", "data_impact"]
},
"reasoning": {"type": "string"},
"deviation_analysis": {"type": "string"},
"confidence": {"type": "number"},
"affected_services": {"type": "array", "items": {"type": "string"}}
},
"required": ["action_title", "description", "suggested_action", "kubectl_command", "target_resource", "namespace", "risk_level", "blast_radius", "reasoning", "confidence"]
}
}],
"tool_choice": {"type": "tool", "name": "submit_analysis"},
},
timeout=30.0,
)
response.raise_for_status()
data = response.json()
# 從 Tool Use 回應中提取 JSON
for block in data.get("content", []):
if block.get("type") == "tool_use" and block.get("name") == "submit_analysis":
tool_input = block.get("input", {})
logger.info("claude_tool_use_response", input_keys=list(tool_input.keys()))
return json.dumps(tool_input), True
# Fallback: 嘗試從 text 內容提取
for block in data.get("content", []):
if block.get("type") == "text":
return block.get("text", ""), True
return "No valid response from Claude", False
except Exception as e:
logger.warning("claude_call_failed", error=str(e))
return str(e), False
# =========================================================================
# Mock LLM - Intelligent Fallback with SignOz Data
# =========================================================================
def _generate_mock_response(
self,
alert_context: dict,
signoz_metrics: GoldMetrics | None = None,
) -> str:
"""
Mock LLM 回應生成器 - 智能降級 (v7.0 含 SignOz)
根據告警類型和 SignOz 數據動態產生合理的 RCA 分析結果
"""
time.sleep(random.uniform(0.3, 0.8)) # 模擬思考延遲
alert_type = alert_context.get("alert_type", "custom")
severity = alert_context.get("severity", "warning")
target = alert_context.get("target_resource", "unknown-service")
namespace = alert_context.get("namespace", "default")
message = alert_context.get("message", "")
metrics = alert_context.get("metrics", {})
# SignOz 數據整合
signoz_summary = ""
signoz_correlation = "SignOz 數據擷取中..."
if signoz_metrics:
signoz_summary = signoz_metrics.to_summary()
signoz_correlation = (
f"RPS={signoz_metrics.rps:.1f} ({signoz_metrics.rps_trend}), "
f"Error={signoz_metrics.error_rate:.2f}%, "
f"P99={signoz_metrics.p99_latency_ms:.0f}ms"
)
# 生成調優指令
tuning = self.generate_auto_tuning_command(
alert_type=alert_type,
target_resource=target,
namespace=namespace,
metrics=signoz_metrics,
)
# 根據告警類型生成專業 RCA + 仲裁
if "oom" in message.lower() or "memory" in alert_type.lower():
mock_response = {
"action_title": f"刪除異常 Pod {target} (OOMKilled)",
"description": f"🤖 AI 仲裁: {target} 發生 OOMKilled根因為 JVM Heap 配置與 K8s memory limit 不匹配或存在記憶體洩漏。{signoz_summary}",
"suggested_action": "DELETE_POD",
"kubectl_command": f"kubectl delete pod {target} -n {namespace}",
"target_resource": target,
"namespace": namespace,
"risk_level": "critical" if severity == "critical" else "medium",
"blast_radius": {
"affected_pods": 1,
"estimated_downtime": "~30s",
"related_services": ["api-gateway", "downstream-service"],
"data_impact": "NONE"
},
"primary_responsibility": "BE",
"responsibility_reasoning": "OOMKilled 通常源於應用程式記憶體配置不當,屬後端團隊責任範圍",
"secondary_teams": ["INFRA"],
"optimization_suggestions": [
{
"type": "RESOURCE_LIMIT",
"description": "調整 memory limit 至 1Gi 並確保 JVM -Xmx 不超過 70%",
"kubectl_or_config": f"kubectl set resources deployment/{target.rsplit('-', 2)[0]} -c {target.rsplit('-', 2)[0]} --limits=memory=1Gi -n {namespace}"
},
{
"type": "HPA",
"description": "啟用基於記憶體的 HPA 自動擴展",
"kubectl_or_config": f"kubectl autoscale deployment {target.rsplit('-', 2)[0]} --memory-percent=80 --min=2 --max=5 -n {namespace}"
}
],
"reasoning": f"🤖 Pod OOMKilled 後 ReplicaSet 將自動重建,但需同步修正資源配置防止復發。{signoz_correlation}",
"deviation_analysis": f"Memory 使用率 {metrics.get('memory_percent', 99)}%,超出基準線 60% 達 +6.5σ",
"confidence": 0.88,
"affected_services": [target, "api-gateway"],
"signoz_correlation": signoz_correlation,
}
elif "cpu" in alert_type.lower() or "high_cpu" in alert_type:
# 根據 SignOz RPS 調整策略
rps_context = ""
if signoz_metrics and signoz_metrics.rps > 50:
rps_context = f"SignOz 顯示 RPS={signoz_metrics.rps:.0f},流量較高,建議配置 HPA。"
mock_response = {
"action_title": f"擴展 {target} 副本數 + 啟用 HPA",
"description": f"🤖 AI 仲裁: {target} CPU 使用率過高,根因為流量突增或計算密集任務未配置自動擴展。{rps_context}",
"suggested_action": "SCALE_DEPLOYMENT",
"kubectl_command": tuning["command"],
"target_resource": target,
"namespace": namespace,
"risk_level": "medium",
"blast_radius": {
"affected_pods": 0,
"estimated_downtime": "0",
"related_services": [],
"data_impact": "NONE"
},
"primary_responsibility": "INFRA",
"responsibility_reasoning": "自動擴展策略未配置或閾值過高,屬基礎設施團隊責任",
"secondary_teams": ["BE"],
"optimization_suggestions": [
{
"type": tuning["type"],
"description": tuning["description"],
"kubectl_or_config": tuning["command"],
},
{
"type": "RESOURCE_LIMIT",
"description": "增加 CPU request 確保 QoS 為 Guaranteed",
"kubectl_or_config": f"kubectl set resources deployment/{target} --requests=cpu=500m --limits=cpu=2000m -n {namespace}"
}
],
"reasoning": f"🤖 水平擴展可即時分散負載,同時建議配置 HPA 防止復發。{signoz_correlation}",
"deviation_analysis": f"CPU 使用率 {metrics.get('cpu_percent', 95)}%,超出基準線 50% 達 +4.5σ",
"confidence": 0.92,
"affected_services": [target],
"signoz_correlation": signoz_correlation,
}
elif "http" in alert_type.lower() or "5xx" in message.lower() or "502" in message.lower():
mock_response = {
"action_title": f"重啟 {target} + 檢查上游服務",
"description": f"🤖 AI 仲裁: {target} 產生 HTTP 5xx 錯誤,可能為應用程式例外或上游服務不可達。{signoz_summary}",
"suggested_action": "RESTART_DEPLOYMENT",
"kubectl_command": f"kubectl rollout restart deployment/{target} -n {namespace}",
"target_resource": target,
"namespace": namespace,
"risk_level": "critical",
"blast_radius": {
"affected_pods": 3,
"estimated_downtime": "~1 min",
"related_services": ["nginx-ingress", "upstream-api"],
"data_impact": "NONE"
},
"primary_responsibility": "COLLAB",
"responsibility_reasoning": "HTTP 5xx 可能源於前端路由、後端邏輯或基礎設施,需多團隊協同排查",
"secondary_teams": ["FE", "BE", "INFRA"],
"optimization_suggestions": [
{
"type": "CIRCUIT_BREAKER",
"description": "配置熔斷器防止故障擴散",
"kubectl_or_config": "# Istio VirtualService outlierDetection 配置"
},
{
"type": "CACHE",
"description": "增加 Redis 緩存減少上游壓力",
"kubectl_or_config": "# 檢查 Redis 連線池配置,建議 maxTotal=50"
}
],
"reasoning": f"🤖 HTTP 錯誤需協同排查,先重啟恢復服務同時通知相關團隊。{signoz_correlation}",
"deviation_analysis": "錯誤率 5%,超出基準線 0.1% 達 +50σ",
"confidence": 0.65,
"affected_services": [target, "nginx-ingress", "upstream-api"],
"signoz_correlation": signoz_correlation,
}
else:
# 通用異常處理
mock_response = {
"action_title": f"重新啟動 {target} 服務",
"description": f"🤖 AI 仲裁: {target} 發生異常: {message[:80]}。需進一步診斷確認根因。{signoz_summary}",
"suggested_action": "RESTART_DEPLOYMENT",
"kubectl_command": f"kubectl rollout restart deployment/{target} -n {namespace}",
"target_resource": target,
"namespace": namespace,
"risk_level": "critical" if severity == "critical" else "medium",
"blast_radius": {
"affected_pods": 3,
"estimated_downtime": "~1 min",
"related_services": ["dependent-services"],
"data_impact": "NONE"
},
"primary_responsibility": "COLLAB",
"responsibility_reasoning": "告警資訊不足以判定單一責任團隊,建議多團隊協同排查",
"secondary_teams": ["BE", "INFRA"],
"optimization_suggestions": [
{
"type": tuning["type"],
"description": tuning["description"],
"kubectl_or_config": tuning["command"],
}
],
"reasoning": f"🤖 根據告警 {alert_type} 先重啟恢復服務,同時安排深入診斷。{signoz_correlation}",
"deviation_analysis": "監控指標顯示異常偏離基準線",
"confidence": 0.70,
"affected_services": [target],
"signoz_correlation": signoz_correlation,
}
logger.info(
"mock_llm_response_generated",
action_title=mock_response["action_title"],
risk_level=mock_response["risk_level"],
primary_responsibility=mock_response["primary_responsibility"],
confidence=mock_response["confidence"],
signoz_integrated=signoz_metrics is not None,
is_mock=True,
)
return json.dumps(mock_response)
# =========================================================================
# LLM Cache Layer (憲法要求: 嚴禁無快取裸奔)
# =========================================================================
def _generate_cache_key(self, prompt: str, context_hash: str = "") -> str:
"""
生成 LLM 快取鍵
使用 prompt 內容的 SHA256 作為快取鍵,確保相同問題不重複呼叫 LLM
"""
content = f"{prompt}:{context_hash}"
hash_digest = hashlib.sha256(content.encode()).hexdigest()[:16]
return f"llm_cache:{hash_digest}"
async def _call_with_cache(
self,
prompt: str,
alert_context: dict | None = None,
signoz_metrics: GoldMetrics | None = None,
cache_ttl: int = 3600, # 1 hour default
) -> tuple[str, str, bool, bool]:
"""
帶快取的 LLM 呼叫包裝器
憲法條款: 必須使用快取保護算力資源
Args:
prompt: LLM prompt
alert_context: 告警上下文
signoz_metrics: SignOz 指標
cache_ttl: 快取存活時間 (秒)
Returns:
(response, provider, success, from_cache)
"""
# 生成快取鍵 (基於 prompt + alert_context hash)
context_hash = ""
if alert_context:
# 使用告警類型 + 目標資源作為上下文 hash
context_hash = f"{alert_context.get('alert_type', '')}:{alert_context.get('target_resource', '')}"
cache_key = self._generate_cache_key(prompt, context_hash)
# 1. 嘗試從快取讀取
try:
redis_client = get_redis()
cached = await redis_client.get(cache_key)
if cached:
cached_data = json.loads(cached)
logger.info(
"llm_cache_hit",
cache_key=cache_key[:20],
provider=cached_data.get("provider", "cached"),
)
return (
cached_data["response"],
f"{cached_data['provider']}_cached",
True,
True, # from_cache
)
except Exception as e:
logger.warning("llm_cache_read_failed", error=str(e))
# 2. Cache Miss - 呼叫 LLM
logger.info("llm_cache_miss", cache_key=cache_key[:20])
response, provider, success = await self._call_with_fallback(
prompt, alert_context, signoz_metrics
)
# 3. 成功則寫入快取
if success:
try:
redis_client = get_redis()
cache_data = {
"response": response,
"provider": provider,
"cached_at": now_taipei_iso(),
}
await redis_client.set(
cache_key,
json.dumps(cache_data, ensure_ascii=False),
ex=cache_ttl,
)
logger.info(
"llm_cache_write",
cache_key=cache_key[:20],
provider=provider,
ttl=cache_ttl,
)
except Exception as e:
logger.warning("llm_cache_write_failed", error=str(e))
return response, provider, success, False # from_cache=False
# =========================================================================
# Fallback Chain
# =========================================================================
async def _call_with_fallback(
self,
prompt: str,
alert_context: dict | None = None,
signoz_metrics: GoldMetrics | None = None,
) -> tuple[str, str, bool]:
"""
依 AI_FALLBACK_ORDER 順序呼叫 AI
若 MOCK_MODE=True直接回傳模擬結果。
若所有 Provider 失敗fallback 到 Mock。
Phase 15.1: 整合 Langfuse LLMOps 追蹤
"""
# Mock Mode: 開發測試用
if settings.MOCK_MODE:
logger.info("mock_mode_enabled", using="mock_llm")
return self._generate_mock_response(alert_context or {}, signoz_metrics), "mock", True
# Phase 15.1: Langfuse 追蹤整合
with langfuse_trace(
"openclaw_fallback_chain",
metadata={
"prompt_length": len(prompt),
"fallback_order": settings.AI_FALLBACK_ORDER,
"alert_fingerprint": (alert_context or {}).get("fingerprint", "unknown"),
},
) as trace:
for provider in settings.AI_FALLBACK_ORDER:
logger.info("ai_provider_attempt", provider=provider)
start_time = time.time()
model_name = self._get_model_name(provider)
if provider == "ollama":
response, success = await self._call_ollama(prompt)
elif provider == "gemini":
response, success = await self._call_gemini(prompt)
elif provider == "claude":
response, success = await self._call_claude(prompt)
else:
logger.warning("unknown_ai_provider", provider=provider)
continue
latency_ms = (time.time() - start_time) * 1000
# Langfuse: 記錄每次 LLM 呼叫
trace.generation(
name=f"{provider}_call",
model=model_name,
input=prompt[:500], # 截斷避免過長
output=response[:500] if success else f"ERROR: {response[:200]}",
metadata={
"success": success,
"latency_ms": round(latency_ms, 2),
"provider": provider,
},
)
if success:
logger.info("ai_provider_success", provider=provider, latency_ms=latency_ms)
# Langfuse: 記錄成功評分
trace.score(name="provider_success", value=1.0, comment=f"Success via {provider}")
return response, provider, True
logger.warning("ai_provider_failed_fallback", provider=provider, latency_ms=latency_ms)
# 所有 Provider 失敗時fallback 到 Mock (優雅降級)
logger.warning("all_providers_failed_using_mock", fallback="mock_llm")
trace.score(name="provider_success", value=0.0, comment="All providers failed, using mock")
return self._generate_mock_response(alert_context or {}, signoz_metrics), "mock_fallback", True
def _get_model_name(self, provider: str) -> str:
"""取得 provider 對應的模型名稱"""
model_map = {
"ollama": "qwen2.5:7b-instruct",
"gemini": "gemini-1.5-flash",
"claude": "claude-3-haiku-20240307",
}
return model_map.get(provider, provider)
# =========================================================================
# Response Parsing (防禦性解析)
# =========================================================================
def _extract_json_from_response(self, text: str) -> str | None:
"""從 LLM 回應中提取 JSON"""
# 嘗試直接解析
try:
json.loads(text)
return text
except json.JSONDecodeError:
pass
# 嘗試從 markdown code block 提取
patterns = [
r"```json\s*([\s\S]*?)\s*```",
r"```\s*([\s\S]*?)\s*```",
r"\{[\s\S]*\}",
]
for pattern in patterns:
match = re.search(pattern, text)
if match:
candidate = match.group(1) if "```" in pattern else match.group(0)
try:
json.loads(candidate)
return candidate
except json.JSONDecodeError:
continue
return None
def _parse_analysis_result(self, raw_response: str) -> OpenClawDecision | None:
"""
解析 LLM 分析結果 - 使用 Pydantic Schema Enforcement
關鍵blast_radius 為 REQUIRED使用 AIBlastRadius Pydantic 模型驗證
"""
json_str = self._extract_json_from_response(raw_response)
if not json_str:
logger.error("json_extraction_failed", raw_response=raw_response[:200])
return None
try:
data = json.loads(json_str)
# Step 1: 確保 blast_radius 存在且為正確格式
if "blast_radius" not in data or not isinstance(data["blast_radius"], dict):
data["blast_radius"] = {
"affected_pods": 1,
"estimated_downtime": "~30s",
"related_services": data.get("affected_services", []),
"data_impact": "NONE"
}
else:
# 確保 blast_radius 內的必填欄位存在
br = data["blast_radius"]
if "affected_pods" not in br:
br["affected_pods"] = 1
if "estimated_downtime" not in br:
br["estimated_downtime"] = "~30s"
if "related_services" not in br:
br["related_services"] = data.get("affected_services", [])
if "data_impact" not in br:
br["data_impact"] = "NONE"
# Step 2: 填補其他可選欄位
if "action_title" not in data:
data["action_title"] = data.get("action", "未知操作")
if "target_resource" not in data:
data["target_resource"] = "unknown"
if "suggested_action" not in data:
data["suggested_action"] = "NO_ACTION"
# Step 3: 使用 Pydantic 驗證 (會自動正規化 risk_level, data_impact 等)
decision = OpenClawDecision(**data)
logger.info(
"pydantic_validation_success",
action_title=decision.action_title,
risk_level=decision.risk_level.value,
blast_radius_pods=decision.blast_radius.affected_pods,
)
return decision
except Exception as e:
logger.error(
"pydantic_validation_failed",
error=str(e),
json_str=json_str[:300],
)
return None
# =========================================================================
# Main Analysis Methods
# =========================================================================
async def analyze_alert(
self,
alert_context: dict,
) -> tuple[LLMAnalysisResult | None, str, str, GoldMetrics | None, str]:
"""
分析告警並產生 RCA 結果 (含 SignOz 整合)
Args:
alert_context: 告警上下文 (alert_type, severity, target_resource, etc.)
Returns:
(analysis_result, ai_provider, raw_response, signoz_metrics, signoz_trace_url)
"""
# Step 0: 擷取 SignOz 上下文
service_name = alert_context.get("target_resource", "unknown")
namespace = alert_context.get("namespace", "default")
signoz_metrics, signoz_trace_url = await self.get_signoz_context(
service_name=service_name,
namespace=namespace,
)
# 將 SignOz 數據加入 prompt
signoz_context = ""
if signoz_metrics:
signoz_context = f"""
## 📊 SignOz Real-time Metrics (Last 10 min)
{signoz_metrics.to_summary()}
Trace URL: {signoz_trace_url}
"""
# 格式化告警為 Prompt
alert_json = json.dumps(alert_context, ensure_ascii=False, indent=2)
full_prompt = OPENCLAW_SYSTEM_PROMPT + signoz_context + "\n\n## Alert Data:\n" + alert_json
logger.info(
"openclaw_alert_analysis_start",
alert_type=alert_context.get("alert_type"),
target=alert_context.get("target_resource"),
signoz_available=signoz_metrics is not None,
)
# 呼叫 LLM (使用快取層保護算力)
raw_response, provider, success, from_cache = await self._call_with_cache(
full_prompt,
alert_context,
signoz_metrics,
cache_ttl=1800, # 30 min for alert analysis
)
if not success:
logger.error("openclaw_all_providers_failed")
return None, provider, raw_response, signoz_metrics, signoz_trace_url
if from_cache:
logger.info("openclaw_using_cached_response", provider=provider)
logger.info(
"openclaw_llm_response_received",
provider=provider,
response_length=len(raw_response),
)
# 解析結果
result = self._parse_analysis_result(raw_response)
if result:
logger.info(
"openclaw_analysis_complete",
action_title=result.action_title,
risk_level=result.risk_level,
confidence=result.confidence,
provider=provider,
signoz_integrated=signoz_metrics is not None,
)
else:
logger.warning(
"openclaw_analysis_parse_failed",
raw_response=raw_response[:300],
)
return result, provider, raw_response, signoz_metrics, signoz_trace_url
# =========================================================================
# Phase 6.4: LLM Proposal Generation
# =========================================================================
async def generate_incident_proposal(
self,
incident_id: str,
severity: str,
signals: list[dict],
affected_services: list[str],
) -> tuple[dict | None, str, bool]:
"""
為 Incident 生成 LLM-based 修復提案
Phase 6.4: 賦予大腦「生成解決方案」的思考能力
Args:
incident_id: Incident ID
severity: 嚴重度 (P0/P1/P2/P3)
signals: 關聯的告警訊號
affected_services: 受影響服務
Returns:
(proposal_dict, provider, success)
proposal_dict 包含:
- action: 建議動作
- description: 動作描述
- kubectl_command: kubectl 指令
- risk_level: 風險等級
- reasoning: LLM 推理過程
"""
# 建構 prompt
signal_summary = "\n".join([
f"- {s.get('alert_name', 'unknown')}: {s.get('description', 'N/A')}"
for s in signals[:10] # 最多 10 筆
])
target = affected_services[0] if affected_services else "unknown-service"
# 擷取 SignOz 指標
signoz_metrics, signoz_trace_url = await self.get_signoz_context(
service_name=target,
namespace="awoooi-prod",
)
signoz_context = ""
if signoz_metrics:
signoz_context = f"""
## 📊 SignOz Real-time Metrics
{signoz_metrics.to_summary()}
"""
proposal_prompt = f"""{OPENCLAW_SYSTEM_PROMPT}
{signoz_context}
## 🚨 Incident Context
- **Incident ID**: {incident_id}
- **Severity**: {severity}
- **Affected Services**: {', '.join(affected_services)}
- **Signal Count**: {len(signals)}
## 📋 Alert Signals
{signal_summary}
## 🎯 Your Task
Based on the above incident and signals, generate a remediation proposal.
You MUST respond with ONLY valid JSON following the schema above.
Focus on:
1. Root cause analysis based on signals and SignOz data
2. Specific kubectl command to remediate
3. Risk assessment for the proposed action
4. Preventive recommendations
"""
logger.info(
"proposal_generation_start",
incident_id=incident_id,
severity=severity,
signal_count=len(signals),
signoz_available=signoz_metrics is not None,
)
# 使用快取呼叫 LLM
alert_context = {
"incident_id": incident_id,
"alert_type": signals[0].get("alert_name", "incident") if signals else "incident",
"target_resource": target,
"severity": severity,
}
raw_response, provider, success, from_cache = await self._call_with_cache(
proposal_prompt,
alert_context,
signoz_metrics,
cache_ttl=3600, # 1 hour for proposals
)
if not success:
logger.error(
"proposal_generation_failed",
incident_id=incident_id,
provider=provider,
)
return None, provider, False
# 解析 LLM 結果
result = self._parse_analysis_result(raw_response)
if result:
logger.info(
"proposal_generation_complete",
incident_id=incident_id,
action_title=result.action_title,
risk_level=result.risk_level,
provider=provider,
from_cache=from_cache,
)
# 轉換為 proposal dict (optimization_suggestions 是 list[dict])
proposal_dict = {
"action": result.action_title,
"description": result.description,
"kubectl_command": result.kubectl_command,
"target_resource": result.target_resource,
"namespace": result.namespace,
"risk_level": result.risk_level,
"reasoning": result.reasoning,
"confidence": result.confidence,
"primary_responsibility": result.primary_responsibility,
"optimization_suggestions": [
{
"type": s.get("type", "UNKNOWN"),
"description": s.get("description", ""),
"kubectl_or_config": s.get("kubectl_or_config", ""),
}
for s in result.optimization_suggestions
],
"signoz_correlation": result.signoz_correlation,
"from_cache": from_cache,
"provider": provider,
}
return proposal_dict, provider, True
logger.warning(
"proposal_parse_failed",
incident_id=incident_id,
raw_response=raw_response[:300],
)
return None, provider, False
# =========================================================================
# Shadow Mode Auto-Tuning
# =========================================================================
async def execute_auto_tuning(
self,
approval_id: str,
kubectl_command: str,
description: str,
) -> dict:
"""
執行自動調優 (Shadow Mode: 僅日誌輸出)
統帥鐵律: Shadow Mode 下嚴禁實際執行 K8s 命令
Args:
approval_id: 簽核單 ID
kubectl_command: kubectl 指令
description: 操作描述
Returns:
{executed: bool, shadow_mode: bool, command: str, log: str}
"""
if settings.SHADOW_MODE_ENABLED:
# Shadow Mode: 僅記錄,不執行
log_message = f"[SHADOW MODE] AI 生成的調優指令:{kubectl_command}"
logger.info(
"shadow_mode_auto_tuning",
approval_id=approval_id,
command=kubectl_command,
description=description,
executed=False,
)
print(f"\n{'='*60}")
print(log_message)
print(f"描述: {description}")
print(f"簽核單: {approval_id}")
print(f"{'='*60}\n")
return {
"executed": False,
"shadow_mode": True,
"command": kubectl_command,
"description": description,
"log": log_message,
}
else:
# 生產模式: 實際執行 (需要額外安全檢查)
logger.warning(
"auto_tuning_execution_attempted",
approval_id=approval_id,
command=kubectl_command,
message="Production execution not yet implemented - requires multi-sig approval",
)
return {
"executed": False,
"shadow_mode": False,
"command": kubectl_command,
"description": description,
"log": "Production execution requires multi-sig approval",
}
# =============================================================================
# Singleton
# =============================================================================
_openclaw: OpenClawService | None = None
def get_openclaw() -> OpenClawService:
"""取得全域 OpenClaw 實例"""
global _openclaw
if _openclaw is None:
_openclaw = OpenClawService()
return _openclaw
async def close_openclaw() -> None:
"""關閉 OpenClaw 連線"""
global _openclaw
if _openclaw:
await _openclaw.close()
_openclaw = None
# =============================================================================
# Phase 5 + SignOz Integration Complete
# =============================================================================