5050import java .util .concurrent .TimeUnit ;
5151import java .util .concurrent .atomic .AtomicBoolean ;
5252import java .util .concurrent .atomic .AtomicLong ;
53+ import java .util .concurrent .atomic .AtomicReference ;
5354import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
5455import java .util .function .BiFunction ;
5556import java .util .stream .Collectors ;
@@ -276,6 +277,8 @@ protected TopicStatsHelper initialValue() {
276277 @ Getter
277278 private final ExecutorService orderedExecutor ;
278279
280+ private volatile CloseFutures closeFutures ;
281+
279282 @ Getter
280283 private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics ();
281284
@@ -299,6 +302,50 @@ private static class EstimateTimeBasedBacklogQuotaCheckResult {
299302 Long estimatedOldestUnacknowledgedMessageTimestamp ;
300303 }
301304
305+ /***
306+ * We use 3 futures to prevent a new closing if there is an in-progress deletion or closing. We make Pulsar return
307+ * the in-progress one when it is called the second time.
308+ *
309+ * The topic closing will be called the below scenarios:
310+ * 1. Calling "pulsar-admin topics unload". Relate to {@link CloseFutures#waitDisconnectClients}.
311+ * 2. Namespace bundle transfer or unloading.
312+ * a. The unloading topic triggered by unloading namespace bundles will not wait for clients disconnect. Relate
313+ * to {@link CloseFutures#notWaitDisconnectClients}.
314+ * b. The unloading topic triggered by unloading namespace bundles was seperated to two steps when using
315+ * {@link ExtensibleLoadManagerImpl}.
316+ * b-1. step-1: fence the topic on the original Broker, and do not trigger reconnections of clients. Relate
317+ * to {@link CloseFutures#transferring}. This step is a half closing.
318+ * b-2. step-2: send the owner broker information to clients and disconnect clients. Relate
319+ * to {@link CloseFutures#notWaitDisconnectClients}.
320+ *
321+ * The three futures will be setting as the below rule:
322+ * Event: Topic close.
323+ * - If the first one closing is called by "close and not disconnect clients":
324+ * - {@link CloseFutures#transferring} will be initialized as "close and not disconnect clients".
325+ * - {@link CloseFutures#waitDisconnectClients} ang {@link CloseFutures#notWaitDisconnectClients} will be empty,
326+ * the second closing will do a new close after {@link CloseFutures#transferring} is completed.
327+ * - If the first one closing is called by "close and not wait for clients disconnect":
328+ * - {@link CloseFutures#waitDisconnectClients} will be initialized as "waiting for clients disconnect".
329+ * - {@link CloseFutures#notWaitDisconnectClients} ang {@link CloseFutures#transferring} will be
330+ * initialized as "not waiting for clients disconnect" .
331+ * - If the first one closing is called by "close and wait for clients disconnect", the three futures will be
332+ * initialized as "waiting for clients disconnect".
333+ * Event: Topic delete.
334+ * the three futures will be initialized as "waiting for clients disconnect".
335+ */
336+ private class CloseFutures {
337+ private final CompletableFuture <Void > transferring ;
338+ private final CompletableFuture <Void > notWaitDisconnectClients ;
339+ private final CompletableFuture <Void > waitDisconnectClients ;
340+
341+ public CloseFutures (CompletableFuture <Void > transferring , CompletableFuture <Void > waitDisconnectClients ,
342+ CompletableFuture <Void > notWaitDisconnectClients ) {
343+ this .transferring = transferring ;
344+ this .waitDisconnectClients = waitDisconnectClients ;
345+ this .notWaitDisconnectClients = notWaitDisconnectClients ;
346+ }
347+ }
348+
302349 private static class TopicStatsHelper {
303350 public double averageMsgSize ;
304351 public double aggMsgRateIn ;
@@ -1417,8 +1464,11 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
14171464 }
14181465
14191466 fenceTopicToCloseOrDelete (); // Avoid clients reconnections while deleting
1467+ // Mark the progress of close to prevent close calling concurrently.
1468+ this .closeFutures =
1469+ new CloseFutures (new CompletableFuture (), new CompletableFuture (), new CompletableFuture ());
14201470
1421- return getBrokerService ().getPulsar ().getPulsarResources ().getNamespaceResources ()
1471+ CompletableFuture < Void > res = getBrokerService ().getPulsar ().getPulsarResources ().getNamespaceResources ()
14221472 .getPartitionedTopicResources ().runWithMarkDeleteAsync (TopicName .get (topic ), () -> {
14231473 CompletableFuture <Void > deleteFuture = new CompletableFuture <>();
14241474
@@ -1528,6 +1578,11 @@ public void deleteLedgerComplete(Object ctx) {
15281578 unfenceTopicToResume ();
15291579 }
15301580 });
1581+
1582+ FutureUtil .completeAfter (closeFutures .transferring , res );
1583+ FutureUtil .completeAfter (closeFutures .notWaitDisconnectClients , res );
1584+ FutureUtil .completeAfter (closeFutures .waitDisconnectClients , res );
1585+ return res ;
15311586 } finally {
15321587 lock .writeLock ().unlock ();
15331588 }
@@ -1543,6 +1598,12 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
15431598 return close (true , closeWithoutWaitingClientDisconnect );
15441599 }
15451600
1601+ private enum CloseTypes {
1602+ transferring ,
1603+ notWaitDisconnectClients ,
1604+ waitDisconnectClients ;
1605+ }
1606+
15461607 /**
15471608 * Close this topic - close all producers and subscriptions associated with this topic.
15481609 *
@@ -1553,32 +1614,57 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
15531614 @ Override
15541615 public CompletableFuture <Void > close (
15551616 boolean disconnectClients , boolean closeWithoutWaitingClientDisconnect ) {
1556- CompletableFuture <Void > closeFuture = new CompletableFuture <>();
1557-
15581617 lock .writeLock ().lock ();
1559- try {
1560- if (!disconnectClients ) {
1561- transferring = true ;
1562- }
1618+ // Choose the close type.
1619+ CloseTypes closeType ;
1620+ if (!disconnectClients ) {
1621+ closeType = CloseTypes .transferring ;
1622+ } else if (closeWithoutWaitingClientDisconnect ) {
1623+ closeType = CloseTypes .notWaitDisconnectClients ;
1624+ } else {
15631625 // closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
15641626 // forcefully wants to close managed-ledger without waiting all resources to be closed.
1565- if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect ) {
1566- fenceTopicToCloseOrDelete ();
1627+ closeType = CloseTypes .waitDisconnectClients ;
1628+ }
1629+ /** Maybe there is a in-progress half closing task. see the section 2-b-1 of {@link CloseFutures}. **/
1630+ CompletableFuture <Void > inProgressTransferCloseTask = null ;
1631+ try {
1632+ // Return in-progress future if exists.
1633+ if (isClosingOrDeleting ) {
1634+ if (closeType == CloseTypes .transferring ) {
1635+ return closeFutures .transferring ;
1636+ }
1637+ if (closeType == CloseTypes .notWaitDisconnectClients && closeFutures .notWaitDisconnectClients != null ) {
1638+ return closeFutures .notWaitDisconnectClients ;
1639+ }
1640+ if (closeType == CloseTypes .waitDisconnectClients && closeFutures .waitDisconnectClients != null ) {
1641+ return closeFutures .waitDisconnectClients ;
1642+ }
1643+ if (transferring ) {
1644+ inProgressTransferCloseTask = closeFutures .transferring ;
1645+ }
1646+ }
1647+ fenceTopicToCloseOrDelete ();
1648+ if (closeType == CloseTypes .transferring ) {
1649+ transferring = true ;
1650+ this .closeFutures = new CloseFutures (new CompletableFuture (), null , null );
15671651 } else {
1568- log .warn ("[{}] Topic is already being closed or deleted" , topic );
1569- closeFuture .completeExceptionally (new TopicFencedException ("Topic is already fenced" ));
1570- return closeFuture ;
1652+ this .closeFutures =
1653+ new CloseFutures (new CompletableFuture (), new CompletableFuture (), new CompletableFuture ());
15711654 }
15721655 } finally {
15731656 lock .writeLock ().unlock ();
15741657 }
15751658
15761659 List <CompletableFuture <Void >> futures = new ArrayList <>();
1660+ if (inProgressTransferCloseTask != null ) {
1661+ futures .add (inProgressTransferCloseTask );
1662+ }
15771663
15781664 futures .add (transactionBuffer .closeAsync ());
15791665 replicators .forEach ((cluster , replicator ) -> futures .add (replicator .terminate ()));
15801666 shadowReplicators .forEach ((__ , replicator ) -> futures .add (replicator .terminate ()));
1581- if (disconnectClients ) {
1667+ if (closeType != CloseTypes . transferring ) {
15821668 futures .add (ExtensibleLoadManagerImpl .getAssignedBrokerLookupData (
15831669 brokerService .getPulsar (), topic ).thenAccept (lookupData -> {
15841670 producers .values ().forEach (producer -> futures .add (producer .disconnect (lookupData )));
@@ -1616,40 +1702,79 @@ public CompletableFuture<Void> close(
16161702 }
16171703 }
16181704
1619- CompletableFuture <Void > clientCloseFuture = closeWithoutWaitingClientDisconnect
1620- ? CompletableFuture .completedFuture (null )
1621- : FutureUtil .waitForAll (futures );
1705+ CompletableFuture <Void > disconnectClientsInCurrentCall = null ;
1706+ // Note: "disconnectClientsToCache" is a non-able value, it is null when close type is transferring.
1707+ AtomicReference <CompletableFuture <Void >> disconnectClientsToCache = new AtomicReference <>();
1708+ switch (closeType ) {
1709+ case transferring -> {
1710+ disconnectClientsInCurrentCall = FutureUtil .waitForAll (futures );
1711+ break ;
1712+ }
1713+ case notWaitDisconnectClients -> {
1714+ disconnectClientsInCurrentCall = CompletableFuture .completedFuture (null );
1715+ disconnectClientsToCache .set (FutureUtil .waitForAll (futures ));
1716+ break ;
1717+ }
1718+ case waitDisconnectClients -> {
1719+ disconnectClientsInCurrentCall = FutureUtil .waitForAll (futures );
1720+ disconnectClientsToCache .set (disconnectClientsInCurrentCall );
1721+ }
1722+ }
16221723
1623- clientCloseFuture .thenRun (() -> {
1624- // After having disconnected all producers/consumers, close the managed ledger
1625- ledger .asyncClose (new CloseCallback () {
1626- @ Override
1627- public void closeComplete (Object ctx ) {
1628- if (disconnectClients ) {
1629- // Everything is now closed, remove the topic from map
1630- disposeTopic (closeFuture );
1631- } else {
1632- closeFuture .complete (null );
1633- }
1724+ CompletableFuture <Void > closeFuture = new CompletableFuture <>();
1725+ Runnable closeLedgerAfterCloseClients = (() -> ledger .asyncClose (new CloseCallback () {
1726+ @ Override
1727+ public void closeComplete (Object ctx ) {
1728+ if (closeType != CloseTypes .transferring ) {
1729+ // Everything is now closed, remove the topic from map
1730+ disposeTopic (closeFuture );
1731+ } else {
1732+ closeFuture .complete (null );
16341733 }
1734+ }
16351735
1636- @ Override
1637- public void closeFailed (ManagedLedgerException exception , Object ctx ) {
1638- log .error ("[{}] Failed to close managed ledger, proceeding anyway." , topic , exception );
1639- if (disconnectClients ) {
1640- disposeTopic (closeFuture );
1641- } else {
1642- closeFuture .complete (null );
1643- }
1736+ @ Override
1737+ public void closeFailed (ManagedLedgerException exception , Object ctx ) {
1738+ log .error ("[{}] Failed to close managed ledger, proceeding anyway." , topic , exception );
1739+ if (closeType != CloseTypes .transferring ) {
1740+ disposeTopic (closeFuture );
1741+ } else {
1742+ closeFuture .complete (null );
16441743 }
1645- }, null );
1646- }).exceptionally (exception -> {
1744+ }
1745+ }, null ));
1746+
1747+ disconnectClientsInCurrentCall .thenRun (closeLedgerAfterCloseClients ).exceptionally (exception -> {
16471748 log .error ("[{}] Error closing topic" , topic , exception );
16481749 unfenceTopicToResume ();
16491750 closeFuture .completeExceptionally (exception );
16501751 return null ;
16511752 });
16521753
1754+ switch (closeType ) {
1755+ case transferring -> {
1756+ FutureUtil .completeAfterAll (closeFutures .transferring , closeFuture );
1757+ break ;
1758+ }
1759+ case notWaitDisconnectClients -> {
1760+ FutureUtil .completeAfterAll (closeFutures .transferring , closeFuture );
1761+ FutureUtil .completeAfter (closeFutures .notWaitDisconnectClients , closeFuture );
1762+ FutureUtil .completeAfterAll (closeFutures .waitDisconnectClients ,
1763+ closeFuture .thenCompose (ignore -> disconnectClientsToCache .get ().exceptionally (ex -> {
1764+ // Since the managed ledger has been closed, eat the error of clients disconnection.
1765+ log .error ("[{}] Closed managed ledger, but disconnect clients failed,"
1766+ + " this topic will be marked closed" , topic , ex );
1767+ return null ;
1768+ })));
1769+ break ;
1770+ }
1771+ case waitDisconnectClients -> {
1772+ FutureUtil .completeAfterAll (closeFutures .transferring , closeFuture );
1773+ FutureUtil .completeAfter (closeFutures .notWaitDisconnectClients , closeFuture );
1774+ FutureUtil .completeAfterAll (closeFutures .waitDisconnectClients , closeFuture );
1775+ }
1776+ }
1777+
16531778 return closeFuture ;
16541779 }
16551780
0 commit comments