diff --git a/src/main/java/net/jodah/lyra/ConnectionOptions.java b/src/main/java/net/jodah/lyra/ConnectionOptions.java index 5b2120b..57571ab 100644 --- a/src/main/java/net/jodah/lyra/ConnectionOptions.java +++ b/src/main/java/net/jodah/lyra/ConnectionOptions.java @@ -1,30 +1,30 @@ package net.jodah.lyra; -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; -import java.util.Map; -import java.util.concurrent.ExecutorService; - -import javax.net.SocketFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManager; - +import com.rabbitmq.client.Address; +import com.rabbitmq.client.ConnectionFactory; +import net.jodah.lyra.config.AddressResolver; +import net.jodah.lyra.config.DefaultAddressResolver; import net.jodah.lyra.internal.util.Addresses; import net.jodah.lyra.internal.util.Assert; import net.jodah.lyra.util.Duration; -import com.rabbitmq.client.Address; -import com.rabbitmq.client.ConnectionFactory; +import javax.net.SocketFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.util.Map; +import java.util.concurrent.ExecutorService; /** * Connection options. Changes will not effect connections that have already been created. - * + * * @author Jonathan Halterman */ public class ConnectionOptions { private ConnectionFactory factory; private String host = "localhost"; - private Address[] addresses; + private AddressResolver addressResolver; private String name; private ExecutorService executor; @@ -34,7 +34,7 @@ public ConnectionOptions() { /** * Creates a new Options object for the {@code connectionFactory}. - * + * * @throws NullPointerException if {@code connectionFactory} is null */ public ConnectionOptions(ConnectionFactory connectionFactory) { @@ -57,7 +57,7 @@ private ConnectionOptions(ConnectionOptions options) { factory.setSaslConfig(options.factory.getSaslConfig()); factory.setSocketFactory(options.factory.getSocketFactory()); host = options.host; - addresses = options.addresses; + addressResolver = options.addressResolver; name = options.name; executor = options.executor; } @@ -71,14 +71,15 @@ public ConnectionOptions copy() { /** * Returns the addresses to attempt connections to, in round-robin order. - * + * * @see #withAddresses(Address...) * @see #withAddresses(String) * @see #withHost(String) * @see #withHosts(String...) */ public Address[] getAddresses() { - return addresses == null ? new Address[] { new Address(host, factory.getPort()) } : addresses; + Address address = new Address(host, factory.getPort()); + return addressResolver == null ? new Address[] {address} : addressResolver.resolveAddresses(); } /** @@ -90,7 +91,7 @@ public ConnectionFactory getConnectionFactory() { /** * Returns the consumer executor. - * + * * @see #withConsumerExecutor(ExecutorService) */ public ExecutorService getConsumerExecutor() { @@ -101,30 +102,42 @@ public String getName() { return name; } + /** + * Sets the {@code addressResolver} to resolve addresses + * + * @throws NullPointerException if {@code addressResolver} is null + */ + public ConnectionOptions withAddressResolver(AddressResolver addressResolver) { + this.addressResolver = Assert.notNull(addressResolver, "addressResolver"); + return this; + } + /** * Sets the {@code addresses} to attempt connections to, in round-robin order. - * + * * @throws NullPointerException if {@code addresses} is null */ public ConnectionOptions withAddresses(Address... addresses) { - this.addresses = Assert.notNull(addresses, "addresses"); + Address[] _addresses = Assert.notNull(addresses, "addresses"); + this.addressResolver = new DefaultAddressResolver(_addresses); return this; } /** * Sets the {@code addresses}. - * + * * @param addresses formatted as "host1[:port],host2[:port]", etc. * @throws NullPointerException if {@code addresses} is null */ public ConnectionOptions withAddresses(String addresses) { - this.addresses = Address.parseAddresses(Assert.notNull(addresses, "addresses")); + Address[] _addresses = Address.parseAddresses(Assert.notNull(addresses, "addresses")); + this.addressResolver = new DefaultAddressResolver(_addresses); return this; } /** * Sets the client properties. - * + * * @throws NullPointerException if {@code clientProperties} is null */ public ConnectionOptions withClientProperties(Map clientProperties) { @@ -134,7 +147,7 @@ public ConnectionOptions withClientProperties(Map clientProperti /** * Sets the {@code connectionFactory}. - * + * * @throws NullPointerException if {@code connectionFactory} is null */ public ConnectionOptions withConnectionFactory(ConnectionFactory connectionFactory) { @@ -144,7 +157,7 @@ public ConnectionOptions withConnectionFactory(ConnectionFactory connectionFacto /** * Set the connection timeout, zero for infinite, for an individual connection attempt. - * + * * @throws NullPointerException if {@code connectionTimeout} is null */ public ConnectionOptions withConnectionTimeout(Duration connectionTimeout) { @@ -155,7 +168,7 @@ public ConnectionOptions withConnectionTimeout(Duration connectionTimeout) { /** * Sets the executor used to handle consumer callbacks. The {@code executor} will not be shutdown * when a connection is closed. - * + * * @throws NullPointerException if {@code executor} is null */ public ConnectionOptions withConsumerExecutor(ExecutorService executor) { @@ -165,7 +178,7 @@ public ConnectionOptions withConsumerExecutor(ExecutorService executor) { /** * Sets the {@code host}. - * + * * @throws NullPointerException if {@code host} is null */ public ConnectionOptions withHost(String host) { @@ -175,17 +188,18 @@ public ConnectionOptions withHost(String host) { /** * Sets the {@code hosts} to attempt connections to, in round-robin order. - * + * * @throws NullPointerException if {@code hosts} is null */ public ConnectionOptions withHosts(String... hosts) { - this.addresses = Addresses.addressesFor(Assert.notNull(hosts, "hosts"), 5672); + Address[] _addresses = Addresses.addressesFor(Assert.notNull(hosts, "hosts"), 5672); + this.addressResolver = new DefaultAddressResolver(_addresses); return this; } /** * Sets the connection name. Used for logging and consumer thread naming. - * + * * @throws NullPointerException if {@code name} is null */ public ConnectionOptions withName(String name) { @@ -211,7 +225,7 @@ public ConnectionOptions withPort(int port) { /** * Set the requested heartbeat, zero for none. - * + * * @throws NullPointerException if {@code requestedHeartbeat} is null */ public ConnectionOptions withRequestedHeartbeat(Duration requestedHeartbeat) { @@ -221,7 +235,7 @@ public ConnectionOptions withRequestedHeartbeat(Duration requestedHeartbeat) { /** * Sets the SocketFactory to create connections with. - * + * * @throws NullPointerException if {@code hosts} is null */ public ConnectionOptions withSocketFactory(SocketFactory socketFactory) { @@ -239,7 +253,7 @@ public ConnectionOptions withSsl() throws NoSuchAlgorithmException, KeyManagemen /** * Sets the initialized {@code sslContext} to use. - * + * * @throws NullPointerException if {@code sslContext} is null */ public ConnectionOptions withSslProtocol(SSLContext sslContext) { @@ -249,7 +263,7 @@ public ConnectionOptions withSslProtocol(SSLContext sslContext) { /** * Sets the {@code sslProtocol} to use. - * + * * @throws NullPointerException if {@code sslProtocol} is null */ public ConnectionOptions withSslProtocol(String sslProtocol) throws NoSuchAlgorithmException, @@ -260,7 +274,7 @@ public ConnectionOptions withSslProtocol(String sslProtocol) throws NoSuchAlgori /** * Sets the {@code sslProtocol} and {@code trustManager} to use. - * + * * @throws NullPointerException if {@code sslProtocol} or {@code trustManager} are null */ public ConnectionOptions withSslProtocol(String sslProtocol, TrustManager trustManager) @@ -272,7 +286,7 @@ public ConnectionOptions withSslProtocol(String sslProtocol, TrustManager trustM /** * Sets the username. - * + * * @throws NullPointerException if {@code username} is null */ public ConnectionOptions withUsername(String username) { @@ -282,7 +296,7 @@ public ConnectionOptions withUsername(String username) { /** * Sets the virtual host. - * + * * @throws NullPointerException if {@code virtualHost} is null */ public ConnectionOptions withVirtualHost(String virtualHost) { diff --git a/src/main/java/net/jodah/lyra/config/AddressResolver.java b/src/main/java/net/jodah/lyra/config/AddressResolver.java new file mode 100644 index 0000000..6329fd1 --- /dev/null +++ b/src/main/java/net/jodah/lyra/config/AddressResolver.java @@ -0,0 +1,13 @@ +package net.jodah.lyra.config; + +import com.rabbitmq.client.Address; + +/** + * Address resolver to resolve the addresses + * + * @author Srinath C + */ +public interface AddressResolver { + + Address[] resolveAddresses(); +} diff --git a/src/main/java/net/jodah/lyra/config/DefaultAddressResolver.java b/src/main/java/net/jodah/lyra/config/DefaultAddressResolver.java new file mode 100644 index 0000000..25698ce --- /dev/null +++ b/src/main/java/net/jodah/lyra/config/DefaultAddressResolver.java @@ -0,0 +1,22 @@ +package net.jodah.lyra.config; + +import com.rabbitmq.client.Address; + +/** + * Address resolver to resolve the addresses in a round-robin manner + * + * @author Srinath C + */ +public class DefaultAddressResolver implements AddressResolver { + + private final Address[] addresses; + + public DefaultAddressResolver(Address[] addresses) { + this.addresses = addresses; + } + + @Override + public Address[] resolveAddresses() { + return addresses; + } +} diff --git a/src/main/java/net/jodah/lyra/internal/ConnectionHandler.java b/src/main/java/net/jodah/lyra/internal/ConnectionHandler.java index 9de46a3..afb6386 100644 --- a/src/main/java/net/jodah/lyra/internal/ConnectionHandler.java +++ b/src/main/java/net/jodah/lyra/internal/ConnectionHandler.java @@ -1,5 +1,15 @@ package net.jodah.lyra.internal; +import com.rabbitmq.client.*; +import net.jodah.lyra.ConnectionOptions; +import net.jodah.lyra.config.Config; +import net.jodah.lyra.config.ConfigurableChannel; +import net.jodah.lyra.config.ConnectionConfig; +import net.jodah.lyra.event.ChannelListener; +import net.jodah.lyra.event.ConnectionListener; +import net.jodah.lyra.internal.util.Reflection; +import net.jodah.lyra.internal.util.concurrent.NamedThreadFactory; + import java.io.IOException; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; @@ -11,24 +21,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import net.jodah.lyra.ConnectionOptions; -import net.jodah.lyra.config.Config; -import net.jodah.lyra.config.ConfigurableChannel; -import net.jodah.lyra.config.ConnectionConfig; -import net.jodah.lyra.event.ChannelListener; -import net.jodah.lyra.event.ConnectionListener; -import net.jodah.lyra.internal.util.Reflection; -import net.jodah.lyra.internal.util.concurrent.NamedThreadFactory; - -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.ShutdownListener; -import com.rabbitmq.client.ShutdownSignalException; - /** * Handles connection method invocations. - * + * * @author Jonathan Halterman */ public class ConnectionHandler extends RetryableResource implements InvocationHandler { @@ -205,13 +200,13 @@ private void createConnection(RecurringPolicy recurringPolicy, final boolean delegate = callWithRetries(new Callable() { @Override public Connection call() throws IOException { - log.info("{} connection {} to {}", recovery ? "Recovering" : "Creating", connectionName, - options.getAddresses()); + Address[] addresses = options.getAddresses(); + log.info("{} connection {} to {}", recovery ? "Recovering" : "Creating", connectionName, addresses); ExecutorService consumerPool = options.getConsumerExecutor() == null ? Executors.newCachedThreadPool(new NamedThreadFactory( String.format("rabbitmq-%s-consumer", connectionName))) : options.getConsumerExecutor(); ConnectionFactory cxnFactory = options.getConnectionFactory(); - Connection connection = cxnFactory.newConnection(consumerPool, options.getAddresses()); + Connection connection = cxnFactory.newConnection(consumerPool, addresses); final String amqpAddress = String.format("amqp://%s:%s/%s", connection.getAddress() .getHostAddress(), connection.getPort(), "/".equals(cxnFactory.getVirtualHost()) ? "" : cxnFactory.getVirtualHost()); diff --git a/src/test/java/net/jodah/lyra/config/AddressResolverTest.java b/src/test/java/net/jodah/lyra/config/AddressResolverTest.java new file mode 100644 index 0000000..2d6211f --- /dev/null +++ b/src/test/java/net/jodah/lyra/config/AddressResolverTest.java @@ -0,0 +1,36 @@ +package net.jodah.lyra.config; + +import com.rabbitmq.client.Address; +import net.jodah.lyra.ConnectionOptions; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + +public class AddressResolverTest { + + @Test + public void resolveAddresses() { + ConnectionOptions options = new ConnectionOptions(); + Address[] testAddresses = Address.parseAddresses("localhost:5672,localhost:5673,localhost:5674"); + options = options.withAddresses(testAddresses); + assertEquals(options.getAddresses(), testAddresses, "Addresses retrieved did not match expected address"); + } + + @Test + public void customResolver() { + ConnectionOptions options = new ConnectionOptions(); + final Address[] addresses1 = Address.parseAddresses("host1:5672,host2:5672,host3:5672"); + final Address[] addresses2 = Address.parseAddresses("host4:5672,host5:5672,host6:5672"); + options.withAddressResolver(new AddressResolver() { + int counter = -1; + + @Override + public Address[] resolveAddresses() { + counter++; + return (counter % 2 == 0) ? addresses1 : addresses2; + } + }); + assertEquals(options.getAddresses(), addresses1, "Addresses retrieved did not match expected address"); + assertEquals(options.getAddresses(), addresses2, "Addresses retrieved did not match expected address"); + } +}