Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions amqp/public_include/fty/messagebus2/amqp/MessageBusAmqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,42 @@ namespace fty::messagebus2::amqp {

// Default amqp end point
static auto constexpr DEFAULT_ENDPOINT{"amqp://127.0.0.1:5672"};

static auto constexpr BUS_IDENTITY{"AMQP"};
static const std::string TOPIC_PREFIX = "topic://";
static const std::string QUEUE_PREFIX = "queue://";

class MsgBusAmqp;
class AmqpClient;

class MessageBusAmqp final : public MessageBus
{
public:
MessageBusAmqp(const ClientName& clientName = utils::getClientId("MessageBusAmqp"), const Endpoint& endpoint = DEFAULT_ENDPOINT);

MessageBusAmqp(const std::string& clientName = utils::getClientId("MessageBusAmqp"), const Endpoint& endpoint = DEFAULT_ENDPOINT);
~MessageBusAmqp() = default;

MessageBusAmqp(MessageBusAmqp&&) = delete;
MessageBusAmqp& operator=(MessageBusAmqp&&) = delete;
MessageBusAmqp(const MessageBusAmqp&) = delete;
MessageBusAmqp& operator=(const MessageBusAmqp&) = delete;
MessageBusAmqp& operator = (MessageBusAmqp&&) = delete;
MessageBusAmqp(const MessageBusAmqp&) = delete;
MessageBusAmqp& operator = (const MessageBusAmqp&) = delete;

[[nodiscard]] fty::Expected<void, ComState> connect() noexcept override;
[[nodiscard]] fty::Expected<void, DeliveryState> send(const Message& msg) noexcept override;
[[nodiscard]] fty::Expected<void, DeliveryState> receive(
const Address& address, MessageListener&& func, const std::string& filter = {}) noexcept override;
[[nodiscard]] fty::Expected<void, DeliveryState> unreceive(const Address& address) noexcept override;
[[nodiscard]] fty::Expected<Message, DeliveryState> request(const Message& msg, int timeOut) noexcept override;

void setConnectionErrorListener(ConnectionErrorListener errorListener);

const Address& address, MessageListener&& messageListener, const std::string& filter = {}) noexcept override;
[[nodiscard]] fty::Expected<void, DeliveryState> unreceive(const Address& address) noexcept override;
void setConnectionErrorListener(ConnectionErrorListener errorListener = {});
// Sync request with timeout
[[nodiscard]] fty::Expected<Message, DeliveryState> request(const Message& message, int timeoutInSeconds) noexcept override;
[[nodiscard]] const ClientName& clientName() const noexcept override;
[[nodiscard]] const Identity& identity() const noexcept override;
[[nodiscard]] const Identity& identity() const noexcept override;

using MessageBus::receive;
private:
std::shared_ptr<MsgBusAmqp> m_busAmqp;
// Test if the service is available or not
bool isServiceAvailable();

// Client name
std::string m_clientName{};
// Amqp endpoint
Endpoint m_endpoint{};
// AmqpClient instance
std::shared_ptr<AmqpClient> m_clientPtr;
};

} // namespace fty::messagebus2::amqp
75 changes: 26 additions & 49 deletions amqp/src/AmqpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ namespace fty::messagebus2::amqp {
using namespace fty::messagebus2;
using MessageListener = fty::messagebus2::MessageListener;

static auto constexpr TIMEOUT = std::chrono::seconds(5);
static auto constexpr TIMEOUT_MS = 5000;

AmqpClient::AmqpClient(const Endpoint& url)
: m_url(url), m_pool(std::make_shared<fty::messagebus2::utils::PoolWorker>(10))
Expand All @@ -68,7 +68,7 @@ void AmqpClient::on_container_start(proton::container& container)
container.connect(m_url, connectOpts().reconnect(reconnectOpts()));
} catch (const std::exception& e) {
logError("Exception {}", e.what());
m_connectPromise.set_value(ComState::ConnectFailed);
m_connectPromise.setValue(ComState::ConnectFailed);
}
}

Expand All @@ -87,19 +87,13 @@ void AmqpClient::on_connection_open(proton::connection& connection)
} else {
logDebug("Connected on url: {}", m_url);
}
m_connectPromise.set_value(ComState::Connected);
m_connectPromise.setValue(ComState::Connected);
}

