Allow readers to read consistently (#1043)
### What does this PR do?

This commit exposes a `read_at` function on the block cache, allowing callers to
read the same data from offset X over and over again. This means that when we
expose data from a block cache in the logrotate filesystem, reads such as
`wc -l` and similar now behave correctly.
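The guarantee can be sketched in a few lines. This is a minimal illustration rather than code from the commit: it assumes a `block::Cache` already built via `Cache::fixed` and uses the `read_at(offset, size)` method introduced here; the 4096-byte read size is arbitrary.

```rust
use lading_payload::block;

// Minimal sketch: two reads at the same offset must return identical bytes,
// which is what lets the logrotate filesystem answer `wc -l` consistently.
// The cache is assumed to have been constructed elsewhere via `Cache::fixed`.
fn reads_are_stable(cache: &block::Cache) {
    let first = cache.read_at(0, 4096);
    let second = cache.read_at(0, 4096);
    assert_eq!(first, second, "repeated reads at offset 0 must agree");
}
```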
blt authored Oct 28, 2024
1 parent 5329db7 commit e619a9e
Showing 3 changed files with 95 additions and 49 deletions.
2 changes: 1 addition & 1 deletion lading/src/bin/payloadtool.rs
@@ -46,7 +46,7 @@ fn generate_and_check(
let start = Instant::now();
let blocks =
match block::Cache::fixed(&mut rng, total_bytes, max_block_size.get_bytes(), config)? {
block::Cache::Fixed { blocks, idx: _ } => blocks,
block::Cache::Fixed { blocks, .. } => blocks,
};
info!("Payload generation took {:?}", start.elapsed());
debug!("Payload: {:#?}", blocks);
60 changes: 15 additions & 45 deletions lading/src/generator/file_gen/model.rs
@@ -47,30 +47,6 @@ pub struct File {
}

impl File {
/// Returns the number of bytes available to be read at instance `now`.
///
/// This function returns the number of bytes that have been "written" to
/// the `File` and are available to be read. For instance, `modified_tick`
/// may be in the past but sufficient bytes have accumulated in the file for
/// non-zero reads to remain possible. Bytes will not be noted as consumed
/// until the caller calls [`File::read`].
///
/// Calls to this function will advance `bytes_written` if `now` >
/// `modified_tick`.
///
/// Returns 0 if `bytes_written` == `bytes_read`.
///
/// # Panics
///
/// Function will panic if `bytes_written` < `bytes_read`. This indicates a
/// catastrophic programming error.
pub fn available_to_read(&mut self, now: Tick) -> u64 {
self.advance_time(now);

assert!(self.bytes_written >= self.bytes_read);
self.bytes_written.saturating_sub(self.bytes_read)
}

/// Register a read.
///
/// This function is paired with [`File::available_to_read`]. It's possible that
@@ -89,16 +65,6 @@ impl File {
self.status_tick = now;
}

/// Register a read-only open.
///
/// This function updates `access_time` to `now`. Time is advanced which may
/// result in more bytes being available in-file.
pub fn ro_open(&mut self, now: Tick) {
self.advance_time(now);

self.access_tick = now;
}

/// Run the clock forward in the `File`.
///
/// This function runs the clock forward to `now`, updating `modified_tick`
@@ -347,19 +313,23 @@ impl State {
name: _,
ref mut file,
}) => {
let available = file.available_to_read(now);
if available == 0 {
return None;
}
let bytes_written = usize::try_from(file.bytes_written)
.expect("more bytes written than machine word");

let block_len = self.block_cache.peek_next().total_bytes.get() as usize;
if block_len <= size {
let block = self.block_cache.next_block();
file.read(block_len as u64, now);
Some(block.bytes.clone())
} else {
None
if offset >= bytes_written {
// Offset beyond EOF
return Some(Bytes::new());
}

let available = bytes_written.saturating_sub(offset);
let to_read = available.min(size);

// Get data from block_cache without worrying about blocks
let data = self.block_cache.read_at(offset as u64, to_read);

file.read(to_read as u64, now);

Some(data)
}
Some(Node::Directory { .. }) | None => None,
}
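The new read path in `model.rs` boils down to a small clamp: a read never returns more than was requested and never more than has been "written" to the file so far. The helper below is a hypothetical restatement of that arithmetic for illustration; it is not part of the commit.

```rust
/// Hypothetical restatement of the clamp used in `State::read`.
fn clamp_read(bytes_written: usize, offset: usize, size: usize) -> usize {
    if offset >= bytes_written {
        // Offset at or beyond EOF: nothing to read.
        return 0;
    }
    let available = bytes_written - offset;
    available.min(size)
}

#[test]
fn clamp_read_examples() {
    // 10 bytes written, 8 requested at offset 6: only 4 bytes exist past the offset.
    assert_eq!(clamp_read(10, 6, 8), 4);
    // Reading at or past EOF yields zero bytes, matching the empty `Bytes` return.
    assert_eq!(clamp_read(10, 10, 8), 0);
}
```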
82 changes: 79 additions & 3 deletions lading_payload/src/block.rs
@@ -155,6 +155,8 @@ pub enum Cache {
idx: usize,
/// The store of blocks.
blocks: Vec<Block>,
/// The amount of data stored in one cycle, or all blocks
total_cycle_size: u64,
},
}

@@ -323,7 +325,17 @@ impl Cache {
construct_block_cache_inner(rng, &pyld, maximum_block_bytes, total_bytes.get())?
}
};
Ok(Self::Fixed { idx: 0, blocks })

let total_cycle_size = blocks
.iter()
.map(|block| u64::from(block.total_bytes.get()))
.sum();

Ok(Self::Fixed {
idx: 0,
blocks,
total_cycle_size,
})
}

/// Run `Cache` forward on the user-provided mpsc sender.
@@ -339,7 +351,9 @@
#[allow(clippy::needless_pass_by_value)]
pub fn spin(self, snd: Sender<Block>) -> Result<(), SpinError> {
match self {
Self::Fixed { mut idx, blocks } => loop {
Self::Fixed {
mut idx, blocks, ..
} => loop {
snd.blocking_send(blocks[idx].clone())?;
idx = (idx + 1) % blocks.len();
},
@@ -354,7 +368,7 @@
#[must_use]
pub fn peek_next(&self) -> &Block {
match self {
Self::Fixed { idx, blocks } => &blocks[*idx],
Self::Fixed { idx, blocks, .. } => &blocks[*idx],
}
}

@@ -367,13 +381,75 @@
Self::Fixed {
ref mut idx,
blocks,
..
} => {
let block = &blocks[*idx];
*idx = (*idx + 1) % blocks.len();
block
}
}
}

/// Read data starting from a given offset and up to the specified size.
///
/// # Panics
///
/// Function will panic if the offset or total cycle size does not fit in a machine word.
pub fn read_at(&self, offset: u64, size: usize) -> Bytes {
let mut data = BytesMut::with_capacity(size);

let (blocks, total_cycle_size) = match self {
Cache::Fixed {
blocks,
total_cycle_size,
..
} => (
blocks,
usize::try_from(*total_cycle_size)
.expect("cycle size larger than machine word bytes"),
),
};

let mut remaining = size;
let mut current_offset =
usize::try_from(offset).expect("offset larger than machine word bytes");

while remaining > 0 {
// The plan is this. We treat the blocks as one infinite cycle. We
// map our offset into the domain of the blocks, then seek forward
// until we find the block we need to start reading from. Then we
// read into `data`.

let offset_within_cycle = current_offset % total_cycle_size;
let mut block_start = 0;
for block in blocks {
let block_size = block.total_bytes.get() as usize;
if offset_within_cycle < block_start + block_size {
// Offset is within this block. Begin reading into `data`.
let block_offset = offset_within_cycle - block_start;
let bytes_in_block = (block_size - block_offset).min(remaining);

data.extend_from_slice(
&block.bytes[block_offset..block_offset + bytes_in_block],
);

remaining -= bytes_in_block;
current_offset += bytes_in_block;
break;
}
block_start += block_size;
}

// If we couldn't find a block this suggests something seriously
// wacky has happened.
if remaining > 0 && block_start >= total_cycle_size {
error!("Offset exceeds total cycle size");
break;
}
}

data.freeze()
}
}

/// Construct a new block cache of form defined by `serializer`.
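The cycle arithmetic in `read_at` can be checked by hand. The toy helper below mirrors the offset-mapping loop with hypothetical block sizes of 4 and 6 bytes (so one cycle is 10 bytes); it is a sketch for illustration, not code from the commit.

```rust
/// Toy restatement of the offset mapping in `read_at`: given an absolute
/// offset into the infinite cycle of blocks, find which block the read
/// starts in and the offset within that block.
fn locate(offset: usize, block_sizes: &[usize]) -> (usize, usize) {
    let total_cycle_size: usize = block_sizes.iter().sum();
    let offset_within_cycle = offset % total_cycle_size;
    let mut block_start = 0;
    for (idx, block_size) in block_sizes.iter().enumerate() {
        if offset_within_cycle < block_start + block_size {
            return (idx, offset_within_cycle - block_start);
        }
        block_start += block_size;
    }
    unreachable!("offset_within_cycle is always smaller than total_cycle_size");
}

#[test]
fn cycle_mapping_examples() {
    // Offset 13 wraps to 3 within the 10-byte cycle and lands in block 0 at offset 3.
    assert_eq!(locate(13, &[4, 6]), (0, 3));
    // Offset 25 wraps to 5, past block 0 (4 bytes), so block 1 at offset 1.
    assert_eq!(locate(25, &[4, 6]), (1, 1));
}
```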
