Skip to content

Commit

Permalink
MRG: update zip crate to 2.0 (#385)
Browse files Browse the repository at this point in the history
Required changes for zip crate:
- update zip from 0.6 to 2.0 (NOT 2.3.1, as versions 2.1+ have a bug with large files)
- change `FileOptions` to reflect altered structure in 2.x

Simplifications:
- simplify zip writing to remove Arc and ZipMessage
> Previously, we needed to keep track of additional information for all signatures in order to be able to write the manifest. Since switching to sourmash core's manifest writing (#217), we no longer need to keep track of all signature information, so we can remove this overly complicated implementation.

* try to upgrade zip. Simplify now that we have manifest writing from sourmash core

* revert sigwriter simplifications

* use deflated instead of stored

* try adding clear_extra_data

* use zip 2.0 to avoid bug with large_files

* clean up

* tell dependabot to ignore zip crate for now
  • Loading branch information
bluegenes authored Jul 11, 2024
1 parent 04903db commit fb378be
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 42 deletions.
2 changes: 2 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ updates:
allow:
- dependency-type: "direct"
open-pull-requests-limit: 10
ignore:
- dependency-name: "zip"
- package-ecosystem: "github-actions"
directory: "/"
schedule:
Expand Down
47 changes: 40 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ log = "0.4.22"
env_logger = { version = "0.11.3", optional = true }
simple-error = "0.3.1"
anyhow = "1.0.86"
zip = { version = "0.6", default-features = false, features = ["deflate"] }
zip = { version = "2.0", default-features = false }
tempfile = "3.10"
needletail = "0.5.1"
csv = "1.3.0"
Expand Down
23 changes: 11 additions & 12 deletions src/manysketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use anyhow::{anyhow, Result};
use rayon::prelude::*;

use crate::utils::{load_fasta_fromfile, sigwriter, Params, ZipMessage};
use crate::utils::{load_fasta_fromfile, sigwriter, Params};
use camino::Utf8Path as Path;
use needletail::parse_fastx_file;
use sourmash::cmd::ComputeParameters;
Expand Down Expand Up @@ -139,9 +139,10 @@ pub fn manysketch(
}

// set up a multi-producer, single-consumer channel that receives Signature
let (send, recv) = std::sync::mpsc::sync_channel::<ZipMessage>(rayon::current_num_threads());
let (send, recv) =
std::sync::mpsc::sync_channel::<Option<Vec<Signature>>>(rayon::current_num_threads());
// need to use Arc so we can write the manifest after all sigs have written
let send = std::sync::Arc::new(send);
// let send = std::sync::Arc::new(send);

// & spawn a thread that is dedicated to printing to a buffered output
let thrd = sigwriter(recv, output);
Expand Down Expand Up @@ -243,7 +244,7 @@ pub fn manysketch(
}
if singleton {
// write sigs immediately to avoid memory issues
if let Err(e) = send.send(ZipMessage::SignatureData(sigs.clone())) {
if let Err(e) = send.send(Some(sigs.clone())) {
eprintln!("Unable to send internal data: {:?}", e);
return None;
}
Expand All @@ -260,21 +261,19 @@ pub fn manysketch(
})
.try_for_each_with(
send.clone(),
|s: &mut std::sync::Arc<std::sync::mpsc::SyncSender<ZipMessage>>, sigs| {
if let Err(e) = s.send(ZipMessage::SignatureData(sigs)) {
|s: &mut std::sync::mpsc::SyncSender<Option<Vec<Signature>>>, sigs| {
if let Err(e) = s.send(Some(sigs)) {
Err(format!("Unable to send internal data: {:?}", e))
} else {
Ok(())
}
},
);

// After the parallel work, send the WriteManifest message
std::sync::Arc::try_unwrap(send)
.unwrap()
.send(ZipMessage::WriteManifest)
.unwrap();

// Send None to sigwriter to signal completion + write manifest
if let Err(e) = send.send(None) {
eprintln!("Unable to send completion signal: {:?}", e);
}
// do some cleanup and error handling -
if let Err(e) = send_result {
eprintln!("Error during parallel processing: {}", e);
Expand Down
41 changes: 19 additions & 22 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::io::{BufRead, BufReader, BufWriter, Write};
use std::panic;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;
use zip::write::{ExtendedFileOptions, FileOptions, ZipWriter};
use zip::CompressionMethod;

use sourmash::ani_utils::{ani_ci_from_containment, ani_from_containment};
use sourmash::collection::Collection;
Expand Down Expand Up @@ -1277,32 +1279,30 @@ impl Hash for Params {
}
}

pub enum ZipMessage {
SignatureData(Vec<Signature>),
WriteManifest,
}

pub fn sigwriter(
recv: std::sync::mpsc::Receiver<ZipMessage>,
recv: std::sync::mpsc::Receiver<Option<Vec<Signature>>>,
output: String,
) -> std::thread::JoinHandle<Result<()>> {
std::thread::spawn(move || -> Result<()> {
// cast output as pathbuf
// cast output as PathBuf
let outpath: PathBuf = output.into();

let file_writer = open_output_file(&outpath);

let options = zip::write::FileOptions::default()
.compression_method(zip::CompressionMethod::Stored)
let options = FileOptions::default()
.compression_method(CompressionMethod::Stored)
.unix_permissions(0o644)
.large_file(true);
let mut zip = zip::ZipWriter::new(file_writer);

let mut zip = ZipWriter::new(file_writer);
let mut manifest_rows: Vec<Record> = Vec::new();
// keep track of md5sum occurrences to prevent overwriting duplicates
// keep track of MD5 sum occurrences to prevent overwriting duplicates
let mut md5sum_occurrences: HashMap<String, usize> = HashMap::new();

// Process all incoming signatures
while let Ok(message) = recv.recv() {
match message {
ZipMessage::SignatureData(sigs) => {
Some(sigs) => {
for sig in sigs.iter() {
let md5sum_str = sig.md5sum();
let count = md5sum_occurrences.entry(md5sum_str.clone()).or_insert(0);
Expand All @@ -1312,22 +1312,19 @@ pub fn sigwriter(
} else {
format!("signatures/{}.sig.gz", md5sum_str)
};
write_signature(sig, &mut zip, options, &sig_filename);
write_signature(sig, &mut zip, options.clone(), &sig_filename);
let records: Vec<Record> = Record::from_sig(sig, sig_filename.as_str());
manifest_rows.extend(records);
}
}
ZipMessage::WriteManifest => {
None => {
// Write the manifest and finish the ZIP file
println!("Writing manifest");
// Start the CSV file inside the zip
zip.start_file("SOURMASH-MANIFEST.csv", options).unwrap();
zip.start_file("SOURMASH-MANIFEST.csv", options)?;
let manifest: Manifest = manifest_rows.clone().into();
manifest.to_writer(&mut zip)?;

// Properly finish writing to the ZIP file
if let Err(e) = zip.finish() {
eprintln!("Error finalizing ZIP file: {:?}", e);
}
zip.finish()?;
break;
}
}
}
Expand Down Expand Up @@ -1357,7 +1354,7 @@ pub fn csvwriter_thread<T: Serialize + Send + 'static>(
pub fn write_signature(
sig: &Signature,
zip: &mut zip::ZipWriter<BufWriter<File>>,
zip_options: zip::write::FileOptions,
zip_options: zip::write::FileOptions<ExtendedFileOptions>,
sig_filename: &str,
) {
let wrapped_sig = vec![sig];
Expand Down

0 comments on commit fb378be

Please sign in to comment.