From 6a2f72cb18cdd80ac6c1282daba9fab45dabcf63 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Thu, 25 Jan 2024 21:09:16 +1100 Subject: [PATCH] Add feature flags for each protocol in shotover --- .cargo/config.toml | 5 +- custom-transforms-example/Cargo.toml | 2 +- shotover-proxy/Cargo.toml | 6 +- shotover-proxy/benches/windsock/main.rs | 83 +++++++++++++++--------- shotover-proxy/tests/lib.rs | 3 +- shotover/Cargo.toml | 33 ++++++---- shotover/benches/benches/main.rs | 4 +- shotover/src/codec/mod.rs | 10 +++ shotover/src/config/topology.rs | 2 +- shotover/src/frame/mod.rs | 43 +++++++++++- shotover/src/message/mod.rs | 54 ++++++++++++++- shotover/src/sources/mod.rs | 18 +++++ shotover/src/transforms/mod.rs | 59 +++++++++++++++++ shotover/src/transforms/query_counter.rs | 3 + shotover/src/transforms/redis/mod.rs | 1 + 15 files changed, 273 insertions(+), 53 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index 3e1c66273..f8dbaf92f 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -8,6 +8,9 @@ rustflags = [ linker = "aarch64-linux-gnu-gcc" [alias] -windsock = "test --release --bench windsock --features alpha-transforms,rdkafka-driver-tests --" +windsock-redis = "test --release --bench windsock --no-default-features --features redis,alpha-transforms --" +windsock-kafka = "test --release --bench windsock --no-default-features --features kafka,alpha-transforms,rdkafka-driver-tests --" +windsock-cassandra = "test --release --bench windsock --no-default-features --features cassandra,alpha-transforms --" +windsock = "test --release --bench windsock --all-features --" windsock-debug = "test --bench windsock --features alpha-transforms,rdkafka-driver-tests --" windsock-cloud-docker = "run --package windsock-cloud-docker --" diff --git a/custom-transforms-example/Cargo.toml b/custom-transforms-example/Cargo.toml index 5663e36b5..3d0b7dfa0 100644 --- a/custom-transforms-example/Cargo.toml +++ b/custom-transforms-example/Cargo.toml @@ -8,7 +8,7 @@ license = "Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -shotover = { path = "../shotover" } +shotover = { path = "../shotover", default-features = false, features = ["redis"]} anyhow.workspace = true serde.workspace = true async-trait.workspace = true diff --git a/shotover-proxy/Cargo.toml b/shotover-proxy/Cargo.toml index b99b56064..b40ef6d36 100644 --- a/shotover-proxy/Cargo.toml +++ b/shotover-proxy/Cargo.toml @@ -8,7 +8,7 @@ license = "Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -shotover = { path = "../shotover" } +shotover = { path = "../shotover", default-features = false} [dev-dependencies] prometheus-parse = "0.2.4" @@ -60,8 +60,12 @@ shell-quote.workspace = true [features] # Include WIP alpha transforms in the public API alpha-transforms = ["shotover/alpha-transforms"] +cassandra = ["shotover/cassandra"] +kafka = ["shotover/kafka"] +redis = ["shotover/redis"] cassandra-cpp-driver-tests = ["test-helpers/cassandra-cpp-driver-tests"] rdkafka-driver-tests = ["test-helpers/rdkafka-driver-tests"] +default = ["cassandra", "kafka", "redis"] [[bench]] name = "windsock" diff --git a/shotover-proxy/benches/windsock/main.rs b/shotover-proxy/benches/windsock/main.rs index aa449a043..f3ff9c30a 100644 --- a/shotover-proxy/benches/windsock/main.rs +++ b/shotover-proxy/benches/windsock/main.rs @@ -1,16 +1,26 @@ +#[cfg(feature = "cassandra")] mod cassandra; mod cloud; mod common; 
-#[cfg(feature = "rdkafka-driver-tests")] +#[cfg(all(feature = "rdkafka-driver-tests", feature = "kafka"))] mod kafka; mod profilers; +#[cfg(feature = "redis")] mod redis; mod shotover; +#[cfg(feature = "cassandra")] use crate::cassandra::*; +#[cfg(any( + feature = "cassandra", + feature = "redis", + feature = "kafka", + all(feature = "rdkafka-driver-tests", feature = "kafka") +))] use crate::common::*; -#[cfg(feature = "rdkafka-driver-tests")] +#[cfg(all(feature = "rdkafka-driver-tests", feature = "kafka"))] use crate::kafka::*; +#[cfg(feature = "redis")] use crate::redis::*; use cloud::CloudResources; use cloud::CloudResourcesRequired; @@ -41,6 +51,7 @@ fn main() { .unwrap(); } + #[cfg(feature = "cassandra")] let cassandra_benches = itertools::iproduct!( [CassandraDb::Cassandra], [CassandraTopology::Single, CassandraTopology::Cluster3], @@ -88,7 +99,36 @@ fn main() { )) as ShotoverBench) }, ); - #[cfg(feature = "rdkafka-driver-tests")] + #[cfg(not(feature = "cassandra"))] + let cassandra_benches = std::iter::empty(); + #[cfg(feature = "cassandra")] + let cassandra_mock_benches = vec![ + Box::new(CassandraBench::new( + CassandraDb::Mocked, + CassandraTopology::Single, + Shotover::None, + Compression::None, + Operation::ReadI64, + CassandraProtocol::V4, + CassandraDriver::Scylla, + 10, + )) as ShotoverBench, + Box::new(CassandraBench::new( + CassandraDb::Mocked, + CassandraTopology::Single, + Shotover::Standard, + Compression::None, + Operation::ReadI64, + CassandraProtocol::V4, + CassandraDriver::Scylla, + 10, + )), + ] + .into_iter(); + #[cfg(not(feature = "cassandra"))] + let cassandra_mock_benches = std::iter::empty(); + + #[cfg(all(feature = "rdkafka-driver-tests", feature = "kafka"))] let kafka_benches = itertools::iproduct!( [ Shotover::None, @@ -105,9 +145,10 @@ fn main() { .map(|(shotover, topology, size)| { Box::new(KafkaBench::new(shotover, topology, size)) as ShotoverBench }); - #[cfg(not(feature = "rdkafka-driver-tests"))] + #[cfg(not(all(feature = "rdkafka-driver-tests", feature = "kafka")))] let kafka_benches = std::iter::empty(); + #[cfg(feature = "redis")] let redis_benches = itertools::iproduct!( [RedisTopology::Cluster3, RedisTopology::Single], [ @@ -121,35 +162,15 @@ fn main() { .map(|(topology, shotover, operation, encryption)| { Box::new(RedisBench::new(topology, shotover, operation, encryption)) as ShotoverBench }); + #[cfg(not(feature = "redis"))] + let redis_benches = std::iter::empty(); Windsock::new( - vec![ - Box::new(CassandraBench::new( - CassandraDb::Mocked, - CassandraTopology::Single, - Shotover::None, - Compression::None, - Operation::ReadI64, - CassandraProtocol::V4, - CassandraDriver::Scylla, - 10, - )) as ShotoverBench, - Box::new(CassandraBench::new( - CassandraDb::Mocked, - CassandraTopology::Single, - Shotover::Standard, - Compression::None, - Operation::ReadI64, - CassandraProtocol::V4, - CassandraDriver::Scylla, - 10, - )), - ] - .into_iter() - .chain(cassandra_benches) - .chain(kafka_benches) - .chain(redis_benches) - .collect(), + cassandra_mock_benches + .chain(cassandra_benches) + .chain(kafka_benches) + .chain(redis_benches) + .collect(), cloud::AwsCloud::new_boxed(), &["release"], ) diff --git a/shotover-proxy/tests/lib.rs b/shotover-proxy/tests/lib.rs index 06e7afc31..e26ee5fb5 100644 --- a/shotover-proxy/tests/lib.rs +++ b/shotover-proxy/tests/lib.rs @@ -1,9 +1,10 @@ -#[allow(clippy::single_component_path_imports)] +#[allow(clippy::single_component_path_imports, unused_imports)] use rstest_reuse; use 
test_helpers::shotover_process::ShotoverProcessBuilder; use tokio_bin_process::bin_path; +#[cfg(feature = "cassandra")] mod cassandra_int_tests; mod kafka_int_tests; #[cfg(feature = "alpha-transforms")] diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index 57aff9737..fef41b398 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -12,6 +12,16 @@ description = "Shotover API for building custom transforms" [features] # Include WIP alpha transforms in the public API alpha-transforms = [] +cassandra = [ + "dep:cassandra-protocol", + "dep:cql3-parser", + "dep:lz4_flex", + "dep:version-compare", + "dep:uuid" +] +kafka = ["dep:kafka-protocol", "dep:dashmap", "dep:xxhash-rust"] +redis = ["dep:redis-protocol", "dep:csv"] +default = ["cassandra"] [dependencies] atomic_enum = "0.2.0" @@ -23,9 +33,9 @@ cached = { version = "0.47", features = ["async"] } async-recursion = "1.0" governor = { version = "0.6", default-features = false, features = ["std", "jitter", "quanta"] } nonzero_ext = "0.3.0" -version-compare = "0.1" +version-compare = { version = "0.1", optional = true } rand = { features = ["small_rng"], workspace = true } -lz4_flex = "0.11.0" +lz4_flex = { version = "0.11.0", optional = true } clap.workspace = true itertools.workspace = true rand_distr.workspace = true @@ -33,9 +43,8 @@ bytes.workspace = true futures.workspace = true tokio.workspace = true tokio-util.workspace = true -csv.workspace = true +csv = { workspace = true, optional = true } hex.workspace = true -hex-literal.workspace = true async-trait.workspace = true typetag.workspace = true tokio-tungstenite = "0.21.0" @@ -47,13 +56,13 @@ backtrace = "0.3.66" backtrace-ext = "0.2" # Parsers -cql3-parser = "0.4.0" +cql3-parser = { version = "0.4.0", optional = true } serde.workspace = true serde_json.workspace = true serde_yaml.workspace = true bincode.workspace = true num = { version = "0.4.0", features = ["serde"] } -uuid.workspace = true +uuid = { workspace = true, optional = true } bigdecimal = { version = "0.4.0", features = ["serde"] } base64 = "0.21.0" httparse = "1.8.0" @@ -69,8 +78,8 @@ hyper.workspace = true halfbrown = "0.2.1" # Transform dependencies -redis-protocol.workspace = true -cassandra-protocol.workspace = true +redis-protocol = { workspace = true, optional = true } +cassandra-protocol = { workspace = true, optional = true } crc16 = "0.4.0" ordered-float.workspace = true @@ -80,19 +89,21 @@ aws-sdk-kms = "1.1.0" strum_macros = "0.25" chacha20poly1305 = { version = "0.10.0", features = ["std"] } generic-array = { version = "0.14", features = ["serde"] } -kafka-protocol = "0.8.0" +kafka-protocol = { version = "0.8.0", optional = true } rustls = { version = "0.22.0" } tokio-rustls = "0.25" rustls-pemfile = "2.0.0" rustls-pki-types = "1.0.1" string = "0.3.0" -xxhash-rust = { version = "0.8.6", features = ["xxh3"] } -dashmap = "5.4.0" +xxhash-rust = { version = "0.8.6", features = ["xxh3"], optional = true } +dashmap = { version = "5.4.0", optional = true } atoi = "2.0.0" [dev-dependencies] criterion = { version = "0.5.0", features = ["async_tokio"] } +hex-literal.workspace = true [[bench]] name = "benches" harness = false +required-features = ["cassandra", "redis"] diff --git a/shotover/benches/benches/main.rs b/shotover/benches/benches/main.rs index 7a8533324..36ad35506 100644 --- a/shotover/benches/benches/main.rs +++ b/shotover/benches/benches/main.rs @@ -1,6 +1,8 @@ use criterion::criterion_main; - +// TODO: apply features to modules after https://github.com/shotover/shotover-proxy/pull/1424 
+#[cfg(all(feature = "cassandra", feature = "kafka"))] mod chain; +#[cfg(all(feature = "cassandra", feature = "kafka"))] mod codec; fn init() { diff --git a/shotover/src/codec/mod.rs b/shotover/src/codec/mod.rs index d7f2731eb..d4b10db81 100644 --- a/shotover/src/codec/mod.rs +++ b/shotover/src/codec/mod.rs @@ -1,15 +1,20 @@ //! Codec types to use for connecting to a DB in a sink transform use crate::message::Messages; +#[cfg(feature = "cassandra")] use cassandra_protocol::compression::Compression; use core::fmt; +#[cfg(feature = "kafka")] use kafka::RequestHeader; use metrics::{register_histogram, Histogram}; use tokio_util::codec::{Decoder, Encoder}; +#[cfg(feature = "cassandra")] pub mod cassandra; +#[cfg(feature = "kafka")] pub mod kafka; pub mod opensearch; +#[cfg(feature = "redis")] pub mod redis; #[derive(Eq, PartialEq, Copy, Clone)] @@ -40,10 +45,13 @@ pub fn message_latency(direction: Direction, destination_name: String) -> Histog #[derive(Debug, Clone, PartialEq, Copy)] pub enum CodecState { + #[cfg(feature = "cassandra")] Cassandra { compression: Compression, }, + #[cfg(feature = "redis")] Redis, + #[cfg(feature = "kafka")] Kafka { request_header: Option, }, @@ -52,6 +60,7 @@ pub enum CodecState { } impl CodecState { + #[cfg(feature = "cassandra")] pub fn as_cassandra(&self) -> Compression { match self { CodecState::Cassandra { compression } => *compression, @@ -61,6 +70,7 @@ impl CodecState { } } + #[cfg(feature = "kafka")] pub fn as_kafka(&self) -> Option { match self { CodecState::Kafka { request_header } => *request_header, diff --git a/shotover/src/config/topology.rs b/shotover/src/config/topology.rs index f34c56487..374677ea5 100644 --- a/shotover/src/config/topology.rs +++ b/shotover/src/config/topology.rs @@ -60,7 +60,7 @@ impl Topology { } } -#[cfg(test)] +#[cfg(all(test, feature = "redis", feature = "cassandra"))] mod topology_tests { use crate::config::chain::TransformChainConfig; use crate::config::topology::Topology; diff --git a/shotover/src/frame/mod.rs b/shotover/src/frame/mod.rs index 3b89f55c4..64d84be7a 100644 --- a/shotover/src/frame/mod.rs +++ b/shotover/src/frame/mod.rs @@ -3,23 +3,34 @@ use crate::{codec::CodecState, message::ProtocolType}; use anyhow::{anyhow, Result}; use bytes::Bytes; +#[cfg(feature = "cassandra")] pub use cassandra::{CassandraFrame, CassandraOperation, CassandraResult}; +#[cfg(feature = "cassandra")] use cassandra_protocol::compression::Compression; +#[cfg(feature = "kafka")] use kafka::KafkaFrame; pub use opensearch::OpenSearchFrame; +#[cfg(feature = "redis")] pub use redis_protocol::resp2::types::Frame as RedisFrame; use std::fmt::{Display, Formatter, Result as FmtResult}; +#[cfg(feature = "cassandra")] pub mod cassandra; +#[cfg(feature = "kafka")] pub mod kafka; pub mod opensearch; +#[cfg(feature = "redis")] pub mod redis; +#[cfg(feature = "cassandra")] pub mod value; #[derive(PartialEq, Debug, Clone, Copy)] pub enum MessageType { + #[cfg(feature = "redis")] Redis, + #[cfg(feature = "cassandra")] Cassandra, + #[cfg(feature = "kafka")] Kafka, Dummy, OpenSearch, @@ -28,8 +39,11 @@ pub enum MessageType { impl From<&ProtocolType> for MessageType { fn from(value: &ProtocolType) -> Self { match value { + #[cfg(feature = "cassandra")] ProtocolType::Cassandra { .. } => Self::Cassandra, + #[cfg(feature = "redis")] ProtocolType::Redis => Self::Redis, + #[cfg(feature = "kafka")] ProtocolType::Kafka { .. 
} => Self::Kafka, ProtocolType::OpenSearch => Self::OpenSearch, } @@ -39,10 +53,13 @@ impl From<&ProtocolType> for MessageType { impl Frame { pub fn as_codec_state(&self) -> CodecState { match self { + #[cfg(feature = "cassandra")] Frame::Cassandra(_) => CodecState::Cassandra { compression: Compression::None, }, + #[cfg(feature = "redis")] Frame::Redis(_) => CodecState::Redis, + #[cfg(feature = "kafka")] Frame::Kafka(_) => CodecState::Kafka { request_header: None, }, @@ -54,8 +71,11 @@ impl Frame { #[derive(PartialEq, Debug, Clone)] pub enum Frame { + #[cfg(feature = "cassandra")] Cassandra(CassandraFrame), + #[cfg(feature = "redis")] Redis(RedisFrame), + #[cfg(feature = "kafka")] Kafka(KafkaFrame), /// Represents a message that has must exist due to shotovers requirement that every request has a corresponding response. /// It exists purely to keep transform invariants and codecs will completely ignore this frame when they receive it @@ -67,17 +87,20 @@ impl Frame { pub fn from_bytes( bytes: Bytes, message_type: MessageType, - codec_state: CodecState, + _codec_state: CodecState, ) -> Result { match message_type { + #[cfg(feature = "cassandra")] MessageType::Cassandra => { - CassandraFrame::from_bytes(bytes, codec_state.as_cassandra()).map(Frame::Cassandra) + CassandraFrame::from_bytes(bytes, _codec_state.as_cassandra()).map(Frame::Cassandra) } + #[cfg(feature = "redis")] MessageType::Redis => redis_protocol::resp2::decode::decode(&bytes) .map(|x| Frame::Redis(x.unwrap().0)) .map_err(|e| anyhow!("{e:?}")), + #[cfg(feature = "kafka")] MessageType::Kafka => { - KafkaFrame::from_bytes(bytes, codec_state.as_kafka()).map(Frame::Kafka) + KafkaFrame::from_bytes(bytes, _codec_state.as_kafka()).map(Frame::Kafka) } MessageType::Dummy => Ok(Frame::Dummy), MessageType::OpenSearch => Ok(Frame::OpenSearch(OpenSearchFrame::from_bytes(&bytes)?)), @@ -86,8 +109,11 @@ impl Frame { pub fn name(&self) -> &'static str { match self { + #[cfg(feature = "redis")] Frame::Redis(_) => "Redis", + #[cfg(feature = "cassandra")] Frame::Cassandra(_) => "Cassandra", + #[cfg(feature = "kafka")] Frame::Kafka(_) => "Kafka", Frame::Dummy => "Dummy", Frame::OpenSearch(_) => "OpenSearch", @@ -96,14 +122,18 @@ impl Frame { pub fn get_type(&self) -> MessageType { match self { + #[cfg(feature = "cassandra")] Frame::Cassandra(_) => MessageType::Cassandra, + #[cfg(feature = "redis")] Frame::Redis(_) => MessageType::Redis, + #[cfg(feature = "kafka")] Frame::Kafka(_) => MessageType::Kafka, Frame::Dummy => MessageType::Dummy, Frame::OpenSearch(_) => MessageType::OpenSearch, } } + #[cfg(feature = "redis")] pub fn redis(&mut self) -> Result<&mut RedisFrame> { match self { Frame::Redis(frame) => Ok(frame), @@ -114,6 +144,7 @@ impl Frame { } } + #[cfg(feature = "kafka")] pub fn into_kafka(self) -> Result { match self { Frame::Kafka(frame) => Ok(frame), @@ -124,6 +155,7 @@ impl Frame { } } + #[cfg(feature = "redis")] pub fn into_redis(self) -> Result { match self { Frame::Redis(frame) => Ok(frame), @@ -134,8 +166,10 @@ impl Frame { } } + #[cfg(feature = "cassandra")] pub fn into_cassandra(self) -> Result { match self { + #[cfg(feature = "cassandra")] Frame::Cassandra(frame) => Ok(frame), frame => Err(anyhow!( "Expected cassandra frame but received {} frame", @@ -158,8 +192,11 @@ impl Frame { impl Display for Frame { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { match self { + #[cfg(feature = "cassandra")] Frame::Cassandra(frame) => write!(f, "Cassandra {}", frame), + #[cfg(feature = "redis")] Frame::Redis(frame) => write!(f, "Redis 
{:?}", frame), + #[cfg(feature = "kafka")] Frame::Kafka(frame) => write!(f, "Kafka {}", frame), Frame::Dummy => write!(f, "Shotover internal dummy message"), Frame::OpenSearch(frame) => write!(f, "OpenSearch: {:?}", frame), diff --git a/shotover/src/message/mod.rs b/shotover/src/message/mod.rs index 0a581fa0a..680d27ed8 100644 --- a/shotover/src/message/mod.rs +++ b/shotover/src/message/mod.rs @@ -1,17 +1,27 @@ //! Message and supporting types - used to hold a message/query/result going between the client and database +#[cfg(feature = "kafka")] use crate::codec::kafka::RequestHeader; use crate::codec::CodecState; +#[cfg(feature = "cassandra")] use crate::frame::cassandra::Tracing; +#[cfg(feature = "redis")] use crate::frame::redis::redis_query_type; +#[cfg(feature = "cassandra")] +use crate::frame::CassandraFrame; +#[cfg(feature = "redis")] +use crate::frame::RedisFrame; +#[cfg(feature = "cassandra")] use crate::frame::{ cassandra, cassandra::{CassandraMetadata, CassandraOperation}, }; -use crate::frame::{CassandraFrame, Frame, MessageType, RedisFrame}; +use crate::frame::{Frame, MessageType}; use anyhow::{anyhow, Context, Result}; -use bytes::{Buf, Bytes}; +use bytes::Bytes; +#[cfg(feature = "cassandra")] use cassandra_protocol::compression::Compression; +#[cfg(feature = "cassandra")] use cassandra_protocol::frame::message_error::{ErrorBody, ErrorType}; use derivative::Derivative; use nonzero_ext::nonzero; @@ -20,18 +30,24 @@ use std::num::NonZeroU32; use std::time::Instant; pub enum Metadata { + #[cfg(feature = "cassandra")] Cassandra(CassandraMetadata), + #[cfg(feature = "redis")] Redis, + #[cfg(feature = "kafka")] Kafka, OpenSearch, } #[derive(PartialEq)] pub enum ProtocolType { + #[cfg(feature = "cassandra")] Cassandra { compression: Compression, }, + #[cfg(feature = "redis")] Redis, + #[cfg(feature = "kafka")] Kafka { request_header: Option, }, @@ -41,10 +57,13 @@ pub enum ProtocolType { impl From<&ProtocolType> for CodecState { fn from(value: &ProtocolType) -> Self { match value { + #[cfg(feature = "cassandra")] ProtocolType::Cassandra { compression } => Self::Cassandra { compression: *compression, }, + #[cfg(feature = "redis")] ProtocolType::Redis => Self::Redis, + #[cfg(feature = "kafka")] ProtocolType::Kafka { request_header } => Self::Kafka { request_header: *request_header, }, @@ -244,18 +263,25 @@ impl Message { pub fn cell_count(&self) -> Result { Ok(match self.inner.as_ref().unwrap() { MessageInner::RawBytes { + #[cfg(feature = "cassandra")] bytes, message_type, + .. } => match message_type { MessageType::Redis => nonzero!(1u32), + #[cfg(feature = "cassandra")] MessageType::Cassandra => cassandra::raw_frame::cell_count(bytes)?, + #[cfg(feature = "kafka")] MessageType::Kafka => todo!(), MessageType::Dummy => nonzero!(1u32), MessageType::OpenSearch => todo!(), }, MessageInner::Modified { frame } | MessageInner::Parsed { frame, .. 
} => match frame { + #[cfg(feature = "cassandra")] Frame::Cassandra(frame) => frame.cell_count()?, + #[cfg(feature = "redis")] Frame::Redis(_) => nonzero!(1u32), + #[cfg(feature = "kafka")] Frame::Kafka(_) => todo!(), Frame::Dummy => nonzero!(1u32), Frame::OpenSearch(_) => todo!(), @@ -278,8 +304,11 @@ impl Message { pub fn get_query_type(&mut self) -> QueryType { match self.frame() { + #[cfg(feature = "cassandra")] Some(Frame::Cassandra(cassandra)) => cassandra.get_query_type(), + #[cfg(feature = "redis")] Some(Frame::Redis(redis)) => redis_query_type(redis), // free-standing function as we cant define methods on RedisFrame + #[cfg(feature = "kafka")] Some(Frame::Kafka(_)) => todo!(), Some(Frame::Dummy) => todo!(), Some(Frame::OpenSearch(_)) => todo!(), @@ -292,6 +321,7 @@ impl Message { /// If self is a response: the returned `Message` is a valid replacement of self pub fn to_error_response(&self, error: String) -> Result { Ok(Message::from_frame(match self.metadata().context("Failed to parse metadata of request or response when producing an error")? { + #[cfg(feature = "redis")] Metadata::Redis => { // Redis errors can not contain newlines at the protocol level let message = format!("ERR {error}") @@ -299,6 +329,7 @@ impl Message { .replace('\n', " "); Frame::Redis(RedisFrame::Error(message.into())) } + #[cfg(feature = "cassandra")] Metadata::Cassandra(frame) => Frame::Cassandra(CassandraFrame { version: frame.version, stream_id: frame.stream_id, @@ -313,6 +344,7 @@ impl Message { // * kafka errors are defined per response type and many response types only provide an error code without a field for a custom error message. // + Implementing this per response type would add a lot of (localized) complexity but might be worth it. // * the official C++ kafka driver we use for integration tests does not pick up errors sent just before closing a connection, so this wouldnt help the usage in server.rs where we send an error before terminating the connection for at least that driver. + #[cfg(feature = "kafka")] Metadata::Kafka => return Err(anyhow!(error).context( "A generic error cannot be formed because the kafka protocol does not support it", )), @@ -324,20 +356,28 @@ impl Message { pub fn metadata(&self) -> Result { match self.inner.as_ref().unwrap() { MessageInner::RawBytes { + #[cfg(feature = "cassandra")] bytes, message_type, + .. } => match message_type { + #[cfg(feature = "cassandra")] MessageType::Cassandra => { Ok(Metadata::Cassandra(cassandra::raw_frame::metadata(bytes)?)) } + #[cfg(feature = "redis")] MessageType::Redis => Ok(Metadata::Redis), + #[cfg(feature = "kafka")] MessageType::Kafka => Ok(Metadata::Kafka), MessageType::Dummy => Err(anyhow!("Dummy has no metadata")), MessageType::OpenSearch => Err(anyhow!("OpenSearch has no metadata")), }, MessageInner::Parsed { frame, .. 
} | MessageInner::Modified { frame } => match frame { + #[cfg(feature = "cassandra")] Frame::Cassandra(frame) => Ok(Metadata::Cassandra(frame.metadata())), + #[cfg(feature = "kafka")] Frame::Kafka(_) => Ok(Metadata::Kafka), + #[cfg(feature = "redis")] Frame::Redis(_) => Ok(Metadata::Redis), Frame::Dummy => Err(anyhow!("dummy has no metadata")), Frame::OpenSearch(_) => Err(anyhow!("OpenSearch has no metadata")), @@ -351,6 +391,7 @@ impl Message { *self = Message::from_frame_at_instant( match metadata { + #[cfg(feature = "cassandra")] Metadata::Cassandra(metadata) => { let body = CassandraOperation::Error(ErrorBody { message: "Server overloaded".into(), @@ -365,10 +406,14 @@ impl Message { operation: body, }) } + #[cfg(feature = "redis")] Metadata::Redis => unimplemented!(), + #[cfg(feature = "kafka")] Metadata::Kafka => unimplemented!(), Metadata::OpenSearch => unimplemented!(), }, + // reachable with feature = cassandra + #[allow(unreachable_code)] self.received_from_source_or_sink_at, ); @@ -381,10 +426,12 @@ impl Message { // For now its just written to match cassandra's stream_id field pub fn stream_id(&self) -> Option { match &self.inner { + #[cfg(feature = "cassandra")] Some(MessageInner::RawBytes { bytes, message_type: MessageType::Cassandra, }) => { + use bytes::Buf; const HEADER_LEN: usize = 9; if bytes.len() >= HEADER_LEN { Some((&bytes[2..4]).get_i16()) @@ -395,8 +442,11 @@ impl Message { Some(MessageInner::RawBytes { .. }) => None, Some(MessageInner::Parsed { frame, .. } | MessageInner::Modified { frame }) => { match frame { + #[cfg(feature = "cassandra")] Frame::Cassandra(cassandra) => Some(cassandra.stream_id), + #[cfg(feature = "redis")] Frame::Redis(_) => None, + #[cfg(feature = "kafka")] Frame::Kafka(_) => None, Frame::Dummy => None, Frame::OpenSearch(_) => None, diff --git a/shotover/src/sources/mod.rs b/shotover/src/sources/mod.rs index af18b04c2..91d587190 100644 --- a/shotover/src/sources/mod.rs +++ b/shotover/src/sources/mod.rs @@ -1,17 +1,23 @@ //! Sources used to listen for connections and send/recieve with the client. 
+#[cfg(feature = "cassandra")] use crate::sources::cassandra::{CassandraConfig, CassandraSource}; +#[cfg(feature = "kafka")] use crate::sources::kafka::{KafkaConfig, KafkaSource}; use crate::sources::opensearch::{OpenSearchConfig, OpenSearchSource}; +#[cfg(feature = "redis")] use crate::sources::redis::{RedisConfig, RedisSource}; use anyhow::Result; use serde::{Deserialize, Serialize}; use tokio::sync::watch; use tokio::task::JoinHandle; +#[cfg(feature = "cassandra")] pub mod cassandra; +#[cfg(feature = "kafka")] pub mod kafka; pub mod opensearch; +#[cfg(feature = "redis")] pub mod redis; #[derive(Serialize, Deserialize, Debug, Clone, Copy)] @@ -23,8 +29,11 @@ pub enum Transport { #[derive(Debug)] pub enum Source { + #[cfg(feature = "cassandra")] Cassandra(CassandraSource), + #[cfg(feature = "redis")] Redis(RedisSource), + #[cfg(feature = "kafka")] Kafka(KafkaSource), OpenSearch(OpenSearchSource), } @@ -32,8 +41,11 @@ pub enum Source { impl Source { pub fn into_join_handle(self) -> JoinHandle<()> { match self { + #[cfg(feature = "cassandra")] Source::Cassandra(c) => c.join_handle, + #[cfg(feature = "redis")] Source::Redis(r) => r.join_handle, + #[cfg(feature = "kafka")] Source::Kafka(r) => r.join_handle, Source::OpenSearch(o) => o.join_handle, } @@ -43,8 +55,11 @@ impl Source { #[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub enum SourceConfig { + #[cfg(feature = "cassandra")] Cassandra(CassandraConfig), + #[cfg(feature = "redis")] Redis(RedisConfig), + #[cfg(feature = "kafka")] Kafka(KafkaConfig), OpenSearch(OpenSearchConfig), } @@ -55,8 +70,11 @@ impl SourceConfig { trigger_shutdown_rx: watch::Receiver, ) -> Result> { match self { + #[cfg(feature = "cassandra")] SourceConfig::Cassandra(c) => c.get_source(trigger_shutdown_rx).await, + #[cfg(feature = "redis")] SourceConfig::Redis(r) => r.get_source(trigger_shutdown_rx).await, + #[cfg(feature = "kafka")] SourceConfig::Kafka(r) => r.get_source(trigger_shutdown_rx).await, SourceConfig::OpenSearch(r) => r.get_source(trigger_shutdown_rx).await, } diff --git a/shotover/src/transforms/mod.rs b/shotover/src/transforms/mod.rs index 71dcfb918..50f41fc36 100644 --- a/shotover/src/transforms/mod.rs +++ b/shotover/src/transforms/mod.rs @@ -1,8 +1,11 @@ //! 
Various types required for defining a transform use crate::message::Messages; +#[cfg(feature = "cassandra")] use crate::transforms::cassandra::peers_rewrite::CassandraPeersRewrite; +#[cfg(feature = "cassandra")] use crate::transforms::cassandra::sink_cluster::CassandraSinkCluster; +#[cfg(feature = "cassandra")] use crate::transforms::cassandra::sink_single::CassandraSinkSingle; use crate::transforms::coalesce::Coalesce; use crate::transforms::debug::force_parse::DebugForceParse; @@ -12,7 +15,9 @@ use crate::transforms::debug::random_delay::DebugRandomDelay; use crate::transforms::debug::returner::DebugReturner; use crate::transforms::distributed::tuneable_consistency_scatter::TuneableConsistentencyScatter; use crate::transforms::filter::QueryTypeFilter; +#[cfg(feature = "kafka")] use crate::transforms::kafka::sink_cluster::KafkaSinkCluster; +#[cfg(feature = "kafka")] use crate::transforms::kafka::sink_single::KafkaSinkSingle; use crate::transforms::load_balance::ConnectionBalanceAndPool; use crate::transforms::loopback::Loopback; @@ -20,12 +25,18 @@ use crate::transforms::null::NullSink; #[cfg(feature = "alpha-transforms")] use crate::transforms::opensearch::OpenSearchSinkSingle; use crate::transforms::parallel_map::ParallelMap; +#[cfg(feature = "cassandra")] use crate::transforms::protect::Protect; use crate::transforms::query_counter::QueryCounter; +#[cfg(all(feature = "redis", feature = "cassandra"))] use crate::transforms::redis::cache::SimpleRedisCache; +#[cfg(feature = "redis")] use crate::transforms::redis::cluster_ports_rewrite::RedisClusterPortsRewrite; +#[cfg(feature = "redis")] use crate::transforms::redis::sink_cluster::RedisSinkCluster; +#[cfg(feature = "redis")] use crate::transforms::redis::sink_single::RedisSinkSingle; +#[cfg(feature = "redis")] use crate::transforms::redis::timestamp_tagging::RedisTimestampTagger; use crate::transforms::tee::Tee; use crate::transforms::throttling::RequestThrottling; @@ -44,12 +55,14 @@ use tokio::time::Instant; use self::chain::TransformAndMetrics; +#[cfg(feature = "cassandra")] pub mod cassandra; pub mod chain; pub mod coalesce; pub mod debug; pub mod distributed; pub mod filter; +#[cfg(feature = "kafka")] pub mod kafka; pub mod load_balance; pub mod loopback; @@ -58,8 +71,10 @@ pub mod null; #[cfg(feature = "alpha-transforms")] pub mod opensearch; pub mod parallel_map; +#[cfg(feature = "cassandra")] pub mod protect; pub mod query_counter; +#[cfg(feature = "redis")] pub mod redis; pub mod sampler; pub mod tee; @@ -92,20 +107,31 @@ impl Debug for dyn TransformBuilder { /// than using dynamic trait objects. 
#[derive(IntoStaticStr)] pub enum Transforms { + #[cfg(feature = "kafka")] KafkaSinkSingle(KafkaSinkSingle), + #[cfg(feature = "kafka")] KafkaSinkCluster(KafkaSinkCluster), + #[cfg(feature = "cassandra")] CassandraSinkSingle(CassandraSinkSingle), + #[cfg(feature = "cassandra")] CassandraSinkCluster(Box), + #[cfg(feature = "redis")] RedisSinkSingle(RedisSinkSingle), + #[cfg(feature = "cassandra")] CassandraPeersRewrite(CassandraPeersRewrite), + #[cfg(all(feature = "redis", feature = "cassandra"))] RedisCache(SimpleRedisCache), Tee(Tee), NullSink(NullSink), Loopback(Loopback), + #[cfg(feature = "cassandra")] Protect(Box), TuneableConsistencyScatter(TuneableConsistentencyScatter), + #[cfg(feature = "redis")] RedisTimestampTagger(RedisTimestampTagger), + #[cfg(feature = "redis")] RedisSinkCluster(RedisSinkCluster), + #[cfg(feature = "redis")] RedisClusterPortsRewrite(RedisClusterPortsRewrite), DebugReturner(DebugReturner), DebugRandomDelay(DebugRandomDelay), @@ -132,11 +158,17 @@ impl Debug for Transforms { impl Transforms { async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result { match self { + #[cfg(feature = "kafka")] Transforms::KafkaSinkSingle(c) => c.transform(requests_wrapper).await, + #[cfg(feature = "kafka")] Transforms::KafkaSinkCluster(c) => c.transform(requests_wrapper).await, + #[cfg(feature = "cassandra")] Transforms::CassandraSinkSingle(c) => c.transform(requests_wrapper).await, + #[cfg(feature = "cassandra")] Transforms::CassandraSinkCluster(c) => c.transform(requests_wrapper).await, + #[cfg(feature = "cassandra")] Transforms::CassandraPeersRewrite(c) => c.transform(requests_wrapper).await, + #[cfg(all(feature = "redis", feature = "cassandra"))] Transforms::RedisCache(r) => r.transform(requests_wrapper).await, Transforms::Tee(m) => m.transform(requests_wrapper).await, Transforms::DebugPrinter(p) => p.transform(requests_wrapper).await, @@ -144,13 +176,18 @@ impl Transforms { Transforms::DebugForceParse(p) => p.transform(requests_wrapper).await, Transforms::NullSink(n) => n.transform(requests_wrapper).await, Transforms::Loopback(n) => n.transform(requests_wrapper).await, + #[cfg(feature = "cassandra")] Transforms::Protect(p) => p.transform(requests_wrapper).await, Transforms::DebugReturner(p) => p.transform(requests_wrapper).await, Transforms::DebugRandomDelay(p) => p.transform(requests_wrapper).await, Transforms::TuneableConsistencyScatter(tc) => tc.transform(requests_wrapper).await, + #[cfg(feature = "redis")] Transforms::RedisSinkSingle(r) => r.transform(requests_wrapper).await, + #[cfg(feature = "redis")] Transforms::RedisTimestampTagger(r) => r.transform(requests_wrapper).await, + #[cfg(feature = "redis")] Transforms::RedisClusterPortsRewrite(r) => r.transform(requests_wrapper).await, + #[cfg(feature = "redis")] Transforms::RedisSinkCluster(r) => r.transform(requests_wrapper).await, Transforms::ParallelMap(s) => s.transform(requests_wrapper).await, Transforms::PoolConnections(s) => s.transform(requests_wrapper).await, @@ -166,11 +203,17 @@ impl Transforms { async fn transform_pushed<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result { match self { + #[cfg(feature = "kafka")] Transforms::KafkaSinkSingle(c) => c.transform_pushed(requests_wrapper).await, + #[cfg(feature = "kafka")] Transforms::KafkaSinkCluster(c) => c.transform_pushed(requests_wrapper).await, + #[cfg(feature = "cassandra")] Transforms::CassandraSinkSingle(c) => c.transform_pushed(requests_wrapper).await, + #[cfg(feature = "cassandra")] Transforms::CassandraSinkCluster(c) => 
c.transform_pushed(requests_wrapper).await, + #[cfg(feature = "cassandra")] Transforms::CassandraPeersRewrite(c) => c.transform_pushed(requests_wrapper).await, + #[cfg(all(feature = "redis", feature = "cassandra"))] Transforms::RedisCache(r) => r.transform_pushed(requests_wrapper).await, Transforms::Tee(m) => m.transform_pushed(requests_wrapper).await, Transforms::DebugPrinter(p) => p.transform_pushed(requests_wrapper).await, @@ -178,15 +221,20 @@ impl Transforms { Transforms::DebugForceParse(p) => p.transform_pushed(requests_wrapper).await, Transforms::NullSink(n) => n.transform_pushed(requests_wrapper).await, Transforms::Loopback(n) => n.transform_pushed(requests_wrapper).await, + #[cfg(feature = "cassandra")] Transforms::Protect(p) => p.transform_pushed(requests_wrapper).await, Transforms::DebugReturner(p) => p.transform_pushed(requests_wrapper).await, Transforms::DebugRandomDelay(p) => p.transform_pushed(requests_wrapper).await, Transforms::TuneableConsistencyScatter(tc) => { tc.transform_pushed(requests_wrapper).await } + #[cfg(feature = "redis")] Transforms::RedisSinkSingle(r) => r.transform_pushed(requests_wrapper).await, + #[cfg(feature = "redis")] Transforms::RedisTimestampTagger(r) => r.transform_pushed(requests_wrapper).await, + #[cfg(feature = "redis")] Transforms::RedisClusterPortsRewrite(r) => r.transform_pushed(requests_wrapper).await, + #[cfg(feature = "redis")] Transforms::RedisSinkCluster(r) => r.transform_pushed(requests_wrapper).await, Transforms::ParallelMap(s) => s.transform_pushed(requests_wrapper).await, Transforms::PoolConnections(s) => s.transform_pushed(requests_wrapper).await, @@ -206,23 +254,33 @@ impl Transforms { fn set_pushed_messages_tx(&mut self, pushed_messages_tx: mpsc::UnboundedSender) { match self { + #[cfg(feature = "kafka")] Transforms::KafkaSinkSingle(c) => c.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(feature = "kafka")] Transforms::KafkaSinkCluster(c) => c.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(feature = "cassandra")] Transforms::CassandraSinkSingle(c) => c.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(feature = "cassandra")] Transforms::CassandraSinkCluster(c) => c.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(feature = "cassandra")] Transforms::CassandraPeersRewrite(c) => c.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(all(feature = "redis", feature = "cassandra"))] Transforms::RedisCache(r) => r.set_pushed_messages_tx(pushed_messages_tx), Transforms::Tee(t) => t.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(feature = "redis")] Transforms::RedisSinkSingle(r) => r.set_pushed_messages_tx(pushed_messages_tx), Transforms::TuneableConsistencyScatter(c) => { c.set_pushed_messages_tx(pushed_messages_tx) } + #[cfg(feature = "redis")] Transforms::RedisTimestampTagger(r) => r.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(feature = "redis")] Transforms::RedisClusterPortsRewrite(r) => r.set_pushed_messages_tx(pushed_messages_tx), Transforms::DebugPrinter(p) => p.set_pushed_messages_tx(pushed_messages_tx), Transforms::DebugLogToFile(p) => p.set_pushed_messages_tx(pushed_messages_tx), Transforms::DebugForceParse(p) => p.set_pushed_messages_tx(pushed_messages_tx), Transforms::NullSink(n) => n.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(feature = "redis")] Transforms::RedisSinkCluster(r) => r.set_pushed_messages_tx(pushed_messages_tx), Transforms::ParallelMap(s) => s.set_pushed_messages_tx(pushed_messages_tx), Transforms::PoolConnections(s) => s.set_pushed_messages_tx(pushed_messages_tx), @@ -230,6 
+288,7 @@ impl Transforms { Transforms::QueryTypeFilter(s) => s.set_pushed_messages_tx(pushed_messages_tx), Transforms::QueryCounter(s) => s.set_pushed_messages_tx(pushed_messages_tx), Transforms::Loopback(l) => l.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(feature = "cassandra")] Transforms::Protect(p) => p.set_pushed_messages_tx(pushed_messages_tx), Transforms::DebugReturner(d) => d.set_pushed_messages_tx(pushed_messages_tx), Transforms::DebugRandomDelay(d) => d.set_pushed_messages_tx(pushed_messages_tx), diff --git a/shotover/src/transforms/query_counter.rs b/shotover/src/transforms/query_counter.rs index 147175900..4eae79405 100644 --- a/shotover/src/transforms/query_counter.rs +++ b/shotover/src/transforms/query_counter.rs @@ -43,11 +43,13 @@ impl Transform for QueryCounter { async fn transform<'a>(&'a mut self, mut requests_wrapper: Wrapper<'a>) -> Result { for m in &mut requests_wrapper.requests { match m.frame() { + #[cfg(feature = "cassandra")] Some(Frame::Cassandra(frame)) => { for statement in frame.operation.queries() { counter!("shotover_query_count", 1, "name" => self.counter_name.clone(), "query" => statement.short_name(), "type" => "cassandra"); } } + #[cfg(feature = "redis")] Some(Frame::Redis(frame)) => { if let Some(query_type) = get_redis_query_type(frame) { counter!("shotover_query_count", 1, "name" => self.counter_name.clone(), "query" => query_type, "type" => "redis"); @@ -55,6 +57,7 @@ impl Transform for QueryCounter { counter!("shotover_query_count", 1, "name" => self.counter_name.clone(), "query" => "unknown", "type" => "redis"); } } + #[cfg(feature = "kafka")] Some(Frame::Kafka(_)) => { counter!("shotover_query_count", 1, "name" => self.counter_name.clone(), "query" => "unknown", "type" => "kafka"); } diff --git a/shotover/src/transforms/redis/mod.rs b/shotover/src/transforms/redis/mod.rs index f0a1e6789..c2957053e 100644 --- a/shotover/src/transforms/redis/mod.rs +++ b/shotover/src/transforms/redis/mod.rs @@ -1,5 +1,6 @@ use crate::transforms::util::ConnectionError; +#[cfg(all(feature = "redis", feature = "cassandra"))] pub mod cache; pub mod cluster_ports_rewrite; pub mod sink_cluster;
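
Usage note (editorial addition, not part of the patch): with these flags a downstream crate can compile shotover with only the protocols it needs, in the same way custom-transforms-example and shotover-proxy are adjusted above. A minimal sketch, assuming only the feature names introduced in this patch; the consumer crate layout and the `is_error` helper are illustrative, not part of the shotover API:

    # Cargo.toml of a hypothetical consumer crate, forwarding the flags the way shotover-proxy does
    [dependencies]
    shotover = { path = "../shotover", default-features = false }

    [features]
    default = ["redis"]
    redis = ["shotover/redis"]
    cassandra = ["shotover/cassandra"]
    kafka = ["shotover/kafka"]

    // src/lib.rs -- gate protocol-specific code on the forwarded feature,
    // mirroring the cfg attributes added throughout shotover in this patch
    #[cfg(feature = "redis")]
    use shotover::frame::RedisFrame;

    #[cfg(feature = "redis")]
    pub fn is_error(frame: &RedisFrame) -> bool {
        // RedisFrame is the resp2 frame re-exported by shotover::frame and is
        // only available when the `redis` feature is enabled
        matches!(frame, RedisFrame::Error(_))
    }

Building for a single protocol then looks like `cargo build --no-default-features --features cassandra`, the same flag pattern used by the windsock-cassandra alias added to .cargo/config.toml.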