Skip to content

Commit

Permalink
age: Buffer as many STREAM chunks as we have logical CPUs
Browse files Browse the repository at this point in the history
  • Loading branch information
str4d committed Dec 29, 2020
1 parent bd82642 commit b224739
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions age/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ i18n-embed-fl = "0.2"
lazy_static = "1"
rust-embed = "5"

# Performance
num_cpus = "1.0"

# Common CLI dependencies
console = { version = "0.13", optional = true }
pinentry = { version = "0.2", optional = true }
Expand Down
58 changes: 31 additions & 27 deletions age/src/primitives/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use chacha20poly1305::{
aead::{generic_array::GenericArray, AeadInPlace, NewAead},
ChaChaPoly1305,
};
use lazy_static::lazy_static;
use pin_project::pin_project;
use secrecy::{ExposeSecret, SecretVec};
use std::cmp;
Expand All @@ -24,6 +25,11 @@ const CHUNK_SIZE: usize = 64 * 1024;
const TAG_SIZE: usize = 16;
const ENCRYPTED_CHUNK_SIZE: usize = CHUNK_SIZE + TAG_SIZE;

lazy_static! {
static ref CHUNKS_SIZE: usize = num_cpus::get() * CHUNK_SIZE;
static ref ENCRYPTED_CHUNKS_SIZE: usize = num_cpus::get() * ENCRYPTED_CHUNK_SIZE;
}

