🔴 違規修正: 規則匹配/Expert System 不是 AI 分析,confidence 必須 = 0.0 修正檔案: - agents/action_planner.py: 0.9 → 0.0 - agents/blast_radius.py: 0.85/0.5/0.9 → 0.0 - agents/security.py: 計算公式 → 0.0 - signoz_webhook.py: 0.7 → 0.0 - auto_approve.py: default 0.5 → 0.0 - ci_auto_repair.py: 整個計算函數 → return 0.0 - error_analyzer_service.py: default 0.5 → 0.0 - intent_classifier.py: 計算公式 → 0.0 - openclaw.py: default 0.5 → 0.0 - resource_resolver.py: 0.8 → 0.0 - k8s_naming.py: 0.9/0.7 → 0.0 只有 LLM 真實分析返回的 confidence 才能 > 0 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
332 lines
11 KiB
Python
332 lines
11 KiB
Python
"""
|
|
Security Agent - 安全風險評估專家
|
|
=================================
|
|
|
|
職責:
|
|
- 分析提案的安全風險
|
|
- 檢查權限邊界
|
|
- 評估潛在漏洞
|
|
- 回傳風險評分 (0-10)
|
|
|
|
符合 ADR-009 SecurityAgent 規範
|
|
"""
|
|
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
import structlog
|
|
|
|
from src.agents.base import AgentResult, AgentStatus, BaseAgent
|
|
|
|
logger = structlog.get_logger(__name__)
|
|
|
|
|
|
# =============================================================================
|
|
# Security Result
|
|
# =============================================================================
|
|
|
|
|
|
@dataclass
class SecurityResult(AgentResult):
    """
    Result produced by SecurityAgent.

    Adds the following fields on top of AgentResult:
    - risk_score: risk score (0-10, 10 is the highest risk)
    - risk_factors: list of detected risk factors
    - permission_issues: permission-related findings
    - recommendations: security recommendations
    """

    risk_score: float = 0.0
    risk_factors: list[str] = field(default_factory=list)
    permission_issues: list[str] = field(default_factory=list)
    recommendations: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return the parent's dict representation extended with the security fields."""
        payload = super().to_dict()
        # Same keys, same insertion order as the original update() call.
        for key in (
            "risk_score",
            "risk_factors",
            "permission_issues",
            "recommendations",
        ):
            payload[key] = getattr(self, key)
        return payload
