From f39b87880169938e396b90fdd91e6db138db6e0c Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 19 Mar 2020 23:50:24 -0700 Subject: [PATCH 1/6] Prefix `for`-loop variables with `each_*` Should make it easier to disambiguate things like `frame` and `frames` as they are now `each_frame` and `frames`. --- distributed/comm/tcp.py | 16 ++++++++-------- distributed/comm/ucx.py | 20 ++++++++++---------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index 7003053ce06..99098effa0a 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -191,17 +191,17 @@ async def read(self, deserializers=None): lengths = struct.unpack("Q" * n_frames, lengths) frames = [] - for length in lengths: - if length: + for each_length in lengths: + if each_length: if self._iostream_has_read_into: - frame = bytearray(length) - n = await stream.read_into(frame) - assert n == length, (n, length) + each_frame = bytearray(each_length) + n = await stream.read_into(each_frame) + assert n == each_length, (n, each_length) else: - frame = await stream.read_bytes(length) + each_frame = await stream.read_bytes(each_length) else: - frame = b"" - frames.append(frame) + each_frame = b"" + frames.append(each_frame) except StreamClosedError as e: self.stream = None if not shutting_down(): diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 7295b11bb48..7be38d014bb 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -160,9 +160,9 @@ async def write( np.array([nbytes(f) for f in frames], dtype=np.uint64) ) # Send frames - for frame in frames: - if nbytes(frame) > 0: - await self.ep.send(frame) + for each_frame in frames: + if nbytes(each_frame) > 0: + await self.ep.send(each_frame) return sum(map(nbytes, frames)) except (ucp.exceptions.UCXBaseException): self.abort() @@ -190,17 +190,17 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): else: # Recv frames frames = [] - for 
is_cuda, size in zip(is_cudas.tolist(), sizes.tolist()): - if size > 0: + for is_cuda, each_size in zip(is_cudas.tolist(), sizes.tolist()): + if each_size > 0: if is_cuda: - frame = cuda_array(size) + each_frame = cuda_array(each_size) else: - frame = np.empty(size, dtype=np.uint8) - await self.ep.recv(frame) - frames.append(frame) + each_frame = np.empty(each_size, dtype=np.uint8) + await self.ep.recv(each_frame) + frames.append(each_frame) else: if is_cuda: - frames.append(cuda_array(size)) + frames.append(cuda_array(each_size)) else: frames.append(b"") msg = await from_frames( From e667879f4401a06698e7e1a895386e1321c1a636 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 19 Mar 2020 23:50:26 -0700 Subject: [PATCH 2/6] Allocate frames the same way in 0-length case --- distributed/comm/tcp.py | 2 +- distributed/comm/ucx.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index 99098effa0a..053d012eb29 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -200,7 +200,7 @@ async def read(self, deserializers=None): else: each_frame = await stream.read_bytes(each_length) else: - each_frame = b"" + each_frame = bytearray(each_length) frames.append(each_frame) except StreamClosedError as e: self.stream = None diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index 7be38d014bb..98226ca41d9 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -202,7 +202,7 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): if is_cuda: frames.append(cuda_array(each_size)) else: - frames.append(b"") + each_frame = np.empty(each_size, dtype=np.uint8) msg = await from_frames( frames, deserialize=self.deserialize, deserializers=deserializers ) From cab49912833d1724b767af85d40f26fb09312d1f Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 19 Mar 2020 23:50:27 -0700 Subject: [PATCH 3/6] Always use `.read_into(...)` with TCP As of Tornado 5.0+, 
this feature is included in Tornado. The `.read_bytes(...)` code path is a holdover from before Tornado got this feature. Moreover, `.read_into(...)` is more performant than the `.read_bytes(...)` code path. Given this, drop the `.read_bytes(...)` case and always use `.read_into(...)`. --- distributed/comm/tcp.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index 053d012eb29..a18adac40e9 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -193,12 +193,9 @@ async def read(self, deserializers=None): frames = [] for each_length in lengths: if each_length: - if self._iostream_has_read_into: - each_frame = bytearray(each_length) - n = await stream.read_into(each_frame) - assert n == each_length, (n, each_length) - else: - each_frame = await stream.read_bytes(each_length) + each_frame = bytearray(each_length) + n = await stream.read_into(each_frame) + assert n == each_length, (n, each_length) else: each_frame = bytearray(each_length) frames.append(each_frame) From 11c4805d8091dc7e41cd76acdafcdc56c850b1e7 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 19 Mar 2020 23:50:28 -0700 Subject: [PATCH 4/6] Always allocate frames, receive non-trivial ones --- distributed/comm/tcp.py | 4 +--- distributed/comm/ucx.py | 15 +++++---------- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index a18adac40e9..6cea7a2d6a3 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -192,12 +192,10 @@ async def read(self, deserializers=None): frames = [] for each_length in lengths: + each_frame = bytearray(each_length) if each_length: - each_frame = bytearray(each_length) n = await stream.read_into(each_frame) assert n == each_length, (n, each_length) - else: - each_frame = bytearray(each_length) frames.append(each_frame) except StreamClosedError as e: self.stream = None diff --git a/distributed/comm/ucx.py
b/distributed/comm/ucx.py index 98226ca41d9..c45f01270d6 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -191,18 +191,13 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): # Recv frames frames = [] for is_cuda, each_size in zip(is_cudas.tolist(), sizes.tolist()): + if is_cuda: + each_frame = cuda_array(each_size) + else: + each_frame = np.empty(each_size, dtype=np.uint8) if each_size > 0: - if is_cuda: - each_frame = cuda_array(each_size) - else: - each_frame = np.empty(each_size, dtype=np.uint8) await self.ep.recv(each_frame) - frames.append(each_frame) - else: - if is_cuda: - frames.append(cuda_array(each_size)) - else: - each_frame = np.empty(each_size, dtype=np.uint8) + frames.append(each_frame) msg = await from_frames( frames, deserialize=self.deserialize, deserializers=deserializers ) From 4ba802fe5b605fd9c39662ed3978e2cfe2eb5d98 Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 19 Mar 2020 23:50:30 -0700 Subject: [PATCH 5/6] Allocate all frames to fill before receiving --- distributed/comm/tcp.py | 7 +++---- distributed/comm/ucx.py | 16 ++++++++-------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index 6cea7a2d6a3..9bec238f0ed 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -190,13 +190,12 @@ async def read(self, deserializers=None): lengths = await stream.read_bytes(8 * n_frames) lengths = struct.unpack("Q" * n_frames, lengths) - frames = [] - for each_length in lengths: - each_frame = bytearray(each_length) + frames = [bytearray(each_length) for each_length in lengths] + for each_frame in frames: + each_length = len(each_frame) if each_length: n = await stream.read_into(each_frame) assert n == each_length, (n, each_length) - frames.append(each_frame) except StreamClosedError as e: self.stream = None if not shutting_down(): diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index c45f01270d6..27bcd476be4 
100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -189,15 +189,15 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): raise CommClosedError("While reading, the connection was closed") else: # Recv frames - frames = [] - for is_cuda, each_size in zip(is_cudas.tolist(), sizes.tolist()): - if is_cuda: - each_frame = cuda_array(each_size) - else: - each_frame = np.empty(each_size, dtype=np.uint8) - if each_size > 0: + frames = [ + cuda_array(each_size) + if is_cuda + else np.empty(each_size, dtype=np.uint8) + for is_cuda, each_size in zip(is_cudas.tolist(), sizes.tolist()) + ] + for each_frame in frames: + if len(each_frame) > 0: await self.ep.recv(each_frame) - frames.append(each_frame) msg = await from_frames( frames, deserialize=self.deserialize, deserializers=deserializers ) From 7b156b3771501ce2627bef1c64966fe94b29479a Mon Sep 17 00:00:00 2001 From: John Kirkham Date: Thu, 19 Mar 2020 23:50:31 -0700 Subject: [PATCH 6/6] Select only the non-trivial frames to transmit --- distributed/comm/tcp.py | 8 ++++---- distributed/comm/ucx.py | 19 ++++++++++++------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index 9bec238f0ed..1c1a8fbb2ae 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -191,11 +191,11 @@ async def read(self, deserializers=None): lengths = struct.unpack("Q" * n_frames, lengths) frames = [bytearray(each_length) for each_length in lengths] - for each_frame in frames: + recv_frames = [each_frame for each_frame in frames if len(each_frame) > 0] + for each_frame in recv_frames: each_length = len(each_frame) - if each_length: - n = await stream.read_into(each_frame) - assert n == each_length, (n, each_length) + n = await stream.read_into(each_frame) + assert n == each_length, (n, each_length) except StreamClosedError as e: self.stream = None if not shutting_down(): diff --git a/distributed/comm/ucx.py b/distributed/comm/ucx.py index
27bcd476be4..01d8df47d59 100644 --- a/distributed/comm/ucx.py +++ b/distributed/comm/ucx.py @@ -147,6 +147,9 @@ async def write( frames = await to_frames( msg, serializers=serializers, on_error=on_error ) + send_frames = [ + each_frame for each_frame in frames if len(each_frame) > 0 + ] # Send meta data await self.ep.send(np.array([len(frames)], dtype=np.uint64)) @@ -159,11 +162,11 @@ async def write( await self.ep.send( np.array([nbytes(f) for f in frames], dtype=np.uint64) ) + # Send frames - for each_frame in frames: - if nbytes(each_frame) > 0: - await self.ep.send(each_frame) - return sum(map(nbytes, frames)) + for each_frame in send_frames: + await self.ep.send(each_frame) + return sum(map(nbytes, send_frames)) except (ucp.exceptions.UCXBaseException): self.abort() raise CommClosedError("While writing, the connection was closed") @@ -195,9 +198,11 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")): else np.empty(each_size, dtype=np.uint8) for is_cuda, each_size in zip(is_cudas.tolist(), sizes.tolist()) ] - for each_frame in frames: - if len(each_frame) > 0: - await self.ep.recv(each_frame) + recv_frames = [ + each_frame for each_frame in frames if len(each_frame) > 0 + ] + for each_frame in recv_frames: + await self.ep.recv(each_frame) msg = await from_frames( frames, deserialize=self.deserialize, deserializers=deserializers )