diff --git a/docs/src/transforms.md b/docs/src/transforms.md index 7d77dc17b..7fbf4001d 100644 --- a/docs/src/transforms.md +++ b/docs/src/transforms.md @@ -45,7 +45,6 @@ Future transforms won't be added to the public API while in alpha. But in these | [RedisSinkSingle](#redissinksingle) | ✅ | Beta | | [Tee](#tee) | ✅ | Alpha | | [RequestThrottling](#requestthrottling) |❌ | Alpha | - ### CassandraSinkCluster @@ -225,21 +224,6 @@ This transform will drop any messages it receives and return the supplied respon # Fail ``` - - ### KafkaSinkCluster This transform will route kafka messages to a broker within a Kafka cluster: diff --git a/shotover/src/transforms/cassandra/sink_cluster/mod.rs b/shotover/src/transforms/cassandra/sink_cluster/mod.rs index b16a12c67..b56200e99 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/mod.rs @@ -41,8 +41,8 @@ mod test_router; mod token_ring; pub mod topology; -pub type KeyspaceChanTx = watch::Sender>; -pub type KeyspaceChanRx = watch::Receiver>; +type KeyspaceChanTx = watch::Sender>; +type KeyspaceChanRx = watch::Receiver>; const SYSTEM_KEYSPACES: [IdentifierRef<'static>; 3] = [ IdentifierRef::Quoted("system"), @@ -106,7 +106,7 @@ impl TransformConfig for CassandraSinkClusterConfig { } } -pub struct CassandraSinkClusterBuilder { +struct CassandraSinkClusterBuilder { contact_points: Vec, connection_factory: ConnectionFactory, failed_requests: Counter, @@ -118,7 +118,7 @@ pub struct CassandraSinkClusterBuilder { } impl CassandraSinkClusterBuilder { - pub fn new( + fn new( contact_points: Vec, shotover_peers: Vec, chain_name: String, @@ -205,7 +205,7 @@ pub struct ShotoverNode { pub host_id: Uuid, } -pub struct CassandraSinkCluster { +struct CassandraSinkCluster { contact_points: Vec, connection_factory: ConnectionFactory, diff --git a/shotover/src/transforms/cassandra/sink_cluster/murmur.rs b/shotover/src/transforms/cassandra/sink_cluster/murmur.rs index 28011670f..037bea2e9 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/murmur.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/murmur.rs @@ -1,4 +1,4 @@ -//! Taken from https://github.com/scylladb/scylla-rust-driver/blob/4a4fd0e5e785031956f560ecf22cb8653eea122b/scylla/src/routing.rs +//! Taken from //! We cant import it as that would bring the openssl dependency into shotover. use bytes::Buf; diff --git a/shotover/src/transforms/cassandra/sink_single.rs b/shotover/src/transforms/cassandra/sink_single.rs index 9c3c4285e..a60d87de6 100644 --- a/shotover/src/transforms/cassandra/sink_single.rs +++ b/shotover/src/transforms/cassandra/sink_single.rs @@ -56,7 +56,7 @@ impl TransformConfig for CassandraSinkSingleConfig { } } -pub struct CassandraSinkSingleBuilder { +struct CassandraSinkSingleBuilder { version: Option, address: String, failed_requests: Counter, @@ -67,7 +67,7 @@ pub struct CassandraSinkSingleBuilder { } impl CassandraSinkSingleBuilder { - pub fn new( + fn new( address: String, chain_name: String, tls: Option, @@ -115,7 +115,7 @@ impl TransformBuilder for CassandraSinkSingleBuilder { } } -pub struct CassandraSinkSingle { +struct CassandraSinkSingle { version: Option, address: String, connection: Option, diff --git a/shotover/src/transforms/coalesce.rs b/shotover/src/transforms/coalesce.rs index 570591543..13ad7c87e 100644 --- a/shotover/src/transforms/coalesce.rs +++ b/shotover/src/transforms/coalesce.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use std::time::Instant; #[derive(Clone)] -pub struct Coalesce { +struct Coalesce { flush_when_buffered_message_count: Option, flush_when_millis_since_last_flush: Option, buffer: Messages, diff --git a/shotover/src/transforms/debug/force_parse.rs b/shotover/src/transforms/debug/force_parse.rs index bc7a72a2f..0abb4a92f 100644 --- a/shotover/src/transforms/debug/force_parse.rs +++ b/shotover/src/transforms/debug/force_parse.rs @@ -5,12 +5,9 @@ use crate::message::Messages; /// The use of this transform is to allow benchmarking the performance impact of parsing messages /// without worrying about the performance impact of other transform logic. /// It could also be used to ensure that messages round trip correctly when parsed. -#[cfg(feature = "alpha-transforms")] use crate::transforms::TransformConfig; use crate::transforms::TransformContextBuilder; -#[cfg(feature = "alpha-transforms")] use crate::transforms::TransformContextConfig; -#[cfg(feature = "alpha-transforms")] use crate::transforms::{DownChainProtocol, UpChainProtocol}; use crate::transforms::{Transform, TransformBuilder, Wrapper}; use anyhow::Result; @@ -26,7 +23,6 @@ pub struct DebugForceParseConfig { pub parse_responses: bool, } -#[cfg(feature = "alpha-transforms")] #[typetag::serde(name = "DebugForceParse")] #[async_trait(?Send)] impl TransformConfig for DebugForceParseConfig { @@ -61,7 +57,6 @@ pub struct DebugForceEncodeConfig { } const NAME: &str = "DebugForceEncode"; -#[cfg(feature = "alpha-transforms")] #[typetag::serde(name = "DebugForceEncode")] #[async_trait(?Send)] impl TransformConfig for DebugForceEncodeConfig { @@ -87,7 +82,7 @@ impl TransformConfig for DebugForceEncodeConfig { } #[derive(Clone)] -pub struct DebugForceParse { +struct DebugForceParse { parse_requests: bool, parse_responses: bool, encode_requests: bool, diff --git a/shotover/src/transforms/debug/log_to_file.rs b/shotover/src/transforms/debug/log_to_file.rs index f7e77e61a..3f98a7c13 100644 --- a/shotover/src/transforms/debug/log_to_file.rs +++ b/shotover/src/transforms/debug/log_to_file.rs @@ -15,7 +15,6 @@ use tracing::{error, info}; pub struct DebugLogToFileConfig; const NAME: &str = "DebugLogToFile"; -#[cfg(feature = "alpha-transforms")] #[typetag::serde(name = "DebugLogToFile")] #[async_trait(?Send)] impl crate::transforms::TransformConfig for DebugLogToFileConfig { @@ -40,7 +39,7 @@ impl crate::transforms::TransformConfig for DebugLogToFileConfig { } } -pub struct DebugLogToFileBuilder { +struct DebugLogToFileBuilder { connection_counter: Arc, } @@ -77,7 +76,7 @@ impl TransformBuilder for DebugLogToFileBuilder { } } -pub struct DebugLogToFile { +struct DebugLogToFile { request_counter: u64, response_counter: u64, requests: PathBuf, diff --git a/shotover/src/transforms/debug/mod.rs b/shotover/src/transforms/debug/mod.rs index ba1f1b13d..4fea8283b 100644 --- a/shotover/src/transforms/debug/mod.rs +++ b/shotover/src/transforms/debug/mod.rs @@ -1,5 +1,6 @@ +#[cfg(feature = "alpha-transforms")] pub mod force_parse; +#[cfg(feature = "alpha-transforms")] pub mod log_to_file; pub mod printer; -pub mod random_delay; pub mod returner; diff --git a/shotover/src/transforms/debug/printer.rs b/shotover/src/transforms/debug/printer.rs index ffd7aa2d9..a87152685 100644 --- a/shotover/src/transforms/debug/printer.rs +++ b/shotover/src/transforms/debug/printer.rs @@ -33,7 +33,7 @@ impl TransformConfig for DebugPrinterConfig { } #[derive(Clone)] -pub struct DebugPrinter { +pub(crate) struct DebugPrinter { counter: i32, } @@ -44,7 +44,7 @@ impl Default for DebugPrinter { } impl DebugPrinter { - pub fn new() -> DebugPrinter { + pub(crate) fn new() -> DebugPrinter { DebugPrinter { counter: 0 } } } diff --git a/shotover/src/transforms/debug/random_delay.rs b/shotover/src/transforms/debug/random_delay.rs deleted file mode 100644 index 9d9e6d574..000000000 --- a/shotover/src/transforms/debug/random_delay.rs +++ /dev/null @@ -1,31 +0,0 @@ -use crate::message::Messages; -use crate::transforms::{Transform, Wrapper}; -use anyhow::Result; -use async_trait::async_trait; -use rand_distr::Distribution; -use rand_distr::Normal; -use tokio::time::Duration; - -const NAME: &str = "DebugRandomDelay"; - -pub struct DebugRandomDelay { - pub delay: u64, - pub distribution: Option>, -} - -#[async_trait] -impl Transform for DebugRandomDelay { - fn get_name(&self) -> &'static str { - NAME - } - - async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result { - let delay = if let Some(dist) = self.distribution { - Duration::from_millis(dist.sample(&mut rand::thread_rng()) as u64 + self.delay) - } else { - Duration::from_millis(self.delay) - }; - tokio::time::sleep(delay).await; - requests_wrapper.call_next_transform().await - } -} diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 6297b53a8..6788ad9aa 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -86,7 +86,7 @@ impl ShotoverNodeConfig { } #[derive(Clone)] -pub struct ShotoverNode { +struct ShotoverNode { pub address: KafkaAddress, pub rack: StrBytes, pub broker_id: BrokerId, @@ -142,7 +142,7 @@ impl TransformConfig for KafkaSinkClusterConfig { } } -pub struct KafkaSinkClusterBuilder { +struct KafkaSinkClusterBuilder { // contains address and port first_contact_points: Vec, shotover_nodes: Vec, @@ -265,7 +265,7 @@ impl AtomicBrokerId { } } -pub struct KafkaSinkCluster { +struct KafkaSinkCluster { first_contact_points: Vec, shotover_nodes: Vec, rack: StrBytes, diff --git a/shotover/src/transforms/kafka/sink_single.rs b/shotover/src/transforms/kafka/sink_single.rs index 25eabae01..067e0552d 100644 --- a/shotover/src/transforms/kafka/sink_single.rs +++ b/shotover/src/transforms/kafka/sink_single.rs @@ -54,7 +54,7 @@ impl TransformConfig for KafkaSinkSingleConfig { } } -pub struct KafkaSinkSingleBuilder { +struct KafkaSinkSingleBuilder { // contains address and port address_port: u16, connect_timeout: Duration, @@ -63,7 +63,7 @@ pub struct KafkaSinkSingleBuilder { } impl KafkaSinkSingleBuilder { - pub fn new( + fn new( address_port: u16, _chain_name: String, connect_timeout_ms: u64, @@ -102,7 +102,7 @@ impl TransformBuilder for KafkaSinkSingleBuilder { } } -pub struct KafkaSinkSingle { +struct KafkaSinkSingle { address_port: u16, connection: Option, connect_timeout: Duration, diff --git a/shotover/src/transforms/load_balance.rs b/shotover/src/transforms/load_balance.rs index a9a74ac03..0db552581 100644 --- a/shotover/src/transforms/load_balance.rs +++ b/shotover/src/transforms/load_balance.rs @@ -43,10 +43,10 @@ impl TransformConfig for ConnectionBalanceAndPoolConfig { } } -pub struct ConnectionBalanceAndPoolBuilder { - pub max_connections: usize, - pub all_connections: Arc>>, - pub chain_to_clone: Arc, +struct ConnectionBalanceAndPoolBuilder { + max_connections: usize, + all_connections: Arc>>, + chain_to_clone: Arc, } impl TransformBuilder for ConnectionBalanceAndPoolBuilder { @@ -71,7 +71,7 @@ impl TransformBuilder for ConnectionBalanceAndPoolBuilder { /// Every cloned instance of ConnectionBalanceAndPool will use a new connection until `max_connections` clones are made. /// Once this happens cloned instances will reuse connections from earlier clones. -pub struct ConnectionBalanceAndPool { +struct ConnectionBalanceAndPool { active_connection: Option, max_connections: usize, all_connections: Arc>>, diff --git a/shotover/src/transforms/mod.rs b/shotover/src/transforms/mod.rs index 00194b361..4fbdb46ca 100644 --- a/shotover/src/transforms/mod.rs +++ b/shotover/src/transforms/mod.rs @@ -24,17 +24,15 @@ pub mod filter; pub mod kafka; pub mod load_balance; pub mod loopback; -pub mod noop; pub mod null; #[cfg(all(feature = "alpha-transforms", feature = "opensearch"))] pub mod opensearch; pub mod parallel_map; -#[cfg(feature = "cassandra")] +#[cfg(all(feature = "alpha-transforms", feature = "cassandra"))] pub mod protect; pub mod query_counter; #[cfg(feature = "redis")] pub mod redis; -pub mod sampler; pub mod tee; #[cfg(feature = "cassandra")] pub mod throttling; @@ -265,8 +263,7 @@ pub trait Transform: Send { /// /// * Terminating specific invariants /// + Your transform can also choose not to call `requests_wrapper.call_next_transform()` if it sends the - /// messages to an external system or generates its own response to the query e.g. - /// [`crate::transforms::cassandra::sink_single::CassandraSinkSingle`]. + /// messages to an external system or generates its own response to the query e.g. `CassandraSinkSingle`. /// + This type of transform is called a Terminating transform (as no subsequent transforms in the chain will be called). /// /// * Request/Response invariants: @@ -276,7 +273,7 @@ pub trait Transform: Send { /// But it must be returned eventually over the lifetime of the transform. /// - If a transform deletes a request it must return a simulated response message with its request_id set to the deleted request. /// * For in order protocols: this simulated message must be in the correct location within the list of responses - /// - The best way to achieve this is storing the [`MessageId`] of the message before the deleted message. + /// - The best way to achieve this is storing the [`crate::message::MessageId`] of the message before the deleted message. /// - If a transform introduces a new request into the requests_wrapper the response must be located and /// removed from the list of returned responses. /// + For in order protocols, transforms must ensure that responses are kept in the same order in which they are received. diff --git a/shotover/src/transforms/noop.rs b/shotover/src/transforms/noop.rs deleted file mode 100644 index 33bbb43c5..000000000 --- a/shotover/src/transforms/noop.rs +++ /dev/null @@ -1,31 +0,0 @@ -use crate::message::Messages; -use crate::transforms::{Transform, Wrapper}; -use anyhow::Result; -use async_trait::async_trait; - -const NAME: &str = "NoOp"; - -pub struct NoOp {} - -impl NoOp { - pub fn new() -> NoOp { - NoOp {} - } -} - -impl Default for NoOp { - fn default() -> Self { - Self::new() - } -} - -#[async_trait] -impl Transform for NoOp { - fn get_name(&self) -> &'static str { - NAME - } - - async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result { - requests_wrapper.call_next_transform().await - } -} diff --git a/shotover/src/transforms/parallel_map.rs b/shotover/src/transforms/parallel_map.rs index 1b094a176..8c2c401b6 100644 --- a/shotover/src/transforms/parallel_map.rs +++ b/shotover/src/transforms/parallel_map.rs @@ -13,12 +13,12 @@ use serde::{Deserialize, Serialize}; use std::future::Future; use std::pin::Pin; -pub struct ParallelMapBuilder { +struct ParallelMapBuilder { chains: Vec, ordered: bool, } -pub struct ParallelMap { +struct ParallelMap { chains: Vec, ordered: bool, } @@ -32,7 +32,7 @@ impl UOFutures where T: Future, { - pub fn new(ordered: bool) -> Self { + fn new(ordered: bool) -> Self { if ordered { Self::Ordered(FuturesOrdered::new()) } else { @@ -40,7 +40,7 @@ where } } - pub fn push(&mut self, future: T) { + fn push(&mut self, future: T) { match self { UOFutures::Ordered(o) => o.push_back(future), UOFutures::Unordered(u) => u.push(future), diff --git a/shotover/src/transforms/protect/mod.rs b/shotover/src/transforms/protect/mod.rs index 00da9582d..0b96e3d2f 100644 --- a/shotover/src/transforms/protect/mod.rs +++ b/shotover/src/transforms/protect/mod.rs @@ -1,7 +1,5 @@ use super::TransformContextBuilder; -#[cfg(feature = "alpha-transforms")] use super::{DownChainProtocol, UpChainProtocol}; -#[cfg(feature = "alpha-transforms")] use crate::frame::MessageType; use crate::frame::{ value::GenericValue, CassandraFrame, CassandraOperation, CassandraResult, Frame, @@ -33,7 +31,6 @@ pub struct ProtectConfig { } const NAME: &str = "Protect"; -#[cfg(feature = "alpha-transforms")] #[typetag::serde(name = "Protect")] #[async_trait(?Send)] impl crate::transforms::TransformConfig for ProtectConfig { @@ -75,7 +72,7 @@ impl crate::transforms::TransformConfig for ProtectConfig { } #[derive(Clone)] -pub struct Protect { +struct Protect { /// map of keyspace Identifiers to map of table Identifiers to column Identifiers keyspace_table_columns: HashMap>>, key_source: KeyManager, diff --git a/shotover/src/transforms/sampler.rs b/shotover/src/transforms/sampler.rs deleted file mode 100644 index 74561dd77..000000000 --- a/shotover/src/transforms/sampler.rs +++ /dev/null @@ -1,61 +0,0 @@ -use crate::message::Messages; -use crate::transforms::chain::{TransformChain, TransformChainBuilder}; -use crate::transforms::{Transform, Wrapper}; -use anyhow::Result; -use async_trait::async_trait; -use tokio::macros::support::thread_rng_n; -use tracing::warn; - -const NAME: &str = "Sampler"; - -pub struct SamplerBuilder { - pub numerator: u32, - pub denominator: u32, - pub sample_chain: TransformChainBuilder, -} - -impl SamplerBuilder { - pub fn new() -> SamplerBuilder { - SamplerBuilder { - numerator: 1, - denominator: 100, - sample_chain: TransformChainBuilder::new(vec![], "dummy"), - } - } -} - -impl Default for SamplerBuilder { - fn default() -> Self { - Self::new() - } -} - -pub struct Sampler { - numerator: u32, - denominator: u32, - sample_chain: TransformChain, -} - -#[async_trait] -impl Transform for Sampler { - fn get_name(&self) -> &'static str { - NAME - } - - async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result { - let chance = thread_rng_n(self.denominator); - if chance < self.numerator { - let sample = requests_wrapper.clone(); - let (sample, downstream) = tokio::join!( - self.sample_chain.process_request(sample), - requests_wrapper.call_next_transform() - ); - if sample.is_err() { - warn!("Could not sample request {:?}", sample); - } - downstream - } else { - requests_wrapper.call_next_transform().await - } - } -} diff --git a/shotover/src/transforms/tee.rs b/shotover/src/transforms/tee.rs index 806567677..601c330d7 100644 --- a/shotover/src/transforms/tee.rs +++ b/shotover/src/transforms/tee.rs @@ -18,17 +18,17 @@ use std::sync::atomic::Ordering; use std::{net::SocketAddr, str, sync::Arc}; use tracing::{debug, error, trace, warn}; -pub struct TeeBuilder { - pub tx: TransformChainBuilder, - pub buffer_size: usize, - pub behavior: ConsistencyBehaviorBuilder, - pub timeout_micros: Option, +struct TeeBuilder { + tx: TransformChainBuilder, + buffer_size: usize, + behavior: ConsistencyBehaviorBuilder, + timeout_micros: Option, dropped_messages: Counter, result_source: Arc, protocol_is_inorder: bool, } -pub enum ConsistencyBehaviorBuilder { +enum ConsistencyBehaviorBuilder { Ignore, LogWarningOnMismatch, FailOnMismatch, @@ -36,7 +36,7 @@ pub enum ConsistencyBehaviorBuilder { } impl TeeBuilder { - pub fn new( + fn new( tx: TransformChainBuilder, buffer_size: usize, behavior: ConsistencyBehaviorBuilder, @@ -85,7 +85,6 @@ impl TransformBuilder for TeeBuilder { ) } }, - buffer_size: self.buffer_size, timeout_micros: self.timeout_micros, dropped_messages: self.dropped_messages.clone(), result_source: self.result_source.clone(), @@ -132,18 +131,17 @@ impl TransformBuilder for TeeBuilder { } } -pub struct Tee { - pub tx: BufferedChain, - pub buffer_size: usize, - pub behavior: ConsistencyBehavior, - pub timeout_micros: Option, +struct Tee { + tx: BufferedChain, + behavior: ConsistencyBehavior, + timeout_micros: Option, dropped_messages: Counter, result_source: Arc, incoming_responses: IncomingResponses, } #[atomic_enum] -pub enum ResultSource { +enum ResultSource { RegularChain, TeeChain, } @@ -157,7 +155,7 @@ impl fmt::Display for ResultSource { } } -pub enum ConsistencyBehavior { +enum ConsistencyBehavior { Ignore, LogWarningOnMismatch, FailOnMismatch, diff --git a/shotover/src/transforms/throttling.rs b/shotover/src/transforms/throttling.rs index a7b6147d6..686e25e49 100644 --- a/shotover/src/transforms/throttling.rs +++ b/shotover/src/transforms/throttling.rs @@ -48,7 +48,7 @@ impl TransformConfig for RequestThrottlingConfig { } #[derive(Clone)] -pub struct RequestThrottling { +struct RequestThrottling { limiter: Arc>, max_requests_per_second: NonZeroU32, throttled_requests: MessageIdMap,