# Source code for bionumpy.streams.decorators

import functools
import logging
import types
from typing import Callable, Optional

from .stream import BnpStream

# Named logger for this module; it is registered under the fixed name
# "streamable" rather than under the module's __name__.
logger = logging.getLogger(name="streamable")


[docs] class streamable: """A decorator to make a function applicable to streams of data Examples -------- >>> from bionumpy.streams import streamable, BnpStream >>> @streamable(list) ... def add(a, b): ... return a + b >>> add(10, 15) 25 >>> stream = BnpStream(iter(range(10))) >>> add(stream, 15) [15, 16, 17, 18, 19, 20, 21, 22, 23, 24] >>> @streamable(sum) ... def mul(a, b): ... return a * b >>> stream = BnpStream(iter([3, 4])) >>> mul(stream, 2) 14 """ def __init__(self, reduction: callable = None): """Take an optional reduction operator that will be called on the result stream Parameters ---------- reduction : callable A reduction function that can be called on a stream of data """ self._reduction = reduction @staticmethod def _args_stream(args, stream_indices): args = list(args) streams = tuple(args[i] for i in stream_indices) for stream_args in zip(*streams): new_args = list(args) for i, stream_arg in zip(stream_indices, stream_args): new_args[i] = stream_arg yield new_args def __call__(self, func: callable) -> callable: """Return a new function that applies the input function to all chunks in a stream Parameters ---------- func : callable A normal function that operates on one chunk of data Returns ------- callable A new function that applies the original function to every chunk in a stream of data Examples -------- FIXME: Add docs. """ def log(sequence): for i, args in enumerate(sequence): # logger.debug(f"Running {func.__name__} on chunk {i}") yield args def new_func(*args, **kwargs): """Apply a function to all chunks in a stream if any of the arguments are `BnpStream` If no arguments are BnpStream, then the function is simply called on the given `args` and `kwargs`. If however one of the arguments is a `BnpStream` the the function is applied to all chunks in the stream. If the `reduction` parameter was given, then the reduction is also called on the resulting stream. 
Parameters ---------- *args : **kwargs : """ stream_args = [i for i, arg in enumerate(args) if isinstance(arg, (BnpStream, types.GeneratorType))] if len(stream_args) == 0: return func(*args, **kwargs) args_stream = log(self._args_stream(args, stream_args)) stream = (func(*new_args, **kwargs) for new_args in args_stream) if self._reduction is None: return BnpStream(stream) else: return self._reduction(stream) StreamClass = args[stream_args[0]].__class__ return StreamClass((func(*new_args, **kwargs) for new_args in args_stream), dataclass=args[stream_args[0]].dataclass) return new_func