Skip to content

Commit b774666

Browse files
authored
[fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner (#21948)
1 parent d475655 commit b774666

File tree

7 files changed

+239
-22
lines changed

7 files changed

+239
-22
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ protected void scheduleCheckTopicActiveAndStartProducer(final long waitTimeMs) {
248248
}
249249
startProducer();
250250
}).exceptionally(ex -> {
251-
log.warn("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and"
251+
log.error("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and"
252252
+ " trigger a terminate. Replicator state: {}",
253253
localTopicName, replicatorId, STATE_UPDATER.get(this), ex);
254254
terminate();
@@ -377,9 +377,13 @@ public CompletableFuture<Void> terminate() {
377377
this.producer = null;
378378
// set the cursor as inactive.
379379
disableReplicatorRead();
380+
// release resources.
381+
doReleaseResources();
380382
});
381383
}
382384

385+
protected void doReleaseResources() {}
386+
383387
protected boolean tryChangeStatusToTerminating() {
384388
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Terminating)){
385389
return true;
@@ -468,4 +472,8 @@ protected ImmutablePair<Boolean, State> compareSetAndGetState(State expect, Stat
468472
}
469473
return compareSetAndGetState(expect, update);
470474
}
475+
476+
public boolean isTerminated() {
477+
return state == State.Terminating || state == State.Terminated;
478+
}
471479
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,6 @@ default Optional<DispatchRateLimiter> getRateLimiter() {
5151
boolean isConnected();
5252

5353
long getNumberOfEntriesInBacklog();
54+
55+
boolean isTerminated();
5456
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
450450
long waitTimeMillis = readFailureBackoff.next();
451451

452452
if (exception instanceof CursorAlreadyClosedException) {
453-
log.error("[{}] Error reading entries because replicator is"
453+
log.warn("[{}] Error reading entries because replicator is"
454454
+ " already deleted and cursor is already closed {}, ({})",
455455
replicatorId, ctx, exception.getMessage(), exception);
456456
// replicator is already deleted and cursor is already closed so, producer should also be disconnected.
@@ -570,7 +570,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
570570
log.error("[{}] Failed to delete message at {}: {}", replicatorId, ctx,
571571
exception.getMessage(), exception);
572572
if (exception instanceof CursorAlreadyClosedException) {
573-
log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already"
573+
log.warn("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already"
574574
+ " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception);
575575
// replicator is already deleted and cursor is already closed so, producer should also be disconnected.
576576
terminate();
@@ -698,6 +698,11 @@ public boolean isConnected() {
698698
return producer != null && producer.isConnected();
699699
}
700700

701+
@Override
702+
protected void doReleaseResources() {
703+
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
704+
}
705+
701706
private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class);
702707

703708
@VisibleForTesting

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1731,6 +1731,7 @@ public CompletableFuture<Void> checkReplication() {
17311731
return deleteForcefully();
17321732
}
17331733

1734+
removeTerminatedReplicators(replicators);
17341735
List<CompletableFuture<Void>> futures = new ArrayList<>();
17351736

17361737
// Check for missing replicators
@@ -1769,6 +1770,8 @@ private CompletableFuture<Void> checkShadowReplication() {
17691770
if (log.isDebugEnabled()) {
17701771
log.debug("[{}] Checking shadow replication status, shadowTopics={}", topic, configuredShadowTopics);
17711772
}
1773+
1774+
removeTerminatedReplicators(shadowReplicators);
17721775
List<CompletableFuture<Void>> futures = new ArrayList<>();
17731776

17741777
// Check for missing replicators
@@ -1919,19 +1922,30 @@ protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, Ma
19191922
if (replicationClient == null) {
19201923
return;
19211924
}
1922-
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
1923-
try {
1924-
return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
1925-
remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
1926-
} catch (PulsarServerException e) {
1927-
log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
1925+
lock.readLock().lock();
1926+
try {
1927+
if (isClosingOrDeleting) {
1928+
// Whether is "transferring" or not, do not create new replicator.
1929+
log.info("[{}] Skip to create replicator because this topic is closing."
1930+
+ " remote cluster: {}. State of transferring : {}",
1931+
topic, remoteCluster, transferring);
1932+
return;
19281933
}
1929-
return null;
1930-
});
1931-
1932-
// clean up replicator if startup is failed
1933-
if (replicator == null) {
1934-
replicators.removeNullValue(remoteCluster);
1934+
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
1935+
try {
1936+
return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster,
1937+
remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
1938+
} catch (PulsarServerException e) {
1939+
log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
1940+
}
1941+
return null;
1942+
});
1943+
// clean up replicator if startup is failed
1944+
if (replicator == null) {
1945+
replicators.removeNullValue(remoteCluster);
1946+
}
1947+
} finally {
1948+
lock.readLock().unlock();
19351949
}
19361950
});
19371951
}
@@ -3881,9 +3895,27 @@ private void fenceTopicToCloseOrDelete() {
38813895
}
38823896

