From 9d4e4dc66955646f63910e936dc85e13bdeb3ea0 Mon Sep 17 00:00:00 2001 From: "Gregory R. Lee" Date: Tue, 17 Feb 2026 12:26:20 -0500 Subject: [PATCH 01/26] add typedefs for AmSendMemoryTypePolicy and AmSendParams --- cpp/include/ucxx/typedefs.h | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/cpp/include/ucxx/typedefs.h b/cpp/include/ucxx/typedefs.h index bda188dc0..7a114492b 100644 --- a/cpp/include/ucxx/typedefs.h +++ b/cpp/include/ucxx/typedefs.h @@ -168,6 +168,33 @@ class AmReceiverCallbackInfo { AmReceiverCallbackInfo(const AmReceiverCallbackOwnerType owner, AmReceiverCallbackIdType id); }; +/** + * @brief Policy used to allocate receive buffers for Active Messages. + * + * Active Message receive allocations can be strict (error if no allocator is registered for + * sender-provided memory type) or permissive (fallback to host allocation). + */ +enum class AmSendMemoryTypePolicy { + FallbackToHost = 0, ///< If no allocator exists for memory type, fallback to host memory. + ErrorOnUnsupported, ///< If no allocator exists for memory type, fail with unsupported error. +}; + +/** + * @brief Parameters controlling Active Message send behavior. + * + * This object is used by the extended Active Message API to expose UCX send knobs without + * breaking existing callers. + */ +struct AmSendParams { + uint32_t flags{UCP_AM_SEND_FLAG_REPLY}; ///< UCP AM send flags. + ucp_datatype_t datatype{ucp_dt_make_contig(1)}; ///< Datatype used by `ucp_am_send_nbx`. + ucs_memory_type_t memoryType{UCS_MEMORY_TYPE_HOST}; ///< Sender memory type hint. + AmSendMemoryTypePolicy memoryTypePolicy{ + AmSendMemoryTypePolicy::FallbackToHost}; ///< Receiver allocation policy. + std::optional receiverCallbackInfo{ + std::nullopt}; ///< Optional receiver callback metadata. +}; + /** * @brief Serialized form of a remote key. * From 8e8b02ab94c931eb1396bf3449064e8eb44f8248 Mon Sep 17 00:00:00 2001 From: "Gregory R. 
Lee" Date: Tue, 17 Feb 2026 12:28:01 -0500 Subject: [PATCH 02/26] add contiguous + IOV constructors for AM requests --- cpp/include/ucxx/request_data.h | 25 ++++++++++++++++------ cpp/src/request_data.cpp | 38 +++++++++++++++++++++++++++++---- 2 files changed, 53 insertions(+), 10 deletions(-) diff --git a/cpp/include/ucxx/request_data.h b/cpp/include/ucxx/request_data.h index 057936484..993600a15 100644 --- a/cpp/include/ucxx/request_data.h +++ b/cpp/include/ucxx/request_data.h @@ -29,8 +29,14 @@ namespace data { class AmSend { public: const void* const _buffer{nullptr}; ///< The raw pointer where data to be sent is stored. - const size_t _length{0}; ///< The length of the message. + const size_t _length{0}; ///< Message length in bytes (contiguous datatype only). + const std::vector _iov{}; ///< Segments for IOV datatype. + const size_t _count{0}; ///< Element count according to selected datatype. + const uint32_t _flags{UCP_AM_SEND_FLAG_REPLY}; ///< UCP AM send flags. + const ucp_datatype_t _datatype{ucp_dt_make_contig(1)}; ///< UCP datatype. const ucs_memory_type_t _memoryType{UCS_MEMORY_TYPE_HOST}; ///< Memory type used on the operation + const AmSendMemoryTypePolicy _memoryTypePolicy{ + AmSendMemoryTypePolicy::FallbackToHost}; ///< Receiver allocation policy. const std::optional _receiverCallbackInfo{ std::nullopt}; ///< Owner name and unique identifier of the receiver callback. @@ -41,14 +47,21 @@ class AmSend { * * @param[in] buffer a raw pointer to the data to be sent. * @param[in] length the size in bytes of the message to be sent. - * @param[in] memoryType the memory type of the buffer. - * @param[in] receiverCallbackInfo the owner name and unique identifier of the receiver - callback. + * @param[in] params send parameters controlling datatype/flags/policies. 
*/ explicit AmSend(const decltype(_buffer) buffer, const decltype(_length) length, - const decltype(_memoryType) memoryType = UCS_MEMORY_TYPE_HOST, - const decltype(_receiverCallbackInfo) receiverCallbackInfo = std::nullopt); + const AmSendParams& params = AmSendParams{}); + + /** + * @brief Constructor for Active Message-specific send data using IOV datatype. + * + * Construct an object containing Active Message-specific send data for `UCP_DATATYPE_IOV`. + * + * @param[in] iov vector of IOV segments to send. + * @param[in] params send parameters controlling datatype/flags/policies. + */ + explicit AmSend(const decltype(_iov)& iov, const AmSendParams& params = AmSendParams{}); AmSend() = delete; }; diff --git a/cpp/src/request_data.cpp b/cpp/src/request_data.cpp index d4abb0bbc..573c5af4e 100644 --- a/cpp/src/request_data.cpp +++ b/cpp/src/request_data.cpp @@ -21,13 +21,43 @@ namespace data { AmSend::AmSend(const void* const buffer, const size_t length, - const ucs_memory_type memoryType, - const std::optional receiverCallbackInfo) + const AmSendParams& params) : _buffer(buffer), _length(length), - _memoryType(memoryType), - _receiverCallbackInfo(receiverCallbackInfo) + _iov(), + _count(length), + _flags(params.flags), + _datatype(params.datatype), + _memoryType(params.memoryType), + _memoryTypePolicy(params.memoryTypePolicy), + _receiverCallbackInfo(params.receiverCallbackInfo) { + if (_datatype != ucp_dt_make_contig(1)) + throw std::runtime_error("Contiguous AM send requires datatype `ucp_dt_make_contig(1)`."); + + if (_buffer == nullptr && _length > 0) throw std::runtime_error("Buffer cannot be a nullptr."); +} + +AmSend::AmSend(const std::vector& iov, const AmSendParams& params) + : _buffer(nullptr), + _length(0), + _iov(iov), + _count(iov.size()), + _flags(params.flags), + _datatype(params.datatype), + _memoryType(params.memoryType), + _memoryTypePolicy(params.memoryTypePolicy), + _receiverCallbackInfo(params.receiverCallbackInfo) +{ + if (_datatype != 
UCP_DATATYPE_IOV) + throw std::runtime_error("IOV AM send requires datatype `UCP_DATATYPE_IOV`."); + + if (_iov.empty()) throw std::runtime_error("IOV cannot be empty."); + + for (const auto& segment : _iov) { + if (segment.buffer == nullptr && segment.length > 0) + throw std::runtime_error("IOV segment buffer cannot be nullptr when segment length is > 0."); + } } AmReceive::AmReceive() {} From f7c82100fb80cd4a8353abb0ed155ac76a3678f2 Mon Sep 17 00:00:00 2001 From: "Gregory R. Lee" Date: Tue, 17 Feb 2026 12:28:36 -0500 Subject: [PATCH 03/26] add Endpoint overloads for AmSendParams and std::vector IOV sends --- cpp/include/ucxx/endpoint.h | 46 ++++++++++++++++++++++++++ cpp/src/endpoint.cpp | 36 +++++++++++++++++++- cpp/src/request_am.cpp | 65 ++++++++++++++++++++++++++----------- 3 files changed, 127 insertions(+), 20 deletions(-) diff --git a/cpp/include/ucxx/endpoint.h b/cpp/include/ucxx/endpoint.h index 8416b51e6..2f413911a 100644 --- a/cpp/include/ucxx/endpoint.h +++ b/cpp/include/ucxx/endpoint.h @@ -376,6 +376,52 @@ class Endpoint : public Component { RequestCallbackUserFunction callbackFunction = nullptr, RequestCallbackUserData callbackData = nullptr); + /** + * @brief Enqueue an active message send operation with explicit policy parameters. + * + * This overload extends `amSend()` with explicit UCX datatype/flags controls and receive + * allocation policy metadata while keeping callback behavior identical to the legacy API. + * + * @param[in] buffer a raw pointer to the data to be sent. + * @param[in] length the size in bytes of the message to be sent. + * @param[in] params active message send parameters. + * @param[in] enablePythonFuture whether a python future should be created and + * subsequently notified. + * @param[in] callbackFunction user-defined callback function to call upon completion. + * @param[in] callbackData user-defined data to pass to the `callbackFunction`. 
+ * + * @returns Request to be subsequently checked for the completion and its state. + */ + [[nodiscard]] std::shared_ptr amSend( + const void* const buffer, + const size_t length, + const AmSendParams& params, + const bool enablePythonFuture = false, + RequestCallbackUserFunction callbackFunction = nullptr, + RequestCallbackUserData callbackData = nullptr); + + /** + * @brief Enqueue an active message send operation with IOV datatype. + * + * This overload submits `UCP_DATATYPE_IOV` active message sends. + * + * @param[in] iov vector of IOV segments to be sent. + * @param[in] params active message send parameters. Datatype must be + * `UCP_DATATYPE_IOV`. + * @param[in] enablePythonFuture whether a python future should be created and + * subsequently notified. + * @param[in] callbackFunction user-defined callback function to call upon completion. + * @param[in] callbackData user-defined data to pass to the `callbackFunction`. + * + * @returns Request to be subsequently checked for the completion and its state. + */ + [[nodiscard]] std::shared_ptr amSend( + const std::vector& iov, + const AmSendParams& params, + const bool enablePythonFuture = false, + RequestCallbackUserFunction callbackFunction = nullptr, + RequestCallbackUserData callbackData = nullptr); + /** * @brief Enqueue an active message receive operation. 
* diff --git a/cpp/src/endpoint.cpp b/cpp/src/endpoint.cpp index 876a2d86c..75a25b4f1 100644 --- a/cpp/src/endpoint.cpp +++ b/cpp/src/endpoint.cpp @@ -449,11 +449,45 @@ std::shared_ptr Endpoint::amSend( const bool enablePythonFuture, RequestCallbackUserFunction callbackFunction, RequestCallbackUserData callbackData) +{ + auto params = AmSendParams{}; + params.memoryType = memoryType; + params.receiverCallbackInfo = receiverCallbackInfo; + + return amSend(buffer, + length, + params, + enablePythonFuture, + callbackFunction, + callbackData); +} + +std::shared_ptr Endpoint::amSend(const void* const buffer, + const size_t length, + const AmSendParams& params, + const bool enablePythonFuture, + RequestCallbackUserFunction callbackFunction, + RequestCallbackUserData callbackData) +{ + auto endpoint = std::dynamic_pointer_cast(shared_from_this()); + return registerInflightRequest( + createRequestAm(endpoint, + data::AmSend(buffer, length, params), + enablePythonFuture, + callbackFunction, + callbackData)); +} + +std::shared_ptr Endpoint::amSend(const std::vector& iov, + const AmSendParams& params, + const bool enablePythonFuture, + RequestCallbackUserFunction callbackFunction, + RequestCallbackUserData callbackData) { auto endpoint = std::dynamic_pointer_cast(shared_from_this()); return registerInflightRequest( createRequestAm(endpoint, - data::AmSend(buffer, length, memoryType, receiverCallbackInfo), + data::AmSend(iov, params), enablePythonFuture, callbackFunction, callbackData)); diff --git a/cpp/src/request_am.cpp b/cpp/src/request_am.cpp index 1c2995a65..cdf9acbf0 100644 --- a/cpp/src/request_am.cpp +++ b/cpp/src/request_am.cpp @@ -32,6 +32,7 @@ typedef std::string AmHeaderSerialized; struct AmHeader { ucs_memory_type_t memoryType; + AmSendMemoryTypePolicy memoryTypePolicy; std::optional receiverCallbackInfo; static AmHeader deserialize(const std::string_view serialized) @@ -49,6 +50,7 @@ struct AmHeader { bool hasReceiverCallback{false}; decode(&hasReceiverCallback, 
sizeof(hasReceiverCallback)); + std::optional receiverCallbackInfo = std::nullopt; if (hasReceiverCallback) { size_t ownerSize{0}; decode(&ownerSize, sizeof(ownerSize)); @@ -59,11 +61,19 @@ struct AmHeader { AmReceiverCallbackIdType id{}; decode(&id, sizeof(id)); - return AmHeader{.memoryType = memoryType, - .receiverCallbackInfo = AmReceiverCallbackInfo(owner, id)}; + receiverCallbackInfo = AmReceiverCallbackInfo(owner, id); } - return AmHeader{.memoryType = memoryType, .receiverCallbackInfo = std::nullopt}; + AmSendMemoryTypePolicy memoryTypePolicy = AmSendMemoryTypePolicy::FallbackToHost; + if (offset + sizeof(uint8_t) <= serialized.size()) { + uint8_t serializedMemoryTypePolicy{0}; + decode(&serializedMemoryTypePolicy, sizeof(serializedMemoryTypePolicy)); + memoryTypePolicy = static_cast(serializedMemoryTypePolicy); + } + + return AmHeader{.memoryType = memoryType, + .memoryTypePolicy = memoryTypePolicy, + .receiverCallbackInfo = receiverCallbackInfo}; } const AmHeaderSerialized serialize() const @@ -73,8 +83,10 @@ struct AmHeader { const size_t ownerSize = (receiverCallbackInfo) ? receiverCallbackInfo->owner.size() : 0; const size_t amReceiverCallbackInfoSize = (receiverCallbackInfo) ? 
sizeof(ownerSize) + ownerSize + sizeof(receiverCallbackInfo->id) : 0; + const uint8_t serializedMemoryTypePolicy = static_cast(memoryTypePolicy); const size_t totalSize = - sizeof(memoryType) + sizeof(hasReceiverCallback) + amReceiverCallbackInfoSize; + sizeof(memoryType) + sizeof(hasReceiverCallback) + amReceiverCallbackInfoSize + + sizeof(serializedMemoryTypePolicy); std::string serialized(totalSize, 0); auto encode = [&offset, &serialized](void const* data, size_t bytes) { @@ -89,6 +101,7 @@ struct AmHeader { encode(receiverCallbackInfo->owner.c_str(), ownerSize); encode(&receiverCallbackInfo->id, sizeof(receiverCallbackInfo->id)); } + encode(&serializedMemoryTypePolicy, sizeof(serializedMemoryTypePolicy)); return serialized; } @@ -278,15 +291,17 @@ ucs_status_t RequestAm::recvCallback(void* arg, if (is_rndv) { if (amData->_allocators.find(amHeader.memoryType) == amData->_allocators.end()) { - // TODO: Is a hard failure better? - // ucxx_debug("Unsupported memory type %d", amHeader.memoryType); - // internal::RecvAmMessage recvAmMessage(amData, ep, req, nullptr); - // recvAmMessage.callback(nullptr, UCS_ERR_UNSUPPORTED); - // return UCS_ERR_UNSUPPORTED; - - ucxx_trace_req("No allocator registered for memory type %u, falling back to host memory.", - amHeader.memoryType); - amHeader.memoryType = UCS_MEMORY_TYPE_HOST; + if (amHeader.memoryTypePolicy == AmSendMemoryTypePolicy::ErrorOnUnsupported) { + ucxx_debug("No allocator registered for memory type %u and strict policy is active", + amHeader.memoryType); + internal::RecvAmMessage recvAmMessage(amData, ep, req, nullptr, receiverCallback); + recvAmMessage.callback(nullptr, UCS_ERR_UNSUPPORTED); + return UCS_ERR_UNSUPPORTED; + } else { + ucxx_trace_req("No allocator registered for memory type %u, falling back to host memory.", + amHeader.memoryType); + amHeader.memoryType = UCS_MEMORY_TYPE_HOST; + } } try { @@ -408,20 +423,25 @@ void RequestAm::request() ucp_request_param_t param = {.op_attr_mask = 
UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_FLAGS | UCP_OP_ATTR_FIELD_USER_DATA, - .flags = UCP_AM_SEND_FLAG_REPLY, - .datatype = ucp_dt_make_contig(1), + .flags = amSend._flags, + .datatype = amSend._datatype, .user_data = this}; param.cb.send = _amSendCallback; AmHeader header = {.memoryType = amSend._memoryType, + .memoryTypePolicy = amSend._memoryTypePolicy, .receiverCallbackInfo = amSend._receiverCallbackInfo}; _header = header.serialize(); - void* request = ucp_am_send_nbx(_endpoint->getHandle(), + const void* sendBuffer = + (amSend._datatype == UCP_DATATYPE_IOV) + ? reinterpret_cast<const void*>(amSend._iov.data()) + : amSend._buffer; + void* request = ucp_am_send_nbx(_endpoint->getHandle(), 0, _header.data(), _header.size(), - amSend._buffer, - amSend._length, + sendBuffer, + amSend._count, &param); std::lock_guard lock(_mutex); @@ -477,7 +497,14 @@ void RequestAm::populateDelayedSubmission() std::visit(data::dispatch{ [this, &log](data::AmSend amSend) { - log(amSend._buffer, amSend._length, amSend._memoryType); + if (amSend._datatype == UCP_DATATYPE_IOV) { + size_t totalLength{0}; + for (const auto& segment : amSend._iov) + totalLength += segment.length; + log(amSend._iov.data(), totalLength, amSend._memoryType); + } else { + log(amSend._buffer, amSend._length, amSend._memoryType); + } }, [](auto) { throw std::runtime_error("Unreachable"); }, }, From 88b6cc9fcb15c9d4dc1e9b5a356232ddc80db2ed Mon Sep 17 00:00:00 2001 From: "Gregory R.
Lee" Date: Tue, 17 Feb 2026 12:29:35 -0500 Subject: [PATCH 04/26] add test cases for host IOV AM send add test for strict memory policy unsupported-path --- cpp/tests/request.cpp | 87 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/cpp/tests/request.cpp b/cpp/tests/request.cpp index fa7fb1c18..fe41d070f 100644 --- a/cpp/tests/request.cpp +++ b/cpp/tests/request.cpp @@ -202,6 +202,93 @@ TEST_P(RequestTest, ProgressAm) ASSERT_THAT(_recv[0], ContainerEq(_send[0])); } +TEST_P(RequestTest, ProgressAmIovHost) +{ + if (_progressMode == ProgressMode::Wait) { + GTEST_SKIP() << "Interrupting UCP worker progress operation in wait mode is not possible"; + } + + if (_memoryType != UCS_MEMORY_TYPE_HOST) { + GTEST_SKIP() << "IOV test uses host buffers for deterministic validation"; + } + + const size_t messageLength = std::max(4, _messageLength); + std::vector send(messageLength); + std::iota(send.begin(), send.end(), 0); + + const size_t firstSegmentLength = messageLength / 2; + const size_t secondSegmentLength = messageLength - firstSegmentLength; + std::vector iov(2); + iov[0].buffer = send.data(); + iov[0].length = firstSegmentLength * sizeof(int); + iov[1].buffer = send.data() + firstSegmentLength; + iov[1].length = secondSegmentLength * sizeof(int); + + auto amSendParams = ucxx::AmSendParams{}; + amSendParams.datatype = UCP_DATATYPE_IOV; + amSendParams.memoryType = UCS_MEMORY_TYPE_HOST; + + std::vector> requests; + requests.push_back(_ep->amSend(iov, amSendParams)); + requests.push_back(_ep->amRecv()); + waitRequests(_worker, requests, _progressWorker); + + auto recvReq = requests[1]; + auto recvBuffer = recvReq->getRecvBuffer(); + ASSERT_EQ(recvBuffer->getType(), ucxx::BufferType::Host); + ASSERT_EQ(recvBuffer->getSize(), messageLength * sizeof(int)); + + std::vector recv(reinterpret_cast(recvBuffer->data()), + reinterpret_cast(recvBuffer->data()) + messageLength); + ASSERT_THAT(recv, ContainerEq(send)); +} + +TEST_P(RequestTest, 
ProgressAmIovValidation) +{ + auto amSendParams = ucxx::AmSendParams{}; + amSendParams.datatype = UCP_DATATYPE_IOV; + amSendParams.memoryType = UCS_MEMORY_TYPE_HOST; + + EXPECT_THROW(std::ignore = _ep->amSend(std::vector{}, amSendParams), + std::runtime_error); + + std::vector iovWithNullBuffer(1); + iovWithNullBuffer[0].buffer = nullptr; + iovWithNullBuffer[0].length = 16; + EXPECT_THROW(std::ignore = _ep->amSend(iovWithNullBuffer, amSendParams), std::runtime_error); + + std::vector send{1, 2, 3, 4}; + std::vector validIov(1); + validIov[0].buffer = send.data(); + validIov[0].length = send.size() * sizeof(send[0]); + + auto wrongDatatypeParams = amSendParams; + wrongDatatypeParams.datatype = ucp_dt_make_contig(1); + EXPECT_THROW(std::ignore = _ep->amSend(validIov, wrongDatatypeParams), std::runtime_error); +} + +TEST_P(RequestTest, ProgressAmMemoryTypePolicyStrict) +{ + if (_progressMode == ProgressMode::Wait) { + GTEST_SKIP() << "Interrupting UCP worker progress operation in wait mode is not possible"; + } + + const size_t bytes = std::max(_rndvThresh + 128, sizeof(int)); + std::vector send(bytes, 42); + + auto amSendParams = ucxx::AmSendParams{}; + amSendParams.memoryType = UCS_MEMORY_TYPE_CUDA; + amSendParams.memoryTypePolicy = ucxx::AmSendMemoryTypePolicy::ErrorOnUnsupported; + + std::vector> requests; + requests.push_back(_ep->amSend(send.data(), send.size(), amSendParams)); + requests.push_back(_ep->amRecv()); + waitRequests(_worker, requests, _progressWorker); + + ASSERT_EQ(requests[0]->getStatus(), UCS_OK); + ASSERT_EQ(requests[1]->getStatus(), UCS_ERR_UNSUPPORTED); +} + TEST_P(RequestTest, ProgressAmReceiverCallback) { if (_progressMode == ProgressMode::Wait) { From c0897292c610dff6dff390555e3b4e3d92c6a88c Mon Sep 17 00:00:00 2001 From: "Gregory R. 
Lee" Date: Tue, 17 Feb 2026 16:56:19 -0500 Subject: [PATCH 05/26] remove const from AmReceiverCallbackInfo members so copy and move constructors are created --- cpp/include/ucxx/typedefs.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/include/ucxx/typedefs.h b/cpp/include/ucxx/typedefs.h index 7a114492b..0f263d896 100644 --- a/cpp/include/ucxx/typedefs.h +++ b/cpp/include/ucxx/typedefs.h @@ -154,8 +154,8 @@ typedef uint64_t AmReceiverCallbackIdType; */ class AmReceiverCallbackInfo { public: - const AmReceiverCallbackOwnerType owner; ///< The owner name of the callback - const AmReceiverCallbackIdType id; ///< The unique identifier of the callback + AmReceiverCallbackOwnerType owner; ///< The owner name of the callback + AmReceiverCallbackIdType id; ///< The unique identifier of the callback AmReceiverCallbackInfo() = delete; From 87c169cdbf5920867e4990bb81fa9a4c66eacb3d Mon Sep 17 00:00:00 2001 From: "Gregory R. Lee" Date: Tue, 17 Feb 2026 16:57:43 -0500 Subject: [PATCH 06/26] Fix segfault in IOV sends (request_am.cpp) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The std::visit lambda was taking data::AmSend amSend by value, creating a temporary copy. For IOV sends, sendBuffer pointed to amSend._iov.data() — the copy's vector storage. When the lambda returned, the copy was destroyed, but ucp_am_send_nbx is async and still needed the IOV descriptors, causing a use-after-free. Changed to const data::AmSend& amSend so it references the original data in _requestData, which lives as long as the RequestAm object. 
Add UCP_OP_ATTR_FIELD_DATATYPE so UCX doesn't ignore the .datatype field --- cpp/src/request_am.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/src/request_am.cpp b/cpp/src/request_am.cpp index cdf9acbf0..847c38dc6 100644 --- a/cpp/src/request_am.cpp +++ b/cpp/src/request_am.cpp @@ -419,8 +419,9 @@ void RequestAm::request() { std::visit( data::dispatch{ - [this](data::AmSend amSend) { + [this](const data::AmSend& amSend) { ucp_request_param_t param = {.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | + UCP_OP_ATTR_FIELD_DATATYPE | UCP_OP_ATTR_FIELD_FLAGS | UCP_OP_ATTR_FIELD_USER_DATA, .flags = amSend._flags, From f4c69e27d73fa7ab65822303e39f5a3264552e6e Mon Sep 17 00:00:00 2001 From: "Gregory R. Lee" Date: Tue, 17 Feb 2026 17:00:08 -0500 Subject: [PATCH 07/26] update failing test case --- cpp/tests/request.cpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/cpp/tests/request.cpp b/cpp/tests/request.cpp index fe41d070f..02ecd06c6 100644 --- a/cpp/tests/request.cpp +++ b/cpp/tests/request.cpp @@ -283,9 +283,14 @@ TEST_P(RequestTest, ProgressAmMemoryTypePolicyStrict) std::vector> requests; requests.push_back(_ep->amSend(send.data(), send.size(), amSendParams)); requests.push_back(_ep->amRecv()); - waitRequests(_worker, requests, _progressWorker); - ASSERT_EQ(requests[0]->getStatus(), UCS_OK); + // Wait for completion without calling checkError(), since the receive request + // is expected to complete with UCS_ERR_UNSUPPORTED. + while (!requests[0]->isCompleted() || !requests[1]->isCompleted()) + _progressWorker(); + + // When the receiver rejects a rendezvous transfer, UCX propagates the error to + // both sides, so the send may also complete with UCS_ERR_UNSUPPORTED. ASSERT_EQ(requests[1]->getStatus(), UCS_ERR_UNSUPPORTED); } From f4bad746fc983601d7b575d5a4a4640af5bd250b Mon Sep 17 00:00:00 2001 From: "Gregory R. 
Lee" Date: Wed, 18 Feb 2026 08:01:57 -0500 Subject: [PATCH 08/26] support opaque userHeader field in AmHeader this is needed to include host-side information on tensor shape, strides, dtype etc when using I/O Vector (iov) APIs --- cpp/include/ucxx/internal/request_am.h | 3 +- cpp/include/ucxx/request.h | 11 +++ cpp/include/ucxx/request_am.h | 2 + cpp/include/ucxx/request_data.h | 2 + cpp/include/ucxx/typedefs.h | 3 +- cpp/src/internal/request_am.cpp | 8 +- cpp/src/request.cpp | 2 + cpp/src/request_am.cpp | 41 ++++++++-- cpp/src/request_data.cpp | 6 +- cpp/tests/request.cpp | 101 +++++++++++++++++++++++++ 10 files changed, 167 insertions(+), 12 deletions(-) diff --git a/cpp/include/ucxx/internal/request_am.h b/cpp/include/ucxx/internal/request_am.h index 5900a4e79..485cdc5e1 100644 --- a/cpp/include/ucxx/internal/request_am.h +++ b/cpp/include/ucxx/internal/request_am.h @@ -62,7 +62,8 @@ class RecvAmMessage { ucp_ep_h ep, std::shared_ptr request, std::shared_ptr buffer, - AmReceiverCallbackType receiverCallback = AmReceiverCallbackType()); + AmReceiverCallbackType receiverCallback = AmReceiverCallbackType(), + std::string userHeader = {}); /** * @brief Set the UCP request. diff --git a/cpp/include/ucxx/request.h b/cpp/include/ucxx/request.h index 9e237bf0e..2b12a6cb9 100644 --- a/cpp/include/ucxx/request.h +++ b/cpp/include/ucxx/request.h @@ -224,6 +224,17 @@ class Request : public Component { * @return The received buffer (if applicable) or `nullptr`. */ [[nodiscard]] virtual std::shared_ptr getRecvBuffer(); + + /** + * @brief Get the received user header. + * + * This method is used to get the user-defined header bytes for applicable derived classes + * (e.g., `RequestAm` receive operations), in all other cases this will return an empty + * string. + * + * @return The received user header (if applicable) or an empty string. 
+ */ + [[nodiscard]] virtual std::string getRecvHeader(); }; } // namespace ucxx diff --git a/cpp/include/ucxx/request_am.h b/cpp/include/ucxx/request_am.h index 533610a7e..c90a6108b 100644 --- a/cpp/include/ucxx/request_am.h +++ b/cpp/include/ucxx/request_am.h @@ -161,6 +161,8 @@ class RequestAm : public Request { const ucp_am_recv_param_t* param); [[nodiscard]] std::shared_ptr getRecvBuffer() override; + + [[nodiscard]] std::string getRecvHeader() override; }; } // namespace ucxx diff --git a/cpp/include/ucxx/request_data.h b/cpp/include/ucxx/request_data.h index 993600a15..0d0be945b 100644 --- a/cpp/include/ucxx/request_data.h +++ b/cpp/include/ucxx/request_data.h @@ -39,6 +39,7 @@ class AmSend { AmSendMemoryTypePolicy::FallbackToHost}; ///< Receiver allocation policy. const std::optional _receiverCallbackInfo{ std::nullopt}; ///< Owner name and unique identifier of the receiver callback. + const std::string _userHeader{}; ///< Opaque user-defined header (arbitrary bytes, not necessarily text). /** * @brief Constructor for Active Message-specific send data. @@ -75,6 +76,7 @@ class AmSend { class AmReceive { public: std::shared_ptr<::ucxx::Buffer> _buffer{nullptr}; ///< The AM received message buffer + std::string _userHeader{}; ///< User-defined header from the sender (arbitrary bytes, not necessarily text). /** * @brief Constructor for Active Message-specific receive data. diff --git a/cpp/include/ucxx/typedefs.h b/cpp/include/ucxx/typedefs.h index 0f263d896..34b7e1564 100644 --- a/cpp/include/ucxx/typedefs.h +++ b/cpp/include/ucxx/typedefs.h @@ -192,7 +192,8 @@ struct AmSendParams { AmSendMemoryTypePolicy memoryTypePolicy{ AmSendMemoryTypePolicy::FallbackToHost}; ///< Receiver allocation policy. std::optional receiverCallbackInfo{ - std::nullopt}; ///< Optional receiver callback metadata. + std::nullopt}; ///< Optional receiver callback metadata. + std::string userHeader{}; ///< Opaque user-defined header (arbitrary bytes, not necessarily text). 
}; /** diff --git a/cpp/src/internal/request_am.cpp b/cpp/src/internal/request_am.cpp index 88db30363..6508ad67c 100644 --- a/cpp/src/internal/request_am.cpp +++ b/cpp/src/internal/request_am.cpp @@ -18,11 +18,15 @@ RecvAmMessage::RecvAmMessage(internal::AmData* amData, ucp_ep_h ep, std::shared_ptr request, std::shared_ptr buffer, - AmReceiverCallbackType receiverCallback) + AmReceiverCallbackType receiverCallback, + std::string userHeader) : _amData(amData), _ep(ep), _request(request) { std::visit(data::dispatch{ - [this, buffer](data::AmReceive& amReceive) { amReceive._buffer = buffer; }, + [this, buffer, &userHeader](data::AmReceive& amReceive) { + amReceive._buffer = buffer; + amReceive._userHeader = std::move(userHeader); + }, [](auto) { throw std::runtime_error("Unreachable"); }, }, _request->_requestData); diff --git a/cpp/src/request.cpp b/cpp/src/request.cpp index 2283e27f2..1551e4f23 100644 --- a/cpp/src/request.cpp +++ b/cpp/src/request.cpp @@ -242,4 +242,6 @@ const std::string& Request::getOwnerString() const { return _ownerString; } std::shared_ptr Request::getRecvBuffer() { return nullptr; } +std::string Request::getRecvHeader() { return {}; } + } // namespace ucxx diff --git a/cpp/src/request_am.cpp b/cpp/src/request_am.cpp index 847c38dc6..78a4f2dc5 100644 --- a/cpp/src/request_am.cpp +++ b/cpp/src/request_am.cpp @@ -34,6 +34,7 @@ struct AmHeader { ucs_memory_type_t memoryType; AmSendMemoryTypePolicy memoryTypePolicy; std::optional receiverCallbackInfo; + std::string userHeader; ///< Opaque user-defined header (arbitrary bytes, not necessarily text). 
static AmHeader deserialize(const std::string_view serialized) { @@ -71,9 +72,20 @@ struct AmHeader { memoryTypePolicy = static_cast(serializedMemoryTypePolicy); } + std::string userHeader{}; + if (offset + sizeof(size_t) <= serialized.size()) { + size_t userHeaderSize{0}; + decode(&userHeaderSize, sizeof(userHeaderSize)); + if (userHeaderSize > 0 && offset + userHeaderSize <= serialized.size()) { + userHeader.resize(userHeaderSize); + decode(userHeader.data(), userHeaderSize); + } + } + return AmHeader{.memoryType = memoryType, .memoryTypePolicy = memoryTypePolicy, - .receiverCallbackInfo = receiverCallbackInfo}; + .receiverCallbackInfo = receiverCallbackInfo, + .userHeader = std::move(userHeader)}; } const AmHeaderSerialized serialize() const @@ -84,9 +96,10 @@ struct AmHeader { const size_t amReceiverCallbackInfoSize = (receiverCallbackInfo) ? sizeof(ownerSize) + ownerSize + sizeof(receiverCallbackInfo->id) : 0; const uint8_t serializedMemoryTypePolicy = static_cast(memoryTypePolicy); + const size_t userHeaderSize = userHeader.size(); const size_t totalSize = sizeof(memoryType) + sizeof(hasReceiverCallback) + amReceiverCallbackInfoSize + - sizeof(serializedMemoryTypePolicy); + sizeof(serializedMemoryTypePolicy) + sizeof(userHeaderSize) + userHeaderSize; std::string serialized(totalSize, 0); auto encode = [&offset, &serialized](void const* data, size_t bytes) { @@ -102,6 +115,8 @@ struct AmHeader { encode(&receiverCallbackInfo->id, sizeof(receiverCallbackInfo->id)); } encode(&serializedMemoryTypePolicy, sizeof(serializedMemoryTypePolicy)); + encode(&userHeaderSize, sizeof(userHeaderSize)); + if (userHeaderSize > 0) { encode(userHeader.data(), userHeaderSize); } return serialized; } @@ -294,7 +309,8 @@ ucs_status_t RequestAm::recvCallback(void* arg, if (amHeader.memoryTypePolicy == AmSendMemoryTypePolicy::ErrorOnUnsupported) { ucxx_debug("No allocator registered for memory type %u and strict policy is active", amHeader.memoryType); - internal::RecvAmMessage 
recvAmMessage(amData, ep, req, nullptr, receiverCallback); + internal::RecvAmMessage recvAmMessage( + amData, ep, req, nullptr, receiverCallback, amHeader.userHeader); recvAmMessage.callback(nullptr, UCS_ERR_UNSUPPORTED); return UCS_ERR_UNSUPPORTED; } else { @@ -311,7 +327,8 @@ ucs_status_t RequestAm::recvCallback(void* arg, } auto recvAmMessage = - std::make_shared(amData, ep, req, buf, receiverCallback); + std::make_shared(amData, ep, req, buf, receiverCallback, + amHeader.userHeader); ucp_request_param_t requestParam = {.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA | @@ -370,7 +387,8 @@ ucs_status_t RequestAm::recvCallback(void* arg, } else { buf = amData->_allocators.at(UCS_MEMORY_TYPE_HOST)(length); - internal::RecvAmMessage recvAmMessage(amData, ep, req, buf, receiverCallback); + internal::RecvAmMessage recvAmMessage(amData, ep, req, buf, receiverCallback, + amHeader.userHeader); if (buf == nullptr) { ucxx_debug("Failed to allocate %lu bytes of memory", length); recvAmMessage._request->setStatus(UCS_ERR_NO_MEMORY); @@ -415,6 +433,16 @@ std::shared_ptr RequestAm::getRecvBuffer() _requestData); } +std::string RequestAm::getRecvHeader() +{ + return std::visit( + data::dispatch{ + [](const data::AmReceive& amReceive) { return amReceive._userHeader; }, + [](auto) -> std::string { return {}; }, + }, + _requestData); +} + void RequestAm::request() { std::visit( @@ -431,7 +459,8 @@ void RequestAm::request() param.cb.send = _amSendCallback; AmHeader header = {.memoryType = amSend._memoryType, .memoryTypePolicy = amSend._memoryTypePolicy, - .receiverCallbackInfo = amSend._receiverCallbackInfo}; + .receiverCallbackInfo = amSend._receiverCallbackInfo, + .userHeader = amSend._userHeader}; _header = header.serialize(); const void* sendBuffer = (amSend._datatype == UCP_DATATYPE_IOV) diff --git a/cpp/src/request_data.cpp b/cpp/src/request_data.cpp index 573c5af4e..1ffe140cb 100644 --- a/cpp/src/request_data.cpp +++ b/cpp/src/request_data.cpp @@ 
-30,7 +30,8 @@ AmSend::AmSend(const void* const buffer, _datatype(params.datatype), _memoryType(params.memoryType), _memoryTypePolicy(params.memoryTypePolicy), - _receiverCallbackInfo(params.receiverCallbackInfo) + _receiverCallbackInfo(params.receiverCallbackInfo), + _userHeader(params.userHeader) { if (_datatype != ucp_dt_make_contig(1)) throw std::runtime_error("Contiguous AM send requires datatype `ucp_dt_make_contig(1)`."); @@ -47,7 +48,8 @@ AmSend::AmSend(const std::vector& iov, const AmSendParams& params) _datatype(params.datatype), _memoryType(params.memoryType), _memoryTypePolicy(params.memoryTypePolicy), - _receiverCallbackInfo(params.receiverCallbackInfo) + _receiverCallbackInfo(params.receiverCallbackInfo), + _userHeader(params.userHeader) { if (_datatype != UCP_DATATYPE_IOV) throw std::runtime_error("IOV AM send requires datatype `UCP_DATATYPE_IOV`."); diff --git a/cpp/tests/request.cpp b/cpp/tests/request.cpp index 02ecd06c6..5e2acdc8b 100644 --- a/cpp/tests/request.cpp +++ b/cpp/tests/request.cpp @@ -356,6 +356,107 @@ TEST_P(RequestTest, ProgressAmReceiverCallback) ASSERT_THAT(_recv[0], ContainerEq(_send[0])); } +TEST_P(RequestTest, ProgressAmUserHeader) +{ + if (_progressMode == ProgressMode::Wait) { + GTEST_SKIP() << "Interrupting UCP worker progress operation in wait mode is not possible"; + } + + if (_memoryType != UCS_MEMORY_TYPE_HOST) { + GTEST_SKIP() << "User header test uses host buffers only"; + } + + allocate(1, false); + + const std::string sentHeader = "test-header-payload-\x00\x01\x02\xff"; + + auto amSendParams = ucxx::AmSendParams{}; + amSendParams.userHeader = sentHeader; + + std::vector> requests; + requests.push_back(_ep->amSend(_sendPtr[0], _messageSize, amSendParams)); + requests.push_back(_ep->amRecv()); + waitRequests(_worker, requests, _progressWorker); + + auto recvReq = requests[1]; + ASSERT_EQ(recvReq->getRecvHeader(), sentHeader); + + _recvPtr[0] = recvReq->getRecvBuffer()->data(); + copyResults(); + ASSERT_THAT(_recv[0], 
ContainerEq(_send[0])); +} + +TEST_P(RequestTest, ProgressAmIovUserHeader) +{ + if (_progressMode == ProgressMode::Wait) { + GTEST_SKIP() << "Interrupting UCP worker progress operation in wait mode is not possible"; + } + + if (_memoryType != UCS_MEMORY_TYPE_HOST) { + GTEST_SKIP() << "IOV user header test uses host buffers only"; + } + + const size_t messageLength = std::max(4, _messageLength); + std::vector send(messageLength); + std::iota(send.begin(), send.end(), 0); + + const size_t firstSegmentLength = messageLength / 2; + const size_t secondSegmentLength = messageLength - firstSegmentLength; + std::vector iov(2); + iov[0].buffer = send.data(); + iov[0].length = firstSegmentLength * sizeof(int); + iov[1].buffer = send.data() + firstSegmentLength; + iov[1].length = secondSegmentLength * sizeof(int); + + const std::string sentHeader = "iov-user-header-data"; + + auto amSendParams = ucxx::AmSendParams{}; + amSendParams.datatype = UCP_DATATYPE_IOV; + amSendParams.memoryType = UCS_MEMORY_TYPE_HOST; + amSendParams.userHeader = sentHeader; + + std::vector> requests; + requests.push_back(_ep->amSend(iov, amSendParams)); + requests.push_back(_ep->amRecv()); + waitRequests(_worker, requests, _progressWorker); + + auto recvReq = requests[1]; + auto recvBuffer = recvReq->getRecvBuffer(); + ASSERT_EQ(recvBuffer->getType(), ucxx::BufferType::Host); + ASSERT_EQ(recvBuffer->getSize(), messageLength * sizeof(int)); + ASSERT_EQ(recvReq->getRecvHeader(), sentHeader); + + std::vector recv(reinterpret_cast(recvBuffer->data()), + reinterpret_cast(recvBuffer->data()) + messageLength); + ASSERT_THAT(recv, ContainerEq(send)); +} + +TEST_P(RequestTest, ProgressAmEmptyUserHeader) +{ + if (_progressMode == ProgressMode::Wait) { + GTEST_SKIP() << "Interrupting UCP worker progress operation in wait mode is not possible"; + } + + if (_memoryType != UCS_MEMORY_TYPE_HOST) { + GTEST_SKIP() << "User header test uses host buffers only"; + } + + allocate(1, false); + + // Send without user header 
(default empty) + std::vector> requests; + requests.push_back(_ep->amSend(_sendPtr[0], _messageSize, _memoryType)); + requests.push_back(_ep->amRecv()); + waitRequests(_worker, requests, _progressWorker); + + auto recvReq = requests[1]; + ASSERT_EQ(recvReq->getRecvHeader(), std::string{}); + + _recvPtr[0] = recvReq->getRecvBuffer()->data(); + copyResults(); + ASSERT_THAT(_recv[0], ContainerEq(_send[0])); +} + TEST_P(RequestTest, ProgressStream) { allocate(); From 53b3cd22799237b5da3c8fd211733a1b1ed74572 Mon Sep 17 00:00:00 2001 From: "Gregory R. Lee" Date: Wed, 18 Feb 2026 09:19:36 -0500 Subject: [PATCH 09/26] document header segment size limitation in typedefs.h --- cpp/include/ucxx/typedefs.h | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cpp/include/ucxx/typedefs.h b/cpp/include/ucxx/typedefs.h index 34b7e1564..76e69814f 100644 --- a/cpp/include/ucxx/typedefs.h +++ b/cpp/include/ucxx/typedefs.h @@ -194,6 +194,13 @@ struct AmSendParams { std::optional receiverCallbackInfo{ std::nullopt}; ///< Optional receiver callback metadata. std::string userHeader{}; ///< Opaque user-defined header (arbitrary bytes, not necessarily text). + ///< This is serialized into the AM header parameter of + ///< `ucp_am_send_nbx`, which is subject to transport-level size + ///< limits. For TCP, the default segment size is ~8 KiB + ///< (`UCX_TCP_TX_SEG_SIZE` / `UCX_TCP_RX_SEG_SIZE`). Headers that + ///< exceed the transport limit will cause a fatal UCX error. Keep + ///< user headers small (recommended < 4 KiB) or increase the + ///< segment size environment variables as needed. }; /** From e50506efa34ffe5be91b6f1d665a5698ffae763e Mon Sep 17 00:00:00 2001 From: "Gregory R. 
Lee" Date: Wed, 18 Feb 2026 10:51:59 -0500 Subject: [PATCH 10/26] update comment to clarify purpose of _count --- cpp/include/ucxx/request_data.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cpp/include/ucxx/request_data.h b/cpp/include/ucxx/request_data.h index 0d0be945b..edd991431 100644 --- a/cpp/include/ucxx/request_data.h +++ b/cpp/include/ucxx/request_data.h @@ -31,7 +31,8 @@ class AmSend { const void* const _buffer{nullptr}; ///< The raw pointer where data to be sent is stored. const size_t _length{0}; ///< Message length in bytes (contiguous datatype only). const std::vector _iov{}; ///< Segments for IOV datatype. - const size_t _count{0}; ///< Element count according to selected datatype. + const size_t _count{0}; ///< Count passed to `ucp_am_send_nbx`: byte count + ///< for contiguous, number of IOV segments for IOV. const uint32_t _flags{UCP_AM_SEND_FLAG_REPLY}; ///< UCP AM send flags. const ucp_datatype_t _datatype{ucp_dt_make_contig(1)}; ///< UCP datatype. const ucs_memory_type_t _memoryType{UCS_MEMORY_TYPE_HOST}; ///< Memory type used on the operation From 1f6604dba272bdc37439ff5ba4249008612baf19 Mon Sep 17 00:00:00 2001 From: "Gregory R. Lee" Date: Wed, 25 Feb 2026 10:21:04 -0500 Subject: [PATCH 11/26] change iov argument to AmSend from const ref to pass by value (then move) --- cpp/include/ucxx/endpoint.h | 2 +- cpp/include/ucxx/request_data.h | 2 +- cpp/src/endpoint.cpp | 4 ++-- cpp/src/request_data.cpp | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cpp/include/ucxx/endpoint.h b/cpp/include/ucxx/endpoint.h index 2f413911a..7a3cd8604 100644 --- a/cpp/include/ucxx/endpoint.h +++ b/cpp/include/ucxx/endpoint.h @@ -416,7 +416,7 @@ class Endpoint : public Component { * @returns Request to be subsequently checked for the completion and its state. 
*/ [[nodiscard]] std::shared_ptr amSend( - const std::vector& iov, + std::vector iov, const AmSendParams& params, const bool enablePythonFuture = false, RequestCallbackUserFunction callbackFunction = nullptr, diff --git a/cpp/include/ucxx/request_data.h b/cpp/include/ucxx/request_data.h index edd991431..34a50a1a7 100644 --- a/cpp/include/ucxx/request_data.h +++ b/cpp/include/ucxx/request_data.h @@ -63,7 +63,7 @@ class AmSend { * @param[in] iov vector of IOV segments to send. * @param[in] params send parameters controlling datatype/flags/policies. */ - explicit AmSend(const decltype(_iov)& iov, const AmSendParams& params = AmSendParams{}); + explicit AmSend(decltype(_iov) iov, const AmSendParams& params = AmSendParams{}); AmSend() = delete; }; diff --git a/cpp/src/endpoint.cpp b/cpp/src/endpoint.cpp index 75a25b4f1..8247f9e89 100644 --- a/cpp/src/endpoint.cpp +++ b/cpp/src/endpoint.cpp @@ -478,7 +478,7 @@ std::shared_ptr Endpoint::amSend(const void* const buffer, callbackData)); } -std::shared_ptr Endpoint::amSend(const std::vector& iov, +std::shared_ptr Endpoint::amSend(std::vector iov, const AmSendParams& params, const bool enablePythonFuture, RequestCallbackUserFunction callbackFunction, @@ -487,7 +487,7 @@ std::shared_ptr Endpoint::amSend(const std::vector& iov, auto endpoint = std::dynamic_pointer_cast(shared_from_this()); return registerInflightRequest( createRequestAm(endpoint, - data::AmSend(iov, params), + data::AmSend(std::move(iov), params), enablePythonFuture, callbackFunction, callbackData)); diff --git a/cpp/src/request_data.cpp b/cpp/src/request_data.cpp index 1ffe140cb..281b3a511 100644 --- a/cpp/src/request_data.cpp +++ b/cpp/src/request_data.cpp @@ -39,10 +39,10 @@ AmSend::AmSend(const void* const buffer, if (_buffer == nullptr && _length > 0) throw std::runtime_error("Buffer cannot be a nullptr."); } -AmSend::AmSend(const std::vector& iov, const AmSendParams& params) +AmSend::AmSend(std::vector iov, const AmSendParams& params) : _buffer(nullptr), 
_length(0), - _iov(iov), + _iov(std::move(iov)), _count(iov.size()), _flags(params.flags), _datatype(params.datatype), From 2e3c0430d2a692fb2872e4f599d5da2a155aaf61 Mon Sep 17 00:00:00 2001 From: Gregory Lee Date: Thu, 26 Feb 2026 17:08:23 -0500 Subject: [PATCH 12/26] Update cpp/src/request_data.cpp Co-authored-by: Peter Andreas Entschev --- cpp/src/request_data.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/request_data.cpp b/cpp/src/request_data.cpp index 281b3a511..032dd755c 100644 --- a/cpp/src/request_data.cpp +++ b/cpp/src/request_data.cpp @@ -36,7 +36,7 @@ AmSend::AmSend(const void* const buffer, if (_datatype != ucp_dt_make_contig(1)) throw std::runtime_error("Contiguous AM send requires datatype `ucp_dt_make_contig(1)`."); - if (_buffer == nullptr && _length > 0) throw std::runtime_error("Buffer cannot be a nullptr."); + if (_buffer == nullptr && _length > 0) throw std::runtime_error("Buffer cannot be a nullptr when length is > 0."); } AmSend::AmSend(std::vector iov, const AmSendParams& params) From a8f7f585ad1c4d718bb74c58d6840edcf23d5308 Mon Sep 17 00:00:00 2001 From: "Gregory R. Lee" Date: Thu, 26 Feb 2026 17:23:19 -0500 Subject: [PATCH 13/26] lint fixes --- cpp/include/ucxx/internal/request_am.h | 3 +- cpp/include/ucxx/request.h | 2 +- cpp/include/ucxx/request_am.h | 2 +- cpp/include/ucxx/request_data.h | 21 +++++---- cpp/include/ucxx/typedefs.h | 10 ++-- cpp/src/endpoint.cpp | 37 ++++++--------- cpp/src/request_am.cpp | 64 ++++++++++++-------------- cpp/src/request_data.cpp | 6 +-- cpp/tests/request.cpp | 11 +++-- 9 files changed, 74 insertions(+), 82 deletions(-) diff --git a/cpp/include/ucxx/internal/request_am.h b/cpp/include/ucxx/internal/request_am.h index 485cdc5e1..61de98cc5 100644 --- a/cpp/include/ucxx/internal/request_am.h +++ b/cpp/include/ucxx/internal/request_am.h @@ -57,13 +57,14 @@ class RecvAmMessage { * @param[in] request request to be later notified/delivered to user. 
* @param[in] buffer buffer containing the received data * @param[in] receiverCallback receiver callback to execute when request completes. + * @param[in] userHeader user-defined header associated with the received message. */ RecvAmMessage(internal::AmData* amData, ucp_ep_h ep, std::shared_ptr request, std::shared_ptr buffer, AmReceiverCallbackType receiverCallback = AmReceiverCallbackType(), - std::string userHeader = {}); + std::string userHeader = {}); /** * @brief Set the UCP request. diff --git a/cpp/include/ucxx/request.h b/cpp/include/ucxx/request.h index 2b12a6cb9..6ad7607bd 100644 --- a/cpp/include/ucxx/request.h +++ b/cpp/include/ucxx/request.h @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. * SPDX-License-Identifier: BSD-3-Clause */ #pragma once diff --git a/cpp/include/ucxx/request_am.h b/cpp/include/ucxx/request_am.h index c90a6108b..e98f16ccb 100644 --- a/cpp/include/ucxx/request_am.h +++ b/cpp/include/ucxx/request_am.h @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. * SPDX-License-Identifier: BSD-3-Clause */ #pragma once diff --git a/cpp/include/ucxx/request_data.h b/cpp/include/ucxx/request_data.h index 34a50a1a7..012dd816d 100644 --- a/cpp/include/ucxx/request_data.h +++ b/cpp/include/ucxx/request_data.h @@ -1,11 +1,12 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. * SPDX-License-Identifier: BSD-3-Clause */ #pragma once #include #include +#include #include #include @@ -28,19 +29,20 @@ namespace data { */ class AmSend { public: - const void* const _buffer{nullptr}; ///< The raw pointer where data to be sent is stored. 
- const size_t _length{0}; ///< Message length in bytes (contiguous datatype only). + const void* const _buffer{nullptr}; ///< The raw pointer where data to be sent is stored. + const size_t _length{0}; ///< Message length in bytes (contiguous datatype only). const std::vector _iov{}; ///< Segments for IOV datatype. - const size_t _count{0}; ///< Count passed to `ucp_am_send_nbx`: byte count - ///< for contiguous, number of IOV segments for IOV. - const uint32_t _flags{UCP_AM_SEND_FLAG_REPLY}; ///< UCP AM send flags. - const ucp_datatype_t _datatype{ucp_dt_make_contig(1)}; ///< UCP datatype. + const size_t _count{0}; ///< Count passed to `ucp_am_send_nbx`: byte count + ///< for contiguous, number of IOV segments for IOV. + const uint32_t _flags{UCP_AM_SEND_FLAG_REPLY}; ///< UCP AM send flags. + const ucp_datatype_t _datatype{ucp_dt_make_contig(1)}; ///< UCP datatype. const ucs_memory_type_t _memoryType{UCS_MEMORY_TYPE_HOST}; ///< Memory type used on the operation const AmSendMemoryTypePolicy _memoryTypePolicy{ AmSendMemoryTypePolicy::FallbackToHost}; ///< Receiver allocation policy. const std::optional _receiverCallbackInfo{ std::nullopt}; ///< Owner name and unique identifier of the receiver callback. - const std::string _userHeader{}; ///< Opaque user-defined header (arbitrary bytes, not necessarily text). + const std::string + _userHeader{}; ///< Opaque user-defined header (arbitrary bytes, not necessarily text). /** * @brief Constructor for Active Message-specific send data. @@ -77,7 +79,8 @@ class AmSend { class AmReceive { public: std::shared_ptr<::ucxx::Buffer> _buffer{nullptr}; ///< The AM received message buffer - std::string _userHeader{}; ///< User-defined header from the sender (arbitrary bytes, not necessarily text). + std::string _userHeader{}; ///< User-defined header from the sender (arbitrary bytes, not + ///< necessarily text). /** * @brief Constructor for Active Message-specific receive data. 
diff --git a/cpp/include/ucxx/typedefs.h b/cpp/include/ucxx/typedefs.h index 76e69814f..8ff0cf57f 100644 --- a/cpp/include/ucxx/typedefs.h +++ b/cpp/include/ucxx/typedefs.h @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. * SPDX-License-Identifier: BSD-3-Clause */ #pragma once @@ -186,15 +186,15 @@ enum class AmSendMemoryTypePolicy { * breaking existing callers. */ struct AmSendParams { - uint32_t flags{UCP_AM_SEND_FLAG_REPLY}; ///< UCP AM send flags. - ucp_datatype_t datatype{ucp_dt_make_contig(1)}; ///< Datatype used by `ucp_am_send_nbx`. + uint32_t flags{UCP_AM_SEND_FLAG_REPLY}; ///< UCP AM send flags. + ucp_datatype_t datatype{ucp_dt_make_contig(1)}; ///< Datatype used by `ucp_am_send_nbx`. ucs_memory_type_t memoryType{UCS_MEMORY_TYPE_HOST}; ///< Sender memory type hint. AmSendMemoryTypePolicy memoryTypePolicy{ AmSendMemoryTypePolicy::FallbackToHost}; ///< Receiver allocation policy. std::optional receiverCallbackInfo{ std::nullopt}; ///< Optional receiver callback metadata. - std::string userHeader{}; ///< Opaque user-defined header (arbitrary bytes, not necessarily text). - ///< This is serialized into the AM header parameter of + std::string userHeader{}; ///< Opaque user-defined header (arbitrary bytes, not necessarily + ///< text). This is serialized into the AM header parameter of ///< `ucp_am_send_nbx`, which is subject to transport-level size ///< limits. For TCP, the default segment size is ~8 KiB ///< (`UCX_TCP_TX_SEG_SIZE` / `UCX_TCP_RX_SEG_SIZE`). 
Headers that diff --git a/cpp/src/endpoint.cpp b/cpp/src/endpoint.cpp index 8247f9e89..4c20bed4c 100644 --- a/cpp/src/endpoint.cpp +++ b/cpp/src/endpoint.cpp @@ -450,16 +450,11 @@ std::shared_ptr Endpoint::amSend( RequestCallbackUserFunction callbackFunction, RequestCallbackUserData callbackData) { - auto params = AmSendParams{}; - params.memoryType = memoryType; - params.receiverCallbackInfo = receiverCallbackInfo; - - return amSend(buffer, - length, - params, - enablePythonFuture, - callbackFunction, - callbackData); + auto params = AmSendParams{}; + params.memoryType = memoryType; + params.receiverCallbackInfo = receiverCallbackInfo; + + return amSend(buffer, length, params, enablePythonFuture, callbackFunction, callbackData); } std::shared_ptr Endpoint::amSend(const void* const buffer, @@ -470,12 +465,11 @@ std::shared_ptr Endpoint::amSend(const void* const buffer, RequestCallbackUserData callbackData) { auto endpoint = std::dynamic_pointer_cast(shared_from_this()); - return registerInflightRequest( - createRequestAm(endpoint, - data::AmSend(buffer, length, params), - enablePythonFuture, - callbackFunction, - callbackData)); + return registerInflightRequest(createRequestAm(endpoint, + data::AmSend(buffer, length, params), + enablePythonFuture, + callbackFunction, + callbackData)); } std::shared_ptr Endpoint::amSend(std::vector iov, @@ -485,12 +479,11 @@ std::shared_ptr Endpoint::amSend(std::vector iov, RequestCallbackUserData callbackData) { auto endpoint = std::dynamic_pointer_cast(shared_from_this()); - return registerInflightRequest( - createRequestAm(endpoint, - data::AmSend(std::move(iov), params), - enablePythonFuture, - callbackFunction, - callbackData)); + return registerInflightRequest(createRequestAm(endpoint, + data::AmSend(std::move(iov), params), + enablePythonFuture, + callbackFunction, + callbackData)); } std::shared_ptr Endpoint::amRecv(const bool enablePythonFuture, diff --git a/cpp/src/request_am.cpp b/cpp/src/request_am.cpp index 
78a4f2dc5..77efdbb4e 100644 --- a/cpp/src/request_am.cpp +++ b/cpp/src/request_am.cpp @@ -96,10 +96,10 @@ struct AmHeader { const size_t amReceiverCallbackInfoSize = (receiverCallbackInfo) ? sizeof(ownerSize) + ownerSize + sizeof(receiverCallbackInfo->id) : 0; const uint8_t serializedMemoryTypePolicy = static_cast(memoryTypePolicy); - const size_t userHeaderSize = userHeader.size(); - const size_t totalSize = - sizeof(memoryType) + sizeof(hasReceiverCallback) + amReceiverCallbackInfoSize + - sizeof(serializedMemoryTypePolicy) + sizeof(userHeaderSize) + userHeaderSize; + const size_t userHeaderSize = userHeader.size(); + const size_t totalSize = sizeof(memoryType) + sizeof(hasReceiverCallback) + + amReceiverCallbackInfoSize + sizeof(serializedMemoryTypePolicy) + + sizeof(userHeaderSize) + userHeaderSize; std::string serialized(totalSize, 0); auto encode = [&offset, &serialized](void const* data, size_t bytes) { @@ -326,9 +326,8 @@ ucs_status_t RequestAm::recvCallback(void* arg, ucxx_debug("Exception calling allocator: %s", e.what()); } - auto recvAmMessage = - std::make_shared(amData, ep, req, buf, receiverCallback, - amHeader.userHeader); + auto recvAmMessage = std::make_shared( + amData, ep, req, buf, receiverCallback, amHeader.userHeader); ucp_request_param_t requestParam = {.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_USER_DATA | @@ -387,8 +386,8 @@ ucs_status_t RequestAm::recvCallback(void* arg, } else { buf = amData->_allocators.at(UCS_MEMORY_TYPE_HOST)(length); - internal::RecvAmMessage recvAmMessage(amData, ep, req, buf, receiverCallback, - amHeader.userHeader); + internal::RecvAmMessage recvAmMessage( + amData, ep, req, buf, receiverCallback, amHeader.userHeader); if (buf == nullptr) { ucxx_debug("Failed to allocate %lu bytes of memory", length); recvAmMessage._request->setStatus(UCS_ERR_NO_MEMORY); @@ -435,12 +434,11 @@ std::shared_ptr RequestAm::getRecvBuffer() std::string RequestAm::getRecvHeader() { - return std::visit( - 
data::dispatch{ - [](const data::AmReceive& amReceive) { return amReceive._userHeader; }, - [](auto) -> std::string { return {}; }, - }, - _requestData); + return std::visit(data::dispatch{ + [](const data::AmReceive& amReceive) { return amReceive._userHeader; }, + [](auto) -> std::string { return {}; }, + }, + _requestData); } void RequestAm::request() @@ -448,25 +446,23 @@ void RequestAm::request() std::visit( data::dispatch{ [this](const data::AmSend& amSend) { - ucp_request_param_t param = {.op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | - UCP_OP_ATTR_FIELD_DATATYPE | - UCP_OP_ATTR_FIELD_FLAGS | - UCP_OP_ATTR_FIELD_USER_DATA, - .flags = amSend._flags, - .datatype = amSend._datatype, - .user_data = this}; - - param.cb.send = _amSendCallback; - AmHeader header = {.memoryType = amSend._memoryType, - .memoryTypePolicy = amSend._memoryTypePolicy, - .receiverCallbackInfo = amSend._receiverCallbackInfo, - .userHeader = amSend._userHeader}; - _header = header.serialize(); - const void* sendBuffer = - (amSend._datatype == UCP_DATATYPE_IOV) - ? reinterpret_cast(amSend._iov.data()) - : amSend._buffer; - void* request = ucp_am_send_nbx(_endpoint->getHandle(), + ucp_request_param_t param = { + .op_attr_mask = UCP_OP_ATTR_FIELD_CALLBACK | UCP_OP_ATTR_FIELD_DATATYPE | + UCP_OP_ATTR_FIELD_FLAGS | UCP_OP_ATTR_FIELD_USER_DATA, + .flags = amSend._flags, + .datatype = amSend._datatype, + .user_data = this}; + + param.cb.send = _amSendCallback; + AmHeader header = {.memoryType = amSend._memoryType, + .memoryTypePolicy = amSend._memoryTypePolicy, + .receiverCallbackInfo = amSend._receiverCallbackInfo, + .userHeader = amSend._userHeader}; + _header = header.serialize(); + const void* sendBuffer = (amSend._datatype == UCP_DATATYPE_IOV) + ? 
reinterpret_cast(amSend._iov.data()) + : amSend._buffer; + void* request = ucp_am_send_nbx(_endpoint->getHandle(), 0, _header.data(), _header.size(), diff --git a/cpp/src/request_data.cpp b/cpp/src/request_data.cpp index 032dd755c..6e7ba6c54 100644 --- a/cpp/src/request_data.cpp +++ b/cpp/src/request_data.cpp @@ -1,5 +1,5 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. * SPDX-License-Identifier: BSD-3-Clause */ #include @@ -19,9 +19,7 @@ namespace ucxx { namespace data { -AmSend::AmSend(const void* const buffer, - const size_t length, - const AmSendParams& params) +AmSend::AmSend(const void* const buffer, const size_t length, const AmSendParams& params) : _buffer(buffer), _length(length), _iov(), diff --git a/cpp/tests/request.cpp b/cpp/tests/request.cpp index 5e2acdc8b..aefb97c23 100644 --- a/cpp/tests/request.cpp +++ b/cpp/tests/request.cpp @@ -1,10 +1,11 @@ /** - * SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. + * SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. 
* SPDX-License-Identifier: BSD-3-Clause */ #include #include #include +#include #include #include #include @@ -216,7 +217,7 @@ TEST_P(RequestTest, ProgressAmIovHost) std::vector send(messageLength); std::iota(send.begin(), send.end(), 0); - const size_t firstSegmentLength = messageLength / 2; + const size_t firstSegmentLength = messageLength / 2; const size_t secondSegmentLength = messageLength - firstSegmentLength; std::vector iov(2); iov[0].buffer = send.data(); @@ -276,9 +277,9 @@ TEST_P(RequestTest, ProgressAmMemoryTypePolicyStrict) const size_t bytes = std::max(_rndvThresh + 128, sizeof(int)); std::vector send(bytes, 42); - auto amSendParams = ucxx::AmSendParams{}; - amSendParams.memoryType = UCS_MEMORY_TYPE_CUDA; - amSendParams.memoryTypePolicy = ucxx::AmSendMemoryTypePolicy::ErrorOnUnsupported; + auto amSendParams = ucxx::AmSendParams{}; + amSendParams.memoryType = UCS_MEMORY_TYPE_CUDA; + amSendParams.memoryTypePolicy = ucxx::AmSendMemoryTypePolicy::ErrorOnUnsupported; std::vector> requests; requests.push_back(_ep->amSend(send.data(), send.size(), amSendParams)); From 49634b8edbe6d01ad68dcbc6637ac9552ea25bd4 Mon Sep 17 00:00:00 2001 From: "Gregory R. Lee" Date: Thu, 26 Feb 2026 17:57:30 -0500 Subject: [PATCH 14/26] fix use after move --- cpp/src/request_data.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/request_data.cpp b/cpp/src/request_data.cpp index 6e7ba6c54..f1cfdc086 100644 --- a/cpp/src/request_data.cpp +++ b/cpp/src/request_data.cpp @@ -41,7 +41,7 @@ AmSend::AmSend(std::vector iov, const AmSendParams& params) : _buffer(nullptr), _length(0), _iov(std::move(iov)), - _count(iov.size()), + _count(_iov.size()), _flags(params.flags), _datatype(params.datatype), _memoryType(params.memoryType), From 382ab84d5548d98151a89147e2e5abd0230bbc6d Mon Sep 17 00:00:00 2001 From: "Gregory R. 
Lee" Date: Thu, 26 Feb 2026 18:02:56 -0500 Subject: [PATCH 15/26] change userHeader to std::vector --- cpp/include/ucxx/internal/request_am.h | 3 +- cpp/include/ucxx/request_data.h | 6 ++-- cpp/include/ucxx/typedefs.h | 46 +++++++++++++++++++++----- cpp/src/internal/request_am.cpp | 4 ++- cpp/src/request_am.cpp | 12 +++++-- cpp/tests/request.cpp | 6 ++-- 6 files changed, 56 insertions(+), 21 deletions(-) diff --git a/cpp/include/ucxx/internal/request_am.h b/cpp/include/ucxx/internal/request_am.h index 61de98cc5..71f81b79d 100644 --- a/cpp/include/ucxx/internal/request_am.h +++ b/cpp/include/ucxx/internal/request_am.h @@ -9,6 +9,7 @@ #include #include #include +#include #include @@ -64,7 +65,7 @@ class RecvAmMessage { std::shared_ptr request, std::shared_ptr buffer, AmReceiverCallbackType receiverCallback = AmReceiverCallbackType(), - std::string userHeader = {}); + std::vector userHeader = {}); /** * @brief Set the UCP request. diff --git a/cpp/include/ucxx/request_data.h b/cpp/include/ucxx/request_data.h index 012dd816d..addf8b680 100644 --- a/cpp/include/ucxx/request_data.h +++ b/cpp/include/ucxx/request_data.h @@ -41,8 +41,7 @@ class AmSend { AmSendMemoryTypePolicy::FallbackToHost}; ///< Receiver allocation policy. const std::optional _receiverCallbackInfo{ std::nullopt}; ///< Owner name and unique identifier of the receiver callback. - const std::string - _userHeader{}; ///< Opaque user-defined header (arbitrary bytes, not necessarily text). + const std::vector _userHeader{}; ///< Opaque user-defined header bytes. /** * @brief Constructor for Active Message-specific send data. @@ -79,8 +78,7 @@ class AmSend { class AmReceive { public: std::shared_ptr<::ucxx::Buffer> _buffer{nullptr}; ///< The AM received message buffer - std::string _userHeader{}; ///< User-defined header from the sender (arbitrary bytes, not - ///< necessarily text). + std::vector _userHeader{}; ///< User-defined header bytes from the sender. 
/** * @brief Constructor for Active Message-specific receive data. diff --git a/cpp/include/ucxx/typedefs.h b/cpp/include/ucxx/typedefs.h index 8ff0cf57f..63685fb3c 100644 --- a/cpp/include/ucxx/typedefs.h +++ b/cpp/include/ucxx/typedefs.h @@ -5,12 +5,17 @@ #pragma once #include +#include +#include #include #include #include #include +#include #include +#include #include +#include #include @@ -192,15 +197,38 @@ struct AmSendParams { AmSendMemoryTypePolicy memoryTypePolicy{ AmSendMemoryTypePolicy::FallbackToHost}; ///< Receiver allocation policy. std::optional receiverCallbackInfo{ - std::nullopt}; ///< Optional receiver callback metadata. - std::string userHeader{}; ///< Opaque user-defined header (arbitrary bytes, not necessarily - ///< text). This is serialized into the AM header parameter of - ///< `ucp_am_send_nbx`, which is subject to transport-level size - ///< limits. For TCP, the default segment size is ~8 KiB - ///< (`UCX_TCP_TX_SEG_SIZE` / `UCX_TCP_RX_SEG_SIZE`). Headers that - ///< exceed the transport limit will cause a fatal UCX error. Keep - ///< user headers small (recommended < 4 KiB) or increase the - ///< segment size environment variables as needed. + std::nullopt}; ///< Optional receiver callback metadata. + std::vector userHeader{}; ///< Opaque user-defined header bytes. This is serialized + ///< into the AM header parameter of `ucp_am_send_nbx`, + ///< which is subject to transport-level size limits. For + ///< TCP, the default segment size is ~8 KiB + ///< (`UCX_TCP_TX_SEG_SIZE` / `UCX_TCP_RX_SEG_SIZE`). + ///< Headers that exceed the transport limit will cause a + ///< fatal UCX error. Keep user headers small + ///< (recommended < 4 KiB) or increase segment size env + ///< vars as needed. + + /** + * @brief Set opaque user header bytes from raw pointer. + * + * @param[in] data pointer to input bytes, may be `nullptr` iff `size == 0`. + * @param[in] size number of bytes in input. 
+ */ + void setUserHeader(const void* data, size_t size) + { + if (size > 0 && data == nullptr) + throw std::invalid_argument( + "AmSendParams::setUserHeader received null data with non-zero size"); + userHeader.resize(size); + if (size > 0) memcpy(userHeader.data(), data, size); + } + + /** + * @brief Convenience overload to set user header from string-like views. + * + * @param[in] data view of opaque bytes. + */ + void setUserHeader(std::string_view data) { setUserHeader(data.data(), data.size()); } }; /** diff --git a/cpp/src/internal/request_am.cpp b/cpp/src/internal/request_am.cpp index 6508ad67c..e2eb65c25 100644 --- a/cpp/src/internal/request_am.cpp +++ b/cpp/src/internal/request_am.cpp @@ -9,6 +9,8 @@ #include #include +#include +#include namespace ucxx { @@ -19,7 +21,7 @@ RecvAmMessage::RecvAmMessage(internal::AmData* amData, std::shared_ptr request, std::shared_ptr buffer, AmReceiverCallbackType receiverCallback, - std::string userHeader) + std::vector userHeader) : _amData(amData), _ep(ep), _request(request) { std::visit(data::dispatch{ diff --git a/cpp/src/request_am.cpp b/cpp/src/request_am.cpp index 77efdbb4e..974c4d5cb 100644 --- a/cpp/src/request_am.cpp +++ b/cpp/src/request_am.cpp @@ -3,6 +3,7 @@ * SPDX-License-Identifier: BSD-3-Clause */ #include +#include #include #include #include @@ -34,7 +35,7 @@ struct AmHeader { ucs_memory_type_t memoryType; AmSendMemoryTypePolicy memoryTypePolicy; std::optional receiverCallbackInfo; - std::string userHeader; ///< Opaque user-defined header (arbitrary bytes, not necessarily text). + std::vector userHeader; ///< Opaque user-defined header bytes. 
static AmHeader deserialize(const std::string_view serialized) { @@ -72,7 +73,7 @@ struct AmHeader { memoryTypePolicy = static_cast(serializedMemoryTypePolicy); } - std::string userHeader{}; + std::vector userHeader{}; if (offset + sizeof(size_t) <= serialized.size()) { size_t userHeaderSize{0}; decode(&userHeaderSize, sizeof(userHeaderSize)); @@ -435,7 +436,12 @@ std::shared_ptr RequestAm::getRecvBuffer() std::string RequestAm::getRecvHeader() { return std::visit(data::dispatch{ - [](const data::AmReceive& amReceive) { return amReceive._userHeader; }, + [](const data::AmReceive& amReceive) { + if (amReceive._userHeader.empty()) return std::string{}; + return std::string( + reinterpret_cast(amReceive._userHeader.data()), + amReceive._userHeader.size()); + }, [](auto) -> std::string { return {}; }, }, _requestData); diff --git a/cpp/tests/request.cpp b/cpp/tests/request.cpp index aefb97c23..c0b5dcd33 100644 --- a/cpp/tests/request.cpp +++ b/cpp/tests/request.cpp @@ -371,8 +371,8 @@ TEST_P(RequestTest, ProgressAmUserHeader) const std::string sentHeader = "test-header-payload-\x00\x01\x02\xff"; - auto amSendParams = ucxx::AmSendParams{}; - amSendParams.userHeader = sentHeader; + auto amSendParams = ucxx::AmSendParams{}; + amSendParams.setUserHeader(sentHeader); std::vector> requests; requests.push_back(_ep->amSend(_sendPtr[0], _messageSize, amSendParams)); @@ -414,7 +414,7 @@ TEST_P(RequestTest, ProgressAmIovUserHeader) auto amSendParams = ucxx::AmSendParams{}; amSendParams.datatype = UCP_DATATYPE_IOV; amSendParams.memoryType = UCS_MEMORY_TYPE_HOST; - amSendParams.userHeader = sentHeader; + amSendParams.setUserHeader(sentHeader); std::vector> requests; requests.push_back(_ep->amSend(iov, amSendParams)); From 201a127298076896d422cb40f5513fbd257e6162 Mon Sep 17 00:00:00 2001 From: "Gregory R. 
Lee" Date: Wed, 18 Feb 2026 06:17:25 -0500 Subject: [PATCH 16/26] add Python bindings support for am_send with memory_type_policy wrap the amSend overload taking amSendParams --- python/ucxx/ucxx/_lib/libucxx.pyx | 19 ++- .../ucxx/_lib/tests/test_server_client.py | 113 +++++++++++++++++- python/ucxx/ucxx/_lib/ucxx_api.pxd | 17 ++- python/ucxx/ucxx/_lib_async/endpoint.py | 10 +- .../_lib_async/tests/test_send_recv_am.py | 19 ++- 5 files changed, 165 insertions(+), 13 deletions(-) diff --git a/python/ucxx/ucxx/_lib/libucxx.pyx b/python/ucxx/ucxx/_lib/libucxx.pyx index 29438b40c..1af12c01b 100644 --- a/python/ucxx/ucxx/_lib/libucxx.pyx +++ b/python/ucxx/ucxx/_lib/libucxx.pyx @@ -338,6 +338,11 @@ class Feature(enum.Enum): AM = UCP_FEATURE_AM +class PythonAmSendMemoryTypePolicy(enum.Enum): + FallbackToHost = AmSendMemoryTypePolicy.FallbackToHost + ErrorOnUnsupported = AmSendMemoryTypePolicy.ErrorOnUnsupported + + class PythonRequestNotifierWaitState(enum.Enum): Ready = RequestNotifierWaitState.Ready Timeout = RequestNotifierWaitState.Timeout @@ -1457,21 +1462,29 @@ cdef class UCXEndpoint(): return ep_matched - def am_send(self, Array arr) -> UCXRequest: + def am_send(self, Array arr, memory_type_policy=None) -> UCXRequest: cdef void* buf = arr.ptr cdef size_t nbytes = arr.nbytes cdef bint cuda_array = arr.cuda cdef shared_ptr[Request] req + cdef AmSendParams params if not self._context_feature_flags & Feature.AM.value: raise ValueError("UCXContext must be created with `Feature.AM`") + params.memoryType = ( + UCS_MEMORY_TYPE_CUDA if cuda_array else UCS_MEMORY_TYPE_HOST + ) + if memory_type_policy is not None: + params.memoryTypePolicy = ( + memory_type_policy.value + ) + with nogil: req = self._endpoint.get().amSend( buf, nbytes, - UCS_MEMORY_TYPE_CUDA if cuda_array else UCS_MEMORY_TYPE_HOST, - nullopt, + params, self._enable_python_future ) diff --git a/python/ucxx/ucxx/_lib/tests/test_server_client.py b/python/ucxx/ucxx/_lib/tests/test_server_client.py index 
b36653023..f9f7e04d4 100644 --- a/python/ucxx/ucxx/_lib/tests/test_server_client.py +++ b/python/ucxx/ucxx/_lib/tests/test_server_client.py @@ -17,9 +17,9 @@ WireupMessageSize = 10 -def _send(ep, api, message): +def _send(ep, api, message, memory_type_policy=None): if api == "am": - return ep.am_send(message) + return ep.am_send(message, memory_type_policy=memory_type_policy) elif api == "stream": return ep.stream_send(message) else: @@ -177,6 +177,115 @@ def _echo_client(transfer_api, msg_size, progress_mode, port): worker.stop_progress_thread() +def _echo_server_am_params( + get_queue, put_queue, msg_size, progress_mode, memory_type_policy +): + """Server that echoes AM messages using the AmSendParams code path.""" + feature_flags = (ucx_api.Feature.WAKEUP, ucx_api.Feature.AM) + ctx = ucx_api.UCXContext(feature_flags=feature_flags) + worker = ucx_api.UCXWorker(ctx) + + if progress_mode == "blocking": + worker.init_blocking_progress_mode() + else: + worker.start_progress_thread() + + ep = [None] + + def _listener_handler(conn_request): + ep[0] = listener.create_endpoint_from_conn_request(conn_request, True) + + listener = ucx_api.UCXListener.create( + worker=worker, port=0, cb_func=_listener_handler + ) + put_queue.put(listener.port) + + while ep[0] is None: + if progress_mode == "blocking": + worker.progress() + + msg = Array(bytearray(msg_size)) + requests = [ep[0].am_recv()] + wait_requests(worker, progress_mode, requests) + msg = Array(requests[0].recv_buffer) + requests = [ + ep[0].am_send(msg, memory_type_policy=memory_type_policy) + ] + wait_requests(worker, progress_mode, requests) + + while True: + try: + get_queue.get(block=True, timeout=0.1) + except QueueIsEmpty: + continue + else: + break + + if progress_mode == "thread": + worker.stop_progress_thread() + + +def _echo_client_am_params(msg_size, progress_mode, memory_type_policy, port): + """Client that sends and receives AM messages using AmSendParams.""" + feature_flags = (ucx_api.Feature.WAKEUP, 
ucx_api.Feature.AM) + ctx = ucx_api.UCXContext(feature_flags=feature_flags) + worker = ucx_api.UCXWorker(ctx) + + if progress_mode == "blocking": + worker.init_blocking_progress_mode() + else: + worker.start_progress_thread() + + ep = ucx_api.UCXEndpoint.create( + worker, "127.0.0.1", port, endpoint_error_handling=True, + ) + + if progress_mode == "blocking": + worker.progress() + + send_msg = bytes(os.urandom(msg_size)) + + requests = [ + ep.am_send(Array(send_msg), memory_type_policy=memory_type_policy), + ep.am_recv(), + ] + wait_requests(worker, progress_mode, requests) + + recv_msg = requests[1].recv_buffer + assert bytes(recv_msg) == send_msg + + if progress_mode == "thread": + worker.stop_progress_thread() + + +@pytest.mark.parametrize( + "memory_type_policy", + [None, ucx_api.PythonAmSendMemoryTypePolicy.FallbackToHost], +) +@pytest.mark.parametrize("msg_size", [10, 2**24]) +@pytest.mark.parametrize("progress_mode", ["blocking", "thread"]) +def test_server_client_am_params(msg_size, progress_mode, memory_type_policy): + put_queue, get_queue = mp.Queue(), mp.Queue() + server = mp.Process( + target=_echo_server_am_params, + args=( + put_queue, get_queue, msg_size, progress_mode, memory_type_policy + ), + ) + server.start() + port = get_queue.get() + client = mp.Process( + target=_echo_client_am_params, + args=(msg_size, progress_mode, memory_type_policy, port), + ) + client.start() + client.join(timeout=60) + terminate_process(client) + put_queue.put("Finished") + server.join(timeout=10) + terminate_process(server) + + @pytest.mark.parametrize("transfer_api", ["am", "stream", "tag"]) @pytest.mark.parametrize("msg_size", [0, 10, 2**24]) @pytest.mark.parametrize("progress_mode", ["blocking", "thread"]) diff --git a/python/ucxx/ucxx/_lib/ucxx_api.pxd b/python/ucxx/ucxx/_lib/ucxx_api.pxd index e47542015..05ef01433 100644 --- a/python/ucxx/ucxx/_lib/ucxx_api.pxd +++ b/python/ucxx/ucxx/_lib/ucxx_api.pxd @@ -4,7 +4,7 @@ from posix cimport fcntl -from libc.stdint 
cimport int64_t, uint16_t, uint64_t +from libc.stdint cimport int64_t, uint16_t, uint32_t, uint64_t from libcpp cimport bool as cpp_bool from libcpp.functional cimport function from libcpp.memory cimport shared_ptr, unique_ptr @@ -196,6 +196,15 @@ cdef extern from "" namespace "ucxx" nogil: cdef cppclass AmReceiverCallbackInfo: pass + cdef enum class AmSendMemoryTypePolicy: + FallbackToHost + ErrorOnUnsupported + + cdef cppclass AmSendParams: + uint32_t flags + ucs_memory_type_t memoryType + AmSendMemoryTypePolicy memoryTypePolicy + # Using function[Buffer] here doesn't seem possible due to Cython bugs/limitations. # The workaround is to use a raw C function pointer and let it be parsed by the # compiler. @@ -309,6 +318,12 @@ cdef extern from "" namespace "ucxx" nogil: nullopt_t receiver_callback_info, bint enable_python_future ) except +raise_py_error + shared_ptr[Request] amSend( + const void* const buffer, + size_t length, + AmSendParams params, + bint enable_python_future + ) except +raise_py_error shared_ptr[Request] amRecv( bint enable_python_future ) except +raise_py_error diff --git a/python/ucxx/ucxx/_lib_async/endpoint.py b/python/ucxx/ucxx/_lib_async/endpoint.py index c5c61e6b3..3c4b76a98 100644 --- a/python/ucxx/ucxx/_lib_async/endpoint.py +++ b/python/ucxx/ucxx/_lib_async/endpoint.py @@ -167,7 +167,7 @@ async def close(self, period=10**10, max_attempts=1): await asyncio.sleep(0) self.abort(period=period, max_attempts=max_attempts) - async def am_send(self, buffer): + async def am_send(self, buffer, memory_type_policy=None): """Send `buffer` to connected peer via active messages. Parameters @@ -175,6 +175,10 @@ async def am_send(self, buffer): buffer: exposing the buffer protocol or array/cuda interface The buffer to send. Raise ValueError if buffer is smaller than nbytes. + memory_type_policy: PythonAmSendMemoryTypePolicy, optional + Policy controlling receiver-side allocation when no allocator is + registered for the sender's memory type. 
Default ``None`` uses + ``FallbackToHost``. """ self._ep.raise_on_error() if self.closed: @@ -196,7 +200,9 @@ async def am_send(self, buffer): self._send_count += 1 try: - request = self._ep.am_send(buffer) + request = self._ep.am_send( + buffer, memory_type_policy=memory_type_policy + ) return await request.wait() except UCXCanceled as e: # If self._ep has already been closed and destroyed, we reraise the diff --git a/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py b/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py index 1032d89ce..ee7fed60c 100644 --- a/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py +++ b/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py @@ -8,6 +8,7 @@ import pytest import ucxx +from ucxx._lib.libucxx import PythonAmSendMemoryTypePolicy from ucxx._lib_async.utils_test import wait_listener_client_handlers msg_sizes = [0] + [2**i for i in range(0, 25, 4)] @@ -54,10 +55,10 @@ def get_data(): return ret -def simple_server(size, recv): +def simple_server(size, recv, memory_type_policy=None): async def server(ep): recv = await ep.am_recv() - await ep.am_send(recv) + await ep.am_send(recv, memory_type_policy=memory_type_policy) await ep.close() return server @@ -67,14 +68,20 @@ async def server(ep): @pytest.mark.parametrize("size", msg_sizes) @pytest.mark.parametrize("recv_wait", [True, False]) @pytest.mark.parametrize("data", get_data()) -async def test_send_recv_am(size, recv_wait, data): +@pytest.mark.parametrize( + "memory_type_policy", + [None, PythonAmSendMemoryTypePolicy.FallbackToHost], +) +async def test_send_recv_am(size, recv_wait, data, memory_type_policy): rndv_thresh = 8192 ucxx.init(options={"RNDV_THRESH": str(rndv_thresh)}) msg = data["generator"](size) recv = [] - listener = ucxx.create_listener(simple_server(size, recv)) + listener = ucxx.create_listener( + simple_server(size, recv, memory_type_policy=memory_type_policy) + ) num_clients = 1 clients = [ await ucxx.create_endpoint(ucxx.get_address(), 
listener.port) @@ -85,7 +92,9 @@ async def test_send_recv_am(size, recv_wait, data): # ep.am_recv call will have to wait, rather than return # immediately as receive data is already available. await asyncio.sleep(1) - await asyncio.gather(*(c.am_send(msg) for c in clients)) + await asyncio.gather( + *(c.am_send(msg, memory_type_policy=memory_type_policy) for c in clients) + ) recv_msgs = await asyncio.gather(*(c.am_recv() for c in clients)) for recv_msg in recv_msgs: From d6f55ce13efe86eccff1363458a4af8763e830b6 Mon Sep 17 00:00:00 2001 From: "Gregory R. Lee" Date: Wed, 18 Feb 2026 07:20:01 -0500 Subject: [PATCH 17/26] add Endpoint.am_send_iov to Python API add basic sync and async test cases for send/recv of AM I/O Vector messages --- python/ucxx/ucxx/_lib/libucxx.pyx | 50 +++++++++ .../ucxx/_lib/tests/test_server_client.py | 105 ++++++++++++++++++ python/ucxx/ucxx/_lib/ucxx_api.pxd | 14 +++ python/ucxx/ucxx/_lib_async/endpoint.py | 41 +++++++ .../_lib_async/tests/test_send_recv_am.py | 32 ++++++ 5 files changed, 242 insertions(+) diff --git a/python/ucxx/ucxx/_lib/libucxx.pyx b/python/ucxx/ucxx/_lib/libucxx.pyx index 1af12c01b..cee0c82b5 100644 --- a/python/ucxx/ucxx/_lib/libucxx.pyx +++ b/python/ucxx/ucxx/_lib/libucxx.pyx @@ -1490,6 +1490,56 @@ cdef class UCXEndpoint(): return UCXRequest(&req, self._enable_python_future) + def am_send_iov(self, list arrays, memory_type_policy=None) -> UCXRequest: + cdef vector[ucp_dt_iov_t] iov_vec + cdef ucp_dt_iov_t entry + cdef shared_ptr[Request] req + cdef AmSendParams params + + if not self._context_feature_flags & Feature.AM.value: + raise ValueError("UCXContext must be created with `Feature.AM`") + + if len(arrays) == 0: + raise ValueError("IOV segment list must not be empty") + + cdef list wrapped = [] + for buf in arrays: + if not isinstance(buf, Array): + buf = Array(buf) + wrapped.append(buf) + + # Validate all segments have the same memory type + cdef bint first_cuda = (wrapped[0]).cuda + for arr_obj in wrapped: + 
if (arr_obj).cuda != first_cuda: + raise ValueError( + "All IOV segments must have the same memory type " + "(all host or all CUDA)" + ) + + for arr_obj in wrapped: + entry.buffer = (arr_obj).ptr + entry.length = (arr_obj).nbytes + iov_vec.push_back(entry) + + params.datatype = UCP_DATATYPE_IOV + params.memoryType = ( + UCS_MEMORY_TYPE_CUDA if first_cuda else UCS_MEMORY_TYPE_HOST + ) + if memory_type_policy is not None: + params.memoryTypePolicy = ( + memory_type_policy.value + ) + + with nogil: + req = self._endpoint.get().amSend( + move(iov_vec), + params, + self._enable_python_future + ) + + return UCXRequest(&req, self._enable_python_future) + def am_recv(self) -> UCXRequest: cdef shared_ptr[Request] req diff --git a/python/ucxx/ucxx/_lib/tests/test_server_client.py b/python/ucxx/ucxx/_lib/tests/test_server_client.py index f9f7e04d4..10d7723bb 100644 --- a/python/ucxx/ucxx/_lib/tests/test_server_client.py +++ b/python/ucxx/ucxx/_lib/tests/test_server_client.py @@ -286,6 +286,111 @@ def test_server_client_am_params(msg_size, progress_mode, memory_type_policy): terminate_process(server) +def _echo_server_am_iov(get_queue, put_queue, msg_size, progress_mode): + """Server that receives an IOV AM message and echoes it back.""" + feature_flags = (ucx_api.Feature.WAKEUP, ucx_api.Feature.AM) + ctx = ucx_api.UCXContext(feature_flags=feature_flags) + worker = ucx_api.UCXWorker(ctx) + + if progress_mode == "blocking": + worker.init_blocking_progress_mode() + else: + worker.start_progress_thread() + + ep = [None] + + def _listener_handler(conn_request): + ep[0] = listener.create_endpoint_from_conn_request(conn_request, True) + + listener = ucx_api.UCXListener.create( + worker=worker, port=0, cb_func=_listener_handler + ) + put_queue.put(listener.port) + + while ep[0] is None: + if progress_mode == "blocking": + worker.progress() + + # Receive the IOV message (arrives as a single contiguous buffer) + requests = [ep[0].am_recv()] + wait_requests(worker, progress_mode, 
requests) + msg = Array(requests[0].recv_buffer) + # Echo back as a regular contiguous send + requests = [ep[0].am_send(msg)] + wait_requests(worker, progress_mode, requests) + + while True: + try: + get_queue.get(block=True, timeout=0.1) + except QueueIsEmpty: + continue + else: + break + + if progress_mode == "thread": + worker.stop_progress_thread() + + +def _echo_client_am_iov(msg_size, progress_mode, port): + """Client that sends an IOV AM message and receives the echo.""" + feature_flags = (ucx_api.Feature.WAKEUP, ucx_api.Feature.AM) + ctx = ucx_api.UCXContext(feature_flags=feature_flags) + worker = ucx_api.UCXWorker(ctx) + + if progress_mode == "blocking": + worker.init_blocking_progress_mode() + else: + worker.start_progress_thread() + + ep = ucx_api.UCXEndpoint.create( + worker, "127.0.0.1", port, endpoint_error_handling=True, + ) + + if progress_mode == "blocking": + worker.progress() + + send_msg = bytes(os.urandom(msg_size)) + + # Split the message into two segments for IOV send + mid = msg_size // 2 + seg1 = Array(send_msg[:mid]) + seg2 = Array(send_msg[mid:]) + + requests = [ + ep.am_send_iov([seg1, seg2]), + ep.am_recv(), + ] + wait_requests(worker, progress_mode, requests) + + recv_msg = requests[1].recv_buffer + assert bytes(recv_msg) == send_msg + + if progress_mode == "thread": + worker.stop_progress_thread() + + +@pytest.mark.parametrize("msg_size", [10, 2**24]) +@pytest.mark.parametrize("progress_mode", ["blocking", "thread"]) +def test_server_client_am_iov(msg_size, progress_mode): + put_queue, get_queue = mp.Queue(), mp.Queue() + server = mp.Process( + target=_echo_server_am_iov, + args=(put_queue, get_queue, msg_size, progress_mode), + ) + server.start() + port = get_queue.get() + client = mp.Process( + target=_echo_client_am_iov, + args=(msg_size, progress_mode, port), + ) + client.start() + client.join(timeout=60) + terminate_process(client) + put_queue.put("Finished") + server.join(timeout=10) + terminate_process(server) + + 
@pytest.mark.parametrize("transfer_api", ["am", "stream", "tag"]) @pytest.mark.parametrize("msg_size", [0, 10, 2**24]) @pytest.mark.parametrize("progress_mode", ["blocking", "thread"]) diff --git a/python/ucxx/ucxx/_lib/ucxx_api.pxd b/python/ucxx/ucxx/_lib/ucxx_api.pxd index 05ef01433..c77630f9f 100644 --- a/python/ucxx/ucxx/_lib/ucxx_api.pxd +++ b/python/ucxx/ucxx/_lib/ucxx_api.pxd @@ -55,6 +55,14 @@ cdef extern from "ucp/api/ucp.h" nogil: ctypedef uint64_t ucp_tag_t + ctypedef uint64_t ucp_datatype_t + + ctypedef struct ucp_dt_iov_t: + void* buffer + size_t length + + ucp_datatype_t UCP_DATATYPE_IOV + ctypedef struct ucp_tag_recv_info_t: pass @@ -202,6 +210,7 @@ cdef extern from "" namespace "ucxx" nogil: cdef cppclass AmSendParams: uint32_t flags + ucp_datatype_t datatype ucs_memory_type_t memoryType AmSendMemoryTypePolicy memoryTypePolicy @@ -324,6 +333,11 @@ cdef extern from "" namespace "ucxx" nogil: AmSendParams params, bint enable_python_future ) except +raise_py_error + shared_ptr[Request] amSend( + vector[ucp_dt_iov_t] iov, + AmSendParams params, + bint enable_python_future + ) except +raise_py_error shared_ptr[Request] amRecv( bint enable_python_future ) except +raise_py_error diff --git a/python/ucxx/ucxx/_lib_async/endpoint.py b/python/ucxx/ucxx/_lib_async/endpoint.py index 3c4b76a98..5e495b76c 100644 --- a/python/ucxx/ucxx/_lib_async/endpoint.py +++ b/python/ucxx/ucxx/_lib_async/endpoint.py @@ -210,6 +210,47 @@ async def am_send(self, buffer, memory_type_policy=None): if self._ep is None: raise e + async def am_send_iov(self, buffers, memory_type_policy=None): + """Send multiple buffers as a single IOV active message. + + Parameters + ---------- + buffers: list + List of buffers exposing the buffer protocol or array/cuda + interface. All buffers must have the same memory type (all host + or all CUDA). 
+ memory_type_policy: PythonAmSendMemoryTypePolicy, optional + Policy controlling receiver-side allocation when no allocator is + registered for the sender's memory type. Default ``None`` uses + ``FallbackToHost``. + """ + self._ep.raise_on_error() + if self.closed: + raise UCXCloseError("Endpoint closed") + if not (isinstance(buffers, list) or isinstance(buffers, tuple)): + raise ValueError("The `buffers` argument must be a `list` or `tuple`") + arrays = [Array(b) if not isinstance(b, Array) else b for b in buffers] + + if logger.isEnabledFor(logging.DEBUG): + log = "[AM Send IOV #%03d] ep: 0x%x, segments: %d, nbytes: %s" % ( + self._send_count, + self.uid, + len(arrays), + tuple([b.nbytes for b in arrays]), + ) + logger.debug(log) + + self._send_count += 1 + + try: + request = self._ep.am_send_iov( + arrays, memory_type_policy=memory_type_policy + ) + return await request.wait() + except UCXCanceled as e: + if self._ep is None: + raise e + # @ucx_api.nvtx_annotate("UCXPY_SEND", color="green", domain="ucxpy") async def send(self, buffer, tag=None, force_tag=False): """Send `buffer` to connected peer. 
diff --git a/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py b/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py index ee7fed60c..b71c0a938 100644 --- a/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py +++ b/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py @@ -12,6 +12,7 @@ from ucxx._lib_async.utils_test import wait_listener_client_handlers msg_sizes = [0] + [2**i for i in range(0, 25, 4)] +iov_msg_sizes = [10] + [2**i for i in range(4, 25, 4)] def _bytearray_assert_equal(a, b): @@ -107,3 +108,34 @@ async def test_send_recv_am(size, recv_wait, data, memory_type_policy): await asyncio.gather(*(c.close() for c in clients)) await wait_listener_client_handlers(listener) + + +def simple_iov_server(): + async def server(ep): + recv = await ep.am_recv() + await ep.am_send(recv) + await ep.close() + + return server + + +@pytest.mark.asyncio +@pytest.mark.parametrize("size", iov_msg_sizes) +async def test_send_recv_am_iov(size): + ucxx.init() + + msg = bytearray(b"m" * size) + mid = size // 2 + seg1 = msg[:mid] + seg2 = msg[mid:] + + listener = ucxx.create_listener(simple_iov_server()) + ep = await ucxx.create_endpoint(ucxx.get_address(), listener.port) + + await ep.am_send_iov([seg1, seg2]) + recv_msg = await ep.am_recv() + + assert bytes(recv_msg) == bytes(msg) + + await ep.close() + await wait_listener_client_handlers(listener) From 5abba24a01c832cae67d9b05b668325a85493a91 Mon Sep 17 00:00:00 2001 From: "Gregory R. 
Lee" Date: Wed, 18 Feb 2026 08:15:36 -0500 Subject: [PATCH 18/26] update Python AM APIs with user_header support --- python/ucxx/ucxx/_lib/libucxx.pyx | 30 +++++++++++++++++++++++-- python/ucxx/ucxx/_lib/ucxx_api.pxd | 2 ++ python/ucxx/ucxx/_lib_async/endpoint.py | 16 +++++++++---- 3 files changed, 42 insertions(+), 6 deletions(-) diff --git a/python/ucxx/ucxx/_lib/libucxx.pyx b/python/ucxx/ucxx/_lib/libucxx.pyx index cee0c82b5..3b8b3bbfc 100644 --- a/python/ucxx/ucxx/_lib/libucxx.pyx +++ b/python/ucxx/ucxx/_lib/libucxx.pyx @@ -1005,6 +1005,20 @@ cdef class UCXRequest(): elif bufType == BufferType.Host: return _get_host_buffer(buf.get()) + @property + def recv_header(self) -> bytes: + """Get the user-defined header from an AM receive request. + + Returns the opaque header bytes sent by the peer. Returns empty bytes + if no user header was sent or for non-AM requests. + """ + cdef string header + + with nogil: + header = self._request.get().getRecvHeader() + + return header + def is_completed(self) -> bool: warnings.warn( "UCXRequest.is_completed() is deprecated and will soon be removed, " @@ -1462,7 +1476,9 @@ cdef class UCXEndpoint(): return ep_matched - def am_send(self, Array arr, memory_type_policy=None) -> UCXRequest: + def am_send( + self, Array arr, memory_type_policy=None, user_header=None + ) -> UCXRequest: cdef void* buf = arr.ptr cdef size_t nbytes = arr.nbytes cdef bint cuda_array = arr.cuda @@ -1479,6 +1495,10 @@ cdef class UCXEndpoint(): params.memoryTypePolicy = ( memory_type_policy.value ) + if user_header is not None: + if not isinstance(user_header, bytes): + raise TypeError("user_header must be bytes") + params.userHeader = user_header with nogil: req = self._endpoint.get().amSend( @@ -1490,7 +1510,9 @@ cdef class UCXEndpoint(): return UCXRequest(&req, self._enable_python_future) - def am_send_iov(self, list arrays, memory_type_policy=None) -> UCXRequest: + def am_send_iov( + self, list arrays, memory_type_policy=None, user_header=None + ) -> 
UCXRequest: cdef vector[ucp_dt_iov_t] iov_vec cdef ucp_dt_iov_t entry cdef shared_ptr[Request] req @@ -1530,6 +1552,10 @@ cdef class UCXEndpoint(): params.memoryTypePolicy = ( memory_type_policy.value ) + if user_header is not None: + if not isinstance(user_header, bytes): + raise TypeError("user_header must be bytes") + params.userHeader = user_header with nogil: req = self._endpoint.get().amSend( diff --git a/python/ucxx/ucxx/_lib/ucxx_api.pxd b/python/ucxx/ucxx/_lib/ucxx_api.pxd index c77630f9f..552eb60aa 100644 --- a/python/ucxx/ucxx/_lib/ucxx_api.pxd +++ b/python/ucxx/ucxx/_lib/ucxx_api.pxd @@ -213,6 +213,7 @@ cdef extern from "" namespace "ucxx" nogil: ucp_datatype_t datatype ucs_memory_type_t memoryType AmSendMemoryTypePolicy memoryTypePolicy + string userHeader # Using function[Buffer] here doesn't seem possible due to Cython bugs/limitations. # The workaround is to use a raw C function pointer and let it be parsed by the @@ -393,6 +394,7 @@ cdef extern from "" namespace "ucxx" nogil: void checkError() except +raise_py_error void* getFuture() except +raise_py_error shared_ptr[Buffer] getRecvBuffer() except +raise_py_error + string getRecvHeader() except +raise_py_error void cancel() diff --git a/python/ucxx/ucxx/_lib_async/endpoint.py b/python/ucxx/ucxx/_lib_async/endpoint.py index 5e495b76c..79b372b6d 100644 --- a/python/ucxx/ucxx/_lib_async/endpoint.py +++ b/python/ucxx/ucxx/_lib_async/endpoint.py @@ -167,7 +167,7 @@ async def close(self, period=10**10, max_attempts=1): await asyncio.sleep(0) self.abort(period=period, max_attempts=max_attempts) - async def am_send(self, buffer, memory_type_policy=None): + async def am_send(self, buffer, memory_type_policy=None, user_header=None): """Send `buffer` to connected peer via active messages. Parameters @@ -179,6 +179,8 @@ async def am_send(self, buffer, memory_type_policy=None): Policy controlling receiver-side allocation when no allocator is registered for the sender's memory type. 
Default ``None`` uses ``FallbackToHost``. + user_header: bytes, optional + Opaque user-defined header bytes to send alongside the message. """ self._ep.raise_on_error() if self.closed: @@ -201,7 +203,9 @@ async def am_send(self, buffer, memory_type_policy=None): try: request = self._ep.am_send( - buffer, memory_type_policy=memory_type_policy + buffer, + memory_type_policy=memory_type_policy, + user_header=user_header, ) return await request.wait() except UCXCanceled as e: @@ -210,7 +214,7 @@ async def am_send(self, buffer, memory_type_policy=None): if self._ep is None: raise e - async def am_send_iov(self, buffers, memory_type_policy=None): + async def am_send_iov(self, buffers, memory_type_policy=None, user_header=None): """Send multiple buffers as a single IOV active message. Parameters @@ -223,6 +227,8 @@ async def am_send_iov(self, buffers, memory_type_policy=None): Policy controlling receiver-side allocation when no allocator is registered for the sender's memory type. Default ``None`` uses ``FallbackToHost``. + user_header: bytes, optional + Opaque user-defined header bytes to send alongside the message. """ self._ep.raise_on_error() if self.closed: @@ -244,7 +250,9 @@ async def am_send_iov(self, buffers, memory_type_policy=None): try: request = self._ep.am_send_iov( - arrays, memory_type_policy=memory_type_policy + arrays, + memory_type_policy=memory_type_policy, + user_header=user_header, ) return await request.wait() except UCXCanceled as e: From 4d395a068a7c86eda56e0309f5d4953b4b30ab06 Mon Sep 17 00:00:00 2001 From: "Gregory R. 
Lee" Date: Wed, 18 Feb 2026 09:06:19 -0500 Subject: [PATCH 19/26] add user_header test cases --- .../ucxx/_lib/tests/test_server_client.py | 226 ++++++++++++++++++ .../_lib_async/tests/test_send_recv_am.py | 111 +++++++++ 2 files changed, 337 insertions(+) diff --git a/python/ucxx/ucxx/_lib/tests/test_server_client.py b/python/ucxx/ucxx/_lib/tests/test_server_client.py index 10d7723bb..b282b59e0 100644 --- a/python/ucxx/ucxx/_lib/tests/test_server_client.py +++ b/python/ucxx/ucxx/_lib/tests/test_server_client.py @@ -391,6 +391,232 @@ def test_server_client_am_iov(msg_size, progress_mode): terminate_process(server) +def _user_header_server(get_queue, put_queue, msg_size, progress_mode): + """Server that receives an AM message with user header and echoes both back.""" + feature_flags = (ucx_api.Feature.WAKEUP, ucx_api.Feature.AM) + ctx = ucx_api.UCXContext(feature_flags=feature_flags) + worker = ucx_api.UCXWorker(ctx) + + if progress_mode == "blocking": + worker.init_blocking_progress_mode() + else: + worker.start_progress_thread() + + ep = [None] + + def _listener_handler(conn_request): + ep[0] = listener.create_endpoint_from_conn_request(conn_request, True) + + listener = ucx_api.UCXListener.create( + worker=worker, port=0, cb_func=_listener_handler + ) + put_queue.put(listener.port) + + while ep[0] is None: + if progress_mode == "blocking": + worker.progress() + + # Receive the message and its user header + requests = [ep[0].am_recv()] + wait_requests(worker, progress_mode, requests) + recv_header = requests[0].recv_header + msg = Array(requests[0].recv_buffer) + + # Echo back with the same user header + requests = [ep[0].am_send(msg, user_header=recv_header)] + wait_requests(worker, progress_mode, requests) + + while True: + try: + get_queue.get(block=True, timeout=0.1) + except QueueIsEmpty: + continue + else: + break + + if progress_mode == "thread": + worker.stop_progress_thread() + + +def _user_header_client(msg_size, progress_mode, port): + """Client that 
sends AM with user header and validates the echo.""" + feature_flags = (ucx_api.Feature.WAKEUP, ucx_api.Feature.AM) + ctx = ucx_api.UCXContext(feature_flags=feature_flags) + worker = ucx_api.UCXWorker(ctx) + + if progress_mode == "blocking": + worker.init_blocking_progress_mode() + else: + worker.start_progress_thread() + + ep = ucx_api.UCXEndpoint.create( + worker, "127.0.0.1", port, endpoint_error_handling=True, + ) + + if progress_mode == "blocking": + worker.progress() + + send_msg = bytes(os.urandom(msg_size)) + user_header = b"test-header-\x00\x01\xff" + + requests = [ + ep.am_send(Array(send_msg), user_header=user_header), + ep.am_recv(), + ] + wait_requests(worker, progress_mode, requests) + + recv_msg = requests[1].recv_buffer + recv_header = requests[1].recv_header + assert bytes(recv_msg) == send_msg + assert recv_header == user_header + + if progress_mode == "thread": + worker.stop_progress_thread() + + +def _user_header_iov_client(msg_size, progress_mode, port): + """Client that sends IOV AM with user header and validates the echo.""" + feature_flags = (ucx_api.Feature.WAKEUP, ucx_api.Feature.AM) + ctx = ucx_api.UCXContext(feature_flags=feature_flags) + worker = ucx_api.UCXWorker(ctx) + + if progress_mode == "blocking": + worker.init_blocking_progress_mode() + else: + worker.start_progress_thread() + + ep = ucx_api.UCXEndpoint.create( + worker, "127.0.0.1", port, endpoint_error_handling=True, + ) + + if progress_mode == "blocking": + worker.progress() + + send_msg = bytes(os.urandom(msg_size)) + user_header = b"iov-header-data" + + mid = msg_size // 2 + seg1 = Array(send_msg[:mid]) + seg2 = Array(send_msg[mid:]) + + requests = [ + ep.am_send_iov([seg1, seg2], user_header=user_header), + ep.am_recv(), + ] + wait_requests(worker, progress_mode, requests) + + recv_msg = requests[1].recv_buffer + recv_header = requests[1].recv_header + assert bytes(recv_msg) == send_msg + assert recv_header == user_header + + if progress_mode == "thread": + 
worker.stop_progress_thread() + + +def _empty_user_header_client(msg_size, progress_mode, port): + """Client that sends AM without user header and validates empty recv_header.""" + feature_flags = (ucx_api.Feature.WAKEUP, ucx_api.Feature.AM) + ctx = ucx_api.UCXContext(feature_flags=feature_flags) + worker = ucx_api.UCXWorker(ctx) + + if progress_mode == "blocking": + worker.init_blocking_progress_mode() + else: + worker.start_progress_thread() + + ep = ucx_api.UCXEndpoint.create( + worker, "127.0.0.1", port, endpoint_error_handling=True, + ) + + if progress_mode == "blocking": + worker.progress() + + send_msg = bytes(os.urandom(msg_size)) + + requests = [ + ep.am_send(Array(send_msg)), + ep.am_recv(), + ] + wait_requests(worker, progress_mode, requests) + + recv_msg = requests[1].recv_buffer + recv_header = requests[1].recv_header + assert bytes(recv_msg) == send_msg + assert recv_header == b"" + + if progress_mode == "thread": + worker.stop_progress_thread() + + +@pytest.mark.parametrize("msg_size", [10, 2**24]) +@pytest.mark.parametrize("progress_mode", ["blocking", "thread"]) +def test_server_client_am_user_header(msg_size, progress_mode): + put_queue, get_queue = mp.Queue(), mp.Queue() + server = mp.Process( + target=_user_header_server, + args=(put_queue, get_queue, msg_size, progress_mode), + ) + server.start() + port = get_queue.get() + client = mp.Process( + target=_user_header_client, + args=(msg_size, progress_mode, port), + ) + client.start() + client.join(timeout=60) + terminate_process(client) + put_queue.put("Finished") + server.join(timeout=10) + terminate_process(server) + + +@pytest.mark.parametrize("msg_size", [10, 2**24]) +@pytest.mark.parametrize("progress_mode", ["blocking", "thread"]) +def test_server_client_am_iov_user_header(msg_size, progress_mode): + put_queue, get_queue = mp.Queue(), mp.Queue() + server = mp.Process( + target=_user_header_server, + args=(put_queue, get_queue, msg_size, progress_mode), + ) + server.start() + port = 
get_queue.get() + client = mp.Process( + target=_user_header_iov_client, + args=(msg_size, progress_mode, port), + ) + client.start() + client.join(timeout=60) + terminate_process(client) + put_queue.put("Finished") + server.join(timeout=10) + terminate_process(server) + + +@pytest.mark.parametrize("msg_size", [10, 2**24]) +@pytest.mark.parametrize("progress_mode", ["blocking", "thread"]) +def test_server_client_am_empty_user_header(msg_size, progress_mode): + """Test that recv_header is empty bytes when no user header is sent.""" + put_queue, get_queue = mp.Queue(), mp.Queue() + # Reuse the echo server that doesn't set user_header + server = mp.Process( + target=_echo_server_am_params, + args=(put_queue, get_queue, msg_size, progress_mode, None), + ) + server.start() + port = get_queue.get() + client = mp.Process( + target=_empty_user_header_client, + args=(msg_size, progress_mode, port), + ) + client.start() + client.join(timeout=60) + terminate_process(client) + put_queue.put("Finished") + server.join(timeout=10) + terminate_process(server) + + @pytest.mark.parametrize("transfer_api", ["am", "stream", "tag"]) @pytest.mark.parametrize("msg_size", [0, 10, 2**24]) @pytest.mark.parametrize("progress_mode", ["blocking", "thread"]) diff --git a/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py b/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py index b71c0a938..3cf809413 100644 --- a/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py +++ b/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py @@ -110,6 +110,77 @@ async def test_send_recv_am(size, recv_wait, data, memory_type_policy): await wait_listener_client_handlers(listener) +def simple_user_header_server(user_header_echo): + async def server(ep): + req = ep._ep.am_recv() + await req.wait() + recv_header = req.recv_header + recv_buffer = req.recv_buffer + # Echo back with the same user header the client sent + await ep.am_send(recv_buffer, user_header=recv_header) + 
user_header_echo.append(recv_header) + await ep.close() + + return server + + +@pytest.mark.asyncio +@pytest.mark.parametrize("size", msg_sizes) +async def test_send_recv_am_user_header(size): + ucxx.init() + + msg = bytearray(b"m" * size) + user_header = b"test-header-\x00\x01\xff" + + server_received_headers = [] + listener = ucxx.create_listener( + simple_user_header_server(server_received_headers) + ) + ep = await ucxx.create_endpoint(ucxx.get_address(), listener.port) + + await ep.am_send(msg, user_header=user_header) + req = ep._ep.am_recv() + await req.wait() + + recv_msg = req.recv_buffer + recv_header = req.recv_header + + assert bytes(recv_msg) == bytes(msg) + assert recv_header == user_header + + # Verify the server also received the header correctly + assert len(server_received_headers) == 1 + assert server_received_headers[0] == user_header + + await ep.close() + await wait_listener_client_handlers(listener) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("size", msg_sizes) +async def test_send_recv_am_empty_user_header(size): + """Test that recv_header is empty bytes when no user header is sent.""" + ucxx.init() + + msg = bytearray(b"m" * size) + + listener = ucxx.create_listener(simple_server(size, [], memory_type_policy=None)) + ep = await ucxx.create_endpoint(ucxx.get_address(), listener.port) + + await ep.am_send(msg) + req = ep._ep.am_recv() + await req.wait() + + recv_msg = req.recv_buffer + recv_header = req.recv_header + + assert bytes(recv_msg) == bytes(msg) + assert recv_header == b"" + + await ep.close() + await wait_listener_client_handlers(listener) + + def simple_iov_server(): async def server(ep): recv = await ep.am_recv() @@ -139,3 +210,43 @@ async def test_send_recv_am_iov(size): await ep.close() await wait_listener_client_handlers(listener) + + +def simple_iov_user_header_server(server_received_headers): + async def server(ep): + req = ep._ep.am_recv() + await req.wait() + server_received_headers.append(req.recv_header) + 
recv_buffer = req.recv_buffer + await ep.am_send(recv_buffer) + await ep.close() + + return server + + +@pytest.mark.asyncio +@pytest.mark.parametrize("size", iov_msg_sizes) +async def test_send_recv_am_iov_user_header(size): + ucxx.init() + + msg = bytearray(b"m" * size) + mid = size // 2 + seg1 = msg[:mid] + seg2 = msg[mid:] + user_header = b"iov-header-data" + + server_received_headers = [] + listener = ucxx.create_listener( + simple_iov_user_header_server(server_received_headers) + ) + ep = await ucxx.create_endpoint(ucxx.get_address(), listener.port) + + await ep.am_send_iov([seg1, seg2], user_header=user_header) + recv_msg = await ep.am_recv() + + assert bytes(recv_msg) == bytes(msg) + assert len(server_received_headers) == 1 + assert server_received_headers[0] == user_header + + await ep.close() + await wait_listener_client_handlers(listener) From 2a7574623f0f312bec7300653d1153e652eab51e Mon Sep 17 00:00:00 2001 From: "Gregory R. Lee" Date: Wed, 18 Feb 2026 09:37:13 -0500 Subject: [PATCH 20/26] update ucxx Python docs --- docs/ucxx/source/api.rst | 3 +++ python/ucxx/ucxx/_lib_async/endpoint.py | 11 ++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/docs/ucxx/source/api.rst b/docs/ucxx/source/api.rst index e5c6e1e87..81173b753 100644 --- a/docs/ucxx/source/api.rst +++ b/docs/ucxx/source/api.rst @@ -26,6 +26,9 @@ API _lib_async.Endpoint.close_after_n_recv _lib_async.Endpoint.get_ucp_endpoint _lib_async.Endpoint.get_ucp_worker + _lib_async.Endpoint.am_recv + _lib_async.Endpoint.am_send + _lib_async.Endpoint.am_send_iov _lib_async.Endpoint.recv _lib_async.Endpoint.send _lib_async.Endpoint.uid diff --git a/python/ucxx/ucxx/_lib_async/endpoint.py b/python/ucxx/ucxx/_lib_async/endpoint.py index 79b372b6d..49e1c3556 100644 --- a/python/ucxx/ucxx/_lib_async/endpoint.py +++ b/python/ucxx/ucxx/_lib_async/endpoint.py @@ -391,7 +391,16 @@ async def send_obj(self, obj, tag=None): await self.send(obj, tag=tag) async def am_recv(self): - """Receive 
from connected peer via active messages.""" + """Receive from connected peer via active messages. + + Returns the received buffer. To access the user-defined header sent + alongside the message, use the low-level request API instead:: + + req = ep._ep.am_recv() + await req.wait() + buffer = req.recv_buffer + header = req.recv_header # bytes, empty if no header was sent + """ if not self._ep.am_probe(): self._ep.raise_on_error() if self.closed: From 61045be2f41fb671a7aa16ab80e3f36d1069f575 Mon Sep 17 00:00:00 2001 From: "Gregory R. Lee" Date: Wed, 18 Feb 2026 09:40:46 -0500 Subject: [PATCH 21/26] add public Python am_recv_with_header() method --- docs/ucxx/source/api.rst | 1 + python/ucxx/ucxx/_lib_async/endpoint.py | 42 ++++++++++++++----- .../_lib_async/tests/test_send_recv_am.py | 26 ++++-------- 3 files changed, 39 insertions(+), 30 deletions(-) diff --git a/docs/ucxx/source/api.rst b/docs/ucxx/source/api.rst index 81173b753..eb9a76d85 100644 --- a/docs/ucxx/source/api.rst +++ b/docs/ucxx/source/api.rst @@ -27,6 +27,7 @@ API _lib_async.Endpoint.get_ucp_endpoint _lib_async.Endpoint.get_ucp_worker _lib_async.Endpoint.am_recv + _lib_async.Endpoint.am_recv_with_header _lib_async.Endpoint.am_send _lib_async.Endpoint.am_send_iov _lib_async.Endpoint.recv diff --git a/python/ucxx/ucxx/_lib_async/endpoint.py b/python/ucxx/ucxx/_lib_async/endpoint.py index 49e1c3556..f32c24b02 100644 --- a/python/ucxx/ucxx/_lib_async/endpoint.py +++ b/python/ucxx/ucxx/_lib_async/endpoint.py @@ -390,17 +390,8 @@ async def send_obj(self, obj, tag=None): await self.send(nbytes, tag=tag) await self.send(obj, tag=tag) - async def am_recv(self): - """Receive from connected peer via active messages. - - Returns the received buffer. 
To access the user-defined header sent - alongside the message, use the low-level request API instead:: - - req = ep._ep.am_recv() - await req.wait() - buffer = req.recv_buffer - header = req.recv_header # bytes, empty if no header was sent - """ + async def _am_recv_request(self): + """Internal helper: receive AM request, return (buffer, request).""" if not self._ep.am_probe(): self._ep.raise_on_error() if self.closed: @@ -435,8 +426,37 @@ async def am_recv(self): and self._finished_recv_count >= self._close_after_n_recv ): self.abort() + return buffer, req + + async def am_recv(self): + """Receive from connected peer via active messages. + + Returns + ------- + buffer + The received data buffer. + + See Also + -------- + am_recv_with_header : Also returns the user-defined header. + """ + buffer, _ = await self._am_recv_request() return buffer + async def am_recv_with_header(self): + """Receive from connected peer via active messages, including + the user-defined header. + + Returns + ------- + tuple of (buffer, header) + buffer: The received data buffer. + header: bytes, the user-defined header sent by the peer. + Empty bytes if no user header was sent. + """ + buffer, req = await self._am_recv_request() + return buffer, req.recv_header + def tag_probe(self, tag=None, force_tag=False, remove=False): """Probe for tag messages without receiving them. 
diff --git a/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py b/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py index 3cf809413..0630640cf 100644 --- a/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py +++ b/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py @@ -112,10 +112,7 @@ async def test_send_recv_am(size, recv_wait, data, memory_type_policy): def simple_user_header_server(user_header_echo): async def server(ep): - req = ep._ep.am_recv() - await req.wait() - recv_header = req.recv_header - recv_buffer = req.recv_buffer + recv_buffer, recv_header = await ep.am_recv_with_header() # Echo back with the same user header the client sent await ep.am_send(recv_buffer, user_header=recv_header) user_header_echo.append(recv_header) @@ -139,11 +136,7 @@ async def test_send_recv_am_user_header(size): ep = await ucxx.create_endpoint(ucxx.get_address(), listener.port) await ep.am_send(msg, user_header=user_header) - req = ep._ep.am_recv() - await req.wait() - - recv_msg = req.recv_buffer - recv_header = req.recv_header + recv_msg, recv_header = await ep.am_recv_with_header() assert bytes(recv_msg) == bytes(msg) assert recv_header == user_header @@ -168,11 +161,7 @@ async def test_send_recv_am_empty_user_header(size): ep = await ucxx.create_endpoint(ucxx.get_address(), listener.port) await ep.am_send(msg) - req = ep._ep.am_recv() - await req.wait() - - recv_msg = req.recv_buffer - recv_header = req.recv_header + recv_msg, recv_header = await ep.am_recv_with_header() assert bytes(recv_msg) == bytes(msg) assert recv_header == b"" @@ -214,10 +203,8 @@ async def test_send_recv_am_iov(size): def simple_iov_user_header_server(server_received_headers): async def server(ep): - req = ep._ep.am_recv() - await req.wait() - server_received_headers.append(req.recv_header) - recv_buffer = req.recv_buffer + recv_buffer, recv_header = await ep.am_recv_with_header() + server_received_headers.append(recv_header) await ep.am_send(recv_buffer) await ep.close() @@ -242,9 
+229,10 @@ async def test_send_recv_am_iov_user_header(size): ep = await ucxx.create_endpoint(ucxx.get_address(), listener.port) await ep.am_send_iov([seg1, seg2], user_header=user_header) - recv_msg = await ep.am_recv() + recv_msg, recv_header = await ep.am_recv_with_header() assert bytes(recv_msg) == bytes(msg) + assert recv_header == user_header assert len(server_received_headers) == 1 assert server_received_headers[0] == user_header From c69bae7a7eabe09d40f718d947cc0d97633d187b Mon Sep 17 00:00:00 2001 From: "Gregory R. Lee" Date: Thu, 26 Feb 2026 17:22:19 -0500 Subject: [PATCH 22/26] lint fixes --- cpp/include/ucxx/request_data.h | 1 - python/ucxx/ucxx/_lib/libucxx.pyx | 15 +++++--- .../ucxx/_lib/tests/test_server_client.py | 35 ++++++++++++------- python/ucxx/ucxx/_lib/ucxx_api.pxd | 4 +-- python/ucxx/ucxx/_lib_async/endpoint.py | 2 +- .../_lib_async/tests/test_send_recv_am.py | 6 ++-- 6 files changed, 39 insertions(+), 24 deletions(-) diff --git a/cpp/include/ucxx/request_data.h b/cpp/include/ucxx/request_data.h index addf8b680..49b8312b0 100644 --- a/cpp/include/ucxx/request_data.h +++ b/cpp/include/ucxx/request_data.h @@ -79,7 +79,6 @@ class AmReceive { public: std::shared_ptr<::ucxx::Buffer> _buffer{nullptr}; ///< The AM received message buffer std::vector _userHeader{}; ///< User-defined header bytes from the sender. - /** * @brief Constructor for Active Message-specific receive data. * diff --git a/python/ucxx/ucxx/_lib/libucxx.pyx b/python/ucxx/ucxx/_lib/libucxx.pyx index 3b8b3bbfc..4e713afb4 100644 --- a/python/ucxx/ucxx/_lib/libucxx.pyx +++ b/python/ucxx/ucxx/_lib/libucxx.pyx @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. 
# SPDX-License-Identifier: BSD-3-Clause @@ -24,7 +24,6 @@ from libcpp.memory cimport ( shared_ptr, static_pointer_cast, ) -from libcpp.optional cimport nullopt from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector @@ -1484,6 +1483,8 @@ cdef class UCXEndpoint(): cdef bint cuda_array = arr.cuda cdef shared_ptr[Request] req cdef AmSendParams params + cdef bytes user_header_bytes + cdef const char* user_header_ptr if not self._context_feature_flags & Feature.AM.value: raise ValueError("UCXContext must be created with `Feature.AM`") @@ -1498,7 +1499,9 @@ cdef class UCXEndpoint(): if user_header is not None: if not isinstance(user_header, bytes): raise TypeError("user_header must be bytes") - params.userHeader = user_header + user_header_bytes = user_header + user_header_ptr = user_header_bytes + params.setUserHeader(user_header_ptr, len(user_header_bytes)) with nogil: req = self._endpoint.get().amSend( @@ -1517,6 +1520,8 @@ cdef class UCXEndpoint(): cdef ucp_dt_iov_t entry cdef shared_ptr[Request] req cdef AmSendParams params + cdef bytes user_header_bytes + cdef const char* user_header_ptr if not self._context_feature_flags & Feature.AM.value: raise ValueError("UCXContext must be created with `Feature.AM`") @@ -1555,7 +1560,9 @@ cdef class UCXEndpoint(): if user_header is not None: if not isinstance(user_header, bytes): raise TypeError("user_header must be bytes") - params.userHeader = user_header + user_header_bytes = user_header + user_header_ptr = user_header_bytes + params.setUserHeader(user_header_ptr, len(user_header_bytes)) with nogil: req = self._endpoint.get().amSend( diff --git a/python/ucxx/ucxx/_lib/tests/test_server_client.py b/python/ucxx/ucxx/_lib/tests/test_server_client.py index b282b59e0..7a4233e9c 100644 --- a/python/ucxx/ucxx/_lib/tests/test_server_client.py +++ b/python/ucxx/ucxx/_lib/tests/test_server_client.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & 
AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. # SPDX-License-Identifier: BSD-3-Clause import multiprocessing as mp @@ -208,9 +208,7 @@ def _listener_handler(conn_request): requests = [ep[0].am_recv()] wait_requests(worker, progress_mode, requests) msg = Array(requests[0].recv_buffer) - requests = [ - ep[0].am_send(msg, memory_type_policy=memory_type_policy) - ] + requests = [ep[0].am_send(msg, memory_type_policy=memory_type_policy)] wait_requests(worker, progress_mode, requests) while True: @@ -237,7 +235,10 @@ def _echo_client_am_params(msg_size, progress_mode, memory_type_policy, port): worker.start_progress_thread() ep = ucx_api.UCXEndpoint.create( - worker, "127.0.0.1", port, endpoint_error_handling=True, + worker, + "127.0.0.1", + port, + endpoint_error_handling=True, ) if progress_mode == "blocking": @@ -268,9 +269,7 @@ def test_server_client_am_params(msg_size, progress_mode, memory_type_policy): put_queue, get_queue = mp.Queue(), mp.Queue() server = mp.Process( target=_echo_server_am_params, - args=( - put_queue, get_queue, msg_size, progress_mode, memory_type_policy - ), + args=(put_queue, get_queue, msg_size, progress_mode, memory_type_policy), ) server.start() port = get_queue.get() @@ -343,7 +342,10 @@ def _echo_client_am_iov(msg_size, progress_mode, port): worker.start_progress_thread() ep = ucx_api.UCXEndpoint.create( - worker, "127.0.0.1", port, endpoint_error_handling=True, + worker, + "127.0.0.1", + port, + endpoint_error_handling=True, ) if progress_mode == "blocking": @@ -450,7 +452,10 @@ def _user_header_client(msg_size, progress_mode, port): worker.start_progress_thread() ep = ucx_api.UCXEndpoint.create( - worker, "127.0.0.1", port, endpoint_error_handling=True, + worker, + "127.0.0.1", + port, + endpoint_error_handling=True, ) if progress_mode == "blocking": @@ -486,7 +491,10 @@ def _user_header_iov_client(msg_size, progress_mode, port): worker.start_progress_thread() ep = 
ucx_api.UCXEndpoint.create( - worker, "127.0.0.1", port, endpoint_error_handling=True, + worker, + "127.0.0.1", + port, + endpoint_error_handling=True, ) if progress_mode == "blocking": @@ -526,7 +534,10 @@ def _empty_user_header_client(msg_size, progress_mode, port): worker.start_progress_thread() ep = ucx_api.UCXEndpoint.create( - worker, "127.0.0.1", port, endpoint_error_handling=True, + worker, + "127.0.0.1", + port, + endpoint_error_handling=True, ) if progress_mode == "blocking": diff --git a/python/ucxx/ucxx/_lib/ucxx_api.pxd b/python/ucxx/ucxx/_lib/ucxx_api.pxd index 552eb60aa..eb9f5a6af 100644 --- a/python/ucxx/ucxx/_lib/ucxx_api.pxd +++ b/python/ucxx/ucxx/_lib/ucxx_api.pxd @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. # SPDX-License-Identifier: BSD-3-Clause @@ -213,7 +213,7 @@ cdef extern from "" namespace "ucxx" nogil: ucp_datatype_t datatype ucs_memory_type_t memoryType AmSendMemoryTypePolicy memoryTypePolicy - string userHeader + void setUserHeader(const void* data, size_t size) # Using function[Buffer] here doesn't seem possible due to Cython bugs/limitations. # The workaround is to use a raw C function pointer and let it be parsed by the diff --git a/python/ucxx/ucxx/_lib_async/endpoint.py b/python/ucxx/ucxx/_lib_async/endpoint.py index f32c24b02..dfeb6b58e 100644 --- a/python/ucxx/ucxx/_lib_async/endpoint.py +++ b/python/ucxx/ucxx/_lib_async/endpoint.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2022-2026, NVIDIA CORPORATION & AFFILIATES. 
# SPDX-License-Identifier: BSD-3-Clause diff --git a/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py b/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py index 0630640cf..c196c6985 100644 --- a/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py +++ b/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2023-2025, NVIDIA CORPORATION. +# SPDX-FileCopyrightText: Copyright (c) 2023-2026, NVIDIA CORPORATION. # SPDX-License-Identifier: BSD-3-Clause import asyncio @@ -130,9 +130,7 @@ async def test_send_recv_am_user_header(size): user_header = b"test-header-\x00\x01\xff" server_received_headers = [] - listener = ucxx.create_listener( - simple_user_header_server(server_received_headers) - ) + listener = ucxx.create_listener(simple_user_header_server(server_received_headers)) ep = await ucxx.create_endpoint(ucxx.get_address(), listener.port) await ep.am_send(msg, user_header=user_header) From c517f796f9061023e43605867f646ea3fbd54266 Mon Sep 17 00:00:00 2001 From: "Gregory R. 
Lee" Date: Thu, 26 Feb 2026 17:57:52 -0500 Subject: [PATCH 23/26] fix missing user_header argument in test cases --- python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py b/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py index c196c6985..f6cf3cc1a 100644 --- a/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py +++ b/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py @@ -203,7 +203,7 @@ def simple_iov_user_header_server(server_received_headers): async def server(ep): recv_buffer, recv_header = await ep.am_recv_with_header() server_received_headers.append(recv_header) - await ep.am_send(recv_buffer) + await ep.am_send(recv_buffer, user_header=recv_header) await ep.close() return server From 12ae26d315d89f9eb4a1540c1faa98dce47fb509 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 2 Apr 2026 04:05:43 -0700 Subject: [PATCH 24/26] Fix test_send_recv_am --- python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py b/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py index f6cf3cc1a..c9743201e 100644 --- a/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py +++ b/python/ucxx/ucxx/_lib_async/tests/test_send_recv_am.py @@ -113,9 +113,10 @@ async def test_send_recv_am(size, recv_wait, data, memory_type_policy): def simple_user_header_server(user_header_echo): async def server(ep): recv_buffer, recv_header = await ep.am_recv_with_header() - # Echo back with the same user header the client sent - await ep.am_send(recv_buffer, user_header=recv_header) + # Append before am_send so the client cannot observe the list before this + # (asyncio scheduling). 
user_header_echo.append(recv_header) + await ep.am_send(recv_buffer, user_header=recv_header) await ep.close() return server From 535f679be6ed8ee091d50f137e1aa4e3463e9447 Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 16 Apr 2026 01:38:38 -0700 Subject: [PATCH 25/26] Fix docstring indentation in am_recv_with_header --- python/ucxx/ucxx/_lib_async/endpoint.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ucxx/ucxx/_lib_async/endpoint.py b/python/ucxx/ucxx/_lib_async/endpoint.py index dfeb6b58e..839312b52 100644 --- a/python/ucxx/ucxx/_lib_async/endpoint.py +++ b/python/ucxx/ucxx/_lib_async/endpoint.py @@ -450,9 +450,9 @@ async def am_recv_with_header(self): Returns ------- tuple of (buffer, header) - buffer: The received data buffer. - header: bytes, the user-defined header sent by the peer. - Empty bytes if no user header was sent. + - buffer: The received data buffer. + - header: bytes, the user-defined header sent by the peer. + Empty bytes if no user header was sent. 
""" buffer, req = await self._am_recv_request() return buffer, req.recv_header From 02a29cea466c38e7d59e395f0befbd3c14b39abc Mon Sep 17 00:00:00 2001 From: Peter Andreas Entschev Date: Thu, 16 Apr 2026 13:57:23 -0700 Subject: [PATCH 26/26] Ensure exceptions are reraised to test's parent process --- .../ucxx/_lib/tests/test_server_client.py | 138 ++++++++++++++---- 1 file changed, 108 insertions(+), 30 deletions(-) diff --git a/python/ucxx/ucxx/_lib/tests/test_server_client.py b/python/ucxx/ucxx/_lib/tests/test_server_client.py index 5a49d6002..0f3c8da28 100644 --- a/python/ucxx/ucxx/_lib/tests/test_server_client.py +++ b/python/ucxx/ucxx/_lib/tests/test_server_client.py @@ -267,22 +267,39 @@ def _echo_client_am_params(msg_size, progress_mode, memory_type_policy, port): @pytest.mark.parametrize("progress_mode", ["blocking", "thread"]) def test_server_client_am_params(msg_size, progress_mode, memory_type_policy): put_queue, get_queue = mp.Queue(), mp.Queue() + server_error_q = mp.Queue() + client_error_q = mp.Queue() server = mp.Process( - target=_echo_server_am_params, - args=(put_queue, get_queue, msg_size, progress_mode, memory_type_policy), + target=run_in_subprocess, + args=( + _echo_server_am_params, + server_error_q, + put_queue, + get_queue, + msg_size, + progress_mode, + memory_type_policy, + ), ) server.start() port = get_queue.get() client = mp.Process( - target=_echo_client_am_params, - args=(msg_size, progress_mode, memory_type_policy, port), + target=run_in_subprocess, + args=( + _echo_client_am_params, + client_error_q, + msg_size, + progress_mode, + memory_type_policy, + port, + ), ) client.start() client.join(timeout=60) - terminate_process(client) + terminate_process(client, error_queue=client_error_q) put_queue.put("Finished") server.join(timeout=10) - terminate_process(server) + terminate_process(server, error_queue=server_error_q) def _echo_server_am_iov(get_queue, put_queue, msg_size, progress_mode): @@ -375,22 +392,37 @@ def 
_echo_client_am_iov(msg_size, progress_mode, port): @pytest.mark.parametrize("progress_mode", ["blocking", "thread"]) def test_server_client_am_iov(msg_size, progress_mode): put_queue, get_queue = mp.Queue(), mp.Queue() + server_error_q = mp.Queue() + client_error_q = mp.Queue() server = mp.Process( - target=_echo_server_am_iov, - args=(put_queue, get_queue, msg_size, progress_mode), + target=run_in_subprocess, + args=( + _echo_server_am_iov, + server_error_q, + put_queue, + get_queue, + msg_size, + progress_mode, + ), ) server.start() port = get_queue.get() client = mp.Process( - target=_echo_client_am_iov, - args=(msg_size, progress_mode, port), + target=run_in_subprocess, + args=( + _echo_client_am_iov, + client_error_q, + msg_size, + progress_mode, + port, + ), ) client.start() client.join(timeout=60) - terminate_process(client) + terminate_process(client, error_queue=client_error_q) put_queue.put("Finished") server.join(timeout=10) - terminate_process(server) + terminate_process(server, error_queue=server_error_q) def _user_header_server(get_queue, put_queue, msg_size, progress_mode): @@ -564,44 +596,74 @@ def _empty_user_header_client(msg_size, progress_mode, port): @pytest.mark.parametrize("progress_mode", ["blocking", "thread"]) def test_server_client_am_user_header(msg_size, progress_mode): put_queue, get_queue = mp.Queue(), mp.Queue() + server_error_q = mp.Queue() + client_error_q = mp.Queue() server = mp.Process( - target=_user_header_server, - args=(put_queue, get_queue, msg_size, progress_mode), + target=run_in_subprocess, + args=( + _user_header_server, + server_error_q, + put_queue, + get_queue, + msg_size, + progress_mode, + ), ) server.start() port = get_queue.get() client = mp.Process( - target=_user_header_client, - args=(msg_size, progress_mode, port), + target=run_in_subprocess, + args=( + _user_header_client, + client_error_q, + msg_size, + progress_mode, + port, + ), ) client.start() client.join(timeout=60) - terminate_process(client) + 
terminate_process(client, error_queue=client_error_q) put_queue.put("Finished") server.join(timeout=10) - terminate_process(server) + terminate_process(server, error_queue=server_error_q) @pytest.mark.parametrize("msg_size", [10, 2**24]) @pytest.mark.parametrize("progress_mode", ["blocking", "thread"]) def test_server_client_am_iov_user_header(msg_size, progress_mode): put_queue, get_queue = mp.Queue(), mp.Queue() + server_error_q = mp.Queue() + client_error_q = mp.Queue() server = mp.Process( - target=_user_header_server, - args=(put_queue, get_queue, msg_size, progress_mode), + target=run_in_subprocess, + args=( + _user_header_server, + server_error_q, + put_queue, + get_queue, + msg_size, + progress_mode, + ), ) server.start() port = get_queue.get() client = mp.Process( - target=_user_header_iov_client, - args=(msg_size, progress_mode, port), + target=run_in_subprocess, + args=( + _user_header_iov_client, + client_error_q, + msg_size, + progress_mode, + port, + ), ) client.start() client.join(timeout=60) - terminate_process(client) + terminate_process(client, error_queue=client_error_q) put_queue.put("Finished") server.join(timeout=10) - terminate_process(server) + terminate_process(server, error_queue=server_error_q) @pytest.mark.parametrize("msg_size", [10, 2**24]) @@ -609,23 +671,39 @@ def test_server_client_am_iov_user_header(msg_size, progress_mode): def test_server_client_am_empty_user_header(msg_size, progress_mode): """Test that recv_header is empty bytes when no user header is sent.""" put_queue, get_queue = mp.Queue(), mp.Queue() + server_error_q = mp.Queue() + client_error_q = mp.Queue() # Reuse the echo server that doesn't set user_header server = mp.Process( - target=_echo_server_am_params, - args=(put_queue, get_queue, msg_size, progress_mode, None), + target=run_in_subprocess, + args=( + _echo_server_am_params, + server_error_q, + put_queue, + get_queue, + msg_size, + progress_mode, + None, + ), ) server.start() port = get_queue.get() client = 
mp.Process( - target=_empty_user_header_client, - args=(msg_size, progress_mode, port), + target=run_in_subprocess, + args=( + _empty_user_header_client, + client_error_q, + msg_size, + progress_mode, + port, + ), ) client.start() client.join(timeout=60) - terminate_process(client) + terminate_process(client, error_queue=client_error_q) put_queue.put("Finished") server.join(timeout=10) - terminate_process(server) + terminate_process(server, error_queue=server_error_q) @pytest.mark.parametrize("transfer_api", ["am", "stream", "tag"])