Skip to content
Merged
13 changes: 7 additions & 6 deletions distarray/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,16 @@ def is_LocalArray(typestring):

return result

_DIMDATAS = """
{dim_data_name} = {local_name}.dim_data
_DIM_DATA_PER_RANK = """
{ddpr_name} = {local_name}.dim_data
"""

def _make_mdmap_from_local_dimdata(local_name, context):
dim_data_name = context._generate_key()
context._execute(_DIMDATAS.format(local_name=local_name, dim_data_name=dim_data_name))
dim_datas = context._pull(dim_data_name)
return Distribution.from_dim_data(context, dim_datas)
ddpr_name = context._generate_key()
context._execute(_DIM_DATA_PER_RANK.format(local_name=local_name,
ddpr_name=ddpr_name))
dim_data_per_rank = context._pull(ddpr_name)
return Distribution.from_dim_data_per_rank(context, dim_data_per_rank)

def _get_attribute(context, key, name):
local_key = context._generate_key()
Expand Down
68 changes: 35 additions & 33 deletions distarray/client_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ def choose_map(dist_type):
return cls_from_dist_type[dist_type]


def map_from_dim_datas(dim_datas):
""" Generates a ClientMap instance from a santized sequence of dim_data
def map_from_dim_data_per_rank(dim_data_per_rank):
""" Generates a ClientMap instance from a sanitized sequence of dim_data
dictionaries.

Parameters
----------
dim_datas : sequence of dictionaries
dim_data_per_rank : sequence of dictionaries
Each dictionary is a "dimension dictionary" from the distributed array
protocol, one per process in this dimension of the process grid. The
dimension dictionaries shall all have the same keys and values for
Expand All @@ -91,17 +91,18 @@ def map_from_dim_datas(dim_datas):
An instance of a subclass of MapBase.

"""
# check that all proccesses / ranks are accounted for.
proc_ranks = sorted(dd['proc_grid_rank'] for dd in dim_datas)
if proc_ranks != list(range(len(dim_datas))):
# check that all processes / ranks are accounted for.
proc_ranks = sorted(dd['proc_grid_rank'] for dd in dim_data_per_rank)
if proc_ranks != list(range(len(dim_data_per_rank))):
msg = "Ranks of processes (%r) not consistent."
raise ValueError(msg % proc_ranks)
# Sort dim_datas according to proc_grid_rank.
dim_datas = sorted(dim_datas, key=lambda d: d['proc_grid_rank'])
# Sort dim_data_per_rank according to proc_grid_rank.
dim_data_per_rank = sorted(dim_data_per_rank,
key=lambda d: d['proc_grid_rank'])

dist_type = dim_datas[0]['dist_type']
dist_type = dim_data_per_rank[0]['dist_type']
map_class = choose_map(dist_type)
return map_class.from_dim_data(dim_datas)
return map_class.from_dim_data_per_rank(dim_data_per_rank)


def map_from_global_dim_dict(global_dim_dict):
Expand Down Expand Up @@ -164,12 +165,12 @@ def from_global_dim_dict(cls, glb_dim_dict):
return cls(size, grid_size=1)

@classmethod
def from_dim_data(cls, dim_data_seq):
if len(dim_data_seq) != 1:
def from_dim_data_per_rank(cls, dim_data_per_rank):
if len(dim_data_per_rank) != 1:
msg = ("Number of dimension dictionaries "
"non-unitary for non-distributed dimension.")
raise ValueError(msg)
dd = dim_data_seq[0]
dd = dim_data_per_rank[0]
if dd['dist_type'] != 'n':
msg = "Wrong dist_type (%r) for non-distributed map."
raise ValueError(msg % dd['dist_type'])
Expand Down Expand Up @@ -219,19 +220,19 @@ def from_global_dim_dict(cls, glb_dim_dict):
return self

