Skip to content

Commit 1cd1aef

Browse files
authored
[improve][broker] PIP-192 Added ServiceUnitStateCompactionStrategy (#19045)
1 parent 72b2e7e commit 1cd1aef

File tree

11 files changed

+1168
-54
lines changed

11 files changed

+1168
-54
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@
146146
import org.apache.pulsar.common.util.ThreadDumpUtil;
147147
import org.apache.pulsar.common.util.netty.EventLoopUtil;
148148
import org.apache.pulsar.compaction.Compactor;
149+
import org.apache.pulsar.compaction.StrategicTwoPhaseCompactor;
149150
import org.apache.pulsar.compaction.TwoPhaseCompactor;
150151
import org.apache.pulsar.functions.worker.ErrorNotifier;
151152
import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -198,6 +199,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
198199
private TopicPoliciesService topicPoliciesService = TopicPoliciesService.DISABLED;
199200
private BookKeeperClientFactory bkClientFactory;
200201
private Compactor compactor;
202+
private StrategicTwoPhaseCompactor strategicCompactor;
201203
private ResourceUsageTransportManager resourceUsageTransportManager;
202204
private ResourceGroupService resourceGroupServiceManager;
203205

@@ -1473,6 +1475,19 @@ public Compactor getNullableCompactor() {
14731475
return this.compactor;
14741476
}
14751477

1478+
public StrategicTwoPhaseCompactor newStrategicCompactor() throws PulsarServerException {
1479+
return new StrategicTwoPhaseCompactor(this.getConfiguration(),
1480+
getClient(), getBookKeeperClient(),
1481+
getCompactorExecutor());
1482+
}
1483+
1484+
public synchronized StrategicTwoPhaseCompactor getStrategicCompactor() throws PulsarServerException {
1485+
if (this.strategicCompactor == null) {
1486+
this.strategicCompactor = newStrategicCompactor();
1487+
}
1488+
return this.strategicCompactor;
1489+
}
1490+
14761491
protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
14771492
if (this.offloaderScheduler == null) {
14781493
this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,9 @@ public enum ServiceUnitState {
6666
Splitting; // the service unit(e.g. bundle) is in the process of splitting.
6767

6868
private static Map<ServiceUnitState, Set<ServiceUnitState>> validTransitions = Map.of(
69-
Free, Set.of(Owned, Assigned),
69+
// (Free -> Released | Splitting) transitions are required
70+
// when the topic is compacted in the middle of transfer or split.
71+
Free, Set.of(Owned, Assigned, Released, Splitting),
7072
Owned, Set.of(Assigned, Splitting, Free),
7173
Assigned, Set.of(Owned, Released, Free),
7274
Released, Set.of(Owned, Free),

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java

Lines changed: 42 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
import java.util.concurrent.ScheduledFuture;
4444
import java.util.concurrent.TimeUnit;
4545
import java.util.concurrent.TimeoutException;
46-
import java.util.concurrent.atomic.AtomicInteger;
46+
import java.util.concurrent.atomic.AtomicLong;
4747
import lombok.extern.slf4j.Slf4j;
4848
import org.apache.commons.lang3.StringUtils;
4949
import org.apache.commons.lang3.mutable.MutableInt;
@@ -101,7 +101,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
101101
private long totalCleanupCnt = 0;
102102
private long totalBrokerCleanupTombstoneCnt = 0;
103103
private long totalServiceUnitCleanupTombstoneCnt = 0;
104-
private long totalServiceUnitCleanupErrorCnt = 0;
104+
private AtomicLong totalCleanupErrorCnt = new AtomicLong();
105105
private long totalCleanupScheduledCnt = 0;
106106
private long totalCleanupIgnoredCnt = 0;
107107
private long totalCleanupCancelledCnt = 0;
@@ -175,10 +175,11 @@ public synchronized void start() throws PulsarServerException {
175175
}
176176
tableview = pulsar.getClient().newTableViewBuilder(schema)
177177
.topic(TOPIC)
178-
// TODO: enable CompactionStrategy
178+
.loadConf(Map.of(
179+
"topicCompactionStrategyClassName",
180+
ServiceUnitStateCompactionStrategy.class.getName()))
179181
.create();
180-
// TODO: schedule listen instead of foreachAndListen
181-
tableview.forEachAndListen((key, value) -> handle(key, value));
182+
tableview.listen((key, value) -> handle(key, value));
182183
log.debug("Successfully started the channel tableview.");
183184

184185
pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
@@ -332,8 +333,6 @@ private void handle(String serviceUnit, ServiceUnitStateData data) {
332333
}
333334

334335
ServiceUnitState state = data == null ? Free : data.state();
335-
336-
// TODO : Add state validation in tableview by the compaction strategy
337336
switch (state) {
338337
case Owned -> handleOwnEvent(serviceUnit, data);
339338
case Assigned -> handleAssignEvent(serviceUnit, data);
@@ -599,7 +598,16 @@ private void scheduleCleanup(String broker, long delayInSecs) {
599598
.delayedExecutor(delayInSecs, TimeUnit.SECONDS, pulsar.getLoadManagerExecutor());
600599
totalCleanupScheduledCnt++;
601600
return CompletableFuture
602-
.runAsync(() -> doCleanup(broker), delayed);
601+
.runAsync(() -> {
602+
try {
603+
doCleanup(broker);
604+
} catch (Throwable e) {
605+
log.error("Failed to run the cleanup job for the broker {}, "
606+
+ "totalCleanupErrorCnt:{}.",
607+
broker, totalCleanupErrorCnt.incrementAndGet(), e);
608+
}
609+
}
610+
, delayed);
603611
});
604612

605613
log.info("Scheduled ownership cleanup for broker:{} with delay:{} secs. Pending clean jobs:{}.",
@@ -610,23 +618,23 @@ private void scheduleCleanup(String broker, long delayInSecs) {
610618
private void doCleanup(String broker) {
611619
long startTime = System.nanoTime();
612620
log.info("Started ownership cleanup for the inactive broker:{}", broker);
613-
AtomicInteger serviceUnitTombstoneCnt = new AtomicInteger();
614-
AtomicInteger serviceUnitTombstoneErrorCnt = new AtomicInteger();
621+
int serviceUnitTombstoneCnt = 0;
622+
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
615623
for (Map.Entry<String, ServiceUnitStateData> etr : tableview.entrySet()) {
616624
ServiceUnitStateData stateData = etr.getValue();
617625
String serviceUnit = etr.getKey();
618626
if (StringUtils.equals(broker, stateData.broker())
619627
|| StringUtils.equals(broker, stateData.sourceBroker())) {
620628
log.info("Cleaning ownership serviceUnit:{}, stateData:{}.", serviceUnit, stateData);
621629
tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
622-
if (e == null) {
623-
serviceUnitTombstoneCnt.incrementAndGet();
624-
} else {
625-
log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}.",
626-
serviceUnit, stateData);
627-
serviceUnitTombstoneErrorCnt.incrementAndGet();
630+
if (e != null) {
631+
log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}, "
632+
+ "cleanupErrorCnt:{}.",
633+
serviceUnit, stateData,
634+
totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart);
628635
}
629636
});
637+
serviceUnitTombstoneCnt++;
630638
}
631639
}
632640

@@ -636,26 +644,22 @@ private void doCleanup(String broker) {
636644
log.error("Failed to flush the in-flight messages.", e);
637645
}
638646

639-
if (serviceUnitTombstoneCnt.get() > 0) {
647+
if (serviceUnitTombstoneCnt > 0) {
640648
this.totalCleanupCnt++;
641-
this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt.get();
649+
this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt;
642650
this.totalBrokerCleanupTombstoneCnt++;
643651
}
644652

645-
if (serviceUnitTombstoneErrorCnt.get() > 0) {
646-
this.totalServiceUnitCleanupErrorCnt += serviceUnitTombstoneErrorCnt.get();
647-
}
648-
649653
double cleanupTime = TimeUnit.NANOSECONDS
650654
.toMillis((System.nanoTime() - startTime));
651655
// TODO: clean load data stores
652656
log.info("Completed a cleanup for the inactive broker:{} in {} ms. "
653657
+ "Published tombstone for orphan service units: serviceUnitTombstoneCnt:{}, "
654-
+ "serviceUnitTombstoneErrorCnt:{}, metrics:{} ",
658+
+ "approximate cleanupErrorCnt:{}, metrics:{} ",
655659
broker,
656660
cleanupTime,
657661
serviceUnitTombstoneCnt,
658-
serviceUnitTombstoneErrorCnt,
662+
totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
659663
printCleanupMetrics());
660664
cleanupJobs.remove(broker);
661665
}
@@ -675,8 +679,8 @@ private void monitorOwnerships(List<String> brokers) {
675679
long startTime = System.nanoTime();
676680
Set<String> inactiveBrokers = new HashSet<>();
677681
Set<String> activeBrokers = new HashSet<>(brokers);
678-
AtomicInteger serviceUnitTombstoneCnt = new AtomicInteger();
679-
AtomicInteger serviceUnitTombstoneErrorCnt = new AtomicInteger();
682+
int serviceUnitTombstoneCnt = 0;
683+
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
680684
long now = System.currentTimeMillis();
681685
for (Map.Entry<String, ServiceUnitStateData> etr : tableview.entrySet()) {
682686
String serviceUnit = etr.getKey();
@@ -690,14 +694,14 @@ private void monitorOwnerships(List<String> brokers) {
690694
serviceUnit, stateData);
691695

692696
tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
693-
if (e == null) {
694-
serviceUnitTombstoneCnt.incrementAndGet();
695-
} else {
696-
log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}.",
697-
serviceUnit, stateData);
698-
serviceUnitTombstoneErrorCnt.incrementAndGet();
697+
if (e != null) {
698+
log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}, "
699+
+ "cleanupErrorCnt:{}.",
700+
serviceUnit, stateData,
701+
totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart);
699702
}
700703
});
704+
serviceUnitTombstoneCnt++;
701705
}
702706
}
703707

