From dfca3da7d357f28fb7497be5668c93fca570d724 Mon Sep 17 00:00:00 2001 From: Raul Sanchez-Mateos Date: Mon, 17 Apr 2023 12:24:06 +0200 Subject: [PATCH 01/15] Add whitelist interfaces to DDS Pipe Signed-off-by: Raul Sanchez-Mateos --- .../SimpleParticipantConfiguration.hpp | 6 ++- .../participant/rtps/SimpleParticipant.hpp | 6 +++ .../SimpleParticipantConfiguration.cpp | 11 +++++ .../dynamic_types/DynTypesParticipant.cpp | 43 ++++++++++++++++- .../participant/rtps/SimpleParticipant.cpp | 48 ++++++++++++++++++- .../ddspipe_yaml/yaml_configuration_tags.hpp | 1 + .../src/cpp/YamlReader_participants.cpp | 6 +++ 7 files changed, 118 insertions(+), 3 deletions(-) diff --git a/ddspipe_participants/include/ddspipe_participants/configuration/SimpleParticipantConfiguration.hpp b/ddspipe_participants/include/ddspipe_participants/configuration/SimpleParticipantConfiguration.hpp index af259e50c..957048d25 100644 --- a/ddspipe_participants/include/ddspipe_participants/configuration/SimpleParticipantConfiguration.hpp +++ b/ddspipe_participants/include/ddspipe_participants/configuration/SimpleParticipantConfiguration.hpp @@ -16,6 +16,7 @@ #include #include +#include #include namespace eprosima { @@ -47,7 +48,10 @@ struct SimpleParticipantConfiguration : public ParticipantConfiguration // VARIABLES ///////////////////////// - core::types::DomainId domain {0u}; + eprosima::ddspipe::core::types::DomainId domain {0u}; + + std::set whitelist {}; + }; } /* namespace participants */ diff --git a/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp index 668848a73..24f2031b7 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp @@ -46,6 +46,12 @@ class SimpleParticipant : public CommonParticipant const std::shared_ptr& participant_configuration, const std::shared_ptr& payload_pool, const std::shared_ptr& discovery_database); + +protected: + + static fastrtps::rtps::RTPSParticipantAttributes reckon_participant_attributes_( + const SimpleParticipantConfiguration* configuration); + }; } /* namespace rtps */ diff --git a/ddspipe_participants/src/cpp/configuration/SimpleParticipantConfiguration.cpp b/ddspipe_participants/src/cpp/configuration/SimpleParticipantConfiguration.cpp index 8c5f34e89..b7ac65ae2 100644 --- a/ddspipe_participants/src/cpp/configuration/SimpleParticipantConfiguration.cpp +++ b/ddspipe_participants/src/cpp/configuration/SimpleParticipantConfiguration.cpp @@ -15,6 +15,7 @@ #include #include +#include namespace eprosima { namespace ddspipe { @@ -34,6 +35,16 @@ bool SimpleParticipantConfiguration::is_valid( return false; } + // Check whitelist interfaces + for (types::IpType ip : whitelist) + { + if (!types::Address::is_ipv4_correct(ip)) + { + error_msg << "Incorrect IPv4 address " << ip << " in whitelist interfaces. "; + return false; + } + } + return true; } diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp index a1caebd1f..f0514eb9e 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp @@ -26,6 +26,8 @@ #include #include #include +#include +#include #include @@ -179,6 +181,10 @@ void DynTypesParticipant::internal_notify_type_object_( void DynTypesParticipant::initialize_internal_dds_participant_() { + + std::shared_ptr configuration = + std::dynamic_pointer_cast(this->configuration_); + eprosima::fastdds::dds::DomainParticipantQos pqos; pqos.name(this->id()); @@ -186,6 +192,41 @@ void DynTypesParticipant::initialize_internal_dds_participant_() pqos.wire_protocol().builtin.typelookup_config.use_server = false; pqos.wire_protocol().builtin.typelookup_config.use_client = true; + // Configure whitelist interfaces + if (!configuration->whitelist.empty()) + { + pqos.transport().use_builtin_transports = false; + + std::shared_ptr shm_transport = + std::make_shared(); + + std::shared_ptr udp_transport = + std::make_shared(); + + + for (const types::IpType& ip : configuration->whitelist) + { + if (types::Address::is_ipv4_correct(ip)) + { + udp_transport->interfaceWhiteList.emplace_back(ip); + logInfo(DDSPIPE_DYNTYPES_PARTICIPANT, + "Adding " << ip << " to whitelist interfaces " << + " in Participant " << configuration->id << " initialization."); + + } + else + { + // Invalid address, continue with next one + logWarning(DDSPIPE_DYNTYPES_PARTICIPANT, + "Not valid IPv4. Discarding whitelist interface " << ip << + " in Participant " << configuration->id << " initialization."); + } + } + + pqos.transport().user_transports.push_back(shm_transport); + pqos.transport().user_transports.push_back(udp_transport); + } + // Force DDS entities to be created disabled // NOTE: this is very dangerous because we are modifying a global variable (and a not thread safe one) in a // local function. @@ -200,7 +241,7 @@ void DynTypesParticipant::initialize_internal_dds_participant_() // CREATE THE PARTICIPANT dds_participant_ = eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->create_participant( - std::dynamic_pointer_cast(this->configuration_)->domain, + configuration->domain, pqos); dds_participant_->set_listener(this); diff --git a/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp index e8745d7ee..cf57374b2 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp @@ -16,6 +16,8 @@ #include #include +#include +#include #include @@ -33,10 +35,54 @@ SimpleParticipant::SimpleParticipant( payload_pool, discovery_database, participant_configuration->domain, - CommonParticipant::reckon_participant_attributes_(participant_configuration.get())) + reckon_participant_attributes_(participant_configuration.get())) { } +fastrtps::rtps::RTPSParticipantAttributes +SimpleParticipant::reckon_participant_attributes_( + const SimpleParticipantConfiguration* configuration) +{ + // Use default as base attributes + fastrtps::rtps::RTPSParticipantAttributes params = CommonParticipant::reckon_participant_attributes_(configuration); + + if (!configuration->whitelist.empty()) + { + params.useBuiltinTransports = false; + + std::shared_ptr shm_transport = + std::make_shared(); + + std::shared_ptr udp_transport = + std::make_shared(); + + + for (const types::IpType& ip : configuration->whitelist) + { + if (types::Address::is_ipv4_correct(ip)) + { + udp_transport->interfaceWhiteList.emplace_back(ip); + logInfo(DDSPIPE_SIMPLE_PARTICIPANT, + "Adding " << ip << " to whitelist interfaces " << + " in Participant " << configuration->id << " initialization."); + } + else + { + // Invalid address, continue with next one + logWarning(DDSPIPE_SIMPLE_PARTICIPANT, + "Not valid IPv4. Discarding whitelist interface " << ip << + " in Participant " << configuration->id << " initialization."); + } + } + + params.userTransports.push_back(shm_transport); + params.userTransports.push_back(udp_transport); + } + + + return params; +} + } /* namespace rtps */ } /* namespace participants */ } /* namespace ddspipe */ diff --git a/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp b/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp index e0eb9e80a..a3c77be4f 100644 --- a/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp +++ b/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp @@ -59,6 +59,7 @@ constexpr const char* ECHO_VERBOSE_TAG("verbose"); //! Echo in verbose mode // RTPS related tags // Simple RTPS related tags constexpr const char* DOMAIN_ID_TAG("domain"); //! Domain Id of the participant +constexpr const char* WHITELIST_INTERFACES_TAG("whitelist-interfaces"); //! Domain Id of the participant // Discovery Server related tags constexpr const char* DISCOVERY_SERVER_GUID_PREFIX_TAG("discovery-server-guid"); //! TODO: add comment diff --git a/ddspipe_yaml/src/cpp/YamlReader_participants.cpp b/ddspipe_yaml/src/cpp/YamlReader_participants.cpp index c9188e829..f666ec4d1 100644 --- a/ddspipe_yaml/src/cpp/YamlReader_participants.cpp +++ b/ddspipe_yaml/src/cpp/YamlReader_participants.cpp @@ -132,6 +132,12 @@ void YamlReader::fill( { object.domain = get(yml, DOMAIN_ID_TAG, version); } + + // Optional whitelist interfaces + // if (YamlReader::is_tag_present(yml, WHITELIST_INTERFACES_TAG)) + // { + // object.whitelist = YamlReader::get_set(yml, WHITELIST_INTERFACES_TAG, version); + // } } template <> From 5c68dfb1d201e91d6c18842ff2d99bf84592d719 Mon Sep 17 00:00:00 2001 From: Raul Sanchez-Mateos Date: Tue, 18 Apr 2023 14:41:59 +0200 Subject: [PATCH 02/15] Add transport and ignore participant flags configuration Signed-off-by: Raul Sanchez-Mateos --- .../ddspipe_core/types/dds/Participant.hpp | 34 +++++++ .../SimpleParticipantConfiguration.hpp | 6 +- .../participant/rtps/SimpleParticipant.hpp | 4 + .../types/address/Address.hpp | 4 +- .../dynamic_types/DynTypesParticipant.cpp | 75 ++++++++++----- .../participant/rtps/SimpleParticipant.cpp | 96 ++++++++++++++----- .../ddspipe_yaml/yaml_configuration_tags.hpp | 10 ++ .../src/cpp/YamlReader_participants.cpp | 28 +++++- ddspipe_yaml/src/cpp/YamlReader_types.cpp | 20 ++++ 9 files changed, 222 insertions(+), 55 deletions(-) create mode 100644 ddspipe_core/include/ddspipe_core/types/dds/Participant.hpp diff --git a/ddspipe_core/include/ddspipe_core/types/dds/Participant.hpp b/ddspipe_core/include/ddspipe_core/types/dds/Participant.hpp new file mode 100644 index 000000000..afcab92a1 --- /dev/null +++ b/ddspipe_core/include/ddspipe_core/types/dds/Participant.hpp @@ -0,0 +1,34 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// limitations under the License. + +#pragma once + +namespace eprosima { +namespace ddspipe { +namespace core { +namespace types { + +enum class IgnoreParticipantFlags +{ + no_filter, + filter_different_host, + filter_different_process, + filter_same_process, + filter_different_and_same_process +}; + +} /* namespace types */ +} /* namespace core */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/include/ddspipe_participants/configuration/SimpleParticipantConfiguration.hpp b/ddspipe_participants/include/ddspipe_participants/configuration/SimpleParticipantConfiguration.hpp index 957048d25..ca72c867f 100644 --- a/ddspipe_participants/include/ddspipe_participants/configuration/SimpleParticipantConfiguration.hpp +++ b/ddspipe_participants/include/ddspipe_participants/configuration/SimpleParticipantConfiguration.hpp @@ -18,6 +18,7 @@ #include #include #include +#include namespace eprosima { namespace ddspipe { @@ -50,8 +51,11 @@ struct SimpleParticipantConfiguration : public ParticipantConfiguration eprosima::ddspipe::core::types::DomainId domain {0u}; - std::set whitelist {}; + std::set whitelist {}; + participants::types::TransportProtocol transport {participants::types::TransportProtocol::builtin}; + + core::types::IgnoreParticipantFlags ignore_participant_flags {core::types::IgnoreParticipantFlags::no_filter}; }; } /* namespace participants */ diff --git a/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp index 24f2031b7..088cbcb6d 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp @@ -14,6 +14,8 @@ #pragma once +#include + #include #include @@ -52,6 +54,8 @@ class SimpleParticipant : public CommonParticipant static fastrtps::rtps::RTPSParticipantAttributes reckon_participant_attributes_( const SimpleParticipantConfiguration* configuration); + static std::shared_ptr configure_upd_transport_( + std::set whitelist = {}); }; } /* namespace rtps */ diff --git a/ddspipe_participants/include/ddspipe_participants/types/address/Address.hpp b/ddspipe_participants/include/ddspipe_participants/types/address/Address.hpp index 00fd09a82..ba24dc5ca 100644 --- a/ddspipe_participants/include/ddspipe_participants/types/address/Address.hpp +++ b/ddspipe_participants/include/ddspipe_participants/types/address/Address.hpp @@ -45,7 +45,9 @@ enum class IpVersion : int enum class TransportProtocol { udp, - tcp + tcp, + shm, + builtin }; /** diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp index f0514eb9e..d47b1b2e5 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp @@ -28,7 +28,7 @@ #include #include #include - +#include #include #include @@ -192,41 +192,64 @@ void DynTypesParticipant::initialize_internal_dds_participant_() pqos.wire_protocol().builtin.typelookup_config.use_server = false; pqos.wire_protocol().builtin.typelookup_config.use_client = true; - // Configure whitelist interfaces - if (!configuration->whitelist.empty()) + // Configure Participant transports + if (configuration->transport == participants::types::TransportProtocol::builtin) { - pqos.transport().use_builtin_transports = false; - - std::shared_ptr shm_transport = - std::make_shared(); - - std::shared_ptr udp_transport = - std::make_shared(); + if (!configuration->whitelist.empty()) + { + pqos.transport().use_builtin_transports = false; + std::shared_ptr shm_transport = + std::make_shared(); + pqos.transport().user_transports.push_back(shm_transport); - for (const types::IpType& ip : configuration->whitelist) - { - if (types::Address::is_ipv4_correct(ip)) - { - udp_transport->interfaceWhiteList.emplace_back(ip); - logInfo(DDSPIPE_DYNTYPES_PARTICIPANT, - "Adding " << ip << " to whitelist interfaces " << - " in Participant " << configuration->id << " initialization."); + std::shared_ptr udp_transport = + SimpleParticipant::configure_upd_transport_(configuration->whitelist); + pqos.transport().user_transports.push_back(udp_transport); - } - else - { - // Invalid address, continue with next one - logWarning(DDSPIPE_DYNTYPES_PARTICIPANT, - "Not valid IPv4. Discarding whitelist interface " << ip << - " in Participant " << configuration->id << " initialization."); - } } + } + else if (configuration->transport == participants::types::TransportProtocol::shm) + { + pqos.transport().use_builtin_transports = false; + std::shared_ptr shm_transport = + std::make_shared(); pqos.transport().user_transports.push_back(shm_transport); + } + else if (configuration->transport == participants::types::TransportProtocol::udp) + { + std::shared_ptr udp_transport = + SimpleParticipant::configure_upd_transport_(configuration->whitelist); + pqos.transport().user_transports.push_back(udp_transport); pqos.transport().user_transports.push_back(udp_transport); } + // Participant discovery filter configuration + switch (configuration->ignore_participant_flags) + { + case core::types::IgnoreParticipantFlags::no_filter: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::NO_FILTER; + break; + case core::types::IgnoreParticipantFlags::filter_different_host: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_HOST; + break; + case core::types::IgnoreParticipantFlags::filter_different_process: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS; + break; + case core::types::IgnoreParticipantFlags::filter_same_process: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_SAME_PROCESS; + break; + case core::types::IgnoreParticipantFlags::filter_different_and_same_process: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + static_cast( + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS | + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_SAME_PROCESS); + break; + default: + break; + } + // Force DDS entities to be created disabled // NOTE: this is very dangerous because we are modifying a global variable (and a not thread safe one) in a // local function. diff --git a/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp index cf57374b2..24dde3ef4 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include @@ -46,43 +47,92 @@ SimpleParticipant::reckon_participant_attributes_( // Use default as base attributes fastrtps::rtps::RTPSParticipantAttributes params = CommonParticipant::reckon_participant_attributes_(configuration); - if (!configuration->whitelist.empty()) + // Configure Participant transports + if (configuration->transport == participants::types::TransportProtocol::builtin) { - params.useBuiltinTransports = false; - - std::shared_ptr shm_transport = - std::make_shared(); + if (!configuration->whitelist.empty()) + { + params.useBuiltinTransports = false; - std::shared_ptr udp_transport = - std::make_shared(); + std::shared_ptr shm_transport = + std::make_shared(); + params.userTransports.push_back(shm_transport); + std::shared_ptr udp_transport = + configure_upd_transport_(configuration->whitelist); + params.userTransports.push_back(udp_transport); - for (const types::IpType& ip : configuration->whitelist) - { - if (types::Address::is_ipv4_correct(ip)) - { - udp_transport->interfaceWhiteList.emplace_back(ip); - logInfo(DDSPIPE_SIMPLE_PARTICIPANT, - "Adding " << ip << " to whitelist interfaces " << - " in Participant " << configuration->id << " initialization."); - } - else - { - // Invalid address, continue with next one - logWarning(DDSPIPE_SIMPLE_PARTICIPANT, - "Not valid IPv4. Discarding whitelist interface " << ip << - " in Participant " << configuration->id << " initialization."); - } } + } + else if (configuration->transport == participants::types::TransportProtocol::shm) + { + params.useBuiltinTransports = false; + std::shared_ptr shm_transport = + std::make_shared(); params.userTransports.push_back(shm_transport); + } + else if (configuration->transport == participants::types::TransportProtocol::udp) + { + std::shared_ptr udp_transport = + configure_upd_transport_(configuration->whitelist); params.userTransports.push_back(udp_transport); } + // Participant discovery filter configuration + switch (configuration->ignore_participant_flags) + { + case core::types::IgnoreParticipantFlags::no_filter: + params.builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::NO_FILTER; + break; + case core::types::IgnoreParticipantFlags::filter_different_host: + params.builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_HOST; + break; + case core::types::IgnoreParticipantFlags::filter_different_process: + params.builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS; + break; + case core::types::IgnoreParticipantFlags::filter_same_process: + params.builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_SAME_PROCESS; + break; + case core::types::IgnoreParticipantFlags::filter_different_and_same_process: + params.builtin.discovery_config.ignoreParticipantFlags = + static_cast( + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS | + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_SAME_PROCESS); + break; + default: + break; + } return params; } +std::shared_ptr +SimpleParticipant::configure_upd_transport_( + std::set whitelist) +{ + std::shared_ptr udp_transport = + std::make_shared(); + + for (const types::IpType& ip : whitelist) + { + if (types::Address::is_ipv4_correct(ip)) + { + udp_transport->interfaceWhiteList.emplace_back(ip); + logInfo(DDSPIPE_SIMPLE_PARTICIPANT, + "Adding " << ip << " to whitelist interfaces."); + } + else + { + // Invalid address, continue with next one + logWarning(DDSPIPE_SIMPLE_PARTICIPANT, + "Not valid IPv4. Discarding whitelist interface " << ip << "."); + } + } + + return udp_transport; +} + } /* namespace rtps */ } /* namespace participants */ } /* namespace ddspipe */ diff --git a/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp b/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp index a3c77be4f..2e6a552a1 100644 --- a/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp +++ b/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp @@ -61,6 +61,14 @@ constexpr const char* ECHO_VERBOSE_TAG("verbose"); //! Echo in verbose mode constexpr const char* DOMAIN_ID_TAG("domain"); //! Domain Id of the participant constexpr const char* WHITELIST_INTERFACES_TAG("whitelist-interfaces"); //! Domain Id of the participant +// Participant disvoery settings +constexpr const char* IGNORE_PARTICIPANT_FLAGS_TAG("ignore-participant-flags"); //! Domain Id of the participant +constexpr const char* IGNORE_PARTICIPANT_FLAGS_NO_FILTER_TAG("no_filter"); //! Domain Id of the participant +constexpr const char* IGNORE_PARTICIPANT_FLAGS_DIFFERENT_HOST_TAG("filter_different_host"); //! Domain Id of the participant +constexpr const char* IGNORE_PARTICIPANT_FLAGS_DIFFERENT_PROCESS_TAG("filter_different_process"); //! Domain Id of the participant +constexpr const char* IGNORE_PARTICIPANT_FLAGS_SAME_PROCESS_TAG("filter_same_process"); //! Domain Id of the participant +constexpr const char* IGNORE_PARTICIPANT_FLAGS_DIFFERENT_AND_SAME_PROCESS_TAG("filter_different_and_same_process"); //! Domain Id of the participant + // Discovery Server related tags constexpr const char* DISCOVERY_SERVER_GUID_PREFIX_TAG("discovery-server-guid"); //! TODO: add comment constexpr const char* LISTENING_ADDRESSES_TAG("listening-addresses"); //! TODO: add comment @@ -90,6 +98,8 @@ constexpr const char* ADDRESS_IP_VERSION_V6_TAG("v6"); //! TODO: add comment constexpr const char* ADDRESS_TRANSPORT_TAG("transport"); //! TODO: add comment constexpr const char* ADDRESS_TRANSPORT_UDP_TAG("udp"); //! TODO: add comment constexpr const char* ADDRESS_TRANSPORT_TCP_TAG("tcp"); //! TODO: add comment +constexpr const char* ADDRESS_TRANSPORT_SHM_TAG("shm"); //! TODO: add comment +constexpr const char* ADDRESS_TRANSPORT_BUILTIN_TAG("builtin"); //! TODO: add comment // Discovery Server Guid related tags constexpr const char* DISCOVERY_SERVER_GUID_TAG("guid"); //! TODO: add comment diff --git a/ddspipe_yaml/src/cpp/YamlReader_participants.cpp b/ddspipe_yaml/src/cpp/YamlReader_participants.cpp index f666ec4d1..792460c49 100644 --- a/ddspipe_yaml/src/cpp/YamlReader_participants.cpp +++ b/ddspipe_yaml/src/cpp/YamlReader_participants.cpp @@ -134,10 +134,30 @@ void YamlReader::fill( } // Optional whitelist interfaces - // if (YamlReader::is_tag_present(yml, WHITELIST_INTERFACES_TAG)) - // { - // object.whitelist = YamlReader::get_set(yml, WHITELIST_INTERFACES_TAG, version); - // } + if (YamlReader::is_tag_present(yml, WHITELIST_INTERFACES_TAG)) + { + object.whitelist = YamlReader::get_set(yml, WHITELIST_INTERFACES_TAG, version); + } + + // Optional get Transport protocol + if (YamlReader::is_tag_present(yml, ADDRESS_TRANSPORT_TAG)) + { + object.transport = get(yml, ADDRESS_TRANSPORT_TAG, version); + } + else + { + object.transport = participants::types::TransportProtocol::builtin; + } + + // Optional get ignore participant flags + if (YamlReader::is_tag_present(yml, IGNORE_PARTICIPANT_FLAGS_TAG)) + { + object.ignore_participant_flags = get(yml, IGNORE_PARTICIPANT_FLAGS_TAG, version); + } + else + { + object.ignore_participant_flags = core::types::IgnoreParticipantFlags::no_filter; + } } template <> diff --git a/ddspipe_yaml/src/cpp/YamlReader_types.cpp b/ddspipe_yaml/src/cpp/YamlReader_types.cpp index 74283a6bd..acb4dcb33 100644 --- a/ddspipe_yaml/src/cpp/YamlReader_types.cpp +++ b/ddspipe_yaml/src/cpp/YamlReader_types.cpp @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -48,6 +49,23 @@ namespace yaml { using namespace eprosima::ddspipe::core::types; using namespace eprosima::ddspipe::participants::types; +template <> +DDSPIPE_YAML_DllAPI +IgnoreParticipantFlags YamlReader::get( + const Yaml& yml, + const YamlReaderVersion /* version */) +{ + return get_enumeration( + yml, + { + {IGNORE_PARTICIPANT_FLAGS_NO_FILTER_TAG, IgnoreParticipantFlags::no_filter}, + {IGNORE_PARTICIPANT_FLAGS_DIFFERENT_HOST_TAG, IgnoreParticipantFlags::filter_different_host}, + {IGNORE_PARTICIPANT_FLAGS_DIFFERENT_PROCESS_TAG, IgnoreParticipantFlags::filter_different_process}, + {IGNORE_PARTICIPANT_FLAGS_SAME_PROCESS_TAG, IgnoreParticipantFlags::filter_same_process}, + {IGNORE_PARTICIPANT_FLAGS_DIFFERENT_AND_SAME_PROCESS_TAG, IgnoreParticipantFlags::filter_different_and_same_process}, + }); +} + template <> DDSPIPE_YAML_DllAPI YamlReaderVersion YamlReader::get( @@ -74,6 +92,8 @@ TransportProtocol YamlReader::get( { {ADDRESS_TRANSPORT_TCP_TAG, TransportProtocol::tcp}, {ADDRESS_TRANSPORT_UDP_TAG, TransportProtocol::udp}, + {ADDRESS_TRANSPORT_SHM_TAG, TransportProtocol::shm}, + {ADDRESS_TRANSPORT_BUILTIN_TAG, TransportProtocol::builtin} }); } From e0618cc168ae94b5f54a3c63c732243b99fdf4c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20L=C3=B3pez=20Fern=C3=A1ndez?= Date: Wed, 19 Apr 2023 10:54:17 +0200 Subject: [PATCH 03/15] Disable builtin transport with UDP-only configuration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juan López Fernández --- .../src/cpp/participant/dynamic_types/DynTypesParticipant.cpp | 3 ++- .../src/cpp/participant/rtps/SimpleParticipant.cpp | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp index d47b1b2e5..58e4962ce 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp @@ -206,7 +206,6 @@ void DynTypesParticipant::initialize_internal_dds_participant_() std::shared_ptr udp_transport = SimpleParticipant::configure_upd_transport_(configuration->whitelist); pqos.transport().user_transports.push_back(udp_transport); - } } else if (configuration->transport == participants::types::TransportProtocol::shm) @@ -219,6 +218,8 @@ void DynTypesParticipant::initialize_internal_dds_participant_() } else if (configuration->transport == participants::types::TransportProtocol::udp) { + pqos.transport().use_builtin_transports = false; + std::shared_ptr udp_transport = SimpleParticipant::configure_upd_transport_(configuration->whitelist); pqos.transport().user_transports.push_back(udp_transport); diff --git a/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp index 24dde3ef4..d030a97fa 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp @@ -61,7 +61,6 @@ SimpleParticipant::reckon_participant_attributes_( std::shared_ptr udp_transport = configure_upd_transport_(configuration->whitelist); params.userTransports.push_back(udp_transport); - } } else if (configuration->transport == participants::types::TransportProtocol::shm) @@ -74,6 +73,8 @@ SimpleParticipant::reckon_participant_attributes_( } else if (configuration->transport == participants::types::TransportProtocol::udp) { + params.useBuiltinTransports = false; + std::shared_ptr udp_transport = configure_upd_transport_(configuration->whitelist); params.userTransports.push_back(udp_transport); From 177cbf52560c5385742a141a55e52cb905668340 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20L=C3=B3pez=20Fern=C3=A1ndez?= Date: Mon, 29 May 2023 10:37:46 +0200 Subject: [PATCH 04/15] Apply whitelist configuration to WAN participants MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juan López Fernández --- .../{Participant.hpp => CustomTransport.hpp} | 9 ++ .../SimpleParticipantConfiguration.hpp | 8 +- .../participant/rtps/CommonParticipant.hpp | 14 ++- .../participant/rtps/SimpleParticipant.hpp | 8 +- .../types/address/Address.hpp | 4 +- .../dynamic_types/DynTypesParticipant.cpp | 10 +- .../participant/rtps/CommonParticipant.cpp | 112 ++++++++++++++++++ .../rtps/DiscoveryServerParticipant.cpp | 12 +- .../rtps/InitialPeersParticipant.cpp | 12 +- .../participant/rtps/SimpleParticipant.cpp | 36 +----- .../ddspipe_yaml/yaml_configuration_tags.hpp | 29 +++-- .../src/cpp/YamlReader_participants.cpp | 20 +++- ddspipe_yaml/src/cpp/YamlReader_types.cpp | 21 +++- 13 files changed, 212 insertions(+), 83 deletions(-) rename ddspipe_core/include/ddspipe_core/types/dds/{Participant.hpp => CustomTransport.hpp} (84%) diff --git a/ddspipe_core/include/ddspipe_core/types/dds/Participant.hpp b/ddspipe_core/include/ddspipe_core/types/dds/CustomTransport.hpp similarity index 84% rename from ddspipe_core/include/ddspipe_core/types/dds/Participant.hpp rename to ddspipe_core/include/ddspipe_core/types/dds/CustomTransport.hpp index afcab92a1..f705f3740 100644 --- a/ddspipe_core/include/ddspipe_core/types/dds/Participant.hpp +++ b/ddspipe_core/include/ddspipe_core/types/dds/CustomTransport.hpp @@ -19,6 +19,15 @@ namespace ddspipe { namespace core { namespace types { +//! Different options for transport configuration +enum class TransportDescriptors +{ + builtin, + udp_only, + shm_only +}; + +//! Possible values for Ignore Participant Flags enum class IgnoreParticipantFlags { no_filter, diff --git a/ddspipe_participants/include/ddspipe_participants/configuration/SimpleParticipantConfiguration.hpp b/ddspipe_participants/include/ddspipe_participants/configuration/SimpleParticipantConfiguration.hpp index ca72c867f..37fe7deff 100644 --- a/ddspipe_participants/include/ddspipe_participants/configuration/SimpleParticipantConfiguration.hpp +++ b/ddspipe_participants/include/ddspipe_participants/configuration/SimpleParticipantConfiguration.hpp @@ -14,11 +14,11 @@ #pragma once +#include +#include #include #include #include -#include -#include namespace eprosima { namespace ddspipe { @@ -49,11 +49,11 @@ struct SimpleParticipantConfiguration : public ParticipantConfiguration // VARIABLES ///////////////////////// - eprosima::ddspipe::core::types::DomainId domain {0u}; + core::types::DomainId domain {0u}; std::set whitelist {}; - participants::types::TransportProtocol transport {participants::types::TransportProtocol::builtin}; + core::types::TransportDescriptors transport {core::types::TransportDescriptors::builtin}; core::types::IgnoreParticipantFlags ignore_participant_flags {core::types::IgnoreParticipantFlags::no_filter}; }; diff --git a/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp index 6fd71382a..d71abc445 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp @@ -19,16 +19,18 @@ #include #include #include -#include #include +#include + -#include -#include #include #include +#include +#include -#include #include +#include +#include namespace eprosima { namespace ddspipe { @@ -202,6 +204,10 @@ class CommonParticipant static fastrtps::rtps::RTPSParticipantAttributes reckon_participant_attributes_( const ParticipantConfiguration* participant_configuration); + template + static std::shared_ptr create_descriptor_( + std::set whitelist = {}); + ///// // VARIABLES diff --git a/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp index 088cbcb6d..ee18060e0 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp @@ -14,8 +14,6 @@ #pragma once -#include - #include #include @@ -51,11 +49,11 @@ class SimpleParticipant : public CommonParticipant protected: + /** + * @brief Static method that gives the attributes for a Simple Participant. + */ static fastrtps::rtps::RTPSParticipantAttributes reckon_participant_attributes_( const SimpleParticipantConfiguration* configuration); - - static std::shared_ptr configure_upd_transport_( - std::set whitelist = {}); }; } /* namespace rtps */ diff --git a/ddspipe_participants/include/ddspipe_participants/types/address/Address.hpp b/ddspipe_participants/include/ddspipe_participants/types/address/Address.hpp index ba24dc5ca..00fd09a82 100644 --- a/ddspipe_participants/include/ddspipe_participants/types/address/Address.hpp +++ b/ddspipe_participants/include/ddspipe_participants/types/address/Address.hpp @@ -45,9 +45,7 @@ enum class IpVersion : int enum class TransportProtocol { udp, - tcp, - shm, - builtin + tcp }; /** diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp index 58e4962ce..93270bb56 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp @@ -193,7 +193,7 @@ void DynTypesParticipant::initialize_internal_dds_participant_() pqos.wire_protocol().builtin.typelookup_config.use_client = true; // Configure Participant transports - if (configuration->transport == participants::types::TransportProtocol::builtin) + if (configuration->transport == core::types::TransportDescriptors::builtin) { if (!configuration->whitelist.empty()) { @@ -204,11 +204,11 @@ void DynTypesParticipant::initialize_internal_dds_participant_() pqos.transport().user_transports.push_back(shm_transport); std::shared_ptr udp_transport = - SimpleParticipant::configure_upd_transport_(configuration->whitelist); + create_descriptor_(configuration->whitelist); pqos.transport().user_transports.push_back(udp_transport); } } - else if (configuration->transport == participants::types::TransportProtocol::shm) + else if (configuration->transport == core::types::TransportDescriptors::shm_only) { pqos.transport().use_builtin_transports = false; @@ -216,12 +216,12 @@ void DynTypesParticipant::initialize_internal_dds_participant_() std::make_shared(); pqos.transport().user_transports.push_back(shm_transport); } - else if (configuration->transport == participants::types::TransportProtocol::udp) + else if (configuration->transport == core::types::TransportDescriptors::udp_only) { pqos.transport().use_builtin_transports = false; std::shared_ptr udp_transport = - SimpleParticipant::configure_upd_transport_(configuration->whitelist); + create_descriptor_(configuration->whitelist); pqos.transport().user_transports.push_back(udp_transport); pqos.transport().user_transports.push_back(udp_transport); } diff --git a/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp index b2b297f6e..49348dabb 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp @@ -15,6 +15,10 @@ #include +#include +#include +#include +#include #include #include @@ -446,6 +450,114 @@ CommonParticipant::reckon_participant_attributes_( return params; } +template<> +std::shared_ptr +CommonParticipant::create_descriptor_( + std::set whitelist) +{ + std::shared_ptr udp_transport = + std::make_shared(); + + for (const types::IpType& ip : whitelist) + { + if (types::Address::is_ipv4_correct(ip)) + { + udp_transport->interfaceWhiteList.emplace_back(ip); + logInfo(DDSPIPE_COMMON_PARTICIPANT, + "Adding " << ip << " to UDP whitelist interfaces."); + } + else + { + // Invalid address, continue with next one + logWarning(DDSPIPE_COMMON_PARTICIPANT, + "Not valid IPv4. Discarding UDP whitelist interface " << ip << "."); + } + } + + return udp_transport; +} + +template<> +std::shared_ptr +CommonParticipant::create_descriptor_( + std::set whitelist) +{ + std::shared_ptr udp_transport = + std::make_shared(); + + for (const types::IpType& ip : whitelist) + { + if (types::Address::is_ipv6_correct(ip)) + { + udp_transport->interfaceWhiteList.emplace_back(ip); + logInfo(DDSPIPE_COMMON_PARTICIPANT, + "Adding " << ip << " to UDP whitelist interfaces."); + } + else + { + // Invalid address, continue with next one + logWarning(DDSPIPE_COMMON_PARTICIPANT, + "Not valid IPv6. Discarding UDP whitelist interface " << ip << "."); + } + } + + return udp_transport; +} + +template<> +std::shared_ptr +CommonParticipant::create_descriptor_( + std::set whitelist) +{ + std::shared_ptr tcp_transport = + std::make_shared(); + + for (const types::IpType& ip : whitelist) + { + if (types::Address::is_ipv4_correct(ip)) + { + tcp_transport->interfaceWhiteList.emplace_back(ip); + logInfo(DDSPIPE_COMMON_PARTICIPANT, + "Adding " << ip << " to TCP whitelist interfaces."); + } + else + { + // Invalid address, continue with next one + logWarning(DDSPIPE_COMMON_PARTICIPANT, + "Not valid IPv4. Discarding TCP whitelist interface " << ip << "."); + } + } + + return tcp_transport; +} + +template<> +std::shared_ptr +CommonParticipant::create_descriptor_( + std::set whitelist) +{ + std::shared_ptr tcp_transport = + std::make_shared(); + + for (const types::IpType& ip : whitelist) + { + if (types::Address::is_ipv6_correct(ip)) + { + tcp_transport->interfaceWhiteList.emplace_back(ip); + logInfo(DDSPIPE_COMMON_PARTICIPANT, + "Adding " << ip << " to TCP whitelist interfaces."); + } + else + { + // Invalid address, continue with next one + logWarning(DDSPIPE_COMMON_PARTICIPANT, + "Not valid IPv6. Discarding TCP whitelist interface " << ip << "."); + } + } + + return tcp_transport; +} + } /* namespace rtps */ } /* namespace participants */ } /* namespace ddspipe */ diff --git a/ddspipe_participants/src/cpp/participant/rtps/DiscoveryServerParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/DiscoveryServerParticipant.cpp index 2892886de..f9cb594ec 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/DiscoveryServerParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/DiscoveryServerParticipant.cpp @@ -120,7 +120,7 @@ DiscoveryServerParticipant::reckon_participant_attributes_( } else { - descriptor = std::make_shared(); + descriptor = create_descriptor_(configuration->whitelist); descriptor->add_listener_port(address.port()); descriptor->set_WAN_address(address.ip()); @@ -139,7 +139,7 @@ DiscoveryServerParticipant::reckon_participant_attributes_( has_listening_tcp_ipv6 = true; std::shared_ptr descriptor = - std::make_shared(); + create_descriptor_(configuration->whitelist); descriptor->add_listener_port(address.port()); @@ -299,7 +299,7 @@ DiscoveryServerParticipant::reckon_participant_attributes_( if (has_connection_tcp_ipv4 && !has_listening_tcp_ipv4) { std::shared_ptr descriptor = - std::make_shared(); + create_descriptor_(configuration->whitelist); // Enable TLS if (tls_config.is_active()) @@ -315,7 +315,7 @@ DiscoveryServerParticipant::reckon_participant_attributes_( if (has_connection_tcp_ipv6 && !has_listening_tcp_ipv6) { std::shared_ptr descriptor = - std::make_shared(); + create_descriptor_(configuration->whitelist); // Enable TLS if (tls_config.is_active()) @@ -333,7 +333,7 @@ DiscoveryServerParticipant::reckon_participant_attributes_( if (has_udp_ipv4) { std::shared_ptr descriptor = - std::make_shared(); + create_descriptor_(configuration->whitelist); params.userTransports.push_back(descriptor); logDebug(DDSPIPE_DISCOVERYSERVER_PARTICIPANT, @@ -342,7 +342,7 @@ DiscoveryServerParticipant::reckon_participant_attributes_( if (has_udp_ipv6) { std::shared_ptr descriptor_v6 = - std::make_shared(); + create_descriptor_(configuration->whitelist); params.userTransports.push_back(descriptor_v6); logDebug(DDSPIPE_DISCOVERYSERVER_PARTICIPANT, diff --git a/ddspipe_participants/src/cpp/participant/rtps/InitialPeersParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/InitialPeersParticipant.cpp index 1f99e49ee..c101a15ba 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/InitialPeersParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/InitialPeersParticipant.cpp @@ -115,7 +115,7 @@ fastrtps::rtps::RTPSParticipantAttributes InitialPeersParticipant::reckon_partic } else { - descriptor = std::make_shared(); + descriptor = create_descriptor_(configuration->whitelist); descriptor->add_listener_port(address.port()); descriptor->set_WAN_address(address.ip()); @@ -134,7 +134,7 @@ fastrtps::rtps::RTPSParticipantAttributes InitialPeersParticipant::reckon_partic has_listening_tcp_ipv6 = true; std::shared_ptr descriptor = - std::make_shared(); + create_descriptor_(configuration->whitelist); descriptor->add_listener_port(address.port()); @@ -254,7 +254,7 @@ fastrtps::rtps::RTPSParticipantAttributes InitialPeersParticipant::reckon_partic if (has_connection_tcp_ipv4 && !has_listening_tcp_ipv4) { std::shared_ptr descriptor = - std::make_shared(); + create_descriptor_(configuration->whitelist); // Enable TLS if (tls_config.is_active()) @@ -271,7 +271,7 @@ fastrtps::rtps::RTPSParticipantAttributes InitialPeersParticipant::reckon_partic if (has_connection_tcp_ipv6 && !has_listening_tcp_ipv6) { std::shared_ptr descriptor = - std::make_shared(); + create_descriptor_(configuration->whitelist); // Enable TLS if (tls_config.is_active()) @@ -289,7 +289,7 @@ fastrtps::rtps::RTPSParticipantAttributes InitialPeersParticipant::reckon_partic if (has_udp_ipv4) { std::shared_ptr descriptor = - std::make_shared(); + create_descriptor_(configuration->whitelist); params.userTransports.push_back(descriptor); logDebug(DDSPIPE_INITIALPEERS_PARTICIPANT, @@ -299,7 +299,7 @@ fastrtps::rtps::RTPSParticipantAttributes InitialPeersParticipant::reckon_partic if (has_udp_ipv6) { std::shared_ptr descriptor_v6 = - std::make_shared(); + create_descriptor_(configuration->whitelist); params.userTransports.push_back(descriptor_v6); logDebug(DDSPIPE_INITIALPEERS_PARTICIPANT, diff --git a/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp index d030a97fa..c7a18feed 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp @@ -48,7 +48,7 @@ SimpleParticipant::reckon_participant_attributes_( fastrtps::rtps::RTPSParticipantAttributes params = CommonParticipant::reckon_participant_attributes_(configuration); // Configure Participant transports - if (configuration->transport == participants::types::TransportProtocol::builtin) + if (configuration->transport == core::types::TransportDescriptors::builtin) { if (!configuration->whitelist.empty()) { @@ -59,11 +59,11 @@ SimpleParticipant::reckon_participant_attributes_( params.userTransports.push_back(shm_transport); std::shared_ptr udp_transport = - configure_upd_transport_(configuration->whitelist); + create_descriptor_(configuration->whitelist); params.userTransports.push_back(udp_transport); } } - else if (configuration->transport == participants::types::TransportProtocol::shm) + else if (configuration->transport == core::types::TransportDescriptors::shm_only) { params.useBuiltinTransports = false; @@ -71,12 +71,12 @@ SimpleParticipant::reckon_participant_attributes_( std::make_shared(); params.userTransports.push_back(shm_transport); } - else if (configuration->transport == participants::types::TransportProtocol::udp) + else if (configuration->transport == core::types::TransportDescriptors::udp_only) { params.useBuiltinTransports = false; std::shared_ptr udp_transport = - configure_upd_transport_(configuration->whitelist); + create_descriptor_(configuration->whitelist); params.userTransports.push_back(udp_transport); } @@ -108,32 +108,6 @@ SimpleParticipant::reckon_participant_attributes_( return params; } -std::shared_ptr -SimpleParticipant::configure_upd_transport_( - std::set whitelist) -{ - std::shared_ptr udp_transport = - std::make_shared(); - - for (const types::IpType& ip : whitelist) - { - if (types::Address::is_ipv4_correct(ip)) - { - udp_transport->interfaceWhiteList.emplace_back(ip); - logInfo(DDSPIPE_SIMPLE_PARTICIPANT, - "Adding " << ip << " to whitelist interfaces."); - } - else - { - // Invalid address, continue with next one - logWarning(DDSPIPE_SIMPLE_PARTICIPANT, - "Not valid IPv4. Discarding whitelist interface " << ip << "."); - } - } - - return udp_transport; -} - } /* namespace rtps */ } /* namespace participants */ } /* namespace ddspipe */ diff --git a/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp b/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp index 2e6a552a1..6c1c78fbc 100644 --- a/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp +++ b/ddspipe_yaml/include/ddspipe_yaml/yaml_configuration_tags.hpp @@ -57,17 +57,26 @@ constexpr const char* ECHO_DISCOVERY_TAG("discovery"); //! Echo Discovery recei constexpr const char* ECHO_VERBOSE_TAG("verbose"); //! Echo in verbose mode // RTPS related tags + +// Transport related tags +constexpr const char* WHITELIST_INTERFACES_TAG("whitelist-interfaces"); //! TODO: add comment + +// Custom transport descriptors tags +constexpr const char* TRANSPORT_DESCRIPTORS_TRANSPORT_TAG("transport"); //! TODO: add comment +constexpr const char* TRANSPORT_DESCRIPTORS_BUILTIN_TAG("builtin"); //! TODO: add comment +constexpr const char* TRANSPORT_DESCRIPTORS_UDP_TAG("udp"); //! TODO: add comment +constexpr const char* TRANSPORT_DESCRIPTORS_SHM_TAG("shm"); //! TODO: add comment + +// Participant discovery settings +constexpr const char* IGNORE_PARTICIPANT_FLAGS_TAG("ignore-participant-flags"); //! TODO: add comment +constexpr const char* IGNORE_PARTICIPANT_FLAGS_NO_FILTER_TAG("no_filter"); //! TODO: add comment +constexpr const char* IGNORE_PARTICIPANT_FLAGS_DIFFERENT_HOST_TAG("filter_different_host"); //! TODO: add comment +constexpr const char* IGNORE_PARTICIPANT_FLAGS_DIFFERENT_PROCESS_TAG("filter_different_process"); //! TODO: add comment +constexpr const char* IGNORE_PARTICIPANT_FLAGS_SAME_PROCESS_TAG("filter_same_process"); //! TODO: add comment +constexpr const char* IGNORE_PARTICIPANT_FLAGS_DIFFERENT_AND_SAME_PROCESS_TAG("filter_different_and_same_process"); //! TODO: add comment + // Simple RTPS related tags constexpr const char* DOMAIN_ID_TAG("domain"); //! Domain Id of the participant -constexpr const char* WHITELIST_INTERFACES_TAG("whitelist-interfaces"); //! Domain Id of the participant - -// Participant disvoery settings -constexpr const char* IGNORE_PARTICIPANT_FLAGS_TAG("ignore-participant-flags"); //! Domain Id of the participant -constexpr const char* IGNORE_PARTICIPANT_FLAGS_NO_FILTER_TAG("no_filter"); //! Domain Id of the participant -constexpr const char* IGNORE_PARTICIPANT_FLAGS_DIFFERENT_HOST_TAG("filter_different_host"); //! Domain Id of the participant -constexpr const char* IGNORE_PARTICIPANT_FLAGS_DIFFERENT_PROCESS_TAG("filter_different_process"); //! Domain Id of the participant -constexpr const char* IGNORE_PARTICIPANT_FLAGS_SAME_PROCESS_TAG("filter_same_process"); //! Domain Id of the participant -constexpr const char* IGNORE_PARTICIPANT_FLAGS_DIFFERENT_AND_SAME_PROCESS_TAG("filter_different_and_same_process"); //! Domain Id of the participant // Discovery Server related tags constexpr const char* DISCOVERY_SERVER_GUID_PREFIX_TAG("discovery-server-guid"); //! TODO: add comment @@ -98,8 +107,6 @@ constexpr const char* ADDRESS_IP_VERSION_V6_TAG("v6"); //! TODO: add comment constexpr const char* ADDRESS_TRANSPORT_TAG("transport"); //! TODO: add comment constexpr const char* ADDRESS_TRANSPORT_UDP_TAG("udp"); //! TODO: add comment constexpr const char* ADDRESS_TRANSPORT_TCP_TAG("tcp"); //! TODO: add comment -constexpr const char* ADDRESS_TRANSPORT_SHM_TAG("shm"); //! TODO: add comment -constexpr const char* ADDRESS_TRANSPORT_BUILTIN_TAG("builtin"); //! TODO: add comment // Discovery Server Guid related tags constexpr const char* DISCOVERY_SERVER_GUID_TAG("guid"); //! TODO: add comment diff --git a/ddspipe_yaml/src/cpp/YamlReader_participants.cpp b/ddspipe_yaml/src/cpp/YamlReader_participants.cpp index 792460c49..7d1350501 100644 --- a/ddspipe_yaml/src/cpp/YamlReader_participants.cpp +++ b/ddspipe_yaml/src/cpp/YamlReader_participants.cpp @@ -139,14 +139,14 @@ void YamlReader::fill( object.whitelist = YamlReader::get_set(yml, WHITELIST_INTERFACES_TAG, version); } - // Optional get Transport protocol - if (YamlReader::is_tag_present(yml, ADDRESS_TRANSPORT_TAG)) + // Optional get Transport descriptors + if (YamlReader::is_tag_present(yml, TRANSPORT_DESCRIPTORS_TRANSPORT_TAG)) { - object.transport = get(yml, ADDRESS_TRANSPORT_TAG, version); + object.transport = get(yml, TRANSPORT_DESCRIPTORS_TRANSPORT_TAG, version); } else { - object.transport = participants::types::TransportProtocol::builtin; + object.transport = core::types::TransportDescriptors::builtin; } // Optional get ignore participant flags @@ -183,6 +183,12 @@ void YamlReader::fill( // Parent class fill fill(object, yml, version); + // Optional whitelist interfaces + if (YamlReader::is_tag_present(yml, WHITELIST_INTERFACES_TAG)) + { + object.whitelist = YamlReader::get_set(yml, WHITELIST_INTERFACES_TAG, version); + } + // Optional listening addresses if (YamlReader::is_tag_present(yml, LISTENING_ADDRESSES_TAG)) { @@ -245,6 +251,12 @@ void YamlReader::fill( // Parent class fill fill(object, yml, version); + // Optional whitelist interfaces + if (YamlReader::is_tag_present(yml, WHITELIST_INTERFACES_TAG)) + { + object.whitelist = YamlReader::get_set(yml, WHITELIST_INTERFACES_TAG, version); + } + // Optional listening addresses if (YamlReader::is_tag_present(yml, LISTENING_ADDRESSES_TAG)) { diff --git a/ddspipe_yaml/src/cpp/YamlReader_types.cpp b/ddspipe_yaml/src/cpp/YamlReader_types.cpp index acb4dcb33..4629b00d1 100644 --- a/ddspipe_yaml/src/cpp/YamlReader_types.cpp +++ b/ddspipe_yaml/src/cpp/YamlReader_types.cpp @@ -21,9 +21,9 @@ #include #include +#include #include #include -#include #include #include #include @@ -49,6 +49,21 @@ namespace yaml { using namespace eprosima::ddspipe::core::types; using namespace eprosima::ddspipe::participants::types; +template <> +DDSPIPE_YAML_DllAPI +TransportDescriptors YamlReader::get( + const Yaml& yml, + const YamlReaderVersion /* version */) +{ + return get_enumeration( + yml, + { + {ADDRESS_TRANSPORT_TCP_TAG, TransportDescriptors::builtin}, + {ADDRESS_TRANSPORT_TCP_TAG, TransportDescriptors::udp_only}, + {ADDRESS_TRANSPORT_UDP_TAG, TransportDescriptors::shm_only} + }); +} + template <> DDSPIPE_YAML_DllAPI IgnoreParticipantFlags YamlReader::get( @@ -91,9 +106,7 @@ TransportProtocol YamlReader::get( yml, { {ADDRESS_TRANSPORT_TCP_TAG, TransportProtocol::tcp}, - {ADDRESS_TRANSPORT_UDP_TAG, TransportProtocol::udp}, - {ADDRESS_TRANSPORT_SHM_TAG, TransportProtocol::shm}, - {ADDRESS_TRANSPORT_BUILTIN_TAG, TransportProtocol::builtin} + {ADDRESS_TRANSPORT_UDP_TAG, TransportProtocol::udp} }); } From 234945f97eb60e2b9f34158c1664f12cb2fc8d8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20L=C3=B3pez=20Fern=C3=A1ndez?= Date: Mon, 29 May 2023 12:37:27 +0200 Subject: [PATCH 05/15] Uncrustify MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juan López Fernández --- .../participant/rtps/CommonParticipant.hpp | 2 +- .../dynamic_types/DynTypesParticipant.cpp | 14 +++++++++----- .../cpp/participant/rtps/CommonParticipant.cpp | 16 ++++++++-------- .../rtps/DiscoveryServerParticipant.cpp | 3 ++- .../participant/rtps/InitialPeersParticipant.cpp | 3 ++- .../cpp/participant/rtps/SimpleParticipant.cpp | 12 ++++++++---- ddspipe_yaml/src/cpp/YamlReader_participants.cpp | 3 ++- ddspipe_yaml/src/cpp/YamlReader_types.cpp | 3 ++- 8 files changed, 34 insertions(+), 22 deletions(-) diff --git a/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp index d71abc445..81d5fcbc5 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp @@ -206,7 +206,7 @@ class CommonParticipant template static std::shared_ptr create_descriptor_( - std::set whitelist = {}); + std::set whitelist = {}); ///// // VARIABLES diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp index 93270bb56..a1e3b945a 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesParticipant.cpp @@ -222,7 +222,7 @@ void DynTypesParticipant::initialize_internal_dds_participant_() std::shared_ptr udp_transport = create_descriptor_(configuration->whitelist); - pqos.transport().user_transports.push_back(udp_transport); + pqos.transport().user_transports.push_back(udp_transport); pqos.transport().user_transports.push_back(udp_transport); } @@ -230,16 +230,20 @@ void DynTypesParticipant::initialize_internal_dds_participant_() switch (configuration->ignore_participant_flags) { case core::types::IgnoreParticipantFlags::no_filter: - pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::NO_FILTER; + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::NO_FILTER; break; case core::types::IgnoreParticipantFlags::filter_different_host: - pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_HOST; + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_HOST; break; case core::types::IgnoreParticipantFlags::filter_different_process: - pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS; + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS; break; case core::types::IgnoreParticipantFlags::filter_same_process: - pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_SAME_PROCESS; + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_SAME_PROCESS; break; case core::types::IgnoreParticipantFlags::filter_different_and_same_process: pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = diff --git a/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp index 49348dabb..af34101f0 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp @@ -456,7 +456,7 @@ CommonParticipant::create_descriptor_( std::set whitelist) { std::shared_ptr udp_transport = - std::make_shared(); + std::make_shared(); for (const types::IpType& ip : whitelist) { @@ -464,7 +464,7 @@ CommonParticipant::create_descriptor_( { udp_transport->interfaceWhiteList.emplace_back(ip); logInfo(DDSPIPE_COMMON_PARTICIPANT, - "Adding " << ip << " to UDP whitelist interfaces."); + "Adding " << ip << " to UDP whitelist interfaces."); } else { @@ -483,7 +483,7 @@ CommonParticipant::create_descriptor_( std::set whitelist) { std::shared_ptr udp_transport = - std::make_shared(); + std::make_shared(); for (const types::IpType& ip : whitelist) { @@ -491,7 +491,7 @@ CommonParticipant::create_descriptor_( { udp_transport->interfaceWhiteList.emplace_back(ip); logInfo(DDSPIPE_COMMON_PARTICIPANT, - "Adding " << ip << " to UDP whitelist interfaces."); + "Adding " << ip << " to UDP whitelist interfaces."); } else { @@ -510,7 +510,7 @@ CommonParticipant::create_descriptor_( std::set whitelist) { std::shared_ptr tcp_transport = - std::make_shared(); + std::make_shared(); for (const types::IpType& ip : whitelist) { @@ -518,7 +518,7 @@ CommonParticipant::create_descriptor_( { tcp_transport->interfaceWhiteList.emplace_back(ip); logInfo(DDSPIPE_COMMON_PARTICIPANT, - "Adding " << ip << " to TCP whitelist interfaces."); + "Adding " << ip << " to TCP whitelist interfaces."); } else { @@ -537,7 +537,7 @@ CommonParticipant::create_descriptor_( std::set whitelist) { std::shared_ptr tcp_transport = - std::make_shared(); + std::make_shared(); for (const types::IpType& ip : whitelist) { @@ -545,7 +545,7 @@ CommonParticipant::create_descriptor_( { tcp_transport->interfaceWhiteList.emplace_back(ip); logInfo(DDSPIPE_COMMON_PARTICIPANT, - "Adding " << ip << " to TCP whitelist interfaces."); + "Adding " << ip << " to TCP whitelist interfaces."); } else { diff --git a/ddspipe_participants/src/cpp/participant/rtps/DiscoveryServerParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/DiscoveryServerParticipant.cpp index f9cb594ec..09a05d3bd 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/DiscoveryServerParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/DiscoveryServerParticipant.cpp @@ -120,7 +120,8 @@ DiscoveryServerParticipant::reckon_participant_attributes_( } else { - descriptor = create_descriptor_(configuration->whitelist); + descriptor = create_descriptor_( + configuration->whitelist); descriptor->add_listener_port(address.port()); descriptor->set_WAN_address(address.ip()); diff --git a/ddspipe_participants/src/cpp/participant/rtps/InitialPeersParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/InitialPeersParticipant.cpp index c101a15ba..7e19a06a8 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/InitialPeersParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/InitialPeersParticipant.cpp @@ -115,7 +115,8 @@ fastrtps::rtps::RTPSParticipantAttributes InitialPeersParticipant::reckon_partic } else { - descriptor = create_descriptor_(configuration->whitelist); + descriptor = create_descriptor_( + configuration->whitelist); descriptor->add_listener_port(address.port()); descriptor->set_WAN_address(address.ip()); diff --git a/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp index c7a18feed..09b43dceb 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/SimpleParticipant.cpp @@ -84,16 +84,20 @@ SimpleParticipant::reckon_participant_attributes_( switch (configuration->ignore_participant_flags) { case core::types::IgnoreParticipantFlags::no_filter: - params.builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::NO_FILTER; + params.builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::NO_FILTER; break; case core::types::IgnoreParticipantFlags::filter_different_host: - params.builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_HOST; + params.builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_HOST; break; case core::types::IgnoreParticipantFlags::filter_different_process: - params.builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS; + params.builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS; break; case core::types::IgnoreParticipantFlags::filter_same_process: - params.builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_SAME_PROCESS; + params.builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_SAME_PROCESS; break; case core::types::IgnoreParticipantFlags::filter_different_and_same_process: params.builtin.discovery_config.ignoreParticipantFlags = diff --git a/ddspipe_yaml/src/cpp/YamlReader_participants.cpp b/ddspipe_yaml/src/cpp/YamlReader_participants.cpp index 7d1350501..a8aa358c0 100644 --- a/ddspipe_yaml/src/cpp/YamlReader_participants.cpp +++ b/ddspipe_yaml/src/cpp/YamlReader_participants.cpp @@ -152,7 +152,8 @@ void YamlReader::fill( // Optional get ignore participant flags if (YamlReader::is_tag_present(yml, IGNORE_PARTICIPANT_FLAGS_TAG)) { - object.ignore_participant_flags = get(yml, IGNORE_PARTICIPANT_FLAGS_TAG, version); + object.ignore_participant_flags = get(yml, IGNORE_PARTICIPANT_FLAGS_TAG, + version); } else { diff --git a/ddspipe_yaml/src/cpp/YamlReader_types.cpp b/ddspipe_yaml/src/cpp/YamlReader_types.cpp index 4629b00d1..8f15a824b 100644 --- a/ddspipe_yaml/src/cpp/YamlReader_types.cpp +++ b/ddspipe_yaml/src/cpp/YamlReader_types.cpp @@ -77,7 +77,8 @@ IgnoreParticipantFlags YamlReader::get( {IGNORE_PARTICIPANT_FLAGS_DIFFERENT_HOST_TAG, IgnoreParticipantFlags::filter_different_host}, {IGNORE_PARTICIPANT_FLAGS_DIFFERENT_PROCESS_TAG, IgnoreParticipantFlags::filter_different_process}, {IGNORE_PARTICIPANT_FLAGS_SAME_PROCESS_TAG, IgnoreParticipantFlags::filter_same_process}, - {IGNORE_PARTICIPANT_FLAGS_DIFFERENT_AND_SAME_PROCESS_TAG, IgnoreParticipantFlags::filter_different_and_same_process}, + {IGNORE_PARTICIPANT_FLAGS_DIFFERENT_AND_SAME_PROCESS_TAG, + IgnoreParticipantFlags::filter_different_and_same_process}, }); } From 9f9da6d01632829e3f760dae00186cd31b40a98c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20L=C3=B3pez=20Fern=C3=A1ndez?= Date: Mon, 29 May 2023 14:51:18 +0200 Subject: [PATCH 06/15] Fix typo MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juan López Fernández --- ddspipe_yaml/src/cpp/YamlReader_types.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ddspipe_yaml/src/cpp/YamlReader_types.cpp b/ddspipe_yaml/src/cpp/YamlReader_types.cpp index 8f15a824b..5cecaedee 100644 --- a/ddspipe_yaml/src/cpp/YamlReader_types.cpp +++ b/ddspipe_yaml/src/cpp/YamlReader_types.cpp @@ -58,9 +58,9 @@ TransportDescriptors YamlReader::get( return get_enumeration( yml, { - {ADDRESS_TRANSPORT_TCP_TAG, TransportDescriptors::builtin}, - {ADDRESS_TRANSPORT_TCP_TAG, TransportDescriptors::udp_only}, - {ADDRESS_TRANSPORT_UDP_TAG, TransportDescriptors::shm_only} + {TRANSPORT_DESCRIPTORS_BUILTIN_TAG, TransportDescriptors::builtin}, + {TRANSPORT_DESCRIPTORS_UDP_TAG, TransportDescriptors::udp_only}, + {TRANSPORT_DESCRIPTORS_SHM_TAG, TransportDescriptors::shm_only} }); } From db6bda0096a16c688ad5feeba5e0331316da0b6d Mon Sep 17 00:00:00 2001 From: jparisu Date: Thu, 20 Apr 2023 12:58:23 +0200 Subject: [PATCH 07/15] Add support for dynamic type routing Signed-off-by: jparisu --- .../participant/dds/DdsCommonParticipant.hpp | 81 +++++++ .../DynTypesPublicationParticipant.hpp | 104 ++++++++ .../DynTypesSubscriptionParticipant.hpp | 87 +++++++ .../participant/rtps/SimpleParticipant.hpp | 3 + .../writer/auxiliar/InternalWriter.hpp | 46 ++++ .../participant/dds/DdsCommonParticipant.cpp | 199 +++++++++++++++ .../DynTypesPublicationParticipant.cpp | 226 ++++++++++++++++++ .../DynTypesSubscriptionParticipant.cpp | 166 +++++++++++++ .../cpp/writer/auxiliar/InternalWriter.cpp | 38 +++ 9 files changed, 950 insertions(+) create mode 100644 ddspipe_participants/include/ddspipe_participants/participant/dds/DdsCommonParticipant.hpp create mode 100644 ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesPublicationParticipant.hpp create mode 100644 ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesSubscriptionParticipant.hpp create mode 100644 ddspipe_participants/include/ddspipe_participants/writer/auxiliar/InternalWriter.hpp create mode 100644 ddspipe_participants/src/cpp/participant/dds/DdsCommonParticipant.cpp create mode 100644 ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp create mode 100644 ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesSubscriptionParticipant.cpp create mode 100644 ddspipe_participants/src/cpp/writer/auxiliar/InternalWriter.cpp diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dds/DdsCommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dds/DdsCommonParticipant.hpp new file mode 100644 index 000000000..4330e902a --- /dev/null +++ b/ddspipe_participants/include/ddspipe_participants/participant/dds/DdsCommonParticipant.hpp @@ -0,0 +1,81 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include +#include + +#include +#include +#include + +#include +#include +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { +namespace dds { + +/** + * TODO comment + */ +class DdsCommonParticipant : public core::IParticipant , public eprosima::fastdds::dds::DomainParticipantListener +{ +public: + + DDSPIPE_PARTICIPANTS_DllAPI + virtual ~DdsCommonParticipant(); + + DDSPIPE_PARTICIPANTS_DllAPI + virtual core::types::ParticipantId id() const noexcept override; + + DDSPIPE_PARTICIPANTS_DllAPI + virtual bool is_rtps_kind() const noexcept override; + + // NOTE: This should not be initialized here, but it is just for simplicity and less code in childs + DDSPIPE_PARTICIPANTS_DllAPI + virtual bool is_repeater() const noexcept override; + + DDSPIPE_PARTICIPANTS_DllAPI + virtual void init(); + +protected: + + // TODO + DDSPIPE_PARTICIPANTS_DllAPI + DdsCommonParticipant( + const std::shared_ptr& participant_configuration); + + DDSPIPE_PARTICIPANTS_DllAPI + core::types::Endpoint simulate_endpoint_( + const core::types::DdsTopic& topic) const; + + eprosima::fastdds::dds::DomainParticipant* dds_participant_; + eprosima::fastdds::dds::Publisher* dds_publisher_; + eprosima::fastdds::dds::Subscriber* dds_subscriber_; + + core::types::ParticipantId id_; + + std::shared_ptr participant_configuration_; +}; + +} /* namespace dds */ +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesPublicationParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesPublicationParticipant.hpp new file mode 100644 index 000000000..03d7f81e5 --- /dev/null +++ b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesPublicationParticipant.hpp @@ -0,0 +1,104 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { + +/** + * TODO comment + */ +class DynTypesPublicationParticipant : public dds::DdsCommonParticipant +{ +public: + + // TODO + DDSPIPE_PARTICIPANTS_DllAPI + DynTypesPublicationParticipant( + const std::shared_ptr& participant_configuration, + const std::shared_ptr& discovery_database); + + DDSPIPE_PARTICIPANTS_DllAPI + virtual ~DynTypesPublicationParticipant(); + + DDSPIPE_PARTICIPANTS_DllAPI + std::shared_ptr create_writer( + const core::ITopic& topic) override; + + DDSPIPE_PARTICIPANTS_DllAPI + std::shared_ptr create_reader( + const core::ITopic& topic) override; + +protected: + + DDSPIPE_PARTICIPANTS_DllAPI + utils::ReturnCode receive_type_object_( + eprosima::fastrtps::types::DynamicType_ptr dynamic_type); + + DDSPIPE_PARTICIPANTS_DllAPI + utils::ReturnCode receive_type_object_( + ddspipe::core::IRoutingData& data); + + DDSPIPE_PARTICIPANTS_DllAPI + void create_empty_datawriter_nts_( + const core::types::DdsTopic& topic); + + static + DDSPIPE_PARTICIPANTS_DllAPI + fastdds::dds::DataWriterQos + default_empty_datawriter_qos_( + const core::types::DdsTopic& topic) noexcept; + + static + DDSPIPE_PARTICIPANTS_DllAPI + fastdds::dds::TopicQos + default_topic_qos_( + const core::types::DdsTopic& topic) noexcept; + + std::map< + core::types::DdsTopic, + std::pair< + fastdds::dds::Topic*, + fastdds::dds::DataWriter*>> writers_; + + std::map types_discovered_; + + //! Type Object Internal Writer + std::shared_ptr type_object_writer_; + + std::mutex mutex_; +}; + +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesSubscriptionParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesSubscriptionParticipant.hpp new file mode 100644 index 000000000..935172639 --- /dev/null +++ b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesSubscriptionParticipant.hpp @@ -0,0 +1,87 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file DynTypesSubscriptionParticipant.hpp + */ + +#pragma once + +#include +#include + +#include + +#include +#include +#include +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { + +/** + * TODO comment + */ +class DynTypesSubscriptionParticipant : public dds::DdsCommonParticipant +{ +public: + + // TODO + DDSPIPE_PARTICIPANTS_DllAPI + DynTypesSubscriptionParticipant( + const std::shared_ptr& participant_configuration, + const std::shared_ptr& discovery_database); + + DDSPIPE_PARTICIPANTS_DllAPI + virtual ~DynTypesSubscriptionParticipant() = default; + + DDSPIPE_PARTICIPANTS_DllAPI + std::shared_ptr create_writer( + const core::ITopic& topic) override; + + DDSPIPE_PARTICIPANTS_DllAPI + std::shared_ptr create_reader( + const core::ITopic& topic) override; + + DDSPIPE_PARTICIPANTS_DllAPI + virtual void on_type_discovery( + eprosima::fastdds::dds::DomainParticipant* participant, + const eprosima::fastrtps::rtps::SampleIdentity& request_sample_id, + const eprosima::fastrtps::string_255& topic, + const eprosima::fastrtps::types::TypeIdentifier* identifier, + const eprosima::fastrtps::types::TypeObject* object, + eprosima::fastrtps::types::DynamicType_ptr dyn_type) override; + + DDSPIPE_PARTICIPANTS_DllAPI + virtual void on_type_information_received( + eprosima::fastdds::dds::DomainParticipant* participant, + const eprosima::fastrtps::string_255 topic_name, + const eprosima::fastrtps::string_255 type_name, + const eprosima::fastrtps::types::TypeInformation& type_information) override; + +protected: + + DDSPIPE_PARTICIPANTS_DllAPI + void internal_notify_type_object_( + eprosima::fastrtps::types::DynamicType_ptr dynamic_type); + + //! Type Object Internal Reader + std::shared_ptr type_object_reader_; +}; + +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp index ee18060e0..77d4cf1f9 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp @@ -47,6 +47,9 @@ class SimpleParticipant : public CommonParticipant const std::shared_ptr& payload_pool, const std::shared_ptr& discovery_database); + static std::shared_ptr configure_upd_transport_( + std::set whitelist = {}); + protected: /** diff --git a/ddspipe_participants/include/ddspipe_participants/writer/auxiliar/InternalWriter.hpp b/ddspipe_participants/include/ddspipe_participants/writer/auxiliar/InternalWriter.hpp new file mode 100644 index 000000000..30ef3172d --- /dev/null +++ b/ddspipe_participants/include/ddspipe_participants/writer/auxiliar/InternalWriter.hpp @@ -0,0 +1,46 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { + +/** + * Writer implementation that allow to register a callback to be executed with each data received. + */ +class InternalWriter : public ddspipe::participants::BaseWriter +{ +public: + + InternalWriter( + const ddspipe::core::types::ParticipantId& participant_id, + const std::function& callback); + +protected: + + virtual utils::ReturnCode write_nts_( + ddspipe::core::IRoutingData& data) noexcept override; + + const std::function callback_; +}; + +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/src/cpp/participant/dds/DdsCommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/dds/DdsCommonParticipant.cpp new file mode 100644 index 000000000..a77bcd8cf --- /dev/null +++ b/ddspipe_participants/src/cpp/participant/dds/DdsCommonParticipant.cpp @@ -0,0 +1,199 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { +namespace dds { + +using namespace eprosima::ddspipe::core; +using namespace eprosima::ddspipe::core::types; +using namespace eprosima::fastrtps::types; + +DdsCommonParticipant::DdsCommonParticipant( + const std::shared_ptr& participant_configuration) + : participant_configuration_(participant_configuration) +{ + // Do nothing +} + +DdsCommonParticipant::~DdsCommonParticipant() +{ + dds_participant_->set_listener(nullptr); + + dds_participant_->delete_publisher(dds_publisher_); + dds_participant_->delete_subscriber(dds_subscriber_); + + eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->delete_participant(dds_participant_); +} + +core::types::ParticipantId DdsCommonParticipant::id() const noexcept +{ + return participant_configuration_->id; +} + +bool DdsCommonParticipant::is_rtps_kind() const noexcept +{ + return false; +} + +bool DdsCommonParticipant::is_repeater() const noexcept +{ + return false; +} + +void DdsCommonParticipant::init() +{ + eprosima::fastdds::dds::DomainParticipantQos pqos; + pqos.name(this->id()); + + // Set Type LookUp to ON + // TODO this should not be true in all cases, but lets keep it for now + pqos.wire_protocol().builtin.typelookup_config.use_server = true; + pqos.wire_protocol().builtin.typelookup_config.use_client = true; + + // Configure Participant transports + if (participant_configuration_->transport == participants::types::TransportProtocol::builtin) + { + if (!participant_configuration_->whitelist.empty()) + { + pqos.transport().use_builtin_transports = false; + + std::shared_ptr shm_transport = + std::make_shared(); + pqos.transport().user_transports.push_back(shm_transport); + + std::shared_ptr udp_transport = + rtps::SimpleParticipant::configure_upd_transport_(participant_configuration_->whitelist); + pqos.transport().user_transports.push_back(udp_transport); + } + } + else if (participant_configuration_->transport == participants::types::TransportProtocol::shm) + { + pqos.transport().use_builtin_transports = false; + + std::shared_ptr shm_transport = + std::make_shared(); + pqos.transport().user_transports.push_back(shm_transport); + } + else if (participant_configuration_->transport == participants::types::TransportProtocol::udp) + { + pqos.transport().use_builtin_transports = false; + + std::shared_ptr udp_transport = + rtps::SimpleParticipant::configure_upd_transport_(participant_configuration_->whitelist); + pqos.transport().user_transports.push_back(udp_transport); + pqos.transport().user_transports.push_back(udp_transport); + } + + // Participant discovery filter participant_configuration_ + switch (participant_configuration_->ignore_participant_flags) + { + case core::types::IgnoreParticipantFlags::no_filter: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::NO_FILTER; + break; + case core::types::IgnoreParticipantFlags::filter_different_host: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_HOST; + break; + case core::types::IgnoreParticipantFlags::filter_different_process: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS; + break; + case core::types::IgnoreParticipantFlags::filter_same_process: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_SAME_PROCESS; + break; + case core::types::IgnoreParticipantFlags::filter_different_and_same_process: + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + static_cast( + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS | + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_SAME_PROCESS); + break; + default: + break; + } + + // Force DDS entities to be created disabled + // NOTE: this is very dangerous because we are modifying a global variable (and a not thread safe one) in a + // local function. + // However, this is required, otherwise we could fail in two points: + // - receive in this object, maybe in same thread a discovery callback, which could use this variable + // (e.g to check if the Participant called is this one) + // - lose a discovery callback + fastdds::dds::DomainParticipantFactoryQos original_fact_qos; + eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->get_qos( + original_fact_qos); + + fastdds::dds::DomainParticipantFactoryQos fact_qos; + fact_qos.entity_factory().autoenable_created_entities = false; + eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->set_qos( + fact_qos); + + // CREATE THE PARTICIPANT + dds_participant_ = eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->create_participant( + participant_configuration_->domain, + pqos, + this); + + dds_participant_->enable(); + + // Restore default DomainParticipantQoS (create enabled entities) after creating and enabling this participant + // WARNING: not thread safe at the moment of this writing, see note above. + eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->set_qos( + original_fact_qos); + + if (dds_participant_ == nullptr) + { + throw utils::InitializationException("Error creating DDS Participant."); + } + + dds_publisher_ = dds_participant_->create_publisher(fastdds::dds::PublisherQos()); + dds_subscriber_ = dds_participant_->create_subscriber(fastdds::dds::SubscriberQos()); +} + +core::types::Endpoint DdsCommonParticipant::simulate_endpoint_( + const DdsTopic& topic) const +{ + core::types::Endpoint endpoint; + endpoint.kind = core::types::EndpointKind::reader; + endpoint.guid = core::types::Guid::new_unique_guid(); + endpoint.topic = topic; + endpoint.discoverer_participant_id = this->id(); + + return endpoint; +} + +} /* namespace dds */ +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp new file mode 100644 index 000000000..d4b66a3ef --- /dev/null +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp @@ -0,0 +1,226 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { + +using namespace eprosima::ddspipe::core; +using namespace eprosima::ddspipe::core::types; +using namespace eprosima::fastrtps::types; + +DynTypesPublicationParticipant::DynTypesPublicationParticipant( + const std::shared_ptr& participant_configuration, + const std::shared_ptr& discovery_database) + : dds::DdsCommonParticipant(participant_configuration) +{ + // Create Internal Writer to receive Type Objects + // TODO: study why couldn't do with bind and try it, better than create a lambda + auto participant_callback = [this](ddspipe::core::IRoutingData& data) + { + return this->receive_type_object_(data); + }; + + type_object_writer_ = std::make_shared( + participant_configuration->id, + participant_callback); + + discovery_database->add_endpoint( + simulate_endpoint_(type_object_topic()) + ); +} + +DynTypesPublicationParticipant::~DynTypesPublicationParticipant() +{ + for (auto& writer : writers_) + { + dds_publisher_->delete_datawriter(writer.second.second); + dds_participant_->delete_topic(writer.second.first); + } +} + +std::shared_ptr DynTypesPublicationParticipant::create_writer( + const ITopic& topic) +{ + if (is_type_object_topic(topic)) + { + return this->type_object_writer_; + } + + // This participant does not write anything + return std::make_shared(); +} + +std::shared_ptr DynTypesPublicationParticipant::create_reader( + const ITopic& topic) +{ + // In case of a DDS topic, check if datawriter must be created, or already exist + if (topic.internal_type_discriminator() == INTERNAL_TOPIC_TYPE_RTPS) + { + std::lock_guard _(mutex_); + create_empty_datawriter_nts_(dynamic_cast(topic)); + } + + // This participant does not read anything + return std::make_shared(); +} + +utils::ReturnCode DynTypesPublicationParticipant::receive_type_object_( + eprosima::fastrtps::types::DynamicType_ptr dynamic_type) +{ + // All this function is protected + std::lock_guard _(mutex_); + + auto type_name = dynamic_type->get_name(); + + // First, check if the type already exist. If so, nothing to do + auto it = types_discovered_.find(type_name); + if (it != types_discovered_.end()) + { + return utils::ReturnCode::RETCODE_OK; + } + + // It is a new type, so add it to the types + types_discovered_[type_name] = dynamic_type; + + // Register Participant + eprosima::fastdds::dds::TypeSupport type(new eprosima::fastrtps::types::DynamicPubSubType(dynamic_type)); + type->auto_fill_type_information(true); + type->auto_fill_type_object(true); + dds_participant_->register_type(type); + + // Check if any topic already discovered uses this new type. If so create data writer + std::vector topics_to_create_writers; + for (const auto& writer : writers_) + { + if (writer.first.type_name == type_name) + { + // This should always be nullptr, but just in case + if (nullptr == writer.second.second) + { + topics_to_create_writers.push_back(writer.first); + } + } + } + + for (const auto& topic : topics_to_create_writers) + { + create_empty_datawriter_nts_(topic); + } + + return utils::ReturnCode::RETCODE_OK; +} + +void DynTypesPublicationParticipant::create_empty_datawriter_nts_( + const core::types::DdsTopic& topic) +{ + auto topic_name = topic.topic_name(); + auto type_name = topic.type_name; + + // Check if topic already exist + auto it_topic = writers_.find(topic); + + // If not, create it empty + if (it_topic == writers_.end()) + { + writers_[topic] = {nullptr, nullptr}; + it_topic = writers_.find(topic); + } + + // If writer already created, finish function + if (nullptr != it_topic->second.first) + { + return; + } + + // Writer is not created + // Check if type exists. If so, create writer, if not leave + auto it_type = types_discovered_.find(type_name); + if (it_type == types_discovered_.end()) + { + return; + } + + // Create topic + fastdds::dds::Topic* dds_topic = dds_participant_->create_topic( + topic.topic_name(), + topic.type_name, + default_topic_qos_(topic) + ); + + // Create writer + writers_[topic] = + { + dds_topic, + dds_publisher_->create_datawriter( + dds_topic, + default_empty_datawriter_qos_(topic)) + }; +} + +fastdds::dds::DataWriterQos +DynTypesPublicationParticipant::default_empty_datawriter_qos_( + const core::types::DdsTopic& topic) noexcept +{ + // TODO decide which qos to use. Using less restrictive + auto qos = fastdds::dds::DataWriterQos(); + qos.durability().kind = fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS; + qos.reliability().kind = fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS; + + return qos; +} + +fastdds::dds::TopicQos +DynTypesPublicationParticipant::default_topic_qos_( + const core::types::DdsTopic& topic) noexcept +{ + // TODO decide which qos to use + return fastdds::dds::TopicQos(); +} + +utils::ReturnCode DynTypesPublicationParticipant::receive_type_object_( + ddspipe::core::IRoutingData& data) +{ + // Assuming that data is of type required + auto& dynamic_type_data = dynamic_cast(data); + return receive_type_object_(dynamic_type_data.dynamic_type); +} + +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesSubscriptionParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesSubscriptionParticipant.cpp new file mode 100644 index 000000000..d7be10bba --- /dev/null +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesSubscriptionParticipant.cpp @@ -0,0 +1,166 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { + +using namespace eprosima::ddspipe::core; +using namespace eprosima::ddspipe::core::types; +using namespace eprosima::fastrtps::types; + +DynTypesSubscriptionParticipant::DynTypesSubscriptionParticipant( + const std::shared_ptr& participant_configuration, + const std::shared_ptr& discovery_database) + : dds::DdsCommonParticipant(participant_configuration) + , type_object_reader_(std::make_shared( + participant_configuration->id)) +{ + discovery_database->add_endpoint( + simulate_endpoint_(type_object_topic()) + ); +} + +std::shared_ptr DynTypesSubscriptionParticipant::create_writer( + const ITopic& topic) +{ + // This participant does not write anything + return std::make_shared(); +} + +std::shared_ptr DynTypesSubscriptionParticipant::create_reader( + const ITopic& topic) +{ + if (is_type_object_topic(topic)) + { + return this->type_object_reader_; + } + + // This participant does not read anything + return std::make_shared(); +} + +void DynTypesSubscriptionParticipant::on_type_discovery( + eprosima::fastdds::dds::DomainParticipant* /* participant */, + const fastrtps::rtps::SampleIdentity& /* request_sample_id */, + const fastrtps::string_255& /* topic */, + const fastrtps::types::TypeIdentifier* identifier, + const fastrtps::types::TypeObject* object, + fastrtps::types::DynamicType_ptr dyn_type) +{ + if (nullptr != dyn_type) + { + // Register type obj in singleton factory + TypeObjectFactory::get_instance()->add_type_object( + dyn_type->get_name(), identifier, object); + internal_notify_type_object_(dyn_type); + } +} + +void DynTypesSubscriptionParticipant::on_type_information_received( + eprosima::fastdds::dds::DomainParticipant* participant, + const fastrtps::string_255 /* topic_name */, + const fastrtps::string_255 type_name, + const fastrtps::types::TypeInformation& type_information) +{ + std::string type_name_ = type_name.to_string(); + const TypeIdentifier* type_identifier = nullptr; + const TypeObject* type_object = nullptr; + DynamicType_ptr dynamic_type(nullptr); + + // Check if complete identifier already present in factory + type_identifier = TypeObjectFactory::get_instance()->get_type_identifier(type_name_, true); + if (type_identifier) + { + type_object = TypeObjectFactory::get_instance()->get_type_object(type_name_, true); + } + + // If complete not found, try with minimal + if (!type_object) + { + type_identifier = TypeObjectFactory::get_instance()->get_type_identifier(type_name_, false); + if (type_identifier) + { + type_object = TypeObjectFactory::get_instance()->get_type_object(type_name_, false); + } + } + + // Build dynamic type if type identifier and object found in factory + if (type_identifier && type_object) + { + dynamic_type = TypeObjectFactory::get_instance()->build_dynamic_type(type_name_, type_identifier, type_object); + } + + // Request type object through TypeLookup if not present in factory, or if type building failed + if (!dynamic_type) + { + std::function callback( + [this] + (const std::string& /* type_name */, const DynamicType_ptr type) + { + this->internal_notify_type_object_(type); + }); + // Registering type and creating reader + participant->register_remote_type( + type_information, + type_name_, + callback); + } + else + { + internal_notify_type_object_(dynamic_type); + } +} + +void DynTypesSubscriptionParticipant::internal_notify_type_object_( + eprosima::fastrtps::types::DynamicType_ptr dynamic_type) +{ + logInfo(DDSPIPE_DYNTYPES_PARTICIPANT, + "Participant " << this->id() << " discovered type object " << dynamic_type->get_name()); + + // Create data containing Dynamic Type + auto data = std::make_unique(); + data->dynamic_type = dynamic_type; // TODO: add constructor with param + + // Insert new data in internal reader queue + type_object_reader_->simulate_data_reception(std::move(data)); +} + +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ diff --git a/ddspipe_participants/src/cpp/writer/auxiliar/InternalWriter.cpp b/ddspipe_participants/src/cpp/writer/auxiliar/InternalWriter.cpp new file mode 100644 index 000000000..e26813e37 --- /dev/null +++ b/ddspipe_participants/src/cpp/writer/auxiliar/InternalWriter.cpp @@ -0,0 +1,38 @@ +// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace eprosima { +namespace ddspipe { +namespace participants { + +InternalWriter::InternalWriter( + const ddspipe::core::types::ParticipantId& participant_id, + const std::function& callback) + : ddspipe::participants::BaseWriter(participant_id) + , callback_(callback) +{ + // Do nothing +} + +utils::ReturnCode InternalWriter::write_nts_( + ddspipe::core::IRoutingData& data) noexcept +{ + return callback_(data); +} + +} /* namespace participants */ +} /* namespace ddspipe */ +} /* namespace eprosima */ From 38a522bdf2c2227063f424335d3ebc0a5752f1df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20L=C3=B3pez=20Fern=C3=A1ndez?= Date: Mon, 24 Apr 2023 18:10:53 +0200 Subject: [PATCH 08/15] Disable type information/object filling for RTPS entities MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juan López Fernández --- ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp | 4 ++++ ddspipe_participants/src/cpp/writer/rtps/CommonWriter.cpp | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp b/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp index b74bb87a4..db6e23d4c 100644 --- a/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp +++ b/ddspipe_participants/src/cpp/reader/rtps/CommonReader.cpp @@ -365,6 +365,10 @@ fastrtps::TopicAttributes CommonReader::reckon_topic_attributes_( att.historyQos.kind = eprosima::fastdds::dds::HistoryQosPolicyKind::KEEP_LAST_HISTORY_QOS; att.historyQos.depth = topic.topic_qos.history_depth; + // Disable type object/information filling + att.auto_fill_type_information = false; + att.auto_fill_type_object = false; + return att; } diff --git a/ddspipe_participants/src/cpp/writer/rtps/CommonWriter.cpp b/ddspipe_participants/src/cpp/writer/rtps/CommonWriter.cpp index ba4eac76c..9cce51ad7 100644 --- a/ddspipe_participants/src/cpp/writer/rtps/CommonWriter.cpp +++ b/ddspipe_participants/src/cpp/writer/rtps/CommonWriter.cpp @@ -378,6 +378,10 @@ fastrtps::TopicAttributes CommonWriter::reckon_topic_attributes_( att.topicName = topic.m_topic_name; att.topicDataType = topic.type_name; + // Disable type object/information filling + att.auto_fill_type_information = false; + att.auto_fill_type_object = false; + return att; } From 4a274e2a16eb754d07cabd8b72fd61d593e4f8d6 Mon Sep 17 00:00:00 2001 From: jparisu Date: Tue, 25 Apr 2023 15:48:55 +0200 Subject: [PATCH 09/15] Fix closure seg fault Signed-off-by: jparisu --- .../dynamic_types/DynTypesPublicationParticipant.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp index d4b66a3ef..1f03a0230 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp @@ -69,8 +69,11 @@ DynTypesPublicationParticipant::~DynTypesPublicationParticipant() { for (auto& writer : writers_) { - dds_publisher_->delete_datawriter(writer.second.second); - dds_participant_->delete_topic(writer.second.first); + if (nullptr != writer.second.second) + { + dds_publisher_->delete_datawriter(writer.second.second); + dds_participant_->delete_topic(writer.second.first); + } } } From a2b3af1e4c8a83f9973c027b8b0acc94be6574bf Mon Sep 17 00:00:00 2001 From: jparisu Date: Tue, 25 Apr 2023 15:51:37 +0200 Subject: [PATCH 10/15] Add logs to DynTypes pub Signed-off-by: jparisu --- .../dynamic_types/DynTypesPublicationParticipant.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp index 1f03a0230..b2b57b9fe 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp @@ -111,6 +111,8 @@ utils::ReturnCode DynTypesPublicationParticipant::receive_type_object_( auto type_name = dynamic_type->get_name(); + logInfo(DDSPIPE_DYNTYPES_DDSPARTICIPANT, "Received type object for type " << type_name); + // First, check if the type already exist. If so, nothing to do auto it = types_discovered_.find(type_name); if (it != types_discovered_.end()) @@ -194,6 +196,8 @@ void DynTypesPublicationParticipant::create_empty_datawriter_nts_( dds_topic, default_empty_datawriter_qos_(topic)) }; + + logInfo(DDSPIPE_DYNTYPES_DDSPARTICIPANT, "Created writer for topic " << topic); } fastdds::dds::DataWriterQos From afd31e791d8df3b413103a64324754633af044dc Mon Sep 17 00:00:00 2001 From: jparisu Date: Tue, 25 Apr 2023 16:38:40 +0200 Subject: [PATCH 11/15] Fix topic qos on DDS writers associated with topic Signed-off-by: jparisu --- .../dynamic_types/DynTypesPublicationParticipant.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp index b2b57b9fe..7ac0c1573 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp @@ -206,8 +206,16 @@ DynTypesPublicationParticipant::default_empty_datawriter_qos_( { // TODO decide which qos to use. Using less restrictive auto qos = fastdds::dds::DataWriterQos(); - qos.durability().kind = fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS; - qos.reliability().kind = fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS; + qos.durability().kind = + ( topic.topic_qos.is_transient_local() ? + fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS : + fastdds::dds::DurabilityQosPolicyKind::VOLATILE_DURABILITY_QOS + ); + qos.reliability().kind = + ( topic.topic_qos.is_reliable() ? + fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS : + fastdds::dds::ReliabilityQosPolicyKind::BEST_EFFORT_RELIABILITY_QOS + ); return qos; } From bb32efe43f66eeaf0282852c37f832ccf68d6a8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20L=C3=B3pez=20Fern=C3=A1ndez?= Date: Thu, 27 Apr 2023 09:10:19 +0200 Subject: [PATCH 12/15] Remove redundant simulate_endpoint method MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juan López Fernández --- .../participant/dds/DdsCommonParticipant.hpp | 4 ---- .../src/cpp/participant/dds/DdsCommonParticipant.cpp | 12 ------------ .../dynamic_types/DynTypesPublicationParticipant.cpp | 4 +++- .../DynTypesSubscriptionParticipant.cpp | 5 ++++- 4 files changed, 7 insertions(+), 18 deletions(-) diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dds/DdsCommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dds/DdsCommonParticipant.hpp index 4330e902a..0c77954b0 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/dds/DdsCommonParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/dds/DdsCommonParticipant.hpp @@ -62,10 +62,6 @@ class DdsCommonParticipant : public core::IParticipant , public eprosima::fastdd DdsCommonParticipant( const std::shared_ptr& participant_configuration); - DDSPIPE_PARTICIPANTS_DllAPI - core::types::Endpoint simulate_endpoint_( - const core::types::DdsTopic& topic) const; - eprosima::fastdds::dds::DomainParticipant* dds_participant_; eprosima::fastdds::dds::Publisher* dds_publisher_; eprosima::fastdds::dds::Subscriber* dds_subscriber_; diff --git a/ddspipe_participants/src/cpp/participant/dds/DdsCommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/dds/DdsCommonParticipant.cpp index a77bcd8cf..9b11c10e5 100644 --- a/ddspipe_participants/src/cpp/participant/dds/DdsCommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dds/DdsCommonParticipant.cpp @@ -181,18 +181,6 @@ void DdsCommonParticipant::init() dds_subscriber_ = dds_participant_->create_subscriber(fastdds::dds::SubscriberQos()); } -core::types::Endpoint DdsCommonParticipant::simulate_endpoint_( - const DdsTopic& topic) const -{ - core::types::Endpoint endpoint; - endpoint.kind = core::types::EndpointKind::reader; - endpoint.guid = core::types::Guid::new_unique_guid(); - endpoint.topic = topic; - endpoint.discoverer_participant_id = this->id(); - - return endpoint; -} - } /* namespace dds */ } /* namespace participants */ } /* namespace ddspipe */ diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp index 7ac0c1573..dc98c0cdb 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp @@ -29,11 +29,13 @@ #include +#include #include #include #include #include #include + #include namespace eprosima { @@ -61,7 +63,7 @@ DynTypesPublicationParticipant::DynTypesPublicationParticipant( participant_callback); discovery_database->add_endpoint( - simulate_endpoint_(type_object_topic()) + rtps::CommonParticipant::simulate_endpoint(type_object_topic(), this->id()) ); } diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesSubscriptionParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesSubscriptionParticipant.cpp index d7be10bba..fb48b3193 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesSubscriptionParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesSubscriptionParticipant.cpp @@ -30,10 +30,13 @@ #include #include + +#include #include #include #include #include + #include namespace eprosima { @@ -52,7 +55,7 @@ DynTypesSubscriptionParticipant::DynTypesSubscriptionParticipant( participant_configuration->id)) { discovery_database->add_endpoint( - simulate_endpoint_(type_object_topic()) + rtps::CommonParticipant::simulate_endpoint(type_object_topic(), this->id()) ); } From 326df6848564faaa1cc5c50b06dc5ec1909ced81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20L=C3=B3pez=20Fern=C3=A1ndez?= Date: Mon, 29 May 2023 14:05:13 +0200 Subject: [PATCH 13/15] Fix rebase issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juan López Fernández --- .../participant/rtps/CommonParticipant.hpp | 13 +- .../participant/rtps/SimpleParticipant.hpp | 3 - .../participant/dds/DdsCommonParticipant.cpp | 29 ++- .../participant/rtps/CommonParticipant.cpp | 216 +++++++++--------- 4 files changed, 135 insertions(+), 126 deletions(-) diff --git a/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp index 81d5fcbc5..30129cf38 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/rtps/CommonParticipant.hpp @@ -158,6 +158,15 @@ class CommonParticipant const core::types::DdsTopic& topic, const core::types::ParticipantId& discoverer_id); + /** + * @brief Create a transport descriptor with whitelist and type given by specialization. + * + */ + template + DDSPIPE_PARTICIPANTS_DllAPI + static std::shared_ptr create_descriptor_( + std::set whitelist = {}); + protected: /** @@ -204,10 +213,6 @@ class CommonParticipant static fastrtps::rtps::RTPSParticipantAttributes reckon_participant_attributes_( const ParticipantConfiguration* participant_configuration); - template - static std::shared_ptr create_descriptor_( - std::set whitelist = {}); - ///// // VARIABLES diff --git a/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp index 77d4cf1f9..ee18060e0 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/rtps/SimpleParticipant.hpp @@ -47,9 +47,6 @@ class SimpleParticipant : public CommonParticipant const std::shared_ptr& payload_pool, const std::shared_ptr& discovery_database); - static std::shared_ptr configure_upd_transport_( - std::set whitelist = {}); - protected: /** diff --git a/ddspipe_participants/src/cpp/participant/dds/DdsCommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/dds/DdsCommonParticipant.cpp index 9b11c10e5..f3efd4a26 100644 --- a/ddspipe_participants/src/cpp/participant/dds/DdsCommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dds/DdsCommonParticipant.cpp @@ -30,7 +30,8 @@ #include -#include +#include + #include namespace eprosima { @@ -85,7 +86,7 @@ void DdsCommonParticipant::init() pqos.wire_protocol().builtin.typelookup_config.use_client = true; // Configure Participant transports - if (participant_configuration_->transport == participants::types::TransportProtocol::builtin) + if (participant_configuration_->transport == core::types::TransportDescriptors::builtin) { if (!participant_configuration_->whitelist.empty()) { @@ -96,11 +97,12 @@ void DdsCommonParticipant::init() pqos.transport().user_transports.push_back(shm_transport); std::shared_ptr udp_transport = - rtps::SimpleParticipant::configure_upd_transport_(participant_configuration_->whitelist); + rtps::CommonParticipant::create_descriptor_( + participant_configuration_->whitelist); pqos.transport().user_transports.push_back(udp_transport); } } - else if (participant_configuration_->transport == participants::types::TransportProtocol::shm) + else if (participant_configuration_->transport == core::types::TransportDescriptors::shm_only) { pqos.transport().use_builtin_transports = false; @@ -108,13 +110,14 @@ void DdsCommonParticipant::init() std::make_shared(); pqos.transport().user_transports.push_back(shm_transport); } - else if (participant_configuration_->transport == participants::types::TransportProtocol::udp) + else if (participant_configuration_->transport == core::types::TransportDescriptors::udp_only) { pqos.transport().use_builtin_transports = false; std::shared_ptr udp_transport = - rtps::SimpleParticipant::configure_upd_transport_(participant_configuration_->whitelist); - pqos.transport().user_transports.push_back(udp_transport); + rtps::CommonParticipant::create_descriptor_( + participant_configuration_->whitelist); + pqos.transport().user_transports.push_back(udp_transport); pqos.transport().user_transports.push_back(udp_transport); } @@ -122,16 +125,20 @@ void DdsCommonParticipant::init() switch (participant_configuration_->ignore_participant_flags) { case core::types::IgnoreParticipantFlags::no_filter: - pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::NO_FILTER; + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::NO_FILTER; break; case core::types::IgnoreParticipantFlags::filter_different_host: - pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_HOST; + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_HOST; break; case core::types::IgnoreParticipantFlags::filter_different_process: - pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS; + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_DIFFERENT_PROCESS; break; case core::types::IgnoreParticipantFlags::filter_same_process: - pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_SAME_PROCESS; + pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = + eprosima::fastrtps::rtps::ParticipantFilteringFlags_t::FILTER_SAME_PROCESS; break; case core::types::IgnoreParticipantFlags::filter_different_and_same_process: pqos.wire_protocol().builtin.discovery_config.ignoreParticipantFlags = diff --git a/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp b/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp index af34101f0..709fa2a3e 100644 --- a/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/rtps/CommonParticipant.cpp @@ -280,6 +280,114 @@ core::types::Endpoint CommonParticipant::simulate_endpoint( return endpoint; } +template<> +std::shared_ptr +CommonParticipant::create_descriptor_( + std::set whitelist) +{ + std::shared_ptr udp_transport = + std::make_shared(); + + for (const types::IpType& ip : whitelist) + { + if (types::Address::is_ipv4_correct(ip)) + { + udp_transport->interfaceWhiteList.emplace_back(ip); + logInfo(DDSPIPE_COMMON_PARTICIPANT, + "Adding " << ip << " to UDP whitelist interfaces."); + } + else + { + // Invalid address, continue with next one + logWarning(DDSPIPE_COMMON_PARTICIPANT, + "Not valid IPv4. Discarding UDP whitelist interface " << ip << "."); + } + } + + return udp_transport; +} + +template<> +std::shared_ptr +CommonParticipant::create_descriptor_( + std::set whitelist) +{ + std::shared_ptr udp_transport = + std::make_shared(); + + for (const types::IpType& ip : whitelist) + { + if (types::Address::is_ipv6_correct(ip)) + { + udp_transport->interfaceWhiteList.emplace_back(ip); + logInfo(DDSPIPE_COMMON_PARTICIPANT, + "Adding " << ip << " to UDP whitelist interfaces."); + } + else + { + // Invalid address, continue with next one + logWarning(DDSPIPE_COMMON_PARTICIPANT, + "Not valid IPv6. Discarding UDP whitelist interface " << ip << "."); + } + } + + return udp_transport; +} + +template<> +std::shared_ptr +CommonParticipant::create_descriptor_( + std::set whitelist) +{ + std::shared_ptr tcp_transport = + std::make_shared(); + + for (const types::IpType& ip : whitelist) + { + if (types::Address::is_ipv4_correct(ip)) + { + tcp_transport->interfaceWhiteList.emplace_back(ip); + logInfo(DDSPIPE_COMMON_PARTICIPANT, + "Adding " << ip << " to TCP whitelist interfaces."); + } + else + { + // Invalid address, continue with next one + logWarning(DDSPIPE_COMMON_PARTICIPANT, + "Not valid IPv4. Discarding TCP whitelist interface " << ip << "."); + } + } + + return tcp_transport; +} + +template<> +std::shared_ptr +CommonParticipant::create_descriptor_( + std::set whitelist) +{ + std::shared_ptr tcp_transport = + std::make_shared(); + + for (const types::IpType& ip : whitelist) + { + if (types::Address::is_ipv6_correct(ip)) + { + tcp_transport->interfaceWhiteList.emplace_back(ip); + logInfo(DDSPIPE_COMMON_PARTICIPANT, + "Adding " << ip << " to TCP whitelist interfaces."); + } + else + { + // Invalid address, continue with next one + logWarning(DDSPIPE_COMMON_PARTICIPANT, + "Not valid IPv6. Discarding TCP whitelist interface " << ip << "."); + } + } + + return tcp_transport; +} + bool CommonParticipant::is_repeater() const noexcept { return configuration_->is_repeater; @@ -450,114 +558,6 @@ CommonParticipant::reckon_participant_attributes_( return params; } -template<> -std::shared_ptr -CommonParticipant::create_descriptor_( - std::set whitelist) -{ - std::shared_ptr udp_transport = - std::make_shared(); - - for (const types::IpType& ip : whitelist) - { - if (types::Address::is_ipv4_correct(ip)) - { - udp_transport->interfaceWhiteList.emplace_back(ip); - logInfo(DDSPIPE_COMMON_PARTICIPANT, - "Adding " << ip << " to UDP whitelist interfaces."); - } - else - { - // Invalid address, continue with next one - logWarning(DDSPIPE_COMMON_PARTICIPANT, - "Not valid IPv4. Discarding UDP whitelist interface " << ip << "."); - } - } - - return udp_transport; -} - -template<> -std::shared_ptr -CommonParticipant::create_descriptor_( - std::set whitelist) -{ - std::shared_ptr udp_transport = - std::make_shared(); - - for (const types::IpType& ip : whitelist) - { - if (types::Address::is_ipv6_correct(ip)) - { - udp_transport->interfaceWhiteList.emplace_back(ip); - logInfo(DDSPIPE_COMMON_PARTICIPANT, - "Adding " << ip << " to UDP whitelist interfaces."); - } - else - { - // Invalid address, continue with next one - logWarning(DDSPIPE_COMMON_PARTICIPANT, - "Not valid IPv6. Discarding UDP whitelist interface " << ip << "."); - } - } - - return udp_transport; -} - -template<> -std::shared_ptr -CommonParticipant::create_descriptor_( - std::set whitelist) -{ - std::shared_ptr tcp_transport = - std::make_shared(); - - for (const types::IpType& ip : whitelist) - { - if (types::Address::is_ipv4_correct(ip)) - { - tcp_transport->interfaceWhiteList.emplace_back(ip); - logInfo(DDSPIPE_COMMON_PARTICIPANT, - "Adding " << ip << " to TCP whitelist interfaces."); - } - else - { - // Invalid address, continue with next one - logWarning(DDSPIPE_COMMON_PARTICIPANT, - "Not valid IPv4. Discarding TCP whitelist interface " << ip << "."); - } - } - - return tcp_transport; -} - -template<> -std::shared_ptr -CommonParticipant::create_descriptor_( - std::set whitelist) -{ - std::shared_ptr tcp_transport = - std::make_shared(); - - for (const types::IpType& ip : whitelist) - { - if (types::Address::is_ipv6_correct(ip)) - { - tcp_transport->interfaceWhiteList.emplace_back(ip); - logInfo(DDSPIPE_COMMON_PARTICIPANT, - "Adding " << ip << " to TCP whitelist interfaces."); - } - else - { - // Invalid address, continue with next one - logWarning(DDSPIPE_COMMON_PARTICIPANT, - "Not valid IPv6. Discarding TCP whitelist interface " << ip << "."); - } - } - - return tcp_transport; -} - } /* namespace rtps */ } /* namespace participants */ } /* namespace ddspipe */ From 26b5861c3d604753e15949dedb15efc1ec0f8cfb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20L=C3=B3pez=20Fern=C3=A1ndez?= Date: Mon, 29 May 2023 14:22:13 +0200 Subject: [PATCH 14/15] Set Datawriters ownership QoS MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juan López Fernández --- .../dynamic_types/DynTypesPublicationParticipant.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp index dc98c0cdb..dbd2355ff 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp @@ -218,6 +218,11 @@ DynTypesPublicationParticipant::default_empty_datawriter_qos_( fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS : fastdds::dds::ReliabilityQosPolicyKind::BEST_EFFORT_RELIABILITY_QOS ); + qos.ownership().kind = + ( topic.topic_qos.has_ownership() ? + fastdds::dds::OwnershipQosPolicyKind::EXCLUSIVE_OWNERSHIP_QOS : + fastdds::dds::OwnershipQosPolicyKind::SHARED_OWNERSHIP_QOS + ); return qos; } From ed5405ee7991cd1dd97b193d14d678fecb4a7e1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20L=C3=B3pez=20Fern=C3=A1ndez?= Date: Mon, 29 May 2023 14:23:08 +0200 Subject: [PATCH 15/15] Uncrustify MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juan López Fernández --- .../participant/dds/DdsCommonParticipant.hpp | 4 ++-- .../DynTypesPublicationParticipant.hpp | 6 +++--- .../DynTypesPublicationParticipant.cpp | 20 +++++++++---------- .../DynTypesSubscriptionParticipant.cpp | 4 ++-- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dds/DdsCommonParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dds/DdsCommonParticipant.hpp index 0c77954b0..a03fea8b6 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/dds/DdsCommonParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/dds/DdsCommonParticipant.hpp @@ -35,7 +35,7 @@ namespace dds { /** * TODO comment */ -class DdsCommonParticipant : public core::IParticipant , public eprosima::fastdds::dds::DomainParticipantListener +class DdsCommonParticipant : public core::IParticipant, public eprosima::fastdds::dds::DomainParticipantListener { public: @@ -60,7 +60,7 @@ class DdsCommonParticipant : public core::IParticipant , public eprosima::fastdd // TODO DDSPIPE_PARTICIPANTS_DllAPI DdsCommonParticipant( - const std::shared_ptr& participant_configuration); + const std::shared_ptr& participant_configuration); eprosima::fastdds::dds::DomainParticipant* dds_participant_; eprosima::fastdds::dds::Publisher* dds_publisher_; diff --git a/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesPublicationParticipant.hpp b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesPublicationParticipant.hpp index 03d7f81e5..a086cb1ce 100644 --- a/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesPublicationParticipant.hpp +++ b/ddspipe_participants/include/ddspipe_participants/participant/dynamic_types/DynTypesPublicationParticipant.hpp @@ -71,19 +71,19 @@ class DynTypesPublicationParticipant : public dds::DdsCommonParticipant DDSPIPE_PARTICIPANTS_DllAPI void create_empty_datawriter_nts_( - const core::types::DdsTopic& topic); + const core::types::DdsTopic& topic); static DDSPIPE_PARTICIPANTS_DllAPI fastdds::dds::DataWriterQos default_empty_datawriter_qos_( - const core::types::DdsTopic& topic) noexcept; + const core::types::DdsTopic& topic) noexcept; static DDSPIPE_PARTICIPANTS_DllAPI fastdds::dds::TopicQos default_topic_qos_( - const core::types::DdsTopic& topic) noexcept; + const core::types::DdsTopic& topic) noexcept; std::map< core::types::DdsTopic, diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp index dbd2355ff..b87e18c3c 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesPublicationParticipant.cpp @@ -64,7 +64,7 @@ DynTypesPublicationParticipant::DynTypesPublicationParticipant( discovery_database->add_endpoint( rtps::CommonParticipant::simulate_endpoint(type_object_topic(), this->id()) - ); + ); } DynTypesPublicationParticipant::~DynTypesPublicationParticipant() @@ -188,7 +188,7 @@ void DynTypesPublicationParticipant::create_empty_datawriter_nts_( topic.topic_name(), topic.type_name, default_topic_qos_(topic) - ); + ); // Create writer writers_[topic] = @@ -204,32 +204,32 @@ void DynTypesPublicationParticipant::create_empty_datawriter_nts_( fastdds::dds::DataWriterQos DynTypesPublicationParticipant::default_empty_datawriter_qos_( - const core::types::DdsTopic& topic) noexcept + const core::types::DdsTopic& topic) noexcept { // TODO decide which qos to use. Using less restrictive auto qos = fastdds::dds::DataWriterQos(); qos.durability().kind = - ( topic.topic_qos.is_transient_local() ? + ( topic.topic_qos.is_transient_local() ? fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS : fastdds::dds::DurabilityQosPolicyKind::VOLATILE_DURABILITY_QOS - ); + ); qos.reliability().kind = - ( topic.topic_qos.is_reliable() ? + ( topic.topic_qos.is_reliable() ? fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS : fastdds::dds::ReliabilityQosPolicyKind::BEST_EFFORT_RELIABILITY_QOS - ); + ); qos.ownership().kind = - ( topic.topic_qos.has_ownership() ? + ( topic.topic_qos.has_ownership() ? fastdds::dds::OwnershipQosPolicyKind::EXCLUSIVE_OWNERSHIP_QOS : fastdds::dds::OwnershipQosPolicyKind::SHARED_OWNERSHIP_QOS - ); + ); return qos; } fastdds::dds::TopicQos DynTypesPublicationParticipant::default_topic_qos_( - const core::types::DdsTopic& topic) noexcept + const core::types::DdsTopic& topic) noexcept { // TODO decide which qos to use return fastdds::dds::TopicQos(); diff --git a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesSubscriptionParticipant.cpp b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesSubscriptionParticipant.cpp index fb48b3193..bc09c9035 100644 --- a/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesSubscriptionParticipant.cpp +++ b/ddspipe_participants/src/cpp/participant/dynamic_types/DynTypesSubscriptionParticipant.cpp @@ -52,11 +52,11 @@ DynTypesSubscriptionParticipant::DynTypesSubscriptionParticipant( const std::shared_ptr& discovery_database) : dds::DdsCommonParticipant(participant_configuration) , type_object_reader_(std::make_shared( - participant_configuration->id)) + participant_configuration->id)) { discovery_database->add_endpoint( rtps::CommonParticipant::simulate_endpoint(type_object_topic(), this->id()) - ); + ); } std::shared_ptr DynTypesSubscriptionParticipant::create_writer(