diff --git a/cpp/benchmarks/bench_shuffle.cpp b/cpp/benchmarks/bench_shuffle.cpp index 1744f488a..6bec096ba 100644 --- a/cpp/benchmarks/bench_shuffle.cpp +++ b/cpp/benchmarks/bench_shuffle.cpp @@ -52,7 +52,7 @@ class ArgumentParser { } try { int option; - while ((option = getopt(argc, argv, "C:r:w:c:n:p:o:m:l:LigsbxhM:")) != -1) { + while ((option = getopt(argc, argv, "C:r:w:c:n:p:o:m:l:LigsbdxhM:")) != -1) { switch (option) { case 'h': { @@ -77,7 +77,9 @@ class ArgumentParser { " unlimited)\n" << " -g Use pre-partitioned (hash) input tables " "(default: unset, hash partition during insertion)\n" - << " -s Discard output chunks to simulate streaming " + << " -s Discard output chunks after extract and " + "concat to simulate streaming (default: disabled)\n" + << " -d Discard result after shuffle completes " "(default: disabled)\n" << " -b Disallow memory overbooking when generating " "input data (default: allow memory overbooking)\n" @@ -159,6 +161,9 @@ class ArgumentParser { case 's': enable_output_discard = true; break; + case 'd': + just_shuffle = true; + break; case 'b': input_data_allow_overbooking = rapidsmpf::AllowOverbooking::NO; break; @@ -235,6 +240,9 @@ class ArgumentParser { if (enable_output_discard) { ss << " -s (enable output discard to simulate streaming)\n"; } + if (just_shuffle) { + ss << " -d (only shuffle, no extraction)\n"; + } if (input_data_allow_overbooking == rapidsmpf::AllowOverbooking::NO) { ss << " -b (disallow memory overbooking when generating input data)\n"; } @@ -271,6 +279,7 @@ class ArgumentParser { std::int64_t device_mem_limit_mb{-1}; bool pinned_mem_disable{false}; bool enable_cupti_monitoring{false}; + bool just_shuffle{false}; std::string cupti_csv_prefix; }; @@ -315,20 +324,21 @@ rapidsmpf::Duration do_run( shuffle_insert_fn(shuffler); shuffler.wait(); - for (auto finished_partition : shuffler.local_partitions()) { - auto packed_chunks = shuffler.extract(finished_partition); - auto output_partition = rapidsmpf::unpack_and_concat( - rapidsmpf::unspill_partitions( - std::move(packed_chunks), br, rapidsmpf::AllowOverbooking::YES - ), - stream, - br - ); - if (!args.enable_output_discard) { - output_partitions.emplace_back(std::move(output_partition)); + if (!args.just_shuffle) { + for (auto finished_partition : shuffler.local_partitions()) { + auto packed_chunks = shuffler.extract(finished_partition); + auto output_partition = rapidsmpf::unpack_and_concat( + rapidsmpf::unspill_partitions( + std::move(packed_chunks), br, rapidsmpf::AllowOverbooking::YES + ), + stream, + br + ); + if (!args.enable_output_discard) { + output_partitions.emplace_back(std::move(output_partition)); + } } } - stream.synchronize(); } auto const elapsed = rapidsmpf::Clock::now() - t0_elapsed; diff --git a/cpp/include/rapidsmpf/communicator/metadata_payload_exchange/core.hpp b/cpp/include/rapidsmpf/communicator/metadata_payload_exchange/core.hpp index 4657a5928..077d27ca4 100644 --- a/cpp/include/rapidsmpf/communicator/metadata_payload_exchange/core.hpp +++ b/cpp/include/rapidsmpf/communicator/metadata_payload_exchange/core.hpp @@ -208,6 +208,7 @@ class MetadataPayloadExchange { * @return `true` if the communication layer is idle; `false` if activity is ongoing. */ [[nodiscard]] virtual bool is_idle() const = 0; + [[nodiscard]] virtual bool finished_polling() const = 0; }; diff --git a/cpp/include/rapidsmpf/communicator/metadata_payload_exchange/tag.hpp b/cpp/include/rapidsmpf/communicator/metadata_payload_exchange/tag.hpp index f712c0c1e..376428363 100644 --- a/cpp/include/rapidsmpf/communicator/metadata_payload_exchange/tag.hpp +++ b/cpp/include/rapidsmpf/communicator/metadata_payload_exchange/tag.hpp @@ -83,6 +83,7 @@ class TagMetadataPayloadExchange : public MetadataPayloadExchange { * @copydoc MetadataPayloadExchange::is_idle */ bool is_idle() const override; + bool finished_polling() const override; private: /** diff --git a/cpp/include/rapidsmpf/shuffler/chunk.hpp b/cpp/include/rapidsmpf/shuffler/chunk.hpp index 75106e1cb..efcb57a0d 100644 --- a/cpp/include/rapidsmpf/shuffler/chunk.hpp +++ b/cpp/include/rapidsmpf/shuffler/chunk.hpp @@ -228,24 +228,17 @@ class Chunk { * @brief Create a chunk by deserializing a metadata message. * * @param msg The metadata message received from another rank. - * @param br Buffer resource for allocating the data buffer of the deserialized - * message. Ignored when @p data is provided. + * @param br Buffer resource for allocating a the data buffer of the deserialized + * message. * @param validate Whether to validate the metadata buffer. - * @param data Optional pre-existing data buffer to use instead of allocating a new - * one. When non-null, the buffer resource allocation is skipped entirely, avoiding - * unnecessary memory pressure from a temporary allocation. * @return The chunk. * - * @throws std::logic_error if the chunk is not a control message and neither @p data - * nor @p br is provided. - * @throws std::runtime_error if the metadata buffer does not follow the expected - * format and @p validate is true. + * @throws std::logic_error if the chunk is not a control message and no buffer + * resource is provided. @throws std::runtime_error if the metadata buffer does not + * follow the expected format and `validate` is true. */ static Chunk deserialize( - std::vector const& msg, - BufferResource* br, - bool validate = true, - std::unique_ptr data = nullptr + std::vector const& msg, BufferResource* br, bool validate = true ); /** diff --git a/cpp/include/rapidsmpf/shuffler/shuffler.hpp b/cpp/include/rapidsmpf/shuffler/shuffler.hpp index f0d8d00ad..3b4d3247a 100644 --- a/cpp/include/rapidsmpf/shuffler/shuffler.hpp +++ b/cpp/include/rapidsmpf/shuffler/shuffler.hpp @@ -16,7 +16,6 @@ #include #include -#include #include #include #include @@ -119,8 +118,6 @@ class Shuffler { * @param br Buffer resource used to allocate temporary and the shuffle result. * @param finished_callback Callback to notify when all partitions are finished. * @param partition_owner Function to determine partition ownership. - * @param mpe Optional custom metadata payload exchange. If not provided, - * uses the default tag-based implementation. * * @note It is safe to reuse the `op_id` as soon as `wait` has completed * locally. @@ -135,8 +132,7 @@ class Shuffler { PartID total_num_partitions, BufferResource* br, FinishedCallback&& finished_callback, - PartitionOwner partition_owner = round_robin, - std::unique_ptr mpe = nullptr + PartitionOwner partition_owner = round_robin ); /** @@ -148,8 +144,6 @@ class Shuffler { * @param total_num_partitions Total number of partitions in the shuffle. * @param br Buffer resource used to allocate temporary and the shuffle result. * @param partition_owner Function to determine partition ownership. - * @param mpe Optional custom metadata payload exchange. If not provided, - * uses the default tag-based implementation. * * @note The caller promises that inserted buffers are stream-ordered with respect * to their own stream, and extracted buffers are likewise guaranteed to be stream- @@ -160,18 +154,9 @@ class Shuffler { OpID op_id, PartID total_num_partitions, BufferResource* br, - PartitionOwner partition_owner = round_robin, - std::unique_ptr mpe = nullptr + PartitionOwner partition_owner = round_robin ) - : Shuffler( - comm, - op_id, - total_num_partitions, - br, - nullptr, - partition_owner, - std::move(mpe) - ) {} + : Shuffler(comm, op_id, total_num_partitions, br, nullptr, partition_owner) {} ~Shuffler(); @@ -328,12 +313,12 @@ class Shuffler { // Flipped to true exactly once when partitions are ready for extraction and we've // posted all sends we're going to bool can_extract_{false}; + OpID const op_id_; detail::ChunksToSend to_send_; ///< Storage for chunks to send to other ranks. detail::ReceivedChunks received_; ///< Storage for received chunks that are ///< ready to be extracted by the user. std::shared_ptr comm_; - std::unique_ptr mpe_; ProgressThread::FunctionID progress_thread_function_id_; SpillManager::SpillFunctionID spill_function_id_; diff --git a/cpp/src/communicator/metadata_payload_exchange/tag.cpp b/cpp/src/communicator/metadata_payload_exchange/tag.cpp index c94467c5f..bcb6bfd4d 100644 --- a/cpp/src/communicator/metadata_payload_exchange/tag.cpp +++ b/cpp/src/communicator/metadata_payload_exchange/tag.cpp @@ -140,7 +140,8 @@ void TagMetadataPayloadExchange::finish() { // exactly how many application messages we sent to it, so the peer can // stop receiving once it has them all. // Format: [sentinel=UINT64_MAX (8 bytes)][message_count (8 bytes)] - for (Rank peer = 0; peer < nranks_; ++peer) { + for (Rank i = 0; i < nranks_; ++i) { + auto const peer = (i + rank_) % nranks_; if (peer == rank_) { continue; } @@ -158,6 +159,22 @@ void TagMetadataPayloadExchange::finish() { } } +bool TagMetadataPayloadExchange::finished_polling() const { + if (finished_) { + for (Rank peer = 0; peer < nranks_; ++peer) { + if (peer == rank_) { + continue; + } + auto const p = safe_cast(peer); + if (!peer_terminated_[p] || peer_received_[p] < peer_expected_[p]) { + return false; + } + } + return true; + } + return false; +} + bool TagMetadataPayloadExchange::is_idle() const { bool const io_idle = fire_and_forget_.empty() && incoming_messages_.empty() && in_transit_messages_.empty() && in_transit_futures_.empty(); @@ -184,7 +201,8 @@ void TagMetadataPayloadExchange::receive_metadata() { // Use per-peer recv_from to avoid consuming messages belonging to a future // collective on the same tag (see rapidsai/rapidsmpf#927). - for (Rank peer = 0; peer < nranks_; ++peer) { + for (Rank i = 0; i < nranks_; ++i) { + auto const peer = (i + rank_) % nranks_; if (peer == rank_) { continue; } @@ -316,9 +334,6 @@ TagMetadataPayloadExchange::setup_data_receives() { ); // Store in per-rank vector to maintain order in_transit_messages_[src].push_back(std::move(tag_message)); - // Break to ensure we don't return later messages before this one - // completes - break; } else { // Control/metadata-only message // Only return if there are no earlier in-transit messages from this rank diff --git a/cpp/src/shuffler/chunk.cpp b/cpp/src/shuffler/chunk.cpp index ea50edaac..4ce212ed2 100644 --- a/cpp/src/shuffler/chunk.cpp +++ b/cpp/src/shuffler/chunk.cpp @@ -55,10 +55,7 @@ Chunk Chunk::from_finished_partition( } Chunk Chunk::deserialize( - std::vector const& msg, - BufferResource* br, - bool validate, - std::unique_ptr data + std::vector const& msg, BufferResource* br, bool validate ) { if (validate) { RAPIDSMPF_EXPECTS( @@ -91,7 +88,8 @@ Chunk Chunk::deserialize( msg.begin() + safe_cast(offset), msg.end() ); - if (!data && expected_num_chunks == 0) { + std::unique_ptr data; + if (expected_num_chunks == 0) { RAPIDSMPF_EXPECTS( br != nullptr, "Deserializing non-control Chunk requires a BufferResource" ); diff --git a/cpp/src/shuffler/shuffler.cpp b/cpp/src/shuffler/shuffler.cpp index 2c81a40e1..e47c4c770 100644 --- a/cpp/src/shuffler/shuffler.cpp +++ b/cpp/src/shuffler/shuffler.cpp @@ -13,8 +13,6 @@ #include #include -#include -#include #include #include #include @@ -27,45 +25,6 @@ namespace rapidsmpf::shuffler { using namespace detail; -namespace { - -/** - * @brief Convert chunks into messages for communication. - * - * This function converts a vector of chunks into messages suitable for sending - * through the metadata payload exchange. Each chunk is serialized and its data - * buffer is released to create the message. - * - * @param chunks Vector of chunks to convert (will be moved from). - * @param peer_rank_fn Function to determine the destination rank for each chunk. - * - * @return A vector of message unique pointers ready to be sent. - */ -template -std::vector> -convert_chunks_to_messages( - std::vector&& chunks, PeerRankFn&& peer_rank_fn -) { - std::vector> messages; - messages.reserve(chunks.size()); - - for (auto&& chunk : chunks) { - auto dst = peer_rank_fn(chunk); - auto metadata = std::move(*chunk.serialize()); - auto data = chunk.release_data_buffer(); - - messages.push_back( - std::make_unique( - dst, std::move(metadata), std::move(data) - ) - ); - } - - return messages; -} - -} // namespace - class Shuffler::Progress { public: /** @@ -73,7 +32,10 @@ class Shuffler::Progress { * * @param shuffler Reference to the shuffler instance that this will progress. */ - Progress(Shuffler& shuffler) : shuffler_(shuffler) {} + Progress(Shuffler& shuffler) + : shuffler_(shuffler), + peer_received_(safe_cast(shuffler.comm_->nranks()), 0), + peer_expected_(safe_cast(shuffler.comm_->nranks()), 0) {} /** * @brief Executes a single iteration of the shuffler's event loop. @@ -89,6 +51,11 @@ class Shuffler::Progress { RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE("Shuffler.Progress", p_iters++); auto const t0_event_loop = Clock::now(); + // Tags for each stage of the shuffle + Tag const metadata_tag{shuffler_.op_id_, 0}; + Tag const gpu_data_tag{shuffler_.op_id_, 1}; + + auto& log = *shuffler_.comm_->logger(); auto& stats = *shuffler_.statistics_; // Submit outgoing chunks to the metadata payload exchange @@ -96,97 +63,166 @@ class Shuffler::Progress { auto const t0_submit_outgoing = Clock::now(); auto ready_chunks = shuffler_.to_send_.extract_ready(); RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE("submit_outgoing", ready_chunks.size()); - - if (!ready_chunks.empty()) { - auto peer_rank_fn = [&shuffler = - shuffler_](detail::Chunk const& chunk) -> Rank { - auto dst = shuffler.partition_owner( - shuffler.comm_, chunk.part_id(), shuffler.total_num_partitions - ); - shuffler.comm_->logger()->trace( - "submitting message to ", dst, ": ", chunk - ); - RAPIDSMPF_EXPECTS( - dst != shuffler.comm_->rank(), "sending message to ourselves" - ); - return dst; - }; - - for (auto const& chunk : ready_chunks) { - if (chunk.data_size() > 0) { - stats.add_bytes_stat("shuffle-payload-send", chunk.data_size()); - } + for (auto&& chunk : ready_chunks) { + auto dst = shuffler_.partition_owner( + shuffler_.comm_, chunk.part_id(), shuffler_.total_num_partitions + ); + RAPIDSMPF_EXPECTS( + dst != shuffler_.comm_->rank(), "sending message to ourselves" + ); + fire_and_forget_.push_back( + shuffler_.comm_->send(chunk.serialize(), dst, metadata_tag) + ); + if (chunk.data_size() > 0) { + stats.add_bytes_stat("shuffle-payload-send", chunk.data_size()); + fire_and_forget_.push_back(shuffler_.comm_->send( + chunk.release_data_buffer(), dst, gpu_data_tag + )); } - - auto messages = - convert_chunks_to_messages(std::move(ready_chunks), peer_rank_fn); - - shuffler_.mpe_->send(std::move(messages)); } + stats.add_duration_stat( "event-loop-submit-outgoing", Clock::now() - t0_submit_outgoing ); } - - // Process all communication operations and get completed chunks + // Receive incoming metadata of remote chunks and place them in + // `incoming_chunks_`, using per-peer recv_from to avoid consuming + // messages belonging to a future collective on the same tag. { - auto const t0_process_comm = Clock::now(); - RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE("process_communication"); - - shuffler_.mpe_->progress(); - auto completed_messages = shuffler_.mpe_->recv(); - - for (auto&& message : completed_messages) { - auto chunk = detail::Chunk::deserialize( - message->metadata(), shuffler_.br_, false, message->release_data() - ); + auto const t0_metadata_recv = Clock::now(); + RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE("meta_recv"); + [[maybe_unused]] int recv_from_iters = + 0; // this will be stripped off if RAPIDSMPF_VERBOSE_INFO is not set + if (!shuffler_.local_partitions_.empty()) { + auto const rank = shuffler_.comm_->rank(); + auto const size = shuffler_.comm_->nranks(); + for (Rank i = 0; i < size; ++i) { + auto const peer = (i + rank) % size; + if (peer == rank) { + continue; + } + auto const p = safe_cast(peer); + while (peer_expected_[p] == 0 + || peer_received_[p] < peer_expected_[p]) + { + auto msg = shuffler_.comm_->recv_from(peer, metadata_tag); + if (!msg) { + break; + } + auto chunk = + Chunk::deserialize(*msg, shuffler_.br_, /*validate=*/false); + log.trace("recv_from ", peer, ": ", chunk); + peer_received_[p]++; + if (chunk.is_control_message()) { + peer_expected_[p] = chunk.expected_num_chunks(); + } else { + RAPIDSMPF_EXPECTS( + shuffler_.partition_owner( + shuffler_.comm_, + chunk.part_id(), + shuffler_.total_num_partitions + ) == shuffler_.comm_->rank(), + "receiving chunk not owned by us" + ); + } + incoming_chunks_[peer].push_back(std::move(chunk)); + recv_from_iters++; + } + } + } - RAPIDSMPF_EXPECTS( - shuffler_.partition_owner( - shuffler_.comm_, chunk.part_id(), shuffler_.total_num_partitions - ) == shuffler_.comm_->rank(), - "receiving chunk not owned by us" - ); + stats.add_duration_stat( + "event-loop-metadata-recv", Clock::now() - t0_metadata_recv + ); + RAPIDSMPF_NVTX_MARKER_VERBOSE("meta_recv_iters", recv_from_iters); + } + // Post receives for incoming chunks. Note that we start the allocation of chunks + // in received message order, but because the allocations run on different streams + // they might not complete and be ready in that order. To handle that, we separate + // incoming chunks by rank and then process chunks in FIFO order until we observe + // a non-ready chunk. + { + auto const t0_post_incoming_chunk_recv = Clock::now(); + for (auto& [src, chunks] : incoming_chunks_) { + RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE("post_chunk_recv", chunks.size()); + std::ptrdiff_t n_processed = 0; + for (auto& chunk : chunks) { + log.trace("checking incoming chunk data from ", src, ": ", chunk); - if (chunk.data_size() > 0) { - stats.add_bytes_stat("shuffle-payload-recv", chunk.data_size()); + if (chunk.data_size() > 0) { + if (!chunk.is_ready()) { + break; + } + auto chunk_id = chunk.chunk_id(); + auto data_size = chunk.data_size(); + // Setup to receive the chunk into `in_transit_*`. + auto future = shuffler_.comm_->recv( + src, gpu_data_tag, chunk.release_data_buffer() + ); + RAPIDSMPF_EXPECTS( + in_transit_futures_.emplace(chunk_id, std::move(future)) + .second, + "in transit future already exist" + ); + RAPIDSMPF_EXPECTS( + in_transit_chunks_.emplace(chunk_id, std::move(chunk)).second, + "in transit chunk already exist" + ); + shuffler_.statistics_->add_bytes_stat( + "shuffle-payload-recv", data_size + ); + } else { + // Control messages and metadata-only messages go + // directly to the ready postbox. + shuffler_.insert_into_received(std::move(chunk)); + } + n_processed++; } - - shuffler_.insert_into_received(std::move(chunk)); + chunks.erase(chunks.begin(), chunks.begin() + n_processed); } stats.add_duration_stat( - "event-loop-process-communication", Clock::now() - t0_process_comm + "event-loop-post-incoming-chunk-recv", + Clock::now() - t0_post_incoming_chunk_recv ); } - stats.add_duration_stat("event-loop-total", Clock::now() - t0_event_loop); - - // Signal the MPE that no more messages will be sent once all application - // messages have been flushed from to_send_ into the MPE. - if (!mpe_finish_called_ - && shuffler_.locally_finished_.load(std::memory_order_acquire) - && shuffler_.to_send_.empty()) + // Check if any data in transit is finished. { - shuffler_.mpe_->finish(); - mpe_finish_called_ = true; + auto const t0_check_future_finish = Clock::now(); + RAPIDSMPF_NVTX_SCOPED_RANGE_VERBOSE( + "check_fut_finish", in_transit_futures_.size() + ); + if (!in_transit_futures_.empty()) { + std::vector finished = + shuffler_.comm_->test_some(in_transit_futures_); + for (auto cid : finished) { + auto chunk = extract_value(in_transit_chunks_, cid); + auto future = extract_value(in_transit_futures_, cid); + chunk.set_data_buffer( + shuffler_.comm_->release_data(std::move(future)) + ); + shuffler_.insert_into_received(std::move(chunk)); + } + } + // Check if we can free some of the outstanding futures. + if (!fire_and_forget_.empty()) { + std::ignore = shuffler_.comm_->test_some(fire_and_forget_); + } + stats.add_duration_stat( + "event-loop-check-future-finish", Clock::now() - t0_check_future_finish + ); } - - // There are no messages to be posted, or waiting to be completed. - bool const containers_empty = - shuffler_.mpe_->is_idle() && shuffler_.to_send_.empty(); // We've inserted a finish message and we've received everything we expect. bool const is_finished = shuffler_.locally_finished_.load(std::memory_order_acquire) - && shuffler_.finish_counter_.all_finished(); - // Finished and shuffler is no longer active. - bool const is_done = !shuffler_.active_.load(std::memory_order_acquire) - && is_finished && containers_empty; + // There are no messages to be posted, or waiting to be completed. + && shuffler_.to_send_.empty() && shuffler_.finish_counter_.all_finished(); // Signal can_extract_ when all chunks have been received and all internal // containers are drained. If we own no partitions we "can-extract" immediately, // but we only wake a waiter once we've drained internal containers so that we can // reuse the op_id for a subsequent shuffle. - if (!shuffler_.can_extract_ && is_finished && containers_empty) { + if (!shuffler_.can_extract_ && is_finished) { { std::lock_guard lock(shuffler_.mutex_); shuffler_.can_extract_ = true; @@ -196,13 +232,32 @@ class Shuffler::Progress { callback(); } } + bool const containers_empty = + is_finished && fire_and_forget_.empty() && in_transit_chunks_.empty() + && in_transit_futures_.empty() + && std::ranges::all_of(incoming_chunks_, [](auto const& kv) { + return kv.second.empty(); + }); + // Finished and shuffler is no longer active. + bool const is_done = + !shuffler_.active_.load(std::memory_order_acquire) && containers_empty; + stats.add_duration_stat("event-loop-total", Clock::now() - t0_event_loop); return is_done ? ProgressThread::ProgressState::Done : ProgressThread::ProgressState::InProgress; } private: Shuffler& shuffler_; - bool mpe_finish_called_{false}; + std::vector peer_received_; ///< Messages received per rank. + std::vector peer_expected_; ///< Total expected from rank (0 = unknown). + std::vector> + fire_and_forget_; ///< Ongoing "fire-and-forget" operations (non-blocking sends). + std::unordered_map> + incoming_chunks_; ///< Per-rank FIFO of chunks awaiting receive. + std::unordered_map + in_transit_chunks_; ///< Chunks currently in transit. + std::unordered_map> + in_transit_futures_; ///< Futures corresponding to in-transit chunks. #if RAPIDSMPF_VERBOSE_INFO std::int64_t p_iters = 0; ///< Number of progress iterations (for NVTX) @@ -229,29 +284,15 @@ Shuffler::Shuffler( PartID total_num_partitions, BufferResource* br, FinishedCallback&& finished_callback, - PartitionOwner partition_owner_fn, - std::unique_ptr mpe + PartitionOwner partition_owner_fn ) : total_num_partitions{total_num_partitions}, partition_owner{std::move(partition_owner_fn)}, br_{br}, + op_id_{op_id}, to_send_{}, received_{safe_cast(total_num_partitions)}, comm_{std::move(comm)}, - mpe_{ - mpe ? std::move(mpe) - : std::make_unique( - comm_, - op_id, - [this](std::size_t size) -> std::unique_ptr { - return br_->allocate( - br_->stream_pool().get_stream(), - br_->reserve_or_fail(size, MEMORY_TYPES) - ); - }, - br_->statistics() - ) - }, local_partitions_{local_partitions(comm_, total_num_partitions, partition_owner)}, finish_counter_{comm_->nranks(), safe_cast(local_partitions_.size())}, outbound_chunk_counter_(safe_cast(comm_->nranks()), 0),