|
117 | 117 | import org.apache.pulsar.common.policies.data.AuthAction; |
118 | 118 | import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; |
119 | 119 | import org.apache.pulsar.common.policies.data.BacklogQuota; |
| 120 | +import org.apache.pulsar.common.policies.data.ClusterDataImpl; |
120 | 121 | import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; |
121 | 122 | import org.apache.pulsar.common.policies.data.DispatchRate; |
122 | 123 | import org.apache.pulsar.common.policies.data.EntryFilters; |
@@ -4301,19 +4302,27 @@ public static CompletableFuture<PartitionedTopicMetadata> unsafeGetPartitionedTo |
4301 | 4302 | // and other vital information. Even after namespace starting deletion,, |
4302 | 4303 | // we need to access the metadata of system topics to create readers and clean up topic data. |
4303 | 4304 | // If we don't do this, it can prevent namespace deletion due to inaccessible readers. |
4304 | | - checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject(), isSystemTopic(topicName)) |
4305 | | - .thenCompose(res -> pulsar.getBrokerService() |
4306 | | - .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)) |
4307 | | - .thenAccept(metadata -> { |
4308 | | - if (log.isDebugEnabled()) { |
4309 | | - log.debug("Total number of partitions for topic {} is {}", topicName, |
4310 | | - metadata.partitions); |
4311 | | - } |
4312 | | - metadataFuture.complete(metadata); |
4313 | | - }).exceptionally(ex -> { |
4314 | | - metadataFuture.completeExceptionally(ex.getCause()); |
4315 | | - return null; |
4316 | | - }); |
| 4305 | + CompletableFuture<Void> clusterOwnershipCheck; |
| 4306 | + if (isSystemTopic(topicName)) { |
| 4307 | + clusterOwnershipCheck = CompletableFuture.completedFuture(null); |
| 4308 | + } else { |
| 4309 | + clusterOwnershipCheck = |
| 4310 | + checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()) |
| 4311 | + .thenApply(res -> null); |
| 4312 | + } |
| 4313 | + |
| 4314 | + clusterOwnershipCheck.thenCompose(res -> pulsar.getBrokerService() |
| 4315 | + .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)) |
| 4316 | + .thenAccept(metadata -> { |
| 4317 | + if (log.isDebugEnabled()) { |
| 4318 | + log.debug("Total number of partitions for topic {} is {}", topicName, |
| 4319 | + metadata.partitions); |
| 4320 | + } |
| 4321 | + metadataFuture.complete(metadata); |
| 4322 | + }).exceptionally(ex -> { |
| 4323 | + metadataFuture.completeExceptionally(ex.getCause()); |
| 4324 | + return null; |
| 4325 | + }); |
4317 | 4326 |
|
4318 | 4327 | return metadataFuture; |
4319 | 4328 | } |
|
0 commit comments