38833897
private void unfenceTopicToResume() {
3884-
subscriptions.values().forEach(sub -> sub.resumeAfterFence());
38853898
isFenced = false;
38863899
isClosingOrDeleting = false;
3900+
subscriptions.values().forEach(sub -> sub.resumeAfterFence());
3901+
unfenceReplicatorsToResume();
3902+
}
3903+
3904+
private void unfenceReplicatorsToResume() {
3905+
checkReplication();
3906+
checkShadowReplication();
3907+
}
3908+
3909+
private void removeTerminatedReplicators(ConcurrentOpenHashMap<String, Replicator> replicators) {
3910+
Map<String, Replicator> terminatedReplicators = new HashMap<>();
3911+
replicators.forEach((cluster, replicator) -> {
3912+
if (replicator.isTerminated()) {
3913+
terminatedReplicators.put(cluster, replicator);
3914+
}
3915+
});
3916+
terminatedReplicators.entrySet().forEach(entry -> {
3917+
replicators.remove(entry.getKey(), entry.getValue());
3918+
});
38873919
}
38883920

38893921
@Override

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,21 @@
2020

2121
import static org.mockito.Mockito.any;
2222
import static org.mockito.Mockito.doAnswer;
23+
import static org.mockito.Mockito.mock;
2324
import static org.mockito.Mockito.spy;
2425
import static org.testng.Assert.assertEquals;
2526
import static org.testng.Assert.assertFalse;
2627
import static org.testng.Assert.assertNotEquals;
2728
import static org.testng.Assert.assertTrue;
2829
import static org.testng.Assert.fail;
30+
import com.google.common.collect.Sets;
2931
import io.netty.util.concurrent.FastThreadLocalThread;
3032
import java.lang.reflect.Field;
3133
import java.lang.reflect.Method;
3234
import java.time.Duration;
3335
import java.util.Arrays;
3436
import java.util.Optional;
37+
import java.util.UUID;
3538
import java.util.concurrent.CompletableFuture;
3639
import java.util.concurrent.CountDownLatch;
3740
import java.util.concurrent.TimeUnit;
@@ -48,6 +51,7 @@
4851
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
4952
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
5053
import org.apache.pulsar.broker.BrokerTestUtil;
54+
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
5155
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
5256
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
5357
import org.apache.pulsar.client.api.Consumer;
@@ -486,4 +490,166 @@ public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throw
486490
admin1.topics().deletePartitionedTopic(topicName);
487491
admin2.topics().deletePartitionedTopic(topicName);
488492
}
493+
494+
/**
495+
* See the description and execution flow: https://github.com/apache/pulsar/pull/21948.
496+
* Steps:
497+
* 1.Create topic, does not enable replication now.
498+
* - The topic will be loaded in the memory.
499+
* 2.Enable namespace level replication.
500+
* - Broker creates a replicator, and the internal producer of replicator is starting.
501+
* - We inject an error to make the internal producer fail to connect,after few seconds, it will retry to start.
502+
* 3.Unload bundle.
503+
* - Starting to close the topic.
504+
* - The replicator will be closed, but it will not close the internal producer, because the producer has not
505+
* been created successfully.
506+
* - We inject a sleeping into the progress of closing the "repl.cursor" to make it stuck. So the topic is still
507+
* in the process of being closed now.
508+
* 4.Internal producer retry to connect.
509+
* - At the next retry, it connected successful. Since the state of "repl.cursor" is not "Closed", this producer
510+
* will not be closed now.
511+
* 5.Topic closed.
512+
* - Cancel the stuck of closing the "repl.cursor".
513+
* - The topic is wholly closed.
514+
* 6.Verify: the delayed created internal producer will be closed. In other words, there is no producer is connected
515+
* to the remote cluster.
516+
*/
517+
@Test
518+
public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception {
519+
final String namespaceName = defaultTenant + "/" + UUID.randomUUID().toString().replaceAll("-", "");
520+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespaceName + "/tp_");
521+
// 1.Create topic, does not enable replication now.
522+
admin1.namespaces().createNamespace(namespaceName);
523+
admin2.namespaces().createNamespace(namespaceName);
524+
admin1.topics().createNonPartitionedTopic(topicName);
525+
PersistentTopic persistentTopic =
526+
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
527+
528+
// We inject an error to make the internal producer fail to connect.
529+
// The delay time of next retry to create producer is below:
530+
// 0.1s, 0.2, 0.4, 0.8, 1.6s, 3.2s, 6.4s...
531+
// If the retry counter is larger than 6, the next creation will be slow enough to close Replicator.
532+
final AtomicInteger createProducerCounter = new AtomicInteger();
533+
final int failTimes = 6;
534+
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
535+
if (topicName.equals(producerCnf.getTopicName())) {
536+
// There is a switch to determine create producer successfully or not.
537+
if (createProducerCounter.incrementAndGet() > failTimes) {
538+
return originalProducer;
539+
}
540+
log.info("Retry create replicator.producer count: {}", createProducerCounter);
541+
// Release producer and fail callback.
542+
originalProducer.closeAsync();
543+
throw new RuntimeException("mock error");
544+
}
545+
return originalProducer;
546+
});
547+
548+
// 2.Enable namespace level replication.
549+
admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1, cluster2));
550+
AtomicReference<PersistentReplicator> replicator = new AtomicReference<PersistentReplicator>();
551+
Awaitility.await().untilAsserted(() -> {
552+
assertFalse(persistentTopic.getReplicators().isEmpty());
553+
replicator.set(
554+
(PersistentReplicator) persistentTopic.getReplicators().values().iterator().next());
555+
// Since we inject a producer creation error, the replicator can not start successfully.
556+
assertFalse(replicator.get().isConnected());
557+
});
558+
559+
// We inject a sleeping into the progress of closing the "repl.cursor" to make it stuck, until the internal
560+
// producer of the replicator started.
561+
SpyCursor spyCursor =
562+
spyCursor(persistentTopic, "pulsar.repl." + pulsar2.getConfig().getClusterName());
563+
CursorCloseSignal cursorCloseSignal = makeCursorClosingDelay(spyCursor);
564+
565+
// 3.Unload bundle: call "topic.close(false)".
566+
// Stuck start new producer, until the state of replicator change to Stopped.
567+
// The next once of "createProducerSuccessAfterFailTimes" to create producer will be successfully.
568+
Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
569+
assertTrue(createProducerCounter.get() >= failTimes);
570+
});
571+
CompletableFuture<Void> topicCloseFuture = persistentTopic.close(true);
572+
Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
573+
String state = String.valueOf(replicator.get().getState());
574+
log.error("replicator state: {}", state);
575+
assertTrue(state.equals("Disconnected") || state.equals("Terminated"));
576+
});
577+
578+
// 5.Delay close cursor, until "replicator.producer" create successfully.
579+
// The next once retry time of create "replicator.producer" will be 3.2s.
580+
Thread.sleep(4 * 1000);
581+
log.info("Replicator.state: {}", replicator.get().getState());
582+
cursorCloseSignal.startClose();
583+
cursorCloseSignal.startCallback();
584+
// Wait for topic close successfully.
585+
topicCloseFuture.join();
586+
587+
// 6. Verify there is no orphan producer on the remote cluster.
588+
Awaitility.await().pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
589+
PersistentTopic persistentTopic2 =
590+
(PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
591+
assertEquals(persistentTopic2.getProducers().size(), 0);
592+
Assert.assertFalse(replicator.get().isConnected());
593+
});
594+
595+
// cleanup.
596+
cleanupTopics(namespaceName, () -> {
597+
admin1.topics().delete(topicName);
598+
admin2.topics().delete(topicName);
599+
});
600+
admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1));
601+
admin1.namespaces().deleteNamespace(namespaceName);
602+
admin2.namespaces().deleteNamespace(namespaceName);
603+
}
604+
605+
@Test
606+
public void testUnFenceTopicToReuse() throws Exception {
607+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp");
608+
// Wait for replicator started.
609+
Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topicName).create();
610+
waitReplicatorStarted(topicName);
611+
612+
// Inject an error to make topic close fails.
613+
final String mockProducerName = UUID.randomUUID().toString();
614+
final org.apache.pulsar.broker.service.Producer mockProducer =
615+
mock(org.apache.pulsar.broker.service.Producer.class);
616+
doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error")))
617+
.when(mockProducer).disconnect(any());
618+
doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error")))
619+
.when(mockProducer).disconnect();
620+
PersistentTopic persistentTopic =
621+
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
622+
persistentTopic.getProducers().put(mockProducerName, mockProducer);
623+
624+
// Do close.
625+
GeoPersistentReplicator replicator1 =
626+
(GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next();
627+
try {
628+
persistentTopic.close(true, false).join();
629+
fail("Expected close fails due to a producer close fails");
630+
} catch (Exception ex) {
631+
log.info("Expected error: {}", ex.getMessage());
632+
}
633+
634+
// Broker will call `topic.unfenceTopicToResume` if close clients fails.
635+
// Verify: the replicator will be re-created.
636+
Awaitility.await().untilAsserted(() -> {
637+
assertTrue(producer1.isConnected());
638+
GeoPersistentReplicator replicator2 =
639+
(GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next();
640+
assertNotEquals(replicator1, replicator2);
641+
assertFalse(replicator1.isConnected());
642+
assertFalse(replicator1.producer != null && replicator1.producer.isConnected());
643+
assertTrue(replicator2.isConnected());
644+
assertTrue(replicator2.producer != null && replicator2.producer.isConnected());
645+
});
646+
647+
// cleanup.
648+
persistentTopic.getProducers().remove(mockProducerName, mockProducer);
649+
producer1.close();
650+
cleanupTopics(() -> {
651+
admin1.topics().delete(topicName);
652+
admin2.topics().delete(topicName);
653+
});
654+
}
489655
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -150,12 +150,16 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
150150
}
151151

