Skip to content

Commit 7143776

Browse files
committed
add more tests
1 parent 33d85e6 commit 7143776

4 files changed

Lines changed: 122 additions & 4 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ public void startProducer() {
219219
// Force only replicate messages to a non-partitioned topic, to avoid auto-create a partitioned topic on
220220
// the remote cluster.
221221
ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder;
222-
builderImpl.getConf().setForceNoPartitioned(true);
222+
builderImpl.getConf().setNonPartitionedTopicExpected(true);
223223
return producerBuilder.createAsync().thenAccept(producer -> {
224224
setProducerAndTriggerReadEntries(producer);
225225
});
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api;
20+
21+
import lombok.extern.slf4j.Slf4j;
22+
import org.apache.pulsar.broker.BrokerTestUtil;
23+
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
24+
import org.apache.pulsar.common.naming.TopicName;
25+
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
26+
import org.apache.pulsar.common.policies.data.TopicType;
27+
import org.testng.Assert;
28+
import org.testng.annotations.AfterClass;
29+
import org.testng.annotations.BeforeClass;
30+
import org.testng.annotations.DataProvider;
31+
import org.testng.annotations.Test;
32+
33+
@Slf4j
34+
public class NonPartitionedTopicExpectedTest extends ProducerConsumerBase {
35+
36+
@BeforeClass
37+
@Override
38+
protected void setup() throws Exception {
39+
super.internalSetup();
40+
super.producerBaseSetup();
41+
}
42+
43+
@AfterClass(alwaysRun = true)
44+
@Override
45+
protected void cleanup() throws Exception {
46+
super.internalCleanup();
47+
}
48+
49+
@Test
50+
public void testWhenNonPartitionedTopicExists() throws Exception {
51+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
52+
admin.topics().createNonPartitionedTopic(topic);
53+
ProducerBuilderImpl<String> producerBuilder =
54+
(ProducerBuilderImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topic);
55+
producerBuilder.getConf().setNonPartitionedTopicExpected(true);
56+
// Verify: create successfully.
57+
Producer producer = producerBuilder.create();
58+
// cleanup.
59+
producer.close();
60+
admin.topics().delete(topic, false);
61+
}
62+
63+
@Test
64+
public void testWhenPartitionedTopicExists() throws Exception {
65+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
66+
admin.topics().createPartitionedTopic(topic, 2);
67+
ProducerBuilderImpl<String> producerBuilder =
68+
(ProducerBuilderImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topic);
69+
producerBuilder.getConf().setNonPartitionedTopicExpected(true);
70+
// Verify: failed to create.
71+
try {
72+
producerBuilder.create();
73+
Assert.fail("expected an error since producer expected a non-partitioned topic");
74+
} catch (Exception ex) {
75+
// expected an error.
76+
log.error("expected error", ex);
77+
}
78+
// cleanup.
79+
admin.topics().deletePartitionedTopic(topic, false);
80+
}
81+
82+
@DataProvider(name = "topicTypes")
83+
public Object[][] topicTypes() {
84+
return new Object[][]{
85+
{TopicType.PARTITIONED},
86+
{TopicType.NON_PARTITIONED}
87+
};
88+
}
89+
90+
@Test(dataProvider = "topicTypes")
91+
public void testWhenTopicNotExists(TopicType topicType) throws Exception {
92+
final String namespace = "public/default";
93+
final String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp");
94+
final TopicName topicName = TopicName.get(topic);
95+
AutoTopicCreationOverride.Builder policyBuilder = AutoTopicCreationOverride.builder()
96+
.topicType(topicType.toString()).allowAutoTopicCreation(true);
97+
if (topicType.equals(TopicType.PARTITIONED)) {
98+
policyBuilder.defaultNumPartitions(2);
99+
}
100+
AutoTopicCreationOverride policy = policyBuilder.build();
101+
admin.namespaces().setAutoTopicCreation(namespace, policy);
102+
103+
ProducerBuilderImpl<String> producerBuilder =
104+
(ProducerBuilderImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topic);
105+
producerBuilder.getConf().setNonPartitionedTopicExpected(true);
106+
// Verify: create successfully.
107+
Producer producer = producerBuilder.create();
108+
// Verify: only create non-partitioned topic.
109+
Assert.assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
110+
.partitionedTopicExists(topicName));
111+
Assert.assertTrue(pulsar.getNamespaceService().checkNonPartitionedTopicExists(topicName).join());
112+
113+
// cleanup.
114+
producer.close();
115+
admin.topics().delete(topic, false);
116+
admin.namespaces().removeAutoTopicCreation(namespace);
117+
}
118+
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -419,14 +419,14 @@ private <T> CompletableFuture<Producer<T>> createProducerAsync(String topic,
419419

420420

421421

422-
checkPartitions(topic, conf.isForceNoPartitioned(), conf.getProducerName()).thenAccept(partitions -> {
422+
checkPartitions(topic, conf.isNonPartitionedTopicExpected(), conf.getProducerName()).thenAccept(partitions -> {
423423
if (log.isDebugEnabled()) {
424424
log.debug("[{}] Received topic metadata. partitions: {}", topic, partitions);
425425
}
426426

427427
ProducerBase<T> producer;
428428
if (partitions > 0) {
429-
if (conf.isForceNoPartitioned()) {
429+
if (conf.isNonPartitionedTopicExpected()) {
430430
String errorMsg = String.format("Can not create the producer[{}] for the topic[{}] that contains {}"
431431
+ " partitions, but the producer does not support for a partitioned topic.",
432432
conf.getProducerName(), topic, partitions);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
204204

205205
private SortedMap<String, String> properties = new TreeMap<>();
206206

207-
private boolean forceNoPartitioned;
207+
private boolean isNonPartitionedTopicExpected;
208208

209209
@ApiModelProperty(
210210
name = "initialSubscriptionName",

0 commit comments

Comments
 (0)