From 65a27ceb803f72200b8d0c4321dac27cdfaf0067 Mon Sep 17 00:00:00 2001 From: SuperCoolPencil Date: Sun, 12 Apr 2026 02:53:55 +0530 Subject: [PATCH 01/10] feat: implement global and per-task download rate limiting using a token bucket algorithm --- internal/config/settings.go | 8 ++ internal/core/local_service.go | 11 +++ internal/engine/concurrent/worker.go | 7 ++ internal/engine/single/downloader.go | 17 ++++- internal/engine/types/config.go | 2 + internal/engine/types/config_convert.go | 2 + internal/engine/types/progress.go | 8 ++ internal/processing/manager.go | 3 + internal/utils/ratelimit.go | 98 +++++++++++++++++++++++++ 9 files changed, 153 insertions(+), 3 deletions(-) create mode 100644 internal/utils/ratelimit.go diff --git a/internal/config/settings.go b/internal/config/settings.go index 1d422c27..ab67bf19 100644 --- a/internal/config/settings.go +++ b/internal/config/settings.go @@ -66,6 +66,8 @@ type NetworkSettings struct { SequentialDownload bool `json:"sequential_download" ui_label:"Sequential Download" ui_desc:"Download pieces in order (Streaming Mode). May be slower."` MinChunkSize int64 `json:"min_chunk_size" ui_label:"Min Chunk Size" ui_desc:"Minimum download chunk size in MB (e.g., 2)."` WorkerBufferSize int `json:"worker_buffer_size" ui_label:"Worker Buffer Size" ui_desc:"I/O buffer size per worker in KB (e.g., 512)."` + GlobalRateLimit int64 `json:"global_rate_limit" ui_label:"Global Rate Limit (KB/s)" ui_desc:"Maximum total combined download speed in KB/s. 0 for unlimited."` + PerTaskRateLimit int64 `json:"per_task_rate_limit" ui_label:"Per-Task Rate Limit (KB/s)" ui_desc:"Maximum speed for each individual download in KB/s. 0 for unlimited."` } // PerformanceSettings contains performance tuning parameters. 
@@ -218,6 +220,8 @@ func DefaultSettings() *Settings { SequentialDownload: false, MinChunkSize: 2 * MB, WorkerBufferSize: 512 * KB, + GlobalRateLimit: 0, + PerTaskRateLimit: 0, }, Performance: PerformanceSettings{ MaxTaskRetries: 3, @@ -301,6 +305,8 @@ type RuntimeConfig struct { SequentialDownload bool MinChunkSize int64 WorkerBufferSize int + GlobalRateLimit int64 + PerTaskRateLimit int64 MaxTaskRetries int SlowWorkerThreshold float64 SlowWorkerGracePeriod time.Duration @@ -319,6 +325,8 @@ func (s *Settings) ToRuntimeConfig() *RuntimeConfig { SequentialDownload: s.Network.SequentialDownload, MinChunkSize: s.Network.MinChunkSize, WorkerBufferSize: s.Network.WorkerBufferSize, + GlobalRateLimit: s.Network.GlobalRateLimit, + PerTaskRateLimit: s.Network.PerTaskRateLimit, MaxTaskRetries: s.Performance.MaxTaskRetries, SlowWorkerThreshold: s.Performance.SlowWorkerThreshold, SlowWorkerGracePeriod: s.Performance.SlowWorkerGracePeriod, diff --git a/internal/core/local_service.go b/internal/core/local_service.go index 3b17bdbe..07e37700 100644 --- a/internal/core/local_service.go +++ b/internal/core/local_service.go @@ -39,6 +39,16 @@ func (s *LocalDownloadService) ReloadSettings() error { s.settingsMu.Lock() s.settings = settings s.settingsMu.Unlock() + + // Update active per-task limiters + if s.Pool != nil { + rate := settings.Network.PerTaskRateLimit * 1024 + for _, cfg := range s.Pool.GetAll() { + if cfg.State != nil && cfg.State.Limiter != nil { + cfg.State.Limiter.SetRate(rate) + } + } + } return nil } @@ -499,6 +509,7 @@ func (s *LocalDownloadService) add(url string, path string, filename string, mir state := types.NewProgressState(id, 0) state.DestPath = filepath.Join(outPath, filename) // Best guess until download starts + state.Limiter = utils.NewTokenBucket(settings.Network.PerTaskRateLimit * 1024) cfg := types.DownloadConfig{ URL: url, diff --git a/internal/engine/concurrent/worker.go b/internal/engine/concurrent/worker.go index 66d91288..6ec435ee 100644 --- 
a/internal/engine/concurrent/worker.go +++ b/internal/engine/concurrent/worker.go @@ -283,6 +283,13 @@ func (d *ConcurrentDownloader) downloadTask(ctx context.Context, rawurl string, for readSoFar < int(readSize) { n, err := resp.Body.Read(buf[readSoFar:readSize]) if n > 0 { + // Apply per-task rate limit + if d.State != nil && d.State.Limiter != nil { + _ = d.State.Limiter.WaitN(ctx, n) + } + // Apply global rate limit + _ = utils.GlobalRateLimiter.WaitN(ctx, n) + readSoFar += n // CONTINUOUS HEALTH KEEPALIVE: // Update LastActivity directly off the TCP socket instead of waiting for the buffer diff --git a/internal/engine/single/downloader.go b/internal/engine/single/downloader.go index 5e7debfa..4a817bbc 100644 --- a/internal/engine/single/downloader.go +++ b/internal/engine/single/downloader.go @@ -194,9 +194,10 @@ func (d *SingleDownloader) Download(ctx context.Context, rawurl, destPath string defer bufPool.Put(bufPtr) if d.State == nil { - written, err = io.CopyBuffer(outFile, resp.Body, buf) + progressReader := newProgressReader(ctx, resp.Body, d.State, types.WorkerBatchSize, types.WorkerBatchInterval) + written, err = io.CopyBuffer(outFile, progressReader, buf) } else { - progressReader := newProgressReader(resp.Body, d.State, types.WorkerBatchSize, types.WorkerBatchInterval) + progressReader := newProgressReader(ctx, resp.Body, d.State, types.WorkerBatchSize, types.WorkerBatchInterval) written, err = io.CopyBuffer(outFile, progressReader, buf) progressReader.Flush() } @@ -237,6 +238,7 @@ func (d *SingleDownloader) Download(ctx context.Context, rawurl, destPath string } type progressReader struct { + ctx context.Context reader io.Reader state *types.ProgressState batchSize int64 @@ -247,11 +249,12 @@ type progressReader struct { readChecks uint8 } -func newProgressReader(reader io.Reader, state *types.ProgressState, batchSize int64, batchInterval time.Duration) *progressReader { +func newProgressReader(ctx context.Context, reader io.Reader, state 
*types.ProgressState, batchSize int64, batchInterval time.Duration) *progressReader { if batchSize <= 0 { batchSize = types.WorkerBatchSize } return &progressReader{ + ctx: ctx, reader: reader, state: state, batchSize: batchSize, @@ -262,6 +265,14 @@ func newProgressReader(reader io.Reader, state *types.ProgressState, batchSize i func (w *progressReader) Read(p []byte) (int, error) { n, err := w.reader.Read(p) + + if n > 0 { + if w.state != nil && w.state.Limiter != nil { + _ = w.state.Limiter.WaitN(w.ctx, n) + } + _ = utils.GlobalRateLimiter.WaitN(w.ctx, n) + } + if n <= 0 || w.state == nil { return n, err } diff --git a/internal/engine/types/config.go b/internal/engine/types/config.go index b1d476bc..e511061c 100644 --- a/internal/engine/types/config.go +++ b/internal/engine/types/config.go @@ -76,6 +76,8 @@ type RuntimeConfig struct { MinChunkSize int64 WorkerBufferSize int + GlobalRateLimit int64 + PerTaskRateLimit int64 MaxTaskRetries int SlowWorkerThreshold float64 SlowWorkerGracePeriod time.Duration diff --git a/internal/engine/types/config_convert.go b/internal/engine/types/config_convert.go index fd33a802..6b856ac5 100644 --- a/internal/engine/types/config_convert.go +++ b/internal/engine/types/config_convert.go @@ -12,6 +12,8 @@ func ConvertRuntimeConfig(rc *config.RuntimeConfig) *RuntimeConfig { SequentialDownload: rc.SequentialDownload, MinChunkSize: rc.MinChunkSize, WorkerBufferSize: rc.WorkerBufferSize, + GlobalRateLimit: rc.GlobalRateLimit, + PerTaskRateLimit: rc.PerTaskRateLimit, MaxTaskRetries: rc.MaxTaskRetries, SlowWorkerThreshold: rc.SlowWorkerThreshold, SlowWorkerGracePeriod: rc.SlowWorkerGracePeriod, diff --git a/internal/engine/types/progress.go b/internal/engine/types/progress.go index 3cfb401a..e30a55e4 100644 --- a/internal/engine/types/progress.go +++ b/internal/engine/types/progress.go @@ -9,6 +9,12 @@ import ( "github.com/SurgeDM/Surge/internal/utils" ) +// RateLimiter defines the interface for throttling IO +type RateLimiter interface 
{ + WaitN(ctx context.Context, n int) error + SetRate(rate int64) +} + type ProgressState struct { ID string Downloaded atomic.Int64 @@ -37,6 +43,8 @@ type ProgressState struct { BitmapWidth int // Number of chunks tracked mu sync.Mutex // Protects TotalSize, StartTime, SessionStartBytes, SavedElapsed, Mirrors + + Limiter RateLimiter // Per-task rate limiter } type MirrorStatus struct { diff --git a/internal/processing/manager.go b/internal/processing/manager.go index 1eddc106..1e6ffbdf 100644 --- a/internal/processing/manager.go +++ b/internal/processing/manager.go @@ -166,6 +166,9 @@ func (m *LifecycleManager) ApplySettings(s *config.Settings) { m.settings = s m.settingsRefreshedAt = time.Now() m.settingsMu.Unlock() + + // Dynamically update the global rate limit + utils.GlobalRateLimiter.SetRate(s.Network.GlobalRateLimit * 1024) } // SaveSettings persists and applies a new routing snapshot for future enqueue calls. diff --git a/internal/utils/ratelimit.go b/internal/utils/ratelimit.go new file mode 100644 index 00000000..5c6e467d --- /dev/null +++ b/internal/utils/ratelimit.go @@ -0,0 +1,98 @@ +package utils + +import ( + "context" + "sync" + "time" +) + +// TokenBucket is a thread-safe rate limiter. +type TokenBucket struct { + mu sync.Mutex + rateBytes float64 // tokens per second + tokens float64 // current tokens + lastUpdate time.Time + enabled bool +} + +// GlobalRateLimiter is the shared singleton for global application bandwidth limit. +var GlobalRateLimiter = NewTokenBucket(0) + +// NewTokenBucket creates a new rate limiter allowing 'rate' bytes per second. +// If rate is 0, the limiter is disabled and WaitN returns immediately. +func NewTokenBucket(rate int64) *TokenBucket { + // A max burst size is usually 1 second's worth of tokens or more, + // but since we update exactly, we can just use rate as our capacity. 
+ enabled := rate > 0 + return &TokenBucket{ + rateBytes: float64(rate), + tokens: float64(rate), + lastUpdate: time.Now(), + enabled: enabled, + } +} + +// SetRate dynamically updates the rate limit. 0 disables it. +func (tb *TokenBucket) SetRate(rate int64) { + tb.mu.Lock() + defer tb.mu.Unlock() + + tb.rateBytes = float64(rate) + tb.enabled = rate > 0 + // Don't modify current tokens or restrict them right away, wait to drain. +} + +// WaitN blocks until 'n' bytes are available according to the rate limit. +func (tb *TokenBucket) WaitN(ctx context.Context, n int) error { + if n <= 0 { + return nil + } + + for { + // Use a minimal block scope for locking so we don't hold the mutex during sleep + sleepDur := time.Duration(0) + + tb.mu.Lock() + + if !tb.enabled { + tb.mu.Unlock() + return nil + } + + now := time.Now() + elapsed := now.Sub(tb.lastUpdate).Seconds() + + // Refill tokens based on elapsed time + tb.tokens += elapsed * tb.rateBytes + + // Cap tokens to 1 second worth of bandwidth (max burst) + if tb.tokens > tb.rateBytes { + tb.tokens = tb.rateBytes + } + + tb.lastUpdate = now + + reqTokens := float64(n) + if tb.tokens >= reqTokens { + // We have enough tokens, consume them and proceed + tb.tokens -= reqTokens + tb.mu.Unlock() + return nil + } + + // Not enough tokens. Calculate time to wait. 
+ deficit := reqTokens - tb.tokens + sleepSecs := deficit / tb.rateBytes + sleepDur = time.Duration(sleepSecs * float64(time.Second)) + + tb.mu.Unlock() + + // Wait, while remaining respectful to the context + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(sleepDur): + // Time passed, loop again to claim tokens + } + } +} From 15a892a6a6a3e94cf7caccaf9167a1f551d39755 Mon Sep 17 00:00:00 2001 From: SuperCoolPencil Date: Sun, 12 Apr 2026 02:58:37 +0530 Subject: [PATCH 02/10] test: add comprehensive rate limiting tests for token bucket and downloader implementations --- internal/engine/concurrent/ratelimit_test.go | 98 ++++++++++++++++++++ internal/engine/single/ratelimit_test.go | 92 ++++++++++++++++++ internal/utils/ratelimit_test.go | 84 +++++++++++++++++ 3 files changed, 274 insertions(+) create mode 100644 internal/engine/concurrent/ratelimit_test.go create mode 100644 internal/engine/single/ratelimit_test.go create mode 100644 internal/utils/ratelimit_test.go diff --git a/internal/engine/concurrent/ratelimit_test.go b/internal/engine/concurrent/ratelimit_test.go new file mode 100644 index 00000000..0abd5d45 --- /dev/null +++ b/internal/engine/concurrent/ratelimit_test.go @@ -0,0 +1,98 @@ +package concurrent + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "github.com/SurgeDM/Surge/internal/engine/types" + "github.com/SurgeDM/Surge/internal/testutil" + "github.com/SurgeDM/Surge/internal/utils" +) + +func TestConcurrentDownloader_GlobalRateLimit(t *testing.T) { + tmpDir, cleanup, _ := testutil.TempDir("surge-ratelimit-conc") + defer cleanup() + + fileSize := int64(128 * 1024) // 128KB + server := testutil.NewMockServerT(t, + testutil.WithFileSize(fileSize), + testutil.WithRangeSupport(true), + ) + defer server.Close() + + destPath := filepath.Join(tmpDir, "ratelimit_conc.bin") + state := types.NewProgressState("ratelimit-conc", fileSize) + runtime := &types.RuntimeConfig{ + MaxConnectionsPerHost: 2, + MinChunkSize: 32 
* 1024, + } + + downloader := NewConcurrentDownloader("ratelimit-id", nil, state, runtime) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Apply global rate limit of 64KB/s. First 64KB is instant (burst), next 64KB takes ~1s. + utils.GlobalRateLimiter.SetRate(64 * 1024) + defer utils.GlobalRateLimiter.SetRate(0) + + if f, err := os.Create(destPath + ".surge"); err == nil { + _ = f.Close() + } + + start := time.Now() + err := downloader.Download(ctx, server.URL(), nil, nil, destPath, fileSize) + if err != nil { + t.Fatalf("Download failed: %v", err) + } + + elapsed := time.Since(start) + if elapsed < 800*time.Millisecond { + t.Errorf("Download completed too fast (%v), global rate limit not applied", elapsed) + } +} + +func TestConcurrentDownloader_PerTaskRateLimit(t *testing.T) { + tmpDir, cleanup, _ := testutil.TempDir("surge-ratelimit-conc-task") + defer cleanup() + + fileSize := int64(128 * 1024) // 128KB + server := testutil.NewMockServerT(t, + testutil.WithFileSize(fileSize), + testutil.WithRangeSupport(true), + ) + defer server.Close() + + destPath := filepath.Join(tmpDir, "ratelimit_conc_task.bin") + state := types.NewProgressState("ratelimit-conc-task", fileSize) + // Apply per-task rate limit of 64KB/s + state.Limiter = utils.NewTokenBucket(64 * 1024) + + runtime := &types.RuntimeConfig{ + MaxConnectionsPerHost: 2, + MinChunkSize: 32 * 1024, + } + + downloader := NewConcurrentDownloader("ratelimit-task-id", nil, state, runtime) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if f, err := os.Create(destPath + ".surge"); err == nil { + _ = f.Close() + } + + start := time.Now() + err := downloader.Download(ctx, server.URL(), nil, nil, destPath, fileSize) + if err != nil { + t.Fatalf("Download failed: %v", err) + } + + elapsed := time.Since(start) + if elapsed < 800*time.Millisecond { + t.Errorf("Download completed too fast (%v), per task rate limit not 
applied", elapsed) + } +} diff --git a/internal/engine/single/ratelimit_test.go b/internal/engine/single/ratelimit_test.go new file mode 100644 index 00000000..d0045823 --- /dev/null +++ b/internal/engine/single/ratelimit_test.go @@ -0,0 +1,92 @@ +package single + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "github.com/SurgeDM/Surge/internal/engine/types" + "github.com/SurgeDM/Surge/internal/testutil" + "github.com/SurgeDM/Surge/internal/utils" +) + +func TestSingleDownloader_GlobalRateLimit(t *testing.T) { + tmpDir, cleanup, _ := testutil.TempDir("surge-ratelimit-single") + defer cleanup() + + fileSize := int64(128 * 1024) // 128KB + server := testutil.NewMockServerT(t, + testutil.WithFileSize(fileSize), + testutil.WithRangeSupport(false), + ) + defer server.Close() + + destPath := filepath.Join(tmpDir, "ratelimit_single.bin") + state := types.NewProgressState("ratelimit-single", fileSize) + runtime := &types.RuntimeConfig{} + + downloader := NewSingleDownloader("ratelimit-id", nil, state, runtime) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Apply global rate limit of 64KB/s + utils.GlobalRateLimiter.SetRate(64 * 1024) + defer utils.GlobalRateLimiter.SetRate(0) // Reset after test + + if f, err := os.Create(destPath + ".surge"); err == nil { + _ = f.Close() + } + + start := time.Now() + err := downloader.Download(ctx, server.URL(), destPath, fileSize, "ratelimit.bin") + if err != nil { + t.Fatalf("Download failed: %v", err) + } + + elapsed := time.Since(start) + if elapsed < 800*time.Millisecond { + t.Errorf("Download completed too fast (%v), global rate limit not applied", elapsed) + } +} + +func TestSingleDownloader_PerTaskRateLimit(t *testing.T) { + tmpDir, cleanup, _ := testutil.TempDir("surge-ratelimit-single-task") + defer cleanup() + + fileSize := int64(128 * 1024) // 128KB + server := testutil.NewMockServerT(t, + testutil.WithFileSize(fileSize), + 
testutil.WithRangeSupport(false), + ) + defer server.Close() + + destPath := filepath.Join(tmpDir, "ratelimit_single_task.bin") + state := types.NewProgressState("ratelimit-single-task", fileSize) + // Apply per-task rate limit of 64KB/s + state.Limiter = utils.NewTokenBucket(64 * 1024) + + runtime := &types.RuntimeConfig{} + + downloader := NewSingleDownloader("ratelimit-task-id", nil, state, runtime) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if f, err := os.Create(destPath + ".surge"); err == nil { + _ = f.Close() + } + + start := time.Now() + err := downloader.Download(ctx, server.URL(), destPath, fileSize, "ratelimit.bin") + if err != nil { + t.Fatalf("Download failed: %v", err) + } + + elapsed := time.Since(start) + if elapsed < 800*time.Millisecond { + t.Errorf("Download completed too fast (%v), per-task rate limit not applied", elapsed) + } +} diff --git a/internal/utils/ratelimit_test.go b/internal/utils/ratelimit_test.go new file mode 100644 index 00000000..5bf8cbc4 --- /dev/null +++ b/internal/utils/ratelimit_test.go @@ -0,0 +1,84 @@ +package utils + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestTokenBucket_Disabled(t *testing.T) { + // A rate of 0 disables the limiter + tb := NewTokenBucket(0) + + start := time.Now() + // Ask for a large amount of tokens + err := tb.WaitN(context.Background(), 1024*1024*100) // 100MB + elapsed := time.Since(start) + + assert.NoError(t, err) + // Shouldn't have blocked for any meaningful amount of time + assert.Less(t, elapsed, 10*time.Millisecond, "Disabled bucket should not block") +} + +func TestTokenBucket_Throttles(t *testing.T) { + // 500 bytes per second + tb := NewTokenBucket(500) + + // Consume all initial tokens immediately (the burst size equals the rate, so 500 tokens). + // Requesting 500 tokens shouldn't block because the bucket starts full. 
+ start := time.Now() + err := tb.WaitN(context.Background(), 500) + assert.NoError(t, err) + elapsedInitial := time.Since(start) + assert.Less(t, elapsedInitial, 50*time.Millisecond, "Initial capacity should service immediately") + + // Now ask for 250 more tokens. At 500 bytes/sec, this should take ~0.5 seconds. + start = time.Now() + err = tb.WaitN(context.Background(), 250) + assert.NoError(t, err) + elapsedNext := time.Since(start) + + assert.GreaterOrEqual(t, float64(elapsedNext), float64(400*time.Millisecond), "Should block waiting for tokens") + assert.Less(t, float64(elapsedNext), float64(700*time.Millisecond), "Should wake up reasonably close to target time") +} + +func TestTokenBucket_ContextCancellation(t *testing.T) { + // 100 bytes per second + tb := NewTokenBucket(100) + + // Consume initial bucket + _ = tb.WaitN(context.Background(), 100) + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + // Ask for 500 tokens, which would take 5 seconds. + // But context will cancel in 100ms. + start := time.Now() + err := tb.WaitN(ctx, 500) + elapsed := time.Since(start) + + assert.ErrorIs(t, err, context.DeadlineExceeded) + assert.Less(t, float64(elapsed), float64(200*time.Millisecond), "Should abort early due to context cancellation") +} + +func TestTokenBucket_DynamicSetRate(t *testing.T) { + tb := NewTokenBucket(500) + + // Consume initial + _ = tb.WaitN(context.Background(), 500) + + // Change rate to 5000 bytes/sec + tb.SetRate(5000) + + // Ask for 2500 bytes. With new rate this takes 0.5s. 
+ start := time.Now() + err := tb.WaitN(context.Background(), 2500) + assert.NoError(t, err) + elapsed := time.Since(start) + + assert.GreaterOrEqual(t, float64(elapsed), float64(400*time.Millisecond)) + assert.Less(t, float64(elapsed), float64(700*time.Millisecond)) +} From 2ea27c738fde5c1107ba12e900180d00c236cb0f Mon Sep 17 00:00:00 2001 From: SuperCoolPencil Date: Sun, 12 Apr 2026 03:03:17 +0530 Subject: [PATCH 03/10] docs: add global and per-task rate limit configurations to SETTINGS.md --- README.md | 1 + docs/SETTINGS.md | 2 ++ 2 files changed, 3 insertions(+) diff --git a/README.md b/README.md index 1ecfb377..b9801826 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ Most browsers open a single connection for a download. Surge opens multiple (up - **Multiple Mirrors:** Download from multiple sources simultaneously. Surge distributes workers across all available mirrors and automatically handles failover. - **Sequential Download:** Option to download files in strict order (Streaming Mode). Ideal for media files that you want to preview while downloading. - **Daemon Architecture:** Surge runs a single background "engine." You can open 10 different terminal tabs and queue downloads; they all funnel into one efficient manager. +- **Rate Limiting:** Control your bandwidth usage globally or strictly on a per-download basis. - **Beautiful TUI:** Built with Bubble Tea & Lipgloss, it looks good while it works. For a deep dive into how we make downloads faster (like work stealing and slow worker handling), check out our **[Optimization Guide](docs/OPTIMIZATIONS.md)**. diff --git a/docs/SETTINGS.md b/docs/SETTINGS.md index 37915227..73bce582 100644 --- a/docs/SETTINGS.md +++ b/docs/SETTINGS.md @@ -77,6 +77,8 @@ Surge follows OS conventions for storing its files. Below is a breakdown of ever | `sequential_download` | bool | Download file pieces in strict order (Streaming Mode). Useful for previewing media but may be slower. 
| `false` | | `min_chunk_size` | int64 | Minimum size of a download chunk in bytes (e.g., `2097152` for 2MB). | `2MB` | | `worker_buffer_size` | int | I/O buffer size per worker in bytes (e.g., `524288` for 512KB). | `512KB` | +| `global_rate_limit` | int64 | Maximum total combined download speed in KB/s. `0` for unlimited. | `0` | +| `per_task_rate_limit` | int64 | Maximum speed for each individual download in KB/s. `0` for unlimited. | `0` | ### Performance Settings From 730fc2fb30a3840e995c524d956684dfc8186bf6 Mon Sep 17 00:00:00 2001 From: SuperCoolPencil Date: Sun, 12 Apr 2026 03:17:50 +0530 Subject: [PATCH 04/10] refactor: replace custom token bucket implementation with golang.org/x/time/rate and improve error handling in downloader and worker loops --- go.mod | 1 + go.sum | 2 + internal/engine/concurrent/ratelimit_test.go | 6 +- internal/engine/concurrent/worker.go | 10 +- internal/engine/single/downloader.go | 23 ++-- internal/engine/single/ratelimit_test.go | 5 +- internal/utils/ratelimit.go | 135 ++++++++++--------- 7 files changed, 104 insertions(+), 78 deletions(-) diff --git a/go.mod b/go.mod index 5d5055e9..a7115d50 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/spf13/cobra v1.10.2 github.com/stretchr/testify v1.11.1 github.com/vfaronov/httpheader v0.1.0 + golang.org/x/time v0.15.0 modernc.org/sqlite v1.48.1 ) diff --git a/go.sum b/go.sum index c245bcdd..9298b723 100644 --- a/go.sum +++ b/go.sum @@ -124,6 +124,8 @@ golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= +golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U= +golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno= golang.org/x/tools v0.42.0 
h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/engine/concurrent/ratelimit_test.go b/internal/engine/concurrent/ratelimit_test.go index 0abd5d45..939b66ad 100644 --- a/internal/engine/concurrent/ratelimit_test.go +++ b/internal/engine/concurrent/ratelimit_test.go @@ -35,9 +35,9 @@ func TestConcurrentDownloader_GlobalRateLimit(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - // Apply global rate limit of 64KB/s. First 64KB is instant (burst), next 64KB takes ~1s. - utils.GlobalRateLimiter.SetRate(64 * 1024) - defer utils.GlobalRateLimiter.SetRate(0) + // Apply rate limit of 64KB/s via state.Limiter to prevent data race across test packages. + // First 64KB is instant (burst), next 64KB takes ~1s. + state.Limiter = utils.NewTokenBucket(64 * 1024) if f, err := os.Create(destPath + ".surge"); err == nil { _ = f.Close() diff --git a/internal/engine/concurrent/worker.go b/internal/engine/concurrent/worker.go index 6ec435ee..fb102220 100644 --- a/internal/engine/concurrent/worker.go +++ b/internal/engine/concurrent/worker.go @@ -285,10 +285,16 @@ func (d *ConcurrentDownloader) downloadTask(ctx context.Context, rawurl string, if n > 0 { // Apply per-task rate limit if d.State != nil && d.State.Limiter != nil { - _ = d.State.Limiter.WaitN(ctx, n) + if limiterErr := d.State.Limiter.WaitN(ctx, n); limiterErr != nil { + readErr = limiterErr + break + } } // Apply global rate limit - _ = utils.GlobalRateLimiter.WaitN(ctx, n) + if limiterErr := utils.GlobalRateLimiter.WaitN(ctx, n); limiterErr != nil { + readErr = limiterErr + break + } readSoFar += n // CONTINUOUS HEALTH KEEPALIVE: diff --git a/internal/engine/single/downloader.go b/internal/engine/single/downloader.go index 4a817bbc..0e824fde 100644 --- 
a/internal/engine/single/downloader.go +++ b/internal/engine/single/downloader.go @@ -193,12 +193,15 @@ func (d *SingleDownloader) Download(ctx context.Context, rawurl, destPath string buf := *bufPtr defer bufPool.Put(bufPtr) - if d.State == nil { - progressReader := newProgressReader(ctx, resp.Body, d.State, types.WorkerBatchSize, types.WorkerBatchInterval) - written, err = io.CopyBuffer(outFile, progressReader, buf) - } else { - progressReader := newProgressReader(ctx, resp.Body, d.State, types.WorkerBatchSize, types.WorkerBatchInterval) - written, err = io.CopyBuffer(outFile, progressReader, buf) + if d.State != nil { + d.State.Downloaded.Store(written) + d.State.VerifiedProgress.Store(written) + // Set d.State earlier, then merge code branches + } + + progressReader := newProgressReader(ctx, resp.Body, d.State, types.WorkerBatchSize, types.WorkerBatchInterval) + written, err = io.CopyBuffer(outFile, progressReader, buf) + if d.State != nil { progressReader.Flush() } if err != nil { @@ -268,9 +271,13 @@ func (w *progressReader) Read(p []byte) (int, error) { if n > 0 { if w.state != nil && w.state.Limiter != nil { - _ = w.state.Limiter.WaitN(w.ctx, n) + if limiterErr := w.state.Limiter.WaitN(w.ctx, n); limiterErr != nil { + return n, limiterErr + } + } + if limiterErr := utils.GlobalRateLimiter.WaitN(w.ctx, n); limiterErr != nil { + return n, limiterErr } - _ = utils.GlobalRateLimiter.WaitN(w.ctx, n) } if n <= 0 || w.state == nil { diff --git a/internal/engine/single/ratelimit_test.go b/internal/engine/single/ratelimit_test.go index d0045823..5ef922fc 100644 --- a/internal/engine/single/ratelimit_test.go +++ b/internal/engine/single/ratelimit_test.go @@ -32,9 +32,8 @@ func TestSingleDownloader_GlobalRateLimit(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - // Apply global rate limit of 64KB/s - utils.GlobalRateLimiter.SetRate(64 * 1024) - defer utils.GlobalRateLimiter.SetRate(0) // Reset after test + // 
Apply rate limit of 64KB/s via state.Limiter to prevent data race across test packages. + state.Limiter = utils.NewTokenBucket(64 * 1024) if f, err := os.Create(destPath + ".surge"); err == nil { _ = f.Close() diff --git a/internal/utils/ratelimit.go b/internal/utils/ratelimit.go index 5c6e467d..6f0ecb9d 100644 --- a/internal/utils/ratelimit.go +++ b/internal/utils/ratelimit.go @@ -3,43 +3,65 @@ package utils import ( "context" "sync" - "time" + + "golang.org/x/time/rate" ) // TokenBucket is a thread-safe rate limiter. type TokenBucket struct { - mu sync.Mutex - rateBytes float64 // tokens per second - tokens float64 // current tokens - lastUpdate time.Time - enabled bool + mu sync.RWMutex + limiter *rate.Limiter + enabled bool } // GlobalRateLimiter is the shared singleton for global application bandwidth limit. var GlobalRateLimiter = NewTokenBucket(0) -// NewTokenBucket creates a new rate limiter allowing 'rate' bytes per second. -// If rate is 0, the limiter is disabled and WaitN returns immediately. -func NewTokenBucket(rate int64) *TokenBucket { - // A max burst size is usually 1 second's worth of tokens or more, - // but since we update exactly, we can just use rate as our capacity. - enabled := rate > 0 +// NewTokenBucket creates a new rate limiter allowing 'rateBytes' bytes per second. +// If rateBytes is 0, the limiter is disabled and WaitN returns immediately. +func NewTokenBucket(rateBytes int64) *TokenBucket { + // A max burst size is usually 1 second's worth of tokens or more. + // Since we chunk requests in WaitN, burst can strictly follow rateBytes. + burst := int(rateBytes) + if burst < 1 && rateBytes > 0 { + burst = 1 + } + + enabled := rateBytes > 0 + var l *rate.Limiter + if enabled { + l = rate.NewLimiter(rate.Limit(rateBytes), burst) + } + return &TokenBucket{ - rateBytes: float64(rate), - tokens: float64(rate), - lastUpdate: time.Now(), - enabled: enabled, + limiter: l, + enabled: enabled, } } // SetRate dynamically updates the rate limit. 
0 disables it. -func (tb *TokenBucket) SetRate(rate int64) { +func (tb *TokenBucket) SetRate(rateBytes int64) { tb.mu.Lock() defer tb.mu.Unlock() - tb.rateBytes = float64(rate) - tb.enabled = rate > 0 - // Don't modify current tokens or restrict them right away, wait to drain. + if rateBytes <= 0 { + tb.enabled = false + tb.limiter = nil + return + } + + burst := int(rateBytes) + if burst < 1 { + burst = 1 + } + + if tb.limiter == nil { + tb.limiter = rate.NewLimiter(rate.Limit(rateBytes), burst) + } else { + tb.limiter.SetLimit(rate.Limit(rateBytes)) + tb.limiter.SetBurst(burst) + } + tb.enabled = true } // WaitN blocks until 'n' bytes are available according to the rate limit. @@ -48,51 +70,40 @@ func (tb *TokenBucket) WaitN(ctx context.Context, n int) error { return nil } - for { - // Use a minimal block scope for locking so we don't hold the mutex during sleep - sleepDur := time.Duration(0) - - tb.mu.Lock() - - if !tb.enabled { - tb.mu.Unlock() - return nil - } + tb.mu.RLock() + enabled := tb.enabled + l := tb.limiter + tb.mu.RUnlock() + + if !enabled || l == nil { + return nil + } - now := time.Now() - elapsed := now.Sub(tb.lastUpdate).Seconds() + // Wait handles context cancellation naturally + // Loop over burst chunks to avoid WaitN error if n > burst + burst := l.Burst() + if burst <= 0 { + return nil + } - // Refill tokens based on elapsed time - tb.tokens += elapsed * tb.rateBytes - - // Cap tokens to 1 second worth of bandwidth (max burst) - if tb.tokens > tb.rateBytes { - tb.tokens = tb.rateBytes - } - - tb.lastUpdate = now - - reqTokens := float64(n) - if tb.tokens >= reqTokens { - // We have enough tokens, consume them and proceed - tb.tokens -= reqTokens - tb.mu.Unlock() - return nil + for n > 0 { + req := n + if req > burst { + req = burst } - - // Not enough tokens. Calculate time to wait. 
- deficit := reqTokens - tb.tokens - sleepSecs := deficit / tb.rateBytes - sleepDur = time.Duration(sleepSecs * float64(time.Second)) - - tb.mu.Unlock() - - // Wait, while remaining respectful to the context - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(sleepDur): - // Time passed, loop again to claim tokens + if err := l.WaitN(ctx, req); err != nil { + // rate.Limiter may return a non-wrapped error if the deadline is too soon. + // Map it to standard context errors for compatibility. + if err.Error() != "" && (err == context.DeadlineExceeded || err == context.Canceled) { + return err + } + if ctxErr := ctx.Err(); ctxErr != nil { + return ctxErr + } + // If rate limiter rejected it because it would exceed the deadline: + return context.DeadlineExceeded } + n -= req } + return nil } From 3e46a467be3c6271cc22414ef4e9177e680ba91e Mon Sep 17 00:00:00 2001 From: SuperCoolPencil Date: Sun, 12 Apr 2026 10:06:27 +0530 Subject: [PATCH 05/10] feat: initialize global rate limiter from persisted configuration settings --- internal/processing/manager.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/processing/manager.go b/internal/processing/manager.go index 1e6ffbdf..0f7b2786 100644 --- a/internal/processing/manager.go +++ b/internal/processing/manager.go @@ -85,6 +85,8 @@ func NewLifecycleManager(addFunc AddDownloadFunc, addWithIDFunc AddDownloadWithI if err != nil { settings = config.DefaultSettings() } + // Seed global rate limiter from persisted config + utils.GlobalRateLimiter.SetRate(settings.Network.GlobalRateLimit * 1024) var activeCheck IsNameActiveFunc if len(isNameActive) > 0 { From 6257a9a3c956f3f116bb77de13eddba4245e46d7 Mon Sep 17 00:00:00 2001 From: SuperCoolPencil Date: Sun, 12 Apr 2026 10:08:01 +0530 Subject: [PATCH 06/10] feat: implement dynamic rate limiter updates and handle concurrent burst size changes in WaitN --- internal/core/local_service.go | 3 +++ internal/utils/ratelimit.go | 17 ++++++++++++----- 2 files 
changed, 15 insertions(+), 5 deletions(-) diff --git a/internal/core/local_service.go b/internal/core/local_service.go index 07e37700..aea2cf38 100644 --- a/internal/core/local_service.go +++ b/internal/core/local_service.go @@ -40,6 +40,9 @@ func (s *LocalDownloadService) ReloadSettings() error { s.settings = settings s.settingsMu.Unlock() + // Update global rate limiter + utils.GlobalRateLimiter.SetRate(settings.Network.GlobalRateLimit * 1024) + // Update active per-task limiters if s.Pool != nil { rate := settings.Network.PerTaskRateLimit * 1024 diff --git a/internal/utils/ratelimit.go b/internal/utils/ratelimit.go index 6f0ecb9d..b63b0919 100644 --- a/internal/utils/ratelimit.go +++ b/internal/utils/ratelimit.go @@ -81,17 +81,24 @@ func (tb *TokenBucket) WaitN(ctx context.Context, n int) error { // Wait handles context cancellation naturally // Loop over burst chunks to avoid WaitN error if n > burst - burst := l.Burst() - if burst <= 0 { - return nil - } - for n > 0 { + burst := l.Burst() + if burst <= 0 { + return nil + } + req := n if req > burst { req = burst } + if err := l.WaitN(ctx, req); err != nil { + // If the burst was reduced concurrently, req might be larger than the new burst. + // In that case, WaitN fails instantaneously. We should retry with the new burst. + if req > l.Burst() { + continue + } + // rate.Limiter may return a non-wrapped error if the deadline is too soon. // Map it to standard context errors for compatibility. 
if err.Error() != "" && (err == context.DeadlineExceeded || err == context.Canceled) { From 8b8b2195f1ee05c508157eb3b5499954cf0e1c29 Mon Sep 17 00:00:00 2001 From: SuperCoolPencil Date: Sun, 12 Apr 2026 10:20:26 +0530 Subject: [PATCH 07/10] chore: --- internal/engine/single/downloader.go | 2 +- internal/engine/single/ratelimit_test.go | 2 +- internal/processing/manager.go | 2 +- internal/tui/constants.go | 34 ++++++++++++------------ internal/tui/layout_helpers.go | 6 ++--- internal/tui/view_category.go | 2 -- internal/tui/view_settings.go | 9 +++---- 7 files changed, 27 insertions(+), 30 deletions(-) diff --git a/internal/engine/single/downloader.go b/internal/engine/single/downloader.go index 0e824fde..24b3911d 100644 --- a/internal/engine/single/downloader.go +++ b/internal/engine/single/downloader.go @@ -268,7 +268,7 @@ func newProgressReader(ctx context.Context, reader io.Reader, state *types.Progr func (w *progressReader) Read(p []byte) (int, error) { n, err := w.reader.Read(p) - + if n > 0 { if w.state != nil && w.state.Limiter != nil { if limiterErr := w.state.Limiter.WaitN(w.ctx, n); limiterErr != nil { diff --git a/internal/engine/single/ratelimit_test.go b/internal/engine/single/ratelimit_test.go index 5ef922fc..547596af 100644 --- a/internal/engine/single/ratelimit_test.go +++ b/internal/engine/single/ratelimit_test.go @@ -66,7 +66,7 @@ func TestSingleDownloader_PerTaskRateLimit(t *testing.T) { state := types.NewProgressState("ratelimit-single-task", fileSize) // Apply per-task rate limit of 64KB/s state.Limiter = utils.NewTokenBucket(64 * 1024) - + runtime := &types.RuntimeConfig{} downloader := NewSingleDownloader("ratelimit-task-id", nil, state, runtime) diff --git a/internal/processing/manager.go b/internal/processing/manager.go index 0f7b2786..bba89f77 100644 --- a/internal/processing/manager.go +++ b/internal/processing/manager.go @@ -168,7 +168,7 @@ func (m *LifecycleManager) ApplySettings(s *config.Settings) { m.settings = s 
m.settingsRefreshedAt = time.Now() m.settingsMu.Unlock() - + // Dynamically update the global rate limit utils.GlobalRateLimiter.SetRate(s.Network.GlobalRateLimit * 1024) } diff --git a/internal/tui/constants.go b/internal/tui/constants.go index 508d1384..bd860cb0 100644 --- a/internal/tui/constants.go +++ b/internal/tui/constants.go @@ -11,37 +11,37 @@ const ( TickInterval = 200 * time.Millisecond // === Layout Ratios === - ListWidthRatio = 0.6 // Dashboard: List takes 60% width - SettingsWidthRatio = 0.72 // Modals: Settings/Category use 72% width - LogoWidthRatio = 0.45 // Header: Logo takes 45% of left column + ListWidthRatio = 0.6 // Dashboard: List takes 60% width + SettingsWidthRatio = 0.72 // Modals: Settings/Category use 72% width + LogoWidthRatio = 0.45 // Header: Logo takes 45% of left column GraphTargetHeightRatio = 0.4 // Right Column: Graph target 40% height // === Thresholds and Minimums === - MinTermWidth = 45 - MinTermHeight = 12 + MinTermWidth = 45 + MinTermHeight = 12 ShortTermHeightThreshold = 25 // Switch to compact header below this height - MinSettingsWidth = 64 - MaxSettingsWidth = 130 - MinSettingsHeight = 12 + MinSettingsWidth = 64 + MaxSettingsWidth = 130 + MinSettingsHeight = 12 DefaultSettingsHeight = 26 MinRightColumnWidth = 50 // Hide right column if narrow MinGraphStatsWidth = 70 // Hide inline graph stats if narrow MinLogoWidth = 60 // Hide ASCII logo if narrow - MinGraphHeight = 9 - MinGraphHeightShort = 5 - MinListHeight = 10 - MinChunkMapHeight = 4 - MinChunkMapVisibleH = 18 // Min term height to show chunk map + MinGraphHeight = 9 + MinGraphHeightShort = 5 + MinListHeight = 10 + MinChunkMapHeight = 4 + MinChunkMapVisibleH = 18 // Min term height to show chunk map // === Component Heights === - ModalHeightPadding = 4 // Bottom fallback padding for modals to avoid clipping + ModalHeightPadding = 4 // Bottom fallback padding for modals to avoid clipping HeaderHeightMax = 11 - HeaderHeightMin = 3 - FilePickerHeight = 12 - CardHeight 
= 2 // Compact rows for downloads list + HeaderHeightMin = 3 + FilePickerHeight = 12 + CardHeight = 2 // Compact rows for downloads list // === Padding and Offsets === DefaultPaddingX = 1 diff --git a/internal/tui/layout_helpers.go b/internal/tui/layout_helpers.go index 6adcf5b2..eb0de72c 100644 --- a/internal/tui/layout_helpers.go +++ b/internal/tui/layout_helpers.go @@ -49,7 +49,7 @@ func GetSettingsDimensions(termWidth, termHeight int) (int, int) { // GetListWidth calculates the list width based on available width func GetListWidth(availableWidth int) int { leftWidth := int(float64(availableWidth) * ListWidthRatio) - + // Determine right column viability rightWidth := availableWidth - leftWidth if rightWidth < MinRightColumnWidth { @@ -66,7 +66,7 @@ func IsShortTerminal(height int) bool { // GetGraphAreaDimensions calculates dimensions for the graph area func GetGraphAreaDimensions(rightWidth int, isStatsHidden bool) (int, int) { axisWidth := GraphAxisWidth - + if isStatsHidden { // No stats box — graph gets almost full width. // Higher buffer (* 5) accounts for extra padding needed when axis is on the far right @@ -77,7 +77,7 @@ func GetGraphAreaDimensions(rightWidth int, isStatsHidden bool) (int, int) { } return graphAreaWidth, axisWidth } - + // Graph takes remaining width after stats box. // Smaller buffer (* 3) as the stats box provides its own internal padding. 
graphAreaWidth := rightWidth - GraphStatsWidth - axisWidth - (BoxStyle.GetHorizontalFrameSize() * 3) diff --git a/internal/tui/view_category.go b/internal/tui/view_category.go index fd0f4d3b..9bfc1253 100644 --- a/internal/tui/view_category.go +++ b/internal/tui/view_category.go @@ -104,8 +104,6 @@ func (m RootModel) viewCategoryManager() string { return m.renderModalWithOverlay(box) } - - func (m RootModel) renderCategoryHelp(width int) string { if width < 1 { width = 1 diff --git a/internal/tui/view_settings.go b/internal/tui/view_settings.go index 888aba15..e32c2ed4 100644 --- a/internal/tui/view_settings.go +++ b/internal/tui/view_settings.go @@ -107,8 +107,6 @@ func (m RootModel) viewSettings() string { return m.renderModalWithOverlay(box) } - - func shortSettingsCategoryLabel(label string) string { switch label { case "General": @@ -303,7 +301,8 @@ func (m RootModel) renderSettingsDetailBlock(settingsMeta []config.SettingMeta, if m.SettingsIsEditing { valueStr = m.SettingsInput.View() + unitStyle.Render(unit) } else { - if meta.Type == "auth_token" { + switch meta.Type { + case "auth_token": token := GetAuthToken() if token == "" { valueStr = lipgloss.NewStyle().Foreground(colors.Gray).Render("(Not generated yet)") @@ -318,9 +317,9 @@ func (m RootModel) renderSettingsDetailBlock(settingsMeta []config.SettingMeta, valueStr = displayToken + lipgloss.NewStyle().Foreground(colors.Gray).Render(" [Enter to Copy]") } } - } else if meta.Type == "link" { + case "link": valueStr = lipgloss.NewStyle().Foreground(colors.NeonCyan).Render("Open [Enter]") - } else { + default: valueStr = formatSettingValueForEdit(value, meta.Type, meta.Key) + unitStyle.Render(unit) if meta.Key == "max_global_connections" { valueStr += " (Ignored)" From 38887ac50ed17a8c0b1227b43654f73001b631a1 Mon Sep 17 00:00:00 2001 From: SuperCoolPencil Date: Sun, 12 Apr 2026 10:29:53 +0530 Subject: [PATCH 08/10] address ai review comments --- cmd/root.go | 7 +++++++ 
internal/engine/concurrent/ratelimit_test.go | 5 +++-- internal/engine/single/downloader.go | 7 ------- internal/engine/single/ratelimit_test.go | 5 +++-- internal/processing/manager.go | 6 ++++++ internal/processing/pause_resume.go | 2 ++ internal/utils/ratelimit.go | 2 +- 7 files changed, 22 insertions(+), 12 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 4d8099d1..d3c6745f 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -223,6 +223,13 @@ func ensureGlobalLocalServiceAndLifecycle() error { Cancel: GlobalPool.Cancel, UpdateURL: GlobalPool.UpdateURL, PublishEvent: localService.Publish, + UpdateActiveRates: func(rate int64) { + for _, cfg := range GlobalPool.GetAll() { + if cfg.State != nil && cfg.State.Limiter != nil { + cfg.State.Limiter.SetRate(rate) + } + } + }, }) localService.SetLifecycleHooks(core.LifecycleHooks{ diff --git a/internal/engine/concurrent/ratelimit_test.go b/internal/engine/concurrent/ratelimit_test.go index 939b66ad..e970a227 100644 --- a/internal/engine/concurrent/ratelimit_test.go +++ b/internal/engine/concurrent/ratelimit_test.go @@ -35,9 +35,10 @@ func TestConcurrentDownloader_GlobalRateLimit(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - // Apply rate limit of 64KB/s via state.Limiter to prevent data race across test packages. + // Apply global rate limit of 64KB/s. // First 64KB is instant (burst), next 64KB takes ~1s. 
- state.Limiter = utils.NewTokenBucket(64 * 1024) + utils.GlobalRateLimiter.SetRate(64 * 1024) + defer utils.GlobalRateLimiter.SetRate(0) if f, err := os.Create(destPath + ".surge"); err == nil { _ = f.Close() diff --git a/internal/engine/single/downloader.go b/internal/engine/single/downloader.go index 24b3911d..3dd23103 100644 --- a/internal/engine/single/downloader.go +++ b/internal/engine/single/downloader.go @@ -188,17 +188,10 @@ func (d *SingleDownloader) Download(ctx context.Context, rawurl, destPath string start := time.Now() var written int64 - bufPtr := bufPool.Get().(*[]byte) buf := *bufPtr defer bufPool.Put(bufPtr) - if d.State != nil { - d.State.Downloaded.Store(written) - d.State.VerifiedProgress.Store(written) - // Set d.State earlier, then merge code branches - } - progressReader := newProgressReader(ctx, resp.Body, d.State, types.WorkerBatchSize, types.WorkerBatchInterval) written, err = io.CopyBuffer(outFile, progressReader, buf) if d.State != nil { diff --git a/internal/engine/single/ratelimit_test.go b/internal/engine/single/ratelimit_test.go index 547596af..b7b1a11e 100644 --- a/internal/engine/single/ratelimit_test.go +++ b/internal/engine/single/ratelimit_test.go @@ -32,8 +32,9 @@ func TestSingleDownloader_GlobalRateLimit(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - // Apply rate limit of 64KB/s via state.Limiter to prevent data race across test packages. - state.Limiter = utils.NewTokenBucket(64 * 1024) + // Apply global rate limit of 64KB/s. 
+ utils.GlobalRateLimiter.SetRate(64 * 1024) + defer utils.GlobalRateLimiter.SetRate(0) if f, err := os.Create(destPath + ".surge"); err == nil { _ = f.Close() diff --git a/internal/processing/manager.go b/internal/processing/manager.go index bba89f77..c46fded2 100644 --- a/internal/processing/manager.go +++ b/internal/processing/manager.go @@ -171,6 +171,12 @@ func (m *LifecycleManager) ApplySettings(s *config.Settings) { // Dynamically update the global rate limit utils.GlobalRateLimiter.SetRate(s.Network.GlobalRateLimit * 1024) + + // Update active per-task limiters if the hook is wired + hooks := m.getEngineHooks() + if hooks.UpdateActiveRates != nil { + hooks.UpdateActiveRates(s.Network.PerTaskRateLimit * 1024) + } } // SaveSettings persists and applies a new routing snapshot for future enqueue calls. diff --git a/internal/processing/pause_resume.go b/internal/processing/pause_resume.go index b25076a1..a66f1ee0 100644 --- a/internal/processing/pause_resume.go +++ b/internal/processing/pause_resume.go @@ -30,6 +30,8 @@ type EngineHooks struct { UpdateURL func(id, newURL string) error // PublishEvent sends an event into the service's broadcast channel. PublishEvent func(msg interface{}) error + // UpdateActiveRates updates the rate limit (bytes/sec) on all active downloads. + UpdateActiveRates func(rateBytes int64) } // Pause pauses an active download. diff --git a/internal/utils/ratelimit.go b/internal/utils/ratelimit.go index b63b0919..cea54977 100644 --- a/internal/utils/ratelimit.go +++ b/internal/utils/ratelimit.go @@ -101,7 +101,7 @@ func (tb *TokenBucket) WaitN(ctx context.Context, n int) error { // rate.Limiter may return a non-wrapped error if the deadline is too soon. // Map it to standard context errors for compatibility. 
- if err.Error() != "" && (err == context.DeadlineExceeded || err == context.Canceled) { + if err == context.DeadlineExceeded || err == context.Canceled { return err } if ctxErr := ctx.Err(); ctxErr != nil { From a8396f897ee09dd5512f2941df6226a949caebab Mon Sep 17 00:00:00 2001 From: SuperCoolPencil Date: Sun, 12 Apr 2026 10:43:40 +0530 Subject: [PATCH 09/10] fix: add nil check for settings and use errors.Is for rate limiter context error handling --- internal/processing/manager.go | 4 +++- internal/utils/ratelimit.go | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/processing/manager.go b/internal/processing/manager.go index c46fded2..8efe36dd 100644 --- a/internal/processing/manager.go +++ b/internal/processing/manager.go @@ -86,7 +86,9 @@ func NewLifecycleManager(addFunc AddDownloadFunc, addWithIDFunc AddDownloadWithI settings = config.DefaultSettings() } // Seed global rate limiter from persisted config - utils.GlobalRateLimiter.SetRate(settings.Network.GlobalRateLimit * 1024) + if settings != nil { + utils.GlobalRateLimiter.SetRate(settings.Network.GlobalRateLimit * 1024) + } var activeCheck IsNameActiveFunc if len(isNameActive) > 0 { diff --git a/internal/utils/ratelimit.go b/internal/utils/ratelimit.go index cea54977..352c4483 100644 --- a/internal/utils/ratelimit.go +++ b/internal/utils/ratelimit.go @@ -2,6 +2,7 @@ package utils import ( "context" + "errors" "sync" "golang.org/x/time/rate" @@ -101,7 +102,7 @@ func (tb *TokenBucket) WaitN(ctx context.Context, n int) error { // rate.Limiter may return a non-wrapped error if the deadline is too soon. // Map it to standard context errors for compatibility. 
- if err == context.DeadlineExceeded || err == context.Canceled { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { return err } if ctxErr := ctx.Err(); ctxErr != nil { From 827082bb675ae84305a513b248d67d228e931e68 Mon Sep 17 00:00:00 2001 From: SuperCoolPencil Date: Sun, 12 Apr 2026 11:01:59 +0530 Subject: [PATCH 10/10] feat: implement global and per-task download rate limiting using token buckets --- internal/utils/ratelimit.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/utils/ratelimit.go b/internal/utils/ratelimit.go index 352c4483..1348cb5f 100644 --- a/internal/utils/ratelimit.go +++ b/internal/utils/ratelimit.go @@ -71,18 +71,18 @@ func (tb *TokenBucket) WaitN(ctx context.Context, n int) error { return nil } - tb.mu.RLock() - enabled := tb.enabled - l := tb.limiter - tb.mu.RUnlock() - - if !enabled || l == nil { - return nil - } - // Wait handles context cancellation naturally // Loop over burst chunks to avoid WaitN error if n > burst for n > 0 { + tb.mu.RLock() + enabled := tb.enabled + l := tb.limiter + tb.mu.RUnlock() + + if !enabled || l == nil { + return nil + } + burst := l.Burst() if burst <= 0 { return nil