Skip to content

Commit e25c7f0

Browse files
authored
[fix][offload] Fix Offload readHandle cannot close multi times. (#22162)
1 parent 6ec473e commit e25c7f0

3 files changed

Lines changed: 59 additions & 23 deletions

File tree

tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.ExecutorService;
2828
import java.util.concurrent.ScheduledExecutorService;
2929
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.atomic.AtomicReference;
3031
import org.apache.bookkeeper.client.BKException;
3132
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
3233
import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -36,6 +37,7 @@
3637
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
3738
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
3839
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
40+
import org.apache.bookkeeper.mledger.ManagedLedgerException;
3941
import org.apache.hadoop.io.BytesWritable;
4042
import org.apache.hadoop.io.LongWritable;
4143
import org.apache.hadoop.io.MapFile;
@@ -53,6 +55,12 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle {
5355
private final LedgerOffloaderStats offloaderStats;
5456
private final String managedLedgerName;
5557
private final String topicName;
58+
enum State {
59+
Opened,
60+
Closed
61+
}
62+
private volatile State state;
63+
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
5664

5765
private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader reader, long ledgerId,
5866
LedgerOffloaderStats offloaderStats,
@@ -72,6 +80,7 @@ private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader r
7280
offloaderStats.recordReadOffloadIndexLatency(topicName,
7381
System.nanoTime() - startReadIndexTime, TimeUnit.NANOSECONDS);
7482
this.ledgerMetadata = parseLedgerMetadata(ledgerId, value.copyBytes());
83+
state = State.Opened;
7584
} catch (IOException e) {
7685
log.error("Fail to read LedgerMetadata for ledgerId {}",
7786
ledgerId);
@@ -92,15 +101,20 @@ public LedgerMetadata getLedgerMetadata() {
92101

93102
@Override
94103
public CompletableFuture<Void> closeAsync() {
95-
CompletableFuture<Void> promise = new CompletableFuture<>();
104+
if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
105+
return closeFuture.get();
106+
}
107+
108+
CompletableFuture<Void> promise = closeFuture.get();
96109
executor.execute(() -> {
97-
try {
98-
reader.close();
99-
promise.complete(null);
100-
} catch (IOException t) {
101-
promise.completeExceptionally(t);
102-
}
103-
});
110+
try {
111+
reader.close();
112+
state = State.Closed;
113+
promise.complete(null);
114+
} catch (IOException t) {
115+
promise.completeExceptionally(t);
116+
}
117+
});
104118
return promise;
105119
}
106120

@@ -111,6 +125,12 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
111125
}
112126
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
113127
executor.execute(() -> {
128+
if (state == State.Closed) {
129+
log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
130+
ledgerId, firstEntry, lastEntry);
131+
promise.completeExceptionally(new ManagedLedgerException.OffloadReadHandleClosedException());
132+
return;
133+
}
114134
if (firstEntry > lastEntry
115135
|| firstEntry < 0
116136
|| lastEntry > getLastAddConfirmed()) {

tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.bookkeeper.mledger.offload.jcloud.impl;
2020

21+
import com.google.common.annotations.VisibleForTesting;
2122
import com.google.common.cache.Cache;
2223
import com.google.common.cache.CacheBuilder;
2324
import io.netty.buffer.ByteBuf;
@@ -30,6 +31,7 @@
3031
import java.util.concurrent.ExecutorService;
3132
import java.util.concurrent.ScheduledExecutorService;
3233
import java.util.concurrent.TimeUnit;
34+
import java.util.concurrent.atomic.AtomicReference;
3335
import org.apache.bookkeeper.client.BKException;
3436
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
3537
import org.apache.bookkeeper.client.api.LedgerEntries;
@@ -66,13 +68,14 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
6668
.newBuilder()
6769
.expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS)
6870
.build();
71+
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
6972

7073
enum State {
7174
Opened,
7275
Closed
7376
}
7477

75-
private State state = null;
78+
private volatile State state = null;
7679

7780
private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
7881
BackedInputStream inputStream, ExecutorService executor) {
@@ -96,18 +99,22 @@ public LedgerMetadata getLedgerMetadata() {
9699

97100
@Override
98101
public CompletableFuture<Void> closeAsync() {
99-
CompletableFuture<Void> promise = new CompletableFuture<>();
102+
if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
103+
return closeFuture.get();
104+
}
105+
106+
CompletableFuture<Void> promise = closeFuture.get();
100107
executor.execute(() -> {
101-
try {
102-
index.close();
103-
inputStream.close();
104-
entryOffsets.invalidateAll();
105-
state = State.Closed;
106-
promise.complete(null);
107-
} catch (IOException t) {
108-
promise.completeExceptionally(t);
109-
}
110-
});
108+
try {
109+
index.close();
110+
inputStream.close();
111+
entryOffsets.invalidateAll();
112+
state = State.Closed;
113+
promise.complete(null);
114+
} catch (IOException t) {
115+
promise.completeExceptionally(t);
116+
}
117+
});
111118
return promise;
112119
}
113120

@@ -298,6 +305,7 @@ public static ReadHandle open(ScheduledExecutorService executor,
298305
}
299306

300307
// for testing
308+
@VisibleForTesting
301309
State getState() {
302310
return this.state;
303311
}

tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.ExecutorService;
3131
import java.util.concurrent.ScheduledExecutorService;
3232
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicReference;
3334
import lombok.val;
3435
import org.apache.bookkeeper.client.BKException;
3536
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
@@ -60,7 +61,8 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
6061
private final List<BackedInputStream> inputStreams;
6162
private final List<DataInputStream> dataStreams;
6263
private final ExecutorService executor;
63-
private State state = null;
64+
private volatile State state = null;
65+
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
6466

6567
enum State {
6668
Opened,
@@ -123,7 +125,11 @@ public LedgerMetadata getLedgerMetadata() {
123125

124126
@Override
125127
public CompletableFuture<Void> closeAsync() {
126-
CompletableFuture<Void> promise = new CompletableFuture<>();
128+
if (closeFuture.get() != null || !closeFuture.compareAndSet(null, new CompletableFuture<>())) {
129+
return closeFuture.get();
130+
}
131+
132+
CompletableFuture<Void> promise = closeFuture.get();
127133
executor.execute(() -> {
128134
try {
129135
for (OffloadIndexBlockV2 indexBlock : indices) {
@@ -143,7 +149,9 @@ public CompletableFuture<Void> closeAsync() {
143149

144150
@Override
145151
public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
146-
log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
152+
if (log.isDebugEnabled()) {
153+
log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
154+
}
147155
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
148156
executor.execute(() -> {
149157
if (state == State.Closed) {

0 commit comments

Comments
 (0)