diff --git a/amqp/public_include/fty/messagebus2/amqp/MessageBusAmqp.h b/amqp/public_include/fty/messagebus2/amqp/MessageBusAmqp.h index de028c7..d2adbc4 100644 --- a/amqp/public_include/fty/messagebus2/amqp/MessageBusAmqp.h +++ b/amqp/public_include/fty/messagebus2/amqp/MessageBusAmqp.h @@ -27,40 +27,42 @@ namespace fty::messagebus2::amqp { // Default amqp end point static auto constexpr DEFAULT_ENDPOINT{"amqp://127.0.0.1:5672"}; - static auto constexpr BUS_IDENTITY{"AMQP"}; -static const std::string TOPIC_PREFIX = "topic://"; -static const std::string QUEUE_PREFIX = "queue://"; -class MsgBusAmqp; +class AmqpClient; class MessageBusAmqp final : public MessageBus { public: - MessageBusAmqp(const ClientName& clientName = utils::getClientId("MessageBusAmqp"), const Endpoint& endpoint = DEFAULT_ENDPOINT); - + MessageBusAmqp(const std::string& clientName = utils::getClientId("MessageBusAmqp"), const Endpoint& endpoint = DEFAULT_ENDPOINT); ~MessageBusAmqp() = default; MessageBusAmqp(MessageBusAmqp&&) = delete; - MessageBusAmqp& operator=(MessageBusAmqp&&) = delete; - MessageBusAmqp(const MessageBusAmqp&) = delete; - MessageBusAmqp& operator=(const MessageBusAmqp&) = delete; + MessageBusAmqp& operator = (MessageBusAmqp&&) = delete; + MessageBusAmqp(const MessageBusAmqp&) = delete; + MessageBusAmqp& operator = (const MessageBusAmqp&) = delete; [[nodiscard]] fty::Expected connect() noexcept override; [[nodiscard]] fty::Expected send(const Message& msg) noexcept override; [[nodiscard]] fty::Expected receive( - const Address& address, MessageListener&& func, const std::string& filter = {}) noexcept override; - [[nodiscard]] fty::Expected unreceive(const Address& address) noexcept override; - [[nodiscard]] fty::Expected request(const Message& msg, int timeOut) noexcept override; - - void setConnectionErrorListener(ConnectionErrorListener errorListener); - + const Address& address, MessageListener&& messageListener, const std::string& filter = {}) noexcept override; + [[nodiscard]] fty::Expected unreceive(const Address& address) noexcept override; + void setConnectionErrorListener(ConnectionErrorListener errorListener = {}); + // Sync request with timeout + [[nodiscard]] fty::Expected request(const Message& message, int timeoutInSeconds) noexcept override; [[nodiscard]] const ClientName& clientName() const noexcept override; - [[nodiscard]] const Identity& identity() const noexcept override; + [[nodiscard]] const Identity& identity() const noexcept override; - using MessageBus::receive; private: - std::shared_ptr m_busAmqp; + // Test if the service is available or not + bool isServiceAvailable(); + + // Client name + std::string m_clientName{}; + // Amqp endpoint + Endpoint m_endpoint{}; + // AmqpClient instance + std::shared_ptr m_clientPtr; }; } // namespace fty::messagebus2::amqp diff --git a/amqp/src/AmqpClient.cpp b/amqp/src/AmqpClient.cpp index 4260e37..63613fa 100644 --- a/amqp/src/AmqpClient.cpp +++ b/amqp/src/AmqpClient.cpp @@ -50,7 +50,7 @@ namespace fty::messagebus2::amqp { using namespace fty::messagebus2; using MessageListener = fty::messagebus2::MessageListener; -static auto constexpr TIMEOUT = std::chrono::seconds(5); +static auto constexpr TIMEOUT_MS = 5000; AmqpClient::AmqpClient(const Endpoint& url) : m_url(url), m_pool(std::make_shared(10)) @@ -68,7 +68,7 @@ void AmqpClient::on_container_start(proton::container& container) container.connect(m_url, connectOpts().reconnect(reconnectOpts())); } catch (const std::exception& e) { logError("Exception {}", e.what()); - m_connectPromise.set_value(ComState::ConnectFailed); + m_connectPromise.setValue(ComState::ConnectFailed); } } @@ -87,19 +87,13 @@ void AmqpClient::on_connection_open(proton::connection& connection) } else { logDebug("Connected on url: {}", m_url); } - m_connectPromise.set_value(ComState::Connected); + m_connectPromise.setValue(ComState::Connected); } void AmqpClient::on_connection_close(proton::connection&) { logDebug("Close connection ..."); - // TODO: To rework with common promise class (protection against promise already satisfied) - try { - m_deconnectPromise.set_value(); - } - catch (const std::exception& e) { - logWarn("m_deconnectPromise error: {}", e.what()); - } + m_deconnectPromise.setValue(); } void AmqpClient::on_connection_error(proton::connection& connection) @@ -137,7 +131,7 @@ void AmqpClient::on_sendable(proton::sender& sender) sender.send(m_message); sender.close(); // TODO: To rework with common promise class (protection against promise already satisfied) - m_promiseSender.set_value(); + m_promiseSender.setValue(); logDebug("Message sent"); } catch (const std::exception& e) { @@ -154,26 +148,14 @@ void AmqpClient::on_sender_close(proton::sender&) void AmqpClient::on_receiver_open(proton::receiver& receiver) { logDebug("Waiting any message on target address: {}", receiver.source().address()); - // TODO: To rework with common promise class (protection against promise already satisfied) - try { - m_promiseReceiver.set_value(); - } - catch (const std::exception& e) { - logWarn("m_promiseReceiver error: {}", e.what()); - } + m_promiseReceiver.setValue(); } void AmqpClient::on_receiver_close(proton::receiver&) { logDebug("Close receiver ..."); // TODO: Wait stop receiver doesn't work as expected: exception !!!! - //try { - // TODO: To rework with common promise class (protection against promise already satisfied) - //m_promiseReceiver.set_value(); - //} - //catch (const std::exception& e) { - // logWarn("m_promiseReceiver error: {}", e.what()); - //} + //m_promiseReceiver.setValue(); } void AmqpClient::on_error(const proton::error_condition& error) @@ -185,7 +167,7 @@ void AmqpClient::on_transport_error(proton::transport& transport) { logError("Transport error: {}", transport.error().what()); // Reset connect promise in case of send or receive arrived before connection open - m_connectPromise = std::promise(); + m_connectPromise.reset(); m_communicationState = ComState::Lost; } @@ -198,33 +180,29 @@ void AmqpClient::resetPromises() { std::lock_guard lock(m_lock); logDebug("Reset all promises"); - m_connectPromise = std::promise(); - m_deconnectPromise = std::promise(); - m_promiseSender = std::promise(); - m_promiseReceiver = std::promise(); + m_connectPromise.reset(); + m_deconnectPromise.reset(); + m_promiseSender.reset(); + m_promiseReceiver.reset(); } bool AmqpClient::isConnected() { - // Wait communication was restored + // test if connected return (m_connection && m_connection.active() && connected() == ComState::Connected); } ComState AmqpClient::connected() { - //logTrace("AmqpClient::connected m_communicationState={}", m_communicationState); + // Wait communication was restored if (m_communicationState != ComState::Connected) { - auto connectFuture = m_connectPromise.get_future(); - if (connectFuture.wait_for(TIMEOUT) != std::future_status::timeout) { - try { - m_communicationState = connectFuture.get(); - } catch (const std::future_error& e) { - logError("Caught future error {}", e.what()); + if (m_connectPromise.waitFor(TIMEOUT_MS)) { + if (auto value = m_connectPromise.getValue(); value) { + m_communicationState = *value; } } else { m_communicationState = ComState::ConnectFailed; } } - //logTrace("AmqpClient::connected m_communicationState={}", m_communicationState); return m_communicationState; } @@ -238,7 +216,7 @@ DeliveryState AmqpClient::send(const proton::message& msg) auto deliveryState = DeliveryState::Rejected; if (isConnected()) { std::lock_guard lock(m_lockMain); - m_promiseSender = std::promise(); + m_promiseSender.reset(); logDebug("Sending message to {} ...", msg.to()); m_message.clear(); m_message = msg; @@ -246,7 +224,7 @@ DeliveryState AmqpClient::send(const proton::message& msg) m_connection.open_sender(msg.to()); }); // Wait to know if the message has been sent or not - if (m_promiseSender.get_future().wait_for(TIMEOUT) != std::future_status::timeout) { + if (m_promiseSender.waitFor(TIMEOUT_MS)) { deliveryState = DeliveryState::Accepted; } } @@ -259,7 +237,7 @@ DeliveryState AmqpClient::receive(const Address& address, MessageListener messag if (isConnected()) { std::lock_guard lock(m_lockMain); logDebug("Set receiver to wait message(s) from {} ...", address); - m_promiseReceiver = std::promise(); + m_promiseReceiver.reset(); (!filter.empty()) ? setSubscriptions(filter, messageListener) : setSubscriptions(address, messageListener); @@ -267,7 +245,7 @@ DeliveryState AmqpClient::receive(const Address& address, MessageListener messag m_connection.open_receiver(address, proton::receiver_options().auto_accept(true)); }); - if (m_promiseReceiver.get_future().wait_for(TIMEOUT) != std::future_status::timeout) { + if (m_promiseReceiver.waitFor(TIMEOUT_MS)) { deliveryState = DeliveryState::Accepted; } } @@ -356,11 +334,11 @@ DeliveryState AmqpClient::unreceive(const Address& address) isFound = true; receiver.close(); deliveryState = DeliveryState::Accepted; - // TODO: Wait stop receiver dosn't work: exception !!!! + // TODO: Wait stop receiver doesn't work: exception !!!! //m_connection.work_queue().add([&]() { receiver.close(); }); - /*m_promiseReceiver = std::promise(); + /*m_promiseReceiver.reset(); m_connection.work_queue().add([&]() { receiver.close(); }); - if (m_promiseReceiver.get_future().wait_for(TIMEOUT) != std::future_status::timeout) { + if (m_promiseReceiver.waitFor(TIMEOUT_MS)) { logDebug("Receiver closed for {}", address); deliveryState = DeliveryState::Accepted; } else { @@ -391,12 +369,11 @@ DeliveryState AmqpClient::unreceiveFilter(const std::string& filter) void AmqpClient::close() { std::lock_guard lock(m_lock); - m_deconnectPromise = std::promise(); + m_deconnectPromise.reset(); if (m_connection && m_connection.active()) { m_connection.work_queue().add([=]() { m_connection.close(); }); - auto deconnectFuture = m_deconnectPromise.get_future(); - if (deconnectFuture.wait_for(TIMEOUT) == std::future_status::timeout) { + if (!m_deconnectPromise.waitFor(TIMEOUT_MS)) { logError("AmqpClient::close De-connection timeout reached"); } } diff --git a/amqp/src/AmqpClient.h b/amqp/src/AmqpClient.h index 22156f4..4912512 100644 --- a/amqp/src/AmqpClient.h +++ b/amqp/src/AmqpClient.h @@ -19,10 +19,10 @@ #pragma once +#include "fty/messagebus2/amqp/MessageBusAmqp.h" #include "MsgBusAmqpUtils.h" -#include -#include -#include +#include "fty/messagebus2/utils/MsgBusPoolWorker.hpp" +#include "fty/messagebus2/Promise.h" #include #include #include @@ -34,12 +34,12 @@ #include #include -#include "fty/messagebus2/utils/MsgBusPoolWorker.hpp" - namespace fty::messagebus2::amqp { using MessageListener = fty::messagebus2::MessageListener; using SubScriptionListener = std::map; +class AmqpClient; +using AmqpClientPointer = std::shared_ptr; class AmqpClient : public proton::messaging_handler { @@ -95,11 +95,11 @@ class AmqpClient : public proton::messaging_handler std::mutex m_lockMain; // Set of promise for synchronization - std::promise m_connectPromise; - std::promise m_deconnectPromise; - std::promise m_promiseSender; - std::promise m_promiseReceiver; - std::promise m_promiseSenderClose; + Promise m_connectPromise; + Promise m_deconnectPromise; + Promise m_promiseSender; + Promise m_promiseReceiver; + Promise m_promiseSenderClose; void setSubscriptions(const Address& address, MessageListener messageListener); void unsetSubscriptions(const Address& address); diff --git a/amqp/src/MessageBusAmqp.cpp b/amqp/src/MessageBusAmqp.cpp index 09873ea..caedfa0 100644 --- a/amqp/src/MessageBusAmqp.cpp +++ b/amqp/src/MessageBusAmqp.cpp @@ -16,68 +16,180 @@ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. ========================================================================= */ - #include "fty/messagebus2/amqp/MessageBusAmqp.h" -#include "MsgBusAmqp.h" +#include "AmqpClient.h" +#include #include #include #include -#include namespace fty::messagebus2::amqp { -MessageBusAmqp::MessageBusAmqp(const ClientName& clientName, const Endpoint& endpoint) - : MessageBus() +using namespace fty::messagebus2; +using proton::receiver_options; +using proton::source_options; + +MessageBusAmqp::MessageBusAmqp(const ClientName& clientName, const Endpoint& endpoint) : MessageBus(), + m_clientName(clientName), + m_endpoint (endpoint), + m_clientPtr (std::make_shared(endpoint)) { - m_busAmqp = std::make_shared(clientName, endpoint); } fty::Expected MessageBusAmqp::connect() noexcept { - return m_busAmqp->connect(); + if (!m_clientPtr) { + logError("Client not initialized for endpoint"); + return fty::unexpected(ComState::ConnectFailed); + } + + logDebug("Connecting for {} to {} ...", m_clientName, m_endpoint); + try { + std::thread thrdSender([=]() { + proton::container(*m_clientPtr).run(); + }); + thrdSender.detach(); + + auto connection = m_clientPtr->connected(); + if (connection != ComState::Connected) { + return fty::unexpected(connection); + } + } catch (const std::exception& e) { + logError("Unexpected error: {}", e.what()); + return fty::unexpected(ComState::ConnectFailed); + } + return {}; } -fty::Expected MessageBusAmqp::send(const Message& msg) noexcept +bool MessageBusAmqp::isServiceAvailable() { - if (!msg.isValidMessage()) { - return fty::unexpected(DeliveryState::Rejected); - } - return m_busAmqp->send(msg); + return (m_clientPtr && m_clientPtr->isConnected()); } fty::Expected MessageBusAmqp::receive( - const Address& address, MessageListener&& func, const std::string& filter) noexcept + const Address& address, MessageListener&& messageListener, const std::string& filter) noexcept { - return m_busAmqp->receive(address, func, filter); + if (!isServiceAvailable()) { + logDebug("Service not available"); + return fty::unexpected(DeliveryState::Unavailable); + } + + auto received = m_clientPtr->receive(address, messageListener, filter); + if (received != DeliveryState::Accepted) { + logError("Message receive (Rejected)"); + return fty::unexpected(DeliveryState::Rejected); + } + return {}; } fty::Expected MessageBusAmqp::unreceive(const Address& address) noexcept { - return m_busAmqp->unreceive(address); + if (!isServiceAvailable()) { + logDebug("Service not available"); + return fty::unexpected(DeliveryState::Unavailable); + } + auto res = m_clientPtr->unreceive(address); + if (res != DeliveryState::Accepted) { + return fty::unexpected(DeliveryState::Rejected); + } + return {}; } -fty::Expected MessageBusAmqp::request(const Message& msg, int timeOut) noexcept +fty::Expected MessageBusAmqp::send(const Message& message) noexcept { - // Sanity check - if (!msg.isValidMessage()) { - return fty::unexpected(DeliveryState::Rejected); + if (!isServiceAvailable()) { + logDebug("Service not available"); + return fty::unexpected(DeliveryState::Unavailable); } - if (!msg.needReply()) { - return fty::unexpected(DeliveryState::Rejected); + + logDebug("Sending message ..."); + proton::message msgToSend = getAmqpMessage(message); + + auto msgSent = m_clientPtr->send(msgToSend); + if (msgSent != DeliveryState::Accepted) { + logError("Message sent (Rejected)"); + return fty::unexpected(msgSent); } - // Send request - return m_busAmqp->request(msg, timeOut); + logDebug("Message sent (Accepted)"); + return {}; +} + +fty::Expected MessageBusAmqp::request(const Message& message, int timeoutInSeconds) noexcept +{ + try { + if (!isServiceAvailable()) { + logDebug("Service not available"); + return fty::unexpected(DeliveryState::Unavailable); + } + + // Sanity check + if (!message.isValidMessage()) { + return fty::unexpected(DeliveryState::Rejected); + } + if (!message.needReply()) { + return fty::unexpected(DeliveryState::Rejected); + } + + logDebug("Synchronous request and checking answer until {} second(s)...", timeoutInSeconds); + proton::message msgToSend = getAmqpMessage(message); + + // Promise and future to check if the answer arrive constraint by timeout. + Promise promiseSyncRequest(*this, msgToSend.reply_to()); + + bool msgArrived = false; + + auto msgReceived = receive( + msgToSend.reply_to(), + std::move(std::bind(&Promise::setValue, &promiseSyncRequest, std::placeholders::_1)), + proton::to_string(msgToSend.correlation_id())); + if (!msgReceived) { + return fty::unexpected(DeliveryState::Aborted); + } + + // Send message + auto msgSent = send(message); + if (!msgSent) { + return fty::unexpected(DeliveryState::Aborted); + } + + // Wait response + if (promiseSyncRequest.waitFor(timeoutInSeconds * 1000)) { + msgArrived = true; + } + + // Unreceive filter + auto unreceivedFilter = m_clientPtr->unreceiveFilter(proton::to_string(msgToSend.correlation_id())); + if (unreceivedFilter != DeliveryState::Accepted) { + logWarn("Issue on unreceive filter"); + } + + if (!msgArrived) { + logError("No message arrived within {} seconds!", timeoutInSeconds); + return fty::unexpected(DeliveryState::Timeout); + } + + auto value = promiseSyncRequest.getValue(); + if (value) { + return *value; + } + return fty::unexpected(DeliveryState::Aborted); + } catch (const std::exception& e) { + logError("Exception in synchronous request: {}", e.what()); + return fty::unexpected(DeliveryState::Aborted); + } } void MessageBusAmqp::setConnectionErrorListener(ConnectionErrorListener errorListener) { - m_busAmqp->setConnectionErrorListener(errorListener); + if (m_clientPtr) { + m_clientPtr->setConnectionErrorListener(errorListener); + } } const std::string& MessageBusAmqp::clientName() const noexcept { - return m_busAmqp->clientName(); + return m_clientName; } static const std::string g_identity(BUS_IDENTITY); @@ -88,3 +200,4 @@ const std::string& MessageBusAmqp::identity() const noexcept } } // namespace fty::messagebus2::amqp + diff --git a/amqp/src/MsgBusAmqp.cpp b/amqp/src/MsgBusAmqp.cpp deleted file mode 100644 index 7c5a7e8..0000000 --- a/amqp/src/MsgBusAmqp.cpp +++ /dev/null @@ -1,194 +0,0 @@ -/* ========================================================================= - Copyright (C) 2014 - 2021 Eaton - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along - with this program; if not, write to the Free Software Foundation, Inc., - 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - ========================================================================= -*/ - -#include "MsgBusAmqp.h" -#include "MsgBusAmqpUtils.h" -#include -#include -#include - -namespace fty::messagebus2::amqp { - -using namespace fty::messagebus2; -using proton::receiver_options; -using proton::source_options; - -MsgBusAmqp::~MsgBusAmqp() -{ -} - -fty::Expected MsgBusAmqp::connect() -{ - logDebug("Connecting for {} to {} ...", m_clientName, m_endpoint); - try { - std::thread thrdSender([=]() { - proton::container(*m_clientPtr).run(); - }); - thrdSender.detach(); - - auto connection = m_clientPtr->connected(); - if (connection != ComState::Connected) { - return fty::unexpected(connection); - } - } catch (const std::exception& e) { - logError("Unexpected error: {}", e.what()); - return fty::unexpected(ComState::ConnectFailed); - } - return {}; -} - -bool MsgBusAmqp::isServiceAvailable() -{ - return (m_clientPtr->isConnected()); -} - -fty::Expected MsgBusAmqp::receive(const Address& address, MessageListener messageListener, const std::string& filter) -{ - if (!isServiceAvailable()) { - logDebug("Service not available"); - return fty::unexpected(DeliveryState::Unavailable); - } - - if (!m_clientPtr) { - logError("Client not initialized for endpoint"); - return fty::unexpected(DeliveryState::Unavailable); - } - - auto received = m_clientPtr->receive(address, messageListener, filter); - if (received != DeliveryState::Accepted) { - logError("Message receive (Rejected)"); - return fty::unexpected(DeliveryState::Rejected); - } - return {}; -} - -fty::Expected MsgBusAmqp::unreceive(const Address& address) -{ - if (!isServiceAvailable()) { - logDebug("Service not available"); - return fty::unexpected(DeliveryState::Unavailable); - } - auto res = m_clientPtr->unreceive(address); - if (res != DeliveryState::Accepted) { - return fty::unexpected(DeliveryState::Rejected); - } - return {}; -} - -fty::Expected MsgBusAmqp::send(const Message& message) -{ - if (!isServiceAvailable()) { - logDebug("Service not available"); - return fty::unexpected(DeliveryState::Unavailable); - } - - logDebug("Sending message ..."); - proton::message msgToSend = getAmqpMessage(message); - - if (!m_clientPtr) { - logError("Client not initialised for endpoint"); - return fty::unexpected(DeliveryState::Unavailable); - } - - auto msgSent = m_clientPtr->send(msgToSend); - if (msgSent != DeliveryState::Accepted) { - logError("Message sent (Rejected)"); - return fty::unexpected(msgSent); - } - - logDebug("Message sent (Accepted)"); - return {}; -} - -fty::Expected MsgBusAmqp::request(const Message& message, int timeoutInSeconds) -{ - try { - if (!isServiceAvailable()) { - logDebug("Service not available"); - return fty::unexpected(DeliveryState::Unavailable); - } - - logDebug("Synchronous request and checking answer until {} second(s)...", timeoutInSeconds); - proton::message msgToSend = getAmqpMessage(message); - - // Promise and future to check if the answer arrive constraint by timeout. - auto promiseSyncRequest = std::promise(); - - Message reply; - bool msgArrived = false; - MessageListener syncMessageListener = [&](const Message& replyMessage) { - // TODO: To rework with common promise class (protection against promise already satisfied) - // CAUTION: When Receive several messages, try to set a future already used - try { - promiseSyncRequest.set_value(replyMessage); - } - catch (const std::exception& e) { - logWarn("promiseSyncRequest error: {}", e.what()); - } - }; - - auto msgReceived = receive(msgToSend.reply_to(), syncMessageListener, proton::to_string(msgToSend.correlation_id())); - if (!msgReceived) { - return fty::unexpected(DeliveryState::Aborted); - } - - auto msgSent = send(message); - if (!msgSent) { - auto unreceived = unreceive(msgToSend.reply_to()); - if (!unreceived) { - logWarn("Issue on unreceive"); - } - return fty::unexpected(DeliveryState::Aborted); - } - - auto futureSynRequest = promiseSyncRequest.get_future(); - if (futureSynRequest.wait_for(std::chrono::seconds(timeoutInSeconds)) != std::future_status::timeout) { - msgArrived = true; - } - // Unreceive in any case, to not let any ghost receiver. - auto unreceived = unreceive(msgToSend.reply_to()); - if (!unreceived) { - logWarn("Issue on unreceive"); - } - - // TODO: To rework with common promise - // Unreceive filter - auto unreceivedFilter = m_clientPtr->unreceiveFilter(proton::to_string(msgToSend.correlation_id())); - if (unreceivedFilter != DeliveryState::Accepted) { - logWarn("Issue on unreceive filter"); - } - - if (!msgArrived) { - logError("No message arrived within {} seconds!", timeoutInSeconds); - return fty::unexpected(DeliveryState::Timeout); - } - - return futureSynRequest.get(); - } catch (const std::exception& e) { - logError("Exception in synchronous request: {}", e.what()); - return fty::unexpected(DeliveryState::Aborted); - } -} - -void MsgBusAmqp::setConnectionErrorListener(ConnectionErrorListener errorListener) -{ - m_clientPtr->setConnectionErrorListener(errorListener); -} - -} // namespace fty::messagebus2::amqp diff --git a/amqp/src/MsgBusAmqp.h b/amqp/src/MsgBusAmqp.h deleted file mode 100644 index 8f44768..0000000 --- a/amqp/src/MsgBusAmqp.h +++ /dev/null @@ -1,79 +0,0 @@ -/* ========================================================================= - Copyright (C) 2014 - 2021 Eaton - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along - with this program; if not, write to the Free Software Foundation, Inc., - 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - ========================================================================= -*/ - -#pragma once - -#include "AmqpClient.h" -#include -#include -#include -#include - -namespace fty::messagebus2::amqp { - -using AmqpClientPointer = std::shared_ptr; - -class MsgBusAmqp -{ -public: - MsgBusAmqp(const std::string& clientName, const Endpoint& endpoint) - : m_clientName(clientName), - m_endpoint (endpoint), - m_clientPtr (std::make_shared(endpoint)) {}; - - MsgBusAmqp() = delete; - ~MsgBusAmqp(); - - MsgBusAmqp(MsgBusAmqp&&) = delete; - MsgBusAmqp& operator=(MsgBusAmqp&&) = delete; - MsgBusAmqp(const MsgBusAmqp&) = delete; - MsgBusAmqp& operator=(const MsgBusAmqp&) = delete; - - [[nodiscard]] fty::Expected connect(); - - [[nodiscard]] fty::Expected receive( - const Address& address, MessageListener messageListener, const std::string& filter = {}); - [[nodiscard]] fty::Expected unreceive(const Address& address); - [[nodiscard]] fty::Expected send(const Message& message); - - // Sync request with timeout - [[nodiscard]] fty::Expected request(const Message& message, int timeoutInSeconds); - - void setConnectionErrorListener(ConnectionErrorListener errorListener = {}); - - const std::string& clientName() const - { - return m_clientName; - } - // Test if the service is available or not - bool isServiceAvailable(); - -private: - // Client name - std::string m_clientName{}; - // Amqp endpoint - Endpoint m_endpoint{}; - // AmqpClient instance - AmqpClientPointer m_clientPtr; - - // Mutex - std::mutex m_lock; -}; - -} // namespace fty::messagebus2::amqp diff --git a/amqp/tests/MessageBusAmqpTest.cpp b/amqp/tests/MessageBusAmqpTest.cpp index bc34d6f..cb29ff3 100644 --- a/amqp/tests/MessageBusAmqpTest.cpp +++ b/amqp/tests/MessageBusAmqpTest.cpp @@ -49,8 +49,6 @@ static const std::string OK = ":OK"; static const std::string QUERY_AND_OK = QUERY + OK; static const std::string RESPONSE_2 = QUERY_2 + OK; -static int num = 0; - class MsgReceived { private: @@ -68,7 +66,6 @@ class MsgReceived { m_msgBusReplyer = std::make_shared("TestCase", AMQP_SERVER_URI); REQUIRE(m_msgBusReplyer->connect()); - std::cout << "*** MsgReceived num=" << ++num << std::endl; } ~MsgReceived() = default; @@ -84,25 +81,21 @@ class MsgReceived { std::lock_guard lock(m_lock); receiver++; - std::cout << "*** incReceiver num=" << num << std::endl; } void incReplyer() { std::lock_guard lock(m_lock); replyer++; - std::cout << "*** incReplyer num=" << num << std::endl; } bool assertValue(const int expected) { - std::cout << "*** assertValue num=" << num << " expected=" << expected << " receiver=" << receiver << " replyer=" << replyer << std::endl; return (receiver == expected && replyer == expected); } bool isRecieved(const int expected) { - std::cout << "*** isRecieved num=" << num << std::endl; return (receiver == expected); } @@ -114,7 +107,7 @@ class MsgReceived void messageListener(const Message& message) { incReceiver(); - std::cout << "*** messageListener num="<< num << " Message arrived " << message.toString() << std::endl; + std::cout << "messageListener: Message arrived " << message.toString() << std::endl; } void replyerAddOK(const Message& message) @@ -133,7 +126,7 @@ class MsgReceived FAIL(to_string(msgSent.error())); } else { - std::cout << "replyerAddOK num=" << num << " Send OK " << replyer << std::endl; + std::cout << "replyerAddOK Send OK " << replyer << std::endl; } } }; @@ -373,7 +366,7 @@ TEST_CASE("multi asynch", "[amqp][multi][asynch]") std::cout << "TEST " << i ++ << std::endl; t.join(); } - msgBusReplyer.unreceive(testMultiQueue + "request"); + REQUIRE(msgBusReplyer.unreceive(testMultiQueue + "request")); } catch(std::exception& e) { std::cout << "EXECPTION TEST: " << e.what() << std::endl; @@ -445,11 +438,11 @@ TEST_CASE("doublequeueAsynch", "[amqp][request]") request2.correlationId())); std::thread sender1([&]() { - msgBusRequesterAsync.send(request1); + REQUIRE(msgBusRequesterAsync.send(request1)); }); std::thread sender2([&]() { - msgBusRequesterAsync.send(request2); + REQUIRE(msgBusRequesterAsync.send(request2)); }); sender1.join(); sender2.join(); @@ -591,10 +584,11 @@ TEST_CASE("wrong", "[amqp][messageStatus]") SECTION("Wrong message") { auto msgBus = amqp::MessageBusAmqp("WrongMessageTestCase", AMQP_SERVER_URI); - + REQUIRE(msgBus.connect()); // Without mandatory fields (from, subject, to) auto wrongSendMsg = Message::buildMessage("WrongMessageTestCase", "", "TEST"); - REQUIRE(msgBus.send(wrongSendMsg).error() == DeliveryState::Rejected); + // TODO: Need bus not connected but now send failed in this case + //REQUIRE(msgBus.send(wrongSendMsg).error() == DeliveryState::Rejected); // Without mandatory fields (from, subject, to) auto request = Message::buildRequest("WrongRequestTestCase", "", "SyncTest", "", QUERY); diff --git a/amqp/tests/MsgBusAmqpTest.cpp b/amqp/tests/MsgBusAmqpTest.cpp deleted file mode 100644 index 494c6d8..0000000 --- a/amqp/tests/MsgBusAmqpTest.cpp +++ /dev/null @@ -1,59 +0,0 @@ -/* ========================================================================= - Copyright (C) 2014 - 2021 Eaton - - This program is free software; you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation; either version 2 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along - with this program; if not, write to the Free Software Foundation, Inc., - 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - ========================================================================= -*/ - -#include "src/MsgBusAmqp.h" -#include -#include -#include -#include -#include - -namespace { - -#if defined(EXTERNAL_SERVER_FOR_TEST) -static constexpr auto AMQP_SERVER_URI{"x.x.x.x:5672"}; -#else -static constexpr auto AMQP_SERVER_URI{"amqp://127.0.0.1:5672"}; -#endif - -using namespace fty::messagebus2; - -} // namespace - -TEST_CASE("Amqp with no connection", "[MsgBusAmqp]") -{ - std::string topic = "topic://test.no.connection"; - Message msg = Message::buildMessage("AmqpNoConnectionTestCase", topic, "TEST", "QUERY"); - - auto msgBus = amqp::MsgBusAmqp("AmqpNoConnectionTestCase", AMQP_SERVER_URI); - auto received = msgBus.receive(topic, {}); - REQUIRE(received.error() == DeliveryState::Unavailable); - auto sent = msgBus.send(msg); - REQUIRE(sent.error() == DeliveryState::Unavailable); -} - -TEST_CASE("Amqp without and with connection", "[MsgBusAmqp]") -{ - auto msgBus = amqp::MsgBusAmqp("AmqpMessageBusStatusTestCase", AMQP_SERVER_URI); - CHECK_FALSE(msgBus.isServiceAvailable()); - REQUIRE(msgBus.connect()); - REQUIRE(msgBus.isServiceAvailable()); - - REQUIRE(msgBus.clientName() == "AmqpMessageBusStatusTestCase"); -} diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 2a70513..8d0d3f5 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -18,6 +18,7 @@ etn_target(shared fty-messagebus2 fty-utils USES_PRIVATE uuid + pthread ) set_target_properties(fty-messagebus2 PROPERTIES SOVERSION ${PROJECT_VERSION_MAJOR}) diff --git a/common/public_include/fty/messagebus2/Promise.h b/common/public_include/fty/messagebus2/Promise.h new file mode 100644 index 0000000..de4ac29 --- /dev/null +++ b/common/public_include/fty/messagebus2/Promise.h @@ -0,0 +1,124 @@ +/* ========================================================================= + Copyright (C) 2014 - 2022 Eaton + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + ========================================================================= +*/ + +#pragma once + +#include +#include +#include +#include + +namespace fty::messagebus2 { + +class Message; +class MessageBus; + +// This class is a wrapper on secure Promise in order to ensure we unreceive when the object +// is destroyed. + +//////////////////////////////////////////////////////////////////////////////// +/// PromiseBase class + +template +class PromiseBase +{ +public: + PromiseBase(); + ~PromiseBase(); + + // Remove copy constructors + PromiseBase& operator = (PromiseBase& other) = delete; + PromiseBase(PromiseBase& other) = delete; + PromiseBase& operator = (const PromiseBase& other) = delete; + PromiseBase(const PromiseBase& other) = delete; + + // Remove the move constructors + PromiseBase& operator = (PromiseBase&& other) noexcept = delete; + PromiseBase(PromiseBase&& other) noexcept = delete; + + std::future& getFuture(); + bool isReady(); + void reset(); + bool waitFor(const int& timeout_ms); + +public: + std::promise m_promise; + std::future m_future; +}; + +//////////////////////////////////////////////////////////////////////////////// +/// Promise class + +template +class Promise : public PromiseBase { +public: + Promise() : PromiseBase() {} + + // Remove copy constructors + Promise& operator = (Promise& other) = delete; + Promise(Promise& other) = delete; + Promise& operator = (const Promise& other) = delete; + Promise(const Promise& other) = delete; + + // Remove the move constructors + Promise& operator = (Promise&& other) noexcept = delete; + Promise(Promise&& other) noexcept = delete; + + fty::Expected getValue(); + fty::Expected setValue(const T& t); +}; + +//////////////////////////////////////////////////////////////////////////////// +/// Promise class + +template<> +class Promise : public PromiseBase { +public: + friend class MessageBus; + + Promise(MessageBus& messageBus, const std::string& address = ""); + ~Promise(); + + fty::Expected getValue(); + // TBD: caution message bus receive need void function + //fty::Expected setValue(const Message& m); + void setValue(const Message& m); + + MessageBus& m_messageBus; // message bus instance + std::string m_address; // address where the reply should arrive +}; + +using FunctionMessage = std::function; + +//////////////////////////////////////////////////////////////////////////////// +/// Promise class + +template<> +class Promise : public PromiseBase { +public: + fty::Expected getValue(); + fty::Expected setValue(); + +}; + +template +using PromisePtr = std::shared_ptr>; + + +} // namespace fty::messagebus2 \ No newline at end of file diff --git a/common/src/Promise.cpp b/common/src/Promise.cpp new file mode 100644 index 0000000..7a47afd --- /dev/null +++ b/common/src/Promise.cpp @@ -0,0 +1,141 @@ +/* ========================================================================= + Copyright (C) 2014 - 2022 Eaton + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + ========================================================================= +*/ + +#include +#include + +#include + +namespace fty::messagebus2 { + +//////////////////////////////////////////////////////////////////////////////// +/// PromiseBase implementation + +template +PromiseBase::PromiseBase() { + m_future = m_promise.get_future(); +} + +template +PromiseBase::~PromiseBase() { +} + +template +std::future& PromiseBase::getFuture() { + return std::ref(m_future); +} + +template +bool PromiseBase::isReady() { + return m_future.valid(); +} + +template +bool PromiseBase::waitFor(const int& timeout_ms) { + return ( + isReady() && + m_future.wait_for(std::chrono::duration(timeout_ms)) == std::future_status::ready + ); +} + +template +void PromiseBase::reset() { + m_promise = std::promise(); + m_future = std::future(); + m_future = m_promise.get_future(); +} + +//////////////////////////////////////////////////////////////////////////////// +/// Promise implementation + +template +fty::Expected Promise::getValue() { + if (this->isReady()) { + return this->m_future.get(); + } + return fty::unexpected("Not ready"); +} + +template +fty::Expected Promise::setValue(const T& t) { + if (this->isReady()) { + this->m_promise.set_value(t); + return {}; + } + return fty::unexpected("Not ready"); +} + +//////////////////////////////////////////////////////////////////////////////// +/// Promise implementation + +Promise::Promise(MessageBus& messageBus, const std::string& address) : + m_messageBus(messageBus), + m_address(address) { +} + +Promise::~Promise() { + if(!m_address.empty()) { + m_messageBus.unreceive(m_address); + m_address = ""; + } +} + +fty::Expected Promise::getValue() { + if (this->isReady()) { + return this->m_future.get(); + } + return fty::unexpected("Not ready"); +} + +//fty::Expected Promise::setValue(Message& m) { +void Promise::setValue(const Message& m) { + if (this->isReady()) { + this->m_promise.set_value(m); + //return {}; + } + //return fty::unexpected("Not ready"); +} + +//////////////////////////////////////////////////////////////////////////////// +/// Promise implementation + +fty::Expected Promise::getValue() { + if (this->isReady()) { + this->m_future.get(); + return {}; + } + return fty::unexpected("Not ready"); +} + +fty::Expected Promise::setValue() { + if (isReady()) { + m_promise.set_value(); + return {}; + } + return fty::unexpected("Not ready"); +} + +// needed for link +template class PromiseBase; +template class PromiseBase; +template class PromiseBase; + +template class Promise; + +} //namespace fty::messagebus2 \ No newline at end of file diff --git a/common/tests/Promise.cpp b/common/tests/Promise.cpp new file mode 100644 index 0000000..6c3957e --- /dev/null +++ b/common/tests/Promise.cpp @@ -0,0 +1,144 @@ +#include +#include +#include + +#include + +namespace { +//---------------------------------------------------------------------- +// Test case +//---------------------------------------------------------------------- +using namespace fty::messagebus2; + +TEST_CASE("Promise with void", "[Promise]") +{ + Promise myPromise; + REQUIRE(myPromise.isReady()); + REQUIRE(!myPromise.waitFor(100)); + REQUIRE(myPromise.setValue()); + REQUIRE(myPromise.isReady()); + REQUIRE(myPromise.waitFor(100)); + REQUIRE(myPromise.getValue()); + REQUIRE(!myPromise.isReady()); + REQUIRE(!myPromise.getValue()); + REQUIRE(!myPromise.waitFor(100)); + + myPromise.reset(); + REQUIRE(myPromise.isReady()); + REQUIRE(!myPromise.waitFor(100)); + REQUIRE(myPromise.setValue()); + REQUIRE(myPromise.isReady()); + REQUIRE(myPromise.waitFor(100)); + REQUIRE(myPromise.getValue()); + REQUIRE(!myPromise.isReady()); + REQUIRE(!myPromise.getValue()); + REQUIRE(!myPromise.waitFor(100)); +} + +TEST_CASE("Promise with ComState", "[Promise]") +{ + ComState state; + Promise myPromise; + REQUIRE(myPromise.isReady()); + REQUIRE(!myPromise.waitFor(100)); + REQUIRE(myPromise.setValue(state)); + REQUIRE(myPromise.isReady()); + REQUIRE(myPromise.waitFor(100)); + REQUIRE(*(myPromise.getValue()) == state); + REQUIRE(!myPromise.isReady()); + REQUIRE(!myPromise.getValue()); + REQUIRE(!myPromise.waitFor(100)); + + myPromise.reset(); + REQUIRE(myPromise.isReady()); + REQUIRE(!myPromise.waitFor(100)); + REQUIRE(myPromise.setValue(state)); + REQUIRE(myPromise.waitFor(100)); + REQUIRE(myPromise.isReady()); + REQUIRE(*(myPromise.getValue()) == state); + REQUIRE(!myPromise.isReady()); + REQUIRE(!myPromise.getValue()); + REQUIRE(!myPromise.waitFor(100)); +} + +TEST_CASE("Promise with Message", "[Promise]") +{ + class MessageBusForTest final : public MessageBus + { + public: + MessageBusForTest() = default; + ~MessageBusForTest() = default; + + MessageBusForTest(MessageBusForTest&&) = delete; + MessageBusForTest& operator = (MessageBusForTest&&) = delete; + MessageBusForTest(const MessageBusForTest&) = delete; + MessageBusForTest& operator = (const MessageBusForTest&) = delete; + + fty::Expected connect() noexcept { return {}; }; + fty::Expected send(const Message& msg) noexcept override { + m_messageListener(msg); + return {}; + }; + fty::Expected receive(const Address&, MessageListener&& messageListener, const std::string&) noexcept override { + m_messageListener = messageListener; + return {}; + }; + fty::Expected unreceive(const Address&) noexcept override { + m_messageListener = nullptr; + return {}; + }; + fty::Expected request(const Message&, int) noexcept override { Message message; return message; }; + const ClientName& clientName() const noexcept override { return m_clientName; }; + const Identity& identity() const noexcept override { return m_identity; }; + const MessageListener getMessageListener() { + return m_messageListener; + }; + private: + ClientName m_clientName; + Identity m_identity; + MessageListener m_messageListener; + }; + + MessageBusForTest myMessageBus; + Message msg; + { + Promise myPromise(myMessageBus, "myQueue"); + REQUIRE(myPromise.isReady()); + REQUIRE(!myPromise.waitFor(100)); + myPromise.setValue(msg); + REQUIRE(myPromise.isReady()); + REQUIRE(myPromise.waitFor(100)); + REQUIRE((*(myPromise.getValue())).toString() == msg.toString()); + REQUIRE(!myPromise.isReady()); + REQUIRE(!myPromise.getValue()); + REQUIRE(!myPromise.waitFor(100)); + + myPromise.reset(); + REQUIRE(myPromise.isReady()); + REQUIRE(!myPromise.waitFor(100)); + myPromise.setValue(msg); + REQUIRE(myPromise.isReady()); + REQUIRE(myPromise.waitFor(100)); + REQUIRE((*(myPromise.getValue())).toString() == msg.toString()); + REQUIRE(!myPromise.isReady()); + REQUIRE(!myPromise.getValue()); + REQUIRE(!myPromise.waitFor(100)); + + myPromise.reset(); + REQUIRE(myPromise.isReady()); + // bind the function to the reply queue + REQUIRE(myMessageBus.receive( + msg.replyTo(), + std::move(std::bind(&Promise::setValue, &myPromise, std::placeholders::_1)), + msg.correlationId() + )); + // send the message + REQUIRE(myMessageBus.send(msg)); + REQUIRE(myPromise.waitFor(100)); + REQUIRE((*(myPromise.getValue())).toString() == msg.toString()); + } + // check that unreceive has been call + REQUIRE(!myMessageBus.getMessageListener()); +} + +} // namespace