4444import java .util .SortedSet ;
4545import java .util .TreeSet ;
4646import java .util .UUID ;
47- import java .util .concurrent .Callable ;
4847import java .util .concurrent .CompletableFuture ;
4948import java .util .concurrent .ConcurrentHashMap ;
5049import java .util .concurrent .CountDownLatch ;
5150import java .util .concurrent .ExecutorService ;
5251import java .util .concurrent .Executors ;
53- import java .util .concurrent .Future ;
5452import java .util .concurrent .TimeUnit ;
5553import java .util .concurrent .atomic .AtomicBoolean ;
5654import java .util .stream .Collectors ;
6866import org .apache .bookkeeper .mledger .impl .PositionImpl ;
6967import org .apache .pulsar .broker .BrokerTestUtil ;
7068import org .apache .pulsar .broker .PulsarService ;
69+ import org .apache .pulsar .broker .auth .MockedPulsarServiceBaseTest ;
7170import org .apache .pulsar .broker .service .BrokerServiceException .NamingException ;
7271import org .apache .pulsar .broker .service .persistent .PersistentReplicator ;
7372import org .apache .pulsar .broker .service .persistent .PersistentTopic ;
@@ -154,88 +153,6 @@ public Object[][] partitionedTopicProvider() {
154153 return new Object [][] { { Boolean .TRUE }, { Boolean .FALSE } };
155154 }
156155
157- @ Test (priority = Integer .MAX_VALUE )
158- public void testConfigChange () throws Exception {
159- log .info ("--- Starting ReplicatorTest::testConfigChange ---" );
160- // This test is to verify that the config change on global namespace is successfully applied in broker during
161- // runtime.
162- // Run a set of producer tasks to create the topics
163- List <Future <Void >> results = new ArrayList <>();
164- for (int i = 0 ; i < 10 ; i ++) {
165- final TopicName dest = TopicName .get (BrokerTestUtil .newUniqueName ("persistent://pulsar/ns/topic-" + i ));
166-
167- results .add (executor .submit (new Callable <Void >() {
168- @ Override
169- public Void call () throws Exception {
170-
171- @ Cleanup
172- MessageProducer producer = new MessageProducer (url1 , dest );
173- log .info ("--- Starting producer --- " + url1 );
174-
175- @ Cleanup
176- MessageConsumer consumer = new MessageConsumer (url1 , dest );
177- log .info ("--- Starting Consumer --- " + url1 );
178-
179- producer .produce (2 );
180- consumer .receive (2 );
181- return null ;
182- }
183- }));
184- }
185-
186- for (Future <Void > result : results ) {
187- try {
188- result .get ();
189- } catch (Exception e ) {
190- log .error ("exception in getting future result " , e );
191- fail (String .format ("replication test failed with %s exception" , e .getMessage ()));
192- }
193- }
194-
195- Thread .sleep (1000L );
196- // Make sure that the internal replicators map contains remote cluster info
197- ConcurrentOpenHashMap <String , PulsarClient > replicationClients1 = ns1 .getReplicationClients ();
198- ConcurrentOpenHashMap <String , PulsarClient > replicationClients2 = ns2 .getReplicationClients ();
199- ConcurrentOpenHashMap <String , PulsarClient > replicationClients3 = ns3 .getReplicationClients ();
200-
201- Assert .assertNotNull (replicationClients1 .get ("r2" ));
202- Assert .assertNotNull (replicationClients1 .get ("r3" ));
203- Assert .assertNotNull (replicationClients2 .get ("r1" ));
204- Assert .assertNotNull (replicationClients2 .get ("r3" ));
205- Assert .assertNotNull (replicationClients3 .get ("r1" ));
206- Assert .assertNotNull (replicationClients3 .get ("r2" ));
207-
208- // Case 1: Update the global namespace replication configuration to only contains the local cluster itself
209- admin1 .namespaces ().setNamespaceReplicationClusters ("pulsar/ns" , Sets .newHashSet ("r1" ));
210-
211- // Wait for config changes to be updated.
212- Thread .sleep (1000L );
213-
214- // Make sure that the internal replicators map still contains remote cluster info
215- Assert .assertNotNull (replicationClients1 .get ("r2" ));
216- Assert .assertNotNull (replicationClients1 .get ("r3" ));
217- Assert .assertNotNull (replicationClients2 .get ("r1" ));
218- Assert .assertNotNull (replicationClients2 .get ("r3" ));
219- Assert .assertNotNull (replicationClients3 .get ("r1" ));
220- Assert .assertNotNull (replicationClients3 .get ("r2" ));
221-
222- // Case 2: Update the configuration back
223- admin1 .namespaces ().setNamespaceReplicationClusters ("pulsar/ns" , Sets .newHashSet ("r1" , "r2" , "r3" ));
224-
225- // Wait for config changes to be updated.
226- Thread .sleep (1000L );
227-
228- // Make sure that the internal replicators map still contains remote cluster info
229- Assert .assertNotNull (replicationClients1 .get ("r2" ));
230- Assert .assertNotNull (replicationClients1 .get ("r3" ));
231- Assert .assertNotNull (replicationClients2 .get ("r1" ));
232- Assert .assertNotNull (replicationClients2 .get ("r3" ));
233- Assert .assertNotNull (replicationClients3 .get ("r1" ));
234- Assert .assertNotNull (replicationClients3 .get ("r2" ));
235-
236- // Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
237- }
238-
239156 @ Test (timeOut = 10000 )
240157 public void activeBrokerParse () throws Exception {
241158 pulsar1 .getConfiguration ().setAuthorizationEnabled (true );
@@ -253,6 +170,32 @@ public void activeBrokerParse() throws Exception {
253170 pulsar1 .getConfiguration ().setAuthorizationEnabled (false );
254171 }
255172
173+ @ Test
174+ public void testForcefullyTopicDeletion () throws Exception {
175+ log .info ("--- Starting ReplicatorTest::testForcefullyTopicDeletion ---" );
176+
177+ final String namespace = BrokerTestUtil .newUniqueName ("pulsar/removeClusterTest" );
178+ admin1 .namespaces ().createNamespace (namespace );
179+ admin1 .namespaces ().setNamespaceReplicationClusters (namespace , Sets .newHashSet ("r1" ));
180+
181+ final String topicName = "persistent://" + namespace + "/topic" ;
182+
183+ @ Cleanup
184+ PulsarClient client1 = PulsarClient .builder ().serviceUrl (url1 .toString ()).statsInterval (0 , TimeUnit .SECONDS )
185+ .build ();
186+
187+ ProducerImpl <byte []> producer1 = (ProducerImpl <byte []>) client1 .newProducer ().topic (topicName )
188+ .enableBatching (false ).messageRoutingMode (MessageRoutingMode .SinglePartition ).create ();
189+ producer1 .close ();
190+
191+ admin1 .topics ().delete (topicName , true );
192+
193+ MockedPulsarServiceBaseTest
194+ .retryStrategically ((test ) -> !pulsar1 .getBrokerService ().getTopics ().containsKey (topicName ), 50 , 150 );
195+
196+ Assert .assertFalse (pulsar1 .getBrokerService ().getTopics ().containsKey (topicName ));
197+ }
198+
256199 @ SuppressWarnings ("unchecked" )
257200 @ Test (timeOut = 30000 )
258201 public void testConcurrentReplicator () throws Exception {
@@ -1270,7 +1213,7 @@ public void testReplicatedCluster() throws Exception {
12701213
12711214 log .info ("--- Starting ReplicatorTest::testReplicatedCluster ---" );
12721215
1273- final String namespace = "pulsar/global/repl" ;
1216+ final String namespace = BrokerTestUtil . newUniqueName ( "pulsar/global/repl" ) ;
12741217 final String topicName = BrokerTestUtil .newUniqueName ("persistent://" + namespace + "/topic1" );
12751218 admin1 .namespaces ().createNamespace (namespace );
12761219 admin1 .namespaces ().setNamespaceReplicationClusters (namespace , Sets .newHashSet ("r1" , "r2" , "r3" ));
@@ -1677,7 +1620,7 @@ public void testReplicatorWithFailedAck() throws Exception {
16771620
16781621 log .info ("--- Starting ReplicatorTest::testReplication ---" );
16791622
1680- String namespace = "pulsar/global/ns2" ;
1623+ String namespace = BrokerTestUtil . newUniqueName ( "pulsar/global/ns" ) ;
16811624 admin1 .namespaces ().createNamespace (namespace , Sets .newHashSet ("r1" ));
16821625 final TopicName dest = TopicName
16831626 .get (BrokerTestUtil .newUniqueName ("persistent://" + namespace + "/ackFailedTopic" ));
@@ -1749,7 +1692,7 @@ public void testReplicatorWithFailedAck() throws Exception {
17491692 @ Test
17501693 public void testWhenUpdateReplicationCluster () throws Exception {
17511694 log .info ("--- testWhenUpdateReplicationCluster ---" );
1752- String namespace = "pulsar/ns2" ;
1695+ String namespace = BrokerTestUtil . newUniqueName ( "pulsar/ns" ); ;
17531696 admin1 .namespaces ().createNamespace (namespace );
17541697 admin1 .namespaces ().setNamespaceReplicationClusters (namespace , Sets .newHashSet ("r1" , "r2" ));
17551698 final TopicName dest = TopicName .get (
@@ -1778,12 +1721,12 @@ public void testWhenUpdateReplicationCluster() throws Exception {
17781721 @ Test
17791722 public void testReplicatorProducerNotExceed () throws Exception {
17801723 log .info ("--- testReplicatorProducerNotExceed ---" );
1781- String namespace1 = "pulsar/ns11" ;
1724+ String namespace1 = BrokerTestUtil . newUniqueName ( "pulsar/ns1" ) ;
17821725 admin1 .namespaces ().createNamespace (namespace1 );
17831726 admin1 .namespaces ().setNamespaceReplicationClusters (namespace1 , Sets .newHashSet ("r1" , "r2" ));
17841727 final TopicName dest1 = TopicName .get (
17851728 BrokerTestUtil .newUniqueName ("persistent://" + namespace1 + "/testReplicatorProducerNotExceed1" ));
1786- String namespace2 = "pulsar/ns22" ;
1729+ String namespace2 = BrokerTestUtil . newUniqueName ( "pulsar/ns2" ) ;
17871730 admin2 .namespaces ().createNamespace (namespace2 );
17881731 admin2 .namespaces ().setNamespaceReplicationClusters (namespace2 , Sets .newHashSet ("r1" , "r2" ));
17891732 final TopicName dest2 = TopicName .get (
0 commit comments