Skip to content

Commit

Permalink
harmonize error text and output filenames
Browse files (browse the repository at this point in the history)
  • Loading branch information
bluegenes committed Jan 27, 2024
1 parent 4e3b7ee commit 14ee1bd
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 102 deletions.
115 changes: 67 additions & 48 deletions src/fastmultigather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,29 @@
use anyhow::Result;
use rayon::prelude::*;

use serde::Serialize;
use sourmash::selection::Selection;
use sourmash::sketch::Sketch;
use sourmash::storage::SigStore;
use sourmash::{selection, signature::Signature};
use sourmash::sketch::Sketch;
use sourmash::selection::Selection;

use std::sync::atomic;
use std::sync::atomic::AtomicUsize;

use std::collections::BinaryHeap;

use camino::Utf8PathBuf;
use camino::{Utf8Path, Utf8PathBuf};

use crate::utils::{
consume_query_by_gather, load_collection, load_sigpaths_from_zip_or_pathlist, load_sketches_from_zip_or_pathlist, prepare_query, write_prefetch, PrefetchResult, ReportType
consume_query_by_gather, load_collection, load_sigpaths_from_zip_or_pathlist,
load_sketches_from_zip_or_pathlist, prepare_query, write_prefetch, PrefetchResult, ReportType,
};

