Skip to content

Commit 0b5f31d

Browse files
tjiumingTechnoboy-
authored andcommitted
[fix][txn] Fix PendingAckHandleImpl when pendingAckStoreProvider.checkInitializedBefore failed (#18859)
1 parent 8138553 commit 0b5f31d

1 file changed

Lines changed: 14 additions & 7 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -144,13 +144,20 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
144144

145145
this.pendingAckStoreProvider = this.persistentSubscription.getTopic()
146146
.getBrokerService().getPulsar().getTransactionPendingAckStoreProvider();
147-
pendingAckStoreProvider.checkInitializedBefore(persistentSubscription).thenAccept(init -> {
148-
if (init) {
149-
initPendingAckStore();
150-
} else {
151-
completeHandleFuture();
152-
}
153-
});
147+
148+
pendingAckStoreProvider.checkInitializedBefore(persistentSubscription)
149+
.thenAccept(init -> {
150+
if (init) {
151+
initPendingAckStore();
152+
} else {
153+
completeHandleFuture();
154+
}
155+
})
156+
.exceptionally(t -> {
157+
changeToErrorState();
158+
exceptionHandleFuture(t);
159+
return null;
160+
});
154161
}
155162

156163
private void initPendingAckStore() {

0 commit comments

Comments
 (0)