From e682d7246ec06c13dfb73e26515d4940fd42b8c3 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Fri, 2 May 2014 15:49:44 -0500 Subject: [PATCH 1/6] Move uid and DISTARRAY_BASE_NAME to utils. --- distarray/dist/context.py | 3 +-- distarray/utils.py | 5 +++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/distarray/dist/context.py b/distarray/dist/context.py index 807e57d8..1a223c71 100644 --- a/distarray/dist/context.py +++ b/distarray/dist/context.py @@ -10,7 +10,6 @@ from __future__ import absolute_import -import uuid import collections import atexit @@ -22,7 +21,7 @@ from distarray.dist.maps import Distribution from distarray.dist.ipython_utils import IPythonClient -from distarray import DISTARRAY_BASE_NAME +from distarray.utils import uid, DISTARRAY_BASE_NAME class Context(object): diff --git a/distarray/utils.py b/distarray/utils.py index 52e52764..d8b4ab82 100644 --- a/distarray/utils.py +++ b/distarray/utils.py @@ -8,9 +8,14 @@ """ from math import sqrt +import uuid + from distarray.externals.six import next +DISTARRAY_BASE_NAME = '__distarray__' +def uid(): + return DISTARRAY_BASE_NAME + uuid.uuid4().hex[:16] def multi_for(iterables): if not iterables: From 88e31dc5335b3f5e0bb6cff4b9fe9d142bbea82e Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Fri, 2 May 2014 15:50:22 -0500 Subject: [PATCH 2/6] Add `apply` function which is analogous to IPython.parallel's apply. --- distarray/dist/context.py | 82 ++++++++++++++++++++++++++++++++------- 1 file changed, 67 insertions(+), 15 deletions(-) diff --git a/distarray/dist/context.py b/distarray/dist/context.py index 1a223c71..fc058132 100644 --- a/distarray/dist/context.py +++ b/distarray/dist/context.py @@ -68,8 +68,10 @@ def __init__(self, client=None, targets=None): # FIXME: IPython bug #4296: This doesn't work under Python 3 #with self.view.sync_imports(): # import distarray - self.view.execute("import distarray.local; " + self.view.execute("from functools import reduce; " + "import distarray.local; " "import distarray.local.mpiutils; " + "import distarray.utils; " "import numpy") self.context_key = self._setup_context_key() @@ -81,7 +83,7 @@ def _setup_context_key(self): Create a dict on the engines which will hold everything from this context. """ - context_key = DISTARRAY_BASE_NAME + self.uid() + context_key = uid() cmd = ("import types, sys;" "%s = types.ModuleType('%s');") cmd %= (context_key, context_key) @@ -138,21 +140,9 @@ def _set_engine_rank_mapping(self): # the intracomm. self.targets = [target_from_rank[i] for i in range(len(target_from_rank))] - # Key management routines: - @staticmethod - def _key_prefix(): - """ Get the base name for all keys. """ - return DISTARRAY_BASE_NAME - - @staticmethod - def uid(): - """Generate a unique valid python name.""" - # Full length seems excessively verbose so use 16 characters. - return Context._key_prefix() + uuid.uuid4().hex[:16] - def _generate_key(self): """ Generate a unique key name for this context. """ - key = "%s.%s" % (self.context_key, 'key_' + self.uid()) + key = "%s.%s" % (self.context_key, 'key_' + uid()) return key def _key_and_push(self, *values): @@ -502,3 +492,65 @@ def fromfunction(self, function, shape, **kwargs): '**{kwargs_name})') self._execute(cmd.format(**locals())) return DistArray.from_localarrays(da_name, distribution=distribution) + + def apply(self, func, args=None, kwargs=None, targets=None): + """ + Analogous to IPython.parallel.view.apply_sync + + Parameters + ---------- + func : function + args : tuple + positional arguments to func + kwargs : dict + key word arguments to func + targets : sequence of integers + engines func is to be run on. + + Returns + ------- + name : str + name of the result on the engines + """ + + def func_wrapper(func, result_name, *args, **kwargs): + """ + Function which calls the applied function after grabbing all the + arguments on the engines that are passed in as names of the form + `__distarray__`. + """ + main = __import__('__main__') + prefix = main.distarray.utils.DISTARRAY_BASE_NAME + + # convert args + args = list(args) + for i, a in enumerate(args): + if (isinstance(a, str) and a.startswith(prefix)): + args[i] = main.reduce(getattr, [main] + a.split('.')) + args = tuple(args) + + # convert kwargs + for k in kwargs.keys(): + val = kwargs[k] + if (isinstance(val, str) and val.startswith(prefix)): + kwargs[k] = main.reduce(getattr, [main] + val.split('.')) + + setattr(main, result_name, func(*args, **kwargs)) + return result_name + + # create a name for the result + result_name = uid() + + # default arguments + if args is None: + args = (func, result_name) + else: + args = tuple([func, result_name] + list(args)) + + kwargs = {} if kwargs is None else kwargs + targets = self.targets if targets is None else targets + + result = self.view._really_apply(func_wrapper, args=args, kwargs=kwargs, + targets=targets, block=True) + # `result` is a list of `result_name` 4 times, we only need one. + return result[0] From 641ca44a269945d1f8bbf2c55a16c1d4d7dae671 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Fri, 2 May 2014 15:51:05 -0500 Subject: [PATCH 3/6] Tests for new apply function. --- distarray/dist/tests/test_context.py | 76 ++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/distarray/dist/tests/test_context.py b/distarray/dist/tests/test_context.py index 0fb3f72e..cd5dcff6 100644 --- a/distarray/dist/tests/test_context.py +++ b/distarray/dist/tests/test_context.py @@ -126,5 +126,81 @@ def test_3D(self): self.assertEqual(c.grid_shape, (1, 1, 3)) +class TestApply(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.context = Context() + + def test_apply_no_args(self): + + def foo(): + return 42 + + name = self.context.apply(foo) + val = self.context._pull(name, targets=[0])[0] + + self.assertEqual(val, 42) + + def test_apply_pos_args(self): + + def foo(a, b, c): + return a + b + c + + # push all arguments + name = self.context.apply(foo, (1, 2, 3)) + val = self.context._pull(name, targets=[0])[0] + + self.assertEqual(val, 6) + + # some local, some pushed + local_thing = self.context._key_and_push(2)[0] + name = self.context.apply(foo, (1, local_thing, 3)) + val = self.context._pull(name, targets=[0])[0] + + self.assertEqual(val, 6) + + # all pushed + local_args = self.context._key_and_push(1, 2, 3) + name = self.context.apply(foo, local_args) + val = self.context._pull(name, targets=[0])[0] + + self.assertEqual(val, 6) + + def test_apply_kwargs(self): + + def foo(a, b, c=None, d=None): + c = -1 if c is None else c + d = -2 if d is None else d + return a + b + c + d + + # empty kwargs + name = self.context.apply(foo, (1, 2)) + val = self.context._pull(name, targets=[0])[0] + + self.assertEqual(val, 0) + + # some empty + name = self.context.apply(foo, (1, 2), {'d': 3}) + val = self.context._pull(name, targets=[0])[0] + + self.assertEqual(val, 5) + + # all kwargs + name = self.context.apply(foo, (1, 2), {'c': 2, 'd': 3}) + val = self.context._pull(name, targets=[0])[0] + + self.assertEqual(val, 8) + + # now with local values + local_a = self.context._key_and_push(1)[0] + local_c = self.context._key_and_push(3)[0] + + name = self.context.apply(foo, (local_a, 2), {'c': local_c, 'd': 3}) + val = self.context._pull(name, targets=[0])[0] + + self.assertEqual(val, 9) + + if __name__ == '__main__': unittest.main(verbosity=2) From 205091c656b95ce5e931012ccadcb882b951a730 Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Sun, 4 May 2014 15:49:55 -0500 Subject: [PATCH 4/6] Add return value flag. --- distarray/dist/context.py | 23 ++++++++++++++++------- distarray/dist/tests/test_context.py | 9 +++++++++ 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/distarray/dist/context.py b/distarray/dist/context.py index fc058132..ba0f1f03 100644 --- a/distarray/dist/context.py +++ b/distarray/dist/context.py @@ -493,7 +493,8 @@ def fromfunction(self, function, shape, **kwargs): self._execute(cmd.format(**locals())) return DistArray.from_localarrays(da_name, distribution=distribution) - def apply(self, func, args=None, kwargs=None, targets=None): + def apply(self, func, args=None, kwargs=None, targets=None, + return_name=True): """ Analogous to IPython.parallel.view.apply_sync @@ -513,7 +514,7 @@ def apply(self, func, args=None, kwargs=None, targets=None): name of the result on the engines """ - def func_wrapper(func, result_name, *args, **kwargs): + def func_wrapper(func, result_name, return_name, *args, **kwargs): """ Function which calls the applied function after grabbing all the arguments on the engines that are passed in as names of the form @@ -536,21 +537,29 @@ def func_wrapper(func, result_name, *args, **kwargs): kwargs[k] = main.reduce(getattr, [main] + val.split('.')) setattr(main, result_name, func(*args, **kwargs)) - return result_name + + if return_name: + return result_name + else: + return getattr(main, result_name) # create a name for the result result_name = uid() # default arguments if args is None: - args = (func, result_name) + args = (func, result_name, return_name) else: - args = tuple([func, result_name] + list(args)) + args = tuple([func, result_name, return_name] + list(args)) kwargs = {} if kwargs is None else kwargs + targets = self.targets if targets is None else targets result = self.view._really_apply(func_wrapper, args=args, kwargs=kwargs, targets=targets, block=True) - # `result` is a list of `result_name` 4 times, we only need one. - return result[0] + if return_name: + # result is a list of the same name 4 times, so just return 1. + return result[0] + else: + return result diff --git a/distarray/dist/tests/test_context.py b/distarray/dist/tests/test_context.py index cd5dcff6..52978843 100644 --- a/distarray/dist/tests/test_context.py +++ b/distarray/dist/tests/test_context.py @@ -201,6 +201,15 @@ def foo(a, b, c=None, d=None): self.assertEqual(val, 9) + def test_apply_return_val(self): + + def foo(a, b, c=None): + c = 3 if c is None else c + return a + b + c + + val = self.context.apply(foo, (1, 2), {'c': 5}, return_name=False) + self.assertEqual(val, [8]*len(self.context.targets)) + if __name__ == '__main__': unittest.main(verbosity=2) From cf08e3aa8d1238062a728dc632fa4fe43b2663cf Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Mon, 5 May 2014 14:51:37 -0500 Subject: [PATCH 5/6] Return function result by default. --- distarray/dist/context.py | 34 ++++++++++---------- distarray/dist/tests/test_context.py | 48 +++++++++++++--------------- 2 files changed, 40 insertions(+), 42 deletions(-) diff --git a/distarray/dist/context.py b/distarray/dist/context.py index ba0f1f03..5003f0b4 100644 --- a/distarray/dist/context.py +++ b/distarray/dist/context.py @@ -494,7 +494,7 @@ def fromfunction(self, function, shape, **kwargs): return DistArray.from_localarrays(da_name, distribution=distribution) def apply(self, func, args=None, kwargs=None, targets=None, - return_name=True): + result_name=None): """ Analogous to IPython.parallel.view.apply_sync @@ -507,14 +507,19 @@ def apply(self, func, args=None, kwargs=None, targets=None, key word arguments to func targets : sequence of integers engines func is to be run on. + result_name : str + The name given the result on the engines. If given this is returned + to act as a proxy object. Returns ------- - name : str - name of the result on the engines + if result_name is not None : str + Name of the result on the engines. + else: list + A list of the results on all the engines. """ - def func_wrapper(func, result_name, return_name, *args, **kwargs): + def func_wrapper(func, result_name, *args, **kwargs): """ Function which calls the applied function after grabbing all the arguments on the engines that are passed in as names of the form @@ -536,29 +541,26 @@ def func_wrapper(func, result_name, return_name, *args, **kwargs): if (isinstance(val, str) and val.startswith(prefix)): kwargs[k] = main.reduce(getattr, [main] + val.split('.')) - setattr(main, result_name, func(*args, **kwargs)) - - if return_name: + if result_name: + setattr(main, result_name, func(*args, **kwargs)) return result_name else: - return getattr(main, result_name) - - # create a name for the result - result_name = uid() + return func(*args, **kwargs) # default arguments if args is None: - args = (func, result_name, return_name) + args = (func, result_name) else: - args = tuple([func, result_name, return_name] + list(args)) + args = tuple([func, result_name] + list(args)) kwargs = {} if kwargs is None else kwargs targets = self.targets if targets is None else targets - result = self.view._really_apply(func_wrapper, args=args, kwargs=kwargs, - targets=targets, block=True) - if return_name: + result = self.view._really_apply(func_wrapper, args=args, + kwargs=kwargs, targets=targets, + block=True) + if result_name is not None: # result is a list of the same name 4 times, so just return 1. return result[0] else: diff --git a/distarray/dist/tests/test_context.py b/distarray/dist/tests/test_context.py index 52978843..2628ff80 100644 --- a/distarray/dist/tests/test_context.py +++ b/distarray/dist/tests/test_context.py @@ -137,10 +137,9 @@ def test_apply_no_args(self): def foo(): return 42 - name = self.context.apply(foo) - val = self.context._pull(name, targets=[0])[0] + val = self.context.apply(foo) - self.assertEqual(val, 42) + self.assertEqual(val, [42]*4) def test_apply_pos_args(self): @@ -148,24 +147,20 @@ def foo(a, b, c): return a + b + c # push all arguments - name = self.context.apply(foo, (1, 2, 3)) - val = self.context._pull(name, targets=[0])[0] - - self.assertEqual(val, 6) + val = self.context.apply(foo, (1, 2, 3)) + self.assertEqual(val, [6]*4) # some local, some pushed local_thing = self.context._key_and_push(2)[0] - name = self.context.apply(foo, (1, local_thing, 3)) - val = self.context._pull(name, targets=[0])[0] + val = self.context.apply(foo, (1, local_thing, 3)) - self.assertEqual(val, 6) + self.assertEqual(val, [6]*4) # all pushed local_args = self.context._key_and_push(1, 2, 3) - name = self.context.apply(foo, local_args) - val = self.context._pull(name, targets=[0])[0] + val = self.context.apply(foo, local_args) - self.assertEqual(val, 6) + self.assertEqual(val, [6]*4) def test_apply_kwargs(self): @@ -175,31 +170,27 @@ def foo(a, b, c=None, d=None): return a + b + c + d # empty kwargs - name = self.context.apply(foo, (1, 2)) - val = self.context._pull(name, targets=[0])[0] + val = self.context.apply(foo, (1, 2)) - self.assertEqual(val, 0) + self.assertEqual(val, [0]*4) # some empty - name = self.context.apply(foo, (1, 2), {'d': 3}) - val = self.context._pull(name, targets=[0])[0] + val = self.context.apply(foo, (1, 2), {'d': 3}) - self.assertEqual(val, 5) + self.assertEqual(val, [5]*4) # all kwargs - name = self.context.apply(foo, (1, 2), {'c': 2, 'd': 3}) - val = self.context._pull(name, targets=[0])[0] + val = self.context.apply(foo, (1, 2), {'c': 2, 'd': 3}) - self.assertEqual(val, 8) + self.assertEqual(val, [8]*4) # now with local values local_a = self.context._key_and_push(1)[0] local_c = self.context._key_and_push(3)[0] - name = self.context.apply(foo, (local_a, 2), {'c': local_c, 'd': 3}) - val = self.context._pull(name, targets=[0])[0] + val = self.context.apply(foo, (local_a, 2), {'c': local_c, 'd': 3}) - self.assertEqual(val, 9) + self.assertEqual(val, [9]*4) def test_apply_return_val(self): @@ -207,7 +198,12 @@ def foo(a, b, c=None): c = 3 if c is None else c return a + b + c - val = self.context.apply(foo, (1, 2), {'c': 5}, return_name=False) + name = self.context.apply(foo, (1, 2), {'c': 5}, result_name='test') + + self.assertEqual(name, 'test') + + val = self.context._pull(name) + self.assertEqual(val, [8]*len(self.context.targets)) From 4336a7861b9b8d672d7fef289557661d1980412f Mon Sep 17 00:00:00 2001 From: Blake Griffith Date: Tue, 6 May 2014 13:29:53 -0500 Subject: [PATCH 6/6] Refactor argument handling. --- distarray/dist/context.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/distarray/dist/context.py b/distarray/dist/context.py index 5003f0b4..c9120e0d 100644 --- a/distarray/dist/context.py +++ b/distarray/dist/context.py @@ -519,7 +519,7 @@ def apply(self, func, args=None, kwargs=None, targets=None, A list of the results on all the engines. """ - def func_wrapper(func, result_name, *args, **kwargs): + def func_wrapper(func, result_name, args, kwargs): """ Function which calls the applied function after grabbing all the arguments on the engines that are passed in as names of the form @@ -548,18 +548,14 @@ def func_wrapper(func, result_name, *args, **kwargs): return func(*args, **kwargs) # default arguments - if args is None: - args = (func, result_name) - else: - args = tuple([func, result_name] + list(args)) - + args = () if args is None else args kwargs = {} if kwargs is None else kwargs + wrapped_args = (func, result_name, args, kwargs) targets = self.targets if targets is None else targets - result = self.view._really_apply(func_wrapper, args=args, - kwargs=kwargs, targets=targets, - block=True) + result = self.view._really_apply(func_wrapper, args=wrapped_args, + targets=targets, block=True) if result_name is not None: # result is a list of the same name 4 times, so just return 1. return result[0]