Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion statefun/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,9 @@
transactionsMutex *sync.Mutex
getKeysByPatternFromKVMutex *sync.Mutex

// walWriteEnabled - true for active instance
// only active instances can write to WAL streams
walWriteEnabled atomic.Bool
transactionGenerator TransactionGenerator

//write barrier state
Expand Down Expand Up @@ -499,6 +502,9 @@

cs.ctx, cs.cancel = context.WithCancel(ctx)

// default - can not publish to WAL
cs.walWriteEnabled.Store(false)

storeUpdatesHandler := func(cs *Store) {
system.GlobalPrometrics.GetRoutinesCounter().Started("cache.storeUpdatesHandler")
defer system.GlobalPrometrics.GetRoutinesCounter().Stopped("cache.storeUpdatesHandler")
Expand All @@ -507,7 +513,7 @@
for activeKVSync {
select {
case <-cs.ctx.Done():
activeKVSync = false

Check failure on line 516 in statefun/cache/cache.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to activeKVSync (ineffassign)
return
case entry := <-w.Updates():
if entry != nil {
Expand Down Expand Up @@ -585,6 +591,9 @@
defer system.GlobalPrometrics.GetRoutinesCounter().Stopped("cache.kvLazyWriter")

shutdownStatus := shutdownStatusNone
lastWALNotReadyLogAt := time.Time{}
lastWALDisabledLogAt := time.Time{}
skipLogInterval := 5 * time.Second
for {
if shutdownStatus == shutdownStatusReady {
le.Debugf(ctx, "cache synced, ready for shutdown")
Expand All @@ -593,7 +602,18 @@
}

if cs.transactionGenerator == nil {
le.Debugf(ctx, "WAL is not ready, skip this iteration")
if time.Since(lastWALNotReadyLogAt) >= skipLogInterval {
le.Tracef(ctx, "WAL is not ready, skip this iteration")
lastWALNotReadyLogAt = time.Now()
}
time.Sleep(100 * time.Millisecond)
continue
}
if !cs.walWriteEnabled.Load() {
if time.Since(lastWALDisabledLogAt) >= skipLogInterval {
le.Tracef(ctx, "WAL writes are disabled, skip this iteration")
lastWALDisabledLogAt = time.Now()
}
time.Sleep(100 * time.Millisecond)
continue
}
Expand Down Expand Up @@ -1229,4 +1249,8 @@
cs.transactionGenerator = tg
}

func (cs *Store) SetWALWriteEnabled(enabled bool) {
cs.walWriteEnabled.Store(enabled)
}

// -----------------------------------
29 changes: 19 additions & 10 deletions statefun/function_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

"github.com/foliagecp/easyjson"

"github.com/foliagecp/sdk/statefun/logger"
lg "github.com/foliagecp/sdk/statefun/logger"
sfPlugins "github.com/foliagecp/sdk/statefun/plugins"
"github.com/foliagecp/sdk/statefun/system"
Expand Down Expand Up @@ -65,7 +64,7 @@
ft.sfWorkerPool = NewSFWorkerPool(ft, config.functionWorkerPoolConfig)
runtime.registeredFunctionTypes[ft.name] = ft
} else {
lg.GetLogger().Errorf(context.TODO(), "Function type '%s' is not registered. Ensure that all function types are registered before starting the runtime.", ft.name)
lg.Logf(lg.ErrorLevel, "Function type '%s' is not registered. Ensure that all function types are registered before starting the runtime.", ft.name)
}
return ft
}
Expand Down Expand Up @@ -144,7 +143,7 @@

if !ft.TokenTryAcquire() {
msg.RefusalCallback(true) // No redelivering cause system have no more scaling resources!
logger.Logf(logger.ErrorLevel, sendMsgFuncErrorMsg, ft.name, id, "no tokens left")
lg.Logf(lg.ErrorLevel, sendMsgFuncErrorMsg, ft.name, id, "no tokens left")
return
}

Expand All @@ -168,7 +167,7 @@
default:
ft.TokenRelease()
msg.RefusalCallback(false) // Can try to rediliver cause free tokens still exists, system have scaling resources
logger.Logf(logger.WarnLevel, sendMsgFuncErrorMsg, ft.name, id, "queue for current id is full")
lg.Logf(lg.WarnLevel, sendMsgFuncErrorMsg, ft.name, id, "queue for current id is full")
}
}

Expand All @@ -177,7 +176,7 @@
ft.idKeyMutex.Lock(id)
defer func() {
if r := recover(); r != nil {
logger.Logf(logger.ErrorLevel, "panic in workerTaskExecutor for %s:%s: %v", ft.name, id, r)
lg.Logf(lg.ErrorLevel, "panic in workerTaskExecutor for %s:%s: %v", ft.name, id, r)
}
ft.idKeyMutex.Unlock(id)
}()
Expand Down Expand Up @@ -246,6 +245,16 @@
}

