Skip to content

Commit 3a218dd

Browse files
committed
[fix] [test] Fix flaky test ReplicatorTest (#22594)
(cherry picked from commit 6fdc0e3)
1 parent 06a9fec commit 3a218dd

2 files changed

Lines changed: 130 additions & 120 deletions

File tree

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java

Lines changed: 98 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,22 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import static org.testng.Assert.fail;
2122
import com.google.common.collect.Sets;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.concurrent.Callable;
26+
import java.util.concurrent.Future;
2227
import lombok.Cleanup;
23-
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
28+
import lombok.extern.slf4j.Slf4j;
29+
import org.apache.pulsar.broker.BrokerTestUtil;
2430
import org.apache.pulsar.client.api.MessageRoutingMode;
2531
import org.apache.pulsar.client.api.PulsarClient;
2632
import org.apache.pulsar.client.impl.ConsumerImpl;
2733
import org.apache.pulsar.client.impl.ProducerImpl;
34+
import org.apache.pulsar.common.naming.TopicName;
35+
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
2836
import org.awaitility.Awaitility;
29-
import org.slf4j.Logger;
30-
import org.slf4j.LoggerFactory;
3137
import org.testng.Assert;
3238
import org.testng.annotations.AfterClass;
3339
import org.testng.annotations.BeforeClass;
@@ -37,6 +43,11 @@
3743
import java.lang.reflect.Method;
3844
import java.util.concurrent.TimeUnit;
3945

46+
/**
47+
* The tests in this class must never be run against a production Pulsar cluster. They are very dangerous: they
48+
* delete a lot of topics and leave namespace policies in an incorrect state.
49+
*/
50+
@Slf4j
4051
@Test(groups = "broker-impl")
4152
public class ReplicatorGlobalNSTest extends ReplicatorTestBase {
4253

@@ -65,7 +76,7 @@ public void cleanup() throws Exception {
6576
*
6677
* @throws Exception
6778
*/
68-
@Test
79+
@Test(priority = Integer.MAX_VALUE)
6980
public void testRemoveLocalClusterOnGlobalNamespace() throws Exception {
7081
log.info("--- Starting ReplicatorTest::testRemoveLocalClusterOnGlobalNamespace ---");
7182

@@ -99,32 +110,88 @@ public void testRemoveLocalClusterOnGlobalNamespace() throws Exception {
99110
});
100111
}
101112

102-
@Test
103-
public void testForcefullyTopicDeletion() throws Exception {
104-
log.info("--- Starting ReplicatorTest::testForcefullyTopicDeletion ---");
105-
106-
final String namespace = "pulsar/removeClusterTest";
107-
admin1.namespaces().createNamespace(namespace);
108-
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));
109-
110-
final String topicName = "persistent://" + namespace + "/topic";
111-
112-
@Cleanup
113-
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
114-
.build();
115-
116-
ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client1.newProducer().topic(topicName)
117-
.enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
118-
producer1.close();
119-
120-
admin1.topics().delete(topicName, true);
121-
122-
MockedPulsarServiceBaseTest
123-
.retryStrategically((test) -> !pulsar1.getBrokerService().getTopics().containsKey(topicName), 50, 150);
124-
125-
Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
113+
/**
114+
* This is not a formal operation and can cause serious problems if it is called in a production environment.
115+
*/
116+
@Test(priority = Integer.MAX_VALUE - 1)
117+
public void testConfigChange() throws Exception {
118+
log.info("--- Starting ReplicatorTest::testConfigChange ---");
119+
// This test is to verify that the config change on global namespace is successfully applied in broker during
120+
// runtime.
121+
// Run a set of producer tasks to create the topics
122+
List<Future<Void>> results = new ArrayList<>();
123+
for (int i = 0; i < 10; i++) {
124+
final TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i));
125+
126+
results.add(executor.submit(new Callable<Void>() {
127+
@Override
128+
public Void call() throws Exception {
129+
130+
@Cleanup
131+
MessageProducer producer = new MessageProducer(url1, dest);
132+
log.info("--- Starting producer --- " + url1);
133+
134+
@Cleanup
135+
MessageConsumer consumer = new MessageConsumer(url1, dest);
136+
log.info("--- Starting Consumer --- " + url1);
137+
138+
producer.produce(2);
139+
consumer.receive(2);
140+
return null;
141+
}
142+
}));
143+
}
144+
145+
for (Future<Void> result : results) {
146+
try {
147+
result.get();
148+
} catch (Exception e) {
149+
log.error("exception in getting future result ", e);
150+
fail(String.format("replication test failed with %s exception", e.getMessage()));
151+
}
152+
}
153+
154+
Thread.sleep(1000L);
155+
// Make sure that the internal replicators map contains remote cluster info
156+
ConcurrentOpenHashMap<String, PulsarClient> replicationClients1 = ns1.getReplicationClients();
157+
ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 = ns2.getReplicationClients();
158+
ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 = ns3.getReplicationClients();
159+
160+
Assert.assertNotNull(replicationClients1.get("r2"));
161+
Assert.assertNotNull(replicationClients1.get("r3"));
162+
Assert.assertNotNull(replicationClients2.get("r1"));
163+
Assert.assertNotNull(replicationClients2.get("r3"));
164+
Assert.assertNotNull(replicationClients3.get("r1"));
165+
Assert.assertNotNull(replicationClients3.get("r2"));
166+
167+
// Case 1: Update the global namespace replication configuration to only contains the local cluster itself
168+
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1"));
169+
170+
// Wait for config changes to be updated.
171+
Thread.sleep(1000L);
172+
173+
// Make sure that the internal replicators map still contains remote cluster info
174+
Assert.assertNotNull(replicationClients1.get("r2"));
175+
Assert.assertNotNull(replicationClients1.get("r3"));
176+
Assert.assertNotNull(replicationClients2.get("r1"));
177+
Assert.assertNotNull(replicationClients2.get("r3"));
178+
Assert.assertNotNull(replicationClients3.get("r1"));
179+
Assert.assertNotNull(replicationClients3.get("r2"));
180+
181+
// Case 2: Update the configuration back
182+
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
183+
184+
// Wait for config changes to be updated.
185+
Thread.sleep(1000L);
186+
187+
// Make sure that the internal replicators map still contains remote cluster info
188+
Assert.assertNotNull(replicationClients1.get("r2"));
189+
Assert.assertNotNull(replicationClients1.get("r3"));
190+
Assert.assertNotNull(replicationClients2.get("r1"));
191+
Assert.assertNotNull(replicationClients2.get("r3"));
192+
Assert.assertNotNull(replicationClients3.get("r1"));
193+
Assert.assertNotNull(replicationClients3.get("r2"));
194+
195+
// Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
126196
}
127-
128-
private static final Logger log = LoggerFactory.getLogger(ReplicatorGlobalNSTest.class);
129-
130197
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java

