diff --git a/all_specs_test.go b/all_specs_test.go
index 2d01be8..4b0c0ef 100644
--- a/all_specs_test.go
+++ b/all_specs_test.go
@@ -16,11 +16,12 @@ func setupTestOptionsWithNamespace(namespace string) (Options, error) {
 
 func testOptionsWithNamespace(namespace string) Options {
 	return Options{
-		ServerAddr: "localhost:6379",
-		ProcessID:  "1",
-		Database:   15,
-		PoolSize:   1,
-		Namespace:  namespace,
+		ServerAddr:       "localhost:6379",
+		ProcessID:        "1",
+		Database:         15,
+		PoolSize:         1,
+		Namespace:        namespace,
+		FailoverStrategy: NoFailover,
 	}
 }
 
diff --git a/fetcher.go b/fetcher.go
index 6ba92ff..0679d45 100644
--- a/fetcher.go
+++ b/fetcher.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"log"
 	"os"
+	"sync"
 	"time"
 
 	"github.com/digitalocean/go-workers2/storage"
@@ -16,6 +17,7 @@ type Fetcher interface {
 	Fetch()
 	Acknowledge(*Msg)
 	Ready() chan bool
+	SetActive(bool)
 	Messages() chan *Msg
 	Close()
 	Closed() bool
@@ -25,12 +27,15 @@ type simpleFetcher struct {
 	store     storage.Store
 	processID string
 	queue     string
+	active    bool
+	lock      sync.Mutex
 	ready     chan bool
-	messages  chan *Msg
-	stop      chan bool
-	exit      chan bool
-	closed    chan bool
-	logger    *log.Logger
+
+	messages chan *Msg
+	stop     chan bool
+	exit     chan bool
+	closed   chan bool
+	logger   *log.Logger
 }
 
 func newSimpleFetcher(queue string, opts Options) *simpleFetcher {
@@ -38,11 +43,14 @@ func newSimpleFetcher(queue string, opts Options) *simpleFetcher {
 	if logger == nil {
 		logger = log.New(os.Stdout, "go-workers2: ", log.Ldate|log.Lmicroseconds)
 	}
+	// always active if failover strategy is not active/passive failover
+	active := opts.FailoverStrategy != ActivePassiveFailover
 
 	return &simpleFetcher{
 		store:     opts.store,
 		processID: opts.ProcessID,
 		queue:     queue,
+		active:    active,
 		ready:     make(chan bool),
 		messages:  make(chan *Msg),
 		stop:      make(chan bool),
@@ -92,6 +100,12 @@ func (f *simpleFetcher) Fetch() {
 }
 
 func (f *simpleFetcher) tryFetchMessage() {
+	f.lock.Lock()
+	if !f.active {
+		f.lock.Unlock()
+		return
+	}
+	f.lock.Unlock()
 	message, err := f.store.DequeueMessage(context.Background(), f.queue, f.inprogressQueue(), 1*time.Second)
 	if err != nil {
 		// If redis returns null, the queue is empty.
@@ -127,6 +141,14 @@ func (f *simpleFetcher) Ready() chan bool {
 	return f.ready
 }
 
+// SetActive enables or disables fetching; while inactive, tryFetchMessage and
+// inprogressMessages return without contacting the store.
+func (f *simpleFetcher) SetActive(active bool) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+	f.active = active
+}
+
 func (f *simpleFetcher) Close() {
 	f.stop <- true
 	<-f.exit
@@ -142,6 +164,12 @@ func (f *simpleFetcher) Closed() bool {
 }
 
 func (f *simpleFetcher) inprogressMessages() []string {
+	f.lock.Lock()
+	if !f.active {
+		f.lock.Unlock()
+		return nil
+	}
+	f.lock.Unlock()
 	messages, err := f.store.ListMessages(context.Background(), f.inprogressQueue())
 	if err != nil {
 		f.logger.Println("ERR: ", err)
diff --git a/manager.go b/manager.go
index a0bda55..7df3436 100644
--- a/manager.go
+++ b/manager.go
@@ -5,11 +5,17 @@ import (
 	"log"
 	"os"
 	"sync"
+	"time"
 
 	"github.com/go-redis/redis/v8"
 	"github.com/google/uuid"
 )
 
+const (
+	updateActiveClusterInterval = 60 * time.Second  // period of the active/passive update loop
+	evictInterval               = -90 * time.Second // negative: added to "now" to get the eviction cutoff
+)
+
 // Manager coordinates work, workers, and signaling needed for job processing
 type Manager struct {
 	uuid     string
@@ -19,6 +25,8 @@ type Manager struct {
 	lock     sync.Mutex
 	signal   chan os.Signal
 	running  bool
+	stop     chan bool
+	active   bool
 	logger   *log.Logger
 
 	beforeStartHooks []func()
@@ -126,6 +134,17 @@ func (m *Manager) Run() {
 		wg.Done()
 	}()
 
+	if m.opts.FailoverStrategy == ActivePassiveFailover {
+		wg.Add(1)
+		go func() {
+			err := m.updateActiveClusterByTimeInterval()
+			if err != nil {
+				m.logger.Println("ERR: ", m.uuid, err)
+			}
+			wg.Done()
+		}()
+	}
+
 	wg.Add(len(m.workers))
 	for i := range m.workers {
 		w := m.workers[i]
@@ -165,6 +184,13 @@ func (m *Manager) Stop() {
 	for _, h := range m.duringDrainHooks {
 		h()
 	}
+	// Only the active/passive updater goroutine receives from m.stop, so an
+	// unconditional send would block Stop forever under any other strategy.
+	// NOTE(review): no hunk of this patch initialises m.stop — confirm it is
+	// created before Run, since a send on a nil channel never completes.
+	if m.opts.FailoverStrategy == ActivePassiveFailover {
+		m.stop <- true
+	}
 	m.stopSignalHandler()
 }
 
@@ -224,6 +250,67 @@ func (m *Manager) GetStats() (Stats, error) {
 	return stats, nil
 }
 
+// updateActiveClusterByTimeInterval calls UpdateActiveCluster on every tick of
+// updateActiveClusterInterval. It returns nil once a value arrives on m.stop
+// or the manager is no longer running, and returns the first update error.
+func (m *Manager) updateActiveClusterByTimeInterval() error {
+	ticker := time.NewTicker(updateActiveClusterInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-m.stop:
+			return nil
+		case <-ticker.C:
+			m.lock.Lock()
+			running := m.running
+			m.lock.Unlock()
+			if !running {
+				return nil
+			}
+			if err := m.UpdateActiveCluster(); err != nil {
+				return err
+			}
+		}
+	}
+}
+
+// UpdateActiveCluster asks the store to evict clusters that expired before
+// now+evictInterval (store time), registers this manager with its
+// ClusterPriority, and looks up the active manager. If this manager's active
+// state changed, the new state is sent to every worker.
+func (m *Manager) UpdateActiveCluster() error {
+	ctx := context.Background()
+	now, err := m.opts.store.GetTime(ctx)
+	if err != nil {
+		return err
+	}
+	err = m.opts.store.EvictExpiredClusters(ctx, now.Add(evictInterval).Unix())
+	if err != nil {
+		return err
+	}
+	err = m.opts.store.AddActiveCluster(ctx, m.uuid, m.opts.ClusterPriority)
+	if err != nil {
+		return err
+	}
+	activeClusterUUID, err := m.opts.store.GetActiveClusterName(ctx)
+	if err != nil {
+		// A failed lookup must not be read as "another manager is active".
+		return err
+	}
+	activeManager := m.uuid == activeClusterUUID
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	if m.active != activeManager {
+		m.active = activeManager
+		// NOTE(review): each send blocks until the worker's start loop
+		// receives it, and m.lock is held meanwhile — confirm this is safe.
+		for _, worker := range m.workers {
+			worker.Active() <- activeManager
+		}
+	}
+	return nil
+}
+
 // GetRetries returns the set of retry jobs for the manager
 func (m *Manager) GetRetries(page uint64, pageSize int64, match string) (Retries, error) {
 	// TODO: add back pagination and filtering
diff --git a/options.go b/options.go
index f814d87..03ae692 100644
--- a/options.go
+++ b/options.go
@@ -12,6 +12,18 @@ import (
 	"github.com/go-redis/redis/v8"
 )
 
+// FailoverStrategy selects the failover behaviour of a manager.
+type FailoverStrategy int
+
+// Failover strategies. NoFailover is the zero value. With
+// ActivePassiveFailover, fetchers start inactive and Manager.Run starts the
+// periodic UpdateActiveCluster loop; the other values leave fetchers active.
+const (
+	NoFailover FailoverStrategy = iota
+	ActivePassiveFailover
+	ActiveActiveFailover
+)
+
 // Options contains the set of configuration options for a manager and/or producer
 type Options struct {
 	ProcessID string
@@ -30,6 +42,15 @@ type Options struct {
 	// Optional display name used when displaying manager stats
 	ManagerDisplayName string
 
+	FailoverStrategy FailoverStrategy
+
+	// One or more managers can belong to a cluster
+	// Active/passive failover will failover by cluster
+	// NOTE(review): this patch never reads ClusterName; managers register
+	// under their own uuid — confirm the intended election key.
+	ClusterName     string
+	ClusterPriority float64
+
 	// Log
 	Logger *log.Logger
 
diff --git
a/storage/redis.go b/storage/redis.go
index 3e4b8f5..75c08cb 100644
--- a/storage/redis.go
+++ b/storage/redis.go
@@ -230,3 +230,62 @@ func (r *redisStore) IncrementStats(ctx context.Context, metric string) error {
 func (r *redisStore) getQueueName(queue string) string {
 	return r.namespace + "queue:" + queue
 }
+
+// GetTime returns the current time as reported by the Redis server.
+func (r *redisStore) GetTime(ctx context.Context) (time.Time, error) {
+	return r.client.Time(ctx).Result()
+}
+
+// AddActiveCluster registers managerUUID: it stores the current Redis server
+// time in the "managers-active-ts" sorted set and managerPriority in the
+// "managers-active" sorted set (both keys are prefixed with the namespace).
+func (r *redisStore) AddActiveCluster(ctx context.Context, managerUUID string, managerPriority float64) error {
+	now, err := r.GetTime(ctx)
+	if err != nil {
+		return err
+	}
+	_, err = r.client.ZAdd(ctx, r.namespace+"managers-active-ts", &redis.Z{Member: managerUUID, Score: float64(now.Unix())}).Result()
+	if err != nil {
+		return err
+	}
+	_, err = r.client.ZAdd(ctx, r.namespace+"managers-active", &redis.Z{Member: managerUUID, Score: managerPriority}).Result()
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// EvictExpiredClusters removes every manager whose registration timestamp is
+// strictly less than expireTS (Unix seconds) from both sorted sets.
+func (r *redisStore) EvictExpiredClusters(ctx context.Context, expireTS int64) error {
+	maxScore := fmt.Sprintf("(%d", expireTS) // "(" makes the upper bound exclusive
+	evictManagerUUIDs, err := r.client.ZRangeByScore(ctx, r.namespace+"managers-active-ts", &redis.ZRangeBy{Min: "-inf", Max: maxScore}).Result()
+	if err != nil {
+		return err
+	}
+	// ZREM needs at least one member; with nothing expired there is nothing to remove.
+	if len(evictManagerUUIDs) == 0 {
+		return nil
+	}
+	_, err = r.client.ZRem(ctx, r.namespace+"managers-active", evictManagerUUIDs).Result()
+	if err != nil {
+		return err
+	}
+	return r.client.ZRemRangeByScore(ctx, r.namespace+"managers-active-ts", "-inf", maxScore).Err()
+}
+
+// GetActiveClusterName returns the first member of the "managers-active-ts"
+// sorted set (the manager with the lowest registration timestamp), or "" when
+// the set is empty.
+// NOTE(review): the priorities written to "managers-active" are never read
+// here — confirm whether the election should rank by priority instead.
+func (r *redisStore) GetActiveClusterName(ctx context.Context) (string, error) {
+	activeManagerUUIDs, err := r.client.ZRangeByScore(ctx, r.namespace+"managers-active-ts", &redis.ZRangeBy{Min: "-inf", Max: "+inf", Offset: 0, Count: 1}).Result()
+	if err != nil {
+		return "", err
+	}
+	if len(activeManagerUUIDs) == 0 {
+		return "", nil
+	}
+	return activeManagerUUIDs[0], nil
+}
diff --git a/storage/storage.go b/storage/storage.go
index 55bfaa7..1e3c981 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -37,6 +37,10 @@ type Retries struct {
 
 // Store is the interface for storing and retrieving data
 type Store interface {
+	// Worker manager cluster operations (used for active/passive failover)
+	AddActiveCluster(ctx context.Context, managerUUID string, managerPriority float64) error
+	EvictExpiredClusters(ctx context.Context, expireTS int64) error
+	GetActiveClusterName(ctx context.Context) (string, error)
 
 	// General queue operations
 	CreateQueue(ctx context.Context, queue string) error
@@ -59,4 +63,7 @@ type Store interface {
 
 	// Retries
 	GetAllRetries(ctx context.Context) (*Retries, error)
+
+	// Misc: current time as reported by the backing store
+	GetTime(ctx context.Context) (time.Time, error)
 }
diff --git a/worker.go b/worker.go
index 2acd434..cc154b0 100644
--- a/worker.go
+++ b/worker.go
@@ -13,6 +13,7 @@ type worker struct {
 	runnersLock sync.Mutex
 	stop        chan bool
 	running     bool
+	active      chan bool
 	logger      *log.Logger
 }
 
@@ -25,6 +26,7 @@ func newWorker(logger *log.Logger, queue string, concurrency int, handler JobFun
 		handler:     handler,
 		concurrency: concurrency,
 		stop:        make(chan bool),
+		active:      make(chan bool),
 		logger:      logger,
 	}
 	return w
@@ -84,12 +86,18 @@ func (w *worker) start(fetcher Fetcher) {
 				}
 				w.runnersLock.Unlock()
 			}
+		case active := <-w.Active():
+			fetcher.SetActive(active)
 		case <-exit:
 			return
 		}
 	}
 }
 
+func (w *worker) Active() chan bool {
+	return w.active
+}
+
 func (w *worker) quit() {
 	w.runnersLock.Lock()
 	defer w.runnersLock.Unlock()
diff --git a/worker_test.go b/worker_test.go
index 13e5328..4ac1d0c 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -12,6 +12,7 @@ import (
 type dummyFetcher struct {
 	queue       func() string
 	fetch       func()
+	setActive   func(bool)
 	acknowledge func(*Msg)
 	ready       func() chan bool
 	messages    func() chan *Msg
@@ -19,13 +20,14 @@ type dummyFetcher struct {
 	closed      func() bool
 }
 
-func (d dummyFetcher) Queue() string       { return d.queue() }
-func (d dummyFetcher) Fetch()              { d.fetch() }
-func (d dummyFetcher) Acknowledge(m *Msg)  { d.acknowledge(m) }
-func (d dummyFetcher) Ready() chan bool    { return d.ready() }
-func (d dummyFetcher) Messages() chan *Msg { return d.messages() }
-func (d dummyFetcher) Close()              { d.close() }
-func (d dummyFetcher) Closed() bool        { return d.closed() }
+func (d dummyFetcher) Queue() string         { return d.queue() }
+func (d dummyFetcher) Fetch()                { d.fetch() }
+func (d dummyFetcher) Acknowledge(m *Msg)    { d.acknowledge(m) }
+func (d dummyFetcher) Ready() chan bool      { return d.ready() }
+func (d dummyFetcher) SetActive(active bool) { d.setActive(active) }
+func (d dummyFetcher) Messages() chan *Msg   { return d.messages() }
+func (d dummyFetcher) Close()                { d.close() }
+func (d dummyFetcher) Closed() bool          { return d.closed() }
 
 func TestNewWorker(t *testing.T) {
 	testLogger := log.New(os.Stdout, "test-go-workers2: ", log.Ldate|log.Lmicroseconds)