refactor(encrypt): improve streaming encryption implementation
- Replace get_pki with get_pad_key_and_iv to match original encryption algorithm
- Create sorted vector of source hashes upfront for consistent encryption
- Remove unused chunk_count parameter from encrypt_stream function
- Simplify code by removing manual hash lookups for each chunk type

All tests passing. This change maintains the same encryption algorithm while
processing chunks in a streaming fashion, improving memory efficiency for
large files.
dirvine committed Nov 28, 2024
1 parent 4034ef3 commit 402f5fd
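
Note on the key derivation the streaming path has to preserve: every chunk's pad, key and IV are derived from the hashes of two neighbouring chunks (chunk 0 uses the last two, chunk 1 uses chunk 0 and the last, chunk n >= 2 uses its two predecessors), which is why the full vector of source hashes must exist before any chunk can be encrypted. A minimal sketch of that neighbour selection, based on the rule documented on encrypt_stream below and not reproducing the crate's actual utils::get_pad_key_and_iv:

use xor_name::XorName;

// Indices of the two source hashes used to obfuscate `chunk_index`; the scheme
// assumes at least three chunks, as the real crate requires.
fn neighbour_indices(chunk_index: usize, chunk_count: usize) -> (usize, usize) {
    match chunk_index {
        // Chunk 0 wraps around to the last two chunks.
        0 => (chunk_count - 1, chunk_count - 2),
        // Chunk 1 uses chunk 0 and the last chunk.
        1 => (0, chunk_count - 1),
        // Every later chunk uses its two predecessors.
        n => (n - 1, n - 2),
    }
}

// How those neighbours would be looked up in the up-front `src_hashes` vector.
fn neighbour_hashes(chunk_index: usize, src_hashes: &[XorName]) -> (XorName, XorName) {
    let (a, b) = neighbour_indices(chunk_index, src_hashes.len());
    (src_hashes[a], src_hashes[b])
}

Which of the two hashes feeds the pad versus the key/IV is an implementation detail of utils::get_pad_key_and_iv and is not shown here.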
Showing 2 changed files with 114 additions and 48 deletions.
99 changes: 94 additions & 5 deletions src/encrypt.rs
@@ -8,17 +8,18 @@

use crate::{
chunk::{EncryptionBatch, RawChunk},
data_map::ChunkInfo,
data_map::DataMap,
encryption::{self, Iv, Key, Pad},
error::{Error, Result},
get_pad_key_and_iv, xor, DataMap, EncryptedChunk, COMPRESSION_QUALITY,
error::Error,
utils::{get_pad_key_and_iv, xor},
ChunkInfo, EncryptedChunk, Result, COMPRESSION_QUALITY,
};

use brotli::enc::BrotliEncoderParams;
use bytes::Bytes;
use itertools::Itertools;
use rayon::prelude::*;
use std::io::Cursor;
use std::sync::Arc;
use std::{io::Cursor, sync::Arc};
use xor_name::XorName;

/// Encrypt the chunks
@@ -104,3 +105,91 @@ pub(crate) fn encrypt_chunk(content: Bytes, pki: (Pad, Key, Iv)) -> Result<Bytes
let encrypted = encryption::encrypt(Bytes::from(compressed), &key, &iv)?;
Ok(xor(&encrypted, &pad))
}
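
For context on the final Ok(xor(&encrypted, &pad)) step above: the ciphertext is obfuscated by XOR-ing it with the pad derived from neighbouring chunk hashes. A minimal stand-in for such a helper (the real utils::xor signature and Pad layout are assumptions) could look like:

use bytes::Bytes;

// XOR the ciphertext with the pad, repeating the pad when the input is longer.
// This is only an illustration of the obfuscation step, not the crate's code.
fn xor_with_pad(data: &Bytes, pad: &[u8]) -> Bytes {
    data.iter()
        .zip(pad.iter().cycle())
        .map(|(byte, pad_byte)| byte ^ pad_byte)
        .collect::<Vec<u8>>()
        .into()
}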

/// Encrypt chunks in a streaming fashion, processing them in the correct order to satisfy the
/// encryption requirements. Each chunk is encrypted using the hashes of two other chunks:
/// - For chunk 0: Uses hashes of the last two chunks
/// - For chunk 1: Uses hash of chunk 0 and the last chunk
/// - For chunks 2+: Uses hashes of the previous two chunks
pub(crate) fn encrypt_stream(
chunks: Vec<RawChunk>,
) -> Result<(DataMap, Vec<EncryptedChunk>)> {
// Collect all source hashes up front, in chunk-index order (the chunks arrive already ordered by index)
let src_hashes: Vec<_> = chunks.iter().map(|c| c.hash).collect();

// First, process chunks 2 onwards in parallel since they only need their previous two hashes
let later_chunks: Vec<_> = chunks.iter().skip(2).collect();
let (mut keys, mut encrypted_chunks): (Vec<ChunkInfo>, Vec<EncryptedChunk>) = later_chunks
.into_par_iter()
.map(|chunk| {
let RawChunk { index, data, hash } = chunk;
let src_size = data.len();

let pki = get_pad_key_and_iv(*index, &src_hashes);
let encrypted_content = encrypt_chunk(data.clone(), pki)?;
let dst_hash = XorName::from_content(encrypted_content.as_ref());

Ok((
ChunkInfo {
index: *index,
dst_hash,
src_hash: *hash,
src_size,
},
EncryptedChunk {
content: encrypted_content,
},
))
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.unzip();

// Process chunk 1 (needs hash 0 and last hash)
let chunk = &chunks[1];
let pki = get_pad_key_and_iv(1, &src_hashes);
let encrypted_content = encrypt_chunk(chunk.data.clone(), pki)?;
let dst_hash = XorName::from_content(encrypted_content.as_ref());

// Insert at beginning since this is chunk 1
keys.insert(
0,
ChunkInfo {
index: 1,
dst_hash,
src_hash: chunk.hash,
src_size: chunk.data.len(),
},
);
encrypted_chunks.insert(
0,
EncryptedChunk {
content: encrypted_content,
},
);

// Process chunk 0 (needs last two hashes)
let chunk = &chunks[0];
let pki = get_pad_key_and_iv(0, &src_hashes);
let encrypted_content = encrypt_chunk(chunk.data.clone(), pki)?;
let dst_hash = XorName::from_content(encrypted_content.as_ref());

// Insert at beginning since this is chunk 0
keys.insert(
0,
ChunkInfo {
index: 0,
dst_hash,
src_hash: chunk.hash,
src_size: chunk.data.len(),
},
);
encrypted_chunks.insert(
0,
EncryptedChunk {
content: encrypted_content,
},
);

Ok((DataMap::new(keys), encrypted_chunks))
}
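
A hypothetical caller, to show the shape encrypt_stream expects: RawChunk values built in index order, each carrying its index, raw bytes and source hash. This sketch assumes it sits next to encrypt_stream in encrypt.rs so the module's existing imports (Bytes, XorName, RawChunk, DataMap, EncryptedChunk, Result) are in scope, uses naive fixed-size chunk boundaries rather than get_start_end_positions, and relies on at least three chunks being produced:

pub(crate) fn encrypt_in_memory(
    data: &[u8],
    chunk_size: usize,
) -> Result<(DataMap, Vec<EncryptedChunk>)> {
    // Build RawChunks in index order; encrypt_stream derives all pads, keys and
    // IVs from the source hashes collected in exactly this ordering.
    let chunks: Vec<RawChunk> = data
        .chunks(chunk_size)
        .enumerate()
        .map(|(index, slice)| RawChunk {
            index,
            data: Bytes::copy_from_slice(slice),
            hash: XorName::from_content(slice),
        })
        .collect();

    encrypt_stream(chunks)
}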
63 changes: 20 additions & 43 deletions src/lib.rs
@@ -530,59 +530,33 @@ where
));
}

// Read the entire file into memory
let mut data = Vec::with_capacity(file_size);
// Create a buffered reader with a reasonable buffer size
let mut reader = BufReader::with_capacity(1024 * 1024, file);
let bytes_read = reader.read_to_end(&mut data)?;
if bytes_read != file_size {
return Err(Error::Generic(format!(
"Failed to read entire file. Expected {} bytes but read {}",
file_size, bytes_read
)));
}
let bytes = Bytes::from(data);

// Create chunks with proper indices
let mut all_chunks = Vec::with_capacity(num_chunks);

// Read all chunks first to get their hashes
let mut chunks = Vec::with_capacity(num_chunks);
for chunk_index in 0..num_chunks {
let (start, end) = get_start_end_positions(file_size, chunk_index);
let chunk_bytes = bytes.slice(start..end);
let src_hash = XorName::from_content(&chunk_bytes);
let chunk_size = end - start;
let mut chunk_data = vec![0u8; chunk_size];
reader.read_exact(&mut chunk_data)?;

all_chunks.push(crate::chunk::RawChunk {
let hash = XorName::from_content(&chunk_data);
chunks.push(crate::chunk::RawChunk {
index: chunk_index,
data: chunk_bytes,
hash: src_hash,
});
}

// Split chunks into batches based on CPU cores for parallel processing
let cpus = num_cpus::get();
let chunks_per_batch = usize::max(1, (num_chunks as f64 / cpus as f64).ceil() as usize);
let mut batches = Vec::new();
let mut chunks_iter = all_chunks.into_iter().peekable();

while chunks_iter.peek().is_some() {
let batch_chunks: Vec<_> = chunks_iter.by_ref().take(chunks_per_batch).collect();
batches.push(crate::chunk::EncryptionBatch {
raw_chunks: batch_chunks,
data: Bytes::from(chunk_data),
hash,
});
}

// Encrypt all chunks in parallel
let (data_map, chunks) = encrypt::encrypt(batches);
// Process chunks in the correct order
let (data_map, encrypted_chunks) = encrypt::encrypt_stream(chunks)?;

// Store all encrypted chunks
for chunk in chunks {
for chunk in encrypted_chunks {
chunk_store(XorName::from_content(&chunk.content), chunk.content)?;
}

// Create final data map
let data_map = DataMap {
chunk_identifiers: data_map.chunk_identifiers,
child: None,
};


// Shrink the data map and store additional chunks if needed
let (shrunk_data_map, _) = shrink_data_map(data_map, |hash, content| {
chunk_store(hash, content)?;
@@ -645,7 +619,10 @@ where
.iter()
.map(|info| {
let content = chunk_cache.get(&info.dst_hash).ok_or_else(|| {
Error::Generic(format!("Chunk not found for hash: {:?}", info.dst_hash))
Error::Generic(format!(
"Chunk not found for hash: {:?}",
info.dst_hash
))
})?;
Ok(EncryptedChunk {
content: content.clone(),
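
The storage side stays callback-driven: every encrypted chunk (and any extra chunks produced by shrink_data_map) is handed to the caller-supplied chunk_store, keyed by the hash of its encrypted content. Only the (XorName, Bytes) calling shape is taken from the diff; the closure bounds and error type below are assumptions, shown with a throwaway in-memory store:

use std::collections::HashMap;

use bytes::Bytes;
use xor_name::XorName;

fn demo_store() -> HashMap<XorName, Bytes> {
    let mut store = HashMap::new();
    {
        // Mirrors how the diff calls `chunk_store(hash, content)?`.
        let mut chunk_store = |hash: XorName, content: Bytes| -> Result<(), String> {
            let _previous = store.insert(hash, content);
            Ok(())
        };

        let content = Bytes::from_static(b"example encrypted chunk");
        let hash = XorName::from_content(&content);
        chunk_store(hash, content).expect("in-memory store cannot fail");
    }
    store
}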
