Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
cad9a84
[fix][broker] fix unacked message count is zero when using exclusive …
berg223 Jun 3, 2025
498b737
[fix][broker] rollback and handle acks when using Exclusive and Failover
berg223 Jun 4, 2025
2d09532
Update pulsar-broker/src/main/java/org/apache/pulsar/broker/service/C…
berg223 Jun 4, 2025
dc8795d
[fix][broker] test negative ack
berg223 Jun 4, 2025
d60f160
[fix][broker] fix consumer throttling and unacked message stats is wro…
berg223 Jun 5, 2025
0abae8c
[fix][broker] move addAndGetUnAckedMsgs outside of if condition
berg223 Jun 5, 2025
4e1bc43
[fix][broker] change testUnackedMessages to batch mode
berg223 Jun 5, 2025
fbd62e9
Merge branch 'fix_exclusive_unacked_stats' into fix_consumer_throttin…
berg223 Jun 5, 2025
4cbbad8
[fix][broker] fix issue about redelivery
berg223 Jun 5, 2025
b6d5c55
[fix][broker] support batch mode
berg223 Jun 7, 2025
4d85d70
[fix][broker] correct the comment
berg223 Jun 8, 2025
acfcadb
[fix][broker] fix flow control issue
berg223 Jun 8, 2025
e6cff41
[fix][broker] fix flow control issue
berg223 Jun 8, 2025
3e56df5
[fix][broker] fix flow control issue
berg223 Jun 8, 2025
8197860
[fix][broker] add test about transaction
berg223 Jun 8, 2025
f4610ac
[fix][broker] add test about transaction
berg223 Jun 8, 2025
513ab2d
[fix][broker] add data provider
berg223 Jun 8, 2025
71e991a
[fix][broker] test abort cumulative acknowledge
berg223 Jun 8, 2025
e4913cf
[fix][broker] fix CI failure
berg223 Jun 8, 2025
848a910
[fix][broker] fix CI failure
berg223 Jun 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
* running.
*/
public class PendingAcksMap {

/**
* Callback interface for handling the addition of pending acknowledgments.
*/
Expand Down Expand Up @@ -227,6 +228,15 @@ public void forEachAndClose(PendingAcksConsumer processor) {
}
}

/**
 * Removes all pending acknowledgments from the map.
 *
 * <p>Acquires the write lock so the clear is atomic with respect to concurrent
 * readers and writers of {@code pendingAcks}.
 */
public void clear() {
    // Acquire the lock before entering the try block: if lock() failed inside
    // the try, the finally clause would attempt to unlock a lock that was
    // never acquired (the standard Lock usage idiom).
    writeLock.lock();
    try {
        pendingAcks.clear();
    } finally {
        writeLock.unlock();
    }
}

/**
* Check if the map contains a pending ack for the given ledger ID and entry ID.
*
Expand Down Expand Up @@ -330,9 +340,10 @@ public boolean remove(long ledgerId, long entryId) {
*
* @param markDeleteLedgerId the ledger ID up to which to remove pending acks
* @param markDeleteEntryId the entry ID up to which to remove pending acks
* @return the sum batchSize of removed pending acks
*/
public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
internalRemoveAllUpTo(markDeleteLedgerId, markDeleteEntryId, false);
public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
return internalRemoveAllUpTo(markDeleteLedgerId, markDeleteEntryId, false);
}

