Skip to content

Commit c163106

Browse files
committed
add a test
1 parent e7c6318 commit c163106

2 files changed

Lines changed: 263 additions & 0 deletions

File tree

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,56 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.Mockito.doAnswer;
23+
import static org.mockito.Mockito.spy;
2124
import static org.testng.Assert.assertEquals;
2225
import static org.testng.Assert.assertFalse;
2326
import static org.testng.Assert.assertTrue;
27+
import com.google.common.collect.Sets;
28+
import io.netty.util.concurrent.FastThreadLocalThread;
2429
import java.lang.reflect.Field;
30+
import java.lang.reflect.Method;
31+
import java.time.Duration;
2532
import java.util.Optional;
33+
import java.util.UUID;
2634
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.CountDownLatch;
2736
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicInteger;
38+
import java.util.concurrent.atomic.AtomicReference;
39+
import java.util.function.BiFunction;
40+
import lombok.AllArgsConstructor;
41+
import lombok.Data;
42+
import lombok.SneakyThrows;
43+
import lombok.extern.slf4j.Slf4j;
44+
import org.apache.bookkeeper.mledger.AsyncCallbacks;
45+
import org.apache.bookkeeper.mledger.ManagedLedgerException;
46+
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
47+
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
2848
import org.apache.pulsar.broker.BrokerTestUtil;
2949
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
3050
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
3151
import org.apache.pulsar.client.api.Consumer;
3252
import org.apache.pulsar.client.api.Producer;
53+
import org.apache.pulsar.client.api.ProducerBuilder;
54+
import org.apache.pulsar.client.api.PulsarClient;
3355
import org.apache.pulsar.client.api.Schema;
56+
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
3457
import org.apache.pulsar.client.impl.ProducerImpl;
58+
import org.apache.pulsar.client.impl.PulsarClientImpl;
59+
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
3560
import org.apache.pulsar.common.policies.data.TopicStats;
61+
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
3662
import org.awaitility.Awaitility;
63+
import org.awaitility.reflect.WhiteboxImpl;
3764
import org.mockito.Mockito;
3865
import org.testng.Assert;
3966
import org.testng.annotations.AfterClass;
4067
import org.testng.annotations.BeforeClass;
4168
import org.testng.annotations.Test;
4269

70+
@Slf4j
4371
@Test(groups = "broker")
4472
public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
4573

