Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,6 +1421,8 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
handle: &handle,
remain: remain,
unfinalized: !finalized,
bucket: params.bucket,
object: params.object,
}

// For a zero-length request, explicitly close the stream and set remaining
Expand Down
2 changes: 2 additions & 0 deletions storage/grpc_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ func (c *grpcStorageClient) NewRangeReaderReadObject(ctx context.Context, params
checkCRC: checkCRC,
},
checkCRC: checkCRC,
bucket: params.bucket,
object: params.object,
}

cr := msg.GetContentRange()
Expand Down
7 changes: 7 additions & 0 deletions storage/grpc_reader_multi_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"log"
"sync"

"cloud.google.com/go/storage/internal/apiv2/storagepb"
Expand Down Expand Up @@ -979,6 +980,9 @@ func (m *multiRangeDownloaderManager) processDataRanges(result mrdSessionResult,
req.completed = true
delete(mrdStream.pendingRanges, req.readID)
mrdStream.updateCapacity(m, -1, 0)
if req.length >= 0 && req.bytesWritten > req.length {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you check scenarios with negative length and negative offset? I know we convert them when the metadata is available but can be useful to doubt check.

log.Printf("storage: received %d more bytes than requested from GCS for bucket %q, object %q", req.bytesWritten-req.length, m.params.bucket, m.params.object)
}
m.runCallback(req.origOffset, req.bytesWritten, nil, req.callback)
}
}
Expand Down Expand Up @@ -1051,6 +1055,9 @@ func (m *multiRangeDownloaderManager) failRange(mrdStream *mrdStream, req *range
mrdStream.updateCapacity(m, -1, -(req.length - req.bytesWritten))
}
}
if req.length >= 0 && req.bytesWritten > req.length {
log.Printf("storage: received %d more bytes than requested from GCS for bucket %q, object %q", req.bytesWritten-req.length, m.params.bucket, m.params.object)
}
m.runCallback(req.origOffset, req.bytesWritten, err, req.callback)
}

Expand Down
2 changes: 2 additions & 0 deletions storage/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1652,6 +1652,8 @@ func parseReadResponse(res *http.Response, params *newRangeReaderParams, reopen
wantCRC: crc,
checkCRC: checkCRC,
},
bucket: params.bucket,
object: params.object,
}, nil
}

Expand Down
19 changes: 19 additions & 0 deletions storage/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"hash/crc32"
"io"
"log"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -350,20 +351,31 @@ type Reader struct {
mu sync.Mutex
handle *ReadHandle
unfinalized bool

bucket string
object string
}

// Close closes the Reader. It must be called when done reading.
func (r *Reader) Close() error {
r.mu.Lock()
rem := r.remain
r.mu.Unlock()
if rem < 0 {
log.Printf("storage: received %d more bytes than requested from GCS for bucket %q, object %q", -rem, r.bucket, r.object)
}
err := r.reader.Close()
endSpan(r.ctx, err)
return err
}

func (r *Reader) Read(p []byte) (int, error) {
n, err := r.reader.Read(p)
r.mu.Lock()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this locking will introduce regressions. Should perform some regression testing I think.

if r.remain != -1 {
r.remain -= int64(n)
}
Comment on lines 375 to 377
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the explicit check for -1? I don't see us initializing r.remain with -1 anywhere. Can you check if it is still relevant?

r.mu.Unlock()
return n, err
}

Expand All @@ -373,9 +385,11 @@ func (r *Reader) WriteTo(w io.Writer) (int64, error) {
// This implicitly calls r.reader.WriteTo for gRPC only. JSON and XML don't have an
// implementation of WriteTo.
n, err := io.Copy(w, r.reader)
r.mu.Lock()
if r.remain != -1 {
r.remain -= int64(n)
}
r.mu.Unlock()
return n, err
}

Expand All @@ -395,6 +409,11 @@ func (r *Reader) Remain() int64 {
if r.unfinalized {
return -1
}
r.mu.Lock()
defer r.mu.Unlock()
if r.remain < 0 {
return 0
}
return r.remain
}

Expand Down
113 changes: 113 additions & 0 deletions storage/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
Comment thread
cpriti-os marked this conversation as resolved.
"fmt"
"io"
"log"
"net/http"
"net/http/httptest"
"net/url"
Expand Down Expand Up @@ -597,3 +598,115 @@ func multiReaderTest(ctx context.Context, t *testing.T, test func(*testing.T, *C
})
}
}

type mockReadCloserExtraBytes struct {
content []byte
pos int
}

func (m *mockReadCloserExtraBytes) Read(p []byte) (n int, err error) {
if m.pos >= len(m.content) {
return 0, io.EOF
}
n = copy(p, m.content[m.pos:])
m.pos += n
return n, nil
}

func (m *mockReadCloserExtraBytes) Close() error {
return nil
}

func TestReaderExtraBytesLogging(t *testing.T) {
var logOutput bytes.Buffer
old := log.Writer()
log.SetOutput(&logOutput)
defer log.SetOutput(old)

m := &mockReadCloserExtraBytes{content: []byte("hello world!")} // 12 bytes

r := &Reader{
reader: m,
remain: 5, // We pretend we only requested 5 bytes
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Add periods everywhere.

bucket: "my-bucket",
object: "my-object",
}

buf := make([]byte, 100)
n, err := r.Read(buf)
if err != nil && err != io.EOF {
t.Fatalf("unexpected error: %v", err)
}

if n != 12 {
t.Fatalf("expected 12 bytes read, got %d", n)
}

// Logging should not have occurred yet during reads
if logOutput.Len() > 0 {
t.Errorf("expected no logging during reads, but got: %q", logOutput.String())
}

// Remain() should be 0 because we have fully read our requested range (and more)
if rem := r.Remain(); rem != 0 {
t.Errorf("expected Remain() to be 0, got %d", rem)
}

// Close should trigger the log
if err := r.Close(); err != nil {
t.Fatalf("unexpected error on Close(): %v", err)
}

logStr := logOutput.String()
expectedLog := `storage: received 7 more bytes than requested from GCS for bucket "my-bucket", object "my-object"`
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: replace "my-bucket" with r.bucket?

if !strings.Contains(logStr, expectedLog) {
t.Errorf("expected log output to contain %q, but got %q", expectedLog, logStr)
}
}

func TestReaderWriteToExtraBytesLogging(t *testing.T) {
var logOutput bytes.Buffer
old := log.Writer()
log.SetOutput(&logOutput)
defer log.SetOutput(old)

m := &mockReadCloserExtraBytes{content: []byte("hello world!")} // 12 bytes

r := &Reader{
reader: m,
remain: 5, // We pretend we only requested 5 bytes
bucket: "my-bucket",
object: "my-object",
}

var dst bytes.Buffer
n, err := r.WriteTo(&dst)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if n != 12 {
t.Fatalf("expected 12 bytes copied, got %d", n)
}

// Logging should not have occurred yet during writes
if logOutput.Len() > 0 {
t.Errorf("expected no logging during WriteTo, but got: %q", logOutput.String())
}

// Remain() should be 0 because we have fully read our requested range (and more)
if rem := r.Remain(); rem != 0 {
t.Errorf("expected Remain() to be 0, got %d", rem)
}

// Close should trigger the log
if err := r.Close(); err != nil {
t.Fatalf("unexpected error on Close(): %v", err)
}

logStr := logOutput.String()
expectedLog := `storage: received 7 more bytes than requested from GCS for bucket "my-bucket", object "my-object"`
if !strings.Contains(logStr, expectedLog) {
t.Errorf("expected log output to contain %q, but got %q", expectedLog, logStr)
}
}
Loading