Source code for bionumpy.io.indexed_fasta

from pathlib import Path
from typing import Union

import numpy as np
from ..encoded_array import EncodedArray, as_encoded_array, EncodedRaggedArray
from .multiline_buffer import FastaIdxBuffer, FastaIdx
from ..datatypes import Interval
from .files import bnp_open
from ..encodings import BaseEncoding
from ..encodings.string_encodings import StringEncoding


def read_index(filename: str) -> dict:
    """Read a fa.fai into a nested dict

    Parameters
    ----------
    filename : str
        The filename for the fasta index

    Returns
    -------
    dict
        Nested dict mapping chromosome names to dicts of index values

    """
    split_lines = (line.split("\t") for line in open(filename))
    return {chromosome.split()[0]:
            {"rlen": int(rlen), "offset": int(offset),
             "lenc": int(lenc), "lenb": int(lenb)}
            for chromosome, rlen, offset, lenc, lenb in split_lines}
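

# Usage sketch (placeholder path): the returned dict mirrors the .fai columns,
# keyed by the field names used above.
#
#     index = read_index("ref.fa.fai")
#     index["chr1"]["rlen"]     # total sequence length of chr1
#     index["chr1"]["offset"]   # byte offset of the first base in the fasta file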


def create_index(filename: str) -> FastaIdx:
    """Create a fasta index for a fasta file

    Parameters
    ----------
    filename : str
        Filename of the fasta file

    Returns
    -------
    FastaIdx
        Fasta index as bnpdataclass

    """

    reader = bnp_open(filename, buffer_type=FastaIdxBuffer)
    indice_builders = list(reader.read_chunks())
    offsets = np.cumsum([0]+[idx.byte_size[0] for idx in indice_builders])
    return np.concatenate([
        FastaIdx(idx.chromosome,
                 idx.length,
                 idx.start+offset,
                 idx.characters_per_line,
                 idx.line_length)
        for idx, offset in zip(indice_builders, offsets)])
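

# Usage sketch (placeholder path): building an in-memory FastaIdx for a fasta
# file; note that this returns the index as a bnpdataclass and does not write a
# .fai file to disk.
#
#     fasta_index = create_index("ref.fa")
#     fasta_index.chromosome, fasta_index.length   # one entry per sequence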


class IndexedFasta:
    """
    Class representing an indexed fasta file.
    Behaves like dict of chrom names to sequences
    """

    def __init__(self, filename: Union[str, Path]):
        if isinstance(filename, str):
            filename = Path(filename)
        self._filename = filename
        self._index = read_index(filename.with_suffix(filename.suffix + ".fai"))
        self._f_obj = open(filename, "rb")
        self._index_table = FastaIdx.from_entry_tuples(
            [(name, var['rlen'], var['offset'], var['lenc'], var['lenb'])
             for name, var in self._index.items()])  # if '_' not in name])
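
    # Usage sketch: the constructor assumes a "<filename>.fai" index next to the
    # fasta file (e.g. produced by ``samtools faidx`` or ``create_index`` above).
    # "ref.fa" is a placeholder path.
    #
    #     reference = IndexedFasta("ref.fa")
    #     list(reference.keys())          # chromosome names from the index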
    def get_contig_lengths(self) -> dict:
        """Return a dict of chromosome names to sequence lengths

        Returns
        -------
        dict
            chromosome name to sequence length mapping

        """
        # "rlen" is the total sequence length from the .fai; "lenc" is the number
        # of bases per line, which is not a contig length.
        return {name: values["rlen"] for name, values in self._index.items()}
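
    # Usage sketch (placeholder path): contig lengths come straight from the .fai,
    # so no sequence data is read.
    #
    #     reference = IndexedFasta("ref.fa")
    #     lengths = reference.get_contig_lengths()   # {"chr1": <length>, ...}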
    def keys(self):
        return self._index.keys()

    def values(self):
        return (self[key] for key in self.keys())

    def items(self):
        return ((key, self[key]) for key in self.keys())

    def __repr__(self):
        return f"Indexed Fasta File with chromosome sizes: {self.get_contig_lengths()}"

    def __getitem__(self, chromosome: str) -> EncodedArray:
        """Return entire sequence of the given chromosome

        Parameters
        ----------
        chromosome : str
            chromosome name

        Returns
        -------
        EncodedArray
            The sequence for that chromosome

        """
        idx = self._index[chromosome]
        lenb, rlen, lenc = (idx["lenb"], idx["rlen"], idx["lenc"])
        n_rows = (rlen + lenc - 1) // lenc
        data = np.empty(lenb * n_rows, dtype=np.uint8)
        # The last line may be shorter than a full row, so only read the bytes
        # that actually belong to this sequence.
        bytes_to_read = (n_rows - 1) * lenb + (rlen - (n_rows - 1) * lenc)
        self._f_obj.seek(idx["offset"])
        self._f_obj.readinto(data[:bytes_to_read])
        assert np.all(data[:bytes_to_read] > 0), data[:bytes_to_read]
        # Drop the newline bytes by keeping only the first lenc columns of each row
        data = data.reshape(n_rows, lenb)
        ret = data[:, :lenc].ravel()[:rlen]
        assert np.all(ret[:rlen] > 0), ret
        assert ret.size == idx["rlen"], (
            ret.size,
            idx["rlen"],
            ret.size - idx["rlen"],
            data.shape,
        )
        return EncodedArray(ret, BaseEncoding)
        # Unreachable alternative kept from the original source; it would
        # upper-case the sequence:
        # return EncodedArray(((ret - ord("A")) % 32) + ord("A"), BaseEncoding)

    def _get_interval_sequences_fast(self, intervals: Interval) -> EncodedRaggedArray:
        tmp = {name: self._index[name]
               for name in intervals.chromosome.encoding.get_labels()}
        index_table = FastaIdx.from_entry_tuples(
            [(name, var['rlen'], var['offset'], var['lenc'], var['lenb'])
             for name, var in tmp.items()])
        pre_alloc = np.empty((intervals.stop-intervals.start).sum(), dtype=np.uint8)
        chromosome_i = intervals.chromosome.raw()
        indices: FastaIdx = index_table[chromosome_i]
        start_rows = intervals.start//indices.characters_per_line
        start_mods = intervals.start % indices.characters_per_line
        start_offsets = start_rows*indices.line_length+start_mods
        stop_rows = intervals.stop // indices.characters_per_line
        stop_offsets = stop_rows*indices.line_length+intervals.stop % indices.characters_per_line
        read_starts = indices.start + start_offsets
        read_lengths = stop_offsets-start_offsets
        lengths = intervals.stop-intervals.start
        n_rows = stop_rows-start_rows
        offsets = np.insert(np.cumsum(lengths), 0, 0)
        for read_start, read_length, n_row, start_mod, lenb, a_offset in zip(
                read_starts, read_lengths, n_rows, start_mods, indices.line_length, offsets):
            self._f_obj.seek(read_start)
            r_sequence = np.frombuffer(self._f_obj.read(read_length), dtype=np.uint8)
            # Remove the newline byte at the end of each full line in the read window
            sequence = np.delete(r_sequence, [lenb*(j+1)-1-start_mod for j in range(n_row)])
            # assert not np.any(sequence == 10), (np.flatnonzero(r_sequence==10), [lenb*(j+1)-1-start_mod
            #     for j in range(n_row)], read_start, read_length, n_row, start_mod, lenb, a_offset, intervals[i:i+1], indices[i:i+1])
            pre_alloc[a_offset:a_offset+sequence.size] = sequence
        a = EncodedArray(pre_alloc, BaseEncoding)
        return EncodedRaggedArray(a, lengths)
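
    # Usage sketch (placeholder names): __getitem__ gives dict-like access to whole
    # chromosomes; _get_interval_sequences_fast is the vectorized path taken when
    # the intervals' chromosome column is string-encoded.
    #
    #     reference = IndexedFasta("ref.fa")
    #     chr1 = reference["chr1"]    # EncodedArray with the full chromosome sequence
    #     first_kb = chr1[:1000]      # EncodedArray supports normal slicing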
    def get_interval_sequences(self, intervals: Interval) -> EncodedRaggedArray:
        """Get the sequences for a set of genomic intervals

        Parameters
        ----------
        intervals : Interval
            Intervals

        Returns
        -------
        EncodedRaggedArray
            Sequences

        """
        if isinstance(intervals.chromosome.encoding, StringEncoding):
            return self._get_interval_sequences_fast(intervals)
        lengths = []
        cur_offset = 0
        pre_alloc = np.empty((intervals.stop-intervals.start).sum(), dtype=np.uint8)
        alloc_offset = 0
        for interval in intervals:
            chromosome = interval.chromosome.to_string()
            idx = self._index[chromosome]
            lenb, rlen, lenc = (idx["lenb"], idx["rlen"], idx["lenc"])
            start_row = interval.start//lenc
            start_mod = interval.start % lenc
            start_offset = start_row*lenb+start_mod
            stop_row = interval.stop // lenc
            stop_offset = stop_row*lenb+interval.stop % lenc
            self._f_obj.seek(idx["offset"] + start_offset)
            lengths.append(stop_offset-start_offset-(stop_row-start_row))
            tmp = np.frombuffer(self._f_obj.read(stop_offset-start_offset), dtype=np.uint8)
            # Remove the newline byte at the end of each full line in the read window
            tmp = np.delete(tmp, [lenb*(j+1)-1-start_mod for j in range(stop_row-start_row)])
            pre_alloc[alloc_offset:alloc_offset+tmp.size] = tmp
            alloc_offset += tmp.size
            cur_offset += stop_offset-start_offset
        assert alloc_offset == pre_alloc.size, (alloc_offset, pre_alloc.size)
        assert np.all(pre_alloc > 0), np.sum(pre_alloc == 0)
        # s = np.delete(np.array(sequences, dtype=np.uint8), delete_indices)
        # s = np.delete(pre_alloc[:alloc_offset], delete_indices)
        a = EncodedArray(pre_alloc, BaseEncoding)
        return EncodedRaggedArray(a, lengths)
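

# Usage sketch (placeholder paths and coordinates): extracting interval
# sequences. Interval is a bnpdataclass, so ``from_entry_tuples`` is assumed to
# be available on it in the same way it is used for FastaIdx above.
#
#     intervals = Interval.from_entry_tuples(
#         [("chr1", 100, 200), ("chr2", 0, 50)])
#     reference = IndexedFasta("ref.fa")
#     sequences = reference.get_interval_sequences(intervals)   # EncodedRaggedArray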