From 27ce6cdd5592546407ba571c89d6bc7bf26528fa Mon Sep 17 00:00:00 2001 From: cpriti-os Date: Mon, 18 May 2026 15:58:42 +0000 Subject: [PATCH 1/4] chore(storage): log additional bytes on read close --- storage/grpc_reader_multi_range.go | 7 ++ storage/reader.go | 7 ++ storage/reader_test.go | 108 +++++++++++++++++++++++++++++ 3 files changed, 122 insertions(+) diff --git a/storage/grpc_reader_multi_range.go b/storage/grpc_reader_multi_range.go index 11f96afd1f1c..bb524ce86d3d 100644 --- a/storage/grpc_reader_multi_range.go +++ b/storage/grpc_reader_multi_range.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "io" + "log" "sync" "cloud.google.com/go/storage/internal/apiv2/storagepb" @@ -979,6 +980,9 @@ func (m *multiRangeDownloaderManager) processDataRanges(result mrdSessionResult, req.completed = true delete(mrdStream.pendingRanges, req.readID) mrdStream.updateCapacity(m, -1, 0) + if req.length >= 0 && req.bytesWritten > req.length { + log.Printf("storage: received %d more bytes than requested from GCS", req.bytesWritten-req.length) + } m.runCallback(req.origOffset, req.bytesWritten, nil, req.callback) } } @@ -1051,6 +1055,9 @@ func (m *multiRangeDownloaderManager) failRange(mrdStream *mrdStream, req *range mrdStream.updateCapacity(m, -1, -(req.length - req.bytesWritten)) } } + if req.length >= 0 && req.bytesWritten > req.length { + log.Printf("storage: received %d more bytes than requested from GCS", req.bytesWritten-req.length) + } m.runCallback(req.origOffset, req.bytesWritten, err, req.callback) } diff --git a/storage/reader.go b/storage/reader.go index 1ba21a21fa7e..88c1ebe85965 100644 --- a/storage/reader.go +++ b/storage/reader.go @@ -19,6 +19,7 @@ import ( "fmt" "hash/crc32" "io" + "log" "net/http" "strings" "sync" @@ -354,6 +355,9 @@ type Reader struct { // Close closes the Reader. It must be called when done reading. func (r *Reader) Close() error { + if r.remain < 0 { + log.Printf("storage: received %d more bytes than requested from GCS", -r.remain) + } err := r.reader.Close() endSpan(r.ctx, err) return err @@ -395,6 +399,9 @@ func (r *Reader) Remain() int64 { if r.unfinalized { return -1 } + if r.remain < 0 { + return 0 + } return r.remain } diff --git a/storage/reader_test.go b/storage/reader_test.go index 923a647a1c79..3ef890f2d2de 100644 --- a/storage/reader_test.go +++ b/storage/reader_test.go @@ -21,9 +21,11 @@ import ( "errors" "fmt" "io" + "log" "net/http" "net/http/httptest" "net/url" + "os" "strconv" "strings" "testing" @@ -597,3 +599,109 @@ func multiReaderTest(ctx context.Context, t *testing.T, test func(*testing.T, *C }) } } + +type mockReadCloserExtraBytes struct { + content []byte + pos int +} + +func (m *mockReadCloserExtraBytes) Read(p []byte) (n int, err error) { + if m.pos >= len(m.content) { + return 0, io.EOF + } + n = copy(p, m.content[m.pos:]) + m.pos += n + return n, nil +} + +func (m *mockReadCloserExtraBytes) Close() error { + return nil +} + +func TestReaderExtraBytesLogging(t *testing.T) { + var logOutput bytes.Buffer + log.SetOutput(&logOutput) + defer log.SetOutput(os.Stderr) + + m := &mockReadCloserExtraBytes{content: []byte("hello world!")} // 12 bytes + + r := &Reader{ + reader: m, + remain: 5, // We pretend we only requested 5 bytes + } + + buf := make([]byte, 100) + n, err := r.Read(buf) + if err != nil && err != io.EOF { + t.Fatalf("unexpected error: %v", err) + } + + if n != 12 { + t.Fatalf("expected 12 bytes read, got %d", n) + } + + // Logging should not have occurred yet during reads + if logOutput.Len() > 0 { + t.Errorf("expected no logging during reads, but got: %q", logOutput.String()) + } + + // Remain() should be 0 because we have fully read our requested range (and more) + if rem := r.Remain(); rem != 0 { + t.Errorf("expected Remain() to be 0, got %d", rem) + } + + // Close should trigger the log + if err := r.Close(); err != nil { + t.Fatalf("unexpected error on Close(): %v", err) + } + + logStr := logOutput.String() + expectedLog := "storage: received 7 more bytes than requested from GCS" + if !strings.Contains(logStr, expectedLog) { + t.Errorf("expected log output to contain %q, but got %q", expectedLog, logStr) + } +} + +func TestReaderWriteToExtraBytesLogging(t *testing.T) { + var logOutput bytes.Buffer + log.SetOutput(&logOutput) + defer log.SetOutput(os.Stderr) + + m := &mockReadCloserExtraBytes{content: []byte("hello world!")} // 12 bytes + + r := &Reader{ + reader: m, + remain: 5, // We pretend we only requested 5 bytes + } + + var dst bytes.Buffer + n, err := r.WriteTo(&dst) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if n != 12 { + t.Fatalf("expected 12 bytes copied, got %d", n) + } + + // Logging should not have occurred yet during writes + if logOutput.Len() > 0 { + t.Errorf("expected no logging during WriteTo, but got: %q", logOutput.String()) + } + + // Remain() should be 0 because we have fully read our requested range (and more) + if rem := r.Remain(); rem != 0 { + t.Errorf("expected Remain() to be 0, got %d", rem) + } + + // Close should trigger the log + if err := r.Close(); err != nil { + t.Fatalf("unexpected error on Close(): %v", err) + } + + logStr := logOutput.String() + expectedLog := "storage: received 7 more bytes than requested from GCS" + if !strings.Contains(logStr, expectedLog) { + t.Errorf("expected log output to contain %q, but got %q", expectedLog, logStr) + } +} From 3d9c85c1a5b473291dab7afbdb41303f2417a26c Mon Sep 17 00:00:00 2001 From: cpriti-os Date: Mon, 18 May 2026 16:10:06 +0000 Subject: [PATCH 2/4] chore(storage): include bucket and object context in extra bytes warning logs --- storage/grpc_client.go | 2 ++ storage/grpc_reader.go | 2 ++ storage/grpc_reader_multi_range.go | 4 ++-- storage/http_client.go | 2 ++ storage/reader.go | 5 ++++- storage/reader_test.go | 8 ++++++-- 6 files changed, 18 insertions(+), 5 deletions(-) diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 19c8d70184f5..adb579b56ef2 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -1421,6 +1421,8 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange handle: &handle, remain: remain, unfinalized: !finalized, + bucket: params.bucket, + object: params.object, } // For a zero-length request, explicitly close the stream and set remaining diff --git a/storage/grpc_reader.go b/storage/grpc_reader.go index b39199144c8f..1e431412c37d 100644 --- a/storage/grpc_reader.go +++ b/storage/grpc_reader.go @@ -222,6 +222,8 @@ func (c *grpcStorageClient) NewRangeReaderReadObject(ctx context.Context, params checkCRC: checkCRC, }, checkCRC: checkCRC, + bucket: params.bucket, + object: params.object, } cr := msg.GetContentRange() diff --git a/storage/grpc_reader_multi_range.go b/storage/grpc_reader_multi_range.go index bb524ce86d3d..b865fecbdd8e 100644 --- a/storage/grpc_reader_multi_range.go +++ b/storage/grpc_reader_multi_range.go @@ -981,7 +981,7 @@ func (m *multiRangeDownloaderManager) processDataRanges(result mrdSessionResult, delete(mrdStream.pendingRanges, req.readID) mrdStream.updateCapacity(m, -1, 0) if req.length >= 0 && req.bytesWritten > req.length { - log.Printf("storage: received %d more bytes than requested from GCS", req.bytesWritten-req.length) + log.Printf("storage: received %d more bytes than requested from GCS for bucket %q, object %q", req.bytesWritten-req.length, m.params.bucket, m.params.object) } m.runCallback(req.origOffset, req.bytesWritten, nil, req.callback) } @@ -1056,7 +1056,7 @@ func (m *multiRangeDownloaderManager) failRange(mrdStream *mrdStream, req *range } } if req.length >= 0 && req.bytesWritten > req.length { - log.Printf("storage: received %d more bytes than requested from GCS", req.bytesWritten-req.length) + log.Printf("storage: received %d more bytes than requested from GCS for bucket %q, object %q", req.bytesWritten-req.length, m.params.bucket, m.params.object) } m.runCallback(req.origOffset, req.bytesWritten, err, req.callback) } diff --git a/storage/http_client.go b/storage/http_client.go index d117d90e551e..c1254fc8f402 100644 --- a/storage/http_client.go +++ b/storage/http_client.go @@ -1652,6 +1652,8 @@ func parseReadResponse(res *http.Response, params *newRangeReaderParams, reopen wantCRC: crc, checkCRC: checkCRC, }, + bucket: params.bucket, + object: params.object, }, nil } diff --git a/storage/reader.go b/storage/reader.go index 88c1ebe85965..d653bc6f785e 100644 --- a/storage/reader.go +++ b/storage/reader.go @@ -351,12 +351,15 @@ type Reader struct { mu sync.Mutex handle *ReadHandle unfinalized bool + + bucket string + object string } // Close closes the Reader. It must be called when done reading. func (r *Reader) Close() error { if r.remain < 0 { - log.Printf("storage: received %d more bytes than requested from GCS", -r.remain) + log.Printf("storage: received %d more bytes than requested from GCS for bucket %q, object %q", -r.remain, r.bucket, r.object) } err := r.reader.Close() endSpan(r.ctx, err) diff --git a/storage/reader_test.go b/storage/reader_test.go index 3ef890f2d2de..80ea48cef368 100644 --- a/storage/reader_test.go +++ b/storage/reader_test.go @@ -628,6 +628,8 @@ func TestReaderExtraBytesLogging(t *testing.T) { r := &Reader{ reader: m, remain: 5, // We pretend we only requested 5 bytes + bucket: "my-bucket", + object: "my-object", } buf := make([]byte, 100) @@ -656,7 +658,7 @@ func TestReaderExtraBytesLogging(t *testing.T) { } logStr := logOutput.String() - expectedLog := "storage: received 7 more bytes than requested from GCS" + expectedLog := `storage: received 7 more bytes than requested from GCS for bucket "my-bucket", object "my-object"` if !strings.Contains(logStr, expectedLog) { t.Errorf("expected log output to contain %q, but got %q", expectedLog, logStr) } @@ -672,6 +674,8 @@ func TestReaderWriteToExtraBytesLogging(t *testing.T) { r := &Reader{ reader: m, remain: 5, // We pretend we only requested 5 bytes + bucket: "my-bucket", + object: "my-object", } var dst bytes.Buffer @@ -700,7 +704,7 @@ func TestReaderWriteToExtraBytesLogging(t *testing.T) { } logStr := logOutput.String() - expectedLog := "storage: received 7 more bytes than requested from GCS" + expectedLog := `storage: received 7 more bytes than requested from GCS for bucket "my-bucket", object "my-object"` if !strings.Contains(logStr, expectedLog) { t.Errorf("expected log output to contain %q, but got %q", expectedLog, logStr) } From 88db4261bd3bf1f99f8c8437c46d826911f71998 Mon Sep 17 00:00:00 2001 From: cpriti-os Date: Mon, 18 May 2026 16:17:20 +0000 Subject: [PATCH 3/4] test(storage): capture and restore log writer safely using log.Writer --- storage/reader_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/storage/reader_test.go b/storage/reader_test.go index 80ea48cef368..34ea10e6782a 100644 --- a/storage/reader_test.go +++ b/storage/reader_test.go @@ -25,7 +25,6 @@ import ( "net/http" "net/http/httptest" "net/url" - "os" "strconv" "strings" "testing" @@ -620,8 +619,9 @@ func (m *mockReadCloserExtraBytes) Close() error { func TestReaderExtraBytesLogging(t *testing.T) { var logOutput bytes.Buffer + old := log.Writer() log.SetOutput(&logOutput) - defer log.SetOutput(os.Stderr) + defer log.SetOutput(old) m := &mockReadCloserExtraBytes{content: []byte("hello world!")} // 12 bytes @@ -666,8 +666,9 @@ func TestReaderExtraBytesLogging(t *testing.T) { func TestReaderWriteToExtraBytesLogging(t *testing.T) { var logOutput bytes.Buffer + old := log.Writer() log.SetOutput(&logOutput) - defer log.SetOutput(os.Stderr) + defer log.SetOutput(old) m := &mockReadCloserExtraBytes{content: []byte("hello world!")} // 12 bytes From 8c90a5e7a20e4a47ed88ed047d61df0aa2b281f0 Mon Sep 17 00:00:00 2001 From: cpriti-os Date: Mon, 18 May 2026 16:32:49 +0000 Subject: [PATCH 4/4] chore(storage): synchronize access to remain field in Reader methods to avoid data races --- storage/reader.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/storage/reader.go b/storage/reader.go index d653bc6f785e..b4412876bef7 100644 --- a/storage/reader.go +++ b/storage/reader.go @@ -358,8 +358,11 @@ type Reader struct { // Close closes the Reader. It must be called when done reading. func (r *Reader) Close() error { - if r.remain < 0 { - log.Printf("storage: received %d more bytes than requested from GCS for bucket %q, object %q", -r.remain, r.bucket, r.object) + r.mu.Lock() + rem := r.remain + r.mu.Unlock() + if rem < 0 { + log.Printf("storage: received %d more bytes than requested from GCS for bucket %q, object %q", -rem, r.bucket, r.object) } err := r.reader.Close() endSpan(r.ctx, err) @@ -368,9 +371,11 @@ func (r *Reader) Close() error { func (r *Reader) Read(p []byte) (int, error) { n, err := r.reader.Read(p) + r.mu.Lock() if r.remain != -1 { r.remain -= int64(n) } + r.mu.Unlock() return n, err } @@ -380,9 +385,11 @@ func (r *Reader) WriteTo(w io.Writer) (int64, error) { // This implicitly calls r.reader.WriteTo for gRPC only. JSON and XML don't have an // implementation of WriteTo. n, err := io.Copy(w, r.reader) + r.mu.Lock() if r.remain != -1 { r.remain -= int64(n) } + r.mu.Unlock() return n, err } @@ -402,6 +409,8 @@ func (r *Reader) Remain() int64 { if r.unfinalized { return -1 } + r.mu.Lock() + defer r.mu.Unlock() if r.remain < 0 { return 0 }