2121import static org .mockito .Mockito .any ;
2222import static org .mockito .Mockito .doAnswer ;
2323import static org .mockito .Mockito .spy ;
24- import static org .mockito .Mockito .when ;
2524import static org .testng .Assert .assertEquals ;
2625import static org .testng .Assert .assertFalse ;
26+ import static org .testng .Assert .assertNotEquals ;
2727import static org .testng .Assert .assertTrue ;
2828import io .netty .util .concurrent .FastThreadLocalThread ;
2929import java .lang .reflect .Field ;
3333import java .util .concurrent .CompletableFuture ;
3434import java .util .concurrent .CountDownLatch ;
3535import java .util .concurrent .TimeUnit ;
36+ import java .util .concurrent .atomic .AtomicBoolean ;
3637import java .util .concurrent .atomic .AtomicInteger ;
38+ import java .util .concurrent .atomic .AtomicReference ;
3739import java .util .function .BiFunction ;
3840import lombok .AllArgsConstructor ;
3941import lombok .Data ;
@@ -104,7 +106,7 @@ private ProducerImpl overrideProducerForReplicator(AbstractReplicator replicator
104106 return originalValue ;
105107 }
106108
107- @ Test
109+ @ Test ( timeOut = 45 * 1000 )
108110 public void testReplicatorProducerStatInTopic () throws Exception {
109111 final String topicName = BrokerTestUtil .newUniqueName ("persistent://" + defaultNamespace + "/tp_" );
110112 final String subscribeName = "subscribe_1" ;
@@ -130,7 +132,7 @@ public void testReplicatorProducerStatInTopic() throws Exception {
130132 });
131133 }
132134
133- @ Test
135+ @ Test ( timeOut = 45 * 1000 )
134136 public void testCreateRemoteConsumerFirst () throws Exception {
135137 final String topicName = BrokerTestUtil .newUniqueName ("persistent://" + defaultNamespace + "/tp_" );
136138 Producer <String > producer1 = client1 .newProducer (Schema .STRING ).topic (topicName ).create ();
@@ -150,28 +152,49 @@ public void testCreateRemoteConsumerFirst() throws Exception {
150152 });
151153 }
152154
153- @ Test
155+ @ Test ( timeOut = 45 * 1000 )
154156 public void testTopicCloseWhenInternalProducerCloseErrorOnce () throws Exception {
155157 final String topicName = BrokerTestUtil .newUniqueName ("persistent://" + defaultNamespace + "/tp_" );
156158 admin1 .topics ().createNonPartitionedTopic (topicName );
157159 // Wait for replicator started.
158160 waitReplicatorStarted (topicName );
159- PersistentTopic persistentTopic =
161+ PersistentTopic topic1 =
160162 (PersistentTopic ) pulsar1 .getBrokerService ().getTopic (topicName , false ).join ().get ();
161- PersistentReplicator replicator =
162- (PersistentReplicator ) persistentTopic .getReplicators ().values ().iterator ().next ();
163+ PersistentReplicator replicator1 =
164+ (PersistentReplicator ) topic1 .getReplicators ().values ().iterator ().next ();
163165 // Mock an error when calling "replicator.disconnect()"
164- ProducerImpl mockProducer = Mockito .mock (ProducerImpl .class );
165- when (mockProducer .closeAsync ()).thenReturn (CompletableFuture .failedFuture (new Exception ("mocked ex" )));
166- ProducerImpl originalProducer = overrideProducerForReplicator (replicator , mockProducer );
166+ AtomicBoolean closeFailed = new AtomicBoolean (true );
167+ final ProducerImpl mockProducer = Mockito .mock (ProducerImpl .class );
168+ final AtomicReference <ProducerImpl > originalProducer1 = new AtomicReference ();
169+ doAnswer (invocation -> {
170+ if (closeFailed .get ()) {
171+ return CompletableFuture .failedFuture (new Exception ("mocked ex" ));
172+ } else {
173+ return originalProducer1 .get ().closeAsync ();
174+ }
175+ }).when (mockProducer ).closeAsync ();
176+ originalProducer1 .set (overrideProducerForReplicator (replicator1 , mockProducer ));
167177 // Verify: since the "replicator.producer.closeAsync()" will retry after it failed, the topic unload should be
168178 // successful.
169179 admin1 .topics ().unload (topicName );
170180 // Verify: After "replicator.producer.closeAsync()" retry again, the "replicator.producer" will be closed
171181 // successful.
172- overrideProducerForReplicator (replicator , originalProducer );
182+ closeFailed .set (false );
183+ AtomicReference <PersistentTopic > topic2 = new AtomicReference ();
184+ AtomicReference <PersistentReplicator > replicator2 = new AtomicReference ();
173185 Awaitility .await ().untilAsserted (() -> {
174- Assert .assertFalse (replicator .isConnected ());
186+ topic2 .set ((PersistentTopic ) pulsar1 .getBrokerService ().getTopic (topicName , false ).join ().get ());
187+ replicator2 .set ((PersistentReplicator ) topic2 .get ().getReplicators ().values ().iterator ().next ());
188+ // It is a new Topic after reloading.
189+ assertNotEquals (topic2 .get (), topic1 );
190+ assertNotEquals (replicator2 .get (), replicator1 );
191+ });
192+ Awaitility .await ().untilAsserted (() -> {
193+ // Old replicator should be closed.
194+ Assert .assertFalse (replicator1 .isConnected ());
195+ Assert .assertFalse (originalProducer1 .get ().isConnected ());
196+ // New replicator should be connected.
197+ Assert .assertTrue (replicator2 .get ().isConnected ());
175198 });
176199 // cleanup.
177200 cleanupTopics (() -> {
@@ -205,19 +228,26 @@ private void injectMockReplicatorProducerBuilder(
205228 // Inject producer decorator.
206229 doAnswer (invocation -> {
207230 Schema schema = (Schema ) invocation .getArguments ()[0 ];
208- ProducerBuilderImpl producerBuilder = (ProducerBuilderImpl ) internalClient .newProducer (schema );
231+ ProducerBuilderImpl <?> producerBuilder = (ProducerBuilderImpl ) internalClient .newProducer (schema );
209232 ProducerBuilder spyProducerBuilder = spy (producerBuilder );
210233 doAnswer (ignore -> {
211234 CompletableFuture <Producer > producerFuture = new CompletableFuture <>();
212- final ProducerImpl p = (ProducerImpl ) producerBuilder .create ();
213- new FastThreadLocalThread (() -> {
214- try {
215- ProducerImpl newProducer = producerDecorator .apply (producerBuilder .getConf (), p );
216- producerFuture .complete (newProducer );
217- } catch (Exception ex ) {
218- producerFuture .completeExceptionally (ex );
235+ producerBuilder .createAsync ().whenComplete ((p , t ) -> {
236+ if (t != null ) {
237+ producerFuture .completeExceptionally (t );
238+ return ;
219239 }
220- }).start ();
240+ ProducerImpl pImpl = (ProducerImpl ) p ;
241+ new FastThreadLocalThread (() -> {
242+ try {
243+ ProducerImpl newProducer = producerDecorator .apply (producerBuilder .getConf (), pImpl );
244+ producerFuture .complete (newProducer );
245+ } catch (Exception ex ) {
246+ producerFuture .completeExceptionally (ex );
247+ }
248+ }).start ();
249+ });
250+
221251 return producerFuture ;
222252 }).when (spyProducerBuilder ).createAsync ();
223253 return spyProducerBuilder ;
@@ -306,7 +336,7 @@ void startCallback() {
306336 * - The topic is wholly closed.
307337 * - Verify: the delayed created internal producer will be closed.
308338 */
309- @ Test
339+ @ Test ( timeOut = 120 * 1000 )
310340 public void testConcurrencyOfUnloadBundleAndRecreateProducer () throws Exception {
311341 final String topicName = BrokerTestUtil .newUniqueName ("persistent://" + defaultNamespace + "/tp_" );
312342 // Inject an error for "replicator.producer" creation.
@@ -347,7 +377,8 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception
347377 // Stuck start new producer, until the state of replicator change to Stopped.
348378 // The next once of "createProducerSuccessAfterFailTimes" to create producer will be successfully.
349379 Awaitility .await ().pollInterval (Duration .ofMillis (100 )).atMost (Duration .ofSeconds (60 )).untilAsserted (() -> {
350- assertTrue (createProducerCounter .get () >= failTimes );
380+ assertTrue (createProducerCounter .get () >= failTimes ,
381+ "count of retry to create producer is " + createProducerCounter .get ());
351382 });
352383 CompletableFuture <Void > topicCloseFuture = persistentTopic .close (true );
353384 Awaitility .await ().atMost (Duration .ofSeconds (30 )).untilAsserted (() -> {
0 commit comments