Skip to content

Commit

Permalink
fix fastgather
Browse files Browse the repository at this point in the history
  • Loading branch information
bluegenes committed Jan 26, 2024
1 parent cd8be99 commit 32fc2d5
Showing 1 changed file with 60 additions and 127 deletions.
187 changes: 60 additions & 127 deletions src/fastgather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,30 @@ pub fn fastgather(
if query_collection.len() > 1 {
bail!("Found more than one compatible sketch from '{}'. Fastgather requires a single query sketch.", &query_filepath)
}
// Load the single query signature into memory. Two things are kept:
//   * `query_sig` - the loaded signature itself, unwrapped later in this
//     function when writing prefetch output and running gather;
//   * `query_mh`  - a clone of its first MinHash sketch, used to filter the
//     search collection.
let mut query_mh = None;
let mut query_sig = None;
for (idx, _record) in query_collection.iter() {
    // Bind the loaded signature to a name that differs from the outer
    // `query_sig`. Matching on `Ok(query_sig)` would shadow the outer variable,
    // leave it `None`, and make the later `unwrap()` calls panic.
    let sig = match query_collection.sig_for_dataset(idx) {
        Ok(sig) => sig,
        Err(_) => {
            // Not expected in practice: compatibility is presumably already
            // checked while the collection is loaded.
            bail!("No query sketch matching selection parameters.")
        }
    };

    // Take the first MinHash sketch found in this signature.
    for sketch in sig.iter() {
        if let Sketch::MinHash(query) = sketch {
            query_mh = Some(query.clone());
            break;
        }
    }

    if query_mh.is_some() {
        // Keep the signature that owns the MinHash and stop searching.
        query_sig = Some(sig);
        break;
    }
}

// Return an error rather than panicking on the later `query_mh.unwrap()`
// when no MinHash sketch was found (e.g. nothing to iterate over).
if query_mh.is_none() {
    bail!("No query sketch matching selection parameters.")
}

// build the list of paths to match against.
eprintln!("Loading matchlist from '{}'", against_filepath);
Expand All @@ -49,131 +73,40 @@ pub fn fastgather(
"using threshold overlap: {} {}",
threshold_hashes, threshold_bp
);
query_collection.iter().for_each(|(idx, record)| {
// Load query sig
match query_collection.sig_for_dataset(idx) {
Ok(query_sig) => {
let location = query_sig.filename();
let mut matchlist: BinaryHeap<PrefetchResult> = BinaryHeap::new();
let mut skipped_paths = 0;
let mut failed_paths = 0;

for sketch in query_sig.iter() {
// Access query MinHash
if let Sketch::MinHash(query) = sketch {
let result = load_sketches_above_threshold(
against_collection,
&selection,
&query,
threshold_hashes,
);

match result {
Ok((loaded_matchlist, skipped, failed)) => {
matchlist.extend(loaded_matchlist);
skipped_paths += skipped;
failed_paths += failed;
}
Err(err) => {
eprintln!("Error loading sketches: {:?}", err);
failed_paths += 1;
}
}
}
}

if skipped_paths > 0 {
eprintln!(
"WARNING: Skipped {} search paths - no compatible signatures.",
skipped_paths
);
}
if failed_paths > 0 {
eprintln!(
"WARNING: {} search paths failed to load. See error messages above.",
failed_paths
);
}

if matchlist.is_empty() {
eprintln!("No search signatures loaded for '{}', exiting.", location);
return; // Return early if no search signatures loaded
}

if let Some(prefetch_output) = &prefetch_output {
write_prefetch(&query_sig, Some(prefetch_output.clone()), &matchlist).ok();
}

// Run the gather!
if let Some(gather_output) = &gather_output {
if let Err(err) = consume_query_by_gather(query_sig, matchlist, threshold_hashes, Some(gather_output)) {
eprintln!("Error during gather: {:?}", err);
}
}
}
Err(_) => {
eprintln!("WARNING: Could not load query sketch '{}'", record.internal_location());
}
}
});
Ok(())
}

// query_collection.iter().for_each(|(idx, record)| {
// // Load query sig
// match query_collection.sig_for_dataset(idx) {
// Ok(query_sig) => {
// let location = query_sig.filename();
// for sketch in query_sig.iter() {
// // Access query MinHash
// if let Sketch::MinHash(query) = sketch {
// let matchlist: BinaryHeap<PrefetchResult> = sketchlist
// .par_iter()
// .filter_map(|sm| {
// // Call a function to load sketches above threshold
// let result = load_sketches_above_threshold(
// against_collection,
// &selection,
// &query,
// threshold_hashes,
// )?;
// let matchlist = result.0;
// let skipped_paths = result.1;
// let failed_paths = result.2;

// if skipped_paths > 0 {
// eprintln!(
// "WARNING: skipped {} search paths - no compatible signatures.",
// skipped_paths
// );
// }
// if failed_paths > 0 {
// eprintln!(
// "WARNING: {} search paths failed to load. See error messages above.",
// failed_paths
// );
// }

// if matchlist.is_empty() {
// eprintln!("No search signatures loaded, exiting.");
// return Ok(());
// }

// if prefetch_output.is_some() {
// write_prefetch(&query_sig, prefetch_output, &matchlist).ok();
// }

// // run the gather!
// consume_query_by_gather(query_sig, matchlist, threshold_hashes, gather_output).ok();
// });
// }
// }
// }
// }
// Err(_) => {
// eprintln!("WARNING: Could not load query sketch '{}'", record.internal_location());
// }
// }
// });
// Ok(())
// }
// load a set of sketches, filtering for those with overlaps > threshold
// NOTE: `query_mh.unwrap()` panics if `query_mh` is still `None` here, i.e. if
// the query-loading code earlier in this function found no MinHash sketch.
// Errors returned by the loader are propagated to the caller via `?`.
let result = load_sketches_above_threshold(
against_collection,
&selection,
&query_mh.unwrap(),
threshold_hashes,
)?;
// `result` is a 3-tuple: the matches, then the skipped and failed path counts.
let matchlist = result.0;
let skipped_paths = result.1;
let failed_paths = result.2;
// Report (on stderr) how many search paths could not be used; these are
// warnings only and do not stop the run.
if skipped_paths > 0 {
eprintln!(
"WARNING: skipped {} search paths - no compatible signatures.",
skipped_paths
);
}
if failed_paths > 0 {
eprintln!(
"WARNING: {} search paths failed to load. See error messages above.",
failed_paths
);
}

// Nothing to gather against: report it and return success without writing
// any output.
if matchlist.is_empty() {
eprintln!("No search signatures loaded, exiting.");
return Ok(());
}

// NOTE(review): both calls below unwrap `query_sig`; this panics if it is
// still `None` at this point, so verify that the query-loading code earlier in
// this function assigns it.
// The prefetch output is only written when an output target was supplied.
// `.ok()` discards the returned `Result`, so a write failure is silently ignored.
if prefetch_output.is_some() {
write_prefetch(query_sig.as_ref().unwrap(), prefetch_output, &matchlist).ok();
}

// run the gather!
// As above, `.ok()` discards any error returned by the gather step, so this
// function still returns `Ok(())` if gather fails.
consume_query_by_gather(query_sig.clone().unwrap(), matchlist, threshold_hashes, gather_output).ok();
Ok(())
}

0 comments on commit 32fc2d5

Please sign in to comment.