diff --git a/cpp/include/rmm/mr/detail/fixed_size_memory_resource_impl.hpp b/cpp/include/rmm/mr/detail/fixed_size_memory_resource_impl.hpp index c651f5077..8951af77a 100644 --- a/cpp/include/rmm/mr/detail/fixed_size_memory_resource_impl.hpp +++ b/cpp/include/rmm/mr/detail/fixed_size_memory_resource_impl.hpp @@ -12,12 +12,18 @@ #include #include +#include #include #include #include namespace RMM_NAMESPACE { namespace mr { + +// Forward declarations for friend access from detail::fixed_size_memory_resource_impl +class fixed_size_memory_resource; +class multiple_blocks_allocation; + namespace detail { /** @@ -79,6 +85,12 @@ class fixed_size_memory_resource_impl final std::pair free_list_summary(free_list const& blocks); private: + friend class RMM_NAMESPACE::mr::multiple_blocks_allocation; + + // Caller must hold get_mutex(). + [[nodiscard]] cudaError_t deallocate_blocks_async_unsafe(std::vector&& blocks, + cuda_stream_view stream); + free_list blocks_from_upstream(cuda_stream_view stream); void release(); diff --git a/cpp/include/rmm/mr/detail/stream_ordered_memory_resource.hpp b/cpp/include/rmm/mr/detail/stream_ordered_memory_resource.hpp index e8de8b65e..8da377a16 100644 --- a/cpp/include/rmm/mr/detail/stream_ordered_memory_resource.hpp +++ b/cpp/include/rmm/mr/detail/stream_ordered_memory_resource.hpp @@ -290,7 +290,7 @@ class stream_ordered_memory_resource : public crtp { bool operator<(stream_event_pair const& rhs) const { return event < rhs.event; } }; - private: + protected: /** * @brief get a unique CUDA event (possibly new) associated with `stream` * diff --git a/cpp/include/rmm/mr/fixed_size_memory_resource.hpp b/cpp/include/rmm/mr/fixed_size_memory_resource.hpp index 5ba97813d..459f8f4b2 100644 --- a/cpp/include/rmm/mr/fixed_size_memory_resource.hpp +++ b/cpp/include/rmm/mr/fixed_size_memory_resource.hpp @@ -9,8 +9,12 @@ #include #include +#include +#include #include +#include +#include namespace RMM_NAMESPACE { namespace mr { @@ -84,6 +88,150 @@ class RMM_EXPORT fixed_size_memory_resource static_assert(cuda::mr::resource_with, "fixed_size_memory_resource does not satisfy the cuda::mr::resource concept"); +/** + * @brief RAII handle for an allocation that may span multiple fixed-size blocks from a + * `fixed_size_memory_resource`. + * + * When destroyed, all blocks are returned to the memory resource on the same stream used for + * allocation. Copy is disabled to prevent double deallocation; move transfers ownership of the + * blocks. Holds a `fixed_size_memory_resource` (which has shared, refcounted ownership of the + * underlying pool) so the pool outlives the handle. + */ +class RMM_EXPORT multiple_blocks_allocation { + public: + /** + * @brief Allocate device memory spanning one or more fixed-size blocks, stream-ordered on a + * non-PTDS stream. + * + * Use this for allocations larger than a single block. The allocation is ordered on + * `stream`; deallocation (when the returned handle is destroyed) is also ordered on + * the same stream. A single event is recorded for the whole allocation, so there is no + * per-block event overhead. + * + * @param mr The `fixed_size_memory_resource` that supplies blocks. Copied by value since + * `fixed_size_memory_resource` has refcounted shared ownership. + * @param size Minimum number of bytes to allocate. Will be rounded up to a multiple of + * block size (see `get_block_size()` on `*mr`). + * @param stream A non-PTDS CUDA stream on which the allocation is ordered. + * @return Unique handle to the allocation; destroys to deallocate. Empty (zero-size) + * allocation returns a valid handle with size 0 and no blocks. + * @throw rmm::invalid_argument if `stream` is a per-thread default stream. + * @throw Any exception from allocating blocks. Blocks successfully taken from the pool + * before the failure are returned to the pool on `stream` (same ordering as normal + * deallocation). + */ + [[nodiscard]] static std::unique_ptr make_async( + fixed_size_memory_resource mr, std::size_t size, cuda::stream_ref stream); + + /** + * @brief Destroy this handle and return any held blocks to the pool. + * + * `noexcept`. Uses `deallocate_blocks_async_unsafe` under the pool mutex; CUDA errors are + * logged with `RMM_LOG_ERROR` and other exceptions during teardown are caught and logged. + */ + ~multiple_blocks_allocation() noexcept; + + multiple_blocks_allocation(multiple_blocks_allocation const&) = delete; + multiple_blocks_allocation& operator=(multiple_blocks_allocation const&) = delete; + + /** + * @brief Move-constructor + * + * @param other Source handle to move from. + */ + multiple_blocks_allocation(multiple_blocks_allocation&& other) noexcept; + + /** + * @brief Move-assignment + * @param other Source handle to move from. + * @return Reference to `*this`. + * @throw rmm::cuda_error if returning the current blocks to the pool fails during `clear()`. + */ + multiple_blocks_allocation& operator=(multiple_blocks_allocation&& other); + + /** + * @brief Number of bytes requested for this allocation. + * + * @return Requested size in bytes. + */ + [[nodiscard]] constexpr std::size_t size() const noexcept { return size_; } + + /** + * @brief Total capacity in bytes (number of blocks × block size). + * + * @return Capacity in bytes; always >= size(). + */ + [[nodiscard]] std::size_t capacity() const noexcept { return block_size() * blocks_.size(); } + + /** + * @brief Size in bytes of each block in this allocation. + * + * @return Block size (same as the memory resource's get_block_size()). + */ + [[nodiscard]] std::size_t block_size() const noexcept { return mr_->get_block_size(); } + + /** + * @brief Non-owning view of the underlying block pointers. + * + * @return Span of device pointers, one per block; each block has size block_size(). + */ + [[nodiscard]] cuda::std::span get_blocks() const noexcept + { + return {blocks_.data(), blocks_.size()}; + } + + /** + * @brief Span over the i-th block's bytes. + * + * @param i Block index in [0, get_blocks().size()). + * @return Span of std::byte over the i-th block. + */ + [[nodiscard]] cuda::std::span operator[](std::size_t i) const + { + return {blocks_[i], mr_->get_block_size()}; + } + + /** + * @brief Span over the i-th block's bytes with bounds checking. + * + * @param i Block index. + * @return Span of std::byte over the i-th block. + * @throws std::out_of_range if i >= number of blocks. + */ + [[nodiscard]] cuda::std::span at(std::size_t i) const + { + return {blocks_.at(i), mr_->get_block_size()}; + } + + /** + * @brief Stream on which this allocation is ordered. + * + * @return The stream passed to make_async. + */ + [[nodiscard]] constexpr cuda::stream_ref stream() const noexcept { return stream_; } + + /** + * @brief Return all blocks to the pool on `stream()`, then leave this handle empty. + * + * Same ordering as destruction: stream-ordered deallocation on the stream passed to + * `make_async`. After `clear()`, `size()` is 0 and `get_blocks()` is empty. + * + * @throw rmm::cuda_error if the event recording fails. + */ + void clear(); + + private: + multiple_blocks_allocation(std::size_t size, + std::vector buffers, + cuda::stream_ref stream, + fixed_size_memory_resource mr) noexcept; + + std::vector blocks_; + std::size_t size_; + cuda::stream_ref stream_; + fixed_size_memory_resource mr_; +}; + /** @} */ // end of group } // namespace mr } // namespace RMM_NAMESPACE diff --git a/cpp/src/mr/detail/fixed_size_memory_resource_impl.cpp b/cpp/src/mr/detail/fixed_size_memory_resource_impl.cpp index e17f6ecdd..ac9b74f63 100644 --- a/cpp/src/mr/detail/fixed_size_memory_resource_impl.cpp +++ b/cpp/src/mr/detail/fixed_size_memory_resource_impl.cpp @@ -7,8 +7,11 @@ #include #include #include +#include #include +#include +#include #include #include @@ -107,6 +110,23 @@ std::pair fixed_size_memory_resource_impl::free_list_s : std::make_pair(block_size_, blocks.size() * block_size_); } +cudaError_t fixed_size_memory_resource_impl::deallocate_blocks_async_unsafe( + std::vector&& blocks, cuda_stream_view stream) +{ + if (blocks.empty()) { return cudaSuccess; } + + free_list blocks_free_list; + cuda::std::ranges::for_each(blocks, [this, &blocks_free_list](std::byte* ptr) { + blocks_free_list.insert(this->free_block(ptr, get_block_size())); + }); + + auto stream_event = get_event(stream); + cudaError_t const error = cudaEventRecord(stream_event.event, stream.value()); + if (cudaSuccess != error) { return error; } + this->insert_blocks(std::move(blocks_free_list), stream); + return cudaSuccess; +} + #ifdef RMM_DEBUG_PRINT void fixed_size_memory_resource_impl::print() { diff --git a/cpp/src/mr/fixed_size_memory_resource.cpp b/cpp/src/mr/fixed_size_memory_resource.cpp index 6f2275d2a..b97c23436 100644 --- a/cpp/src/mr/fixed_size_memory_resource.cpp +++ b/cpp/src/mr/fixed_size_memory_resource.cpp @@ -3,9 +3,17 @@ * SPDX-License-Identifier: Apache-2.0 */ +#include +#include +#include +#include #include +#include + #include +#include +#include namespace RMM_NAMESPACE { namespace mr { @@ -29,5 +37,90 @@ std::size_t fixed_size_memory_resource::get_block_size() const noexcept return get().get_block_size(); } +// multiple_blocks_allocation + +multiple_blocks_allocation::multiple_blocks_allocation(std::size_t size, + std::vector buffers, + cuda::stream_ref stream, + fixed_size_memory_resource mr) noexcept + : blocks_(std::move(buffers)), size_(size), stream_(stream), mr_(std::move(mr)) +{ +} + +multiple_blocks_allocation::multiple_blocks_allocation(multiple_blocks_allocation&& other) noexcept + : blocks_(std::move(other.blocks_)), + size_(other.size_), + stream_(other.stream_), + mr_(std::move(other.mr_)) +{ + other.size_ = 0; +} + +void multiple_blocks_allocation::clear() +{ + if (!blocks_.empty()) { + std::lock_guard lock(mr_->get_mutex()); + RMM_CUDA_TRY(mr_->deallocate_blocks_async_unsafe(std::move(blocks_), stream_)); + } + size_ = 0; +} + +multiple_blocks_allocation& multiple_blocks_allocation::operator=( + multiple_blocks_allocation&& other) +{ + if (this != &other) { + clear(); + blocks_ = std::move(other.blocks_); + size_ = other.size_; + stream_ = other.stream_; + mr_ = std::move(other.mr_); + other.size_ = 0; + } + return *this; +} + +multiple_blocks_allocation::~multiple_blocks_allocation() noexcept +{ + try { + clear(); + } catch (...) { + RMM_LOG_ERROR( + "multiple_blocks_allocation: exception while releasing device blocks in destructor"); + } +} + +std::unique_ptr multiple_blocks_allocation::make_async( + fixed_size_memory_resource mr, std::size_t size, cuda::stream_ref stream) +{ + RMM_EXPECTS(!cuda_stream_view{stream}.is_per_thread_default(), + "stream must not be a per-thread default stream", + rmm::invalid_argument); + + if (size == 0) { + return std::unique_ptr( + new multiple_blocks_allocation(0, {}, stream, std::move(mr))); + } + + auto& self = *mr; + std::lock_guard lock(self.get_mutex()); + + auto stream_event = self.get_event(stream); + std::size_t const num_blocks = cuda::ceil_div(size, self.get_block_size()); + std::vector blocks; + blocks.reserve(num_blocks); + try { + for (std::size_t i = 0; i < num_blocks; ++i) { + blocks.push_back( + static_cast(self.get_block(self.get_block_size(), stream_event).pointer())); + } + } catch (...) { + RMM_CUDA_TRY(self.deallocate_blocks_async_unsafe(std::move(blocks), stream)); + throw; + } + + return std::unique_ptr( + new multiple_blocks_allocation(size, std::move(blocks), stream, std::move(mr))); +} + } // namespace mr } // namespace RMM_NAMESPACE diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index d56a5a258..07e3cd1fe 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -145,6 +145,7 @@ ConfigureTest(SYSTEM_MR_REF_TEST mr/mr_ref_system_tests.cpp) ConfigureTest(PINNED_MR_REF_TEST mr/mr_ref_pinned_tests.cpp) ConfigureTest(LOGGING_MR_REF_TEST mr/mr_ref_logging_tests.cpp) ConfigureTest(FIXED_SIZE_MR_REF_TEST mr/mr_ref_fixed_size_tests.cpp) +ConfigureTest(FIXED_SIZE_MR_TEST mr/fixed_size_mr_test.cpp) ConfigureTest(DEFAULT_MR_REF_TEST mr/mr_ref_default_tests.cpp) # general adaptor tests diff --git a/cpp/tests/mr/fixed_size_mr_test.cpp b/cpp/tests/mr/fixed_size_mr_test.cpp new file mode 100644 index 000000000..ae70187ef --- /dev/null +++ b/cpp/tests/mr/fixed_size_mr_test.cpp @@ -0,0 +1,183 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights + * reserved. SPDX-License-Identifier: Apache-2.0 + */ + +#include "../byte_literals.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + +using namespace rmm::test; + +namespace { + +struct FixedSizeMRTestParam { + cuda::mr::any_resource upstream; + std::string name; ///< Name of the memory resource. + std::size_t block_size{0}; ///< Block size in bytes. + std::size_t size{0}; ///< Allocation size in bytes. + std::size_t n_threads{1}; ///< Number of threads to use for the test. + std::size_t n_streams{1}; ///< Number of streams to use for the test. +}; + +std::vector make_fixed_size_mr_test_params() +{ + std::vector params; + + auto add_params = [&](cuda::mr::any_resource upstream, + std::string const& name) { + for (std::size_t block_sz : {std::size_t{256}, std::size_t{1_KiB}}) { + for (std::size_t size : {std::size_t{0}, block_sz / 2, block_sz, std::size_t{3_KiB}}) { + for (std::size_t n_threads : std::vector{1, 2, 4}) { + for (std::size_t n_streams : std::vector{1, 2, 4}) { + params.emplace_back(upstream, name, block_sz, size, n_threads, n_streams); + } + } + } + } + }; + + add_params(rmm::mr::cuda_memory_resource{}, "Cuda"); + add_params(rmm::mr::cuda_async_memory_resource{}, "CudaAsync"); + + return params; +} + +} // namespace + +class FixedSizeMRTest : public ::testing::TestWithParam { + protected: + using stats_mr = rmm::mr::statistics_resource_adaptor; + using fixed_size_mr = rmm::mr::fixed_size_memory_resource; + using multi_block_alloc = rmm::mr::multiple_blocks_allocation; + + std::size_t expected_blocks() + { + auto const& param = GetParam(); + if (param.size == 0) { return std::size_t{0}; } + return cuda::ceil_div(param.size, param.block_size); + } +}; + +TEST_P(FixedSizeMRTest, AllocAndDeallocBlocksAsync) +{ + auto const& param = GetParam(); + constexpr std::size_t blocks_to_preallocate = 1; + + // statistics_resource_adaptor is itself a shared_resource: copying it shares the same pool. + auto counting = stats_mr(param.upstream); + + { + auto fixed_mr = fixed_size_mr(counting, param.block_size, blocks_to_preallocate); + + rmm::cuda_stream_pool stream_pool{param.n_streams, rmm::cuda_stream::flags::non_blocking}; + + std::size_t const alloc_size = param.size; + std::size_t const n_threads = param.n_threads; + std::size_t const block_size = fixed_mr.get_block_size(); + EXPECT_EQ(block_size, param.block_size); + + constexpr int num_allocations = 16; + std::mutex handles_mutex; + std::vector> handles; + + std::vector> alloc_futs; + alloc_futs.reserve(param.n_threads); + + // each thread allocates num_allocations allocations + for (std::size_t i = 0; i < n_threads; ++i) { + alloc_futs.emplace_back(std::async(std::launch::async, [&] { + for (int i = 0; i < num_allocations; ++i) { + rmm::cuda_stream_view stream = stream_pool.get_stream(); + auto handle = multi_block_alloc::make_async(fixed_mr, alloc_size, stream); + EXPECT_EQ(handle->size(), alloc_size); + EXPECT_EQ(handle->capacity(), expected_blocks() * block_size); + + // enqueue a dummy cudamemsetasync + int dummy = 0; + cuda::std::ranges::for_each(handle->get_blocks(), [&](auto& block) { + RMM_CUDA_TRY(cudaMemsetAsync(block, (dummy++) & 0xFF, block_size, stream.value())); + }); + + { + std::lock_guard lock(handles_mutex); + handles.emplace_back(std::move(handle)); + } + } + })); + } + + // wait for all allocations to complete + cuda::std::ranges::for_each(alloc_futs, [](auto& fut) { fut.get(); }); + + // Note that stream pool is not sync'ed. The counter & driver should be able to account for the + // allocations without sync. + auto const bytes_after_alloc = counting.get_bytes_counter().value; + + if (expected_blocks() > 0) { + EXPECT_GE( + bytes_after_alloc, + static_cast(expected_blocks() * num_allocations * block_size * n_threads)); + } + + // deallocate using multiple threads + std::vector> dealloc_futs; + dealloc_futs.reserve(param.n_threads); + for (std::size_t i = 0; i < n_threads; ++i) { + dealloc_futs.emplace_back(std::async(std::launch::async, [&] { + while (true) { + std::unique_ptr handle; + { + std::lock_guard lock(handles_mutex); + if (handles.empty()) { break; } + handle = std::move(handles.back()); + handles.pop_back(); + } + handle.reset(); + } + })); + } + + // wait for all deallocations to complete + cuda::std::ranges::for_each(dealloc_futs, [](auto& fut) { fut.get(); }); + + EXPECT_EQ(counting.get_bytes_counter().value, bytes_after_alloc) + << "After deallocate, upstream bytes must be unchanged until fixed_size_mr is destroyed"; + + // finally sync the stream pool + for (size_t i = 0; i < param.n_streams; ++i) { + stream_pool.get_stream(i).synchronize(); + } + } + + EXPECT_EQ(counting.get_bytes_counter().value, 0) + << "After fixed_size pool destruction, upstream should have released all memory"; +} + +INSTANTIATE_TEST_SUITE_P(FixedSizeMRTests, + FixedSizeMRTest, + ::testing::ValuesIn(make_fixed_size_mr_test_params()), + [](testing::TestParamInfo const& info) { + return info.param.name + "_bs" + std::to_string(info.param.block_size) + + "_sz" + std::to_string(info.param.size) + "_nt" + + std::to_string(info.param.n_threads) + "_ns" + + std::to_string(info.param.n_streams); + });