From f845ab92697ea04136d2193597d3597fdf0a5a1c Mon Sep 17 00:00:00 2001 From: "C. Titus Brown" Date: Wed, 6 Sep 2023 18:35:38 -0700 Subject: [PATCH] MRG: report load sketch errors (#97) * report load sketch errors * update descriptions of python plugins * print out loading errors --- doc/README.md | 2 +- src/lib.rs | 347 ++++++++++++------------ src/python/pyo3_branchwater/__init__.py | 4 +- 3 files changed, 182 insertions(+), 171 deletions(-) diff --git a/doc/README.md b/doc/README.md index 06a7ec81..04647fab 100644 --- a/doc/README.md +++ b/doc/README.md @@ -28,7 +28,7 @@ find gtdb-reps-rs214-k21/ -name "*.sig.gz" -type f > list.gtdb-reps-rs214-k21.tx ### Running `multisearch` -The `multisearch` command compares one or more query genomes, and one or more subject genomes. It differs from `manysearch` by loading everything into memory. +The `multisearch` command compares one or more query genomes, and one or more subject genomes. It differs from `manysearch` by loading all genomes into memory. `multisearch` takes two file lists as input, and outputs a CSV: ``` diff --git a/src/lib.rs b/src/lib.rs index f310adac..f0ea25db 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -223,49 +223,50 @@ fn manysearch>( let mut results = vec![]; // load search signature from path: - if let Ok(search_sigs) = Signature::from_path(filename) { - let location = filename.display().to_string(); - if let Some(search_sm) = prepare_query(&search_sigs, &template, &location) { - // search for matches & save containment. - for q in queries.iter() { - let overlap = q.minhash.count_common(&search_sm.minhash, false).unwrap() as f64; - let query_size = q.minhash.size() as f64; - let target_size = search_sm.minhash.size() as f64; - - let containment_query_in_target = overlap / query_size; - let containment_in_target = overlap / target_size; - let max_containment = containment_query_in_target.max(containment_in_target); - let jaccard = overlap / (target_size + query_size - overlap); - - if containment_query_in_target > threshold { - results.push(SearchResult { - query_name: q.name.clone(), - query_md5: q.md5sum.clone(), - match_name: search_sm.name.clone(), - containment: containment_query_in_target, - intersect_hashes: overlap as usize, - match_md5: Some(search_sm.md5sum.clone()), - jaccard: Some(jaccard), - max_containment: Some(max_containment), - }); + match Signature::from_path(filename) { + Ok(search_sigs) => { + let location = filename.display().to_string(); + if let Some(search_sm) = prepare_query(&search_sigs, &template, &location) { + // search for matches & save containment. + for q in queries.iter() { + let overlap = q.minhash.count_common(&search_sm.minhash, false).unwrap() as f64; + let query_size = q.minhash.size() as f64; + let target_size = search_sm.minhash.size() as f64; + + let containment_query_in_target = overlap / query_size; + let containment_in_target = overlap / target_size; + let max_containment = containment_query_in_target.max(containment_in_target); + let jaccard = overlap / (target_size + query_size - overlap); + + if containment_query_in_target > threshold { + results.push(SearchResult { + query_name: q.name.clone(), + query_md5: q.md5sum.clone(), + match_name: search_sm.name.clone(), + containment: containment_query_in_target, + intersect_hashes: overlap as usize, + match_md5: Some(search_sm.md5sum.clone()), + jaccard: Some(jaccard), + max_containment: Some(max_containment), + }); + } } + } else { + eprintln!("WARNING: no compatible sketches in path '{}'", + filename.display()); + let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); } - } else { - eprintln!("WARNING: no compatible sketches in path '{}'", + Some(results) + }, + Err(err) => { + let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); + eprintln!("Sketch loading error: {}", err); + eprintln!("WARNING: could not load sketches from path '{}'", filename.display()); - let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); + None } - } else { - let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); - eprintln!("WARNING: could not load sketches from path '{}'", - filename.display()); } - if results.is_empty() { - None - } else { - Some(results) - } }) .flatten() .try_for_each_with(send, |s, m| s.send(m)); @@ -388,23 +389,25 @@ fn load_sketches(sketchlist_paths: Vec, template: &Sketch) -> let sketchlist : Vec = sketchlist_paths .par_iter() .filter_map(|m| { - let mut sm = None; - let filename = m.display().to_string(); - if let Ok(sigs) = Signature::from_path(m) { - sm = prepare_query(&sigs, template, &filename); - if sm.is_none() { - // track number of paths that have no matching sigs - let _i = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); + match Signature::from_path(m) { + Ok(sigs) => { + let sm = prepare_query(&sigs, template, &filename); + if sm.is_none() { + // track number of paths that have no matching sigs + let _i = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); + } + sm + }, + Err(err) => { + // failed to load from this path - print error & track. + eprintln!("Sketch loading error: {}", err); + eprintln!("WARNING: could not load sketches from path '{}'", filename); + let _i = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); + None } - } else { - // failed to load from this path - print error & track. - eprintln!("WARNING: could not load sketches from path '{}'", - filename); - let _i = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); } - sm }) .collect(); @@ -690,62 +693,62 @@ fn fastmultigather + std::fmt::Debug + Clone>( let location = q.clone().into_os_string().into_string().unwrap(); let location = location.split('/').last().unwrap().to_string(); - if let Some(query) = { - // load query from q - let mut mm = None; - if let Ok(sigs) = Signature::from_path(dbg!(q)) { - mm = prepare_query(&sigs, &template, &location); + let query = match Signature::from_path(dbg!(q)) { + Ok(sigs) => { + let mm = prepare_query(&sigs, &template, &location); if mm.is_none() { eprintln!("WARNING: no compatible sketches in path '{}'", - q.display()); + q.display()); let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); } - } else { + mm + }, + Err(err) => { + eprintln!("Sketch loading error: {}", err); eprintln!("WARNING: could not load sketches from path '{}'", - q.display()); + q.display()); let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); + None } - mm - } { + }; - // filter first set of matches out of sketchlist - let matchlist: BinaryHeap = sketchlist - .par_iter() - .filter_map(|sm| { - let mut mm = None; + if let Some(query) = query { + // filter first set of matches out of sketchlist + let matchlist: BinaryHeap = sketchlist + .par_iter() + .filter_map(|sm| { + let mut mm = None; - if let Ok(overlap) = sm.minhash.count_common(&query.minhash, false) { - if overlap >= threshold_hashes { - let result = PrefetchResult { - name: sm.name.clone(), - md5sum: sm.md5sum.clone(), - minhash: sm.minhash.clone(), - overlap, - }; - mm = Some(result); + if let Ok(overlap) = sm.minhash.count_common(&query.minhash, false) { + if overlap >= threshold_hashes { + let result = PrefetchResult { + name: sm.name.clone(), + md5sum: sm.md5sum.clone(), + minhash: sm.minhash.clone(), + overlap, + }; + mm = Some(result); + } } - } - mm + mm }) .collect(); - if !matchlist.is_empty() { - let prefetch_output = format!("{location}.prefetch.csv"); - let gather_output = format!("{location}.gather.csv"); + if !matchlist.is_empty() { + let prefetch_output = format!("{location}.prefetch.csv"); + let gather_output = format!("{location}.gather.csv"); - // save initial list of matches to prefetch output - write_prefetch(&query, Some(prefetch_output), &matchlist).ok(); + // save initial list of matches to prefetch output + write_prefetch(&query, Some(prefetch_output), &matchlist).ok(); - // now, do the gather! - consume_query_by_gather(query, matchlist, threshold_hashes, - Some(gather_output)).ok(); - } else { - println!("No matches to '{}'", location); + // now, do the gather! + consume_query_by_gather(query, matchlist, threshold_hashes, + Some(gather_output)).ok(); + } else { + println!("No matches to '{}'", location); + } } - } else { - println!("ERROR loading signature from '{}'", location); - } }); @@ -1004,45 +1007,50 @@ fn mastiff_manysearch>( let mut results = vec![]; // load query signature from path: - if let Ok(query_sig) = Signature::from_path(filename) { - let location = filename.display().to_string(); - if let Some(query) = prepare_query(&query_sig, &template, &location) { - let query_size = query.minhash.size() as f64; - // search mastiff db - let counter = db.counter_for_query(&query.minhash); - let matches = db.matches_from_counter(counter, minimum_containment as usize); - - // filter the matches for containment - for (path, overlap) in matches { - let containment = overlap as f64 / query_size; - if containment >= minimum_containment { - results.push( SearchResult { - query_name: query.name.clone(), - query_md5: query.md5sum.clone(), - match_name: path.clone(), - containment, - intersect_hashes: overlap, - match_md5: None, - jaccard: None, - max_containment: None, - }); + match Signature::from_path(filename) { + Ok(query_sig) => { + let location = filename.display().to_string(); + if let Some(query) = prepare_query(&query_sig, &template, &location) { + let query_size = query.minhash.size() as f64; + // search mastiff db + let counter = db.counter_for_query(&query.minhash); + let matches = db.matches_from_counter(counter, minimum_containment as usize); + + // filter the matches for containment + for (path, overlap) in matches { + let containment = overlap as f64 / query_size; + if containment >= minimum_containment { + results.push( SearchResult { + query_name: query.name.clone(), + query_md5: query.md5sum.clone(), + match_name: path.clone(), + containment, + intersect_hashes: overlap, + match_md5: None, + jaccard: None, + max_containment: None, + }); + } } + } else { + eprintln!("WARNING: no compatible sketches in path '{}'", + filename.display()); + let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); } - } else { - eprintln!("WARNING: no compatible sketches in path '{}'", + if results.is_empty() { + None + } else { + Some(results) + } + }, + Err(err) => { + let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); + eprintln!("Sketch loading error: {}", err); + eprintln!("WARNING: could not load sketches from path '{}'", filename.display()); - let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); + None } - } else { - let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); - eprintln!("WARNING: could not load sketches from path '{}'", - filename.display()); } - if results.is_empty() { - None - } else { - Some(results) - } }) .flatten() .try_for_each_with(send, |s, results| { @@ -1138,55 +1146,58 @@ fn mastiff_manygather>( let mut results = vec![]; // load query signature from path: - if let Ok(query_sig) = Signature::from_path(filename) { - let location = filename.display().to_string(); - if let Some(query) = prepare_query(&query_sig, &template, &location) { - // let query_size = query.minhash.size() as f64; - let threshold = threshold_bp / query.minhash.scaled() as usize; - - // mastiff gather code - info!("Building counter"); - let (counter, query_colors, hash_to_color) = db.prepare_gather_counters(&query.minhash); - info!("Counter built"); - - let matches = db.gather( - counter, - query_colors, - hash_to_color, - threshold, - &query.minhash, - &template, - ); - - // extract matches from Result - if let Ok(matches) = matches { - for match_ in &matches { - results.push((query.name.clone(), - query.md5sum.clone(), - match_.name().clone(), - match_.md5().clone(), - match_.f_match(), // f_match_query - match_.intersect_bp())); // intersect_bp + match Signature::from_path(filename) { + Ok(query_sig) => { + let location = filename.display().to_string(); + if let Some(query) = prepare_query(&query_sig, &template, &location) { + // let query_size = query.minhash.size() as f64; + let threshold = threshold_bp / query.minhash.scaled() as usize; + + // mastiff gather code + info!("Building counter"); + let (counter, query_colors, hash_to_color) = db.prepare_gather_counters(&query.minhash); + info!("Counter built"); + + let matches = db.gather( + counter, + query_colors, + hash_to_color, + threshold, + &query.minhash, + &template, + ); + + // extract matches from Result + if let Ok(matches) = matches { + for match_ in &matches { + results.push((query.name.clone(), + query.md5sum.clone(), + match_.name().clone(), + match_.md5().clone(), + match_.f_match(), // f_match_query + match_.intersect_bp())); // intersect_bp + } + } else { + eprintln!("Error gathering matches: {:?}", matches.err()); } } else { - eprintln!("Error gathering matches: {:?}", matches.err()); + eprintln!("WARNING: no compatible sketches in path '{}'", + filename.display()); + let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); } - - } else { - eprintln!("WARNING: no compatible sketches in path '{}'", - filename.display()); - let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst); + if results.is_empty() { + None + } else { + Some(results) + } + }, + Err(err) => { + let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); + eprintln!("Sketch loading error: {}", err); + eprintln!("WARNING: could not load sketches from path '{}'", + filename.display()); + None } - } else { - let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst); - eprintln!("WARNING: could not load sketches from path '{}'", - filename.display()); - } - - if results.is_empty() { - None - } else { - Some(results) } }) .flatten() diff --git a/src/python/pyo3_branchwater/__init__.py b/src/python/pyo3_branchwater/__init__.py index 3796c332..2e5e0a66 100755 --- a/src/python/pyo3_branchwater/__init__.py +++ b/src/python/pyo3_branchwater/__init__.py @@ -38,7 +38,7 @@ def set_thread_pool(user_cores): class Branchwater_Manysearch(CommandLinePlugin): command = 'manysearch' - description = 'massively parallel sketch search' + description = 'search many metagenomes for contained genomes' def __init__(self, p): super().__init__(p) @@ -220,7 +220,7 @@ def main(self, args): class Branchwater_Multisearch(CommandLinePlugin): command = 'multisearch' - description = 'massively parallel sketch search' + description = 'massively parallel in-memory sketch search' def __init__(self, p): super().__init__(p)