From 945d7ebade9e545722dce25032b177bda3eb0656 Mon Sep 17 00:00:00 2001 From: peter941221 Date: Sun, 24 May 2026 23:02:56 +0800 Subject: [PATCH 1/5] Clarify behavior for unsatisfied resource restrictions Signed-off-by: peter941221 --- docs/source/resources.rst | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/docs/source/resources.rst b/docs/source/resources.rst index f551222a91..eff65de256 100644 --- a/docs/source/resources.rst +++ b/docs/source/resources.rst @@ -128,6 +128,30 @@ each task will run in a separate process: resources={'process': 1}) for arg in args] +What happens if no worker can satisfy a resource restriction? +------------------------------------------------------------- + +Dask clusters are dynamic: workers may join later, and those future workers may +provide resources that are not available right now. Because of that, submitting +a task with currently unsatisfied resource restrictions does not immediately +raise an exception. Instead, the task remains in the scheduler's +``no-worker`` state and waits until a suitable worker becomes available. + +For example, if every connected worker advertises at most ``{"GPU": 1}``, then +submitting a task with ``resources={"GPU": 2}`` will keep the task waiting +rather than fail immediately. + +If you expect the required resources to be unavailable permanently, consider one +of the following: + +- call :meth:`distributed.Future.result` with ``timeout=...`` so your client + code does not wait forever; +- inspect the task state in the dashboard or scheduler diagnostics to confirm it + is in ``no-worker``; +- validate your cluster's available resources before submitting tasks with + restrictive resource requirements. + + Resources are Abstract ---------------------- From 7af9ce55170adfcadb8efbd9865409316551ecd9 Mon Sep 17 00:00:00 2001 From: peter941221 Date: Mon, 25 May 2026 08:03:08 +0800 Subject: [PATCH 2/5] Log client addresses in scheduler lifecycle messages Signed-off-by: peter941221 --- distributed/scheduler.py | 31 ++++++++++++++++++++++++----- distributed/tests/test_scheduler.py | 11 ++++++++++ 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 445ffd96e8..ffd45c9649 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -223,14 +223,24 @@ class ClientState: #: Output of :func:`distributed.versions.get_versions` on the client versions: dict[str, Any] + #: Remote address of the client connection as seen by the scheduler. + address: str | None + __slots__ = tuple(__annotations__) - def __init__(self, client: str, *, versions: dict[str, Any] | None = None): + def __init__( + self, + client: str, + *, + versions: dict[str, Any] | None = None, + address: str | None = None, + ): self.client_key = client self._hash = hash(client) self.wants_what = set() self.last_seen = time() self.versions = versions or {} + self.address = address def __hash__(self) -> int: return self._hash @@ -5907,9 +5917,12 @@ async def add_client( """ assert client is not None comm.name = "Scheduler->Client" - logger.info("Receive client connection: %s", client) + client_address = comm.peer_address + logger.info("Receive client connection: %s at %s", client, client_address) self.log_event(["all", client], {"action": "add-client", "client": client}) - self.clients[client] = ClientState(client, versions=versions) + self.clients[client] = ClientState( + client, versions=versions, address=client_address + ) self._client_connections_added_total += 1 for plugin in list(self.plugins.values()): @@ -5944,15 +5957,23 @@ async def add_client( await self.client_comms[client].close() del self.client_comms[client] if self.status == Status.running: - logger.info("Close client connection: %s", client) + logger.info( + "Close client connection: %s at %s", + client, + client_address, + ) except TypeError: # comm becomes None during GC pass def remove_client(self, client: str, stimulus_id: str | None = None) -> None: """Remove client from network""" stimulus_id = stimulus_id or f"remove-client-{time()}" + client_address = self.clients.get(client).address if client in self.clients else None if self.status == Status.running: - logger.info("Remove client %s", client) + if client_address is not None: + logger.info("Remove client %s at %s", client, client_address) + else: + logger.info("Remove client %s", client) self.log_event(["all", client], {"action": "remove-client", "client": client}) try: cs: ClientState = self.clients[client] diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index ff4d4268d2..f5d93c022e 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -878,6 +878,17 @@ async def test_clear_events_client_removal(c, s, a, b): assert time() < start + 2 +@gen_cluster(client=True, nthreads=[]) +async def test_client_connection_logs_include_address(c, s): + with captured_logger("distributed.scheduler", level=logging.INFO) as caplog: + await c.close() + + logs = caplog.getvalue() + assert f"Receive client connection: {c.id} at " in logs + assert f"Remove client {c.id} at " in logs + assert f"Close client connection: {c.id} at " in logs + + @gen_cluster(client=True, nthreads=[]) async def test_add_worker(c, s): x = c.submit(inc, 1, key="x") From 728ceed447cb9f624c3bdf9418c399ca73380adc Mon Sep 17 00:00:00 2001 From: peter941221 Date: Mon, 25 May 2026 08:09:24 +0800 Subject: [PATCH 3/5] Handle missing client state in remove_client --- distributed/scheduler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index ffd45c9649..e79521bc35 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5968,7 +5968,8 @@ async def add_client( def remove_client(self, client: str, stimulus_id: str | None = None) -> None: """Remove client from network""" stimulus_id = stimulus_id or f"remove-client-{time()}" - client_address = self.clients.get(client).address if client in self.clients else None + client_state = self.clients.get(client) + client_address = client_state.address if client_state is not None else None if self.status == Status.running: if client_address is not None: logger.info("Remove client %s at %s", client, client_address) From d42d84443acdeecac19183eba66a980e597cba5f Mon Sep 17 00:00:00 2001 From: peter941221 Date: Mon, 25 May 2026 17:11:13 +0800 Subject: [PATCH 4/5] Capture client lifecycle logs in test --- distributed/tests/test_scheduler.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index f5d93c022e..ad1073be17 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -878,15 +878,16 @@ async def test_clear_events_client_removal(c, s, a, b): assert time() < start + 2 -@gen_cluster(client=True, nthreads=[]) -async def test_client_connection_logs_include_address(c, s): +@gen_cluster(nthreads=[]) +async def test_client_connection_logs_include_address(s): with captured_logger("distributed.scheduler", level=logging.INFO) as caplog: - await c.close() + async with Client(s.address, asynchronous=True) as c: + client_id = c.id logs = caplog.getvalue() - assert f"Receive client connection: {c.id} at " in logs - assert f"Remove client {c.id} at " in logs - assert f"Close client connection: {c.id} at " in logs + assert f"Receive client connection: {client_id} at " in logs + assert f"Remove client {client_id} at " in logs + assert f"Close client connection: {client_id} at " in logs @gen_cluster(client=True, nthreads=[]) From d31b9d7550c93d08bd6dc5c4bbecaeb6cb391303 Mon Sep 17 00:00:00 2001 From: peter941221 Date: Tue, 26 May 2026 18:46:46 +0800 Subject: [PATCH 5/5] Remove unrelated resource docs from PR 9270 --- docs/source/resources.rst | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/docs/source/resources.rst b/docs/source/resources.rst index eff65de256..f551222a91 100644 --- a/docs/source/resources.rst +++ b/docs/source/resources.rst @@ -128,30 +128,6 @@ each task will run in a separate process: resources={'process': 1}) for arg in args] -What happens if no worker can satisfy a resource restriction? -------------------------------------------------------------- - -Dask clusters are dynamic: workers may join later, and those future workers may -provide resources that are not available right now. Because of that, submitting -a task with currently unsatisfied resource restrictions does not immediately -raise an exception. Instead, the task remains in the scheduler's -``no-worker`` state and waits until a suitable worker becomes available. - -For example, if every connected worker advertises at most ``{"GPU": 1}``, then -submitting a task with ``resources={"GPU": 2}`` will keep the task waiting -rather than fail immediately. - -If you expect the required resources to be unavailable permanently, consider one -of the following: - -- call :meth:`distributed.Future.result` with ``timeout=...`` so your client - code does not wait forever; -- inspect the task state in the dashboard or scheduler diagnostics to confirm it - is in ``no-worker``; -- validate your cluster's available resources before submitting tasks with - restrictive resource requirements. - - Resources are Abstract ----------------------