""" Structured safety parser for remediation actions. SPF-2 replaces the old single-regex kubectl whitelist with a small token parser. The parser intentionally supports only the kubectl forms AWOOOI can safely auto-execute; anything outside that grammar falls back to human review. """ from __future__ import annotations import shlex from dataclasses import dataclass, field from enum import StrEnum KUBECTL_MAX_LEN = 500 _FORBIDDEN_RAW_CHARS = frozenset("\n\r\t\f\v;&|<>`$") _SAFE_TOKEN_CHARS = frozenset( "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz" "0123456789" "-_./:=,@" ) _RESOURCE_ALIASES: dict[str, str] = { "deploy": "deployment", "deployment": "deployment", "deployments": "deployment", "ds": "daemonset", "daemonset": "daemonset", "daemonsets": "daemonset", "pod": "pod", "pods": "pod", "po": "pod", "service": "service", "services": "service", "svc": "service", "statefulset": "statefulset", "statefulsets": "statefulset", "sts": "statefulset", "configmap": "configmap", "configmaps": "configmap", "cm": "configmap", "node": "node", "nodes": "node", } _ROLLING_RESOURCES = frozenset({"deployment", "statefulset", "daemonset"}) _SCALABLE_RESOURCES = frozenset({"deployment", "statefulset"}) _READONLY_VERBS = frozenset({"get", "describe", "logs", "top", "version"}) _MUTATING_VERBS = frozenset({"rollout", "scale", "delete"}) class ActionKind(StrEnum): """High-level parsed action kind.""" READONLY = "readonly" ROLLOUT = "rollout" SCALE = "scale" AUTOSCALE = "autoscale" SET_RESOURCES = "set_resources" DELETE_POD = "delete_pod" @dataclass(frozen=True) class ParsedKubectlAction: """Parsed kubectl action with safety decision.""" ok: bool reason: str kind: ActionKind | None = None verb: str | None = None subverb: str | None = None resource_type: str | None = None resource_name: str | None = None namespace: str | None = None flags: tuple[str, ...] = field(default_factory=tuple) def is_safe_kubectl_action(command: str) -> bool: """Return True when the command is in the allowed kubectl action grammar.""" return parse_kubectl_action(command).ok def kubectl_safety_reason(command: str) -> str | None: """Return None for a safe kubectl command, otherwise the parser reason. Non-kubectl commands are outside this parser's scope and return None so SSH / host-repair gates can keep their own policy. """ command = (command or "").strip() if not command.lower().startswith("kubectl"): return None parsed = parse_kubectl_action(command) return None if parsed.ok else parsed.reason def parse_kubectl_action(command: str) -> ParsedKubectlAction: """Parse and validate a kubectl command for auto-execute safety. The grammar is intentionally narrow: - readonly: get/describe/logs/top/version with bounded, known-safe flags - rollout: rollout restart on workload resources - scale: scale deployment/statefulset to a positive replica count - autoscale: HPA bounds on deployment/statefulset with positive min/max - set resources: CPU/memory requests/limits on deployment/statefulset - delete: delete one pod by name only """ command = (command or "").strip() if not command: return _reject("empty") if len(command) > KUBECTL_MAX_LEN: return _reject("too_long") if any(ch in command for ch in _FORBIDDEN_RAW_CHARS): return _reject("forbidden_shell_metachar") if any(ord(ch) < 32 or ord(ch) > 126 for ch in command): return _reject("non_ascii_or_control") try: tokens = shlex.split(command, posix=True) except ValueError: return _reject("invalid_shell_syntax") if not tokens or tokens[0] != "kubectl": return _reject("not_kubectl") if any(not token or not set(token) <= _SAFE_TOKEN_CHARS for token in tokens): return _reject("invalid_token_chars") body = tokens[1:] namespace, body, namespace_flags = _consume_namespace_flags(body) if not body: return _reject("missing_verb") verb = body[0] rest = body[1:] if verb in _READONLY_VERBS: return _parse_readonly(verb, rest, namespace, namespace_flags) if verb == "rollout": return _parse_rollout(rest, namespace, namespace_flags) if verb == "scale": return _parse_scale(rest, namespace, namespace_flags) if verb == "autoscale": return _parse_autoscale(rest, namespace, namespace_flags) if verb == "set": return _parse_set(rest, namespace, namespace_flags) if verb == "delete": return _parse_delete(rest, namespace, namespace_flags) return _reject("unsupported_verb") def _reject(reason: str) -> ParsedKubectlAction: return ParsedKubectlAction(ok=False, reason=reason) def _consume_namespace_flags(tokens: list[str]) -> tuple[str | None, list[str], list[str]]: namespace: str | None = None remaining: list[str] = [] namespace_flags: list[str] = [] i = 0 while i < len(tokens): token = tokens[i] if token in {"-n", "--namespace"}: if i + 1 >= len(tokens): remaining.append(token) i += 1 continue namespace = tokens[i + 1] namespace_flags.extend([token, tokens[i + 1]]) i += 2 continue if token.startswith("--namespace="): namespace = token.split("=", 1)[1] namespace_flags.append(token) i += 1 continue remaining.append(token) i += 1 return namespace, remaining, namespace_flags def _normalize_resource(value: str) -> str: return _RESOURCE_ALIASES.get(value.lower(), value.lower()) def _split_resource_ref(tokens: list[str]) -> tuple[str | None, str | None, list[str]]: if not tokens: return None, None, [] first = tokens[0] if "/" in first: resource_type, resource_name = first.split("/", 1) return _normalize_resource(resource_type), resource_name, tokens[1:] resource_type = _normalize_resource(first) if len(tokens) >= 2 and not tokens[1].startswith("-"): return resource_type, tokens[1], tokens[2:] return resource_type, None, tokens[1:] def _parse_readonly( verb: str, tokens: list[str], namespace: str | None, namespace_flags: list[str], ) -> ParsedKubectlAction: if verb == "version": if tokens: return _reject("version_disallows_args") return ParsedKubectlAction( ok=True, reason="ok", kind=ActionKind.READONLY, verb=verb, namespace=namespace, flags=tuple(namespace_flags), ) resource_type, resource_name, rest = _split_resource_ref(tokens) if not resource_type: return _reject("missing_readonly_resource") allowed_flags = { "--all-namespaces", "--no-headers", "--output", "-o", "--previous", "--show-labels", "--since", "--since-time", "--tail", "--timestamps", "--selector", "-l", "--container", "-c", "--watch", "-w", } if not _flags_allowed(rest, allowed_flags): return _reject("unsupported_readonly_flag") return ParsedKubectlAction( ok=True, reason="ok", kind=ActionKind.READONLY, verb=verb, resource_type=resource_type, resource_name=resource_name, namespace=namespace, flags=tuple(namespace_flags + rest), ) def _parse_rollout( tokens: list[str], namespace: str | None, namespace_flags: list[str], ) -> ParsedKubectlAction: if len(tokens) < 2: return _reject("rollout_missing_args") subverb = tokens[0] if subverb != "restart": return _reject("unsupported_rollout_subverb") resource_type, resource_name, rest = _split_resource_ref(tokens[1:]) if resource_type not in _ROLLING_RESOURCES or not resource_name: return _reject("invalid_rollout_resource") if rest: return _reject("unsupported_rollout_flag") return ParsedKubectlAction( ok=True, reason="ok", kind=ActionKind.ROLLOUT, verb="rollout", subverb=subverb, resource_type=resource_type, resource_name=resource_name, namespace=namespace, flags=tuple(namespace_flags), ) def _parse_scale( tokens: list[str], namespace: str | None, namespace_flags: list[str], ) -> ParsedKubectlAction: resource_type, resource_name, rest = _split_resource_ref(tokens) if resource_type not in _SCALABLE_RESOURCES or not resource_name: return _reject("invalid_scale_resource") replicas: int | None = None remaining_flags: list[str] = [] i = 0 while i < len(rest): token = rest[i] if token == "--replicas": if i + 1 >= len(rest): return _reject("replicas_missing_value") replicas = _parse_positive_int(rest[i + 1]) remaining_flags.extend([token, rest[i + 1]]) i += 2 continue if token.startswith("--replicas="): replicas = _parse_positive_int(token.split("=", 1)[1]) remaining_flags.append(token) i += 1 continue return _reject("unsupported_scale_flag") if replicas is None: return _reject("replicas_required") if replicas < 1: return _reject("replicas_must_be_positive") return ParsedKubectlAction( ok=True, reason="ok", kind=ActionKind.SCALE, verb="scale", resource_type=resource_type, resource_name=resource_name, namespace=namespace, flags=tuple(namespace_flags + remaining_flags), ) def _parse_autoscale( tokens: list[str], namespace: str | None, namespace_flags: list[str], ) -> ParsedKubectlAction: resource_type, resource_name, rest = _split_resource_ref(tokens) if resource_type not in _SCALABLE_RESOURCES or not resource_name: return _reject("invalid_autoscale_resource") min_replicas: int | None = None max_replicas: int | None = None cpu_percent: int | None = None remaining_flags: list[str] = [] i = 0 while i < len(rest): token = rest[i] flag, raw_value, consumed = _consume_required_flag_value( rest, i, {"--min", "--max", "--cpu-percent"}, ) if not flag or raw_value is None: return _reject("unsupported_autoscale_flag") value = _parse_positive_int(raw_value) if value < 1: return _reject("autoscale_value_must_be_positive") if flag == "--min": min_replicas = value elif flag == "--max": max_replicas = value elif flag == "--cpu-percent": cpu_percent = value remaining_flags.extend(rest[i:i + consumed]) i += consumed if min_replicas is None or max_replicas is None: return _reject("autoscale_min_max_required") if max_replicas < min_replicas: return _reject("autoscale_max_below_min") if cpu_percent is not None and cpu_percent > 100: return _reject("autoscale_cpu_percent_out_of_range") return ParsedKubectlAction( ok=True, reason="ok", kind=ActionKind.AUTOSCALE, verb="autoscale", resource_type=resource_type, resource_name=resource_name, namespace=namespace, flags=tuple(namespace_flags + remaining_flags), ) def _parse_set( tokens: list[str], namespace: str | None, namespace_flags: list[str], ) -> ParsedKubectlAction: if not tokens or tokens[0] != "resources": return _reject("unsupported_set_subverb") resource_type, resource_name, rest = _split_resource_ref(tokens[1:]) if resource_type not in _SCALABLE_RESOURCES or not resource_name: return _reject("invalid_set_resources_target") saw_resource_flag = False remaining_flags: list[str] = [] i = 0 while i < len(rest): flag, raw_value, consumed = _consume_required_flag_value( rest, i, {"--limits", "--requests"}, ) if not flag or raw_value is None: return _reject("unsupported_set_resources_flag") if not _resource_quantity_assignments_safe(raw_value): return _reject("invalid_resource_quantity") saw_resource_flag = True remaining_flags.extend(rest[i:i + consumed]) i += consumed if not saw_resource_flag: return _reject("set_resources_requires_limits_or_requests") return ParsedKubectlAction( ok=True, reason="ok", kind=ActionKind.SET_RESOURCES, verb="set", subverb="resources", resource_type=resource_type, resource_name=resource_name, namespace=namespace, flags=tuple(namespace_flags + remaining_flags), ) def _parse_delete( tokens: list[str], namespace: str | None, namespace_flags: list[str], ) -> ParsedKubectlAction: resource_type, resource_name, rest = _split_resource_ref(tokens) if resource_type != "pod" or not resource_name: return _reject("delete_only_allows_single_pod") if rest: return _reject("unsupported_delete_flag") if resource_name in {"--all", "all"}: return _reject("delete_all_disallowed") return ParsedKubectlAction( ok=True, reason="ok", kind=ActionKind.DELETE_POD, verb="delete", resource_type=resource_type, resource_name=resource_name, namespace=namespace, flags=tuple(namespace_flags), ) def _parse_positive_int(value: str) -> int: if not value.isdigit(): return -1 return int(value) def _consume_required_flag_value( tokens: list[str], index: int, allowed_flags: set[str], ) -> tuple[str | None, str | None, int]: token = tokens[index] if "=" in token: flag, value = token.split("=", 1) if flag not in allowed_flags or not value: return None, None, 1 return flag, value, 1 if token not in allowed_flags or index + 1 >= len(tokens): return None, None, 1 value = tokens[index + 1] if not value or value.startswith("-"): return None, None, 1 return token, value, 2 def _resource_quantity_assignments_safe(value: str) -> bool: parts = value.split(",") if not parts: return False for part in parts: key, separator, quantity = part.partition("=") if separator != "=": return False if key not in {"cpu", "memory"}: return False if not quantity or not set(quantity) <= _SAFE_TOKEN_CHARS: return False if quantity in {"0", "0m", "0Mi", "0Gi"}: return False return True def _flags_allowed(tokens: list[str], allowed_flags: set[str]) -> bool: i = 0 while i < len(tokens): token = tokens[i] if not token.startswith("-"): return False flag = token.split("=", 1)[0] if flag not in allowed_flags: return False if "=" in token: i += 1 continue requires_value = flag in { "--output", "-o", "--since", "--since-time", "--tail", "--selector", "-l", "--container", "-c", } if requires_value: if i + 1 >= len(tokens) or tokens[i + 1].startswith("-"): return False i += 2 continue i += 1 return True