From 2b9813d4982ee1105e20b82a958d91ea561f6813 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Tue, 27 May 2025 14:44:36 +0200 Subject: [PATCH 1/9] Refs #21670: Protect on_data_available_lambda_ Signed-off-by: cferreiragonz --- ddspipe_participants/src/cpp/reader/auxiliar/BaseReader.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddspipe_participants/src/cpp/reader/auxiliar/BaseReader.cpp b/ddspipe_participants/src/cpp/reader/auxiliar/BaseReader.cpp index 87449a8fc..3b2905c43 100644 --- a/ddspipe_participants/src/cpp/reader/auxiliar/BaseReader.cpp +++ b/ddspipe_participants/src/cpp/reader/auxiliar/BaseReader.cpp @@ -165,7 +165,7 @@ bool BaseReader::should_accept_sample_() noexcept void BaseReader::on_data_available_() const noexcept { - if (on_data_available_lambda_set_) + if (on_data_available_lambda_set_ && enabled_.load()) { on_data_available_lambda_(); } From b5b8d6be40174765b7aa1ffc9fe47bead2d5a74f Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Tue, 17 Jun 2025 15:15:09 +0200 Subject: [PATCH 2/9] Refs #21670: Set listener in RederCreation Signed-off-by: cferreiragonz --- ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp | 6 +----- ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp | 4 ---- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp b/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp index da1dd7edf..98f3c2fbf 100644 --- a/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp +++ b/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp @@ -75,7 +75,7 @@ void CommonReader::init() reader_ = dds_subscriber_->create_datareader( dds_topic_, reckon_reader_qos_(), - nullptr, + this, eprosima::fastdds::dds::StatusMask::all(), payload_pool_); @@ -86,10 +86,6 @@ void CommonReader::init() participant_id_ << " in topic " << topic_ << "."); } - // Set listener after entity creation to avoid SEGFAULT (produced when callback using reader_ is - // invoked before the variable is fully set) - reader_->set_listener(this); - } void CommonReader::on_data_available( diff --git a/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp b/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp index befc5e950..fb4b5b7ee 100644 --- a/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp +++ b/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp @@ -121,10 +121,6 @@ void CommonReader::internal_entities_creation_( participant_id_ << " in topic " << topic_ << "."); } - // Set listener after entity creation to avoid SEGFAULT (produced when callback using rtps_reader_ is - // invoked before the variable is fully set) - rtps_reader_->set_listener(this); - // Register reader with topic if (!rtps_participant_->register_reader(rtps_reader_, topic_description, reader_qos)) { From 3df7f0536f1494ae1a7ae42f01adfd0253e03ba7 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Tue, 17 Jun 2025 15:37:11 +0200 Subject: [PATCH 3/9] Refs #21670: Separate DDS/RTPS Listeners from Base Classes Signed-off-by: cferreiragonz --- .../participant/dds/CommonParticipant.hpp | 92 ++++++++++++--- .../dynamic_types/DynTypesParticipant.hpp | 62 +++++++--- .../participant/rtps/CommonParticipant.hpp | 106 ++++++++++++------ .../cpp/participant/dds/CommonParticipant.cpp | 52 +++++---- .../dynamic_types/DynTypesParticipant.cpp | 12 +- .../participant/rtps/CommonParticipant.cpp | 27 +++-- 6 files changed, 244 insertions(+), 107 deletions(-) diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp index 669cd04f4..c91ad665a 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp @@ -60,7 +60,7 @@ namespace dds { * @warning This Participant class does not support RPC so far. * @todo TODO */ -class CommonParticipant : public core::IParticipant, public fastdds::dds::DomainParticipantListener +class CommonParticipant : public core::IParticipant { public: @@ -112,23 +112,70 @@ class CommonParticipant : public core::IParticipant, public fastdds::dds::Domain // LISTENER METHODS ///////////////////////// - void on_participant_discovery( - fastdds::dds::DomainParticipant* participant, - fastdds::rtps::ParticipantDiscoveryStatus reason, - const fastdds::rtps::ParticipantBuiltinTopicData& info, - bool& /*should_be_ignored*/) override; - - void on_data_reader_discovery( - fastdds::dds::DomainParticipant* participant, - fastdds::rtps::ReaderDiscoveryStatus reason, - const fastdds::dds::SubscriptionBuiltinTopicData& info, - bool& /*should_be_ignored*/) override; - - void on_data_writer_discovery( - fastdds::dds::DomainParticipant* participant, - fastdds::rtps::WriterDiscoveryStatus reason, - const fastdds::dds::PublicationBuiltinTopicData& info, - bool& /*should_be_ignored*/) override; + class DDSListener : public fastdds::dds::DomainParticipantListener + { + public: + + explicit DDSListener( + std::shared_ptr conf, + std::shared_ptr ddb) + : configuration_(conf) + , discovery_database_(ddb) + { + } + + /** + * @brief Override method from \c DomainParticipantListener + * + * This method is only used for debugging purposes. + */ + void on_participant_discovery( + fastdds::dds::DomainParticipant* participant, + fastdds::rtps::ParticipantDiscoveryStatus reason, + const fastdds::rtps::ParticipantBuiltinTopicData& info, + bool& /*should_be_ignored*/) override; + + /** + * @brief Override method from \c DomainParticipantListener . + * + * This method adds to the database the discovered or modified endpoint. + */ + void on_data_reader_discovery( + fastdds::dds::DomainParticipant* participant, + fastdds::rtps::ReaderDiscoveryStatus reason, + const fastdds::dds::SubscriptionBuiltinTopicData& info, + bool& /*should_be_ignored*/) override; + + /** + * @brief Override method from \c DomainParticipantListener . + * + * This method adds to the database the discovered or modified endpoint. + */ + void on_data_writer_discovery( + fastdds::dds::DomainParticipant* participant, + fastdds::rtps::WriterDiscoveryStatus reason, + const fastdds::dds::PublicationBuiltinTopicData& info, + bool& /*should_be_ignored*/) override; + + //! Getter for GUID of the participant + const fastdds::rtps::GUID_t& guid() const; + + //! Setter for GUID of the participant + void guid( + const fastdds::rtps::GUID_t& guid); + private: + + //! GUID of the participant + fastdds::rtps::GUID_t guid_; + //! Shared pointer to the configuration of the participant + const std::shared_ptr configuration_; + //! Shared pointer to the discovery database + const std::shared_ptr discovery_database_; + + }; + + //! Unique pointer to the internal DDS Participant Listener + std::unique_ptr dds_participant_listener_; protected: @@ -159,6 +206,15 @@ class CommonParticipant : public core::IParticipant, public fastdds::dds::Domain fastdds::dds::DomainParticipant* create_dds_participant_(); + /** + * @brief Virtual method that creates a listener for the internal DDS Participant. + * It should be overridden if a different listener is needed. + */ + virtual std::unique_ptr create_listener() + { + return std::make_unique(configuration_, discovery_database_); + } + ///////////////////////// // INTERNAL METHODS ///////////////////////// diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp index 0ddfda678..87e34dc1d 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp @@ -74,25 +74,55 @@ class DynTypesParticipant : public rtps::SimpleParticipant std::shared_ptr create_reader( const core::ITopic& topic) override; - DDSPIPE_PARTICIPANTS_DllAPI - void on_reader_discovery( - fastdds::rtps::RTPSParticipant* participant, - fastdds::rtps::ReaderDiscoveryStatus reason, - const fastdds::rtps::SubscriptionBuiltinTopicData& info, - bool& should_be_ignored) override; - - DDSPIPE_PARTICIPANTS_DllAPI - void on_writer_discovery( - fastdds::rtps::RTPSParticipant* participant, - fastdds::rtps::WriterDiscoveryStatus reason, - const fastdds::rtps::PublicationBuiltinTopicData& info, - bool& should_be_ignored) override; + class DynRTPSListener : public rtps::CommonParticipant::RTPSListener + { + public: + + explicit DynRTPSListener( + std::shared_ptr conf, + std::shared_ptr ddb, + std::shared_ptr type_object_reader_, + std::set received_types_) + : rtps::CommonParticipant::RTPSListener(conf, ddb) + , type_object_reader_(type_object_reader_) + , received_types_(received_types_) + { + } + + DDSPIPE_PARTICIPANTS_DllAPI + void on_reader_discovery( + fastdds::rtps::RTPSParticipant* participant, + fastdds::rtps::ReaderDiscoveryStatus reason, + const fastdds::rtps::SubscriptionBuiltinTopicData& info, + bool& should_be_ignored) override; + + DDSPIPE_PARTICIPANTS_DllAPI + void on_writer_discovery( + fastdds::rtps::RTPSParticipant* participant, + fastdds::rtps::WriterDiscoveryStatus reason, + const fastdds::rtps::PublicationBuiltinTopicData& info, + bool& should_be_ignored) override; + + private: + + //! Type Object Internal Reader + std::shared_ptr type_object_reader_; + //! Received types set + std::set received_types_; + + void notify_type_discovered_( + const fastdds::dds::xtypes::TypeInformation& type_info, + const std::string& type_name); + + }; protected: - void notify_type_discovered_( - const fastdds::dds::xtypes::TypeInformation& type_info, - const std::string& type_name); + //! Override method from \c CommonParticipant to create the internal RTPS participant listener + std::unique_ptr create_listener() override + { + return std::make_unique(configuration_, discovery_database_, type_object_reader_, received_types_); + } //! Type Object Internal Reader std::shared_ptr type_object_reader_; diff --git a/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp index d77619605..8ee54f6b8 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp @@ -52,7 +52,6 @@ namespace rtps { */ class CommonParticipant : public core::IParticipant - , public fastdds::rtps::RTPSParticipantListener { public: @@ -123,41 +122,65 @@ class CommonParticipant // RTPS LISTENER METHODS ///////////////////////// - /** - * @brief Override method from \c RTPSParticipantListener . - * - * This method only is for debugging purposes. - */ - DDSPIPE_PARTICIPANTS_DllAPI - virtual void on_participant_discovery( - fastdds::rtps::RTPSParticipant* participant, - fastdds::rtps::ParticipantDiscoveryStatus reason, - const fastdds::rtps::ParticipantBuiltinTopicData& info, - bool& /*should_be_ignored*/) override; - - /** - * @brief Override method from \c RTPSParticipantListener . - * - * This method adds to database the endpoint discovered or modified. - */ - DDSPIPE_PARTICIPANTS_DllAPI - virtual void on_reader_discovery( - fastdds::rtps::RTPSParticipant* participant, - fastdds::rtps::ReaderDiscoveryStatus reason, - const fastdds::rtps::SubscriptionBuiltinTopicData& info, - bool& /*should_be_ignored*/) override; - - /** - * @brief Override method from \c RTPSParticipantListener . - * - * This method adds to database the endpoint discovered or modified. - */ - DDSPIPE_PARTICIPANTS_DllAPI - virtual void on_writer_discovery( - fastdds::rtps::RTPSParticipant* participant, - fastdds::rtps::WriterDiscoveryStatus reason, - const fastdds::rtps::PublicationBuiltinTopicData& info, - bool& /*should_be_ignored*/) override; + class RTPSListener : public fastdds::rtps::RTPSParticipantListener + { + public: + + explicit RTPSListener( + std::shared_ptr conf, + std::shared_ptr ddb) + : configuration_(conf) + , discovery_database_(ddb) + { + } + + /** + * @brief Override method from \c RTPSParticipantListener . + * + * This method is only used for debugging purposes. + */ + DDSPIPE_PARTICIPANTS_DllAPI + virtual void on_participant_discovery( + fastdds::rtps::RTPSParticipant* participant, + fastdds::rtps::ParticipantDiscoveryStatus reason, + const fastdds::rtps::ParticipantBuiltinTopicData& info, + bool& /*should_be_ignored*/) override; + + /** + * @brief Override method from \c RTPSParticipantListener . + * + * This method adds to database the endpoint discovered or modified. + */ + DDSPIPE_PARTICIPANTS_DllAPI + virtual void on_reader_discovery( + fastdds::rtps::RTPSParticipant* participant, + fastdds::rtps::ReaderDiscoveryStatus reason, + const fastdds::rtps::SubscriptionBuiltinTopicData& info, + bool& /*should_be_ignored*/) override; + + /** + * @brief Override method from \c RTPSParticipantListener . + * + * This method adds to database the endpoint discovered or modified. + */ + DDSPIPE_PARTICIPANTS_DllAPI + virtual void on_writer_discovery( + fastdds::rtps::RTPSParticipant* participant, + fastdds::rtps::WriterDiscoveryStatus reason, + const fastdds::rtps::PublicationBuiltinTopicData& info, + bool& /*should_be_ignored*/) override; + + protected: + + //! Shared pointer to the configuration of the participant + const std::shared_ptr configuration_; + //! Shared pointer to the discovery database + const std::shared_ptr discovery_database_; + + }; + + //! Unique pointer to the internal RTPS Participant Listener + std::unique_ptr rtps_participant_listener_; ////////////////// // STATIC METHODS @@ -221,6 +244,17 @@ class CommonParticipant DDSPIPE_PARTICIPANTS_DllAPI virtual fastdds::rtps::RTPSParticipantAttributes reckon_participant_attributes_() const; + /** + * @brief Virtual method that creates a listener for the internal RTPS Participant. + * It should be overridden if a different listener is needed. + * This method must be called after the RTPS Participant is created, otherwise no listener will be set. + * @return A unique pointer to an RTPS Participant Listener. + */ + virtual std::unique_ptr create_listener() + { + return std::make_unique(configuration_, discovery_database_); + } + ///// // VARIABLES diff --git a/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp index ff4638575..5ed836aca 100644 --- a/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp @@ -48,11 +48,7 @@ CommonParticipant::~CommonParticipant() if (dds_participant_) { dds_participant_->set_listener(nullptr); - - for (auto& topic : dds_topics_) - { - dds_participant_->delete_topic(topic.second); - } + dds_participant_->delete_contained_entities(); eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->delete_participant(dds_participant_); } @@ -95,6 +91,9 @@ void CommonParticipant::init() { throw utils::InitializationException(STR_ENTRY << "Error enabling DDS Participant " << id() << "."); } + + // Set the GUID of the participant in the DDS Listener + static_cast(dds_participant_listener_.get())->guid(dds_participant_->guid()); } core::types::ParticipantId CommonParticipant::id() const noexcept @@ -215,7 +214,7 @@ std::shared_ptr CommonParticipant::create_reader( } } -void CommonParticipant::on_participant_discovery( +void CommonParticipant::DDSListener::on_participant_discovery( fastdds::dds::DomainParticipant* participant, fastdds::rtps::ParticipantDiscoveryStatus reason, const fastdds::rtps::ParticipantBuiltinTopicData& info, @@ -252,14 +251,14 @@ void CommonParticipant::on_participant_discovery( } } -void CommonParticipant::on_data_reader_discovery( +void CommonParticipant::DDSListener::on_data_reader_discovery( fastdds::dds::DomainParticipant*, fastdds::rtps::ReaderDiscoveryStatus reason, const fastdds::dds::SubscriptionBuiltinTopicData& info, bool& /*should_be_ignored*/) { // If reader is from other participant, store it in discovery database - if (detail::come_from_same_participant_(info.guid, this->dds_participant_->guid())) + if (detail::come_from_same_participant_(info.guid, guid())) { // Come from this participant, do nothing return; @@ -267,7 +266,7 @@ void CommonParticipant::on_data_reader_discovery( // Calculate endpoint info core::types::Endpoint info_reader = - detail::create_endpoint_from_info_(info, id()); + detail::create_endpoint_from_info_(info, configuration_->id); // If new endpoint discovered if (reason == fastdds::rtps::ReaderDiscoveryStatus::DISCOVERED_READER) @@ -276,14 +275,14 @@ void CommonParticipant::on_data_reader_discovery( "Found in Participant " << configuration_->id << " new Reader " << info.guid << "."); // TODO check logic because if an endpoint is lost by liveliness it may be inserted again when already in database - this->discovery_database_->add_endpoint(info_reader); + discovery_database_->add_endpoint(info_reader); } else if (reason == fastdds::rtps::ReaderDiscoveryStatus::CHANGED_QOS_READER) { EPROSIMA_LOG_INFO(DDSPIPE_DISCOVERY, configuration_->id << " participant : " << "Reader " << info.guid << " changed TopicQoS."); - this->discovery_database_->update_endpoint(info_reader); + discovery_database_->update_endpoint(info_reader); } else if (reason == fastdds::rtps::ReaderDiscoveryStatus::REMOVED_READER) { @@ -291,7 +290,7 @@ void CommonParticipant::on_data_reader_discovery( configuration_->id << " participant : " << "Reader " << info.guid << " removed."); info_reader.active = false; - this->discovery_database_->update_endpoint(info_reader); + discovery_database_->update_endpoint(info_reader); } else if (reason == fastdds::rtps::ReaderDiscoveryStatus::IGNORED_READER) { @@ -302,14 +301,14 @@ void CommonParticipant::on_data_reader_discovery( } } -void CommonParticipant::on_data_writer_discovery( +void CommonParticipant::DDSListener::on_data_writer_discovery( fastdds::dds::DomainParticipant*, fastdds::rtps::WriterDiscoveryStatus reason, const fastdds::dds::PublicationBuiltinTopicData& info, bool& /*should_be_ignored*/) { // If writer is from other participant, store it in discovery database - if (detail::come_from_same_participant_(info.guid, this->dds_participant_->guid())) + if (detail::come_from_same_participant_(info.guid, guid())) { // Come from this participant, do nothing return; @@ -317,7 +316,7 @@ void CommonParticipant::on_data_writer_discovery( // Calculate endpoint info core::types::Endpoint info_writer = - detail::create_endpoint_from_info_(info, id()); + detail::create_endpoint_from_info_(info, configuration_->id); // If new endpoint discovered if (reason == fastdds::rtps::WriterDiscoveryStatus::DISCOVERED_WRITER) @@ -326,14 +325,14 @@ void CommonParticipant::on_data_writer_discovery( "Found in Participant " << configuration_->id << " new Writer " << info.guid << "."); // TODO check logic because if an endpoint is lost by liveliness it may be inserted again when already in database - this->discovery_database_->add_endpoint(info_writer); + discovery_database_->add_endpoint(info_writer); } else if (reason == fastdds::rtps::WriterDiscoveryStatus::CHANGED_QOS_WRITER) { EPROSIMA_LOG_INFO(DDSPIPE_DISCOVERY, configuration_->id << " participant : " << "Writer " << info.guid << " changed TopicQoS."); - this->discovery_database_->update_endpoint(info_writer); + discovery_database_->update_endpoint(info_writer); } else if (reason == fastdds::rtps::WriterDiscoveryStatus::REMOVED_WRITER) { @@ -341,7 +340,7 @@ void CommonParticipant::on_data_writer_discovery( configuration_->id << " participant : " << "Writer " << info.guid << " removed."); info_writer.active = false; - this->discovery_database_->update_endpoint(info_writer); + discovery_database_->update_endpoint(info_writer); } else if (reason == fastdds::rtps::WriterDiscoveryStatus::IGNORED_WRITER) { @@ -352,6 +351,18 @@ void CommonParticipant::on_data_writer_discovery( } } +const fastdds::rtps::GUID_t& CommonParticipant::DDSListener::guid() const +{ + return guid_; +} + +void CommonParticipant::DDSListener::guid( + const fastdds::rtps::GUID_t& guid) +{ + guid_ = guid; +} + + CommonParticipant::CommonParticipant( const std::shared_ptr& participant_configuration, const std::shared_ptr& payload_pool, @@ -403,10 +414,13 @@ fastdds::dds::DomainParticipant* CommonParticipant::create_dds_participant_() mask << fastdds::dds::StatusMask::publication_matched(); mask << fastdds::dds::StatusMask::subscription_matched(); + // Create the participant listener + dds_participant_listener_ = create_listener(); + return eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->create_participant( configuration_->domain, reckon_participant_qos_(), - this, + dds_participant_listener_.get(), mask); } diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp index f2d60025e..8cdcdfe12 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp @@ -76,7 +76,7 @@ std::shared_ptr DynTypesParticipant::create_reader( return rtps::SimpleParticipant::create_reader(topic); } -void DynTypesParticipant::on_reader_discovery( +void DynTypesParticipant::DynRTPSListener::on_reader_discovery( fastdds::rtps::RTPSParticipant* participant, fastdds::rtps::ReaderDiscoveryStatus reason, const fastdds::rtps::SubscriptionBuiltinTopicData& info, @@ -88,13 +88,13 @@ void DynTypesParticipant::on_reader_discovery( const auto type_info = info.type_information.type_information; const auto type_name = info.type_name.to_string(); - rtps::CommonParticipant::on_reader_discovery(participant, reason, info, should_be_ignored); + rtps::CommonParticipant::RTPSListener::on_reader_discovery(participant, reason, info, should_be_ignored); notify_type_discovered_(type_info, type_name); } } -void DynTypesParticipant::on_writer_discovery( +void DynTypesParticipant::DynRTPSListener::on_writer_discovery( fastdds::rtps::RTPSParticipant* participant, fastdds::rtps::WriterDiscoveryStatus reason, const fastdds::rtps::PublicationBuiltinTopicData& info, @@ -106,13 +106,13 @@ void DynTypesParticipant::on_writer_discovery( const auto type_info = info.type_information.type_information; const auto type_name = info.type_name.to_string(); - rtps::CommonParticipant::on_writer_discovery(participant, reason, info, should_be_ignored); + rtps::CommonParticipant::RTPSListener::on_writer_discovery(participant, reason, info, should_be_ignored); notify_type_discovered_(type_info, type_name); } } -void DynTypesParticipant::notify_type_discovered_( +void DynTypesParticipant::DynRTPSListener::notify_type_discovered_( const fastdds::dds::xtypes::TypeInformation& type_info, const std::string& type_name) { @@ -150,7 +150,7 @@ void DynTypesParticipant::notify_type_discovered_( // Notify type_identifier // NOTE: We assume each type_name corresponds to only one type_identifier EPROSIMA_LOG_INFO(DDSPIPE_DYNTYPES_PARTICIPANT, - "Participant " << this->id() << " discovered type object " << dyn_type->get_name()); + "Participant " << configuration_->id << " discovered type object " << dyn_type->get_name()); monitor_type_discovered(type_name); diff --git a/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp index 0414b816b..4461c9497 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp @@ -79,7 +79,7 @@ void CommonParticipant::init() participant_attributes_); } -void CommonParticipant::on_participant_discovery( +void CommonParticipant::RTPSListener::on_participant_discovery( fastdds::rtps::RTPSParticipant* participant, fastdds::rtps::ParticipantDiscoveryStatus reason, const fastdds::rtps::ParticipantBuiltinTopicData& info, @@ -116,7 +116,7 @@ void CommonParticipant::on_participant_discovery( } } -void CommonParticipant::on_reader_discovery( +void CommonParticipant::RTPSListener::on_reader_discovery( fastdds::rtps::RTPSParticipant* participant, fastdds::rtps::ReaderDiscoveryStatus reason, const fastdds::rtps::SubscriptionBuiltinTopicData& info, @@ -126,14 +126,14 @@ void CommonParticipant::on_reader_discovery( { core::types::Endpoint info_reader = detail::create_endpoint_from_info_( - info, this->id()); + info, configuration_->id); if (reason == fastdds::rtps::ReaderDiscoveryStatus::DISCOVERED_READER) { EPROSIMA_LOG_INFO(DDSPIPE_DISCOVERY, "Found in Participant " << configuration_->id << " new Reader " << info.guid << "."); - this->discovery_database_->add_endpoint(info_reader); + discovery_database_->add_endpoint(info_reader); } else if (reason == fastdds::rtps::ReaderDiscoveryStatus::CHANGED_QOS_READER) { @@ -141,7 +141,7 @@ void CommonParticipant::on_reader_discovery( configuration_->id << " participant : " << "Reader " << info.guid << " changed TopicQoS."); - this->discovery_database_->update_endpoint(info_reader); + discovery_database_->update_endpoint(info_reader); } else if (reason == fastdds::rtps::ReaderDiscoveryStatus::REMOVED_READER) { @@ -149,7 +149,7 @@ void CommonParticipant::on_reader_discovery( configuration_->id << " participant : " << "Reader " << info.guid << " removed."); info_reader.active = false; - this->discovery_database_->update_endpoint(info_reader); + discovery_database_->update_endpoint(info_reader); } else if (reason == fastdds::rtps::ReaderDiscoveryStatus::IGNORED_READER) { @@ -161,7 +161,7 @@ void CommonParticipant::on_reader_discovery( } } -void CommonParticipant::on_writer_discovery( +void CommonParticipant::RTPSListener::on_writer_discovery( fastdds::rtps::RTPSParticipant* participant, fastdds::rtps::WriterDiscoveryStatus reason, const fastdds::rtps::PublicationBuiltinTopicData& info, @@ -171,14 +171,14 @@ void CommonParticipant::on_writer_discovery( { core::types::Endpoint info_writer = detail::create_endpoint_from_info_( - info, this->id()); + info, configuration_->id); if (reason == fastdds::rtps::WriterDiscoveryStatus::DISCOVERED_WRITER) { EPROSIMA_LOG_INFO(DDSPIPE_DISCOVERY, "Found in Participant " << configuration_->id << " new Writer " << info.guid << "."); - this->discovery_database_->add_endpoint(info_writer); + discovery_database_->add_endpoint(info_writer); } else if (reason == fastdds::rtps::WriterDiscoveryStatus::CHANGED_QOS_WRITER) { @@ -186,7 +186,7 @@ void CommonParticipant::on_writer_discovery( configuration_->id << " participant : " << "Writer " << info.guid << " changed TopicQoS."); - this->discovery_database_->update_endpoint(info_writer); + discovery_database_->update_endpoint(info_writer); } else if (reason == fastdds::rtps::WriterDiscoveryStatus::REMOVED_WRITER) { @@ -194,7 +194,7 @@ void CommonParticipant::on_writer_discovery( configuration_->id << " participant : " << "Writer " << info.guid << " removed."); info_writer.active = false; - this->discovery_database_->update_endpoint(info_writer); + discovery_database_->update_endpoint(info_writer); } else if (reason == fastdds::rtps::WriterDiscoveryStatus::IGNORED_WRITER) { @@ -326,12 +326,15 @@ void CommonParticipant::create_participant_( EPROSIMA_LOG_INFO(DDSPIPE_RTPS_PARTICIPANT, "Creating Participant in domain " << domain); + // Create the RTPS Participant Listener + rtps_participant_listener_ = create_listener(); + // Listener must be set in creation as no callbacks should be missed // It is safe to do so here as object is already created and callbacks do not require anything set in this method rtps_participant_ = fastdds::rtps::RTPSDomain::createParticipant( domain, participant_attributes, - this); + rtps_participant_listener_.get()); if (!rtps_participant_) { From 8ed78cde14ad0b113cd39d7c0d68e503b7411e79 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Tue, 17 Jun 2025 15:37:55 +0200 Subject: [PATCH 4/9] Refs #21670: Uncrustify Signed-off-by: cferreiragonz --- .../ddspipe_participants/participant/dds/CommonParticipant.hpp | 1 + .../participant/dynamic_types/DynTypesParticipant.hpp | 3 ++- .../src/cpp/participant/dds/CommonParticipant.cpp | 1 - ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp | 3 ++- 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp index c91ad665a..f1902974d 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp @@ -163,6 +163,7 @@ class CommonParticipant : public core::IParticipant //! Setter for GUID of the participant void guid( const fastdds::rtps::GUID_t& guid); + private: //! GUID of the participant diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp index 87e34dc1d..e88e1c1eb 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp @@ -121,7 +121,8 @@ class DynTypesParticipant : public rtps::SimpleParticipant //! Override method from \c CommonParticipant to create the internal RTPS participant listener std::unique_ptr create_listener() override { - return std::make_unique(configuration_, discovery_database_, type_object_reader_, received_types_); + return std::make_unique(configuration_, discovery_database_, type_object_reader_, + received_types_); } //! Type Object Internal Reader diff --git a/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp index 5ed836aca..75a26bbb0 100644 --- a/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp @@ -362,7 +362,6 @@ void CommonParticipant::DDSListener::guid( guid_ = guid; } - CommonParticipant::CommonParticipant( const std::shared_ptr& participant_configuration, const std::shared_ptr& payload_pool, diff --git a/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp b/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp index 98f3c2fbf..71837843d 100644 --- a/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp +++ b/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp @@ -187,7 +187,8 @@ utils::ReturnCode CommonReader::take_nts_( // There has been an error taking the data. Exit. return ret; } - } while (!should_accept_sample_(info)); + } + while (!should_accept_sample_(info)); EPROSIMA_LOG_INFO(DDSPIPE_DDS_READER, "Data taken in " << participant_id_ << " for topic " << topic_ << "."); From 6af6a1497df04c76bedca55fceb7cf9ffb19e201 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Wed, 18 Jun 2025 16:24:15 +0200 Subject: [PATCH 5/9] Refs #21670: Apply Review Signed-off-by: cferreiragonz --- .../participant/dds/CommonParticipant.hpp | 28 ++++++++-------- .../dynamic_types/DynTypesParticipant.hpp | 33 +++++++------------ .../participant/rtps/CommonParticipant.hpp | 17 ++++------ .../cpp/participant/dds/CommonParticipant.cpp | 15 ++++++++- .../dynamic_types/DynTypesParticipant.cpp | 31 ++++++++++++----- .../participant/rtps/CommonParticipant.cpp | 22 ++++++++++--- 6 files changed, 86 insertions(+), 60 deletions(-) diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp index f1902974d..7a67342c1 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp @@ -118,11 +118,7 @@ class CommonParticipant : public core::IParticipant explicit DDSListener( std::shared_ptr conf, - std::shared_ptr ddb) - : configuration_(conf) - , discovery_database_(ddb) - { - } + std::shared_ptr ddb); /** * @brief Override method from \c DomainParticipantListener @@ -164,7 +160,7 @@ class CommonParticipant : public core::IParticipant void guid( const fastdds::rtps::GUID_t& guid); - private: + protected: //! GUID of the participant fastdds::rtps::GUID_t guid_; @@ -190,6 +186,17 @@ class CommonParticipant : public core::IParticipant const std::shared_ptr& payload_pool, const std::shared_ptr& discovery_database); + ///////////////////////// + // VIRTUAL METHODS + ///////////////////////// + + /** + * @brief Virtual method that creates a listener for the internal DDS Participant. + * It should be overridden if a different listener is needed. + */ + DDSPIPE_PARTICIPANTS_DllAPI + virtual std::unique_ptr create_listener_(); + ///////////////////////// // INTERNAL VIRTUAL METHODS ///////////////////////// @@ -207,15 +214,6 @@ class CommonParticipant : public core::IParticipant fastdds::dds::DomainParticipant* create_dds_participant_(); - /** - * @brief Virtual method that creates a listener for the internal DDS Participant. - * It should be overridden if a different listener is needed. - */ - virtual std::unique_ptr create_listener() - { - return std::make_unique(configuration_, discovery_database_); - } - ///////////////////////// // INTERNAL METHODS ///////////////////////// diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp index e88e1c1eb..1b4fdb7af 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp @@ -74,20 +74,15 @@ class DynTypesParticipant : public rtps::SimpleParticipant std::shared_ptr create_reader( const core::ITopic& topic) override; - class DynRTPSListener : public rtps::CommonParticipant::RTPSListener + class DynTypesRtpsListener : public rtps::CommonParticipant::RtpsListener { public: - explicit DynRTPSListener( + DDSPIPE_PARTICIPANTS_DllAPI + explicit DynTypesRtpsListener( std::shared_ptr conf, std::shared_ptr ddb, - std::shared_ptr type_object_reader_, - std::set received_types_) - : rtps::CommonParticipant::RTPSListener(conf, ddb) - , type_object_reader_(type_object_reader_) - , received_types_(received_types_) - { - } + core::types::ParticipantId id); DDSPIPE_PARTICIPANTS_DllAPI void on_reader_discovery( @@ -103,7 +98,13 @@ class DynTypesParticipant : public rtps::SimpleParticipant const fastdds::rtps::PublicationBuiltinTopicData& info, bool& should_be_ignored) override; - private: + //! Type Object Reader getter + inline std::shared_ptr type_object_reader() const + { + return type_object_reader_; + } + + protected: //! Type Object Internal Reader std::shared_ptr type_object_reader_; @@ -119,17 +120,7 @@ class DynTypesParticipant : public rtps::SimpleParticipant protected: //! Override method from \c CommonParticipant to create the internal RTPS participant listener - std::unique_ptr create_listener() override - { - return std::make_unique(configuration_, discovery_database_, type_object_reader_, - received_types_); - } - - //! Type Object Internal Reader - std::shared_ptr type_object_reader_; - - //! Received types set - std::set received_types_; + std::unique_ptr create_listener_() override; }; } /* namespace participants */ diff --git a/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp index 8ee54f6b8..117464db3 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp @@ -122,17 +122,14 @@ class CommonParticipant // RTPS LISTENER METHODS ///////////////////////// - class RTPSListener : public fastdds::rtps::RTPSParticipantListener + class RtpsListener : public fastdds::rtps::RTPSParticipantListener { public: - explicit RTPSListener( + DDSPIPE_PARTICIPANTS_DllAPI + explicit RtpsListener( std::shared_ptr conf, - std::shared_ptr ddb) - : configuration_(conf) - , discovery_database_(ddb) - { - } + std::shared_ptr ddb); /** * @brief Override method from \c RTPSParticipantListener . @@ -250,10 +247,8 @@ class CommonParticipant * This method must be called after the RTPS Participant is created, otherwise no listener will be set. * @return A unique pointer to an RTPS Participant Listener. */ - virtual std::unique_ptr create_listener() - { - return std::make_unique(configuration_, discovery_database_); - } + DDSPIPE_PARTICIPANTS_DllAPI + virtual std::unique_ptr create_listener_(); ///// // VARIABLES diff --git a/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp index 75a26bbb0..17062ad55 100644 --- a/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp @@ -54,6 +54,11 @@ CommonParticipant::~CommonParticipant() } } +std::unique_ptr CommonParticipant::create_listener_() +{ + return std::make_unique(configuration_, discovery_database_); +} + void CommonParticipant::init() { EPROSIMA_LOG_INFO(DDSPIPE_DDS_PARTICIPANT, "Initializing DDS Participant " << id() << "."); @@ -214,6 +219,14 @@ std::shared_ptr CommonParticipant::create_reader( } } +CommonParticipant::DDSListener::DDSListener( + std::shared_ptr conf, + std::shared_ptr ddb) + : configuration_(conf) + , discovery_database_(ddb) +{ +} + void CommonParticipant::DDSListener::on_participant_discovery( fastdds::dds::DomainParticipant* participant, fastdds::rtps::ParticipantDiscoveryStatus reason, @@ -414,7 +427,7 @@ fastdds::dds::DomainParticipant* CommonParticipant::create_dds_participant_() mask << fastdds::dds::StatusMask::subscription_matched(); // Create the participant listener - dds_participant_listener_ = create_listener(); + dds_participant_listener_ = create_listener_(); return eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->create_participant( configuration_->domain, diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp index 8cdcdfe12..ee417490e 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp @@ -50,8 +50,6 @@ DynTypesParticipant::DynTypesParticipant( participant_configuration, payload_pool, discovery_database) - , type_object_reader_(std::make_shared( - this->id())) { // Do nothing } @@ -69,14 +67,26 @@ std::shared_ptr DynTypesParticipant::create_reader( // If type object topic, return the internal reader for type objects if (core::types::is_type_object_topic(topic)) { - return this->type_object_reader_; + return static_cast(rtps_participant_listener_.get())-> + type_object_reader(); } // If not type object, use the parent method return rtps::SimpleParticipant::create_reader(topic); } -void DynTypesParticipant::DynRTPSListener::on_reader_discovery( +DynTypesParticipant::DynTypesRtpsListener::DynTypesRtpsListener( + std::shared_ptr conf, + std::shared_ptr ddb, + core::types::ParticipantId id) + : rtps::CommonParticipant::RtpsListener( + conf, + ddb) + , type_object_reader_(std::make_shared(id)) +{ +} + +void DynTypesParticipant::DynTypesRtpsListener::on_reader_discovery( fastdds::rtps::RTPSParticipant* participant, fastdds::rtps::ReaderDiscoveryStatus reason, const fastdds::rtps::SubscriptionBuiltinTopicData& info, @@ -88,13 +98,13 @@ void DynTypesParticipant::DynRTPSListener::on_reader_discovery( const auto type_info = info.type_information.type_information; const auto type_name = info.type_name.to_string(); - rtps::CommonParticipant::RTPSListener::on_reader_discovery(participant, reason, info, should_be_ignored); + rtps::CommonParticipant::RtpsListener::on_reader_discovery(participant, reason, info, should_be_ignored); notify_type_discovered_(type_info, type_name); } } -void DynTypesParticipant::DynRTPSListener::on_writer_discovery( +void DynTypesParticipant::DynTypesRtpsListener::on_writer_discovery( fastdds::rtps::RTPSParticipant* participant, fastdds::rtps::WriterDiscoveryStatus reason, const fastdds::rtps::PublicationBuiltinTopicData& info, @@ -106,13 +116,13 @@ void DynTypesParticipant::DynRTPSListener::on_writer_discovery( const auto type_info = info.type_information.type_information; const auto type_name = info.type_name.to_string(); - rtps::CommonParticipant::RTPSListener::on_writer_discovery(participant, reason, info, should_be_ignored); + rtps::CommonParticipant::RtpsListener::on_writer_discovery(participant, reason, info, should_be_ignored); notify_type_discovered_(type_info, type_name); } } -void DynTypesParticipant::DynRTPSListener::notify_type_discovered_( +void DynTypesParticipant::DynTypesRtpsListener::notify_type_discovered_( const fastdds::dds::xtypes::TypeInformation& type_info, const std::string& type_name) { @@ -166,6 +176,11 @@ void DynTypesParticipant::DynRTPSListener::notify_type_discovered_( type_object_reader_->simulate_data_reception(std::move(data)); } +std::unique_ptr DynTypesParticipant::create_listener_() +{ + return std::make_unique(configuration_, discovery_database_, id()); +} + } /* namespace participants */ } /* namespace ddspipe */ } /* namespace eprosima */ diff --git a/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp index 4461c9497..d09142f11 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp @@ -79,7 +79,15 @@ void CommonParticipant::init() participant_attributes_); } -void CommonParticipant::RTPSListener::on_participant_discovery( +CommonParticipant::RtpsListener::RtpsListener( + std::shared_ptr conf, + std::shared_ptr ddb) + : configuration_(conf) + , discovery_database_(ddb) +{ +} + +void CommonParticipant::RtpsListener::on_participant_discovery( fastdds::rtps::RTPSParticipant* participant, fastdds::rtps::ParticipantDiscoveryStatus reason, const fastdds::rtps::ParticipantBuiltinTopicData& info, @@ -116,7 +124,7 @@ void CommonParticipant::RTPSListener::on_participant_discovery( } } -void CommonParticipant::RTPSListener::on_reader_discovery( +void CommonParticipant::RtpsListener::on_reader_discovery( fastdds::rtps::RTPSParticipant* participant, fastdds::rtps::ReaderDiscoveryStatus reason, const fastdds::rtps::SubscriptionBuiltinTopicData& info, @@ -161,7 +169,7 @@ void CommonParticipant::RTPSListener::on_reader_discovery( } } -void CommonParticipant::RTPSListener::on_writer_discovery( +void CommonParticipant::RtpsListener::on_writer_discovery( fastdds::rtps::RTPSParticipant* participant, fastdds::rtps::WriterDiscoveryStatus reason, const fastdds::rtps::PublicationBuiltinTopicData& info, @@ -327,7 +335,7 @@ void CommonParticipant::create_participant_( "Creating Participant in domain " << domain); // Create the RTPS Participant Listener - rtps_participant_listener_ = create_listener(); + rtps_participant_listener_ = create_listener_(); // Listener must be set in creation as no callbacks should be missed // It is safe to do so here as object is already created and callbacks do not require anything set in this method @@ -503,6 +511,12 @@ CommonParticipant::reckon_participant_attributes_() const return params; } +std::unique_ptr +CommonParticipant::create_listener_() +{ + return std::make_unique(configuration_, discovery_database_); +} + } /* namespace rtps */ } /* namespace participants */ } /* namespace ddspipe */ From 8760503621187b263f240a5dcd54a4bf376485ce Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Thu, 19 Jun 2025 15:15:31 +0200 Subject: [PATCH 6/9] Refs #21670: Apply Review 2 Signed-off-by: cferreiragonz --- .../participant/dds/CommonParticipant.hpp | 9 -------- .../cpp/participant/dds/CommonParticipant.cpp | 22 ++++--------------- .../src/cpp/reader/dds/CommonReader.cpp | 13 +++++++++++ .../src/cpp/reader/rtps/CommonReader.cpp | 1 + 4 files changed, 18 insertions(+), 27 deletions(-) diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp index 7a67342c1..244f9a9f5 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp @@ -153,17 +153,8 @@ class CommonParticipant : public core::IParticipant const fastdds::dds::PublicationBuiltinTopicData& info, bool& /*should_be_ignored*/) override; - //! Getter for GUID of the participant - const fastdds::rtps::GUID_t& guid() const; - - //! Setter for GUID of the participant - void guid( - const fastdds::rtps::GUID_t& guid); - protected: - //! GUID of the participant - fastdds::rtps::GUID_t guid_; //! Shared pointer to the configuration of the participant const std::shared_ptr configuration_; //! Shared pointer to the discovery database diff --git a/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp index 17062ad55..10999c215 100644 --- a/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp @@ -96,9 +96,6 @@ void CommonParticipant::init() { throw utils::InitializationException(STR_ENTRY << "Error enabling DDS Participant " << id() << "."); } - - // Set the GUID of the participant in the DDS Listener - static_cast(dds_participant_listener_.get())->guid(dds_participant_->guid()); } core::types::ParticipantId CommonParticipant::id() const noexcept @@ -265,13 +262,13 @@ void CommonParticipant::DDSListener::on_participant_discovery( } void CommonParticipant::DDSListener::on_data_reader_discovery( - fastdds::dds::DomainParticipant*, + fastdds::dds::DomainParticipant* participant, fastdds::rtps::ReaderDiscoveryStatus reason, const fastdds::dds::SubscriptionBuiltinTopicData& info, bool& /*should_be_ignored*/) { // If reader is from other participant, store it in discovery database - if (detail::come_from_same_participant_(info.guid, guid())) + if (detail::come_from_same_participant_(info.guid, participant->guid())) { // Come from this participant, do nothing return; @@ -315,13 +312,13 @@ void CommonParticipant::DDSListener::on_data_reader_discovery( } void CommonParticipant::DDSListener::on_data_writer_discovery( - fastdds::dds::DomainParticipant*, + fastdds::dds::DomainParticipant* participant, fastdds::rtps::WriterDiscoveryStatus reason, const fastdds::dds::PublicationBuiltinTopicData& info, bool& /*should_be_ignored*/) { // If writer is from other participant, store it in discovery database - if (detail::come_from_same_participant_(info.guid, guid())) + if (detail::come_from_same_participant_(info.guid, participant->guid())) { // Come from this participant, do nothing return; @@ -364,17 +361,6 @@ void CommonParticipant::DDSListener::on_data_writer_discovery( } } -const fastdds::rtps::GUID_t& CommonParticipant::DDSListener::guid() const -{ - return guid_; -} - -void CommonParticipant::DDSListener::guid( - const fastdds::rtps::GUID_t& guid) -{ - guid_ = guid; -} - CommonParticipant::CommonParticipant( const std::shared_ptr& participant_configuration, const std::shared_ptr& payload_pool, diff --git a/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp b/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp index 71837843d..bed8d9e0a 100644 --- a/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp +++ b/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp @@ -86,6 +86,18 @@ void CommonReader::init() participant_id_ << " in topic " << topic_ << "."); } + // Subscriber is created with autoenable set to false, so we need to enable the reader manually. + // This is done just to ensure that the reader is not registered before any other method modifying the reader pointer + // is called, opening a window for potential data races. Although Fast DDS ensures that this cannot happen, this + // procedure protects against future bad practices introducing the aforementioned data races. + if (fastdds::dds::RETCODE_OK != reader_->enable()) + { + dds_subscriber_->delete_datareader(reader_); + throw utils::InitializationException( + utils::Formatter() << "Error enabling DataReader for Participant " << + participant_id_ << " in topic " << topic_ << "."); + } + } void CommonReader::on_data_available( @@ -221,6 +233,7 @@ fastdds::dds::SubscriberQos CommonReader::reckon_subscriber_qos_() const { qos.partition().push_back("*"); } + qos.entity_factory().autoenable_created_entities = false; return qos; } diff --git a/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp b/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp index fb4b5b7ee..3587d930c 100644 --- a/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp +++ b/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp @@ -107,6 +107,7 @@ void CommonReader::internal_entities_creation_( // Create CommonReader // Listener must be set in creation as no callbacks should be missed // It is safe to do so here as object is already created and callbacks do not require anything set in this method + // Also, no data races can ocurr as no callback will be called until this reader is registered rtps_reader_ = fastdds::rtps::RTPSDomain::createRTPSReader( rtps_participant_, non_const_reader_attributes, From 71631b6f8d73e3a51e7eada45fdbd7f6126bc3c4 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Fri, 11 Jul 2025 09:50:37 +0200 Subject: [PATCH 7/9] Refs #21670: Apply review 3 Signed-off-by: cferreiragonz --- .../participant/dds/CommonParticipant.hpp | 7 +++++++ .../dynamic_types/DynTypesParticipant.hpp | 8 ++++++-- .../cpp/participant/dds/CommonParticipant.cpp | 16 +++++++++++----- .../dynamic_types/DynTypesParticipant.cpp | 8 +++++--- .../cpp/participant/rtps/CommonParticipant.cpp | 6 ++++++ .../src/cpp/reader/dds/CommonReader.cpp | 1 + 6 files changed, 36 insertions(+), 10 deletions(-) diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp index 244f9a9f5..8ea880960 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp @@ -116,6 +116,7 @@ class CommonParticipant : public core::IParticipant { public: + DDSPIPE_PARTICIPANTS_DllAPI explicit DDSListener( std::shared_ptr conf, std::shared_ptr ddb); @@ -125,6 +126,7 @@ class CommonParticipant : public core::IParticipant * * This method is only used for debugging purposes. */ + DDSPIPE_PARTICIPANTS_DllAPI void on_participant_discovery( fastdds::dds::DomainParticipant* participant, fastdds::rtps::ParticipantDiscoveryStatus reason, @@ -136,6 +138,7 @@ class CommonParticipant : public core::IParticipant * * This method adds to the database the discovered or modified endpoint. */ + DDSPIPE_PARTICIPANTS_DllAPI void on_data_reader_discovery( fastdds::dds::DomainParticipant* participant, fastdds::rtps::ReaderDiscoveryStatus reason, @@ -147,6 +150,7 @@ class CommonParticipant : public core::IParticipant * * This method adds to the database the discovered or modified endpoint. */ + DDSPIPE_PARTICIPANTS_DllAPI void on_data_writer_discovery( fastdds::dds::DomainParticipant* participant, fastdds::rtps::WriterDiscoveryStatus reason, @@ -192,15 +196,18 @@ class CommonParticipant : public core::IParticipant // INTERNAL VIRTUAL METHODS ///////////////////////// + DDSPIPE_PARTICIPANTS_DllAPI virtual fastdds::dds::DomainParticipantQos add_qos_properties_( fastdds::dds::DomainParticipantQos& qos) const; + DDSPIPE_PARTICIPANTS_DllAPI virtual fastdds::dds::DomainParticipantQos reckon_participant_qos_() const; + DDSPIPE_PARTICIPANTS_DllAPI virtual fastdds::dds::DomainParticipant* create_dds_participant_(); diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp index 1b4fdb7af..6bc62e0eb 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp @@ -82,7 +82,7 @@ class DynTypesParticipant : public rtps::SimpleParticipant explicit DynTypesRtpsListener( std::shared_ptr conf, std::shared_ptr ddb, - core::types::ParticipantId id); + std::shared_ptr internal_reader); DDSPIPE_PARTICIPANTS_DllAPI void on_reader_discovery( @@ -106,7 +106,7 @@ class DynTypesParticipant : public rtps::SimpleParticipant protected: - //! Type Object Internal Reader + //! Copy of Type Object Internal Reader std::shared_ptr type_object_reader_; //! Received types set std::set received_types_; @@ -121,6 +121,10 @@ class DynTypesParticipant : public rtps::SimpleParticipant //! Override method from \c CommonParticipant to create the internal RTPS participant listener std::unique_ptr create_listener_() override; + + //! Type Object Internal Reader + std::shared_ptr type_object_reader_; + }; } /* namespace participants */ diff --git a/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp index 10999c215..b69308a84 100644 --- a/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp @@ -54,11 +54,6 @@ CommonParticipant::~CommonParticipant() } } -std::unique_ptr CommonParticipant::create_listener_() -{ - return std::make_unique(configuration_, discovery_database_); -} - void CommonParticipant::init() { EPROSIMA_LOG_INFO(DDSPIPE_DDS_PARTICIPANT, "Initializing DDS Participant " << id() << "."); @@ -222,6 +217,7 @@ CommonParticipant::DDSListener::DDSListener( : configuration_(conf) , discovery_database_(ddb) { + EPROSIMA_LOG_INFO(DDSPIPE_DDS_PARTICIPANT, "Creating DDS Listener for Participant " << conf->id << "."); } void CommonParticipant::DDSListener::on_participant_discovery( @@ -372,6 +368,12 @@ CommonParticipant::CommonParticipant( // Do nothing } +std::unique_ptr CommonParticipant::create_listener_() +{ + EPROSIMA_LOG_INFO(DDSPIPE_DDS_PARTICIPANT, "Creating DDS Listener from CommonParticipant."); + return std::make_unique(configuration_, discovery_database_); +} + fastdds::dds::DomainParticipantQos CommonParticipant::add_qos_properties_( fastdds::dds::DomainParticipantQos& qos) const { @@ -414,6 +416,10 @@ fastdds::dds::DomainParticipant* CommonParticipant::create_dds_participant_() // Create the participant listener dds_participant_listener_ = create_listener_(); + if (!dds_participant_listener_) + { + EPROSIMA_LOG_WARNING(DDSPIPE_DDS_PARTICIPANT, "Error creating DDS Participant Listener."); + } return eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->create_participant( configuration_->domain, diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp index ee417490e..c84ac9ee2 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp @@ -50,6 +50,8 @@ DynTypesParticipant::DynTypesParticipant( participant_configuration, payload_pool, discovery_database) + , type_object_reader_(std::make_shared( + this->id())) { // Do nothing } @@ -78,11 +80,11 @@ std::shared_ptr DynTypesParticipant::create_reader( DynTypesParticipant::DynTypesRtpsListener::DynTypesRtpsListener( std::shared_ptr conf, std::shared_ptr ddb, - core::types::ParticipantId id) + std::shared_ptr internal_reader) : rtps::CommonParticipant::RtpsListener( conf, ddb) - , type_object_reader_(std::make_shared(id)) + , type_object_reader_(internal_reader) { } @@ -178,7 +180,7 @@ void DynTypesParticipant::DynTypesRtpsListener::notify_type_discovered_( std::unique_ptr DynTypesParticipant::create_listener_() { - return std::make_unique(configuration_, discovery_database_, id()); + return std::make_unique(configuration_, discovery_database_, type_object_reader_); } } /* namespace participants */ diff --git a/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp index d09142f11..3799bebfb 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp @@ -85,6 +85,7 @@ CommonParticipant::RtpsListener::RtpsListener( : configuration_(conf) , discovery_database_(ddb) { + EPROSIMA_LOG_INFO(DDSPIPE_RTPS_PARTICIPANT, "Creating RTPS Listener for Participant " << conf->id << "."); } void CommonParticipant::RtpsListener::on_participant_discovery( @@ -336,6 +337,10 @@ void CommonParticipant::create_participant_( // Create the RTPS Participant Listener rtps_participant_listener_ = create_listener_(); + if (!rtps_participant_listener_) + { + EPROSIMA_LOG_WARNING(DDSPIPE_RTPS_PARTICIPANT, "Error creating RTPS Participant Listener."); + } // Listener must be set in creation as no callbacks should be missed // It is safe to do so here as object is already created and callbacks do not require anything set in this method @@ -514,6 +519,7 @@ CommonParticipant::reckon_participant_attributes_() const std::unique_ptr CommonParticipant::create_listener_() { + EPROSIMA_LOG_INFO(DDSPIPE_RTPS_PARTICIPANT, "Creating RTPS Listener from CommonParticipant."); return std::make_unique(configuration_, discovery_database_); } diff --git a/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp b/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp index bed8d9e0a..c520fba08 100644 --- a/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp +++ b/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp @@ -93,6 +93,7 @@ void CommonReader::init() if (fastdds::dds::RETCODE_OK != reader_->enable()) { dds_subscriber_->delete_datareader(reader_); + dds_subscriber_ = nullptr; throw utils::InitializationException( utils::Formatter() << "Error enabling DataReader for Participant " << participant_id_ << " in topic " << topic_ << "."); From 514a700975663fe5399a8eef145f9652d76ef0a5 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Fri, 11 Jul 2025 10:02:16 +0200 Subject: [PATCH 8/9] Refs #21670: Avoid using capital letters in DDS Listener Signed-off-by: cferreiragonz --- .../participant/dds/CommonParticipant.hpp | 4 ++-- .../src/cpp/participant/dds/CommonParticipant.cpp | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp index 8ea880960..460bb2ed2 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp @@ -112,12 +112,12 @@ class CommonParticipant : public core::IParticipant // LISTENER METHODS ///////////////////////// - class DDSListener : public fastdds::dds::DomainParticipantListener + class DdsListener : public fastdds::dds::DomainParticipantListener { public: DDSPIPE_PARTICIPANTS_DllAPI - explicit DDSListener( + explicit DdsListener( std::shared_ptr conf, std::shared_ptr ddb); diff --git a/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp index b69308a84..401c92e05 100644 --- a/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp @@ -211,7 +211,7 @@ std::shared_ptr CommonParticipant::create_reader( } } -CommonParticipant::DDSListener::DDSListener( +CommonParticipant::DdsListener::DdsListener( std::shared_ptr conf, std::shared_ptr ddb) : configuration_(conf) @@ -220,7 +220,7 @@ CommonParticipant::DDSListener::DDSListener( EPROSIMA_LOG_INFO(DDSPIPE_DDS_PARTICIPANT, "Creating DDS Listener for Participant " << conf->id << "."); } -void CommonParticipant::DDSListener::on_participant_discovery( +void CommonParticipant::DdsListener::on_participant_discovery( fastdds::dds::DomainParticipant* participant, fastdds::rtps::ParticipantDiscoveryStatus reason, const fastdds::rtps::ParticipantBuiltinTopicData& info, @@ -257,7 +257,7 @@ void CommonParticipant::DDSListener::on_participant_discovery( } } -void CommonParticipant::DDSListener::on_data_reader_discovery( +void CommonParticipant::DdsListener::on_data_reader_discovery( fastdds::dds::DomainParticipant* participant, fastdds::rtps::ReaderDiscoveryStatus reason, const fastdds::dds::SubscriptionBuiltinTopicData& info, @@ -307,7 +307,7 @@ void CommonParticipant::DDSListener::on_data_reader_discovery( } } -void CommonParticipant::DDSListener::on_data_writer_discovery( +void CommonParticipant::DdsListener::on_data_writer_discovery( fastdds::dds::DomainParticipant* participant, fastdds::rtps::WriterDiscoveryStatus reason, const fastdds::dds::PublicationBuiltinTopicData& info, @@ -371,7 +371,7 @@ CommonParticipant::CommonParticipant( std::unique_ptr CommonParticipant::create_listener_() { EPROSIMA_LOG_INFO(DDSPIPE_DDS_PARTICIPANT, "Creating DDS Listener from CommonParticipant."); - return std::make_unique(configuration_, discovery_database_); + return std::make_unique(configuration_, discovery_database_); } fastdds::dds::DomainParticipantQos CommonParticipant::add_qos_properties_( From 2277ab279ac6296ed42144e6a97a4f541d1bebc0 Mon Sep 17 00:00:00 2001 From: cferreiragonz Date: Fri, 11 Jul 2025 13:31:57 +0200 Subject: [PATCH 9/9] Refs #21670: Fix leak Signed-off-by: cferreiragonz --- ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp b/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp index c520fba08..c74dbae15 100644 --- a/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp +++ b/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp @@ -93,7 +93,7 @@ void CommonReader::init() if (fastdds::dds::RETCODE_OK != reader_->enable()) { dds_subscriber_->delete_datareader(reader_); - dds_subscriber_ = nullptr; + reader_ = nullptr; throw utils::InitializationException( utils::Formatter() << "Error enabling DataReader for Participant " << participant_id_ << " in topic " << topic_ << ".");