Files
awoooi/apps/api/src/services/openclaw.py
OG T d469a239af fix(ai): 移除 confidence 預設值,強制 LLM 真實計算
變更:
1. models/ai.py: confidence 改為 REQUIRED (移除 default=0.8)
2. openclaw.py: 如果 LLM 沒輸出 confidence,設為 0.5 + COLLAB

根本原因:
- 原本 Pydantic default=0.8 導致信心分數永遠是 80%
- 現在強制 LLM 必須計算真實信心分數

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-28 22:21:29 +08:00

1408 lines
56 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
OpenClaw AI Decision Engine - True LLM + SignOz Integration
============================================================
Phase 5: OpenClaw 實體化升級 (2026-03-21)
統帥校正: SignOz 為唯一全能視力中心
Features:
- 真實 LLM SDK 整合 (Ollama → Gemini → Claude)
- SignOz Gold Metrics 即時擷取 (P99/Error/RPS)
- AIOps Agent 專業人格 (K8s 維運 + SRE RCA 專精)
- 強制結構化 JSON 輸出 (符合 API 契約)
- 動態告警上下文注入 + SignOz 數據
- Shadow Mode 調優指令生成 (日誌輸出,不執行)
防禦性工程鐵律:
- Zero Trust: 預設不信任 LLM 輸出,必須通過 Pydantic 驗證
- Edge Case: 網路失敗、解析失敗、超時處理
- SignOz 失敗時優雅降級 (不阻塞主流程)
"""
import hashlib
import json
import random
import re
import time
from datetime import datetime
import httpx
import structlog
from src.core.config import settings
from src.core.prompts import OPENCLAW_SYSTEM_PROMPT
from src.core.redis_client import get_redis
from src.models.ai import (
OpenClawDecision,
)
from src.services.langfuse_client import langfuse_trace
from src.services.model_registry import get_model_registry
from src.services.signoz_client import GoldMetrics, get_signoz_client
from src.utils.k8s_naming import normalize_resource_name
from src.utils.timezone import now_taipei_iso
logger = structlog.get_logger(__name__)
# =============================================================================
# AIOps Agent System Prompt (professional persona + arbitration logic + SignOz data)
# =============================================================================
# Responsibility matrix: short team code -> human-readable team label.
# The labels are runtime strings and are intentionally left untranslated.
RESPONSIBILITY_MATRIX = {
    "FE": "前端團隊 (Frontend)",
    "BE": "後端團隊 (Backend)",
    "INFRA": "基礎設施團隊 (Infrastructure/SRE)",
    "DB": "資料庫團隊 (Database/DBA)",
    "COLLAB": "協同處理 (需多團隊會診)",
}
# Confidence threshold
# Intended meaning: decisions below this confidence are automatically marked COLLAB.
# NOTE(review): no use of this constant is visible in this part of the file — confirm where it is applied.
CONFIDENCE_THRESHOLD_COLLAB = 0.70
# OPENCLAW_SYSTEM_PROMPT has been moved to src/core/prompts.py (Phase 17 P2 improvement)
# =============================================================================
# LLM Analysis Result - Using Pydantic for Schema Enforcement
# =============================================================================
# We use OpenClawDecision from models/ai.py for Pydantic validation
# This alias is for backwards compatibility
LLMAnalysisResult = OpenClawDecision
# =============================================================================
# OpenClaw Service
# =============================================================================
class OpenClawService:
"""
OpenClaw AI 決策服務 - True LLM + SignOz Integration
實作 AI_FALLBACK_ORDER 備援機制:
Ollama → Gemini → Claude → Mock
新增 SignOz 整合:
- 自動擷取 Gold Metrics
- 數據驅動的 RCA 分析
- 動態 Trace URL 生成
"""
def __init__(self):
    """Create the service with a SignOz client and no HTTP client yet.

    The HTTP client slot starts empty; ``_get_client`` fills it on demand.
    """
    self._signoz = get_signoz_client()
    self._http_client: httpx.AsyncClient | None = None
async def _get_client(self) -> httpx.AsyncClient:
    """Return the shared HTTP client, creating one when none is usable.

    A new client is built when no client is stored yet or the stored one
    reports ``is_closed``.
    """
    current = self._http_client
    if current is not None and not current.is_closed:
        return current
    # 120s overall timeout, 10s for establishing the connection.
    default_timeout = httpx.Timeout(120.0, connect=10.0)
    self._http_client = httpx.AsyncClient(
        timeout=default_timeout,
    )
    return self._http_client
async def close(self) -> None:
    """Close the stored HTTP client (if there is one) and clear the slot."""
    client = self._http_client
    if not client:
        # Nothing was ever opened, or it was already released.
        return
    await client.aclose()
    self._http_client = None
# =========================================================================
# SignOz Integration
# =========================================================================
async def get_signoz_context(
    self,
    service_name: str,
    namespace: str = "default",
    alert_timestamp: datetime | None = None,
) -> tuple[GoldMetrics | None, str]:
    """
    Fetch SignOz context data for a service.

    Requests Gold Metrics with ``time_window_minutes=10`` and a trace URL
    with ``window_minutes=5`` from the SignOz client.

    Returns:
        ``(metrics, trace_url)`` on success. If any step raises, the error
        is logged as a warning and ``(None, fallback_url)`` is returned,
        where ``fallback_url`` is a static traces link for the service.
    """
    try:
        gold = await self._signoz.get_gold_metrics(
            service_name=service_name,
            namespace=namespace,
            time_window_minutes=10,
        )
        url = self._signoz.generate_trace_url(
            service_name=service_name,
            alert_timestamp=alert_timestamp,
            window_minutes=5,
        )
        logger.info(
            "signoz_context_fetched",
            service=service_name,
            rps=gold.rps,
            error_rate=gold.error_rate,
            p99_latency=gold.p99_latency_ms,
        )
    except Exception as exc:
        logger.warning(
            "signoz_context_fetch_failed",
            service=service_name,
            error=str(exc),
        )
        # Degrade: no metrics, static URL.
        return None, f"{settings.SIGNOZ_URL}/traces?service={service_name}"
    return gold, url
def generate_auto_tuning_command(
    self,
    alert_type: str,
    target_resource: str,
    namespace: str,
    metrics: GoldMetrics | None = None,
) -> dict:
    """
    Build a tuning command from the alert type and optional SignOz metrics.

    Shadow Mode: the command is only generated, never executed here.
    Phase 18.1.6: the resource name is normalized first (ADR-016).

    Returns:
        A dict with the keys ``type``, ``command`` and ``description``.
        Resources that are not K8s resources yield a ``MANUAL`` entry.
    """
    resource = normalize_resource_name(target_resource, namespace)
    if not resource.is_k8s_resource:
        # Not managed by K8s: return a hint instead of a kubectl command.
        logger.info(
            "non_k8s_resource_detected",
            original=target_resource,
            note=resource.note,
        )
        return {
            "type": "MANUAL",
            "command": f"# 非 K8s 資源: {target_resource}",
            "description": f"此資源不在 K8s 中,需人工處理。{resource.note or ''}",
        }

    # Prefer the normalized values, falling back to what the caller passed.
    resolved_name = resource.normalized or target_resource
    resolved_ns = resource.namespace or namespace
    if resource.confidence < 0.8:
        logger.warning(
            "low_confidence_resource_name",
            original=target_resource,
            resolved=resolved_name,
            confidence=resource.confidence,
        )

    # Lower-case once; every strategy check below is a substring test.
    kind = alert_type.lower()

    # "high_cpu" always contains "cpu", so one check covers both.
    if "cpu" in kind:
        if metrics and metrics.rps > 100:
            # High traffic -> HPA.
            return {
                "type": "HPA",
                "command": f"kubectl autoscale deployment {resolved_name} --cpu-percent=70 --min=2 --max=10 -n {resolved_ns}",
                "description": f"SignOz RPS={metrics.rps:.0f},配置 HPA 應對流量波動",
            }
        # Low traffic but high CPU -> raise the limit.
        return {
            "type": "RESOURCE_LIMIT",
            "command": f"kubectl set resources deployment/{resolved_name} --limits=cpu=2000m -n {resolved_ns}",
            "description": "增加 CPU limit 緩解資源競爭",
        }

    if "memory" in kind or "oom" in kind:
        return {
            "type": "RESOURCE_LIMIT",
            "command": f"kubectl set resources deployment/{resolved_name} --limits=memory=1Gi -n {resolved_ns}",
            "description": "增加 Memory limit 防止 OOM",
        }

    # "pod_crash" always contains "crash", so one check covers both.
    if "crash" in kind:
        return {
            "type": "RESTART",
            "command": f"kubectl rollout restart deployment/{resolved_name} -n {resolved_ns}",
            "description": "滾動重啟清除異常狀態",
        }

    if "latency" in kind or "slow" in kind:
        if metrics and metrics.p99_latency_ms > 500:
            # NOTE(review): kubectl scale normally takes an absolute replica
            # count; "--replicas=+2" looks like it would be rejected — confirm.
            return {
                "type": "SCALE",
                "command": f"kubectl scale deployment {resolved_name} --replicas=+2 -n {resolved_ns}",
                "description": f"SignOz P99={metrics.p99_latency_ms:.0f}ms擴容分散負載",
            }
        return {
            "type": "CACHE",
            "command": "# 檢查 Redis 連線池配置",
            "description": "建議增加緩存層減少後端壓力",
        }

    # Generic fallback: rolling restart.
    return {
        "type": "RESTART",
        "command": f"kubectl rollout restart deployment/{resolved_name} -n {resolved_ns}",
        "description": "滾動重啟恢復服務",
    }
# =========================================================================
# AI Provider Implementations - Enhanced with Structured Output
# =========================================================================
async def _call_ollama(self, prompt: str) -> tuple[str, bool]:
    """
    Call the local Ollama server in JSON mode.

    Returns:
        ``(response_text, True)`` on success, otherwise
        ``(error_message, False)``. Timeouts are reported with a
        ``"Timeout: "`` prefix; no exception escapes this method.
    """
    try:
        http = await self._get_client()
        logger.info(
            "ollama_request_start",
            url=f"{settings.OLLAMA_URL}/api/generate",
            prompt_length=len(prompt),
        )
        # Model name and sampling options come from the ModelRegistry.
        registry = get_model_registry()
        model_name = registry.get_model("ollama", "rca")
        provider_opts = registry.get_provider_options("ollama")
        payload = {
            "model": model_name,
            "prompt": prompt,
            "stream": False,
            "format": "json",  # force JSON output
            "options": {
                "num_predict": provider_opts.get("num_predict", 1024),
                "temperature": provider_opts.get("temperature", 0.1),
                "top_p": provider_opts.get("top_p", 0.9),
            },
        }
        reply = await http.post(
            f"{settings.OLLAMA_URL}/api/generate",
            json=payload,
            timeout=httpx.Timeout(float(settings.OPENCLAW_TIMEOUT), connect=10.0),
        )
        logger.info(
            "ollama_response_received",
            status_code=reply.status_code,
        )
        reply.raise_for_status()
        generated = reply.json().get("response", "")
        logger.info(
            "ollama_response_parsed",
            response_length=len(generated),
        )
        return generated, True
    except httpx.TimeoutException as exc:
        logger.warning("ollama_timeout", error=str(exc))
        return f"Timeout: {exc}", False
    except Exception as exc:
        logger.warning(
            "ollama_call_failed",
            error=str(exc),
            error_type=type(exc).__name__,
        )
        return str(exc), False
async def _call_gemini(self, prompt: str) -> tuple[str, bool, int, float]:
    """
    Call Google Gemini in JSON mode.

    The API key is sent in the ``x-goog-api-key`` request header rather than
    in the URL query string. HTTP errors raised by the client carry the
    request URL in their message, and that message is logged and returned to
    the caller below, so a key embedded in the URL would leak.

    Returns:
        tuple: (response_text, success, total_tokens, cost_usd)
        - response_text: LLM response text, or the error message on failure
        - success: whether the call succeeded
        - total_tokens: total tokens reported by the API (0 on failure)
        - cost_usd: estimated cost in USD (0.0 on failure)

    2026-03-29 ogt: added Token/Cost tracking
    """
    if not settings.GEMINI_API_KEY:
        return "GEMINI_API_KEY not configured", False, 0, 0.0
    try:
        client = await self._get_client()
        # Model name comes from the ModelRegistry.
        registry = get_model_registry()
        model_name = registry.get_model("gemini", "rca")
        response = await client.post(
            f"https://generativelanguage.googleapis.com/v1beta/models/{model_name}:generateContent",
            # Keep the secret out of the URL so it cannot end up in error text.
            headers={"x-goog-api-key": settings.GEMINI_API_KEY},
            json={
                "contents": [{"parts": [{"text": prompt}]}],
                "generationConfig": {
                    "temperature": 0.1,
                    "maxOutputTokens": 2048,
                    "responseMimeType": "application/json",  # force JSON output
                },
            },
            timeout=30.0,
        )
        response.raise_for_status()
        data = response.json()
        text = data["candidates"][0]["content"]["parts"][0]["text"]
        # 2026-03-29 ogt: capture token usage
        usage_metadata = data.get("usageMetadata", {})
        prompt_tokens = usage_metadata.get("promptTokenCount", 0)
        completion_tokens = usage_metadata.get("candidatesTokenCount", 0)
        total_tokens = usage_metadata.get("totalTokenCount", prompt_tokens + completion_tokens)
        # Gemini 1.5 Flash pricing (per 1M tokens)
        # Input: $0.075 / 1M, Output: $0.30 / 1M
        # NOTE(review): the model name comes from the registry, so these
        # rates only hold if the registry resolves to that model — confirm.
        cost_usd = (prompt_tokens * 0.000000075) + (completion_tokens * 0.0000003)
        logger.info(
            "gemini_response_received",
            response_length=len(text),
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=total_tokens,
            cost_usd=f"${cost_usd:.6f}",
        )
        return text, True, total_tokens, cost_usd
    except Exception as e:
        logger.warning("gemini_call_failed", error=str(e))
        return str(e), False, 0, 0.0
async def _call_claude(self, prompt: str) -> tuple[str, bool]:
    """
    Call Anthropic Claude, forcing structured output through Tool Use.

    Returns:
        ``(json_text, True)`` when a ``submit_analysis`` tool call is present,
        ``(text, True)`` when only a text block is present, and
        ``(error_message, False)`` when the key is missing, no usable block
        exists, or the request raises.
    """
    if not settings.CLAUDE_API_KEY:
        return "CLAUDE_API_KEY not configured", False
    try:
        http = await self._get_client()

        # Schema of the single tool Claude is required to call.
        blast_radius_schema = {
            "type": "object",
            "properties": {
                "affected_pods": {"type": "integer"},
                "estimated_downtime": {"type": "string"},
                "related_services": {"type": "array", "items": {"type": "string"}},
                "data_impact": {"type": "string", "enum": ["NONE", "READ_ONLY", "WRITE", "DESTRUCTIVE"]}
            },
            "required": ["affected_pods", "estimated_downtime", "related_services", "data_impact"]
        }
        analysis_tool = {
            "name": "submit_analysis",
            "description": "Submit the RCA analysis result in structured format",
            "input_schema": {
                "type": "object",
                "properties": {
                    "action_title": {"type": "string"},
                    "description": {"type": "string"},
                    "suggested_action": {"type": "string", "enum": ["RESTART_DEPLOYMENT", "DELETE_POD", "SCALE_DEPLOYMENT", "NO_ACTION"]},
                    "kubectl_command": {"type": "string"},
                    "target_resource": {"type": "string"},
                    "namespace": {"type": "string"},
                    "risk_level": {"type": "string", "enum": ["low", "medium", "critical"]},
                    "blast_radius": blast_radius_schema,
                    "reasoning": {"type": "string"},
                    "deviation_analysis": {"type": "string"},
                    "confidence": {"type": "number"},
                    "affected_services": {"type": "array", "items": {"type": "string"}}
                },
                "required": ["action_title", "description", "suggested_action", "kubectl_command", "target_resource", "namespace", "risk_level", "blast_radius", "reasoning", "confidence"]
            }
        }
        request_headers = {
            "x-api-key": settings.CLAUDE_API_KEY,
            "anthropic-version": "2023-06-01",
            "content-type": "application/json",
        }
        request_body = {
            "model": "claude-3-haiku-20240307",
            "max_tokens": 2048,
            "messages": [{"role": "user", "content": prompt}],
            "tools": [analysis_tool],
            "tool_choice": {"type": "tool", "name": "submit_analysis"},
        }

        reply = await http.post(
            "https://api.anthropic.com/v1/messages",
            headers=request_headers,
            json=request_body,
            timeout=30.0,
        )
        reply.raise_for_status()
        data = reply.json()

        # Preferred: the structured input of the forced tool call.
        tool_block = next(
            (
                blk
                for blk in data.get("content", [])
                if blk.get("type") == "tool_use" and blk.get("name") == "submit_analysis"
            ),
            None,
        )
        if tool_block is not None:
            tool_input = tool_block.get("input", {})
            logger.info("claude_tool_use_response", input_keys=list(tool_input.keys()))
            return json.dumps(tool_input), True

        # Fallback: the first plain text block.
        text_block = next(
            (blk for blk in data.get("content", []) if blk.get("type") == "text"),
            None,
        )
        if text_block is not None:
            return text_block.get("text", ""), True

        return "No valid response from Claude", False
    except Exception as exc:
        logger.warning("claude_call_failed", error=str(exc))
        return str(exc), False
# =========================================================================
# Mock LLM - Intelligent Fallback with SignOz Data
# =========================================================================
def _generate_mock_response(
    self,
    alert_context: dict,
    signoz_metrics: GoldMetrics | None = None,
) -> str:
    """
    Mock LLM response generator - intelligent degradation (v7.0 with SignOz).

    Builds a canned RCA result chosen by the alert type / message, enriched
    with SignOz data when available, and returns it serialized as JSON.

    Args:
        alert_context: Alert dict; the keys read here are ``alert_type``,
            ``severity``, ``target_resource``, ``namespace``, ``message``
            and ``metrics``.
        signoz_metrics: Optional SignOz Gold Metrics used for the summary
            and correlation strings and for the tuning command.

    Returns:
        A JSON string with the mock decision.

    Note:
        This method blocks for 0.3-0.8 seconds via ``time.sleep``.
    """
    time.sleep(random.uniform(0.3, 0.8))  # simulate thinking latency (blocking sleep)
    alert_type = alert_context.get("alert_type", "custom")
    severity = alert_context.get("severity", "warning")
    raw_target = alert_context.get("target_resource", "unknown-service")
    raw_namespace = alert_context.get("namespace", "default")
    message = alert_context.get("message", "")
    metrics = alert_context.get("metrics", {})
    # Phase 18.1: normalize the resource name (ADR-016)
    # so that kubectl commands use a valid K8s name
    normalized = normalize_resource_name(raw_target, raw_namespace)
    if normalized.is_k8s_resource and normalized.normalized:
        target = normalized.normalized
        namespace = normalized.namespace or raw_namespace
        logger.info(
            "mock_response_resource_normalized",
            original=raw_target,
            normalized=target,
            namespace=namespace,
        )
    else:
        # Not a K8s resource (or nothing normalized): keep the raw values.
        target = raw_target
        namespace = raw_namespace
    # SignOz data integration; the placeholder text is used when no metrics exist
    signoz_summary = ""
    signoz_correlation = "SignOz 數據擷取中..."
    if signoz_metrics:
        signoz_summary = signoz_metrics.to_summary()
        signoz_correlation = (
            f"RPS={signoz_metrics.rps:.1f} ({signoz_metrics.rps_trend}), "
            f"Error={signoz_metrics.error_rate:.2f}%, "
            f"P99={signoz_metrics.p99_latency_ms:.0f}ms"
        )
    # Generate the tuning command (computed up front; only some branches use it)
    tuning = self.generate_auto_tuning_command(
        alert_type=alert_type,
        target_resource=target,
        namespace=namespace,
        metrics=signoz_metrics,
    )
    # Build a professional RCA + arbitration result by alert type
    if "oom" in message.lower() or "memory" in alert_type.lower():
        mock_response = {
            "action_title": f"刪除異常 Pod {target} (OOMKilled)",
            "description": f"🤖 AI 仲裁: {target} 發生 OOMKilled根因為 JVM Heap 配置與 K8s memory limit 不匹配或存在記憶體洩漏。{signoz_summary}",
            "suggested_action": "DELETE_POD",
            "kubectl_command": f"kubectl delete pod {target} -n {namespace}",
            "target_resource": target,
            "namespace": namespace,
            "risk_level": "critical" if severity == "critical" else "medium",
            "blast_radius": {
                "affected_pods": 1,
                "estimated_downtime": "~30s",
                "related_services": ["api-gateway", "downstream-service"],
                "data_impact": "NONE"
            },
            "primary_responsibility": "BE",
            "responsibility_reasoning": "OOMKilled 通常源於應用程式記憶體配置不當,屬後端團隊責任範圍",
            "secondary_teams": ["INFRA"],
            "optimization_suggestions": [
                {
                    "type": "RESOURCE_LIMIT",
                    "description": "調整 memory limit 至 1Gi 並確保 JVM -Xmx 不超過 70%",
                    # rsplit('-', 2)[0] drops the last two dash-separated
                    # segments of the target to form the deployment name
                    "kubectl_or_config": f"kubectl set resources deployment/{target.rsplit('-', 2)[0]} -c {target.rsplit('-', 2)[0]} --limits=memory=1Gi -n {namespace}"
                },
                {
                    "type": "HPA",
                    "description": "啟用基於記憶體的 HPA 自動擴展",
                    "kubectl_or_config": f"kubectl autoscale deployment {target.rsplit('-', 2)[0]} --memory-percent=80 --min=2 --max=5 -n {namespace}"
                }
            ],
            "reasoning": f"🤖 Pod OOMKilled 後 ReplicaSet 將自動重建,但需同步修正資源配置防止復發。{signoz_correlation}",
            "deviation_analysis": f"Memory 使用率 {metrics.get('memory_percent', 99)}%,超出基準線 60% 達 +6.5σ",
            "confidence": 0.88,
            "affected_services": [target, "api-gateway"],
            "signoz_correlation": signoz_correlation,
        }
    elif "cpu" in alert_type.lower() or "high_cpu" in alert_type:
        # Adjust the strategy according to the SignOz RPS
        rps_context = ""
        if signoz_metrics and signoz_metrics.rps > 50:
            rps_context = f"SignOz 顯示 RPS={signoz_metrics.rps:.0f},流量較高,建議配置 HPA。"
        mock_response = {
            "action_title": f"擴展 {target} 副本數 + 啟用 HPA",
            "description": f"🤖 AI 仲裁: {target} CPU 使用率過高,根因為流量突增或計算密集任務未配置自動擴展。{rps_context}",
            "suggested_action": "SCALE_DEPLOYMENT",
            "kubectl_command": tuning["command"],
            "target_resource": target,
            "namespace": namespace,
            "risk_level": "medium",
            "blast_radius": {
                "affected_pods": 0,
                "estimated_downtime": "0",
                "related_services": [],
                "data_impact": "NONE"
            },
            "primary_responsibility": "INFRA",
            "responsibility_reasoning": "自動擴展策略未配置或閾值過高,屬基礎設施團隊責任",
            "secondary_teams": ["BE"],
            "optimization_suggestions": [
                {
                    "type": tuning["type"],
                    "description": tuning["description"],
                    "kubectl_or_config": tuning["command"],
                },
                {
                    "type": "RESOURCE_LIMIT",
                    "description": "增加 CPU request 確保 QoS 為 Guaranteed",
                    "kubectl_or_config": f"kubectl set resources deployment/{target} --requests=cpu=500m --limits=cpu=2000m -n {namespace}"
                }
            ],
            "reasoning": f"🤖 水平擴展可即時分散負載,同時建議配置 HPA 防止復發。{signoz_correlation}",
            "deviation_analysis": f"CPU 使用率 {metrics.get('cpu_percent', 95)}%,超出基準線 50% 達 +4.5σ",
            "confidence": 0.92,
            "affected_services": [target],
            "signoz_correlation": signoz_correlation,
        }
    elif "http" in alert_type.lower() or "5xx" in message.lower() or "502" in message.lower():
        mock_response = {
            "action_title": f"重啟 {target} + 檢查上游服務",
            "description": f"🤖 AI 仲裁: {target} 產生 HTTP 5xx 錯誤,可能為應用程式例外或上游服務不可達。{signoz_summary}",
            "suggested_action": "RESTART_DEPLOYMENT",
            "kubectl_command": f"kubectl rollout restart deployment/{target} -n {namespace}",
            "target_resource": target,
            "namespace": namespace,
            "risk_level": "critical",
            "blast_radius": {
                "affected_pods": 3,
                "estimated_downtime": "~1 min",
                "related_services": ["nginx-ingress", "upstream-api"],
                "data_impact": "NONE"
            },
            "primary_responsibility": "COLLAB",
            "responsibility_reasoning": "HTTP 5xx 可能源於前端路由、後端邏輯或基礎設施,需多團隊協同排查",
            "secondary_teams": ["FE", "BE", "INFRA"],
            "optimization_suggestions": [
                {
                    "type": "CIRCUIT_BREAKER",
                    "description": "配置熔斷器防止故障擴散",
                    "kubectl_or_config": "# Istio VirtualService outlierDetection 配置"
                },
                {
                    "type": "CACHE",
                    "description": "增加 Redis 緩存減少上游壓力",
                    "kubectl_or_config": "# 檢查 Redis 連線池配置,建議 maxTotal=50"
                }
            ],
            "reasoning": f"🤖 HTTP 錯誤需協同排查,先重啟恢復服務同時通知相關團隊。{signoz_correlation}",
            "deviation_analysis": "錯誤率 5%,超出基準線 0.1% 達 +50σ",
            "confidence": 0.65,
            "affected_services": [target, "nginx-ingress", "upstream-api"],
            "signoz_correlation": signoz_correlation,
        }
    else:
        # Generic anomaly handling
        mock_response = {
            "action_title": f"重新啟動 {target} 服務",
            "description": f"🤖 AI 仲裁: {target} 發生異常: {message[:80]}。需進一步診斷確認根因。{signoz_summary}",
            "suggested_action": "RESTART_DEPLOYMENT",
            "kubectl_command": f"kubectl rollout restart deployment/{target} -n {namespace}",
            "target_resource": target,
            "namespace": namespace,
            "risk_level": "critical" if severity == "critical" else "medium",
            "blast_radius": {
                "affected_pods": 3,
                "estimated_downtime": "~1 min",
                "related_services": ["dependent-services"],
                "data_impact": "NONE"
            },
            "primary_responsibility": "COLLAB",
            "responsibility_reasoning": "告警資訊不足以判定單一責任團隊,建議多團隊協同排查",
            "secondary_teams": ["BE", "INFRA"],
            "optimization_suggestions": [
                {
                    "type": tuning["type"],
                    "description": tuning["description"],
                    "kubectl_or_config": tuning["command"],
                }
            ],
            "reasoning": f"🤖 根據告警 {alert_type} 先重啟恢復服務,同時安排深入診斷。{signoz_correlation}",
            "deviation_analysis": "監控指標顯示異常偏離基準線",
            "confidence": 0.70,
            "affected_services": [target],
            "signoz_correlation": signoz_correlation,
        }
    logger.info(
        "mock_llm_response_generated",
        action_title=mock_response["action_title"],
        risk_level=mock_response["risk_level"],
        primary_responsibility=mock_response["primary_responsibility"],
        confidence=mock_response["confidence"],
        signoz_integrated=signoz_metrics is not None,
        is_mock=True,
    )
    return json.dumps(mock_response)
# =========================================================================
# LLM Cache Layer (憲法要求: 嚴禁無快取裸奔)
# =========================================================================
def _generate_cache_key(self, prompt: str, context_hash: str = "") -> str:
"""
生成 LLM 快取鍵
使用 prompt 內容的 SHA256 作為快取鍵,確保相同問題不重複呼叫 LLM
"""
content = f"{prompt}:{context_hash}"
hash_digest = hashlib.sha256(content.encode()).hexdigest()[:16]
return f"llm_cache:{hash_digest}"
async def _call_with_cache(
    self,
    prompt: str,
    alert_context: dict | None = None,
    signoz_metrics: GoldMetrics | None = None,
    cache_ttl: int = 3600,  # 1 hour default
) -> tuple[str, str, bool, bool, int, float]:
    """
    LLM call wrapper with a Redis cache in front of the fallback chain.

    Project rule: LLM calls must be cache-protected to save compute.

    Mock output (provider ``"mock"`` or ``"mock_fallback"``) is never written
    to the cache. It costs no compute to regenerate, and caching it would
    keep serving the degraded answer for ``cache_ttl`` seconds even after a
    real provider becomes available again.

    Args:
        prompt: LLM prompt
        alert_context: alert context; ``alert_type`` and ``target_resource``
            are folded into the cache key
        signoz_metrics: SignOz metrics, passed through to the fallback chain
        cache_ttl: cache lifetime in seconds

    Returns:
        (response, provider, success, from_cache, total_tokens, cost_usd)
        On a cache hit the provider is suffixed with ``_cached`` and
        tokens/cost are reported as 0.

    2026-03-29 ogt: added Token/Cost tracking
    """
    # Build the cache key from the prompt plus a small alert fingerprint.
    context_hash = ""
    if alert_context:
        context_hash = f"{alert_context.get('alert_type', '')}:{alert_context.get('target_resource', '')}"
    cache_key = self._generate_cache_key(prompt, context_hash)

    # 1. Try the cache. Any failure (Redis down, malformed entry) is logged
    #    and treated as a miss.
    try:
        redis_client = get_redis()
        cached = await redis_client.get(cache_key)
        if cached:
            cached_data = json.loads(cached)
            logger.info(
                "llm_cache_hit",
                cache_key=cache_key[:20],
                provider=cached_data.get("provider", "cached"),
            )
            return (
                cached_data["response"],
                f"{cached_data['provider']}_cached",
                True,
                True,  # from_cache
                0,  # tokens (cache hit, no new tokens)
                0.0,  # cost (cache hit, no cost)
            )
    except Exception as e:
        logger.warning("llm_cache_read_failed", error=str(e))

    # 2. Cache miss - call the LLM fallback chain.
    logger.info("llm_cache_miss", cache_key=cache_key[:20])
    response, provider, success, total_tokens, cost_usd = await self._call_with_fallback(
        prompt, alert_context, signoz_metrics
    )

    # 3. Cache only answers produced by a real provider.
    is_mock_output = provider in ("mock", "mock_fallback")
    if success and is_mock_output:
        logger.debug("llm_cache_skip_mock", cache_key=cache_key[:20], provider=provider)
    elif success:
        try:
            redis_client = get_redis()
            cache_data = {
                "response": response,
                "provider": provider,
                "cached_at": now_taipei_iso(),
            }
            await redis_client.set(
                cache_key,
                json.dumps(cache_data, ensure_ascii=False),
                ex=cache_ttl,
            )
            logger.info(
                "llm_cache_write",
                cache_key=cache_key[:20],
                provider=provider,
                ttl=cache_ttl,
            )
        except Exception as e:
            # Best effort: a failed write must not fail the analysis.
            logger.warning("llm_cache_write_failed", error=str(e))
    return response, provider, success, False, total_tokens, cost_usd  # from_cache=False
# =========================================================================
# Public LLM Interface (ILLMProvider Protocol)
# =========================================================================
async def call(self, prompt: str) -> tuple[str, str, bool]:
    """
    Call the LLM (ILLMProvider Protocol implementation).

    Used by the #39 Error Analyzer Agent.

    ``_call_with_fallback`` returns five values (it also reports token usage
    and cost). This method keeps the documented three-value contract by
    dropping the usage figures, so callers can unpack exactly three items.

    Args:
        prompt: the complete prompt

    Returns:
        (response, provider, success)
    """
    response, provider, success, *_usage = await self._call_with_fallback(prompt)
    return response, provider, success
# =========================================================================
# Fallback Chain
# =========================================================================
async def _call_with_fallback(
    self,
    prompt: str,
    alert_context: dict | None = None,
    signoz_metrics: GoldMetrics | None = None,
) -> tuple[str, str, bool, int, float]:
    """
    Call the AI providers in ``AI_FALLBACK_ORDER`` until one succeeds.

    If ``MOCK_MODE`` is on, the mock result is returned directly.
    If every provider fails or is skipped, the mock is used as a fallback.

    The mock generator is synchronous and sleeps, so it is run in a worker
    thread to avoid stalling the event loop.

    Args:
        prompt: the complete prompt sent to each provider
        alert_context: alert context, used by the mock and for trace metadata
        signoz_metrics: SignOz metrics, used by the mock

    Returns:
        tuple: (response, provider, success, total_tokens, cost_usd)
        ``provider`` is ``"mock"`` in mock mode and ``"mock_fallback"`` when
        all providers failed; tokens/cost are non-zero only for Gemini.

    Phase 15.1: Langfuse LLMOps tracing
    2026-03-29 ogt: added Token/Cost tracking
    """
    # Function-scope import: only this method needs asyncio.
    import asyncio

    # Mock Mode: for development and testing
    if settings.MOCK_MODE:
        logger.info("mock_mode_enabled", using="mock_llm")
        mock_text = await asyncio.to_thread(
            self._generate_mock_response, alert_context or {}, signoz_metrics
        )
        return mock_text, "mock", True, 0, 0.0

    # Phase 15.1 + 15.3: Langfuse tracing + SignOz deep linking
    with langfuse_trace(
        "openclaw_fallback_chain",
        metadata={
            "prompt_length": len(prompt),
            "fallback_order": settings.AI_FALLBACK_ORDER,
            "alert_fingerprint": (alert_context or {}).get("fingerprint", "unknown"),
        },
    ) as trace:
        # Phase 15.3: SignOz -> Langfuse back-link.
        # Record the Langfuse trace id on the current OTEL span.
        if trace.langfuse_trace_id:
            from opentelemetry import trace as otel_trace
            from src.core.deep_linking import DeepLinking
            current_span = otel_trace.get_current_span()
            if current_span:
                current_span.set_attribute("langfuse.trace_id", trace.langfuse_trace_id)
                current_span.set_attribute(
                    "langfuse.trace_url",
                    DeepLinking.langfuse_trace_url(trace.langfuse_trace_id),
                )

        # Phase 13.2: rate limiter (2026-03-26).
        # Guards cloud API usage; a provider over its limit is skipped.
        from src.services.ai_rate_limiter import get_ai_rate_limiter
        rate_limiter = get_ai_rate_limiter()

        for provider in settings.AI_FALLBACK_ORDER:
            # Rate limit check (gemini/claude only; ollama is unlimited)
            if provider in ("gemini", "claude"):
                allowed, reason = await rate_limiter.check_and_increment(provider)
                if not allowed:
                    logger.warning(
                        "ai_rate_limit_skip",
                        provider=provider,
                        reason=reason,
                    )
                    continue  # skip this provider, try the next one

            logger.info("ai_provider_attempt", provider=provider)

            # Reject unknown providers before the registry lookup, so a bad
            # AI_FALLBACK_ORDER entry is skipped rather than looked up.
            if provider not in ("ollama", "gemini", "claude"):
                logger.warning("unknown_ai_provider", provider=provider)
                continue

            model_name = self._get_model_name(provider)

            # 2026-03-29 ogt: Gemini returns 4 values (with token/cost);
            # the other providers report 0.
            total_tokens = 0
            cost_usd = 0.0
            # Monotonic clock: elapsed time is unaffected by wall-clock changes.
            start_time = time.perf_counter()
            if provider == "ollama":
                response, success = await self._call_ollama(prompt)
            elif provider == "gemini":
                response, success, total_tokens, cost_usd = await self._call_gemini(prompt)
            else:
                response, success = await self._call_claude(prompt)
            latency_ms = (time.perf_counter() - start_time) * 1000

            # Langfuse: record every LLM call
            trace.generation(
                name=f"{provider}_call",
                model=model_name,
                input=prompt[:500],  # truncated to keep the trace small
                output=response[:500] if success else f"ERROR: {response[:200]}",
                metadata={
                    "success": success,
                    "latency_ms": round(latency_ms, 2),
                    "provider": provider,
                    "total_tokens": total_tokens,
                    "cost_usd": cost_usd,
                },
            )

            if success:
                logger.info(
                    "ai_provider_success",
                    provider=provider,
                    latency_ms=latency_ms,
                    total_tokens=total_tokens,
                    cost_usd=f"${cost_usd:.6f}",
                )
                # Langfuse: record the success score
                trace.score(name="provider_success", value=1.0, comment=f"Success via {provider}")
                return response, provider, True, total_tokens, cost_usd

            logger.warning("ai_provider_failed_fallback", provider=provider, latency_ms=latency_ms)

        # Every provider failed or was skipped: fall back to the mock.
        logger.warning("all_providers_failed_using_mock", fallback="mock_llm")
        trace.score(name="provider_success", value=0.0, comment="All providers failed, using mock")
        mock_text = await asyncio.to_thread(
            self._generate_mock_response, alert_context or {}, signoz_metrics
        )
        return mock_text, "mock_fallback", True, 0, 0.0
def _get_model_name(self, provider: str) -> str:
    """Look up the ``rca`` model name for ``provider`` in the ModelRegistry."""
    return get_model_registry().get_model(provider, "rca")
# =========================================================================
# Response Parsing (防禦性解析)
# =========================================================================
def _extract_json_from_response(self, text: str) -> str | None:
"""從 LLM 回應中提取 JSON"""
# 嘗試直接解析
try:
json.loads(text)
return text
except json.JSONDecodeError:
pass
# 嘗試從 markdown code block 提取
patterns = [
r"```json\s*([\s\S]*?)\s*```",
r"```\s*([\s\S]*?)\s*```",
r"\{[\s\S]*\}",
]
for pattern in patterns:
match = re.search(pattern, text)
if match:
candidate = match.group(1) if "```" in pattern else match.group(0)
try:
json.loads(candidate)
return candidate
except json.JSONDecodeError:
continue
return None
def _parse_analysis_result(self, raw_response: str) -> OpenClawDecision | None:
    """
    Parse the LLM analysis result with Pydantic schema enforcement.

    Missing fields are filled with defaults before validation. A missing or
    non-numeric ``confidence`` is replaced by 0.5 and the decision is forced
    to ``COLLAB``.

    Returns:
        The validated ``OpenClawDecision``, or ``None`` when no JSON could be
        extracted or anything in the fill/validate step raised.
    """
    json_str = self._extract_json_from_response(raw_response)
    if not json_str:
        logger.error("json_extraction_failed", raw_response=raw_response[:200])
        return None
    try:
        data = json.loads(json_str)

        # Step 1: make sure blast_radius exists as a dict with its required keys.
        if "blast_radius" in data and isinstance(data["blast_radius"], dict):
            blast = data["blast_radius"]
            blast.setdefault("affected_pods", 1)
            blast.setdefault("estimated_downtime", "~30s")
            blast.setdefault("related_services", data.get("affected_services", []))
            blast.setdefault("data_impact", "NONE")
        else:
            data["blast_radius"] = {
                "affected_pods": 1,
                "estimated_downtime": "~30s",
                "related_services": data.get("affected_services", []),
                "data_impact": "NONE"
            }

        # Step 2: fill the remaining fields when absent.
        data.setdefault("action_title", data.get("action", "未知操作"))
        data.setdefault("target_resource", "unknown")
        data.setdefault("suggested_action", "NO_ACTION")

        # Step 2.5: 2026-03-29 ogt - confidence must come from the LLM.
        # Without a numeric value, use a low score and force collaboration.
        if not isinstance(data.get("confidence"), (int, float)):
            logger.warning(
                "llm_missing_confidence",
                raw_confidence=data.get("confidence"),
                forcing_collab=True,
            )
            data["confidence"] = 0.5  # low confidence score
            data["primary_responsibility"] = "COLLAB"  # force collaborative handling

        # Step 3: Pydantic validation of the completed payload.
        decision = OpenClawDecision(**data)
        logger.info(
            "pydantic_validation_success",
            action_title=decision.action_title,
            risk_level=decision.risk_level.value,
            blast_radius_pods=decision.blast_radius.affected_pods,
        )
        return decision
    except Exception as exc:
        logger.error(
            "pydantic_validation_failed",
            error=str(exc),
            json_str=json_str[:300],
        )
        return None
# =========================================================================
# Main Analysis Methods
# =========================================================================
async def analyze_alert(
    self,
    alert_context: dict,
) -> tuple[LLMAnalysisResult | None, str, str, GoldMetrics | None, str, int, float]:
    """
    Analyze an alert and produce an RCA result (with SignOz integration).

    Args:
        alert_context: alert context (alert_type, severity, target_resource, etc.)

    Returns:
        (analysis_result, ai_provider, raw_response, signoz_metrics,
        signoz_trace_url, total_tokens, cost_usd). ``analysis_result`` is
        ``None`` when the LLM call failed or its output could not be parsed.

    2026-03-29 ogt: added Token/Cost tracking
    """
    # Step 0: fetch SignOz context for the alert's target.
    signoz_metrics, signoz_trace_url = await self.get_signoz_context(
        service_name=alert_context.get("target_resource", "unknown"),
        namespace=alert_context.get("namespace", "default"),
    )

    # Add the SignOz data to the prompt when it is available.
    signoz_context = ""
    if signoz_metrics:
        signoz_context = f"""
