diff --git a/drdroid_debug_toolkit/core/integrations/source_api_processors/kubectl_api_processor.py b/drdroid_debug_toolkit/core/integrations/source_api_processors/kubectl_api_processor.py index 56abf2c..b7b309c 100644 --- a/drdroid_debug_toolkit/core/integrations/source_api_processors/kubectl_api_processor.py +++ b/drdroid_debug_toolkit/core/integrations/source_api_processors/kubectl_api_processor.py @@ -1,6 +1,7 @@ import base64 import json import logging +import re import subprocess import tempfile @@ -11,6 +12,26 @@ logger = logging.getLogger(__name__) +# Read-only kubectl subcommands that do not require approval +SAFE_READONLY_COMMANDS = { + 'get', 'describe', 'logs', 'top', 'api-resources', 'api-versions', + 'explain', 'version', 'cluster-info', 'config', 'auth', 'diff' +} + +# Mutating verbs that require approval +MUTATING_VERBS = { + 'apply', 'create', 'delete', 'patch', 'replace', 'scale', + 'annotate', 'label', 'taint', 'drain', 'cordon', 'uncordon', 'edit', + 'set', 'autoscale', 'expose', 'run', 'attach', 'exec', 'port-forward', + 'proxy', 'cp', 'wait' +} + +# Safe pipe commands that don't require approval +SAFE_PIPE_COMMANDS = {'head', 'tail', 'grep', 'awk', 'sed', 'cut', 'sort', 'uniq', 'wc', 'less', 'more'} + +# Dangerous shell operators that should be blocked +DANGEROUS_OPERATORS = [';', '&&', '||', '`', '$(', '>', '>>', '<', '&'] + class KubectlApiProcessor(Processor): client = None @@ -32,10 +53,99 @@ def __init__(self, api_server=None, token=None, ssl_ca_cert=None, ssl_ca_cert_pa fp.close() self.__ca_cert = ca_filename + @staticmethod + def strip_kubectl_prefix(command): + """ + Strip only the leading 'kubectl' token from a command, not all occurrences. + This preserves subcommands like 'kubectl api-resources' correctly. + + Args: + command: Command string potentially starting with 'kubectl' + + Returns: + Command string with leading 'kubectl' removed + """ + # Strip leading/trailing whitespace + command = command.strip() + # Remove only leading 'kubectl' followed by whitespace using regex + command = re.sub(r'^\s*kubectl\s+', '', command) + return command + + @staticmethod + def check_dangerous_operators(command): + """ + Check if command contains dangerous shell operators. + + Args: + command: Command string to check + + Returns: + True if dangerous operators found, False otherwise + """ + for op in DANGEROUS_OPERATORS: + if op in command: + return True + return False + + @staticmethod + def is_safe_pipe_command(pipe_cmd): + """ + Check if a piped command is in the safe list. + + Args: + pipe_cmd: Piped command string (e.g., "head -n 5") + + Returns: + True if the command is safe, False otherwise + """ + cmd_parts = pipe_cmd.strip().split() + if not cmd_parts: + return False + base_cmd = cmd_parts[0] + return base_cmd in SAFE_PIPE_COMMANDS + + @staticmethod + def requires_approval(command): + """ + Determine if a kubectl command requires approval based on whether it's mutating. + Read-only commands do not require approval. + + Args: + command: Kubectl command string (without 'kubectl' prefix) + + Returns: + True if approval is required, False otherwise + """ + # Strip the command and split into parts + cmd_parts = command.strip().split() + if not cmd_parts: + return False + + # Get the first verb/subcommand + verb = cmd_parts[0].lower() + + # Check if it's a mutating verb + if verb in MUTATING_VERBS: + return True + + # Special handling for 'rollout' - check the subcommand + if verb == 'rollout' and len(cmd_parts) > 1: + rollout_action = cmd_parts[1].lower() + # 'rollout restart' is mutating, but 'rollout status', 'rollout history' are read-only + if rollout_action in ['restart', 'undo', 'pause', 'resume']: + return True + + # Special handling for 'annotate' and 'label' - check for --overwrite flag + if verb in ['annotate', 'label']: + if '--overwrite' in command: + return True + + # All other commands are considered read-only + return False + def test_connection(self): command = "kubectl version --output=json" - if 'kubectl' in command: - command = command.replace('kubectl', '') + command = self.strip_kubectl_prefix(command) if self.native_connection_mode: kubectl_command = ["kubectl"] + command.split() elif self.__ca_cert: @@ -70,38 +180,79 @@ def test_connection(self): logger.error(f"Exception occurred while executing kubectl command with error: {e}") raise e - def execute_command(self, command): + def execute_command(self, command, require_approval_check=True): + """ + Execute a kubectl command with optional pipe support. + + Args: + command: The kubectl command to execute + require_approval_check: If True, check if command requires approval (default: True) + + Returns: + Command output or error message + + Raises: + ValueError: If command contains dangerous operators or requires approval + Exception: If command execution fails + """ command = command.strip() - if 'kubectl' in command: - command = command.replace('kubectl', '') + + # Check for dangerous shell operators (except pipe which we handle specially) + cmd_without_pipes = command.split('|')[0] + if self.check_dangerous_operators(cmd_without_pipes): + raise ValueError(f"Command contains dangerous shell operators and cannot be executed: {command}") + + # Strip the kubectl prefix + command = self.strip_kubectl_prefix(command) + + # Parse pipes if '|' in command: commands = [cmd.strip() for cmd in command.split('|')] + kubectl_cmd = commands[0] + pipe_cmds = commands[1:] + + # Validate pipe commands are safe + for pipe_cmd in pipe_cmds: + if not self.is_safe_pipe_command(pipe_cmd): + raise ValueError(f"Unsafe pipe command detected: {pipe_cmd}. Only safe commands like head, tail, grep are allowed.") else: - commands = [command] + kubectl_cmd = command + pipe_cmds = [] + + # Check if approval is required for the kubectl command + if require_approval_check and self.requires_approval(kubectl_cmd): + raise ValueError(f"Command requires approval: kubectl {kubectl_cmd}. This is a mutating operation.") + + # Build the kubectl command if self.native_connection_mode: - kubectl_command = ["kubectl"] + commands[0].split() + kubectl_command = ["kubectl"] + kubectl_cmd.split() elif self.__ca_cert: kubectl_command = [ "kubectl", f"--server={self.__api_server}", f"--token={self.__token}", f"--certificate-authority={self.__ca_cert}" - ] + commands[0].split() + ] + kubectl_cmd.split() else: kubectl_command = [ "kubectl", f"--server={self.__api_server}", f"--token={self.__token}", f"--insecure-skip-tls-verify=true" - ] + commands[0].split() + ] + kubectl_cmd.split() + try: + # Execute the kubectl command process = subprocess.Popen(kubectl_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) stdout, stderr = process.communicate() - if len(commands) > 1: - for cmd in commands[1:]: + + # Execute pipe commands if present + if pipe_cmds: + for cmd in pipe_cmds: process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, shell=True) stdout, stderr = process.communicate(input=stdout) + if process.returncode == 0: print("Command Output:", stdout) return stdout diff --git a/drdroid_debug_toolkit/core/integrations/source_api_processors/test_kubectl_api_processor.py b/drdroid_debug_toolkit/core/integrations/source_api_processors/test_kubectl_api_processor.py new file mode 100644 index 0000000..1559f60 --- /dev/null +++ b/drdroid_debug_toolkit/core/integrations/source_api_processors/test_kubectl_api_processor.py @@ -0,0 +1,246 @@ +import sys +import unittest +from unittest.mock import Mock, patch, MagicMock + +# Mock Django modules before importing +django_mock = MagicMock() +settings_mock = MagicMock() +settings_mock.NATIVE_KUBERNETES_API_MODE = True + +sys.modules['django'] = django_mock +sys.modules['django.conf'] = MagicMock() +sys.modules['django.conf'].settings = settings_mock + +# Mock core modules +processor_mock = MagicMock() +processor_mock.Processor = object # Use object as base class +sys.modules['core'] = MagicMock() +sys.modules['core.integrations'] = MagicMock() +sys.modules['core.integrations.processor'] = processor_mock +sys.modules['core.settings'] = MagicMock() +sys.modules['core.settings'].EXTERNAL_CALL_TIMEOUT = 30 + +from kubectl_api_processor import KubectlApiProcessor + + +class TestKubectlApiProcessor(unittest.TestCase): + """Unit tests for KubectlApiProcessor command parsing and approval logic.""" + + def setUp(self): + """Set up test fixtures.""" + # Create a processor instance with native mode + self.processor = KubectlApiProcessor() + + def test_strip_kubectl_prefix_basic(self): + """Test stripping leading kubectl from commands.""" + result = KubectlApiProcessor.strip_kubectl_prefix("kubectl get pods") + self.assertEqual(result, "get pods") + + def test_strip_kubectl_prefix_api_resources(self): + """Test that api-resources subcommand is preserved correctly.""" + result = KubectlApiProcessor.strip_kubectl_prefix("kubectl api-resources") + self.assertEqual(result, "api-resources") + + def test_strip_kubectl_prefix_no_kubectl(self): + """Test command without kubectl prefix.""" + result = KubectlApiProcessor.strip_kubectl_prefix("get pods") + self.assertEqual(result, "get pods") + + def test_strip_kubectl_prefix_with_whitespace(self): + """Test stripping kubectl with extra whitespace.""" + result = KubectlApiProcessor.strip_kubectl_prefix(" kubectl get pods ") + self.assertEqual(result, "get pods") + + def test_check_dangerous_operators_semicolon(self): + """Test detection of dangerous semicolon operator.""" + result = KubectlApiProcessor.check_dangerous_operators("get pods; rm -rf /") + self.assertTrue(result) + + def test_check_dangerous_operators_ampersand(self): + """Test detection of dangerous && operator.""" + result = KubectlApiProcessor.check_dangerous_operators("get pods && delete pod foo") + self.assertTrue(result) + + def test_check_dangerous_operators_redirect(self): + """Test detection of dangerous redirect operators.""" + self.assertTrue(KubectlApiProcessor.check_dangerous_operators("get pods > /tmp/pods.txt")) + self.assertTrue(KubectlApiProcessor.check_dangerous_operators("get pods >> /tmp/pods.txt")) + self.assertTrue(KubectlApiProcessor.check_dangerous_operators("cat < /tmp/input.txt")) + + def test_check_dangerous_operators_command_substitution(self): + """Test detection of command substitution.""" + self.assertTrue(KubectlApiProcessor.check_dangerous_operators("get pods $(whoami)")) + self.assertTrue(KubectlApiProcessor.check_dangerous_operators("get pods `whoami`")) + + def test_check_dangerous_operators_safe_command(self): + """Test that safe commands are not flagged.""" + result = KubectlApiProcessor.check_dangerous_operators("get pods -n kube-system") + self.assertFalse(result) + + def test_is_safe_pipe_command_head(self): + """Test that head is recognized as safe.""" + result = KubectlApiProcessor.is_safe_pipe_command("head -n 5") + self.assertTrue(result) + + def test_is_safe_pipe_command_grep(self): + """Test that grep is recognized as safe.""" + result = KubectlApiProcessor.is_safe_pipe_command("grep Running") + self.assertTrue(result) + + def test_is_safe_pipe_command_unsafe(self): + """Test that unsafe commands are rejected.""" + result = KubectlApiProcessor.is_safe_pipe_command("bash -c 'rm -rf /'") + self.assertFalse(result) + + def test_requires_approval_get_command(self): + """Test that get command does not require approval.""" + result = KubectlApiProcessor.requires_approval("get pods -A") + self.assertFalse(result) + + def test_requires_approval_describe_command(self): + """Test that describe command does not require approval.""" + result = KubectlApiProcessor.requires_approval("describe pod foo") + self.assertFalse(result) + + def test_requires_approval_api_resources(self): + """Test that api-resources does not require approval.""" + result = KubectlApiProcessor.requires_approval("api-resources") + self.assertFalse(result) + + def test_requires_approval_logs_command(self): + """Test that logs command does not require approval.""" + result = KubectlApiProcessor.requires_approval("logs my-pod -n default") + self.assertFalse(result) + + def test_requires_approval_apply_command(self): + """Test that apply command requires approval.""" + result = KubectlApiProcessor.requires_approval("apply -f deployment.yaml") + self.assertTrue(result) + + def test_requires_approval_delete_command(self): + """Test that delete command requires approval.""" + result = KubectlApiProcessor.requires_approval("delete pod my-pod") + self.assertTrue(result) + + def test_requires_approval_scale_command(self): + """Test that scale command requires approval.""" + result = KubectlApiProcessor.requires_approval("scale deployment my-dep --replicas=3") + self.assertTrue(result) + + def test_requires_approval_rollout_restart(self): + """Test that rollout restart requires approval.""" + result = KubectlApiProcessor.requires_approval("rollout restart deployment/my-dep") + self.assertTrue(result) + + def test_requires_approval_rollout_status(self): + """Test that rollout status does not require approval.""" + result = KubectlApiProcessor.requires_approval("rollout status deployment/my-dep") + self.assertFalse(result) + + def test_requires_approval_rollout_history(self): + """Test that rollout history does not require approval.""" + result = KubectlApiProcessor.requires_approval("rollout history deployment/my-dep") + self.assertFalse(result) + + def test_requires_approval_get_rollout(self): + """Test that get rollout does not require approval.""" + result = KubectlApiProcessor.requires_approval("get rollout foo -n bar -o yaml") + self.assertFalse(result) + + def test_requires_approval_get_rollouts_crd(self): + """Test that get rollouts.argoproj.io does not require approval.""" + result = KubectlApiProcessor.requires_approval("get rollouts.argoproj.io my-rollout -n default") + self.assertFalse(result) + + @patch('kubectl_api_processor.subprocess.Popen') + def test_execute_command_api_resources_success(self, mock_popen): + """Test that api-resources command executes without rewriting.""" + # Mock successful execution + mock_process = Mock() + mock_process.returncode = 0 + mock_process.communicate.return_value = ("NAME\npods", "") + mock_popen.return_value = mock_process + + result = self.processor.execute_command("kubectl api-resources", require_approval_check=False) + + # Verify the command was called correctly + mock_popen.assert_called_once() + call_args = mock_popen.call_args[0][0] + self.assertEqual(call_args[0], "kubectl") + self.assertEqual(call_args[1], "api-resources") + self.assertIn("NAME\npods", result) + + @patch('kubectl_api_processor.subprocess.Popen') + def test_execute_command_get_pods_with_pipe(self, mock_popen): + """Test that get pods with pipe to head is allowed without approval.""" + # Mock kubectl execution + mock_kubectl_process = Mock() + mock_kubectl_process.returncode = 0 + mock_kubectl_process.communicate.return_value = ("pod1\npod2\npod3\npod4\npod5\npod6", "") + + # Mock head execution + mock_head_process = Mock() + mock_head_process.returncode = 0 + mock_head_process.communicate.return_value = ("pod1\npod2\npod3\npod4\npod5", "") + + mock_popen.side_effect = [mock_kubectl_process, mock_head_process] + + result = self.processor.execute_command("kubectl get pods -A | head -n 5", require_approval_check=True) + + # Verify approval was not required and command executed + self.assertEqual(mock_popen.call_count, 2) + self.assertIn("pod1", result) + + def test_execute_command_apply_requires_approval(self): + """Test that apply command raises error requiring approval.""" + with self.assertRaises(ValueError) as context: + self.processor.execute_command("kubectl apply -f deployment.yaml", require_approval_check=True) + + self.assertIn("requires approval", str(context.exception)) + + def test_execute_command_get_rollout_no_approval(self): + """Test that get rollout with pipe does not require approval.""" + with patch('kubectl_api_processor.subprocess.Popen') as mock_popen: + # Mock kubectl execution + mock_kubectl_process = Mock() + mock_kubectl_process.returncode = 0 + mock_kubectl_process.communicate.return_value = ("rollout data here", "") + + # Mock head execution + mock_head_process = Mock() + mock_head_process.returncode = 0 + mock_head_process.communicate.return_value = ("rollout data here", "") + + mock_popen.side_effect = [mock_kubectl_process, mock_head_process] + + # This should not raise an error + result = self.processor.execute_command("kubectl get rollout foo -n bar -o yaml | head -n 40", + require_approval_check=True) + + # Verify command executed successfully + self.assertIn("rollout data", result) + + def test_execute_command_dangerous_operators_blocked(self): + """Test that commands with dangerous operators are blocked.""" + dangerous_commands = [ + "kubectl get pods; rm -rf /", + "kubectl get pods && kubectl delete pod foo", + "kubectl get pods > /tmp/out.txt", + "kubectl get pods $(whoami)", + ] + + for cmd in dangerous_commands: + with self.assertRaises(ValueError) as context: + self.processor.execute_command(cmd, require_approval_check=True) + self.assertIn("dangerous shell operators", str(context.exception)) + + def test_execute_command_unsafe_pipe_blocked(self): + """Test that unsafe pipe commands are blocked.""" + with self.assertRaises(ValueError) as context: + self.processor.execute_command("kubectl get pods | bash", require_approval_check=True) + + self.assertIn("Unsafe pipe command", str(context.exception)) + + +if __name__ == '__main__': + unittest.main()