This repository was archived by the owner on Sep 18, 2025. It is now read-only.

Dask-cudf multi partition merge slows down with ucx #402

@VibhuJawa

A multi-partition Dask-cudf merge runs slower over UCX than over TCP.

Wall time on exp-01: 15.4 s on TCP vs. 37.8 s on UCX.

In the example below, the same merge is slower with UCX than with plain TCP.

Wall Times on exp-01

UCX times

CPU times: user 19.3 s, sys: 1.97 s, total: 21.2 s
Wall time: 38.4 s
2945293


CPU times: user 16.7 s, sys: 1.71 s, total: 18.4 s
Wall time: 37.8 s
2943379

TCP times

CPU times: user 10.8 s, sys: 815 ms, total: 11.6 s
Wall time: 15.7 s
2944022

CPU times: user 10.9 s, sys: 807 ms, total: 11.7 s
Wall time: 15.4 s
2943697 
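As a quick check on the figures above, the wall times can be averaged per protocol to get a slowdown factor. This is only illustrative arithmetic on the numbers reported in this issue; the variable names are made up for the example.

```python
from statistics import mean

# Wall times in seconds, copied from the %time output above (exp-01).
ucx_wall = [38.4, 37.8]
tcp_wall = [15.7, 15.4]

# Ratio of mean wall times: how many times slower the UCX runs were.
slowdown = mean(ucx_wall) / mean(tcp_wall)

print(f"UCX mean: {mean(ucx_wall):.2f} s, TCP mean: {mean(tcp_wall):.2f} s")
print(f"UCX / TCP wall-time ratio: {slowdown:.2f}x")
```

On these four runs the UCX merge takes roughly 2.4x to 2.5x as long as the TCP merge.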

Repro Code:

Helper function to create a distributed dask-cudf DataFrame:

import dask_cudf
import cudf
import os
import time
import dask.dataframe as dd
import dask.array as da

from dask_cuda import LocalCUDACluster
from dask.distributed import Client,wait
from dask.utils import parse_bytes

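def create_random_data(n_rows=1_000, n_parts=10, n_keys_index_1=100_000,
                       n_keys_index_2=100, n_keys_index_3=100, col_prefix='a'):
    """Return a persisted dask-cudf DataFrame with three float columns and three integer key columns."""
    # Rows per chunk. If n_rows is not a multiple of n_parts, Dask adds one
    # smaller chunk for the remainder.
    chunks = n_rows // n_parts

    df = dd.concat([
        # Float payload columns that are not used as merge keys.
        da.random.random(n_rows, chunks=chunks).to_dask_dataframe(columns=col_prefix + '_non_merge_1'),
        da.random.random(n_rows, chunks=chunks).to_dask_dataframe(columns=col_prefix + '_non_merge_2'),
        da.random.random(n_rows, chunks=chunks).to_dask_dataframe(columns=col_prefix + '_non_merge_3'),
        # Integer key columns, drawn uniformly from [0, n_keys_index_*).
        da.random.randint(0, n_keys_index_1, size=n_rows, chunks=chunks).to_dask_dataframe(columns=col_prefix + '_0'),
        da.random.randint(0, n_keys_index_2, size=n_rows, chunks=chunks).to_dask_dataframe(columns=col_prefix + '_1'),
        da.random.randint(0, n_keys_index_3, size=n_rows, chunks=chunks).to_dask_dataframe(columns=col_prefix + '_2'),
    ], axis=1).persist()

    # Move each pandas partition to the GPU as a cudf DataFrame.
    gdf = df.map_partitions(cudf.from_pandas)
    gdf = gdf.persist()
    # Block until all partitions are computed, so data creation does not overlap with the timed merge.
    _ = wait(gdf)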
    return gdf
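One detail of the helper worth spelling out: `chunks = n_rows // n_parts` rounds down, so when `n_rows` is not a multiple of `n_parts` Dask creates one additional small chunk for the leftover rows. The sketch below works this out in plain Python for the row and partition counts used in the merge step of this issue. `actual_partitions` is a hypothetical helper for illustration, and it assumes Dask's usual behavior of splitting a 1-D array into fixed-size blocks plus a remainder block.

```python
def actual_partitions(n_rows, n_parts):
    """Return (rows per chunk, number of chunks, rows in last chunk)."""
    chunks = n_rows // n_parts              # same expression as in create_random_data
    n_chunks = -(-n_rows // chunks)         # integer ceiling division
    last_chunk = n_rows - (n_chunks - 1) * chunks
    return chunks, n_chunks, last_chunk


# Sizes taken from the merge code in this issue.
for n_rows, n_parts in [(140_176_770, 245), (21_004_393, 171)]:
    chunks, n_chunks, last_chunk = actual_partitions(n_rows, n_parts)
    print(f"requested {n_parts} parts -> {n_chunks} chunks of {chunks} rows "
          f"(last chunk: {last_chunk} rows)")
```

Under that assumption the two frames end up with 246 and 172 partitions rather than 245 and 171, each with a very small final partition (20 and 121 rows).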

RMM Setup:

def setup_rmm_pool(client):
    # Run on every worker: switch cudf to an RMM pool allocator
    # with a 26 GB initial pool.
    client.run(
        cudf.set_allocator,
        pool=True,
        initial_pool_size=parse_bytes("26GB"),
        allocator="default"
    )
    return None

setup_rmm_pool(client)
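The snippets in this report use `client` without showing how the cluster was started. One plausible setup is sketched below. It is an assumption, not the reporter's actual configuration: `cluster_kwargs` and `start_client` are hypothetical helper names, and the UCX transport flags (`enable_tcp_over_ucx`, `enable_nvlink`) are guesses about what was enabled on exp-01.

```python
def cluster_kwargs(protocol):
    """Build LocalCUDACluster keyword arguments for the given protocol."""
    if protocol not in ("tcp", "ucx"):
        raise ValueError(f"unsupported protocol: {protocol!r}")
    kwargs = {"protocol": protocol}
    if protocol == "ucx":
        # Assumption: which UCX transports were enabled is not stated in the report.
        kwargs.update(enable_tcp_over_ucx=True, enable_nvlink=True)
    return kwargs


def start_client(protocol):
    """Start a local GPU cluster and return a connected Client."""
    # Imported lazily so the helper above can be inspected on a machine without GPUs.
    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    cluster = LocalCUDACluster(**cluster_kwargs(protocol))
    return Client(cluster)


# Usage on a GPU machine (not executed here):
# client = start_client("ucx")   # or start_client("tcp") for the baseline
```

Keeping the protocol as the only difference between the two runs makes the TCP and UCX timings directly comparable.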

Merge Code:

The slowdown happens in the merge step.

rows_1, parts_1 = 140_176_770, 245
rows_2, parts_2 = 21_004_393, 171

df_1 = create_random_data(n_rows=rows_1, n_parts=parts_1, col_prefix='a')
df_2 = create_random_data(n_rows=rows_2, n_parts=parts_2, col_prefix='b')

# Inner merge on the three integer key columns; the len() call below triggers the computation that is timed.
merged_df = df_1.merge(df_2, left_on=['a_0', 'a_1', 'a_2'], right_on=['b_0', 'b_1', 'b_2'])
%time len(merged_df)
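The four result lengths reported above differ slightly because the input data is random. A rough sanity check, assuming the key columns are independent and uniformly distributed (which is how `create_random_data` generates them with its default key ranges), is that every left/right row pair matches on all three keys with probability 1 / (100_000 * 100 * 100). The sketch below compares that expectation with the observed lengths; it is plain arithmetic and does not run the merge.

```python
rows_1, rows_2 = 140_176_770, 21_004_393

# Default key ranges in create_random_data: n_keys_index_1, _2, _3.
n_key_combos = 100_000 * 100 * 100

# Expected number of matching row pairs for an inner merge on all three keys.
expected_rows = rows_1 * rows_2 / n_key_combos

# Lengths printed by the UCX and TCP runs in this issue.
observed = [2945293, 2943379, 2944022, 2943697]

print(f"expected rows: {expected_rows:,.0f}")
for n in observed:
    rel = (n - expected_rows) / expected_rows
    print(f"observed {n:,}: {rel:+.3%} from expected")
```

All four observed lengths fall within about 0.1% of the expectation, which suggests both protocols produce results of the expected size and differ only in run time.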

Additional Context:

This has been discussed on our internal Slack channel; please see that thread for more context.
