Skip to content

Commit 46e2fa3

Browse files
poorbarcodelhotari
andcommitted
[fix] [broker] Fix compatibility issues for PIP-344 (#23136)
Co-authored-by: Lari Hotari <lhotari@apache.org> (cherry picked from commit 702c0b3)
1 parent b7959ca commit 46e2fa3

17 files changed

Lines changed: 272 additions & 43 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static java.util.concurrent.TimeUnit.MILLISECONDS;
2424
import static java.util.concurrent.TimeUnit.SECONDS;
2525
import static org.apache.commons.lang3.StringUtils.isNotBlank;
26+
import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation;
2627
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
2728
import com.google.common.hash.Hashing;
2829
import io.opentelemetry.api.common.AttributeKey;
@@ -1498,8 +1499,22 @@ public CompletableFuture<Boolean> checkNonPersistentNonPartitionedTopicExists(St
14981499
|| actEx instanceof PulsarClientException.TopicDoesNotExistException
14991500
|| actEx instanceof PulsarAdminException.NotFoundException) {
15001501
return CompletableFuture.completedFuture(false);
1502+
} else if (actEx instanceof PulsarClientException.FeatureNotSupportedException fe){
1503+
if (fe.getFailedFeatureCheck() == SupportsGetPartitionedMetadataWithoutAutoCreation) {
1504+
// Since the feature PIP-344 isn't supported, restore the behavior to previous
1505+
// behavior before https://github.com/apache/pulsar/pull/22838 changes.
1506+
log.info("{} Checking the existence of a non-persistent non-partitioned topic "
1507+
+ "was performed using the behavior prior to PIP-344 changes, "
1508+
+ "because the broker does not support the PIP-344 feature "
1509+
+ "'supports_get_partitioned_metadata_without_auto_creation'.",
1510+
topic);
1511+
return CompletableFuture.completedFuture(false);
1512+
} else {
1513+
log.error("{} Failed to get partition metadata", topic, ex);
1514+
return CompletableFuture.failedFuture(ex);
1515+
}
15011516
} else {
1502-
log.error("{} Failed to get partition metadata due to redirecting fails", topic, ex);
1517+
log.error("{} Failed to get partition metadata", topic, ex);
15031518
return CompletableFuture.failedFuture(ex);
15041519
}
15051520
});

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,32 @@
1818
*/
1919
package org.apache.pulsar.broker.admin;
2020

21+
import static org.testng.Assert.assertEquals;
2122
import static org.testng.Assert.assertFalse;
2223
import static org.testng.Assert.assertTrue;
24+
import java.lang.reflect.Field;
2325
import java.net.URL;
26+
import java.util.Arrays;
2427
import java.util.List;
28+
import java.util.concurrent.CompletableFuture;
2529
import lombok.extern.slf4j.Slf4j;
30+
import org.apache.pulsar.broker.BrokerTestUtil;
2631
import org.apache.pulsar.broker.PulsarService;
2732
import org.apache.pulsar.client.admin.PulsarAdmin;
2833
import org.apache.pulsar.client.api.PulsarClient;
34+
import org.apache.pulsar.client.api.PulsarClientException;
35+
import org.apache.pulsar.client.impl.ClientCnx;
36+
import org.apache.pulsar.client.impl.ConnectionPool;
2937
import org.apache.pulsar.client.impl.PulsarClientImpl;
3038
import org.apache.pulsar.common.naming.TopicDomain;
3139
import org.apache.pulsar.common.naming.TopicName;
40+
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
3241
import org.apache.pulsar.common.policies.data.TopicType;
42+
import org.apache.pulsar.common.util.FutureUtil;
43+
import org.awaitility.Awaitility;
3344
import org.testng.annotations.AfterClass;
3445
import org.testng.annotations.BeforeClass;
46+
import org.testng.annotations.DataProvider;
3547
import org.testng.annotations.Test;
3648

