Skip to content

Commit 4dca93b

Browse files
authored
Close TransactionBuffer when MessageDeduplication#checkStatus failed (#19288)
1 parent fd700da commit 4dca93b

File tree

2 files changed

+71
-1
lines changed

2 files changed

+71
-1
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1390,6 +1390,13 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean
13901390
});
13911391
}
13921392

1393+
@VisibleForTesting
1394+
public void createPersistentTopic0(final String topic, boolean createIfMissing,
1395+
CompletableFuture<Optional<Topic>> topicFuture,
1396+
Map<String, String> properties) {
1397+
createPersistentTopic(topic, createIfMissing, topicFuture, properties);
1398+
}
1399+
13931400
private void createPersistentTopic(final String topic, boolean createIfMissing,
13941401
CompletableFuture<Optional<Topic>> topicFuture,
13951402
Map<String, String> properties) {
@@ -1434,7 +1441,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
14341441
try {
14351442
PersistentTopic persistentTopic = isSystemTopic(topic)
14361443
? new SystemTopic(topic, ledger, BrokerService.this)
1437-
: new PersistentTopic(topic, ledger, BrokerService.this);
1444+
: newPersistentTopic(topic, ledger, BrokerService.this);
14381445
persistentTopic
14391446
.initialize()
14401447
.thenCompose(__ -> persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
@@ -1464,6 +1471,12 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
14641471
.exceptionally((ex) -> {
14651472
log.warn("Replication or dedup check failed."
14661473
+ " Removing topic from topics list {}, {}", topic, ex);
1474+
persistentTopic.getTransactionBuffer()
1475+
.closeAsync()
1476+
.exceptionally(t -> {
1477+
log.error("[{}] Close transactionBuffer failed", topic, t);
1478+
return null;
1479+
});
14671480
persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> {
14681481
topics.remove(topic, topicFuture);
14691482
topicFuture.completeExceptionally(ex);
@@ -3085,6 +3098,11 @@ public long getPausedConnections() {
30853098
return pausedConnections.longValue();
30863099
}
30873100

3101+
@VisibleForTesting
3102+
public PersistentTopic newPersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) {
3103+
return new PersistentTopic(topic, ledger, brokerService);
3104+
}
3105+
30883106
@VisibleForTesting
30893107
public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
30903108
this.pulsarChannelInitFactory = factory;

pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,16 @@
1818
*/
1919
package org.apache.pulsar.broker.transaction.buffer;
2020

21+
import org.apache.bookkeeper.mledger.ManagedLedger;
22+
import org.apache.bookkeeper.mledger.ManagedLedgerException;
2123
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
24+
import org.apache.pulsar.broker.PulsarService;
25+
import org.apache.pulsar.broker.service.BrokerService;
26+
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
2227
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
2328
import org.apache.pulsar.broker.transaction.TransactionTestBase;
29+
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
30+
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
2431
import org.apache.pulsar.client.api.Producer;
2532
import org.apache.pulsar.client.api.transaction.Transaction;
2633
import org.apache.pulsar.common.naming.TopicName;
@@ -30,11 +37,17 @@
3037
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
3138
import org.awaitility.Awaitility;
3239
import org.powermock.reflect.Whitebox;
40+
import org.mockito.Mockito;
41+
import org.testng.Assert;
3342
import org.testng.annotations.AfterMethod;
3443
import org.testng.annotations.BeforeMethod;
3544
import org.testng.annotations.Test;
45+
import java.util.Collections;
3646
import java.util.Map;
47+
import java.util.UUID;
48+
import java.util.concurrent.CompletableFuture;
3749
import java.util.concurrent.TimeUnit;
50+
import java.util.concurrent.atomic.AtomicReference;
3851

3952
public class TopicTransactionBufferTest extends TransactionTestBase {
4053

@@ -86,4 +99,43 @@ public void testTransactionBufferAppendMarkerWriteFailState() throws Exception {
8699
Whitebox.setInternalState(persistentTopic.getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed);
87100
txn.commit().get();
88101
}
102+
103+
@Test
104+
public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Exception {
105+
String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID();
106+
PulsarService pulsar = pulsarServiceList.get(0);
107+
BrokerService brokerService0 = pulsar.getBrokerService();
108+
BrokerService brokerService = Mockito.spy(brokerService0);
109+
AtomicReference<PersistentTopic> reference = new AtomicReference<>();
110+
111+
Mockito
112+
.doAnswer(inv -> {
113+
String topic1 = inv.getArgument(0);
114+
ManagedLedger ledger = inv.getArgument(1);
115+
BrokerService service = inv.getArgument(2);
116+
if (TopicName.get(topic1).isPersistent()) {
117+
PersistentTopic pt = Mockito.spy(new PersistentTopic(topic1, ledger, service));
118+
CompletableFuture<Void> f = new CompletableFuture<>();
119+
f.completeExceptionally(new ManagedLedgerException("This is an exception"));
120+
Mockito.doReturn(f).when(pt).checkDeduplicationStatus();
121+
reference.set(pt);
122+
return pt;
123+
} else {
124+
return new NonPersistentTopic(topic1, service);
125+
}
126+
})
127+
.when(brokerService)
128+
.newPersistentTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService));
129+
130+
brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>(), Collections.emptyMap());
131+
132+
Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null);
133+
PersistentTopic persistentTopic = reference.get();
134+
TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
135+
Assert.assertTrue(buffer instanceof TopicTransactionBuffer);
136+
TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer;
137+
TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close;
138+
Assert.assertEquals(ttb.getState(), expectState);
139+
}
140+
89141
}

0 commit comments

Comments
 (0)