Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: try parallelizing across sigs #526

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ rustworkx-core = "0.15.1"
streaming-stats = "0.2.3"
rust_decimal = { version = "1.36.0", features = ["maths"] }
rust_decimal_macros = "1.36.0"
getset = "0.1"

[dev-dependencies]
assert_cmd = "2.0.16"
Expand Down
183 changes: 77 additions & 106 deletions src/manysketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,13 @@
use anyhow::{anyhow, Result};
use rayon::prelude::*;

use crate::utils::{load_fasta_fromfile, parse_params_str, sigwriter, Params};
use camino::Utf8Path as Path;
use needletail::parse_fastx_file;
use sourmash::cmd::ComputeParameters;
use sourmash::signature::Signature;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;

pub fn build_siginfo(params: &[Params], input_moltype: &str) -> Vec<Signature> {
let mut sigs = Vec::new();

for param in params.iter().cloned() {
match input_moltype {
// if dna, only build dna sigs. if protein, only build protein sigs, etc
"dna" | "DNA" if !param.is_dna => continue,
"protein" if !param.is_protein && !param.is_dayhoff && !param.is_hp => continue,
_ => (),
}

// Adjust ksize value based on the is_protein flag
let adjusted_ksize = if param.is_protein || param.is_dayhoff || param.is_hp {
param.ksize * 3
} else {
param.ksize
};

let cp = ComputeParameters::builder()
.ksizes(vec![adjusted_ksize])
.scaled(param.scaled)
.protein(param.is_protein)
.dna(param.is_dna)
.dayhoff(param.is_dayhoff)
.hp(param.is_hp)
.num_hashes(param.num)
.track_abundance(param.track_abundance)
.build();

let sig = Signature::from_params(&cp);
sigs.push(sig);
}

sigs
}
use crate::utils::buildutils::{BuildCollection, MultiSelect, MultiSelection};
use crate::utils::{load_fasta_fromfile, zipwriter_handle};

