|
18 | 18 | */ |
19 | 19 | package org.apache.pulsar.client.api; |
20 | 20 |
|
| 21 | +import static org.awaitility.Awaitility.await; |
21 | 22 | import com.google.common.collect.Sets; |
22 | | - |
23 | 23 | import java.time.Duration; |
24 | 24 | import java.util.concurrent.CountDownLatch; |
25 | 25 | import java.util.concurrent.atomic.AtomicInteger; |
26 | | - |
27 | 26 | import org.apache.pulsar.broker.BrokerTestUtil; |
28 | 27 | import org.apache.pulsar.broker.service.Dispatcher; |
29 | 28 | import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; |
|
38 | 37 | import org.testng.Assert; |
39 | 38 | import org.testng.annotations.Test; |
40 | 39 |
|
41 | | -import static org.awaitility.Awaitility.await; |
42 | | - |
43 | 40 | @Test(groups = "flaky") |
44 | 41 | public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchThrottlingTest { |
45 | 42 | private static final Logger log = LoggerFactory.getLogger(SubscriptionMessageDispatchThrottlingTest.class); |
@@ -243,6 +240,7 @@ private void testMessageNotDuplicated(SubscriptionType subscription) throws Exce |
243 | 240 | admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); |
244 | 241 | admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate); |
245 | 242 | admin.namespaces().setDispatchRate(namespace, topicDispatchRate); |
| 243 | + long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte(); |
246 | 244 | admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate); |
247 | 245 |
|
248 | 246 | final int numProducedMessages = 30; |
@@ -302,6 +300,9 @@ private void testMessageNotDuplicated(SubscriptionType subscription) throws Exce |
302 | 300 |
|
303 | 301 | consumer.close(); |
304 | 302 | producer.close(); |
| 303 | + |
| 304 | + admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(initBytes)); |
| 305 | + |
305 | 306 | admin.topics().delete(topicName, true); |
306 | 307 | admin.namespaces().deleteNamespace(namespace); |
307 | 308 | } |
@@ -417,6 +418,7 @@ private void testDispatchRate(SubscriptionType subscription, |
417 | 418 | admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); |
418 | 419 | admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate); |
419 | 420 | admin.namespaces().setDispatchRate(namespace, topicDispatchRate); |
| 421 | + long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte(); |
420 | 422 | admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate); |
421 | 423 |
|
422 | 424 | final int numProducedMessages = 30; |
@@ -480,6 +482,7 @@ private void testDispatchRate(SubscriptionType subscription, |
480 | 482 |
|
481 | 483 | consumer.close(); |
482 | 484 | producer.close(); |
| 485 | + admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(initBytes)); |
483 | 486 | admin.topics().delete(topicName, true); |
484 | 487 | admin.namespaces().deleteNamespace(namespace); |
485 | 488 | } |
@@ -532,6 +535,7 @@ public void testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(Subscri |
532 | 535 | final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" + namespace2 + "/throttlingAll"); |
533 | 536 | final String subName = "my-subscriber-name-" + subscription; |
534 | 537 |
|
| 538 | + long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte(); |
535 | 539 | final int byteRate = 1000; |
536 | 540 | admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + byteRate); |
537 | 541 | admin.namespaces().createNamespace(namespace1, Sets.newHashSet("test")); |
@@ -591,6 +595,7 @@ public void testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(Subscri |
591 | 595 | consumer2.close(); |
592 | 596 | producer1.close(); |
593 | 597 | producer2.close(); |
| 598 | + admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(initBytes)); |
594 | 599 | log.info("-- Exiting {} test --", methodName); |
595 | 600 | } |
596 | 601 |
|
@@ -739,7 +744,9 @@ public void testClusterRateLimitingConfiguration(SubscriptionType subscription) |
739 | 744 |
|
740 | 745 | consumer.close(); |
741 | 746 | producer.close(); |
742 | | - pulsar.getConfiguration().setDispatchThrottlingRatePerSubscriptionInMsg(initValue); |
| 747 | + admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg", |
| 748 | + Integer.toString(initValue)); |
| 749 | + conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(false); |
743 | 750 | log.info("-- Exiting {} test --", methodName); |
744 | 751 | } |
745 | 752 |
|
@@ -855,7 +862,8 @@ public void testClusterPolicyOverrideConfiguration() throws Exception { |
855 | 862 |
|
856 | 863 | producer.close(); |
857 | 864 | producer2.close(); |
858 | | - |
| 865 | + admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg", |
| 866 | + Integer.toString(initValue)); |
859 | 867 | log.info("-- Exiting {} test --", methodName); |
860 | 868 | } |
861 | 869 |
|
|
0 commit comments