Skip to content

Commit 02a06c4

Browse files
poorbarcode authored and merlimat committed
[fix] Fix ByteBuf release/retain in PerChannelBookClient (#4289)
* [fix] ByteBuf release/retain incorrect * improve the code comment * fix other cases * modify the code comment * improve the code * improve the test * add description
1 parent 996b5d4 commit 02a06c4

File tree

2 files changed

+179
-3
lines changed

2 files changed

+179
-3
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -842,10 +842,10 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, Referen
842842
cb, ctx, ledgerId, entryId));
843843
final Channel c = channel;
844844
if (c == null) {
845-
// usually checked in writeAndFlush, but we have extra check
846-
// because we need to release toSend.
845+
// Manually release the binary data (variable "request") that we created ourselves when it cannot be sent out
846+
// because the channel is switching.
847847
errorOut(completionKey);
848-
ReferenceCountUtil.release(toSend);
848+
ReferenceCountUtil.release(request);
849849
return;
850850
} else {
851851
// addEntry times out on backpressure
@@ -1180,6 +1180,7 @@ private void writeAndFlush(final Channel channel,
11801180
if (channel == null) {
11811181
LOG.warn("Operation {} failed: channel == null", StringUtils.requestToString(request));
11821182
errorOut(key);
1183+
ReferenceCountUtil.release(request);
11831184
return;
11841185
}
11851186

@@ -1194,6 +1195,7 @@ private void writeAndFlush(final Channel channel,
11941195
StringUtils.requestToString(request));
11951196

11961197
errorOut(key, BKException.Code.TooManyRequestsException);
1198+
ReferenceCountUtil.release(request);
11971199
return;
11981200
}
11991201

@@ -1215,6 +1217,9 @@ private void writeAndFlush(final Channel channel,
12151217
} catch (Throwable e) {
12161218
LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
12171219
errorOut(key);
1220+
// If the request made it into writeAndFlush, it should be handled by Netty. So for any exception that we
1221+
// catch here, we can release the request ourselves.
1222+
ReferenceCountUtil.release(request);
12181223
}
12191224
}
12201225

bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,25 +27,31 @@
2727
import io.netty.buffer.ByteBuf;
2828
import io.netty.buffer.ByteBufAllocator;
2929
import io.netty.buffer.ByteBufUtil;
30+
import io.netty.buffer.PooledByteBufAllocator;
3031
import io.netty.buffer.Unpooled;
3132
import io.netty.buffer.UnpooledByteBufAllocator;
33+
import io.netty.channel.Channel;
3234
import io.netty.channel.EventLoopGroup;
3335
import io.netty.channel.nio.NioEventLoopGroup;
3436
import io.netty.util.ReferenceCounted;
3537
import io.netty.util.concurrent.DefaultThreadFactory;
3638
import java.io.File;
3739
import java.io.IOException;
3840
import java.nio.ByteBuffer;
41+
import java.time.Duration;
3942
import java.util.Arrays;
4043
import java.util.concurrent.CountDownLatch;
4144
import java.util.concurrent.Executors;
4245
import java.util.concurrent.ScheduledExecutorService;
46+
import java.util.concurrent.atomic.AtomicBoolean;
4347
import java.util.concurrent.atomic.AtomicInteger;
4448
import java.util.concurrent.atomic.AtomicReference;
49+
import lombok.extern.slf4j.Slf4j;
4550
import org.apache.bookkeeper.bookie.MockUncleanShutdownDetection;
4651
import org.apache.bookkeeper.bookie.TestBookieImpl;
4752
import org.apache.bookkeeper.client.BKException;
4853
import org.apache.bookkeeper.client.BKException.Code;
54+
import org.apache.bookkeeper.client.BookKeeper;
4955
import org.apache.bookkeeper.client.BookKeeperClientStats;
5056
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
5157
import org.apache.bookkeeper.client.api.WriteFlag;
@@ -57,27 +63,32 @@
5763
import org.apache.bookkeeper.net.BookieSocketAddress;
5864
import org.apache.bookkeeper.proto.BookieClient;
5965
import org.apache.bookkeeper.proto.BookieClientImpl;
66+
import org.apache.bookkeeper.proto.BookieProtoEncoding;
6067
import org.apache.bookkeeper.proto.BookieProtocol;
6168
import org.apache.bookkeeper.proto.BookieServer;
6269
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
6370
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
6471
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
6572
import org.apache.bookkeeper.proto.BookkeeperProtocol;
6673
import org.apache.bookkeeper.proto.DataFormats;
74+
import org.apache.bookkeeper.proto.PerChannelBookieClient;
75+
import org.apache.bookkeeper.proto.PerChannelBookieClientPool;
6776
import org.apache.bookkeeper.proto.checksum.DigestManager;
6877
import org.apache.bookkeeper.stats.NullStatsLogger;
6978
import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
7079
import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
7180
import org.apache.bookkeeper.util.ByteBufList;
7281
import org.apache.bookkeeper.util.IOUtils;
7382
import org.awaitility.Awaitility;
83+
import org.awaitility.reflect.WhiteboxImpl;
7484
import org.junit.After;
7585
import org.junit.Before;
7686
import org.junit.Test;
7787

