From d6beae420588a322e0f11eb7f155ea6196795f12 Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Wed, 20 Mar 2024 13:50:49 +1100
Subject: [PATCH] fix kafka windsock benches OOM

---
 shotover-proxy/benches/windsock/cloud/aws.rs  |  39 ++++--
 .../benches/windsock/kafka/bench.rs           | 131 ++++++++++--------
 shotover-proxy/benches/windsock/main.rs       |   3 +-
 3 files changed, 107 insertions(+), 66 deletions(-)

diff --git a/shotover-proxy/benches/windsock/cloud/aws.rs b/shotover-proxy/benches/windsock/cloud/aws.rs
index f204ccef3..cddc4d7e7 100644
--- a/shotover-proxy/benches/windsock/cloud/aws.rs
+++ b/shotover-proxy/benches/windsock/cloud/aws.rs
@@ -193,21 +193,43 @@ impl Ec2InstanceWithDocker {
     }
 
     pub async fn run_container(&self, image: &str, envs: &[(String, String)]) {
-        // cleanup old resources
-        // TODO: we need a way to ensure there are no shotover resources running.
-        //       Maybe `.run_shotover` could start both shotover and docker so that we are free to kill shotover in this function
+        self.run_container_inner(image, envs, "killall -w shotover-bin")
+            .await;
+    }
+
+    // Shotover is always instantiated alongside a container to avoid race conditions when cleaning up any existing shotover.
+    pub async fn run_container_and_shotover(
+        self: Arc<Self>,
+        image: &str,
+        envs: &[(String, String)],
+        topology: &str,
+    ) -> RunningShotover {
+        let self2 = self.clone();
+        tokio::join!(
+            self2.run_container_inner(image, envs, ""),
+            self.run_shotover_inner(topology),
+        )
+        .1
+    }
+
+    async fn run_container_inner(
+        &self,
+        image: &str,
+        envs: &[(String, String)],
+        extra_cleanup: &str,
+    ) {
         self.instance
             .ssh()
-            .shell(
-                r#"
+            .shell(&format!(
+                r#"{extra_cleanup}
 CONTAINERS=$(sudo docker ps -a -q)
 if [ -n "$CONTAINERS" ]
 then
     sudo docker stop $CONTAINERS
     sudo docker rm $CONTAINERS
 fi
-sudo docker system prune -af"#,
-            )
+sudo docker system prune -af"#
+            ))
             .await;
 
         // start container
@@ -267,8 +289,7 @@ sudo docker system prune -af"#,
         }
     }
 
