Skip to content

Commit 31bfc0d

Browse files
committed
[fix][broker] Replication stuck when partitions count between two clusters is not the same (#22983)
(cherry picked from commit a8ce990)
1 parent bfa92e5 commit 31bfc0d

8 files changed

Lines changed: 272 additions & 11 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.pulsar.client.api.Producer;
3737
import org.apache.pulsar.client.api.ProducerBuilder;
3838
import org.apache.pulsar.client.api.Schema;
39+
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
3940
import org.apache.pulsar.client.impl.ProducerImpl;
4041
import org.apache.pulsar.client.impl.PulsarClientImpl;
4142
import org.apache.pulsar.common.naming.TopicName;
@@ -166,6 +167,10 @@ public void startProducer() {
166167
}
167168

168169
log.info("[{}] Starting replicator", replicatorId);
170+
// Force only replicate messages to a non-partitioned topic, to avoid auto-create a partitioned topic on
171+
// the remote cluster.
172+
ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder;
173+
builderImpl.getConf().setNonPartitionedTopicExpected(true);
169174
producerBuilder.createAsync().thenAccept(producer -> {
170175
setProducerAndTriggerReadEntries(producer);
171176
}).exceptionally(ex -> {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,12 @@ protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
150150
Pair<Boolean, State> changeStateRes;
151151
changeStateRes = compareSetAndGetState(Starting, Started);
152152
if (changeStateRes.getLeft()) {
153+
if (!(producer instanceof ProducerImpl)) {
154+
log.error("[{}] The partitions count between two clusters is not the same, the replicator can not be"
155+
+ " created successfully: {}", replicatorId, state);
156+
doCloseProducerAsync(producer, () -> {});
157+
throw new ClassCastException(producer.getClass().getName() + " can not be cast to ProducerImpl");
158+
}
153159
this.producer = (ProducerImpl) producer;
154160
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
155161
// Trigger a new read.

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,11 @@
3939
import org.apache.pulsar.broker.PulsarService;
4040
import org.apache.pulsar.broker.ServiceConfiguration;
4141
import org.apache.pulsar.client.api.Producer;
42-
import org.apache.pulsar.client.api.ProducerBuilder;
4342
import org.apache.pulsar.client.api.Schema;
4443
import org.apache.pulsar.client.impl.ConnectionPool;
44+
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
4545
import org.apache.pulsar.client.impl.PulsarClientImpl;
46+
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
4647
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
4748
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
4849
import org.awaitility.Awaitility;
@@ -71,7 +72,8 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
7172
when(localClient.getCnxPool()).thenReturn(connectionPool);
7273
final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
7374
when(remoteClient.getCnxPool()).thenReturn(connectionPool);
74-
final ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
75+
final ProducerConfigurationData producerConf = new ProducerConfigurationData();
76+
final ProducerBuilderImpl producerBuilder = mock(ProducerBuilderImpl.class);
7577
final ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>();
7678
when(broker.executor()).thenReturn(eventLoopGroup);
7779
when(broker.getTopics()).thenReturn(topics);
@@ -87,6 +89,7 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
8789
when(producerBuilder.sendTimeout(anyInt(), any())).thenReturn(producerBuilder);
8890
when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder);
8991
when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder);
92+
when(producerBuilder.getConf()).thenReturn(producerConf);
9093
// Mock create producer fail.
9194
when(producerBuilder.create()).thenThrow(new RuntimeException("mocked ex"));
9295
when(producerBuilder.createAsync())

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.Collections;
4141
import java.util.HashSet;
4242
import java.util.Iterator;
43+
import java.util.List;
4344
import java.util.Optional;
4445
import java.util.Set;
4546
import java.util.UUID;
@@ -50,7 +51,9 @@
5051
import java.util.concurrent.atomic.AtomicInteger;
5152
import java.util.concurrent.atomic.AtomicReference;
5253
import java.util.function.BiFunction;
54+
import java.util.function.Predicate;
5355
import java.util.function.Supplier;
56+
import java.util.stream.Collectors;
5457
import lombok.AllArgsConstructor;
5558
import lombok.Data;
5659
import lombok.SneakyThrows;
@@ -79,11 +82,13 @@
7982
import org.apache.pulsar.common.naming.TopicName;
8083
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
8184
import org.apache.pulsar.common.policies.data.ClusterData;
85+
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
8286
import org.apache.pulsar.common.policies.data.RetentionPolicies;
8387
import org.apache.pulsar.common.policies.data.TenantInfo;
8488
import org.apache.pulsar.common.policies.data.TopicStats;
85-
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
89+
import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
8690
import org.apache.pulsar.common.util.FutureUtil;
91+
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
8792
import org.awaitility.Awaitility;
8893
import org.awaitility.reflect.WhiteboxImpl;
8994
import org.mockito.Mockito;
@@ -1069,4 +1074,90 @@ public void testConfigReplicationStartAt() throws Exception {
10691074
admin1.topics().delete(topic3, false);
10701075
admin2.topics().delete(topic3, false);
10711076
}
1077+
1078+
@DataProvider(name = "replicationModes")
1079+
public Object[][] replicationModes() {
1080+
return new Object[][]{
1081+
{ReplicationMode.OneWay},
1082+
{ReplicationMode.DoubleWay}
1083+
};
1084+
}
1085+
1086+
protected enum ReplicationMode {
1087+
OneWay,
1088+
DoubleWay;
1089+
}
1090+
1091+
@Test(dataProvider = "replicationModes")
1092+
public void testDifferentTopicCreationRule(ReplicationMode replicationMode) throws Exception {
1093+
String ns = defaultTenant + "/" + UUID.randomUUID().toString().replace("-", "");
1094+
admin1.namespaces().createNamespace(ns);
1095+
admin2.namespaces().createNamespace(ns);
1096+
1097+
// Set topic auto-creation rule.
1098+
// c1: no-partitioned topic
1099+
// c2: partitioned topic with 2 partitions.
1100+
AutoTopicCreationOverride autoTopicCreation =
1101+
AutoTopicCreationOverrideImpl.builder().allowAutoTopicCreation(true)
1102+
.topicType("partitioned").defaultNumPartitions(2).build();
1103+
admin2.namespaces().setAutoTopicCreation(ns, autoTopicCreation);
1104+
Awaitility.await().untilAsserted(() -> {
1105+
assertEquals(admin2.namespaces().getAutoTopicCreationAsync(ns).join().getDefaultNumPartitions(), 2);
1106+
// Trigger system topic __change_event's initialize.
1107+
pulsar2.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get("persistent://" + ns + "/1"));
1108+
});
1109+
1110+
// Create non-partitioned topic.
1111+
// Enable replication.
1112+
final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp_");
1113+
admin1.topics().createNonPartitionedTopic(tp);
1114+
admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2)));
1115+
if (replicationMode.equals(ReplicationMode.DoubleWay)) {
1116+
admin2.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2)));
1117+
}
1118+
1119+
// Trigger and wait for replicator starts.
1120+
Producer<String> p1 = client1.newProducer(Schema.STRING).topic(tp).create();
1121+
p1.send("msg-1");
1122+
p1.close();
1123+
Awaitility.await().untilAsserted(() -> {
1124+
PersistentTopic persistentTopic = (PersistentTopic) broker1.getTopic(tp, false).join().get();
1125+
assertFalse(persistentTopic.getReplicators().isEmpty());
1126+
});
1127+
1128+
// Verify: the topics are the same between two clusters.
1129+
Predicate<String> topicNameFilter = t -> {
1130+
TopicName topicName = TopicName.get(t);
1131+
if (!topicName.getNamespace().equals(ns)) {
1132+
return false;
1133+
}
1134+
return t.startsWith(tp);
1135+
};
1136+
Awaitility.await().untilAsserted(() -> {
1137+
List<String> topics1 = pulsar1.getBrokerService().getTopics().keys()
1138+
.stream().filter(topicNameFilter).collect(Collectors.toList());
1139+
List<String> topics2 = pulsar2.getBrokerService().getTopics().keys()
1140+
.stream().filter(topicNameFilter).collect(Collectors.toList());
1141+
Collections.sort(topics1);
1142+
Collections.sort(topics2);
1143+
assertEquals(topics1, topics2);
1144+
});
1145+
1146+
// cleanup.
1147+
admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1)));
1148+
if (replicationMode.equals(ReplicationMode.DoubleWay)) {
1149+
admin2.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster2)));
1150+
}
1151+
Awaitility.await().untilAsserted(() -> {
1152+
PersistentTopic persistentTopic = (PersistentTopic) broker1.getTopic(tp, false).join().get();
1153+
assertTrue(persistentTopic.getReplicators().isEmpty());
1154+
if (replicationMode.equals(ReplicationMode.DoubleWay)) {
1155+
assertTrue(persistentTopic.getReplicators().isEmpty());
1156+
}
1157+
});
1158+
admin1.topics().delete(tp, false);
1159+
admin2.topics().delete(tp, false);
1160+
admin1.namespaces().deleteNamespace(ns);
1161+
admin2.namespaces().deleteNamespace(ns);
1162+
}
10721163
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,4 +161,10 @@ public void testConfigReplicationStartAt() throws Exception {
161161
pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("latest");
162162
});
163163
}
164+
165+
@Test(enabled = false)
166+
@Override
167+
public void testDifferentTopicCreationRule(ReplicationMode replicationMode) throws Exception {
168+
super.testDifferentTopicCreationRule(replicationMode);
169+
}
164170
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api;
20+
21+
import lombok.extern.slf4j.Slf4j;
22+
import org.apache.pulsar.broker.BrokerTestUtil;
23+
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
24+
import org.apache.pulsar.common.naming.TopicName;
25+
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
26+
import org.apache.pulsar.common.policies.data.TopicType;
27+
import org.testng.Assert;
28+
import org.testng.annotations.AfterClass;
29+
import org.testng.annotations.BeforeClass;
30+
import org.testng.annotations.DataProvider;
31+
import org.testng.annotations.Test;
32+
33+
@Slf4j
34+
public class NonPartitionedTopicExpectedTest extends ProducerConsumerBase {
35+
36+
@BeforeClass
37+
@Override
38+
protected void setup() throws Exception {
39+
super.internalSetup();
40+
super.producerBaseSetup();
41+
}
42+
43+
@AfterClass(alwaysRun = true)
44+
@Override
45+
protected void cleanup() throws Exception {
46+
super.internalCleanup();
47+
}
48+
49+
@Test
50+
public void testWhenNonPartitionedTopicExists() throws Exception {
51+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
52+
admin.topics().createNonPartitionedTopic(topic);
53+
ProducerBuilderImpl<String> producerBuilder =
54+
(ProducerBuilderImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topic);
55+
producerBuilder.getConf().setNonPartitionedTopicExpected(true);
56+
// Verify: create successfully.
57+
Producer producer = producerBuilder.create();
58+
// cleanup.
59+
producer.close();
60+
admin.topics().delete(topic, false);
61+
}
62+
63+
@Test
64+
public void testWhenPartitionedTopicExists() throws Exception {
65+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
66+
admin.topics().createPartitionedTopic(topic, 2);
67+
ProducerBuilderImpl<String> producerBuilder =
68+
(ProducerBuilderImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topic);
69+
producerBuilder.getConf().setNonPartitionedTopicExpected(true);
70+
// Verify: failed to create.
71+
try {
72+
producerBuilder.create();
73+
Assert.fail("expected an error since producer expected a non-partitioned topic");
74+
} catch (Exception ex) {
75+
// expected an error.
76+
log.error("expected error", ex);
77+
}
78+
// cleanup.
79+
admin.topics().deletePartitionedTopic(topic, false);
80+
}
81+
82+
@DataProvider(name = "topicTypes")
83+
public Object[][] topicTypes() {
84+
return new Object[][]{
85+
{TopicType.PARTITIONED},
86+
{TopicType.NON_PARTITIONED}
87+
};
88+
}
89+
90+
@Test(dataProvider = "topicTypes")
91+
public void testWhenTopicNotExists(TopicType topicType) throws Exception {
92+
final String namespace = "public/default";
93+
final String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp");
94+
final TopicName topicName = TopicName.get(topic);
95+
AutoTopicCreationOverride.Builder policyBuilder = AutoTopicCreationOverride.builder()
96+
.topicType(topicType.toString()).allowAutoTopicCreation(true);
97+
if (topicType.equals(TopicType.PARTITIONED)) {
98+
policyBuilder.defaultNumPartitions(2);
99+
}
100+
AutoTopicCreationOverride policy = policyBuilder.build();
101+
admin.namespaces().setAutoTopicCreation(namespace, policy);
102+
103+
ProducerBuilderImpl<String> producerBuilder =
104+
(ProducerBuilderImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topic);
105+
producerBuilder.getConf().setNonPartitionedTopicExpected(true);
106+
// Verify: create successfully.
107+
Producer producer = producerBuilder.create();
108+
// Verify: only create non-partitioned topic.
109+
Assert.assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
110+
.partitionedTopicExists(topicName));
111+
Assert.assertTrue(pulsar.getNamespaceService().checkNonPartitionedTopicExists(topicName).join());
112+
113+
// cleanup.
114+
producer.close();
115+
admin.topics().delete(topic, false);
116+
admin.namespaces().removeAutoTopicCreation(namespace);
117+
}
118+
}

0 commit comments

Comments
 (0)