Skip to content

Commit de75e14

Browse files
committed
[fix][broker] Fix multiple transfer corruption issues when TLS is enabled
1 parent 13806d7 commit de75e14

1 file changed

Lines changed: 32 additions & 6 deletions

File tree

pulsar-common/src/main/java/org/apache/pulsar/common/protocol/ByteBufPair.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,19 +141,45 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
141141
if (msg instanceof ByteBufPair) {
142142
ByteBufPair b = (ByteBufPair) msg;
143143

144+
ChannelPromise compositePromise = ctx.newPromise();
145+
compositePromise.addListener(future -> {
146+
// release the ByteBufPair after the write operation is completed
147+
ReferenceCountUtil.safeRelease(b);
148+
// complete the promise passed as an argument unless it's a void promise
149+
if (!promise.isVoid()) {
150+
if (future.isSuccess()) {
151+
promise.setSuccess();
152+
} else {
153+
promise.setFailure(future.cause());
154+
}
155+
}
156+
});
157+
144158
// Some handlers in the pipeline will modify the bytebufs passed in to them (i.e. SslHandler).
145159
// For these handlers, we need to pass a copy of the buffers as the source buffers may be cached
146160
// for multiple requests.
147-
try {
148-
ctx.write(b.getFirst().copy(), ctx.voidPromise());
149-
ctx.write(b.getSecond().copy(), promise);
150-
} finally {
151-
ReferenceCountUtil.safeRelease(b);
152-
}
161+
ctx.write(nioBufferCopy(b.getFirst()), ctx.voidPromise());
162+
ctx.write(nioBufferCopy(b.getSecond()), compositePromise);
153163
} else {
154164
ctx.write(msg, promise);
155165
}
156166
}
167+
168+
// Make a shallow copy of the ByteBuf using ByteBuf.nioBuffers()/nioBuffer() method.
169+
// This is needed since SslHandler will call internalNioBuffer methods on the ByteBuf instance which is
170+
// not thread safe when the ByteBuf instance is shared across multiple threads.
171+
// This method works around the issue.
172+
// Notice: The original ByteBuf continues to control the lifecycle of the underlying memory allocation.
173+
// This is fine in this case since the ByteBufPair keeps the reference counts, and it is released after
174+
// the write method completes.
175+
private ByteBuf nioBufferCopy(ByteBuf buf) {
176+
int nioBufferCount = buf.nioBufferCount();
177+
if (nioBufferCount > 1) {
178+
return Unpooled.wrappedBuffer(buf.nioBuffers());
179+
} else {
180+
return Unpooled.wrappedBuffer(buf.nioBuffer());
181+
}
182+
}
157183
}
158184

159185
}

0 commit comments

Comments
 (0)