Skip to content

Commit 2d20d0e

Browse files
poorbarcodenodece
authored andcommitted
[fix] [broker] Fix compatibility issues for PIP-344 (apache#23136)
Co-authored-by: Lari Hotari <lhotari@apache.org> (cherry picked from commit 702c0b3) Signed-off-by: Zixuan Liu <nodeces@gmail.com>
1 parent 5c59062 commit 2d20d0e

File tree

18 files changed

+306
-81
lines changed

18 files changed

+306
-81
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static java.util.concurrent.TimeUnit.MILLISECONDS;
2525
import static java.util.concurrent.TimeUnit.SECONDS;
2626
import static org.apache.commons.lang3.StringUtils.isNotBlank;
27+
import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation;
2728
import com.google.common.collect.Lists;
2829
import com.google.common.hash.Hashing;
2930
import io.prometheus.client.Counter;
@@ -1220,11 +1221,23 @@ public CompletableFuture<Boolean> checkNonPersistentNonPartitionedTopicExists(St
12201221
|| actEx instanceof PulsarClientException.TopicDoesNotExistException
12211222
|| actEx instanceof PulsarAdminException.NotFoundException) {
12221223
return false;
1223-
} else {
1224-
log.error("{} Failed to get partition metadata due to redirecting fails", topic,
1225-
ex);
1226-
throw new CompletionException(ex);
1224+
} else if (actEx instanceof PulsarClientException.FeatureNotSupportedException) {
1225+
PulsarClientException.FeatureNotSupportedException fe =
1226+
(PulsarClientException.FeatureNotSupportedException) actEx;
1227+
if (fe.getFailedFeatureCheck()
1228+
== SupportsGetPartitionedMetadataWithoutAutoCreation) {
1229+
// Since the feature PIP-344 isn't supported, restore the behavior to previous
1230+
// behavior before https://github.com/apache/pulsar/pull/22838 changes.
1231+
log.info("{} Checking the existence of a non-persistent non-partitioned topic "
1232+
+ "was performed using the behavior prior to PIP-344 changes, "
1233+
+ "because the broker does not support the PIP-344 feature "
1234+
+ "'supports_get_partitioned_metadata_without_auto_creation'.",
1235+
topic);
1236+
return false;
1237+
}
12271238
}
1239+
log.error("{} Failed to get partition metadata", topic, ex);
1240+
throw new CompletionException(ex);
12281241
});
12291242
});
12301243
});

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3007,7 +3007,8 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata
30073007
return CompletableFuture.completedFuture(partitionedTopicMetadata);
30083008
}
30093009
// The partitioned topic might be created concurrently.
3010-
if (ex.getCause() instanceof MetadataStoreException.AlreadyExistsException) {
3010+
if (ex.getCause()
3011+
instanceof MetadataStoreException.AlreadyExistsException) {
30113012
log.info(
30123013
"[{}] The partitioned topic is already created, try to "
30133014
+ "refresh "

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,32 @@
1818
*/
1919
package org.apache.pulsar.broker.admin;
2020

21+
import static org.testng.Assert.assertEquals;
2122
import static org.testng.Assert.assertFalse;
2223
import static org.testng.Assert.assertTrue;
24+
import java.lang.reflect.Field;
2325
import java.net.URL;
26+
import java.util.Arrays;
2427
import java.util.List;
28+
import java.util.concurrent.CompletableFuture;
2529
import lombok.extern.slf4j.Slf4j;
30+
import org.apache.pulsar.broker.BrokerTestUtil;
2631
import org.apache.pulsar.broker.PulsarService;
2732
import org.apache.pulsar.client.admin.PulsarAdmin;
2833
import org.apache.pulsar.client.api.PulsarClient;
34+
import org.apache.pulsar.client.api.PulsarClientException;
35+
import org.apache.pulsar.client.impl.ClientCnx;
36+
import org.apache.pulsar.client.impl.ConnectionPool;
2937
import org.apache.pulsar.client.impl.PulsarClientImpl;
3038
import org.apache.pulsar.common.naming.TopicDomain;
3139
import org.apache.pulsar.common.naming.TopicName;
40+
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
3241
import org.apache.pulsar.common.policies.data.TopicType;
42+
import org.apache.pulsar.common.util.FutureUtil;
43+
import org.awaitility.Awaitility;
3344
import org.testng.annotations.AfterClass;
3445
import org.testng.annotations.BeforeClass;
46+
import org.testng.annotations.DataProvider;
3547
import org.testng.annotations.Test;
3648

3749
@Test(groups = "broker-admin")
@@ -219,4 +231,80 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config
219231
super.testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(configAllowAutoTopicCreation,
220232
paramMetadataAutoCreationEnabled, isUsingHttpLookup);
221233
}
234+
235+
@DataProvider(name = "autoCreationParamsAllForNonPersistentTopic")
236+
public Object[][] autoCreationParamsAllForNonPersistentTopic(){
237+
return new Object[][]{
238+
// configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, isUsingHttpLookup.
239+
{true, true, true},
240+
{true, true, false},
241+
{true, false, true},
242+
{true, false, false},
243+
{false, true, true},
244+
{false, true, false},
245+
{false, false, true},
246+
{false, false, false}
247+
};
248+
}
249+
250+
@Test(dataProvider = "autoCreationParamsAllForNonPersistentTopic", priority = Integer.MAX_VALUE)
251+
public void testCompatibilityDifferentBrokersForNonPersistentTopic(boolean configAllowAutoTopicCreation,
252+
boolean paramMetadataAutoCreationEnabled,
253+
boolean isUsingHttpLookup) throws Exception {
254+
modifyTopicAutoCreation(configAllowAutoTopicCreation, TopicType.PARTITIONED, 3);
255+
256+
// Initialize the connections of internal Pulsar Client.
257+
PulsarClientImpl client1 = (PulsarClientImpl) pulsar1.getClient();
258+
PulsarClientImpl client2 = (PulsarClientImpl) pulsar2.getClient();
259+
client1.getLookup(pulsar2.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS + "/tp1"));
260+
client2.getLookup(pulsar1.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS + "/tp1"));
261+
262+
// Inject a not support flag into the connections initialized.
263+
Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
264+
field.setAccessible(true);
265+
for (PulsarClientImpl client : Arrays.asList(client1, client2)) {
266+
ConnectionPool pool = client.getCnxPool();
267+
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
268+
ClientCnx clientCnx = connectionFuture.join();
269+
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
270+
field.set(clientCnx, false);
271+
}
272+
}
273+
// Verify: the method "getPartitionsForTopic(topic, false, true)" will fallback
274+
// to "getPartitionsForTopic(topic, true)" behavior.
275+
int lookupPermitsBefore = getLookupRequestPermits();
276+
277+
// Verify: we will not get an un-support error.
278+
PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
279+
for (PulsarClientImpl client : clientArray) {
280+
final String topicNameStr = BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp");
281+
try {
282+
PartitionedTopicMetadata topicMetadata = client
283+
.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false)
284+
.join();
285+
log.info("Get topic metadata: {}", topicMetadata.partitions);
286+
} catch (Exception ex) {
287+
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
288+
assertTrue(unwrapEx instanceof PulsarClientException.TopicDoesNotExistException
289+
|| unwrapEx instanceof PulsarClientException.NotFoundException);
290+
assertFalse(ex.getMessage().contains("getting partitions without auto-creation is not supported from"
291+
+ " the broker"));
292+
}
293+
}
294+
295+
// Verify: lookup semaphore has been releases.
296+
Awaitility.await().untilAsserted(() -> {
297+
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
298+
});
299+
300+
// reset clients.
301+
for (PulsarClientImpl client : Arrays.asList(client1, client2)) {
302+
ConnectionPool pool = client.getCnxPool();
303+
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
304+
ClientCnx clientCnx = connectionFuture.join();
305+
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
306+
field.set(clientCnx, true);
307+
}
308+
}
309+
}
222310
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java

