Skip to content

Commit 493385f

Browse files
poorbarcode authored and nikhil-ctds committed
[fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner (apache#21946)
(cherry picked from commit 4924052) (cherry picked from commit 670aff0)
1 parent f89facf commit 493385f

12 files changed

Lines changed: 655 additions & 169 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java

Lines changed: 250 additions & 82 deletions
Large diffs are not rendered by default.

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -774,7 +774,7 @@ public CompletableFuture<Void> closeAndRemoveReplicationClient(String clusterNam
774774
if (ot.isPresent()) {
775775
Replicator r = ot.get().getReplicators().get(clusterName);
776776
if (r != null && r.isConnected()) {
777-
r.disconnect(false).whenComplete((v, e) -> f.complete(null));
777+
r.terminate().whenComplete((v, e) -> f.complete(null));
778778
return;
779779
}
780780
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ public interface Replicator {
2929

3030
ReplicatorStatsImpl getStats();
3131

32-
CompletableFuture<Void> disconnect();
32+
CompletableFuture<Void> terminate();
3333

34-
CompletableFuture<Void> disconnect(boolean b);
34+
CompletableFuture<Void> disconnect(boolean failIfHasBacklog, boolean closeTheStartingProducer);
3535

3636
void updateRates();
3737

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ protected String getProducerName() {
6767
}
6868

6969
@Override
70-
protected void readEntries(Producer<byte[]> producer) {
70+
protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
7171
this.producer = (ProducerImpl) producer;
7272

7373
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) {
@@ -78,8 +78,7 @@ protected void readEntries(Producer<byte[]> producer) {
7878
"[{}] Replicator was stopped while creating the producer."
7979
+ " Closing it. Replicator state: {}",
8080
replicatorId, STATE_UPDATER.get(this));
81-
STATE_UPDATER.set(this, State.Stopping);
82-
closeProducerAsync();
81+
doCloseProducerAsync(producer, () -> {});
8382
return;
8483
}
8584
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
415415
CompletableFuture<Void> closeClientFuture = new CompletableFuture<>();
416416
if (closeIfClientsConnected) {
417417
List<CompletableFuture<Void>> futures = new ArrayList<>();
418-
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
418+
replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
419419
producers.values().forEach(producer -> futures.add(producer.disconnect()));
420420
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
421421
FutureUtil.waitForAll(futures).thenRun(() -> {
@@ -508,7 +508,7 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
508508

509509
List<CompletableFuture<Void>> futures = new ArrayList<>();
510510

511-
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
511+
replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
512512
producers.values().forEach(producer -> futures.add(producer.disconnect()));
513513
if (topicPublishRateLimiter != null) {
514514
topicPublishRateLimiter.close();
@@ -553,7 +553,7 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
553553

554554
public CompletableFuture<Void> stopReplProducers() {
555555
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
556-
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect()));
556+
replicators.forEach((region, replicator) -> closeFutures.add(replicator.terminate()));
557557
return FutureUtil.waitForAll(closeFutures);
558558
}
559559

@@ -633,7 +633,7 @@ CompletableFuture<Void> removeReplicator(String remoteCluster) {
633633

634634
String name = NonPersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
635635

636-
replicators.get(remoteCluster).disconnect().thenRun(() -> {
636+
replicators.get(remoteCluster).terminate().thenRun(() -> {
637637
log.info("[{}] Successfully removed replicator {}", name, remoteCluster);
638638
replicators.remove(remoteCluster);
639639

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java

Lines changed: 40 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
*/
1919
package org.apache.pulsar.broker.service.persistent;
2020

21+
import static org.apache.pulsar.broker.service.AbstractReplicator.State.Started;
22+
import static org.apache.pulsar.broker.service.AbstractReplicator.State.Starting;
23+
import static org.apache.pulsar.broker.service.AbstractReplicator.State.Terminated;
24+
import static org.apache.pulsar.broker.service.AbstractReplicator.State.Terminating;
2125
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
2226
import com.google.common.annotations.VisibleForTesting;
2327
import io.netty.buffer.ByteBuf;
@@ -26,7 +30,6 @@
2630
import java.util.List;
2731
import java.util.Optional;
2832
import java.util.concurrent.CompletableFuture;
29-
import java.util.concurrent.CompletionException;
3033
import java.util.concurrent.ExecutionException;
3134
import java.util.concurrent.TimeUnit;
3235
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -43,10 +46,10 @@
4346
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
4447
import org.apache.bookkeeper.mledger.Position;
4548
import org.apache.bookkeeper.mledger.impl.PositionImpl;
49+
import org.apache.commons.lang3.tuple.Pair;
4650
import org.apache.pulsar.broker.PulsarServerException;
4751
import org.apache.pulsar.broker.service.AbstractReplicator;
4852
import org.apache.pulsar.broker.service.BrokerService;
49-
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
5053
import org.apache.pulsar.broker.service.MessageExpirer;
5154
import org.apache.pulsar.broker.service.Replicator;
5255
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
@@ -134,30 +137,44 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man
134137
}
135138

136139
@Override
137-
protected void readEntries(Producer<byte[]> producer) {
138-
// Rewind the cursor to be sure to read again all non-acked messages sent while restarting
140+
protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
141+
// Rewind the cursor to be sure to read again all non-acked messages sent while restarting.
139142
cursor.rewind();
140-
141143
cursor.cancelPendingReadRequest();
142-
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
143-
this.producer = (ProducerImpl) producer;
144144

145-
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) {
146-
log.info("[{}] Created replicator producer", replicatorId);
145+
/**
146+
* 1. Try to change the state to {@link Started}.
147+
* 2. Atomically modify multiple properties if the state change succeeds, so that another thread never sees a null
148+
* producer when the state is {@link Started}.
149+
*/
150+
Pair<Boolean, State> changeStateRes;
151+
changeStateRes = compareSetAndGetState(Starting, Started);
152+
if (changeStateRes.getLeft()) {
153+
this.producer = (ProducerImpl) producer;
154+
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
155+
// Trigger a new read.
156+
log.info("[{}] Created replicator producer, Replicator state: {}", replicatorId, state);
147157
backOff.reset();
148-
// activate cursor: so, entries can be cached
158+
// activate cursor: so, entries can be cached.
149159
this.cursor.setActive();
150160
// read entries
151161
readMoreEntries();
152162
} else {
153-
log.info(
154-
"[{}] Replicator was stopped while creating the producer."
155-
+ " Closing it. Replicator state: {}",
156-
replicatorId, STATE_UPDATER.get(this));
157-
STATE_UPDATER.set(this, State.Stopping);
158-
closeProducerAsync();
163+
if (changeStateRes.getRight() == Started) {
164+
// Since only one task can call "producerBuilder.createAsync()", this scenario is not expected.
165+
// So print a warn log.
166+
log.warn("[{}] Replicator was already started by another thread while creating the producer."
167+
+ " Closing the producer newly created. Replicator state: {}", replicatorId, state);
168+
} else if (changeStateRes.getRight() == Terminating || changeStateRes.getRight() == Terminated) {
169+
log.info("[{}] Replicator was terminated, so close the producer. Replicator state: {}",
170+
replicatorId, state);
171+
} else {
172+
log.error("[{}] Replicator state is not expected, so close the producer. Replicator state: {}",
173+
replicatorId, changeStateRes.getRight());
174+
}
175+
// Close the producer if the state change fails.
176+
doCloseProducerAsync(producer, () -> {});
159177
}
160-
161178
}
162179

163180
@Override
@@ -420,8 +437,8 @@ public CompletableFuture<MessageId> getFuture() {
420437

421438
@Override
422439
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
423-
if (STATE_UPDATER.get(this) != State.Started) {
424-
log.info("[{}] Replicator was stopped while reading entries."
440+
if (state != Started) {
441+
log.info("[{}] Replicator was disconnected while reading entries."
425442
+ " Stop reading. Replicator state: {}",
426443
replicatorId, STATE_UPDATER.get(this));
427444
return;
@@ -436,8 +453,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
436453
log.error("[{}] Error reading entries because replicator is"
437454
+ " already deleted and cursor is already closed {}, ({})",
438455
replicatorId, ctx, exception.getMessage(), exception);
439-
// replicator is already deleted and cursor is already closed so, producer should also be stopped
440-
closeProducerAsync();
456+
// replicator is already deleted and cursor is already closed so, producer should also be disconnected.
457+
terminate();
441458
return;
442459
} else if (!(exception instanceof TooManyRequestsException)) {
443460
log.error("[{}] Error reading entries at {}. Retrying to read in {}s. ({})",
@@ -555,8 +572,8 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
555572
if (exception instanceof CursorAlreadyClosedException) {
556573
log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already"
557574
+ " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception);
558-
// replicator is already deleted and cursor is already closed so, producer should also be stopped
559-
closeProducerAsync();
575+
// replicator is already deleted and cursor is already closed so, producer should also be disconnected.
576+
terminate();
560577
return;
561578
}
562579
if (ctx instanceof PositionImpl) {
@@ -675,30 +692,6 @@ protected void checkReplicatedSubscriptionMarker(Position position, MessageImpl<
675692
}
676693
}
677694

678-
@Override
679-
public CompletableFuture<Void> disconnect() {
680-
return disconnect(false);
681-
}
682-
683-
@Override
684-
public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog) {
685-
final CompletableFuture<Void> future = new CompletableFuture<>();
686-
687-
super.disconnect(failIfHasBacklog).thenRun(() -> {
688-
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
689-
future.complete(null);
690-
}).exceptionally(ex -> {
691-
Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
692-
if (!(t instanceof TopicBusyException)) {
693-
log.error("[{}] Failed to close dispatch rate limiter: {}", replicatorId, ex.getMessage());
694-
}
695-
future.completeExceptionally(t);
696-
return null;
697-
});
698-
699-
return future;
700-
}
701-
702695
@Override
703696
public boolean isConnected() {
704697
ProducerImpl<?> producer = this.producer;

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -775,15 +775,15 @@ public CompletableFuture<Void> startReplProducers() {
775775

776776
public CompletableFuture<Void> stopReplProducers() {
777777
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
778-
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect()));
779-
shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect()));
778+
replicators.forEach((region, replicator) -> closeFutures.add(replicator.terminate()));
779+
shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.terminate()));
780780
return FutureUtil.waitForAll(closeFutures);
781781
}
782782

783783
private synchronized CompletableFuture<Void> closeReplProducersIfNoBacklog() {
784784
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
785-
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect(true)));
786-
shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect(true)));
785+
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect(true, true)));
786+
shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect(true, true)));
787787
return FutureUtil.waitForAll(closeFutures);
788788
}
789789

@@ -1365,8 +1365,8 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
13651365
List<CompletableFuture<Void>> futures = new ArrayList<>();
13661366
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
13671367
if (closeIfClientsConnected) {
1368-
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
1369-
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
1368+
replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
1369+
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate()));
13701370
producers.values().forEach(producer -> futures.add(producer.disconnect()));
13711371
}
13721372
FutureUtil.waitForAll(futures).thenRunAsync(() -> {
@@ -1498,8 +1498,8 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
14981498
List<CompletableFuture<Void>> futures = new ArrayList<>();
14991499

15001500
futures.add(transactionBuffer.closeAsync());
1501-
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
1502-
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
1501+
replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
1502+
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate()));
15031503
producers.values().forEach(producer -> futures.add(producer.disconnect()));
15041504
if (topicPublishRateLimiter != null) {
15051505
topicPublishRateLimiter.close();
@@ -1856,7 +1856,7 @@ CompletableFuture<Void> removeReplicator(String remoteCluster) {
18561856

18571857
String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
18581858

1859-
Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::disconnect)
1859+
Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::terminate)
18601860
.orElse(CompletableFuture.completedFuture(null)).thenRun(() -> {
18611861
ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
18621862
@Override
@@ -1928,7 +1928,7 @@ CompletableFuture<Void> removeShadowReplicator(String shadowTopic) {
19281928
log.info("[{}] Removing shadow topic replicator to {}", topic, shadowTopic);
19291929
final CompletableFuture<Void> future = new CompletableFuture<>();
19301930
String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic);
1931-
shadowReplicators.get(shadowTopic).disconnect().thenRun(() -> {
1931+
shadowReplicators.get(shadowTopic).terminate().thenRun(() -> {
19321932

19331933
ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
19341934
@Override
@@ -2671,6 +2671,15 @@ public void checkGC() {
26712671
log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", topic,
26722672
maxInactiveDurationInSec);
26732673
}
2674+
/**
2675+
* There is a race condition that may cause a NPE:
2676+
* - task 1: a callback of "replicator.cursor.asyncRead" will trigger a replication.
2677+
* - task 2: "closeReplProducersIfNoBacklog" called by current thread will set the variable
2678+
* "replicator.producer" to a null value.
2679+
* Race condition: task 1 will get a NPE when it tries to send messages using the variable
2680+
* "replicator.producer", because task 2 will set this variable to "null".
2681+
* TODO Create a seperated PR to fix it.
2682+
*/
26742683
closeReplProducersIfNoBacklog().thenRun(() -> {
26752684
if (hasRemoteProducers()) {
26762685
if (log.isDebugEnabled()) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.pulsar.client.api.Schema;
4343
import org.apache.pulsar.client.impl.ConnectionPool;
4444
import org.apache.pulsar.client.impl.PulsarClientImpl;
45+
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
4546
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
4647
import org.awaitility.Awaitility;
4748
import org.awaitility.reflect.WhiteboxImpl;
@@ -92,7 +93,7 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
9293
final ReplicatorInTest replicator = new ReplicatorInTest(localCluster, localTopic, remoteCluster, topicName,
9394
replicatorPrefix, broker, remoteClient);
9495
replicator.startProducer();
95-
replicator.disconnect();
96+
replicator.terminate();
9697

9798
// Verify the task will be done.
9899
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
@@ -127,7 +128,7 @@ protected String getProducerName() {
127128
}
128129

129130
@Override
130-
protected void readEntries(Producer<byte[]> producer) {
131+
protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
131132

132133
}
133134

@@ -137,7 +138,22 @@ protected Position getReplicatorReadPosition() {
137138
}
138139

139140
@Override
140-
protected long getNumberOfEntriesInBacklog() {
141+
public ReplicatorStatsImpl getStats() {
142+
return null;
143+
}
144+
145+
@Override
146+
public void updateRates() {
147+
148+
}
149+
150+
@Override
151+
public boolean isConnected() {
152+
return false;
153+
}
154+
155+
@Override
156+
public long getNumberOfEntriesInBacklog() {
141157
return 0;
142158
}
143159

0 commit comments

Comments
 (0)