## 📊 SignOz Real-time Metrics (Last 10 min)
{signoz_metrics.to_summary()}
Trace URL: {signoz_trace_url}
"""

    # Format the alert into the prompt.
    alert_json = json.dumps(alert_context, ensure_ascii=False, indent=2)
    prompt_parts = (OPENCLAW_SYSTEM_PROMPT, signoz_context, "\n\n## Alert Data:\n", alert_json)
    full_prompt = "".join(prompt_parts)

    logger.info(
        "openclaw_alert_analysis_start",
        alert_type=alert_context.get("alert_type"),
        target=alert_context.get("target_resource"),
        signoz_available=signoz_metrics is not None,
    )

    # Call the LLM through the cache layer (30 min TTL for alert analysis).
    llm_outcome = await self._call_with_cache(
        full_prompt,
        alert_context,
        signoz_metrics,
        cache_ttl=1800,
    )
    raw_response, provider, success, from_cache, total_tokens, cost_usd = llm_outcome

    if not success:
        logger.error("openclaw_all_providers_failed")
        return None, provider, raw_response, signoz_metrics, signoz_trace_url, 0, 0.0

    if from_cache:
        logger.info("openclaw_using_cached_response", provider=provider)
    logger.info(
        "openclaw_llm_response_received",
        provider=provider,
        response_length=len(raw_response),
    )

    # Parse the result.
    result = self._parse_analysis_result(raw_response)
    if not result:
        logger.warning(
            "openclaw_analysis_parse_failed",
            raw_response=raw_response[:300],
        )
    else:
        logger.info(
            "openclaw_analysis_complete",
            action_title=result.action_title,
            risk_level=result.risk_level,
            confidence=result.confidence,
            provider=provider,
            signoz_integrated=signoz_metrics is not None,
        )
    return result, provider, raw_response, signoz_metrics, signoz_trace_url, total_tokens, cost_usd
# =========================================================================
# Phase 6.4: LLM Proposal Generation
# =========================================================================
async def generate_incident_proposal(
    self,
    incident_id: str,
    severity: str,
    signals: list[dict],
    affected_services: list[str],
    expert_context: dict | None = None,
) -> tuple[dict | None, str, bool]:
    """
    Generate an LLM-based remediation proposal for an incident.

    Phase 6.4: gives the engine the ability to propose a solution.
    2026-03-27: integrates the Expert System diagnosis context.

    Args:
        incident_id: Incident ID.
        severity: Severity label (P0/P1/P2/P3).
        signals: Alert signals correlated with the incident. Only the first
            10 are summarized in the prompt; the total count is still reported.
        affected_services: Affected services. The first entry is used as the
            target for the SignOz lookup; "unknown-service" is used when empty.
        expert_context: Optional preliminary Expert System diagnosis. Keys
            read here:
            - initial_diagnosis: matched rule
            - diagnosis_description: diagnosis text
            - suggested_diagnosis_commands: suggested diagnostic commands
            - expert_confidence: confidence score (formatted as a percentage)
            - requires_human_review: whether human intervention is needed
            - diagnosis_context: full diagnosis text (ADR-030, optional)
            - diagnosis_signals: diagnosis signals (ADR-030, first 5 used)

    Returns:
        (proposal_dict, provider, success)
        proposal_dict is None when the LLM call fails or its response cannot
        be parsed; otherwise it contains:
        - action / description / kubectl_command
        - target_resource / namespace
        - risk_level / reasoning / confidence / primary_responsibility
        - optimization_suggestions (list of dicts)
        - signoz_correlation / from_cache / provider
    """
    # Summarize the alert signals for the prompt (at most 10 entries).
    signal_summary = "\n".join(
        f"- {s.get('alert_name', 'unknown')}: {s.get('description', 'N/A')}"
        for s in signals[:10]
    )
    target = affected_services[0] if affected_services else "unknown-service"

    # Fetch SignOz metrics for the primary affected service.
    signoz_metrics, signoz_trace_url = await self.get_signoz_context(
        service_name=target,
        namespace="awoooi-prod",
    )
    signoz_context = ""
    if signoz_metrics:
        signoz_context = f"""
