From 3bb3dc8833bd2d16d74bbebf3c5c8e39214b25b3 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 21 Nov 2023 11:17:06 +1100 Subject: [PATCH] Fix kafka benches --- shotover-proxy/benches/windsock/aws/mod.rs | 57 ++++++++++++++-------- shotover-proxy/benches/windsock/kafka.rs | 39 +++++++++------ 2 files changed, 60 insertions(+), 36 deletions(-) diff --git a/shotover-proxy/benches/windsock/aws/mod.rs b/shotover-proxy/benches/windsock/aws/mod.rs index 6a54e5047..9d0dec428 100644 --- a/shotover-proxy/benches/windsock/aws/mod.rs +++ b/shotover-proxy/benches/windsock/aws/mod.rs @@ -109,6 +109,16 @@ curl -sSL https://get.docker.com/ | sudo sh"#, ) .await; + let local_shotover_path = bin_path!("shotover-proxy"); + instance + .ssh() + .push_file(local_shotover_path, Path::new("shotover-bin")) + .await; + instance + .ssh() + .push_file(Path::new("config/config.yaml"), Path::new("config.yaml")) + .await; + let instance = Arc::new(Ec2InstanceWithDocker { instance }); (*self.docker_instances.write().await).push(instance.clone()); instance @@ -162,6 +172,7 @@ sudo apt-get install -y sysstat"#, } } +/// Despite the name can also run shotover pub struct Ec2InstanceWithDocker { pub instance: Ec2Instance, } @@ -239,6 +250,15 @@ sudo docker system prune -af"#, } } } + + #[cfg(feature = "rdkafka-driver-tests")] + pub async fn run_shotover(self: Arc, topology: &str) -> RunningShotover { + self.instance + .ssh() + .push_file_from_bytes(topology.as_bytes(), Path::new("topology.yaml")) + .await; + RunningShotover::new(&self.instance).await + } } fn get_compatible_instance_type() -> InstanceType { @@ -278,17 +298,26 @@ impl Ec2InstanceWithShotover { .ssh() .push_file_from_bytes(topology.as_bytes(), Path::new("topology.yaml")) .await; + RunningShotover::new(&self.instance).await + } +} +pub struct RunningShotover { + shutdown_tx: tokio::sync::mpsc::UnboundedSender<()>, + event_rx: tokio::sync::mpsc::UnboundedReceiver, +} + +impl RunningShotover { + async fn new(instance: &Ec2Instance) -> Self { let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel(); let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel(); + let mut receiver = instance + .ssh() + .shell_stdout_lines(r#" + killall -w shotover-bin > /dev/null || true + RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topology.yaml --log-format json"#) + .await; tokio::task::spawn(async move { - let mut receiver = self - .instance - .ssh() - .shell_stdout_lines(r#" -killall -w shotover-bin > /dev/null || true -RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topology.yaml --log-format json"#) - .await; loop { tokio::select! { line = receiver.recv() => { @@ -310,17 +339,11 @@ RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topolo } }, _ = shutdown_rx.recv() => { - // shutdown_tx is dropped, instructing us to shutdown - // we MUST drop self before dropping event_tx to ensure that the Arc clone is dropped before the task indicates that it has terminated. - // Otherwise we may hit a race condition and fail the assertion that there is only one Arc clone alive. - std::mem::drop(self); - std::mem::drop(event_tx); return; }, } } }); - // wait for shotover to startup loop { let event = event_rx @@ -334,20 +357,12 @@ RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topolo break; } } - RunningShotover { shutdown_tx, event_rx, } } -} -pub struct RunningShotover { - shutdown_tx: tokio::sync::mpsc::UnboundedSender<()>, - event_rx: tokio::sync::mpsc::UnboundedReceiver, -} - -impl RunningShotover { pub async fn shutdown(mut self) { // dropping shutdown_tx instructs the task to shutdown causing shotover to be terminated std::mem::drop(self.shutdown_tx); diff --git a/shotover-proxy/benches/windsock/kafka.rs b/shotover-proxy/benches/windsock/kafka.rs index 50421a38b..1dec4bd59 100644 --- a/shotover-proxy/benches/windsock/kafka.rs +++ b/shotover-proxy/benches/windsock/kafka.rs @@ -1,4 +1,4 @@ -use crate::aws::{Ec2InstanceWithDocker, Ec2InstanceWithShotover}; +use crate::aws::Ec2InstanceWithDocker; use crate::common::{self, Shotover}; use crate::profilers::{self, CloudProfilerRunner, ProfilerRunner}; use crate::shotover::shotover_process_custom_topology; @@ -69,14 +69,14 @@ impl KafkaBench { async fn run_aws_shotover( &self, - instance: Arc, + instance: Arc, kafka_ip: String, ) -> Option { let ip = instance.instance.private_ip().to_string(); match self.shotover { Shotover::Standard | Shotover::ForcedMessageParsed => { let topology = - self.generate_topology_yaml(format!("{ip}:9092"), format!("{kafka_ip}:9092")); + self.generate_topology_yaml(format!("{ip}:9092"), format!("{kafka_ip}:9192")); Some(instance.run_shotover(&topology).await) } Shotover::None => None, @@ -128,28 +128,33 @@ impl Bench for KafkaBench { ("kafka".to_owned(), &kafka_instance.instance), ] .into(); - if let Shotover::ForcedMessageParsed | Shotover::Standard = self.shotover { - profiler_instances.insert("shotover".to_owned(), &shotover_instance.instance); - } + + // TODO: enable when testing KafkaSinkCluster + //profiler_instances.insert("shotover".to_owned(), &shotover_instance.instance); + let mut profiler = CloudProfilerRunner::new(self.name(), profiling, profiler_instances).await; let kafka_ip = kafka_instance.instance.private_ip().to_string(); - let shotover_ip = shotover_instance.instance.private_ip().to_string(); + // TODO: make use of this when we start benching KafkaSinkCluster + let _shotover_ip = shotover_instance.instance.private_ip().to_string(); let (_, running_shotover) = futures::join!( - run_aws_kafka(kafka_instance), - self.run_aws_shotover(shotover_instance, kafka_ip.clone()) + run_aws_kafka(kafka_instance.clone(), 9192), + self.run_aws_shotover(kafka_instance, kafka_ip.clone()) ); - let destination_ip = if running_shotover.is_some() { - shotover_ip + let destination_address = if running_shotover.is_some() { + format!("{kafka_ip}:9092") } else { - kafka_ip + format!("{kafka_ip}:9192") }; bench_instance - .run_bencher(&self.run_args(&destination_ip, ¶meters), &self.name()) + .run_bencher( + &self.run_args(&destination_address, ¶meters), + &self.name(), + ) .await; profiler.finish(); @@ -263,7 +268,7 @@ impl Bench for KafkaBench { } } -async fn run_aws_kafka(instance: Arc) { +async fn run_aws_kafka(instance: Arc, port: i16) { let ip = instance.instance.private_ip().to_string(); instance .run_container( @@ -272,7 +277,11 @@ async fn run_aws_kafka(instance: Arc) { ("ALLOW_PLAINTEXT_LISTENER".to_owned(), "yes".to_owned()), ( "KAFKA_CFG_ADVERTISED_LISTENERS".to_owned(), - format!("PLAINTEXT://{ip}:9092"), + format!("PLAINTEXT://{ip}:{port}"), + ), + ( + "KAFKA_CFG_LISTENERS".to_owned(), + format!("PLAINTEXT://:{port},CONTROLLER://:9093"), ), ("KAFKA_HEAP_OPTS".to_owned(), "-Xmx512M -Xms512M".to_owned()), ],