diff --git a/src/encrypt.rs b/src/encrypt.rs
index 46010fb4c..dc69b8295 100644
--- a/src/encrypt.rs
+++ b/src/encrypt.rs
@@ -8,17 +8,18 @@
 
 use crate::{
     chunk::{EncryptionBatch, RawChunk},
-    data_map::ChunkInfo,
+    data_map::DataMap,
     encryption::{self, Iv, Key, Pad},
-    error::{Error, Result},
-    get_pad_key_and_iv, xor, DataMap, EncryptedChunk, COMPRESSION_QUALITY,
+    error::Error,
+    utils::{get_pad_key_and_iv, xor},
+    ChunkInfo, EncryptedChunk, Result, COMPRESSION_QUALITY,
 };
+
 use brotli::enc::BrotliEncoderParams;
 use bytes::Bytes;
 use itertools::Itertools;
 use rayon::prelude::*;
-use std::io::Cursor;
-use std::sync::Arc;
+use std::{io::Cursor, sync::Arc};
 use xor_name::XorName;
 
 /// Encrypt the chunks
@@ -104,3 +105,91 @@ pub(crate) fn encrypt_chunk(content: Bytes, pki: (Pad, Key, Iv)) -> Result<Bytes> {
+
+/// Encrypt all chunks in one pass, ordering the work so that each chunk's
+/// pad, key and IV can be derived from the full list of source hashes.
+pub(crate) fn encrypt_stream(
+    chunks: Vec<RawChunk>,
+) -> Result<(DataMap, Vec<EncryptedChunk>)> {
+    // Collect the source hashes of all chunks, in index order
+    let src_hashes: Vec<_> = chunks.iter().map(|c| c.hash).collect();
+
+    // First, process chunks 2 onwards in parallel since they only need their previous two hashes
+    let later_chunks: Vec<_> = chunks.iter().skip(2).collect();
+    let (mut keys, mut encrypted_chunks): (Vec<ChunkInfo>, Vec<EncryptedChunk>) = later_chunks
+        .into_par_iter()
+        .map(|chunk| {
+            let RawChunk { index, data, hash } = chunk;
+            let src_size = data.len();
+
+            let pki = get_pad_key_and_iv(*index, &src_hashes);
+            let encrypted_content = encrypt_chunk(data.clone(), pki)?;
+            let dst_hash = XorName::from_content(encrypted_content.as_ref());
+
+            Ok((
+                ChunkInfo {
+                    index: *index,
+                    dst_hash,
+                    src_hash: *hash,
+                    src_size,
+                },
+                EncryptedChunk {
+                    content: encrypted_content,
+                },
+            ))
+        })
+        .collect::<Result<Vec<_>>>()?
+        .into_iter()
+        .unzip();
+
+    // Process chunk 1 (needs hash 0 and the last hash)
+    let chunk = &chunks[1];
+    let pki = get_pad_key_and_iv(1, &src_hashes);
+    let encrypted_content = encrypt_chunk(chunk.data.clone(), pki)?;
+    let dst_hash = XorName::from_content(encrypted_content.as_ref());
+
+    // Insert at the beginning since this is chunk 1
+    keys.insert(
+        0,
+        ChunkInfo {
+            index: 1,
+            dst_hash,
+            src_hash: chunk.hash,
+            src_size: chunk.data.len(),
+        },
+    );
+    encrypted_chunks.insert(
+        0,
+        EncryptedChunk {
+            content: encrypted_content,
+        },
+    );
+
+    // Process chunk 0 (needs the last two hashes)
+    let chunk = &chunks[0];
+    let pki = get_pad_key_and_iv(0, &src_hashes);
+    let encrypted_content = encrypt_chunk(chunk.data.clone(), pki)?;
+    let dst_hash = XorName::from_content(encrypted_content.as_ref());
+
+    // Insert at the beginning since this is chunk 0
+    keys.insert(
+        0,
+        ChunkInfo {
+            index: 0,
+            dst_hash,
+            src_hash: chunk.hash,
+            src_size: chunk.data.len(),
+        },
+    );
+    encrypted_chunks.insert(
+        0,
+        EncryptedChunk {
+            content: encrypted_content,
+        },
+    );
+
+    Ok((DataMap::new(keys), encrypted_chunks))
+}
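Why this ordering: each chunk's pad, key and IV are derived (via get_pad_key_and_iv) from the hashes of other chunks, so nothing can be encrypted until every source hash has been collected. Below is a minimal, illustrative sketch of the wrap-around neighbour selection implied by the comments in encrypt_stream; the crate's real derivation lives in utils::get_pad_key_and_iv, and how the selected hashes are folded into a Pad, Key and Iv is deliberately left out as an assumption:

use xor_name::XorName;

// Sketch only: which two hashes chunk `index` depends on, per the comments in
// encrypt_stream. Chunk k uses the hashes of chunks k-1 and k-2, wrapping at
// the start: chunk 1 pairs with chunks 0 and n-1, chunk 0 with n-1 and n-2.
fn neighbour_hashes(index: usize, src_hashes: &[XorName]) -> (XorName, XorName) {
    let n = src_hashes.len();
    (
        src_hashes[(index + n - 1) % n],
        src_hashes[(index + n - 2) % n],
    )
}

This is also why chunks 2 onwards can be encrypted in parallel straight away, while chunks 1 and 0 are finished afterwards and inserted at the front so the resulting DataMap stays in index order.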
diff --git a/src/lib.rs b/src/lib.rs
index e2f45ac0d..9e848a41e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -530,59 +530,33 @@ where
         ));
     }
 
-    // Read the entire file into memory
-    let mut data = Vec::with_capacity(file_size);
+    // Create a buffered reader with a reasonable buffer size
     let mut reader = BufReader::with_capacity(1024 * 1024, file);
-    let bytes_read = reader.read_to_end(&mut data)?;
-    if bytes_read != file_size {
-        return Err(Error::Generic(format!(
-            "Failed to read entire file. Expected {} bytes but read {}",
-            file_size, bytes_read
-        )));
-    }
-    let bytes = Bytes::from(data);
-
-    // Create chunks with proper indices
-    let mut all_chunks = Vec::with_capacity(num_chunks);
+
+    // Read all chunks first to get their hashes
+    let mut chunks = Vec::with_capacity(num_chunks);
     for chunk_index in 0..num_chunks {
         let (start, end) = get_start_end_positions(file_size, chunk_index);
-        let chunk_bytes = bytes.slice(start..end);
-        let src_hash = XorName::from_content(&chunk_bytes);
+        let chunk_size = end - start;
+        let mut chunk_data = vec![0u8; chunk_size];
+        reader.read_exact(&mut chunk_data)?;
 
-        all_chunks.push(crate::chunk::RawChunk {
+        let hash = XorName::from_content(&chunk_data);
+        chunks.push(crate::chunk::RawChunk {
             index: chunk_index,
-            data: chunk_bytes,
-            hash: src_hash,
-        });
-    }
-
-    // Split chunks into batches based on CPU cores for parallel processing
-    let cpus = num_cpus::get();
-    let chunks_per_batch = usize::max(1, (num_chunks as f64 / cpus as f64).ceil() as usize);
-    let mut batches = Vec::new();
-    let mut chunks_iter = all_chunks.into_iter().peekable();
-
-    while chunks_iter.peek().is_some() {
-        let batch_chunks: Vec<_> = chunks_iter.by_ref().take(chunks_per_batch).collect();
-        batches.push(crate::chunk::EncryptionBatch {
-            raw_chunks: batch_chunks,
+            data: Bytes::from(chunk_data),
+            hash,
         });
     }
-
-    // Encrypt all chunks in parallel
-    let (data_map, chunks) = encrypt::encrypt(batches);
+
+    // Process chunks in the correct order
+    let (data_map, encrypted_chunks) = encrypt::encrypt_stream(chunks)?;
 
     // Store all encrypted chunks
-    for chunk in chunks {
+    for chunk in encrypted_chunks {
         chunk_store(XorName::from_content(&chunk.content), chunk.content)?;
     }
-
-    // Create final data map
-    let data_map = DataMap {
-        chunk_identifiers: data_map.chunk_identifiers,
-        child: None,
-    };
-
+
     // Shrink the data map and store additional chunks if needed
     let (shrunk_data_map, _) = shrink_data_map(data_map, |hash, content| {
         chunk_store(hash, content)?;
@@ -645,7 +619,10 @@ where
         .iter()
         .map(|info| {
             let content = chunk_cache.get(&info.dst_hash).ok_or_else(|| {
-                Error::Generic(format!("Chunk not found for hash: {:?}", info.dst_hash))
+                Error::Generic(format!(
+                    "Chunk not found for hash: {:?}",
+                    info.dst_hash
+                ))
            })?;
            Ok(EncryptedChunk {
                content: content.clone(),
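On the lib.rs side, the visible change is that the file is now read chunk by chunk with read_exact instead of a single read_to_end, with per-chunk hashes gathered before encrypt_stream runs. For reference, a minimal, hypothetical sketch of a callable matching the chunk_store(hash, content) shape used above; the in-memory HashMap store and the String error type are assumptions for illustration, not part of the crate:

use bytes::Bytes;
use std::collections::HashMap;
use xor_name::XorName;

fn main() {
    // Hypothetical in-memory store; real callers would persist to disk or a
    // network backend. Keys mirror the lib.rs loop above: the hash of each
    // encrypted chunk's content.
    let mut cache: HashMap<XorName, Bytes> = HashMap::new();
    let mut chunk_store = |hash: XorName, content: Bytes| -> Result<(), String> {
        let _ = cache.insert(hash, content);
        Ok(())
    };

    let content = Bytes::from_static(b"encrypted chunk bytes");
    let hash = XorName::from_content(&content);
    chunk_store(hash, content).expect("store failed");
}

The same closure shape is what shrink_data_map receives in the diff, so a single store implementation can back both call sites.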