|
33 | 33 | import com.google.cloud.storage.Hasher.UncheckedChecksumMismatchException; |
34 | 34 | import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef; |
35 | 35 | import com.google.cloud.storage.Retrying.Retrier; |
| 36 | +import java.util.OptionalLong; |
36 | 37 | import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel; |
37 | 38 | import com.google.common.base.Suppliers; |
38 | 39 | import com.google.protobuf.ByteString; |
@@ -91,7 +92,9 @@ final class GapicUnbufferedReadableByteChannel |
91 | 92 | this.result = result; |
92 | 93 | this.read = read; |
93 | 94 | this.req = req; |
94 | | - this.hasher = hasher; |
| 95 | + this.hasher = (req.getReadOffset() == 0) |
| 96 | + ? new CumulativeHasher(hasher, 0, req.getReadLimit() <= 0 ? OptionalLong.empty() : OptionalLong.of(req.getReadLimit())) |
| 97 | + : hasher; |
95 | 98 | this.fetchOffset = new AtomicLong(req.getReadOffset()); |
96 | 99 | this.blobOffset = req.getReadOffset(); |
97 | 100 | this.retrier = retrier; |
@@ -174,6 +177,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { |
174 | 177 | } |
175 | 178 | if (take == EOF_MARKER) { |
176 | 179 | complete = true; |
| 180 | + validateCumulativeChecksum(); |
177 | 181 | break; |
178 | 182 | } |
179 | 183 |
|
@@ -311,6 +315,18 @@ private IOException createError(String message) throws IOException { |
311 | 315 | return new IOException(message, cause); |
312 | 316 | } |
313 | 317 |
|
| 318 | + private void validateCumulativeChecksum() throws IOException { |
| 319 | + if (hasher instanceof CumulativeHasher) { |
| 320 | + CumulativeHasher cumulativeHasher = (CumulativeHasher) hasher; |
| 321 | + try { |
| 322 | + cumulativeHasher.validateCumulativeChecksum(metadata); |
| 323 | + } catch (UncheckedCumulativeChecksumMismatchException exception) { |
| 324 | + throw new IOException(StorageException.coalesce(exception)); |
| 325 | + } |
| 326 | + } |
| 327 | + } |
| 328 | + |
| 329 | + |
314 | 330 | private final class ReadObjectObserver extends StateCheckingResponseObserver<ReadObjectResponse> { |
315 | 331 |
|
316 | 332 | private final SettableApiFuture<Void> open = SettableApiFuture.create(); |
|
0 commit comments