Skip to content
Closed
129 changes: 74 additions & 55 deletions distarray/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
# Copyright (C) 2008-2014, IPython Development Team and Enthought, Inc.
# Distributed under the terms of the BSD License. See COPYING.rst.
# ---------------------------------------------------------------------------

"""
`Context` objects contain the information required for distarrays to
communicate with localarrays.
`Context` objects contain the information required for `DistArray`s to
communicate with `LocalArray`s.
"""


Expand All @@ -26,6 +27,7 @@


class Context(object):

"""
Context objects manage the setup and communication of the worker processes
for DistArray objects. A DistArray object has a context, and contexts have
Expand All @@ -35,7 +37,6 @@ class Context(object):
Typically there is just one context object that uses all processes,
although it is possible to have more than one context with a different
selection of engines.

"""

_CLEANUP = None
Expand Down Expand Up @@ -199,14 +200,10 @@ def _push0(self, d):
def _pull0(self, k):
return self.view.pull(k, targets=self.targets[0], block=True)

def _create_local(self, local_call, shape, dist, grid_shape, dtype):
def _create_local(self, local_call, distribution, dtype):
"""Creates LocalArrays with the method named in `local_call`."""
da_key = self._generate_key()
comm_name = self._comm_key
distribution = Distribution.from_shape(context=self,
shape=shape,
dist=dist,
grid_shape=grid_shape)
ddpr = distribution.get_dim_data_per_rank()
ddpr_name, dtype_name = self._key_and_push(ddpr, dtype)
cmd = ('{da_key} = {local_call}(distarray.local.maps.Distribution('
Expand All @@ -216,26 +213,53 @@ def _create_local(self, local_call, shape, dist, grid_shape, dtype):
return DistArray.from_localarrays(da_key, distribution=distribution,
dtype=dtype)

def zeros(self, shape, dtype=float, dist=None, grid_shape=None):
if dist is None:
dist = {0: 'b'}
return self._create_local(local_call='distarray.local.zeros',
shape=shape, dist=dist,
grid_shape=grid_shape, dtype=dtype)
def empty(self, distribution, dtype=float):
"""Create an empty Distarray.

def ones(self, shape, dtype=float, dist=None, grid_shape=None):
if dist is None:
dist = {0: 'b'}
return self._create_local(local_call='distarray.local.ones',
shape=shape, dist=dist,
grid_shape=grid_shape, dtype=dtype,)
Parameters
----------
distribution : Distribution object
dtype : NumPy dtype, optional (default float)

def empty(self, shape, dtype=float, dist=None, grid_shape=None):
if dist is None:
dist = {0: 'b'}
Returns
-------
DistArray
A DistArray distributed as specified, with uninitialized values.
"""
return self._create_local(local_call='distarray.local.empty',
shape=shape, dist=dist,
grid_shape=grid_shape, dtype=dtype)
distribution=distribution, dtype=dtype)

def zeros(self, distribution, dtype=float):
    """Return a new DistArray whose elements are all zero.

    Parameters
    ----------
    distribution : Distribution object
        Passed through unchanged to `_create_local`.
    dtype : NumPy dtype, optional (default float)

    Returns
    -------
    DistArray
        Whatever `_create_local` builds when asked to use the
        'distarray.local.zeros' constructor.
    """
    creation_args = dict(local_call='distarray.local.zeros',
                         distribution=distribution,
                         dtype=dtype)
    return self._create_local(**creation_args)

def ones(self, distribution, dtype=float):
    """Return a new DistArray whose elements are all one.

    Parameters
    ----------
    distribution : Distribution object
        Passed through unchanged to `_create_local`.
    dtype : NumPy dtype, optional (default float)

    Returns
    -------
    DistArray
        Whatever `_create_local` builds when asked to use the
        'distarray.local.ones' constructor.
    """
    creation_args = dict(local_call='distarray.local.ones',
                         distribution=distribution,
                         dtype=dtype)
    return self._create_local(**creation_args)

def save_dnpy(self, name, da):
"""
Expand Down Expand Up @@ -386,51 +410,41 @@ def save_hdf5(self, filename, da, key='buffer', mode='a'):
'distarray.local.save_hdf5(%s, %s, %s, %s)' % subs
)

def load_npy(self, filename, dim_data_per_rank):
def load_npy(self, filename, distribution):
"""
Load a DistArray from a dataset in a ``.npy`` file.

Parameters
----------
filename : str
Filename to load.
dim_data_per_rank : sequence of tuples of dict
A "dim_data" data structure for every rank. Described here:
https://github.com/enthought/distributed-array-protocol
distribution : Distribution object

