Source code for bionumpy.io.delimited_buffers

import io
import logging
import dataclasses
from typing import List, Optional
from npstructures import RaggedArray, RaggedShape
from ..bnpdataclass import bnpdataclass, BNPDataClass
from ..bnpdataclass.lazybnpdataclass import LazyBNPDataClass
from ..datatypes import (Interval, SequenceEntry, Bed12, Bed6, BedGraph,
                         GTFEntry, GFFEntry, ChromosomeSize, NarrowPeak, GfaPath)
from ..encoded_array import EncodedArray, EncodedRaggedArray
from ..encoded_array import as_encoded_array
from ..encodings import Encoding
from ..encodings.exceptions import EncodingError
from ..encodings.alphabet_encoding import DigitEncoding
from ..encoded_array import BaseEncoding
from ..string_array import as_string_array
from ..typing import SequenceID
from ..util import is_subclass_or_instance
from .file_buffers import FileBuffer, NEWLINE, TextBufferExtractor, TextThroughputExtractor
from .strops import (
    split, str_to_int, str_to_float, str_to_int_with_missing, str_to_float_with_missing)
from .dump_csv import dump_csv, join_columns
from .exceptions import FormatException
import numpy as np
from ..bnpdataclass.bnpdataclass import make_dataclass
from ..bnpdataclass.lazybnpdataclass import create_lazy_class