3749
@Test(groups = "broker-admin")
@@ -219,4 +231,80 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config
219231
super.testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(configAllowAutoTopicCreation,
220232
paramMetadataAutoCreationEnabled, isUsingHttpLookup);
221233
}
234+
235+
@DataProvider(name = "autoCreationParamsAllForNonPersistentTopic")
236+
public Object[][] autoCreationParamsAllForNonPersistentTopic(){
237+
return new Object[][]{
238+
// configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, isUsingHttpLookup.
239+
{true, true, true},
240+
{true, true, false},
241+
{true, false, true},
242+
{true, false, false},
243+
{false, true, true},
244+
{false, true, false},
245+
{false, false, true},
246+
{false, false, false}
247+
};
248+
}
249+
250+
@Test(dataProvider = "autoCreationParamsAllForNonPersistentTopic", priority = Integer.MAX_VALUE)
251+
public void testCompatibilityDifferentBrokersForNonPersistentTopic(boolean configAllowAutoTopicCreation,
252+
boolean paramMetadataAutoCreationEnabled,
253+
boolean isUsingHttpLookup) throws Exception {
254+
modifyTopicAutoCreation(configAllowAutoTopicCreation, TopicType.PARTITIONED, 3);
255+
256+
// Initialize the connections of internal Pulsar Client.
257+
PulsarClientImpl client1 = (PulsarClientImpl) pulsar1.getClient();
258+
PulsarClientImpl client2 = (PulsarClientImpl) pulsar2.getClient();
259+
client1.getLookup(pulsar2.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS + "/tp1"));
260+
client2.getLookup(pulsar1.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS + "/tp1"));
261+
262+
// Inject a not support flag into the connections initialized.
263+
Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
264+
field.setAccessible(true);
265+
for (PulsarClientImpl client : Arrays.asList(client1, client2)) {
266+
ConnectionPool pool = client.getCnxPool();
267+
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
268+
ClientCnx clientCnx = connectionFuture.join();
269+
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
270+
field.set(clientCnx, false);
271+
}
272+
}
273+
// Verify: the method "getPartitionsForTopic(topic, false, true)" will fallback
274+
// to "getPartitionsForTopic(topic, true)" behavior.
275+
int lookupPermitsBefore = getLookupRequestPermits();
276+
277+
// Verify: we will not get an un-support error.
278+
PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
279+
for (PulsarClientImpl client : clientArray) {
280+
final String topicNameStr = BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp");
281+
try {
282+
PartitionedTopicMetadata topicMetadata = client
283+
.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false)
284+
.join();
285+
log.info("Get topic metadata: {}", topicMetadata.partitions);
286+
} catch (Exception ex) {
287+
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
288+
assertTrue(unwrapEx instanceof PulsarClientException.TopicDoesNotExistException
289+
|| unwrapEx instanceof PulsarClientException.NotFoundException);
290+
assertFalse(ex.getMessage().contains("getting partitions without auto-creation is not supported from"
291+
+ " the broker"));
292+
}
293+
}
294+
295+
// Verify: lookup semaphore has been releases.
296+
Awaitility.await().untilAsserted(() -> {
297+
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
298+
});
299+
300+
// reset clients.
301+
for (PulsarClientImpl client : Arrays.asList(client1, client2)) {
302+
ConnectionPool pool = client.getCnxPool();
303+
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
304+
ClientCnx clientCnx = connectionFuture.join();
305+
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
306+
field.set(clientCnx, true);
307+
}
308+
}
309+
}
222310
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java

