From 14f4684d032dd8b82b946941918084751a315c87 Mon Sep 17 00:00:00 2001 From: Petri Huovinen Date: Wed, 23 Jul 2025 18:57:46 +0300 Subject: [PATCH] Fix overflow in Python server write Send an ACK message for each chunk after the first one when a message consists of more than one chunk. Moreover, reduce the chunk size from 4096 to 2000 bytes and remove the imp module from the default modules of the terminal, as it was removed in Python 3.12. --- CHANGES.rst | 8 ++ src/crl/interactivesessions/_version.py | 2 +- src/crl/interactivesessions/runnerterminal.py | 2 +- .../shells/remotemodules/chunkcomm.py | 81 ++++++++++++++++--- .../shells/remotemodules/servercomm.py | 7 +- .../shells/terminalclient.py | 10 +-- tests/shells/remotemodules/test_chunkcomm.py | 8 +- tests/shells/test_pythonshell.py | 8 +- tests/test_runnerterminal.py | 2 +- 9 files changed, 95 insertions(+), 33 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index b41735b..49c4014 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -3,6 +3,14 @@ CHANGES ======= +1.4.0b5 +------- + +- Fix overflow in Python server write by sending ACK messages for each chunk if + more than one chunk in the message. Moreover, reduce the chunk size to 2 KB + from 4 KB and remove the imp module from default modules of the terminal as + it is removed from Python 3.12. 
+ 1.4.0b4 ------- diff --git a/src/crl/interactivesessions/_version.py b/src/crl/interactivesessions/_version.py index d1a9819..3dda0ed 100644 --- a/src/crl/interactivesessions/_version.py +++ b/src/crl/interactivesessions/_version.py @@ -1,6 +1,6 @@ __copyright__ = 'Copyright (C) 2019-2023, Nokia' -VERSION = '1.4.0b4' +VERSION = '1.4.0b5' GITHASH = '' diff --git a/src/crl/interactivesessions/runnerterminal.py b/src/crl/interactivesessions/runnerterminal.py index 1de589b..874735b 100644 --- a/src/crl/interactivesessions/runnerterminal.py +++ b/src/crl/interactivesessions/runnerterminal.py @@ -132,7 +132,7 @@ class RunnerTerminal(object): args='{response_id}, timeout={timeout}') # list of libraries to be imported on the remote end during setup phase - _IMPORTS = ['pickle', 'imp', 'base64', 'os'] + _IMPORTS = ['pickle', 'base64', 'os'] # OPTIONS FOR CONFIGURING SUBCLASS BEHAVIOR # path to retrieve the remote handler module from diff --git a/src/crl/interactivesessions/shells/remotemodules/chunkcomm.py b/src/crl/interactivesessions/shells/remotemodules/chunkcomm.py index 3094c66..7c993bc 100644 --- a/src/crl/interactivesessions/shells/remotemodules/chunkcomm.py +++ b/src/crl/interactivesessions/shells/remotemodules/chunkcomm.py @@ -12,7 +12,7 @@ __copyright__ = 'Copyright (C) 2019, Nokia' CHILD_MODULES = [commbase, tokenreader, compatibility] -CHUNKSIZE = 4096 +CHUNKSIZE = 2000 MAX_BUFFER_SIZE = 100 * CHUNKSIZE LOGGER = logging.getLogger(__name__) @@ -27,8 +27,17 @@ class ChunkReaderError(Exception): class ChunkIOBase(object): _token = b'^)}>?gDYs[ULFqAkSf~|' + _chunk_id_width = 4 + _chunk_id_tmpl = '{{chunk_id:0{chunk_id_width}}}'.format( + chunk_id_width=_chunk_id_width) _len_width = len(str(CHUNKSIZE)) _chunk_len_tmpl = '{{chunk_len:0{len_width}}}'.format(len_width=_len_width) + _hdr_tmpl = _chunk_id_tmpl + _chunk_len_tmpl + _hdr_len = _chunk_id_width + _len_width + + +class NoAck(object): + pass class ChunkWriterBase(ChunkIOBase, commbase.CommWriterBase): @@ 
-45,27 +54,48 @@ def _flush(self): def write(self, s): """Write string or bytes *s* with *_write*""" - for i in compatibility.RANGE(0, len(s), CHUNKSIZE): + for chunk_id, i in enumerate(compatibility.RANGE(0, len(s), CHUNKSIZE)): + chunk_id = chunk_id % 10000 chunk = s[i:i + CHUNKSIZE] - self._write_with_size_and_token(chunk) + self._write_with_size_and_token(chunk, chunk_id=chunk_id) self._flush() + if chunk_id: + self._read_and_verify_ack(chunk_id) - def _write_with_size_and_token(self, chunk): + def _write_with_size_and_token(self, chunk, chunk_id): io = BytesIO() - for s in self._bytes_for_chunk_with_token(chunk): + for s in self._bytes_for_chunk_with_token(chunk, chunk_id=chunk_id): io.write(s) self._write(io.getvalue()) - def _bytes_for_chunk_with_token(self, chunk): + def _bytes_for_chunk_with_token(self, chunk, chunk_id): + yield self._token + hdr = self._hdr_tmpl.format(chunk_id=chunk_id, chunk_len=len(chunk)) + yield compatibility.to_bytes(hdr) yield self._token - for s in self._bytes_for_chunk(chunk): - yield s - yield self._token - - def _bytes_for_chunk(self, chunk): - yield compatibility.to_bytes(self._chunk_len_tmpl.format(chunk_len=len(chunk))) yield chunk + yield self._token + + def _read_and_verify_ack(self, chunk_id): + if not chunk_id: + return + chunk_id_str = self._read_ack() + if not isinstance(chunk_id_str, NoAck): + try: + ack_chunk_id = int(chunk_id_str) + if chunk_id != ack_chunk_id: + raise ChunkReaderError( + f'Expected chunk_id {chunk_id}, got {ack_chunk_id}') + except ValueError: + raise ChunkReaderError( + f'Unable to desirialize chunk_id {chunk_id_str}'.format( + chunk_id_str=chunk_id_str)) + + def _read_ack(self): # pylint: disable=no-self-use + """Override this if ACK for each chunk is required and return chunk_id string. 
+ """ + return NoAck() class ChunkReaderBase(ChunkIOBase, commbase.CommReaderBase): @@ -91,13 +121,21 @@ def _read(self, n): def read_until_size(self, n): while self._sharedio.readable_size < n: self._read_token() - chunk_size_str = self._read_until_size(self._len_width) + hdr = self._read_until_size(self._hdr_len) self._verify_read_token() + chunk_size_str = hdr[self._chunk_id_width:] chunk_size = int(chunk_size_str) self._sharedio.write(self._read_until_size(chunk_size)) self._verify_read_token() + chunk_id = hdr[:self._chunk_id_width] + if chunk_id != b'0000': + self._write_ack(chunk_id) return self._sharedio.read(n) + def _write_ack(self, chunk_id): + """Override this if ACK for each chunk is required. + """ + def _read_until_size(self, n): sio = SharedBytesIO() while sio.readable_size < n: @@ -120,6 +158,23 @@ def _verify_read_token(self): raise ChunkReaderError('Buffer: {!r}'.format(self._sharedio.getvalue())) +class ChunkAckBase(ChunkReaderBase, ChunkWriterBase): + def __init__(self): + ChunkReaderBase.__init__(self) + self._chunk_ack_reading = False + + def _read_ack(self): + try: + self._chunk_ack_reading = True + return self._read_until_size(self._chunk_id_width) + finally: + self._chunk_ack_reading = False + + def _write_ack(self, chunk_id): + self._write(chunk_id) + self._flush() + + class SharedBytesIO(object): def __init__(self): self._io = BytesIO() diff --git a/src/crl/interactivesessions/shells/remotemodules/servercomm.py b/src/crl/interactivesessions/shells/remotemodules/servercomm.py index 1e8287f..f162725 100644 --- a/src/crl/interactivesessions/shells/remotemodules/servercomm.py +++ b/src/crl/interactivesessions/shells/remotemodules/servercomm.py @@ -15,12 +15,12 @@ CHILD_MODULES = [chunkcomm, compatibility] -class ServerComm(chunkcomm.ChunkWriterBase, chunkcomm.ChunkReaderBase): +class ServerComm(chunkcomm.ChunkAckBase): _sleep_in_broken_systems = 0.00005 def __init__(self, infd, outfile): - chunkcomm.ChunkReaderBase.__init__(self) + 
chunkcomm.ChunkAckBase.__init__(self) self.infd = infd self.outfile = outfile self._msgcaches = None @@ -50,7 +50,8 @@ def set_msgcaches(self, msgcaches): def _read(self, n): while True: r, _, _ = select.select([self.infd], [], [], *self._msgcaches.timeout_args) - self._msgcaches.send_expired() + if not self._chunk_ack_reading: + self._msgcaches.send_expired() if r: try: return self._read_sleep_if_needed(n) diff --git a/src/crl/interactivesessions/shells/terminalclient.py b/src/crl/interactivesessions/shells/terminalclient.py index 08a68fa..4056945 100644 --- a/src/crl/interactivesessions/shells/terminalclient.py +++ b/src/crl/interactivesessions/shells/terminalclient.py @@ -5,9 +5,7 @@ from monotonic import monotonic from crl.interactivesessions.shells.shell import TimeoutError from .remotemodules.msgmanager import MsgManagerBase -from .remotemodules.chunkcomm import ( - ChunkReaderBase, - ChunkWriterBase) +from .remotemodules.chunkcomm import ChunkAckBase from .remotemodules.msgs import Ack @@ -68,7 +66,6 @@ def send_and_receive(self, msg, timeout): for received_msg in self._received_msgs_for_msg(msg): if isinstance(received_msg, Ack): return self._try_to_receive_until_reply(msg, timeout) - return received_msg return self._final_try_to_receive_until_reply(msg, timeout) @@ -78,7 +75,6 @@ def _received_msgs_for_msg(self, msg): self.send(msg) try: yield self._receive_ack_or_reply(msg, t) - except (TerminalClientError, TimerTimeout) as e: LOGGER.debug('No reply yet for message uid %s: %s', msg.uid, e) @@ -156,9 +152,9 @@ def _client_exception_wrap(self): raise -class TerminalComm(ChunkReaderBase, ChunkWriterBase): +class TerminalComm(ChunkAckBase): def __init__(self, terminal): - ChunkReaderBase.__init__(self) + ChunkAckBase.__init__(self) self._terminal = terminal self._timeout = -1 diff --git a/tests/shells/remotemodules/test_chunkcomm.py b/tests/shells/remotemodules/test_chunkcomm.py index 693e06a..9631fcd 100644 --- a/tests/shells/remotemodules/test_chunkcomm.py 
+++ b/tests/shells/remotemodules/test_chunkcomm.py @@ -18,7 +18,7 @@ __copyright__ = 'Copyright (C) 2019, Nokia' EXPECTED_TOKEN_LEN = 20 -EXPECTED_LEN_WIDTH = len(str(CHUNKSIZE)) +EXPECTED_HDR_LEN = 8 LOGGER = logging.getLogger(__name__) @@ -104,8 +104,8 @@ class DataRubbishWriter(MiddleWriterBase): """ def _get_rubbish_start(self, s): - data_len = len(s) - 3 * EXPECTED_TOKEN_LEN - EXPECTED_LEN_WIDTH - return 2 * EXPECTED_TOKEN_LEN + EXPECTED_LEN_WIDTH + int( + data_len = len(s) - 3 * EXPECTED_TOKEN_LEN - EXPECTED_HDR_LEN + return 2 * EXPECTED_TOKEN_LEN + EXPECTED_HDR_LEN + int( data_len * self._clean_portion) @@ -116,7 +116,7 @@ class LenRubbishWriter(MiddleWriterBase): {token}{data_len_start}{rubbish}{data_len_end}{token}{data}{token} """ def _get_rubbish_start(self, s): # pylint: disable=unused-argument - return EXPECTED_TOKEN_LEN + int(EXPECTED_LEN_WIDTH * self._clean_portion) + return EXPECTED_TOKEN_LEN + int(EXPECTED_HDR_LEN * self._clean_portion) class ExampleChunkReader(ChunkReaderBase): diff --git a/tests/shells/test_pythonshell.py b/tests/shells/test_pythonshell.py index c683291..fe2e312 100644 --- a/tests/shells/test_pythonshell.py +++ b/tests/shells/test_pythonshell.py @@ -128,13 +128,15 @@ def test_start_rawpythonshell(mockrawpythonshell): mockrawpythonshell.start() first_read = mock.call(timeout=30, suppress_timeout=False) - set_echo_off_read = mock.call(timeout=10, suppress_timeout=False) + set_echo_off_reads = [mock.call(timeout=10, suppress_timeout=False) + for _ in range(2)] - expected_reads = [first_read, set_echo_off_read] + expected_reads = [first_read] + set_echo_off_reads + print(mockrawpythonshell.mock_read_until_prompt.mock_calls) assert mockrawpythonshell.mock_read_until_prompt.mock_calls == expected_reads assert mockrawpythonshell.mock_terminal.sendline.mock_calls == [ - mock.call(cmd) for cmd in mockrawpythonshell.setup_cmds] + mock.call(cmd) for cmd in mockrawpythonshell.setup_cmds + ["'ready'"]] def 
test_start_rawpythonshell_raises(mockrawpythonshell): diff --git a/tests/test_runnerterminal.py b/tests/test_runnerterminal.py index c01bff7..a873b69 100644 --- a/tests/test_runnerterminal.py +++ b/tests/test_runnerterminal.py @@ -132,7 +132,7 @@ def test_initialize_import_libraries(runnerterminal, cs = mock_session.get_session.return_value.current_shell.return_value assert cs.exec_command.mock_calls[0] == mock.call( - 'import pickle, imp, base64, os', timeout=-1) + 'import pickle, base64, os', timeout=-1) def test_close_session(runnerterminal,