Skip to content

Commit

Permalink
Create zip files lazily, only when there are signatures to write, to avoid initializing empty zips
Browse files — browse the repository at this point in the history
  • Loading branch information
bluegenes committed Oct 1, 2024
1 parent 367b38d commit 7f9e418
Showing 1 changed file with 58 additions and 60 deletions.
118 changes: 58 additions & 60 deletions src/directsketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,90 +570,88 @@ pub fn zipwriter_handle(
let mut zip_manifest = BuildManifest::new();
let mut wrote_sigs = false;
let mut file_count = 0; // Count of files in the current batch
let mut zip_writer = None;

if let Some(outpath) = output_sigs {
let outpath: PathBuf = outpath.into();

// Create the initial zip file
let mut zip_writer =
match create_or_get_zip_file(&outpath, batch_size, batch_index).await {
Ok(writer) => writer,
Err(e) => {
let _ = error_sender.send(e).await;
return;
}
};

while let Some(mut multibuildcoll) = recv_sigs.recv().await {
// write all sigs from sigcoll. Note that this method updates each record's internal location
for sigcoll in &mut multibuildcoll.collections {
match sigcoll
.async_write_sigs_to_zip(&mut zip_writer, &mut md5sum_occurrences)
.await
{
Ok(_) => {
file_count += sigcoll.size();
wrote_sigs = true;
}
Err(e) => {
let error = e.context("Error processing signature");
if error_sender.send(error).await.is_err() {
if zip_writer.is_none() {
// create zip file if needed
zip_writer =
match create_or_get_zip_file(&outpath, batch_size, batch_index).await {
Ok(writer) => Some(writer),
Err(e) => {
let _ = error_sender.send(e).await;
return;
}
};
}

if let Some(zip_writer) = zip_writer.as_mut() {
// write all sigs from sigcoll. Note that this method updates each record's internal location
for sigcoll in &mut multibuildcoll.collections {
match sigcoll
.async_write_sigs_to_zip(zip_writer, &mut md5sum_occurrences)
.await
{
Ok(_) => {
file_count += sigcoll.size();
wrote_sigs = true;
}
Err(e) => {
let error = e.context("Error processing signature");
if error_sender.send(error).await.is_err() {
return;
}
}
}
// Add all records from sigcoll manifest
zip_manifest.extend_from_manifest(&sigcoll.manifest);
}
// add all records from sigcoll manifest
zip_manifest.extend_from_manifest(&sigcoll.manifest);
file_count += sigcoll.size();
}

// If batch size is non-zero and is reached, close the current ZIP and start a new one
// if batch size is non-zero and is reached, close the current zip
if batch_size > 0 && file_count >= batch_size {
eprintln!("writing batch {}", batch_index);
if let Some(mut zip_writer) = zip_writer.take() {
if let Err(e) = zip_manifest
.async_write_manifest_to_zip(&mut zip_writer)
.await
{
let _ = error_sender.send(e).await;
}
if let Err(e) = zip_writer.close().await {
let error = anyhow::Error::new(e).context("Failed to close ZIP file");
let _ = error_sender.send(error).await;
return;
}
}
// Start a new batch
batch_index += 1;
file_count = 0;
zip_manifest.clear();
zip_writer = None; // reset zip_writer so a new zip will be created when needed
}
}

if file_count > 0 {
// write the final manifest
if let Some(mut zip_writer) = zip_writer.take() {
if let Err(e) = zip_manifest
.async_write_manifest_to_zip(&mut zip_writer)
.await
{
let _ = error_sender.send(e).await;
}

// close final zip file
if let Err(e) = zip_writer.close().await {
let error = anyhow::Error::new(e).context("Failed to close ZIP file");
let _ = error_sender.send(error).await;
return;
}
// start a new batch
batch_index += 1;
file_count = 0;
zip_manifest.clear();
zip_writer =
match create_or_get_zip_file(&outpath, batch_size, batch_index).await {
Ok(writer) => writer,
Err(e) => {
let _ = error_sender.send(e).await;
return;
}
};
}
}

if file_count > 0 {
// Write the final manifest
if let Err(e) = zip_manifest
.async_write_manifest_to_zip(&mut zip_writer)
.await
{
let _ = error_sender.send(e).await;
}

// Close the zip file for the final batch
if let Err(e) = zip_writer.close().await {
let error = anyhow::Error::new(e).context("Failed to close ZIP file");
let _ = error_sender.send(error).await;
return;
}
} else {
// to do -- can we avoid this happening?
eprintln!("Empty final batch! Please delete batch: {}", batch_index);
}
if !wrote_sigs {
// If no signatures were written at all
Expand Down Expand Up @@ -847,7 +845,7 @@ pub async fn gbsketch(

batch_index = max_existing_batch_index + 1;
eprintln!(
"Found {} existing zip batches. Starting new sig writing at batch {}",
"Found {} existing zip batch(es). Starting new sig writing at batch {}",
max_existing_batch_index, batch_index
);
filter = true;
Expand Down

0 comments on commit 7f9e418

Please sign in to comment.