Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
fdca505
Add basic slice support to positivify.
bgrant May 7, 2014
47e89ba
Add a docstring to positivify.
bgrant May 7, 2014
8b4d6d3
Add better support for slices to positivify.
bgrant May 7, 2014
29707fe
Merge branch 'master' into feature/add-slicing
bgrant May 20, 2014
3732ced
Fix indexing errors by using Integral instead of int.
bgrant May 20, 2014
4b8e155
Merge branch 'master' into feature/add-slicing
bgrant May 20, 2014
72b5ab8
WIP: Add failing slice test.
bgrant May 20, 2014
4c66e91
Add a tuple_intersection function to metadata_utils.
bgrant May 20, 2014
b6d0ae2
Add slice support to dist/maps (for BlockMap)...
bgrant May 20, 2014
9eb9a03
Add slice support to local/maps (for BlockMap).
bgrant May 20, 2014
9bdae51
Allow multiple results through.
bgrant May 20, 2014
74b8acf
Merge branch 'refactor/restrict-use-of-checked-getitem' into feature/…
bgrant May 21, 2014
ef49a24
Unwrap a docstring.
bgrant May 21, 2014
176cfd9
Slicing works for __getitem__.
bgrant May 21, 2014
5a9483a
Fix positivify's behavior with slices.
bgrant May 22, 2014
0fa6ac3
Don't test for int, test for Integral
bgrant May 22, 2014
d969417
Add `targets` arg to context.apply calls
bgrant May 22, 2014
f0ddae1
Factor sanitize_indices out into metadata_utils
bgrant May 22, 2014
0813be2
Fix positivify and add regression tests.
bgrant May 22, 2014
9459142
Get rid of reference to old `client_map` module
bgrant May 22, 2014
19c7d52
Add classmethod Distribution.from_slice.
bgrant May 22, 2014
35668f3
`__getitem__` slicing works!?
bgrant May 23, 2014
96367c9
Fix bug.
bgrant May 23, 2014
f5b0149
Make Distribution.from_slice into slice instancemethod
bgrant May 23, 2014
4b829ad
Move new method below constructors.
bgrant May 23, 2014
e37f472
Clean up Distribution slice tests.
bgrant May 23, 2014
42e64cc
Slightly expand a Distribution.slice test.
bgrant May 23, 2014
909a64f
Add a Distribution.slice test.
bgrant May 23, 2014
cb43abf
Generalize sanitize_indices for incomplete indexing.
bgrant May 23, 2014
61788f0
Fill out sanitize_indices docstring.
bgrant May 23, 2014
70195c0
Remove a competing sanitize_indices.
bgrant May 23, 2014
c2d63e2
Add more tests to test_distarray. Some fail.
bgrant May 23, 2014
514857b
Add a call to positivify.
bgrant May 23, 2014
b392a84
Call positivify in sanitize_indices...
bgrant May 23, 2014
a9f161c
Whitespace.
bgrant May 23, 2014
6f284a0
Call `sanitize_indices` with full args.
bgrant May 23, 2014
3287d2d
Rename a value more descriptively.
bgrant May 23, 2014
af94432
Add a local slicing test.
bgrant May 26, 2014
1a86d81
Fix the slicing bug.
bgrant May 26, 2014
f779fcd
Remove another skiptest.
bgrant May 26, 2014
f94b9e3
Fix the local slicing test after API change.
bgrant May 26, 2014
64845dd
Fix output dimensionality.
bgrant May 26, 2014
03eef23
Unskip the last test.
bgrant May 26, 2014
daa6d7b
Line wrap.
bgrant May 26, 2014
a3c6efc
Remove a debugging statement.
bgrant May 26, 2014
78277dd
Preserve no-dist maps.
bgrant May 26, 2014
cb22426
Merge branch 'master' into feature/add-slicing
bgrant May 26, 2014
608d0ba
Test dist_type preservation.
bgrant May 26, 2014
857ba3b
Merge branch 'master' into feature/add-slicing
bgrant May 29, 2014
af1e916
Add a slice method to individual client Map types.
bgrant May 29, 2014
9db6153
Merge branch 'master' into feature/add-slicing
bgrant Jun 5, 2014
eb5d98b
Handle an error in `localarray.__getitem__` sooner.
bgrant Jun 9, 2014
688ba54
Merge branch 'master' into feature/add-slicing
bgrant Jun 12, 2014
d62eca2
Rename owners -> index_owners.
bgrant Jun 12, 2014
9a1f7ec
Split slice_owners off of index_owners.
bgrant Jun 12, 2014
911925b
Split local_from_global and the inverse...
bgrant Jun 12, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 70 additions & 47 deletions distarray/dist/distarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import operator
from itertools import product
from functools import reduce
from collections import Sequence

