Source code for bionumpy.genomic_data.genomic_track

import numpy as np
from abc import abstractmethod
from .genomic_data import GenomicData
from .genome_context_base import GenomeContextBase
from ..computation_graph import StreamNode, ComputationNode
from ..datatypes import Interval, BedGraph
from ..arithmetics.intervals import GenomicRunLengthArray
from npstructures import RunLengthRaggedArray
from typing import List, Union, Iterable, Tuple, Dict, Any


[docs] class GenomicArray(GenomicData): ''' Class for representing data on a genome. ''' @property def genome_context(self): return self._genome_context @abstractmethod def __array_ufunc__(self, ufunc: callable, method: str, *inputs, **kwargs) -> 'GenomicArray': return NotImplemented @abstractmethod def __array_function__(self, func: callable, types: List, args: List, kwargs: Dict) -> Any: return NotImplemented @abstractmethod def sum(self, axis: int = None) -> float: return NotImplemented def to_bedgraph(self) -> BedGraph: return NotImplemented
[docs] @classmethod def from_global_data(cls, global_pileup: GenomicRunLengthArray, genome_context: GenomeContextBase) -> 'GenomicArray': """Create the genomic array from data represened on a flat/concatenated genome. Parameters ---------- global_pileup : GenomicRunLengthArray Array over the concatenated genome genome_context : GenomeContextBase The genome context Returns ------- 'GenomicArray' """ return GenomicArrayGlobal(global_pileup, genome_context)
def _get_intervals_from_data(self, name, data): if data.dtype == bool: return Interval([name]*len(data.starts), data.starts, data.ends)[data.values] else: return BedGraph([name]*len(data.starts), data.starts, data.ends, data.values)
[docs] @classmethod def from_bedgraph(cls, bedgraph: BedGraph, genome_context: GenomeContextBase) -> 'GenomicData': """Create a genomic array from a bedgraph and genomic context Parameters ---------- bedgraph : BedGraph Bedgraph with data along the genome genome_context : GenomeContextBase The genome context Returns ------- 'GenomicData' """ if isinstance(bedgraph, BedGraph): go = genome_context.global_offset gi = go.from_local_interval(bedgraph) rle = GenomicRunLengthArray.from_bedgraph(gi, go.total_size()) return cls.from_global_data(rle, genome_context) filled = genome_context.iter_chromosomes(bedgraph, BedGraph) # filled = fill_grouped(groupby(bedgraph, 'chromosome'), chrom_sizes.keys(), BedGraph) interval_stream = StreamNode(filled) return GenomicArrayNode(ComputationNode(GenomicRunLengthArray.from_bedgraph, [interval_stream, StreamNode(iter(genome_context.chrom_sizes.values()))]), genome_context)
class GenomicArrayGlobal(GenomicArray, np.lib.mixins.NDArrayOperatorsMixin): ''' Class for holding the the enitre genomic array in memory ''' def __init__(self, global_track: GenomicRunLengthArray, genome_context: GenomeContextBase): assert isinstance(global_track, GenomicRunLengthArray), global_track self._global_track = global_track self._genome_context = genome_context @property def dtype(self): return self._global_track.dtype def _index_boolean(self, idx): assert idx.dtype == bool assert isinstance(idx, GenomicArrayGlobal) return self._global_track[idx._global_track] def sum(self) -> float: return self._global_track.sum(axis=None) def extract_chromsome(self, chromosome: Union[str, List[str]]) -> Union[GenomicRunLengthArray, RunLengthRaggedArray]: """Get the data on one or more chromosomes Parameters ---------- chromosome : Union[str, List[str]] A chromosome name or a list of chromosome names Returns ------- Union[GenomicRunLengthArray, RunLengthRaggedArray] A runlengtharray for a single chromosome or a RunLengthRaggedArray for a list of chromosomes """ assert isinstance(chromosome, str) offset = self._genome_context.global_offset.get_offset([chromosome])[0] return self._global_track[offset:offset + self._genome_context.global_offset.get_size([chromosome])[0]] def __repr__(self) -> str: lines = [] for name, _ in zip(self._genome_context.chrom_sizes, range(10)): lines.append(f'{name}: {self[name]}') if len(self._genome_context.chrom_sizes) > 10: lines.extend('...') return '\n'.join(lines) def to_dict(self) -> Dict[str, GenomicRunLengthArray]: go = self._genome_context.global_offset names = go.names() offsets = go.get_offset(names) sizes = go.get_size(names) return {name: self._global_track[offset:offset+size].to_array() for name, offset, size in zip(names, offsets, sizes)} def __array_ufunc__(self, ufunc: callable, method: str, *inputs, **kwargs): inputs = [(i._global_track if isinstance(i, GenomicArrayGlobal) else i) for i in inputs] r = self._global_track.__array_ufunc__(ufunc, method, *inputs, **kwargs) return self.__class__(r, self._genome_context) def __array_function__(self, func: callable, types: List, args: List, kwargs: Dict): """Handles any numpy array functions called on a raggedarray Parameters ---------- func : callable types : List args : List kwargs : Dict """ args = [(i._global_track if isinstance(i, GenomicArrayGlobal) else i) for i in args] if func == np.histogram: return np.histogram(*args, **kwargs) if func == np.sum: return self.sum(*args[1:], **kwargs) return NotImplemented def get_data(self) -> Union[Interval, BedGraph]: """The data of the array represented in BNPDataClass Returns ------- Union[Interval, BedGraph] If boolean mask,return the intervals where True. Else a bedgraph containing the data """ go = self._genome_context._global_offset names = go.names() starts = go.get_offset(names) stops = starts+go.get_size(names) intervals_list = [] for name, start, stop in zip(names, starts, stops): assert isinstance(self._global_track, GenomicRunLengthArray) data = self._global_track[start:stop] assert isinstance(data, GenomicRunLengthArray) intervals_list.append(self._get_intervals_from_data(name, data)) return np.concatenate(intervals_list) def extract_intervals(self, intervals: Union[Interval, 'GenomicIntervals'], stranded: bool = False) -> RunLengthRaggedArray: """Extract the data contained in a set of intervals Parameters ---------- intervals : Interval stranded : bool Returns ------- RunLengthRaggedArray """ global_intervals = self._genome_context.global_offset.from_local_interval(intervals) rle = self._global_track[global_intervals] if not stranded: return rle r = rle[:, ::-1] return np.where((intervals.strand.ravel() == '+')[:, np.newaxis], rle, r) def extract_locations(self, locations: 'GenomicLocation', stranded=None) -> RunLengthRaggedArray: """Extract the data contained in a set of intervals Parameters ---------- intervals : Interval stranded : bool Returns ------- RunLengthRaggedArray """ global_intervals = self._genome_context.global_offset.from_local_coordinates(locations.chromosome, locations.position) return self._global_track[global_intervals] # if not stranded: # return rle # r = rle[:, ::-1] # return np.where((intervals.strand.ravel() == '+')[:, np.newaxis], # rle, r) @classmethod def from_dict(cls, d: Dict[str, GenomicRunLengthArray], genome_context: GenomeContextBase) -> 'GenomicData': """Create genomic data from a dict of data with chromosomes as keys Parameters ---------- d : Dict[str, GenomicRunLengthArray] Returns ------- 'GenomicData' """ # chrom_sizes = {name: array.size for name, array in d.items()} array = np.concatenate(list(d.values())) return cls(array, genome_context) @classmethod def from_stream(cls, stream: Iterable[Tuple[str, GenomicRunLengthArray]], genome_context: GenomeContextBase) -> 'GenomicData': return cls.from_dict(dict(stream)) class GenomicArrayNode(GenomicArray, np.lib.mixins.NDArrayOperatorsMixin): ''' Class for representing a genomic array, but only keeping data for one chromosome in memory at a time ''' def __str__(self): return 'GTN:' + str(self._run_length_node) __repr__ = __str__ def __init__(self, run_length_node: ComputationNode, genome_context: GenomeContextBase): self._run_length_node = run_length_node self._chrom_name_node = StreamNode(iter(genome_context.chrom_sizes.keys())) self._genome_context = genome_context def __array_ufunc__(self, ufunc: callable, method: str, *inputs, **kwargs): args = [gtn._run_length_node if isinstance(gtn, GenomicArrayNode) else gtn for gtn in inputs] return self.__class__(ufunc(*args, **kwargs), self._genome_context) def get_data(self) -> ComputationNode: """Get the data from the array in a lazy BNPDataClass Returns ------- ComputationNode A lazy compution node representing the data from the array """ return ComputationNode(self._get_intervals_from_data, [self._chrom_name_node, self._run_length_node]) def _index_boolean(self, idx): assert isinstance(idx, GenomicArrayNode) return ComputationNode(lambda a, i: a[i], [self._run_length_node, idx]) def extract_intervals(self, intervals: Interval, stranded: bool = False) -> ComputationNode: """Extract the data on a set of intervals. Returns a `ComputationNode` representing the extracted data. Parameters ---------- intervals : Interval The set of intervals to extract data from stranded : bool Whether or not to treat the intervals as stranded Returns ------- ComputationNode: A computation node representing the raggedarray containing data from all the intervals """ def stranded_func(ra: GenomicRunLengthArray, start: int, stop: int, strand: str): assert np.all(stop <= ra.size), (np.max(stop), ra.size) rle = ra[start:stop] r = rle[:, ::-1] return np.where((strand == '+')[:, np.newaxis], rle, r) def unstranded_func(ra: GenomicRunLengthArray, start: int, stop: int): assert np.all(stop <= ra.size), (np.max(stop), ra.size) return ra[start:stop] assert self.genome_context.is_compatible(intervals.genome_context), (self.genome_context, intervals.genome_context) intervals = intervals.as_stream() if stranded: return ComputationNode(stranded_func, [self._run_length_node, intervals.start, intervals.stop, intervals.strand]) return ComputationNode(unstranded_func, [self._run_length_node, intervals.start, intervals.stop]) def extract_chromsome(self, chromosome: Union[str, List[str]]) -> 'GenomicData': assert False def from_stream(cls, stream: Iterable[Tuple[str, GenomicRunLengthArray]], genome_context: GenomeContextBase) -> 'GenomicData': stream_node = StreamNode((array for _, array in stream)) return cls(stream_node, genome_context) @classmethod def from_dict(cls, d: Dict[str, GenomicRunLengthArray], genome_context: GenomeContextBase) -> 'GenomicData': """Create genomic data from a dict of data with chromosomes as keys Parameters ---------- d : Dict[str, GenomicRunLengthArray] Returns ------- 'GenomicData' """ # chrom_sizes = {name: array.size for name, array in d.items()} stream_node = StreamNode(iter(d.values())) return cls(stream_node, genome_context) def to_dict(self): assert False def sum(self, axis=None) -> float: return np.sum(self._run_length_node) def __array_function__(self, func: callable, types: List, args: List, kwargs: Dict): """Handles any numpy array functions called on a raggedarray Parameters ---------- func : callable types : List args : List kwargs : Dict """ if func == np.histogram: return np.histogram(args[0]._run_length_node, *args[1:], **kwargs) return NotImplemented