From 8408faa8ccc7e39ea9db5b2372494f30cd2dc316 Mon Sep 17 00:00:00 2001 From: Benjamin Zaitlen Date: Wed, 13 May 2026 21:19:09 +0000 Subject: [PATCH] update bulk ray with additional options --- .../examples/ray/bulk_ray_shuffle.py | 305 ++++++++++++++++-- python/rapidsmpf/rapidsmpf/tests/test_ray.py | 43 ++- 2 files changed, 318 insertions(+), 30 deletions(-) diff --git a/python/rapidsmpf/rapidsmpf/examples/ray/bulk_ray_shuffle.py b/python/rapidsmpf/rapidsmpf/examples/ray/bulk_ray_shuffle.py index 7d86a3eaf..04966e80d 100644 --- a/python/rapidsmpf/rapidsmpf/examples/ray/bulk_ray_shuffle.py +++ b/python/rapidsmpf/rapidsmpf/examples/ray/bulk_ray_shuffle.py @@ -16,6 +16,7 @@ import pylibcudf as plc import rmm.mr +from rapidsmpf.config import Options from rapidsmpf.integrations.cudf.partition import ( partition_and_pack, unpack_and_concat, @@ -24,6 +25,7 @@ from rapidsmpf.integrations.ray import RapidsMPFActor, setup_ray_ucxx_cluster from rapidsmpf.memory.buffer import MemoryType from rapidsmpf.memory.buffer_resource import BufferResource, LimitAvailableMemory +from rapidsmpf.memory.pinned_memory_resource import PinnedMemoryResource from rapidsmpf.rmm_resource_adaptor import RmmResourceAdaptor from rapidsmpf.shuffler import Shuffler from rapidsmpf.statistics import Statistics @@ -55,6 +57,12 @@ class BulkRayShufflerActor(RapidsMPFActor): Size of the RMM memory pool in bytes. spill_device Device memory limit for spilling to host in bytes. + rmm_async + Whether to use RMM's cudaMallocAsync-backed memory resource. + pinned_memory + Whether to use pinned host memory for spilling when available. + pinned_initial_pool_size + Initial pinned host memory pool size in bytes. enable_statistics Whether to collect statistics. """ @@ -69,6 +77,9 @@ def __init__( rmm_pool_size: int = 1024 * 1024 * 1024, spill_device: int | None = None, *, + rmm_async: bool = False, + pinned_memory: bool = False, + pinned_initial_pool_size: int | None = None, enable_statistics: bool = False, ): self.batchsize = batchsize @@ -76,17 +87,23 @@ def __init__( self.output_path = output_path self.total_nparts = total_nparts self.rmm_pool_size = rmm_pool_size + self.rmm_async = rmm_async self.spill_device = spill_device + self.pinned_memory = pinned_memory + self.pinned_initial_pool_size = pinned_initial_pool_size # Initialize actor-local resources (statistics, memory resource) self.mr = RmmResourceAdaptor( - rmm.mr.PoolMemoryResource( - rmm.mr.CudaMemoryResource(), - initial_pool_size=self.rmm_pool_size, - maximum_pool_size=self.rmm_pool_size, + _make_device_memory_resource( + self.rmm_pool_size, + rmm_async=self.rmm_async, ) ) rmm.mr.set_current_device_resource(self.mr) + self.pinned_mr = _make_pinned_memory_resource( + pinned_memory=self.pinned_memory, + pinned_initial_pool_size=self.pinned_initial_pool_size, + ) # Create a buffer resource that limits device memory if `--spill-device` memory_available = ( None @@ -97,7 +114,11 @@ def __init__( ) } ) - br = BufferResource(self.mr, memory_available=memory_available) + br = BufferResource( + self.mr, + pinned_mr=self.pinned_mr, + memory_available=memory_available, + ) self.br = br super().__init__(nranks, Statistics(enable=enable_statistics)) @@ -120,7 +141,11 @@ def setup_worker(self, root_address_bytes: bytes) -> None: def cleanup(self) -> None: """Cleanup the UCXX communication and the shuffle operation.""" - self.comm.logger.info(self.statistics.report()) + if self.statistics.enabled: + report = self.statistics.report(mr=self.mr, pinned_mr=self.pinned_mr) + print(f"=== Statistics report (rank {self.comm.rank}) ===") + print(report, flush=True) + self.comm.logger.info(report) if self.shuffler is not None: self.shuffler.shutdown() @@ -253,6 +278,109 @@ def extract_and_write(self, column_names: list[str]) -> None: for partition_id, partition in self.extract(): self.write_table(partition, self.output_path, partition_id, column_names) + def lsh_read_and_insert( + self, + partitions: list[list[str]], + band_range: tuple[int, int], + minhashes_per_band: int, + id_field: str, + minhash_field: str, + ) -> list[str]: + """ + Read minhash parquets per partition, hash bands, melt, and insert. + + `partitions` is a list of file groups (typically ~2 GiB each, produced by + Curator's FilePartitioningStage). Each partition is read as one cudf.read_parquet + call so reads stay aligned with the user-chosen blocksize. + + band_range is half-open [start, end). All bands in the range are processed in a + single shuffle pass -- there is no iteration over bands. + """ + import cudf + + from rapidsmpf.utils.cudf import cudf_to_pylibcudf_table + + bucket_field = "_bucket_id" + column_names = [id_field, bucket_field] + start, end = band_range + if start < 0 or start >= end: + raise ValueError(f"Invalid band range: {band_range}") + for partition_files in partitions: + df = cudf.read_parquet(partition_files, columns=[id_field, minhash_field]) + if len(df) == 0: + continue + id_df = df[[id_field]] + for k in range(start, end): + idx = list(range(k * minhashes_per_band, (k + 1) * minhashes_per_band)) + rep = cudf.Series([idx]).repeat(len(id_df)) + id_df[f"_bucket_{k}"] = f"b{k}_" + df[minhash_field].list.take( + rep + ).hash_values(method="md5") + value_vars = [f"_bucket_{k}" for k in range(start, end)] + band_df = id_df.melt( + id_vars=[id_field], value_name=bucket_field, value_vars=value_vars + )[column_names] + self.insert_chunk(cudf_to_pylibcudf_table(band_df), column_names) + del df, id_df, band_df + self.insert_finished() + return column_names + + def lsh_extract_and_write(self, id_field: str) -> None: + """Extract shuffled bucket rows, drop singletons, group ids per bucket, write parquet.""" + bucket_field = "_bucket_id" + column_names = [id_field, bucket_field] + for partition_id, partition in self.extract(): + df = pylibcudf_to_cudf_dataframe(partition, column_names=column_names) + if len(df) == 0: + continue + df = df[df[bucket_field].duplicated(keep=False)] + if len(df) == 0: + continue + grouped = ( + df.groupby(bucket_field)[id_field] + .agg(list) + .list.sort_values() + .reset_index() + ) + grouped.to_parquet( + f"{self.output_path}/part.{partition_id}.parquet", index=False + ) + del df, grouped + + +def _make_device_memory_resource( + rmm_pool_size: int, + *, + rmm_async: bool, +) -> rmm.mr.DeviceMemoryResource: + if rmm_async: + # UCXX may transport same-node device buffers through CUDA IPC. + return rmm.mr.CudaAsyncMemoryResource( + initial_pool_size=rmm_pool_size, + release_threshold=rmm_pool_size, + enable_ipc=True, + ) + + return rmm.mr.PoolMemoryResource( + rmm.mr.CudaMemoryResource(), + initial_pool_size=rmm_pool_size, + maximum_pool_size=rmm_pool_size, + ) + + +def _make_pinned_memory_resource( + *, + pinned_memory: bool, + pinned_initial_pool_size: int | None, +) -> PinnedMemoryResource | None: + if not pinned_memory: + return None + + options = {"pinned_memory": "True"} + if pinned_initial_pool_size is not None: + options["pinned_initial_pool_size"] = str(pinned_initial_pool_size) + return PinnedMemoryResource.from_options(Options(options)) + def bulk_ray_shuffle( paths: list[str], @@ -264,6 +392,9 @@ def bulk_ray_shuffle( rmm_pool_size: int = 1024 * 1024 * 1024, spill_device: int | None = None, *, + rmm_async: bool = False, + pinned_memory: bool = False, + pinned_initial_pool_size: int | None = None, enable_statistics: bool = False, ) -> None: """ @@ -284,9 +415,16 @@ def bulk_ray_shuffle( num_output_files The number of output files to write. rmm_pool_size - The size of the RMM pool. + The size of the RMM pool. When rmm_async is enabled, this is used as the + async resource initial pool size and release threshold. spill_device Device memory limit for spilling to host. + rmm_async + Whether to use RMM's cudaMallocAsync-backed memory resource. + pinned_memory + Whether to use pinned host memory for spilling when available. + pinned_initial_pool_size + Initial pinned host memory pool size in bytes. enable_statistics Whether to collect statistics. """ @@ -305,6 +443,9 @@ def bulk_ray_shuffle( output_path=output_path, enable_statistics=enable_statistics, rmm_pool_size=rmm_pool_size, + rmm_async=rmm_async, + pinned_memory=pinned_memory, + pinned_initial_pool_size=pinned_initial_pool_size, spill_device=spill_device, ) start_time = time.time() @@ -327,6 +468,74 @@ def bulk_ray_shuffle( ray.get([actor.cleanup.remote() for actor in actors]) +def lsh_bulk_ray_shuffle( + partitions: list[list[str]], + output_path: str, + num_bands: int, + minhashes_per_band: int, + id_field: str, + minhash_field: str, + num_workers: int = 8, + num_output_files: int | None = None, + rmm_pool_size: int = 1024 * 1024 * 1024, + spill_device: int | None = None, + *, + rmm_async: bool = False, + pinned_memory: bool = False, + pinned_initial_pool_size: int | None = None, + enable_statistics: bool = False, +) -> None: + """ + LSH-mode driver: read minhashes per partition, hash bands, shuffle on bucket id, group, write. + + `partitions` is a list of file groups produced by Curator's FilePartitioningStage + (each group is ~2 GiB by default). Partitions are distributed round-robin across + `num_workers` actors. All `num_bands` bands are shuffled in a single pass. + """ + num_partitions = len(partitions) + total_num_partitions = num_output_files or num_partitions + + actors = setup_ray_ucxx_cluster( + BulkRayShufflerActor, + num_workers=num_workers, + total_nparts=total_num_partitions, + shuffle_on=["_bucket_id"], + batchsize=1, + output_path=output_path, + enable_statistics=enable_statistics, + rmm_pool_size=rmm_pool_size, + rmm_async=rmm_async, + pinned_memory=pinned_memory, + pinned_initial_pool_size=pinned_initial_pool_size, + spill_device=spill_device, + ) + # Round-robin partition assignment so size variance is spread across actors. + actor_partitions: list[list[list[str]]] = [[] for _ in range(num_workers)] + for i, part in enumerate(partitions): + actor_partitions[i % num_workers].append(part) + print( + f"Distributing {num_partitions} partitions across {num_workers} actors " + f"(min/max per actor: {min(len(p) for p in actor_partitions)} / {max(len(p) for p in actor_partitions)})" + ) + + start_time = time.time() + insert_tasks = [ + actor.lsh_read_and_insert.remote( + actor_partitions[i], + band_range=(0, num_bands), + minhashes_per_band=minhashes_per_band, + id_field=id_field, + minhash_field=minhash_field, + ) + for i, actor in enumerate(actors) + ] + ray.get(insert_tasks) + ray.get([actor.lsh_extract_and_write.remote(id_field) for actor in actors]) + end_time = time.time() + print(f"LSH shuffle time: {end_time - start_time} seconds") + ray.get([actor.cleanup.remote() for actor in actors]) + + def dir_path(path: str) -> Path: """ Validate that the given path is a directory and return a Path object. @@ -364,17 +573,26 @@ def setup_and_run(args: argparse.Namespace) -> None: if args.ray_address or os.environ.get("RAY_ADDRESS") is not None: ray.init(address="auto") # connect to existing cluster else: - ray.init(num_gpus=args.num_workers, dashboard_host="0.0.0.0") + ray.init(num_cpus=64, num_gpus=args.num_workers, dashboard_host="0.0.0.0") + + import json - bulk_ray_shuffle( - paths=sorted(map(str, args.input.glob("**/*"))), - shuffle_on=args.on.split(","), + with Path(args.partitions_json).open() as fh: + partitions = json.load(fh) + lsh_bulk_ray_shuffle( + partitions=partitions, output_path=args.output, + num_bands=args.num_bands, + minhashes_per_band=args.minhashes_per_band, + id_field=args.id_field, + minhash_field=args.minhash_field, num_workers=args.num_workers, - batchsize=args.batchsize, num_output_files=args.n_output_files, enable_statistics=args.statistics, rmm_pool_size=args.rmm_pool_size, + rmm_async=args.rmm_async, + pinned_memory=args.pinned_memory, + pinned_initial_pool_size=args.pinned_initial_pool_size, spill_device=args.spill_device, ) @@ -391,10 +609,10 @@ def setup_and_run(args: argparse.Namespace) -> None: help="Number of workers to use.", ) parser.add_argument( - "input", - type=dir_path, - metavar="INPUT_DIR_PATH", - help="Input directory path.", + "partitions_json", + type=str, + metavar="PARTITIONS_JSON", + help="Path to JSON file produced by make_partitions.py (list[list[str]]).", ) parser.add_argument( "output", @@ -403,10 +621,28 @@ def setup_and_run(args: argparse.Namespace) -> None: help="Output directory path.", ) parser.add_argument( - "on", - metavar="COLUMN_LIST", + "--num-bands", + type=int, + default=20, + help="Number of LSH bands to process in a single shuffle pass (bands [0, num_bands)).", + ) + parser.add_argument( + "--minhashes-per-band", + type=int, + default=13, + help="Number of minhashes per LSH band.", + ) + parser.add_argument( + "--id-field", type=str, - help="Comma-separated list of column names to shuffle on.", + default="_curator_dedup_id", + help="Document id column.", + ) + parser.add_argument( + "--minhash-field", + type=str, + default="_minhash_signature", + help="Minhash list column.", ) parser.add_argument( "--n-output-files", @@ -414,19 +650,36 @@ def setup_and_run(args: argparse.Namespace) -> None: default=None, help="Number of output files. Default preserves input file count.", ) - parser.add_argument( - "--batchsize", - type=int, - default=1, - help="Number of files to read on each MPI rank at once.", - ) parser.add_argument( "--rmm-pool-size", type=parse_bytes, default=format_bytes(int(rmm.mr.available_device_memory()[1] * 0.8)), help=( "The size of the RMM pool as a string with unit such as '2MiB' and '4KiB'. " - "Default to 80%% of the total device memory, which is %(default)s." + "With --rmm-async, this is used as the async resource initial pool size " + "and release threshold. Default to 80%% of the total device memory, " + "which is %(default)s." + ), + ) + parser.add_argument( + "--rmm-async", + default=False, + action="store_true", + help="Use RMM's cudaMallocAsync-backed memory resource.", + ) + parser.add_argument( + "--pinned-memory", + default=False, + action=argparse.BooleanOptionalAction, + help="Use pinned host memory for spilling when available.", + ) + parser.add_argument( + "--pinned-initial-pool-size", + type=parse_bytes, + default=None, + help=( + "Initial pinned host memory pool size as a string with unit such as " + "'2GiB'. Default uses RapidsMPF's pinned memory resource default." ), ) parser.add_argument( diff --git a/python/rapidsmpf/rapidsmpf/tests/test_ray.py b/python/rapidsmpf/rapidsmpf/tests/test_ray.py index 5d2bee4cb..05e9c5bf0 100644 --- a/python/rapidsmpf/rapidsmpf/tests/test_ray.py +++ b/python/rapidsmpf/rapidsmpf/tests/test_ray.py @@ -5,26 +5,32 @@ import os from typing import TYPE_CHECKING +import pytest import toolz from cuda.core import system +import rmm + os.environ["RAY_DEDUP_LOGS"] = "0" os.environ["RAY_IGNORE_UNHANDLED_ERRORS"] = "1" -import pytest - ray = pytest.importorskip("ray") if TYPE_CHECKING: from collections.abc import Generator -from rapidsmpf.examples.ray.ray_shuffle_example import ( # noqa: E402 - ShufflingActor, +from rapidsmpf.examples.ray.bulk_ray_shuffle import ( # noqa: E402 + _make_device_memory_resource, + _make_pinned_memory_resource, ) +from rapidsmpf.examples.ray.ray_shuffle_example import ShufflingActor # noqa: E402 from rapidsmpf.integrations.ray import ( # noqa: E402 RapidsMPFActor, setup_ray_ucxx_cluster, ) +from rapidsmpf.memory.pinned_memory_resource import ( # noqa: E402 + is_pinned_memory_resources_supported, +) def get_nranks_if_spawned_by_mpi() -> int: @@ -127,6 +133,35 @@ class NonRapidsMPFActor: ... setup_ray_ucxx_cluster(NonRapidsMPFActor, 1) +def test_bulk_ray_shuffle_default_rmm_resource() -> None: + mr = _make_device_memory_resource(1024 * 1024, rmm_async=False) + assert isinstance(mr, rmm.mr.PoolMemoryResource) + + +def test_bulk_ray_shuffle_async_rmm_resource() -> None: + mr = _make_device_memory_resource(1024 * 1024, rmm_async=True) + assert isinstance(mr, rmm.mr.CudaAsyncMemoryResource) + + +def test_bulk_ray_shuffle_pinned_memory_disabled() -> None: + mr = _make_pinned_memory_resource( + pinned_memory=False, + pinned_initial_pool_size=1024 * 1024, + ) + assert mr is None + + +def test_bulk_ray_shuffle_pinned_memory_enabled_when_supported() -> None: + if not is_pinned_memory_resources_supported(): + pytest.skip("Pinned memory not supported on this system") + + mr = _make_pinned_memory_resource( + pinned_memory=True, + pinned_initial_pool_size=1024 * 1024, + ) + assert mr is not None + + @toolz.memoize def get_gpu_count() -> int: return system.get_num_devices() # type: ignore