Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import base64
import json
import logging
import re
import subprocess
import tempfile

Expand All @@ -11,6 +12,26 @@

logger = logging.getLogger(__name__)

# Read-only kubectl subcommands that do not require approval
SAFE_READONLY_COMMANDS = {
'get', 'describe', 'logs', 'top', 'api-resources', 'api-versions',
'explain', 'version', 'cluster-info', 'config', 'auth', 'diff'
}

# Mutating verbs that require approval
MUTATING_VERBS = {
'apply', 'create', 'delete', 'patch', 'replace', 'scale',
'annotate', 'label', 'taint', 'drain', 'cordon', 'uncordon', 'edit',
'set', 'autoscale', 'expose', 'run', 'attach', 'exec', 'port-forward',
'proxy', 'cp', 'wait'
}

# Safe pipe commands that don't require approval
SAFE_PIPE_COMMANDS = {'head', 'tail', 'grep', 'awk', 'sed', 'cut', 'sort', 'uniq', 'wc', 'less', 'more'}

# Dangerous shell operators that should be blocked
DANGEROUS_OPERATORS = [';', '&&', '||', '`', '$(', '>', '>>', '<', '&']


class KubectlApiProcessor(Processor):
client = None
Expand All @@ -32,10 +53,99 @@ def __init__(self, api_server=None, token=None, ssl_ca_cert=None, ssl_ca_cert_pa
fp.close()
self.__ca_cert = ca_filename

@staticmethod
def strip_kubectl_prefix(command):
"""
Strip only the leading 'kubectl' token from a command, not all occurrences.
This preserves subcommands like 'kubectl api-resources' correctly.

Args:
command: Command string potentially starting with 'kubectl'

Returns:
Command string with leading 'kubectl' removed
"""
# Strip leading/trailing whitespace
command = command.strip()
# Remove only leading 'kubectl' followed by whitespace using regex
command = re.sub(r'^\s*kubectl\s+', '', command)
return command

@staticmethod
def check_dangerous_operators(command):
"""
Check if command contains dangerous shell operators.

Args:
command: Command string to check

Returns:
True if dangerous operators found, False otherwise
"""
for op in DANGEROUS_OPERATORS:
if op in command:
return True
return False

@staticmethod
def is_safe_pipe_command(pipe_cmd):
"""
Check if a piped command is in the safe list.

Args:
pipe_cmd: Piped command string (e.g., "head -n 5")

Returns:
True if the command is safe, False otherwise
"""
cmd_parts = pipe_cmd.strip().split()
if not cmd_parts:
return False
base_cmd = cmd_parts[0]
return base_cmd in SAFE_PIPE_COMMANDS

@staticmethod
def requires_approval(command):
"""
Determine if a kubectl command requires approval based on whether it's mutating.
Read-only commands do not require approval.

Args:
command: Kubectl command string (without 'kubectl' prefix)

Returns:
True if approval is required, False otherwise
"""
# Strip the command and split into parts
cmd_parts = command.strip().split()
if not cmd_parts:
return False

# Get the first verb/subcommand
verb = cmd_parts[0].lower()

# Check if it's a mutating verb
if verb in MUTATING_VERBS:
return True

# Special handling for 'rollout' - check the subcommand
if verb == 'rollout' and len(cmd_parts) > 1:
rollout_action = cmd_parts[1].lower()
# 'rollout restart' is mutating, but 'rollout status', 'rollout history' are read-only
if rollout_action in ['restart', 'undo', 'pause', 'resume']:
return True

# Special handling for 'annotate' and 'label' - check for --overwrite flag
if verb in ['annotate', 'label']:
if '--overwrite' in command:
return True

# All other commands are considered read-only
return False

def test_connection(self):
command = "kubectl version --output=json"
if 'kubectl' in command:
command = command.replace('kubectl', '')
command = self.strip_kubectl_prefix(command)
if self.native_connection_mode:
kubectl_command = ["kubectl"] + command.split()
elif self.__ca_cert:
Expand Down Expand Up @@ -70,38 +180,79 @@ def test_connection(self):
logger.error(f"Exception occurred while executing kubectl command with error: {e}")
raise e

def execute_command(self, command):
def execute_command(self, command, require_approval_check=True):
"""
Execute a kubectl command with optional pipe support.

Args:
command: The kubectl command to execute
require_approval_check: If True, check if command requires approval (default: True)

Returns:
Command output or error message

Raises:
ValueError: If command contains dangerous operators or requires approval
Exception: If command execution fails
"""
command = command.strip()
if 'kubectl' in command:
command = command.replace('kubectl', '')

# Check for dangerous shell operators (except pipe which we handle specially)
cmd_without_pipes = command.split('|')[0]
if self.check_dangerous_operators(cmd_without_pipes):
raise ValueError(f"Command contains dangerous shell operators and cannot be executed: {command}")

# Strip the kubectl prefix
command = self.strip_kubectl_prefix(command)

# Parse pipes
if '|' in command:
commands = [cmd.strip() for cmd in command.split('|')]
kubectl_cmd = commands[0]
pipe_cmds = commands[1:]

# Validate pipe commands are safe
for pipe_cmd in pipe_cmds:
if not self.is_safe_pipe_command(pipe_cmd):
raise ValueError(f"Unsafe pipe command detected: {pipe_cmd}. Only safe commands like head, tail, grep are allowed.")
else:
commands = [command]
kubectl_cmd = command
pipe_cmds = []

# Check if approval is required for the kubectl command
if require_approval_check and self.requires_approval(kubectl_cmd):
raise ValueError(f"Command requires approval: kubectl {kubectl_cmd}. This is a mutating operation.")

# Build the kubectl command
if self.native_connection_mode:
kubectl_command = ["kubectl"] + commands[0].split()
kubectl_command = ["kubectl"] + kubectl_cmd.split()
elif self.__ca_cert:
kubectl_command = [
"kubectl",
f"--server={self.__api_server}",
f"--token={self.__token}",
f"--certificate-authority={self.__ca_cert}"
] + commands[0].split()
] + kubectl_cmd.split()
else:
kubectl_command = [
"kubectl",
f"--server={self.__api_server}",
f"--token={self.__token}",
f"--insecure-skip-tls-verify=true"
] + commands[0].split()
] + kubectl_cmd.split()

try:
# Execute the kubectl command
process = subprocess.Popen(kubectl_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
stdout, stderr = process.communicate()
if len(commands) > 1:
for cmd in commands[1:]:

# Execute pipe commands if present
if pipe_cmds:
for cmd in pipe_cmds:
process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True, shell=True)
stdout, stderr = process.communicate(input=stdout)

if process.returncode == 0:
print("Command Output:", stdout)
return stdout
Expand Down
Loading