feat(api): ADR-030 Phase 2 診斷資料收集強化

實作智能自動修復系統的資料收集層:

1. k8s_diagnostics.py - K8s 診斷服務
   - Pod Events/Logs/ResourceUsage 收集
   - CrashLoopBackOff/OOM/ImagePull 偵測
   - 非同步並行收集 + 錯誤容忍

2. diagnosis_aggregator.py - 診斷聚合器
   - 整合 K8s + SignOz + Expert Rules
   - DiagnosisContext 提供結構化 LLM Prompt
   - DiagnosisSignal 信號分析

3. decision_manager.py - 決策引擎整合
   - Step 2.5 加入診斷收集
   - 傳遞 diagnosis_context 給 LLM

4. openclaw.py - LLM Prompt 增強
   - 整合 K8s/SignOz 深度診斷上下文
   - 支援 diagnosis_signals 摘要

ADR-030 架構: 診斷先行,根因分析,非盲目重啟

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-03-26 21:55:50 +08:00
parent bb6151cf44
commit 60e9538889
4 changed files with 1301 additions and 3 deletions

View File

@@ -31,6 +31,7 @@ from src.core.config import settings
from src.core.redis_client import get_redis
from src.models.incident import Incident
from src.models.playbook import SymptomPattern
from src.services.diagnosis_aggregator import get_diagnosis_aggregator
from src.services.openclaw import get_openclaw
from src.services.playbook_service import get_playbook_service
@@ -600,7 +601,36 @@ class DecisionManager:
)
return expert_result
# Step 3: 準備 LLM 上下文 (含 Expert 診斷)
# Step 2.5: ADR-030 診斷資料收集 (Phase 2)
# 使用 DiagnosisAggregator 收集 K8s + SignOz 診斷資料
diagnosis_context = None
target = incident.affected_services[0] if incident.affected_services else None
if target:
try:
aggregator = get_diagnosis_aggregator()
diagnosis_context = await aggregator.collect_pod_diagnosis(
pod_name=target,
namespace="awoooi-prod",
include_signoz=True,
include_error_logs=True,
expert_match=expert_result,
)
logger.info(
"dual_engine_diagnosis_collected",
incident_id=incident.incident_id,
target=target,
signals_count=len(diagnosis_context.signals),
highest_severity=diagnosis_context.highest_severity.value,
)
except Exception as e:
logger.warning(
"dual_engine_diagnosis_failed",
incident_id=incident.incident_id,
error=str(e),
)
# 診斷收集失敗不影響主流程,繼續使用 expert_result
# Step 3: 準備 LLM 上下文 (含 Expert 診斷 + K8s/SignOz 診斷)
signals_dict = [s.model_dump() for s in incident.signals]
expert_context = {
"initial_diagnosis": expert_result.get("matched_rule"),
@@ -610,6 +640,11 @@ class DecisionManager:
"requires_human_review": expert_result.get("human_review_required", False),
}
# 加入診斷上下文 (如果有)
if diagnosis_context:
expert_context["diagnosis_context"] = diagnosis_context.get_llm_prompt_context()
expert_context["diagnosis_signals"] = [s.to_dict() for s in diagnosis_context.signals]
# Step 4: LLM 分析 (帶上 Expert 上下文)
try:
llm_result, provider, success = await self._openclaw.generate_incident_proposal(

View File

@@ -0,0 +1,590 @@
"""
Diagnosis Aggregator - Phase 2 診斷資料整合層
==============================================
ADR-030: 智能自動修復系統
整合多來源診斷資料:
- K8s Diagnostics: Pod Events, Logs, Resource Usage
- SignOz Metrics: Gold Metrics, Error Logs
- Expert Rules: 規則匹配與診斷建議
設計原則:
- 非同步並行收集,最大化效能
- 錯誤容忍,部分失敗不影響整體
- 提供結構化 Context 給 LLM 分析
版本: v1.0
建立: 2026-03-26 (台北時區)
"""
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from typing import Any
import structlog
from src.services.k8s_diagnostics import (
K8sDiagnostics,
get_k8s_diagnostics_service,
)
from src.services.signoz_client import (
GoldMetrics,
get_signoz_client,
)
logger = structlog.get_logger(__name__)
# =============================================================================
# Diagnosis Severity
# =============================================================================
class DiagnosisSeverity(str, Enum):
    """Severity of a diagnosis signal, declared from most to least urgent."""

    # Needs immediate handling (service outage, risk of data loss).
    CRITICAL = "critical"
    # Handle within 1 hour (severe performance degradation).
    HIGH = "high"
    # Handle within 24 hours (abnormal, but the service is still usable).
    MEDIUM = "medium"
    # Track and observe (minor anomaly).
    LOW = "low"
    # Informational only; no action required.
    INFO = "info"
# =============================================================================
# Data Models
# =============================================================================
@dataclass
class DiagnosisSignal:
"""診斷信號 (來自各資料源的發現)"""
source: str # k8s_events, k8s_logs, signoz_metrics, signoz_logs, expert_rules
signal_type: str # oom_killed, crash_loop, high_error_rate, etc.
severity: DiagnosisSeverity
message: str
evidence: dict[str, Any] = field(default_factory=dict) # 證據資料
timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
def to_dict(self) -> dict[str, Any]:
return {
"source": self.source,
"signal_type": self.signal_type,
"severity": self.severity.value,
"message": self.message,
"evidence": self.evidence,
"timestamp": self.timestamp.isoformat(),
}
@dataclass
class DiagnosisContext:
    """
    Diagnosis context: the diagnostic data gathered from every source.

    This is the complete context handed to the LLM for analysis.
    """

    # Identification
    target: str  # Pod name, Service name, etc.
    namespace: str = "awoooi-prod"
    collected_at: datetime = field(default_factory=lambda: datetime.now(UTC))

    # Raw diagnostic data
    k8s_diagnostics: K8sDiagnostics | None = None
    gold_metrics: GoldMetrics | None = None
    error_logs: list[dict] = field(default_factory=list)

    # Diagnosis signals (findings from each source)
    signals: list[DiagnosisSignal] = field(default_factory=list)

    # Expert System match result
    expert_match: dict[str, Any] | None = None

    # Errors recorded while collecting
    collection_errors: list[str] = field(default_factory=list)

    @staticmethod
    def _severity_order() -> tuple[DiagnosisSeverity, ...]:
        """Return all severities from most to least urgent."""
        return (
            DiagnosisSeverity.CRITICAL,
            DiagnosisSeverity.HIGH,
            DiagnosisSeverity.MEDIUM,
            DiagnosisSeverity.LOW,
            DiagnosisSeverity.INFO,
        )

    def to_dict(self) -> dict[str, Any]:
        """Convert to a dictionary (for JSON serialization).

        Error logs are summarised as a count only; signals are expanded via
        their own ``to_dict``.
        """
        k8s = self.k8s_diagnostics
        metrics = self.gold_metrics
        metrics_view = (
            {
                "rps": metrics.rps,
                "error_rate": metrics.error_rate,
                "p99_latency_ms": metrics.p99_latency_ms,
            }
            if metrics
            else None
        )
        return {
            "target": self.target,
            "namespace": self.namespace,
            "collected_at": self.collected_at.isoformat(),
            "k8s_diagnostics": k8s.to_dict() if k8s else None,
            "gold_metrics": metrics_view,
            "error_logs_count": len(self.error_logs),
            "signals": [s.to_dict() for s in self.signals],
            "expert_match": self.expert_match,
            "collection_errors": self.collection_errors,
        }

    @property
    def highest_severity(self) -> DiagnosisSeverity:
        """Return the most urgent severity among the signals (INFO when there are none)."""
        if not self.signals:
            return DiagnosisSeverity.INFO
        for severity in self._severity_order():
            if any(s.severity == severity for s in self.signals):
                return severity
        return DiagnosisSeverity.INFO

    def get_llm_prompt_context(self) -> str:
        """
        Build the prompt context used for LLM analysis.

        Each available data source becomes one markdown section; sections with
        no data are omitted. Sections are joined with a blank line.

        Returns:
            The assembled markdown text.
        """
        sections = []

        # 1. Target info
        sections.append(f"## 診斷目標\n- Target: {self.target}\n- Namespace: {self.namespace}")

        # 2. K8s diagnostics
        if self.k8s_diagnostics:
            k8s_summary = self.k8s_diagnostics.get_diagnosis_summary()
            sections.append(f"## K8s 診斷\n{k8s_summary}")

            # Warning event details (first 5 only, messages truncated)
            warning_events = self.k8s_diagnostics.warning_events
            if warning_events:
                events_text = "\n".join(
                    f"- [{e.reason}] {e.message[:150]}" for e in warning_events[:5]
                )
                sections.append(f"## K8s 警告事件\n{events_text}")

        # 3. Gold metrics
        if self.gold_metrics:
            sections.append(f"## SignOz 黃金指標\n{self.gold_metrics.to_summary()}")

        # 4. Error logs (first 5 only). `or ''` guards against a log entry whose
        # "message" key is present but None, which would break the slice.
        if self.error_logs:
            log_text = "\n".join(
                f"- [{log.get('severity', 'ERROR')}] {(log.get('message') or '')[:100]}"
                for log in self.error_logs[:5]
            )
            sections.append(f"## 錯誤日誌 (最近 {len(self.error_logs)} 筆)\n{log_text}")

        # 5. Signals, most urgent first. Sorting on the enum's string value
        # would be alphabetical (critical, high, info, low, medium), so rank by
        # the explicit severity order instead. sorted() is stable, so signals
        # of equal severity keep their insertion order.
        if self.signals:
            order = self._severity_order()

            def rank(signal: DiagnosisSignal) -> int:
                return order.index(signal.severity) if signal.severity in order else len(order)

            signals_text = "\n".join(
                f"- [{s.severity.value.upper()}] {s.source}: {s.message}"
                for s in sorted(self.signals, key=rank)
            )
            sections.append(f"## 診斷信號\n{signals_text}")

        # 6. Expert match
        if self.expert_match:
            sections.append(
                f"## Expert System 匹配\n"
                f"- 規則: {self.expert_match.get('rule_name', 'N/A')}\n"
                f"- 說明: {self.expert_match.get('description', 'N/A')}\n"
                f"- 風險: {self.expert_match.get('risk_level', 'N/A')}\n"
                f"- 推理: {self.expert_match.get('reasoning', 'N/A')}"
            )

        return "\n\n".join(sections)
# =============================================================================
# Diagnosis Aggregator Service
# =============================================================================
class DiagnosisAggregator:
    """
    Diagnosis data aggregator.

    Combines K8s diagnostics, SignOz metrics/logs and the Expert System match
    into a single DiagnosisContext for the LLM or the decision engine.
    """

    def __init__(self):
        self.k8s_service = get_k8s_diagnostics_service()
        self.signoz_client = get_signoz_client()

    async def collect_pod_diagnosis(
        self,
        pod_name: str,
        namespace: str = "awoooi-prod",
        include_signoz: bool = True,
        include_error_logs: bool = True,
        expert_match: dict | None = None,
    ) -> DiagnosisContext:
        """
        Collect the full diagnosis data for a pod.

        Args:
            pod_name: Pod name (partial match is supported by the K8s service).
            namespace: Namespace.
            include_signoz: Whether to collect SignOz gold metrics.
            include_error_logs: Whether to collect error logs.
            expert_match: Expert System match result, stored on the context.

        Returns:
            DiagnosisContext: the populated context. Collector failures are
            recorded in ``collection_errors`` instead of being raised.
        """
        context = DiagnosisContext(
            target=pod_name,
            namespace=namespace,
            expert_match=expert_match,
        )

        # Resolve the service name up front: both SignOz collectors need it,
        # and either flag may be enabled without the other.
        service_name = self._pod_to_service_name(pod_name)

        # K8s diagnostics are always collected; the SignOz parts are optional.
        collectors = [self._collect_k8s_diagnostics(context, pod_name, namespace)]
        if include_signoz:
            collectors.append(self._collect_signoz_metrics(context, service_name))
        if include_error_logs:
            collectors.append(self._collect_error_logs(context, service_name))

        await self._run_collectors(context, collectors)

        # Derive signals from whatever was collected.
        self._analyze_signals(context)

        logger.info(
            "diagnosis_collected",
            target=pod_name,
            signals_count=len(context.signals),
            highest_severity=context.highest_severity.value,
            errors_count=len(context.collection_errors),
        )
        return context

    async def collect_service_diagnosis(
        self,
        service_name: str,
        namespace: str = "awoooi-prod",
        expert_match: dict | None = None,
    ) -> DiagnosisContext:
        """
        Collect diagnosis data for a service (no specific pod).

        Only the SignOz metrics and error logs are collected; intended for
        service-level monitoring alerts.
        """
        context = DiagnosisContext(
            target=service_name,
            namespace=namespace,
            expert_match=expert_match,
        )
        await self._run_collectors(
            context,
            [
                self._collect_signoz_metrics(context, service_name),
                self._collect_error_logs(context, service_name),
            ],
        )
        self._analyze_signals(context)
        return context

    async def _run_collectors(self, context: DiagnosisContext, collectors: list) -> None:
        """Run the collector coroutines concurrently and record stray failures.

        The collectors catch ``Exception`` themselves, so anything surfacing
        here is unexpected; it is recorded on the context rather than dropped.
        """
        import asyncio

        outcomes = await asyncio.gather(*collectors, return_exceptions=True)
        for outcome in outcomes:
            if isinstance(outcome, BaseException):
                context.collection_errors.append(f"Collector failed: {outcome}")
                logger.warning("diagnosis_collector_failed", error=str(outcome))

    # =========================================================================
    # Private Collection Methods
    # =========================================================================

    async def _collect_k8s_diagnostics(
        self,
        context: DiagnosisContext,
        pod_name: str,
        namespace: str,
    ) -> None:
        """Collect K8s diagnostics into ``context.k8s_diagnostics``."""
        try:
            diagnostics = await self.k8s_service.collect_diagnostics(
                pod_name=pod_name,
                namespace=namespace,
                include_logs=True,
                include_previous_logs=True,
                log_tail_lines=100,
            )
            context.k8s_diagnostics = diagnostics

            # Propagate the K8s service's own partial-failure records.
            if diagnostics.errors:
                context.collection_errors.extend(f"k8s: {e}" for e in diagnostics.errors)
        except Exception as e:
            context.collection_errors.append(f"K8s diagnostics failed: {e}")
            logger.warning("k8s_diagnostics_collection_failed", error=str(e))

    async def _collect_signoz_metrics(
        self,
        context: DiagnosisContext,
        service_name: str,
    ) -> None:
        """Collect SignOz gold metrics (10-minute window) into ``context.gold_metrics``."""
        try:
            context.gold_metrics = await self.signoz_client.get_gold_metrics(
                service_name=service_name,
                namespace=context.namespace,
                time_window_minutes=10,
            )
        except Exception as e:
            context.collection_errors.append(f"SignOz metrics failed: {e}")
            logger.warning("signoz_metrics_collection_failed", error=str(e))

    async def _collect_error_logs(
        self,
        context: DiagnosisContext,
        service_name: str,
    ) -> None:
        """Collect up to 20 error logs from the last 30 minutes into ``context.error_logs``."""
        try:
            context.error_logs = await self.signoz_client.get_logs(
                service_name=service_name,
                severity="ERROR,FATAL,CRITICAL",
                time_window_minutes=30,
                limit=20,
            )
        except Exception as e:
            context.collection_errors.append(f"Error logs failed: {e}")
            logger.warning("error_logs_collection_failed", error=str(e))

    # =========================================================================
    # Signal Analysis
    # =========================================================================

    def _analyze_signals(self, context: DiagnosisContext) -> None:
        """Append diagnosis signals to ``context.signals`` for each populated data source."""
        # 1. K8s signals
        if context.k8s_diagnostics:
            self._analyze_k8s_signals(context, context.k8s_diagnostics)
        # 2. SignOz metrics signals
        if context.gold_metrics:
            self._analyze_metrics_signals(context, context.gold_metrics)
        # 3. Error log signals
        if context.error_logs:
            self._analyze_log_signals(context, context.error_logs)

    def _analyze_k8s_signals(
        self,
        context: DiagnosisContext,
        k8s: K8sDiagnostics,
    ) -> None:
        """Derive signals from pod status, resource usage and recent warning events."""
        status = k8s.pod_status
        if status:
            # CrashLoopBackOff
            if status.is_crash_loop():
                context.signals.append(DiagnosisSignal(
                    source="k8s_status",
                    signal_type="crash_loop",
                    severity=DiagnosisSeverity.CRITICAL,
                    message=f"Pod {k8s.pod_name} is in CrashLoopBackOff state",
                    evidence={
                        "restart_count": status.restart_count,
                        "container_statuses": status.container_statuses,
                    },
                ))

            # Image pull error
            if status.is_image_pull_error():
                context.signals.append(DiagnosisSignal(
                    source="k8s_status",
                    signal_type="image_pull_error",
                    severity=DiagnosisSeverity.HIGH,
                    message=f"Pod {k8s.pod_name} has image pull error",
                    evidence={
                        "container_statuses": status.container_statuses,
                    },
                ))

            # High restart count
            if status.restart_count > 5:
                context.signals.append(DiagnosisSignal(
                    source="k8s_status",
                    signal_type="high_restart_count",
                    severity=DiagnosisSeverity.MEDIUM,
                    message=f"Pod {k8s.pod_name} has high restart count: {status.restart_count}",
                    evidence={
                        "restart_count": status.restart_count,
                    },
                ))

        # High resource usage. The is_*_high() checks only pass when the
        # percentage is not None, so formatting it with :.1f is safe.
        usage = k8s.resource_usage
        if usage:
            if usage.is_cpu_high(threshold=80):
                context.signals.append(DiagnosisSignal(
                    source="k8s_metrics",
                    signal_type="high_cpu",
                    severity=DiagnosisSeverity.MEDIUM,
                    message=f"High CPU usage: {usage.cpu_percent:.1f}%",
                    evidence=usage.to_dict(),
                ))
            if usage.is_memory_high(threshold=80):
                context.signals.append(DiagnosisSignal(
                    source="k8s_metrics",
                    signal_type="high_memory",
                    severity=DiagnosisSeverity.HIGH,
                    message=f"High memory usage: {usage.memory_percent:.1f}%",
                    evidence=usage.to_dict(),
                ))

        # Warning events from the last 15 minutes
        for event in k8s.warning_events:
            if not event.is_recent(minutes=15):
                continue
            reason = event.reason.lower()
            # NOTE(review): "oom" is a plain substring test on the message, so
            # unrelated words containing "oom" would also match.
            if "oom" in event.message.lower() or "oomkilled" in reason:
                context.signals.append(DiagnosisSignal(
                    source="k8s_events",
                    signal_type="oom_killed",
                    severity=DiagnosisSeverity.CRITICAL,
                    message=f"OOMKilled detected: {event.message[:100]}",
                    evidence=event.to_dict(),
                ))
            elif "failedscheduling" in reason:
                context.signals.append(DiagnosisSignal(
                    source="k8s_events",
                    signal_type="failed_scheduling",
                    severity=DiagnosisSeverity.HIGH,
                    message=f"Failed to schedule: {event.message[:100]}",
                    evidence=event.to_dict(),
                ))

    def _analyze_metrics_signals(
        self,
        context: DiagnosisContext,
        metrics: GoldMetrics,
    ) -> None:
        """Derive signals from the SignOz gold metrics."""
        # High error rate (> 5%); critical above 20%
        if metrics.error_rate > 5:
            context.signals.append(DiagnosisSignal(
                source="signoz_metrics",
                signal_type="high_error_rate",
                severity=DiagnosisSeverity.CRITICAL if metrics.error_rate > 20 else DiagnosisSeverity.HIGH,
                message=f"High error rate: {metrics.error_rate:.2f}%",
                evidence={
                    "error_rate": metrics.error_rate,
                    "error_count": metrics.error_count,
                    "total_requests": metrics.total_requests,
                },
            ))

        # High latency (P99 > 5s); high from 10s upwards
        if metrics.p99_latency_ms > 5000:
            context.signals.append(DiagnosisSignal(
                source="signoz_metrics",
                signal_type="high_latency",
                severity=DiagnosisSeverity.MEDIUM if metrics.p99_latency_ms < 10000 else DiagnosisSeverity.HIGH,
                message=f"High P99 latency: {metrics.p99_latency_ms:.0f}ms",
                evidence={
                    "p50_ms": metrics.p50_latency_ms,
                    "p95_ms": metrics.p95_latency_ms,
                    "p99_ms": metrics.p99_latency_ms,
                },
            ))

        # Low / no traffic
        if metrics.rps < 0.01 and metrics.total_requests < 10:
            context.signals.append(DiagnosisSignal(
                source="signoz_metrics",
                signal_type="no_traffic",
                severity=DiagnosisSeverity.LOW,
                message=f"Low/No traffic detected: {metrics.rps:.2f} RPS",
                evidence={
                    "rps": metrics.rps,
                    "total_requests": metrics.total_requests,
                },
            ))

    def _analyze_log_signals(
        self,
        context: DiagnosisContext,
        logs: list[dict],
    ) -> None:
        """Emit a ``frequent_errors`` signal when more than 10 error logs were returned."""
        if not logs:
            return

        error_count = len(logs)
        if error_count <= 10:
            return

        # Sample the first few messages; `or ""` tolerates entries whose
        # "message" key is present but None.
        sample_messages = [(log.get("message") or "")[:100] for log in logs[:3]]
        # NOTE(review): _collect_error_logs requests limit=20, so the HIGH
        # branch (>= 50) is unreachable for logs gathered by this class.
        context.signals.append(DiagnosisSignal(
            source="signoz_logs",
            signal_type="frequent_errors",
            severity=DiagnosisSeverity.MEDIUM if error_count < 50 else DiagnosisSeverity.HIGH,
            message=f"Frequent errors detected: {error_count} errors in last 30 minutes",
            evidence={
                "error_count": error_count,
                "sample_messages": sample_messages,
            },
        ))

    # =========================================================================
    # Utilities
    # =========================================================================

    @staticmethod
    def _is_hash_token(token: str, min_len: int, max_len: int, need_digit: bool) -> bool:
        """Return True when *token* looks like a generated lowercase alphanumeric hash."""
        if not min_len <= len(token) <= max_len:
            return False
        if not (token.isascii() and token.isalnum() and token == token.lower()):
            return False
        return not need_digit or any(ch.isdigit() for ch in token)

    def _pod_to_service_name(self, pod_name: str) -> str:
        """
        Infer the service name from a Deployment pod name.

        Examples:
            - awoooi-api-7f9d8b6c5d-x2k4j -> awoooi-api
            - awoooi-web-5c8d7e6f4a-h3m9n -> awoooi-web
            - awoooi-api-server           -> awoooi-api-server (unchanged)

        The last two dash-separated segments are removed only when they look
        like ``<replicaset-hash>-<pod-suffix>``: a 5-10 character lowercase
        alphanumeric token containing a digit, followed by a 5 character
        lowercase alphanumeric token. This is a heuristic; names that do not
        match (plain service names, for instance) are returned as-is.
        """
        parts = pod_name.rsplit("-", 2)
        if len(parts) != 3:
            return pod_name

        base, replicaset_hash, pod_suffix = parts
        looks_generated = (
            bool(base)
            and self._is_hash_token(replicaset_hash, 5, 10, need_digit=True)
            and self._is_hash_token(pod_suffix, 5, 5, need_digit=False)
        )
        return base if looks_generated else pod_name
# =============================================================================
# Singleton
# =============================================================================
_aggregator: DiagnosisAggregator | None = None


def get_diagnosis_aggregator() -> DiagnosisAggregator:
    """Return the module-wide DiagnosisAggregator, creating it on first use."""
    global _aggregator
    if _aggregator is not None:
        return _aggregator
    _aggregator = DiagnosisAggregator()
    return _aggregator

View File

@@ -0,0 +1,654 @@
"""
K8s Diagnostics Service - Phase 2 資料收集強化
==============================================
ADR-030: 智能自動修復系統
提供 K8s 診斷資料收集:
- Pod Events (kubectl get events)
- Pod Logs (kubectl logs)
- Resource Usage (kubectl top)
設計原則:
- 非同步執行,不阻塞主流程
- 錯誤容忍,單一失敗不影響整體
- Result caching to avoid repeated queries (planned; not implemented in this module yet)
版本: v1.0
建立: 2026-03-27 (台北時區)
"""
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from typing import Any
import structlog
from src.core.config import settings
logger = structlog.get_logger(__name__)
# kubernetes_asyncio is imported lazily so this module can still be imported
# when the package is not installed.
_k8s_client = None
_k8s_config_loaded = False


async def _get_k8s_client():
    """Return the ``kubernetes_asyncio.client`` module, loading config on first use.

    The in-cluster configuration is tried first; on ``ConfigException`` the
    kubeconfig at ``settings.KUBECONFIG_PATH`` is loaded instead. Any failure
    is logged and reported as ``None``, so a later call tries again.
    """
    global _k8s_client, _k8s_config_loaded

    if _k8s_client is None:
        try:
            from kubernetes_asyncio import client, config

            if not _k8s_config_loaded:
                try:
                    # Prefer the in-cluster configuration.
                    config.load_incluster_config()
                    logger.info("k8s_diagnostics_incluster_config")
                except config.ConfigException:
                    # Fall back to a kubeconfig file.
                    await config.load_kube_config(config_file=settings.KUBECONFIG_PATH)
                    logger.info("k8s_diagnostics_kubeconfig", path=settings.KUBECONFIG_PATH)
                _k8s_config_loaded = True

            _k8s_client = client
        except Exception as exc:
            logger.error("k8s_diagnostics_init_failed", error=str(exc))
            return None

    return _k8s_client
# =============================================================================
# Data Models
# =============================================================================
class EventType(str, Enum):
    """K8s event type, using the capitalised values stored on the event's ``type`` field."""

    NORMAL = "Normal"
    WARNING = "Warning"
@dataclass
class K8sEvent:
"""K8s Event 資料"""
type: EventType
reason: str
message: str
count: int
first_timestamp: datetime | None
last_timestamp: datetime | None
source_component: str
involved_object: str
def to_dict(self) -> dict[str, Any]:
return {
"type": self.type.value,
"reason": self.reason,
"message": self.message,
"count": self.count,
"first_timestamp": self.first_timestamp.isoformat() if self.first_timestamp else None,
"last_timestamp": self.last_timestamp.isoformat() if self.last_timestamp else None,
"source_component": self.source_component,
"involved_object": self.involved_object,
}
def is_warning(self) -> bool:
return self.type == EventType.WARNING
def is_recent(self, minutes: int = 30) -> bool:
"""檢查是否為最近的事件"""
if not self.last_timestamp:
return False
age = datetime.now(UTC) - self.last_timestamp
return age.total_seconds() < minutes * 60
@dataclass
class ResourceUsage:
"""資源使用量"""
cpu_millicores: int # 毫核 (1000m = 1 core)
memory_bytes: int # Bytes
cpu_limit_millicores: int | None = None
memory_limit_bytes: int | None = None
@property
def cpu_percent(self) -> float | None:
"""CPU 使用率 (相對於 limit)"""
if self.cpu_limit_millicores:
return (self.cpu_millicores / self.cpu_limit_millicores) * 100
return None
@property
def memory_percent(self) -> float | None:
"""Memory 使用率 (相對於 limit)"""
if self.memory_limit_bytes:
return (self.memory_bytes / self.memory_limit_bytes) * 100
return None
@property
def memory_mb(self) -> float:
"""Memory in MB"""
return self.memory_bytes / (1024 * 1024)
def to_dict(self) -> dict[str, Any]:
return {
"cpu_millicores": self.cpu_millicores,
"cpu_percent": round(self.cpu_percent, 1) if self.cpu_percent else None,
"memory_bytes": self.memory_bytes,
"memory_mb": round(self.memory_mb, 1),
"memory_percent": round(self.memory_percent, 1) if self.memory_percent else None,
}
def is_cpu_high(self, threshold: float = 80.0) -> bool:
"""CPU 使用率是否過高"""
return self.cpu_percent is not None and self.cpu_percent > threshold
def is_memory_high(self, threshold: float = 80.0) -> bool:
"""Memory 使用率是否過高"""
return self.memory_percent is not None and self.memory_percent > threshold
@dataclass
class PodStatus:
    """Pod status details."""

    name: str
    namespace: str
    phase: str  # Pending, Running, Succeeded, Failed, Unknown
    ready: bool
    restart_count: int
    container_statuses: list[dict[str, Any]] = field(default_factory=list)
    conditions: list[dict[str, Any]] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return a plain dict of all fields."""
        return {
            "name": self.name,
            "namespace": self.namespace,
            "phase": self.phase,
            "ready": self.ready,
            "restart_count": self.restart_count,
            "container_statuses": self.container_statuses,
            "conditions": self.conditions,
        }

    @staticmethod
    def _waiting_reason(container_status: dict[str, Any]) -> str:
        """Return the container's waiting reason, or "" when it is not waiting.

        The "state" and "waiting" entries may be missing or explicitly None
        (a container that is not waiting is stored with ``"waiting": None``),
        so both are normalised with ``or {}`` before the lookup.
        """
        state = container_status.get("state") or {}
        waiting = state.get("waiting") or {}
        return waiting.get("reason") or ""

    def is_healthy(self) -> bool:
        """Return True when the pod is Running and Ready."""
        return self.phase == "Running" and self.ready

    def is_crash_loop(self) -> bool:
        """Return True when any container is waiting in CrashLoopBackOff."""
        return any(
            self._waiting_reason(cs) == "CrashLoopBackOff"
            for cs in self.container_statuses
        )

    def is_image_pull_error(self) -> bool:
        """Return True when any container is waiting on an image pull failure."""
        pull_failures = ("ImagePullBackOff", "ErrImagePull", "ErrImageNeverPull")
        return any(
            self._waiting_reason(cs) in pull_failures
            for cs in self.container_statuses
        )
@dataclass
class K8sDiagnostics:
"""K8s 診斷資料彙總"""
pod_name: str
namespace: str
collected_at: datetime = field(default_factory=lambda: datetime.now(UTC))
# 診斷資料
pod_status: PodStatus | None = None
events: list[K8sEvent] = field(default_factory=list)
logs: str = ""
previous_logs: str = ""
resource_usage: ResourceUsage | None = None
# 錯誤記錄
errors: list[str] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
return {
"pod_name": self.pod_name,
"namespace": self.namespace,
"collected_at": self.collected_at.isoformat(),
"pod_status": self.pod_status.to_dict() if self.pod_status else None,
"events": [e.to_dict() for e in self.events],
"logs_length": len(self.logs),
"previous_logs_length": len(self.previous_logs),
"resource_usage": self.resource_usage.to_dict() if self.resource_usage else None,
"errors": self.errors,
}
@property
def warning_events(self) -> list[K8sEvent]:
"""取得警告類型的事件"""
return [e for e in self.events if e.is_warning()]
@property
def recent_events(self) -> list[K8sEvent]:
"""取得最近 30 分鐘的事件"""
return [e for e in self.events if e.is_recent(30)]
def get_diagnosis_summary(self) -> str:
"""產生診斷摘要"""
lines = []
if self.pod_status:
lines.append(f"Pod Phase: {self.pod_status.phase}")
lines.append(f"Ready: {self.pod_status.ready}")
lines.append(f"Restart Count: {self.pod_status.restart_count}")
if self.pod_status.is_crash_loop():
lines.append("WARNING: CrashLoopBackOff detected!")
if self.pod_status.is_image_pull_error():
lines.append("WARNING: Image Pull Error detected!")
if self.resource_usage:
if self.resource_usage.is_cpu_high():
lines.append(f"WARNING: High CPU usage ({self.resource_usage.cpu_percent:.1f}%)")
if self.resource_usage.is_memory_high():
lines.append(f"WARNING: High Memory usage ({self.resource_usage.memory_percent:.1f}%)")
warning_count = len(self.warning_events)
if warning_count > 0:
lines.append(f"Warning Events: {warning_count}")
for e in self.warning_events[:3]: # 最多顯示 3 個
lines.append(f" - {e.reason}: {e.message[:100]}")
if self.errors:
lines.append(f"Collection Errors: {len(self.errors)}")
return "\n".join(lines) if lines else "No issues detected"
# =============================================================================
# K8s Diagnostics Service
# =============================================================================
class K8sDiagnosticsService:
"""
K8s 診斷資料收集服務
功能:
- 取得 Pod Events
- 取得 Pod Logs (current + previous)
- 取得 Resource Usage (via metrics-server)
- 取得 Pod Status
設計:
- 非同步並行收集
- 單一失敗不影響整體
- 結果包含錯誤資訊
"""
def __init__(self, default_namespace: str = "awoooi-prod"):
self.default_namespace = default_namespace
async def collect_diagnostics(
self,
pod_name: str,
namespace: str | None = None,
include_logs: bool = True,
include_previous_logs: bool = True,
log_tail_lines: int = 100,
) -> K8sDiagnostics:
"""
收集完整的 K8s 診斷資料
Args:
pod_name: Pod 名稱 (可以是部分名稱,會自動匹配)
namespace: Namespace (預設 awoooi-prod)
include_logs: 是否包含日誌
include_previous_logs: 是否包含上一次容器的日誌
log_tail_lines: 日誌行數
Returns:
K8sDiagnostics 包含所有收集到的資料
"""
ns = namespace or self.default_namespace
diagnostics = K8sDiagnostics(pod_name=pod_name, namespace=ns)
client = await _get_k8s_client()
if not client:
diagnostics.errors.append("K8s client initialization failed")
return diagnostics
# 先找到實際的 Pod 名稱 (支援部分匹配)
actual_pod_name = await self._find_pod(client, pod_name, ns)
if not actual_pod_name:
diagnostics.errors.append(f"Pod not found: {pod_name}")
return diagnostics
diagnostics.pod_name = actual_pod_name
# 並行收集所有資料
import asyncio
tasks = [
self._get_pod_status(client, actual_pod_name, ns),
self._get_pod_events(client, actual_pod_name, ns),
]
if include_logs:
tasks.append(self._get_pod_logs(client, actual_pod_name, ns, log_tail_lines, previous=False))
if include_previous_logs:
tasks.append(self._get_pod_logs(client, actual_pod_name, ns, log_tail_lines, previous=True))
# 資源使用量需要 metrics-server
tasks.append(self._get_resource_usage(client, actual_pod_name, ns))
results = await asyncio.gather(*tasks, return_exceptions=True)
# 處理結果
idx = 0
# Pod Status
if isinstance(results[idx], Exception):
diagnostics.errors.append(f"Pod status: {results[idx]}")
else:
diagnostics.pod_status = results[idx]
idx += 1
# Events
if isinstance(results[idx], Exception):
diagnostics.errors.append(f"Events: {results[idx]}")
else:
diagnostics.events = results[idx] or []
idx += 1
# Logs
if include_logs:
if isinstance(results[idx], Exception):
diagnostics.errors.append(f"Logs: {results[idx]}")
else:
diagnostics.logs = results[idx] or ""
idx += 1
if include_previous_logs:
if isinstance(results[idx], Exception):
# Previous logs 失敗很常見 (沒有 previous container)
pass
else:
diagnostics.previous_logs = results[idx] or ""
idx += 1
# Resource Usage
if isinstance(results[idx], Exception):
diagnostics.errors.append(f"Resource usage: {results[idx]}")
else:
diagnostics.resource_usage = results[idx]
logger.info(
"k8s_diagnostics_collected",
pod_name=actual_pod_name,
namespace=ns,
has_status=diagnostics.pod_status is not None,
events_count=len(diagnostics.events),
logs_length=len(diagnostics.logs),
has_resource_usage=diagnostics.resource_usage is not None,
errors_count=len(diagnostics.errors),
)
return diagnostics
async def _find_pod(
self,
client,
pod_name: str,
namespace: str,
) -> str | None:
"""找到實際的 Pod 名稱 (支援部分匹配)"""
try:
v1 = client.CoreV1Api()
# 先嘗試精確匹配
try:
await v1.read_namespaced_pod(name=pod_name, namespace=namespace)
return pod_name
except client.exceptions.ApiException as e:
if e.status != 404:
raise
# 部分匹配 (用於 Deployment Pod 名稱)
pods = await v1.list_namespaced_pod(namespace=namespace)
for pod in pods.items:
if pod_name in pod.metadata.name:
return pod.metadata.name
return None
except Exception as e:
logger.warning("k8s_find_pod_failed", pod_name=pod_name, error=str(e))
return None
async def _get_pod_status(
self,
client,
pod_name: str,
namespace: str,
) -> PodStatus | None:
"""取得 Pod 狀態"""
try:
v1 = client.CoreV1Api()
pod = await v1.read_namespaced_pod(name=pod_name, namespace=namespace)
# 計算 restart count
restart_count = 0
container_statuses = []
if pod.status.container_statuses:
for cs in pod.status.container_statuses:
restart_count += cs.restart_count or 0
container_statuses.append({
"name": cs.name,
"ready": cs.ready,
"restart_count": cs.restart_count,
"state": {
"running": cs.state.running is not None,
"waiting": {
"reason": cs.state.waiting.reason if cs.state.waiting else None,
"message": cs.state.waiting.message if cs.state.waiting else None,
} if cs.state.waiting else None,
"terminated": {
"reason": cs.state.terminated.reason if cs.state.terminated else None,
"exit_code": cs.state.terminated.exit_code if cs.state.terminated else None,
} if cs.state.terminated else None,
},
})
# Ready 條件
ready = False
conditions = []
if pod.status.conditions:
for c in pod.status.conditions:
conditions.append({
"type": c.type,
"status": c.status,
"reason": c.reason,
"message": c.message,
})
if c.type == "Ready" and c.status == "True":
ready = True
return PodStatus(
name=pod_name,
namespace=namespace,
phase=pod.status.phase,
ready=ready,
restart_count=restart_count,
container_statuses=container_statuses,
conditions=conditions,
)
except Exception as e:
logger.warning("k8s_get_pod_status_failed", pod_name=pod_name, error=str(e))
raise
async def _get_pod_events(
self,
client,
pod_name: str,
namespace: str,
limit: int = 20,
) -> list[K8sEvent]:
"""取得 Pod 相關 Events"""
try:
v1 = client.CoreV1Api()
# 取得該 namespace 的所有 events然後過濾
field_selector = f"involvedObject.name={pod_name}"
events = await v1.list_namespaced_event(
namespace=namespace,
field_selector=field_selector,
limit=limit,
)
result = []
for e in events.items:
result.append(K8sEvent(
type=EventType(e.type) if e.type else EventType.NORMAL,
reason=e.reason or "",
message=e.message or "",
count=e.count or 1,
first_timestamp=e.first_timestamp.replace(tzinfo=UTC) if e.first_timestamp else None,
last_timestamp=e.last_timestamp.replace(tzinfo=UTC) if e.last_timestamp else None,
source_component=e.source.component if e.source else "",
involved_object=f"{e.involved_object.kind}/{e.involved_object.name}" if e.involved_object else "",
))
# 按最後時間排序
result.sort(key=lambda x: x.last_timestamp or datetime.min.replace(tzinfo=UTC), reverse=True)
return result
except Exception as e:
logger.warning("k8s_get_pod_events_failed", pod_name=pod_name, error=str(e))
raise
async def _get_pod_logs(
self,
client,
pod_name: str,
namespace: str,
tail_lines: int = 100,
previous: bool = False,
) -> str:
"""取得 Pod 日誌"""
try:
v1 = client.CoreV1Api()
logs = await v1.read_namespaced_pod_log(
name=pod_name,
namespace=namespace,
tail_lines=tail_lines,
previous=previous,
)
return logs or ""
except Exception as e:
# Previous logs 失敗很常見
if not previous:
logger.warning("k8s_get_pod_logs_failed", pod_name=pod_name, error=str(e))
raise
async def _get_resource_usage(
self,
client,
pod_name: str,
namespace: str,
) -> ResourceUsage | None:
"""取得資源使用量 (需要 metrics-server)"""
try:
# 使用 CustomObjectsApi 查詢 metrics
custom_api = client.CustomObjectsApi()
metrics = await custom_api.get_namespaced_custom_object(
group="metrics.k8s.io",
version="v1beta1",
namespace=namespace,
plural="pods",
name=pod_name,
)
# 解析 metrics
total_cpu = 0
total_memory = 0
for container in metrics.get("containers", []):
usage = container.get("usage", {})
cpu = usage.get("cpu", "0")
memory = usage.get("memory", "0")
# 解析 CPU (可能是 "100m" 或 "1")
if cpu.endswith("n"):
total_cpu += int(cpu[:-1]) // 1000000 # nano to milli
elif cpu.endswith("m"):
total_cpu += int(cpu[:-1])
else:
total_cpu += int(float(cpu) * 1000)
# 解析 Memory (可能是 "100Mi", "1Gi", "1000000Ki")
if memory.endswith("Ki"):
total_memory += int(memory[:-2]) * 1024
elif memory.endswith("Mi"):
total_memory += int(memory[:-2]) * 1024 * 1024
elif memory.endswith("Gi"):
total_memory += int(memory[:-2]) * 1024 * 1024 * 1024
else:
total_memory += int(memory)
# 取得 limits (from pod spec)
v1 = client.CoreV1Api()
pod = await v1.read_namespaced_pod(name=pod_name, namespace=namespace)
cpu_limit = None
memory_limit = None
for container in pod.spec.containers:
if container.resources and container.resources.limits:
limits = container.resources.limits
if "cpu" in limits:
cpu_str = limits["cpu"]
if cpu_str.endswith("m"):
cpu_limit = (cpu_limit or 0) + int(cpu_str[:-1])
else:
cpu_limit = (cpu_limit or 0) + int(float(cpu_str) * 1000)
if "memory" in limits:
mem_str = limits["memory"]
if mem_str.endswith("Mi"):
memory_limit = (memory_limit or 0) + int(mem_str[:-2]) * 1024 * 1024
elif mem_str.endswith("Gi"):
memory_limit = (memory_limit or 0) + int(mem_str[:-2]) * 1024 * 1024 * 1024
return ResourceUsage(
cpu_millicores=total_cpu,
memory_bytes=total_memory,
cpu_limit_millicores=cpu_limit,
memory_limit_bytes=memory_limit,
)
except Exception as e:
logger.warning("k8s_get_resource_usage_failed", pod_name=pod_name, error=str(e))
raise
# =============================================================================
# Singleton
# =============================================================================
_diagnostics_service: K8sDiagnosticsService | None = None


def get_k8s_diagnostics_service() -> K8sDiagnosticsService:
    """Return the module-wide K8sDiagnosticsService, creating it on first use."""
    global _diagnostics_service
    if _diagnostics_service is not None:
        return _diagnostics_service
    _diagnostics_service = K8sDiagnosticsService()
    return _diagnostics_service

View File

@@ -1119,10 +1119,22 @@ Trace URL: {signoz_trace_url}
"""
# 2026-03-27: 整合 Expert System 診斷上下文
# 2026-03-26: ADR-030 Phase 2 - 加入 K8s/SignOz 診斷上下文
expert_diagnosis_context = ""
if expert_context:
diagnosis_cmds = expert_context.get("suggested_diagnosis_commands", [])
diagnosis_cmds_str = "\n".join([f" - `{cmd}`" for cmd in diagnosis_cmds]) if diagnosis_cmds else " - (無)"
# ADR-030: 加入完整診斷上下文 (如果有)
full_diagnosis = expert_context.get("diagnosis_context", "")
diagnosis_signals = expert_context.get("diagnosis_signals", [])
signals_summary = ""
if diagnosis_signals:
signals_summary = "\n".join([
f" - [{s.get('severity', 'info').upper()}] {s.get('source', 'unknown')}: {s.get('message', 'N/A')[:100]}"
for s in diagnosis_signals[:5]
])
expert_diagnosis_context = f"""
## 🔍 Expert System Initial Diagnosis
- **Matched Rule**: {expert_context.get('initial_diagnosis', 'unknown')}
@@ -1132,8 +1144,15 @@ Trace URL: {signoz_trace_url}
- **Suggested Diagnosis Commands**:
{diagnosis_cmds_str}
**IMPORTANT**: The Expert System has provided an initial diagnosis.
Consider this context but apply your own analysis. If Expert says "human review required",
{f'''## 🩺 K8s/SignOz Deep Diagnosis (ADR-030)
{full_diagnosis}
### Diagnosis Signals
{signals_summary if signals_summary else " - (No signals detected)"}
''' if full_diagnosis else ''}
**IMPORTANT**: The Expert System and Diagnostic Aggregator have provided context.
Consider this data but apply your own analysis. If Expert says "human review required",
provide diagnostic guidance rather than automated fixes.
"""