Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 85 additions & 39 deletions rmw_cyclonedds_cpp/src/rmw_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1842,67 +1842,113 @@ extern "C" rmw_ret_t rmw_get_serialized_message_size(
return RMW_RET_UNSUPPORTED;
}

extern "C" rmw_ret_t rmw_serialize(
const void * ros_message,
const rosidl_message_type_support_t * type_support,
rmw_serialized_message_t * serialized_message)
{
extern "C" rmw_ret_t
rmw_serialize(const void *ros_message,
const rosidl_message_type_support_t *type_support,
rmw_serialized_message_t *serialized_message) {
// Cache the CDR writer per type_support pointer to avoid rebuilding
// make_message_value_type and make_cdr_writer on every call.
thread_local const rosidl_message_type_support_t *cached_serialize_ts =
nullptr;
thread_local std::unique_ptr<rmw_cyclonedds_cpp::BaseCDRWriter> cached_writer;

try {
auto writer = rmw_cyclonedds_cpp::make_cdr_writer(
rmw_cyclonedds_cpp::make_message_value_type(type_support));
if (type_support != cached_serialize_ts) {
cached_writer = rmw_cyclonedds_cpp::make_cdr_writer(
rmw_cyclonedds_cpp::make_message_value_type(type_support));
cached_serialize_ts = type_support;
}

auto size = writer->get_serialized_size(ros_message);
auto size = cached_writer->get_serialized_size(ros_message);
rmw_ret_t ret = rmw_serialized_message_resize(serialized_message, size);
if (RMW_RET_OK != ret) {
rmw_reset_error();
RMW_SET_ERROR_MSG("rmw_serialize: failed to allocate space for message");
return ret;
}
writer->serialize(serialized_message->buffer, ros_message);
cached_writer->serialize(serialized_message->buffer, ros_message);
serialized_message->buffer_length = size;
return RMW_RET_OK;
} catch (std::exception & e) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("rmw_serialize: failed to serialize: %s", e.what());
} catch (std::exception &e) {
cached_serialize_ts = nullptr; // Invalidate cache on error
cached_writer.reset();
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING(
"rmw_serialize: failed to serialize: %s", e.what());
Comment on lines +1875 to +1876
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are still irrelevant formatting changes, it would be great to avoid them.

return RMW_RET_ERROR;
}
}

