Draft
44 commits
296f94e
Use `weak_ptr<ucxx::Endpoint>` in `ErrorCallbackData`
pentschev May 5, 2024
f4b7dc4
EP cancel/non-blocking close
pentschev May 7, 2024
230b81b
Allow querying inflight request count in `ucxx::InflightRequests`
pentschev May 14, 2024
8641ec3
Allow querying inflight request count in `ucxx::Endpoint`
pentschev May 14, 2024
8f5d0b2
Add methods to initiate `ucxx::Endpoint` stop process
pentschev May 14, 2024
3c983c6
Add missing `enablePythonFuture` defaults
pentschev May 14, 2024
e87baf5
Fix missing status set in `ucxx::Endpoint::closeBlocking()`
pentschev May 14, 2024
3b22f54
Request fixes
pentschev May 14, 2024
2d54ebd
Remove request from `ucxx::InflightRequests` canceling map
pentschev May 15, 2024
27ac23f
Do not track requests that completed immediately in `cancelAll`
pentschev May 15, 2024
a687988
Fix `RequestEndpointClose` force mode
pentschev May 15, 2024
dfe975e
Endpoint cancelation tests
pentschev May 14, 2024
1fee6b4
Notify callback when all inflight/canceling requests complete
pentschev May 15, 2024
4dc81c5
Add user callback to `ucxx::Endpoint::cancelInflightRequests()`
pentschev May 15, 2024
0c2b72d
Endpoint test cancel inflight requests callback
pentschev May 15, 2024
82e6f5f
Use `scoped_lock`s in `ucxx::InflightRequests`
pentschev May 16, 2024
15dbb91
Print exception in callback warning message
pentschev May 16, 2024
621abc4
Remove `ucxx::InflightRequests::dropCanceled()`
pentschev May 16, 2024
981469a
`ucxx::InflightRequests` cleanup
pentschev May 16, 2024
dfdcb90
Add missing rejection tests and improve comments
pentschev May 16, 2024
cef478b
Update `ucxx::Endpoint::close()` docstring
pentschev May 16, 2024
5a72fe1
Update `ucxx::Requests` comments and logging
pentschev May 16, 2024
44cec65
Merge remote-tracking branch 'upstream/branch-0.38' into non-blocking…
pentschev May 16, 2024
e0206f0
Add missing callback in `ucxx::Endpoint::removeInflightRequest`
pentschev May 16, 2024
65671d2
Add `ucxx::Endpoint` reference back to `ucxx::EndpointErrorCallback`
pentschev May 17, 2024
71a8f12
Issue `ucxx::Request::cancel()` in worker thread when active
pentschev May 17, 2024
21dd0a9
Remove inflight request if its status has been already set
pentschev May 17, 2024
6d113a3
Keep track of `ucxx::Request` upon `findAndRemove()` return
pentschev May 17, 2024
110c8d1
Register `ucxx::Endpoint` during `createRequestAm()`
pentschev May 20, 2024
50072c1
Check and reset `ucxx::Request::_cancelCallback`
pentschev May 20, 2024
f640255
Merge remote-tracking branch 'upstream/branch-0.41' into non-blocking…
pentschev Nov 15, 2024
cae6d04
Refactor InflightRequests
pentschev Mar 31, 2026
062d60e
Remove std::vector-based option
pentschev Mar 31, 2026
30b0d8d
Simplify to use `std::unordered_set`
pentschev Mar 31, 2026
3ee7a96
Merge upstream/main into non-blocking-ep-close-cancel
pentschev Apr 2, 2026
de5e5c5
Fix build errors
pentschev Apr 2, 2026
638b91e
Fix linting
pentschev Apr 2, 2026
ec1a3e1
Rename to VoidCallbackUserFunction
pentschev Apr 2, 2026
7bbf55d
Fix deadlock during InflightRequests::remove
pentschev Apr 2, 2026
b847374
Merge branch 'refactor-inflight-requests' into non-blocking-ep-close-…
pentschev Apr 2, 2026
0172a2d
Fix that fixes first part
pentschev Apr 2, 2026
29c991f
Supposedly fix std::bad_variant exception
pentschev Apr 3, 2026
4fe4ba8
Fix check of in progress AM requests
pentschev Apr 5, 2026
8f10274
Merge remote-tracking branch 'upstream/main' into non-blocking-ep-clo…
pentschev Apr 10, 2026
131 changes: 120 additions & 11 deletions cpp/include/ucxx/endpoint.h
@@ -60,9 +60,15 @@ class Endpoint : public Component {
///< that may run asynchronously on another thread.
ucs_status_t _status{UCS_INPROGRESS}; ///< Endpoint status
std::atomic<bool> _closing{false}; ///< Prevent calling close multiple concurrent times.
std::atomic<bool> _stopping{false}; ///< Signal whether endpoint is stopping.
EndpointCloseCallbackUserFunction _closeCallback{nullptr}; ///< Close callback to call
EndpointCloseCallbackUserData _closeCallbackArg{
nullptr}; ///< Argument to be passed to close callback
VoidCallbackUserFunction _cancelInflightCallback{
nullptr}; ///< The wrapper to the callback registered via `cancelInflightRequests()` that will
///< deregister once the callback is called.
VoidCallbackUserFunction _cancelInflightCallbackOriginal{
nullptr}; ///< The original user callback registered via `cancelInflightRequests()`

/**
* @brief Private constructor of `ucxx::Endpoint`.
@@ -271,9 +277,21 @@ class Endpoint : public Component {
* progress the worker and check the result of `getCancelingSize()`, all requests are only
* canceled when `getCancelingSize()` returns `0`.
*
* Supports an optional callback function that is called only once no requests remain
* inflight or canceling. The mutex that controls inflight requests is released before
* the callback is called, to prevent deadlocks in case the callback registers a new
* inflight request. Consequently, there is no guarantee that another inflight request
* won't be registered between the time the mutex is released and the time the callback
* executes. The user is responsible for preventing such situations, and calling
* `stop()` before `cancelInflightRequests()` is highly advisable.
*
* @param[in] callbackFunction function to be called upon termination and only if no
* further requests inflight or canceling remain.
*
* @returns Number of requests that were scheduled for cancelation.
*/
size_t cancelInflightRequests();
size_t cancelInflightRequests(VoidCallbackUserFunction callbackFunction = nullptr);

/**
* @brief Check the number of inflight requests being canceled.
@@ -286,6 +304,16 @@ class Endpoint : public Component {
*/
[[nodiscard]] size_t getCancelingSize() const;

/**
* @brief Check the number of inflight requests waiting for completion.
*
* Check the number of inflight requests that were posted but have neither completed
* nor been scheduled for cancelation.
*
* @returns Number of inflight requests that are waiting for completion.
*/
size_t getInflightSize() const;

/**
* @brief Cancel inflight requests.
*
@@ -347,6 +375,8 @@ class Endpoint : public Component {
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
*
* @throws ucxx::RejectedError if `stop()` was already called.
*
* @note If a `callbackFunction` is specified, the lifetime of `callbackData` and of any
* other objects used in the scope of `callbackFunction` must be guaranteed by the caller
* until it executes or `isCompleted()` becomes true. The `callbackFunction` executes in
@@ -467,6 +497,8 @@ class Endpoint : public Component {
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
*
* @throws ucxx::RejectedError if `stop()` was already called.
*
* @note If a `callbackFunction` is specified, the lifetime of `callbackData` and of any
* other objects used in the scope of `callbackFunction` must be guaranteed by the caller
* until it executes or `isCompleted()` becomes true. The `callbackFunction` executes in
@@ -507,6 +539,8 @@ class Endpoint : public Component {
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
*
* @throws ucxx::RejectedError if `stop()` was already called.
*
* @note If a `callbackFunction` is specified, the lifetime of `callbackData` and of any
* other objects used in the scope of `callbackFunction` must be guaranteed by the caller
* until it executes or `isCompleted()` becomes true. The `callbackFunction` executes in
@@ -549,6 +583,8 @@ class Endpoint : public Component {
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
*
* @throws ucxx::RejectedError if `stop()` was already called.
*
* @note If a `callbackFunction` is specified, the lifetime of `callbackData` and of any
* other objects used in the scope of `callbackFunction` must be guaranteed by the caller
* until it executes or `isCompleted()` becomes true. The `callbackFunction` executes in
@@ -589,6 +625,8 @@ class Endpoint : public Component {
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
*
* @throws ucxx::RejectedError if `stop()` was already called.
*
* @note If a `callbackFunction` is specified, the lifetime of `callbackData` and of any
* other objects used in the scope of `callbackFunction` must be guaranteed by the caller
* until it executes or `isCompleted()` becomes true. The `callbackFunction` executes in
@@ -631,6 +669,8 @@ class Endpoint : public Component {
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
*
* @throws ucxx::RejectedError if `stop()` was already called.
*
* @param[in] buffer a raw pointer to the data to be sent.
* @param[in] length the size in bytes of the stream message to be sent.
* @param[in] enablePythonFuture whether a python future should be created and
@@ -640,7 +680,7 @@ class Endpoint : public Component {
*/
[[nodiscard]] std::shared_ptr<Request> streamSend(const void* const buffer,
size_t length,
const bool enablePythonFuture);
const bool enablePythonFuture = false);

/**
* @brief Enqueue a stream receive operation.
@@ -654,6 +694,8 @@ class Endpoint : public Component {
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
*
* @throws ucxx::RejectedError if `stop()` was already called.
*
* @param[in] buffer a raw pointer to pre-allocated memory where resulting
* data will be stored.
* @param[in] length the size in bytes of the stream message to be received.
@@ -664,7 +706,7 @@ class Endpoint : public Component {
*/
[[nodiscard]] std::shared_ptr<Request> streamRecv(void* buffer,
size_t length,
const bool enablePythonFuture);
const bool enablePythonFuture = false);

/**
* @brief Enqueue a tag send operation.
@@ -678,6 +720,8 @@ class Endpoint : public Component {
* Python future is requested, the Python application must then await on this future to
* ensure the transfer has completed. Requires UCXX Python support.
*
* @throws ucxx::RejectedError if `stop()` was already called.
*
* @note If a `callbackFunction` is specified, the lifetime of `callbackData` and of any
* other objects used in the scope of `callbackFunction` must be guaranteed by the caller
* until it executes or `isCompleted()` becomes true. The `callbackFunction` executes in
@@ -763,6 +807,7 @@ class Endpoint : public Component {
* ensure the transfer has completed. Requires UCXX Python support.
*
* @throws std::runtime_error if sizes of `buffer`, `size` and `isCUDA` do not match.
* @throws ucxx::RejectedError if `stop()` was already called.
*
* @param[in] buffer a vector of raw pointers to the data frames to be sent.
* @param[in] size a vector of size in bytes of each frame to be sent.
@@ -780,7 +825,7 @@ class Endpoint : public Component {
const std::vector<size_t>& size,
const std::vector<int>& isCUDA,
const Tag tag,
const bool enablePythonFuture);
const bool enablePythonFuture = false);

/**
* @brief Enqueue a multi-buffer tag receive operation.
@@ -806,7 +851,7 @@ class Endpoint : public Component {
*/
[[nodiscard]] std::shared_ptr<Request> tagMultiRecv(const Tag tag,
const TagMask tagMask,
const bool enablePythonFuture);
const bool enablePythonFuture = false);

/**
* @brief Enqueue a flush operation.
@@ -857,7 +902,23 @@ class Endpoint : public Component {
*
* Enqueue a non-blocking endpoint close operation, which will close the endpoint without
* requiring the object to be destroyed. This may be useful when other
* `std::shared_ptr<ucxx::Request>` objects are still alive, such as inflight transfers.
* `std::shared_ptr<ucxx::Request>` objects are still alive, such as inflight transfers,
* or the user wants to have more control over cancelation and closing order.
*
* @warning Unlike its `closeBlocking()` counterpart, this method does not cancel any
* inflight requests prior to submitting the UCP close request. Before scheduling the
* endpoint close request, the caller is advised to first call `stop()` to prevent new
* requests that require an active endpoint from being registered. Once `stop()` has been
* called, the user may either call `cancelInflightRequests()` with a callback that
* submits the `close()` request, or poll the number of inflight and canceling requests
* via `getInflightSize()` and `getCancelingSize()`, respectively, and issue the
* non-blocking `close()` of the endpoint once both return `0` or once the application
* cannot wait any longer for their completion. Note that the `cancelInflightRequests()`
* callback is not guaranteed to be called, nor are `getCancelingSize()` and
* `getInflightSize()` guaranteed to reach `0`, depending on the requests being handled.
* The user is thus advised to provide a forceful termination mechanism in case the
* requests can never complete.
*
* This method returns a `std::shared_ptr<ucxx::Request>` that can be later awaited and
* checked for errors. This is a non-blocking operation, and the status of closing the
@@ -885,11 +946,6 @@ class Endpoint : public Component {
* in which case the callback will also execute immediately within the calling thread and
* before the method returns.
*
* @warning Unlike its `closeBlocking()` counterpart, this method does not cancel any
* inflight requests prior to submitting the UCP close request. Before scheduling the
* endpoint close request, the caller must first call `cancelInflightRequests()` and
* progress the worker until `getCancelingSize()` returns `0`.
*
* @param[in] enablePythonFuture whether a python future should be created and
* subsequently notified.
* @param[in] callbackFunction user-defined callback function to call upon completion.
@@ -928,6 +984,59 @@ class Endpoint : public Component {
* if worker is running a progress thread and `period > 0`.
*/
void closeBlocking(uint64_t period = 0, uint64_t maxAttempts = 1);

/**
* @brief Signal the wish to close the endpoint without closing it.
*
* Signal the wish to close the endpoint without closing it, such that no new requests
* that require the endpoint to complete can be issued. This method is useful when the
* user needs control of the non-blocking closing process with `close()`, allowing
* requests to complete before the close request is issued. Once this is called, the user
* may either call `cancelInflightRequests()` with a callback that submits the `close()`
* request, or poll the number of inflight and canceling requests via `getInflightSize()`
* and `getCancelingSize()`, respectively, and issue the non-blocking `close()` of the
* endpoint once both return `0` or once the application cannot wait any longer for their
* completion. Note that the `cancelInflightRequests()` callback is not guaranteed to be
* called, nor are `getCancelingSize()` and `getInflightSize()` guaranteed to reach `0`,
* depending on the requests being handled. The user is thus advised to provide a
* forceful termination mechanism in case the requests can never complete.
*
* After this is called, certain requests are no longer accepted because they require a
* valid endpoint to complete. The requests that are not accepted are:
*
* - `amSend()`
* - `memGet()`
* - `memPut()`
* - `streamSend()`
* - `streamRecv()`
* - `tagSend()`
* - `tagMultiSend()`
*
* If the user attempts to call any of the methods above after this method, a
* `ucxx::RejectedError` exception is thrown. The user can also check whether this
* method was already called via `isStopping()`.
*
* The following requests are handled by the underlying `ucxx::Worker` and only matched
* to the `ucxx::Endpoint` object. They can therefore still be called after this method,
* which allows draining them:
*
* - `amRecv()`
* - `tagRecv()`
* - `tagMultiRecv()`
*/
void stop();

/**
* @brief Check whether the endpoint was signaled the wish to close.
*
* Check whether the wish to close the endpoint was signaled by calling `stop()`. This
* result is useful to identify the stage the endpoint is in: if this method returns
* `true`, the process to ultimately close the endpoint has begun.
*
* @return Whether the endpoint was signaled the wish to close.
*/
bool isStopping();
};

} // namespace ucxx
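
The shutdown order described in the `close()` and `stop()` docstrings above can be sketched as follows. This is only an illustration under stated assumptions: `MockEndpoint`, the local `RejectedError`, `progress()`, `isClosed()` and `shutdownGracefully()` are hypothetical stand-ins written for this example, not the real `ucxx::Endpoint` API. Only the method names `stop()`, `isStopping()`, `cancelInflightRequests()`, `getInflightSize()`, `getCancelingSize()` and `close()` are taken from the diff, and their real signatures and return types differ.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <utility>

// Hypothetical stand-in for `ucxx::RejectedError`.
struct RejectedError : std::runtime_error {
  using std::runtime_error::runtime_error;
};

// Hypothetical stand-in for `ucxx::Endpoint`; requests are modeled as plain counters.
class MockEndpoint {
  bool _stopping{false};
  bool _closed{false};
  std::size_t _inflight{0};
  std::size_t _canceling{0};
  std::function<void()> _cancelCallback{};

 public:
  // Mirrors the documented behavior: sends are rejected once `stop()` was called.
  void tagSend() {
    if (_stopping) throw RejectedError("endpoint is stopping");
    ++_inflight;
  }
  void stop() { _stopping = true; }
  bool isStopping() const { return _stopping; }
  std::size_t getInflightSize() const { return _inflight; }
  std::size_t getCancelingSize() const { return _canceling; }

  // Moves all inflight requests to "canceling"; the callback fires later, once drained.
  std::size_t cancelInflightRequests(std::function<void()> callback = nullptr) {
    const std::size_t scheduled = _inflight;
    _canceling += _inflight;
    _inflight       = 0;
    _cancelCallback = std::move(callback);
    return scheduled;
  }

  // Stand-in for progressing the worker: completes one canceling request per call.
  void progress() {
    if (_canceling > 0) --_canceling;
    if (_canceling == 0 && _inflight == 0 && _cancelCallback) {
      auto callback   = std::move(_cancelCallback);
      _cancelCallback = nullptr;
      callback();
    }
  }
  void close() { _closed = true; }
  bool isClosed() const { return _closed; }
};

// Returns true if the cancel callback closed the endpoint within the progress budget,
// false if the forceful fallback had to close it instead.
bool shutdownGracefully(MockEndpoint& ep, int maxProgressCalls) {
  ep.stop();  // 1. reject new requests that need the endpoint
  assert(ep.isStopping());
  ep.cancelInflightRequests([&ep]() { ep.close(); });  // 2. close once drained
  for (int i = 0; i < maxProgressCalls && !ep.isClosed(); ++i) ep.progress();
  if (ep.isClosed()) return true;
  ep.close();  // 3. forceful termination, since the callback may never run
  return false;
}
```

The bounded loop stands in for the "forceful termination mechanism" the docstrings recommend: the caller never waits indefinitely on a callback that is not guaranteed to fire.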
42 changes: 38 additions & 4 deletions cpp/include/ucxx/inflight_requests.h
@@ -4,11 +4,14 @@
*/
#pragma once

#include <atomic>
#include <memory>
#include <mutex>
#include <unordered_set>
#include <vector>

#include <ucxx/typedefs.h>

namespace ucxx {

class Request;
@@ -31,15 +34,16 @@ struct TrackedRequests {
* Handle tracked requests, providing functionality so that its owner can modify those
* requests, performing operations such as insertion, removal and cancelation.
*
* Uses `std::unordered_map<Request*, shared_ptr<Request>>` for O(1) amortized
* insert/remove that scales to thousands of concurrent inflight requests.
* Uses `std::unordered_set<shared_ptr<Request>>` for O(1) amortized insert/remove that
* scales to thousands of concurrent inflight requests.
*/
class InflightRequests {
private:
std::unordered_set<std::shared_ptr<Request>> _inflight{};
std::unordered_set<std::shared_ptr<Request>> _canceling{};

std::mutex _mutex{};
std::atomic<bool> _cancelAllInProgress{false};

public:
/**
@@ -91,19 +95,39 @@ class InflightRequests {
* be called when a request has completed and the `InflightRequests` owner does not need
* to keep track of it anymore.
*
* Supports an optional callback function that is called only once no requests remain
* inflight or canceling. The mutex that controls inflight requests is released before
* the callback is called, to prevent deadlocks in case the callback registers a new
* inflight request. Consequently, there is no guarantee that another inflight request
* won't be registered between the time the mutex is released and the time the callback
* executes.
*
* @param[in] request shared pointer to the request
* @param[in] callbackFunction function to be called upon termination and only if no
* further requests inflight or canceling remain.
*/
void remove(const std::shared_ptr<Request>& request);
void remove(const std::shared_ptr<Request>& request,
VoidCallbackUserFunction callbackFunction = nullptr);

/**
* @brief Issue cancelation of all inflight requests and clear the internal container.
*
* Issue cancelation of all inflight requests known to this object and clear the
* internal container. The total number of canceled requests is returned.
*
* Supports an optional callback function that is called only once no requests remain
* inflight or canceling. The mutex that controls inflight requests is released before
* the callback is called, to prevent deadlocks in case the callback registers a new
* inflight request. Consequently, there is no guarantee that another inflight request
* won't be registered between the time the mutex is released and the time the callback
* executes.
*
* @param[in] callbackFunction function to be called upon termination and only if no
* further requests inflight or canceling remain.
*
* @returns The total number of canceled requests.
*/
size_t cancelAll();
size_t cancelAll(VoidCallbackUserFunction callbackFunction = nullptr);

/**
* @brief Releases the internally-tracked containers.
@@ -127,6 +151,16 @@ class InflightRequests {
* @returns The count of requests that are in process of cancelation.
*/
[[nodiscard]] size_t getCancelingSize();

/**
* @brief Get count of inflight requests.
*
* Get the count of inflight requests that have neither completed nor been scheduled
* for cancelation.
*
* @returns The count of inflight requests.
*/
[[nodiscard]] size_t getInflightSize();
};

} // namespace ucxx
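
The `remove()` and `cancelAll()` docstrings above describe releasing the mutex before invoking the callback. A minimal sketch of that pattern follows, assuming a simplified tracker: `ToyRequest`, `ToyInflightRequests`, `insert()` and `demoReentrantCallback()` are hypothetical names for this illustration, and the real `ucxx::InflightRequests` (which also has a `_cancelAllInProgress` flag not modeled here) may be implemented differently.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <unordered_set>

struct ToyRequest {};  // hypothetical placeholder for `ucxx::Request`

// Hypothetical, simplified tracker; not the real `ucxx::InflightRequests`.
class ToyInflightRequests {
  std::unordered_set<std::shared_ptr<ToyRequest>> _inflight{};
  std::unordered_set<std::shared_ptr<ToyRequest>> _canceling{};
  std::mutex _mutex{};

 public:
  void insert(const std::shared_ptr<ToyRequest>& request) {
    std::lock_guard<std::mutex> lock(_mutex);
    _inflight.insert(request);
  }

  void remove(const std::shared_ptr<ToyRequest>& request,
              std::function<void()> callbackFunction = nullptr) {
    bool drained = false;
    {
      std::lock_guard<std::mutex> lock(_mutex);
      _inflight.erase(request);
      _canceling.erase(request);
      drained = _inflight.empty() && _canceling.empty();
    }  // the mutex is released here, before the callback runs
    // Another thread may insert a request between the unlock above and this call,
    // which is the race the docstring warns about.
    if (drained && callbackFunction) callbackFunction();
  }

  std::size_t getInflightSize() {
    std::lock_guard<std::mutex> lock(_mutex);
    return _inflight.size();
  }
};

// The callback re-enters the tracker; this would deadlock on a non-recursive mutex
// if `remove()` still held the lock while calling it.
std::size_t demoReentrantCallback() {
  ToyInflightRequests tracker;
  auto first  = std::make_shared<ToyRequest>();
  auto second = std::make_shared<ToyRequest>();
  tracker.insert(first);
  assert(tracker.getInflightSize() == 1);
  tracker.remove(first, [&]() { tracker.insert(second); });
  return tracker.getInflightSize();
}
```

Computing `drained` under the lock and acting on it afterwards keeps the critical section short, at the cost of the documented window in which the "no requests remain" condition can become stale.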