diff --git a/distributed/tests/test_utils_comm.py b/distributed/tests/test_utils_comm.py
index 36f5fca77e..4be0a9e2b3 100644
--- a/distributed/tests/test_utils_comm.py
+++ b/distributed/tests/test_utils_comm.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import asyncio
+import logging
 import random
 from unittest import mock
 
@@ -19,7 +20,14 @@
     subs_multiple,
     unpack_remotedata,
 )
-from distributed.utils_test import BarrierGetData, BrokenComm, gen_cluster, inc
+from distributed.utils_test import (
+    BarrierGetData,
+    BrokenComm,
+    captured_logger,
+    gen_cluster,
+    gen_test,
+    inc,
+)
 
 
 def test_pack_data():
@@ -229,6 +237,50 @@ async def f():
     assert sleep_calls == [0.0, 1.0, 3.0, 6.0, 6.0]
 
 
+@gen_test()
+@pytest.mark.parametrize("count", [1, 2])
+async def test_retry_truncates_large_coro_repr(count):
+    """Test that retry truncates excessively large string representations of coro.
+
+    With no ``operation`` given, ``retry`` derives the logged label from a
+    ``reprlib.Repr`` whose ``maxother`` is 200, so a 500-char repr is shortened.
+    """
+
+    class MyEx(Exception):
+        pass
+
+    class LargeReprCallable:
+        def __repr__(self):
+            return "x" * 500
+
+        async def __call__(self):
+            raise MyEx("fail")
+
+    async def f():
+        return await retry(
+            LargeReprCallable(),
+            retry_on_exceptions=(MyEx,),
+            count=count,
+            delay_min=0,
+            delay_max=0,
+            jitter_fraction=0,
+        )
+
+    with (
+        captured_logger("distributed.utils_comm", level=logging.INFO) as caplog,
+        pytest.raises(MyEx),
+    ):
+        await f()
+
+    msg = caplog.getvalue()
+    # reprlib uses "..." to indicate truncation
+    assert "xxxxx..." in msg
+    # Verify the full 500-char repr is NOT present
+    assert "x" * 500 not in msg
+    # Each logged attempt carries one ~200-char truncated repr plus a short suffix
+    assert 200 * count < len(msg) < 300 * count
+
+
 def test_unpack_remotedata():
     def assert_eq(keys1: set[TaskRef], keys2: set[TaskRef]) -> None:
         if len(keys1) != len(keys2):
diff --git a/distributed/utils_comm.py b/distributed/utils_comm.py
index 92679b13ad..a110c93649 100644
--- a/distributed/utils_comm.py
+++ b/distributed/utils_comm.py
@@ -3,6 +3,7 @@
 import asyncio
 import logging
 import random
+import reprlib
 from collections import defaultdict
 from collections.abc import Callable, Collection, Coroutine, Mapping
 from functools import partial
@@ -384,7 +385,10 @@ async def retry(
         try:
             return await coro()
         except retry_on_exceptions as ex:
-            operation = operation or str(coro)
+            if not operation:
+                r = reprlib.Repr()
+                r.maxother = 200
+                operation = r.repr(coro)
             logger.info(
                 f"Retrying {operation} after exception in attempt {i_try}/{count}: {ex}"
             )