Skip to content

Commit

Permalink
Browse files (browse the repository at this point in the history)
… into update/docs_for_zip
  • Loading branch information
ctb committed Sep 14, 2023
2 parents 7834a3f + 257015b commit c2f70cb
Show file tree
Hide file tree
Showing 12 changed files with 780 additions and 541 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ jobs:
override: true
components: rustfmt, clippy

- name: Run cargo fmt
uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check

- name: cache rust
uses: Swatinem/rust-cache@v2

Expand Down
7 changes: 4 additions & 3 deletions src/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use crate::utils::is_revindex_database;

use sourmash::index::revindex::RevIndex;


pub fn check<P: AsRef<Path>>(index: P, quick: bool) -> Result<(), Box<dyn std::error::Error>> {

if !is_revindex_database(index.as_ref()) {
bail!("'{}' is not a valid RevIndex database", index.as_ref().display());
bail!(
"'{}' is not a valid RevIndex database",
index.as_ref().display()
);
}

println!("Opening DB");
Expand Down
53 changes: 32 additions & 21 deletions src/fastgather.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
/// fastgather: Run gather with a query against a list of files.
use anyhow::Result;


use sourmash::signature::Signature;
use std::path::Path;
use sourmash::sketch::minhash::{max_hash_for_scaled, KmerMinHash};
use sourmash::sketch::Sketch;
use std::path::Path;

use crate::utils::{prepare_query, write_prefetch,
load_sketches_above_threshold, consume_query_by_gather,
load_sigpaths_from_zip_or_pathlist};
use crate::utils::{
consume_query_by_gather, load_sigpaths_from_zip_or_pathlist, load_sketches_above_threshold,
prepare_query, write_prefetch,
};

pub fn fastgather<P: AsRef<Path> + std::fmt::Debug + std::fmt::Display + Clone>(
query_filename: P,
Expand Down Expand Up @@ -43,42 +43,54 @@ pub fn fastgather<P: AsRef<Path> + std::fmt::Debug + std::fmt::Display + Clone>(
};

// build the list of paths to match against.
eprintln!("Loading matchlist from '{}'", matchlist_filename.as_ref().display());
eprintln!(
"Loading matchlist from '{}'",
matchlist_filename.as_ref().display()
);

let matchlist_filename = matchlist_filename.as_ref().to_string_lossy().to_string();
let (matchlist_paths, _temp_dir) = load_sigpaths_from_zip_or_pathlist(&matchlist_filename)?;

eprintln!("Loaded {} sig paths in matchlist", matchlist_paths.len());

// calculate the minimum number of hashes based on desired threshold
let threshold_hashes : u64 = {
let threshold_hashes: u64 = {
let x = threshold_bp / scaled;
if x > 0 {
x
} else {
1
}
}.try_into()?;
}
.try_into()?;

eprintln!("using threshold overlap: {} {}",
threshold_hashes, threshold_bp);
eprintln!(
"using threshold overlap: {} {}",
threshold_hashes, threshold_bp
);

// load a set of sketches, filtering for those with overlaps > threshold
let result = load_sketches_above_threshold(matchlist_paths,
&template,
&query.minhash,
threshold_hashes)?;
let result = load_sketches_above_threshold(
matchlist_paths,
&template,
&query.minhash,
threshold_hashes,
)?;
let matchlist = result.0;
let skipped_paths = result.1;
let failed_paths = result.2;

if skipped_paths > 0 {
eprintln!("WARNING: skipped {} search paths - no compatible signatures.",
skipped_paths);
eprintln!(
"WARNING: skipped {} search paths - no compatible signatures.",
skipped_paths
);
}
if failed_paths > 0 {
eprintln!("WARNING: {} search paths failed to load. See error messages above.",
failed_paths);
eprintln!(
"WARNING: {} search paths failed to load. See error messages above.",
failed_paths
);
}

if matchlist.is_empty() {
Expand All @@ -91,7 +103,6 @@ pub fn fastgather<P: AsRef<Path> + std::fmt::Debug + std::fmt::Display + Clone>(
}

// run the gather!
consume_query_by_gather(query, matchlist, threshold_hashes,
gather_output).ok();
consume_query_by_gather(query, matchlist, threshold_hashes, gather_output).ok();
Ok(())
}
}
153 changes: 81 additions & 72 deletions src/fastmultigather.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
/// fastmultigather: Run gather for multiple queries against a list of files.
use anyhow::Result;
use rayon::prelude::*;

use sourmash::signature::Signature;
use std::path::Path;
use sourmash::sketch::minhash::{max_hash_for_scaled, KmerMinHash};
use sourmash::sketch::Sketch;
use std::path::Path;

use std::sync::atomic;
use std::sync::atomic::AtomicUsize;

use std::collections::BinaryHeap;

use crate::utils::{prepare_query, write_prefetch, PrefetchResult,
consume_query_by_gather, load_sigpaths_from_zip_or_pathlist, load_sketches_from_zip_or_pathlist, ReportType};
use crate::utils::{
consume_query_by_gather, load_sigpaths_from_zip_or_pathlist,
load_sketches_from_zip_or_pathlist, prepare_query, write_prefetch, PrefetchResult, ReportType,
};

pub fn fastmultigather<P: AsRef<Path> + std::fmt::Debug + Clone>(
query_filenames: P,
Expand All @@ -36,109 +37,117 @@ pub fn fastmultigather<P: AsRef<Path> + std::fmt::Debug + Clone>(
let (querylist_paths, _temp_dir) = load_sigpaths_from_zip_or_pathlist(&query_filenames)?;
println!("Loaded {} sig paths in querylist", querylist_paths.len());

let threshold_hashes : u64 = {
let threshold_hashes: u64 = {
let x = threshold_bp / scaled;
if x > 0 {
x
} else {
1
}
}.try_into().unwrap();
}
.try_into()
.unwrap();

println!("threshold overlap: {} {}", threshold_hashes, threshold_bp);

// Load all the against sketches
let sketchlist = load_sketches_from_zip_or_pathlist(&matchlist_filename, &template, ReportType::Against)?;
let sketchlist =
load_sketches_from_zip_or_pathlist(&matchlist_filename, &template, ReportType::Against)?;

// Iterate over all queries => do prefetch and gather!
let processed_queries = AtomicUsize::new(0);
let skipped_paths = AtomicUsize::new(0);
let failed_paths = AtomicUsize::new(0);

querylist_paths
.par_iter()
.for_each(|q| {
// increment counter of # of queries
let _i = processed_queries.fetch_add(1, atomic::Ordering::SeqCst);
querylist_paths.par_iter().for_each(|q| {
// increment counter of # of queries
let _i = processed_queries.fetch_add(1, atomic::Ordering::SeqCst);

// set query_label to the last path element.
let location = q.clone().into_os_string().into_string().unwrap();
let location = location.split('/').last().unwrap().to_string();
// set query_label to the last path element.
let location = q.clone().into_os_string().into_string().unwrap();
let location = location.split('/').last().unwrap().to_string();

let query = match Signature::from_path(dbg!(q)) {
Ok(sigs) => {
let mm = prepare_query(&sigs, &template, &location);
let query = match Signature::from_path(dbg!(q)) {
Ok(sigs) => {
let mm = prepare_query(&sigs, &template, &location);

if mm.is_none() {
if !queryfile_name.ends_with(".zip") {
eprintln!("WARNING: no compatible sketches in path '{}'",
q.display());
}
let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst);
if mm.is_none() {
if !queryfile_name.ends_with(".zip") {
eprintln!("WARNING: no compatible sketches in path '{}'", q.display());
}
mm
},
Err(err) => {
eprintln!("Sketch loading error: {}", err);
eprintln!("WARNING: could not load sketches from path '{}'",
q.display());
let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst);
None
let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst);
}
};

if let Some(query) = query {
// filter first set of matches out of sketchlist
let matchlist: BinaryHeap<PrefetchResult> = sketchlist
.par_iter()
.filter_map(|sm| {
let mut mm = None;

if let Ok(overlap) = sm.minhash.count_common(&query.minhash, false) {
if overlap >= threshold_hashes {
let result = PrefetchResult {
name: sm.name.clone(),
md5sum: sm.md5sum.clone(),
minhash: sm.minhash.clone(),
overlap,
};
mm = Some(result);
}
mm
}
Err(err) => {
eprintln!("Sketch loading error: {}", err);
eprintln!(
"WARNING: could not load sketches from path '{}'",
q.display()
);
let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst);
None
}
};

if let Some(query) = query {
// filter first set of matches out of sketchlist
let matchlist: BinaryHeap<PrefetchResult> = sketchlist
.par_iter()
.filter_map(|sm| {
let mut mm = None;

if let Ok(overlap) = sm.minhash.count_common(&query.minhash, false) {
if overlap >= threshold_hashes {
let result = PrefetchResult {
name: sm.name.clone(),
md5sum: sm.md5sum.clone(),
minhash: sm.minhash.clone(),
overlap,
};
mm = Some(result);
}
mm
}
mm
})
.collect();

if !matchlist.is_empty() {
let prefetch_output = format!("{location}.prefetch.csv");
let gather_output = format!("{location}.gather.csv");
if !matchlist.is_empty() {
let prefetch_output = format!("{location}.prefetch.csv");
let gather_output = format!("{location}.gather.csv");

// save initial list of matches to prefetch output
write_prefetch(&query, Some(prefetch_output), &matchlist).ok();
// save initial list of matches to prefetch output
write_prefetch(&query, Some(prefetch_output), &matchlist).ok();

// now, do the gather!
consume_query_by_gather(query, matchlist, threshold_hashes,
Some(gather_output)).ok();
} else {
println!("No matches to '{}'", location);
}
// now, do the gather!
consume_query_by_gather(query, matchlist, threshold_hashes, Some(gather_output))
.ok();
} else {
println!("No matches to '{}'", location);
}
});

}
});

println!("Processed {} queries total.", processed_queries.into_inner());
println!(
"Processed {} queries total.",
processed_queries.into_inner()
);

let skipped_paths = skipped_paths.into_inner();
let failed_paths = failed_paths.into_inner();

if skipped_paths > 0 {
eprintln!("WARNING: skipped {} query paths - no compatible signatures.",
skipped_paths);
eprintln!(
"WARNING: skipped {} query paths - no compatible signatures.",
skipped_paths
);
}
if failed_paths > 0 {
eprintln!("WARNING: {} query paths failed to load. See error messages above.",
failed_paths);
eprintln!(
"WARNING: {} query paths failed to load. See error messages above.",
failed_paths
);
}

Ok(())
}
}
6 changes: 3 additions & 3 deletions src/index.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::path::Path;
use sourmash::sketch::Sketch;
use sourmash::index::revindex::RevIndex;
use sourmash::sketch::Sketch;
use std::path::Path;

use crate::utils::load_sigpaths_from_zip_or_pathlist;

Expand All @@ -27,4 +27,4 @@ pub fn index<P: AsRef<Path>>(
db.index(index_sigs, &template, 0.0, save_paths);

Ok(())
}
}
Loading

0 comments on commit c2f70cb

Please sign in to comment.