From 4158a586627ad7a6f50bf2a9ff5e502d22bc872c Mon Sep 17 00:00:00 2001 From: atauov Date: Tue, 10 Mar 2026 15:18:37 +0500 Subject: [PATCH] fix overutil CPU - rollback storeUpdatesHandler logic + close channel check - backupBarrierStatus from cache - reduce logs --- statefun/cache/backup_write_barrier.go | 8 ++++++-- statefun/cache/cache.go | 12 +++++++++--- statefun/domain.go | 12 ++---------- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/statefun/cache/backup_write_barrier.go b/statefun/cache/backup_write_barrier.go index 4d5e53bc..5d71d7de 100644 --- a/statefun/cache/backup_write_barrier.go +++ b/statefun/cache/backup_write_barrier.go @@ -49,11 +49,14 @@ func (cs *Store) checkBackupBarrierInfoBeforeWrite(opTime int64) error { } } - lg.Logf(lg.InfoLevel, "clearBackupBarrier: backup barrier cleared") - return nil } +func (cs *Store) IsBackupBarrierActive() bool { + _, status := cs.getBackupBarrierState() + return status == BackupBarrierStatusLocked || status == BackupBarrierStatusLocking +} + func (cs *Store) getBackupBarrierState() (timestamp int64, status int32) { if cs.shouldRefreshBackupBarrier() { cs.refreshBackupBarrierFromKV() @@ -73,6 +76,7 @@ func (cs *Store) refreshBackupBarrierFromKV() { barrier, err := cs.getBackupBarrierInfo() if err != nil { lg.Logf(lg.ErrorLevel, "refreshBackupBarrierFromKV: failed to read barrier from KV: %s", err) + atomic.StoreInt64(&cs.backupBarrierLastChecked, system.GetCurrentTimeNs()) return } diff --git a/statefun/cache/cache.go b/statefun/cache/cache.go index 3395a7e0..aeb17400 100644 --- a/statefun/cache/cache.go +++ b/statefun/cache/cache.go @@ -510,11 +510,17 @@ func NewCacheStore(ctx context.Context, cacheConfig *Config, js nats.JetStreamCo defer system.GlobalPrometrics.GetRoutinesCounter().Stopped("cache.storeUpdatesHandler") if w, err := kv.Watch(cacheConfig.kvStorePrefix+".>", nats.IgnoreDeletes()); err == nil { defer system.MsgOnErrorReturn(w.Stop()) - for { + activeKVSync := true + for activeKVSync { select { case <-cs.ctx.Done(): - return - case entry := <-w.Updates(): + activeKVSync = false + case entry, ok := <-w.Updates(): + if !ok { + le.Warnf(ctx, "storeUpdatesHandler: KV watcher channel closed unexpectedly") + activeKVSync = false + break + } if entry != nil { key := cs.fromStoreKey(entry.Key()) valueBytes := entry.Value() diff --git a/statefun/domain.go b/statefun/domain.go index fa052d23..2f08ae29 100644 --- a/statefun/domain.go +++ b/statefun/domain.go @@ -10,7 +10,6 @@ import ( "github.com/nats-io/nats.go" - "github.com/foliagecp/easyjson" "github.com/foliagecp/sdk/embedded/nats/kv" "github.com/foliagecp/sdk/statefun/cache" lg "github.com/foliagecp/sdk/statefun/logger" @@ -681,15 +680,8 @@ func (dm *Domain) GenerateTransactionID() string { } func (dm *Domain) isBackupBarrierActive() bool { - entry, err := dm.kv.Get(cache.BackupBarrierLockKey) - if err != nil { - lg.Logf(lg.ErrorLevel, "IsBackupBarrierActive: failed to get backup barrier lock entry: %s", err) - return false - } - barrier, ok := easyjson.JSONFromBytes(entry.Value()) - if !ok { + if dm.cache == nil { return false } - status := int32(barrier.GetByPath("status").AsNumericDefault(cache.BackupBarrierStatusUnlocked)) - return status == cache.BackupBarrierStatusLocked + return dm.cache.IsBackupBarrierActive() }