diff --git a/README.md b/README.md
index d2350fb..002d628 100644
--- a/README.md
+++ b/README.md
@@ -231,7 +231,7 @@ Current status of Ramba compatibility with NumPy APIs. Key: 🟢 works
 | | rearrange elements | 🔴 not implemented | |
 |Index/slice | range slice | 🟢 works | produces view like in numpy; steps > 1 and negative steps are supported |
 | | masked arrays | 🟡 partial | only in assignments / in-place operations / reductions; see docs for details
-| | fancy indexing | 🟡 partial | fancy/advanced indexing using an array of indices is very expensive in a distributed context; See docs for details/limitations.
+| | fancy indexing | 🟢 mostly works | fancy/advanced indexing using an array of indices is very expensive in a distributed context; see docs for details/limitations.
 | | index routines | 🔴 not implemented | ("where" partly works)
 |Math | arithmetic operations | 🟢 works | +, -, +=, //, etc.
 | | comparisons | 🟢 works |
diff --git a/docs/index.md b/docs/index.md
index 385b952..3c085f9 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -65,10 +65,10 @@ corresponding elements of the array should be updated; other elements remain unchanged.
 (b) When the masked array appears in any other expression, an output 1D array is constructed, containing
 all of the elements for which the mask is True. The output is always 1D and a copy, regardless of the
 dimensionality of the original array.
-Ramba currently only supports the first use case.
+Ramba currently only supports the first use case. Mixing mask indexing with slices, fancy indexing, etc., is not supported.
 
 ### Fancy / Advanced Indexing
-Ramba now supports fancy/advanced indexing of an array with array of index values. Although this works, in a distributed context this is a very expensive operation. The result is always a copy, and may require significant communication betweeen nodes. The result array will attempt to match the distribution of the indexing array, or use a clean default distribution if there is no distributed index.
+Ramba now supports fancy/advanced indexing of an array with an array of index values. Although this works, in a distributed context it is a very expensive operation. When indexing for read, the result is always a copy and may require significant communication between nodes. The result array will attempt to match the distribution of the indexing array, or use a clean default distribution if there is no distributed index. When setting the array using advanced indexing, a view is used. If more than one term refers to the same element of the array, then the result is unpredictable (due to parallel execution); this is unlike numpy, where the "last" value set wins. Mixing advanced indexing on one axis with simple indexing, slices, "None", ellipses, etc. on others is also supported. Supplying index arrays for multiple axes is supported (as long as the arrays can broadcast together, as in Numpy). However, in the current implementation, at most one of the index arrays can be a distributed Ramba array -- the others must be nondistributed Numpy arrays. Note that the precise rules used in Numpy when mixing these indexing types are a bit arcane. Ramba tries to match them, but the position of the dimensions corresponding to the index arrays in the output shape may differ from Numpy when mixing broadcasting of indexing arrays, None, and slices in the same operation.
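For reference, a minimal usage sketch of the semantics described in the docs hunk above. This is not part of the patch; it assumes the usual `import ramba` and the NumPy-compatible `arange`/`fromfunction` constructors used in the test suite:

    import ramba

    a = ramba.arange(500)   # distributed 1D array
    idx = ramba.fromfunction(lambda i, j: i + j * 100, (3, 4), dtype=int)

    # Read: always a copy, shaped like the (broadcast) index array;
    # may require communication between nodes.
    b = a[idx]              # shape (3, 4)

    # Write: applied through a view of `a`.  If idx contained duplicate
    # entries, the winning value would be unpredictable (unlike NumPy).
    a[idx] = 1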
diff --git a/ramba/ramba.py b/ramba/ramba.py
index 6f59fd0..1dcf85e 100644
--- a/ramba/ramba.py
+++ b/ramba/ramba.py
@@ -6062,31 +6062,138 @@ def array_binop(
     @classmethod
-    def setitem_executor(cls, temp_array, self, key, value):
-        dprint(1, "ndarray::__setitem__:", key, type(key), value, type(value))
+    def setitem_array_executor(cls, temp_array, self, index, value):
+        dprint(1, "ndarray::__setitem__:", index, type(index), value, type(value))
         if self.readonly:
             raise ValueError("assignment destination is read-only")
-        #if self.shape == ():  # 0d case
-        #    print("setitem 0d:", key, type(key))
-        #    # Is there a better way of testing and performing type conversion?
-        #    tmp = np.empty((), dtype=self.dtype)
-        #    tmp[()] = value
-        #    self.distribution = tmp
-        #    return
-
+        # index is a mask ndarray -- boolean type with same shape as array (or broadcastable to that shape)
         is_mask = False
-        if isinstance(key, ndarray) and key.dtype==bool and key.broadcastable_to(self.shape):
-            is_mask = True
-        elif not isinstance(key, tuple):
-            key = (key,)
+        if isinstance(index, ndarray) and index.dtype==bool:
+            if index.broadcastable_to(self.shape):
+                is_mask=True
+            else:
+                raise IndexError("Mask index shape does not match array shape")
+
+        index_has_slice = not is_mask and builtins.any([isinstance(i, slice) for i in index])
+        index_has_array = not is_mask and builtins.any([isinstance(i, (tuple, list, np.ndarray, ndarray)) for i in index])
+        index_has_ndarray = not is_mask and builtins.any([isinstance(i, ndarray) for i in index])
+
+        if isinstance(value, (list, tuple)):
+            try:
+                value = np.array(value)
+            except:
+                raise ValueError("Cannot convert sequences to array")
+
+        if index_has_array:
+            # This is the advanced indexing case.
+            dim_sizes, preindex, postindind, arrayind, dist = dim_sizes_from_index(index, self.shape)
+            dst_arr = manual_idx_ndarray(self[preindex])
+            print(f"HERE: {dst_arr.shape} {dim_sizes} {dst_arr.distribution}")
+            if isinstance(value, np.ndarray):
+                value = array(value)
+            if isinstance(value, ndarray):
+                if not value.broadcastable_to(dim_sizes):
+                    raise ValueError(f"Mismatched shapes during fancy indexing assignment {dim_sizes} {value.shape}")
+                value = value.broadcast_to(dim_sizes)
+                # if index has an ndarray and value dist does not match, copy value to array with matching dist
+                if index_has_ndarray and not shardview.compatible_distributions(dist, value.distribution):
+                    tmp = empty(dim_sizes, dtype=value.dtype, distribution=dist)
+                    tmp[...] = value
+                    value = tmp
+                value.instantiate()
+                valueind = value
+                valuetype = np.array(0, dtype=value.dtype)[()]
+            else:  # value is a scalar; distribute computation according to ndarray in index (or default dist)
+                valueind = empty(dim_sizes, distribution=dist)  # construct array with desired dist; wasteful but works
+                valueind.instantiate()  # is this needed?
+                valuetype = value
+
+            dst_arr_dist = deferred_op.get_temp_var()  # temp variable to store distribution of target array
+            deferred_op.add_op(["#",valueind], valueind, precode=["", dst_arr_dist,"=",dst_arr.distribution])  # make sure that value/indexing array is first array and used for loop size; copy distribution (numpy array) so it does not get passed in multiple times
+            # construct indexing tuples
+            ind_arr = deferred_op.get_temp_var()  # temporary variable to hold constructed global index tuple
+            arrindstr = ",".join([f"index[{i}]+global_start[{i}]" for i in range(arrayind.start,arrayind.stop)])
+            indop = ["", ind_arr, "= ("]
+            for i in postindind:
+                if isinstance(i, numbers.Integral):
+                    indop[-1]+=f"index[{i}]+global_start[{i}], "
+                elif isinstance(i, ndarray):
+                    indop[-1]+=f"int("
+                    indop.append(i)
+                    indop.append("), ")
+                else:
+                    indop[-1]+=f"int("
+                    indop.append(i)
+                    indop.append(f"[({arrindstr})]), ")
+            indop[-1]+=")"
+            deferred_op.add_op(indop, valueind)
+            ind_arr_local = deferred_op.get_temp_var()  # temporary variable to hold constructed local index tuple
+            indop_local = ["", ind_arr_local, "= ("]
+            for i in range(len(postindind)):
+                indop_local.extend([ind_arr, f"[{i}]-",dst_arr_dist, f"[worker_num,1,{i}], "])
+            indop_local[-1]+=")"
+            deferred_op.add_op(indop_local, valueind)
+            # if index is in local range of source array, copy value to output array
+            deferred_op.add_op(["if shardview.has_index(", dst_arr_dist,"[worker_num],",ind_arr, "): ", dst_arr, "[", ind_arr_local, "] = ", value], valueind)
+            # otherwise, add to list of items to push to remote node
+            deferred_op.add_op(["else:"],valueind)
+            nodeid = deferred_op.get_temp_var()
+            deferred_op.add_op(["    ", nodeid,"=shardview.find_index(", dst_arr_dist,",",ind_arr,")"], valueind)
+            need_comm = manual_idx_ndarray((num_workers,num_workers), dtype=bool, dist_dims=0, flex_dist=False)
+            deferred_op.add_op(["    ", need_comm, "[0][int(", nodeid, ")]=True"], valueind,
+                    precode=["for i in range(num_workers): ", need_comm, "[0,i]=False"] )
+            comm = deferred_op.get_temp_var()
+            deferred_op.add_op(["    ", comm, "[", nodeid, "].append(list(", ind_arr, "))"], valueind,
+                    precode=["", comm,"=[[[", dst_arr,".shape[0]]]*0 for _ in range(num_workers)]"])
+            vals = deferred_op.get_temp_var()
+            deferred_op.add_op(["    ", vals, "[", nodeid, "].append(", value, ")"], valueind,
+                    precode=["", vals,"=[[", valuetype,"]*0 for _ in range(num_workers)]"])
+            # prepare messages
+            deferred_op.add_op(["#"], valueind, postcode=["for i in range(num_workers):"] )
+            tmp_arr = deferred_op.get_temp_var()
+            tmp_arr2 = deferred_op.get_temp_var()
+            indextypestr = 'int64' if ramba_big_data else 'int32'
+            valuetypestr = str(np.array(valuetype).dtype)
+            deferred_op.add_op(["#"], valueind, postcode=["    if not ", need_comm,"[0,i]: continue"] )
+            deferred_op.add_op(["#"], valueind, postcode=["    ",tmp_arr,"=np.array(", comm,"[i], dtype=np."+indextypestr+")"] )
+            deferred_op.add_op(["#"], valueind, postcode=["    with numba.objmode(): add_comm_msg(i,", tmp_arr, ")"],
+                    precode=["with numba.objmode(): init_comm(num_workers)"] )
+            deferred_op.add_op(["#"], valueind, postcode=["    ",tmp_arr2,"=np.array(", vals,"[i], dtype=np."+valuetypestr+")"] )
+            deferred_op.add_op(["#"], valueind, postcode=["    with numba.objmode(): add_comm_msg(i,", tmp_arr2, ")"])
-        if not is_mask:
-            key = tuple([self.handle_0d_index(ind) for ind in key])
+            # Exchange messages
+            need_comm = need_comm.asarray()
+            print("need_comm array: ",need_comm)
+            remote_exec_all("all2all", uuid.uuid4(), need_comm)
-        if is_mask or builtins.any([isinstance(i, slice) or i is Ellipsis for i in key]) or len(key) < len(self.shape):
-            view = self[key]
-            if isinstance(value, (int, bool, float, complex, np.generic)):
+            # Fill in from received data
+            dst_arr_dist = deferred_op.get_temp_var()  # temp variable to store distribution of target array
+            deferred_op.add_op(["#",valueind], valueind, precode=["", dst_arr_dist,"=",dst_arr.distribution])
+            deferred_op.add_op(["#"], valueind, precode=["for i in range(num_workers):"])
+            deferred_op.add_op(["#"], valueind, precode=["    if not ", need_comm.T, "[worker_num,i]: continue"])
+            req_arr = deferred_op.get_temp_var()
+            deferred_op.add_op(["#"], valueind, precode=["    with numba.objmode(", req_arr,"='"+indextypestr+"[:,:]'): ", req_arr, "=get_comm_msg(i,0)"])
+            reply_arr = deferred_op.get_temp_var()
+            deferred_op.add_op(["#"], valueind, precode=["    with numba.objmode(", reply_arr,"='"+valuetypestr+"[:]'): ", reply_arr, "=get_comm_msg(i,1)"])
+            deferred_op.add_op(["#"], valueind, precode=["    for j in numba.prange(", req_arr, ".shape[0]):"])
+            ind_arr_local = deferred_op.get_temp_var()  # temporary variable to hold constructed local index tuple
+            indop_local = ["        ", ind_arr_local, "= ("]
+            for i in range(len(postindind)):
+                indop_local.extend([req_arr, f"[j,{i}]-",dst_arr_dist, f"[worker_num,1,{i}], "])
+            indop_local[-1]+=")"
+            deferred_op.add_op(["#"], valueind, precode=indop_local)
+            deferred_op.add_op(["#"], valueind, precode=["        ", dst_arr, "[", ind_arr_local, "]=", reply_arr,"[j]"])
+            deferred_op.add_op(["pass #"], valueind, precode=["return"])
+            deferred_op.do_ops()
+
+            # Done!
+            return
+
+        # This case applies if this is a masked array, if any index is a slice, or if there are fewer indices than array dimensions.
+        elif is_mask or index_has_slice or len(index) < len(self.shape):
+            view = self[index]
+            if isinstance(value, (int, bool, float, complex, np.generic)):  # non-array value
                 deferred_op.add_op(["", view, " = ", value, ""], view)
                 return
             if isinstance(value, np.ndarray):
@@ -6102,62 +6209,118 @@ setitem_executor(cls, temp_array, self, key, value):
                 deferred_op.add_op(["", view, " = ", value, ""], view)
                 return
             else:
-                # TODO: Should try to broadcast value to view.shape before giving up; Done?
                 print("Mismatched shapes", view.shape, value.shape)
                 assert 0
-        # Need to handle all possible remaining cases.
- print("Don't know how to set index", key, " of dist array of shape", self.shape) - assert 0 + print("Don't know how to gset index", index, type(index), " of dist array of shape", self.shape) + assert 0 # Handle other types @DAGapi - def setitem(self, key, value): - key = self.handle_0d_index(key) - if isinstance(key, (ndarray, np.ndarray)): + def setitem_array(self, index, value): + # index is a mask ndarray -- boolean type with same shape as array (or broadcastable to that shape) + #if isinstance(index, ndarray) and index.dtype==bool and index.broadcastable_to(self.shape): + # return DAGshape(self.shape, self.dtype, False) + if isinstance(index, ndarray) and index.dtype==bool: + if index.broadcastable_to(self.shape): + return DAGshape(self.shape, self.dtype, self) + else: + raise IndexError("Mask index shape does not match array shape") + + if len(index) > self.ndim: + raise IndexError(f"too many indices for array: array is {self.ndim}-dimensional, but {len(index)} were indexed") + + index_has_slice = builtins.any([isinstance(i, slice) for i in index]) + index_has_array = builtins.any([isinstance(i, (tuple, list, np.ndarray, ndarray)) for i in index]) + + if index_has_array: + # This is the advanced indexing case. + dim_sizes,_,_,_,_ = dim_sizes_from_index(index, self.shape) return DAGshape(self.shape, self.dtype, self) - if not isinstance(key, tuple): - key = (key,) - # Convert various Integral types to plain Python int. - key = tuple([int(x) if not isinstance(x, (bool, np.bool_)) and isinstance(x, numbers.Integral) else x for x in key]) - key = tuple([self.handle_0d_index(ind) for ind in key]) - key2 = tuple(i for i in key if i is not Ellipsis) - if len(key2)+11: raise IndexError("an index can only have a single ellipsis ('...')") - if builtins.all([isinstance(i, numbers.Integral) for i in key2]): - if len(key2) > self.ndim: - raise IndexError(f"too many indices for array: array is {self.ndim}-dimensional, but {len(key2)} were indexed") - elif len(key2) == self.ndim: - dprint(1, "ndarray::__setitem__:", key, type(key), value, type(value)) + if len(ellpos)==1: + ellpos = ellpos[0] + preind = index[:ellpos] + postind = index[ellpos+1:] + index = preind + tuple( slice(None) for _ in range(self.ndim - len(index) + 1 + index.count(None)) ) + postind + + # Handle None; np.newaxis is an alias for None + if builtins.any([x is None for x in index]): + newdims = [i for i in range(len(index)) if index[i] is None] + newindex = tuple([i if i is not None else slice(None) for i in index]) + expanded_array = expand_dims(self, newdims) + #print(self.shape, newdims, expanded_array.shape, newindex) + return expanded_array.setitem(newindex, value) + + # If all the indices are integers and the number of indices equals the number of array dimensions. + if builtins.all([isinstance(i, numbers.Integral) for i in index]): + if len(index) > len(self.shape): + raise IndexError(f"too many indices for array: array is {self.ndim}-dimensional, but {len(index)} were indexed") + elif len(index) == len(self.shape): + dprint(1, "ndarray::__setitem__:", index, type(index), value, type(value)) if isinstance(value, (list, tuple)): raise ValueError("setting an array element with a sequence") if isinstance(value, (ndarray, np.ndarray)): if value.size!=1: raise ValueError("setting an array element with an array size>1") value = value[tuple(0 for _ in range(value.ndim))] - # Is there a better way of testing and performing type conversion? 
-                tmp = np.empty((), dtype=self.dtype)
-                tmp[()] = value
-                value = tmp[()]
+                dprint(2, "before setitem_real instantiate")
                 self.instantiate()
-                if self.shape == ():  # 0d case
-                    dprint(2,"setitem 0d:", key, type(key))
-                    self.distribution = tmp
-                    return
                 if self.readonly:
                     raise ValueError("assignment destination is read-only")
-                index = canonical_index(key2, self.shape, allslice=False)
+                # 0d case
+                if self.shape==():
+                    dprint(2,"setitem 0d:", index, type(index))
+                    self.distribution[()] = value
+                    return self.distribution
+
+                index = canonical_index(index, self.shape, allslice=False)
                 owner = shardview.find_index(self.distribution, index)
                 dprint(2, "owner:", owner)
                 remote_exec( owner, "setitem_global", self.gid, index, value, self.distribution[owner])
                 return
-        return DAGshape(self.shape, self.dtype, self)
+        return self.setitem_array(index, value)
 
     def __setitem__(self, key, value):
-        return self.setitem(key, value)
+        return self.setitem_real(key, value)
 
     def __getitem__(self, index):
         return self.getitem_real(index)
@@ -6260,7 +6423,7 @@ def getitem_array_executor(cls, temp_array, self, index):
             deferred_op.add_op(["#"], dst_arr, precode=indop_local)
             deferred_op.add_op(["#"], dst_arr, precode=["        ", reply_arr, "[j]=", src_arr, "[", ind_arr_local, "]"])
             deferred_op.add_op(["#"], dst_arr, precode=["    with numba.objmode(): add_comm_msg(i,", reply_arr,")"])
-            deferred_op.add_op(["",dst_arr], dst_arr, precode=["return"])
+            deferred_op.add_op(["pass #",dst_arr], dst_arr, precode=["return"])
             deferred_op.do_ops()
 
             # Exchange messages
@@ -6281,7 +6444,7 @@ def getitem_array_executor(cls, temp_array, self, index):
             indop_local[-1]+=")"
             deferred_op.add_op(["#"], dst_arr, precode=indop_local)
             deferred_op.add_op(["#"], dst_arr, precode=["        ", dst_arr, "[", ind_arr_local, "]=", reply_arr,"[j]"])
-            deferred_op.add_op(["",dst_arr], dst_arr, precode=["return"])
+            deferred_op.add_op(["pass #",dst_arr], dst_arr, precode=["return"])
             deferred_op.do_ops()
 
             # Done!
@@ -6370,13 +6533,6 @@ def getitem_real(self, index):
 
         if not isinstance(index, tuple):
             index = (index,)
-        if index == ():
-            if self.shape == ():  # 0d case
-                self.instantiate()
-                return self.distribution[()]
-            else:
-                index=(...,)
-
         index = tuple([self.handle_0d_index(ind) for ind in index])
 
         # Handle Ellipsis -- it can occur in any position, and may be combined with np.newaxis/None
diff --git a/ramba/tests/test_distributed_array.py b/ramba/tests/test_distributed_array.py
index fbe8c20..452f72a 100644
--- a/ramba/tests/test_distributed_array.py
+++ b/ramba/tests/test_distributed_array.py
@@ -888,6 +888,29 @@ def impl(app):
 
         run_both(impl)
 
+    def test_fancy_indexing3(self):
+        # Test advanced indexing -- setitem to scalar
+        def impl(app):
+            a = app.arange(500)
+            b = a[::2]
+            c = app.fromfunction(lambda i,j: i+j*100, (50,3), dtype=int)
+            b[c] = 1
+            return a
+
+        run_both(impl)
+
+    def test_fancy_indexing4(self):
+        # Test advanced indexing -- setitem to dist array with mismatched distribution
+        def impl(app):
+            a = app.arange(500)
+            b = a[::2]
+            c = app.fromfunction(lambda i,j: i+j*100, (50,3), dtype=int)
+            d = app.fromfunction(lambda i,j: (i-j), (50,100), dtype=int)
+            b[c] = d[:,48:51]
+            return a
+
+        run_both(impl)
+
     def test_smap1(self):
         a = ramba.arange(100)
         b = ramba.smap("lambda x: 3*x-7", a)
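The core pattern in `setitem_array_executor` above: each worker walks the portion of the value/index arrays it owns, writes directly when the destination index falls in its local shard, and otherwise buckets (global index, value) pairs per destination worker; the buckets are then exchanged (the "all2all" step) and applied by their owners. Below is a single-process toy sketch of that pattern, not the patch's code; `owner_of`, `shards`, `fancy_set`, and `apply_inbox` are hypothetical stand-ins for Ramba's `shardview` distribution logic:

    import numpy as np

    NUM_WORKERS = 4
    N = 100

    # Hypothetical block distribution: worker w owns [w*25, (w+1)*25).
    def owner_of(gidx):
        return gidx // (N // NUM_WORKERS)

    shards = [np.zeros(N // NUM_WORKERS, dtype=int) for _ in range(NUM_WORKERS)]

    def fancy_set(gidx_arr, values, my_worker):
        # Bucket writes by destination worker; apply local ones immediately.
        outbox = [[] for _ in range(NUM_WORKERS)]
        for g, v in zip(gidx_arr, values):
            w = owner_of(g)
            if w == my_worker:
                shards[w][g - w * (N // NUM_WORKERS)] = v
            else:
                outbox[w].append((g, v))
        return outbox  # exchanged in the all2all step, then applied by owners

    def apply_inbox(inbox, w):
        # Each worker drains its inbox after the exchange.
        for g, v in inbox:
            shards[w][g - w * (N // NUM_WORKERS)] = v

    # Demo: worker 0 scatters writes; remote ones land in other workers' inboxes.
    outbox = fancy_set(np.array([3, 40, 90]), np.array([1, 2, 3]), my_worker=0)
    apply_inbox(outbox[1], 1)   # worker 1 applies what worker 0 pushed to it
    apply_inbox(outbox[3], 3)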
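The duplicate-index caveat called out in the docs hunk falls out of this parallel scatter: two workers may push writes for the same global index, and arrival order decides the winner. A small NumPy-only illustration of the behavior Ramba does not guarantee:

    import numpy as np

    a = np.zeros(5, dtype=int)
    idx = np.array([2, 2, 2])
    a[idx] = np.array([10, 20, 30])
    print(a)   # NumPy: the last write wins -> [0 0 30 0 0]

    # In Ramba, the same assignment executes in parallel, so a[2] could end
    # up as 10, 20, or 30 -- the docs above call this out as unpredictable.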