Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 198 additions & 0 deletions pip/pip-426.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@

# PIP-426: Enhanced Consumer Throttling and Unacknowledged Message Tracking for Exclusive and Failover Subscriptions

# Background knowledge

In [13618](https://github.com/apache/pulsar/pull/48), we have added a configuration named maxUnackedMessagesPerConsumer to restrict consumer for receiving messages without acknowledging-msg up to the threshold.

# Motivation

Apache Pulsar currently lacks full support for enforcing unacknowledged message limits and consumer-side flow control in exclusive and failover subscriptions. While these mechanisms function correctly for shared subscriptions, their absence in exclusive/failover modes causes critical limitations:

Comment on lines +10 to +11
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree with this.

After having "pending acks" also for failover subscriptions, it will be easier to come up with a proper solution for #15189 . When a failover subscription changes the active consumer, it causes duplicate processing which is problematic and surprising for many applications. Failover subscription active consumer changing should be potentially handled by first "draining" the previous consumer before sending messages to the new active consumer. (The draining approach is used for Key_Shared subscriptions since Pulsar 4.0 / PIP-379)

Copy link
Copy Markdown
Contributor Author

@berg223 berg223 Jun 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It's also benefit to solve similiar issues to #15189. The draining approach could also be implemented . Your comments always open my mind and point me in the right direction !

1. **Throttling Inaccuracies**
Consumers can exceed configured `maxUnackedMessagesOnConsumer` limits, risking resource exhaustion.

2. **Tracking Deficiencies**
`pendingAcks` tracking is disabled or partially functional, compromising visibility into message processing.

3. **Inaccurate unacked message count**
Current implementation lacks support for:
- Cumulative acknowledgements
- Message batching
- Transactional operations


This proposal addresses these gaps by extending the `pendingAcks` system to support all subscription types.

# Goals

## In Scope

1. **Strict Throttling Enforcement**
Apply `maxUnackedMessagesOnConsumer` limits to exclusive and failover subscriptions.

2. **Unified Tracking Mechanism**
Enable `pendingAcks` for accurate unacknowledged message tracking across all subscription types.

3. **Accurate unacked message count**
Support cumulative acknowledgements, message batching, and transactional operations.

4. **Feature Flag**
Add a feature flag to enable/disable the new feature.

## Out of Scope
- No guarantee that messages with the same key are delivered and allowed to be in unacknowledged state to only one consumer at a time in exclusive/failover subscriptions.
- No changes to the existing public API or wire protocol.
- No change in behavior for shared subscriptions.

# High Level Design

1. Remove the `Subscription.isIndividualAckMode()` restriction that limits `pendingAcks` usage to shared subscriptions.
2. Extend `PersistentAcknowledgmentsGroupingTracker` for exclusive/failover consumers.
3. Enhance flow control in `PersistentDispatcherSingleActiveConsumer`

# Detailed Design

## Design & Implementation Details
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, each message in a Pulsar topic can have a persistent, monotonically increasing local index, which functions much like a Kafka offset. This feature was officially introduced by Pull Request #9039, which implemented PIP-70 ("Introduce lightweight broker entry metadata").

With this local index, the number of unacknowledged messages for a consumer can be calculated with a simple subtraction:

unacked_messages = last_delivered_index - mark_delete_position

Given its reliability, considering its proven value, enabling this index by default in a future Apache Pulsar release would be reasonable.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for more background knowledge! It's really helpful to solve origin issue #24159 . Since there is no need to add flow permits to exlusive or failover subscription (details discussion see: https://lists.apache.org/thread/glvd8lrvyll9mdtp62d34x1k38swrls1), there is no motivation except fix the origin issue. IMO, we can close this pip to keep the pip from too much useless information.


### 1. Track the unacked message when sending message

Remove the `Subscription.isIndividualAckMode(subType)` restriction from
- `Consumer()` constructor when creating `PendingAcksMap`
- `Consumer.sendMessages()` when track unacked messages to `PendingAcksMap`
- `Consumer.incrementUnackedMessages()` and `Consumer.addAndGetUnAckedMsgs()`

### 2. Remove the unacked message when acknowledge message

#### 2.1 Changes for `Consumer.individualAckNormal()`

a) For non-batch messages:

Remove the `Subscription.isIndividualAckMode(subType)` restriction from
- `Consumer.getAckedCountForMsgIdNoAckSets()` (Ensure get the right acked message count)
- `Consumer.checkCanRemovePendingAcksAndHandle()` (Ensure the message is removed from `PendingAcksMap`)
- `Consumer.addAndGetUnAckedMsgs()` (Ensure decrease the unacked message count)
- `Consumer.shouldBlockConsumerOnUnackMsgs()` (Ensure the consumer is unblocked when unacked message count is small enough)