@classmethod
def from_dim_data_per_rank(cls, dim_data_per_rank):
    """Build a block map from a sequence of per-rank dimension dictionaries.

    Parameters
    ----------
    dim_data_per_rank : sequence of dictionaries
        One dimension dictionary per process along this dimension.  The
        first entry supplies 'dist_type', 'size', 'proc_grid_size' and the
        optional 'padding' pair; every entry supplies 'start' and 'stop'.
        NOTE(review): entries are presumably already ordered by
        'proc_grid_rank' by the caller -- confirm.

    Returns
    -------
    An instance of `cls` with `size`, `grid_size`, `bounds`,
    `boundary_padding` and `comm_padding` set.

    Raises
    ------
    ValueError
        If the first dictionary's 'dist_type' is not 'b', or if the number
        of dictionaries differs from 'proc_grid_size'.
    """
    # The instance is created without running __init__; every attribute is
    # filled in directly from the dimension dictionaries below.
    self = cls.__new__(cls)
    dd = dim_data_per_rank[0]
    if dd['dist_type'] != 'b':
        msg = "Wrong dist_type (%r) for block map."
        raise ValueError(msg % dd['dist_type'])
    self.size = dd['size']
    self.grid_size = dd['proc_grid_size']
    if self.grid_size != len(dim_data_per_rank):
        # Adjacent string literals are joined with no separator, so the
        # space between the two halves has to be written explicitly.
        msg = ("Number of dimension dictionaries (%r) "
               "inconsistent with proc_grid_size (%r).")
        raise ValueError(msg % (len(dim_data_per_rank), self.grid_size))
    # One (start, stop) pair per dictionary, in the order given.
    self.bounds = [(d['start'], d['stop']) for d in dim_data_per_rank]
    # 'padding' is optional; a missing entry means no padding on either side.
    self.boundary_padding, self.comm_padding = dd.get('padding', (0, 0))

    return self
Expand Down Expand Up @@ -287,17 +288,17 @@ def from_global_dim_dict(cls, glb_dim_dict):
return cls(size, grid_size, block_size)

@classmethod
def from_dim_data_per_rank(cls, dim_data_per_rank):
    """Build a cyclic map from a sequence of per-rank dimension dictionaries.

    Parameters
    ----------
    dim_data_per_rank : sequence of dictionaries
        One dimension dictionary per process along this dimension.  Only
        the first entry is read: it supplies 'dist_type', 'size',
        'proc_grid_size' and the optional 'block_size'.

    Returns
    -------
    The result of ``cls(size, grid_size, block_size)``.

    Raises
    ------
    ValueError
        If the first dictionary's 'dist_type' is not 'c', or if the number
        of dictionaries differs from 'proc_grid_size'.
    """
    dd = dim_data_per_rank[0]
    if dd['dist_type'] != 'c':
        msg = "Wrong dist_type (%r) for cyclic map."
        raise ValueError(msg % dd['dist_type'])
    size = dd['size']
    grid_size = dd['proc_grid_size']
    if grid_size != len(dim_data_per_rank):
        # Adjacent string literals are joined with no separator, so the
        # space between the two halves has to be written explicitly.
        msg = ("Number of dimension dictionaries (%r) "
               "inconsistent with proc_grid_size (%r).")
        raise ValueError(msg % (len(dim_data_per_rank), grid_size))
    # 'block_size' is optional; a missing entry falls back to 1.
    block_size = dd.get('block_size', 1)
    return cls(size, grid_size, block_size)

Expand Down Expand Up @@ -335,18 +336,18 @@ def from_global_dim_dict(cls, glb_dim_dict):
return cls(size, grid_size, indices=indices)

@classmethod
def from_dim_data_per_rank(cls, dim_data_per_rank):
    """Build an unstructured map from per-rank dimension dictionaries.

    Parameters
    ----------
    dim_data_per_rank : sequence of dictionaries
        One dimension dictionary per process along this dimension.  The
        first entry supplies 'dist_type', 'size' and 'proc_grid_size';
        every entry supplies 'indices'.

    Returns
    -------
    The result of ``cls(size, grid_size, indices=indices)``, where
    `indices` holds each dictionary's 'indices' value in the order given.

    Raises
    ------
    ValueError
        If the first dictionary's 'dist_type' is not 'u', or if the number
        of dictionaries differs from 'proc_grid_size'.
    """
    dd = dim_data_per_rank[0]
    if dd['dist_type'] != 'u':
        msg = "Wrong dist_type (%r) for unstructured map."
        raise ValueError(msg % dd['dist_type'])
    size = dd['size']
    grid_size = dd['proc_grid_size']
    if grid_size != len(dim_data_per_rank):
        # Adjacent string literals are joined with no separator, so the
        # space between the two halves has to be written explicitly.
        msg = ("Number of dimension dictionaries (%r) "
               "inconsistent with proc_grid_size (%r).")
        raise ValueError(msg % (len(dim_data_per_rank), grid_size))
    # Use a distinct loop name so the first-entry `dd` is never shadowed
    # (list comprehensions leak their variable on Python 2).
    indices = [d['indices'] for d in dim_data_per_rank]
    return cls(size, grid_size, indices=indices)

