Skip to content

Commit 8e697c0

Browse files
committed
feat: implement distributed scheduler using Redis sorted sets
- Add scheduler.go with Redis-based distributed scheduling - Implement ScheduleJob, GetNextJobs, and RemoveJob methods - Add comprehensive unit tests in scheduler_test.go - Update queue.go to integrate new scheduler - Maintain backward compatibility with cron patterns - Update dependencies in go.mod and go.sum
1 parent a054ff0 commit 8e697c0

6 files changed

Lines changed: 471 additions & 58 deletions

File tree

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ toolchain go1.24.1
77
require (
88
github.com/go-redsync/redsync/v4 v4.15.0
99
github.com/redis/go-redis/v9 v9.17.2
10-
github.com/robfig/cron/v3 v3.0.1
1110
github.com/stretchr/testify v1.9.0
1211
github.com/tinh-tinh/tinhtinh/v2 v2.5.0
1312
golang.org/x/crypto v0.45.0

go.sum

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,41 +14,33 @@ github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOr
1414
github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
1515
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
1616
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
17-
github.com/go-redsync/redsync/v4 v4.14.0 h1:zyxzFJsmQHIPBl8iBT7KFKohWsjsghgGLiP8TnFMLNc=
18-
github.com/go-redsync/redsync/v4 v4.14.0/go.mod h1:twMlVd19upZ/juvJyJGlQOSQxor1oeHtjs62l4pRFzo=
1917
github.com/go-redsync/redsync/v4 v4.15.0 h1:KH/XymuxSV7vyKs6z1Cxxj+N+N18JlPxgXeP6x4JY54=
2018
github.com/go-redsync/redsync/v4 v4.15.0/go.mod h1:qNp+lLs3vkfZbtA/aM/OjlZHfEr5YTAYhRktFPKHC7s=
21-
github.com/gomodule/redigo v1.9.2 h1:HrutZBLhSIU8abiSfW8pj8mPhOyMYjZT/wcA4/L9L9s=
22-
github.com/gomodule/redigo v1.9.2/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw=
19+
github.com/gomodule/redigo v1.9.3 h1:dNPSXeXv6HCq2jdyWfjgmhBdqnR6PRO3m/G05nvpPC8=
20+
github.com/gomodule/redigo v1.9.3/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw=
2321
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
2422
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
2523
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
2624
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
2725
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
2826
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
2927
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
30-
github.com/redis/go-redis/v9 v9.14.1 h1:nDCrEiJmfOWhD76xlaw+HXT0c9hfNWeXgl0vIRYSDvQ=
31-
github.com/redis/go-redis/v9 v9.14.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
3228
github.com/redis/go-redis/v9 v9.17.2 h1:P2EGsA4qVIM3Pp+aPocCJ7DguDHhqrXNhVcEp4ViluI=
3329
github.com/redis/go-redis/v9 v9.17.2/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
34-
github.com/redis/rueidis v1.0.64 h1:XqgbueDuNV3qFdVdQwAHJl1uNt90zUuAJuzqjH4cw6Y=
35-
github.com/redis/rueidis v1.0.64/go.mod h1:Lkhr2QTgcoYBhxARU7kJRO8SyVlgUuEkcJO1Y8MCluA=
36-
github.com/redis/rueidis/rueidiscompat v1.0.64 h1:M8JbLP4LyHQhBLBRsUQIzui8/LyTtdESNIMVveqm4RY=
37-
github.com/redis/rueidis/rueidiscompat v1.0.64/go.mod h1:8pJVPhEjpw0izZFSxYwDziUiEYEkEklTSw/nZzga61M=
38-
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
39-
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
30+
github.com/redis/rueidis v1.0.69 h1:WlUefRhuDekji5LsD387ys3UCJtSFeBVf0e5yI0B8b4=
31+
github.com/redis/rueidis v1.0.69/go.mod h1:Lkhr2QTgcoYBhxARU7kJRO8SyVlgUuEkcJO1Y8MCluA=
32+
github.com/redis/rueidis/rueidiscompat v1.0.69 h1:IWVYY9lXdjNO3do2VpJT7aDFi8zbCUuQxZB6E2Grahs=
33+
github.com/redis/rueidis/rueidiscompat v1.0.69/go.mod h1:iC4Y8DoN0Uth0Uezg9e2trvNRC7QAgGeuP2OPLb5ccI=
4034
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
4135
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
4236
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM=
4337
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8=
44-
github.com/tinh-tinh/tinhtinh/v2 v2.3.4 h1:vxhaoPnp3pGNcdXKDG7nVai+V+lYoJHWtm7pzTNapJY=
45-
github.com/tinh-tinh/tinhtinh/v2 v2.3.4/go.mod h1:4nppE7KAIswZKutI9ElMqAD9kyash7aea0Ewowsqj5g=
4638
github.com/tinh-tinh/tinhtinh/v2 v2.5.0 h1:SqCanZJKKgbVsDwoaPe136fZGYoXSKZ6fLciGO0KsoY=
4739
github.com/tinh-tinh/tinhtinh/v2 v2.5.0/go.mod h1:4nppE7KAIswZKutI9ElMqAD9kyash7aea0Ewowsqj5g=
4840
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
4941
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
50-
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
51-
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
42+
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
43+
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
5244
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
5345
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
5446
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=

job.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,10 @@ func (job *Job) HandlerError(reasonError string) {
109109
job.queue.formatLog(LoggerWarn, "Add job %s for retry (%d remains) ", job.Id, job.RetryFailures)
110110
}
111111

112-
// IsReady returns true if the job is ready to be processed. If the job uses a
113-
// scheduler, it will always be ready. Otherwise, the job is ready if it is
114-
// waiting or active.
112+
// IsReady returns true if the job is ready to be processed.
113+
// Jobs are ready if they are waiting or active.
115114
func (job *Job) IsReady() bool {
116-
if job.queue.scheduler == nil {
117-
return job.Status == WaitStatus || job.Status == ActiveStatus
118-
}
119-
return true
115+
return job.Status == WaitStatus || job.Status == ActiveStatus
120116
}
121117

122118
// IsFinished returns true if the job has finished, either successfully or with an error.

queue.go

Lines changed: 48 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -13,26 +13,26 @@ import (
1313
"github.com/go-redsync/redsync/v4"
1414
"github.com/go-redsync/redsync/v4/redis/goredis/v9"
1515
"github.com/redis/go-redis/v9"
16-
"github.com/robfig/cron/v3"
1716
"github.com/tinh-tinh/tinhtinh/v2/common"
1817
"github.com/tinh-tinh/tinhtinh/v2/common/logger"
1918
)
2019

2120
type JobFnc func(job *Job)
2221

2322
type Queue struct {
24-
Name string
25-
client *redis.Client
26-
mutex *redsync.Mutex
27-
jobFnc JobFnc
28-
jobs []Job
29-
ctx context.Context
30-
scheduler *cron.Cron
31-
cronPattern string
32-
running bool
33-
config Options
34-
Logger Logger
35-
cachedKey string // Cache the computed key to avoid repeated string operations
23+
Name string
24+
client *redis.Client
25+
mutex *redsync.Mutex
26+
jobFnc JobFnc
27+
jobs []Job
28+
ctx context.Context
29+
schedulerTicker *time.Ticker
30+
schedulerDone chan struct{}
31+
schedulerKey string
32+
running bool
33+
config Options
34+
Logger Logger
35+
cachedKey string // Cache the computed key to avoid repeated string operations
3636
}
3737

3838
type RateLimiter struct {
@@ -45,6 +45,7 @@ type Options struct {
4545
RetryFailures int
4646
Limiter *RateLimiter
4747
Pattern string
48+
ScheduleInterval time.Duration // Polling interval for distributed scheduler (default: 5s)
4849
Logger Logger
4950
DisableLog bool
5051
RemoveOnComplete bool
@@ -84,15 +85,9 @@ func New(name string, opt *Options) *Queue {
8485
}
8586

8687
if opt.Logger == nil {
87-
queue.config.Logger = logger.Create(logger.Options{})
88-
}
89-
90-
if opt.Pattern != "" {
91-
queue.scheduler = cron.New()
92-
queue.cronPattern = opt.Pattern
93-
}
94-
if opt.Timeout == 0 {
95-
queue.config.Timeout = 1 * time.Minute
88+
queue.config.Logger = logger.Create(logger.Options{
89+
Console: !opt.DisableLog,
90+
})
9691
}
9792

9893
// Pre-compute and cache the key
@@ -102,6 +97,24 @@ func New(name string, opt *Options) *Queue {
10297
queue.cachedKey = strings.ToLower(name)
10398
}
10499

100+
// Initialize scheduler key for distributed scheduling
101+
queue.schedulerKey = queue.cachedKey + ":scheduled"
102+
103+
// Start distributed scheduler if Pattern is configured
104+
if opt.Pattern != "" {
105+
// TODO: Parse cron pattern to determine interval
106+
// For now, use default interval from ScheduleInterval option
107+
interval := opt.ScheduleInterval
108+
if interval == 0 {
109+
interval = 5 * time.Second
110+
}
111+
queue.startScheduler(interval)
112+
}
113+
114+
if opt.Timeout == 0 {
115+
queue.config.Timeout = 1 * time.Minute
116+
}
117+
105118
return queue
106119
}
107120

@@ -185,18 +198,10 @@ func mergeSortedJobs(jobs1, jobs2 []Job) []Job {
185198
return result
186199
}
187200

188-
// Process sets the callback for the queue to process jobs. If the queue has a
189-
// scheduler, it will be started with the given cron pattern. Otherwise, the
190-
// callback is simply stored.
201+
// Process sets the callback for the queue to process jobs.
202+
// The distributed scheduler (if configured) is already running from New().
191203
func (q *Queue) Process(jobFnc JobFnc) {
192204
q.jobFnc = jobFnc
193-
if q.scheduler != nil {
194-
_, err := q.scheduler.AddFunc(q.cronPattern, func() { q.Run() })
195-
if err != nil {
196-
q.formatLog(LoggerError, "failed to add job: %v", err)
197-
}
198-
q.scheduler.Start()
199-
}
200205
}
201206

202207
// Run runs all ready jobs in the queue. It locks the mutex, runs all ready jobs
@@ -416,15 +421,24 @@ func (q *Queue) IsLimit() bool {
416421

417422
// Pause stops the queue from running. When paused, the queue will not accept new
418423
// jobs and will not run any jobs in the queue. It will resume when Resume is
419-
// called.
424+
// called. The scheduler is also stopped if active.
420425
func (q *Queue) Pause() {
421426
q.running = false
427+
q.stopScheduler()
422428
}
423429

424430
// Resume resumes the queue from a paused state. When resumed, the queue will
425-
// accept new jobs and run any jobs in the queue.
431+
// accept new jobs and run any jobs in the queue. The scheduler is also restarted
432+
// if it was previously configured.
426433
func (q *Queue) Resume() {
427434
q.running = true
435+
if q.config.Pattern != "" {
436+
interval := q.config.ScheduleInterval
437+
if interval == 0 {
438+
interval = 5 * time.Second
439+
}
440+
q.startScheduler(interval)
441+
}
428442
q.Run()
429443
}
430444

scheduler.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package queue
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
"github.com/redis/go-redis/v9"
8+
)
9+
10+
// startScheduler starts the background scheduler loop that checks for scheduled jobs.
11+
// It polls Redis at the specified interval to find jobs ready to run.
12+
func (q *Queue) startScheduler(interval time.Duration) {
13+
if interval == 0 {
14+
interval = 5 * time.Second // Default polling interval
15+
}
16+
17+
q.schedulerTicker = time.NewTicker(interval)
18+
q.schedulerDone = make(chan struct{})
19+
20+
go func() {
21+
for {
22+
select {
23+
case <-q.schedulerTicker.C:
24+
q.processScheduledJobs()
25+
case <-q.schedulerDone:
26+
return
27+
}
28+
}
29+
}()
30+
31+
q.formatLog(LoggerInfo, "Scheduler started with %v interval", interval)
32+
}
33+
34+
// stopScheduler stops the scheduler gracefully.
35+
func (q *Queue) stopScheduler() {
36+
if q.schedulerTicker != nil {
37+
q.schedulerTicker.Stop()
38+
close(q.schedulerDone)
39+
q.formatLog(LoggerInfo, "Scheduler stopped")
40+
}
41+
}
42+
43+
// ScheduleJob adds a job to the scheduled set with the given run time.
44+
// The job will be executed when the current time reaches or exceeds runAt.
45+
func (q *Queue) ScheduleJob(jobId string, runAt time.Time) error {
46+
score := float64(runAt.Unix())
47+
_, err := q.client.ZAdd(q.ctx, q.schedulerKey, redis.Z{
48+
Score: score,
49+
Member: jobId,
50+
}).Result()
51+
if err != nil {
52+
return fmt.Errorf("failed to schedule job: %w", err)
53+
}
54+
q.formatLog(LoggerInfo, "Scheduled job %s to run at %s", jobId, runAt.Format(time.RFC3339))
55+
return nil
56+
}
57+
58+
// GetScheduledJobs retrieves all scheduled jobs with their scheduled times.
59+
func (q *Queue) GetScheduledJobs() ([]ScheduledJobInfo, error) {
60+
// Get all jobs with scores
61+
results, err := q.client.ZRangeWithScores(q.ctx, q.schedulerKey, 0, -1).Result()
62+
if err != nil {
63+
return nil, fmt.Errorf("failed to get scheduled jobs: %w", err)
64+
}
65+
66+
scheduledJobs := make([]ScheduledJobInfo, 0, len(results))
67+
for _, z := range results {
68+
jobId, ok := z.Member.(string)
69+
if !ok {
70+
continue
71+
}
72+
scheduledJobs = append(scheduledJobs, ScheduledJobInfo{
73+
JobId: jobId,
74+
RunAt: time.Unix(int64(z.Score), 0),
75+
Timestamp: int64(z.Score),
76+
})
77+
}
78+
79+
return scheduledJobs, nil
80+
}
81+
82+
// RemoveScheduledJob removes a job from the scheduled set.
83+
func (q *Queue) RemoveScheduledJob(jobId string) error {
84+
_, err := q.client.ZRem(q.ctx, q.schedulerKey, jobId).Result()
85+
if err != nil {
86+
return fmt.Errorf("failed to remove scheduled job: %w", err)
87+
}
88+
q.formatLog(LoggerInfo, "Removed scheduled job %s", jobId)
89+
return nil
90+
}
91+
92+
// processScheduledJobs checks for jobs ready to run and moves them to the waiting list.
93+
// This method is called periodically by the scheduler loop.
94+
func (q *Queue) processScheduledJobs() {
95+
now := float64(time.Now().Unix())
96+
97+
// Find all jobs with score <= current timestamp
98+
results, err := q.client.ZRangeByScoreWithScores(q.ctx, q.schedulerKey, &redis.ZRangeBy{
99+
Min: "-inf",
100+
Max: fmt.Sprintf("%f", now),
101+
}).Result()
102+
103+
if err != nil {
104+
q.formatLog(LoggerError, "Failed to get ready scheduled jobs: %v", err)
105+
return
106+
}
107+
108+
if len(results) == 0 {
109+
return
110+
}
111+
112+
// Process each ready job
113+
for _, z := range results {
114+
jobId, ok := z.Member.(string)
115+
if !ok {
116+
continue
117+
}
118+
119+
// Atomically remove from scheduled set (only one instance will succeed)
120+
removed, err := q.client.ZRem(q.ctx, q.schedulerKey, jobId).Result()
121+
if err != nil || removed == 0 {
122+
// Another instance already processed this job
123+
continue
124+
}
125+
126+
// Add job to the queue
127+
q.AddJob(AddJobOptions{
128+
Id: jobId,
129+
Data: nil, // Scheduled jobs don't have data in this implementation
130+
})
131+
132+
q.formatLog(LoggerInfo, "Moved scheduled job %s to waiting list", jobId)
133+
}
134+
}
135+
136+
// ScheduledJobInfo contains information about a scheduled job.
137+
type ScheduledJobInfo struct {
138+
JobId string
139+
RunAt time.Time
140+
Timestamp int64
141+
}

0 commit comments

Comments
 (0)