diff --git a/internal/service/worker.go b/internal/service/worker.go index 3126e15..355d4d1 100644 --- a/internal/service/worker.go +++ b/internal/service/worker.go @@ -105,9 +105,12 @@ func (w *Worker) Process( return newClientError(errors.New("invalid token")) } - // Fetch the file in a goroutine to allow the annotations to be processed while the payload is being fetch. - chanPayload := make(chan []byte) - chanError := make(chan error) + // Fetch the file in a goroutine to allow the annotations to be processed while the payload is being fetched. The + // channels are buffered so the goroutine can always complete its single send and exit, even when Process returns + // early (e.g. on a token or annotation error) before reaching the select that drains them. With unbuffered + // channels the goroutine would block forever on the send, leaking both the goroutine and the payload it holds. + chanPayload := make(chan []byte, 1) + chanError := make(chan error, 1) go func() { payload, err := w.fetchFile(ctx, path) if err != nil { diff --git a/internal/service/worker_test.go b/internal/service/worker_test.go index 171e790..7cc3156 100644 --- a/internal/service/worker_test.go +++ b/internal/service/worker_test.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "os" + "runtime" "testing" "time" @@ -259,6 +260,67 @@ func TestWorkerProcess(t *testing.T) { } } +// TestWorkerProcessNoGoroutineLeak is a regression test for a goroutine + heap leak in Process. The file is fetched in +// a goroutine that sends the payload over a channel. When Process returns early on the png path (here, because +// fetchAnnotations fails) before draining that channel, the goroutine must still be able to complete its send and +// exit. With unbuffered channels it would block forever, leaking the goroutine and the payload it holds. 
+func TestWorkerProcessNoGoroutineLeak(t *testing.T) { + urlSecret := "secret" + validToken := urlsign.GenerateToken(urlSecret, 8*time.Hour, time.Now().Add(time.Hour), "documents") + + payload, err := os.ReadFile("testdata/sample.pdf") + require.NoError(t, err) + + // A fresh body is needed per call because it is consumed while reading, and fetches run concurrently with the + // caller; a plain fake avoids sharing a single drained buffer across goroutines. + s3Client := fakeS3{payload: payload} + + var annotationStorage mockWorkerAnnotationStorage + annotationStorage.On("FetchAnnotation", mock.Anything, mock.Anything). + Return([]any(nil), errors.New("annotation storage is down")) + + w := Worker{ + HTTPClient: http.DefaultClient, + URLSigningSecret: urlSecret, + TraceExtractor: traceExtractor, + StorageBucketRegion: map[string]string{"bucket-1": "eu-central-1"}, + getS3Client: func(string) (workerS3API, error) { return s3Client, nil }, + AnnotationStorage: &annotationStorage, + } + require.NoError(t, w.Init()) + + // Let any startup goroutines settle, then take a baseline. + time.Sleep(50 * time.Millisecond) + baseline := runtime.NumGoroutine() + + const iterations = 50 + url := fmt.Sprintf("documents?token=%s", validToken) + for range iterations { + err := w.Process( + context.Background(), url, "bucket-1/file.pdf", 1, 0, 0, 72, bytes.NewBuffer([]byte{}), "png", + ) + require.Error(t, err) + } + + // With the fix the fetch goroutines complete their send into the buffered channel and exit; poll until the count + // returns near baseline. With the bug present they stay blocked and the count remains elevated by ~iterations. 
+ require.Eventually(t, func() bool { + return runtime.NumGoroutine() <= baseline+5 + }, 5*time.Second, 20*time.Millisecond, + "goroutines did not return to baseline (leak): baseline=%d current=%d", baseline, runtime.NumGoroutine()) +} + +// fakeS3 returns a fresh reader over payload on every call so concurrent fetches don't share a drained buffer. +type fakeS3 struct { + payload []byte +} + +func (f fakeS3) GetObject( + context.Context, *s3.GetObjectInput, ...func(*s3.Options), +) (*s3.GetObjectOutput, error) { + return &s3.GetObjectOutput{Body: io.NopCloser(bytes.NewReader(f.payload))}, nil +} + type mockS3 struct { mock.Mock }