""" OpenClaw AI Decision Engine - True LLM + SignOz Integration ============================================================ Phase 5: OpenClaw 實體化升級 (2026-03-21) 統帥校正: SignOz 為唯一全能視力中心 Features: - 真實 LLM SDK 整合 (Ollama → Gemini → Claude) - SignOz Gold Metrics 即時擷取 (P99/Error/RPS) - AIOps Agent 專業人格 (K8s 維運 + SRE RCA 專精) - 強制結構化 JSON 輸出 (符合 API 契約) - 動態告警上下文注入 + SignOz 數據 - Shadow Mode 調優指令生成 (日誌輸出,不執行) 防禦性工程鐵律: - Zero Trust: 預設不信任 LLM 輸出,必須通過 Pydantic 驗證 - Edge Case: 網路失敗、解析失敗、超時處理 - SignOz 失敗時優雅降級 (不阻塞主流程) """ import hashlib import json import random import re import time from datetime import datetime import httpx import structlog from src.core.config import settings from src.core.prompts import NEMOTRON_SYSTEM_PROMPT, OPENCLAW_SYSTEM_PROMPT from src.core.redis_client import get_redis from src.models.ai import ( OpenClawDecision, ) from src.services.langfuse_client import langfuse_trace from src.services.model_registry import get_model_registry from src.services.signoz_client import GoldMetrics, get_signoz_client from src.utils.k8s_naming import normalize_resource_name from src.utils.timezone import now_taipei_iso logger = structlog.get_logger(__name__) # ============================================================================= # AIOps Agent System Prompt (專業人格 + 仲裁邏輯 + SignOz 數據) # ============================================================================= # 責任矩陣定義 RESPONSIBILITY_MATRIX = { "FE": "前端團隊 (Frontend)", "BE": "後端團隊 (Backend)", "INFRA": "基礎設施團隊 (Infrastructure/SRE)", "DB": "資料庫團隊 (Database/DBA)", "COLLAB": "協同處理 (需多團隊會診)", } # 信心度閾值 CONFIDENCE_THRESHOLD_COLLAB = 0.70 # 低於此閾值自動標記為 COLLAB # OPENCLAW_SYSTEM_PROMPT 已移至 src/core/prompts.py (Phase 17 P2 改進) # ============================================================================= # LLM Analysis Result - Using Pydantic for Schema Enforcement # ============================================================================= # We use OpenClawDecision from models/ai.py for Pydantic validation # This 
# alias is for backwards compatibility
LLMAnalysisResult = OpenClawDecision


# =============================================================================
# kubectl_command backfill helper
# 2026-04-09 Claude Sonnet 4.6: I2 architecture-review fix — cover every tool
# type and remove the two duplicated code paths (M3)
# =============================================================================

# Conservative "safe token" for resource names and namespaces: must start with
# an alphanumeric character (so it can never be parsed as a CLI flag such as
# ``--all``) and may only contain characters that have no meaning to a shell.
# Kubernetes itself enforces the stricter DNS-1123 rules; this check only
# guarantees that nothing can be smuggled into the generated command line.
_SAFE_K8S_TOKEN = re.compile(r"[A-Za-z0-9][A-Za-z0-9._-]{0,252}")

# tool name -> (key in the tool-call args holding the resource name, template).
# Templates without ``{ns}`` target cluster-scoped resources (nodes).
_KUBECTL_TEMPLATES = {
    "restart_deployment": ("deployment_name", "kubectl rollout restart deployment/{name} -n {ns}"),
    "delete_pod": ("pod_name", "kubectl delete pod {name} -n {ns}"),
    "scale_deployment": ("deployment_name", "kubectl scale deployment/{name} --replicas={replicas} -n {ns}"),
    "delete_deployment": ("deployment_name", "kubectl delete deployment/{name} -n {ns}"),
    "drain_node": ("node_name", "kubectl drain {name} --ignore-daemonsets --delete-emptydir-data"),
    "cordon_node": ("node_name", "kubectl cordon {name}"),
    "delete_service": ("service_name", "kubectl delete service/{name} -n {ns}"),
}


def _safe_k8s_token(value: object) -> str | None:
    """Return ``value`` as a string if it is a safe command-line token, else None."""
    if not value:
        return None
    token = str(value)
    return token if _SAFE_K8S_TOKEN.fullmatch(token) else None


def _coerce_replicas(raw: object) -> int | None:
    """Return ``raw`` as a non-negative int, or None when it is not one."""
    if isinstance(raw, bool):  # bool is an int subclass; never a replica count
        return None
    if isinstance(raw, int):
        return raw if raw >= 0 else None
    if isinstance(raw, float):  # JSON numbers may arrive as 3.0
        return int(raw) if raw.is_integer() and raw >= 0 else None
    if isinstance(raw, str) and raw.strip().isdecimal():
        return int(raw.strip())
    return None


def _backfill_kubectl_command(proposal: dict, tools: list) -> None:
    """Backfill ``proposal["kubectl_command"]`` from the first AI tool call.

    The proposal is mutated in place. An existing, non-empty
    ``proposal["kubectl_command"]`` is never overwritten (a command written
    directly by the LLM takes priority). Only ``tools[0]`` is considered.

    Tool-call arguments come from AI output, so they are treated as untrusted:
    the resource name, the namespace and the replica count are validated
    before being interpolated. If any value needed by the command is missing
    or unsafe (shell metacharacters, whitespace, a leading ``-`` that would be
    read as a flag, a non-integer replica count), no command is written.

    Args:
        proposal: Proposal dict. ``namespace`` is read as a fallback for the
            tool-call namespace, and ``target_resource`` as a fallback
            deployment name for ``restart_deployment`` only.
        tools: Tool calls shaped like ``{"tool": <name>, "args": {...}}``.

    Returns:
        None. The result is delivered via ``proposal["kubectl_command"]``.
    """
    if not tools or proposal.get("kubectl_command"):
        return

    call = tools[0]
    if not isinstance(call, dict):
        return
    tool_name = call.get("tool")
    if not isinstance(tool_name, str):
        return
    spec = _KUBECTL_TEMPLATES.get(tool_name)
    if spec is None:  # unknown tool type: nothing to backfill
        return
    name_key, template = spec

    # ``args`` may be absent, None or a non-dict in malformed tool calls.
    args = call.get("args")
    if not isinstance(args, dict):
        args = {}

    raw_name = args.get(name_key)
    if not raw_name and tool_name == "restart_deployment":
        # Only the restart tool falls back to the proposal's target resource.
        raw_name = proposal.get("target_resource")
    name = _safe_k8s_token(raw_name)
    if name is None:
        return
    fields: dict = {"name": name}

    if "{ns}" in template:
        # Empty / None namespaces fall through to the next candidate instead
        # of producing "-n " or "-n None".
        namespace = _safe_k8s_token(
            args.get("namespace") or proposal.get("namespace") or "awoooi-prod"
        )
        if namespace is None:
            return
        fields["ns"] = namespace

    if "{replicas}" in template:
        raw_replicas = args.get("replicas")
        replicas = 2 if raw_replicas is None else _coerce_replicas(raw_replicas)
        if replicas is None:
            return
        fields["replicas"] = replicas

    proposal["kubectl_command"] = template.format(**fields)


