Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions distarray/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,16 @@ def is_LocalArray(typestring):

return result

_DIMDATAS = """
{dim_data_name} = {local_name}.dim_data
_DIM_DATA_PER_RANK = """
{ddpr_name} = {local_name}.dim_data
"""

def _make_mdmap_from_local_dimdata(local_name, context):
def _make_distribution_from_dim_data_per_rank(local_name, context):
dim_data_name = context._generate_key()
context._execute(_DIMDATAS.format(local_name=local_name, dim_data_name=dim_data_name))
dim_datas = context._pull(dim_data_name)
return Distribution.from_dim_data(context, dim_datas)
context._execute(_DIM_DATA_PER_RANK.format(local_name=local_name,
ddpr_name=dim_data_name))
dim_data_per_rank = context._pull(dim_data_name)
return Distribution.from_dim_data_per_rank(context, dim_data_per_rank)

def _get_attribute(context, key, name):
local_key = context._generate_key()
Expand All @@ -88,23 +89,22 @@ class DistArray(object):

__array_priority__ = 20.0

def __init__(self, mdmap, dtype):
""" Creates a new empty distarray according to the multi-dimensional
map given.
"""
def __init__(self, distribution, dtype):
"""Creates an empty DistArray according to the `distribution` given."""
# FIXME: code duplication with context.py.
ctx = mdmap.context
ctx = distribution.context
# FIXME: this is bad...
comm_name = ctx._comm_key
# FIXME: and this is bad...
da_key = ctx._generate_key()
names = ctx._key_and_push(mdmap.shape, mdmap.dist, mdmap.grid_shape, dtype)
names = ctx._key_and_push(distribution.shape, distribution.dist,
distribution.grid_shape, dtype)
shape_name, dist_name, grid_shape_name, dtype_name = names
cmd = ('{da_key} = distarray.local.empty('
'distarray.local.maps.Distribution.from_shape({shape_name}, '
'{dist_name}, {grid_shape_name}, {comm_name}), {dtype_name})')
ctx._execute(cmd.format(**locals()))
self.mdmap = mdmap
self.distribution = distribution
self.key = da_key
self._dtype = dtype

Expand All @@ -117,7 +117,8 @@ def from_localarrays(cls, key, context):
"""
da = cls.__new__(cls)
da.key = key
da.mdmap = _make_mdmap_from_local_dimdata(key, context)
da.distribution = _make_distribution_from_dim_data_per_rank(key,
context)
da._dtype = _get_attribute(context, key, 'dtype')
return da

Expand All @@ -139,7 +140,7 @@ def __getitem__(self, index):
return self.__getitem__(tuple_index)

elif isinstance(index, tuple):
targets = self.mdmap.owning_targets(index)
targets = self.distribution.owning_targets(index)
result_key = self.context._generate_key()
fmt = '%s = %s.checked_getitem(%s)'
statement = fmt % (result_key, self.key, index)
Expand All @@ -164,7 +165,7 @@ def __setitem__(self, index, value):
return self.__setitem__(tuple_index, value)

elif isinstance(index, tuple):
targets = self.mdmap.owning_targets(index)
targets = self.distribution.owning_targets(index)
result_key = self.context._generate_key()
fmt = '%s = %s.checked_setitem(%s, %s)'
statement = fmt % (result_key, self.key, index, value)
Expand All @@ -178,23 +179,23 @@ def __setitem__(self, index, value):

@property
def context(self):
return self.mdmap.context
return self.distribution.context

@property
def shape(self):
return self.mdmap.shape
return self.distribution.shape

@property
def global_size(self):
return reduce(operator.mul, self.shape)

@property
def dist(self):
return self.mdmap.dist
return self.distribution.dist

@property
def grid_shape(self):
return self.mdmap.grid_shape
return self.distribution.grid_shape

@property
def ndim(self):
Expand Down
70 changes: 36 additions & 34 deletions distarray/client_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def _compactify_dicts(dicts):
# ---------------------------------------------------------------------------