-    #[cfg(all(feature = "rdkafka-driver-tests", feature = "kafka"))]
-    pub async fn run_shotover(self: Arc<Self>, topology: &str) -> RunningShotover {
+    pub async fn run_shotover_inner(self: Arc<Self>, topology: &str) -> RunningShotover {
         self.instance
             .ssh()
             .push_file_from_bytes(topology.as_bytes(), Path::new("topology.yaml"))
diff --git a/shotover-proxy/benches/windsock/kafka/bench.rs b/shotover-proxy/benches/windsock/kafka/bench.rs
index 13f99c86b..75fd803c2 100644
--- a/shotover-proxy/benches/windsock/kafka/bench.rs
+++ b/shotover-proxy/benches/windsock/kafka/bench.rs
@@ -136,43 +136,7 @@ impl KafkaBench {
             tasks.push(tokio::spawn(async move {
                 node.run_container(
                     "bitnami/kafka:3.6.1-debian-11-r24",
-                    &[
-                        ("ALLOW_PLAINTEXT_LISTENER".to_owned(), "yes".to_owned()),
-                        (
-                            "KAFKA_CFG_ADVERTISED_LISTENERS".to_owned(),
-                            format!("BROKER://{ip}:{port}"),
-                        ),
-                        (
-                            "KAFKA_CFG_LISTENERS".to_owned(),
-                            format!("BROKER://:{port},CONTROLLER://:9093"),
-                        ),
-                        (
-                            "KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
-                            "CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT".to_owned(),
-                        ),
-                        (
-                            "KAFKA_CFG_INTER_BROKER_LISTENER_NAME".to_owned(),
-                            "BROKER".to_owned(),
-                        ),
-                        (
-                            "KAFKA_CFG_CONTROLLER_LISTENER_NAMES".to_owned(),
-                            "CONTROLLER".to_owned(),
-                        ),
-                        (
-                            "KAFKA_CFG_PROCESS_ROLES".to_owned(),
-                            "controller,broker".to_owned(),
-                        ),
-                        (
-                            "KAFKA_HEAP_OPTS".to_owned(),
-                            "-Xmx4096M -Xms4096M".to_owned(),
-                        ),
-                        ("KAFKA_CFG_NODE_ID".to_owned(), i.to_string()),
-                        (
-                            "KAFKA_KRAFT_CLUSTER_ID".to_owned(),
-                            "abcdefghijklmnopqrstuv".to_owned(),
-                        ),
-                        ("KAFKA_CFG_CONTROLLER_QUORUM_VOTERS".to_owned(), voters),
-                    ],
+                    &Self::kafka_config(i, ip, port, voters),
                 )
                 .await;
             }));
@@ -182,6 +146,47 @@ impl KafkaBench {
         }
     }
 
+    fn kafka_config(id: usize, ip: String, port: i16, voters: String) -> Vec<(String, String)> {
+        let ip = ip.to_string();
+        vec![
+            ("ALLOW_PLAINTEXT_LISTENER".to_owned(), "yes".to_owned()),
+            (
+                "KAFKA_CFG_ADVERTISED_LISTENERS".to_owned(),
+                format!("BROKER://{ip}:{port}"),
+            ),
+            (
+                "KAFKA_CFG_LISTENERS".to_owned(),
+                format!("BROKER://:{port},CONTROLLER://:9093"),
+            ),
+            (
+                "KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
+                "CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT".to_owned(),
+            ),
+            (
+                "KAFKA_CFG_INTER_BROKER_LISTENER_NAME".to_owned(),
+                "BROKER".to_owned(),
+            ),
+            (
+                "KAFKA_CFG_CONTROLLER_LISTENER_NAMES".to_owned(),
+                "CONTROLLER".to_owned(),
+            ),
+            (
+                "KAFKA_CFG_PROCESS_ROLES".to_owned(),
+                "controller,broker".to_owned(),
+            ),
+            (
+                "KAFKA_HEAP_OPTS".to_owned(),
+                "-Xmx4096M -Xms4096M".to_owned(),
+            ),
+            ("KAFKA_CFG_NODE_ID".to_owned(), id.to_string()),
+            (
+                "KAFKA_KRAFT_CLUSTER_ID".to_owned(),
+                "abcdefghijklmnopqrstuv".to_owned(),
+            ),
+            ("KAFKA_CFG_CONTROLLER_QUORUM_VOTERS".to_owned(), voters),
+        ]
+    }
+
     async fn run_aws_shotover_on_own_instance(
         &self,
         shotover_instance: Option<Arc<Ec2InstanceWithShotover>>,
@@ -204,16 +209,30 @@ impl KafkaBench {
 
     async fn run_aws_shotover_colocated_with_kafka(
         &self,
-        instance: Arc<Ec2InstanceWithDocker>,
+        instances: Vec<Arc<Ec2InstanceWithDocker>>,
     ) -> Option<RunningShotover> {
+        let instance = instances[0].clone();
+        let id = 0;
         let ip = instance.instance.private_ip().to_string();
+        let port = 9192;
+        let voters = format!("{id}@{ip}:9093");
+        let config = Self::kafka_config(id, ip.clone(), port, voters);
+        let image = "bitnami/kafka:3.6.1-debian-11-r24";
+
         match self.shotover {
             Shotover::Standard | Shotover::ForcedMessageParsed => {
                 let topology =
                     self.generate_topology_yaml(format!("{ip}:9092"), format!("{ip}:9192"));
-                Some(instance.run_shotover(&topology).await)
+                Some(
+                    instance
+                        .run_container_and_shotover(image, &config, &topology)
+                        .await,
+                )
+            }
+            Shotover::None => {
+                instance.run_container(image, &config).await;
+                None
             }
-            Shotover::None => None,
         }
     }
 
@@ -332,22 +351,22 @@ impl Bench for KafkaBench {
         )
         .await;
 
-        let (_, running_shotover) =
-            futures::join!(self.run_aws_kafka(kafka_instances.clone()), async {
-                match self.topology {
-                    KafkaTopology::Single => {
-                        self.run_aws_shotover_colocated_with_kafka(kafka_instances[0].clone())
-                            .await
-                    }
-                    KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => {
-                        self.run_aws_shotover_on_own_instance(
-                            shotover_instance,
-                            kafka_instances[0].clone(),
-                        )
-                        .await
-                    }
-                }
-            });
+        let (_, running_shotover) = match self.topology {
+            KafkaTopology::Single => (
+                (),
+                self.run_aws_shotover_colocated_with_kafka(kafka_instances)
+                    .await,
+            ),
+            KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => {
+                futures::join!(
+                    self.run_aws_kafka(kafka_instances.clone()),
+                    self.run_aws_shotover_on_own_instance(
+                        shotover_instance,
+                        kafka_instances[0].clone(),
+                    )
+                )
+            }
+        };
 
         let destination_address =
             if let Shotover::Standard | Shotover::ForcedMessageParsed = self.shotover {
diff --git a/shotover-proxy/benches/windsock/main.rs b/shotover-proxy/benches/windsock/main.rs
index a33b589e7..198146369 100644
--- a/shotover-proxy/benches/windsock/main.rs
+++ b/shotover-proxy/benches/windsock/main.rs
@@ -3,7 +3,8 @@
     any(
         not(feature = "cassandra"),
         not(feature = "redis"),
-        not(all(feature = "rdkafka-driver-tests", feature = "kafka"))
+        not(all(feature = "rdkafka-driver-tests", feature = "kafka")),
+        not(feature = "opensearch"),
     ),
     allow(dead_code, unused_imports, unused_variables, unused_mut)
 )]