Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ jobs:
- 6379:6379

steps:
- name: Set up Go 1.13
uses: actions/setup-go@v2
- name: Set up Go
uses: actions/setup-go@v6
with:
go-version: 1.13
go-version: 1.26
id: go

- name: Checkout code
uses: actions/checkout@v2
uses: actions/checkout@v6

- name: Test
run: make test
28 changes: 15 additions & 13 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
module github.com/digitalocean/go-workers2

go 1.21

require (
github.com/bitly/go-simplejson v0.5.0
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/go-redis/redis/v8 v8.4.4
github.com/golang/protobuf v1.4.3 // indirect
github.com/google/uuid v1.1.4
github.com/kr/text v0.2.0 // indirect
github.com/nxadm/tail v1.4.6 // indirect
github.com/redis/go-redis/v9 v9.18.0
github.com/spf13/cobra v1.1.1
github.com/stretchr/testify v1.6.1
golang.org/x/net v0.0.0-20201224014010-6772e930b67b // indirect
golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 // indirect
golang.org/x/text v0.3.4 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/protobuf v1.25.0 // indirect
)

require (
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.uber.org/atomic v1.11.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210105161348-2e78108cf5f8 // indirect
)

go 1.13
97 changes: 14 additions & 83 deletions go.sum

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"time"

"github.com/digitalocean/go-workers2/storage"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
)

// Manager coordinates work, workers, and signaling needed for job processing
Expand Down Expand Up @@ -61,6 +61,7 @@ func NewManager(options Options) (*Manager, error) {
}

// GetRedisClient returns the Redis client used by the manager
// Deprecated: the Redis client is an internal implementation and access will be removed
func (m *Manager) GetRedisClient() *redis.Client {
return m.opts.client
}
Expand Down
19 changes: 9 additions & 10 deletions manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

"github.com/digitalocean/go-workers2/storage"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -38,10 +38,10 @@ func TestNewManagerWithRedisClient(t *testing.T) {
}

client := redis.NewClient(&redis.Options{
IdleTimeout: 1,
Password: "ab",
DB: 2,
TLSConfig: &tls.Config{ServerName: "test_tls2"},
ConnMaxIdleTime: 1,
Password: "ab",
DB: 2,
TLSConfig: &tls.Config{ServerName: "test_tls2"},
})

mgr, err := NewManagerWithRedisClient(opts, client)
Expand All @@ -62,10 +62,10 @@ func TestNewManagerWithRedisClientNoProcessID(t *testing.T) {
}

client := redis.NewClient(&redis.Options{
IdleTimeout: 1,
Password: "ab",
DB: 2,
TLSConfig: &tls.Config{ServerName: "test_tls2"},
ConnMaxIdleTime: 1,
Password: "ab",
DB: 2,
TLSConfig: &tls.Config{ServerName: "test_tls2"},
})

mgr, err := NewManagerWithRedisClient(opts, client)
Expand Down Expand Up @@ -490,7 +490,6 @@ type testPrioritizedActiveManagerConfig struct {
waitGroup sync.WaitGroup
assertHeartbeat chan bool
assertedHeartbeat bool
assertedActivate bool
}

func TestManager_Run_PrioritizedActiveManager(t *testing.T) {
Expand Down
28 changes: 14 additions & 14 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"time"

"github.com/digitalocean/go-workers2/storage"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
)

