Skip to content

Commit 1f122f1

Browse files
committed
improve MessageDispatchThrottlingTest
1 parent c732852 commit 1f122f1

2 files changed

Lines changed: 49 additions & 12 deletions

File tree

pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,29 +53,53 @@
5353
import org.slf4j.Logger;
5454
import org.slf4j.LoggerFactory;
5555
import org.testng.Assert;
56+
import org.testng.annotations.AfterClass;
5657
import org.testng.annotations.AfterMethod;
57-
import org.testng.annotations.BeforeMethod;
58+
import org.testng.annotations.BeforeClass;
5859
import org.testng.annotations.DataProvider;
5960
import org.testng.annotations.Test;
6061

6162
@Test(groups = "flaky")
6263
public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
6364
private static final Logger log = LoggerFactory.getLogger(MessageDispatchThrottlingTest.class);
6465

65-
@BeforeMethod
66+
@BeforeClass
6667
@Override
6768
protected void setup() throws Exception {
6869
this.conf.setClusterName("test");
6970
super.internalSetup();
7071
super.producerBaseSetup();
7172
}
7273

73-
@AfterMethod(alwaysRun = true)
74+
@AfterClass(alwaysRun = true)
7475
@Override
7576
protected void cleanup() throws Exception {
7677
super.internalCleanup();
7778
}
7879

80+
@AfterMethod(alwaysRun = true)
81+
protected void reset() throws Exception {
82+
pulsar.getConfiguration().setForceDeleteTenantAllowed(true);
83+
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
84+
85+
for (String tenant : admin.tenants().getTenants()) {
86+
for (String namespace : admin.namespaces().getNamespaces(tenant)) {
87+
deleteNamespaceGraceFully(namespace, true);
88+
}
89+
admin.tenants().deleteTenant(tenant, true);
90+
}
91+
92+
for (String cluster : admin.clusters().getClusters()) {
93+
admin.clusters().deleteCluster(cluster);
94+
}
95+
96+
pulsar.getConfiguration().setForceDeleteTenantAllowed(false);
97+
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
98+
99+
super.producerBaseSetup();
100+
}
101+
102+
79103
@DataProvider(name = "subscriptions")
80104
public Object[][] subscriptionsProvider() {
81105
return new Object[][] { new Object[] { SubscriptionType.Shared }, { SubscriptionType.Exclusive } };
@@ -280,6 +304,7 @@ public void testClusterMsgByteRateLimitingClusterConfig() throws Exception {
280304
final long byteRate = 1024 * 1024;// 1MB rate enough to let all msg to be delivered
281305

282306
int initValue = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg();
307+
long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
283308
// (1) Update message-dispatch-rate limit
284309
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg",
285310
Integer.toString(messageRate));
@@ -325,7 +350,9 @@ public void testClusterMsgByteRateLimitingClusterConfig() throws Exception {
325350

326351
consumer.close();
327352
producer.close();
328-
pulsar.getConfiguration().setDispatchThrottlingRatePerTopicInMsg(initValue);
353+
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg",
354+
Integer.toString(initValue));
355+
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInByte", Long.toString(initBytes));
329356
log.info("-- Exiting {} test --", methodName);
330357
}
331358

@@ -675,7 +702,8 @@ public void testClusterRateLimitingConfiguration(SubscriptionType subscription)
675702

676703
consumer.close();
677704
producer.close();
678-
pulsar.getConfiguration().setDispatchThrottlingRatePerTopicInMsg(initValue);
705+
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg",
706+
Integer.toString(initValue));
679707
log.info("-- Exiting {} test --", methodName);
680708
}
681709

@@ -981,7 +1009,8 @@ public void testClusterPolicyOverrideConfiguration() throws Exception {
9811009

9821010
producer.close();
9831011
producer2.close();
984-
1012+
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg",
1013+
Integer.toString(initValue));
9851014
log.info("-- Exiting {} test --", methodName);
9861015
}
9871016

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,11 @@
1818
*/
1919
package org.apache.pulsar.client.api;
2020

21+
import static org.awaitility.Awaitility.await;
2122
import com.google.common.collect.Sets;
22-
2323
import java.time.Duration;
2424
import java.util.concurrent.CountDownLatch;
2525
import java.util.concurrent.atomic.AtomicInteger;
26-
2726
import org.apache.pulsar.broker.BrokerTestUtil;
2827
import org.apache.pulsar.broker.service.Dispatcher;
2928
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
@@ -38,8 +37,6 @@
3837
import org.testng.Assert;
3938
import org.testng.annotations.Test;
4039

41-
import static org.awaitility.Awaitility.await;
42-
4340
@Test(groups = "flaky")
4441
public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchThrottlingTest {
4542
private static final Logger log = LoggerFactory.getLogger(SubscriptionMessageDispatchThrottlingTest.class);
@@ -243,6 +240,7 @@ private void testMessageNotDuplicated(SubscriptionType subscription) throws Exce
243240
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
244241
admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
245242
admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
243+
long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
246244
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
247245

248246
final int numProducedMessages = 30;
@@ -302,6 +300,9 @@ private void testMessageNotDuplicated(SubscriptionType subscription) throws Exce
302300

303301
consumer.close();
304302
producer.close();
303+
304+
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(initBytes));
305+
305306
admin.topics().delete(topicName, true);
306307
admin.namespaces().deleteNamespace(namespace);
307308
}
@@ -417,6 +418,7 @@ private void testDispatchRate(SubscriptionType subscription,
417418
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
418419
admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
419420
admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
421+
long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
420422
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
421423

422424
final int numProducedMessages = 30;
@@ -480,6 +482,7 @@ private void testDispatchRate(SubscriptionType subscription,
480482

481483
consumer.close();
482484
producer.close();
485+
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(initBytes));
483486
admin.topics().delete(topicName, true);
484487
admin.namespaces().deleteNamespace(namespace);
485488
}
@@ -532,6 +535,7 @@ public void testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(Subscri
532535
final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" + namespace2 + "/throttlingAll");
533536
final String subName = "my-subscriber-name-" + subscription;
534537

538+
long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
535539
final int byteRate = 1000;
536540
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + byteRate);
537541
admin.namespaces().createNamespace(namespace1, Sets.newHashSet("test"));
@@ -591,6 +595,7 @@ public void testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(Subscri
591595
consumer2.close();
592596
producer1.close();
593597
producer2.close();
598+
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(initBytes));
594599
log.info("-- Exiting {} test --", methodName);
595600
}
596601

@@ -739,7 +744,9 @@ public void testClusterRateLimitingConfiguration(SubscriptionType subscription)
739744

740745
consumer.close();
741746
producer.close();
742-
pulsar.getConfiguration().setDispatchThrottlingRatePerSubscriptionInMsg(initValue);
747+
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg",
748+
Integer.toString(initValue));
749+
conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(false);
743750
log.info("-- Exiting {} test --", methodName);
744751
}
745752

@@ -855,7 +862,8 @@ public void testClusterPolicyOverrideConfiguration() throws Exception {
855862

856863
producer.close();
857864
producer2.close();
858-
865+
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg",
866+
Integer.toString(initValue));
859867
log.info("-- Exiting {} test --", methodName);
860868
}
861869

0 commit comments

Comments
 (0)