Skip to content

Commit

Permalink
Add feature flags for each protocol in shotover
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Feb 1, 2024
1 parent 82a245e commit 541973a
Show file tree
Hide file tree
Showing 26 changed files with 330 additions and 64 deletions.
17 changes: 14 additions & 3 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ rustflags = [
linker = "aarch64-linux-gnu-gcc"

[alias]
windsock = "test --release --bench windsock --features alpha-transforms,rdkafka-driver-tests --"
windsock-debug = "test --bench windsock --features alpha-transforms,rdkafka-driver-tests --"
windsock-cloud-docker = "run --package windsock-cloud-docker --"
# Can run every benchmark
windsock = "test --release --bench windsock --features kafka,alpha-transforms,rdkafka-driver-tests,cassandra,redis --"
windsock-debug = "test --bench windsock --features kafka,alpha-transforms,rdkafka-driver-tests,cassandra,redis --"

# Can only run benchmarks specific to the protocol but compiles a lot faster
windsock-redis = "test --release --bench windsock --no-default-features --features redis,alpha-transforms --"
windsock-kafka = "test --release --bench windsock --no-default-features --features kafka,alpha-transforms,rdkafka-driver-tests --"
windsock-cassandra = "test --release --bench windsock --no-default-features --features cassandra,alpha-transforms --"

# Compile benches in docker to ensure compiled libc version is compatible with the EC2 instances libc
windsock-cloud-docker = "run --package windsock-cloud-docker -- redis,cassandra,kafka"
windsock-cloud-docker-redis = "run --package windsock-cloud-docker -- redis"
windsock-cloud-docker-kafka = "run --package windsock-cloud-docker -- kafka"
windsock-cloud-docker-cassandra = "run --package windsock-cloud-docker -- cassandra"
6 changes: 4 additions & 2 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ jobs:
- name: Install ubuntu packages
run: shotover-proxy/build/install_ubuntu_packages.sh
- name: Install cargo-hack
run: cargo install cargo-hack --version 0.5.8
uses: taiki-e/install-action@v2
with:
tool: [email protected]
- name: Ensure that dev tools compiles and has no warnings with no features enabled
run: cargo clippy --locked ${{ matrix.cargo_flags }} --all-targets -- -D warnings
- name: Ensure that shotover-proxy compiles and has no warnings under every possible combination of features
# some things to explicitly point out:
# * clippy also reports rustc warnings and errors
# * clippy --all-targets is not run so we only build the shotover_proxy executable without the tests/benches
run: cargo hack --feature-powerset clippy --locked ${{ matrix.cargo_flags }} --package shotover-proxy -- -D warnings
run: cargo hack --feature-powerset --at-least-one-of redis,cassandra,kafka,opensearch clippy --locked ${{ matrix.cargo_flags }} --package shotover-proxy -- -D warnings
4 changes: 2 additions & 2 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ jobs:
- name: Install cargo-hack
uses: taiki-e/install-action@v2
with:
tool: [email protected].4
tool: [email protected].16
- name: Ensure `cargo fmt --all` was run
run: cargo fmt --all -- --check
- name: Ensure that all crates compile and have no warnings under every possible combination of features
# some things to explicitly point out:
# * clippy also reports rustc warnings and errors
# * clippy --all-targets causes clippy to run against tests and examples which it doesnt do by default.
run: cargo hack --feature-powerset clippy --all-targets --locked -- -D warnings
run: cargo hack --feature-powerset --at-least-one-of redis,cassandra,kafka,opensearch clippy --all-targets --locked -- -D warnings
- name: Report disk usage
run: |
df -h
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ codegen-units = 1
scylla = { version = "0.11.0", features = ["ssl"] }
bytes = { version = "1.0.0", features = ["serde"] }
tokio = { version = "1.25.0", features = ["full", "macros"] }
tokio-util = { version = "0.7.7" }
tokio-util = { version = "0.7.7", features = ["codec"] }
tokio-openssl = "0.6.2"
itertools = "0.12.0"
openssl = { version = "0.10.36", features = ["vendored"] }
Expand Down
10 changes: 9 additions & 1 deletion custom-transforms-example/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
shotover = { path = "../shotover" }
shotover = { path = "../shotover", default-features = false}
anyhow.workspace = true
serde.workspace = true
async-trait.workspace = true
Expand All @@ -20,3 +20,11 @@ typetag.workspace = true
test-helpers = {path = "../test-helpers"}
tokio.workspace = true
redis.workspace = true

[features]
redis = ["shotover/redis"]
default = ["redis"]

[[test]]
name = "test"
required-features = ["redis"]
2 changes: 2 additions & 0 deletions custom-transforms-example/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use shotover::runner::Shotover;

#[cfg(feature = "redis")]
mod redis_get_rewrite;
#[cfg(feature = "redis")]
shotover::import_transform!(redis_get_rewrite::RedisGetRewriteConfig);

fn main() {
Expand Down
7 changes: 6 additions & 1 deletion shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
shotover = { path = "../shotover" }
shotover = { path = "../shotover", default-features = false}

[dev-dependencies]
prometheus-parse = "0.2.4"
Expand Down Expand Up @@ -60,8 +60,13 @@ shell-quote.workspace = true
[features]
# Include WIP alpha transforms in the public API
alpha-transforms = ["shotover/alpha-transforms"]
cassandra = ["shotover/cassandra"]
kafka = ["shotover/kafka"]
redis = ["shotover/redis"]
opensearch = ["shotover/opensearch"]
cassandra-cpp-driver-tests = ["test-helpers/cassandra-cpp-driver-tests"]
rdkafka-driver-tests = ["test-helpers/rdkafka-driver-tests"]
default = ["cassandra", "kafka", "redis", "opensearch"]

[[bench]]
name = "windsock"
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/benches/windsock/cloud/aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ sudo docker system prune -af"#,
}
}

