Skip to content

Commit 7a614c0

Browse files
committed
[fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner (#21946)
(cherry picked from commit 4924052)
1 parent fccbc18 commit 7a614c0

File tree

12 files changed

+656
-170
lines changed

12 files changed

+656
-170
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java

Lines changed: 250 additions & 82 deletions
Large diffs are not rendered by default.

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -689,7 +689,7 @@ public CompletableFuture<Void> closeAndRemoveReplicationClient(String clusterNam
689689
if (ot.isPresent()) {
690690
Replicator r = ot.get().getReplicators().get(clusterName);
691691
if (r != null && r.isConnected()) {
692-
r.disconnect(false).whenComplete((v, e) -> f.complete(null));
692+
r.terminate().whenComplete((v, e) -> f.complete(null));
693693
return;
694694
}
695695
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ public interface Replicator {
2929

3030
ReplicatorStatsImpl getStats();
3131

32-
CompletableFuture<Void> disconnect();
32+
CompletableFuture<Void> terminate();
3333

34-
CompletableFuture<Void> disconnect(boolean b);
34+
CompletableFuture<Void> disconnect(boolean failIfHasBacklog, boolean closeTheStartingProducer);
3535

3636
void updateRates();
3737

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ protected String getProducerName() {
6767
}
6868

6969
@Override
70-
protected void readEntries(Producer<byte[]> producer) {
70+
protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
7171
this.producer = (ProducerImpl) producer;
7272

7373
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) {
@@ -78,8 +78,7 @@ protected void readEntries(Producer<byte[]> producer) {
7878
"[{}] Replicator was stopped while creating the producer."
7979
+ " Closing it. Replicator state: {}",
8080
replicatorId, STATE_UPDATER.get(this));
81-
STATE_UPDATER.set(this, State.Stopping);
82-
closeProducerAsync();
81+
doCloseProducerAsync(producer, () -> {});
8382
return;
8483
}
8584
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
420420
CompletableFuture<Void> closeClientFuture = new CompletableFuture<>();
421421
if (closeIfClientsConnected) {
422422
List<CompletableFuture<Void>> futures = new ArrayList<>();
423-
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
423+
replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
424424
producers.values().forEach(producer -> futures.add(producer.disconnect()));
425425
subscriptions.forEach((s, sub) -> futures.add(sub.close(true, Optional.empty())));
426426
FutureUtil.waitForAll(futures).thenRun(() -> {
@@ -524,7 +524,7 @@ public CompletableFuture<Void> close(
524524

525525
List<CompletableFuture<Void>> futures = new ArrayList<>();
526526

527-
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
527+
replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
528528
if (disconnectClients) {
529529
futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
530530
brokerService.getPulsar(), topic).thenAccept(lookupData -> {
@@ -583,7 +583,7 @@ public CompletableFuture<Void> close(
583583

584584
public CompletableFuture<Void> stopReplProducers() {
585585
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
586-
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect()));
586+
replicators.forEach((region, replicator) -> closeFutures.add(replicator.terminate()));
587587
return FutureUtil.waitForAll(closeFutures);
588588
}
589589

@@ -663,7 +663,7 @@ CompletableFuture<Void> removeReplicator(String remoteCluster) {
663663

664664
String name = NonPersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
665665

666-
replicators.get(remoteCluster).disconnect().thenRun(() -> {
666+
replicators.get(remoteCluster).terminate().thenRun(() -> {
667667
log.info("[{}] Successfully removed replicator {}", name, remoteCluster);
668668
replicators.remove(remoteCluster);
669669

@@ -1032,7 +1032,7 @@ private CompletableFuture<Void> disconnectReplicators() {
10321032
List<CompletableFuture<Void>> futures = new ArrayList<>();
10331033
ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators = getReplicators();
10341034
replicators.forEach((r, replicator) -> {
1035-
futures.add(replicator.disconnect());
1035+
futures.add(replicator.terminate());
10361036
});
10371037
return FutureUtil.waitForAll(futures);
10381038
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java

Lines changed: 40 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
*/
1919
package org.apache.pulsar.broker.service.persistent;
2020

21+
import static org.apache.pulsar.broker.service.AbstractReplicator.State.Started;
22+
import static org.apache.pulsar.broker.service.AbstractReplicator.State.Starting;
23+
import static org.apache.pulsar.broker.service.AbstractReplicator.State.Terminated;
24+
import static org.apache.pulsar.broker.service.AbstractReplicator.State.Terminating;
2125
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
2226
import com.google.common.annotations.VisibleForTesting;
2327
import io.netty.buffer.ByteBuf;
@@ -26,7 +30,6 @@
2630
import java.util.List;
2731
import java.util.Optional;
2832
import java.util.concurrent.CompletableFuture;
29-
import java.util.concurrent.CompletionException;
3033
import java.util.concurrent.ExecutionException;
3134
import java.util.concurrent.TimeUnit;
3235
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -43,10 +46,10 @@
4346
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
4447
import org.apache.bookkeeper.mledger.Position;
4548
import org.apache.bookkeeper.mledger.impl.PositionImpl;
49+
import org.apache.commons.lang3.tuple.Pair;
4650
import org.apache.pulsar.broker.PulsarServerException;
4751
import org.apache.pulsar.broker.service.AbstractReplicator;
4852
import org.apache.pulsar.broker.service.BrokerService;
49-
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
5053
import org.apache.pulsar.broker.service.MessageExpirer;
5154
import org.apache.pulsar.broker.service.Replicator;
5255
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
@@ -134,30 +137,44 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man
134137
}
135138

136139
@Override
137-
protected void readEntries(Producer<byte[]> producer) {
138-
// Rewind the cursor to be sure to read again all non-acked messages sent while restarting
140+
protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
141+
// Rewind the cursor to be sure to read again all non-acked messages sent while restarting.
139142
cursor.rewind();
140-
141143
cursor.cancelPendingReadRequest();
142-
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
143-
this.producer = (ProducerImpl) producer;
144144

145-
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) {
146-
log.info("[{}] Created replicator producer", replicatorId);
145+
/**
146+
* 1. Try change state to {@link Started}.
147+
* 2. Atoms modify multiple properties if change state success, to avoid another thread get a null value
148+
* producer when the state is {@link Started}.
149+
*/
150+
Pair<Boolean, State> changeStateRes;
151+
changeStateRes = compareSetAndGetState(Starting, Started);
152+
if (changeStateRes.getLeft()) {
153+
this.producer = (ProducerImpl) producer;
154+
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
155+
// Trigger a new read.
156+
log.info("[{}] Created replicator producer, Replicator state: {}", replicatorId, state);
147157
backOff.reset();
148-
// activate cursor: so, entries can be cached
158+
// activate cursor: so, entries can be cached.
149159
this.cursor.setActive();
150160
// read entries
151161
readMoreEntries();
152162
} else {
153-
log.info(
154-
"[{}] Replicator was stopped while creating the producer."
155-
+ " Closing it. Replicator state: {}",
156-
replicatorId, STATE_UPDATER.get(this));
157-
STATE_UPDATER.set(this, State.Stopping);
158-
closeProducerAsync();
163+
if (changeStateRes.getRight() == Started) {
164+
// Since only one task can call "producerBuilder.createAsync()", this scenario is not expected.
165+
// So print a warn log.
166+
log.warn("[{}] Replicator was already started by another thread while creating the producer."
167+
+ " Closing the producer newly created. Replicator state: {}", replicatorId, state);
168+
} else if (changeStateRes.getRight() == Terminating || changeStateRes.getRight() == Terminated) {
169+
log.info("[{}] Replicator was terminated, so close the producer. Replicator state: {}",
170+
replicatorId, state);
171+
} else {
172+
log.error("[{}] Replicator state is not expected, so close the producer. Replicator state: {}",
173+
replicatorId, changeStateRes.getRight());
174+
}
175+
// Close the producer if change the state fail.
176+
doCloseProducerAsync(producer, () -> {});
159177
}
160-
161178
}
162179

163180
@Override
@@ -420,8 +437,8 @@ public CompletableFuture<MessageId> getFuture() {
420437

421438
@Override
422439
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
423-
if (STATE_UPDATER.get(this) != State.Started) {
424-
log.info("[{}] Replicator was stopped while reading entries."
440+
if (state != Started) {
441+
log.info("[{}] Replicator was disconnected while reading entries."
425442
+ " Stop reading. Replicator state: {}",
426443
replicatorId, STATE_UPDATER.get(this));
427444
return;
@@ -436,8 +453,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
436453
log.error("[{}] Error reading entries because replicator is"
437454
+ " already deleted and cursor is already closed {}, ({})",
438455
replicatorId, ctx, exception.getMessage(), exception);
439-
// replicator is already deleted and cursor is already closed so, producer should also be stopped
440-
closeProducerAsync();
456+
// replicator is already deleted and cursor is already closed so, producer should also be disconnected.
457+
terminate();
441458
return;
442459
} else if (!(exception instanceof TooManyRequestsException)) {
443460
log.error("[{}] Error reading entries at {}. Retrying to read in {}s. ({})",
@@ -555,8 +572,8 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
555572
if (exception instanceof CursorAlreadyClosedException) {
556573
log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already"
557574
+ " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception);
558-
// replicator is already deleted and cursor is already closed so, producer should also be stopped
559-
closeProducerAsync();
575+
// replicator is already deleted and cursor is already closed so, producer should also be disconnected.
576+
terminate();
560577
return;
561578
}
562579
if (ctx instanceof PositionImpl) {
@@ -675,30 +692,6 @@ protected void checkReplicatedSubscriptionMarker(Position position, MessageImpl<
675692
}
676693
}
677694

678-
@Override
679-
public CompletableFuture<Void> disconnect() {
680-
return disconnect(false);
681-
}
682-
683-
@Override
684-
public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog) {
685-
final CompletableFuture<Void> future = new CompletableFuture<>();
686-
687-
super.disconnect(failIfHasBacklog).thenRun(() -> {
688-
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
689-
future.complete(null);
690-
}).exceptionally(ex -> {
691-
Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
692-
if (!(t instanceof TopicBusyException)) {
693-
log.error("[{}] Failed to close dispatch rate limiter: {}", replicatorId, ex.getMessage());
694-
}
695-
future.completeExceptionally(t);
696-
return null;
697-
});
698-
699-
return future;
700-
}
701-
702695
@Override
703696
public boolean isConnected() {
704697
ProducerImpl<?> producer = this.producer;

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -799,15 +799,15 @@ public CompletableFuture<Void> startReplProducers() {
799799

800800
public CompletableFuture<Void> stopReplProducers() {
801801
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
802-
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect()));
803-
shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect()));
802+
replicators.forEach((region, replicator) -> closeFutures.add(replicator.terminate()));
803+
shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.terminate()));
804804
return FutureUtil.waitForAll(closeFutures);
805805
}
806806

807807
private synchronized CompletableFuture<Void> closeReplProducersIfNoBacklog() {
808808
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
809-
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect(true)));
810-
shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect(true)));
809+
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect(true, true)));
810+
shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect(true, true)));
811811
return FutureUtil.waitForAll(closeFutures);
812812
}
813813

@@ -1389,8 +1389,8 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
13891389
List<CompletableFuture<Void>> futures = new ArrayList<>();
13901390
subscriptions.forEach((s, sub) -> futures.add(sub.close(true, Optional.empty())));
13911391
if (closeIfClientsConnected) {
1392-
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
1393-
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
1392+
replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
1393+
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate()));
13941394
producers.values().forEach(producer -> futures.add(producer.disconnect()));
13951395
}
13961396
FutureUtil.waitForAll(futures).thenRunAsync(() -> {
@@ -1532,8 +1532,8 @@ public CompletableFuture<Void> close(
15321532
List<CompletableFuture<Void>> futures = new ArrayList<>();
15331533

15341534
futures.add(transactionBuffer.closeAsync());
1535-
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
1536-
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
1535+
replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
1536+
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate()));
15371537
if (disconnectClients) {
15381538
futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
15391539
brokerService.getPulsar(), topic).thenAccept(lookupData -> {
@@ -1908,7 +1908,7 @@ CompletableFuture<Void> removeReplicator(String remoteCluster) {
19081908

19091909
String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
19101910

1911-
Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::disconnect)
1911+
Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::terminate)
19121912
.orElse(CompletableFuture.completedFuture(null)).thenRun(() -> {
19131913
ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
19141914
@Override
@@ -1980,7 +1980,7 @@ CompletableFuture<Void> removeShadowReplicator(String shadowTopic) {
19801980
log.info("[{}] Removing shadow topic replicator to {}", topic, shadowTopic);
19811981
final CompletableFuture<Void> future = new CompletableFuture<>();
19821982
String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic);
1983-
shadowReplicators.get(shadowTopic).disconnect().thenRun(() -> {
1983+
shadowReplicators.get(shadowTopic).terminate().thenRun(() -> {
19841984

19851985
ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
19861986
@Override
@@ -2849,7 +2849,7 @@ private CompletableFuture<Void> checkAndDisconnectReplicators() {
28492849
ConcurrentOpenHashMap<String, Replicator> replicators = getReplicators();
28502850
replicators.forEach((r, replicator) -> {
28512851
if (replicator.getNumberOfEntriesInBacklog() <= 0) {
2852-
futures.add(replicator.disconnect());
2852+
futures.add(replicator.terminate());
28532853
}
28542854
});
28552855
return FutureUtil.waitForAll(futures);
@@ -2900,6 +2900,15 @@ public void checkGC() {
29002900
log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", topic,
29012901
maxInactiveDurationInSec);
29022902
}
2903+
/**
2904+
* There is a race condition that may cause a NPE:
2905+
* - task 1: a callback of "replicator.cursor.asyncRead" will trigger a replication.
2906+
* - task 2: "closeReplProducersIfNoBacklog" called by current thread will make the variable
2907+
* "replicator.producer" to a null value.
2908+
* Race condition: task 1 will get a NPE when it tries to send messages using the variable
2909+
* "replicator.producer", because task 2 will set this variable to "null".
2910+
* TODO Create a seperated PR to fix it.
2911+
*/
29032912
closeReplProducersIfNoBacklog().thenRun(() -> {
29042913
if (hasRemoteProducers()) {
29052914
if (log.isDebugEnabled()) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.pulsar.client.api.Schema;
4444
import org.apache.pulsar.client.impl.ConnectionPool;
4545
import org.apache.pulsar.client.impl.PulsarClientImpl;
46+
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
4647
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
4748
import org.awaitility.Awaitility;
4849
import org.awaitility.reflect.WhiteboxImpl;
@@ -94,7 +95,7 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
9495
final ReplicatorInTest replicator = new ReplicatorInTest(localCluster, localTopic, remoteCluster, topicName,
9596
replicatorPrefix, broker, remoteClient);
9697
replicator.startProducer();
97-
replicator.disconnect();
98+
replicator.terminate();
9899

99100
// Verify task will done.
100101
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
@@ -129,7 +130,7 @@ protected String getProducerName() {
129130
}
130131

131132
@Override
132-
protected void readEntries(Producer<byte[]> producer) {
133+
protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
133134

134135
}
135136

@@ -139,7 +140,22 @@ protected Position getReplicatorReadPosition() {
139140
}
140141

141142
@Override
142-
protected long getNumberOfEntriesInBacklog() {
143+
public ReplicatorStatsImpl getStats() {
144+
return null;
145+
}
146+
147+
@Override
148+
public void updateRates() {
149+
150+
}
151+
152+
@Override
153+
public boolean isConnected() {
154+
return false;
155+
}
156+
157+
@Override
158+
public long getNumberOfEntriesInBacklog() {
143159
return 0;
144160
}
145161

0 commit comments

Comments
 (0)