Skip to content

Commit

Permalink
Fix kafka benches
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Nov 21, 2023
1 parent 312f287 commit 644ea72
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 37 deletions.
57 changes: 36 additions & 21 deletions shotover-proxy/benches/windsock/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ curl -sSL https://get.docker.com/ | sudo sh"#,
)
.await;

let local_shotover_path = bin_path!("shotover-proxy");
instance
.ssh()
.push_file(local_shotover_path, Path::new("shotover-bin"))
.await;
instance
.ssh()
.push_file(Path::new("config/config.yaml"), Path::new("config.yaml"))
.await;

let instance = Arc::new(Ec2InstanceWithDocker { instance });
(*self.docker_instances.write().await).push(instance.clone());
instance
Expand Down Expand Up @@ -162,6 +172,7 @@ sudo apt-get install -y sysstat"#,
}
}

/// Despite the name can also run shotover
pub struct Ec2InstanceWithDocker {
pub instance: Ec2Instance,
}
Expand Down Expand Up @@ -239,6 +250,15 @@ sudo docker system prune -af"#,
}
}
}

#[cfg(feature = "rdkafka-driver-tests")]
pub async fn run_shotover(self: Arc<Self>, topology: &str) -> RunningShotover {
self.instance
.ssh()
.push_file_from_bytes(topology.as_bytes(), Path::new("topology.yaml"))
.await;
RunningShotover::new(&self.instance).await
}
}

fn get_compatible_instance_type() -> InstanceType {
Expand Down Expand Up @@ -278,17 +298,26 @@ impl Ec2InstanceWithShotover {
.ssh()
.push_file_from_bytes(topology.as_bytes(), Path::new("topology.yaml"))
.await;
RunningShotover::new(&self.instance).await
}
}

pub struct RunningShotover {
shutdown_tx: tokio::sync::mpsc::UnboundedSender<()>,
event_rx: tokio::sync::mpsc::UnboundedReceiver<Event>,
}

impl RunningShotover {
async fn new(instance: &Ec2Instance) -> Self {
let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
let (event_tx, mut event_rx) = tokio::sync::mpsc::unbounded_channel();
let mut receiver = instance
.ssh()
.shell_stdout_lines(r#"
killall -w shotover-bin > /dev/null || true
RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topology.yaml --log-format json"#)
.await;
tokio::task::spawn(async move {
let mut receiver = self
.instance
.ssh()
.shell_stdout_lines(r#"
killall -w shotover-bin > /dev/null || true
RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topology.yaml --log-format json"#)
.await;
loop {
tokio::select! {
line = receiver.recv() => {
Expand All @@ -310,17 +339,11 @@ RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topolo
}
},
_ = shutdown_rx.recv() => {
// shutdown_tx is dropped, instructing us to shutdown
// we MUST drop self before dropping event_tx to ensure that the Arc<Ec2InstanceWithShotover> clone is dropped before the task indicates that it has terminated.
// Otherwise we may hit a race condition and fail the assertion that there is only one Arc<Ec2InstanceWithShotover> clone alive.
std::mem::drop(self);
std::mem::drop(event_tx);
return;
},
}
}
});

// wait for shotover to startup
loop {
let event = event_rx
Expand All @@ -334,20 +357,12 @@ RUST_BACKTRACE=1 ./shotover-bin --config-file config.yaml --topology-file topolo
break;
}
}

RunningShotover {
shutdown_tx,
event_rx,
}
}
}

pub struct RunningShotover {
shutdown_tx: tokio::sync::mpsc::UnboundedSender<()>,
event_rx: tokio::sync::mpsc::UnboundedReceiver<Event>,
}

