MRG: add generic support for any type of sketch collection as query or database #430

Merged
merged 111 commits into from
Oct 15, 2024
480f319
refactor & rename & consolidate
ctb Aug 17, 2024
e6b1c5b
remove 'lower'
ctb Aug 17, 2024
153f246
Merge branch 'main' of github.com:sourmash-bio/sourmash_plugin_branch…
ctb Aug 18, 2024
0d7a556
add cargo doc output for private fn
ctb Aug 18, 2024
df753db
Merge branch 'main' of github.com:sourmash-bio/sourmash_plugin_branch…
ctb Aug 18, 2024
1da0cf3
add a few comments/docs
ctb Aug 18, 2024
2e7f027
switch to dev version of sourmash
ctb Aug 18, 2024
6b9e00f
tracking
ctb Aug 18, 2024
2747935
cleaner
ctb Aug 18, 2024
4f49ef8
cleanup
ctb Aug 18, 2024
af1c82d
load rocksdb natively
ctb Aug 18, 2024
53924d6
foo
ctb Aug 18, 2024
e5faed8
Merge branch 'main' of github.com:sourmash-bio/sourmash_plugin_branch…
ctb Aug 19, 2024
7649375
Merge branch 'main' of github.com:sourmash-bio/sourmash_plugin_branch…
ctb Aug 19, 2024
e4618f0
Merge branch 'ctb_misc_cleanup' into ctb_misc2
ctb Aug 19, 2024
3462f92
cargo fmt
ctb Aug 19, 2024
9823ef6
upd
ctb Aug 20, 2024
bfb5053
upd
ctb Aug 20, 2024
c311a69
fix fmt
ctb Aug 20, 2024
28b43d8
MRG: create `MultiCollection` for collections that span multiple file…
ctb Aug 20, 2024
a1b19ae
clippy fixes
ctb Aug 20, 2024
51a14ac
compiling again
ctb Aug 20, 2024
99bd174
cleanup
ctb Aug 20, 2024
36d33a5
bump sourmash to v0.15.1
ctb Aug 21, 2024
02bf7e9
Merge branch 'bump_sourmash' into ctb_misc2
ctb Aug 21, 2024
7f0b010
check if is rocksdb
ctb Aug 21, 2024
b9972c6
Merge branch 'main' of github.com:sourmash-bio/sourmash_plugin_branch…
ctb Aug 21, 2024
5561911
weird error
ctb Aug 21, 2024
dfe56d3
use remove_unwrap branch of sourmash
ctb Aug 21, 2024
e6e80f3
get index to work with MultiCollection
ctb Aug 21, 2024
fed4db3
old bug now fixed
ctb Aug 21, 2024
f5331ef
clippy, format, and fix
ctb Aug 21, 2024
8f90129
make names clearer
ctb Aug 21, 2024
4511347
ditch MultiCollection for index, at least for now
ctb Aug 21, 2024
4ea6730
testy testy
ctb Aug 21, 2024
ac35b24
getting closer
ctb Aug 21, 2024
741a44a
update sourmash
ctb Aug 22, 2024
d429205
mark failing tests
ctb Aug 22, 2024
994fcec
upd
ctb Aug 24, 2024
8451259
cargo fmt
ctb Aug 24, 2024
91b04b5
MRG: test exit from `pairwise` and `multisearch` if no loaded sketche…
ctb Aug 24, 2024
b3e5b81
MRG: switch to more efficient use of `Collection` by removing cloning…
ctb Aug 24, 2024
97db857
MRG: add tests for RocksDB/RevIndex, standalone manifests, and flexib…
ctb Aug 24, 2024
551758f
reenable and fix test_fastgather.py::test_indexed_against
ctb Aug 24, 2024
e3e95fc
impl Deref for MultiCollection
ctb Aug 24, 2024
8d39a4f
clippy
ctb Aug 24, 2024
3439592
switch to using load_sketches method
ctb Aug 24, 2024
d3fa529
deref doesn't actually make sense for MultiCollection
ctb Aug 24, 2024
5a20381
update to latest sourmash code
ctb Aug 27, 2024
6563b0a
update to latest sourmash code
ctb Aug 27, 2024
45608c7
simplify
ctb Aug 27, 2024
bd256dd
update to latest sourmash code
ctb Aug 27, 2024
675974e
Merge branch 'main' of github.com:sourmash-bio/sourmash_plugin_branch…
ctb Aug 27, 2024
afa0faf
remove unnecessary flag
ctb Aug 27, 2024
89b1c08
Merge branch 'main' of github.com:sourmash-bio/sourmash_plugin_branch…
ctb Sep 8, 2024
d120547
MRG: support & test loading of standalone manifests within pathlists …
ctb Sep 9, 2024
73c7f53
MRG: documentation updates based on new collection loading (#444)
ctb Sep 9, 2024
e32638a
Merge branch 'main' of github.com:sourmash-bio/sourmash_plugin_branch…
ctb Sep 9, 2024
fcae8e7
Merge branch 'main' of github.com:sourmash-bio/sourmash_plugin_branch…
ctb Sep 10, 2024
74d7367
Update src/lib.rs
ctb Sep 10, 2024
4e28d64
switch unwrap to expect
ctb Sep 10, 2024
74e2217
Merge branch 'ctb_misc2' of github.com:sourmash-bio/sourmash_plugin_b…
ctb Sep 10, 2024
de35cd5
move unwrap to expect
ctb Sep 10, 2024
1e5ac07
minor cleanup
ctb Sep 10, 2024
388a49a
cargo fmt
ctb Sep 11, 2024
7be1883
provide legacy method to avoid xfail on index loading
ctb Sep 15, 2024
679b972
switch to using reference
ctb Sep 15, 2024
a9143d0
update docs to reflect pathlist behavior
ctb Sep 15, 2024
574cd28
test recursive nature of MultiCollection
ctb Sep 15, 2024
a5b4299
re-enable test that is now passing
ctb Sep 15, 2024
74b9ae6
update to latest sourmash
ctb Sep 16, 2024
9df421d
upd sourmash
ctb Sep 16, 2024
ccd26da
Merge branch 'main' of github.com:sourmash-bio/sourmash_plugin_branch…
ctb Sep 16, 2024
847917f
update sourmash
ctb Sep 17, 2024
9733d47
mut MultiCollection
ctb Sep 18, 2024
019fd1b
cleanup
ctb Sep 18, 2024
780fbda
update after merge of sourmash-bio/sourmash#3305
ctb Sep 21, 2024
84934a7
fix contains_revindex
ctb Sep 22, 2024
56fb948
add trace commands for tracing loading
ctb Sep 22, 2024
6550683
use released version of sourmash
ctb Sep 25, 2024
b510e8e
add support for ignoring abundance
ctb Oct 2, 2024
0993b39
cargo fmt
ctb Oct 2, 2024
ac82fb3
avoid downsampling until we know there is overlap
ctb Oct 4, 2024
7ea9a40
change downsample to true; add panic assertion
ctb Oct 5, 2024
03b9da0
move downsampling side guard
ctb Oct 5, 2024
b954daa
eliminate redundant overlap check
ctb Oct 5, 2024
b0bcc66
move calc_abund_stats
ctb Oct 5, 2024
a2871c0
extract abundance code into own function; avoid downsampling if poss
ctb Oct 5, 2024
d853ef3
cleanup
ctb Oct 5, 2024
207efb2
Merge branch 'toggle_manysearch_abund' into ctb_misc2
ctb Oct 5, 2024
453f943
fmt
ctb Oct 5, 2024
5380325
Merge branch 'toggle_manysearch_abund' into ctb_misc2
ctb Oct 6, 2024
4f5fefd
Merge branch 'main' of github.com:sourmash-bio/sourmash_plugin_branch…
ctb Oct 9, 2024
69fd38b
update to next sourmash release
ctb Oct 11, 2024
ee580b6
cargo fmt
ctb Oct 11, 2024
9814051
upd sourmash
ctb Oct 11, 2024
d27b03e
correct numbers
ctb Oct 11, 2024
e35111a
upd sourmash
ctb Oct 12, 2024
4778862
upd sourmash
ctb Oct 12, 2024
bd18277
Merge branch 'update_sourmash_latest' into ctb_misc2
ctb Oct 12, 2024
2563b0b
upd sourmash
ctb Oct 12, 2024
a0e02ef
upd sourmash
ctb Oct 13, 2024
9b448c8
use new try_into() and eliminate several clone()s
ctb Oct 13, 2024
58502d8
Merge branch 'update_sourmash_latest' into ctb_misc2
ctb Oct 13, 2024
4a780f4
refactor a bit more
ctb Oct 13, 2024
c43f0d9
deallocate collection?
ctb Oct 13, 2024
87118de
upd sourmash
ctb Oct 13, 2024
ee296e7
cargo fmt
ctb Oct 13, 2024
a5bf5fa
fix merge foo
ctb Oct 13, 2024
fff1cd9
Merge branch 'main' of github.com:sourmash-bio/sourmash_plugin_branch…
ctb Oct 14, 2024
2d8b2bb
merge
ctb Oct 15, 2024
2 changes: 2 additions & 0 deletions .cargo/config.toml
@@ -0,0 +1,2 @@
[build]
rustdocflags = ["--document-private-items"]
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -16,7 +16,7 @@ sourmash = { version = "0.16.0", features = ["branchwater"] }
serde_json = "1.0.128"
niffler = "2.4.0"
log = "0.4.22"
env_logger = { version = "0.11.5", optional = true }
env_logger = { version = "0.11.5" }
simple-error = "0.3.1"
anyhow = "1.0.89"
zip = { version = "2.0", default-features = false }
205 changes: 148 additions & 57 deletions doc/README.md

Large diffs are not rendered by default.

16 changes: 14 additions & 2 deletions src/fastgather.rs
@@ -33,7 +33,9 @@ pub fn fastgather(
)
}
// get single query sig and minhash
let query_sig = query_collection.sig_for_dataset(0)?; // need this for original md5sum
let query_sig = query_collection.get_first_sig().expect("no queries!?");

// @CTB avoid clone?
let query_sig_ds = query_sig.clone().select(selection)?; // downsample
let query_mh = match query_sig_ds.minhash() {
Some(query_mh) => query_mh,
@@ -89,7 +91,17 @@ pub fn fastgather(
}

if prefetch_output.is_some() {
write_prefetch(&query_sig, prefetch_output, &matchlist).ok();
let query_filename = query_sig.filename();
let query_name = query_sig.name();
let query_md5 = query_sig.md5sum();
write_prefetch(
query_filename,
query_name,
query_md5,
prefetch_output,
&matchlist,
)
.ok();
}

// run the gather!
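
Reviewer note: the hunk above changes `write_prefetch` to take the query's filename, name, and md5 as separate arguments instead of a reference to the whole signature. A minimal sketch of that calling convention follows. `MatchRow`, `csv_quote`, `write_prefetch_rows`, and the CSV column names are all hypothetical and for illustration only; the plugin's real `PrefetchResult` and output columns are not shown in this diff.

```rust
use std::io::{self, Write};

/// Hypothetical match record; stands in for the plugin's `PrefetchResult`.
struct MatchRow {
    name: String,
    md5sum: String,
    overlap: u64,
}

/// Quote a CSV field, doubling any embedded double quotes.
fn csv_quote(field: &str) -> String {
    format!("\"{}\"", field.replace('"', "\"\""))
}

/// Write one row per match. The writer only needs three strings from the
/// query, so it does not have to borrow (or clone) a full signature.
fn write_prefetch_rows<W: Write>(
    out: &mut W,
    query_filename: &str,
    query_name: &str,
    query_md5: &str,
    matches: &[MatchRow],
) -> io::Result<()> {
    // Illustrative header; not the plugin's actual column set.
    writeln!(out, "query_filename,query_name,query_md5,match_name,match_md5,overlap")?;
    for m in matches {
        writeln!(
            out,
            "{},{},{},{},{},{}",
            csv_quote(query_filename),
            csv_quote(query_name),
            query_md5,
            csv_quote(&m.name),
            m.md5sum,
            m.overlap
        )?;
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let matches = vec![MatchRow {
        name: "genome, strain 1".to_string(),
        md5sum: "ffee".to_string(),
        overlap: 42,
    }];
    write_prefetch_rows(&mut io::stdout(), "q.sig", "sample A", "abc123", &matches)
}
```

Passing plain fields keeps the writer independent of how the query was loaded, which appears to be what lets both `fastgather` and `fastmultigather` share it after this change.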
207 changes: 108 additions & 99 deletions src/fastmultigather.rs
@@ -2,7 +2,7 @@
use anyhow::Result;
use rayon::prelude::*;

use sourmash::prelude::ToWriter;
use sourmash::prelude::{Storage, ToWriter};
use sourmash::{selection::Selection, signature::SigsTrait};

use std::sync::atomic;
@@ -15,13 +15,14 @@ use camino::Utf8Path as PathBuf;
use std::collections::HashSet;
use std::fs::File;

use log::trace;

use sourmash::signature::Signature;
use sourmash::sketch::minhash::KmerMinHash;
use sourmash::sketch::Sketch;

use crate::utils::{
consume_query_by_gather, load_collection, load_sketches, write_prefetch, PrefetchResult,
ReportType,
consume_query_by_gather, load_collection, write_prefetch, PrefetchResult, ReportType,
};

pub fn fastmultigather(
@@ -34,6 +35,8 @@ pub fn fastmultigather(
save_matches: bool,
create_empty_results: bool,
) -> Result<()> {
let _ = env_logger::try_init();

// load query collection
let query_collection = load_collection(
&query_filepath,
@@ -50,8 +53,7 @@ pub fn fastmultigather(
1
}
}
.try_into()
.unwrap();
.try_into()?;

println!("threshold overlap: {} {}", threshold_hashes, threshold_bp);
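
Reviewer note: this hunk replaces `.try_into().unwrap()` with `.try_into()?`, so a failed integer conversion is returned to the caller instead of panicking. A minimal standalone sketch of the same pattern follows. The function name `threshold_hashes_for` and the exact threshold arithmetic are assumptions for illustration; the diff only shows the tail of the real expression.

```rust
use std::convert::TryFrom;
use std::error::Error;

/// Hypothetical helper: turn a base-pair threshold into a minimum hash count.
/// Returns an error rather than panicking if the value does not fit in `u32`.
fn threshold_hashes_for(threshold_bp: u64, scaled: u64) -> Result<u32, Box<dyn Error>> {
    if scaled == 0 {
        return Err("scaled must be non-zero".into());
    }
    // Require at least one hash, even when threshold_bp < scaled.
    let x = (threshold_bp / scaled).max(1);
    // `?` converts the TryFromIntError into Box<dyn Error> and returns early;
    // `.unwrap()` at this point would abort the whole process instead.
    let n = u32::try_from(x)?;
    Ok(n)
}

fn main() -> Result<(), Box<dyn Error>> {
    let threshold_bp = 50_000;
    let n = threshold_hashes_for(threshold_bp, 1_000)?;
    println!("threshold overlap: {} {}", n, threshold_bp);
    Ok(())
}
```

With `?`, an out-of-range value surfaces as an ordinary `Err` that the Python-facing wrapper can report, which matches the direction of the other `unwrap` removals in this PR.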

@@ -63,123 +65,130 @@ pub fn fastmultigather(
allow_failed_sigpaths,
)?;
// load against sketches into memory, downsampling on the way
let against = load_sketches(against_collection, selection, ReportType::Against).unwrap();
let against = against_collection.load_sketches(selection)?;

// Iterate over all queries => do prefetch and gather!
let processed_queries = AtomicUsize::new(0);
let skipped_paths = AtomicUsize::new(0);
let failed_paths = AtomicUsize::new(0);

query_collection.par_iter().for_each(|(_idx, record)| {
query_collection.par_iter().for_each(|(c, _idx, record)| {
// increment counter of # of queries. q: could we instead use the _idx from par_iter(), or will it vary based on thread?
let _i = processed_queries.fetch_add(1, atomic::Ordering::SeqCst);
// Load query sig (downsampling happens here)
match query_collection.sig_from_record(record) {
trace!(
"fastmultigather query load: from:{} idx:{} loc:{}",
c.storage().spec(),
_idx,
record.internal_location()
);
match c.sig_from_record(record) {
Ok(query_sig) => {
let name = query_sig.name();
let prefix = name.split(' ').next().unwrap_or_default().to_string();
let location = PathBuf::new(&prefix).file_name().unwrap();
if let Some(query_mh) = query_sig.minhash() {
let mut matching_hashes = if save_matches { Some(Vec::new()) } else { None };
let matchlist: BinaryHeap<PrefetchResult> = against
.iter()
.filter_map(|against| {
let mut mm: Option<PrefetchResult> = None;
if let Ok(overlap) = against.minhash.count_common(query_mh, false) {
if overlap >= threshold_hashes {
if save_matches {
if let Ok(intersection) =
against.minhash.intersection(query_mh)
{
matching_hashes
.as_mut()
.unwrap()
.extend(intersection.0);
}
}
let result = PrefetchResult {
name: against.name.clone(),
md5sum: against.md5sum.clone(),
minhash: against.minhash.clone(),
location: against.location.clone(),
overlap,
};
mm = Some(result);
}
}
mm
})
.collect();
if !matchlist.is_empty() {
let prefetch_output = format!("{}.prefetch.csv", location);
let gather_output = format!("{}.gather.csv", location);

// Save initial list of matches to prefetch output
write_prefetch(&query_sig, Some(prefetch_output), &matchlist).ok();

// Now, do the gather!
consume_query_by_gather(
query_sig.clone(),
scaled as u64,
matchlist,
threshold_hashes,
Some(gather_output),
)
.ok();

// Save matching hashes to .sig file if save_matches is true
if save_matches {
if let Some(hashes) = matching_hashes {
let sig_filename = format!("{}.matches.sig", name);
if let Ok(mut file) = File::create(&sig_filename) {
let unique_hashes: HashSet<u64> = hashes.into_iter().collect();
let mut new_mh = KmerMinHash::new(
query_mh.scaled().try_into().unwrap(),
query_mh.ksize().try_into().unwrap(),
query_mh.hash_function().clone(),
query_mh.seed(),
false,
query_mh.num(),
);
new_mh
.add_many(&unique_hashes.into_iter().collect::<Vec<_>>())
.ok();
let mut signature = Signature::default();
signature.push(Sketch::MinHash(new_mh));
signature.set_filename(&name);
if let Err(e) = signature.to_writer(&mut file) {
eprintln!("Error writing signature file: {}", e);
let query_filename = query_sig.filename();
let query_name = query_sig.name();
let query_md5 = query_sig.md5sum();

let query_mh = query_sig.minhash().expect("cannot get sketch");
let mut matching_hashes = if save_matches { Some(Vec::new()) } else { None };
let matchlist: BinaryHeap<PrefetchResult> = against
.iter()
.filter_map(|against| {
let mut mm: Option<PrefetchResult> = None;
if let Ok(overlap) = against.minhash.count_common(query_mh, false) {
if overlap >= threshold_hashes {
if save_matches {
if let Ok(intersection) = against.minhash.intersection(query_mh)
{
matching_hashes.as_mut().unwrap().extend(intersection.0);
}
} else {
eprintln!("Error creating signature file: {}", sig_filename);
}
let result = PrefetchResult {
name: against.name.clone(),
md5sum: against.md5sum.clone(),
minhash: against.minhash.clone(),
location: against.location.clone(),
overlap,
};
mm = Some(result);
}
}
} else {
println!("No matches to '{}'", location);
if create_empty_results {
let prefetch_output = format!("{}.prefetch.csv", location);
let gather_output = format!("{}.gather.csv", location);
// touch output files
match std::fs::File::create(&prefetch_output) {
Ok(_) => {}
Err(e) => {
eprintln!("Failed to create empty prefetch output: {}", e)
mm
})
.collect();

if !matchlist.is_empty() {
let prefetch_output = format!("{}.prefetch.csv", location);
let gather_output = format!("{}.gather.csv", location);

// Save initial list of matches to prefetch output
write_prefetch(
query_filename,
query_name,
query_md5,
Some(prefetch_output),
&matchlist,
)
.ok();

// Now, do the gather!
consume_query_by_gather(
query_sig.clone(),
scaled as u64,
matchlist,
threshold_hashes,
Some(gather_output),
)
.ok();

// Save matching hashes to .sig file if save_matches is true
if save_matches {
if let Some(hashes) = matching_hashes {
let sig_filename = format!("{}.matches.sig", name);
if let Ok(mut file) = File::create(&sig_filename) {
let unique_hashes: HashSet<u64> = hashes.into_iter().collect();
let mut new_mh = KmerMinHash::new(
query_mh.scaled(),
query_mh.ksize().try_into().unwrap(),
query_mh.hash_function().clone(),
query_mh.seed(),
false,
query_mh.num(),
);
new_mh
.add_many(&unique_hashes.into_iter().collect::<Vec<_>>())
.ok();
let mut signature = Signature::default();
signature.push(Sketch::MinHash(new_mh));
signature.set_filename(&name);
if let Err(e) = signature.to_writer(&mut file) {
eprintln!("Error writing signature file: {}", e);
}
}
match std::fs::File::create(&gather_output) {
Ok(_) => {}
Err(e) => eprintln!("Failed to create empty gather output: {}", e),
} else {
eprintln!("Error creating signature file: {}", sig_filename);
}
}
}
} else {
// different warning here? Could not load sig from record??
eprintln!(
"WARNING: no compatible sketches in path '{}'",
record.internal_location()
);
let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst);
println!("No matches to '{}'", location);
if create_empty_results {
let prefetch_output = format!("{}.prefetch.csv", location);
let gather_output = format!("{}.gather.csv", location);
// touch output files
match std::fs::File::create(&prefetch_output) {
Ok(_) => {}
Err(e) => {
eprintln!("Failed to create empty prefetch output: {}", e)
}
}
match std::fs::File::create(&gather_output) {
Ok(_) => {}
Err(e) => eprintln!("Failed to create empty gather output: {}", e),
}
}
}
}
Err(_) => {
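
Reviewer note: the loop above counts processed and failed queries with `AtomicUsize` counters from inside a parallel iterator. The sketch below shows the same counting pattern using only the standard library (scoped threads instead of rayon), under the assumption that each item either loads or fails. `process_all` and the `Option<u32>` stand-in for a query record are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Count processed and failed items across worker threads.
/// `None` models a record whose signature could not be loaded.
fn process_all(items: &[Option<u32>], n_threads: usize) -> (usize, usize) {
    let n_threads = n_threads.max(1);
    let processed = AtomicUsize::new(0);
    let failed = AtomicUsize::new(0);
    // Split the input into roughly equal chunks, one per thread.
    let chunk = ((items.len() + n_threads - 1) / n_threads).max(1);

    thread::scope(|s| {
        for part in items.chunks(chunk) {
            // Share the counters by reference; atomics need no lock.
            let (processed, failed) = (&processed, &failed);
            s.spawn(move || {
                for item in part {
                    processed.fetch_add(1, Ordering::SeqCst);
                    if item.is_none() {
                        failed.fetch_add(1, Ordering::SeqCst);
                    }
                }
            });
        }
    });

    // All threads have joined when the scope ends, so the totals are final.
    (processed.load(Ordering::SeqCst), failed.load(Ordering::SeqCst))
}

fn main() {
    let items = [Some(1), None, Some(3), None, Some(5)];
    let (processed, failed) = process_all(&items, 3);
    println!("processed {} queries, {} failed", processed, failed);
}
```

The totals do not depend on which thread handles which item. The value returned by an individual `fetch_add` does depend on scheduling, so it reflects completion order rather than a stable position in the input.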
54 changes: 42 additions & 12 deletions src/index.rs
@@ -4,6 +4,7 @@ use sourmash::prelude::*;
use std::path::Path;

use crate::utils::{load_collection, ReportType};
use sourmash::collection::{Collection, CollectionSet};

pub fn index<P: AsRef<Path>>(
siglist: String,
@@ -13,24 +14,53 @@ pub fn index<P: AsRef<Path>>(
allow_failed_sigpaths: bool,
use_internal_storage: bool,
) -> Result<(), Box<dyn std::error::Error>> {
println!("Loading siglist");
eprintln!("Loading sketches from {}", siglist);

let collection = load_collection(
let multi = match load_collection(
&siglist,
selection,
ReportType::General,
allow_failed_sigpaths,
)?;
) {
Ok(multi) => multi,
Err(err) => return Err(err.into()),
};
eprintln!("Found {} sketches total.", multi.len());

let mut index = RevIndex::create(
output.as_ref(),
collection.select(selection)?.try_into()?,
colors,
)?;
// Try to convert it into a Collection and then CollectionSet.
let collection = match Collection::try_from(multi.clone()) {
// conversion worked!
Ok(c) => {
let cs: CollectionSet = c.select(selection)?.try_into()?;
Ok(cs)
}
// conversion failed; can we/should we load it into memory?
Err(_) => {
if use_internal_storage {
eprintln!("WARNING: loading all sketches into memory in order to index.");
eprintln!("See 'index' documentation for details.");
let c: Collection = multi.load_all_sigs(selection)?;
let cs: CollectionSet = c.try_into()?;
Ok(cs)
} else {
Err(
anyhow::anyhow!("cannot index this type of collection with external storage")
.into(),
)
}
}
};

if use_internal_storage {
index.internalize_storage()?;
}
match collection {
Ok(collection) => {
eprintln!("Indexing {} sketches.", collection.len());
let mut index = RevIndex::create(output.as_ref(), collection, colors)?;

Ok(())
if use_internal_storage {
index.internalize_storage()?;
}
Ok(())
}
Err(e) => Err(e),
}
}
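
Reviewer note: the new `index` logic first tries a direct conversion to a single `Collection`, and only falls back to loading every sketch into memory when `use_internal_storage` is set. The sketch below models that decision with stand-in types. `Source`, `Indexable`, and `prepare_for_index` are hypothetical names, not sourmash or plugin APIs.

```rust
/// Hypothetical stand-in for the loaded input.
#[derive(Debug, Clone, PartialEq)]
enum Source {
    /// One collection with a single backing storage; can be indexed in place.
    Single(Vec<String>),
    /// Sketches spread over several files; no single backing storage.
    Multi(Vec<Vec<String>>),
}

/// Hypothetical stand-in for what gets handed to the index builder.
#[derive(Debug, PartialEq)]
struct Indexable {
    sketches: Vec<String>,
    in_memory: bool,
}

fn prepare_for_index(src: Source, use_internal_storage: bool) -> Result<Indexable, String> {
    match src {
        // Direct conversion works: keep pointing at the existing storage.
        Source::Single(sketches) => Ok(Indexable { sketches, in_memory: false }),
        // No direct conversion: load everything, but only when the index
        // will store the sketches itself.
        Source::Multi(parts) => {
            if use_internal_storage {
                eprintln!("WARNING: loading all sketches into memory in order to index.");
                Ok(Indexable {
                    sketches: parts.into_iter().flatten().collect(),
                    in_memory: true,
                })
            } else {
                Err("cannot index this type of collection with external storage".to_string())
            }
        }
    }
}

fn main() {
    let multi = Source::Multi(vec![vec!["a".into()], vec!["b".into(), "c".into()]]);
    match prepare_for_index(multi, true) {
        Ok(ix) => println!("Indexing {} sketches.", ix.sketches.len()),
        Err(e) => eprintln!("error: {}", e),
    }
}
```

The external-storage case has to fail because an index that refers back to its source needs one storage location to point at, and a multi-file input does not provide one.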