Returns
-------
result : DistArray
A DistArray encapsulating the file loaded.

"""
if len(self.targets) != len(dim_data_per_rank):
errmsg = "`dim_data_per_rank` must contain a dim_data for every rank."
raise TypeError(errmsg)

da_key = self._generate_key()
subs = ((da_key,) + self._key_and_push(filename, dim_data_per_rank) +
ddpr = distribution.get_dim_data_per_rank()
subs = ((da_key,) + self._key_and_push(filename, ddpr) +
(self._comm_key,) + (self._comm_key,))

self._execute(
'%s = distarray.local.load_npy(%s, %s[%s.Get_rank()], %s)' % subs
)

distribution = Distribution.from_dim_data_per_rank(self,
dim_data_per_rank)
return DistArray.from_localarrays(da_key, distribution=distribution)

def load_hdf5(self, filename, dim_data_per_rank, key='buffer'):
def load_hdf5(self, filename, distribution, key='buffer'):
"""
Load a DistArray from a dataset in an ``.hdf5`` file.

Parameters
----------
filename : str
Filename to load.
dim_data_per_rank : sequence of tuples of dict
A "dim_data" data structure for every rank. Described here:
https://github.com/enthought/distributed-array-protocol
distribution : Distribution object
key : str, optional
The identifier for the group to load the DistArray from (the
default is 'buffer').
Expand All @@ -447,29 +461,34 @@ def load_hdf5(self, filename, dim_data_per_rank, key='buffer'):
errmsg = "An MPI-enabled h5py must be available to use load_hdf5."
raise ImportError(errmsg)

if len(self.targets) != len(dim_data_per_rank):
errmsg = "`dim_data_per_rank` must contain a dim_data for every rank."
raise TypeError(errmsg)

