Skip to content

Commit 29a74ac

Browse files
authored
fix: reference counting (retain/release) in PerChannelBookieClient (#4293) (#4396)
### Motivation This addresses the remaining gaps of #4289 in handling ByteBuf retain/release. This PR will also address the concern about NioBuffer lifecycle brought up in the review of the original PR review: #791 (comment) . This PR fixes several problems: * ByteString buffer lifecycle in client, follows ByteBufList lifecycle * ByteBufList lifecycle, moved to write promise * Calling of write promises in AuthHandler which buffers messages while authentication is in progress. It was ignoring the promises. ### Changes - add 2 callback parameters to writeAndFlush: cleanupActionFailedBeforeWrite and cleanupActionAfterWrite - use these callback actions for proper cleanup - extract a utility class ByteStringUtil for wrapping ByteBufList or ByteBuf as concatenated zero copy ByteString - properly handle releasing of ByteBufList in the write promise - properly handle calling promises that are buffered while authentication is in progress (cherry picked from commit 0ef2f99) # Conflicts: # bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
1 parent b334d9f commit 29a74ac

6 files changed

Lines changed: 222 additions & 90 deletions

File tree

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
355355
super.write(ctx, msg, promise);
356356
super.flush(ctx);
357357
} else {
358-
waitingForAuth.add(msg);
358+
addMsgAndPromiseToQueue(msg, promise);
359359
}
360360
} else if (msg instanceof BookieProtocol.Request) {
361361
// let auth messages through, queue the rest
@@ -364,16 +364,26 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
364364
super.write(ctx, msg, promise);
365365
super.flush(ctx);
366366
} else {
367-
waitingForAuth.add(msg);
367+
addMsgAndPromiseToQueue(msg, promise);
368368
}
369369
} else if (msg instanceof ByteBuf || msg instanceof ByteBufList) {
370-
waitingForAuth.add(msg);
370+
addMsgAndPromiseToQueue(msg, promise);
371371
} else {
372372
LOG.info("[{}] dropping write of message {}", ctx.channel(), msg);
373373
}
374374
}
375375
}
376376

