Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions account/src/main/java/bisq/account/accounts/Account.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,6 @@ public bisq.account.protobuf.Account toProto(boolean serializeForHash) {
return resolveProto(serializeForHash);
}

@Override
public bisq.account.protobuf.Account completeProto() {
return toProto(false);
}

protected bisq.account.protobuf.Account.Builder getAccountBuilder(boolean serializeForHash) {
return bisq.account.protobuf.Account.newBuilder()
.setId(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,6 @@ public bisq.account.protobuf.AccountPayload toProto(boolean serializeForHash) {
return resolveProto(serializeForHash);
}

@Override
public bisq.account.protobuf.AccountPayload completeProto() {
return toProto(false);
}

protected bisq.account.protobuf.AccountPayload.Builder getAccountPayloadBuilder(boolean serializeForHash) {
return bisq.account.protobuf.AccountPayload.newBuilder()
.setId(id);
Expand Down
13 changes: 13 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ typesafe-config-lib = { strictly = '1.4.3' }

zxing-lib = { strictly = '3.5.3' }

netty-lib = { strictly = '4.2.7.Final' }

# Referenced in subproject's build.gradle > dependencies block in the form 'implementation libs.guava'
# Note: keys can contain dash (protobuf-java) but the dash is replaced by dot when referenced
# in a build.gradle ('implementation libs.protobuf.java')
Expand Down Expand Up @@ -174,6 +176,15 @@ typesafe-config = { module = 'com.typesafe:config', version.ref = 'typesafe-conf

zxing = { module = 'com.google.zxing:javase', version.ref = 'zxing-lib' }

netty-all = { module = 'io.netty:netty-all', version.ref = 'netty-lib' }
netty-transport = { module = 'io.netty:netty-transport', version.ref = 'netty-lib' }
netty-buffer = { module = 'io.netty:netty-buffer', version.ref = 'netty-lib' }
netty-common = { module = 'io.netty:netty-common', version.ref = 'netty-lib' }
netty-handler = { module = 'io.netty:netty-handler', version.ref = 'netty-lib' }
netty-handler-proxy = { module = 'io.netty:netty-handler-proxy', version.ref = 'netty-lib' }
netty-codec-socks = { module = 'io.netty:netty-codec-socks', version.ref = 'netty-lib' }
netty-codec-protobuf = { module = 'io.netty:netty-codec-protobuf', version.ref = 'netty-lib' }

# Defines groups of libs that are commonly used together
# Referenced in dependencies block as 'implementation libs.bundles.i2p'
[bundles]
Expand All @@ -190,6 +201,8 @@ rest-api-libs = ['swagger-jaxrs2-jakarta', 'glassfish-jersey-jdk-http', 'glassfi
'jackson-core', 'jackson-annotations', 'jackson-databind', 'jackson-datatype']
websocket-libs = ['glassfish-jersey-json-jackson', 'glassfish-jersey-server', 'glassfish-jersey-containers-grizzly',
'glassfish-grizzly-websockets-server', 'jakarta-websocket', 'jackson-databind', 'jackson-datatype', 'swagger-swagger-annotations']
nonblockingio = ['netty-all', 'netty-transport', 'netty-buffer', 'netty-common', 'netty-handler', 'netty-handler-proxy',
'netty-codec-socks', 'netty-codec-protobuf']

# Referenced in subproject's build.gradle > plugin block as alias: `alias(libs.plugins.protobuf)`
# Note: plugin version constraints are not supported by the java-platform plugin, so cannot be enforced there. However,
Expand Down
2 changes: 2 additions & 0 deletions network/network/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,7 @@ dependencies {
implementation(libs.jsocks)
implementation(libs.bundles.i2p)

implementation(libs.bundles.nonblockingio)

integrationTestImplementation(libs.mockito)
}
133 changes: 47 additions & 86 deletions network/network/src/main/java/bisq/network/p2p/node/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package bisq.network.p2p.node;

import bisq.common.network.Address;
import bisq.common.network.DefaultPeerSocket;
import bisq.common.network.PeerSocket;
import bisq.common.network.TransportType;
import bisq.common.threading.AbortPolicyWithLogging;
import bisq.common.threading.ExecutorFactory;
Expand All @@ -36,21 +34,19 @@
import bisq.network.p2p.node.envelope.NetworkEnvelopeSocket;
import bisq.network.p2p.node.network_load.ConnectionMetrics;
import bisq.network.p2p.node.network_load.NetworkLoadSnapshot;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.util.Comparator;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -96,6 +92,7 @@ public interface Listener {
}

private final AuthorizationService authorizationService;
private final ChannelHandlerContext context;
@Getter
private final String id;
@Getter
Expand All @@ -111,8 +108,6 @@ public interface Listener {
private final ConnectionThrottle connectionThrottle;
private final Handler handler;
private final Set<Listener> listeners = new CopyOnWriteArraySet<>();
@Nullable
private Future<?> inputHandlerFuture;
// We use counter value 0 in the handshake, thus we start here with 1 as it's not the first message
@Getter(AccessLevel.PACKAGE)
private final AtomicInteger sentMessageCounter = new AtomicInteger(1);
Expand All @@ -121,17 +116,19 @@ public interface Listener {
private volatile boolean listeningStopped;
private final ThreadPoolExecutor readExecutor;
private final ThreadPoolExecutor sendExecutor;
private final SimpleChannelInboundHandler<bisq.network.protobuf.NetworkEnvelope> inboundMessageHandler;

protected Connection(AuthorizationService authorizationService,
ChannelHandlerContext context,
String connectionId,
Socket socket,
Capability peersCapability,
NetworkLoadSnapshot peersNetworkLoadSnapshot,
ConnectionMetrics connectionMetrics,
ConnectionThrottle connectionThrottle,
Handler handler,
BiConsumer<Connection, Exception> errorHandler) {
this.authorizationService = authorizationService;
this.context = context;
this.id = connectionId;
this.peersCapability = peersCapability;
this.peersNetworkLoadSnapshot = peersNetworkLoadSnapshot;
Expand All @@ -143,63 +140,41 @@ protected Connection(AuthorizationService authorizationService,
readExecutor = createReadExecutor();
sendExecutor = createSendExecutor();

try {
PeerSocket peerSocket = new DefaultPeerSocket(socket);
this.networkEnvelopeSocket = new NetworkEnvelopeSocket(peerSocket);
} catch (IOException exception) {
log.error("Could not create objectOutputStream/objectInputStream for socket {}", socket, exception);
errorHandler.accept(this, exception);
shutdown(CloseReason.EXCEPTION.exception(exception));
return;
}
try {
inputHandlerFuture = readExecutor.submit(() -> {
inboundMessageHandler = new SimpleChannelInboundHandler<>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, bisq.network.protobuf.NetworkEnvelope proto) {
try {
long readTs = 0;
while (isInputStreamActive()) {
if (readTs != 0) {
log.debug("Processing message took {} ms. Wait for new message from {}. ", System.currentTimeMillis() - readTs, getPeerAddress());
} else {
log.debug("Wait for new message from {}", getPeerAddress());
}
var proto = networkEnvelopeSocket.receiveNextEnvelope();
readTs = System.currentTimeMillis();
if (proto == null) {
log.info("Proto from networkEnvelopeSocket.receiveNextEnvelope() is null. " +
"This is expected if the input stream has reached EOF. We shut down the connection.");
shutdown(CloseReason.EXCEPTION.exception(new EOFException("Input stream reached EOF")));
return;
}
if (proto == null) {
log.info("Proto from networkEnvelopeSocket.receiveNextEnvelope() is null. " +
"This is expected if the input stream has reached EOF. We shut down the connection.");
shutdown(CloseReason.EXCEPTION.exception(new EOFException("Input stream reached EOF")));
return;
}

// receiveNextEnvelope might need some time wo we check again if connection is still active
if (!isInputStreamActive()) {
return;
}
// receiveNextEnvelope might need some time wo we check again if connection is still active
if (!isActive()) {
return;
}

connectionThrottle.throttleReceiveMessage();
// ThrottleReceiveMessage can cause a delay by Thread.sleep
if (!isInputStreamActive()) {
return;
}
long ts = System.currentTimeMillis();
NetworkEnvelope networkEnvelope = NetworkEnvelope.fromProto(proto);
long deserializeTime = System.currentTimeMillis() - ts;
networkEnvelope.verifyVersion();
connectionMetrics.onReceived(networkEnvelope, deserializeTime);

EnvelopePayloadMessage envelopePayloadMessage = networkEnvelope.getEnvelopePayloadMessage();
log.debug("Received message: {} at: {}",
StringUtils.truncate(envelopePayloadMessage.toString(), 200), this);
requestResponseManager.onReceived(envelopePayloadMessage);

if (isInputStreamActive()) {
boolean isMessageAuthorized = handler.isMessageAuthorized(envelopePayloadMessage,
networkEnvelope.getAuthorizationToken(),
this);
if (isMessageAuthorized) {
handler.handleNetworkMessage(envelopePayloadMessage, this);
listeners.forEach(listener -> NetworkExecutors.getNotifyExecutor().submit(() -> listener.onNetworkMessage(envelopePayloadMessage)));
}
long ts = System.currentTimeMillis();
NetworkEnvelope networkEnvelope = NetworkEnvelope.fromProto(proto);

long deserializeTime = System.currentTimeMillis() - ts;
networkEnvelope.verifyVersion();
connectionMetrics.onReceived(networkEnvelope, deserializeTime);

EnvelopePayloadMessage envelopePayloadMessage = networkEnvelope.getEnvelopePayloadMessage();
log.debug("Received message: {} at: {}",
StringUtils.truncate(envelopePayloadMessage.toString(), 200), this);
requestResponseManager.onReceived(envelopePayloadMessage);

if (isActive()) {
boolean isMessageAuthorized = handler.isMessageAuthorized(envelopePayloadMessage,
networkEnvelope.getAuthorizationToken(),
Connection.this);
if (isMessageAuthorized) {
handler.handleNetworkMessage(envelopePayloadMessage, Connection.this);
listeners.forEach(listener -> NetworkExecutors.getNotifyExecutor().submit(() -> listener.onNetworkMessage(envelopePayloadMessage)));
}
}
} catch (Exception exception) {
Expand All @@ -210,17 +185,14 @@ protected Connection(AuthorizationService authorizationService,

// EOFException expected if connection got closed (Socket closed message)
if (!(exception instanceof EOFException)) {
errorHandler.accept(this, exception);
errorHandler.accept(Connection.this, exception);
}
}
} finally {
}
});
} catch (RejectedExecutionException e) {
log.error("Read executor rejected task. We shut down the connection.", e);
errorHandler.accept(this, e);
inputHandlerFuture = CompletableFuture.failedFuture(e);
shutdown(CloseReason.EXCEPTION.exception(e));
}
}
};
context.pipeline().addLast(inboundMessageHandler);
}

/* --------------------------------------------------------------------- */
Expand Down Expand Up @@ -290,7 +262,7 @@ CompletableFuture<Connection> sendAsync(EnvelopePayloadMessage envelopePayloadMe
AuthorizationToken authorizationToken = createAuthorizationToken(envelopePayloadMessage);
networkEnvelope = createNetworkEnvelope(envelopePayloadMessage, authorizationToken);
long ts = System.currentTimeMillis();
networkEnvelopeSocket.send(networkEnvelope);
context.writeAndFlush(networkEnvelope.completeProto());
spentTime = System.currentTimeMillis() - ts;
}
connectionMetrics.onSent(networkEnvelope, spentTime);
Expand Down Expand Up @@ -351,15 +323,7 @@ void shutdown(CloseReason closeReason) {
shutdownStarted = true;
requestResponseManager.dispose();
connectionMetrics.clear();
if (inputHandlerFuture != null) {
inputHandlerFuture.cancel(true);
}
try {
if (networkEnvelopeSocket != null) {
networkEnvelopeSocket.close();
}
} catch (IOException ignore) {
}
context.pipeline().remove(inboundMessageHandler);
handler.handleConnectionClosed(this, closeReason);
listeners.forEach(listener -> NetworkExecutors.getNotifyExecutor().submit(() -> listener.onConnectionClosed(closeReason)));
listeners.clear();
Expand All @@ -369,18 +333,15 @@ void shutdown(CloseReason closeReason) {
}

boolean isStopped() {
return shutdownStarted
|| networkEnvelopeSocket == null
|| networkEnvelopeSocket.isClosed()
|| Thread.currentThread().isInterrupted();
return shutdownStarted || Thread.currentThread().isInterrupted();
}


/* --------------------------------------------------------------------- */
// Private
/* --------------------------------------------------------------------- */

private boolean isInputStreamActive() {
private boolean isActive() {
return !listeningStopped && isRunning();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,25 @@
import bisq.network.p2p.node.authorization.AuthorizationService;
import bisq.network.p2p.node.network_load.ConnectionMetrics;
import bisq.network.p2p.node.network_load.NetworkLoadSnapshot;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

import java.net.Socket;
import java.util.function.BiConsumer;

@Slf4j
public class InboundConnection extends Connection {
InboundConnection(AuthorizationService authorizationService,
ChannelHandlerContext context,
String connectionId,
Socket socket,
Capability peersCapability,
NetworkLoadSnapshot peersNetworkLoadSnapshot,
ConnectionMetrics connectionMetrics,
ConnectionThrottle connectionThrottle,
Handler handler,
BiConsumer<Connection, Exception> errorHandler) {
super(authorizationService,
context,
connectionId,
socket,
peersCapability,
peersNetworkLoadSnapshot,
connectionMetrics,
Expand Down
Loading
Loading