pub fn manysketch(
filelist: String,
Expand All @@ -71,25 +35,25 @@ pub fn manysketch(
bail!("Output must be a zip file.");
}

// set up a multi-producer, single-consumer channel that receives Signature
// set up a multi-producer, single-consumer channel that receives BuildCollection
let (send, recv) =
std::sync::mpsc::sync_channel::<Option<Vec<Signature>>>(rayon::current_num_threads());
// need to use Arc so we can write the manifest after all sigs have written
// let send = std::sync::Arc::new(send);
std::sync::mpsc::sync_channel::<Option<BuildCollection>>(rayon::current_num_threads());

// & spawn a thread that is dedicated to printing to a buffered output
let thrd = sigwriter(recv, output);
let thrd = zipwriter_handle(recv, output);

// parse param string into params_vec, print error if fail
let param_result = parse_params_str(param_str);
let params_vec = match param_result {
Ok(params) => params,
// params --> buildcollection
let sig_template_result = BuildCollection::from_param_str(param_str.as_str());
let sig_templates = match sig_template_result {
Ok(sig_templates) => sig_templates,
Err(e) => {
eprintln!("Error parsing params string: {}", e);
bail!("Failed to parse params string");
bail!("Failed to parse params string: {}", e);
}
};

// print sig templates to build
let _params = sig_templates.summarize_params();

// iterate over filelist_paths
let processed_fastas = AtomicUsize::new(0);
let failed_paths = AtomicUsize::new(0);
Expand All @@ -103,22 +67,24 @@ pub fn manysketch(
.filter_map(|fastadata| {
let name = &fastadata.name;
let filenames = &fastadata.paths;
let moltype = &fastadata.input_type;
// build sig templates for these sketches from params, check if there are sigs to build
let sig_templates = build_siginfo(&params_vec, moltype);
let input_moltype = &fastadata.input_type;
let mut sigs = sig_templates.clone();
// filter sig templates for this fasta by moltype
// atm, we only do DNA->DNA, prot->prot Future -- figure out if we need to modify to allow translate/skip
let multiselection = MultiSelection::from_input_moltype(input_moltype.as_str())
.expect("could not build selection from input moltype");
// todo: select should work in place??
sigs = sigs
.select(&multiselection)
.expect("could not select on sig_templates");

// if no sigs to build, skip this iteration
if sig_templates.is_empty() {
if sigs.is_empty() {
skipped_paths.fetch_add(filenames.len(), atomic::Ordering::SeqCst);
processed_fastas.fetch_add(1, atomic::Ordering::SeqCst);
return None;
}

let mut sigs = sig_templates.clone();
// have name / filename been set for each sig yet?
let mut set_name = false;
// if merging multiple files, sourmash sets filename as last filename
let last_filename = filenames.last().unwrap();

for filename in filenames {
// increment processed_fastas counter; make 1-based for % reporting
let i = processed_fastas.fetch_add(1, atomic::Ordering::SeqCst);
Expand All @@ -132,60 +98,65 @@ pub fn manysketch(
percent_processed
);
}

// Open fasta file reader
let mut reader = match parse_fastx_file(filename) {
Ok(r) => r,
Err(err) => {
eprintln!("Error opening file {}: {:?}", filename, err);
failed_paths.fetch_add(1, atomic::Ordering::SeqCst);
return None;
}
};

// parse fasta and add to signature
while let Some(record_result) = reader.next() {
match record_result {
Ok(record) => {
// do we need to normalize to make sure all the bases are consistently capitalized?
// let norm_seq = record.normalize(false);
sigs.iter_mut().for_each(|sig| {
if singleton {
let record_name = std::str::from_utf8(record.id())
.expect("could not get record id");
sig.set_name(record_name);
sig.set_filename(filename.as_str());
} else if !set_name {
sig.set_name(name);
// sourmash sets filename to last filename if merging fastas
sig.set_filename(last_filename.as_str());
};
if moltype == "protein" {
sig.add_protein(&record.seq())
.expect("Failed to add protein");
} else {
sig.add_sequence(&record.seq(), true)
.expect("Failed to add sequence");
// if not force, panics with 'N' in dna sequence
if singleton {
// Open fasta file reader
let mut reader = match parse_fastx_file(filename) {
Ok(r) => r,
Err(err) => {
eprintln!("Error opening file {}: {:?}", filename, err);
failed_paths.fetch_add(1, atomic::Ordering::SeqCst);
return None;
}
};

while let Some(record_result) = reader.next() {
match record_result {
Ok(record) => {
if let Err(err) = sigs.build_singleton_sigs(
record,
&input_moltype,
filename.to_string(),
) {
eprintln!(
"Error building signatures from file: {}, {:?}",
filename, err
);
// do we want to keep track of singleton sigs that fail? if so, how?
}
});
if !set_name {
set_name = true;
// send singleton sigs for writing
if let Err(e) = send.send(Some(sigs)) {
eprintln!("Unable to send internal data: {:?}", e);
return None;
}
sigs = sig_templates.clone();
}
Err(err) => eprintln!("Error while processing record: {:?}", err),
}
Err(err) => eprintln!("Error while processing record: {:?}", err),
}
if singleton {
// write sigs immediately to avoid memory issues
if let Err(e) = send.send(Some(sigs.clone())) {
eprintln!("Unable to send internal data: {:?}", e);
return None;
} else {
match sigs.build_sigs_from_file_or_stdin(
input_moltype,
name.clone(),
filename.to_string(),
) {
Ok(record_count) => {
// maybe only print if verbose??
// println!(
// "Successfully built signatures from file: {}. Records processed: {}",
// filename, record_count
// );
}
Err(err) => {
eprintln!(
"Error building signatures from file: {}, {:?}",
filename, err
);
failed_paths.fetch_add(1, atomic::Ordering::SeqCst);
}
sigs = sig_templates.clone();
}
}
}
// if singleton sketches, they have already been written; only write aggregate sketches
// if singleton sketches, they have already been written; only send aggregated sketches to be written
if singleton {
None
} else {
Expand All @@ -194,7 +165,7 @@ pub fn manysketch(
})
.try_for_each_with(
send.clone(),
|s: &mut std::sync::mpsc::SyncSender<Option<Vec<Signature>>>, sigs| {
|s: &mut std::sync::mpsc::SyncSender<Option<BuildCollection>>, sigs| {
if let Err(e) = s.send(Some(sigs)) {
Err(format!("Unable to send internal data: {:?}", e))
} else {
Expand Down
105 changes: 84 additions & 21 deletions src/python/tests/test_sketch.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ def test_manysketch_bad_fa_csv_2(runtmp, capfd):
captured = capfd.readouterr()
print(captured.err)
assert "Could not load fasta files: no signatures created." in captured.err
assert "Error opening file bad2.fa: ParseError" in captured.err
assert "Error building signatures from file: bad2.fa" in captured.err


def test_manysketch_bad_fa_csv_3(runtmp, capfd):
Expand Down Expand Up @@ -453,35 +453,35 @@ def test_manysketch_bad_param_str_moltype(runtmp, capfd):
captured = capfd.readouterr()
print(captured.err)
assert (
"Error parsing params string: No moltype provided in params string k=31,scaled=100"
"Error parsing params string 'k=31,scaled=100': No moltype provided"
in captured.err
)
assert "Failed to parse params string" in captured.err


def test_manysketch_bad_param_str_ksize(runtmp, capfd):
# no ksize provided in param str
fa_csv = runtmp.output("db-fa.txt")
# def test_manysketch_bad_param_str_ksize(runtmp, capfd):
# # no ksize provided in param str
# fa_csv = runtmp.output("db-fa.txt")

fa1 = get_test_data("short.fa")
fa2 = get_test_data("short2.fa")
fa3 = get_test_data("short3.fa")
# fa1 = get_test_data("short.fa")
# fa2 = get_test_data("short2.fa")
# fa3 = get_test_data("short3.fa")

make_assembly_csv(fa_csv, [fa1, fa2, fa3])
output = runtmp.output("out.zip")
# make_assembly_csv(fa_csv, [fa1, fa2, fa3])
# output = runtmp.output("out.zip")

with pytest.raises(utils.SourmashCommandFailed):
runtmp.sourmash(
"scripts", "manysketch", fa_csv, "-o", output, "-p", "dna,scaled=100"
)
# with pytest.raises(utils.SourmashCommandFailed):
# runtmp.sourmash(
# "scripts", "manysketch", fa_csv, "-o", output, "-p", "dna,scaled=100"
# )

captured = capfd.readouterr()
print(captured.err)
assert (
"Error parsing params string: No ksizes provided in params string dna,scaled=100"
in captured.err
)
assert "Failed to parse params string" in captured.err
# captured = capfd.readouterr()
# print(captured.err)
# assert (
# "Error parsing params string: No ksizes provided in params string dna,scaled=100"
# in captured.err
# )
# assert "Failed to parse params string" in captured.err


def test_manysketch_empty_fa_csv(runtmp, capfd):
Expand Down Expand Up @@ -1397,3 +1397,66 @@ def test_singlesketch_multimoltype_fail(runtmp):
"-p",
"protein,dna,k=7",
)


def test_singlesketch_gzipped_output(runtmp):
"""Test singlesketch with gzipped output."""
fa1 = get_test_data("short.fa")
output = runtmp.output("short.sig.gz")

# Run the singlesketch command
runtmp.sourmash("scripts", "singlesketch", fa1, "-o", output)

# Check if the output exists and contains the expected data
assert os.path.exists(output)

# Verify the file is gzipped
import gzip

try:
with gzip.open(output, "rt") as f:
f.read(1) # Try to read a single character to ensure it's valid gzip
except gzip.BadGzipFile:
assert False, f"Output file {output} is not a valid gzipped file."

# check the signatures
sig = sourmash.load_one_signature(output)

assert sig.name == "short.fa"
assert sig.minhash.ksize == 31
assert sig.minhash.is_dna
assert sig.minhash.scaled == 1000

# validate against sourmash sketch
output2 = runtmp.output("short2.sig")
runtmp.sourmash("sketch", "dna", fa1, "-o", output2)
sig2 = sourmash.load_one_signature(output2)
assert sig.minhash.hashes == sig2.minhash.hashes


def test_singlesketch_zip_output(runtmp):
"""Test singlesketch with zip output."""
fa1 = get_test_data("short.fa")
output = runtmp.output("short.zip")

# Run the singlesketch command
runtmp.sourmash("scripts", "singlesketch", fa1, "-o", output)

# Check if the output exists and contains the expected data
assert os.path.exists(output)
idx = sourmash.load_file_as_index(output)
sigs = list(idx.signatures())
assert len(sigs) == 1
print(sigs)
sig = sigs[0]

assert sig.name == "short.fa"
assert sig.minhash.ksize == 31
assert sig.minhash.is_dna
assert sig.minhash.scaled == 1000

# validate against sourmash sketch
output2 = runtmp.output("short2.sig")
runtmp.sourmash("sketch", "dna", fa1, "-o", output2)
sig2 = sourmash.load_one_signature(output2)
assert sig.minhash.hashes == sig2.minhash.hashes
Loading
Loading