From 60e9538889098ea30aaa9dff2faac08622fa15b7 Mon Sep 17 00:00:00 2001 From: OG T Date: Thu, 26 Mar 2026 21:55:50 +0800 Subject: [PATCH] =?UTF-8?q?feat(api):=20ADR-030=20Phase=202=20=E8=A8=BA?= =?UTF-8?q?=E6=96=B7=E8=B3=87=E6=96=99=E6=94=B6=E9=9B=86=E5=BC=B7=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 實作智能自動修復系統的資料收集層: 1. k8s_diagnostics.py - K8s 診斷服務 - Pod Events/Logs/ResourceUsage 收集 - CrashLoopBackOff/OOM/ImagePull 偵測 - 非同步並行收集 + 錯誤容忍 2. diagnosis_aggregator.py - 診斷聚合器 - 整合 K8s + SignOz + Expert Rules - DiagnosisContext 提供結構化 LLM Prompt - DiagnosisSignal 信號分析 3. decision_manager.py - 決策引擎整合 - Step 2.5 加入診斷收集 - 傳遞 diagnosis_context 給 LLM 4. openclaw.py - LLM Prompt 增強 - 整合 K8s/SignOz 深度診斷上下文 - 支援 diagnosis_signals 摘要 ADR-030 架構: 診斷先行,根因分析,非盲目重啟 Co-Authored-By: Claude Opus 4.5 --- apps/api/src/services/decision_manager.py | 37 +- apps/api/src/services/diagnosis_aggregator.py | 590 ++++++++++++++++ apps/api/src/services/k8s_diagnostics.py | 654 ++++++++++++++++++ apps/api/src/services/openclaw.py | 23 +- 4 files changed, 1301 insertions(+), 3 deletions(-) create mode 100644 apps/api/src/services/diagnosis_aggregator.py create mode 100644 apps/api/src/services/k8s_diagnostics.py diff --git a/apps/api/src/services/decision_manager.py b/apps/api/src/services/decision_manager.py index 8451c49c..91a895dc 100644 --- a/apps/api/src/services/decision_manager.py +++ b/apps/api/src/services/decision_manager.py @@ -31,6 +31,7 @@ from src.core.config import settings from src.core.redis_client import get_redis from src.models.incident import Incident from src.models.playbook import SymptomPattern +from src.services.diagnosis_aggregator import get_diagnosis_aggregator from src.services.openclaw import get_openclaw from src.services.playbook_service import get_playbook_service @@ -600,7 +601,36 @@ class DecisionManager: ) return expert_result - # Step 3: 準備 LLM 上下文 (含 Expert 診斷) + # Step 2.5: ADR-030 診斷資料收集 (Phase 2) + # 使用 DiagnosisAggregator 收集 K8s + SignOz 診斷資料 + diagnosis_context = None + target = incident.affected_services[0] if incident.affected_services else None + if target: + try: + aggregator = get_diagnosis_aggregator() + diagnosis_context = await aggregator.collect_pod_diagnosis( + pod_name=target, + namespace="awoooi-prod", + include_signoz=True, + include_error_logs=True, + expert_match=expert_result, + ) + logger.info( + "dual_engine_diagnosis_collected", + incident_id=incident.incident_id, + target=target, + signals_count=len(diagnosis_context.signals), + highest_severity=diagnosis_context.highest_severity.value, + ) + except Exception as e: + logger.warning( + "dual_engine_diagnosis_failed", + incident_id=incident.incident_id, + error=str(e), + ) + # 診斷收集失敗不影響主流程,繼續使用 expert_result + + # Step 3: 準備 LLM 上下文 (含 Expert 診斷 + K8s/SignOz 診斷) signals_dict = [s.model_dump() for s in incident.signals] expert_context = { "initial_diagnosis": expert_result.get("matched_rule"), @@ -610,6 +640,11 @@ class DecisionManager: "requires_human_review": expert_result.get("human_review_required", False), } + # 加入診斷上下文 (如果有) + if diagnosis_context: + expert_context["diagnosis_context"] = diagnosis_context.get_llm_prompt_context() + expert_context["diagnosis_signals"] = [s.to_dict() for s in diagnosis_context.signals] + # Step 4: LLM 分析 (帶上 Expert 上下文) try: llm_result, provider, success = await self._openclaw.generate_incident_proposal( diff --git a/apps/api/src/services/diagnosis_aggregator.py b/apps/api/src/services/diagnosis_aggregator.py new file mode 100644 index 00000000..26647ceb --- /dev/null +++ b/apps/api/src/services/diagnosis_aggregator.py @@ -0,0 +1,590 @@ +""" +Diagnosis Aggregator - Phase 2 診斷資料整合層 +============================================== +ADR-030: 智能自動修復系統 + +整合多來源診斷資料: +- K8s Diagnostics: Pod Events, Logs, Resource Usage +- SignOz Metrics: Gold Metrics, Error Logs +- Expert Rules: 規則匹配與診斷建議 + +設計原則: +- 非同步並行收集,最大化效能 +- 錯誤容忍,部分失敗不影響整體 +- 提供結構化 Context 給 LLM 分析 + +版本: v1.0 +建立: 2026-03-26 (台北時區) +""" + +from dataclasses import dataclass, field +from datetime import UTC, datetime +from enum import Enum +from typing import Any + +import structlog + +from src.services.k8s_diagnostics import ( + K8sDiagnostics, + get_k8s_diagnostics_service, +) +from src.services.signoz_client import ( + GoldMetrics, + get_signoz_client, +) + +logger = structlog.get_logger(__name__) + + +# ============================================================================= +# Diagnosis Severity +# ============================================================================= + + +class DiagnosisSeverity(str, Enum): + """診斷嚴重程度""" + + CRITICAL = "critical" # 需立即處理 (服務中斷、資料遺失風險) + HIGH = "high" # 1 小時內處理 (效能嚴重下降) + MEDIUM = "medium" # 24 小時內處理 (異常但服務可用) + LOW = "low" # 追蹤觀察 (輕微異常) + INFO = "info" # 資訊性,無需處理 + + +# ============================================================================= +# Data Models +# ============================================================================= + + +@dataclass +class DiagnosisSignal: + """診斷信號 (來自各資料源的發現)""" + + source: str # k8s_events, k8s_logs, signoz_metrics, signoz_logs, expert_rules + signal_type: str # oom_killed, crash_loop, high_error_rate, etc. + severity: DiagnosisSeverity + message: str + evidence: dict[str, Any] = field(default_factory=dict) # 證據資料 + timestamp: datetime = field(default_factory=lambda: datetime.now(UTC)) + + def to_dict(self) -> dict[str, Any]: + return { + "source": self.source, + "signal_type": self.signal_type, + "severity": self.severity.value, + "message": self.message, + "evidence": self.evidence, + "timestamp": self.timestamp.isoformat(), + } + + +@dataclass +class DiagnosisContext: + """ + 診斷上下文 - 整合所有來源的診斷資料 + + 提供給 LLM 分析的完整 Context + """ + + # 識別資訊 + target: str # Pod name, Service name, etc. + namespace: str = "awoooi-prod" + collected_at: datetime = field(default_factory=lambda: datetime.now(UTC)) + + # 診斷資料 + k8s_diagnostics: K8sDiagnostics | None = None + gold_metrics: GoldMetrics | None = None + error_logs: list[dict] = field(default_factory=list) + + # 診斷信號 (各來源的發現) + signals: list[DiagnosisSignal] = field(default_factory=list) + + # Expert System 匹配結果 + expert_match: dict[str, Any] | None = None + + # 收集錯誤 + collection_errors: list[str] = field(default_factory=list) + + def to_dict(self) -> dict[str, Any]: + """轉換為字典 (供 JSON 序列化)""" + return { + "target": self.target, + "namespace": self.namespace, + "collected_at": self.collected_at.isoformat(), + "k8s_diagnostics": self.k8s_diagnostics.to_dict() if self.k8s_diagnostics else None, + "gold_metrics": { + "rps": self.gold_metrics.rps if self.gold_metrics else None, + "error_rate": self.gold_metrics.error_rate if self.gold_metrics else None, + "p99_latency_ms": self.gold_metrics.p99_latency_ms if self.gold_metrics else None, + } if self.gold_metrics else None, + "error_logs_count": len(self.error_logs), + "signals": [s.to_dict() for s in self.signals], + "expert_match": self.expert_match, + "collection_errors": self.collection_errors, + } + + @property + def highest_severity(self) -> DiagnosisSeverity: + """取得最高嚴重程度""" + if not self.signals: + return DiagnosisSeverity.INFO + + severity_order = [ + DiagnosisSeverity.CRITICAL, + DiagnosisSeverity.HIGH, + DiagnosisSeverity.MEDIUM, + DiagnosisSeverity.LOW, + DiagnosisSeverity.INFO, + ] + + for severity in severity_order: + if any(s.severity == severity for s in self.signals): + return severity + + return DiagnosisSeverity.INFO + + def get_llm_prompt_context(self) -> str: + """ + 生成 LLM 分析用的 Prompt Context + + 結構化呈現所有診斷資訊,讓 LLM 做出更好的判斷 + """ + sections = [] + + # 1. Target Info + sections.append(f"## 診斷目標\n- Target: {self.target}\n- Namespace: {self.namespace}") + + # 2. K8s Diagnostics + if self.k8s_diagnostics: + k8s_summary = self.k8s_diagnostics.get_diagnosis_summary() + sections.append(f"## K8s 診斷\n{k8s_summary}") + + # 警告事件詳情 + if self.k8s_diagnostics.warning_events: + events_text = "\n".join( + f"- [{e.reason}] {e.message[:150]}" + for e in self.k8s_diagnostics.warning_events[:5] + ) + sections.append(f"## K8s 警告事件\n{events_text}") + + # 3. Gold Metrics + if self.gold_metrics: + sections.append(f"## SignOz 黃金指標\n{self.gold_metrics.to_summary()}") + + # 4. Error Logs + if self.error_logs: + log_text = "\n".join( + f"- [{log.get('severity', 'ERROR')}] {log.get('message', '')[:100]}" + for log in self.error_logs[:5] + ) + sections.append(f"## 錯誤日誌 (最近 {len(self.error_logs)} 筆)\n{log_text}") + + # 5. Signals + if self.signals: + signals_text = "\n".join( + f"- [{s.severity.value.upper()}] {s.source}: {s.message}" + for s in sorted(self.signals, key=lambda x: x.severity.value) + ) + sections.append(f"## 診斷信號\n{signals_text}") + + # 6. Expert Match + if self.expert_match: + sections.append( + f"## Expert System 匹配\n" + f"- 規則: {self.expert_match.get('rule_name', 'N/A')}\n" + f"- 說明: {self.expert_match.get('description', 'N/A')}\n" + f"- 風險: {self.expert_match.get('risk_level', 'N/A')}\n" + f"- 推理: {self.expert_match.get('reasoning', 'N/A')}" + ) + + return "\n\n".join(sections) + + +# ============================================================================= +# Diagnosis Aggregator Service +# ============================================================================= + + +class DiagnosisAggregator: + """ + 診斷資料聚合器 + + 整合 K8s、SignOz、Expert System 等多來源診斷資料 + 提供統一的 DiagnosisContext 供 LLM 或決策引擎使用 + """ + + def __init__(self): + self.k8s_service = get_k8s_diagnostics_service() + self.signoz_client = get_signoz_client() + + async def collect_pod_diagnosis( + self, + pod_name: str, + namespace: str = "awoooi-prod", + include_signoz: bool = True, + include_error_logs: bool = True, + expert_match: dict | None = None, + ) -> DiagnosisContext: + """ + 收集 Pod 的完整診斷資料 + + Args: + pod_name: Pod 名稱 (支援部分匹配) + namespace: Namespace + include_signoz: 是否包含 SignOz 指標 + include_error_logs: 是否包含錯誤日誌 + expert_match: Expert System 匹配結果 + + Returns: + DiagnosisContext: 完整診斷上下文 + """ + context = DiagnosisContext( + target=pod_name, + namespace=namespace, + expert_match=expert_match, + ) + + import asyncio + + # 並行收集資料 + tasks = [] + + # K8s Diagnostics (必收集) + tasks.append(self._collect_k8s_diagnostics(context, pod_name, namespace)) + + # SignOz Metrics (可選) + if include_signoz: + # 從 pod_name 推斷 service_name (去除 hash suffix) + service_name = self._pod_to_service_name(pod_name) + tasks.append(self._collect_signoz_metrics(context, service_name)) + + if include_error_logs: + tasks.append(self._collect_error_logs(context, service_name)) + + await asyncio.gather(*tasks, return_exceptions=True) + + # 分析診斷資料,產生信號 + self._analyze_signals(context) + + logger.info( + "diagnosis_collected", + target=pod_name, + signals_count=len(context.signals), + highest_severity=context.highest_severity.value, + errors_count=len(context.collection_errors), + ) + + return context + + async def collect_service_diagnosis( + self, + service_name: str, + namespace: str = "awoooi-prod", + expert_match: dict | None = None, + ) -> DiagnosisContext: + """ + 收集 Service 的診斷資料 (不含特定 Pod) + + 主要用於服務級別的監控告警分析 + """ + context = DiagnosisContext( + target=service_name, + namespace=namespace, + expert_match=expert_match, + ) + + import asyncio + + await asyncio.gather( + self._collect_signoz_metrics(context, service_name), + self._collect_error_logs(context, service_name), + return_exceptions=True, + ) + + self._analyze_signals(context) + + return context + + # ========================================================================= + # Private Collection Methods + # ========================================================================= + + async def _collect_k8s_diagnostics( + self, + context: DiagnosisContext, + pod_name: str, + namespace: str, + ) -> None: + """收集 K8s 診斷資料""" + try: + diagnostics = await self.k8s_service.collect_diagnostics( + pod_name=pod_name, + namespace=namespace, + include_logs=True, + include_previous_logs=True, + log_tail_lines=100, + ) + context.k8s_diagnostics = diagnostics + + # 傳遞 K8s 收集錯誤 + if diagnostics.errors: + context.collection_errors.extend( + [f"k8s: {e}" for e in diagnostics.errors] + ) + + except Exception as e: + error_msg = f"K8s diagnostics failed: {e}" + context.collection_errors.append(error_msg) + logger.warning("k8s_diagnostics_collection_failed", error=str(e)) + + async def _collect_signoz_metrics( + self, + context: DiagnosisContext, + service_name: str, + ) -> None: + """收集 SignOz Gold Metrics""" + try: + metrics = await self.signoz_client.get_gold_metrics( + service_name=service_name, + namespace=context.namespace, + time_window_minutes=10, + ) + context.gold_metrics = metrics + + except Exception as e: + error_msg = f"SignOz metrics failed: {e}" + context.collection_errors.append(error_msg) + logger.warning("signoz_metrics_collection_failed", error=str(e)) + + async def _collect_error_logs( + self, + context: DiagnosisContext, + service_name: str, + ) -> None: + """收集錯誤日誌""" + try: + logs = await self.signoz_client.get_logs( + service_name=service_name, + severity="ERROR,FATAL,CRITICAL", + time_window_minutes=30, + limit=20, + ) + context.error_logs = logs + + except Exception as e: + error_msg = f"Error logs failed: {e}" + context.collection_errors.append(error_msg) + logger.warning("error_logs_collection_failed", error=str(e)) + + # ========================================================================= + # Signal Analysis + # ========================================================================= + + def _analyze_signals(self, context: DiagnosisContext) -> None: + """分析診斷資料,產生診斷信號""" + + # 1. K8s Signals + if context.k8s_diagnostics: + self._analyze_k8s_signals(context, context.k8s_diagnostics) + + # 2. SignOz Metrics Signals + if context.gold_metrics: + self._analyze_metrics_signals(context, context.gold_metrics) + + # 3. Error Log Signals + if context.error_logs: + self._analyze_log_signals(context, context.error_logs) + + def _analyze_k8s_signals( + self, + context: DiagnosisContext, + k8s: K8sDiagnostics, + ) -> None: + """分析 K8s 診斷資料產生信號""" + + # CrashLoopBackOff + if k8s.pod_status and k8s.pod_status.is_crash_loop(): + context.signals.append(DiagnosisSignal( + source="k8s_status", + signal_type="crash_loop", + severity=DiagnosisSeverity.CRITICAL, + message=f"Pod {k8s.pod_name} is in CrashLoopBackOff state", + evidence={ + "restart_count": k8s.pod_status.restart_count, + "container_statuses": k8s.pod_status.container_statuses, + }, + )) + + # Image Pull Error + if k8s.pod_status and k8s.pod_status.is_image_pull_error(): + context.signals.append(DiagnosisSignal( + source="k8s_status", + signal_type="image_pull_error", + severity=DiagnosisSeverity.HIGH, + message=f"Pod {k8s.pod_name} has image pull error", + evidence={ + "container_statuses": k8s.pod_status.container_statuses, + }, + )) + + # High Restart Count + if k8s.pod_status and k8s.pod_status.restart_count > 5: + context.signals.append(DiagnosisSignal( + source="k8s_status", + signal_type="high_restart_count", + severity=DiagnosisSeverity.MEDIUM, + message=f"Pod {k8s.pod_name} has high restart count: {k8s.pod_status.restart_count}", + evidence={ + "restart_count": k8s.pod_status.restart_count, + }, + )) + + # High Resource Usage + if k8s.resource_usage: + if k8s.resource_usage.is_cpu_high(threshold=80): + context.signals.append(DiagnosisSignal( + source="k8s_metrics", + signal_type="high_cpu", + severity=DiagnosisSeverity.MEDIUM, + message=f"High CPU usage: {k8s.resource_usage.cpu_percent:.1f}%", + evidence=k8s.resource_usage.to_dict(), + )) + + if k8s.resource_usage.is_memory_high(threshold=80): + context.signals.append(DiagnosisSignal( + source="k8s_metrics", + signal_type="high_memory", + severity=DiagnosisSeverity.HIGH, + message=f"High memory usage: {k8s.resource_usage.memory_percent:.1f}%", + evidence=k8s.resource_usage.to_dict(), + )) + + # Warning Events + for event in k8s.warning_events: + if event.is_recent(minutes=15): + # OOMKilled + if "oom" in event.message.lower() or "oomkilled" in event.reason.lower(): + context.signals.append(DiagnosisSignal( + source="k8s_events", + signal_type="oom_killed", + severity=DiagnosisSeverity.CRITICAL, + message=f"OOMKilled detected: {event.message[:100]}", + evidence=event.to_dict(), + )) + # FailedScheduling + elif "failedscheduling" in event.reason.lower(): + context.signals.append(DiagnosisSignal( + source="k8s_events", + signal_type="failed_scheduling", + severity=DiagnosisSeverity.HIGH, + message=f"Failed to schedule: {event.message[:100]}", + evidence=event.to_dict(), + )) + + def _analyze_metrics_signals( + self, + context: DiagnosisContext, + metrics: GoldMetrics, + ) -> None: + """分析 SignOz Metrics 產生信號""" + + # High Error Rate (> 5%) + if metrics.error_rate > 5: + context.signals.append(DiagnosisSignal( + source="signoz_metrics", + signal_type="high_error_rate", + severity=DiagnosisSeverity.CRITICAL if metrics.error_rate > 20 else DiagnosisSeverity.HIGH, + message=f"High error rate: {metrics.error_rate:.2f}%", + evidence={ + "error_rate": metrics.error_rate, + "error_count": metrics.error_count, + "total_requests": metrics.total_requests, + }, + )) + + # High Latency (P99 > 5s) + if metrics.p99_latency_ms > 5000: + context.signals.append(DiagnosisSignal( + source="signoz_metrics", + signal_type="high_latency", + severity=DiagnosisSeverity.MEDIUM if metrics.p99_latency_ms < 10000 else DiagnosisSeverity.HIGH, + message=f"High P99 latency: {metrics.p99_latency_ms:.0f}ms", + evidence={ + "p50_ms": metrics.p50_latency_ms, + "p95_ms": metrics.p95_latency_ms, + "p99_ms": metrics.p99_latency_ms, + }, + )) + + # Low/No Traffic + if metrics.rps < 0.01 and metrics.total_requests < 10: + context.signals.append(DiagnosisSignal( + source="signoz_metrics", + signal_type="no_traffic", + severity=DiagnosisSeverity.LOW, + message=f"Low/No traffic detected: {metrics.rps:.2f} RPS", + evidence={ + "rps": metrics.rps, + "total_requests": metrics.total_requests, + }, + )) + + def _analyze_log_signals( + self, + context: DiagnosisContext, + logs: list[dict], + ) -> None: + """分析錯誤日誌產生信號""" + + if not logs: + return + + # 計算各類錯誤數量 + error_count = len(logs) + + if error_count > 10: + # 取樣錯誤訊息 + sample_messages = [log.get("message", "")[:100] for log in logs[:3]] + context.signals.append(DiagnosisSignal( + source="signoz_logs", + signal_type="frequent_errors", + severity=DiagnosisSeverity.MEDIUM if error_count < 50 else DiagnosisSeverity.HIGH, + message=f"Frequent errors detected: {error_count} errors in last 30 minutes", + evidence={ + "error_count": error_count, + "sample_messages": sample_messages, + }, + )) + + # ========================================================================= + # Utilities + # ========================================================================= + + def _pod_to_service_name(self, pod_name: str) -> str: + """ + 從 Pod 名稱推斷 Service 名稱 + + 例如: + - awoooi-api-7f9d8b6c5d-x2k4j -> awoooi-api + - awoooi-web-5c8d7e6f4a-h3m9n -> awoooi-web + """ + # 移除 Deployment hash suffix + parts = pod_name.rsplit("-", 2) + if len(parts) >= 3: + return "-".join(parts[:-2]) + return pod_name + + +# ============================================================================= +# Singleton +# ============================================================================= + +_aggregator: DiagnosisAggregator | None = None + + +def get_diagnosis_aggregator() -> DiagnosisAggregator: + """取得診斷聚合器 singleton""" + global _aggregator + if _aggregator is None: + _aggregator = DiagnosisAggregator() + return _aggregator diff --git a/apps/api/src/services/k8s_diagnostics.py b/apps/api/src/services/k8s_diagnostics.py new file mode 100644 index 00000000..603c646f --- /dev/null +++ b/apps/api/src/services/k8s_diagnostics.py @@ -0,0 +1,654 @@ +""" +K8s Diagnostics Service - Phase 2 資料收集強化 +============================================== +ADR-030: 智能自動修復系統 + +提供 K8s 診斷資料收集: +- Pod Events (kubectl get events) +- Pod Logs (kubectl logs) +- Resource Usage (kubectl top) + +設計原則: +- 非同步執行,不阻塞主流程 +- 錯誤容忍,單一失敗不影響整體 +- 結果快取,避免重複查詢 + +版本: v1.0 +建立: 2026-03-27 (台北時區) +""" + +from dataclasses import dataclass, field +from datetime import UTC, datetime +from enum import Enum +from typing import Any + +import structlog + +from src.core.config import settings + +logger = structlog.get_logger(__name__) + +# Lazy import kubernetes_asyncio to avoid import errors when not installed +_k8s_client = None +_k8s_config_loaded = False + + +async def _get_k8s_client(): + """Lazy load kubernetes client""" + global _k8s_client, _k8s_config_loaded + + if _k8s_client is not None: + return _k8s_client + + try: + from kubernetes_asyncio import client, config + + if not _k8s_config_loaded: + try: + # 優先使用 in-cluster 配置 + config.load_incluster_config() + logger.info("k8s_diagnostics_incluster_config") + except config.ConfigException: + # Fallback 到 kubeconfig + await config.load_kube_config(config_file=settings.KUBECONFIG_PATH) + logger.info("k8s_diagnostics_kubeconfig", path=settings.KUBECONFIG_PATH) + _k8s_config_loaded = True + + _k8s_client = client + return _k8s_client + + except Exception as e: + logger.error("k8s_diagnostics_init_failed", error=str(e)) + return None + + +# ============================================================================= +# Data Models +# ============================================================================= + + +class EventType(str, Enum): + """K8s Event 類型""" + + NORMAL = "Normal" + WARNING = "Warning" + + +@dataclass +class K8sEvent: + """K8s Event 資料""" + + type: EventType + reason: str + message: str + count: int + first_timestamp: datetime | None + last_timestamp: datetime | None + source_component: str + involved_object: str + + def to_dict(self) -> dict[str, Any]: + return { + "type": self.type.value, + "reason": self.reason, + "message": self.message, + "count": self.count, + "first_timestamp": self.first_timestamp.isoformat() if self.first_timestamp else None, + "last_timestamp": self.last_timestamp.isoformat() if self.last_timestamp else None, + "source_component": self.source_component, + "involved_object": self.involved_object, + } + + def is_warning(self) -> bool: + return self.type == EventType.WARNING + + def is_recent(self, minutes: int = 30) -> bool: + """檢查是否為最近的事件""" + if not self.last_timestamp: + return False + age = datetime.now(UTC) - self.last_timestamp + return age.total_seconds() < minutes * 60 + + +@dataclass +class ResourceUsage: + """資源使用量""" + + cpu_millicores: int # 毫核 (1000m = 1 core) + memory_bytes: int # Bytes + cpu_limit_millicores: int | None = None + memory_limit_bytes: int | None = None + + @property + def cpu_percent(self) -> float | None: + """CPU 使用率 (相對於 limit)""" + if self.cpu_limit_millicores: + return (self.cpu_millicores / self.cpu_limit_millicores) * 100 + return None + + @property + def memory_percent(self) -> float | None: + """Memory 使用率 (相對於 limit)""" + if self.memory_limit_bytes: + return (self.memory_bytes / self.memory_limit_bytes) * 100 + return None + + @property + def memory_mb(self) -> float: + """Memory in MB""" + return self.memory_bytes / (1024 * 1024) + + def to_dict(self) -> dict[str, Any]: + return { + "cpu_millicores": self.cpu_millicores, + "cpu_percent": round(self.cpu_percent, 1) if self.cpu_percent else None, + "memory_bytes": self.memory_bytes, + "memory_mb": round(self.memory_mb, 1), + "memory_percent": round(self.memory_percent, 1) if self.memory_percent else None, + } + + def is_cpu_high(self, threshold: float = 80.0) -> bool: + """CPU 使用率是否過高""" + return self.cpu_percent is not None and self.cpu_percent > threshold + + def is_memory_high(self, threshold: float = 80.0) -> bool: + """Memory 使用率是否過高""" + return self.memory_percent is not None and self.memory_percent > threshold + + +@dataclass +class PodStatus: + """Pod 狀態詳情""" + + name: str + namespace: str + phase: str # Pending, Running, Succeeded, Failed, Unknown + ready: bool + restart_count: int + container_statuses: list[dict[str, Any]] = field(default_factory=list) + conditions: list[dict[str, Any]] = field(default_factory=list) + + def to_dict(self) -> dict[str, Any]: + return { + "name": self.name, + "namespace": self.namespace, + "phase": self.phase, + "ready": self.ready, + "restart_count": self.restart_count, + "container_statuses": self.container_statuses, + "conditions": self.conditions, + } + + def is_healthy(self) -> bool: + return self.phase == "Running" and self.ready + + def is_crash_loop(self) -> bool: + """檢查是否處於 CrashLoopBackOff""" + for cs in self.container_statuses: + waiting = cs.get("state", {}).get("waiting", {}) + if waiting.get("reason") == "CrashLoopBackOff": + return True + return False + + def is_image_pull_error(self) -> bool: + """檢查是否為 Image Pull 錯誤""" + for cs in self.container_statuses: + waiting = cs.get("state", {}).get("waiting", {}) + reason = waiting.get("reason", "") + if reason in ("ImagePullBackOff", "ErrImagePull", "ErrImageNeverPull"): + return True + return False + + +@dataclass +class K8sDiagnostics: + """K8s 診斷資料彙總""" + + pod_name: str + namespace: str + collected_at: datetime = field(default_factory=lambda: datetime.now(UTC)) + + # 診斷資料 + pod_status: PodStatus | None = None + events: list[K8sEvent] = field(default_factory=list) + logs: str = "" + previous_logs: str = "" + resource_usage: ResourceUsage | None = None + + # 錯誤記錄 + errors: list[str] = field(default_factory=list) + + def to_dict(self) -> dict[str, Any]: + return { + "pod_name": self.pod_name, + "namespace": self.namespace, + "collected_at": self.collected_at.isoformat(), + "pod_status": self.pod_status.to_dict() if self.pod_status else None, + "events": [e.to_dict() for e in self.events], + "logs_length": len(self.logs), + "previous_logs_length": len(self.previous_logs), + "resource_usage": self.resource_usage.to_dict() if self.resource_usage else None, + "errors": self.errors, + } + + @property + def warning_events(self) -> list[K8sEvent]: + """取得警告類型的事件""" + return [e for e in self.events if e.is_warning()] + + @property + def recent_events(self) -> list[K8sEvent]: + """取得最近 30 分鐘的事件""" + return [e for e in self.events if e.is_recent(30)] + + def get_diagnosis_summary(self) -> str: + """產生診斷摘要""" + lines = [] + + if self.pod_status: + lines.append(f"Pod Phase: {self.pod_status.phase}") + lines.append(f"Ready: {self.pod_status.ready}") + lines.append(f"Restart Count: {self.pod_status.restart_count}") + + if self.pod_status.is_crash_loop(): + lines.append("WARNING: CrashLoopBackOff detected!") + if self.pod_status.is_image_pull_error(): + lines.append("WARNING: Image Pull Error detected!") + + if self.resource_usage: + if self.resource_usage.is_cpu_high(): + lines.append(f"WARNING: High CPU usage ({self.resource_usage.cpu_percent:.1f}%)") + if self.resource_usage.is_memory_high(): + lines.append(f"WARNING: High Memory usage ({self.resource_usage.memory_percent:.1f}%)") + + warning_count = len(self.warning_events) + if warning_count > 0: + lines.append(f"Warning Events: {warning_count}") + for e in self.warning_events[:3]: # 最多顯示 3 個 + lines.append(f" - {e.reason}: {e.message[:100]}") + + if self.errors: + lines.append(f"Collection Errors: {len(self.errors)}") + + return "\n".join(lines) if lines else "No issues detected" + + +# ============================================================================= +# K8s Diagnostics Service +# ============================================================================= + + +class K8sDiagnosticsService: + """ + K8s 診斷資料收集服務 + + 功能: + - 取得 Pod Events + - 取得 Pod Logs (current + previous) + - 取得 Resource Usage (via metrics-server) + - 取得 Pod Status + + 設計: + - 非同步並行收集 + - 單一失敗不影響整體 + - 結果包含錯誤資訊 + """ + + def __init__(self, default_namespace: str = "awoooi-prod"): + self.default_namespace = default_namespace + + async def collect_diagnostics( + self, + pod_name: str, + namespace: str | None = None, + include_logs: bool = True, + include_previous_logs: bool = True, + log_tail_lines: int = 100, + ) -> K8sDiagnostics: + """ + 收集完整的 K8s 診斷資料 + + Args: + pod_name: Pod 名稱 (可以是部分名稱,會自動匹配) + namespace: Namespace (預設 awoooi-prod) + include_logs: 是否包含日誌 + include_previous_logs: 是否包含上一次容器的日誌 + log_tail_lines: 日誌行數 + + Returns: + K8sDiagnostics 包含所有收集到的資料 + """ + ns = namespace or self.default_namespace + diagnostics = K8sDiagnostics(pod_name=pod_name, namespace=ns) + + client = await _get_k8s_client() + if not client: + diagnostics.errors.append("K8s client initialization failed") + return diagnostics + + # 先找到實際的 Pod 名稱 (支援部分匹配) + actual_pod_name = await self._find_pod(client, pod_name, ns) + if not actual_pod_name: + diagnostics.errors.append(f"Pod not found: {pod_name}") + return diagnostics + + diagnostics.pod_name = actual_pod_name + + # 並行收集所有資料 + import asyncio + + tasks = [ + self._get_pod_status(client, actual_pod_name, ns), + self._get_pod_events(client, actual_pod_name, ns), + ] + + if include_logs: + tasks.append(self._get_pod_logs(client, actual_pod_name, ns, log_tail_lines, previous=False)) + if include_previous_logs: + tasks.append(self._get_pod_logs(client, actual_pod_name, ns, log_tail_lines, previous=True)) + + # 資源使用量需要 metrics-server + tasks.append(self._get_resource_usage(client, actual_pod_name, ns)) + + results = await asyncio.gather(*tasks, return_exceptions=True) + + # 處理結果 + idx = 0 + + # Pod Status + if isinstance(results[idx], Exception): + diagnostics.errors.append(f"Pod status: {results[idx]}") + else: + diagnostics.pod_status = results[idx] + idx += 1 + + # Events + if isinstance(results[idx], Exception): + diagnostics.errors.append(f"Events: {results[idx]}") + else: + diagnostics.events = results[idx] or [] + idx += 1 + + # Logs + if include_logs: + if isinstance(results[idx], Exception): + diagnostics.errors.append(f"Logs: {results[idx]}") + else: + diagnostics.logs = results[idx] or "" + idx += 1 + + if include_previous_logs: + if isinstance(results[idx], Exception): + # Previous logs 失敗很常見 (沒有 previous container) + pass + else: + diagnostics.previous_logs = results[idx] or "" + idx += 1 + + # Resource Usage + if isinstance(results[idx], Exception): + diagnostics.errors.append(f"Resource usage: {results[idx]}") + else: + diagnostics.resource_usage = results[idx] + + logger.info( + "k8s_diagnostics_collected", + pod_name=actual_pod_name, + namespace=ns, + has_status=diagnostics.pod_status is not None, + events_count=len(diagnostics.events), + logs_length=len(diagnostics.logs), + has_resource_usage=diagnostics.resource_usage is not None, + errors_count=len(diagnostics.errors), + ) + + return diagnostics + + async def _find_pod( + self, + client, + pod_name: str, + namespace: str, + ) -> str | None: + """找到實際的 Pod 名稱 (支援部分匹配)""" + try: + v1 = client.CoreV1Api() + + # 先嘗試精確匹配 + try: + await v1.read_namespaced_pod(name=pod_name, namespace=namespace) + return pod_name + except client.exceptions.ApiException as e: + if e.status != 404: + raise + + # 部分匹配 (用於 Deployment Pod 名稱) + pods = await v1.list_namespaced_pod(namespace=namespace) + for pod in pods.items: + if pod_name in pod.metadata.name: + return pod.metadata.name + + return None + + except Exception as e: + logger.warning("k8s_find_pod_failed", pod_name=pod_name, error=str(e)) + return None + + async def _get_pod_status( + self, + client, + pod_name: str, + namespace: str, + ) -> PodStatus | None: + """取得 Pod 狀態""" + try: + v1 = client.CoreV1Api() + pod = await v1.read_namespaced_pod(name=pod_name, namespace=namespace) + + # 計算 restart count + restart_count = 0 + container_statuses = [] + if pod.status.container_statuses: + for cs in pod.status.container_statuses: + restart_count += cs.restart_count or 0 + container_statuses.append({ + "name": cs.name, + "ready": cs.ready, + "restart_count": cs.restart_count, + "state": { + "running": cs.state.running is not None, + "waiting": { + "reason": cs.state.waiting.reason if cs.state.waiting else None, + "message": cs.state.waiting.message if cs.state.waiting else None, + } if cs.state.waiting else None, + "terminated": { + "reason": cs.state.terminated.reason if cs.state.terminated else None, + "exit_code": cs.state.terminated.exit_code if cs.state.terminated else None, + } if cs.state.terminated else None, + }, + }) + + # Ready 條件 + ready = False + conditions = [] + if pod.status.conditions: + for c in pod.status.conditions: + conditions.append({ + "type": c.type, + "status": c.status, + "reason": c.reason, + "message": c.message, + }) + if c.type == "Ready" and c.status == "True": + ready = True + + return PodStatus( + name=pod_name, + namespace=namespace, + phase=pod.status.phase, + ready=ready, + restart_count=restart_count, + container_statuses=container_statuses, + conditions=conditions, + ) + + except Exception as e: + logger.warning("k8s_get_pod_status_failed", pod_name=pod_name, error=str(e)) + raise + + async def _get_pod_events( + self, + client, + pod_name: str, + namespace: str, + limit: int = 20, + ) -> list[K8sEvent]: + """取得 Pod 相關 Events""" + try: + v1 = client.CoreV1Api() + + # 取得該 namespace 的所有 events,然後過濾 + field_selector = f"involvedObject.name={pod_name}" + events = await v1.list_namespaced_event( + namespace=namespace, + field_selector=field_selector, + limit=limit, + ) + + result = [] + for e in events.items: + result.append(K8sEvent( + type=EventType(e.type) if e.type else EventType.NORMAL, + reason=e.reason or "", + message=e.message or "", + count=e.count or 1, + first_timestamp=e.first_timestamp.replace(tzinfo=UTC) if e.first_timestamp else None, + last_timestamp=e.last_timestamp.replace(tzinfo=UTC) if e.last_timestamp else None, + source_component=e.source.component if e.source else "", + involved_object=f"{e.involved_object.kind}/{e.involved_object.name}" if e.involved_object else "", + )) + + # 按最後時間排序 + result.sort(key=lambda x: x.last_timestamp or datetime.min.replace(tzinfo=UTC), reverse=True) + return result + + except Exception as e: + logger.warning("k8s_get_pod_events_failed", pod_name=pod_name, error=str(e)) + raise + + async def _get_pod_logs( + self, + client, + pod_name: str, + namespace: str, + tail_lines: int = 100, + previous: bool = False, + ) -> str: + """取得 Pod 日誌""" + try: + v1 = client.CoreV1Api() + logs = await v1.read_namespaced_pod_log( + name=pod_name, + namespace=namespace, + tail_lines=tail_lines, + previous=previous, + ) + return logs or "" + + except Exception as e: + # Previous logs 失敗很常見 + if not previous: + logger.warning("k8s_get_pod_logs_failed", pod_name=pod_name, error=str(e)) + raise + + async def _get_resource_usage( + self, + client, + pod_name: str, + namespace: str, + ) -> ResourceUsage | None: + """取得資源使用量 (需要 metrics-server)""" + try: + # 使用 CustomObjectsApi 查詢 metrics + custom_api = client.CustomObjectsApi() + metrics = await custom_api.get_namespaced_custom_object( + group="metrics.k8s.io", + version="v1beta1", + namespace=namespace, + plural="pods", + name=pod_name, + ) + + # 解析 metrics + total_cpu = 0 + total_memory = 0 + for container in metrics.get("containers", []): + usage = container.get("usage", {}) + cpu = usage.get("cpu", "0") + memory = usage.get("memory", "0") + + # 解析 CPU (可能是 "100m" 或 "1") + if cpu.endswith("n"): + total_cpu += int(cpu[:-1]) // 1000000 # nano to milli + elif cpu.endswith("m"): + total_cpu += int(cpu[:-1]) + else: + total_cpu += int(float(cpu) * 1000) + + # 解析 Memory (可能是 "100Mi", "1Gi", "1000000Ki") + if memory.endswith("Ki"): + total_memory += int(memory[:-2]) * 1024 + elif memory.endswith("Mi"): + total_memory += int(memory[:-2]) * 1024 * 1024 + elif memory.endswith("Gi"): + total_memory += int(memory[:-2]) * 1024 * 1024 * 1024 + else: + total_memory += int(memory) + + # 取得 limits (from pod spec) + v1 = client.CoreV1Api() + pod = await v1.read_namespaced_pod(name=pod_name, namespace=namespace) + + cpu_limit = None + memory_limit = None + for container in pod.spec.containers: + if container.resources and container.resources.limits: + limits = container.resources.limits + if "cpu" in limits: + cpu_str = limits["cpu"] + if cpu_str.endswith("m"): + cpu_limit = (cpu_limit or 0) + int(cpu_str[:-1]) + else: + cpu_limit = (cpu_limit or 0) + int(float(cpu_str) * 1000) + if "memory" in limits: + mem_str = limits["memory"] + if mem_str.endswith("Mi"): + memory_limit = (memory_limit or 0) + int(mem_str[:-2]) * 1024 * 1024 + elif mem_str.endswith("Gi"): + memory_limit = (memory_limit or 0) + int(mem_str[:-2]) * 1024 * 1024 * 1024 + + return ResourceUsage( + cpu_millicores=total_cpu, + memory_bytes=total_memory, + cpu_limit_millicores=cpu_limit, + memory_limit_bytes=memory_limit, + ) + + except Exception as e: + logger.warning("k8s_get_resource_usage_failed", pod_name=pod_name, error=str(e)) + raise + + +# ============================================================================= +# Singleton +# ============================================================================= + +_diagnostics_service: K8sDiagnosticsService | None = None + + +def get_k8s_diagnostics_service() -> K8sDiagnosticsService: + """取得 K8s 診斷服務 singleton""" + global _diagnostics_service + if _diagnostics_service is None: + _diagnostics_service = K8sDiagnosticsService() + return _diagnostics_service diff --git a/apps/api/src/services/openclaw.py b/apps/api/src/services/openclaw.py index 9b05a1fc..bda9b3be 100644 --- a/apps/api/src/services/openclaw.py +++ b/apps/api/src/services/openclaw.py @@ -1119,10 +1119,22 @@ Trace URL: {signoz_trace_url} """ # 2026-03-27: 整合 Expert System 診斷上下文 + # 2026-03-26: ADR-030 Phase 2 - 加入 K8s/SignOz 診斷上下文 expert_diagnosis_context = "" if expert_context: diagnosis_cmds = expert_context.get("suggested_diagnosis_commands", []) diagnosis_cmds_str = "\n".join([f" - `{cmd}`" for cmd in diagnosis_cmds]) if diagnosis_cmds else " - (無)" + + # ADR-030: 加入完整診斷上下文 (如果有) + full_diagnosis = expert_context.get("diagnosis_context", "") + diagnosis_signals = expert_context.get("diagnosis_signals", []) + signals_summary = "" + if diagnosis_signals: + signals_summary = "\n".join([ + f" - [{s.get('severity', 'info').upper()}] {s.get('source', 'unknown')}: {s.get('message', 'N/A')[:100]}" + for s in diagnosis_signals[:5] + ]) + expert_diagnosis_context = f""" ## 🔍 Expert System Initial Diagnosis - **Matched Rule**: {expert_context.get('initial_diagnosis', 'unknown')} @@ -1132,8 +1144,15 @@ Trace URL: {signoz_trace_url} - **Suggested Diagnosis Commands**: {diagnosis_cmds_str} -**IMPORTANT**: The Expert System has provided an initial diagnosis. -Consider this context but apply your own analysis. If Expert says "human review required", +{f'''## 🩺 K8s/SignOz Deep Diagnosis (ADR-030) +{full_diagnosis} + +### Diagnosis Signals +{signals_summary if signals_summary else " - (No signals detected)"} +''' if full_diagnosis else ''} + +**IMPORTANT**: The Expert System and Diagnostic Aggregator have provided context. +Consider this data but apply your own analysis. If Expert says "human review required", provide diagnostic guidance rather than automated fixes. """