From 018054091d398431202c014406fcc3584fe8a59e Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Thu, 25 Jan 2024 21:09:16 +1100 Subject: [PATCH] Add feature flags for each protocol in shotover --- .cargo/config.toml | 17 +++- .github/workflows/build.yaml | 6 +- .github/workflows/lint.yaml | 4 +- Cargo.toml | 2 +- custom-transforms-example/Cargo.toml | 10 +- custom-transforms-example/src/main.rs | 2 + shotover-proxy/Cargo.toml | 7 +- shotover-proxy/benches/windsock/cloud/aws.rs | 2 +- shotover-proxy/benches/windsock/main.rs | 97 ++++++++++++------- shotover-proxy/tests/lib.rs | 9 +- shotover/Cargo.toml | 88 ++++++++++++----- shotover/benches/benches/main.rs | 1 - shotover/src/codec/mod.rs | 12 +++ shotover/src/config/topology.rs | 2 +- shotover/src/frame/mod.rs | 48 +++++++++ shotover/src/frame/value.rs | 22 +++++ shotover/src/lib.rs | 19 ++++ shotover/src/message/mod.rs | 79 +++++++++++++-- shotover/src/sources/mod.rs | 24 +++++ shotover/src/transforms/coalesce.rs | 2 +- shotover/src/transforms/debug/returner.rs | 27 +++--- shotover/src/transforms/filter.rs | 2 +- shotover/src/transforms/mod.rs | 77 +++++++++++++-- shotover/src/transforms/query_counter.rs | 46 +++++---- shotover/src/transforms/redis/mod.rs | 1 + .../util/cluster_connection_pool.rs | 2 +- windsock-cloud-docker/src/container.rs | 5 +- 27 files changed, 485 insertions(+), 128 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index 3e1c66273..5d650e8d8 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -8,6 +8,17 @@ rustflags = [ linker = "aarch64-linux-gnu-gcc" [alias] -windsock = "test --release --bench windsock --features alpha-transforms,rdkafka-driver-tests --" -windsock-debug = "test --bench windsock --features alpha-transforms,rdkafka-driver-tests --" -windsock-cloud-docker = "run --package windsock-cloud-docker --" +# Can run every benchmark +windsock = "test --release --bench windsock --features kafka,alpha-transforms,rdkafka-driver-tests,cassandra,redis --" +windsock-debug = "test --bench windsock --features kafka,alpha-transforms,rdkafka-driver-tests,cassandra,redis --" + +# Can only run benchmarks specific to the protocol but compiles a lot faster +windsock-redis = "test --release --bench windsock --no-default-features --features redis,alpha-transforms --" +windsock-kafka = "test --release --bench windsock --no-default-features --features kafka,alpha-transforms,rdkafka-driver-tests --" +windsock-cassandra = "test --release --bench windsock --no-default-features --features cassandra,alpha-transforms --" + +# Compile benches in docker to ensure compiled libc version is compatible with the EC2 instances libc +windsock-cloud-docker = "run --package windsock-cloud-docker -- redis,cassandra,kafka" +windsock-cloud-docker-redis = "run --package windsock-cloud-docker -- redis" +windsock-cloud-docker-kafka = "run --package windsock-cloud-docker -- kafka" +windsock-cloud-docker-cassandra = "run --package windsock-cloud-docker -- cassandra" diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 42c3be366..98f6799b3 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -40,11 +40,13 @@ jobs: - name: Install ubuntu packages run: shotover-proxy/build/install_ubuntu_packages.sh - name: Install cargo-hack - run: cargo install cargo-hack --version 0.5.8 + uses: taiki-e/install-action@v2 + with: + tool: cargo-hack@0.6.16 - name: Ensure that dev tools compiles and has no warnings with no features enabled run: cargo clippy --locked ${{ matrix.cargo_flags }} --all-targets -- -D warnings - name: Ensure that shotover-proxy compiles and has no warnings under every possible combination of features # some things to explicitly point out: # * clippy also reports rustc warnings and errors # * clippy --all-targets is not run so we only build the shotover_proxy executable without the tests/benches - run: cargo hack --feature-powerset clippy --locked ${{ matrix.cargo_flags }} --package shotover-proxy -- -D warnings + run: cargo hack --feature-powerset --at-least-one-of redis,cassandra,kafka,opensearch clippy --locked ${{ matrix.cargo_flags }} --package shotover-proxy -- -D warnings diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index 9a5e0eb31..d462a2e53 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -40,14 +40,14 @@ jobs: - name: Install cargo-hack uses: taiki-e/install-action@v2 with: - tool: cargo-hack@0.6.4 + tool: cargo-hack@0.6.16 - name: Ensure `cargo fmt --all` was run run: cargo fmt --all -- --check - name: Ensure that all crates compile and have no warnings under every possible combination of features # some things to explicitly point out: # * clippy also reports rustc warnings and errors # * clippy --all-targets causes clippy to run against tests and examples which it doesnt do by default. - run: cargo hack --feature-powerset clippy --all-targets --locked -- -D warnings + run: cargo hack --feature-powerset --at-least-one-of redis,cassandra,kafka,opensearch clippy --all-targets --locked -- -D warnings - name: Report disk usage run: | df -h diff --git a/Cargo.toml b/Cargo.toml index e71a31f2c..a6ca3a449 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ codegen-units = 1 scylla = { version = "0.11.0", features = ["ssl"] } bytes = "1.0.0" tokio = { version = "1.25.0", features = ["full", "macros"] } -tokio-util = { version = "0.7.7" } +tokio-util = { version = "0.7.7", features = ["codec"] } tokio-openssl = "0.6.2" itertools = "0.12.0" openssl = { version = "0.10.36", features = ["vendored"] } diff --git a/custom-transforms-example/Cargo.toml b/custom-transforms-example/Cargo.toml index 5663e36b5..6b13cf9aa 100644 --- a/custom-transforms-example/Cargo.toml +++ b/custom-transforms-example/Cargo.toml @@ -8,7 +8,7 @@ license = "Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -shotover = { path = "../shotover" } +shotover = { path = "../shotover", default-features = false} anyhow.workspace = true serde.workspace = true async-trait.workspace = true @@ -20,3 +20,11 @@ typetag.workspace = true test-helpers = {path = "../test-helpers"} tokio.workspace = true redis.workspace = true + +[features] +redis = ["shotover/redis"] +default = ["redis"] + +[[test]] +name = "test" +required-features = ["redis"] diff --git a/custom-transforms-example/src/main.rs b/custom-transforms-example/src/main.rs index e05f35716..c4bae05f8 100644 --- a/custom-transforms-example/src/main.rs +++ b/custom-transforms-example/src/main.rs @@ -1,6 +1,8 @@ use shotover::runner::Shotover; +#[cfg(feature = "redis")] mod redis_get_rewrite; +#[cfg(feature = "redis")] shotover::import_transform!(redis_get_rewrite::RedisGetRewriteConfig); fn main() { diff --git a/shotover-proxy/Cargo.toml b/shotover-proxy/Cargo.toml index b99b56064..ccc8c4f30 100644 --- a/shotover-proxy/Cargo.toml +++ b/shotover-proxy/Cargo.toml @@ -8,7 +8,7 @@ license = "Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -shotover = { path = "../shotover" } +shotover = { path = "../shotover", default-features = false} [dev-dependencies] prometheus-parse = "0.2.4" @@ -60,8 +60,13 @@ shell-quote.workspace = true [features] # Include WIP alpha transforms in the public API alpha-transforms = ["shotover/alpha-transforms"] +cassandra = ["shotover/cassandra"] +kafka = ["shotover/kafka"] +redis = ["shotover/redis"] +opensearch = ["shotover/opensearch"] cassandra-cpp-driver-tests = ["test-helpers/cassandra-cpp-driver-tests"] rdkafka-driver-tests = ["test-helpers/rdkafka-driver-tests"] +default = ["cassandra", "kafka", "redis", "opensearch"] [[bench]] name = "windsock" diff --git a/shotover-proxy/benches/windsock/cloud/aws.rs b/shotover-proxy/benches/windsock/cloud/aws.rs index 322df3c4c..d5b668d3d 100644 --- a/shotover-proxy/benches/windsock/cloud/aws.rs +++ b/shotover-proxy/benches/windsock/cloud/aws.rs @@ -255,7 +255,7 @@ sudo docker system prune -af"#, } } - #[cfg(feature = "rdkafka-driver-tests")] + #[cfg(all(feature = "rdkafka-driver-tests", feature = "kafka"))] pub async fn run_shotover(self: Arc, topology: &str) -> RunningShotover { self.instance .ssh() diff --git a/shotover-proxy/benches/windsock/main.rs b/shotover-proxy/benches/windsock/main.rs index aa449a043..fbf4e3d51 100644 --- a/shotover-proxy/benches/windsock/main.rs +++ b/shotover-proxy/benches/windsock/main.rs @@ -1,19 +1,33 @@ +// Allow dead code if any of the protocol features are disabled +#![cfg_attr( + any( + not(feature = "cassandra"), + not(feature = "redis"), + not(all(feature = "rdkafka-driver-tests", feature = "kafka")) + ), + allow(dead_code, unused_imports) +)] + +#[cfg(feature = "cassandra")] mod cassandra; mod cloud; mod common; -#[cfg(feature = "rdkafka-driver-tests")] +#[cfg(all(feature = "rdkafka-driver-tests", feature = "kafka"))] mod kafka; mod profilers; +#[cfg(feature = "redis")] mod redis; mod shotover; -use crate::cassandra::*; -use crate::common::*; -#[cfg(feature = "rdkafka-driver-tests")] -use crate::kafka::*; -use crate::redis::*; +#[cfg(feature = "cassandra")] +use cassandra::*; use cloud::CloudResources; use cloud::CloudResourcesRequired; +use common::*; +#[cfg(all(feature = "rdkafka-driver-tests", feature = "kafka"))] +use kafka::*; +#[cfg(feature = "redis")] +use redis::*; use std::path::Path; use tracing_subscriber::EnvFilter; use windsock::{Bench, Windsock}; @@ -22,7 +36,7 @@ type ShotoverBench = Box< dyn Bench, >; -fn main() { +pub fn main() { let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout()); tracing_subscriber::fmt() .with_env_filter(EnvFilter::from_default_env()) @@ -41,6 +55,7 @@ fn main() { .unwrap(); } + #[cfg(feature = "cassandra")] let cassandra_benches = itertools::iproduct!( [CassandraDb::Cassandra], [CassandraTopology::Single, CassandraTopology::Cluster3], @@ -88,7 +103,36 @@ fn main() { )) as ShotoverBench) }, ); - #[cfg(feature = "rdkafka-driver-tests")] + #[cfg(not(feature = "cassandra"))] + let cassandra_benches = std::iter::empty(); + #[cfg(feature = "cassandra")] + let cassandra_mock_benches = vec![ + Box::new(CassandraBench::new( + CassandraDb::Mocked, + CassandraTopology::Single, + Shotover::None, + Compression::None, + Operation::ReadI64, + CassandraProtocol::V4, + CassandraDriver::Scylla, + 10, + )) as ShotoverBench, + Box::new(CassandraBench::new( + CassandraDb::Mocked, + CassandraTopology::Single, + Shotover::Standard, + Compression::None, + Operation::ReadI64, + CassandraProtocol::V4, + CassandraDriver::Scylla, + 10, + )), + ] + .into_iter(); + #[cfg(not(feature = "cassandra"))] + let cassandra_mock_benches = std::iter::empty(); + + #[cfg(all(feature = "rdkafka-driver-tests", feature = "kafka"))] let kafka_benches = itertools::iproduct!( [ Shotover::None, @@ -105,9 +149,10 @@ fn main() { .map(|(shotover, topology, size)| { Box::new(KafkaBench::new(shotover, topology, size)) as ShotoverBench }); - #[cfg(not(feature = "rdkafka-driver-tests"))] + #[cfg(not(all(feature = "rdkafka-driver-tests", feature = "kafka")))] let kafka_benches = std::iter::empty(); + #[cfg(feature = "redis")] let redis_benches = itertools::iproduct!( [RedisTopology::Cluster3, RedisTopology::Single], [ @@ -121,35 +166,15 @@ fn main() { .map(|(topology, shotover, operation, encryption)| { Box::new(RedisBench::new(topology, shotover, operation, encryption)) as ShotoverBench }); + #[cfg(not(feature = "redis"))] + let redis_benches = std::iter::empty(); Windsock::new( - vec![ - Box::new(CassandraBench::new( - CassandraDb::Mocked, - CassandraTopology::Single, - Shotover::None, - Compression::None, - Operation::ReadI64, - CassandraProtocol::V4, - CassandraDriver::Scylla, - 10, - )) as ShotoverBench, - Box::new(CassandraBench::new( - CassandraDb::Mocked, - CassandraTopology::Single, - Shotover::Standard, - Compression::None, - Operation::ReadI64, - CassandraProtocol::V4, - CassandraDriver::Scylla, - 10, - )), - ] - .into_iter() - .chain(cassandra_benches) - .chain(kafka_benches) - .chain(redis_benches) - .collect(), + cassandra_mock_benches + .chain(cassandra_benches) + .chain(kafka_benches) + .chain(redis_benches) + .collect(), cloud::AwsCloud::new_boxed(), &["release"], ) diff --git a/shotover-proxy/tests/lib.rs b/shotover-proxy/tests/lib.rs index 06e7afc31..330bafd47 100644 --- a/shotover-proxy/tests/lib.rs +++ b/shotover-proxy/tests/lib.rs @@ -1,15 +1,20 @@ -#[allow(clippy::single_component_path_imports)] +#[allow(clippy::single_component_path_imports, unused_imports)] use rstest_reuse; use test_helpers::shotover_process::ShotoverProcessBuilder; use tokio_bin_process::bin_path; +#[cfg(feature = "cassandra")] mod cassandra_int_tests; +#[cfg(feature = "kafka")] mod kafka_int_tests; -#[cfg(feature = "alpha-transforms")] +#[cfg(all(feature = "alpha-transforms", feature = "opensearch"))] mod opensearch_int_tests; +#[cfg(feature = "redis")] mod redis_int_tests; +#[cfg(feature = "redis")] mod runner; +#[cfg(feature = "redis")] mod transforms; pub fn shotover_process(topology_path: &str) -> ShotoverProcessBuilder { diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index 8b29aff86..cbf1206c0 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -12,6 +12,39 @@ description = "Shotover API for building custom transforms" [features] # Include WIP alpha transforms in the public API alpha-transforms = [] +cassandra = [ + "dep:cassandra-protocol", + "dep:cql3-parser", + "dep:lz4_flex", + "dep:version-compare", + "dep:aws-sdk-kms", + "dep:aws-config", + "dep:base64", + "dep:serde_json", + "dep:halfbrown", + "dep:chacha20poly1305", + "dep:generic-array", + "dep:hex", + "dep:bincode", + "dep:cached", +] +kafka = [ + "dep:kafka-protocol", + "dep:dashmap", + "dep:xxhash-rust", + "dep:string", +] +redis = [ + "dep:redis-protocol", + "dep:csv", + "dep:crc16", +] +opensearch = [ + "dep:atoi", + "dep:http", + "dep:httparse", +] +default = ["cassandra", "redis", "kafka", "opensearch"] [dependencies] atomic_enum = "0.2.0" @@ -19,12 +52,12 @@ pretty-hex = "0.4.0" tokio-stream = "0.1.2" bytes-utils = "0.1.1" derivative = "2.1.1" -cached = { version = "0.48", features = ["async"] } +cached = { version = "0.48", features = ["async"], optional = true } governor = { version = "0.6", default-features = false, features = ["std", "jitter", "quanta"] } nonzero_ext = "0.3.0" -version-compare = "0.1" +version-compare = { version = "0.1", optional = true } rand = { features = ["small_rng"], workspace = true } -lz4_flex = "0.11.0" +lz4_flex = { version = "0.11.0", optional = true } clap.workspace = true itertools.workspace = true rand_distr.workspace = true @@ -32,9 +65,8 @@ bytes.workspace = true futures.workspace = true tokio.workspace = true tokio-util.workspace = true -csv.workspace = true -hex.workspace = true -hex-literal.workspace = true +csv = { workspace = true, optional = true } +hex = { workspace = true, optional = true } async-trait.workspace = true typetag.workspace = true tokio-tungstenite = "0.21.0" @@ -46,17 +78,17 @@ backtrace = "0.3.66" backtrace-ext = "0.2" # Parsers -cql3-parser = "0.4.0" +cql3-parser = { version = "0.4.0", optional = true } serde.workspace = true -serde_json.workspace = true +serde_json = { workspace = true, optional = true } serde_yaml.workspace = true -bincode.workspace = true +bincode = { workspace = true, optional = true } num = { version = "0.4.0", features = ["serde"] } -uuid.workspace = true +uuid = { workspace = true } bigdecimal = { version = "0.4.0", features = ["serde"] } -base64 = "0.21.0" -httparse = "1.8.0" -http = "0.2.9" +base64 = { version = "0.21.0", optional = true } +httparse = { version = "1.8.0", optional = true } +http = { version = "0.2.9", optional = true } #Observability metrics = "0.21.0" @@ -65,33 +97,37 @@ tracing.workspace = true tracing-subscriber.workspace = true tracing-appender.workspace = true hyper.workspace = true -halfbrown = "0.2.1" +halfbrown = { version = "0.2.1", optional = true } # Transform dependencies -redis-protocol.workspace = true -cassandra-protocol.workspace = true -crc16 = "0.4.0" +redis-protocol = { workspace = true, optional = true } +cassandra-protocol = { workspace = true, optional = true } +crc16 = { version = "0.4.0", optional = true } ordered-float.workspace = true #Crypto -aws-config = "1.0.0" -aws-sdk-kms = "1.1.0" +aws-config = { version = "1.0.0", optional = true } +aws-sdk-kms = { version = "1.1.0", optional = true } strum_macros = "0.26" -chacha20poly1305 = { version = "0.10.0", features = ["std"] } -generic-array = { version = "0.14", features = ["serde"] } -kafka-protocol = "0.8.0" +chacha20poly1305 = { version = "0.10.0", features = ["std"], optional = true } +generic-array = { version = "0.14", features = ["serde"], optional = true } +kafka-protocol = { version = "0.8.0", optional = true } rustls = { version = "0.22.0" } tokio-rustls = "0.25" rustls-pemfile = "2.0.0" rustls-pki-types = "1.0.1" -string = "0.3.0" -xxhash-rust = { version = "0.8.6", features = ["xxh3"] } -dashmap = "5.4.0" -atoi = "2.0.0" +string = { version = "0.3.0", optional = true } +xxhash-rust = { version = "0.8.6", features = ["xxh3"], optional = true } +dashmap = { version = "5.4.0", optional = true } +atoi = { version = "2.0.0", optional = true } [dev-dependencies] criterion = { version = "0.5.0", features = ["async_tokio"] } +hex-literal.workspace = true +# TODO: Optionally compiling benches is quite tricky with criterion, maybe it would be easier with divan? +# For now just set required features [[bench]] name = "benches" harness = false +required-features = ["cassandra", "redis", "kafka"] diff --git a/shotover/benches/benches/main.rs b/shotover/benches/benches/main.rs index 68cb60ba4..2b5d5ba28 100644 --- a/shotover/benches/benches/main.rs +++ b/shotover/benches/benches/main.rs @@ -1,5 +1,4 @@ use criterion::criterion_main; - mod chain; mod codec; diff --git a/shotover/src/codec/mod.rs b/shotover/src/codec/mod.rs index d7f2731eb..76f2a89a7 100644 --- a/shotover/src/codec/mod.rs +++ b/shotover/src/codec/mod.rs @@ -1,15 +1,21 @@ //! Codec types to use for connecting to a DB in a sink transform use crate::message::Messages; +#[cfg(feature = "cassandra")] use cassandra_protocol::compression::Compression; use core::fmt; +#[cfg(feature = "kafka")] use kafka::RequestHeader; use metrics::{register_histogram, Histogram}; use tokio_util::codec::{Decoder, Encoder}; +#[cfg(feature = "cassandra")] pub mod cassandra; +#[cfg(feature = "kafka")] pub mod kafka; +#[cfg(feature = "opensearch")] pub mod opensearch; +#[cfg(feature = "redis")] pub mod redis; #[derive(Eq, PartialEq, Copy, Clone)] @@ -40,18 +46,23 @@ pub fn message_latency(direction: Direction, destination_name: String) -> Histog #[derive(Debug, Clone, PartialEq, Copy)] pub enum CodecState { + #[cfg(feature = "cassandra")] Cassandra { compression: Compression, }, + #[cfg(feature = "redis")] Redis, + #[cfg(feature = "kafka")] Kafka { request_header: Option, }, Dummy, + #[cfg(feature = "opensearch")] OpenSearch, } impl CodecState { + #[cfg(feature = "cassandra")] pub fn as_cassandra(&self) -> Compression { match self { CodecState::Cassandra { compression } => *compression, @@ -61,6 +72,7 @@ impl CodecState { } } + #[cfg(feature = "kafka")] pub fn as_kafka(&self) -> Option { match self { CodecState::Kafka { request_header } => *request_header, diff --git a/shotover/src/config/topology.rs b/shotover/src/config/topology.rs index f34c56487..374677ea5 100644 --- a/shotover/src/config/topology.rs +++ b/shotover/src/config/topology.rs @@ -60,7 +60,7 @@ impl Topology { } } -#[cfg(test)] +#[cfg(all(test, feature = "redis", feature = "cassandra"))] mod topology_tests { use crate::config::chain::TransformChainConfig; use crate::config::topology::Topology; diff --git a/shotover/src/frame/mod.rs b/shotover/src/frame/mod.rs index 3b89f55c4..1c34762e5 100644 --- a/shotover/src/frame/mod.rs +++ b/shotover/src/frame/mod.rs @@ -3,34 +3,51 @@ use crate::{codec::CodecState, message::ProtocolType}; use anyhow::{anyhow, Result}; use bytes::Bytes; +#[cfg(feature = "cassandra")] pub use cassandra::{CassandraFrame, CassandraOperation, CassandraResult}; +#[cfg(feature = "cassandra")] use cassandra_protocol::compression::Compression; +#[cfg(feature = "kafka")] use kafka::KafkaFrame; +#[cfg(feature = "opensearch")] pub use opensearch::OpenSearchFrame; +#[cfg(feature = "redis")] pub use redis_protocol::resp2::types::Frame as RedisFrame; use std::fmt::{Display, Formatter, Result as FmtResult}; +#[cfg(feature = "cassandra")] pub mod cassandra; +#[cfg(feature = "kafka")] pub mod kafka; +#[cfg(feature = "opensearch")] pub mod opensearch; +#[cfg(feature = "redis")] pub mod redis; pub mod value; #[derive(PartialEq, Debug, Clone, Copy)] pub enum MessageType { + #[cfg(feature = "redis")] Redis, + #[cfg(feature = "cassandra")] Cassandra, + #[cfg(feature = "kafka")] Kafka, Dummy, + #[cfg(feature = "opensearch")] OpenSearch, } impl From<&ProtocolType> for MessageType { fn from(value: &ProtocolType) -> Self { match value { + #[cfg(feature = "cassandra")] ProtocolType::Cassandra { .. } => Self::Cassandra, + #[cfg(feature = "redis")] ProtocolType::Redis => Self::Redis, + #[cfg(feature = "kafka")] ProtocolType::Kafka { .. } => Self::Kafka, + #[cfg(feature = "opensearch")] ProtocolType::OpenSearch => Self::OpenSearch, } } @@ -39,14 +56,18 @@ impl From<&ProtocolType> for MessageType { impl Frame { pub fn as_codec_state(&self) -> CodecState { match self { + #[cfg(feature = "cassandra")] Frame::Cassandra(_) => CodecState::Cassandra { compression: Compression::None, }, + #[cfg(feature = "redis")] Frame::Redis(_) => CodecState::Redis, + #[cfg(feature = "kafka")] Frame::Kafka(_) => CodecState::Kafka { request_header: None, }, Frame::Dummy => CodecState::Dummy, + #[cfg(feature = "opensearch")] Frame::OpenSearch(_) => CodecState::OpenSearch, } } @@ -54,12 +75,16 @@ impl Frame { #[derive(PartialEq, Debug, Clone)] pub enum Frame { + #[cfg(feature = "cassandra")] Cassandra(CassandraFrame), + #[cfg(feature = "redis")] Redis(RedisFrame), + #[cfg(feature = "kafka")] Kafka(KafkaFrame), /// Represents a message that has must exist due to shotovers requirement that every request has a corresponding response. /// It exists purely to keep transform invariants and codecs will completely ignore this frame when they receive it Dummy, + #[cfg(feature = "opensearch")] OpenSearch(OpenSearchFrame), } @@ -70,40 +95,53 @@ impl Frame { codec_state: CodecState, ) -> Result { match message_type { + #[cfg(feature = "cassandra")] MessageType::Cassandra => { CassandraFrame::from_bytes(bytes, codec_state.as_cassandra()).map(Frame::Cassandra) } + #[cfg(feature = "redis")] MessageType::Redis => redis_protocol::resp2::decode::decode(&bytes) .map(|x| Frame::Redis(x.unwrap().0)) .map_err(|e| anyhow!("{e:?}")), + #[cfg(feature = "kafka")] MessageType::Kafka => { KafkaFrame::from_bytes(bytes, codec_state.as_kafka()).map(Frame::Kafka) } MessageType::Dummy => Ok(Frame::Dummy), + #[cfg(feature = "opensearch")] MessageType::OpenSearch => Ok(Frame::OpenSearch(OpenSearchFrame::from_bytes(&bytes)?)), } } pub fn name(&self) -> &'static str { match self { + #[cfg(feature = "redis")] Frame::Redis(_) => "Redis", + #[cfg(feature = "cassandra")] Frame::Cassandra(_) => "Cassandra", + #[cfg(feature = "kafka")] Frame::Kafka(_) => "Kafka", Frame::Dummy => "Dummy", + #[cfg(feature = "opensearch")] Frame::OpenSearch(_) => "OpenSearch", } } pub fn get_type(&self) -> MessageType { match self { + #[cfg(feature = "cassandra")] Frame::Cassandra(_) => MessageType::Cassandra, + #[cfg(feature = "redis")] Frame::Redis(_) => MessageType::Redis, + #[cfg(feature = "kafka")] Frame::Kafka(_) => MessageType::Kafka, Frame::Dummy => MessageType::Dummy, + #[cfg(feature = "opensearch")] Frame::OpenSearch(_) => MessageType::OpenSearch, } } + #[cfg(feature = "redis")] pub fn redis(&mut self) -> Result<&mut RedisFrame> { match self { Frame::Redis(frame) => Ok(frame), @@ -114,6 +152,7 @@ impl Frame { } } + #[cfg(feature = "kafka")] pub fn into_kafka(self) -> Result { match self { Frame::Kafka(frame) => Ok(frame), @@ -124,6 +163,7 @@ impl Frame { } } + #[cfg(feature = "redis")] pub fn into_redis(self) -> Result { match self { Frame::Redis(frame) => Ok(frame), @@ -134,8 +174,10 @@ impl Frame { } } + #[cfg(feature = "cassandra")] pub fn into_cassandra(self) -> Result { match self { + #[cfg(feature = "cassandra")] Frame::Cassandra(frame) => Ok(frame), frame => Err(anyhow!( "Expected cassandra frame but received {} frame", @@ -144,8 +186,10 @@ impl Frame { } } + #[cfg(feature = "opensearch")] pub fn into_opensearch(self) -> Result { match self { + #[cfg(feature = "opensearch")] Frame::OpenSearch(frame) => Ok(frame), frame => Err(anyhow!( "Expected opensearch frame but received {} frame", @@ -158,10 +202,14 @@ impl Frame { impl Display for Frame { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { match self { + #[cfg(feature = "cassandra")] Frame::Cassandra(frame) => write!(f, "Cassandra {}", frame), + #[cfg(feature = "redis")] Frame::Redis(frame) => write!(f, "Redis {:?}", frame), + #[cfg(feature = "kafka")] Frame::Kafka(frame) => write!(f, "Kafka {}", frame), Frame::Dummy => write!(f, "Shotover internal dummy message"), + #[cfg(feature = "opensearch")] Frame::OpenSearch(frame) => write!(f, "OpenSearch: {:?}", frame), } } diff --git a/shotover/src/frame/value.rs b/shotover/src/frame/value.rs index 0a6213414..882763eb7 100644 --- a/shotover/src/frame/value.rs +++ b/shotover/src/frame/value.rs @@ -1,11 +1,16 @@ //! Generic representations of data types that appear in messages +#[cfg(feature = "cassandra")] use crate::frame::cassandra::to_cassandra_type; +#[cfg(feature = "redis")] use crate::frame::RedisFrame; use bigdecimal::BigDecimal; use bytes::Bytes; +#[cfg(feature = "cassandra")] use cassandra_protocol::frame::Serialize as FrameSerialize; +#[cfg(feature = "cassandra")] use cassandra_protocol::types::CInt; +#[cfg(feature = "cassandra")] use cassandra_protocol::{ frame::{ message_result::{ColSpec, ColTypeOption}, @@ -16,11 +21,13 @@ use cassandra_protocol::{ CBytes, }, }; +#[cfg(feature = "cassandra")] use cql3_parser::common::Operand; use num::BigInt; use ordered_float::OrderedFloat; use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, BTreeSet}; +#[cfg(feature = "cassandra")] use std::io::{Cursor, Write}; use std::net::IpAddr; use uuid::Uuid; @@ -70,12 +77,14 @@ pub struct Duration { pub nanoseconds: i64, } +#[cfg(feature = "cassandra")] impl From<&Operand> for GenericValue { fn from(operand: &Operand) -> Self { GenericValue::create_element(to_cassandra_type(operand)) } } +#[cfg(feature = "redis")] impl From for GenericValue { fn from(f: RedisFrame) -> Self { match f { @@ -93,6 +102,7 @@ impl From for GenericValue { } } +#[cfg(feature = "redis")] impl From<&RedisFrame> for GenericValue { fn from(f: &RedisFrame) -> Self { match f.clone() { @@ -110,6 +120,7 @@ impl From<&RedisFrame> for GenericValue { } } +#[cfg(feature = "redis")] impl From for RedisFrame { fn from(value: GenericValue) -> RedisFrame { match value { @@ -150,6 +161,7 @@ impl GenericValue { GenericValue::Bytes(Bytes::from(str)) } + #[cfg(feature = "cassandra")] pub fn build_value_from_cstar_col_type( version: Version, spec: &ColSpec, @@ -159,6 +171,7 @@ impl GenericValue { GenericValue::create_element(cassandra_type) } + #[cfg(feature = "cassandra")] fn into_cassandra_type( version: Version, col_type: &ColTypeOption, @@ -168,6 +181,7 @@ impl GenericValue { wrapper(data, col_type, version).unwrap() } + #[cfg(feature = "cassandra")] fn create_element(element: CassandraType) -> GenericValue { match element { CassandraType::Ascii(a) => GenericValue::Ascii(a), @@ -233,6 +247,7 @@ impl GenericValue { } } + #[cfg(feature = "cassandra")] pub fn cassandra_serialize(&self, cursor: &mut Cursor<&mut Vec>) { match self { GenericValue::Null => cursor.write_all(&[255, 255, 255, 255]).unwrap(), @@ -293,6 +308,7 @@ impl GenericValue { } } +#[cfg(feature = "cassandra")] pub(crate) fn serialize_with_length_prefix( cursor: &mut Cursor<&mut Vec>, serializer: impl FnOnce(&mut Cursor<&mut Vec>), @@ -311,16 +327,19 @@ pub(crate) fn serialize_with_length_prefix( .copy_from_slice(&(bytes_len as CInt).to_be_bytes()); } +#[cfg(feature = "cassandra")] pub(crate) fn serialize_len(cursor: &mut Cursor<&mut Vec>, len: usize) { let len = len as CInt; cursor.write_all(&len.to_be_bytes()).unwrap(); } +#[cfg(feature = "cassandra")] fn serialize_bytes(cursor: &mut Cursor<&mut Vec>, bytes: &[u8]) { serialize_len(cursor, bytes.len()); cursor.write_all(bytes).unwrap(); } +#[cfg(feature = "cassandra")] fn serialize_list(cursor: &mut Cursor<&mut Vec>, values: &[GenericValue]) { serialize_with_length_prefix(cursor, |cursor| { serialize_len(cursor, values.len()); @@ -332,6 +351,7 @@ fn serialize_list(cursor: &mut Cursor<&mut Vec>, values: &[GenericValue]) { } #[allow(clippy::mutable_key_type)] +#[cfg(feature = "cassandra")] fn serialize_set(cursor: &mut Cursor<&mut Vec>, values: &BTreeSet) { serialize_with_length_prefix(cursor, |cursor| { serialize_len(cursor, values.len()); @@ -342,6 +362,7 @@ fn serialize_set(cursor: &mut Cursor<&mut Vec>, values: &BTreeSet>, values: &BTreeMap) { serialize_with_length_prefix(cursor, |cursor| { serialize_len(cursor, values.len()); @@ -353,6 +374,7 @@ fn serialize_stringmap(cursor: &mut Cursor<&mut Vec>, values: &BTreeMap>, values: &BTreeMap) { serialize_with_length_prefix(cursor, |cursor| { diff --git a/shotover/src/lib.rs b/shotover/src/lib.rs index 1fa941900..921b6efd4 100644 --- a/shotover/src/lib.rs +++ b/shotover/src/lib.rs @@ -30,6 +30,25 @@ #![deny(clippy::print_stdout)] #![deny(clippy::print_stderr)] #![allow(clippy::needless_doctest_main)] +// Allow dead code if any of the protocol features are disabled +#![cfg_attr( + any( + not(feature = "cassandra"), + not(feature = "redis"), + not(feature = "kafka"), + not(feature = "opensearch"), + ), + allow(dead_code, unused_imports, unused_variables) +)] +#[cfg(all( + not(feature = "cassandra"), + not(feature = "redis"), + not(feature = "kafka"), + not(feature = "opensearch"), +))] +compile_error!( + "At least one protocol feature must be enabled, e.g. `cassandra`, `redis`, `kafka` or `opensearch`" +); pub mod codec; pub mod config; diff --git a/shotover/src/message/mod.rs b/shotover/src/message/mod.rs index 0a581fa0a..0f51260f5 100644 --- a/shotover/src/message/mod.rs +++ b/shotover/src/message/mod.rs @@ -1,17 +1,27 @@ //! Message and supporting types - used to hold a message/query/result going between the client and database +#[cfg(feature = "kafka")] use crate::codec::kafka::RequestHeader; use crate::codec::CodecState; +#[cfg(feature = "cassandra")] use crate::frame::cassandra::Tracing; +#[cfg(feature = "redis")] use crate::frame::redis::redis_query_type; +#[cfg(feature = "cassandra")] +use crate::frame::CassandraFrame; +#[cfg(feature = "redis")] +use crate::frame::RedisFrame; +#[cfg(feature = "cassandra")] use crate::frame::{ cassandra, cassandra::{CassandraMetadata, CassandraOperation}, }; -use crate::frame::{CassandraFrame, Frame, MessageType, RedisFrame}; +use crate::frame::{Frame, MessageType}; use anyhow::{anyhow, Context, Result}; -use bytes::{Buf, Bytes}; +use bytes::Bytes; +#[cfg(feature = "cassandra")] use cassandra_protocol::compression::Compression; +#[cfg(feature = "cassandra")] use cassandra_protocol::frame::message_error::{ErrorBody, ErrorType}; use derivative::Derivative; use nonzero_ext::nonzero; @@ -20,34 +30,44 @@ use std::num::NonZeroU32; use std::time::Instant; pub enum Metadata { + #[cfg(feature = "cassandra")] Cassandra(CassandraMetadata), + #[cfg(feature = "redis")] Redis, + #[cfg(feature = "kafka")] Kafka, + #[cfg(feature = "opensearch")] OpenSearch, } #[derive(PartialEq)] pub enum ProtocolType { - Cassandra { - compression: Compression, - }, + #[cfg(feature = "cassandra")] + Cassandra { compression: Compression }, + #[cfg(feature = "redis")] Redis, + #[cfg(feature = "kafka")] Kafka { request_header: Option, }, + #[cfg(feature = "opensearch")] OpenSearch, } impl From<&ProtocolType> for CodecState { fn from(value: &ProtocolType) -> Self { match value { + #[cfg(feature = "cassandra")] ProtocolType::Cassandra { compression } => Self::Cassandra { compression: *compression, }, + #[cfg(feature = "redis")] ProtocolType::Redis => Self::Redis, + #[cfg(feature = "kafka")] ProtocolType::Kafka { request_header } => Self::Kafka { request_header: *request_header, }, + #[cfg(feature = "opensearch")] ProtocolType::OpenSearch => Self::OpenSearch, } } @@ -244,20 +264,30 @@ impl Message { pub fn cell_count(&self) -> Result { Ok(match self.inner.as_ref().unwrap() { MessageInner::RawBytes { + #[cfg(feature = "cassandra")] bytes, message_type, + .. } => match message_type { + #[cfg(feature = "redis")] MessageType::Redis => nonzero!(1u32), + #[cfg(feature = "cassandra")] MessageType::Cassandra => cassandra::raw_frame::cell_count(bytes)?, + #[cfg(feature = "kafka")] MessageType::Kafka => todo!(), MessageType::Dummy => nonzero!(1u32), + #[cfg(feature = "opensearch")] MessageType::OpenSearch => todo!(), }, MessageInner::Modified { frame } | MessageInner::Parsed { frame, .. } => match frame { + #[cfg(feature = "cassandra")] Frame::Cassandra(frame) => frame.cell_count()?, + #[cfg(feature = "redis")] Frame::Redis(_) => nonzero!(1u32), + #[cfg(feature = "kafka")] Frame::Kafka(_) => todo!(), Frame::Dummy => nonzero!(1u32), + #[cfg(feature = "opensearch")] Frame::OpenSearch(_) => todo!(), }, }) @@ -278,10 +308,14 @@ impl Message { pub fn get_query_type(&mut self) -> QueryType { match self.frame() { + #[cfg(feature = "cassandra")] Some(Frame::Cassandra(cassandra)) => cassandra.get_query_type(), + #[cfg(feature = "redis")] Some(Frame::Redis(redis)) => redis_query_type(redis), // free-standing function as we cant define methods on RedisFrame + #[cfg(feature = "kafka")] Some(Frame::Kafka(_)) => todo!(), Some(Frame::Dummy) => todo!(), + #[cfg(feature = "opensearch")] Some(Frame::OpenSearch(_)) => todo!(), None => QueryType::ReadWrite, } @@ -290,20 +324,23 @@ impl Message { /// Returns an error response with the provided error message. /// If self is a request: the returned `Message` is a valid response to self /// If self is a response: the returned `Message` is a valid replacement of self - pub fn to_error_response(&self, error: String) -> Result { + pub fn to_error_response(&self, _error: String) -> Result { + #[allow(unreachable_code)] Ok(Message::from_frame(match self.metadata().context("Failed to parse metadata of request or response when producing an error")? { + #[cfg(feature = "redis")] Metadata::Redis => { // Redis errors can not contain newlines at the protocol level - let message = format!("ERR {error}") + let message = format!("ERR {_error}") .replace("\r\n", " ") .replace('\n', " "); Frame::Redis(RedisFrame::Error(message.into())) } + #[cfg(feature = "cassandra")] Metadata::Cassandra(frame) => Frame::Cassandra(CassandraFrame { version: frame.version, stream_id: frame.stream_id, operation: CassandraOperation::Error(ErrorBody { - message: error, + message: _error, ty: ErrorType::Server, }), tracing: Tracing::Response(None), @@ -313,9 +350,11 @@ impl Message { // * kafka errors are defined per response type and many response types only provide an error code without a field for a custom error message. // + Implementing this per response type would add a lot of (localized) complexity but might be worth it. // * the official C++ kafka driver we use for integration tests does not pick up errors sent just before closing a connection, so this wouldnt help the usage in server.rs where we send an error before terminating the connection for at least that driver. - Metadata::Kafka => return Err(anyhow!(error).context( + #[cfg(feature = "kafka")] + Metadata::Kafka => return Err(anyhow!(_error).context( "A generic error cannot be formed because the kafka protocol does not support it", )), + #[cfg(feature = "opensearch")] Metadata::OpenSearch => unimplemented!() })) } @@ -324,22 +363,32 @@ impl Message { pub fn metadata(&self) -> Result { match self.inner.as_ref().unwrap() { MessageInner::RawBytes { + #[cfg(feature = "cassandra")] bytes, message_type, + .. } => match message_type { + #[cfg(feature = "cassandra")] MessageType::Cassandra => { Ok(Metadata::Cassandra(cassandra::raw_frame::metadata(bytes)?)) } + #[cfg(feature = "redis")] MessageType::Redis => Ok(Metadata::Redis), + #[cfg(feature = "kafka")] MessageType::Kafka => Ok(Metadata::Kafka), MessageType::Dummy => Err(anyhow!("Dummy has no metadata")), + #[cfg(feature = "opensearch")] MessageType::OpenSearch => Err(anyhow!("OpenSearch has no metadata")), }, MessageInner::Parsed { frame, .. } | MessageInner::Modified { frame } => match frame { + #[cfg(feature = "cassandra")] Frame::Cassandra(frame) => Ok(Metadata::Cassandra(frame.metadata())), + #[cfg(feature = "kafka")] Frame::Kafka(_) => Ok(Metadata::Kafka), + #[cfg(feature = "redis")] Frame::Redis(_) => Ok(Metadata::Redis), Frame::Dummy => Err(anyhow!("dummy has no metadata")), + #[cfg(feature = "opensearch")] Frame::OpenSearch(_) => Err(anyhow!("OpenSearch has no metadata")), }, } @@ -351,6 +400,7 @@ impl Message { *self = Message::from_frame_at_instant( match metadata { + #[cfg(feature = "cassandra")] Metadata::Cassandra(metadata) => { let body = CassandraOperation::Error(ErrorBody { message: "Server overloaded".into(), @@ -365,10 +415,15 @@ impl Message { operation: body, }) } + #[cfg(feature = "redis")] Metadata::Redis => unimplemented!(), + #[cfg(feature = "kafka")] Metadata::Kafka => unimplemented!(), + #[cfg(feature = "opensearch")] Metadata::OpenSearch => unimplemented!(), }, + // reachable with feature = cassandra + #[allow(unreachable_code)] self.received_from_source_or_sink_at, ); @@ -381,10 +436,12 @@ impl Message { // For now its just written to match cassandra's stream_id field pub fn stream_id(&self) -> Option { match &self.inner { + #[cfg(feature = "cassandra")] Some(MessageInner::RawBytes { bytes, message_type: MessageType::Cassandra, }) => { + use bytes::Buf; const HEADER_LEN: usize = 9; if bytes.len() >= HEADER_LEN { Some((&bytes[2..4]).get_i16()) @@ -395,10 +452,14 @@ impl Message { Some(MessageInner::RawBytes { .. }) => None, Some(MessageInner::Parsed { frame, .. } | MessageInner::Modified { frame }) => { match frame { + #[cfg(feature = "cassandra")] Frame::Cassandra(cassandra) => Some(cassandra.stream_id), + #[cfg(feature = "redis")] Frame::Redis(_) => None, + #[cfg(feature = "kafka")] Frame::Kafka(_) => None, Frame::Dummy => None, + #[cfg(feature = "opensearch")] Frame::OpenSearch(_) => None, } } diff --git a/shotover/src/sources/mod.rs b/shotover/src/sources/mod.rs index af18b04c2..3a32c1a33 100644 --- a/shotover/src/sources/mod.rs +++ b/shotover/src/sources/mod.rs @@ -1,17 +1,25 @@ //! Sources used to listen for connections and send/recieve with the client. +#[cfg(feature = "cassandra")] use crate::sources::cassandra::{CassandraConfig, CassandraSource}; +#[cfg(feature = "kafka")] use crate::sources::kafka::{KafkaConfig, KafkaSource}; +#[cfg(feature = "opensearch")] use crate::sources::opensearch::{OpenSearchConfig, OpenSearchSource}; +#[cfg(feature = "redis")] use crate::sources::redis::{RedisConfig, RedisSource}; use anyhow::Result; use serde::{Deserialize, Serialize}; use tokio::sync::watch; use tokio::task::JoinHandle; +#[cfg(feature = "cassandra")] pub mod cassandra; +#[cfg(feature = "kafka")] pub mod kafka; +#[cfg(feature = "opensearch")] pub mod opensearch; +#[cfg(feature = "redis")] pub mod redis; #[derive(Serialize, Deserialize, Debug, Clone, Copy)] @@ -23,18 +31,26 @@ pub enum Transport { #[derive(Debug)] pub enum Source { + #[cfg(feature = "cassandra")] Cassandra(CassandraSource), + #[cfg(feature = "redis")] Redis(RedisSource), + #[cfg(feature = "kafka")] Kafka(KafkaSource), + #[cfg(feature = "opensearch")] OpenSearch(OpenSearchSource), } impl Source { pub fn into_join_handle(self) -> JoinHandle<()> { match self { + #[cfg(feature = "cassandra")] Source::Cassandra(c) => c.join_handle, + #[cfg(feature = "redis")] Source::Redis(r) => r.join_handle, + #[cfg(feature = "kafka")] Source::Kafka(r) => r.join_handle, + #[cfg(feature = "opensearch")] Source::OpenSearch(o) => o.join_handle, } } @@ -43,9 +59,13 @@ impl Source { #[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub enum SourceConfig { + #[cfg(feature = "cassandra")] Cassandra(CassandraConfig), + #[cfg(feature = "redis")] Redis(RedisConfig), + #[cfg(feature = "kafka")] Kafka(KafkaConfig), + #[cfg(feature = "opensearch")] OpenSearch(OpenSearchConfig), } @@ -55,9 +75,13 @@ impl SourceConfig { trigger_shutdown_rx: watch::Receiver, ) -> Result> { match self { + #[cfg(feature = "cassandra")] SourceConfig::Cassandra(c) => c.get_source(trigger_shutdown_rx).await, + #[cfg(feature = "redis")] SourceConfig::Redis(r) => r.get_source(trigger_shutdown_rx).await, + #[cfg(feature = "kafka")] SourceConfig::Kafka(r) => r.get_source(trigger_shutdown_rx).await, + #[cfg(feature = "opensearch")] SourceConfig::OpenSearch(r) => r.get_source(trigger_shutdown_rx).await, } } diff --git a/shotover/src/transforms/coalesce.rs b/shotover/src/transforms/coalesce.rs index e2fc7a118..28657ecf4 100644 --- a/shotover/src/transforms/coalesce.rs +++ b/shotover/src/transforms/coalesce.rs @@ -89,7 +89,7 @@ impl Transform for Coalesce { } } -#[cfg(test)] +#[cfg(all(test, feature = "redis"))] mod test { use crate::frame::{Frame, RedisFrame}; use crate::message::Message; diff --git a/shotover/src/transforms/debug/returner.rs b/shotover/src/transforms/debug/returner.rs index 70f3f2351..b9febdbcf 100644 --- a/shotover/src/transforms/debug/returner.rs +++ b/shotover/src/transforms/debug/returner.rs @@ -1,5 +1,4 @@ -use crate::frame::{Frame, RedisFrame}; -use crate::message::{Message, Messages}; +use crate::message::Messages; use crate::transforms::{Transform, TransformBuilder, TransformConfig, Transforms, Wrapper}; use anyhow::{anyhow, Result}; use async_trait::async_trait; @@ -25,6 +24,7 @@ impl TransformConfig for DebugReturnerConfig { pub enum Response { #[serde(skip)] Message(Messages), + #[cfg(feature = "redis")] Redis(String), Fail, } @@ -59,15 +59,20 @@ impl Transform for DebugReturner { async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result { match &self.response { Response::Message(message) => Ok(message.clone()), - Response::Redis(string) => Ok(requests_wrapper - .requests - .iter() - .map(|_| { - Message::from_frame(Frame::Redis(RedisFrame::BulkString( - string.to_string().into(), - ))) - }) - .collect()), + #[cfg(feature = "redis")] + Response::Redis(string) => { + use crate::frame::{Frame, RedisFrame}; + use crate::message::Message; + Ok(requests_wrapper + .requests + .iter() + .map(|_| { + Message::from_frame(Frame::Redis(RedisFrame::BulkString( + string.to_string().into(), + ))) + }) + .collect()) + } Response::Fail => Err(anyhow!("Intentional Fail")), } } diff --git a/shotover/src/transforms/filter.rs b/shotover/src/transforms/filter.rs index 770f280a8..2d67f83c1 100644 --- a/shotover/src/transforms/filter.rs +++ b/shotover/src/transforms/filter.rs @@ -108,7 +108,7 @@ impl Transform for QueryTypeFilter { } } -#[cfg(test)] +#[cfg(all(test, feature = "redis"))] mod test { use super::Filter; use crate::frame::Frame; diff --git a/shotover/src/transforms/mod.rs b/shotover/src/transforms/mod.rs index 71dcfb918..b710d5ebc 100644 --- a/shotover/src/transforms/mod.rs +++ b/shotover/src/transforms/mod.rs @@ -1,8 +1,11 @@ //! Various types required for defining a transform use crate::message::Messages; +#[cfg(feature = "cassandra")] use crate::transforms::cassandra::peers_rewrite::CassandraPeersRewrite; +#[cfg(feature = "cassandra")] use crate::transforms::cassandra::sink_cluster::CassandraSinkCluster; +#[cfg(feature = "cassandra")] use crate::transforms::cassandra::sink_single::CassandraSinkSingle; use crate::transforms::coalesce::Coalesce; use crate::transforms::debug::force_parse::DebugForceParse; @@ -10,22 +13,31 @@ use crate::transforms::debug::log_to_file::DebugLogToFile; use crate::transforms::debug::printer::DebugPrinter; use crate::transforms::debug::random_delay::DebugRandomDelay; use crate::transforms::debug::returner::DebugReturner; +#[cfg(feature = "redis")] use crate::transforms::distributed::tuneable_consistency_scatter::TuneableConsistentencyScatter; use crate::transforms::filter::QueryTypeFilter; +#[cfg(feature = "kafka")] use crate::transforms::kafka::sink_cluster::KafkaSinkCluster; +#[cfg(feature = "kafka")] use crate::transforms::kafka::sink_single::KafkaSinkSingle; use crate::transforms::load_balance::ConnectionBalanceAndPool; use crate::transforms::loopback::Loopback; use crate::transforms::null::NullSink; -#[cfg(feature = "alpha-transforms")] +#[cfg(all(feature = "alpha-transforms", feature = "opensearch"))] use crate::transforms::opensearch::OpenSearchSinkSingle; use crate::transforms::parallel_map::ParallelMap; +#[cfg(feature = "cassandra")] use crate::transforms::protect::Protect; use crate::transforms::query_counter::QueryCounter; +#[cfg(all(feature = "redis", feature = "cassandra"))] use crate::transforms::redis::cache::SimpleRedisCache; +#[cfg(feature = "redis")] use crate::transforms::redis::cluster_ports_rewrite::RedisClusterPortsRewrite; +#[cfg(feature = "redis")] use crate::transforms::redis::sink_cluster::RedisSinkCluster; +#[cfg(feature = "redis")] use crate::transforms::redis::sink_single::RedisSinkSingle; +#[cfg(feature = "redis")] use crate::transforms::redis::timestamp_tagging::RedisTimestampTagger; use crate::transforms::tee::Tee; use crate::transforms::throttling::RequestThrottling; @@ -44,22 +56,27 @@ use tokio::time::Instant; use self::chain::TransformAndMetrics; +#[cfg(feature = "cassandra")] pub mod cassandra; pub mod chain; pub mod coalesce; pub mod debug; +#[cfg(feature = "redis")] pub mod distributed; pub mod filter; +#[cfg(feature = "kafka")] pub mod kafka; pub mod load_balance; pub mod loopback; pub mod noop; pub mod null; -#[cfg(feature = "alpha-transforms")] +#[cfg(all(feature = "alpha-transforms", feature = "opensearch"))] pub mod opensearch; pub mod parallel_map; +#[cfg(feature = "cassandra")] pub mod protect; pub mod query_counter; +#[cfg(feature = "redis")] pub mod redis; pub mod sampler; pub mod tee; @@ -92,20 +109,32 @@ impl Debug for dyn TransformBuilder { /// than using dynamic trait objects. #[derive(IntoStaticStr)] pub enum Transforms { + #[cfg(feature = "kafka")] KafkaSinkSingle(KafkaSinkSingle), + #[cfg(feature = "kafka")] KafkaSinkCluster(KafkaSinkCluster), + #[cfg(feature = "cassandra")] CassandraSinkSingle(CassandraSinkSingle), + #[cfg(feature = "cassandra")] CassandraSinkCluster(Box), + #[cfg(feature = "redis")] RedisSinkSingle(RedisSinkSingle), + #[cfg(feature = "cassandra")] CassandraPeersRewrite(CassandraPeersRewrite), + #[cfg(all(feature = "redis", feature = "cassandra"))] RedisCache(SimpleRedisCache), Tee(Tee), NullSink(NullSink), Loopback(Loopback), + #[cfg(feature = "cassandra")] Protect(Box), + #[cfg(feature = "redis")] TuneableConsistencyScatter(TuneableConsistentencyScatter), + #[cfg(feature = "redis")] RedisTimestampTagger(RedisTimestampTagger), + #[cfg(feature = "redis")] RedisSinkCluster(RedisSinkCluster), + #[cfg(feature = "redis")] RedisClusterPortsRewrite(RedisClusterPortsRewrite), DebugReturner(DebugReturner), DebugRandomDelay(DebugRandomDelay), @@ -119,7 +148,7 @@ pub enum Transforms { QueryCounter(QueryCounter), RequestThrottling(RequestThrottling), Custom(Box), - #[cfg(feature = "alpha-transforms")] + #[cfg(all(feature = "alpha-transforms", feature = "opensearch"))] OpenSearchSinkSingle(OpenSearchSinkSingle), } @@ -132,11 +161,17 @@ impl Debug for Transforms { impl Transforms { async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result { match self { + #[cfg(feature = "kafka")] Transforms::KafkaSinkSingle(c) => c.transform(requests_wrapper).await, + #[cfg(feature = "kafka")] Transforms::KafkaSinkCluster(c) => c.transform(requests_wrapper).await, + #[cfg(feature = "cassandra")] Transforms::CassandraSinkSingle(c) => c.transform(requests_wrapper).await, + #[cfg(feature = "cassandra")] Transforms::CassandraSinkCluster(c) => c.transform(requests_wrapper).await, + #[cfg(feature = "cassandra")] Transforms::CassandraPeersRewrite(c) => c.transform(requests_wrapper).await, + #[cfg(all(feature = "redis", feature = "cassandra"))] Transforms::RedisCache(r) => r.transform(requests_wrapper).await, Transforms::Tee(m) => m.transform(requests_wrapper).await, Transforms::DebugPrinter(p) => p.transform(requests_wrapper).await, @@ -144,13 +179,19 @@ impl Transforms { Transforms::DebugForceParse(p) => p.transform(requests_wrapper).await, Transforms::NullSink(n) => n.transform(requests_wrapper).await, Transforms::Loopback(n) => n.transform(requests_wrapper).await, + #[cfg(feature = "cassandra")] Transforms::Protect(p) => p.transform(requests_wrapper).await, Transforms::DebugReturner(p) => p.transform(requests_wrapper).await, Transforms::DebugRandomDelay(p) => p.transform(requests_wrapper).await, + #[cfg(feature = "redis")] Transforms::TuneableConsistencyScatter(tc) => tc.transform(requests_wrapper).await, + #[cfg(feature = "redis")] Transforms::RedisSinkSingle(r) => r.transform(requests_wrapper).await, + #[cfg(feature = "redis")] Transforms::RedisTimestampTagger(r) => r.transform(requests_wrapper).await, + #[cfg(feature = "redis")] Transforms::RedisClusterPortsRewrite(r) => r.transform(requests_wrapper).await, + #[cfg(feature = "redis")] Transforms::RedisSinkCluster(r) => r.transform(requests_wrapper).await, Transforms::ParallelMap(s) => s.transform(requests_wrapper).await, Transforms::PoolConnections(s) => s.transform(requests_wrapper).await, @@ -159,18 +200,24 @@ impl Transforms { Transforms::QueryCounter(s) => s.transform(requests_wrapper).await, Transforms::RequestThrottling(s) => s.transform(requests_wrapper).await, Transforms::Custom(s) => s.transform(requests_wrapper).await, - #[cfg(feature = "alpha-transforms")] + #[cfg(all(feature = "alpha-transforms", feature = "opensearch"))] Transforms::OpenSearchSinkSingle(s) => s.transform(requests_wrapper).await, } } async fn transform_pushed<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result { match self { + #[cfg(feature = "kafka")] Transforms::KafkaSinkSingle(c) => c.transform_pushed(requests_wrapper).await, + #[cfg(feature = "kafka")] Transforms::KafkaSinkCluster(c) => c.transform_pushed(requests_wrapper).await, + #[cfg(feature = "cassandra")] Transforms::CassandraSinkSingle(c) => c.transform_pushed(requests_wrapper).await, + #[cfg(feature = "cassandra")] Transforms::CassandraSinkCluster(c) => c.transform_pushed(requests_wrapper).await, + #[cfg(feature = "cassandra")] Transforms::CassandraPeersRewrite(c) => c.transform_pushed(requests_wrapper).await, + #[cfg(all(feature = "redis", feature = "cassandra"))] Transforms::RedisCache(r) => r.transform_pushed(requests_wrapper).await, Transforms::Tee(m) => m.transform_pushed(requests_wrapper).await, Transforms::DebugPrinter(p) => p.transform_pushed(requests_wrapper).await, @@ -178,15 +225,21 @@ impl Transforms { Transforms::DebugForceParse(p) => p.transform_pushed(requests_wrapper).await, Transforms::NullSink(n) => n.transform_pushed(requests_wrapper).await, Transforms::Loopback(n) => n.transform_pushed(requests_wrapper).await, + #[cfg(feature = "cassandra")] Transforms::Protect(p) => p.transform_pushed(requests_wrapper).await, Transforms::DebugReturner(p) => p.transform_pushed(requests_wrapper).await, Transforms::DebugRandomDelay(p) => p.transform_pushed(requests_wrapper).await, + #[cfg(feature = "redis")] Transforms::TuneableConsistencyScatter(tc) => { tc.transform_pushed(requests_wrapper).await } + #[cfg(feature = "redis")] Transforms::RedisSinkSingle(r) => r.transform_pushed(requests_wrapper).await, + #[cfg(feature = "redis")] Transforms::RedisTimestampTagger(r) => r.transform_pushed(requests_wrapper).await, + #[cfg(feature = "redis")] Transforms::RedisClusterPortsRewrite(r) => r.transform_pushed(requests_wrapper).await, + #[cfg(feature = "redis")] Transforms::RedisSinkCluster(r) => r.transform_pushed(requests_wrapper).await, Transforms::ParallelMap(s) => s.transform_pushed(requests_wrapper).await, Transforms::PoolConnections(s) => s.transform_pushed(requests_wrapper).await, @@ -195,7 +248,7 @@ impl Transforms { Transforms::QueryCounter(s) => s.transform_pushed(requests_wrapper).await, Transforms::RequestThrottling(s) => s.transform_pushed(requests_wrapper).await, Transforms::Custom(s) => s.transform_pushed(requests_wrapper).await, - #[cfg(feature = "alpha-transforms")] + #[cfg(all(feature = "alpha-transforms", feature = "opensearch"))] Transforms::OpenSearchSinkSingle(s) => s.transform_pushed(requests_wrapper).await, } } @@ -206,23 +259,34 @@ impl Transforms { fn set_pushed_messages_tx(&mut self, pushed_messages_tx: mpsc::UnboundedSender) { match self { + #[cfg(feature = "kafka")] Transforms::KafkaSinkSingle(c) => c.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(feature = "kafka")] Transforms::KafkaSinkCluster(c) => c.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(feature = "cassandra")] Transforms::CassandraSinkSingle(c) => c.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(feature = "cassandra")] Transforms::CassandraSinkCluster(c) => c.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(feature = "cassandra")] Transforms::CassandraPeersRewrite(c) => c.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(all(feature = "redis", feature = "cassandra"))] Transforms::RedisCache(r) => r.set_pushed_messages_tx(pushed_messages_tx), Transforms::Tee(t) => t.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(feature = "redis")] Transforms::RedisSinkSingle(r) => r.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(feature = "redis")] Transforms::TuneableConsistencyScatter(c) => { c.set_pushed_messages_tx(pushed_messages_tx) } + #[cfg(feature = "redis")] Transforms::RedisTimestampTagger(r) => r.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(feature = "redis")] Transforms::RedisClusterPortsRewrite(r) => r.set_pushed_messages_tx(pushed_messages_tx), Transforms::DebugPrinter(p) => p.set_pushed_messages_tx(pushed_messages_tx), Transforms::DebugLogToFile(p) => p.set_pushed_messages_tx(pushed_messages_tx), Transforms::DebugForceParse(p) => p.set_pushed_messages_tx(pushed_messages_tx), Transforms::NullSink(n) => n.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(feature = "redis")] Transforms::RedisSinkCluster(r) => r.set_pushed_messages_tx(pushed_messages_tx), Transforms::ParallelMap(s) => s.set_pushed_messages_tx(pushed_messages_tx), Transforms::PoolConnections(s) => s.set_pushed_messages_tx(pushed_messages_tx), @@ -230,12 +294,13 @@ impl Transforms { Transforms::QueryTypeFilter(s) => s.set_pushed_messages_tx(pushed_messages_tx), Transforms::QueryCounter(s) => s.set_pushed_messages_tx(pushed_messages_tx), Transforms::Loopback(l) => l.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(feature = "cassandra")] Transforms::Protect(p) => p.set_pushed_messages_tx(pushed_messages_tx), Transforms::DebugReturner(d) => d.set_pushed_messages_tx(pushed_messages_tx), Transforms::DebugRandomDelay(d) => d.set_pushed_messages_tx(pushed_messages_tx), Transforms::RequestThrottling(d) => d.set_pushed_messages_tx(pushed_messages_tx), Transforms::Custom(d) => d.set_pushed_messages_tx(pushed_messages_tx), - #[cfg(feature = "alpha-transforms")] + #[cfg(all(feature = "alpha-transforms", feature = "opensearch"))] Transforms::OpenSearchSinkSingle(s) => s.set_pushed_messages_tx(pushed_messages_tx), } } diff --git a/shotover/src/transforms/query_counter.rs b/shotover/src/transforms/query_counter.rs index 147175900..153314cd3 100644 --- a/shotover/src/transforms/query_counter.rs +++ b/shotover/src/transforms/query_counter.rs @@ -1,5 +1,4 @@ use crate::frame::Frame; -use crate::frame::RedisFrame; use crate::message::Messages; use crate::transforms::TransformConfig; use crate::transforms::{Transform, TransformBuilder, Transforms, Wrapper}; @@ -43,24 +42,49 @@ impl Transform for QueryCounter { async fn transform<'a>(&'a mut self, mut requests_wrapper: Wrapper<'a>) -> Result { for m in &mut requests_wrapper.requests { match m.frame() { + #[cfg(feature = "cassandra")] Some(Frame::Cassandra(frame)) => { for statement in frame.operation.queries() { counter!("shotover_query_count", 1, "name" => self.counter_name.clone(), "query" => statement.short_name(), "type" => "cassandra"); } } + #[cfg(feature = "redis")] Some(Frame::Redis(frame)) => { + use crate::frame::RedisFrame; + fn get_redis_query_type(frame: &RedisFrame) -> Option { + if let RedisFrame::Array(array) = frame { + if let Some(RedisFrame::BulkString(v)) = array.first() { + let upper_bytes = v.to_ascii_uppercase(); + match String::from_utf8(upper_bytes) { + Ok(query_type) => { + return Some(query_type); + } + Err(err) => { + tracing::error!( + "Failed to convert redis bulkstring to string, err: {:?}", + err + ) + } + } + } + } + None + } + if let Some(query_type) = get_redis_query_type(frame) { counter!("shotover_query_count", 1, "name" => self.counter_name.clone(), "query" => query_type, "type" => "redis"); } else { counter!("shotover_query_count", 1, "name" => self.counter_name.clone(), "query" => "unknown", "type" => "redis"); } } + #[cfg(feature = "kafka")] Some(Frame::Kafka(_)) => { counter!("shotover_query_count", 1, "name" => self.counter_name.clone(), "query" => "unknown", "type" => "kafka"); } Some(Frame::Dummy) => { // Dummy does not count as a message } + #[cfg(feature = "opensearch")] Some(Frame::OpenSearch(_)) => { todo!(); } @@ -74,26 +98,6 @@ impl Transform for QueryCounter { } } -fn get_redis_query_type(frame: &RedisFrame) -> Option { - if let RedisFrame::Array(array) = frame { - if let Some(RedisFrame::BulkString(v)) = array.first() { - let upper_bytes = v.to_ascii_uppercase(); - match String::from_utf8(upper_bytes) { - Ok(query_type) => { - return Some(query_type); - } - Err(err) => { - tracing::error!( - "Failed to convert redis bulkstring to string, err: {:?}", - err - ) - } - } - } - } - None -} - #[typetag::serde(name = "QueryCounter")] #[async_trait(?Send)] impl TransformConfig for QueryCounterConfig { diff --git a/shotover/src/transforms/redis/mod.rs b/shotover/src/transforms/redis/mod.rs index f0a1e6789..c2957053e 100644 --- a/shotover/src/transforms/redis/mod.rs +++ b/shotover/src/transforms/redis/mod.rs @@ -1,5 +1,6 @@ use crate::transforms::util::ConnectionError; +#[cfg(all(feature = "redis", feature = "cassandra"))] pub mod cache; pub mod cluster_ports_rewrite; pub mod sink_cluster; diff --git a/shotover/src/transforms/util/cluster_connection_pool.rs b/shotover/src/transforms/util/cluster_connection_pool.rs index 9617a438f..949eb33ff 100644 --- a/shotover/src/transforms/util/cluster_connection_pool.rs +++ b/shotover/src/transforms/util/cluster_connection_pool.rs @@ -286,7 +286,7 @@ async fn rx_process( Ok(()) } -#[cfg(test)] +#[cfg(all(test, feature = "redis"))] mod test { use super::spawn_read_write_tasks; use crate::codec::redis::RedisCodecBuilder; diff --git a/windsock-cloud-docker/src/container.rs b/windsock-cloud-docker/src/container.rs index 1f69668e3..e1eada315 100644 --- a/windsock-cloud-docker/src/container.rs +++ b/windsock-cloud-docker/src/container.rs @@ -68,6 +68,9 @@ unzip awscliv2.zip // run windsock let mut args = std::env::args(); args.next(); // skip binary name + let features = args + .next() + .expect("The first argument must be a list of features to compile shotover with"); let args: Vec = args .map(|x| { if x.is_empty() { @@ -83,7 +86,7 @@ unzip awscliv2.zip container_bash(&format!( r#"cd shotover-proxy; source "$HOME/.cargo/env"; -AWS_ACCESS_KEY_ID={access_key_id} AWS_SECRET_ACCESS_KEY={secret_access_key} CARGO_TERM_COLOR=always cargo test --target-dir /target --release --bench windsock --features alpha-transforms --features rdkafka-driver-tests -- {args}"# +AWS_ACCESS_KEY_ID={access_key_id} AWS_SECRET_ACCESS_KEY={secret_access_key} CARGO_TERM_COLOR=always cargo test --target-dir /target --release --bench windsock --no-default-features --features alpha-transforms,rdkafka-driver-tests,{features} -- {args}"# )).await; // extract windsock results