Skip to content

Commit 326271a

Browse files
poorbarcode authored and srinath-ctds committed
[fix][broker] Replication stuck when partitions count between two clusters is not the same (apache#22983)
(cherry picked from commit a8ce990) (cherry picked from commit 25542d8)
1 parent 4c58701 commit 326271a

8 files changed

Lines changed: 273 additions & 11 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,7 @@
3636
import org.apache.pulsar.client.api.Producer;
3737
import org.apache.pulsar.client.api.ProducerBuilder;
3838
import org.apache.pulsar.client.api.Schema;
39+
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
3940
import org.apache.pulsar.client.impl.ProducerImpl;
4041
import org.apache.pulsar.client.impl.PulsarClientImpl;
4142
import org.apache.pulsar.common.naming.TopicName;
@@ -166,6 +167,10 @@ public void startProducer() {
166167
}
167168

168169
log.info("[{}] Starting replicator", replicatorId);
170+
// Force only replicate messages to a non-partitioned topic, to avoid auto-create a partitioned topic on
171+
// the remote cluster.
172+
ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder;
173+
builderImpl.getConf().setNonPartitionedTopicExpected(true);
169174
producerBuilder.createAsync().thenAccept(producer -> {
170175
setProducerAndTriggerReadEntries(producer);
171176
}).exceptionally(ex -> {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,12 @@ protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
150150
Pair<Boolean, State> changeStateRes;
151151
changeStateRes = compareSetAndGetState(Starting, Started);
152152
if (changeStateRes.getLeft()) {
153+
if (!(producer instanceof ProducerImpl)) {
154+
log.error("[{}] The partitions count between two clusters is not the same, the replicator can not be"
155+
+ " created successfully: {}", replicatorId, state);
156+
doCloseProducerAsync(producer, () -> {});
157+
throw new ClassCastException(producer.getClass().getName() + " can not be cast to ProducerImpl");
158+
}
153159
this.producer = (ProducerImpl) producer;
154160
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
155161
// Trigger a new read.

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,11 @@
3838
import org.apache.pulsar.broker.PulsarService;
3939
import org.apache.pulsar.broker.ServiceConfiguration;
4040
import org.apache.pulsar.client.api.Producer;
41-
import org.apache.pulsar.client.api.ProducerBuilder;
4241
import org.apache.pulsar.client.api.Schema;
4342
import org.apache.pulsar.client.impl.ConnectionPool;
43+
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
4444
import org.apache.pulsar.client.impl.PulsarClientImpl;
45+
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
4546
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
4647
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
4748
import org.awaitility.Awaitility;
@@ -69,7 +70,8 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
6970
when(localClient.getCnxPool()).thenReturn(connectionPool);
7071
final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
7172
when(remoteClient.getCnxPool()).thenReturn(connectionPool);
72-
final ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
73+
final ProducerConfigurationData producerConf = new ProducerConfigurationData();
74+
final ProducerBuilderImpl producerBuilder = mock(ProducerBuilderImpl.class);
7375
final ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>();
7476
when(broker.executor()).thenReturn(eventLoopGroup);
7577
when(broker.getTopics()).thenReturn(topics);
@@ -85,6 +87,7 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
8587
when(producerBuilder.sendTimeout(anyInt(), any())).thenReturn(producerBuilder);
8688
when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder);
8789
when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder);
90+
when(producerBuilder.getConf()).thenReturn(producerConf);
8891
// Mock create producer fail.
8992
when(producerBuilder.create()).thenThrow(new RuntimeException("mocked ex"));
9093
when(producerBuilder.createAsync())

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.Arrays;
3838
import java.util.Collections;
3939
import java.util.HashSet;
40+
import java.util.List;
4041
import java.util.Optional;
4142
import java.util.Set;
4243
import java.util.UUID;
@@ -47,7 +48,9 @@
4748
import java.util.concurrent.atomic.AtomicInteger;
4849
import java.util.concurrent.atomic.AtomicReference;
4950
import java.util.function.BiFunction;
51+
import java.util.function.Predicate;
5052
import java.util.function.Supplier;
53+
import java.util.stream.Collectors;
5154
import lombok.AllArgsConstructor;
5255
import lombok.Data;
5356
import lombok.SneakyThrows;
@@ -76,11 +79,13 @@
7679
import org.apache.pulsar.common.naming.TopicName;
7780
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
7881
import org.apache.pulsar.common.policies.data.ClusterData;
82+
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
7983
import org.apache.pulsar.common.policies.data.RetentionPolicies;
8084
import org.apache.pulsar.common.policies.data.TenantInfo;
8185
import org.apache.pulsar.common.policies.data.TopicStats;
82-
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
86+
import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
8387
import org.apache.pulsar.common.util.FutureUtil;
88+
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
8489
import org.awaitility.Awaitility;
8590
import org.awaitility.reflect.WhiteboxImpl;
8691
import org.mockito.Mockito;
@@ -1037,4 +1042,90 @@ public void testConfigReplicationStartAt() throws Exception {
10371042
admin1.topics().delete(topic3, false);
10381043
admin2.topics().delete(topic3, false);
10391044
}
1045+
1046+
@DataProvider(name = "replicationModes")
1047+
public Object[][] replicationModes() {
1048+
return new Object[][]{
1049+
{ReplicationMode.OneWay},
1050+
{ReplicationMode.DoubleWay}
1051+
};
1052+
}
1053+
1054+
protected enum ReplicationMode {
1055+
OneWay,
1056+
DoubleWay;
1057+
}
1058+
1059+
@Test(dataProvider = "replicationModes")
1060+
public void testDifferentTopicCreationRule(ReplicationMode replicationMode) throws Exception {
1061+
String ns = defaultTenant + "/" + UUID.randomUUID().toString().replace("-", "");
1062+
admin1.namespaces().createNamespace(ns);
1063+
admin2.namespaces().createNamespace(ns);
1064+
1065+
// Set topic auto-creation rule.
1066+
// c1: no-partitioned topic
1067+
// c2: partitioned topic with 2 partitions.
1068+
AutoTopicCreationOverride autoTopicCreation =
1069+
AutoTopicCreationOverrideImpl.builder().allowAutoTopicCreation(true)
1070+
.topicType("partitioned").defaultNumPartitions(2).build();
1071+
admin2.namespaces().setAutoTopicCreation(ns, autoTopicCreation);
1072+
Awaitility.await().untilAsserted(() -> {
1073+
assertEquals(admin2.namespaces().getAutoTopicCreationAsync(ns).join().getDefaultNumPartitions(), 2);
1074+
// Trigger system topic __change_event's initialize.
1075+
pulsar2.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get("persistent://" + ns + "/1"));
1076+
});
1077+
1078+
// Create non-partitioned topic.
1079+
// Enable replication.
1080+
final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp_");
1081+
admin1.topics().createNonPartitionedTopic(tp);
1082+
admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2)));
1083+
if (replicationMode.equals(ReplicationMode.DoubleWay)) {
1084+
admin2.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2)));
1085+
}
1086+
1087+
// Trigger and wait for replicator starts.
1088+
Producer<String> p1 = client1.newProducer(Schema.STRING).topic(tp).create();
1089+
p1.send("msg-1");
1090+
p1.close();
1091+
Awaitility.await().untilAsserted(() -> {
1092+
PersistentTopic persistentTopic = (PersistentTopic) broker1.getTopic(tp, false).join().get();
1093+
assertFalse(persistentTopic.getReplicators().isEmpty());
1094+
});
1095+
1096+
// Verify: the topics are the same between two clusters.
1097+
Predicate<String> topicNameFilter = t -> {
1098+
TopicName topicName = TopicName.get(t);
1099+
if (!topicName.getNamespace().equals(ns)) {
1100+
return false;
1101+
}
1102+
return t.startsWith(tp);
1103+
};
1104+
Awaitility.await().untilAsserted(() -> {
1105+
List<String> topics1 = pulsar1.getBrokerService().getTopics().keys()
1106+
.stream().filter(topicNameFilter).collect(Collectors.toList());
1107+
List<String> topics2 = pulsar2.getBrokerService().getTopics().keys()
1108+
.stream().filter(topicNameFilter).collect(Collectors.toList());
1109+
Collections.sort(topics1);
1110+
Collections.sort(topics2);
1111+
assertEquals(topics1, topics2);
1112+
});
1113+
1114+
// cleanup.
1115+
admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1)));
1116+
if (replicationMode.equals(ReplicationMode.DoubleWay)) {
1117+
admin2.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster2)));
1118+
}
1119+
Awaitility.await().untilAsserted(() -> {
1120+
PersistentTopic persistentTopic = (PersistentTopic) broker1.getTopic(tp, false).join().get();
1121+
assertTrue(persistentTopic.getReplicators().isEmpty());
1122+
if (replicationMode.equals(ReplicationMode.DoubleWay)) {
1123+
assertTrue(persistentTopic.getReplicators().isEmpty());
1124+
}
1125+
});
1126+
admin1.topics().delete(tp, false);
1127+
admin2.topics().delete(tp, false);
1128+
admin1.namespaces().deleteNamespace(ns);
1129+
admin2.namespaces().deleteNamespace(ns);
1130+
}
10401131
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,4 +161,10 @@ public void testConfigReplicationStartAt() throws Exception {
161161
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("latest");
162162
});
163163
}
164+
165+
@Test(enabled = false)
166+
@Override
167+
public void testDifferentTopicCreationRule(ReplicationMode replicationMode) throws Exception {
168+
super.testDifferentTopicCreationRule(replicationMode);
169+
}
164170
}
pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java

