Skip to content

Commit 1506b48

Browse files
committed
fix test
1 parent efc2890 commit 1506b48

File tree

2 files changed

+122
-5
lines changed

2 files changed

+122
-5
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@
2525
import static org.testng.Assert.assertFalse;
2626
import static org.testng.Assert.assertNotEquals;
2727
import static org.testng.Assert.assertTrue;
28+
import com.google.common.collect.Sets;
2829
import io.netty.util.concurrent.FastThreadLocalThread;
2930
import java.lang.reflect.Field;
3031
import java.lang.reflect.Method;
3132
import java.time.Duration;
3233
import java.util.Optional;
34+
import java.util.UUID;
3335
import java.util.concurrent.CompletableFuture;
3436
import java.util.concurrent.CountDownLatch;
3537
import java.util.concurrent.TimeUnit;
@@ -409,4 +411,115 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception
409411
admin2.topics().delete(topicName);
410412
});
411413
}
414+
415+
/**
416+
* See the description and execution flow: https://github.com/apache/pulsar/pull/21948.
417+
* Steps:
418+
* 1.Create topic, does not enable replication now.
419+
* - The topic will be loaded in the memory.
420+
* 2.Enable namespace level replication.
421+
* - Broker creates a replicator, and the internal producer of replicator is starting.
422+
* - We inject an error to make the internal producer fail to connect,after few seconds, it will retry to start.
423+
* 3.Unload bundle.
424+
* - Starting to close the topic.
425+
* - The replicator will be closed, but it will not close the internal producer, because the producer has not
426+
* been created successfully.
427+
* - We inject a sleeping into the progress of closing the "repl.cursor" to make it stuck. So the topic is still
428+
* in the process of being closed now.
429+
* 4.Internal producer retry to connect.
430+
* - At the next retry, it connected successful. Since the state of "repl.cursor" is not "Closed", this producer
431+
* will not be closed now.
432+
* 5.Topic closed.
433+
* - Cancel the stuck of closing the "repl.cursor".
434+
* - The topic is wholly closed.
435+
* 6.Verify: the delayed created internal producer will be closed. In other words, there is no producer is connected
436+
* to the remote cluster.
437+
*/
438+
@Test
439+
public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception {
440+
final String namespaceName = defaultTenant + "/" + UUID.randomUUID().toString().replaceAll("-", "");
441+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespaceName + "/tp_");
442+
// 1.Create topic, does not enable replication now.
443+
admin1.namespaces().createNamespace(namespaceName);
444+
admin2.namespaces().createNamespace(namespaceName);
445+
admin1.topics().createNonPartitionedTopic(topicName);
446+
PersistentTopic persistentTopic =
447+
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
448+
449+
// We inject an error to make the internal producer fail to connect.
450+
// The delay time of next retry to create producer is below:
451+
// 0.1s, 0.2, 0.4, 0.8, 1.6s, 3.2s, 6.4s...
452+
// If the retry counter is larger than 6, the next creation will be slow enough to close Replicator.
453+
final AtomicInteger createProducerCounter = new AtomicInteger();
454+
final int failTimes = 6;
455+
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
456+
if (topicName.equals(producerCnf.getTopicName())) {
457+
// There is a switch to determine create producer successfully or not.
458+
if (createProducerCounter.incrementAndGet() > failTimes) {
459+
return originalProducer;
460+
}
461+
log.info("Retry create replicator.producer count: {}", createProducerCounter);
462+
// Release producer and fail callback.
463+
originalProducer.closeAsync();
464+
throw new RuntimeException("mock error");
465+
}
466+
return originalProducer;
467+
});
468+
469+
// 2.Enable namespace level replication.
470+
admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1, cluster2));
471+
AtomicReference<PersistentReplicator> replicator = new AtomicReference<PersistentReplicator>();
472+
Awaitility.await().untilAsserted(() -> {
473+
assertFalse(persistentTopic.getReplicators().isEmpty());
474+
replicator.set(
475+
(PersistentReplicator) persistentTopic.getReplicators().values().iterator().next());
476+
// Since we inject a producer creation error, the replicator can not start successfully.
477+
assertFalse(replicator.get().isConnected());
478+
});
479+
480+
// We inject a sleeping into the progress of closing the "repl.cursor" to make it stuck, until the internal
481+
// producer of the replicator started.
482+
SpyCursor spyCursor =
483+
spyCursor(persistentTopic, "pulsar.repl." + pulsar2.getConfig().getClusterName());
484+
CursorCloseSignal cursorCloseSignal = makeCursorClosingDelay(spyCursor);
485+
486+
// 3.Unload bundle: call "topic.close(false)".
487+
// Stuck start new producer, until the state of replicator change to Stopped.
488+
// The next once of "createProducerSuccessAfterFailTimes" to create producer will be successfully.
489+
Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
490+
assertTrue(createProducerCounter.get() >= failTimes);
491+
});
492+
CompletableFuture<Void> topicCloseFuture = persistentTopic.close(true);
493+
Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
494+
String state = String.valueOf(replicator.get().getState());
495+
log.error("replicator state: {}", state);
496+
assertTrue(state.equals("Disconnected") || state.equals("Terminated"));
497+
});
498+
499+
// 5.Delay close cursor, until "replicator.producer" create successfully.
500+
// The next once retry time of create "replicator.producer" will be 3.2s.
501+
Thread.sleep(4 * 1000);
502+
log.info("Replicator.state: {}", replicator.get().getState());
503+
cursorCloseSignal.startClose();
504+
cursorCloseSignal.startCallback();
505+
// Wait for topic close successfully.
506+
topicCloseFuture.join();
507+
508+
// 6. Verify there is no orphan producer on the remote cluster.
509+
Awaitility.await().pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
510+
PersistentTopic persistentTopic2 =
511+
(PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
512+
assertEquals(persistentTopic2.getProducers().size(), 0);
513+
Assert.assertFalse(replicator.get().isConnected());
514+
});
515+
516+
// cleanup.
517+
cleanupTopics(namespaceName, () -> {
518+
admin1.topics().delete(topicName);
519+
admin2.topics().delete(topicName);
520+
});
521+
admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1));
522+
admin1.namespaces().deleteNamespace(namespaceName);
523+
admin2.namespaces().deleteNamespace(namespaceName);
524+
}
412525
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,12 +147,16 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
147147
}
148148

149149
protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws Exception {
150-
waitChangeEventsInit(defaultNamespace);
151-
admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, Collections.singleton(cluster1));
152-
admin1.namespaces().unload(defaultNamespace);
150+
cleanupTopics(defaultNamespace, cleanupTopicAction);
151+
}
152+
153+
protected void cleanupTopics(String namespace, CleanupTopicAction cleanupTopicAction) throws Exception {
154+
waitChangeEventsInit(namespace);
155+
admin1.namespaces().setNamespaceReplicationClusters(namespace, Collections.singleton(cluster1));
156+
admin1.namespaces().unload(namespace);
153157
cleanupTopicAction.run();
154-
admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, Sets.newHashSet(cluster1, cluster2));
155-
waitChangeEventsInit(defaultNamespace);
158+
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2));
159+
waitChangeEventsInit(namespace);
156160
}
157161

158162
protected void waitChangeEventsInit(String namespace) {

0 commit comments

Comments
 (0)