Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9d4e4dc
add typedefs for AmSendMemoryTypePolicy and AmSendParams
grlee77 Feb 17, 2026
8e8b02a
add contiguous + IOV constructors for AM requests
grlee77 Feb 17, 2026
f7c8210
add Endpoint overloads for AmSendParams and std::vector<ucp_dt_iov_t>…
grlee77 Feb 17, 2026
88b6cc9
add test cases for host IOV AM send
grlee77 Feb 17, 2026
c089729
remove const from AmReceiverCallbackInfo members so copy and move con…
grlee77 Feb 17, 2026
87c169c
Fix segfault in IOV sends (request_am.cpp)
grlee77 Feb 17, 2026
f4c69e2
update failing test case
grlee77 Feb 17, 2026
f4bad74
support opaque userHeader field in AmHeader
grlee77 Feb 18, 2026
53b3cd2
document header segment size limitation in typedefs.h
grlee77 Feb 18, 2026
e50506e
update comment to clarify purpose of _count
grlee77 Feb 18, 2026
1f6604d
change iov argument to AmSend from const ref to pass by value (then m…
grlee77 Feb 25, 2026
5e00989
Merge branch 'main' into grelee/ucxx-iov-updates-cpp
grlee77 Feb 25, 2026
2e3c043
Update cpp/src/request_data.cpp
grlee77 Feb 26, 2026
a8f7f58
lint fixes
grlee77 Feb 26, 2026
49634b8
fix use after move
grlee77 Feb 26, 2026
382ab84
change userHeader to std::vector<std::byte>
grlee77 Feb 26, 2026
201a127
add Python bindings support for am_send with memory_type_policy
grlee77 Feb 18, 2026
d6f55ce
add Endpoint.am_send_iov to Python API
grlee77 Feb 18, 2026
5abba24
update Python AM APIs with user_header support
grlee77 Feb 18, 2026
4d395a0
add user_header test cases
grlee77 Feb 18, 2026
2a75746
update ucxx Python docs
grlee77 Feb 18, 2026
61045be
add public Python am_recv_with_header() method
grlee77 Feb 18, 2026
c69bae7
lint fixes
grlee77 Feb 26, 2026
c517f79
fix missing user_header argument in test cases
grlee77 Feb 26, 2026
6ebe29d
Merge remote-tracking branch 'upstream/main' into grelee/ucxx-iov-upd…
pentschev Mar 3, 2026
92e8aac
Merge remote-tracking branch 'upstream/main' into grelee/ucxx-iov-upd…
pentschev Mar 3, 2026
efa8f07
Merge branch 'main' into grelee/ucxx-iov-updates-python
pentschev Apr 2, 2026
12ae26d
Fix test_send_recv_am
pentschev Apr 2, 2026
535f679
Fix docstring indentation in am_recv_with_header
pentschev Apr 16, 2026
26f0f71
Merge remote-tracking branch 'upstream/main' into grelee/ucxx-iov-upd…
pentschev Apr 16, 2026
9c00c98
Merge remote-tracking branch 'upstream/main' into grelee/ucxx-iov-upd…
pentschev Apr 16, 2026
02a29ce
Ensure exceptions are reraised to test's parent process
pentschev Apr 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/ucxx/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ API
_lib_async.Endpoint.close_after_n_recv
_lib_async.Endpoint.get_ucp_endpoint
_lib_async.Endpoint.get_ucp_worker
_lib_async.Endpoint.am_recv
_lib_async.Endpoint.am_recv_with_header
_lib_async.Endpoint.am_send
_lib_async.Endpoint.am_send_iov
_lib_async.Endpoint.recv
_lib_async.Endpoint.send
_lib_async.Endpoint.uid
Expand Down
104 changes: 100 additions & 4 deletions python/ucxx/ucxx/_lib/libucxx.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ from libcpp.memory cimport (
shared_ptr,
static_pointer_cast,
)
from libcpp.optional cimport nullopt
from libcpp.string cimport string
from libcpp.string_view cimport string_view
from libcpp.utility cimport move
Expand Down Expand Up @@ -340,6 +339,11 @@ class Feature(enum.Enum):
AM = UCP_FEATURE_AM


class PythonAmSendMemoryTypePolicy(enum.Enum):
FallbackToHost = <int>AmSendMemoryTypePolicy.FallbackToHost
ErrorOnUnsupported = <int>AmSendMemoryTypePolicy.ErrorOnUnsupported


class PythonRequestNotifierWaitState(enum.Enum):
Ready = RequestNotifierWaitState.Ready
Timeout = RequestNotifierWaitState.Timeout
Expand Down Expand Up @@ -994,6 +998,20 @@ cdef class UCXRequest():
elif bufType == BufferType.Host:
return _get_host_buffer(<uintptr_t><void*>buf.get())

@property
def recv_header(self) -> bytes:
"""Get the user-defined header from an AM receive request.

Returns the opaque header bytes sent by the peer. Returns empty bytes
if no user header was sent or for non-AM requests.
"""
cdef string header

with nogil:
header = self._request.get().getRecvHeader()

return <bytes>header

def is_completed(self) -> bool:
warnings.warn(
"UCXRequest.is_completed() is deprecated and will soon be removed, "
Expand Down Expand Up @@ -1451,21 +1469,99 @@ cdef class UCXEndpoint():

return ep_matched

def am_send(self, Array arr) -> UCXRequest:
def am_send(
self, Array arr, memory_type_policy=None, user_header=None
) -> UCXRequest:
cdef void* buf = <void*>arr.ptr
cdef size_t nbytes = arr.nbytes
cdef bint cuda_array = arr.cuda
cdef shared_ptr[Request] req
cdef AmSendParams params
cdef bytes user_header_bytes
cdef const char* user_header_ptr

if not self._context_feature_flags & Feature.AM.value:
raise ValueError("UCXContext must be created with `Feature.AM`")

params.memoryType = (
UCS_MEMORY_TYPE_CUDA if cuda_array else UCS_MEMORY_TYPE_HOST
)
if memory_type_policy is not None:
params.memoryTypePolicy = (
<AmSendMemoryTypePolicy>memory_type_policy.value
)
if user_header is not None:
if not isinstance(user_header, bytes):
raise TypeError("user_header must be bytes")
user_header_bytes = user_header
user_header_ptr = user_header_bytes
params.setUserHeader(<const void*>user_header_ptr, len(user_header_bytes))

with nogil:
req = self._endpoint.get().amSend(
buf,
nbytes,
UCS_MEMORY_TYPE_CUDA if cuda_array else UCS_MEMORY_TYPE_HOST,
nullopt,
params,
self._enable_python_future
)

return UCXRequest(<uintptr_t><void*>&req, self._enable_python_future)

def am_send_iov(
self, list arrays, memory_type_policy=None, user_header=None
) -> UCXRequest:
cdef vector[ucp_dt_iov_t] iov_vec
cdef ucp_dt_iov_t entry
cdef shared_ptr[Request] req
cdef AmSendParams params
cdef bytes user_header_bytes
cdef const char* user_header_ptr

if not self._context_feature_flags & Feature.AM.value:
raise ValueError("UCXContext must be created with `Feature.AM`")

if len(arrays) == 0:
raise ValueError("IOV segment list must not be empty")

cdef list wrapped = []
for buf in arrays:
if not isinstance(buf, Array):
buf = Array(buf)
wrapped.append(buf)

# Validate all segments have the same memory type
cdef bint first_cuda = (<Array>wrapped[0]).cuda
for arr_obj in wrapped:
if (<Array>arr_obj).cuda != first_cuda:
raise ValueError(
"All IOV segments must have the same memory type "
"(all host or all CUDA)"
)

for arr_obj in wrapped:
entry.buffer = <void*>(<Array>arr_obj).ptr
entry.length = (<Array>arr_obj).nbytes
iov_vec.push_back(entry)

params.datatype = UCP_DATATYPE_IOV
params.memoryType = (
UCS_MEMORY_TYPE_CUDA if first_cuda else UCS_MEMORY_TYPE_HOST
)
if memory_type_policy is not None:
params.memoryTypePolicy = (
<AmSendMemoryTypePolicy>memory_type_policy.value
)
if user_header is not None:
if not isinstance(user_header, bytes):
raise TypeError("user_header must be bytes")
user_header_bytes = user_header
user_header_ptr = user_header_bytes
params.setUserHeader(<const void*>user_header_ptr, len(user_header_bytes))

with nogil:
req = self._endpoint.get().amSend(
move(iov_vec),
params,
self._enable_python_future
)

Expand Down
Loading
Loading