## 📊 SignOz Real-time Metrics
{signoz_metrics.to_summary()}
"""

    # 2026-03-27: integrate the Expert System diagnosis context.
    # 2026-03-26: ADR-030 Phase 2 - add the K8s/SignOz diagnosis context.
    expert_diagnosis_context = ""
    if expert_context:
        diagnosis_cmds = expert_context.get("suggested_diagnosis_commands", [])
        diagnosis_cmds_str = "\n".join([f" - `{cmd}`" for cmd in diagnosis_cmds]) if diagnosis_cmds else " - (無)"

        # ADR-030: include the full diagnosis context when it is available.
        full_diagnosis = expert_context.get("diagnosis_context", "")
        diagnosis_signals = expert_context.get("diagnosis_signals", [])
        signals_summary = ""
        if diagnosis_signals:
            signals_summary = "\n".join([
                f" - [{s.get('severity', 'info').upper()}] {s.get('source', 'unknown')}: {s.get('message', 'N/A')[:100]}"
                for s in diagnosis_signals[:5]
            ])

        # The deep-diagnosis section is only rendered when a full diagnosis
        # exists; it is built separately instead of as a nested f-string.
        deep_diagnosis_section = ""
        if full_diagnosis:
            deep_diagnosis_section = f"""## 🩺 K8s/SignOz Deep Diagnosis (ADR-030)
{full_diagnosis}
### Diagnosis Signals
{signals_summary if signals_summary else " - (No signals detected)"}
"""

        expert_diagnosis_context = f"""