#
============================================================================= # OpenClaw Service # ============================================================================= class OpenClawService: """ OpenClaw AI 決策服務 - True LLM + SignOz Integration 實作 AI_FALLBACK_ORDER 備援機制: Ollama → Gemini → Claude → Mock 新增 SignOz 整合: - 自動擷取 Gold Metrics - 數據驅動的 RCA 分析 - 動態 Trace URL 生成 """ def __init__(self): self._http_client: httpx.AsyncClient | None = None self._signoz = get_signoz_client() async def _get_client(self) -> httpx.AsyncClient: """取得 HTTP 客戶端""" if self._http_client is None or self._http_client.is_closed: self._http_client = httpx.AsyncClient( timeout=httpx.Timeout(120.0, connect=10.0), ) return self._http_client async def close(self) -> None: """關閉連線""" if self._http_client: await self._http_client.aclose() self._http_client = None # ========================================================================= # SignOz Integration # ========================================================================= async def get_signoz_context( self, service_name: str, namespace: str = "default", alert_timestamp: datetime | None = None, ) -> tuple[GoldMetrics | None, str]: """ 擷取 SignOz 上下文數據 Returns: (GoldMetrics, trace_url) or (None, fallback_url) """ try: metrics = await self._signoz.get_gold_metrics( service_name=service_name, namespace=namespace, time_window_minutes=10, ) trace_url = self._signoz.generate_trace_url( service_name=service_name, alert_timestamp=alert_timestamp, window_minutes=5, ) logger.info( "signoz_context_fetched", service=service_name, rps=metrics.rps, error_rate=metrics.error_rate, p99_latency=metrics.p99_latency_ms, ) return metrics, trace_url except Exception as e: logger.warning( "signoz_context_fetch_failed", service=service_name, error=str(e), ) # 降級: 返回 None 和靜態 URL fallback_url = f"{settings.SIGNOZ_URL}/traces?service={service_name}" return None, fallback_url def generate_auto_tuning_command( self, alert_type: str, target_resource: str, 
namespace: str, metrics: GoldMetrics | None = None, ) -> dict: """ 根據告警類型和 SignOz 數據生成調優指令 Shadow Mode: 僅生成指令,不執行 Phase 18.1.6: 整合 K8s 資源名稱驗證 (ADR-016) Returns: {command: str, description: str, type: str} """ # Phase 18.1.6: 先正規化資源名稱 normalized = normalize_resource_name(target_resource, namespace) if not normalized.is_k8s_resource: # 非 K8s 資源,返回提示訊息 logger.info( "non_k8s_resource_detected", original=target_resource, note=normalized.note, ) return { "type": "MANUAL", "command": f"# 非 K8s 資源: {target_resource}", "description": f"此資源不在 K8s 中,需人工處理。{normalized.note or ''}", } # 使用正規化後的名稱 resolved_name = normalized.normalized or target_resource resolved_ns = normalized.namespace or namespace if normalized.confidence < 0.8: logger.warning( "low_confidence_resource_name", original=target_resource, resolved=resolved_name, confidence=normalized.confidence, ) # 根據告警類型選擇調優策略 (使用正規化後的名稱) if "cpu" in alert_type.lower() or "high_cpu" in alert_type.lower(): # CPU 高 → 擴容或調整 limit if metrics and metrics.rps > 100: # 高流量場景 → HPA return { "type": "HPA", "command": f"kubectl autoscale deployment {resolved_name} --cpu-percent=70 --min=2 --max=10 -n {resolved_ns}", "description": f"SignOz RPS={metrics.rps:.0f},配置 HPA 應對流量波動", } else: # 低流量但 CPU 高 → 調整資源 return { "type": "RESOURCE_LIMIT", "command": f"kubectl set resources deployment/{resolved_name} --limits=cpu=2000m -n {resolved_ns}", "description": "增加 CPU limit 緩解資源競爭", } elif "memory" in alert_type.lower() or "oom" in alert_type.lower(): return { "type": "RESOURCE_LIMIT", "command": f"kubectl set resources deployment/{resolved_name} --limits=memory=1Gi -n {resolved_ns}", "description": "增加 Memory limit 防止 OOM", } elif "pod_crash" in alert_type.lower() or "crash" in alert_type.lower(): return { "type": "RESTART", "command": f"kubectl rollout restart deployment/{resolved_name} -n {resolved_ns}", "description": "滾動重啟清除異常狀態", } elif "latency" in alert_type.lower() or "slow" in alert_type.lower(): if metrics and metrics.p99_latency_ms > 
500: return { "type": "SCALE", "command": f"kubectl scale deployment {resolved_name} --replicas=+2 -n {resolved_ns}", "description": f"SignOz P99={metrics.p99_latency_ms:.0f}ms,擴容分散負載", } else: return { "type": "CACHE", "command": "# 檢查 Redis 連線池配置", "description": "建議增加緩存層減少後端壓力", } else: # 通用: 滾動重啟 return { "type": "RESTART", "command": f"kubectl rollout restart deployment/{resolved_name} -n {resolved_ns}", "description": "滾動重啟恢復服務", } # ========================================================================= # ========================================================================= # OpenClaw (Nemo) 委派仲裁 — 架構鐵律 # 2026-04-01 ogt: AWOOOI API 不直接打 LLM,委派給 OpenClaw (192.168.0.188:8089) # ========================================================================= async def _call_openclaw_analyze( self, incident_id: str, severity: str, signals: list[dict], affected_services: list[str], expert_context: dict | None = None, ) -> dict | None: """ 委派 Incident RCA 給 OpenClaw (Nemo) API Returns: proposal_dict if success, None if failed (fallback to direct LLM) """ try: client = await self._get_client() import json as _json def _to_serializable(obj): if isinstance(obj, dict): return {k: _to_serializable(v) for k, v in obj.items()} if isinstance(obj, list): return [_to_serializable(i) for i in obj] try: _json.dumps(obj) return obj except (TypeError, ValueError): return str(obj) payload = { "incident_id": incident_id, "severity": severity, "signals": _to_serializable(signals[:5]), "affected_services": affected_services, "expert_context": _to_serializable(expert_context) if expert_context else None, } resp = await client.post( f"{settings.OPENCLAW_URL}/api/v1/analyze/incident", json=payload, timeout=httpx.Timeout(130.0, connect=5.0), ) resp.raise_for_status() data = resp.json() # 驗證必要欄位 if not data.get("action_title") or not data.get("risk_level"): logger.warning("openclaw_analyze_invalid_response", incident_id=incident_id) return None logger.info( "openclaw_analyze_success", 
incident_id=incident_id, confidence=data.get("confidence", 0), risk_level=data.get("risk_level"), ) # 轉換為 AWOOOI API proposal dict 格式 return { "action": data.get("action_title", "AI 分析"), "description": data.get("description", ""), "kubectl_command": data.get("kubectl_command") or "", "target_resource": data.get("target_resource", "unknown"), "namespace": data.get("namespace", "awoooi-prod"), "risk_level": data.get("risk_level", "medium"), "reasoning": data.get("reasoning", ""), "confidence": data.get("confidence", 0.8), "primary_responsibility": data.get("primary_responsibility", "INFRA"), "optimization_suggestions": data.get("optimization_suggestions", []), "signoz_correlation": data.get("signoz_correlation", ""), "from_cache": False, "provider": "openclaw_nemo", "ai_tokens": 0, "ai_cost": 0.0, } except Exception as e: logger.warning( "openclaw_analyze_failed", incident_id=incident_id, error=str(e), reason="fallback to direct LLM", ) return None # ========================================================================= # [ARCHIVED Phase 24 B4 — 2026-04-03 ogt] # 以下三個方法 (_call_ollama/_call_gemini/_call_claude) 為舊版 fallback chain # 新路徑: USE_AI_ROUTER=true → _call_with_fallback → AIRouterExecutor (ai_router.py) # 新 Provider 實作: apps/api/src/services/ai_providers/ (OllamaProvider/GeminiProvider/ClaudeProvider) # 回滾保留: USE_AI_ROUTER=false 時仍由 _call_with_fallback (line ~993) 呼叫此區塊 # 完整移除時機: Phase 24 完整驗收後 (ADR-052 D11) # ========================================================================= async def _call_ollama(self, prompt: str) -> tuple[str, bool]: """ 呼叫本機 Ollama (支援 JSON Mode) """ try: client = await self._get_client() logger.info( "ollama_request_start", url=f"{settings.OLLAMA_URL}/api/generate", prompt_length=len(prompt), ) # 從 ModelRegistry 取得模型配置 registry = get_model_registry() model_name = registry.get_model("ollama", "rca") options = registry.get_provider_options("ollama") response = await client.post( f"{settings.OLLAMA_URL}/api/generate", json={ 
"model": model_name, "prompt": prompt, "stream": False, "format": "json", # 強制 JSON 輸出 "options": { "num_predict": options.get("num_predict", 1024), "temperature": options.get("temperature", 0.1), "top_p": options.get("top_p", 0.9), }, }, timeout=httpx.Timeout(float(settings.OPENCLAW_TIMEOUT), connect=10.0), ) logger.info( "ollama_response_received", status_code=response.status_code, ) response.raise_for_status() data = response.json() result = data.get("response", "") logger.info( "ollama_response_parsed", response_length=len(result), ) return result, True except httpx.TimeoutException as e: logger.warning("ollama_timeout", error=str(e)) return f"Timeout: {e}", False except Exception as e: logger.warning( "ollama_call_failed", error=str(e), error_type=type(e).__name__, ) return str(e), False async def _call_gemini(self, prompt: str) -> tuple[str, bool, int, float]: """ 呼叫 Google Gemini (支援 JSON Mode) Returns: tuple: (response_text, success, total_tokens, cost_usd) - response_text: LLM 回應文本 - success: 是否成功 - total_tokens: 使用的 Token 總數 - cost_usd: 預估成本 (USD) 2026-03-29 ogt: 加入 Token/Cost 追蹤 """ if not settings.GEMINI_API_KEY: return "GEMINI_API_KEY not configured", False, 0, 0.0 try: client = await self._get_client() # 從 ModelRegistry 取得模型配置 registry = get_model_registry() model_name = registry.get_model("gemini", "rca") response = await client.post( f"https://generativelanguage.googleapis.com/v1beta/models/{model_name}:generateContent?key={settings.GEMINI_API_KEY}", json={ "contents": [{"parts": [{"text": prompt}]}], "generationConfig": { "temperature": 0.1, "maxOutputTokens": 2048, "responseMimeType": "application/json", # 強制 JSON 輸出 }, }, timeout=30.0, ) response.raise_for_status() data = response.json() text = data["candidates"][0]["content"]["parts"][0]["text"] # 2026-03-29 ogt: 擷取 Token 使用量 usage_metadata = data.get("usageMetadata", {}) prompt_tokens = usage_metadata.get("promptTokenCount", 0) completion_tokens = usage_metadata.get("candidatesTokenCount", 0) 
total_tokens = usage_metadata.get("totalTokenCount", prompt_tokens + completion_tokens) # Gemini 1.5 Flash 定價 (per 1M tokens) # Input: $0.075 / 1M, Output: $0.30 / 1M cost_usd = (prompt_tokens * 0.000000075) + (completion_tokens * 0.0000003) logger.info( "gemini_response_received", response_length=len(text), prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, total_tokens=total_tokens, cost_usd=f"${cost_usd:.6f}", ) return text, True, total_tokens, cost_usd except Exception as e: logger.warning("gemini_call_failed", error=str(e)) return str(e), False, 0, 0.0 async def _call_claude(self, prompt: str) -> tuple[str, bool]: """ 呼叫 Anthropic Claude (使用 Tool Use 強制 JSON) """ if not settings.CLAUDE_API_KEY: return "CLAUDE_API_KEY not configured", False try: client = await self._get_client() # Claude 使用 Tool Use 強制結構化輸出 response = await client.post( "https://api.anthropic.com/v1/messages", headers={ "x-api-key": settings.CLAUDE_API_KEY, "anthropic-version": "2023-06-01", "content-type": "application/json", }, json={ "model": "claude-3-haiku-20240307", "max_tokens": 2048, "messages": [{"role": "user", "content": prompt}], "tools": [{ "name": "submit_analysis", "description": "Submit the RCA analysis result in structured format", "input_schema": { "type": "object", "properties": { "action_title": {"type": "string"}, "description": {"type": "string"}, "suggested_action": {"type": "string", "enum": ["RESTART_DEPLOYMENT", "DELETE_POD", "SCALE_DEPLOYMENT", "NO_ACTION"]}, "kubectl_command": {"type": "string"}, "target_resource": {"type": "string"}, "namespace": {"type": "string"}, "risk_level": {"type": "string", "enum": ["low", "medium", "critical"]}, "blast_radius": { "type": "object", "properties": { "affected_pods": {"type": "integer"}, "estimated_downtime": {"type": "string"}, "related_services": {"type": "array", "items": {"type": "string"}}, "data_impact": {"type": "string", "enum": ["NONE", "READ_ONLY", "WRITE", "DESTRUCTIVE"]} }, "required": 
["affected_pods", "estimated_downtime", "related_services", "data_impact"] }, "reasoning": {"type": "string"}, "deviation_analysis": {"type": "string"}, "confidence": {"type": "number"}, "affected_services": {"type": "array", "items": {"type": "string"}} }, "required": ["action_title", "description", "suggested_action", "kubectl_command", "target_resource", "namespace", "risk_level", "blast_radius", "reasoning", "confidence"] } }], "tool_choice": {"type": "tool", "name": "submit_analysis"}, }, timeout=30.0, ) response.raise_for_status() data = response.json() # 從 Tool Use 回應中提取 JSON for block in data.get("content", []): if block.get("type") == "tool_use" and block.get("name") == "submit_analysis": tool_input = block.get("input", {}) logger.info("claude_tool_use_response", input_keys=list(tool_input.keys())) return json.dumps(tool_input), True # Fallback: 嘗試從 text 內容提取 for block in data.get("content", []): if block.get("type") == "text": return block.get("text", ""), True return "No valid response from Claude", False except Exception as e: logger.warning("claude_call_failed", error=str(e)) return str(e), False # 2026-03-29 ogt: _call_nvidia 已移至 nvidia_provider.py (ARCHIVED) # 符合模組化規範 - 所有 NVIDIA API 呼叫統一由 NvidiaProvider / OpenClawNemoProvider 處理 # ========================================================================= # [END ARCHIVED Phase 24 B4] # ========================================================================= # ========================================================================= # Mock LLM - Intelligent Fallback with SignOz Data # ========================================================================= def _generate_mock_response( self, alert_context: dict, signoz_metrics: GoldMetrics | None = None, ) -> str: """ 規則引擎降級回應 (v8.0) — 生產用途,不是假數據 從 alert_rules.yaml 載入規則進行匹配,AI 分析失敗時的正式降級路徑。 命中 generic_fallback 時會回傳 rule_id="generic_fallback", 由上層 async 方法(_call_with_fallback)觸發 auto_generate_rule() 學習新規則。 Returns: (json_str, rule_id) tuple 2026-04-09 
ogt: 重構為規則引擎,移除 if/elif 硬編碼 2026-04-09 ogt: S2-4 架構師審查 — 修正 Mock 語意混淆,澄清為規則引擎生產路徑 """ from src.services.alert_rule_engine import match_rule time.sleep(random.uniform(0.3, 0.8)) # 模擬思考延遲 # SignOz 數據整合 signoz_correlation = "SignOz 數據擷取中..." if signoz_metrics: signoz_correlation = ( f"RPS={signoz_metrics.rps:.1f} ({signoz_metrics.rps_trend}), " f"Error={signoz_metrics.error_rate:.2f}%, " f"P99={signoz_metrics.p99_latency_ms:.0f}ms" ) mock_response = match_rule(alert_context) if mock_response is None: # match_rule 不應該回傳 None(有通用兜底),但防禦性處理 alert_type = alert_context.get("alert_type", "custom") target = alert_context.get("target_resource", "unknown") namespace = alert_context.get("namespace", "awoooi-prod") mock_response = { "action_title": f"重新啟動 {target} 服務", "description": f"⚙️ 規則匹配: {target} 發生異常,需進一步診斷確認根因。", "suggested_action": "RESTART_DEPLOYMENT", "kubectl_command": f"kubectl rollout restart deployment/{target} -n {namespace}", "target_resource": target, "namespace": namespace, "risk_level": "medium", "blast_radius": { "affected_pods": 1, "estimated_downtime": "5-15 min", "related_services": [target], "data_impact": "NONE", }, "primary_responsibility": "COLLAB", "responsibility_reasoning": "告警資訊不足,建議多團隊協同排查", "secondary_teams": ["BE", "INFRA"], "optimization_suggestions": [], "reasoning": f"[規則匹配] 根據告警 {alert_type} 先重啟恢復服務。", "deviation_analysis": "監控指標顯示異常偏離基準線", "confidence": 0.0, "affected_services": [target], "signoz_correlation": signoz_correlation, } # 補充 SignOz 關聯資訊(規則引擎不持有 signoz_metrics) mock_response["signoz_correlation"] = signoz_correlation if signoz_metrics: mock_response["description"] += f" {signoz_metrics.to_summary()}" rule_id = mock_response.get("rule_id", "unknown") logger.info( "mock_llm_response_generated", rule_id=rule_id, action_title=mock_response["action_title"], risk_level=mock_response["risk_level"], primary_responsibility=mock_response["primary_responsibility"], confidence=mock_response["confidence"], signoz_integrated=signoz_metrics 
is not None, is_mock=True, ) # 2026-04-09 ogt: rule_id 回傳給上層 async 方法觸發自動規則生成 # 不在此 sync 方法中呼叫 asyncio,避免 event loop 混用問題 (S1-1 架構師審查) return json.dumps(mock_response), rule_id # ========================================================================= # LLM Cache Layer (憲法要求: 嚴禁無快取裸奔) # ========================================================================= def _generate_cache_key(self, prompt: str, context_hash: str = "") -> str: """ 生成 LLM 快取鍵 使用 prompt 內容的 SHA256 作為快取鍵,確保相同問題不重複呼叫 LLM """ content = f"{prompt}:{context_hash}" hash_digest = hashlib.sha256(content.encode()).hexdigest()[:16] return f"llm_cache:{hash_digest}" async def _call_with_cache( self, prompt: str, alert_context: dict | None = None, signoz_metrics: GoldMetrics | None = None, cache_ttl: int = 3600, # 1 hour default ) -> tuple[str, str, bool, bool, int, float]: """ 帶快取的 LLM 呼叫包裝器 憲法條款: 必須使用快取保護算力資源 Args: prompt: LLM prompt alert_context: 告警上下文 signoz_metrics: SignOz 指標 cache_ttl: 快取存活時間 (秒) Returns: (response, provider, success, from_cache, total_tokens, cost_usd) 2026-03-29 ogt: 加入 Token/Cost 追蹤 """ # 生成快取鍵 (基於 prompt + alert_context hash) context_hash = "" if alert_context: # 使用告警類型 + 目標資源作為上下文 hash context_hash = f"{alert_context.get('alert_type', '')}:{alert_context.get('target_resource', '')}" cache_key = self._generate_cache_key(prompt, context_hash) # 1. 嘗試從快取讀取 try: redis_client = get_redis() cached = await redis_client.get(cache_key) if cached: cached_data = json.loads(cached) logger.info( "llm_cache_hit", cache_key=cache_key[:20], provider=cached_data.get("provider", "cached"), ) return ( cached_data["response"], f"{cached_data['provider']}_cached", True, True, # from_cache 0, # tokens (cache hit, no new tokens) 0.0, # cost (cache hit, no cost) ) except Exception as e: logger.warning("llm_cache_read_failed", error=str(e)) # 2. 
Cache Miss - 呼叫 LLM logger.info("llm_cache_miss", cache_key=cache_key[:20]) response, provider, success, total_tokens, cost_usd = await self._call_with_fallback( prompt, alert_context, signoz_metrics ) # 3. 成功則寫入快取 if success: try: redis_client = get_redis() cache_data = { "response": response, "provider": provider, "cached_at": now_taipei_iso(), } await redis_client.set( cache_key, json.dumps(cache_data, ensure_ascii=False), ex=cache_ttl, ) logger.info( "llm_cache_write", cache_key=cache_key[:20], provider=provider, ttl=cache_ttl, ) except Exception as e: logger.warning("llm_cache_write_failed", error=str(e)) return response, provider, success, False, total_tokens, cost_usd # from_cache=False # ========================================================================= # Public LLM Interface (ILLMProvider Protocol) # ========================================================================= async def call(self, prompt: str) -> tuple[str, str, bool]: """ 呼叫 LLM (ILLMProvider Protocol 實作) #39 Error Analyzer Agent 使用此方法 Args: prompt: 完整的 prompt Returns: (response, provider, success) """ return await self._call_with_fallback(prompt) # ========================================================================= # Fallback Chain # ========================================================================= async def _call_with_fallback( self, prompt: str, alert_context: dict | None = None, signoz_metrics: GoldMetrics | None = None, ) -> tuple[str, str, bool, int, float]: """ 依 AI_FALLBACK_ORDER 順序呼叫 AI 若 MOCK_MODE=True,直接回傳模擬結果。 若所有 Provider 失敗,fallback 到 Mock。 Returns: tuple: (response, provider, success, total_tokens, cost_usd) Phase 15.1: 整合 Langfuse LLMOps 追蹤 2026-03-29 ogt: 加入 Token/Cost 追蹤 2026-04-02 ogt: Phase 24 ADR-052 絞殺者包裝 — USE_AI_ROUTER 新舊並存 """ # ================================================================= # Phase 24 ADR-052: 絞殺者分支 (Strangler Fig) # USE_AI_ROUTER=true → 新 AIRouterExecutor 路由 # USE_AI_ROUTER=false → 舊 if/else fallback chain (現狀) # 回滾: kubectl 
set env deployment/awoooi-api USE_AI_ROUTER=false # Phase 24 C: Redis 狀態覆蓋 env var (/ai router on/off) # ================================================================= # Redis 狀態優先 (Phase 24 C — 2026-04-03 ogt) _use_ai_router = settings.USE_AI_ROUTER try: from src.services.ai_control import get_ai_router_enabled _redis_override = await get_ai_router_enabled() if _redis_override is not None: _use_ai_router = _redis_override except Exception: pass if _use_ai_router: try: # 2026-04-02 ogt: C2 修復 — 呼叫 AIRouter.route() 智慧路由 (非靜態 order) # D1 意圖分類路由、D7 隱私保護 (DIAGNOSE/CODE_REVIEW 強制 local) 生效 from src.services.ai_router import get_ai_router, get_ai_executor, IntentType router = get_ai_router() executor = get_ai_executor() # Step 1: 取得路由決策 (含意圖分類 + 複雜度評分) decision = await router.route(prompt, alert_context) # Step 2: 從 RoutingDecision 建立 provider_order (主 + fallback) # Phase 24 C: Redis primary_provider 覆蓋路由決策 provider_order = [decision.selected_provider.value] + [ p.value for p, _ in decision.fallback_chain if p.value != decision.selected_provider.value ] try: from src.services.ai_control import get_primary_provider, is_provider_disabled _primary = await get_primary_provider() if _primary and _primary != decision.selected_provider.value: # 把 primary 移到首位 (保留原始 fallback) provider_order = [_primary] + [p for p in provider_order if p != _primary] # 過濾被停用的 Provider # C2 修復 (2026-04-03 首席架構師審查): Python 3.11 不支援 list comprehension 中 await _filtered = [] for _p in provider_order: if not await is_provider_disabled(_p): _filtered.append(_p) if _filtered: provider_order = _filtered except Exception as _e: logger.warning("ai_control_override_failed", error=str(_e)) # Step 3: D7 隱私 — DIAGNOSE/CODE_REVIEW 強制 local require_local = decision.intent in (IntentType.DIAGNOSE, IntentType.CODE_REVIEW) result = await executor.execute( prompt=prompt, provider_order=provider_order, context=alert_context, cache_ttl=3600, require_local=require_local, ) logger.info( "phase24_ai_router_used", 
provider=result.provider, success=result.success, latency_ms=round(result.latency_ms, 1), intent=decision.intent.value, routing_reason=decision.routing_reason, ) return result.raw_response, result.provider, result.success, result.tokens, result.cost_usd except Exception as e: # AIRouter 失敗時 fallback 到舊路徑 (安全網) logger.warning("phase24_ai_router_fallback_to_legacy", error=str(e)) # Mock Mode: 開發測試用 if settings.MOCK_MODE: logger.info("mock_mode_enabled", using="mock_llm") _mock_json, _rule_id = self._generate_mock_response(alert_context or {}, signoz_metrics) if _rule_id == "generic_fallback": import asyncio from src.services.alert_rule_engine import auto_generate_rule try: asyncio.create_task(auto_generate_rule( alert_context or {}, ollama_url=settings.OLLAMA_URL, model=settings.OPENCLAW_DEFAULT_MODEL, gemini_api_key=getattr(settings, "GEMINI_API_KEY", ""), )) except Exception as _e: logger.warning("auto_rule_trigger_failed", error=str(_e)) return _mock_json, "mock", True, 0, 0.0 # Phase 15.1 + 15.3: Langfuse 追蹤整合 + SignOz Deep Linking with langfuse_trace( "openclaw_fallback_chain", metadata={ "prompt_length": len(prompt), "fallback_order": settings.AI_FALLBACK_ORDER, "alert_fingerprint": (alert_context or {}).get("fingerprint", "unknown"), }, ) as trace: # Phase 15.3: SignOz → Langfuse 反向連結 # 在當前 OTEL span 中記錄 Langfuse trace_id if trace.langfuse_trace_id: from opentelemetry import trace as otel_trace from src.core.deep_linking import DeepLinking current_span = otel_trace.get_current_span() if current_span: current_span.set_attribute("langfuse.trace_id", trace.langfuse_trace_id) current_span.set_attribute( "langfuse.trace_url", DeepLinking.langfuse_trace_url(trace.langfuse_trace_id), ) # Phase 13.2: Rate Limiter 整合 (2026-03-26) # 防止雲端 API 用量暴衝,超限自動降級 from src.services.ai_rate_limiter import get_ai_rate_limiter rate_limiter = get_ai_rate_limiter() for provider in settings.AI_FALLBACK_ORDER: # Rate Limit 檢查 (nvidia/gemini/claude 需檢查,ollama 不限) # 2026-03-30 ogt: 加入 
nvidia (RPM=5 限制) if provider in ("nvidia", "gemini", "claude"): allowed, reason = await rate_limiter.check_and_increment(provider) if not allowed: logger.warning( "ai_rate_limit_skip", provider=provider, reason=reason, ) continue # 跳過此 provider,嘗試下一個 logger.info("ai_provider_attempt", provider=provider) start_time = time.time() model_name = self._get_model_name(provider) # 2026-03-29 ogt: Gemini 回傳 4 值 (含 token/cost),其他 Provider 補 0 total_tokens = 0 cost_usd = 0.0 if provider == "ollama": response, success = await self._call_ollama(prompt) elif provider == "gemini": response, success, total_tokens, cost_usd = await self._call_gemini(prompt) elif provider == "nvidia": # 2026-03-29 ogt: 使用 NvidiaProvider.chat() (模組化規範) from src.services.nvidia_provider import get_nvidia_provider nvidia_provider = get_nvidia_provider() response, success, total_tokens, cost_usd = await nvidia_provider.chat(prompt, use_json_mode=True) elif provider == "claude": response, success = await self._call_claude(prompt) else: logger.warning("unknown_ai_provider", provider=provider) continue latency_ms = (time.time() - start_time) * 1000 # Langfuse: 記錄每次 LLM 呼叫 trace.generation( name=f"{provider}_call", model=model_name, input=prompt[:500], # 截斷避免過長 output=response[:500] if success else f"ERROR: {response[:200]}", metadata={ "success": success, "latency_ms": round(latency_ms, 2), "provider": provider, "total_tokens": total_tokens, "cost_usd": cost_usd, }, ) if success: logger.info( "ai_provider_success", provider=provider, latency_ms=latency_ms, total_tokens=total_tokens, cost_usd=f"${cost_usd:.6f}", ) # Langfuse: 記錄成功評分 trace.score(name="provider_success", value=1.0, comment=f"Success via {provider}") # 2026-03-29 ogt: 記錄累積成本 (Gemini/Claude) if cost_usd > 0: await rate_limiter.record_cost(provider, cost_usd) return response, provider, True, total_tokens, cost_usd logger.warning("ai_provider_failed_fallback", provider=provider, latency_ms=latency_ms) # 所有 Provider 失敗時,fallback 到 Mock (優雅降級) 
logger.warning("all_providers_failed_using_mock", fallback="mock_llm") trace.score(name="provider_success", value=0.0, comment="All providers failed, using mock") _mock_json, _rule_id = self._generate_mock_response(alert_context or {}, signoz_metrics) if _rule_id == "generic_fallback": import asyncio from src.services.alert_rule_engine import auto_generate_rule try: asyncio.create_task(auto_generate_rule( alert_context or {}, ollama_url=settings.OLLAMA_URL, model=settings.OPENCLAW_DEFAULT_MODEL, gemini_api_key=getattr(settings, "GEMINI_API_KEY", ""), )) except Exception as _e: logger.warning("auto_rule_trigger_failed", error=str(_e)) return _mock_json, "mock_fallback", True, 0, 0.0 def _get_model_name(self, provider: str) -> str: """取得 provider 對應的模型名稱 (從 ModelRegistry)""" registry = get_model_registry() return registry.get_model(provider, "rca") # ========================================================================= # Response Parsing (防禦性解析) # ========================================================================= def _extract_json_from_response(self, text: str) -> str | None: """從 LLM 回應中提取 JSON (含啟發式修補)""" # 0. 清理開頭結尾空白 text = text.strip() if not text: return None # 1. 嘗試直接解析 try: json.loads(text) return text except json.JSONDecodeError: pass # 2. 嘗試從 markdown code block 提取 patterns = [ r"```json\s*([\s\S]*?)\s*```", r"```\s*([\s\S]*?)\s*```", r"(\{[\s\S]*\})", # 貪婪匹配最大括號對 ] for pattern in patterns: match = re.search(pattern, text) if match: candidate = match.group(1) if "(" in pattern else match.group(0) candidate = candidate.strip() try: json.loads(candidate) return candidate except json.JSONDecodeError: # 3. 啟發式修補: 如果結尾缺少括號,嘗試補齊 if candidate.startswith("{") and not candidate.endswith("}"): for i in range(1, 5): # 嘗試補 1-5 個括號/引號 try: repaired = candidate + '"' * (i-1) + "}" * i json.loads(repaired) logger.info("json_repaired_heuristically", level=i) return repaired except: continue continue # 4. 
極端情況: 找出最後一個有效 key if "{" in text: start_idx = text.find("{") candidate = text[start_idx:] # 暴力去除非法尾綴 (如 \t\t...) candidate = re.sub(r"[ \t\r\n]+$", "", candidate) if not candidate.endswith("}"): candidate += '"}' # 嘗試最簡單的閉合 try: json.loads(candidate) return candidate except: pass return None def _parse_analysis_result(self, raw_response: str) -> OpenClawDecision | None: """ 解析 LLM 分析結果 - 使用 Pydantic Schema Enforcement 關鍵:blast_radius 為 REQUIRED,使用 AIBlastRadius Pydantic 模型驗證 """ json_str = self._extract_json_from_response(raw_response) if not json_str: logger.error("json_extraction_failed", raw_response=raw_response[:200]) return None try: data = json.loads(json_str) # Step 1: 確保 blast_radius 存在且為正確格式 if "blast_radius" not in data or not isinstance(data["blast_radius"], dict): data["blast_radius"] = { "affected_pods": 1, "estimated_downtime": "~30s", "related_services": data.get("affected_services", []), "data_impact": "NONE" } else: # 確保 blast_radius 內的必填欄位存在 br = data["blast_radius"] if "affected_pods" not in br: br["affected_pods"] = 1 if "estimated_downtime" not in br: br["estimated_downtime"] = "~30s" if "related_services" not in br: br["related_services"] = data.get("affected_services", []) if "data_impact" not in br: br["data_impact"] = "NONE" # Step 2: 填補其他可選欄位 if "action_title" not in data: data["action_title"] = data.get("action", "未知操作") if "target_resource" not in data: data["target_resource"] = "unknown" if "suggested_action" not in data: data["suggested_action"] = "NO_ACTION" # Step 2.5: 2026-04-01 Claude Code - 斷片補全 (信心度必須誠實) # 🔴 禁止填入假信心度!截斷 = 0.0,讓 auto-approve 正確判斷 if "confidence" not in data or not isinstance(data["confidence"], int | float): data["confidence"] = 0.0 # 截斷/缺失 → 0.0,不可偽造 if "risk_level" not in data: data["risk_level"] = "low" if "primary_responsibility" not in data: data["primary_responsibility"] = "INFRA" if "kubectl" in str(data) else "BE" if "suggested_action" not in data: data["suggested_action"] = "RESTART_DEPLOYMENT" if 
async def analyze_alert(
    self,
    alert_context: dict,
) -> tuple[LLMAnalysisResult | None, str, str, GoldMetrics | None, str, int, float]:
    """Analyze an alert and produce an RCA decision, enriched with SignOz metrics.

    The prompt is the system prompt, an optional SignOz metrics section and
    the alert serialized as JSON. The alert JSON (and, if needed, the SignOz
    section) is truncated so the whole prompt stays within a 3500-character
    budget; the system prompt itself is never truncated.

    Args:
        alert_context: Alert fields. ``target_resource`` and ``namespace``
            are used to look up SignOz context; ``alert_type`` is logged.

    Returns:
        ``(analysis_result, ai_provider, raw_response, signoz_metrics,
        signoz_trace_url, total_tokens, cost_usd)``. ``analysis_result`` is
        ``None`` when the call reported failure or the reply could not be
        parsed/validated. On a reported failure tokens and cost are
        returned as ``0`` and ``0.0``.
    """
    prompt_char_budget = 3500
    signoz_char_cap = 500
    truncation_marker = "... (truncated)"

    # Step 0: fetch SignOz context for the alerted resource.
    service_name = alert_context.get("target_resource", "unknown")
    namespace = alert_context.get("namespace", "default")
    signoz_metrics, signoz_trace_url = await self.get_signoz_context(
        service_name=service_name,
        namespace=namespace,
    )

    signoz_context = ""
    if signoz_metrics:
        signoz_context = f"""
## 📊 SignOz Real-time Metrics (Last 10 min)
{signoz_metrics.to_summary()}

Trace URL: {signoz_trace_url}
"""

    # Budgeting: keep the system prompt intact, shrink everything else.
    available_len = prompt_char_budget - len(OPENCLAW_SYSTEM_PROMPT) - len(signoz_context)
    if available_len < 500 and len(signoz_context) > signoz_char_cap:
        # Only truncate (and label as truncated) when there is actually
        # something to cut; a short context would otherwise grow.
        signoz_context = signoz_context[:signoz_char_cap] + truncation_marker
        available_len = prompt_char_budget - len(OPENCLAW_SYSTEM_PROMPT) - len(signoz_context)
    # A negative length would make the slice below count from the END of the
    # string and keep almost the whole payload, defeating the truncation.
    available_len = max(available_len, 0)

    alert_json = json.dumps(alert_context, ensure_ascii=False, indent=2)
    if len(alert_json) > available_len:
        alert_json = alert_json[:available_len] + truncation_marker

    full_prompt = OPENCLAW_SYSTEM_PROMPT + signoz_context + "\n\n## Alert Data:\n" + alert_json

    logger.info(
        "openclaw_alert_analysis_start",
        alert_type=alert_context.get("alert_type"),
        target=alert_context.get("target_resource"),
        signoz_available=signoz_metrics is not None,
    )

    # Call the LLM through the caching layer (30 min TTL for alert analysis).
    raw_response, provider, success, from_cache, total_tokens, cost_usd = await self._call_with_cache(
        full_prompt,
        alert_context,
        signoz_metrics,
        cache_ttl=1800,
    )

    if not success:
        logger.error("openclaw_all_providers_failed")
        return None, provider, raw_response, signoz_metrics, signoz_trace_url, 0, 0.0

    if from_cache:
        logger.info("openclaw_using_cached_response", provider=provider)

    logger.info(
        "openclaw_llm_response_received",
        provider=provider,
        response_length=len(raw_response),
    )

    # Parse and validate the reply; a parse failure is reported via a None result.
    result = self._parse_analysis_result(raw_response)

    if result:
        logger.info(
            "openclaw_analysis_complete",
            action_title=result.action_title,
            risk_level=result.risk_level,
            confidence=result.confidence,
            provider=provider,
            signoz_integrated=signoz_metrics is not None,
        )
    else:
        logger.warning(
            "openclaw_analysis_parse_failed",
            raw_response=raw_response[:300],
        )

    return result, provider, raw_response, signoz_metrics, signoz_trace_url, total_tokens, cost_usd