Lines changed: 32 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,11 @@
4444
import java.util.SortedSet;
4545
import java.util.TreeSet;
4646
import java.util.UUID;
47-
import java.util.concurrent.Callable;
4847
import java.util.concurrent.CompletableFuture;
4948
import java.util.concurrent.ConcurrentHashMap;
5049
import java.util.concurrent.CountDownLatch;
5150
import java.util.concurrent.ExecutorService;
5251
import java.util.concurrent.Executors;
53-
import java.util.concurrent.Future;
5452
import java.util.concurrent.TimeUnit;
5553
import java.util.concurrent.atomic.AtomicBoolean;
5654
import java.util.stream.Collectors;
@@ -68,6 +66,7 @@
6866
import org.apache.bookkeeper.mledger.impl.PositionImpl;
6967
import org.apache.pulsar.broker.BrokerTestUtil;
7068
import org.apache.pulsar.broker.PulsarService;
69+
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
7170
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
7271
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
7372
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -154,88 +153,6 @@ public Object[][] partitionedTopicProvider() {
154153
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
155154
}
156155

157-
@Test(priority = Integer.MAX_VALUE)
158-
public void testConfigChange() throws Exception {
159-
log.info("--- Starting ReplicatorTest::testConfigChange ---");
160-
// This test is to verify that the config change on global namespace is successfully applied in broker during
161-
// runtime.
162-
// Run a set of producer tasks to create the topics
163-
List<Future<Void>> results = new ArrayList<>();
164-
for (int i = 0; i < 10; i++) {
165-
final TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i));
166-
167-
results.add(executor.submit(new Callable<Void>() {
168-
@Override
169-
public Void call() throws Exception {
170-
171-
@Cleanup
172-
MessageProducer producer = new MessageProducer(url1, dest);
173-
log.info("--- Starting producer --- " + url1);
174-
175-
@Cleanup
176-
MessageConsumer consumer = new MessageConsumer(url1, dest);
177-
log.info("--- Starting Consumer --- " + url1);
178-
179-
producer.produce(2);
180-
consumer.receive(2);
181-
return null;
182-
}
183-
}));
184-
}
185-
186-
for (Future<Void> result : results) {
187-
try {
188-
result.get();
189-
} catch (Exception e) {
190-
log.error("exception in getting future result ", e);
191-
fail(String.format("replication test failed with %s exception", e.getMessage()));
192-
}
193-
}
194-
195-
Thread.sleep(1000L);
196-
// Make sure that the internal replicators map contains remote cluster info
197-
ConcurrentOpenHashMap<String, PulsarClient> replicationClients1 = ns1.getReplicationClients();
198-
ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 = ns2.getReplicationClients();
199-
ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 = ns3.getReplicationClients();
200-
201-
Assert.assertNotNull(replicationClients1.get("r2"));
202-
Assert.assertNotNull(replicationClients1.get("r3"));
203-
Assert.assertNotNull(replicationClients2.get("r1"));
204-
Assert.assertNotNull(replicationClients2.get("r3"));
205-
Assert.assertNotNull(replicationClients3.get("r1"));
206-
Assert.assertNotNull(replicationClients3.get("r2"));
207-
208-
// Case 1: Update the global namespace replication configuration to only contains the local cluster itself
209-
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1"));
210-
211-
// Wait for config changes to be updated.
212-
Thread.sleep(1000L);
213-
214-
// Make sure that the internal replicators map still contains remote cluster info
215-
Assert.assertNotNull(replicationClients1.get("r2"));
216-
Assert.assertNotNull(replicationClients1.get("r3"));
217-
Assert.assertNotNull(replicationClients2.get("r1"));
218-
Assert.assertNotNull(replicationClients2.get("r3"));
219-
Assert.assertNotNull(replicationClients3.get("r1"));
220-
Assert.assertNotNull(replicationClients3.get("r2"));
221-
222-
// Case 2: Update the configuration back
223-
admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
224-
225-
// Wait for config changes to be updated.
226-
Thread.sleep(1000L);
227-
228-
// Make sure that the internal replicators map still contains remote cluster info
229-
Assert.assertNotNull(replicationClients1.get("r2"));
230-
Assert.assertNotNull(replicationClients1.get("r3"));
231-
Assert.assertNotNull(replicationClients2.get("r1"));
232-
Assert.assertNotNull(replicationClients2.get("r3"));
233-
Assert.assertNotNull(replicationClients3.get("r1"));
234-
Assert.assertNotNull(replicationClients3.get("r2"));
235-
236-
// Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
237-
}
238-
239156
@Test(timeOut = 10000)
240157
public void activeBrokerParse() throws Exception {
241158
pulsar1.getConfiguration().setAuthorizationEnabled(true);
@@ -253,6 +170,32 @@ public void activeBrokerParse() throws Exception {
253170
pulsar1.getConfiguration().setAuthorizationEnabled(false);
254171
}
255172

173+
@Test
174+
public void testForcefullyTopicDeletion() throws Exception {
175+
log.info("--- Starting ReplicatorTest::testForcefullyTopicDeletion ---");
176+
177+
final String namespace = BrokerTestUtil.newUniqueName("pulsar/removeClusterTest");
178+
admin1.namespaces().createNamespace(namespace);
179+
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));
180+
181+
final String topicName = "persistent://" + namespace + "/topic";
182+
183+
@Cleanup
184+
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
185+
.build();
186+
187+
ProducerImpl<byte[]> producer1 = (ProducerImpl<byte[]>) client1.newProducer().topic(topicName)
188+
.enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
189+
producer1.close();
190+
191+
admin1.topics().delete(topicName, true);
192+
193+
MockedPulsarServiceBaseTest
194+
.retryStrategically((test) -> !pulsar1.getBrokerService().getTopics().containsKey(topicName), 50, 150);
195+
196+
Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName));
197+
}
198+
256199
@SuppressWarnings("unchecked")
257200
@Test(timeOut = 30000)
258201
public void testConcurrentReplicator() throws Exception {
@@ -1270,7 +1213,7 @@ public void testReplicatedCluster() throws Exception {
12701213

12711214
log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");
12721215

1273-
final String namespace = "pulsar/global/repl";
1216+
final String namespace = BrokerTestUtil.newUniqueName("pulsar/global/repl");
12741217
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/topic1");
12751218
admin1.namespaces().createNamespace(namespace);
12761219
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
@@ -1677,7 +1620,7 @@ public void testReplicatorWithFailedAck() throws Exception {
16771620

16781621
log.info("--- Starting ReplicatorTest::testReplication ---");
16791622

1680-
String namespace = "pulsar/global/ns2";
1623+
String namespace = BrokerTestUtil.newUniqueName("pulsar/global/ns");
16811624
admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1"));
16821625
final TopicName dest = TopicName
16831626
.get(BrokerTestUtil.newUniqueName("persistent://" + namespace + "/ackFailedTopic"));
@@ -1749,7 +1692,7 @@ public void testReplicatorWithFailedAck() throws Exception {
17491692
@Test
17501693
public void testWhenUpdateReplicationCluster() throws Exception {
17511694
log.info("--- testWhenUpdateReplicationCluster ---");
1752-
String namespace = "pulsar/ns2";
1695+
String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");;
17531696
admin1.namespaces().createNamespace(namespace);
17541697
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
17551698
final TopicName dest = TopicName.get(
@@ -1778,12 +1721,12 @@ public void testWhenUpdateReplicationCluster() throws Exception {
17781721
@Test
17791722
public void testReplicatorProducerNotExceed() throws Exception {
17801723
log.info("--- testReplicatorProducerNotExceed ---");
1781-
String namespace1 = "pulsar/ns11";
1724+
String namespace1 = BrokerTestUtil.newUniqueName("pulsar/ns1");
17821725
admin1.namespaces().createNamespace(namespace1);
17831726
admin1.namespaces().setNamespaceReplicationClusters(namespace1, Sets.newHashSet("r1", "r2"));
17841727
final TopicName dest1 = TopicName.get(
17851728
BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed1"));
1786-
String namespace2 = "pulsar/ns22";
1729+
String namespace2 = BrokerTestUtil.newUniqueName("pulsar/ns2");
17871730
admin2.namespaces().createNamespace(namespace2);
17881731
admin2.namespaces().setNamespaceReplicationClusters(namespace2, Sets.newHashSet("r1", "r2"));
17891732
final TopicName dest2 = TopicName.get(

0 commit comments

Comments
 (0)