"""
Security Agent - security risk assessment expert.
=================================================

Responsibilities:
- Analyze the security risk of a proposed action
- Check permission boundaries
- Assess potential vulnerabilities
- Return a risk score (0-10)

Conforms to the ADR-009 SecurityAgent specification.
"""

import time
from dataclasses import dataclass, field
from typing import Any

import structlog

from src.agents.base import AgentResult, AgentStatus, BaseAgent

logger = structlog.get_logger(__name__)


# =============================================================================
# Security Result
# =============================================================================


@dataclass
class SecurityResult(AgentResult):
    """
    Result of a SecurityAgent analysis.

    Extra fields on top of ``AgentResult``:
    - risk_score: risk score (0-10, 10 is the highest risk)
    - risk_factors: list of detected risk factors
    - permission_issues: permission-related findings
    - recommendations: security recommendations
    """

    risk_score: float = 0.0
    risk_factors: list[str] = field(default_factory=list)
    permission_issues: list[str] = field(default_factory=list)
    recommendations: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return the base result dict extended with the security fields."""
        base = super().to_dict()
        base.update(
            {
                "risk_score": self.risk_score,
                "risk_factors": self.risk_factors,
                "permission_issues": self.permission_issues,
                "recommendations": self.recommendations,
            }
        )
        return base


# =============================================================================
# Security Agent
# =============================================================================

# Local rule engine (fast, in-process checks).
#
# Each rule has:
#   patterns        - lowercase substrings searched for in the action text
#   risk_score      - score (0-10) applied when any pattern matches
#   factor          - human-readable risk factor reported to the caller
#   recommendation  - mitigation advice reported to the caller
#
# NOTE(review): matching is plain substring containment, so short patterns
# such as "rm", "-f" or "role" also match inside unrelated words or flags
# (e.g. "confirm", "apply -f file.yaml"). This errs on the side of
# over-reporting; confirm whether token-based matching is wanted.
SECURITY_RULES: dict[str, dict[str, Any]] = {
    "delete_operation": {
        "patterns": ["delete", "rm", "remove", "destroy", "drop"],
        "risk_score": 8.0,
        "factor": "破壞性操作: 涉及刪除資源",
        "recommendation": "確保有備份,並考慮使用 --dry-run 先行測試",
    },
    "force_operation": {
        "patterns": ["--force", "-f", "--no-wait", "--grace-period=0"],
        "risk_score": 7.0,
        "factor": "強制操作: 跳過安全確認",
        "recommendation": "移除 --force 參數,使用標準流程",
    },
    "privileged_namespace": {
        "patterns": ["kube-system", "kube-public", "default"],
        "risk_score": 9.0,
        "factor": "敏感命名空間: 操作影響 K8s 核心組件",
        "recommendation": "確認是否真的需要操作系統命名空間",
    },
    "secret_operation": {
        "patterns": ["secret", "configmap", "credential", "password", "token"],
        "risk_score": 8.5,
        "factor": "敏感資料: 操作涉及機密資訊",
        "recommendation": "確保日誌不會記錄機密內容",
    },
    "network_policy": {
        "patterns": ["networkpolicy", "ingress", "egress", "firewall"],
        "risk_score": 7.5,
        "factor": "網路變更: 可能影響服務連通性",
        "recommendation": "變更前確認流量影響範圍",
    },
    "rbac_operation": {
        "patterns": ["role", "rolebinding", "clusterrole", "serviceaccount"],
        "risk_score": 9.0,
        "factor": "權限變更: 操作涉及 RBAC 設定",
        "recommendation": "最小權限原則,避免過度授權",
    },
    "scale_to_zero": {
        "patterns": ["replicas=0", "replicas 0", "scale --replicas=0"],
        "risk_score": 8.0,
        "factor": "服務中斷: 副本數設為 0",
        "recommendation": "確認是否為計畫性維護",
    },
    "rollback": {
        "patterns": ["rollout undo", "rollback"],
        "risk_score": 5.0,
        "factor": "回滾操作: 相對安全但需確認目標版本",
        "recommendation": "確認回滾目標版本是穩定的",
    },
    "restart": {
        "patterns": ["rollout restart", "restart"],
        "risk_score": 3.0,
        "factor": "重啟操作: 低風險但可能造成短暫中斷",
        "recommendation": "確認服務有足夠副本處理滾動重啟",
    },
}


class SecurityAgent(BaseAgent[SecurityResult]):
    """
    Security risk assessment agent.

    Analysis flow:
    1. Fast scan with the local rule engine
    2. LLM deep analysis (optional, for complex scenarios; not implemented yet)
    3. Combined score

    Usage:
    ```python
    agent = SecurityAgent()
    result = await agent.analyze({
        "action": "kubectl delete pod nginx-xxx",
        "namespace": "awoooi-prod",
        "affected_services": ["nginx", "frontend"],
    })
    print(result.risk_score)  # 0-10
    ```
    """

    AGENT_NAME = "security-expert"
    AGENT_DESCRIPTION = "資安專家,評估安全風險與權限影響"
    AGENT_TOOLS = ["Read", "Grep"]  # read-only tools

    def __init__(self, timeout_sec: float = 30.0, use_llm: bool = False):
        """
        Initialize the SecurityAgent.

        Args:
            timeout_sec: Execution timeout, forwarded to the base agent.
            use_llm: Whether to enable LLM deep analysis (Phase 9.4 extension).
        """
        super().__init__(timeout_sec)
        self.use_llm = use_llm

    async def analyze(self, context: dict[str, Any]) -> SecurityResult:
        """
        Run the security risk analysis.

        Args:
            context: Analysis context.
                - action: command to be executed
                - namespace: target namespace
                - affected_services: list of affected services
                - incident_id: incident ID (optional)

        Returns:
            A SecurityResult with the risk score and detailed findings. If the
            analysis itself raises, a FAILED result with the maximum risk
            score (10.0) is returned instead of propagating the exception.
        """
        # perf_counter is monotonic, so the measured latency cannot go
        # negative or jump when the wall clock is adjusted.
        start_time = time.perf_counter()

        # Coerce defensively: this log call runs before the try block, so a
        # None / non-string action must not raise here and bypass the
        # fail-closed handler below.
        self.logger.info(
            "security_analysis_start",
            action=str(context.get("action") or "")[:100],
            namespace=context.get("namespace"),
        )

        try:
            # Phase 1: local rule engine (synchronous, fast)
            rule_result = self._rule_engine_analyze(context)

            # Phase 2: LLM deep analysis (optional, future extension)
            if self.use_llm and rule_result["risk_score"] >= 7.0:
                # High-risk scenarios get an LLM second opinion.
                # TODO: implement LLM analysis in Phase 9.4
                pass

            latency_ms = int((time.perf_counter() - start_time) * 1000)

            result = SecurityResult(
                agent_name=self.AGENT_NAME,
                status=AgentStatus.SUCCESS,
                confidence=rule_result["confidence"],
                analysis=rule_result["analysis"],
                latency_ms=latency_ms,
                risk_score=rule_result["risk_score"],
                risk_factors=rule_result["risk_factors"],
                permission_issues=rule_result["permission_issues"],
                recommendations=rule_result["recommendations"],
                raw_response=rule_result,
            )

            self.logger.info(
                "security_analysis_complete",
                risk_score=result.risk_score,
                latency_ms=latency_ms,
            )

            return result

        except Exception as e:
            # Top-level boundary: fail closed so an analysis error can never
            # be read as "safe to run".
            latency_ms = int((time.perf_counter() - start_time) * 1000)
            self.logger.exception(
                "security_analysis_error",
                error=str(e),
            )
            return SecurityResult(
                agent_name=self.AGENT_NAME,
                status=AgentStatus.FAILED,
                confidence=0.0,
                analysis=f"分析失敗: {e}",
                latency_ms=latency_ms,
                error=str(e),
                risk_score=10.0,  # default to the highest risk on failure
                risk_factors=["分析過程發生錯誤"],
                recommendations=["請人工審核此操作"],
            )

    def _rule_engine_analyze(self, context: dict[str, Any]) -> dict[str, Any]:
        """
        Analyze the context with the local rule engine.

        Scans the lowercased action (and, for the privileged-namespace rule,
        the namespace) for the substrings listed in SECURITY_RULES and keeps
        the highest matching score.

        Args:
            context: Analysis context; reads ``action``, ``namespace`` and
                ``affected_services``. Missing or ``None`` values are treated
                as empty.

        Returns:
            A dict with ``risk_score``, ``risk_factors``, ``recommendations``
            (de-duplicated, in first-seen order), ``permission_issues``,
            ``confidence``, ``analysis`` and ``rules_matched``.
        """
        # `or` also covers keys that are present but explicitly None.
        action = (context.get("action") or "").lower()
        namespace = (context.get("namespace") or "").lower()
        affected_services = context.get("affected_services") or []

        risk_factors: list[str] = []
        recommendations: list[str] = []
        permission_issues: list[str] = []
        max_risk_score: float = 0.0

        # Scan every security rule.
        for rule_name, rule in SECURITY_RULES.items():
            patterns = rule["patterns"]

            matched = any(pattern in action for pattern in patterns)
            # Only the privileged-namespace rule also inspects the namespace.
            if not matched and rule_name == "privileged_namespace":
                matched = any(pattern in namespace for pattern in patterns)

            # Report each rule at most once, even when it matches both the
            # action text and the namespace.
            if matched:
                risk_factors.append(rule["factor"])
                recommendations.append(rule["recommendation"])
                max_risk_score = max(max_risk_score, rule["risk_score"])

        # Check how many services are affected.
        if len(affected_services) > 5:
            risk_factors.append(f"大範圍影響: 涉及 {len(affected_services)} 個服務")
            max_risk_score = max(max_risk_score, 6.0)
            recommendations.append("考慮分批執行,降低爆炸半徑")

        # Check whether the production environment is involved.
        if "prod" in namespace:
            # Production has a minimum risk of 5.
            max_risk_score = max(max_risk_score, 5.0)
            permission_issues.append("操作目標為生產環境")

        # Count real findings before the placeholder factor is added, so
        # "nothing matched" reports 0.
        rules_matched = len(risk_factors)

        # Nothing matched: apply the baseline score.
        if not risk_factors:
            risk_factors.append("未偵測到明顯風險因素")
            # Baseline low risk. Use max() so the production floor applied
            # above is not lowered back to 2.0.
            max_risk_score = max(max_risk_score, 2.0)

        # Rule matching is not an AI analysis, so confidence is set to 0.
        confidence = 0.0

        # Build the analysis summary.
        if max_risk_score >= 8.0:
            analysis = f"高風險操作 (Score: {max_risk_score}/10): 建議人工審核"
        elif max_risk_score >= 5.0:
            analysis = f"中等風險 (Score: {max_risk_score}/10): 確認影響範圍後執行"
        else:
            analysis = f"低風險操作 (Score: {max_risk_score}/10): 可安全執行"

        return {
            "risk_score": max_risk_score,
            "risk_factors": risk_factors,
            # De-duplicate while keeping first-seen order; a set would give
            # a different order from run to run under hash randomization.
            "recommendations": list(dict.fromkeys(recommendations)),
            "permission_issues": permission_issues,
            "confidence": confidence,
            "analysis": analysis,
            "rules_matched": rules_matched,
        }

    def _build_prompt(self, context: dict[str, Any]) -> str:
        """
        Build the LLM prompt (Phase 9.4 extension).

        Args:
            context: Analysis context; reads ``action``, ``namespace`` and
                ``affected_services``.

        Returns:
            The prompt text asking for a JSON risk assessment.
        """
        action = context.get("action", "N/A")
        namespace = context.get("namespace", "N/A")
        # Treat an explicit None like a missing key instead of raising.
        services = ", ".join(context.get("affected_services") or [])

        # The pieces below concatenate to the original prompt text; each
        # piece ends with the single space that separates it from the next.
        return (
            "你是 AWOOOI 的資安專家。 "
            "分析以下操作的安全風險: "
            f"操作指令: {action} "
            f"目標命名空間: {namespace} "
            f"受影響服務: {services} "
            "評估: "
            "1. 是否涉及敏感資料 "
            "2. 是否可能被利用 "
            "3. 權限邊界是否被突破 "
            "輸出 JSON: "
            "```json "
            "{ "
            '"risk_score": 0-10, '
            '"risk_factors": ["...", "..."], '
            '"permission_issues": ["...", "..."], '
            '"recommendations": ["...", "..."], '
            '"analysis": "一句話摘要", '
            '"confidence": 0-1 '
            "} "
            "```"
        )

    def _parse_response(self, response: str) -> dict[str, Any]:
        """Parse the LLM response by delegating to ``self._extract_json``."""
        return self._extract_json(response)