Source code for bionumpy.sequence.kmers

import numpy as np
from ..streams import streamable
from .count_encoded import count_encoded, EncodedCounts
from ..encodings.exceptions import EncodingError
from ..encodings.kmer_encodings import KmerEncoding
from .rollable import RollableFunction
from bionumpy.encodings import DNAEncoding
from ..encodings.alphabet_encoding import AlphabetEncoding
from ..encoded_array import EncodedArray, EncodedRaggedArray, BaseEncoding, change_encoding
from ..encoded_array import as_encoded_array
from npstructures.bitarray import BitArray
from ..util import as_strided, is_subclass_or_instance
import logging
logger = logging.getLogger(__name__)


class KmerEncoder(RollableFunction):
    def __init__(self, k, alphabet_encoding):
        self.window_size = k
        self._k = k
        self._encoding = alphabet_encoding
        self._alphabet_size = alphabet_encoding.alphabet_size
        self._convolution = self._alphabet_size ** np.arange(self._k)

    def __call__(self, sequence: EncodedArray) -> int:
        sequence = as_encoded_array(sequence, target_encoding=self._encoding)
        return EncodedArray(sequence.data.dot(self._convolution), KmerEncoding(self._encoding, self._k))

    def inverse(self, kmer_hash: int) -> EncodedArray:
        return EncodedArray(((kmer_hash[:, np.newaxis] // self._convolution) % self._alphabet_size), self._encoding)

    def sample_domain(self, n):
        return EncodedArray((np.random.randint(0, self._alphabet_size, size=self._k * n).reshape(n, self._k)), self._encoding)


[docs] def get_kmers(sequence: EncodedRaggedArray, k: int) -> EncodedArray: """ Get kmers for sequences. Sequences should be encoded with an AlphabetEncoding (e.g. DNAEncoding). Use bnp.change_encoding if your sequences do not have a suitable encoding. Parameters ---------- sequence : EncodedRaggedArray Sequences to get kmers from k : int The kmer size (1-31) Returns ------- EncodedRaggedArray Kmers from the sequences. Examples -------- >>> import bionumpy as bnp >>> sequences = bnp.encoded_array.as_encoded_array(["ACTG", "AAA", "TTGGC"], bnp.DNAEncoding) >>> bnp.sequence.get_kmers(sequences, 3) encoded_ragged_array([[ACT, CTG], [AAA], [TTG, TGG, GGC]], 3merEncoding(AlphabetEncoding('ACGT'))) >>> sequences = bnp.open("example_data/big.fq.gz").read().sequence >>> sequences = bnp.change_encoding(sequences, bnp.DNAEncoding) >>> bnp.sequence.get_kmers(sequences, 31)[0, 0:3] # first three kmers of first sequence encoded_array([CGGTAGCCAGCTGCGTTCAGTATGGAAGATT, GGTAGCCAGCTGCGTTCAGTATGGAAGATTT, GTAGCCAGCTGCGTTCAGTATGGAAGATTTG], 31merEncoding(AlphabetEncoding('ACGT'))) """ assert 0 < k < 32, "k must be larger than 0 and smaller than 32" if sequence.encoding == BaseEncoding: try: sequence = change_encoding(sequence, DNAEncoding) except EncodingError: logging.error("Tried to change encoding of sequences to DNAEncoding, but failed. " "Make sure your sequences are valid DNA, only containing A, C, G, and T") raise assert is_subclass_or_instance(sequence.encoding, AlphabetEncoding), \ "Sequence needs to be encoded with an AlphabetEncoding, e.g. DNAEncoding. " \ "Change encoding of your sequences by using e.g. bnp.change_encoding(sequences, bnp.DNAEncoding)" if sequence.encoding.alphabet_size == 4: # use the faster _get_dna_kmers return _get_dna_kmers(sequence, k) return KmerEncoder(k, sequence.encoding).rolling_window(sequence)
def convolution(func): def new_func(_sequence, window_size, *args, **kwargs): shape, sequence = (_sequence.shape, _sequence.ravel()) convoluted = func(sequence, window_size, *args, **kwargs) if not isinstance(shape[-1], np.ndarray): out = as_strided(convoluted, shape) else: out = EncodedRaggedArray(convoluted, shape, safe_mode=False) # FIXME: Should not use unsafe here return out[..., : (-window_size + 1)] return new_func @convolution def _get_dna_kmers(sequence, k): """ A faster version alternative to get_kmers that can be used with sequences encoded with an AlphabetEncoding with alphabet size **exactly equal to 4**. This function is called by get_kmers when alphabet size is 4 to get some speedup. See get_kmers for documentation on use. """ assert 0 < k < 32, "k must be larger than 0 and smaller than 32" assert isinstance(sequence, EncodedArray), sequence assert is_subclass_or_instance(sequence.encoding, AlphabetEncoding) assert sequence.encoding.alphabet_size == 4, \ "Only supported for sequence encoded with an AlphabetEncoding with alphabetsize 4" bit_array = BitArray.pack(sequence.data, bit_stride=2) hashes = bit_array.sliding_window(k) assert hashes.dtype == np.uint64 hashes = hashes.view(np.int64) output_encoding = KmerEncoding(sequence.encoding, k) return EncodedArray(hashes, output_encoding) @streamable(sum) def count_kmers(sequence: EncodedRaggedArray, k: int, axis=None) -> EncodedCounts: """Count the occurance of each kmer in a set of sequences For each possible kmer, count the number of occurances in a (set of) sequence(s) Parameters ---------- sequence : EncodedRaggedArray k : int Length of the kmer axis : int """ kmers = get_kmers(sequence, k) return count_encoded(kmers, axis=axis)