diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index c387aa970adb6..ae7251e5d853d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -43,6 +43,7 @@ protected EntryImpl newObject(Handle handle) { private long ledgerId; private long entryId; ByteBuf data; + private boolean skipped; public static EntryImpl create(LedgerEntry ledgerEntry) { EntryImpl entry = RECYCLER.get(); @@ -66,6 +67,17 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) { return entry; } + public static EntryImpl create(long ledgerId, long entryId, boolean skipped) { + EntryImpl entry = RECYCLER.get(); + entry.timestamp = System.nanoTime(); + entry.ledgerId = ledgerId; + entry.entryId = entryId; + entry.skipped = skipped; + entry.data = Unpooled.wrappedBuffer(new byte[0]); + entry.setRefCnt(1); + return entry; + } + public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) { EntryImpl entry = RECYCLER.get(); entry.timestamp = System.nanoTime(); @@ -146,6 +158,10 @@ public long getEntryId() { return entryId; } + public boolean skipped() { + return skipped; + } + @Override public int compareTo(EntryImpl other) { if (this.ledgerId != other.ledgerId) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index 2fd2e02b26ea8..7a7948b0791d7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -109,6 +109,15 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { recycle(); return; } + List skippedEntries = new ArrayList<>(); + PositionImpl startPosition = readPosition; + PositionImpl endPosition = (PositionImpl) nexReadPosition; + while (startPosition.compareTo(endPosition) < 0) { + skippedEntries.add(EntryImpl.create(startPosition.ledgerId, startPosition.entryId, true)); + startPosition = cursor.ledger.getNextValidPosition(startPosition); + } + List filteredEntries = cursor.filterReadEntries(skippedEntries); + entries.addAll(filteredEntries); updateReadPosition(nexReadPosition); checkReadCompletion(); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 3e3fba07eacc4..dcb5a40e9fd29 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.service; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; import java.util.ArrayList; import java.util.Collections; @@ -31,6 +33,7 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.tuple.Pair; @@ -41,6 +44,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.plugin.EntryFilter; import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; @@ -91,6 +95,25 @@ public int filterEntriesForConsumer(List entries, EntryBatchSiz isReplayRead, consumer); } + /** + * 1. Acknowledge skipped messages; + * 2. Filter out skipped messages; + */ + public List filterAndAcknowledgeSkippedEntry(List entries) { + List skippedPositions = new ArrayList<>(); + List filterEntries = Lists.newArrayList(Collections2.filter(entries, entry -> { + if (entry instanceof EntryImpl) { + if (((EntryImpl) entry).skipped()) { + skippedPositions.add(new PositionImpl(entry.getLedgerId(), entry.getEntryId())); + return false; + } + } + return true; + })); + subscription.acknowledgeMessage(skippedPositions, CommandAck.AckType.Individual, Collections.emptyMap()); + return filterEntries; + } + /** * Filter entries with prefetched message metadata range so that there is no need to peek metadata from Entry. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 02d2e725379b6..c46fe28e08389 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -564,7 +564,8 @@ public final synchronized void readEntriesComplete(List entries, Object c protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List entries) { sendInProgress = true; try { - return trySendMessagesToConsumers(readType, entries); + List filterEntries = filterAndAcknowledgeSkippedEntry(entries); + return trySendMessagesToConsumers(readType, filterEntries); } finally { sendInProgress = false; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 47830e669af4e..43f79b2a25474 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -146,7 +146,8 @@ protected void cancelPendingRead() { @Override public void readEntriesComplete(final List entries, Object obj) { topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> { - internalReadEntriesComplete(entries, obj); + List filterEntries = filterAndAcknowledgeSkippedEntry(entries); + internalReadEntriesComplete(filterEntries, obj); })); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 612d9368b8c70..9ce0061d73668 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -19,9 +19,7 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.fail; +import static org.testng.Assert.*; import java.lang.reflect.Field; import java.util.Map.Entry; @@ -43,6 +41,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.util.StringUtils; import org.apache.pulsar.broker.BrokerTestUtil; @@ -166,6 +165,101 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception { consumer.close(); } + @Test + public void testSkipCorruptDataLedgerAndCheckMarkdelete() throws Exception { + // Ensure intended state for autoSkipNonRecoverableData + admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "true"); + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsar.getWebServiceAddress()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + + final String ns1 = "prop/usc/crash-broker"; + final int totalMessages = 100; + final int totalDataLedgers = 5; + final int entriesPerLedger = totalMessages / totalDataLedgers; + + try { + admin.namespaces().createNamespace(ns1); + } catch (Exception e) { + } + + final String topic1 = "persistent://" + ns1 + "/my-topic-" + System.currentTimeMillis(); + // Create subscription + Consumer consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name") + .receiverQueueSize(5).subscribe(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger(); + ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next(); + Field configField = ManagedCursorImpl.class.getDeclaredField("config"); + configField.setAccessible(true); + // Create multiple data-ledger + ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor); + config.setMaxEntriesPerLedger(entriesPerLedger); + config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); + // bookkeeper client + Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper"); + bookKeeperField.setAccessible(true); + // Create multiple data-ledger + BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml); + + // (1) publish messages in 5 data-ledgers each with 20 entries under managed-ledger + Producer producer = client.newProducer().topic(topic1).create(); + for (int i = 0; i < totalMessages; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + producer.close(); + + Message msg = null; + // (2) consume 20 messages from first ledger + for (int i = 0; i < entriesPerLedger; i++) { + msg = consumer.receive(); + consumer.acknowledge(msg); + } + consumer.close(); + PositionImpl markDeletePosition1 = (PositionImpl) cursor.getMarkDeletedPosition(); + + // (3) delete first 4 data-ledgers and clear cache + NavigableMap ledgerInfo = ml.getLedgersInfo(); + Entry lastLedger = ledgerInfo.lastEntry(); + ledgerInfo.entrySet().forEach(entry -> { + if (!entry.equals(lastLedger)) { + assertEquals(entry.getValue().getEntries(), entriesPerLedger); + try { + bookKeeper.deleteLedger(entry.getKey()); + } catch (Exception e) { + log.warn("failed to delete ledger {}", entry.getKey(), e); + } + } + }); + pulsar.getBrokerService().removeTopicFromCache(topic1); + ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory(); + Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers"); + field.setAccessible(true); + @SuppressWarnings("unchecked") + ConcurrentHashMap> ledgers = (ConcurrentHashMap>) field + .get(factory); + ledgers.clear(); + + // (4) consumer will be able to consume 20 messages from last non-deleted ledger + consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name") + .receiverQueueSize(5).subscribe(); + for (int i = 0; i < entriesPerLedger; i++) { + msg = consumer.receive(); + consumer.acknowledge(msg); + } + consumer.close(); + + topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get(); + ml = (ManagedLedgerImpl) topic.getManagedLedger(); + cursor = (ManagedCursorImpl) ml.getCursors().iterator().next(); + PositionImpl markDeletePosition2 = (PositionImpl) cursor.getMarkDeletedPosition(); + // markDeletePosition moves forward + assertTrue(markDeletePosition2.compareTo(markDeletePosition1) > 0); + } + /** * It verifies broker-configuration using which broker can skip non-recoverable data-ledgers. *