Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ enum SerializedDataType
{
FASTDDS_SERIALIZED_DATA_TYPE_CDR_BUFFER,
FASTDDS_SERIALIZED_DATA_TYPE_DYNAMIC_MESSAGE,
FASTDDS_SERIALIZED_DATA_TYPE_ROS_MESSAGE
FASTDDS_SERIALIZED_DATA_TYPE_ROS_MESSAGE,
// `data` points to a `rmw_serialized_message_t` (aka
// `rcutils_uint8_array_t`). The CDR payload is written directly into that
// buffer on deserialize, avoiding an intermediate FastBuffer and the
// follow-up memcpy in `_take_serialized_message`.
Comment on lines +45 to +48
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this comment too verbose or useful to keep?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would put similar comments in the new structure I suggested in #877 (comment)

FASTDDS_SERIALIZED_DATA_TYPE_RMW_SERIALIZED_MESSAGE
};

// Publishers write method will receive a pointer to this struct
Expand Down
19 changes: 19 additions & 0 deletions rmw_fastrtps_shared_cpp/src/TypeSupport_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"
#include "rmw/error_handling.h"
#include "rmw/ret_types.h"
#include "rmw/serialized_message.h"

#include "rosidl_typesupport_introspection_c/identifier.h"
#include "rosidl_typesupport_introspection_cpp/identifier.hpp"
Expand Down Expand Up @@ -206,6 +208,23 @@ bool TypeSupport::deserialize(
return true;
}

case FASTDDS_SERIALIZED_DATA_TYPE_RMW_SERIALIZED_MESSAGE:
{
// Write the CDR payload straight into the user's
// rmw_serialized_message_t. This skips the intermediate FastBuffer
// and the follow-up memcpy in `_take_serialized_message`, halving
// the rmw-level memcpys for large serialized messages.
auto out = static_cast<rmw_serialized_message_t *>(ser_data->data);
if (out->buffer_capacity < payload.length) {
if (rmw_serialized_message_resize(out, payload.length) != RMW_RET_OK) {
return false;
}
}
memcpy(out->buffer, payload.data, payload.length);
out->buffer_length = payload.length;
return true;
}

case FASTDDS_SERIALIZED_DATA_TYPE_DYNAMIC_MESSAGE:
{
auto m_type = std::make_shared<eprosima::fastdds::dds::DynamicPubSubType>();
Expand Down
26 changes: 12 additions & 14 deletions rmw_fastrtps_shared_cpp/src/rmw_take.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,12 +310,15 @@ _take_serialized_message(
auto info = static_cast<CustomSubscriberInfo *>(subscription->data);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(info, "custom subscriber info is null", return RMW_RET_ERROR);

eprosima::fastcdr::FastBuffer buffer;

// Point the deserialize path at the user's rmw_serialized_message_t
// directly. The RMW_SERIALIZED_MESSAGE branch in TypeSupport::deserialize
// will resize this buffer and memcpy the CDR payload into it, avoiding the
// previous intermediate FastBuffer and its follow-up memcpy into the user
// buffer.
rmw_fastrtps_shared_cpp::SerializedData data;
data.type = FASTDDS_SERIALIZED_DATA_TYPE_CDR_BUFFER;
data.data = &buffer;
data.impl = nullptr; // not used when type is FASTDDS_SERIALIZED_DATA_TYPE_CDR_BUFFER
data.type = FASTDDS_SERIALIZED_DATA_TYPE_RMW_SERIALIZED_MESSAGE;
data.data = serialized_message;
data.impl = nullptr; // not used for RMW_SERIALIZED_MESSAGE

eprosima::fastdds::dds::StackAllocatedSequence<void *, 1> data_values;
const_cast<void **>(data_values.buffer())[0] = &data;
Expand All @@ -339,15 +342,10 @@ _take_serialized_message(
continue;
}
}
auto buffer_size = static_cast<size_t>(buffer.getBufferSize());
if (serialized_message->buffer_capacity < buffer_size) {
auto ret = rmw_serialized_message_resize(serialized_message, buffer_size);
if (ret != RMW_RET_OK) {
return ret; // Error message already set
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before, it returns corresponding error code that is propagated for the failure here? but the new code conceals the reason because it just returns false without any information with it. i think this is minor but we would want to consider to add RMW_SET_ERROR_MSG("failed to resize serialized message") in FASTDDS_SERIALIZED_DATA_TYPE_RMW_SERIALIZED_MESSAGE case statement?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either that, or pass in data.data a pointer to a custom structure that has both the serialized_message and a rmw_ret_t field that is filled in the new case of the type support implementation

}
}
serialized_message->buffer_length = buffer_size;
memcpy(serialized_message->buffer, buffer.getBuffer(), serialized_message->buffer_length);
// `serialized_message->buffer` and `buffer_length` have already been
// populated by TypeSupport::deserialize via the
// FASTDDS_SERIALIZED_DATA_TYPE_RMW_SERIALIZED_MESSAGE path; nothing
// else to do here.

if (message_info) {
_assign_message_info(identifier, message_info, &info_seq[0]);
Expand Down