Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
private long ledgerId;
private long entryId;
ByteBuf data;
private boolean skipped;

public static EntryImpl create(LedgerEntry ledgerEntry) {
EntryImpl entry = RECYCLER.get();
Expand All @@ -66,6 +67,17 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
return entry;
}

/**
 * Creates a payload-less placeholder entry marking a position that was skipped
 * (e.g. data in a non-recoverable ledger).
 *
 * @param ledgerId ledger id of the skipped position
 * @param entryId entry id of the skipped position
 * @param skipped whether this entry represents a skipped position
 * @return a recycled {@link EntryImpl} with an empty data buffer and refCnt of 1
 */
public static EntryImpl create(long ledgerId, long entryId, boolean skipped) {
    EntryImpl entry = RECYCLER.get();
    entry.timestamp = System.nanoTime();
    entry.ledgerId = ledgerId;
    entry.entryId = entryId;
    entry.skipped = skipped;
    // Unpooled.wrappedBuffer(new byte[0]) returns this same shared constant anyway;
    // use it directly and avoid the dead zero-length array allocation.
    entry.data = Unpooled.EMPTY_BUFFER;
    entry.setRefCnt(1);
    return entry;
}

public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) {
EntryImpl entry = RECYCLER.get();
entry.timestamp = System.nanoTime();
Expand Down Expand Up @@ -146,6 +158,10 @@ public long getEntryId() {
return entryId;
}

/**
 * Returns {@code true} if this entry is a placeholder for a skipped position
 * (created via {@link #create(long, long, boolean)}) rather than real payload data.
 */
public boolean skipped() {
    return skipped;
}

@Override
public int compareTo(EntryImpl other) {
if (this.ledgerId != other.ledgerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
recycle();
return;
}
List<Entry> skippedEntries = new ArrayList<>();
PositionImpl startPosition = readPosition;
PositionImpl endPosition = (PositionImpl) nexReadPosition;
while (startPosition.compareTo(endPosition) < 0) {
skippedEntries.add(EntryImpl.create(startPosition.ledgerId, startPosition.entryId, true));
startPosition = cursor.ledger.getNextValidPosition(startPosition);
}
List<Entry> filteredEntries = cursor.filterReadEntries(skippedEntries);
entries.addAll(filteredEntries);
updateReadPosition(nexReadPosition);
checkReadCompletion();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.pulsar.broker.service;

import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -31,6 +33,7 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -41,6 +44,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
Expand Down Expand Up @@ -91,6 +95,25 @@ public int filterEntriesForConsumer(List<? extends Entry> entries, EntryBatchSiz
isReplayRead, consumer);
}

/**
 * Removes entries marked as skipped from {@code entries} and individually acknowledges
 * their positions so the cursor can advance past them.
 *
 * @param entries entries read from the managed ledger, possibly containing skipped markers
 * @return the non-skipped entries, in their original order
 */
public List<Entry> filterAndAcknowledgeSkippedEntry(List<Entry> entries) {
    List<Position> skippedPositions = new ArrayList<>();
    List<Entry> filterEntries = new ArrayList<>(entries.size());
    // Plain partition loop — no need for the Guava Collections2/Lists detour.
    for (Entry entry : entries) {
        if (entry instanceof EntryImpl && ((EntryImpl) entry).skipped()) {
            skippedPositions.add(new PositionImpl(entry.getLedgerId(), entry.getEntryId()));
        } else {
            filterEntries.add(entry);
        }
    }
    // Only touch the subscription when there is actually something to acknowledge.
    if (!skippedPositions.isEmpty()) {
        subscription.acknowledgeMessage(skippedPositions, AckType.Individual, Collections.emptyMap());
    }
    return filterEntries;
}

/**
* Filter entries with prefetched message metadata range so that there is no need to peek metadata from Entry.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,8 @@ public final synchronized void readEntriesComplete(List<Entry> entries, Object c
protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
sendInProgress = true;
try {
return trySendMessagesToConsumers(readType, entries);
List<Entry> filterEntries = filterAndAcknowledgeSkippedEntry(entries);
return trySendMessagesToConsumers(readType, filterEntries);
} finally {
sendInProgress = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ protected void cancelPendingRead() {
@Override
public void readEntriesComplete(final List<Entry> entries, Object obj) {
    // Hand off to the per-topic ordered executor so completion handling keeps topic ordering;
    // skipped placeholder entries are acked and filtered out before internal processing.
    topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() ->
            internalReadEntriesComplete(filterAndAcknowledgeSkippedEntry(entries), obj)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
package org.apache.pulsar.broker.service;

import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.fail;
import static org.testng.Assert.*;

import java.lang.reflect.Field;
import java.util.Map.Entry;
Expand All @@ -43,6 +41,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.util.StringUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
Expand Down Expand Up @@ -166,6 +165,101 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
consumer.close();
}

/**
 * Verifies that when data ledgers are deleted out from under a topic (simulating corrupt /
 * non-recoverable ledgers) and {@code autoSkipNonRecoverableData} is enabled, a consumer can
 * still read the surviving ledger and the cursor's mark-delete position moves forward past
 * the lost entries instead of getting stuck.
 */
@Test
public void testSkipCorruptDataLedgerAndCheckMarkdelete() throws Exception {
    // Ensure intended state for autoSkipNonRecoverableData
    admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "true");
    @Cleanup
    PulsarClient client = PulsarClient.builder()
            .serviceUrl(pulsar.getWebServiceAddress())
            .statsInterval(0, TimeUnit.SECONDS)
            .build();

    final String ns1 = "prop/usc/crash-broker";
    final int totalMessages = 100;
    final int totalDataLedgers = 5;
    final int entriesPerLedger = totalMessages / totalDataLedgers;

    try {
        admin.namespaces().createNamespace(ns1);
    } catch (Exception e) {
        // Best-effort create: the namespace may already exist from a previous test run.
    }

    final String topic1 = "persistent://" + ns1 + "/my-topic-" + System.currentTimeMillis();
    // Create subscription
    Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
            .receiverQueueSize(5).subscribe();
    PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
    ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
    ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next();
    // Reflectively shrink the ledger rollover limits so publishing 100 messages
    // produces 5 small data ledgers instead of one large one.
    Field configField = ManagedCursorImpl.class.getDeclaredField("config");
    configField.setAccessible(true);
    // Create multiple data-ledger
    ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor);
    config.setMaxEntriesPerLedger(entriesPerLedger);
    config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
    // bookkeeper client
    Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
    bookKeeperField.setAccessible(true);
    // Create multiple data-ledger
    BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml);

    // (1) publish messages in 5 data-ledgers each with 20 entries under managed-ledger
    Producer<byte[]> producer = client.newProducer().topic(topic1).create();
    for (int i = 0; i < totalMessages; i++) {
        String message = "my-message-" + i;
        producer.send(message.getBytes());
    }
    producer.close();

    Message<byte[]> msg = null;
    // (2) consume 20 messages from first ledger
    for (int i = 0; i < entriesPerLedger; i++) {
        msg = consumer.receive();
        consumer.acknowledge(msg);
    }
    consumer.close();
    // Snapshot the mark-delete position before the data loss, for the final comparison.
    PositionImpl markDeletePosition1 = (PositionImpl) cursor.getMarkDeletedPosition();

    // (3) delete first 4 data-ledgers and clear cache
    NavigableMap<Long, LedgerInfo> ledgerInfo = ml.getLedgersInfo();
    Entry<Long, LedgerInfo> lastLedger = ledgerInfo.lastEntry();
    ledgerInfo.entrySet().forEach(entry -> {
        if (!entry.equals(lastLedger)) {
            assertEquals(entry.getValue().getEntries(), entriesPerLedger);
            try {
                // Delete directly via the BookKeeper client so the managed ledger's
                // metadata still references the (now missing) ledgers.
                bookKeeper.deleteLedger(entry.getKey());
            } catch (Exception e) {
                log.warn("failed to delete ledger {}", entry.getKey(), e);
            }
        }
    });
    // Evict the topic and its managed ledger so the next subscribe re-opens them
    // and actually hits the deleted ledgers.
    pulsar.getBrokerService().removeTopicFromCache(topic1);
    ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory();
    Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
    field.setAccessible(true);
    @SuppressWarnings("unchecked")
    ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) field
            .get(factory);
    ledgers.clear();

    // (4) consumer will be able to consume 20 messages from last non-deleted ledger
    consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
            .receiverQueueSize(5).subscribe();
    for (int i = 0; i < entriesPerLedger; i++) {
        msg = consumer.receive();
        consumer.acknowledge(msg);
    }
    consumer.close();

    // Re-fetch the topic/cursor — the earlier references were invalidated by the cache clear.
    topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
    ml = (ManagedLedgerImpl) topic.getManagedLedger();
    cursor = (ManagedCursorImpl) ml.getCursors().iterator().next();
    PositionImpl markDeletePosition2 = (PositionImpl) cursor.getMarkDeletedPosition();
    // markDeletePosition moves forward
    assertTrue(markDeletePosition2.compareTo(markDeletePosition1) > 0);
}

/**
* It verifies broker-configuration using which broker can skip non-recoverable data-ledgers.
*
Expand Down