# Source code for bionumpy.io.files

from pathlib import PurePath
from typing import Union


from .gzip_reading import gzip
# import gzip
import dataclasses
from .file_buffers import FileBuffer
from .fastq_buffer import FastQBuffer
from .multiline_buffer import MultiLineFastaBuffer
from .bam import BamBuffer, BamIntervalBuffer
from .delimited_buffers import (BedBuffer, GfaSequenceBuffer,
                                GFFBuffer, ChromosomeSizeBuffer,
                                NarrowPeakBuffer, BdgBuffer, GTFBuffer)
from .buffers.sam import SAMBuffer
from .vcf_buffers import VCFBuffer
from .pairs import PairsBuffer
from .wig import WigBuffer
from .parser import NumpyFileReader, NpBufferedWriter, NumpyBamWriter
from .exceptions import FormatException
from ..streams import NpDataclassStream
from ..bnpdataclass import BNPDataClass, bnpdataclass
from .npdataclassreader import NpDataclassReader
import logging

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Default buffer class for every file suffix this module recognises.
# `_get_buffer_type` looks suffixes up here and lists the keys, in this
# order, in its error message, so the insertion order is kept as-is.
buffer_types = {
    ".vcf": VCFBuffer,
    ".bed": BedBuffer,
    ".bdg": BdgBuffer,
    ".narrowPeak": NarrowPeakBuffer,
    # Both spellings of the FASTA suffix share one buffer class.
    ".fasta": MultiLineFastaBuffer,
    ".fa": MultiLineFastaBuffer,
    # Both spellings of the FASTQ suffix share one buffer class.
    ".fastq": FastQBuffer,
    ".fq": FastQBuffer,
    ".gfa": GfaSequenceBuffer,
    # .gff and .gff3 share a buffer class; .gtf has its own.
    ".gff": GFFBuffer,
    ".gtf": GTFBuffer,
    ".gff3": GFFBuffer,
    ".sam": SAMBuffer,
    ".bam": BamBuffer,
    ".sizes": ChromosomeSizeBuffer,
    ".wig": WigBuffer,
    # Both pairs suffixes share one buffer class.
    ".pairs": PairsBuffer,
    ".pa5": PairsBuffer,
}


def _get_buffered_file(
    filename, suffix, mode, is_gzip=False, buffer_type=None, **kwargs
):
    """Create the writer or reader object for `filename`.

    Parameters
    ----------
    filename : str
        Name of the file to open
    suffix : str
        File-format suffix (e.g. ".bed"), used to pick the default buffer
        type and to select the BAM writer
    mode : str
        "w", "write" or "wb" opens a writer that overwrites; "a", "append"
        or "ab" opens a writer that appends; any other value (including
        None) opens a reader
    is_gzip : bool
        If True the file is opened with `gzip.open` instead of `open`
    buffer_type : FileBuffer
        Buffer class to use; looked up from `suffix` when None
    **kwargs
        Only the `lazy` entry is used; it is forwarded to the reader

    Returns
    -------
    NpBufferedWriter, NumpyBamWriter or NpDataclassReader
        A writer for the write/append modes, otherwise a reader
    """
    opener = gzip.open if is_gzip else open
    if buffer_type is None:
        buffer_type = _get_buffer_type(suffix)

    # Translate the accepted mode spellings into the binary mode for the opener.
    if mode in ("w", "write", "wb"):
        binary_mode = "wb"
    elif mode in ('a', 'append', 'ab'):
        binary_mode = "ab"
    else:
        binary_mode = None

    if binary_mode is not None:
        # BAM output has a dedicated writer class; all other formats share one.
        make_writer = NumpyBamWriter if suffix == ".bam" else NpBufferedWriter
        return make_writer(opener(filename, binary_mode), buffer_type)

    # Every remaining mode value means reading.
    raw_reader = NumpyFileReader(opener(filename, "rb"), buffer_type)
    if is_gzip:
        # NOTE(review): prepend mode is only enabled for gzip-opened input;
        # its exact effect is defined by NumpyFileReader.
        raw_reader.set_prepend_mode()
    return NpDataclassReader(raw_reader, lazy=kwargs.get('lazy'))