import numpy as np

import distarray
from distarray.metadata_utils import sanitize_indices
from distarray.dist.maps import Distribution
from distarray.utils import _raise_nie
from distarray.metadata_utils import normalize_reduction_axes
Expand All @@ -32,7 +34,6 @@
# Code
# ---------------------------------------------------------------------------


class DistArray(object):

__array_priority__ = 20.0
Expand Down Expand Up @@ -84,7 +85,8 @@ def get_dim_datas_and_dtype(arr):

# has context, get dist and dtype
elif (distribution is None) and (dtype is None):
res = context.apply(get_dim_datas_and_dtype, args=(key,))
res = context.apply(get_dim_datas_and_dtype, args=(key,),
targets=targets)
dim_datas = [i[0] for i in res]
dtypes = [i[1] for i in res]
da._dtype = dtypes[0]
Expand All @@ -95,7 +97,8 @@ def get_dim_datas_and_dtype(arr):
# has context and dtype, get dist
elif (distribution is None) and (dtype is not None):
da._dtype = dtype
dim_datas = context.apply(getattr, args=(key, 'dim_data'))
dim_datas = context.apply(getattr, args=(key, 'dim_data'),
targets=targets)
da.distribution = Distribution.from_dim_data_per_rank(context,
dim_datas,
targets)
Expand Down Expand Up @@ -128,10 +131,31 @@ def __repr__(self):
(self.shape, self.targets)
return s

def _process_return_value(self, result, return_proxy, index, targets):

if return_proxy:
# proxy returned as result of slice
# slicing shouldn't alter the dtype
result = result[0]
return DistArray.from_localarrays(key=result,
context=self.context,
targets=targets,
dtype=self.dtype)

elif isinstance(result, Sequence):
somethings = [i for i in result if i is not None]
if len(somethings) == 0:
# using checked_getitem and all return None
raise IndexError("Index %r is is not present." % (index,))
if len(somethings) == 1:
return somethings[0]
else:
return result
else:
assert False # impossible is nothing


def __getitem__(self, index):
#TODO: FIXME: major performance improvements possible here,
# especially for special cases like `index == slice(None)`.
# This would dramatically improve tondarray's performance.

# to be run locally
def checked_getitem(arr, index):
Expand All @@ -141,30 +165,34 @@ def checked_getitem(arr, index):
def raw_getitem(arr, index):
return arr.global_index[index]

if isinstance(index, int) or isinstance(index, slice):
tuple_index = (index,)
return self.__getitem__(tuple_index)
# to be run locally
def get_slice(arr, index, ddpr, comm):
from distarray.local.maps import Distribution
local_distribution = Distribution(comm=comm,
dim_data=ddpr[comm.Get_rank()])
result = arr.global_index.get_slice(index, local_distribution)
return proxyize(result)

return_type, index = sanitize_indices(index, ndim=self.ndim,
shape=self.shape)
return_proxy = (return_type == 'view')
targets = self.distribution.owning_targets(index)

args = [self.key, index]
if self.distribution.has_precise_index:
if return_proxy: # returning a new DistArray view
new_distribution = self.distribution.slice(index)
ddpr = new_distribution.get_dim_data_per_rank()
args.extend([ddpr, new_distribution.comm])
local_fn = get_slice
else: # returning a value
local_fn = raw_getitem
else: # returning a value from unstructured
local_fn = checked_getitem

result = self.context.apply(local_fn, args=args, targets=targets)
return self._process_return_value(result, return_proxy, index, targets)

elif isinstance(index, tuple):
targets = self.distribution.owning_targets(index)

args = (self.key, index)
if self.distribution.has_precise_index:
result = self.context.apply(raw_getitem, args=args,
targets=targets)
else:
result = self.context.apply(checked_getitem, args=args,
targets=targets)
result = [i for i in result if i is not None]
if len(result) != 1:
raise IndexError("Getting more than one result (%s) is not "
"supported yet." % (result,))
elif result is None:
raise IndexError("Index %r is out of bounds" % (index,))
else:
return result[0]
else:
raise TypeError("Invalid index type.")

def __setitem__(self, index, value):
#TODO: FIXME: major performance improvements possible here.
Expand All @@ -181,26 +209,21 @@ def checked_setitem(arr, index, value):
def raw_setitem(arr, index, value):
arr.global_index[index] = value

if isinstance(index, int) or isinstance(index, slice):
tuple_index = (index,)
return self.__setitem__(tuple_index, value)
_, index = sanitize_indices(index, ndim=self.ndim, shape=self.shape)