@@ -711,36 +715,35 @@ private void monitorOwnerships(List<String> brokers) {
711715
log.error("Failed to flush the in-flight messages.", e);
712716
}
713717

714-
if (serviceUnitTombstoneCnt.get() > 0) {
715-
this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt.get();
718+
if (serviceUnitTombstoneCnt > 0) {
719+
this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt;
716720
}
717-
this.totalServiceUnitCleanupErrorCnt += serviceUnitTombstoneErrorCnt.get();
718721

719722
double monitorTime = TimeUnit.NANOSECONDS
720723
.toMillis((System.nanoTime() - startTime));
721724
log.info("Completed the ownership monitor run in {} ms. "
722725
+ "Scheduled cleanups for inactiveBrokers:{}. inactiveBrokerCount:{}. "
723726
+ "Published tombstone for orphan service units: serviceUnitTombstoneCnt:{}, "
724-
+ "serviceUnitTombstoneErrorCnt:{}, metrics:{} ",
727+
+ "approximate cleanupErrorCnt:{}, metrics:{} ",
725728
monitorTime,
726729
inactiveBrokers,
727730
inactiveBrokers.size(),
728731
serviceUnitTombstoneCnt,
729-
serviceUnitTombstoneErrorCnt,
732+
totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
730733
printCleanupMetrics());
731734

732735
}
733736

