@@ -67,7 +67,16 @@ public abstract class AbstractReplicator {
6767 private volatile State state = State .Stopped ;
6868
6969 protected enum State {
70- Stopped , Starting , Started , Stopping
70+ // The internal producer is stopped.
71+ Stopped ,
72+ // Trying to create a new internal producer.
73+ Starting ,
74+ // The internal producer has started, and tries copy data.
75+ Started ,
76+ // The internal producer is trying to stop.
77+ Stopping ,
78+ // The replicator is never used again. Pulsar will create a new Replicator when enable replication again.
79+ Closed
7180 }
7281
7382 public AbstractReplicator (String localCluster , Topic localTopic , String remoteCluster , String remoteTopicName ,
@@ -124,8 +133,7 @@ public synchronized void startProducer() {
124133 replicatorId , waitTimeMs / 1000.0 );
125134 }
126135 // BackOff before retrying
127- brokerService .executor ().schedule (this ::checkTopicActiveAndRetryStartProducer , waitTimeMs ,
128- TimeUnit .MILLISECONDS );
136+ scheduleCheckTopicActiveAndStartProducer (waitTimeMs );
129137 return ;
130138 }
131139 State state = STATE_UPDATER .get (this );
@@ -150,10 +158,8 @@ public synchronized void startProducer() {
150158 long waitTimeMs = backOff .next ();
151159 log .warn ("[{}] Failed to create remote producer ({}), retrying in {} s" ,
152160 replicatorId , ex .getMessage (), waitTimeMs / 1000.0 );
153-
154161 // BackOff before retrying
155- brokerService .executor ().schedule (this ::checkTopicActiveAndRetryStartProducer , waitTimeMs ,
156- TimeUnit .MILLISECONDS );
162+ scheduleCheckTopicActiveAndStartProducer (waitTimeMs );
157163 } else {
158164 log .warn ("[{}] Failed to create remote producer. Replicator state: {}" , replicatorId ,
159165 STATE_UPDATER .get (this ), ex );
@@ -163,16 +169,38 @@ public synchronized void startProducer() {
163169
164170 }
165171
166- protected void checkTopicActiveAndRetryStartProducer ( ) {
167- isLocalTopicActive ().thenAccept ( isTopicActive -> {
168- if (isTopicActive ) {
169- startProducer () ;
172+ protected void scheduleCheckTopicActiveAndStartProducer ( final long waitTimeMs ) {
173+ brokerService . executor ().schedule (() -> {
174+ if (state == State . Closed ) {
175+ return ;
170176 }
171- }).exceptionally (ex -> {
172- log .warn ("[{}] Stop retry to create producer due to topic load fail. Replicator state: {}" , replicatorId ,
173- STATE_UPDATER .get (this ), ex );
174- return null ;
175- });
177+ CompletableFuture <Optional <Topic >> topicFuture = brokerService .getTopics ().get (localTopicName );
178+ if (topicFuture == null ) {
179+ // Topic closed.
180+ return ;
181+ }
182+ topicFuture .thenAccept (optional -> {
183+ if (optional .isEmpty ()) {
184+ // Topic closed.
185+ return ;
186+ }
187+ if (optional .get () != localTopic ) {
188+ // Topic closed and created a new one, current replicator is outdated.
189+ return ;
190+ }
191+ // TODO check isClosing or Deleting.
192+ Replicator replicator = localTopic .getReplicators ().get (replicatorId );
193+ if (replicator != AbstractReplicator .this ) {
194+ // Current replicator has been closed, and created a new one.
195+ return ;
196+ }
197+ startProducer ();
198+ }).exceptionally (ex -> {
199+ log .warn ("[{}] [{}] Stop retry to create producer due to unknown error. Replicator state: {}" ,
200+ localTopicName , replicatorId , STATE_UPDATER .get (this ), ex );
201+ return null ;
202+ });
203+ }, waitTimeMs , TimeUnit .MILLISECONDS );
176204 }
177205
178206 protected CompletableFuture <Boolean > isLocalTopicActive () {
@@ -188,14 +216,14 @@ protected CompletableFuture<Boolean> isLocalTopicActive() {
188216 }, brokerService .executor ());
189217 }
190218
191- protected synchronized CompletableFuture <Void > closeProducerAsync ( ) {
219+ protected synchronized CompletableFuture <Void > closeAsync ( boolean onlyCloseProducer ) {
192220 if (producer == null ) {
193- STATE_UPDATER . set ( this , State . Stopped );
221+ updateStatus ( onlyCloseProducer );
194222 return CompletableFuture .completedFuture (null );
195223 }
196224 CompletableFuture <Void > future = producer .closeAsync ();
197225 return future .thenRun (() -> {
198- STATE_UPDATER . set ( this , State . Stopped );
226+ updateStatus ( onlyCloseProducer );
199227 this .producer = null ;
200228 // deactivate further read
201229 disableReplicatorRead ();
@@ -206,11 +234,21 @@ protected synchronized CompletableFuture<Void> closeProducerAsync() {
206234 + " retrying again in {} s" ,
207235 replicatorId , ex .getMessage (), waitTimeMs / 1000.0 );
208236 // BackOff before retrying
209- brokerService .executor ().schedule (this ::closeProducerAsync , waitTimeMs , TimeUnit .MILLISECONDS );
237+ brokerService .executor ().schedule (() -> closeAsync (onlyCloseProducer ),
238+ waitTimeMs , TimeUnit .MILLISECONDS );
210239 return null ;
211240 });
212241 }
213242
243+ protected void updateStatus (boolean onlyCloseProducer ) {
244+ if (onlyCloseProducer ) {
245+ // Only close producer.
246+ STATE_UPDATER .set (this , State .Stopped );
247+ } else {
248+ // Close replicator.
249+ STATE_UPDATER .set (this , State .Closed );
250+ }
251+ }
214252
215253 public CompletableFuture <Void > disconnect () {
216254 return disconnect (false );
@@ -239,7 +277,7 @@ public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog)
239277 getReplicatorReadPosition (), getNumberOfEntriesInBacklog ());
240278 }
241279
242- return closeProducerAsync ( );
280+ return closeAsync ( true );
243281 }
244282
245283 public CompletableFuture <Void > remove () {
0 commit comments