/**
Expand All @@ -345,15 +356,18 @@ public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
* @param markDeleteLedgerId the ledger ID up to which to remove pending acks
* @param markDeleteEntryId the entry ID up to which to remove pending acks
* @param useWriteLock true if the method should use a write lock, false otherwise
* @return the sum batchSize of removed pending acks
*/
private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntryId, boolean useWriteLock) {
private int internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntryId, boolean useWriteLock) {
PendingAcksRemoveHandler pendingAcksRemoveHandler = pendingAcksRemoveHandlerSupplier.get();
// track if the write lock was acquired
boolean acquiredWriteLock = false;
// track if a batch was started
boolean batchStarted = false;
// track if the method should retry with a write lock
boolean retryWithWriteLock = false;
// the number of removed pending acks
int removeCount = 0;
try {
if (useWriteLock) {
writeLock.lock();
Expand All @@ -378,9 +392,10 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry
while (entryMapIterator.hasNext()) {
Long2ObjectMap.Entry<IntIntPair> intIntPairEntry = entryMapIterator.next();
long entryId = intIntPairEntry.getLongKey();
int batchSize = intIntPairEntry.getValue().leftInt();
if (!acquiredWriteLock) {
retryWithWriteLock = true;
return;
break;
}
if (pendingAcksRemoveHandler != null) {
if (!batchStarted) {
Expand All @@ -390,12 +405,16 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry
int stickyKeyHash = intIntPairEntry.getValue().rightInt();
pendingAcksRemoveHandler.handleRemoving(consumer, ledgerId, entryId, stickyKeyHash, closed);
}
removeCount += batchSize;
entryMapIterator.remove();
}
if (retryWithWriteLock) {
break;
}
if (ledgerMap.isEmpty()) {
if (!acquiredWriteLock) {
retryWithWriteLock = true;
return;
break;
}
ledgerMapIterator.remove();
}
Expand All @@ -409,10 +428,11 @@ private void internalRemoveAllUpTo(long markDeleteLedgerId, long markDeleteEntry
} else {
readLock.unlock();
if (retryWithWriteLock) {
internalRemoveAllUpTo(markDeleteLedgerId, markDeleteEntryId, true);
removeCount = internalRemoveAllUpTo(markDeleteLedgerId, markDeleteEntryId, true);
}
}
}
return removeCount;
}

private void handleRemovePendingAck(long ledgerId, long entryId, int stickyKeyHash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su
: ""/* NonDurableCursor doesn't have name */);
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.readFailureBackoff = new Backoff(serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs(),
TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(),
TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMandatoryStopTimeInMs(),
TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(),
TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMandatoryStopTimeInMs(),
TimeUnit.MILLISECONDS);
this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.initializeDispatchRateLimiterIfNeeded();
}
Expand Down Expand Up @@ -231,18 +231,19 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> e
EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
SendMessageInfo sendMessageInfo, long epoch) {
currentConsumer
.sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
redeliveryTracker, epoch)
.addListener(future -> {
if (future.isSuccess()) {
acquirePermitsForDeliveredMessages(topic, cursor, entries.size(),
sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());

// Schedule a new read batch operation only after the previous batch has been written to the socket.
executor.execute(() -> readMoreEntries(getActiveConsumer()));
}
});
.sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
redeliveryTracker, epoch)
.addListener(future -> {
if (future.isSuccess()) {
acquirePermitsForDeliveredMessages(topic, cursor, entries.size(),
sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());

// Schedule a new read batch operation only after the previous batch has been written to the
// socket.
executor.execute(() -> readMoreEntries(getActiveConsumer()));
}
});
}

@Override
Expand Down Expand Up @@ -435,6 +436,11 @@ protected Pair<Integer, Long> calculateToRead(Consumer consumer) {
int avgMessagesPerEntry = Math.max(1, consumer.getAvgMessagesPerEntry());
messagesToRead = Math.min((int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry), readBatchSize);
}
// We need to ensure that we don't exceed the max unacked messages.
int maxUnackedMessages = consumer.getMaxUnackedMessages();
if (maxUnackedMessages > 0) {
messagesToRead = Math.min(messagesToRead, maxUnackedMessages - consumer.getUnackedMessages());
}

// throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz
// active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
Expand Down Expand Up @@ -585,6 +591,11 @@ public boolean checkAndUnblockIfStuck() {
return false;
}

/**
 * Returns the number of unacknowledged messages held by the currently active
 * consumer of this dispatcher, or {@code 0} when no consumer is active.
 */
public int getUnackedMessages() {
    final Consumer active = getActiveConsumer();
    if (active == null) {
        // No active consumer (e.g. all consumers disconnected) — nothing is unacked.
        return 0;
    }
    return active.getUnackedMessages();
}

private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);

