From cc658c8dd1710b99bb21f80bd08bc4cd53d8c72f Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 26 May 2026 16:22:48 +0100 Subject: [PATCH 1/4] uvloop tests --- distributed/cli/tests/test_dask_scheduler.py | 27 +++++++++++- distributed/cli/tests/test_dask_spec.py | 43 ++++++++++++++++++++ distributed/cli/tests/test_dask_worker.py | 26 ++++++++++++ distributed/tests/test_config.py | 27 +++++++++--- 4 files changed, 116 insertions(+), 7 deletions(-) diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index 3b8b4ee03cf..b6bc2990361 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import os import re import shutil @@ -672,7 +673,8 @@ def test_signal_handling(loop, sig): [ sys.executable, "-m", - "distributed.cli.dask_scheduler", + "dask", + "scheduler", f"--port={port}", "--dashboard-address=:0", ], @@ -690,3 +692,26 @@ def test_signal_handling(loop, sig): assert scheduler.returncode == 0 assert "scheduler closing" in logs assert "end scheduler" in logs + + +def test_uvloop(loop): + uvloop = pytest.importorskip("uvloop") + port = open_port() + + def check(): + return isinstance(asyncio.get_event_loop(), uvloop.Loop) + + with popen( + [ + sys.executable, + "-m", + "dask", + "scheduler", + "--no-dashboard", + "--host", + f"127.0.0.1:{port}", + ], + env={"DASK_DISTRIBUTED__ADMIN__EVENT_LOOP": "uvloop"}, + ): + with Client(f"127.0.0.1:{port}", loop=loop) as c: + assert c.run_on_scheduler(check) diff --git a/distributed/cli/tests/test_dask_spec.py b/distributed/cli/tests/test_dask_spec.py index 59bf8585311..3c961a47a76 100644 --- a/distributed/cli/tests/test_dask_spec.py +++ b/distributed/cli/tests/test_dask_spec.py @@ -172,3 +172,46 @@ async def test_signal_handling_scheduler(sig): assert "exception" not in logs finally: await scheduler.wait() + + +@gen_test() +async def test_uvloop(): + uvloop = pytest.importorskip("uvloop") + port = open_port() + env = {"DASK_DISTRIBUTED__ADMIN__EVENT_LOOP": "uvloop"} + + def check(): + return isinstance(asyncio.get_event_loop(), uvloop.Loop) + + with popen( + [ + sys.executable, + "-m", + "dask", + "spec", + "--spec", + json.dumps({"cls": "dask.distributed.Scheduler", "opts": {"port": port}}), + ], + env=env, + ): + with popen( + [ + sys.executable, + "-m", + "dask", + "spec", + f"tcp://localhost:{port}", + "--spec", + json.dumps( + { + "cls": "dask.distributed.Worker", + "opts": {"nanny": False, "nthreads": 1}, + } + ), + ], + env=env, + ): + async with Client(f"tcp://localhost:{port}", asynchronous=True) as client: + await client.wait_for_workers(1) + assert await client.run_on_scheduler(check) + assert all((await client.run(check)).values()) diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index bd12f8fc453..5449f173522 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -811,3 +811,29 @@ def test_error_during_startup(monkeypatch, nanny, loop): ], ) as worker: assert worker.wait(10) == 1 + + +@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) +@gen_cluster(client=True, nthreads=[]) +async def test_uvloop(c, s, nanny): + uvloop = pytest.importorskip("uvloop") + + def check(): + return isinstance(asyncio.get_event_loop(), uvloop.Loop) + + with popen( + [ + sys.executable, + "-m", + "dask", + "worker", + s.address, + nanny, + "--no-dashboard", + ], + env={"DASK_DISTRIBUTED__ADMIN__EVENT_LOOP": "uvloop"}, + ): + await c.wait_for_workers(1) + assert all((await c.run(check)).values()) + if nanny == "--nanny": + assert all((await c.run(check, nanny=True)).values()) diff --git a/distributed/tests/test_config.py b/distributed/tests/test_config.py index b7bf97ef92c..dea57ea3da3 100644 --- a/distributed/tests/test_config.py +++ b/distributed/tests/test_config.py @@ -347,15 +347,30 @@ def test_matches(c, s, root): test_matches(config, schema, "") -def test_uvloop_event_loop(): +uvloop_test_script = """ +import asyncio + +import uvloop + +import distributed + +async def test(): + return isinstance(asyncio.get_event_loop(), uvloop.Loop) + +if __name__ == "__main__": + with distributed.Client(processes=True, n_workers=1) as client: + assert client.sync(test) # client loop + assert client.run_on_scheduler(test) # also client loop loop + assert client.run(test) # nanny loop +""" + + +def test_uvloop(): """Check that configuring distributed to use uvloop actually sets the event loop policy""" pytest.importorskip("uvloop") - script = ( - "import distributed, asyncio, uvloop\n" - "assert isinstance(asyncio.get_event_loop_policy(), uvloop.EventLoopPolicy)" - ) + subprocess.check_call( - [sys.executable, "-c", script], + [sys.executable, "-c", uvloop_test_script], env={"DASK_DISTRIBUTED__ADMIN__EVENT_LOOP": "uvloop"}, ) From e8c5117d9b30a12b2fdf5d84c7096da81fe54bfe Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 26 May 2026 17:03:37 +0100 Subject: [PATCH 2/4] Add to legacy CI --- continuous_integration/environment-3.10.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/continuous_integration/environment-3.10.yaml b/continuous_integration/environment-3.10.yaml index 8a6302105ba..3a93acd86a1 100644 --- a/continuous_integration/environment-3.10.yaml +++ b/continuous_integration/environment-3.10.yaml @@ -50,6 +50,7 @@ dependencies: - toolz - torchvision # Only tested here - tornado + - uvloop # Only tested here - zict # overridden by git tip below - zstandard - pip: From 765b5e6dc40c9bfdee36078580b12aa3bd416e72 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 26 May 2026 17:08:48 +0100 Subject: [PATCH 3/4] nits --- distributed/cli/tests/test_dask_scheduler.py | 8 +++++--- distributed/tests/test_config.py | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index b6bc2990361..1affd8b4fd8 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -25,6 +25,7 @@ from distributed.utils_test import ( assert_can_connect_from_everywhere_4_6, assert_can_connect_locally_4, + gen_test, popen, ) @@ -694,7 +695,8 @@ def test_signal_handling(loop, sig): assert "end scheduler" in logs -def test_uvloop(loop): +@gen_test() +async def test_uvloop(): uvloop = pytest.importorskip("uvloop") port = open_port() @@ -713,5 +715,5 @@ def check(): ], env={"DASK_DISTRIBUTED__ADMIN__EVENT_LOOP": "uvloop"}, ): - with Client(f"127.0.0.1:{port}", loop=loop) as c: - assert c.run_on_scheduler(check) + async with Client(f"127.0.0.1:{port}", asynchronous=True) as c: + assert await c.run_on_scheduler(check) diff --git a/distributed/tests/test_config.py b/distributed/tests/test_config.py index dea57ea3da3..91737bb7759 100644 --- a/distributed/tests/test_config.py +++ b/distributed/tests/test_config.py @@ -360,7 +360,7 @@ async def test(): if __name__ == "__main__": with distributed.Client(processes=True, n_workers=1) as client: assert client.sync(test) # client loop - assert client.run_on_scheduler(test) # also client loop loop + assert client.run_on_scheduler(test) # also client loop assert client.run(test) # nanny loop """ From fb50daa4ed54305b9032920d7acbb712d1b65e32 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 26 May 2026 17:20:04 +0100 Subject: [PATCH 4/4] Revert "Add to legacy CI" This reverts commit e8c5117d9b30a12b2fdf5d84c7096da81fe54bfe. --- continuous_integration/environment-3.10.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/continuous_integration/environment-3.10.yaml b/continuous_integration/environment-3.10.yaml index 3a93acd86a1..8a6302105ba 100644 --- a/continuous_integration/environment-3.10.yaml +++ b/continuous_integration/environment-3.10.yaml @@ -50,7 +50,6 @@ dependencies: - toolz - torchvision # Only tested here - tornado - - uvloop # Only tested here - zict # overridden by git tip below - zstandard - pip: