diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 2ff10c85..32e51409 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -18,6 +18,12 @@ jobs: override: true components: rustfmt, clippy + - name: Run cargo fmt + uses: actions-rs/cargo@v1 + with: + command: fmt + args: --all -- --check + - name: cache rust uses: Swatinem/rust-cache@v2 diff --git a/src/check.rs b/src/check.rs index 301c1279..3b6484ee 100644 --- a/src/check.rs +++ b/src/check.rs @@ -4,11 +4,12 @@ use crate::utils::is_revindex_database; use sourmash::index::revindex::RevIndex; - pub fn check>(index: P, quick: bool) -> Result<(), Box> { - if !is_revindex_database(index.as_ref()) { - bail!("'{}' is not a valid RevIndex database", index.as_ref().display()); + bail!( + "'{}' is not a valid RevIndex database", + index.as_ref().display() + ); } println!("Opening DB"); diff --git a/src/fastgather.rs b/src/fastgather.rs index 2ce1d149..43fe0e1b 100644 --- a/src/fastgather.rs +++ b/src/fastgather.rs @@ -1,15 +1,15 @@ /// fastgather: Run gather with a query against a list of files. use anyhow::Result; - use sourmash::signature::Signature; -use std::path::Path; use sourmash::sketch::minhash::{max_hash_for_scaled, KmerMinHash}; use sourmash::sketch::Sketch; +use std::path::Path; -use crate::utils::{prepare_query, write_prefetch, - load_sketches_above_threshold, consume_query_by_gather, - load_sigpaths_from_zip_or_pathlist}; +use crate::utils::{ + consume_query_by_gather, load_sigpaths_from_zip_or_pathlist, load_sketches_above_threshold, + prepare_query, write_prefetch, +}; pub fn fastgather + std::fmt::Debug + std::fmt::Display + Clone>( query_filename: P, @@ -43,7 +43,10 @@ pub fn fastgather + std::fmt::Debug + std::fmt::Display + Clone>( }; // build the list of paths to match against. - eprintln!("Loading matchlist from '{}'", matchlist_filename.as_ref().display()); + eprintln!( + "Loading matchlist from '{}'", + matchlist_filename.as_ref().display() + ); let matchlist_filename = matchlist_filename.as_ref().to_string_lossy().to_string(); let (matchlist_paths, _temp_dir) = load_sigpaths_from_zip_or_pathlist(&matchlist_filename)?; @@ -51,34 +54,43 @@ pub fn fastgather + std::fmt::Debug + std::fmt::Display + Clone>( eprintln!("Loaded {} sig paths in matchlist", matchlist_paths.len()); // calculate the minimum number of hashes based on desired threshold - let threshold_hashes : u64 = { + let threshold_hashes: u64 = { let x = threshold_bp / scaled; if x > 0 { x } else { 1 } - }.try_into()?; + } + .try_into()?; - eprintln!("using threshold overlap: {} {}", - threshold_hashes, threshold_bp); + eprintln!( + "using threshold overlap: {} {}", + threshold_hashes, threshold_bp + ); // load a set of sketches, filtering for those with overlaps > threshold - let result = load_sketches_above_threshold(matchlist_paths, - &template, - &query.minhash, - threshold_hashes)?; + let result = load_sketches_above_threshold( + matchlist_paths, + &template, + &query.minhash, + threshold_hashes, + )?; let matchlist = result.0; let skipped_paths = result.1; let failed_paths = result.2; if skipped_paths > 0 { - eprintln!("WARNING: skipped {} search paths - no compatible signatures.", - skipped_paths); + eprintln!( + "WARNING: skipped {} search paths - no compatible signatures.", + skipped_paths + ); } if failed_paths > 0 { - eprintln!("WARNING: {} search paths failed to load. See error messages above.", - failed_paths); + eprintln!( + "WARNING: {} search paths failed to load. See error messages above.", + failed_paths + ); } if matchlist.is_empty() { @@ -91,7 +103,6 @@ pub fn fastgather + std::fmt::Debug + std::fmt::Display + Clone>( } // run the gather! - consume_query_by_gather(query, matchlist, threshold_hashes, - gather_output).ok(); + consume_query_by_gather(query, matchlist, threshold_hashes, gather_output).ok(); Ok(()) -} \ No newline at end of file +} diff --git a/src/fastmultigather.rs b/src/fastmultigather.rs index 9c262ef6..deb9ece0 100644 --- a/src/fastmultigather.rs +++ b/src/fastmultigather.rs @@ -1,20 +1,21 @@ /// fastmultigather: Run gather for multiple queries against a list of files. - use anyhow::Result; use rayon::prelude::*; use sourmash::signature::Signature; -use std::path::Path; use sourmash::sketch::minhash::{max_hash_for_scaled, KmerMinHash}; use sourmash::sketch::Sketch; +use std::path::Path; use std::sync::atomic; use std::sync::atomic::AtomicUsize; use std::collections::BinaryHeap; -use crate::utils::{prepare_query, write_prefetch, PrefetchResult, - consume_query_by_gather, load_sigpaths_from_zip_or_pathlist, load_sketches_from_zip_or_pathlist, ReportType}; +use crate::utils::{ + consume_query_by_gather, load_sigpaths_from_zip_or_pathlist, + load_sketches_from_zip_or_pathlist, prepare_query, write_prefetch, PrefetchResult, ReportType, +}; pub fn fastmultigather + std::fmt::Debug + Clone>( query_filenames: P, @@ -36,109 +37,117 @@ pub fn fastmultigather + std::fmt::Debug + Clone>( let (querylist_paths, _temp_dir) = load_sigpaths_from_zip_or_pathlist(&query_filenames)?; println!("Loaded {} sig paths in querylist", querylist_paths.len()); - let threshold_hashes : u64 = { + let threshold_hashes: u64 = { let x = threshold_bp / scaled; if x > 0 { x } else { 1 } - }.try_into().unwrap(); + } + .try_into() + .unwrap(); println!("threshold overlap: {} {}", threshold_hashes, threshold_bp); // Load all the against sketches - let sketchlist = load_sketches_from_zip_or_pathlist(&matchlist_filename, &template, ReportType::Against)?; + let sketchlist = + load_sketches_from_zip_or_pathlist(&matchlist_filename, &template, ReportType::Against)?; // Iterate over all queries => do prefetch and gather! let processed_queries = AtomicUsize::new(0); let skipped_paths = AtomicUsize::new(0); let failed_paths = AtomicUsize::new(0); - querylist_paths - .par_iter() - .for_each(|q| { - // increment counter of # of queries - let _i = processed_queries.fetch_add(1, atomic::Ordering::SeqCst); + querylist_paths.par_iter().for_each(|q| { + // increment counter of # of queries + let _i = processed_queries.fetch_add(1, atomic::Ordering::SeqCst); - // set query_label to the last path element. - let location = q.clone().into_os_string().into_string().unwrap(); - let location = location.split('/').last().unwrap().to_string(); + // set query_label to the last path element. + let location = q.clone().into_os_string().into_string().unwrap(); + let location = location.split('/').last().unwrap().to_string(); - let query = match Signature::from_path(dbg!(q)) { - Ok(sigs) => { - let mm = prepare_query(&sigs, &template, &location); + let query = match Signature::from_path(dbg!(q)) { + Ok(sigs) => { + let mm = prepare_query(&sigs, &template, &location); - if mm.is_none() { - if !queryfile_name.ends_with(".zip") { - eprintln!("WARNING: no compatible sketches in path '{}'", - q.display()); - } - let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); + if mm.is_none() { + if !queryfile_name.ends_with(".zip") { + eprintln!("WARNING: no compatible sketches in path '{}'", q.display()); } - mm - }, - Err(err) => { - eprintln!("Sketch loading error: {}", err); - eprintln!("WARNING: could not load sketches from path '{}'", - q.display()); - let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); - None + let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); } - }; - - if let Some(query) = query { - // filter first set of matches out of sketchlist - let matchlist: BinaryHeap = sketchlist - .par_iter() - .filter_map(|sm| { - let mut mm = None; - - if let Ok(overlap) = sm.minhash.count_common(&query.minhash, false) { - if overlap >= threshold_hashes { - let result = PrefetchResult { - name: sm.name.clone(), - md5sum: sm.md5sum.clone(), - minhash: sm.minhash.clone(), - overlap, - }; - mm = Some(result); - } + mm + } + Err(err) => { + eprintln!("Sketch loading error: {}", err); + eprintln!( + "WARNING: could not load sketches from path '{}'", + q.display() + ); + let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); + None + } + }; + + if let Some(query) = query { + // filter first set of matches out of sketchlist + let matchlist: BinaryHeap = sketchlist + .par_iter() + .filter_map(|sm| { + let mut mm = None; + + if let Ok(overlap) = sm.minhash.count_common(&query.minhash, false) { + if overlap >= threshold_hashes { + let result = PrefetchResult { + name: sm.name.clone(), + md5sum: sm.md5sum.clone(), + minhash: sm.minhash.clone(), + overlap, + }; + mm = Some(result); } - mm + } + mm }) .collect(); - if !matchlist.is_empty() { - let prefetch_output = format!("{location}.prefetch.csv"); - let gather_output = format!("{location}.gather.csv"); + if !matchlist.is_empty() { + let prefetch_output = format!("{location}.prefetch.csv"); + let gather_output = format!("{location}.gather.csv"); - // save initial list of matches to prefetch output - write_prefetch(&query, Some(prefetch_output), &matchlist).ok(); + // save initial list of matches to prefetch output + write_prefetch(&query, Some(prefetch_output), &matchlist).ok(); - // now, do the gather! - consume_query_by_gather(query, matchlist, threshold_hashes, - Some(gather_output)).ok(); - } else { - println!("No matches to '{}'", location); - } + // now, do the gather! + consume_query_by_gather(query, matchlist, threshold_hashes, Some(gather_output)) + .ok(); + } else { + println!("No matches to '{}'", location); } - }); - + } + }); - println!("Processed {} queries total.", processed_queries.into_inner()); + println!( + "Processed {} queries total.", + processed_queries.into_inner() + ); let skipped_paths = skipped_paths.into_inner(); let failed_paths = failed_paths.into_inner(); if skipped_paths > 0 { - eprintln!("WARNING: skipped {} query paths - no compatible signatures.", - skipped_paths); + eprintln!( + "WARNING: skipped {} query paths - no compatible signatures.", + skipped_paths + ); } if failed_paths > 0 { - eprintln!("WARNING: {} query paths failed to load. See error messages above.", - failed_paths); + eprintln!( + "WARNING: {} query paths failed to load. See error messages above.", + failed_paths + ); } - + Ok(()) -} \ No newline at end of file +} diff --git a/src/index.rs b/src/index.rs index 61445e98..9a009302 100644 --- a/src/index.rs +++ b/src/index.rs @@ -1,6 +1,6 @@ -use std::path::Path; -use sourmash::sketch::Sketch; use sourmash::index::revindex::RevIndex; +use sourmash::sketch::Sketch; +use std::path::Path; use crate::utils::load_sigpaths_from_zip_or_pathlist; @@ -27,4 +27,4 @@ pub fn index>( db.index(index_sigs, &template, 0.0, save_paths); Ok(()) -} \ No newline at end of file +} diff --git a/src/lib.rs b/src/lib.rs index ae069e2d..80006346 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,4 @@ /// Python interface Rust code for pyo3_branchwater. - use pyo3::prelude::*; #[macro_use] @@ -8,29 +7,35 @@ extern crate simple_error; mod utils; use crate::utils::build_template; use crate::utils::is_revindex_database; -mod mastiff_manysearch; -mod mastiff_manygather; -mod manysearch; +mod check; mod fastgather; mod fastmultigather; mod index; -mod check; -mod multisearch; +mod manysearch; mod manysketch; - +mod mastiff_manygather; +mod mastiff_manysearch; +mod multisearch; #[pyfunction] -fn do_manysearch(querylist_path: String, - siglist_path: String, - threshold: f64, - ksize: u8, - scaled: usize, - output_path: Option, +fn do_manysearch( + querylist_path: String, + siglist_path: String, + threshold: f64, + ksize: u8, + scaled: usize, + output_path: Option, ) -> anyhow::Result { // if siglist_path is revindex, run mastiff_manysearch; otherwise run manysearch let template = build_template(ksize, scaled); if is_revindex_database(siglist_path.as_ref()) { - match mastiff_manysearch::mastiff_manysearch(querylist_path, siglist_path, template, threshold, output_path) { + match mastiff_manysearch::mastiff_manysearch( + querylist_path, + siglist_path, + template, + threshold, + output_path, + ) { Ok(_) => Ok(0), Err(e) => { eprintln!("Error: {e}"); @@ -38,8 +43,13 @@ fn do_manysearch(querylist_path: String, } } } else { - match manysearch::manysearch(querylist_path, siglist_path, template, threshold, - output_path) { + match manysearch::manysearch( + querylist_path, + siglist_path, + template, + threshold, + output_path, + ) { Ok(_) => Ok(0), Err(e) => { eprintln!("Error: {e}"); @@ -50,18 +60,24 @@ fn do_manysearch(querylist_path: String, } #[pyfunction] -fn do_fastgather(query_filename: String, - siglist_path: String, - threshold_bp: usize, - ksize: u8, - scaled: usize, - output_path_prefetch: Option, - output_path_gather: Option, +fn do_fastgather( + query_filename: String, + siglist_path: String, + threshold_bp: usize, + ksize: u8, + scaled: usize, + output_path_prefetch: Option, + output_path_gather: Option, ) -> anyhow::Result { - match fastgather::fastgather(query_filename, siglist_path, threshold_bp, - ksize, scaled, - output_path_prefetch, - output_path_gather) { + match fastgather::fastgather( + query_filename, + siglist_path, + threshold_bp, + ksize, + scaled, + output_path_prefetch, + output_path_gather, + ) { Ok(_) => Ok(0), Err(e) => { eprintln!("Error: {e}"); @@ -71,17 +87,24 @@ fn do_fastgather(query_filename: String, } #[pyfunction] -fn do_fastmultigather(query_filenames: String, - siglist_path: String, - threshold_bp: usize, - ksize: u8, - scaled: usize, - output_path: Option, +fn do_fastmultigather( + query_filenames: String, + siglist_path: String, + threshold_bp: usize, + ksize: u8, + scaled: usize, + output_path: Option, ) -> anyhow::Result { // if a siglist path is a revindex, run mastiff_manygather. If not, run multigather if is_revindex_database(siglist_path.as_ref()) { let template = build_template(ksize, scaled); - match mastiff_manygather::mastiff_manygather(query_filenames, siglist_path, template, threshold_bp, output_path) { + match mastiff_manygather::mastiff_manygather( + query_filenames, + siglist_path, + template, + threshold_bp, + output_path, + ) { Ok(_) => Ok(0), Err(e) => { eprintln!("Error: {e}"); @@ -89,7 +112,13 @@ fn do_fastmultigather(query_filenames: String, } } } else { - match fastmultigather::fastmultigather(query_filenames, siglist_path, threshold_bp, ksize, scaled) { + match fastmultigather::fastmultigather( + query_filenames, + siglist_path, + threshold_bp, + ksize, + scaled, + ) { Ok(_) => Ok(0), Err(e) => { eprintln!("Error: {e}"); @@ -101,59 +130,69 @@ fn do_fastmultigather(query_filenames: String, #[pyfunction] fn set_global_thread_pool(num_threads: usize) -> PyResult { - if std::panic::catch_unwind(|| - rayon::ThreadPoolBuilder::new().num_threads(num_threads).build_global() - ).is_ok() { + if std::panic::catch_unwind(|| { + rayon::ThreadPoolBuilder::new() + .num_threads(num_threads) + .build_global() + }) + .is_ok() + { Ok(rayon::current_num_threads()) } else { Err(PyErr::new::( - "Could not set the number of threads. Global thread pool might already be initialized.")) + "Could not set the number of threads. Global thread pool might already be initialized.", + )) } } #[pyfunction] -fn do_index(siglist: String, - ksize: u8, - scaled: usize, - output: String, - save_paths: bool, - colors: bool, -) -> anyhow::Result{ +fn do_index( + siglist: String, + ksize: u8, + scaled: usize, + output: String, + save_paths: bool, + colors: bool, +) -> anyhow::Result { // build template from ksize, scaled let template = build_template(ksize, scaled); - match index::index(siglist, template, output, - save_paths, colors) { + match index::index(siglist, template, output, save_paths, colors) { Ok(_) => Ok(0), Err(e) => { eprintln!("Error: {e}"); Ok(1) - } } } +} #[pyfunction] -fn do_check(index: String, - quick: bool, - ) -> anyhow::Result{ +fn do_check(index: String, quick: bool) -> anyhow::Result { match check::check(index, quick) { Ok(_) => Ok(0), Err(e) => { eprintln!("Error: {e}"); Ok(1) - } } } +} #[pyfunction] -fn do_multisearch(querylist_path: String, - siglist_path: String, - threshold: f64, - ksize: u8, - scaled: usize, - output_path: Option, +fn do_multisearch( + querylist_path: String, + siglist_path: String, + threshold: f64, + ksize: u8, + scaled: usize, + output_path: Option, ) -> anyhow::Result { - match multisearch::multisearch(querylist_path, siglist_path, threshold, ksize, scaled, - output_path) { + match multisearch::multisearch( + querylist_path, + siglist_path, + threshold, + ksize, + scaled, + output_path, + ) { Ok(_) => Ok(0), Err(e) => { eprintln!("Error: {e}"); @@ -163,12 +202,9 @@ fn do_multisearch(querylist_path: String, } #[pyfunction] -fn do_manysketch(filelist: String, - param_str: String, - output: String, - ) -> anyhow::Result{ +fn do_manysketch(filelist: String, param_str: String, output: String) -> anyhow::Result { match manysketch::manysketch(filelist, param_str, output) { - Ok(_) => Ok(0), + Ok(_) => Ok(0), Err(e) => { eprintln!("Error: {e}"); Ok(1) @@ -176,7 +212,6 @@ fn do_manysketch(filelist: String, } } - #[pymodule] fn pyo3_branchwater(_py: Python, m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(do_manysearch, m)?)?; diff --git a/src/manysearch.rs b/src/manysearch.rs index 37e473d3..ae5692b6 100644 --- a/src/manysearch.rs +++ b/src/manysearch.rs @@ -2,22 +2,21 @@ /// the OG MAGsearch, branchwater, etc. /// /// Note: this function loads all _queries_ into memory, and iterates over -/// database once. - +/// database once. use anyhow::Result; use rayon::prelude::*; use sourmash::signature::{Signature, SigsTrait}; -use std::path::Path; use sourmash::sketch::Sketch; +use std::path::Path; use std::sync::atomic; use std::sync::atomic::AtomicUsize; -use crate::utils::{prepare_query, - load_sigpaths_from_zip_or_pathlist, SearchResult, - csvwriter_thread, load_sketches_from_zip_or_pathlist, - ReportType}; +use crate::utils::{ + csvwriter_thread, load_sigpaths_from_zip_or_pathlist, load_sketches_from_zip_or_pathlist, + prepare_query, ReportType, SearchResult, +}; pub fn manysearch>( querylist: P, @@ -26,16 +25,18 @@ pub fn manysearch>( threshold: f64, output: Option

, ) -> Result<()> { - // Read in list of query paths. - eprintln!("Reading list of queries from: '{}'", querylist.as_ref().display()); + eprintln!( + "Reading list of queries from: '{}'", + querylist.as_ref().display() + ); // Load all queries into memory at once. let queries = load_sketches_from_zip_or_pathlist(querylist, &template, ReportType::Query)?; // Load all _paths_, not signatures, into memory. let siglist_name = siglist.as_ref().to_string_lossy().to_string(); - let (search_sigs_paths, _temp_dir) = load_sigpaths_from_zip_or_pathlist(siglist)?; + let (search_sigs_paths, _temp_dir) = load_sigpaths_from_zip_or_pathlist(siglist)?; if search_sigs_paths.is_empty() { bail!("No signatures to search loaded, exiting."); @@ -70,19 +71,21 @@ pub fn manysearch>( let mut results = vec![]; // load search signature from path: - match Signature::from_path(filename) { + match Signature::from_path(filename) { Ok(search_sigs) => { let location = filename.display().to_string(); if let Some(search_sm) = prepare_query(&search_sigs, &template, &location) { // search for matches & save containment. for q in queries.iter() { - let overlap = q.minhash.count_common(&search_sm.minhash, false).unwrap() as f64; + let overlap = + q.minhash.count_common(&search_sm.minhash, false).unwrap() as f64; let query_size = q.minhash.size() as f64; let target_size = search_sm.minhash.size() as f64; let containment_query_in_target = overlap / query_size; let containment_in_target = overlap / target_size; - let max_containment = containment_query_in_target.max(containment_in_target); + let max_containment = + containment_query_in_target.max(containment_in_target); let jaccard = overlap / (target_size + query_size - overlap); if containment_query_in_target > threshold { @@ -102,22 +105,25 @@ pub fn manysearch>( // for reading zips, this is likely not a useful warning and // would show up too often (every sig is stored as individual file). if !siglist_name.ends_with(".zip") { - eprintln!("WARNING: no compatible sketches in path '{}'", - filename.display()); - } + eprintln!( + "WARNING: no compatible sketches in path '{}'", + filename.display() + ); + } let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); } Some(results) - }, + } Err(err) => { let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); eprintln!("Sketch loading error: {}", err); - eprintln!("WARNING: could not load sketches from path '{}'", - filename.display()); + eprintln!( + "WARNING: could not load sketches from path '{}'", + filename.display() + ); None } } - }) .flatten() .try_for_each_with(send, |s, m| s.send(m)); @@ -139,13 +145,17 @@ pub fn manysearch>( let failed_paths = failed_paths.load(atomic::Ordering::SeqCst); if skipped_paths > 0 { - eprintln!("WARNING: skipped {} search paths - no compatible signatures.", - skipped_paths); + eprintln!( + "WARNING: skipped {} search paths - no compatible signatures.", + skipped_paths + ); } if failed_paths > 0 { - eprintln!("WARNING: {} search paths failed to load. See error messages above.", - failed_paths); + eprintln!( + "WARNING: {} search paths failed to load. See error messages above.", + failed_paths + ); } Ok(()) -} \ No newline at end of file +} diff --git a/src/manysketch.rs b/src/manysketch.rs index 944ceb50..1a55b473 100644 --- a/src/manysketch.rs +++ b/src/manysketch.rs @@ -1,18 +1,16 @@ /// manysketch: massively parallel sketching of sequence files. - -use anyhow::{Result, anyhow}; +use anyhow::{anyhow, Result}; use rayon::prelude::*; +use crate::utils::{load_fasta_fromfile, sigwriter, Params, ZipMessage}; +use needletail::parse_fastx_reader; +use sourmash::cmd::ComputeParameters; +use sourmash::signature::Signature; +use std::fs::File; use std::io::Read; use std::path::Path; -use crate::utils::{Params, load_fasta_fromfile, ZipMessage, sigwriter}; -use sourmash::signature::Signature; -use sourmash::cmd::ComputeParameters; use std::sync::atomic; use std::sync::atomic::AtomicUsize; -use needletail::parse_fastx_reader; -use std::fs::File; - fn parse_params_str(params_strs: String) -> Result, String> { let mut unique_params: std::collections::HashSet = std::collections::HashSet::new(); @@ -32,32 +30,36 @@ fn parse_params_str(params_strs: String) -> Result, String> { for item in items.iter() { match *item { _ if item.starts_with("k=") => { - let k_value = item[2..].parse() + let k_value = item[2..] + .parse() .map_err(|_| format!("cannot parse k='{}' as a number", &item[2..]))?; ksizes.push(k_value); - }, + } "abund" => track_abundance = true, "noabund" => track_abundance = false, _ if item.starts_with("num=") => { - num = item[4..].parse() + num = item[4..] + .parse() .map_err(|_| format!("cannot parse num='{}' as a number", &item[4..]))?; - }, + } _ if item.starts_with("scaled=") => { - scaled = item[7..].parse() + scaled = item[7..] + .parse() .map_err(|_| format!("cannot parse scaled='{}' as a number", &item[7..]))?; - }, + } _ if item.starts_with("seed=") => { - seed = item[5..].parse() + seed = item[5..] + .parse() .map_err(|_| format!("cannot parse seed='{}' as a number", &item[5..]))?; - }, + } "protein" => { is_protein = true; is_dna = false; - }, + } "dna" => { is_protein = false; is_dna = true; - }, + } _ => return Err(format!("unknown component '{}' in params string", item)), } } @@ -70,7 +72,7 @@ fn parse_params_str(params_strs: String) -> Result, String> { scaled, seed, is_protein, - is_dna + is_dna, }; unique_params.insert(param); } @@ -79,7 +81,12 @@ fn parse_params_str(params_strs: String) -> Result, String> { Ok(unique_params.into_iter().collect()) } -fn build_siginfo(params: &[Params], moltype: &str, name: &str, filename: &Path) -> (Vec, Vec) { +fn build_siginfo( + params: &[Params], + moltype: &str, + name: &str, + filename: &Path, +) -> (Vec, Vec) { let mut sigs = Vec::new(); let mut params_vec = Vec::new(); @@ -110,11 +117,11 @@ fn build_siginfo(params: &[Params], moltype: &str, name: &str, filename: &Path) // let sig = Signature::from_params(&cp); // cant set name with this let template = sourmash::cmd::build_template(&cp); let sig = Signature::builder() - .hash_function("0.murmur64") - .name(Some(name.to_string())) - .filename(Some(filename.to_string_lossy().into_owned())) - .signatures(template) - .build(); + .hash_function("0.murmur64") + .name(Some(name.to_string())) + .filename(Some(filename.to_string_lossy().into_owned())) + .signatures(template) + .build(); sigs.push(sig); params_vec.push(param); @@ -123,16 +130,14 @@ fn build_siginfo(params: &[Params], moltype: &str, name: &str, filename: &Path) (sigs, params_vec) } - pub fn manysketch + Sync>( filelist: P, param_str: String, output: String, ) -> Result<(), Box> { - let fileinfo = match load_fasta_fromfile(&filelist) { Ok(result) => result, - Err(e) => bail!("Could not load fromfile csv. Underlying error: {}", e) + Err(e) => bail!("Could not load fromfile csv. Underlying error: {}", e), }; // if no files to process, exit with error @@ -142,7 +147,10 @@ pub fn manysketch + Sync>( } // if output doesnt end in zip, bail - if Path::new(&output).extension().map_or(true, |ext| ext != "zip") { + if Path::new(&output) + .extension() + .map_or(true, |ext| ext != "zip") + { bail!("Output must be a zip file."); } @@ -173,73 +181,89 @@ pub fn manysketch + Sync>( let reporting_threshold = std::cmp::max(n_fastas / 20, 1); let send_result = fileinfo - .par_iter() - .filter_map(|(name, filename, moltype)| { - let i = processed_fastas.fetch_add(1, atomic::Ordering::SeqCst); - // progress report at threshold - if i != 0 && i % reporting_threshold == 0 { - let percent_processed = ((i as f64 / n_fastas as f64) * 100.0).round(); - eprintln!("Processed {} fasta files ({}% done)", i, percent_processed); - } + .par_iter() + .filter_map(|(name, filename, moltype)| { + let i = processed_fastas.fetch_add(1, atomic::Ordering::SeqCst); + // progress report at threshold + if i != 0 && i % reporting_threshold == 0 { + let percent_processed = ((i as f64 / n_fastas as f64) * 100.0).round(); + eprintln!("Processed {} fasta files ({}% done)", i, percent_processed); + } - let mut data: Vec = vec![]; - // build sig templates from params - let (mut sigs, sig_params) = build_siginfo(¶ms_vec, moltype, name, filename); - // if no sigs to build, skip - if sigs.is_empty() { - let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); - return None; - } + let mut data: Vec = vec![]; + // build sig templates from params + let (mut sigs, sig_params) = build_siginfo(¶ms_vec, moltype, name, filename); + // if no sigs to build, skip + if sigs.is_empty() { + let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); + return None; + } - // parse fasta file and add to signature - match File::open(filename) { - Ok(mut f) => { - let _ = f.read_to_end(&mut data); - - match parse_fastx_reader(&data[..]) { - Ok(mut parser) => { - while let Some(record_result) = parser.next() { - match record_result { - Ok(record) => { - for sig in &mut sigs { - if moltype == "protein" { - sig.add_protein(&record.seq()).unwrap(); - } else { - sig.add_sequence(&record.seq(), true).unwrap(); // if not force, panics with 'N' in dna sequence + // parse fasta file and add to signature + match File::open(filename) { + Ok(mut f) => { + let _ = f.read_to_end(&mut data); + + match parse_fastx_reader(&data[..]) { + Ok(mut parser) => { + while let Some(record_result) = parser.next() { + match record_result { + Ok(record) => { + for sig in &mut sigs { + if moltype == "protein" { + sig.add_protein(&record.seq()).unwrap(); + } else { + sig.add_sequence(&record.seq(), true).unwrap(); + // if not force, panics with 'N' in dna sequence + } } } - }, - Err(error) => { - eprintln!("Error while processing record: {:?}", error); + Err(error) => { + eprintln!("Error while processing record: {:?}", error); + } } } + Some((sigs, sig_params, filename)) + } + Err(err) => { + eprintln!( + "Error creating parser for file {}: {:?}", + filename.display(), + err + ); + let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); + None } - Some((sigs, sig_params, filename)) - }, - Err(err) => { - eprintln!("Error creating parser for file {}: {:?}", filename.display(), err); - let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); - None } } - }, - Err(err) => { - eprintln!("Error opening file {}: {:?}", filename.display(), err); - let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); - None + Err(err) => { + eprintln!("Error opening file {}: {:?}", filename.display(), err); + let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); + None + } } - } - }) - .try_for_each_with(send.clone(), |s: &mut std::sync::Arc>, (sigs, sig_params, filename)| { - if let Err(e) = s.send(ZipMessage::SignatureData(sigs, sig_params, filename.clone())) { - Err(format!("Unable to send internal data: {:?}", e)) - } else { - Ok(()) - } - }); + }) + .try_for_each_with( + send.clone(), + |s: &mut std::sync::Arc>, + (sigs, sig_params, filename)| { + if let Err(e) = s.send(ZipMessage::SignatureData( + sigs, + sig_params, + filename.clone(), + )) { + Err(format!("Unable to send internal data: {:?}", e)) + } else { + Ok(()) + } + }, + ); // After the parallel work, send the WriteManifest message - std::sync::Arc::try_unwrap(send).unwrap().send(ZipMessage::WriteManifest).unwrap(); + std::sync::Arc::try_unwrap(send) + .unwrap() + .send(ZipMessage::WriteManifest) + .unwrap(); // do some cleanup and error handling - if let Err(e) = send_result { @@ -247,7 +271,10 @@ pub fn manysketch + Sync>( } // join the writer thread - if let Err(e) = thrd.join().unwrap_or_else(|e| Err(anyhow!("Thread panicked: {:?}", e))) { + if let Err(e) = thrd + .join() + .unwrap_or_else(|e| Err(anyhow!("Thread panicked: {:?}", e))) + { eprintln!("Error in sigwriter thread: {:?}", e); } @@ -261,16 +288,19 @@ pub fn manysketch + Sync>( bail!("Could not load fasta files: no signatures created."); } if failed_paths > 0 { - eprintln!("WARNING: {} fasta files failed to load. See error messages above.", - failed_paths); + eprintln!( + "WARNING: {} fasta files failed to load. See error messages above.", + failed_paths + ); } let skipped_paths = skipped_paths.load(atomic::Ordering::SeqCst); if skipped_paths > 0 { - eprintln!("WARNING: {} fasta files skipped - no compatible signatures.", - skipped_paths); + eprintln!( + "WARNING: {} fasta files skipped - no compatible signatures.", + skipped_paths + ); } Ok(()) - } diff --git a/src/mastiff_manygather.rs b/src/mastiff_manygather.rs index 36d1bc11..77421c1c 100644 --- a/src/mastiff_manygather.rs +++ b/src/mastiff_manygather.rs @@ -1,24 +1,20 @@ /// mastiff_manygather: mastiff-indexed version of fastmultigather. - use anyhow::Result; use rayon::prelude::*; use sourmash::signature::Signature; -use std::path::Path; use sourmash::sketch::Sketch; +use std::path::Path; use sourmash::index::revindex::RevIndex; use std::sync::atomic; use std::sync::atomic::AtomicUsize; -use std::io::{BufWriter, Write}; use std::fs::File; +use std::io::{BufWriter, Write}; - -use crate::utils::{prepare_query, is_revindex_database, - load_sigpaths_from_zip_or_pathlist}; - +use crate::utils::{is_revindex_database, load_sigpaths_from_zip_or_pathlist, prepare_query}; pub fn mastiff_manygather>( queries_file: P, @@ -28,7 +24,10 @@ pub fn mastiff_manygather>( output: Option

, ) -> Result<(), Box> { if !is_revindex_database(index.as_ref()) { - bail!("'{}' is not a valid RevIndex database", index.as_ref().display()); + bail!( + "'{}' is not a valid RevIndex database", + index.as_ref().display() + ); } // Open database once let db = RevIndex::open(index.as_ref(), true); @@ -48,10 +47,18 @@ pub fn mastiff_manygather>( }; let thrd = std::thread::spawn(move || { let mut writer = BufWriter::new(out); - writeln!(&mut writer, "query_name,query_md5,match_name,match_md5,f_match_query,intersect_bp").unwrap(); + writeln!( + &mut writer, + "query_name,query_md5,match_name,match_md5,f_match_query,intersect_bp" + ) + .unwrap(); for (query, query_md5, m, m_md5, f_match_query, intersect_bp) in recv.into_iter() { - writeln!(&mut writer, "\"{}\",{},\"{}\",{},{},{}", - query, query_md5, m, m_md5, f_match_query, intersect_bp).ok(); + writeln!( + &mut writer, + "\"{}\",{},\"{}\",{},{},{}", + query, query_md5, m, m_md5, f_match_query, intersect_bp + ) + .ok(); } }); @@ -85,7 +92,8 @@ pub fn mastiff_manygather>( // mastiff gather code println!("Building counter"); - let (counter, query_colors, hash_to_color) = db.prepare_gather_counters(&query.minhash); + let (counter, query_colors, hash_to_color) = + db.prepare_gather_counters(&query.minhash); println!("Counter built"); let matches = db.gather( @@ -100,20 +108,24 @@ pub fn mastiff_manygather>( // extract matches from Result if let Ok(matches) = matches { for match_ in &matches { - results.push((query.name.clone(), - query.md5sum.clone(), - match_.name().clone(), - match_.md5().clone(), - match_.f_match(), // f_match_query - match_.intersect_bp())); // intersect_bp + results.push(( + query.name.clone(), + query.md5sum.clone(), + match_.name().clone(), + match_.md5().clone(), + match_.f_match(), // f_match_query + match_.intersect_bp(), + )); // intersect_bp } } else { eprintln!("Error gathering matches: {:?}", matches.err()); } } else { if !queryfile_name.ends_with(".zip") { - eprintln!("WARNING: no compatible sketches in path '{}'", - filename.display()); + eprintln!( + "WARNING: no compatible sketches in path '{}'", + filename.display() + ); } let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); } @@ -122,12 +134,14 @@ pub fn mastiff_manygather>( } else { Some(results) } - }, + } Err(err) => { let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); eprintln!("Sketch loading error: {}", err); - eprintln!("WARNING: could not load sketches from path '{}'", - filename.display()); + eprintln!( + "WARNING: could not load sketches from path '{}'", + filename.display() + ); None } } @@ -152,13 +166,17 @@ pub fn mastiff_manygather>( let failed_paths = failed_paths.load(atomic::Ordering::SeqCst); if skipped_paths > 0 { - eprintln!("WARNING: skipped {} query paths - no compatible signatures.", - skipped_paths); + eprintln!( + "WARNING: skipped {} query paths - no compatible signatures.", + skipped_paths + ); } if failed_paths > 0 { - eprintln!("WARNING: {} signature paths failed to load. See error messages above.", - failed_paths); + eprintln!( + "WARNING: {} signature paths failed to load. See error messages above.", + failed_paths + ); } Ok(()) -} \ No newline at end of file +} diff --git a/src/mastiff_manysearch.rs b/src/mastiff_manysearch.rs index 55db1b22..4fb7371b 100644 --- a/src/mastiff_manysearch.rs +++ b/src/mastiff_manysearch.rs @@ -1,20 +1,20 @@ /// mastiff_manysearch: mastiff-indexed version of manysearch. - use anyhow::Result; use rayon::prelude::*; use sourmash::signature::{Signature, SigsTrait}; -use std::path::Path; use sourmash::sketch::Sketch; +use std::path::Path; use sourmash::index::revindex::RevIndex; use std::sync::atomic; use std::sync::atomic::AtomicUsize; -use crate::utils::{prepare_query, is_revindex_database, - load_sigpaths_from_zip_or_pathlist, SearchResult, csvwriter_thread}; - +use crate::utils::{ + csvwriter_thread, is_revindex_database, load_sigpaths_from_zip_or_pathlist, prepare_query, + SearchResult, +}; pub fn mastiff_manysearch>( queries_file: P, @@ -22,10 +22,12 @@ pub fn mastiff_manysearch>( template: Sketch, minimum_containment: f64, output: Option

, - ) -> Result<(), Box> { - +) -> Result<(), Box> { if !is_revindex_database(index.as_ref()) { - bail!("'{}' is not a valid RevIndex database", index.as_ref().display()); + bail!( + "'{}' is not a valid RevIndex database", + index.as_ref().display() + ); } // Open database once let db = RevIndex::open(index.as_ref(), true); @@ -74,13 +76,14 @@ pub fn mastiff_manysearch>( let query_size = query.minhash.size() as f64; // search mastiff db let counter = db.counter_for_query(&query.minhash); - let matches = db.matches_from_counter(counter, minimum_containment as usize); + let matches = + db.matches_from_counter(counter, minimum_containment as usize); // filter the matches for containment for (path, overlap) in matches { let containment = overlap as f64 / query_size; if containment >= minimum_containment { - results.push( SearchResult { + results.push(SearchResult { query_name: query.name.clone(), query_md5: query.md5sum.clone(), match_name: path.clone(), @@ -96,8 +99,10 @@ pub fn mastiff_manysearch>( // for reading zips, this is likely not a useful warning and // would show up too often (every sig is stored as individual file). if !queryfile_name.ends_with(".zip") { - eprintln!("WARNING: no compatible sketches in path '{}'", - filename.display()); + eprintln!( + "WARNING: no compatible sketches in path '{}'", + filename.display() + ); } let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); } @@ -106,18 +111,20 @@ pub fn mastiff_manysearch>( } else { Some(results) } - }, + } Err(err) => { let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); eprintln!("Sketch loading error: {}", err); - eprintln!("WARNING: could not load sketches from path '{}'", - filename.display()); + eprintln!( + "WARNING: could not load sketches from path '{}'", + filename.display() + ); None } } - }) - .flatten() - .try_for_each_with(send, |s, results| { + }) + .flatten() + .try_for_each_with(send, |s, results| { if let Err(e) = s.send(results) { Err(format!("Unable to send internal data: {:?}", e)) } else { @@ -133,7 +140,7 @@ pub fn mastiff_manysearch>( // join the writer thread if let Err(e) = thrd.join() { eprintln!("Unable to join internal thread: {:?}", e); - } + } // done! let i: usize = processed_sigs.fetch_max(0, atomic::Ordering::SeqCst); @@ -143,12 +150,16 @@ pub fn mastiff_manysearch>( let failed_paths = failed_paths.load(atomic::Ordering::SeqCst); if skipped_paths > 0 { - eprintln!("WARNING: skipped {} query paths - no compatible signatures.", - skipped_paths); + eprintln!( + "WARNING: skipped {} query paths - no compatible signatures.", + skipped_paths + ); } if failed_paths > 0 { - eprintln!("WARNING: {} query paths failed to load. See error messages above.", - failed_paths); + eprintln!( + "WARNING: {} query paths failed to load. See error messages above.", + failed_paths + ); } Ok(()) diff --git a/src/multisearch.rs b/src/multisearch.rs index 8a88261b..f7a1e444 100644 --- a/src/multisearch.rs +++ b/src/multisearch.rs @@ -1,7 +1,6 @@ +use anyhow::Result; /// multisearch: massively parallel in-memory sketch search. - use rayon::prelude::*; -use anyhow::Result; use std::fs::File; use std::io::{BufWriter, Write}; @@ -10,7 +9,6 @@ use std::path::Path; use std::sync::atomic; use std::sync::atomic::AtomicUsize; - use sourmash::signature::SigsTrait; use sourmash::sketch::minhash::{max_hash_for_scaled, KmerMinHash}; use sourmash::sketch::Sketch; @@ -30,99 +28,105 @@ pub fn multisearch>( scaled: usize, output: Option

, ) -> Result<(), Box> { -// construct a MinHash template for loading. -let max_hash = max_hash_for_scaled(scaled as u64); -let template_mh = KmerMinHash::builder() - .num(0u32) - .ksize(ksize as u32) - .max_hash(max_hash) - .build(); -let template = Sketch::MinHash(template_mh); - -// Load all queries into memory at once. -let queries = load_sketches_from_zip_or_pathlist(&querylist, &template, ReportType::Query)?; - -// Load all against sketches into memory at once. -let against = load_sketches_from_zip_or_pathlist(&againstlist, &template, ReportType::Against)?; - - // set up a multi-producer, single-consumer channel. -let (send, recv) = std::sync::mpsc::sync_channel(rayon::current_num_threads()); - -// & spawn a thread that is dedicated to printing to a buffered output -let out: Box = match output { - Some(path) => Box::new(BufWriter::new(File::create(path).unwrap())), - None => Box::new(std::io::stdout()), -}; -let thrd = std::thread::spawn(move || { - let mut writer = BufWriter::new(out); - writeln!(&mut writer, "query_name,query_md5,match_name,match_md5,containment,max_containment,jaccard,intersect_hashes").unwrap(); - for (query, query_md5, m, m_md5, cont, max_cont, jaccard, overlap) in recv.into_iter() { - writeln!(&mut writer, "\"{}\",{},\"{}\",{},{},{},{},{}", - query, query_md5, m, m_md5, cont, max_cont, jaccard, overlap).ok(); - } -}); - -// -// Main loop: iterate (in parallel) over all search signature paths, -// loading them individually and searching them. Stuff results into -// the writer thread above. -// - -let processed_cmp = AtomicUsize::new(0); - -let send = against - .par_iter() - .filter_map(|target| { - let mut results = vec![]; - - // search for matches & save containment. - for q in queries.iter() { - let i = processed_cmp.fetch_add(1, atomic::Ordering::SeqCst); - if i % 100000 == 0 { - eprintln!("Processed {} comparisons", i); + // construct a MinHash template for loading. + let max_hash = max_hash_for_scaled(scaled as u64); + let template_mh = KmerMinHash::builder() + .num(0u32) + .ksize(ksize as u32) + .max_hash(max_hash) + .build(); + let template = Sketch::MinHash(template_mh); + + // Load all queries into memory at once. + let queries = load_sketches_from_zip_or_pathlist(&querylist, &template, ReportType::Query)?; + + // Load all against sketches into memory at once. + let against = load_sketches_from_zip_or_pathlist(&againstlist, &template, ReportType::Against)?; + + // set up a multi-producer, single-consumer channel. + let (send, recv) = std::sync::mpsc::sync_channel(rayon::current_num_threads()); + + // & spawn a thread that is dedicated to printing to a buffered output + let out: Box = match output { + Some(path) => Box::new(BufWriter::new(File::create(path).unwrap())), + None => Box::new(std::io::stdout()), + }; + let thrd = std::thread::spawn(move || { + let mut writer = BufWriter::new(out); + writeln!(&mut writer, "query_name,query_md5,match_name,match_md5,containment,max_containment,jaccard,intersect_hashes").unwrap(); + for (query, query_md5, m, m_md5, cont, max_cont, jaccard, overlap) in recv.into_iter() { + writeln!( + &mut writer, + "\"{}\",{},\"{}\",{},{},{},{},{}", + query, query_md5, m, m_md5, cont, max_cont, jaccard, overlap + ) + .ok(); + } + }); + + // + // Main loop: iterate (in parallel) over all search signature paths, + // loading them individually and searching them. Stuff results into + // the writer thread above. + // + + let processed_cmp = AtomicUsize::new(0); + + let send = against + .par_iter() + .filter_map(|target| { + let mut results = vec![]; + + // search for matches & save containment. + for q in queries.iter() { + let i = processed_cmp.fetch_add(1, atomic::Ordering::SeqCst); + if i % 100000 == 0 { + eprintln!("Processed {} comparisons", i); + } + + let overlap = q.minhash.count_common(&target.minhash, false).unwrap() as f64; + let query_size = q.minhash.size() as f64; + let target_size = target.minhash.size() as f64; + + let containment_query_in_target = overlap / query_size; + let containment_in_target = overlap / target_size; + let max_containment = containment_query_in_target.max(containment_in_target); + let jaccard = overlap / (target_size + query_size - overlap); + + if containment_query_in_target > threshold { + results.push(( + q.name.clone(), + q.md5sum.clone(), + target.name.clone(), + target.md5sum.clone(), + containment_query_in_target, + max_containment, + jaccard, + overlap, + )) + } } - - let overlap = q.minhash.count_common(&target.minhash, false).unwrap() as f64; - let query_size = q.minhash.size() as f64; - let target_size = target.minhash.size() as f64; - - let containment_query_in_target = overlap / query_size; - let containment_in_target = overlap / target_size; - let max_containment = containment_query_in_target.max(containment_in_target); - let jaccard = overlap / (target_size + query_size - overlap); - - if containment_query_in_target > threshold { - results.push((q.name.clone(), - q.md5sum.clone(), - target.name.clone(), - target.md5sum.clone(), - containment_query_in_target, - max_containment, - jaccard, - overlap)) + if results.is_empty() { + None + } else { + Some(results) } - } - if results.is_empty() { - None - } else { - Some(results) - } - }) - .flatten() - .try_for_each_with(send, |s, m| s.send(m)); + }) + .flatten() + .try_for_each_with(send, |s, m| s.send(m)); -// do some cleanup and error handling - -if let Err(e) = send { - eprintln!("Unable to send internal data: {:?}", e); -} + // do some cleanup and error handling - + if let Err(e) = send { + eprintln!("Unable to send internal data: {:?}", e); + } -if let Err(e) = thrd.join() { - eprintln!("Unable to join internal thread: {:?}", e); -} + if let Err(e) = thrd.join() { + eprintln!("Unable to join internal thread: {:?}", e); + } -// done! -let i: usize = processed_cmp.fetch_max(0, atomic::Ordering::SeqCst); -eprintln!("DONE. Processed {} comparisons", i); + // done! + let i: usize = processed_cmp.fetch_max(0, atomic::Ordering::SeqCst); + eprintln!("DONE. Processed {} comparisons", i); -Ok(()) -} \ No newline at end of file + Ok(()) +} diff --git a/src/utils.rs b/src/utils.rs index eb52073d..0e8e26dd 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,5 +1,4 @@ /// Utility functions for pyo3_branchwater. - use rayon::prelude::*; use std::fs::File; @@ -7,23 +6,23 @@ use std::io::Read; use std::io::{BufRead, BufReader, BufWriter, Write}; use std::path::{Path, PathBuf}; -use zip::read::ZipArchive; use tempfile::tempdir; +use zip::read::ZipArchive; use std::sync::atomic; use std::sync::atomic::AtomicUsize; use std::collections::BinaryHeap; -use anyhow::{Result, anyhow}; +use anyhow::{anyhow, Result}; -use std::cmp::{PartialOrd, Ordering}; +use std::cmp::{Ordering, PartialOrd}; +use sourmash::prelude::FracMinHashOps; +use sourmash::prelude::MinHashOps; use sourmash::signature::{Signature, SigsTrait}; use sourmash::sketch::minhash::{max_hash_for_scaled, KmerMinHash}; use sourmash::sketch::Sketch; -use sourmash::prelude::MinHashOps; -use sourmash::prelude::FracMinHashOps; // use tempfile::tempdir; /// Track a name/minhash. @@ -100,7 +99,6 @@ pub fn check_compatible_downsample( Ok(()) } - /// Given a vec of search Signatures, each containing one or more sketches, /// and a template Sketch, return a compatible (& now downsampled) /// Sketch from the search Signatures.. @@ -108,9 +106,11 @@ pub fn check_compatible_downsample( /// CTB note: this will return the first acceptable match, I think, ignoring /// all others. - -pub fn prepare_query(search_sigs: &[Signature], template: &Sketch, location: &str) -> Option { - +pub fn prepare_query( + search_sigs: &[Signature], + template: &Sketch, + location: &str, +) -> Option { for search_sig in search_sigs.iter() { // find exact match for template? if let Some(Sketch::MinHash(mh)) = search_sig.select_sketch(template) { @@ -118,7 +118,7 @@ pub fn prepare_query(search_sigs: &[Signature], template: &Sketch, location: &st location: location.to_string().clone(), name: search_sig.name(), md5sum: mh.md5sum(), - minhash: mh.clone() + minhash: mh.clone(), }); } else { // no - try to find one that can be downsampled @@ -159,10 +159,7 @@ pub fn prefetch( let overlap = searchsig.count_common(query_mh, false); if let Ok(overlap) = overlap { if overlap >= threshold_hashes { - let result = PrefetchResult { - overlap, - ..result - }; + let result = PrefetchResult { overlap, ..result }; mm = Some(result); } } @@ -175,7 +172,7 @@ pub fn prefetch( pub fn write_prefetch + std::fmt::Debug + std::fmt::Display + Clone>( query: &SmallSignature, prefetch_output: Option

, - matchlist: &BinaryHeap + matchlist: &BinaryHeap, ) -> Result<()> { // Set up a writer for prefetch output let prefetch_out: Box = match prefetch_output { @@ -183,32 +180,39 @@ pub fn write_prefetch + std::fmt::Debug + std::fmt::Display + Clo None => Box::new(std::io::stdout()), }; let mut writer = BufWriter::new(prefetch_out); - writeln!(&mut writer, "query_filename,query_name,query_md5,match_name,match_md5,intersect_bp").ok(); + writeln!( + &mut writer, + "query_filename,query_name,query_md5,match_name,match_md5,intersect_bp" + ) + .ok(); for m in matchlist.iter() { - writeln!(&mut writer, "{},\"{}\",{},\"{}\",{},{}", query.location, - query.name, query.md5sum, - m.name, m.md5sum, m.overlap).ok(); + writeln!( + &mut writer, + "{},\"{}\",{},\"{}\",{},{}", + query.location, query.name, query.md5sum, m.name, m.md5sum, m.overlap + ) + .ok(); } Ok(()) } /// Load a list of filenames from a file. Exits on bad lines. -pub fn load_sketchlist_filenames>(sketchlist_filename: &P) -> - Result> -{ +pub fn load_sketchlist_filenames>(sketchlist_filename: &P) -> Result> { let sketchlist_file = BufReader::new(File::open(sketchlist_filename)?); - let mut sketchlist_filenames : Vec = Vec::new(); + let mut sketchlist_filenames: Vec = Vec::new(); for line in sketchlist_file.lines() { let line = match line { Ok(v) => v, - Err(_) => return { - let filename = sketchlist_filename.as_ref().display(); - let msg = format!("invalid line in fromfile '{}'", filename); - Err(anyhow!(msg)) - }, + Err(_) => { + return { + let filename = sketchlist_filename.as_ref().display(); + let msg = format!("invalid line in fromfile '{}'", filename); + Err(anyhow!(msg)) + } + } }; if !line.is_empty() { @@ -220,7 +224,6 @@ pub fn load_sketchlist_filenames>(sketchlist_filename: &P) -> Ok(sketchlist_filenames) } - /// Loads signature file paths from a ZIP archive. /// /// This function extracts the contents of a ZIP archive containing @@ -259,7 +262,11 @@ pub fn load_sigpaths_from_zip>( let mut sig = Vec::new(); file.read_to_end(&mut sig)?; - let file_name = Path::new(file.name()).file_name().unwrap().to_str().unwrap(); + let file_name = Path::new(file.name()) + .file_name() + .unwrap() + .to_str() + .unwrap(); if file_name.ends_with(".sig") || file_name.ends_with(".sig.gz") { let mut new_file = File::create(temp_dir.path().join(file_name))?; new_file.write_all(&sig)?; @@ -268,20 +275,30 @@ pub fn load_sigpaths_from_zip>( signature_paths.push(temp_dir.path().join(file_name)); } } - println!("loaded paths for {} signature files from zipfile {}", signature_paths.len(), zip_path.as_ref().display()); + println!( + "loaded paths for {} signature files from zipfile {}", + signature_paths.len(), + zip_path.as_ref().display() + ); Ok((signature_paths, temp_dir)) } -pub fn load_fasta_fromfile>(sketchlist_filename: &P) -> Result> { +pub fn load_fasta_fromfile>( + sketchlist_filename: &P, +) -> Result> { let mut rdr = csv::Reader::from_path(sketchlist_filename)?; // Check for right header let headers = rdr.headers()?; - if headers.len() != 3 || - headers.get(0).unwrap() != "name" || - headers.get(1).unwrap() != "genome_filename" || - headers.get(2).unwrap() != "protein_filename" { - return Err(anyhow!("Invalid header. Expected 'name,genome_filename,protein_filename', but got '{}'", headers.iter().collect::>().join(","))); + if headers.len() != 3 + || headers.get(0).unwrap() != "name" + || headers.get(1).unwrap() != "genome_filename" + || headers.get(2).unwrap() != "protein_filename" + { + return Err(anyhow!( + "Invalid header. Expected 'name,genome_filename,protein_filename', but got '{}'", + headers.iter().collect::>().join(",") + )); } let mut results = Vec::new(); @@ -304,15 +321,26 @@ pub fn load_fasta_fromfile>(sketchlist_filename: &P) -> Result>(sketchlist_filename: &P) -> Result 0 { println!("Warning: {} duplicated rows were skipped.", duplicate_count); } - println!("Loaded {} rows in total ({} genome and {} protein files)", row_count, genome_count, protein_count); + println!( + "Loaded {} rows in total ({} genome and {} protein files)", + row_count, genome_count, protein_count + ); Ok(results) } - /// Load a collection of sketches from a file in parallel. -pub fn load_sketches(sketchlist_paths: Vec, template: &Sketch) -> - Result<(Vec, usize, usize)> -{ +pub fn load_sketches( + sketchlist_paths: Vec, + template: &Sketch, +) -> Result<(Vec, usize, usize)> { let skipped_paths = AtomicUsize::new(0); let failed_paths = AtomicUsize::new(0); - let sketchlist : Vec = sketchlist_paths + let sketchlist: Vec = sketchlist_paths .par_iter() .filter_map(|m| { let filename = m.display().to_string(); @@ -347,7 +378,7 @@ pub fn load_sketches(sketchlist_paths: Vec, template: &Sketch) -> let _i = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); } sm - }, + } Err(err) => { // failed to load from this path - print error & track. eprintln!("Sketch loading error: {}", err); @@ -371,10 +402,8 @@ pub fn load_sketches_above_threshold( sketchlist_paths: Vec, template: &Sketch, query: &KmerMinHash, - threshold_hashes: u64 -) -> - Result<(BinaryHeap, usize, usize)> -{ + threshold_hashes: u64, +) -> Result<(BinaryHeap, usize, usize)> { let skipped_paths = AtomicUsize::new(0); let failed_paths = AtomicUsize::new(0); @@ -388,8 +417,7 @@ pub fn load_sketches_above_threshold( Ok(sigs) => { let mut mm = None; - if let Some(sm) = prepare_query(&sigs, template, - &location) { + if let Some(sm) = prepare_query(&sigs, template, &location) { let mh = sm.minhash; if let Ok(overlap) = mh.count_common(query, false) { if overlap >= threshold_hashes { @@ -403,8 +431,7 @@ pub fn load_sketches_above_threshold( } } } else { - eprintln!("WARNING: no compatible sketches in path '{}'", - m.display()); + eprintln!("WARNING: no compatible sketches in path '{}'", m.display()); let _i = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); } mm @@ -412,8 +439,10 @@ pub fn load_sketches_above_threshold( Err(err) => { eprintln!("Sketch loading error: {}", err); let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); - eprintln!("WARNING: could not load sketches from path '{}'", - m.display()); + eprintln!( + "WARNING: could not load sketches from path '{}'", + m.display() + ); None } } @@ -426,7 +455,6 @@ pub fn load_sketches_above_threshold( Ok((matchlist, skipped_paths, failed_paths)) } - /// Loads all compatible sketches from a ZIP archive at the given path into memory. /// Currently not parallelized; use a different zip crate to enable parallelization. /// @@ -460,13 +488,20 @@ pub fn load_sketches_from_zip>( // loop through, loading signatures for i in 0..zip_archive.len() { let mut file = zip_archive.by_index(i)?; - let file_name = Path::new(file.name()).file_name().unwrap().to_str().unwrap().to_owned(); + let file_name = Path::new(file.name()) + .file_name() + .unwrap() + .to_str() + .unwrap() + .to_owned(); if !file_name.ends_with(".sig") && !file_name.ends_with(".sig.gz") { continue; } if let Ok(sigs) = Signature::from_reader(&mut file) { - if let Some(sm) = prepare_query(&sigs, template, &zip_path.as_ref().display().to_string()) { + if let Some(sm) = + prepare_query(&sigs, template, &zip_path.as_ref().display().to_string()) + { sketchlist.push(sm); } else { // track number of paths that have no matching sigs @@ -483,7 +518,6 @@ pub fn load_sketches_from_zip>( Ok((sketchlist, skipped_paths, failed_paths)) } - /// Control function to read signature FILE PATHS from an input file. /// If a ZIP archive is provided (detected via extension), /// use `load_sigpaths_from_zip`. Otherwise, assume the @@ -503,9 +537,17 @@ pub fn load_sketches_from_zip>( pub fn load_sigpaths_from_zip_or_pathlist>( sketchlist_path: P, ) -> Result<(Vec, Option)> { - eprintln!("Reading list of filepaths from: '{}'", sketchlist_path.as_ref().display()); - - let result = if sketchlist_path.as_ref().extension().map(|ext| ext == "zip").unwrap_or(false) { + eprintln!( + "Reading list of filepaths from: '{}'", + sketchlist_path.as_ref().display() + ); + + let result = if sketchlist_path + .as_ref() + .extension() + .map(|ext| ext == "zip") + .unwrap_or(false) + { let (paths, tempdir) = load_sigpaths_from_zip(&sketchlist_path)?; (paths, Some(tempdir)) } else { @@ -553,15 +595,23 @@ pub fn load_sketches_from_zip_or_pathlist>( template: &Sketch, report_type: ReportType, ) -> Result> { - eprintln!("Reading list of {} paths from: '{}'", report_type, sketchlist_path.as_ref().display()); - - let (sketchlist, skipped_paths, failed_paths) = - if sketchlist_path.as_ref().extension().map(|ext| ext == "zip").unwrap_or(false) { - load_sketches_from_zip(sketchlist_path, template)? - } else { - let sketch_paths = load_sketchlist_filenames(&sketchlist_path)?; - load_sketches(sketch_paths, template)? - }; + eprintln!( + "Reading list of {} paths from: '{}'", + report_type, + sketchlist_path.as_ref().display() + ); + + let (sketchlist, skipped_paths, failed_paths) = if sketchlist_path + .as_ref() + .extension() + .map(|ext| ext == "zip") + .unwrap_or(false) + { + load_sketches_from_zip(sketchlist_path, template)? + } else { + let sketch_paths = load_sketchlist_filenames(&sketchlist_path)?; + load_sketches(sketch_paths, template)? + }; report_on_sketch_loading(&sketchlist, skipped_paths, failed_paths, report_type)?; @@ -596,19 +646,16 @@ pub fn report_on_sketch_loading( failed_paths: usize, report_type: ReportType, ) -> Result<()> { - if failed_paths > 0 { eprintln!( "WARNING: {} {} paths failed to load. See error messages above.", - failed_paths, - report_type + failed_paths, report_type ); } if skipped_paths > 0 { eprintln!( "WARNING: skipped {} {} paths - no compatible signatures.", - skipped_paths, - report_type + skipped_paths, report_type ); } @@ -635,7 +682,11 @@ pub fn consume_query_by_gather + std::fmt::Debug + std::fmt::Disp None => Box::new(std::io::stdout()), }; let mut writer = BufWriter::new(gather_out); - writeln!(&mut writer, "query_filename,rank,query_name,query_md5,match_name,match_md5,intersect_bp").ok(); + writeln!( + &mut writer, + "query_filename,rank,query_name,query_md5,match_name,match_md5,intersect_bp" + ) + .ok(); let mut matching_sketches = matchlist; let mut rank = 0; @@ -646,8 +697,13 @@ pub fn consume_query_by_gather + std::fmt::Debug + std::fmt::Disp let location = query.location; let mut query_mh = query.minhash; - eprintln!("{} iter {}: start: query hashes={} matches={}", location, rank, - query_mh.size(), matching_sketches.len()); + eprintln!( + "{} iter {}: start: query hashes={} matches={}", + location, + rank, + query_mh.size(), + matching_sketches.len() + ); while !matching_sketches.is_empty() { let best_element = matching_sketches.peek().unwrap(); @@ -655,10 +711,18 @@ pub fn consume_query_by_gather + std::fmt::Debug + std::fmt::Disp // remove! query_mh.remove_from(&best_element.minhash)?; - writeln!(&mut writer, "{},{},\"{}\",{},\"{}\",{},{}", location, rank, - query.name, query.md5sum, - best_element.name, best_element.md5sum, - best_element.overlap).ok(); + writeln!( + &mut writer, + "{},{},\"{}\",{},\"{}\",{},{}", + location, + rank, + query.name, + query.md5sum, + best_element.name, + best_element.md5sum, + best_element.overlap + ) + .ok(); // recalculate remaining overlaps between query and all sketches. // note: this is parallelized. @@ -668,16 +732,21 @@ pub fn consume_query_by_gather + std::fmt::Debug + std::fmt::Disp let sub_hashes = last_hashes - query_mh.size(); let sub_matches = last_matches - matching_sketches.len(); - eprintln!("{} iter {}: remaining: query hashes={}(-{}) matches={}(-{})", location, rank, - query_mh.size(), sub_hashes, matching_sketches.len(), sub_matches); + eprintln!( + "{} iter {}: remaining: query hashes={}(-{}) matches={}(-{})", + location, + rank, + query_mh.size(), + sub_hashes, + matching_sketches.len(), + sub_matches + ); last_hashes = query_mh.size(); last_matches = matching_sketches.len(); - } Ok(()) } - pub fn build_template(ksize: u8, scaled: usize) -> Sketch { let max_hash = max_hash_for_scaled(scaled as u64); @@ -689,7 +758,6 @@ pub fn build_template(ksize: u8, scaled: usize) -> Sketch { Sketch::MinHash(template_mh) } - pub fn is_revindex_database(path: &Path) -> bool { // quick file check for Revindex database: // is path a directory that contains a file named 'CURRENT'? @@ -714,14 +782,23 @@ pub struct SearchResult { impl ResultType for SearchResult { fn header_fields() -> Vec<&'static str> { - vec!["query_name", "query_md5", "match_name", "containment", "intersect_hashes", "match_md5", "jaccard", "max_containment"] + vec![ + "query_name", + "query_md5", + "match_name", + "containment", + "intersect_hashes", + "match_md5", + "jaccard", + "max_containment", + ] } fn format_fields(&self) -> Vec { vec![ - format!("\"{}\"", self.query_name), // Wrap query_name with quotes + format!("\"{}\"", self.query_name), // Wrap query_name with quotes self.query_md5.clone(), - format!("\"{}\"", self.match_name), // Wrap match_name with quotes + format!("\"{}\"", self.match_name), // Wrap match_name with quotes self.containment.to_string(), self.intersect_hashes.to_string(), match &self.match_md5 { @@ -735,7 +812,7 @@ impl ResultType for SearchResult { match &self.max_containment { Some(max_containment) => max_containment.to_string(), None => "".to_string(), - } + }, ] } } @@ -763,7 +840,19 @@ pub fn bool_to_python_string(b: bool) -> String { impl ResultType for ManifestRow { fn header_fields() -> Vec<&'static str> { - vec!["internal_location", "md5", "md5short", "ksize", "moltype", "num", "scaled", "n_hashes", "with_abundance", "name", "filename"] + vec![ + "internal_location", + "md5", + "md5short", + "ksize", + "moltype", + "num", + "scaled", + "n_hashes", + "with_abundance", + "name", + "filename", + ] } fn format_fields(&self) -> Vec { @@ -777,13 +866,22 @@ impl ResultType for ManifestRow { self.scaled.to_string(), self.n_hashes.to_string(), bool_to_python_string(self.with_abundance), - format!("\"{}\"", self.name), // Wrap name with quotes + format!("\"{}\"", self.name), // Wrap name with quotes self.filename.clone(), ] } } -pub fn make_manifest_row(sig: &Signature, filename: &Path, internal_location: &str, scaled: u64, num: u32, abund: bool, is_dna: bool, is_protein: bool) -> ManifestRow { +pub fn make_manifest_row( + sig: &Signature, + filename: &Path, + internal_location: &str, + scaled: u64, + num: u32, + abund: bool, + is_dna: bool, + is_protein: bool, +) -> ManifestRow { if is_dna && is_protein { panic!("Both is_dna and is_protein cannot be true at the same time."); } else if !is_dna && !is_protein { @@ -811,9 +909,7 @@ pub fn make_manifest_row(sig: &Signature, filename: &Path, internal_location: &s } } -pub fn open_stdout_or_file>( - output: Option

-) -> Box { +pub fn open_stdout_or_file>(output: Option

) -> Box { // if output is a file, use open_output_file if let Some(path) = output { Box::new(open_output_file(&path)) @@ -822,12 +918,10 @@ pub fn open_stdout_or_file>( } } -pub fn open_output_file>( - output: &P -) -> BufWriter { +pub fn open_output_file>(output: &P) -> BufWriter { let file = File::create(output).unwrap_or_else(|e| { eprintln!("Error creating output file: {:?}", e); - std::process::exit(1); + std::process::exit(1); }); BufWriter::new(file) } @@ -862,7 +956,6 @@ pub enum ZipMessage { WriteManifest, } - pub fn sigwriter + Send + 'static>( recv: std::sync::mpsc::Receiver, output: String, @@ -870,18 +963,20 @@ pub fn sigwriter + Send + 'static>( std::thread::spawn(move || -> Result<()> { let file_writer = open_output_file(&output); - let options = zip::write::FileOptions::default().compression_method(zip::CompressionMethod::Stored); + let options = + zip::write::FileOptions::default().compression_method(zip::CompressionMethod::Stored); let mut zip = zip::ZipWriter::new(file_writer); let mut manifest_rows: Vec = Vec::new(); // keep track of md5sum occurrences to prevent overwriting duplicates - let mut md5sum_occurrences: std::collections::HashMap = std::collections::HashMap::new(); + let mut md5sum_occurrences: std::collections::HashMap = + std::collections::HashMap::new(); while let Ok(message) = recv.recv() { match message { ZipMessage::SignatureData(sigs, params, filename) => { if sigs.len() != params.len() { bail!("Mismatched lengths of signatures and parameters"); - } + } for (sig, param) in sigs.iter().zip(params.iter()) { let md5sum_str = sig.md5sum(); let count = md5sum_occurrences.entry(md5sum_str.clone()).or_insert(0); @@ -892,9 +987,18 @@ pub fn sigwriter + Send + 'static>( format!("signatures/{}.sig.gz", md5sum_str) }; write_signature(sig, &mut zip, options, &sig_filename); - manifest_rows.push(make_manifest_row(sig, &filename, &sig_filename, param.scaled, param.num, param.track_abundance, param.is_dna, param.is_protein)); + manifest_rows.push(make_manifest_row( + sig, + &filename, + &sig_filename, + param.scaled, + param.num, + param.track_abundance, + param.is_dna, + param.is_protein, + )); } - }, + } ZipMessage::WriteManifest => { println!("Writing manifest"); // Start the CSV file inside the zip @@ -910,7 +1014,7 @@ pub fn sigwriter + Send + 'static>( // Write each manifest row for row in &manifest_rows { - let formatted_fields = row.format_fields(); // Assuming you have a format_fields method on ManifestRow + let formatted_fields = row.format_fields(); // Assuming you have a format_fields method on ManifestRow if let Err(e) = writeln!(&mut zip, "{}", formatted_fields.join(",")) { eprintln!("Error writing item: {:?}", e); } @@ -959,7 +1063,6 @@ where }) } - pub fn write_signature( sig: &Signature, zip: &mut zip::ZipWriter>, @@ -976,7 +1079,8 @@ pub fn write_signature( Box::new(&mut buffer), niffler::compression::Format::Gzip, niffler::compression::Level::Nine, - ).unwrap(); + ) + .unwrap(); gz_writer.write_all(&json_bytes).unwrap(); } buffer.into_inner()