Skip to content

Commit

Permalink
Add feature flags for each protocol in shotover
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Jan 29, 2024
1 parent 0d7165e commit 6a2f72c
Show file tree
Hide file tree
Showing 15 changed files with 273 additions and 53 deletions.
5 changes: 4 additions & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ rustflags = [
linker = "aarch64-linux-gnu-gcc"

[alias]
windsock = "test --release --bench windsock --features alpha-transforms,rdkafka-driver-tests --"
windsock-redis = "test --release --bench windsock --no-default-features --features redis,alpha-transforms --"
windsock-kafka = "test --release --bench windsock --no-default-features --features kafka,alpha-transforms,rdkafka-driver-tests --"
windsock-cassandra = "test --release --bench windsock --no-default-features --features cassandra,alpha-transforms --"
windsock = "test --release --bench windsock --all-features --"
windsock-debug = "test --bench windsock --features alpha-transforms,rdkafka-driver-tests --"
windsock-cloud-docker = "run --package windsock-cloud-docker --"
2 changes: 1 addition & 1 deletion custom-transforms-example/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
shotover = { path = "../shotover" }
shotover = { path = "../shotover", default-features = false, features = ["redis"]}
anyhow.workspace = true
serde.workspace = true
async-trait.workspace = true
Expand Down
6 changes: 5 additions & 1 deletion shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
shotover = { path = "../shotover" }
shotover = { path = "../shotover", default-features = false}

[dev-dependencies]
prometheus-parse = "0.2.4"
Expand Down Expand Up @@ -60,8 +60,12 @@ shell-quote.workspace = true
[features]
# Include WIP alpha transforms in the public API
alpha-transforms = ["shotover/alpha-transforms"]
cassandra = ["shotover/cassandra"]
kafka = ["shotover/kafka"]
redis = ["shotover/redis"]
cassandra-cpp-driver-tests = ["test-helpers/cassandra-cpp-driver-tests"]
rdkafka-driver-tests = ["test-helpers/rdkafka-driver-tests"]
default = ["cassandra", "kafka", "redis"]

[[bench]]
name = "windsock"
Expand Down
83 changes: 52 additions & 31 deletions shotover-proxy/benches/windsock/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
#[cfg(feature = "cassandra")]
mod cassandra;
mod cloud;
mod common;
#[cfg(feature = "rdkafka-driver-tests")]
#[cfg(all(feature = "rdkafka-driver-tests", feature = "kafka"))]
mod kafka;
mod profilers;
#[cfg(feature = "redis")]
mod redis;
mod shotover;

#[cfg(feature = "cassandra")]
use crate::cassandra::*;
#[cfg(any(
feature = "cassandra",
feature = "redis",
feature = "kafka",
all(feature = "rdkafka-driver-tests", feature = "kafka")
))]
use crate::common::*;
#[cfg(feature = "rdkafka-driver-tests")]
#[cfg(all(feature = "rdkafka-driver-tests", feature = "kafka"))]
use crate::kafka::*;
#[cfg(feature = "redis")]
use crate::redis::*;
use cloud::CloudResources;
use cloud::CloudResourcesRequired;
Expand Down Expand Up @@ -41,6 +51,7 @@ fn main() {
.unwrap();
}

#[cfg(feature = "cassandra")]
let cassandra_benches = itertools::iproduct!(
[CassandraDb::Cassandra],
[CassandraTopology::Single, CassandraTopology::Cluster3],
Expand Down Expand Up @@ -88,7 +99,36 @@ fn main() {
)) as ShotoverBench)
},
);
#[cfg(feature = "rdkafka-driver-tests")]
#[cfg(not(feature = "cassandra"))]
let cassandra_benches = std::iter::empty();
#[cfg(feature = "cassandra")]
let cassandra_mock_benches = vec![
Box::new(CassandraBench::new(
CassandraDb::Mocked,
CassandraTopology::Single,
Shotover::None,
Compression::None,
Operation::ReadI64,
CassandraProtocol::V4,
CassandraDriver::Scylla,
10,
)) as ShotoverBench,
Box::new(CassandraBench::new(
CassandraDb::Mocked,
CassandraTopology::Single,
Shotover::Standard,
Compression::None,
Operation::ReadI64,
CassandraProtocol::V4,
CassandraDriver::Scylla,
10,
)),
]
.into_iter();
#[cfg(not(feature = "cassandra"))]
let cassandra_mock_benches = std::iter::empty();

