From 47f5ac48c6d223bcdd53b8e47464342a68bdeeec Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Fri, 8 May 2026 12:43:36 +0100 Subject: [PATCH 1/7] Add option to discard results without extracting in shuffle This allows benchmarking just the data movement part of the shuffle without the unspilling and concatenation of the results. Additionally, remove the unnecessary stream sync, the contract is the downstream data is available in stream-ordered fashion, so do that. --- cpp/benchmarks/bench_shuffle.cpp | 38 ++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/cpp/benchmarks/bench_shuffle.cpp b/cpp/benchmarks/bench_shuffle.cpp index 1744f488a..6bec096ba 100644 --- a/cpp/benchmarks/bench_shuffle.cpp +++ b/cpp/benchmarks/bench_shuffle.cpp @@ -52,7 +52,7 @@ class ArgumentParser { } try { int option; - while ((option = getopt(argc, argv, "C:r:w:c:n:p:o:m:l:LigsbxhM:")) != -1) { + while ((option = getopt(argc, argv, "C:r:w:c:n:p:o:m:l:LigsbdxhM:")) != -1) { switch (option) { case 'h': { @@ -77,7 +77,9 @@ class ArgumentParser { " unlimited)\n" << " -g Use pre-partitioned (hash) input tables " "(default: unset, hash partition during insertion)\n" - << " -s Discard output chunks to simulate streaming " + << " -s Discard output chunks after extract and " + "concat to simulate streaming (default: disabled)\n" + << " -d Discard result after shuffle completes " "(default: disabled)\n" << " -b Disallow memory overbooking when generating " "input data (default: allow memory overbooking)\n" @@ -159,6 +161,9 @@ class ArgumentParser { case 's': enable_output_discard = true; break; + case 'd': + just_shuffle = true; + break; case 'b': input_data_allow_overbooking = rapidsmpf::AllowOverbooking::NO; break; @@ -235,6 +240,9 @@ class ArgumentParser { if (enable_output_discard) { ss << " -s (enable output discard to simulate streaming)\n"; } + if (just_shuffle) { + ss << " -d (only shuffle, no extraction)\n"; + } if (input_data_allow_overbooking == rapidsmpf::AllowOverbooking::NO) { ss << " -b (disallow memory overbooking when generating input data)\n"; } @@ -271,6 +279,7 @@ class ArgumentParser { std::int64_t device_mem_limit_mb{-1}; bool pinned_mem_disable{false}; bool enable_cupti_monitoring{false}; + bool just_shuffle{false}; std::string cupti_csv_prefix; }; @@ -315,20 +324,21 @@ rapidsmpf::Duration do_run( shuffle_insert_fn(shuffler); shuffler.wait(); - for (auto finished_partition : shuffler.local_partitions()) { - auto packed_chunks = shuffler.extract(finished_partition); - auto output_partition = rapidsmpf::unpack_and_concat( - rapidsmpf::unspill_partitions( - std::move(packed_chunks), br, rapidsmpf::AllowOverbooking::YES - ), - stream, - br - ); - if (!args.enable_output_discard) { - output_partitions.emplace_back(std::move(output_partition)); + if (!args.just_shuffle) { + for (auto finished_partition : shuffler.local_partitions()) { + auto packed_chunks = shuffler.extract(finished_partition); + auto output_partition = rapidsmpf::unpack_and_concat( + rapidsmpf::unspill_partitions( + std::move(packed_chunks), br, rapidsmpf::AllowOverbooking::YES + ), + stream, + br + ); + if (!args.enable_output_discard) { + output_partitions.emplace_back(std::move(output_partition)); + } } } - stream.synchronize(); } auto const elapsed = rapidsmpf::Clock::now() - t0_elapsed; From 9cf3784d507889463673e05dce455c522946dad4 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Fri, 8 May 2026 14:16:46 +0100 Subject: [PATCH 2/7] Wake once we have locally received everything --- cpp/src/shuffler/shuffler.cpp | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/cpp/src/shuffler/shuffler.cpp b/cpp/src/shuffler/shuffler.cpp index 2c81a40e1..c7b5d75a5 100644 --- a/cpp/src/shuffler/shuffler.cpp +++ b/cpp/src/shuffler/shuffler.cpp @@ -173,20 +173,16 @@ class Shuffler::Progress { } // There are no messages to be posted, or waiting to be completed. - bool const containers_empty = - shuffler_.mpe_->is_idle() && shuffler_.to_send_.empty(); + bool const all_sends_posted = shuffler_.to_send_.empty(); // We've inserted a finish message and we've received everything we expect. bool const is_finished = shuffler_.locally_finished_.load(std::memory_order_acquire) && shuffler_.finish_counter_.all_finished(); - // Finished and shuffler is no longer active. - bool const is_done = !shuffler_.active_.load(std::memory_order_acquire) - && is_finished && containers_empty; - // Signal can_extract_ when all chunks have been received and all internal - // containers are drained. If we own no partitions we "can-extract" immediately, - // but we only wake a waiter once we've drained internal containers so that we can - // reuse the op_id for a subsequent shuffle. - if (!shuffler_.can_extract_ && is_finished && containers_empty) { + // Signal can_extract_ when all chunks have been received and all sends have been + // posted. If we own no partitions we "can-extract" immediately, but we only wake + // a waiter once we've drained internal containers so that we can reuse the op_id + // for a subsequent shuffle. + if (!shuffler_.can_extract_ && is_finished) { { std::lock_guard lock(shuffler_.mutex_); shuffler_.can_extract_ = true; @@ -196,6 +192,10 @@ class Shuffler::Progress { callback(); } } + // Finished and shuffler is no longer active. + bool const is_done = !shuffler_.active_.load(std::memory_order_acquire) + && is_finished && all_sends_posted + && shuffler_.mpe_->is_idle(); return is_done ? ProgressThread::ProgressState::Done : ProgressThread::ProgressState::InProgress; } From b33d5921c664543e28dd84fcfb5163ba993a8ece Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Fri, 8 May 2026 11:52:15 +0100 Subject: [PATCH 3/7] Post all receives for prefix of ready messages --- cpp/src/communicator/metadata_payload_exchange/tag.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/cpp/src/communicator/metadata_payload_exchange/tag.cpp b/cpp/src/communicator/metadata_payload_exchange/tag.cpp index c94467c5f..625910095 100644 --- a/cpp/src/communicator/metadata_payload_exchange/tag.cpp +++ b/cpp/src/communicator/metadata_payload_exchange/tag.cpp @@ -316,9 +316,6 @@ TagMetadataPayloadExchange::setup_data_receives() { ); // Store in per-rank vector to maintain order in_transit_messages_[src].push_back(std::move(tag_message)); - // Break to ensure we don't return later messages before this one - // completes - break; } else { // Control/metadata-only message // Only return if there are no earlier in-transit messages from this rank From 92e13fee96817de7c48837875b173884c78ca740 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Fri, 8 May 2026 11:50:55 +0100 Subject: [PATCH 4/7] Circulant shift for metadata receive in shuffle --- cpp/src/communicator/metadata_payload_exchange/tag.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/communicator/metadata_payload_exchange/tag.cpp b/cpp/src/communicator/metadata_payload_exchange/tag.cpp index 625910095..2dbcf0582 100644 --- a/cpp/src/communicator/metadata_payload_exchange/tag.cpp +++ b/cpp/src/communicator/metadata_payload_exchange/tag.cpp @@ -184,7 +184,8 @@ void TagMetadataPayloadExchange::receive_metadata() { // Use per-peer recv_from to avoid consuming messages belonging to a future // collective on the same tag (see rapidsai/rapidsmpf#927). - for (Rank peer = 0; peer < nranks_; ++peer) { + for (Rank i = 0; i < nranks_; ++i) { + auto const peer = (i + rank_) % nranks_; if (peer == rank_) { continue; } From 671545af7333f18c3192066e9fbe1299f2648a55 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Mon, 11 May 2026 12:25:23 +0100 Subject: [PATCH 5/7] Circulant shift for mpe termination marker send --- cpp/src/communicator/metadata_payload_exchange/tag.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/communicator/metadata_payload_exchange/tag.cpp b/cpp/src/communicator/metadata_payload_exchange/tag.cpp index 2dbcf0582..33c63a083 100644 --- a/cpp/src/communicator/metadata_payload_exchange/tag.cpp +++ b/cpp/src/communicator/metadata_payload_exchange/tag.cpp @@ -140,7 +140,8 @@ void TagMetadataPayloadExchange::finish() { // exactly how many application messages we sent to it, so the peer can // stop receiving once it has them all. // Format: [sentinel=UINT64_MAX (8 bytes)][message_count (8 bytes)] - for (Rank peer = 0; peer < nranks_; ++peer) { + for (Rank i = 0; i < nranks_; ++i) { + auto const peer = (i + rank_) % nranks_; if (peer == rank_) { continue; } From dd2bb61ee7f0dd787ff07012591621daf95dabc4 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Mon, 11 May 2026 12:41:05 +0100 Subject: [PATCH 6/7] Introduce finished_polling method to MPE We can wake if the MPE has finished polling for new metadata, and don't need to wait for it to be completely idle. --- .../metadata_payload_exchange/core.hpp | 1 + .../metadata_payload_exchange/tag.hpp | 1 + .../metadata_payload_exchange/tag.cpp | 16 ++++++++++++++++ cpp/src/shuffler/shuffler.cpp | 7 ++++--- 4 files changed, 22 insertions(+), 3 deletions(-) diff --git a/cpp/include/rapidsmpf/communicator/metadata_payload_exchange/core.hpp b/cpp/include/rapidsmpf/communicator/metadata_payload_exchange/core.hpp index 4657a5928..077d27ca4 100644 --- a/cpp/include/rapidsmpf/communicator/metadata_payload_exchange/core.hpp +++ b/cpp/include/rapidsmpf/communicator/metadata_payload_exchange/core.hpp @@ -208,6 +208,7 @@ class MetadataPayloadExchange { * @return `true` if the communication layer is idle; `false` if activity is ongoing. */ [[nodiscard]] virtual bool is_idle() const = 0; + [[nodiscard]] virtual bool finished_polling() const = 0; }; diff --git a/cpp/include/rapidsmpf/communicator/metadata_payload_exchange/tag.hpp b/cpp/include/rapidsmpf/communicator/metadata_payload_exchange/tag.hpp index f712c0c1e..376428363 100644 --- a/cpp/include/rapidsmpf/communicator/metadata_payload_exchange/tag.hpp +++ b/cpp/include/rapidsmpf/communicator/metadata_payload_exchange/tag.hpp @@ -83,6 +83,7 @@ class TagMetadataPayloadExchange : public MetadataPayloadExchange { * @copydoc MetadataPayloadExchange::is_idle */ bool is_idle() const override; + bool finished_polling() const override; private: /** diff --git a/cpp/src/communicator/metadata_payload_exchange/tag.cpp b/cpp/src/communicator/metadata_payload_exchange/tag.cpp index 33c63a083..bcb6bfd4d 100644 --- a/cpp/src/communicator/metadata_payload_exchange/tag.cpp +++ b/cpp/src/communicator/metadata_payload_exchange/tag.cpp @@ -159,6 +159,22 @@ void TagMetadataPayloadExchange::finish() { } } +bool TagMetadataPayloadExchange::finished_polling() const { + if (finished_) { + for (Rank peer = 0; peer < nranks_; ++peer) { + if (peer == rank_) { + continue; + } + auto const p = safe_cast(peer); + if (!peer_terminated_[p] || peer_received_[p] < peer_expected_[p]) { + return false; + } + } + return true; + } + return false; +} + bool TagMetadataPayloadExchange::is_idle() const { bool const io_idle = fire_and_forget_.empty() && incoming_messages_.empty() && in_transit_messages_.empty() && in_transit_futures_.empty(); diff --git a/cpp/src/shuffler/shuffler.cpp b/cpp/src/shuffler/shuffler.cpp index c7b5d75a5..5c11bd4f5 100644 --- a/cpp/src/shuffler/shuffler.cpp +++ b/cpp/src/shuffler/shuffler.cpp @@ -173,7 +173,8 @@ class Shuffler::Progress { } // There are no messages to be posted, or waiting to be completed. - bool const all_sends_posted = shuffler_.to_send_.empty(); + bool const containers_empty = + shuffler_.mpe_->finished_polling() && shuffler_.to_send_.empty(); // We've inserted a finish message and we've received everything we expect. bool const is_finished = shuffler_.locally_finished_.load(std::memory_order_acquire) @@ -182,7 +183,7 @@ class Shuffler::Progress { // posted. If we own no partitions we "can-extract" immediately, but we only wake // a waiter once we've drained internal containers so that we can reuse the op_id // for a subsequent shuffle. - if (!shuffler_.can_extract_ && is_finished) { + if (!shuffler_.can_extract_ && is_finished && containers_empty) { { std::lock_guard lock(shuffler_.mutex_); shuffler_.can_extract_ = true; @@ -194,7 +195,7 @@ class Shuffler::Progress { } // Finished and shuffler is no longer active. bool const is_done = !shuffler_.active_.load(std::memory_order_acquire) - && is_finished && all_sends_posted + && is_finished && containers_empty && shuffler_.mpe_->is_idle(); return is_done ? ProgressThread::ProgressState::Done : ProgressThread::ProgressState::InProgress; From 011ed993937502aac1f3d984432fc8e38c2d49a8 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Mon, 11 May 2026 14:32:30 +0100 Subject: [PATCH 7/7] Inline shuffle message exchange again Seems from benchmarking this won't be worth it --- cpp/include/rapidsmpf/shuffler/chunk.hpp | 19 +- cpp/include/rapidsmpf/shuffler/shuffler.hpp | 23 +- cpp/src/shuffler/chunk.cpp | 8 +- cpp/src/shuffler/shuffler.cpp | 298 +++++++++++--------- 4 files changed, 182 insertions(+), 166 deletions(-) diff --git a/cpp/include/rapidsmpf/shuffler/chunk.hpp b/cpp/include/rapidsmpf/shuffler/chunk.hpp index 75106e1cb..efcb57a0d 100644 --- a/cpp/include/rapidsmpf/shuffler/chunk.hpp +++ b/cpp/include/rapidsmpf/shuffler/chunk.hpp @@ -228,24 +228,17 @@ class Chunk { * @brief Create a chunk by deserializing a metadata message. * * @param msg The metadata message received from another rank. - * @param br Buffer resource for allocating the data buffer of the deserialized - * message. Ignored when @p data is provided. + * @param br Buffer resource for allocating a the data buffer of the deserialized + * message. * @param validate Whether to validate the metadata buffer. - * @param data Optional pre-existing data buffer to use instead of allocating a new - * one. When non-null, the buffer resource allocation is skipped entirely, avoiding - * unnecessary memory pressure from a temporary allocation. * @return The chunk. * - * @throws std::logic_error if the chunk is not a control message and neither @p data - * nor @p br is provided. - * @throws std::runtime_error if the metadata buffer does not follow the expected - * format and @p validate is true. + * @throws std::logic_error if the chunk is not a control message and no buffer + * resource is provided. @throws std::runtime_error if the metadata buffer does not + * follow the expected format and `validate` is true. */ static Chunk deserialize( - std::vector const& msg, - BufferResource* br, - bool validate = true, - std::unique_ptr data = nullptr + std::vector const& msg, BufferResource* br, bool validate = true ); /** diff --git a/cpp/include/rapidsmpf/shuffler/shuffler.hpp b/cpp/include/rapidsmpf/shuffler/shuffler.hpp index f0d8d00ad..3b4d3247a 100644 --- a/cpp/include/rapidsmpf/shuffler/shuffler.hpp +++ b/cpp/include/rapidsmpf/shuffler/shuffler.hpp @@ -16,7 +16,6 @@ #include #include -#include #include #include #include @@ -119,8 +118,6 @@ class Shuffler { * @param br Buffer resource used to allocate temporary and the shuffle result. * @param finished_callback Callback to notify when all partitions are finished. * @param partition_owner Function to determine partition ownership. - * @param mpe Optional custom metadata payload exchange. If not provided, - * uses the default tag-based implementation. * * @note It is safe to reuse the `op_id` as soon as `wait` has completed * locally. @@ -135,8 +132,7 @@ class Shuffler { PartID total_num_partitions, BufferResource* br, FinishedCallback&& finished_callback, - PartitionOwner partition_owner = round_robin, - std::unique_ptr mpe = nullptr + PartitionOwner partition_owner = round_robin ); /** @@ -148,8 +144,6 @@ class Shuffler { * @param total_num_partitions Total number of partitions in the shuffle. * @param br Buffer resource used to allocate temporary and the shuffle result. * @param partition_owner Function to determine partition ownership. - * @param mpe Optional custom metadata payload exchange. If not provided, - * uses the default tag-based implementation. * * @note The caller promises that inserted buffers are stream-ordered with respect * to their own stream, and extracted buffers are likewise guaranteed to be stream- @@ -160,18 +154,9 @@ class Shuffler { OpID op_id, PartID total_num_partitions, BufferResource* br, - PartitionOwner partition_owner = round_robin, - std::unique_ptr mpe = nullptr + PartitionOwner partition_owner = round_robin ) - : Shuffler( - comm, - op_id, - total_num_partitions, - br, - nullptr, - partition_owner, - std::move(mpe) - ) {} + : Shuffler(comm, op_id, total_num_partitions, br, nullptr, partition_owner) {} ~Shuffler(); @@ -328,12 +313,12 @@ class Shuffler { // Flipped to true exactly once when partitions are ready for extraction and we've // posted all sends we're going to bool can_extract_{false}; + OpID const op_id_; detail::ChunksToSend to_send_; ///< Storage for chunks to send to other ranks. detail::ReceivedChunks received_; ///< Storage for received chunks that are ///< ready to be extracted by the user. std::shared_ptr comm_; - std::unique_ptr mpe_; ProgressThread::FunctionID progress_thread_function_id_; SpillManager::SpillFunctionID spill_function_id_; diff --git a/cpp/src/shuffler/chunk.cpp b/cpp/src/shuffler/chunk.cpp index ea50edaac..4ce212ed2 100644 --- a/cpp/src/shuffler/chunk.cpp +++ b/cpp/src/shuffler/chunk.cpp @@ -55,10 +55,7 @@ Chunk Chunk::from_finished_partition( } Chunk Chunk::deserialize( - std::vector const& msg, - BufferResource* br, - bool validate, - std::unique_ptr data + std::vector const& msg, BufferResource* br, bool validate ) { if (validate) { RAPIDSMPF_EXPECTS( @@ -91,7 +88,8 @@ Chunk Chunk::deserialize( msg.begin() + safe_cast(offset), msg.end() ); - if (!data && expected_num_chunks == 0) { + std::unique_ptr data; + if (expected_num_chunks == 0) { RAPIDSMPF_EXPECTS( br != nullptr, "Deserializing non-control Chunk requires a BufferResource" ); diff --git a/cpp/src/shuffler/shuffler.cpp b/cpp/src/shuffler/shuffler.cpp index 5c11bd4f5..e47c4c770 100644 --- a/cpp/src/shuffler/shuffler.cpp +++ b/cpp/src/shuffler/shuffler.cpp @@ -13,8 +13,6 @@ #include #include -#include -#include #include #include #include @@ -27,45 +25,6 @@ namespace rapidsmpf::shuffler { using namespace detail; -namespace { - -/** - * @brief Convert chunks into messages for communication. - * - * This function converts a vector of chunks into messages suitable for sending - * through the metadata payload exchange. Each chunk is serialized and its data - * buffer is released to create the message. - * - * @param chunks Vector of chunks to convert (will be moved from). - * @param peer_rank_fn Function to determine the destination rank for each chunk. - * - * @return A vector of message unique pointers ready to be sent. - */ -template -std::vector> -convert_chunks_to_messages( - std::vector&& chunks, PeerRankFn&& peer_rank_fn -) { - std::vector> messages; - messages.reserve(chunks.size()); - - for (auto&& chunk : chunks) { - auto dst = peer_rank_fn(chunk); - auto metadata = std::move(*chunk.serialize()); - auto data = chunk.release_data_buffer(); - - messages.push_back( - std::make_unique( - dst, std::move(metadata), std::move(data) - ) - ); - } - - return messages; -} - -} // namespace - class Shuffler::Progress { public: /** @@ -73,7 +32,10 @@ class Shuffler::Progress { * * @param shuffler Reference to the shuffler instance that this will progress. */ - Progress(Shuffler& shuffler) : shuffler_(shuffler) {} + Progress(Shuffler& shuffler) + : shuffler_(shuffler), + peer_received_(safe_cast(shuffler.comm_->nranks()), 0), + peer_expected_(safe_cast(shuffler.comm_->nranks()), 0) {} /** * @brief Executes a single iteration of the shuffler's event loop. @@ -89,6 +51,11 @@ class Shuffler::Progress { RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE("Shuffler.Progress", p_iters++); auto const t0_event_loop = Clock::now(); + // Tags for each stage of the shuffle + Tag const metadata_tag{shuffler_.op_id_, 0}; + Tag const gpu_data_tag{shuffler_.op_id_, 1}; + + auto& log = *shuffler_.comm_->logger(); auto& stats = *shuffler_.statistics_; // Submit outgoing chunks to the metadata payload exchange @@ -96,94 +63,166 @@ class Shuffler::Progress { auto const t0_submit_outgoing = Clock::now(); auto ready_chunks = shuffler_.to_send_.extract_ready(); RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE("submit_outgoing", ready_chunks.size()); - - if (!ready_chunks.empty()) { - auto peer_rank_fn = [&shuffler = - shuffler_](detail::Chunk const& chunk) -> Rank { - auto dst = shuffler.partition_owner( - shuffler.comm_, chunk.part_id(), shuffler.total_num_partitions - ); - shuffler.comm_->logger()->trace( - "submitting message to ", dst, ": ", chunk - ); - RAPIDSMPF_EXPECTS( - dst != shuffler.comm_->rank(), "sending message to ourselves" - ); - return dst; - }; - - for (auto const& chunk : ready_chunks) { - if (chunk.data_size() > 0) { - stats.add_bytes_stat("shuffle-payload-send", chunk.data_size()); - } + for (auto&& chunk : ready_chunks) { + auto dst = shuffler_.partition_owner( + shuffler_.comm_, chunk.part_id(), shuffler_.total_num_partitions + ); + RAPIDSMPF_EXPECTS( + dst != shuffler_.comm_->rank(), "sending message to ourselves" + ); + fire_and_forget_.push_back( + shuffler_.comm_->send(chunk.serialize(), dst, metadata_tag) + ); + if (chunk.data_size() > 0) { + stats.add_bytes_stat("shuffle-payload-send", chunk.data_size()); + fire_and_forget_.push_back(shuffler_.comm_->send( + chunk.release_data_buffer(), dst, gpu_data_tag + )); } - - auto messages = - convert_chunks_to_messages(std::move(ready_chunks), peer_rank_fn); - - shuffler_.mpe_->send(std::move(messages)); } + stats.add_duration_stat( "event-loop-submit-outgoing", Clock::now() - t0_submit_outgoing ); } - - // Process all communication operations and get completed chunks + // Receive incoming metadata of remote chunks and place them in + // `incoming_chunks_`, using per-peer recv_from to avoid consuming + // messages belonging to a future collective on the same tag. { - auto const t0_process_comm = Clock::now(); - RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE("process_communication"); - - shuffler_.mpe_->progress(); - auto completed_messages = shuffler_.mpe_->recv(); - - for (auto&& message : completed_messages) { - auto chunk = detail::Chunk::deserialize( - message->metadata(), shuffler_.br_, false, message->release_data() - ); + auto const t0_metadata_recv = Clock::now(); + RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE("meta_recv"); + [[maybe_unused]] int recv_from_iters = + 0; // this will be stripped off if RAPIDSMPF_VERBOSE_INFO is not set + if (!shuffler_.local_partitions_.empty()) { + auto const rank = shuffler_.comm_->rank(); + auto const size = shuffler_.comm_->nranks(); + for (Rank i = 0; i < size; ++i) { + auto const peer = (i + rank) % size; + if (peer == rank) { + continue; + } + auto const p = safe_cast(peer); + while (peer_expected_[p] == 0 + || peer_received_[p] < peer_expected_[p]) + { + auto msg = shuffler_.comm_->recv_from(peer, metadata_tag); + if (!msg) { + break; + } + auto chunk = + Chunk::deserialize(*msg, shuffler_.br_, /*validate=*/false); + log.trace("recv_from ", peer, ": ", chunk); + peer_received_[p]++; + if (chunk.is_control_message()) { + peer_expected_[p] = chunk.expected_num_chunks(); + } else { + RAPIDSMPF_EXPECTS( + shuffler_.partition_owner( + shuffler_.comm_, + chunk.part_id(), + shuffler_.total_num_partitions + ) == shuffler_.comm_->rank(), + "receiving chunk not owned by us" + ); + } + incoming_chunks_[peer].push_back(std::move(chunk)); + recv_from_iters++; + } + } + } - RAPIDSMPF_EXPECTS( - shuffler_.partition_owner( - shuffler_.comm_, chunk.part_id(), shuffler_.total_num_partitions - ) == shuffler_.comm_->rank(), - "receiving chunk not owned by us" - ); + stats.add_duration_stat( + "event-loop-metadata-recv", Clock::now() - t0_metadata_recv + ); + RAPIDSMPF_NVTX_MARKER_VERBOSE("meta_recv_iters", recv_from_iters); + } + // Post receives for incoming chunks. Note that we start the allocation of chunks + // in received message order, but because the allocations run on different streams + // they might not complete and be ready in that order. To handle that, we separate + // incoming chunks by rank and then process chunks in FIFO order until we observe + // a non-ready chunk. + { + auto const t0_post_incoming_chunk_recv = Clock::now(); + for (auto& [src, chunks] : incoming_chunks_) { + RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE("post_chunk_recv", chunks.size()); + std::ptrdiff_t n_processed = 0; + for (auto& chunk : chunks) { + log.trace("checking incoming chunk data from ", src, ": ", chunk); - if (chunk.data_size() > 0) { - stats.add_bytes_stat("shuffle-payload-recv", chunk.data_size()); + if (chunk.data_size() > 0) { + if (!chunk.is_ready()) { + break; + } + auto chunk_id = chunk.chunk_id(); + auto data_size = chunk.data_size(); + // Setup to receive the chunk into `in_transit_*`. + auto future = shuffler_.comm_->recv( + src, gpu_data_tag, chunk.release_data_buffer() + ); + RAPIDSMPF_EXPECTS( + in_transit_futures_.emplace(chunk_id, std::move(future)) + .second, + "in transit future already exist" + ); + RAPIDSMPF_EXPECTS( + in_transit_chunks_.emplace(chunk_id, std::move(chunk)).second, + "in transit chunk already exist" + ); + shuffler_.statistics_->add_bytes_stat( + "shuffle-payload-recv", data_size + ); + } else { + // Control messages and metadata-only messages go + // directly to the ready postbox. + shuffler_.insert_into_received(std::move(chunk)); + } + n_processed++; } - - shuffler_.insert_into_received(std::move(chunk)); + chunks.erase(chunks.begin(), chunks.begin() + n_processed); } stats.add_duration_stat( - "event-loop-process-communication", Clock::now() - t0_process_comm + "event-loop-post-incoming-chunk-recv", + Clock::now() - t0_post_incoming_chunk_recv ); } - stats.add_duration_stat("event-loop-total", Clock::now() - t0_event_loop); - - // Signal the MPE that no more messages will be sent once all application - // messages have been flushed from to_send_ into the MPE. - if (!mpe_finish_called_ - && shuffler_.locally_finished_.load(std::memory_order_acquire) - && shuffler_.to_send_.empty()) + // Check if any data in transit is finished. { - shuffler_.mpe_->finish(); - mpe_finish_called_ = true; + auto const t0_check_future_finish = Clock::now(); + RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE( + "check_fut_finish", in_transit_futures_.size() + ); + if (!in_transit_futures_.empty()) { + std::vector finished = + shuffler_.comm_->test_some(in_transit_futures_); + for (auto cid : finished) { + auto chunk = extract_value(in_transit_chunks_, cid); + auto future = extract_value(in_transit_futures_, cid); + chunk.set_data_buffer( + shuffler_.comm_->release_data(std::move(future)) + ); + shuffler_.insert_into_received(std::move(chunk)); + } + } + // Check if we can free some of the outstanding futures. + if (!fire_and_forget_.empty()) { + std::ignore = shuffler_.comm_->test_some(fire_and_forget_); + } + stats.add_duration_stat( + "event-loop-check-future-finish", Clock::now() - t0_check_future_finish + ); } - - // There are no messages to be posted, or waiting to be completed. - bool const containers_empty = - shuffler_.mpe_->finished_polling() && shuffler_.to_send_.empty(); // We've inserted a finish message and we've received everything we expect. bool const is_finished = shuffler_.locally_finished_.load(std::memory_order_acquire) - && shuffler_.finish_counter_.all_finished(); - // Signal can_extract_ when all chunks have been received and all sends have been - // posted. If we own no partitions we "can-extract" immediately, but we only wake - // a waiter once we've drained internal containers so that we can reuse the op_id - // for a subsequent shuffle. - if (!shuffler_.can_extract_ && is_finished && containers_empty) { + // There are no messages to be posted, or waiting to be completed. + && shuffler_.to_send_.empty() && shuffler_.finish_counter_.all_finished(); + // Signal can_extract_ when all chunks have been received and all internal + // containers are drained. If we own no partitions we "can-extract" immediately, + // but we only wake a waiter once we've drained internal containers so that we can + // reuse the op_id for a subsequent shuffle. + if (!shuffler_.can_extract_ && is_finished) { { std::lock_guard lock(shuffler_.mutex_); shuffler_.can_extract_ = true; @@ -193,17 +232,32 @@ class Shuffler::Progress { callback(); } } + bool const containers_empty = + is_finished && fire_and_forget_.empty() && in_transit_chunks_.empty() + && in_transit_futures_.empty() + && std::ranges::all_of(incoming_chunks_, [](auto const& kv) { + return kv.second.empty(); + }); // Finished and shuffler is no longer active. - bool const is_done = !shuffler_.active_.load(std::memory_order_acquire) - && is_finished && containers_empty - && shuffler_.mpe_->is_idle(); + bool const is_done = + !shuffler_.active_.load(std::memory_order_acquire) && containers_empty; + stats.add_duration_stat("event-loop-total", Clock::now() - t0_event_loop); return is_done ? ProgressThread::ProgressState::Done : ProgressThread::ProgressState::InProgress; } private: Shuffler& shuffler_; - bool mpe_finish_called_{false}; + std::vector peer_received_; ///< Messages received per rank. + std::vector peer_expected_; ///< Total expected from rank (0 = unknown). + std::vector> + fire_and_forget_; ///< Ongoing "fire-and-forget" operations (non-blocking sends). + std::unordered_map> + incoming_chunks_; ///< Per-rank FIFO of chunks awaiting receive. + std::unordered_map + in_transit_chunks_; ///< Chunks currently in transit. + std::unordered_map> + in_transit_futures_; ///< Futures corresponding to in-transit chunks. #if RAPIDSMPF_VERBOSE_INFO std::int64_t p_iters = 0; ///< Number of progress iterations (for NVTX) @@ -230,29 +284,15 @@ Shuffler::Shuffler( PartID total_num_partitions, BufferResource* br, FinishedCallback&& finished_callback, - PartitionOwner partition_owner_fn, - std::unique_ptr mpe + PartitionOwner partition_owner_fn ) : total_num_partitions{total_num_partitions}, partition_owner{std::move(partition_owner_fn)}, br_{br}, + op_id_{op_id}, to_send_{}, received_{safe_cast(total_num_partitions)}, comm_{std::move(comm)}, - mpe_{ - mpe ? std::move(mpe) - : std::make_unique( - comm_, - op_id, - [this](std::size_t size) -> std::unique_ptr { - return br_->allocate( - br_->stream_pool().get_stream(), - br_->reserve_or_fail(size, MEMORY_TYPES) - ); - }, - br_->statistics() - ) - }, local_partitions_{local_partitions(comm_, total_num_partitions, partition_owner)}, finish_counter_{comm_->nranks(), safe_cast(local_partitions_.size())}, outbound_chunk_counter_(safe_cast(comm_->nranks()), 0),