diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 42c3be366..1cbfa7936 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -40,11 +40,11 @@ jobs: - name: Install ubuntu packages run: shotover-proxy/build/install_ubuntu_packages.sh - name: Install cargo-hack - run: cargo install cargo-hack --version 0.5.8 + run: cargo install cargo-hack --version 0.6.16 - name: Ensure that dev tools compiles and has no warnings with no features enabled run: cargo clippy --locked ${{ matrix.cargo_flags }} --all-targets -- -D warnings - name: Ensure that shotover-proxy compiles and has no warnings under every possible combination of features # some things to explicitly point out: # * clippy also reports rustc warnings and errors # * clippy --all-targets is not run so we only build the shotover_proxy executable without the tests/benches - run: cargo hack --feature-powerset clippy --locked ${{ matrix.cargo_flags }} --package shotover-proxy -- -D warnings + run: cargo hack --feature-powerset --at-least-one-of redis,cassandra,kafka,opensearch clippy --locked ${{ matrix.cargo_flags }} --package shotover-proxy -- -D warnings diff --git a/shotover-proxy/Cargo.toml b/shotover-proxy/Cargo.toml index 995db8826..ccc8c4f30 100644 --- a/shotover-proxy/Cargo.toml +++ b/shotover-proxy/Cargo.toml @@ -63,7 +63,7 @@ alpha-transforms = ["shotover/alpha-transforms"] cassandra = ["shotover/cassandra"] kafka = ["shotover/kafka"] redis = ["shotover/redis"] -opensearch = [] +opensearch = ["shotover/opensearch"] cassandra-cpp-driver-tests = ["test-helpers/cassandra-cpp-driver-tests"] rdkafka-driver-tests = ["test-helpers/rdkafka-driver-tests"] default = ["cassandra", "kafka", "redis", "opensearch"] diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index b9d0e3037..a617d7816 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -19,10 +19,25 @@ cassandra = [ "dep:version-compare", "dep:aws-sdk-kms", "dep:aws-config", + "dep:base64", + "dep:serde_json", + "dep:halfbrown", + "dep:chacha20poly1305", + "dep:generic-array", ] -kafka = ["dep:kafka-protocol", "dep:dashmap", "dep:xxhash-rust"] -redis = ["dep:redis-protocol", "dep:csv"] -default = ["cassandra", "redis", "kafka"] +kafka = [ + "dep:kafka-protocol", + "dep:dashmap", + "dep:xxhash-rust", + "dep:string" +] +redis = [ + "dep:redis-protocol", + "dep:csv", + "dep:crc16" +] +opensearch = ["dep:atoi"] +default = ["cassandra", "redis", "kafka", "opensearch"] [dependencies] atomic_enum = "0.2.0" @@ -58,13 +73,13 @@ backtrace-ext = "0.2" # Parsers cql3-parser = { version = "0.4.0", optional = true } serde.workspace = true -serde_json.workspace = true +serde_json = { workspace = true, optional = true } serde_yaml.workspace = true bincode.workspace = true num = { version = "0.4.0", features = ["serde"] } uuid = { workspace = true } bigdecimal = { version = "0.4.0", features = ["serde"] } -base64 = "0.21.0" +base64 = { version = "0.21.0", optional = true } httparse = "1.8.0" http = "0.2.9" @@ -75,36 +90,36 @@ tracing.workspace = true tracing-subscriber.workspace = true tracing-appender.workspace = true hyper.workspace = true -halfbrown = "0.2.1" +halfbrown = { version = "0.2.1", optional = true } # Transform dependencies redis-protocol = { workspace = true, optional = true } cassandra-protocol = { workspace = true, optional = true } -crc16 = "0.4.0" +crc16 = { version = "0.4.0", optional = true } ordered-float.workspace = true #Crypto aws-config = { version = "1.0.0", optional = true } aws-sdk-kms = { version = "1.1.0", optional = true } strum_macros = "0.26" -chacha20poly1305 = { version = "0.10.0", features = ["std"] } -generic-array = { version = "0.14", features = ["serde"] } +chacha20poly1305 = { version = "0.10.0", features = ["std"], optional = true } +generic-array = { version = "0.14", features = ["serde"], optional = true } kafka-protocol = { version = "0.8.0", optional = true } rustls = { version = "0.22.0" } tokio-rustls = "0.25" rustls-pemfile = "2.0.0" rustls-pki-types = "1.0.1" -string = "0.3.0" +string = { version = "0.3.0", optional = true } xxhash-rust = { version = "0.8.6", features = ["xxh3"], optional = true } dashmap = { version = "5.4.0", optional = true } -atoi = "2.0.0" +atoi = { version = "2.0.0", optional = true } [dev-dependencies] criterion = { version = "0.5.0", features = ["async_tokio"] } hex-literal.workspace = true -# TODO: Optionionally compiling benches is quite tricky with criterion, maybe it would be easier with divan? -# For now just set requried features +# TODO: Optionally compiling benches is quite tricky with criterion, maybe it would be easier with divan? +# For now just set required features [[bench]] name = "benches" harness = false diff --git a/shotover/src/codec/mod.rs b/shotover/src/codec/mod.rs index d4b10db81..76f2a89a7 100644 --- a/shotover/src/codec/mod.rs +++ b/shotover/src/codec/mod.rs @@ -13,6 +13,7 @@ use tokio_util::codec::{Decoder, Encoder}; pub mod cassandra; #[cfg(feature = "kafka")] pub mod kafka; +#[cfg(feature = "opensearch")] pub mod opensearch; #[cfg(feature = "redis")] pub mod redis; @@ -56,6 +57,7 @@ pub enum CodecState { request_header: Option, }, Dummy, + #[cfg(feature = "opensearch")] OpenSearch, } diff --git a/shotover/src/frame/mod.rs b/shotover/src/frame/mod.rs index ea5822410..a318ab85e 100644 --- a/shotover/src/frame/mod.rs +++ b/shotover/src/frame/mod.rs @@ -32,6 +32,7 @@ pub enum MessageType { #[cfg(feature = "kafka")] Kafka, Dummy, + #[cfg(feature = "opensearch")] OpenSearch, } @@ -44,6 +45,7 @@ impl From<&ProtocolType> for MessageType { ProtocolType::Redis => Self::Redis, #[cfg(feature = "kafka")] ProtocolType::Kafka { .. } => Self::Kafka, + #[cfg(feature = "opensearch")] ProtocolType::OpenSearch => Self::OpenSearch, } } @@ -63,6 +65,7 @@ impl Frame { request_header: None, }, Frame::Dummy => CodecState::Dummy, + #[cfg(feature = "opensearch")] Frame::OpenSearch(_) => CodecState::OpenSearch, } } @@ -79,6 +82,7 @@ pub enum Frame { /// Represents a message that has must exist due to shotovers requirement that every request has a corresponding response. /// It exists purely to keep transform invariants and codecs will completely ignore this frame when they receive it Dummy, + #[cfg(feature = "opensearch")] OpenSearch(OpenSearchFrame), } @@ -86,12 +90,12 @@ impl Frame { pub fn from_bytes( bytes: Bytes, message_type: MessageType, - _codec_state: CodecState, + codec_state: CodecState, ) -> Result { match message_type { #[cfg(feature = "cassandra")] MessageType::Cassandra => { - CassandraFrame::from_bytes(bytes, _codec_state.as_cassandra()).map(Frame::Cassandra) + CassandraFrame::from_bytes(bytes, codec_state.as_cassandra()).map(Frame::Cassandra) } #[cfg(feature = "redis")] MessageType::Redis => redis_protocol::resp2::decode::decode(&bytes) @@ -99,9 +103,10 @@ impl Frame { .map_err(|e| anyhow!("{e:?}")), #[cfg(feature = "kafka")] MessageType::Kafka => { - KafkaFrame::from_bytes(bytes, _codec_state.as_kafka()).map(Frame::Kafka) + KafkaFrame::from_bytes(bytes, codec_state.as_kafka()).map(Frame::Kafka) } MessageType::Dummy => Ok(Frame::Dummy), + #[cfg(feature = "opensearch")] MessageType::OpenSearch => Ok(Frame::OpenSearch(OpenSearchFrame::from_bytes(&bytes)?)), } } @@ -115,6 +120,7 @@ impl Frame { #[cfg(feature = "kafka")] Frame::Kafka(_) => "Kafka", Frame::Dummy => "Dummy", + #[cfg(feature = "opensearch")] Frame::OpenSearch(_) => "OpenSearch", } } @@ -128,6 +134,7 @@ impl Frame { #[cfg(feature = "kafka")] Frame::Kafka(_) => MessageType::Kafka, Frame::Dummy => MessageType::Dummy, + #[cfg(feature = "opensearch")] Frame::OpenSearch(_) => MessageType::OpenSearch, } } @@ -179,6 +186,7 @@ impl Frame { pub fn into_opensearch(self) -> Result { match self { + #[cfg(feature = "opensearch")] Frame::OpenSearch(frame) => Ok(frame), frame => Err(anyhow!( "Expected opensearch frame but received {} frame", @@ -198,6 +206,7 @@ impl Display for Frame { #[cfg(feature = "kafka")] Frame::Kafka(frame) => write!(f, "Kafka {}", frame), Frame::Dummy => write!(f, "Shotover internal dummy message"), + #[cfg(feature = "opensearch")] Frame::OpenSearch(frame) => write!(f, "OpenSearch: {:?}", frame), } } diff --git a/shotover/src/lib.rs b/shotover/src/lib.rs index 23ca08f2e..921b6efd4 100644 --- a/shotover/src/lib.rs +++ b/shotover/src/lib.rs @@ -35,10 +35,20 @@ any( not(feature = "cassandra"), not(feature = "redis"), - not(feature = "kafka") + not(feature = "kafka"), + not(feature = "opensearch"), ), - allow(dead_code, unused_imports) + allow(dead_code, unused_imports, unused_variables) )] +#[cfg(all( + not(feature = "cassandra"), + not(feature = "redis"), + not(feature = "kafka"), + not(feature = "opensearch"), +))] +compile_error!( + "At least one protocol feature must be enabled, e.g. `cassandra`, `redis`, `kafka` or `opensearch`" +); pub mod codec; pub mod config; diff --git a/shotover/src/message/mod.rs b/shotover/src/message/mod.rs index 971207094..0f51260f5 100644 --- a/shotover/src/message/mod.rs +++ b/shotover/src/message/mod.rs @@ -36,21 +36,21 @@ pub enum Metadata { Redis, #[cfg(feature = "kafka")] Kafka, + #[cfg(feature = "opensearch")] OpenSearch, } #[derive(PartialEq)] pub enum ProtocolType { #[cfg(feature = "cassandra")] - Cassandra { - compression: Compression, - }, + Cassandra { compression: Compression }, #[cfg(feature = "redis")] Redis, #[cfg(feature = "kafka")] Kafka { request_header: Option, }, + #[cfg(feature = "opensearch")] OpenSearch, } @@ -67,6 +67,7 @@ impl From<&ProtocolType> for CodecState { ProtocolType::Kafka { request_header } => Self::Kafka { request_header: *request_header, }, + #[cfg(feature = "opensearch")] ProtocolType::OpenSearch => Self::OpenSearch, } } @@ -275,6 +276,7 @@ impl Message { #[cfg(feature = "kafka")] MessageType::Kafka => todo!(), MessageType::Dummy => nonzero!(1u32), + #[cfg(feature = "opensearch")] MessageType::OpenSearch => todo!(), }, MessageInner::Modified { frame } | MessageInner::Parsed { frame, .. } => match frame { @@ -285,6 +287,7 @@ impl Message { #[cfg(feature = "kafka")] Frame::Kafka(_) => todo!(), Frame::Dummy => nonzero!(1u32), + #[cfg(feature = "opensearch")] Frame::OpenSearch(_) => todo!(), }, }) @@ -312,6 +315,7 @@ impl Message { #[cfg(feature = "kafka")] Some(Frame::Kafka(_)) => todo!(), Some(Frame::Dummy) => todo!(), + #[cfg(feature = "opensearch")] Some(Frame::OpenSearch(_)) => todo!(), None => QueryType::ReadWrite, } @@ -350,6 +354,7 @@ impl Message { Metadata::Kafka => return Err(anyhow!(_error).context( "A generic error cannot be formed because the kafka protocol does not support it", )), + #[cfg(feature = "opensearch")] Metadata::OpenSearch => unimplemented!() })) } @@ -372,6 +377,7 @@ impl Message { #[cfg(feature = "kafka")] MessageType::Kafka => Ok(Metadata::Kafka), MessageType::Dummy => Err(anyhow!("Dummy has no metadata")), + #[cfg(feature = "opensearch")] MessageType::OpenSearch => Err(anyhow!("OpenSearch has no metadata")), }, MessageInner::Parsed { frame, .. } | MessageInner::Modified { frame } => match frame { @@ -382,6 +388,7 @@ impl Message { #[cfg(feature = "redis")] Frame::Redis(_) => Ok(Metadata::Redis), Frame::Dummy => Err(anyhow!("dummy has no metadata")), + #[cfg(feature = "opensearch")] Frame::OpenSearch(_) => Err(anyhow!("OpenSearch has no metadata")), }, } @@ -412,6 +419,7 @@ impl Message { Metadata::Redis => unimplemented!(), #[cfg(feature = "kafka")] Metadata::Kafka => unimplemented!(), + #[cfg(feature = "opensearch")] Metadata::OpenSearch => unimplemented!(), }, // reachable with feature = cassandra @@ -451,6 +459,7 @@ impl Message { #[cfg(feature = "kafka")] Frame::Kafka(_) => None, Frame::Dummy => None, + #[cfg(feature = "opensearch")] Frame::OpenSearch(_) => None, } } diff --git a/shotover/src/sources/mod.rs b/shotover/src/sources/mod.rs index 91d587190..3a32c1a33 100644 --- a/shotover/src/sources/mod.rs +++ b/shotover/src/sources/mod.rs @@ -4,6 +4,7 @@ use crate::sources::cassandra::{CassandraConfig, CassandraSource}; #[cfg(feature = "kafka")] use crate::sources::kafka::{KafkaConfig, KafkaSource}; +#[cfg(feature = "opensearch")] use crate::sources::opensearch::{OpenSearchConfig, OpenSearchSource}; #[cfg(feature = "redis")] use crate::sources::redis::{RedisConfig, RedisSource}; @@ -16,6 +17,7 @@ use tokio::task::JoinHandle; pub mod cassandra; #[cfg(feature = "kafka")] pub mod kafka; +#[cfg(feature = "opensearch")] pub mod opensearch; #[cfg(feature = "redis")] pub mod redis; @@ -35,6 +37,7 @@ pub enum Source { Redis(RedisSource), #[cfg(feature = "kafka")] Kafka(KafkaSource), + #[cfg(feature = "opensearch")] OpenSearch(OpenSearchSource), } @@ -47,6 +50,7 @@ impl Source { Source::Redis(r) => r.join_handle, #[cfg(feature = "kafka")] Source::Kafka(r) => r.join_handle, + #[cfg(feature = "opensearch")] Source::OpenSearch(o) => o.join_handle, } } @@ -61,6 +65,7 @@ pub enum SourceConfig { Redis(RedisConfig), #[cfg(feature = "kafka")] Kafka(KafkaConfig), + #[cfg(feature = "opensearch")] OpenSearch(OpenSearchConfig), } @@ -76,6 +81,7 @@ impl SourceConfig { SourceConfig::Redis(r) => r.get_source(trigger_shutdown_rx).await, #[cfg(feature = "kafka")] SourceConfig::Kafka(r) => r.get_source(trigger_shutdown_rx).await, + #[cfg(feature = "opensearch")] SourceConfig::OpenSearch(r) => r.get_source(trigger_shutdown_rx).await, } } diff --git a/shotover/src/transforms/mod.rs b/shotover/src/transforms/mod.rs index 3909645a6..b710d5ebc 100644 --- a/shotover/src/transforms/mod.rs +++ b/shotover/src/transforms/mod.rs @@ -23,7 +23,7 @@ use crate::transforms::kafka::sink_single::KafkaSinkSingle; use crate::transforms::load_balance::ConnectionBalanceAndPool; use crate::transforms::loopback::Loopback; use crate::transforms::null::NullSink; -#[cfg(feature = "alpha-transforms")] +#[cfg(all(feature = "alpha-transforms", feature = "opensearch"))] use crate::transforms::opensearch::OpenSearchSinkSingle; use crate::transforms::parallel_map::ParallelMap; #[cfg(feature = "cassandra")] @@ -70,7 +70,7 @@ pub mod load_balance; pub mod loopback; pub mod noop; pub mod null; -#[cfg(feature = "alpha-transforms")] +#[cfg(all(feature = "alpha-transforms", feature = "opensearch"))] pub mod opensearch; pub mod parallel_map; #[cfg(feature = "cassandra")] @@ -148,7 +148,7 @@ pub enum Transforms { QueryCounter(QueryCounter), RequestThrottling(RequestThrottling), Custom(Box), - #[cfg(feature = "alpha-transforms")] + #[cfg(all(feature = "alpha-transforms", feature = "opensearch"))] OpenSearchSinkSingle(OpenSearchSinkSingle), } @@ -200,7 +200,7 @@ impl Transforms { Transforms::QueryCounter(s) => s.transform(requests_wrapper).await, Transforms::RequestThrottling(s) => s.transform(requests_wrapper).await, Transforms::Custom(s) => s.transform(requests_wrapper).await, - #[cfg(feature = "alpha-transforms")] + #[cfg(all(feature = "alpha-transforms", feature = "opensearch"))] Transforms::OpenSearchSinkSingle(s) => s.transform(requests_wrapper).await, } } @@ -248,7 +248,7 @@ impl Transforms { Transforms::QueryCounter(s) => s.transform_pushed(requests_wrapper).await, Transforms::RequestThrottling(s) => s.transform_pushed(requests_wrapper).await, Transforms::Custom(s) => s.transform_pushed(requests_wrapper).await, - #[cfg(feature = "alpha-transforms")] + #[cfg(all(feature = "alpha-transforms", feature = "opensearch"))] Transforms::OpenSearchSinkSingle(s) => s.transform_pushed(requests_wrapper).await, } } @@ -300,7 +300,7 @@ impl Transforms { Transforms::DebugRandomDelay(d) => d.set_pushed_messages_tx(pushed_messages_tx), Transforms::RequestThrottling(d) => d.set_pushed_messages_tx(pushed_messages_tx), Transforms::Custom(d) => d.set_pushed_messages_tx(pushed_messages_tx), - #[cfg(feature = "alpha-transforms")] + #[cfg(all(feature = "alpha-transforms", feature = "opensearch"))] Transforms::OpenSearchSinkSingle(s) => s.set_pushed_messages_tx(pushed_messages_tx), } } diff --git a/shotover/src/transforms/query_counter.rs b/shotover/src/transforms/query_counter.rs index 8374a3fd8..153314cd3 100644 --- a/shotover/src/transforms/query_counter.rs +++ b/shotover/src/transforms/query_counter.rs @@ -84,6 +84,7 @@ impl Transform for QueryCounter { Some(Frame::Dummy) => { // Dummy does not count as a message } + #[cfg(feature = "opensearch")] Some(Frame::OpenSearch(_)) => { todo!(); }