elif isinstance(index, tuple):
targets = self.distribution.owning_targets(index)
args = (self.key, index, value)
if self.distribution.has_precise_index:
self.context.apply(raw_setitem, args=args, targets=targets)
else:
result = self.context.apply(checked_setitem, args=args,
targets=targets)
result = [i for i in result if i is not None]
if len(result) > 1:
raise IndexError("Setting more than one result (%s) is "
"not supported yet." % (result,))
elif result == []:
raise IndexError("Index %s is out of bounds" % (index,))
targets = self.distribution.owning_targets(index)
args = (self.key, index, value)
if self.distribution.has_precise_index:
self.context.apply(raw_setitem, args=args, targets=targets)
else:
raise TypeError("Invalid index type.")
result = self.context.apply(checked_setitem, args=args,
targets=targets)
result = [i for i in result if i is not None]
if len(result) > 1:
raise IndexError("Setting more than one result (%s) is "
"not supported yet." % (result,))
elif result == []:
raise IndexError("Index %s is out of bounds" % (index,))

@property
def context(self):
Expand Down
97 changes: 84 additions & 13 deletions distarray/dist/maps.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import operator
from itertools import product
from abc import ABCMeta, abstractmethod
from numbers import Integral

import numpy as np

Expand All @@ -34,11 +35,12 @@
from distarray.utils import remove_elements
from distarray.metadata_utils import (normalize_dist,
normalize_grid_shape,
make_grid_shape,
positivify,
_start_stop_block,
normalize_dim_dict,
normalize_reduction_axes,
make_grid_shape,
sanitize_indices,
_start_stop_block,
tuple_intersection,
shapes_from_dim_data_per_rank)


Expand Down Expand Up @@ -137,13 +139,13 @@ class MapBase(object):
dimension of a distributed array. Maps allow distributed arrays to keep
track of which process to talk to when indexing and slicing.

Classes that inherit from `MapBase` must implement the `owners()`
Classes that inherit from `MapBase` must implement the `index_owners()`
abstractmethod.

"""

@abstractmethod
def owners(self, idx):
def index_owners(self, idx):
""" Returns a list of process IDs in this dimension that might possibly
own `idx`.

Expand Down Expand Up @@ -194,9 +196,12 @@ def __init__(self, size, grid_size):
self.size = size
self.grid_size = grid_size

def owners(self, idx):
def index_owners(self, idx):
    """Return ``[0]`` if `idx` lies in ``[0, self.size)``, else ``[]``."""
    if 0 <= idx < self.size:
        return [0]
    return []

def slice_owners(self, idx):
    """Return the grid coordinates to ask for the slice `idx`.

    The answer is always ``[0]``: no bounds check is made here because
    slicing doesn't complain about out-of-bounds indices.
    """
    return [0]

def get_dimdicts(self):
return ({
'dist_type': 'n',
Expand All @@ -205,6 +210,19 @@ def get_dimdicts(self):
'proc_grid_rank': 0,
},)

def slice(self, idx):
    """Describe the dimension obtained by slicing this one with `idx`.

    A missing ``idx.start`` defaults to 0 and a missing ``idx.stop`` to
    ``self.size``; ``idx.step`` is not consulted.  The returned dict
    carries this map's 'dist_type' and a 'size' equal to the length of
    whatever `tuple_intersection` reports for ``(0, self.size)`` and the
    slice extent, or 0 when that result is falsy.
    """
    lo = 0 if idx.start is None else idx.start
    hi = self.size if idx.stop is None else idx.stop
    overlap = tuple_intersection((0, self.size), (lo, hi))
    new_size = overlap[1] - overlap[0] if overlap else 0
    return {'dist_type': self.dist, 'size': new_size}


class BlockMap(MapBase):

Expand Down Expand Up @@ -254,13 +272,25 @@ def __init__(self, size, grid_size):
for grid_rank in range(grid_size)]
self.boundary_padding = self.comm_padding = 0

def owners(self, idx):
def index_owners(self, idx):
    """Return every grid coordinate whose ``[lower, upper)`` bound holds `idx`.

    The coordinates are positions in ``self.bounds``; the list is empty
    when no bound contains `idx`.
    """
    return [coord
            for coord, (lower, upper) in enumerate(self.bounds)
            if lower <= idx < upper]

def slice_owners(self, idx):
    """Return the grid coordinates whose bounds overlap the slice `idx`.

    A missing ``idx.start`` defaults to 0 and a missing ``idx.stop`` to
    ``self.size``.  Overlap is decided by `tuple_intersection`; when no
    bound overlaps, ``[0]`` is returned rather than an empty list.

    Raises
    ------
    NotImplementedError
        If ``idx.step`` is anything other than None or 1.
    """
    if idx.step not in {None, 1}:
        msg = "Slicing only implemented for step=1"
        raise NotImplementedError(msg)

    # The slice extent does not depend on the process being examined, so
    # build it once instead of on every loop iteration.
    slice_tuple = (0 if idx.start is None else idx.start,
                   self.size if idx.stop is None else idx.stop)
    coords = [coord
              for coord, (lower, upper) in enumerate(self.bounds)
              if tuple_intersection((lower, upper), slice_tuple)]
    return coords if coords else [0]

