Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions statefun/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@
wg.Wait()
}

func (r *Runtime) stopRequestSubscriptions() {

Check failure on line 315 in statefun/runtime.go

View workflow job for this annotation

GitHub Actions / lint

func `(*Runtime).stopRequestSubscriptions` is unused (unused)
for {
allFunctionsReadyForShutdown := true
for _, ft := range r.registeredFunctionTypes {
Expand Down Expand Up @@ -605,7 +605,6 @@
r.stopFunctionSubscriptions(ctx)
if r.afterStartRunning.Load() {
r.gs.cancelPhaseOne()
r.afterStartRunning.Store(false)
}
kvConsistencyCheck = nil
}
Expand Down Expand Up @@ -650,6 +649,7 @@
r.config.isActiveInstance = true
r.activeInstanceMu.Unlock()
r.gs.resetPhaseOneCtx()
r.afterStartRunning.Store(false)
subscribeRequired = true
}
default:
Expand Down Expand Up @@ -684,11 +684,6 @@
}

if r.config.isActiveInstance {
if r.afterStartRunning.CompareAndSwap(false, true) {
lg.GetLogger().Debugf(ctx, "runtime is active, run afterStartFunctions")
r.runAfterStartFunctions(r.gs.phaseOneCtx())
}

for ftName, revID := range revisions {
if revID == 0 {
tryLock(ftName)
Expand All @@ -713,6 +708,11 @@
lg.Logf(lg.ErrorLevel, "function subscriptions failed: %v", err)
}
}

if r.afterStartRunning.CompareAndSwap(false, true) {
lg.GetLogger().Debugf(ctx, "run afterStartFunctions")
r.runAfterStartFunctions(r.gs.phaseOneCtx())
}
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions statefun/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/nats-io/nats.go"
)

var errTransactionAlreadyApplied = fmt.Errorf("transaction already applied by another runtime")

const (
WALOperationsStreamName = "wal_operations"
WALCommitsStreamName = "wal_commits"
Expand Down Expand Up @@ -136,6 +138,12 @@ func (dm *Domain) runTransactionCommitter(ctx context.Context, ready chan struct
}

if err := dm.applyTransactionOperations(ctx, txID); err != nil {
if errors.Is(err, errTransactionAlreadyApplied) {
lg.Logf(lg.DebugLevel, "TransactionCommitter: transaction %s already applied, acking", txID)
system.MsgOnErrorReturn(msg.Ack())
processedCount++
return
}
lg.Logf(lg.ErrorLevel, "TransactionCommitter: failed to apply transaction %s: %s", txID, err)
system.MsgOnErrorReturn(msg.Nak())
return
Expand Down Expand Up @@ -237,6 +245,10 @@ func (dm *Domain) applyTransactionOperations(ctx context.Context, txID string) e

sub, err := dm.js.PullSubscribe(opsSubject, consumerName)
if err != nil {
if errors.Is(err, nats.ErrConsumerDeleted) {
lg.Logf(lg.DebugLevel, "Per-tx consumer %s already deleted, transaction applied by another runtime", consumerName)
return errTransactionAlreadyApplied
}
lg.Logf(lg.ErrorLevel, "Failed to create subscription: %s", err)
return fmt.Errorf("failed to subscribe to operations: %w", err)
}
Expand All @@ -258,6 +270,10 @@ func (dm *Domain) applyTransactionOperations(ctx context.Context, txID string) e
lg.Logf(lg.TraceLevel, "applyTransactionOperations: finished, processed %d operations for tx_id=%s", totalOps, txID)
break
}
if errors.Is(err, nats.ErrConsumerDeleted) {
lg.Logf(lg.DebugLevel, "Per-tx consumer deleted during fetch, transaction applied by another runtime (tx_id=%s)", txID)
return errTransactionAlreadyApplied
}
return fmt.Errorf("failed to fetch operations: %w", err)
}

Expand Down
Loading