From 812fa63f60269c15d8b95245055d182b8ffd1bf0 Mon Sep 17 00:00:00 2001 From: bigmarvin Date: Mon, 31 Aug 2020 16:16:36 +0800 Subject: [PATCH] Resolve hostname of node again during reconnection This is essential when memcached cluster is built on cloud infra, e.g. k8s, where nodes move and their ips vary. --- .../spy/memcached/MemcachedConnection.java | 2 +- .../java/net/spy/memcached/MemcachedNode.java | 8 +++ .../spy/memcached/MemcachedNodeROImpl.java | 4 ++ .../protocol/TCPMemcachedNodeImpl.java | 19 ++++++- .../net/spy/memcached/MockMemcachedNode.java | 4 ++ .../binary/AddressResolutionTest.java | 54 +++++++++++++++++++ 6 files changed, 89 insertions(+), 2 deletions(-) create mode 100644 src/test/java/net/spy/memcached/protocol/binary/AddressResolutionTest.java diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java index 47f432fb1..11ab707ca 100644 --- a/src/main/java/net/spy/memcached/MemcachedConnection.java +++ b/src/main/java/net/spy/memcached/MemcachedConnection.java @@ -1131,7 +1131,7 @@ private void attemptReconnects() { ch.configureBlocking(false); ch.socket().setTcpNoDelay(!connectionFactory.useNagleAlgorithm()); int ops = 0; - if (ch.connect(node.getSocketAddress())) { + if (ch.connect(node.getSocketAddress(true))) { connected(node); addedQueue.offer(node); getLogger().info("Immediately reconnected to %s", node); diff --git a/src/main/java/net/spy/memcached/MemcachedNode.java b/src/main/java/net/spy/memcached/MemcachedNode.java index c86f96e70..b2342efd7 100644 --- a/src/main/java/net/spy/memcached/MemcachedNode.java +++ b/src/main/java/net/spy/memcached/MemcachedNode.java @@ -135,6 +135,14 @@ public interface MemcachedNode { */ SocketAddress getSocketAddress(); + /** + * Get the SocketAddress of the server to which this node is connected, and resolve it again if specified. + * + * @param resolve whether to resolve and update the address + * @return The SocketAddress of the server to which this node is connected + */ + SocketAddress getSocketAddress(boolean resolve); + /** * True if this node is active. i.e. is is currently connected and * expected to be able to process requests diff --git a/src/main/java/net/spy/memcached/MemcachedNodeROImpl.java b/src/main/java/net/spy/memcached/MemcachedNodeROImpl.java index d3ae9c497..f0b1666e4 100644 --- a/src/main/java/net/spy/memcached/MemcachedNodeROImpl.java +++ b/src/main/java/net/spy/memcached/MemcachedNodeROImpl.java @@ -106,6 +106,10 @@ public SocketAddress getSocketAddress() { return root.getSocketAddress(); } + public SocketAddress getSocketAddress(boolean resolve) { + return root.getSocketAddress(resolve); + } + public ByteBuffer getWbuf() { throw new UnsupportedOperationException(); } diff --git a/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java b/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java index 52deeb04b..f50056333 100644 --- a/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java +++ b/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java @@ -24,6 +24,7 @@ package net.spy.memcached.protocol; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; @@ -51,7 +52,7 @@ public abstract class TCPMemcachedNodeImpl extends SpyObject implements MemcachedNode { - private final SocketAddress socketAddress; + private SocketAddress socketAddress; private final ByteBuffer rbuf; private final ByteBuffer wbuf; protected final BlockingQueue writeQ; @@ -426,6 +427,22 @@ public final ByteBuffer getWbuf() { * @see net.spy.memcached.MemcachedNode#getSocketAddress() */ public final SocketAddress getSocketAddress() { + return getSocketAddress(false); + } + + public final SocketAddress getSocketAddress(boolean resolve) { + if (resolve && socketAddress instanceof InetSocketAddress) { + InetSocketAddress originalAddress = (InetSocketAddress) socketAddress; + InetSocketAddress resolvedAddress = new InetSocketAddress( + originalAddress.getHostName(), originalAddress.getPort()); + + if (!originalAddress.equals(resolvedAddress)) { + socketAddress = resolvedAddress; + getLogger().info("node address changed from %s to %s", + originalAddress, resolvedAddress); + } + } + return socketAddress; } diff --git a/src/test/java/net/spy/memcached/MockMemcachedNode.java b/src/test/java/net/spy/memcached/MockMemcachedNode.java index b35c32949..4d927bfb0 100644 --- a/src/test/java/net/spy/memcached/MockMemcachedNode.java +++ b/src/test/java/net/spy/memcached/MockMemcachedNode.java @@ -43,6 +43,10 @@ public SocketAddress getSocketAddress() { return socketAddress; } + public SocketAddress getSocketAddress(boolean resolve) { + return socketAddress; // sufficiently good + } + public MockMemcachedNode(InetSocketAddress socketAddress) { this.socketAddress = socketAddress; } diff --git a/src/test/java/net/spy/memcached/protocol/binary/AddressResolutionTest.java b/src/test/java/net/spy/memcached/protocol/binary/AddressResolutionTest.java new file mode 100644 index 000000000..9b14afde5 --- /dev/null +++ b/src/test/java/net/spy/memcached/protocol/binary/AddressResolutionTest.java @@ -0,0 +1,54 @@ +package net.spy.memcached.protocol.binary; + +import junit.framework.Assert; +import net.spy.memcached.DefaultConnectionFactory; +import net.spy.memcached.MemcachedNode; +import net.spy.memcached.ops.Operation; +import org.junit.Test; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.SocketChannel; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +public class AddressResolutionTest { + @Test + public void testResolveAddressWithHostname() throws Exception { + BlockingQueue queue = new ArrayBlockingQueue<>(1); + MemcachedNode node = new BinaryMemcachedNodeImpl(new InetSocketAddress("memcached.org", 80), + SocketChannel.open(), 1024, queue, queue, queue, 1024L, false, + 1024, 1024, new DefaultConnectionFactory()); + SocketAddress originalSocketAddress = node.getSocketAddress(); + SocketAddress resolvedSocketAddress = node.getSocketAddress(true); + Assert.assertEquals("socket address with hostname couldn't be resolved", + originalSocketAddress, resolvedSocketAddress); + } + + @Test + public void testResolveAddressWithAddress() throws Exception { + BlockingQueue queue = new ArrayBlockingQueue<>(1); + MemcachedNode node = new BinaryMemcachedNodeImpl( + new InetSocketAddress(InetAddress.getByName("memcached.org"), 80), + SocketChannel.open(), 1024, queue, queue, queue, 1024L, false, + 1024, 1024, new DefaultConnectionFactory()); + SocketAddress originalSocketAddress = node.getSocketAddress(); + SocketAddress resolvedSocketAddress = node.getSocketAddress(true); + Assert.assertEquals("socket address with address couldn't be resolved", + originalSocketAddress, resolvedSocketAddress); + } + + @Test + public void testResolveAddressWithAddressAsHostname() throws Exception { + BlockingQueue queue = new ArrayBlockingQueue<>(1); + MemcachedNode node = new BinaryMemcachedNodeImpl( + new InetSocketAddress(InetAddress.getByName("memcached.org").getHostAddress(), 80), + SocketChannel.open(), 1024, queue, queue, queue, 1024L, false, + 1024, 1024, new DefaultConnectionFactory()); + SocketAddress originalSocketAddress = node.getSocketAddress(); + SocketAddress resolvedSocketAddress = node.getSocketAddress(true); + Assert.assertEquals("socket address with address as hostname couldn't be resolved", + originalSocketAddress, resolvedSocketAddress); + } +}