Skip to content

Commit 843bd56

Browse files
poorbarcode authored and nodece committed
[fix][broker] Replication stuck when partitions count between two clusters is not the same (apache#22983)
(cherry picked from commit a8ce990) Signed-off-by: Zixuan Liu <nodeces@gmail.com>
1 parent 349c0e9 commit 843bd56

11 files changed

Lines changed: 279 additions & 29 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -369,8 +369,10 @@ private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(Set<String>
369369
}
370370
List<CompletableFuture<Void>> futures = new ArrayList<>();
371371
for (String topicName : topicNames) {
372-
futures.add(namespaceResources().getPartitionedTopicResources()
373-
.deletePartitionedTopicAsync(TopicName.get(topicName)));
372+
TopicName tn = TopicName.get(topicName);
373+
futures.add(pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
374+
.runWithMarkDeleteAsync(tn,
375+
() -> namespaceResources().getPartitionedTopicResources().deletePartitionedTopicAsync(tn)));
374376
}
375377
return FutureUtil.waitForAll(futures);
376378
}
@@ -442,7 +444,8 @@ private CompletableFuture<Policies> precheckWhenDeleteNamespace(NamespaceName ns
442444
replClusterUrl = new URL(replClusterData.getServiceUrlTls());
443445
} else {
444446
throw new RestException(Status.PRECONDITION_FAILED,
445-
"The replication cluster does not provide TLS encrypted service");
447+
"The replication cluster does not provide TLS encrypted "
448+
+ "service");
446449
}
447450
} catch (MalformedURLException checkedEx) {
448451
throw new RestException(checkedEx);

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java

Lines changed: 0 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,6 @@
4646
import javax.ws.rs.core.Response.Status;
4747
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
4848
import org.apache.pulsar.broker.web.RestException;
49-
import org.apache.pulsar.client.admin.PulsarAdminException;
5049
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
5150
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
5251
import org.apache.pulsar.common.naming.NamespaceName;
@@ -69,7 +68,6 @@
6968
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
7069
import org.apache.pulsar.common.policies.data.TenantOperation;
7170
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
72-
import org.apache.pulsar.common.util.FutureUtil;
7371
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
7472
import org.slf4j.Logger;
7573
import org.slf4j.LoggerFactory;
@@ -211,12 +209,6 @@ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathP
211209
asyncResponse.resume(Response.noContent().build());
212210
})
213211
.exceptionally(ex -> {
214-
Throwable cause = FutureUtil.unwrapCompletionException(ex);
215-
if (cause instanceof PulsarAdminException.ConflictException) {
216-
log.info("[{}] There are new topics created during the namespace deletion, "
217-
+ "retry to delete the namespace again.", namespaceName);
218-
pulsar().getExecutor().execute(() -> internalDeleteNamespaceAsync(force));
219-
}
220212
if (!isRedirectException(ex)) {
221213
log.error("[{}] Failed to delete namespace {}", clientAppId(), namespaceName, ex);
222214
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java

Lines changed: 0 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -45,7 +45,6 @@
4545
import javax.ws.rs.core.Response;
4646
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
4747
import org.apache.pulsar.broker.web.RestException;
48-
import org.apache.pulsar.client.admin.PulsarAdminException;
4948
import org.apache.pulsar.client.api.SubscriptionType;
5049
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
5150
import org.apache.pulsar.common.naming.NamespaceName;
@@ -72,7 +71,6 @@
7271
import org.apache.pulsar.common.policies.data.SubscribeRate;
7372
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
7473
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
75-
import org.apache.pulsar.common.util.FutureUtil;
7674
import org.slf4j.Logger;
7775
import org.slf4j.LoggerFactory;
7876

@@ -161,12 +159,6 @@ public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathP
161159
asyncResponse.resume(Response.noContent().build());
162160
})
163161
.exceptionally(ex -> {
164-
Throwable cause = FutureUtil.unwrapCompletionException(ex);
165-
if (cause instanceof PulsarAdminException.ConflictException) {
166-
log.info("[{}] There are new topics created during the namespace deletion, "
167-
+ "retry to delete the namespace again.", namespaceName);
168-
pulsar().getExecutor().execute(() -> internalDeleteNamespaceAsync(force));
169-
}
170162
if (!isRedirectException(ex)) {
171163
log.error("[{}] Failed to delete namespace {}", clientAppId(), namespaceName, ex);
172164
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@
3030
import org.apache.pulsar.client.api.ProducerBuilder;
3131
import org.apache.pulsar.client.api.Schema;
3232
import org.apache.pulsar.client.impl.Backoff;
33+
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
3334
import org.apache.pulsar.client.impl.ProducerImpl;
3435
import org.apache.pulsar.client.impl.PulsarClientImpl;
3536
import org.apache.pulsar.common.naming.TopicName;
@@ -140,6 +141,10 @@ public synchronized void startProducer() {
140141
}
141142

142143
log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster);
144+
// Force only replicate messages to a non-partitioned topic, to avoid auto-create a partitioned topic on
145+
// the remote cluster.
146+
ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder;
147+
builderImpl.getConf().setNonPartitionedTopicExpected(true);
143148
producerBuilder.createAsync().thenAccept(producer -> {
144149
readEntries(producer);
145150
}).exceptionally(ex -> {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -145,6 +145,11 @@ protected void readEntries(org.apache.pulsar.client.api.Producer<byte[]> produce
145145

146146
cursor.cancelPendingReadRequest();
147147
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
148+
if (!(producer instanceof ProducerImpl)) {
149+
log.error("[{}] The partitions count between two clusters is not the same, the replicator can not be"
150+
+ " created successfully: {}", replicatorId);
151+
throw new ClassCastException(producer.getClass().getName() + " can not be cast to ProducerImpl");
152+
}
148153
this.producer = (ProducerImpl) producer;
149154

150155
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -38,10 +38,11 @@
3838
import org.apache.pulsar.broker.PulsarService;
3939
import org.apache.pulsar.broker.ServiceConfiguration;
4040
import org.apache.pulsar.client.api.Producer;
41-
import org.apache.pulsar.client.api.ProducerBuilder;
4241
import org.apache.pulsar.client.api.Schema;
4342
import org.apache.pulsar.client.impl.ConnectionPool;
43+
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
4444
import org.apache.pulsar.client.impl.PulsarClientImpl;
45+
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
4546
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
4647
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
4748
import org.awaitility.Awaitility;
@@ -69,7 +70,8 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
6970
when(localClient.getCnxPool()).thenReturn(connectionPool);
7071
final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
7172
when(remoteClient.getCnxPool()).thenReturn(connectionPool);
72-
final ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
73+
final ProducerConfigurationData producerConf = new ProducerConfigurationData();
74+
final ProducerBuilderImpl producerBuilder = mock(ProducerBuilderImpl.class);
7375
final ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>();
7476
when(broker.executor()).thenReturn(eventLoopGroup);
7577
when(broker.getTopics()).thenReturn(topics);
@@ -85,6 +87,7 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
8587
when(producerBuilder.sendTimeout(anyInt(), any())).thenReturn(producerBuilder);
8688
when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder);
8789
when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder);
90+
when(producerBuilder.getConf()).thenReturn(producerConf);
8891
// Mock create producer fail.
8992
when(producerBuilder.create()).thenThrow(new RuntimeException("mocked ex"));
9093
CompletableFuture failedFuture = CompletableFuture.supplyAsync(() -> {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

Lines changed: 97 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,18 +20,23 @@
2020

2121
import static org.assertj.core.api.Assertions.assertThat;
2222
import static org.testng.Assert.assertEquals;
23+
import static org.testng.Assert.assertFalse;
2324
import static org.testng.Assert.assertNotEquals;
2425
import static org.testng.Assert.assertNotNull;
2526
import static org.testng.Assert.assertNull;
27+
import static org.testng.Assert.assertTrue;
2628
import java.lang.reflect.Field;
2729
import java.time.Duration;
2830
import java.util.Arrays;
2931
import java.util.Collections;
3032
import java.util.HashSet;
33+
import java.util.List;
3134
import java.util.Set;
3235
import java.util.UUID;
3336
import java.util.concurrent.TimeUnit;
37+
import java.util.function.Predicate;
3438
import java.util.function.Supplier;
39+
import java.util.stream.Collectors;
3540
import lombok.extern.slf4j.Slf4j;
3641
import org.apache.pulsar.broker.BrokerTestUtil;
3742
import org.apache.pulsar.broker.resources.ClusterResources;
@@ -44,11 +49,15 @@
4449
import org.apache.pulsar.client.api.Producer;
4550
import org.apache.pulsar.client.api.PulsarClient;
4651
import org.apache.pulsar.client.api.Schema;
52+
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
4753
import org.apache.pulsar.client.impl.ProducerImpl;
54+
import org.apache.pulsar.common.naming.TopicName;
55+
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
4856
import org.apache.pulsar.common.policies.data.ClusterData;
4957
import org.apache.pulsar.common.policies.data.RetentionPolicies;
5058
import org.apache.pulsar.common.policies.data.TenantInfo;
5159
import org.apache.pulsar.common.policies.data.TopicStats;
60+
import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
5261
import org.apache.pulsar.common.util.FutureUtil;
5362
import org.awaitility.Awaitility;
5463
import org.junit.Assert;
@@ -346,6 +355,7 @@ public void testConfigReplicationStartAt() throws Exception {
346355
enableReplication(topic2);
347356
// Verify: since the replication was started at earliest, there is one message to consume.
348357
Consumer<String> c2 = client2.newConsumer(Schema.STRING).topic(topic2).subscriptionName(subscription1)
358+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
349359
.subscribe();
350360
Message<String> msg2 = c2.receive(2, TimeUnit.SECONDS);
351361
assertNotNull(msg2);
@@ -387,4 +397,91 @@ public void testConfigReplicationStartAt() throws Exception {
387397
admin1.topics().delete(topic3, false);
388398
admin2.topics().delete(topic3, false);
389399
}
400+
401+
@DataProvider(name = "replicationModes")
402+
public Object[][] replicationModes() {
403+
return new Object[][]{
404+
{ReplicationMode.OneWay},
405+
{ReplicationMode.DoubleWay}
406+
};
407+
}
408+
409+
protected enum ReplicationMode {
410+
OneWay,
411+
DoubleWay;
412+
}
413+
414+
@Test(dataProvider = "replicationModes")
415+
public void testDifferentTopicCreationRule(ReplicationMode replicationMode) throws Exception {
416+
String ns = defaultTenant + "/" + UUID.randomUUID().toString().replace("-", "");
417+
admin1.namespaces().createNamespace(ns);
418+
admin2.namespaces().createNamespace(ns);
419+
420+
// Set topic auto-creation rule.
421+
// c1: no-partitioned topic
422+
// c2: partitioned topic with 2 partitions.
423+
AutoTopicCreationOverride autoTopicCreation =
424+
AutoTopicCreationOverrideImpl.builder().allowAutoTopicCreation(true)
425+
.topicType("partitioned").defaultNumPartitions(2).build();
426+
admin2.namespaces().setAutoTopicCreation(ns, autoTopicCreation);
427+
Awaitility.await().untilAsserted(() -> {
428+
assertEquals(admin2.namespaces().getAutoTopicCreationAsync(ns).join().getDefaultNumPartitions(),
429+
Integer.valueOf(2));
430+
// Trigger system topic __change_event's initialize.
431+
pulsar2.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get("persistent://" + ns + "/1"));
432+
});
433+
434+
// Create non-partitioned topic.
435+
// Enable replication.
436+
final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp_");
437+
admin1.topics().createNonPartitionedTopic(tp);
438+
admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2)));
439+
if (replicationMode.equals(ReplicationMode.DoubleWay)) {
440+
admin2.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2)));
441+
}
442+
443+
// Trigger and wait for replicator starts.
444+
Producer<String> p1 = client1.newProducer(Schema.STRING).topic(tp).create();
445+
p1.send("msg-1");
446+
p1.close();
447+
Awaitility.await().untilAsserted(() -> {
448+
PersistentTopic persistentTopic = (PersistentTopic) broker1.getTopic(tp, false).join().get();
449+
assertFalse(persistentTopic.getReplicators().isEmpty());
450+
});
451+
452+
// Verify: the topics are the same between two clusters.
453+
Predicate<String> topicNameFilter = t -> {
454+
TopicName topicName = TopicName.get(t);
455+
if (!topicName.getNamespace().equals(ns)) {
456+
return false;
457+
}
458+
return t.startsWith(tp);
459+
};
460+
Awaitility.await().untilAsserted(() -> {
461+
List<String> topics1 = pulsar1.getBrokerService().getTopics().keys()
462+
.stream().filter(topicNameFilter).collect(Collectors.toList());
463+
List<String> topics2 = pulsar2.getBrokerService().getTopics().keys()
464+
.stream().filter(topicNameFilter).collect(Collectors.toList());
465+
Collections.sort(topics1);
466+
Collections.sort(topics2);
467+
assertEquals(topics1, topics2);
468+
});
469+
470+
// cleanup.
471+
admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1)));
472+
if (replicationMode.equals(ReplicationMode.DoubleWay)) {
473+
admin2.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster2)));
474+
}
475+
Awaitility.await().untilAsserted(() -> {
476+
PersistentTopic persistentTopic = (PersistentTopic) broker1.getTopic(tp, false).join().get();
477+
assertTrue(persistentTopic.getReplicators().isEmpty());
478+
if (replicationMode.equals(ReplicationMode.DoubleWay)) {
479+
assertTrue(persistentTopic.getReplicators().isEmpty());
480+
}
481+
});
482+
admin1.topics().delete(tp, false);
483+
admin2.topics().delete(tp, false);
484+
admin1.namespaces().deleteNamespace(ns, true);
485+
admin2.namespaces().deleteNamespace(ns, true);
486+
}
390487
}

0 commit comments

Comments
 (0)