Skip to content

Commit cbb17e7

Browse files
authored
Close TransactionBuffer when MessageDeduplication#checkStatus failed (#19289)
1 parent ed3238b commit cbb17e7

File tree

2 files changed

+70
-1
lines changed

2 files changed

+70
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1385,6 +1385,14 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean
13851385
});
13861386
}
13871387

1388+
1389+
@VisibleForTesting
1390+
public void createPersistentTopic0(final String topic, boolean createIfMissing,
1391+
CompletableFuture<Optional<Topic>> topicFuture,
1392+
Map<String, String> properties) {
1393+
createPersistentTopic(topic, createIfMissing, topicFuture, properties);
1394+
}
1395+
13881396
private void createPersistentTopic(final String topic, boolean createIfMissing,
13891397
CompletableFuture<Optional<Topic>> topicFuture,
13901398
Map<String, String> properties) {
@@ -1459,6 +1467,12 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
14591467
.exceptionally((ex) -> {
14601468
log.warn("Replication or dedup check failed."
14611469
+ " Removing topic from topics list {}, {}", topic, ex);
1470+
persistentTopic.getTransactionBuffer()
1471+
.closeAsync()
1472+
.exceptionally(t -> {
1473+
log.error("[{}] Close transactionBuffer failed", topic, t);
1474+
return null;
1475+
});
14621476
persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> {
14631477
topics.remove(topic, topicFuture);
14641478
topicFuture.completeExceptionally(ex);
@@ -3187,7 +3201,8 @@ public long getPausedConnections() {
31873201
}
31883202

31893203
@SuppressWarnings("unchecked")
3190-
private <T extends Topic> T newTopic(String topic, ManagedLedger ledger, BrokerService brokerService,
3204+
@VisibleForTesting
3205+
public <T extends Topic> T newTopic(String topic, ManagedLedger ledger, BrokerService brokerService,
31913206
Class<T> topicClazz) throws PulsarServerException {
31923207
if (topicFactory != null) {
31933208
try {

pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,16 @@
1818
*/
1919
package org.apache.pulsar.broker.transaction.buffer;
2020

21+
import org.apache.bookkeeper.mledger.ManagedLedger;
22+
import org.apache.bookkeeper.mledger.ManagedLedgerException;
2123
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
24+
import org.apache.pulsar.broker.PulsarService;
25+
import org.apache.pulsar.broker.service.BrokerService;
26+
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
2227
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
2328
import org.apache.pulsar.broker.transaction.TransactionTestBase;
29+
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
30+
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
2431
import org.apache.pulsar.client.api.Producer;
2532
import org.apache.pulsar.client.api.transaction.Transaction;
2633
import org.apache.pulsar.common.naming.TopicName;
@@ -30,11 +37,17 @@
3037
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
3138
import org.awaitility.Awaitility;
3239
import org.powermock.reflect.Whitebox;
40+
import org.mockito.Mockito;
41+
import org.testng.Assert;
3342
import org.testng.annotations.AfterMethod;
3443
import org.testng.annotations.BeforeMethod;
3544
import org.testng.annotations.Test;
45+
import java.util.Collections;
3646
import java.util.Map;
47+
import java.util.UUID;
48+
import java.util.concurrent.CompletableFuture;
3749
import java.util.concurrent.TimeUnit;
50+
import java.util.concurrent.atomic.AtomicReference;
3851

3952
public class TopicTransactionBufferTest extends TransactionTestBase {
4053

@@ -86,4 +99,45 @@ public void testTransactionBufferAppendMarkerWriteFailState() throws Exception {
8699
Whitebox.setInternalState(persistentTopic.getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed);
87100
txn.commit().get();
88101
}
102+
103+
@Test
104+
public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Exception {
105+
String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID();
106+
PulsarService pulsar = pulsarServiceList.get(0);
107+
BrokerService brokerService0 = pulsar.getBrokerService();
108+
BrokerService brokerService = Mockito.spy(brokerService0);
109+
AtomicReference<PersistentTopic> reference = new AtomicReference<>();
110+
111+
Mockito
112+
.doAnswer(inv -> {
113+
String topic1 = inv.getArgument(0);
114+
ManagedLedger ledger = inv.getArgument(1);
115+
BrokerService service = inv.getArgument(2);
116+
Class<?> topicKlass = inv.getArgument(3);
117+
if (topicKlass.equals(PersistentTopic.class)) {
118+
PersistentTopic pt = Mockito.spy(new PersistentTopic(topic1, ledger, service));
119+
CompletableFuture<Void> f =CompletableFuture
120+
.failedFuture(new ManagedLedgerException("This is an exception"));
121+
Mockito.doReturn(f).when(pt).checkDeduplicationStatus();
122+
reference.set(pt);
123+
return pt;
124+
} else {
125+
return new NonPersistentTopic(topic1, service);
126+
}
127+
})
128+
.when(brokerService)
129+
.newTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService),
130+
Mockito.eq(PersistentTopic.class));
131+
132+
brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>(), Collections.emptyMap());
133+
134+
Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null);
135+
PersistentTopic persistentTopic = reference.get();
136+
TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
137+
Assert.assertTrue(buffer instanceof TopicTransactionBuffer);
138+
TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer;
139+
TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close;
140+
Assert.assertEquals(ttb.getState(), expectState);
141+
}
142+
89143
}

0 commit comments

Comments
 (0)