Skip to content

Commit

Permalink
Merge branch 'main' into remove_transform_pushed
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros authored Mar 14, 2024
2 parents 51881af + 5523020 commit 020164f
Show file tree
Hide file tree
Showing 38 changed files with 895 additions and 3,227 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/windsock_benches.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ jobs:
cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers samply name=cassandra,compression=none,connection_count=1,driver=scylla,operation=read_i64,protocol=v4,shotover=standard,topology=single
cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers sys_monitor name=kafka,shotover=standard,size=1B,topology=single
cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers shotover_metrics name=redis,encryption=none,operation=get,shotover=standard,topology=single
# windsock/examples/cassandra.rs - this can stay here until windsock is moved to its own repo
cargo run --release --example cassandra -- local-run --bench-length-seconds 5 --operations-per-second 100
      - name: Ensure that tests did not create or modify any files that aren't .gitignore'd
run: |
if [ -n "$(git status --porcelain)" ]; then
Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
[workspace]
members = [
"windsock",
"shotover",
"shotover-proxy",
"test-helpers",
Expand Down
5 changes: 4 additions & 1 deletion docs/src/transforms.md
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ Instead Shotover will pretend to be either a single Kafka node or part of a clus
This is achieved by rewriting the FindCoordinator, Metadata and DescribeCluster messages to contain the nodes in the shotover cluster instead of the kafka cluster.

```yaml
- CassandraSinkCluster:
- KafkaSinkCluster:
# Addresses of the initial kafka brokers to connect to.
first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
Expand All @@ -309,6 +309,9 @@ This is achieved by rewriting the FindCoordinator, Metadata and DescribeCluster
# When a timeout occurs the connection to the client is immediately closed.
# read_timeout: 60
# When this field is enabled it allows the use of SASL authentication. If you intend to use SASL this field must be enabled, it is false by default.
sasl_enabled: false
# When this field is provided TLS is used when connecting to the remote address.
# Removing this field will disable TLS.
#tls:
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ tokio-bin-process.workspace = true
rustls-pemfile = "2.0.0"
rustls-pki-types = "1.1.0"
aws-throwaway.workspace = true
windsock = { path = "../windsock" }
windsock = "0.1.0"
regex = "1.7.0"
cql-ws = { git = "https://github.com/shotover/cql-ws" }
opensearch = "2.1.0"
Expand Down
9 changes: 7 additions & 2 deletions shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use itertools::Itertools;
use shotover::config::chain::TransformChainConfig;
use shotover::sources::SourceConfig;
use shotover::transforms::debug::force_parse::DebugForceEncodeConfig;
use shotover::transforms::kafka::sink_cluster::KafkaSinkClusterConfig;
use shotover::transforms::kafka::sink_cluster::{KafkaSinkClusterConfig, ShotoverNodeConfig};
use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig;
use shotover::transforms::TransformConfig;
use std::sync::Arc;
Expand Down Expand Up @@ -92,8 +92,13 @@ impl KafkaBench {
connect_timeout_ms: 3000,
read_timeout: None,
first_contact_points: vec![kafka_address],
shotover_nodes: vec![host_address.clone()],
shotover_nodes: vec![ShotoverNodeConfig {
address: host_address.parse().unwrap(),
rack: "rack1".into(),
broker_id: 0,
}],
tls: None,
sasl_enabled: Some(false),
}),
});
common::generate_topology(SourceConfig::Kafka(shotover::sources::kafka::KafkaConfig {
Expand Down
91 changes: 67 additions & 24 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use test_helpers::docker_compose::docker_compose;
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn passthrough_standard(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml");
Expand All @@ -31,7 +31,7 @@ async fn passthrough_standard(#[case] driver: KafkaDriver) {
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn passthrough_tls(#[case] driver: KafkaDriver) {
test_helpers::cert::generate_kafka_test_certs();

Expand All @@ -52,10 +52,11 @@ async fn passthrough_tls(#[case] driver: KafkaDriver) {
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
// #[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_tls(#[case] driver: KafkaDriver) {
test_helpers::cert::generate_kafka_test_certs();

Expand All @@ -80,7 +81,7 @@ async fn cluster_tls(#[case] driver: KafkaDriver) {
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn passthrough_encode(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml");
Expand All @@ -97,7 +98,7 @@ async fn passthrough_encode(#[case] driver: KafkaDriver) {
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn passthrough_sasl(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml");
Expand All @@ -116,7 +117,7 @@ async fn passthrough_sasl(#[case] driver: KafkaDriver) {
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn passthrough_sasl_encode(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml");
Expand All @@ -132,10 +133,11 @@ async fn passthrough_sasl_encode(#[case] driver: KafkaDriver) {
shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
// #[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
Expand All @@ -154,9 +156,10 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
// #[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")]
async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
Expand Down Expand Up @@ -189,21 +192,61 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
}
}

#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn cluster_2_racks_single_shotover(#[case] driver: KafkaDriver) {
//#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");
let shotover =
shotover_process("tests/test-configs/kafka/cluster-2-racks/topology-single.yaml")

// One shotover instance per rack
let mut shotovers = vec![];
for i in 1..3 {
shotovers.push(
shotover_process(&format!(
"tests/test-configs/kafka/cluster-2-racks/topology-rack{i}.yaml"
))
.with_config(&format!(
"tests/test-configs/shotover-config/config{i}.yaml"
))
.with_log_name(&format!("shotover{i}"))
.start()
.await;
.await,
);
}

let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

for shotover in shotovers {
tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}
}

#[cfg(feature = "rdkafka-driver-tests")]
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
// #[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn cluster_sasl_single_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-sasl/docker-compose.yaml");

let shotover = shotover_process("tests/test-configs/kafka/cluster-sasl/topology-single.yaml")
.start()
.await;

let connection_builder =
KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl("user", "password");
test_cases::basic(connection_builder).await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
Expand All @@ -212,20 +255,19 @@ async fn cluster_2_racks_single_shotover(#[case] driver: KafkaDriver) {
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")]
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
// #[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
async fn cluster_sasl_multi_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");

// One shotover instance per rack
docker_compose("tests/test-configs/kafka/cluster-sasl/docker-compose.yaml");
let mut shotovers = vec![];
for i in 1..3 {
for i in 1..4 {
shotovers.push(
shotover_process(&format!(
"tests/test-configs/kafka/cluster-2-racks/topology-rack{i}.yaml"
"tests/test-configs/kafka/cluster-sasl/topology{i}.yaml"
))
.with_config(&format!(
"tests/test-configs/shotover-config/config{i}.yaml"
Expand All @@ -236,7 +278,8 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
);
}

let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
let connection_builder =
KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl("user", "password");
test_cases::basic(connection_builder).await;

for shotover in shotovers {
Expand Down
4 changes: 2 additions & 2 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async fn produce_consume(connection_builder: &KafkaConnectionBuilder, topic_name
)
.await;

let consumer = connection_builder.connect_consumer(topic_name).await;
let mut consumer = connection_builder.connect_consumer(topic_name).await;

consumer
.assert_consume(ExpectedResponse {
Expand Down Expand Up @@ -118,7 +118,7 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
.await;
}

let consumer = connection_builder.connect_consumer(topic_name).await;
let mut consumer = connection_builder.connect_consumer(topic_name).await;

for j in 0..10 {
consumer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ sources:
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkCluster:
shotover_nodes: ["127.0.0.1:9192"]
shotover_nodes:
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ sources:
listen_addr: "127.0.0.1:9191"
chain:
- KafkaSinkCluster:
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
- "127.0.0.1:9193"
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ sources:
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkCluster:
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
- "127.0.0.1:9193"
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ sources:
listen_addr: "127.0.0.1:9193"
chain:
- KafkaSinkCluster:
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
- "127.0.0.1:9193"
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ sources:
listen_addr: "127.0.0.1:9191"
chain:
- KafkaSinkCluster:
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack1"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack2"
broker_id: 1
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ sources:
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkCluster:
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack1"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack2"
broker_id: 1
first_contact_points: ["172.16.1.5:9092"]
connect_timeout_ms: 3000
Loading

0 comments on commit 020164f

Please sign in to comment.