377+
// Add the message and the associated promise to the queue.
378+
// The promise is added to the same queue as the message without an additional wrapper object so
379+
// that object allocations can be avoided. A similar solution is used in Netty codebase.
380+
private void addMsgAndPromiseToQueue(Object msg, ChannelPromise promise) {
381+
waitingForAuth.add(msg);
382+
if (promise != null && !promise.isVoid()) {
383+
waitingForAuth.add(promise);
384+
}
385+
}
386+
377387
long newTxnId() {
378388
return transactionIdGenerator.incrementAndGet();
379389
}
@@ -433,10 +443,19 @@ public void operationComplete(int rc, Void v) {
433443
if (rc == BKException.Code.OK) {
434444
synchronized (this) {
435445
authenticated = true;
436-
Object msg = waitingForAuth.poll();
437-
while (msg != null) {
438-
NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, msg);
439-
msg = waitingForAuth.poll();
446+
while (true) {
447+
Object msg = waitingForAuth.poll();
448+
if (msg == null) {
449+
break;
450+
}
451+
ChannelPromise promise;
452+
// check if the message has an associated promise as the next element in the queue
453+
if (waitingForAuth.peek() instanceof ChannelPromise) {
454+
promise = (ChannelPromise) waitingForAuth.poll();
455+
} else {
456+
promise = ctx.voidPromise();
457+
}
458+
ctx.writeAndFlush(msg, promise);
440459
}
441460
}
442461
} else {

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -260,18 +260,20 @@ public void writeLac(final BookieId addr, final long ledgerId, final byte[] mast
260260

261261
toSend.retain();
262262
client.obtain((rc, pcbc) -> {
263-
if (rc != BKException.Code.OK) {
264-
try {
265-
executor.executeOrdered(ledgerId,
266-
() -> cb.writeLacComplete(rc, ledgerId, addr, ctx));
267-
} catch (RejectedExecutionException re) {
268-
cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
263+
try {
264+
if (rc != BKException.Code.OK) {
265+
try {
266+
executor.executeOrdered(ledgerId,
267+
() -> cb.writeLacComplete(rc, ledgerId, addr, ctx));
268+
} catch (RejectedExecutionException re) {
269+
cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
270+
}
271+
} else {
272+
pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
269273
}
270-
} else {
271-
pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
274+
} finally {
275+
ReferenceCountUtil.release(toSend);
272276
}
273-
274-
ReferenceCountUtil.release(toSend);
275277
}, ledgerId, useV3Enforced);
276278
}
277279

@@ -392,14 +394,16 @@ static ChannelReadyForAddEntryCallback create(
392394
@Override
393395
public void operationComplete(final int rc,
394396
PerChannelBookieClient pcbc) {
395-
if (rc != BKException.Code.OK) {
396-
bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
397-
} else {
398-
pcbc.addEntry(ledgerId, masterKey, entryId,
399-
toSend, cb, ctx, options, allowFastFail, writeFlags);
397+
try {
398+
if (rc != BKException.Code.OK) {
399+
bookieClient.completeAdd(rc, ledgerId, entryId, addr, cb, ctx);
400+
} else {
401+
pcbc.addEntry(ledgerId, masterKey, entryId,
402+
toSend, cb, ctx, options, allowFastFail, writeFlags);
403+
}
404+
} finally {
405+
ReferenceCountUtil.release(toSend);
400406
}
401-
402-
ReferenceCountUtil.release(toSend);
403407
recycle();
404408
}
405409

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.apache.bookkeeper.proto;
20+
21+
import com.google.protobuf.ByteString;
22+
import com.google.protobuf.UnsafeByteOperations;
23+
import io.netty.buffer.ByteBuf;
24+
import java.nio.ByteBuffer;
25+
import org.apache.bookkeeper.util.ByteBufList;
26+
27+
public class ByteStringUtil {
28+
29+
/**
30+
* Wrap the internal buffers of a ByteBufList into a single ByteString.
31+
* The lifecycle of the wrapped ByteString is tied to the ByteBufList.
32+
*
33+
* @param bufList ByteBufList to wrap
34+
* @return ByteString wrapping the internal buffers of the ByteBufList
35+
*/
36+
public static ByteString byteBufListToByteString(ByteBufList bufList) {
37+
ByteString aggregated = null;
38+
for (int i = 0; i < bufList.size(); i++) {
39+
ByteBuf buffer = bufList.getBuffer(i);
40+
if (buffer.readableBytes() > 0) {
41+
aggregated = byteBufToByteString(aggregated, buffer);
42+
}
43+
}
44+
return aggregated != null ? aggregated : ByteString.EMPTY;
45+
}
46+
47+
/**
48+
* Wrap the internal buffers of a ByteBuf into a single ByteString.
49+
* The lifecycle of the wrapped ByteString is tied to the ByteBuf.
50+
*
51+
* @param byteBuf ByteBuf to wrap
52+
* @return ByteString wrapping the internal buffers of the ByteBuf
53+
*/
54+
public static ByteString byteBufToByteString(ByteBuf byteBuf) {
55+
return byteBufToByteString(null, byteBuf);
56+
}
57+
58+
// internal method to aggregate a ByteBuf into a single aggregated ByteString
59+
private static ByteString byteBufToByteString(ByteString aggregated, ByteBuf byteBuf) {
60+
if (byteBuf.readableBytes() == 0) {
61+
return ByteString.EMPTY;
62+
}
63+
if (byteBuf.nioBufferCount() > 1) {
64+
for (ByteBuffer nioBuffer : byteBuf.nioBuffers()) {
65+
ByteString piece = UnsafeByteOperations.unsafeWrap(nioBuffer);
66+
aggregated = (aggregated == null) ? piece : aggregated.concat(piece);
67+
}
68+
} else {
69+
ByteString piece;
70+
if (byteBuf.hasArray()) {
71+
piece = UnsafeByteOperations.unsafeWrap(byteBuf.array(), byteBuf.arrayOffset() + byteBuf.readerIndex(),
72+
byteBuf.readableBytes());
73+
} else {
74+
piece = UnsafeByteOperations.unsafeWrap(byteBuf.nioBuffer());
75+
}
76+
aggregated = (aggregated == null) ? piece : aggregated.concat(piece);
77+
}
78+
return aggregated;
79+
}
80+
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java

Lines changed: 48 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -699,14 +699,10 @@ void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteB
699699
.setVersion(ProtocolVersion.VERSION_THREE)
700700
.setOperation(OperationType.WRITE_LAC)
701701
.setTxnId(txnId);
702-
ByteString body;
703-
if (toSend.hasArray()) {
704-
body = UnsafeByteOperations.unsafeWrap(toSend.array(), toSend.arrayOffset(), toSend.readableBytes());
705-
} else if (toSend.size() == 1) {
706-
body = UnsafeByteOperations.unsafeWrap(toSend.getBuffer(0).nioBuffer());
707-
} else {
708-
body = UnsafeByteOperations.unsafeWrap(toSend.toArray());
709-
}
702+
ByteString body = ByteStringUtil.byteBufListToByteString(toSend);
703+
toSend.retain();
704+
Runnable cleanupActionFailedBeforeWrite = toSend::release;
705+
Runnable cleanupActionAfterWrite = cleanupActionFailedBeforeWrite;
710706
WriteLacRequest.Builder writeLacBuilder = WriteLacRequest.newBuilder()
711707
.setLedgerId(ledgerId)
712708
.setLac(lac)
@@ -717,7 +713,8 @@ void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteB
717713
.setHeader(headerBuilder)
718714
.setWriteLacRequest(writeLacBuilder)
719715
.build();
720-
writeAndFlush(channel, completionKey, writeLacRequest);
716+
writeAndFlush(channel, completionKey, writeLacRequest, false, cleanupActionFailedBeforeWrite,
717+
cleanupActionAfterWrite);
721718
}
722719

723720
void forceLedger(final long ledgerId, ForceLedgerCallback cb, Object ctx) {
@@ -776,6 +773,8 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen
776773
Object ctx, final int options, boolean allowFastFail, final EnumSet<WriteFlag> writeFlags) {
777774
Object request = null;
778775
CompletionKey completionKey = null;
776+
Runnable cleanupActionFailedBeforeWrite = null;
777+
Runnable cleanupActionAfterWrite = null;
779778
if (useV2WireProtocol) {
780779
if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
781780
LOG.error("invalid writeflags {} for v2 protocol", writeFlags);
@@ -785,9 +784,14 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen
785784
completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY);
786785

787786
if (toSend instanceof ByteBuf) {
788-
request = ((ByteBuf) toSend).retainedDuplicate();
787+
ByteBuf byteBuf = ((ByteBuf) toSend).retainedDuplicate();
788+
request = byteBuf;
789+
cleanupActionFailedBeforeWrite = byteBuf::release;
789790
} else {
790-
request = ByteBufList.clone((ByteBufList) toSend);
791+
ByteBufList byteBufList = (ByteBufList) toSend;
792+
byteBufList.retain();
793+
request = byteBufList;
794+
cleanupActionFailedBeforeWrite = byteBufList::release;
791795
}
792796
} else {
793797
final long txnId = getTxnId();
@@ -802,19 +806,11 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen
802806
headerBuilder.setPriority(DEFAULT_HIGH_PRIORITY_VALUE);
803807
}
804808

805-
ByteString body = null;
806809
ByteBufList bufToSend = (ByteBufList) toSend;
807-
808-
if (bufToSend.hasArray()) {
809-
body = UnsafeByteOperations.unsafeWrap(bufToSend.array(), bufToSend.arrayOffset(),
810-
bufToSend.readableBytes());
811-
} else {
812-
for (int i = 0; i < bufToSend.size(); i++) {
813-
ByteString piece = UnsafeByteOperations.unsafeWrap(bufToSend.getBuffer(i).nioBuffer());
814-
// use ByteString.concat to avoid byte[] allocation when toSend has multiple ByteBufs
815-
body = (body == null) ? piece : body.concat(piece);
816-
}
817-
}
810+
ByteString body = ByteStringUtil.byteBufListToByteString(bufToSend);
811+
bufToSend.retain();
812+
cleanupActionFailedBeforeWrite = bufToSend::release;
813+
cleanupActionAfterWrite = cleanupActionFailedBeforeWrite;
818814
AddRequest.Builder addBuilder = AddRequest.newBuilder()
819815
.setLedgerId(ledgerId)
820816
.setEntryId(entryId)
@@ -839,17 +835,9 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen
839835
putCompletionKeyValue(completionKey,
840836
acquireAddCompletion(completionKey,
841837
cb, ctx, ledgerId, entryId));
842-
final Channel c = channel;
843-
if (c == null) {
844-
// Manually release the binary data(variable "request") that we manually created when it can not be sent out
845-
// because the channel is switching.
846-
errorOut(completionKey);
847-
ReferenceCountUtil.release(request);
848-
return;
849-
} else {
850-
// addEntry times out on backpressure
851-
writeAndFlush(c, completionKey, request, allowFastFail);
852-
}
838+
// addEntry times out on backpressure
839+
writeAndFlush(channel, completionKey, request, allowFastFail, cleanupActionFailedBeforeWrite,
840+
cleanupActionAfterWrite);
853841
}
854842

855843
public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
@@ -1004,7 +992,7 @@ private void readEntryInternal(final long ledgerId,
1004992
ReadCompletion readCompletion = new ReadCompletion(completionKey, cb, ctx, ledgerId, entryId);
1005993
putCompletionKeyValue(completionKey, readCompletion);
1006994

1007-
writeAndFlush(channel, completionKey, request, allowFastFail);
995+
writeAndFlush(channel, completionKey, request, allowFastFail, null, null);
1008996
}
1009997

1010998
public void getBookieInfo(final long requested, GetBookieInfoCallback cb, Object ctx) {
@@ -1126,17 +1114,20 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
11261114
private void writeAndFlush(final Channel channel,
11271115
final CompletionKey key,
11281116
final Object request) {
1129-
writeAndFlush(channel, key, request, false);
1117+
writeAndFlush(channel, key, request, false, null, null);
11301118
}
11311119

11321120
private void writeAndFlush(final Channel channel,
11331121
final CompletionKey key,
11341122
final Object request,
1135-
final boolean allowFastFail) {
1123+
final boolean allowFastFail, final Runnable cleanupActionFailedBeforeWrite,
1124+
final Runnable cleanupActionAfterWrite) {
11361125
if (channel == null) {
11371126
LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request));
11381127
errorOut(key);
1139-
ReferenceCountUtil.release(request);
1128+
if (cleanupActionFailedBeforeWrite != null) {
1129+
cleanupActionFailedBeforeWrite.run();
1130+
}
11401131
return;
11411132
}
11421133

@@ -1151,31 +1142,39 @@ private void writeAndFlush(final Channel channel,
11511142
StringUtils.requestToString(request));
11521143

11531144
errorOut(key, BKException.Code.TooManyRequestsException);
1154-
ReferenceCountUtil.release(request);
1145+
if (cleanupActionFailedBeforeWrite != null) {
1146+
cleanupActionFailedBeforeWrite.run();
1147+
}
11551148
return;
11561149
}
11571150

11581151
try {
11591152
final long startTime = MathUtils.nowInNano();
11601153

11611154
ChannelPromise promise = channel.newPromise().addListener(future -> {
1162-
if (future.isSuccess()) {
1163-
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
1164-
CompletionValue completion = completionObjects.get(key);
1165-
if (completion != null) {
1166-
completion.setOutstanding();
1155+
try {
1156+
if (future.isSuccess()) {
1157+
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
1158+
CompletionValue completion = completionObjects.get(key);
1159+
if (completion != null) {
1160+
completion.setOutstanding();
1161+
}
1162+
} else {
1163+
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
1164+
}
1165+
} finally {
1166+
if (cleanupActionAfterWrite != null) {
1167+
cleanupActionAfterWrite.run();
11671168
}
1168-
} else {
1169-
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
11701169
}
11711170
});
11721171
channel.writeAndFlush(request, promise);
11731172
} catch (Throwable e) {
11741173
LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
11751174
errorOut(key);
1176-
// If the request goes into the writeAndFlush, it should be handled well by Netty. So all the exceptions we
1177-
// get here, we can release the request.
1178-
ReferenceCountUtil.release(request);
1175+
if (cleanupActionFailedBeforeWrite != null) {
1176+
cleanupActionFailedBeforeWrite.run();
1177+
}
11791178
}
11801179
}
11811180

0 commit comments

Comments
 (0)