診斷上下文 Args: incident_id: Incident ID severity: 嚴重度 (P0/P1/P2/P3) signals: 關聯的告警訊號 affected_services: 受影響服務 expert_context: Expert System 初步診斷 (可選) - initial_diagnosis: 規則匹配結果 - diagnosis_description: 診斷描述 - suggested_diagnosis_commands: 建議診斷指令 - expert_confidence: 信心分數 - requires_human_review: 是否需人工介入 Returns: (proposal_dict, provider, success) proposal_dict 包含: - action: 建議動作 - description: 動作描述 - kubectl_command: kubectl 指令 - risk_level: 風險等級 - reasoning: LLM 推理過程 """ # 建構 prompt (2026-03-31 ogt: Nemotron-mini context 較小,限制數量與長度) signal_summary = "\n".join([ f"- {s.get('alert_name', 'unknown')}: {str(s.get('description', 'N/A'))[:100]}..." for s in signals[:3] # 最多 3 筆,每筆最多 100 字元 ]) target = affected_services[0] if affected_services else "unknown-service" # 擷取 SignOz 指標 signoz_metrics, signoz_trace_url = await self.get_signoz_context( service_name=target, namespace="awoooi-prod", ) signoz_context = "" if signoz_metrics: signoz_context = f""" ## 📊 SignOz Real-time Metrics {signoz_metrics.to_summary()} """ # 2026-03-27: 整合 Expert System 診斷上下文 # 2026-03-26: ADR-030 Phase 2 - 加入 K8s/SignOz 診斷上下文 expert_diagnosis_context = "" if expert_context: diagnosis_cmds = expert_context.get("suggested_diagnosis_commands", []) diagnosis_cmds_str = "\n".join([f" - `{cmd}`" for cmd in diagnosis_cmds]) if diagnosis_cmds else " - (無)" # ADR-030: 加入完整診斷上下文 (如果有),並限制長度以符合 4K Context full_diagnosis = str(expert_context.get("diagnosis_context", ""))[:800] if len(str(expert_context.get("diagnosis_context", ""))) > 800: full_diagnosis += "... 
(truncated)" diagnosis_signals = expert_context.get("diagnosis_signals", []) signals_summary = "" if diagnosis_signals: signals_summary = "\n".join([ f" - [{s.get('severity', 'info').upper()}] {s.get('source', 'unknown')}: {s.get('message', 'N/A')[:100]}" for s in diagnosis_signals[:5] ]) expert_diagnosis_context = f""" ## 🔍 Expert System Initial Diagnosis - **Matched Rule**: {expert_context.get('initial_diagnosis', 'unknown')} - **Diagnosis**: {expert_context.get('diagnosis_description', 'N/A')} - **Confidence**: {expert_context.get('expert_confidence', 0.0):.0%} - **Requires Human Review**: {'Yes' if expert_context.get('requires_human_review') else 'No'} - **Suggested Diagnosis Commands**: {diagnosis_cmds_str} {f'''## 🩺 K8s/SignOz Deep Diagnosis (ADR-030) {full_diagnosis} ### Diagnosis Signals {signals_summary if signals_summary else " - (No signals detected)"} ''' if full_diagnosis else ''} **IMPORTANT**: The Expert System and Diagnostic Aggregator have provided context. Consider this data but apply your own analysis. If Expert says "human review required", provide diagnostic guidance rather than automated fixes. """ # 2026-03-31 ogt: 針對 NVIDIA Nemo-4B 使用超精簡 Prompt registry = get_model_registry() is_nemo = "nvidia" in (registry.get_model("nvidia", "rca") or "").lower() base_prompt = NEMOTRON_SYSTEM_PROMPT if is_nemo else OPENCLAW_SYSTEM_PROMPT proposal_prompt = f"""{base_prompt} {signoz_context} {expert_diagnosis_context} ## 🚨 Incident Context - **Incident ID**: {incident_id} - **Severity**: {severity} - **Affected Services**: {', '.join(affected_services)} - **Signal Count**: {len(signals)} ## 📋 Alert Signals {signal_summary} ## 🎯 Your Task Based on the above incident, signals, and Expert System diagnosis, generate a remediation proposal. You MUST respond with ONLY valid JSON following the schema above. Focus on: 1. Root cause analysis based on signals, SignOz data, and Expert diagnosis 2. 
Specific kubectl command to remediate (or diagnostic command if root cause unclear) 3. Risk assessment for the proposed action 4. Preventive recommendations 5. If Expert System flagged "human review required", prioritize diagnostic commands over fixes """ logger.info( "proposal_generation_start", incident_id=incident_id, severity=severity, signal_count=len(signals), signoz_available=signoz_metrics is not None, ) # 2026-04-01 ogt: 架構鐵律 — OpenClaw (Nemo) 是 AI 大腦,優先委派仲裁 # AWOOOI K8s Pod 不直接打 Ollama/NVIDIA,避免並發逾時 openclaw_result = await self._call_openclaw_analyze( incident_id, severity, signals, affected_services, expert_context ) if openclaw_result is not None: return openclaw_result, "openclaw_nemo", True # 使用快取呼叫 LLM alert_context = { "incident_id": incident_id, "alert_type": signals[0].get("alert_name", "incident") if signals else "incident", "target_resource": target, "severity": severity, } # 2026-03-29 ogt: 修復 tuple unpacking (Token/Cost 追蹤) raw_response, provider, success, from_cache, ai_tokens, ai_cost = await self._call_with_cache( proposal_prompt, alert_context, signoz_metrics, cache_ttl=3600, # 1 hour for proposals ) if not success: logger.error( "proposal_generation_failed", incident_id=incident_id, provider=provider, ) return None, provider, False # 解析 LLM 結果 result = self._parse_analysis_result(raw_response) if result: logger.info( "proposal_generation_complete", incident_id=incident_id, action_title=result.action_title, risk_level=result.risk_level, provider=provider, from_cache=from_cache, ) # 轉換為 proposal dict (optimization_suggestions 是 list[dict]) # 2026-03-29 ogt: 加入 ai_tokens/ai_cost 追蹤 proposal_dict = { "action": result.action_title, "description": result.description, "kubectl_command": result.kubectl_command, "target_resource": result.target_resource, "namespace": result.namespace, "risk_level": result.risk_level, "reasoning": result.reasoning, "confidence": result.confidence, "primary_responsibility": result.primary_responsibility, 
async def generate_incident_proposal_with_tools(
    self,
    incident_id: str,
    severity: str,
    signals: list[dict],
    affected_services: list[str],
    expert_context: dict | None = None,
) -> tuple[dict | None, str, bool]:
    """Generate a remediation proposal with OpenClaw, then add Nemotron tool calls.

    Flow:
      1. If ``settings.ENABLE_NEMOTRON_COLLABORATION`` is off, delegate
         entirely to ``generate_incident_proposal``.
      2. Otherwise obtain the proposal from ``generate_incident_proposal``.
      3. A proposal whose risk level is "low" (or missing) is returned with
         ``nemotron_enabled = False``.
      4. For any other risk level, call ``_call_nemotron_tools`` (up to two
         attempts, 2 s apart). If both attempts raise, fall back to
         ``_call_nemotron_tools_via_gemini``.

    Args:
        incident_id: Incident ID.
        severity: Severity (P0/P1/P2/P3).
        signals: Correlated alert signals.
        affected_services: Affected services.
        expert_context: Optional Expert System preliminary diagnosis.

    Returns:
        ``(proposal_dict, provider, success)``. When tool calling ran, the
        proposal additionally carries ``nemotron_enabled``,
        ``nemotron_tools``, ``nemotron_validation``, ``nemotron_latency_ms``,
        ``nemotron_tool_model`` and ``nemotron_tool_backend``, and
        ``kubectl_command`` is backfilled from the first tool call if it was
        empty.
    """
    import asyncio

    # Feature-flag check.
    if not settings.ENABLE_NEMOTRON_COLLABORATION:
        logger.info(
            "nemotron_collaboration_disabled",
            incident_id=incident_id,
            reason="Feature flag disabled",
        )
        return await self.generate_incident_proposal(
            incident_id, severity, signals, affected_services, expert_context
        )

    # Step 1: OpenClaw arbitration.
    proposal, provider, success = await self.generate_incident_proposal(
        incident_id, severity, signals, affected_services, expert_context
    )
    if not success or proposal is None:
        return proposal, provider, success

    # Step 2: decide whether tool calling is needed.
    # The value may be a plain string or an enum-like object, and may be None;
    # normalize instead of calling .lower() on it directly.
    # NOTE(review): confirm the runtime type of OpenClawDecision.risk_level.
    raw_risk = proposal.get("risk_level") or "low"
    risk_level = str(getattr(raw_risk, "value", raw_risk)).lower()

    if risk_level == "low":
        proposal["nemotron_enabled"] = False
        logger.info(
            "nemotron_skipped_low_risk",
            incident_id=incident_id,
            risk_level=risk_level,
        )
        return proposal, provider, True

    # Step 3: tool calling must produce a result, so it is never skipped here.
    logger.info(
        "nemotron_collaboration_start",
        incident_id=incident_id,
        risk_level=risk_level,
    )

    # None of these proposal fields are modified below, so compute them once.
    tool_call_kwargs = {
        "incident_id": incident_id,
        "reasoning": proposal.get("reasoning", ""),
        "target_resource": proposal.get("target_resource", ""),
        "suggested_action": proposal.get("action", ""),
        "namespace": proposal.get("namespace", "awoooi-prod"),
    }

    def _record_tool_result(
        result: dict,
        *,
        default_validation: str,
        tool_model: str,
        tool_backend: str,
    ) -> None:
        """Copy a tool-calling result onto the proposal and backfill kubectl_command."""
        proposal["nemotron_enabled"] = True
        # A None tool list must not be mistaken for a tool-call failure.
        proposal["nemotron_tools"] = result.get("tools") or []
        proposal["nemotron_validation"] = result.get("validation", default_validation)
        proposal["nemotron_latency_ms"] = result.get("latency_ms", 0.0)
        proposal["nemotron_tool_model"] = tool_model
        proposal["nemotron_tool_backend"] = tool_backend
        # approval_records.action needs an executable command to be parseable.
        _backfill_kubectl_command(proposal, proposal["nemotron_tools"])

    max_retries = 2
    last_error = None

    for attempt in range(1, max_retries + 1):
        try:
            nemotron_result = await self._call_nemotron_tools(**tool_call_kwargs)
            _record_tool_result(
                nemotron_result,
                default_validation="⏳ 驗證中",
                tool_model=nemotron_result.get("tool_model", ""),
                tool_backend=nemotron_result.get("tool_backend", ""),
            )
            logger.info(
                "nemotron_collaboration_complete",
                incident_id=incident_id,
                tools_count=len(proposal["nemotron_tools"]),
                validation=proposal["nemotron_validation"],
                latency_ms=proposal["nemotron_latency_ms"],
                attempt=attempt,
            )
            last_error = None
            break  # success: leave the retry loop
        except Exception as e:
            last_error = e
            logger.warning(
                "nemotron_collaboration_retry",
                incident_id=incident_id,
                error=str(e),
                attempt=attempt,
                max_retries=max_retries,
            )
            if attempt < max_retries:
                await asyncio.sleep(2)  # wait 2 s before retrying

    # All attempts failed: let Gemini produce the execution plan instead.
    if last_error is not None:
        logger.error(
            "nemotron_collaboration_exhausted",
            incident_id=incident_id,
            error=str(last_error),
            retries=max_retries,
        )
        logger.info("nemotron_fallback_gemini_start", incident_id=incident_id)
        gemini_fallback_result = await self._call_nemotron_tools_via_gemini(**tool_call_kwargs)
        _record_tool_result(
            gemini_fallback_result,
            default_validation="⚠️ Gemini 代理",
            tool_model="gemini-fallback",
            tool_backend="Gemini 雲端",
        )

    return proposal, provider, True
Namespace: {namespace} ## OpenClaw 分析 - 建議操作: {suggested_action} - 推理過程: {reasoning[:500]} ## 你的任務 使用提供的工具生成 kubectl 操作。 **重要**: deployment_name 必須填入 "{target_resource}",不可使用 placeholder。 """ # 定義可用 Tools (K8s 操作) k8s_tools = [ { "type": "function", "function": { "name": "restart_deployment", "description": "重啟 Deployment (rollout restart)", "parameters": { "type": "object", "properties": { "deployment_name": {"type": "string"}, "namespace": {"type": "string", "default": "awoooi-prod"}, }, "required": ["deployment_name"], }, }, }, { "type": "function", "function": { "name": "scale_deployment", "description": "調整 Deployment 副本數", "parameters": { "type": "object", "properties": { "deployment_name": {"type": "string"}, "replicas": {"type": "integer"}, "namespace": {"type": "string", "default": "awoooi-prod"}, }, "required": ["deployment_name", "replicas"], }, }, }, { "type": "function", "function": { "name": "delete_pod", "description": "刪除 Pod (強制重建)", "parameters": { "type": "object", "properties": { "pod_name": {"type": "string"}, "namespace": {"type": "string", "default": "awoooi-prod"}, }, "required": ["pod_name"], }, }, }, ] try: # 2026-04-07 ogt: 統帥指示不可跳過 Nemotron,用 120 秒寬裕超時 timeout = 120 result = await asyncio.wait_for( nvidia.tool_call( messages=[{"role": "user", "content": tool_prompt}], tools=k8s_tools, ), timeout=timeout, ) latency_ms = (time.time() - start_time) * 1000 # 解析 Tool Calling 結果 tools = [] validation_passed = True if result and hasattr(result, "tool_calls") and result.tool_calls: for tc in result.tool_calls: tool_entry = { "tool": tc.tool_name if hasattr(tc, "tool_name") else str(tc.get("name", "unknown")), "args": tc.arguments if hasattr(tc, "arguments") else tc.get("arguments", {}), "valid": tc.valid if hasattr(tc, "valid") else True, } tools.append(tool_entry) if not tool_entry["valid"]: validation_passed = False elif result and isinstance(result, dict) and result.get("tool_calls"): for tc in result["tool_calls"]: tool_entry = { "tool": 
async def _call_nemotron_tools_via_gemini(
    self,
    incident_id: str,
    reasoning: str,
    target_resource: str,
    suggested_action: str,
    namespace: str = "awoooi-prod",
) -> dict:
    """Ask Gemini for a tool-calling plan; used as the fallback after Nemotron fails.

    This method never raises: every failure is logged and reported through
    the ``validation`` field with an empty ``tools`` list.

    Args:
        incident_id: Incident ID (used in the prompt and in log events).
        reasoning: Analysis text; only the first 300 characters are sent.
        target_resource: Target resource, used as the deployment name when
            the reply does not provide one.
        suggested_action: Action suggested by the preceding analysis.
        namespace: K8s namespace, used when the reply does not provide one.

    Returns:
        ``{"tools": [...], "validation": str, "latency_ms": float}``.
        ``tools`` is empty when the reply's ``tool_name`` is ``no_action``.
    """
    start_time = time.time()

    # Built before the try block, so it must not fail on a None reasoning.
    reasoning_excerpt = (reasoning or "")[:300]

    prompt = f"""你是 K8s SRE 專家。根據以下分析,輸出對應的 kubectl 操作指令(JSON 格式)。

Incident ID: {incident_id}
目標資源: {target_resource}
Namespace: {namespace}
建議操作: {suggested_action}
分析摘要: {reasoning_excerpt}

請輸出以下 JSON 格式(只輸出 JSON,不要其他文字):
{{
  "tool_name": "restart_deployment 或 scale_deployment 或 no_action",
  "deployment_name": "部署名稱",
  "namespace": "{namespace}",
  "reason": "一句話說明原因"
}}"""

    try:
        text, success, _, _ = await self._call_gemini(prompt)
        latency_ms = (time.time() - start_time) * 1000

        if not success:
            logger.warning("nemotron_gemini_fallback_failed", incident_id=incident_id, error=text)
            return {"tools": [], "validation": "❌ NIM + Gemini 均不可用", "latency_ms": latency_ms}

        # Replies are often wrapped in a code fence or surrounded by prose,
        # so reuse the tolerant extractor rather than parsing the raw text.
        json_str = self._extract_json_from_response(text)
        if json_str is None:
            raise ValueError("no JSON found in Gemini response")
        data = json.loads(json_str)
        if not isinstance(data, dict):
            raise ValueError("Gemini response JSON is not an object")

        tool_name = data.get("tool_name") or "no_action"

        tools = []
        if tool_name != "no_action":
            tools = [{
                "tool": tool_name,
                "args": {
                    # Fall back when the key is missing OR present but empty.
                    "deployment_name": data.get("deployment_name") or target_resource,
                    "namespace": data.get("namespace") or namespace,
                },
                "valid": True,
            }]

        logger.info(
            "nemotron_gemini_fallback_success",
            incident_id=incident_id,
            tool=tool_name,
            latency_ms=latency_ms,
        )
        return {
            "tools": tools,
            "validation": "✅ Gemini 代理驗證通過",
            "latency_ms": latency_ms,
        }

    except Exception as e:
        # Deliberate catch-all: this is the last fallback and must return a
        # result dict rather than propagate.
        latency_ms = (time.time() - start_time) * 1000
        logger.error("nemotron_gemini_fallback_error", incident_id=incident_id, error=str(e))
        return {"tools": [], "validation": f"❌ Gemini 代理失敗: {str(e)[:50]}", "latency_ms": latency_ms}
# =============================================================================
# Singleton
# =============================================================================

# Process-wide service instance; created on first use by get_openclaw().
_openclaw: OpenClawService | None = None


def get_openclaw() -> OpenClawService:
    """Return the shared OpenClawService, creating it on the first call."""
    global _openclaw
    if _openclaw is not None:
        return _openclaw
    _openclaw = OpenClawService()
    return _openclaw


async def close_openclaw() -> None:
    """Close the shared OpenClawService (if one exists) and forget it."""
    global _openclaw
    if not _openclaw:
        return
    await _openclaw.close()
    _openclaw = None


# =============================================================================
# Phase 5 + SignOz Integration Complete
# =============================================================================