def _get_buffer_type(suffix):
    """Return the default buffer class registered for a file suffix.

    Parameters
    ----------
    suffix : str
        File suffix including the leading dot, e.g. ".fq"

    Returns
    -------
    FileBuffer
        The class stored in `buffer_types` for `suffix`

    Raises
    ------
    RuntimeError
        If `suffix` has no entry in `buffer_types`
    """
    try:
        return buffer_types[suffix]
    except KeyError:
        supported = ", ".join(repr(key) for key in buffer_types)
        # Adjacent string literals are joined without a separator, so the
        # space after "or" has to be written explicitly.
        raise RuntimeError(
            f"File format {suffix} does not have a default buffer type. "
            f"Specify buffer_type argument using get_bufferclass_for_datatype function or "
            f"use one of {supported}"
        ) from None


def bnp_open(filename: str, mode: str = None, buffer_type=None, lazy=None) -> Union[NpDataclassReader, NpBufferedWriter]:
    """Open a file according to its suffix

    Open a `NpDataclassReader` file object, that can be used to read the file,
    either in chunks or completely. Files read in chunks can be used together with
    the `@bnp.streamable` decorator to call a function on all chunks in the file
    and optionally reduce the results.

    If `mode="w"` it opens a writer object.

    The file format is taken from the last suffix of `filename`. A trailing
    ".gz" marks the file as gzip-compressed, and the suffix before it gives
    the format. ".bam" files are also opened through `gzip.open`. A name
    without a format suffix is accepted as long as `buffer_type` is given.

    Parameters
    ----------
    filename : str
        Name of the file to open
    mode : str
        Either "w" or "r"
    buffer_type : FileBuffer
        A `FileBuffer` class to specify how the data in the file should be interpreted
    lazy : bool
        If True, the data will be read lazily, i. e. only when it is accessed.
        This is useful to speed up reading of large files, but it is more
        memory demanding

    Returns
    -------
    NpDataclassReader
        A file reader object

    Raises
    ------
    RuntimeError
        If `buffer_type` is None and the suffix has no default buffer type

    Examples
    --------
    >>> import bionumpy as bnp
    >>> all_data = bnp.open("example_data/big.fq.gz").read()
    >>> print(all_data)
    SequenceEntryWithQuality with 1000 entries
    name                     sequence                 quality
    2fa9ee19-5c51-4281-a...  CGGTAGCCAGCTGCGTTCAG...  [10 5 5 12 5 4 3
    1f9ca490-2f25-484a-8...  GATGCATACTTCGTTCGATT...  [ 5 4 5 4 6 6 5
    06936a64-6c08-40e9-8...  GTTTTGTCGCTGCGTTCAGT...  [ 3 5 6 7 7 5 4
    d6a555a1-d8dd-4e55-9...  CGTATGCTTTGAGATTCATT...  [ 2 3 4 4 4 4 6
    91ca9c6c-12fe-4255-8...  CGGTGTACTTCGTTCCAGCT...  [ 4 3 5 6 3 5 6
    4dbe5037-abe2-4176-8...  GCAGGTGATGCTTTGGTTCA...  [ 2 3 4 6 7 7 6
    df3de4e9-48ca-45fc-8...  CATGCTTCGTTGGTTACCTC...  [ 5 5 5 4 7 7 7
    bfde9b59-2f6d-48e8-8...  CTGTTGTGCGCTTCGTTCAT...  [ 8 8 10 7 8 6 3
    dbcfd59a-7a96-46a2-9...  CGATTATTTGGTTCGTTCAT...  [ 5 4 2 3 5 2 2
    a0f83c4e-4c20-4c15-b...  GTTGTACTTTACGTTTCAAT...  [ 3 5 10 6 7 6 6
    >>> first_chunk = bnp.open("example_data/big.fq.gz").read_chunk(300000)
    >>> print(first_chunk)
    SequenceEntryWithQuality with 511 entries
    name                     sequence                 quality
    2fa9ee19-5c51-4281-a...  CGGTAGCCAGCTGCGTTCAG...  [10 5 5 12 5 4 3
    1f9ca490-2f25-484a-8...  GATGCATACTTCGTTCGATT...  [ 5 4 5 4 6 6 5
    06936a64-6c08-40e9-8...  GTTTTGTCGCTGCGTTCAGT...  [ 3 5 6 7 7 5 4
    d6a555a1-d8dd-4e55-9...  CGTATGCTTTGAGATTCATT...  [ 2 3 4 4 4 4 6
    91ca9c6c-12fe-4255-8...  CGGTGTACTTCGTTCCAGCT...  [ 4 3 5 6 3 5 6
    4dbe5037-abe2-4176-8...  GCAGGTGATGCTTTGGTTCA...  [ 2 3 4 6 7 7 6
    df3de4e9-48ca-45fc-8...  CATGCTTCGTTGGTTACCTC...  [ 5 5 5 4 7 7 7
    bfde9b59-2f6d-48e8-8...  CTGTTGTGCGCTTCGTTCAT...  [ 8 8 10 7 8 6 3
    dbcfd59a-7a96-46a2-9...  CGATTATTTGGTTCGTTCAT...  [ 5 4 2 3 5 2 2
    a0f83c4e-4c20-4c15-b...  GTTGTACTTTACGTTTCAAT...  [ 3 5 10 6 7 6 6
    >>> all_chunks = bnp.open("example_data/big.fq.gz").read_chunks(300000)
    >>> for chunk in all_chunks:
    ...     print(chunk)
    ...
    SequenceEntryWithQuality with 511 entries
    name                     sequence                 quality
    2fa9ee19-5c51-4281-a...  CGGTAGCCAGCTGCGTTCAG...  [10 5 5 12 5 4 3
    1f9ca490-2f25-484a-8...  GATGCATACTTCGTTCGATT...  [ 5 4 5 4 6 6 5
    06936a64-6c08-40e9-8...  GTTTTGTCGCTGCGTTCAGT...  [ 3 5 6 7 7 5 4
    d6a555a1-d8dd-4e55-9...  CGTATGCTTTGAGATTCATT...  [ 2 3 4 4 4 4 6
    91ca9c6c-12fe-4255-8...  CGGTGTACTTCGTTCCAGCT...  [ 4 3 5 6 3 5 6
    4dbe5037-abe2-4176-8...  GCAGGTGATGCTTTGGTTCA...  [ 2 3 4 6 7 7 6
    df3de4e9-48ca-45fc-8...  CATGCTTCGTTGGTTACCTC...  [ 5 5 5 4 7 7 7
    bfde9b59-2f6d-48e8-8...  CTGTTGTGCGCTTCGTTCAT...  [ 8 8 10 7 8 6 3
    dbcfd59a-7a96-46a2-9...  CGATTATTTGGTTCGTTCAT...  [ 5 4 2 3 5 2 2
    a0f83c4e-4c20-4c15-b...  GTTGTACTTTACGTTTCAAT...  [ 3 5 10 6 7 6 6
    SequenceEntryWithQuality with 489 entries
    name                     sequence                 quality
    5f27fb90-2cb0-43d0-a...  CGTTGCTGATTCAGCATCAA...  [ 5 3 2 3 2 2 4
    e23294d9-0079-4345-a...  CGAGCCGCTTCGTTCCGGTT...  [ 4 5 3 3 3 4 3
    56736851-ccc9-41a6-9...  CGGTGCCTTCGTTCATTTCT...  [ 8 3 7 7 3 1 2
    f156362d-d380-480d-8...  CTGTTGCGCCCCGGAACAGT...  [ 7 11 9 4 4 4 3
    300f89ef-608a-463f-8...  CATACTTTGGTTCATTCTGT...  [ 3 2 4 4 4 4 5
    755b1702-4560-4c04-a...  GGTATACTTGCCCTACGTTC...  [10 9 13 6 3 3 4
    98de4f6b-d094-41e8-9...  GTTGTACTTCGTTCAGTTTC...  [ 4 5 6 4 7 6 6
    00ac3f41-f735-49e5-9...  GTTGTACTTCGTTCAGCTCT...  [ 3 4 5 4 4 10 12 1
    f92d30bc-f77f-401e-9...  GTTGTACTGCTTCGTTCAGT...  [ 6 3 4 3 6 3 2
    7e2c14c0-0662-4cc3-8...  TGATACATTACTTCGTTCGA...  [ 3 8 4 7 2 4 3

    """
    suffixes = PurePath(filename).suffixes
    # A name without any suffix yields an empty list; fall back to "" so the
    # call still works when the caller supplies buffer_type explicitly.
    suffix = suffixes[-1] if suffixes else ""
    is_gzip = suffix in (".gz", ".bam")
    if suffix == ".gz":
        # The format suffix sits in front of ".gz" (e.g. "reads.fq.gz");
        # a bare "reads.gz" has none.
        suffix = suffixes[-2] if len(suffixes) > 1 else ""
    return _get_buffered_file(filename, suffix, mode, is_gzip=is_gzip, buffer_type=buffer_type, lazy=lazy)
