Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions all_specs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ func setupTestOptionsWithNamespace(namespace string) (Options, error) {

// testOptionsWithNamespace builds an Options value for tests, pointing at a
// local redis on database 15, with the given namespace and failover disabled.
func testOptionsWithNamespace(namespace string) Options {
	// NOTE(review): the pasted diff contained both the pre- and post-change
	// field lists; only the post-change set (with FailoverStrategy) is kept,
	// since duplicate struct-literal fields would not compile.
	return Options{
		ServerAddr:       "localhost:6379",
		ProcessID:        "1",
		Database:         15,
		PoolSize:         1,
		Namespace:        namespace,
		FailoverStrategy: NoFailover,
	}
}

Expand Down
36 changes: 31 additions & 5 deletions fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"os"
"sync"
"time"

"github.com/digitalocean/go-workers2/storage"
Expand All @@ -16,6 +17,7 @@ type Fetcher interface {
Fetch()
Acknowledge(*Msg)
Ready() chan bool
SetActive(bool)
Messages() chan *Msg
Close()
Closed() bool
Expand All @@ -25,24 +27,30 @@ type simpleFetcher struct {
store storage.Store
processID string
queue string
active bool
lock sync.Mutex
ready chan bool
messages chan *Msg
stop chan bool
exit chan bool
closed chan bool
logger *log.Logger

messages chan *Msg
stop chan bool
exit chan bool
closed chan bool
logger *log.Logger
}

func newSimpleFetcher(queue string, opts Options) *simpleFetcher {
logger := opts.Logger
if logger == nil {
logger = log.New(os.Stdout, "go-workers2: ", log.Ldate|log.Lmicroseconds)
}
// always active if failover strategy is not active/passive failover
active := opts.FailoverStrategy != ActivePassiveFailover

return &simpleFetcher{
store: opts.store,
processID: opts.ProcessID,
queue: queue,
active: active,
ready: make(chan bool),
messages: make(chan *Msg),
stop: make(chan bool),
Expand Down Expand Up @@ -92,6 +100,12 @@ func (f *simpleFetcher) Fetch() {
}

func (f *simpleFetcher) tryFetchMessage() {
f.lock.Lock()
if !f.active {
f.lock.Unlock()
return
}
f.lock.Unlock()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not defer the unlock?

message, err := f.store.DequeueMessage(context.Background(), f.queue, f.inprogressQueue(), 1*time.Second)
if err != nil {
// If redis returns null, the queue is empty.
Expand Down Expand Up @@ -127,6 +141,12 @@ func (f *simpleFetcher) Ready() chan bool {
return f.ready
}

// SetActive marks the fetcher active or inactive under the fetcher's mutex;
// the flag is read by fetch-side methods before dequeueing work.
func (f *simpleFetcher) SetActive(active bool) {
	f.lock.Lock()
	f.active = active
	f.lock.Unlock()
}

func (f *simpleFetcher) Close() {
f.stop <- true
<-f.exit
Expand All @@ -142,6 +162,12 @@ func (f *simpleFetcher) Closed() bool {
}

func (f *simpleFetcher) inprogressMessages() []string {
f.lock.Lock()
if !f.active {
f.lock.Unlock()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

return nil
}
f.lock.Unlock()
messages, err := f.store.ListMessages(context.Background(), f.inprogressQueue())
if err != nil {
f.logger.Println("ERR: ", err)
Expand Down
71 changes: 71 additions & 0 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@ import (
"log"
"os"
"sync"
"time"

"github.com/go-redis/redis/v8"
"github.com/google/uuid"
)

const (
// updateActiveClusterInterval is how often a manager refreshes its
// cluster registration and re-evaluates which cluster is active.
updateActiveClusterInterval = 60 * time.Second
// evictInterval is the (negative) offset added to the store's current
// time to compute the heartbeat cutoff for evicting expired clusters.
evictInterval = -90 * time.Second
)

// Manager coordinates work, workers, and signaling needed for job processing
type Manager struct {
uuid string
Expand All @@ -19,6 +25,8 @@ type Manager struct {
lock sync.Mutex
signal chan os.Signal
running bool
stop chan bool
active bool
logger *log.Logger

beforeStartHooks []func()
Expand Down Expand Up @@ -126,6 +134,17 @@ func (m *Manager) Run() {
wg.Done()
}()

if m.opts.FailoverStrategy == ActivePassiveFailover {
wg.Add(1)
go func() {
err := m.updateActiveClusterByTimeInterval()
if err != nil {
m.logger.Println("ERR: ", m.uuid, err)
}
wg.Done()
}()
}

wg.Add(len(m.workers))
for i := range m.workers {
w := m.workers[i]
Expand Down Expand Up @@ -165,6 +184,7 @@ func (m *Manager) Stop() {
for _, h := range m.duringDrainHooks {
h()
}
m.stop <- true
m.stopSignalHandler()
}

Expand Down Expand Up @@ -224,6 +244,57 @@ func (m *Manager) GetStats() (Stats, error) {
return stats, nil
}

// updateActiveClusterByTimeInterval periodically refreshes this manager's
// cluster registration. It returns nil when the stop channel fires or the
// manager is no longer running, and returns the first error encountered by
// UpdateActiveCluster.
func (m *Manager) updateActiveClusterByTimeInterval() error {
	ticker := time.NewTicker(updateActiveClusterInterval)
	defer ticker.Stop()
	for {
		select {
		case <-m.stop:
			return nil
		case <-ticker.C:
			// Snapshot the running flag under the lock, then act on it.
			m.lock.Lock()
			running := m.running
			m.lock.Unlock()
			if !running {
				return nil
			}
			if err := m.UpdateActiveCluster(); err != nil {
				return err
			}
		}
	}
}

// UpdateActiveCluster registers this manager as a live cluster member, evicts
// clusters whose heartbeat has expired, and — when this manager's active
// status changed — notifies every worker so fetchers start or stop dequeueing.
func (m *Manager) UpdateActiveCluster() error {
	ctx := context.Background()

	// Use the store's clock so all managers agree on "now".
	now, err := m.opts.store.GetTime(ctx)
	if err != nil {
		return err
	}
	if err = m.opts.store.EvictExpiredClusters(ctx, now.Add(evictInterval).Unix()); err != nil {
		return err
	}
	if err = m.opts.store.AddActiveCluster(ctx, m.uuid, m.opts.ClusterPriority); err != nil {
		return err
	}

	activeClusterUUID, err := m.opts.store.GetActiveClusterName(ctx)
	if err != nil {
		// BUG FIX: this error was previously ignored, letting a failed
		// lookup silently toggle workers based on an empty UUID.
		return err
	}
	activeManager := m.uuid == activeClusterUUID

	m.lock.Lock()
	defer m.lock.Unlock()
	// Only signal workers on a state transition (equivalent to the original
	// (active && !activeManager) || (!active && activeManager)).
	if m.active != activeManager {
		m.active = activeManager
		// NOTE(review): these sends happen while holding m.lock; if a worker
		// is not draining Active(), this blocks — confirm workers always
		// receive promptly.
		for _, worker := range m.workers {
			worker.Active() <- activeManager
		}
	}
	return nil
}

// GetRetries returns the set of retry jobs for the manager
func (m *Manager) GetRetries(page uint64, pageSize int64, match string) (Retries, error) {
// TODO: add back pagination and filtering
Expand Down
15 changes: 15 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ import (
"github.com/go-redis/redis/v8"
)

// FailoverStrategy selects how managers coordinate which cluster actively
// processes jobs.
type FailoverStrategy int

const (
// NoFailover disables failover coordination; the zero value and default.
NoFailover FailoverStrategy = iota
// ActivePassiveFailover makes only the active cluster's fetchers dequeue
// work; other clusters stand by until promoted.
ActivePassiveFailover
// ActiveActiveFailover is declared but not referenced in the visible
// code — semantics TBD; presumably multiple clusters process at once.
ActiveActiveFailover
)

// Options contains the set of configuration options for a manager and/or producer
type Options struct {
ProcessID string
Expand All @@ -30,6 +38,13 @@ type Options struct {
// Optional display name used when displaying manager stats
ManagerDisplayName string

FailoverStrategy FailoverStrategy

// One or more managers can belong to a cluster
// Active/passive failover will failover by cluster
ClusterName string
ClusterPriority float64

// Log
Logger *log.Logger

Expand Down
45 changes: 45 additions & 0 deletions storage/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,48 @@ func (r *redisStore) IncrementStats(ctx context.Context, metric string) error {
// getQueueName returns the namespaced redis key for the given queue.
func (r *redisStore) getQueueName(queue string) string {
return r.namespace + "queue:" + queue
}

// GetTime returns the current time as reported by the redis server,
// giving every manager a shared clock for heartbeat/eviction math.
func (r *redisStore) GetTime(ctx context.Context) (time.Time, error) {
return r.client.Time(ctx).Result()
}

// AddActiveCluster records managerUUID in the two cluster tracking sorted
// sets: "managers-active-ts" scored by the store's current unix time (the
// heartbeat), and "managers-active" scored by the caller-supplied priority.
func (r *redisStore) AddActiveCluster(ctx context.Context, managerUUID string, managerPriority float64) error {
	now, err := r.GetTime(ctx)
	if err != nil {
		return err
	}
	heartbeat := &redis.Z{Member: managerUUID, Score: float64(now.Unix())}
	if err := r.client.ZAdd(ctx, r.namespace+"managers-active-ts", heartbeat).Err(); err != nil {
		return err
	}
	priority := &redis.Z{Member: managerUUID, Score: managerPriority}
	return r.client.ZAdd(ctx, r.namespace+"managers-active", priority).Err()
}

// EvictExpiredClusters removes clusters whose last heartbeat is strictly
// older than expireTS from both tracking sorted sets.
func (r *redisStore) EvictExpiredClusters(ctx context.Context, expireTS int64) error {
	cutoff := fmt.Sprintf("(%d", expireTS) // "(" makes the bound exclusive
	expired, err := r.client.ZRangeByScore(ctx, r.namespace+"managers-active-ts", &redis.ZRangeBy{Min: "-inf", Max: cutoff}).Result()
	if err != nil {
		return err
	}
	// BUG FIX: ZREM with zero members is a redis arity error; only issue
	// the removal when there is something to remove.
	if len(expired) > 0 {
		if err := r.client.ZRem(ctx, r.namespace+"managers-active", expired).Err(); err != nil {
			return err
		}
	}
	// BUG FIX: the heartbeat-set cleanup's error was previously discarded.
	return r.client.ZRemRangeByScore(ctx, r.namespace+"managers-active-ts", "-inf", cutoff).Err()
}

// GetActiveClusterName returns the UUID of the cluster currently considered
// active, or "" when no clusters are registered.
//
// NOTE(review): this selects the lowest-scored member of the heartbeat set
// ("managers-active-ts"); presumably selection should consult the priority
// set ("managers-active") instead — confirm the intended ordering.
func (r *redisStore) GetActiveClusterName(ctx context.Context) (string, error) {
	activeManagerUUIDs, err := r.client.ZRangeByScore(ctx, r.namespace+"managers-active-ts", &redis.ZRangeBy{Min: "-inf", Max: "+inf", Offset: 0, Count: 1}).Result()
	if err != nil {
		return "", err
	}
	if len(activeManagerUUIDs) == 0 {
		return "", nil
	}
	// BUG FIX: Count is 1, so the slice holds at most one element; indexing
	// [1] panicked with index-out-of-range whenever a cluster existed.
	return activeManagerUUIDs[0], nil
}
7 changes: 7 additions & 0 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type Retries struct {

// Store is the interface for storing and retrieving data
type Store interface {
// Worker manager cluster operations
AddActiveCluster(ctx context.Context, managerUUID string, managerPriority float64) error
EvictExpiredClusters(ctx context.Context, expireTS int64) error
GetActiveClusterName(ctx context.Context) (string, error)

// General queue operations
CreateQueue(ctx context.Context, queue string) error
Expand All @@ -59,4 +63,7 @@ type Store interface {

// Retries
GetAllRetries(ctx context.Context) (*Retries, error)

// Misc
GetTime(ctx context.Context) (time.Time, error)
}
8 changes: 8 additions & 0 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type worker struct {
runnersLock sync.Mutex
stop chan bool
running bool
active chan bool
logger *log.Logger
}

Expand All @@ -25,6 +26,7 @@ func newWorker(logger *log.Logger, queue string, concurrency int, handler JobFun
handler: handler,
concurrency: concurrency,
stop: make(chan bool),
active: make(chan bool),
logger: logger,
}
return w
Expand Down Expand Up @@ -84,12 +86,18 @@ func (w *worker) start(fetcher Fetcher) {
}
w.runnersLock.Unlock()
}
case active := <-w.Active():
fetcher.SetActive(active)
case <-exit:
return
}
}
}

// Active returns the channel used to toggle this worker's fetcher between
// active and inactive (drained in the worker's start loop via SetActive).
func (w *worker) Active() chan bool {
return w.active
}

func (w *worker) quit() {
w.runnersLock.Lock()
defer w.runnersLock.Unlock()
Expand Down
16 changes: 9 additions & 7 deletions worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,22 @@ import (
type dummyFetcher struct {
queue func() string
fetch func()
setActive func(bool)
acknowledge func(*Msg)
ready func() chan bool
messages func() chan *Msg
close func()
closed func() bool
}

// dummyFetcher's Fetcher implementation: each method simply delegates to the
// corresponding stubbed func field so tests can inject behavior per call.
// NOTE(review): the pasted diff retained both the pre- and post-change method
// sets; only the post-change set (including SetActive) is kept, since
// duplicate method definitions would not compile.
func (d dummyFetcher) Queue() string         { return d.queue() }
func (d dummyFetcher) Fetch()                { d.fetch() }
func (d dummyFetcher) Acknowledge(m *Msg)    { d.acknowledge(m) }
func (d dummyFetcher) Ready() chan bool      { return d.ready() }
func (d dummyFetcher) SetActive(active bool) { d.setActive(active) }
func (d dummyFetcher) Messages() chan *Msg   { return d.messages() }
func (d dummyFetcher) Close()                { d.close() }
func (d dummyFetcher) Closed() bool          { return d.closed() }

func TestNewWorker(t *testing.T) {
testLogger := log.New(os.Stdout, "test-go-workers2: ", log.Ldate|log.Lmicroseconds)
Expand Down