Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions internal/service/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,12 @@ func (w *Worker) Process(
return newClientError(errors.New("invalid token"))
}

// Fetch the file in a goroutine to allow the annotations to be processed while the payload is being fetch.
chanPayload := make(chan []byte)
chanError := make(chan error)
// Fetch the file in a goroutine to allow the annotations to be processed while the payload is being fetched.
// The channels are buffered so the goroutine can always complete its single send and exit, even when Process
// returns early (e.g. on a token or annotation error) before reaching the select that drains them. With
// unbuffered channels the goroutine would block forever on the send, leaking both the goroutine and the
// payload it holds.
chanPayload := make(chan []byte, 1)
chanError := make(chan error, 1)
go func() {
payload, err := w.fetchFile(ctx, path)
if err != nil {
Expand Down
62 changes: 62 additions & 0 deletions internal/service/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"net/http"
"os"
"runtime"
"testing"
"time"

Expand Down Expand Up @@ -259,6 +260,67 @@ func TestWorkerProcess(t *testing.T) {
}
}

// TestWorkerProcessNoGoroutineLeak is a regression test for a goroutine + heap leak in Process. The file is fetched in
// a goroutine that sends the payload over a channel. When Process returns early on the png path (here, because
// fetchAnnotations fails) before draining that channel, the goroutine must still be able to complete its send and
// exit. With unbuffered channels it would block forever, leaking the goroutine and the payload it holds.
//
// The test calls Process repeatedly with a failing annotation storage and then polls runtime.NumGoroutine until it
// drops back to (roughly) the count observed before the calls.
func TestWorkerProcessNoGoroutineLeak(t *testing.T) {
	const (
		// iterations is the number of failing Process calls; a leak raises the goroutine count by about this much.
		iterations = 50
		// slack tolerates a few goroutines that come and go independently of Process while the test runs.
		slack = 5
		// settleTimeout bounds how long the fetch goroutines are given to exit.
		settleTimeout = 5 * time.Second
		// pollInterval is the delay between two goroutine-count samples.
		pollInterval = 20 * time.Millisecond
	)

	urlSecret := "secret"
	validToken := urlsign.GenerateToken(urlSecret, 8*time.Hour, time.Now().Add(time.Hour), "documents")

	payload, err := os.ReadFile("testdata/sample.pdf")
	require.NoError(t, err)

	// A fresh body is needed per call because it is consumed while reading, and fetches run concurrently with the
	// caller; a plain fake avoids sharing a single drained buffer across goroutines.
	s3Client := fakeS3{payload: payload}

	var annotationStorage mockWorkerAnnotationStorage
	annotationStorage.On("FetchAnnotation", mock.Anything, mock.Anything).
		Return([]any(nil), errors.New("annotation storage is down"))

	w := Worker{
		HTTPClient:          http.DefaultClient,
		URLSigningSecret:    urlSecret,
		TraceExtractor:      traceExtractor,
		StorageBucketRegion: map[string]string{"bucket-1": "eu-central-1"},
		getS3Client:         func(string) (workerS3API, error) { return s3Client, nil },
		AnnotationStorage:   &annotationStorage,
	}
	require.NoError(t, w.Init())

	// Let any startup goroutines settle, then take a baseline.
	time.Sleep(50 * time.Millisecond)
	baseline := runtime.NumGoroutine()

	url := fmt.Sprintf("documents?token=%s", validToken)
	for range iterations {
		processErr := w.Process(
			context.Background(), url, "bucket-1/file.pdf", 1, 0, 0, 72, bytes.NewBuffer([]byte{}), "png",
		)
		require.Error(t, processErr)
	}

	// With the fix the fetch goroutines complete their send into the buffered channel and exit; poll until the count
	// returns near baseline. With the bug present they stay blocked and the count remains elevated by ~iterations.
	//
	// The polling is done in-line rather than through require.Eventually for two reasons: message arguments passed
	// to Eventually are evaluated once, before polling starts, so a failure would report a stale goroutine count;
	// and sampling here keeps the measurement itself from spawning extra goroutines.
	deadline := time.Now().Add(settleTimeout)
	current := runtime.NumGoroutine()
	for current > baseline+slack {
		if time.Now().After(deadline) {
			t.Fatalf("goroutines did not return to baseline (leak): baseline=%d current=%d", baseline, current)
		}
		time.Sleep(pollInterval)
		current = runtime.NumGoroutine()
	}
}

// fakeS3 returns a fresh reader over payload on every call so concurrent fetches don't share a drained buffer.
// GetObject only reads payload and never modifies it.
type fakeS3 struct {
// payload is the byte content served as the object body by GetObject.
payload []byte
}

// GetObject ignores the context, the request input and any option functions, and always succeeds. Each call wraps
// f.payload in a new reader, so the returned Body can be read and closed independently of the bodies returned by
// other calls.
func (f fakeS3) GetObject(
	_ context.Context, _ *s3.GetObjectInput, _ ...func(*s3.Options),
) (*s3.GetObjectOutput, error) {
	body := io.NopCloser(bytes.NewReader(f.payload))
	output := &s3.GetObjectOutput{Body: body}

	return output, nil
}

// mockS3 is a test double built on an embedded mock.Mock.
// NOTE(review): presumably this stands in for the S3 client in tests that need call expectations; its methods are
// not visible in this part of the file — confirm against the full source.
type mockS3 struct {
// Embedding mock.Mock promotes its methods onto mockS3.
mock.Mock
}
Expand Down
Loading