Skip to content

Commit 951f10d

Browse files
wenbingshenwenbingshen
andauthored
[client] fix writeLac memory leak and thread safety issue (#4713)
* fix writeLac memory leak --------- Co-authored-by: wenbingshen <wenbingshen@tencent.com>
1 parent be1f6aa commit 951f10d

2 files changed

Lines changed: 194 additions & 12 deletions

File tree

bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingWriteLacOp.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.bookkeeper.client;
1919

20+
import io.netty.util.ReferenceCountUtil;
2021
import java.util.BitSet;
2122
import java.util.List;
2223
import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback;
@@ -37,7 +38,6 @@
3738
*/
3839
class PendingWriteLacOp implements WriteLacCallback {
3940
private static final Logger LOG = LoggerFactory.getLogger(PendingWriteLacOp.class);
40-
ByteBufList toSend;
4141
AddLacCallback cb;
4242
long lac;
4343
Object ctx;
@@ -59,11 +59,11 @@ class PendingWriteLacOp implements WriteLacCallback {
5959
this.cb = cb;
6060
this.ctx = ctx;
6161
this.lac = LedgerHandle.INVALID_ENTRY_ID;
62-
ackSet = lh.distributionSchedule.getAckSet();
62+
ackSet = lh.getDistributionSchedule().getAckSet();
6363
currentEnsemble = ensemble;
6464
}
6565

66-
void setLac(long lac) {
66+
synchronized void setLac(long lac) {
6767
this.lac = lac;
6868

6969
this.receivedResponseSet = new BitSet(
@@ -72,23 +72,29 @@ void setLac(long lac) {
7272
lh.getLedgerMetadata().getWriteQuorumSize());
7373
}
7474

75-
void sendWriteLacRequest(int bookieIndex) {
75+
void sendWriteLacRequest(int bookieIndex, ByteBufList toSend) {
7676
clientCtx.getBookieClient().writeLac(currentEnsemble.get(bookieIndex),
7777
lh.ledgerId, lh.ledgerKey, lac, toSend, this, bookieIndex);
7878
}
7979

8080
void initiate(ByteBufList toSend) {
81-
this.toSend = toSend;
82-
83-
for (int i = 0; i < lh.distributionSchedule.getWriteQuorumSize(); i++) {
84-
sendWriteLacRequest(lh.distributionSchedule.getWriteSetBookieIndex(lac, i));
81+
try {
82+
for (int i = 0; i < lh.getDistributionSchedule().getWriteQuorumSize(); i++) {
83+
sendWriteLacRequest(lh.getDistributionSchedule().getWriteSetBookieIndex(lac, i), toSend);
84+
}
85+
} finally {
86+
ReferenceCountUtil.release(toSend);
8587
}
88+
8689
}
8790

8891
@Override
89-
public void writeLacComplete(int rc, long ledgerId, BookieId addr, Object ctx) {
92+
public synchronized void writeLacComplete(int rc, long ledgerId, BookieId addr, Object ctx) {
9093
int bookieIndex = (Integer) ctx;
9194

95+
// We got response.
96+
receivedResponseSet.clear(bookieIndex);
97+
9298
if (completed) {
9399
return;
94100
}
@@ -97,9 +103,6 @@ public void writeLacComplete(int rc, long ledgerId, BookieId addr, Object ctx) {
97103
lastSeenError = rc;
98104
}
99105

100-
// We got response.
101-
receivedResponseSet.clear(bookieIndex);
102-
103106
if (rc == BKException.Code.OK) {
104107
if (ackSet.completeBookieAndCheck(bookieIndex) && !completed) {
105108
completed = true;
@@ -115,4 +118,5 @@ public void writeLacComplete(int rc, long ledgerId, BookieId addr, Object ctx) {
115118
cb.addLacComplete(lastSeenError, lh, ctx);
116119
}
117120
}
121+
118122
}
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.bookkeeper.client;
20+
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertFalse;
23+
import static org.junit.Assert.assertTrue;
24+
import static org.mockito.ArgumentMatchers.any;
25+
import static org.mockito.ArgumentMatchers.anyLong;
26+
import static org.mockito.Mockito.doNothing;
27+
import static org.mockito.Mockito.mock;
28+
import static org.mockito.Mockito.when;
29+
30+
import java.util.Arrays;
31+
import org.apache.bookkeeper.client.api.LedgerMetadata;
32+
import org.apache.bookkeeper.net.BookieId;
33+
import org.apache.bookkeeper.proto.BookieClient;
34+
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
35+
import org.apache.bookkeeper.util.ByteBufList;
36+
import org.junit.Before;
37+
import org.junit.Test;
38+
39+
/**
40+
* Unit test of {@link PendingWriteLacOp}.
41+
*/
42+
public class PendingWriteLacOpTest implements AsyncCallback.AddLacCallback {
43+
44+
private LedgerHandle lh;
45+
private ClientContext mockClientContext;
46+
private BookieClient mockBookieClient;
47+
private boolean callbackInvoked;
48+
49+
@Before
50+
public void setup() {
51+
lh = mock(LedgerHandle.class);
52+
mockClientContext = mock(ClientContext.class);
53+
mockBookieClient = mock(BookieClient.class);
54+
doNothing().when(mockBookieClient).writeLac(any(BookieId.class), anyLong(), any(byte[].class), anyLong(),
55+
any(ByteBufList.class), any(BookkeeperInternalCallbacks.WriteLacCallback.class), any(Object.class));
56+
when(mockClientContext.getBookieClient()).thenReturn(mockBookieClient);
57+
callbackInvoked = false;
58+
}
59+
60+
@Test
61+
public void testWriteLacOp332() {
62+
// 3-3-2: ack quorum=2, complete after 2 OK responses, release toSend after 3rd response
63+
when(lh.getDistributionSchedule())
64+
.thenReturn(new RoundRobinDistributionSchedule(3, 2, 3));
65+
66+
LedgerMetadata ledgerMetadata = mock(LedgerMetadata.class);
67+
when(ledgerMetadata.getWriteQuorumSize()).thenReturn(3);
68+
when(ledgerMetadata.getAckQuorumSize()).thenReturn(2);
69+
when(lh.getLedgerMetadata()).thenReturn(ledgerMetadata);
70+
71+
PendingWriteLacOp writeLacOp = new PendingWriteLacOp(lh, mockClientContext,
72+
lh.getCurrentEnsemble(), this, null);
73+
writeLacOp.setLac(1000);
74+
75+
assertEquals(1000, writeLacOp.lac);
76+
assertFalse(writeLacOp.completed);
77+
assertFalse(writeLacOp.receivedResponseSet.isEmpty());
78+
79+
writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 0);
80+
writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 1);
81+
82+
assertTrue(callbackInvoked);
83+
assertTrue(writeLacOp.completed);
84+
assertFalse(writeLacOp.receivedResponseSet.isEmpty());
85+
86+
writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 2);
87+
assertTrue(writeLacOp.receivedResponseSet.isEmpty());
88+
}
89+
90+
@Test
91+
public void testWriteLacOp333() {
92+
// 3-3-3: ack quorum=3, complete only after all 3 responses
93+
when(lh.getDistributionSchedule())
94+
.thenReturn(new RoundRobinDistributionSchedule(3, 3, 3));
95+
96+
LedgerMetadata ledgerMetadata = mock(LedgerMetadata.class);
97+
when(ledgerMetadata.getWriteQuorumSize()).thenReturn(3);
98+
when(ledgerMetadata.getAckQuorumSize()).thenReturn(3);
99+
when(lh.getLedgerMetadata()).thenReturn(ledgerMetadata);
100+
101+
PendingWriteLacOp writeLacOp = new PendingWriteLacOp(lh, mockClientContext,
102+
lh.getCurrentEnsemble(), this, null);
103+
writeLacOp.setLac(1000);
104+
105+
assertEquals(1000, writeLacOp.lac);
106+
assertFalse(writeLacOp.completed);
107+
assertFalse(writeLacOp.receivedResponseSet.isEmpty());
108+
109+
writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 0);
110+
writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 1);
111+
112+
assertFalse(callbackInvoked);
113+
assertFalse(writeLacOp.completed);
114+
assertFalse(writeLacOp.receivedResponseSet.isEmpty());
115+
116+
writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 2);
117+
assertTrue(callbackInvoked);
118+
assertTrue(writeLacOp.completed);
119+
assertTrue(writeLacOp.receivedResponseSet.isEmpty());
120+
}
121+
122+
@Test
123+
public void testWriteLacOp111() {
124+
// 1-1-1: single bookie, complete immediately on first response
125+
when(lh.getDistributionSchedule())
126+
.thenReturn(new RoundRobinDistributionSchedule(1, 1, 1));
127+
128+
LedgerMetadata ledgerMetadata = mock(LedgerMetadata.class);
129+
when(ledgerMetadata.getWriteQuorumSize()).thenReturn(1);
130+
when(ledgerMetadata.getAckQuorumSize()).thenReturn(1);
131+
when(lh.getLedgerMetadata()).thenReturn(ledgerMetadata);
132+
133+
PendingWriteLacOp writeLacOp = new PendingWriteLacOp(lh, mockClientContext,
134+
lh.getCurrentEnsemble(), this, null);
135+
writeLacOp.setLac(1000);
136+
137+
assertFalse(writeLacOp.completed);
138+
assertFalse(writeLacOp.receivedResponseSet.isEmpty());
139+
140+
writeLacOp.writeLacComplete(BKException.Code.OK, 2000, null, 0);
141+
142+
assertTrue(callbackInvoked);
143+
assertTrue(writeLacOp.completed);
144+
assertTrue(writeLacOp.receivedResponseSet.isEmpty());
145+
}
146+
147+
@Test
148+
public void testInitiateReleasesBuffer() {
149+
// Verify toSend buffer is released by initiate() after all requests are sent
150+
when(lh.getDistributionSchedule())
151+
.thenReturn(new RoundRobinDistributionSchedule(3, 2, 3));
152+
153+
LedgerMetadata ledgerMetadata = mock(LedgerMetadata.class);
154+
when(ledgerMetadata.getWriteQuorumSize()).thenReturn(3);
155+
when(ledgerMetadata.getAckQuorumSize()).thenReturn(2);
156+
when(lh.getLedgerMetadata()).thenReturn(ledgerMetadata);
157+
when(lh.getCurrentEnsemble()).thenReturn(Arrays.asList(BookieId.parse("bookie1"),
158+
BookieId.parse("bookie2"), BookieId.parse("bookie3")));
159+
160+
PendingWriteLacOp writeLacOp = new PendingWriteLacOp(lh, mockClientContext,
161+
lh.getCurrentEnsemble(), this, null);
162+
163+
writeLacOp.setLac(1000);
164+
165+
ByteBufList toSend = ByteBufList.get();
166+
assertEquals(1, toSend.refCnt());
167+
168+
writeLacOp.initiate(toSend);
169+
170+
// After initiate(), the caller's reference should be released
171+
assertEquals(0, toSend.refCnt());
172+
}
173+
174+
@Override
175+
public synchronized void addLacComplete(int rc, LedgerHandle lh, Object ctx) {
176+
callbackInvoked = true;
177+
}
178+
}

0 commit comments

Comments
 (0)