From 19c9687f6e28aa3972823556587933c340b92096 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 8 Jan 2026 15:19:53 +0000 Subject: [PATCH 1/3] Initial plan From 4d188d3f794c68106146a655059ef94c89de77ff Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 8 Jan 2026 15:26:51 +0000 Subject: [PATCH 2/3] Optimize queue performance: fix defer in loops, efficient sorting, and key caching Co-authored-by: Ren0503 <54244073+Ren0503@users.noreply.github.com> --- job.go | 8 ++----- queue.go | 64 ++++++++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 55 insertions(+), 17 deletions(-) diff --git a/job.go b/job.go index 435edc9..42aa051 100644 --- a/job.go +++ b/job.go @@ -3,7 +3,6 @@ package queue import ( "context" "fmt" - "strings" "time" ) @@ -127,9 +126,6 @@ func (job *Job) IsFinished() bool { } func (job *Job) getKey() string { - if job.queue.config.Prefix != "" { - prefix := job.queue.config.Prefix - return fmt.Sprintf("%s:%s", strings.ToLower(prefix+job.queue.Name), job.Id) - } - return fmt.Sprintf("%s:%s", strings.ToLower(job.queue.Name), job.Id) + // Use cached queue key to avoid repeated string operations + return job.queue.cachedKey + ":" + job.Id } diff --git a/queue.go b/queue.go index e744192..b2f0d69 100644 --- a/queue.go +++ b/queue.go @@ -32,6 +32,7 @@ type Queue struct { running bool config Options Logger Logger + cachedKey string // Cache the computed key to avoid repeated string operations } type RateLimiter struct { @@ -95,6 +96,13 @@ func New(name string, opt *Options) *Queue { queue.config.Timeout = 1 * time.Minute } + // Pre-compute and cache the key + if opt.Prefix != "" { + queue.cachedKey = strings.ToLower(opt.Prefix + name) + } else { + queue.cachedKey = strings.ToLower(name) + } + return queue } @@ -110,8 +118,13 @@ func (q *Queue) AddJob(opt AddJobOptions) { q.formatLog(LoggerInfo, "Add job %s to waiting", opt.Id) job = q.newJob(opt) } - q.jobs = append(q.jobs, *job) - sort.SliceStable(q.jobs, func(i, j int) bool { return q.jobs[i].Priority > q.jobs[j].Priority }) + // Use binary search to find insertion point for O(log n) instead of O(n log n) sort + insertIdx := sort.Search(len(q.jobs), func(i int) bool { + return q.jobs[i].Priority < job.Priority + }) + q.jobs = append(q.jobs, Job{}) + copy(q.jobs[insertIdx+1:], q.jobs[insertIdx:]) + q.jobs[insertIdx] = *job q.Run() } @@ -125,7 +138,15 @@ type AddJobOptions struct { // rate limited, the jobs are delayed. Otherwise, the jobs are added to the // waiting list and the queue is run. func (q *Queue) BulkAddJob(options []AddJobOptions) { + if len(options) == 0 { + return + } + + // Sort input options by priority once sort.SliceStable(options, func(i, j int) bool { return options[i].Priority > options[j].Priority }) + + // Pre-allocate space for new jobs + newJobs := make([]Job, 0, len(options)) for _, option := range options { var job *Job if q.IsLimit() { @@ -135,12 +156,34 @@ func (q *Queue) BulkAddJob(options []AddJobOptions) { q.formatLog(LoggerInfo, "Add job %s to waiting", option.Id) job = q.newJob(option) } - q.jobs = append(q.jobs, *job) + newJobs = append(newJobs, *job) } - sort.SliceStable(q.jobs, func(i, j int) bool { return q.jobs[i].Priority > q.jobs[j].Priority }) + + // Merge sorted slices efficiently + q.jobs = mergeSortedJobs(q.jobs, newJobs) q.Run() } +// mergeSortedJobs merges two sorted job slices by priority (descending) +func mergeSortedJobs(jobs1, jobs2 []Job) []Job { + result := make([]Job, 0, len(jobs1)+len(jobs2)) + i, j := 0, 0 + + for i < len(jobs1) && j < len(jobs2) { + if jobs1[i].Priority >= jobs2[j].Priority { + result = append(result, jobs1[i]) + i++ + } else { + result = append(result, jobs2[j]) + j++ + } + } + + result = append(result, jobs1[i:]...) + result = append(result, jobs2[j:]...) + return result +} + // Process sets the callback for the queue to process jobs. If the queue has a // scheduler, it will be started with the given cron pattern. Otherwise, the // callback is simply stored. @@ -187,7 +230,6 @@ func (q *Queue) Run() { numJobs := execJobs[:min] ctx, cancel := context.WithTimeout(context.Background(), q.config.Timeout) - defer cancel() var wg sync.WaitGroup done := make(chan struct{}) @@ -218,6 +260,7 @@ func (q *Queue) Run() { case <-ctx.Done(): q.MarkJobFailedTimeout(numJobs) } + cancel() // Handle remove job execJobs = execJobs[min:] @@ -256,12 +299,12 @@ func (q *Queue) Retry() { numJobs := execJobs[:min] ctx, cancel := context.WithTimeout(context.Background(), q.config.Timeout) - defer cancel() var wg sync.WaitGroup done := make(chan struct{}) var finishedJob []string + var finishedMu sync.Mutex for i := range numJobs { job := numJobs[i] wg.Add(1) @@ -269,7 +312,9 @@ func (q *Queue) Retry() { defer wg.Done() q.jobFnc(job) if job.IsFinished() { + finishedMu.Lock() finishedJob = append(finishedJob, job.Id) + finishedMu.Unlock() } }(job) } @@ -285,6 +330,7 @@ func (q *Queue) Retry() { case <-ctx.Done(): q.MarkJobFailedTimeout(numJobs) } + cancel() if len(finishedJob) > 0 { for _, id := range finishedJob { @@ -423,9 +469,5 @@ func (q *Queue) log(logType LoggerType, format string, v ...any) { } func (q *Queue) getKey() string { - if q.config.Prefix != "" { - prefix := q.config.Prefix - return strings.ToLower(prefix + q.Name) - } - return strings.ToLower(q.Name) + return q.cachedKey } From 4851eb3cb6ccf8df6258a1854cc28547ff0cc71f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 8 Jan 2026 15:28:13 +0000 Subject: [PATCH 3/3] Address code review feedback: improve insertion and maintain Run() call consistency Co-authored-by: Ren0503 <54244073+Ren0503@users.noreply.github.com> --- queue.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/queue.go b/queue.go index b2f0d69..af82696 100644 --- a/queue.go +++ b/queue.go @@ -122,9 +122,8 @@ func (q *Queue) AddJob(opt AddJobOptions) { insertIdx := sort.Search(len(q.jobs), func(i int) bool { return q.jobs[i].Priority < job.Priority }) - q.jobs = append(q.jobs, Job{}) - copy(q.jobs[insertIdx+1:], q.jobs[insertIdx:]) - q.jobs[insertIdx] = *job + // Efficient insertion: grow slice and insert at correct position + q.jobs = append(q.jobs[:insertIdx], append([]Job{*job}, q.jobs[insertIdx:]...)...) q.Run() } @@ -139,6 +138,7 @@ type AddJobOptions struct { // waiting list and the queue is run. func (q *Queue) BulkAddJob(options []AddJobOptions) { if len(options) == 0 { + q.Run() return }