import numpy as np
from typing import Dict, List, Tuple, Union
from numpy.typing import ArrayLike, DTypeLike
from numbers import Number
from .util import unsafe_extend_left, unsafe_extend_right, unsafe_extend_left_2d
from .raggedarray import RaggedArray
from .mixin import NPSArray, NPSIndexable
from .raggedarray.raggedslice import ragged_slice
import logging
logger = logging.getLogger("RunLengthArray")
def get_ra_func(name):
return lambda ragged_array, *args, **kwargs: getattr(ragged_array, name)(
*args, **kwargs
)
HANDLED_FUNCTIONS = {np.all: get_ra_func('all'),
np.sum: get_ra_func('sum'),
np.any: get_ra_func('any'),
np.mean: get_ra_func('mean')}
def implements(np_function):
"Register an __array_function__ implementation for RunLengthArray objects."
def decorator(func):
HANDLED_FUNCTIONS[np_function] = func
return func
return decorator
@implements(np.histogram)
def histogram(rla, bins=10, range=None, density=None, weights=None):
lens = np.diff(rla._events)
if weights is None:
weights = lens
else:
weights = lens*weights
return np.histogram(rla._values, bins, range, density, weights)
@implements(np.concatenate)
def concatenate(rl_arrays):
sizes = [array.size for array in rl_arrays]
offsets = np.insert(np.cumsum(sizes), 0, 0)
events = np.concatenate(
[offset + array.starts for array, offset in zip(rl_arrays, offsets)] + [offsets[-1:]])
values = np.concatenate([array.values for array in rl_arrays])
return rl_arrays[0].__class__(events, values)
class IndexableMixin:
def _step_subset(self, step, indices, values):
if step < 0:
indices = indices[..., -1][..., np.newaxis]-indices[..., ::-1]
values = values[..., ::-1]
step_size = abs(step)
if step_size != 1:
indices = (indices+step_size-1)//step_size
indices, values = self.remove_empty_intervals(indices, values)
return indices, values
def _getitem_tuple(self, raw_idx):
if len(raw_idx) == 1:
return self[raw_idx[0]]
idx = tuple(r for r in raw_idx if r is not Ellipsis)
assert len(idx) < 3, idx
if len(idx) == 0:
return self
if len(idx) == 1 and raw_idx[0] is Ellipsis:
idx = (slice(None),) + idx
elif len(idx) == 1 and raw_idx[1] is Ellipsis:
idx = idx + (slice(None),)
if len(idx) == 2:
row_idx, col_idx = idx
rows = self[row_idx]
if isinstance(row_idx, Number):
return rows[col_idx]
rows._indices.ravel()
rows._values.ravel()
if len(rows) == 0:
return rows
if isinstance(col_idx, Number):
if col_idx < 0:
col_idx = (rows._indices[:, -1]+col_idx)[:, np.newaxis]
mask = (rows._indices[:, :-1] <= col_idx) & (rows._indices[:, 1:] > col_idx)
return rows._values[mask]
elif isinstance(col_idx, slice):
start, stop, step = (col_idx.start, col_idx.stop, col_idx.step)
if step is None:
step = 1
s = slice(start, stop, np.sign(step))
is_reverse = step < 0
start_col, stop_col = (None, None)
if (start is not None and stop is not None):
if (is_reverse and (start <= stop) and start > 0) or (not is_reverse and (start >= stop) and stop>0):
return self.__class__(np.zeros((len(rows), 1), dtype=int), np.empty((len(rows), 0), dtype=int))
if stop is not None:
if stop >= 0:
stop = np.minimum(rows.shape[-1], stop)[:, np.newaxis]
else:
stop = np.maximum(0, rows.shape[-1]+stop)[:, np.newaxis]
if is_reverse:
mask = (rows._indices[..., :-1] <= stop+1) & (rows._indices[..., 1:] > stop+1)
start_col = rows._indices.shape[-1]-1# np.full(len(rows), -1, dtype=int)# np.zeros(len(rows), dtype=int)
r, c = np.nonzero(mask)
start_col[r] = c
else:
mask = (rows._indices[..., 1:] >= stop) & (rows._indices[..., :-1] < stop)
stop_col = np.full(len(rows), -1, dtype=int)
r, c = np.nonzero(mask)
stop_col[r] = c
if start is not None:
if start >= 0:
start = np.minimum(rows.shape[-1], start)[:, np.newaxis]
else:
start = np.maximum(0, rows.shape[-1]+start)[:, np.newaxis]
if is_reverse:
mask = (rows._indices[..., 1:] >= start+1) & (rows._indices[..., :-1] < start+1)
_, stop_col = np.nonzero(mask)
else:
mask = (rows._indices[..., :-1] <= start) & (rows._indices[..., 1:] > start)
_, start_col = np.nonzero(mask)
is_empty = None
if start_col is None and stop_col is None:
i, v = rows._indices, rows._values
else:
s = start_col # if start_col is not None else None
e = stop_col+2 if stop_col is not None else None
e2 = stop_col+1 if stop_col is not None else None
if s is not None and e is not None:
is_empty = (s >= e) #changed from ==
e = np.maximum(s+1, e)
e2 = np.maximum(s, e2)
i = ragged_slice(rows._indices, starts=s,
ends=e)
v = ragged_slice(rows._values, starts=s, # start_col if start_col is not None else None,
ends=e2)
if is_reverse:
if start is not None:
i[:, -1] = (start+1).ravel()
if stop is not None:
i[:, 0] = (stop+1).ravel()
else:
if stop is not None:
i[:, -1] = stop.ravel()
if start is not None:
i[:, 0] = start.ravel()
i = i - i[:, 0][:, np.newaxis]
i, v = rows._step_subset(step, i, v)
if is_empty is not None:
i[is_empty, 0] = 0
return self.__class__(i, v)
if isinstance(idx[0], np.ndarray):
row_idx, col_idx = (np.asanyarray(x).ravel() for x in idx)
tmp_array = self.__class__(self._indices[row_idx], self._values[row_idx], row_len=self._row_len)
values = [rla[c] for rla, c in zip(tmp_array, col_idx.ravel())]
if isinstance(idx[0], np.ndarray):
values = np.reshape(values, idx[0].shape)
return values
assert not isinstance(idx, tuple), (raw_idx, idx)
return self[idx]
def __getitem__(self, raw_idx: Union[Tuple, int, List[int], ArrayLike]):
if isinstance(raw_idx, tuple):
return self._getitem_tuple(raw_idx)
idx = raw_idx
self._values.ravel()
self._indices.ravel()
events = self._indices[idx]
values = self._values[idx]
if isinstance(events, RaggedArray):
assert self._row_len is None or isinstance(self._row_len, Number), self._row_len
return self.__class__(events, values, self._row_len)
if self._row_len is not None:
assert self._row_len is None or isinstance(self._row_len, Number), self._row_len
events = np.append(self._indices[idx], self._row_len)
return RunLengthArray(events, values)
[docs]
class RunLengthArray(NPSIndexable, np.lib.mixins.NDArrayOperatorsMixin):
"""Class for Run Length arrays
This is used to represent data where changes in values are
rare. Behaves much like a normal NumPy array.
Should be constructed using one of the classmethods, not initialized directly
"""
def __init__(self, events, values, do_clean=False):
self._events, self._starts = (events, values)
self._events = events# .view(NPSArray)
self._starts = events[:-1]
self._ends = events[1:]
self._values = values# .view(NPSArray)
assert events[0] == 0, events
assert len(events) == len(values)+1, (events, values, len(events), len(values))
assert np.all(events[1:] > events[:-1]), f"Empty run not allowed in RunLenghtArray (use remove_empty_intervals): {events}"
def __array__(self, dtype=None):
return np.asanyarray(self.to_array(), dtype=dtype)
def __len__(self) -> int:
if len(self._ends) == 0:
return 0
return self._ends[-1]
def __str__(self) -> str:
if self.size <= 1000:
return str(self.to_array())
return f"[ {' ' .join(str(c) for c in self[:3].to_array())} ... {' '.join(str(c) for c in self[-3:].to_array())}]"
def __repr__(self) -> str:
if self.size <= 1000:
return repr(self.to_array())
@property
def size(self) -> int:
"""
Size of the array
"""
return self._ends[-1]
@property
def shape(self) -> Tuple[int]:
return (self.size,)
@property
def ndim(self) -> int:
return 1
@property
def dtype(self) -> DTypeLike:
return self._values.dtype
@property
def starts(self) -> ArrayLike:
"""
All idxs where the value changes
"""
return self._events[:-1]
@property
def ends(self) -> ArrayLike:
"""All indices that ends a run of equal values
"""
return self._events[1:]
@property
def values(self) -> ArrayLike:
"""All values that are not equal to previous value
Returns
-------
ArrayLike
"""
return self._values
[docs]
@classmethod
def from_array(cls, array: ArrayLike) -> 'RunLengthArray':
"""Construct a RunLengthArray from a normal numpy array
Parameters
----------
array : ArrayLike
Normal numpy array
Returns
-------
'RunLengthArray'
"""
array = np.asanyarray(array)
mask = unsafe_extend_left(array) != unsafe_extend_right(array)
mask[0], mask[-1] = (True, True)
indices = np.flatnonzero(mask)
values = array[indices[:-1]]
return cls(indices, values)
@classmethod
def __from_intervals(cls, starts: ArrayLike, ends: ArrayLike, size: int , values: ArrayLike = True, default_value=0) -> 'RunLengthArray':
"""Constuct a runlength array from a set of intervals and values
Parameters
----------
starts : ArrayLike
ends : ArrayLike
size : int
values : ArrayLike
default_value :
Returns
-------
'RunLengthArray'
"""
assert np.all(ends > starts)
assert np.all(starts[1:] > ends[:-1])
prefix = [0] if (len(starts) == 0 or starts[0] != 0) else []
postfix = [size] if (len(ends) == 0 or ends[-1] != size) else []
events = np.concatenate([np.array(prefix, dtype=int), np.vstack((starts, ends)).T.ravel(), np.array(postfix, dtype=int)])
if isinstance(values, Number):
values = np.tile([default_value, values], events.size//2+1)
else:
values = np.vstack([np.broadcast(default_value, values.shape), values]).T.ravel()
if ends[-1] != size:
values = np.append(values, default_value)
if (len(starts) > 0) and starts[0] == 0:
values = values[1:]
values = values[:(len(events)-1)]
return cls(events, values, do_clean=True)
[docs]
def to_array(self) -> np.ndarray:
"""Convert the runlength array to a normal numpy array
Returns
-------
np.ndarray
"""
if len(self) == 0:
return np.empty_like(self._values, shape=(0,))
values = np.asarray(self._values)
if values.dtype == np.float64:
values = values.view(np.uint64)
elif values.dtype == np.float32:
values = values.view(np.uint32)
elif values.dtype == np.float16:
values = values.view(np.uint16)
array = np.zeros_like(values, shape=len(self))
op = np.logical_xor if array.dtype == bool else np.bitwise_xor
diffs = op(values[:-1], values[1:])
assert np.all(self._starts[1:] > self._starts[:-1])
array[self._starts[1:]] = diffs
array[self._starts[0]] = values[0]
tmp = array.copy()
op.accumulate(array, out=tmp)
# array = op.accumulate(array, dtype=array.dtype)
return tmp.view(self._values.dtype)
def __array_function__(self, func: callable, types: List, args: List, kwargs: Dict):
if func not in HANDLED_FUNCTIONS:
return NotImplemented
if not all(issubclass(t, RunLengthArray) for t in types):
return NotImplemented
return HANDLED_FUNCTIONS[func](*args, **kwargs)
def __array_ufunc__(self, ufunc: callable, method: str, *inputs, **kwargs):
"""Handle numpy unfuncs called on the runlength array
Currently only handles '__call__' modes and unary and binary functions
Parameters
----------
ufunc : callable
method : str
*inputs :
**kwargs :
"""
if method not in ("__call__"):
return NotImplemented
if len(inputs) == 1:
return self.__class__(self._events, ufunc(self._values))
assert len(inputs) == 2, f"Only unary and binary operations supported for runlengtharray {len(inputs)}"
if isinstance(inputs[1], Number):
return self.__class__(self._events, ufunc(self._values, inputs[1]))
elif isinstance(inputs[0], Number):
return self.__class__(self._events, ufunc(inputs[0], self._values))
return self._apply_binary_func(*inputs, ufunc)
def sum(self, axis=-1, out=None):
return np.sum(np.diff(self._events)*self._values)
[docs]
def any(self, axis=-1, out=None):
"""TODO, this can be sped up by assuming no empty runs"""
return np.any(self._values)
[docs]
def all(self, axis=-1, out=None):
"""TODO, this can be sped up by assuming no empty runs"""
return np.all(self._values)
def max(self, axis=-1, out=None):
return self._values.max()
def astype(self, dtype: DTypeLike) -> 'RunLengthArray':
return self.__class__(self._events, self._values.astype(dtype))
def mean(self, axis=-1, dtype=None, out=None):
assert dtype is None and out is None
if np.issubdtype(self.dtype, np.integer):
return self.astype(float).mean(axis, dtype, out)
return self.sum()/self.size
[docs]
@staticmethod
def remove_empty_intervals(events: ArrayLike, values: ArrayLike, delete_first: bool = True) -> Tuple[ArrayLike, ArrayLike]:
"""Remove any empty runs from a pair of indices and values.
Should be run before creating a runlength array from the
events and values
Parameters
----------
events : ArrayLike
values : ArrayLike
delete_first : bool
Returns
-------
Tuple[ArrayLike, ArrayLike]
"""
mask = np.flatnonzero(events[:-1] == events[1:])
if not delete_first:
mask += 1
return np.delete(events, mask), np.delete(values, mask)
[docs]
@staticmethod
def join_runs(events: ArrayLike, values: ArrayLike) -> Tuple[ArrayLike, ArrayLike]:
"""Join succesive runs with the same value
Parameters
----------
events : ArrayLike
values : ArrayLike
Returns
-------
Tuple[ArrayLike, ArrayLike]
"""
mask = np.flatnonzero(values[1:] == values[:-1])+1
return np.delete(events, mask), np.delete(values, mask)
@classmethod
def _apply_binary_func(cls, first, other, ufunc):
logging.debug(f"Applying ufunc {ufunc} to rla with {first._values.size} values")
assert len(first) == len(other), (first, other)
others_corresponding = np.searchsorted(first._events, other._events[1:-1], side="right")-1
new_values_other = ufunc(first._values[others_corresponding], other._values[1:])
first_corresponding = np.searchsorted(other._events, first._events[1:-1], side="right")-1
new_values_first = ufunc(first._values[1:], other._values[first_corresponding])
events = np.concatenate([first._events[:-1], other._events[1:]])
values = np.concatenate([[ufunc(first._values[0], other._values[0])], new_values_first, new_values_other])
args = np.argsort(events, kind="mergesort")
return first.__class__(*first.join_runs(*first.remove_empty_intervals(events[args], values[args[:-1]])))
def __apply_binary_func(self, other, ufunc):
assert len(self) == len(other), (self, other)
all_events = np.concatenate([self._starts, other._starts])
args = np.argsort(all_events, kind="mergesort")
all_events = all_events[args]
result_dtype = np.result_type(*(i.dtype for i in (self._values, other._values)))
temp_dtype = result_dtype
if result_dtype == np.float64:
temp_dtype = np.uint64
elif result_dtype == np.float32:
temp_dtype = np.uint32
o_values = other._values.astype(result_dtype).view(temp_dtype)
m_values = self._values.astype(result_dtype).view(temp_dtype)
other_d = unsafe_extend_left(o_values[:-1]) ^ o_values
other_d[0] = o_values[0]
values = np.zeros((2, len(all_events)), dtype=temp_dtype)
my_d = unsafe_extend_left(m_values)[:-1] ^ m_values
my_d[0] = m_values[0]
values[1, args >= self._starts.size] = other_d
values[0, args < self._starts.size] = my_d
values = np.bitwise_xor.accumulate(values, axis=-1)# , out=values)
values = values.view(result_dtype)
assert np.all(values[0][args < self._starts.size]==self._values), (values[0][args < self._starts.size], self._values)
assert np.all(values[1][args >= self._starts.size]==other._values), (values[1][args >= self._starts.size], other._values)
sum_values = ufunc(values[0], values[1])
rm_idxs = np.flatnonzero(all_events[:-1] == all_events[1:])
all_events = np.delete(all_events, rm_idxs)
sum_values = np.delete(sum_values, rm_idxs)
return self.__class__(np.append(all_events, self._events[-1]), sum_values)
def _get_position(self, idx):
idx = np.where(idx < 0, len(self)+idx, idx)
return self._values[np.searchsorted(self._events, idx, side="right")-1]
def _ragged_slice(self, starts, stops):
return RunLengthRaggedArray(*self._start_to_end(starts, stops))
def _get_slice(self, s: slice) -> 'RunLengthArray':
step = 1 if s.step is None else s.step
is_reverse = step < 0
start = 0
end = len(self)
if is_reverse:
start, end = (end-1, start-1)
if s.start is not None:
if s.start < 0:
start = len(self)+s.start
else:
start = s.start
if s.stop is not None:
if s.stop < 0:
end = len(self)+s.stop
else:
end = s.stop
if is_reverse:
start, end = (end+1, start+1)
if start >= end:
return self.__class__(np.array([0]), np.empty_like(self._values, shape=(0,)))
subset = self.__class__(*self._start_to_end(start, end))
assert(len(subset) == (end-start)), (subset, start, end, s)
if step != 1:
subset = subset._step_subset(step)
return subset
def _step_subset(self, step: int):
"""
[0, 1, 2, 3, 4] step=3
i=[0,1,2,3,4,5], v=[0, 1, 2, 3, 4]
/3[0,0,0,1,1,1]
->[0,1,1,1,2,2] -> (i+(step-1)
i=[0, 1, 2], v=[0, 3]
[0, 0, 0], step=2
indices=[0, 1, 2, 3], values = [0, 0, 0]
//2 = [0, 0, 1, 1] (+1//2)= [0, 1, 1, 2], (-1//2+1) =
[0, 1], [0, 0]
[0, 2]
"""
step_size = abs(step)
indices, values = (self._events, self._values)
if step < 0:
indices, values = (indices[-1] - indices[::-1], values[::-1])
indices = (indices+step_size-1)//step_size
indices, values = self.remove_empty_intervals(indices, values)# , delete_first=False)
indices, values = self.join_runs(indices, values)
return self.__class__(indices, values)
def _start_to_end(self, start, end):
start_idx = np.searchsorted(self._events, start, side="right")-1
end_idx = np.searchsorted(self._events, end, side="left")
if isinstance(start_idx, np.ndarray):
values = ragged_slice(self._values, start_idx, end_idx)
else:
values = self._values[start_idx:end_idx]
sub = start[:, np.newaxis] if isinstance(start, np.ndarray) else start
if isinstance(start_idx, np.ndarray):
events = ragged_slice(self._events, start_idx, end_idx+1)-sub
else:
events = self._events[start_idx:end_idx+1]-sub
events[..., 0] = 0
events[..., -1] = end-start
return events, values
def _getitem_bool(self, idx: 'RunLengthArray'):
starts = idx.starts[idx.values]
ends = idx.ends[idx.values]
return RunLengthRaggedArray(*self._start_to_end(starts, ends)).ravel()
def __getitem__(self, idx: Union[Tuple, List[int], Number, ArrayLike, slice]):
try:
return super().__getitem__(idx)
except ValueError:
pass
if isinstance(idx, tuple):
idx = tuple(i for i in idx if i is not Ellipsis)
assert len(idx) <= 1, idx
if len(idx) == 0:
return self
if len(idx) == 1:
idx = idx[0]
if isinstance(idx, list):
idx = np.asanyarray(idx)
if isinstance(idx, Number):
return self._get_position(idx)
if isinstance(idx, np.ndarray):
if idx.dtype == bool:
idx = np.flatnonzero(idx)
return self._get_position(idx)
if isinstance(idx, slice):
return self._get_slice(idx)
if isinstance(idx, RunLengthArray):
assert idx.dtype == bool
return self._getitem_bool(idx)
if idx is Ellipsis:
return self
assert False, f"Invalid index for {self.__class__}: {idx}"
[docs]
class RunLength2dArray(IndexableMixin, np.lib.mixins.NDArrayOperatorsMixin):
''' Multiple RunLengthArrays of the same size. Behaves like a 2d numpy array'''
def __init__(self, indices: RaggedArray, values: RaggedArray, row_len: int=None):
self._values = values
self._indices = indices
self._row_len = row_len
def __str__(self) -> str:
return "\n".join(str(row) for row in self[:min(20, len(self))])
@property
def shape(self) -> Tuple[int]:
return (len(self._indices), self._row_len)
@property
def ndim(self) -> int:
return 2
@property
def size(self) -> int:
return self.shape[0]*self.shape[1]
@property
def dtype(self) -> DTypeLike:
return self._values.dtype()
[docs]
def to_array(self) -> ArrayLike:
''' Convert to a normal 2d numpy array '''
return np.array([row.to_array() for row in self])
def __len__(self) -> int:
return len(self._indices)
def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
"""Handle numpy unfuncs called on the runlength array
Currently only handles '__call__' modes and unary and binary functions
Parameters
----------
ufunc : callable
method : str
*inputs :
**kwargs :
"""
if method not in ("__call__"):
return NotImplemented
if len(inputs) == 1:
return self.__class__(self._indices, ufunc(self._values), self._row_len)
assert len(inputs) == 2
if isinstance(inputs[1], (Number, np.ndarray)):
return self.__class__(self._indices, ufunc(self._values, inputs[1]), self._row_len)
elif isinstance(inputs[0], (Number, np.ndarray)):
return self.__class__(self._indices, ufunc(self._values, inputs[0]), self._row_len)
return NotImplemented
def any(self, axis: int = None, out=None) -> ArrayLike:
if axis in (0, -2):
return self._col_any()
return self._values.any(axis=axis)
def all(self, axis: int= None, out=None) -> ArrayLike:
return self._values.all(axis=axis)
def sum(self, axis: int = None, out=None) -> ArrayLike:
if axis in (0, -2):
return self._col_sum()
assert (axis == -1 or axis is None)
lens = (self._indices[:, 1:]-self._indices[:, :-1])
if self._row_len is None:
return np.sum(self._values*lens, axis=-1)
internal_sum = np.sum(self._values[:, :-1] * lens, axis=-1)
return internal_sum + self._values[:, -1]*(self._row_len-self._indices[:, -1])
def _col_sum(self):
positions = self._indices.ravel()
L = np.max(positions) if self._row_len is None else self._row_len
args = np.argsort(positions, kind="mergesort")
if self._row_len is None:
c = np.concatenate([self._values, np.zeros((len(self), 1),
dtype=self._values.dtype)], axis=-1)
values = c.ravel()
else:
values = self._values.ravel()
assert len(values) == len(positions), (values, positions)
if np.issubdtype(values.dtype, np.integer):
if np.issubdtype(values.dtype, np.signedinteger):
values = values.astype(int)
else:
values = values.astype(np.uint64)
values = np.diff(unsafe_extend_left(values))
values[np.insert(np.cumsum(self._indices.lengths), 0, 0)[:-1]] = self._values[:, 0]
values = values[args]
np.cumsum(values, out=values)
positions = positions[args]
positions = np.append(positions, L)
return RunLengthArray(*RunLengthArray.remove_empty_intervals(positions, values))
def _col_any(self):
values = self._values.astype(bool)
indices, values = self.join_runs(self._indices, values)
assert not np.any(values[:, :-1] == values[:, 1:]), values
starts = indices.ravel()[values.ravel()]
starts.sort(kind="mergesort")
ends = indices[:, 1:].ravel()[~values[:, 1:].ravel()]
ends.sort(kind="mergesort")
ends = np.maximum.accumulate(ends)
ends = np.pad(ends, (0, starts.size-ends.size), constant_values=self._row_len)
valid_mask = unsafe_extend_right(starts) > unsafe_extend_left(ends)
valid_mask[[0, -1]] = True
starts = starts[valid_mask[:-1]]
ends = ends[valid_mask[1:]]
indices = np.array([starts, ends]).T.ravel()
values = np.tile([True, False], len(starts))
if len(starts) == 0 or starts[0] != 0:
indices = np.insert(indices, 0, 0)
values = np.insert(values, 0, False)
if indices[-1] == self._row_len:
values = values[:-1]
else:
indices = np.append(indices, self._row_len)
return RunLengthArray(indices, values)
[docs]
@classmethod
def from_array(cls, array: ArrayLike) -> 'RunLength2dArray':
"""Construct a Runlength2dArray from a normal 2d numpy array
Parameters
----------
array : ArrayLike
Returns
-------
'RunLength2dArray'
"""
array = np.asanyarray(array)
mask = unsafe_extend_left_2d(array)[:, :-1] != array
mask[:, 0] = True
mask[:, -1] = True
indices = np.flatnonzero(mask)
values = array.ravel()[indices]
indices = RaggedArray(indices, mask.sum(axis=-1))
values = RaggedArray(values, indices._shape)
indices = indices-indices[:, 0][:, np.newaxis]
return cls(indices, values, array.shape[-1])
@staticmethod
def join_runs(indices: ArrayLike, values: ArrayLike) -> Tuple[ArrayLike, ArrayLike]:
mask = unsafe_extend_left(values.ravel())[:-1] != values.ravel()
mask[values._shape.starts] = True
shape = RaggedArray(mask, values._shape).sum(axis=-1)
indices = RaggedArray(indices.ravel()[mask], shape)
values = RaggedArray(values.ravel()[mask], shape)
return indices, values
[docs]
@classmethod
def from_intervals(cls, starts: ArrayLike, ends: ArrayLike, row_len: int, value=1) -> 'RunLength2dArray':
"""Construct a RunLength2dArray from a set of intervals
Each interval becomes a row in the 2d array
Parameters
----------
cls :
starts : ArrayLike
ends : ArrayLike
row_len : int
value :
Returns
-------
'RunLength2dArray'
"""
starts_after_zero = starts > 0
ends_before_end = ends < row_len
value = np.asanyarray(value)
shape = starts_after_zero + 1 + ends_before_end
indices = RaggedArray(np.zeros(shape.sum(), int), shape)
values = RaggedArray(np.zeros(shape.sum(), value.dtype), shape)
indices[np.arange(len(starts)), starts_after_zero.astype(int)] = starts
values[np.arange(len(starts)), starts_after_zero.astype(int)] = value
indices[ends_before_end, -1] = ends[ends_before_end]
return cls(indices, values, row_len)
[docs]
class RunLengthRaggedArray(RunLength2dArray, IndexableMixin):
"""Multiple row-lenght arrays of differing lengths"""
@property
def shape(self) -> Tuple[int]:
if self._row_len is None:
return (len(self._indices), self._indices[..., -1])
return (len(self._indices), self._row_len)
[docs]
@staticmethod
def remove_empty_intervals(events: ArrayLike, values: ArrayLike) -> Tuple[ArrayLike, ArrayLike]:
"""Remove any empty runs from a pair of indices and values.
Should be run before creating a runlength array from the
events and values
Parameters
----------
events : ArrayLike
values : ArrayLike
delete_first : bool
Returns
-------
Tuple[ArrayLike, ArrayLike]
"""
values.ravel()
mask = events[..., :-1] != events[..., 1:]
mask2 = np.ones_like(events, dtype=bool)
mask2[..., 1:] = mask
values = values[mask]
events = events[mask2]
return RaggedArray(events, mask2.sum(axis=-1)), RaggedArray(values, mask.sum(axis=-1))
def __get_col_reverse(self, indices, values):
return (indices[..., -1][:, None] - indices[..., ::-1], values[..., ::-1])
[docs]
def to_array(self, ragged=True):
self._values.ravel()
self._indices.ravel()
l = [row.to_array() for row in self]
if ragged:
return RaggedArray(l)
return np.array(l)
[docs]
@classmethod
def from_array(cls, array):
return cls.from_ragged_array(
RaggedArray.from_numpy_array(np.asanyarray(array)))
[docs]
@classmethod
def from_ragged_array(cls, ragged_array: RaggedArray) -> 'RunLengthRaggedArray':
"""Construct runlengtharray from a RaggedArray
Parameters
----------
ragged_array : RaggedArray
Returns
-------
RunLengthRaggedArray
"""
data = ragged_array.ravel()
mask = np.insert(data[:-1] != data[1:], 0, True)
mask[ragged_array._shape.starts] = True
indices = np.flatnonzero(mask)
tmp = np.cumsum(unsafe_extend_left(mask))
row_lens = tmp[ragged_array._shape.ends]-tmp[ragged_array._shape.starts]
values = data[indices]
s = np.concatenate([indices, ragged_array._shape.ends])
s.sort(kind='mergesort')
indices = RaggedArray(s, row_lens+1)
start_indices = indices[:, 0][:, np.newaxis]
indices = indices-start_indices
return cls(indices,
RaggedArray(values, row_lens))# , ragged_array._shape.lengths)
def max(self, axis=-1, **kwargs):
assert axis in (-1, 1)
return self._values.max(axis=-1, **kwargs)
def argmax(self, axis=-1, **kwargs):
assert axis in (-1, 1)
m = self.max(axis=-1, keepdims=True)
rows, cols = np.nonzero(self._values == m)
_, idxs = np.unique(rows, return_index=True)
return self._indices[np.arange(len(idxs)), cols[idxs]]
def mean(self, axis=-1, **kwargs):
if axis in (0, -2):
return self.sum(axis=0)/self.col_counts()
s = self.sum(axis=-1)
l = self._row_len
if self._row_len is None:
l = self._indices[:, -1]
return s/l
def __array_function__(self, func: callable, types: List, args: List, kwargs: Dict):
if func == np.where:
condition, x, y = args
return self.__class__(np.where(x._indices._broadcast_rows(condition), x._indices, y._indices),
np.where(x._values._broadcast_rows(condition), x._values, y._values))
if func in (np.max, np.amax):
return func(args[0]._values, *args[1:], **kwargs)
if func == np.concatenate:
return rlra_concatenate(*args, **kwargs)
if func == np.sum:
return self.sum(*args[1:], **kwargs)
if func == np.mean:
return self.mean(*args[1:], **kwargs)
return NotImplemented
def col_counts(self):
indices, counts = np.unique(self.shape[-1], return_counts=True)
values = len(self)-np.insert(np.cumsum(counts), 0, 0)
indices = np.insert(indices, 0, 0)
return RunLengthArray(indices, values[:-1])
def ravel(self):
offsets = np.insert(
np.cumsum(self._indices[:, -1]),
0, 0)
indices = (self._indices[:, :-1]+offsets[:-1, np.newaxis]).ravel()
indices = np.append(indices, offsets[-1])
values = self._values.ravel()
return RunLengthArray(indices, values)
@property
def size(self) -> int:
return self.shape[-1].sum()
def rlra_concatenate(rl_ragged_arrays):
return RunLengthRaggedArray(
np.concatenate([a._indices for a in rl_ragged_arrays]),
np.concatenate([a._values for a in rl_ragged_arrays]))