Allow readers to read consistently #1043

Merged · 2 commits · Oct 28, 2024
2 changes: 1 addition & 1 deletion lading/src/bin/payloadtool.rs
@@ -46,7 +46,7 @@ fn generate_and_check(
     let start = Instant::now();
     let blocks =
         match block::Cache::fixed(&mut rng, total_bytes, max_block_size.get_bytes(), config)? {
-            block::Cache::Fixed { blocks, idx: _ } => blocks,
+            block::Cache::Fixed { blocks, .. } => blocks,
         };
     info!("Payload generation took {:?}", start.elapsed());
     debug!("Payload: {:#?}", blocks);
60 changes: 15 additions & 45 deletions lading/src/generator/file_gen/model.rs
@@ -47,30 +47,6 @@ pub struct File {
 }
 
 impl File {
-    /// Returns the number of bytes available to be read at instant `now`.
-    ///
-    /// This function returns the number of bytes that have been "written" to
-    /// the `File` and are available to be read. For instance, `modified_tick`
-    /// may be in the past but sufficient bytes have accumulated in the file for
-    /// non-zero reads to remain possible. Bytes will not be noted as consumed
-    /// until the caller calls [`File::read`].
-    ///
-    /// Calls to this function will advance `bytes_written` if `now` >
-    /// `modified_tick`.
-    ///
-    /// Returns 0 if `bytes_written` == `bytes_read`.
-    ///
-    /// # Panics
-    ///
-    /// Function will panic if `bytes_written` < `bytes_read`. This indicates a
-    /// catastrophic programming error.
-    pub fn available_to_read(&mut self, now: Tick) -> u64 {
-        self.advance_time(now);
-
-        assert!(self.bytes_written >= self.bytes_read);
-        self.bytes_written.saturating_sub(self.bytes_read)
-    }
-
     /// Register a read.
     ///
     /// This function is paired with [`File::available_to_read`]. It's possible that
@@ -89,16 +65,6 @@ impl File {
         self.status_tick = now;
     }
 
-    /// Register a read-only open.
-    ///
-    /// This function updates `access_tick` to `now`. Time is advanced, which may
-    /// result in more bytes being available in-file.
-    pub fn ro_open(&mut self, now: Tick) {
-        self.advance_time(now);
-
-        self.access_tick = now;
-    }
-
     /// Run the clock forward in the `File`.
     ///
     /// This function runs the clock forward to `now`, updating `modified_tick`
@@ -347,19 +313,23 @@ impl State {
                 name: _,
                 ref mut file,
             }) => {
-                let available = file.available_to_read(now);
-                if available == 0 {
-                    return None;
-                }
+                let bytes_written = usize::try_from(file.bytes_written)
+                    .expect("more bytes written than machine word");
 
-                let block_len = self.block_cache.peek_next().total_bytes.get() as usize;
-                if block_len <= size {
-                    let block = self.block_cache.next_block();
-                    file.read(block_len as u64, now);
-                    Some(block.bytes.clone())
-                } else {
-                    None
+                if offset >= bytes_written {
+                    // Offset beyond EOF
+                    return Some(Bytes::new());
                 }
+
+                let available = bytes_written.saturating_sub(offset);
+                let to_read = available.min(size);
+
+                // Get data from block_cache without worrying about blocks
+                let data = self.block_cache.read_at(offset as u64, to_read);
+
+                file.read(to_read as u64, now);
+
+                Some(data)
             }
             Some(Node::Directory { .. }) | None => None,
         }
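
The new read path above replaces the consume-on-read model with offset-based reads: a read whose offset lands at or past `bytes_written` returns an empty buffer rather than an error, while an in-bounds read is clamped to the bytes accumulated so far. A minimal standalone sketch of those rules (the `bytes_served` helper is hypothetical, not lading code):

```rust
/// Hypothetical mock of the EOF/clamping rules in the diff above;
/// `Some(0)` stands in for lading's `Some(Bytes::new())`.
fn bytes_served(bytes_written: usize, offset: usize, size: usize) -> Option<usize> {
    if offset >= bytes_written {
        // Offset beyond EOF: an empty read, not an error. `None` is
        // reserved for "not a file" in the real code.
        return Some(0);
    }
    // In-bounds reads are clamped to what has been "written" so far.
    let available = bytes_written.saturating_sub(offset);
    Some(available.min(size))
}

fn main() {
    assert_eq!(bytes_served(100, 120, 16), Some(0)); // past EOF -> empty
    assert_eq!(bytes_served(100, 96, 16), Some(4));  // clamped at EOF
    assert_eq!(bytes_served(100, 32, 16), Some(16)); // fully satisfied
}
```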
82 changes: 79 additions & 3 deletions lading_payload/src/block.rs
@@ -155,6 +155,8 @@ pub enum Cache {
         idx: usize,
         /// The store of blocks.
         blocks: Vec<Block>,
+        /// The amount of data stored in one cycle, i.e. in all blocks.
+        total_cycle_size: u64,
     },
 }

@@ -323,7 +325,17 @@ impl Cache {
                 construct_block_cache_inner(rng, &pyld, maximum_block_bytes, total_bytes.get())?
             }
         };
-        Ok(Self::Fixed { idx: 0, blocks })
+
+        let total_cycle_size = blocks
+            .iter()
+            .map(|block| u64::from(block.total_bytes.get()))
+            .sum();
+
+        Ok(Self::Fixed {
+            idx: 0,
+            blocks,
+            total_cycle_size,
+        })
     }
 
     /// Run `Cache` forward on the user-provided mpsc sender.
@@ -339,7 +351,9 @@
     #[allow(clippy::needless_pass_by_value)]
     pub fn spin(self, snd: Sender<Block>) -> Result<(), SpinError> {
         match self {
-            Self::Fixed { mut idx, blocks } => loop {
+            Self::Fixed {
+                mut idx, blocks, ..
+            } => loop {
                 snd.blocking_send(blocks[idx].clone())?;
                 idx = (idx + 1) % blocks.len();
             },
@@ -354,7 +368,7 @@
     #[must_use]
     pub fn peek_next(&self) -> &Block {
         match self {
-            Self::Fixed { idx, blocks } => &blocks[*idx],
+            Self::Fixed { idx, blocks, .. } => &blocks[*idx],
         }
     }

@@ -367,13 +381,75 @@
             Self::Fixed {
                 ref mut idx,
                 blocks,
+                ..
             } => {
                 let block = &blocks[*idx];
                 *idx = (*idx + 1) % blocks.len();
                 block
             }
         }
     }
+
+    /// Read data starting from a given offset and up to the specified size.
+    ///
+    /// # Panics
+    ///
+    /// Function will panic if reads are larger than machine word bytes wide.
+    pub fn read_at(&self, offset: u64, size: usize) -> Bytes {
+        let mut data = BytesMut::with_capacity(size);
+
+        let (blocks, total_cycle_size) = match self {
+            Cache::Fixed {
+                blocks,
+                total_cycle_size,
+                ..
+            } => (
+                blocks,
+                usize::try_from(*total_cycle_size)
+                    .expect("cycle size larger than machine word bytes"),
+            ),
+        };
+
+        let mut remaining = size;
+        let mut current_offset =
+            usize::try_from(offset).expect("offset larger than machine word bytes");
+
+        while remaining > 0 {
+            // The plan is this. We treat the blocks as one infinite cycle. We
+            // map our offset into the domain of the blocks, then seek forward
+            // until we find the block we need to start reading from. Then we
+            // read into `data`.
+
+            let offset_within_cycle = current_offset % total_cycle_size;
+            let mut block_start = 0;
+            for block in blocks {
+                let block_size = block.total_bytes.get() as usize;
+                if offset_within_cycle < block_start + block_size {
+                    // Offset is within this block. Begin reading into `data`.
+                    let block_offset = offset_within_cycle - block_start;
+                    let bytes_in_block = (block_size - block_offset).min(remaining);
+
+                    data.extend_from_slice(
+                        &block.bytes[block_offset..block_offset + bytes_in_block],
+                    );
+
+                    remaining -= bytes_in_block;
+                    current_offset += bytes_in_block;
+                    break;
+                }
+                block_start += block_size;
+            }
Comment on lines +417 to +441 (Contributor):
Had to stare at this loop for a while to appreciate the subtleties involved (reads crossing block boundaries, cycle boundaries). Neat stuff.
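
To make the boundary handling concrete, here is a worked standalone example (hypothetical block sizes, not from this PR) of a read that crosses both a block boundary and the cycle boundary:

```rust
// Two blocks of 4 and 6 bytes give total_cycle_size = 10. A read at
// offset 8 for 5 bytes takes 2 bytes from the tail of block 1 (cycle
// offsets 8..10), wraps, then takes 3 bytes from the head of block 0.
fn main() {
    let blocks: Vec<Vec<u8>> = vec![b"AAAA".to_vec(), b"BBBBBB".to_vec()];
    let total_cycle_size = 10;

    let (mut current_offset, mut remaining) = (8usize, 5usize);
    let mut data = Vec::new();
    while remaining > 0 {
        let offset_within_cycle = current_offset % total_cycle_size;
        let mut block_start = 0;
        for block in &blocks {
            let block_size = block.len();
            if offset_within_cycle < block_start + block_size {
                let block_offset = offset_within_cycle - block_start;
                let bytes_in_block = (block_size - block_offset).min(remaining);
                data.extend_from_slice(&block[block_offset..block_offset + bytes_in_block]);
                remaining -= bytes_in_block;
                current_offset += bytes_in_block;
                break;
            }
            block_start += block_size;
        }
    }
    // Tail of block 1, then the wrapped head of block 0.
    assert_eq!(data, b"BBAAA".to_vec());
}
```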


+            // If we couldn't find a block this suggests something seriously
+            // wacky has happened.
+            if remaining > 0 && block_start >= total_cycle_size {
+                error!("Offset exceeds total cycle size");
+                break;
+            }
+        }
+
+        data.freeze()
+    }
 }
 
 /// Construct a new block cache of form defined by `serializer`.
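
Stepping back from the diff: the property the PR title promises, readers reading consistently, follows from `read_at` taking `&self` rather than `&mut self`. Unlike `next_block`, it advances no cursor, so identical `(offset, size)` requests yield identical bytes. A minimal standalone mock of that property (my own illustration, not lading's types):

```rust
// Mock cache: one cycle of bytes, no read cursor.
struct MockCache {
    cycle: Vec<u8>, // all blocks concatenated into a single cycle
}

impl MockCache {
    // Read `size` bytes at `offset`, wrapping around the cycle; &self only.
    fn read_at(&self, offset: usize, size: usize) -> Vec<u8> {
        (0..size)
            .map(|i| self.cycle[(offset + i) % self.cycle.len()])
            .collect()
    }
}

fn main() {
    let cache = MockCache { cycle: b"0123456789".to_vec() };
    // No hidden state advances between calls, so reads are repeatable.
    assert_eq!(cache.read_at(7, 6), cache.read_at(7, 6));
    assert_eq!(cache.read_at(7, 6), b"789012".to_vec());
}
```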