def __init__(self, size, grid_size, indices=None):
Expand Down Expand Up @@ -387,13 +388,13 @@ class Distribution(object):
"""

@classmethod
def from_dim_data(cls, context, dim_datas):
def from_dim_data_per_rank(cls, context, dim_data_per_rank):
""" Creates a Distribution from a sequence of `dim_data` dictionary
tuples from each LocalArray.
"""

self = cls.__new__(cls)
dd0 = dim_datas[0]
dd0 = dim_data_per_rank[0]
self.context = context
self.shape = tuple(dd['size'] for dd in dd0)
self.ndim = len(dd0)
Expand All @@ -402,16 +403,17 @@ def from_dim_data(cls, context, dim_datas):

validate_grid_shape(self.grid_shape, self.dist, len(context.targets))

coords = [tuple(d['proc_grid_rank'] for d in dd) for dd in dim_datas]
coords = [tuple(d['proc_grid_rank'] for d in dd) for dd in
dim_data_per_rank]
self.rank_from_coords = {c: r for (r, c) in enumerate(coords)}

dim_data_per_dim = [_compactify_dicts(dict_tuple)
for dict_tuple in zip(*dim_datas)]
for dict_tuple in zip(*dim_data_per_rank)]

if len(dim_data_per_dim) != self.ndim:
raise ValueError("Inconsistent dimensions.")

self.maps = [map_from_dim_datas(ddpd) for ddpd in dim_data_per_dim]
self.maps = [map_from_dim_data_per_rank(ddpd) for ddpd in dim_data_per_dim]

return self

Expand Down Expand Up @@ -481,7 +483,7 @@ def owning_targets(self, idxs):
"""
return [self.context.targets[r] for r in self.owning_ranks(idxs)]