Lines changed: 66 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,21 @@
2323
import static org.testng.Assert.assertTrue;
2424
import static org.testng.Assert.fail;
2525
import com.google.common.collect.Sets;
26+
import java.lang.reflect.Field;
2627
import java.net.URL;
2728
import java.util.Collections;
2829
import java.util.List;
2930
import java.util.Optional;
31+
import java.util.concurrent.CompletableFuture;
3032
import lombok.extern.slf4j.Slf4j;
3133
import org.apache.pulsar.broker.BrokerTestUtil;
3234
import org.apache.pulsar.broker.PulsarService;
3335
import org.apache.pulsar.broker.ServiceConfiguration;
3436
import org.apache.pulsar.client.admin.PulsarAdmin;
3537
import org.apache.pulsar.client.api.PulsarClient;
3638
import org.apache.pulsar.client.api.PulsarClientException;
39+
import org.apache.pulsar.client.impl.ClientCnx;
40+
import org.apache.pulsar.client.impl.ConnectionPool;
3741
import org.apache.pulsar.client.impl.PulsarClientImpl;
3842
import org.apache.pulsar.common.naming.NamespaceName;
3943
import org.apache.pulsar.common.naming.TopicDomain;
@@ -226,6 +230,60 @@ public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain topicDomain) t
226230
}
227231
}
228232

