diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 19c8d70184f5..adb579b56ef2 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -1421,6 +1421,8 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange handle: &handle, remain: remain, unfinalized: !finalized, + bucket: params.bucket, + object: params.object, } // For a zero-length request, explicitly close the stream and set remaining diff --git a/storage/grpc_reader.go b/storage/grpc_reader.go index b39199144c8f..1e431412c37d 100644 --- a/storage/grpc_reader.go +++ b/storage/grpc_reader.go @@ -222,6 +222,8 @@ func (c *grpcStorageClient) NewRangeReaderReadObject(ctx context.Context, params checkCRC: checkCRC, }, checkCRC: checkCRC, + bucket: params.bucket, + object: params.object, } cr := msg.GetContentRange() diff --git a/storage/grpc_reader_multi_range.go b/storage/grpc_reader_multi_range.go index 11f96afd1f1c..b865fecbdd8e 100644 --- a/storage/grpc_reader_multi_range.go +++ b/storage/grpc_reader_multi_range.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io" + "log" "sync" "cloud.google.com/go/storage/internal/apiv2/storagepb" @@ -979,6 +980,9 @@ func (m *multiRangeDownloaderManager) processDataRanges(result mrdSessionResult, req.completed = true delete(mrdStream.pendingRanges, req.readID) mrdStream.updateCapacity(m, -1, 0) + if req.length >= 0 && req.bytesWritten > req.length { + log.Printf("storage: received %d more bytes than requested from GCS for bucket %q, object %q", req.bytesWritten-req.length, m.params.bucket, m.params.object) + } m.runCallback(req.origOffset, req.bytesWritten, nil, req.callback) } } @@ -1051,6 +1055,9 @@ func (m *multiRangeDownloaderManager) failRange(mrdStream *mrdStream, req *range mrdStream.updateCapacity(m, -1, -(req.length - req.bytesWritten)) } } + if req.length >= 0 && req.bytesWritten > req.length { + log.Printf("storage: received %d more bytes than requested from GCS for bucket %q, object %q", req.bytesWritten-req.length, m.params.bucket, m.params.object) + } m.runCallback(req.origOffset, req.bytesWritten, err, req.callback) } diff --git a/storage/http_client.go b/storage/http_client.go index d117d90e551e..c1254fc8f402 100644 --- a/storage/http_client.go +++ b/storage/http_client.go @@ -1652,6 +1652,8 @@ func parseReadResponse(res *http.Response, params *newRangeReaderParams, reopen wantCRC: crc, checkCRC: checkCRC, }, + bucket: params.bucket, + object: params.object, }, nil } diff --git a/storage/reader.go b/storage/reader.go index 1ba21a21fa7e..e54f4b392e70 100644 --- a/storage/reader.go +++ b/storage/reader.go @@ -19,9 +19,11 @@ import ( "fmt" "hash/crc32" "io" + "log" "net/http" "strings" "sync" + "sync/atomic" "time" ) @@ -339,21 +341,33 @@ var emptyBody = io.NopCloser(strings.NewReader("")) // the stored CRC, returning an error from Read if there is a mismatch. This integrity check // is skipped if transcoding occurs. See https://cloud.google.com/storage/docs/transcoding. type Reader struct { + // remain must be the first field in the struct to guarantee 64-bit + // alignment on 32-bit architectures for atomic operations. + remain int64 + Attrs ReaderObjectAttrs objectMetadata *map[string]string - seen, remain, size int64 - checkCRC bool // Did we check the CRC? This is now only used by tests. + seen, size int64 + checkCRC bool // Did we check the CRC? This is now only used by tests. reader io.ReadCloser ctx context.Context mu sync.Mutex handle *ReadHandle unfinalized bool + + bucket string + object string } // Close closes the Reader. It must be called when done reading. func (r *Reader) Close() error { + if !r.unfinalized && !r.Attrs.Decompressed { + if rem := atomic.LoadInt64(&r.remain); rem < 0 { + log.Printf("storage: received %d more bytes than requested from GCS for bucket %q, object %q", -rem, r.bucket, r.object) + } + } err := r.reader.Close() endSpan(r.ctx, err) return err @@ -361,8 +375,8 @@ func (r *Reader) Close() error { func (r *Reader) Read(p []byte) (int, error) { n, err := r.reader.Read(p) - if r.remain != -1 { - r.remain -= int64(n) + if !r.unfinalized && !r.Attrs.Decompressed { + atomic.AddInt64(&r.remain, -int64(n)) } return n, err } @@ -373,8 +387,8 @@ func (r *Reader) WriteTo(w io.Writer) (int64, error) { // This implicitly calls r.reader.WriteTo for gRPC only. JSON and XML don't have an // implementation of WriteTo. n, err := io.Copy(w, r.reader) - if r.remain != -1 { - r.remain -= int64(n) + if !r.unfinalized && !r.Attrs.Decompressed { + atomic.AddInt64(&r.remain, -int64(n)) } return n, err } @@ -392,10 +406,14 @@ func (r *Reader) Size() int64 { // Remain returns the number of bytes left to read, or -1 if unknown. // Unfinalized objects will return -1. func (r *Reader) Remain() int64 { - if r.unfinalized { + if r.unfinalized || r.Attrs.Decompressed { return -1 } - return r.remain + rem := atomic.LoadInt64(&r.remain) + if rem < 0 { + return 0 + } + return rem } // ContentType returns the content type of the object. diff --git a/storage/reader_test.go b/storage/reader_test.go index 923a647a1c79..2aff1b81858c 100644 --- a/storage/reader_test.go +++ b/storage/reader_test.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "io" + "log" "net/http" "net/http/httptest" "net/url" @@ -597,3 +598,115 @@ func multiReaderTest(ctx context.Context, t *testing.T, test func(*testing.T, *C }) } } + +type mockReadCloserExtraBytes struct { + content []byte + pos int +} + +func (m *mockReadCloserExtraBytes) Read(p []byte) (n int, err error) { + if m.pos >= len(m.content) { + return 0, io.EOF + } + n = copy(p, m.content[m.pos:]) + m.pos += n + return n, nil +} + +func (m *mockReadCloserExtraBytes) Close() error { + return nil +} + +func TestReaderExtraBytesLogging(t *testing.T) { + var logOutput bytes.Buffer + old := log.Writer() + log.SetOutput(&logOutput) + defer log.SetOutput(old) + + m := &mockReadCloserExtraBytes{content: []byte("hello world!")} // 12 bytes + + r := &Reader{ + reader: m, + remain: 5, // We pretend we only requested 5 bytes. + bucket: "my-bucket", + object: "my-object", + } + + buf := make([]byte, 100) + n, err := r.Read(buf) + if err != nil && err != io.EOF { + t.Fatalf("unexpected error: %v", err) + } + + if n != 12 { + t.Fatalf("expected 12 bytes read, got %d", n) + } + + // Logging should not have occurred yet during reads. + if logOutput.Len() > 0 { + t.Errorf("expected no logging during reads, but got: %q", logOutput.String()) + } + + // Remain() should be 0 because we have fully read our requested range (and more). + if rem := r.Remain(); rem != 0 { + t.Errorf("expected Remain() to be 0, got %d", rem) + } + + // Close should trigger the log. + if err := r.Close(); err != nil { + t.Fatalf("unexpected error on Close(): %v", err) + } + + logStr := logOutput.String() + expectedLog := fmt.Sprintf("storage: received 7 more bytes than requested from GCS for bucket %q, object %q", r.bucket, r.object) + if !strings.Contains(logStr, expectedLog) { + t.Errorf("expected log output to contain %q, but got %q", expectedLog, logStr) + } +} + +func TestReaderWriteToExtraBytesLogging(t *testing.T) { + var logOutput bytes.Buffer + old := log.Writer() + log.SetOutput(&logOutput) + defer log.SetOutput(old) + + m := &mockReadCloserExtraBytes{content: []byte("hello world!")} // 12 bytes + + r := &Reader{ + reader: m, + remain: 5, // We pretend we only requested 5 bytes. + bucket: "my-bucket", + object: "my-object", + } + + var dst bytes.Buffer + n, err := r.WriteTo(&dst) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if n != 12 { + t.Fatalf("expected 12 bytes copied, got %d", n) + } + + // Logging should not have occurred yet during writes. + if logOutput.Len() > 0 { + t.Errorf("expected no logging during WriteTo, but got: %q", logOutput.String()) + } + + // Remain() should be 0 because we have fully read our requested range (and more). + if rem := r.Remain(); rem != 0 { + t.Errorf("expected Remain() to be 0, got %d", rem) + } + + // Close should trigger the log. + if err := r.Close(); err != nil { + t.Fatalf("unexpected error on Close(): %v", err) + } + + logStr := logOutput.String() + expectedLog := fmt.Sprintf("storage: received 7 more bytes than requested from GCS for bucket %q, object %q", r.bucket, r.object) + if !strings.Contains(logStr, expectedLog) { + t.Errorf("expected log output to contain %q, but got %q", expectedLog, logStr) + } +}