impl RunningShotover {
pub async fn shutdown(mut self) {
// dropping shutdown_tx instructs the task to shutdown causing shotover to be terminated
std::mem::drop(self.shutdown_tx);
Expand Down
41 changes: 25 additions & 16 deletions shotover-proxy/benches/windsock/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::aws::{Ec2InstanceWithDocker, Ec2InstanceWithShotover};
use crate::aws::Ec2InstanceWithDocker;
use crate::common::{self, Shotover};
use crate::profilers::{self, CloudProfilerRunner, ProfilerRunner};
use crate::shotover::shotover_process_custom_topology;
Expand Down Expand Up @@ -69,14 +69,14 @@ impl KafkaBench {

async fn run_aws_shotover(
&self,
instance: Arc<Ec2InstanceWithShotover>,
instance: Arc<Ec2InstanceWithDocker>,
kafka_ip: String,
) -> Option<crate::aws::RunningShotover> {
let ip = instance.instance.private_ip().to_string();
match self.shotover {
Shotover::Standard | Shotover::ForcedMessageParsed => {
let topology =
self.generate_topology_yaml(format!("{ip}:9092"), format!("{kafka_ip}:9092"));
self.generate_topology_yaml(format!("{ip}:9092"), format!("{kafka_ip}:9192"));
Some(instance.run_shotover(&topology).await)
}
Shotover::None => None,
Expand Down Expand Up @@ -123,33 +123,38 @@ impl Bench for KafkaBench {
aws.create_shotover_instance()
);

let mut profiler_instances: HashMap<String, &Ec2Instance> = [
let profiler_instances: HashMap<String, &Ec2Instance> = [
("bencher".to_owned(), &bench_instance.instance),
("kafka".to_owned(), &kafka_instance.instance),
]
.into();
if let Shotover::ForcedMessageParsed | Shotover::Standard = self.shotover {
profiler_instances.insert("shotover".to_owned(), &shotover_instance.instance);
}

// TODO: enable when testing KafkaSinkCluster
//profiler_instances.insert("shotover".to_owned(), &shotover_instance.instance);

let mut profiler =
CloudProfilerRunner::new(self.name(), profiling, profiler_instances).await;

let kafka_ip = kafka_instance.instance.private_ip().to_string();
let shotover_ip = shotover_instance.instance.private_ip().to_string();
// TODO: make use of this when we start benching KafkaSinkCluster
let _shotover_ip = shotover_instance.instance.private_ip().to_string();

let (_, running_shotover) = futures::join!(
run_aws_kafka(kafka_instance),
self.run_aws_shotover(shotover_instance, kafka_ip.clone())
run_aws_kafka(kafka_instance.clone(), 9192),
self.run_aws_shotover(kafka_instance, kafka_ip.clone())
);

let destination_ip = if running_shotover.is_some() {
shotover_ip
let destination_address = if running_shotover.is_some() {
format!("{kafka_ip}:9092")
} else {
kafka_ip
format!("{kafka_ip}:9192")
};

bench_instance
.run_bencher(&self.run_args(&destination_ip, &parameters), &self.name())
.run_bencher(
&self.run_args(&destination_address, &parameters),
&self.name(),
)
.await;

profiler.finish();
Expand Down Expand Up @@ -263,7 +268,7 @@ impl Bench for KafkaBench {
}
}

async fn run_aws_kafka(instance: Arc<Ec2InstanceWithDocker>) {
async fn run_aws_kafka(instance: Arc<Ec2InstanceWithDocker>, port: i16) {
let ip = instance.instance.private_ip().to_string();
instance
.run_container(
Expand All @@ -272,7 +277,11 @@ async fn run_aws_kafka(instance: Arc<Ec2InstanceWithDocker>) {
("ALLOW_PLAINTEXT_LISTENER".to_owned(), "yes".to_owned()),
(
"KAFKA_CFG_ADVERTISED_LISTENERS".to_owned(),
format!("PLAINTEXT://{ip}:9092"),
format!("PLAINTEXT://{ip}:{port}"),
),
(
"KAFKA_CFG_LISTENERS".to_owned(),
format!("PLAINTEXT://:{port},CONTROLLER://:9093"),
),
("KAFKA_HEAP_OPTS".to_owned(), "-Xmx512M -Xms512M".to_owned()),
],
Expand Down

0 comments on commit 644ea72

Please sign in to comment.