Skip to content

Commit

Permalink
...collection loading in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
bluegenes committed Jan 26, 2024
1 parent 13940cd commit cd8be99
Show file tree
Hide file tree
Showing 8 changed files with 482 additions and 320 deletions.
10 changes: 4 additions & 6 deletions src/check.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use std::path::Path;

use crate::utils::is_revindex_database;

use sourmash::index::revindex::{RevIndex, RevIndexOps};

pub fn check<P: AsRef<Path>>(index: P, quick: bool) -> Result<(), Box<dyn std::error::Error>> {
if !is_revindex_database(index.as_ref()) {
pub fn check(index: camino::Utf8PathBuf, quick: bool) -> Result<(), Box<dyn std::error::Error>> {
if !is_revindex_database(&index) {
bail!(
"'{}' is not a valid RevIndex database",
index.as_ref().display()
index
);
}

println!("Opening DB");
let db = RevIndex::open(index.as_ref(), true)?;
let db = RevIndex::open(index, true)?;

println!("Starting check");
db.check(quick);
Expand Down
215 changes: 147 additions & 68 deletions src/fastgather.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,39 @@
/// fastgather: Run gather with a query against a list of files.
use anyhow::Result;

use sourmash::signature::Signature;
use sourmash::sketch::Sketch;
use std::path::Path;
use sourmash::signature::Signature;
use sourmash::selection::Selection;
use camino;
use std::collections::BinaryHeap;
use crate::utils::PrefetchResult;

use crate::utils::{
consume_query_by_gather, load_sigpaths_from_zip_or_pathlist, load_sketches_above_threshold,
prepare_query, write_prefetch, ReportType,
consume_query_by_gather, load_sketches_above_threshold, write_prefetch, ReportType, load_collection
};

pub fn fastgather<P: AsRef<Path> + std::fmt::Debug + std::fmt::Display + Clone>(
query_filename: P,
matchlist_filename: P,
pub fn fastgather(
query_filepath: camino::Utf8PathBuf,
against_filepath: camino::Utf8PathBuf,
threshold_bp: usize,
ksize: u8,
scaled: usize,
template: Sketch,
gather_output: Option<P>,
prefetch_output: Option<P>,
selection: &Selection,
gather_output: Option<String>,
prefetch_output: Option<String>,
) -> Result<()> {
let location = query_filename.to_string();
eprintln!("Loading query from '{}'", location);
let query = {
let sigs = Signature::from_path(query_filename)?;

prepare_query(&sigs, &template, &location)
};
// did we find anything matching the desired template?
let query = match query {
Some(query) => query,
None => bail!("No sketch found with scaled={}, k={}", scaled, ksize),
};

// build the list of paths to match against.
eprintln!(
"Loading matchlist from '{}'",
matchlist_filename.as_ref().display()
);

let matchlist_filename = matchlist_filename.as_ref().to_string_lossy().to_string();
let (matchlist_paths, _temp_dir) =
load_sigpaths_from_zip_or_pathlist(matchlist_filename, &template, ReportType::Against)?;
let query_collection = load_collection(&query_filepath, selection, ReportType::Query)?;

eprintln!("Loaded {} sig paths in matchlist", matchlist_paths.len());
if query_collection.len() > 1 {
bail!("Found more than one compatible sketch from '{}'. Fastgather requires a single query sketch.", &query_filepath)
}

// build the list of paths to match against.
eprintln!("Loading matchlist from '{}'", against_filepath);
let against_collection = load_collection(&against_filepath, selection, ReportType::Against)?;
eprintln!("Loaded {} sig paths in matchlist", against_collection.len());

// calculate the minimum number of hashes based on desired threshold
let threshold_hashes: u64 = {
let x = threshold_bp / scaled;
Expand All @@ -60,41 +49,131 @@ pub fn fastgather<P: AsRef<Path> + std::fmt::Debug + std::fmt::Display + Clone>(
"using threshold overlap: {} {}",
threshold_hashes, threshold_bp
);

// load a set of sketches, filtering for those with overlaps > threshold
let result = load_sketches_above_threshold(
matchlist_paths,
&template,
&query.minhash,
threshold_hashes,
)?;
let matchlist = result.0;
let skipped_paths = result.1;
let failed_paths = result.2;

if skipped_paths > 0 {
eprintln!(
"WARNING: skipped {} search paths - no compatible signatures.",
skipped_paths
);
}
if failed_paths > 0 {
eprintln!(
"WARNING: {} search paths failed to load. See error messages above.",
failed_paths
);
}

if matchlist.is_empty() {
eprintln!("No search signatures loaded, exiting.");
return Ok(());
}

if prefetch_output.is_some() {
write_prefetch(&query, prefetch_output, &matchlist).ok();
}

// run the gather!
consume_query_by_gather(query, matchlist, threshold_hashes, gather_output).ok();
query_collection.iter().for_each(|(idx, record)| {
// Load query sig
match query_collection.sig_for_dataset(idx) {
Ok(query_sig) => {
let location = query_sig.filename();
let mut matchlist: BinaryHeap<PrefetchResult> = BinaryHeap::new();
let mut skipped_paths = 0;
let mut failed_paths = 0;

for sketch in query_sig.iter() {
// Access query MinHash
if let Sketch::MinHash(query) = sketch {
let result = load_sketches_above_threshold(
against_collection,
&selection,
&query,
threshold_hashes,
);

match result {
Ok((loaded_matchlist, skipped, failed)) => {
matchlist.extend(loaded_matchlist);
skipped_paths += skipped;
failed_paths += failed;
}
Err(err) => {
eprintln!("Error loading sketches: {:?}", err);
failed_paths += 1;
}
}
}
}

if skipped_paths > 0 {
eprintln!(
"WARNING: Skipped {} search paths - no compatible signatures.",
skipped_paths
);
}
if failed_paths > 0 {
eprintln!(
"WARNING: {} search paths failed to load. See error messages above.",
failed_paths
);
}

if matchlist.is_empty() {
eprintln!("No search signatures loaded for '{}', exiting.", location);
return; // Return early if no search signatures loaded
}

if let Some(prefetch_output) = &prefetch_output {
write_prefetch(&query_sig, Some(prefetch_output.clone()), &matchlist).ok();
}

// Run the gather!
if let Some(gather_output) = &gather_output {
if let Err(err) = consume_query_by_gather(query_sig, matchlist, threshold_hashes, Some(gather_output)) {
eprintln!("Error during gather: {:?}", err);
}
}
}
Err(_) => {
eprintln!("WARNING: Could not load query sketch '{}'", record.internal_location());
}
}
});
Ok(())
}

// query_collection.iter().for_each(|(idx, record)| {
// // Load query sig
// match query_collection.sig_for_dataset(idx) {
// Ok(query_sig) => {
// let location = query_sig.filename();
// for sketch in query_sig.iter() {
// // Access query MinHash
// if let Sketch::MinHash(query) = sketch {
// let matchlist: BinaryHeap<PrefetchResult> = sketchlist
// .par_iter()
// .filter_map(|sm| {
// // Call a function to load sketches above threshold
// let result = load_sketches_above_threshold(
// against_collection,
// &selection,
// &query,
// threshold_hashes,
// )?;
// let matchlist = result.0;
// let skipped_paths = result.1;
// let failed_paths = result.2;

// if skipped_paths > 0 {
// eprintln!(
// "WARNING: skipped {} search paths - no compatible signatures.",
// skipped_paths
// );
// }
// if failed_paths > 0 {
// eprintln!(
// "WARNING: {} search paths failed to load. See error messages above.",
// failed_paths
// );
// }

// if matchlist.is_empty() {
// eprintln!("No search signatures loaded, exiting.");
// return Ok(());
// }

// if prefetch_output.is_some() {
// write_prefetch(&query_sig, prefetch_output, &matchlist).ok();
// }

// // run the gather!
// consume_query_by_gather(query_sig, matchlist, threshold_hashes, gather_output).ok();
// });
// }
// }
// }
// }
// Err(_) => {
// eprintln!("WARNING: Could not load query sketch '{}'", record.internal_location());
// }
// }
// });
// Ok(())
// }
Loading

0 comments on commit cd8be99

Please sign in to comment.