Skip to content

Commit bbdc173

Browse files
authored
[refactor][broker] PIP-301 Part-1: Add BundleDataResources (#21119)
### Motivation See pip: #21129 ### Modifications Add `BundleDataResources`
1 parent 9ab7417 commit bbdc173

File tree

14 files changed

+173
-93
lines changed

14 files changed

+173
-93
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.resources;
20+
21+
import java.util.Optional;
22+
import java.util.concurrent.CompletableFuture;
23+
import lombok.Getter;
24+
import org.apache.pulsar.common.naming.NamespaceName;
25+
import org.apache.pulsar.metadata.api.MetadataStore;
26+
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
27+
28+
@Getter
29+
public class LoadBalanceResources {
30+
public static final String BUNDLE_DATA_BASE_PATH = "/loadbalance/bundle-data";
31+
32+
private final BundleDataResources bundleDataResources;
33+
34+
public LoadBalanceResources(MetadataStore store, int operationTimeoutSec) {
35+
bundleDataResources = new BundleDataResources(store, operationTimeoutSec);
36+
}
37+
38+
public static class BundleDataResources extends BaseResources<BundleData> {
39+
public BundleDataResources(MetadataStore store, int operationTimeoutSec) {
40+
super(store, BundleData.class, operationTimeoutSec);
41+
}
42+
43+
public CompletableFuture<Optional<BundleData>> getBundleData(String bundle) {
44+
return getAsync(getBundleDataPath(bundle));
45+
}
46+
47+
public CompletableFuture<Void> updateBundleData(String bundle, BundleData data) {
48+
return setWithCreateAsync(getBundleDataPath(bundle), __ -> data);
49+
}
50+
51+
public CompletableFuture<Void> deleteBundleData(String bundle) {
52+
return deleteAsync(getBundleDataPath(bundle));
53+
}
54+
55+
// clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` in metadata-store
56+
public CompletableFuture<Void> deleteBundleDataAsync(NamespaceName ns) {
57+
final String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, ns.toString());
58+
return getStore().deleteRecursive(namespaceBundlePath);
59+
}
60+
61+
// clear resource of `/loadbalance/bundle-data/{tenant}/` in metadata-store
62+
public CompletableFuture<Void> deleteBundleDataTenantAsync(String tenant) {
63+
final String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
64+
return getStore().deleteRecursive(tenantBundlePath);
65+
}
66+
67+
// Get the metadata store path for the given bundle full name.
68+
private String getBundleDataPath(final String bundle) {
69+
return BUNDLE_DATA_BASE_PATH + "/" + bundle;
70+
}
71+
}
72+
}

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,15 @@ public class NamespaceResources extends BaseResources<Policies> {
4949
private final IsolationPolicyResources isolationPolicies;
5050
private final PartitionedTopicResources partitionedTopicResources;
5151
private final MetadataStore configurationStore;
52-
private final MetadataStore localStore;
5352

5453
public static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
5554
private static final String NAMESPACE_BASE_PATH = "/namespace";
56-
private static final String BUNDLE_DATA_BASE_PATH = "/loadbalance/bundle-data";
5755

58-
public NamespaceResources(MetadataStore localStore, MetadataStore configurationStore, int operationTimeoutSec) {
56+
public NamespaceResources(MetadataStore configurationStore, int operationTimeoutSec) {
5957
super(configurationStore, Policies.class, operationTimeoutSec);
6058
this.configurationStore = configurationStore;
6159
isolationPolicies = new IsolationPolicyResources(configurationStore, operationTimeoutSec);
6260
partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec);
63-
this.localStore = localStore;
6461
}
6562

6663
public CompletableFuture<List<String>> listNamespacesAsync(String tenant) {
@@ -379,17 +376,4 @@ public CompletableFuture<Void> runWithMarkDeleteAsync(TopicName topic,
379376
return future;
380377
}
381378
}
382-
383-
// clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` in metadata-store
384-
public CompletableFuture<Void> deleteBundleDataAsync(NamespaceName ns) {
385-
final String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, ns.toString());
386-
return this.localStore.deleteRecursive(namespaceBundlePath);
387-
}
388-
389-
// clear resource of `/loadbalance/bundle-data/{tenant}/` in metadata-store
390-
public CompletableFuture<Void> deleteBundleDataTenantAsync(String tenant) {
391-
final String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
392-
return this.localStore.deleteRecursive(tenantBundlePath);
393-
}
394-
395379
}

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ public class PulsarResources {
4848
@Getter
4949
private final TopicResources topicResources;
5050
@Getter
51+
private final LoadBalanceResources loadBalanceResources;
52+
@Getter
5153
private final Optional<MetadataStore> localMetadataStore;
5254
@Getter
5355
private final Optional<MetadataStore> configurationMetadataStore;
@@ -60,8 +62,7 @@ public PulsarResources(MetadataStore localMetadataStore, MetadataStore configura
6062
if (configurationMetadataStore != null) {
6163
tenantResources = new TenantResources(configurationMetadataStore, operationTimeoutSec);
6264
clusterResources = new ClusterResources(configurationMetadataStore, operationTimeoutSec);
63-
namespaceResources = new NamespaceResources(localMetadataStore, configurationMetadataStore
64-
, operationTimeoutSec);
65+
namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec);
6566
resourcegroupResources = new ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
6667
} else {
6768
tenantResources = null;
@@ -76,12 +77,14 @@ public PulsarResources(MetadataStore localMetadataStore, MetadataStore configura
7677
loadReportResources = new LoadManagerReportResources(localMetadataStore, operationTimeoutSec);
7778
bookieResources = new BookieResources(localMetadataStore, operationTimeoutSec);
7879
topicResources = new TopicResources(localMetadataStore);
80+
loadBalanceResources = new LoadBalanceResources(localMetadataStore, operationTimeoutSec);
7981
} else {
8082
dynamicConfigResources = null;
8183
localPolicies = null;
8284
loadReportResources = null;
8385
bookieResources = null;
8486
topicResources = null;
87+
loadBalanceResources = null;
8588
}
8689

8790
this.localMetadataStore = Optional.ofNullable(localMetadataStore);
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.resources;
20+
21+
import static org.apache.pulsar.broker.resources.BaseResources.joinPath;
22+
import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
23+
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.verify;
25+
import static org.testng.Assert.assertThrows;
26+
import org.apache.pulsar.common.naming.NamespaceName;
27+
import org.apache.pulsar.metadata.api.MetadataStore;
28+
import org.testng.annotations.BeforeMethod;
29+
import org.testng.annotations.Test;
30+
31+
public class LoadBalanceResourcesTest {
32+
private MetadataStore configurationStore;
33+
private MetadataStore localStore;
34+
private LoadBalanceResources loadBalanceResources;
35+
36+
@BeforeMethod
37+
public void setup() {
38+
localStore = mock(MetadataStore.class);
39+
configurationStore = mock(MetadataStore.class);
40+
loadBalanceResources = new LoadBalanceResources(localStore, 30);
41+
}
42+
43+
/**
44+
* Test that the bundle-data node is deleted from the local stores.
45+
*/
46+
@Test
47+
public void testDeleteBundleDataAsync() {
48+
NamespaceName ns = NamespaceName.get("my-tenant/my-ns");
49+
String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, ns.toString());
50+
loadBalanceResources.getBundleDataResources().deleteBundleDataAsync(ns);
51+
52+
String tenant="my-tenant";
53+
String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
54+
loadBalanceResources.getBundleDataResources().deleteBundleDataTenantAsync(tenant);
55+
56+
verify(localStore).deleteRecursive(namespaceBundlePath);
57+
verify(localStore).deleteRecursive(tenantBundlePath);
58+
59+
assertThrows(()-> verify(configurationStore).deleteRecursive(namespaceBundlePath));
60+
assertThrows(()-> verify(configurationStore).deleteRecursive(tenantBundlePath));
61+
}
62+
}

pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,12 @@
1818
*/
1919
package org.apache.pulsar.broker.resources;
2020

21-
import static org.apache.pulsar.broker.resources.BaseResources.joinPath;
22-
import static org.mockito.Mockito.mock;
23-
import static org.mockito.Mockito.verify;
2421
import static org.testng.Assert.assertFalse;
25-
import static org.testng.Assert.assertThrows;
2622
import static org.testng.Assert.assertTrue;
27-
28-
import org.apache.pulsar.common.naming.NamespaceName;
29-
import org.apache.pulsar.metadata.api.MetadataStore;
30-
import org.testng.annotations.BeforeMethod;
3123
import org.testng.annotations.Test;
3224

33-
3425
public class NamespaceResourcesTest {
3526

36-
private MetadataStore localStore;
37-
private MetadataStore configurationStore;
38-
private NamespaceResources namespaceResources;
39-
40-
private static final String BUNDLE_DATA_BASE_PATH = "/loadbalance/bundle-data";
41-
42-
@BeforeMethod
43-
public void setup() {
44-
localStore = mock(MetadataStore.class);
45-
configurationStore = mock(MetadataStore.class);
46-
namespaceResources = new NamespaceResources(localStore, configurationStore, 30);
47-
}
48-
4927
@Test
5028
public void test_pathIsFromNamespace() {
5129
assertFalse(NamespaceResources.pathIsFromNamespace("/admin/clusters"));
@@ -54,25 +32,5 @@ public void test_pathIsFromNamespace() {
5432
assertTrue(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant/my-ns"));
5533
}
5634

57-
/**
58-
* Test that the bundle-data node is deleted from the local stores.
59-
*/
60-
@Test
61-
public void testDeleteBundleDataAsync() {
62-
NamespaceName ns = NamespaceName.get("my-tenant/my-ns");
63-
String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, ns.toString());
64-
namespaceResources.deleteBundleDataAsync(ns);
65-
66-
String tenant="my-tenant";
67-
String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
68-
namespaceResources.deleteBundleDataTenantAsync(tenant);
69-
70-
verify(localStore).deleteRecursive(namespaceBundlePath);
71-
verify(localStore).deleteRecursive(tenantBundlePath);
72-
73-
assertThrows(()-> verify(configurationStore).deleteRecursive(namespaceBundlePath));
74-
assertThrows(()-> verify(configurationStore).deleteRecursive(tenantBundlePath));
75-
}
76-
7735

7836
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,8 @@ protected CompletableFuture<Void> internalClearZkSources() {
468468
// clear z-node of local policies
469469
.thenCompose(ignore -> getLocalPolicies().deleteLocalPoliciesAsync(namespaceName))
470470
// clear /loadbalance/bundle-data
471-
.thenCompose(ignore -> namespaceResources().deleteBundleDataAsync(namespaceName));
471+
.thenCompose(ignore ->
472+
loadBalanceResources().getBundleDataResources().deleteBundleDataAsync(namespaceName));
472473

473474
}
474475

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ protected CompletableFuture<Void> internalDeleteTenantAsync(String tenant) {
236236
.getPartitionedTopicResources().clearPartitionedTopicTenantAsync(tenant))
237237
.thenCompose(__ -> pulsar().getPulsarResources().getLocalPolicies()
238238
.deleteLocalPoliciesTenantAsync(tenant))
239-
.thenCompose(__ -> pulsar().getPulsarResources().getNamespaceResources()
239+
.thenCompose(__ -> pulsar().getPulsarResources().getLoadBalanceResources().getBundleDataResources()
240240
.deleteBundleDataTenantAsync(tenant));
241241
}
242242

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
5959
import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
6060
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
61+
import org.apache.pulsar.broker.resources.PulsarResources;
6162
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
6263
import org.apache.pulsar.client.admin.PulsarAdminException;
6364
import org.apache.pulsar.client.util.ExecutorProvider;
@@ -91,9 +92,6 @@
9192
public class ModularLoadManagerImpl implements ModularLoadManager {
9293
private static final Logger log = LoggerFactory.getLogger(ModularLoadManagerImpl.class);
9394

94-
// Path to ZNode whose children contain BundleData jsons for each bundle (new API version of ResourceQuota).
95-
public static final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
96-
9795
// Default message rate to assume for unseen bundles.
9896
public static final double DEFAULT_MESSAGE_RATE = 50;
9997

@@ -120,7 +118,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
120118
private LockManager<LocalBrokerData> brokersData;
121119
private ResourceLock<LocalBrokerData> brokerDataLock;
122120

123-
private MetadataCache<BundleData> bundlesCache;
124121
private MetadataCache<ResourceQuota> resourceQuotaCache;
125122
private MetadataCache<TimeAverageBrokerData> timeAverageBrokerDataCache;
126123

@@ -172,6 +169,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
172169
// Pulsar service used to initialize this.
173170
private PulsarService pulsar;
174171

172+
private PulsarResources pulsarResources;
173+
175174
// Executor service used to update broker data.
176175
private final ExecutorService executors;
177176

@@ -243,8 +242,8 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) {
243242
@Override
244243
public void initialize(final PulsarService pulsar) {
245244
this.pulsar = pulsar;
245+
this.pulsarResources = pulsar.getPulsarResources();
246246
brokersData = pulsar.getCoordinationService().getLockManager(LocalBrokerData.class);
247-
bundlesCache = pulsar.getLocalMetadataStore().getMetadataCache(BundleData.class);
248247
resourceQuotaCache = pulsar.getLocalMetadataStore().getMetadataCache(ResourceQuota.class);
249248
timeAverageBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(TimeAverageBrokerData.class);
250249
pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification);
@@ -273,7 +272,7 @@ public void initialize(final PulsarService pulsar) {
273272

274273
LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap);
275274
// register listeners for domain changes
276-
pulsar.getPulsarResources().getClusterResources().getFailureDomainResources()
275+
pulsarResources.getClusterResources().getFailureDomainResources()
277276
.registerListener(__ -> {
278277
executors.execute(
279278
() -> LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap));
@@ -381,7 +380,8 @@ public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
381380
public BundleData getBundleDataOrDefault(final String bundle) {
382381
BundleData bundleData = null;
383382
try {
384-
Optional<BundleData> optBundleData = bundlesCache.get(getBundleDataPath(bundle)).join();
383+
Optional<BundleData> optBundleData =
384+
pulsarResources.getLoadBalanceResources().getBundleDataResources().getBundleData(bundle).join();
385385
if (optBundleData.isPresent()) {
386386
return optBundleData.get();
387387
}
@@ -418,11 +418,6 @@ public BundleData getBundleDataOrDefault(final String bundle) {
418418
return bundleData;
419419
}
420420

421-
// Get the metadata store path for the given bundle full name.
422-
public static String getBundleDataPath(final String bundle) {
423-
return BUNDLE_DATA_PATH + "/" + bundle;
424-
}
425-
426421
// Use the Pulsar client to acquire the namespace bundle stats.
427422
private Map<String, NamespaceBundleStats> getBundleStats() {
428423
return pulsar.getBrokerService().getBundleStats();
@@ -1151,8 +1146,8 @@ public void writeBundleDataOnZooKeeper() {
11511146
for (Map.Entry<String, BundleData> entry : loadData.getBundleData().entrySet()) {
11521147
final String bundle = entry.getKey();
11531148
final BundleData data = entry.getValue();
1154-
futures.add(bundlesCache.readModifyUpdateOrCreate(getBundleDataPath(bundle), __ -> data)
1155-
.thenApply(__ -> null));
1149+
futures.add(
1150+
pulsarResources.getLoadBalanceResources().getBundleDataResources().updateBundleData(bundle, data));
11561151
}
11571152

11581153
// Write the time average broker data to metadata store.
@@ -1173,7 +1168,7 @@ public void writeBundleDataOnZooKeeper() {
11731168

11741169
private void deleteBundleDataFromMetadataStore(String bundle) {
11751170
try {
1176-
bundlesCache.delete(getBundleDataPath(bundle)).join();
1171+
pulsarResources.getLoadBalanceResources().getBundleDataResources().deleteBundleData(bundle).join();
11771172
} catch (Exception e) {
11781173
if (!(e.getCause() instanceof NotFoundException)) {
11791174
log.warn("Failed to delete bundle-data {} from metadata store", bundle, e);

0 commit comments

Comments
 (0)