Skip to content

Commit 5591dc2

Browse files
tjiumingnicoloboschi
authored andcommitted
[fix][txn] Fix PendingAckHandleImpl when pendingAckStoreProvider.checkInitializedBefore failed (apache#18859)
(cherry picked from commit 1be5a69) (cherry picked from commit e5dedfe)
1 parent d8b951b commit 5591dc2

1 file changed

Lines changed: 14 additions & 7 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,20 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
141141

142142
this.pendingAckStoreProvider = this.persistentSubscription.getTopic()
143143
.getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
144-
pendingAckStoreProvider.checkInitializedBefore(persistentSubscription).thenAccept(init -> {
145-
if (init) {
146-
initPendingAckStore();
147-
} else {
148-
completeHandleFuture();
149-
}
150-
});
144+
145+
pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
146+
.thenAccept(init -> {
147+
if (init) {
148+
initPendingAckStore();
149+
} else {
150+
completeHandleFuture();
151+
}
152+
})
153+
.exceptionally(t -> {
154+
changeToErrorState();
155+
exceptionHandleFuture(t);
156+
return null;
157+
});
151158
}
152159

153160
private void initPendingAckStore() {

0 commit comments

Comments
 (0)