class DelimitedBuffer(FileBuffer):
    """Base class for file buffers for delimited files such as csv or tsv.

    Each line should correspond to an entry, and each column to a variable.
    Provides convenience methods for extracting and decoding integers from
    columns, and text from columns into sequence objects.
    """

    DELIMITER = "\t"
    COMMENT = "#"
    HAS_UNCOMMENTED_HEADER_LINE = False
    n_lines_per_entry = 1

    def __init__(self, buffer_extractor: TextBufferExtractor, header_data=None):
        # NOTE(review): a dead `__init___` (three trailing underscores, never
        # callable as a constructor) and a `__buffer_extractor` property whose
        # getter recursed into itself were removed; this is the only
        # constructor in use.
        self._buffer_extractor = buffer_extractor
        self._header_data = header_data
        self._is_validated = True

    def concatenate(self, buffers):
        """Return a new buffer whose extractor is the concatenation of `buffers`."""
        return self.__class__(self._buffer_extractor.concatenate([b._buffer_extractor for b in buffers]),
                              header_data=self._header_data)

    @classmethod
    def from_raw_buffer(cls, chunk: np.ndarray, header_data=None) -> "DelimitedBuffer":
        """Make EncodedArray of the chunk and extract all complete lines

        Also find all delimiters in the buffer

        Parameters
        ----------
        chunk : np.ndarray
            Raw bytes chunk as array
        header_data : list
            Any header data that was read in `read_header`

        Returns
        -------
        DelimitedBuffer
            DelimitedBuffer object of all complete lines

        Raises
        ------
        RuntimeError
            If the chunk contains no complete line.
        """
        chunk = EncodedArray(chunk, BaseEncoding)
        mask = chunk == NEWLINE
        mask |= chunk == cls.DELIMITER
        delimiters = np.flatnonzero(mask)
        # indices (into `delimiters`) of the delimiters that are newlines
        entry_ends = np.flatnonzero(chunk[delimiters] == '\n')
        if entry_ends.size == 0:
            # Bugfix: this used a bare `raise` with no active exception, which
            # produced an opaque "No active exception to re-raise" RuntimeError.
            logging.warning("Found no new lines. Chunk size may be too low. Try increasing")
            raise RuntimeError("Found no new lines in chunk. Chunk size may be too low; try increasing it")
        n_fields = cls._get_n_fields(entry_ends)
        size = delimiters[entry_ends[-1]] + 1
        # prepend -1 so field starts can later be computed as delimiter + 1
        delimiters = np.insert(delimiters[:entry_ends[-1] + 1], 0, -1)
        buffer_extractor = cls._get_buffer_extractor(
            chunk[:size], delimiters, n_fields)
        return cls(buffer_extractor, header_data)

    @classmethod
    def _get_n_fields(cls, entry_ends):
        # Fields per line == number of delimiters (incl. newline) on the first line.
        return entry_ends[0] + 1

    @classmethod
    def _get_buffer_extractor(cls, data, delimiters, n_cols) -> TextThroughputExtractor:
        """Build a field extractor (one row of starts/ends per entry) from delimiter positions."""
        starts = delimiters[:-1].reshape(-1, n_cols) + 1
        ends = delimiters[1:].reshape(-1, n_cols)
        ends = cls._modify_for_carriage_return(ends, data)
        entry_starts = starts[:, 0]
        entry_ends = ends[:, -1] + 1
        return TextThroughputExtractor(data, starts, field_ends=ends, entry_starts=entry_starts, entry_ends=entry_ends)

    @classmethod
    def _modify_for_carriage_return(cls, ends, data):
        # Trim a trailing '\r' (Windows line endings) from the last field of each
        # row; the first row is used as a cheap probe before copying.
        if data.size == 0 or ends[0, -1] == 0:
            return ends
        if data[ends[0, -1] - 1] == '\r':
            ends = ends.copy()
            ends[:, -1] -= data[ends[:, -1] - 1] == '\r'
        return ends

    def __getitem__(self, idx):
        """Subset the buffer's entries; header data is carried over."""
        return self.__class__(self._buffer_extractor[idx], self._header_data)

    @property
    def entries(self):
        # NOTE(review): legacy accessor — relies on `_data`/`_new_lines`, which
        # the current extractor-based constructor does not set; only usable on
        # subclasses/paths that still set those attributes.
        if not hasattr(self, "_entries"):
            lengths = np.diff(self._new_lines)
            lengths = np.insert(lengths, 0, self._new_lines[0] + 1)
            self._entries = EncodedRaggedArray(self._data, RaggedShape(lengths))
        return self._entries

    def get_text(self, col, fixed_length=True, keep_sep=False):
        """Extract the text of a single column.

        Parameters
        ----------
        col : int
            column index
        fixed_length : bool
            must be False; the historical default is kept for interface
            compatibility, but only variable-length extraction is supported
        keep_sep : bool
            must be False; keeping the separator is not supported here

        Notes
        -----
        The docstring previously sat unreachable after the `return`; the
        behavior (assert, then extract the field) is unchanged.
        """
        assert not fixed_length and not keep_sep
        return self._buffer_extractor.get_field_by_number(col)

    @classmethod
    def join_fields(cls, fields_list: List[EncodedRaggedArray]):
        """Join per-column text fields into delimited lines and flatten them."""
        return join_columns(fields_list, cls.DELIMITER).ravel()

    def get_field_range_as_text(self, *args, **kwargs):
        """Alias for `get_column_range_as_text`."""
        return self.get_column_range_as_text(*args, **kwargs)

    def get_column_range_as_text(self, col_start, col_end, keep_sep=False):
        """Get multiple columns as text

        Parameters
        ----------
        col_start : int
            column index to start at
        col_end : int
            column index to end at (only ``col_start + 1`` is supported)
        keep_sep : bool
            keep separator at end (currently ignored)
        """
        self.validate_if_not()
        assert col_end == col_start + 1
        return self._buffer_extractor.get_field_by_number(col_start)

    @staticmethod
    def _move_ints_to_digit_array(ints, n_digits):
        """Convert integers to a fixed-width (n_digits) digit-encoded 2d array."""
        powers = np.uint8(10) ** np.arange(n_digits)[::-1]
        ret = (ints[..., None] // powers) % 10
        return EncodedArray(ret, DigitEncoding)

    def _validate(self):
        # NOTE(review): legacy validation — relies on `_data`/`_delimiters`,
        # which the current extractor-based constructor does not set.
        chunk = self._data
        delimiters = self._delimiters[1:]
        n_delimiters_per_line = (
                next(i for i, d in enumerate(delimiters) if chunk[d] == NEWLINE) + 1
        )
        self._n_cols = n_delimiters_per_line
        should_be_new_lines = chunk[delimiters[n_delimiters_per_line - 1::n_delimiters_per_line]]
        if delimiters.size % n_delimiters_per_line != 0 or np.any(should_be_new_lines != "\n"):
            offending_line = np.flatnonzero(should_be_new_lines != "\n")[0]
            lines = split(self._data, '\n')
            raise FormatException(
                f"Irregular number of delimiters per line ({delimiters.size}, {n_delimiters_per_line}): {lines}",
                line_number=offending_line)
        self._validated = True

    @classmethod
    def from_data(cls, data: BNPDataClass) -> "DelimitedBuffer":
        """Put each field of the dataclass into a column in a buffer.

        Parameters
        ----------
        data : BNPDataClass
            Data to serialize; lazy objects are materialized first.
        """
        if isinstance(data, LazyBNPDataClass):
            return cls.from_data(data.get_data_object())
        data_dict = [(field.type, getattr(data, field.name)) for field in dataclasses.fields(data)]
        return dump_csv(data_dict, cls.DELIMITER)

    @classmethod
    def make_header(cls, data: bnpdataclass):
        """Return the header stored in the data's context (empty if none) as ascii bytes."""
        header = ""
        if data.has_context("header"):
            header = data.get_context("header")
        return bytes(header, "ascii")

    def get_data(self) -> BNPDataClass:
        """Parse the data in the buffer according to the fields in the dataclass

        Returns
        -------
        BNPDataClass
            Dataclass with parsed data
        """
        self.validate_if_not()
        columns = {}
        fields = dataclasses.fields(self.actual_dataclass)
        for col_number, field in enumerate(fields):
            columns[field.name] = self._get_field_by_number(col_number, field.type)
        # Bugfix: this previously iterated the dict itself (i.e. the field
        # *names*), so `n_entries` was the length of a name string and missing
        # (None) columns were never replaced. Iterate the parsed values.
        n_entries = len(next(col for col in columns.values() if col is not None))
        columns = {name: value if value is not None else np.empty((n_entries, 0))
                   for name, value in columns.items()}
        data = self.actual_dataclass(**columns)
        data.set_context("header", self._header_data)
        return data

    def _get_field_by_number(self, col_number, field_type):
        """Extract column `col_number` and parse it as `field_type` (None -> skip)."""
        if field_type is None:
            return None
        self.validate_if_not()
        if field_type == int:
            subresult = self._buffer_extractor.get_digit_array(col_number)
            text = subresult[0]
        elif field_type == SequenceID:
            subresult = self._buffer_extractor.get_padded_field(col_number)
            text = subresult
        else:
            # list-valued columns need the trailing separator kept for splitting
            subresult: EncodedRaggedArray = self._buffer_extractor.get_field_by_number(
                col_number,
                keep_sep=(field_type == List[int] or field_type == List[float]))
            text = subresult
        assert isinstance(text, (EncodedRaggedArray, EncodedArray)), text
        parser = self._get_parser(field_type)
        if parser is None:
            assert False, (self.__class__, field_type)
        try:
            parsed = parser(subresult)
            assert len(parsed) == len(text)
        except EncodingError as e:
            # Translate the character offset of the bad byte into a line number
            # so the FormatException points at the offending entry.
            if isinstance(text, EncodedArray):
                row_number = e.offset // text.shape[1]
            else:
                row_number = np.searchsorted(np.cumsum(text.lengths), e.offset, side="right")
            raise FormatException(e.args[0], line_number=row_number)
        return parsed

    @property
    def actual_dataclass(self):
        # Subclasses may parse into a permuted dataclass; by default they agree.
        return self.dataclass

    def get_field_by_number(self, field_nr: int, field_type: type = object):
        """Extract and parse field number `field_nr` from each entry."""
        self.validate_if_not()
        if field_type is None:
            # Bugfix: previously assigned the dataclasses.Field object itself
            # rather than its declared type.
            field_type = dataclasses.fields(self.actual_dataclass)[field_nr].type
        return self._get_field_by_number(
            field_nr, field_type)

    def _parse_split_floats(self, text, sep=','):
        """Parse a column of `sep`-separated floats into a RaggedArray."""
        return self._parse_split_fields(text, str_to_float, sep)

    def _parse_split_ints(self, text, sep=','):
        """Parse a column of `sep`-separated ints into a RaggedArray."""
        return self._parse_split_fields(text, str_to_int, sep)

    def _parse_split_fields(self, text, function, sep):
        """Split each row of `text` on `sep` and parse the pieces with `function`."""
        if len(sep):
            try:
                # Overwrite the kept trailing separator so every value is
                # terminated by `sep`; fall back to a copy if `text` is read-only.
                text[:, -1] = sep
            except ValueError:
                text = text.copy()
                text[:, -1] = sep
            int_strings = split(text.ravel()[:-1], sep=sep)

            if np.any(int_strings.lengths == 0):
                mask = int_strings.lengths != 0
                # TODO: is it necessary with unsafe mode here
                return RaggedArray(function(int_strings[mask]), (text == sep).sum(axis=-1), safe_mode=False)
            return RaggedArray(function(int_strings), (text == sep).sum(axis=-1))
        else:
            # Empty separator: interpret every character as a single digit.
            mask = as_encoded_array(text.ravel(), DigitEncoding).raw()
            return RaggedArray(mask, text.shape)

    def count_entries(self) -> int:
        """Count the number of entries in the buffer"""
        return len(self._buffer_extractor)

    @property
    def n_lines(self):
        # One entry per line for plain delimited formats.
        return len(self._buffer_extractor)


class GfaSequenceBuffer(DelimitedBuffer):
    """Buffer for the sequence ('S') lines of a GFA file."""

    dataclass = SequenceEntry

    def get_data(self):
        # Column 0 holds the 'S' record tag; name and sequence follow it.
        names = self.get_text(1, fixed_length=False)
        sequences = self.get_text(col=2, fixed_length=False)
        return SequenceEntry(names, sequences)

    def get_field_by_number(self, field_nr: int, field_type: type = object):
        # Shift by one so callers address fields relative to the dataclass,
        # skipping the leading 'S' tag column.
        return super().get_field_by_number(field_nr + 1, field_type)

    @classmethod
    def from_data(cls, data: SequenceEntry) -> EncodedArray:
        # Prepend an 'S' tag column when serializing back to GFA.
        tag_column = as_encoded_array(["S"] * len(data))
        return dump_csv([(str, tag_column),
                         (str, data.name),
                         (str, data.sequence)])


class GfaPathBuffer(DelimitedBuffer):
    """Buffer for the path ('P') lines of a GFA file, parsed into GfaPath entries."""

    def get_data(self):
        name = self.get_text(1, fixed_length=False)
        # Bugfix: the base-class `get_text` asserts `not keep_sep`, so calling
        # it with keep_sep=True (as this method previously did) raised an
        # AssertionError. Fetch the field with its trailing separator directly
        # from the extractor, the same way `_get_field_by_number` does for
        # list-valued columns.
        nodes_lists = self._buffer_extractor.get_field_by_number(2, keep_sep=True)
        # Terminate every node token with ',' so each row splits cleanly.
        nodes_lists[:, -1] = ","
        lengths = np.sum(nodes_lists == ",", axis=-1)
        all_node_texts = split(nodes_lists.ravel()[:-1], ",")
        # Each token is "<id><+|->": the last character is the orientation.
        int_text = all_node_texts[:, :-1]
        node_ids = str_to_int(int_text)
        directions = np.where(all_node_texts[:, -1] == "+", 1, -1)
        node_ids = RaggedArray(node_ids, lengths)
        directions = RaggedArray(directions, lengths)
        return GfaPath(name, node_ids, directions)


def get_bufferclass_for_datatype(_dataclass: bnpdataclass, delimiter: str = "\t", has_header: bool = False,
                                 comment: str = "#",
                                 sub_delimiter=",") -> type:
    """Create a FileBuffer class that can read a delimited file with the fields specified in `_dataclass`

    This can be used to create a parser for a custom delimited file format and also more generic csv
    reading. The order of the fields in the `_dataclass` is used as the order of the columns in the delimited
    file, unless `has_header=True`, in which case the name of the field corresponds to the name in the header.

    Parameters
    ----------
    _dataclass : bnpdataclass
        The dataclass used as template for the DelimitedBuffer
    delimiter : str
        The character used to separate the columns
    has_header : bool
        Whether a header line should be used to match the dataclass fields to columns
    comment : str
        The character used to specify comment/unused lines in the file
    sub_delimiter : str
        The character used to separate values inside list-valued columns

    Returns
    -------
    type
        A `DelimitedBuffer` subclass tailored to `_dataclass`
    """

    class DatatypeBuffer(DelimitedBuffer):
        DELIMITER = delimiter
        COMMENT = comment
        HAS_UNCOMMENTED_HEADER_LINE = has_header
        dataclass = _dataclass
        # Bugfix: without this default, `get_data` raised AttributeError on
        # `self._actual_dataclass` whenever `modify_class_with_header_data`
        # had not been called (i.e. the no-header path).
        _actual_dataclass = _dataclass

        @classmethod
        def modify_class_with_header_data(cls, columns):
            """Return a subclass whose parse dataclass matches the header `columns`.

            Columns not declared on the original dataclass are parsed as str.
            """
            fields = dataclasses.fields(cls.dataclass)
            type_dict = {field.name: field.type for field in fields}
            new_fields = [(name, type_dict[name]) if name in type_dict else (name, str) for name in columns]
            tmp = make_dataclass(
                new_fields, cls.dataclass.__name__ + 'Permuted')
            assert [f.name for f in dataclasses.fields(tmp)] == columns, (
                columns, [f.name for f in dataclasses.fields(tmp)])

            class NewClass(cls):
                _actual_dataclass = cls.dataclass
                dataclass = tmp
                lazy_class = create_lazy_class(tmp)

            return NewClass

        def get_data(self) -> BNPDataClass:
            # Cast back from the (possibly header-permuted) parse dataclass to
            # the dataclass the caller asked for.
            return super().get_data().astype(self._actual_dataclass)

        @classmethod
        def read_header(cls, file_object: io.FileIO) -> List[str]:
            """Read the column names from the header if `has_header=True`

            Parameters
            ----------
            file_object : io.FileIO

            Returns
            -------
            List[str]
                Column names, or None when the format has no header
            """
            super().read_header(file_object)
            if not has_header:
                return None
            delimiter = cls.DELIMITER
            if not isinstance(delimiter, str):
                delimiter = chr(delimiter)
            return file_object.readline().decode('ascii').strip().split(delimiter)

        @classmethod
        def make_header(cls, data: bnpdataclass):
            """Make a header from field names separated by the delimiter."""
            return bytes(cls.DELIMITER.join([field.name for field in dataclasses.fields(data)]) + "\n", 'ascii')

    DatatypeBuffer.__name__ = _dataclass.__name__ + "Buffer"
    DatatypeBuffer.__qualname__ = _dataclass.__qualname__ + "Buffer"
    return DatatypeBuffer


class BedBuffer(DelimitedBuffer):
    """Buffer for BED interval files."""

    dataclass = Interval

    def __get_integers(self, cols: list) -> np.ndarray:
        """Get integers from integer string

        Extract integers from the specified columns. This is maybe a quicker
        way to parse ints than the default.

        Parameters
        ----------
        cols : list
            list of columns containing integers
        """
        assert np.all(cols < self._n_cols), (str(self._data), cols, self._n_cols)
        cols = np.asanyarray(cols)
        assert cols.size == 1
        integer_starts = self._col_starts(cols)
        integer_ends = self._col_ends(cols)
        array = self._move_intervals_to_2d_array(integer_starts, integer_ends, fill_value='0')
        try:
            digits = as_encoded_array(array, DigitEncoding).raw()
        except EncodingError as e:
            # Map the offset of the bad character to its row in the 2d array.
            row_number = e.offset // array.shape[-1]
            raise FormatException(e.args[0], line_number=row_number)
        powers = 10 ** np.arange(digits.shape[-1])[::-1]
        return digits.dot(powers).reshape(-1, cols.size)
class Bed6Buffer(BedBuffer):
    dataclass = Bed6


class Bed12Buffer(Bed6Buffer):
    dataclass = Bed12


class BdgBuffer(BedBuffer):
    dataclass = BedGraph


class NarrowPeakBuffer(DelimitedBuffer):
    dataclass = NarrowPeak


class GTFBuffer(DelimitedBuffer):
    dataclass = GTFEntry


class ChromosomeSizeBuffer(DelimitedBuffer):
    dataclass = ChromosomeSize


class DelimitedBufferWithInernalComments(DelimitedBuffer):
    """Delimited buffer that tolerates comment lines interleaved with data.

    NOTE(review): the misspelled class name ("Inernal") is kept as-is for
    backward compatibility with existing imports.
    """

    def __init__(self, buffer_extractor: TextBufferExtractor, header_data=None):
        self._buffer_extractor = buffer_extractor
        self._header_data = header_data
        self._is_validated = True

    @classmethod
    def _calculate_col_starts_and_ends(cls, data, delimiters):
        # A newline delimiter immediately followed by the comment character
        # opens a comment line; drop those delimiters from the field grid.
        comment_positions = np.flatnonzero(
            (data[delimiters[:-1]] == '\n') & (data[delimiters[:-1] + 1] == cls.COMMENT))
        start_delimiters = np.delete(delimiters, comment_positions)[:-1]
        end_delimiters = np.delete(delimiters, comment_positions + 1)
        if data[0] != cls.COMMENT:
            # Virtual delimiter before the first field of the buffer.
            start_delimiters = np.insert(start_delimiters, 0, -1)
        else:
            end_delimiters = end_delimiters[1:]
        return start_delimiters + 1, end_delimiters

    def _col_starts(self, col):
        return self._wig_col_starts[:, col]

    def _col_ends(self, col):
        return self._wig_col_ends[:, col]

    def _validate(self):
        # Validation is handled during extraction for this format.
        self._validated = True

    @classmethod
    def _get_buffer_extractor(cls, data, new_lines) -> TextBufferExtractor:
        delimiters_mask = (data == cls.DELIMITER)
        delimiters_mask[new_lines] = True
        delimiters = np.append(np.flatnonzero(delimiters_mask), data.size - 1)
        starts, ends = cls._calculate_col_starts_and_ends(data, delimiters)
        # Fields per entry: position of the first newline among the field ends.
        n_fields = next(i for i, d in enumerate(ends) if data[d] == '\n') + 1
        return TextBufferExtractor(data, starts.reshape(-1, n_fields), ends.reshape(-1, n_fields))

    @classmethod
    def from_raw_buffer(cls, chunk: np.ndarray, header_data=None) -> "DelimitedBuffer":
        """Make EncodedArray of the chunk and extract all complete lines

        Also find all delimiters in the buffer

        Parameters
        ----------
        chunk : np.ndarray
            Raw bytes chunk as array
        header_data : list
            Any header data that was read in `read_header`

        Returns
        -------
        DelimitedBuffer
            DelimitedBuffer object of all complete lines
        """
        chunk = EncodedArray(chunk, BaseEncoding)
        new_lines = np.flatnonzero(chunk == '\n')
        extractor = cls._get_buffer_extractor(
            chunk[:new_lines[-1] + 1], new_lines[:-1])
        return cls(extractor, header_data)

    def __get_integers(self, cols: list) -> np.ndarray:
        """Get integers from integer string

        Extract integers from the specified columns

        Parameters
        ----------
        cols : list
            list of columns containing integers
        """
        assert np.all(cols < self._n_cols), (str(self._data), cols, self._n_cols)
        cols = np.asanyarray(cols)
        integer_starts = self._col_starts(cols)
        integer_ends = self._col_ends(cols)
        array = self._move_intervals_to_2d_array(integer_starts, integer_ends, fill_value='0')
        try:
            digits = as_encoded_array(array, DigitEncoding).raw()
        except EncodingError as e:
            # Map the offset of the bad character to its row in the 2d array.
            row_number = e.offset // array.shape[-1]
            raise FormatException(e.args[0], line_number=row_number)
        powers = 10 ** np.arange(digits.shape[-1])[::-1]
        return digits.dot(powers).reshape(-1, cols.size)


class GFFBuffer(DelimitedBufferWithInernalComments):
    dataclass = GFFEntry