233+
@Test(dataProvider = "topicDomains", priority = Integer.MAX_VALUE)
234+
public void testCompatibilityForNewClientAndOldBroker(TopicDomain topicDomain) throws Exception {
235+
modifyTopicAutoCreation(true, TopicType.PARTITIONED, 3);
236+
// Initialize connections.
237+
String pulsarUrl = pulsar1.getBrokerServiceUrl();
238+
PulsarClientImpl[] clients = getClientsToTest(false);
239+
for (PulsarClientImpl client : clients) {
240+
client.getLookup(pulsarUrl).getBroker(TopicName.get(DEFAULT_NS + "/tp1"));
241+
}
242+
// Inject a not support flag into the connections initialized.
243+
Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
244+
field.setAccessible(true);
245+
for (PulsarClientImpl client : clients) {
246+
ConnectionPool pool = client.getCnxPool();
247+
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
248+
ClientCnx clientCnx = connectionFuture.join();
249+
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
250+
field.set(clientCnx, false);
251+
}
252+
}
253+
254+
// Verify: the method "getPartitionsForTopic(topic, false, true)" will fallback to
255+
// "getPartitionsForTopic(topic)" behavior.
256+
int lookupPermitsBefore = getLookupRequestPermits();
257+
for (PulsarClientImpl client : clients) {
258+
// Verify: the behavior of topic creation.
259+
final String tp = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
260+
client.getPartitionedTopicMetadata(tp, false, true).join();
261+
Optional<PartitionedTopicMetadata> metadata1 = pulsar1.getPulsarResources().getNamespaceResources()
262+
.getPartitionedTopicResources()
263+
.getPartitionedTopicMetadataAsync(TopicName.get(tp), true).join();
264+
assertTrue(metadata1.isPresent());
265+
assertEquals(metadata1.get().partitions, 3);
266+
267+
// Verify: lookup semaphore has been releases.
268+
Awaitility.await().untilAsserted(() -> {
269+
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
270+
});
271+
272+
// Cleanup.
273+
admin1.topics().deletePartitionedTopic(tp, false);
274+
}
275+
276+
// reset clients.
277+
for (PulsarClientImpl client : clients) {
278+
ConnectionPool pool = client.getCnxPool();
279+
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
280+
ClientCnx clientCnx = connectionFuture.join();
281+
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
282+
field.set(clientCnx, true);
283+
}
284+
}
285+
}
286+
229287
@DataProvider(name = "autoCreationParamsAll")
230288
public Object[][] autoCreationParamsAll(){
231289
return new Object[][]{
@@ -266,7 +324,7 @@ public void testGetMetadataIfNonPartitionedTopicExists(boolean configAllowAutoTo
266324
for (PulsarClientImpl client : clientArray) {
267325
// Verify: the result of get partitioned topic metadata.
268326
PartitionedTopicMetadata response =
269-
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled).join();
327+
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false).join();
270328
assertEquals(response.partitions, 0);
271329
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList("public/default");
272330
assertFalse(partitionedTopics.contains(topicNameStr));
@@ -299,7 +357,7 @@ public void testGetMetadataIfPartitionedTopicExists(boolean configAllowAutoTopic
299357
for (PulsarClientImpl client : clientArray) {
300358
// Verify: the result of get partitioned topic metadata.
301359
PartitionedTopicMetadata response =
302-
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled).join();
360+
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false).join();
303361
assertEquals(response.partitions, 3);
304362
verifyNonPartitionedTopicNeverCreated(topicNameStr);
305363

@@ -333,7 +391,7 @@ public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomai
333391
// Case-1: normal topic.
334392
final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
335393
// Verify: the result of get partitioned topic metadata.
336-
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true).join();
394+
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true, false).join();
337395
assertEquals(response.partitions, 3);
338396
// Verify: the behavior of topic creation.
339397
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList("public/default");
@@ -348,7 +406,7 @@ public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomai
348406
topicDomain.value() + "://" + DEFAULT_NS + "/tp") + "-partition-1";
349407
// Verify: the result of get partitioned topic metadata.
350408
PartitionedTopicMetadata response2 =
351-
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true).join();
409+
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true, false).join();
352410
assertEquals(response2.partitions, 0);
353411
// Verify: the behavior of topic creation.
354412
List<String> partitionedTopics2 =
@@ -381,7 +439,7 @@ public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDo
381439
// Case 1: normal topic.
382440
final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
383441
// Verify: the result of get partitioned topic metadata.
384-
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true).join();
442+
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true, false).join();
385443
assertEquals(response.partitions, 0);
386444
// Verify: the behavior of topic creation.
387445
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList("public/default");
@@ -393,7 +451,7 @@ public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDo
393451
topicDomain.value() + "://" + DEFAULT_NS + "/tp") + "-partition-1";
394452
// Verify: the result of get partitioned topic metadata.
395453
PartitionedTopicMetadata response2 =
396-
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true).join();
454+
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true, false).join();
397455
assertEquals(response2.partitions, 0);
398456
// Verify: the behavior of topic creation.
399457
List<String> partitionedTopics2 =
@@ -444,7 +502,7 @@ public void testGetMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreati
444502
final TopicName topicName = TopicName.get(topicNameStr);
445503
// Verify: the result of get partitioned topic metadata.
446504
try {
447-
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled)
505+
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false)
448506
.join();
449507
fail("Expect a not found exception");
450508
} catch (Exception e) {
@@ -497,7 +555,7 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config
497555
// Verify: the result of get partitioned topic metadata.
498556
try {
499557
PartitionedTopicMetadata topicMetadata = client
500-
.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled)
558+
.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false)
501559
.join();
502560
log.info("Get topic metadata: {}", topicMetadata.partitions);
503561
fail("Expected a not found ex");

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
134134
// we want to skip the "lookup" phase, because it is blocked by the HTTP API
135135
LookupService mockLookup = mock(LookupService.class);
136136
Whitebox.setInternalState(pulsarClient, "lookup", mockLookup);
137-
when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean())).thenAnswer(i -> {
137+
when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean())).thenAnswer(i -> {
138138
return CompletableFuture.completedFuture(new PartitionedTopicMetadata(0));
139139
});
140140
when(mockLookup.getBroker(any())).thenAnswer(i -> {

0 commit comments

Comments
 (0)