@@ -153,4 +181,225 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception
153181
admin2.topics().delete(topicName);
154182
});
155183
}
184+
185+
private void injectMockReplicatorProducerBuilder(
186+
BiFunction<ProducerConfigurationData, ProducerImpl, ProducerImpl> producerDecorator)
187+
throws Exception {
188+
String cluster2 = pulsar2.getConfig().getClusterName();
189+
BrokerService brokerService = pulsar1.getBrokerService();
190+
// Wait for the internal client created.
191+
final String topicNameTriggerInternalClientCreate =
192+
BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
193+
admin1.topics().createNonPartitionedTopic(topicNameTriggerInternalClientCreate);
194+
waitReplicatorStarted(topicNameTriggerInternalClientCreate);
195+
cleanupTopics(() -> {
196+
admin1.topics().delete(topicNameTriggerInternalClientCreate);
197+
admin2.topics().delete(topicNameTriggerInternalClientCreate);
198+
});
199+
200+
// Inject spy client.
201+
ConcurrentOpenHashMap<String, PulsarClient>
202+
replicationClients = WhiteboxImpl.getInternalState(brokerService, "replicationClients");
203+
PulsarClientImpl internalClient = (PulsarClientImpl) replicationClients.get(cluster2);
204+
PulsarClient spyClient = spy(internalClient);
205+
replicationClients.put(cluster2, spyClient);
206+
207+
// Inject producer decorator.
208+
doAnswer(invocation -> {
209+
Schema schema = (Schema) invocation.getArguments()[0];
210+
ProducerBuilderImpl producerBuilder = (ProducerBuilderImpl) internalClient.newProducer(schema);
211+
ProducerBuilder spyProducerBuilder = spy(producerBuilder);
212+
doAnswer(ignore -> {
213+
CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
214+
final ProducerImpl p = (ProducerImpl) producerBuilder.create();
215+
new FastThreadLocalThread(() -> {
216+
try {
217+
ProducerImpl newProducer = producerDecorator.apply(producerBuilder.getConf(), p);
218+
producerFuture.complete(newProducer);
219+
} catch (Exception ex) {
220+
producerFuture.completeExceptionally(ex);
221+
}
222+
}).start();
223+
return producerFuture;
224+
}).when(spyProducerBuilder).createAsync();
225+
return spyProducerBuilder;
226+
}).when(spyClient).newProducer(any(Schema.class));
227+
}
228+
229+
private SpyCursor spyCursor(PersistentTopic persistentTopic, String cursorName) throws Exception {
230+
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
231+
ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(cursorName);
232+
ManagedCursorImpl spyCursor = spy(cursor);
233+
// remove cursor.
234+
ml.getCursors().removeCursor(cursorName);
235+
ml.deactivateCursor(cursor);
236+
// Add the spy one. addCursor(ManagedCursorImpl cursor)
237+
Method m = ManagedLedgerImpl.class.getDeclaredMethod("addCursor", new Class[]{ManagedCursorImpl.class});
238+
m.setAccessible(true);
239+
m.invoke(ml, new Object[]{spyCursor});
240+
return new SpyCursor(cursor, spyCursor);
241+
}
242+
243+
@Data
244+
@AllArgsConstructor
245+
static class SpyCursor {
246+
ManagedCursorImpl original;
247+
ManagedCursorImpl spy;
248+
}
249+
250+
private CursorCloseSignal makeCursorClosingDelay(SpyCursor spyCursor) throws Exception {
251+
CountDownLatch startCloseSignal = new CountDownLatch(1);
252+
CountDownLatch startCallbackSignal = new CountDownLatch(1);
253+
doAnswer(invocation -> {
254+
AsyncCallbacks.CloseCallback originalCallback = (AsyncCallbacks.CloseCallback) invocation.getArguments()[0];
255+
Object ctx = invocation.getArguments()[1];
256+
AsyncCallbacks.CloseCallback newCallback = new AsyncCallbacks.CloseCallback() {
257+
@Override
258+
public void closeComplete(Object ctx) {
259+
new FastThreadLocalThread(new Runnable() {
260+
@Override
261+
@SneakyThrows
262+
public void run() {
263+
startCallbackSignal.await();
264+
originalCallback.closeComplete(ctx);
265+
}
266+
}).start();
267+
}
268+
269+
@Override
270+
public void closeFailed(ManagedLedgerException exception, Object ctx) {
271+
new FastThreadLocalThread(new Runnable() {
272+
@Override
273+
@SneakyThrows
274+
public void run() {
275+
startCallbackSignal.await();
276+
originalCallback.closeFailed(exception, ctx);
277+
}
278+
}).start();
279+
}
280+
};
281+
startCloseSignal.await();
282+
spyCursor.original.asyncClose(newCallback, ctx);
283+
return null;
284+
}).when(spyCursor.spy).asyncClose(any(AsyncCallbacks.CloseCallback.class), any());
285+
return new CursorCloseSignal(startCloseSignal, startCallbackSignal);
286+
}
287+
288+
@AllArgsConstructor
289+
static class CursorCloseSignal {
290+
CountDownLatch startCloseSignal;
291+
CountDownLatch startCallbackSignal;
292+
293+
void startClose() {
294+
startCloseSignal.countDown();
295+
}
296+
297+
void startCallback() {
298+
startCallbackSignal.countDown();
299+
}
300+
}
301+
302+
/**
303+
* See the description and execution flow: https://github.com/apache/pulsar/pull/21948.
304+
* Steps:
305+
* 1.Create topic, does not enable replication now.
306+
* - The topic will be loaded in the memory.
307+
* 2.Enable namespace level replication.
308+
* - Broker creates a replicator, and the internal producer of replicator is starting.
309+
* - We inject an error to make the internal producer fail to connect,after few seconds, it will retry to start.
310+
* 3.Unload bundle.
311+
* - Starting to close the topic.
312+
* - The replicator will be closed, but it will not close the internal producer, because the producer has not
313+
* been created successfully.
314+
* - We inject a sleeping into the progress of closing the "repl.cursor" to make it stuck. So the topic is still
315+
* in the process of being closed now.
316+
* 4.Internal producer retry to connect.
317+
* - At the next retry, it connected successful. Since the state of "repl.cursor" is not "Closed", this producer
318+
* will not be closed now.
319+
* 5.Topic closed.
320+
* - Cancel the stuck of closing the "repl.cursor".
321+
* - The topic is wholly closed.
322+
* 6.Verify: the delayed created internal producer will be closed. In other words, there is no producer is connected
323+
* to the remote cluster.
324+
*/
325+
@Test
326+
public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception {
327+
final String namespaceName = defaultTenant + "/" + UUID.randomUUID().toString().replaceAll("-", "");
328+
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespaceName + "/tp_");
329+
// 1.Create topic, does not enable replication now.
330+
admin1.namespaces().createNamespace(namespaceName);
331+
admin2.namespaces().createNamespace(namespaceName);
332+
admin1.topics().createNonPartitionedTopic(topicName);
333+
PersistentTopic persistentTopic =
334+
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
335+
336+
// We inject an error to make the internal producer fail to connect.
337+
// The delay time of next retry to create producer is below:
338+
// 0.1s, 0.2, 0.4, 0.8, 1.6s, 3.2s, 6.4s...
339+
// If the retry counter is larger than 6, the next creation will be slow enough to close Replicator.
340+
final AtomicInteger createProducerCounter = new AtomicInteger();
341+
final int failTimes = 6;
342+
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
343+
if (topicName.equals(producerCnf.getTopicName())) {
344+
// There is a switch to determine create producer successfully or not.
345+
if (createProducerCounter.incrementAndGet() > failTimes) {
346+
return originalProducer;
347+
}
348+
log.info("Retry create replicator.producer count: {}", createProducerCounter);
349+
// Release producer and fail callback.
350+
originalProducer.closeAsync();
351+
throw new RuntimeException("mock error");
352+
}
353+
return originalProducer;
354+
});
355+
356+
// 2.Enable namespace level replication.
357+
admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1, cluster2));
358+
AtomicReference<PersistentReplicator> replicator = new AtomicReference<PersistentReplicator>();
359+
Awaitility.await().untilAsserted(() -> {
360+
assertFalse(persistentTopic.getReplicators().isEmpty());
361+
replicator.set(
362+
(PersistentReplicator) persistentTopic.getReplicators().values().iterator().next());
363+
// Since we inject a producer creation error, the replicator can not start successfully.
364+
assertFalse(replicator.get().isConnected());
365+
});
366+
367+
// We inject a sleeping into the progress of closing the "repl.cursor" to make it stuck, until the internal
368+
// producer of the replicator started.
369+
SpyCursor spyCursor =
370+
spyCursor(persistentTopic, "pulsar.repl." + pulsar2.getConfig().getClusterName());
371+
CursorCloseSignal cursorCloseSignal = makeCursorClosingDelay(spyCursor);
372+
373+
// 3.Unload bundle: call "topic.close(false)".
374+
// Stuck start new producer, until the state of replicator change to Stopped.
375+
// The next once of "createProducerSuccessAfterFailTimes" to create producer will be successfully.
376+
Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
377+
assertTrue(createProducerCounter.get() >= failTimes);
378+
});
379+
CompletableFuture<Void> topicCloseFuture = persistentTopic.close(true);
380+
Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
381+
String state = String.valueOf(replicator.get().getState());
382+
assertTrue(state.equals("Stopped") || state.equals("Terminated"));
383+
});
384+
385+
// 5.Delay close cursor, until "replicator.producer" create successfully.
386+
// The next once retry time of create "replicator.producer" will be 3.2s.
387+
Thread.sleep(4 * 1000);
388+
log.info("Replicator.state: {}", replicator.get().getState());
389+
cursorCloseSignal.startClose();
390+
cursorCloseSignal.startCallback();
391+
// Wait for topic close successfully.
392+
topicCloseFuture.join();
393+
394+
// 6. Verify there is no orphan producer on the remote cluster.
395+
Awaitility.await().pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> {
396+
PersistentTopic persistentTopic2 =
397+
(PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
398+
assertEquals(persistentTopic2.getProducers().size(), 0);
399+
Assert.assertFalse(replicator.get().isConnected());
400+
});
401+
402+
// cleanup.
403+
cleanupNamespace(namespaceName);
404+
}
156405
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.common.collect.Sets;
2222
import java.net.URL;
2323
import java.util.Collections;
24+
import java.util.List;
2425
import java.util.Optional;
2526
import lombok.extern.slf4j.Slf4j;
2627
import org.apache.pulsar.broker.PulsarService;
@@ -146,6 +147,19 @@ protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws Excep
146147
admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, Sets.newHashSet(cluster1, cluster2));
147148
}
148149

150+
protected void cleanupNamespace(String namespace) throws Exception {
151+
admin1.namespaces().setNamespaceReplicationClusters(namespace, Collections.singleton(cluster1));
152+
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList(namespace);
153+
for (String partitionedTopic: partitionedTopics) {
154+
admin1.topics().deletePartitionedTopic(partitionedTopic);
155+
}
156+
List<String> nonPartitionedTopics = admin1.topics().getList(namespace);
157+
for (String nonPartitionedTopic: nonPartitionedTopics) {
158+
admin1.topics().delete(nonPartitionedTopic);
159+
}
160+
admin1.namespaces().deleteNamespace(namespace);
161+
}
162+
149163
protected interface CleanupTopicAction {
150164
void run() throws Exception;
151165
}

0 commit comments

Comments
 (0)