Skip to content

Commit 34898e3

Browse files
TakaHiR07fanjianye
andauthored
[fix][broker] fix can not cleanup heartbeat data if scaling down broker (#22750)
Co-authored-by: fanjianye <fanjianye@bigo.sg>
1 parent 2b1630e commit 34898e3

2 files changed

Lines changed: 50 additions & 4 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static com.google.common.base.Preconditions.checkNotNull;
2222
import static org.apache.commons.lang3.StringUtils.isBlank;
2323
import static org.apache.commons.lang3.StringUtils.isNotBlank;
24+
import static org.apache.pulsar.broker.admin.impl.BrokersBase.getHeartbeatTopicName;
2425
import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
2526
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
2627
import com.google.common.annotations.VisibleForTesting;
@@ -72,6 +73,7 @@
7273
import org.apache.bookkeeper.mledger.LedgerOffloader;
7374
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
7475
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
76+
import org.apache.bookkeeper.mledger.ManagedLedgerException;
7577
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
7678
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
7779
import org.apache.bookkeeper.mledger.offload.Offloaders;
@@ -414,6 +416,41 @@ private void closeLeaderElectionService() throws Exception {
414416
}
415417
}
416418

419+
private boolean isManagedLedgerNotFoundException(Throwable e) {
420+
Throwable realCause = e.getCause();
421+
return realCause instanceof ManagedLedgerException.MetadataNotFoundException
422+
|| realCause instanceof MetadataStoreException.NotFoundException;
423+
}
424+
425+
private void deleteHeartbeatResource() {
426+
if (this.brokerService != null) {
427+
LOG.info("forcefully delete heartbeat topic when close broker");
428+
429+
String heartbeatTopicNameV1 = getHeartbeatTopicName(getBrokerId(), getConfiguration(), false);
430+
String heartbeatTopicNameV2 = getHeartbeatTopicName(getBrokerId(), getConfiguration(), true);
431+
432+
try {
433+
this.brokerService.deleteTopic(heartbeatTopicNameV1, true).get();
434+
} catch (Exception e) {
435+
if (!isManagedLedgerNotFoundException(e)) {
436+
LOG.error("Closed with errors in delete heartbeat topic [{}]",
437+
heartbeatTopicNameV1, e);
438+
}
439+
}
440+
441+
try {
442+
this.brokerService.deleteTopic(heartbeatTopicNameV2, true).get();
443+
} catch (Exception e) {
444+
if (!isManagedLedgerNotFoundException(e)) {
445+
LOG.error("Closed with errors in delete heartbeat topic [{}]",
446+
heartbeatTopicNameV2, e);
447+
}
448+
}
449+
450+
LOG.info("finish forcefully delete heartbeat topic when close broker");
451+
}
452+
}
453+
417454
@Override
418455
public void close() throws PulsarServerException {
419456
try {
@@ -460,6 +497,11 @@ public CompletableFuture<Void> closeAsync() {
460497
// It only tells the Pulsar clients that this service is not ready to serve for the lookup requests
461498
state = State.Closing;
462499

500+
if (brokerId != null) {
501+
// forcefully delete heartbeat topic when close broker
502+
deleteHeartbeatResource();
503+
}
504+
463505
// close the service in reverse order v.s. in which they are started
464506
if (this.resourceUsageTransportManager != null) {
465507
try {

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -407,13 +407,17 @@ private void checkDeadlockedThreads() {
407407
}
408408
}
409409

410+
public static String getHeartbeatTopicName(String brokerId, ServiceConfiguration configuration, boolean isV2) {
411+
NamespaceName namespaceName = isV2
412+
? NamespaceService.getHeartbeatNamespaceV2(brokerId, configuration)
413+
: NamespaceService.getHeartbeatNamespace(brokerId, configuration);
414+
return String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
415+
}
410416

411417
private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion) {
412418
String brokerId = pulsar().getBrokerId();
413-
NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
414-
? NamespaceService.getHeartbeatNamespaceV2(brokerId, pulsar().getConfiguration())
415-
: NamespaceService.getHeartbeatNamespace(brokerId, pulsar().getConfiguration());
416-
final String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
419+
final String topicName =
420+
getHeartbeatTopicName(brokerId, pulsar().getConfiguration(), (topicVersion == TopicVersion.V2));
417421
LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), topicName);
418422
final String messageStr = UUID.randomUUID().toString();
419423
final String subscriptionName = "healthCheck-" + messageStr;

0 commit comments

Comments
 (0)