Lines changed: 66 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,21 @@
2323
import static org.testng.Assert.assertTrue;
2424
import static org.testng.Assert.fail;
2525
import com.google.common.collect.Sets;
26+
import java.lang.reflect.Field;
2627
import java.net.URL;
2728
import java.util.Collections;
2829
import java.util.List;
2930
import java.util.Optional;
31+
import java.util.concurrent.CompletableFuture;
3032
import lombok.extern.slf4j.Slf4j;
3133
import org.apache.pulsar.broker.BrokerTestUtil;
3234
import org.apache.pulsar.broker.PulsarService;
3335
import org.apache.pulsar.broker.ServiceConfiguration;
3436
import org.apache.pulsar.client.admin.PulsarAdmin;
3537
import org.apache.pulsar.client.api.PulsarClient;
3638
import org.apache.pulsar.client.api.PulsarClientException;
39+
import org.apache.pulsar.client.impl.ClientCnx;
40+
import org.apache.pulsar.client.impl.ConnectionPool;
3741
import org.apache.pulsar.client.impl.PulsarClientImpl;
3842
import org.apache.pulsar.common.naming.NamespaceName;
3943
import org.apache.pulsar.common.naming.TopicDomain;
@@ -225,6 +229,60 @@ public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain topicDomain) t
225229
}
226230
}
227231

