ROS 2 Native Buffer Feature — Summary

Overall Summary

The rolling-native-buffer branch adds a proof-of-concept native Buffer feature to ROS 2, allowing uint8[] message fields (e.g., image data) to be backed by vendor-specific memory (CPU, GPU, etc.) instead of always using std::vector. A new rosidl::Buffer<T> type replaces std::vector for these fields while remaining backward-compatible. Buffer backends are discovered at runtime via pluginlib, and the serialization and middleware layers are extended so that when a publisher and subscriber share a common non-CPU backend, data can be transferred via a lightweight descriptor rather than copying through CPU memory. When backends are incompatible, the system gracefully falls back to standard CPU serialization.

To preserve transport and rosbag2 compatibility, CPU-backed Buffer serialization is wire-compatible with legacy uint8[]/std::vector<uint8_t> bytes, while descriptor-backed payloads are disambiguated by a first-word marker (kBufferDescriptorMarker = 0xFFFFFFFF).

Subscribers can restrict which buffer backends they accept via a new subscription option acceptable_buffer_backends (rmw and rclcpp/rclpy). The default is "cpu" (CPU-only) for backward compatibility; subscriptions must explicitly opt in to non-CPU backends via "any" (all installed) or specific backend names. Buffer-aware messages always use the per-endpoint route regardless of the backend setting; CPU-only subscriptions advertise "cpu" so publishers fall back to CPU serialization. The generic subscription used by ros2 bag record also forces "cpu" so recorded serialized messages are always CPU-based and suitable for direct storage in bag files.

rclpy support is added via rosidl_python: Python message types accept rosidl_buffer.Buffer for uint8[] fields, and the C sequence struct uses an is_rosidl_buffer flag to pass vendor-backed buffer pointers between Python and C without copying. The Python Buffer class is a lightweight wrapper for vendor-backed (non-CPU) data only; CPU-based buffer data is always delivered to rclpy subscribers as plain array.array('B') by the C→Python conversion layer, so existing rclpy code continues to work unchanged and legacy publishers/subscribers using array.array interoperate with Buffer-aware nodes.

Three RMW implementations have buffer support: rmw_zenoh_cpp (per-endpoint Zenoh subscriptions), rmw_fastrtps_cpp (per-endpoint DDS DataWriters/DataReaders with user_data-based backend discovery), and rmw_cyclonedds_cpp (CPU-based fallback via introspection typesupport).

Repositories Changed

  • src/ros2/rosidl — Modified (core buffer packages rosidl_buffer, rosidl_buffer_backend, rosidl_buffer_backend_registry, rosidl_buffer_py now live here; plus existing rosidl sub-packages modified for Buffer support)
  • src/ros2/rcl_buffer — New repository (test and demo packages: test_rosidl_buffer, demo_buffer_backend)
  • src/ros2/rmw — Modified (subscription option acceptable_buffer_backends)
  • src/ros2/rmw_zenoh — Modified (1 commit: "Add support for native Buffer type")
  • src/ros2/rmw_fastrtps — Modified (buffer-aware per-endpoint pub/sub with DDS user_data discovery)
  • src/ros2/rclcpp — Modified (subscription options + generic subscription default)
  • src/ros2/rclpy — Modified (expose acceptable_buffer_backends in create_subscription)
  • src/ros2/rosidl_typesupport_fastrtps — Modified (1 commit: "Add support for native Buffer type")
  • src/ros2/rosidl_python — Modified (adds native Buffer support in rclpy + array.array compatibility)
  • src/ros2/rosidl_runtime_py — Modified (Buffer treated like array.array in set_message / convert)
  • src/ros2/rmw_cyclonedds — Modified (rosidl::Buffer CPU-based fallback for introspection typesupport)

Per-Package Changes

src/ros2/rosidl/rosidl_buffer (new)

  • Core Buffer<T> container class — a drop-in std::vector<T> replacement backed by a polymorphic BufferImplBase<T> with CpuBufferImpl<T> as the default.
  • BufferImplBase<T> — minimal abstract base with only the virtuals the pimpl and serialization layer need: get_backend_type() (backend identifier, e.g. "cpu", "cuda"), size() (element count, all backends), to_cpu(), clone(). Each concrete implementation is the single source of truth for its own backend type. Backend-specific APIs (element access, resize, iterators, storage handles, descriptor serialization) are the responsibility of each concrete implementation (e.g. CpuBufferImpl::get_storage()) or the BufferBackend plugin (e.g. create_descriptor_with_endpoint(), from_descriptor_with_endpoint()).
  • Buffer<T> API split: size() and empty() work for all backends (delegate to the virtual BufferImplBase::size()). All other std::vector-compatible APIs — element access (operator[], at, front, back, data), iterators, capacity (reserve, capacity, shrink_to_fit), and modifiers (assign, resize, clear, push_back, pop_back, emplace_back) — are CPU-backend-only and go directly through CpuBufferImpl::get_storage(), throwing std::runtime_error for non-CPU backends. Backend management APIs (get_backend_type(), get_impl(), to_vector()) and copy/move semantics work for all backends. The backend implementation is set once at construction time via Buffer(unique_ptr<BufferImplBase<T>>); there is no post-construction setter, eliminating race conditions with concurrent reads. get_backend_type() delegates to the underlying implementation (impl_->get_backend_type()).
  • C helpers for introspection (c_helpers.h, c_helpers.cpp): rosidl_buffer_uint8_size(), rosidl_buffer_uint8_data() / rosidl_buffer_uint8_data_mut(), rosidl_buffer_uint8_resize(), rosidl_buffer_uint8_throw_if_not_cpu() — allow C typesupport introspection (e.g. rmw_cyclonedds) to read/write Buffer size and CPU data; non-CPU backends throw when accessed. Used by rosidl_typesupport_introspection_c generated code for uint8[] fields.
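
The API split described above can be modelled in a few lines. The following is a minimal Python sketch, not the actual C++ implementation: it mirrors the names BufferImplBase, CpuBufferImpl, and Buffer from the text, while FakeDeviceImpl and its "fake_device" backend name are hypothetical stand-ins for a vendor backend. It only illustrates that size queries work for every backend while element access and modifiers raise for non-CPU backends.

```python
from abc import ABC, abstractmethod


class BufferImplBase(ABC):
    """Python stand-in for the abstract base: only the four virtuals from the text."""

    @abstractmethod
    def get_backend_type(self) -> str: ...

    @abstractmethod
    def size(self) -> int: ...

    @abstractmethod
    def to_cpu(self) -> "CpuBufferImpl": ...

    @abstractmethod
    def clone(self) -> "BufferImplBase": ...


class CpuBufferImpl(BufferImplBase):
    def __init__(self, data=b""):
        self._storage = bytearray(data)  # always a private copy

    def get_backend_type(self): return "cpu"
    def size(self): return len(self._storage)
    def to_cpu(self): return CpuBufferImpl(self._storage)
    def clone(self): return CpuBufferImpl(self._storage)
    def get_storage(self): return self._storage  # CPU-only accessor


class FakeDeviceImpl(BufferImplBase):
    """Hypothetical non-CPU backend; host bytes stand in for device memory."""

    def __init__(self, data):
        self._device = bytes(data)

    def get_backend_type(self): return "fake_device"
    def size(self): return len(self._device)
    def to_cpu(self): return CpuBufferImpl(self._device)
    def clone(self): return FakeDeviceImpl(self._device)


class Buffer:
    def __init__(self, impl=None):
        # The implementation is fixed at construction; there is no setter.
        self._impl = impl if impl is not None else CpuBufferImpl()

    # Works for all backends.
    def get_backend_type(self): return self._impl.get_backend_type()
    def __len__(self): return self._impl.size()
    def empty(self): return self._impl.size() == 0
    def to_vector(self): return bytes(self._impl.to_cpu().get_storage())

    # CPU-only: goes straight to the CPU storage or raises.
    def _cpu_storage(self):
        if not isinstance(self._impl, CpuBufferImpl):
            raise RuntimeError(
                f"operation requires the cpu backend, got {self.get_backend_type()!r}")
        return self._impl.get_storage()

    def push_back(self, value): self._cpu_storage().append(value)
    def __getitem__(self, index): return self._cpu_storage()[index]
```

A caller that must handle any backend would therefore use len() or to_vector() and reserve indexing for buffers it knows are CPU-backed.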

src/ros2/rosidl/rosidl_buffer_backend (new)

  • Abstract BufferBackend plugin interface that vendors implement to provide custom memory backends (descriptor creation, endpoint lifecycle hooks). The interface requires: get_backend_type(), create_descriptor_with_endpoint(), from_descriptor_with_endpoint(), and optional hooks get_backend_aux_info(), on_creating_endpoint(), on_discovering_endpoint().
  • create_descriptor_with_endpoint() returns nullptr when the backend cannot handle the given endpoint (e.g., peer does not support that backend), signaling the serialization layer to fall back to CPU-based std::vector serialization. Backends are expected to track per-endpoint compatibility internally (e.g., caching the result of on_discovering_endpoint()).
  • This package does not depend on rosidl_typesupport_fastrtps_cpp — it only depends on rmw (for endpoint info types). Descriptor serializer registration is handled directly by backends via rosidl_typesupport_fastrtps_cpp::register_buffer_descriptor<DescriptorMsgT>(backend_name) (see rosidl_typesupport_fastrtps_cpp).

src/ros2/rosidl/rosidl_buffer_backend_registry (new)

  • Singleton registry using pluginlib to discover and load BufferBackend plugins at runtime.
  • Buffer backend loader (buffer_backend_loader.hpp/cpp): provides initialize_buffer_backends() and shutdown_buffer_backends() — consolidated functions (previously duplicated in each RMW) that load backend plugins, populate the global BufferDescriptorOps and DescriptorSerializers maps in rosidl_typesupport_fastrtps_cpp, and clean up on shutdown. RMW implementations call these during rmw_init/rmw_shutdown instead of maintaining their own copies. Depends on rosidl_typesupport_fastrtps_cpp.

src/ros2/rcl_buffer/demo_buffer_backend/demo_buffer_backend, demo_buffer, demo_buffer_backend_msgs (new)

  • A reference demo backend plugin with its buffer implementation and descriptor message, used for testing the plugin system end-to-end.
  • DemoBufferBackend caches per-endpoint compatibility internally (from on_discovering_endpoint() via a GID-keyed hash map) and returns nullptr from create_descriptor_with_endpoint() when the peer does not support the demo backend, triggering CPU fallback in the serialization layer.
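
As an illustration of the per-endpoint caching and nullptr-style fallback described for the backend interface and the demo backend, here is a minimal Python sketch. The class DemoLikeBackend, the helper serialize_for_endpoint, the argument shapes, and the descriptor dictionary are all assumptions made for this example; the real plugin interface is C++ and its signatures are not reproduced here. None plays the role of nullptr.

```python
from typing import Dict, Iterable, Optional


class DemoLikeBackend:
    """Hypothetical backend that remembers which peers support it."""

    def __init__(self, backend_type: str = "demo"):
        self._backend_type = backend_type
        self._peer_supported: Dict[bytes, bool] = {}  # keyed by peer GID

    def get_backend_type(self) -> str:
        return self._backend_type

    def on_discovering_endpoint(self, peer_gid: bytes,
                                peer_backends: Iterable[str]) -> None:
        # Cache the compatibility decision once, at discovery time.
        self._peer_supported[peer_gid] = self._backend_type in set(peer_backends)

    def create_descriptor_with_endpoint(self, payload: bytes,
                                        peer_gid: bytes) -> Optional[dict]:
        # Unknown or incompatible peers get None, which signals CPU fallback.
        if not self._peer_supported.get(peer_gid, False):
            return None
        return {"backend": self._backend_type, "size": len(payload)}


def serialize_for_endpoint(backend: DemoLikeBackend, payload: bytes,
                           peer_gid: bytes):
    """Ask the backend first; fall back to plain CPU bytes if it declines."""
    descriptor = backend.create_descriptor_with_endpoint(payload, peer_gid)
    if descriptor is None:
        return ("cpu", bytes(payload))
    return ("descriptor", descriptor)
```

The point of the design is that the serialization layer never decides compatibility itself; it only reacts to whether a descriptor was produced.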

src/ros2/rosidl/rosidl_buffer_py (new) — rclpy Buffer bindings

  • Python bindings (pybind11) for rosidl::Buffer, exposing the Buffer class and helpers used by rosidl_generator_py: _get_buffer_ptr() (Python → C) and _take_buffer_from_ptr() (C → Python, takes ownership).
  • Minimal Buffer class: The Python Buffer is a lightweight wrapper for vendor-backed (non-CPU) data only. CPU-based buffer data is always delivered to rclpy subscribers as plain array.array('B') by the C→Python conversion layer, so Buffer does not need array.array API compatibility. Buffer exposes only: backend_type (property), to_bytes() (for diagnostics/logging), is_cpu (property), __len__(), and __repr__(). Users interact with non-CPU buffers through vendor-specific APIs. Existing rclpy code using array.array continues to work unchanged.

src/ros2/rcl_buffer/demo_buffer_backend/demo_buffer_py (new)

  • Python bindings for the demo backend: factory to create rosidl_buffer.Buffer instances backed by DemoBufferImpl for use from rclpy nodes.

src/ros2/rcl_buffer/test_rosidl_buffer (new)

  • Integration tests (1-pub/1-sub, 1-pub/2-sub, 2-pub/2-sub) verifying buffer transfer for both CPU and demo backends.
  • Parameterized subscriber nodes: Both C++ (demo_backend_image_subscriber_node.cpp) and Python (rclpy_image_subscriber.py) subscriber nodes declare an acceptable_buffer_backends ROS parameter (default "__default__" sentinel). When set to a non-sentinel value, the parameter is passed to create_subscription options, allowing launch tests to exercise different backend configurations.
  • acceptable_buffer_backends tests: Launch tests covering subscriber backend option combinations — test_1pub_1sub_demo_to_demo_sub_any.py (sub="any"), test_1pub_1sub_demo_to_demo_sub_demo.py (sub="demo"), test_1pub_1sub_cpu_to_cpu_sub_empty.py (sub=""), test_1pub_1sub_cpu_to_cpu_sub_cpu.py (sub="cpu"), test_1pub_1sub_cpu_to_cpu_sub_default.py (sub=default), test_1pub_1sub_demo_to_cpu_sub_empty.py (pub=demo, sub=""), test_1pub_1sub_demo_to_cpu_sub_cpu.py (pub=demo, sub="cpu"), test_1pub_1sub_demo_to_cpu_sub_default.py (pub=demo, sub=default). All existing demo-to-demo tests set acceptable_buffer_backends: 'any' on their subscribers so the demo backend is accepted.
  • Buffer & compat tests: test_buffer_array_compat.py (unit tests for Buffer API: type identity, backend_type, to_bytes, len, repr, is_buffer; integration with sensor_msgs/Image and rosidl_runtime_py convert/set_message utilities); launch tests for legacy array.array publisher/subscriber with demo Buffer publisher/subscriber in both directions and cross-language (rclcpp demo pub → rclpy legacy array sub).
  • Cyclone DDS tests: test_1pub_1sub_cpu_to_cpu_cyclonedds.py, test_1pub_1sub_demo_to_cpu_cyclonedds.py, test_1pub_2sub_cpu_to_cpu_cyclonedds.py — integration tests for Buffer with rmw_cyclonedds (introspection typesupport).
  • FastRTPS tests: test_1pub_1sub_cpu_to_cpu_fastrtps.py, test_1pub_1sub_demo_to_demo_fastrtps.py, test_1pub_2sub_cpu_to_cpu_fastrtps.py, test_1pub_2sub_demo_to_demo_fastrtps.py, test_2pub_2sub_cpu_to_cpu_fastrtps.py, test_2pub_2sub_demo_to_demo_fastrtps.py — integration tests for Buffer with rmw_fastrtps_cpp (per-endpoint DataWriter/DataReader). Uses max_publish_count=50 (vs 5 for Zenoh/CycloneDDS) to account for DDS discovery latency. test_1pub_1sub_demo_to_cpu_fastrtps.py exists but is not registered pending mixed-backend support.

src/ros2/rmw/rmw (modified) — subscription option for buffer backends

  • rmw_subscription_options_t (types.h): New field acceptable_buffer_backends (const char *): comma-separated list of acceptable buffer backend names. NULL, empty string, or "cpu" means only the CPU-based buffer backend is accepted (backward-compatible default). "any" means all installed backends are accepted. Specific backend names (e.g. "cuda,demo") restrict to those backends.
  • Defaults (subscription_options.c): rmw_get_default_subscription_options() sets acceptable_buffer_backends = "cpu".
  • Test (test_subscription_options.cpp): Asserts default has acceptable_buffer_backends equal to "cpu" (via EXPECT_STREQ).
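
The option semantics above can be summarised as a small resolution function. This is a hedged Python sketch rather than the rmw code: the function name resolve_acceptable_backends and the installed argument are hypothetical, and silently dropping names that are not installed is an assumption of this example.

```python
from typing import Iterable, List, Optional


def resolve_acceptable_backends(option: Optional[str],
                                installed: Iterable[str]) -> List[str]:
    """Map an acceptable_buffer_backends value to a list of backend names."""
    installed = list(installed)
    # NULL, empty string, or "cpu": CPU-only (the backward-compatible default).
    if option is None or option.strip() in ("", "cpu"):
        return ["cpu"]
    # "any": accept every installed backend.
    if option.strip() == "any":
        return installed
    # Otherwise: a comma-separated list, restricted to what is installed
    # (assumption: unknown names are ignored rather than reported).
    requested = [name.strip() for name in option.split(",") if name.strip()]
    return [name for name in requested if name in installed]
```

For example, with only a "demo" backend installed, "cuda,demo" resolves to ["demo"], while the default resolves to ["cpu"].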

src/ros2/rclcpp/rclcpp (modified) — subscription options + generic subscription

  • Subscription options (subscription_options.hpp): SubscriptionOptionsBase (and thus SubscriptionOptionsWithAllocator) gains acceptable_buffer_backends (std::string, default "cpu"). "cpu" or empty means only the CPU backend is accepted (backward-compatible default); "any" means all installed backends; comma-separated for specific backends (e.g. "cuda,demo"). to_rcl_subscription_options() copies it into rmw_subscription_options.acceptable_buffer_backends when non-empty.
  • Generic subscription (generic_subscription.hpp): Constructor uses a copy of the options with acceptable_buffer_backends forced to "cpu" via force_cpu_buffer_backend_(options), so serialized messages received are always CPU-based — used by ros2 bag record for direct storage in bag files.

src/ros2/rclpy/rclpy (modified) — expose acceptable_buffer_backends

  • create_subscription (node.py): New optional parameter acceptable_buffer_backends: Optional[str] = 'cpu'; documented as comma-separated list of acceptable buffer backend names ("cpu" or empty for CPU-only, "any" for all installed backends). Passed through to the C extension.
  • C extension (subscription.cpp, subscription.hpp): Subscription creation accepts acceptable_buffer_backends (py::object); when not None, casts to string and sets subscription_ops.rmw_subscription_options.acceptable_buffer_backends.

src/ros2/rosidl/rosidl_generator_cpp (modified)

  • Code generator now emits rosidl::Buffer<uint8_t> instead of std::vector<uint8_t> for uint8[] fields.

src/ros2/rosidl/rosidl_runtime_c (modified) — C sequence struct for Buffer

  • Primitive sequence struct (primitives_sequence.h): Each sequence type gains an is_rosidl_buffer field (bool). When true, data points to a rosidl::Buffer<T> (stored as a rosidl::Buffer<T>*) and must not be freed by sequence code.
  • Sequence functions (primitives_sequence_functions.c): init sets is_rosidl_buffer = false; fini checks is_rosidl_buffer and, if true, clears the sequence without deallocating data; copy propagates is_rosidl_buffer.

src/ros2/rosidl/rosidl_generator_c (modified) — rclpy C message layout

  • Message fini (msg__functions.c.em): For messages with uint8[] fields, generates fini logic that, when msg->field.is_rosidl_buffer is true, clears the sequence fields without calling the normal sequence fini (avoids freeing a borrowed rosidl::Buffer* used by the Python C extension).

src/ros2/rosidl/rosidl_runtime_cpp (modified)

  • Added trait specializations for Buffer<T> and a dependency on rosidl_buffer.

src/ros2/rosidl/rosidl_typesupport_introspection_c (modified) — Buffer support for introspection

  • MessageMember (message_introspection.h): New field is_rosidl_buffer_ (bool); when true, the member is a uint8[] backed by rosidl::Buffer<uint8_t> (C side uses sequence with is_rosidl_buffer). New field on MessageMembers: has_buffer_fields_.
  • Generated type support (msg__type_support.c.em): For uint8[] fields, generates size_function / get/resize/fetch/assign that call rosidl_buffer C helpers (rosidl_buffer_uint8_size, rosidl_buffer_uint8_data, rosidl_buffer_uint8_data_mut, rosidl_buffer_uint8_resize) when member->is_rosidl_buffer; otherwise uses plain sequence. Adds dependency on rosidl_buffer. Introspection accessors throw for non-CPU backends.

src/ros2/rosidl/rosidl_typesupport_introspection_cpp (modified) — Buffer support for introspection

  • MessageMember (message_introspection.hpp): New field is_rosidl_buffer_ (bool); when true, the member is rosidl::Buffer<uint8_t>. New field on MessageMembers: has_buffer_fields_.
  • Generated type support (msg__type_support.cpp.em): For uint8[] fields, generates size_function, resize_function, get/fetch/assign that operate on rosidl::Buffer<uint8_t>; sets is_rosidl_buffer_ in the member array. Adds dependency on rosidl_buffer. Introspection accessors throw for non-CPU backends.

src/ros2/rosidl/rosidl_typesupport_introspection_tests (modified)

  • UnboundedSequences test: Extended to cover message introspection with uint8[] (Buffer) fields; depends on updated introspection C/C++ packages.

src/ros2/rosidl_python/rosidl_generator_py (modified) — rclpy Buffer support

  • Python message setters (_msg.py.em): For uint8[] fields, allow assigning from rosidl_buffer.Buffer (optional import) or plain array.array; existing code that sets msg.data = array.array('B', ...) continues to work unchanged.
  • Python ↔ C conversion (_msg_support.c.em): Messages with uint8[] fields use the sequence's is_rosidl_buffer flag: when true, data holds a borrowed rosidl::Buffer<uint8_t>* instead of a malloc'd array. Python → C accepts both Buffer (non-CPU path) and array.array (normal byte copy path).
  • Python → C: For non-CPU rosidl_buffer.Buffer objects, call rosidl_buffer._rosidl_buffer_py._get_buffer_ptr() and set the C++ Buffer pointer plus is_rosidl_buffer = true so the C++/RMW path uses the existing buffer without copying.
  • C → Python: When the RMW deserializes into a vendor-backed buffer, the C struct has is_rosidl_buffer == true; the generated code calls rosidl_buffer._rosidl_buffer_py._take_buffer_from_ptr() to wrap the pointer in a Python Buffer (taking ownership) and set it on the message, then clears is_rosidl_buffer so message fini does not free it.

src/ros2/rosidl_runtime_py/rosidl_runtime_py (modified) — Buffer handling in utilities

  • set_message (set_message.py): When setting message fields from a dict, if the current field is a rosidl_buffer.Buffer, construct an array.array('B', field_value) to replace it, so that set_message_fields() works with messages whose uint8[] fields are Buffers.
  • convert (convert.py): In message_to_yaml and related helpers, detect rosidl_buffer.Buffer by type(val) is _RosidlBuffer and use val.to_bytes() to convert to a list of ints for abbreviation and display, so serialization/display of messages with Buffer fields works.

src/ros2/rosidl_typesupport_fastrtps/rosidl_typesupport_fastrtps_cpp (modified)

  • Extended the type support callbacks struct with has_buffer_fields flag and endpoint-aware serialize/deserialize function pointers.
  • Added buffer_serialization.hpp with global registries for backend descriptor operations and FastCDR serializers, plus template helpers for Buffer serialization. DescriptorSerializers::serialize/deserialize carry endpoint_info so that nested Buffer fields inside descriptors get endpoint-aware serialization (e.g., a cuda backend's device_data descriptor field can use IPC instead of forcing a CPU fallback through the generic operator<<).
  • Descriptor registration: register_buffer_descriptor<DescriptorMsgT>(backend_name) (in register_buffer_descriptor.hpp) lets backends register using existing rosidl-generated type support (no separate registration libs); backends call rosidl_typesupport_fastrtps_cpp::register_buffer_descriptor<T>() in their constructor, and the RMW only verifies the serializers map is populated. The registered lambdas prefer cdr_serialize_with_endpoint/cdr_deserialize_with_endpoint when available on the type support callbacks, falling back to cdr_serialize/cdr_deserialize for backward compatibility.
  • Backend-driven compatibility: serialize_buffer_with_endpoint() calls the backend's create_descriptor_with_endpoint() before writing any wire format bytes; if the backend returns nullptr (incompatible endpoint), the buffer is serialized as std::vector<T> via to_vector() (CPU fallback). This eliminates the previous EndpointCompatibilityResolver / thread-local compatibility guard mechanism — each backend is now the single source of truth for endpoint compatibility.
  • Wire compatibility update for uint8[]: CPU (or fallback) Buffer serialization writes exactly legacy std::vector<uint8_t> bytes (uint32 length + raw data) with no backend string prefix.
  • Descriptor disambiguation marker: descriptor-backed Buffer payloads now start with a first-word marker (kBufferDescriptorMarker = 0xFFFFFFFF) followed by backend_type string, then the serialized descriptor; deserialization peeks the first uint32 — if it equals the marker, the descriptor path is taken; any other value is treated as a legacy sequence length. The backend_type key is used to look up both the BufferDescriptorOps and DescriptorSerializers.
  • operator<< / operator>> for Buffer<T> are aligned with legacy CPU-vector wire bytes and reject descriptor-marker payloads in the plain operator>> path.
  • Updated code generation templates to detect Buffer fields and emit endpoint-aware serialization code.
  • Added regression test test_buffer_wire_compat.cpp (wired in package CMakeLists.txt) covering byte-equivalence with legacy vector, legacy-byte deserialization, and descriptor-marker routing.
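
The wire-format rules above (legacy bytes for CPU data, a marker word for descriptors) can be sketched with Python's struct module. This is a simplified model under stated assumptions: little-endian encoding, no CDR alignment padding, the backend_type string written as a uint32 length (including a trailing NUL) followed by its bytes, and the descriptor treated as opaque bytes. The function names are hypothetical; only the marker value comes from the text.

```python
import struct

K_BUFFER_DESCRIPTOR_MARKER = 0xFFFFFFFF  # value stated in the text


def encode_cpu(data: bytes) -> bytes:
    """Legacy uint8[] layout: uint32 length followed by the raw bytes."""
    return struct.pack("<I", len(data)) + data


def encode_descriptor(backend_type: str, descriptor: bytes) -> bytes:
    """Marker word, then the backend_type string, then the descriptor bytes."""
    name = backend_type.encode("utf-8") + b"\x00"
    return (struct.pack("<I", K_BUFFER_DESCRIPTOR_MARKER)
            + struct.pack("<I", len(name)) + name
            + descriptor)


def decode(payload: bytes):
    """Peek the first uint32 to choose between descriptor and legacy paths."""
    (first,) = struct.unpack_from("<I", payload, 0)
    if first == K_BUFFER_DESCRIPTOR_MARKER:
        (name_len,) = struct.unpack_from("<I", payload, 4)
        name = payload[8:8 + name_len - 1].decode("utf-8")  # drop the NUL
        return ("descriptor", name, payload[8 + name_len:])
    # Any other value is a legacy sequence length.
    return ("cpu", payload[4:4 + first])
```

Because the CPU path writes exactly the legacy layout, a reader that knows nothing about Buffer can still decode CPU-backed messages, which is what keeps existing traffic and bag data readable.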

src/ros2/rosidl_typesupport_fastrtps/rosidl_typesupport_fastrtps_c (modified) — rclpy C type support

  • Buffer-aware C type support (msg__type_support_c.cpp.em): For messages with uint8[] fields, generated C serialize/deserialize check the sequence's is_rosidl_buffer flag and call serialize_buffer_with_endpoint / deserialize_buffer_with_endpoint, so rclpy (which uses the C typesupport) gets full buffer serialization and vendor-backed buffer handling; on deserialize, vendor backends set is_rosidl_buffer = true and store the Buffer* in the C struct for Python to wrap via _take_buffer_from_ptr.
  • For non-is_rosidl_buffer uint8[] sequences, endpoint-aware C serialization now uses plain legacy sequence bytes (uint32 size + raw array) rather than a "cpu" string prefix, preserving wire compatibility with existing uint8[] traffic and bag data.

src/ros2/rmw_zenoh/rmw_zenoh_cpp (modified)

  • Calls rosidl_buffer_backend_registry::initialize_buffer_backends() / shutdown_buffer_backends() during RMW lifecycle to load and tear down buffer backend plugins (the loader logic is consolidated in rosidl_buffer_backend_registry).
  • Descriptor registration: Backends auto-register FastCDR descriptor serializers in their constructor via rosidl_typesupport_fastrtps_cpp::register_buffer_descriptor<DescriptorMsgT>(); the RMW only verifies that the serializers map is populated for each loaded backend (no per-backend registration function call).
  • acceptable_buffer_backends: When creating a buffer-aware subscription, reads sub_options.acceptable_buffer_backends and parses the comma-separated list. NULL, empty, or "cpu" → CPU-only: the subscription stays buffer-aware (per-endpoint route) and advertises "cpu" as its only supported backend in the liveliness token, so publishers use CPU-serialized data. "any" → all installed backends. Specific names → filtered to those backends. In on_publisher_discovered(), CPU is always added to the publisher's backend list since CPU serialization is implicitly supported by all buffer-aware publishers, ensuring backends_compatible() passes for CPU-only subscribers matched with publishers that only advertise non-CPU backends.
  • Extended liveliness key-expressions to advertise each endpoint's supported backends.
  • Added graph cache discovery callbacks so buffer-aware publishers and subscribers detect each other dynamically.
  • Buffer-aware publishers create per-subscriber Zenoh endpoints; endpoint info is passed to the typesupport serialization layer, which delegates compatibility decisions to each backend's create_descriptor_with_endpoint() (nullptr → CPU fallback). Publisher creation explicitly adds "cpu" to backend_aux_info when has_buffer_fields is true (CPU serialization is always implicitly supported). Fallback publish: publish() first sends endpoint-aware messages to per-subscriber endpoints via publish_buffer_aware(), then conditionally falls through to the standard (CPU-serialized) base-key publish path only when the total matched subscription count (from graph_cache_->publisher_count_matched_subscriptions()) exceeds the number of discovered buffer-aware subscribers — avoiding unnecessary CPU conversion (to_vector()) of vendor-backed buffer data when all subscribers are buffer-aware. During the discovery window (no subscribers yet), publish_buffer_aware() returns early and the standard path publishes to the base key. Lock-ordering safety: on_subscriber_discovered() uses a three-phase pattern — (1) acquire mutex_ to check duplicates, collect state, mark the endpoint key in pending_endpoints_, check is_shutdown_, and release; (2) call notify_endpoint_discovered and declare_advanced_publisher without holding mutex_; (3) re-acquire mutex_ to store the endpoint and subscriber info, clear the pending marker. shutdown() moves endpoints to a local map under mutex_, sets is_shutdown_, then undeclares Zenoh publishers outside the lock, and also undeclares the base publisher pub_ (which buffer-aware publishers keep for fallback). This prevents deadlocks where mutex_ → Zenoh session lock conflicts with GraphCache lock → discovery callback → mutex_.
  • Buffer-aware subscribers create per-publisher Zenoh subscriptions; the subscription Message struct owns its endpoint info via std::optional<EndpointInfoStorage> (copied from the shared_ptr<SubscriptionEndpoint> at enqueue time) to prevent dangling pointers if the endpoint is destroyed before the message is processed. Endpoint info is passed into deserialization for correct backend reconstruction. on_publisher_discovered() uses the same three-phase lock-ordering pattern (with pending_sub_endpoints_ and is_shutdown_ checks); shutdown() moves endpoints to a local map under mutex_ and undeclares Zenoh subscribers outside the lock, and now safely calls graph_cache_->unregister_discovery_callbacks() (previously disabled due to the deadlock).
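
The three-phase lock-ordering pattern used by the discovery callbacks can be sketched independently of Zenoh. The Python model below is an illustration only: PerPeerEndpoints, declare, and undeclare are hypothetical names, and the handling of a shutdown that races with phase 2 (undeclaring the freshly created endpoint) is an assumption of this sketch rather than something the text specifies.

```python
import threading


class PerPeerEndpoints:
    """Create per-peer endpoints without holding the mutex during slow calls."""

    def __init__(self, declare, undeclare):
        self._mutex = threading.Lock()
        self._endpoints = {}
        self._pending = set()
        self._is_shutdown = False
        self._declare = declare      # may take other locks internally
        self._undeclare = undeclare  # likewise

    def on_peer_discovered(self, key) -> bool:
        # Phase 1: under the mutex, reject duplicates and mark the key pending.
        with self._mutex:
            if self._is_shutdown or key in self._endpoints or key in self._pending:
                return False
            self._pending.add(key)

        # Phase 2: create the endpoint with the mutex released.
        try:
            endpoint = self._declare(key)
        except Exception:
            with self._mutex:
                self._pending.discard(key)
            raise

        # Phase 3: re-acquire the mutex, store the endpoint, clear the marker.
        with self._mutex:
            self._pending.discard(key)
            stored = not self._is_shutdown
            if stored:
                self._endpoints[key] = endpoint
        if not stored:
            self._undeclare(endpoint)  # assumption: clean up if shutdown raced
        return stored

    def shutdown(self) -> None:
        # Move endpoints to a local map under the mutex, undeclare outside it.
        with self._mutex:
            self._is_shutdown = True
            local, self._endpoints = self._endpoints, {}
        for endpoint in local.values():
            self._undeclare(endpoint)
```

The pending set is what makes releasing the mutex safe: a second discovery event for the same peer during phase 2 is rejected in phase 1 instead of creating a duplicate endpoint.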

src/ros2/rmw_fastrtps/rmw_fastrtps_cpp (modified) — buffer-aware per-endpoint pub/sub

  • Calls rosidl_buffer_backend_registry::initialize_buffer_backends() / shutdown_buffer_backends() during rmw_init/rmw_shutdown to load and tear down buffer backend plugins (the loader logic is consolidated in rosidl_buffer_backend_registry).
  • Added buffer_endpoint_registry module (buffer_endpoint_registry.hpp/cpp): per-process singleton that maps topic names to discovery callbacks; publishers register subscriber-discovery callbacks, subscribers register publisher-discovery callbacks. notify_subscriber_discovered()/notify_publisher_discovered() are invoked from DDS participant listener discovery. Maintains known_subscribers_/known_publishers_ so late-registering callbacks see already-discovered endpoints.
  • Publisher creation (publisher.cpp, rmw_publisher.cpp): detects buffer-aware types via callbacks->has_buffer_fields; encodes supported backend names in DDS user_data QoS. Publisher creation explicitly adds "cpu" to backend_aux_info when has_buffer_fields is true, ensuring the BUFBE:cpu= marker is always present in user_data even when no backend plugins are loaded. Registers a subscriber-discovery callback that dynamically creates a per-subscriber DataWriter on a unique topic {topic}/_buf/{pub_gid_hex}_{sub_gid_hex} with reliable QoS, synchronous publish mode, and data sharing off. Discovery callbacks capture a shared buffer_alive_flag_ (shared_ptr<atomic<bool>>) and check it before accessing the raw info pointer, preventing use-after-free during concurrent destroy; rmw_destroy_publisher sets the flag to false before unregistering callbacks. Lock-ordering safety: Discovery callbacks use a three-phase pattern — (1) acquire buffer_mutex_ to check duplicates, mark the endpoint key in pending_buffer_endpoints_, and release; (2) call DDS create_topic/create_datawriter without holding buffer_mutex_; (3) re-acquire buffer_mutex_ to store the endpoint and clear the pending marker. This prevents deadlocks where buffer_mutex_ → DDS internal lock conflicts with DDS discovery thread → buffer_mutex_. rmw_destroy_publisher likewise moves endpoints to a local list under buffer_mutex_ and calls delete_datawriter/delete_topic after releasing it.
  • Subscription creation (subscription.cpp, rmw_subscription.cpp): detects buffer-aware types; parses acceptable_buffer_backends with the same semantics as rmw_zenoh_cpp (NULL/empty/"cpu" → CPU-only with {"cpu":""} in user_data, "any" → all installed, specific names → filtered); encodes backends in DDS user_data; registers a publisher-discovery callback that creates a per-publisher DataReader on the matching unique topic. Topic reuse: In single-process scenarios where the publisher's per-subscriber DataWriter already created the DDS topic on the same participant, the subscription-side callback uses lookup_topicdescription() to detect and reuse the existing topic (with owns_topic = false on BufferSubscriptionEndpoint) instead of attempting create_topic() which would fail. Each per-publisher DataReader has a BufferDataReaderListener that triggers a GuardCondition (buffer_data_guard_) and calls notify_buffer_data_available() so rmw_wait wakes up for buffer data. Discovery callbacks use the same buffer_alive_flag_ pattern and three-phase lock-ordering pattern as publishers (with pending_buffer_endpoints_); rmw_destroy_subscription sets the flag to false before cleanup and performs DDS delete_datareader/delete_topic (only when owns_topic is true) outside the lock.
  • Buffer-aware publish (rmw_publish.cpp): publish_to_buffer_endpoints() iterates over per-subscriber endpoints, calls callbacks->cdr_serialize_with_endpoint() with the endpoint info, writes raw CDR bytes via FASTDDS_SERIALIZED_DATA_TYPE_CDR_BUFFER. Returns whether non-buffer-aware subscribers exist by comparing publisher_event_->subscription_count() (total matched subscriptions on the main DataWriter) against buffer_endpoints_.size() + pending_buffer_endpoints_.size(). rmw_publish() conditionally calls __rmw_publish() on the main DataWriter only when non-buffer-aware subscribers are present, avoiding unnecessary CPU conversion of vendor-backed buffer data. Backend compatibility is determined inside the typesupport layer by each backend's create_descriptor_with_endpoint() return value (nullptr → CPU fallback).
  • Buffer-aware take (rmw_take.cpp): take_buffer_aware() iterates over per-publisher DataReaders, takes raw CDR bytes via FASTDDS_SERIALIZED_DATA_TYPE_CDR_BUFFER, calls callbacks->cdr_deserialize_with_endpoint() with explicit XCDRv1 / PLAIN_CDR encoding. Fallback to main DataReader: rmw_take() and rmw_take_with_info() call take_buffer_aware() first; if no buffer endpoint data is available (*taken is false), they fall back to __rmw_take() / __rmw_take_with_info() on the main DataReader for messages from non-buffer-aware publishers (e.g., cross-RMW). Guard condition re-arm: After a successful buffer take, take_buffer_aware() checks all buffer endpoint DataReaders for remaining unread data and re-sets buffer_data_guard_ if any is found — DDS on_data_available only fires on status transitions, so without re-arming, queued samples would not wake rmw_wait.
  • Context init (rmw_init.cpp, init_rmw_context_impl.cpp): calls initialize_buffer_backends() on init; registers a DDS participant listener callback that parses buffer backends from discovered endpoints' user_data and notifies BufferEndpointRegistry.
  • Dependencies: rosidl_buffer, rosidl_buffer_backend_registry added to package.xml and CMakeLists.txt.
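
Two small rules from the list above lend themselves to a sketch: the per-endpoint topic name and the decision whether the main DataWriter still needs to publish. The Python helpers below are hypothetical (buffer_topic_name and needs_main_writer_publish are not names from the source); the lowercase hex GID formatting and the strict greater-than comparison are assumptions consistent with the description, not confirmed details.

```python
def buffer_topic_name(topic: str, pub_gid: bytes, sub_gid: bytes) -> str:
    """Build {topic}/_buf/{pub_gid_hex}_{sub_gid_hex} for one pub/sub pair."""
    return f"{topic}/_buf/{pub_gid.hex()}_{sub_gid.hex()}"


def needs_main_writer_publish(matched_subscriptions: int,
                              buffer_endpoints: int,
                              pending_buffer_endpoints: int) -> bool:
    """True when some matched subscriber has no per-endpoint DataWriter.

    Pending endpoints count as buffer-aware so that a subscriber whose
    DataWriter is still being created is not mistaken for a legacy one.
    """
    return matched_subscriptions > buffer_endpoints + pending_buffer_endpoints
```

When every matched subscriber is buffer-aware the second helper returns False, which is the case where the CPU conversion of vendor-backed data can be skipped.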

src/ros2/rmw_fastrtps/rmw_fastrtps_shared_cpp (modified) — shared types and QoS for buffer support

  • custom_participant_info.hpp: added BufferDiscoveryCallback type and buffer_discovery_cb_ field; set_buffer_discovery_callback() registers a callback invoked during on_data_available discovery when user_data contains buffer backend info.
  • custom_publisher_info.hpp: added BufferPublisherEndpoint struct (key, DataWriter, Topic, target subscriber GID, backend aux info); added is_buffer_aware_, backend_aux_info_, local_endpoint_info_, buffer_mutex_, buffer_endpoints_, pending_buffer_endpoints_, buffer_alive_flag_ (shared_ptr<atomic<bool>>), participant_, dds_publisher_ to CustomPublisherInfo.
  • custom_subscriber_info.hpp: added BufferSubscriptionEndpoint struct (key, DataReader, Topic, owns_topic flag, listener, publisher GID, backend aux info); added is_buffer_aware_, my_backend_types_, local_endpoint_info_, buffer_mutex_, buffer_endpoints_, pending_buffer_endpoints_, buffer_alive_flag_ (shared_ptr<atomic<bool>>), buffer_data_guard_ to CustomSubscriberInfo; added RMWSubscriptionEvent::notify_buffer_data_available(). The owns_topic flag tracks whether the endpoint created its own DDS topic or is reusing one from the same participant (single-process case), controlling whether delete_topic() is called during cleanup.
  • qos.hpp/qos.cpp: added optional buffer_backends parameter to get_datareader_qos()/get_datawriter_qos(); added encode_buffer_backends_for_user_data() (format: BUFBE:name=aux;name2=aux2 appended to user_data) and parse_buffer_backends_from_user_data().
  • rmw_common.hpp: added _assign_message_info() helper.
  • rmw_wait.cpp: extended has_triggered_condition and WaitSet attachment to also check buffer_data_guard_; after wait(), if the main DataReader has no data but buffer_data_guard_ is triggered, marks the subscription as ready and resets the guard.
  • custom_subscriber_info.cpp: implemented notify_buffer_data_available() to invoke on_new_message_cb_.
  • Dependencies: rosidl_buffer_backend_registry added to package.xml.
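
The user_data format mentioned for qos.hpp/qos.cpp (BUFBE:name=aux;name2=aux2) can be illustrated with a short encode/parse pair. This Python sketch is not the C++ implementation: the function names are simplified, user_data is modelled as a string, and it assumes the BUFBE section is the last item in user_data and that names and aux values contain neither ";" nor "=".

```python
from typing import Dict

BUFBE_PREFIX = "BUFBE:"


def encode_buffer_backends(backends: Dict[str, str]) -> str:
    """Render {name: aux} as BUFBE:name=aux;name2=aux2."""
    return BUFBE_PREFIX + ";".join(f"{name}={aux}" for name, aux in backends.items())


def parse_buffer_backends(user_data: str) -> Dict[str, str]:
    """Extract {name: aux} from user_data; empty dict if no BUFBE section."""
    start = user_data.find(BUFBE_PREFIX)
    if start < 0:
        return {}
    body = user_data[start + len(BUFBE_PREFIX):]  # assumed to run to the end
    result: Dict[str, str] = {}
    for entry in body.split(";"):
        name, _, aux = entry.partition("=")
        if name:
            result[name] = aux
    return result
```

A CPU-only endpoint would then carry a section such as BUFBE:cpu= in its user_data, and an endpoint without any BUFBE section parses to an empty mapping.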

src/ros2/rmw_cyclonedds/rmw_cyclonedds_cpp (modified) — rosidl::Buffer CPU-based fallback

  • Dependency: Added rosidl_buffer to package.xml and CMakeLists.txt.
  • TypeSupport2 (TypeSupport2.hpp, TypeSupport2.cpp): For C++ introspection, uint8[] fields are treated as rosidl::Buffer<uint8_t>; sizeof_type(), size, and serialization use the Buffer API. For C introspection, when the sequence has is_rosidl_buffer set, seq->data is interpreted as Buffer<uint8_t>* for size and byte access; serialization/deserialization use the Buffer's CPU data (or throw for non-CPU backends).
  • TypeSupport_impl (TypeSupport_impl.hpp): Deserialize path for unbounded sequences: when member->is_rosidl_buffer_ is set and the member is an array, deserialize into rosidl::Buffer<uint8_t>; when the C sequence has is_rosidl_buffer set and non-null data, treat data as a Buffer pointer for copying into the message. This ensures Cyclone DDS (introspection typesupport) can send and receive messages with Buffer-backed uint8[] fields using the CPU fallback.