From 2f8f02ae3adcced2c8cb37cd7f199fe1f646fee1 Mon Sep 17 00:00:00 2001 From: markus812498 Date: Thu, 4 Sep 2025 09:48:25 +1200 Subject: [PATCH 1/9] close downstream connections when upstream close --- src/amqproxy/upstream.cr | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 2b840c8..7f1b052 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -114,14 +114,21 @@ module AMQProxy end private def close_all_client_channels(code = 500_u16, reason = "UPSTREAM_ERROR") + clients = Set(Client).new @channels_lock.synchronize do return if @channels.empty? Log.debug { "Upstream connection closed, closing #{@channels.size} client channels" } @channels.each_value do |downstream_channel| - downstream_channel.close(code, reason) + clients << downstream_channel.client end @channels.clear end + + # Close all client connections that were using this upstream + clients.each do |client| + Log.debug { "Closing client connection due to upstream failure" } + client.close_connection(code, reason) + end end private def send_to_all_clients(frame : AMQ::Protocol::Frame::Connection) From 0623630b2e5b9f437713d00eb5145f367ba7d3ab Mon Sep 17 00:00:00 2001 From: markus812498 Date: Thu, 4 Sep 2025 11:25:32 +1200 Subject: [PATCH 2/9] some reorganisation --- src/amqproxy/upstream.cr | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 7f1b052..3ef5287 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -73,7 +73,7 @@ module AMQProxy when AMQ::Protocol::Frame::Heartbeat then send frame when AMQ::Protocol::Frame::Connection::Close Log.error { "Upstream closed connection: #{frame.reply_text} #{frame.reply_code}" } - close_all_client_channels(frame.reply_code, frame.reply_text) + close_all_downstream_client_connections(frame.reply_code, frame.reply_text) begin send AMQ::Protocol::Frame::Connection::CloseOk.new rescue WriteError @@ -106,29 +106,27 @@ module AMQProxy Log.info { "Connection error #{ex.inspect}" } unless socket.closed? ensure socket.close rescue nil - close_all_client_channels + close_all_downstream_client_connections end def closed? @socket.closed? end - private def close_all_client_channels(code = 500_u16, reason = "UPSTREAM_ERROR") + private def close_all_downstream_client_connections(code = 500_u16, reason = "UPSTREAM_ERROR") clients = Set(Client).new @channels_lock.synchronize do return if @channels.empty? - Log.debug { "Upstream connection closed, closing #{@channels.size} client channels" } - @channels.each_value do |downstream_channel| - clients << downstream_channel.client + @channels.each_value do |downstream_connection| + clients << downstream_connection.client + end + + clients.each do |client| + Log.debug { "Closing client connection due to upstream failure. Client: #{client}" } + client.close_connection(code, reason) end @channels.clear end - - # Close all client connections that were using this upstream - clients.each do |client| - Log.debug { "Closing client connection due to upstream failure" } - client.close_connection(code, reason) - end end private def send_to_all_clients(frame : AMQ::Protocol::Frame::Connection) From 4b064937ca1952926bf41537b0952c80fb0ba83d Mon Sep 17 00:00:00 2001 From: markus812498 Date: Thu, 4 Sep 2025 11:33:28 +1200 Subject: [PATCH 3/9] no need to include client --- src/amqproxy/upstream.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 3ef5287..e600d7f 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -122,7 +122,7 @@ module AMQProxy end clients.each do |client| - Log.debug { "Closing client connection due to upstream failure. Client: #{client}" } + Log.debug { "Closing client connection due to upstream failure." } client.close_connection(code, reason) end @channels.clear From 574805cd9059cb19bea051aca2cd8daabd8e86c9 Mon Sep 17 00:00:00 2001 From: markus812498 Date: Thu, 4 Sep 2025 12:25:17 +1200 Subject: [PATCH 4/9] log once not for each closure --- src/amqproxy/upstream.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index e600d7f..681bf26 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -117,12 +117,12 @@ module AMQProxy clients = Set(Client).new @channels_lock.synchronize do return if @channels.empty? + Log.debug { "Upstream connection closed, closing #{@channels.size} client connections" } @channels.each_value do |downstream_connection| clients << downstream_connection.client end clients.each do |client| - Log.debug { "Closing client connection due to upstream failure." } client.close_connection(code, reason) end @channels.clear From d580d30c8c7e997a6c366da6693f1ca4320f7061 Mon Sep 17 00:00:00 2001 From: markus812498 Date: Tue, 9 Sep 2025 11:38:45 +1200 Subject: [PATCH 5/9] spellcheck --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index bb7aff4..90142db 100644 --- a/README.md +++ b/README.md @@ -8,9 +8,9 @@ This proxy server, if run on the same machine as the client can save all that la Only "safe" channels are reused, that is channels where only Basic Publish or Basic Get (with no_ack) has occurred. Any channels who has subscribed to a queue will be closed when the client disconnects. However, the connection to the upstream AMQP server are always kept open and can be reused. -In our benchmarks publishing one message per connection to a server (using TLS) with a round-trip latency of 50ms, takes on avarage 10ms using the proxy and 500ms without. You can read more about the proxy here [Maintaining long-lived connections with AMQProxy](https://www.cloudamqp.com/blog/2019-05-29-maintaining-long-lived-connections-with-AMQProxy.html) +In our benchmarks publishing one message per connection to a server (using TLS) with a round-trip latency of 50ms, takes on average 10ms using the proxy and 500ms without. You can read more about the proxy here [Maintaining long-lived connections with AMQProxy](https://www.cloudamqp.com/blog/2019-05-29-maintaining-long-lived-connections-with-AMQProxy.html) -As of version 2.0.0 connections to the server can be shared by multiple client connections. When a client opens a channel it will get a channel on a shared upstream connection, the proxy will remap the channel numbers between the two. Many client connections can therefor share a single upstream connection. The benefit is that way fewer connections are needed to the upstream server. For instance, establihsing 10.000 connections after a server reboot might normally take several minutes, but with this proxy it can happen in seconds. +As of version 2.0.0 connections to the server can be shared by multiple client connections. When a client opens a channel it will get a channel on a shared upstream connection, the proxy will remap the channel numbers between the two. Many client connections can therefor share a single upstream connection. The benefit is that way fewer connections are needed to the upstream server. For instance, establishing 10.000 connections after a server reboot might normally take several minutes, but with this proxy it can happen in seconds. A health check for amqproxy is available over http on http://listen_address:http_port/healthz and will return 200 when amqproxy is healthy. @@ -43,7 +43,7 @@ docker run --rm -it -p 5673:5673 cloudamqp/amqproxy amqp://SERVER:5672 Note: If you are running the upstream server on localhost then you will have to add the `--network host` flag to the docker run command. -Then from your AMQP client connect to localhost:5673, it will resuse connections made to the upstream. The AMQP_URL should only include protocol, hostname and port (only if non default, 5672 for AMQP and 5671 for AMQPS). Any username, password or vhost will be ignored, and it's up to the client to provide them. +Then from your AMQP client connect to localhost:5673, it will reuse connections made to the upstream. The AMQP_URL should only include protocol, hostname and port (only if non default, 5672 for AMQP and 5671 for AMQPS). Any username, password or vhost will be ignored, and it's up to the client to provide them. ## Installation (from source) @@ -90,5 +90,5 @@ There are three ways to configure the AMQProxy. 2. Command line options & argument 3. Environment variables -Settings that are avilable in the config file will override the corresponding command line options. A command line option will override the corresponding environment variable. And so on. +Settings that are available in the config file will override the corresponding command line options. A command line option will override the corresponding environment variable. And so on. The different configuration approaches can also be mixed. From d3cbf414706624b0cb31226e467d8da032836f09 Mon Sep 17 00:00:00 2001 From: markus812498 Date: Thu, 20 Nov 2025 11:23:59 +1300 Subject: [PATCH 6/9] adding max channels per upstream connection as configurable item --- README.md | 1 + config/example.ini | 1 + src/amqproxy/channel_pool.cr | 4 ++-- src/amqproxy/cli.cr | 8 +++++++- src/amqproxy/server.cr | 4 ++-- src/amqproxy/upstream.cr | 5 +++-- 6 files changed, 16 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 90142db..c3ff4b9 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,7 @@ You probably want to modify `/etc/systemd/system/amqproxy.service` and configure | Listen port | Port to listen on. This is the port which will be the target of clients | `--port` / `-p` | `LISTEN_PORT` | `[listen] > port` | `5673` | | Log level | Controls log verbosity.