def count_entries(filename: str, buffer_type: FileBuffer = None) -> int:
    """Count the number of entries in the file

    By default it uses the file suffix to imply the file format. But a specific
    `FileBuffer` can be provided.

    The file is read in chunks and the per-chunk counts are summed, and the
    file handle is closed before returning.

    Parameters
    ----------
    filename : str
        Name of the file to count the entries of
    buffer_type : FileBuffer
        A `FileBuffer` class to specify how the data in the file should be interpreted

    Returns
    -------
    int
        The number of entries in the file

    Raises
    ------
    RuntimeError
        If `buffer_type` is None and the suffix has no default buffer type

    Examples
    --------
    >>> from bionumpy.io.files import count_entries
    >>> count_entries("example_data/big.fq.gz")  # doctest: +SKIP
    1000

    """
    logger.info("Counting entries in %s", filename)
    suffixes = PurePath(filename).suffixes
    # A name without any suffix yields an empty list; fall back to "" so the
    # call still works when the caller supplies buffer_type explicitly.
    suffix = suffixes[-1] if suffixes else ""
    is_gzip = suffix in (".gz", ".bam")
    if suffix == ".gz":
        # The format suffix sits in front of ".gz"; a bare "x.gz" has none.
        suffix = suffixes[-2] if len(suffixes) > 1 else ""
    # Resolve the buffer type before opening so a failed lookup leaves no
    # file handle behind.
    if buffer_type is None:
        buffer_type = _get_buffer_type(suffix)
    open_func = gzip.open if is_gzip else open
    file_obj = open_func(filename, "rb")
    try:
        # BAM input goes through NumpyFileReader as well, matching the read
        # path in _get_buffered_file; no separate BAM reader class is
        # imported in this module.
        file_reader = NumpyFileReader(file_obj, buffer_type)
        if is_gzip:
            file_reader.set_prepend_mode()
        return sum(chunk.count_entries()
                   for chunk in file_reader.read_chunks(min_chunk_size=500000))
    finally:
        file_obj.close()
def read(filename: str, mode: str = None, buffer_type=None) -> BNPDataClass:
    """Open a file, read its whole content and close it again

    Parameters
    ----------
    filename : str
        Name of the file to read
    mode : str
        Passed on to `bnp_open`
    buffer_type : FileBuffer
        A `FileBuffer` class to specify how the data in the file should be interpreted

    Returns
    -------
    BNPDataClass
        Whatever `read()` on the opened file object returns; presumably a
        bnpdataclass instance holding all entries (confirm against
        `NpDataclassReader.read`)
    """
    # The with-block closes the file object once the content has been read.
    with bnp_open(filename, mode, buffer_type) as f:
        return f.read()