Benchmarking Examples
Below is the code for the examples used in the benchmarks. Each script can be run as a command-line program and also includes a test function showing how to use it from Python.
BAM Filtering
Read a BAM file and keep only reads with a MAPQ of 60 (the maximum score for most aligners), i.e. filter out lower-quality alignments:
import bionumpy as bnp

# `input` and `output` are expected to be provided by the surrounding
# workflow (e.g. Snakemake)
with bnp.open(output[0], 'w') as f:
    for chunk in bnp.open(input[0]).read_chunks():
        f.write(chunk[chunk.mapq == 60])
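The snippet above relies on `input` and `output` being injected by a workflow engine. A minimal standalone sketch following the command-line pattern of the other examples (the function name filter_bam and the mapq parameter are ours, not part of BioNumPy):

import sys
import bionumpy as bnp

def filter_bam(input_filename: str, output_filename: str, mapq: int = 60):
    # Stream the BAM file chunk by chunk and keep reads with the given MAPQ
    with bnp.open(output_filename, 'w') as f:
        for chunk in bnp.open(input_filename).read_chunks():
            f.write(chunk[chunk.mapq == mapq])

if __name__ == '__main__':
    filter_bam(sys.argv[1], sys.argv[2])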
Unique Intersection
Filter one bed file on whether the intervals overlap with another bed file:
import bionumpy as bnp

def unique_intersect(filename_a: str, filename_b: str, chrom_sizes_file: str, out_filename: str):
    # Load the genome
    genome = bnp.Genome.from_file(chrom_sizes_file, filter_function=None)
    # Load the intervals from the second file and turn them into a
    # genome-wide binary mask
    genome_mask = genome.read_intervals(filename_b).get_mask()
    with bnp.open(out_filename, 'w') as f:
        for intervals in bnp.open(filename_a).read_chunks():
            # Keep an interval if any of its bases overlap the mask
            mask = genome_mask[intervals].any(axis=-1)
            f.write(intervals[mask])
def test():
    out_filename = 'example_data/ctcf_intersect.bed.gz'
    unique_intersect('example_data/ctcf.bed.gz', 'example_data/znf263.bed.gz',
                     'example_data/hg38.chrom.sizes',
                     out_filename)
    assert bnp.count_entries(out_filename) == 3951

if __name__ == '__main__':
    import sys
    unique_intersect(*sys.argv[1:])
Jaccard Index
Calculate the Jaccard index between two or more bed files:
"""
Calculates the jaccard similarity between all pairs of bed files.
Writes the results to stdout.
"""
import itertools
from typing import List, Tuple, Dict
import bionumpy as bnp
def jaccard_func(mask_a: bnp.genomic_data.GenomicArray, mask_b: bnp.genomic_data.GenomicArray):
'''Jaccard = intersection / union'''
return (mask_a & mask_b).sum() / (mask_a | mask_b).sum()
def jaccard(chrom_sizes_file: str, bed_files: List[str]) -> Dict[Tuple[str, str], float]:
genome = bnp.Genome.from_file(chrom_sizes_file)
masks = {filename: genome.read_intervals(filename).get_mask() for filename in bed_files}
results = {(file_a, file_b): jaccard_func(masks[file_a], masks[file_b])
for file_a, file_b in itertools.combinations(masks.keys(), r=2)}
return results
def test():
    chrom_sizes = 'example_data/hg38.chrom.sizes'
    files = ['example_data/ctcf.bed.gz', 'example_data/znf263.bed.gz']
    assert len(jaccard(chrom_sizes, files)) == 1

if __name__ == "__main__":
    import sys
    chrom_sizes = sys.argv[1]
    files = sys.argv[2:]
    result = jaccard(chrom_sizes, files)
    for r in result.items():
        print(r)
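The Jaccard formula itself is easiest to see on plain boolean NumPy arrays, which the genomic masks behave like. A toy example with values chosen for illustration:

import numpy as np

a = np.array([True, True, False, False])
b = np.array([True, False, True, False])
# The intersection covers 1 position and the union 3, so the Jaccard index is 1/3
print((a & b).sum() / (a | b).sum())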
Count Kmers
Count the number of kmers in a fasta/fastq file:
import bionumpy as bnp

def count_kmers(sequence_entries: bnp.EncodedArray) -> bnp.EncodedCounts:
    # Re-encode the sequences as DNA so that kmers can be computed
    sequence = bnp.as_encoded_array(sequence_entries, bnp.DNAEncoding)
    kmers = bnp.get_kmers(sequence, k=5)
    return bnp.count_encoded(kmers, axis=None)

def count_all_kmers(input_file: str, output_file: str):
    buffer_type = bnp.TwoLineFastaBuffer if input_file.endswith(('.fa', '.fa.gz')) else None
    stream = bnp.open(input_file, buffer_type=buffer_type).read_chunks()
    # Sum the per-chunk counts into a single EncodedCounts object
    kmers = sum(count_kmers(chunk.sequence) for chunk in stream)
    with open(output_file, 'w') as f:
        f.writelines(f'{kmer}\t{count}\n'
                     for kmer, count
                     in sorted(zip(kmers.alphabet, kmers.counts)))

def test():
    count_all_kmers('example_data/big.fq.gz', 'tmp.csv')

if __name__ == "__main__":
    import sys
    count_all_kmers(*sys.argv[1:])
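If one is only interested in the most frequent kmers, the counts can be sorted with NumPy instead of being written out alphabetically. A small sketch (the helper most_common_kmers is our own, not part of BioNumPy):

import numpy as np
import bionumpy as bnp

def most_common_kmers(counts: bnp.EncodedCounts, n: int = 5):
    # Indices of the n largest counts, in descending order
    order = np.argsort(counts.counts)[::-1][:n]
    return [(counts.alphabet[i], counts.counts[i]) for i in order]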
Reverse Complement
Reverse complement a fasta/fastq file:
import bionumpy as bnp

def reverse_complement(input_filename: str, output_filename: str):
    """Reverse complements a fasta or fastq file and writes the result to a new file."""
    bt = lambda filename: (bnp.TwoLineFastaBuffer if filename.endswith(('.fa', '.fa.gz')) else None)
    with bnp.open(output_filename, "w", buffer_type=bt(output_filename)) as outfile:
        for chunk in bnp.open(input_filename, buffer_type=bt(input_filename)).read_chunks():
            rc = bnp.sequence.get_reverse_complement(chunk.sequence)
            outfile.write(bnp.replace(chunk, sequence=rc))

def test():
    reverse_complement('example_data/big.fq.gz', 'example_data/big_rc.fq.gz')
    assert bnp.count_entries('example_data/big_rc.fq.gz') == bnp.count_entries('example_data/big.fq.gz')

if __name__ == '__main__':
    import sys
    reverse_complement(sys.argv[1], sys.argv[2])
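A toy illustration of get_reverse_complement on a couple of short sequences (assuming the input is DNA-encoded, as required for complementing):

import bionumpy as bnp

seqs = bnp.as_encoded_array(["AACG", "TTT"], bnp.DNAEncoding)
# Expected output: CGTT and AAA
print(bnp.sequence.get_reverse_complement(seqs))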
Sequence Length Distribution
Calculate the length distribution of sequences in a fasta/fastq file:
import bionumpy as bnp
import numpy as np

def get_sorted_sequence_length_distribution(file_name: str):
    '''Return the sequence lengths found and their frequencies'''
    # bnp.bincount consumes one array of sequence lengths per chunk
    counts = bnp.bincount(chunk.sequence.lengths for chunk in bnp.open(file_name).read_chunks())
    sizes = np.flatnonzero(counts)
    return sizes, counts[sizes]

def test():
    get_sorted_sequence_length_distribution("example_data/big.fq.gz")

if __name__ == "__main__":
    import sys
    file_name = sys.argv[1]
    out_file = sys.argv[2]
    sizes, counts = get_sorted_sequence_length_distribution(file_name)
    with open(out_file, "w") as f:
        f.write("\n".join(["%d %d" % (size, count) for size, count in zip(sizes, counts)]) + "\n")
Subsampling
Subsample exactly half of the sequences in a fasta/fastq file. This is more involved for big files that are processed in chunks, since one has to figure out how many sequences to sample from each chunk in order to end up with exactly half of the sequences overall:
import dataclasses
import numpy as np
import bionumpy as bnp

@dataclasses.dataclass
class State:
    n_entries_left: int
    n_samples_left: int

def subsample_reads(input_filename: str, output_filename: str):
    bt = bnp.TwoLineFastaBuffer if input_filename.endswith(('.fa', '.fasta', '.fa.gz', '.fasta.gz')) else None
    n_entries = bnp.count_entries(input_filename, buffer_type=bt)
    state = State(n_entries_left=n_entries,
                  n_samples_left=n_entries // 2)
    subsets = (sample_from_chunk(chunk, state) for chunk in bnp.open(input_filename, buffer_type=bt).read_chunks())
    with bnp.open(output_filename, "w", buffer_type=bt) as out_file:
        for subset in subsets:
            out_file.write(subset)

def sample_from_chunk(chunk: bnp.SequenceEntry, state: State) -> bnp.SequenceEntry:
    chunk_size = len(chunk)
    # Draw how many of the remaining samples should come from this chunk
    n_samples_in_this_chunk = np.random.hypergeometric(chunk_size, state.n_entries_left - chunk_size,
                                                       state.n_samples_left)
    rows = np.random.choice(np.arange(chunk_size), n_samples_in_this_chunk, replace=False)
    subset = chunk[np.sort(rows)]
    state.n_samples_left -= n_samples_in_this_chunk
    state.n_entries_left -= chunk_size
    return subset
def test():
    filename = 'example_data/big.fq.gz'
    out_filename = 'tmp.fq.gz'
    subsample_reads(filename, out_filename)
    assert bnp.count_entries(out_filename) == bnp.count_entries(filename) // 2

if __name__ == '__main__':
    import sys
    subsample_reads(sys.argv[1], sys.argv[2])
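The key trick is the hypergeometric draw: it makes the per-chunk sample sizes follow the same distribution as if all entries had been sampled at once, without replacement. A toy illustration with plain NumPy (numbers chosen arbitrarily):

import numpy as np

# 100 entries split into a chunk of 30 and a remainder of 70; we want 50 samples.
# The number taken from the first chunk is hypergeometrically distributed,
# exactly as if 50 of the 100 entries had been drawn uniformly without replacement.
n_from_first_chunk = np.random.hypergeometric(30, 70, 50)
print(n_from_first_chunk)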
Translation
Translate DNA sequences in a fasta file to protein sequences:
import bionumpy as bnp
from bionumpy.sequence import translate_dna_to_protein

def translate_dna(input_file: str, output_file: str):
    with bnp.open(output_file, "w") as f:
        for chunk in bnp.open(input_file).read_chunks():
            translated = bnp.replace(chunk, sequence=translate_dna_to_protein(chunk.sequence))
            f.write(translated)

def test_translate_dna():
    input_file = "example_data/dna_translatable.fa"
    output_file = "tmp.fa"
    translate_dna(input_file, output_file)
    assert bnp.count_entries(output_file) == bnp.count_entries(input_file)

if __name__ == "__main__":
    import sys
    translate_dna(*sys.argv[1:])
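As a quick illustration of what translate_dna_to_protein does, a toy example (assuming, as above, that the input is a DNA-encoded array of in-frame coding sequences):

import bionumpy as bnp
from bionumpy.sequence import translate_dna_to_protein

# ATG GCC AAA codes for Met-Ala-Lys, so we expect the protein sequence MAK
dna = bnp.as_encoded_array(["ATGGCCAAA"], bnp.DNAEncoding)
print(translate_dna_to_protein(dna))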
VCF Filtering
Filter a VCF file on the allele count:
import bionumpy as bnp

def filter_file_on_allele_count(input_file: str, output_file: str, min_ac: int = 10):
    with bnp.open(output_file, "w") as out:
        for chunk in bnp.open(input_file).read_chunks():
            # Keep variants whose INFO/AC field is at least min_ac
            out.write(chunk[chunk.info.AC.ravel() >= min_ac])

def test():
    filter_file_on_allele_count("example_data/variants_with_header.vcf", "test.vcf", min_ac=1)
    assert bnp.count_entries("test.vcf") == 53

if __name__ == "__main__":
    import sys
    min_ac = int(sys.argv[3]) if len(sys.argv) > 3 else 10
    filter_file_on_allele_count(sys.argv[1], sys.argv[2], min_ac)
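To see what the AC values look like before choosing a threshold, a single chunk can be read and inspected. A small sketch using the same example file as the test above:

import bionumpy as bnp

# Read one chunk and look at the allele counts in the INFO column
chunk = bnp.open("example_data/variants_with_header.vcf").read_chunk()
print(chunk.info.AC.ravel())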