232+
@Test(dataProvider = "topicDomains", priority = Integer.MAX_VALUE)
233+
public void testCompatibilityForNewClientAndOldBroker(TopicDomain topicDomain) throws Exception {
234+
modifyTopicAutoCreation(true, TopicType.PARTITIONED, 3);
235+
// Initialize connections.
236+
String pulsarUrl = pulsar1.getBrokerServiceUrl();
237+
PulsarClientImpl[] clients = getClientsToTest(false);
238+
for (PulsarClientImpl client : clients) {
239+
client.getLookup(pulsarUrl).getBroker(TopicName.get(DEFAULT_NS + "/tp1"));
240+
}
241+
// Inject a not support flag into the connections initialized.
242+
Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
243+
field.setAccessible(true);
244+
for (PulsarClientImpl client : clients) {
245+
ConnectionPool pool = client.getCnxPool();
246+
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
247+
ClientCnx clientCnx = connectionFuture.join();
248+
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
249+
field.set(clientCnx, false);
250+
}
251+
}
252+
253+
// Verify: the method "getPartitionsForTopic(topic, false, true)" will fallback to
254+
// "getPartitionsForTopic(topic)" behavior.
255+
int lookupPermitsBefore = getLookupRequestPermits();
256+
for (PulsarClientImpl client : clients) {
257+
// Verify: the behavior of topic creation.
258+
final String tp = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
259+
client.getPartitionedTopicMetadata(tp, false, true).join();
260+
Optional<PartitionedTopicMetadata> metadata1 = pulsar1.getPulsarResources().getNamespaceResources()
261+
.getPartitionedTopicResources()
262+
.getPartitionedTopicMetadataAsync(TopicName.get(tp), true).join();
263+
assertTrue(metadata1.isPresent());
264+
assertEquals(metadata1.get().partitions, 3);
265+
266+
// Verify: lookup semaphore has been releases.
267+
Awaitility.await().untilAsserted(() -> {
268+
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
269+
});
270+
271+
// Cleanup.
272+
admin1.topics().deletePartitionedTopic(tp, false);
273+
}
274+
275+
// reset clients.
276+
for (PulsarClientImpl client : clients) {
277+
ConnectionPool pool = client.getCnxPool();
278+
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
279+
ClientCnx clientCnx = connectionFuture.join();
280+
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
281+
field.set(clientCnx, true);
282+
}
283+
}
284+
}
285+
228286
@DataProvider(name = "autoCreationParamsAll")
229287
public Object[][] autoCreationParamsAll(){
230288
return new Object[][]{
@@ -265,7 +323,7 @@ public void testGetMetadataIfNonPartitionedTopicExists(boolean configAllowAutoTo
265323
for (PulsarClientImpl client : clientArray) {
266324
// Verify: the result of get partitioned topic metadata.
267325
PartitionedTopicMetadata response =
268-
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled).join();
326+
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false).join();
269327
assertEquals(response.partitions, 0);
270328
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList("public/default");
271329
assertFalse(partitionedTopics.contains(topicNameStr));
@@ -298,7 +356,7 @@ public void testGetMetadataIfPartitionedTopicExists(boolean configAllowAutoTopic
298356
for (PulsarClientImpl client : clientArray) {
299357
// Verify: the result of get partitioned topic metadata.
300358
PartitionedTopicMetadata response =
301-
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled).join();
359+
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false).join();
302360
assertEquals(response.partitions, 3);
303361
verifyNonPartitionedTopicNeverCreated(topicNameStr);
304362

@@ -332,7 +390,7 @@ public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomai
332390
// Case-1: normal topic.
333391
final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
334392
// Verify: the result of get partitioned topic metadata.
335-
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true).join();
393+
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true, false).join();
336394
assertEquals(response.partitions, 3);
337395
// Verify: the behavior of topic creation.
338396
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList("public/default");
@@ -347,7 +405,7 @@ public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomai
347405
topicDomain.value() + "://" + DEFAULT_NS + "/tp") + "-partition-1";
348406
// Verify: the result of get partitioned topic metadata.
349407
PartitionedTopicMetadata response2 =
350-
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true).join();
408+
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true, false).join();
351409
assertEquals(response2.partitions, 0);
352410
// Verify: the behavior of topic creation.
353411
List<String> partitionedTopics2 =
@@ -380,7 +438,7 @@ public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDo
380438
// Case 1: normal topic.
381439
final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
382440
// Verify: the result of get partitioned topic metadata.
383-
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true).join();
441+
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true, false).join();
384442
assertEquals(response.partitions, 0);
385443
// Verify: the behavior of topic creation.
386444
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList("public/default");
@@ -392,7 +450,7 @@ public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDo
392450
topicDomain.value() + "://" + DEFAULT_NS + "/tp") + "-partition-1";
393451
// Verify: the result of get partitioned topic metadata.
394452
PartitionedTopicMetadata response2 =
395-
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true).join();
453+
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true, false).join();
396454
assertEquals(response2.partitions, 0);
397455
// Verify: the behavior of topic creation.
398456
List<String> partitionedTopics2 =
@@ -443,7 +501,7 @@ public void testGetMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreati
443501
final TopicName topicName = TopicName.get(topicNameStr);
444502
// Verify: the result of get partitioned topic metadata.
445503
try {
446-
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled)
504+
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false)
447505
.join();
448506
fail("Expect a not found exception");
449507
} catch (Exception e) {
@@ -496,7 +554,7 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config
496554
// Verify: the result of get partitioned topic metadata.
497555
try {
498556
PartitionedTopicMetadata topicMetadata = client
499-
.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled)
557+
.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false)
500558
.join();
501559
log.info("Get topic metadata: {}", topicMetadata.partitions);
502560
fail("Expected a not found ex");

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
136136
((PulsarClientImpl) pulsarClient).setLookup(mockLookup);
137137
when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean())).thenAnswer(
138138
i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)));
139+
when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean())).thenAnswer(
140+
i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)));
139141
when(mockLookup.getBroker(any())).thenAnswer(ignored -> {
140142
InetSocketAddress brokerAddress =
141143
new InetSocketAddress(pulsar.getAdvertisedAddress(), pulsar.getBrokerListenPort().get());

pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@ public void testTransactionBufferLowWaterMark() throws Exception {
148148

149149
PartitionedTopicMetadata partitionedTopicMetadata =
150150
((PulsarClientImpl) pulsarClient).getLookup()
151-
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false).get();
151+
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false)
152+
.get();
152153
Transaction lowWaterMarkTxn = null;
153154
for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
154155
lowWaterMarkTxn = pulsarClient.newTransaction()
@@ -253,7 +254,8 @@ public void testPendingAckLowWaterMark() throws Exception {
253254

254255
PartitionedTopicMetadata partitionedTopicMetadata =
255256
((PulsarClientImpl) pulsarClient).getLookup()
256-
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false).get();
257+
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false)
258+
.get();
257259
Transaction lowWaterMarkTxn = null;
258260
for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
259261
lowWaterMarkTxn = pulsarClient.newTransaction()

0 commit comments

Comments
 (0)