From 80710926c5677735737ba31af99beb83027c1a62 Mon Sep 17 00:00:00 2001 From: KimStrand <231482327+KimStrand@users.noreply.github.com> Date: Tue, 11 Nov 2025 16:49:39 +0100 Subject: [PATCH] Introduce netty non blocking io. --- .../java/bisq/account/accounts/Account.java | 5 - .../bisq/account/accounts/AccountPayload.java | 5 - gradle/libs.versions.toml | 13 + network/network/build.gradle.kts | 2 + .../bisq/network/p2p/node/Connection.java | 133 +++----- .../network/p2p/node/InboundConnection.java | 6 +- .../main/java/bisq/network/p2p/node/Node.java | 309 ++++++------------ .../network/p2p/node/OutboundConnection.java | 6 +- .../p2p/node/handshake/HandshakeHandler.java | 87 +++++ .../handshake/InboundHandshakeHandler.java | 137 ++++++++ .../handshake/OutboundHandshakeHandler.java | 193 +++++++++++ .../transport/ClearNetTransportService.java | 126 ++++++- .../node/transport/I2PTransportService.java | 23 +- .../node/transport/TorTransportService.java | 157 ++++++++- .../p2p/node/transport/TransportService.java | 21 ++ .../java/bisq/network/tor/TorService.java | 32 +- 16 files changed, 924 insertions(+), 331 deletions(-) create mode 100644 network/network/src/main/java/bisq/network/p2p/node/handshake/HandshakeHandler.java create mode 100644 network/network/src/main/java/bisq/network/p2p/node/handshake/InboundHandshakeHandler.java create mode 100644 network/network/src/main/java/bisq/network/p2p/node/handshake/OutboundHandshakeHandler.java diff --git a/account/src/main/java/bisq/account/accounts/Account.java b/account/src/main/java/bisq/account/accounts/Account.java index 5cdf7351f8..ba7cef8dae 100644 --- a/account/src/main/java/bisq/account/accounts/Account.java +++ b/account/src/main/java/bisq/account/accounts/Account.java @@ -65,11 +65,6 @@ public bisq.account.protobuf.Account toProto(boolean serializeForHash) { return resolveProto(serializeForHash); } - @Override - public bisq.account.protobuf.Account completeProto() { - return toProto(false); - } - protected bisq.account.protobuf.Account.Builder getAccountBuilder(boolean serializeForHash) { return bisq.account.protobuf.Account.newBuilder() .setId(id) diff --git a/account/src/main/java/bisq/account/accounts/AccountPayload.java b/account/src/main/java/bisq/account/accounts/AccountPayload.java index 6123576d91..f897aed923 100644 --- a/account/src/main/java/bisq/account/accounts/AccountPayload.java +++ b/account/src/main/java/bisq/account/accounts/AccountPayload.java @@ -65,11 +65,6 @@ public bisq.account.protobuf.AccountPayload toProto(boolean serializeForHash) { return resolveProto(serializeForHash); } - @Override - public bisq.account.protobuf.AccountPayload completeProto() { - return toProto(false); - } - protected bisq.account.protobuf.AccountPayload.Builder getAccountPayloadBuilder(boolean serializeForHash) { return bisq.account.protobuf.AccountPayload.newBuilder() .setId(id); diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index a081fb12ec..1c6c1b1dd0 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -75,6 +75,8 @@ typesafe-config-lib = { strictly = '1.4.3' } zxing-lib = { strictly = '3.5.3' } +netty-lib = { strictly = '4.2.7.Final' } + # Referenced in subproject's build.gradle > dependencies block in the form 'implementation libs.guava' # Note: keys can contain dash (protobuf-java) but the dash is replaced by dot when referenced # in a build.gradle ('implementation libs.protobuf.java') @@ -174,6 +176,15 @@ typesafe-config = { module = 'com.typesafe:config', version.ref = 'typesafe-conf zxing = { module = 'com.google.zxing:javase', version.ref = 'zxing-lib' } +netty-all = { module = 'io.netty:netty-all', version.ref = 'netty-lib' } +netty-transport = { module = 'io.netty:netty-transport', version.ref = 'netty-lib' } +netty-buffer = { module = 'io.netty:netty-buffer', version.ref = 'netty-lib' } +netty-common = { module = 'io.netty:netty-common', version.ref = 'netty-lib' } +netty-handler = { module = 'io.netty:netty-handler', version.ref = 'netty-lib' } +netty-handler-proxy = { module = 'io.netty:netty-handler-proxy', version.ref = 'netty-lib' } +netty-codec-socks = { module = 'io.netty:netty-codec-socks', version.ref = 'netty-lib' } +netty-codec-protobuf = { module = 'io.netty:netty-codec-protobuf', version.ref = 'netty-lib' } + # Defines groups of libs that are commonly used together # Referenced in dependencies block as 'implementation libs.bundles.i2p' [bundles] @@ -190,6 +201,8 @@ rest-api-libs = ['swagger-jaxrs2-jakarta', 'glassfish-jersey-jdk-http', 'glassfi 'jackson-core', 'jackson-annotations', 'jackson-databind', 'jackson-datatype'] websocket-libs = ['glassfish-jersey-json-jackson', 'glassfish-jersey-server', 'glassfish-jersey-containers-grizzly', 'glassfish-grizzly-websockets-server', 'jakarta-websocket', 'jackson-databind', 'jackson-datatype', 'swagger-swagger-annotations'] +nonblockingio = ['netty-all', 'netty-transport', 'netty-buffer', 'netty-common', 'netty-handler', 'netty-handler-proxy', + 'netty-codec-socks', 'netty-codec-protobuf'] # Referenced in subproject's build.gradle > plugin block as alias: `alias(libs.plugins.protobuf)` # Note: plugin version constraints are not supported by the java-platform plugin, so cannot be enforced there. However, diff --git a/network/network/build.gradle.kts b/network/network/build.gradle.kts index 76bcfe4fe1..1909467a80 100644 --- a/network/network/build.gradle.kts +++ b/network/network/build.gradle.kts @@ -23,5 +23,7 @@ dependencies { implementation(libs.jsocks) implementation(libs.bundles.i2p) + implementation(libs.bundles.nonblockingio) + integrationTestImplementation(libs.mockito) } \ No newline at end of file diff --git a/network/network/src/main/java/bisq/network/p2p/node/Connection.java b/network/network/src/main/java/bisq/network/p2p/node/Connection.java index c25e973fe0..4d87d38abc 100644 --- a/network/network/src/main/java/bisq/network/p2p/node/Connection.java +++ b/network/network/src/main/java/bisq/network/p2p/node/Connection.java @@ -18,8 +18,6 @@ package bisq.network.p2p.node; import bisq.common.network.Address; -import bisq.common.network.DefaultPeerSocket; -import bisq.common.network.PeerSocket; import bisq.common.network.TransportType; import bisq.common.threading.AbortPolicyWithLogging; import bisq.common.threading.ExecutorFactory; @@ -36,21 +34,19 @@ import bisq.network.p2p.node.envelope.NetworkEnvelopeSocket; import bisq.network.p2p.node.network_load.ConnectionMetrics; import bisq.network.p2p.node.network_load.NetworkLoadSnapshot; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; import lombok.AccessLevel; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import javax.annotation.Nullable; import java.io.EOFException; -import java.io.IOException; -import java.net.Socket; import java.util.Comparator; import java.util.Date; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -96,6 +92,7 @@ public interface Listener { } private final AuthorizationService authorizationService; + private final ChannelHandlerContext context; @Getter private final String id; @Getter @@ -111,8 +108,6 @@ public interface Listener { private final ConnectionThrottle connectionThrottle; private final Handler handler; private final Set listeners = new CopyOnWriteArraySet<>(); - @Nullable - private Future inputHandlerFuture; // We use counter value 0 in the handshake, thus we start here with 1 as it's not the first message @Getter(AccessLevel.PACKAGE) private final AtomicInteger sentMessageCounter = new AtomicInteger(1); @@ -121,10 +116,11 @@ public interface Listener { private volatile boolean listeningStopped; private final ThreadPoolExecutor readExecutor; private final ThreadPoolExecutor sendExecutor; + private final SimpleChannelInboundHandler inboundMessageHandler; protected Connection(AuthorizationService authorizationService, + ChannelHandlerContext context, String connectionId, - Socket socket, Capability peersCapability, NetworkLoadSnapshot peersNetworkLoadSnapshot, ConnectionMetrics connectionMetrics, @@ -132,6 +128,7 @@ protected Connection(AuthorizationService authorizationService, Handler handler, BiConsumer errorHandler) { this.authorizationService = authorizationService; + this.context = context; this.id = connectionId; this.peersCapability = peersCapability; this.peersNetworkLoadSnapshot = peersNetworkLoadSnapshot; @@ -143,63 +140,41 @@ protected Connection(AuthorizationService authorizationService, readExecutor = createReadExecutor(); sendExecutor = createSendExecutor(); - try { - PeerSocket peerSocket = new DefaultPeerSocket(socket); - this.networkEnvelopeSocket = new NetworkEnvelopeSocket(peerSocket); - } catch (IOException exception) { - log.error("Could not create objectOutputStream/objectInputStream for socket {}", socket, exception); - errorHandler.accept(this, exception); - shutdown(CloseReason.EXCEPTION.exception(exception)); - return; - } - try { - inputHandlerFuture = readExecutor.submit(() -> { + inboundMessageHandler = new SimpleChannelInboundHandler<>() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, bisq.network.protobuf.NetworkEnvelope proto) { try { - long readTs = 0; - while (isInputStreamActive()) { - if (readTs != 0) { - log.debug("Processing message took {} ms. Wait for new message from {}. ", System.currentTimeMillis() - readTs, getPeerAddress()); - } else { - log.debug("Wait for new message from {}", getPeerAddress()); - } - var proto = networkEnvelopeSocket.receiveNextEnvelope(); - readTs = System.currentTimeMillis(); - if (proto == null) { - log.info("Proto from networkEnvelopeSocket.receiveNextEnvelope() is null. " + - "This is expected if the input stream has reached EOF. We shut down the connection."); - shutdown(CloseReason.EXCEPTION.exception(new EOFException("Input stream reached EOF"))); - return; - } + if (proto == null) { + log.info("Proto from networkEnvelopeSocket.receiveNextEnvelope() is null. " + + "This is expected if the input stream has reached EOF. We shut down the connection."); + shutdown(CloseReason.EXCEPTION.exception(new EOFException("Input stream reached EOF"))); + return; + } - // receiveNextEnvelope might need some time wo we check again if connection is still active - if (!isInputStreamActive()) { - return; - } + // receiveNextEnvelope might need some time wo we check again if connection is still active + if (!isActive()) { + return; + } - connectionThrottle.throttleReceiveMessage(); - // ThrottleReceiveMessage can cause a delay by Thread.sleep - if (!isInputStreamActive()) { - return; - } - long ts = System.currentTimeMillis(); - NetworkEnvelope networkEnvelope = NetworkEnvelope.fromProto(proto); - long deserializeTime = System.currentTimeMillis() - ts; - networkEnvelope.verifyVersion(); - connectionMetrics.onReceived(networkEnvelope, deserializeTime); - - EnvelopePayloadMessage envelopePayloadMessage = networkEnvelope.getEnvelopePayloadMessage(); - log.debug("Received message: {} at: {}", - StringUtils.truncate(envelopePayloadMessage.toString(), 200), this); - requestResponseManager.onReceived(envelopePayloadMessage); - - if (isInputStreamActive()) { - boolean isMessageAuthorized = handler.isMessageAuthorized(envelopePayloadMessage, - networkEnvelope.getAuthorizationToken(), - this); - if (isMessageAuthorized) { - handler.handleNetworkMessage(envelopePayloadMessage, this); - listeners.forEach(listener -> NetworkExecutors.getNotifyExecutor().submit(() -> listener.onNetworkMessage(envelopePayloadMessage))); - } + long ts = System.currentTimeMillis(); + NetworkEnvelope networkEnvelope = NetworkEnvelope.fromProto(proto); + + long deserializeTime = System.currentTimeMillis() - ts; + networkEnvelope.verifyVersion(); + connectionMetrics.onReceived(networkEnvelope, deserializeTime); + + EnvelopePayloadMessage envelopePayloadMessage = networkEnvelope.getEnvelopePayloadMessage(); + log.debug("Received message: {} at: {}", + StringUtils.truncate(envelopePayloadMessage.toString(), 200), this); + requestResponseManager.onReceived(envelopePayloadMessage); + + if (isActive()) { + boolean isMessageAuthorized = handler.isMessageAuthorized(envelopePayloadMessage, + networkEnvelope.getAuthorizationToken(), + Connection.this); + if (isMessageAuthorized) { + handler.handleNetworkMessage(envelopePayloadMessage, Connection.this); + listeners.forEach(listener -> NetworkExecutors.getNotifyExecutor().submit(() -> listener.onNetworkMessage(envelopePayloadMessage))); } } } catch (Exception exception) { @@ -210,17 +185,14 @@ protected Connection(AuthorizationService authorizationService, // EOFException expected if connection got closed (Socket closed message) if (!(exception instanceof EOFException)) { - errorHandler.accept(this, exception); + errorHandler.accept(Connection.this, exception); } } + } finally { } - }); - } catch (RejectedExecutionException e) { - log.error("Read executor rejected task. We shut down the connection.", e); - errorHandler.accept(this, e); - inputHandlerFuture = CompletableFuture.failedFuture(e); - shutdown(CloseReason.EXCEPTION.exception(e)); - } + } + }; + context.pipeline().addLast(inboundMessageHandler); } /* --------------------------------------------------------------------- */ @@ -290,7 +262,7 @@ CompletableFuture sendAsync(EnvelopePayloadMessage envelopePayloadMe AuthorizationToken authorizationToken = createAuthorizationToken(envelopePayloadMessage); networkEnvelope = createNetworkEnvelope(envelopePayloadMessage, authorizationToken); long ts = System.currentTimeMillis(); - networkEnvelopeSocket.send(networkEnvelope); + context.writeAndFlush(networkEnvelope.completeProto()); spentTime = System.currentTimeMillis() - ts; } connectionMetrics.onSent(networkEnvelope, spentTime); @@ -351,15 +323,7 @@ void shutdown(CloseReason closeReason) { shutdownStarted = true; requestResponseManager.dispose(); connectionMetrics.clear(); - if (inputHandlerFuture != null) { - inputHandlerFuture.cancel(true); - } - try { - if (networkEnvelopeSocket != null) { - networkEnvelopeSocket.close(); - } - } catch (IOException ignore) { - } + context.pipeline().remove(inboundMessageHandler); handler.handleConnectionClosed(this, closeReason); listeners.forEach(listener -> NetworkExecutors.getNotifyExecutor().submit(() -> listener.onConnectionClosed(closeReason))); listeners.clear(); @@ -369,10 +333,7 @@ void shutdown(CloseReason closeReason) { } boolean isStopped() { - return shutdownStarted - || networkEnvelopeSocket == null - || networkEnvelopeSocket.isClosed() - || Thread.currentThread().isInterrupted(); + return shutdownStarted || Thread.currentThread().isInterrupted(); } @@ -380,7 +341,7 @@ boolean isStopped() { // Private /* --------------------------------------------------------------------- */ - private boolean isInputStreamActive() { + private boolean isActive() { return !listeningStopped && isRunning(); } diff --git a/network/network/src/main/java/bisq/network/p2p/node/InboundConnection.java b/network/network/src/main/java/bisq/network/p2p/node/InboundConnection.java index 2ccaf21802..5e36511b60 100644 --- a/network/network/src/main/java/bisq/network/p2p/node/InboundConnection.java +++ b/network/network/src/main/java/bisq/network/p2p/node/InboundConnection.java @@ -20,16 +20,16 @@ import bisq.network.p2p.node.authorization.AuthorizationService; import bisq.network.p2p.node.network_load.ConnectionMetrics; import bisq.network.p2p.node.network_load.NetworkLoadSnapshot; +import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; -import java.net.Socket; import java.util.function.BiConsumer; @Slf4j public class InboundConnection extends Connection { InboundConnection(AuthorizationService authorizationService, + ChannelHandlerContext context, String connectionId, - Socket socket, Capability peersCapability, NetworkLoadSnapshot peersNetworkLoadSnapshot, ConnectionMetrics connectionMetrics, @@ -37,8 +37,8 @@ public class InboundConnection extends Connection { Handler handler, BiConsumer errorHandler) { super(authorizationService, + context, connectionId, - socket, peersCapability, peersNetworkLoadSnapshot, connectionMetrics, diff --git a/network/network/src/main/java/bisq/network/p2p/node/Node.java b/network/network/src/main/java/bisq/network/p2p/node/Node.java index ec17e7a55a..2c173966aa 100644 --- a/network/network/src/main/java/bisq/network/p2p/node/Node.java +++ b/network/network/src/main/java/bisq/network/p2p/node/Node.java @@ -35,13 +35,17 @@ import bisq.network.p2p.node.authorization.AuthorizationService; import bisq.network.p2p.node.authorization.AuthorizationToken; import bisq.network.p2p.node.handshake.ConnectionHandshake; +import bisq.network.p2p.node.handshake.HandshakeHandler; +import bisq.network.p2p.node.handshake.InboundHandshakeHandler; +import bisq.network.p2p.node.handshake.OutboundHandshakeHandler; import bisq.network.p2p.node.network_load.NetworkLoadSnapshot; -import bisq.network.p2p.node.transport.ServerSocketResult; import bisq.network.p2p.node.transport.TransportService; import bisq.network.p2p.services.peer_group.BanList; import bisq.security.keys.KeyBundle; import bisq.security.keys.KeyBundleService; import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.ToString; @@ -50,7 +54,6 @@ import java.io.EOFException; import java.io.IOException; import java.net.ConnectException; -import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; import java.net.UnknownHostException; @@ -148,7 +151,6 @@ public Config(TransportType transportType, private final BanList banList; private final TransportService transportService; private final AuthorizationService authorizationService; - private final int socketTimeout; // in ms private final Set supportedTransportTypes; private final Set features; @Getter @@ -195,7 +197,6 @@ public Node(NetworkId networkId, transportType = config.getTransportType(); supportedTransportTypes = config.getSupportedTransportTypes(); features = config.getFeatures(); - socketTimeout = config.getSocketTimeout(); this.banList = banList; this.transportService = transportService; this.authorizationService = authorizationService; @@ -263,94 +264,52 @@ public CompletableFuture initializeAsync() { } private void createServerAndListen() { - ServerSocketResult serverSocketResult = transportService.getServerSocket(networkId, keyBundle, nodeId); // blocking - myCapability = Optional.of(Capability.myCapability(serverSocketResult.getAddress(), new ArrayList<>(supportedTransportTypes), new ArrayList<>(features))); - server = Optional.of(new Server(serverSocketResult, - socketTimeout, - socket -> handleNewClientSocketAsync(socket, myCapability.get()), - exception -> { - handleException(exception); - // If server fails we shut down the node - shutdown(); + if (myCapability.isEmpty()) { + Address myAddress = transportService.evaluateMyAddress(networkId, keyBundle).join(); + myCapability = Optional.of(Capability.myCapability(myAddress, new ArrayList<>(supportedTransportTypes), new ArrayList<>(features))); + } + + transportService.startNettyServer(networkId, + keyBundle, + () -> new InboundHandshakeHandler(authorizationService, + banList, + myCapability.get(), + networkLoadSnapshot.getCurrentNetworkLoad(), + keyBundle, + new HandshakeHandler.Handler() { + @Override + public void onHandshakeCompleted(ChannelHandlerContext context, + HandshakeHandler.Result result) { + Address peersAddress = result.getPeersCapability().getAddress(); + NetworkLoadSnapshot peersNetworkLoadSnapshot = new NetworkLoadSnapshot(result.getPeersNetworkLoad()); + ConnectionThrottle connectionThrottle = new ConnectionThrottle(peersNetworkLoadSnapshot, networkLoadSnapshot, config); + InboundConnection connection = new InboundConnection(authorizationService, + context, + result.getConnectionId(), + result.getPeersCapability(), + peersNetworkLoadSnapshot, + result.getConnectionMetrics(), + connectionThrottle, + Node.this, + Node.this::handleConnectionException); + inboundConnectionsByAddress.put(peersAddress, connection); + listeners.forEach(listener -> NetworkExecutors.getNotifyExecutor().submit(() -> listener.onConnection(connection))); + } + + @Override + public void onClosed(Channel channel) { + log.debug("onClosed {}", channel); + } + })) + .whenComplete(((address, throwable) -> { + log.info("Server started for {}", address); })); } - private CompletableFuture handleNewClientSocketAsync(Socket socket, Capability myCapability) { - try { - return CompletableFuture.runAsync(() -> { - ConnectionHandshake connectionHandshake = null; - try { - connectionHandshake = new ConnectionHandshake(socket, - banList, - myCapability, - authorizationService, - keyBundle); - connectionHandshakes.put(connectionHandshake.getId(), connectionHandshake); - log.debug("Inbound handshake request at: {}", myCapability.getAddress()); - ConnectionHandshake.Result result = connectionHandshake.onSocket(networkLoadSnapshot.getCurrentNetworkLoad()); // Blocking call - - Address address = result.getPeersCapability().getAddress(); - log.debug("Inbound handshake completed: Initiated by {} to {}", address, myCapability.getAddress()); - - // As time passed we check again if connection is still not available - if (inboundConnectionsByAddress.containsKey(address)) { - log.warn("Have already an InboundConnection from {}. This can happen when a " + - "handshake was in progress while we received a new connection from that address. " + - "We close the existing connection (instead of closing the new socket) as the existing connection might be a stale connection.", address); - inboundConnectionsByAddress.get(address).shutdown(CloseReason.MAYBE_STALE_CONNECTION); - } - - InboundConnection connection = createInboundConnection(socket, result); - inboundConnectionsByAddress.put(connection.getPeerAddress(), connection); - listeners.forEach(listener -> NetworkExecutors.getNotifyExecutor().submit(() -> listener.onConnection(connection))); - } catch (Throwable throwable) { - try { - socket.close(); - } catch (IOException ignore) { - } - - handleException(throwable); - } finally { - if (connectionHandshake != null) { - connectionHandshake.shutdown(); - connectionHandshakes.remove(connectionHandshake.getId()); - } - } - }, getExecutor()); - } catch (RejectedExecutionException e) { - log.error("Node executor rejected task at handleNewClientSocketAsync", e); - return CompletableFuture.failedFuture(new ConnectionException("Node executor rejected task at handleNewClientSocketAsync")); - } - } - - private InboundConnection createInboundConnection(Socket socket, ConnectionHandshake.Result result) { - NetworkLoadSnapshot peersNetworkLoadSnapshot = new NetworkLoadSnapshot(result.getPeersNetworkLoad()); - ConnectionThrottle connectionThrottle = new ConnectionThrottle(peersNetworkLoadSnapshot, networkLoadSnapshot, config); - return new InboundConnection(authorizationService, - result.getConnectionId(), - socket, - result.getPeersCapability(), - peersNetworkLoadSnapshot, - result.getConnectionMetrics(), - connectionThrottle, - this, - this::handleException); - } - - /* --------------------------------------------------------------------- */ // Send /* --------------------------------------------------------------------- */ - public CompletableFuture sendAsync(EnvelopePayloadMessage envelopePayloadMessage, Address address) { - try { - return getOrCreateConnectionAsync(address) - .thenCompose(connection -> sendAsync(envelopePayloadMessage, connection)); - } catch (Exception e) { - return CompletableFuture.failedFuture(e); - } - } - public CompletableFuture sendAsync(EnvelopePayloadMessage envelopePayloadMessage, Connection connection) { try { @@ -358,7 +317,7 @@ public CompletableFuture sendAsync(EnvelopePayloadMessage envelopePa .handle((con, exception) -> { if (exception != null) { if (connection.isRunning() && !(exception.getCause() instanceof SocketException)) { - handleException(connection, exception); + handleConnectionException(connection, exception); log.debug("Send message failed", exception); closeConnection(connection, CloseReason.EXCEPTION.exception(exception)); } @@ -371,7 +330,6 @@ public CompletableFuture sendAsync(EnvelopePayloadMessage envelopePa } } - /* --------------------------------------------------------------------- */ // Connection /* --------------------------------------------------------------------- */ @@ -386,10 +344,6 @@ public CompletableFuture getOrCreateConnectionAsync(Address address) } } - public boolean hasConnection(Address address) { - return outboundConnectionsByAddress.containsKey(address) || inboundConnectionsByAddress.containsKey(address); - } - public Optional findConnection(Connection connection) { if (connection instanceof OutboundConnection) { return Optional.ofNullable(outboundConnectionsByAddress.get(connection.getPeerAddress())); @@ -408,7 +362,6 @@ public Optional findConnection(Address address) { } } - /* --------------------------------------------------------------------- */ // OutboundConnection /* --------------------------------------------------------------------- */ @@ -450,140 +403,65 @@ private Connection createOutboundConnection(Address address, Capability myCapabi throw new ConnectionException(ADDRESS_BANNED, "PeerAddress is banned. address=" + address); } - Socket socket = createSocket(address); // Blocking call - - // As time passed we check again if connection is still not available - Optional outboundConnection = findOutboundConnectionAndCloseSocketIfPresent(address, socket); - if (outboundConnection.isPresent()) { - return outboundConnection.get(); - } - try { - ConnectionHandshake.Result result = startConnectionHandshake(address, socket, myCapability); // Blocking call log.debug("Create new outbound connection to {}", address); - // As time passed we check again if connection is still not available - outboundConnection = findOutboundConnectionAndCloseSocketIfPresent(address, socket); - if (outboundConnection.isPresent()) { - return outboundConnection.get(); - } - - if (!isDefaultNode) { - log.info("We create an outbound connection to {} from a user node. node={}", address, getNodeInfo()); - } + return startConnectionHandshake(address, myCapability) + .whenComplete((connection, throwable) -> { + log.debug("Outbound connection to {} created", address); + }).join(); - return createNewOutboundConnection(address, socket, result); } catch (Throwable throwable) { - if (socket != null) { - try { - socket.close(); - } catch (IOException ignore) { - } - } handleException(throwable); throw new ConnectionException(throwable); } } - private Socket createSocket(Address address) { - try { - return transportService.getSocket(address, nodeId); // Blocking call - } catch (IOException e) { - handleException(e); - throw new ConnectionException(e); - } - } - - private OutboundConnection createNewOutboundConnection(Address address, - Socket socket, - ConnectionHandshake.Result result) { - OutboundConnection connection = null; - try { - NetworkLoadSnapshot peersNetworkLoadSnapshot = new NetworkLoadSnapshot(result.getPeersNetworkLoad()); - ConnectionThrottle connectionThrottle = new ConnectionThrottle(peersNetworkLoadSnapshot, networkLoadSnapshot, config); - connection = new OutboundConnection(authorizationService, - result.getConnectionId(), - socket, - address, - result.getPeersCapability(), - peersNetworkLoadSnapshot, - result.getConnectionMetrics(), - connectionThrottle, - this, - this::handleException); - outboundConnectionsByAddress.put(address, connection); - - OutboundConnection finalConnection = connection; - listeners.forEach(listener -> NetworkExecutors.getNotifyExecutor().submit(() -> listener.onConnection(finalConnection))); - return connection; - } catch (Exception exception) { - log.error("Creating outbound connection failed", exception); - try { - socket.close(); - } catch (IOException ignore) { + private CompletableFuture startConnectionHandshake(Address peersAddress, + Capability myCapability) { + CompletableFuture future = new CompletableFuture<>(); + HandshakeHandler.Handler handler = new HandshakeHandler.Handler() { + @Override + public void onHandshakeCompleted(ChannelHandlerContext context, HandshakeHandler.Result result) { + log.debug("onHandshakeCompleted {}", result); + Address peersAddress = result.getPeersCapability().getAddress(); + NetworkLoadSnapshot peersNetworkLoadSnapshot = new NetworkLoadSnapshot(result.getPeersNetworkLoad()); + ConnectionThrottle connectionThrottle = new ConnectionThrottle(peersNetworkLoadSnapshot, networkLoadSnapshot, config); + OutboundConnection connection = new OutboundConnection(authorizationService, + context, + result.getConnectionId(), + peersAddress, + result.getPeersCapability(), + peersNetworkLoadSnapshot, + result.getConnectionMetrics(), + connectionThrottle, + Node.this, + Node.this::handleConnectionException); + outboundConnectionsByAddress.put(peersAddress, connection); + listeners.forEach(listener -> NetworkExecutors.getNotifyExecutor().submit(() -> listener.onConnection(connection))); + future.complete(connection); } - if (connection != null) { - connection.shutdown(CloseReason.EXCEPTION); - outboundConnectionsByAddress.remove(address); - } - handleException(exception); - throw new ConnectionException(exception); - } - } - private ConnectionHandshake.Result startConnectionHandshake(Address address, - Socket socket, - Capability myCapability) { - ConnectionHandshake connectionHandshake = null; - try { - connectionHandshake = new ConnectionHandshake(socket, - banList, - myCapability, - authorizationService, - keyBundle); - - connectionHandshakes.put(connectionHandshake.getId(), connectionHandshake); - log.debug("Outbound handshake started: Initiated by {} to {}", myCapability.getAddress(), address); - ConnectionHandshake.Result result = connectionHandshake.start(networkLoadSnapshot.getCurrentNetworkLoad(), address); - log.debug("Outbound handshake completed: Initiated by {} to {}", myCapability.getAddress(), address); - - if (!address.isClearNetAddress()) { - // For clearnet this check doesn't make sense because: - // - the peer binds to 127.0.0.1, therefore reports 127.0.0.1 in the handshake - // - we use the peer's public IP to connect to him - checkArgument(address.equals(result.getPeersCapability().getAddress()), - "Peers reported address must match address we used to connect"); + @Override + public void onClosed(Channel channel) { + log.debug("onClosed {}", channel); } - return result; - } catch (Exception exception) { - log.error("Starting outbound handshake to {} failed. {}", address, exception.getMessage()); - try { - socket.close(); - } catch (IOException ignore) { - } - handleException(exception); - throw new ConnectionException(ConnectionException.Reason.HANDSHAKE_FAILED, exception); - } finally { - if (connectionHandshake != null) { - connectionHandshake.shutdown(); - connectionHandshakes.remove(connectionHandshake.getId()); - } - } - } + }; + transportService.connect(peersAddress, + () -> new OutboundHandshakeHandler(authorizationService, + banList, + myCapability, + networkLoadSnapshot.getCurrentNetworkLoad(), + keyBundle, + peersAddress, + handler)) + .whenComplete((channel, throwable) -> { + if (throwable == null && channel != null) { + log.debug("Connection to {} established", peersAddress); + } + }); - private Optional findOutboundConnectionAndCloseSocketIfPresent(Address address, Socket socket) { - if (outboundConnectionsByAddress.containsKey(address)) { - log.warn("Has have already an OutboundConnection to {}. This can happen while we " + - "we waited for the socket creation at the createOutboundConnection method. " + - "We will close the socket and use the existing connection instead.", address); - try { - socket.close(); - } catch (IOException ignore) { - } - // ofNullable in case the connection have been removed in the meantime. - return Optional.ofNullable(outboundConnectionsByAddress.get(address)); - } - return Optional.empty(); + return future; } public Stream getAllConnections() { @@ -610,8 +488,6 @@ CompletableFuture isPeerOnlineAsync(Address address, String nodeId) { return transportService.isPeerOnlineAsync(address, nodeId); } - - /* --------------------------------------------------------------------- */ // Connection.Handler /* --------------------------------------------------------------------- */ @@ -757,7 +633,8 @@ public void removeListener(Listener listener) { } public Optional
findMyAddress() { - return server.map(Server::getAddress); + return myCapability.map(Capability::getAddress); +// return server.map(Server::getAddress); } public boolean notMyself(Address address) { @@ -783,7 +660,7 @@ public String getNodeInfo() { // Private /* --------------------------------------------------------------------- */ - private void handleException(Connection connection, Throwable exception) { + private void handleConnectionException(Connection connection, Throwable exception) { log.debug("Got called handleException. connection={}, exception={}", connection, exception.getMessage()); if (isShutdown()) { return; diff --git a/network/network/src/main/java/bisq/network/p2p/node/OutboundConnection.java b/network/network/src/main/java/bisq/network/p2p/node/OutboundConnection.java index f48f5d519a..222029c7b4 100644 --- a/network/network/src/main/java/bisq/network/p2p/node/OutboundConnection.java +++ b/network/network/src/main/java/bisq/network/p2p/node/OutboundConnection.java @@ -21,10 +21,10 @@ import bisq.network.p2p.node.authorization.AuthorizationService; import bisq.network.p2p.node.network_load.ConnectionMetrics; import bisq.network.p2p.node.network_load.NetworkLoadSnapshot; +import io.netty.channel.ChannelHandlerContext; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import java.net.Socket; import java.util.function.BiConsumer; @Slf4j @@ -34,8 +34,8 @@ public class OutboundConnection extends Connection { private final Address address; OutboundConnection(AuthorizationService authorizationService, + ChannelHandlerContext context, String connectionId, - Socket socket, Address address, Capability peersCapability, NetworkLoadSnapshot peersNetworkLoadSnapshot, @@ -44,8 +44,8 @@ public class OutboundConnection extends Connection { Handler handler, BiConsumer errorHandler) { super(authorizationService, + context, connectionId, - socket, peersCapability, peersNetworkLoadSnapshot, connectionMetrics, diff --git a/network/network/src/main/java/bisq/network/p2p/node/handshake/HandshakeHandler.java b/network/network/src/main/java/bisq/network/p2p/node/handshake/HandshakeHandler.java new file mode 100644 index 0000000000..198be99513 --- /dev/null +++ b/network/network/src/main/java/bisq/network/p2p/node/handshake/HandshakeHandler.java @@ -0,0 +1,87 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.node.handshake; + +import bisq.network.p2p.node.Capability; +import bisq.network.p2p.node.authorization.AuthorizationService; +import bisq.network.p2p.node.network_load.ConnectionMetrics; +import bisq.network.p2p.node.network_load.NetworkLoad; +import bisq.network.p2p.services.peer_group.BanList; +import bisq.security.keys.KeyBundle; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class HandshakeHandler extends SimpleChannelInboundHandler { + @Getter + public static final class Result { + private final Capability peersCapability; + private final NetworkLoad peersNetworkLoad; + private final ConnectionMetrics connectionMetrics; + private final String connectionId; + + Result(Capability peersCapability, + NetworkLoad peersNetworkLoad, + ConnectionMetrics connectionMetrics, + String connectionId) { + this.peersCapability = peersCapability; + this.peersNetworkLoad = peersNetworkLoad; + this.connectionMetrics = connectionMetrics; + this.connectionId = connectionId; + } + } + + public interface Handler { + void onHandshakeCompleted(ChannelHandlerContext context, Result result); + + void onClosed(Channel channel); + } + + protected final Capability myCapability; + protected final NetworkLoad myNetworkLoad; + protected final BanList banList; + protected final AuthorizationService authorizationService; + protected final KeyBundle myKeyBundle; + protected final Handler handler; + protected final ConnectionMetrics connectionMetrics = new ConnectionMetrics(); + + protected long ts; + + public HandshakeHandler(AuthorizationService authorizationService, + BanList banList, + Capability myCapability, + NetworkLoad myNetworkLoad, + KeyBundle myKeyBundle, + Handler handler) { + this.myCapability = myCapability; + this.myNetworkLoad = myNetworkLoad; + this.banList = banList; + this.authorizationService = authorizationService; + this.myKeyBundle = myKeyBundle; + this.handler = handler; + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + handler.onClosed(ctx.channel()); + } +} + diff --git a/network/network/src/main/java/bisq/network/p2p/node/handshake/InboundHandshakeHandler.java b/network/network/src/main/java/bisq/network/p2p/node/handshake/InboundHandshakeHandler.java new file mode 100644 index 0000000000..5044429cfb --- /dev/null +++ b/network/network/src/main/java/bisq/network/p2p/node/handshake/InboundHandshakeHandler.java @@ -0,0 +1,137 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.node.handshake; + +import bisq.common.encoding.Hex; +import bisq.common.network.Address; +import bisq.common.util.StringUtils; +import bisq.network.p2p.message.NetworkEnvelope; +import bisq.network.p2p.node.Capability; +import bisq.network.p2p.node.ConnectionException; +import bisq.network.p2p.node.authorization.AuthorizationService; +import bisq.network.p2p.node.authorization.AuthorizationToken; +import bisq.network.p2p.node.network_load.NetworkLoad; +import bisq.network.p2p.services.peer_group.BanList; +import bisq.security.keys.KeyBundle; +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; + +import static bisq.network.p2p.node.ConnectionException.Reason.ADDRESS_BANNED; +import static bisq.network.p2p.node.ConnectionException.Reason.AUTHORIZATION_FAILED; +import static bisq.network.p2p.node.ConnectionException.Reason.ONION_ADDRESS_VERIFICATION_FAILED; +import static bisq.network.p2p.node.ConnectionException.Reason.PROTOBUF_IS_NULL; + + +@Slf4j +public class InboundHandshakeHandler extends HandshakeHandler { + public InboundHandshakeHandler(AuthorizationService authorizationService, + BanList banList, + Capability myCapability, + NetworkLoad myNetworkLoad, + KeyBundle myKeyBundle, + Handler handler) { + super(authorizationService, + banList, + myCapability, + myNetworkLoad, + myKeyBundle, + handler); + } + + @Override + public void channelActive(ChannelHandlerContext context) throws Exception { + super.channelActive(context); + } + + @Override + protected void channelRead0(ChannelHandlerContext context, bisq.network.protobuf.NetworkEnvelope proto) { + try { + if (proto == null) { + throw new ConnectionException(PROTOBUF_IS_NULL, "NetworkEnvelope protobuf is null"); + } + + long ts = System.currentTimeMillis(); + NetworkEnvelope networkEnvelope = NetworkEnvelope.fromProto(proto); + long deserializeTime = System.currentTimeMillis() - ts; + + networkEnvelope.verifyVersion(); + + if (!(networkEnvelope.getEnvelopePayloadMessage() instanceof ConnectionHandshake.Request request)) { + throw new ConnectionException("RequestEnvelope.message() not type of Request. requestEnvelope=" + + networkEnvelope); + } + Capability requestersCapability = request.getCapability(); + Address peerAddress = requestersCapability.getAddress(); + + //TODO banList not implemented yet to get set banned addresses. + if (banList.isBanned(peerAddress)) { + throw new ConnectionException(ADDRESS_BANNED, "PeerAddress is banned. address=" + peerAddress); + } + + Address myAddress = myCapability.getAddress(); + // As the request did not know our load at the initial request, they used the NetworkLoad.INITIAL_LOAD for the AuthorizationToken. + String connectionId = StringUtils.createUid(); + boolean isAuthorized = authorizationService.isAuthorized(request, + networkEnvelope.getAuthorizationToken(), + NetworkLoad.INITIAL_NETWORK_LOAD, + connectionId, + myAddress.getFullAddress()); + if (!isAuthorized) { + throw new ConnectionException(AUTHORIZATION_FAILED, + "Authorization of inbound connection request failed. AuthorizationToken=" + + networkEnvelope.getAuthorizationToken()); + } + + if (myAddress.isTorAddress() && peerAddress.isTorAddress()) { + if (!OnionAddressValidation.verify(myAddress, peerAddress, request.getSignatureDate(), request.getAddressOwnershipProof())) { + throw new ConnectionException(ONION_ADDRESS_VERIFICATION_FAILED, "Peer couldn't proof its onion address: " + peerAddress.getFullAddress() + + ", Proof: " + Hex.encode(request.getAddressOwnershipProof().orElseThrow())); + } + } + NetworkLoad peersNetworkLoad = request.getNetworkLoad(); + log.debug("Clients capability {}, load={}", requestersCapability, peersNetworkLoad); + connectionMetrics.onReceived(networkEnvelope, deserializeTime); + + // We reply with the same version as the peer has to avoid pow hash check failures + Capability responseCapability = Capability.withVersion(myCapability, requestersCapability.getVersion()); + ConnectionHandshake.Response response = new ConnectionHandshake.Response(responseCapability, myNetworkLoad); + AuthorizationToken token = authorizationService.createToken(response, + peersNetworkLoad, + peerAddress.getFullAddress(), + 0, + requestersCapability.getFeatures()); + NetworkEnvelope responseNetworkEnvelope = new NetworkEnvelope(token, response); + long startSendTs = System.currentTimeMillis(); + + context.writeAndFlush(responseNetworkEnvelope.completeProto()); + + connectionMetrics.onSent(responseNetworkEnvelope, System.currentTimeMillis() - startSendTs); + connectionMetrics.addRtt(System.currentTimeMillis() - ts); + handler.onHandshakeCompleted(context, new Result(requestersCapability, peersNetworkLoad, connectionMetrics, connectionId)); + + context.pipeline().remove(this); + } catch (Exception exception) { + if (exception instanceof ConnectionException connectionException) { + throw connectionException; + } else { + throw new ConnectionException(exception); + } + } + } +} + diff --git a/network/network/src/main/java/bisq/network/p2p/node/handshake/OutboundHandshakeHandler.java b/network/network/src/main/java/bisq/network/p2p/node/handshake/OutboundHandshakeHandler.java new file mode 100644 index 0000000000..e27c0f8340 --- /dev/null +++ b/network/network/src/main/java/bisq/network/p2p/node/handshake/OutboundHandshakeHandler.java @@ -0,0 +1,193 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.network.p2p.node.handshake; + +import bisq.common.network.Address; +import bisq.common.util.StringUtils; +import bisq.network.p2p.message.NetworkEnvelope; +import bisq.network.p2p.node.Capability; +import bisq.network.p2p.node.ConnectionException; +import bisq.network.p2p.node.Feature; +import bisq.network.p2p.node.authorization.AuthorizationService; +import bisq.network.p2p.node.authorization.AuthorizationToken; +import bisq.network.p2p.node.network_load.ConnectionMetrics; +import bisq.network.p2p.node.network_load.NetworkLoad; +import bisq.network.p2p.services.peer_group.BanList; +import bisq.security.keys.KeyBundle; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.proxy.ProxyConnectionEvent; +import io.netty.handler.proxy.Socks5ProxyHandler; +import lombok.extern.slf4j.Slf4j; + +import java.util.Optional; +import java.util.Set; + +import static bisq.network.p2p.node.ConnectionException.Reason.ADDRESS_BANNED; +import static bisq.network.p2p.node.ConnectionException.Reason.AUTHORIZATION_FAILED; +import static bisq.network.p2p.node.ConnectionException.Reason.PROTOBUF_IS_NULL; + + +@Slf4j +public class OutboundHandshakeHandler extends HandshakeHandler { + private Address peerAddress; + + public OutboundHandshakeHandler(AuthorizationService authorizationService, + BanList banList, + Capability myCapability, + NetworkLoad myNetworkLoad, + KeyBundle myKeyBundle, + Address peerAddress, + Handler handler) { + super(authorizationService, + banList, + myCapability, + myNetworkLoad, + myKeyBundle, + handler); + this.peerAddress = peerAddress; + } + + @Override + public void userEventTriggered(ChannelHandlerContext context, Object event) throws Exception { + // The Socks5ProxyHandler does the handshake protocol with the proxy asynchronously and fires the + // ProxyConnectionEvent once completed. + if (event instanceof ProxyConnectionEvent) { + log.info("SOCKS5 ready, sending handshake"); + start(context); + + // Socks5ProxyHandler not needed anymore + context.pipeline().remove(Socks5ProxyHandler.class); + } + super.userEventTriggered(context, event); + } + + @Override + public void exceptionCaught(ChannelHandlerContext context, Throwable cause) { + cause.printStackTrace(); + context.close(); + } + + @Override + public void channelActive(ChannelHandlerContext context) throws Exception { + // If there's a Socks5ProxyHandler in the pipeline, wait for ProxyConnectionEvent before starting the handshake. + // Otherwise start the handshake immediately (direct connection). + if (context.pipeline().get(Socks5ProxyHandler.class) == null) { + start(context); + } + super.channelActive(context); + } + + private void start(ChannelHandlerContext context) { + try { + ConnectionMetrics connectionMetrics = new ConnectionMetrics(); + + Address myAddress = myCapability.getAddress(); + long signatureDate = System.currentTimeMillis(); + Optional signature = OnionAddressValidation.sign(myAddress, peerAddress, signatureDate, myKeyBundle.getTorKeyPair().getPrivateKey()); + + ConnectionHandshake.Request request = new ConnectionHandshake.Request(myCapability, signature, myNetworkLoad, signatureDate); + + // As we do not know the peers networkLoad yet, we use the NetworkLoad.INITIAL_LOAD. + NetworkLoad initialNetworkLoad = NetworkLoad.INITIAL_NETWORK_LOAD; + + // As we do not know the peers features yet, we use a set of minimal default features. + Set peersFeatures = Feature.DEFAULT_FEATURES; + + AuthorizationToken token = authorizationService.createToken(request, + initialNetworkLoad, + peerAddress.getFullAddress(), + 0, + peersFeatures); + NetworkEnvelope requestNetworkEnvelope = new NetworkEnvelope(token, request); + ts = System.currentTimeMillis(); + + context.writeAndFlush(requestNetworkEnvelope.completeProto()); + connectionMetrics.onSent(requestNetworkEnvelope, System.currentTimeMillis() - ts); + } catch (Exception e) { + // networkEnvelopeSocket.close(); + if (e instanceof ConnectionException) { + throw (ConnectionException) e; + } else { + // May be SocketTimeoutException, IOException or unexpected Exception + throw new ConnectionException(e); + } + } + } + + @Override + protected void channelRead0(ChannelHandlerContext context, bisq.network.protobuf.NetworkEnvelope proto) { + if (peerAddress == null) { + log.error("peerAddress is expected to be not null."); + return; + } + + try { + if (proto == null) { + throw new ConnectionException(PROTOBUF_IS_NULL, + "Response NetworkEnvelope protobuf is null. peerAddress=" + peerAddress); + } + + long startDeserializeTs = System.currentTimeMillis(); + NetworkEnvelope networkEnvelope = NetworkEnvelope.fromProto(proto); + long deserializeTime = System.currentTimeMillis() - startDeserializeTs; + + networkEnvelope.verifyVersion(); + if (!(networkEnvelope.getEnvelopePayloadMessage() instanceof ConnectionHandshake.Response response)) { + throw new ConnectionException("ResponseEnvelope.message() not type of Response. responseEnvelope=" + + networkEnvelope); + } + Capability peersCapability = response.getCapability(); + Address peersAddress = peersCapability.getAddress(); + if (banList.isBanned(peersAddress)) { + throw new ConnectionException(ADDRESS_BANNED, "PeerAddress is banned. address=" + peersAddress); + } + + String connectionId = StringUtils.createUid(); + Address myAddress = myCapability.getAddress(); + boolean isAuthorized = authorizationService.isAuthorized(response, + networkEnvelope.getAuthorizationToken(), + myNetworkLoad, + connectionId, + myAddress.getFullAddress()); + + if (!isAuthorized) { + throw new ConnectionException(AUTHORIZATION_FAILED, + "ConnectionHandshake.Response authorization failed at outbound connection attempt. AuthorizationToken=" + + networkEnvelope.getAuthorizationToken()); + } + + connectionMetrics.onReceived(networkEnvelope, deserializeTime); + + long rrt = System.currentTimeMillis() - ts; + connectionMetrics.addRtt(rrt); + + NetworkLoad peersNetworkLoad = response.getNetworkLoad(); + log.debug("Peers capability {}, load={}", peersCapability, peersNetworkLoad); + handler.onHandshakeCompleted(context, new Result(peersCapability, peersNetworkLoad, connectionMetrics, connectionId)); + + context.pipeline().remove(this); + } catch (Exception exception) { + if (exception instanceof ConnectionException connectionException) { + throw connectionException; + } else { + throw new ConnectionException(exception); + } + } + } +} + diff --git a/network/network/src/main/java/bisq/network/p2p/node/transport/ClearNetTransportService.java b/network/network/src/main/java/bisq/network/p2p/node/transport/ClearNetTransportService.java index 707b84d8b3..c2878d2b08 100644 --- a/network/network/src/main/java/bisq/network/p2p/node/transport/ClearNetTransportService.java +++ b/network/network/src/main/java/bisq/network/p2p/node/transport/ClearNetTransportService.java @@ -12,7 +12,28 @@ import bisq.common.observable.Observable; import bisq.common.observable.map.ObservableHashMap; import bisq.network.identity.NetworkId; +import bisq.network.p2p.node.ConnectionException; +import bisq.network.p2p.node.handshake.InboundHandshakeHandler; +import bisq.network.p2p.node.handshake.OutboundHandshakeHandler; +import bisq.network.protobuf.NetworkEnvelope; import bisq.security.keys.KeyBundle; +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufEncoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import io.netty.handler.logging.LoggingHandler; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.ToString; @@ -27,6 +48,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import static bisq.common.facades.FacadeProvider.getClearNetAddressTypeFacade; import static bisq.common.threading.ExecutorFactory.commonForkJoinPool; @@ -87,6 +109,9 @@ public Config(Path dataDirPath, @Getter public final ObservableHashMap initializedServerSocketTimestampByNetworkId = new ObservableHashMap<>(); + private final EventLoopGroup bossGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); + private final EventLoopGroup workerGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); + public ClearNetTransportService(TransportConfig config) { socketTimeout = config.getSocketTimeout(); connectTimeoutMs = ((Config) config).getConnectTimeoutMs(); @@ -117,6 +142,7 @@ public void initialize() { @Override public CompletableFuture shutdown() { + log.info("shutdown"); if (!initializeCalled) { return CompletableFuture.completedFuture(true); } @@ -125,8 +151,100 @@ public CompletableFuture shutdown() { initializeServerSocketTimestampByNetworkId.clear(); initializedServerSocketTimestampByNetworkId.clear(); timestampByTransportState.clear(); - setTransportState(TransportState.TERMINATED); - return CompletableFuture.completedFuture(true); + + CompletableFuture bossGroupShutdown = new CompletableFuture<>(); + bossGroup.shutdownGracefully().addListener(future -> + bossGroupShutdown.complete(null) + ); + + CompletableFuture workerGroupShutdown = new CompletableFuture<>(); + workerGroup.shutdownGracefully().addListener(future -> + workerGroupShutdown.complete(null) + ); + + // Return a future that completes when BOTH are done + return workerGroupShutdown + .thenCombine(workerGroupShutdown, (b1, b2) -> true) + .whenComplete((result, throwable) -> setTransportState(TransportState.TERMINATED)); + } + + @Override + public CompletableFuture
startNettyServer(NetworkId networkId, + KeyBundle keyBundle, + Supplier handshakeHandlerSupplier) { + int port = networkId.getAddressByTransportTypeMap().get(TransportType.CLEAR).getPort(); + Address address = getClearNetAddressTypeFacade().toMyLocalAddress(port); + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler()) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(NioSocketChannel channel) { + InboundHandshakeHandler inboundHandshakeHandler = handshakeHandlerSupplier.get(); + channel.pipeline() + .addLast(new LoggingHandler()) + .addLast(new ProtobufVarint32FrameDecoder()) + .addLast(new ProtobufDecoder(NetworkEnvelope.getDefaultInstance())) + .addLast(new ProtobufVarint32LengthFieldPrepender()) + .addLast(new ProtobufEncoder()) + .addLast(inboundHandshakeHandler); + } + }); + CompletableFuture
serverStarted = new CompletableFuture<>(); + bootstrap.bind(port).addListener(future -> { + if (future instanceof ChannelFuture channelFuture && future.isSuccess()) { + Channel channel = channelFuture.channel(); + serverStarted.complete(address); + } else { + serverStarted.completeExceptionally(future.cause()); + } + }); + return serverStarted; + } + + @Override + public CompletableFuture connect(Address address, + Supplier handshakeHandlerSupplier) { + CompletableFuture future = new CompletableFuture<>(); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(workerGroup) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) { + OutboundHandshakeHandler outboundHandshakeHandler = handshakeHandlerSupplier.get(); + + socketChannel.pipeline() + .addLast(new LoggingHandler()) + .addLast(new ProtobufVarint32FrameDecoder()) + .addLast(new ProtobufDecoder(NetworkEnvelope.getDefaultInstance())) + .addLast(new ProtobufVarint32LengthFieldPrepender()) + .addLast(new ProtobufEncoder()) + .addLast(outboundHandshakeHandler); + } + }); + ChannelFuture connect = bootstrap.connect(address.getHost(), address.getPort()); + connect.addListener((ChannelFutureListener) channelFuture -> { + if (channelFuture.isSuccess()) { + future.complete(channelFuture.channel()); + } else { + future.completeExceptionally(channelFuture.cause()); + } + }); + return future; + } + + @Override + public CompletableFuture
evaluateMyAddress(NetworkId networkId, KeyBundle keyBundle) { + return CompletableFuture.supplyAsync(() -> { + try { + int port = networkId.getAddressByTransportTypeMap().get(TransportType.CLEAR).getPort(); + return getClearNetAddressTypeFacade().toMyLocalAddress(port); + } catch (Exception exception) { + throw new ConnectionException(exception); + } + }); } @Override @@ -139,11 +257,11 @@ public ServerSocketResult getServerSocket(NetworkId networkId, KeyBundle keyBund try { ServerSocket serverSocket = new ServerSocket(port); - ClearnetAddress address = getClearNetAddressTypeFacade().toMyLocalAddress(port); + Address address = evaluateMyAddress(networkId, keyBundle).get(); log.debug("ServerSocket created at port {}", port); initializedServerSocketTimestampByNetworkId.put(networkId, System.currentTimeMillis()); return new ServerSocketResult(serverSocket, address); - } catch (IOException e) { + } catch (Exception e) { log.error("Error at getServerSocket. Port {}", port, e); throw new CompletionException(e); } diff --git a/network/network/src/main/java/bisq/network/p2p/node/transport/I2PTransportService.java b/network/network/src/main/java/bisq/network/p2p/node/transport/I2PTransportService.java index f2db52c07f..b31e173351 100644 --- a/network/network/src/main/java/bisq/network/p2p/node/transport/I2PTransportService.java +++ b/network/network/src/main/java/bisq/network/p2p/node/transport/I2PTransportService.java @@ -218,6 +218,23 @@ public CompletableFuture shutdown() { .whenComplete((result, throwable) -> setTransportState(TransportState.TERMINATED)); } + @Override + public CompletableFuture
evaluateMyAddress(NetworkId networkId, KeyBundle keyBundle) { + return CompletableFuture.supplyAsync(() -> { + try { + int port = networkId.getAddressByTransportTypeMap().get(TransportType.I2P).getPort(); + + I2PKeyPair i2PKeyPair = keyBundle.getI2PKeyPair(); + String destinationBase64 = i2PKeyPair.getDestinationBase64(); + String destinationBase32 = i2PKeyPair.getDestinationBase32(); + // Port is irrelevant for I2P + return new I2PAddress(destinationBase64, destinationBase32, port); + } catch (Exception exception) { + throw new ConnectionException(exception); + } + }); + } + @Override public ServerSocketResult getServerSocket(NetworkId networkId, KeyBundle keyBundle, String nodeId) { try { @@ -228,11 +245,9 @@ public ServerSocketResult getServerSocket(NetworkId networkId, KeyBundle keyBund initializeServerSocketTimestampByNetworkId.put(networkId, System.currentTimeMillis()); I2PKeyPair i2PKeyPair = keyBundle.getI2PKeyPair(); ServerSocket serverSocket = i2pClient.getServerSocket(i2PKeyPair, nodeId); - String destinationBase64 = i2PKeyPair.getDestinationBase64(); - String destinationBase32 = i2PKeyPair.getDestinationBase32(); - I2PAddress i2PAddress = new I2PAddress(destinationBase64, destinationBase32, port); + Address i2PAddress = evaluateMyAddress(networkId, keyBundle).get(); initializedServerSocketTimestampByNetworkId.put(networkId, System.currentTimeMillis()); - log.info("ServerSocket created. destinationBase32={}, destinationBase64={}, port={}", i2PKeyPair.getDestinationBase32(), destinationBase64, port); + log.info("ServerSocket created. destinationBase32={}, destinationBase64={}, port={}", i2PKeyPair.getDestinationBase32(), i2PKeyPair.getDestinationBase64(), port); return new ServerSocketResult(serverSocket, i2PAddress); } catch (Exception exception) { log.error("getServerSocket failed", exception); diff --git a/network/network/src/main/java/bisq/network/p2p/node/transport/TorTransportService.java b/network/network/src/main/java/bisq/network/p2p/node/transport/TorTransportService.java index fd5b4c04b2..9ce81b63cf 100644 --- a/network/network/src/main/java/bisq/network/p2p/node/transport/TorTransportService.java +++ b/network/network/src/main/java/bisq/network/p2p/node/transport/TorTransportService.java @@ -6,13 +6,37 @@ import bisq.common.network.TransportType; import bisq.common.observable.Observable; import bisq.common.observable.map.ObservableHashMap; +import bisq.common.threading.ExecutorFactory; import bisq.network.identity.NetworkId; import bisq.network.p2p.node.ConnectionException; +import bisq.network.p2p.node.handshake.InboundHandshakeHandler; +import bisq.network.p2p.node.handshake.OutboundHandshakeHandler; +import bisq.network.protobuf.NetworkEnvelope; import bisq.network.tor.TorService; import bisq.network.tor.TorTransportConfig; import bisq.security.keys.KeyBundle; import bisq.security.keys.TorKeyPair; import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy; +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufEncoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.proxy.Socks5ProxyHandler; +import io.netty.resolver.NoopAddressResolverGroup; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -23,6 +47,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; import static com.google.common.base.Preconditions.checkArgument; @@ -41,6 +66,9 @@ public class TorTransportService implements TransportService { @Getter public final ObservableHashMap initializedServerSocketTimestampByNetworkId = new ObservableHashMap<>(); + private final EventLoopGroup bossGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); + private final EventLoopGroup workerGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); + public TorTransportService(TransportConfig config) { socketTimeout = config.getSocketTimeout(); if (torService == null) { @@ -63,10 +91,134 @@ public void initialize() { public CompletableFuture shutdown() { log.info("shutdown"); setTransportState(TransportState.STOPPING); - return torService.shutdown() + + CompletableFuture bossGroupShutdown = new CompletableFuture<>(); + bossGroup.shutdownGracefully().addListener(future -> + bossGroupShutdown.complete(null) + ); + + CompletableFuture workerGroupShutdown = new CompletableFuture<>(); + workerGroup.shutdownGracefully().addListener(future -> + workerGroupShutdown.complete(null) + ); + + CompletableFuture groupsShutdown = CompletableFuture.allOf(bossGroupShutdown, workerGroupShutdown); + + // When both groups are done, shutdown torService + return groupsShutdown + .thenCompose(v -> torService.shutdown()) .whenComplete((result, throwable) -> setTransportState(TransportState.TERMINATED)); } + @Override + public CompletableFuture
startNettyServer(NetworkId networkId, + KeyBundle keyBundle, + Supplier handshakeHandlerSupplier) { + + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .handler(new LoggingHandler()) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(NioSocketChannel channel) { + InboundHandshakeHandler inboundHandshakeHandler = handshakeHandlerSupplier.get(); + // For inbound tor connections we don't need the Socks5ProxyHandler, only for outbound. + channel.pipeline() + .addLast(new LoggingHandler()) + .addLast(new ProtobufVarint32FrameDecoder()) + .addLast(new ProtobufDecoder(NetworkEnvelope.getDefaultInstance())) + .addLast(new ProtobufVarint32LengthFieldPrepender()) + .addLast(new ProtobufEncoder()) + .addLast(inboundHandshakeHandler); + } + }); + int port = networkId.getAddressByTransportTypeMap().get(TransportType.TOR).getPort(); + TorKeyPair torKeyPair = keyBundle.getTorKeyPair(); + Address address = evaluateMyAddress(networkId, keyBundle).get(); + CompletableFuture
serverFuture = new CompletableFuture<>(); + torService.publishOnionServiceForNetty(port, torKeyPair) + .whenComplete((localPort, throwable) -> { + if (throwable == null) { + bootstrap.bind(localPort).addListener(future -> { + if (future instanceof ChannelFuture channelFuture && future.isSuccess()) { + Channel channel = channelFuture.channel(); + serverFuture.complete(address); + } else { + serverFuture.completeExceptionally(future.cause()); + } + }); + } else { + log.error("publishOnionServiceForNetty failed", throwable); + } + }); + return serverFuture; + } catch (InterruptedException e) { + log.warn("Thread got interrupted at getServerSocket method", e); + Thread.currentThread().interrupt(); // Restore interrupted state + throw new ConnectionException(e); + } catch (ExecutionException e) { + throw new ConnectionException(e); + } + } + + @Override + public CompletableFuture connect(Address address, + Supplier handshakeHandlerSupplier) { + return CompletableFuture.supplyAsync(() -> { + int socksPort = torService.getSocksPort(); + checkArgument(socksPort != -1, "socksPort is not yet set. torService must be initialized before connect is called."); + CompletableFuture future = new CompletableFuture<>(); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000000); + bootstrap.option(ChannelOption.SO_KEEPALIVE, true); + bootstrap.resolver(NoopAddressResolverGroup.INSTANCE); + bootstrap.group(workerGroup) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) { + OutboundHandshakeHandler outboundHandshakeHandler = handshakeHandlerSupplier.get(); + Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress("127.0.0.1", socksPort)); + socks5ProxyHandler.setConnectTimeoutMillis(1000000); + socketChannel.pipeline() + .addFirst(socks5ProxyHandler) + .addLast(new ProtobufVarint32FrameDecoder()) + .addLast(new ProtobufDecoder(NetworkEnvelope.getDefaultInstance())) + .addLast(new ProtobufVarint32LengthFieldPrepender()) + .addLast(new ProtobufEncoder()) + .addLast(outboundHandshakeHandler); + } + }); + + // Create unresolved socket address so Tor handles DNS resolution + InetSocketAddress unresolved = InetSocketAddress.createUnresolved(address.getHost(), address.getPort()); + ChannelFuture connect = bootstrap.connect(unresolved); + connect.addListener((ChannelFutureListener) channelFuture -> { + if (!channelFuture.isSuccess()) { + // complete only on TCP failure (e.g., Tor not listening) + future.completeExceptionally(channelFuture.cause()); + } + }); + return future.join(); + }, ExecutorFactory.newSingleThreadExecutor("")); + } + + @Override + public CompletableFuture
evaluateMyAddress(NetworkId networkId, KeyBundle keyBundle) { + return CompletableFuture.supplyAsync(() -> { + try { + int port = networkId.getAddressByTransportTypeMap().get(TransportType.TOR).getPort(); + TorKeyPair torKeyPair = keyBundle.getTorKeyPair(); + String onionAddress = torKeyPair.getOnionAddress(); + return new TorAddress(onionAddress, port); + } catch (Exception exception) { + throw new ConnectionException(exception); + } + }); + } + @Override public ServerSocketResult getServerSocket(NetworkId networkId, KeyBundle keyBundle, String nodeId) { try { @@ -76,8 +228,7 @@ public ServerSocketResult getServerSocket(NetworkId networkId, KeyBundle keyBund initializeServerSocketTimestampByNetworkId.put(networkId, System.currentTimeMillis()); TorKeyPair torKeyPair = keyBundle.getTorKeyPair(); - String onionAddress = torKeyPair.getOnionAddress(); - TorAddress address = new TorAddress(onionAddress, port); + Address address = evaluateMyAddress(networkId, keyBundle).get(); ServerSocket serverSocket = torService.publishOnionServiceAndCreateServerSocket(port, torKeyPair).get(); initializedServerSocketTimestampByNetworkId.put(networkId, System.currentTimeMillis()); return new ServerSocketResult(serverSocket, address); diff --git a/network/network/src/main/java/bisq/network/p2p/node/transport/TransportService.java b/network/network/src/main/java/bisq/network/p2p/node/transport/TransportService.java index 023906fae3..10db15fd66 100644 --- a/network/network/src/main/java/bisq/network/p2p/node/transport/TransportService.java +++ b/network/network/src/main/java/bisq/network/p2p/node/transport/TransportService.java @@ -23,13 +23,17 @@ import bisq.common.observable.Observable; import bisq.common.observable.map.ObservableHashMap; import bisq.network.identity.NetworkId; +import bisq.network.p2p.node.handshake.InboundHandshakeHandler; +import bisq.network.p2p.node.handshake.OutboundHandshakeHandler; import bisq.security.keys.KeyBundle; import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy; +import io.netty.channel.Channel; import java.io.IOException; import java.net.Socket; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import static com.google.common.base.Preconditions.checkArgument; @@ -54,10 +58,25 @@ static TransportService create(TransportType transportType, TransportConfig conf CompletableFuture shutdown(); + default CompletableFuture
createNettyServer(NetworkId networkId, + KeyBundle keyBundle) { + return CompletableFuture.completedFuture(null); + } + + default CompletableFuture
startNettyServer(NetworkId networkId, + KeyBundle keyBundle, + Supplier handshakeHandlerSupplier) { + return CompletableFuture.completedFuture(null); + } + ServerSocketResult getServerSocket(NetworkId networkId, KeyBundle keyBundle, String nodeId); Socket getSocket(Address address, String nodeId) throws IOException; + default CompletableFuture connect(Address address, Supplier handshakeHandlerSupplier) { + return null; + } + default Optional getSocksProxy() throws IOException { return Optional.empty(); } @@ -81,4 +100,6 @@ default void setTransportState(TransportState newTransportState) { ObservableHashMap getInitializeServerSocketTimestampByNetworkId(); ObservableHashMap getInitializedServerSocketTimestampByNetworkId(); + + CompletableFuture
evaluateMyAddress(NetworkId networkId, KeyBundle keyBundle); } diff --git a/network/tor/tor/src/main/java/bisq/network/tor/TorService.java b/network/tor/tor/src/main/java/bisq/network/tor/TorService.java index c8bc474420..32758fad98 100644 --- a/network/tor/tor/src/main/java/bisq/network/tor/TorService.java +++ b/network/tor/tor/src/main/java/bisq/network/tor/TorService.java @@ -84,6 +84,8 @@ public class TorService implements Service { private Optional torProcess = Optional.empty(); private Optional torSocksProxyFactory = Optional.empty(); private final Map externalTorConfigMap = new HashMap<>(); + @Getter + private int socksPort = -1; public TorService(TorTransportConfig transportConfig) { this.transportConfig = transportConfig; @@ -168,8 +170,8 @@ public CompletableFuture initialize() { torController.authenticate(hashedControlPassword); torController.bootstrap(); - int port = torController.getSocksPort(); - torSocksProxyFactory = Optional.of(new TorSocksProxyFactory(port)); + socksPort = torController.getSocksPort(); + torSocksProxyFactory = Optional.of(new TorSocksProxyFactory(socksPort)); return CompletableFuture.completedFuture(true); } @@ -215,6 +217,32 @@ public CompletableFuture shutdown() { }, commonForkJoinPool()); } + public CompletableFuture publishOnionServiceForNetty(int port, TorKeyPair torKeyPair) { + long ts = System.currentTimeMillis(); + try { + InetAddress bindAddress = !LinuxDistribution.isWhonix() ? Inet4Address.getLoopbackAddress() + : Inet4Address.getByName("0.0.0.0"); + var localServerSocket = new ServerSocket(RANDOM_PORT, 50, bindAddress); + int localPort = localServerSocket.getLocalPort(); + localServerSocket.close(); + String onionAddress = torKeyPair.getOnionAddress(); + log.info("Publish onion service for onion address {}:{}", onionAddress, port); + checkArgument(!publishedOnionServices.contains(onionAddress), "publishOnionServiceForNetty must be called only once"); + torController.publish(torKeyPair, port, localPort); + publishedOnionServices.add(onionAddress); + log.info("Tor onion service Ready. Took {} ms. Onion address={}:{}", + System.currentTimeMillis() - ts, onionAddress, port); + return CompletableFuture.completedFuture(localPort); + } catch (InterruptedException e) { + log.warn("Can't create onion service. Thread got interrupted at publishOnionService method", e); + Thread.currentThread().interrupt(); // Restore interrupted state + return CompletableFuture.failedFuture(e); + } catch (IOException e) { + log.error("Can't create onion service", e); + return CompletableFuture.failedFuture(e); + } + } + public CompletableFuture publishOnionService(int localPort, int onionServicePort, TorKeyPair torKeyPair) { long ts = System.currentTimeMillis(); try {