diff --git a/drift-transport-netty/src/main/java/io/airlift/drift/transport/netty/client/DriftNettyClientModule.java b/drift-transport-netty/src/main/java/io/airlift/drift/transport/netty/client/DriftNettyClientModule.java index 820362858..bc901b9f8 100644 --- a/drift-transport-netty/src/main/java/io/airlift/drift/transport/netty/client/DriftNettyClientModule.java +++ b/drift-transport-netty/src/main/java/io/airlift/drift/transport/netty/client/DriftNettyClientModule.java @@ -27,6 +27,7 @@ import io.airlift.drift.transport.client.DriftClientConfig; import io.airlift.drift.transport.client.MethodInvokerFactory; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; import jakarta.annotation.PreDestroy; import java.lang.annotation.Annotation; @@ -42,7 +43,7 @@ public class DriftNettyClientModule public DriftNettyClientModule() { - this(ByteBufAllocator.DEFAULT); + this(PooledByteBufAllocator.DEFAULT); } @VisibleForTesting diff --git a/drift-transport-netty/src/main/java/io/airlift/drift/transport/netty/client/DriftNettyMethodInvokerFactory.java b/drift-transport-netty/src/main/java/io/airlift/drift/transport/netty/client/DriftNettyMethodInvokerFactory.java index e64329c98..87fbe7be9 100644 --- a/drift-transport-netty/src/main/java/io/airlift/drift/transport/netty/client/DriftNettyMethodInvokerFactory.java +++ b/drift-transport-netty/src/main/java/io/airlift/drift/transport/netty/client/DriftNettyMethodInvokerFactory.java @@ -23,8 +23,10 @@ import io.airlift.drift.transport.netty.ssl.SslContextFactory; import io.airlift.drift.transport.netty.ssl.SslContextFactory.SslContextParameters; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; import jakarta.annotation.PreDestroy; import java.io.Closeable; @@ -50,7 +52,7 @@ public class DriftNettyMethodInvokerFactory public static DriftNettyMethodInvokerFactory createStaticDriftNettyMethodInvokerFactory(DriftNettyClientConfig clientConfig) { - return createStaticDriftNettyMethodInvokerFactory(clientConfig, ByteBufAllocator.DEFAULT); + return createStaticDriftNettyMethodInvokerFactory(clientConfig, PooledByteBufAllocator.DEFAULT); } @VisibleForTesting @@ -63,7 +65,7 @@ public DriftNettyMethodInvokerFactory( DriftNettyConnectionFactoryConfig factoryConfig, Function clientConfigurationProvider) { - this(factoryConfig, clientConfigurationProvider, ByteBufAllocator.DEFAULT); + this(factoryConfig, clientConfigurationProvider, PooledByteBufAllocator.DEFAULT); } @VisibleForTesting @@ -74,7 +76,7 @@ public DriftNettyMethodInvokerFactory( { requireNonNull(factoryConfig, "factoryConfig is null"); - group = new NioEventLoopGroup(factoryConfig.getThreadCount(), daemonThreadsNamed("drift-client-%s")); + group = new MultiThreadIoEventLoopGroup(factoryConfig.getThreadCount(), daemonThreadsNamed("drift-client-%s"), NioIoHandler.newFactory()); this.clientConfigurationProvider = requireNonNull(clientConfigurationProvider, "clientConfigurationProvider is null"); this.sslContextFactory = createSslContextFactory(true, factoryConfig.getSslContextRefreshTime(), group); diff --git a/drift-transport-netty/src/main/java/io/airlift/drift/transport/netty/server/DriftNettyServerTransport.java b/drift-transport-netty/src/main/java/io/airlift/drift/transport/netty/server/DriftNettyServerTransport.java index ad74636c8..db871c29f 100644 --- a/drift-transport-netty/src/main/java/io/airlift/drift/transport/netty/server/DriftNettyServerTransport.java +++ b/drift-transport-netty/src/main/java/io/airlift/drift/transport/netty/server/DriftNettyServerTransport.java @@ -21,9 +21,11 @@ import io.airlift.drift.transport.server.ServerTransport; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.ssl.SslContext; import io.netty.util.concurrent.Future; @@ -56,7 +58,7 @@ public class DriftNettyServerTransport public DriftNettyServerTransport(ServerMethodInvoker methodInvoker, DriftNettyServerConfig config) { - this(methodInvoker, config, ByteBufAllocator.DEFAULT); + this(methodInvoker, config, PooledByteBufAllocator.DEFAULT); } @VisibleForTesting @@ -66,9 +68,9 @@ public DriftNettyServerTransport(ServerMethodInvoker methodInvoker, DriftNettySe requireNonNull(config, "config is null"); this.port = config.getPort(); - ioGroup = new NioEventLoopGroup(config.getIoThreadCount(), threadsNamed("drift-server-io-%s")); + ioGroup = new MultiThreadIoEventLoopGroup(config.getIoThreadCount(), threadsNamed("drift-server-io-%s"), NioIoHandler.newFactory()); - workerGroup = new NioEventLoopGroup(config.getWorkerThreadCount(), threadsNamed("drift-server-worker-%s")); + workerGroup = new MultiThreadIoEventLoopGroup(config.getWorkerThreadCount(), threadsNamed("drift-server-worker-%s"), NioIoHandler.newFactory()); Optional> sslContext = Optional.empty(); if (config.isSslEnabled()) { @@ -101,6 +103,7 @@ public DriftNettyServerTransport(ServerMethodInvoker methodInvoker, DriftNettySe .childHandler(serverInitializer) .option(SO_BACKLOG, config.getAcceptBacklog()) .option(ALLOCATOR, allocator) + .childOption(ALLOCATOR, allocator) .childOption(SO_KEEPALIVE, true) .validate(); } diff --git a/drift-transport-netty/src/main/java/io/airlift/drift/transport/netty/server/DriftNettyServerTransportFactory.java b/drift-transport-netty/src/main/java/io/airlift/drift/transport/netty/server/DriftNettyServerTransportFactory.java index f79e3d569..fb26c8f20 100644 --- a/drift-transport-netty/src/main/java/io/airlift/drift/transport/netty/server/DriftNettyServerTransportFactory.java +++ b/drift-transport-netty/src/main/java/io/airlift/drift/transport/netty/server/DriftNettyServerTransportFactory.java @@ -20,6 +20,7 @@ import io.airlift.drift.transport.server.ServerTransport; import io.airlift.drift.transport.server.ServerTransportFactory; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; import static java.util.Objects.requireNonNull; @@ -31,7 +32,7 @@ public class DriftNettyServerTransportFactory public DriftNettyServerTransportFactory(DriftNettyServerConfig config) { - this(config, ByteBufAllocator.DEFAULT); + this(config, PooledByteBufAllocator.DEFAULT); } @Inject