531 lines
16 KiB
Python
531 lines
16 KiB
Python
"""
|
|
Structured safety parser for remediation actions.
|
|
|
|
SPF-2 replaces the old single-regex kubectl whitelist with a small token parser.
|
|
The parser intentionally supports only the kubectl forms AWOOOI can safely
|
|
auto-execute; anything outside that grammar falls back to human review.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import shlex
|
|
from dataclasses import dataclass, field
|
|
from enum import StrEnum
|
|
|
|
KUBECTL_MAX_LEN = 500
|
|
|
|
_FORBIDDEN_RAW_CHARS = frozenset("\n\r\t\f\v;&|<>`$")
|
|
_SAFE_TOKEN_CHARS = frozenset(
|
|
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
|
"abcdefghijklmnopqrstuvwxyz"
|
|
"0123456789"
|
|
"-_./:=,@"
|
|
)
|
|
|
|
_RESOURCE_ALIASES: dict[str, str] = {
|
|
"deploy": "deployment",
|
|
"deployment": "deployment",
|
|
"deployments": "deployment",
|
|
"ds": "daemonset",
|
|
"daemonset": "daemonset",
|
|
"daemonsets": "daemonset",
|
|
"pod": "pod",
|
|
"pods": "pod",
|
|
"po": "pod",
|
|
"service": "service",
|
|
"services": "service",
|
|
"svc": "service",
|
|
"statefulset": "statefulset",
|
|
"statefulsets": "statefulset",
|
|
"sts": "statefulset",
|
|
"configmap": "configmap",
|
|
"configmaps": "configmap",
|
|
"cm": "configmap",
|
|
"node": "node",
|
|
"nodes": "node",
|
|
}
|
|
|
|
_ROLLING_RESOURCES = frozenset({"deployment", "statefulset", "daemonset"})
|
|
_SCALABLE_RESOURCES = frozenset({"deployment", "statefulset"})
|
|
_READONLY_VERBS = frozenset({"get", "describe", "logs", "top", "version"})
|
|
_MUTATING_VERBS = frozenset({"rollout", "scale", "delete"})
|
|
|
|
|
|
class ActionKind(StrEnum):
|
|
"""High-level parsed action kind."""
|
|
|
|
READONLY = "readonly"
|
|
ROLLOUT = "rollout"
|
|
SCALE = "scale"
|
|
AUTOSCALE = "autoscale"
|
|
SET_RESOURCES = "set_resources"
|
|
DELETE_POD = "delete_pod"
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ParsedKubectlAction:
|
|
"""Parsed kubectl action with safety decision."""
|
|
|
|
ok: bool
|
|
reason: str
|
|
kind: ActionKind | None = None
|
|
verb: str | None = None
|
|
subverb: str | None = None
|
|
resource_type: str | None = None
|
|
resource_name: str | None = None
|
|
namespace: str | None = None
|
|
flags: tuple[str, ...] = field(default_factory=tuple)
|
|
|
|
|
|
def is_safe_kubectl_action(command: str) -> bool:
|
|
"""Return True when the command is in the allowed kubectl action grammar."""
|
|
|
|
return parse_kubectl_action(command).ok
|
|
|
|
|
|
def kubectl_safety_reason(command: str) -> str | None:
|
|
"""Return None for a safe kubectl command, otherwise the parser reason.
|
|
|
|
Non-kubectl commands are outside this parser's scope and return None so
|
|
SSH / host-repair gates can keep their own policy.
|
|
"""
|
|
|
|
command = (command or "").strip()
|
|
if not command.lower().startswith("kubectl"):
|
|
return None
|
|
parsed = parse_kubectl_action(command)
|
|
return None if parsed.ok else parsed.reason
|
|
|
|
|
|
def parse_kubectl_action(command: str) -> ParsedKubectlAction:
|
|
"""Parse and validate a kubectl command for auto-execute safety.
|
|
|
|
The grammar is intentionally narrow:
|
|
- readonly: get/describe/logs/top/version with bounded, known-safe flags
|
|
- rollout: rollout restart on workload resources
|
|
- scale: scale deployment/statefulset to a positive replica count
|
|
- autoscale: HPA bounds on deployment/statefulset with positive min/max
|
|
- set resources: CPU/memory requests/limits on deployment/statefulset
|
|
- delete: delete one pod by name only
|
|
"""
|
|
|
|
command = (command or "").strip()
|
|
if not command:
|
|
return _reject("empty")
|
|
if len(command) > KUBECTL_MAX_LEN:
|
|
return _reject("too_long")
|
|
if any(ch in command for ch in _FORBIDDEN_RAW_CHARS):
|
|
return _reject("forbidden_shell_metachar")
|
|
if any(ord(ch) < 32 or ord(ch) > 126 for ch in command):
|
|
return _reject("non_ascii_or_control")
|
|
|
|
try:
|
|
tokens = shlex.split(command, posix=True)
|
|
except ValueError:
|
|
return _reject("invalid_shell_syntax")
|
|
|
|
if not tokens or tokens[0] != "kubectl":
|
|
return _reject("not_kubectl")
|
|
if any(not token or not set(token) <= _SAFE_TOKEN_CHARS for token in tokens):
|
|
return _reject("invalid_token_chars")
|
|
|
|
body = tokens[1:]
|
|
namespace, body, namespace_flags = _consume_namespace_flags(body)
|
|
if not body:
|
|
return _reject("missing_verb")
|
|
|
|
verb = body[0]
|
|
rest = body[1:]
|
|
if verb in _READONLY_VERBS:
|
|
return _parse_readonly(verb, rest, namespace, namespace_flags)
|
|
if verb == "rollout":
|
|
return _parse_rollout(rest, namespace, namespace_flags)
|
|
if verb == "scale":
|
|
return _parse_scale(rest, namespace, namespace_flags)
|
|
if verb == "autoscale":
|
|
return _parse_autoscale(rest, namespace, namespace_flags)
|
|
if verb == "set":
|
|
return _parse_set(rest, namespace, namespace_flags)
|
|
if verb == "delete":
|
|
return _parse_delete(rest, namespace, namespace_flags)
|
|
return _reject("unsupported_verb")
|
|
|
|
|
|
def _reject(reason: str) -> ParsedKubectlAction:
|
|
return ParsedKubectlAction(ok=False, reason=reason)
|
|
|
|
|
|
def _consume_namespace_flags(tokens: list[str]) -> tuple[str | None, list[str], list[str]]:
|
|
namespace: str | None = None
|
|
remaining: list[str] = []
|
|
namespace_flags: list[str] = []
|
|
i = 0
|
|
while i < len(tokens):
|
|
token = tokens[i]
|
|
if token in {"-n", "--namespace"}:
|
|
if i + 1 >= len(tokens):
|
|
remaining.append(token)
|
|
i += 1
|
|
continue
|
|
namespace = tokens[i + 1]
|
|
namespace_flags.extend([token, tokens[i + 1]])
|
|
i += 2
|
|
continue
|
|
if token.startswith("--namespace="):
|
|
namespace = token.split("=", 1)[1]
|
|
namespace_flags.append(token)
|
|
i += 1
|
|
continue
|
|
remaining.append(token)
|
|
i += 1
|
|
return namespace, remaining, namespace_flags
|
|
|
|
|
|
def _normalize_resource(value: str) -> str:
|
|
return _RESOURCE_ALIASES.get(value.lower(), value.lower())
|
|
|
|
|
|
def _split_resource_ref(tokens: list[str]) -> tuple[str | None, str | None, list[str]]:
|
|
if not tokens:
|
|
return None, None, []
|
|
|
|
first = tokens[0]
|
|
if "/" in first:
|
|
resource_type, resource_name = first.split("/", 1)
|
|
return _normalize_resource(resource_type), resource_name, tokens[1:]
|
|
|
|
resource_type = _normalize_resource(first)
|
|
if len(tokens) >= 2 and not tokens[1].startswith("-"):
|
|
return resource_type, tokens[1], tokens[2:]
|
|
return resource_type, None, tokens[1:]
|
|
|
|
|
|
def _parse_readonly(
|
|
verb: str,
|
|
tokens: list[str],
|
|
namespace: str | None,
|
|
namespace_flags: list[str],
|
|
) -> ParsedKubectlAction:
|
|
if verb == "version":
|
|
if tokens:
|
|
return _reject("version_disallows_args")
|
|
return ParsedKubectlAction(
|
|
ok=True,
|
|
reason="ok",
|
|
kind=ActionKind.READONLY,
|
|
verb=verb,
|
|
namespace=namespace,
|
|
flags=tuple(namespace_flags),
|
|
)
|
|
|
|
resource_type, resource_name, rest = _split_resource_ref(tokens)
|
|
if not resource_type:
|
|
return _reject("missing_readonly_resource")
|
|
|
|
allowed_flags = {
|
|
"--all-namespaces",
|
|
"--no-headers",
|
|
"--output",
|
|
"-o",
|
|
"--previous",
|
|
"--show-labels",
|
|
"--since",
|
|
"--since-time",
|
|
"--tail",
|
|
"--timestamps",
|
|
"--selector",
|
|
"-l",
|
|
"--container",
|
|
"-c",
|
|
"--watch",
|
|
"-w",
|
|
}
|
|
if not _flags_allowed(rest, allowed_flags):
|
|
return _reject("unsupported_readonly_flag")
|
|
|
|
return ParsedKubectlAction(
|
|
ok=True,
|
|
reason="ok",
|
|
kind=ActionKind.READONLY,
|
|
verb=verb,
|
|
resource_type=resource_type,
|
|
resource_name=resource_name,
|
|
namespace=namespace,
|
|
flags=tuple(namespace_flags + rest),
|
|
)
|
|
|
|
|
|
def _parse_rollout(
|
|
tokens: list[str],
|
|
namespace: str | None,
|
|
namespace_flags: list[str],
|
|
) -> ParsedKubectlAction:
|
|
if len(tokens) < 2:
|
|
return _reject("rollout_missing_args")
|
|
subverb = tokens[0]
|
|
if subverb != "restart":
|
|
return _reject("unsupported_rollout_subverb")
|
|
|
|
resource_type, resource_name, rest = _split_resource_ref(tokens[1:])
|
|
if resource_type not in _ROLLING_RESOURCES or not resource_name:
|
|
return _reject("invalid_rollout_resource")
|
|
if rest:
|
|
return _reject("unsupported_rollout_flag")
|
|
|
|
return ParsedKubectlAction(
|
|
ok=True,
|
|
reason="ok",
|
|
kind=ActionKind.ROLLOUT,
|
|
verb="rollout",
|
|
subverb=subverb,
|
|
resource_type=resource_type,
|
|
resource_name=resource_name,
|
|
namespace=namespace,
|
|
flags=tuple(namespace_flags),
|
|
)
|
|
|
|
|
|
def _parse_scale(
|
|
tokens: list[str],
|
|
namespace: str | None,
|
|
namespace_flags: list[str],
|
|
) -> ParsedKubectlAction:
|
|
resource_type, resource_name, rest = _split_resource_ref(tokens)
|
|
if resource_type not in _SCALABLE_RESOURCES or not resource_name:
|
|
return _reject("invalid_scale_resource")
|
|
|
|
replicas: int | None = None
|
|
remaining_flags: list[str] = []
|
|
i = 0
|
|
while i < len(rest):
|
|
token = rest[i]
|
|
if token == "--replicas":
|
|
if i + 1 >= len(rest):
|
|
return _reject("replicas_missing_value")
|
|
replicas = _parse_positive_int(rest[i + 1])
|
|
remaining_flags.extend([token, rest[i + 1]])
|
|
i += 2
|
|
continue
|
|
if token.startswith("--replicas="):
|
|
replicas = _parse_positive_int(token.split("=", 1)[1])
|
|
remaining_flags.append(token)
|
|
i += 1
|
|
continue
|
|
return _reject("unsupported_scale_flag")
|
|
|
|
if replicas is None:
|
|
return _reject("replicas_required")
|
|
if replicas < 1:
|
|
return _reject("replicas_must_be_positive")
|
|
|
|
return ParsedKubectlAction(
|
|
ok=True,
|
|
reason="ok",
|
|
kind=ActionKind.SCALE,
|
|
verb="scale",
|
|
resource_type=resource_type,
|
|
resource_name=resource_name,
|
|
namespace=namespace,
|
|
flags=tuple(namespace_flags + remaining_flags),
|
|
)
|
|
|
|
|
|
def _parse_autoscale(
|
|
tokens: list[str],
|
|
namespace: str | None,
|
|
namespace_flags: list[str],
|
|
) -> ParsedKubectlAction:
|
|
resource_type, resource_name, rest = _split_resource_ref(tokens)
|
|
if resource_type not in _SCALABLE_RESOURCES or not resource_name:
|
|
return _reject("invalid_autoscale_resource")
|
|
|
|
min_replicas: int | None = None
|
|
max_replicas: int | None = None
|
|
cpu_percent: int | None = None
|
|
remaining_flags: list[str] = []
|
|
i = 0
|
|
while i < len(rest):
|
|
token = rest[i]
|
|
flag, raw_value, consumed = _consume_required_flag_value(
|
|
rest,
|
|
i,
|
|
{"--min", "--max", "--cpu-percent"},
|
|
)
|
|
if not flag or raw_value is None:
|
|
return _reject("unsupported_autoscale_flag")
|
|
value = _parse_positive_int(raw_value)
|
|
if value < 1:
|
|
return _reject("autoscale_value_must_be_positive")
|
|
if flag == "--min":
|
|
min_replicas = value
|
|
elif flag == "--max":
|
|
max_replicas = value
|
|
elif flag == "--cpu-percent":
|
|
cpu_percent = value
|
|
remaining_flags.extend(rest[i:i + consumed])
|
|
i += consumed
|
|
|
|
if min_replicas is None or max_replicas is None:
|
|
return _reject("autoscale_min_max_required")
|
|
if max_replicas < min_replicas:
|
|
return _reject("autoscale_max_below_min")
|
|
if cpu_percent is not None and cpu_percent > 100:
|
|
return _reject("autoscale_cpu_percent_out_of_range")
|
|
|
|
return ParsedKubectlAction(
|
|
ok=True,
|
|
reason="ok",
|
|
kind=ActionKind.AUTOSCALE,
|
|
verb="autoscale",
|
|
resource_type=resource_type,
|
|
resource_name=resource_name,
|
|
namespace=namespace,
|
|
flags=tuple(namespace_flags + remaining_flags),
|
|
)
|
|
|
|
|
|
def _parse_set(
|
|
tokens: list[str],
|
|
namespace: str | None,
|
|
namespace_flags: list[str],
|
|
) -> ParsedKubectlAction:
|
|
if not tokens or tokens[0] != "resources":
|
|
return _reject("unsupported_set_subverb")
|
|
resource_type, resource_name, rest = _split_resource_ref(tokens[1:])
|
|
if resource_type not in _SCALABLE_RESOURCES or not resource_name:
|
|
return _reject("invalid_set_resources_target")
|
|
|
|
saw_resource_flag = False
|
|
remaining_flags: list[str] = []
|
|
i = 0
|
|
while i < len(rest):
|
|
flag, raw_value, consumed = _consume_required_flag_value(
|
|
rest,
|
|
i,
|
|
{"--limits", "--requests"},
|
|
)
|
|
if not flag or raw_value is None:
|
|
return _reject("unsupported_set_resources_flag")
|
|
if not _resource_quantity_assignments_safe(raw_value):
|
|
return _reject("invalid_resource_quantity")
|
|
saw_resource_flag = True
|
|
remaining_flags.extend(rest[i:i + consumed])
|
|
i += consumed
|
|
|
|
if not saw_resource_flag:
|
|
return _reject("set_resources_requires_limits_or_requests")
|
|
|
|
return ParsedKubectlAction(
|
|
ok=True,
|
|
reason="ok",
|
|
kind=ActionKind.SET_RESOURCES,
|
|
verb="set",
|
|
subverb="resources",
|
|
resource_type=resource_type,
|
|
resource_name=resource_name,
|
|
namespace=namespace,
|
|
flags=tuple(namespace_flags + remaining_flags),
|
|
)
|
|
|
|
|
|
def _parse_delete(
|
|
tokens: list[str],
|
|
namespace: str | None,
|
|
namespace_flags: list[str],
|
|
) -> ParsedKubectlAction:
|
|
resource_type, resource_name, rest = _split_resource_ref(tokens)
|
|
if resource_type != "pod" or not resource_name:
|
|
return _reject("delete_only_allows_single_pod")
|
|
if rest:
|
|
return _reject("unsupported_delete_flag")
|
|
if resource_name in {"--all", "all"}:
|
|
return _reject("delete_all_disallowed")
|
|
|
|
return ParsedKubectlAction(
|
|
ok=True,
|
|
reason="ok",
|
|
kind=ActionKind.DELETE_POD,
|
|
verb="delete",
|
|
resource_type=resource_type,
|
|
resource_name=resource_name,
|
|
namespace=namespace,
|
|
flags=tuple(namespace_flags),
|
|
)
|
|
|
|
|
|
def _parse_positive_int(value: str) -> int:
|
|
if not value.isdigit():
|
|
return -1
|
|
return int(value)
|
|
|
|
|
|
def _consume_required_flag_value(
|
|
tokens: list[str],
|
|
index: int,
|
|
allowed_flags: set[str],
|
|
) -> tuple[str | None, str | None, int]:
|
|
token = tokens[index]
|
|
if "=" in token:
|
|
flag, value = token.split("=", 1)
|
|
if flag not in allowed_flags or not value:
|
|
return None, None, 1
|
|
return flag, value, 1
|
|
|
|
if token not in allowed_flags or index + 1 >= len(tokens):
|
|
return None, None, 1
|
|
value = tokens[index + 1]
|
|
if not value or value.startswith("-"):
|
|
return None, None, 1
|
|
return token, value, 2
|
|
|
|
|
|
def _resource_quantity_assignments_safe(value: str) -> bool:
|
|
parts = value.split(",")
|
|
if not parts:
|
|
return False
|
|
for part in parts:
|
|
key, separator, quantity = part.partition("=")
|
|
if separator != "=":
|
|
return False
|
|
if key not in {"cpu", "memory"}:
|
|
return False
|
|
if not quantity or not set(quantity) <= _SAFE_TOKEN_CHARS:
|
|
return False
|
|
if quantity in {"0", "0m", "0Mi", "0Gi"}:
|
|
return False
|
|
return True
|
|
|
|
|
|
def _flags_allowed(tokens: list[str], allowed_flags: set[str]) -> bool:
|
|
i = 0
|
|
while i < len(tokens):
|
|
token = tokens[i]
|
|
if not token.startswith("-"):
|
|
return False
|
|
|
|
flag = token.split("=", 1)[0]
|
|
if flag not in allowed_flags:
|
|
return False
|
|
if "=" in token:
|
|
i += 1
|
|
continue
|
|
|
|
requires_value = flag in {
|
|
"--output",
|
|
"-o",
|
|
"--since",
|
|
"--since-time",
|
|
"--tail",
|
|
"--selector",
|
|
"-l",
|
|
"--container",
|
|
"-c",
|
|
}
|
|
if requires_value:
|
|
if i + 1 >= len(tokens) or tokens[i + 1].startswith("-"):
|
|
return False
|
|
i += 2
|
|
continue
|
|
i += 1
|
|
return True
|