Skip to content

Commit

Permalink
Place block construction in a Cache struct (#684)
Browse files Browse the repository at this point in the history
This commit puts the block construction into a type, called `Cache`. We support
a peek interface which requires mutable access and an indexing implementation
which is read-only. This is not quite pleasing to the eye but it maintains the
interfaces that already exist. This work is intended to be the basis of an
infinite streaming `Cache`, not enabled by default. That work will be done in a
follow-up PR.

For the most part this change is mechanical. The unix-datagram generator now
follows the file-gen in sharing a single `Cache` instance per writer, rather
than having one for each child.

REF SMP-664

Signed-off-by: Brian L. Troutwine <[email protected]>
  • Loading branch information
blt authored Aug 18, 2023
1 parent a2d217e commit b6b94f5
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 232 deletions.
301 changes: 173 additions & 128 deletions lading/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,10 @@ use lading_payload as payload;
use metrics::gauge;
use rand::{prelude::SliceRandom, Rng};

#[derive(Debug, PartialEq, Eq, Clone, Copy, thiserror::Error)]
#[derive(Debug, thiserror::Error, Clone, Copy)]
pub enum Error {
#[error("Chunk error: {0}")]
Chunk(ChunkError),
}

impl From<ChunkError> for Error {
fn from(error: ChunkError) -> Self {
Error::Chunk(error)
}
Chunk(#[from] ChunkError),
}

#[derive(Debug)]
Expand All @@ -23,29 +17,16 @@ pub(crate) struct Block {
pub(crate) bytes: Bytes,
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Debug, thiserror::Error, Clone, Copy)]
pub enum ChunkError {
/// The slice of byte sizes given to [`chunk_bytes`] was empty.
#[error("The slice of byte sizes given was empty.")]
EmptyBlockBytes,
/// The `total_bytes` parameter is insufficient.
#[error("Insufficient total bytes.")]
InsufficientTotalBytes,
}

impl std::fmt::Display for ChunkError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self {
ChunkError::EmptyBlockBytes => write!(
f,
"the slice of byte sizes given to `chunk_bytes` was empty"
),
ChunkError::InsufficientTotalBytes => {
write!(f, "the `total_bytes` parameter is insufficient")
}
}
}
}
impl std::error::Error for ChunkError {}

/// Construct a vec of block sizes that fit into `total_bytes`.
///
/// When calling [`construct_block_cache`] it's necessary to supply a
Expand Down Expand Up @@ -98,113 +79,174 @@ where
Ok(chunks)
}