def choose_map(dist_type):
"""Choose a map classe given one of the distribution types."""
"""Choose a map class given one of the distribution types."""
cls_from_dist_type = {
'b': BlockMap,
'c': BlockCyclicMap,
Expand All @@ -73,13 +73,13 @@ def choose_map(dist_type):
return cls_from_dist_type[dist_type]


def map_from_dim_datas(dim_datas):
""" Generates a ClientMap instance from a santized sequence of dim_data
def map_from_dim_data_per_rank(dim_data_per_rank):
""" Generates a ClientMap instance from a sanitized sequence of dim_data
dictionaries.

Parameters
----------
dim_datas : sequence of dictionaries
dim_data_per_rank : sequence of dictionaries
Each dictionary is a "dimension dictionary" from the distributed array
protocol, one per process in this dimension of the process grid. The
dimension dictionaries shall all have the same keys and values for
Expand All @@ -91,17 +91,18 @@ def map_from_dim_datas(dim_datas):
An instance of a subclass of MapBase.

"""
# check that all proccesses / ranks are accounted for.
proc_ranks = sorted(dd['proc_grid_rank'] for dd in dim_datas)
if proc_ranks != list(range(len(dim_datas))):
# check that all processes / ranks are accounted for.
proc_ranks = sorted(dd['proc_grid_rank'] for dd in dim_data_per_rank)
if proc_ranks != list(range(len(dim_data_per_rank))):
msg = "Ranks of processes (%r) not consistent."
raise ValueError(msg % proc_ranks)
# Sort dim_datas according to proc_grid_rank.
dim_datas = sorted(dim_datas, key=lambda d: d['proc_grid_rank'])
# Sort dim_data_per_rank according to proc_grid_rank.
dim_data_per_rank = sorted(dim_data_per_rank,
key=lambda d: d['proc_grid_rank'])

dist_type = dim_datas[0]['dist_type']
dist_type = dim_data_per_rank[0]['dist_type']
map_class = choose_map(dist_type)
return map_class.from_dim_data(dim_datas)
return map_class.from_dim_data_per_rank(dim_data_per_rank)


def map_from_global_dim_dict(global_dim_dict):
Expand Down Expand Up @@ -164,12 +165,12 @@ def from_global_dim_dict(cls, glb_dim_dict):
return cls(size, grid_size=1)

@classmethod
def from_dim_data(cls, dim_data_seq):
if len(dim_data_seq) != 1:
def from_dim_data_per_rank(cls, dim_data_per_rank):
if len(dim_data_per_rank) != 1:
msg = ("Number of dimension dictionaries "
"non-unitary for non-distributed dimension.")
raise ValueError(msg)
dd = dim_data_seq[0]
dd = dim_data_per_rank[0]
if dd['dist_type'] != 'n':
msg = "Wrong dist_type (%r) for non-distributed map."
raise ValueError(msg % dd['dist_type'])
Expand Down Expand Up @@ -219,19 +220,19 @@ def from_global_dim_dict(cls, glb_dim_dict):
return self

@classmethod
def from_dim_data(cls, dim_data_seq):
def from_dim_data_per_rank(cls, dim_data_per_rank):
self = cls.__new__(cls)
dd = dim_data_seq[0]
dd = dim_data_per_rank[0]
if dd['dist_type'] != 'b':
msg = "Wrong dist_type (%r) for block map."
raise ValueError(msg % dd['dist_type'])
self.size = dd['size']
self.grid_size = dd['proc_grid_size']
if self.grid_size != len(dim_data_seq):
if self.grid_size != len(dim_data_per_rank):
msg = ("Number of dimension dictionaries (%r)"
"inconsistent with proc_grid_size (%r).")
raise ValueError(msg % (len(dim_data_seq), self.grid_size))
self.bounds = [(d['start'], d['stop']) for d in dim_data_seq]
raise ValueError(msg % (len(dim_data_per_rank), self.grid_size))
self.bounds = [(d['start'], d['stop']) for d in dim_data_per_rank]
self.boundary_padding, self.comm_padding = dd.get('padding', (0, 0))

return self
Expand Down Expand Up @@ -287,17 +288,17 @@ def from_global_dim_dict(cls, glb_dim_dict):
return cls(size, grid_size, block_size)

@classmethod
def from_dim_data(cls, dim_data_seq):
dd = dim_data_seq[0]
def from_dim_data_per_rank(cls, dim_data_per_rank):
dd = dim_data_per_rank[0]
if dd['dist_type'] != 'c':
msg = "Wrong dist_type (%r) for cyclic map."
raise ValueError(msg % dd['dist_type'])
size = dd['size']
grid_size = dd['proc_grid_size']
if grid_size != len(dim_data_seq):
if grid_size != len(dim_data_per_rank):
msg = ("Number of dimension dictionaries (%r)"
"inconsistent with proc_grid_size (%r).")
raise ValueError(msg % (len(dim_data_seq), grid_size))
raise ValueError(msg % (len(dim_data_per_rank), grid_size))
block_size = dd.get('block_size', 1)
return cls(size, grid_size, block_size)

Expand Down Expand Up @@ -335,18 +336,18 @@ def from_global_dim_dict(cls, glb_dim_dict):
return cls(size, grid_size, indices=indices)

@classmethod
def from_dim_data(cls, dim_data_seq):
dd = dim_data_seq[0]
def from_dim_data_per_rank(cls, dim_data_per_rank):
dd = dim_data_per_rank[0]
if dd['dist_type'] != 'u':
msg = "Wrong dist_type (%r) for unstructured map."
raise ValueError(msg % dd['dist_type'])
size = dd['size']
grid_size = dd['proc_grid_size']
if grid_size != len(dim_data_seq):
if grid_size != len(dim_data_per_rank):
msg = ("Number of dimension dictionaries (%r)"
"inconsistent with proc_grid_size (%r).")
raise ValueError(msg % (len(dim_data_seq), grid_size))
indices = [dd['indices'] for dd in dim_data_seq]
raise ValueError(msg % (len(dim_data_per_rank), grid_size))
indices = [dd['indices'] for dd in dim_data_per_rank]
return cls(size, grid_size, indices=indices)

def __init__(self, size, grid_size, indices=None):
Expand Down Expand Up @@ -387,13 +388,13 @@ class Distribution(object):
"""

@classmethod
def from_dim_data(cls, context, dim_datas):
def from_dim_data_per_rank(cls, context, dim_data_per_rank):
""" Creates a Distribution from a sequence of `dim_data` dictionary
tuples from each LocalArray.
"""

self = cls.__new__(cls)
dd0 = dim_datas[0]
dd0 = dim_data_per_rank[0]
self.context = context
self.shape = tuple(dd['size'] for dd in dd0)
self.ndim = len(dd0)
Expand All @@ -402,16 +403,17 @@ def from_dim_data(cls, context, dim_datas):

validate_grid_shape(self.grid_shape, self.dist, len(context.targets))

coords = [tuple(d['proc_grid_rank'] for d in dd) for dd in dim_datas]
coords = [tuple(d['proc_grid_rank'] for d in dd) for dd in
dim_data_per_rank]
self.rank_from_coords = {c: r for (r, c) in enumerate(coords)}

dim_data_per_dim = [_compactify_dicts(dict_tuple)
for dict_tuple in zip(*dim_datas)]
for dict_tuple in zip(*dim_data_per_rank)]

