Skip to content

Commit 8eb7ee1

Browse files
authored
[fix] Close TransactionBuffer when create persistent topic timeout (#19384)
1 parent c91303d commit 8eb7ee1

File tree

2 files changed

+47
-1
lines changed

2 files changed

+47
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1625,8 +1625,15 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
16251625
- topicCreateTimeMs;
16261626
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
16271627
if (topicFuture.isCompletedExceptionally()) {
1628+
// Check create persistent topic timeout.
16281629
log.warn("{} future is already completed with failure {}, closing the"
16291630
+ " topic", topic, FutureUtil.getException(topicFuture));
1631+
persistentTopic.getTransactionBuffer()
1632+
.closeAsync()
1633+
.exceptionally(t -> {
1634+
log.error("[{}] Close transactionBuffer failed", topic, t);
1635+
return null;
1636+
});
16301637
persistentTopic.stopReplProducers()
16311638
.whenCompleteAsync((v, exception) -> {
16321639
topics.remove(topic, topicFuture);

pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.commons.lang3.reflect.FieldUtils;
2525
import org.apache.pulsar.broker.PulsarService;
2626
import org.apache.pulsar.broker.service.BrokerService;
27+
import org.apache.pulsar.broker.service.Topic;
2728
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
2829
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
2930
import org.apache.pulsar.broker.transaction.TransactionTestBase;
@@ -42,8 +43,11 @@
4243
import org.testng.annotations.AfterMethod;
4344
import org.testng.annotations.BeforeMethod;
4445
import org.testng.annotations.Test;
46+
47+
import java.time.Duration;
4548
import java.util.Collections;
4649
import java.util.Map;
50+
import java.util.Optional;
4751
import java.util.UUID;
4852
import java.util.concurrent.CompletableFuture;
4953
import java.util.concurrent.TimeUnit;
@@ -116,7 +120,7 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep
116120
Class<?> topicKlass = inv.getArgument(3);
117121
if (topicKlass.equals(PersistentTopic.class)) {
118122
PersistentTopic pt = Mockito.spy(new PersistentTopic(topic1, ledger, service));
119-
CompletableFuture<Void> f =CompletableFuture
123+
CompletableFuture<Void> f = CompletableFuture
120124
.failedFuture(new ManagedLedgerException("This is an exception"));
121125
Mockito.doReturn(f).when(pt).checkDeduplicationStatus();
122126
reference.set(pt);
@@ -140,4 +144,39 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep
140144
Assert.assertEquals(ttb.getState(), expectState);
141145
}
142146

147+
148+
@Test
149+
public void testCloseTransactionBufferWhenTimeout() throws Exception {
150+
String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID();
151+
PulsarService pulsar = pulsarServiceList.get(0);
152+
BrokerService brokerService0 = pulsar.getBrokerService();
153+
BrokerService brokerService = Mockito.spy(brokerService0);
154+
AtomicReference<PersistentTopic> reference = new AtomicReference<>();
155+
pulsar.getConfiguration().setTopicLoadTimeoutSeconds(10);
156+
long topicLoadTimeout = TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getTopicLoadTimeoutSeconds() + 1);
157+
158+
Mockito
159+
.doAnswer(inv -> {
160+
Thread.sleep(topicLoadTimeout);
161+
PersistentTopic persistentTopic = (PersistentTopic) inv.callRealMethod();
162+
reference.set(persistentTopic);
163+
return persistentTopic;
164+
})
165+
.when(brokerService)
166+
.newTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService),
167+
Mockito.eq(PersistentTopic.class));
168+
169+
CompletableFuture<Optional<Topic>> f = brokerService.getTopic(topic, true);
170+
171+
Awaitility.waitAtMost(20, TimeUnit.SECONDS)
172+
.pollInterval(Duration.ofSeconds(2)).until(() -> reference.get() != null);
173+
PersistentTopic persistentTopic = reference.get();
174+
TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
175+
Assert.assertTrue(buffer instanceof TopicTransactionBuffer);
176+
TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer;
177+
TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close;
178+
Assert.assertEquals(ttb.getState(), expectState);
179+
Assert.assertTrue(f.isCompletedExceptionally());
180+
}
181+
143182
}

0 commit comments

Comments
 (0)