根因:host_resource_alert 規則使用 {host}(由 instance label 派生),
與 {target} 無關;但 host 告警缺少 K8s deployment label 導致 target=unknown,
_is_bad_target=True → kubectl_command 被清空 → auto_approve 以
no_executable_action 拒絕 → 每日 3 次人工攔截。
修復:
- alert_rule_engine.py: SSH 指令(startswith "ssh ")跳過 bad_target 驗證
- prompts.py: 主 + Nemo prompt 補 Host* 告警 SSH 診斷規則,防 LLM fallback 路徑輸出 kubectl
- ssh_command_whitelist.py: 新建唯讀 SSH 指令白名單模組(供 _ssh_execute() 執行前驗證)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
122 lines
2.7 KiB
Python
122 lines
2.7 KiB
Python
"""
|
||
SSH 唯讀診斷指令白名單 — 防止 RCE
|
||
|
||
2026-05-04 ogt + Claude Sonnet 4.6:
|
||
設計原則:
|
||
- 只允許純唯讀指令(top/uptime/free/df/ps/cat /proc/*)
|
||
- 禁止任何 shell metachar(compound/pipeline/substitution/redirect)
|
||
- 由 validate_kubectl_command() 的 "ssh " 豁免確保進規則路徑;
|
||
本模組為補充安全層,供未來 _ssh_execute() 執行前再次驗證。
|
||
"""
|
||
|
||
# 允許的指令前綴(唯讀診斷用)
|
||
_SAFE_PREFIXES: tuple[str, ...] = (
|
||
"top -bn1",
|
||
"uptime",
|
||
"free -m",
|
||
"free -h",
|
||
"df -h",
|
||
"df -hT",
|
||
"iostat",
|
||
"vmstat",
|
||
"ps aux",
|
||
"ps -ef",
|
||
"cat /proc/loadavg",
|
||
"cat /proc/meminfo",
|
||
"cat /proc/cpuinfo",
|
||
"echo ",
|
||
)
|
||
|
||
# 禁止的 shell metachar — 防止指令注入
|
||
_DANGEROUS_PATTERNS: tuple[str, ...] = (
|
||
";",
|
||
"&&",
|
||
"||",
|
||
"|",
|
||
">",
|
||
"<",
|
||
"`",
|
||
"$(",
|
||
"${",
|
||
"\\n",
|
||
"\n",
|
||
"rm ",
|
||
"kill ",
|
||
"pkill ",
|
||
"reboot",
|
||
"shutdown",
|
||
"poweroff",
|
||
"dd ",
|
||
"mkfs",
|
||
"fdisk",
|
||
"wget ",
|
||
"curl ",
|
||
"nc ",
|
||
"ncat ",
|
||
"bash ",
|
||
"sh ",
|
||
"python",
|
||
"perl",
|
||
"ruby",
|
||
"exec ",
|
||
)
|
||
|
||
|
||
def is_safe_ssh_command(cmd: str) -> bool:
|
||
"""
|
||
驗證 SSH 指令是否為唯讀安全指令。
|
||
|
||
規則:
|
||
1. 必須以 _SAFE_PREFIXES 其中一個前綴開頭
|
||
2. 不得含有任何 _DANGEROUS_PATTERNS
|
||
|
||
Args:
|
||
cmd: 要驗證的 SSH 指令字串(不含前導 "ssh <host> '" 包裝)
|
||
|
||
Returns:
|
||
True — 安全,可執行
|
||
False — 含危險模式,應拒絕
|
||
"""
|
||
if not cmd or not cmd.strip():
|
||
return False
|
||
|
||
cmd_stripped = cmd.strip()
|
||
|
||
# 先阻擋危險模式(優先於前綴白名單)
|
||
for pattern in _DANGEROUS_PATTERNS:
|
||
if pattern in cmd_stripped:
|
||
return False
|
||
|
||
# 必須以白名單前綴開頭
|
||
return any(cmd_stripped.startswith(prefix) for prefix in _SAFE_PREFIXES)
|
||
|
||
|
||
def extract_inner_command(ssh_cmd: str) -> str:
|
||
"""
|
||
從完整 SSH 指令中提取內層指令供 is_safe_ssh_command() 驗證。
|
||
|
||
範例:
|
||
"ssh 192.168.1.1 'uptime'" → "uptime"
|
||
"ssh -o StrictHostKeyChecking=no host 'free -m'" → "free -m"
|
||
|
||
Args:
|
||
ssh_cmd: 完整 SSH 指令字串
|
||
|
||
Returns:
|
||
內層指令字串;若解析失敗返回原始字串
|
||
"""
|
||
if not ssh_cmd.startswith("ssh "):
|
||
return ssh_cmd
|
||
|
||
# 找最後一對單引號或雙引號包裝的指令
|
||
for quote in ("'", '"'):
|
||
last_open = ssh_cmd.rfind(quote)
|
||
if last_open == -1:
|
||
continue
|
||
first_open = ssh_cmd.find(quote)
|
||
if first_open == last_open:
|
||
continue
|
||
return ssh_cmd[first_open + 1 : last_open]
|
||
|
||
return ssh_cmd
|