Skip to content

Commit 6b68e9e

Browse files
committed
[improve][misc] Optimize TLS performance by omitting extra buffer copies (#23115)
(cherry picked from commit 1db3c5f)
1 parent 53fb549 commit 6b68e9e

3 files changed

Lines changed: 35 additions & 5 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
116116
} else {
117117
ch.pipeline().addLast(TLS_HANDLER, sslCtxRefresher.get().newHandler(ch.alloc()));
118118
}
119-
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
120-
} else {
121-
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
122119
}
120+
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.getEncoder(this.enableTls));
123121

124122
if (pulsar.getConfiguration().isHaProxyProtocolEnabled()) {
125123
ch.pipeline().addLast(OptionalProxyProtocolDecoder.NAME, new OptionalProxyProtocolDecoder());

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarChannelInitializer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.pulsar.client.impl;
2020

2121
import io.netty.channel.Channel;
22+
import io.netty.channel.ChannelHandler;
2223
import io.netty.channel.ChannelInitializer;
2324
import io.netty.channel.socket.SocketChannel;
2425
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@@ -147,11 +148,12 @@ public void initChannel(SocketChannel ch) throws Exception {
147148
ch.pipeline().addLast("consolidation", new FlushConsolidationHandler(1024, true));
148149

149150
// Setup channel except for the SsHandler for TLS enabled connections
150-
ch.pipeline().addLast("ByteBufPairEncoder", tlsEnabled ? ByteBufPair.COPYING_ENCODER : ByteBufPair.ENCODER);
151+
ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.getEncoder(tlsEnabled));
151152

152153
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
153154
Commands.DEFAULT_MAX_MESSAGE_SIZE + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
154-
ch.pipeline().addLast("handler", clientCnxSupplier.get());
155+
ChannelHandler clientCnx = clientCnxSupplier.get();
156+
ch.pipeline().addLast("handler", clientCnx);
155157
}
156158

157159
/**

pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,39 @@ public ReferenceCounted touch(Object hint) {
107107
return this;
108108
}
109109

110+
/**
111+
* Encoder that writes a {@link ByteBufPair} to the socket.
112+
* Use {@link #getEncoder(boolean)} to get the appropriate encoder instead of referencing this.
113+
*/
114+
@Deprecated
110115
public static final Encoder ENCODER = new Encoder();
116+
117+
private static final boolean COPY_ENCODER_REQUIRED_FOR_TLS;
118+
static {
119+
boolean copyEncoderRequiredForTls = false;
120+
try {
121+
// io.netty.handler.ssl.SslHandlerCoalescingBufferQueue is only available in netty 4.1.111 and later
122+
// when the class is available, there's no need to use the CopyingEncoder when TLS is enabled
123+
ByteBuf.class.getClassLoader().loadClass("io.netty.handler.ssl.SslHandlerCoalescingBufferQueue");
124+
} catch (ClassNotFoundException e) {
125+
copyEncoderRequiredForTls = true;
126+
}
127+
COPY_ENCODER_REQUIRED_FOR_TLS = copyEncoderRequiredForTls;
128+
}
129+
130+
/**
131+
* Encoder that makes a copy of the ByteBufs before writing them to the socket.
132+
* This is needed with Netty <4.1.111.Final when TLS is enabled, because the SslHandler will modify the input
133+
* ByteBufs.
134+
* Use {@link #getEncoder(boolean)} to get the appropriate encoder instead of referencing this.
135+
*/
136+
@Deprecated
111137
public static final CopyingEncoder COPYING_ENCODER = new CopyingEncoder();
112138

139+
public static ChannelOutboundHandlerAdapter getEncoder(boolean tlsEnabled) {
140+
return tlsEnabled && COPY_ENCODER_REQUIRED_FOR_TLS ? COPYING_ENCODER : ENCODER;
141+
}
142+
113143
@Sharable
114144
@SuppressWarnings("checkstyle:JavadocType")
115145
public static class Encoder extends ChannelOutboundHandlerAdapter {

0 commit comments

Comments
 (0)