diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp index 669cd04f4..460bb2ed2 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/dds/CommonParticipant.hpp @@ -60,7 +60,7 @@ namespace dds { * @warning This Participant class does not support RPC so far. * @todo TODO */ -class CommonParticipant : public core::IParticipant, public fastdds::dds::DomainParticipantListener +class CommonParticipant : public core::IParticipant { public: @@ -112,23 +112,62 @@ class CommonParticipant : public core::IParticipant, public fastdds::dds::Domain // LISTENER METHODS ///////////////////////// - void on_participant_discovery( - fastdds::dds::DomainParticipant* participant, - fastdds::rtps::ParticipantDiscoveryStatus reason, - const fastdds::rtps::ParticipantBuiltinTopicData& info, - bool& /*should_be_ignored*/) override; - - void on_data_reader_discovery( - fastdds::dds::DomainParticipant* participant, - fastdds::rtps::ReaderDiscoveryStatus reason, - const fastdds::dds::SubscriptionBuiltinTopicData& info, - bool& /*should_be_ignored*/) override; - - void on_data_writer_discovery( - fastdds::dds::DomainParticipant* participant, - fastdds::rtps::WriterDiscoveryStatus reason, - const fastdds::dds::PublicationBuiltinTopicData& info, - bool& /*should_be_ignored*/) override; + class DdsListener : public fastdds::dds::DomainParticipantListener + { + public: + + DDSPIPE_PARTICIPANTS_DllAPI + explicit DdsListener( + std::shared_ptr conf, + std::shared_ptr ddb); + + /** + * @brief Override method from \c DomainParticipantListener + * + * This method is only used for debugging purposes. + */ + DDSPIPE_PARTICIPANTS_DllAPI + void on_participant_discovery( + fastdds::dds::DomainParticipant* participant, + fastdds::rtps::ParticipantDiscoveryStatus reason, + const fastdds::rtps::ParticipantBuiltinTopicData& info, + bool& /*should_be_ignored*/) override; + + /** + * @brief Override method from \c DomainParticipantListener . + * + * This method adds to the database the discovered or modified endpoint. + */ + DDSPIPE_PARTICIPANTS_DllAPI + void on_data_reader_discovery( + fastdds::dds::DomainParticipant* participant, + fastdds::rtps::ReaderDiscoveryStatus reason, + const fastdds::dds::SubscriptionBuiltinTopicData& info, + bool& /*should_be_ignored*/) override; + + /** + * @brief Override method from \c DomainParticipantListener . + * + * This method adds to the database the discovered or modified endpoint. + */ + DDSPIPE_PARTICIPANTS_DllAPI + void on_data_writer_discovery( + fastdds::dds::DomainParticipant* participant, + fastdds::rtps::WriterDiscoveryStatus reason, + const fastdds::dds::PublicationBuiltinTopicData& info, + bool& /*should_be_ignored*/) override; + + protected: + + //! Shared pointer to the configuration of the participant + const std::shared_ptr configuration_; + //! Shared pointer to the discovery database + const std::shared_ptr discovery_database_; + + }; + + //! Unique pointer to the internal DDS Participant Listener + std::unique_ptr dds_participant_listener_; protected: @@ -142,19 +181,33 @@ class CommonParticipant : public core::IParticipant, public fastdds::dds::Domain const std::shared_ptr& payload_pool, const std::shared_ptr& discovery_database); + ///////////////////////// + // VIRTUAL METHODS + ///////////////////////// + + /** + * @brief Virtual method that creates a listener for the internal DDS Participant. + * It should be overridden if a different listener is needed. + */ + DDSPIPE_PARTICIPANTS_DllAPI + virtual std::unique_ptr create_listener_(); + ///////////////////////// // INTERNAL VIRTUAL METHODS ///////////////////////// + DDSPIPE_PARTICIPANTS_DllAPI virtual fastdds::dds::DomainParticipantQos add_qos_properties_( fastdds::dds::DomainParticipantQos& qos) const; + DDSPIPE_PARTICIPANTS_DllAPI virtual fastdds::dds::DomainParticipantQos reckon_participant_qos_() const; + DDSPIPE_PARTICIPANTS_DllAPI virtual fastdds::dds::DomainParticipant* create_dds_participant_(); diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp index 0ddfda678..6bc62e0eb 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesParticipant.hpp @@ -74,31 +74,57 @@ class DynTypesParticipant : public rtps::SimpleParticipant std::shared_ptr create_reader( const core::ITopic& topic) override; - DDSPIPE_PARTICIPANTS_DllAPI - void on_reader_discovery( - fastdds::rtps::RTPSParticipant* participant, - fastdds::rtps::ReaderDiscoveryStatus reason, - const fastdds::rtps::SubscriptionBuiltinTopicData& info, - bool& should_be_ignored) override; - - DDSPIPE_PARTICIPANTS_DllAPI - void on_writer_discovery( - fastdds::rtps::RTPSParticipant* participant, - fastdds::rtps::WriterDiscoveryStatus reason, - const fastdds::rtps::PublicationBuiltinTopicData& info, - bool& should_be_ignored) override; + class DynTypesRtpsListener : public rtps::CommonParticipant::RtpsListener + { + public: + + DDSPIPE_PARTICIPANTS_DllAPI + explicit DynTypesRtpsListener( + std::shared_ptr conf, + std::shared_ptr ddb, + std::shared_ptr internal_reader); + + DDSPIPE_PARTICIPANTS_DllAPI + void on_reader_discovery( + fastdds::rtps::RTPSParticipant* participant, + fastdds::rtps::ReaderDiscoveryStatus reason, + const fastdds::rtps::SubscriptionBuiltinTopicData& info, + bool& should_be_ignored) override; + + DDSPIPE_PARTICIPANTS_DllAPI + void on_writer_discovery( + fastdds::rtps::RTPSParticipant* participant, + fastdds::rtps::WriterDiscoveryStatus reason, + const fastdds::rtps::PublicationBuiltinTopicData& info, + bool& should_be_ignored) override; + + //! Type Object Reader getter + inline std::shared_ptr type_object_reader() const + { + return type_object_reader_; + } + + protected: + + //! Copy of Type Object Internal Reader + std::shared_ptr type_object_reader_; + //! Received types set + std::set received_types_; + + void notify_type_discovered_( + const fastdds::dds::xtypes::TypeInformation& type_info, + const std::string& type_name); + + }; protected: - void notify_type_discovered_( - const fastdds::dds::xtypes::TypeInformation& type_info, - const std::string& type_name); + //! Override method from \c CommonParticipant to create the internal RTPS participant listener + std::unique_ptr create_listener_() override; //! Type Object Internal Reader std::shared_ptr type_object_reader_; - //! Received types set - std::set received_types_; }; } /* namespace participants */ diff --git a/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp index d77619605..117464db3 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp @@ -52,7 +52,6 @@ namespace rtps { */ class CommonParticipant : public core::IParticipant - , public fastdds::rtps::RTPSParticipantListener { public: @@ -123,41 +122,62 @@ class CommonParticipant // RTPS LISTENER METHODS ///////////////////////// - /** - * @brief Override method from \c RTPSParticipantListener . - * - * This method only is for debugging purposes. - */ - DDSPIPE_PARTICIPANTS_DllAPI - virtual void on_participant_discovery( - fastdds::rtps::RTPSParticipant* participant, - fastdds::rtps::ParticipantDiscoveryStatus reason, - const fastdds::rtps::ParticipantBuiltinTopicData& info, - bool& /*should_be_ignored*/) override; - - /** - * @brief Override method from \c RTPSParticipantListener . - * - * This method adds to database the endpoint discovered or modified. - */ - DDSPIPE_PARTICIPANTS_DllAPI - virtual void on_reader_discovery( - fastdds::rtps::RTPSParticipant* participant, - fastdds::rtps::ReaderDiscoveryStatus reason, - const fastdds::rtps::SubscriptionBuiltinTopicData& info, - bool& /*should_be_ignored*/) override; - - /** - * @brief Override method from \c RTPSParticipantListener . - * - * This method adds to database the endpoint discovered or modified. - */ - DDSPIPE_PARTICIPANTS_DllAPI - virtual void on_writer_discovery( - fastdds::rtps::RTPSParticipant* participant, - fastdds::rtps::WriterDiscoveryStatus reason, - const fastdds::rtps::PublicationBuiltinTopicData& info, - bool& /*should_be_ignored*/) override; + class RtpsListener : public fastdds::rtps::RTPSParticipantListener + { + public: + + DDSPIPE_PARTICIPANTS_DllAPI + explicit RtpsListener( + std::shared_ptr conf, + std::shared_ptr ddb); + + /** + * @brief Override method from \c RTPSParticipantListener . + * + * This method is only used for debugging purposes. + */ + DDSPIPE_PARTICIPANTS_DllAPI + virtual void on_participant_discovery( + fastdds::rtps::RTPSParticipant* participant, + fastdds::rtps::ParticipantDiscoveryStatus reason, + const fastdds::rtps::ParticipantBuiltinTopicData& info, + bool& /*should_be_ignored*/) override; + + /** + * @brief Override method from \c RTPSParticipantListener . + * + * This method adds to database the endpoint discovered or modified. + */ + DDSPIPE_PARTICIPANTS_DllAPI + virtual void on_reader_discovery( + fastdds::rtps::RTPSParticipant* participant, + fastdds::rtps::ReaderDiscoveryStatus reason, + const fastdds::rtps::SubscriptionBuiltinTopicData& info, + bool& /*should_be_ignored*/) override; + + /** + * @brief Override method from \c RTPSParticipantListener . + * + * This method adds to database the endpoint discovered or modified. + */ + DDSPIPE_PARTICIPANTS_DllAPI + virtual void on_writer_discovery( + fastdds::rtps::RTPSParticipant* participant, + fastdds::rtps::WriterDiscoveryStatus reason, + const fastdds::rtps::PublicationBuiltinTopicData& info, + bool& /*should_be_ignored*/) override; + + protected: + + //! Shared pointer to the configuration of the participant + const std::shared_ptr configuration_; + //! Shared pointer to the discovery database + const std::shared_ptr discovery_database_; + + }; + + //! Unique pointer to the internal RTPS Participant Listener + std::unique_ptr rtps_participant_listener_; ////////////////// // STATIC METHODS @@ -221,6 +241,15 @@ class CommonParticipant DDSPIPE_PARTICIPANTS_DllAPI virtual fastdds::rtps::RTPSParticipantAttributes reckon_participant_attributes_() const; + /** + * @brief Virtual method that creates a listener for the internal RTPS Participant. + * It should be overridden if a different listener is needed. + * This method must be called after the RTPS Participant is created, otherwise no listener will be set. + * @return A unique pointer to an RTPS Participant Listener. + */ + DDSPIPE_PARTICIPANTS_DllAPI + virtual std::unique_ptr create_listener_(); + ///// // VARIABLES diff --git a/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp index ff4638575..401c92e05 100644 --- a/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dds/CommonParticipant.cpp @@ -48,11 +48,7 @@ CommonParticipant::~CommonParticipant() if (dds_participant_) { dds_participant_->set_listener(nullptr); - - for (auto& topic : dds_topics_) - { - dds_participant_->delete_topic(topic.second); - } + dds_participant_->delete_contained_entities(); eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->delete_participant(dds_participant_); } @@ -215,7 +211,16 @@ std::shared_ptr CommonParticipant::create_reader( } } -void CommonParticipant::on_participant_discovery( +CommonParticipant::DdsListener::DdsListener( + std::shared_ptr conf, + std::shared_ptr ddb) + : configuration_(conf) + , discovery_database_(ddb) +{ + EPROSIMA_LOG_INFO(DDSPIPE_DDS_PARTICIPANT, "Creating DDS Listener for Participant " << conf->id << "."); +} + +void CommonParticipant::DdsListener::on_participant_discovery( fastdds::dds::DomainParticipant* participant, fastdds::rtps::ParticipantDiscoveryStatus reason, const fastdds::rtps::ParticipantBuiltinTopicData& info, @@ -252,14 +257,14 @@ void CommonParticipant::on_participant_discovery( } } -void CommonParticipant::on_data_reader_discovery( - fastdds::dds::DomainParticipant*, +void CommonParticipant::DdsListener::on_data_reader_discovery( + fastdds::dds::DomainParticipant* participant, fastdds::rtps::ReaderDiscoveryStatus reason, const fastdds::dds::SubscriptionBuiltinTopicData& info, bool& /*should_be_ignored*/) { // If reader is from other participant, store it in discovery database - if (detail::come_from_same_participant_(info.guid, this->dds_participant_->guid())) + if (detail::come_from_same_participant_(info.guid, participant->guid())) { // Come from this participant, do nothing return; @@ -267,7 +272,7 @@ void CommonParticipant::on_data_reader_discovery( // Calculate endpoint info core::types::Endpoint info_reader = - detail::create_endpoint_from_info_(info, id()); + detail::create_endpoint_from_info_(info, configuration_->id); // If new endpoint discovered if (reason == fastdds::rtps::ReaderDiscoveryStatus::DISCOVERED_READER) @@ -276,14 +281,14 @@ void CommonParticipant::on_data_reader_discovery( "Found in Participant " << configuration_->id << " new Reader " << info.guid << "."); // TODO check logic because if an endpoint is lost by liveliness it may be inserted again when already in database - this->discovery_database_->add_endpoint(info_reader); + discovery_database_->add_endpoint(info_reader); } else if (reason == fastdds::rtps::ReaderDiscoveryStatus::CHANGED_QOS_READER) { EPROSIMA_LOG_INFO(DDSPIPE_DISCOVERY, configuration_->id << " participant : " << "Reader " << info.guid << " changed TopicQoS."); - this->discovery_database_->update_endpoint(info_reader); + discovery_database_->update_endpoint(info_reader); } else if (reason == fastdds::rtps::ReaderDiscoveryStatus::REMOVED_READER) { @@ -291,7 +296,7 @@ void CommonParticipant::on_data_reader_discovery( configuration_->id << " participant : " << "Reader " << info.guid << " removed."); info_reader.active = false; - this->discovery_database_->update_endpoint(info_reader); + discovery_database_->update_endpoint(info_reader); } else if (reason == fastdds::rtps::ReaderDiscoveryStatus::IGNORED_READER) { @@ -302,14 +307,14 @@ void CommonParticipant::on_data_reader_discovery( } } -void CommonParticipant::on_data_writer_discovery( - fastdds::dds::DomainParticipant*, +void CommonParticipant::DdsListener::on_data_writer_discovery( + fastdds::dds::DomainParticipant* participant, fastdds::rtps::WriterDiscoveryStatus reason, const fastdds::dds::PublicationBuiltinTopicData& info, bool& /*should_be_ignored*/) { // If writer is from other participant, store it in discovery database - if (detail::come_from_same_participant_(info.guid, this->dds_participant_->guid())) + if (detail::come_from_same_participant_(info.guid, participant->guid())) { // Come from this participant, do nothing return; @@ -317,7 +322,7 @@ void CommonParticipant::on_data_writer_discovery( // Calculate endpoint info core::types::Endpoint info_writer = - detail::create_endpoint_from_info_(info, id()); + detail::create_endpoint_from_info_(info, configuration_->id); // If new endpoint discovered if (reason == fastdds::rtps::WriterDiscoveryStatus::DISCOVERED_WRITER) @@ -326,14 +331,14 @@ void CommonParticipant::on_data_writer_discovery( "Found in Participant " << configuration_->id << " new Writer " << info.guid << "."); // TODO check logic because if an endpoint is lost by liveliness it may be inserted again when already in database - this->discovery_database_->add_endpoint(info_writer); + discovery_database_->add_endpoint(info_writer); } else if (reason == fastdds::rtps::WriterDiscoveryStatus::CHANGED_QOS_WRITER) { EPROSIMA_LOG_INFO(DDSPIPE_DISCOVERY, configuration_->id << " participant : " << "Writer " << info.guid << " changed TopicQoS."); - this->discovery_database_->update_endpoint(info_writer); + discovery_database_->update_endpoint(info_writer); } else if (reason == fastdds::rtps::WriterDiscoveryStatus::REMOVED_WRITER) { @@ -341,7 +346,7 @@ void CommonParticipant::on_data_writer_discovery( configuration_->id << " participant : " << "Writer " << info.guid << " removed."); info_writer.active = false; - this->discovery_database_->update_endpoint(info_writer); + discovery_database_->update_endpoint(info_writer); } else if (reason == fastdds::rtps::WriterDiscoveryStatus::IGNORED_WRITER) { @@ -363,6 +368,12 @@ CommonParticipant::CommonParticipant( // Do nothing } +std::unique_ptr CommonParticipant::create_listener_() +{ + EPROSIMA_LOG_INFO(DDSPIPE_DDS_PARTICIPANT, "Creating DDS Listener from CommonParticipant."); + return std::make_unique(configuration_, discovery_database_); +} + fastdds::dds::DomainParticipantQos CommonParticipant::add_qos_properties_( fastdds::dds::DomainParticipantQos& qos) const { @@ -403,10 +414,17 @@ fastdds::dds::DomainParticipant* CommonParticipant::create_dds_participant_() mask << fastdds::dds::StatusMask::publication_matched(); mask << fastdds::dds::StatusMask::subscription_matched(); + // Create the participant listener + dds_participant_listener_ = create_listener_(); + if (!dds_participant_listener_) + { + EPROSIMA_LOG_WARNING(DDSPIPE_DDS_PARTICIPANT, "Error creating DDS Participant Listener."); + } + return eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->create_participant( configuration_->domain, reckon_participant_qos_(), - this, + dds_participant_listener_.get(), mask); } diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp index f2d60025e..c84ac9ee2 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp @@ -69,14 +69,26 @@ std::shared_ptr DynTypesParticipant::create_reader( // If type object topic, return the internal reader for type objects if (core::types::is_type_object_topic(topic)) { - return this->type_object_reader_; + return static_cast(rtps_participant_listener_.get())-> + type_object_reader(); } // If not type object, use the parent method return rtps::SimpleParticipant::create_reader(topic); } -void DynTypesParticipant::on_reader_discovery( +DynTypesParticipant::DynTypesRtpsListener::DynTypesRtpsListener( + std::shared_ptr conf, + std::shared_ptr ddb, + std::shared_ptr internal_reader) + : rtps::CommonParticipant::RtpsListener( + conf, + ddb) + , type_object_reader_(internal_reader) +{ +} + +void DynTypesParticipant::DynTypesRtpsListener::on_reader_discovery( fastdds::rtps::RTPSParticipant* participant, fastdds::rtps::ReaderDiscoveryStatus reason, const fastdds::rtps::SubscriptionBuiltinTopicData& info, @@ -88,13 +100,13 @@ void DynTypesParticipant::on_reader_discovery( const auto type_info = info.type_information.type_information; const auto type_name = info.type_name.to_string(); - rtps::CommonParticipant::on_reader_discovery(participant, reason, info, should_be_ignored); + rtps::CommonParticipant::RtpsListener::on_reader_discovery(participant, reason, info, should_be_ignored); notify_type_discovered_(type_info, type_name); } } -void DynTypesParticipant::on_writer_discovery( +void DynTypesParticipant::DynTypesRtpsListener::on_writer_discovery( fastdds::rtps::RTPSParticipant* participant, fastdds::rtps::WriterDiscoveryStatus reason, const fastdds::rtps::PublicationBuiltinTopicData& info, @@ -106,13 +118,13 @@ void DynTypesParticipant::on_writer_discovery( const auto type_info = info.type_information.type_information; const auto type_name = info.type_name.to_string(); - rtps::CommonParticipant::on_writer_discovery(participant, reason, info, should_be_ignored); + rtps::CommonParticipant::RtpsListener::on_writer_discovery(participant, reason, info, should_be_ignored); notify_type_discovered_(type_info, type_name); } } -void DynTypesParticipant::notify_type_discovered_( +void DynTypesParticipant::DynTypesRtpsListener::notify_type_discovered_( const fastdds::dds::xtypes::TypeInformation& type_info, const std::string& type_name) { @@ -150,7 +162,7 @@ void DynTypesParticipant::notify_type_discovered_( // Notify type_identifier // NOTE: We assume each type_name corresponds to only one type_identifier EPROSIMA_LOG_INFO(DDSPIPE_DYNTYPES_PARTICIPANT, - "Participant " << this->id() << " discovered type object " << dyn_type->get_name()); + "Participant " << configuration_->id << " discovered type object " << dyn_type->get_name()); monitor_type_discovered(type_name); @@ -166,6 +178,11 @@ void DynTypesParticipant::notify_type_discovered_( type_object_reader_->simulate_data_reception(std::move(data)); } +std::unique_ptr DynTypesParticipant::create_listener_() +{ + return std::make_unique(configuration_, discovery_database_, type_object_reader_); +} + } /* namespace participants */ } /* namespace ddspipe */ } /* namespace eprosima */ diff --git a/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp index 0414b816b..3799bebfb 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp @@ -79,7 +79,16 @@ void CommonParticipant::init() participant_attributes_); } -void CommonParticipant::on_participant_discovery( +CommonParticipant::RtpsListener::RtpsListener( + std::shared_ptr conf, + std::shared_ptr ddb) + : configuration_(conf) + , discovery_database_(ddb) +{ + EPROSIMA_LOG_INFO(DDSPIPE_RTPS_PARTICIPANT, "Creating RTPS Listener for Participant " << conf->id << "."); +} + +void CommonParticipant::RtpsListener::on_participant_discovery( fastdds::rtps::RTPSParticipant* participant, fastdds::rtps::ParticipantDiscoveryStatus reason, const fastdds::rtps::ParticipantBuiltinTopicData& info, @@ -116,7 +125,7 @@ void CommonParticipant::on_participant_discovery( } } -void CommonParticipant::on_reader_discovery( +void CommonParticipant::RtpsListener::on_reader_discovery( fastdds::rtps::RTPSParticipant* participant, fastdds::rtps::ReaderDiscoveryStatus reason, const fastdds::rtps::SubscriptionBuiltinTopicData& info, @@ -126,14 +135,14 @@ void CommonParticipant::on_reader_discovery( { core::types::Endpoint info_reader = detail::create_endpoint_from_info_( - info, this->id()); + info, configuration_->id); if (reason == fastdds::rtps::ReaderDiscoveryStatus::DISCOVERED_READER) { EPROSIMA_LOG_INFO(DDSPIPE_DISCOVERY, "Found in Participant " << configuration_->id << " new Reader " << info.guid << "."); - this->discovery_database_->add_endpoint(info_reader); + discovery_database_->add_endpoint(info_reader); } else if (reason == fastdds::rtps::ReaderDiscoveryStatus::CHANGED_QOS_READER) { @@ -141,7 +150,7 @@ void CommonParticipant::on_reader_discovery( configuration_->id << " participant : " << "Reader " << info.guid << " changed TopicQoS."); - this->discovery_database_->update_endpoint(info_reader); + discovery_database_->update_endpoint(info_reader); } else if (reason == fastdds::rtps::ReaderDiscoveryStatus::REMOVED_READER) { @@ -149,7 +158,7 @@ void CommonParticipant::on_reader_discovery( configuration_->id << " participant : " << "Reader " << info.guid << " removed."); info_reader.active = false; - this->discovery_database_->update_endpoint(info_reader); + discovery_database_->update_endpoint(info_reader); } else if (reason == fastdds::rtps::ReaderDiscoveryStatus::IGNORED_READER) { @@ -161,7 +170,7 @@ void CommonParticipant::on_reader_discovery( } } -void CommonParticipant::on_writer_discovery( +void CommonParticipant::RtpsListener::on_writer_discovery( fastdds::rtps::RTPSParticipant* participant, fastdds::rtps::WriterDiscoveryStatus reason, const fastdds::rtps::PublicationBuiltinTopicData& info, @@ -171,14 +180,14 @@ void CommonParticipant::on_writer_discovery( { core::types::Endpoint info_writer = detail::create_endpoint_from_info_( - info, this->id()); + info, configuration_->id); if (reason == fastdds::rtps::WriterDiscoveryStatus::DISCOVERED_WRITER) { EPROSIMA_LOG_INFO(DDSPIPE_DISCOVERY, "Found in Participant " << configuration_->id << " new Writer " << info.guid << "."); - this->discovery_database_->add_endpoint(info_writer); + discovery_database_->add_endpoint(info_writer); } else if (reason == fastdds::rtps::WriterDiscoveryStatus::CHANGED_QOS_WRITER) { @@ -186,7 +195,7 @@ void CommonParticipant::on_writer_discovery( configuration_->id << " participant : " << "Writer " << info.guid << " changed TopicQoS."); - this->discovery_database_->update_endpoint(info_writer); + discovery_database_->update_endpoint(info_writer); } else if (reason == fastdds::rtps::WriterDiscoveryStatus::REMOVED_WRITER) { @@ -194,7 +203,7 @@ void CommonParticipant::on_writer_discovery( configuration_->id << " participant : " << "Writer " << info.guid << " removed."); info_writer.active = false; - this->discovery_database_->update_endpoint(info_writer); + discovery_database_->update_endpoint(info_writer); } else if (reason == fastdds::rtps::WriterDiscoveryStatus::IGNORED_WRITER) { @@ -326,12 +335,19 @@ void CommonParticipant::create_participant_( EPROSIMA_LOG_INFO(DDSPIPE_RTPS_PARTICIPANT, "Creating Participant in domain " << domain); + // Create the RTPS Participant Listener + rtps_participant_listener_ = create_listener_(); + if (!rtps_participant_listener_) + { + EPROSIMA_LOG_WARNING(DDSPIPE_RTPS_PARTICIPANT, "Error creating RTPS Participant Listener."); + } + // Listener must be set in creation as no callbacks should be missed // It is safe to do so here as object is already created and callbacks do not require anything set in this method rtps_participant_ = fastdds::rtps::RTPSDomain::createParticipant( domain, participant_attributes, - this); + rtps_participant_listener_.get()); if (!rtps_participant_) { @@ -500,6 +516,13 @@ CommonParticipant::reckon_participant_attributes_() const return params; } +std::unique_ptr +CommonParticipant::create_listener_() +{ + EPROSIMA_LOG_INFO(DDSPIPE_RTPS_PARTICIPANT, "Creating RTPS Listener from CommonParticipant."); + return std::make_unique(configuration_, discovery_database_); +} + } /* namespace rtps */ } /* namespace participants */ } /* namespace ddspipe */ diff --git a/ddspipe_participants/src/cpp/reader/auxiliar/BaseReader.cpp b/ddspipe_participants/src/cpp/reader/auxiliar/BaseReader.cpp index 87449a8fc..3b2905c43 100644 --- a/ddspipe_participants/src/cpp/reader/auxiliar/BaseReader.cpp +++ b/ddspipe_participants/src/cpp/reader/auxiliar/BaseReader.cpp @@ -165,7 +165,7 @@ bool BaseReader::should_accept_sample_() noexcept void BaseReader::on_data_available_() const noexcept { - if (on_data_available_lambda_set_) + if (on_data_available_lambda_set_ && enabled_.load()) { on_data_available_lambda_(); } diff --git a/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp b/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp index da1dd7edf..c74dbae15 100644 --- a/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp +++ b/ddspipe_participants/src/cpp/reader/dds/CommonReader.cpp @@ -75,7 +75,7 @@ void CommonReader::init() reader_ = dds_subscriber_->create_datareader( dds_topic_, reckon_reader_qos_(), - nullptr, + this, eprosima::fastdds::dds::StatusMask::all(), payload_pool_); @@ -86,9 +86,18 @@ void CommonReader::init() participant_id_ << " in topic " << topic_ << "."); } - // Set listener after entity creation to avoid SEGFAULT (produced when callback using reader_ is - // invoked before the variable is fully set) - reader_->set_listener(this); + // Subscriber is created with autoenable set to false, so we need to enable the reader manually. + // This is done just to ensure that the reader is not registered before any other method modifying the reader pointer + // is called, opening a window for potential data races. Although Fast DDS ensures that this cannot happen, this + // procedure protects against future bad practices introducing the aforementioned data races. + if (fastdds::dds::RETCODE_OK != reader_->enable()) + { + dds_subscriber_->delete_datareader(reader_); + reader_ = nullptr; + throw utils::InitializationException( + utils::Formatter() << "Error enabling DataReader for Participant " << + participant_id_ << " in topic " << topic_ << "."); + } } @@ -191,7 +200,8 @@ utils::ReturnCode CommonReader::take_nts_( // There has been an error taking the data. Exit. return ret; } - } while (!should_accept_sample_(info)); + } + while (!should_accept_sample_(info)); EPROSIMA_LOG_INFO(DDSPIPE_DDS_READER, "Data taken in " << participant_id_ << " for topic " << topic_ << "."); @@ -224,6 +234,7 @@ fastdds::dds::SubscriberQos CommonReader::reckon_subscriber_qos_() const { qos.partition().push_back("*"); } + qos.entity_factory().autoenable_created_entities = false; return qos; } diff --git a/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp b/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp index befc5e950..3587d930c 100644 --- a/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp +++ b/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp @@ -107,6 +107,7 @@ void CommonReader::internal_entities_creation_( // Create CommonReader // Listener must be set in creation as no callbacks should be missed // It is safe to do so here as object is already created and callbacks do not require anything set in this method + // Also, no data races can ocurr as no callback will be called until this reader is registered rtps_reader_ = fastdds::rtps::RTPSDomain::createRTPSReader( rtps_participant_, non_const_reader_attributes, @@ -121,10 +122,6 @@ void CommonReader::internal_entities_creation_( participant_id_ << " in topic " << topic_ << "."); } - // Set listener after entity creation to avoid SEGFAULT (produced when callback using rtps_reader_ is - // invoked before the variable is fully set) - rtps_reader_->set_listener(this); - // Register reader with topic if (!rtps_participant_->register_reader(rtps_reader_, topic_description, reader_qos)) {