diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index 55c67f95e6..eeed02c2da 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -599,7 +599,9 @@ func (a *Activity) UpdateActivityExecutionOptions( attempt.Stamp++ - if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED || a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { + if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED || + a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED || + a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED { // Re-create the start-to-close timeout task with the new stamp and (possibly updated) timeout. // The old task was invalidated by the stamp increment above. if timeout := a.GetStartToCloseTimeout().AsDuration(); timeout > 0 { @@ -793,7 +795,7 @@ func (a *Activity) handlePauseRequested(ctx chasm.MutableContext, req *activityp if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { return nil, serviceerror.NewFailedPrecondition("cannot pause an activity with a pending cancellation") } - if a.PauseState != nil { + if a.isPaused() { newReqID := req.GetFrontendRequest().GetRequestId() existingReqID := a.PauseState.GetRequestId() if newReqID != "" && existingReqID == newReqID { @@ -807,19 +809,19 @@ func (a *Activity) handlePauseRequested(ctx chasm.MutableContext, req *activityp return nil, err } - if TransitionPaused.Possible(a) { - // SCHEDULED → real PAUSED status; stamp bumped to invalidate the pending dispatch task. - if err := TransitionPaused.Apply(a, ctx, pauseEvent{ - req: req.GetFrontendRequest(), - metricsHandler: metricsHandler, - }); err != nil { + event := pauseEvent{req: req.GetFrontendRequest(), metricsHandler: metricsHandler} + switch a.GetStatus() { + case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED: + if err := TransitionPaused.Apply(a, ctx, event); err != nil { return nil, err } - return &activitypb.PauseActivityExecutionResponse{}, nil + case activitypb.ACTIVITY_EXECUTION_STATUS_STARTED: + if err := TransitionPauseRequested.Apply(a, ctx, event); err != nil { + return nil, err + } + default: + return nil, serviceerror.NewFailedPreconditionf("activity is in non-pausable state %v", a.GetStatus()) } - // STARTED → flag-only pause. Status stays STARTED so the worker's token remains valid. - // The worker will see ActivityPaused=true on the next heartbeat. - a.pause(ctx, pauseEvent{req.GetFrontendRequest(), metricsHandler}) return &activitypb.PauseActivityExecutionResponse{}, nil } @@ -829,8 +831,7 @@ func (a *Activity) handleUnpauseRequested(ctx chasm.MutableContext, req *activit if a.isTerminal() { return nil, serviceerror.NewFailedPreconditionf("activity is in terminal state %v", a.GetStatus()) } - // Not paused → no-op. - if a.PauseState == nil { + if !a.isPaused() { return &activitypb.UnpauseActivityExecutionResponse{}, nil } @@ -839,38 +840,39 @@ func (a *Activity) handleUnpauseRequested(ctx chasm.MutableContext, req *activit return nil, err } - if TransitionUnpaused.Possible(a) { - if err := TransitionUnpaused.Apply(a, ctx, unpauseEvent{ - req: req.GetFrontendRequest(), - metricsHandler: metricsHandler, - }); err != nil { + event := unpauseEvent{req: req.GetFrontendRequest(), metricsHandler: metricsHandler} + switch a.GetStatus() { + case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED: + if err := TransitionUnpaused.Apply(a, ctx, event); err != nil { return nil, err } - return &activitypb.UnpauseActivityExecutionResponse{}, nil + case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED: + if err := TransitionUnpausedWhilePauseRequested.Apply(a, ctx, event); err != nil { + return nil, err + } + default: + return nil, serviceerror.NewFailedPreconditionf("activity is in non-unpausable state %v", a.GetStatus()) } + a.emitOnUnpausedMetrics(metricsHandler) + return &activitypb.UnpauseActivityExecutionResponse{}, nil +} - // Flag-based pause (status is STARTED, CANCEL_REQUESTED, or SCHEDULED after retry while paused). - if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED || - a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { - // Worker continues with its existing token — no stamp bump needed, no dispatch task. - // Cancel takes precedence over pause. Unpause clears the pause flag but does not re-dispatch; - // the activity remains CANCEL_REQUESTED and will be cancelled when the worker responds. - a.PauseState = nil - a.emitOnUnpausedMetrics(metricsHandler) - return &activitypb.UnpauseActivityExecutionResponse{}, nil +// isPaused reports whether the activity is currently paused (waiting) or has a pending pause request +// (worker still running). +func (a *Activity) isPaused() bool { + switch a.GetStatus() { + case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED: + return true + default: + return false } - a.unpause(ctx, unpauseEvent{ - req: req.GetFrontendRequest(), - metricsHandler: metricsHandler, - }) - return &activitypb.UnpauseActivityExecutionResponse{}, nil } func (a *Activity) unpause( ctx chasm.MutableContext, event unpauseEvent, ) { - a.PauseState = nil attempt := a.LastAttempt.Get(ctx) if event.req.GetResetAttempts() { attempt.Count = 1 @@ -894,10 +896,9 @@ func (a *Activity) unpause( a, chasm.TaskAttributes{ScheduledTime: scheduleTime}, &activitypb.ActivityDispatchTask{Stamp: attempt.GetStamp()}) - a.emitOnUnpausedMetrics(event.metricsHandler) } -func (a *Activity) pause( +func (a *Activity) recordPauseState( ctx chasm.MutableContext, event pauseEvent, ) { @@ -970,44 +971,42 @@ func (a *Activity) handleReset(ctx chasm.MutableContext, req *activitypb.ResetAc switch a.Status { case activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, - activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED: + activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED: // Activity is running. Defer reset to the next retry so we don't break // the running worker's task token (which encodes the current attempt count). a.ActivityReset = true if frontendReq.GetResetHeartbeat() { a.ResetHeartbeats = true } - if !keepPaused { - // Clear PauseState now so TransitionRescheduled can dispatch without being - // blocked by the validator (which drops dispatch tasks when PauseState != nil). - a.PauseState = nil + if a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED && !keepPaused { + // Unpause; the deferred reset will apply on the next retry via STARTED->SCHEDULED. + if err := TransitionUnpausedWhilePauseRequested.Apply(a, ctx, unpauseEvent{ + req: &workflowservice.UnpauseActivityExecutionRequest{}, + metricsHandler: metricsHandler, + }); err != nil { + return nil, err + } } a.emitOnResetMetrics(metricsHandler) return &activitypb.ResetActivityExecutionResponse{}, nil - case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, - activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED: - // A SCHEDULED activity can carry PauseState when a pause was applied concurrently - // (e.g. deferred from a STARTED→retry path). In that case the dispatch task emitted - // by TransitionReset would be dropped by the validator, leaving the activity stuck. - // Treat any non-nil PauseState the same way as the explicit PAUSED status. - if a.PauseState != nil { - if keepPaused { - // Reset counts but keep the activity paused. - // No dispatch task — the user must unpause to re-dispatch. - attempt := a.LastAttempt.Get(ctx) - attempt.Count = 1 - attempt.Stamp++ - attempt.CurrentRetryInterval = nil - if frontendReq.GetResetHeartbeat() { - a.clearHeartbeat(ctx) - } - a.emitOnResetMetrics(metricsHandler) - return &activitypb.ResetActivityExecutionResponse{}, nil + case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED: + if keepPaused { + // Reset counts but keep the activity paused. + attempt := a.LastAttempt.Get(ctx) + attempt.Count = 1 + attempt.Stamp++ + attempt.CurrentRetryInterval = nil + if frontendReq.GetResetHeartbeat() { + a.clearHeartbeat(ctx) } - // keepPaused=false: clear pause state so the dispatch task isn't dropped. - a.PauseState = nil + a.emitOnResetMetrics(metricsHandler) + return &activitypb.ResetActivityExecutionResponse{}, nil } + fallthrough + + case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED: if err := TransitionReset.Apply(a, ctx, resetEvent{ req: frontendReq, scheduleTime: scheduleTime, @@ -1046,6 +1045,22 @@ func (a *Activity) recordScheduleToStartOrCloseTimeoutFailure(ctx chasm.MutableC return nil } +// applyFailedAttempt mutates activity state when a worker yields with retries remaining. +func (a *Activity) applyFailedAttempt(ctx chasm.MutableContext, event rescheduleEvent) error { + attempt := a.LastAttempt.Get(ctx) + if a.ActivityReset { + attempt.Count = 0 + a.ActivityReset = false + if a.ResetHeartbeats { + a.ResetHeartbeats = false + a.clearHeartbeat(ctx) + } + } + attempt.Count++ + attempt.Stamp++ + return a.recordFailedAttempt(ctx, event.retryInterval, event.failure, ctx.Now(a), false) +} + // recordFailedAttempt records any failures resulting from a tried attempt, including worker application failures and // start-to-close timeouts. Since the calls come from retried attempts we update the attempt failure info but leave // the outcome failure empty to avoid duplication. @@ -1073,9 +1088,8 @@ func (a *Activity) recordFailedAttempt( } // tryReschedule attempts to reschedule the activity for retry. Returns true if rescheduled, false -// if retry is not possible. When the activity has PauseState set (flag-based pause from STARTED), -// the retry transitions to SCHEDULED normally but the dispatch task is blocked by the pause flag -// until the activity is unpaused. +// if retry is not possible. If a pause request has been received then we transition to Paused; +// otherwise to Scheduled. func (a *Activity) tryReschedule( ctx chasm.MutableContext, overridingRetryInterval time.Duration, @@ -1085,14 +1099,15 @@ func (a *Activity) tryReschedule( if !shouldRetry { return false, nil } - return true, TransitionRescheduled.Apply(a, ctx, rescheduleEvent{ - retryInterval: retryInterval, - failure: failure, - }) + event := rescheduleEvent{retryInterval: retryInterval, failure: failure} + if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED { + return true, TransitionAttemptFailedWhilePauseRequested.Apply(a, ctx, event) + } + return true, TransitionRescheduled.Apply(a, ctx, event) } func (a *Activity) shouldRetry(ctx chasm.Context, overridingRetryInterval time.Duration) (bool, time.Duration) { - if !TransitionRescheduled.Possible(a) { + if !TransitionRescheduled.Possible(a) && !TransitionAttemptFailedWhilePauseRequested.Possible(a) { return false, 0 } attempt := a.LastAttempt.Get(ctx) @@ -1187,7 +1202,7 @@ func (a *Activity) RecordHeartbeat( } return &historyservice.RecordActivityTaskHeartbeatResponse{ CancelRequested: a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, - ActivityPaused: a.PauseState != nil, + ActivityPaused: a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, ActivityReset: a.ActivityReset, }, nil } @@ -1198,7 +1213,8 @@ func InternalStatusToAPIStatus(status activitypb.ActivityExecutionStatus) enumsp case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, - activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED: + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED: return enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED: return enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED @@ -1227,6 +1243,8 @@ func internalStatusToRunState(status activitypb.ActivityExecutionStatus) enumspb return enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED: return enumspb.PENDING_ACTIVITY_STATE_PAUSED + case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED: + return enumspb.PENDING_ACTIVITY_STATE_PAUSE_REQUESTED case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED, activitypb.ACTIVITY_EXECUTION_STATUS_FAILED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED, @@ -1241,19 +1259,7 @@ func internalStatusToRunState(status activitypb.ActivityExecutionStatus) enumspb func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) *apiactivitypb.ActivityExecutionInfo { status := InternalStatusToAPIStatus(a.GetStatus()) - // Derive the external run state with hybrid pause logic: - // PAUSED status (real) → PAUSED - // STARTED + PauseState != nil (pause requested while running) → PAUSE_REQUESTED - // SCHEDULED + PauseState != nil (retry while paused flag set) → PAUSED - // All other cases → derived from internal status directly - var runState enumspb.PendingActivityState - if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED && a.PauseState != nil { - runState = enumspb.PENDING_ACTIVITY_STATE_PAUSE_REQUESTED - } else if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED && a.PauseState != nil { - runState = enumspb.PENDING_ACTIVITY_STATE_PAUSED - } else { - runState = internalStatusToRunState(a.GetStatus()) - } + runState := internalStatusToRunState(a.GetStatus()) requestData := a.RequestData.Get(ctx) attempt := a.LastAttempt.Get(ctx) @@ -1466,7 +1472,8 @@ func (a *Activity) validateActivityTaskToken( requestNamespaceID string, ) error { if a.Status != activitypb.ACTIVITY_EXECUTION_STATUS_STARTED && - a.Status != activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { + a.Status != activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED && + a.Status != activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED { return serviceerror.NewNotFound("activity task not found") } if token.Attempt != ByIDTokenAttempt && token.Attempt != a.LastAttempt.Get(ctx).GetCount() { diff --git a/chasm/lib/activity/activity_tasks.go b/chasm/lib/activity/activity_tasks.go index 52d5112789..8f9aaed7a5 100644 --- a/chasm/lib/activity/activity_tasks.go +++ b/chasm/lib/activity/activity_tasks.go @@ -35,11 +35,7 @@ func (h *activityDispatchTaskHandler) Validate( _ chasm.TaskAttributes, task *activitypb.ActivityDispatchTask, ) (bool, error) { - // Do not dispatch while the activity has a pause flag set (SCHEDULED + PauseState from a retry - // while a STARTED activity was flag-paused). TransitionStarted.Possible already returns false for - // real PAUSED status activities (source must be SCHEDULED, and PAUSED → SCHEDULED via unpause). return (TransitionStarted.Possible(activity) && - activity.PauseState == nil && task.Stamp == activity.LastAttempt.Get(ctx).GetStamp()), nil } @@ -96,9 +92,7 @@ func (h *scheduleToStartTimeoutTaskHandler) Validate( _ chasm.TaskAttributes, task *activitypb.ScheduleToStartTimeoutTask, ) (bool, error) { - // Do not time out a SCHEDULED activity that has the pause flag set (retry while paused). return (activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED && - activity.PauseState == nil && task.Stamp == activity.LastAttempt.Get(ctx).GetStamp()), nil } @@ -182,7 +176,8 @@ func (h *startToCloseTimeoutTaskHandler) Validate( task *activitypb.StartToCloseTimeoutTask, ) (bool, error) { valid := ((activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED || - activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED) && + activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED || + activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED) && task.Stamp == activity.LastAttempt.Get(ctx).GetStamp()) return valid, nil } @@ -243,7 +238,8 @@ func (h *heartbeatTimeoutTaskHandler) Validate( // last heartbeat was received after hb_i. If so, we reject this timeout task. Otherwise, the // Execute function runs and we fail the attempt. if activity.Status != activitypb.ACTIVITY_EXECUTION_STATUS_STARTED && - activity.Status != activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { + activity.Status != activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED && + activity.Status != activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED { return false, nil } // Task attempt must still match current attempt. diff --git a/chasm/lib/activity/activity_test.go b/chasm/lib/activity/activity_test.go index 06888df8d8..ded7b7fa83 100644 --- a/chasm/lib/activity/activity_test.go +++ b/chasm/lib/activity/activity_test.go @@ -272,7 +272,6 @@ func TestRecordHeartbeatPauseResetCancelFlags(t *testing.T) { testCases := []struct { name string status activitypb.ActivityExecutionStatus - pauseState *activitypb.ActivityPauseState activityReset bool wantPaused bool wantReset bool @@ -292,15 +291,13 @@ func TestRecordHeartbeatPauseResetCancelFlags(t *testing.T) { wantReset: true, }, { - name: "pause set propagates ActivityPaused", - status: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, - pauseState: &activitypb.ActivityPauseState{PauseTime: timestamppb.New(testTime)}, + name: "PAUSE_REQUESTED status propagates ActivityPaused", + status: activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, wantPaused: true, }, { name: "pause and reset both propagate", - status: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, - pauseState: &activitypb.ActivityPauseState{PauseTime: timestamppb.New(testTime)}, + status: activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, activityReset: true, wantPaused: true, wantReset: true, @@ -333,7 +330,6 @@ func TestRecordHeartbeatPauseResetCancelFlags(t *testing.T) { ActivityState: &activitypb.ActivityState{ Status: tc.status, HeartbeatTimeout: durationpb.New(0), - PauseState: tc.pauseState, ActivityReset: tc.activityReset, }, LastAttempt: chasm.NewDataField(ctx, &activitypb.ActivityAttemptState{Count: attempt}), diff --git a/chasm/lib/activity/gen/activitypb/v1/activity_state.go-helpers.pb.go b/chasm/lib/activity/gen/activitypb/v1/activity_state.go-helpers.pb.go index 8ca3e1cb52..03a1d65a46 100644 --- a/chasm/lib/activity/gen/activitypb/v1/activity_state.go-helpers.pb.go +++ b/chasm/lib/activity/gen/activitypb/v1/activity_state.go-helpers.pb.go @@ -315,6 +315,7 @@ var ( "Terminated": 7, "TimedOut": 8, "Paused": 9, + "PauseRequested": 10, } ) diff --git a/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go b/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go index 0a93abdde5..3636d95eff 100644 --- a/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go +++ b/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go @@ -61,31 +61,32 @@ const ( // Additionally, after all retries are exhausted for start-to-close or heartbeat timeouts, the activity will also // transition to timed out status. ACTIVITY_EXECUTION_STATUS_TIMED_OUT ActivityExecutionStatus = 8 - // The activity has been paused while in the SCHEDULED state. No worker will be dispatched until - // the activity is unpaused. The activity's pause_state field is populated with the identity, - // reason, and time of the pause request. - // - // Note: pausing a STARTED activity does not transition to this status. Instead, the pause is - // delivered as a flag (pause_state is set, status stays STARTED) and the worker is notified - // via ActivityPaused=true on its next heartbeat. The external run state in that case is - // PAUSE_REQUESTED. If the worker fails and retries while the flag is set, the retry lands in - // SCHEDULED with pause_state still populated and the dispatch task is blocked until unpause. + // The activity is paused and no worker is currently executing the attempt. No dispatch task will + // be issued until the activity is unpaused. Reachable from SCHEDULED via an operator pause, and + // from PAUSE_REQUESTED when the worker yields due to failure with retries remaining, or due to + // timeout with retries remaining. ACTIVITY_EXECUTION_STATUS_PAUSED ActivityExecutionStatus = 9 + // An operator pause was received while the activity was STARTED. The worker is still executing + // under its existing task token. The worker will be notified via ActivityPaused=true on its next + // heartbeat response. When the worker yields due to failure with retries remaining, or due to + // timeout with retries remaining, the activity will transition to PAUSED. + ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED ActivityExecutionStatus = 10 ) // Enum value maps for ActivityExecutionStatus. var ( ActivityExecutionStatus_name = map[int32]string{ - 0: "ACTIVITY_EXECUTION_STATUS_UNSPECIFIED", - 1: "ACTIVITY_EXECUTION_STATUS_SCHEDULED", - 2: "ACTIVITY_EXECUTION_STATUS_STARTED", - 3: "ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED", - 4: "ACTIVITY_EXECUTION_STATUS_COMPLETED", - 5: "ACTIVITY_EXECUTION_STATUS_FAILED", - 6: "ACTIVITY_EXECUTION_STATUS_CANCELED", - 7: "ACTIVITY_EXECUTION_STATUS_TERMINATED", - 8: "ACTIVITY_EXECUTION_STATUS_TIMED_OUT", - 9: "ACTIVITY_EXECUTION_STATUS_PAUSED", + 0: "ACTIVITY_EXECUTION_STATUS_UNSPECIFIED", + 1: "ACTIVITY_EXECUTION_STATUS_SCHEDULED", + 2: "ACTIVITY_EXECUTION_STATUS_STARTED", + 3: "ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED", + 4: "ACTIVITY_EXECUTION_STATUS_COMPLETED", + 5: "ACTIVITY_EXECUTION_STATUS_FAILED", + 6: "ACTIVITY_EXECUTION_STATUS_CANCELED", + 7: "ACTIVITY_EXECUTION_STATUS_TERMINATED", + 8: "ACTIVITY_EXECUTION_STATUS_TIMED_OUT", + 9: "ACTIVITY_EXECUTION_STATUS_PAUSED", + 10: "ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED", } ActivityExecutionStatus_value = map[string]int32{ "ACTIVITY_EXECUTION_STATUS_UNSPECIFIED": 0, @@ -98,6 +99,7 @@ var ( "ACTIVITY_EXECUTION_STATUS_TERMINATED": 7, "ACTIVITY_EXECUTION_STATUS_TIMED_OUT": 8, "ACTIVITY_EXECUTION_STATUS_PAUSED": 9, + "ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED": 10, } ) @@ -131,6 +133,8 @@ func (x ActivityExecutionStatus) String() string { // Deprecated: Use ActivityExecutionStatus.Descriptor instead. case ACTIVITY_EXECUTION_STATUS_PAUSED: return "Paused" + case ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED: + return "PauseRequested" default: return strconv.Itoa(int(x)) } @@ -1114,7 +1118,7 @@ const file_temporal_server_chasm_lib_activity_proto_v1_activity_state_proto_rawD "\x06output\x18\x01 \x01(\v2 .temporal.api.common.v1.PayloadsR\x06output\x1aD\n" + "\x06Failed\x12:\n" + "\afailure\x18\x01 \x01(\v2 .temporal.api.failure.v1.FailureR\afailureB\t\n" + - "\avariant*\xb4\x03\n" + + "\avariant*\xe3\x03\n" + "\x17ActivityExecutionStatus\x12)\n" + "%ACTIVITY_EXECUTION_STATUS_UNSPECIFIED\x10\x00\x12'\n" + "#ACTIVITY_EXECUTION_STATUS_SCHEDULED\x10\x01\x12%\n" + @@ -1125,7 +1129,9 @@ const file_temporal_server_chasm_lib_activity_proto_v1_activity_state_proto_rawD "\"ACTIVITY_EXECUTION_STATUS_CANCELED\x10\x06\x12(\n" + "$ACTIVITY_EXECUTION_STATUS_TERMINATED\x10\a\x12'\n" + "#ACTIVITY_EXECUTION_STATUS_TIMED_OUT\x10\b\x12$\n" + - " ACTIVITY_EXECUTION_STATUS_PAUSED\x10\tBDZBgo.temporal.io/server/chasm/lib/activity/gen/activitypb;activitypbb\x06proto3" + " ACTIVITY_EXECUTION_STATUS_PAUSED\x10\t\x12-\n" + + ")ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED\x10\n" + + "BDZBgo.temporal.io/server/chasm/lib/activity/gen/activitypb;activitypbb\x06proto3" var ( file_temporal_server_chasm_lib_activity_proto_v1_activity_state_proto_rawDescOnce sync.Once diff --git a/chasm/lib/activity/proto/v1/activity_state.proto b/chasm/lib/activity/proto/v1/activity_state.proto index 40a3401b04..acf05d6978 100644 --- a/chasm/lib/activity/proto/v1/activity_state.proto +++ b/chasm/lib/activity/proto/v1/activity_state.proto @@ -41,16 +41,17 @@ enum ActivityExecutionStatus { // Additionally, after all retries are exhausted for start-to-close or heartbeat timeouts, the activity will also // transition to timed out status. ACTIVITY_EXECUTION_STATUS_TIMED_OUT = 8; - // The activity has been paused while in the SCHEDULED state. No worker will be dispatched until - // the activity is unpaused. The activity's pause_state field is populated with the identity, - // reason, and time of the pause request. - // - // Note: pausing a STARTED activity does not transition to this status. Instead, the pause is - // delivered as a flag (pause_state is set, status stays STARTED) and the worker is notified - // via ActivityPaused=true on its next heartbeat. The external run state in that case is - // PAUSE_REQUESTED. If the worker fails and retries while the flag is set, the retry lands in - // SCHEDULED with pause_state still populated and the dispatch task is blocked until unpause. + // The activity is paused and no worker is currently executing the attempt. No dispatch task will + // be issued until the activity is unpaused. Reachable from SCHEDULED via an operator pause, and + // from PAUSE_REQUESTED when the worker yields due to failure with retries remaining, or due to + // timeout with retries remaining. ACTIVITY_EXECUTION_STATUS_PAUSED = 9; + + // An operator pause was received while the activity was STARTED. The worker is still executing + // under its existing task token. The worker will be notified via ActivityPaused=true on its next + // heartbeat response. When the worker yields due to failure with retries remaining, or due to + // timeout with retries remaining, the activity will transition to PAUSED. + ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED = 10; } message ActivityState { diff --git a/chasm/lib/activity/statemachine.go b/chasm/lib/activity/statemachine.go index 675e4408f8..d883fe89bb 100644 --- a/chasm/lib/activity/statemachine.go +++ b/chasm/lib/activity/statemachine.go @@ -100,27 +100,11 @@ var TransitionRescheduled = chasm.NewTransition( }, activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, func(a *Activity, ctx chasm.MutableContext, event rescheduleEvent) error { - attempt := a.LastAttempt.Get(ctx) - currentTime := ctx.Now(a) - - // Apply deferred reset: set Count to 0 so the increment below produces 1. - if a.ActivityReset { - attempt.Count = 0 - a.ActivityReset = false - if a.ResetHeartbeats { - a.ResetHeartbeats = false - a.clearHeartbeat(ctx) - } - } - - attempt.Count++ - attempt.Stamp++ - - err := a.recordFailedAttempt(ctx, event.retryInterval, event.failure, currentTime, false) - if err != nil { + if err := a.applyFailedAttempt(ctx, event); err != nil { return err } + attempt := a.LastAttempt.Get(ctx) retryScheduledTime := attemptScheduleTimeForRetry(attempt).AsTime() if timeout := a.GetScheduleToStartTimeout().AsDuration(); timeout > 0 { @@ -199,11 +183,11 @@ var TransitionCompleted = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED, func(a *Activity, ctx chasm.MutableContext, event completeEvent) error { return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { - a.PauseState = nil a.ActivityReset = false a.ResetHeartbeats = false @@ -236,12 +220,12 @@ var TransitionFailed = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_FAILED, func(a *Activity, ctx chasm.MutableContext, event failedEvent) error { return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { req := event.req.GetFailedRequest() - a.PauseState = nil a.ActivityReset = false a.ResetHeartbeats = false @@ -277,6 +261,7 @@ var TransitionTerminated = chasm.NewTransition( activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED, func(a *Activity, ctx chasm.MutableContext, event terminateEvent) error { @@ -284,7 +269,6 @@ var TransitionTerminated = chasm.NewTransition( a.TerminateState = &activitypb.ActivityTerminateState{ RequestId: event.request.RequestID, } - a.PauseState = nil a.ActivityReset = false a.ResetHeartbeats = false outcome := a.Outcome.Get(ctx) @@ -310,14 +294,13 @@ var TransitionTerminated = chasm.NewTransition( ) // TransitionCancelRequested transitions to CancelRequested status. -// PAUSED activities (real status, no worker) are cancelled immediately in handleCancellationRequested -// rather than waiting for a worker response. var TransitionCancelRequested = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, func(a *Activity, ctx chasm.MutableContext, req *workflowservice.RequestCancelActivityExecutionRequest) error { @@ -361,7 +344,6 @@ var TransitionCanceled = chasm.NewTransition( Failure: failure, }, } - a.PauseState = nil a.ActivityReset = false a.ResetHeartbeats = false @@ -385,6 +367,7 @@ var TransitionTimedOut = chasm.NewTransition( activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, }, activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, func(a *Activity, ctx chasm.MutableContext, event timeoutEvent) error { @@ -409,7 +392,6 @@ var TransitionTimedOut = chasm.NewTransition( return err } - a.PauseState = nil a.ActivityReset = false a.ResetHeartbeats = false @@ -427,29 +409,41 @@ type pauseEvent struct { // TransitionPaused transitions a SCHEDULED activity to PAUSED status. The stamp is bumped to // invalidate any pending dispatch task so the activity is not dispatched while paused. -// -// Note: STARTED activities are NOT paused via this transition. Pausing a STARTED activity is a -// flag-only operation (PauseState is set, status stays STARTED) so the worker's token remains -// valid and the worker is notified via ActivityPaused=true on its next heartbeat. See -// handlePauseRequested for the full hybrid logic. var TransitionPaused = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, }, activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, func(a *Activity, ctx chasm.MutableContext, event pauseEvent) error { - a.pause(ctx, event) + a.recordPauseState(ctx, event) attempt := a.LastAttempt.Get(ctx) attempt.Stamp++ return nil }, ) +// TransitionPauseRequested transitions a STARTED activity to PAUSE_REQUESTED. The worker is still +// in charge of the activity. It will be notified via ActivityPaused=true on its next heartbeat +// response, its task token is not invalidated by this transition, and there is no stamp bump since +// StartToCloseTimeoutTask and HeartbeatTimeoutTask must stay valid. +var TransitionPauseRequested = chasm.NewTransition( + []activitypb.ActivityExecutionStatus{ + activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + }, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, + func(a *Activity, ctx chasm.MutableContext, event pauseEvent) error { + a.recordPauseState(ctx, event) + return nil + }, +) + type unpauseEvent struct { req *workflowservice.UnpauseActivityExecutionRequest metricsHandler metrics.Handler } +// TransitionUnpaused transitions PAUSED → SCHEDULED, triggering a dispatch task that will lead to +// another activity attempt. var TransitionUnpaused = chasm.NewTransition( []activitypb.ActivityExecutionStatus{ activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, @@ -461,6 +455,32 @@ var TransitionUnpaused = chasm.NewTransition( }, ) +// TransitionUnpausedWhilePauseRequested transitions PAUSE_REQUESTED → STARTED. The worker is still in charge +// of the activity. Its task token is not invalidated by this transition, and there is no stamp bump +// since StartToCloseTimeoutTask and HeartbeatTimeoutTask must stay valid. +var TransitionUnpausedWhilePauseRequested = chasm.NewTransition( + []activitypb.ActivityExecutionStatus{ + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, + }, + activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + func(a *Activity, ctx chasm.MutableContext, event unpauseEvent) error { + return nil + }, +) + +// TransitionAttemptFailedWhilePauseRequested transitions PAUSE_REQUESTED → PAUSED. It is performed instead of +// TransitionReschedule, when the activity is in PAUSE_REQUESTED and the worker yields (failure or +// timeout) with retries remaining. The failed attempt is recorded and Count is incremented. +var TransitionAttemptFailedWhilePauseRequested = chasm.NewTransition( + []activitypb.ActivityExecutionStatus{ + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED, + }, + activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED, + func(a *Activity, ctx chasm.MutableContext, event rescheduleEvent) error { + return a.applyFailedAttempt(ctx, event) + }, +) + type resetEvent struct { req *workflowservice.ResetActivityExecutionRequest scheduleTime time.Time diff --git a/tests/activity_api_reset_test.go b/tests/activity_api_reset_test.go index 8d3534aaa7..ee1fba2026 100644 --- a/tests/activity_api_reset_test.go +++ b/tests/activity_api_reset_test.go @@ -146,7 +146,7 @@ func (s *ActivityApiResetClientTestSuite) TestActivityResetApi_AfterRetry() { return "", activityErr } - s.WaitForChannel(ctx, activityCompleteCh) + s.WaitForChannel(ctx, activityCompleteCh) //nolint:staticcheck return "done!", nil } @@ -205,7 +205,7 @@ func (s *ActivityApiResetClientTestSuite) TestActivityResetApi_WhileRunning() { var startedActivityCount atomic.Int32 activityFunction := func() (string, error) { startedActivityCount.Add(1) - s.WaitForChannel(ctx, activityCompleteCh) + s.WaitForChannel(ctx, activityCompleteCh) //nolint:staticcheck return "done!", nil } @@ -279,7 +279,7 @@ func (s *ActivityApiResetClientTestSuite) TestActivityResetApi_InRetry() { return "", activityErr } - s.WaitForChannel(ctx, activityCompleteCh) + s.WaitForChannel(ctx, activityCompleteCh) //nolint:staticcheck return "done!", nil } @@ -351,7 +351,7 @@ func (s *ActivityApiResetClientTestSuite) TestActivityResetApi_KeepPaused() { return "", activityErr } - s.WaitForChannel(ctx, activityCompleteCh) + s.WaitForChannel(ctx, activityCompleteCh) //nolint:staticcheck return "done!", nil } @@ -472,7 +472,7 @@ func (s *ActivityApiResetClientTestSuite) TestActivityReset_HeartbeatDetails() { return "", errors.New("bad-luck-please-retry") } // not the first iteration - s.WaitForChannel(ctx, activityCompleteCh) + s.WaitForChannel(ctx, activityCompleteCh) //nolint:staticcheck for activityShouldFinish.Load() == false { activity.RecordHeartbeat(ctx, "second") time.Sleep(time.Second) //nolint:forbidigo @@ -582,7 +582,7 @@ func (s *ActivityApiResetClientTestSuite) TestActivityResetApi_WhilePaused() { if !activityWasReset.Load() { return "", errors.New("bad-luck-please-retry") } - s.WaitForChannel(ctx, activityCompleteCh) + s.WaitForChannel(ctx, activityCompleteCh) //nolint:staticcheck return "done!", nil } @@ -661,7 +661,7 @@ func (s *ActivityApiResetClientTestSuite) TestActivityResetApi_TerminateWhileDef activityFunction := func() (string, error) { startedActivityCount.Add(1) - s.WaitForChannel(ctx, activityBlockCh) + s.WaitForChannel(ctx, activityBlockCh) //nolint:staticcheck return "done!", nil } diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index 288e31153d..bc126f7eb4 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -7825,6 +7825,281 @@ func (s *standaloneActivityTestSuite) TestPauseActivityExecution() { require.EqualValues(t, 1, descResp.GetInfo().GetAttempt(), "non-retryable fail must not increment attempt") }) + // StartToCloseTimeoutWhilePauseRequested: a STARTED activity with a pending pause must still + // observe its StartToClose timeout. The worker stops responding, the timer fires, the retry + // path consumes the pause-request and lands the activity in PAUSED with the attempt count + // incremented. + t.Run("StartToCloseTimeoutWhilePauseRequested", func(t *testing.T) { + ctx, cancel := context.WithTimeout(testcore.NewContext(), 30*time.Second) + defer cancel() + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + + _, err := env.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(1 * time.Second), + RequestId: env.Tv().RequestID(), + RetryPolicy: &commonpb.RetryPolicy{ + MaximumAttempts: 10, + InitialInterval: durationpb.New(1 * time.Millisecond), + BackoffCoefficient: 1.0, + }, + }) + require.NoError(t, err) + + // Worker polls → activity is STARTED. + _, err = env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: env.Tv().WorkerIdentity(), + }) + require.NoError(t, err) + + // Pause while STARTED → PAUSE_REQUESTED. + _, err = env.FrontendClient().PauseActivityExecution(ctx, &workflowservice.PauseActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + Identity: "test-identity", + Reason: "test-pause", + }) + require.NoError(t, err) + + // Worker stops responding. StartToCloseTimeout must fire and the pause-request must be + // consumed by a retry, landing the activity in PAUSED at attempt 2. + require.EventuallyWithT(t, func(c *assert.CollectT) { + dr, dErr := env.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + }) + require.NoError(c, dErr) + require.Equal(c, enumspb.PENDING_ACTIVITY_STATE_PAUSED, dr.GetInfo().GetRunState()) + require.EqualValues(c, 2, dr.GetInfo().GetAttempt()) + }, 10*time.Second, 200*time.Millisecond) + }) + + // HeartbeatTimeoutWhilePauseRequested: as StartToCloseTimeoutWhilePauseRequested but for the + // heartbeat timer. + t.Run("HeartbeatTimeoutWhilePauseRequested", func(t *testing.T) { + ctx, cancel := context.WithTimeout(testcore.NewContext(), 30*time.Second) + defer cancel() + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + + _, err := env.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + HeartbeatTimeout: durationpb.New(1 * time.Second), + ScheduleToCloseTimeout: durationpb.New(5 * time.Minute), + RequestId: env.Tv().RequestID(), + RetryPolicy: &commonpb.RetryPolicy{ + MaximumAttempts: 10, + InitialInterval: durationpb.New(1 * time.Millisecond), + BackoffCoefficient: 1.0, + }, + }) + require.NoError(t, err) + + _, err = env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: env.Tv().WorkerIdentity(), + }) + require.NoError(t, err) + + _, err = env.FrontendClient().PauseActivityExecution(ctx, &workflowservice.PauseActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + Identity: "test-identity", + Reason: "test-pause", + }) + require.NoError(t, err) + + // No heartbeat → HeartbeatTimeoutTask fires → retry consumes pause-request → PAUSED at attempt 2. + require.EventuallyWithT(t, func(c *assert.CollectT) { + dr, dErr := env.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + }) + require.NoError(c, dErr) + require.Equal(c, enumspb.PENDING_ACTIVITY_STATE_PAUSED, dr.GetInfo().GetRunState()) + require.EqualValues(c, 2, dr.GetInfo().GetAttempt()) + }, 10*time.Second, 200*time.Millisecond) + }) + + // UpdateOptionsPreservesTimeoutsWhilePauseRequested: UpdateActivityExecutionOptions bumps the + // attempt stamp, which invalidates all attempt-scoped timeout tasks. The handler must re-emit + // fresh StartToClose and Heartbeat timeout tasks for PAUSE_REQUESTED activities, otherwise the + // running worker is left with no server-side timeout enforcement (only the long + // ScheduleToCloseTimeout would eventually catch a hung worker). + t.Run("UpdateOptionsPreservesTimeoutsWhilePauseRequested", func(t *testing.T) { + ctx, cancel := context.WithTimeout(testcore.NewContext(), 30*time.Second) + defer cancel() + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + + startResp, err := env.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(1 * time.Minute), + ScheduleToCloseTimeout: durationpb.New(5 * time.Minute), + RequestId: env.Tv().RequestID(), + RetryPolicy: &commonpb.RetryPolicy{ + MaximumAttempts: 10, + InitialInterval: durationpb.New(1 * time.Millisecond), + BackoffCoefficient: 1.0, + }, + }) + require.NoError(t, err) + + _, err = env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: env.Tv().WorkerIdentity(), + }) + require.NoError(t, err) + + // Pause while STARTED → PAUSE_REQUESTED. + _, err = env.FrontendClient().PauseActivityExecution(ctx, &workflowservice.PauseActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + Identity: "test-identity", + Reason: "test-pause", + }) + require.NoError(t, err) + + // Update StartToCloseTimeout to 1s. The handler bumps the attempt stamp (invalidating the + // old 1-minute timeout task) and must re-emit a fresh 1-second timeout task that fires + // while the activity is PAUSE_REQUESTED. + _, err = env.FrontendClient().UpdateActivityExecutionOptions(ctx, &workflowservice.UpdateActivityExecutionOptionsRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.RunId, + ActivityOptions: &activitypb.ActivityOptions{ + StartToCloseTimeout: durationpb.New(1 * time.Second), + }, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"start_to_close_timeout"}}, + }) + require.NoError(t, err) + + // New StartToCloseTimeout fires → retry consumes pause-request → PAUSED at attempt 2. + require.EventuallyWithT(t, func(c *assert.CollectT) { + dr, dErr := env.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + }) + require.NoError(c, dErr) + require.Equal(c, enumspb.PENDING_ACTIVITY_STATE_PAUSED, dr.GetInfo().GetRunState()) + require.EqualValues(c, 2, dr.GetInfo().GetAttempt()) + }, 10*time.Second, 200*time.Millisecond) + }) + + // ResetKeepPausedWhilePauseRequested: a reset with KeepPaused=true on a PAUSE_REQUESTED + // activity must defer the reset (via ActivityReset) and have it consumed by the next retry, + // landing the activity in PAUSED at attempt 1. Pins the contract between handleReset and + // AttemptFailedWhilePauseRequested. + t.Run("ResetKeepPausedWhilePauseRequested", func(t *testing.T) { + ctx, cancel := context.WithTimeout(testcore.NewContext(), 30*time.Second) + defer cancel() + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + + startResp, err := env.FrontendClient().StartActivityExecution(ctx, &workflowservice.StartActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + ActivityType: env.Tv().ActivityType(), + Identity: env.Tv().WorkerIdentity(), + Input: defaultInput, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}, + StartToCloseTimeout: durationpb.New(1 * time.Second), + RequestId: env.Tv().RequestID(), + RetryPolicy: &commonpb.RetryPolicy{ + MaximumAttempts: 10, + InitialInterval: durationpb.New(1 * time.Millisecond), + BackoffCoefficient: 1.0, + }, + }) + require.NoError(t, err) + + // Drive the activity to attempt 2 by polling then failing attempt 1. + pollResp, err := env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: env.Tv().WorkerIdentity(), + }) + require.NoError(t, err) + _, err = env.FrontendClient().RespondActivityTaskFailed(ctx, &workflowservice.RespondActivityTaskFailedRequest{ + Namespace: env.Namespace().String(), + TaskToken: pollResp.GetTaskToken(), + Failure: &failurepb.Failure{ + Message: "fail attempt 1", + FailureInfo: &failurepb.Failure_ApplicationFailureInfo{ApplicationFailureInfo: &failurepb.ApplicationFailureInfo{ + Type: "TestFailure", + }}, + }, + Identity: env.Tv().WorkerIdentity(), + }) + require.NoError(t, err) + + // Worker polls attempt 2 → STARTED at attempt 2. + require.EventuallyWithT(t, func(c *assert.CollectT) { + dr, dErr := env.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + }) + require.NoError(c, dErr) + require.EqualValues(c, 2, dr.GetInfo().GetAttempt()) + }, 5*time.Second, 100*time.Millisecond) + _, err = env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Identity: env.Tv().WorkerIdentity(), + }) + require.NoError(t, err) + + // Pause while STARTED at attempt 2 → PAUSE_REQUESTED. + _, err = env.FrontendClient().PauseActivityExecution(ctx, &workflowservice.PauseActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + Identity: "test-identity", + Reason: "test-pause", + }) + require.NoError(t, err) + + // Reset with KeepPaused=true. Reset is deferred via ActivityReset; status stays PAUSE_REQUESTED. + _, err = env.FrontendClient().ResetActivityExecution(ctx, &workflowservice.ResetActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + RunId: startResp.GetRunId(), + KeepPaused: true, + }) + require.NoError(t, err) + + // Worker stops responding. StartToCloseTimeout fires → AttemptFailedWhilePauseRequested + // consumes ActivityReset → PAUSED at attempt 1. + require.EventuallyWithT(t, func(c *assert.CollectT) { + dr, dErr := env.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ + Namespace: env.Namespace().String(), + ActivityId: activityID, + }) + require.NoError(c, dErr) + require.Equal(c, enumspb.PENDING_ACTIVITY_STATE_PAUSED, dr.GetInfo().GetRunState()) + require.EqualValues(c, 1, dr.GetInfo().GetAttempt()) + }, 10*time.Second, 200*time.Millisecond) + }) + // PauseRequestValidation: validate that Pause rejects invalid request fields. t.Run("PauseRequestValidation", func(t *testing.T) { ctx := testcore.NewContext() @@ -8409,11 +8684,7 @@ func (s *standaloneActivityTestSuite) TestUnpauseActivityExecution() { require.Nil(t, poll2Resp.GetHeartbeatDetails(), "expected nil heartbeat details in poll after reset") }) - // UnpauseWhileCancelRequested: CANCEL_REQUESTED+PauseState is reached by pausing while STARTED - // (flag-only, PauseState set) and then cancelling (status → CANCEL_REQUESTED, PauseState stays). - // Pausing directly on a CANCEL_REQUESTED activity is now rejected (FailedPrecondition), so this - // is the only valid path into this state. Unpause must be a no-op — cancel takes precedence and - // the activity must not be re-dispatched. + // Issuing Unpause on a CANCEL_REQUESTED activity is a no-op. t.Run("UnpauseWhileCancelRequested", func(t *testing.T) { ctx := testcore.NewContext() activityID := testcore.RandomizeStr(t.Name()) @@ -8425,7 +8696,7 @@ func (s *standaloneActivityTestSuite) TestUnpauseActivityExecution() { // Poll → STARTED. pollResp := env.pollActivityTaskAndValidate(ctx, t, activityID, taskQueue, runID) - // Pause while STARTED → flag-only (PauseState set, status stays STARTED). + // Pause while STARTED → PAUSE_REQUESTED. _, err := env.FrontendClient().PauseActivityExecution(ctx, &workflowservice.PauseActivityExecutionRequest{ Namespace: env.Namespace().String(), ActivityId: activityID, @@ -8435,7 +8706,7 @@ func (s *standaloneActivityTestSuite) TestUnpauseActivityExecution() { }) require.NoError(t, err) - // Cancel → CANCEL_REQUESTED. PauseState remains set from the prior pause. + // Cancel → CANCEL_REQUESTED. _, err = env.FrontendClient().RequestCancelActivityExecution(ctx, &workflowservice.RequestCancelActivityExecutionRequest{ Namespace: env.Namespace().String(), ActivityId: activityID, @@ -8446,16 +8717,16 @@ func (s *standaloneActivityTestSuite) TestUnpauseActivityExecution() { }) require.NoError(t, err) - // Confirm both flags are set via heartbeat. + // Heartbeat shows cancel-requested; pause is no longer surfaced. hbResp, err := env.FrontendClient().RecordActivityTaskHeartbeat(ctx, &workflowservice.RecordActivityTaskHeartbeatRequest{ Namespace: env.Namespace().String(), TaskToken: pollResp.TaskToken, }) require.NoError(t, err) require.True(t, hbResp.GetCancelRequested()) - require.True(t, hbResp.GetActivityPaused()) + require.False(t, hbResp.GetActivityPaused()) - // Unpause — must be a no-op: status stays CANCEL_REQUESTED, no new dispatch task. + // Unpause is a no-op: status stays CANCEL_REQUESTED. _, err = env.FrontendClient().UnpauseActivityExecution(ctx, &workflowservice.UnpauseActivityExecutionRequest{ Namespace: env.Namespace().String(), ActivityId: activityID, @@ -8464,7 +8735,6 @@ func (s *standaloneActivityTestSuite) TestUnpauseActivityExecution() { }) require.NoError(t, err) - // RunState must still be CANCEL_REQUESTED after the unpause. descResp, err := env.FrontendClient().DescribeActivityExecution(ctx, &workflowservice.DescribeActivityExecutionRequest{ Namespace: env.Namespace().String(), ActivityId: activityID, @@ -8474,14 +8744,13 @@ func (s *standaloneActivityTestSuite) TestUnpauseActivityExecution() { require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED, descResp.GetInfo().GetRunState(), "unpause of a CANCEL_REQUESTED activity must be a no-op") - // Heartbeat must show CancelRequested=true, ActivityPaused=false after the unpause. hbResp2, err := env.FrontendClient().RecordActivityTaskHeartbeat(ctx, &workflowservice.RecordActivityTaskHeartbeatRequest{ Namespace: env.Namespace().String(), TaskToken: pollResp.TaskToken, }) require.NoError(t, err) require.True(t, hbResp2.GetCancelRequested(), "cancel must remain after unpause") - require.False(t, hbResp2.GetActivityPaused(), "pause flag must be cleared after unpause") + require.False(t, hbResp2.GetActivityPaused(), "pause flag must remain cleared") }) // UnpauseRequestValidation: validate that Unpause rejects invalid request fields.