From c8f6745cdbafcebbbfda5a0e81a6c6125daf3105 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20B=C3=A4lter?= Date: Fri, 19 Dec 2025 09:53:00 +0100 Subject: [PATCH 1/4] Add specs for duplicate channel.close frame prevention Add two specs that verify the proxy does not send duplicate Channel::Close frames to upstream when clients crash: 1. Client-initiated close: Opens multiple channels, sends Channel::Close for all of them, then crashes before receiving CloseOk. Without the fix, this causes duplicate Channel::Close frames which triggers RabbitMQ to close the connection with "expected 'channel.open'" error. 2. Upstream-initiated close: Triggers a channel error (consume from non-existent queue), receives Channel::Close from upstream, then crashes before sending CloseOk. The proxy should have already sent CloseOk to upstream. These specs reproduce the issue reported where client crashes could cause the upstream connection to be closed, affecting all clients sharing that connection. --- spec/amqproxy/server_spec.cr | 159 +++++++++++++++++++++++++++++++++++ 1 file changed, 159 insertions(+) diff --git a/spec/amqproxy/server_spec.cr b/spec/amqproxy/server_spec.cr index bd8c4ca..914d881 100644 --- a/spec/amqproxy/server_spec.cr +++ b/spec/amqproxy/server_spec.cr @@ -259,4 +259,163 @@ describe AMQProxy::Server do end end end + + it "does not send duplicate channel.close frames when client crashes after sending close" do + with_server do |server, proxy_url| + Fiber.yield + uri = URI.parse(proxy_url) + + # Open multiple channels and close them rapidly, then crash + # This increases the probability of triggering the race condition + num_channels = 10 + + socket = TCPSocket.new(uri.host.not_nil!, uri.port.not_nil!) + socket.sync = false + + # AMQP handshake + socket.write AMQ::Protocol::PROTOCOL_START_0_9_1.to_slice + socket.flush + AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) + start_ok = AMQ::Protocol::Frame::Connection::StartOk.new( + response: "\u0000guest\u0000guest", + client_properties: AMQ::Protocol::Table.new, + mechanism: "PLAIN", + locale: "en_US" + ) + socket.write_bytes start_ok, IO::ByteFormat::NetworkEndian + socket.flush + AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) + tune_ok = AMQ::Protocol::Frame::Connection::TuneOk.new(2047_u16, 131072_u32, 0_u16) + socket.write_bytes tune_ok, IO::ByteFormat::NetworkEndian + socket.flush + open = AMQ::Protocol::Frame::Connection::Open.new(vhost: "/") + socket.write_bytes open, IO::ByteFormat::NetworkEndian + socket.flush + AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) + + # Open multiple channels + 1_u16.upto(num_channels.to_u16) do |ch_id| + channel_open = AMQ::Protocol::Frame::Channel::Open.new(ch_id) + socket.write_bytes channel_open, IO::ByteFormat::NetworkEndian + end + socket.flush + + # Read all Channel::OpenOk responses + num_channels.times do + AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) + .as(AMQ::Protocol::Frame::Channel::OpenOk) + end + + sleep 0.1.seconds + server.upstream_connections.should eq 1 + + # Send Channel::Close for ALL channels rapidly without waiting for CloseOk + 1_u16.upto(num_channels.to_u16) do |ch_id| + channel_close = AMQ::Protocol::Frame::Channel::Close.new(ch_id, 200_u16, "Normal close", 0_u16, 0_u16) + socket.write_bytes channel_close, IO::ByteFormat::NetworkEndian + end + socket.flush + + # Simulate crash: close socket abruptly WITHOUT waiting for any CloseOk + socket.close + + # Give the proxy time to process the disconnect + sleep 0.3.seconds + + # The upstream connection should still be open + # If duplicate Channel::Close was sent, upstream would have closed the connection + # with error: "expected 'channel.open'" + server.upstream_connections.should eq 1 + + # Verify we can still use the upstream connection with another client + AMQP::Client.start(proxy_url) do |conn| + ch = conn.channel + ch.basic_publish_confirm "test", "amq.fanout" + end + + sleep 0.1.seconds + server.upstream_connections.should eq 1 + end + end + + it "does not send duplicate channel.close when upstream initiates close and client crashes" do + with_server do |server, proxy_url| + Fiber.yield + uri = URI.parse(proxy_url) + + # Use low-level socket to control the exact frame sequence + socket = TCPSocket.new(uri.host.not_nil!, uri.port.not_nil!) + socket.sync = false + + # AMQP handshake + socket.write AMQ::Protocol::PROTOCOL_START_0_9_1.to_slice + socket.flush + AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) + start_ok = AMQ::Protocol::Frame::Connection::StartOk.new( + response: "\u0000guest\u0000guest", + client_properties: AMQ::Protocol::Table.new, + mechanism: "PLAIN", + locale: "en_US" + ) + socket.write_bytes start_ok, IO::ByteFormat::NetworkEndian + socket.flush + AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) + tune_ok = AMQ::Protocol::Frame::Connection::TuneOk.new(2047_u16, 131072_u32, 0_u16) + socket.write_bytes tune_ok, IO::ByteFormat::NetworkEndian + socket.flush + open = AMQ::Protocol::Frame::Connection::Open.new(vhost: "/") + socket.write_bytes open, IO::ByteFormat::NetworkEndian + socket.flush + AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) + + # Open a channel + channel_open = AMQ::Protocol::Frame::Channel::Open.new(1_u16) + socket.write_bytes channel_open, IO::ByteFormat::NetworkEndian + socket.flush + AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) + + # Now the upstream connection should exist (created when channel was opened) + sleep 0.1.seconds + server.upstream_connections.should eq 1 + + # Trigger a channel error by consuming from non-existent queue + # This will cause the upstream to send Channel::Close + basic_consume = AMQ::Protocol::Frame::Basic::Consume.new( + channel: 1_u16, + reserved1: 0_u16, + queue: "non_existent_queue_#{rand}", + consumer_tag: "test", + no_local: false, + no_ack: false, + exclusive: false, + no_wait: false, + arguments: AMQ::Protocol::Table.new + ) + socket.write_bytes basic_consume, IO::ByteFormat::NetworkEndian + socket.flush + + # Read the Channel::Close from the server (forwarded through proxy) + frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) + frame.should be_a(AMQ::Protocol::Frame::Channel::Close) + + # Simulate client crash: close socket WITHOUT sending CloseOk + socket.close + + # Give the proxy time to process the disconnect + sleep 0.2.seconds + + # The upstream connection should still be open + # The proxy should have already sent CloseOk to upstream when it received Channel::Close + server.upstream_connections.should eq 1 + + # Verify we can still use the upstream connection + AMQP::Client.start(proxy_url) do |conn| + ch = conn.channel + ch.basic_publish_confirm "test", "amq.fanout" + end + + sleep 0.1.seconds + server.upstream_connections.should eq 1 + end + end end From 212b5c61970f61446d54a034dda73b02ce382eda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20B=C3=A4lter?= Date: Fri, 19 Dec 2025 12:43:50 +0100 Subject: [PATCH 2/4] Fix duplicate channel.close frames when client crashes When a client sends Channel::Close and then crashes before receiving CloseOk, the proxy would send a duplicate Channel::Close from close_all_upstream_channels, causing the upstream to close the connection with "expected 'channel.open'" error. Fix: Delete the channel from @channel_map immediately when forwarding Channel::Close, rather than waiting for CloseOk. This way, if the client crashes, close_all_upstream_channels won't find the channel and won't send a duplicate close. This is simpler than using nil as a sentinel value - just remove the channel from the map when it starts closing, regardless of which side initiated the close. --- spec/amqproxy/server_spec.cr | 131 ++++------------------------------- src/amqproxy/client.cr | 13 +++- 2 files changed, 25 insertions(+), 119 deletions(-) diff --git a/spec/amqproxy/server_spec.cr b/spec/amqproxy/server_spec.cr index 914d881..2ffc66a 100644 --- a/spec/amqproxy/server_spec.cr +++ b/spec/amqproxy/server_spec.cr @@ -263,158 +263,57 @@ describe AMQProxy::Server do it "does not send duplicate channel.close frames when client crashes after sending close" do with_server do |server, proxy_url| Fiber.yield - uri = URI.parse(proxy_url) # Open multiple channels and close them rapidly, then crash # This increases the probability of triggering the race condition num_channels = 10 - socket = TCPSocket.new(uri.host.not_nil!, uri.port.not_nil!) - socket.sync = false - - # AMQP handshake - socket.write AMQ::Protocol::PROTOCOL_START_0_9_1.to_slice - socket.flush - AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) - start_ok = AMQ::Protocol::Frame::Connection::StartOk.new( - response: "\u0000guest\u0000guest", - client_properties: AMQ::Protocol::Table.new, - mechanism: "PLAIN", - locale: "en_US" - ) - socket.write_bytes start_ok, IO::ByteFormat::NetworkEndian - socket.flush - AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) - tune_ok = AMQ::Protocol::Frame::Connection::TuneOk.new(2047_u16, 131072_u32, 0_u16) - socket.write_bytes tune_ok, IO::ByteFormat::NetworkEndian - socket.flush - open = AMQ::Protocol::Frame::Connection::Open.new(vhost: "/") - socket.write_bytes open, IO::ByteFormat::NetworkEndian - socket.flush - AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) - - # Open multiple channels - 1_u16.upto(num_channels.to_u16) do |ch_id| - channel_open = AMQ::Protocol::Frame::Channel::Open.new(ch_id) - socket.write_bytes channel_open, IO::ByteFormat::NetworkEndian - end - socket.flush - - # Read all Channel::OpenOk responses - num_channels.times do - AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) - .as(AMQ::Protocol::Frame::Channel::OpenOk) - end - + conn = AMQP::Client.new(proxy_url).connect + channels = (1..num_channels).map { conn.channel } sleep 0.1.seconds server.upstream_connections.should eq 1 # Send Channel::Close for ALL channels rapidly without waiting for CloseOk - 1_u16.upto(num_channels.to_u16) do |ch_id| - channel_close = AMQ::Protocol::Frame::Channel::Close.new(ch_id, 200_u16, "Normal close", 0_u16, 0_u16) - socket.write_bytes channel_close, IO::ByteFormat::NetworkEndian + channels.each do |channel| + conn.write AMQ::Protocol::Frame::Channel::Close.new(channel.id, 200_u16, "Normal close", 0_u16, 0_u16) end - socket.flush # Simulate crash: close socket abruptly WITHOUT waiting for any CloseOk - socket.close + conn.@io.close # Give the proxy time to process the disconnect sleep 0.3.seconds - # The upstream connection should still be open + # The upstream connection should still be open (not 0) # If duplicate Channel::Close was sent, upstream would have closed the connection # with error: "expected 'channel.open'" server.upstream_connections.should eq 1 - - # Verify we can still use the upstream connection with another client - AMQP::Client.start(proxy_url) do |conn| - ch = conn.channel - ch.basic_publish_confirm "test", "amq.fanout" - end - - sleep 0.1.seconds - server.upstream_connections.should eq 1 end end it "does not send duplicate channel.close when upstream initiates close and client crashes" do with_server do |server, proxy_url| Fiber.yield - uri = URI.parse(proxy_url) - # Use low-level socket to control the exact frame sequence - socket = TCPSocket.new(uri.host.not_nil!, uri.port.not_nil!) - socket.sync = false - - # AMQP handshake - socket.write AMQ::Protocol::PROTOCOL_START_0_9_1.to_slice - socket.flush - AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) - start_ok = AMQ::Protocol::Frame::Connection::StartOk.new( - response: "\u0000guest\u0000guest", - client_properties: AMQ::Protocol::Table.new, - mechanism: "PLAIN", - locale: "en_US" - ) - socket.write_bytes start_ok, IO::ByteFormat::NetworkEndian - socket.flush - AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) - tune_ok = AMQ::Protocol::Frame::Connection::TuneOk.new(2047_u16, 131072_u32, 0_u16) - socket.write_bytes tune_ok, IO::ByteFormat::NetworkEndian - socket.flush - open = AMQ::Protocol::Frame::Connection::Open.new(vhost: "/") - socket.write_bytes open, IO::ByteFormat::NetworkEndian - socket.flush - AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) - - # Open a channel - channel_open = AMQ::Protocol::Frame::Channel::Open.new(1_u16) - socket.write_bytes channel_open, IO::ByteFormat::NetworkEndian - socket.flush - AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) - - # Now the upstream connection should exist (created when channel was opened) + conn = AMQP::Client.new(proxy_url).connect + ch = conn.channel sleep 0.1.seconds server.upstream_connections.should eq 1 # Trigger a channel error by consuming from non-existent queue # This will cause the upstream to send Channel::Close - basic_consume = AMQ::Protocol::Frame::Basic::Consume.new( - channel: 1_u16, - reserved1: 0_u16, - queue: "non_existent_queue_#{rand}", - consumer_tag: "test", - no_local: false, - no_ack: false, - exclusive: false, - no_wait: false, - arguments: AMQ::Protocol::Table.new - ) - socket.write_bytes basic_consume, IO::ByteFormat::NetworkEndian - socket.flush - - # Read the Channel::Close from the server (forwarded through proxy) - frame = AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) - frame.should be_a(AMQ::Protocol::Frame::Channel::Close) + expect_raises(AMQP::Client::Channel::ClosedException) do + ch.basic_consume("non_existent_queue_#{rand}") { } + end - # Simulate client crash: close socket WITHOUT sending CloseOk - socket.close + # Simulate client crash: close socket WITHOUT proper connection close + conn.@io.close # Give the proxy time to process the disconnect sleep 0.2.seconds - # The upstream connection should still be open - # The proxy should have already sent CloseOk to upstream when it received Channel::Close - server.upstream_connections.should eq 1 - - # Verify we can still use the upstream connection - AMQP::Client.start(proxy_url) do |conn| - ch = conn.channel - ch.basic_publish_confirm "test", "amq.fanout" - end - - sleep 0.1.seconds + # The upstream connection should still be open (not 0) + # The proxy already sent CloseOk to upstream when it received Channel::Close server.upstream_connections.should eq 1 end end diff --git a/src/amqproxy/client.cr b/src/amqproxy/client.cr index ffdc8a6..26682ee 100644 --- a/src/amqproxy/client.cr +++ b/src/amqproxy/client.cr @@ -74,6 +74,14 @@ module AMQProxy upstream_channel = channel_pool.get(DownstreamChannel.new(self, frame.channel)) @channel_map[frame.channel] = upstream_channel write AMQ::Protocol::Frame::Channel::OpenOk.new(frame.channel) + when AMQ::Protocol::Frame::Channel::Close + if upstream_channel = @channel_map.delete(frame.channel) + # Channel was open, forward close to upstream + upstream_channel.write(frame) + else + # Channel doesn't exist + close_connection(504_u16, "CHANNEL_ERROR - Channel #{frame.channel} not open", frame) + end when AMQ::Protocol::Frame::Channel::CloseOk # Server closed channel, CloseOk reply to server is already sent @channel_map.delete(frame.channel) @@ -148,9 +156,8 @@ module AMQProxy @socket.flush unless expect_more_frames?(frame) end case frame - when AMQ::Protocol::Frame::Channel::Close - @channel_map[frame.channel] = nil - when AMQ::Protocol::Frame::Channel::CloseOk + when AMQ::Protocol::Frame::Channel::Close, + AMQ::Protocol::Frame::Channel::CloseOk @channel_map.delete(frame.channel) when AMQ::Protocol::Frame::Connection::CloseOk @socket.close rescue nil From 935b54829b3fc8f385f9f38e8671583a2438feae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20B=C3=A4lter?= Date: Fri, 19 Dec 2025 23:13:08 +0100 Subject: [PATCH 3/4] Remove nilable type from channel_map The channel_map no longer stores nil values after the previous fix changed from assigning nil to using delete(). This simplifies the type from Hash(UInt16, UpstreamChannel?) to Hash(UInt16, UpstreamChannel) and removes the unnecessary .try call when closing channels. --- src/amqproxy/client.cr | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/amqproxy/client.cr b/src/amqproxy/client.cr index 26682ee..26eed4e 100644 --- a/src/amqproxy/client.cr +++ b/src/amqproxy/client.cr @@ -8,7 +8,7 @@ module AMQProxy class Client Log = ::Log.for(self) getter credentials : Credentials - @channel_map = Hash(UInt16, UpstreamChannel?).new + @channel_map = Hash(UInt16, UpstreamChannel).new @lock = Mutex.new @frame_max : UInt32 @channel_max : UInt16 @@ -182,7 +182,7 @@ module AMQProxy private def close_all_upstream_channels(code = 500_u16, reason = "CLIENT_DISCONNECTED") @channel_map.each_value do |upstream_channel| - upstream_channel.try &.close(code, reason) + upstream_channel.close(code, reason) rescue Upstream::WriteError Log.debug { "Upstream write error while closing client's channels" } next # Nothing to do From e90c56e6556408ac64a4e2419a3fe90e6d24e6b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Anders=20B=C3=A4lter?= Date: Fri, 19 Dec 2025 23:23:33 +0100 Subject: [PATCH 4/4] Improve specs with canary connection to verify upstream stays open Added a canary connection in both channel.close specs that stays open throughout the test. If the bug exists (duplicate Channel::Close sent), the upstream connection would close, affecting ALL clients including the canary. Verifying the canary remains open proves the upstream connection didn't close due to duplicate frames. This addresses Chad's concern that checking upstream_connections.should eq 1 alone might not be meaningful if the proxy quickly reconnects. --- spec/amqproxy/server_spec.cr | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/spec/amqproxy/server_spec.cr b/spec/amqproxy/server_spec.cr index 2ffc66a..3957a48 100644 --- a/spec/amqproxy/server_spec.cr +++ b/spec/amqproxy/server_spec.cr @@ -264,6 +264,12 @@ describe AMQProxy::Server do with_server do |server, proxy_url| Fiber.yield + # Open a canary connection to verify the upstream connection stays open + # If the bug exists, duplicate Channel::Close will close the upstream connection, + # affecting ALL clients sharing that connection + canary = AMQP::Client.new(proxy_url).connect + canary_channel = canary.channel + # Open multiple channels and close them rapidly, then crash # This increases the probability of triggering the race condition num_channels = 10 @@ -284,9 +290,9 @@ describe AMQProxy::Server do # Give the proxy time to process the disconnect sleep 0.3.seconds - # The upstream connection should still be open (not 0) - # If duplicate Channel::Close was sent, upstream would have closed the connection - # with error: "expected 'channel.open'" + # Verify the canary connection is still alive (proves upstream connection didn't close) + canary.closed?.should be_false + canary_channel.closed?.should be_false server.upstream_connections.should eq 1 end end @@ -295,6 +301,12 @@ describe AMQProxy::Server do with_server do |server, proxy_url| Fiber.yield + # Open a canary connection to verify the upstream connection stays open + # If the bug exists, duplicate Channel::Close will close the upstream connection, + # affecting ALL clients sharing that connection + canary = AMQP::Client.new(proxy_url).connect + canary_channel = canary.channel + conn = AMQP::Client.new(proxy_url).connect ch = conn.channel sleep 0.1.seconds @@ -312,8 +324,10 @@ describe AMQProxy::Server do # Give the proxy time to process the disconnect sleep 0.2.seconds - # The upstream connection should still be open (not 0) + # Verify the canary connection is still alive (proves upstream connection didn't close) # The proxy already sent CloseOk to upstream when it received Channel::Close + canary.closed?.should be_false + canary_channel.closed?.should be_false server.upstream_connections.should eq 1 end end