From 7a632caf317d55af216903a86d922333629d59bc Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Mon, 21 Aug 2023 18:19:16 -0700 Subject: [PATCH 1/2] Block cache with an infinite stream option This commit makes two major changes to the block cache. The most important is that the block cache's major CPU consumption -- the construction of `Block` instances -- is now done in a separate OS thread from the tokio runtime. This allows us to introduce the second, more important change: infinite streams of `Blocks`. It is now possible for users to construct an unending stream of `Block` instances that do not loop. We maintain a cache of constructed `Block`s up to the maximum total bytes allowed to minimize any potential latency impact. Configuration is changed but not in a backward-incompatible way. REF SMP-664 Signed-off-by: Brian L. Troutwine --- lading/src/block.rs | 457 +++++++++++++++++++------- lading/src/common.rs | 31 ++ lading/src/config.rs | 5 +- lading/src/generator.rs | 110 +++---- lading/src/generator/file_gen.rs | 58 ++-- lading/src/generator/grpc.rs | 45 ++- lading/src/generator/http.rs | 46 ++- lading/src/generator/splunk_hec.rs | 46 ++- lading/src/generator/tcp.rs | 26 +- lading/src/generator/udp.rs | 22 +- lading/src/generator/unix_datagram.rs | 52 ++- lading/src/generator/unix_stream.rs | 44 ++- 12 files changed, 663 insertions(+), 279 deletions(-) diff --git a/lading/src/block.rs b/lading/src/block.rs index 2e610130c..a8618fd9d 100644 --- a/lading/src/block.rs +++ b/lading/src/block.rs @@ -1,9 +1,19 @@ -use std::num::{NonZeroU32, NonZeroUsize}; +use std::{ + collections::VecDeque, + num::{NonZeroU32, NonZeroUsize}, +}; use bytes::{buf::Writer, BufMut, Bytes, BytesMut}; use lading_payload as payload; -use metrics::gauge; -use rand::{prelude::SliceRandom, Rng}; +use rand::{prelude::SliceRandom, rngs::StdRng, Rng, SeedableRng}; +use serde::Deserialize; +use tokio::sync::mpsc::{self, error::SendError, Sender}; + +#[derive(Debug, thiserror::Error)] 
+pub(crate) enum SpinError { + #[error(transparent)] + Send(#[from] SendError), +} #[derive(Debug, thiserror::Error, Clone, Copy)] pub enum Error { @@ -11,12 +21,6 @@ pub enum Error { Chunk(#[from] ChunkError), } -#[derive(Debug)] -pub(crate) struct Block { - pub(crate) total_bytes: NonZeroU32, - pub(crate) bytes: Bytes, -} - #[derive(Debug, thiserror::Error, Clone, Copy)] pub enum ChunkError { /// The slice of byte sizes given to [`chunk_bytes`] was empty. @@ -27,56 +31,23 @@ pub enum ChunkError { InsufficientTotalBytes, } -/// Construct a vec of block sizes that fit into `total_bytes`. -/// -/// When calling [`construct_block_cache`] it's necessary to supply a -/// `block_chunks` argument, defining the block sizes that will be used when -/// serializing. Callers _generally_ will want to hit a certain total bytes -/// number of blocks and getting `total_bytes` parceled -/// up correctly is not necessarily straightforward. This utility method does -/// the computation in cases where it would otherwise be annoying. From the -/// allowable block sizes -- defined by `block_byte_sizes` -- a random member is -/// chosen and is deducted from the total bytes remaining. This process -/// continues until the total bytes remaining falls below the smallest block -/// size. It's possible that a user could supply just the right parameters to -/// make this loop infinitely. A more clever algorithm would be great. -/// -/// # Errors -/// -/// Function will return an error if `block_byte_sizes` is empty or if a member -/// of `block_byte_sizes` is large than `total_bytes`. 
-pub(crate) fn chunk_bytes( - rng: &mut R, - total_bytes: NonZeroUsize, - block_byte_sizes: &[NonZeroUsize], -) -> Result, Error> -where - R: Rng + Sized, -{ - if block_byte_sizes.is_empty() { - return Err(ChunkError::EmptyBlockBytes.into()); - } - for bb in block_byte_sizes { - if *bb > total_bytes { - return Err(ChunkError::InsufficientTotalBytes.into()); - } - } +#[derive(Debug, Clone)] +pub(crate) struct Block { + pub(crate) total_bytes: NonZeroU32, + pub(crate) bytes: Bytes, +} - let mut chunks = Vec::new(); - let mut bytes_remaining = total_bytes.get(); - let minimum = block_byte_sizes.iter().min().unwrap().get(); - let maximum = block_byte_sizes.iter().max().unwrap().get(); +#[derive(Debug, Deserialize, PartialEq, Clone, Copy)] +/// The method for which caching will be configure +pub enum CacheMethod { + /// Create a single fixed size block cache and rotate through it + Fixed, + /// Maintain a fixed sized block cache buffer and stream from it + Streaming, +} - while bytes_remaining > minimum { - let bytes_max = std::cmp::min(maximum, bytes_remaining); - let block_bytes = block_byte_sizes.choose(rng).unwrap().get(); - if block_bytes > bytes_max { - continue; - } - chunks.push(block_bytes); - bytes_remaining = bytes_remaining.saturating_sub(block_bytes); - } - Ok(chunks) +pub(crate) fn default_cache_method() -> CacheMethod { + CacheMethod::Fixed } #[derive(Debug)] @@ -89,12 +60,49 @@ where /// et al. /// /// We expect to expand the different modes of `Cache` operation in the future. -pub(crate) struct Cache { - idx: usize, - blocks: Vec, +pub(crate) enum Cache { + Fixed { + /// The current index into `blocks` + idx: usize, + /// The store of blocks. + blocks: Vec, + }, + Stream { + seed: [u8; 32], + total_bytes: usize, + block_chunks: Vec, + payload: payload::Config, + }, } impl Cache { + /// Construct a streaming `Cache`. 
+ /// + /// This constructor makes an internal pool of `Block` instances up to + /// `total_bytes`, each of which are roughly the size of one of the + /// `block_byte_sizes`. Internally, `Blocks` are replaced as they are spun out. + /// + /// # Errors + /// + /// Function will return an error if `block_byte_sizes` is empty or if a member + /// of `block_byte_sizes` is large than `total_bytes`. + pub(crate) fn stream( + seed: [u8; 32], + total_bytes: NonZeroUsize, + block_byte_sizes: &[NonZeroUsize], + payload: payload::Config, + ) -> Result { + let mut rng = StdRng::from_seed(seed); + + let block_chunks = chunk_bytes(&mut rng, total_bytes, block_byte_sizes)?; + Ok(Self::Stream { + seed, + total_bytes: total_bytes.get(), + block_chunks, + payload, + }) + } + /// Construct a `Cache` of fixed size. /// /// This constructor makes an internal pool of `Block` instances up to @@ -111,7 +119,6 @@ impl Cache { total_bytes: NonZeroUsize, block_byte_sizes: &[NonZeroUsize], payload: &payload::Config, - labels: &Vec<(String, String)>, ) -> Result where R: Rng + ?Sized, @@ -124,13 +131,12 @@ impl Cache { payload::Encoding::MsgPack => payload::TraceAgent::msg_pack(&mut rng), }; - construct_block_cache_inner(&mut rng, &ta, &block_chunks, labels) + construct_block_cache_inner(&mut rng, &ta, &block_chunks) } payload::Config::Syslog5424 => construct_block_cache_inner( &mut rng, &payload::Syslog5424::default(), &block_chunks, - labels, ), payload::Config::DogStatsD(payload::dogstatsd::Config { contexts_minimum, @@ -143,7 +149,7 @@ impl Cache { tag_value_length_maximum, tags_per_msg_minimum, tags_per_msg_maximum, - // TODO -- how can I validate user input for multivalue_pack_probability + // TODO -- Validate user input for multivalue_pack_probability. 
multivalue_pack_probability, multivalue_count_minimum, multivalue_count_maximum, @@ -170,88 +176,234 @@ impl Cache { &mut rng, ); - construct_block_cache_inner(&mut rng, &serializer, &block_chunks, labels) + construct_block_cache_inner(&mut rng, &serializer, &block_chunks) } payload::Config::Fluent => { let pyld = payload::Fluent::new(&mut rng); - construct_block_cache_inner(&mut rng, &pyld, &block_chunks, labels) + construct_block_cache_inner(&mut rng, &pyld, &block_chunks) } payload::Config::SplunkHec { encoding } => construct_block_cache_inner( &mut rng, &payload::SplunkHec::new(*encoding), &block_chunks, - labels, ), payload::Config::ApacheCommon => { let pyld = payload::ApacheCommon::new(&mut rng); - construct_block_cache_inner(&mut rng, &pyld, &block_chunks, labels) + construct_block_cache_inner(&mut rng, &pyld, &block_chunks) } payload::Config::Ascii => { let pyld = payload::Ascii::new(&mut rng); - construct_block_cache_inner(&mut rng, &pyld, &block_chunks, labels) + construct_block_cache_inner(&mut rng, &pyld, &block_chunks) } payload::Config::DatadogLog => { let serializer = payload::DatadogLog::new(&mut rng); - construct_block_cache_inner(&mut rng, &serializer, &block_chunks, labels) + construct_block_cache_inner(&mut rng, &serializer, &block_chunks) } payload::Config::Json => { - construct_block_cache_inner(&mut rng, &payload::Json, &block_chunks, labels) + construct_block_cache_inner(&mut rng, &payload::Json, &block_chunks) } payload::Config::Static { ref static_path } => construct_block_cache_inner( &mut rng, &payload::Static::new(static_path), &block_chunks, - labels, ), payload::Config::OpentelemetryTraces => { let pyld = payload::OpentelemetryTraces::new(&mut rng); - construct_block_cache_inner(rng, &pyld, &block_chunks, labels) + construct_block_cache_inner(rng, &pyld, &block_chunks) } payload::Config::OpentelemetryLogs => { let pyld = payload::OpentelemetryLogs::new(&mut rng); - construct_block_cache_inner(rng, &pyld, &block_chunks, labels) + 
construct_block_cache_inner(rng, &pyld, &block_chunks) } payload::Config::OpentelemetryMetrics => { let pyld = payload::OpentelemetryMetrics::new(&mut rng); - construct_block_cache_inner(rng, &pyld, &block_chunks, labels) + construct_block_cache_inner(rng, &pyld, &block_chunks) } }; - Ok(Self { idx: 0, blocks }) + Ok(Self::Fixed { idx: 0, blocks }) } - /// Return a reference to the next `Block` in the cache without advancing - /// the cache. Returns `None` if the cache has no further blocks and will - /// not. - #[allow(clippy::unnecessary_wraps)] - pub(crate) fn peek(&mut self) -> Option<&Block> { - Some(&self.blocks[self.idx]) + /// Run `Cache` forward on the user-provided mpsc sender. + /// + /// This is a blocking function that pushes `Block` instances into the + /// user-provided mpsc `Sender`. The user is required to set an + /// appropriate size on the channel. This function will never exit. + #[allow(clippy::needless_pass_by_value)] + pub(crate) fn spin(self, snd: Sender) -> Result<(), SpinError> { + match self { + Self::Fixed { mut idx, blocks } => loop { + snd.blocking_send(blocks[idx].clone())?; + idx = (idx + 1) % blocks.len(); + }, + Cache::Stream { + seed, + total_bytes, + block_chunks, + payload, + } => stream_inner(seed, total_bytes, &block_chunks, &payload, snd), + } } +} + +#[allow(clippy::needless_pass_by_value)] +#[inline] +fn stream_inner( + seed: [u8; 32], + total_bytes: usize, + block_chunks: &[usize], + payload: &payload::Config, + snd: Sender, +) -> Result<(), SpinError> { + let mut rng = StdRng::from_seed(seed); + + match payload { + payload::Config::TraceAgent(enc) => { + let ta = match enc { + payload::Encoding::Json => payload::TraceAgent::json(&mut rng), + payload::Encoding::MsgPack => payload::TraceAgent::msg_pack(&mut rng), + }; + + stream_block_inner(&mut rng, total_bytes, &ta, block_chunks, &snd) + } + payload::Config::Syslog5424 => { + let pyld = payload::Syslog5424::default(); + stream_block_inner(&mut rng, total_bytes, &pyld, 
block_chunks, &snd) + } + payload::Config::DogStatsD(payload::dogstatsd::Config { + contexts_minimum, + contexts_maximum, + name_length_minimum, + name_length_maximum, + tag_key_length_minimum, + tag_key_length_maximum, + tag_value_length_minimum, + tag_value_length_maximum, + tags_per_msg_minimum, + tags_per_msg_maximum, + // TODO -- Validate user input for multivalue_pack_probability. + multivalue_pack_probability, + multivalue_count_minimum, + multivalue_count_maximum, + kind_weights, + metric_weights, + }) => { + let context_range = *contexts_minimum..*contexts_maximum; + let tags_per_msg_range = *tags_per_msg_minimum..*tags_per_msg_maximum; + let name_length_range = *name_length_minimum..*name_length_maximum; + let tag_key_length_range = *tag_key_length_minimum..*tag_key_length_maximum; + let tag_value_length_range = *tag_value_length_minimum..*tag_value_length_maximum; + let multivalue_count_range = *multivalue_count_minimum..*multivalue_count_maximum; + + let pyld = payload::DogStatsD::new( + context_range, + name_length_range, + tag_key_length_range, + tag_value_length_range, + tags_per_msg_range, + multivalue_count_range, + *multivalue_pack_probability, + *kind_weights, + *metric_weights, + &mut rng, + ); - /// Return a reference to the next `Block` in the cache, advancing the - /// cache. Returns `None` if the cache has no further blocks and will not. 
- #[allow(clippy::unnecessary_wraps)] - pub(crate) fn next(&mut self) -> Option<&Block> { - let res = &self.blocks[self.idx]; - self.idx = (self.idx + 1) % self.blocks.len(); - Some(res) + stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd) + } + payload::Config::Fluent => { + let pyld = payload::Fluent::new(&mut rng); + stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd) + } + payload::Config::SplunkHec { encoding } => { + let pyld = payload::SplunkHec::new(*encoding); + stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd) + } + payload::Config::ApacheCommon => { + let pyld = payload::ApacheCommon::new(&mut rng); + stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd) + } + payload::Config::Ascii => { + let pyld = payload::Ascii::new(&mut rng); + stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd) + } + payload::Config::DatadogLog => { + let pyld = payload::DatadogLog::new(&mut rng); + stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd) + } + payload::Config::Json => { + let pyld = payload::Json; + stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd) + } + payload::Config::Static { ref static_path } => { + let pyld = payload::Static::new(static_path); + stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd) + } + payload::Config::OpentelemetryTraces => { + let pyld = payload::OpentelemetryTraces::new(&mut rng); + stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd) + } + payload::Config::OpentelemetryLogs => { + let pyld = payload::OpentelemetryLogs::new(&mut rng); + stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd) + } + payload::Config::OpentelemetryMetrics => { + let pyld = payload::OpentelemetryMetrics::new(&mut rng); + stream_block_inner(&mut rng, total_bytes, &pyld, block_chunks, &snd) + } } +} - /// Return a reference to the next `Block` in the cache at the user-provided - /// `idx`. 
Panics if the `idx` is out of range. - #[allow(clippy::unnecessary_wraps)] - pub(crate) fn at_idx(&self, idx: usize) -> &Block { - &self.blocks[idx] +/// Construct a vec of block sizes that fit into `total_bytes`. +/// +/// When calling [`construct_block_cache`] it's necessary to supply a +/// `block_chunks` argument, defining the block sizes that will be used when +/// serializing. Callers _generally_ will want to hit a certain total bytes +/// number of blocks and getting `total_bytes` parceled +/// up correctly is not necessarily straightforward. This utility method does +/// the computation in cases where it would otherwise be annoying. From the +/// allowable block sizes -- defined by `block_byte_sizes` -- a random member is +/// chosen and is deducted from the total bytes remaining. This process +/// continues until the total bytes remaining falls below the smallest block +/// size. It's possible that a user could supply just the right parameters to +/// make this loop infinitely. A more clever algorithm would be great. +/// +/// # Errors +/// +/// Function will return an error if `block_byte_sizes` is empty or if a member +/// of `block_byte_sizes` is large than `total_bytes`. +fn chunk_bytes( + rng: &mut R, + total_bytes: NonZeroUsize, + block_byte_sizes: &[NonZeroUsize], +) -> Result, Error> +where + R: Rng + Sized, +{ + if block_byte_sizes.is_empty() { + return Err(ChunkError::EmptyBlockBytes.into()); + } + for bb in block_byte_sizes { + if *bb > total_bytes { + return Err(ChunkError::InsufficientTotalBytes.into()); + } } - /// Returns the length of the interior block cache. 
- #[allow(clippy::unnecessary_wraps)] - pub(crate) fn len(&self) -> usize { - self.blocks.len() + let mut chunks = Vec::new(); + let mut bytes_remaining = total_bytes.get(); + let minimum = block_byte_sizes.iter().min().unwrap().get(); + let maximum = block_byte_sizes.iter().max().unwrap().get(); + + while bytes_remaining > minimum { + let bytes_max = std::cmp::min(maximum, bytes_remaining); + let block_bytes = block_byte_sizes.choose(rng).unwrap().get(); + if block_bytes > bytes_max { + continue; + } + chunks.push(block_bytes); + bytes_remaining = bytes_remaining.saturating_sub(block_bytes); } + Ok(chunks) } -#[allow(clippy::ptr_arg)] -#[allow(clippy::cast_precision_loss)] /// Construct a new block cache of form defined by `serializer`. /// /// A "block cache" is a pre-made vec of serialized arbitrary instances of the @@ -262,44 +414,107 @@ impl Cache { /// /// # Panics /// -/// Function will panic if the `serializer` signals an error. In the futures we +/// Function will panic if the `serializer` signals an error. In the future we /// would like to propagate this error to the caller. #[inline] fn construct_block_cache_inner( - mut rng: R, + mut rng: &mut R, serializer: &S, block_chunks: &[usize], - labels: &Vec<(String, String)>, ) -> Vec where S: payload::Serialize, - R: Rng, + R: Rng + ?Sized, { let mut block_cache: Vec = Vec::with_capacity(block_chunks.len()); for block_size in block_chunks { - let mut block: Writer = BytesMut::with_capacity(*block_size).writer(); - serializer - .to_bytes(&mut rng, *block_size, &mut block) - .unwrap(); - let bytes: Bytes = block.into_inner().freeze(); - if bytes.is_empty() { - // Blocks may be empty, especially when the amount of bytes - // requested for the block are relatively low. This is a quirk of - // our use of Arbitrary. We do not have the ability to tell that - // library that we would like such and such number of bytes - // approximately from an instance. 
This does mean that we sometimes - // waste computation because the size of the block given cannot be - // serialized into. - continue; + if let Some(block) = construct_block(&mut rng, serializer, *block_size) { + block_cache.push(block); } - let total_bytes = NonZeroU32::new(bytes.len().try_into().unwrap()).unwrap(); - block_cache.push(Block { total_bytes, bytes }); } assert!(!block_cache.is_empty()); - gauge!("block_construction_complete", 1.0, labels); block_cache } +#[inline] +fn stream_block_inner( + mut rng: &mut R, + total_bytes: usize, + serializer: &S, + block_chunks: &[usize], + snd: &Sender, +) -> Result<(), SpinError> +where + S: payload::Serialize, + R: Rng + ?Sized, +{ + let total_bytes: u64 = total_bytes as u64; + let mut accum_bytes: u64 = 0; + let mut cache: VecDeque = VecDeque::new(); + + loop { + // Attempt to read from the cache first, being sure to subtract the + // bytes we send out. + if let Some(block) = cache.pop_front() { + accum_bytes -= u64::from(block.total_bytes.get()); + snd.blocking_send(block)?; + } + // There are no blocks in the cache. In order to minimize latency we + // push blocks into the sender until such time as it's full. When that + // happens we overflow into the cache until such time as that's full. + 'refill: loop { + let block_size = block_chunks.choose(&mut rng).unwrap(); + if let Some(block) = construct_block(&mut rng, serializer, *block_size) { + match snd.try_reserve() { + Ok(permit) => permit.send(block), + Err(err) => match err { + mpsc::error::TrySendError::Full(_) => { + if accum_bytes < total_bytes { + accum_bytes += u64::from(block.total_bytes.get()); + cache.push_back(block); + break 'refill; + } + } + mpsc::error::TrySendError::Closed(_) => return Ok(()), + }, + } + } + } + } +} + +/// Construct a new block +/// +/// # Panics +/// +/// Function will panic if the `serializer` signals an error. In the future we +/// would like to propagate this error to the caller. 
+#[inline] +fn construct_block(mut rng: &mut R, serializer: &S, chunk_size: usize) -> Option +where + S: payload::Serialize, + R: Rng + ?Sized, +{ + let mut block: Writer = BytesMut::with_capacity(chunk_size).writer(); + serializer + .to_bytes(&mut rng, chunk_size, &mut block) + .unwrap(); + let bytes: Bytes = block.into_inner().freeze(); + if bytes.is_empty() { + // Blocks may be empty, especially when the amount of bytes + // requested for the block are relatively low. This is a quirk of + // our use of randomness. We do not have the ability to tell that + // library that we would like such and such number of bytes + // approximately from an instance. This does mean that we sometimes + // waste computation because the size of the block given cannot be + // serialized into. + None + } else { + let total_bytes = NonZeroU32::new(bytes.len().try_into().unwrap()).unwrap(); + Some(Block { total_bytes, bytes }) + } +} + #[cfg(test)] mod test { use std::num::NonZeroUsize; diff --git a/lading/src/common.rs b/lading/src/common.rs index 327f59eb7..371346305 100644 --- a/lading/src/common.rs +++ b/lading/src/common.rs @@ -1,6 +1,7 @@ use std::{fmt, fs, path::PathBuf, process::Stdio, str}; use serde::Deserialize; +use tokio::sync::mpsc; #[derive(Debug, Deserialize, PartialEq, Eq)] /// Defines how sub-process stderr and stdout are handled. 
@@ -58,3 +59,33 @@ pub(crate) fn stdio(behavior: &Behavior) -> Stdio { } } } + +#[derive(Debug)] +pub(crate) struct PeekableReceiver { + receiver: mpsc::Receiver, + buffer: Option, +} + +impl PeekableReceiver { + pub(crate) fn new(receiver: mpsc::Receiver) -> Self { + Self { + receiver, + buffer: None, + } + } + + #[inline] + pub(crate) async fn next(&mut self) -> Option { + match self.buffer.take() { + Some(t) => Some(t), + None => self.receiver.recv().await, + } + } + + pub(crate) async fn peek(&mut self) -> Option<&T> { + if self.buffer.is_none() { + self.buffer = self.receiver.recv().await; + } + self.buffer.as_ref() + } +} diff --git a/lading/src/config.rs b/lading/src/config.rs index cedac7a25..ac5a0e1bf 100644 --- a/lading/src/config.rs +++ b/lading/src/config.rs @@ -73,6 +73,8 @@ mod tests { use http::HeaderMap; + use crate::block; + use super::*; #[test] @@ -114,7 +116,8 @@ blackhole: 8_f64, byte_unit::ByteUnit::MB ) - .unwrap() + .unwrap(), + block_cache_method: block::CacheMethod::Fixed, }, headers: HeaderMap::default(), bytes_per_second: byte_unit::Byte::from_unit( diff --git a/lading/src/generator.rs b/lading/src/generator.rs index 6c7cd8560..8e4f15f30 100644 --- a/lading/src/generator.rs +++ b/lading/src/generator.rs @@ -26,29 +26,39 @@ pub mod udp; pub mod unix_datagram; pub mod unix_stream; -#[derive(Debug)] +#[derive(thiserror::Error, Debug)] /// Errors produced by [`Server`]. pub enum Error { /// See [`crate::generator::tcp::Error`] for details. - Tcp(tcp::Error), + #[error(transparent)] + Tcp(#[from] tcp::Error), /// See [`crate::generator::udp::Error`] for details. - Udp(udp::Error), + #[error(transparent)] + Udp(#[from] udp::Error), /// See [`crate::generator::http::Error`] for details. - Http(http::Error), + #[error(transparent)] + Http(#[from] http::Error), /// See [`crate::generator::splunk_hec::Error`] for details. 
- SplunkHec(splunk_hec::Error), + #[error(transparent)] + SplunkHec(#[from] splunk_hec::Error), /// See [`crate::generator::file_gen::Error`] for details. - FileGen(file_gen::Error), + #[error(transparent)] + FileGen(#[from] file_gen::Error), /// See [`crate::generator::file_tree::Error`] for details. - FileTree(file_tree::Error), + #[error(transparent)] + FileTree(#[from] file_tree::Error), /// See [`crate::generator::grpc::Error`] for details. - Grpc(grpc::Error), + #[error(transparent)] + Grpc(#[from] grpc::Error), /// See [`crate::generator::unix_stream::Error`] for details. - UnixStream(unix_stream::Error), + #[error(transparent)] + UnixStream(#[from] unix_stream::Error), /// See [`crate::generator::unix_datagram::Error`] for details. - UnixDatagram(unix_datagram::Error), + #[error(transparent)] + UnixDatagram(#[from] unix_datagram::Error), /// See [`crate::generator::process_tree::Error`] for details. - ProcessTree(process_tree::Error), + #[error(transparent)] + ProcessTree(#[from] process_tree::Error), } #[derive(Debug, Deserialize, PartialEq)] @@ -137,39 +147,30 @@ impl Server { /// signals error. pub fn new(config: Config, shutdown: Shutdown) -> Result { let srv = match config.inner { - Inner::Tcp(conf) => { - Self::Tcp(tcp::Tcp::new(config.general, &conf, shutdown).map_err(Error::Tcp)?) - } - Inner::Udp(conf) => { - Self::Udp(udp::Udp::new(config.general, &conf, shutdown).map_err(Error::Udp)?) - } - Inner::Http(conf) => { - Self::Http(http::Http::new(config.general, conf, shutdown).map_err(Error::Http)?) + Inner::Tcp(conf) => Self::Tcp(tcp::Tcp::new(config.general, &conf, shutdown)?), + Inner::Udp(conf) => Self::Udp(udp::Udp::new(config.general, &conf, shutdown)?), + Inner::Http(conf) => Self::Http(http::Http::new(config.general, conf, shutdown)?), + Inner::SplunkHec(conf) => { + Self::SplunkHec(splunk_hec::SplunkHec::new(config.general, conf, shutdown)?) 
} - Inner::SplunkHec(conf) => Self::SplunkHec( - splunk_hec::SplunkHec::new(config.general, conf, shutdown) - .map_err(Error::SplunkHec)?, - ), - Inner::FileGen(conf) => Self::FileGen( - file_gen::FileGen::new(config.general, conf, shutdown).map_err(Error::FileGen)?, - ), - Inner::FileTree(conf) => { - Self::FileTree(file_tree::FileTree::new(&conf, shutdown).map_err(Error::FileTree)?) + Inner::FileGen(conf) => { + Self::FileGen(file_gen::FileGen::new(config.general, conf, shutdown)?) } - Inner::Grpc(conf) => { - Self::Grpc(grpc::Grpc::new(config.general, conf, shutdown).map_err(Error::Grpc)?) + Inner::FileTree(conf) => Self::FileTree(file_tree::FileTree::new(&conf, shutdown)?), + Inner::Grpc(conf) => Self::Grpc(grpc::Grpc::new(config.general, conf, shutdown)?), + Inner::UnixStream(conf) => Self::UnixStream(unix_stream::UnixStream::new( + config.general, + conf, + shutdown, + )?), + Inner::UnixDatagram(conf) => Self::UnixDatagram(unix_datagram::UnixDatagram::new( + config.general, + &conf, + shutdown, + )?), + Inner::ProcessTree(conf) => { + Self::ProcessTree(process_tree::ProcessTree::new(&conf, shutdown)?) 
} - Inner::UnixStream(conf) => Self::UnixStream( - unix_stream::UnixStream::new(config.general, conf, shutdown) - .map_err(Error::UnixStream)?, - ), - Inner::UnixDatagram(conf) => Self::UnixDatagram( - unix_datagram::UnixDatagram::new(config.general, &conf, shutdown) - .map_err(Error::UnixDatagram)?, - ), - Inner::ProcessTree(conf) => Self::ProcessTree( - process_tree::ProcessTree::new(&conf, shutdown).map_err(Error::ProcessTree)?, - ), }; Ok(srv) } @@ -190,22 +191,19 @@ impl Server { let _ = pid_snd.recv().await; drop(pid_snd); - let res = match self { - Server::Tcp(inner) => inner.spin().await.map_err(Error::Tcp), - Server::Udp(inner) => inner.spin().await.map_err(Error::Udp), - Server::Http(inner) => inner.spin().await.map_err(Error::Http), - Server::SplunkHec(inner) => inner.spin().await.map_err(Error::SplunkHec), - Server::FileGen(inner) => inner.spin().await.map_err(Error::FileGen), - Server::FileTree(inner) => inner.spin().await.map_err(Error::FileTree), - Server::Grpc(inner) => inner.spin().await.map_err(Error::Grpc), - Server::UnixStream(inner) => inner.spin().await.map_err(Error::UnixStream), - Server::UnixDatagram(inner) => inner.spin().await.map_err(Error::UnixDatagram), - Server::ProcessTree(inner) => inner.spin().await.map_err(Error::ProcessTree), + match self { + Server::Tcp(inner) => inner.spin().await?, + Server::Udp(inner) => inner.spin().await?, + Server::Http(inner) => inner.spin().await?, + Server::SplunkHec(inner) => inner.spin().await?, + Server::FileGen(inner) => inner.spin().await?, + Server::FileTree(inner) => inner.spin().await?, + Server::Grpc(inner) => inner.spin().await?, + Server::UnixStream(inner) => inner.spin().await?, + Server::UnixDatagram(inner) => inner.spin().await?, + Server::ProcessTree(inner) => inner.spin().await?, }; - if let Err(e) = &res { - error!("Generator error: {:?}", e); - } - res + Ok(()) } } diff --git a/lading/src/generator/file_gen.rs b/lading/src/generator/file_gen.rs index b6e3d0bc3..ee4bd68a5 100644 --- 
a/lading/src/generator/file_gen.rs +++ b/lading/src/generator/file_gen.rs @@ -20,6 +20,7 @@ use std::{ atomic::{AtomicU32, Ordering}, Arc, }, + thread, }; use byte_unit::{Byte, ByteUnit}; @@ -31,11 +32,16 @@ use serde::Deserialize; use tokio::{ fs, io::{AsyncWriteExt, BufWriter}, + sync::mpsc, task::{JoinError, JoinHandle}, }; use tracing::info; -use crate::{block, signals::Shutdown}; +use crate::{ + block::{self, Block}, + common::PeekableReceiver, + signals::Shutdown, +}; use super::General; @@ -87,6 +93,9 @@ pub struct Config { /// Defines the maximum internal cache of this log target. file_gen will /// pre-build its outputs up to the byte capacity specified here. maximum_prebuild_cache_size_bytes: Byte, + /// Whether to use a fixed or streaming block cache + #[serde(default = "crate::block::default_cache_method")] + block_cache_method: block::CacheMethod, /// Determines whether the file generator mimics log rotation or not. If /// true, files will be rotated. If false, it is the responsibility of /// tailing software to remove old files. 
@@ -154,32 +163,33 @@ impl FileGen { let maximum_bytes_per_file = NonZeroU32::new(config.maximum_bytes_per_file.get_bytes() as u32).unwrap(); - let labels = vec![ - ("component".to_string(), "generator".to_string()), - ("component_name".to_string(), "file_gen".to_string()), - ]; - let mut handles = Vec::new(); let file_index = Arc::new(AtomicU32::new(0)); - let block_cache = block::Cache::fixed( - &mut rng, - NonZeroUsize::new(config.maximum_prebuild_cache_size_bytes.get_bytes() as usize) - .expect("bytes must be non-zero"), - &block_sizes, - &config.variant, - &labels, - )?; - let block_cache = Arc::new(block_cache); for _ in 0..config.duplicates { let throttle = Throttle::new_with_config(config.throttle, bytes_per_second); + let total_bytes = + NonZeroUsize::new(config.maximum_prebuild_cache_size_bytes.get_bytes() as usize) + .expect("bytes must be non-zero"); + let block_cache = match config.block_cache_method { + block::CacheMethod::Streaming => block::Cache::stream( + config.seed, + total_bytes, + &block_sizes, + config.variant.clone(), + )?, + block::CacheMethod::Fixed => { + block::Cache::fixed(&mut rng, total_bytes, &block_sizes, &config.variant)? + } + }; + let child = Child { path_template: config.path_template.clone(), maximum_bytes_per_file, bytes_per_second, throttle, - block_cache: Arc::clone(&block_cache), + block_cache, file_index: Arc::clone(&file_index), rotate: config.rotate, shutdown: shutdown.clone(), @@ -222,7 +232,7 @@ struct Child { maximum_bytes_per_file: NonZeroU32, bytes_per_second: NonZeroU32, throttle: Throttle, - block_cache: Arc, + block_cache: block::Cache, rotate: bool, file_index: Arc, shutdown: Shutdown, @@ -247,21 +257,25 @@ impl Child { .await?, ); - let mut idx = 0; + // Move the block_cache into an OS thread, exposing a channel between it + // and this async context. 
let block_cache = self.block_cache; + let (snd, rcv) = mpsc::channel(1024); + let mut rcv: PeekableReceiver = PeekableReceiver::new(rcv); + thread::Builder::new().spawn(|| block_cache.spin(snd))?; let bytes_written = register_counter!("bytes_written"); loop { - let block = &block_cache.at_idx(idx); - let total_bytes = block.total_bytes; + let blk = rcv.peek().await.unwrap(); + let total_bytes = blk.total_bytes; tokio::select! { _ = self.throttle.wait_for(total_bytes) => { - idx = (idx + 1) % block_cache.len(); + let blk = rcv.next().await.unwrap(); // actually advance through the blocks let total_bytes = u64::from(total_bytes.get()); { - fp.write_all(&block.bytes).await?; + fp.write_all(&blk.bytes).await?; bytes_written.increment(total_bytes); total_bytes_written += total_bytes; } diff --git a/lading/src/generator/grpc.rs b/lading/src/generator/grpc.rs index 0f8c8cce0..140073273 100644 --- a/lading/src/generator/grpc.rs +++ b/lading/src/generator/grpc.rs @@ -15,6 +15,7 @@ use std::{ convert::TryFrom, num::{NonZeroU32, NonZeroUsize}, + thread, time::Duration, }; @@ -25,13 +26,18 @@ use metrics::{counter, gauge, register_counter}; use rand::rngs::StdRng; use rand::SeedableRng; use serde::Deserialize; +use tokio::sync::mpsc; use tonic::{ codec::{DecodeBuf, Decoder, EncodeBuf, Encoder}, Request, Response, Status, }; use tracing::{debug, info}; -use crate::{block, signals::Shutdown}; +use crate::{ + block::{self, Block}, + common::PeekableReceiver, + signals::Shutdown, +}; use super::General; @@ -47,6 +53,9 @@ pub enum Error { /// Creation of payload blocks failed. 
#[error("Block creation error: {0}")] Block(#[from] block::Error), + /// IO error + #[error(transparent)] + Io(#[from] std::io::Error), } /// Config for [`Grpc`] @@ -65,6 +74,9 @@ pub struct Config { pub block_sizes: Option>, /// The maximum size in bytes of the cache of prebuilt messages pub maximum_prebuild_cache_size_bytes: byte_unit::Byte, + /// Whether to use a fixed or streaming block cache + #[serde(default = "crate::block::default_cache_method")] + pub block_cache_method: block::CacheMethod, /// The total number of parallel connections to maintain pub parallel_connections: u16, /// The load throttle configuration @@ -183,14 +195,20 @@ impl Grpc { &labels ); - let block_cache = block::Cache::fixed( - &mut rng, + let total_bytes = NonZeroUsize::new(config.maximum_prebuild_cache_size_bytes.get_bytes() as usize) - .expect("bytes must be non-zero"), - &block_sizes, - &config.variant, - &labels, - )?; + .expect("bytes must be non-zero"); + let block_cache = match config.block_cache_method { + block::CacheMethod::Streaming => block::Cache::stream( + config.seed, + total_bytes, + &block_sizes, + config.variant.clone(), + )?, + block::CacheMethod::Fixed => { + block::Cache::fixed(&mut rng, total_bytes, &block_sizes, &config.variant)? + } + }; let target_uri = http::uri::Uri::try_from(config.target_uri.clone()).expect("target_uri must be valid"); @@ -264,7 +282,12 @@ impl Grpc { tokio::time::sleep(Duration::from_millis(100)).await; }; - let mut block_cache = self.block_cache; + // Move the block_cache into an OS thread, exposing a channel between it + // and this async context. 
+ let block_cache = self.block_cache; + let (snd, rcv) = mpsc::channel(1024); + let mut rcv: PeekableReceiver = PeekableReceiver::new(rcv); + thread::Builder::new().spawn(|| block_cache.spin(snd))?; let rpc_path = self.rpc_path; let requests_sent = register_counter!("requests_sent", &self.metric_labels); @@ -273,14 +296,14 @@ impl Grpc { let response_bytes = register_counter!("response_bytes", &self.metric_labels); loop { - let blk = block_cache.peek().unwrap(); + let blk = rcv.peek().await.unwrap(); let total_bytes = blk.total_bytes; tokio::select! { _ = self.throttle.wait_for(total_bytes) => { let block_length = blk.bytes.len(); requests_sent.increment(1); - let blk = block_cache.next().unwrap(); // actually advance through the blocks + let blk = rcv.next().await.unwrap(); // actually advance through the blocks let res = Self::req( &mut client, rpc_path.clone(), diff --git a/lading/src/generator/http.rs b/lading/src/generator/http.rs index 8b6538a91..2622ceecf 100644 --- a/lading/src/generator/http.rs +++ b/lading/src/generator/http.rs @@ -11,7 +11,10 @@ //! Additional metrics may be emitted by this generator's [throttle]. //! 
-use std::num::{NonZeroU32, NonZeroUsize}; +use std::{ + num::{NonZeroU32, NonZeroUsize}, + thread, +}; use byte_unit::{Byte, ByteUnit}; use hyper::{ @@ -24,10 +27,14 @@ use metrics::{counter, gauge}; use once_cell::sync::OnceCell; use rand::{prelude::StdRng, SeedableRng}; use serde::Deserialize; -use tokio::sync::Semaphore; +use tokio::sync::{mpsc, Semaphore}; use tracing::info; -use crate::{block, signals::Shutdown}; +use crate::{ + block::{self, Block}, + common::PeekableReceiver, + signals::Shutdown, +}; use super::General; @@ -43,6 +50,9 @@ pub enum Method { variant: lading_payload::Config, /// The maximum size in bytes of the cache of prebuilt messages maximum_prebuild_cache_size_bytes: byte_unit::Byte, + /// Whether to use a fixed or streaming block cache + #[serde(default = "crate::block::default_cache_method")] + block_cache_method: block::CacheMethod, }, } @@ -151,15 +161,22 @@ impl Http { Method::Post { variant, maximum_prebuild_cache_size_bytes, + block_cache_method, } => { - let block_cache = block::Cache::fixed( - &mut rng, + let total_bytes = NonZeroUsize::new(maximum_prebuild_cache_size_bytes.get_bytes() as usize) - .expect("bytes must be non-zero"), - &block_sizes, - &variant, - &labels, - )?; + .expect("bytes must be non-zero"); + let block_cache = match block_cache_method { + block::CacheMethod::Streaming => block::Cache::stream( + config.seed, + total_bytes, + &block_sizes, + variant.clone(), + )?, + block::CacheMethod::Fixed => { + block::Cache::fixed(&mut rng, total_bytes, &block_sizes, &variant)? + } + }; CONNECTION_SEMAPHORE .set(Semaphore::new(config.parallel_connections as usize)) @@ -199,10 +216,15 @@ impl Http { let uri = self.uri; let labels = self.metric_labels; - let mut block_cache = self.block_cache; + // Move the block_cache into an OS thread, exposing a channel between it + // and this async context. 
+ let block_cache = self.block_cache; + let (snd, rcv) = mpsc::channel(1024); + let mut rcv: PeekableReceiver = PeekableReceiver::new(rcv); + thread::Builder::new().spawn(|| block_cache.spin(snd))?; loop { - let blk = block_cache.next().unwrap(); + let blk = rcv.next().await.unwrap(); let total_bytes = blk.total_bytes; let body = Body::from(blk.bytes.clone()); diff --git a/lading/src/generator/splunk_hec.rs b/lading/src/generator/splunk_hec.rs index 59d237668..234e3dd2c 100644 --- a/lading/src/generator/splunk_hec.rs +++ b/lading/src/generator/splunk_hec.rs @@ -18,6 +18,7 @@ mod acknowledgements; use std::{ num::{NonZeroU32, NonZeroUsize}, + thread, time::Duration, }; @@ -34,12 +35,17 @@ use once_cell::sync::OnceCell; use rand::{prelude::StdRng, SeedableRng}; use serde::Deserialize; use tokio::{ - sync::{Semaphore, SemaphorePermit}, + sync::{mpsc, Semaphore, SemaphorePermit}, time::timeout, }; use tracing::info; -use crate::{block, generator::splunk_hec::acknowledgements::Channel, signals::Shutdown}; +use crate::{ + block::{self, Block}, + common::PeekableReceiver, + generator::splunk_hec::acknowledgements::Channel, + signals::Shutdown, +}; use super::General; @@ -75,6 +81,9 @@ pub struct Config { pub acknowledgements: Option, /// The maximum size in bytes of the cache of prebuilt messages pub maximum_prebuild_cache_size_bytes: byte_unit::Byte, + /// Whether to use a fixed or streaming block cache + #[serde(default = "crate::block::default_cache_method")] + pub block_cache_method: block::CacheMethod, /// The bytes per second to send or receive from the target pub bytes_per_second: byte_unit::Byte, /// The block sizes for messages to this target @@ -86,7 +95,7 @@ pub struct Config { pub throttle: lading_throttle::Config, } -#[derive(thiserror::Error, Debug, Clone, Copy)] +#[derive(thiserror::Error, Debug)] /// Errors produced by [`SplunkHec`]. pub enum Error { /// User supplied HEC path is invalid. 
@@ -98,6 +107,9 @@ pub enum Error { /// Creation of payload blocks failed. #[error("Block creation error: {0}")] Block(#[from] block::Error), + /// IO error + #[error(transparent)] + Io(#[from] std::io::Error), } /// Defines a task that emits variant lines to a Splunk HEC server controlling @@ -179,14 +191,17 @@ impl SplunkHec { let payload_config = lading_payload::Config::SplunkHec { encoding: config.format, }; - let block_cache = block::Cache::fixed( - &mut rng, + let total_bytes = NonZeroUsize::new(config.maximum_prebuild_cache_size_bytes.get_bytes() as usize) - .expect("bytes must be non-zero"), - &block_sizes, - &payload_config, - &labels, - )?; + .expect("bytes must be non-zero"); + let block_cache = match config.block_cache_method { + block::CacheMethod::Streaming => { + block::Cache::stream(config.seed, total_bytes, &block_sizes, payload_config)? + } + block::CacheMethod::Fixed => { + block::Cache::fixed(&mut rng, total_bytes, &block_sizes, &payload_config)? + } + }; let mut channels = Channels::new(config.parallel_connections); if let Some(ack_settings) = config.acknowledgements { @@ -241,12 +256,17 @@ impl SplunkHec { f64::from(self.parallel_connections), &labels ); - let mut block_cache = self.block_cache; + // Move the block_cache into an OS thread, exposing a channel between it + // and this async context. + let block_cache = self.block_cache; + let (snd, rcv) = mpsc::channel(1024); + let mut rcv: PeekableReceiver = PeekableReceiver::new(rcv); + thread::Builder::new().spawn(|| block_cache.spin(snd))?; let mut channels = self.channels.iter().cycle(); loop { let channel: Channel = channels.next().unwrap().clone(); - let blk = block_cache.peek().unwrap(); + let blk = rcv.peek().await.unwrap(); let total_bytes = blk.total_bytes; tokio::select! 
{ @@ -255,7 +275,7 @@ impl SplunkHec { let labels = labels.clone(); let uri = uri.clone(); - let blk = block_cache.next().unwrap(); // actually advance through the blocks + let blk = rcv.next().await.unwrap(); // actually advance through the blocks let body = Body::from(blk.bytes.clone()); let block_length = blk.bytes.len(); diff --git a/lading/src/generator/tcp.rs b/lading/src/generator/tcp.rs index b33f79999..cd2b5aa49 100644 --- a/lading/src/generator/tcp.rs +++ b/lading/src/generator/tcp.rs @@ -14,6 +14,7 @@ use std::{ net::{SocketAddr, ToSocketAddrs}, num::{NonZeroU32, NonZeroUsize}, + thread, }; use byte_unit::{Byte, ByteUnit}; @@ -21,10 +22,14 @@ use lading_throttle::Throttle; use metrics::{counter, gauge, register_counter}; use rand::{rngs::StdRng, SeedableRng}; use serde::Deserialize; -use tokio::{io::AsyncWriteExt, net::TcpStream}; +use tokio::{io::AsyncWriteExt, net::TcpStream, sync::mpsc}; use tracing::{info, trace}; -use crate::{block, signals::Shutdown}; +use crate::{ + block::{self, Block}, + common::PeekableReceiver, + signals::Shutdown, +}; use super::General; @@ -48,12 +53,15 @@ pub struct Config { pub throttle: lading_throttle::Config, } -#[derive(thiserror::Error, Debug, Clone, Copy)] +#[derive(thiserror::Error, Debug)] /// Errors produced by [`Tcp`]. pub enum Error { /// Creation of payload blocks failed. #[error("Block creation error: {0}")] Block(#[from] block::Error), + /// IO error + #[error(transparent)] + Io(#[from] std::io::Error), } #[derive(Debug)] @@ -121,7 +129,6 @@ impl Tcp { .expect("bytes must be non-zero"), &block_sizes, &config.variant, - &labels, )?; let addr = config @@ -150,13 +157,18 @@ impl Tcp { /// Function will panic if underlying byte capacity is not available. pub async fn spin(mut self) -> Result<(), Error> { let mut connection = None; - let mut block_cache = self.block_cache; + // Move the block_cache into an OS thread, exposing a channel between it + // and this async context. 
+ let block_cache = self.block_cache; + let (snd, rcv) = mpsc::channel(1024); + let mut rcv: PeekableReceiver = PeekableReceiver::new(rcv); + thread::Builder::new().spawn(|| block_cache.spin(snd))?; let bytes_written = register_counter!("bytes_written", &self.metric_labels); let packets_sent = register_counter!("packets_sent", &self.metric_labels); loop { - let blk = block_cache.peek().unwrap(); + let blk = rcv.peek().await.unwrap(); let total_bytes = blk.total_bytes; tokio::select! { @@ -177,7 +189,7 @@ impl Tcp { } _ = self.throttle.wait_for(total_bytes), if connection.is_some() => { let mut client = connection.unwrap(); - let blk = block_cache.next().unwrap(); // actually advance through the blocks + let blk = rcv.next().await.unwrap(); // actually advance through the blocks match client.write_all(&blk.bytes).await { Ok(()) => { bytes_written.increment(u64::from(blk.total_bytes.get())); diff --git a/lading/src/generator/udp.rs b/lading/src/generator/udp.rs index 5af7d5caa..280aeb1a7 100644 --- a/lading/src/generator/udp.rs +++ b/lading/src/generator/udp.rs @@ -14,6 +14,7 @@ use std::{ net::{SocketAddr, ToSocketAddrs}, num::{NonZeroU32, NonZeroUsize}, + thread, time::Duration, }; @@ -22,10 +23,14 @@ use lading_throttle::Throttle; use metrics::{counter, gauge, register_counter}; use rand::{rngs::StdRng, SeedableRng}; use serde::Deserialize; -use tokio::net::UdpSocket; +use tokio::{net::UdpSocket, sync::mpsc}; use tracing::{debug, info, trace}; -use crate::{block, signals::Shutdown}; +use crate::{ + block::{self, Block}, + common::PeekableReceiver, + signals::Shutdown, +}; use super::General; @@ -125,7 +130,6 @@ impl Udp { .expect("bytes must be non-zero"), &block_sizes, &config.variant, - &labels, )?; let addr = config @@ -156,13 +160,19 @@ impl Udp { pub async fn spin(mut self) -> Result<(), Error> { debug!("UDP generator running"); let mut connection = Option::::None; - let mut block_cache = self.block_cache; + + // Move the block_cache into an OS thread, 
exposing a channel between it + // and this async context. + let block_cache = self.block_cache; + let (snd, rcv) = mpsc::channel(1024); + let mut rcv: PeekableReceiver = PeekableReceiver::new(rcv); + thread::Builder::new().spawn(|| block_cache.spin(snd))?; let bytes_written = register_counter!("bytes_written", &self.metric_labels); let packets_sent = register_counter!("packets_sent", &self.metric_labels); loop { - let blk = block_cache.peek().unwrap(); + let blk = rcv.peek().await.unwrap(); let total_bytes = blk.total_bytes; assert!( total_bytes.get() <= 65507, @@ -188,7 +198,7 @@ impl Udp { } _ = self.throttle.wait_for(total_bytes), if connection.is_some() => { let sock = connection.unwrap(); - let blk = block_cache.next().unwrap(); // actually advance through the blocks + let blk = rcv.next().await.unwrap(); // actually advance through the blocks match sock.send_to(&blk.bytes, self.addr).await { Ok(bytes) => { bytes_written.increment(bytes as u64); diff --git a/lading/src/generator/unix_datagram.rs b/lading/src/generator/unix_datagram.rs index 25cbdc858..bf901b0fd 100644 --- a/lading/src/generator/unix_datagram.rs +++ b/lading/src/generator/unix_datagram.rs @@ -11,7 +11,11 @@ //! Additional metrics may be emitted by this generator's [throttle]. //! 
-use crate::{block, signals::Shutdown}; +use crate::{ + block::{self, Block}, + common::PeekableReceiver, + signals::Shutdown, +}; use byte_unit::{Byte, ByteUnit}; use futures::future::join_all; use lading_throttle::Throttle; @@ -21,11 +25,11 @@ use serde::Deserialize; use std::{ num::{NonZeroU32, NonZeroUsize}, path::PathBuf, - sync::Arc, + thread, }; use tokio::{ net, - sync::broadcast::Receiver, + sync::{broadcast::Receiver, mpsc}, task::{JoinError, JoinHandle}, }; use tracing::{debug, error, info}; @@ -51,6 +55,9 @@ pub struct Config { pub block_sizes: Option>, /// The maximum size in bytes of the cache of prebuilt messages pub maximum_prebuild_cache_size_bytes: byte_unit::Byte, + /// Whether to use a fixed or streaming block cache + #[serde(default = "crate::block::default_cache_method")] + pub block_cache_method: block::CacheMethod, /// The total number of parallel connections to maintain #[serde(default = "default_parallel_connections")] pub parallel_connections: u16, @@ -141,21 +148,27 @@ impl UnixDatagram { ); let (startup, _startup_rx) = tokio::sync::broadcast::channel(1); - let block_cache = block::Cache::fixed( - &mut rng, - NonZeroUsize::new(config.maximum_prebuild_cache_size_bytes.get_bytes() as usize) - .expect("bytes must be non-zero"), - &block_sizes, - &config.variant, - &labels, - )?; - let block_cache = Arc::new(block_cache); let mut handles = Vec::new(); for _ in 0..config.parallel_connections { + let total_bytes = + NonZeroUsize::new(config.maximum_prebuild_cache_size_bytes.get_bytes() as usize) + .expect("bytes must be non-zero"); + let block_cache = match config.block_cache_method { + block::CacheMethod::Streaming => block::Cache::stream( + config.seed, + total_bytes, + &block_sizes, + config.variant.clone(), + )?, + block::CacheMethod::Fixed => { + block::Cache::fixed(&mut rng, total_bytes, &block_sizes, &config.variant)? 
+ } + }; + let child = Child { path: config.path.clone(), - block_cache: Arc::clone(&block_cache), + block_cache, throttle: Throttle::new_with_config(config.throttle, bytes_per_second), metric_labels: labels.clone(), shutdown: shutdown.clone(), @@ -199,7 +212,7 @@ impl UnixDatagram { struct Child { path: PathBuf, throttle: Throttle, - block_cache: Arc, + block_cache: block::Cache, metric_labels: Vec<(String, String)>, shutdown: Shutdown, } @@ -229,13 +242,17 @@ impl Child { } } - let mut idx = 0; + // Move the block_cache into an OS thread, exposing a channel between it + // and this async context. let block_cache = self.block_cache; + let (snd, rcv) = mpsc::channel(1024); + let mut rcv: PeekableReceiver = PeekableReceiver::new(rcv); + thread::Builder::new().spawn(|| block_cache.spin(snd))?; let bytes_written = register_counter!("bytes_written", &self.metric_labels); let packets_sent = register_counter!("packets_sent", &self.metric_labels); loop { - let blk = &block_cache.at_idx(idx); + let blk = rcv.peek().await.unwrap(); let total_bytes = blk.total_bytes; tokio::select! { @@ -244,8 +261,7 @@ impl Child { // some of the written bytes make it through in which case we // must cycle back around and try to write the remainder of the // buffer. - idx = (idx + 1) % block_cache.len(); - let blk = &block_cache.at_idx(idx); + let blk = rcv.next().await.unwrap(); // actually advance through the blocks let blk_max: usize = total_bytes.get() as usize; let mut blk_offset = 0; while blk_offset < blk_max { diff --git a/lading/src/generator/unix_stream.rs b/lading/src/generator/unix_stream.rs index ed3adb2c6..1aded9cb5 100644 --- a/lading/src/generator/unix_stream.rs +++ b/lading/src/generator/unix_stream.rs @@ -11,7 +11,11 @@ //! Additional metrics may be emitted by this generator's [throttle]. //! 
-use crate::{block, signals::Shutdown}; +use crate::{ + block::{self, Block}, + common::PeekableReceiver, + signals::Shutdown, +}; use byte_unit::{Byte, ByteUnit}; use lading_throttle::Throttle; use metrics::{counter, gauge, register_counter}; @@ -20,8 +24,9 @@ use serde::Deserialize; use std::{ num::{NonZeroU32, NonZeroUsize}, path::PathBuf, + thread, }; -use tokio::{net, task::JoinError}; +use tokio::{net, sync::mpsc, task::JoinError}; use tracing::{debug, error, info}; use super::General; @@ -41,6 +46,9 @@ pub struct Config { pub block_sizes: Option>, /// The maximum size in bytes of the cache of prebuilt messages pub maximum_prebuild_cache_size_bytes: byte_unit::Byte, + /// Whether to use a fixed or streaming block cache + #[serde(default = "crate::block::default_cache_method")] + pub block_cache_method: block::CacheMethod, /// The load throttle configuration #[serde(default)] pub throttle: lading_throttle::Config, @@ -120,14 +128,20 @@ impl UnixStream { &labels ); - let block_cache = block::Cache::fixed( - &mut rng, + let total_bytes = NonZeroUsize::new(config.maximum_prebuild_cache_size_bytes.get_bytes() as usize) - .expect("bytes must be non-zero"), - &block_sizes, - &config.variant, - &labels, - )?; + .expect("bytes must be non-zero"); + let block_cache = match config.block_cache_method { + block::CacheMethod::Streaming => block::Cache::stream( + config.seed, + total_bytes, + &block_sizes, + config.variant.clone(), + )?, + block::CacheMethod::Fixed => { + block::Cache::fixed(&mut rng, total_bytes, &block_sizes, &config.variant)? + } + }; Ok(Self { path: config.path, @@ -149,14 +163,20 @@ impl UnixStream { /// Function will panic if underlying byte capacity is not available. pub async fn spin(mut self) -> Result<(), Error> { debug!("UnixStream generator running"); - let mut block_cache = self.block_cache; + + // Move the block_cache into an OS thread, exposing a channel between it + // and this async context. 
+ let block_cache = self.block_cache; + let (snd, rcv) = mpsc::channel(1024); + let mut rcv: PeekableReceiver = PeekableReceiver::new(rcv); + thread::Builder::new().spawn(|| block_cache.spin(snd))?; let mut unix_stream = Option::::None; let bytes_written = register_counter!("bytes_written", &self.metric_labels); let packets_sent = register_counter!("packets_sent", &self.metric_labels); loop { - let blk = block_cache.peek().unwrap(); + let blk = rcv.peek().await.unwrap(); let total_bytes = blk.total_bytes; tokio::select! { @@ -182,7 +202,7 @@ impl UnixStream { // buffer. let blk_max: usize = total_bytes.get() as usize; let mut blk_offset = 0; - let blk = block_cache.next().unwrap(); // advance to the block that was previously peeked + let blk = rcv.next().await.unwrap(); // advance to the block that was previously peeked while blk_offset < blk_max { let stream = unix_stream.unwrap(); unix_stream = None; From 8c09706fb1631e2157185e29a077c6350c75ce1a Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Tue, 22 Aug 2023 11:11:26 -0700 Subject: [PATCH 2/2] v0.18.1-rc0 Signed-off-by: Brian L. Troutwine --- CHANGELOG.md | 7 +++++++ Cargo.lock | 2 +- lading/Cargo.toml | 2 +- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 43bd433c1..ae4299f87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +## [0.18.1-rc0] +### Added +- `lading-payload` crate is now split out from the `lading` crate +### Changed +- The block mechanism is reworked to provide a 'fixed' and 'streaming' model, + running in a separate OS thread from the tokio runtime. + ## [0.18.0] ### Changed - The predictive throttle no longer exists. 
The only options are stable and diff --git a/Cargo.lock b/Cargo.lock index 454981a0c..452dec35a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1010,7 +1010,7 @@ dependencies = [ [[package]] name = "lading" -version = "0.18.0" +version = "0.18.1-rc0" dependencies = [ "async-pidfd", "byte-unit", diff --git a/lading/Cargo.toml b/lading/Cargo.toml index fc3e04904..e17ae6140 100644 --- a/lading/Cargo.toml +++ b/lading/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lading" -version = "0.18.0" +version = "0.18.1-rc0" authors = ["Brian L. Troutwine ", "George Hahn