4444import java .util .SortedSet ;
4545import java .util .TreeSet ;
4646import java .util .UUID ;
47- import java .util .concurrent .Callable ;
4847import java .util .concurrent .CompletableFuture ;
4948import java .util .concurrent .ConcurrentHashMap ;
5049import java .util .concurrent .CountDownLatch ;
5150import java .util .concurrent .ExecutorService ;
5251import java .util .concurrent .Executors ;
53- import java .util .concurrent .Future ;
5452import java .util .concurrent .TimeUnit ;
5553import java .util .concurrent .atomic .AtomicBoolean ;
5654import java .util .stream .Collectors ;
6866import org .apache .bookkeeper .mledger .impl .PositionImpl ;
6967import org .apache .pulsar .broker .BrokerTestUtil ;
7068import org .apache .pulsar .broker .PulsarService ;
69+ import org .apache .pulsar .broker .auth .MockedPulsarServiceBaseTest ;
7170import org .apache .pulsar .broker .service .BrokerServiceException .NamingException ;
7271import org .apache .pulsar .broker .service .persistent .PersistentReplicator ;
7372import org .apache .pulsar .broker .service .persistent .PersistentTopic ;
@@ -154,88 +153,6 @@ public Object[][] partitionedTopicProvider() {
154153 return new Object [][] { { Boolean .TRUE }, { Boolean .FALSE } };
155154 }
156155
157- @ Test (priority = Integer .MAX_VALUE )
158- public void testConfigChange () throws Exception {
159- log .info ("--- Starting ReplicatorTest::testConfigChange ---" );
160- // This test is to verify that the config change on global namespace is successfully applied in broker during
161- // runtime.
162- // Run a set of producer tasks to create the topics
163- List <Future <Void >> results = new ArrayList <>();
164- for (int i = 0 ; i < 10 ; i ++) {
165- final TopicName dest = TopicName .get (BrokerTestUtil .newUniqueName ("persistent://pulsar/ns/topic-" + i ));
166-
167- results .add (executor .submit (new Callable <Void >() {
168- @ Override
169- public Void call () throws Exception {
170-
171- @ Cleanup
172- MessageProducer producer = new MessageProducer (url1 , dest );
173- log .info ("--- Starting producer --- " + url1 );
174-
175- @ Cleanup
176- MessageConsumer consumer = new MessageConsumer (url1 , dest );
177- log .info ("--- Starting Consumer --- " + url1 );
178-
179- producer .produce (2 );
180- consumer .receive (2 );
181- return null ;
182- }
183- }));
184- }
185-
186- for (Future <Void > result : results ) {
187- try {
188- result .get ();
189- } catch (Exception e ) {
190- log .error ("exception in getting future result " , e );
191- fail (String .format ("replication test failed with %s exception" , e .getMessage ()));
192- }
193- }
194-
195- Thread .sleep (1000L );
196- // Make sure that the internal replicators map contains remote cluster info
197- ConcurrentOpenHashMap <String , PulsarClient > replicationClients1 = ns1 .getReplicationClients ();
198- ConcurrentOpenHashMap <String , PulsarClient > replicationClients2 = ns2 .getReplicationClients ();
199- ConcurrentOpenHashMap <String , PulsarClient > replicationClients3 = ns3 .getReplicationClients ();
200-
201- Assert .assertNotNull (replicationClients1 .get ("r2" ));
202- Assert .assertNotNull (replicationClients1 .get ("r3" ));
203- Assert .assertNotNull (replicationClients2 .get ("r1" ));
204- Assert .assertNotNull (replicationClients2 .get ("r3" ));
205- Assert .assertNotNull (replicationClients3 .get ("r1" ));
206- Assert .assertNotNull (replicationClients3 .get ("r2" ));
207-
208- // Case 1: Update the global namespace replication configuration to only contains the local cluster itself
209- admin1 .namespaces ().setNamespaceReplicationClusters ("pulsar/ns" , Sets .newHashSet ("r1" ));
210-
211- // Wait for config changes to be updated.
212- Thread .sleep (1000L );
213-
214- // Make sure that the internal replicators map still contains remote cluster info
215- Assert .assertNotNull (replicationClients1 .get ("r2" ));
216- Assert .assertNotNull (replicationClients1 .get ("r3" ));
217- Assert .assertNotNull (replicationClients2 .get ("r1" ));
218- Assert .assertNotNull (replicationClients2 .get ("r3" ));
219- Assert .assertNotNull (replicationClients3 .get ("r1" ));
220- Assert .assertNotNull (replicationClients3 .get ("r2" ));
221-
222- // Case 2: Update the configuration back
223- admin1 .namespaces ().setNamespaceReplicationClusters ("pulsar/ns" , Sets .newHashSet ("r1" , "r2" , "r3" ));
224-
225- // Wait for config changes to be updated.
226- Thread .sleep (1000L );
227-
228- // Make sure that the internal replicators map still contains remote cluster info
229- Assert .assertNotNull (replicationClients1 .get ("r2" ));
230- Assert .assertNotNull (replicationClients1 .get ("r3" ));
231- Assert .assertNotNull (replicationClients2 .get ("r1" ));
232- Assert .assertNotNull (replicationClients2 .get ("r3" ));
233- Assert .assertNotNull (replicationClients3 .get ("r1" ));
234- Assert .assertNotNull (replicationClients3 .get ("r2" ));
235-
236- // Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
237- }
238-
239156 @ Test (timeOut = 10000 )
240157 public void activeBrokerParse () throws Exception {
241158 pulsar1 .getConfiguration ().setAuthorizationEnabled (true );
@@ -253,6 +170,32 @@ public void activeBrokerParse() throws Exception {
253170 pulsar1 .getConfiguration ().setAuthorizationEnabled (false );
254171 }
255172
173+ @ Test
174+ public void testForcefullyTopicDeletion () throws Exception {
175+ log .info ("--- Starting ReplicatorTest::testForcefullyTopicDeletion ---" );
176+
177+ final String namespace = BrokerTestUtil .newUniqueName ("pulsar/removeClusterTest" );
178+ admin1 .namespaces ().createNamespace (namespace );
179+ admin1 .namespaces ().setNamespaceReplicationClusters (namespace , Sets .newHashSet ("r1" ));
180+
181+ final String topicName = "persistent://" + namespace + "/topic" ;
182+
183+ @ Cleanup
184+ PulsarClient client1 = PulsarClient .builder ().serviceUrl (url1 .toString ()).statsInterval (0 , TimeUnit .SECONDS )
185+ .build ();
186+
187+ ProducerImpl <byte []> producer1 = (ProducerImpl <byte []>) client1 .newProducer ().topic (topicName )
188+ .enableBatching (false ).messageRoutingMode (MessageRoutingMode .SinglePartition ).create ();
189+ producer1 .close ();
190+
191+ admin1 .topics ().delete (topicName , true );
192+
193+ MockedPulsarServiceBaseTest
194+ .retryStrategically ((test ) -> !pulsar1 .getBrokerService ().getTopics ().containsKey (topicName ), 50 , 150 );
195+
196+ Assert .assertFalse (pulsar1 .getBrokerService ().getTopics ().containsKey (topicName ));
197+ }
198+
256199 @ SuppressWarnings ("unchecked" )
257200 @ Test (timeOut = 30000 )
258201 public void testConcurrentReplicator () throws Exception {
@@ -1240,7 +1183,7 @@ public void testReplicatedCluster() throws Exception {
12401183
12411184 log .info ("--- Starting ReplicatorTest::testReplicatedCluster ---" );
12421185
1243- final String namespace = "pulsar/global/repl" ;
1186+ final String namespace = BrokerTestUtil . newUniqueName ( "pulsar/global/repl" ) ;
12441187 final String topicName = BrokerTestUtil .newUniqueName ("persistent://" + namespace + "/topic1" );
12451188 admin1 .namespaces ().createNamespace (namespace );
12461189 admin1 .namespaces ().setNamespaceReplicationClusters (namespace , Sets .newHashSet ("r1" , "r2" , "r3" ));
@@ -1647,7 +1590,7 @@ public void testReplicatorWithFailedAck() throws Exception {
16471590
16481591 log .info ("--- Starting ReplicatorTest::testReplication ---" );
16491592
1650- String namespace = "pulsar/global/ns2" ;
1593+ String namespace = BrokerTestUtil . newUniqueName ( "pulsar/global/ns" ) ;
16511594 admin1 .namespaces ().createNamespace (namespace , Sets .newHashSet ("r1" ));
16521595 final TopicName dest = TopicName
16531596 .get (BrokerTestUtil .newUniqueName ("persistent://" + namespace + "/ackFailedTopic" ));
@@ -1685,13 +1628,15 @@ public void testReplicatorWithFailedAck() throws Exception {
16851628
16861629 MessageIdImpl lastMessageId = (MessageIdImpl ) topic .getLastMessageId ().get ();
16871630 Position lastPosition = PositionImpl .get (lastMessageId .getLedgerId (), lastMessageId .getEntryId ());
1688- ConcurrentOpenHashMap <String , Replicator > replicators = topic .getReplicators ();
1689- PersistentReplicator replicator = (PersistentReplicator ) replicators .get ("r2" );
16901631
16911632 Awaitility .await ().pollInterval (1 , TimeUnit .SECONDS ).timeout (30 , TimeUnit .SECONDS )
1692- .untilAsserted (() -> assertEquals (org .apache .pulsar .broker .service .AbstractReplicator .State .Started ,
1693- replicator .getState ()));
1694- assertEquals (replicator .getState (), org .apache .pulsar .broker .service .AbstractReplicator .State .Started );
1633+ .ignoreExceptions ()
1634+ .untilAsserted (() -> {
1635+ ConcurrentOpenHashMap <String , Replicator > replicators = topic .getReplicators ();
1636+ PersistentReplicator replicator = (PersistentReplicator ) replicators .get ("r2" );
1637+ assertEquals (org .apache .pulsar .broker .service .AbstractReplicator .State .Started ,
1638+ replicator .getState ());
1639+ });
16951640
16961641 // Make sure all the data has replicated to the remote cluster before close the cursor.
16971642 Awaitility .await ().untilAsserted (() -> assertEquals (cursor .getMarkDeletedPosition (), lastPosition ));
@@ -1717,7 +1662,7 @@ public void testReplicatorWithFailedAck() throws Exception {
17171662 @ Test
17181663 public void testWhenUpdateReplicationCluster () throws Exception {
17191664 log .info ("--- testWhenUpdateReplicationCluster ---" );
1720- String namespace = "pulsar/ns2" ;
1665+ String namespace = BrokerTestUtil . newUniqueName ( "pulsar/ns" ); ;
17211666 admin1 .namespaces ().createNamespace (namespace );
17221667 admin1 .namespaces ().setNamespaceReplicationClusters (namespace , Sets .newHashSet ("r1" , "r2" ));
17231668 final TopicName dest = TopicName .get (
@@ -1746,12 +1691,12 @@ public void testWhenUpdateReplicationCluster() throws Exception {
17461691 @ Test
17471692 public void testReplicatorProducerNotExceed () throws Exception {
17481693 log .info ("--- testReplicatorProducerNotExceed ---" );
1749- String namespace1 = "pulsar/ns11" ;
1694+ String namespace1 = BrokerTestUtil . newUniqueName ( "pulsar/ns1" ) ;
17501695 admin1 .namespaces ().createNamespace (namespace1 );
17511696 admin1 .namespaces ().setNamespaceReplicationClusters (namespace1 , Sets .newHashSet ("r1" , "r2" ));
17521697 final TopicName dest1 = TopicName .get (
17531698 BrokerTestUtil .newUniqueName ("persistent://" + namespace1 + "/testReplicatorProducerNotExceed1" ));
1754- String namespace2 = "pulsar/ns22" ;
1699+ String namespace2 = BrokerTestUtil . newUniqueName ( "pulsar/ns2" ) ;
17551700 admin2 .namespaces ().createNamespace (namespace2 );
17561701 admin2 .namespaces ().setNamespaceReplicationClusters (namespace2 , Sets .newHashSet ("r1" , "r2" ));
17571702 final TopicName dest2 = TopicName .get (
0 commit comments