if len(dim_data_per_dim) != self.ndim:
raise ValueError("Inconsistent dimensions.")

self.maps = [map_from_dim_datas(ddpd) for ddpd in dim_data_per_dim]
self.maps = [map_from_dim_data_per_rank(ddpd) for ddpd in dim_data_per_dim]

return self

Expand Down Expand Up @@ -481,7 +483,7 @@ def owning_targets(self, idxs):
"""
return [self.context.targets[r] for r in self.owning_ranks(idxs)]

def get_local_dim_datas(self):
def get_dim_data_per_rank(self):
dds = [enumerate(m.get_dimdicts()) for m in self.maps]
cart_dds = product(*dds)
coord_and_dd = [zip(*cdd) for cdd in cart_dds]
Expand Down
4 changes: 2 additions & 2 deletions distarray/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ def from_global_dim_data(self, global_dim_data, dtype=float):

"""
# global_dim_data is a sequence of dictionaries, one per dimension.
mdmap = Distribution(self, global_dim_data)
dim_data_per_rank = mdmap.get_local_dim_datas()
distribution = Distribution(self, global_dim_data)
dim_data_per_rank = distribution.get_dim_data_per_rank()

if len(self.targets) != len(dim_data_per_rank):
errmsg = "`dim_data_per_rank` must contain a dim_data for every rank."
Expand Down
31 changes: 17 additions & 14 deletions distarray/local/tests/paralleltest_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,23 +161,26 @@ def tearDown(self):
os.remove(self.output_path)

