Source code for bionumpy.io.one_line_buffer

import dataclasses
from typing import List

import numpy as np

from .dump_csv import get_column
from ..encoded_array import EncodedArray, BaseEncoding, change_encoding, EncodedRaggedArray
from ..bnpdataclass import bnpdataclass
from ..datatypes import SequenceEntry
from .exceptions import FormatException
from .file_buffers import FileBuffer, TextBufferExtractor, IncompleteEntryException, NEWLINE, TextThroughputExtractor


class OneLineBuffer(FileBuffer):
    """ Base class for file formats where data fields are contained in lines."""

    n_lines_per_entry = 2
    _buffer_divisor = 32
    _line_offsets = (1, 0)  # prefix characters to skip at the start of each line (e.g. the header marker)
    _empty_lines = []

    def __init__(self, buffer_extractor: TextBufferExtractor):
        self._buffer_extractor = buffer_extractor
        self._is_validated = True

    @property
    def n_lines(self):
        return len(self._buffer_extractor)*self.n_lines_per_entry

    @property
    def data(self):
        return self._buffer_extractor.data

    @classmethod
    def _get_buffer_extractor(cls, data, new_lines):
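        # Positions right after each newline (with a leading 0 from the prepended -1) are the
        # line starts; line ends are the newline positions themselves, grouped per entry.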
        tmp = np.insert(new_lines, 0, -1)+1
        field_ends = new_lines.reshape(-1, cls.n_lines_per_entry)
        field_ends = cls._modify_for_carriage_return(field_ends, data)
        field_starts = tmp[:-1].reshape(-1, cls.n_lines_per_entry) + np.array(cls._line_offsets)
        entry_starts = tmp[:-1:cls.n_lines_per_entry]
        entry_ends = tmp[::cls.n_lines_per_entry][1:]

        return TextThroughputExtractor(data,
                                       field_starts,
                                       field_ends=field_ends,
                                       entry_starts=entry_starts,
                                       entry_ends=entry_ends)

    @classmethod
    def contains_complete_entry(cls, chunks):
        if len(chunks) == 1:
            try:
                return True, cls.from_raw_buffer(chunks[0])
            except IncompleteEntryException:
                return False, None
        return super().contains_complete_entry(chunks)

    @classmethod
    def from_raw_buffer(cls, chunk, header_data=None) -> "OneLineBuffer":
        """Create a buffer with full entries

        Extract complete entries, i.e. a number of lines that is divisible by the number of lines per entry

        Parameters
        ----------
        chunk : np.ndarray
            Raw buffer with data that might end with incomplete entry

        Returns
        -------
        'OneLineBuffer'
            Buffer with complete entries

        """
        assert header_data is None
        chunk = EncodedArray(chunk, BaseEncoding)
        new_lines = np.flatnonzero(chunk == NEWLINE)
        n_lines = new_lines.size
        if n_lines < cls.n_lines_per_entry:
            raise IncompleteEntryException("No complete entry in buffer. Try increasing chunk_size.")
        new_lines = new_lines[: n_lines - (n_lines % cls.n_lines_per_entry)]
        data = chunk[: new_lines[-1] + 1]
        cls._validate(data, new_lines)
        return cls(cls._get_buffer_extractor(data, new_lines))

    def get_data(self) -> bnpdataclass:
        """Get and parse fields from each line"""
        headers, sequences = [self._buffer_extractor.get_field_by_number(i) for i in (0, 1)]
        return SequenceEntry(headers, sequences)

    def get_field_by_number(self, i: int, t: type = None):
        """Get a field indexed by number"""

        text = self._buffer_extractor.get_field_by_number(i)
        if t is not None:
            return self._get_parser(t)(text)
        return text

    def __getitem__(self, idx):
        return self.__class__(self._buffer_extractor[idx])

    def count_entries(self) -> int:
        """Count number of entries in file"""
        return len(self._buffer_extractor)

    def get_field_range_as_text(self, start, end):
        """Get a range of fields as text"""
        assert end == start+1
        return self.get_text_field_by_number(start)

    @classmethod
    def from_data(cls, entries: bnpdataclass) -> "OneLineBuffer":
        """Convert the data from the entries into a buffer that can be written to file

        Extract each field and make a buffer with alternating lines

        Parameters
        ----------
        entries : bnpdataclass
            The entries to be written to the buffer


        Returns
        -------
        "OneLineBuffer"
            An ASCII-encoded buffer
        """

        data_dict = [(field.type, getattr(entries, field.name)) for field in dataclasses.fields(entries)]
        columns = [get_column(value, field_type) for field_type, value in data_dict]
        return cls.join_fields(columns)

    @classmethod
    def join_fields(cls, fields: List[EncodedRaggedArray]):
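        # Each entry is written as alternating lines, one per field: every line gets one extra
        # byte for the trailing newline plus room for its prefix character (see _line_offsets).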
        field_lengths = np.hstack([field.shape[1][:, None] for field in fields])
        line_lengths = field_lengths+1
        for i in range(len(fields)):
            line_lengths[:, i] += cls._line_offsets[i]
        entry_lengths = line_lengths.sum(axis=-1)
        buffer_size = entry_lengths.sum()
        buf = EncodedArray(np.empty(buffer_size, dtype=np.uint8), BaseEncoding)
        lines = EncodedRaggedArray(buf, line_lengths.ravel())
        step = cls.n_lines_per_entry
        for i, field in enumerate(fields):
            lines[i::step, cls._line_offsets[i]:-1] = field
        lines[0::step, 0] = cls.HEADER
        lines[:, -1] = "\n"
        return buf

    @classmethod
    def _validate(cls, data, new_lines):
        header = cls.HEADER
        if data.size == 0 and new_lines.size == 0:
            return
        n_lines = new_lines.size
        n_lines_per_entry = cls.n_lines_per_entry
        assert n_lines % n_lines_per_entry == 0, "Wrong number of lines in buffer"
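        # Index of the first character of every entry after the first one,
        # i.e. the position right after each entry-terminating newline.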
        header_idxs = new_lines[n_lines_per_entry - 1: -1: n_lines_per_entry] + 1

        if np.any(data[header_idxs] != header) or data[0] != header:
            if data[0] != header:
                line_number = 0
            else:
                line_number = (np.flatnonzero(data[header_idxs] != header)[0] + 1) * n_lines_per_entry
            raise FormatException(f"Expected header line to start with {header}" % data, line_number=line_number)

    def get_text_field_by_number(self, i):
        return self.get_field_by_number(i)

    @classmethod
    def _modify_for_carriage_return(cls, field_ends, data):
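        # Shift line ends one character back where lines end with "\r\n", so a trailing
        # carriage return is never included in a field.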
        if field_ends.size == 0 or field_ends[0, 0] < 1:
            return field_ends
        last_chars = data[field_ends[:cls.n_lines_per_entry, 0] - 1]
        if not np.any(last_chars == "\r"):
            return field_ends
        return field_ends - (data[field_ends-1] == '\r')


class TwoLineFastaBuffer(OneLineBuffer):
    HEADER = ">"  # ASCII 62
    n_lines_per_entry = 2
    dataclass = SequenceEntry
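
# Illustrative usage sketch (an assumption based on the parsing logic above, not part of
# the original module and not verified output):
#
#     raw = np.frombuffer(b">seq1\nACGT\n>seq2\nGGTA\n", dtype=np.uint8)
#     buf = TwoLineFastaBuffer.from_raw_buffer(raw)
#     buf.count_entries()        # number of complete entries in the chunk
#     entries = buf.get_data()   # SequenceEntry with name and sequence fields
#     text = TwoLineFastaBuffer.from_data(entries)   # back to raw ">name\nsequence\n" lines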