void AmqpClient::on_connection_close(proton::connection&)
{
logDebug("Close connection ...");
// TODO: To rework with common promise class (protection against promise already satisfied)
try {
m_deconnectPromise.set_value();
}
catch (const std::exception& e) {
logWarn("m_deconnectPromise error: {}", e.what());
}
m_deconnectPromise.setValue();
}

void AmqpClient::on_connection_error(proton::connection& connection)
Expand Down Expand Up @@ -137,7 +131,7 @@ void AmqpClient::on_sendable(proton::sender& sender)
sender.send(m_message);
sender.close();
// TODO: To rework with common promise class (protection against promise already satisfied)
m_promiseSender.set_value();
m_promiseSender.setValue();
logDebug("Message sent");
}
catch (const std::exception& e) {
Expand All @@ -154,26 +148,14 @@ void AmqpClient::on_sender_close(proton::sender&)
void AmqpClient::on_receiver_open(proton::receiver& receiver)
{
logDebug("Waiting any message on target address: {}", receiver.source().address());
// TODO: To rework with common promise class (protection against promise already satisfied)
try {
m_promiseReceiver.set_value();
}
catch (const std::exception& e) {
logWarn("m_promiseReceiver error: {}", e.what());
}
m_promiseReceiver.setValue();
}

void AmqpClient::on_receiver_close(proton::receiver&)
{
logDebug("Close receiver ...");
// TODO: Wait stop receiver doesn't work as expected: exception !!!!
//try {
// TODO: To rework with common promise class (protection against promise already satisfied)
//m_promiseReceiver.set_value();
//}
//catch (const std::exception& e) {
// logWarn("m_promiseReceiver error: {}", e.what());
//}
//m_promiseReceiver.setValue();
}

void AmqpClient::on_error(const proton::error_condition& error)
Expand All @@ -185,7 +167,7 @@ void AmqpClient::on_transport_error(proton::transport& transport)
{
logError("Transport error: {}", transport.error().what());
// Reset connect promise in case of send or receive arrived before connection open
m_connectPromise = std::promise<fty::messagebus2::ComState>();
m_connectPromise.reset();
m_communicationState = ComState::Lost;
}

Expand All @@ -198,33 +180,29 @@ void AmqpClient::resetPromises()
{
std::lock_guard<std::mutex> lock(m_lock);
logDebug("Reset all promises");
m_connectPromise = std::promise<fty::messagebus2::ComState>();
m_deconnectPromise = std::promise<void>();
m_promiseSender = std::promise<void>();
m_promiseReceiver = std::promise<void>();
m_connectPromise.reset();
m_deconnectPromise.reset();
m_promiseSender.reset();
m_promiseReceiver.reset();
}

bool AmqpClient::isConnected() {
// Wait communication was restored
// test if connected
return (m_connection && m_connection.active() && connected() == ComState::Connected);
}

ComState AmqpClient::connected()
{
//logTrace("AmqpClient::connected m_communicationState={}", m_communicationState);
// Wait communication was restored
if (m_communicationState != ComState::Connected) {
auto connectFuture = m_connectPromise.get_future();
if (connectFuture.wait_for(TIMEOUT) != std::future_status::timeout) {
try {
m_communicationState = connectFuture.get();
} catch (const std::future_error& e) {
logError("Caught future error {}", e.what());
if (m_connectPromise.waitFor(TIMEOUT_MS)) {
if (auto value = m_connectPromise.getValue(); value) {
m_communicationState = *value;
}
} else {
m_communicationState = ComState::ConnectFailed;
}
}
//logTrace("AmqpClient::connected m_communicationState={}", m_communicationState);
return m_communicationState;
}

Expand All @@ -238,15 +216,15 @@ DeliveryState AmqpClient::send(const proton::message& msg)
auto deliveryState = DeliveryState::Rejected;
if (isConnected()) {
std::lock_guard<std::mutex> lock(m_lockMain);
m_promiseSender = std::promise<void>();
m_promiseSender.reset();
logDebug("Sending message to {} ...", msg.to());
m_message.clear();
m_message = msg;
m_connection.work_queue().add([=]() {
m_connection.open_sender(msg.to());
});
// Wait to know if the message has been sent or not
if (m_promiseSender.get_future().wait_for(TIMEOUT) != std::future_status::timeout) {
if (m_promiseSender.waitFor(TIMEOUT_MS)) {
deliveryState = DeliveryState::Accepted;
}
}
Expand All @@ -259,15 +237,15 @@ DeliveryState AmqpClient::receive(const Address& address, MessageListener messag
if (isConnected()) {
std::lock_guard<std::mutex> lock(m_lockMain);
logDebug("Set receiver to wait message(s) from {} ...", address);
m_promiseReceiver = std::promise<void>();
m_promiseReceiver.reset();

(!filter.empty()) ? setSubscriptions(filter, messageListener) : setSubscriptions(address, messageListener);

m_connection.work_queue().add([=]() {
m_connection.open_receiver(address, proton::receiver_options().auto_accept(true));
});

if (m_promiseReceiver.get_future().wait_for(TIMEOUT) != std::future_status::timeout) {
if (m_promiseReceiver.waitFor(TIMEOUT_MS)) {
deliveryState = DeliveryState::Accepted;
}
}
Expand Down Expand Up @@ -356,11 +334,11 @@ DeliveryState AmqpClient::unreceive(const Address& address)
isFound = true;
receiver.close();
deliveryState = DeliveryState::Accepted;
// TODO: Wait stop receiver dosn't work: exception !!!!
// TODO: Wait stop receiver doesn't work: exception !!!!
//m_connection.work_queue().add([&]() { receiver.close(); });
/*m_promiseReceiver = std::promise<void>();
/*m_promiseReceiver.reset();
m_connection.work_queue().add([&]() { receiver.close(); });
if (m_promiseReceiver.get_future().wait_for(TIMEOUT) != std::future_status::timeout) {
if (m_promiseReceiver.waitFor(TIMEOUT_MS)) {
logDebug("Receiver closed for {}", address);
deliveryState = DeliveryState::Accepted;
} else {
Expand Down Expand Up @@ -391,12 +369,11 @@ DeliveryState AmqpClient::unreceiveFilter(const std::string& filter)
void AmqpClient::close()
{
std::lock_guard<std::mutex> lock(m_lock);
m_deconnectPromise = std::promise<void>();
m_deconnectPromise.reset();
if (m_connection && m_connection.active()) {
m_connection.work_queue().add([=]() { m_connection.close(); });

auto deconnectFuture = m_deconnectPromise.get_future();
if (deconnectFuture.wait_for(TIMEOUT) == std::future_status::timeout) {
if (!m_deconnectPromise.waitFor(TIMEOUT_MS)) {
logError("AmqpClient::close De-connection timeout reached");
}
}
Expand Down
20 changes: 10 additions & 10 deletions amqp/src/AmqpClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

#pragma once

#include "fty/messagebus2/amqp/MessageBusAmqp.h"
#include "MsgBusAmqpUtils.h"
#include <fty/messagebus2/MessageBus.h>
#include <fty/messagebus2/MessageBusStatus.h>
#include <future>
#include "fty/messagebus2/utils/MsgBusPoolWorker.hpp"
#include "fty/messagebus2/Promise.h"
#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/delivery.hpp>
Expand All @@ -34,12 +34,12 @@
#include <proton/transport.hpp>
#include <proton/work_queue.hpp>

#include "fty/messagebus2/utils/MsgBusPoolWorker.hpp"

namespace fty::messagebus2::amqp {

using MessageListener = fty::messagebus2::MessageListener;
using SubScriptionListener = std::map<Address, MessageListener>;
class AmqpClient;
using AmqpClientPointer = std::shared_ptr<AmqpClient>;

class AmqpClient : public proton::messaging_handler
{
Expand Down Expand Up @@ -95,11 +95,11 @@ class AmqpClient : public proton::messaging_handler
std::mutex m_lockMain;

// Set of promise for synchronization
std::promise<fty::messagebus2::ComState> m_connectPromise;
std::promise<void> m_deconnectPromise;
std::promise<void> m_promiseSender;
std::promise<void> m_promiseReceiver;
std::promise<void> m_promiseSenderClose;
Promise<fty::messagebus2::ComState> m_connectPromise;
Promise<void> m_deconnectPromise;
Promise<void> m_promiseSender;
Promise<void> m_promiseReceiver;
Promise<void> m_promiseSenderClose;

void setSubscriptions(const Address& address, MessageListener messageListener);
void unsetSubscriptions(const Address& address);
Expand Down
Loading