Skip to content

Commit

Permalink
Make KafkaSinkCluster rack aware (#1527)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Mar 14, 2024
1 parent 1d23452 commit 5523020
Show file tree
Hide file tree
Showing 16 changed files with 357 additions and 220 deletions.
8 changes: 6 additions & 2 deletions shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use itertools::Itertools;
use shotover::config::chain::TransformChainConfig;
use shotover::sources::SourceConfig;
use shotover::transforms::debug::force_parse::DebugForceEncodeConfig;
use shotover::transforms::kafka::sink_cluster::KafkaSinkClusterConfig;
use shotover::transforms::kafka::sink_cluster::{KafkaSinkClusterConfig, ShotoverNodeConfig};
use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig;
use shotover::transforms::TransformConfig;
use std::sync::Arc;
Expand Down Expand Up @@ -92,7 +92,11 @@ impl KafkaBench {
connect_timeout_ms: 3000,
read_timeout: None,
first_contact_points: vec![kafka_address],
shotover_nodes: vec![host_address.clone()],
shotover_nodes: vec![ShotoverNodeConfig {
address: host_address.parse().unwrap(),
rack: "rack1".into(),
broker_id: 0,
}],
tls: None,
sasl_enabled: Some(false),
}),
Expand Down
24 changes: 0 additions & 24 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,30 +192,6 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
}
}

#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
// #[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_2_racks_single_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");
let shotover =
shotover_process("tests/test-configs/kafka/cluster-2-racks/topology-single.yaml")
.start()
.await;

let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ sources:
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkCluster:
shotover_nodes: ["127.0.0.1:9192"]
shotover_nodes:
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ sources:
listen_addr: "127.0.0.1:9191"
chain:
- KafkaSinkCluster:
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
- "127.0.0.1:9193"
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ sources:
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkCluster:
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
- "127.0.0.1:9193"
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ sources:
listen_addr: "127.0.0.1:9193"
chain:
- KafkaSinkCluster:
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
- "127.0.0.1:9193"
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ sources:
listen_addr: "127.0.0.1:9191"
chain:
- KafkaSinkCluster:
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack1"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack2"
broker_id: 1
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ sources:
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkCluster:
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack1"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack2"
broker_id: 1
first_contact_points: ["172.16.1.5:9092"]
connect_timeout_ms: 3000

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ sources:
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
sasl_enabled: true
shotover_nodes: ["127.0.0.1:9192"]
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@ sources:
chain:
- KafkaSinkCluster:
sasl_enabled: true
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
- "127.0.0.1:9193"
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@ sources:
chain:
- KafkaSinkCluster:
sasl_enabled: true
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
- "127.0.0.1:9193"
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@ sources:
chain:
- KafkaSinkCluster:
sasl_enabled: true
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
- "127.0.0.1:9193"
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ sources:
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkCluster:
shotover_nodes: ["127.0.0.1:9192"]
shotover_nodes:
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
tls:
Expand Down
Loading

0 comments on commit 5523020

Please sign in to comment.