Source code for bionumpy.genomic_data.genome

import os
import numpy as np
from typing import Dict, List, Optional
from pathlib import PurePath
from ..io import bnp_open, Bed6Buffer, BedBuffer, open_indexed
from ..io.files import buffer_types, BamBuffer, BamIntervalBuffer
from ..io.indexed_files import IndexBuffer, create_index
from .genomic_track import GenomicArray
from .genomic_intervals import GenomicIntervals, GenomicLocation, LocationEntry
from .genomic_sequence import GenomicSequence
from .annotation import GenomicAnnotation
from .genome_context import GenomeContext, keep_all, ignore_underscores
from ..encoded_array import as_encoded_array
from ..bnpdataclass import replace, BNPDataClass
from ..datatypes import BedGraph, Interval
from ..util.formating import table


[docs] class Genome: ''' The entry point for working with genomic data. Supports reading data directly from file and into GenomicData objects, or creating GenomicData objects from `BNPDataClass` objects, such `Interval`, `BedGraph` or `LocationEntry`. Supports both in memory or streamed/lazy data, controlled with the `stream` keyword. For streamed data it requires that the data in the file uses the same chromosome ordering as the genome. This will usually be the same as either what is provided in the `chrom.sizes` file, or a simple alphabetic ordering. If sort order errors occur, try using `sort_names=True` when creating the `Genome` object. ''' def __init__(self, chrom_sizes: Dict[str, int], fasta_filename: str = None, sort_names=False, filter_function=lambda x: True): if isinstance(chrom_sizes, GenomeContext): self._genome_context = chrom_sizes else: if sort_names: chrom_sizes = {key: chrom_sizes[key] for key in sorted(chrom_sizes.keys())} self._genome_context = GenomeContext.from_dict(chrom_sizes, filter_function) self._fasta_filename = fasta_filename
[docs] def with_ignored_added(self, ignored: List[str]) -> 'Genome': ''' Make a new GenomeContext with additional ignored chromosomes. This is useful for allowing but ignoring chromosome names that are not in the origin genome. Parameters ---------- ignored: Iterable[str] Returns ------- Genome ''' return self.__class__(self._genome_context.with_ignored_added(ignored), self._fasta_filename)
[docs] @classmethod def from_dict(cls, chrom_sizes: Dict[str, int], *args, **kwargs) -> 'Genome': ''' Create a Genome object from a dictionary of chromosome sizes Parameters ---------- chrom_sizes: Dict[str, int] args: Additional args to be passed to the Genome constructor kwargs: Additional kwargs to be passed to the Genome constructor Returns ------- Genome Examples -------- >>> import bionumpy as bnp >>> bnp.Genome.from_dict({'chr1': 1000, 'chr2': 2000}) Genome(['chr1', 'chr2']) ''' return cls(chrom_sizes, *args, **kwargs)
[docs] @classmethod def from_file(cls, filename: str, sort_names: bool=False, filter_function=ignore_underscores) -> 'Genome': """Read genome information from a 'chrom.sizes' or 'fa.fai' file File should contain rows of names and lengths of chromosomes. If a fasta file is provided, a fasta index will be written to 'fa.fai' if not already theree. The sequence will then be available through, 'genome.read_sequence()' without any 'filename' parameter. Parameters ---------- cls : filename : str sort_names : bool Whether or not to sort the chromosome names Returns ------- 'Genome' A `Genome` object created from the read chromosome sizes Examples -------- >>> import bionumpy as bnp >>> bnp.Genome.from_file('example_data/hg38.chrom.sizes') Genome(['chr1', 'chr2', 'chr3', 'chr4', 'chr5', 'chr6', 'chr7', 'chr8', 'chr9', 'chr10', '...']) """ path = PurePath(filename) suffix = path.suffixes[-1] index_file_name = path.with_suffix(path.suffix + ".fai") fasta_filename = None if suffix in (".fa", ".fasta"): if not os.path.isfile(index_file_name): bnp_open(index_file_name, "w", buffer_type=IndexBuffer).write(create_index(path)) fasta_filename = filename filename = index_file_name split_lines = (line.split()[:2] for line in open(filename)) return cls({name: int(length) for name, length in split_lines}, fasta_filename=fasta_filename, sort_names=sort_names, filter_function=filter_function)
@staticmethod def _open(filename, stream, buffer_type=None): f = bnp_open(filename, buffer_type=buffer_type) if stream: content = f.read_chunks() else: content = f.read() return content
[docs] def get_track(self, bedgraph: BedGraph) -> GenomicArray: """Create a `GenomicArray` for the data in the bedgraph Parameters ---------- bedgraph : BedGraph Returns ------- GenomicArray Examples -------- >>> import bionumpy as bnp >>> bedgraph = bnp.datatypes.BedGraph(chromosome=['chr1', 'chr1', 'chr2'], start=[0, 10, 0], stop=[5, 15, 5], value=[1, 2, 3]) >>> genome = bnp.Genome.from_dict({'chr1': 20, 'chr2': 10}) >>> genome.get_track(bedgraph) chr1: [1 1 1 1 1 0 0 0 0 0 2 2 2 2 2 0 0 0 0 0] chr2: [3 3 3 3 3 0 0 0 0 0] """ bedgraph = self._mask_data_on_extra_chromosomes(bedgraph) return GenomicArray.from_bedgraph(bedgraph, self._genome_context)
[docs] def read_track(self, filename: str, stream: bool = False) -> GenomicArray: """Read a bedgraph from file and convert it to a `GenomicArray` If `stream` is `True` then read the bedgraph in chunks and get a lazy evaluated GenomicArray Parameters ---------- filename : str Filename for the bedgraph file stream : bool Whether or not to read as stream Returns ------- GenomicArray Examples -------- >>> import bionumpy as bnp >>> genome = bnp.Genome.from_dict({'chr1': 30000, 'chr2': 31000, 'chr3': 32000}) >>> genome.read_track('example_data/small_treat_pileup.bdg') chr1: [ 0.0 0.0 1.0 ... 0.0 0.0 0.0] chr2: [ 0.0 0.0 0.0 ... 0.0 0.0 0.0] chr3: [ 0.0 0.0 0.0 ... 0.0 0.0 0.0] """ content = self._open(filename, stream) return self.get_track(content)
[docs] def get_intervals(self, intervals: Interval, stranded: bool = False) -> GenomicIntervals: """Get genomic intervals from interval data. Parameters ---------- intervals : Interval Interval's or stream of Intervals's stranded : bool Wheter or not the intervals are stranded Returns ------- GenomicIntervals Examples -------- >>> import bionumpy as bnp >>> intervals = bnp.Interval(chromosome=['chr1', 'chr1', 'chr2'], start=[0, 10, 0], stop=[5, 15, 5]) >>> genome = bnp.Genome.from_dict({'chr1': 20, 'chr2': 10}) >>> genome.get_intervals(intervals) Genomic Intervals on ['chr1', 'chr2']: Interval with 3 entries chromosome start stop chr1 0 5 chr1 10 15 chr2 0 5 """ return GenomicIntervals.from_intervals(intervals, self._genome_context, is_stranded=stranded)
[docs] def read_intervals(self, filename: str, stranded: bool = False, stream: bool = False, buffer_type=None) -> GenomicIntervals: """Read a bed file and represent it as `GenomicIntervals` If `stream` is `True` then read the bedgraph in chunks and get a lazy evaluated GenomicTrack Parameters ---------- filename : str Filename for the bed file stranded : bool Whether or not to treat the intervals as stranded stream : bool Wheter to read as a stream Returns ------- GenomicIntervals Examples -------- >>> import bionumpy as bnp >>> genome = bnp.Genome.from_file('example_data/small_sequence.fa') >>> genome.read_intervals('example_data/small_summits.bed') Genomic Intervals on ['chr1', 'chr2', 'chr3']: Interval with 13 entries chromosome start stop chr1 639 640 chr1 6023 6024 chr1 7124 7125 chr2 849 850 chr2 6320 6321 chr2 8483 8484 chr2 11342 11343 chr2 12527 12528 chr2 13092 13093 chr2 18943 18944 """ path = PurePath(filename) suffix = path.suffixes[-1] if suffix == ".gz": suffix = path.suffixes[-2] if buffer_type is None: buffer_type = buffer_types[suffix] if buffer_type == BedBuffer and stranded: buffer_type = Bed6Buffer if buffer_type == BamBuffer: buffer_type = BamIntervalBuffer content = self._open(filename, stream, buffer_type=buffer_type) return self.get_intervals(content, stranded)
[docs] def read_locations(self, filename: str, stranded: bool = False, stream: bool = False, has_numeric_chromosomes=False, buffer_type=None) -> GenomicLocation: """Read a set of locations from file and convert them to `GenomicLocation` Locations can be treated as stranded or not stranded. If `has_numeric_chromosomes` then 'chr' will be prepended to all the chromosome names in order to make them compatible with the genome's chromosome names Parameters ---------- filename : str Filename of the locations stranded : bool Whether or not the locations are stranded stream : bool Whether the data should be read as a stream has_numeric_chromosomes : bool Whether the file has the chromosomes as numbers Returns ------- GenomicLocation (Stranded) GenomicLocation Examples -------- >>> import bionumpy as bnp >>> genome = bnp.Genome.from_file('example_data/hg38.chrom.sizes') >>> genome.read_locations('example_data/thousand_genomes.vcf', has_numeric_chromosomes=True) Genomic Locations on ['chr1', 'chr2', 'chr3', 'chr4', 'chr5', 'chr6', 'chr7', 'chr8', 'chr9', 'chr10', '...']: LocationEntry with 74 entries chromosome position chr21 5033883 chr21 5035657 chr21 5038297 chr21 5038312 chr21 5052250 chr21 5053935 chr21 5053961 chr21 5063903 chr21 5063916 chr21 5064678 """ assert not (stream and has_numeric_chromosomes) assert not stranded, "Stranded locations are not supported yet" f = bnp_open(filename, buffer_type=buffer_type) data = f.read_chunks() if not stream: data_list = list(data) if len(data_list) == 0: data = LocationEntry.empty() else: data = np.concatenate(data_list) return self.get_locations(data, has_numeric_chromosomes=has_numeric_chromosomes)
def _mask_data_on_extra_chromosomes(self, data, chromosome_field_name='chromosome'): if (not isinstance(data, BNPDataClass)) or len(data)==0: return data encoded_chromosomes = self._genome_context.encoding.encode(getattr(data, chromosome_field_name)) data = replace(data, **{chromosome_field_name: encoded_chromosomes}) mask = self._genome_context.is_included(encoded_chromosomes) return data[mask]
[docs] def get_locations(self, data: LocationEntry, has_numeric_chromosomes=False) -> GenomicLocation: """Create `GenomicLocation`s from location entries Parameters ---------- data : LocationEntry BNPDataClass of location entries Returns ------- GenomicLocation Examples -------- >>> import bionumpy as bnp >>> genome = bnp.Genome.from_file('example_data/small_sequence.fa') >>> genome.get_locations(bnp.datatypes.LocationEntry(chromosome=['chr1', 'chr1', 'chr2'], position=[0, 10, 0])) Genomic Locations on ['chr1', 'chr2', 'chr3']: LocationEntry with 3 entries chromosome position chr1 0 chr1 10 chr2 0 """ if has_numeric_chromosomes: data = replace(data, chromosome=as_encoded_array(['chr'+chromosome.to_string() for chromosome in data.chromosome])) data = self._mask_data_on_extra_chromosomes(data) return GenomicLocation.from_data(data, self._genome_context)
[docs] def read_sequence(self, filename: Optional[str] = None) -> GenomicSequence: """Read the genomic sequence from file. If a `fasta` file was used to create the Genome object, `filename` can be `None` in which case the sequence will be read from that fasta file Parameters ---------- filename : str Returns ------- GenomicSequence Examples -------- >>> import bionumpy as bnp >>> genome = bnp.Genome.from_file('example_data/small_sequence.fa') >>> genome.read_sequence() GenomicSequence over chromosomes: ['chr1', 'chr2', 'chr3'] >>> genome = bnp.Genome.from_file('example_data/small.chrom.sizes') >>> genome.read_sequence('example_data/small_sequence.fa') GenomicSequence over chromosomes: ['chr1', 'chr2', 'chr3'] """ if filename is None: assert self._fasta_filename is not None filename = self._fasta_filename return GenomicSequence.from_indexed_fasta(open_indexed(filename), genome_context=self._genome_context)
[docs] def read_annotation(self, filename: str) -> GenomicAnnotation: """Read genomic annotation from a 'gtf' or 'gff' file The annotation file should have 'gene', 'transcript' and 'exon' entries. Parameters ---------- filename : str Filename of the annotation file Returns ------- GenomicAnnotation `GenomicAnnotation` containing the genes, transcripts and exons Examples -------- >>> import bionumpy as bnp >>> genome = bnp.Genome.from_file('example_data/small_sequence.fa') >>> genome.read_annotation('example_data/small.gtf') GenomicAnnotation(genome_context=['chr1', 'chr2', 'chr3'], data=GTFEntry with 5 entries chromosome source feature_type start stop score strand phase atributes chr1 knownGene transcript 17369 17436 . - . gene_id "ENST0000061921 chr1 knownGene exon 17369 17436 . - . gene_id "ENST0000061921 chr1 knownGene transcript 29554 31097 . + . gene_id "ENST0000047335 chr1 knownGene exon 29554 30039 . + . gene_id "ENST0000047335 chr1 knownGene exon 30564 30667 . + . gene_id "ENST0000047335) """ gtf_entries = self._open(filename, stream=False) return GenomicAnnotation.from_gtf_entries(gtf_entries, self._genome_context)
def __repr__(self): return f"{self.__class__.__name__}(" + repr(self._genome_context) + ")" def __str__(self): return table( ((key, value) for key, value in self._genome_context.chrom_sizes.items() if '_' not in key), headers=["Chromosome", "Size"])
[docs] def get_genome_context(self) -> GenomeContext: ''' Get the genome context of the Genome Returns ------- GenomeContext ''' return self._genome_context
@property def size(self) -> int: '''The size of the genome''' return self._genome_context.size