From 98764a856005744d8807c07925395fb7fc451ecc Mon Sep 17 00:00:00 2001 From: Christian Panadero Date: Mon, 15 Jun 2026 17:17:23 +0200 Subject: [PATCH] Fix goroutine and heap leak in Worker.Process Process fetches the file in a goroutine that sends the payload over a channel. The channels were unbuffered, so the goroutine blocked on the send until the caller received. On the png path, Process can return early (invalid token, or a fetchAnnotations failure) before reaching the select that drains the channel. When that happened the fetch goroutine blocked forever on the send and never observed ctx cancellation, leaking the goroutine and the full PDF payload it held in memory. In ec1 prod this showed up as live goroutines and heap-live-size ramping linearly between deploys and resetting on each restart. fetchAnnotations hits Redis on every png request, so intermittent Redis errors (or a malformed stored annotation) steadily accumulated leaked goroutines. Buffer both channels with capacity 1 so the goroutine can always complete its single send and exit even when the caller abandons it. Add a regression test that drives the fetchAnnotations-error path and asserts goroutines return to baseline; it leaks ~50 goroutines without the fix. Co-Authored-By: Claude Opus 4.8 (1M context) --- internal/service/worker.go | 9 +++-- internal/service/worker_test.go | 62 +++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/internal/service/worker.go b/internal/service/worker.go index 3126e15..355d4d1 100644 --- a/internal/service/worker.go +++ b/internal/service/worker.go @@ -105,9 +105,12 @@ func (w *Worker) Process( return newClientError(errors.New("invalid token")) } - // Fetch the file in a goroutine to allow the annotations to be processed while the payload is being fetch. - chanPayload := make(chan []byte) - chanError := make(chan error) + // Fetch the file in a goroutine to allow the annotations to be processed while the payload is being fetch. The + // channels are buffered so the goroutine can always complete its single send and exit, even when Process returns + // early (e.g. on a token or annotation error) before reaching the select that drains them. With unbuffered + // channels the goroutine would block forever on the send, leaking both the goroutine and the payload it holds. + chanPayload := make(chan []byte, 1) + chanError := make(chan error, 1) go func() { payload, err := w.fetchFile(ctx, path) if err != nil { diff --git a/internal/service/worker_test.go b/internal/service/worker_test.go index 171e790..7cc3156 100644 --- a/internal/service/worker_test.go +++ b/internal/service/worker_test.go @@ -8,6 +8,7 @@ import ( "io" "net/http" "os" + "runtime" "testing" "time" @@ -259,6 +260,67 @@ func TestWorkerProcess(t *testing.T) { } } +// TestWorkerProcessNoGoroutineLeak is a regression test for a goroutine + heap leak in Process. The file is fetched in +// a goroutine that sends the payload over a channel. When Process returns early on the png path (here, because +// fetchAnnotations fails) before draining that channel, the goroutine must still be able to complete its send and +// exit. With unbuffered channels it would block forever, leaking the goroutine and the payload it holds. +func TestWorkerProcessNoGoroutineLeak(t *testing.T) { + urlSecret := "secret" + validToken := urlsign.GenerateToken(urlSecret, 8*time.Hour, time.Now().Add(time.Hour), "documents") + + payload, err := os.ReadFile("testdata/sample.pdf") + require.NoError(t, err) + + // A fresh body is needed per call because it is consumed while reading, and fetches run concurrently with the + // caller; a plain fake avoids sharing a single drained buffer across goroutines. + s3Client := fakeS3{payload: payload} + + var annotationStorage mockWorkerAnnotationStorage + annotationStorage.On("FetchAnnotation", mock.Anything, mock.Anything). + Return([]any(nil), errors.New("annotation storage is down")) + + w := Worker{ + HTTPClient: http.DefaultClient, + URLSigningSecret: urlSecret, + TraceExtractor: traceExtractor, + StorageBucketRegion: map[string]string{"bucket-1": "eu-central-1"}, + getS3Client: func(string) (workerS3API, error) { return s3Client, nil }, + AnnotationStorage: &annotationStorage, + } + require.NoError(t, w.Init()) + + // Let any startup goroutines settle, then take a baseline. + time.Sleep(50 * time.Millisecond) + baseline := runtime.NumGoroutine() + + const iterations = 50 + url := fmt.Sprintf("documents?token=%s", validToken) + for range iterations { + err := w.Process( + context.Background(), url, "bucket-1/file.pdf", 1, 0, 0, 72, bytes.NewBuffer([]byte{}), "png", + ) + require.Error(t, err) + } + + // With the fix the fetch goroutines complete their send into the buffered channel and exit; poll until the count + // returns near baseline. With the bug present they stay blocked and the count remains elevated by ~iterations. + require.Eventually(t, func() bool { + return runtime.NumGoroutine() <= baseline+5 + }, 5*time.Second, 20*time.Millisecond, + "goroutines did not return to baseline (leak): baseline=%d current=%d", baseline, runtime.NumGoroutine()) +} + +// fakeS3 returns a fresh reader over payload on every call so concurrent fetches don't share a drained buffer. +type fakeS3 struct { + payload []byte +} + +func (f fakeS3) GetObject( + context.Context, *s3.GetObjectInput, ...func(*s3.Options), +) (*s3.GetObjectOutput, error) { + return &s3.GetObjectOutput{Body: io.NopCloser(bytes.NewReader(f.payload))}, nil +} + type mockS3 struct { mock.Mock }