diff --git a/benchmarks/cugraph/standalone/main.py b/benchmarks/cugraph/standalone/main.py index 07e8eefde1d..20f99b9cb96 100644 --- a/benchmarks/cugraph/standalone/main.py +++ b/benchmarks/cugraph/standalone/main.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# Copyright (c) 2021-2025, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -113,7 +113,7 @@ def run( # If the number of GPUs is None, This is a MNMG run # Extract the number of gpus from the client if n_gpus is None: - n_gpus = len(setup_objs[0].scheduler_info()["workers"]) + n_gpus = setup_objs[0].scheduler_info()["n_workers"] log("done.") try: diff --git a/benchmarks/shared/build_cugraph_ucx/test_client_bandwidth.py b/benchmarks/shared/build_cugraph_ucx/test_client_bandwidth.py index 3d52fb46421..3e530a80e39 100644 --- a/benchmarks/shared/build_cugraph_ucx/test_client_bandwidth.py +++ b/benchmarks/shared/build_cugraph_ucx/test_client_bandwidth.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2025, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -46,7 +46,7 @@ def create_dataframe(client): } ) ddf = dask_cudf.from_cudf( - df, npartitions=len(client.scheduler_info()["workers"]) + df, npartitions=client.scheduler_info()["n_workers"] ).persist() client.rebalance(ddf) del df diff --git a/mg_utils/wait_for_workers.py b/mg_utils/wait_for_workers.py index fa75c90d4ad..486aecafb5e 100644 --- a/mg_utils/wait_for_workers.py +++ b/mg_utils/wait_for_workers.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023-2024, NVIDIA CORPORATION. +# Copyright (c) 2023-2025, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -58,7 +58,7 @@ def wait_for_workers( sys.stdout.flush() break with Client(scheduler_file=scheduler_file_path) as client: - num_workers = len(client.scheduler_info()["workers"]) + num_workers = client.scheduler_info()["n_workers"] if num_workers < num_expected_workers: print( f"wait_for_workers.py expected {num_expected_workers} but got {num_workers}, waiting..." diff --git a/python/cugraph-service/server/cugraph_service_server/cugraph_handler.py b/python/cugraph-service/server/cugraph_service_server/cugraph_handler.py index aa978d6cbe2..58ecc9b3beb 100644 --- a/python/cugraph-service/server/cugraph_service_server/cugraph_handler.py +++ b/python/cugraph-service/server/cugraph_service_server/cugraph_handler.py @@ -192,9 +192,7 @@ def num_gpus(self): the number of GPUs accessible through dask. """ return ( - len(self.__dask_client.scheduler_info()["workers"]) - if self.is_multi_gpu - else 1 + self.__dask_client.scheduler_info()["n_workers"] if self.is_multi_gpu else 1 ) def uptime(self): diff --git a/python/cugraph/cugraph/dask/common/read_utils.py b/python/cugraph/cugraph/dask/common/read_utils.py index be1231d910e..c81b883b49a 100644 --- a/python/cugraph/cugraph/dask/common/read_utils.py +++ b/python/cugraph/cugraph/dask/common/read_utils.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2022, NVIDIA CORPORATION. +# Copyright (c) 2019-2025, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,7 +17,7 @@ def get_n_workers(): from dask.distributed import default_client client = default_client() - return len(client.scheduler_info()["workers"]) + return client.scheduler_info()["n_workers"] def get_chunksize(input_path): diff --git a/python/cugraph/cugraph/dask/comms/comms.py b/python/cugraph/cugraph/dask/comms/comms.py index 1e1c28fbbee..d013b67df28 100644 --- a/python/cugraph/cugraph/dask/comms/comms.py +++ b/python/cugraph/cugraph/dask/comms/comms.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018-2024, NVIDIA CORPORATION. +# Copyright (c) 2018-2025, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -269,7 +269,7 @@ def rank_to_worker(client): """ Return a mapping of ranks to dask workers. """ - workers = client.scheduler_info()["workers"].keys() + workers = client.scheduler_info(n_workers=-1)["workers"].keys() worker_info = __instance.worker_info(workers) rank_to_worker = {} for w in worker_info: diff --git a/python/cugraph/cugraph/generators/rmat.py b/python/cugraph/cugraph/generators/rmat.py index 7a37e9bdaf2..7ea3034a9aa 100644 --- a/python/cugraph/cugraph/generators/rmat.py +++ b/python/cugraph/cugraph/generators/rmat.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# Copyright (c) 2021-2025, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -271,7 +271,7 @@ def _mg_rmat( each subsequent worker will receive seed+ as the seed value. """ client = default_client() - worker_list = list(client.scheduler_info()["workers"].keys()) + worker_list = list(client.scheduler_info(n_workers=-1)["workers"].keys()) num_workers = len(worker_list) num_edges_list = _calc_num_edges_per_worker(num_workers, num_edges) result = [] diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py index 6d0bbba76c3..6beb7523756 100644 --- a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py +++ b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py @@ -234,7 +234,7 @@ def __from_edgelist( ddf_columns = s_col + d_col self.vertex_columns = ddf_columns.copy() _client = default_client() - workers = _client.scheduler_info()["workers"] + workers = _client.scheduler_info(n_workers=-1)["workers"] # Repartition to 2 partitions per GPU for memory efficient process input_ddf = input_ddf.repartition(npartitions=len(workers) * 2) # The dataframe will be symmetrized iff the graph is undirected @@ -365,7 +365,7 @@ def __from_edgelist( is_symmetric=not self.properties.directed, ) ddf = ddf.repartition(npartitions=len(workers) * 2) - workers = _client.scheduler_info()["workers"].keys() + workers = _client.scheduler_info(n_workers=-1)["workers"].keys() persisted_keys_d = persist_dask_df_equal_parts_per_worker( ddf, _client, return_type="dict" ) @@ -482,7 +482,7 @@ def view_edge_list(self): # Drop parallel edges for non MultiGraph # FIXME: Drop multi edges with the CAPI instead. _client = default_client() - workers = _client.scheduler_info()["workers"] + workers = _client.scheduler_info(n_workers=-1)["workers"] edgelist_df = _memory_efficient_drop_duplicates( edgelist_df, [srcCol, dstCol], len(workers) ) diff --git a/python/cugraph/cugraph/structure/symmetrize.py b/python/cugraph/cugraph/structure/symmetrize.py index e13ef6451fe..78458ecac61 100644 --- a/python/cugraph/cugraph/structure/symmetrize.py +++ b/python/cugraph/cugraph/structure/symmetrize.py @@ -148,7 +148,7 @@ def symmetrize_ddf( """ # FIXME: Uncomment out the above (broken) example _client = default_client() - workers = _client.scheduler_info()["workers"] + workers = _client.scheduler_info(n_workers=-1)["workers"] if not isinstance(src_name, list): src_name = [src_name] diff --git a/python/cugraph/cugraph/tests/generators/test_rmat_mg.py b/python/cugraph/cugraph/tests/generators/test_rmat_mg.py index 44a6b3a2fc1..5156da0f30c 100644 --- a/python/cugraph/cugraph/tests/generators/test_rmat_mg.py +++ b/python/cugraph/cugraph/tests/generators/test_rmat_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2024, NVIDIA CORPORATION. +# Copyright (c) 2021-2025, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -65,7 +65,7 @@ def setup_module(): global _visible_devices if not _is_single_gpu: (_client, _cluster) = start_dask_client() - _visible_devices = _client.scheduler_info()["workers"] + _visible_devices = _client.scheduler_info(n_workers=-1)["workers"] def teardown_module(): diff --git a/python/cugraph/cugraph/tests/internals/test_renumber_mg.py b/python/cugraph/cugraph/tests/internals/test_renumber_mg.py index 64917d0c747..9b1406cdaa4 100644 --- a/python/cugraph/cugraph/tests/internals/test_renumber_mg.py +++ b/python/cugraph/cugraph/tests/internals/test_renumber_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2024, NVIDIA CORPORATION. +# Copyright (c) 2019-2025, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -91,7 +91,7 @@ def test_mg_renumber(dataset, dask_client): gdf["dst"] = destinations + translate ddf = dask.dataframe.from_pandas( - gdf, npartitions=len(dask_client.scheduler_info()["workers"]) + gdf, npartitions=dask_client.scheduler_info()["n_workers"] ) # preserve_order is not supported for MG @@ -140,7 +140,7 @@ def test_mg_renumber_add_internal_vertex_id(dataset, dask_client): gdf["weight"] = gdf.index.astype(np.float64) ddf = dask.dataframe.from_pandas( - gdf, npartitions=len(dask_client.scheduler_info()["workers"]) + gdf, npartitions=dask_client.scheduler_info()["n_workers"] ) ren2, num2 = NumberMap.renumber(ddf, ["src", "src_old"], ["dst", "dst_old"]) @@ -214,7 +214,7 @@ def test_mg_renumber_common_col_names(dataset, dask_client): } ) ddf = dask.dataframe.from_pandas( - gdf, npartitions=len(dask_client.scheduler_info()["workers"]) + gdf, npartitions=dask_client.scheduler_info()["n_workers"] ) renumbered_df, renumber_map = NumberMap.renumber( @@ -238,7 +238,7 @@ def test_mg_renumber_common_col_names(dataset, dask_client): ) ddf = dask.dataframe.from_pandas( - gdf, npartitions=len(dask_client.scheduler_info()["workers"]) + gdf, npartitions=dask_client.scheduler_info()["n_workers"] ) renumbered_df, renumber_map = NumberMap.renumber(ddf, "col_a", "col_b") diff --git a/python/cugraph/cugraph/tests/internals/test_replicate_edgelist_mg.py b/python/cugraph/cugraph/tests/internals/test_replicate_edgelist_mg.py index 09936e954e8..2f4c63c4a73 100644 --- a/python/cugraph/cugraph/tests/internals/test_replicate_edgelist_mg.py +++ b/python/cugraph/cugraph/tests/internals/test_replicate_edgelist_mg.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. +# Copyright (c) 2022-2025, NVIDIA CORPORATION. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -88,7 +88,7 @@ def test_mg_replicate_edgelist( if distributed: # Distribute the edges across all ranks - num_workers = len(dask_client.scheduler_info()["workers"]) + num_workers = dask_client.scheduler_info()["n_workers"] df = dask_cudf.from_cudf(df, npartitions=num_workers) ddf = replicate_edgelist( df[columns], weight=weight, edge_id=edge_id, edge_type=edge_type diff --git a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py index 41483998fb5..8a66501d297 100644 --- a/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py +++ b/python/cugraph/cugraph/tests/sampling/test_uniform_neighbor_sample_mg.py @@ -127,7 +127,7 @@ def test_mg_uniform_neighbor_sample_simple(dask_client, input_combo): # Drop parallel edges for non MultiGraph # FIXME: Drop multi edges with the CAPI instead. vertex_col_name = ["src", "dst"] - workers = dask_client.scheduler_info()["workers"] + workers = dask_client.scheduler_info(n_workers=-1)["workers"] input_df = _memory_efficient_drop_duplicates( input_df, vertex_col_name, len(workers) ) @@ -333,7 +333,7 @@ def test_mg_uniform_neighbor_sample_ensure_no_duplicates(dask_client): @pytest.mark.mg @pytest.mark.parametrize("return_offsets", [True, False]) def test_uniform_neighbor_sample_edge_properties(dask_client, return_offsets): - n_workers = len(dask_client.scheduler_info()["workers"]) + n_workers = dask_client.scheduler_info()["n_workers"] if n_workers <= 1: pytest.skip("Test only valid for MG environments") edgelist_df = dask_cudf.from_cudf( @@ -696,7 +696,7 @@ def test_uniform_neighbor_sample_without_dask_inputs(dask_client): @pytest.mark.parametrize("input_df", [cudf.DataFrame, dask_cudf.DataFrame]) @pytest.mark.parametrize("max_batches", [2, 8, 16, 32]) def test_uniform_neighbor_sample_batched(dask_client, dataset, input_df, max_batches): - num_workers = len(dask_client.scheduler_info()["workers"]) + num_workers = dask_client.scheduler_info()["n_workers"] df = dataset.get_edgelist() df["eid"] = cupy.arange(len(df), dtype=df["src"].dtype) @@ -1039,7 +1039,7 @@ def test_uniform_neighbor_sample_offset_renumber(dask_client, hops): ) # can't use compute() since empty batches still get a partition - n_workers = len(dask_client.scheduler_info()["workers"]) + n_workers = dask_client.scheduler_info()["n_workers"] for p in range(n_workers): partition = offsets_renumbered.get_partition(p).compute() if not pandas.isna(partition.batch_id.iloc[0]): @@ -1105,7 +1105,7 @@ def test_uniform_neighbor_sample_csr_csc_global(dask_client, hops, seed): ) # can't use compute() since empty batches still get a partition - n_workers = len(dask_client.scheduler_info()["workers"]) + n_workers = dask_client.scheduler_info()["n_workers"] for p in range(n_workers): partition = offsets.get_partition(p).compute() if not pandas.isna(partition.batch_id.iloc[0]): @@ -1161,7 +1161,7 @@ def test_uniform_neighbor_sample_csr_csc_local(dask_client, hops, seed): ) # can't use compute() since empty batches still get a partition - n_workers = len(dask_client.scheduler_info()["workers"]) + n_workers = dask_client.scheduler_info()["n_workers"] for p in range(n_workers): partition = offsets.get_partition(p).compute() diff --git a/scripts/dask/wait_for_workers.py b/scripts/dask/wait_for_workers.py index 931e991c4cf..93c5cf5e448 100644 --- a/scripts/dask/wait_for_workers.py +++ b/scripts/dask/wait_for_workers.py @@ -57,7 +57,7 @@ def wait_for_workers( sys.stdout.flush() break with Client(scheduler_file=scheduler_file_path) as client: - num_workers = len(client.scheduler_info()["workers"]) + num_workers = client.scheduler_info()["n_workers"] if num_workers < num_expected_workers: print( f"wait_for_workers.py expected {num_expected_workers} but got {num_workers}, waiting..."