diff --git a/README.md b/README.md index 90142db..c3ff4b9 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,7 @@ You probably want to modify `/etc/systemd/system/amqproxy.service` and configure | Listen port | Port to listen on. This is the port which will be the target of clients | `--port` / `-p` | `LISTEN_PORT` | `[listen] > port` | `5673` | | Log level | Controls log verbosity.

Available levels (see [84codes/logger.cr](https://github.com/84codes/logger.cr/blob/main/src/logger.cr#L86)):
- `DEBUG`: Low-level information for developers
- `INFO`: Generic (useful) information about system operation
- `WARN`: Warnings
- `ERROR`: Handleable error conditions
- `FATAL`: Unhandleable errors that results in a program crash | `--debug` / `-d`: Sets the level to `DEBUG` | - | `[main] > log_level` | `INFO` | | Idle connection timeout | Maximum time in seconds an unused pooled connection stays open | `--idle-connection-timeout` / `-t` | `IDLE_CONNECTION_TIMEOUT` | `[main] > idle_connection_timeout` | `5` | +| Max upstream channels | Maximum number of channels per upstream connection. The effective limit will be the lowest value between this setting and the upstream server's channel_max from the connection tune packet | `--max-upstream-channels` | `MAX_UPSTREAM_CHANNELS` | `[main] > max_upstream_channels` | `65535` | | Upstream | AMQP URL that points to the upstream RabbitMQ server to which the proxy should connect to. May only contain scheme, host & port (optional). Example: `amqps://rabbitmq.example.com` | Pass as argument after all options | `AMQP_URL` | `[main] > upstream` | | ### How to configure diff --git a/config/example.ini b/config/example.ini index d47aa99..7fc055c 100644 --- a/config/example.ini +++ b/config/example.ini @@ -1,6 +1,7 @@ [main] log_level = info idle_connection_timeout = 5 +max_upstream_channels = 65535 upstream = amqp://localhost:5672 [listen] diff --git a/src/amqproxy/channel_pool.cr b/src/amqproxy/channel_pool.cr index c88d281..f702f16 100644 --- a/src/amqproxy/channel_pool.cr +++ b/src/amqproxy/channel_pool.cr @@ -9,7 +9,7 @@ module AMQProxy @lock = Mutex.new @upstreams = Deque(Upstream).new - def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, @credentials : Credentials, @idle_connection_timeout : Int32) + def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, @credentials : Credentials, @idle_connection_timeout : Int32, @max_upstream_channels : UInt16) spawn shrink_pool_loop, name: "shrink pool loop" end @@ -36,7 +36,7 @@ module AMQProxy end private def add_upstream - upstream = Upstream.new(@host, @port, @tls_ctx, @credentials) + upstream = Upstream.new(@host, @port, @tls_ctx, @credentials, @max_upstream_channels) Log.info { "Adding upstream connection" } @upstreams.unshift upstream spawn(name: "Upstream#read_loop") do diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index 24e5ce2..a864ea2 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -14,6 +14,7 @@ class AMQProxy::CLI @http_port = 15673 @log_level : ::Log::Severity = ::Log::Severity::Info @idle_connection_timeout : Int32 = 5 + @max_upstream_channels : UInt16 = UInt16::MAX @term_timeout = -1 @term_client_close_timeout = 0 @server : AMQProxy::Server? = nil @@ -27,6 +28,7 @@ class AMQProxy::CLI when "upstream" then @upstream = value when "log_level" then @log_level = ::Log::Severity.parse(value) when "idle_connection_timeout" then @idle_connection_timeout = value.to_i + when "max_upstream_channels" then @max_upstream_channels = value.to_u16 when "term_timeout" then @term_timeout = value.to_i when "term_client_close_timeout" then @term_client_close_timeout = value.to_i else raise "Unsupported config #{name}/#{key}" @@ -54,6 +56,7 @@ class AMQProxy::CLI @http_port = ENV["HTTP_PORT"]?.try &.to_i || @http_port @log_level = ENV["LOG_LEVEL"]?.try { |level| ::Log::Severity.parse(level) } || @log_level @idle_connection_timeout = ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i || @idle_connection_timeout + @max_upstream_channels = ENV["MAX_UPSTREAM_CHANNELS"]?.try &.to_u16 || @max_upstream_channels @term_timeout = ENV["TERM_TIMEOUT"]?.try &.to_i || @term_timeout @term_client_close_timeout = ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i || @term_client_close_timeout @upstream = ENV["AMQP_URL"]? || @upstream @@ -81,6 +84,9 @@ class AMQProxy::CLI parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v| @idle_connection_timeout = v.to_i end + parser.on("--max-upstream-channels=CHANNELS", "Maximum channels per upstream connection (default: server max or 65535)") do |v| + @max_upstream_channels = v.to_u16 + end parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v| @term_timeout = v.to_i end @@ -117,7 +123,7 @@ class AMQProxy::CLI Signal::INT.trap &->self.initiate_shutdown(Signal) Signal::TERM.trap &->self.initiate_shutdown(Signal) - server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout) + server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout, @max_upstream_channels) HTTPServer.new(server, @listen_address, @http_port.to_i) server.listen(@listen_address, @listen_port.to_i) diff --git a/src/amqproxy/server.cr b/src/amqproxy/server.cr index 4af02f4..fe04409 100644 --- a/src/amqproxy/server.cr +++ b/src/amqproxy/server.cr @@ -21,10 +21,10 @@ module AMQProxy new(host, port, tls, idle_connection_timeout) end - def initialize(upstream_host, upstream_port, upstream_tls, idle_connection_timeout = 5) + def initialize(upstream_host, upstream_port, upstream_tls, idle_connection_timeout = 5, max_upstream_channels = UInt16::MAX) tls_ctx = OpenSSL::SSL::Context::Client.new if upstream_tls @channel_pools = Hash(Credentials, ChannelPool).new do |hash, credentials| - hash[credentials] = ChannelPool.new(upstream_host, upstream_port, tls_ctx, credentials, idle_connection_timeout) + hash[credentials] = ChannelPool.new(upstream_host, upstream_port, tls_ctx, credentials, idle_connection_timeout, max_upstream_channels) end Log.info { "Proxy upstream: #{upstream_host}:#{upstream_port} #{upstream_tls ? "TLS" : ""}" } end diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 681bf26..d28f0aa 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -16,7 +16,7 @@ module AMQProxy @channel_max : UInt16 @frame_max : UInt32 - def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, credentials) + def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, credentials, @max_upstream_channels : UInt16 = UInt16::MAX) tcp_socket = TCPSocket.new(@host, @port) tcp_socket.sync = false tcp_socket.keepalive = true @@ -117,12 +117,12 @@ module AMQProxy clients = Set(Client).new @channels_lock.synchronize do return if @channels.empty? - Log.debug { "Upstream connection closed, closing #{@channels.size} client connections" } @channels.each_value do |downstream_connection| clients << downstream_connection.client end clients.each do |client| + Log.debug { "Closing client connection due to upstream failure." } client.close_connection(code, reason) end @channels.clear @@ -203,7 +203,8 @@ module AMQProxy case tune = AMQ::Protocol::Frame.from_io(@socket) when AMQ::Protocol::Frame::Connection::Tune - channel_max = tune.channel_max.zero? ? UInt16::MAX : tune.channel_max + server_max = tune.channel_max.zero? ? UInt16::MAX : tune.channel_max + channel_max = Math.min(server_max, @max_upstream_channels) frame_max = tune.frame_max.zero? ? 131072_u32 : Math.min(131072_u32, tune.frame_max) tune_ok = AMQ::Protocol::Frame::Connection::TuneOk.new(channel_max, frame_max, tune.heartbeat) @socket.write_bytes tune_ok, IO::ByteFormat::NetworkEndian