## 🔍 Expert System Initial Diagnosis
- **Matched Rule**: {expert_context.get('initial_diagnosis', 'unknown')}
- **Diagnosis**: {expert_context.get('diagnosis_description', 'N/A')}
- **Confidence**: {expert_context.get('expert_confidence', 0.5):.0%}
- **Requires Human Review**: {'Yes' if expert_context.get('requires_human_review') else 'No'}
- **Suggested Diagnosis Commands**:
{diagnosis_cmds_str}
{deep_diagnosis_section}
**IMPORTANT**: The Expert System and Diagnostic Aggregator have provided context.
Consider this data but apply your own analysis. If Expert says "human review required",
provide diagnostic guidance rather than automated fixes.
"""

    proposal_prompt = f"""{OPENCLAW_SYSTEM_PROMPT}
{signoz_context}
{expert_diagnosis_context}
## 🚨 Incident Context
- **Incident ID**: {incident_id}
- **Severity**: {severity}
- **Affected Services**: {', '.join(affected_services)}
- **Signal Count**: {len(signals)}
## 📋 Alert Signals
{signal_summary}
## 🎯 Your Task
Based on the above incident, signals, and Expert System diagnosis, generate a remediation proposal.
You MUST respond with ONLY valid JSON following the schema above.
Focus on:
1. Root cause analysis based on signals, SignOz data, and Expert diagnosis
2. Specific kubectl command to remediate (or diagnostic command if root cause unclear)
3. Risk assessment for the proposed action
4. Preventive recommendations
5. If Expert System flagged "human review required", prioritize diagnostic commands over fixes
"""
    logger.info(
        "proposal_generation_start",
        incident_id=incident_id,
        severity=severity,
        signal_count=len(signals),
        signoz_available=signoz_metrics is not None,
    )

    # Call the LLM through the cache layer.
    alert_context = {
        "incident_id": incident_id,
        "alert_type": signals[0].get("alert_name", "incident") if signals else "incident",
        "target_resource": target,
        "severity": severity,
    }
    # The alert-analysis call site in this file unpacks six values from
    # _call_with_cache (token count and cost follow the cache flag). A fixed
    # four-name unpack raises ValueError on such a result, so trailing usage
    # fields are collected into a starred name and ignored here.
    raw_response, provider, success, from_cache, *_usage = await self._call_with_cache(
        proposal_prompt,
        alert_context,
        signoz_metrics,
        cache_ttl=3600,  # 1 hour for proposals
    )
    if not success:
        logger.error(
            "proposal_generation_failed",
            incident_id=incident_id,
            provider=provider,
        )
        return None, provider, False

    # Parse the LLM output into a validated decision object.
    result = self._parse_analysis_result(raw_response)
    if result:
        logger.info(
            "proposal_generation_complete",
            incident_id=incident_id,
            action_title=result.action_title,
            risk_level=result.risk_level,
            provider=provider,
            from_cache=from_cache,
        )
        # Convert to a plain proposal dict (optimization_suggestions is list[dict]).
        proposal_dict = {
            "action": result.action_title,
            "description": result.description,
            "kubectl_command": result.kubectl_command,
            "target_resource": result.target_resource,
            "namespace": result.namespace,
            "risk_level": result.risk_level,
            "reasoning": result.reasoning,
            "confidence": result.confidence,
            "primary_responsibility": result.primary_responsibility,
            "optimization_suggestions": [
                {
                    "type": s.get("type", "UNKNOWN"),
                    "description": s.get("description", ""),
                    "kubectl_or_config": s.get("kubectl_or_config", ""),
                }
                for s in result.optimization_suggestions
            ],
            "signoz_correlation": result.signoz_correlation,
            "from_cache": from_cache,
            "provider": provider,
        }
        return proposal_dict, provider, True

    logger.warning(
        "proposal_parse_failed",
        incident_id=incident_id,
        raw_response=raw_response[:300],
    )
    return None, provider, False
# =========================================================================
# Shadow Mode Auto-Tuning
# =========================================================================
async def execute_auto_tuning(
    self,
    approval_id: str,
    kubectl_command: str,
    description: str,
) -> dict:
    """
    Handle an auto-tuning request without running the command.

    When settings.SHADOW_MODE_ENABLED is set, the AI-generated command is
    logged and printed to stdout. Otherwise a warning is logged stating that
    production execution is not implemented. Neither path executes the
    kubectl command.

    Args:
        approval_id: Approval ticket ID.
        kubectl_command: The kubectl command proposed by the AI.
        description: Human-readable description of the operation.

    Returns:
        dict with keys executed (always False), shadow_mode, command,
        description and log.
    """
    # Non-shadow (production) path first: only record the attempt.
    if not settings.SHADOW_MODE_ENABLED:
        logger.warning(
            "auto_tuning_execution_attempted",
            approval_id=approval_id,
            command=kubectl_command,
            message="Production execution not yet implemented - requires multi-sig approval",
        )
        return {
            "executed": False,
            "shadow_mode": False,
            "command": kubectl_command,
            "description": description,
            "log": "Production execution requires multi-sig approval",
        }

    # Shadow mode: record the command, never execute it.
    shadow_note = f"[SHADOW MODE] AI 生成的調優指令:{kubectl_command}"
    logger.info(
        "shadow_mode_auto_tuning",
        approval_id=approval_id,
        command=kubectl_command,
        description=description,
        executed=False,
    )

    # Also echo a framed banner to stdout.
    divider = "=" * 60
    print("\n" + divider)
    print(shadow_note)
    print(f"描述: {description}")
    print(f"簽核單: {approval_id}")
    print(divider + "\n")

    return {
        "executed": False,
        "shadow_mode": True,
        "command": kubectl_command,
        "description": description,
        "log": shadow_note,
    }
# =============================================================================
# Singleton
# =============================================================================
# Module-level holder for the shared OpenClawService instance; None until
# get_openclaw() creates it, and reset to None by close_openclaw().
_openclaw: OpenClawService | None = None
def get_openclaw() -> OpenClawService:
    """Return the global OpenClaw instance, creating it on first use."""
    global _openclaw
    instance = _openclaw
    if instance is None:
        # First access: build the service and remember it for later calls.
        instance = OpenClawService()
        _openclaw = instance
    return instance
async def close_openclaw() -> None:
    """Close the global OpenClaw instance, if any, and clear the reference."""
    global _openclaw
    if not _openclaw:
        # Nothing was created (or it was already closed).
        return
    await _openclaw.close()
    _openclaw = None
# =============================================================================
# Phase 5 + SignOz Integration Complete
# =============================================================================