"""
Blast Radius Agent - impact scope analysis specialist
=====================================================

Responsibilities:
- Assess the impact scope of an operation
- Identify the affected services and dependencies
- Estimate how many users are impacted
- Return an impact level (low/medium/high/critical)

Conforms to the ADR-009 BlastRadiusAgent specification.
"""

import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

import structlog

from src.agents.base import AgentResult, AgentStatus, BaseAgent

logger = structlog.get_logger(__name__)


# =============================================================================
# Blast Radius Types
# =============================================================================


class ImpactLevel(str, Enum):
    """Impact level of an operation."""

    LOW = "low"  # single service, <100 users
    MEDIUM = "medium"  # 2-5 services, 100-1000 users
    HIGH = "high"  # 5-10 services, 1000-10000 users
    CRITICAL = "critical"  # >10 services, >10000 users, or a core service


@dataclass
class AffectedService:
    """A single service touched by the analysed operation."""

    name: str
    impact_type: str  # direct, indirect, transitive
    confidence: float
    reason: str

    def to_dict(self) -> dict[str, Any]:
        """Return the four fields as a plain dict, in declaration order."""
        exported = ("name", "impact_type", "confidence", "reason")
        return {key: getattr(self, key) for key in exported}


@dataclass
class BlastRadiusResult(AgentResult):
    """
    Analysis result produced by BlastRadiusAgent.

    Additional fields:
    - impact_level: impact level (low/medium/high/critical)
    - affected_services: list of affected services
    - estimated_users: estimated number of impacted users
    - dependency_chain: dependency chain
    - recovery_time_estimate: estimated recovery time (minutes)
    """

    impact_level: ImpactLevel = ImpactLevel.LOW
    affected_services: list[AffectedService] = field(default_factory=list)
    estimated_users: int = 0
    dependency_chain: list[str] = field(default_factory=list)
    recovery_time_estimate: int = 0

    def to_dict(self) -> dict[str, Any]:
        """Return the parent's dict extended with the blast-radius fields."""
        payload = super().to_dict()

        # Compute every derived value before touching the payload, so the
        # payload is either fully extended or left untouched on error.
        level_value = self.impact_level.value
        service_dicts = [svc.to_dict() for svc in self.affected_services]

        payload["impact_level"] = level_value
        payload["affected_services"] = service_dicts
        payload["estimated_users"] = self.estimated_users
        payload["dependency_chain"] = self.dependency_chain
        payload["recovery_time_estimate"] = self.recovery_time_estimate
        return payload


# =============================================================================
# Service
# Dependency Graph (simplified)
# =============================================================================

# AWOOOI service dependency graph (simplified; should really be read from GraphRAG).
# Per service:
#   dependencies    - services this one calls
#   dependents      - services that call this one (walked by the blast-radius search)
#   criticality     - "critical" forces ImpactLevel.CRITICAL when the service is affected
#   estimated_users - user estimate summed over all affected services
SERVICE_DEPENDENCIES: dict[str, dict[str, Any]] = {
    # === Core Services ===
    "api": {
        "dependencies": ["postgres", "redis", "openclaw"],
        "dependents": ["web", "telegram-gateway"],
        "criticality": "critical",
        "estimated_users": 5000,
    },
    "web": {
        "dependencies": ["api"],
        "dependents": [],
        "criticality": "high",
        "estimated_users": 3000,
    },
    "openclaw": {
        "dependencies": ["redis", "ollama"],
        "dependents": ["api"],
        "criticality": "critical",
        "estimated_users": 5000,
    },
    # === Infrastructure ===
    "postgres": {
        "dependencies": [],
        "dependents": ["api", "signoz"],
        "criticality": "critical",
        "estimated_users": 10000,
    },
    "redis": {
        "dependencies": [],
        "dependents": ["api", "openclaw", "signal-worker"],
        "criticality": "critical",
        "estimated_users": 8000,
    },
    "ollama": {
        "dependencies": [],
        "dependents": ["openclaw"],
        "criticality": "high",
        "estimated_users": 2000,
    },
    # === Workers ===
    "signal-worker": {
        "dependencies": ["redis", "api"],
        "dependents": [],
        "criticality": "medium",
        "estimated_users": 500,
    },
    "telegram-gateway": {
        "dependencies": ["api"],
        "dependents": [],
        "criticality": "medium",
        "estimated_users": 1000,
    },
    # === Observability ===
    "signoz": {
        "dependencies": ["postgres"],
        "dependents": [],
        "criticality": "low",
        "estimated_users": 100,
    },
    "prometheus": {
        "dependencies": [],
        "dependents": [],
        "criticality": "low",
        "estimated_users": 50,
    },
}


