diff --git a/Cargo.lock b/Cargo.lock index cdcadef8..a2290921 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -85,6 +85,7 @@ dependencies = [ "lazy_static", "nom", "num-traits", + "num_cpus", "pin-project", "pinentry", "quickcheck", diff --git a/age/Cargo.toml b/age/Cargo.toml index b6d0add3..f53ec702 100644 --- a/age/Cargo.toml +++ b/age/Cargo.toml @@ -75,6 +75,9 @@ i18n-embed-fl = "0.2" lazy_static = "1" rust-embed = "5" +# Performance +num_cpus = "1.0" + # Common CLI dependencies console = { version = "0.13", optional = true } pinentry = { version = "0.2", optional = true } diff --git a/age/src/primitives/stream.rs b/age/src/primitives/stream.rs index 1cbc74d1..2db52e32 100644 --- a/age/src/primitives/stream.rs +++ b/age/src/primitives/stream.rs @@ -4,6 +4,7 @@ use chacha20poly1305::{ aead::{generic_array::GenericArray, AeadInPlace, NewAead}, ChaChaPoly1305, }; +use lazy_static::lazy_static; use pin_project::pin_project; use secrecy::{ExposeSecret, SecretVec}; use std::cmp; @@ -24,6 +25,11 @@ const CHUNK_SIZE: usize = 64 * 1024; const TAG_SIZE: usize = 16; const ENCRYPTED_CHUNK_SIZE: usize = CHUNK_SIZE + TAG_SIZE; +lazy_static! { + static ref CHUNKS_SIZE: usize = num_cpus::get() * CHUNK_SIZE; + static ref ENCRYPTED_CHUNKS_SIZE: usize = num_cpus::get() * ENCRYPTED_CHUNK_SIZE; +} + pub(crate) struct PayloadKey( pub(crate) GenericArray as NewAead>::KeySize>, ); @@ -112,7 +118,7 @@ impl Stream { StreamWriter { stream: Self::new(key), inner, - chunks: Vec::with_capacity(CHUNK_SIZE), + chunks: Vec::with_capacity(*CHUNKS_SIZE), #[cfg(feature = "async")] encrypted_chunks: None, } @@ -130,7 +136,7 @@ impl Stream { StreamWriter { stream: Self::new(key), inner, - chunks: Vec::with_capacity(CHUNK_SIZE), + chunks: Vec::with_capacity(*CHUNKS_SIZE), encrypted_chunks: None, } } @@ -146,7 +152,7 @@ impl Stream { StreamReader { stream: Self::new(key), inner, - encrypted_chunks: vec![0; ENCRYPTED_CHUNK_SIZE], + encrypted_chunks: vec![0; *ENCRYPTED_CHUNKS_SIZE], encrypted_pos: 0, start: StartPos::Implicit(0), cur_plaintext_pos: 0, @@ -166,7 +172,7 @@ impl Stream { StreamReader { stream: Self::new(key), inner, - encrypted_chunks: vec![0; ENCRYPTED_CHUNK_SIZE], + encrypted_chunks: vec![0; *ENCRYPTED_CHUNKS_SIZE], encrypted_pos: 0, start: StartPos::Implicit(0), cur_plaintext_pos: 0, @@ -282,13 +288,13 @@ impl Write for StreamWriter { let mut bytes_written = 0; while !buf.is_empty() { - let to_write = cmp::min(CHUNK_SIZE - self.chunks.len(), buf.len()); + let to_write = cmp::min(*CHUNKS_SIZE - self.chunks.len(), buf.len()); self.chunks.extend_from_slice(&buf[..to_write]); bytes_written += to_write; buf = &buf[to_write..]; - // At this point, either buf is empty, or we have a full chunk. - assert!(buf.is_empty() || self.chunks.len() == CHUNK_SIZE); + // At this point, either buf is empty, or we have a full set of chunks. + assert!(buf.is_empty() || self.chunks.len() == *CHUNKS_SIZE); // Only encrypt the chunk if we have more data to write, as the last // chunk must be written in finish(). @@ -340,7 +346,7 @@ impl AsyncWrite for StreamWriter { ) -> Poll> { ready!(self.as_mut().poll_flush_chunk(cx))?; - let to_write = cmp::min(CHUNK_SIZE - self.chunks.len(), buf.len()); + let to_write = cmp::min(*CHUNKS_SIZE - self.chunks.len(), buf.len()); self.as_mut() .project() @@ -348,8 +354,8 @@ impl AsyncWrite for StreamWriter { .extend_from_slice(&buf[..to_write]); buf = &buf[to_write..]; - // At this point, either buf is empty, or we have a full chunk. - assert!(buf.is_empty() || self.chunks.len() == CHUNK_SIZE); + // At this point, either buf is empty, or we have a full set of chunks. + assert!(buf.is_empty() || self.chunks.len() == *CHUNKS_SIZE); // Only encrypt the chunk if we have more data to write, as the last // chunk must be written in poll_close(). @@ -442,7 +448,7 @@ impl StreamReader { // multiple of the chunk size. In that case, we try decrypting twice on a // decryption failure. // TODO: Generalise to multiple chunks. - let last = chunks.len() < ENCRYPTED_CHUNK_SIZE; + let last = chunks.len() < *ENCRYPTED_CHUNKS_SIZE; self.chunks = match (self.stream.decrypt_chunks(chunks, last), last) { (Ok(chunk), _) => Some(chunk), @@ -462,16 +468,16 @@ impl StreamReader { return 0; } - // TODO: Generalise to multiple chunks. - let chunk = self.chunks.as_ref().unwrap(); - let cur_chunk_offset = self.cur_plaintext_pos as usize % CHUNK_SIZE; + let chunks = self.chunks.as_ref().unwrap(); + let cur_chunks_offset = self.cur_plaintext_pos as usize % *CHUNKS_SIZE; - let to_read = cmp::min(chunk.expose_secret().len() - cur_chunk_offset, buf.len()); + let to_read = cmp::min(chunks.expose_secret().len() - cur_chunks_offset, buf.len()); - buf[..to_read] - .copy_from_slice(&chunk.expose_secret()[cur_chunk_offset..cur_chunk_offset + to_read]); + buf[..to_read].copy_from_slice( + &chunks.expose_secret()[cur_chunks_offset..cur_chunks_offset + to_read], + ); self.cur_plaintext_pos += to_read as u64; - if self.cur_plaintext_pos % CHUNK_SIZE as u64 == 0 { + if self.cur_plaintext_pos % *CHUNKS_SIZE as u64 == 0 { // We've finished with the current chunks. self.chunks = None; } @@ -483,7 +489,7 @@ impl StreamReader { impl Read for StreamReader { fn read(&mut self, buf: &mut [u8]) -> io::Result { if self.chunks.is_none() { - while self.encrypted_pos < ENCRYPTED_CHUNK_SIZE { + while self.encrypted_pos < *ENCRYPTED_CHUNKS_SIZE { match self .inner .read(&mut self.encrypted_chunks[self.encrypted_pos..]) @@ -511,7 +517,7 @@ impl AsyncRead for StreamReader { buf: &mut [u8], ) -> Poll> { if self.chunks.is_none() { - while self.encrypted_pos < ENCRYPTED_CHUNK_SIZE { + while self.encrypted_pos < *ENCRYPTED_CHUNKS_SIZE { let this = self.as_mut().project(); match ready!(this .inner @@ -587,12 +593,10 @@ impl Seek for StreamReader { } }; - // TODO: Generalise to multiple chunks. - - let cur_chunk_index = self.cur_plaintext_pos / CHUNK_SIZE as u64; + let cur_chunk_index = self.cur_plaintext_pos / *CHUNKS_SIZE as u64; - let target_chunk_index = target_pos / CHUNK_SIZE as u64; - let target_chunk_offset = target_pos % CHUNK_SIZE as u64; + let target_chunk_index = target_pos / *CHUNKS_SIZE as u64; + let target_chunk_offset = target_pos % *CHUNKS_SIZE as u64; if target_chunk_index == cur_chunk_index { // We just need to reposition ourselves within the current chunk. @@ -603,10 +607,10 @@ impl Seek for StreamReader { // Seek to the beginning of the target chunk self.inner.seek(SeekFrom::Start( - start + (target_chunk_index * ENCRYPTED_CHUNK_SIZE as u64), + start + (target_chunk_index * *ENCRYPTED_CHUNKS_SIZE as u64), ))?; self.stream.nonce.set_counter(target_chunk_index); - self.cur_plaintext_pos = target_chunk_index * CHUNK_SIZE as u64; + self.cur_plaintext_pos = target_chunk_index * *CHUNKS_SIZE as u64; // Read and drop bytes from the chunk to reach the target position. if target_chunk_offset > 0 {