From 71f01ea4c28bfcd2179d1314627ea825ca3f1bbe Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Wed, 11 Jun 2014 18:30:18 -0500 Subject: [PATCH 01/27] Dumbest possible redistribution working. --- distarray/dist/context.py | 17 ++++++++----- distarray/dist/distarray.py | 19 +++++++++++++++ distarray/dist/maps.py | 10 ++++++++ distarray/dist/tests/test_distarray.py | 33 ++++++++++++++++++++++++++ distarray/dist/tests/test_maps.py | 16 +++++++++++++ 5 files changed, 89 insertions(+), 6 deletions(-) diff --git a/distarray/dist/context.py b/distarray/dist/context.py index 5294ac96..b421fdcc 100644 --- a/distarray/dist/context.py +++ b/distarray/dist/context.py @@ -558,7 +558,7 @@ def _local_fromfunction(func, comm, ddpr, kwargs): targets=distribution.targets) return DistArray.from_localarrays(da_name[0], distribution=distribution) - def apply(self, func, args=None, kwargs=None, targets=None): + def apply(self, func, args=None, kwargs=None, targets=None, default=None): """ Analogous to IPython.parallel.view.apply_sync @@ -577,7 +577,7 @@ def apply(self, func, args=None, kwargs=None, targets=None): return a list of the results on the each engine. """ - def func_wrapper(func, apply_nonce, context_key, args, kwargs): + def func_wrapper(func, apply_nonce, context_key, args, kwargs, default): """ Function which calls the applied function after grabbing all the arguments on the engines that are passed in as names of the form @@ -611,22 +611,27 @@ def func_wrapper(func, apply_nonce, context_key, args, kwargs): args = list(args) for i, a in enumerate(args): if (isinstance(a, str) and a.startswith(prefix)): - args[i] = main.reduce(getattr, [main] + a.split('.')) + try: + args[i] = main.reduce(getattr, [main] + a.split('.')) + except AttributeError: + args[i] = default args = tuple(args) # convert kwargs for k in kwargs.keys(): val = kwargs[k] if (isinstance(val, str) and val.startswith(prefix)): - kwargs[k] = main.reduce(getattr, [main] + val.split('.')) - + try: + kwargs[k] = main.reduce(getattr, [main] + val.split('.')) + except AttributeError: + kwargs[k] = default return func(*args, **kwargs) # default arguments args = () if args is None else args kwargs = {} if kwargs is None else kwargs apply_nonce = uid()[13:] - wrapped_args = (func, apply_nonce, self.context_key, args, kwargs) + wrapped_args = (func, apply_nonce, self.context_key, args, kwargs, default) targets = self.targets if targets is None else targets diff --git a/distarray/dist/distarray.py b/distarray/dist/distarray.py index 823b65aa..64d8cdda 100644 --- a/distarray/dist/distarray.py +++ b/distarray/dist/distarray.py @@ -493,6 +493,25 @@ def local_view(larr, ddpr, dtype): return DistArray.from_localarrays(key=new_key, distribution=new_dist, dtype=dtype) + def distribute_as(self, dist): + + plan = self.distribution.get_redist_plan(dist) + ubercomm = self.distribution.comm_union(dist) + result = DistArray(dist, dtype=self.dtype) + + def _local_redistribute(comm, plan, la0, la1): + myrank = comm.Get_rank() + for dta in plan: + if dta['from_rank'] == myrank: + comm.Send(la0.ndarray, dest=dta['to_rank']) + if dta['to_rank'] == myrank: + comm.Recv(la1.ndarray, source=dta['from_rank']) + + self.context.apply(_local_redistribute, (ubercomm, plan, self.key, result.key), + targets=self.context.targets) + return result + # return self.context.ones(dist) + # Binary operators def _binary_op_from_ufunc(self, other, func, rop_str=None, *args, **kwargs): diff --git a/distarray/dist/maps.py b/distarray/dist/maps.py index 08971b33..778f3820 100644 --- a/distarray/dist/maps.py +++ b/distarray/dist/maps.py @@ -836,3 +836,13 @@ def view(self, new_dimsize=None): def localshapes(self): return shapes_from_dim_data_per_rank(self.get_dim_data_per_rank()) + + def get_redist_plan(self, other_dist): + return [ + {'from_rank': 1, 'to_rank': 0, 'from_indices': (0, 20), 'to_indices': (0, 20)}, + {'from_rank': 3, 'to_rank': 2, 'from_indices': (20, 40), 'to_indices': (20, 40)}, + ] + + def comm_union(self, *dists): + all_targets = sorted(reduce(set.union, [d.targets for d in dists], set(self.targets))) + return self.context._make_subcomm(all_targets) diff --git a/distarray/dist/tests/test_distarray.py b/distarray/dist/tests/test_distarray.py index dba4031d..ed680e59 100644 --- a/distarray/dist/tests/test_distarray.py +++ b/distarray/dist/tests/test_distarray.py @@ -1014,5 +1014,38 @@ def test_incompatible_dtype(self): da.view(dtype=dtype) +class TestBlockRedistribution(ContextTestCase): + + def test_redist_1D(self): + dist0 = Distribution.from_shape(self.context, + (40,), ('b',), (2,), + targets=[1,3]) + dist1 = Distribution.from_shape(self.context, + (40,), ('b',), (2,), + targets=[0,2]) + da = self.context.ones(dist0) + db = da.distribute_as(dist1) + + self.assertIs(db.distribution, dist1) + self.assertSequenceEqual(db.localshapes(), da.localshapes()) + assert_array_equal(da.tondarray(), db.tondarray()) + + dist2 = Distribution.from_shape(self.context, + (40,), ('b',), (2,), + targets=[0,2]) + + da.fill(-42) + dc = da.distribute_as(dist2) + self.assertIs(dc.distribution, dist2) + self.assertSequenceEqual(dc.localshapes(), da.localshapes()) + assert_array_equal(da.tondarray(), dc.tondarray()) + + # dist0 = Distribution.from_shape(self.context, (40,), ('b',), (4,)) + # dist1 = Distribution.from_shape(self.context, (40,), ('b',), (3,), + # targets=self.context.targets[:3]) + # self.assertEqual(len(dist0.targets), 4) + # self.assertEqual(len(dist1.targets), 3) + + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/distarray/dist/tests/test_maps.py b/distarray/dist/tests/test_maps.py index c7823bd3..3654a8ed 100644 --- a/distarray/dist/tests/test_maps.py +++ b/distarray/dist/tests/test_maps.py @@ -299,6 +299,22 @@ def test_all_n_dist(self): dist=('n', 'n')) self.context.ones(distribution) +class TestRedistribution(ContextTestCase): + + def test_block_redistribution(self): + dist0 = client_map.Distribution.from_shape(self.context, + (40,), ('b',), (2,), + targets=[1,3]) + dist1 = client_map.Distribution.from_shape(self.context, + (40,), ('b',), (2,), + targets=[0,2]) + plan = dist0.get_redist_plan(dist1) + expected = [ + {'from_rank': 1, 'to_rank': 0, 'from_indices': (0, 20), 'to_indices': (0, 20)}, + {'from_rank': 3, 'to_rank': 2, 'from_indices': (20, 40), 'to_indices': (20, 40)}, + ] + self.assertEqual(plan, expected) + class TestNoEmptyLocals(ContextTestCase): From 0b53ccde0a1c98daf324ee0e7b5f0a7c468245c4 Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Thu, 12 Jun 2014 16:10:12 -0500 Subject: [PATCH 02/27] Generalize local redistribution to handle slices. --- distarray/dist/distarray.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/distarray/dist/distarray.py b/distarray/dist/distarray.py index 64d8cdda..491f0eeb 100644 --- a/distarray/dist/distarray.py +++ b/distarray/dist/distarray.py @@ -499,18 +499,23 @@ def distribute_as(self, dist): ubercomm = self.distribution.comm_union(dist) result = DistArray(dist, dtype=self.dtype) - def _local_redistribute(comm, plan, la0, la1): + def _local_redistribute(comm, plan, la_from, la_to): myrank = comm.Get_rank() for dta in plan: if dta['from_rank'] == myrank: - comm.Send(la0.ndarray, dest=dta['to_rank']) - if dta['to_rank'] == myrank: - comm.Recv(la1.ndarray, source=dta['from_rank']) + start, stop = dta['from_indices'][:2] + local_start, local_stop = map(la_from.local_from_global, (start, stop-1)) + from_slice = slice(local_start[0], local_stop[0] + 1) + comm.Send(la_from.ndarray[from_slice], dest=dta['to_rank']) + elif dta['to_rank'] == myrank: + start, stop = dta['to_indices'][:2] + local_start, local_stop = map(la_to.local_from_global, (start, stop-1)) + to_slice = slice(local_start[0], local_stop[0] + 1) + comm.Recv(la_to.ndarray[to_slice], source=dta['from_rank']) self.context.apply(_local_redistribute, (ubercomm, plan, self.key, result.key), targets=self.context.targets) return result - # return self.context.ones(dist) # Binary operators From 6abb248139468eb9acc08a26314ab31e9de3307a Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Fri, 13 Jun 2014 15:02:41 -0500 Subject: [PATCH 03/27] Generalize plan creation for block redistribution. --- distarray/dist/distarray.py | 16 ++++----- distarray/dist/maps.py | 56 ++++++++++++++++++++++++++++--- distarray/dist/tests/test_maps.py | 28 ++++++++++++---- 3 files changed, 81 insertions(+), 19 deletions(-) diff --git a/distarray/dist/distarray.py b/distarray/dist/distarray.py index 491f0eeb..844b031d 100644 --- a/distarray/dist/distarray.py +++ b/distarray/dist/distarray.py @@ -502,16 +502,16 @@ def distribute_as(self, dist): def _local_redistribute(comm, plan, la_from, la_to): myrank = comm.Get_rank() for dta in plan: - if dta['from_rank'] == myrank: - start, stop = dta['from_indices'][:2] + if dta['source_rank'] == myrank: + start, stop = dta['indices'][:2] local_start, local_stop = map(la_from.local_from_global, (start, stop-1)) - from_slice = slice(local_start[0], local_stop[0] + 1) - comm.Send(la_from.ndarray[from_slice], dest=dta['to_rank']) - elif dta['to_rank'] == myrank: - start, stop = dta['to_indices'][:2] + source_slice = slice(local_start[0], local_stop[0] + 1) + comm.Send(la_from.ndarray[source_slice], dest=dta['dest_rank']) + elif dta['dest_rank'] == myrank: + start, stop = dta['indices'][:2] local_start, local_stop = map(la_to.local_from_global, (start, stop-1)) - to_slice = slice(local_start[0], local_stop[0] + 1) - comm.Recv(la_to.ndarray[to_slice], source=dta['from_rank']) + dest_slice = slice(local_start[0], local_stop[0] + 1) + comm.Recv(la_to.ndarray[dest_slice], source=dta['source_rank']) self.context.apply(_local_redistribute, (ubercomm, plan, self.key, result.key), targets=self.context.targets) diff --git a/distarray/dist/maps.py b/distarray/dist/maps.py index 778f3820..73e07b03 100644 --- a/distarray/dist/maps.py +++ b/distarray/dist/maps.py @@ -838,11 +838,59 @@ def localshapes(self): return shapes_from_dim_data_per_rank(self.get_dim_data_per_rank()) def get_redist_plan(self, other_dist): - return [ - {'from_rank': 1, 'to_rank': 0, 'from_indices': (0, 20), 'to_indices': (0, 20)}, - {'from_rank': 3, 'to_rank': 2, 'from_indices': (20, 40), 'to_indices': (20, 40)}, - ] + # Get all targets + all_targets = sorted(set(self.targets + other_dist.targets)) + union_rank_from_target = {t: r for (r, t) in enumerate(all_targets)} + + source_ranks = range(len(self.targets)) + source_targets = self.targets + union_rank_from_source_rank = {sr: union_rank_from_target[st] for (sr, st) in zip(source_ranks, source_targets)} + + dest_ranks = range(len(other_dist.targets)) + dest_targets = other_dist.targets + union_rank_from_dest_rank = {sr: union_rank_from_target[st] for (sr, st) in zip(dest_ranks, dest_targets)} + + source_ddpr = self.get_dim_data_per_rank() + dest_ddpr = other_dist.get_dim_data_per_rank() + source_dest_pairs = product(source_ddpr, dest_ddpr) + + plan = [] + for source_dd, dest_dd in source_dest_pairs: + intersection = indices_intersection(source_dd, dest_dd) + if intersection: + source_coords = tuple(dd['proc_grid_rank'] for dd in source_dd) + source_rank = self.rank_from_coords[source_coords] + dest_coords = tuple(dd['proc_grid_rank'] for dd in dest_dd) + dest_rank = other_dist.rank_from_coords[dest_coords] + plan.append( + { + 'source_rank': union_rank_from_source_rank[source_rank], + 'dest_rank': union_rank_from_dest_rank[dest_rank], + 'indices': intersection, + } + ) + + return plan def comm_union(self, *dists): all_targets = sorted(reduce(set.union, [d.targets for d in dists], set(self.targets))) return self.context._make_subcomm(all_targets) + + +def indices_intersection(source_dimdata, dest_dimdata): + if not (len(source_dimdata) == len(dest_dimdata) == 1): + raise NotImplementedError() + + source_dimdict = source_dimdata[0] + dest_dimdict = dest_dimdata[0] + + if not (source_dimdict['dist_type'] == dest_dimdict['dist_type'] == 'b'): + raise ValueError("Only 'b' dist_type supported") + + source_idxs = source_dimdict['start'], source_dimdict['stop'] + dest_idxs = dest_dimdict['start'], dest_dimdict['stop'] + + lower = max(source_idxs[0], dest_idxs[0]) + upper = min(source_idxs[1], dest_idxs[1]) + + return (lower, upper) if upper > lower else () diff --git a/distarray/dist/tests/test_maps.py b/distarray/dist/tests/test_maps.py index 3654a8ed..0c7d4a2e 100644 --- a/distarray/dist/tests/test_maps.py +++ b/distarray/dist/tests/test_maps.py @@ -301,17 +301,31 @@ def test_all_n_dist(self): class TestRedistribution(ContextTestCase): - def test_block_redistribution(self): - dist0 = client_map.Distribution.from_shape(self.context, + def test_block_redistribution_one_to_one(self): + source_dist = client_map.Distribution.from_shape(self.context, (40,), ('b',), (2,), - targets=[1,3]) - dist1 = client_map.Distribution.from_shape(self.context, + targets=[1, 3]) + dest_dist = client_map.Distribution.from_shape(self.context, + (40,), ('b',), (2,), + targets=[0, 2]) + plan = source_dist.get_redist_plan(dest_dist) + expected = [ + {'source_rank': 1, 'dest_rank': 0, 'indices': (0, 20)}, + {'source_rank': 3, 'dest_rank': 2, 'indices': (20, 40)}, + ] + self.assertEqual(plan, expected) + + def test_block_redist_one_to_many(self): + source_dist = client_map.Distribution.from_shape(self.context, + (40,), ('b',), (1,), + targets=[1]) + dest_dist = client_map.Distribution.from_shape(self.context, (40,), ('b',), (2,), targets=[0,2]) - plan = dist0.get_redist_plan(dist1) + plan = source_dist.get_redist_plan(dest_dist) expected = [ - {'from_rank': 1, 'to_rank': 0, 'from_indices': (0, 20), 'to_indices': (0, 20)}, - {'from_rank': 3, 'to_rank': 2, 'from_indices': (20, 40), 'to_indices': (20, 40)}, + {'source_rank': 1, 'dest_rank': 0, 'indices': (0, 20)}, + {'source_rank': 1, 'dest_rank': 2, 'indices': (20, 40)}, ] self.assertEqual(plan, expected) From 04656ec833ad40e28fd6292012d968b3c53ee124 Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Fri, 13 Jun 2014 15:06:24 -0500 Subject: [PATCH 04/27] Cleanup imports. --- distarray/dist/tests/test_maps.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distarray/dist/tests/test_maps.py b/distarray/dist/tests/test_maps.py index 0c7d4a2e..585cf1ca 100644 --- a/distarray/dist/tests/test_maps.py +++ b/distarray/dist/tests/test_maps.py @@ -302,10 +302,10 @@ def test_all_n_dist(self): class TestRedistribution(ContextTestCase): def test_block_redistribution_one_to_one(self): - source_dist = client_map.Distribution.from_shape(self.context, + source_dist = Distribution.from_shape(self.context, (40,), ('b',), (2,), targets=[1, 3]) - dest_dist = client_map.Distribution.from_shape(self.context, + dest_dist = Distribution.from_shape(self.context, (40,), ('b',), (2,), targets=[0, 2]) plan = source_dist.get_redist_plan(dest_dist) @@ -316,10 +316,10 @@ def test_block_redistribution_one_to_one(self): self.assertEqual(plan, expected) def test_block_redist_one_to_many(self): - source_dist = client_map.Distribution.from_shape(self.context, + source_dist = Distribution.from_shape(self.context, (40,), ('b',), (1,), targets=[1]) - dest_dist = client_map.Distribution.from_shape(self.context, + dest_dist = Distribution.from_shape(self.context, (40,), ('b',), (2,), targets=[0,2]) plan = source_dist.get_redist_plan(dest_dist) From 3857d0220a5bf1a531ed1442781cea4eb9d9e53b Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Fri, 13 Jun 2014 16:28:01 -0500 Subject: [PATCH 05/27] Use tuple_intersection(). --- distarray/dist/maps.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/distarray/dist/maps.py b/distarray/dist/maps.py index 73e07b03..5760a171 100644 --- a/distarray/dist/maps.py +++ b/distarray/dist/maps.py @@ -890,7 +890,4 @@ def indices_intersection(source_dimdata, dest_dimdata): source_idxs = source_dimdict['start'], source_dimdict['stop'] dest_idxs = dest_dimdict['start'], dest_dimdict['stop'] - lower = max(source_idxs[0], dest_idxs[0]) - upper = min(source_idxs[1], dest_idxs[1]) - - return (lower, upper) if upper > lower else () + return tuple_intersection(source_idxs, dest_idxs) From 5120cab4e0a2c60b8d6e36b23af9cdb63e14129f Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Tue, 17 Jun 2014 16:23:26 -0500 Subject: [PATCH 06/27] Fix after rebase. --- distarray/dist/tests/test_maps.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/distarray/dist/tests/test_maps.py b/distarray/dist/tests/test_maps.py index 585cf1ca..875dd71c 100644 --- a/distarray/dist/tests/test_maps.py +++ b/distarray/dist/tests/test_maps.py @@ -310,8 +310,8 @@ def test_block_redistribution_one_to_one(self): targets=[0, 2]) plan = source_dist.get_redist_plan(dest_dist) expected = [ - {'source_rank': 1, 'dest_rank': 0, 'indices': (0, 20)}, - {'source_rank': 3, 'dest_rank': 2, 'indices': (20, 40)}, + {'source_rank': 1, 'dest_rank': 0, 'indices': (0, 20, 1)}, + {'source_rank': 3, 'dest_rank': 2, 'indices': (20, 40, 1)}, ] self.assertEqual(plan, expected) @@ -324,11 +324,23 @@ def test_block_redist_one_to_many(self): targets=[0,2]) plan = source_dist.get_redist_plan(dest_dist) expected = [ - {'source_rank': 1, 'dest_rank': 0, 'indices': (0, 20)}, - {'source_rank': 1, 'dest_rank': 2, 'indices': (20, 40)}, + {'source_rank': 1, 'dest_rank': 0, 'indices': (0, 20, 1)}, + {'source_rank': 1, 'dest_rank': 2, 'indices': (20, 40, 1)}, ] self.assertEqual(plan, expected) + def test_block_redist_identity(self): + source_dist = dest_dist = Distribution.from_shape(self.context, + (40,), ('b',), (4,)) + plan_a = source_dist.get_redist_plan(dest_dist) + plan_b = dest_dist.get_redist_plan(source_dist) + self.assertSequenceEqual(plan_a, plan_b) + for p, lshape in zip(plan_a, source_dist.localshapes()): + self.assertEqual(p['source_rank'], p['dest_rank']) + start, stop, step = p['indices'] + self.assertEqual(step, 1) + self.assertEqual(stop - start, lshape[0]) + class TestNoEmptyLocals(ContextTestCase): From fb74fe96d55e6eff11d21f9189d672f0a0c464f7 Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Tue, 17 Jun 2014 16:57:42 -0500 Subject: [PATCH 07/27] Adds redistribution test for many-to-one. --- distarray/dist/tests/test_maps.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/distarray/dist/tests/test_maps.py b/distarray/dist/tests/test_maps.py index 875dd71c..5981fba6 100644 --- a/distarray/dist/tests/test_maps.py +++ b/distarray/dist/tests/test_maps.py @@ -329,6 +329,20 @@ def test_block_redist_one_to_many(self): ] self.assertEqual(plan, expected) + def test_block_redist_many_to_one(self): + source_dist = Distribution.from_shape(self.context, + (40,), ('b',), (2,), + targets=[1, 2]) + dest_dist = Distribution.from_shape(self.context, + (40,), ('b',), (1,), + targets=[0]) + plan = source_dist.get_redist_plan(dest_dist) + expected = [ + {'source_rank': 1, 'dest_rank': 0, 'indices': (0, 20, 1)}, + {'source_rank': 2, 'dest_rank': 0, 'indices': (20, 40, 1)}, + ] + self.assertEqual(plan, expected) + def test_block_redist_identity(self): source_dist = dest_dist = Distribution.from_shape(self.context, (40,), ('b',), (4,)) From 0f40f8432dd04d8f619d442e216924ec67d921db Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Tue, 17 Jun 2014 17:39:38 -0500 Subject: [PATCH 08/27] Multidimensional block redistribution working. --- distarray/dist/distarray.py | 20 ++++++++++++-------- distarray/dist/maps.py | 28 +++++++++++++++------------- distarray/dist/tests/test_maps.py | 24 +++++++++++++++++------- 3 files changed, 44 insertions(+), 28 deletions(-) diff --git a/distarray/dist/distarray.py b/distarray/dist/distarray.py index 844b031d..4b4536ea 100644 --- a/distarray/dist/distarray.py +++ b/distarray/dist/distarray.py @@ -503,15 +503,19 @@ def _local_redistribute(comm, plan, la_from, la_to): myrank = comm.Get_rank() for dta in plan: if dta['source_rank'] == myrank: - start, stop = dta['indices'][:2] - local_start, local_stop = map(la_from.local_from_global, (start, stop-1)) - source_slice = slice(local_start[0], local_stop[0] + 1) - comm.Send(la_from.ndarray[source_slice], dest=dta['dest_rank']) + source_slices = [] + for indices in dta['indices']: + start, stop = indices[:2] + local_start, local_stop = map(la_from.local_from_global, (start, stop-1)) + source_slices.append(slice(local_start[0], local_stop[0] + 1)) + comm.Send(la_from.ndarray[source_slices], dest=dta['dest_rank']) elif dta['dest_rank'] == myrank: - start, stop = dta['indices'][:2] - local_start, local_stop = map(la_to.local_from_global, (start, stop-1)) - dest_slice = slice(local_start[0], local_stop[0] + 1) - comm.Recv(la_to.ndarray[dest_slice], source=dta['source_rank']) + dest_slices = [] + for indices in dta['indices']: + start, stop = indices[:2] + local_start, local_stop = map(la_to.local_from_global, (start, stop-1)) + dest_slices.append(slice(local_start[0], local_stop[0] + 1)) + comm.Recv(la_to.ndarray[dest_slices], source=dta['source_rank']) self.context.apply(_local_redistribute, (ubercomm, plan, self.key, result.key), targets=self.context.targets) diff --git a/distarray/dist/maps.py b/distarray/dist/maps.py index 5760a171..2b06ec8a 100644 --- a/distarray/dist/maps.py +++ b/distarray/dist/maps.py @@ -856,8 +856,8 @@ def get_redist_plan(self, other_dist): plan = [] for source_dd, dest_dd in source_dest_pairs: - intersection = indices_intersection(source_dd, dest_dd) - if intersection: + intersections = indices_intersections(source_dd, dest_dd) + if any(i for i in intersections): source_coords = tuple(dd['proc_grid_rank'] for dd in source_dd) source_rank = self.rank_from_coords[source_coords] dest_coords = tuple(dd['proc_grid_rank'] for dd in dest_dd) @@ -866,7 +866,7 @@ def get_redist_plan(self, other_dist): { 'source_rank': union_rank_from_source_rank[source_rank], 'dest_rank': union_rank_from_dest_rank[dest_rank], - 'indices': intersection, + 'indices': intersections, } ) @@ -877,17 +877,19 @@ def comm_union(self, *dists): return self.context._make_subcomm(all_targets) -def indices_intersection(source_dimdata, dest_dimdata): - if not (len(source_dimdata) == len(dest_dimdata) == 1): - raise NotImplementedError() +def indices_intersections(source_dimdata, dest_dimdata): - source_dimdict = source_dimdata[0] - dest_dimdict = dest_dimdata[0] + intersections = [] + for source_dimdict, dest_dimdict in zip(source_dimdata, dest_dimdata): + source_dimdict = source_dimdata[0] + dest_dimdict = dest_dimdata[0] - if not (source_dimdict['dist_type'] == dest_dimdict['dist_type'] == 'b'): - raise ValueError("Only 'b' dist_type supported") + if not (source_dimdict['dist_type'] == dest_dimdict['dist_type'] == 'b'): + raise ValueError("Only 'b' dist_type supported") - source_idxs = source_dimdict['start'], source_dimdict['stop'] - dest_idxs = dest_dimdict['start'], dest_dimdict['stop'] + source_idxs = source_dimdict['start'], source_dimdict['stop'] + dest_idxs = dest_dimdict['start'], dest_dimdict['stop'] - return tuple_intersection(source_idxs, dest_idxs) + intersections.append(tuple_intersection(source_idxs, dest_idxs)) + + return intersections diff --git a/distarray/dist/tests/test_maps.py b/distarray/dist/tests/test_maps.py index 5981fba6..471a90f8 100644 --- a/distarray/dist/tests/test_maps.py +++ b/distarray/dist/tests/test_maps.py @@ -310,8 +310,8 @@ def test_block_redistribution_one_to_one(self): targets=[0, 2]) plan = source_dist.get_redist_plan(dest_dist) expected = [ - {'source_rank': 1, 'dest_rank': 0, 'indices': (0, 20, 1)}, - {'source_rank': 3, 'dest_rank': 2, 'indices': (20, 40, 1)}, + {'source_rank': 1, 'dest_rank': 0, 'indices': [(0, 20, 1)]}, + {'source_rank': 3, 'dest_rank': 2, 'indices': [(20, 40, 1)]}, ] self.assertEqual(plan, expected) @@ -324,8 +324,8 @@ def test_block_redist_one_to_many(self): targets=[0,2]) plan = source_dist.get_redist_plan(dest_dist) expected = [ - {'source_rank': 1, 'dest_rank': 0, 'indices': (0, 20, 1)}, - {'source_rank': 1, 'dest_rank': 2, 'indices': (20, 40, 1)}, + {'source_rank': 1, 'dest_rank': 0, 'indices': [(0, 20, 1)]}, + {'source_rank': 1, 'dest_rank': 2, 'indices': [(20, 40, 1)]}, ] self.assertEqual(plan, expected) @@ -338,8 +338,8 @@ def test_block_redist_many_to_one(self): targets=[0]) plan = source_dist.get_redist_plan(dest_dist) expected = [ - {'source_rank': 1, 'dest_rank': 0, 'indices': (0, 20, 1)}, - {'source_rank': 2, 'dest_rank': 0, 'indices': (20, 40, 1)}, + {'source_rank': 1, 'dest_rank': 0, 'indices': [(0, 20, 1)]}, + {'source_rank': 2, 'dest_rank': 0, 'indices': [(20, 40, 1)]}, ] self.assertEqual(plan, expected) @@ -351,10 +351,20 @@ def test_block_redist_identity(self): self.assertSequenceEqual(plan_a, plan_b) for p, lshape in zip(plan_a, source_dist.localshapes()): self.assertEqual(p['source_rank'], p['dest_rank']) - start, stop, step = p['indices'] + start, stop, step = p['indices'][0] self.assertEqual(step, 1) self.assertEqual(stop - start, lshape[0]) + def test_block_redist_2D_identity(self): + source_dist = dest_dist = Distribution.from_shape(self.context, + (10, 10), ('b', 'b'), (1, 1), + targets=[0]) + plan = source_dist.get_redist_plan(dest_dist) + expected = [ + {'source_rank': 0, 'dest_rank': 0, 'indices': [(0, 10, 1), (0, 10, 1)]}, + ] + self.assertEqual(plan, expected) + class TestNoEmptyLocals(ContextTestCase): From 868d2bbdb85a082c755fa3d72fd7a3b6a71baa74 Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Thu, 19 Jun 2014 17:44:56 -0500 Subject: [PATCH 09/27] More tests and debugging of n-dimensional redistribution. --- distarray/dist/maps.py | 4 +--- distarray/dist/tests/test_maps.py | 34 +++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/distarray/dist/maps.py b/distarray/dist/maps.py index 2b06ec8a..b8e69d0e 100644 --- a/distarray/dist/maps.py +++ b/distarray/dist/maps.py @@ -857,7 +857,7 @@ def get_redist_plan(self, other_dist): plan = [] for source_dd, dest_dd in source_dest_pairs: intersections = indices_intersections(source_dd, dest_dd) - if any(i for i in intersections): + if all(i for i in intersections): source_coords = tuple(dd['proc_grid_rank'] for dd in source_dd) source_rank = self.rank_from_coords[source_coords] dest_coords = tuple(dd['proc_grid_rank'] for dd in dest_dd) @@ -881,8 +881,6 @@ def indices_intersections(source_dimdata, dest_dimdata): intersections = [] for source_dimdict, dest_dimdict in zip(source_dimdata, dest_dimdata): - source_dimdict = source_dimdata[0] - dest_dimdict = dest_dimdata[0] if not (source_dimdict['dist_type'] == dest_dimdict['dist_type'] == 'b'): raise ValueError("Only 'b' dist_type supported") diff --git a/distarray/dist/tests/test_maps.py b/distarray/dist/tests/test_maps.py index 471a90f8..eb376158 100644 --- a/distarray/dist/tests/test_maps.py +++ b/distarray/dist/tests/test_maps.py @@ -365,6 +365,40 @@ def test_block_redist_2D_identity(self): ] self.assertEqual(plan, expected) + def test_block_redist_2D_many_to_one(self): + source_dist = Distribution.from_shape(self.context, + (9, 9), ('b', 'b'), (2, 2), + targets=range(4)) + dest_dist = Distribution.from_shape(self.context, + (9, 9), ('b', 'b'), (1, 1), + targets=[2]) + plan = source_dist.get_redist_plan(dest_dist) + expected = [ + {'source_rank': 0, 'dest_rank': 2, 'indices': [(0, 5, 1), (0, 5, 1)]}, + {'source_rank': 1, 'dest_rank': 2, 'indices': [(0, 5, 1), (5, 9, 1)]}, + {'source_rank': 2, 'dest_rank': 2, 'indices': [(5, 9, 1), (0, 5, 1)]}, + {'source_rank': 3, 'dest_rank': 2, 'indices': [(5, 9, 1), (5, 9, 1)]}, + ] + for p, e in zip(plan, expected): + self.assertEqual(p, e) + + def test_block_redist_2D_one_to_many(self): + source_dist = Distribution.from_shape(self.context, + (9, 9), ('b', 'b'), (1, 1), + targets=[2]) + dest_dist = Distribution.from_shape(self.context, + (9, 9), ('b', 'b'), (2, 2), + targets=range(4)) + plan = source_dist.get_redist_plan(dest_dist) + expected = [ + {'source_rank': 2, 'dest_rank': 0, 'indices': [(0, 5, 1), (0, 5, 1)]}, + {'source_rank': 2, 'dest_rank': 1, 'indices': [(0, 5, 1), (5, 9, 1)]}, + {'source_rank': 2, 'dest_rank': 2, 'indices': [(5, 9, 1), (0, 5, 1)]}, + {'source_rank': 2, 'dest_rank': 3, 'indices': [(5, 9, 1), (5, 9, 1)]}, + ] + for p, e in zip(plan, expected): + self.assertEqual(p, e) + class TestNoEmptyLocals(ContextTestCase): From a027b7788f7bd168f73e820041000b1f2a24713b Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Fri, 20 Jun 2014 13:26:51 -0500 Subject: [PATCH 10/27] Test for local-only operation in redistribution. --- distarray/dist/distarray.py | 45 +++++++++++++++----------- distarray/dist/maps.py | 2 +- distarray/dist/tests/test_distarray.py | 33 +++++++++++++++++++ distarray/local/localarray.py | 30 +++++++++++++++++ 4 files changed, 91 insertions(+), 19 deletions(-) diff --git a/distarray/dist/distarray.py b/distarray/dist/distarray.py index 4b4536ea..8da89613 100644 --- a/distarray/dist/distarray.py +++ b/distarray/dist/distarray.py @@ -496,29 +496,38 @@ def local_view(larr, ddpr, dtype): def distribute_as(self, dist): plan = self.distribution.get_redist_plan(dist) - ubercomm = self.distribution.comm_union(dist) + ubercomm, all_targets = self.distribution.comm_union(dist) result = DistArray(dist, dtype=self.dtype) def _local_redistribute(comm, plan, la_from, la_to): - myrank = comm.Get_rank() - for dta in plan: - if dta['source_rank'] == myrank: - source_slices = [] - for indices in dta['indices']: - start, stop = indices[:2] - local_start, local_stop = map(la_from.local_from_global, (start, stop-1)) - source_slices.append(slice(local_start[0], local_stop[0] + 1)) - comm.Send(la_from.ndarray[source_slices], dest=dta['dest_rank']) - elif dta['dest_rank'] == myrank: - dest_slices = [] - for indices in dta['indices']: - start, stop = indices[:2] - local_start, local_stop = map(la_to.local_from_global, (start, stop-1)) - dest_slices.append(slice(local_start[0], local_stop[0] + 1)) - comm.Recv(la_to.ndarray[dest_slices], source=dta['source_rank']) + from distarray.local import redistribute + redistribute(comm, plan, la_from, la_to) + # myrank = comm.Get_rank() + # for dta in plan: + # if dta['source_rank'] == myrank: + # source_slices = [] + # for indices in dta['indices']: + # start, stop = indices[:2] + # local_start, local_stop = map(la_from.local_from_global, (start, stop-1)) + # source_slices.append(slice(local_start[0], local_stop[0] + 1)) + # sliced_ndarr = la_from.ndarray[source_slices] + # sliced_buffer = sliced_ndarr.ravel() + # # if not sliced_ndarr.flags.contiguous: + # # raise RuntimeError("slice %s not contiguous for Send() on rank %s" % (source_slices, myrank)) + # comm.Send(sliced_buffer, dest=dta['dest_rank']) + # elif dta['dest_rank'] == myrank: + # dest_slices = [] + # for indices in dta['indices']: + # start, stop = indices[:2] + # local_start, local_stop = map(la_to.local_from_global, (start, stop-1)) + # dest_slices.append(slice(local_start[0], local_stop[0] + 1)) + # sliced_ndarr = la_to.ndarray[dest_slices] + # if not sliced_ndarr.flags.contiguous: + # sliced_ndarr = sliced_ndarr.ravel() + # comm.Recv(sliced_ndarr, source=dta['source_rank']) self.context.apply(_local_redistribute, (ubercomm, plan, self.key, result.key), - targets=self.context.targets) + targets=all_targets) return result # Binary operators diff --git a/distarray/dist/maps.py b/distarray/dist/maps.py index b8e69d0e..42f54249 100644 --- a/distarray/dist/maps.py +++ b/distarray/dist/maps.py @@ -874,7 +874,7 @@ def get_redist_plan(self, other_dist): def comm_union(self, *dists): all_targets = sorted(reduce(set.union, [d.targets for d in dists], set(self.targets))) - return self.context._make_subcomm(all_targets) + return self.context._make_subcomm(all_targets), all_targets def indices_intersections(source_dimdata, dest_dimdata): diff --git a/distarray/dist/tests/test_distarray.py b/distarray/dist/tests/test_distarray.py index ed680e59..ea559110 100644 --- a/distarray/dist/tests/test_distarray.py +++ b/distarray/dist/tests/test_distarray.py @@ -1016,6 +1016,15 @@ def test_incompatible_dtype(self): class TestBlockRedistribution(ContextTestCase): + def test_redist_identity(self): + source_dist = dest_dist = Distribution.from_shape(self.context, + (10, 10), ('b', 'b'), (1, 1), + targets=[0]) + source_da = self.context.empty(source_dist, dtype=numpy.int32) + source_da.fill(-42) + dest_da = source_da.distribute_as(dest_dist) + assert_array_equal(source_da.tondarray(), dest_da.tondarray()) + def test_redist_1D(self): dist0 = Distribution.from_shape(self.context, (40,), ('b',), (2,), @@ -1047,5 +1056,29 @@ def test_redist_1D(self): # self.assertEqual(len(dist1.targets), 3) + # def _test_redist_2D(self): + # nrows, ncols = 7, 13 + # source_dist = Distribution.from_shape(self.context, + # (nrows, ncols), ('b', 'b'), (2, 2), + # targets=range(4)) + # dest_gdd = ( + # { + # 'dist_type': 'b', + # 'bounds': [0, nrows//3, nrows], + # }, + # { + # 'dist_type': 'b', + # 'bounds': [0, ncols//3, ncols], + # } + # ) + # import ipdb; ipdb.set_trace() + # dest_dist = Distribution(self.context, dest_gdd, targets=range(4)) + # source_da = self.context.empty(source_dist, dtype=numpy.int32) + # source_da.fill(-42) + + # dest_da = source_da.distribute_as(dest_dist) + # assert_array_equal(source_da.tondarray(), dest_da.tondarray()) + + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/distarray/local/localarray.py b/distarray/local/localarray.py index 9cc8e94b..6ad4b1ae 100644 --- a/distarray/local/localarray.py +++ b/distarray/local/localarray.py @@ -23,6 +23,36 @@ from distarray.local import format, maps from distarray.local.error import InvalidDimensionError, IncompatibleArrayError +def make_local_slices(local_arr, glb_indices): + local_slices = [] + for indices in glb_indices: + start, stop = indices[:2] + local_start, local_stop = map(local_arr.local_from_global, (start, stop-1)) + local_slices.append(slice(local_start[0], local_stop[0] + 1)) + return local_slices + +def redistribute(comm, plan, la_from, la_to): + myrank = comm.Get_rank() + for dta in plan: + if dta['source_rank'] == dta['dest_rank'] == myrank: + # simple local copy from `la_from` to `la_to` + slices = make_local_slices(la_from, dta['indices']) + la_to.ndarray[slices] = la_from.ndarray[slices] + elif dta['source_rank'] == myrank: + print(myrank, dta, "send") + source_slices = make_local_slices(la_from, dta['indices']) + sliced_ndarr = la_from.ndarray[source_slices] + # sliced_buffer = sliced_ndarr.ravel() + # if not sliced_ndarr.flags.contiguous: + # raise RuntimeError("slice %s not contiguous for Send() on rank %s" % (source_slices, myrank)) + comm.Send(sliced_ndarr, dest=dta['dest_rank']) + elif dta['dest_rank'] == myrank: + print(myrank, dta, "recv") + dest_slices = make_local_slices(la_to, dta['indices']) + sliced_ndarr = la_to.ndarray[dest_slices] + # if not sliced_ndarr.flags.contiguous: + # sliced_ndarr = sliced_ndarr.ravel() + comm.Recv(sliced_ndarr, source=dta['source_rank']) class GlobalIndex(object): """Object which provides access to global indexing on LocalArrays.""" From e8465199f90bdc0f9046ad7bec725d66d7332387 Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Fri, 20 Jun 2014 17:35:00 -0500 Subject: [PATCH 11/27] More general block redistribution working. --- distarray/dist/distarray.py | 23 ------------ distarray/dist/tests/test_distarray.py | 50 +++++++++++--------------- distarray/local/localarray.py | 27 ++++++-------- 3 files changed, 31 insertions(+), 69 deletions(-) diff --git a/distarray/dist/distarray.py b/distarray/dist/distarray.py index 8da89613..ffc5ef06 100644 --- a/distarray/dist/distarray.py +++ b/distarray/dist/distarray.py @@ -502,29 +502,6 @@ def distribute_as(self, dist): def _local_redistribute(comm, plan, la_from, la_to): from distarray.local import redistribute redistribute(comm, plan, la_from, la_to) - # myrank = comm.Get_rank() - # for dta in plan: - # if dta['source_rank'] == myrank: - # source_slices = [] - # for indices in dta['indices']: - # start, stop = indices[:2] - # local_start, local_stop = map(la_from.local_from_global, (start, stop-1)) - # source_slices.append(slice(local_start[0], local_stop[0] + 1)) - # sliced_ndarr = la_from.ndarray[source_slices] - # sliced_buffer = sliced_ndarr.ravel() - # # if not sliced_ndarr.flags.contiguous: - # # raise RuntimeError("slice %s not contiguous for Send() on rank %s" % (source_slices, myrank)) - # comm.Send(sliced_buffer, dest=dta['dest_rank']) - # elif dta['dest_rank'] == myrank: - # dest_slices = [] - # for indices in dta['indices']: - # start, stop = indices[:2] - # local_start, local_stop = map(la_to.local_from_global, (start, stop-1)) - # dest_slices.append(slice(local_start[0], local_stop[0] + 1)) - # sliced_ndarr = la_to.ndarray[dest_slices] - # if not sliced_ndarr.flags.contiguous: - # sliced_ndarr = sliced_ndarr.ravel() - # comm.Recv(sliced_ndarr, source=dta['source_rank']) self.context.apply(_local_redistribute, (ubercomm, plan, self.key, result.key), targets=all_targets) diff --git a/distarray/dist/tests/test_distarray.py b/distarray/dist/tests/test_distarray.py index ea559110..12c1b8bd 100644 --- a/distarray/dist/tests/test_distarray.py +++ b/distarray/dist/tests/test_distarray.py @@ -1049,35 +1049,27 @@ def test_redist_1D(self): self.assertSequenceEqual(dc.localshapes(), da.localshapes()) assert_array_equal(da.tondarray(), dc.tondarray()) - # dist0 = Distribution.from_shape(self.context, (40,), ('b',), (4,)) - # dist1 = Distribution.from_shape(self.context, (40,), ('b',), (3,), - # targets=self.context.targets[:3]) - # self.assertEqual(len(dist0.targets), 4) - # self.assertEqual(len(dist1.targets), 3) - - - # def _test_redist_2D(self): - # nrows, ncols = 7, 13 - # source_dist = Distribution.from_shape(self.context, - # (nrows, ncols), ('b', 'b'), (2, 2), - # targets=range(4)) - # dest_gdd = ( - # { - # 'dist_type': 'b', - # 'bounds': [0, nrows//3, nrows], - # }, - # { - # 'dist_type': 'b', - # 'bounds': [0, ncols//3, ncols], - # } - # ) - # import ipdb; ipdb.set_trace() - # dest_dist = Distribution(self.context, dest_gdd, targets=range(4)) - # source_da = self.context.empty(source_dist, dtype=numpy.int32) - # source_da.fill(-42) - - # dest_da = source_da.distribute_as(dest_dist) - # assert_array_equal(source_da.tondarray(), dest_da.tondarray()) + def test_redist_2D(self): + nrows, ncols = 7, 13 + source_dist = Distribution.from_shape(self.context, + (nrows, ncols), ('b', 'b'), (2, 2), + targets=range(4)) + dest_gdd = ( + { + 'dist_type': 'b', + 'bounds': [0, nrows//3, nrows], + }, + { + 'dist_type': 'b', + 'bounds': [0, ncols//3, ncols], + } + ) + dest_dist = Distribution(self.context, dest_gdd, targets=range(4)) + source_da = self.context.empty(source_dist, dtype=numpy.int32) + source_da.fill(-42) + + dest_da = source_da.distribute_as(dest_dist) + assert_array_equal(source_da.tondarray(), dest_da.tondarray()) if __name__ == '__main__': diff --git a/distarray/local/localarray.py b/distarray/local/localarray.py index 6ad4b1ae..fea5dcf5 100644 --- a/distarray/local/localarray.py +++ b/distarray/local/localarray.py @@ -24,35 +24,28 @@ from distarray.local.error import InvalidDimensionError, IncompatibleArrayError def make_local_slices(local_arr, glb_indices): - local_slices = [] - for indices in glb_indices: - start, stop = indices[:2] - local_start, local_stop = map(local_arr.local_from_global, (start, stop-1)) - local_slices.append(slice(local_start[0], local_stop[0] + 1)) - return local_slices + slices = tuple(slice(*inds) for inds in glb_indices) + return local_arr.local_from_global(*slices) def redistribute(comm, plan, la_from, la_to): myrank = comm.Get_rank() for dta in plan: if dta['source_rank'] == dta['dest_rank'] == myrank: # simple local copy from `la_from` to `la_to` - slices = make_local_slices(la_from, dta['indices']) - la_to.ndarray[slices] = la_from.ndarray[slices] + slices_from = make_local_slices(la_from, dta['indices']) + slices_to = make_local_slices(la_to, dta['indices']) + la_to.ndarray[slices_to] = la_from.ndarray[slices_from] elif dta['source_rank'] == myrank: - print(myrank, dta, "send") source_slices = make_local_slices(la_from, dta['indices']) sliced_ndarr = la_from.ndarray[source_slices] - # sliced_buffer = sliced_ndarr.ravel() - # if not sliced_ndarr.flags.contiguous: - # raise RuntimeError("slice %s not contiguous for Send() on rank %s" % (source_slices, myrank)) - comm.Send(sliced_ndarr, dest=dta['dest_rank']) + sliced_buffer = sliced_ndarr.ravel() + comm.Send(sliced_buffer, dest=dta['dest_rank']) elif dta['dest_rank'] == myrank: - print(myrank, dta, "recv") dest_slices = make_local_slices(la_to, dta['indices']) sliced_ndarr = la_to.ndarray[dest_slices] - # if not sliced_ndarr.flags.contiguous: - # sliced_ndarr = sliced_ndarr.ravel() - comm.Recv(sliced_ndarr, source=dta['source_rank']) + recv_buffer = np.empty_like(sliced_ndarr) + comm.Recv(recv_buffer, source=dta['source_rank']) + sliced_ndarr[...] = recv_buffer class GlobalIndex(object): """Object which provides access to global indexing on LocalArrays.""" From e318fb1f619afec28812606e289411be9f849217 Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Thu, 3 Jul 2014 15:20:26 -0500 Subject: [PATCH 12/27] Fix API changes after rebase. --- distarray/dist/tests/test_distarray.py | 25 ++++------- distarray/dist/tests/test_maps.py | 60 +++++++++++--------------- distarray/local/localarray.py | 2 +- 3 files changed, 35 insertions(+), 52 deletions(-) diff --git a/distarray/dist/tests/test_distarray.py b/distarray/dist/tests/test_distarray.py index 12c1b8bd..1104021e 100644 --- a/distarray/dist/tests/test_distarray.py +++ b/distarray/dist/tests/test_distarray.py @@ -1017,21 +1017,16 @@ def test_incompatible_dtype(self): class TestBlockRedistribution(ContextTestCase): def test_redist_identity(self): - source_dist = dest_dist = Distribution.from_shape(self.context, - (10, 10), ('b', 'b'), (1, 1), - targets=[0]) + source_dist = dest_dist = Distribution(self.context, (10, 10), + ('b', 'b'), (1, 1), targets=[0]) source_da = self.context.empty(source_dist, dtype=numpy.int32) source_da.fill(-42) dest_da = source_da.distribute_as(dest_dist) assert_array_equal(source_da.tondarray(), dest_da.tondarray()) def test_redist_1D(self): - dist0 = Distribution.from_shape(self.context, - (40,), ('b',), (2,), - targets=[1,3]) - dist1 = Distribution.from_shape(self.context, - (40,), ('b',), (2,), - targets=[0,2]) + dist0 = Distribution(self.context, (40,), ('b',), (2,), targets=[1,3]) + dist1 = Distribution(self.context, (40,), ('b',), (2,), targets=[0,2]) da = self.context.ones(dist0) db = da.distribute_as(dist1) @@ -1039,9 +1034,7 @@ def test_redist_1D(self): self.assertSequenceEqual(db.localshapes(), da.localshapes()) assert_array_equal(da.tondarray(), db.tondarray()) - dist2 = Distribution.from_shape(self.context, - (40,), ('b',), (2,), - targets=[0,2]) + dist2 = Distribution(self.context, (40,), ('b',), (2,), targets=[0,2]) da.fill(-42) dc = da.distribute_as(dist2) @@ -1051,9 +1044,8 @@ def test_redist_1D(self): def test_redist_2D(self): nrows, ncols = 7, 13 - source_dist = Distribution.from_shape(self.context, - (nrows, ncols), ('b', 'b'), (2, 2), - targets=range(4)) + source_dist = Distribution(self.context, (nrows, ncols), + ('b', 'b'), (2, 2), targets=range(4)) dest_gdd = ( { 'dist_type': 'b', @@ -1064,7 +1056,8 @@ def test_redist_2D(self): 'bounds': [0, ncols//3, ncols], } ) - dest_dist = Distribution(self.context, dest_gdd, targets=range(4)) + dest_dist = Distribution.from_global_dim_data(self.context, + dest_gdd, targets=range(4)) source_da = self.context.empty(source_dist, dtype=numpy.int32) source_da.fill(-42) diff --git a/distarray/dist/tests/test_maps.py b/distarray/dist/tests/test_maps.py index eb376158..9e2e3c89 100644 --- a/distarray/dist/tests/test_maps.py +++ b/distarray/dist/tests/test_maps.py @@ -302,12 +302,10 @@ def test_all_n_dist(self): class TestRedistribution(ContextTestCase): def test_block_redistribution_one_to_one(self): - source_dist = Distribution.from_shape(self.context, - (40,), ('b',), (2,), - targets=[1, 3]) - dest_dist = Distribution.from_shape(self.context, - (40,), ('b',), (2,), - targets=[0, 2]) + source_dist = Distribution(self.context, (40,), + ('b',), (2,), targets=[1, 3]) + dest_dist = Distribution(self.context, (40,), + ('b',), (2,), targets=[0, 2]) plan = source_dist.get_redist_plan(dest_dist) expected = [ {'source_rank': 1, 'dest_rank': 0, 'indices': [(0, 20, 1)]}, @@ -316,12 +314,10 @@ def test_block_redistribution_one_to_one(self): self.assertEqual(plan, expected) def test_block_redist_one_to_many(self): - source_dist = Distribution.from_shape(self.context, - (40,), ('b',), (1,), - targets=[1]) - dest_dist = Distribution.from_shape(self.context, - (40,), ('b',), (2,), - targets=[0,2]) + source_dist = Distribution(self.context, (40,), + ('b',), (1,), targets=[1]) + dest_dist = Distribution(self.context, (40,), + ('b',), (2,), targets=[0,2]) plan = source_dist.get_redist_plan(dest_dist) expected = [ {'source_rank': 1, 'dest_rank': 0, 'indices': [(0, 20, 1)]}, @@ -330,12 +326,10 @@ def test_block_redist_one_to_many(self): self.assertEqual(plan, expected) def test_block_redist_many_to_one(self): - source_dist = Distribution.from_shape(self.context, - (40,), ('b',), (2,), - targets=[1, 2]) - dest_dist = Distribution.from_shape(self.context, - (40,), ('b',), (1,), - targets=[0]) + source_dist = Distribution(self.context, (40,), + ('b',), (2,), targets=[1, 2]) + dest_dist = Distribution(self.context, (40,), + ('b',), (1,), targets=[0]) plan = source_dist.get_redist_plan(dest_dist) expected = [ {'source_rank': 1, 'dest_rank': 0, 'indices': [(0, 20, 1)]}, @@ -344,8 +338,8 @@ def test_block_redist_many_to_one(self): self.assertEqual(plan, expected) def test_block_redist_identity(self): - source_dist = dest_dist = Distribution.from_shape(self.context, - (40,), ('b',), (4,)) + source_dist = dest_dist = Distribution(self.context, (40,), + ('b',), (4,)) plan_a = source_dist.get_redist_plan(dest_dist) plan_b = dest_dist.get_redist_plan(source_dist) self.assertSequenceEqual(plan_a, plan_b) @@ -356,9 +350,9 @@ def test_block_redist_identity(self): self.assertEqual(stop - start, lshape[0]) def test_block_redist_2D_identity(self): - source_dist = dest_dist = Distribution.from_shape(self.context, - (10, 10), ('b', 'b'), (1, 1), - targets=[0]) + source_dist = dest_dist = Distribution(self.context, (10, 10), + ('b', 'b'), (1, 1), + targets=[0]) plan = source_dist.get_redist_plan(dest_dist) expected = [ {'source_rank': 0, 'dest_rank': 0, 'indices': [(0, 10, 1), (0, 10, 1)]}, @@ -366,12 +360,10 @@ def test_block_redist_2D_identity(self): self.assertEqual(plan, expected) def test_block_redist_2D_many_to_one(self): - source_dist = Distribution.from_shape(self.context, - (9, 9), ('b', 'b'), (2, 2), - targets=range(4)) - dest_dist = Distribution.from_shape(self.context, - (9, 9), ('b', 'b'), (1, 1), - targets=[2]) + source_dist = Distribution(self.context, (9, 9), + ('b', 'b'), (2, 2), targets=range(4)) + dest_dist = Distribution(self.context, (9, 9), + ('b', 'b'), (1, 1), targets=[2]) plan = source_dist.get_redist_plan(dest_dist) expected = [ {'source_rank': 0, 'dest_rank': 2, 'indices': [(0, 5, 1), (0, 5, 1)]}, @@ -383,12 +375,10 @@ def test_block_redist_2D_many_to_one(self): self.assertEqual(p, e) def test_block_redist_2D_one_to_many(self): - source_dist = Distribution.from_shape(self.context, - (9, 9), ('b', 'b'), (1, 1), - targets=[2]) - dest_dist = Distribution.from_shape(self.context, - (9, 9), ('b', 'b'), (2, 2), - targets=range(4)) + source_dist = Distribution(self.context, (9, 9), + ('b', 'b'), (1, 1), targets=[2]) + dest_dist = Distribution(self.context, (9, 9), + ('b', 'b'), (2, 2), targets=range(4)) plan = source_dist.get_redist_plan(dest_dist) expected = [ {'source_rank': 2, 'dest_rank': 0, 'indices': [(0, 5, 1), (0, 5, 1)]}, diff --git a/distarray/local/localarray.py b/distarray/local/localarray.py index fea5dcf5..3319c9cd 100644 --- a/distarray/local/localarray.py +++ b/distarray/local/localarray.py @@ -25,7 +25,7 @@ def make_local_slices(local_arr, glb_indices): slices = tuple(slice(*inds) for inds in glb_indices) - return local_arr.local_from_global(*slices) + return local_arr.local_from_global(slices) def redistribute(comm, plan, la_from, la_to): myrank = comm.Get_rank() From 244031f320af6b0a82d7c24bf4d1657307b1e71c Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Thu, 31 Jul 2014 16:05:44 -0500 Subject: [PATCH 13/27] Add default value back to `Context.apply()`. Necessary with redistribution, since we're calling functions on processes where an array might not be defined. --- distarray/globalapi/context.py | 22 ++++++++++++++-------- distarray/globalapi/distarray.py | 2 +- distarray/mpi_engine.py | 18 +++++++++++++----- 3 files changed, 28 insertions(+), 14 deletions(-) diff --git a/distarray/globalapi/context.py b/distarray/globalapi/context.py index 2809e142..c9b8fe23 100644 --- a/distarray/globalapi/context.py +++ b/distarray/globalapi/context.py @@ -740,7 +740,7 @@ def _execute(self, lines, targets): def _push(self, d, targets): return self.view.push(d, targets=targets, block=True) - def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False): + def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False, default=None): """ Analogous to IPython.parallel.view.apply_sync @@ -761,7 +761,7 @@ def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False): return a list of the results on the each engine. """ - def func_wrapper(func, apply_nonce, context_key, args, kwargs, autoproxyize): + def func_wrapper(func, apply_nonce, context_key, args, kwargs, autoproxyize, default): """ Function which calls the applied function after grabbing all the arguments on the engines that are passed in as names of the form @@ -794,14 +794,20 @@ def func_wrapper(func, apply_nonce, context_key, args, kwargs, autoproxyize): args = list(args) for i, a in enumerate(args): if isinstance(a, main.Proxy): - args[i] = a.dereference() + try: + args[i] = a.dereference() + except AttributeError: + args[i] = default args = tuple(args) # convert kwargs for k in kwargs.keys(): val = kwargs[k] if isinstance(val, main.Proxy): - kwargs[k] = val.dereference() + try: + kwargs[k] = val.dereference() + except AttributeError: + kwargs[k] = default result = func(*args, **kwargs) @@ -814,7 +820,7 @@ def func_wrapper(func, apply_nonce, context_key, args, kwargs, autoproxyize): args = () if args is None else args kwargs = {} if kwargs is None else kwargs apply_nonce = uid()[13:] - wrapped_args = (func, apply_nonce, self.context_key, args, kwargs, autoproxyize) + wrapped_args = (func, apply_nonce, self.context_key, args, kwargs, autoproxyize, default) targets = self.targets if targets is None else targets @@ -946,7 +952,7 @@ def _push(self, d, targets=None): msg = ('push', d) return self._send_msg(msg, targets=targets) - def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False): + def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False, default=None): """ Analogous to IPython.parallel.view.apply_sync @@ -984,10 +990,10 @@ def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False): func_data = (func_code, func_name, func_defaults, func_closure) - msg = ('func_call', func_data, args, kwargs, apply_metadata, autoproxyize) + msg = ('func_call', func_data, args, kwargs, apply_metadata, autoproxyize, default) else: - msg = ('builtin_call', func, args, kwargs, autoproxyize) + msg = ('builtin_call', func, args, kwargs, autoproxyize, default) self._send_msg(msg, targets=targets) return self._recv_msg(targets=targets) diff --git a/distarray/globalapi/distarray.py b/distarray/globalapi/distarray.py index 6801c248..cbf95818 100644 --- a/distarray/globalapi/distarray.py +++ b/distarray/globalapi/distarray.py @@ -501,7 +501,7 @@ def distribute_as(self, dist): def _local_redistribute(comm, plan, la_from, la_to): - from distarray.local import redistribute + from distarray.localapi import redistribute redistribute(comm, plan, la_from, la_to) self.context.apply(_local_redistribute, (ubercomm, plan, self.key, result.key), diff --git a/distarray/mpi_engine.py b/distarray/mpi_engine.py index d99c54b1..f36613c4 100644 --- a/distarray/mpi_engine.py +++ b/distarray/mpi_engine.py @@ -42,20 +42,26 @@ def __init__(self): break Engine.INTERCOMM.Free() - def arg_kwarg_proxy_converter(self, args, kwargs): + def arg_kwarg_proxy_converter(self, args, kwargs, default): module = import_module('__main__') # convert args args = list(args) for i, a in enumerate(args): if isinstance(a, module.Proxy): - args[i] = a.dereference() + try: + args[i] = a.dereference() + except AttributeError: + args[i] = default args = tuple(args) # convert kwargs for k in kwargs.keys(): val = kwargs[k] if isinstance(val, module.Proxy): - kwargs[k] = val.dereference() + try: + kwargs[k] = val.dereference() + except AttributeError: + kwargs[k] = default return args, kwargs @@ -99,11 +105,12 @@ def func_call(self, msg): kwargs = msg[3] nonce, context_key = msg[4] autoproxyize = msg[5] + default = msg[6] module = import_module('__main__') module.proxyize.set_state(nonce) - args, kwargs = self.arg_kwarg_proxy_converter(args, kwargs) + args, kwargs = self.arg_kwarg_proxy_converter(args, kwargs, default) new_func_globals = module.__dict__ # add proper proxyize, context_key new_func_globals.update({'proxyize': module.proxyize, @@ -152,8 +159,9 @@ def builtin_call(self, msg): func = msg[1] args = msg[2] kwargs = msg[3] + default = msg[4] - args, kwargs = self.arg_kwarg_proxy_converter(args, kwargs) + args, kwargs = self.arg_kwarg_proxy_converter(args, kwargs, default) res = func(*args, **kwargs) Engine.INTERCOMM.send(res, dest=self.client_rank) From 4f341b793b5ed929f5fd85a6861d8c7797f5f84a Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Wed, 6 Aug 2014 13:01:54 -0500 Subject: [PATCH 14/27] WIP: on our way towards redistribution working. --- distarray/globalapi/distarray.py | 13 ++- distarray/globalapi/maps.py | 81 ++++++++++++--- distarray/globalapi/tests/test_distarray.py | 103 ++++++++++++++++++-- distarray/localapi/localarray.py | 76 +++++++++++++++ distarray/localapi/maps.py | 33 +++++-- 5 files changed, 274 insertions(+), 32 deletions(-) diff --git a/distarray/globalapi/distarray.py b/distarray/globalapi/distarray.py index cbf95818..2a0c3882 100644 --- a/distarray/globalapi/distarray.py +++ b/distarray/globalapi/distarray.py @@ -494,16 +494,23 @@ def local_view(larr, ddpr, dtype): dtype=dtype) def distribute_as(self, dist): - plan = self.distribution.get_redist_plan(dist) ubercomm, all_targets = self.distribution.comm_union(dist) result = DistArray(dist, dtype=self.dtype) - - def _local_redistribute(comm, plan, la_from, la_to): + def _local_redistribute_same_shape(comm, plan, la_from, la_to): from distarray.localapi import redistribute redistribute(comm, plan, la_from, la_to) + def _local_redistribute_general(comm, plan, la_from, la_to): + from distarray.localapi import redistribute_general + redistribute_general(comm, plan, la_from, la_to) + + if self.distribution.shape == dist.shape: + _local_redistribute = _local_redistribute_same_shape + else: + _local_redistribute = _local_redistribute_general + self.context.apply(_local_redistribute, (ubercomm, plan, self.key, result.key), targets=all_targets) return result diff --git a/distarray/globalapi/maps.py b/distarray/globalapi/maps.py index 3fa2a20f..b38b1376 100644 --- a/distarray/globalapi/maps.py +++ b/distarray/globalapi/maps.py @@ -551,7 +551,8 @@ def from_maps(cls, context, maps, targets=None): self = super(Distribution, cls).__new__(cls) self.context = context self.targets = sorted(targets or context.targets) - self.comm = self.context.make_subcomm(self.targets) + self._comm = None + # self.context.make_subcomm(self.targets) self.maps = maps self.shape = tuple(m.size for m in self.maps) self.ndim = len(self.maps) @@ -758,6 +759,12 @@ def __getitem__(self, idx): def __len__(self): return len(self.maps) + @property + def comm(self): + if self._comm is None: + self._comm = self.context.make_subcomm(self.targets) + return self._comm + @property def has_precise_index(self): """ @@ -869,6 +876,28 @@ def view(self, new_dimsize=None): def localshapes(self): return shapes_from_dim_data_per_rank(self.get_dim_data_per_rank()) + + @staticmethod + def _redist_intersection_same_shape(source_dimdata, dest_dimdata): + + intersections = [] + for source_dimdict, dest_dimdict in zip(source_dimdata, dest_dimdata): + + if not (source_dimdict['dist_type'] == dest_dimdict['dist_type'] == 'b'): + raise ValueError("Only 'b' dist_type supported") + + source_idxs = source_dimdict['start'], source_dimdict['stop'] + dest_idxs = dest_dimdict['start'], dest_dimdict['stop'] + + intersections.append(tuple_intersection(source_idxs, dest_idxs)) + + return intersections + + @staticmethod + def _redist_intersection_reshape(source_dimdata, dest_dimdata): + source_flat = global_flat_indices(source_dimdata) + dest_flat = global_flat_indices(dest_dimdata) + return global_flat_indices_intersection(source_flat, dest_flat) def get_redist_plan(self, other_dist): # Get all targets @@ -887,10 +916,15 @@ def get_redist_plan(self, other_dist): dest_ddpr = other_dist.get_dim_data_per_rank() source_dest_pairs = product(source_ddpr, dest_ddpr) + if self.shape == other_dist.shape: + _intersection = Distribution._redist_intersection_same_shape + else: + _intersection = Distribution._redist_intersection_reshape + plan = [] for source_dd, dest_dd in source_dest_pairs: - intersections = indices_intersections(source_dd, dest_dd) - if all(i for i in intersections): + intersections = _intersection(source_dd, dest_dd) + if intersections and all(i for i in intersections): source_coords = tuple(dd['proc_grid_rank'] for dd in source_dd) source_rank = self.rank_from_coords[source_coords] dest_coords = tuple(dd['proc_grid_rank'] for dd in dest_dd) @@ -910,17 +944,40 @@ def comm_union(self, *dists): return self.context.make_subcomm(all_targets), all_targets -def indices_intersections(source_dimdata, dest_dimdata): +def global_flat_indices(dim_data): + # TODO: FIXME: can be optimized when the last dimension is 'n'. + + glb_shape = tuple(dd['size'] for dd in dim_data) + + def accum(start, next): + return tuple(s * next for s in start) + (next,) + + glb_strides = reduce(accum, tuple(glb_shape[1:]) + (1,), ()) + + ranges = [range(dd['start'], dd['stop']) for dd in dim_data[:-1]] + start_ranges = ranges + [[dim_data[-1].get('start', 0)]] + stop_ranges = ranges + [[dim_data[-1].get('stop', dim_data[-1]['size'])]] + + def flatten(idx): + return sum(a * b for (a, b) in zip(idx, glb_strides)) - intersections = [] - for source_dimdict, dest_dimdict in zip(source_dimdata, dest_dimdata): + starts = map(flatten, product(*start_ranges)) + stops = map(flatten, product(*stop_ranges)) - if not (source_dimdict['dist_type'] == dest_dimdict['dist_type'] == 'b'): - raise ValueError("Only 'b' dist_type supported") + intervals = zip(starts, stops) - source_idxs = source_dimdict['start'], source_dimdict['stop'] - dest_idxs = dest_dimdict['start'], dest_dimdict['stop'] + def squeeze(accum, next): + last = accum[-1] + if not last: + return [next] + elif last[-1] != next[0]: + return accum + [next] + elif last[-1] == next[0]: + return accum[:-1] + [(last[0], next[-1])] - intersections.append(tuple_intersection(source_idxs, dest_idxs)) + intervals = reduce(squeeze, intervals, [[]]) + return intervals - return intersections +def global_flat_indices_intersection(gfis0, gfis1): + intersections = filter(None, [tuple_intersection(a, b) for (a, b) in product(gfis0, gfis1)]) + return [i[:2] for i in intersections] diff --git a/distarray/globalapi/tests/test_distarray.py b/distarray/globalapi/tests/test_distarray.py index e79d9af7..359e886d 100644 --- a/distarray/globalapi/tests/test_distarray.py +++ b/distarray/globalapi/tests/test_distarray.py @@ -20,7 +20,7 @@ from distarray.externals.six.moves import range from distarray.testing import DefaultContextTestCase from distarray.globalapi.distarray import DistArray -from distarray.globalapi.maps import Distribution +from distarray.globalapi.maps import Distribution, global_flat_indices class TestDistArray(DefaultContextTestCase): @@ -1048,13 +1048,13 @@ def test_redist_2D(self): source_dist = Distribution(self.context, (nrows, ncols), ('b', 'b'), (2, 2), targets=range(4)) dest_gdd = ( - { - 'dist_type': 'b', - 'bounds': [0, nrows//3, nrows], + { + 'dist_type': 'b', + 'bounds': [0, nrows//3, nrows], }, - { - 'dist_type': 'b', - 'bounds': [0, ncols//3, ncols], + { + 'dist_type': 'b', + 'bounds': [0, ncols//3, ncols], } ) dest_dist = Distribution.from_global_dim_data(self.context, @@ -1065,6 +1065,95 @@ def test_redist_2D(self): dest_da = source_da.distribute_as(dest_dist) assert_array_equal(source_da.tondarray(), dest_da.tondarray()) + def test_redist_reshape_same_target(self): + dist0 = Distribution(self.context, (40,), ('b',), (1,), targets=[1]) + dist1 = Distribution(self.context, (5, 8), ('b', 'n'), (1,), targets=[1]) + + da_src = self.context.empty(dist0) + da_src.fill(-10) + da_dest = da_src.distribute_as(dist1) + + expected = numpy.empty((5, 8)) + expected.fill(-10) + + assert_array_equal(da_dest.tondarray(), expected) + + def test_redist_reshape_diff_target(self): + dist0 = Distribution(self.context, (40,), ('b',), (1,), targets=[0]) + dist1 = Distribution(self.context, (5, 8), ('b', 'n'), (1,), targets=[1]) + + da_src = self.context.empty(dist0) + da_src.fill(-10) + da_dest = da_src.distribute_as(dist1) + + expected = numpy.empty((5, 8)) + expected.fill(-10) + + assert_array_equal(da_dest.tondarray(), expected) + + def test_redist_reshape_split_targets(self): + dist0 = Distribution(self.context, (40,), ('b',), (1,), targets=[0]) + dist1 = Distribution(self.context, (5, 8), ('b', 'n'), (2,), targets=[0,1]) + + da_src = self.context.empty(dist0) + da_src.fill(-10) + da_dest = da_src.distribute_as(dist1) + + expected = numpy.empty((5, 8)) + expected.fill(-10) + + assert_array_equal(da_dest.tondarray(), expected) + + def test_redist_reshape_disjoint_targets(self): + dist0 = Distribution(self.context, (40,), ('b',), (2,), targets=[1,3]) + dist1 = Distribution(self.context, (5, 8), ('b', 'n'), (2,), targets=[0,2]) + + da_src = self.context.empty(dist0) + da_src.fill(-10) + da_dest = da_src.distribute_as(dist1) + + expected = numpy.empty((5, 8)) + expected.fill(-10) + + assert_array_equal(da_dest.tondarray(), expected) + + # def test_redist_reshape_big(self): + # three_dee_shape = (13, 17, 23) + # two_dee_shape = (13 * 23, 17) + # NN = 13 * 17 * 23 + # dist0 = Distribution(self.context, three_dee_shape, ('b', 'b', 'n'), (2, 2)) + # dist1 = Distribution(self.context, two_dee_shape, ('b', 'b'), (2, 2)) + + # da_src = self.context.empty(dist0) + # da_src.fill(47) + + # da_dest = da_src.distribute_as(dist1) + + # self.assertTrue(da_dest.distribution.is_compatible(dist1)) + + # expected = numpy.empty(two_dee_shape) + # expected.fill(47) + + # assert_array_equal(da_dest.tondarray(), expected) + + def test_global_flat_indices(self): + dist0 = Distribution(self.context, (40,), ('b',), (2,), targets=[1,3]) + + self.assertSequenceEqual([[(0, 20)], [(20, 40)]], + [global_flat_indices(ddpr) for ddpr in dist0.get_dim_data_per_rank()]) + + dist1 = Distribution(self.context, (5, 8), ('b', 'n'), (2,), targets=[0,2]) + + self.assertSequenceEqual([[(0, 24)], [(24, 40)]], + [global_flat_indices(ddpr) for ddpr in dist1.get_dim_data_per_rank()]) + + dist2 = Distribution(self.context, (5, 8), ('b', 'b'), (2, 2), targets=[0,1,2,3]) + + self.assertSequenceEqual([[(0, 4), (8, 12), (16, 20)], + [(4, 8), (12, 16), (20, 24)], + [(24, 28), (32, 36)], + [(28, 32), (36, 40)]], + [global_flat_indices(ddpr) for ddpr in dist2.get_dim_data_per_rank()]) if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/distarray/localapi/localarray.py b/distarray/localapi/localarray.py index 0e4a2b85..806e0850 100644 --- a/distarray/localapi/localarray.py +++ b/distarray/localapi/localarray.py @@ -6,6 +6,9 @@ from __future__ import print_function, division +def mpi_print(*args, **kwargs): + from distarray.mpionly_utils import get_world_rank + print("[%d]" % get_world_rank(), *args, **kwargs) # --------------------------------------------------------------------------- # Imports @@ -27,6 +30,79 @@ def make_local_slices(local_arr, glb_indices): slices = tuple(slice(*inds) for inds in glb_indices) return local_arr.local_from_global(slices) +def ndim_from_flat(flat, strides): + res = [] + for st in strides: + res.append(flat // st) + flat %= st + return tuple(res) + +def flat_from_ndim(ndim, strides): + return sum(i * s for (i, s) in zip(ndim, strides)) + +def _accum(start, next): + return tuple(s * next for s in start) + (next,) + +def _get_strides(shape): + return reduce(_accum, tuple(shape[1:]) + (1,), ()) + +def _transform(local_distribution, glb_flat): + glb_strides = _get_strides(local_distribution.global_shape) + local_strides = _get_strides(local_distribution.local_shape) + glb_ndim_inds = ndim_from_flat(glb_flat, glb_strides) + local_ind = local_distribution.local_from_global(glb_ndim_inds, check_bounds=False) + local_flat = local_distribution.local_flat_from_local(local_ind) + return local_flat + +def _squeeze(accum, next): + last = accum[-1] + if not last: + return [next] + elif last[-1] != next[0]: + return accum + [next] + elif last[-1] == next[0]: + return accum[:-1] + [(last[0], next[-1])] + +def _condense(intervals): + intervals = reduce(_squeeze, intervals, [[]]) + return intervals + +def _massage_indices(local_distribution, glb_intervals): + local_flat_slices = [(_transform(local_distribution, i[0]), + _transform(local_distribution, i[1])) + for i in glb_intervals] + return _condense(local_flat_slices) + +def _mpi_dtype_from_intervals(npdtype, local_intervals): + blocklengths = [stop-start for (start, stop) in local_intervals] + displacements = [start for (start, _) in local_intervals] + mpidtype = MPI.__TypeDict__[np.sctype2char(npdtype)] + newtype = mpidtype.Create_indexed(blocklengths, displacements) + newtype.Commit() + return newtype + +def redistribute_general(comm, plan, la_from, la_to): + myrank = comm.Get_rank() + for dta in plan: + if dta['source_rank'] == dta['dest_rank'] == myrank: + # mpi_print("sending from source %d to dest %d" % (dta['source_rank'], dta['dest_rank'])) + from_intervals = _massage_indices(la_from.distribution, dta['indices']) + to_intervals = _massage_indices(la_to.distribution, dta['indices']) + from_dtype = _mpi_dtype_from_intervals(la_from.dtype, from_intervals) + to_dtype = _mpi_dtype_from_intervals(la_to.dtype, to_intervals) + comm.Sendrecv(sendbuf=[la_from.ndarray, 1, from_dtype], dest=myrank, + recvbuf=[la_to.ndarray, 1, to_dtype], source=myrank) + elif dta['source_rank'] == myrank: + # mpi_print("sending from source %d to dest %d" % (dta['source_rank'], dta['dest_rank'])) + from_intervals = _massage_indices(la_from.distribution, dta['indices']) + from_dtype = _mpi_dtype_from_intervals(la_from.dtype, from_intervals) + comm.Send([la_from.ndarray, 1, from_dtype], dest=dta['dest_rank']) + elif dta['dest_rank'] == myrank: + # mpi_print("receiving from source %d to dest %d" % (dta['source_rank'], dta['dest_rank'])) + to_intervals = _massage_indices(la_to.distribution, dta['indices']) + to_dtype = _mpi_dtype_from_intervals(la_to.dtype, to_intervals) + comm.Recv([la_to.ndarray, 1, to_dtype], source=dta['source_rank']) + def redistribute(comm, plan, la_from, la_to): myrank = comm.Get_rank() for dta in plan: diff --git a/distarray/localapi/maps.py b/distarray/localapi/maps.py index 8e7f8eb7..0fcb7809 100644 --- a/distarray/localapi/maps.py +++ b/distarray/localapi/maps.py @@ -144,13 +144,14 @@ def coords_from_rank(self, rank): def rank_from_coords(self, coords): return self.comm.Get_cart_rank(coords) - def local_from_global(self, global_ind): + def local_from_global(self, global_ind, check_bounds=True): """ Given `global_ind` indices, translate into local indices.""" - _, idxs = sanitize_indices(global_ind, self.ndim, self.global_shape) + if check_bounds: + _, idxs = sanitize_indices(global_ind, self.ndim, self.global_shape) local_idxs = [] for m, idx in zip(self._maps, global_ind): if isinstance(idx, Integral): - local_idxs.append(m.local_from_global_index(idx)) + local_idxs.append(m.local_from_global_index(idx, check_bounds=check_bounds)) elif isinstance(idx, slice): local_idxs.append(m.local_from_global_slice(idx)) else: @@ -169,6 +170,12 @@ def global_from_local(self, local_ind): raise TypeError("Index must be Integral or slice.") return tuple(global_idxs) + def local_flat_from_local(self, local_ind): + local_strides = _get_strides(self.local_shape) + def flatten(idx, strides): + return sum(a * b for (a, b) in zip(idx, strides)) + return flatten(local_ind, local_strides) + def map_from_dim_dict(dd): """ Factory function that returns a 1D map for a given dimension @@ -204,6 +211,12 @@ def map_from_dim_dict(dd): raise ValueError("Unsupported dist_type of %r" % dist_type) +def _accum(start, next): + return tuple(s * next for s in start) + (next,) + +def _get_strides(shape): + return reduce(_accum, tuple(shape[1:]) + (1,), ()) + class MapBase(object): """ Base class for all one dimensional Map classes. @@ -225,8 +238,8 @@ def __init__(self, global_size, grid_size, grid_rank, start, stop): self.grid_size = grid_size self.grid_rank = grid_rank - def local_from_global_index(self, gidx): - if gidx < self.start or gidx >= self.stop: + def local_from_global_index(self, gidx, check_bounds=True): + if check_bounds and (gidx < self.start or gidx >= self.stop): raise IndexError("Global index %s out of bounds" % gidx) return gidx - self.start @@ -303,8 +316,8 @@ def __init__(self, global_size, grid_size, grid_rank, start): self.local_size = (global_size - 1 - grid_rank) // grid_size + 1 self.global_size = global_size - def local_from_global_index(self, gidx): - if (gidx - self.start) % self.grid_size: + def local_from_global_index(self, gidx, check_bounds=True): + if check_bounds and ((gidx - self.start) % self.grid_size): raise IndexError("Global index %s out of bounds" % gidx) return (gidx - self.start) // self.grid_size @@ -360,9 +373,9 @@ def __init__(self, global_size, grid_size, grid_rank, start, block_size): self.local_size = local_nblocks * block_size + local_partial self.global_size = global_size - def local_from_global_index(self, gidx): + def local_from_global_index(self, gidx, check_bounds=True): global_block, offset = divmod(gidx, self.block_size) - if (global_block - self.start_block) % self.grid_size: + if check_bounds and ((global_block - self.start_block) % self.grid_size): raise IndexError("Global index %s out of bounds" % gidx) return self.block_size * ((global_block - self.start_block) // self.grid_size) + offset @@ -422,7 +435,7 @@ def __init__(self, global_size, grid_size, grid_rank, indices): local_indices = range(self.local_size) self._local_index = dict(zip(self.indices, local_indices)) - def local_from_global_index(self, gidx): + def local_from_global_index(self, gidx, check_bounds=True): try: lidx = self._local_index[gidx] except KeyError: From 14af63a9f477fdd94117dd2aeeef7e56ac9f16d0 Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Wed, 6 Aug 2014 13:05:34 -0500 Subject: [PATCH 15/27] Uncomment test, skip it instead. --- distarray/globalapi/tests/test_distarray.py | 29 +++++++++++---------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/distarray/globalapi/tests/test_distarray.py b/distarray/globalapi/tests/test_distarray.py index 359e886d..2d2c49ec 100644 --- a/distarray/globalapi/tests/test_distarray.py +++ b/distarray/globalapi/tests/test_distarray.py @@ -1117,24 +1117,25 @@ def test_redist_reshape_disjoint_targets(self): assert_array_equal(da_dest.tondarray(), expected) - # def test_redist_reshape_big(self): - # three_dee_shape = (13, 17, 23) - # two_dee_shape = (13 * 23, 17) - # NN = 13 * 17 * 23 - # dist0 = Distribution(self.context, three_dee_shape, ('b', 'b', 'n'), (2, 2)) - # dist1 = Distribution(self.context, two_dee_shape, ('b', 'b'), (2, 2)) - - # da_src = self.context.empty(dist0) - # da_src.fill(47) + @unittest.skip("Not working yet, and hangs...") + def test_redist_reshape_big(self): + three_dee_shape = (13, 17, 23) + two_dee_shape = (13 * 23, 17) + NN = 13 * 17 * 23 + dist0 = Distribution(self.context, three_dee_shape, ('b', 'b', 'n'), (2, 2)) + dist1 = Distribution(self.context, two_dee_shape, ('b', 'b'), (2, 2)) + + da_src = self.context.empty(dist0) + da_src.fill(47) - # da_dest = da_src.distribute_as(dist1) + da_dest = da_src.distribute_as(dist1) - # self.assertTrue(da_dest.distribution.is_compatible(dist1)) + self.assertTrue(da_dest.distribution.is_compatible(dist1)) - # expected = numpy.empty(two_dee_shape) - # expected.fill(47) + expected = numpy.empty(two_dee_shape) + expected.fill(47) - # assert_array_equal(da_dest.tondarray(), expected) + assert_array_equal(da_dest.tondarray(), expected) def test_global_flat_indices(self): dist0 = Distribution(self.context, (40,), ('b',), (2,), targets=[1,3]) From 6470fd44a48c254d02d99192e32a9f97dee57540 Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Wed, 6 Aug 2014 16:09:35 -0500 Subject: [PATCH 16/27] Fix `reduce` imports for Py3. --- distarray/localapi/localarray.py | 2 +- distarray/localapi/maps.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/distarray/localapi/localarray.py b/distarray/localapi/localarray.py index 806e0850..0d45cedd 100644 --- a/distarray/localapi/localarray.py +++ b/distarray/localapi/localarray.py @@ -18,7 +18,7 @@ def mpi_print(*args, **kwargs): import numpy as np from distarray.externals import six -from distarray.externals.six.moves import zip +from distarray.externals.six.moves import zip, reduce from distarray.metadata_utils import sanitize_indices diff --git a/distarray/localapi/maps.py b/distarray/localapi/maps.py index 0fcb7809..8b19fe5c 100644 --- a/distarray/localapi/maps.py +++ b/distarray/localapi/maps.py @@ -25,7 +25,7 @@ from numbers import Integral import numpy as np -from distarray.externals.six.moves import range, zip +from distarray.externals.six.moves import range, zip, reduce from distarray.localapi import construct from distarray.metadata_utils import (make_grid_shape, normalize_grid_shape, From d3ef7ae34c4f042b2593c5c76b3915695d4b7d1f Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Wed, 6 Aug 2014 17:55:07 -0500 Subject: [PATCH 17/27] More general redistribution. --- Makefile | 8 ++++---- distarray/globalapi/tests/test_distarray.py | 9 ++++----- distarray/localapi/localarray.py | 5 +++-- distarray/localapi/maps.py | 21 ++++++++++----------- 4 files changed, 21 insertions(+), 22 deletions(-) diff --git a/Makefile b/Makefile index 4068a11d..dc598cc0 100644 --- a/Makefile +++ b/Makefile @@ -60,9 +60,9 @@ install: # Testing-related targets. # ---------------------------------------------------------------------------- -test_client: +test_ipython: ${PYTHON} -m unittest discover -c -.PHONY: test_client +.PHONY: test_ipython test_client_with_coverage: ${COVERAGE} run -pm unittest discover -cv @@ -95,10 +95,10 @@ test_mpi_with_coverage: ${MPI_ONLY_LAUNCH_TEST} .PHONY: test_mpi_with_coverage -test: test_client test_engines test_mpi +test: test_ipython test_mpi test_engines .PHONY: test -test_with_coverage: test_client_with_coverage test_engines_with_coverage test_mpi_with_coverage +test_with_coverage: test_client_with_coverage test_mpi_with_coverage test_engines_with_coverage .PHONY: test_with_coverage coverage_report: diff --git a/distarray/globalapi/tests/test_distarray.py b/distarray/globalapi/tests/test_distarray.py index 2d2c49ec..1f906e0e 100644 --- a/distarray/globalapi/tests/test_distarray.py +++ b/distarray/globalapi/tests/test_distarray.py @@ -1117,11 +1117,10 @@ def test_redist_reshape_disjoint_targets(self): assert_array_equal(da_dest.tondarray(), expected) - @unittest.skip("Not working yet, and hangs...") - def test_redist_reshape_big(self): - three_dee_shape = (13, 17, 23) - two_dee_shape = (13 * 23, 17) - NN = 13 * 17 * 23 + def test_redist_reshape_three_dee(self): + three_dee_shape = (3, 5, 2) + two_dee_shape = (3 * 2, 5) + NN = 3 * 5 * 2 dist0 = Distribution(self.context, three_dee_shape, ('b', 'b', 'n'), (2, 2)) dist1 = Distribution(self.context, two_dee_shape, ('b', 'b'), (2, 2)) diff --git a/distarray/localapi/localarray.py b/distarray/localapi/localarray.py index 0d45cedd..79bfefae 100644 --- a/distarray/localapi/localarray.py +++ b/distarray/localapi/localarray.py @@ -50,7 +50,7 @@ def _transform(local_distribution, glb_flat): glb_strides = _get_strides(local_distribution.global_shape) local_strides = _get_strides(local_distribution.local_shape) glb_ndim_inds = ndim_from_flat(glb_flat, glb_strides) - local_ind = local_distribution.local_from_global(glb_ndim_inds, check_bounds=False) + local_ind = local_distribution.local_from_global(glb_ndim_inds) local_flat = local_distribution.local_flat_from_local(local_ind) return local_flat @@ -68,8 +68,9 @@ def _condense(intervals): return intervals def _massage_indices(local_distribution, glb_intervals): + # XXX: TODO: document why we do `-1)+1` below. local_flat_slices = [(_transform(local_distribution, i[0]), - _transform(local_distribution, i[1])) + _transform(local_distribution, i[1]-1)+1) for i in glb_intervals] return _condense(local_flat_slices) diff --git a/distarray/localapi/maps.py b/distarray/localapi/maps.py index 8b19fe5c..093800af 100644 --- a/distarray/localapi/maps.py +++ b/distarray/localapi/maps.py @@ -144,14 +144,13 @@ def coords_from_rank(self, rank): def rank_from_coords(self, coords): return self.comm.Get_cart_rank(coords) - def local_from_global(self, global_ind, check_bounds=True): + def local_from_global(self, global_ind): """ Given `global_ind` indices, translate into local indices.""" - if check_bounds: - _, idxs = sanitize_indices(global_ind, self.ndim, self.global_shape) + _, idxs = sanitize_indices(global_ind, self.ndim, self.global_shape) local_idxs = [] for m, idx in zip(self._maps, global_ind): if isinstance(idx, Integral): - local_idxs.append(m.local_from_global_index(idx, check_bounds=check_bounds)) + local_idxs.append(m.local_from_global_index(idx)) elif isinstance(idx, slice): local_idxs.append(m.local_from_global_slice(idx)) else: @@ -238,8 +237,8 @@ def __init__(self, global_size, grid_size, grid_rank, start, stop): self.grid_size = grid_size self.grid_rank = grid_rank - def local_from_global_index(self, gidx, check_bounds=True): - if check_bounds and (gidx < self.start or gidx >= self.stop): + def local_from_global_index(self, gidx): + if gidx < self.start or gidx >= self.stop: raise IndexError("Global index %s out of bounds" % gidx) return gidx - self.start @@ -316,8 +315,8 @@ def __init__(self, global_size, grid_size, grid_rank, start): self.local_size = (global_size - 1 - grid_rank) // grid_size + 1 self.global_size = global_size - def local_from_global_index(self, gidx, check_bounds=True): - if check_bounds and ((gidx - self.start) % self.grid_size): + def local_from_global_index(self, gidx): + if (gidx - self.start) % self.grid_size: raise IndexError("Global index %s out of bounds" % gidx) return (gidx - self.start) // self.grid_size @@ -373,9 +372,9 @@ def __init__(self, global_size, grid_size, grid_rank, start, block_size): self.local_size = local_nblocks * block_size + local_partial self.global_size = global_size - def local_from_global_index(self, gidx, check_bounds=True): + def local_from_global_index(self, gidx): global_block, offset = divmod(gidx, self.block_size) - if check_bounds and ((global_block - self.start_block) % self.grid_size): + if (global_block - self.start_block) % self.grid_size: raise IndexError("Global index %s out of bounds" % gidx) return self.block_size * ((global_block - self.start_block) // self.grid_size) + offset @@ -435,7 +434,7 @@ def __init__(self, global_size, grid_size, grid_rank, indices): local_indices = range(self.local_size) self._local_index = dict(zip(self.indices, local_indices)) - def local_from_global_index(self, gidx, check_bounds=True): + def local_from_global_index(self, gidx): try: lidx = self._local_index[gidx] except KeyError: From a67c843b9a12bd537c3c0946925fa4d456bcbe9f Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Wed, 6 Aug 2014 18:02:21 -0500 Subject: [PATCH 18/27] Cleanup a bit. --- distarray/localapi/localarray.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/distarray/localapi/localarray.py b/distarray/localapi/localarray.py index 79bfefae..fda4d38f 100644 --- a/distarray/localapi/localarray.py +++ b/distarray/localapi/localarray.py @@ -74,10 +74,11 @@ def _massage_indices(local_distribution, glb_intervals): for i in glb_intervals] return _condense(local_flat_slices) -def _mpi_dtype_from_intervals(npdtype, local_intervals): +def _mpi_dtype_from_intervals(larr, glb_intervals): + local_intervals = _massage_indices(larr.distribution, glb_intervals) blocklengths = [stop-start for (start, stop) in local_intervals] displacements = [start for (start, _) in local_intervals] - mpidtype = MPI.__TypeDict__[np.sctype2char(npdtype)] + mpidtype = MPI.__TypeDict__[np.sctype2char(larr.dtype)] newtype = mpidtype.Create_indexed(blocklengths, displacements) newtype.Commit() return newtype @@ -87,21 +88,17 @@ def redistribute_general(comm, plan, la_from, la_to): for dta in plan: if dta['source_rank'] == dta['dest_rank'] == myrank: # mpi_print("sending from source %d to dest %d" % (dta['source_rank'], dta['dest_rank'])) - from_intervals = _massage_indices(la_from.distribution, dta['indices']) - to_intervals = _massage_indices(la_to.distribution, dta['indices']) - from_dtype = _mpi_dtype_from_intervals(la_from.dtype, from_intervals) - to_dtype = _mpi_dtype_from_intervals(la_to.dtype, to_intervals) + from_dtype = _mpi_dtype_from_intervals(la_from, dta['indices']) + to_dtype = _mpi_dtype_from_intervals(la_to, dta['indices']) comm.Sendrecv(sendbuf=[la_from.ndarray, 1, from_dtype], dest=myrank, recvbuf=[la_to.ndarray, 1, to_dtype], source=myrank) elif dta['source_rank'] == myrank: # mpi_print("sending from source %d to dest %d" % (dta['source_rank'], dta['dest_rank'])) - from_intervals = _massage_indices(la_from.distribution, dta['indices']) - from_dtype = _mpi_dtype_from_intervals(la_from.dtype, from_intervals) + from_dtype = _mpi_dtype_from_intervals(la_from, dta['indices']) comm.Send([la_from.ndarray, 1, from_dtype], dest=dta['dest_rank']) elif dta['dest_rank'] == myrank: # mpi_print("receiving from source %d to dest %d" % (dta['source_rank'], dta['dest_rank'])) - to_intervals = _massage_indices(la_to.distribution, dta['indices']) - to_dtype = _mpi_dtype_from_intervals(la_to.dtype, to_intervals) + to_dtype = _mpi_dtype_from_intervals(la_to, dta['indices']) comm.Recv([la_to.ndarray, 1, to_dtype], source=dta['source_rank']) def redistribute(comm, plan, la_from, la_to): From e7b1a23245b686d1545f0002d71de7b23c0266bc Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Wed, 6 Aug 2014 18:19:10 -0500 Subject: [PATCH 19/27] Fix bug with NoDist dimensions and redistribution. --- distarray/globalapi/maps.py | 9 +++++++-- distarray/globalapi/tests/test_distarray.py | 20 +++++++++++++++++++- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/distarray/globalapi/maps.py b/distarray/globalapi/maps.py index b38b1376..0632814b 100644 --- a/distarray/globalapi/maps.py +++ b/distarray/globalapi/maps.py @@ -947,6 +947,11 @@ def comm_union(self, *dists): def global_flat_indices(dim_data): # TODO: FIXME: can be optimized when the last dimension is 'n'. + for dd in dim_data: + if dd['dist_type'] == 'n': + dd['start'] = 0 + dd['stop'] = dd['size'] + glb_shape = tuple(dd['size'] for dd in dim_data) def accum(start, next): @@ -955,8 +960,8 @@ def accum(start, next): glb_strides = reduce(accum, tuple(glb_shape[1:]) + (1,), ()) ranges = [range(dd['start'], dd['stop']) for dd in dim_data[:-1]] - start_ranges = ranges + [[dim_data[-1].get('start', 0)]] - stop_ranges = ranges + [[dim_data[-1].get('stop', dim_data[-1]['size'])]] + start_ranges = ranges + [[dim_data[-1]['start']]] + stop_ranges = ranges + [[dim_data[-1]['stop']]] def flatten(idx): return sum(a * b for (a, b) in zip(idx, glb_strides)) diff --git a/distarray/globalapi/tests/test_distarray.py b/distarray/globalapi/tests/test_distarray.py index 1f906e0e..65133c6c 100644 --- a/distarray/globalapi/tests/test_distarray.py +++ b/distarray/globalapi/tests/test_distarray.py @@ -1120,7 +1120,6 @@ def test_redist_reshape_disjoint_targets(self): def test_redist_reshape_three_dee(self): three_dee_shape = (3, 5, 2) two_dee_shape = (3 * 2, 5) - NN = 3 * 5 * 2 dist0 = Distribution(self.context, three_dee_shape, ('b', 'b', 'n'), (2, 2)) dist1 = Distribution(self.context, two_dee_shape, ('b', 'b'), (2, 2)) @@ -1136,6 +1135,25 @@ def test_redist_reshape_three_dee(self): assert_array_equal(da_dest.tondarray(), expected) + def test_redist_reshape_big(self): + three_dee_shape = (13, 17, 19) + two_dee_shape = (13, 19 * 17) + + dist0 = Distribution(self.context, three_dee_shape, ('b', 'n', 'b'), (2, 1, 2)) + dist1 = Distribution(self.context, two_dee_shape, ('b', 'b'), (2, 2)) + + da_src = self.context.empty(dist0) + da_src.fill(47) + + da_dest = da_src.distribute_as(dist1) + + self.assertTrue(da_dest.distribution.is_compatible(dist1)) + + expected = numpy.empty(two_dee_shape) + expected.fill(47) + + assert_array_equal(da_dest.tondarray(), expected) + def test_global_flat_indices(self): dist0 = Distribution(self.context, (40,), ('b',), (2,), targets=[1,3]) From f6f4a5c0b36256290067e77ff355b62528e0036d Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Wed, 6 Aug 2014 18:24:02 -0500 Subject: [PATCH 20/27] Put in TODOs for cleanup / refactoring. --- distarray/globalapi/maps.py | 1 + distarray/localapi/localarray.py | 17 +++++++++++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/distarray/globalapi/maps.py b/distarray/globalapi/maps.py index 0632814b..91375cfe 100644 --- a/distarray/globalapi/maps.py +++ b/distarray/globalapi/maps.py @@ -971,6 +971,7 @@ def flatten(idx): intervals = zip(starts, stops) + # TODO: move to common place. Code duplication!!! def squeeze(accum, next): last = accum[-1] if not last: diff --git a/distarray/localapi/localarray.py b/distarray/localapi/localarray.py index fda4d38f..5889bfe8 100644 --- a/distarray/localapi/localarray.py +++ b/distarray/localapi/localarray.py @@ -26,10 +26,7 @@ def mpi_print(*args, **kwargs): from distarray.localapi import format, maps from distarray.localapi.error import InvalidDimensionError, IncompatibleArrayError -def make_local_slices(local_arr, glb_indices): - slices = tuple(slice(*inds) for inds in glb_indices) - return local_arr.local_from_global(slices) - +# TODO: move to common place. def ndim_from_flat(flat, strides): res = [] for st in strides: @@ -37,15 +34,19 @@ def ndim_from_flat(flat, strides): flat %= st return tuple(res) +# TODO: move to common place. def flat_from_ndim(ndim, strides): return sum(i * s for (i, s) in zip(ndim, strides)) +# TODO: move to common place. def _accum(start, next): return tuple(s * next for s in start) + (next,) +# TODO: move to common place. def _get_strides(shape): return reduce(_accum, tuple(shape[1:]) + (1,), ()) +# TODO: move to common place. def _transform(local_distribution, glb_flat): glb_strides = _get_strides(local_distribution.global_shape) local_strides = _get_strides(local_distribution.local_shape) @@ -54,6 +55,7 @@ def _transform(local_distribution, glb_flat): local_flat = local_distribution.local_flat_from_local(local_ind) return local_flat +# TODO: move to common place. def _squeeze(accum, next): last = accum[-1] if not last: @@ -63,10 +65,12 @@ def _squeeze(accum, next): elif last[-1] == next[0]: return accum[:-1] + [(last[0], next[-1])] +# TODO: move to common place. def _condense(intervals): intervals = reduce(_squeeze, intervals, [[]]) return intervals +# TODO: move to common place. def _massage_indices(local_distribution, glb_intervals): # XXX: TODO: document why we do `-1)+1` below. local_flat_slices = [(_transform(local_distribution, i[0]), @@ -101,6 +105,11 @@ def redistribute_general(comm, plan, la_from, la_to): to_dtype = _mpi_dtype_from_intervals(la_to, dta['indices']) comm.Recv([la_to.ndarray, 1, to_dtype], source=dta['source_rank']) + +def make_local_slices(local_arr, glb_indices): + slices = tuple(slice(*inds) for inds in glb_indices) + return local_arr.local_from_global(slices) + def redistribute(comm, plan, la_from, la_to): myrank = comm.Get_rank() for dta in plan: From d164ed8619e1bc39a11607df39cb6449a0d61123 Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Mon, 1 Sep 2014 11:38:39 -0500 Subject: [PATCH 21/27] Redistribution cleanup and refactoring. --- distarray/globalapi/maps.py | 70 ++++++++++++++++---------------- distarray/localapi/localarray.py | 51 ++++------------------- distarray/metadata_utils.py | 31 ++++++++++++++ 3 files changed, 73 insertions(+), 79 deletions(-) diff --git a/distarray/globalapi/maps.py b/distarray/globalapi/maps.py index 91375cfe..4813dc64 100644 --- a/distarray/globalapi/maps.py +++ b/distarray/globalapi/maps.py @@ -41,7 +41,9 @@ sanitize_indices, _start_stop_block, tuple_intersection, - shapes_from_dim_data_per_rank) + shapes_from_dim_data_per_rank, + condense, + strides_from_shape) def _dedup_dim_dicts(dim_dicts): @@ -876,6 +878,15 @@ def view(self, new_dimsize=None): def localshapes(self): return shapes_from_dim_data_per_rank(self.get_dim_data_per_rank()) + + def comm_union(self, *dists): + dist_targets = [d.targets for d in dists] + all_targets = sorted(reduce(set.union, dist_targets, set(self.targets))) + return self.context.make_subcomm(all_targets), all_targets + + # ------------------------------------------------------------------------ + # Redistribution + # ------------------------------------------------------------------------ @staticmethod def _redist_intersection_same_shape(source_dimdata, dest_dimdata): @@ -883,7 +894,8 @@ def _redist_intersection_same_shape(source_dimdata, dest_dimdata): intersections = [] for source_dimdict, dest_dimdict in zip(source_dimdata, dest_dimdata): - if not (source_dimdict['dist_type'] == dest_dimdict['dist_type'] == 'b'): + if not (source_dimdict['dist_type'] == + dest_dimdict['dist_type'] == 'b'): raise ValueError("Only 'b' dist_type supported") source_idxs = source_dimdict['start'], source_dimdict['stop'] @@ -897,7 +909,7 @@ def _redist_intersection_same_shape(source_dimdata, dest_dimdata): def _redist_intersection_reshape(source_dimdata, dest_dimdata): source_flat = global_flat_indices(source_dimdata) dest_flat = global_flat_indices(dest_dimdata) - return global_flat_indices_intersection(source_flat, dest_flat) + return _global_flat_indices_intersection(source_flat, dest_flat) def get_redist_plan(self, other_dist): # Get all targets @@ -906,11 +918,15 @@ def get_redist_plan(self, other_dist): source_ranks = range(len(self.targets)) source_targets = self.targets - union_rank_from_source_rank = {sr: union_rank_from_target[st] for (sr, st) in zip(source_ranks, source_targets)} + union_rank_from_source_rank = {sr: union_rank_from_target[st] + for (sr, st) in + zip(source_ranks, source_targets)} dest_ranks = range(len(other_dist.targets)) dest_targets = other_dist.targets - union_rank_from_dest_rank = {sr: union_rank_from_target[st] for (sr, st) in zip(dest_ranks, dest_targets)} + union_rank_from_dest_rank = {sr: union_rank_from_target[st] + for (sr, st) in + zip(dest_ranks, dest_targets)} source_ddpr = self.get_dim_data_per_rank() dest_ddpr = other_dist.get_dim_data_per_rank() @@ -929,20 +945,19 @@ def get_redist_plan(self, other_dist): source_rank = self.rank_from_coords[source_coords] dest_coords = tuple(dd['proc_grid_rank'] for dd in dest_dd) dest_rank = other_dist.rank_from_coords[dest_coords] - plan.append( - { - 'source_rank': union_rank_from_source_rank[source_rank], - 'dest_rank': union_rank_from_dest_rank[dest_rank], - 'indices': intersections, - } - ) + plan.append({ + 'source_rank': union_rank_from_source_rank[source_rank], + 'dest_rank': union_rank_from_dest_rank[dest_rank], + 'indices': intersections, + } + ) return plan - def comm_union(self, *dists): - all_targets = sorted(reduce(set.union, [d.targets for d in dists], set(self.targets))) - return self.context.make_subcomm(all_targets), all_targets +# ---------------------------------------------------------------------------- +# Redistribution helper functions. +# ---------------------------------------------------------------------------- def global_flat_indices(dim_data): # TODO: FIXME: can be optimized when the last dimension is 'n'. @@ -953,11 +968,7 @@ def global_flat_indices(dim_data): dd['stop'] = dd['size'] glb_shape = tuple(dd['size'] for dd in dim_data) - - def accum(start, next): - return tuple(s * next for s in start) + (next,) - - glb_strides = reduce(accum, tuple(glb_shape[1:]) + (1,), ()) + glb_strides = strides_from_shape(glb_shape) ranges = [range(dd['start'], dd['stop']) for dd in dim_data[:-1]] start_ranges = ranges + [[dim_data[-1]['start']]] @@ -970,20 +981,9 @@ def flatten(idx): stops = map(flatten, product(*stop_ranges)) intervals = zip(starts, stops) + return condense(intervals) - # TODO: move to common place. Code duplication!!! - def squeeze(accum, next): - last = accum[-1] - if not last: - return [next] - elif last[-1] != next[0]: - return accum + [next] - elif last[-1] == next[0]: - return accum[:-1] + [(last[0], next[-1])] - - intervals = reduce(squeeze, intervals, [[]]) - return intervals - -def global_flat_indices_intersection(gfis0, gfis1): - intersections = filter(None, [tuple_intersection(a, b) for (a, b) in product(gfis0, gfis1)]) +def _global_flat_indices_intersection(gfis0, gfis1): + intersections = filter(None, [tuple_intersection(a, b) + for (a, b) in product(gfis0, gfis1)]) return [i[:2] for i in intersections] diff --git a/distarray/localapi/localarray.py b/distarray/localapi/localarray.py index dac2a697..432f86e8 100644 --- a/distarray/localapi/localarray.py +++ b/distarray/localapi/localarray.py @@ -27,63 +27,30 @@ def mpi_print(*args, **kwargs): from distarray.externals import six from distarray.externals.six.moves import zip, reduce -from distarray.metadata_utils import sanitize_indices +from distarray.metadata_utils import sanitize_indices, strides_from_shape, ndim_from_flat, condense from distarray.localapi.mpiutils import MPI from distarray.localapi import format, maps from distarray.localapi.error import InvalidDimensionError, IncompatibleArrayError -# TODO: move to common place. -def ndim_from_flat(flat, strides): - res = [] - for st in strides: - res.append(flat // st) - flat %= st - return tuple(res) +# ---------------------------------------------------------------------------- +# Local Redistribution functions +# ---------------------------------------------------------------------------- -# TODO: move to common place. -def flat_from_ndim(ndim, strides): - return sum(i * s for (i, s) in zip(ndim, strides)) - -# TODO: move to common place. -def _accum(start, next): - return tuple(s * next for s in start) + (next,) - -# TODO: move to common place. -def _get_strides(shape): - return reduce(_accum, tuple(shape[1:]) + (1,), ()) - -# TODO: move to common place. def _transform(local_distribution, glb_flat): - glb_strides = _get_strides(local_distribution.global_shape) - local_strides = _get_strides(local_distribution.local_shape) + glb_strides = strides_from_shape(local_distribution.global_shape) + local_strides = strides_from_shape(local_distribution.local_shape) glb_ndim_inds = ndim_from_flat(glb_flat, glb_strides) local_ind = local_distribution.local_from_global(glb_ndim_inds) local_flat = local_distribution.local_flat_from_local(local_ind) return local_flat -# TODO: move to common place. -def _squeeze(accum, next): - last = accum[-1] - if not last: - return [next] - elif last[-1] != next[0]: - return accum + [next] - elif last[-1] == next[0]: - return accum[:-1] + [(last[0], next[-1])] - -# TODO: move to common place. -def _condense(intervals): - intervals = reduce(_squeeze, intervals, [[]]) - return intervals - -# TODO: move to common place. def _massage_indices(local_distribution, glb_intervals): # XXX: TODO: document why we do `-1)+1` below. local_flat_slices = [(_transform(local_distribution, i[0]), _transform(local_distribution, i[1]-1)+1) for i in glb_intervals] - return _condense(local_flat_slices) + return condense(local_flat_slices) def _mpi_dtype_from_intervals(larr, glb_intervals): local_intervals = _massage_indices(larr.distribution, glb_intervals) @@ -98,21 +65,17 @@ def redistribute_general(comm, plan, la_from, la_to): myrank = comm.Get_rank() for dta in plan: if dta['source_rank'] == dta['dest_rank'] == myrank: - # mpi_print("sending from source %d to dest %d" % (dta['source_rank'], dta['dest_rank'])) from_dtype = _mpi_dtype_from_intervals(la_from, dta['indices']) to_dtype = _mpi_dtype_from_intervals(la_to, dta['indices']) comm.Sendrecv(sendbuf=[la_from.ndarray, 1, from_dtype], dest=myrank, recvbuf=[la_to.ndarray, 1, to_dtype], source=myrank) elif dta['source_rank'] == myrank: - # mpi_print("sending from source %d to dest %d" % (dta['source_rank'], dta['dest_rank'])) from_dtype = _mpi_dtype_from_intervals(la_from, dta['indices']) comm.Send([la_from.ndarray, 1, from_dtype], dest=dta['dest_rank']) elif dta['dest_rank'] == myrank: - # mpi_print("receiving from source %d to dest %d" % (dta['source_rank'], dta['dest_rank'])) to_dtype = _mpi_dtype_from_intervals(la_to, dta['indices']) comm.Recv([la_to.ndarray, 1, to_dtype], source=dta['source_rank']) - def make_local_slices(local_arr, glb_indices): slices = tuple(slice(*inds) for inds in glb_indices) return local_arr.local_from_global(slices) diff --git a/distarray/metadata_utils.py b/distarray/metadata_utils.py index 02185efe..38f1d82b 100644 --- a/distarray/metadata_utils.py +++ b/distarray/metadata_utils.py @@ -529,3 +529,34 @@ def shapes_from_dim_data_per_rank(ddpr): # ddpr = dim_data_per_rank shape.append(size_from_dim_data(dd)) shape_list.append(tuple(shape)) return shape_list + +# ---------------------------------------------------------------------------- +# Redistribution-related utilities. +# ---------------------------------------------------------------------------- + +def _accum(start, next): + return tuple(s * next for s in start) + (next,) + +def strides_from_shape(shape): + return reduce(_accum, tuple(shape[1:]) + (1,), ()) + +def ndim_from_flat(flat, strides): + res = [] + for st in strides: + res.append(flat // st) + flat %= st + return tuple(res) + +def _squeeze(accum, next): + last = accum[-1] + if not last: + return [next] + elif last[-1] != next[0]: + return accum + [next] + elif last[-1] == next[0]: + return accum[:-1] + [(last[0], next[-1])] + +def condense(intervals): + intervals = reduce(_squeeze, intervals, [[]]) + return intervals + From 7d5e431b3895f6f7820cff57aff551ed3e5b60c6 Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Mon, 1 Sep 2014 11:42:24 -0500 Subject: [PATCH 22/27] Refactor redistribution tests. --- distarray/globalapi/tests/test_distarray.py | 40 +++++++++++---------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/distarray/globalapi/tests/test_distarray.py b/distarray/globalapi/tests/test_distarray.py index 65133c6c..194bc231 100644 --- a/distarray/globalapi/tests/test_distarray.py +++ b/distarray/globalapi/tests/test_distarray.py @@ -1065,6 +1065,28 @@ def test_redist_2D(self): dest_da = source_da.distribute_as(dest_dist) assert_array_equal(source_da.tondarray(), dest_da.tondarray()) + +class TestReshapeRedistribution(DefaultContextTestCase): + + def test_global_flat_indices(self): + dist0 = Distribution(self.context, (40,), ('b',), (2,), targets=[1,3]) + + self.assertSequenceEqual([[(0, 20)], [(20, 40)]], + [global_flat_indices(ddpr) for ddpr in dist0.get_dim_data_per_rank()]) + + dist1 = Distribution(self.context, (5, 8), ('b', 'n'), (2,), targets=[0,2]) + + self.assertSequenceEqual([[(0, 24)], [(24, 40)]], + [global_flat_indices(ddpr) for ddpr in dist1.get_dim_data_per_rank()]) + + dist2 = Distribution(self.context, (5, 8), ('b', 'b'), (2, 2), targets=[0,1,2,3]) + + self.assertSequenceEqual([[(0, 4), (8, 12), (16, 20)], + [(4, 8), (12, 16), (20, 24)], + [(24, 28), (32, 36)], + [(28, 32), (36, 40)]], + [global_flat_indices(ddpr) for ddpr in dist2.get_dim_data_per_rank()]) + def test_redist_reshape_same_target(self): dist0 = Distribution(self.context, (40,), ('b',), (1,), targets=[1]) dist1 = Distribution(self.context, (5, 8), ('b', 'n'), (1,), targets=[1]) @@ -1154,24 +1176,6 @@ def test_redist_reshape_big(self): assert_array_equal(da_dest.tondarray(), expected) - def test_global_flat_indices(self): - dist0 = Distribution(self.context, (40,), ('b',), (2,), targets=[1,3]) - - self.assertSequenceEqual([[(0, 20)], [(20, 40)]], - [global_flat_indices(ddpr) for ddpr in dist0.get_dim_data_per_rank()]) - - dist1 = Distribution(self.context, (5, 8), ('b', 'n'), (2,), targets=[0,2]) - - self.assertSequenceEqual([[(0, 24)], [(24, 40)]], - [global_flat_indices(ddpr) for ddpr in dist1.get_dim_data_per_rank()]) - - dist2 = Distribution(self.context, (5, 8), ('b', 'b'), (2, 2), targets=[0,1,2,3]) - - self.assertSequenceEqual([[(0, 4), (8, 12), (16, 20)], - [(4, 8), (12, 16), (20, 24)], - [(24, 28), (32, 36)], - [(28, 32), (36, 40)]], - [global_flat_indices(ddpr) for ddpr in dist2.get_dim_data_per_rank()]) if __name__ == '__main__': unittest.main(verbosity=2) From 8c8bc3ee175c4471cf2c3b553b94902d88d70d9c Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Fri, 5 Sep 2014 11:47:07 -0500 Subject: [PATCH 23/27] Remove `default` argument to apply. Refactor `arg_kwarg_proxy_converter` into function in metadata_utils, add explanatory comment. --- distarray/globalapi/context.py | 32 ++++++++--------------------- distarray/metadata_utils.py | 37 ++++++++++++++++++++++++++++++++++ distarray/mpi_engine.py | 29 +++----------------------- 3 files changed, 48 insertions(+), 50 deletions(-) diff --git a/distarray/globalapi/context.py b/distarray/globalapi/context.py index 41753eb4..619c70e1 100644 --- a/distarray/globalapi/context.py +++ b/distarray/globalapi/context.py @@ -743,7 +743,7 @@ def _execute(self, lines, targets): def _push(self, d, targets): return self.view.push(d, targets=targets, block=True) - def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False, default=None): + def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False): """ Analogous to IPython.parallel.view.apply_sync @@ -764,7 +764,7 @@ def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False, return a list of the results on the each engine. """ - def func_wrapper(func, apply_nonce, context_key, args, kwargs, autoproxyize, default): + def func_wrapper(func, apply_nonce, context_key, args, kwargs, autoproxyize): """ Function which calls the applied function after grabbing all the arguments on the engines that are passed in as names of the form @@ -772,6 +772,7 @@ def func_wrapper(func, apply_nonce, context_key, args, kwargs, autoproxyize, def """ from importlib import import_module import types + from distarray.metadata_utils import arg_kwarg_proxy_converter from distarray.localapi import LocalArray main = import_module('__main__') @@ -793,25 +794,8 @@ def func_wrapper(func, apply_nonce, context_key, args, kwargs, autoproxyize, def func = types.FunctionType(func_code, new_func_globals, func_name, func_defaults, func_closure) - # convert args - args = list(args) - for i, a in enumerate(args): - if isinstance(a, main.Proxy): - try: - args[i] = a.dereference() - except AttributeError: - args[i] = default - args = tuple(args) - - # convert kwargs - for k in kwargs.keys(): - val = kwargs[k] - if isinstance(val, main.Proxy): - try: - kwargs[k] = val.dereference() - except AttributeError: - kwargs[k] = default + args, kwargs = arg_kwarg_proxy_converter(args, kwargs) result = func(*args, **kwargs) if autoproxyize and isinstance(result, LocalArray): @@ -823,7 +807,7 @@ def func_wrapper(func, apply_nonce, context_key, args, kwargs, autoproxyize, def args = () if args is None else args kwargs = {} if kwargs is None else kwargs apply_nonce = nonce() - wrapped_args = (func, apply_nonce, self.context_key, args, kwargs, autoproxyize, default) + wrapped_args = (func, apply_nonce, self.context_key, args, kwargs, autoproxyize) targets = self.targets if targets is None else targets @@ -954,7 +938,7 @@ def _push(self, d, targets=None): msg = ('push', d) return self._send_msg(msg, targets=targets) - def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False, default=None): + def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False): """ Analogous to IPython.parallel.view.apply_sync @@ -992,10 +976,10 @@ def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False, func_data = (func_code, func_name, func_defaults, func_closure) - msg = ('func_call', func_data, args, kwargs, apply_metadata, autoproxyize, default) + msg = ('func_call', func_data, args, kwargs, apply_metadata, autoproxyize) else: - msg = ('builtin_call', func, args, kwargs, autoproxyize, default) + msg = ('builtin_call', func, args, kwargs, autoproxyize) self._send_msg(msg, targets=targets) return self._recv_msg(targets=targets) diff --git a/distarray/metadata_utils.py b/distarray/metadata_utils.py index 38f1d82b..e865b631 100644 --- a/distarray/metadata_utils.py +++ b/distarray/metadata_utils.py @@ -560,3 +560,40 @@ def condense(intervals): intervals = reduce(_squeeze, intervals, [[]]) return intervals +# ---------------------------------------------------------------------------- +# `apply` related utilities. +# ---------------------------------------------------------------------------- + +def arg_kwarg_proxy_converter(args, kwargs, module_name='__main__'): + from importlib import import_module + + module = import_module(module_name) + # convert args + + # In some situations, like redistributing a DistArray from one set of + # targets to a disjoint set, the source and destination DistArrays (and + # associated LocalArrays) are in different communicators with different + # targets. In those cases, it is possible for a proxy object for one + # DistArray to not refer to anything on this target. In that case, + # `a.dereference()` raises an `AttributeError`. We intercept that here and + # assign `None` instead. + + args = list(args) + for i, a in enumerate(args): + if isinstance(a, module.Proxy): + try: + args[i] = a.dereference() + except AttributeError: + args[i] = None + args = tuple(args) + + # convert kwargs + for k in kwargs.keys(): + val = kwargs[k] + if isinstance(val, module.Proxy): + try: + kwargs[k] = val.dereference() + except AttributeError: + kwargs[k] = None + + return args, kwargs diff --git a/distarray/mpi_engine.py b/distarray/mpi_engine.py index 82145277..5193f40c 100644 --- a/distarray/mpi_engine.py +++ b/distarray/mpi_engine.py @@ -11,6 +11,7 @@ from importlib import import_module import types +from distarray.metadata_utils import arg_kwarg_proxy_converter from distarray.localapi import LocalArray from distarray.localapi.proxyize import Proxy @@ -42,28 +43,6 @@ def __init__(self): break Engine.INTERCOMM.Free() - def arg_kwarg_proxy_converter(self, args, kwargs, default): - module = import_module('__main__') - # convert args - args = list(args) - for i, a in enumerate(args): - if isinstance(a, module.Proxy): - try: - args[i] = a.dereference() - except AttributeError: - args[i] = default - args = tuple(args) - - # convert kwargs - for k in kwargs.keys(): - val = kwargs[k] - if isinstance(val, module.Proxy): - try: - kwargs[k] = val.dereference() - except AttributeError: - kwargs[k] = default - - return args, kwargs def is_engine(self): if self.world.rank != self.client_rank: @@ -105,12 +84,11 @@ def func_call(self, msg): kwargs = msg[3] nonce, context_key = msg[4] autoproxyize = msg[5] - default = msg[6] module = import_module('__main__') module.proxyize.set_state(nonce) - args, kwargs = self.arg_kwarg_proxy_converter(args, kwargs, default) + args, kwargs = arg_kwarg_proxy_converter(args, kwargs) new_func_globals = module.__dict__ # add proper proxyize, context_key new_func_globals.update({'proxyize': module.proxyize, @@ -159,9 +137,8 @@ def builtin_call(self, msg): func = msg[1] args = msg[2] kwargs = msg[3] - default = msg[4] - args, kwargs = self.arg_kwarg_proxy_converter(args, kwargs, default) + args, kwargs = arg_kwarg_proxy_converter(args, kwargs) res = func(*args, **kwargs) Engine.INTERCOMM.send(res, dest=self.client_rank) From 75921e11be23b009d69637b54274a017593f5ca7 Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Fri, 5 Sep 2014 12:09:22 -0500 Subject: [PATCH 24/27] Adds docstrings. --- distarray/globalapi/distarray.py | 21 +++++++++++++++++++++ distarray/globalapi/maps.py | 29 ++++++++++++++++++++++++++++- 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/distarray/globalapi/distarray.py b/distarray/globalapi/distarray.py index e6cd1add..397e75e3 100644 --- a/distarray/globalapi/distarray.py +++ b/distarray/globalapi/distarray.py @@ -496,6 +496,27 @@ def local_view(larr, ddpr, dtype): dtype=dtype) def distribute_as(self, dist): + """ + Redistributes this DistArray, returning a new DistArray with the same + data and corresponding distribution. + + Parameters + ---------- + dist : distribution object + Distribution for the new DistArray. The new distribution must have + the same number of items as this distarray. The global shape and + targets may be different. + + Returns + ------- + DistArray + A new DistArray distributed according to `dist`. + + Note + ---- + Implemented for block and non-distributed dimensions only. + + """ plan = self.distribution.get_redist_plan(dist) ubercomm, all_targets = self.distribution.comm_union(dist) result = DistArray(dist, dtype=self.dtype) diff --git a/distarray/globalapi/maps.py b/distarray/globalapi/maps.py index 4813dc64..b0b237d3 100644 --- a/distarray/globalapi/maps.py +++ b/distarray/globalapi/maps.py @@ -554,7 +554,6 @@ def from_maps(cls, context, maps, targets=None): self.context = context self.targets = sorted(targets or context.targets) self._comm = None - # self.context.make_subcomm(self.targets) self.maps = maps self.shape = tuple(m.size for m in self.maps) self.ndim = len(self.maps) @@ -880,6 +879,20 @@ def localshapes(self): return shapes_from_dim_data_per_rank(self.get_dim_data_per_rank()) def comm_union(self, *dists): + """ + Make a communicator that includes the union of all targets in `dists`. + + Parameters + ---------- + dists: sequence of distribution objects. + + Returns + ------- + tuple + First element is encompassing communicator proxy; second is a + sequence of all targets in `dists`. + + """ dist_targets = [d.targets for d in dists] all_targets = sorted(reduce(set.union, dist_targets, set(self.targets))) return self.context.make_subcomm(all_targets), all_targets @@ -960,6 +973,20 @@ def get_redist_plan(self, other_dist): # ---------------------------------------------------------------------------- def global_flat_indices(dim_data): + """ + Return a list of tuples of indices into the flattened global array. + + Parameters + ---------- + dim_data: dimension dictionary. + + Returns + ------- + list of 2-tuples of ints. + Each tuple is a (start, stop) interval into the flattened global array. + All selected ranges comprise the indices for this dim_data's sub-array. + + """ # TODO: FIXME: can be optimized when the last dimension is 'n'. for dd in dim_data: From 1579b1bca89a6227f46e1af631c3b5c87a357b80 Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Fri, 5 Sep 2014 12:29:39 -0500 Subject: [PATCH 25/27] Raise `ValueError` if attempting to redistribute to incompatible size. --- distarray/globalapi/distarray.py | 9 ++++++++- distarray/globalapi/tests/test_distarray.py | 11 +++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/distarray/globalapi/distarray.py b/distarray/globalapi/distarray.py index 397e75e3..33a86783 100644 --- a/distarray/globalapi/distarray.py +++ b/distarray/globalapi/distarray.py @@ -529,10 +529,17 @@ def _local_redistribute_general(comm, plan, la_from, la_to): from distarray.localapi import redistribute_general redistribute_general(comm, plan, la_from, la_to) + source_size = self.global_size + dest_size = reduce(operator.mul, dist.shape, 1) + if self.distribution.shape == dist.shape: _local_redistribute = _local_redistribute_same_shape - else: + elif source_size == dest_size: _local_redistribute = _local_redistribute_general + else: + msg = ("Original size %d != new size %d," + " and total size of new array must be unchanged.") + raise ValueError(msg % (source_size, dest_size)) self.context.apply(_local_redistribute, (ubercomm, plan, self.key, result.key), targets=all_targets) diff --git a/distarray/globalapi/tests/test_distarray.py b/distarray/globalapi/tests/test_distarray.py index 194bc231..88baf313 100644 --- a/distarray/globalapi/tests/test_distarray.py +++ b/distarray/globalapi/tests/test_distarray.py @@ -1065,6 +1065,17 @@ def test_redist_2D(self): dest_da = source_da.distribute_as(dest_dist) assert_array_equal(source_da.tondarray(), dest_da.tondarray()) + def test_redist_incompatible_sizes(self): + source_da = self.context.empty((10,), dtype=numpy.int32) + + with self.assertRaises(ValueError): + source_da.distribute_as(Distribution(self.context, (9,))) + + with self.assertRaises(ValueError): + source_da.distribute_as(Distribution(self.context, (3, 4))) + + with self.assertRaises(ValueError): + source_da.distribute_as(Distribution(self.context, (3, 4000))) class TestReshapeRedistribution(DefaultContextTestCase): From 4d3c8058ea5991a8c14a87192b6121845b2a89ba Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Fri, 5 Sep 2014 12:46:20 -0500 Subject: [PATCH 26/27] Raise `NotImplementedError` in some redistribution cases. Also allow `distribute_as` to take `shape_or_dist`. --- distarray/globalapi/distarray.py | 25 ++++++++++++++------- distarray/globalapi/tests/test_distarray.py | 14 ++++++++++++ 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/distarray/globalapi/distarray.py b/distarray/globalapi/distarray.py index 33a86783..35d456bd 100644 --- a/distarray/globalapi/distarray.py +++ b/distarray/globalapi/distarray.py @@ -24,7 +24,7 @@ import distarray.localapi from distarray.metadata_utils import sanitize_indices -from distarray.globalapi.maps import Distribution +from distarray.globalapi.maps import Distribution, asdistribution from distarray.utils import _raise_nie from distarray.metadata_utils import normalize_reduction_axes @@ -495,17 +495,18 @@ def local_view(larr, ddpr, dtype): return DistArray.from_localarrays(key=new_key, distribution=new_dist, dtype=dtype) - def distribute_as(self, dist): + def distribute_as(self, shape_or_dist): """ Redistributes this DistArray, returning a new DistArray with the same data and corresponding distribution. Parameters ---------- - dist : distribution object + shape_or_dist : shape tuple or Distribution object. Distribution for the new DistArray. The new distribution must have the same number of items as this distarray. The global shape and - targets may be different. + targets may be different. If shape tuple, immediately converted to + a Distribution object with default parameters. Returns ------- @@ -514,12 +515,16 @@ def distribute_as(self, dist): Note ---- - Implemented for block and non-distributed dimensions only. + Currently implemented for block and non-distributed maps only. """ - plan = self.distribution.get_redist_plan(dist) - ubercomm, all_targets = self.distribution.comm_union(dist) - result = DistArray(dist, dtype=self.dtype) + + dist = asdistribution(self.context, shape_or_dist) + + if (any(d not in ('b', 'n') for d in self.distribution.dist) or + any(d not in ('b', 'n') for d in dist.dist)): + msg = "Only block and non-distributed dimensions currently supported." + raise NotImplementedError(msg) def _local_redistribute_same_shape(comm, plan, la_from, la_to): from distarray.localapi import redistribute @@ -541,6 +546,10 @@ def _local_redistribute_general(comm, plan, la_from, la_to): " and total size of new array must be unchanged.") raise ValueError(msg % (source_size, dest_size)) + plan = self.distribution.get_redist_plan(dist) + ubercomm, all_targets = self.distribution.comm_union(dist) + result = DistArray(dist, dtype=self.dtype) + self.context.apply(_local_redistribute, (ubercomm, plan, self.key, result.key), targets=all_targets) return result diff --git a/distarray/globalapi/tests/test_distarray.py b/distarray/globalapi/tests/test_distarray.py index 88baf313..3bacdd70 100644 --- a/distarray/globalapi/tests/test_distarray.py +++ b/distarray/globalapi/tests/test_distarray.py @@ -1077,6 +1077,20 @@ def test_redist_incompatible_sizes(self): with self.assertRaises(ValueError): source_da.distribute_as(Distribution(self.context, (3, 4000))) + def test_redist_unsupported_dist_types(self): + source_dist = Distribution(self.context, (10, 20), ('n', 'c')) + source_da = self.context.empty(source_dist) + + with self.assertRaises(NotImplementedError): + source_da.distribute_as(source_da.distribution) + + source_da = self.context.empty((10, 20), ('b', 'b')) + dest_dist = Distribution(self.context, (10, 20), ('b', 'c')) + + with self.assertRaises(NotImplementedError): + source_da.distribute_as(dest_dist) + + class TestReshapeRedistribution(DefaultContextTestCase): def test_global_flat_indices(self): From f61e439fb5a72b45e0afd258349804d1f0aaa666 Mon Sep 17 00:00:00 2001 From: Kurt Smith Date: Fri, 5 Sep 2014 13:30:45 -0500 Subject: [PATCH 27/27] Fix target renaming in Makefile. --- Makefile | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index c58e5aff..1f20f4b2 100644 --- a/Makefile +++ b/Makefile @@ -64,9 +64,9 @@ test_ipython: ${PYTHON} -m unittest discover -c .PHONY: test_ipython -test_client_with_coverage: +test_ipython_with_coverage: ${COVERAGE} run -pm unittest discover -cv -.PHONY: test_client_with_coverage +.PHONY: test_ipython_with_coverage ${PARALLEL_OUT_DIR} : mkdir ${PARALLEL_OUT_DIR} @@ -98,7 +98,7 @@ test_mpi_with_coverage: test: test_ipython test_mpi test_engines .PHONY: test -test_with_coverage: test_client_with_coverage test_mpi_with_coverage test_engines_with_coverage +test_with_coverage: test_ipython_with_coverage test_mpi_with_coverage test_engines_with_coverage .PHONY: test_with_coverage coverage_report: