Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ protected EntryImpl newObject(Handle<EntryImpl> handle) {
private long ledgerId;
private long entryId;
ByteBuf data;
private boolean skipped;

public static EntryImpl create(LedgerEntry ledgerEntry) {
EntryImpl entry = RECYCLER.get();
Expand All @@ -66,6 +67,17 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
return entry;
}

public static EntryImpl create(long ledgerId, long entryId, boolean skipped) {
EntryImpl entry = RECYCLER.get();
entry.timestamp = System.nanoTime();
entry.ledgerId = ledgerId;
entry.entryId = entryId;
entry.skipped = skipped;
entry.data = Unpooled.wrappedBuffer(new byte[0]);
entry.setRefCnt(1);
return entry;
}

public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) {
EntryImpl entry = RECYCLER.get();
entry.timestamp = System.nanoTime();
Expand Down Expand Up @@ -146,6 +158,10 @@ public long getEntryId() {
return entryId;
}

public boolean skipped() {
return skipped;
}

@Override
public int compareTo(EntryImpl other) {
if (this.ledgerId != other.ledgerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,15 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
recycle();
return;
}
List<Entry> skippedEntries = new ArrayList<>();
PositionImpl startPosition = readPosition;
PositionImpl endPosition = (PositionImpl) nexReadPosition;
while (startPosition.compareTo(endPosition) < 0) {
skippedEntries.add(EntryImpl.create(startPosition.ledgerId, startPosition.entryId, true));
startPosition = cursor.ledger.getNextValidPosition(startPosition);
}
List<Entry> filteredEntries = cursor.filterReadEntries(skippedEntries);
entries.addAll(filteredEntries);
updateReadPosition(nexReadPosition);
checkReadCompletion();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.pulsar.broker.service;

import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -31,6 +33,7 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -41,6 +44,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
Expand Down Expand Up @@ -91,6 +95,25 @@ public int filterEntriesForConsumer(List<? extends Entry> entries, EntryBatchSiz
isReplayRead, consumer);
}

/**
* 1. Acknowledge skipped messages;
* 2. Filter out skipped messages;
*/
public List<Entry> filterAndAcknowledgeSkippedEntry(List<Entry> entries) {
List<Position> skippedPositions = new ArrayList<>();
List<Entry> filterEntries = Lists.newArrayList(Collections2.filter(entries, entry -> {
if (entry instanceof EntryImpl) {
if (((EntryImpl) entry).skipped()) {
skippedPositions.add(new PositionImpl(entry.getLedgerId(), entry.getEntryId()));
return false;
}
}
return true;
}));
subscription.acknowledgeMessage(skippedPositions, CommandAck.AckType.Individual, Collections.emptyMap());
return filterEntries;
}

/**
* Filter entries with prefetched message metadata range so that there is no need to peek metadata from Entry.
*
Expand All @@ -107,8 +130,9 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
long totalBytes = 0;
int totalChunkedMessages = 0;
int totalEntries = 0;
List<Position> entriesToFiltered = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
List<PositionImpl> entriesToRedeliver = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
final boolean hasFilter = CollectionUtils.isNotEmpty(entryFilters);
List<Position> entriesToFiltered = hasFilter ? new ArrayList<>() : null;
List<PositionImpl> entriesToRedeliver = hasFilter ? new ArrayList<>() : null;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
final Entry entry = entries.get(i);
if (entry == null) {
Expand All @@ -123,18 +147,24 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
);

int entryMsgCnt = msgMetadata == null ? 1 : msgMetadata.getNumMessagesInBatch();
this.filterProcessedMsgs.add(entryMsgCnt);
if (hasFilter) {
this.filterProcessedMsgs.add(entryMsgCnt);
}

EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer);
if (filterResult == EntryFilter.FilterResult.REJECT) {
entriesToFiltered.add(entry.getPosition());
entries.set(i, null);
// FilterResult will be always `ACCEPTED` when there is No Filter
// dont need to judge whether `hasFilter` is true or not.
this.filterRejectedMsgs.add(entryMsgCnt);
entry.release();
continue;
} else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
entriesToRedeliver.add((PositionImpl) entry.getPosition());
entries.set(i, null);
// FilterResult will be always `ACCEPTED` when there is No Filter
// dont need to judge whether `hasFilter` is true or not.
this.filterRescheduledMsgs.add(entryMsgCnt);
entry.release();
continue;
Expand Down Expand Up @@ -176,7 +206,9 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
continue;
}

this.filterAcceptedMsgs.add(entryMsgCnt);
if (hasFilter) {
this.filterAcceptedMsgs.add(entryMsgCnt);
}

totalEntries++;
int batchSize = msgMetadata.getNumMessagesInBatch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1997,7 +1997,7 @@ public void cleanUnloadedTopicFromCache(NamespaceBundle serviceUnit) {
TopicName topicName = TopicName.get(topic);
if (serviceUnit.includes(topicName) && getTopicReference(topic).isPresent()) {
log.info("[{}][{}] Clean unloaded topic from cache.", serviceUnit.toString(), topic);
pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), serviceUnit);
pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), serviceUnit, null);
}
}
}
Expand All @@ -2006,15 +2006,56 @@ public AuthorizationService getAuthorizationService() {
return authorizationService;
}

public CompletableFuture<Void> removeTopicFromCache(String topic) {
public CompletableFuture<Void> removeTopicFromCache(String topicName) {
return removeTopicFutureFromCache(topicName, null);
}

public CompletableFuture<Void> removeTopicFromCache(Topic topic) {
Optional<CompletableFuture<Optional<Topic>>> createTopicFuture = findTopicFutureInCache(topic);
if (createTopicFuture.isEmpty()){
return CompletableFuture.completedFuture(null);
}
return removeTopicFutureFromCache(topic.getName(), createTopicFuture.get());
}

private Optional<CompletableFuture<Optional<Topic>>> findTopicFutureInCache(Topic topic){
if (topic == null){
return Optional.empty();
}
final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topic.getName());
// If not exists in cache, do nothing.
if (createTopicFuture == null){
return Optional.empty();
}
// If the future in cache is not yet complete, the topic instance in the cache is not the same with the topic.
if (!createTopicFuture.isDone()){
return Optional.empty();
}
// If the future in cache has exception complete,
// the topic instance in the cache is not the same with the topic.
if (createTopicFuture.isCompletedExceptionally()){
return Optional.empty();
}
Optional<Topic> optionalTopic = createTopicFuture.join();
Topic topicInCache = optionalTopic.orElse(null);
if (topicInCache == null || topicInCache != topic){
return Optional.empty();
} else {
return Optional.of(createTopicFuture);
}
}

private CompletableFuture<Void> removeTopicFutureFromCache(String topic,
CompletableFuture<Optional<Topic>> createTopicFuture) {
TopicName topicName = TopicName.get(topic);
return pulsar.getNamespaceService().getBundleAsync(topicName)
.thenAccept(namespaceBundle -> {
removeTopicFromCache(topic, namespaceBundle);
removeTopicFromCache(topic, namespaceBundle, createTopicFuture);
});
}

public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle) {
private void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle,
CompletableFuture<Optional<Topic>> createTopicFuture) {
String bundleName = namespaceBundle.toString();
String namespaceName = TopicName.get(topic).getNamespaceObject().toString();

Expand All @@ -2041,7 +2082,12 @@ public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle)
}
}
}
topics.remove(topic);

if (createTopicFuture == null) {
topics.remove(topic);
} else {
topics.remove(topic, createTopicFuture);
}

Compactor compactor = pulsar.getNullableCompactor();
if (compactor != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
// topic GC iterates over topics map and removing from the map with the same thread creates
// deadlock. so, execute it in different thread
brokerService.executor().execute(() -> {
brokerService.removeTopicFromCache(topic);
brokerService.removeTopicFromCache(NonPersistentTopic.this);
unregisterTopicPolicyListener();
log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
Expand Down Expand Up @@ -516,7 +516,7 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
// unload topic iterates over topics map and removing from the map with the same thread creates deadlock.
// so, execute it in different thread
brokerService.executor().execute(() -> {
brokerService.removeTopicFromCache(topic);
brokerService.removeTopicFromCache(NonPersistentTopic.this);
unregisterTopicPolicyListener();
closeFuture.complete(null);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,8 @@ public final synchronized void readEntriesComplete(List<Entry> entries, Object c
protected final synchronized boolean sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
sendInProgress = true;
try {
return trySendMessagesToConsumers(readType, entries);
List<Entry> filterEntries = filterAndAcknowledgeSkippedEntry(entries);
return trySendMessagesToConsumers(readType, filterEntries);
} finally {
sendInProgress = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ protected void cancelPendingRead() {
@Override
public void readEntriesComplete(final List<Entry> entries, Object obj) {
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
internalReadEntriesComplete(entries, obj);
List<Entry> filterEntries = filterAndAcknowledgeSkippedEntry(entries);
internalReadEntriesComplete(filterEntries, obj);
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1194,7 +1194,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
brokerService.removeTopicFromCache(topic);
brokerService.removeTopicFromCache(PersistentTopic.this);

dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);

Expand Down Expand Up @@ -1305,7 +1305,7 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
@Override
public void closeComplete(Object ctx) {
// Everything is now closed, remove the topic from map
brokerService.removeTopicFromCache(topic)
brokerService.removeTopicFromCache(PersistentTopic.this)
.thenRun(() -> {
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);

Expand All @@ -1327,7 +1327,7 @@ public void closeComplete(Object ctx) {
@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
brokerService.removeTopicFromCache(topic);
brokerService.removeTopicFromCache(PersistentTopic.this);
unregisterTopicPolicyListener();
closeFuture.complete(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ protected void setup() throws Exception {
protected void cleanup() throws Exception {
super.internalCleanup();
useStaticPorts = false;
resetConfig();
}

@Override
Expand Down Expand Up @@ -105,6 +104,7 @@ public void testGetWorkerServiceException() throws Exception {

@Test
public void testAdvertisedAddress() throws Exception {
cleanup();
useStaticPorts = true;
setup();
assertEquals(pulsar.getAdvertisedAddress(), "localhost");
Expand All @@ -117,6 +117,7 @@ public void testAdvertisedAddress() throws Exception {

@Test
public void testAdvertisedListeners() throws Exception {
cleanup();
// don't use dynamic ports when using advertised listeners (#12079)
useStaticPorts = true;
conf.setAdvertisedListeners("internal:pulsar://gateway:6650, internal:pulsar+ssl://gateway:6651");
Expand All @@ -132,6 +133,7 @@ public void testAdvertisedListeners() throws Exception {

@Test
public void testDynamicBrokerPort() throws Exception {
cleanup();
useStaticPorts = false;
setup();
assertEquals(pulsar.getAdvertisedAddress(), "localhost");
Expand Down
Loading