Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 51 additions & 25 deletions rmw_fastrtps_cpp/src/rmw_serialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <memory>

#include "fastcdr/FastBuffer.h"

#include "rmw/error_handling.h"
Expand All @@ -20,6 +22,46 @@

#include "./type_support_common.hpp"

namespace
{

// Per-thread cache: resolves and constructs the MessageTypeSupport object once per unique
// type_support pointer, avoiding repeated get_message_typesupport_handle dispatch and
// MessageTypeSupport_cpp construction on every serialize/deserialize call.
struct TypeSupportCache
{
const rosidl_message_type_support_t * input_ts = nullptr;
const message_type_support_callbacks_t * callbacks = nullptr;
std::unique_ptr<MessageTypeSupport_cpp> tss;
};

// Returns nullptr and sets the RMW error on failure.
const TypeSupportCache * get_type_support_cache(
const rosidl_message_type_support_t * type_support)
{
thread_local TypeSupportCache cache;

if (cache.input_ts != type_support) {
const rosidl_message_type_support_t * ts = get_message_typesupport_handle(
type_support, RMW_FASTRTPS_CPP_TYPESUPPORT_C);
if (!ts) {
ts = get_message_typesupport_handle(
type_support, RMW_FASTRTPS_CPP_TYPESUPPORT_CPP);
if (!ts) {
RMW_SET_ERROR_MSG("type support not from this implementation");
return nullptr;
}
}
cache.input_ts = type_support;
cache.callbacks = static_cast<const message_type_support_callbacks_t *>(ts->data);
cache.tss = std::make_unique<MessageTypeSupport_cpp>(cache.callbacks, type_support);
}

return &cache;
}

} // namespace

extern "C"
{
rmw_ret_t
Expand All @@ -28,20 +70,12 @@ rmw_serialize(
const rosidl_message_type_support_t * type_support,
rmw_serialized_message_t * serialized_message)
{
const rosidl_message_type_support_t * ts = get_message_typesupport_handle(
type_support, RMW_FASTRTPS_CPP_TYPESUPPORT_C);
if (!ts) {
ts = get_message_typesupport_handle(
type_support, RMW_FASTRTPS_CPP_TYPESUPPORT_CPP);
if (!ts) {
RMW_SET_ERROR_MSG("type support not from this implementation");
return RMW_RET_ERROR;
}
const TypeSupportCache * cache = get_type_support_cache(type_support);
if (!cache) {
return RMW_RET_ERROR;
}

auto callbacks = static_cast<const message_type_support_callbacks_t *>(ts->data);
auto tss = MessageTypeSupport_cpp(callbacks, type_support);
auto data_length = tss.getEstimatedSerializedSize(ros_message, callbacks);
auto data_length = cache->tss->getEstimatedSerializedSize(ros_message, cache->callbacks);
if (serialized_message->buffer_capacity < data_length) {
if (rmw_serialized_message_resize(serialized_message, data_length) != RMW_RET_OK) {
rmw_reset_error();
Expand All @@ -56,7 +90,7 @@ rmw_serialize(
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::CdrVersion::XCDRv1);
ser.set_encoding_flag(eprosima::fastcdr::EncodingAlgorithmFlag::PLAIN_CDR);

auto ret = tss.serializeROSmessage(ros_message, ser, callbacks);
auto ret = cache->tss->serializeROSmessage(ros_message, ser, cache->callbacks);
serialized_message->buffer_length = data_length;
serialized_message->buffer_capacity = data_length;
return ret == true ? RMW_RET_OK : RMW_RET_ERROR;
Expand All @@ -68,24 +102,16 @@ rmw_deserialize(
const rosidl_message_type_support_t * type_support,
void * ros_message)
{
const rosidl_message_type_support_t * ts = get_message_typesupport_handle(
type_support, RMW_FASTRTPS_CPP_TYPESUPPORT_C);
if (!ts) {
ts = get_message_typesupport_handle(
type_support, RMW_FASTRTPS_CPP_TYPESUPPORT_CPP);
if (!ts) {
RMW_SET_ERROR_MSG("type support not from this implementation");
return RMW_RET_ERROR;
}
const TypeSupportCache * cache = get_type_support_cache(type_support);
if (!cache) {
return RMW_RET_ERROR;
}

auto callbacks = static_cast<const message_type_support_callbacks_t *>(ts->data);
auto tss = MessageTypeSupport_cpp(callbacks, type_support);
eprosima::fastcdr::FastBuffer buffer(
reinterpret_cast<char *>(serialized_message->buffer), serialized_message->buffer_length);
eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN);

auto ret = tss.deserializeROSmessage(deser, ros_message, callbacks);
auto ret = cache->tss->deserializeROSmessage(deser, ros_message, cache->callbacks);
return ret == true ? RMW_RET_OK : RMW_RET_ERROR;
}

Expand Down