Skip to content

Commit

Permalink
Merge branch 'main' into kafka-node-connection-factory
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Feb 27, 2024
2 parents 53de9dc + 0e73a15 commit 7cfdb7b
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 7 deletions.
2 changes: 1 addition & 1 deletion shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ impl Bench for KafkaBench {
) -> Result<()> {
let config_dir = match self.topology {
KafkaTopology::Single | KafkaTopology::Cluster1 => "tests/test-configs/kafka/bench",
KafkaTopology::Cluster3 => "tests/test-configs/kafka/cluster",
KafkaTopology::Cluster3 => "tests/test-configs/kafka/cluster-1-rack",
};
let _compose = docker_compose(&format!("{}/docker-compose.yaml", config_dir));

Expand Down
70 changes: 64 additions & 6 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ async fn passthrough_sasl_encode() {

#[cfg(feature = "rdkafka-driver-tests")]
#[tokio::test]
async fn cluster_single_shotover() {
let _docker_compose = docker_compose("tests/test-configs/kafka/cluster/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/cluster/topology-single.yaml")
async fn cluster_1_rack_single_shotover() {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml")
.start()
.await;

Expand All @@ -144,13 +145,70 @@ async fn cluster_single_shotover() {

#[cfg(feature = "rdkafka-driver-tests")]
#[tokio::test]
async fn cluster_multi_shotover() {
let _docker_compose = docker_compose("tests/test-configs/kafka/cluster/docker-compose.yaml");
async fn cluster_1_rack_multi_shotover() {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
let mut shotovers = vec![];
for i in 1..4 {
shotovers.push(
shotover_process(&format!(
"tests/test-configs/kafka/cluster/topology{i}.yaml"
"tests/test-configs/kafka/cluster-1-rack/topology{i}.yaml"
))
.with_config(&format!(
"tests/test-configs/shotover-config/config{i}.yaml"
))
.with_log_name(&format!("shotover{i}"))
.start()
.await,
);
}

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
test_cases::basic(connection_builder).await;

for shotover in shotovers {
tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}
}

#[cfg(feature = "rdkafka-driver-tests")]
#[tokio::test]
async fn cluster_2_racks_single_shotover() {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");
let shotover =
shotover_process("tests/test-configs/kafka/cluster-2-racks/topology-single.yaml")
.start()
.await;

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
test_cases::basic(connection_builder).await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")]
#[tokio::test]
async fn cluster_2_racks_multi_shotover() {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");

// One shotover instance per rack
let mut shotovers = vec![];
for i in 1..3 {
shotovers.push(
shotover_process(&format!(
"tests/test-configs/kafka/cluster-2-racks/topology-rack{i}.yaml"
))
.with_config(&format!(
"tests/test-configs/shotover-config/config{i}.yaml"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
version: "3"
networks:
cluster_subnet:
name: cluster_subnet
driver: bridge
ipam:
driver: default
config:
- subnet: 172.16.1.0/24
gateway: 172.16.1.1
services:
kafka0:
image: &image 'bitnami/kafka:3.6.1-debian-11-r24'
networks:
cluster_subnet:
ipv4_address: 172.16.1.2
environment: &environment
KAFKA_CFG_LISTENERS: "BROKER://:9092,CONTROLLER://:9093"
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER"
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CFG_PROCESS_ROLES: "controller,broker"
KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv"
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093,3@kafka3:9093,4@kafka4:9093,5@kafka5:9093"
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_BROKER_RACK: "rack1"
ALLOW_PLAINTEXT_LISTENER: "yes"
volumes: &volumes
- type: tmpfs
target: /bitnami/kafka
kafka1:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.3
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.3:9092"
KAFKA_CFG_NODE_ID: 1
KAFKA_CFG_BROKER_RACK: "rack1"
volumes: *volumes
kafka2:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.4
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.4:9092"
KAFKA_CFG_NODE_ID: 2
KAFKA_CFG_BROKER_RACK: "rack1"
volumes: *volumes
kafka3:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.5
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.5:9092"
KAFKA_CFG_NODE_ID: 3
KAFKA_CFG_BROKER_RACK: "rack2"
volumes: *volumes
kafka4:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.6
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.6:9092"
KAFKA_CFG_NODE_ID: 4
KAFKA_CFG_BROKER_RACK: "rack2"
volumes: *volumes
kafka5:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.7
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.7:9092"
KAFKA_CFG_NODE_ID: 5
KAFKA_CFG_BROKER_RACK: "rack2"
volumes: *volumes
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
sources:
- Kafka:
name: "kafka"
listen_addr: "127.0.0.1:9191"
chain:
- KafkaSinkCluster:
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
sources:
- Kafka:
name: "kafka"
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkCluster:
shotover_nodes:
- "127.0.0.1:9191"
- "127.0.0.1:9192"
first_contact_points: ["172.16.1.5:9092"]
connect_timeout_ms: 3000
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
sources:
- Kafka:
name: "kafka"
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkCluster:
shotover_nodes: ["127.0.0.1:9192"]
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000

0 comments on commit 7cfdb7b

Please sign in to comment.