func (ft *FunctionType) handleMsgForID(id string, msg FunctionTypeMsg, typenameIDContextProcessor *sfPlugins.StatefunContextProcessor) {
// In HA mode passive runtime must not enqueue new tasks
// request will reject by timeout
if ft.runtime.config.activePassiveMode && !ft.runtime.IsActiveInstance() {
if msg.AckCallback != nil {
msg.AckCallback(true) // we dont want to redeliver this
}
lg.Logf(lg.DebugLevel, sendMsgFuncErrorMsg, ft.name, id, "runtime is passive")
return
}

ft.lastMsgTimeNs.Store(uint64(system.GetCurrentTimeNs()))
msgRequestCallback := msg.RequestCallback
replyDataChannel := make(chan *easyjson.JSON, 1)
Expand Down Expand Up @@ -506,9 +515,9 @@
if !ft.signalSubscription.IsValid() {
return
}
logger.GetLogger().Debugf(context.TODO(), "draining signal subscription for typename %s", ft.name)
lg.Logf(lg.DebugLevel, "draining signal subscription for typename %s", ft.name)
if err := ft.signalSubscription.Drain(); err != nil {
logger.GetLogger().Errorf(context.TODO(), "failed to drain signal subscription for typename %s: %s", ft.name, err.Error())
lg.Logf(lg.ErrorLevel, "failed to drain signal subscription for typename %s: %s", ft.name, err.Error())
return
}

Expand All @@ -519,11 +528,11 @@
for {
select {
case <-timeout:
logger.GetLogger().Errorf(context.TODO(), "timeout waiting for signal subscription drain for typename %s", ft.name)
lg.Logf(lg.ErrorLevel, "timeout waiting for signal subscription drain for typename %s", ft.name)
return
case <-ticker.C:
if !ft.signalSubscription.IsValid() {
logger.GetLogger().Debugf(context.TODO(), "signal subscription drained successfully for typename %s", ft.name)
lg.Logf(lg.DebugLevel, "signal subscription drained successfully for typename %s", ft.name)
return
}
}
Expand All @@ -535,6 +544,6 @@
return
}
ft.sfWorkerPool.Stop()
ft.requestSubscription.Unsubscribe()

Check failure on line 547 in statefun/function_type.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `ft.requestSubscription.Unsubscribe` is not checked (errcheck)
logger.GetLogger().Debugf(context.TODO(), "unsubscribe request subscription for typename %s", ft.name)
lg.Logf(lg.DebugLevel, "unsubscribe request subscription for typename %s", ft.name)
}
40 changes: 40 additions & 0 deletions statefun/function_type_wp.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,46 @@ func (wp *SFWorkerPool) Stop() {
wp.wg.Wait()
}

// DropPendingTasks removes tasks that have not started yet
// Running tasks are not interrupted
func (wp *SFWorkerPool) DropPendingTasks() (dropped int) {
// Drop tasks already moved into worker queue
drainingSharedQueue := true
for drainingSharedQueue {
select {
case task := <-wp.taskQueue:
dropped++
wp.ft.TokenRelease()
if task.Msg.Data.AckCallback != nil {
// do not try to redeliver
task.Msg.Data.AckCallback(true)
}
default:
drainingSharedQueue = false
}
}

// Drop tasks still waiting in per-id channels
wp.ft.idHandlersChannel.Range(func(_, value any) bool {
ch := value.(chan FunctionTypeMsg)
for {
select {
case msg := <-ch:
dropped++
wp.ft.TokenRelease()
if msg.AckCallback != nil {
// do not try to redeliver
msg.AckCallback(true)
}
default:
return true
}
}
})

return dropped
}

func (wp *SFWorkerPool) GetWorkerPoolLoadPercentage() float64 {
return 100.0 * float64(len(wp.taskQueue)) / float64(cap(wp.taskQueue))
}
Expand Down
20 changes: 20 additions & 0 deletions statefun/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@
r.config.isActiveInstance = true
}

// if active - can publish to WAL, passive - can not
r.Domain.Cache().SetWALWriteEnabled(r.config.isActiveInstance)

// Handle single-instance functions.
singleInstanceFunctionRevisions := make(map[string]uint64)
if err := r.handleSingleInstanceFunctions(r.gs.ctxPhaseThree, singleInstanceFunctionRevisions); err != nil {
Expand Down Expand Up @@ -312,7 +315,7 @@
wg.Wait()
}

func (r *Runtime) stopRequestSubscriptions() {

Check failure on line 318 in statefun/runtime.go

View workflow job for this annotation

GitHub Actions / lint

func `(*Runtime).stopRequestSubscriptions` is unused (unused)
for {
allFunctionsReadyForShutdown := true
for _, ft := range r.registeredFunctionTypes {
Expand Down Expand Up @@ -476,6 +479,20 @@
}
}

func (r *Runtime) dropAllFunctionPendingTasks() {
totalDropped := 0
for _, ft := range r.registeredFunctionTypes {
dropped := ft.sfWorkerPool.DropPendingTasks()
totalDropped += dropped
if dropped > 0 {
lg.Logf(lg.DebugLevel, "Dropped %d pending tasks for function %s on passive transition", dropped, ft.name)
}
}
if totalDropped > 0 {
lg.Logf(lg.DebugLevel, "Dropped %d pending tasks in total on passive transition", totalDropped)
}
}

// runAfterStartFunctions executes the registered OnAfterStart functions.
func (r *Runtime) runAfterStartFunctions(ctx context.Context) {
for _, fnWithMode := range r.onAfterStartFunctionsWithMode {
Expand Down Expand Up @@ -602,7 +619,9 @@
r.config.isActiveInstance = false
r.config.activeRevID = 0
r.activeInstanceMu.Unlock()
r.Domain.Cache().SetWALWriteEnabled(false)
r.stopFunctionSubscriptions(ctx)
r.dropAllFunctionPendingTasks()
if r.afterStartRunning.Load() {
r.gs.cancelPhaseOne()
}
Expand Down Expand Up @@ -648,6 +667,7 @@
r.activeInstanceMu.Lock()
r.config.isActiveInstance = true
r.activeInstanceMu.Unlock()
r.Domain.Cache().SetWALWriteEnabled(true)
r.gs.resetPhaseOneCtx()
r.afterStartRunning.Store(false)
subscribeRequired = true
Expand Down
Loading