From 3bc4ec4c0c20888c646f253016493ca1b0da8c90 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 16 Apr 2025 13:26:14 +0200 Subject: [PATCH 1/6] Reduce size of scheduler_info --- distributed/client.py | 25 +++++++++++++++---------- distributed/scheduler.py | 7 +++++-- distributed/tests/test_client.py | 4 ++++ 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 1c1c0b46706..4334937ae61 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -1298,7 +1298,7 @@ def dashboard_link(self): try: return self.cluster.dashboard_link except AttributeError: - scheduler, info = self._get_scheduler_info() + scheduler, info = self._get_scheduler_info(n_workers=0) if scheduler is None: return None else: @@ -1312,7 +1312,7 @@ def dashboard_link(self): return format_dashboard_link(host, port) - def _get_scheduler_info(self): + def _get_scheduler_info(self, n_workers): from distributed.scheduler import Scheduler if ( @@ -1320,12 +1320,12 @@ def _get_scheduler_info(self): and hasattr(self.cluster, "scheduler") and isinstance(self.cluster.scheduler, Scheduler) ): - info = self.cluster.scheduler.identity() + info = self.cluster.scheduler.identity(n_workers=n_workers) scheduler = self.cluster.scheduler elif ( self._loop_runner.is_started() and self.scheduler and not self.asynchronous ): - info = sync(self.loop, self.scheduler.identity) + info = sync(self.loop, self.scheduler.identity, n_workers=n_workers) scheduler = self.scheduler else: info = self._scheduler_identity @@ -1368,7 +1368,7 @@ def _repr_html_(self): except PackageNotFoundError: JUPYTERLAB = False - scheduler, info = self._get_scheduler_info() + scheduler, info = self._get_scheduler_info(n_workers=5) return get_template("client.html.j2").render( id=self.id, @@ -1589,14 +1589,16 @@ async def _update_scheduler_info(self): if self.status not in ("running", "connecting") or self.scheduler is None: return try: - self._scheduler_identity = SchedulerInfo(await self.scheduler.identity()) + self._scheduler_identity = SchedulerInfo( + await self.scheduler.identity(n_workers=5) + ) except OSError: logger.debug("Not able to query scheduler for identity") async def _wait_for_workers( self, n_workers: int, timeout: float | None = None ) -> None: - info = await self.scheduler.identity() + info = await self.scheduler.identity(n_workers=-1) self._scheduler_identity = SchedulerInfo(info) if timeout: deadline = time() + parse_timedelta(timeout) @@ -1619,7 +1621,7 @@ def running_workers(info): % (running_workers(info), n_workers, timeout) ) await asyncio.sleep(0.1) - info = await self.scheduler.identity() + info = await self.scheduler.identity(n_workers=-1) self._scheduler_identity = SchedulerInfo(info) def wait_for_workers(self, n_workers: int, timeout: float | None = None) -> None: @@ -4407,11 +4409,14 @@ async def _profile( else: return state - def scheduler_info(self, **kwargs): + def scheduler_info(self, n_workers: int = 5, **kwargs: Any) -> SchedulerInfo: """Basic information about the workers in the cluster Parameters ---------- + n_workers: int + The number of workers for which to fetch information. To fetch all, + use -1. **kwargs : dict Optional keyword arguments for the remote function @@ -4429,7 +4434,7 @@ def scheduler_info(self, **kwargs): 'time-delay': 0.0061032772064208984}}} """ if not self.asynchronous: - self.sync(self._update_scheduler_info) + self.sync(self._update_scheduler_info, n_workers=n_workers) return self._scheduler_identity def dump_cluster_state( diff --git a/distributed/scheduler.py b/distributed/scheduler.py index a026e897ed6..d1ef6c8ce99 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4075,8 +4075,10 @@ def _repr_html_(self) -> str: tasks=self.tasks, ) - def identity(self) -> dict[str, Any]: + def identity(self, n_workers: int = -1) -> dict[str, Any]: """Basic information about ourselves and our cluster""" + if n_workers == -1: + n_workers = len(self.workers) d = { "type": type(self).__name__, "id": str(self.id), @@ -4084,7 +4086,8 @@ def identity(self) -> dict[str, Any]: "services": {key: v.port for (key, v) in self.services.items()}, "started": self.time_started, "workers": { - worker.address: worker.identity() for worker in self.workers.values() + worker.address: worker.identity() + for worker in itertools.islice(self.workers.values(), n_workers) }, } return d diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index ac781625917..6ae578db79d 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -3923,6 +3923,10 @@ def test_scheduler_info(c): assert isinstance(info, dict) assert len(info["workers"]) == 2 assert isinstance(info["started"], float) + info = c.scheduler_info(n_workers=1) + assert len(info["workers"]) == 1 + info = c.scheduler_info(n_workers=-1) + assert len(info["workers"]) == 2 def test_write_scheduler_file(c, loop): From 41dd231442c0bdcf5089a28c8803155f8de95210 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 16 Apr 2025 13:32:19 +0200 Subject: [PATCH 2/6] adjust template --- distributed/client.py | 4 ++-- distributed/scheduler.py | 1 + distributed/widgets/templates/scheduler_info.html.j2 | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 4334937ae61..1632a52b548 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -1585,12 +1585,12 @@ async def _ensure_connected(self, timeout=None): logger.debug("Started scheduling coroutines. Synchronized") - async def _update_scheduler_info(self): + async def _update_scheduler_info(self, n_workers=5): if self.status not in ("running", "connecting") or self.scheduler is None: return try: self._scheduler_identity = SchedulerInfo( - await self.scheduler.identity(n_workers=5) + await self.scheduler.identity(n_workers=n_workers) ) except OSError: logger.debug("Not able to query scheduler for identity") diff --git a/distributed/scheduler.py b/distributed/scheduler.py index d1ef6c8ce99..cc8915cffa9 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4085,6 +4085,7 @@ def identity(self, n_workers: int = -1) -> dict[str, Any]: "address": self.address, "services": {key: v.port for (key, v) in self.services.items()}, "started": self.time_started, + "n_workers": len(self.workers), "workers": { worker.address: worker.identity() for worker in itertools.islice(self.workers.values(), n_workers) diff --git a/distributed/widgets/templates/scheduler_info.html.j2 b/distributed/widgets/templates/scheduler_info.html.j2 index 47501d1a6fe..5aed19969a8 100644 --- a/distributed/widgets/templates/scheduler_info.html.j2 +++ b/distributed/widgets/templates/scheduler_info.html.j2 @@ -10,7 +10,7 @@ Comm: {{ address }} - Workers: {{ workers | length }} + Workers: {{ n_workers }} From f45e11915d141e5d4c11245a713c82d2117938c6 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 16 Apr 2025 13:43:21 +0200 Subject: [PATCH 3/6] fix totals --- distributed/scheduler.py | 2 ++ distributed/widgets/templates/scheduler_info.html.j2 | 6 +++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index cc8915cffa9..127c7d3298b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -4086,6 +4086,8 @@ def identity(self, n_workers: int = -1) -> dict[str, Any]: "services": {key: v.port for (key, v) in self.services.items()}, "started": self.time_started, "n_workers": len(self.workers), + "total_threads": self.total_nthreads, + "total_memory": sum(ws.memory_limit for ws in self.workers.values()), "workers": { worker.address: worker.identity() for worker in itertools.islice(self.workers.values(), n_workers) diff --git a/distributed/widgets/templates/scheduler_info.html.j2 b/distributed/widgets/templates/scheduler_info.html.j2 index 5aed19969a8..9f1c697825e 100644 --- a/distributed/widgets/templates/scheduler_info.html.j2 +++ b/distributed/widgets/templates/scheduler_info.html.j2 @@ -10,7 +10,7 @@ Comm: {{ address }} - Workers: {{ n_workers }} + Workers: {{ n_workers }} {% if n_workers > workers | length %} (shown below: {{ workers | length }}) {% endif %} @@ -18,7 +18,7 @@ Dashboard: {{ scheduler | format_dashboard_address }} - Total threads: {{ workers.values() | map(attribute='nthreads') | sum }} + Total threads: {{ total_threads }} @@ -26,7 +26,7 @@ Started: {{ started | datetime_from_timestamp | format_time_ago }} - Total memory: {{ workers.values() | map(attribute='memory_limit') | sum | format_bytes }} + Total memory: {{ total_memory | format_bytes }} From 123d7fd9f793c89d4cade3358b1ada5c2cf88d23 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 16 Apr 2025 13:45:37 +0200 Subject: [PATCH 4/6] introduce total_memory attr --- distributed/scheduler.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 127c7d3298b..4eaf245a1f0 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1778,6 +1778,7 @@ def __init__( self.task_groups = {} self.task_prefixes = {} self.task_metadata = {} + self.total_memory = 0 self.total_nthreads = 0 self.total_nthreads_history = [(time(), 0)] self.queued = queued @@ -4087,7 +4088,7 @@ def identity(self, n_workers: int = -1) -> dict[str, Any]: "started": self.time_started, "n_workers": len(self.workers), "total_threads": self.total_nthreads, - "total_memory": sum(ws.memory_limit for ws in self.workers.values()), + "total_memory": self.total_memory, "workers": { worker.address: worker.identity() for worker in itertools.islice(self.workers.values(), n_workers) @@ -4541,6 +4542,7 @@ async def add_worker( dh_addresses.add(address) dh["nthreads"] += nthreads + self.total_memory += ws.memory_limit self.total_nthreads += nthreads self.total_nthreads_history.append((time(), self.total_nthreads)) self.aliases[name] = address @@ -5452,6 +5454,7 @@ async def remove_worker( dh_addresses: set = dh["addresses"] dh_addresses.remove(address) dh["nthreads"] -= ws.nthreads + self.total_memory -= ws.memory_limit self.total_nthreads -= ws.nthreads self.total_nthreads_history.append((time(), self.total_nthreads)) if not dh_addresses: From 665431f99d0ae0536ee43e58069e8b4d6d9f006d Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 16 Apr 2025 13:48:36 +0200 Subject: [PATCH 5/6] add type annotation --- distributed/scheduler.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 4eaf245a1f0..27d23e3c69d 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -1654,6 +1654,8 @@ class SchedulerState: idle_task_count: set[WorkerState] #: Workers that are fully utilized. May include non-running workers. saturated: set[WorkerState] + #: Current total memory across all workers (sum over memory_limit) + total_memory: int #: Current number of threads across all workers total_nthreads: int #: History of number of threads From 41ce938f1526769b4e65922487b25f753410bc5e Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 16 Apr 2025 14:26:45 +0200 Subject: [PATCH 6/6] fix test_Scheduler__to_dict --- distributed/tests/test_scheduler.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 956c886f1f5..5cafb8c7dd8 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -4301,6 +4301,9 @@ async def test_Scheduler__to_dict(c, s, a): "extensions", "services", "started", + "n_workers", + "total_threads", + "total_memory", "workers", "status", "thread_id",