From 17c92ec8d55daffbb4eed4142833c6f347fd278a Mon Sep 17 00:00:00 2001 From: Ernest Provo Date: Sun, 22 Feb 2026 10:41:08 -0500 Subject: [PATCH 1/6] Truncate large coro repr in retry log output When retry() falls back to str(coro) for the log message, truncate the representation to 200 characters to prevent excessively large logs. This was observed in P2P shuffles where functools.partial repr includes serialized binary data. Closes #8529 --- distributed/tests/test_utils_comm.py | 44 ++++++++++++++++++++++++++++ distributed/utils_comm.py | 5 +++- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_utils_comm.py b/distributed/tests/test_utils_comm.py index 36f5fca77e8..1d02d4e2090 100644 --- a/distributed/tests/test_utils_comm.py +++ b/distributed/tests/test_utils_comm.py @@ -229,6 +229,50 @@ async def f(): assert sleep_calls == [0.0, 1.0, 3.0, 6.0, 6.0] +def test_retry_truncates_large_coro_repr(cleanup): + """Test that retry truncates excessively large string representations of coro.""" + + class MyEx(Exception): + pass + + class LargeReprCallable: + def __repr__(self): + return "x" * 500 + + async def __call__(self): + raise MyEx("fail") + + log_messages = [] + + async def f(): + return await retry( + LargeReprCallable(), + retry_on_exceptions=(MyEx,), + count=1, + delay_min=0, + delay_max=0, + jitter_fraction=0, + ) + + import logging + + handler = logging.Handler() + handler.emit = lambda record: log_messages.append(record.getMessage()) + + logger = logging.getLogger("distributed.utils_comm") + logger.addHandler(handler) + try: + with pytest.raises(MyEx): + asyncio_run(f(), loop_factory=get_loop_factory()) + finally: + logger.removeHandler(handler) + + assert len(log_messages) == 1 + # The 500-char repr should be truncated to 200 + "..." + assert len(log_messages[0]) < 500 + assert "..." in log_messages[0] + + def test_unpack_remotedata(): def assert_eq(keys1: set[TaskRef], keys2: set[TaskRef]) -> None: if len(keys1) != len(keys2): diff --git a/distributed/utils_comm.py b/distributed/utils_comm.py index 92679b13ada..b1b43208778 100644 --- a/distributed/utils_comm.py +++ b/distributed/utils_comm.py @@ -384,7 +384,10 @@ async def retry( try: return await coro() except retry_on_exceptions as ex: - operation = operation or str(coro) + if not operation: + operation = str(coro) + if len(operation) > 200: + operation = operation[:200] + "..." logger.info( f"Retrying {operation} after exception in attempt {i_try}/{count}: {ex}" ) From 0590d81e6bfdae6e2e4ebbd19d15565c6083d09c Mon Sep 17 00:00:00 2001 From: Ernest Provo Date: Fri, 6 Mar 2026 19:22:39 -0500 Subject: [PATCH 2/6] Use reprlib for truncating large coro repr in retry logs Replace manual str[:200] + "..." truncation with stdlib reprlib.Repr() per reviewer feedback. reprlib handles truncation consistently and is a well-known stdlib pattern for this use case. --- distributed/tests/test_utils_comm.py | 5 ++++- distributed/utils_comm.py | 7 ++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/distributed/tests/test_utils_comm.py b/distributed/tests/test_utils_comm.py index 1d02d4e2090..0cf4235c5fa 100644 --- a/distributed/tests/test_utils_comm.py +++ b/distributed/tests/test_utils_comm.py @@ -268,9 +268,12 @@ async def f(): logger.removeHandler(handler) assert len(log_messages) == 1 - # The 500-char repr should be truncated to 200 + "..." + # reprlib truncates the 500-char repr to maxother (200) chars assert len(log_messages[0]) < 500 + # reprlib uses "..." to indicate truncation assert "..." in log_messages[0] + # Verify the full 500-char repr is NOT present + assert "x" * 500 not in log_messages[0] def test_unpack_remotedata(): diff --git a/distributed/utils_comm.py b/distributed/utils_comm.py index b1b43208778..043ab04caf6 100644 --- a/distributed/utils_comm.py +++ b/distributed/utils_comm.py @@ -3,6 +3,7 @@ import asyncio import logging import random +import reprlib from collections import defaultdict from collections.abc import Callable, Collection, Coroutine, Mapping from functools import partial @@ -385,9 +386,9 @@ async def retry( return await coro() except retry_on_exceptions as ex: if not operation: - operation = str(coro) - if len(operation) > 200: - operation = operation[:200] + "..." + aRepr = reprlib.Repr() + aRepr.maxother = 200 + operation = aRepr.repr(coro) logger.info( f"Retrying {operation} after exception in attempt {i_try}/{count}: {ex}" ) From 7f0bdda98f5808cd9f594cbb6f657ee5bb866922 Mon Sep 17 00:00:00 2001 From: Ernest Provo Date: Thu, 19 Mar 2026 09:19:23 -0400 Subject: [PATCH 3/6] Address review feedback: one-liner reprlib, caplog, top-level imports - Condense reprlib setup to single line per suggestion - Replace manual logging handler with pytest caplog fixture - Move logging import to top of test file --- distributed/tests/test_utils_comm.py | 25 ++++++++----------------- distributed/utils_comm.py | 4 +--- 2 files changed, 9 insertions(+), 20 deletions(-) diff --git a/distributed/tests/test_utils_comm.py b/distributed/tests/test_utils_comm.py index 0cf4235c5fa..bd8a3fd4f2d 100644 --- a/distributed/tests/test_utils_comm.py +++ b/distributed/tests/test_utils_comm.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import logging import random from unittest import mock @@ -229,7 +230,7 @@ async def f(): assert sleep_calls == [0.0, 1.0, 3.0, 6.0, 6.0] -def test_retry_truncates_large_coro_repr(cleanup): +def test_retry_truncates_large_coro_repr(cleanup, caplog): """Test that retry truncates excessively large string representations of coro.""" class MyEx(Exception): @@ -242,8 +243,6 @@ def __repr__(self): async def __call__(self): raise MyEx("fail") - log_messages = [] - async def f(): return await retry( LargeReprCallable(), @@ -254,26 +253,18 @@ async def f(): jitter_fraction=0, ) - import logging - - handler = logging.Handler() - handler.emit = lambda record: log_messages.append(record.getMessage()) - - logger = logging.getLogger("distributed.utils_comm") - logger.addHandler(handler) - try: + with caplog.at_level(logging.INFO, logger="distributed.utils_comm"): with pytest.raises(MyEx): asyncio_run(f(), loop_factory=get_loop_factory()) - finally: - logger.removeHandler(handler) - assert len(log_messages) == 1 + assert len(caplog.records) == 1 + msg = caplog.records[0].getMessage() # reprlib truncates the 500-char repr to maxother (200) chars - assert len(log_messages[0]) < 500 + assert len(msg) < 500 # reprlib uses "..." to indicate truncation - assert "..." in log_messages[0] + assert "..." in msg # Verify the full 500-char repr is NOT present - assert "x" * 500 not in log_messages[0] + assert "x" * 500 not in msg def test_unpack_remotedata(): diff --git a/distributed/utils_comm.py b/distributed/utils_comm.py index 043ab04caf6..10f812f7f0b 100644 --- a/distributed/utils_comm.py +++ b/distributed/utils_comm.py @@ -386,9 +386,7 @@ async def retry( return await coro() except retry_on_exceptions as ex: if not operation: - aRepr = reprlib.Repr() - aRepr.maxother = 200 - operation = aRepr.repr(coro) + operation = reprlib.Repr(maxother=200).repr(coro) logger.info( f"Retrying {operation} after exception in attempt {i_try}/{count}: {ex}" ) From 7aa9c062ea4ea48d4e1e58715314eb71b8331bdc Mon Sep 17 00:00:00 2001 From: Ernest Provo Date: Thu, 26 Mar 2026 17:20:36 -0400 Subject: [PATCH 4/6] Fix mypy error: Repr() doesn't accept maxother as kwarg --- distributed/utils_comm.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/distributed/utils_comm.py b/distributed/utils_comm.py index 10f812f7f0b..a110c936496 100644 --- a/distributed/utils_comm.py +++ b/distributed/utils_comm.py @@ -386,7 +386,9 @@ async def retry( return await coro() except retry_on_exceptions as ex: if not operation: - operation = reprlib.Repr(maxother=200).repr(coro) + r = reprlib.Repr() + r.maxother = 200 + operation = r.repr(coro) logger.info( f"Retrying {operation} after exception in attempt {i_try}/{count}: {ex}" ) From ac79e3e5a8f4f9b117a2b96dcc7b85c621282c1f Mon Sep 17 00:00:00 2001 From: Ernest Provo Date: Mon, 18 May 2026 21:49:59 -0400 Subject: [PATCH 5/6] Fix caplog capture by re-enabling propagation on parent logger distributed/config.py sets propagate=False on the top-level 'distributed' logger and attaches its own handler. Records from distributed.utils_comm propagate up to 'distributed' but stop there, so pytest's caplog (attached to root) never sees them. Use monkeypatch to flip propagate=True on the 'distributed' logger for the test duration. --- distributed/tests/test_utils_comm.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_utils_comm.py b/distributed/tests/test_utils_comm.py index bd8a3fd4f2d..ec7b0a7dcae 100644 --- a/distributed/tests/test_utils_comm.py +++ b/distributed/tests/test_utils_comm.py @@ -230,7 +230,7 @@ async def f(): assert sleep_calls == [0.0, 1.0, 3.0, 6.0, 6.0] -def test_retry_truncates_large_coro_repr(cleanup, caplog): +def test_retry_truncates_large_coro_repr(cleanup, caplog, monkeypatch): """Test that retry truncates excessively large string representations of coro.""" class MyEx(Exception): @@ -253,6 +253,11 @@ async def f(): jitter_fraction=0, ) + # distributed.config sets propagate=False on the top-level "distributed" + # logger, so caplog (attached to root) won't see records without re-enabling + # propagation on the parent for the duration of the test. + monkeypatch.setattr(logging.getLogger("distributed"), "propagate", True) + with caplog.at_level(logging.INFO, logger="distributed.utils_comm"): with pytest.raises(MyEx): asyncio_run(f(), loop_factory=get_loop_factory()) From 630991a7170316324099f856a1e70201df4ccea5 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 19 May 2026 10:51:52 +0100 Subject: [PATCH 6/6] Review test --- distributed/tests/test_utils_comm.py | 37 +++++++++++++++------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/distributed/tests/test_utils_comm.py b/distributed/tests/test_utils_comm.py index ec7b0a7dcae..4be0a9e2b3a 100644 --- a/distributed/tests/test_utils_comm.py +++ b/distributed/tests/test_utils_comm.py @@ -20,7 +20,14 @@ subs_multiple, unpack_remotedata, ) -from distributed.utils_test import BarrierGetData, BrokenComm, gen_cluster, inc +from distributed.utils_test import ( + BarrierGetData, + BrokenComm, + captured_logger, + gen_cluster, + gen_test, + inc, +) def test_pack_data(): @@ -230,7 +237,9 @@ async def f(): assert sleep_calls == [0.0, 1.0, 3.0, 6.0, 6.0] -def test_retry_truncates_large_coro_repr(cleanup, caplog, monkeypatch): +@gen_test() +@pytest.mark.parametrize("count", [1, 2]) +async def test_retry_truncates_large_coro_repr(count): """Test that retry truncates excessively large string representations of coro.""" class MyEx(Exception): @@ -247,29 +256,23 @@ async def f(): return await retry( LargeReprCallable(), retry_on_exceptions=(MyEx,), - count=1, + count=count, delay_min=0, delay_max=0, jitter_fraction=0, ) - # distributed.config sets propagate=False on the top-level "distributed" - # logger, so caplog (attached to root) won't see records without re-enabling - # propagation on the parent for the duration of the test. - monkeypatch.setattr(logging.getLogger("distributed"), "propagate", True) - - with caplog.at_level(logging.INFO, logger="distributed.utils_comm"): - with pytest.raises(MyEx): - asyncio_run(f(), loop_factory=get_loop_factory()) + with ( + captured_logger("distributed.utils_comm", level=logging.INFO) as caplog, + pytest.raises(MyEx), + ): + await f() - assert len(caplog.records) == 1 - msg = caplog.records[0].getMessage() - # reprlib truncates the 500-char repr to maxother (200) chars - assert len(msg) < 500 + msg = caplog.getvalue() # reprlib uses "..." to indicate truncation - assert "..." in msg + assert "xxxxx..." in msg # Verify the full 500-char repr is NOT present - assert "x" * 500 not in msg + assert 200 * count < len(msg) < 300 * count def test_unpack_remotedata():