From b79c1e4da7f7246d626ba22e889dac2d1519a28f Mon Sep 17 00:00:00 2001 From: atauov Date: Wed, 25 Feb 2026 20:21:38 +0500 Subject: [PATCH] on active -> passive transition is to immediately block generation of new WAL commit/ops messages --- statefun/cache/cache.go | 15 +++++++++++++++ statefun/runtime.go | 5 +++++ 2 files changed, 20 insertions(+) diff --git a/statefun/cache/cache.go b/statefun/cache/cache.go index 1c99e6d6..ca27b2c0 100644 --- a/statefun/cache/cache.go +++ b/statefun/cache/cache.go @@ -295,6 +295,9 @@ type Store struct { transactionsMutex *sync.Mutex getKeysByPatternFromKVMutex *sync.Mutex + // walWriteEnabled - true for active instance + // only active instances can write to WAL streams + walWriteEnabled atomic.Bool transactionGenerator TransactionGenerator //write barrier state @@ -499,6 +502,9 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo cs.ctx, cs.cancel = context.WithCancel(ctx) + // default - can not publish to WAL + cs.walWriteEnabled.Store(false) + storeUpdatesHandler := func(cs *Store) { system.GlobalPrometrics.GetRoutinesCounter().Started("cache.storeUpdatesHandler") defer system.GlobalPrometrics.GetRoutinesCounter().Stopped("cache.storeUpdatesHandler") @@ -597,6 +603,11 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo time.Sleep(100 * time.Millisecond) continue } + if !cs.walWriteEnabled.Load() { + le.Debugf(ctx, "WAL writes are disabled, skip this iteration") + time.Sleep(100 * time.Millisecond) + continue + } if shutdownStatus == shutdownStatusNone { barrierTime := system.GetCurrentTimeNs() @@ -1229,4 +1240,8 @@ func (cs *Store) SetTransactionGenerator(tg TransactionGenerator) { cs.transactionGenerator = tg } +func (cs *Store) SetWALWriteEnabled(enabled bool) { + cs.walWriteEnabled.Store(enabled) +} + // ----------------------------------- diff --git a/statefun/runtime.go b/statefun/runtime.go index 33ca256b..300941bb 100644 --- a/statefun/runtime.go +++ b/statefun/runtime.go @@ -259,6 +259,9 @@ func (r *Runtime) Start(ctx context.Context, cacheConfig *cache.Config) error { r.config.isActiveInstance = true } + // if active - can publish to WAL, passive - can not + r.Domain.Cache().SetWALWriteEnabled(r.config.isActiveInstance) + // Handle single-instance functions. singleInstanceFunctionRevisions := make(map[string]uint64) if err := r.handleSingleInstanceFunctions(r.gs.ctxPhaseThree, singleInstanceFunctionRevisions); err != nil { @@ -602,6 +605,7 @@ func (r *Runtime) singleInstanceFunctionLocksUpdater(ctx context.Context, revisi r.config.isActiveInstance = false r.config.activeRevID = 0 r.activeInstanceMu.Unlock() + r.Domain.Cache().SetWALWriteEnabled(false) r.stopFunctionSubscriptions(ctx) if r.afterStartRunning.Load() { r.gs.cancelPhaseOne() @@ -648,6 +652,7 @@ func (r *Runtime) singleInstanceFunctionLocksUpdater(ctx context.Context, revisi r.activeInstanceMu.Lock() r.config.isActiveInstance = true r.activeInstanceMu.Unlock() + r.Domain.Cache().SetWALWriteEnabled(true) r.gs.resetPhaseOneCtx() r.afterStartRunning.Store(false) subscribeRequired = true