public static class ReadEntriesCtx {
Expand All @@ -597,13 +608,14 @@ public static class ReadEntriesCtx {
private ReadEntriesCtx(Recycler.Handle<ReadEntriesCtx> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

private static final Recycler<ReadEntriesCtx> RECYCLER =
new Recycler<ReadEntriesCtx>() {
@Override
protected ReadEntriesCtx newObject(Recycler.Handle<ReadEntriesCtx> recyclerHandle) {
return new ReadEntriesCtx(recyclerHandle);
}
};
@Override
protected ReadEntriesCtx newObject(Recycler.Handle<ReadEntriesCtx> recyclerHandle) {
return new ReadEntriesCtx(recyclerHandle);
}
};

public static ReadEntriesCtx create(Consumer consumer, long epoch) {
ReadEntriesCtx readEntriesCtx = RECYCLER.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1315,7 +1315,9 @@ public CompletableFuture<SubscriptionStatsImpl> getStatsAsync(GetStatsOptions ge
SubType subType = getType();
subStats.type = getTypeString();
if (dispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
Consumer activeConsumer = ((PersistentDispatcherSingleActiveConsumer) dispatcher).getActiveConsumer();
PersistentDispatcherSingleActiveConsumer d = ((PersistentDispatcherSingleActiveConsumer) dispatcher);
subStats.unackedMessages = d.getUnackedMessages();
Consumer activeConsumer = d.getActiveConsumer();
if (activeConsumer != null) {
subStats.activeConsumerName = activeConsumer.consumerName();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,12 @@ public void testMaxUnackedMessagesOnSubApplied() throws Exception {
}

@Test(timeOut = 30000)
public void testMaxUnackedMessagesOnConsumer() throws Exception {
public void testMaxUnackedMessagesOnSharedConsumer() throws Exception {
final String topicName = testTopic + System.currentTimeMillis();
final String subscriberName = "test-sub" + System.currentTimeMillis();
final int unackMsgAllowed = 100;
final int receiverQueueSize = 10;
final int totalProducedMsgs = 300;
final int totalProducedMsgs = 400;

ConsumerBuilder<String> consumerBuilder = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
.subscriptionName(subscriberName).receiverQueueSize(receiverQueueSize)
Expand Down Expand Up @@ -305,16 +305,183 @@ public void testMaxUnackedMessagesOnConsumer() throws Exception {
String message = "my-message-" + i;
producer.send(message);
}
AtomicInteger consumer1Counter = new AtomicInteger(0);
AtomicInteger consumer2Counter = new AtomicInteger(0);
AtomicInteger consumer3Counter = new AtomicInteger(0);
CountDownLatch countDownLatch = new CountDownLatch(2);
CountDownLatch countDownLatch = new CountDownLatch(3);
startConsumer(consumer1, consumer1Counter, countDownLatch);
startConsumer(consumer2, consumer2Counter, countDownLatch);
startConsumer(consumer3, consumer3Counter, countDownLatch);
countDownLatch.await(10, TimeUnit.SECONDS);
assertEquals(consumer1Counter.get(), unackMsgAllowed);
assertEquals(consumer2Counter.get(), unackMsgAllowed);
assertEquals(consumer3Counter.get(), unackMsgAllowed);
}

@Test(timeOut = 30000)
public void testMaxUnackedMessagesOnExclusiveConsumer() throws Exception {
    final String topicName = testTopic + System.currentTimeMillis();
    final String subscriberName = "test-sub-exclusive" + System.currentTimeMillis();
    final int unackMsgAllowed = 100;
    final int receiverQueueSize = 10;
    final int totalProducedMsgs = 300;

    ConsumerBuilder<String> consumerBuilder = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
            .subscriptionName(subscriberName).receiverQueueSize(receiverQueueSize)
            .ackTimeout(1, TimeUnit.MINUTES)
            .subscriptionType(SubscriptionType.Exclusive);
    @Cleanup
    Consumer<String> consumer = consumerBuilder.subscribe();
    // 1) Produce messages before any unacked-message limit is applied.
    @Cleanup
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
    for (int i = 0; i < totalProducedMsgs; i++) {
        String message = "my-message-" + i;
        producer.send(message);
    }
    // 2) No limit configured yet, so all messages can be consumed.
    int count = 0;
    List<Message<String>> list = new ArrayList<>(totalProducedMsgs);
    for (int i = 0; i < totalProducedMsgs; i++) {
        Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
        if (message == null) {
            break;
        }
        count++;
        list.add(message);
    }
    assertEquals(count, totalProducedMsgs);
    // Use a plain loop so a failed acknowledge propagates and fails the test
    // instead of being silently swallowed inside a lambda.
    for (Message<String> received : list) {
        consumer.acknowledge(received);
    }
    // 3) Apply the per-consumer unacked-message limit so only part of the data can be consumed.
    waitCacheInit(topicName);
    admin.topics().setMaxUnackedMessagesOnConsumer(topicName, unackMsgAllowed);
    Awaitility.await().untilAsserted(()
            -> assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topicName)));
    assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topicName).intValue(), unackMsgAllowed);
    // 4) The consumer must stop after receiving exactly unackMsgAllowed messages.
    for (int i = 0; i < totalProducedMsgs; i++) {
        String message = "my-message-" + i;
        producer.send(message);
    }
    int consumerCounter = 0;
    Message<String> message = null;
    for (int i = 0; i < totalProducedMsgs; i++) {
        // Any client exception here propagates and fails the test rather than
        // being caught and treated as end-of-stream.
        Message<String> msg = consumer.receive(500, TimeUnit.MILLISECONDS);
        if (msg == null) {
            break;
        }
        message = msg;
        ++consumerCounter;
    }
    assertEquals(consumerCounter, unackMsgAllowed);
    // Cumulatively acknowledging up to the last received message frees the whole
    // unacked quota, so another unackMsgAllowed messages become deliverable.
    consumer.acknowledgeCumulative(message.getMessageId());
    consumerCounter = 0;
    for (int i = 0; i < totalProducedMsgs - unackMsgAllowed; i++) {
        message = consumer.receive(500, TimeUnit.MILLISECONDS);
        if (message == null) {
            break;
        }
        ++consumerCounter;
    }
    assertEquals(consumerCounter, unackMsgAllowed);
}

@Test(timeOut = 30000)
public void testMaxUnackedMessagesOnFailOverConsumer() throws Exception {
    final String topicName = testTopic + System.currentTimeMillis();
    final String subscriberName = "test-sub-failover" + System.currentTimeMillis();
    final int unackMsgAllowed = 100;
    final int receiverQueueSize = 10;
    final int totalProducedMsgs = 300;

    ConsumerBuilder<String> consumerBuilder = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
            .subscriptionName(subscriberName).receiverQueueSize(receiverQueueSize)
            .ackTimeout(1, TimeUnit.MINUTES)
            .subscriptionType(SubscriptionType.Failover);
    @Cleanup
    Consumer<String> consumer = consumerBuilder.subscribe();
    // 1) Produce messages before any unacked-message limit is applied.
    @Cleanup
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
    for (int i = 0; i < totalProducedMsgs; i++) {
        String message = "my-message-" + i;
        producer.send(message);
    }
    // 2) No limit configured yet, so all messages can be consumed.
    int count = 0;
    List<Message<String>> list = new ArrayList<>(totalProducedMsgs);
    for (int i = 0; i < totalProducedMsgs; i++) {
        Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
        if (message == null) {
            break;
        }
        count++;
        list.add(message);
    }
    assertEquals(count, totalProducedMsgs);
    // Use a plain loop so a failed acknowledge propagates and fails the test
    // instead of being silently swallowed inside a lambda.
    for (Message<String> received : list) {
        consumer.acknowledge(received);
    }
    // 3) Apply the per-consumer unacked-message limit so only part of the data can be consumed.
    waitCacheInit(topicName);
    admin.topics().setMaxUnackedMessagesOnConsumer(topicName, unackMsgAllowed);
    Awaitility.await().untilAsserted(()
            -> assertNotNull(admin.topics().getMaxUnackedMessagesOnConsumer(topicName)));
    assertEquals(admin.topics().getMaxUnackedMessagesOnConsumer(topicName).intValue(), unackMsgAllowed);
    // 4) The active consumer must stop after receiving exactly unackMsgAllowed messages.
    for (int i = 0; i < totalProducedMsgs; i++) {
        String message = "my-message-" + i;
        producer.send(message);
    }
    int consumerCounter = 0;
    while (true) {
        // Any client exception here propagates and fails the test rather than
        // being caught and treated as end-of-stream.
        Message<String> message = consumer.receive(500, TimeUnit.MILLISECONDS);
        if (message == null) {
            break;
        }
        ++consumerCounter;
    }
    assertEquals(consumerCounter, unackMsgAllowed);

    // 5) After the active consumer closes, its unacked messages are redelivered
    // to the failover consumer, which is limited to unackMsgAllowed as well.
    @Cleanup
    Consumer<String> failoverConsumer = consumerBuilder.subscribe();
    consumer.close();
    consumerCounter = 0;
    while (true) {
        Message<String> message = failoverConsumer.receive(1, TimeUnit.SECONDS);
        if (message == null) {
            break;
        }
        ++consumerCounter;
    }
    assertEquals(consumerCounter, unackMsgAllowed);
}

private void startConsumer(Consumer<String> consumer, AtomicInteger consumerCounter,
CountDownLatch countDownLatch) {
new Thread(() -> {
Expand Down
Loading
Loading