From 7f9e4185c3e3f39ce9ccaee1d3cb2e95e1ba79fe Mon Sep 17 00:00:00 2001 From: "N. Tessa Pierce-Ward" Date: Tue, 1 Oct 2024 11:03:50 -0700 Subject: [PATCH] create zips when needed to avoid initializing if no sigs to write --- src/directsketch.rs | 118 ++++++++++++++++++++++---------------------- 1 file changed, 58 insertions(+), 60 deletions(-) diff --git a/src/directsketch.rs b/src/directsketch.rs index 0d0e298..9be54fa 100644 --- a/src/directsketch.rs +++ b/src/directsketch.rs @@ -570,90 +570,88 @@ pub fn zipwriter_handle( let mut zip_manifest = BuildManifest::new(); let mut wrote_sigs = false; let mut file_count = 0; // Count of files in the current batch + let mut zip_writer = None; if let Some(outpath) = output_sigs { let outpath: PathBuf = outpath.into(); - // Create the initial zip file - let mut zip_writer = - match create_or_get_zip_file(&outpath, batch_size, batch_index).await { - Ok(writer) => writer, - Err(e) => { - let _ = error_sender.send(e).await; - return; - } - }; - while let Some(mut multibuildcoll) = recv_sigs.recv().await { - // write all sigs from sigcoll. Note that this method updates each record's internal location - for sigcoll in &mut multibuildcoll.collections { - match sigcoll - .async_write_sigs_to_zip(&mut zip_writer, &mut md5sum_occurrences) - .await - { - Ok(_) => { - file_count += sigcoll.size(); - wrote_sigs = true; - } - Err(e) => { - let error = e.context("Error processing signature"); - if error_sender.send(error).await.is_err() { + if zip_writer.is_none() { + // create zip file if needed + zip_writer = + match create_or_get_zip_file(&outpath, batch_size, batch_index).await { + Ok(writer) => Some(writer), + Err(e) => { + let _ = error_sender.send(e).await; return; } + }; + } + + if let Some(zip_writer) = zip_writer.as_mut() { + // write all sigs from sigcoll. Note that this method updates each record's internal location + for sigcoll in &mut multibuildcoll.collections { + match sigcoll + .async_write_sigs_to_zip(zip_writer, &mut md5sum_occurrences) + .await + { + Ok(_) => { + file_count += sigcoll.size(); + wrote_sigs = true; + } + Err(e) => { + let error = e.context("Error processing signature"); + if error_sender.send(error).await.is_err() { + return; + } + } } + // Add all records from sigcoll manifest + zip_manifest.extend_from_manifest(&sigcoll.manifest); } - // add all records from sigcoll manifest - zip_manifest.extend_from_manifest(&sigcoll.manifest); - file_count += sigcoll.size(); } - // If batch size is non-zero and is reached, close the current ZIP and start a new one + // if batch size is non-zero and is reached, close the current zip if batch_size > 0 && file_count >= batch_size { eprintln!("writing batch {}", batch_index); + if let Some(mut zip_writer) = zip_writer.take() { + if let Err(e) = zip_manifest + .async_write_manifest_to_zip(&mut zip_writer) + .await + { + let _ = error_sender.send(e).await; + } + if let Err(e) = zip_writer.close().await { + let error = anyhow::Error::new(e).context("Failed to close ZIP file"); + let _ = error_sender.send(error).await; + return; + } + } + // Start a new batch + batch_index += 1; + file_count = 0; + zip_manifest.clear(); + zip_writer = None; // reset zip_writer so a new zip will be created when needed + } + } + + if file_count > 0 { + // write the final manifest + if let Some(mut zip_writer) = zip_writer.take() { if let Err(e) = zip_manifest .async_write_manifest_to_zip(&mut zip_writer) .await { let _ = error_sender.send(e).await; } + + // close final zip file if let Err(e) = zip_writer.close().await { let error = anyhow::Error::new(e).context("Failed to close ZIP file"); let _ = error_sender.send(error).await; return; } - // start a new batch - batch_index += 1; - file_count = 0; - zip_manifest.clear(); - zip_writer = - match create_or_get_zip_file(&outpath, batch_size, batch_index).await { - Ok(writer) => writer, - Err(e) => { - let _ = error_sender.send(e).await; - return; - } - }; - } - } - - if file_count > 0 { - // Write the final manifest - if let Err(e) = zip_manifest - .async_write_manifest_to_zip(&mut zip_writer) - .await - { - let _ = error_sender.send(e).await; - } - - // Close the zip file for the final batch - if let Err(e) = zip_writer.close().await { - let error = anyhow::Error::new(e).context("Failed to close ZIP file"); - let _ = error_sender.send(error).await; - return; } - } else { - // to do -- can we avoid this happening? - eprintln!("Empty final batch! Please delete batch: {}", batch_index); } if !wrote_sigs { // If no signatures were written at all @@ -847,7 +845,7 @@ pub async fn gbsketch( batch_index = max_existing_batch_index + 1; eprintln!( - "Found {} existing zip batches. Starting new sig writing at batch {}", + "Found {} existing zip batch(es). Starting new sig writing at batch {}", max_existing_batch_index, batch_index ); filter = true;