From 2f8f02ae3adcced2c8cb37cd7f199fe1f646fee1 Mon Sep 17 00:00:00 2001 From: markus812498 Date: Thu, 4 Sep 2025 09:48:25 +1200 Subject: [PATCH 1/4] close downstream connections when upstream close --- src/amqproxy/upstream.cr | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 2b840c8..7f1b052 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -114,14 +114,21 @@ module AMQProxy end private def close_all_client_channels(code = 500_u16, reason = "UPSTREAM_ERROR") + clients = Set(Client).new @channels_lock.synchronize do return if @channels.empty? Log.debug { "Upstream connection closed, closing #{@channels.size} client channels" } @channels.each_value do |downstream_channel| - downstream_channel.close(code, reason) + clients << downstream_channel.client end @channels.clear end + + # Close all client connections that were using this upstream + clients.each do |client| + Log.debug { "Closing client connection due to upstream failure" } + client.close_connection(code, reason) + end end private def send_to_all_clients(frame : AMQ::Protocol::Frame::Connection) From 0623630b2e5b9f437713d00eb5145f367ba7d3ab Mon Sep 17 00:00:00 2001 From: markus812498 Date: Thu, 4 Sep 2025 11:25:32 +1200 Subject: [PATCH 2/4] some reorganisation --- src/amqproxy/upstream.cr | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 7f1b052..3ef5287 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -73,7 +73,7 @@ module AMQProxy when AMQ::Protocol::Frame::Heartbeat then send frame when AMQ::Protocol::Frame::Connection::Close Log.error { "Upstream closed connection: #{frame.reply_text} #{frame.reply_code}" } - close_all_client_channels(frame.reply_code, frame.reply_text) + close_all_downstream_client_connections(frame.reply_code, frame.reply_text) begin send AMQ::Protocol::Frame::Connection::CloseOk.new rescue WriteError @@ -106,29 +106,27 @@ module AMQProxy Log.info { "Connection error #{ex.inspect}" } unless socket.closed? ensure socket.close rescue nil - close_all_client_channels + close_all_downstream_client_connections end def closed? @socket.closed? end - private def close_all_client_channels(code = 500_u16, reason = "UPSTREAM_ERROR") + private def close_all_downstream_client_connections(code = 500_u16, reason = "UPSTREAM_ERROR") clients = Set(Client).new @channels_lock.synchronize do return if @channels.empty? - Log.debug { "Upstream connection closed, closing #{@channels.size} client channels" } - @channels.each_value do |downstream_channel| - clients << downstream_channel.client + @channels.each_value do |downstream_connection| + clients << downstream_connection.client + end + + clients.each do |client| + Log.debug { "Closing client connection due to upstream failure. Client: #{client}" } + client.close_connection(code, reason) end @channels.clear end - - # Close all client connections that were using this upstream - clients.each do |client| - Log.debug { "Closing client connection due to upstream failure" } - client.close_connection(code, reason) - end end private def send_to_all_clients(frame : AMQ::Protocol::Frame::Connection) From 4b064937ca1952926bf41537b0952c80fb0ba83d Mon Sep 17 00:00:00 2001 From: markus812498 Date: Thu, 4 Sep 2025 11:33:28 +1200 Subject: [PATCH 3/4] no need to include client --- src/amqproxy/upstream.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 3ef5287..e600d7f 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -122,7 +122,7 @@ module AMQProxy end clients.each do |client| - Log.debug { "Closing client connection due to upstream failure. Client: #{client}" } + Log.debug { "Closing client connection due to upstream failure." } client.close_connection(code, reason) end @channels.clear From 574805cd9059cb19bea051aca2cd8daabd8e86c9 Mon Sep 17 00:00:00 2001 From: markus812498 Date: Thu, 4 Sep 2025 12:25:17 +1200 Subject: [PATCH 4/4] log once not for each closure --- src/amqproxy/upstream.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index e600d7f..681bf26 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -117,12 +117,12 @@ module AMQProxy clients = Set(Client).new @channels_lock.synchronize do return if @channels.empty? + Log.debug { "Upstream connection closed, closing #{@channels.size} client connections" } @channels.each_value do |downstream_connection| clients << downstream_connection.client end clients.each do |client| - Log.debug { "Closing client connection due to upstream failure." } client.close_connection(code, reason) end @channels.clear