feat: stream self encryptor
maqi authored and joshuef committed Sep 6, 2023
1 parent d1c5599 commit abdc7c1
Showing 5 changed files with 282 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -20,6 +20,7 @@ rayon = "1.5.1"
err-derive = "~0.3.1"
num_cpus = "1.13.0"
itertools = "~0.10.0"
tempfile = "3.6.0"
xor_name = "5.0.0"

[dependencies.brotli]
9 changes: 9 additions & 0 deletions src/chunk.rs
@@ -62,3 +62,12 @@ pub(crate) fn batch_chunks(bytes: Bytes) -> (usize, Vec<EncryptionBatch>) {

(num_chunks, batches)
}

/// Calculate the (start_position, end_position) of each chunk for the given input file size
pub(crate) fn batch_positions(data_size: usize) -> Vec<(usize, usize)> {
let num_chunks = get_num_chunks(data_size);

(0..num_chunks)
.map(|index| get_start_end_positions(data_size, index))
.collect()
}
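
The ranges produced by `batch_positions` are expected to be contiguous and to cover the input exactly; a minimal in-crate sketch of that invariant (not part of this commit) could look like:

#[cfg(test)]
mod batch_positions_sketch {
    use super::batch_positions;

    // Sketch only: each chunk range starts where the previous one ended,
    // and the final range ends at the full input size.
    #[test]
    fn ranges_partition_the_input() {
        let data_size = 10 * 1024 * 1024; // large enough to yield several chunks
        let positions = batch_positions(data_size);
        let mut expected_start = 0;
        for (start, end) in positions {
            assert_eq!(start, expected_start);
            assert!(end > start);
            expected_start = end;
        }
        assert_eq!(expected_start, data_size);
    }
}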
3 changes: 2 additions & 1 deletion src/encrypt.rs
@@ -87,7 +87,8 @@ pub(crate) fn encrypt(batches: Vec<EncryptionBatch>) -> (DataMap, Vec<EncryptedC
(DataMap::new(keys), chunks)
}

fn encrypt_chunk(content: Bytes, pki: (Pad, Key, Iv)) -> Result<Bytes> {
/// Encrypt the chunk
pub(crate) fn encrypt_chunk(content: Bytes, pki: (Pad, Key, Iv)) -> Result<Bytes> {
let (pad, key, iv) = pki;
let mut compressed = vec![];
let enc_params = BrotliEncoderParams {
222 changes: 214 additions & 8 deletions src/lib.rs
@@ -106,13 +106,18 @@ pub use self::{
error::{Error, Result},
};
use bytes::Bytes;
use chunk::batch_positions;
use decrypt::decrypt_chunk;
use encrypt::encrypt_chunk;
use itertools::Itertools;
use std::{
fs::File,
io::{Read, Write},
collections::BTreeMap,
fs::{File, OpenOptions},
io::{Read, Seek, SeekFrom, Write},
ops::Range,
path::Path,
path::{Path, PathBuf},
};
use tempfile::{tempdir, TempDir};
use xor_name::XorName;

// export these because they are used in our public API.
@@ -139,6 +144,198 @@ pub struct EncryptedChunk {
pub content: Bytes,
}

/// The streaming encryptor to carry out the encryption on the fly, chunk by chunk.
#[derive(Clone)]
pub struct StreamSelfEncryptor {
// File path for the encryption target.
file_path: Box<PathBuf>,
// List of `(start_position, end_position)` for each chunk for the target file.
batch_positions: Vec<(usize, usize)>,
// Current step (i.e. chunk_index) for encryption
chunk_index: usize,
// Progressing DataMap
data_map: Vec<ChunkInfo>,
// Progressing collection of source chunks' names
src_hashes: BTreeMap<usize, XorName>,
}

impl StreamSelfEncryptor {
/// For encryption, return an initialized streaming encryptor
pub fn encrypt_from_file(file_path: Box<PathBuf>) -> Result<Self> {
let file = File::open(&*file_path)?;
let metadata = file.metadata()?;
let file_size = metadata.len();

let batch_positions = batch_positions(file_size as usize);

Ok(StreamSelfEncryptor {
file_path,
batch_positions,
chunk_index: 0,
data_map: Vec::new(),
src_hashes: BTreeMap::new(),
})
}

/// Return the next encrypted chunk; once the end has been reached, return the data_map instead.
/// Note: only one of the two returned options will be `Some`.
pub fn next_encryption(&mut self) -> Result<(Option<EncryptedChunk>, Option<DataMap>)> {
if self.chunk_index >= self.batch_positions.len() {
return Ok((None, Some(DataMap::new(self.data_map.clone()))));
}

let (src_hash, content) = self.read_chunk(self.chunk_index)?;

let pki = self.get_pad_key_and_iv(src_hash)?;
let encrypted_content = encrypt_chunk(content, pki)?;
let dst_hash = XorName::from_content(encrypted_content.as_ref());

let index = self.chunk_index;
self.chunk_index += 1;

let (start_pos, end_pos) = self.batch_positions[index];
self.data_map.push(ChunkInfo {
index,
dst_hash,
src_hash,
src_size: end_pos - start_pos,
});

Ok((
Some(EncryptedChunk {
index,
content: encrypted_content,
}),
None,
))
}

fn read_chunk(&mut self, chunk_index: usize) -> Result<(XorName, Bytes)> {
let (start_pos, end_pos) = self.batch_positions[chunk_index];
let mut buffer = vec![0; end_pos - start_pos];

let mut file = File::open(&*self.file_path)?;

let _ = file.seek(SeekFrom::Start(start_pos as u64))?;
file.read_exact(&mut buffer)?;
let content = Bytes::from(buffer);
let src_hash = XorName::from_content(content.as_ref());

let _ = self.src_hashes.insert(chunk_index, src_hash);

Ok((src_hash, content))
}

fn get_pad_key_and_iv(&mut self, src_hash: XorName) -> Result<(Pad, Key, Iv)> {
let (n_1, n_2) = get_n_1_n_2(self.chunk_index, self.batch_positions.len());

let n_1_src_hash = self.get_src_chunk_name(n_1)?;
let n_2_src_hash = self.get_src_chunk_name(n_2)?;

Ok(get_pki(&src_hash, &n_1_src_hash, &n_2_src_hash))
}

fn get_src_chunk_name(&mut self, index: usize) -> Result<XorName> {
if let Some(name) = self.src_hashes.get(&index) {
Ok(*name)
} else {
let (src_hash, _content) = self.read_chunk(index)?;
Ok(src_hash)
}
}
}
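
A hypothetical driving loop for the encryptor (a sketch, not part of this commit; the `store_chunk` callback is an assumed placeholder), illustrating that exactly one of the two returned options is `Some` on each call:

fn stream_encrypt_file<F>(path: std::path::PathBuf, mut store_chunk: F) -> Result<DataMap>
where
    F: FnMut(EncryptedChunk) -> Result<()>,
{
    let mut encryptor = StreamSelfEncryptor::encrypt_from_file(Box::new(path))?;
    loop {
        let (chunk, data_map) = encryptor.next_encryption()?;
        if let Some(chunk) = chunk {
            // Hand each encrypted chunk off to storage, the network, etc.
            store_chunk(chunk)?;
        }
        if let Some(data_map) = data_map {
            // A returned DataMap means every chunk has been produced.
            return Ok(data_map);
        }
    }
}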

/// The streaming decryptor to carry out the decryption on the fly, chunk by chunk.
pub struct StreamSelfDecryptor {
// File path for the decryption output.
file_path: Box<PathBuf>,
// Current step (i.e. chunk_index) for decryption
chunk_index: usize,
// Source hashes of the chunks, collected from the data_map; they shall already be sorted by index.
src_hashes: Vec<XorName>,
// Progressing collection of received encrypted chunks
encrypted_chunks: BTreeMap<usize, XorName>,
// Temp directory to hold the unprocessed encrypted_chunks
temp_dir: TempDir,
}

impl StreamSelfDecryptor {
/// For decryption, return an initialized streaming decryptor
pub fn decrypt_to_file(file_path: Box<PathBuf>, data_map: &DataMap) -> Result<Self> {
let temp_dir = tempdir()?;
let src_hashes = extract_hashes(data_map);
Ok(StreamSelfDecryptor {
file_path,
chunk_index: 0,
src_hashes,
encrypted_chunks: BTreeMap::new(),
temp_dir,
})
}

/// Return true once all encrypted chunks have been received and the file fully decrypted.
pub fn next_encrypted(&mut self, encrypted_chunk: EncryptedChunk) -> Result<bool> {
if encrypted_chunk.index == self.chunk_index {
let decrypted_content =
decrypt_chunk(self.chunk_index, encrypted_chunk.content, &self.src_hashes)?;
self.append_to_file(&decrypted_content)?;

self.chunk_index += 1;

self.drain_unprocessed()?;

if self.chunk_index == self.src_hashes.len() {
return Ok(true);
}
} else {
let chunk_name = XorName::from_content(&encrypted_chunk.content);

let file_path = self.temp_dir.path().join(hex::encode(chunk_name));
let mut output_file = File::create(file_path)?;
output_file.write_all(&encrypted_chunk.content)?;

let _ = self
.encrypted_chunks
.insert(encrypted_chunk.index, chunk_name);
}

Ok(false)
}

// If the file does not exist, it will be created. The function then writes the content to the file.
// If the file already exists, the content will be appended to the end of the file.
fn append_to_file(&self, content: &Bytes) -> std::io::Result<()> {
let mut file = OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(&*self.file_path)?;

file.write_all(content)?;

Ok(())
}

// The encrypted chunks may come in out of order.
// Drain any chunks that are now in order, following the most recently written piece.
fn drain_unprocessed(&mut self) -> Result<()> {
while let Some(chunk_name) = self.encrypted_chunks.get(&self.chunk_index) {
let file_path = self.temp_dir.path().join(&hex::encode(chunk_name));
let mut chunk_file = File::open(file_path)?;
let mut chunk_data = Vec::new();
let _ = chunk_file.read_to_end(&mut chunk_data)?;

let decrypted_content =
decrypt_chunk(self.chunk_index, chunk_data.into(), &self.src_hashes)?;
self.append_to_file(&decrypted_content)?;

self.chunk_index += 1;
}

Ok(())
}
}
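
And a hypothetical counterpart for the decryptor (again a sketch, not part of this commit): chunks may be fed in any order, and `next_encrypted` only returns `true` once the final in-order chunk has been appended to the output file:

fn stream_decrypt_file(
    out_path: std::path::PathBuf,
    data_map: &DataMap,
    chunks: Vec<EncryptedChunk>,
) -> Result<bool> {
    let mut decryptor = StreamSelfDecryptor::decrypt_to_file(Box::new(out_path), data_map)?;
    let mut all_done = false;
    for chunk in chunks {
        // Out-of-order chunks are buffered on disk until their turn comes.
        all_done = decryptor.next_encrypted(chunk)?;
    }
    Ok(all_done)
}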

/// Read a file from the disk to encrypt, and output the chunks to the given output directory if present.
pub fn encrypt_from_file(file_path: &Path, output_dir: &Path) -> Result<(DataMap, Vec<XorName>)> {
let mut file = File::open(file_path)?;
@@ -312,15 +509,24 @@ fn extract_hashes(data_map: &DataMap) -> Vec<XorName> {
}

fn get_pad_key_and_iv(chunk_index: usize, chunk_hashes: &[XorName]) -> (Pad, Key, Iv) {
let (n_1, n_2) = match chunk_index {
0 => (chunk_hashes.len() - 1, chunk_hashes.len() - 2),
1 => (0, chunk_hashes.len() - 1),
n => (n - 1, n - 2),
};
let (n_1, n_2) = get_n_1_n_2(chunk_index, chunk_hashes.len());

let src_hash = &chunk_hashes[chunk_index];
let n_1_src_hash = &chunk_hashes[n_1];
let n_2_src_hash = &chunk_hashes[n_2];

get_pki(src_hash, n_1_src_hash, n_2_src_hash)
}

fn get_n_1_n_2(chunk_index: usize, total_num_chunks: usize) -> (usize, usize) {
match chunk_index {
0 => (total_num_chunks - 1, total_num_chunks - 2),
1 => (0, total_num_chunks - 1),
n => (n - 1, n - 2),
}
}
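
As a worked example of the neighbour scheme (a sketch, not part of this commit): with five chunks, chunk 0 derives its pad/key/IV from the source hashes of chunks 4 and 3, chunk 1 from chunks 0 and 4, and every later chunk from its two immediate predecessors.

#[cfg(test)]
mod n_1_n_2_sketch {
    use super::get_n_1_n_2;

    // Sketch only: expected neighbour indices for a five-chunk file.
    #[test]
    fn neighbour_indices() {
        let total = 5;
        assert_eq!(get_n_1_n_2(0, total), (4, 3)); // wrap to the last two chunks
        assert_eq!(get_n_1_n_2(1, total), (0, 4)); // chunk 0 plus the last chunk
        assert_eq!(get_n_1_n_2(2, total), (1, 0)); // two immediate predecessors
        assert_eq!(get_n_1_n_2(4, total), (3, 2));
    }
}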

fn get_pki(src_hash: &XorName, n_1_src_hash: &XorName, n_2_src_hash: &XorName) -> (Pad, Key, Iv) {
let mut pad = [0u8; PAD_SIZE];
let mut key = [0u8; KEY_SIZE];
let mut iv = [0u8; IV_SIZE];
57 changes: 56 additions & 1 deletion src/tests.rs
@@ -8,10 +8,65 @@

use crate::{
decrypt_full_set, decrypt_range, encrypt, get_chunk_size, get_num_chunks, overlapped_chunks,
seek_info, test_helpers::random_bytes, DataMap, EncryptedChunk, Error, MIN_ENCRYPTABLE_BYTES,
seek_info, test_helpers::random_bytes, DataMap, EncryptedChunk, Error, StreamSelfDecryptor,
StreamSelfEncryptor, MIN_ENCRYPTABLE_BYTES,
};
use bytes::Bytes;
use itertools::Itertools;
use rand::prelude::SliceRandom;
use std::{
fs::File,
io::{Read, Write},
};
use tempfile::tempdir;

#[test]
fn test_stream_self_encryptor() -> Result<(), Error> {
// Create a 10MB temporary file
let dir = tempdir()?;
let file_path = dir.path().join("tempfile");
let mut file = File::create(&file_path)?;
let file_size = 10 * 1024 * 1024; // 10MB
let data = random_bytes(file_size);
file.write_all(&data)?;

// Encrypt the file using StreamSelfEncryptor
let mut encryptor = StreamSelfEncryptor::encrypt_from_file(Box::new(file_path))?;
let mut encrypted_chunks = Vec::new();
let mut data_map = None;
while let Ok((chunk, map)) = encryptor.next_encryption() {
if let Some(c) = chunk {
encrypted_chunks.push(c);
}
if let Some(m) = map {
// Returning a data_map means file encryption is completed.
data_map = Some(m);
break;
}
}

// Shuffle the encrypted chunks
let mut rng = rand::thread_rng();
encrypted_chunks.shuffle(&mut rng);

// Decrypt the shuffled chunks using StreamSelfDecryptor
let decrypted_file_path = dir.path().join("decrypted");
let mut decryptor = StreamSelfDecryptor::decrypt_to_file(
Box::new(decrypted_file_path.clone()),
&data_map.unwrap(),
)?;
for chunk in encrypted_chunks {
let _ = decryptor.next_encrypted(chunk)?;
}

// Read the decrypted file and verify that its content matches the original data
let mut decrypted_file = File::open(decrypted_file_path)?;
let mut decrypted_data = Vec::new();
let _ = decrypted_file.read_to_end(&mut decrypted_data)?;
assert_eq!(data, decrypted_data);

Ok(())
}

#[test]
fn write_and_read() -> Result<(), Error> {
