diff --git a/Cargo.lock b/Cargo.lock index b1b72c48..efcb0d20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -192,6 +192,16 @@ dependencies = [ "serde", ] +[[package]] +name = "buffer-redux" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2886ea01509598caac116942abd33ab5a88fa32acdf7e4abfa0fc489ca520c9" +dependencies = [ + "memchr", + "safemem", +] + [[package]] name = "bumpalo" version = "3.13.0" @@ -890,6 +900,20 @@ dependencies = [ "getrandom", ] +[[package]] +name = "needletail" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db05a5ab397f64070d8c998fa0fbb84e484b81f95752af317dac183a82d9295d" +dependencies = [ + "buffer-redux", + "bytecount", + "bzip2", + "flate2", + "memchr", + "xz2", +] + [[package]] name = "niffler" version = "2.5.0" @@ -1232,8 +1256,10 @@ dependencies = [ "anyhow", "assert_cmd", "assert_matches", + "csv", "env_logger", "log", + "needletail", "niffler", "predicates", "pyo3", @@ -1483,6 +1509,12 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" +[[package]] +name = "safemem" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072" + [[package]] name = "scopeguard" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index 20e47456..47a046a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,8 @@ simple-error = "0.3.0" anyhow = "1.0.75" zip = "0.6" tempfile = "3.8" +needletail = "0.5.1" +csv = "1.2.2" [dev-dependencies] assert_cmd = "2.0.4" diff --git a/doc/README.md b/doc/README.md index 04647fab..36bf535b 100644 --- a/doc/README.md +++ b/doc/README.md @@ -1,17 +1,16 @@ # fastgather, fastmultigather, and manysearch - an introduction -This repository implements four sourmash plugins, `fastgather`, `fastmultigather`, `multisearch`, and `manysearch`. These plugins make use of multithreading in Rust to provide very fast implementations of `search` and `gather`. With large databases, these commands can be hundreds to thousands of times faster, and 10-50x lower memory. +This repository implements five sourmash plugins, `manysketch`, `fastgather`, `fastmultigather`, `multisearch`, and `manysearch`. These plugins make use of multithreading in Rust to provide very fast implementations of `sketch`, `search`, and `gather`. With large databases, these commands can be hundreds to thousands of times faster, and 10-50x lower memory. The main *drawback* to these plugin commands is that their inputs and outputs are not as rich as the native sourmash commands. In particular, this means that input databases need to be prepared differently, and the output may be most useful as a prefilter in conjunction with regular sourmash commands. ## Preparing the database -All four commands use -_text files containing lists of signature files_, or "fromfiles", for the search database, and `multisearch`, `manysearch` and `fastmultigather` use "fromfiles" for queries, too. +`manysketch` requires a `fromfile` csv with columns `name,genome_filename,protein_filename`. If you don't have protein_filenames, be sure to include the trailing comma so the csv reader can process the file correctly. All four search commands use _text files containing lists of signature files_, or "fromfiles" for the search database. 
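(Editor's illustration, not part of the diff: a minimal `fa.csv` in the required format might look like the following, using the test-data filenames added later in this PR. Note the trailing comma on the row that has no protein fasta.)

```
name,genome_filename,protein_filename
short,short.fa,
shortProt,short.fa,short-protein.fa
```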
`multisearch`, `manysearch`, and `fastmultigather` use "fromfiles" for queries, too.

(Yes, this plugin will eventually be upgraded to support zip files; keep an eye on [sourmash#2230](https://github.com/sourmash-bio/sourmash/pull/2230).)

-To prepare a fromfile from a database, first you need to split the database into individual files:
+To prepare a **signature** fromfile from a database, first you need to split the database into individual files:
```
mkdir gtdb-reps-rs214-k21/
cd gtdb-reps-rs214-k21/
@@ -26,6 +25,28 @@ find gtdb-reps-rs214-k21/ -name "*.sig.gz" -type f > list.gtdb-reps-rs214-k21.txt

## Running the commands

+### Running `manysketch`
+
+The `manysketch` command sketches one or more fasta files into a zipped sourmash signature collection (`zip`).
+
+To run `manysketch`, you need to build a CSV file of fasta paths with the columns `name,genome_filename,protein_filename` (`fa.csv`, below). You can then run:
+
+```
+sourmash scripts manysketch fa.csv -o fa.zip
+```
+The output will be written to `fa.zip`.
+
+You can check that all signatures were written properly with:
+```
+sourmash sig summarize fa.zip
+```
+
+To modify sketching parameters, use `--param-str` or `-p` and provide one or more valid param strings:
+```
+sourmash scripts manysketch fa.csv -o fa.zip -p k=21,k=31,k=51,scaled=1000,abund -p protein,k=10,scaled=200
+```
+
### Running `multisearch`

The `multisearch` command compares one or more query genomes against one or more subject genomes. It differs from `manysearch` by loading all genomes into memory.

@@ -127,6 +148,8 @@ Each command does things slightly differently, with implications for CPU and disk load.

(The below info is for fromfile lists. If you are using mastiff indexes, very different performance parameters apply. We will update here as we benchmark and improve!)

+`manysketch` loads one fasta file from disk per thread and sketches it with all signature params simultaneously.
+
`manysearch` loads all the queries at the beginning, and then loads one database sketch from disk per thread. The compute per database sketch is dominated by I/O, so choose your number of threads with care for disk load. We typically limit it to `-c 32` for shared disks.

`multisearch` loads all the queries and database sketches once, at the beginning, and then uses multithreading to search across all matching sequences. For large databases it is extremely efficient at using all available cores, so 128 threads or more should work fine!
diff --git a/pyproject.toml b/pyproject.toml
index b4b95c91..c93a0896 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -21,6 +21,7 @@ fastgather = "pyo3_branchwater:Branchwater_Fastgather"
 fastmultigather = "pyo3_branchwater:Branchwater_Fastmultigather"
 index = "pyo3_branchwater:Branchwater_Index"
 check = "pyo3_branchwater:Branchwater_Check"
+manysketch = "pyo3_branchwater:Branchwater_Manysketch"

 [tool.maturin]
diff --git a/src/lib.rs b/src/lib.rs
index f0ea25db..88b68ec1 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -21,17 +21,18 @@ use std::cmp::{PartialOrd, Ordering};

 use anyhow::{Result, anyhow};
+use needletail::parse_fastx_reader;

 #[macro_use]
 extern crate simple_error;

-use log::{error, info};
 use sourmash::signature::{Signature, SigsTrait};
 use sourmash::sketch::minhash::{max_hash_for_scaled, KmerMinHash};
 use sourmash::sketch::Sketch;
 use sourmash::index::revindex::RevIndex;
 use sourmash::prelude::MinHashOps;
 use sourmash::prelude::FracMinHashOps;
+use sourmash::cmd::ComputeParameters;

 /// Track a name/minhash.
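(Editor's note: the new `ComputeParameters` import is what `build_siginfo`, further down in this diff, drives. A minimal standalone sketch of that builder pattern — using only calls visible in this diff; the specific values are illustrative, not defaults asserted by this PR:)

```rust
use sourmash::cmd::ComputeParameters;
use sourmash::signature::Signature;

fn main() {
    // Mirror one parsed param set (k=31, scaled=1000, DNA, no abundance).
    let cp = ComputeParameters::builder()
        .ksizes(vec![31])
        .scaled(1000)
        .protein(false)
        .dna(true)
        .num_hashes(0)
        .track_abundance(false)
        .build();

    // `from_params` builds the sketch templates but cannot set a name
    // (see the comment in `build_siginfo` below for the workaround).
    let sig = Signature::from_params(&cp);
    println!("created signature with {} sketch(es)", sig.sketches().len());
}
```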
@@ -200,7 +201,7 @@ fn manysearch<P: AsRef<Path>>(
     let (send, recv) = std::sync::mpsc::sync_channel::<SearchResult>(rayon::current_num_threads());

     // & spawn a thread that is dedicated to printing to a buffered output
-    let thrd = start_writer_thread(recv, output.as_ref());
+    let thrd = csvwriter_thread(recv, output.as_ref());

     //
     // Main loop: iterate (in parallel) over all search signature paths,
@@ -273,11 +274,11 @@ fn manysearch<P: AsRef<Path>>(

     // do some cleanup and error handling -
     if let Err(e) = send {
-        error!("Unable to send internal data: {:?}", e);
+        eprintln!("Unable to send internal data: {:?}", e);
     }

     if let Err(e) = thrd.join() {
-        error!("Unable to join internal thread: {:?}", e);
+        eprintln!("Unable to join internal thread: {:?}", e);
     }

     // done!
@@ -378,8 +379,62 @@ fn load_sketchlist_filenames<P: AsRef<Path>>(sketchlist_filename: &P) ->
     Ok(sketchlist_filenames)
 }

-/// Load a collection of sketches from a file in parallel.
+fn load_sketch_fromfile<P: AsRef<Path>>(sketchlist_filename: &P) -> Result<Vec<(String, PathBuf, String)>> {
+    let mut rdr = csv::Reader::from_path(sketchlist_filename)?;
+
+    // Check for the right header
+    let headers = rdr.headers()?;
+    if headers.len() != 3 ||
+        headers.get(0).unwrap() != "name" ||
+        headers.get(1).unwrap() != "genome_filename" ||
+        headers.get(2).unwrap() != "protein_filename" {
+        return Err(anyhow!("Invalid header. Expected 'name,genome_filename,protein_filename', but got '{}'", headers.iter().collect::<Vec<_>>().join(",")));
+    }
+
+    let mut results = Vec::new();
+
+    let mut row_count = 0;
+    let mut genome_count = 0;
+    let mut protein_count = 0;
+    // Create a HashSet to keep track of processed rows.
+    let mut processed_rows = std::collections::HashSet::new();
+    let mut duplicate_count = 0;
+
+    for result in rdr.records() {
+        let record = result?;
+
+        // Skip duplicated rows
+        let row_string = record.iter().collect::<Vec<_>>().join(",");
+        if processed_rows.contains(&row_string) {
+            duplicate_count += 1;
+            continue;
+        }
+        processed_rows.insert(row_string.clone());
+        row_count += 1;
+        let name = record.get(0).ok_or_else(|| anyhow!("Missing 'name' field"))?.to_string();
+
+        let genome_filename = record.get(1).ok_or_else(|| anyhow!("Missing 'genome_filename' field"))?;
+        if !genome_filename.is_empty() {
+            results.push((name.clone(), PathBuf::from(genome_filename), "dna".to_string()));
+            genome_count += 1;
+        }
+
+        let protein_filename = record.get(2).ok_or_else(|| anyhow!("Missing 'protein_filename' field"))?;
+        if !protein_filename.is_empty() {
+            results.push((name, PathBuf::from(protein_filename), "protein".to_string()));
+            protein_count += 1;
+        }
+    }
+    // Print a warning if there were duplicated rows.
+    if duplicate_count > 0 {
+        println!("Warning: {} duplicated rows were skipped.", duplicate_count);
+    }
+    println!("Loaded {} rows in total ({} genome and {} protein files)", row_count, genome_count, protein_count);
+    Ok(results)
+}
+
+/// Load a collection of sketches from a file in parallel.
 fn load_sketches(sketchlist_paths: Vec<PathBuf>, template: &Sketch) -> Result<(Vec<SmallSignature>, usize, usize)> {
@@ -796,16 +851,19 @@ fn read_signatures_from_zip<P: AsRef<Path>>(
         let file_name = Path::new(file.name()).file_name().unwrap().to_str().unwrap();
         if file_name.ends_with(".sig") || file_name.ends_with(".sig.gz") {
-            let new_path = temp_dir.path().join(file_name);
-            let mut new_file = File::create(&new_path)?;
+            println!("Found signature file: {}", file_name);
+            let mut new_file = File::create(temp_dir.path().join(file_name))?;
             new_file.write_all(&sig)?;
-            signature_paths.push(new_path);
+
+            // Push the created path directly to the vector
+            signature_paths.push(temp_dir.path().join(file_name));
         }
     }
     println!("wrote {} signatures to temp dir", signature_paths.len());
     Ok((signature_paths, temp_dir))
 }
+
 fn index<P: AsRef<Path>>(
     siglist: P,
     template: Sketch,
@@ -814,7 +872,7 @@ fn index<P: AsRef<Path>>(
     colors: bool,
 ) -> Result<(), Box<dyn std::error::Error>> {
     let mut temp_dir = None;
-    info!("Loading siglist");
+    println!("Loading siglist");

     let index_sigs: Vec<PathBuf>;
@@ -863,13 +921,13 @@ fn check<P: AsRef<Path>>(index: P, quick: bool) -> Result<(), Box<dyn std::error::Error>> {

+struct ManifestRow {
+    internal_location: String,
+    md5: String,
+    md5short: String,
+    ksize: u32,
+    moltype: String,
+    num: u32,
+    scaled: u64,
+    n_hashes: usize,
+    with_abundance: bool,
+    name: String,
+    filename: String,
+}
+
+fn bool_to_python_string(b: bool) -> String {
+    match b {
+        true => "True".to_string(),
+        false => "False".to_string(),
+    }
+}
+
+impl ResultType for ManifestRow {
+    fn header_fields() -> Vec<&'static str> {
+        vec!["internal_location", "md5", "md5short", "ksize", "moltype", "num", "scaled", "n_hashes", "with_abundance", "name", "filename"]
+    }
+
+    fn format_fields(&self) -> Vec<String> {
+        vec![
+            self.internal_location.clone(),
+            self.md5.clone(),
+            self.md5short.clone(),
+            self.ksize.to_string(),
+            self.moltype.clone(),
+            self.num.to_string(),
+            self.scaled.to_string(),
+            self.n_hashes.to_string(),
+            bool_to_python_string(self.with_abundance),
+            format!("\"{}\"", self.name), // Wrap name with quotes
+            self.filename.clone(),
+        ]
+    }
+}
+
+fn make_manifest_row(sig: &Signature, filename: &Path, internal_location: &str, scaled: u64, num: u32, abund: bool, is_dna: bool, is_protein: bool) -> ManifestRow {
+    if is_dna && is_protein {
+        panic!("Both is_dna and is_protein cannot be true at the same time.");
+    } else if !is_dna && !is_protein {
+        panic!("Either is_dna or is_protein must be true.");
+    }
+    let moltype = if is_dna {
+        "DNA".to_string()
+    } else {
+        "protein".to_string()
+    };
+    let sketch = &sig.sketches()[0];
+    ManifestRow {
+        internal_location: internal_location.to_string(),
+        md5: sig.md5sum(),
+        md5short: sig.md5sum()[0..8].to_string(),
+        ksize: sketch.ksize() as u32,
+        moltype,
+        num,
+        scaled,
+        n_hashes: sketch.size(),
+        with_abundance: abund,
+        name: sig.name().to_string(),
+        // filename: filename.display().to_string(),
+        filename: filename.to_str().unwrap().to_string(),
+    }
+}

-fn start_writer_thread<T, P>(
+fn open_stdout_or_file<P: AsRef<Path>>(
+    output: Option<P>
+) -> Box<dyn Write + Send> {
+    // if output is a file, use open_output_file
+    if let Some(path) = output {
+        Box::new(open_output_file(&path))
+    } else {
+        Box::new(std::io::stdout())
+    }
+}
+
+fn open_output_file<P: AsRef<Path>>(
+    output: &P
+) -> BufWriter<File> {
+    let file = File::create(output).unwrap_or_else(|e| {
+        eprintln!("Error creating output file: {:?}", e);
+        std::process::exit(1);
+    });
+    BufWriter::new(file)
+}
+
+enum ZipMessage {
+    SignatureData(Vec<Signature>, Vec<Params>, PathBuf),
+    WriteManifest,
+}
+
+
+fn sigwriter<P: AsRef<Path> + Send + 'static>(
+    recv: std::sync::mpsc::Receiver<ZipMessage>,
+    output: String,
+) -> std::thread::JoinHandle<Result<()>> {
+    std::thread::spawn(move || -> Result<()> {
+        let file_writer = open_output_file(&output);
+
+        let options = zip::write::FileOptions::default().compression_method(zip::CompressionMethod::Stored);
+        let mut zip = zip::ZipWriter::new(file_writer);
+        let mut manifest_rows: Vec<ManifestRow> = Vec::new();
+        // keep track of md5sum occurrences to prevent overwriting duplicates
+        let mut md5sum_occurrences: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
+
+        while let Ok(message) = recv.recv() {
+            match message {
+                ZipMessage::SignatureData(sigs, params, filename) => {
+                    if sigs.len() != params.len() {
+                        bail!("Mismatched lengths of signatures and parameters");
+                    }
+                    for (sig, param) in sigs.iter().zip(params.iter()) {
+                        let md5sum_str = sig.md5sum();
+                        let count = md5sum_occurrences.entry(md5sum_str.clone()).or_insert(0);
+                        *count += 1;
+                        let sig_filename = if *count > 1 {
+                            format!("signatures/{}_{}.sig.gz", md5sum_str, count)
+                        } else {
+                            format!("signatures/{}.sig.gz", md5sum_str)
+                        };
+                        write_signature(sig, &mut zip, options, &sig_filename);
+                        manifest_rows.push(make_manifest_row(sig, &filename, &sig_filename, param.scaled, param.num, param.track_abundance, param.is_dna, param.is_protein));
+                    }
+                },
+                ZipMessage::WriteManifest => {
+                    println!("Writing manifest");
+                    // Start the CSV file inside the zip
+                    zip.start_file("SOURMASH-MANIFEST.csv", options).unwrap();
+
+                    // write the manifest version line
+                    writeln!(&mut zip, "# SOURMASH-MANIFEST-VERSION: 1.0").unwrap();
+                    // Write the header
+                    let header = ManifestRow::header_fields();
+                    if let Err(e) = writeln!(&mut zip, "{}", header.join(",")) {
+                        eprintln!("Error writing header: {:?}", e);
+                    }
+
+                    // Write each manifest row
+                    for row in &manifest_rows {
+                        let formatted_fields = row.format_fields();
+                        if let Err(e) = writeln!(&mut zip, "{}", formatted_fields.join(",")) {
+                            eprintln!("Error writing item: {:?}", e);
+                        }
+                    }
+                    // finalize the zip file writing.
+                    zip.finish().unwrap();
+                }
+            }
+        }
+        Ok(())
+    })
+}
+
+fn csvwriter_thread<T, P>(
     recv: std::sync::mpsc::Receiver<T>,
     output: Option<P>,
 ) -> std::thread::JoinHandle<()>
@@ -926,30 +1142,22 @@ where
     T: ResultType,
     P: Clone + std::convert::AsRef<Path>,
 {
-    let out: Box<dyn Write> = match output {
-        Some(path) => {
-            let file = std::fs::File::create(&path).unwrap_or_else(|e| {
-                error!("Error creating output file: {:?}", e);
-                std::process::exit(1);
-            });
-            Box::new(std::io::BufWriter::new(file))
-        }
-        None => Box::new(std::io::stdout()),
-    };
-
+    // create output file
+    let out = open_stdout_or_file(output.as_ref());
+
     // spawn a thread that is dedicated to printing to a buffered output
     std::thread::spawn(move || {
         let mut writer = out;

         let header = T::header_fields();
         if let Err(e) = writeln!(&mut writer, "{}", header.join(",")) {
-            error!("Error writing header: {:?}", e);
+            eprintln!("Error writing header: {:?}", e);
         }
         writer.flush().unwrap();

         for item in recv.iter() {
             let formatted_fields = item.format_fields();
             if let Err(e) = writeln!(&mut writer, "{}", formatted_fields.join(",")) {
-                error!("Error writing item: {:?}", e);
+                eprintln!("Error writing item: {:?}", e);
             }
             writer.flush().unwrap();
         }
     })
 }
@@ -957,6 +1165,34 @@ where
 }

+fn write_signature(
+    sig: &Signature,
+    zip: &mut zip::ZipWriter<BufWriter<File>>,
+    zip_options: zip::write::FileOptions,
+    sig_filename: &str,
+) {
+    let wrapped_sig = vec![sig];
+    let json_bytes = serde_json::to_vec(&wrapped_sig).unwrap();
+
+    let gzipped_buffer = {
+        let mut buffer = std::io::Cursor::new(Vec::new());
+        {
+            let mut gz_writer = niffler::get_writer(
+                Box::new(&mut buffer),
+                niffler::compression::Format::Gzip,
+                niffler::compression::Level::Nine,
+            ).unwrap();
+            gz_writer.write_all(&json_bytes).unwrap();
+        }
+        buffer.into_inner()
+    };
+
+    zip.start_file(sig_filename, zip_options).unwrap();
+    zip.write_all(&gzipped_buffer).unwrap();
+}
+
+
 fn mastiff_manysearch<P: AsRef<Path>>(
     queries_file: P,
     index: P,
@@ -970,7 +1206,7 @@ fn mastiff_manysearch<P: AsRef<Path>>(
     }
     // Open database once
     let db = RevIndex::open(index.as_ref(), true);
-    info!("Loaded DB");
+    println!("Loaded DB");

     // Load query paths
     let query_paths = load_sketchlist_filenames(&queries_file)?;
@@ -984,7 +1220,7 @@ fn mastiff_manysearch<P: AsRef<Path>>(
     let (send, recv) = std::sync::mpsc::sync_channel::<SearchResult>(rayon::current_num_threads());

     // & spawn a thread that is dedicated to printing to a buffered output
-    let thrd = start_writer_thread(recv, output.as_ref());
+    let thrd = csvwriter_thread(recv, output.as_ref());

     //
     // Main loop: iterate (in parallel) over all search signature paths,
@@ -1063,12 +1299,12 @@ fn mastiff_manysearch<P: AsRef<Path>>(

     // do some cleanup and error handling -
     if let Err(e) = send_result {
-        error!("Error during parallel processing: {}", e);
+        eprintln!("Error during parallel processing: {}", e);
     }

     // join the writer thread
     if let Err(e) = thrd.join() {
-        error!("Unable to join internal thread: {:?}", e);
+        eprintln!("Unable to join internal thread: {:?}", e);
     }

     // done!
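(Editor's aside: `csvwriter_thread` and `sigwriter` above both use the same concurrency pattern — many rayon workers produce results over a bounded mpsc channel, and a single dedicated thread owns the output handle, so writes need no locking. A minimal self-contained sketch of that pattern, using only the standard library:)

```rust
use std::io::Write;
use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    // Bounded channel: producers block if the writer falls behind.
    let (send, recv) = sync_channel::<String>(8);

    // One thread owns the buffered output, like csvwriter_thread does.
    let writer = thread::spawn(move || {
        let mut out = std::io::BufWriter::new(std::io::stdout());
        writeln!(out, "name,value").unwrap(); // header, written once
        for row in recv.iter() {
            writeln!(out, "{}", row).unwrap();
        }
        out.flush().unwrap();
    });

    // Producers (parallel in the real code) just send rows.
    for i in 0..4 {
        send.send(format!("item{},{}", i, i * 10)).unwrap();
    }
    drop(send); // closing the channel ends `recv.iter()`

    writer.join().unwrap();
}
```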
@@ -1103,7 +1339,7 @@ fn mastiff_manygather<P: AsRef<Path>>(
     }
     // Open database once
     let db = RevIndex::open(index.as_ref(), true);
-    info!("Loaded DB");
+    println!("Loaded DB");

     // Load query paths
     let query_paths = load_sketchlist_filenames(&queries_file)?;
@@ -1154,9 +1390,9 @@ fn mastiff_manygather<P: AsRef<Path>>(
                     let threshold = threshold_bp / query.minhash.scaled() as usize;

                     // mastiff gather code
-                    info!("Building counter");
+                    println!("Building counter");
                     let (counter, query_colors, hash_to_color) = db.prepare_gather_counters(&query.minhash);
-                    info!("Counter built");
+                    println!("Counter built");

                     let matches = db.gather(
                         counter,
@@ -1205,11 +1441,11 @@ fn mastiff_manygather<P: AsRef<Path>>(

     // do some cleanup and error handling -
     if let Err(e) = send {
-        error!("Unable to send internal data: {:?}", e);
+        eprintln!("Unable to send internal data: {:?}", e);
     }

     if let Err(e) = thrd.join() {
-        error!("Unable to join internal thread: {:?}", e);
+        eprintln!("Unable to join internal thread: {:?}", e);
     }

     // done!
@@ -1367,11 +1603,11 @@ fn multisearch<P: AsRef<Path>>(

     // do some cleanup and error handling -
     if let Err(e) = send {
-        error!("Unable to send internal data: {:?}", e);
+        eprintln!("Unable to send internal data: {:?}", e);
     }

     if let Err(e) = thrd.join() {
-        error!("Unable to join internal thread: {:?}", e);
+        eprintln!("Unable to join internal thread: {:?}", e);
     }

     // done!
@@ -1381,6 +1617,292 @@ fn multisearch<P: AsRef<Path>>(
     Ok(())
 }

+#[derive(Clone, Debug, PartialEq, Eq)]
+struct Params {
+    ksize: u32,
+    track_abundance: bool,
+    num: u32,
+    scaled: u64,
+    seed: u32,
+    is_protein: bool,
+    is_dna: bool,
+}
+use std::hash::Hash;
+use std::hash::Hasher;
+
+impl Hash for Params {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.ksize.hash(state);
+        self.track_abundance.hash(state);
+        self.num.hash(state);
+        self.scaled.hash(state);
+        self.seed.hash(state);
+        self.is_protein.hash(state);
+        self.is_dna.hash(state);
+    }
+}
+
+fn parse_params_str(params_strs: String) -> Result<Vec<Params>, String> {
+    let mut unique_params: std::collections::HashSet<Params> = std::collections::HashSet::new();
+
+    // split params_strs by '_' and iterate over each param group
+    for p_str in params_strs.split('_').collect::<Vec<&str>>().iter() {
+        let items: Vec<&str> = p_str.split(',').collect();
+
+        let mut ksizes = Vec::new();
+        let mut track_abundance = false;
+        let mut num = 0;
+        let mut scaled = 1000;
+        let mut seed = 42;
+        let mut is_protein = false;
+        let mut is_dna = true;
+
+        for item in items.iter() {
+            match *item {
+                _ if item.starts_with("k=") => {
+                    let k_value = item[2..].parse()
+                        .map_err(|_| format!("cannot parse k='{}' as a number", &item[2..]))?;
+                    ksizes.push(k_value);
+                },
+                "abund" => track_abundance = true,
+                "noabund" => track_abundance = false,
+                _ if item.starts_with("num=") => {
+                    num = item[4..].parse()
+                        .map_err(|_| format!("cannot parse num='{}' as a number", &item[4..]))?;
+                },
+                _ if item.starts_with("scaled=") => {
+                    scaled = item[7..].parse()
+                        .map_err(|_| format!("cannot parse scaled='{}' as a number", &item[7..]))?;
+                },
+                _ if item.starts_with("seed=") => {
+                    seed = item[5..].parse()
+                        .map_err(|_| format!("cannot parse seed='{}' as a number", &item[5..]))?;
+                },
+                "protein" => {
+                    is_protein = true;
+                    is_dna = false;
+                },
+                "dna" => {
+                    is_protein = false;
+                    is_dna = true;
+                },
+                _ => return Err(format!("unknown component '{}' in params string", item)),
+            }
+        }
+
+        for &k in &ksizes {
+            let param = Params {
+                ksize: k,
+                track_abundance,
+                num,
+                scaled,
+                seed,
+                is_protein,
+                is_dna,
+            };
+            unique_params.insert(param);
+        }
+    }
+
+    Ok(unique_params.into_iter().collect())
+}
+
+fn build_siginfo(params: &[Params], moltype: &str, name: &str, filename: &PathBuf) -> (Vec<Signature>, Vec<Params>) {
+    let mut sigs = Vec::new();
+    let mut params_vec = Vec::new();
+
+    for param in params.iter().cloned() {
+        match moltype {
+            // if dna, only build dna sigs; if protein, only build protein sigs
+            "dna" if !param.is_dna => continue,
+            "protein" if !param.is_protein => continue,
+            _ => (),
+        }
+
+        // Adjust the ksize value based on the is_protein flag
+        let adjusted_ksize = if param.is_protein {
+            param.ksize * 3
+        } else {
+            param.ksize
+        };
+
+        let cp = ComputeParameters::builder()
+            .ksizes(vec![adjusted_ksize])
+            .scaled(param.scaled)
+            .protein(param.is_protein)
+            .dna(param.is_dna)
+            .num_hashes(param.num)
+            .track_abundance(param.track_abundance)
+            .build();
+
+        // let sig = Signature::from_params(&cp); // can't set the name with this
+        let template = sourmash::cmd::build_template(&cp);
+        let sig = Signature::builder()
+            .hash_function("0.murmur64")
+            .name(Some(name.to_string()))
+            .filename(Some(filename.to_string_lossy().into_owned()))
+            .signatures(template)
+            .build();
+        sigs.push(sig);
+
+        params_vec.push(param);
+    }
+
+    (sigs, params_vec)
+}
+
+
+fn manysketch<P: AsRef<Path> + Sync>(
+    filelist: P,
+    param_str: String,
+    output: String,
+) -> Result<(), Box<dyn std::error::Error>> {
+
+    let fileinfo = match load_sketch_fromfile(&filelist) {
+        Ok(result) => result,
+        Err(e) => bail!("Could not load fromfile csv. Underlying error: {}", e)
+    };
+
+    // if there are no files to process, exit with an error
+    let n_fastas = fileinfo.len();
+    if n_fastas == 0 {
+        bail!("No files to load, exiting.");
+    }
+
+    // if output doesn't end in zip, bail
+    if Path::new(&output).extension().map_or(true, |ext| ext != "zip") {
+        bail!("Output must be a zip file.");
+    }
+
+    // set up a multi-producer, single-consumer channel that receives signature data
+    let (send, recv) = std::sync::mpsc::sync_channel::<ZipMessage>(rayon::current_num_threads());
+    // need to use Arc so we can write the manifest after all sigs have been written
+    let send = std::sync::Arc::new(send);
+
+    // & spawn a thread that is dedicated to printing to a buffered output
+    let thrd = sigwriter::<&str>(recv, output);
+
+    // parse the param string into params_vec; print an error on failure
+    let param_result = parse_params_str(param_str);
+    let params_vec = match param_result {
+        Ok(params) => params,
+        Err(e) => {
+            eprintln!("Error parsing params string: {}", e);
+            bail!("Failed to parse params string");
+        }
+    };
+
+    // iterate over filelist_paths
+    let processed_fastas = AtomicUsize::new(0);
+    let failed_paths = AtomicUsize::new(0);
+    let skipped_paths: AtomicUsize = AtomicUsize::new(0);
+
+    // set the reporting threshold at every 5% (or every 1 fasta, whichever is larger)
+    let reporting_threshold = std::cmp::max(n_fastas / 20, 1);
+
+    let send_result = fileinfo
+        .par_iter()
+        .filter_map(|(name, filename, moltype)| {
+            let i = processed_fastas.fetch_add(1, atomic::Ordering::SeqCst);
+            // progress report at threshold
+            if i != 0 && i % reporting_threshold == 0 {
+                let percent_processed = ((i as f64 / n_fastas as f64) * 100.0).round();
+                eprintln!("Processed {} fasta files ({}% done)", i, percent_processed);
+            }
+
+            let mut data: Vec<u8> = vec![];
+            // build sig templates from params
+            let (mut sigs, sig_params) = build_siginfo(&params_vec, moltype, name, filename);
+            // if there are no sigs to build, skip this file
+            if sigs.is_empty() {
+                let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst);
+                return None;
+            }
+
+            // parse fasta file and add to signature
+            match File::open(filename) {
+                Ok(mut f) => {
+                    let _ = f.read_to_end(&mut data);
+
+                    match parse_fastx_reader(&data[..]) {
+                        Ok(mut parser) => {
+                            while let Some(record_result) = parser.next() {
+                                match record_result {
+                                    Ok(record) => {
+                                        for sig in &mut sigs {
+                                            if moltype == "protein" {
+                                                sig.add_protein(&record.seq()).unwrap();
+                                            } else {
+                                                // force=true, otherwise this panics on 'N' in a dna sequence
+                                                sig.add_sequence(&record.seq(), true).unwrap();
+                                            }
+                                        }
+                                    },
+                                    Err(error) => {
+                                        eprintln!("Error while processing record: {:?}", error);
+                                    }
+                                }
+                            }
+                            Some((sigs, sig_params, filename))
+                        },
+                        Err(err) => {
+                            eprintln!("Error creating parser for file {}: {:?}", filename.display(), err);
+                            let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst);
+                            None
+                        }
+                    }
+                },
+                Err(err) => {
+                    eprintln!("Error opening file {}: {:?}", filename.display(), err);
+                    let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst);
+                    None
+                }
+            }
+        })
+        .try_for_each_with(send.clone(), |s: &mut std::sync::Arc<std::sync::mpsc::SyncSender<ZipMessage>>, (sigs, sig_params, filename)| {
+            if let Err(e) = s.send(ZipMessage::SignatureData(sigs, sig_params, filename.clone())) {
+                Err(format!("Unable to send internal data: {:?}", e))
+            } else {
+                Ok(())
+            }
+        });
+
+    // After the parallel work, send the WriteManifest message
+    std::sync::Arc::try_unwrap(send).unwrap().send(ZipMessage::WriteManifest).unwrap();
+
+    // do some cleanup and error handling -
+    if let Err(e) = send_result {
+        eprintln!("Error during parallel processing: {}", e);
+    }
+
+    // join the writer thread
+    if let Err(e) = thrd.join().unwrap_or_else(|e| Err(anyhow!("Thread panicked: {:?}", e))) {
+        eprintln!("Error in sigwriter thread: {:?}", e);
+    }
+
+    // done!
+    let i: usize = processed_fastas.fetch_max(0, atomic::Ordering::SeqCst);
+    eprintln!("DONE. Processed {} fasta files", i);
+
+    let failed_paths = failed_paths.load(atomic::Ordering::SeqCst);
+
+    if failed_paths == i {
+        bail!("Could not load fasta files: no signatures created.");
+    }
+    if failed_paths > 0 {
+        eprintln!("WARNING: {} fasta files failed to load. See error messages above.",
+                  failed_paths);
+    }
+
+    let skipped_paths = skipped_paths.load(atomic::Ordering::SeqCst);
+    if skipped_paths > 0 {
+        eprintln!("WARNING: {} fasta files skipped - no compatible signatures.",
+                  skipped_paths);
+    }
+
+    Ok(())
+}
+
 //
 // The python wrapper functions.
 //
@@ -1529,6 +2051,21 @@ fn do_multisearch(querylist_path: String,
     }
 }

+#[pyfunction]
+fn do_manysketch(filelist: String,
+                 param_str: String,
+                 output: String,
+    ) -> anyhow::Result<u8> {
+    match manysketch(filelist, param_str, output) {
+        Ok(_) => Ok(0),
+        Err(e) => {
+            eprintln!("Error: {e}");
+            Ok(1)
+        }
+    }
+}
+
+
 #[pymodule]
 fn pyo3_branchwater(_py: Python, m: &PyModule) -> PyResult<()> {
     m.add_function(wrap_pyfunction!(do_manysearch, m)?)?;
@@ -1536,6 +2073,7 @@ fn pyo3_branchwater(_py: Python, m: &PyModule) -> PyResult<()> {
     m.add_function(wrap_pyfunction!(do_fastmultigather, m)?)?;
     m.add_function(wrap_pyfunction!(do_index, m)?)?;
     m.add_function(wrap_pyfunction!(do_check, m)?)?;
+    m.add_function(wrap_pyfunction!(do_manysketch, m)?)?;
     m.add_function(wrap_pyfunction!(set_global_thread_pool, m)?)?;
     m.add_function(wrap_pyfunction!(do_multisearch, m)?)?;
     Ok(())
diff --git a/src/python/pyo3_branchwater/__init__.py b/src/python/pyo3_branchwater/__init__.py
index 2e5e0a66..f838fefb 100755
--- a/src/python/pyo3_branchwater/__init__.py
+++ b/src/python/pyo3_branchwater/__init__.py
@@ -256,4 +256,43 @@ def main(self, args):
                                            args.output)
         if status == 0:
             notify(f"...multisearch is done!
results in '{args.output}'") - return status \ No newline at end of file + return status + + +class Branchwater_Manysketch(CommandLinePlugin): + command = 'manysketch' + description = 'massively parallel sketching' + + def __init__(self, p): + super().__init__(p) + p.add_argument('fromfile_csv', help="a csv file containing paths to fasta files. \ + Columns must be: 'name,genome_filename,protein_filename'") + p.add_argument('-o', '--output', required=True, + help='output zip file for the signatures') + p.add_argument('-p', '--param-string', action='append', type=str, default=[], + help='parameter string for sketching (default: k=31,scaled=1000)') + p.add_argument('-c', '--cores', default=0, type=int, + help='number of cores to use (default is all available)') + + def main(self, args): + print_version() + if not args.param_string: + args.param_string = ["k=31,scaled=1000"] + notify(f"params: {args.param_string}") + + # convert to a single string for easier rust handling + args.param_string = "_".join(args.param_string) + # lowercase the param string + args.param_string = args.param_string.lower() + + num_threads = set_thread_pool(args.cores) + + notify(f"sketching all files in '{args.fromfile_csv}' using {num_threads} threads") + + super().main(args) + status = pyo3_branchwater.do_manysketch(args.fromfile_csv, + args.param_string, + args.output) + if status == 0: + notify(f"...manysketch is done! results in '{args.output}'") + return status diff --git a/src/python/tests/test-data/short-protein.fa b/src/python/tests/test-data/short-protein.fa new file mode 100644 index 00000000..2a3c39e7 --- /dev/null +++ b/src/python/tests/test-data/short-protein.fa @@ -0,0 +1,17 @@ +>shortProt +MIAAQLLAYYFTELKDDQVKKIDKYLYAMRLSDETLIDIMTRFRKEMKNGLSRDFNPTAT +VKMLPTFVRSIPDGSEKGDFIALDLGGSSFRILRVQVNHEKNQNVHMESEVYDTPENIVH +GSGSQLFDHVAECLGDFMEKRKIKDKKLPVGFTFSFPCQQSKIDEAILITWTKRFKASGV +EGADVVKLLNKAIKKRGDYDANIVAVVNDTVGTMMTCGYDDQHCEVGLIIGTGTNACYME +ELRHIDLVEGDEGRMCINTEWGAFGDDGSLEDIRTEFDREIDRGSLNPGKQLFEKMVSGM +YLGELVRLILVKMAKEGLLFEGRITPELLTRGKFNTSDVSAIEKNKEGLHNAKEILTRLG +VEPSDDDCVSVQHVCTIVSFRSANLVAATLGAILNRLRDNKGTPRLRTTVGVDGSLYKTH +PQYSRRFHKTLRRLVPDSDVRFLLSESGSGKGAAMVTAVAYRLAEQHRQIEETLAHFHLT +KDMLLEVKKRMRAEMELGLRKQTHNNAVVKMLPSFVRRTPDGTENGDFLALDLGGTNFRV +LLVKIRSGKKRTVEMHNKIYAIPIEIMQGTGEELFDHIVSCISDFLDYMGIKGPRMPLGF +TFSFPCQQTSLDAGILITWTKGFKATDCVGHDVVTLLRDAIKRREEFDLDVVAVVNDTVG +TMMTCAYEEPTCEVGLIVGTGSNACYMEEMKNVEMVEGDQGQMCINMEWGAFGDNGCLDD +IRTHYDRLVDEYSLNAGKQRYEKMISGMYLGEIVRNILIDFTKKGFLFRGQISETLKTRG +IFETKFLSQIESDRLALLQVRAILQQLGLNSTCDDSILVKTVCGVVSRRAAQLCGAGMAA +VVDKIRENRGLDRLNVTVGVDGTLYKLHPHFSRIMHQTVKELSPKCNVSFLLSEDGSGKG +AALITAVGVRLRTEASS diff --git a/src/python/tests/test-data/short.fa b/src/python/tests/test-data/short.fa new file mode 100644 index 00000000..b02775ce --- /dev/null +++ b/src/python/tests/test-data/short.fa @@ -0,0 +1,2 @@ +>shortName 
+TCTGATCTCGGATAAACAAGCGATCCCAGTACATTTGAATGCCCCCGAGTACTACCTTGTGTGAACCCGATGGTAATTTATTTTCTGAGGATTCAGGAGACACCACAACCTGTATGTACCTCCGATGCGCAGAGGCAACTCAGCCTGACTAGCTAGGAATGGCGGGGACGTGTTATATATGACTGATTATATGTGGAACTTAGGTGCCGCGAGCAAGATCTAGACACCCCCAATATTATTTTTCCGACCAACTTACCGGTGTTGCTCCGCCGTATACGTGGACAACTGTGGATACCGAGTCCTTGATAATCCTAATTGTACTCGGGACTTATCACTAGGACAGATGACCTTTGTCTTCTAAATTGGAGATAGTGTAATTTTTATTTAATCAAGATAATTTTGACACTTCTGCAGTGTGCGGACTATACGAGTTAGTGCTCTTTTACATCCGCGTATATCGCAACTTGGCCAATTTACGACGGGGGTGAATCGCGGAATATATCGAATCTCTGTTTAATTCATAAGATCGCACCACTTAATTGTAGACTCCTCGGCTTCGAAACACAGCTAAGGTGACTGTCGAACCCAGAACAGTTTTTAAAAAACGGCAACCTTGCCATAACACCAACATAGCAAACGCAGCCACGTCGTCCGTTCGCCCAAGGGTACATGATGCGTTACCTTAGTGAAGACTAAGCACCGTGTGTCGATAATCATTCTTAAGGTCCTAACATGTTTATACGGCAACCCCTTTGACACACGGCTTCGTTGGTGCGCTCTGGTGACAGGGCGGCCAGAATGCCGCTATCATAAAAACGTCAAAACAACACGGCTAGTTTGGCTGCTAAACAGAGTCTCGTATTCCCCTAATAGTGGCCGACGAACCTTTATTAGGCGAACCACCTGATATAGTAATCGAATCGCGTGCAGAGTGGCTGCAGAGCCTGTTTTCGCCGTGTGCTGTCTCCATGTAACGTCGAAGCCTCCGGACGCGCTCCAA diff --git a/src/python/tests/test-data/short2.fa b/src/python/tests/test-data/short2.fa new file mode 100644 index 00000000..f9da1e02 --- /dev/null +++ b/src/python/tests/test-data/short2.fa @@ -0,0 +1,2 @@ +>tr1 4 +TCTGATCTCGGATAAACAAGCGATCCCAGTACATTTGAATGCCCCCGAGTACTACCTTGTGTGAACCCGATGGTAATTTATTTTCTGAGGATTCAGGAGACACCACAACCTGTATGTACCTCCGATGCGCAGAGGCAACTCAGCCTGACTAGCTAGGAATGGCGGGGACGTGTTATATATGACTGATTATATGTGGAACTTAGGTGCCGCGAGCAAGATCTAGACACCCCCAATATTATTTTTCCGACCAACTTACCGGTGTTGCTCCGCCGTATACGTGGACAACTGTGGATACCGAGTCCTTGATAATCCTAATTGTACTCGGGACTTATCACTAGGACAGATGACCTTTGTCTTCTAAATTGGAGATAGTGTAATTTTTATTTAATCAAGATAATTTTGACACTTCTGCAGTGTGCGGACTATACGAGTTAGTGCTCTTTTACATCCGCGTATATCGCAACTTGGCCAATTTACGACGGGGGTGAATCGCGGAATATATCGAATCTCTGTTTAATTCATAAGATCGCACCACTTAATTGTAGACTCCTCGGCTTCGAAACACAGCTAAGGTGACTGTCGAACCCAGAACAGTTTTTAAAAAACGGCAACCTTGCCATAACACCAACATAGCAAACGCAGCCACGTCGTCCGTTCGCCCAAGGGTACATGATGCGTTACCTTAGTGAAGACTAAGCACCGTGTGTCGATAATCATTCTTAAGGTCCTAACATGTTTTGACACACGGCTTCGTTGGTGCGCTCTGGTGACAGGGCGGCCAGAATGCCGCTATCATAAAAACGTCAAAACAACACGGCTAGTTTGGCTGCTAAACAGAGTCTCGTATTCCCCTAATAGTGGCCGACGAACCTTTATTAGGCGAACCACCTGATATAGTAATCGAATCGCGTGCAGAGTGGCTGCAGAGCCTGTTTTCGCCGTGTGCTGTCTCCATGTAACGTCGAAGCCTCCGGACGCGCTCCAA diff --git a/src/python/tests/test-data/short3.fa b/src/python/tests/test-data/short3.fa new file mode 100644 index 00000000..941a2aac --- /dev/null +++ b/src/python/tests/test-data/short3.fa @@ -0,0 +1,5 @@ +>firstname +TCTGATCTCGGATAAACAAGCGATCCCAGTACATTTGAATGCCCCCGAGTACTACCTTGTGTGAACCCGATGGTAATTTATTTTCTGAGGATTCAGGAGACACCACAACCTGTATGTACCTCCGATGCGCAGAGGCAACTCAGCCTGACTAGCTAGGAATGGCGGGGACGTGTTATATATGACTGATTATATGTGGAACTTAGGTGCCGCGAGCAAGATCTAGACACCCCCAATATTATTTTTCCGACCAACTTACCGGTGTTGCTCCGCCGTATACGTGGACAACTGTGGATACCGAGTCCTTGATAATCCTAATTGTACTCGGGACTTATCACTAGGACAGATGACCTTTGTCTTCTAAATTGGAGATAGTGTAATTTTTATTTAATCAAGATAATTTTGACACTTCTGCAGTGTGCGGACTATACGAGTTAGTGCTCTTTTACATCCGCGTATATCGCAACTTGGCCAATTTACGACGGGGGTGAATCGCGGAATATATCGAATCTCTGTTTAATTCATAAGATCGCACCACTTAATTGTAGACTCCTCGGCTTCGAAACACAGCTAAGGTGACTGTCGAACCCAGAACAGTTTTTAAAAAA +>other +CGGCAACCTTGCCATAACACCAACATAGCAAACGCAGCCACGTCGTCCGTTCGCCCAAGGGTACATGATGCGTTACCTTAGTGAAGACTAAGCACCGTGTGTCGATAATCATTCTTAAGGTCCTAACATGTTTTGACACACGGCTTCGTTGGTGCGCTCTGGTGACAGGGCGGCCAGAATGCCGCTATCATAAAAACGTCAAAACAACACGGCTAGTTTGGCTGCTAAACAGAGTCTCGTATTCCCCTAATAGTGGCCGACGAACCTTTATTAGGCGAACCACCTGATATAGTAATCGAATCGCGTGCAGAGTGGCTGCAGAGCCTGTTTTCGCCGTGTGCTGTCTCCATGTAACGTCGAAGCCTCCGGACGCGCTCCAA + diff --git a/src/python/tests/test_sketch.py b/src/python/tests/test_sketch.py new file mode 100644 index 00000000..aff9933a --- 
/dev/null +++ b/src/python/tests/test_sketch.py @@ -0,0 +1,377 @@ +import os +import pytest +import pandas +import sourmash +from sourmash import index + +from . import sourmash_tst_utils as utils + + +def get_test_data(filename): + thisdir = os.path.dirname(__file__) + return os.path.join(thisdir, 'test-data', filename) + + +def make_file_csv(filename, genome_paths, protein_paths = []): + names = [os.path.basename(x).split('.fa')[0] for x in genome_paths] + # Check if the number of protein paths is less than genome paths + # and fill in the missing paths with "". + if len(protein_paths) < len(genome_paths): + protein_paths.extend(["" for _ in range(len(genome_paths) - len(protein_paths))]) + with open(filename, 'wt') as fp: + fp.write("name,genome_filename,protein_filename\n") + for name, genome_path, protein_path in zip(names, genome_paths, protein_paths): + fp.write("{},{},{}\n".format(name, genome_path, protein_path)) + + +def test_installed(runtmp): + with pytest.raises(utils.SourmashCommandFailed): + runtmp.sourmash('scripts', 'manysketch') + + assert 'usage: manysketch' in runtmp.last_result.err + + +def test_manysketch_simple(runtmp): + fa_csv = runtmp.output('db-fa.txt') + + fa1 = get_test_data('short.fa') + fa2 = get_test_data('short2.fa') + fa3 = get_test_data('short3.fa') + + make_file_csv(fa_csv, [fa1, fa2, fa3]) + + output = runtmp.output('db.zip') + + runtmp.sourmash('scripts', 'manysketch', fa_csv, '-o', output, + '--param-str', "dna,k=31,scaled=1") + + assert os.path.exists(output) + assert not runtmp.last_result.out # stdout should be empty + + idx = sourmash.load_file_as_index(output) + sigs = list(idx.signatures()) + print(sigs) + + assert len(sigs) == 3 + + +def test_manysketch_mult_k(runtmp): + fa_csv = runtmp.output('db-fa.txt') + + fa1 = get_test_data('short.fa') + fa2 = get_test_data('short2.fa') + fa3 = get_test_data('short3.fa') + + make_file_csv(fa_csv, [fa1, fa2, fa3]) + + output = runtmp.output('db.zip') + + runtmp.sourmash('scripts', 'manysketch', fa_csv, '-o', output, + '--param-str', "dna,k=21,k=31,scaled=1") + + assert os.path.exists(output) + assert not runtmp.last_result.out # stdout should be empty + + idx = sourmash.load_file_as_index(output) + sigs = list(idx.signatures()) + print(sigs) + + assert len(sigs) == 6 + + +def test_manysketch_mult_k_2(runtmp): + fa_csv = runtmp.output('db-fa.txt') + + fa1 = get_test_data('short.fa') + fa2 = get_test_data('short2.fa') + fa3 = get_test_data('short3.fa') + + make_file_csv(fa_csv, [fa1, fa2, fa3]) + + output = runtmp.output('db.zip') + + runtmp.sourmash('scripts', 'manysketch', fa_csv, '-o', output, + '--param-str', "dna,k=21,scaled=1", + '--param-str', "dna,k=31,scaled=1", + '--param-str', "dna,k=21,scaled=1") + + assert os.path.exists(output) + assert not runtmp.last_result.out # stdout should be empty + + idx = sourmash.load_file_as_index(output) + sigs = list(idx.signatures()) + print(sigs) + + assert len(sigs) == 6 + + +def test_manysketch_mult_moltype(runtmp): + fa_csv = runtmp.output('db-fa.csv') + + fa1 = get_test_data('short.fa') + fa2 = get_test_data('short2.fa') + fa3 = get_test_data('short3.fa') + protfa1 = get_test_data('short-protein.fa') + + make_file_csv(fa_csv, [fa1, fa2, fa3], [protfa1]) + + output = runtmp.output('db.zip') + + runtmp.sourmash('scripts', 'manysketch', fa_csv, '-o', output, + '--param-str', "dna,k=21,scaled=1", + '--param-str', "protein,k=10,scaled=1") + + assert os.path.exists(output) + assert not runtmp.last_result.out # stdout should be empty + + idx = 
sourmash.load_file_as_index(output) + sigs = list(idx.signatures()) + print(sigs) + + assert len(sigs) == 4 + # check moltypes, etc! + for sig in sigs: + if sig.name == 'short': + if sig.minhash.is_dna: + assert sig.minhash.ksize == 21 + assert sig.minhash.scaled == 1 + assert sig.md5sum() == "1474578c5c46dd09da4c2df29cf86621" + else: + assert sig.minhash.ksize == 10 + assert sig.minhash.scaled == 1 + assert sig.md5sum() == "eb4467d11e0ecd2dbde4193bfc255310" + else: + assert sig.minhash.ksize == 21 + assert sig.minhash.scaled == 1 + assert sig.minhash.is_dna + assert sig.md5sum() in ["4efeebd26644278e36b9553e018a851a","f85747ac4f473c4a71c1740d009f512b"] + + +def test_manysketch_skip_incompatible_fastas(runtmp, capfd): + # provide dna, protein fastas, but only sketch protein (skip protein fastas!) + fa_csv = runtmp.output('db-fa.csv') + + fa1 = get_test_data('short.fa') + fa2 = get_test_data('short2.fa') + fa3 = get_test_data('short3.fa') + protfa1 = get_test_data('short-protein.fa') + + make_file_csv(fa_csv, [fa1, fa2, fa3], [protfa1]) + + output = runtmp.output('db.zip') + + runtmp.sourmash('scripts', 'manysketch', fa_csv, '-o', output, + '--param-str', "protein,k=10,scaled=1") + + assert os.path.exists(output) + assert not runtmp.last_result.out # stdout should be empty + + idx = sourmash.load_file_as_index(output) + sigs = list(idx.signatures()) + print(sigs) + + captured = capfd.readouterr() + print(captured.err) + + assert len(sigs) == 1 + # check moltypes, etc! + for sig in sigs: + assert sig.minhash.ksize == 10 + assert sig.minhash.scaled == 1 + assert sig.md5sum() == "eb4467d11e0ecd2dbde4193bfc255310" + assert 'WARNING: 3 fasta files skipped - no compatible signatures.' in captured.err + + +def test_manysketch_missing_fa_csv(runtmp, capfd): + # test missing fa_csv file + fa_csv = runtmp.output('fa_csv.txt') + output = runtmp.output('out.zip') + # make_file_list(fa_csv, []) # don't make fa_csv file + + with pytest.raises(utils.SourmashCommandFailed): + runtmp.sourmash('scripts', 'manysketch', fa_csv, + '-o', output) + + captured = capfd.readouterr() + print(captured.err) + assert "Could not load fromfile csv" in captured.err + + +def test_manysketch_bad_fa_csv(runtmp, capfd): + # siglist instead of fastalist + siglist = runtmp.output('db-sigs.txt') + + sig2 = get_test_data('2.fa.sig.gz') + sig47 = get_test_data('47.fa.sig.gz') + sig63 = get_test_data('63.fa.sig.gz') + + make_file_csv(siglist, [sig2, sig47, sig63]) + + output = runtmp.output('db.zip') + + with pytest.raises(utils.SourmashCommandFailed): + runtmp.sourmash('scripts', 'manysketch', siglist, '-o', output) + + captured = capfd.readouterr() + print(captured.err) + assert "Could not load fasta files: no signatures created." 
in captured.err + + +def test_manysketch_bad_fa_csv_2(runtmp, capfd): + # test sketch with fasta provided instead of fa_csv + output = runtmp.output('out.zip') + fa1 = get_test_data('short.fa') + print(fa1) + + with pytest.raises(utils.SourmashCommandFailed): + runtmp.sourmash('scripts', 'manysketch', fa1, + '-o', output) + + captured = capfd.readouterr() + print(captured.err) + assert "Invalid header" in captured.err + assert "Could not load fromfile csv" in captured.err + + +def test_manysketch_bad_fa_csv_3(runtmp, capfd): + # test sketch with improperly formatted fa_csv + fa_csv = runtmp.output('db-fa.csv') + + fa1 = get_test_data('short.fa') + fa2 = get_test_data('short2.fa') + fa3 = get_test_data('short3.fa') + protfa1 = get_test_data('short-protein.fa') + + # make file csv but don't fill empty protein rows with ,"" + make_file_csv(fa_csv, [fa1, fa2, fa3], [protfa1]) + g_fa = [fa1, fa2, fa3] + p_fa = [protfa1] + with open(fa_csv, 'wt') as fp: + fp.write("name,genome_filename,protein_filename\n") + for i, g in enumerate(g_fa): + name = os.path.basename(g).split('.fa')[0] + if i < len(p_fa): + p = p_fa[i] + fp.write("{},{},{}\n".format(name, g, p)) + else: + fp.write("{},{}\n".format(name, g)) # missing prot path, no trailing comma + + output = runtmp.output('db.zip') + + with pytest.raises(utils.SourmashCommandFailed): + runtmp.sourmash('scripts', 'manysketch', fa_csv, + '-o', output) + + captured = capfd.readouterr() + print(captured.err) + assert 'found record with 2 fields' in captured.err + assert "Could not load fromfile csv" in captured.err + + +def test_manysketch_empty_fa_csv(runtmp, capfd): + # test empty fa_csv file + fa_csv = runtmp.output('fa.txt') + output = runtmp.output('out.zip') + make_file_csv(fa_csv, []) # empty + + with pytest.raises(utils.SourmashCommandFailed): + runtmp.sourmash('scripts', 'manysketch', fa_csv, + '-o', output) + + captured = capfd.readouterr() + print(captured.err) + assert "Error: No files to load, exiting." in captured.err + + +def test_manysketch_duplicated_rows(runtmp, capfd): + fa_csv = runtmp.output('db-fa.csv') + + fa1 = get_test_data('short.fa') + fa2 = get_test_data('short2.fa') + fa3 = get_test_data('short3.fa') + protfa1 = get_test_data('short-protein.fa') + + make_file_csv(fa_csv, [fa1, fa1, fa1, fa3]) + + output = runtmp.output('db.zip') + + runtmp.sourmash('scripts', 'manysketch', fa_csv, '-o', output, + '--param-str', "dna,k=21,scaled=1", + '--param-str', "protein,k=10,scaled=1") + + assert os.path.exists(output) + assert not runtmp.last_result.out # stdout should be empty + + idx = sourmash.load_file_as_index(output) + sigs = list(idx.signatures()) + print(sigs) + + captured = capfd.readouterr() + print(captured.err) + assert len(sigs) == 2 + assert "DONE. Processed 2 fasta files" in captured.err + + +def test_manysketch_N_in_dna(runtmp): + # make sure we can handle Ns in DNA sequences + fa_csv = runtmp.output('db-fa.txt') + fa1 = runtmp.output('bad.fa') + with open (fa1, 'wt') as fp: + fp.write(">bad\n") + fp.write("ACAGTN\n") + + make_file_csv(fa_csv, [fa1]) + + output = runtmp.output('db.zip') + + runtmp.sourmash('scripts', 'manysketch', fa_csv, '-o', output, + '--param-str', "dna,k=4,scaled=1") + + assert os.path.exists(output) + assert not runtmp.last_result.out # stdout should be empty + + idx = sourmash.load_file_as_index(output) + sigs = list(idx.signatures()) + print(sigs) + + assert len(sigs) == 1 + + +def test_zip_manifest(runtmp, capfd): + # test basic manifest-generating functionality. 
+ fa_csv = runtmp.output('db-fa.txt') + + fa1 = get_test_data('short.fa') + fa2 = get_test_data('short2.fa') + fa3 = get_test_data('short3.fa') + + make_file_csv(fa_csv, [fa1, fa2, fa3]) + output = runtmp.output('db.zip') + + runtmp.sourmash('scripts', 'manysketch', fa_csv, '-o', output, + '--param-str', "dna,k=31,scaled=1") + + loader = sourmash.load_file_as_index(output) + + rows = [] + siglist = [] + for (sig, loc) in loader._signatures_with_internal(): + row = index.CollectionManifest.make_manifest_row(sig, loc) + rows.append(row) + siglist.append(sig) + + manifest = index.CollectionManifest(rows) + + assert len(manifest) == len(rows) + assert len(manifest) == 3 + + md5_list = [ row['md5'] for row in manifest.rows ] + assert '9191284a3a23a913d8d410f3d53ce8f0' in md5_list + assert 'd663bb55b2a0f8782c53c8af89f20fff' in md5_list + assert 'bf752903d635b1eb83c53fe4aae951db' in md5_list + + for sig in siglist: + assert sig in manifest + assert sig.minhash.ksize == 31 + assert sig.minhash.moltype == 'DNA' + assert sig.minhash.scaled == 1
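(Editor's closing illustration, not part of the diff: the Python tests above verify the zip's embedded manifest via sourmash. The same check can be sketched from the Rust side with the `zip` crate this PR already depends on — `fa.zip` is an assumed example path:)

```rust
use std::fs::File;
use std::io::Read;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open a zip produced by `manysketch` and read back the manifest
    // that `sigwriter` wrote as its final step.
    let file = File::open("fa.zip")?;
    let mut archive = zip::ZipArchive::new(file)?;

    let mut manifest = archive.by_name("SOURMASH-MANIFEST.csv")?;
    let mut text = String::new();
    manifest.read_to_string(&mut text)?;

    // sigwriter writes this version line before the CSV header.
    assert!(text.starts_with("# SOURMASH-MANIFEST-VERSION: 1.0"));
    println!("{} manifest lines", text.lines().count());
    Ok(())
}
```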