Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
CHANGES
=======

1.4.0b5
-------

- Fix overflow in Python server write by sending ACK messages for each chunk if
more than one chunk in the message. Moreover, reduce the chunk size to 2 KB
from 4 KB and remove the imp module from default modules of the terminal as
it is removed from Python 3.12.

1.4.0b4
-------

Expand Down
2 changes: 1 addition & 1 deletion src/crl/interactivesessions/_version.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
__copyright__ = 'Copyright (C) 2019-2023, Nokia'

VERSION = '1.4.0b4'
VERSION = '1.4.0b5'
GITHASH = ''


Expand Down
2 changes: 1 addition & 1 deletion src/crl/interactivesessions/runnerterminal.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class RunnerTerminal(object):
args='{response_id}, timeout={timeout}')

# list of libraries to be imported on the remote end during setup phase
_IMPORTS = ['pickle', 'imp', 'base64', 'os']
_IMPORTS = ['pickle', 'base64', 'os']

# OPTIONS FOR CONFIGURING SUBCLASS BEHAVIOR
# path to retrieve the remote handler module from
Expand Down
81 changes: 68 additions & 13 deletions src/crl/interactivesessions/shells/remotemodules/chunkcomm.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
__copyright__ = 'Copyright (C) 2019, Nokia'

CHILD_MODULES = [commbase, tokenreader, compatibility]
CHUNKSIZE = 4096
CHUNKSIZE = 2000
MAX_BUFFER_SIZE = 100 * CHUNKSIZE
LOGGER = logging.getLogger(__name__)

Expand All @@ -27,8 +27,17 @@ class ChunkReaderError(Exception):
class ChunkIOBase(object):

_token = b'^)}>?gDYs[ULFqAkSf~|'
_chunk_id_width = 4
_chunk_id_tmpl = '{{chunk_id:0{chunk_id_width}}}'.format(
chunk_id_width=_chunk_id_width)
_len_width = len(str(CHUNKSIZE))
_chunk_len_tmpl = '{{chunk_len:0{len_width}}}'.format(len_width=_len_width)
_hdr_tmpl = _chunk_id_tmpl + _chunk_len_tmpl
_hdr_len = _chunk_id_width + _len_width


class NoAck(object):
pass


class ChunkWriterBase(ChunkIOBase, commbase.CommWriterBase):
Expand All @@ -45,27 +54,48 @@ def _flush(self):

def write(self, s):
"""Write string or bytes *s* with *_write*"""
for i in compatibility.RANGE(0, len(s), CHUNKSIZE):
for chunk_id, i in enumerate(compatibility.RANGE(0, len(s), CHUNKSIZE)):
chunk_id = chunk_id % 10000
chunk = s[i:i + CHUNKSIZE]
self._write_with_size_and_token(chunk)
self._write_with_size_and_token(chunk, chunk_id=chunk_id)
self._flush()
if chunk_id:
self._read_and_verify_ack(chunk_id)

def _write_with_size_and_token(self, chunk):
def _write_with_size_and_token(self, chunk, chunk_id):
io = BytesIO()
for s in self._bytes_for_chunk_with_token(chunk):
for s in self._bytes_for_chunk_with_token(chunk, chunk_id=chunk_id):
io.write(s)

self._write(io.getvalue())

def _bytes_for_chunk_with_token(self, chunk):
def _bytes_for_chunk_with_token(self, chunk, chunk_id):
yield self._token
hdr = self._hdr_tmpl.format(chunk_id=chunk_id, chunk_len=len(chunk))
yield compatibility.to_bytes(hdr)
yield self._token
for s in self._bytes_for_chunk(chunk):
yield s
yield self._token

def _bytes_for_chunk(self, chunk):
yield compatibility.to_bytes(self._chunk_len_tmpl.format(chunk_len=len(chunk)))
yield chunk
yield self._token

def _read_and_verify_ack(self, chunk_id):
if not chunk_id:
return
chunk_id_str = self._read_ack()
if not isinstance(chunk_id_str, NoAck):
try:
ack_chunk_id = int(chunk_id_str)
if chunk_id != ack_chunk_id:
raise ChunkReaderError(
f'Expected chunk_id {chunk_id}, got {ack_chunk_id}')
except ValueError:
raise ChunkReaderError(
f'Unable to desirialize chunk_id {chunk_id_str}'.format(
chunk_id_str=chunk_id_str))

def _read_ack(self): # pylint: disable=no-self-use
"""Override this if ACK for each chunk is required and return chunk_id string.
"""
return NoAck()


class ChunkReaderBase(ChunkIOBase, commbase.CommReaderBase):
Expand All @@ -91,13 +121,21 @@ def _read(self, n):
def read_until_size(self, n):
while self._sharedio.readable_size < n:
self._read_token()
chunk_size_str = self._read_until_size(self._len_width)
hdr = self._read_until_size(self._hdr_len)
self._verify_read_token()
chunk_size_str = hdr[self._chunk_id_width:]
chunk_size = int(chunk_size_str)
self._sharedio.write(self._read_until_size(chunk_size))
self._verify_read_token()
chunk_id = hdr[:self._chunk_id_width]
if chunk_id != b'0000':
self._write_ack(chunk_id)
return self._sharedio.read(n)

def _write_ack(self, chunk_id):
"""Override this if ACK for each chunk is required.
"""

def _read_until_size(self, n):
sio = SharedBytesIO()
while sio.readable_size < n:
Expand All @@ -120,6 +158,23 @@ def _verify_read_token(self):
raise ChunkReaderError('Buffer: {!r}'.format(self._sharedio.getvalue()))


class ChunkAckBase(ChunkReaderBase, ChunkWriterBase):
def __init__(self):
ChunkReaderBase.__init__(self)
self._chunk_ack_reading = False

