diff --git a/shotover-proxy/benches/windsock/cassandra.rs b/shotover-proxy/benches/windsock/cassandra.rs index eceb452ac..00d8365d6 100644 --- a/shotover-proxy/benches/windsock/cassandra.rs +++ b/shotover-proxy/benches/windsock/cassandra.rs @@ -1,6 +1,6 @@ use crate::{ aws::{Ec2InstanceWithDocker, Ec2InstanceWithShotover, RunningShotover}, - common::{self, Shotover}, + common::{self, Shotover, Topology}, profilers::{self, CloudProfilerRunner, ProfilerRunner}, }; use anyhow::Result; @@ -74,12 +74,6 @@ enum CassandraDbInstance { Mocked(MockHandle), } -#[derive(Clone, PartialEq)] -pub enum Topology { - Single, - Cluster3, -} - struct CoreCount { /// Cores to be assigned to the bench's tokio runtime bench: usize, @@ -455,13 +449,7 @@ impl Bench for CassandraBench { CassandraDb::Mocked => "cassandra-mocked".to_owned(), }, ), - ( - "topology".to_owned(), - match &self.topology { - Topology::Single => "single".to_owned(), - Topology::Cluster3 => "cluster3".to_owned(), - }, - ), + self.topology.to_tag(), self.shotover.to_tag(), ( "operation".to_owned(), @@ -564,7 +552,7 @@ impl Bench for CassandraBench { ]; let (_, running_shotover) = futures::join!( - run_aws_cassandra(cassandra_nodes, self.topology.clone()), + run_aws_cassandra(cassandra_nodes, self.topology), self.run_aws_shotover(shotover_instance.clone(), cassandra_ip.clone(),) ); diff --git a/shotover-proxy/benches/windsock/common.rs b/shotover-proxy/benches/windsock/common.rs index 1fd01728b..7c678f802 100644 --- a/shotover-proxy/benches/windsock/common.rs +++ b/shotover-proxy/benches/windsock/common.rs @@ -1,4 +1,4 @@ -use shotover::{config::topology::Topology, sources::SourceConfig}; +use shotover::{config::topology::Topology as ShotoverTopology, sources::SourceConfig}; #[derive(Clone, Copy)] pub enum Shotover { @@ -20,8 +20,26 @@ impl Shotover { } } +#[derive(Clone, Copy, PartialEq)] +pub enum Topology { + Single, + Cluster3, +} + +impl Topology { + pub fn to_tag(self) -> (String, String) { + ( + "topology".to_owned(), + match self { + Topology::Single => "single".to_owned(), + Topology::Cluster3 => "cluster3".to_owned(), + }, + ) + } +} + pub fn generate_topology(source: SourceConfig) -> String { - Topology { + ShotoverTopology { sources: vec![source], } .serialize() diff --git a/shotover-proxy/benches/windsock/kafka.rs b/shotover-proxy/benches/windsock/kafka.rs index 1e46c5420..13c0f9bb4 100644 --- a/shotover-proxy/benches/windsock/kafka.rs +++ b/shotover-proxy/benches/windsock/kafka.rs @@ -1,4 +1,4 @@ -use crate::aws::Ec2InstanceWithDocker; +use crate::aws::{Ec2InstanceWithDocker, Ec2InstanceWithShotover}; use crate::common::{self, Shotover}; use crate::profilers::{self, CloudProfilerRunner, ProfilerRunner}; use crate::shotover::shotover_process_custom_topology; @@ -9,6 +9,7 @@ use futures::StreamExt; use shotover::config::chain::TransformChainConfig; use shotover::sources::SourceConfig; use shotover::transforms::debug::force_parse::DebugForceEncodeConfig; +use shotover::transforms::kafka::sink_cluster::KafkaSinkClusterConfig; use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig; use shotover::transforms::TransformConfig; use std::sync::Arc; @@ -23,6 +24,7 @@ use windsock::{Bench, BenchParameters, Profiling, Report}; pub struct KafkaBench { shotover: Shotover, + topology: KafkaTopology, message_size: Size, } @@ -33,15 +35,36 @@ pub enum Size { KB100, } +#[derive(Clone, Copy, PartialEq)] +pub enum KafkaTopology { + Single, + Cluster1, + Cluster3, +} + +impl KafkaTopology { + pub fn to_tag(self) -> (String, String) { + ( + "topology".to_owned(), + match self { + KafkaTopology::Single => "single".to_owned(), + KafkaTopology::Cluster1 => "cluster1".to_owned(), + KafkaTopology::Cluster3 => "cluster3".to_owned(), + }, + ) + } +} + impl KafkaBench { - pub fn new(shotover: Shotover, message_size: Size) -> Self { + pub fn new(shotover: Shotover, topology: KafkaTopology, message_size: Size) -> Self { KafkaBench { shotover, + topology, message_size, } } - fn generate_topology_yaml(&self, host_address: String) -> String { + fn generate_topology_yaml(&self, host_address: String, kafka_address: String) -> String { let mut transforms = vec![]; if let Shotover::ForcedMessageParsed = self.shotover { transforms.push(Box::new(DebugForceEncodeConfig { @@ -50,12 +73,19 @@ impl KafkaBench { }) as Box); } - transforms.push(Box::new(KafkaSinkSingleConfig { - destination_port: 9192, - connect_timeout_ms: 3000, - read_timeout: None, - })); - + transforms.push(match self.topology { + KafkaTopology::Single => Box::new(KafkaSinkSingleConfig { + destination_port: 9192, + connect_timeout_ms: 3000, + read_timeout: None, + }), + KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => Box::new(KafkaSinkClusterConfig { + connect_timeout_ms: 3000, + read_timeout: None, + first_contact_points: vec![kafka_address], + shotover_nodes: vec![host_address.clone()], + }), + }); common::generate_topology(SourceConfig::Kafka(shotover::sources::kafka::KafkaConfig { name: "kafka".to_owned(), listen_addr: host_address, @@ -67,14 +97,101 @@ impl KafkaBench { })) } - async fn run_aws_shotover( + async fn run_aws_kafka(&self, nodes: Vec>) { + match self.topology { + KafkaTopology::Cluster3 => self.run_aws_kafka_cluster(nodes).await, + KafkaTopology::Cluster1 => self.run_aws_kafka_cluster(vec![nodes[0].clone()]).await, + KafkaTopology::Single => self.run_aws_kafka_single(nodes[0].clone()).await, + } + } + + async fn run_aws_kafka_cluster(&self, nodes: Vec>) { + for (i, node) in nodes.iter().enumerate() { + let ip = node.instance.private_ip().to_string(); + let port = 9192; + node.run_container( + "bitnami/kafka:3.4.0-debian-11-r22", + &[ + ("ALLOW_PLAINTEXT_LISTENER".to_owned(), "yes".to_owned()), + ( + "KAFKA_CFG_ADVERTISED_LISTENERS".to_owned(), + format!("PLAINTEXT://{ip}:{port}"), + ), + ( + "KAFKA_CFG_LISTENERS".to_owned(), + format!("PLAINTEXT://:{port},CONTROLLER://:9093"), + ), + ("KAFKA_HEAP_OPTS".to_owned(), "-Xmx512M -Xms512M".to_owned()), + ("KAFKA_CFG_NODE_ID".to_owned(), i.to_string()), + ( + "KAFKA_KRAFT_CLUSTER_ID".to_owned(), + "abcdefghijklmnopqrstuv".to_owned(), + ), + ( + "KAFKA_CFG_CONTROLLER_QUORUM_VOTERS".to_owned(), + format!( + "0@{}:9093,1@{}:9093,2@{}:9093", + nodes[0].instance.private_ip(), + nodes[1].instance.private_ip(), + nodes[2].instance.private_ip(), + ), + ), + ], + ) + .await; + } + } + + async fn run_aws_kafka_single(&self, instance: Arc) { + let ip = instance.instance.private_ip().to_string(); + let port = 9192; + instance + .run_container( + "bitnami/kafka:3.4.0-debian-11-r22", + &[ + ("ALLOW_PLAINTEXT_LISTENER".to_owned(), "yes".to_owned()), + ( + "KAFKA_CFG_ADVERTISED_LISTENERS".to_owned(), + format!("PLAINTEXT://{ip}:{port}"), + ), + ( + "KAFKA_CFG_LISTENERS".to_owned(), + format!("PLAINTEXT://:{port},CONTROLLER://:9093"), + ), + ("KAFKA_HEAP_OPTS".to_owned(), "-Xmx512M -Xms512M".to_owned()), + ], + ) + .await; + } + + async fn run_aws_shotover_on_own_instance( + &self, + shotover_instance: Arc, + kafka_instance: Arc, + ) -> Option { + let shotover_ip = shotover_instance.instance.private_ip().to_string(); + let kafka_ip = kafka_instance.instance.private_ip().to_string(); + match self.shotover { + Shotover::Standard | Shotover::ForcedMessageParsed => { + let topology = self.generate_topology_yaml( + format!("{shotover_ip}:9092"), + format!("{kafka_ip}:9192"), + ); + Some(shotover_instance.run_shotover(&topology).await) + } + Shotover::None => None, + } + } + + async fn run_aws_shotover_colocated_with_kafka( &self, instance: Arc, ) -> Option { let ip = instance.instance.private_ip().to_string(); match self.shotover { Shotover::Standard | Shotover::ForcedMessageParsed => { - let topology = self.generate_topology_yaml(format!("{ip}:9092")); + let topology = + self.generate_topology_yaml(format!("{ip}:9092"), format!("{ip}:9192")); Some(instance.run_shotover(&topology).await) } Shotover::None => None, @@ -91,7 +208,7 @@ impl Bench for KafkaBench { fn tags(&self) -> HashMap { [ ("name".to_owned(), "kafka".to_owned()), - ("topology".to_owned(), "single".to_owned()), + self.topology.to_tag(), self.shotover.to_tag(), match self.message_size { Size::B1 => ("size".to_owned(), "1B".to_owned()), @@ -115,35 +232,61 @@ impl Bench for KafkaBench { ) -> Result<()> { let aws = crate::aws::WindsockAws::get().await; - let (kafka_instance, bench_instance, shotover_instance) = futures::join!( + let (kafka_instance1, kafka_instance2, kafka_instance3, bench_instance, shotover_instance) = futures::join!( + aws.create_docker_instance(), + aws.create_docker_instance(), aws.create_docker_instance(), aws.create_bencher_instance(), aws.create_shotover_instance() ); - let profiler_instances: HashMap = [ - ("bencher".to_owned(), &bench_instance.instance), - ("kafka".to_owned(), &kafka_instance.instance), - ] - .into(); + let mut profiler_instances: HashMap = + [("bencher".to_owned(), &bench_instance.instance)].into(); - // TODO: enable when testing KafkaSinkCluster - //profiler_instances.insert("shotover".to_owned(), &shotover_instance.instance); + // only profile instances that we are actually using for this bench + match self.topology { + KafkaTopology::Single | KafkaTopology::Cluster1 => { + profiler_instances.insert("kafka".to_owned(), &kafka_instance1.instance); + profiler_instances.insert("shotover".to_owned(), &shotover_instance.instance); + } + KafkaTopology::Cluster3 => { + profiler_instances.insert("kafka1".to_owned(), &kafka_instance1.instance); + profiler_instances.insert("kafka2".to_owned(), &kafka_instance2.instance); + profiler_instances.insert("kafka3".to_owned(), &kafka_instance3.instance); + profiler_instances.insert("shotover".to_owned(), &shotover_instance.instance); + } + } let mut profiler = CloudProfilerRunner::new(self.name(), profiling, profiler_instances).await; - let kafka_ip = kafka_instance.instance.private_ip().to_string(); - // TODO: make use of this when we start benching KafkaSinkCluster - let _shotover_ip = shotover_instance.instance.private_ip().to_string(); + let kafka_ip = kafka_instance1.instance.private_ip().to_string(); + let shotover_ip = shotover_instance.instance.private_ip().to_string(); - let (_, running_shotover) = futures::join!( - run_aws_kafka(kafka_instance.clone(), 9192), - self.run_aws_shotover(kafka_instance) - ); + let kafka_instances = vec![ + kafka_instance1.clone(), + kafka_instance2.clone(), + kafka_instance3.clone(), + ]; + + let (_, running_shotover) = futures::join!(self.run_aws_kafka(kafka_instances), async { + match self.topology { + KafkaTopology::Single => { + self.run_aws_shotover_colocated_with_kafka(kafka_instance1) + .await + } + KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => { + self.run_aws_shotover_on_own_instance(shotover_instance, kafka_instance1) + .await + } + } + }); let destination_address = if running_shotover.is_some() { - format!("{kafka_ip}:9092") + match &self.topology { + KafkaTopology::Single => format!("{kafka_ip}:9092"), + KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => format!("{shotover_ip}:9092"), + } } else { format!("{kafka_ip}:9192") }; @@ -169,13 +312,22 @@ impl Bench for KafkaBench { profiling: Profiling, parameters: BenchParameters, ) -> Result<()> { - let config_dir = "tests/test-configs/kafka/bench"; + let config_dir = match self.topology { + KafkaTopology::Single | KafkaTopology::Cluster1 => "tests/test-configs/kafka/bench", + KafkaTopology::Cluster3 => "tests/test-configs/kafka/cluster", + }; let _compose = docker_compose(&format!("{}/docker-compose.yaml", config_dir)); + let kafka_address = match self.topology { + KafkaTopology::Single | KafkaTopology::Cluster1 => "127.0.0.1:9192", + KafkaTopology::Cluster3 => "172.16.1.2:9092", + }; + let mut profiler = ProfilerRunner::new(self.name(), profiling); let shotover = match self.shotover { Shotover::Standard | Shotover::ForcedMessageParsed => { - let topology_yaml = self.generate_topology_yaml("127.0.0.1:9092".to_owned()); + let topology_yaml = self + .generate_topology_yaml("127.0.0.1:9092".to_owned(), kafka_address.to_owned()); Some(shotover_process_custom_topology(&topology_yaml, &profiler).await) } Shotover::None => None, @@ -183,7 +335,7 @@ impl Bench for KafkaBench { let broker_address = match self.shotover { Shotover::ForcedMessageParsed | Shotover::Standard => "127.0.0.1:9092", - Shotover::None => "127.0.0.1:9192", + Shotover::None => kafka_address, }; profiler.run(&shotover).await; @@ -263,27 +415,6 @@ impl Bench for KafkaBench { } } -async fn run_aws_kafka(instance: Arc, port: i16) { - let ip = instance.instance.private_ip().to_string(); - instance - .run_container( - "bitnami/kafka:3.4.0-debian-11-r22", - &[ - ("ALLOW_PLAINTEXT_LISTENER".to_owned(), "yes".to_owned()), - ( - "KAFKA_CFG_ADVERTISED_LISTENERS".to_owned(), - format!("PLAINTEXT://{ip}:{port}"), - ), - ( - "KAFKA_CFG_LISTENERS".to_owned(), - format!("PLAINTEXT://:{port},CONTROLLER://:9093"), - ), - ("KAFKA_HEAP_OPTS".to_owned(), "-Xmx512M -Xms512M".to_owned()), - ], - ) - .await; -} - #[derive(Clone)] struct BenchTaskProducerKafka { message: Vec, diff --git a/shotover-proxy/benches/windsock/main.rs b/shotover-proxy/benches/windsock/main.rs index 7fe2549ef..87b55bbed 100644 --- a/shotover-proxy/benches/windsock/main.rs +++ b/shotover-proxy/benches/windsock/main.rs @@ -78,9 +78,16 @@ fn main() { Shotover::Standard, Shotover::ForcedMessageParsed ], - [Size::B1, Size::KB1, Size::KB100,] + [ + KafkaTopology::Single, + KafkaTopology::Cluster1, + KafkaTopology::Cluster3 + ], + [Size::B1, Size::KB1, Size::KB100] ) - .map(|(shotover, size)| Box::new(KafkaBench::new(shotover, size)) as Box); + .map(|(shotover, topology, size)| { + Box::new(KafkaBench::new(shotover, topology, size)) as Box + }); #[cfg(not(feature = "rdkafka-driver-tests"))] let kafka_benches = std::iter::empty();