734737
private String printCleanupMetrics() {
735738
return String.format(
736739
"{totalCleanupCnt:%d, totalBrokerCleanupTombstoneCnt:%d, "
737-
+ "totalServiceUnitCleanupTombstoneCnt:%d, totalServiceUnitCleanupErrorCnt:%d, "
740+
+ "totalServiceUnitCleanupTombstoneCnt:%d, totalCleanupErrorCnt:%d, "
738741
+ "totalCleanupScheduledCnt%d, totalCleanupIgnoredCnt:%d, totalCleanupCancelledCnt:%d, "
739742
+ " activeCleanupJobs:%d}",
740743
totalCleanupCnt,
741744
totalBrokerCleanupTombstoneCnt,
742745
totalServiceUnitCleanupTombstoneCnt,
743-
totalServiceUnitCleanupErrorCnt,
746+
totalCleanupErrorCnt.get(),
744747
totalCleanupScheduledCnt,
745748
totalCleanupIgnoredCnt,
746749
totalCleanupCancelledCnt,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions.channel;
20+
21+
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Assigned;
22+
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Free;
23+
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Owned;
24+
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Released;
25+
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.Splitting;
26+
import com.google.common.annotations.VisibleForTesting;
27+
import org.apache.commons.lang.StringUtils;
28+
import org.apache.pulsar.client.api.Schema;
29+
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
30+
31+
public class ServiceUnitStateCompactionStrategy implements TopicCompactionStrategy<ServiceUnitStateData> {
32+
33+
private final Schema<ServiceUnitStateData> schema;
34+
35+
private boolean checkBrokers = true;
36+
37+
public ServiceUnitStateCompactionStrategy() {
38+
schema = Schema.JSON(ServiceUnitStateData.class);
39+
}
40+
41+
@Override
42+
public Schema<ServiceUnitStateData> getSchema() {
43+
return schema;
44+
}
45+
46+
@VisibleForTesting
47+
public void checkBrokers(boolean check) {
48+
this.checkBrokers = check;
49+
}
50+
51+
@Override
52+
public boolean shouldKeepLeft(ServiceUnitStateData from, ServiceUnitStateData to) {
53+
ServiceUnitState prevState = from == null ? Free : from.state();
54+
ServiceUnitState state = to == null ? Free : to.state();
55+
if (!ServiceUnitState.isValidTransition(prevState, state)) {
56+
return true;
57+
}
58+
59+
if (checkBrokers) {
60+
if (prevState == Free && (state == Assigned || state == Owned)) {
61+
// Free -> Assigned || Owned broker check
62+
return StringUtils.isBlank(to.broker());
63+
} else if (prevState == Owned && state == Assigned) {
64+
// Owned -> Assigned(transfer) broker check
65+
return !StringUtils.equals(from.broker(), to.sourceBroker())
66+
|| StringUtils.isBlank(to.broker())
67+
|| StringUtils.equals(from.broker(), to.broker());
68+
} else if (prevState == Assigned && state == Released) {
69+
// Assigned -> Released(transfer) broker check
70+
return !StringUtils.equals(from.broker(), to.broker())
71+
|| !StringUtils.equals(from.sourceBroker(), to.sourceBroker());
72+
} else if (prevState == Released && state == Owned) {
73+
// Released -> Owned(transfer) broker check
74+
return !StringUtils.equals(from.broker(), to.broker())
75+
|| !StringUtils.equals(from.sourceBroker(), to.sourceBroker());
76+
} else if (prevState == Assigned && state == Owned) {
77+
// Assigned -> Owned broker check
78+
return !StringUtils.equals(from.broker(), to.broker())
79+
|| !StringUtils.equals(from.sourceBroker(), to.sourceBroker());
80+
} else if (prevState == Owned && state == Splitting) {
81+
// Owned -> Splitting broker check
82+
return !StringUtils.equals(from.broker(), to.broker());
83+
}
84+
}
85+
86+
return false;
87+
}
88+
89+
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@
7878
import org.apache.commons.collections4.CollectionUtils;
7979
import org.apache.commons.lang3.StringUtils;
8080
import org.apache.pulsar.broker.PulsarServerException;
81+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
82+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy;
8183
import org.apache.pulsar.broker.namespace.NamespaceService;
8284
import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
8385
import org.apache.pulsar.broker.service.AbstractReplicator;
@@ -152,6 +154,7 @@
152154
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
153155
import org.apache.pulsar.common.protocol.Commands;
154156
import org.apache.pulsar.common.protocol.schema.SchemaData;
157+
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
155158
import org.apache.pulsar.common.util.Codec;
156159
import org.apache.pulsar.common.util.DateFormatter;
157160
import org.apache.pulsar.common.util.FutureUtil;
@@ -203,6 +206,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
203206
private CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
204207
private final CompactedTopic compactedTopic;
205208

209+
// TODO: Create compaction strategy from topic policy when exposing strategic compaction to users.
210+
private static Map<String, TopicCompactionStrategy> strategicCompactionMap = Map.of(
211+
ServiceUnitStateChannelImpl.TOPIC,
212+
new ServiceUnitStateCompactionStrategy());
213+
206214
private CompletableFuture<MessageIdImpl> currentOffload = CompletableFuture.completedFuture(
207215
(MessageIdImpl) MessageId.earliest);
208216

@@ -1571,6 +1579,11 @@ public void checkCompaction() {
15711579
}
15721580

15731581
if (backlogEstimate > compactionThreshold) {
1582+
if (log.isDebugEnabled()) {
1583+
log.debug(
1584+
"topic:{} backlogEstimate:{} is bigger than compactionThreshold:{}. Triggering "
1585+
+ "compaction", topic, backlogEstimate, compactionThreshold);
1586+
}
15741587
try {
15751588
triggerCompaction();
15761589
} catch (AlreadyRunningException are) {
@@ -3000,7 +3013,13 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
30003013
public synchronized void triggerCompaction()
30013014
throws PulsarServerException, AlreadyRunningException {
30023015
if (currentCompaction.isDone()) {
3003-
currentCompaction = brokerService.pulsar().getCompactor().compact(topic);
3016+
3017+
if (strategicCompactionMap.containsKey(topic)) {
3018+
currentCompaction = brokerService.pulsar().getStrategicCompactor()
3019+
.compact(topic, strategicCompactionMap.get(topic));
3020+
} else {
3021+
currentCompaction = brokerService.pulsar().getCompactor().compact(topic);
3022+
}
30043023
} else {
30053024
throw new AlreadyRunningException("Compaction already in progress");
30063025
}

0 commit comments

Comments
 (0)