Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benchmarks/cugraph/standalone/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
# Copyright (c) 2021-2025, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -113,7 +113,7 @@ def run(
# If the number of GPUs is None, this is an MNMG run.
# Extract the number of GPUs from the client.
if n_gpus is None:
n_gpus = len(setup_objs[0].scheduler_info()["workers"])
n_gpus = setup_objs[0].scheduler_info()["n_workers"]
log("done.")

try:
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/shared/build_cugraph_ucx/test_client_bandwidth.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
# Copyright (c) 2022-2025, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -46,7 +46,7 @@ def create_dataframe(client):
}
)
ddf = dask_cudf.from_cudf(
df, npartitions=len(client.scheduler_info()["workers"])
df, npartitions=client.scheduler_info()["n_workers"]
).persist()
client.rebalance(ddf)
del df
Expand Down
4 changes: 2 additions & 2 deletions mg_utils/wait_for_workers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
# Copyright (c) 2023-2025, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -58,7 +58,7 @@ def wait_for_workers(
sys.stdout.flush()
break
with Client(scheduler_file=scheduler_file_path) as client:
num_workers = len(client.scheduler_info()["workers"])
num_workers = client.scheduler_info()["n_workers"]
if num_workers < num_expected_workers:
print(
f"wait_for_workers.py expected {num_expected_workers} but got {num_workers}, waiting..."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,7 @@ def num_gpus(self):
the number of GPUs accessible through dask.
"""
return (
len(self.__dask_client.scheduler_info()["workers"])
if self.is_multi_gpu
else 1
self.__dask_client.scheduler_info()["n_workers"] if self.is_multi_gpu else 1
)

def uptime(self):
Expand Down
4 changes: 2 additions & 2 deletions python/cugraph/cugraph/dask/common/read_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2022, NVIDIA CORPORATION.
# Copyright (c) 2019-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,7 @@ def get_n_workers():
from dask.distributed import default_client

client = default_client()
return len(client.scheduler_info()["workers"])
return client.scheduler_info()["n_workers"]


def get_chunksize(input_path):
Expand Down
4 changes: 2 additions & 2 deletions python/cugraph/cugraph/dask/comms/comms.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2018-2024, NVIDIA CORPORATION.
# Copyright (c) 2018-2025, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -269,7 +269,7 @@ def rank_to_worker(client):
"""
Return a mapping of ranks to dask workers.
"""
workers = client.scheduler_info()["workers"].keys()
workers = client.scheduler_info(n_workers=-1)["workers"].keys()
worker_info = __instance.worker_info(workers)
rank_to_worker = {}
for w in worker_info:
Expand Down
4 changes: 2 additions & 2 deletions python/cugraph/cugraph/generators/rmat.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
# Copyright (c) 2021-2025, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -271,7 +271,7 @@ def _mg_rmat(
each subsequent worker will receive seed+<worker num> as the seed value.
"""
client = default_client()
worker_list = list(client.scheduler_info()["workers"].keys())
worker_list = list(client.scheduler_info(n_workers=-1)["workers"].keys())
num_workers = len(worker_list)
num_edges_list = _calc_num_edges_per_worker(num_workers, num_edges)
result = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def __from_edgelist(
ddf_columns = s_col + d_col
self.vertex_columns = ddf_columns.copy()
_client = default_client()
workers = _client.scheduler_info()["workers"]
workers = _client.scheduler_info(n_workers=-1)["workers"]
# Repartition to 2 partitions per GPU for memory-efficient processing
input_ddf = input_ddf.repartition(npartitions=len(workers) * 2)
# The dataframe will be symmetrized iff the graph is undirected
Expand Down Expand Up @@ -365,7 +365,7 @@ def __from_edgelist(
is_symmetric=not self.properties.directed,
)
ddf = ddf.repartition(npartitions=len(workers) * 2)
workers = _client.scheduler_info()["workers"].keys()
workers = _client.scheduler_info(n_workers=-1)["workers"].keys()
persisted_keys_d = persist_dask_df_equal_parts_per_worker(
ddf, _client, return_type="dict"
)
Expand Down Expand Up @@ -482,7 +482,7 @@ def view_edge_list(self):
# Drop parallel edges for non-MultiGraph
# FIXME: Drop multi-edges with the C API instead.
_client = default_client()
workers = _client.scheduler_info()["workers"]
workers = _client.scheduler_info(n_workers=-1)["workers"]
edgelist_df = _memory_efficient_drop_duplicates(
edgelist_df, [srcCol, dstCol], len(workers)
)
Expand Down
2 changes: 1 addition & 1 deletion python/cugraph/cugraph/structure/symmetrize.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def symmetrize_ddf(
"""
# FIXME: Uncomment the above (broken) example
_client = default_client()
workers = _client.scheduler_info()["workers"]
workers = _client.scheduler_info(n_workers=-1)["workers"]

if not isinstance(src_name, list):
src_name = [src_name]
Expand Down
4 changes: 2 additions & 2 deletions python/cugraph/cugraph/tests/generators/test_rmat_mg.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2024, NVIDIA CORPORATION.
# Copyright (c) 2021-2025, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -65,7 +65,7 @@ def setup_module():
global _visible_devices
if not _is_single_gpu:
(_client, _cluster) = start_dask_client()
_visible_devices = _client.scheduler_info()["workers"]
_visible_devices = _client.scheduler_info(n_workers=-1)["workers"]


def teardown_module():
Expand Down
10 changes: 5 additions & 5 deletions python/cugraph/cugraph/tests/internals/test_renumber_mg.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2024, NVIDIA CORPORATION.
# Copyright (c) 2019-2025, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -91,7 +91,7 @@ def test_mg_renumber(dataset, dask_client):
gdf["dst"] = destinations + translate

ddf = dask.dataframe.from_pandas(
gdf, npartitions=len(dask_client.scheduler_info()["workers"])
gdf, npartitions=dask_client.scheduler_info()["n_workers"]
)

# preserve_order is not supported for MG
Expand Down Expand Up @@ -140,7 +140,7 @@ def test_mg_renumber_add_internal_vertex_id(dataset, dask_client):
gdf["weight"] = gdf.index.astype(np.float64)

ddf = dask.dataframe.from_pandas(
gdf, npartitions=len(dask_client.scheduler_info()["workers"])
gdf, npartitions=dask_client.scheduler_info()["n_workers"]
)

ren2, num2 = NumberMap.renumber(ddf, ["src", "src_old"], ["dst", "dst_old"])
Expand Down Expand Up @@ -214,7 +214,7 @@ def test_mg_renumber_common_col_names(dataset, dask_client):
}
)
ddf = dask.dataframe.from_pandas(
gdf, npartitions=len(dask_client.scheduler_info()["workers"])
gdf, npartitions=dask_client.scheduler_info()["n_workers"]
)

renumbered_df, renumber_map = NumberMap.renumber(
Expand All @@ -238,7 +238,7 @@ def test_mg_renumber_common_col_names(dataset, dask_client):
)

ddf = dask.dataframe.from_pandas(
gdf, npartitions=len(dask_client.scheduler_info()["workers"])
gdf, npartitions=dask_client.scheduler_info()["n_workers"]
)

renumbered_df, renumber_map = NumberMap.renumber(ddf, "col_a", "col_b")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
# Copyright (c) 2022-2025, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -88,7 +88,7 @@ def test_mg_replicate_edgelist(

if distributed:
# Distribute the edges across all ranks
num_workers = len(dask_client.scheduler_info()["workers"])
num_workers = dask_client.scheduler_info()["n_workers"]
df = dask_cudf.from_cudf(df, npartitions=num_workers)
ddf = replicate_edgelist(
df[columns], weight=weight, edge_id=edge_id, edge_type=edge_type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def test_mg_uniform_neighbor_sample_simple(dask_client, input_combo):
# Drop parallel edges for non-MultiGraph
# FIXME: Drop multi-edges with the C API instead.
vertex_col_name = ["src", "dst"]
workers = dask_client.scheduler_info()["workers"]
workers = dask_client.scheduler_info(n_workers=-1)["workers"]
input_df = _memory_efficient_drop_duplicates(
input_df, vertex_col_name, len(workers)
)
Expand Down Expand Up @@ -333,7 +333,7 @@ def test_mg_uniform_neighbor_sample_ensure_no_duplicates(dask_client):
@pytest.mark.mg
@pytest.mark.parametrize("return_offsets", [True, False])
def test_uniform_neighbor_sample_edge_properties(dask_client, return_offsets):
n_workers = len(dask_client.scheduler_info()["workers"])
n_workers = dask_client.scheduler_info()["n_workers"]
if n_workers <= 1:
pytest.skip("Test only valid for MG environments")
edgelist_df = dask_cudf.from_cudf(
Expand Down Expand Up @@ -696,7 +696,7 @@ def test_uniform_neighbor_sample_without_dask_inputs(dask_client):
@pytest.mark.parametrize("input_df", [cudf.DataFrame, dask_cudf.DataFrame])
@pytest.mark.parametrize("max_batches", [2, 8, 16, 32])
def test_uniform_neighbor_sample_batched(dask_client, dataset, input_df, max_batches):
num_workers = len(dask_client.scheduler_info()["workers"])
num_workers = dask_client.scheduler_info()["n_workers"]

df = dataset.get_edgelist()
df["eid"] = cupy.arange(len(df), dtype=df["src"].dtype)
Expand Down Expand Up @@ -1039,7 +1039,7 @@ def test_uniform_neighbor_sample_offset_renumber(dask_client, hops):
)

# can't use compute() since empty batches still get a partition
n_workers = len(dask_client.scheduler_info()["workers"])
n_workers = dask_client.scheduler_info()["n_workers"]
for p in range(n_workers):
partition = offsets_renumbered.get_partition(p).compute()
if not pandas.isna(partition.batch_id.iloc[0]):
Expand Down Expand Up @@ -1105,7 +1105,7 @@ def test_uniform_neighbor_sample_csr_csc_global(dask_client, hops, seed):
)

# can't use compute() since empty batches still get a partition
n_workers = len(dask_client.scheduler_info()["workers"])
n_workers = dask_client.scheduler_info()["n_workers"]
for p in range(n_workers):
partition = offsets.get_partition(p).compute()
if not pandas.isna(partition.batch_id.iloc[0]):
Expand Down Expand Up @@ -1161,7 +1161,7 @@ def test_uniform_neighbor_sample_csr_csc_local(dask_client, hops, seed):
)

# can't use compute() since empty batches still get a partition
n_workers = len(dask_client.scheduler_info()["workers"])
n_workers = dask_client.scheduler_info()["n_workers"]
for p in range(n_workers):
partition = offsets.get_partition(p).compute()

Expand Down
2 changes: 1 addition & 1 deletion scripts/dask/wait_for_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def wait_for_workers(
sys.stdout.flush()
break
with Client(scheduler_file=scheduler_file_path) as client:
num_workers = len(client.scheduler_info()["workers"])
num_workers = client.scheduler_info()["n_workers"]
if num_workers < num_expected_workers:
print(
f"wait_for_workers.py expected {num_expected_workers} but got {num_workers}, waiting..."
Expand Down