Skip to content

Commit 451b0d7

Browse files
tjiumingcongbobo184
authored andcommitted
[fix][txn] Fix PendingAckHandleImpl when pendingAckStoreProvider.checkInitializedBefore failed (#18859)
(cherry picked from commit 1be5a69)
1 parent e79fd0e commit 451b0d7

1 file changed

Lines changed: 14 additions & 7 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -140,13 +140,20 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
140140

141141
this.pendingAckStoreProvider = this.persistentSubscription.getTopic()
142142
.getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
143-
pendingAckStoreProvider.checkInitializedBefore(persistentSubscription).thenAccept(init -> {
144-
if (init) {
145-
initPendingAckStore();
146-
} else {
147-
completeHandleFuture();
148-
}
149-
});
143+
144+
pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
145+
.thenAccept(init -> {
146+
if (init) {
147+
initPendingAckStore();
148+
} else {
149+
completeHandleFuture();
150+
}
151+
})
152+
.exceptionally(t -> {
153+
changeToErrorState();
154+
exceptionHandleFuture(t);
155+
return null;
156+
});
150157
}
151158

152159
private void initPendingAckStore() {

0 commit comments

Comments
 (0)