#[cfg(all(feature = "rdkafka-driver-tests", feature = "kafka"))]
let kafka_benches = itertools::iproduct!(
[
Shotover::None,
Expand All @@ -105,9 +145,10 @@ fn main() {
.map(|(shotover, topology, size)| {
Box::new(KafkaBench::new(shotover, topology, size)) as ShotoverBench
});
#[cfg(not(feature = "rdkafka-driver-tests"))]
#[cfg(not(all(feature = "rdkafka-driver-tests", feature = "kafka")))]
let kafka_benches = std::iter::empty();

#[cfg(feature = "redis")]
let redis_benches = itertools::iproduct!(
[RedisTopology::Cluster3, RedisTopology::Single],
[
Expand All @@ -121,35 +162,15 @@ fn main() {
.map(|(topology, shotover, operation, encryption)| {
Box::new(RedisBench::new(topology, shotover, operation, encryption)) as ShotoverBench
});
#[cfg(not(feature = "redis"))]
let redis_benches = std::iter::empty();

Windsock::new(
vec![
Box::new(CassandraBench::new(
CassandraDb::Mocked,
CassandraTopology::Single,
Shotover::None,
Compression::None,
Operation::ReadI64,
CassandraProtocol::V4,
CassandraDriver::Scylla,
10,
)) as ShotoverBench,
Box::new(CassandraBench::new(
CassandraDb::Mocked,
CassandraTopology::Single,
Shotover::Standard,
Compression::None,
Operation::ReadI64,
CassandraProtocol::V4,
CassandraDriver::Scylla,
10,
)),
]
.into_iter()
.chain(cassandra_benches)
.chain(kafka_benches)
.chain(redis_benches)
.collect(),
cassandra_mock_benches
.chain(cassandra_benches)
.chain(kafka_benches)
.chain(redis_benches)
.collect(),
cloud::AwsCloud::new_boxed(),
&["release"],
)
Expand Down
3 changes: 2 additions & 1 deletion shotover-proxy/tests/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#[allow(clippy::single_component_path_imports)]
#[allow(clippy::single_component_path_imports, unused_imports)]
use rstest_reuse;

use test_helpers::shotover_process::ShotoverProcessBuilder;
use tokio_bin_process::bin_path;

#[cfg(feature = "cassandra")]
mod cassandra_int_tests;
mod kafka_int_tests;
#[cfg(feature = "alpha-transforms")]
Expand Down
33 changes: 22 additions & 11 deletions shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ description = "Shotover API for building custom transforms"
[features]
# Include WIP alpha transforms in the public API
alpha-transforms = []
cassandra = [
"dep:cassandra-protocol",
"dep:cql3-parser",
"dep:lz4_flex",
"dep:version-compare",
"dep:uuid"
]
kafka = ["dep:kafka-protocol", "dep:dashmap", "dep:xxhash-rust"]
redis = ["dep:redis-protocol", "dep:csv"]
default = ["cassandra"]

[dependencies]
atomic_enum = "0.2.0"
Expand All @@ -23,19 +33,18 @@ cached = { version = "0.47", features = ["async"] }
async-recursion = "1.0"
governor = { version = "0.6", default-features = false, features = ["std", "jitter", "quanta"] }
nonzero_ext = "0.3.0"
version-compare = "0.1"
version-compare = { version = "0.1", optional = true }
rand = { features = ["small_rng"], workspace = true }
lz4_flex = "0.11.0"
lz4_flex = { version = "0.11.0", optional = true }
clap.workspace = true
itertools.workspace = true
rand_distr.workspace = true
bytes.workspace = true
futures.workspace = true
tokio.workspace = true
tokio-util.workspace = true
csv.workspace = true
csv = { workspace = true, optional = true }
hex.workspace = true
hex-literal.workspace = true
async-trait.workspace = true
typetag.workspace = true
tokio-tungstenite = "0.21.0"
Expand All @@ -47,13 +56,13 @@ backtrace = "0.3.66"
backtrace-ext = "0.2"

# Parsers
cql3-parser = "0.4.0"
cql3-parser = { version = "0.4.0", optional = true }
serde.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
bincode.workspace = true
num = { version = "0.4.0", features = ["serde"] }
uuid.workspace = true
uuid = { workspace = true, optional = true }
bigdecimal = { version = "0.4.0", features = ["serde"] }
base64 = "0.21.0"
httparse = "1.8.0"
Expand All @@ -69,8 +78,8 @@ hyper.workspace = true
halfbrown = "0.2.1"

# Transform dependencies
redis-protocol.workspace = true
cassandra-protocol.workspace = true
redis-protocol = { workspace = true, optional = true }
cassandra-protocol = { workspace = true, optional = true }
crc16 = "0.4.0"
ordered-float.workspace = true

Expand All @@ -80,19 +89,21 @@ aws-sdk-kms = "1.1.0"
strum_macros = "0.25"
chacha20poly1305 = { version = "0.10.0", features = ["std"] }
generic-array = { version = "0.14", features = ["serde"] }
kafka-protocol = "0.8.0"
kafka-protocol = { version = "0.8.0", optional = true }
rustls = { version = "0.22.0" }
tokio-rustls = "0.25"
rustls-pemfile = "2.0.0"
rustls-pki-types = "1.0.1"
string = "0.3.0"
xxhash-rust = { version = "0.8.6", features = ["xxh3"] }
dashmap = "5.4.0"
xxhash-rust = { version = "0.8.6", features = ["xxh3"], optional = true }
dashmap = { version = "5.4.0", optional = true }
atoi = "2.0.0"

[dev-dependencies]
criterion = { version = "0.5.0", features = ["async_tokio"] }
hex-literal.workspace = true

[[bench]]
name = "benches"
harness = false
required-features = ["cassandra", "redis"]
4 changes: 3 additions & 1 deletion shotover/benches/benches/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use criterion::criterion_main;

// TODO: apply features to modules after https://github.com/shotover/shotover-proxy/pull/1424
#[cfg(all(feature = "cassandra", feature = "kafka"))]
mod chain;
#[cfg(all(feature = "cassandra", feature = "kafka"))]
mod codec;

fn init() {
Expand Down
10 changes: 10 additions & 0 deletions shotover/src/codec/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
//! Codec types to use for connecting to a DB in a sink transform
use crate::message::Messages;
#[cfg(feature = "cassandra")]
use cassandra_protocol::compression::Compression;
use core::fmt;
#[cfg(feature = "kafka")]
use kafka::RequestHeader;
use metrics::{register_histogram, Histogram};
use tokio_util::codec::{Decoder, Encoder};

#[cfg(feature = "cassandra")]
pub mod cassandra;
#[cfg(feature = "kafka")]
pub mod kafka;
pub mod opensearch;
#[cfg(feature = "redis")]
pub mod redis;

#[derive(Eq, PartialEq, Copy, Clone)]
Expand Down Expand Up @@ -40,10 +45,13 @@ pub fn message_latency(direction: Direction, destination_name: String) -> Histog

#[derive(Debug, Clone, PartialEq, Copy)]
pub enum CodecState {
#[cfg(feature = "cassandra")]
Cassandra {
compression: Compression,
},
#[cfg(feature = "redis")]
Redis,
#[cfg(feature = "kafka")]
Kafka {
request_header: Option<RequestHeader>,
},
Expand All @@ -52,6 +60,7 @@ pub enum CodecState {
}

impl CodecState {
#[cfg(feature = "cassandra")]
pub fn as_cassandra(&self) -> Compression {
match self {
CodecState::Cassandra { compression } => *compression,
Expand All @@ -61,6 +70,7 @@ impl CodecState {
}
}

#[cfg(feature = "kafka")]
pub fn as_kafka(&self) -> Option<RequestHeader> {
match self {
CodecState::Kafka { request_header } => *request_header,
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/config/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl Topology {
}
}

#[cfg(test)]
#[cfg(all(test, feature = "redis", feature = "cassandra"))]
mod topology_tests {
use crate::config::chain::TransformChainConfig;
use crate::config::topology::Topology;
Expand Down
Loading

0 comments on commit 6a2f72c

Please sign in to comment.