From b62daf2d207d45b58e478008db7643969896915a Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 13 May 2026 20:00:00 -0400 Subject: [PATCH 01/16] replace PauseState flag with PAUSE_REQUESTED status --- chasm/lib/activity/activity.go | 155 ++++++++---------- chasm/lib/activity/activity_tasks.go | 12 +- .../v1/activity_state.go-helpers.pb.go | 1 + .../gen/activitypb/v1/activity_state.pb.go | 48 +++--- .../activity/proto/v1/activity_state.proto | 19 ++- chasm/lib/activity/statemachine.go | 78 +++++++-- tests/standalone_activity_test.go | 20 +-- 7 files changed, 185 insertions(+), 148 deletions(-) diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index 55c67f95e6..1c670ccf4a 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -793,7 +793,7 @@ func (a *Activity) handlePauseRequested(ctx chasm.MutableContext, req *activityp if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { return nil, serviceerror.NewFailedPrecondition("cannot pause an activity with a pending cancellation") } - if a.PauseState != nil { + if a.isPaused() { newReqID := req.GetFrontendRequest().GetRequestId() existingReqID := a.PauseState.GetRequestId() if newReqID != "" && existingReqID == newReqID { @@ -807,19 +807,19 @@ func (a *Activity) handlePauseRequested(ctx chasm.MutableContext, req *activityp return nil, err } - if TransitionPaused.Possible(a) { - // SCHEDULED → real PAUSED status; stamp bumped to invalidate the pending dispatch task. 
- if err := TransitionPaused.Apply(a, ctx, pauseEvent{ - req: req.GetFrontendRequest(), - metricsHandler: metricsHandler, - }); err != nil { + event := pauseEvent{req: req.GetFrontendRequest(), metricsHandler: metricsHandler} + switch a.GetStatus() { + case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED: + if err := TransitionPaused.Apply(a, ctx, event); err != nil { + return nil, err + } + case activitypb.ACTIVITY_EXECUTION_STATUS_STARTED: + if err := TransitionPauseRequested.Apply(a, ctx, event); err != nil { return nil, err } - return &activitypb.PauseActivityExecutionResponse{}, nil + default: + return nil, serviceerror.NewFailedPreconditionf("activity is in non-pausable state %v", a.GetStatus()) } - // STARTED → flag-only pause. Status stays STARTED so the worker's token remains valid. - // The worker will see ActivityPaused=true on the next heartbeat. - a.pause(ctx, pauseEvent{req.GetFrontendRequest(), metricsHandler}) return &activitypb.PauseActivityExecutionResponse{}, nil } @@ -829,8 +829,7 @@ func (a *Activity) handleUnpauseRequested(ctx chasm.MutableContext, req *activit if a.isTerminal() { return nil, serviceerror.NewFailedPreconditionf("activity is in terminal state %v", a.GetStatus()) } - // Not paused → no-op. 
- if a.PauseState == nil { + if !a.isPaused() { return &activitypb.UnpauseActivityExecutionResponse{}, nil } @@ -839,31 +838,29 @@ func (a *Activity) handleUnpauseRequested(ctx chasm.MutableContext, req *activit return nil, err } - if TransitionUnpaused.Possible(a) { - if err := TransitionUnpaused.Apply(a, ctx, unpauseEvent{ - req: req.GetFrontendRequest(), - metricsHandler: metricsHandler, - }); err != nil { + event := unpauseEvent{req: req.GetFrontendRequest(), metricsHandler: metricsHandler} + switch a.GetStatus() { + case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED: + if err := TransitionUnpaused.Apply(a, ctx, event); err != nil { + return nil, err + } + case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED: + if err := TransitionUnpausedToStarted.Apply(a, ctx, event); err != nil { return nil, err } - return &activitypb.UnpauseActivityExecutionResponse{}, nil } + return &activitypb.UnpauseActivityExecutionResponse{}, nil +} - // Flag-based pause (status is STARTED, CANCEL_REQUESTED, or SCHEDULED after retry while paused). - if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED || - a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { - // Worker continues with its existing token — no stamp bump needed, no dispatch task. - // Cancel takes precedence over pause. Unpause clears the pause flag but does not re-dispatch; - // the activity remains CANCEL_REQUESTED and will be cancelled when the worker responds. - a.PauseState = nil - a.emitOnUnpausedMetrics(metricsHandler) - return &activitypb.UnpauseActivityExecutionResponse{}, nil +// isPaused reports whether the activity is currently paused (waiting) or has a pending pause request +// (worker still running). 
+func (a *Activity) isPaused() bool { + switch a.GetStatus() { + case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED: + return true } - a.unpause(ctx, unpauseEvent{ - req: req.GetFrontendRequest(), - metricsHandler: metricsHandler, - }) - return &activitypb.UnpauseActivityExecutionResponse{}, nil + return false } func (a *Activity) unpause( @@ -897,7 +894,7 @@ func (a *Activity) unpause( a.emitOnUnpausedMetrics(event.metricsHandler) } -func (a *Activity) pause( +func (a *Activity) recordPauseState( ctx chasm.MutableContext, event pauseEvent, ) { @@ -970,44 +967,42 @@ func (a *Activity) handleReset(ctx chasm.MutableContext, req *activitypb.ResetAc switch a.Status { case activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, - activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED: + activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED: // Activity is running. Defer reset to the next retry so we don't break // the running worker's task token (which encodes the current attempt count). a.ActivityReset = true if frontendReq.GetResetHeartbeat() { a.ResetHeartbeats = true } - if !keepPaused { - // Clear PauseState now so TransitionRescheduled can dispatch without being - // blocked by the validator (which drops dispatch tasks when PauseState != nil). - a.PauseState = nil + if a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED && !keepPaused { + // Unpause; the deferred reset will apply on the next retry via STARTED->SCHEDULED. 
+ if err := TransitionUnpausedToStarted.Apply(a, ctx, unpauseEvent{ + req: &workflowservice.UnpauseActivityExecutionRequest{}, + metricsHandler: metricsHandler, + }); err != nil { + return nil, err + } } a.emitOnResetMetrics(metricsHandler) return &activitypb.ResetActivityExecutionResponse{}, nil - case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, - activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED: - // A SCHEDULED activity can carry PauseState when a pause was applied concurrently - // (e.g. deferred from a STARTED→retry path). In that case the dispatch task emitted - // by TransitionReset would be dropped by the validator, leaving the activity stuck. - // Treat any non-nil PauseState the same way as the explicit PAUSED status. - if a.PauseState != nil { - if keepPaused { - // Reset counts but keep the activity paused. - // No dispatch task — the user must unpause to re-dispatch. - attempt := a.LastAttempt.Get(ctx) - attempt.Count = 1 - attempt.Stamp++ - attempt.CurrentRetryInterval = nil - if frontendReq.GetResetHeartbeat() { - a.clearHeartbeat(ctx) - } - a.emitOnResetMetrics(metricsHandler) - return &activitypb.ResetActivityExecutionResponse{}, nil + case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED: + if keepPaused { + // Reset counts but keep the activity paused. + attempt := a.LastAttempt.Get(ctx) + attempt.Count = 1 + attempt.Stamp++ + attempt.CurrentRetryInterval = nil + if frontendReq.GetResetHeartbeat() { + a.clearHeartbeat(ctx) } - // keepPaused=false: clear pause state so the dispatch task isn't dropped. - a.PauseState = nil + a.emitOnResetMetrics(metricsHandler) + return &activitypb.ResetActivityExecutionResponse{}, nil } + fallthrough + + case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED: if err := TransitionReset.Apply(a, ctx, resetEvent{ req: frontendReq, scheduleTime: scheduleTime, @@ -1073,9 +1068,8 @@ func (a *Activity) recordFailedAttempt( } // tryReschedule attempts to reschedule the activity for retry. 
Returns true if rescheduled, false -// if retry is not possible. When the activity has PauseState set (flag-based pause from STARTED), -// the retry transitions to SCHEDULED normally but the dispatch task is blocked by the pause flag -// until the activity is unpaused. +// if retry is not possible. If a pause request has been received then we transition to Paused; +// otherwise to Scheduled. func (a *Activity) tryReschedule( ctx chasm.MutableContext, overridingRetryInterval time.Duration, @@ -1085,14 +1079,15 @@ func (a *Activity) tryReschedule( if !shouldRetry { return false, nil } - return true, TransitionRescheduled.Apply(a, ctx, rescheduleEvent{ - retryInterval: retryInterval, - failure: failure, - }) + event := rescheduleEvent{retryInterval: retryInterval, failure: failure} + if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED { + return true, TransitionAttemptFailedToPaused.Apply(a, ctx, event) + } + return true, TransitionRescheduled.Apply(a, ctx, event) } func (a *Activity) shouldRetry(ctx chasm.Context, overridingRetryInterval time.Duration) (bool, time.Duration) { - if !TransitionRescheduled.Possible(a) { + if !TransitionRescheduled.Possible(a) && !TransitionAttemptFailedToPaused.Possible(a) { return false, 0 } attempt := a.LastAttempt.Get(ctx) @@ -1187,7 +1182,7 @@ func (a *Activity) RecordHeartbeat( } return &historyservice.RecordActivityTaskHeartbeatResponse{ CancelRequested: a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, - ActivityPaused: a.PauseState != nil, + ActivityPaused: a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, ActivityReset: a.ActivityReset, }, nil } @@ -1198,7 +1193,8 @@ func InternalStatusToAPIStatus(status activitypb.ActivityExecutionStatus) enumsp case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, - activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED: + 
activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED: return enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED: return enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED @@ -1227,6 +1223,8 @@ func internalStatusToRunState(status activitypb.ActivityExecutionStatus) enumspb return enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED: return enumspb.PENDING_ACTIVITY_STATE_PAUSED + case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED: + return enumspb.PENDING_ACTIVITY_STATE_PAUSE_REQUESTED case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED, activitypb.ACTIVITY_EXECUTION_STATUS_FAILED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED, @@ -1241,19 +1239,7 @@ func internalStatusToRunState(status activitypb.ActivityExecutionStatus) enumspb func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) *apiactivitypb.ActivityExecutionInfo { status := InternalStatusToAPIStatus(a.GetStatus()) - // Derive the external run state with hybrid pause logic: - // PAUSED status (real) → PAUSED - // STARTED + PauseState != nil (pause requested while running) → PAUSE_REQUESTED - // SCHEDULED + PauseState != nil (retry while paused flag set) → PAUSED - // All other cases → derived from internal status directly - var runState enumspb.PendingActivityState - if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED && a.PauseState != nil { - runState = enumspb.PENDING_ACTIVITY_STATE_PAUSE_REQUESTED - } else if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED && a.PauseState != nil { - runState = enumspb.PENDING_ACTIVITY_STATE_PAUSED - } else { - runState = internalStatusToRunState(a.GetStatus()) - } + runState := internalStatusToRunState(a.GetStatus()) requestData := a.RequestData.Get(ctx) attempt := a.LastAttempt.Get(ctx) @@ -1466,7 +1452,8 @@ func (a *Activity) validateActivityTaskToken( requestNamespaceID string, ) error { if a.Status 
!= activitypb.ACTIVITY_EXECUTION_STATUS_STARTED && - a.Status != activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { + a.Status != activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED && + a.Status != activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED { return serviceerror.NewNotFound("activity task not found") } if token.Attempt != ByIDTokenAttempt && token.Attempt != a.LastAttempt.Get(ctx).GetCount() { diff --git a/chasm/lib/activity/activity_tasks.go b/chasm/lib/activity/activity_tasks.go index 52d5112789..8f9aaed7a5 100644 --- a/chasm/lib/activity/activity_tasks.go +++ b/chasm/lib/activity/activity_tasks.go @@ -35,11 +35,7 @@ func (h *activityDispatchTaskHandler) Validate( _ chasm.TaskAttributes, task *activitypb.ActivityDispatchTask, ) (bool, error) { - // Do not dispatch while the activity has a pause flag set (SCHEDULED + PauseState from a retry - // while a STARTED activity was flag-paused). TransitionStarted.Possible already returns false for - // real PAUSED status activities (source must be SCHEDULED, and PAUSED → SCHEDULED via unpause). return (TransitionStarted.Possible(activity) && - activity.PauseState == nil && task.Stamp == activity.LastAttempt.Get(ctx).GetStamp()), nil } @@ -96,9 +92,7 @@ func (h *scheduleToStartTimeoutTaskHandler) Validate( _ chasm.TaskAttributes, task *activitypb.ScheduleToStartTimeoutTask, ) (bool, error) { - // Do not time out a SCHEDULED activity that has the pause flag set (retry while paused). 
return (activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED && - activity.PauseState == nil && task.Stamp == activity.LastAttempt.Get(ctx).GetStamp()), nil } @@ -182,7 +176,8 @@ func (h *startToCloseTimeoutTaskHandler) Validate( task *activitypb.StartToCloseTimeoutTask, ) (bool, error) { valid := ((activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED || - activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED) && + activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED || + activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED) && task.Stamp == activity.LastAttempt.Get(ctx).GetStamp()) return valid, nil } @@ -243,7 +238,8 @@ func (h *heartbeatTimeoutTaskHandler) Validate( // last heartbeat was received after hb_i. If so, we reject this timeout task. Otherwise, the // Execute function runs and we fail the attempt. if activity.Status != activitypb.ACTIVITY_EXECUTION_STATUS_STARTED && - activity.Status != activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { + activity.Status != activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED && + activity.Status != activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED { return false, nil } // Task attempt must still match current attempt. 
diff --git a/chasm/lib/activity/gen/activitypb/v1/activity_state.go-helpers.pb.go b/chasm/lib/activity/gen/activitypb/v1/activity_state.go-helpers.pb.go index 8ca3e1cb52..03a1d65a46 100644 --- a/chasm/lib/activity/gen/activitypb/v1/activity_state.go-helpers.pb.go +++ b/chasm/lib/activity/gen/activitypb/v1/activity_state.go-helpers.pb.go @@ -315,6 +315,7 @@ var ( "Terminated": 7, "TimedOut": 8, "Paused": 9, + "PauseRequested": 10, } ) diff --git a/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go b/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go index 0a93abdde5..e8bb358713 100644 --- a/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go +++ b/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go @@ -61,31 +61,32 @@ const ( // Additionally, after all retries are exhausted for start-to-close or heartbeat timeouts, the activity will also // transition to timed out status. ACTIVITY_EXECUTION_STATUS_TIMED_OUT ActivityExecutionStatus = 8 - // The activity has been paused while in the SCHEDULED state. No worker will be dispatched until - // the activity is unpaused. The activity's pause_state field is populated with the identity, - // reason, and time of the pause request. - // - // Note: pausing a STARTED activity does not transition to this status. Instead, the pause is - // delivered as a flag (pause_state is set, status stays STARTED) and the worker is notified - // via ActivityPaused=true on its next heartbeat. The external run state in that case is - // PAUSE_REQUESTED. If the worker fails and retries while the flag is set, the retry lands in - // SCHEDULED with pause_state still populated and the dispatch task is blocked until unpause. + // The activity is paused and no worker is currently executing the attempt. No dispatch task + // will be issued until the activity is unpaused. 
Reachable from SCHEDULED via an operator pause, + // and from PAUSE_REQUESTED when the worker yields (failure with retries remaining, or timeout + // with retries remaining). ACTIVITY_EXECUTION_STATUS_PAUSED ActivityExecutionStatus = 9 + // An operator pause was received while the activity was STARTED. The worker is still executing + // under its existing task token. The worker will be notified via ActivityPaused=true on its next + // heartbeat response. When the worker yields (failure with retries remaining, or timeout with + // retries remaining), the activity will transition to PAUSED. + ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED ActivityExecutionStatus = 10 ) // Enum value maps for ActivityExecutionStatus. var ( ActivityExecutionStatus_name = map[int32]string{ - 0: "ACTIVITY_EXECUTION_STATUS_UNSPECIFIED", - 1: "ACTIVITY_EXECUTION_STATUS_SCHEDULED", - 2: "ACTIVITY_EXECUTION_STATUS_STARTED", - 3: "ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED", - 4: "ACTIVITY_EXECUTION_STATUS_COMPLETED", - 5: "ACTIVITY_EXECUTION_STATUS_FAILED", - 6: "ACTIVITY_EXECUTION_STATUS_CANCELED", - 7: "ACTIVITY_EXECUTION_STATUS_TERMINATED", - 8: "ACTIVITY_EXECUTION_STATUS_TIMED_OUT", - 9: "ACTIVITY_EXECUTION_STATUS_PAUSED", + 0: "ACTIVITY_EXECUTION_STATUS_UNSPECIFIED", + 1: "ACTIVITY_EXECUTION_STATUS_SCHEDULED", + 2: "ACTIVITY_EXECUTION_STATUS_STARTED", + 3: "ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED", + 4: "ACTIVITY_EXECUTION_STATUS_COMPLETED", + 5: "ACTIVITY_EXECUTION_STATUS_FAILED", + 6: "ACTIVITY_EXECUTION_STATUS_CANCELED", + 7: "ACTIVITY_EXECUTION_STATUS_TERMINATED", + 8: "ACTIVITY_EXECUTION_STATUS_TIMED_OUT", + 9: "ACTIVITY_EXECUTION_STATUS_PAUSED", + 10: "ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED", } ActivityExecutionStatus_value = map[string]int32{ "ACTIVITY_EXECUTION_STATUS_UNSPECIFIED": 0, @@ -98,6 +99,7 @@ var ( "ACTIVITY_EXECUTION_STATUS_TERMINATED": 7, "ACTIVITY_EXECUTION_STATUS_TIMED_OUT": 8, "ACTIVITY_EXECUTION_STATUS_PAUSED": 9, + "ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED": 
10, } ) @@ -131,6 +133,8 @@ func (x ActivityExecutionStatus) String() string { // Deprecated: Use ActivityExecutionStatus.Descriptor instead. case ACTIVITY_EXECUTION_STATUS_PAUSED: return "Paused" + case ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED: + return "PauseRequested" default: return strconv.Itoa(int(x)) } @@ -1114,7 +1118,7 @@ const file_temporal_server_chasm_lib_activity_proto_v1_activity_state_proto_rawD "\x06output\x18\x01 \x01(\v2 .temporal.api.common.v1.PayloadsR\x06output\x1aD\n" + "\x06Failed\x12:\n" + "\afailure\x18\x01 \x01(\v2 .temporal.api.failure.v1.FailureR\afailureB\t\n" + - "\avariant*\xb4\x03\n" + + "\avariant*\xe3\x03\n" + "\x17ActivityExecutionStatus\x12)\n" + "%ACTIVITY_EXECUTION_STATUS_UNSPECIFIED\x10\x00\x12'\n" + "#ACTIVITY_EXECUTION_STATUS_SCHEDULED\x10\x01\x12%\n" + @@ -1125,7 +1129,9 @@ const file_temporal_server_chasm_lib_activity_proto_v1_activity_state_proto_rawD "\"ACTIVITY_EXECUTION_STATUS_CANCELED\x10\x06\x12(\n" + "$ACTIVITY_EXECUTION_STATUS_TERMINATED\x10\a\x12'\n" + "#ACTIVITY_EXECUTION_STATUS_TIMED_OUT\x10\b\x12$\n" + - " ACTIVITY_EXECUTION_STATUS_PAUSED\x10\tBDZBgo.temporal.io/server/chasm/lib/activity/gen/activitypb;activitypbb\x06proto3" + " ACTIVITY_EXECUTION_STATUS_PAUSED\x10\t\x12-\n" + + ")ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED\x10\n" + + "BDZBgo.temporal.io/server/chasm/lib/activity/gen/activitypb;activitypbb\x06proto3" var ( file_temporal_server_chasm_lib_activity_proto_v1_activity_state_proto_rawDescOnce sync.Once diff --git a/chasm/lib/activity/proto/v1/activity_state.proto b/chasm/lib/activity/proto/v1/activity_state.proto index 40a3401b04..2e5bec35a7 100644 --- a/chasm/lib/activity/proto/v1/activity_state.proto +++ b/chasm/lib/activity/proto/v1/activity_state.proto @@ -41,16 +41,17 @@ enum ActivityExecutionStatus { // Additionally, after all retries are exhausted for start-to-close or heartbeat timeouts, the activity will also // transition to timed out status. 
ACTIVITY_EXECUTION_STATUS_TIMED_OUT = 8; - // The activity has been paused while in the SCHEDULED state. No worker will be dispatched until - // the activity is unpaused. The activity's pause_state field is populated with the identity, - // reason, and time of the pause request. - // - // Note: pausing a STARTED activity does not transition to this status. Instead, the pause is - // delivered as a flag (pause_state is set, status stays STARTED) and the worker is notified - // via ActivityPaused=true on its next heartbeat. The external run state in that case is - // PAUSE_REQUESTED. If the worker fails and retries while the flag is set, the retry lands in - // SCHEDULED with pause_state still populated and the dispatch task is blocked until unpause. + // The activity is paused and no worker is currently executing the attempt. No dispatch task + // will be issued until the activity is unpaused. Reachable from SCHEDULED via an operator pause, + // and from PAUSE_REQUESTED when the worker yields (failure with retries remaining, or timeout + // with retries remaining). ACTIVITY_EXECUTION_STATUS_PAUSED = 9; + + // An operator pause was received while the activity was STARTED. The worker is still executing + // under its existing task token. The worker will be notified via ActivityPaused=true on its next + // heartbeat response. When the worker yields (failure with retries remaining, or timeout with + // retries remaining), the activity will transition to PAUSED. 
+ ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED = 10; } message ActivityState { diff --git a/chasm/lib/activity/statemachine.go b/chasm/lib/activity/statemachine.go index 675e4408f8..208b9d8aea 100644 --- a/chasm/lib/activity/statemachine.go +++ b/chasm/lib/activity/statemachine.go @@ -199,11 +199,11 @@ var TransitionCompleted = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED, func(a *Activity, ctx chasm.MutableContext, event completeEvent) error { return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { - a.PauseState = nil a.ActivityReset = false a.ResetHeartbeats = false @@ -236,12 +236,12 @@ var TransitionFailed = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_FAILED, func(a *Activity, ctx chasm.MutableContext, event failedEvent) error { return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { req := event.req.GetFailedRequest() - a.PauseState = nil a.ActivityReset = false a.ResetHeartbeats = false @@ -277,6 +277,7 @@ var TransitionTerminated = chasm.NewTransition( activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED, func(a *Activity, ctx chasm.MutableContext, event terminateEvent) error { @@ -284,7 +285,6 @@ var TransitionTerminated = chasm.NewTransition( a.TerminateState = &activitypb.ActivityTerminateState{ RequestId: event.request.RequestID, } - a.PauseState = nil a.ActivityReset = false 
a.ResetHeartbeats = false outcome := a.Outcome.Get(ctx) @@ -310,14 +310,13 @@ var TransitionTerminated = chasm.NewTransition( ) // TransitionCancelRequested transitions to CancelRequested status. -// PAUSED activities (real status, no worker) are cancelled immediately in handleCancellationRequested -// rather than waiting for a worker response. var TransitionCancelRequested = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, func(a *Activity, ctx chasm.MutableContext, req *workflowservice.RequestCancelActivityExecutionRequest) error { @@ -361,7 +360,6 @@ var TransitionCanceled = chasm.NewTransition( Failure: failure, }, } - a.PauseState = nil a.ActivityReset = false a.ResetHeartbeats = false @@ -385,6 +383,7 @@ var TransitionTimedOut = chasm.NewTransition( activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, func(a *Activity, ctx chasm.MutableContext, event timeoutEvent) error { @@ -409,7 +408,6 @@ var TransitionTimedOut = chasm.NewTransition( return err } - a.PauseState = nil a.ActivityReset = false a.ResetHeartbeats = false @@ -427,29 +425,41 @@ type pauseEvent struct { // TransitionPaused transitions a SCHEDULED activity to PAUSED status. The stamp is bumped to // invalidate any pending dispatch task so the activity is not dispatched while paused. -// -// Note: STARTED activities are NOT paused via this transition. 
Pausing a STARTED activity is a -// flag-only operation (PauseState is set, status stays STARTED) so the worker's token remains -// valid and the worker is notified via ActivityPaused=true on its next heartbeat. See -// handlePauseRequested for the full hybrid logic. var TransitionPaused = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, }, activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, func(a *Activity, ctx chasm.MutableContext, event pauseEvent) error { - a.pause(ctx, event) + a.recordPauseState(ctx, event) attempt := a.LastAttempt.Get(ctx) attempt.Stamp++ return nil }, ) +// TransitionPauseRequested transitions a STARTED activity to PAUSE_REQUESTED. The worker is still +// in charge of the activity. It will be notified via ActivityPaused=true on its next heartbeat +// response, its task token is not invalidated by this transition, and there is no stamp bump since +// StartToCloseTimeoutTask and HeartbeatTimeoutTask must stay valid. +var TransitionPauseRequested = chasm.NewTransition( + []activitypb.ActivityExecutionStatus{ + activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + }, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, + func(a *Activity, ctx chasm.MutableContext, event pauseEvent) error { + a.recordPauseState(ctx, event) + return nil + }, +) + type unpauseEvent struct { req *workflowservice.UnpauseActivityExecutionRequest metricsHandler metrics.Handler } +// TransitionUnpaused transitions PAUSED → SCHEDULED, triggering a dispatch task that will lead to +// another activity attempt. var TransitionUnpaused = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, @@ -461,6 +471,48 @@ var TransitionUnpaused = chasm.NewTransition( }, ) +// TransitionUnpausedToStarted transitions PAUSE_REQUESTED → STARTED. The worker is still in charge +// of the activity. 
Its task token is not invalidated by this transition, and there is no stamp bump +// since StartToCloseTimeoutTask and HeartbeatTimeoutTask must stay valid. +var TransitionUnpausedToStarted = chasm.NewTransition( + []activitypb.ActivityExecutionStatus{ + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, + }, + activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + func(a *Activity, ctx chasm.MutableContext, event unpauseEvent) error { + a.emitOnUnpausedMetrics(event.metricsHandler) + return nil + }, +) + +// TransitionAttemptFailedToPaused transitions PAUSE_REQUESTED → PAUSED. It is performed instead of +// TransitionRescheduled, when the activity is in PAUSE_REQUESTED and the worker yields (failure or +// timeout) with retries remaining. The failed attempt is recorded and Count is incremented. +var TransitionAttemptFailedToPaused = chasm.NewTransition( + []activitypb.ActivityExecutionStatus{ + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, + }, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, + func(a *Activity, ctx chasm.MutableContext, event rescheduleEvent) error { + attempt := a.LastAttempt.Get(ctx) + currentTime := ctx.Now(a) + + if a.ActivityReset { + attempt.Count = 0 + a.ActivityReset = false + if a.ResetHeartbeats { + a.ResetHeartbeats = false + a.clearHeartbeat(ctx) + } + } + + attempt.Count++ + attempt.Stamp++ + + return a.recordFailedAttempt(ctx, event.retryInterval, event.failure, currentTime, false) + }, +) + type resetEvent struct { req *workflowservice.ResetActivityExecutionRequest scheduleTime time.Time diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index 288e31153d..8a584b585d 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -8409,11 +8409,7 @@ func (s *standaloneActivityTestSuite) TestUnpauseActivityExecution() { require.Nil(t, poll2Resp.GetHeartbeatDetails(), "expected nil heartbeat details in poll after reset") }) - // UnpauseWhileCancelRequested:
CANCEL_REQUESTED+PauseState is reached by pausing while STARTED - // (flag-only, PauseState set) and then cancelling (status → CANCEL_REQUESTED, PauseState stays). - // Pausing directly on a CANCEL_REQUESTED activity is now rejected (FailedPrecondition), so this - // is the only valid path into this state. Unpause must be a no-op — cancel takes precedence and - // the activity must not be re-dispatched. + // Issuing Unpause on a CANCEL_REQUESTED activity is a no-op. t.Run("UnpauseWhileCancelRequested", func(t *testing.T) { ctx := testcore.NewContext() activityID := testcore.RandomizeStr(t.Name()) @@ -8425,7 +8421,7 @@ func (s *standaloneActivityTestSuite) TestUnpauseActivityExecution() { // Poll → STARTED. pollResp := env.pollActivityTaskAndValidate(ctx, t, activityID, taskQueue, runID) - // Pause while STARTED → flag-only (PauseState set, status stays STARTED). + // Pause while STARTED → PAUSE_REQUESTED. _, err := env.FrontendClient().PauseActivityExecution(ctx, &workflowservice.PauseActivityExecutionRequest{ Namespace: env.Namespace().String(), ActivityId: activityID, @@ -8435,7 +8431,7 @@ func (s *standaloneActivityTestSuite) TestUnpauseActivityExecution() { }) require.NoError(t, err) - // Cancel → CANCEL_REQUESTED. PauseState remains set from the prior pause. + // Cancel → CANCEL_REQUESTED. _, err = env.FrontendClient().RequestCancelActivityExecution(ctx, &workflowservice.RequestCancelActivityExecutionRequest{ Namespace: env.Namespace().String(), ActivityId: activityID, @@ -8446,16 +8442,16 @@ func (s *standaloneActivityTestSuite) TestUnpauseActivityExecution() { }) require.NoError(t, err) - // Confirm both flags are set via heartbeat. + // Heartbeat shows cancel-requested; pause is no longer surfaced. 
hbResp, err := env.FrontendClient().RecordActivityTaskHeartbeat(ctx, &workflowservice.RecordActivityTaskHeartbeatRequest{ Namespace: env.Namespace().String(), TaskToken: pollResp.TaskToken, }) require.NoError(t, err) require.True(t, hbResp.GetCancelRequested()) - require.True(t, hbResp.GetActivityPaused()) + require.False(t, hbResp.GetActivityPaused()) - // Unpause — must be a no-op: status stays CANCEL_REQUESTED, no new dispatch task. + // Unpause is a no-op: status stays CANCEL_REQUESTED. _, err = env.FrontendClient().UnpauseActivityExecution(ctx, &workflowservice.UnpauseActivityExecutionRequest{ Namespace: env.Namespace().String(), ActivityId: activityID, @@ -8464,7 +8460,6 @@ func (s *standaloneActivityTestSuite) TestUnpauseActivityExecution() { }) require.NoError(t, err) - // RunState must still be CANCEL_REQUESTED after the unpause. descResp, err := env.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ Namespace: env.Namespace().String(), ActivityId: activityID, @@ -8474,14 +8469,13 @@ func (s *standaloneActivityTestSuite) TestUnpauseActivityExecution() { require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED, descResp.GetInfo().GetRunState(), "unpause of a CANCEL_REQUESTED activity must be a no-op") - // Heartbeat must show CancelRequested=true, ActivityPaused=false after the unpause. hbResp2, err := env.FrontendClient().RecordActivityTaskHeartbeat(ctx, &workflowservice.RecordActivityTaskHeartbeatRequest{ Namespace: env.Namespace().String(), TaskToken: pollResp.TaskToken, }) require.NoError(t, err) require.True(t, hbResp2.GetCancelRequested(), "cancel must remain after unpause") - require.False(t, hbResp2.GetActivityPaused(), "pause flag must be cleared after unpause") + require.False(t, hbResp2.GetActivityPaused(), "pause flag must remain cleared") }) // UnpauseRequestValidation: validate that Unpause rejects invalid request fields. 
From 59bcb7d50b8eb6ce8389e7b4e157acae49b2ab88 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 14 May 2026 09:11:56 -0400 Subject: [PATCH 02/16] Re-create start-to-close timeout task on options update while PAUSE_REQUESTED --- chasm/lib/activity/activity.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index 1c670ccf4a..e346c57372 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -599,7 +599,9 @@ func (a *Activity) UpdateActivityExecutionOptions( attempt.Stamp++ - if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED || a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { + if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED || + a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED || + a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED { // Re-create the start-to-close timeout task with the new stamp and (possibly updated) timeout. // The old task was invalidated by the stamp increment above. if timeout := a.GetStartToCloseTimeout().AsDuration(); timeout > 0 { From d1b56e3919f910fe2acfc8321817fc2856f59ea3 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 14 May 2026 13:25:43 -0400 Subject: [PATCH 03/16] test: regression coverage for PAUSE_REQUESTED timeout handling Adds three sub-tests to TestPauseActivityExecution that exercise the PAUSE_REQUESTED + worker-does-not-yield interaction. Each was missing before; verified by reverting the corresponding fix and observing the specific test fail: - StartToCloseTimeoutWhilePauseRequested: Pause a STARTED activity; the worker stops responding; the StartToCloseTimeoutTask must still fire (validator must accept PAUSE_REQUESTED) and the retry must land in PAUSED. - HeartbeatTimeoutWhilePauseRequested: Same pattern for HeartbeatTimeoutTask. 
- UpdateOptionsPreservesTimeoutsWhilePauseRequested: Pause a STARTED activity, call UpdateActivityExecutionOptions with a shorter StartToCloseTimeout; the handler must re-emit a fresh timeout task for PAUSE_REQUESTED activities, not only STARTED/CANCEL_REQUESTED. Otherwise the stamp bump silently strips timeout enforcement from the running worker. --- tests/standalone_activity_test.go | 184 ++++++++++++++++++++++++++++++ 1 file changed, 184 insertions(+) diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index 8a584b585d..f94e29ad3c 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -7825,6 +7825,190 @@ func (s *standaloneActivityTestSuite) TestPauseActivityExecution() { require.EqualValues(t, 1, descResp.GetInfo().GetAttempt(), "non-retryable fail must not increment attempt") }) + // StartToCloseTimeoutWhilePauseRequested: a STARTED activity with a pending pause must still + // observe its StartToClose timeout. The worker stops responding, the timer fires, the retry + // path consumes the pause-request and lands the activity in PAUSED with the attempt count + // incremented. 
+ t.Run("StartToCloseTimeoutWhilePauseRequested", func(t *testing.T) { + ctx, cancel := context.WithTimeout(testcore.NewContext(), 30*time.Second) + defer cancel() + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + + _, err := env.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(1 * time.Second), + ScheduleToCloseTimeout: durationpb.New(5 * time.Minute), + RequestId: env.Tv().RequestID(), + RetryPolicy: &commonpb.RetryPolicy{ + MaximumAttempts: 10, + InitialInterval: durationpb.New(1 * time.Millisecond), + BackoffCoefficient: 1.0, + }, + }) + require.NoError(t, err) + + // Worker polls → activity is STARTED. + _, err = env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: env.Tv().WorkerIdentity(), + }) + require.NoError(t, err) + + // Pause while STARTED → PAUSE_REQUESTED. + _, err = env.FrontendClient().PauseActivityExecution(ctx, &workflowservice.PauseActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + Identity: "test-identity", + Reason: "test-pause", + }) + require.NoError(t, err) + + // Worker stops responding. StartToCloseTimeout must fire and the pause-request must be + // consumed by a retry, landing the activity in PAUSED at attempt 2. 
+ require.EventuallyWithT(t, func(c *assert.CollectT) { + dr, dErr := env.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + }) + require.NoError(c, dErr) + require.Equal(c, enumspb.PENDING_ACTIVITY_STATE_PAUSED, dr.GetInfo().GetRunState()) + require.EqualValues(c, 2, dr.GetInfo().GetAttempt()) + }, 10*time.Second, 200*time.Millisecond) + }) + + // HeartbeatTimeoutWhilePauseRequested: as above but for the heartbeat timer. Without the + // validator accepting PAUSE_REQUESTED, the HeartbeatTimeoutTask is silently dropped and the + // activity never times out (only the longer ScheduleToCloseTimeout would catch it). + t.Run("HeartbeatTimeoutWhilePauseRequested", func(t *testing.T) { + ctx, cancel := context.WithTimeout(testcore.NewContext(), 30*time.Second) + defer cancel() + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + + _, err := env.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(1 * time.Minute), + HeartbeatTimeout: durationpb.New(1 * time.Second), + ScheduleToCloseTimeout: durationpb.New(5 * time.Minute), + RequestId: env.Tv().RequestID(), + RetryPolicy: &commonpb.RetryPolicy{ + MaximumAttempts: 10, + InitialInterval: durationpb.New(1 * time.Millisecond), + BackoffCoefficient: 1.0, + }, + }) + require.NoError(t, err) + + _, err = env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: env.Tv().WorkerIdentity(), + }) + 
require.NoError(t, err) + + _, err = env.FrontendClient().PauseActivityExecution(ctx, &workflowservice.PauseActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + Identity: "test-identity", + Reason: "test-pause", + }) + require.NoError(t, err) + + // No heartbeat → HeartbeatTimeoutTask fires → retry consumes pause-request → PAUSED at attempt 2. + require.EventuallyWithT(t, func(c *assert.CollectT) { + dr, dErr := env.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + }) + require.NoError(c, dErr) + require.Equal(c, enumspb.PENDING_ACTIVITY_STATE_PAUSED, dr.GetInfo().GetRunState()) + require.EqualValues(c, 2, dr.GetInfo().GetAttempt()) + }, 10*time.Second, 200*time.Millisecond) + }) + + // UpdateOptionsPreservesTimeoutsWhilePauseRequested: UpdateActivityExecutionOptions bumps the + // attempt stamp, which invalidates all attempt-scoped timeout tasks. The handler must re-emit + // fresh StartToClose and Heartbeat timeout tasks for PAUSE_REQUESTED activities, otherwise the + // running worker is left with no server-side timeout enforcement (only the long + // ScheduleToCloseTimeout would eventually catch a hung worker). 
+ t.Run("UpdateOptionsPreservesTimeoutsWhilePauseRequested", func(t *testing.T) { + ctx, cancel := context.WithTimeout(testcore.NewContext(), 30*time.Second) + defer cancel() + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + + startResp, err := env.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(1 * time.Minute), + ScheduleToCloseTimeout: durationpb.New(5 * time.Minute), + RequestId: env.Tv().RequestID(), + RetryPolicy: &commonpb.RetryPolicy{ + MaximumAttempts: 10, + InitialInterval: durationpb.New(1 * time.Millisecond), + BackoffCoefficient: 1.0, + }, + }) + require.NoError(t, err) + + _, err = env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: env.Tv().WorkerIdentity(), + }) + require.NoError(t, err) + + // Pause while STARTED → PAUSE_REQUESTED. + _, err = env.FrontendClient().PauseActivityExecution(ctx, &workflowservice.PauseActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + Identity: "test-identity", + Reason: "test-pause", + }) + require.NoError(t, err) + + // Update StartToCloseTimeout to 1s. The handler bumps the attempt stamp (invalidating the + // old 1-minute timeout task) and must re-emit a fresh 1-second timeout task that fires + // while the activity is PAUSE_REQUESTED. 
+ _, err = env.FrontendClient().UpdateActivityExecutionOptions(ctx, &workflowservice.UpdateActivityExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + ActivityOptions: &activitypb.ActivityOptions{ + StartToCloseTimeout: durationpb.New(1 * time.Second), + }, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"start_to_close_timeout"}}, + }) + require.NoError(t, err) + + // New StartToCloseTimeout fires → retry consumes pause-request → PAUSED at attempt 2. + require.EventuallyWithT(t, func(c *assert.CollectT) { + dr, dErr := env.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + }) + require.NoError(c, dErr) + require.Equal(c, enumspb.PENDING_ACTIVITY_STATE_PAUSED, dr.GetInfo().GetRunState()) + require.EqualValues(c, 2, dr.GetInfo().GetAttempt()) + }, 10*time.Second, 200*time.Millisecond) + }) + // PauseRequestValidation: validate that Pause rejects invalid request fields. 
t.Run("PauseRequestValidation", func(t *testing.T) { ctx := testcore.NewContext() From 316cc6acbcd7a2440f12ab5c74925d9fea22b911 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Tue, 26 May 2026 12:54:43 -0400 Subject: [PATCH 04/16] Add default case --- chasm/lib/activity/activity.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index e346c57372..a2a2d4a5b7 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -850,6 +850,8 @@ func (a *Activity) handleUnpauseRequested(ctx chasm.MutableContext, req *activit if err := TransitionUnpausedToStarted.Apply(a, ctx, event); err != nil { return nil, err } + default: + return nil, serviceerror.NewFailedPreconditionf("activity is in non-unpausable state %v", a.GetStatus()) } return &activitypb.UnpauseActivityExecutionResponse{}, nil } @@ -861,8 +863,9 @@ func (a *Activity) isPaused() bool { case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED: return true + default: + return false } - return false } func (a *Activity) unpause( From cc1bc6e141526ee1c30b03ee3061267c7797149d Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Tue, 26 May 2026 13:12:31 -0400 Subject: [PATCH 05/16] Ignore linter complaint --- tests/activity_api_reset_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/activity_api_reset_test.go b/tests/activity_api_reset_test.go index 8d3534aaa7..ee1fba2026 100644 --- a/tests/activity_api_reset_test.go +++ b/tests/activity_api_reset_test.go @@ -146,7 +146,7 @@ func (s *ActivityApiResetClientTestSuite) TestActivityResetApi_AfterRetry() { return "", activityErr } - s.WaitForChannel(ctx, activityCompleteCh) + s.WaitForChannel(ctx, activityCompleteCh) //nolint:staticcheck return "done!", nil } @@ -205,7 +205,7 @@ func (s *ActivityApiResetClientTestSuite) TestActivityResetApi_WhileRunning() { var startedActivityCount 
atomic.Int32 activityFunction := func() (string, error) { startedActivityCount.Add(1) - s.WaitForChannel(ctx, activityCompleteCh) + s.WaitForChannel(ctx, activityCompleteCh) //nolint:staticcheck return "done!", nil } @@ -279,7 +279,7 @@ func (s *ActivityApiResetClientTestSuite) TestActivityResetApi_InRetry() { return "", activityErr } - s.WaitForChannel(ctx, activityCompleteCh) + s.WaitForChannel(ctx, activityCompleteCh) //nolint:staticcheck return "done!", nil } @@ -351,7 +351,7 @@ func (s *ActivityApiResetClientTestSuite) TestActivityResetApi_KeepPaused() { return "", activityErr } - s.WaitForChannel(ctx, activityCompleteCh) + s.WaitForChannel(ctx, activityCompleteCh) //nolint:staticcheck return "done!", nil } @@ -472,7 +472,7 @@ func (s *ActivityApiResetClientTestSuite) TestActivityReset_HeartbeatDetails() { return "", errors.New("bad-luck-please-retry") } // not the first iteration - s.WaitForChannel(ctx, activityCompleteCh) + s.WaitForChannel(ctx, activityCompleteCh) //nolint:staticcheck for activityShouldFinish.Load() == false { activity.RecordHeartbeat(ctx, "second") time.Sleep(time.Second) //nolint:forbidigo @@ -582,7 +582,7 @@ func (s *ActivityApiResetClientTestSuite) TestActivityResetApi_WhilePaused() { if !activityWasReset.Load() { return "", errors.New("bad-luck-please-retry") } - s.WaitForChannel(ctx, activityCompleteCh) + s.WaitForChannel(ctx, activityCompleteCh) //nolint:staticcheck return "done!", nil } @@ -661,7 +661,7 @@ func (s *ActivityApiResetClientTestSuite) TestActivityResetApi_TerminateWhileDef activityFunction := func() (string, error) { startedActivityCount.Add(1) - s.WaitForChannel(ctx, activityBlockCh) + s.WaitForChannel(ctx, activityBlockCh) //nolint:staticcheck return "done!", nil } From 6ff43bc2360616294920a706aee25af8f19be3be Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Tue, 26 May 2026 14:25:21 -0400 Subject: [PATCH 06/16] Remove unnecessary timeouts from tests --- tests/standalone_activity_test.go | 23 ++++++++++------------- 1 
file changed, 10 insertions(+), 13 deletions(-) diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index f94e29ad3c..e7ae955bbb 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -7836,15 +7836,14 @@ func (s *standaloneActivityTestSuite) TestPauseActivityExecution() { taskQueue := testcore.RandomizeStr(t.Name()) _, err := env.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ - Namespace: env.Namespace().String(), - ActivityId: activityID, - ActivityType: env.Tv().ActivityType(), - Identity: env.Tv().WorkerIdentity(), - Input: defaultInput, - TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, - StartToCloseTimeout: durationpb.New(1 * time.Second), - ScheduleToCloseTimeout: durationpb.New(5 * time.Minute), - RequestId: env.Tv().RequestID(), + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(1 * time.Second), + RequestId: env.Tv().RequestID(), RetryPolicy: &commonpb.RetryPolicy{ MaximumAttempts: 10, InitialInterval: durationpb.New(1 * time.Millisecond), @@ -7883,9 +7882,8 @@ func (s *standaloneActivityTestSuite) TestPauseActivityExecution() { }, 10*time.Second, 200*time.Millisecond) }) - // HeartbeatTimeoutWhilePauseRequested: as above but for the heartbeat timer. Without the - // validator accepting PAUSE_REQUESTED, the HeartbeatTimeoutTask is silently dropped and the - // activity never times out (only the longer ScheduleToCloseTimeout would catch it). + // HeartbeatTimeoutWhilePauseRequested: as StartToCloseTimeoutWhilePauseRequested but for the + // heartbeat timer. 
t.Run("HeartbeatTimeoutWhilePauseRequested", func(t *testing.T) { ctx, cancel := context.WithTimeout(testcore.NewContext(), 30*time.Second) defer cancel() @@ -7899,7 +7897,6 @@ func (s *standaloneActivityTestSuite) TestPauseActivityExecution() { Identity: env.Tv().WorkerIdentity(), Input: defaultInput, TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, - StartToCloseTimeout: durationpb.New(1 * time.Minute), HeartbeatTimeout: durationpb.New(1 * time.Second), ScheduleToCloseTimeout: durationpb.New(5 * time.Minute), RequestId: env.Tv().RequestID(), From f3cb3c71e9ccdd9e1a58055ae0282254e2ea6dec Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 27 May 2026 18:40:52 -0400 Subject: [PATCH 07/16] Emit ActivityUnpause metric only from the unpause handler Previously TransitionUnpaused and TransitionUnpausedToStarted both emitted the metric inside the transition body, which meant a reset of a PAUSE_REQUESTED activity (which invokes TransitionUnpausedToStarted internally) was counted as an unpause. Hoist the emission into handleUnpauseRequested so it fires only on real unpause requests. 
--- chasm/lib/activity/activity.go | 2 +- chasm/lib/activity/statemachine.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index a2a2d4a5b7..07a2b3c54d 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -853,6 +853,7 @@ func (a *Activity) handleUnpauseRequested(ctx chasm.MutableContext, req *activit default: return nil, serviceerror.NewFailedPreconditionf("activity is in non-unpausable state %v", a.GetStatus()) } + a.emitOnUnpausedMetrics(metricsHandler) return &activitypb.UnpauseActivityExecutionResponse{}, nil } @@ -896,7 +897,6 @@ func (a *Activity) unpause( a, chasm.TaskAttributes{ScheduledTime: scheduleTime}, &activitypb.ActivityDispatchTask{Stamp: attempt.GetStamp()}) - a.emitOnUnpausedMetrics(event.metricsHandler) } func (a *Activity) recordPauseState( diff --git a/chasm/lib/activity/statemachine.go b/chasm/lib/activity/statemachine.go index 208b9d8aea..49937a5808 100644 --- a/chasm/lib/activity/statemachine.go +++ b/chasm/lib/activity/statemachine.go @@ -480,7 +480,6 @@ var TransitionUnpausedToStarted = chasm.NewTransition( }, activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, func(a *Activity, ctx chasm.MutableContext, event unpauseEvent) error { - a.emitOnUnpausedMetrics(event.metricsHandler) return nil }, ) From 968727a4156914a655fb8426f33697a0529c4255 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 27 May 2026 18:52:12 -0400 Subject: [PATCH 08/16] test: regression coverage for Reset(KeepPaused=true) on PAUSE_REQUESTED Pins the contract between handleReset and TransitionAttemptFailedToPaused: a reset with KeepPaused=true on a PAUSE_REQUESTED activity defers via ActivityReset, the next worker yield consumes it, and the activity lands in PAUSED at attempt 1. 
--- tests/standalone_activity_test.go | 94 +++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index e7ae955bbb..d1a9e7e801 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -8006,6 +8006,100 @@ func (s *standaloneActivityTestSuite) TestPauseActivityExecution() { }, 10*time.Second, 200*time.Millisecond) }) + // ResetKeepPausedWhilePauseRequested: a reset with KeepPaused=true on a PAUSE_REQUESTED + // activity must defer the reset (via ActivityReset) and have it consumed by the next retry, + // landing the activity in PAUSED at attempt 1. Pins the contract between handleReset and + // TransitionAttemptFailedToPaused. + t.Run("ResetKeepPausedWhilePauseRequested", func(t *testing.T) { + ctx, cancel := context.WithTimeout(testcore.NewContext(), 30*time.Second) + defer cancel() + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + + startResp, err := env.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(1 * time.Second), + RequestId: env.Tv().RequestID(), + RetryPolicy: &commonpb.RetryPolicy{ + MaximumAttempts: 10, + InitialInterval: durationpb.New(1 * time.Millisecond), + BackoffCoefficient: 1.0, + }, + }) + require.NoError(t, err) + + // Drive the activity to attempt 2 by polling then failing attempt 1. 
+ pollResp, err := env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: env.Tv().WorkerIdentity(), + }) + require.NoError(t, err) + _, err = env.FrontendClient().RespondActivityTaskFailed(ctx, &workflowservice.RespondActivityTaskFailedRequest{ + Namespace: env.Namespace().String(), + TaskToken: pollResp.GetTaskToken(), + Failure: &failurepb.Failure{ + Message: "fail attempt 1", + FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ + Type: "TestFailure", + }}, + }, + Identity: env.Tv().WorkerIdentity(), + }) + require.NoError(t, err) + + // Worker polls attempt 2 → STARTED at attempt 2. + require.EventuallyWithT(t, func(c *assert.CollectT) { + dr, dErr := env.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + }) + require.NoError(c, dErr) + require.EqualValues(c, 2, dr.GetInfo().GetAttempt()) + }, 5*time.Second, 100*time.Millisecond) + _, err = env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: env.Tv().WorkerIdentity(), + }) + require.NoError(t, err) + + // Pause while STARTED at attempt 2 → PAUSE_REQUESTED. + _, err = env.FrontendClient().PauseActivityExecution(ctx, &workflowservice.PauseActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + Identity: "test-identity", + Reason: "test-pause", + }) + require.NoError(t, err) + + // Reset with KeepPaused=true. Reset is deferred via ActivityReset; status stays PAUSE_REQUESTED. 
+ _, err = env.FrontendClient().ResetActivityExecution(ctx, &workflowservice.ResetActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + KeepPaused: true, + }) + require.NoError(t, err) + + // Worker stops responding. StartToCloseTimeout fires → TransitionAttemptFailedToPaused + // consumes ActivityReset → PAUSED at attempt 1. + require.EventuallyWithT(t, func(c *assert.CollectT) { + dr, dErr := env.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + }) + require.NoError(c, dErr) + require.Equal(c, enumspb.PENDING_ACTIVITY_STATE_PAUSED, dr.GetInfo().GetRunState()) + require.EqualValues(c, 1, dr.GetInfo().GetAttempt()) + }, 10*time.Second, 200*time.Millisecond) + }) + // PauseRequestValidation: validate that Pause rejects invalid request fields. t.Run("PauseRequestValidation", func(t *testing.T) { ctx := testcore.NewContext() From 36bb3a3cf783da65569232df02e814ebb1a0950b Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 27 May 2026 19:05:47 -0400 Subject: [PATCH 09/16] Don't clear pause state on unpause --- chasm/lib/activity/activity.go | 1 - 1 file changed, 1 deletion(-) diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index 07a2b3c54d..4c11a9e9c5 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -873,7 +873,6 @@ func (a *Activity) unpause( ctx chasm.MutableContext, event unpauseEvent, ) { - a.PauseState = nil attempt := a.LastAttempt.Get(ctx) if event.req.GetResetAttempts() { attempt.Count = 1 From d5075cefa380fc107280065736886a99e3816393 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 27 May 2026 19:08:49 -0400 Subject: [PATCH 10/16] Update heartbeat test to PAUSE_REQUESTED status The test was written for the old hybrid model (PauseState non-nil while status stays STARTED meant 'paused'). 
RecordHeartbeat now derives ActivityPaused from status == PAUSE_REQUESTED, so the test cases must set that status to exercise the path. --- chasm/lib/activity/activity_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/chasm/lib/activity/activity_test.go b/chasm/lib/activity/activity_test.go index 06888df8d8..c85cbcb2d9 100644 --- a/chasm/lib/activity/activity_test.go +++ b/chasm/lib/activity/activity_test.go @@ -292,14 +292,14 @@ func TestRecordHeartbeatPauseResetCancelFlags(t *testing.T) { wantReset: true, }, { - name: "pause set propagates ActivityPaused", - status: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + name: "PAUSE_REQUESTED status propagates ActivityPaused", + status: activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, pauseState: &activitypb.ActivityPauseState{PauseTime: timestamppb.New(testTime)}, wantPaused: true, }, { name: "pause and reset both propagate", - status: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + status: activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, pauseState: &activitypb.ActivityPauseState{PauseTime: timestamppb.New(testTime)}, activityReset: true, wantPaused: true, From 8e05bcfb91e7d70165f67c6000caf8a574a34ddf Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 27 May 2026 19:15:55 -0400 Subject: [PATCH 11/16] Don't set pause state in tests --- chasm/lib/activity/activity_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/chasm/lib/activity/activity_test.go b/chasm/lib/activity/activity_test.go index c85cbcb2d9..ded7b7fa83 100644 --- a/chasm/lib/activity/activity_test.go +++ b/chasm/lib/activity/activity_test.go @@ -272,7 +272,6 @@ func TestRecordHeartbeatPauseResetCancelFlags(t *testing.T) { testCases := []struct { name string status activitypb.ActivityExecutionStatus - pauseState *activitypb.ActivityPauseState activityReset bool wantPaused bool wantReset bool @@ -294,13 +293,11 @@ func TestRecordHeartbeatPauseResetCancelFlags(t *testing.T) { { name: "PAUSE_REQUESTED status 
propagates ActivityPaused", status: activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, - pauseState: &activitypb.ActivityPauseState{PauseTime: timestamppb.New(testTime)}, wantPaused: true, }, { name: "pause and reset both propagate", status: activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, - pauseState: &activitypb.ActivityPauseState{PauseTime: timestamppb.New(testTime)}, activityReset: true, wantPaused: true, wantReset: true, @@ -333,7 +330,6 @@ func TestRecordHeartbeatPauseResetCancelFlags(t *testing.T) { ActivityState: &activitypb.ActivityState{ Status: tc.status, HeartbeatTimeout: durationpb.New(0), - PauseState: tc.pauseState, ActivityReset: tc.activityReset, }, LastAttempt: chasm.NewDataField(ctx, &activitypb.ActivityAttemptState{Count: attempt}), From fe2bb0200dfbbb428aa24adfe38687204f3e160c Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 27 May 2026 19:27:00 -0400 Subject: [PATCH 12/16] Rename transitions --- chasm/lib/activity/activity.go | 8 ++++---- chasm/lib/activity/statemachine.go | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index 4c11a9e9c5..1ddb063fd4 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -847,7 +847,7 @@ func (a *Activity) handleUnpauseRequested(ctx chasm.MutableContext, req *activit return nil, err } case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED: - if err := TransitionUnpausedToStarted.Apply(a, ctx, event); err != nil { + if err := TransitionUnpausedWhilePauseRequested.Apply(a, ctx, event); err != nil { return nil, err } default: @@ -981,7 +981,7 @@ func (a *Activity) handleReset(ctx chasm.MutableContext, req *activitypb.ResetAc } if a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED && !keepPaused { // Unpause; the deferred reset will apply on the next retry via STARTED->SCHEDULED. 
- if err := TransitionUnpausedToStarted.Apply(a, ctx, unpauseEvent{ + if err := TransitionUnpausedWhilePauseRequested.Apply(a, ctx, unpauseEvent{ req: &workflowservice.UnpauseActivityExecutionRequest{}, metricsHandler: metricsHandler, }); err != nil { @@ -1085,13 +1085,13 @@ func (a *Activity) tryReschedule( } event := rescheduleEvent{retryInterval: retryInterval, failure: failure} if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED { - return true, TransitionAttemptFailedToPaused.Apply(a, ctx, event) + return true, TransitionAttemptFailedWhilePauseRequested.Apply(a, ctx, event) } return true, TransitionRescheduled.Apply(a, ctx, event) } func (a *Activity) shouldRetry(ctx chasm.Context, overridingRetryInterval time.Duration) (bool, time.Duration) { - if !TransitionRescheduled.Possible(a) && !TransitionAttemptFailedToPaused.Possible(a) { + if !TransitionRescheduled.Possible(a) && !TransitionAttemptFailedWhilePauseRequested.Possible(a) { return false, 0 } attempt := a.LastAttempt.Get(ctx) diff --git a/chasm/lib/activity/statemachine.go b/chasm/lib/activity/statemachine.go index 49937a5808..f7bc957f4f 100644 --- a/chasm/lib/activity/statemachine.go +++ b/chasm/lib/activity/statemachine.go @@ -471,10 +471,10 @@ var TransitionUnpaused = chasm.NewTransition( }, ) -// TransitionUnpausedToStarted transitions PAUSE_REQUESTED → STARTED. The worker is still in charge +// TransitionUnpausedWhilePauseRequested transitions PAUSE_REQUESTED → STARTED. The worker is still in charge // of the activity. Its task token is not invalidated by this transition, and there is no stamp bump // since StartToCloseTimeoutTask and HeartbeatTimeoutTask must stay valid. 
-var TransitionUnpausedToStarted = chasm.NewTransition( +var TransitionUnpausedWhilePauseRequested = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, }, @@ -484,10 +484,10 @@ var TransitionUnpausedToStarted = chasm.NewTransition( }, ) -// TransitionAttemptFailedToPaused transitions PAUSE_REQUESTED → PAUSED. It is performed instead of +// TransitionAttemptFailedWhilePauseRequested transitions PAUSE_REQUESTED → PAUSED. It is performed instead of // TransitionReschedule, when the activity is in PAUSE_REQUESTED and the worker yields (failure or // timeout) with retries remaining. The failed attempt is recorded and Count is incremented. -var TransitionAttemptFailedToPaused = chasm.NewTransition( +var TransitionAttemptFailedWhilePauseRequested = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, }, From 21a3f020830bc2a538c41aea53eef7ac028a1952 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 27 May 2026 19:01:31 -0400 Subject: [PATCH 13/16] Extract shared applyFailedAttempt helper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TransitionRescheduled (→ SCHEDULED) and TransitionAttemptFailedToPaused (→ PAUSED) had identical preludes: consume deferred reset, bump attempt count and stamp, record the failure. Pull that into a shared method so the two paths can't silently diverge — they now differ only in whether they additionally emit dispatch tasks. 
--- chasm/lib/activity/activity.go | 19 +++++++++++++++ chasm/lib/activity/statemachine.go | 37 +++--------------------------- 2 files changed, 22 insertions(+), 34 deletions(-) diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index 1ddb063fd4..c4554f7349 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -1045,6 +1045,25 @@ func (a *Activity) recordScheduleToStartOrCloseTimeoutFailure(ctx chasm.MutableC return nil } +// applyFailedAttempt is the shared accounting performed when a worker yields with retries +// remaining: it consumes any deferred reset, bumps the attempt count and stamp, and records the +// failure. Both TransitionRescheduled (→ SCHEDULED) and TransitionAttemptFailedToPaused (→ PAUSED) +// call it; they differ only in whether they additionally emit dispatch tasks. +func (a *Activity) applyFailedAttempt(ctx chasm.MutableContext, event rescheduleEvent) error { + attempt := a.LastAttempt.Get(ctx) + if a.ActivityReset { + attempt.Count = 0 + a.ActivityReset = false + if a.ResetHeartbeats { + a.ResetHeartbeats = false + a.clearHeartbeat(ctx) + } + } + attempt.Count++ + attempt.Stamp++ + return a.recordFailedAttempt(ctx, event.retryInterval, event.failure, ctx.Now(a), false) +} + // recordFailedAttempt records any failures resulting from a tried attempt, including worker application failures and // start-to-close timeouts. Since the calls come from retried attempts we update the attempt failure info but leave // the outcome failure empty to avoid duplication. 
diff --git a/chasm/lib/activity/statemachine.go b/chasm/lib/activity/statemachine.go index f7bc957f4f..d883fe89bb 100644 --- a/chasm/lib/activity/statemachine.go +++ b/chasm/lib/activity/statemachine.go @@ -100,27 +100,11 @@ var TransitionRescheduled = chasm.NewTransition( }, activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, func(a *Activity, ctx chasm.MutableContext, event rescheduleEvent) error { - attempt := a.LastAttempt.Get(ctx) - currentTime := ctx.Now(a) - - // Apply deferred reset: set Count to 0 so the increment below produces 1. - if a.ActivityReset { - attempt.Count = 0 - a.ActivityReset = false - if a.ResetHeartbeats { - a.ResetHeartbeats = false - a.clearHeartbeat(ctx) - } - } - - attempt.Count++ - attempt.Stamp++ - - err := a.recordFailedAttempt(ctx, event.retryInterval, event.failure, currentTime, false) - if err != nil { + if err := a.applyFailedAttempt(ctx, event); err != nil { return err } + attempt := a.LastAttempt.Get(ctx) retryScheduledTime := attemptScheduleTimeForRetry(attempt).AsTime() if timeout := a.GetScheduleToStartTimeout().AsDuration(); timeout > 0 { @@ -493,22 +477,7 @@ var TransitionAttemptFailedWhilePauseRequested = chasm.NewTransition( }, activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, func(a *Activity, ctx chasm.MutableContext, event rescheduleEvent) error { - attempt := a.LastAttempt.Get(ctx) - currentTime := ctx.Now(a) - - if a.ActivityReset { - attempt.Count = 0 - a.ActivityReset = false - if a.ResetHeartbeats { - a.ResetHeartbeats = false - a.clearHeartbeat(ctx) - } - } - - attempt.Count++ - attempt.Stamp++ - - return a.recordFailedAttempt(ctx, event.retryInterval, event.failure, currentTime, false) + return a.applyFailedAttempt(ctx, event) }, ) From 39a14d558b0ddace16c2c01fb386a7f06d533b67 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 27 May 2026 19:31:55 -0400 Subject: [PATCH 14/16] fixup: Rename transitions --- chasm/lib/activity/activity.go | 2 +- tests/standalone_activity_test.go | 4 ++-- 2 files changed, 3 
insertions(+), 3 deletions(-) diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index c4554f7349..0735753e8d 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -1047,7 +1047,7 @@ func (a *Activity) recordScheduleToStartOrCloseTimeoutFailure(ctx chasm.MutableC // applyFailedAttempt is the shared accounting performed when a worker yields with retries // remaining: it consumes any deferred reset, bumps the attempt count and stamp, and records the -// failure. Both TransitionRescheduled (→ SCHEDULED) and TransitionAttemptFailedToPaused (→ PAUSED) +// failure. Both TransitionRescheduled (→ SCHEDULED) and AttemptFailedWhilePauseRequested (→ PAUSED) // call it; they differ only in whether they additionally emit dispatch tasks. func (a *Activity) applyFailedAttempt(ctx chasm.MutableContext, event rescheduleEvent) error { attempt := a.LastAttempt.Get(ctx) diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index d1a9e7e801..bc126f7eb4 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -8009,7 +8009,7 @@ func (s *standaloneActivityTestSuite) TestPauseActivityExecution() { // ResetKeepPausedWhilePauseRequested: a reset with KeepPaused=true on a PAUSE_REQUESTED // activity must defer the reset (via ActivityReset) and have it consumed by the next retry, // landing the activity in PAUSED at attempt 1. Pins the contract between handleReset and - // TransitionAttemptFailedToPaused. + // AttemptFailedWhilePauseRequested. t.Run("ResetKeepPausedWhilePauseRequested", func(t *testing.T) { ctx, cancel := context.WithTimeout(testcore.NewContext(), 30*time.Second) defer cancel() @@ -8087,7 +8087,7 @@ func (s *standaloneActivityTestSuite) TestPauseActivityExecution() { }) require.NoError(t, err) - // Worker stops responding. StartToCloseTimeout fires → TransitionAttemptFailedToPaused + // Worker stops responding. 
StartToCloseTimeout fires → AttemptFailedWhilePauseRequested // consumes ActivityReset → PAUSED at attempt 1. require.EventuallyWithT(t, func(c *assert.CollectT) { dr, dErr := env.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ From 6e1eff9d48775d9bf3e1124049fb3859e156fbb6 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 27 May 2026 20:01:26 -0400 Subject: [PATCH 15/16] Tweak proto comment --- .../activity/gen/activitypb/v1/activity_state.pb.go | 12 ++++++------ chasm/lib/activity/proto/v1/activity_state.proto | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go b/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go index e8bb358713..3636d95eff 100644 --- a/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go +++ b/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go @@ -61,15 +61,15 @@ const ( // Additionally, after all retries are exhausted for start-to-close or heartbeat timeouts, the activity will also // transition to timed out status. ACTIVITY_EXECUTION_STATUS_TIMED_OUT ActivityExecutionStatus = 8 - // The activity is paused and no worker is currently executing the attempt. No dispatch task - // will be issued until the activity is unpaused. Reachable from SCHEDULED via an operator pause, - // and from PAUSE_REQUESTED when the worker yields (failure with retries remaining, or timeout - // with retries remaining). + // The activity is paused and no worker is currently executing the attempt. No dispatch task will + // be issued until the activity is unpaused. Reachable from SCHEDULED via an operator pause, and + // from PAUSE_REQUESTED when the worker yields due to failure with retries remaining, or due to + // timeout with retries remaining. ACTIVITY_EXECUTION_STATUS_PAUSED ActivityExecutionStatus = 9 // An operator pause was received while the activity was STARTED. 
The worker is still executing // under its existing task token. The worker will be notified via ActivityPaused=true on its next - // heartbeat response. When the worker yields (failure with retries remaining, or timeout with - // retries remaining), the activity will transition to PAUSED. + // heartbeat response. When the worker yields due to failure with retries remaining, or due to + // timeout with retries remaining, the activity will transition to PAUSED. ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED ActivityExecutionStatus = 10 ) diff --git a/chasm/lib/activity/proto/v1/activity_state.proto b/chasm/lib/activity/proto/v1/activity_state.proto index 2e5bec35a7..acf05d6978 100644 --- a/chasm/lib/activity/proto/v1/activity_state.proto +++ b/chasm/lib/activity/proto/v1/activity_state.proto @@ -41,16 +41,16 @@ enum ActivityExecutionStatus { // Additionally, after all retries are exhausted for start-to-close or heartbeat timeouts, the activity will also // transition to timed out status. ACTIVITY_EXECUTION_STATUS_TIMED_OUT = 8; - // The activity is paused and no worker is currently executing the attempt. No dispatch task - // will be issued until the activity is unpaused. Reachable from SCHEDULED via an operator pause, - // and from PAUSE_REQUESTED when the worker yields (failure with retries remaining, or timeout - // with retries remaining). + // The activity is paused and no worker is currently executing the attempt. No dispatch task will + // be issued until the activity is unpaused. Reachable from SCHEDULED via an operator pause, and + // from PAUSE_REQUESTED when the worker yields due to failure with retries remaining, or due to + // timeout with retries remaining. ACTIVITY_EXECUTION_STATUS_PAUSED = 9; // An operator pause was received while the activity was STARTED. The worker is still executing // under its existing task token. The worker will be notified via ActivityPaused=true on its next - // heartbeat response. 
When the worker yields (failure with retries remaining, or timeout with - // retries remaining), the activity will transition to PAUSED. + // heartbeat response. When the worker yields due to failure with retries remaining, or due to + // timeout with retries remaining, the activity will transition to PAUSED. ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED = 10; } From 75fd1c500135d1bc6b6adbfb2316c5eb0c3cdb95 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Wed, 27 May 2026 20:12:24 -0400 Subject: [PATCH 16/16] Clean up comment --- chasm/lib/activity/activity.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index 0735753e8d..eeed02c2da 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -1045,10 +1045,7 @@ func (a *Activity) recordScheduleToStartOrCloseTimeoutFailure(ctx chasm.MutableC return nil } -// applyFailedAttempt is the shared accounting performed when a worker yields with retries -// remaining: it consumes any deferred reset, bumps the attempt count and stamp, and records the -// failure. Both TransitionRescheduled (→ SCHEDULED) and AttemptFailedWhilePauseRequested (→ PAUSED) -// call it; they differ only in whether they additionally emit dispatch tasks. +// applyFailedAttempt mutates activity state when a worker yields with retries remaining. func (a *Activity) applyFailedAttempt(ctx chasm.MutableContext, event rescheduleEvent) error { attempt := a.LastAttempt.Get(ctx) if a.ActivityReset {