def _read_ack(self):
try:
self._chunk_ack_reading = True
return self._read_until_size(self._chunk_id_width)
finally:
self._chunk_ack_reading = False

def _write_ack(self, chunk_id):
self._write(chunk_id)
self._flush()


class SharedBytesIO(object):
def __init__(self):
self._io = BytesIO()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
CHILD_MODULES = [chunkcomm, compatibility]


class ServerComm(chunkcomm.ChunkWriterBase, chunkcomm.ChunkReaderBase):
class ServerComm(chunkcomm.ChunkAckBase):

_sleep_in_broken_systems = 0.00005

def __init__(self, infd, outfile):
chunkcomm.ChunkReaderBase.__init__(self)
chunkcomm.ChunkAckBase.__init__(self)
self.infd = infd
self.outfile = outfile
self._msgcaches = None
Expand Down Expand Up @@ -50,7 +50,8 @@ def set_msgcaches(self, msgcaches):
def _read(self, n):
while True:
r, _, _ = select.select([self.infd], [], [], *self._msgcaches.timeout_args)
self._msgcaches.send_expired()
if not self._chunk_ack_reading:
self._msgcaches.send_expired()
if r:
try:
return self._read_sleep_if_needed(n)
Expand Down
10 changes: 3 additions & 7 deletions src/crl/interactivesessions/shells/terminalclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
from monotonic import monotonic
from crl.interactivesessions.shells.shell import TimeoutError
from .remotemodules.msgmanager import MsgManagerBase
from .remotemodules.chunkcomm import (
ChunkReaderBase,
ChunkWriterBase)
from .remotemodules.chunkcomm import ChunkAckBase
from .remotemodules.msgs import Ack


Expand Down Expand Up @@ -68,7 +66,6 @@ def send_and_receive(self, msg, timeout):
for received_msg in self._received_msgs_for_msg(msg):
if isinstance(received_msg, Ack):
return self._try_to_receive_until_reply(msg, timeout)

return received_msg

return self._final_try_to_receive_until_reply(msg, timeout)
Expand All @@ -78,7 +75,6 @@ def _received_msgs_for_msg(self, msg):
self.send(msg)
try:
yield self._receive_ack_or_reply(msg, t)

except (TerminalClientError, TimerTimeout) as e:
LOGGER.debug('No reply yet for message uid %s: %s', msg.uid, e)

Expand Down Expand Up @@ -156,9 +152,9 @@ def _client_exception_wrap(self):
raise


class TerminalComm(ChunkReaderBase, ChunkWriterBase):
class TerminalComm(ChunkAckBase):
def __init__(self, terminal):
ChunkReaderBase.__init__(self)
ChunkAckBase.__init__(self)
self._terminal = terminal
self._timeout = -1

Expand Down
8 changes: 4 additions & 4 deletions tests/shells/remotemodules/test_chunkcomm.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
__copyright__ = 'Copyright (C) 2019, Nokia'

EXPECTED_TOKEN_LEN = 20
EXPECTED_LEN_WIDTH = len(str(CHUNKSIZE))
EXPECTED_HDR_LEN = 8
LOGGER = logging.getLogger(__name__)


Expand Down Expand Up @@ -104,8 +104,8 @@ class DataRubbishWriter(MiddleWriterBase):

"""
def _get_rubbish_start(self, s):
data_len = len(s) - 3 * EXPECTED_TOKEN_LEN - EXPECTED_LEN_WIDTH
return 2 * EXPECTED_TOKEN_LEN + EXPECTED_LEN_WIDTH + int(
data_len = len(s) - 3 * EXPECTED_TOKEN_LEN - EXPECTED_HDR_LEN
return 2 * EXPECTED_TOKEN_LEN + EXPECTED_HDR_LEN + int(
data_len * self._clean_portion)


Expand All @@ -116,7 +116,7 @@ class LenRubbishWriter(MiddleWriterBase):
{token}{data_len_start}{rubbish}{data_len_end}{token}{data}{token}
"""
def _get_rubbish_start(self, s): # pylint: disable=unused-argument
return EXPECTED_TOKEN_LEN + int(EXPECTED_LEN_WIDTH * self._clean_portion)
return EXPECTED_TOKEN_LEN + int(EXPECTED_HDR_LEN * self._clean_portion)


class ExampleChunkReader(ChunkReaderBase):
Expand Down
8 changes: 5 additions & 3 deletions tests/shells/test_pythonshell.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,15 @@ def test_start_rawpythonshell(mockrawpythonshell):
mockrawpythonshell.start()

first_read = mock.call(timeout=30, suppress_timeout=False)
set_echo_off_read = mock.call(timeout=10, suppress_timeout=False)
set_echo_off_reads = [mock.call(timeout=10, suppress_timeout=False)
for _ in range(2)]

expected_reads = [first_read, set_echo_off_read]
expected_reads = [first_read] + set_echo_off_reads
print(mockrawpythonshell.mock_read_until_prompt.mock_calls)
assert mockrawpythonshell.mock_read_until_prompt.mock_calls == expected_reads

assert mockrawpythonshell.mock_terminal.sendline.mock_calls == [
mock.call(cmd) for cmd in mockrawpythonshell.setup_cmds]
mock.call(cmd) for cmd in mockrawpythonshell.setup_cmds + ["'ready'"]]


def test_start_rawpythonshell_raises(mockrawpythonshell):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_runnerterminal.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def test_initialize_import_libraries(runnerterminal,
cs = mock_session.get_session.return_value.current_shell.return_value

assert cs.exec_command.mock_calls[0] == mock.call(
'import pickle, imp, base64, os', timeout=-1)
'import pickle, base64, os', timeout=-1)


def test_close_session(runnerterminal,
Expand Down
Loading