def get_dimdicts(self):
grid_ranks = range(len(self.bounds))
cpadding = self.comm_padding
Expand All @@ -282,6 +312,22 @@ def get_dimdicts(self):
})
return tuple(out)

def slice(self, idx):
    """Describe the dimension obtained by slicing this one with `idx`.

    Returns a dict with this map's 'dist_type' and a cumulative 'bounds'
    list that starts at 0 and gains one entry per process whose bound
    has a truthy `tuple_intersection` with the slice extent.
    ``idx.step`` is not consulted.
    """
    first = 0 if idx.start is None else idx.start
    cumulative = [0]
    # walk the processes in this dimension
    for proc_start, proc_stop in self.bounds:
        # With no explicit stop, the slice is taken to reach the end of
        # whichever process is being examined.
        last = proc_stop if idx.stop is None else idx.stop
        overlap = tuple_intersection((proc_start, proc_stop), (first, last))
        if not overlap:
            continue
        size = overlap[1] - overlap[0]
        cumulative.append(size + cumulative[-1])

    return {'dist_type': self.dist, 'bounds': cumulative}


class BlockCyclicMap(MapBase):

Expand Down Expand Up @@ -317,7 +363,7 @@ def __init__(self, size, grid_size, block_size=1):
self.grid_size = grid_size
self.block_size = block_size

def owners(self, idx):
def index_owners(self, idx):
    """Return a one-element list with the grid coordinate for `idx`.

    `idx` is first reduced to a block number by floor-dividing by
    ``self.block_size``; that number modulo ``self.grid_size`` is the
    coordinate.
    """
    return [(idx // self.block_size) % self.grid_size]

Expand Down Expand Up @@ -367,13 +413,13 @@ def __init__(self, size, grid_size, indices=None):
if self.indices is not None:
# Convert to NumPy arrays if not already.
self.indices = [np.asarray(ind) for ind in self.indices]
self._owners = range(self.grid_size)
self._index_owners = range(self.grid_size)

def owners(self, idx):
def index_owners(self, idx):
# TODO: FIXME: for now, the unstructured map just returns all
# processes. Can be optimized if we know the upper and lower bounds
# for each local array's global indices.
return self._owners
return self._index_owners

def get_dimdicts(self):
if self.indices is None:
Expand Down Expand Up @@ -607,6 +653,24 @@ def has_precise_index(self):
"""
return not any(isinstance(m, UnstructuredMap) for m in self.maps)

def slice(self, index_tuple):
    """Build the Distribution that results from indexing with `index_tuple`.

    Each entry of `index_tuple` is paired with the map of the same
    dimension.  Slice entries contribute that map's ``slice(idx)`` to the
    new global dim data; Integral entries contribute nothing, so the
    dimension is dropped.  The new object is created with this
    distribution's context and the targets reported by
    ``self.owning_targets(index_tuple)``.

    Raises
    ------
    TypeError
        If an entry is neither an Integral nor a slice.
    """
    new_targets = self.owning_targets(index_tuple)
    global_dim_data = []
    # one (map, index) pair per dimension
    for map_, idx in zip(self.maps, index_tuple):
        if isinstance(idx, slice):
            global_dim_data.append(map_.slice(idx))
        elif not isinstance(idx, Integral):
            msg = "Index must be a sequence of Integrals and slices."
            raise TypeError(msg)
        # otherwise: integral indexing returns reduced dimensionality

    cls = self.__class__
    return cls(context=self.context,
               global_dim_data=global_dim_data,
               targets=new_targets)

def owning_ranks(self, idxs):
""" Returns a list of ranks that may *possibly* own the location in the
`idxs` tuple.
Expand All @@ -618,8 +682,15 @@ def owning_ranks(self, idxs):

If the `idxs` tuple is out of bounds, raises `IndexError`.
"""
idxs = map(positivify, idxs, self.shape) # positivify and check
dim_coord_hits = [m.owners(idx) for (m, idx) in zip(self.maps, idxs)]
_, idxs = sanitize_indices(idxs, ndim=self.ndim, shape=self.shape)
dim_coord_hits = []
for m, idx in zip(self.maps, idxs):
if isinstance(idx, Integral):
owners = m.index_owners(idx)
elif isinstance(idx, slice):
owners = m.slice_owners(idx)
dim_coord_hits.append(owners)

all_coords = product(*dim_coord_hits)
ranks = [self.rank_from_coords[c] for c in all_coords]
return ranks
Expand Down
Loading