7888
/**
7989
* Test the bookie client.
8090
*/
91+
@Slf4j
8192
public class BookieClientTest {
8293
BookieServer bs;
8394
File tmpDir;
@@ -745,4 +756,164 @@ public void testBatchedReadWithMaxSizeLimitCase2() throws Exception {
745756
assertTrue(Arrays.equals(kbData, bytes));
746757
}
747758
}
759+
760+
/**
761+
* Explain the stacks of "BookieClientImpl.addEntry" here
762+
* 1.`BookieClientImpl.addEntry`.
763+
* a.Retain the `ByteBuf` before get `PerChannelBookieClient`. We call this `ByteBuf` as `toSend` in the
764+
* following sections. `toSend.recCnf` is `2` now.
765+
* 2.`Get PerChannelBookieClient`.
766+
* 3.`ChannelReadyForAddEntryCallback.operationComplete`
767+
* a.`PerChannelBookieClient.addEntry`
768+
* a-1.Build a new ByteBuf for request command. We call this `ByteBuf` new as `request` in the following
769+
* sections.
770+
* a-2.`channle.writeAndFlush(request)` or release the ByteBuf when `channel` is switching.
771+
* Note the callback will be called immediately if the channel is switching.
772+
* b.Release the `ByteBuf` since it has been retained at `step 1`. `toSend.recCnf` should be `1` now.
773+
*/
774+
public void testDataRefCnfWhenReconnect(boolean useV2WireProtocol, boolean smallPayload,
775+
boolean withDelayReconnect, boolean withDelayAddEntry,
776+
int tryTimes) throws Exception {
777+
final long ledgerId = 1;
778+
final BookieId addr = bs.getBookieId();
779+
// Build passwd.
780+
byte[] passwd = new byte[20];
781+
Arrays.fill(passwd, (byte) 'a');
782+
// Build digest manager.
783+
DigestManager digestManager = DigestManager.instantiate(1, passwd,
784+
BookKeeper.DigestType.toProtoDigestType(BookKeeper.DigestType.DUMMY),
785+
PooledByteBufAllocator.DEFAULT, useV2WireProtocol);
786+
// Build client.
787+
ClientConfiguration clientConf = new ClientConfiguration();
788+
clientConf.setUseV2WireProtocol(useV2WireProtocol);
789+
BookieClientImpl client = new BookieClientImpl(clientConf, eventLoopGroup,
790+
UnpooledByteBufAllocator.DEFAULT, executor, scheduler, NullStatsLogger.INSTANCE,
791+
BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
792+
793+
// Inject a reconnect event.
794+
// 1. Get the channel that will be used.
795+
// 2. Call add entry.
796+
// 3. Another thread close the channel that is using.
797+
for (int i = 0; i < tryTimes; i++) {
798+
long entryId = i + 1;
799+
long lac = i;
800+
// Build payload.
801+
int payloadLen;
802+
ByteBuf payload;
803+
if (smallPayload) {
804+
payloadLen = 1;
805+
payload = PooledByteBufAllocator.DEFAULT.buffer(1);
806+
payload.writeByte(1);
807+
} else {
808+
payloadLen = BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD;
809+
payload = PooledByteBufAllocator.DEFAULT.buffer();
810+
byte[] bs = new byte[payloadLen];
811+
payload.writeBytes(bs);
812+
}
813+
814+
// Digest.
815+
ReferenceCounted bb = digestManager.computeDigestAndPackageForSending(entryId, lac,
816+
payloadLen * entryId, payload, passwd, BookieProtocol.FLAG_NONE);
817+
log.info("Before send. bb.refCnf: {}", bb.refCnt());
818+
819+
// Step: get the channel that will be used.
820+
PerChannelBookieClientPool perChannelBookieClientPool = client.lookupClient(addr);
821+
AtomicReference<PerChannelBookieClient> perChannelBookieClient = new AtomicReference<>();
822+
perChannelBookieClientPool.obtain((rc, result) -> perChannelBookieClient.set(result), ledgerId);
823+
Awaitility.await().untilAsserted(() -> {
824+
assertNotNull(perChannelBookieClient.get());
825+
});
826+
827+
// Step: Inject a reconnect event.
828+
final int delayMillis = i;
829+
new Thread(() -> {
830+
if (withDelayReconnect) {
831+
sleep(delayMillis);
832+
}
833+
Channel channel = WhiteboxImpl.getInternalState(perChannelBookieClient.get(), "channel");
834+
if (channel != null) {
835+
channel.close();
836+
}
837+
}).start();
838+
if (withDelayAddEntry) {
839+
sleep(delayMillis);
840+
}
841+
842+
// Step: add entry.
843+
AtomicBoolean callbackExecuted = new AtomicBoolean();
844+
WriteCallback callback = (rc, lId, eId, socketAddr, ctx) -> {
845+
log.info("Writing is finished. rc: {}, withDelayReconnect: {}, withDelayAddEntry: {}, ledgerId: {},"
846+
+ " entryId: {}, socketAddr: {}, ctx: {}",
847+
rc, withDelayReconnect, withDelayAddEntry, lId, eId, socketAddr, ctx);
848+
callbackExecuted.set(true);
849+
};
850+
client.addEntry(addr, ledgerId, passwd, entryId, bb, callback, i, BookieProtocol.FLAG_NONE, false,
851+
WriteFlag.NONE);
852+
// Wait for adding entry is finish.
853+
Awaitility.await().untilAsserted(() -> assertTrue(callbackExecuted.get()));
854+
// The steps have be explained on the method description.
855+
// Since the step "3-a-2" always runs before the step "3-b", so the "callbackExecuted" will be finished
856+
// before the step "3-b". Add a sleep to wait the step "3-a-2" is finish.
857+
Thread.sleep(100);
858+
// Check the ref count.
859+
Awaitility.await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
860+
assertEquals(1, bb.refCnt());
861+
// V2 will release this original data if it is a small.
862+
if (!useV2WireProtocol && !smallPayload) {
863+
assertEquals(1, payload.refCnt());
864+
}
865+
});
866+
bb.release();
867+
// V2 will release this original data if it is a small.
868+
if (!useV2WireProtocol && !smallPayload) {
869+
payload.release();
870+
}
871+
}
872+
// cleanup.
873+
client.close();
874+
}
875+
876+
private void sleep(int milliSeconds) {
877+
try {
878+
if (milliSeconds > 0) {
879+
Thread.sleep(1);
880+
}
881+
} catch (InterruptedException e) {
882+
log.warn("Error occurs", e);
883+
}
884+
}
885+
886+
/**
887+
* Relate to https://github.com/apache/bookkeeper/pull/4289.
888+
*/
889+
@Test
890+
public void testDataRefCnfWhenReconnectV2() throws Exception {
891+
// Large payload.
892+
// Run this test may not reproduce the issue, you can reproduce the issue this way:
893+
// 1. Add two break points.
894+
// a. At the line "Channel c = channel" in the method PerChannelBookieClient.addEntry.
895+
// b. At the line "channel = null" in the method "PerChannelBookieClient.channelInactive".
896+
// 2. Make the break point b to run earlier than the break point a during debugging.
897+
testDataRefCnfWhenReconnect(true, false, false, false, 10);
898+
testDataRefCnfWhenReconnect(true, false, true, false, 10);
899+
testDataRefCnfWhenReconnect(true, false, false, true, 10);
900+
901+
// Small payload.
902+
// There is no issue without https://github.com/apache/bookkeeper/pull/4289, just add a test for this scenario.
903+
testDataRefCnfWhenReconnect(true, true, false, false, 10);
904+
testDataRefCnfWhenReconnect(true, true, true, false, 10);
905+
testDataRefCnfWhenReconnect(true, true, false, true, 10);
906+
}
907+
908+
/**
909+
* Please see the comment of the scenario "Large payload" in the {@link #testDataRefCnfWhenReconnectV2()} if you
910+
* can not reproduce the issue when running this test.
911+
* Relate to https://github.com/apache/bookkeeper/pull/4289.
912+
*/
913+
@Test
914+
public void testDataRefCnfWhenReconnectV3() throws Exception {
915+
testDataRefCnfWhenReconnect(false, true, false, false, 10);
916+
testDataRefCnfWhenReconnect(false, true, true, false, 10);
917+
testDataRefCnfWhenReconnect(false, true, false, true, 10);
918+
}
748919
}

0 commit comments

Comments
 (0)