Skip to content
13 changes: 6 additions & 7 deletions distributed/comm/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,12 @@ async def read(self, deserializers=None):
lengths = await stream.read_bytes(8 * n_frames)
lengths = struct.unpack("Q" * n_frames, lengths)

frames = []
for length in lengths:
frame = bytearray(length)
if length:
n = await stream.read_into(frame)
assert n == length, (n, length)
frames.append(frame)
frames = [bytearray(each_length) for each_length in lengths]
recv_frames = [each_frame for each_frame in frames if len(each_frame) > 0]
for each_frame in recv_frames:
each_length = len(each_frame)
n = await stream.read_into(each_frame)
assert n == each_length, (n, each_length)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this change has a semantic difference in that previously we used to include empty frames but now we don't. Is this correct?

Is there a specific reason for this change? Does it affect performance in some way, or is it strictly cosmetic? If it's strictly cosmetic, and if there is a change to semantics (however minor) I'm tempted to avoid the change just because I wouldn't be surprised if it has some unforeseen effect.

Copy link
Copy Markdown
Member Author

@jakirkham jakirkham Mar 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not true actually. We excluded them before as well. It's just now done outside of the for-loop. 🙂

The idea was to more-or-less pass control to Tornado or UCX-Py completely while receiving frames without all of the boilerplate in Dask between each receive. Tried asyncio.gather to push this even further, but Tornado doesn't seem to like that (seems to work with UCX-Py though 😉).

In any event I thought it would be a nice thing to do the same thing we are doing in UCX for TCP. At the end of the day, I don't have strong feelings about it (especially if there is push back). 🤷‍♂

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any other thoughts here, @mrocklin? 🙂

except StreamClosedError as e:
self.stream = None
if not shutting_down():
Expand Down