diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 2b840c8..681bf26 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -73,7 +73,7 @@ module AMQProxy when AMQ::Protocol::Frame::Heartbeat then send frame when AMQ::Protocol::Frame::Connection::Close Log.error { "Upstream closed connection: #{frame.reply_text} #{frame.reply_code}" } - close_all_client_channels(frame.reply_code, frame.reply_text) + close_all_downstream_client_connections(frame.reply_code, frame.reply_text) begin send AMQ::Protocol::Frame::Connection::CloseOk.new rescue WriteError @@ -106,19 +106,24 @@ module AMQProxy Log.info { "Connection error #{ex.inspect}" } unless socket.closed? ensure socket.close rescue nil - close_all_client_channels + close_all_downstream_client_connections end def closed? @socket.closed? end - private def close_all_client_channels(code = 500_u16, reason = "UPSTREAM_ERROR") + private def close_all_downstream_client_connections(code = 500_u16, reason = "UPSTREAM_ERROR") + clients = Set(Client).new @channels_lock.synchronize do return if @channels.empty? - Log.debug { "Upstream connection closed, closing #{@channels.size} client channels" } - @channels.each_value do |downstream_channel| - downstream_channel.close(code, reason) + Log.debug { "Upstream connection closed, closing #{@channels.size} client connections" } + @channels.each_value do |downstream_connection| + clients << downstream_connection.client + end + + clients.each do |client| + client.close_connection(code, reason) end @channels.clear end