152152
protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws Exception {
153-
waitChangeEventsInit(replicatedNamespace);
154-
admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Collections.singleton(cluster1));
155-
admin1.namespaces().unload(replicatedNamespace);
153+
cleanupTopics(replicatedNamespace, cleanupTopicAction);
154+
}
155+
156+
protected void cleanupTopics(String namespace, CleanupTopicAction cleanupTopicAction) throws Exception {
157+
waitChangeEventsInit(namespace);
158+
admin1.namespaces().setNamespaceReplicationClusters(namespace, Collections.singleton(cluster1));
159+
admin1.namespaces().unload(namespace);
156160
cleanupTopicAction.run();
157-
admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster1, cluster2));
158-
waitChangeEventsInit(replicatedNamespace);
161+
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2));
162+
waitChangeEventsInit(namespace);
159163
}
160164

161165
protected void waitChangeEventsInit(String namespace) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public Object[][] partitionedTopicProvider() {
152152
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
153153
}
154154

155-
@Test
155+
@Test(priority = Integer.MAX_VALUE)
156156
public void testConfigChange() throws Exception {
157157
log.info("--- Starting ReplicatorTest::testConfigChange ---");
158158
// This test is to verify that the config change on global namespace is successfully applied in broker during

0 commit comments

Comments
 (0)