1919package org .apache .pulsar .tests .integration .backwardscompatibility ;
2020
2121
22+ import java .util .List ;
23+ import java .util .concurrent .TimeUnit ;
2224import java .util .function .Supplier ;
25+ import lombok .Cleanup ;
26+ import org .apache .pulsar .client .admin .PulsarAdmin ;
27+ import org .apache .pulsar .client .api .Message ;
28+ import org .apache .pulsar .client .api .MessageIdAdv ;
29+ import org .apache .pulsar .client .api .MessageRouter ;
30+ import org .apache .pulsar .client .api .MessageRoutingMode ;
31+ import org .apache .pulsar .client .api .PulsarClient ;
32+ import org .apache .pulsar .client .api .TopicMetadata ;
2333import org .apache .pulsar .tests .integration .topologies .ClientTestBase ;
34+ import org .testng .Assert ;
2435import org .testng .annotations .Test ;
2536
2637public class ClientTest2_2 extends PulsarStandaloneTestSuite2_2 {
@@ -33,4 +44,47 @@ public void testResetCursorCompatibility(Supplier<String> serviceUrl, Supplier<S
3344 clientTestBase .resetCursorCompatibility (serviceUrl .get (), httpServiceUrl .get (), topicName );
3445 }
3546
47+ @ Test (timeOut = 20000 )
48+ public void testAutoPartitionsUpdate () throws Exception {
49+ @ Cleanup final var pulsarClient = PulsarClient .builder ()
50+ .serviceUrl (getContainer ().getPlainTextServiceUrl ())
51+ .build ();
52+ final var topic = "test-auto-part-update" ;
53+ final var topic2 = "dummy-topic" ;
54+ @ Cleanup final var admin = PulsarAdmin .builder ().serviceHttpUrl (getContainer ().getHttpServiceUrl ()).build ();
55+ // Use 2 as the initial partition number because old version broker cannot update partitions on a topic that
56+ // has only 1 partition.
57+ admin .topics ().createPartitionedTopic (topic , 2 );
58+ admin .topics ().createPartitionedTopic (topic2 , 2 );
59+ @ Cleanup final var producer = pulsarClient .newProducer ().autoUpdatePartitions (true )
60+ .autoUpdatePartitionsInterval (1 , TimeUnit .SECONDS )
61+ .messageRoutingMode (MessageRoutingMode .CustomPartition )
62+ .messageRouter (new MessageRouter () {
63+ @ Override
64+ public int choosePartition (Message <?> msg , TopicMetadata metadata ) {
65+ return metadata .numPartitions () - 1 ;
66+ }
67+ })
68+ .topic (topic )
69+ .create ();
70+ @ Cleanup final var consumer = pulsarClient .newConsumer ().autoUpdatePartitions (true )
71+ .autoUpdatePartitionsInterval (1 , TimeUnit .SECONDS )
72+ .topic (topic ).subscriptionName ("sub" )
73+ .subscribe ();
74+ @ Cleanup final var multiTopicsConsumer = pulsarClient .newConsumer ().autoUpdatePartitions (true )
75+ .autoUpdatePartitionsInterval (1 , TimeUnit .SECONDS )
76+ .topics (List .of (topic , topic2 )).subscriptionName ("sub-2" ).subscribe ();
77+
78+ admin .topics ().updatePartitionedTopic (topic , 3 );
79+ Thread .sleep (1500 );
80+ final var msgId = (MessageIdAdv ) producer .send ("msg" .getBytes ());
81+ Assert .assertEquals (msgId .getPartitionIndex (), 2 );
82+
83+ final var msg = consumer .receive (3 , TimeUnit .SECONDS );
84+ Assert .assertNotNull (msg );
85+ Assert .assertEquals (((MessageIdAdv ) msg .getMessageId ()).getPartitionIndex (), 2 );
86+ final var msg2 = multiTopicsConsumer .receive (3 , TimeUnit .SECONDS );
87+ Assert .assertNotNull (msg2 );
88+ Assert .assertEquals (msg2 .getMessageId (), msg .getMessageId ());
89+ }
3690}
0 commit comments