Skip to content

Commit 63d4cf2

Browse files
authored
ManagedLedger: move to FENCED state in case of BadVersionException (#17736)
1 parent a8b265d commit 63d4cf2

6 files changed

Lines changed: 189 additions & 31 deletions

File tree

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ public ManagedLedgerFencedException() {
8080
super(new Exception("Attempted to use a fenced managed ledger"));
8181
}
8282

83+
public ManagedLedgerFencedException(String message) {
84+
super(message);
85+
}
86+
8387
public ManagedLedgerFencedException(Exception e) {
8488
super(e);
8589
}

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
431431

432432
@Override
433433
public void operationFailed(MetaStoreException e) {
434+
handleBadVersion(e);
434435
if (e instanceof MetadataNotFoundException) {
435436
callback.initializeFailed(new ManagedLedgerNotFoundException(e));
436437
} else {
@@ -481,6 +482,7 @@ public void operationComplete(Void v, Stat stat) {
481482

482483
@Override
483484
public void operationFailed(MetaStoreException e) {
485+
handleBadVersion(e);
484486
callback.initializeFailed(new ManagedLedgerException(e));
485487
}
486488
};
@@ -1022,6 +1024,7 @@ public void operationComplete(Void result, Stat stat) {
10221024

10231025
@Override
10241026
public void operationFailed(MetaStoreException e) {
1027+
handleBadVersion(e);
10251028
callback.deleteCursorFailed(e, ctx);
10261029
}
10271030

@@ -1312,6 +1315,7 @@ public void operationComplete(Void result, Stat stat) {
13121315
@Override
13131316
public void operationFailed(MetaStoreException e) {
13141317
log.error("[{}] Failed to terminate managed ledger: {}", name, e.getMessage());
1318+
handleBadVersion(e);
13151319
callback.terminateFailed(new ManagedLedgerException(e), ctx);
13161320
}
13171321
});
@@ -1396,6 +1400,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
13961400
public synchronized void asyncClose(final CloseCallback callback, final Object ctx) {
13971401
State state = STATE_UPDATER.get(this);
13981402
if (state == State.Fenced) {
1403+
cancelScheduledTasks();
13991404
factory.close(this);
14001405
callback.closeFailed(new ManagedLedgerFencedException(), ctx);
14011406
return;
@@ -1519,6 +1524,7 @@ public void operationComplete(Void v, Stat stat) {
15191524
@Override
15201525
public void operationFailed(MetaStoreException e) {
15211526
log.warn("[{}] Error updating meta data with the new list of ledgers: {}", name, e.getMessage());
1527+
handleBadVersion(e);
15221528
mbean.startDataLedgerDeleteOp();
15231529
bookKeeper.asyncDeleteLedger(lh.getId(), (rc1, ctx1) -> {
15241530
mbean.endDataLedgerDeleteOp();
@@ -1527,14 +1533,12 @@ public void operationFailed(MetaStoreException e) {
15271533
BKException.getMessage(rc1));
15281534
}
15291535
}, null);
1530-
15311536
if (e instanceof BadVersionException) {
15321537
synchronized (ManagedLedgerImpl.this) {
15331538
log.error(
15341539
"[{}] Failed to update ledger list. z-node version mismatch. Closing managed ledger",
15351540
name);
15361541
lastLedgerCreationFailureTimestamp = clock.millis();
1537-
STATE_UPDATER.set(ManagedLedgerImpl.this, State.Fenced);
15381542
// Return ManagedLedgerFencedException to addFailed callback
15391543
// to indicate that the ledger is now fenced and topic needs to be closed
15401544
clearPendingAddEntries(new ManagedLedgerFencedException(e));
@@ -1557,6 +1561,12 @@ public void operationFailed(MetaStoreException e) {
15571561
updateLedgersListAfterRollover(cb, newLedger);
15581562
}
15591563
}
1564+
1565+
private void handleBadVersion(Throwable e) {
1566+
if (e instanceof BadVersionException) {
1567+
setFenced();
1568+
}
1569+
}
15601570
private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback, LedgerInfo newLedger) {
15611571
if (!metadataMutex.tryLock()) {
15621572
// Defer update for later
@@ -2463,12 +2473,19 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
24632473
log.debug("[{}] Start TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.keySet(),
24642474
TOTAL_SIZE_UPDATER.get(this));
24652475
}
2466-
if (STATE_UPDATER.get(this) == State.Closed) {
2476+
State currentState = STATE_UPDATER.get(this);
2477+
if (currentState == State.Closed) {
24672478
log.debug("[{}] Ignoring trimming request since the managed ledger was already closed", name);
24682479
trimmerMutex.unlock();
24692480
promise.completeExceptionally(new ManagedLedgerAlreadyClosedException("Can't trim closed ledger"));
24702481
return;
24712482
}
2483+
if (currentState == State.Fenced) {
2484+
log.debug("[{}] Ignoring trimming request since the managed ledger was already fenced", name);
2485+
trimmerMutex.unlock();
2486+
promise.completeExceptionally(new ManagedLedgerFencedException("Can't trim fenced ledger"));
2487+
return;
2488+
}
24722489

24732490
long slowestReaderLedgerId = -1;
24742491
if (!cursors.hasDurableCursors()) {
@@ -2557,7 +2574,7 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
25572574
return;
25582575
}
25592576

2560-
if (STATE_UPDATER.get(this) == State.CreatingLedger // Give up now and schedule a new trimming
2577+
if (currentState == State.CreatingLedger // Give up now and schedule a new trimming
25612578
|| !metadataMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list
25622579
scheduleDeferredTrimming(isTruncate, promise);
25632580
trimmerMutex.unlock();
@@ -2624,6 +2641,7 @@ public void operationFailed(MetaStoreException e) {
26242641
log.warn("[{}] Failed to update the list of ledgers after trimming", name, e);
26252642
metadataMutex.unlock();
26262643
trimmerMutex.unlock();
2644+
handleBadVersion(e);
26272645

26282646
promise.completeExceptionally(e);
26292647
}
@@ -2708,7 +2726,7 @@ public void deleteLedgerFailed(ManagedLedgerException e, Object ctx) {
27082726
public void asyncDelete(final DeleteLedgerCallback callback, final Object ctx) {
27092727
// Delete the managed ledger without closing, since we are not interested in gracefully closing cursors and
27102728
// ledgers
2711-
STATE_UPDATER.set(this, State.Fenced);
2729+
setFenced();
27122730
cancelScheduledTasks();
27132731

27142732
List<ManagedCursor> cursors = Lists.newArrayList(this.cursors);
@@ -2957,7 +2975,7 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct
29572975
promise.whenComplete((result, exception) -> {
29582976
offloadMutex.unlock();
29592977
if (exception != null) {
2960-
callback.offloadFailed(new ManagedLedgerException(exception), ctx);
2978+
callback.offloadFailed(ManagedLedgerException.getManagedLedgerException(exception), ctx);
29612979
} else {
29622980
callback.offloadComplete(result, ctx);
29632981
}
@@ -2971,11 +2989,17 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct
29712989

29722990
private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledgersToOffload,
29732991
PositionImpl firstUnoffloaded, Optional<Throwable> firstError) {
2974-
if (getState() == State.Closed) {
2992+
State currentState = getState();
2993+
if (currentState == State.Closed) {
29752994
promise.completeExceptionally(new ManagedLedgerAlreadyClosedException(
29762995
String.format("managed ledger [%s] has already closed", name)));
29772996
return;
29782997
}
2998+
if (currentState == State.Fenced) {
2999+
promise.completeExceptionally(new ManagedLedgerFencedException(
3000+
String.format("managed ledger [%s] is fenced", name)));
3001+
return;
3002+
}
29793003
LedgerInfo info = ledgersToOffload.poll();
29803004
if (info == null) {
29813005
if (firstError.isPresent()) {
@@ -3117,6 +3141,7 @@ public void operationComplete(Void result, Stat stat) {
31173141

31183142
@Override
31193143
public void operationFailed(MetaStoreException e) {
3144+
handleBadVersion(e);
31203145
unlockingPromise.completeExceptionally(e);
31213146
}
31223147
});
@@ -3639,6 +3664,7 @@ private void checkManagedLedgerIsOpen() throws ManagedLedgerException {
36393664
}
36403665

36413666
synchronized void setFenced() {
3667+
log.info("{} Moving to Fenced state", name);
36423668
STATE_UPDATER.set(this, State.Fenced);
36433669
}
36443670

@@ -3842,12 +3868,21 @@ private void scheduleTimeoutTask() {
38423868
? Math.max(config.getAddEntryTimeoutSeconds(), config.getReadEntryTimeoutSeconds())
38433869
: timeoutSec;
38443870
this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
3845-
checkAddTimeout();
3846-
checkReadTimeout();
3871+
checkTimeouts();
38473872
}), timeoutSec, timeoutSec, TimeUnit.SECONDS);
38483873
}
38493874
}
38503875

3876+
private void checkTimeouts() {
3877+
final State state = STATE_UPDATER.get(this);
3878+
if (state == State.Closed
3879+
|| state == State.Fenced) {
3880+
return;
3881+
}
3882+
checkAddTimeout();
3883+
checkReadTimeout();
3884+
}
3885+
38513886
private void checkAddTimeout() {
38523887
long timeoutSec = config.getAddEntryTimeoutSeconds();
38533888
if (timeoutSec < 1) {
@@ -4004,6 +4039,7 @@ public void operationComplete(Void result, Stat version) {
40044039
@Override
40054040
public void operationFailed(MetaStoreException e) {
40064041
log.error("[{}] Update managedLedger's properties failed", name, e);
4042+
handleBadVersion(e);
40074043
callback.updatePropertiesFailed(e, ctx);
40084044
metadataMutex.unlock();
40094045
}

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,19 @@
1818
*/
1919
package org.apache.bookkeeper.mledger.impl;
2020

21+
import static org.hamcrest.CoreMatchers.instanceOf;
22+
import static org.hamcrest.MatcherAssert.assertThat;
2123
import static org.testng.Assert.assertEquals;
2224
import static org.testng.Assert.assertFalse;
2325
import static org.testng.Assert.assertNull;
2426
import static org.testng.Assert.assertTrue;
27+
import static org.testng.Assert.expectThrows;
2528
import static org.testng.Assert.fail;
2629
import io.netty.buffer.ByteBuf;
2730
import java.util.List;
2831
import java.util.concurrent.CompletableFuture;
2932
import java.util.concurrent.CountDownLatch;
33+
import java.util.concurrent.ExecutionException;
3034
import java.util.concurrent.atomic.AtomicReference;
3135
import lombok.Cleanup;
3236
import org.apache.bookkeeper.client.BKException;
@@ -387,6 +391,72 @@ public void recoverAfterZnodeVersionError() throws Exception {
387391
}
388392
}
389393

394+
@Test
395+
public void recoverAfterZnodeVersionErrorWhileTrimming() throws Exception {
396+
ManagedLedger ledger = factory.open("my_test_ledger_trim",
397+
new ManagedLedgerConfig()
398+
.setMaxEntriesPerLedger(2));
399+
ledger.addEntry("test".getBytes());
400+
ledger.addEntry("test".getBytes());
401+
ledger.addEntry("test".getBytes());
402+
403+
metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) ->
404+
path.equals("/managed-ledgers/my_test_ledger_trim")
405+
&& op == FaultInjectionMetadataStore.OperationType.PUT
406+
);
407+
408+
CompletableFuture<?> handle = new CompletableFuture<>();
409+
ledger.trimConsumedLedgersInBackground(handle);
410+
assertThat(expectThrows(ExecutionException.class, () -> handle.get()).getCause(),
411+
instanceOf(ManagedLedgerException.BadVersionException.class));
412+
413+
assertEquals(ManagedLedgerImpl.State.Fenced, ((ManagedLedgerImpl) ledger).getState());
414+
415+
// if the task started after the ML moved to Fenced state, it must fail
416+
CompletableFuture<?> handleAlreadyFenced = new CompletableFuture<>();
417+
ledger.trimConsumedLedgersInBackground(handleAlreadyFenced);
418+
assertThat(expectThrows(ExecutionException.class, () -> handleAlreadyFenced.get()).getCause(),
419+
instanceOf(ManagedLedgerException.ManagedLedgerFencedException.class));
420+
421+
try {
422+
ledger.addEntry("entry".getBytes());
423+
fail("should fail");
424+
} catch (ManagedLedgerFencedException e) {
425+
assertEquals("Attempted to use a fenced managed ledger", e.getCause().getMessage());
426+
}
427+
428+
assertFalse(factory.ledgers.isEmpty());
429+
try {
430+
ledger.close();
431+
} catch (ManagedLedgerFencedException e) {
432+
assertEquals("Attempted to use a fenced managed ledger", e.getCause().getMessage());
433+
}
434+
435+
// verify that the ManagedLedger has been unregistered even if it was fenced
436+
assertTrue(factory.ledgers.isEmpty());
437+
}
438+
439+
@Test
440+
public void badVersionErrorDuringTruncateLedger() throws Exception {
441+
ManagedLedger ledger = factory.open("my_test_ledger_trim",
442+
new ManagedLedgerConfig()
443+
.setMaxEntriesPerLedger(2));
444+
ledger.addEntry("test".getBytes());
445+
ledger.addEntry("test".getBytes());
446+
ledger.addEntry("test".getBytes());
447+
448+
metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) ->
449+
path.equals("/managed-ledgers/my_test_ledger_trim")
450+
&& op == FaultInjectionMetadataStore.OperationType.PUT
451+
);
452+
453+
CompletableFuture<?> handle = ledger.asyncTruncate();
454+
assertThat(expectThrows(ExecutionException.class, () -> handle.get()).getCause(),
455+
instanceOf(ManagedLedgerException.BadVersionException.class));
456+
457+
assertEquals(ManagedLedgerImpl.State.Fenced, ((ManagedLedgerImpl) ledger).getState());
458+
}
459+
390460
@Test
391461
public void recoverAfterWriteError() throws Exception {
392462
ManagedLedger ledger = factory.open("my_test_ledger");

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.testng.Assert.assertEquals;
2222
import static org.testng.Assert.assertFalse;
2323
import static org.testng.Assert.assertNotEquals;
24+
import static org.testng.Assert.assertThrows;
2425
import static org.testng.Assert.assertTrue;
2526
import static org.testng.Assert.fail;
2627
import java.lang.reflect.Field;
@@ -48,6 +49,8 @@
4849
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
4950
import org.apache.commons.lang3.tuple.Pair;
5051
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
52+
import org.apache.pulsar.metadata.api.MetadataStoreException;
53+
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
5154
import org.slf4j.Logger;
5255
import org.slf4j.LoggerFactory;
5356
import org.testng.annotations.Test;
@@ -125,6 +128,51 @@ public void testOffload() throws Exception {
125128
.filter(e -> e.getOffloadContext().getComplete())
126129
.map(e -> e.getLedgerId()).collect(Collectors.toSet()),
127130
offloader.offloadedLedgers());
131+
132+
// ledgers should be marked as offloaded
133+
ledger.getLedgersInfoAsList().stream().allMatch(l -> l.hasOffloadContext());
134+
}
135+
136+
@Test
137+
public void testOffloadFenced() throws Exception {
138+
MockLedgerOffloader offloader = new MockLedgerOffloader();
139+
ManagedLedgerConfig config = new ManagedLedgerConfig();
140+
config.setMaxEntriesPerLedger(10);
141+
config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
142+
config.setRetentionTime(10, TimeUnit.MINUTES);
143+
config.setRetentionSizeInMB(10);
144+
config.setLedgerOffloader(offloader);
145+
ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
146+
147+
int i = 0;
148+
for (; i < 25; i++) {
149+
String content = "entry-" + i;
150+
ledger.addEntry(content.getBytes());
151+
}
152+
assertEquals(ledger.getLedgersInfoAsList().size(), 3);
153+
154+
metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) ->
155+
path.equals("/managed-ledgers/my_test_ledger")
156+
&& op == FaultInjectionMetadataStore.OperationType.PUT
157+
);
158+
159+
assertThrows(ManagedLedgerException.ManagedLedgerFencedException.class, () ->
160+
ledger.offloadPrefix(ledger.getLastConfirmedEntry()));
161+
162+
assertEquals(ledger.getLedgersInfoAsList().size(), 3);
163+
164+
// the offloader actually wrote the data on the storage
165+
assertEquals(ledger.getLedgersInfoAsList().stream()
166+
.filter(e -> e.getOffloadContext().getComplete())
167+
.map(e -> e.getLedgerId()).collect(Collectors.toSet()),
168+
offloader.offloadedLedgers());
169+
170+
// but the ledgers should not be marked as offloaded in local memory, as the write to metadata failed
171+
ledger.getLedgersInfoAsList().stream().allMatch(l -> !l.hasOffloadContext());
172+
173+
// check that the ledger is fenced
174+
assertEquals(ManagedLedgerImpl.State.Fenced, ledger.getState());
175+
128176
}
129177

130178
@Test

0 commit comments

Comments
 (0)