Skip to content

Commit 98bc499

Browse files
committed
fix: reference counts in PerChannelBookieClient
1 parent a9aae83 commit 98bc499

File tree

1 file changed

+41
-25
lines changed

1 file changed

+41
-25
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,8 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen
777777
Object ctx, final int options, boolean allowFastFail, final EnumSet<WriteFlag> writeFlags) {
778778
Object request = null;
779779
CompletionKey completionKey = null;
780+
Runnable cleanupActionBeforeWrite = null;
781+
Runnable cleanupActionAfterWrite = null;
780782
if (useV2WireProtocol) {
781783
if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
782784
LOG.error("invalid writeflags {} for v2 protocol", writeFlags);
@@ -786,9 +788,13 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen
786788
completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY);
787789

788790
if (toSend instanceof ByteBuf) {
789-
request = ((ByteBuf) toSend).retainedDuplicate();
791+
ByteBuf byteBuf = ((ByteBuf) toSend).retainedDuplicate();
792+
request = byteBuf;
793+
cleanupActionBeforeWrite = byteBuf::release;
790794
} else {
791-
request = ByteBufList.clone((ByteBufList) toSend);
795+
ByteBufList byteBufList = ByteBufList.clone((ByteBufList) toSend);
796+
request = byteBufList;
797+
cleanupActionBeforeWrite = byteBufList::release;
792798
}
793799
} else {
794800
final long txnId = getTxnId();
@@ -805,7 +811,9 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen
805811

806812
ByteString body = null;
807813
ByteBufList bufToSend = (ByteBufList) toSend;
808-
814+
bufToSend.retain();
815+
cleanupActionBeforeWrite = bufToSend::release;
816+
cleanupActionAfterWrite = cleanupActionBeforeWrite;
809817
if (bufToSend.hasArray()) {
810818
body = UnsafeByteOperations.unsafeWrap(bufToSend.array(), bufToSend.arrayOffset(),
811819
bufToSend.readableBytes());
@@ -840,17 +848,9 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen
840848
putCompletionKeyValue(completionKey,
841849
acquireAddCompletion(completionKey,
842850
cb, ctx, ledgerId, entryId));
843-
final Channel c = channel;
844-
if (c == null) {
845-
// usually checked in writeAndFlush, but we have extra check
846-
// because we need to release toSend.
847-
errorOut(completionKey);
848-
ReferenceCountUtil.release(toSend);
849-
return;
850-
} else {
851-
// addEntry times out on backpressure
852-
writeAndFlush(c, completionKey, request, allowFastFail);
853-
}
851+
// addEntry times out on backpressure
852+
writeAndFlush(channel, completionKey, request, allowFastFail, cleanupActionBeforeWrite,
853+
cleanupActionAfterWrite);
854854
}
855855

856856
public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
@@ -1005,7 +1005,7 @@ private void readEntryInternal(final long ledgerId,
10051005
ReadCompletion readCompletion = new ReadCompletion(completionKey, cb, ctx, ledgerId, entryId);
10061006
putCompletionKeyValue(completionKey, readCompletion);
10071007

1008-
writeAndFlush(channel, completionKey, request, allowFastFail);
1008+
writeAndFlush(channel, completionKey, request, allowFastFail, null, null);
10091009
}
10101010

10111011
public void batchReadEntries(final long ledgerId,
@@ -1048,7 +1048,7 @@ private void batchReadEntriesInternal(final long ledgerId,
10481048
completionKey, cb, ctx, ledgerId, startEntryId);
10491049
putCompletionKeyValue(completionKey, readCompletion);
10501050

1051-
writeAndFlush(channel, completionKey, request, allowFastFail);
1051+
writeAndFlush(channel, completionKey, request, allowFastFail, null, null);
10521052
}
10531053

10541054
public void getBookieInfo(final long requested, GetBookieInfoCallback cb, Object ctx) {
@@ -1170,16 +1170,20 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
11701170
private void writeAndFlush(final Channel channel,
11711171
final CompletionKey key,
11721172
final Object request) {
1173-
writeAndFlush(channel, key, request, false);
1173+
writeAndFlush(channel, key, request, false, null, null);
11741174
}
11751175

11761176
private void writeAndFlush(final Channel channel,
11771177
final CompletionKey key,
11781178
final Object request,
1179-
final boolean allowFastFail) {
1179+
final boolean allowFastFail, final Runnable cleanupActionBeforeWrite,
1180+
final Runnable cleanupActionAfterWrite) {
11801181
if (channel == null) {
11811182
LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request));
11821183
errorOut(key);
1184+
if (cleanupActionBeforeWrite != null) {
1185+
cleanupActionBeforeWrite.run();
1186+
}
11831187
return;
11841188
}
11851189

@@ -1194,27 +1198,39 @@ private void writeAndFlush(final Channel channel,
11941198
StringUtils.requestToString(request));
11951199

11961200
errorOut(key, BKException.Code.TooManyRequestsException);
1201+
if (cleanupActionBeforeWrite != null) {
1202+
cleanupActionBeforeWrite.run();
1203+
}
11971204
return;
11981205
}
11991206

12001207
try {
12011208
final long startTime = MathUtils.nowInNano();
12021209

12031210
ChannelPromise promise = channel.newPromise().addListener(future -> {
1204-
if (future.isSuccess()) {
1205-
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
1206-
CompletionValue completion = completionObjects.get(key);
1207-
if (completion != null) {
1208-
completion.setOutstanding();
1211+
try {
1212+
if (future.isSuccess()) {
1213+
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
1214+
CompletionValue completion = completionObjects.get(key);
1215+
if (completion != null) {
1216+
completion.setOutstanding();
1217+
}
1218+
} else {
1219+
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
1220+
}
1221+
} finally {
1222+
if (cleanupActionAfterWrite != null) {
1223+
cleanupActionAfterWrite.run();
12091224
}
1210-
} else {
1211-
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
12121225
}
12131226
});
12141227
channel.writeAndFlush(request, promise);
12151228
} catch (Throwable e) {
12161229
LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
12171230
errorOut(key);
1231+
if (cleanupActionBeforeWrite != null) {
1232+
cleanupActionBeforeWrite.run();
1233+
}
12181234
}
12191235
}
12201236

0 commit comments

Comments
 (0)