Lines changed: 118 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api;
20+
21+
import lombok.extern.slf4j.Slf4j;
22+
import org.apache.pulsar.broker.BrokerTestUtil;
23+
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
24+
import org.apache.pulsar.common.naming.TopicName;
25+
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
26+
import org.apache.pulsar.common.policies.data.TopicType;
27+
import org.testng.Assert;
28+
import org.testng.annotations.AfterClass;
29+
import org.testng.annotations.BeforeClass;
30+
import org.testng.annotations.DataProvider;
31+
import org.testng.annotations.Test;
32+
33+
@Slf4j
34+
public class NonPartitionedTopicExpectedTest extends ProducerConsumerBase {
35+
36+
@BeforeClass
37+
@Override
38+
protected void setup() throws Exception {
39+
super.internalSetup();
40+
super.producerBaseSetup();
41+
}
42+
43+
@AfterClass(alwaysRun = true)
44+
@Override
45+
protected void cleanup() throws Exception {
46+
super.internalCleanup();
47+
}
48+
49+
@Test
50+
public void testWhenNonPartitionedTopicExists() throws Exception {
51+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
52+
admin.topics().createNonPartitionedTopic(topic);
53+
ProducerBuilderImpl<String> producerBuilder =
54+
(ProducerBuilderImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topic);
55+
producerBuilder.getConf().setNonPartitionedTopicExpected(true);
56+
// Verify: create successfully.
57+
Producer producer = producerBuilder.create();
58+
// cleanup.
59+
producer.close();
60+
admin.topics().delete(topic, false);
61+
}
62+
63+
@Test
64+
public void testWhenPartitionedTopicExists() throws Exception {
65+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
66+
admin.topics().createPartitionedTopic(topic, 2);
67+
ProducerBuilderImpl<String> producerBuilder =
68+
(ProducerBuilderImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topic);
69+
producerBuilder.getConf().setNonPartitionedTopicExpected(true);
70+
// Verify: failed to create.
71+
try {
72+
producerBuilder.create();
73+
Assert.fail("expected an error since producer expected a non-partitioned topic");
74+
} catch (Exception ex) {
75+
// expected an error.
76+
log.error("expected error", ex);
77+
}
78+
// cleanup.
79+
admin.topics().deletePartitionedTopic(topic, false);
80+
}
81+
82+
@DataProvider(name = "topicTypes")
83+
public Object[][] topicTypes() {
84+
return new Object[][]{
85+
{TopicType.PARTITIONED},
86+
{TopicType.NON_PARTITIONED}
87+
};
88+
}
89+
90+
@Test(dataProvider = "topicTypes")
91+
public void testWhenTopicNotExists(TopicType topicType) throws Exception {
92+
final String namespace = "public/default";
93+
final String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp");
94+
final TopicName topicName = TopicName.get(topic);
95+
AutoTopicCreationOverride.Builder policyBuilder = AutoTopicCreationOverride.builder()
96+
.topicType(topicType.toString()).allowAutoTopicCreation(true);
97+
if (topicType.equals(TopicType.PARTITIONED)) {
98+
policyBuilder.defaultNumPartitions(2);
99+
}
100+
AutoTopicCreationOverride policy = policyBuilder.build();
101+
admin.namespaces().setAutoTopicCreation(namespace, policy);
102+
103+
ProducerBuilderImpl<String> producerBuilder =
104+
(ProducerBuilderImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topic);
105+
producerBuilder.getConf().setNonPartitionedTopicExpected(true);
106+
// Verify: create successfully.
107+
Producer producer = producerBuilder.create();
108+
// Verify: only create non-partitioned topic.
109+
Assert.assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
110+
.partitionedTopicExists(topicName));
111+
Assert.assertTrue(pulsar.getNamespaceService().checkNonPartitionedTopicExists(topicName).join());
112+
113+
// cleanup.
114+
producer.close();
115+
admin.topics().delete(topic, false);
116+
admin.namespaces().removeAutoTopicCreation(namespace);
117+
}
118+
}

0 commit comments

Comments
 (0)