class BlastRadiusAgent(BaseAgent[BlastRadiusResult]):
    """
    Impact scope analysis agent.

    Analysis flow:
    1. Identify the directly affected service(s)
    2. Walk the dependency graph to find indirectly affected services
    3. Compute the total number of affected users
    4. Decide the impact level

    Usage:
    ```python
    agent = BlastRadiusAgent()
    result = await agent.analyze({
        "target_service": "api",
        "action": "kubectl rollout restart",
        "namespace": "awoooi-prod",
    })
    print(result.impact_level)  # ImpactLevel.CRITICAL
    ```
    """

    AGENT_NAME = "blast-radius"
    AGENT_DESCRIPTION = "影響範圍分析師,評估相依服務與影響範圍"
    AGENT_TOOLS = ["Read", "Glob", "Grep"]

    # Kubernetes-style suffixes removed (in this order, one pass) when
    # normalizing a service name.
    _NAME_SUFFIXES = ("-deployment", "-svc", "-service", "-pod")

    # Common aliases mapped onto dependency-graph keys.
    _SERVICE_ALIASES = {
        "awoooi-api": "api",
        "awoooi-web": "web",
        "nginx": "web",
        "frontend": "web",
        "backend": "api",
        "database": "postgres",
        "db": "postgres",
        "cache": "redis",
    }

    def __init__(
        self,
        timeout_sec: float = 30.0,
        dependency_graph: dict[str, dict[str, Any]] | None = None,
    ):
        """
        Initialize the BlastRadiusAgent.

        Args:
            timeout_sec: Execution timeout, forwarded to the base class.
            dependency_graph: Custom dependency graph (for tests). When None,
                the module-level SERVICE_DEPENDENCIES is used.
        """
        super().__init__(timeout_sec)
        # `is None` rather than `or`: an explicitly supplied empty graph must
        # not be silently replaced by the production graph.
        self.dependency_graph = (
            SERVICE_DEPENDENCIES if dependency_graph is None else dependency_graph
        )

    async def analyze(self, context: dict[str, Any]) -> BlastRadiusResult:
        """
        Run the blast-radius analysis.

        Args:
            context: Analysis context
                - target_service: target service (a name or a list of names)
                - action: the operation being executed
                - namespace: namespace

        Returns:
            BlastRadiusResult with the impact level and detailed analysis.
            If the analysis raises, a FAILED result with
            ImpactLevel.CRITICAL is returned instead of propagating.
        """
        # perf_counter is monotonic, so the latency cannot go negative or
        # jump when the wall clock is adjusted.
        started = time.perf_counter()

        self.logger.info(
            "blast_radius_analysis_start",
            target=context.get("target_service"),
            # `action` may be missing, None or a non-string; never let the
            # log line itself crash the analysis.
            action=str(context.get("action") or "")[:50],
        )

        try:
            # Accept either a single service name or a list of names.
            target_services = context.get("target_service", [])
            if isinstance(target_services, str):
                target_services = [target_services]

            # Analyse the impact of every target service.
            all_affected: list[AffectedService] = []
            total_users = 0
            dependency_chain: list[str] = []

            for target in target_services:
                affected, users, chain = self._analyze_service_impact(target)
                all_affected.extend(affected)
                # Take the maximum rather than the sum so that overlapping
                # targets are not counted repeatedly.
                total_users = max(total_users, users)
                dependency_chain.extend(chain)

            # De-duplicate by service name, keeping the first occurrence.
            seen_services: set[str] = set()
            unique_affected: list[AffectedService] = []
            for svc in all_affected:
                if svc.name not in seen_services:
                    seen_services.add(svc.name)
                    unique_affected.append(svc)

            impact_level = self._calculate_impact_level(
                len(unique_affected),
                total_users,
                unique_affected,
            )

            recovery_time = self._estimate_recovery_time(
                impact_level, len(unique_affected)
            )

            latency_ms = int((time.perf_counter() - started) * 1000)

            analysis = self._generate_analysis(
                impact_level,
                len(unique_affected),
                total_users,
            )

            result = BlastRadiusResult(
                agent_name=self.AGENT_NAME,
                status=AgentStatus.SUCCESS,
                confidence=0.85,  # confidence score based on the dependency graph
                analysis=analysis,
                latency_ms=latency_ms,
                impact_level=impact_level,
                affected_services=unique_affected,
                estimated_users=total_users,
                # dict.fromkeys de-duplicates while preserving traversal
                # order; a set would scramble the chain between runs.
                dependency_chain=list(dict.fromkeys(dependency_chain)),
                recovery_time_estimate=recovery_time,
            )

            self.logger.info(
                "blast_radius_analysis_complete",
                impact_level=impact_level.value,
                affected_count=len(unique_affected),
                estimated_users=total_users,
                latency_ms=latency_ms,
            )

            return result

        except Exception as e:
            latency_ms = int((time.perf_counter() - started) * 1000)
            message = str(e)
            self.logger.exception(
                "blast_radius_analysis_error",
                error=message,
            )

            return BlastRadiusResult(
                agent_name=self.AGENT_NAME,
                status=AgentStatus.FAILED,
                confidence=0.0,
                analysis=f"分析失敗: {message}",
                latency_ms=latency_ms,
                error=message,
                impact_level=ImpactLevel.CRITICAL,  # assume maximum impact on failure
            )

    def _analyze_service_impact(
        self,
        target_service: str,
    ) -> tuple[list[AffectedService], int, list[str]]:
        """
        Analyse the impact of a single service.

        Args:
            target_service: Service name as supplied by the caller; it is
                normalized before the graph lookup.

        Returns:
            (affected services, estimated users, dependency chain)
        """
        target_key = self._normalize_service_name(target_service)

        if target_key not in self.dependency_graph:
            # Unknown service: assume a medium impact (1000 users).
            unknown = AffectedService(
                name=target_service,
                impact_type="direct",
                confidence=0.5,
                reason="未知服務,無法確定依賴關係",
            )
            return [unknown], 1000, [target_service]

        # 1. Direct impact (the target service itself).
        affected: list[AffectedService] = [
            AffectedService(
                name=target_key,
                impact_type="direct",
                confidence=1.0,
                reason="目標服務",
            )
        ]
        visited: set[str] = {target_key}
        dependency_chain: list[str] = [target_key]

        # 2. Upstream services that depend on the target (dependents).
        self._find_dependents(
            target_key,
            affected,
            visited,
            dependency_chain,
            depth=0,
            max_depth=3,
        )

        # Sum the users exactly once per affected service. The target is
        # already in `affected`, so it must not be added separately as well.
        total_users = sum(
            self.dependency_graph[svc.name].get("estimated_users", 0)
            for svc in affected
            if svc.name in self.dependency_graph
        )

        return affected, total_users, dependency_chain

    def _find_dependents(
        self,
        service: str,
        affected: list[AffectedService],
        visited: set[str],
        chain: list[str],
        depth: int,
        max_depth: int,
    ) -> None:
        """
        Recursively collect the services that depend on `service`.

        Appends to `affected` and `chain` and adds to `visited` in place.

        Args:
            service: Graph key whose dependents are explored.
            affected: Output list of affected services.
            visited: Names already recorded; prevents cycles and duplicates.
            chain: Output list of service names in visit order.
            depth: Current recursion depth (0 for the target's own dependents).
            max_depth: Recursion stops once `depth` reaches this value.
        """
        if depth >= max_depth:
            return

        if service not in self.dependency_graph:
            return

        # NOTE(review): this is a depth-first walk, so a service reachable by
        # both a short and a long path is labelled by whichever is visited
        # first. Kept as-is to preserve the output order.
        impact_type = "indirect" if depth == 0 else "transitive"
        # Rounded to avoid float artefacts such as 0.7000000000000001.
        confidence = round(0.9 - (depth * 0.1), 2)

        for dep in self.dependency_graph[service].get("dependents", []):
            if dep in visited:
                continue

            visited.add(dep)
            chain.append(dep)
            affected.append(
                AffectedService(
                    name=dep,
                    impact_type=impact_type,
                    confidence=confidence,
                    reason=f"依賴 {service}",
                )
            )

            self._find_dependents(
                dep,
                affected,
                visited,
                chain,
                depth + 1,
                max_depth,
            )

    def _normalize_service_name(self, service: str) -> str:
        """
        Normalize a service name to a dependency-graph key.

        Strips surrounding whitespace, lower-cases, removes common resource
        suffixes, then resolves common aliases. Names with no alias are
        returned in their stripped form.
        """
        name = service.strip().lower()
        for suffix in self._NAME_SUFFIXES:
            name = name.removesuffix(suffix)
        return self._SERVICE_ALIASES.get(name, name)

    def _calculate_impact_level(
        self,
        service_count: int,
        user_count: int,
        affected: list[AffectedService],
    ) -> ImpactLevel:
        """
        Decide the impact level.

        Args:
            service_count: Number of affected services.
            user_count: Estimated number of affected users.
            affected: Affected services; any of them marked "critical" in the
                dependency graph forces ImpactLevel.CRITICAL.
        """
        has_critical = any(
            svc.name in self.dependency_graph
            and self.dependency_graph[svc.name].get("criticality") == "critical"
            for svc in affected
        )

        if has_critical or service_count > 10 or user_count > 10000:
            return ImpactLevel.CRITICAL
        if service_count > 5 or user_count > 1000:
            return ImpactLevel.HIGH
        if service_count > 2 or user_count > 100:
            return ImpactLevel.MEDIUM
        return ImpactLevel.LOW

    def _estimate_recovery_time(
        self,
        impact_level: ImpactLevel,
        service_count: int,
    ) -> int:
        """Estimate the recovery time in minutes."""
        base_time = {
            ImpactLevel.LOW: 5,
            ImpactLevel.MEDIUM: 15,
            ImpactLevel.HIGH: 30,
            ImpactLevel.CRITICAL: 60,
        }
        # Each affected service adds 5 minutes on top of the base time.
        return base_time[impact_level] + (service_count * 5)

    def _generate_analysis(
        self,
        impact_level: ImpactLevel,
        service_count: int,
        user_count: int,
    ) -> str:
        """Build the one-line analysis summary."""
        level_desc = {
            ImpactLevel.LOW: "低影響",
            ImpactLevel.MEDIUM: "中等影響",
            ImpactLevel.HIGH: "高影響",
            ImpactLevel.CRITICAL: "嚴重影響",
        }

        return (
            f"{level_desc[impact_level]}: "
            f"影響 {service_count} 個服務,預估 {user_count:,} 用戶受影響"
        )

    def _build_prompt(self, context: dict[str, Any]) -> str:
        """Build the LLM prompt (Phase 9.4 extension)."""
        return f"""你是 AWOOOI 的影響範圍分析師。

分析以下操作的影響範圍:

目標服務: {context.get("target_service", "N/A")}
操作: {context.get("action", "N/A")}
命名空間: {context.get("namespace", "N/A")}

評估:
1. 直接影響的服務
2. 間接相依的服務
3. 使用者影響人數估計

輸出 JSON:
```json
{{
    "impact_level": "low|medium|high|critical",
    "affected_services": [
        {{"name": "...", "impact_type": "direct|indirect", "reason": "..."}}
    ],
    "estimated_users": 0,
    "dependency_chain": ["service1", "service2"],
    "analysis": "一句話摘要",
    "confidence": 0-1
}}
```"""

    def _parse_response(self, response: str) -> dict[str, Any]:
        """Parse the LLM response by delegating to `self._extract_json`."""
        return self._extract_json(response)