#[cfg(feature = "rdkafka-driver-tests")]
#[cfg(all(feature = "rdkafka-driver-tests", feature = "kafka"))]
pub async fn run_shotover(self: Arc<Self>, topology: &str) -> RunningShotover {
self.instance
.ssh()
Expand Down
18 changes: 16 additions & 2 deletions shotover-proxy/benches/windsock/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
// Allow dead code if any of the protocol features are disabled
#![cfg_attr(
any(
not(feature = "cassandra"),
not(feature = "redis"),
not(all(feature = "rdkafka-driver-tests", feature = "kafka"))
),
allow(dead_code, unused_imports, unused_variables, unused_mut)
)]

#[cfg(feature = "cassandra")]
mod cassandra;
mod cloud;
mod common;
#[cfg(feature = "rdkafka-driver-tests")]
#[cfg(all(feature = "rdkafka-driver-tests", feature = "kafka"))]
mod kafka;
mod profilers;
#[cfg(feature = "redis")]
mod redis;
mod shotover;

Expand Down Expand Up @@ -38,9 +50,11 @@ fn main() {

let mut benches = vec![];

#[cfg(feature = "cassandra")]
benches.extend(cassandra::benches());
#[cfg(feature = "rdkafka-driver-tests")]
#[cfg(all(feature = "rdkafka-driver-tests", feature = "kafka"))]
benches.extend(kafka::benches());
#[cfg(feature = "redis")]
benches.extend(redis::benches());

Windsock::new(benches, cloud::AwsCloud::new_boxed(), &["release"]).run();
Expand Down
9 changes: 7 additions & 2 deletions shotover-proxy/tests/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
#[allow(clippy::single_component_path_imports)]
#[allow(clippy::single_component_path_imports, unused_imports)]
use rstest_reuse;

use test_helpers::shotover_process::ShotoverProcessBuilder;
use tokio_bin_process::bin_path;

#[cfg(feature = "cassandra")]
mod cassandra_int_tests;
#[cfg(feature = "kafka")]
mod kafka_int_tests;
#[cfg(feature = "alpha-transforms")]
#[cfg(all(feature = "alpha-transforms", feature = "opensearch"))]
mod opensearch_int_tests;
#[cfg(feature = "redis")]
mod redis_int_tests;
#[cfg(feature = "redis")]
mod runner;
#[cfg(feature = "redis")]
mod transforms;

pub fn shotover_process(topology_path: &str) -> ShotoverProcessBuilder {
Expand Down
88 changes: 62 additions & 26 deletions shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,61 @@ description = "Shotover API for building custom transforms"
[features]
# Include WIP alpha transforms in the public API
alpha-transforms = []
cassandra = [
"dep:cassandra-protocol",
"dep:cql3-parser",
"dep:lz4_flex",
"dep:version-compare",
"dep:aws-sdk-kms",
"dep:aws-config",
"dep:base64",
"dep:serde_json",
"dep:halfbrown",
"dep:chacha20poly1305",
"dep:generic-array",
"dep:hex",
"dep:bincode",
"dep:cached",
]
kafka = [
"dep:kafka-protocol",
"dep:dashmap",
"dep:xxhash-rust",
"dep:string",
]
redis = [
"dep:redis-protocol",
"dep:csv",
"dep:crc16",
]
opensearch = [
"dep:atoi",
"dep:http",
"dep:httparse",
]
default = ["cassandra", "redis", "kafka", "opensearch"]

[dependencies]
atomic_enum = "0.2.0"
pretty-hex = "0.4.0"
tokio-stream = "0.1.2"
bytes-utils = "0.1.1"
derivative = "2.1.1"
cached = { version = "0.48", features = ["async"] }
cached = { version = "0.48", features = ["async"], optional = true }
governor = { version = "0.6", default-features = false, features = ["std", "jitter", "quanta"] }
nonzero_ext = "0.3.0"
version-compare = "0.1"
version-compare = { version = "0.1", optional = true }
rand = { features = ["small_rng"], workspace = true }
lz4_flex = "0.11.0"
lz4_flex = { version = "0.11.0", optional = true }
clap.workspace = true
itertools.workspace = true
rand_distr.workspace = true
bytes.workspace = true
futures.workspace = true
tokio.workspace = true
tokio-util.workspace = true
csv.workspace = true
hex.workspace = true
hex-literal.workspace = true
csv = { workspace = true, optional = true }
hex = { workspace = true, optional = true }
async-trait.workspace = true
typetag.workspace = true
tokio-tungstenite = "0.21.0"
Expand All @@ -46,17 +78,17 @@ backtrace = "0.3.66"
backtrace-ext = "0.2"

# Parsers
cql3-parser = "0.4.0"
cql3-parser = { version = "0.4.0", optional = true }
serde.workspace = true
serde_json.workspace = true
serde_json = { workspace = true, optional = true }
serde_yaml.workspace = true
bincode.workspace = true
bincode = { workspace = true, optional = true }
num = { version = "0.4.0", features = ["serde"] }
uuid.workspace = true
uuid = { workspace = true }
bigdecimal = { version = "0.4.0", features = ["serde"] }
base64 = "0.21.0"
httparse = "1.8.0"
http = "0.2.9"
base64 = { version = "0.21.0", optional = true }
httparse = { version = "1.8.0", optional = true }
http = { version = "0.2.9", optional = true }

#Observability
metrics = "0.21.0"
Expand All @@ -65,32 +97,36 @@ tracing.workspace = true
tracing-subscriber.workspace = true
tracing-appender.workspace = true
hyper.workspace = true
halfbrown = "0.2.1"
halfbrown = { version = "0.2.1", optional = true }

# Transform dependencies
redis-protocol.workspace = true
cassandra-protocol.workspace = true
crc16 = "0.4.0"
redis-protocol = { workspace = true, optional = true }
cassandra-protocol = { workspace = true, optional = true }
crc16 = { version = "0.4.0", optional = true }
ordered-float.workspace = true

#Crypto
aws-config = "1.0.0"
aws-sdk-kms = "1.1.0"
chacha20poly1305 = { version = "0.10.0", features = ["std"] }
generic-array = { version = "0.14", features = ["serde"] }
kafka-protocol = "0.8.0"
aws-config = { version = "1.0.0", optional = true }
aws-sdk-kms = { version = "1.1.0", optional = true }
chacha20poly1305 = { version = "0.10.0", features = ["std"], optional = true }
generic-array = { version = "0.14", features = ["serde"], optional = true }
kafka-protocol = { version = "0.8.0", optional = true }
rustls = { version = "0.22.0" }
tokio-rustls = "0.25"
rustls-pemfile = "2.0.0"
rustls-pki-types = "1.0.1"
string = "0.3.0"
xxhash-rust = { version = "0.8.6", features = ["xxh3"] }
dashmap = "5.4.0"
atoi = "2.0.0"
string = { version = "0.3.0", optional = true }
xxhash-rust = { version = "0.8.6", features = ["xxh3"], optional = true }
dashmap = { version = "5.4.0", optional = true }
atoi = { version = "2.0.0", optional = true }

[dev-dependencies]
criterion = { version = "0.5.0", features = ["async_tokio"] }
hex-literal.workspace = true

# TODO: Optionally compiling benches is quite tricky with criterion, maybe it would be easier with divan?
# For now just set required features
[[bench]]
name = "benches"
harness = false
required-features = ["cassandra", "redis", "kafka"]
12 changes: 12 additions & 0 deletions shotover/src/codec/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
//! Codec types to use for connecting to a DB in a sink transform
use crate::message::Messages;
#[cfg(feature = "cassandra")]
use cassandra_protocol::compression::Compression;
use core::fmt;
#[cfg(feature = "kafka")]
use kafka::RequestHeader;
use metrics::{register_histogram, Histogram};
use tokio_util::codec::{Decoder, Encoder};

#[cfg(feature = "cassandra")]
pub mod cassandra;
#[cfg(feature = "kafka")]
pub mod kafka;
#[cfg(feature = "opensearch")]
pub mod opensearch;
#[cfg(feature = "redis")]
pub mod redis;

#[derive(Eq, PartialEq, Copy, Clone)]
Expand Down Expand Up @@ -40,18 +46,23 @@ pub fn message_latency(direction: Direction, destination_name: String) -> Histog

#[derive(Debug, Clone, PartialEq, Copy)]
pub enum CodecState {
#[cfg(feature = "cassandra")]
Cassandra {
compression: Compression,
},
#[cfg(feature = "redis")]
Redis,
#[cfg(feature = "kafka")]
Kafka {
request_header: Option<RequestHeader>,
},
Dummy,
#[cfg(feature = "opensearch")]
OpenSearch,
}

impl CodecState {
#[cfg(feature = "cassandra")]
pub fn as_cassandra(&self) -> Compression {
match self {
CodecState::Cassandra { compression } => *compression,
Expand All @@ -61,6 +72,7 @@ impl CodecState {
}
}

#[cfg(feature = "kafka")]
pub fn as_kafka(&self) -> Option<RequestHeader> {
match self {
CodecState::Kafka { request_header } => *request_header,
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/config/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl Topology {
}
}

#[cfg(test)]
#[cfg(all(test, feature = "redis", feature = "cassandra"))]
mod topology_tests {
use crate::config::chain::TransformChainConfig;
use crate::config::topology::Topology;
Expand Down
Loading

0 comments on commit 541973a

Please sign in to comment.