pub(crate) fn construct_block_cache<R>(
mut rng: R,
payload: &payload::Config,
block_chunks: &[usize],
labels: &Vec<(String, String)>,
) -> Vec<Block>
where
R: Rng,
{
match payload {
payload::Config::TraceAgent(enc) => {
let ta = match enc {
payload::Encoding::Json => payload::TraceAgent::json(&mut rng),
payload::Encoding::MsgPack => payload::TraceAgent::msg_pack(&mut rng),
};
#[derive(Debug)]
/// A mechanism for streaming byte blobs, 'blocks'
///
/// The `Cache` is a mechanism to allow generators to request 'blocks' without
/// needing to be aware of the origin or generation mechanism of these
/// blocks. We support a single mode of operation where all blocks are computed
/// ahead-of-time and stored in the `Cache`. Callers are responsible for timing
/// et al.
///
/// We expect to expand the different modes of `Cache` operation in the future.
pub(crate) struct Cache {
idx: usize,
blocks: Vec<Block>,
}

construct_block_cache_inner(&mut rng, &ta, block_chunks, labels)
}
payload::Config::Syslog5424 => construct_block_cache_inner(
&mut rng,
&payload::Syslog5424::default(),
block_chunks,
labels,
),
payload::Config::DogStatsD(payload::dogstatsd::Config {
contexts_minimum,
contexts_maximum,
name_length_minimum,
name_length_maximum,
tag_key_length_minimum,
tag_key_length_maximum,
tag_value_length_minimum,
tag_value_length_maximum,
tags_per_msg_minimum,
tags_per_msg_maximum,
// TODO -- how can I validate user input for multivalue_pack_probability
multivalue_pack_probability,
multivalue_count_minimum,
multivalue_count_maximum,
kind_weights,
metric_weights,
}) => {
let context_range = *contexts_minimum..*contexts_maximum;
let tags_per_msg_range = *tags_per_msg_minimum..*tags_per_msg_maximum;
let name_length_range = *name_length_minimum..*name_length_maximum;
let tag_key_length_range = *tag_key_length_minimum..*tag_key_length_maximum;
let tag_value_length_range = *tag_value_length_minimum..*tag_value_length_maximum;
let multivalue_count_range = *multivalue_count_minimum..*multivalue_count_maximum;
impl Cache {
/// Construct a `Cache` of fixed size.
///
/// This constructor makes an internal pool of `Block` instances up to
/// `total_bytes`, each of which are roughly the size of one of the
/// `block_byte_sizes`. Internally, `Blocks` are looped over in a
/// round-robin during peeking, iteration.
///
/// # Errors
///
/// Function will return an error if `block_byte_sizes` is empty or if a member
/// of `block_byte_sizes` is large than `total_bytes`.
pub(crate) fn fixed<R>(
mut rng: &mut R,
total_bytes: NonZeroUsize,
block_byte_sizes: &[NonZeroUsize],
payload: &payload::Config,
labels: &Vec<(String, String)>,
) -> Result<Self, Error>
where
R: Rng + ?Sized,
{
let block_chunks = chunk_bytes(&mut rng, total_bytes, block_byte_sizes)?;
let blocks = match payload {
payload::Config::TraceAgent(enc) => {
let ta = match enc {
payload::Encoding::Json => payload::TraceAgent::json(&mut rng),
payload::Encoding::MsgPack => payload::TraceAgent::msg_pack(&mut rng),
};

let serializer = payload::DogStatsD::new(
context_range,
name_length_range,
tag_key_length_range,
tag_value_length_range,
tags_per_msg_range,
multivalue_count_range,
*multivalue_pack_probability,
*kind_weights,
*metric_weights,
construct_block_cache_inner(&mut rng, &ta, &block_chunks, labels)
}
payload::Config::Syslog5424 => construct_block_cache_inner(
&mut rng,
);
&payload::Syslog5424::default(),
&block_chunks,
labels,
),
payload::Config::DogStatsD(payload::dogstatsd::Config {
contexts_minimum,
contexts_maximum,
name_length_minimum,
name_length_maximum,
tag_key_length_minimum,
tag_key_length_maximum,
tag_value_length_minimum,
tag_value_length_maximum,
tags_per_msg_minimum,
tags_per_msg_maximum,
// TODO -- how can I validate user input for multivalue_pack_probability
multivalue_pack_probability,
multivalue_count_minimum,
multivalue_count_maximum,
kind_weights,
metric_weights,
}) => {
let context_range = *contexts_minimum..*contexts_maximum;
let tags_per_msg_range = *tags_per_msg_minimum..*tags_per_msg_maximum;
let name_length_range = *name_length_minimum..*name_length_maximum;
let tag_key_length_range = *tag_key_length_minimum..*tag_key_length_maximum;
let tag_value_length_range = *tag_value_length_minimum..*tag_value_length_maximum;
let multivalue_count_range = *multivalue_count_minimum..*multivalue_count_maximum;

construct_block_cache_inner(&mut rng, &serializer, block_chunks, labels)
}
payload::Config::Fluent => {
let pyld = payload::Fluent::new(&mut rng);
construct_block_cache_inner(&mut rng, &pyld, block_chunks, labels)
}
payload::Config::SplunkHec { encoding } => construct_block_cache_inner(
&mut rng,
&payload::SplunkHec::new(*encoding),
block_chunks,
labels,
),
payload::Config::ApacheCommon => {
let pyld = payload::ApacheCommon::new(&mut rng);
construct_block_cache_inner(&mut rng, &pyld, block_chunks, labels)
}
payload::Config::Ascii => {
let pyld = payload::Ascii::new(&mut rng);
construct_block_cache_inner(&mut rng, &pyld, block_chunks, labels)
}
payload::Config::DatadogLog => {
let serializer = payload::DatadogLog::new(&mut rng);
construct_block_cache_inner(&mut rng, &serializer, block_chunks, labels)
}
payload::Config::Json => {
construct_block_cache_inner(&mut rng, &payload::Json, block_chunks, labels)
}
payload::Config::Static { ref static_path } => construct_block_cache_inner(
&mut rng,
&payload::Static::new(static_path),
block_chunks,
labels,
),
payload::Config::OpentelemetryTraces => {
let pyld = payload::OpentelemetryTraces::new(&mut rng);
construct_block_cache_inner(rng, &pyld, block_chunks, labels)
}
payload::Config::OpentelemetryLogs => {
let pyld = payload::OpentelemetryLogs::new(&mut rng);
construct_block_cache_inner(rng, &pyld, block_chunks, labels)
}
payload::Config::OpentelemetryMetrics => {
let pyld = payload::OpentelemetryMetrics::new(&mut rng);
construct_block_cache_inner(rng, &pyld, block_chunks, labels)
}
let serializer = payload::DogStatsD::new(
context_range,
name_length_range,
tag_key_length_range,
tag_value_length_range,
tags_per_msg_range,
multivalue_count_range,
*multivalue_pack_probability,
*kind_weights,
*metric_weights,
&mut rng,
);

construct_block_cache_inner(&mut rng, &serializer, &block_chunks, labels)
}
payload::Config::Fluent => {
let pyld = payload::Fluent::new(&mut rng);
construct_block_cache_inner(&mut rng, &pyld, &block_chunks, labels)
}
payload::Config::SplunkHec { encoding } => construct_block_cache_inner(
&mut rng,
&payload::SplunkHec::new(*encoding),
&block_chunks,
labels,
),
payload::Config::ApacheCommon => {
let pyld = payload::ApacheCommon::new(&mut rng);
construct_block_cache_inner(&mut rng, &pyld, &block_chunks, labels)
}
payload::Config::Ascii => {
let pyld = payload::Ascii::new(&mut rng);
construct_block_cache_inner(&mut rng, &pyld, &block_chunks, labels)
}
payload::Config::DatadogLog => {
let serializer = payload::DatadogLog::new(&mut rng);
construct_block_cache_inner(&mut rng, &serializer, &block_chunks, labels)
}
payload::Config::Json => {
construct_block_cache_inner(&mut rng, &payload::Json, &block_chunks, labels)
}
payload::Config::Static { ref static_path } => construct_block_cache_inner(
&mut rng,
&payload::Static::new(static_path),
&block_chunks,
labels,
),
payload::Config::OpentelemetryTraces => {
let pyld = payload::OpentelemetryTraces::new(&mut rng);
construct_block_cache_inner(rng, &pyld, &block_chunks, labels)
}
payload::Config::OpentelemetryLogs => {
let pyld = payload::OpentelemetryLogs::new(&mut rng);
construct_block_cache_inner(rng, &pyld, &block_chunks, labels)
}
payload::Config::OpentelemetryMetrics => {
let pyld = payload::OpentelemetryMetrics::new(&mut rng);
construct_block_cache_inner(rng, &pyld, &block_chunks, labels)
}
};
Ok(Self { idx: 0, blocks })
}

/// Return a reference to the next `Block` in the cache without advancing
/// the cache. Returns `None` if the cache has no further blocks and will
/// not.
#[allow(clippy::unnecessary_wraps)]
pub(crate) fn peek(&mut self) -> Option<&Block> {
Some(&self.blocks[self.idx])
}

/// Return a reference to the next `Block` in the cache, advancing the
/// cache. Returns `None` if the cache has no further blocks and will not.
#[allow(clippy::unnecessary_wraps)]
pub(crate) fn next(&mut self) -> Option<&Block> {
let res = &self.blocks[self.idx];
self.idx = (self.idx + 1) % self.blocks.len();
Some(res)
}

/// Return a reference to the next `Block` in the cache at the user-provided
/// `idx`. Panics if the `idx` is out of range.
#[allow(clippy::unnecessary_wraps)]
pub(crate) fn at_idx(&self, idx: usize) -> &Block {
&self.blocks[idx]
}

/// Returns the length of the interior block cache.
#[allow(clippy::unnecessary_wraps)]
pub(crate) fn len(&self) -> usize {
self.blocks.len()
}
}

Expand Down Expand Up @@ -310,7 +352,10 @@ mod test {
#[test]
fn chunks_empty_trigger_error(seed: u64, total_bytes in (1..usize::MAX).prop_map(|i| NonZeroUsize::new(i).unwrap())) {
let mut rng = SmallRng::seed_from_u64(seed);
prop_assert_eq!(Err(Error::Chunk(ChunkError::EmptyBlockBytes)), chunk_bytes(&mut rng, total_bytes, &[]));
match chunk_bytes(&mut rng, total_bytes, &[]) {
Err(Error::Chunk(ChunkError::EmptyBlockBytes)) => assert!(true),
_ => assert!(false),
}
}
}
}
Loading

0 comments on commit b6b94f5

Please sign in to comment.