Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 51 additions & 37 deletions src/main/java/net/jodah/lyra/ConnectionOptions.java
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
package net.jodah.lyra;

import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.concurrent.ExecutorService;

import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.ConnectionFactory;
import net.jodah.lyra.config.AddressResolver;
import net.jodah.lyra.config.DefaultAddressResolver;
import net.jodah.lyra.internal.util.Addresses;
import net.jodah.lyra.internal.util.Assert;
import net.jodah.lyra.util.Duration;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.ConnectionFactory;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.concurrent.ExecutorService;

/**
* Connection options. Changes will not effect connections that have already been created.
*
*
* @author Jonathan Halterman
*/
public class ConnectionOptions {
private ConnectionFactory factory;
private String host = "localhost";
private Address[] addresses;
private AddressResolver addressResolver;
private String name;
private ExecutorService executor;

Expand All @@ -34,7 +34,7 @@ public ConnectionOptions() {

/**
* Creates a new Options object for the {@code connectionFactory}.
*
*
* @throws NullPointerException if {@code connectionFactory} is null
*/
public ConnectionOptions(ConnectionFactory connectionFactory) {
Expand All @@ -57,7 +57,7 @@ private ConnectionOptions(ConnectionOptions options) {
factory.setSaslConfig(options.factory.getSaslConfig());
factory.setSocketFactory(options.factory.getSocketFactory());
host = options.host;
addresses = options.addresses;
addressResolver = options.addressResolver;
name = options.name;
executor = options.executor;
}
Expand All @@ -71,14 +71,15 @@ public ConnectionOptions copy() {

/**
* Returns the addresses to attempt connections to, in round-robin order.
*
*
* @see #withAddresses(Address...)
* @see #withAddresses(String)
* @see #withHost(String)
* @see #withHosts(String...)
*/
public Address[] getAddresses() {
return addresses == null ? new Address[] { new Address(host, factory.getPort()) } : addresses;
Address address = new Address(host, factory.getPort());
return addressResolver == null ? new Address[] {address} : addressResolver.resolveAddresses();
}

/**
Expand All @@ -90,7 +91,7 @@ public ConnectionFactory getConnectionFactory() {

/**
* Returns the consumer executor.
*
*
* @see #withConsumerExecutor(ExecutorService)
*/
public ExecutorService getConsumerExecutor() {
Expand All @@ -101,30 +102,42 @@ public String getName() {
return name;
}

/**
* Sets the {@code addressResolver} to resolve addresses
*
* @throws NullPointerException if {@code addressResolver} is null
*/
public ConnectionOptions withAddressResolver(AddressResolver addressResolver) {
this.addressResolver = Assert.notNull(addressResolver, "addressResolver");
return this;
}

/**
* Sets the {@code addresses} to attempt connections to, in round-robin order.
*
*
* @throws NullPointerException if {@code addresses} is null
*/
public ConnectionOptions withAddresses(Address... addresses) {
this.addresses = Assert.notNull(addresses, "addresses");
Address[] _addresses = Assert.notNull(addresses, "addresses");
this.addressResolver = new DefaultAddressResolver(_addresses);
return this;
}

/**
* Sets the {@code addresses}.
*
*
* @param addresses formatted as "host1[:port],host2[:port]", etc.
* @throws NullPointerException if {@code addresses} is null
*/
public ConnectionOptions withAddresses(String addresses) {
this.addresses = Address.parseAddresses(Assert.notNull(addresses, "addresses"));
Address[] _addresses = Address.parseAddresses(Assert.notNull(addresses, "addresses"));
this.addressResolver = new DefaultAddressResolver(_addresses);
return this;
}

/**
* Sets the client properties.
*
*
* @throws NullPointerException if {@code clientProperties} is null
*/
public ConnectionOptions withClientProperties(Map<String, Object> clientProperties) {
Expand All @@ -134,7 +147,7 @@ public ConnectionOptions withClientProperties(Map<String, Object> clientProperti

/**
* Sets the {@code connectionFactory}.
*
*
* @throws NullPointerException if {@code connectionFactory} is null
*/
public ConnectionOptions withConnectionFactory(ConnectionFactory connectionFactory) {
Expand All @@ -144,7 +157,7 @@ public ConnectionOptions withConnectionFactory(ConnectionFactory connectionFacto

/**
* Set the connection timeout, zero for infinite, for an individual connection attempt.
*
*
* @throws NullPointerException if {@code connectionTimeout} is null
*/
public ConnectionOptions withConnectionTimeout(Duration connectionTimeout) {
Expand All @@ -155,7 +168,7 @@ public ConnectionOptions withConnectionTimeout(Duration connectionTimeout) {
/**
* Sets the executor used to handle consumer callbacks. The {@code executor} will not be shutdown
* when a connection is closed.
*
*
* @throws NullPointerException if {@code executor} is null
*/
public ConnectionOptions withConsumerExecutor(ExecutorService executor) {
Expand All @@ -165,7 +178,7 @@ public ConnectionOptions withConsumerExecutor(ExecutorService executor) {

/**
* Sets the {@code host}.
*
*
* @throws NullPointerException if {@code host} is null
*/
public ConnectionOptions withHost(String host) {
Expand All @@ -175,17 +188,18 @@ public ConnectionOptions withHost(String host) {

/**
* Sets the {@code hosts} to attempt connections to, in round-robin order.
*
*
* @throws NullPointerException if {@code hosts} is null
*/
public ConnectionOptions withHosts(String... hosts) {
this.addresses = Addresses.addressesFor(Assert.notNull(hosts, "hosts"), 5672);
Address[] _addresses = Addresses.addressesFor(Assert.notNull(hosts, "hosts"), 5672);
this.addressResolver = new DefaultAddressResolver(_addresses);
return this;
}

/**
* Sets the connection name. Used for logging and consumer thread naming.
*
*
* @throws NullPointerException if {@code name} is null
*/
public ConnectionOptions withName(String name) {
Expand All @@ -211,7 +225,7 @@ public ConnectionOptions withPort(int port) {

/**
* Set the requested heartbeat, zero for none.
*
*
* @throws NullPointerException if {@code requestedHeartbeat} is null
*/
public ConnectionOptions withRequestedHeartbeat(Duration requestedHeartbeat) {
Expand All @@ -221,7 +235,7 @@ public ConnectionOptions withRequestedHeartbeat(Duration requestedHeartbeat) {

/**
* Sets the SocketFactory to create connections with.
*
*
* @throws NullPointerException if {@code hosts} is null
*/
public ConnectionOptions withSocketFactory(SocketFactory socketFactory) {
Expand All @@ -239,7 +253,7 @@ public ConnectionOptions withSsl() throws NoSuchAlgorithmException, KeyManagemen

/**
* Sets the initialized {@code sslContext} to use.
*
*
* @throws NullPointerException if {@code sslContext} is null
*/
public ConnectionOptions withSslProtocol(SSLContext sslContext) {
Expand All @@ -249,7 +263,7 @@ public ConnectionOptions withSslProtocol(SSLContext sslContext) {

/**
* Sets the {@code sslProtocol} to use.
*
*
* @throws NullPointerException if {@code sslProtocol} is null
*/
public ConnectionOptions withSslProtocol(String sslProtocol) throws NoSuchAlgorithmException,
Expand All @@ -260,7 +274,7 @@ public ConnectionOptions withSslProtocol(String sslProtocol) throws NoSuchAlgori

/**
* Sets the {@code sslProtocol} and {@code trustManager} to use.
*
*
* @throws NullPointerException if {@code sslProtocol} or {@code trustManager} are null
*/
public ConnectionOptions withSslProtocol(String sslProtocol, TrustManager trustManager)
Expand All @@ -272,7 +286,7 @@ public ConnectionOptions withSslProtocol(String sslProtocol, TrustManager trustM

/**
* Sets the username.
*
*
* @throws NullPointerException if {@code username} is null
*/
public ConnectionOptions withUsername(String username) {
Expand All @@ -282,7 +296,7 @@ public ConnectionOptions withUsername(String username) {

/**
* Sets the virtual host.
*
*
* @throws NullPointerException if {@code virtualHost} is null
*/
public ConnectionOptions withVirtualHost(String virtualHost) {
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/net/jodah/lyra/config/AddressResolver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package net.jodah.lyra.config;

import com.rabbitmq.client.Address;

/**
* Address resolver to resolve the addresses
*
* @author Srinath C
*/
public interface AddressResolver {

Address[] resolveAddresses();
}
22 changes: 22 additions & 0 deletions src/main/java/net/jodah/lyra/config/DefaultAddressResolver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package net.jodah.lyra.config;

import com.rabbitmq.client.Address;

/**
* Address resolver to resolve the addresses in a round-robin manner
*
* @author Srinath C
*/
public class DefaultAddressResolver implements AddressResolver {

private final Address[] addresses;

public DefaultAddressResolver(Address[] addresses) {
this.addresses = addresses;
}

@Override
public Address[] resolveAddresses() {
return addresses;
}
}
33 changes: 14 additions & 19 deletions src/main/java/net/jodah/lyra/internal/ConnectionHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
package net.jodah.lyra.internal;

import com.rabbitmq.client.*;
import net.jodah.lyra.ConnectionOptions;
import net.jodah.lyra.config.Config;
import net.jodah.lyra.config.ConfigurableChannel;
import net.jodah.lyra.config.ConnectionConfig;
import net.jodah.lyra.event.ChannelListener;
import net.jodah.lyra.event.ConnectionListener;
import net.jodah.lyra.internal.util.Reflection;
import net.jodah.lyra.internal.util.concurrent.NamedThreadFactory;

import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
Expand All @@ -11,24 +21,9 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import net.jodah.lyra.ConnectionOptions;
import net.jodah.lyra.config.Config;
import net.jodah.lyra.config.ConfigurableChannel;
import net.jodah.lyra.config.ConnectionConfig;
import net.jodah.lyra.event.ChannelListener;
import net.jodah.lyra.event.ConnectionListener;
import net.jodah.lyra.internal.util.Reflection;
import net.jodah.lyra.internal.util.concurrent.NamedThreadFactory;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;

/**
* Handles connection method invocations.
*
*
* @author Jonathan Halterman
*/
public class ConnectionHandler extends RetryableResource implements InvocationHandler {
Expand Down Expand Up @@ -205,13 +200,13 @@ private void createConnection(RecurringPolicy<?> recurringPolicy, final boolean
delegate = callWithRetries(new Callable<Connection>() {
@Override
public Connection call() throws IOException {
log.info("{} connection {} to {}", recovery ? "Recovering" : "Creating", connectionName,
options.getAddresses());
Address[] addresses = options.getAddresses();
log.info("{} connection {} to {}", recovery ? "Recovering" : "Creating", connectionName, addresses);
ExecutorService consumerPool = options.getConsumerExecutor() == null ? Executors.newCachedThreadPool(new NamedThreadFactory(
String.format("rabbitmq-%s-consumer", connectionName)))
: options.getConsumerExecutor();
ConnectionFactory cxnFactory = options.getConnectionFactory();
Connection connection = cxnFactory.newConnection(consumerPool, options.getAddresses());
Connection connection = cxnFactory.newConnection(consumerPool, addresses);
final String amqpAddress = String.format("amqp://%s:%s/%s", connection.getAddress()
.getHostAddress(), connection.getPort(), "/".equals(cxnFactory.getVirtualHost()) ? ""
: cxnFactory.getVirtualHost());
Expand Down
36 changes: 36 additions & 0 deletions src/test/java/net/jodah/lyra/config/AddressResolverTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package net.jodah.lyra.config;

import com.rabbitmq.client.Address;
import net.jodah.lyra.ConnectionOptions;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;

public class AddressResolverTest {

@Test
public void resolveAddresses() {
ConnectionOptions options = new ConnectionOptions();
Address[] testAddresses = Address.parseAddresses("localhost:5672,localhost:5673,localhost:5674");
options = options.withAddresses(testAddresses);
assertEquals(options.getAddresses(), testAddresses, "Addresses retrieved did not match expected address");
}

@Test
public void customResolver() {
ConnectionOptions options = new ConnectionOptions();
final Address[] addresses1 = Address.parseAddresses("host1:5672,host2:5672,host3:5672");
final Address[] addresses2 = Address.parseAddresses("host4:5672,host5:5672,host6:5672");
options.withAddressResolver(new AddressResolver() {
int counter = -1;

@Override
public Address[] resolveAddresses() {
counter++;
return (counter % 2 == 0) ? addresses1 : addresses2;
}
});
assertEquals(options.getAddresses(), addresses1, "Addresses retrieved did not match expected address");
assertEquals(options.getAddresses(), addresses2, "Addresses retrieved did not match expected address");
}
}