da_key = self._generate_key()
subs = ((da_key,) + self._key_and_push(filename, dim_data_per_rank) +
ddpr = distribution.get_dim_data_per_rank()
subs = ((da_key,) + self._key_and_push(filename, ddpr) +
(self._comm_key,) + self._key_and_push(key) + (self._comm_key,))

self._execute(
'%s = distarray.local.load_hdf5(%s, %s[%s.Get_rank()], %s, %s)' % subs
)
return DistArray.from_localarrays(da_key, distribution=distribution)

distribution = Distribution.from_dim_data_per_rank(self,
dim_data_per_rank)
def fromndarray(self, arr, distribution=None):
"""Create a DistArray from an ndarray.

return DistArray.from_localarrays(da_key, distribution=distribution)
Parameters
----------
arr : numpy.ndarray
The array whose values and dtype are copied into the new DistArray.
distribution : Distribution object, optional
If a Distribution object is not provided, one is created with
`Distribution.from_shape(self, arr.shape)`.

def fromndarray(self, arr, dist=None, grid_shape=None):
"""Convert an ndarray to a distarray."""
if dist is None:
dist = {0: 'b'}
out = self.empty(arr.shape, dtype=arr.dtype, dist=dist,
grid_shape=grid_shape)
Returns
-------
DistArray
A DistArray distributed as specified, using the values and dtype
from `arr`.
"""
if distribution is None:
distribution = Distribution.from_shape(self, arr.shape)
out = self.empty(distribution, dtype=arr.dtype)
for index, value in numpy.ndenumerate(arr):
out[index] = value
return out
Expand Down
5 changes: 2 additions & 3 deletions distarray/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import functools

from distarray.client import DistArray
from distarray.client_map import Distribution
from distarray.context import Context
from distarray.error import ContextError
from distarray.utils import has_exactly_one
Expand Down Expand Up @@ -195,9 +196,7 @@ def __call__(self, *args, **kwargs):
for arg in args:
if isinstance(arg, DistArray):
# Create the output distarray.
out = context.empty(arg.shape, dtype=arg.dtype,
dist=arg.dist,
grid_shape=arg.grid_shape)
out = context.empty(arg.distribution, dtype=arg.dtype)
# parse args
args_str, kwargs_str = self.key_and_push_args(
args, kwargs, context=context,
Expand Down
50 changes: 33 additions & 17 deletions distarray/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ def tearDown(self):

def test_set_and_getitem_block_dist(self):
size = 10
dap = self.dac.empty((size,), dist={0: 'b'})
distribution = Distribution.from_shape(self.dac, (size,),
dist={0: 'b'})
dap = self.dac.empty(distribution)

for val in range(size):
dap[val] = val
Expand All @@ -47,23 +49,26 @@ def test_set_and_getitem_block_dist(self):

def test_set_and_getitem_nd_block_dist(self):
size = 5
dap = self.dac.empty((size, size), dist={0: 'b', 1: 'b'})
distribution = Distribution.from_shape(self.dac, (size, size),
dist={0: 'b', 1: 'b'})
dap = self.dac.empty(distribution)

for row in range(size):
for col in range(size):
val = size*row + col
dap[row, col] = val
self.assertEqual(dap[row, col], val)

for row in range(1 ,size + 1):
for row in range(1, size + 1):
for col in range(1, size + 1):
dap[-row, -col] = row + col
self.assertEqual(dap[-row, -col], row + col)


def test_set_and_getitem_cyclic_dist(self):
size = 10
dap = self.dac.empty((size,), dist={0: 'c'})
distribution = Distribution.from_shape(self.dac, (size,),
dist={0: 'c'})
dap = self.dac.empty(distribution)

for val in range(size):
dap[val] = val
Expand All @@ -74,36 +79,43 @@ def test_set_and_getitem_cyclic_dist(self):
self.assertEqual(dap[-i], i)

def test_get_index_error(self):
dap = self.dac.empty((10,), dist={0: 'c'})
distribution = Distribution.from_shape(self.dac, (10,), dist={0: 'c'})
dap = self.dac.empty(distribution)
with self.assertRaises(IndexError):
dap[11]
with self.assertRaises(IndexError):
dap[-11]

def test_set_index_error(self):
dap = self.dac.empty((10,), dist={0: 'c'})
distribution = Distribution.from_shape(self.dac, (10,), dist={0: 'c'})
dap = self.dac.empty(distribution)
with self.assertRaises(IndexError):
dap[11] = 55
with self.assertRaises(IndexError):
dap[-11] = 55

def test_iteration(self):
size = 10
dap = self.dac.empty((size,), dist={0: 'c'})
distribution = Distribution.from_shape(self.dac, (size,),
dist={0: 'c'})
dap = self.dac.empty(distribution)
dap.fill(10)
for val in dap:
self.assertEqual(val, 10)

def test_tondarray(self):
dap = self.dac.empty((3, 3))
distribution = Distribution.from_shape(self.dac, (3, 3))
dap = self.dac.empty(distribution)
ndarr = numpy.arange(9).reshape(3, 3)
for (i, j), val in numpy.ndenumerate(ndarr):
dap[i, j] = ndarr[i, j]
numpy.testing.assert_array_equal(dap.tondarray(), ndarr)

def test_global_tolocal_bug(self):
# gh-issue #154
dap = self.dac.zeros((3, 3), dist=('n', 'b'))
distribution = Distribution.from_shape(self.dac, (3, 3),
dist=('n', 'b'))
dap = self.dac.zeros(distribution)
ndarr = numpy.zeros((3, 3))
numpy.testing.assert_array_equal(dap.tondarray(), ndarr)

Expand Down Expand Up @@ -327,20 +339,22 @@ def test___init__(self):

def test_zeros(self):
shape = (16, 16)
zero_distarray = self.context.zeros(shape)
distribution = Distribution.from_shape(self.context, shape)
zero_distarray = self.context.zeros(distribution)
zero_ndarray = numpy.zeros(shape)
assert_array_equal(zero_distarray.tondarray(), zero_ndarray)

def test_ones(self):
shape = (16, 16)
one_distarray = self.context.ones(shape)
distribution = Distribution.from_shape(self.context, shape)
one_distarray = self.context.ones(distribution)
one_ndarray = numpy.ones(shape)
assert_array_equal(one_distarray.tondarray(), one_ndarray)

def test_empty(self):
shape = (16, 16)
empty_distarray = self.context.empty(shape)
self.assertEqual(empty_distarray.shape, shape)
distribution = Distribution.from_shape(self.context, (16, 16))
empty_distarray = self.context.empty(distribution)
self.assertEqual(empty_distarray.shape, distribution.shape)

def test_fromndarray(self):
ndarr = numpy.arange(16).reshape(4, 4)
Expand All @@ -350,8 +364,10 @@ def test_fromndarray(self):

def test_grid_rank(self):
# regression test for issue #235
a = self.context.empty((4, 4, 4), dist=('b', 'n', 'b'),
grid_shape=(1, 1, 4))
d = Distribution.from_shape(self.context, (4, 4, 4),
dist=('b', 'n', 'b'),
grid_shape=(1, 1, 4))
a = self.context.empty(d)
self.assertEqual(a.grid_shape, (1, 1, 4))

def test_fromfunction(self):
Expand Down
Loading