Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions spec/amqproxy/server_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,76 @@ describe AMQProxy::Server do
end
end
end

it "does not send duplicate channel.close frames when client crashes after sending close" do
with_server do |server, proxy_url|
Fiber.yield

# Open a canary connection to verify the upstream connection stays open
# If the bug exists, duplicate Channel::Close will close the upstream connection,
# affecting ALL clients sharing that connection
canary = AMQP::Client.new(proxy_url).connect
canary_channel = canary.channel

# Open multiple channels and close them rapidly, then crash
# This increases the probability of triggering the race condition
num_channels = 10

conn = AMQP::Client.new(proxy_url).connect
channels = (1..num_channels).map { conn.channel }
sleep 0.1.seconds
server.upstream_connections.should eq 1

# Send Channel::Close for ALL channels rapidly without waiting for CloseOk
channels.each do |channel|
conn.write AMQ::Protocol::Frame::Channel::Close.new(channel.id, 200_u16, "Normal close", 0_u16, 0_u16)
end

# Simulate crash: close socket abruptly WITHOUT waiting for any CloseOk
conn.@io.close

# Give the proxy time to process the disconnect
sleep 0.3.seconds

# Verify the canary connection is still alive (proves upstream connection didn't close)
canary.closed?.should be_false
canary_channel.closed?.should be_false
server.upstream_connections.should eq 1
end
end

it "does not send duplicate channel.close when upstream initiates close and client crashes" do
with_server do |server, proxy_url|
Fiber.yield

# Open a canary connection to verify the upstream connection stays open
# If the bug exists, duplicate Channel::Close will close the upstream connection,
# affecting ALL clients sharing that connection
canary = AMQP::Client.new(proxy_url).connect
canary_channel = canary.channel

conn = AMQP::Client.new(proxy_url).connect
ch = conn.channel
sleep 0.1.seconds
server.upstream_connections.should eq 1

# Trigger a channel error by consuming from non-existent queue
# This will cause the upstream to send Channel::Close
expect_raises(AMQP::Client::Channel::ClosedException) do
ch.basic_consume("non_existent_queue_#{rand}") { }
end

# Simulate client crash: close socket WITHOUT proper connection close
conn.@io.close

# Give the proxy time to process the disconnect
sleep 0.2.seconds

# Verify the canary connection is still alive (proves upstream connection didn't close)
# The proxy already sent CloseOk to upstream when it received Channel::Close
canary.closed?.should be_false
canary_channel.closed?.should be_false
server.upstream_connections.should eq 1
end
end
end
17 changes: 12 additions & 5 deletions src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ module AMQProxy
class Client
Log = ::Log.for(self)
getter credentials : Credentials
@channel_map = Hash(UInt16, UpstreamChannel?).new
@channel_map = Hash(UInt16, UpstreamChannel).new
@lock = Mutex.new
@frame_max : UInt32
@channel_max : UInt16
Expand Down Expand Up @@ -74,6 +74,14 @@ module AMQProxy
upstream_channel = channel_pool.get(DownstreamChannel.new(self, frame.channel))
@channel_map[frame.channel] = upstream_channel
write AMQ::Protocol::Frame::Channel::OpenOk.new(frame.channel)
when AMQ::Protocol::Frame::Channel::Close
if upstream_channel = @channel_map.delete(frame.channel)
# Channel was open, forward close to upstream
upstream_channel.write(frame)
else
# Channel doesn't exist
close_connection(504_u16, "CHANNEL_ERROR - Channel #{frame.channel} not open", frame)
end
when AMQ::Protocol::Frame::Channel::CloseOk
# Server closed channel, CloseOk reply to server is already sent
@channel_map.delete(frame.channel)
Expand Down Expand Up @@ -148,9 +156,8 @@ module AMQProxy
@socket.flush unless expect_more_frames?(frame)
end
case frame
when AMQ::Protocol::Frame::Channel::Close
@channel_map[frame.channel] = nil
when AMQ::Protocol::Frame::Channel::CloseOk
when AMQ::Protocol::Frame::Channel::Close,
AMQ::Protocol::Frame::Channel::CloseOk
@channel_map.delete(frame.channel)
when AMQ::Protocol::Frame::Connection::CloseOk
@socket.close rescue nil
Expand All @@ -175,7 +182,7 @@ module AMQProxy

private def close_all_upstream_channels(code = 500_u16, reason = "CLIENT_DISCONNECTED")
@channel_map.each_value do |upstream_channel|
upstream_channel.try &.close(code, reason)
upstream_channel.close(code, reason)
rescue Upstream::WriteError
Log.debug { "Upstream write error while closing client's channels" }
next # Nothing to do
Expand Down