Source code for chunkblocks.global_offset_array

import numbers

import numpy as np
from numpy.lib.mixins import NDArrayOperatorsMixin

OUT = 'out'


[docs]class GlobalOffsetArray(np.ndarray, NDArrayOperatorsMixin): """ A simple VIEW CAST of a given ndarray that is addressed via global coordinates. Negative wraparound indices are NOT supported (i.e. used for printing out) and will IGNORE indices out of bounds See below link for explanations of __new__ and __array_finalize__! https://docs.scipy.org/doc/numpy/user/basics.subclassing.html#slightly-more-realistic-example-attribute-added-to-existing-array """# noqa _HANDLED_TYPES = (np.ndarray, numbers.Number)
[docs] def __new__(cls, input_array, global_offset=None, *args, **kwargs): if not isinstance(input_array, np.ndarray): return input_array obj = np.asarray(input_array).view(cls) if global_offset is None: global_offset = tuple([0] * input_array.ndim) obj.global_offset = tuple(global_offset) if len(global_offset) != len(input_array.shape): raise ValueError("Global offset %s does not have same number dimensions as input_array shape %s" % ( global_offset, input_array.shape)) obj._bounds = None return obj
[docs] def __array_finalize__(self, obj): """ Called whenever the array is new-ed. Because unpickling does NOT call __array_finalize__, make sure they are equivalent """ if obj is None: return self.global_offset = getattr(obj, 'global_offset', None) self._bounds = getattr(obj, '_bounds', None)
[docs] def __reduce__(self): reduction = super().__reduce__() object_state = reduction[2] object_state += (self.global_offset,) return tuple(object_state if index is 2 else r for index, r in enumerate(reduction))
[docs] def __setstate__(self, state): """ Called when unpickling. Because unpickling does NOT call __array_finalize__, make sure they are equivalent """ self.global_offset = state[-1] self._bounds = None super().__setstate__(state[:-1])
[docs] def _to_internal_slices(self, index): """ Convert given index into the index used in the internal ndarray. Does NOT support end slicing and wrap around negative indices. Throw an error if computed internal indices are outside the range of the data. """ internal_index = () new_global_offsets = () if type(index) == int or type(index) == slice: index = (index,) + (slice(None),) * (len(self.shape) - 1) elif len(self.shape) > len(index): # Fill rest of dimensions of index that were not specified index = index + (slice(None),) * (len(self.shape) - len(index)) for dimension, item in enumerate(index): offset = self.global_offset[dimension] length = self.shape[dimension] if item is None: new_item = None # don't need to keep track of global offset for collapsed index else: try: start = item.start stop = item.stop if start is None: start = offset if stop is None: stop = offset + length slice_start = start - offset slice_stop = stop - offset new_item = slice(slice_start if slice_start > 0 else 0, slice_stop if slice_stop > 0 else 0, item.step) new_global_offsets += (new_item.start + offset,) except AttributeError: # Not a slice new_item = item - offset # don't need to keep track of global offset for collapsed index if new_item < 0 or new_item > length: raise IndexError('Index %s is out of bounds for axis %s with bounds [%s , %s) ' 'requested: %s bounds: %s' % ( new_item, dimension, offset, offset + length, index, self.bounds)) internal_index += (new_item,) return (internal_index, new_global_offsets)
[docs] def __getitem__(self, index): """ Access the array based on global coordinates. If we receive a tuple, it means we are slicing. When we slice, calculate the actual coordinates stored """ internal_index, new_global_offset = self._to_internal_slices(index) new_from_template = super(GlobalOffsetArray, self).__getitem__(internal_index) if hasattr(new_from_template, 'global_offset'): new_from_template.global_offset = new_global_offset if new_from_template.shape != self.shape or self.global_offset != new_global_offset: new_from_template._bounds = new_from_template.calculate_bounds() return new_from_template
[docs] def __setitem__(self, index, value): """ Access the array based on global coordinates. If we receive a tuple, it means we are slicing. When we slice, calculate the actual coordinates stored """ internal_index, _ = self._to_internal_slices(index) # use view instead of super because super will call the overriden __getitem__ function self.view(np.ndarray).__setitem__(internal_index, value)
[docs] def __str__(self): """ Overwrite string conversion to create a view instead of calling super. Super will call with the overridden __getitem__ function which will not work """ if self.global_offset is not None: return '%s, global_offset: %s' % (self.view(np.ndarray).__str__(), self.global_offset) else: return super().__str__()
[docs] def __repr__(self): """ Overwrite string conversion to create a view instead of calling super. Super will call with the overridden __getitem__ function which will not work """ return self.view(np.ndarray).__repr__()
[docs] def bounds(self): """ Get slices that are bounds of the available data """ if self._bounds is None: self._bounds = self.calculate_bounds() return self._bounds
[docs] def calculate_bounds(self): return tuple(slice(offset, offset + shape) for shape, offset in zip(self.shape, self.global_offset))
[docs] def is_contained_within(self, other): """ Check to see if this volume is contained within other """ self_bounds = self.bounds() other_bounds = other.bounds() return not any(other_slice.start > self_slice.start or self_slice.start > other_slice.stop or other_slice.start > self_slice.stop or self_slice.stop > other_slice.stop for self_slice, other_slice in zip(self_bounds, other_bounds))
[docs] def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): # noqa: C901 """ Enable injection of customized indexing for ufunc operations Must defer to the implementation of the ufunc on unwrapped values to avoid infinite loop https://docs.scipy.org/doc/numpy-1.14.0/reference/generated/numpy.lib.mixins.NDArrayOperatorsMixin.html Standard operators work normally when: * global offset and size are the same for both operands * one oeprand is fully encapsulated by another (returns a copy of the larger with the smaller added) In-place operators work normally when: * """ in_place = OUT in kwargs for x in inputs + kwargs.get(OUT, ()): # Use GlobalOffsetArray instead of type(self) for isinstance to # allow subclasses that don't override __array_ufunc__ to # handle GlobalOffsetArray objects. if not isinstance(x, self._HANDLED_TYPES + (GlobalOffsetArray,)): return NotImplemented # global offset to use in the result result = global_offset = None if len(inputs) == 2: left = inputs[0] right = inputs[1] try: smaller = larger = None left_in_right = left.is_contained_within(right) right_in_left = right.is_contained_within(left) if left_in_right and right_in_left: # same bounds/size global_offset = left.global_offset else: smaller = left if left_in_right else right larger = right if left_in_right else left sub_left = left[smaller.bounds()] sub_right = right[smaller.bounds()] sub_inputs = (sub_left, sub_right) sub_kwargs = {} if in_place: # only perform op if there are values to operate on if sub_left.size and sub_right.size: sub_kwargs[OUT] = tuple(o[right.bounds()] for o in kwargs[OUT]) getattr(ufunc, method)(*sub_inputs, **sub_kwargs) result = left global_offset = left.global_offset else: if not left_in_right and not right_in_left: raise ValueError("Non-in-place operations on overlapping GlobalOffsetArrays unsupported. " "Left bounds: %s, Right bounds: %s" % (left.bounds(), right.bounds())) # Return a copy of the larger operand and perform in place on the sub_array of that copy sample_type = type(getattr(ufunc, method)(sub_left.item(0), sub_right.item(1))) result = larger.astype(sample_type) sub_kwargs[OUT] = (result[smaller.bounds()]) sub_result = getattr(ufunc, method)(*sub_inputs, **sub_kwargs) result[smaller.bounds()] = sub_result global_offset = larger.global_offset except AttributeError: # At least one of arguments is not a GlobalOffsetArray try: global_offset = left.global_offset except AttributeError: # Left is not a GlobalOffsetArray global_offset = right.global_offset inputs = (left, right) # Must defer to the implementation of the ufunc on unwrapped values to avoid infinite loop inputs = tuple(i.view(np.ndarray) if isinstance(i, GlobalOffsetArray) else i for i in inputs) if in_place: kwargs[OUT] = tuple(o.view(np.ndarray) if isinstance(o, GlobalOffsetArray) else o for o in kwargs[OUT]) if result is None: result = getattr(ufunc, method)(*inputs, **kwargs) if type(result) is tuple: # multiple return values return tuple(type(self)(x, global_offset=global_offset) for x in result) elif method == 'at': # no return value return None else: # one return value return type(self)(result, global_offset=global_offset)