From 4e88456a88f90ef684b4871ec4ac334f7eeb2353 Mon Sep 17 00:00:00 2001 From: atauov Date: Wed, 18 Feb 2026 14:55:57 +0500 Subject: [PATCH 1/2] fix HA --- statefun/runtime.go | 10 +++++----- statefun/wal.go | 16 ++++++++++++++++ 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/statefun/runtime.go b/statefun/runtime.go index d7bd9af7..e32a9477 100644 --- a/statefun/runtime.go +++ b/statefun/runtime.go @@ -684,11 +684,6 @@ func (r *Runtime) singleInstanceFunctionLocksUpdater(ctx context.Context, revisi } if r.config.isActiveInstance { - if r.afterStartRunning.CompareAndSwap(false, true) { - lg.GetLogger().Debugf(ctx, "runtime is active, run afterStartFunctions") - r.runAfterStartFunctions(r.gs.phaseOneCtx()) - } - for ftName, revID := range revisions { if revID == 0 { tryLock(ftName) @@ -713,6 +708,11 @@ func (r *Runtime) singleInstanceFunctionLocksUpdater(ctx context.Context, revisi lg.Logf(lg.ErrorLevel, "function subscriptions failed: %v", err) } } + + if r.afterStartRunning.CompareAndSwap(false, true) { + lg.GetLogger().Debugf(ctx, "run afterStartFunctions") + r.runAfterStartFunctions(r.gs.phaseOneCtx()) + } } } } diff --git a/statefun/wal.go b/statefun/wal.go index ce32a837..0b6e5a9f 100644 --- a/statefun/wal.go +++ b/statefun/wal.go @@ -15,6 +15,8 @@ import ( "github.com/nats-io/nats.go" ) +var errTransactionAlreadyApplied = fmt.Errorf("transaction already applied by another runtime") + const ( WALOperationsStreamName = "wal_operations" WALCommitsStreamName = "wal_commits" @@ -136,6 +138,12 @@ func (dm *Domain) runTransactionCommitter(ctx context.Context, ready chan struct } if err := dm.applyTransactionOperations(ctx, txID); err != nil { + if errors.Is(err, errTransactionAlreadyApplied) { + lg.Logf(lg.DebugLevel, "TransactionCommitter: transaction %s already applied, acking", txID) + system.MsgOnErrorReturn(msg.Ack()) + processedCount++ + return + } lg.Logf(lg.ErrorLevel, "TransactionCommitter: failed to apply transaction %s: %s", txID, err) system.MsgOnErrorReturn(msg.Nak()) return @@ -237,6 +245,10 @@ func (dm *Domain) applyTransactionOperations(ctx context.Context, txID string) e sub, err := dm.js.PullSubscribe(opsSubject, consumerName) if err != nil { + if errors.Is(err, nats.ErrConsumerDeleted) { + lg.Logf(lg.DebugLevel, "Per-tx consumer %s already deleted, transaction applied by another runtime", consumerName) + return errTransactionAlreadyApplied + } lg.Logf(lg.ErrorLevel, "Failed to create subscription: %s", err) return fmt.Errorf("failed to subscribe to operations: %w", err) } @@ -258,6 +270,10 @@ func (dm *Domain) applyTransactionOperations(ctx context.Context, txID string) e lg.Logf(lg.TraceLevel, "applyTransactionOperations: finished, processed %d operations for tx_id=%s", totalOps, txID) break } + if errors.Is(err, nats.ErrConsumerDeleted) { + lg.Logf(lg.DebugLevel, "Per-tx consumer deleted during fetch, transaction applied by another runtime (tx_id=%s)", txID) + return errTransactionAlreadyApplied + } return fmt.Errorf("failed to fetch operations: %w", err) } From 4fa3161581230e194bbb3ff19c384e05682cc901 Mon Sep 17 00:00:00 2001 From: atauov Date: Fri, 20 Feb 2026 11:17:25 +0500 Subject: [PATCH 2/2] fix afterStart run --- statefun/runtime.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/statefun/runtime.go b/statefun/runtime.go index e32a9477..33ca256b 100644 --- a/statefun/runtime.go +++ b/statefun/runtime.go @@ -605,7 +605,6 @@ func (r *Runtime) singleInstanceFunctionLocksUpdater(ctx context.Context, revisi r.stopFunctionSubscriptions(ctx) if r.afterStartRunning.Load() { r.gs.cancelPhaseOne() - r.afterStartRunning.Store(false) } kvConsistencyCheck = nil } @@ -650,6 +649,7 @@ func (r *Runtime) singleInstanceFunctionLocksUpdater(ctx context.Context, revisi r.config.isActiveInstance = true r.activeInstanceMu.Unlock() r.gs.resetPhaseOneCtx() + r.afterStartRunning.Store(false) subscribeRequired = true } default: