Source code for bionumpy.arithmetics.intervals

from typing import List
from numbers import Number
import numpy.typing as npt
import dataclasses
from operator import itemgetter
import numpy as np
from npstructures import RunLength2dArray, RunLengthArray
from bionumpy.encodings.string_encodings import StringEncoding
from .bedgraph import BedGraph
from .. import streamable
from ..streams.grouped import chromosome_map
from ..datatypes import Interval
from ..bnpdataclass import bnpdataclass
from ..util import interleave
from ..bnpdataclass import replace


class GenomicRunLengthArray(RunLengthArray):
    def to_array(self) -> np.ndarray:
        """Convert the runlength array to a normal numpy array

        Returns
        -------
        np.ndarray
        """
        if len(self) == 0:
            return np.empty_like(self._values, shape=(0,))
        values = np.asarray(self._values)
        if values.dtype == np.float64:
            values = values.view(np.uint64)
        elif values.dtype == np.float32:
            values = values.view(np.uint32)
        elif values.dtype == np.float16:
            values = values.view(np.uint16)
        array = np.zeros_like(values, shape=len(self))
        op = np.logical_xor if array.dtype == bool else np.bitwise_xor
        diffs = op(values[:-1], values[1:])
        array[self._starts[1:]] = diffs
        array[self._starts[0]] = values[0]
        op.accumulate(array, out=array)
        return array.view(self._values.dtype)

    @classmethod
    def from_intervals(cls, starts: npt.ArrayLike, ends: npt.ArrayLike, size: int, values: npt.ArrayLike = True, default_value=0) -> 'GenomicRunLengthArray':
        """Constuct a runlength array from a set of intervals and values

        Parameters
        ----------
        starts : ArrayLike
        ends : ArrayLike
        size : int
        values : ArrayLike
        default_value :

        Returns
        -------
        'RunLengthArray'
        """
        
        assert np.all(ends > starts), (ends[ends<=starts], starts[ends <= starts])
        assert np.all(starts[1:] >= ends[:-1])
        prefix = [0] if (len(starts) == 0 or starts[0] != 0) else []
        postfix = [size] if (len(ends) == 0 or ends[-1] != size) else []

        events = np.empty(len(prefix) + len(postfix) + starts.size + ends.size, dtype=int)
        if len(prefix):
            events[0] = prefix[0]
        if len(postfix):
            events[len(prefix):-1:2] = starts
            events[len(prefix)+1:-1:2] = ends
            events[-1] = postfix[0]
        else:
            events[len(prefix)::2] = starts
            events[len(prefix)+1::2] = ends

        tmp = values
        if isinstance(values, Number):
            values = np.empty(2*(events.size//2+1), dtype=np.array(values).dtype)
            values[::2] = default_value
            values[1::2] = tmp
        else:
            values = interleave(np.broadcast(default_value, values.shape), values)
            if ends[-1] != size:
                values = np.append(values, default_value)

        if (len(starts) > 0) and (starts[0] == 0):
            values = values[1:]

        values = values[:(len(events)-1)]
        return cls(events, values, do_clean=True)

    @classmethod
    def from_bedgraph(cls, bedgraph, size=None):
        if len(bedgraph) == 0:
            assert size is not None
            return cls(np.array([0, size], dtype=int),
                       np.array([0]))
        # assert bedgraph.start[0] == 0, bedgraph
        missing_idx = np.flatnonzero(bedgraph.start[1:] != bedgraph.stop[:-1])
        if len(missing_idx):
            start = np.insert(bedgraph.start, missing_idx+1, bedgraph.stop[missing_idx])
            value = np.insert(bedgraph.value, missing_idx+1, 0)
        else:
            start, value = (bedgraph.start, bedgraph.value)
        if size is not None:
            assert bedgraph.stop[-1] <= size, (bedgraph.stop[-1], size)
        if (size is None) or (size == bedgraph.stop[-1]):
            events = np.append(start, bedgraph.stop[-1])
            values = value
        else:
            events = np.append(start, [bedgraph.stop[-1], size])
            values = np.append(value, 0)
        if events[0] != 0:
            events = np.insert(events, 0, 0)
            values = np.insert(values, 0, 0)
        return cls(events, values)

    def to_bedgraph(self, sequence_name):
        return BedGraph([sequence_name]*len(self.starts), self.starts,
                        self.ends, self.values)

    @classmethod
    def from_rle(cls, rle):
        return cls(rle._events, rle._values)

    def extract_intervals(self, intervals):
        return self[intervals]


@bnpdataclass
class RawInterval:
    start: int
    stop: int


[docs] def get_pileup(intervals: Interval, chromosome_size: int) -> GenomicRunLengthArray: """Get the number of intervals that overlap each position of the chromosome/contig This uses run length encoded arrays to handle the sparse data that we get from intervals. Parameters ---------- intervals : Interval, Intervals on the same chromosome/contig chromosome_size : int size of the chromsome/contig Examples -------- >>> from bionumpy.datatypes import Interval >>> from bionumpy.arithmetics import get_boolean_mask, get_pileup >>> intervals = Interval(["chr1", "chr1", "chr1"], [3, 5, 10], [8, 7, 12]) >>> pileup = get_pileup(intervals, 20) >>> print(pileup) [0 0 0 1 1 2 2 1 0 0 1 1 0 0 0 0 0 0 0 0] """ if len(intervals) == 0: return GenomicRunLengthArray(np.array([0, chromosome_size], dtype=int), np.array([0], dtype=int)) rla = RunLength2dArray.from_intervals(intervals.start, intervals.stop, chromosome_size) return GenomicRunLengthArray.from_rle(rla.sum(axis=0))
[docs] def get_boolean_mask(intervals: Interval, chromosome_size: int): """Get a boolean mask representing where any inteval hits Uses run length encoded binary arrays to represent the areas covered by any interval. The mask that is returned supports numpy ufuncs, so that you can run logical operations on them s.a. `& | ~` and also numpy indexing so you can use it to filter positions and intervals. Parameters ---------- intervals : Interval Intervals on the same chromosome/contig chromosome_size : int The size of the chromosome/contig Examples -------- >>> intervals = Interval(["chr1", "chr1", "chr1"], [3, 5, 10], [8, 7, 12]) >>> print(intervals) Interval with 3 entries chromosome start stop chr1 3 8 chr1 5 7 chr1 10 12 >>> mask = get_boolean_mask(intervals, 20) >>> print(mask.astype(int)) [0 0 0 1 1 1 1 1 0 0 1 1 0 0 0 0 0 0 0 0] Get complement of the mask: >>> complement = ~mask >>> print(complement.astype(int)) [1 1 1 0 0 0 0 0 1 1 0 0 1 1 1 1 1 1 1 1] Get the intersections (`&`) and union (`|`) of the mask and another mask >>> other_mask = get_boolean_mask(Interval(["chr1"], [9], [15]), 20) >>> intersection = mask & other_mask >>> print(intersection.astype(int)) [0 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0] >>> union = mask | other_mask >>> print(union.astype(int)) [0 0 0 1 1 1 1 1 0 1 1 1 1 1 1 0 0 0 0 0] Find wether some positions overlap the mask: >>> print(other_mask[intervals.start]) [False False True] """ assert np.all(intervals.stop <= chromosome_size), (np.max(intervals.stop), chromosome_size) if len(intervals) == 0: return GenomicRunLengthArray.from_intervals(np.array([], dtype=int), np.array([], dtype=int), int(chromosome_size), default_value=False) merged = merge_intervals(intervals[np.argsort(intervals.start)]) m = merged.start != merged.stop return GenomicRunLengthArray.from_intervals(merged.start[m], merged.stop[m], size=chromosome_size, default_value=False)
# merged.start, merged.stop, def human_key_func(chrom_name): assert chrom_name.startswith("chr"), chrom_name parts = chrom_name[3:].split("_", maxsplit=1) assert len(parts) <= 2, chrom_name is_numeric = 1-parts[0].isdigit() b = parts[0] if is_numeric else int(parts[0]) c = parts[-1] if len(parts) == 2 else "" return (is_numeric, b, c)
[docs] def sort_intervals(intervals: Interval, chromosome_key_function: callable = lambda x: x, sort_order: List[str] = None) -> Interval: """Sort intervals on "chromosome", "start", "stop" Parameters ---------- intervals : Interval Unsorted intervals Returns ------- Interval Sorted intervals """ if hasattr(intervals.chromosome, 'encoding') and isinstance(intervals.chromosome.encoding, StringEncoding): args = np.lexsort((intervals.start, intervals.chromosome)) return intervals[args] if sort_order is not None: chromosome_key_function = {name: i for i, name in enumerate(sort_order)}.__getitem__ s = sorted((chromosome_key_function(interval.chromosome.to_string()), interval.start, interval.stop, i) for i, interval in enumerate(intervals)) indices = list(map(itemgetter(-1), s)) return intervals[indices]
@chromosome_map() def merge_intervals(intervals: Interval, distance: int = 0) -> Interval: """Merge a set of sorted intervals Merges any overlapping intervals into a single interval. If `distance` is specified, will merge any intervals that are closer than `distance` apart. Parameters ---------- intervals : Interval The sorted intervals to be merged distance : int Max distance for merging Return -------- Merged Intervals """ if len(intervals) == 0: return intervals assert np.all(intervals.start[:-1] <= intervals.start[1:]), "merge_intervals requires intervals sorted on start position" stops = np.maximum.accumulate(intervals.stop) if distance > 0: stops += distance valid_start_mask = intervals.start[1:] > stops[:-1] # intervals[:-1].stop start_mask = np.concatenate(([True], valid_start_mask)) stop_mask = np.concatenate((valid_start_mask, [True])) new_interval = intervals[start_mask] new_interval.stop = stops[stop_mask] if distance > 0: new_interval.stop -= distance assert np.all(new_interval.start[1:] > new_interval.stop[:-1]), np.sum(new_interval.start[1:] > new_interval.stop[:-1]) return new_interval @chromosome_map(reduction=sum) def count_overlap(intervals_a, intervals_b): starts = np.concatenate([intervals_a.start, intervals_b.start]) stops = np.concatenate([intervals_a.stop, intervals_b.stop]) starts.sort(kind="mergesort") stops.sort(kind="mergesort") return np.sum(np.maximum(stops[:-1]-starts[1:], 0)) @streamable() def intersect(intervals_a, intervals_b): all_intervals = np.concatenate([intervals_a, intervals_b]) all_intervals = all_intervals[np.argsort(all_intervals.start, kind="mergesort")] stops = np.sort(all_intervals.stop, kind="mergesort") mask = stops[:-1] > all_intervals.start[1:] result = all_intervals[1:][mask] result.stop = stops[:-1][mask] return result @streamable() def global_intersect(intervals_b, intervals_a): all_intervals = np.concatenate([intervals_a, intervals_b]) all_intervals = all_intervals[np.lexsort((all_intervals.start, all_intervals.chromosome))] stops = all_intervals.stop[np.lexsort((all_intervals.stop, all_intervals.chromosome))] mask = stops[:-1] > all_intervals.start[1:] result = all_intervals[1:][mask] result.stop = stops[:-1][mask] return result def unique_intersect(intervals_a, intervals_b, genome_size): genome_mask = get_boolean_mask(intervals_b, genome_size) entry_mask = genome_mask[intervals_a].any(axis=-1) return intervals_a[entry_mask] @chromosome_map() def extend(intervals, both=None, forward=None, reverse=None, left=None, right=None): directed = (forward is not None) or (reverse is not None) undirected = (left is not None) or (right is not None) assert sum([both is not None, directed, undirected]) == 1 if both is not None: return dataclasses.replace(intervals, start=intervals.start-both, stop=intervals.stop+both) if undirected: starts = interval.start-left if left is not None else interval.start stops = interval.stop+right if right is not None else interval.stop dataclasses.replace(intervals, start=starts, stop=stops) if directed: if forward is None: forward = 0 if reverse is None: reverse = 0 return dataclasses.replace( intervals, start = np.where(interval.strand=="+", intervals.start-reverse, intervals.start-forward), stop = np.where(interval.strand=="+", intervals.stop+forward, intervals.start+reverse) ) def extend_to_size(intervals: Interval, fragment_length: int, chromosome_size: np.ndarray) -> Interval: """Extend/shrink intervals to match the given length. Stranded For intervals on the + strand, keep the start coordinate and adjust the stop coordinate. For - intervals keep the stop coordinate and adjust the start Parameters ---------- intervals : Interval fragment_length : int Returns ------- Interval """ is_forward = intervals.strand.ravel() == "+" start = np.where(is_forward, intervals.start, np.maximum(intervals.stop-fragment_length, 0)) stop = np.where(is_forward, np.minimum(intervals.start+fragment_length, chromosome_size), intervals.stop) return dataclasses.replace( intervals, start=start, stop=stop) def pileup(intervals): chroms = np.concatenate([intervals.chromosome, intervals.chromosome]) positions = np.concatenate((intervals.start, intervals.stop)) args = np.argsort(positions, kind="mergesort") values = np.where(args >= len(intervals), -1, 1) np.cumsum(values, out=values) positions = positions[args] intervals = np.lib.stride_tricks.sliding_window_view(positions, 2) mask = np.flatnonzero(intervals[:, 0] == intervals[:, 1]) intervals = np.delete(intervals, mask, axis=0) values = np.delete(values, mask) mask = np.flatnonzero(values[1:] == values[:-1]) values = np.delete(values, mask) starts = np.delete(intervals[:, 0], mask+1) stops = np.delete(intervals[:, 1], mask) return BedGraph(chroms[:values.size-1], starts, stops, values[:-1]) def clip(intervals: Interval, chrom_sizes) -> Interval: """Clip intervals so that all intervals are contained in their corresponding chromosome Parameters ---------- intervals : Interval Returns ------- Interval """ return replace( intervals, start=np.maximum(0, intervals.start), stop=np.minimum(chrom_sizes, intervals.stop))