|
|
|
|
|
|
# =============================================================================
|
|
# Security Agent
|
|
# =============================================================================
|
|
|
|
|
|
# Security rule table for the local (fast) rule engine.
# Each entry is (rule name, substring patterns, risk score, risk factor text,
# recommendation text); the comprehension expands it into the per-rule dict
# with the keys "patterns", "risk_score", "factor" and "recommendation".
SECURITY_RULES: dict[str, dict[str, Any]] = {
    _name: {
        "patterns": _patterns,
        "risk_score": _score,
        "factor": _factor,
        "recommendation": _advice,
    }
    for _name, _patterns, _score, _factor, _advice in (
        (
            "delete_operation",
            ["delete", "rm", "remove", "destroy", "drop"],
            8.0,
            "破壞性操作: 涉及刪除資源",
            "確保有備份,並考慮使用 --dry-run 先行測試",
        ),
        (
            "force_operation",
            ["--force", "-f", "--no-wait", "--grace-period=0"],
            7.0,
            "強制操作: 跳過安全確認",
            "移除 --force 參數,使用標準流程",
        ),
        (
            "privileged_namespace",
            ["kube-system", "kube-public", "default"],
            9.0,
            "敏感命名空間: 操作影響 K8s 核心組件",
            "確認是否真的需要操作系統命名空間",
        ),
        (
            "secret_operation",
            ["secret", "configmap", "credential", "password", "token"],
            8.5,
            "敏感資料: 操作涉及機密資訊",
            "確保日誌不會記錄機密內容",
        ),
        (
            "network_policy",
            ["networkpolicy", "ingress", "egress", "firewall"],
            7.5,
            "網路變更: 可能影響服務連通性",
            "變更前確認流量影響範圍",
        ),
        (
            "rbac_operation",
            ["role", "rolebinding", "clusterrole", "serviceaccount"],
            9.0,
            "權限變更: 操作涉及 RBAC 設定",
            "最小權限原則,避免過度授權",
        ),
        (
            "scale_to_zero",
            ["replicas=0", "replicas 0", "scale --replicas=0"],
            8.0,
            "服務中斷: 副本數設為 0",
            "確認是否為計畫性維護",
        ),
        (
            "rollback",
            ["rollout undo", "rollback"],
            5.0,
            "回滾操作: 相對安全但需確認目標版本",
            "確認回滾目標版本是穩定的",
        ),
        (
            "restart",
            ["rollout restart", "restart"],
            3.0,
            "重啟操作: 低風險但可能造成短暫中斷",
            "確認服務有足夠副本處理滾動重啟",
        ),
    )
}
|
|
|
|
|
|
class SecurityAgent(BaseAgent[SecurityResult]):
    """
    Security risk assessment agent.

    Analysis flow:
    1. Scan the request with the local rule engine (substring matching
       against SECURITY_RULES).
    2. Optional LLM deep analysis for high-risk results (placeholder only,
       not implemented yet).
    3. Return the resulting score.

    Usage:
    ```python
    agent = SecurityAgent()
    result = await agent.analyze({
        "action": "kubectl delete pod nginx-xxx",
        "namespace": "awoooi-prod",
        "affected_services": ["nginx", "frontend"],
    })
    print(result.risk_score)  # 0-10
    ```
    """

    AGENT_NAME = "security-expert"
    AGENT_DESCRIPTION = "資安專家,評估安全風險與權限影響"
    AGENT_TOOLS = ["Read", "Grep"]  # read-only tools

    def __init__(self, timeout_sec: float = 30.0, use_llm: bool = False):
        """
        Initialise the SecurityAgent.

        Args:
            timeout_sec: execution timeout, forwarded to the base agent.
            use_llm: whether to enable the LLM deep-analysis step
                (Phase 9.4 extension; currently a no-op).
        """
        super().__init__(timeout_sec)
        self.use_llm = use_llm

    async def analyze(self, context: dict[str, Any]) -> SecurityResult:
        """
        Run the security risk analysis.

        Args:
            context: analysis context with the keys
                - action: command to be executed
                - namespace: target namespace
                - affected_services: list of affected services
                - incident_id: incident id (optional)

        Returns:
            SecurityResult with the risk score and the detailed findings.
            Any exception raised during analysis is converted into a FAILED
            result with the maximum risk score (10.0).
        """
        # perf_counter is monotonic, so the latency cannot go negative when
        # the wall clock is adjusted.
        started = time.perf_counter()

        # Coerce before slicing: a None / non-string action must not raise
        # here, outside the fail-safe try block below.
        self.logger.info(
            "security_analysis_start",
            action=str(context.get("action") or "")[:100],
            namespace=context.get("namespace"),
        )

        try:
            # Phase 1: local rule engine (synchronous).
            rule_result = self._rule_engine_analyze(context)

            # Phase 2: optional LLM deep analysis (future extension).
            if self.use_llm and rule_result["risk_score"] >= 7.0:
                # High-risk results would get an LLM second opinion.
                # TODO: Phase 9.4 - implement the LLM analysis.
                pass

            latency_ms = int((time.perf_counter() - started) * 1000)

            result = SecurityResult(
                agent_name=self.AGENT_NAME,
                status=AgentStatus.SUCCESS,
                confidence=rule_result["confidence"],
                analysis=rule_result["analysis"],
                latency_ms=latency_ms,
                risk_score=rule_result["risk_score"],
                risk_factors=rule_result["risk_factors"],
                permission_issues=rule_result["permission_issues"],
                recommendations=rule_result["recommendations"],
                raw_response=rule_result,
            )

            self.logger.info(
                "security_analysis_complete",
                risk_score=result.risk_score,
                latency_ms=latency_ms,
            )

            return result

        except Exception as e:
            # Agent boundary: never propagate, fail closed instead.
            latency_ms = int((time.perf_counter() - started) * 1000)

            self.logger.exception(
                "security_analysis_error",
                error=str(e),
            )

            return SecurityResult(
                agent_name=self.AGENT_NAME,
                status=AgentStatus.FAILED,
                confidence=0.0,
                analysis=f"分析失敗: {str(e)}",
                latency_ms=latency_ms,
                error=str(e),
                risk_score=10.0,  # default to the highest risk on failure
                risk_factors=["分析過程發生錯誤"],
                recommendations=["請人工審核此操作"],
            )

    def _rule_engine_analyze(self, context: dict[str, Any]) -> dict[str, Any]:
        """
        Analyse the context with the local rule engine.

        Every rule in SECURITY_RULES is matched as a case-insensitive
        substring against the action; the "privileged_namespace" rule is
        additionally matched against the namespace. The resulting score is
        the maximum over all matched rules and the extra checks below.

        Args:
            context: see analyze(). Missing or None values are treated as
                empty.

        Returns:
            dict with the keys risk_score, risk_factors, recommendations,
            permission_issues, confidence, analysis and rules_matched.
        """
        # `or ""` also covers keys that are present but explicitly None.
        action = str(context.get("action") or "").lower()
        namespace = str(context.get("namespace") or "").lower()
        affected_services = context.get("affected_services") or []

        risk_factors: list[str] = []
        recommendations: list[str] = []
        permission_issues: list[str] = []
        max_risk_score: float = 0.0

        # Scan every security rule; each rule contributes at most once even
        # when it matches both the action and the namespace.
        for rule_name, rule in SECURITY_RULES.items():
            patterns = rule["patterns"]
            matched = any(pattern in action for pattern in patterns)
            if not matched and rule_name == "privileged_namespace":
                matched = any(pattern in namespace for pattern in patterns)
            if not matched:
                continue

            risk_factors.append(rule["factor"])
            recommendations.append(rule["recommendation"])
            max_risk_score = max(max_risk_score, rule["risk_score"])

        # Large blast radius: many services affected at once.
        if len(affected_services) > 5:
            risk_factors.append(f"大範圍影響: 涉及 {len(affected_services)} 個服務")
            max_risk_score = max(max_risk_score, 6.0)
            recommendations.append("考慮分批執行,降低爆炸半徑")

        # Production namespaces never score below 5.
        if "prod" in namespace:
            max_risk_score = max(max_risk_score, 5.0)
            permission_issues.append("操作目標為生產環境")

        # Count real findings before the placeholder below is added.
        rules_matched = len(risk_factors)

        # Nothing matched: report a baseline low risk, but never lower a
        # floor that was already applied (e.g. the production minimum).
        if not risk_factors:
            risk_factors.append("未偵測到明顯風險因素")
            max_risk_score = max(max_risk_score, 2.0)

        # Rule matching is not AI analysis, so the confidence is always 0.
        confidence = 0.0

        # Build the one-line summary.
        if max_risk_score >= 8.0:
            analysis = f"高風險操作 (Score: {max_risk_score}/10): 建議人工審核"
        elif max_risk_score >= 5.0:
            analysis = f"中等風險 (Score: {max_risk_score}/10): 確認影響範圍後執行"
        else:
            analysis = f"低風險操作 (Score: {max_risk_score}/10): 可安全執行"

        return {
            "risk_score": max_risk_score,
            "risk_factors": risk_factors,
            # De-duplicate while keeping rule order (a set would make the
            # order vary between processes).
            "recommendations": list(dict.fromkeys(recommendations)),
            "permission_issues": permission_issues,
            "confidence": confidence,
            "analysis": analysis,
            "rules_matched": rules_matched,
        }

    def _build_prompt(self, context: dict[str, Any]) -> str:
        """Build the LLM prompt (Phase 9.4 extension)."""
        # Tolerate a missing/None list and non-string entries.
        affected = ", ".join(
            str(service) for service in (context.get("affected_services") or [])
        )
        return f"""你是 AWOOOI 的資安專家。
分析以下操作的安全風險:

操作指令: {context.get("action", "N/A")}
目標命名空間: {context.get("namespace", "N/A")}
受影響服務: {affected}

評估:
1. 是否涉及敏感資料
2. 是否可能被利用
3. 權限邊界是否被突破

輸出 JSON:
```json
{{
"risk_score": 0-10,
"risk_factors": ["...", "..."],
"permission_issues": ["...", "..."],
"recommendations": ["...", "..."],
"analysis": "一句話摘要",
"confidence": 0-1
}}
```"""

    def _parse_response(self, response: str) -> dict[str, Any]:
        """Parse the LLM response by delegating to self._extract_json."""
        return self._extract_json(response)
|