extern "C" rmw_ret_t rmw_deserialize(
const rmw_serialized_message_t * serialized_message,
const rosidl_message_type_support_t * type_support,
void * ros_message)
{
extern "C" rmw_ret_t
rmw_deserialize(const rmw_serialized_message_t *serialized_message,
const rosidl_message_type_support_t *type_support,
void *ros_message) {
// Cache the resolved typesupport and MessageTypeSupport per type_support
// pointer, mirroring the rmw_serialize thread-local writer cache.
// MessageTypeSupport's constructor runs std::regex_replace + ostringstream
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it worth to complicate the code with caching instead of fixing the root cause in the constructor by removing std::regex and std::ostringstream? E.g. something like this:

template<typename MembersType>
MessageTypeSupport<MembersType>::MessageTypeSupport(const MembersType * members)
{
  assert(members);
  this->members_ = members;

  std::string message_namespace(this->members_->message_namespace_);
  std::string message_name(this->members_->message_name_);

  if (!message_namespace.empty()) {
    std::string::size_type pos = 0;
    while ((pos = message_namespace.find("__", pos)) != std::string::npos) {
      message_namespace.replace(pos, 2, "::");
      pos += 2;
    }
  }

  std::string name;
  name.reserve(
    message_namespace.size() + 2 + 5 + message_name.size() + 1);  // "::" + "dds_::" + "_"

  if (!message_namespace.empty()) {
    name += message_namespace;
    name += "::";
  }
  name += "dds_::";
  name += message_name;
  name += '_';

  this->setName(name.c_str());
}

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps. I'm mostly pushing this so that @jmachowinski can take a look at it as well. This was vibe coded in response to some micro-benchmarks that came up in Zulip, so I wouldn't read too much into the actual implementation versus the general idea behind it.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the TLS caching looks really good in the mcirobenchmark because you are always publishing the same message repeatedly. In the case that you alternated the message types that you are publishing, you would constantly get cache misses, so optimizing the constructor additionally makes sense.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah so you did in fact manage to genuinely benchmaxx the optimization @mjcarroll, I'm impressed 😂

// on every call, which adds ~400-500 ns of overhead per message.
thread_local const rosidl_message_type_support_t *cached_deser_ts = nullptr;
thread_local bool cached_is_c = false;
thread_local std::unique_ptr<rmw_cyclonedds_cpp::MessageTypeSupport<
rosidl_typesupport_introspection_c__MessageMembers>>
cached_msgts_c;
thread_local std::unique_ptr<rmw_cyclonedds_cpp::MessageTypeSupport<
rosidl_typesupport_introspection_cpp::MessageMembers>>
cached_msgts_cpp;

bool ok;
try {
cycdeser sd(serialized_message->buffer, serialized_message->buffer_length);
const rosidl_message_type_support_t * ts;
if ((ts =
get_message_typesupport_handle(
type_support, rosidl_typesupport_introspection_c__identifier)) != nullptr)
{
auto members =
static_cast<const rosidl_typesupport_introspection_c__MessageMembers *>(ts->data);
MessageTypeSupport_c msgts(members);
ok = msgts.deserializeROSmessage(sd, ros_message, nullptr);
} else {
if ((ts =
get_message_typesupport_handle(
type_support, rosidl_typesupport_introspection_cpp::typesupport_identifier)) != nullptr)
{
auto members =
static_cast<const rosidl_typesupport_introspection_cpp::MessageMembers *>(ts->data);
MessageTypeSupport_cpp msgts(members);
ok = msgts.deserializeROSmessage(sd, ros_message, nullptr);
if (type_support != cached_deser_ts) {
const rosidl_message_type_support_t *ts = get_message_typesupport_handle(
type_support, rosidl_typesupport_introspection_c__identifier);
if (ts != nullptr) {
auto members = static_cast<
const rosidl_typesupport_introspection_c__MessageMembers *>(
ts->data);
cached_msgts_c =
std::make_unique<rmw_cyclonedds_cpp::MessageTypeSupport<
rosidl_typesupport_introspection_c__MessageMembers>>(members);
cached_msgts_cpp.reset();
cached_deser_ts = type_support;
cached_is_c = true;
} else {
RMW_SET_ERROR_MSG("rmw_serialize: type support trouble");
return RMW_RET_ERROR;
ts = get_message_typesupport_handle(
type_support,
rosidl_typesupport_introspection_cpp::typesupport_identifier);
if (ts != nullptr) {
auto members = static_cast<
const rosidl_typesupport_introspection_cpp::MessageMembers *>(
ts->data);
cached_msgts_cpp =
std::make_unique<rmw_cyclonedds_cpp::MessageTypeSupport<
rosidl_typesupport_introspection_cpp::MessageMembers>>(
members);
cached_msgts_c.reset();
cached_deser_ts = type_support;
cached_is_c = false;
} else {
RMW_SET_ERROR_MSG("rmw_deserialize: type support trouble");
return RMW_RET_ERROR;
}
}
}
} catch (rmw_cyclonedds_cpp::Exception & e) {

cycdeser sd(serialized_message->buffer, serialized_message->buffer_length);
if (cached_is_c) {
ok = cached_msgts_c->deserializeROSmessage(sd, ros_message, nullptr);
} else {
ok = cached_msgts_cpp->deserializeROSmessage(sd, ros_message, nullptr);
}
} catch (rmw_cyclonedds_cpp::Exception &e) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("rmw_serialize: %s", e.what());
cached_deser_ts = nullptr; // Invalidate cache on error
cached_msgts_c.reset();
cached_msgts_cpp.reset();
ok = false;
} catch (std::runtime_error & e) {
} catch (std::runtime_error &e) {
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("rmw_serialize: %s", e.what());
cached_deser_ts = nullptr;
cached_msgts_c.reset();
cached_msgts_cpp.reset();
ok = false;
}

Expand Down