diff --git a/src/check.rs b/src/check.rs index 7fea2eca..1311318c 100644 --- a/src/check.rs +++ b/src/check.rs @@ -1,19 +1,17 @@ -use std::path::Path; - use crate::utils::is_revindex_database; use sourmash::index::revindex::{RevIndex, RevIndexOps}; -pub fn check>(index: P, quick: bool) -> Result<(), Box> { - if !is_revindex_database(index.as_ref()) { +pub fn check(index: camino::Utf8PathBuf, quick: bool) -> Result<(), Box> { + if !is_revindex_database(&index) { bail!( "'{}' is not a valid RevIndex database", - index.as_ref().display() + index ); } println!("Opening DB"); - let db = RevIndex::open(index.as_ref(), true)?; + let db = RevIndex::open(index, true)?; println!("Starting check"); db.check(quick); diff --git a/src/fastgather.rs b/src/fastgather.rs index 963a6232..2fef7522 100644 --- a/src/fastgather.rs +++ b/src/fastgather.rs @@ -1,50 +1,39 @@ /// fastgather: Run gather with a query against a list of files. use anyhow::Result; -use sourmash::signature::Signature; use sourmash::sketch::Sketch; -use std::path::Path; +use sourmash::signature::Signature; +use sourmash::selection::Selection; +use camino; +use std::collections::BinaryHeap; +use crate::utils::PrefetchResult; use crate::utils::{ - consume_query_by_gather, load_sigpaths_from_zip_or_pathlist, load_sketches_above_threshold, - prepare_query, write_prefetch, ReportType, + consume_query_by_gather, load_sketches_above_threshold, write_prefetch, ReportType, load_collection }; -pub fn fastgather + std::fmt::Debug + std::fmt::Display + Clone>( - query_filename: P, - matchlist_filename: P, +pub fn fastgather( + query_filepath: camino::Utf8PathBuf, + against_filepath: camino::Utf8PathBuf, threshold_bp: usize, ksize: u8, scaled: usize, - template: Sketch, - gather_output: Option

, - prefetch_output: Option

, + selection: &Selection, + gather_output: Option, + prefetch_output: Option, ) -> Result<()> { - let location = query_filename.to_string(); - eprintln!("Loading query from '{}'", location); - let query = { - let sigs = Signature::from_path(query_filename)?; - - prepare_query(&sigs, &template, &location) - }; - // did we find anything matching the desired template? - let query = match query { - Some(query) => query, - None => bail!("No sketch found with scaled={}, k={}", scaled, ksize), - }; - - // build the list of paths to match against. - eprintln!( - "Loading matchlist from '{}'", - matchlist_filename.as_ref().display() - ); - let matchlist_filename = matchlist_filename.as_ref().to_string_lossy().to_string(); - let (matchlist_paths, _temp_dir) = - load_sigpaths_from_zip_or_pathlist(matchlist_filename, &template, ReportType::Against)?; + let query_collection = load_collection(&query_filepath, selection, ReportType::Query)?; - eprintln!("Loaded {} sig paths in matchlist", matchlist_paths.len()); + if query_collection.len() > 1 { + bail!("Found more than one compatible sketch from '{}'. Fastgather requires a single query sketch.", &query_filepath) + } + // build the list of paths to match against. + eprintln!("Loading matchlist from '{}'", against_filepath); + let against_collection = load_collection(&against_filepath, selection, ReportType::Against)?; + eprintln!("Loaded {} sig paths in matchlist", against_collection.len()); + // calculate the minimum number of hashes based on desired threshold let threshold_hashes: u64 = { let x = threshold_bp / scaled; @@ -60,41 +49,131 @@ pub fn fastgather + std::fmt::Debug + std::fmt::Display + Clone>( "using threshold overlap: {} {}", threshold_hashes, threshold_bp ); - - // load a set of sketches, filtering for those with overlaps > threshold - let result = load_sketches_above_threshold( - matchlist_paths, - &template, - &query.minhash, - threshold_hashes, - )?; - let matchlist = result.0; - let skipped_paths = result.1; - let failed_paths = result.2; - - if skipped_paths > 0 { - eprintln!( - "WARNING: skipped {} search paths - no compatible signatures.", - skipped_paths - ); - } - if failed_paths > 0 { - eprintln!( - "WARNING: {} search paths failed to load. See error messages above.", - failed_paths - ); - } - - if matchlist.is_empty() { - eprintln!("No search signatures loaded, exiting."); - return Ok(()); - } - - if prefetch_output.is_some() { - write_prefetch(&query, prefetch_output, &matchlist).ok(); - } - - // run the gather! - consume_query_by_gather(query, matchlist, threshold_hashes, gather_output).ok(); + query_collection.iter().for_each(|(idx, record)| { + // Load query sig + match query_collection.sig_for_dataset(idx) { + Ok(query_sig) => { + let location = query_sig.filename(); + let mut matchlist: BinaryHeap = BinaryHeap::new(); + let mut skipped_paths = 0; + let mut failed_paths = 0; + + for sketch in query_sig.iter() { + // Access query MinHash + if let Sketch::MinHash(query) = sketch { + let result = load_sketches_above_threshold( + against_collection, + &selection, + &query, + threshold_hashes, + ); + + match result { + Ok((loaded_matchlist, skipped, failed)) => { + matchlist.extend(loaded_matchlist); + skipped_paths += skipped; + failed_paths += failed; + } + Err(err) => { + eprintln!("Error loading sketches: {:?}", err); + failed_paths += 1; + } + } + } + } + + if skipped_paths > 0 { + eprintln!( + "WARNING: Skipped {} search paths - no compatible signatures.", + skipped_paths + ); + } + if failed_paths > 0 { + eprintln!( + "WARNING: {} search paths failed to load. See error messages above.", + failed_paths + ); + } + + if matchlist.is_empty() { + eprintln!("No search signatures loaded for '{}', exiting.", location); + return; // Return early if no search signatures loaded + } + + if let Some(prefetch_output) = &prefetch_output { + write_prefetch(&query_sig, Some(prefetch_output.clone()), &matchlist).ok(); + } + + // Run the gather! + if let Some(gather_output) = &gather_output { + if let Err(err) = consume_query_by_gather(query_sig, matchlist, threshold_hashes, Some(gather_output)) { + eprintln!("Error during gather: {:?}", err); + } + } + } + Err(_) => { + eprintln!("WARNING: Could not load query sketch '{}'", record.internal_location()); + } + } + }); Ok(()) } + +// query_collection.iter().for_each(|(idx, record)| { +// // Load query sig +// match query_collection.sig_for_dataset(idx) { +// Ok(query_sig) => { +// let location = query_sig.filename(); +// for sketch in query_sig.iter() { +// // Access query MinHash +// if let Sketch::MinHash(query) = sketch { +// let matchlist: BinaryHeap = sketchlist +// .par_iter() +// .filter_map(|sm| { +// // Call a function to load sketches above threshold +// let result = load_sketches_above_threshold( +// against_collection, +// &selection, +// &query, +// threshold_hashes, +// )?; +// let matchlist = result.0; +// let skipped_paths = result.1; +// let failed_paths = result.2; + +// if skipped_paths > 0 { +// eprintln!( +// "WARNING: skipped {} search paths - no compatible signatures.", +// skipped_paths +// ); +// } +// if failed_paths > 0 { +// eprintln!( +// "WARNING: {} search paths failed to load. See error messages above.", +// failed_paths +// ); +// } + +// if matchlist.is_empty() { +// eprintln!("No search signatures loaded, exiting."); +// return Ok(()); +// } + +// if prefetch_output.is_some() { +// write_prefetch(&query_sig, prefetch_output, &matchlist).ok(); +// } + +// // run the gather! +// consume_query_by_gather(query_sig, matchlist, threshold_hashes, gather_output).ok(); +// }); +// } +// } +// } +// } +// Err(_) => { +// eprintln!("WARNING: Could not load query sketch '{}'", record.internal_location()); +// } +// } +// }); +// Ok(()) +// } diff --git a/src/fastmultigather.rs b/src/fastmultigather.rs index 915b6370..70850a37 100644 --- a/src/fastmultigather.rs +++ b/src/fastmultigather.rs @@ -2,32 +2,33 @@ use anyhow::Result; use rayon::prelude::*; -use sourmash::signature::Signature; +use sourmash::storage::SigStore; +use sourmash::{selection, signature::Signature}; use sourmash::sketch::Sketch; -use std::path::Path; +use sourmash::selection::Selection; use std::sync::atomic; use std::sync::atomic::AtomicUsize; use std::collections::BinaryHeap; +use camino::Utf8PathBuf; + use crate::utils::{ - consume_query_by_gather, load_sigpaths_from_zip_or_pathlist, - load_sketches_from_zip_or_pathlist, prepare_query, write_prefetch, PrefetchResult, ReportType, + consume_query_by_gather, load_collection, load_sigpaths_from_zip_or_pathlist, load_sketches_from_zip_or_pathlist, prepare_query, write_prefetch, PrefetchResult, ReportType }; -pub fn fastmultigather + std::fmt::Debug + Clone>( - query_filenames: P, - matchlist_filename: P, +pub fn fastmultigather( + query_filepath: camino::Utf8PathBuf, + against_filepath: camino::Utf8PathBuf, threshold_bp: usize, scaled: usize, - template: Sketch, + // template: Sketch, + selection: &Selection, ) -> Result<()> { // load the list of query paths - let queryfile_name = query_filenames.as_ref().to_string_lossy().to_string(); - let (querylist_paths, _temp_dir) = - load_sigpaths_from_zip_or_pathlist(&query_filenames, &template, ReportType::Query)?; - println!("Loaded {} sig paths in querylist", querylist_paths.len()); + let query_collection = load_collection(&query_filepath, selection, ReportType::Query)?; + println!("Loaded {} sig paths in querylist", query_collection.len()); let threshold_hashes: u64 = { let x = threshold_bp / scaled; @@ -43,82 +44,77 @@ pub fn fastmultigather + std::fmt::Debug + Clone>( println!("threshold overlap: {} {}", threshold_hashes, threshold_bp); // Load all the against sketches - let sketchlist = - load_sketches_from_zip_or_pathlist(&matchlist_filename, &template, ReportType::Against)?; + let against_collection = load_collection(&against_filepath, selection, ReportType::Against)?; + // load actual signatures + let mut sketchlist: Vec = vec![]; + + for (idx, record) in against_collection.iter() { + if let Ok(sig) = against_collection.sig_for_dataset(idx) { + sketchlist.push(sig); + } else { + eprintln!("Failed to load 'against' record: {}", record.name()); + } + } // Iterate over all queries => do prefetch and gather! let processed_queries = AtomicUsize::new(0); let skipped_paths = AtomicUsize::new(0); let failed_paths = AtomicUsize::new(0); - querylist_paths.par_iter().for_each(|q| { - // increment counter of # of queries - let _i = processed_queries.fetch_add(1, atomic::Ordering::SeqCst); - - // set query_label to the last path element. - let location = q.clone().into_os_string().into_string().unwrap(); - let location = location.split('/').last().unwrap().to_string(); - - let query = match Signature::from_path(dbg!(q)) { - Ok(sigs) => { - let mm = prepare_query(&sigs, &template, &location); - - if mm.is_none() { - if !queryfile_name.ends_with(".zip") { - eprintln!("WARNING: no compatible sketches in path '{}'", q.display()); + query_collection.par_iter().for_each(|(idx, record)| { + // increment counter of # of queries. q: could we instead use the index from par_iter()? + let _i = processed_queries.fetch_add(1, atomic::Ordering::SeqCst); + // Load query sig + match query_collection.sig_for_dataset(idx) { + Ok(query_sig) => { + let location = query_sig.filename(); + for sketch in query_sig.iter() { + // Access query MinHash + if let Sketch::MinHash(query) = sketch { + let matchlist: BinaryHeap = sketchlist + .par_iter() + .filter_map(|sm| { + let mut mm = None; + // Access against MinHash + if let Some(sketch) = sm.sketches().get(0) { + if let Sketch::MinHash(against_sketch) = sketch { + if let Ok(overlap) = against_sketch.count_common(&query, false) { + if overlap >= threshold_hashes { + let result = PrefetchResult { + name: sm.name(), + md5sum: sm.md5sum().clone(), + minhash: against_sketch.clone(), + overlap, + }; + mm = Some(result); + } + } + } + } + mm + }) + .collect(); + if !matchlist.is_empty() { + let prefetch_output = format!("{}.prefetch.csv", location); + let gather_output = format!("{}.gather.csv", location); + + // Save initial list of matches to prefetch output + write_prefetch(&query_sig, Some(prefetch_output), &matchlist).ok(); + + // Now, do the gather! + consume_query_by_gather(query_sig.clone(), matchlist, threshold_hashes, Some(gather_output)).ok(); + } else { + println!("No matches to '{}'", location); } - let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); } - mm - } - Err(err) => { - eprintln!("Sketch loading error: {}", err); - eprintln!( - "WARNING: could not load sketches from path '{}'", - q.display() - ); - let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); - None - } - }; - - if let Some(query) = query { - // filter first set of matches out of sketchlist - let matchlist: BinaryHeap = sketchlist - .par_iter() - .filter_map(|sm| { - let mut mm = None; - - if let Ok(overlap) = sm.minhash.count_common(&query.minhash, false) { - if overlap >= threshold_hashes { - let result = PrefetchResult { - name: sm.name.clone(), - md5sum: sm.md5sum.clone(), - minhash: sm.minhash.clone(), - overlap, - }; - mm = Some(result); - } - } - mm - }) - .collect(); - - if !matchlist.is_empty() { - let prefetch_output = format!("{location}.prefetch.csv"); - let gather_output = format!("{location}.gather.csv"); - - // save initial list of matches to prefetch output - write_prefetch(&query, Some(prefetch_output), &matchlist).ok(); - - // now, do the gather! - consume_query_by_gather(query, matchlist, threshold_hashes, Some(gather_output)) - .ok(); - } else { - println!("No matches to '{}'", location); } } - }); + Err(_) => { + eprintln!("WARNING: no compatible sketches in path '{}'", record.internal_location()); + let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); + } + } +}); println!( "Processed {} queries total.", diff --git a/src/lib.rs b/src/lib.rs index 1d6a227d..18f8e9de 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,11 +1,12 @@ /// Python interface Rust code for sourmash_plugin_branchwater. use pyo3::prelude::*; +use sourmash::selection; #[macro_use] extern crate simple_error; mod utils; -use crate::utils::build_template; +use crate::utils::{build_template, build_selection}; use crate::utils::is_revindex_database; mod check; mod fastgather; @@ -20,6 +21,8 @@ mod pairwise; use sourmash::encodings::HashFunctions; use sourmash::selection::Selection; +use camino::Utf8PathBuf; + #[pyfunction] fn do_manysearch( querylist_path: String, @@ -30,13 +33,19 @@ fn do_manysearch( moltype: String, output_path: Option, ) -> anyhow::Result { + + let queryfile_path: camino::Utf8PathBuf = querylist_path.clone().into(); + let againstfile_path: camino::Utf8PathBuf = siglist_path.clone().into(); + let selection = build_selection(ksize, scaled, &moltype); + // if siglist_path is revindex, run mastiff_manysearch; otherwise run manysearch let template = build_template(ksize, scaled, &moltype); - if is_revindex_database(siglist_path.as_ref()) { + if is_revindex_database(&againstfile_path) { + // if is_revindex_database(siglist_path.as_ref()) { match mastiff_manysearch::mastiff_manysearch( - querylist_path, - siglist_path, - template, + queryfile_path, + againstfile_path, + &selection, threshold, output_path, ) { @@ -74,14 +83,17 @@ fn do_fastgather( output_path_prefetch: Option, output_path_gather: Option, ) -> anyhow::Result { - let template = build_template(ksize, scaled, &moltype); + let queryfile_path: camino::Utf8PathBuf = query_filename.into(); + let againstfile_path: camino::Utf8PathBuf = siglist_path.into(); + let selection = build_selection(ksize, scaled, &moltype); + match fastgather::fastgather( - query_filename, - siglist_path, + queryfile_path, + againstfile_path, threshold_bp, ksize, scaled, - template, + &selection, output_path_prefetch, output_path_gather, ) { @@ -103,26 +115,17 @@ fn do_fastmultigather( moltype: String, output_path: Option, ) -> anyhow::Result { + + let queryfile_path: camino::Utf8PathBuf = query_filenames.into(); + let againstfile_path: camino::Utf8PathBuf = siglist_path.into(); + let selection = build_selection(ksize, scaled, &moltype); + // if a siglist path is a revindex, run mastiff_manygather. If not, run multigather - let template = build_template(ksize, scaled, &moltype); - if is_revindex_database(siglist_path.as_ref()) { - // build selection instead of template - let hash_function = match moltype.as_str() { - "dna" => HashFunctions::Murmur64Dna, - "protein" => HashFunctions::Murmur64Protein, - "dayhoff" => HashFunctions::Murmur64Dayhoff, - "hp" => HashFunctions::Murmur64Hp, - _ => panic!("Unknown molecule type: {}", moltype), - }; - let selection = Selection::builder() - .ksize(ksize.into()) - .scaled(scaled as u32) - .moltype(hash_function) - .build(); + if is_revindex_database(&againstfile_path) { match mastiff_manygather::mastiff_manygather( - query_filenames, - siglist_path, - selection, + queryfile_path, + againstfile_path, + &selection, threshold_bp, output_path, ) { @@ -134,11 +137,11 @@ fn do_fastmultigather( } } else { match fastmultigather::fastmultigather( - query_filenames, - siglist_path, + queryfile_path, + againstfile_path, threshold_bp, scaled, - template, + &selection, ) { Ok(_) => Ok(0), Err(e) => { @@ -176,21 +179,7 @@ fn do_index( save_paths: bool, colors: bool, ) -> anyhow::Result { - let hash_function = match moltype.as_str() { - "dna" => HashFunctions::Murmur64Dna, - "protein" => HashFunctions::Murmur64Protein, - "dayhoff" => HashFunctions::Murmur64Dayhoff, - "hp" => HashFunctions::Murmur64Hp, - _ => panic!("Unknown molecule type: {}", moltype), - }; - let selection = Selection::builder() - .ksize(ksize.into()) - .scaled(scaled as u32) - .moltype(hash_function) - .build(); - // match index::index(siglist, template, output, save_paths, colors) { - // convert siglist to PathBuf - // build template from ksize, scaled + let selection = build_selection(ksize, scaled, &moltype); let location = camino::Utf8PathBuf::from(siglist); let manifest = None; match index::index(location, manifest, selection, output, save_paths, colors) { @@ -204,7 +193,8 @@ fn do_index( #[pyfunction] fn do_check(index: String, quick: bool) -> anyhow::Result { - match check::check(index, quick) { + let idx: camino::Utf8PathBuf = index.into(); + match check::check(idx, quick) { Ok(_) => Ok(0), Err(e) => { eprintln!("Error: {e}"); @@ -223,6 +213,7 @@ fn do_multisearch( moltype: String, output_path: Option, ) -> anyhow::Result { + // let selection = build_selection(ksize, scaled, &moltype); let template = build_template(ksize, scaled, &moltype); match multisearch::multisearch( querylist_path, diff --git a/src/mastiff_manygather.rs b/src/mastiff_manygather.rs index e50eefc3..19da5728 100644 --- a/src/mastiff_manygather.rs +++ b/src/mastiff_manygather.rs @@ -19,26 +19,27 @@ use std::sync::atomic::AtomicUsize; use std::fs::File; use std::io::{BufWriter, Write}; -use crate::utils::{is_revindex_database, load_collection}; //, ReportType}; +use crate::utils::{is_revindex_database, load_collection, ReportType}; + pub fn mastiff_manygather>( - queries_file: String, - index: P, - selection: Selection, + queries_file: camino::Utf8PathBuf, + index: camino::Utf8PathBuf, + selection: &Selection, threshold_bp: usize, output: Option

, ) -> Result<(), Box> { - if !is_revindex_database(index.as_ref()) { + if !is_revindex_database(&index) { bail!( "'{}' is not a valid RevIndex database", - index.as_ref().display() + index ); } // Open database once - let db = RevIndex::open(index.as_ref(), true)?; + let db = RevIndex::open(index, true)?; println!("Loaded DB"); - let query_collection = load_collection(camino::Utf8PathBuf::from(queries_file), &selection)?; + let query_collection = load_collection(&queries_file, selection, ReportType::Query)?; // set up a multi-producer, single-consumer channel. let (send, recv) = std::sync::mpsc::sync_channel(rayon::current_num_threads()); @@ -81,6 +82,7 @@ pub fn mastiff_manygather>( let threshold = threshold_bp / selection.scaled()? as usize; match query_collection.sig_for_dataset(idx) { + // match query_collection.sig_from_record(record) { // to be added in core Ok(query_sig) => { let mut results = vec![]; let mut found_compatible_sketch = false; @@ -117,12 +119,7 @@ pub fn mastiff_manygather>( } } if !found_compatible_sketch { - // if !queryfile_name.ends_with(".zip") { - // eprintln!( - // "WARNING: no compatible sketches in path '{}'", - // filename.display() - // ); - // } + eprintln!("WARNING: no compatible sketches in path '{}'", query_sig.filename()); let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); } diff --git a/src/mastiff_manysearch.rs b/src/mastiff_manysearch.rs index 40065d62..9d4a45b0 100644 --- a/src/mastiff_manysearch.rs +++ b/src/mastiff_manysearch.rs @@ -4,39 +4,42 @@ use rayon::prelude::*; use sourmash::index::revindex::{RevIndex, RevIndexOps}; use sourmash::signature::{Signature, SigsTrait}; use sourmash::sketch::Sketch; +use sourmash::selection::Selection; use std::path::Path; use std::sync::atomic; use std::sync::atomic::AtomicUsize; use crate::utils::{ - csvwriter_thread, is_revindex_database, load_sigpaths_from_zip_or_pathlist, prepare_query, + csvwriter_thread, is_revindex_database, load_collection, prepare_query, ReportType, SearchResult, }; pub fn mastiff_manysearch>( - queries_file: P, - index: P, - template: Sketch, + queries_path: camino::Utf8PathBuf, + index: camino::Utf8PathBuf, + selection: &Selection, minimum_containment: f64, output: Option

, ) -> Result<(), Box> { - if !is_revindex_database(index.as_ref()) { + if !is_revindex_database(&index) { bail!( "'{}' is not a valid RevIndex database", - index.as_ref().display() + index ); } // Open database once - let db = RevIndex::open(index.as_ref(), true)?; + let db = RevIndex::open(index, true)?; println!("Loaded DB"); // Load query paths - let queryfile_name = queries_file.as_ref().to_string_lossy().to_string(); - let (query_paths, _temp_dir) = - load_sigpaths_from_zip_or_pathlist(&queries_file, &template, ReportType::Query)?; + let query_collection = load_collection(&queries_path, selection, ReportType::Query)?; - // if query_paths is empty, exit with error - if query_paths.is_empty() { + // let queryfile_name = queries_file.as_ref().to_string_lossy().to_string(); + // let (query_paths, _temp_dir) = + // load_sigpaths_from_zip_or_pathlist(&queries_file, &template, ReportType::Query)?; + + // if query_paths is empty, exit with error. this should already happen via load_collection, i think? + if query_collection.len() == 0 { bail!("No query signatures loaded, exiting."); } @@ -56,53 +59,49 @@ pub fn mastiff_manysearch>( let skipped_paths = AtomicUsize::new(0); let failed_paths = AtomicUsize::new(0); - let send_result = query_paths + let send_result = query_collection .par_iter() - .filter_map(|filename| { + .filter_map(|(idx, record)| { let i = processed_sigs.fetch_add(1, atomic::Ordering::SeqCst); if i % 1000 == 0 { eprintln!("Processed {} search sigs", i); } let mut results = vec![]; - - // load query signature from path: - match Signature::from_path(filename) { + match query_collection.sig_for_dataset(idx) { Ok(query_sig) => { - let location = filename.display().to_string(); - if let Some(query) = prepare_query(&query_sig, &template, &location) { - let query_size = query.minhash.size() as f64; - // search mastiff db - let counter = db.counter_for_query(&query.minhash); - let matches = - db.matches_from_counter(counter, minimum_containment as usize); + for sketch in query_sig.iter() { + if let Sketch::MinHash(query_mh) = sketch { + // let location = query_sig.filename(); + let query_size = query_mh.size(); + let counter = db.counter_for_query(&query_mh); + let matches = db.matches_from_counter(counter, minimum_containment as usize); // filter the matches for containment - for (path, overlap) in matches { - let containment = overlap as f64 / query_size; - if containment >= minimum_containment { - results.push(SearchResult { - query_name: query.name.clone(), - query_md5: query.md5sum.clone(), - match_name: path.clone(), - containment, - intersect_hashes: overlap, - match_md5: None, - jaccard: None, - max_containment: None, - }); + for (path, overlap) in matches { + let containment = overlap as f64 / query_size as f64; + if containment >= minimum_containment { + results.push(SearchResult { + query_name: query_sig.name(), + query_md5: query_sig.md5sum(), + match_name: path.clone(), + containment, + intersect_hashes: overlap, + match_md5: None, + jaccard: None, + max_containment: None, + }); + } } - } - } else { + + } else { // for reading zips, this is likely not a useful warning and // would show up too often (every sig is stored as individual file). - if !queryfile_name.ends_with(".zip") { eprintln!( - "WARNING: no compatible sketches in path '{}'", - filename.display() + "WARNING: no compatible sketches in path '{}'", query_sig.filename() ); - } let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); + } } if results.is_empty() { None @@ -115,7 +114,7 @@ pub fn mastiff_manysearch>( eprintln!("Sketch loading error: {}", err); eprintln!( "WARNING: could not load sketches from path '{}'", - filename.display() + record.internal_location() ); None } diff --git a/src/python/tests/test_multigather.py b/src/python/tests/test_multigather.py index 646e9309..26c85277 100644 --- a/src/python/tests/test_multigather.py +++ b/src/python/tests/test_multigather.py @@ -183,7 +183,8 @@ def test_missing_querylist(runtmp, capfd, indexed, zip_query): captured = capfd.readouterr() print(captured.err) - assert 'Error: No such file or directory ' in captured.err + # assert 'Error: failed to load query' in captured.err + assert 'Error: No such file or directory' in captured.err @pytest.mark.parametrize('indexed', [False, True]) @@ -239,7 +240,10 @@ def test_bad_query_2(runtmp, capfd, indexed): captured = capfd.readouterr() print(captured.err) - assert 'Error: invalid Zip archive: Could not find central directory end' in captured.err + if not indexed: + assert 'Error: invalid Zip archive: Could not find central directory end' in captured.err + else: + assert "InvalidArchive" in captured.err @pytest.mark.parametrize('indexed', [False, True]) diff --git a/src/utils.rs b/src/utils.rs index 64472ae2..4efe1fd9 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -17,8 +17,7 @@ use std::sync::atomic::AtomicUsize; use std::collections::BinaryHeap; -use anyhow::{anyhow, Result}; - +use anyhow::{anyhow, Result, Context}; use std::cmp::{Ordering, PartialOrd}; use sourmash::signature::{Signature, SigsTrait}; @@ -26,6 +25,8 @@ use sourmash::sketch::minhash::{max_hash_for_scaled, KmerMinHash}; use sourmash::sketch::Sketch; use sourmash::collection::Collection; use sourmash::selection::Selection; +use sourmash::errors::SourmashError; +use sourmash::storage::SigStore; /// Track a name/minhash. @@ -172,9 +173,10 @@ pub fn prefetch( } /// Write list of prefetch matches. -pub fn write_prefetch + std::fmt::Debug + std::fmt::Display + Clone>( - query: &SmallSignature, - prefetch_output: Option

, +// pub fn write_prefetch + std::fmt::Debug + std::fmt::Display + Clone>( +pub fn write_prefetch( + query: &SigStore, + prefetch_output: Option, matchlist: &BinaryHeap, ) -> Result<()> { // Set up a writer for prefetch output @@ -193,7 +195,7 @@ pub fn write_prefetch + std::fmt::Debug + std::fmt::Display + Clo writeln!( &mut writer, "{},\"{}\",{},\"{}\",{},{}", - query.location, query.name, query.md5sum, m.name, m.md5sum, m.overlap + query.filename(), query.name(), query.md5sum(), m.name, m.md5sum, m.overlap ) .ok(); } @@ -464,55 +466,49 @@ pub fn load_sketches( /// those with a minimum overlap. pub fn load_sketches_above_threshold( - sketchlist_paths: Vec, - template: &Sketch, + against_collection: Collection, + selection: &Selection, query: &KmerMinHash, threshold_hashes: u64, ) -> Result<(BinaryHeap, usize, usize)> { let skipped_paths = AtomicUsize::new(0); let failed_paths = AtomicUsize::new(0); - let matchlist: BinaryHeap = sketchlist_paths - .par_iter() - .filter_map(|m| { - let sigs = Signature::from_path(m); - let location = m.display().to_string(); - - match sigs { - Ok(sigs) => { - let mut mm = None; - - if let Some(sm) = prepare_query(&sigs, template, &location) { - let mh = sm.minhash; - if let Ok(overlap) = mh.count_common(query, false) { - if overlap >= threshold_hashes { - let result = PrefetchResult { - name: sm.name, - md5sum: sm.md5sum, - minhash: mh, - overlap, - }; - mm = Some(result); - } + let matchlist: BinaryHeap = against_collection + .par_iter() + .filter_map(|(idx, against_record)| { + let mut mm = None; + // Load against into memory + if let Ok(against_sig) = against_collection.sig_for_dataset(idx) { + if let Some(sketch) = against_sig.sketches().get(0) { + if let Sketch::MinHash(against_mh) = sketch { + if let Ok(overlap) = against_mh.count_common(query, false) { + if overlap >= threshold_hashes { + let result = PrefetchResult { + name: against_sig.name().to_string(), + md5sum: against_mh.md5sum().to_string(), + minhash: against_mh.clone(), + overlap, + }; + mm = Some(result); } - } else { - eprintln!("WARNING: no compatible sketches in path '{}'", m.display()); - let _i = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); } - mm - } - Err(err) => { - eprintln!("Sketch loading error: {}", err); - let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); - eprintln!( - "WARNING: could not load sketches from path '{}'", - m.display() - ); - None + } else { + eprintln!("WARNING: no compatible sketches in path '{}'", against_sig.filename()); + let _i = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); } + } else { + eprintln!("WARNING: no compatible sketches in path '{}'", against_sig.filename()); + let _i = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); } - }) - .collect(); + } else { + // this shouldn't happen here anymore -- likely would happen at load_collection + eprintln!("WARNING: could not load sketches for record '{}'", against_record.internal_location()); + let _i = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); + } + mm + }) + .collect(); let skipped_paths = skipped_paths.load(atomic::Ordering::SeqCst); let failed_paths = failed_paths.load(atomic::Ordering::SeqCst); @@ -689,22 +685,100 @@ pub fn load_sketches_from_zip_or_pathlist>( } pub fn load_collection( - sigpath: camino::Utf8PathBuf, + sigpath: &camino::Utf8PathBuf, selection: &Selection, + report_type: ReportType, ) -> Result { + if !sigpath.exists() { + bail!("No such file or directory: '{}'", sigpath); + } let collection = if sigpath.extension().map_or(false, |ext| ext == "zip") { - Collection::from_zipfile(&sigpath)? + match Collection::from_zipfile(&sigpath) { + Ok(collection) => collection, + Err(_) => { + bail!("failed to load {} zipfile: '{}'", report_type, sigpath); + } + } } else { - let sig_paths: Vec<_> = load_sketchlist_filenames_camino(&sigpath) - .unwrap_or_else(|_| panic!("Error loading siglist")) - .into_iter() - .collect(); - Collection::from_paths(&sig_paths)? + let sig_paths = load_sketchlist_filenames_camino(&sigpath)?; + match Collection::from_paths(&sig_paths) { + Ok(collection) => collection, + Err(_) => { + bail!("failed to load {} signature paths: '{}'", report_type, sigpath); + } + } }; - // return collection records that match selection - Ok(collection.select(&selection)?) + + let n_total = collection.len(); + let selected = collection.select(&selection)?; + let n_skipped = n_total - selected.len(); + let n_failed = 0; // TODO: can we get list / number of failed paths from core??? + report_on_collection_loading(&selected, n_skipped, n_failed, report_type)?; + Ok(selected) +} + +pub fn report_on_collection_loading( + collection: &Collection, + skipped_paths: usize, + failed_paths: usize, + report_type: ReportType, +) -> Result<()> { + if failed_paths > 0 { + eprintln!( + "WARNING: {} {} paths failed to load. See error messages above.", + failed_paths, report_type + ); + } + if skipped_paths > 0 { + eprintln!( + "WARNING: skipped {} {} paths - no compatible signatures.", + skipped_paths, report_type + ); + } + + // Validate sketches + if collection.is_empty() { + bail!("No {} signatures loaded, exiting.", report_type); + } + eprintln!("Loaded {} {} signature(s)", collection.len(), report_type); + Ok(()) } +pub fn load_single_sig_from_collection( + query_collection: &Collection, // Replace with the actual type + selection: &Selection, +) -> Result { + let scaled = selection.scaled().unwrap(); + let ksize = selection.ksize().unwrap(); + + match query_collection.sig_for_dataset(0) { + Ok(sig) => Ok(sig), + Err(_) => Err(anyhow::anyhow!("No sketch found with scaled={}, k={}", scaled, ksize)), + } +} + +// pub fn load_single_sketch_from_sig<'a>(sig: &'a SigStore, selection: &'a Selection) -> Result<&'a KmerMinHash> { +// let sketch = sig.sketches().get(0).ok_or_else(|| { +// anyhow::anyhow!("No sketch found with scaled={}, k={}", selection.scaled().unwrap_or_default(), selection.ksize().unwrap_or_default()) +// })?; + +// if let Sketch::MinHash(mh) = sketch { +// Ok(mh) +// } else { +// Err(anyhow::anyhow!("No sketch found with scaled={}, k={}", selection.scaled().unwrap_or_default(), selection.ksize().unwrap_or_default())) +// } +// } + +// pub fn load_single_sig_and_sketch<'a>( +// query_collection: &'a Collection, +// selection: &'a Selection, +// ) -> Result<(SigStore, &'a KmerMinHash)> { +// let sig = load_single_sig_from_collection(query_collection, selection)?; +// let sketch = load_single_sketch_from_sig(&sig, selection)?; +// Ok((sig, sketch)) +// } + + /// Uses the output of sketch loading functions to report the /// total number of sketches loaded, as well as the number of files, /// if any, that failed to load or contained no compatible sketches. @@ -758,7 +832,7 @@ pub fn report_on_sketch_loading( /// removing matches in 'matchlist' from 'query'. pub fn consume_query_by_gather + std::fmt::Debug + std::fmt::Display + Clone>( - query: SmallSignature, + query: SigStore, matchlist: BinaryHeap, threshold_hashes: u64, gather_output: Option

, @@ -778,17 +852,25 @@ pub fn consume_query_by_gather + std::fmt::Debug + std::fmt::Disp let mut matching_sketches = matchlist; let mut rank = 0; - let mut last_hashes = query.minhash.size(); + let mut last_hashes = query.size(); let mut last_matches = matching_sketches.len(); - let location = query.location; - let mut query_mh = query.minhash; + // let location = query.location; + let location = query.filename(); + // let mut query_mh = query.minhash; + + let sketches = query.sketches(); + let orig_query_mh = match sketches.get(0) { + Some(Sketch::MinHash(mh)) => Ok(mh), + _ => Err(anyhow::anyhow!("No MinHash found")), + }?; + let mut query_mh = orig_query_mh.clone(); eprintln!( "{} iter {}: start: query hashes={} matches={}", location, rank, - query_mh.size(), + query.size(), matching_sketches.len() ); @@ -803,8 +885,8 @@ pub fn consume_query_by_gather + std::fmt::Debug + std::fmt::Disp "{},{},\"{}\",{},\"{}\",{},{}", location, rank, - query.name, - query.md5sum, + query.name(), + query.md5sum(), best_element.name, best_element.md5sum, best_element.overlap @@ -855,7 +937,23 @@ pub fn build_template(ksize: u8, scaled: usize, moltype: &str) -> Sketch { Sketch::MinHash(template_mh) } -pub fn is_revindex_database(path: &Path) -> bool { +pub fn build_selection(ksize: u8, scaled: usize, moltype: &str) -> Selection { + let hash_function = match moltype { + "dna" => HashFunctions::Murmur64Dna, + "protein" => HashFunctions::Murmur64Protein, + "dayhoff" => HashFunctions::Murmur64Dayhoff, + "hp" => HashFunctions::Murmur64Hp, + _ => panic!("Unknown molecule type: {}", moltype), + }; + + Selection::builder() + .ksize(ksize.into()) + .scaled(scaled as u32) + .moltype(hash_function) + .build() +} + +pub fn is_revindex_database(path: &camino::Utf8PathBuf) -> bool { // quick file check for Revindex database: // is path a directory that contains a file named 'CURRENT'? if path.is_dir() {