diff --git a/spec/amqproxy/server_spec.cr b/spec/amqproxy/server_spec.cr index bd8c4ca..3957a48 100644 --- a/spec/amqproxy/server_spec.cr +++ b/spec/amqproxy/server_spec.cr @@ -259,4 +259,76 @@ describe AMQProxy::Server do end end end + + it "does not send duplicate channel.close frames when client crashes after sending close" do + with_server do |server, proxy_url| + Fiber.yield + + # Open a canary connection to verify the upstream connection stays open + # If the bug exists, duplicate Channel::Close will close the upstream connection, + # affecting ALL clients sharing that connection + canary = AMQP::Client.new(proxy_url).connect + canary_channel = canary.channel + + # Open multiple channels and close them rapidly, then crash + # This increases the probability of triggering the race condition + num_channels = 10 + + conn = AMQP::Client.new(proxy_url).connect + channels = (1..num_channels).map { conn.channel } + sleep 0.1.seconds + server.upstream_connections.should eq 1 + + # Send Channel::Close for ALL channels rapidly without waiting for CloseOk + channels.each do |channel| + conn.write AMQ::Protocol::Frame::Channel::Close.new(channel.id, 200_u16, "Normal close", 0_u16, 0_u16) + end + + # Simulate crash: close socket abruptly WITHOUT waiting for any CloseOk + conn.@io.close + + # Give the proxy time to process the disconnect + sleep 0.3.seconds + + # Verify the canary connection is still alive (proves upstream connection didn't close) + canary.closed?.should be_false + canary_channel.closed?.should be_false + server.upstream_connections.should eq 1 + end + end + + it "does not send duplicate channel.close when upstream initiates close and client crashes" do + with_server do |server, proxy_url| + Fiber.yield + + # Open a canary connection to verify the upstream connection stays open + # If the bug exists, duplicate Channel::Close will close the upstream connection, + # affecting ALL clients sharing that connection + canary = AMQP::Client.new(proxy_url).connect + canary_channel = canary.channel + + conn = AMQP::Client.new(proxy_url).connect + ch = conn.channel + sleep 0.1.seconds + server.upstream_connections.should eq 1 + + # Trigger a channel error by consuming from non-existent queue + # This will cause the upstream to send Channel::Close + expect_raises(AMQP::Client::Channel::ClosedException) do + ch.basic_consume("non_existent_queue_#{rand}") { } + end + + # Simulate client crash: close socket WITHOUT proper connection close + conn.@io.close + + # Give the proxy time to process the disconnect + sleep 0.2.seconds + + # Verify the canary connection is still alive (proves upstream connection didn't close) + # The proxy already sent CloseOk to upstream when it received Channel::Close + canary.closed?.should be_false + canary_channel.closed?.should be_false + server.upstream_connections.should eq 1 + end + end end diff --git a/src/amqproxy/client.cr b/src/amqproxy/client.cr index ffdc8a6..26eed4e 100644 --- a/src/amqproxy/client.cr +++ b/src/amqproxy/client.cr @@ -8,7 +8,7 @@ module AMQProxy class Client Log = ::Log.for(self) getter credentials : Credentials - @channel_map = Hash(UInt16, UpstreamChannel?).new + @channel_map = Hash(UInt16, UpstreamChannel).new @lock = Mutex.new @frame_max : UInt32 @channel_max : UInt16 @@ -74,6 +74,14 @@ module AMQProxy upstream_channel = channel_pool.get(DownstreamChannel.new(self, frame.channel)) @channel_map[frame.channel] = upstream_channel write AMQ::Protocol::Frame::Channel::OpenOk.new(frame.channel) + when AMQ::Protocol::Frame::Channel::Close + if upstream_channel = @channel_map.delete(frame.channel) + # Channel was open, forward close to upstream + upstream_channel.write(frame) + else + # Channel doesn't exist + close_connection(504_u16, "CHANNEL_ERROR - Channel #{frame.channel} not open", frame) + end when AMQ::Protocol::Frame::Channel::CloseOk # Server closed channel, CloseOk reply to server is already sent @channel_map.delete(frame.channel) @@ -148,9 +156,8 @@ module AMQProxy @socket.flush unless expect_more_frames?(frame) end case frame - when AMQ::Protocol::Frame::Channel::Close - @channel_map[frame.channel] = nil - when AMQ::Protocol::Frame::Channel::CloseOk + when AMQ::Protocol::Frame::Channel::Close, + AMQ::Protocol::Frame::Channel::CloseOk @channel_map.delete(frame.channel) when AMQ::Protocol::Frame::Connection::CloseOk @socket.close rescue nil @@ -175,7 +182,7 @@ module AMQProxy private def close_all_upstream_channels(code = 500_u16, reason = "CLIENT_DISCONNECTED") @channel_map.each_value do |upstream_channel| - upstream_channel.try &.close(code, reason) + upstream_channel.close(code, reason) rescue Upstream::WriteError Log.debug { "Upstream write error while closing client's channels" } next # Nothing to do