Skip to content

Commit 7a3e508

Browse files
author
fanjianye
committed
fix can not cleanup heartbeat data if scaling down broker
1 parent 6372b9c commit 7a3e508

1 file changed

Lines changed: 39 additions & 0 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static com.google.common.base.Preconditions.checkNotNull;
2222
import static org.apache.commons.lang3.StringUtils.isBlank;
2323
import static org.apache.commons.lang3.StringUtils.isNotBlank;
24+
import static org.apache.pulsar.broker.admin.impl.BrokersBase.HEALTH_CHECK_TOPIC_SUFFIX;
2425
import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
2526
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
2627
import com.google.common.annotations.VisibleForTesting;
@@ -72,6 +73,7 @@
7273
import org.apache.bookkeeper.mledger.LedgerOffloader;
7374
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
7475
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
76+
import org.apache.bookkeeper.mledger.ManagedLedgerException;
7577
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
7678
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
7779
import org.apache.bookkeeper.mledger.offload.Offloaders;
@@ -438,6 +440,12 @@ public void close() throws PulsarServerException {
438440
}
439441
}
440442

443+
protected boolean isManagedLedgerNotFoundException(Throwable e) {
444+
Throwable realCause = e.getCause();
445+
return realCause instanceof ManagedLedgerException.MetadataNotFoundException
446+
|| realCause instanceof MetadataStoreException.NotFoundException;
447+
}
448+
441449
/**
442450
* Close the current pulsar service. All resources are released.
443451
*/
@@ -453,6 +461,37 @@ public CompletableFuture<Void> closeAsync() {
453461
}
454462
state = State.Closing;
455463

464+
// forcefully delete heartbeat topic when close broker
465+
if (this.brokerService != null) {
466+
LOG.info("forcefully delete heartbeat topic when close broker");
467+
NamespaceName namespaceNameV1 =
468+
NamespaceService.getHeartbeatNamespace(getAdvertisedAddress(), getConfiguration());
469+
NamespaceName namespaceNameV2 =
470+
NamespaceService.getHeartbeatNamespaceV2(getAdvertisedAddress(), getConfiguration());
471+
String heartbeatTopicNameV1 = String.format("persistent://%s/%s", namespaceNameV1, HEALTH_CHECK_TOPIC_SUFFIX);
472+
String heartbeatTopicNameV2 = String.format("persistent://%s/%s", namespaceNameV2, HEALTH_CHECK_TOPIC_SUFFIX);
473+
474+
try {
475+
this.brokerService.deleteTopic(heartbeatTopicNameV1, true).get();
476+
} catch (Exception e) {
477+
if (!isManagedLedgerNotFoundException(e)) {
478+
LOG.error("Closed with errors in delete heartbeat topic [{}]",
479+
heartbeatTopicNameV1, e);
480+
}
481+
}
482+
483+
try {
484+
this.brokerService.deleteTopic(heartbeatTopicNameV2, true).get();
485+
} catch (Exception e) {
486+
if (!isManagedLedgerNotFoundException(e)) {
487+
LOG.error("Closed with errors in delete heartbeat topic [{}]",
488+
heartbeatTopicNameV2, e);
489+
}
490+
}
491+
492+
LOG.info("finish forcefully delete heartbeat topic when close broker");
493+
}
494+
456495
// close the service in reverse order v.s. in which they are started
457496
if (this.resourceUsageTransportManager != null) {
458497
try {

0 commit comments

Comments
 (0)