Available levels (see [84codes/logger.cr](https://github.com/84codes/logger.cr/blob/main/src/logger.cr#L86)):
- `DEBUG`: Low-level information for developers
- `INFO`: Generic (useful) information about system operation
- `WARN`: Warnings
- `ERROR`: Handleable error conditions
- `FATAL`: Unhandleable errors that results in a program crash | `--debug` / `-d`: Sets the level to `DEBUG` | - | `[main] > log_level` | `INFO` | | Idle connection timeout | Maximum time in seconds an unused pooled connection stays open | `--idle-connection-timeout` / `-t` | `IDLE_CONNECTION_TIMEOUT` | `[main] > idle_connection_timeout` | `5` | +| Max upstream channels | Maximum number of channels per upstream connection. The effective limit will be the lowest value between this setting and the upstream server's channel_max from the connection tune packet | `--max-upstream-channels` | `MAX_UPSTREAM_CHANNELS` | `[main] > max_upstream_channels` | `65535` | | Upstream | AMQP URL that points to the upstream RabbitMQ server to which the proxy should connect to. May only contain scheme, host & port (optional). Example: `amqps://rabbitmq.example.com` | Pass as argument after all options | `AMQP_URL` | `[main] > upstream` | | ### How to configure diff --git a/config/example.ini b/config/example.ini index d47aa99..7fc055c 100644 --- a/config/example.ini +++ b/config/example.ini @@ -1,6 +1,7 @@ [main] log_level = info idle_connection_timeout = 5 +max_upstream_channels = 65535 upstream = amqp://localhost:5672 [listen] diff --git a/src/amqproxy/channel_pool.cr b/src/amqproxy/channel_pool.cr index c88d281..f702f16 100644 --- a/src/amqproxy/channel_pool.cr +++ b/src/amqproxy/channel_pool.cr @@ -9,7 +9,7 @@ module AMQProxy @lock = Mutex.new @upstreams = Deque(Upstream).new - def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, @credentials : Credentials, @idle_connection_timeout : Int32) + def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, @credentials : Credentials, @idle_connection_timeout : Int32, @max_upstream_channels : UInt16) spawn shrink_pool_loop, name: "shrink pool loop" end @@ -36,7 +36,7 @@ module AMQProxy end private def add_upstream - upstream = Upstream.new(@host, @port, @tls_ctx, @credentials) + upstream = Upstream.new(@host, @port, @tls_ctx, @credentials, @max_upstream_channels) Log.info { "Adding upstream connection" } @upstreams.unshift upstream spawn(name: "Upstream#read_loop") do diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index 24e5ce2..a864ea2 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -14,6 +14,7 @@ class AMQProxy::CLI @http_port = 15673 @log_level : ::Log::Severity = ::Log::Severity::Info @idle_connection_timeout : Int32 = 5 + @max_upstream_channels : UInt16 = UInt16::MAX @term_timeout = -1 @term_client_close_timeout = 0 @server : AMQProxy::Server? = nil @@ -27,6 +28,7 @@ class AMQProxy::CLI when "upstream" then @upstream = value when "log_level" then @log_level = ::Log::Severity.parse(value) when "idle_connection_timeout" then @idle_connection_timeout = value.to_i + when "max_upstream_channels" then @max_upstream_channels = value.to_u16 when "term_timeout" then @term_timeout = value.to_i when "term_client_close_timeout" then @term_client_close_timeout = value.to_i else raise "Unsupported config #{name}/#{key}" @@ -54,6 +56,7 @@ class AMQProxy::CLI @http_port = ENV["HTTP_PORT"]?.try &.to_i || @http_port @log_level = ENV["LOG_LEVEL"]?.try { |level| ::Log::Severity.parse(level) } || @log_level @idle_connection_timeout = ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i || @idle_connection_timeout + @max_upstream_channels = ENV["MAX_UPSTREAM_CHANNELS"]?.try &.to_u16 || @max_upstream_channels @term_timeout = ENV["TERM_TIMEOUT"]?.try &.to_i || @term_timeout @term_client_close_timeout = ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i || @term_client_close_timeout @upstream = ENV["AMQP_URL"]? || @upstream @@ -81,6 +84,9 @@ class AMQProxy::CLI parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v| @idle_connection_timeout = v.to_i end + parser.on("--max-upstream-channels=CHANNELS", "Maximum channels per upstream connection (default: server max or 65535)") do |v| + @max_upstream_channels = v.to_u16 + end parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v| @term_timeout = v.to_i end @@ -117,7 +123,7 @@ class AMQProxy::CLI Signal::INT.trap &->self.initiate_shutdown(Signal) Signal::TERM.trap &->self.initiate_shutdown(Signal) - server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout) + server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout, @max_upstream_channels) HTTPServer.new(server, @listen_address, @http_port.to_i) server.listen(@listen_address, @listen_port.to_i) diff --git a/src/amqproxy/server.cr b/src/amqproxy/server.cr index 4af02f4..fe04409 100644 --- a/src/amqproxy/server.cr +++ b/src/amqproxy/server.cr @@ -21,10 +21,10 @@ module AMQProxy new(host, port, tls, idle_connection_timeout) end - def initialize(upstream_host, upstream_port, upstream_tls, idle_connection_timeout = 5) + def initialize(upstream_host, upstream_port, upstream_tls, idle_connection_timeout = 5, max_upstream_channels = UInt16::MAX) tls_ctx = OpenSSL::SSL::Context::Client.new if upstream_tls @channel_pools = Hash(Credentials, ChannelPool).new do |hash, credentials| - hash[credentials] = ChannelPool.new(upstream_host, upstream_port, tls_ctx, credentials, idle_connection_timeout) + hash[credentials] = ChannelPool.new(upstream_host, upstream_port, tls_ctx, credentials, idle_connection_timeout, max_upstream_channels) end Log.info { "Proxy upstream: #{upstream_host}:#{upstream_port} #{upstream_tls ? "TLS" : ""}" } end diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 681bf26..5490743 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -16,7 +16,7 @@ module AMQProxy @channel_max : UInt16 @frame_max : UInt32 - def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, credentials) + def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, credentials, @max_upstream_channels : UInt16 = UInt16::MAX) tcp_socket = TCPSocket.new(@host, @port) tcp_socket.sync = false tcp_socket.keepalive = true @@ -203,7 +203,8 @@ module AMQProxy case tune = AMQ::Protocol::Frame.from_io(@socket) when AMQ::Protocol::Frame::Connection::Tune - channel_max = tune.channel_max.zero? ? UInt16::MAX : tune.channel_max + server_max = tune.channel_max.zero? ? UInt16::MAX : tune.channel_max + channel_max = Math.min(server_max, @max_upstream_channels) frame_max = tune.frame_max.zero? ? 131072_u32 : Math.min(131072_u32, tune.frame_max) tune_ok = AMQ::Protocol::Frame::Connection::TuneOk.new(channel_max, frame_max, tune.heartbeat) @socket.write_bytes tune_ok, IO::ByteFormat::NetworkEndian From 1c8ce3ebdf1bd968131139c22f2b1d8ab9c4efa9 Mon Sep 17 00:00:00 2001 From: markus812498 Date: Thu, 4 Sep 2025 09:48:25 +1200 Subject: [PATCH 7/9] close downstream connections when upstream close --- src/amqproxy/upstream.cr | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 5490743..05fc04b 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -127,6 +127,12 @@ module AMQProxy end @channels.clear end + + # Close all client connections that were using this upstream + clients.each do |client| + Log.debug { "Closing client connection due to upstream failure" } + client.close_connection(code, reason) + end end private def send_to_all_clients(frame : AMQ::Protocol::Frame::Connection) From a0b5c01a9e9d5fa25dc0a94e545e5d9eda0eba45 Mon Sep 17 00:00:00 2001 From: markus812498 Date: Thu, 4 Sep 2025 11:25:32 +1200 Subject: [PATCH 8/9] some reorganisation pick 2f8f02a # close downstream connections when upstream close --- src/amqproxy/upstream.cr | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 05fc04b..48a41ef 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -117,22 +117,23 @@ module AMQProxy clients = Set(Client).new @channels_lock.synchronize do return if @channels.empty? +<<<<<<< HEAD Log.debug { "Upstream connection closed, closing #{@channels.size} client connections" } +======= +>>>>>>> 0623630 (some reorganisation) @channels.each_value do |downstream_connection| clients << downstream_connection.client end clients.each do |client| +<<<<<<< HEAD +======= + Log.debug { "Closing client connection due to upstream failure. Client: #{client}" } +>>>>>>> 0623630 (some reorganisation) client.close_connection(code, reason) end @channels.clear end - - # Close all client connections that were using this upstream - clients.each do |client| - Log.debug { "Closing client connection due to upstream failure" } - client.close_connection(code, reason) - end end private def send_to_all_clients(frame : AMQ::Protocol::Frame::Connection) From efdb91d2f4ff07a5d225cd89b0d5839fa729fe23 Mon Sep 17 00:00:00 2001 From: markus812498 Date: Thu, 4 Sep 2025 11:33:28 +1200 Subject: [PATCH 9/9] no need to include client pick 574805c # log once not for each closure pick d3cbf41 # adding max channels per upstream connection as configurable item --- src/amqproxy/upstream.cr | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 48a41ef..d28f0aa 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -117,19 +117,12 @@ module AMQProxy clients = Set(Client).new @channels_lock.synchronize do return if @channels.empty? -<<<<<<< HEAD - Log.debug { "Upstream connection closed, closing #{@channels.size} client connections" } -======= ->>>>>>> 0623630 (some reorganisation) @channels.each_value do |downstream_connection| clients << downstream_connection.client end clients.each do |client| -<<<<<<< HEAD -======= - Log.debug { "Closing client connection due to upstream failure. Client: #{client}" } ->>>>>>> 0623630 (some reorganisation) + Log.debug { "Closing client connection due to upstream failure." } client.close_connection(code, reason) end @channels.clear