b) For batch messages:

Remove the `Subscription.isIndividualAckMode(subType)` restriction from
- `Consumer.getAckOwnerConsumerAndBatchSize()` (Ensure get the batch size of entry)
- `Consumer.getAckedCountForBatchIndexLevelEnabled()` (Ensure get the right acked message count)
- `Consumer.individualAckNormal` when `subscription.syncBatchPositionBitSetForPendingAck(position);` (Ensure the message is synced to `org.apache.PersistentAcknowledgmentsGroupingTracker` to achieve that message redelivery after abort transaction)
- `Consumer.addAndGetUnAckedMsgs()` (Ensure decrease the unacked message count)

c) for batch and non-batch messages

Remove the `Subscription.isIndividualAckMode(subType)` restriction from

- `Consumer.individualAckNormal` after future of `subscription.acknowledgeMessage` complete

#### 2.2 Changes for `Consumer.individualAckWithTransaction()`

Remove the `Subscription.isIndividualAckMode(subType)` restriction from
- `Consumer.getAckOwnerConsumerAndBatchSize()` (Ensure get the batch size of entry)
- `getAckedCountForTransactionAck` and `Consumer.getAckedCountForBatchIndexLevelEnabled()` (Ensure get the right acked message count)
- `Consumer.addAndGetUnAckedMsgs()` (Ensure decrease the unacked message count)
- `Consumer.checkCanRemovePendingAcksAndHandle()` (Ensure the message is removed from `PendingAcksMap`)
- `Consumer.individualAckWithTransaction` after future of `Consumer.transactionIndividualAcknowledge()` complete

#### 2.3 Changes for `Consumer.messageAcked()`

For cumulative ack, we should:
1. get the acked message count
2. decrease the unacked message count
3. remove the message from `PendingAcksMap`
4. unblock the consumer if the unacked message count is small enough

code will be like this:
```
if (ack.getAckType() == AckType.Cumulative) {
future.thenRun(() -> {
ObjectIntPair<Consumer> ackOwnerConsumerAndBatchSize =
getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(), msgId.getEntryId());
Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize.left();
int ackedCount = removePendingAcksUpToPosition(ackOwnerConsumer, position);
if (ackedCount > 0) {
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer);
}
});
}
```


## Public-facing Changes

<!--
Describe the additions you plan to make for each public facing component.
Remove the sections you are not changing.
Clearly mark any changes which are BREAKING backward compatability.
-->

### Public API
<!--
When adding a new endpoint to the REST API, please make sure to document the following:

* path
* query parameters
* HTTP body parameters, usually as JSON.
* Response codes, and for each what they mean.
For each response code, please include a detailed description of the response body JSON, specifying each field and what it means.
This is the place to document the errors.
-->

### Binary protocol

### Configuration

### CLI

### Metrics

<!--
For each metric provide:
* Full name
* Description
* Attributes (labels)
* Unit
-->


# Monitoring

<!--
Describe how the changes you make in this proposal should be monitored.
Don't describe the detailed metrics - they should be at "Public-facing Changes" / "Metrics" section.
Describe how the user will use the metrics to monitor the feature: Which alerts they should set up, which thresholds, ...
-->

# Security Considerations
<!--
A detailed description of the security details that ought to be considered for the PIP. This is most relevant for any new HTTP endpoints, new Pulsar Protocol Commands, and new security features. The goal is to describe details like which role will have permission to perform an action.

An important aspect to consider is also multi-tenancy: Does the feature I'm adding have the permissions / roles set in such a way that prevent one tenant accessing another tenant's data/configuration? For example, the Admin API to read a specific message for a topic only allows a client to read messages for the target topic. However, that was not always the case. CVE-2021-41571 (https://github.com/apache/pulsar/wiki/CVE-2021-41571) resulted because the API was incorrectly written and did not properly prevent a client from reading another topic's messages even though authorization was in place. The problem was missing input validation that verified the requested message was actually a message for that topic. The fix to CVE-2021-41571 was input validation.

If there is uncertainty for this section, please submit the PIP and request for feedback on the mailing list.
-->

# Backward & Forward Compatibility

- Fully backward compatible.
- No config changes required unless explicitly setting `maxUnackedMessagesOnConsumer` for new use cases.

# Alternatives
None

# General Notes

# Links
<!--
Updated afterwards
-->
* Mailing List discussion thread: https://lists.apache.org/thread/glvd8lrvyll9mdtp62d34x1k38swrls1
* Mailing List voting thread: TBD
- [Original Issue #24159](https://github.com/apache/pulsar/issues/24159)
- PIP-379 implementation [PR #24396](https://github.com/apache/pulsar/pull/24396)