fix(flywheel): unblock action safety and Claude fallback
All checks were successful
CD Pipeline / build-and-deploy (push) Successful in 9m45s

This commit is contained in:
Your Name
2026-04-29 21:50:57 +08:00
parent 4c91d89dd2
commit dccdcdbaf5
12 changed files with 467 additions and 40 deletions

View File

@@ -0,0 +1,373 @@
"""
Structured safety parser for remediation actions.
SPF-2 replaces the old single-regex kubectl whitelist with a small token parser.
The parser intentionally supports only the kubectl forms AWOOOI can safely
auto-execute; anything outside that grammar falls back to human review.
"""
from __future__ import annotations
import shlex
from dataclasses import dataclass, field
from enum import StrEnum
KUBECTL_MAX_LEN = 500
_FORBIDDEN_RAW_CHARS = frozenset("\n\r\t\f\v;&|<>`$")
_SAFE_TOKEN_CHARS = frozenset(
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"
"0123456789"
"-_./:=,@"
)
_RESOURCE_ALIASES: dict[str, str] = {
"deploy": "deployment",
"deployment": "deployment",
"deployments": "deployment",
"ds": "daemonset",
"daemonset": "daemonset",
"daemonsets": "daemonset",
"pod": "pod",
"pods": "pod",
"po": "pod",
"service": "service",
"services": "service",
"svc": "service",
"statefulset": "statefulset",
"statefulsets": "statefulset",
"sts": "statefulset",
"configmap": "configmap",
"configmaps": "configmap",
"cm": "configmap",
"node": "node",
"nodes": "node",
}
_ROLLING_RESOURCES = frozenset({"deployment", "statefulset", "daemonset"})
_SCALABLE_RESOURCES = frozenset({"deployment", "statefulset"})
_READONLY_VERBS = frozenset({"get", "describe", "logs", "top", "version"})
_MUTATING_VERBS = frozenset({"rollout", "scale", "delete"})
class ActionKind(StrEnum):
"""High-level parsed action kind."""
READONLY = "readonly"
ROLLOUT = "rollout"
SCALE = "scale"
DELETE_POD = "delete_pod"
@dataclass(frozen=True)
class ParsedKubectlAction:
"""Parsed kubectl action with safety decision."""
ok: bool
reason: str
kind: ActionKind | None = None
verb: str | None = None
subverb: str | None = None
resource_type: str | None = None
resource_name: str | None = None
namespace: str | None = None
flags: tuple[str, ...] = field(default_factory=tuple)
def is_safe_kubectl_action(command: str) -> bool:
"""Return True when the command is in the allowed kubectl action grammar."""
return parse_kubectl_action(command).ok
def parse_kubectl_action(command: str) -> ParsedKubectlAction:
"""Parse and validate a kubectl command for auto-execute safety.
The grammar is intentionally narrow:
- readonly: get/describe/logs/top/version with bounded, known-safe flags
- rollout: rollout restart/undo on workload resources
- scale: scale deployment/statefulset to a positive replica count
- delete: delete one pod by name only
"""
command = (command or "").strip()
if not command:
return _reject("empty")
if len(command) > KUBECTL_MAX_LEN:
return _reject("too_long")
if any(ch in command for ch in _FORBIDDEN_RAW_CHARS):
return _reject("forbidden_shell_metachar")
if any(ord(ch) < 32 or ord(ch) > 126 for ch in command):
return _reject("non_ascii_or_control")
try:
tokens = shlex.split(command, posix=True)
except ValueError:
return _reject("invalid_shell_syntax")
if not tokens or tokens[0] != "kubectl":
return _reject("not_kubectl")
if any(not token or not set(token) <= _SAFE_TOKEN_CHARS for token in tokens):
return _reject("invalid_token_chars")
body = tokens[1:]
namespace, body, namespace_flags = _consume_namespace_flags(body)
if not body:
return _reject("missing_verb")
verb = body[0]
rest = body[1:]
if verb in _READONLY_VERBS:
return _parse_readonly(verb, rest, namespace, namespace_flags)
if verb == "rollout":
return _parse_rollout(rest, namespace, namespace_flags)
if verb == "scale":
return _parse_scale(rest, namespace, namespace_flags)
if verb == "delete":
return _parse_delete(rest, namespace, namespace_flags)
return _reject("unsupported_verb")
def _reject(reason: str) -> ParsedKubectlAction:
return ParsedKubectlAction(ok=False, reason=reason)
def _consume_namespace_flags(tokens: list[str]) -> tuple[str | None, list[str], list[str]]:
namespace: str | None = None
remaining: list[str] = []
namespace_flags: list[str] = []
i = 0
while i < len(tokens):
token = tokens[i]
if token in {"-n", "--namespace"}:
if i + 1 >= len(tokens):
remaining.append(token)
i += 1
continue
namespace = tokens[i + 1]
namespace_flags.extend([token, tokens[i + 1]])
i += 2
continue
if token.startswith("--namespace="):
namespace = token.split("=", 1)[1]
namespace_flags.append(token)
i += 1
continue
remaining.append(token)
i += 1
return namespace, remaining, namespace_flags
def _normalize_resource(value: str) -> str:
return _RESOURCE_ALIASES.get(value.lower(), value.lower())
def _split_resource_ref(tokens: list[str]) -> tuple[str | None, str | None, list[str]]:
if not tokens:
return None, None, []
first = tokens[0]
if "/" in first:
resource_type, resource_name = first.split("/", 1)
return _normalize_resource(resource_type), resource_name, tokens[1:]
resource_type = _normalize_resource(first)
if len(tokens) >= 2 and not tokens[1].startswith("-"):
return resource_type, tokens[1], tokens[2:]
return resource_type, None, tokens[1:]
def _parse_readonly(
verb: str,
tokens: list[str],
namespace: str | None,
namespace_flags: list[str],
) -> ParsedKubectlAction:
if verb == "version":
if tokens:
return _reject("version_disallows_args")
return ParsedKubectlAction(
ok=True,
reason="ok",
kind=ActionKind.READONLY,
verb=verb,
namespace=namespace,
flags=tuple(namespace_flags),
)
resource_type, resource_name, rest = _split_resource_ref(tokens)
if not resource_type:
return _reject("missing_readonly_resource")
allowed_flags = {
"--all-namespaces",
"--no-headers",
"--output",
"-o",
"--previous",
"--show-labels",
"--since",
"--since-time",
"--tail",
"--timestamps",
"--selector",
"-l",
"--container",
"-c",
"--watch",
"-w",
}
if not _flags_allowed(rest, allowed_flags):
return _reject("unsupported_readonly_flag")
return ParsedKubectlAction(
ok=True,
reason="ok",
kind=ActionKind.READONLY,
verb=verb,
resource_type=resource_type,
resource_name=resource_name,
namespace=namespace,
flags=tuple(namespace_flags + rest),
)
def _parse_rollout(
tokens: list[str],
namespace: str | None,
namespace_flags: list[str],
) -> ParsedKubectlAction:
if len(tokens) < 2:
return _reject("rollout_missing_args")
subverb = tokens[0]
if subverb not in {"restart", "undo"}:
return _reject("unsupported_rollout_subverb")
resource_type, resource_name, rest = _split_resource_ref(tokens[1:])
if resource_type not in _ROLLING_RESOURCES or not resource_name:
return _reject("invalid_rollout_resource")
if rest:
return _reject("unsupported_rollout_flag")
return ParsedKubectlAction(
ok=True,
reason="ok",
kind=ActionKind.ROLLOUT,
verb="rollout",
subverb=subverb,
resource_type=resource_type,
resource_name=resource_name,
namespace=namespace,
flags=tuple(namespace_flags),
)
def _parse_scale(
tokens: list[str],
namespace: str | None,
namespace_flags: list[str],
) -> ParsedKubectlAction:
resource_type, resource_name, rest = _split_resource_ref(tokens)
if resource_type not in _SCALABLE_RESOURCES or not resource_name:
return _reject("invalid_scale_resource")
replicas: int | None = None
remaining_flags: list[str] = []
i = 0
while i < len(rest):
token = rest[i]
if token == "--replicas":
if i + 1 >= len(rest):
return _reject("replicas_missing_value")
replicas = _parse_positive_int(rest[i + 1])
remaining_flags.extend([token, rest[i + 1]])
i += 2
continue
if token.startswith("--replicas="):
replicas = _parse_positive_int(token.split("=", 1)[1])
remaining_flags.append(token)
i += 1
continue
return _reject("unsupported_scale_flag")
if replicas is None:
return _reject("replicas_required")
if replicas < 1:
return _reject("replicas_must_be_positive")
return ParsedKubectlAction(
ok=True,
reason="ok",
kind=ActionKind.SCALE,
verb="scale",
resource_type=resource_type,
resource_name=resource_name,
namespace=namespace,
flags=tuple(namespace_flags + remaining_flags),
)
def _parse_delete(
tokens: list[str],
namespace: str | None,
namespace_flags: list[str],
) -> ParsedKubectlAction:
resource_type, resource_name, rest = _split_resource_ref(tokens)
if resource_type != "pod" or not resource_name:
return _reject("delete_only_allows_single_pod")
if rest:
return _reject("unsupported_delete_flag")
if resource_name in {"--all", "all"}:
return _reject("delete_all_disallowed")
return ParsedKubectlAction(
ok=True,
reason="ok",
kind=ActionKind.DELETE_POD,
verb="delete",
resource_type=resource_type,
resource_name=resource_name,
namespace=namespace,
flags=tuple(namespace_flags),
)
def _parse_positive_int(value: str) -> int:
if not value.isdigit():
return -1
return int(value)
def _flags_allowed(tokens: list[str], allowed_flags: set[str]) -> bool:
i = 0
while i < len(tokens):
token = tokens[i]
if not token.startswith("-"):
return False
flag = token.split("=", 1)[0]
if flag not in allowed_flags:
return False
if "=" in token:
i += 1
continue
requires_value = flag in {
"--output",
"-o",
"--since",
"--since-time",
"--tail",
"--selector",
"-l",
"--container",
"-c",
}
if requires_value:
if i + 1 >= len(tokens) or tokens[i + 1].startswith("-"):
return False
i += 2
continue
i += 1
return True