diff --git a/Cargo.toml b/Cargo.toml index 199e1d50e..2406bb10b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ rayon = "1.5.1" err-derive = "~0.3.1" num_cpus = "1.13.0" itertools = "~0.10.0" +tempfile = "3.6.0" xor_name = "5.0.0" [dependencies.brotli] diff --git a/src/chunk.rs b/src/chunk.rs index 9d0ede4df..603353438 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -62,3 +62,12 @@ pub(crate) fn batch_chunks(bytes: Bytes) -> (usize, Vec) { (num_chunks, batches) } + +/// Calculate (start_position, end_position) for each chunk for the input file size +pub(crate) fn batch_positions(data_size: usize) -> Vec<(usize, usize)> { + let num_chunks = get_num_chunks(data_size); + + (0..num_chunks) + .map(|index| get_start_end_positions(data_size, index)) + .collect() +} diff --git a/src/encrypt.rs b/src/encrypt.rs index 4c10b26c9..82112586d 100644 --- a/src/encrypt.rs +++ b/src/encrypt.rs @@ -87,7 +87,8 @@ pub(crate) fn encrypt(batches: Vec) -> (DataMap, Vec Result { +/// Encrypt the chunk +pub(crate) fn encrypt_chunk(content: Bytes, pki: (Pad, Key, Iv)) -> Result { let (pad, key, iv) = pki; let mut compressed = vec![]; let enc_params = BrotliEncoderParams { diff --git a/src/lib.rs b/src/lib.rs index 5165943e0..0cefdf727 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -106,13 +106,18 @@ pub use self::{ error::{Error, Result}, }; use bytes::Bytes; +use chunk::batch_positions; +use decrypt::decrypt_chunk; +use encrypt::encrypt_chunk; use itertools::Itertools; use std::{ - fs::File, - io::{Read, Write}, + collections::BTreeMap, + fs::{File, OpenOptions}, + io::{Read, Seek, SeekFrom, Write}, ops::Range, - path::Path, + path::{Path, PathBuf}, }; +use tempfile::{tempdir, TempDir}; use xor_name::XorName; // export these because they are used in our public API. @@ -139,6 +144,198 @@ pub struct EncryptedChunk { pub content: Bytes, } +/// The streaming encryptor to carry out the encryption on fly, chunk by chunk. +#[derive(Clone)] +pub struct StreamSelfEncryptor { + // File path for the encryption target. + file_path: Box, + // List of `(start_position, end_position)` for each chunk for the target file. + batch_positions: Vec<(usize, usize)>, + // Current step (i.e. chunk_index) for encryption + chunk_index: usize, + // Progressing DataMap + data_map: Vec, + // Progressing collection of source chunks' names + src_hashes: BTreeMap, +} + +impl StreamSelfEncryptor { + /// For encryption, return with an intialized streaming encryptor + pub fn encrypt_from_file(file_path: Box) -> Result { + let file = File::open(&*file_path)?; + let metadata = file.metadata()?; + let file_size = metadata.len(); + + let batch_positions = batch_positions(file_size as usize); + + Ok(StreamSelfEncryptor { + file_path, + batch_positions, + chunk_index: 0, + data_map: Vec::new(), + src_hashes: BTreeMap::new(), + }) + } + + /// Return the next encrypted chunk, if already reached the end, return with the data_map. + /// Note: only of the two returned options will be `Some`. + pub fn next_encryption(&mut self) -> Result<(Option, Option)> { + if self.chunk_index >= self.batch_positions.len() { + return Ok((None, Some(DataMap::new(self.data_map.clone())))); + } + + let (src_hash, content) = self.read_chunk(self.chunk_index)?; + + let pki = self.get_pad_key_and_iv(src_hash)?; + let encrypted_content = encrypt_chunk(content, pki)?; + let dst_hash = XorName::from_content(encrypted_content.as_ref()); + + let index = self.chunk_index; + self.chunk_index += 1; + + let (start_pos, end_pos) = self.batch_positions[index]; + self.data_map.push(ChunkInfo { + index, + dst_hash, + src_hash, + src_size: end_pos - start_pos, + }); + + Ok(( + Some(EncryptedChunk { + index, + content: encrypted_content, + }), + None, + )) + } + + fn read_chunk(&mut self, chunk_index: usize) -> Result<(XorName, Bytes)> { + let (start_pos, end_pos) = self.batch_positions[chunk_index]; + let mut buffer = vec![0; end_pos - start_pos]; + + let mut file = File::open(&*self.file_path)?; + + let _ = file.seek(SeekFrom::Start(start_pos as u64))?; + file.read_exact(&mut buffer)?; + let content = Bytes::from(buffer); + let src_hash = XorName::from_content(content.as_ref()); + + let _ = self.src_hashes.insert(chunk_index, src_hash); + + Ok((src_hash, content)) + } + + fn get_pad_key_and_iv(&mut self, src_hash: XorName) -> Result<(Pad, Key, Iv)> { + let (n_1, n_2) = get_n_1_n_2(self.chunk_index, self.batch_positions.len()); + + let n_1_src_hash = self.get_src_chunk_name(n_1)?; + let n_2_src_hash = self.get_src_chunk_name(n_2)?; + + Ok(get_pki(&src_hash, &n_1_src_hash, &n_2_src_hash)) + } + + fn get_src_chunk_name(&mut self, index: usize) -> Result { + if let Some(name) = self.src_hashes.get(&index) { + Ok(*name) + } else { + let (src_hash, _content) = self.read_chunk(index)?; + Ok(src_hash) + } + } +} + +/// The streaming decryptor to carry out the decryption on fly, chunk by chunk. +pub struct StreamSelfDecryptor { + // File path for the decryption output. + file_path: Box, + // Current step (i.e. chunk_index) for decryption + chunk_index: usize, + // Source hashes of the chunks that collected from the data_map, they shall already be sorted by index. + src_hashes: Vec, + // Progressing collection of received encrypted chunks + encrypted_chunks: BTreeMap, + // Temp directory to hold the un-processed encrypted_chunks + temp_dir: TempDir, +} + +impl StreamSelfDecryptor { + /// For decryption, return with an intialized streaming decryptor + pub fn decrypt_to_file(file_path: Box, data_map: &DataMap) -> Result { + let temp_dir = tempdir()?; + let src_hashes = extract_hashes(data_map); + Ok(StreamSelfDecryptor { + file_path, + chunk_index: 0, + src_hashes, + encrypted_chunks: BTreeMap::new(), + temp_dir, + }) + } + + /// Return true if all encrypted chunk got received and file decrypted. + pub fn next_encrypted(&mut self, encrypted_chunk: EncryptedChunk) -> Result { + if encrypted_chunk.index == self.chunk_index { + let decrypted_content = + decrypt_chunk(self.chunk_index, encrypted_chunk.content, &self.src_hashes)?; + self.append_to_file(&decrypted_content)?; + + self.chunk_index += 1; + + self.drain_unprocessed()?; + + if self.chunk_index == self.src_hashes.len() { + return Ok(true); + } + } else { + let chunk_name = XorName::from_content(&encrypted_chunk.content); + + let file_path = self.temp_dir.path().join(hex::encode(chunk_name)); + let mut output_file = File::create(file_path)?; + output_file.write_all(&encrypted_chunk.content)?; + + let _ = self + .encrypted_chunks + .insert(encrypted_chunk.index, chunk_name); + } + + Ok(false) + } + + // If the file does not exist, it will be created. The function then writes the content to the file. + // If the file already exists, the content will be appended to the end of the file. + fn append_to_file(&self, content: &Bytes) -> std::io::Result<()> { + let mut file = OpenOptions::new() + .write(true) + .append(true) + .create(true) + .open(&*self.file_path)?; + + file.write_all(content)?; + + Ok(()) + } + + // The encrypted chunks may come in out-of-order. + // Drain any in-order chunks due to the recent filled in piece. + fn drain_unprocessed(&mut self) -> Result<()> { + while let Some(chunk_name) = self.encrypted_chunks.get(&self.chunk_index) { + let file_path = self.temp_dir.path().join(&hex::encode(chunk_name)); + let mut chunk_file = File::open(file_path)?; + let mut chunk_data = Vec::new(); + let _ = chunk_file.read_to_end(&mut chunk_data)?; + + let decrypted_content = + decrypt_chunk(self.chunk_index, chunk_data.into(), &self.src_hashes)?; + self.append_to_file(&decrypted_content)?; + + self.chunk_index += 1; + } + + Ok(()) + } +} + /// Read a file from the disk to encrypt, and output the chunks to a given output directory if presents. pub fn encrypt_from_file(file_path: &Path, output_dir: &Path) -> Result<(DataMap, Vec)> { let mut file = File::open(file_path)?; @@ -312,15 +509,24 @@ fn extract_hashes(data_map: &DataMap) -> Vec { } fn get_pad_key_and_iv(chunk_index: usize, chunk_hashes: &[XorName]) -> (Pad, Key, Iv) { - let (n_1, n_2) = match chunk_index { - 0 => (chunk_hashes.len() - 1, chunk_hashes.len() - 2), - 1 => (0, chunk_hashes.len() - 1), - n => (n - 1, n - 2), - }; + let (n_1, n_2) = get_n_1_n_2(chunk_index, chunk_hashes.len()); + let src_hash = &chunk_hashes[chunk_index]; let n_1_src_hash = &chunk_hashes[n_1]; let n_2_src_hash = &chunk_hashes[n_2]; + get_pki(src_hash, n_1_src_hash, n_2_src_hash) +} + +fn get_n_1_n_2(chunk_index: usize, total_num_chunks: usize) -> (usize, usize) { + match chunk_index { + 0 => (total_num_chunks - 1, total_num_chunks - 2), + 1 => (0, total_num_chunks - 1), + n => (n - 1, n - 2), + } +} + +fn get_pki(src_hash: &XorName, n_1_src_hash: &XorName, n_2_src_hash: &XorName) -> (Pad, Key, Iv) { let mut pad = [0u8; PAD_SIZE]; let mut key = [0u8; KEY_SIZE]; let mut iv = [0u8; IV_SIZE]; diff --git a/src/tests.rs b/src/tests.rs index 1f19478ce..de03d4624 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -8,10 +8,65 @@ use crate::{ decrypt_full_set, decrypt_range, encrypt, get_chunk_size, get_num_chunks, overlapped_chunks, - seek_info, test_helpers::random_bytes, DataMap, EncryptedChunk, Error, MIN_ENCRYPTABLE_BYTES, + seek_info, test_helpers::random_bytes, DataMap, EncryptedChunk, Error, StreamSelfDecryptor, + StreamSelfEncryptor, MIN_ENCRYPTABLE_BYTES, }; use bytes::Bytes; use itertools::Itertools; +use rand::prelude::SliceRandom; +use std::{ + fs::File, + io::{Read, Write}, +}; +use tempfile::tempdir; + +#[test] +fn test_stream_self_encryptor() -> Result<(), Error> { + // Create a 10MB temporary file + let dir = tempdir()?; + let file_path = dir.path().join("tempfile"); + let mut file = File::create(&file_path)?; + let file_size = 10 * 1024 * 1024; // 10MB + let data = random_bytes(file_size); + file.write_all(&data)?; + + // Encrypt the file using StreamSelfEncryptor + let mut encryptor = StreamSelfEncryptor::encrypt_from_file(Box::new(file_path))?; + let mut encrypted_chunks = Vec::new(); + let mut data_map = None; + while let Ok((chunk, map)) = encryptor.next_encryption() { + if let Some(c) = chunk { + encrypted_chunks.push(c); + } + if let Some(m) = map { + // Returning a data_map means file encryption is completed. + data_map = Some(m); + break; + } + } + + // Shuffle the encrypted chunks + let mut rng = rand::thread_rng(); + encrypted_chunks.shuffle(&mut rng); + + // Decrypt the shuffled chunks using StreamSelfDecryptor + let decrypted_file_path = dir.path().join("decrypted"); + let mut decryptor = StreamSelfDecryptor::decrypt_to_file( + Box::new(decrypted_file_path.clone()), + &data_map.unwrap(), + )?; + for chunk in encrypted_chunks { + let _ = decryptor.next_encrypted(chunk)?; + } + + // Read the decrypted file and verify that its content matches the original data + let mut decrypted_file = File::open(decrypted_file_path)?; + let mut decrypted_data = Vec::new(); + let _ = decrypted_file.read_to_end(&mut decrypted_data)?; + assert_eq!(data, decrypted_data); + + Ok(()) +} #[test] fn write_and_read() -> Result<(), Error> {