pub fn fastmultigather(
query_filepath: camino::Utf8PathBuf,
against_filepath: camino::Utf8PathBuf,
threshold_bp: usize,
scaled: usize,
// template: Sketch,
selection: &Selection,
) -> Result<()> {
// load the list of query paths
Expand Down Expand Up @@ -63,58 +64,76 @@ pub fn fastmultigather(

query_collection.par_iter().for_each(|(idx, record)| {
// increment counter of # of queries. q: could we instead use the index from par_iter()?
let _i = processed_queries.fetch_add(1, atomic::Ordering::SeqCst);
let _i = processed_queries.fetch_add(1, atomic::Ordering::SeqCst);
// Load query sig
match query_collection.sig_for_dataset(idx) {
Ok(query_sig) => {
let location = query_sig.filename();
for sketch in query_sig.iter() {
// Access query MinHash
if let Sketch::MinHash(query) = sketch {
let matchlist: BinaryHeap<PrefetchResult> = sketchlist
.par_iter()
.filter_map(|sm| {
let mut mm = None;
// Access against MinHash
if let Some(sketch) = sm.sketches().get(0) {
if let Sketch::MinHash(against_sketch) = sketch {
if let Ok(overlap) = against_sketch.count_common(&query, false) {
if overlap >= threshold_hashes {
let result = PrefetchResult {
name: sm.name(),
md5sum: sm.md5sum().clone(),
minhash: against_sketch.clone(),
overlap,
};
mm = Some(result);
match query_collection.sig_for_dataset(idx) {
Ok(query_sig) => {
let prefix = query_sig.name();
let location = Utf8Path::new(&prefix).file_name().unwrap();
for sketch in query_sig.iter() {
// Access query MinHash
if let Sketch::MinHash(query) = sketch {
let matchlist: BinaryHeap<PrefetchResult> = sketchlist
.par_iter()
.filter_map(|sm| {
let mut mm = None;
// Access against MinHash
if let Some(sketch) = sm.sketches().get(0) {
if let Sketch::MinHash(against_sketch) = sketch {
if let Ok(overlap) =
against_sketch.count_common(&query, true)
{
if overlap >= threshold_hashes {
let result = PrefetchResult {
name: sm.name(),
md5sum: sm.md5sum().clone(),
minhash: against_sketch.clone(),
overlap,
};
mm = Some(result);
}
}
}
}
}
mm
})
.collect();
if !matchlist.is_empty() {
let prefetch_output = format!("{}.prefetch.csv", location);
let gather_output = format!("{}.gather.csv", location);

// Save initial list of matches to prefetch output
write_prefetch(&query_sig, Some(prefetch_output), &matchlist).ok();

// Now, do the gather!
consume_query_by_gather(query_sig.clone(), matchlist, threshold_hashes, Some(gather_output)).ok();
mm
})
.collect();
if !matchlist.is_empty() {
let prefetch_output = format!("{}.prefetch.csv", location);
let gather_output = format!("{}.gather.csv", location);

// Save initial list of matches to prefetch output
write_prefetch(&query_sig, Some(prefetch_output), &matchlist).ok();

// Now, do the gather!
consume_query_by_gather(
query_sig.clone(),
matchlist,
threshold_hashes,
Some(gather_output),
)
.ok();
} else {
println!("No matches to '{}'", location);
}
} else {
println!("No matches to '{}'", location);
eprintln!(
"WARNING: no compatible sketches in path '{}'",
record.internal_location()
);
let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst);
}
}
}
Err(_) => {
eprintln!(
"WARNING: no compatible sketches in path '{}'",
record.internal_location()
);
let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst);
}
}
Err(_) => {
eprintln!("WARNING: no compatible sketches in path '{}'", record.internal_location());
let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst);
}
}
});
});

println!(
"Processed {} queries total.",
Expand Down
55 changes: 23 additions & 32 deletions src/python/tests/test_multigather.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ def test_simple(runtmp, zip_against):

print(os.listdir(runtmp.output('')))

g_output = runtmp.output('SRR606249.sig.gz.gather.csv')
p_output = runtmp.output('SRR606249.sig.gz.prefetch.csv')
g_output = runtmp.output('SRR606249.gather.csv')
p_output = runtmp.output('SRR606249.prefetch.csv')
assert os.path.exists(p_output)

# check prefetch output (only non-indexed gather)
Expand All @@ -79,6 +79,7 @@ def test_simple(runtmp, zip_against):

assert os.path.exists(g_output)
df = pandas.read_csv(g_output)
print(df)
assert len(df) == 3
keys = set(df.keys())
assert keys == {'query_filename', 'query_name', 'query_md5', 'match_name', 'match_md5', 'rank', 'intersect_bp'}
Expand Down Expand Up @@ -109,9 +110,8 @@ def test_simple_zip_query(runtmp):

print(os.listdir(runtmp.output('')))

# outputs are based on md5sum, e.g. "{md5}.sig.gz.gather.csv"
g_output = runtmp.output('dec29ca72e68db0f15de0b1b46f82fc5.sig.gz.gather.csv')
p_output = runtmp.output('dec29ca72e68db0f15de0b1b46f82fc5.sig.gz.prefetch.csv')
g_output = runtmp.output('SRR606249.gather.csv')
p_output = runtmp.output('SRR606249.prefetch.csv')

# check prefetch output (only non-indexed gather)
assert os.path.exists(p_output)
Expand Down Expand Up @@ -294,10 +294,7 @@ def test_nomatch_query(runtmp, capfd, indexed, zip_query):
captured = capfd.readouterr()
print(captured.err)

if zip_query:
assert "WARNING: no compatible sketches in path " not in captured.err
else:
assert "WARNING: no compatible sketches in path " in captured.err
# assert "WARNING: no compatible sketches in path " in captured.err
assert "WARNING: skipped 1 query paths - no compatible signatures." in captured.err


Expand All @@ -324,7 +321,7 @@ def test_missing_against(runtmp, capfd, zip_against):
captured = capfd.readouterr()
print(captured.err)

assert 'Error: No such file or directory ' in captured.err
assert 'Error: No such file or directory' in captured.err


def test_bad_against(runtmp, capfd):
Expand All @@ -341,7 +338,7 @@ def test_bad_against(runtmp, capfd):
captured = capfd.readouterr()
print(captured.err)

assert 'Error: invalid line in fromfile ' in captured.err
assert 'Error: invalid line in fromfile' in captured.err


def test_bad_against_2(runtmp, capfd):
Expand Down Expand Up @@ -390,7 +387,7 @@ def test_bad_against_3(runtmp, capfd, zip_query):
captured = capfd.readouterr()
print(captured.err)

assert 'Error: invalid Zip archive: Could not find central directory end' in captured.err
assert 'InvalidArchive' in captured.err


def test_empty_against(runtmp, capfd):
Expand All @@ -409,7 +406,7 @@ def test_empty_against(runtmp, capfd):
captured = capfd.readouterr()
print(captured.err)

assert "Loaded 0 search signature(s)" in captured.err
assert "Sketch loading error: No such file or directory" in captured.err
assert "Error: No search signatures loaded, exiting." in captured.err


Expand Down Expand Up @@ -465,11 +462,8 @@ def test_md5(runtmp, zip_query):

print(os.listdir(runtmp.output('')))

g_output = runtmp.output('SRR606249.sig.gz.gather.csv')
p_output = runtmp.output('SRR606249.sig.gz.prefetch.csv')
if zip_query:
g_output = runtmp.output('dec29ca72e68db0f15de0b1b46f82fc5.sig.gz.gather.csv')
p_output = runtmp.output('dec29ca72e68db0f15de0b1b46f82fc5.sig.gz.prefetch.csv')
g_output = runtmp.output('SRR606249.gather.csv')
p_output = runtmp.output('SRR606249.prefetch.csv')

# check prefetch output (only non-indexed gather)
assert os.path.exists(p_output)
Expand Down Expand Up @@ -560,11 +554,8 @@ def test_csv_columns_vs_sourmash_prefetch(runtmp, zip_query, zip_against):
finally:
os.chdir(cwd)

g_output = runtmp.output('SRR606249.sig.gz.gather.csv')
p_output = runtmp.output('SRR606249.sig.gz.prefetch.csv')
if zip_query:
g_output = runtmp.output('dec29ca72e68db0f15de0b1b46f82fc5.sig.gz.gather.csv')
p_output = runtmp.output('dec29ca72e68db0f15de0b1b46f82fc5.sig.gz.prefetch.csv')
g_output = runtmp.output('SRR606249.gather.csv')
p_output = runtmp.output('SRR606249.prefetch.csv')

assert os.path.exists(p_output)
assert os.path.exists(g_output)
Expand Down Expand Up @@ -627,14 +618,14 @@ def test_simple_protein(runtmp):
# test basic protein execution
sigs = get_test_data('protein.zip')

sig_names = ["GCA_001593935.1_ASM159393v1_protein.faa.gz", "GCA_001593925.1_ASM159392v1_protein.faa.gz"]
sig_names = ["GCA_001593935", "GCA_001593925"]

runtmp.sourmash('scripts', 'fastmultigather', sigs, sigs,
'-s', '100', '--moltype', 'protein', '-k', '19')

for qsig in sig_names:
g_output = runtmp.output(os.path.join(qsig + '.sig.gather.csv'))
p_output = runtmp.output(os.path.join(qsig + '.sig.prefetch.csv'))
g_output = runtmp.output(os.path.join(qsig + '.gather.csv'))
p_output = runtmp.output(os.path.join(qsig + '.prefetch.csv'))
print(g_output)
assert os.path.exists(g_output)
assert os.path.exists(p_output)
Expand All @@ -652,14 +643,14 @@ def test_simple_dayhoff(runtmp):
# test basic protein execution
sigs = get_test_data('dayhoff.zip')

sig_names = ["GCA_001593935.1_ASM159393v1_protein.faa.gz", "GCA_001593925.1_ASM159392v1_protein.faa.gz"]
sig_names = ["GCA_001593935", "GCA_001593925"]

runtmp.sourmash('scripts', 'fastmultigather', sigs, sigs,
'-s', '100', '--moltype', 'dayhoff', '-k', '19')

for qsig in sig_names:
g_output = runtmp.output(os.path.join(qsig + '.sig.gather.csv'))
p_output = runtmp.output(os.path.join(qsig + '.sig.prefetch.csv'))
g_output = runtmp.output(os.path.join(qsig + '.gather.csv'))
p_output = runtmp.output(os.path.join(qsig + '.prefetch.csv'))
print(g_output)
assert os.path.exists(g_output)
assert os.path.exists(p_output)
Expand All @@ -677,14 +668,14 @@ def test_simple_hp(runtmp):
# test basic protein execution
sigs = get_test_data('hp.zip')

sig_names = ["GCA_001593935.1_ASM159393v1_protein.faa.gz", "GCA_001593925.1_ASM159392v1_protein.faa.gz"]
sig_names = ["GCA_001593935", "GCA_001593925"]

runtmp.sourmash('scripts', 'fastmultigather', sigs, sigs,
'-s', '100', '--moltype', 'hp', '-k', '19')

for qsig in sig_names:
g_output = runtmp.output(os.path.join(qsig + '.sig.gather.csv'))
p_output = runtmp.output(os.path.join(qsig + '.sig.prefetch.csv'))
g_output = runtmp.output(os.path.join(qsig + '.gather.csv'))
p_output = runtmp.output(os.path.join(qsig + '.prefetch.csv'))
print(g_output)
assert os.path.exists(g_output)
assert os.path.exists(p_output)
Expand Down
Loading

0 comments on commit 14ee1bd

Please sign in to comment.