def test_load_bn(self):
dim_datas = bn_test_data
la = load_npy(self.output_path, dim_datas[self.rank], comm=self.comm)
dim_data_per_rank = bn_test_data
la = load_npy(self.output_path, dim_data_per_rank[self.rank],
comm=self.comm)
assert_equal(la, self.expected[numpy.newaxis, self.rank])

def test_load_nc(self):
dim_datas = nc_test_data
dim_data_per_rank = nc_test_data
expected_slices = [(slice(None), slice(0, None, 2)),
(slice(None), slice(1, None, 2))]

la = load_npy(self.output_path, dim_datas[self.rank], comm=self.comm)
la = load_npy(self.output_path, dim_data_per_rank[self.rank],
comm=self.comm)
assert_equal(la, self.expected[expected_slices[self.rank]])

def test_load_nu(self):
dim_datas = nu_test_data
expected_indices = [dd[1]['indices'] for dd in dim_datas]
dim_data_per_rank = nu_test_data
expected_indices = [dd[1]['indices'] for dd in dim_data_per_rank]

la = load_npy(self.output_path, dim_datas[self.rank], comm=self.comm)
la = load_npy(self.output_path, dim_data_per_rank[self.rank],
comm=self.comm)
assert_equal(la, self.expected[:, expected_indices[self.rank]])


Expand Down Expand Up @@ -253,28 +256,28 @@ def tearDown(self):
os.remove(self.output_path)

def test_load_bn(self):
dim_datas = bn_test_data
la = load_hdf5(self.output_path, dim_datas[self.rank],
dim_data_per_rank = bn_test_data
la = load_hdf5(self.output_path, dim_data_per_rank[self.rank],
key=self.key, comm=self.comm)
with self.h5py.File(self.output_path, 'r', driver='mpio',
comm=self.comm) as fp:
assert_equal(la, self.expected[numpy.newaxis, self.rank])

def test_load_nc(self):
dim_datas = nc_test_data
dim_data_per_rank = nc_test_data
expected_slices = [(slice(None), slice(0, None, 2)),
(slice(None), slice(1, None, 2))]
la = load_hdf5(self.output_path, dim_datas[self.rank],
la = load_hdf5(self.output_path, dim_data_per_rank[self.rank],
key=self.key, comm=self.comm)
with self.h5py.File(self.output_path, 'r', driver='mpio',
comm=self.comm) as fp:
expected_slice = expected_slices[self.rank]
assert_equal(la, self.expected[expected_slice])

def test_load_nu(self):
dim_datas = nu_test_data
expected_indices = [dd[1]['indices'] for dd in dim_datas]
la = load_hdf5(self.output_path, dim_datas[self.rank],
dim_data_per_rank = nu_test_data
expected_indices = [dd[1]['indices'] for dd in dim_data_per_rank]
la = load_hdf5(self.output_path, dim_data_per_rank[self.rank],
key=self.key, comm=self.comm)
with self.h5py.File(self.output_path, 'r', driver='mpio',
comm=self.comm) as fp:
Expand Down
Loading