pub(crate) struct PayloadKey(
pub(crate) GenericArray<u8, <ChaChaPoly1305<c2_chacha::Ietf> as NewAead>::KeySize>,
);
Expand Down Expand Up @@ -112,7 +118,7 @@ impl Stream {
StreamWriter {
stream: Self::new(key),
inner,
chunks: Vec::with_capacity(CHUNK_SIZE),
chunks: Vec::with_capacity(*CHUNKS_SIZE),
#[cfg(feature = "async")]
encrypted_chunks: None,
}
Expand All @@ -130,7 +136,7 @@ impl Stream {
StreamWriter {
stream: Self::new(key),
inner,
chunks: Vec::with_capacity(CHUNK_SIZE),
chunks: Vec::with_capacity(*CHUNKS_SIZE),
encrypted_chunks: None,
}
}
Expand All @@ -146,7 +152,7 @@ impl Stream {
StreamReader {
stream: Self::new(key),
inner,
encrypted_chunks: vec![0; ENCRYPTED_CHUNK_SIZE],
encrypted_chunks: vec![0; *ENCRYPTED_CHUNKS_SIZE],
encrypted_pos: 0,
start: StartPos::Implicit(0),
cur_plaintext_pos: 0,
Expand All @@ -166,7 +172,7 @@ impl Stream {
StreamReader {
stream: Self::new(key),
inner,
encrypted_chunks: vec![0; ENCRYPTED_CHUNK_SIZE],
encrypted_chunks: vec![0; *ENCRYPTED_CHUNKS_SIZE],
encrypted_pos: 0,
start: StartPos::Implicit(0),
cur_plaintext_pos: 0,
Expand Down Expand Up @@ -282,13 +288,13 @@ impl<W: Write> Write for StreamWriter<W> {
let mut bytes_written = 0;

while !buf.is_empty() {
let to_write = cmp::min(CHUNK_SIZE - self.chunks.len(), buf.len());
let to_write = cmp::min(*CHUNKS_SIZE - self.chunks.len(), buf.len());
self.chunks.extend_from_slice(&buf[..to_write]);
bytes_written += to_write;
buf = &buf[to_write..];

// At this point, either buf is empty, or we have a full chunk.
assert!(buf.is_empty() || self.chunks.len() == CHUNK_SIZE);
// At this point, either buf is empty, or we have a full set of chunks.
assert!(buf.is_empty() || self.chunks.len() == *CHUNKS_SIZE);

// Only encrypt the chunk if we have more data to write, as the last
// chunk must be written in finish().
Expand Down Expand Up @@ -340,16 +346,16 @@ impl<W: AsyncWrite> AsyncWrite for StreamWriter<W> {
) -> Poll<io::Result<usize>> {
ready!(self.as_mut().poll_flush_chunk(cx))?;

let to_write = cmp::min(CHUNK_SIZE - self.chunks.len(), buf.len());
let to_write = cmp::min(*CHUNKS_SIZE - self.chunks.len(), buf.len());

self.as_mut()
.project()
.chunks
.extend_from_slice(&buf[..to_write]);
buf = &buf[to_write..];

// At this point, either buf is empty, or we have a full chunk.
assert!(buf.is_empty() || self.chunks.len() == CHUNK_SIZE);
// At this point, either buf is empty, or we have a full set of chunks.
assert!(buf.is_empty() || self.chunks.len() == *CHUNKS_SIZE);

// Only encrypt the chunk if we have more data to write, as the last
// chunk must be written in poll_close().
Expand Down Expand Up @@ -442,7 +448,7 @@ impl<R> StreamReader<R> {
// multiple of the chunk size. In that case, we try decrypting twice on a
// decryption failure.
// TODO: Generalise to multiple chunks.
let last = chunks.len() < ENCRYPTED_CHUNK_SIZE;
let last = chunks.len() < *ENCRYPTED_CHUNKS_SIZE;

self.chunks = match (self.stream.decrypt_chunks(chunks, last), last) {
(Ok(chunk), _) => Some(chunk),
Expand All @@ -462,16 +468,16 @@ impl<R> StreamReader<R> {
return 0;
}

// TODO: Generalise to multiple chunks.
let chunk = self.chunks.as_ref().unwrap();
let cur_chunk_offset = self.cur_plaintext_pos as usize % CHUNK_SIZE;
let chunks = self.chunks.as_ref().unwrap();
let cur_chunks_offset = self.cur_plaintext_pos as usize % *CHUNKS_SIZE;

let to_read = cmp::min(chunk.expose_secret().len() - cur_chunk_offset, buf.len());
let to_read = cmp::min(chunks.expose_secret().len() - cur_chunks_offset, buf.len());

buf[..to_read]
.copy_from_slice(&chunk.expose_secret()[cur_chunk_offset..cur_chunk_offset + to_read]);
buf[..to_read].copy_from_slice(
&chunks.expose_secret()[cur_chunks_offset..cur_chunks_offset + to_read],
);
self.cur_plaintext_pos += to_read as u64;
if self.cur_plaintext_pos % CHUNK_SIZE as u64 == 0 {
if self.cur_plaintext_pos % *CHUNKS_SIZE as u64 == 0 {
// We've finished with the current chunks.
self.chunks = None;
}
Expand All @@ -483,7 +489,7 @@ impl<R> StreamReader<R> {
impl<R: Read> Read for StreamReader<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.chunks.is_none() {
while self.encrypted_pos < ENCRYPTED_CHUNK_SIZE {
while self.encrypted_pos < *ENCRYPTED_CHUNKS_SIZE {
match self
.inner
.read(&mut self.encrypted_chunks[self.encrypted_pos..])
Expand Down Expand Up @@ -511,7 +517,7 @@ impl<R: AsyncRead + Unpin> AsyncRead for StreamReader<R> {
buf: &mut [u8],
) -> Poll<Result<usize, Error>> {
if self.chunks.is_none() {
while self.encrypted_pos < ENCRYPTED_CHUNK_SIZE {
while self.encrypted_pos < *ENCRYPTED_CHUNKS_SIZE {
let this = self.as_mut().project();
match ready!(this
.inner
Expand Down Expand Up @@ -587,12 +593,10 @@ impl<R: Read + Seek> Seek for StreamReader<R> {
}
};

// TODO: Generalise to multiple chunks.

let cur_chunk_index = self.cur_plaintext_pos / CHUNK_SIZE as u64;
let cur_chunk_index = self.cur_plaintext_pos / *CHUNKS_SIZE as u64;

let target_chunk_index = target_pos / CHUNK_SIZE as u64;
let target_chunk_offset = target_pos % CHUNK_SIZE as u64;
let target_chunk_index = target_pos / *CHUNKS_SIZE as u64;
let target_chunk_offset = target_pos % *CHUNKS_SIZE as u64;

if target_chunk_index == cur_chunk_index {
// We just need to reposition ourselves within the current chunk.
Expand All @@ -603,10 +607,10 @@ impl<R: Read + Seek> Seek for StreamReader<R> {

// Seek to the beginning of the target chunk
self.inner.seek(SeekFrom::Start(
start + (target_chunk_index * ENCRYPTED_CHUNK_SIZE as u64),
start + (target_chunk_index * *ENCRYPTED_CHUNKS_SIZE as u64),
))?;
self.stream.nonce.set_counter(target_chunk_index);
self.cur_plaintext_pos = target_chunk_index * CHUNK_SIZE as u64;
self.cur_plaintext_pos = target_chunk_index * *CHUNKS_SIZE as u64;

// Read and drop bytes from the chunk to reach the target position.
if target_chunk_offset > 0 {
Expand Down

0 comments on commit b224739

Please sign in to comment.