Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MRG: Use core manifest utils #217

Merged
merged 12 commits into from
Feb 23, 2024
362 changes: 255 additions & 107 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ crate-type = ["cdylib"]
pyo3 = { version = "0.20.2", features = ["extension-module", "anyhow"] }
rayon = "1.8.1"
serde = { version = "1.0.196", features = ["derive"] }
sourmash = { version = "0.12.1", features = ["branchwater"] }
#sourmash = { version = "0.12.1", features = ["branchwater"] }
sourmash = { git="https://github.com/sourmash-bio/sourmash", rev="297ff0b963adbed7462508ea1247fe192be02b98", features = ["branchwater"] }
serde_json = "1.0.113"
niffler = "2.4.0"
log = "0.4.14"
Expand Down
2 changes: 1 addition & 1 deletion src/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub fn check(index: camino::Utf8PathBuf, quick: bool) -> Result<(), Box<dyn std:
}

println!("Opening DB");
let db = RevIndex::open(index, true)?;
let db = RevIndex::open(index, true, None)?;

println!("Starting check");
db.check(quick);
Expand Down
1 change: 1 addition & 0 deletions src/fastgather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::utils::{
ReportType,
};

#[allow(clippy::too_many_arguments)]
pub fn fastgather(
query_filepath: String,
against_filepath: String,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ fn do_manysearch(
}

#[pyfunction]
#[allow(clippy::too_many_arguments)]
fn do_fastgather(
query_filename: String,
siglist_path: String,
Expand Down
57 changes: 22 additions & 35 deletions src/manysketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,8 @@ fn parse_params_str(params_strs: String) -> Result<Vec<Params>, String> {
Ok(unique_params.into_iter().collect())
}

fn build_siginfo(
params: &[Params],
moltype: &str,
name: &str,
filename: &Path,
) -> (Vec<Signature>, Vec<Params>) {
fn build_siginfo(params: &[Params], moltype: &str) -> Vec<Signature> {
let mut sigs = Vec::new();
let mut params_vec = Vec::new();

for param in params.iter().cloned() {
match moltype {
Expand All @@ -112,20 +106,11 @@ fn build_siginfo(
.track_abundance(param.track_abundance)
.build();

// let sig = Signature::from_params(&cp); // cant set name with this
let template = sourmash::cmd::build_template(&cp);
let sig = Signature::builder()
.hash_function("0.murmur64")
.name(Some(name.to_string()))
.filename(Some(filename.to_string()))
.signatures(template)
.build();
let sig = Signature::from_params(&cp);
sigs.push(sig);

params_vec.push(param);
}

(sigs, params_vec)
sigs
}

pub fn manysketch(
Expand All @@ -144,7 +129,7 @@ pub fn manysketch(
bail!("No files to load, exiting.");
}

// if output doesnt end in zip, bail
// if output doesn't end in zip, bail
if Path::new(&output)
.extension()
.map_or(true, |ext| ext != "zip")
Expand Down Expand Up @@ -195,7 +180,7 @@ pub fn manysketch(
}

// build sig templates from params
let (mut sigs, sig_params) = build_siginfo(&params_vec, moltype, name, filename);
let mut sigs = build_siginfo(&params_vec, moltype);
// if no sigs to build, skip
if sigs.is_empty() {
let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst);
Expand All @@ -212,36 +197,38 @@ pub fn manysketch(
}
};
// parse fasta and add to signature
let mut set_name = false;
while let Some(record_result) = reader.next() {
match record_result {
Ok(record) => {
// do we need to normalize to make sure all the bases are consistently capitalized?
// let norm_seq = record.normalize(false);
for sig in &mut sigs {
sigs.iter_mut().for_each(|sig| {
if !set_name {
sig.set_name(name);
sig.set_filename(filename.as_str());
set_name = true;
};
if moltype == "protein" {
sig.add_protein(&record.seq()).unwrap();
sig.add_protein(&record.seq())
.expect("Failed to add protein");
} else {
sig.add_sequence(&record.seq(), true).unwrap();
sig.add_sequence(&record.seq(), true)
.expect("Failed to add sequence");
// if not force, panics with 'N' in dna sequence
}
}
}
Err(err) => {
eprintln!("Error while processing record: {:?}", err);
});
}
Err(err) => eprintln!("Error while processing record: {:?}", err),
}
}
Some((sigs, sig_params, filename))

Some(sigs)
})
.try_for_each_with(
send.clone(),
|s: &mut std::sync::Arc<std::sync::mpsc::SyncSender<ZipMessage>>,
(sigs, sig_params, filename)| {
if let Err(e) = s.send(ZipMessage::SignatureData(
sigs,
sig_params,
filename.clone(),
)) {
|s: &mut std::sync::Arc<std::sync::mpsc::SyncSender<ZipMessage>>, sigs| {
if let Err(e) = s.send(ZipMessage::SignatureData(sigs)) {
Err(format!("Unable to send internal data: {:?}", e))
} else {
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/mastiff_manygather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub fn mastiff_manygather(
bail!("'{}' is not a valid RevIndex database", index);
}
// Open database once
let db = RevIndex::open(index, true)?;
let db = RevIndex::open(index, true, None)?;
println!("Loaded DB");

let query_collection = load_collection(
Expand Down
2 changes: 1 addition & 1 deletion src/mastiff_manysearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub fn mastiff_manysearch(
bail!("'{}' is not a valid RevIndex database", index);
}
// Open database once
let db = RevIndex::open(index, true)?;
let db = RevIndex::open(index, true, None)?;

println!("Loaded DB");

Expand Down
7 changes: 5 additions & 2 deletions src/python/tests/test_sketch.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@ def get_test_data(filename):


def make_file_csv(filename, genome_paths, protein_paths = []):
# equalize path lengths by adding "".
names = [os.path.basename(x).split('.fa')[0] for x in genome_paths]
# Check if the number of protein paths is less than genome paths
# and fill in the missing paths with "".
if len(protein_paths) < len(genome_paths):
protein_paths.extend(["" for _ in range(len(genome_paths) - len(protein_paths))])
elif len(genome_paths) < len(protein_paths):
genome_paths.extend(["" for _ in range(len(protein_paths) - len(genome_paths))])
names = [os.path.basename(x).split('.fa')[0] for x in protein_paths]

with open(filename, 'wt') as fp:
fp.write("name,genome_filename,protein_filename\n")
for name, genome_path, protein_path in zip(names, genome_paths, protein_paths):
Expand Down
90 changes: 11 additions & 79 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/// Utility functions for sourmash_plugin_branchwater.
use rayon::prelude::*;
use sourmash::encodings::HashFunctions;
use sourmash::manifest::Manifest;
use sourmash::selection::Select;

use anyhow::{anyhow, Context, Result};
Expand All @@ -19,12 +18,11 @@ use std::sync::atomic;
use std::sync::atomic::AtomicUsize;

use sourmash::collection::Collection;
use sourmash::manifest::Record;
use sourmash::manifest::{Manifest, Record};
use sourmash::selection::Selection;
use sourmash::signature::{Signature, SigsTrait};
use sourmash::sketch::minhash::KmerMinHash;
use sourmash::storage::{FSStorage, InnerStorage, SigStore};

/// Track a name/minhash.

pub struct SmallSignature {
Expand Down Expand Up @@ -377,11 +375,11 @@ fn collection_from_pathlist(
let n_failed = AtomicUsize::new(0);
let records: Vec<Record> = lines
.par_iter()
.filter_map(|path| match Signature::from_path(&path) {
.filter_map(|path| match Signature::from_path(path) {
Ok(signatures) => {
let recs: Vec<Record> = signatures
.into_iter()
.flat_map(|v| Record::from_sig(&v, &path))
.flat_map(|v| Record::from_sig(&v, path))
.collect();
Some(recs)
}
Expand Down Expand Up @@ -760,47 +758,6 @@ impl Serialize for BoolPython {
}
}

pub fn make_manifest_row(
sig: &Signature,
filename: &Path,
internal_location: &str,
scaled: u64,
num: u32,
abund: bool,
is_dna: bool,
is_protein: bool,
) -> ManifestRow {
if is_dna && is_protein {
panic!("Both is_dna and is_protein cannot be true at the same time.");
} else if !is_dna && !is_protein {
panic!("Either is_dna or is_protein must be true.");
}
let moltype = if is_dna {
"DNA".to_string()
} else {
"protein".to_string()
};
let sketch = &sig.sketches()[0];
let ksize: u32 = if is_dna {
sketch.ksize() as u32
} else {
sketch.ksize() as u32 / 3
};
ManifestRow {
internal_location: internal_location.to_string(),
md5: sig.md5sum(),
md5short: sig.md5sum()[0..8].to_string(),
ksize: ksize,
moltype,
num,
scaled,
n_hashes: sketch.size(),
with_abundance: BoolPython(abund),
name: sig.name().to_string(),
filename: filename.to_string(),
}
}

pub fn open_stdout_or_file(output: Option<String>) -> Box<dyn Write + Send + 'static> {
// if output is a file, use open_output_file
if let Some(path) = output {
Expand Down Expand Up @@ -845,7 +802,7 @@ impl Hash for Params {
}

pub enum ZipMessage {
SignatureData(Vec<Signature>, Vec<Params>, PathBuf),
SignatureData(Vec<Signature>),
WriteManifest,
}

Expand All @@ -863,18 +820,15 @@ pub fn sigwriter(
.compression_method(zip::CompressionMethod::Stored)
.large_file(true);
let mut zip = zip::ZipWriter::new(file_writer);
let mut manifest_rows: Vec<ManifestRow> = Vec::new();
let mut manifest_rows: Vec<Record> = Vec::new();
// keep track of md5sum occurrences to prevent overwriting duplicates
let mut md5sum_occurrences: std::collections::HashMap<String, usize> =
std::collections::HashMap::new();

while let Ok(message) = recv.recv() {
match message {
ZipMessage::SignatureData(sigs, params, filename) => {
if sigs.len() != params.len() {
bail!("Mismatched lengths of signatures and parameters");
}
for (sig, param) in sigs.iter().zip(params.iter()) {
ZipMessage::SignatureData(sigs) => {
for sig in sigs.iter() {
let md5sum_str = sig.md5sum();
let count = md5sum_occurrences.entry(md5sum_str.clone()).or_insert(0);
*count += 1;
Expand All @@ -884,38 +838,16 @@ pub fn sigwriter(
format!("signatures/{}.sig.gz", md5sum_str)
};
write_signature(sig, &mut zip, options, &sig_filename);
manifest_rows.push(make_manifest_row(
sig,
&filename,
&sig_filename,
param.scaled,
param.num,
param.track_abundance,
param.is_dna,
param.is_protein,
));
let records: Vec<Record> = Record::from_sig(sig, sig_filename.as_str());
manifest_rows.extend(records);
}
}
ZipMessage::WriteManifest => {
println!("Writing manifest");
// Start the CSV file inside the zip
zip.start_file("SOURMASH-MANIFEST.csv", options).unwrap();
// write manifest version line
writeln!(&mut zip, "# SOURMASH-MANIFEST-VERSION: 1.0").unwrap();
// scoped block for csv writing
{
let mut csv_writer = Writer::from_writer(&mut zip);

for row in &manifest_rows {
if let Err(e) = csv_writer.serialize(row) {
eprintln!("Error writing item: {:?}", e);
}
}
// CSV writer must be manually flushed to ensure all data is written
if let Err(e) = csv_writer.flush() {
eprintln!("Error flushing CSV writer: {:?}", e);
}
} // drop csv writer here
let manifest: Manifest = manifest_rows.clone().into();
manifest.to_writer(&mut zip)?;

// Properly finish writing to the ZIP file
if let Err(e) = zip.finish() {
Expand Down
Loading