const (
Expand Down Expand Up @@ -80,26 +80,26 @@ func processOptions(options Options) (Options, error) {

if options.ServerAddr != "" {
options.client = redis.NewClient(&redis.Options{
IdleTimeout: redisIdleTimeout,
Password: options.Password,
DB: options.Database,
PoolSize: options.PoolSize,
Addr: options.ServerAddr,
TLSConfig: options.RedisTLSConfig,
ConnMaxIdleTime: redisIdleTimeout,
Password: options.Password,
DB: options.Database,
PoolSize: options.PoolSize,
Addr: options.ServerAddr,
TLSConfig: options.RedisTLSConfig,
})
} else if options.SentinelAddrs != "" {
if options.RedisMasterName == "" {
return Options{}, errors.New("Sentinel configuration requires a master name")
}

options.client = redis.NewFailoverClient(&redis.FailoverOptions{
IdleTimeout: redisIdleTimeout,
Password: options.Password,
DB: options.Database,
PoolSize: options.PoolSize,
SentinelAddrs: strings.Split(options.SentinelAddrs, ","),
MasterName: options.RedisMasterName,
TLSConfig: options.RedisTLSConfig,
ConnMaxIdleTime: redisIdleTimeout,
Password: options.Password,
DB: options.Database,
PoolSize: options.PoolSize,
SentinelAddrs: strings.Split(options.SentinelAddrs, ","),
MasterName: options.RedisMasterName,
TLSConfig: options.RedisTLSConfig,
})
} else {
return Options{}, errors.New("Options requires either the Server or Sentinels option")
Expand Down
2 changes: 1 addition & 1 deletion producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"io"
"time"

"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
)

const (
Expand Down
18 changes: 9 additions & 9 deletions producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"testing"

"github.com/digitalocean/go-workers2/storage"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -180,10 +180,10 @@ func TestNewProducerWithRedisClient(t *testing.T) {
}

client := redis.NewClient(&redis.Options{
IdleTimeout: 1,
Password: "ab",
DB: 2,
TLSConfig: &tls.Config{ServerName: "test_tls3"},
ConnMaxIdleTime: 1,
Password: "ab",
DB: 2,
TLSConfig: &tls.Config{ServerName: "test_tls3"},
})

producer, err := NewProducerWithRedisClient(opts, client)
Expand All @@ -203,10 +203,10 @@ func TestNewProducerWithRedisClientNoProcessID(t *testing.T) {
}

client := redis.NewClient(&redis.Options{
IdleTimeout: 1,
Password: "ab",
DB: 2,
TLSConfig: &tls.Config{ServerName: "test_tls2"},
ConnMaxIdleTime: 1,
Password: "ab",
DB: 2,
TLSConfig: &tls.Config{ServerName: "test_tls2"},
})

mgr, err := NewProducerWithRedisClient(opts, client)
Expand Down
8 changes: 4 additions & 4 deletions scheduled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"testing"

"github.com/digitalocean/go-workers2/storage"
"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
)

Expand All @@ -25,9 +25,9 @@ func TestScheduled(t *testing.T) {
message2, _ := NewMsg("{\"queue\":\"myqueue\",\"foo\":\"bar2\"}")
message3, _ := NewMsg("{\"queue\":\"default\",\"foo\":\"bar3\"}")

rc.ZAdd(ctx, retryQueue(opts.Namespace), &redis.Z{Score: now - 60.0, Member: message1.ToJson()}).Result()
rc.ZAdd(ctx, retryQueue(opts.Namespace), &redis.Z{Score: now - 10.0, Member: message2.ToJson()}).Result()
rc.ZAdd(ctx, retryQueue(opts.Namespace), &redis.Z{Score: now + 60.0, Member: message3.ToJson()}).Result()
rc.ZAdd(ctx, retryQueue(opts.Namespace), redis.Z{Score: now - 60.0, Member: message1.ToJson()}).Result()
rc.ZAdd(ctx, retryQueue(opts.Namespace), redis.Z{Score: now - 10.0, Member: message2.ToJson()}).Result()
rc.ZAdd(ctx, retryQueue(opts.Namespace), redis.Z{Score: now + 60.0, Member: message3.ToJson()}).Result()

scheduled.poll()

Expand Down
12 changes: 4 additions & 8 deletions storage/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"strconv"
"time"

"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
)

type redisStore struct {
Expand Down Expand Up @@ -168,10 +168,6 @@ func (r *redisStore) SendHeartbeat(ctx context.Context, heartbeat *Heartbeat) er
return nil
}

func (r *redisStore) getTaskRunnerID(pid int, tid string) string {
return fmt.Sprintf("%d-%s", pid, tid)
}

func (r *redisStore) RequeueMessagesFromInProgressQueue(ctx context.Context, inprogressQueue, queue string) ([]string, error) {
var requeuedMsgs []string
for {
Expand Down Expand Up @@ -208,7 +204,7 @@ func (r *redisStore) RemoveHeartbeat(ctx context.Context, heartbeatID string) er
}

func (r *redisStore) EnqueueMessage(ctx context.Context, queue string, priority float64, message string) error {
_, err := r.client.ZAdd(ctx, r.getQueueName(queue), &redis.Z{
_, err := r.client.ZAdd(ctx, r.getQueueName(queue), redis.Z{
Score: priority,
Member: message,
}).Result()
Expand All @@ -217,7 +213,7 @@ func (r *redisStore) EnqueueMessage(ctx context.Context, queue string, priority
}

func (r *redisStore) EnqueueScheduledMessage(ctx context.Context, priority float64, message string) error {
_, err := r.client.ZAdd(ctx, r.namespace+ScheduledJobsKey, &redis.Z{
_, err := r.client.ZAdd(ctx, r.namespace+ScheduledJobsKey, redis.Z{
Score: priority,
Member: message,
}).Result()
Expand Down Expand Up @@ -256,7 +252,7 @@ func (r *redisStore) DequeueScheduledMessage(ctx context.Context, priority float
}

func (r *redisStore) EnqueueRetriedMessage(ctx context.Context, priority float64, message string) error {
_, err := r.client.ZAdd(ctx, r.namespace+RetryKey, &redis.Z{
_, err := r.client.ZAdd(ctx, r.namespace+RetryKey, redis.Z{
Score: priority,
Member: message,
}).Result()
Expand Down
Loading