def get_local_dim_datas(self):
def get_dim_data_per_rank(self):
dds = [enumerate(m.get_dimdicts()) for m in self.maps]
cart_dds = product(*dds)
coord_and_dd = [zip(*cdd) for cdd in cart_dds]
Expand Down
2 changes: 1 addition & 1 deletion distarray/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ def from_global_dim_data(self, global_dim_data, dtype=float):
"""
# global_dim_data is a sequence of dictionaries, one per dimension.
mdmap = Distribution(self, global_dim_data)
dim_data_per_rank = mdmap.get_local_dim_datas()
dim_data_per_rank = mdmap.get_dim_data_per_rank()

if len(self.targets) != len(dim_data_per_rank):
errmsg = "`dim_data_per_rank` must contain a dim_data for every rank."
Expand Down
31 changes: 17 additions & 14 deletions distarray/local/tests/paralleltest_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,23 +161,26 @@ def tearDown(self):
os.remove(self.output_path)

def test_load_bn(self):
dim_datas = bn_test_data
la = load_npy(self.output_path, dim_datas[self.rank], comm=self.comm)
dim_data_per_rank = bn_test_data
la = load_npy(self.output_path, dim_data_per_rank[self.rank],
comm=self.comm)
assert_equal(la, self.expected[numpy.newaxis, self.rank])

def test_load_nc(self):
dim_datas = nc_test_data
dim_data_per_rank = nc_test_data
expected_slices = [(slice(None), slice(0, None, 2)),
(slice(None), slice(1, None, 2))]

la = load_npy(self.output_path, dim_datas[self.rank], comm=self.comm)
la = load_npy(self.output_path, dim_data_per_rank[self.rank],
comm=self.comm)
assert_equal(la, self.expected[expected_slices[self.rank]])

def test_load_nu(self):
dim_datas = nu_test_data
expected_indices = [dd[1]['indices'] for dd in dim_datas]
dim_data_per_rank = nu_test_data
expected_indices = [dd[1]['indices'] for dd in dim_data_per_rank]

la = load_npy(self.output_path, dim_datas[self.rank], comm=self.comm)
la = load_npy(self.output_path, dim_data_per_rank[self.rank],
comm=self.comm)
assert_equal(la, self.expected[:, expected_indices[self.rank]])


Expand Down Expand Up @@ -253,28 +256,28 @@ def tearDown(self):
os.remove(self.output_path)

def test_load_bn(self):
dim_datas = bn_test_data
la = load_hdf5(self.output_path, dim_datas[self.rank],
dim_data_per_rank = bn_test_data
la = load_hdf5(self.output_path, dim_data_per_rank[self.rank],
key=self.key, comm=self.comm)
with self.h5py.File(self.output_path, 'r', driver='mpio',
comm=self.comm) as fp:
assert_equal(la, self.expected[numpy.newaxis, self.rank])

def test_load_nc(self):
dim_datas = nc_test_data
dim_data_per_rank = nc_test_data
expected_slices = [(slice(None), slice(0, None, 2)),
(slice(None), slice(1, None, 2))]
la = load_hdf5(self.output_path, dim_datas[self.rank],
la = load_hdf5(self.output_path, dim_data_per_rank[self.rank],
key=self.key, comm=self.comm)
with self.h5py.File(self.output_path, 'r', driver='mpio',
comm=self.comm) as fp:
expected_slice = expected_slices[self.rank]
assert_equal(la, self.expected[expected_slice])

def test_load_nu(self):
dim_datas = nu_test_data
expected_indices = [dd[1]['indices'] for dd in dim_datas]
la = load_hdf5(self.output_path, dim_datas[self.rank],
dim_data_per_rank = nu_test_data
expected_indices = [dd[1]['indices'] for dd in dim_data_per_rank]
la = load_hdf5(self.output_path, dim_data_per_rank[self.rank],
key=self.key, comm=self.comm)
with self.h5py.File(self.output_path, 'r', driver='mpio',
comm=self.comm) as fp:
Expand Down
6 changes: 3 additions & 3 deletions distarray/local/tests/paralleltest_localarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ def test_pack_unpack_index(self):

class TestLocalArrayMethods(MpiTestCase):

ddpp = [
ddpr = [
({'dist_type': 'c',
'block_size': 1,
'size': 4,
Expand Down Expand Up @@ -413,7 +413,7 @@ def test_copy_bn(self):
assert_localarrays_equal(a, b, check_dtype=True)

def test_copy_cbc(self):
distribution = Distribution(self.ddpp[self.comm.Get_rank()],
distribution = Distribution(self.ddpr[self.comm.Get_rank()],
comm=self.comm)
a = LocalArray(distribution, dtype=np.int_)
a.fill(12)
Expand All @@ -432,7 +432,7 @@ def test_astype_bn(self):

def test_astype_cbc(self):
new_dtype = np.int8
d = Distribution(self.ddpp[self.comm.Get_rank()], comm=self.comm)
d = Distribution(self.ddpr[self.comm.Get_rank()], comm=self.comm)
a = LocalArray(d, dtype=np.int32)
a.fill(12)
b = a.astype(new_dtype)
Expand Down
11 changes: 6 additions & 5 deletions distarray/local/tests/paralleltest_maps.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,10 @@ def test_block(self):
"size": 16,
}
dd1 = (dim10, dim11)
dim_datas = (dd0, dd1)
dim_data_per_rank = (dd0, dd1)

d0 = Distribution(dim_datas[self.comm.Get_rank()], comm=self.comm)
d0 = Distribution(dim_data_per_rank[self.comm.Get_rank()],
comm=self.comm)
d1 = Distribution.from_shape((16, 16), dist={0: 'b'},
grid_shape=(2, 1), comm=self.comm)
self.assert_alike(d0, d1)
Expand Down Expand Up @@ -197,10 +198,10 @@ def test_cyclic(self):
}
dd1 = (dim10, dim11)

dim_datas = (dd0, dd1)
dim_data_per_rank = (dd0, dd1)


larr = Distribution(dim_datas[self.comm.Get_rank()], comm=self.comm)
larr = Distribution(dim_data_per_rank[self.comm.Get_rank()],
comm=self.comm)
expected = Distribution.from_shape((16, 16), dist={1: 'c'},
grid_shape=(1, 2), comm=self.comm)

Expand Down
2 changes: 1 addition & 1 deletion distarray/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def test_global_dim_data_local_dim_data_equivalence(self):
},
)
mdmap = Distribution(self.context, glb_dim_data)
actual = mdmap.get_local_dim_datas()
actual = mdmap.get_dim_data_per_rank()

expected = [
({'block_size': 2,
Expand Down
12 changes: 6 additions & 6 deletions distarray/tests/test_distributed_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,22 +150,22 @@ def tearDown(self):
self.dac.close()

def test_load_bn(self):
dim_datas = bn_test_data
da = self.dac.load_npy(self.output_path, dim_datas)
dim_data_per_rank = bn_test_data
da = self.dac.load_npy(self.output_path, dim_data_per_rank)
for i in range(da.shape[0]):
for j in range(da.shape[1]):
self.assertEqual(da[i, j], self.expected[i, j])

def test_load_nc(self):
dim_datas = nc_test_data
da = self.dac.load_npy(self.output_path, dim_datas)
dim_data_per_rank = nc_test_data
da = self.dac.load_npy(self.output_path, dim_data_per_rank)
for i in range(da.shape[0]):
for j in range(da.shape[1]):
self.assertEqual(da[i, j], self.expected[i, j])

def test_load_nu(self):
dim_datas = nu_test_data
da = self.dac.load_npy(self.output_path, dim_datas)
dim_data_per_rank = nu_test_data
da = self.dac.load_npy(self.output_path, dim_data_per_rank)
for i in range(da.shape[0]):
for j in range(da.shape[1]):
self.assertEqual(da[i, j], self.expected[i, j])
Expand Down