Skip to content

Commit

Permalink
fix kafka windsock benches OOM
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Mar 20, 2024
1 parent 0a15793 commit 3239d6b
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 67 deletions.
39 changes: 30 additions & 9 deletions shotover-proxy/benches/windsock/cloud/aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,21 +193,43 @@ impl Ec2InstanceWithDocker {
}

pub async fn run_container(&self, image: &str, envs: &[(String, String)]) {
// cleanup old resources
// TODO: we need a way to ensure there are no shotover resources running.
// Maybe `.run_shotover` could start both shotover and docker so that we are free to kill shotover in this function
self.run_container_inner(image, envs, "killall -w shotover-bin")
.await;
}

// Shotover is always instantiated alongside a container to avoid race conditions when cleaning up any existing shotover.
pub async fn run_container_and_shotover(
self: Arc<Self>,
image: &str,
envs: &[(String, String)],
topology: &str,
) -> RunningShotover {
let self2 = self.clone();
tokio::join!(
self2.run_container_inner(image, envs, ""),
self.run_shotover_inner(topology),
)
.1
}

async fn run_container_inner(
&self,
image: &str,
envs: &[(String, String)],
extra_cleanup: &str,
) {
self.instance
.ssh()
.shell(
r#"
.shell(&format!(
r#"{extra_cleanup}
CONTAINERS=$(sudo docker ps -a -q)
if [ -n "$CONTAINERS" ]
then
sudo docker stop $CONTAINERS
sudo docker rm $CONTAINERS
fi
sudo docker system prune -af"#,
)
sudo docker system prune -af"#
))
.await;

// start container
Expand Down Expand Up @@ -267,8 +289,7 @@ sudo docker system prune -af"#,
}
}

#[cfg(all(feature = "rdkafka-driver-tests", feature = "kafka"))]
pub async fn run_shotover(self: Arc<Self>, topology: &str) -> RunningShotover {
pub async fn run_shotover_inner(self: Arc<Self>, topology: &str) -> RunningShotover {
self.instance
.ssh()
.push_file_from_bytes(topology.as_bytes(), Path::new("topology.yaml"))
Expand Down
136 changes: 79 additions & 57 deletions shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use shotover::transforms::debug::force_parse::DebugForceEncodeConfig;
use shotover::transforms::kafka::sink_cluster::{KafkaSinkClusterConfig, ShotoverNodeConfig};
use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig;
use shotover::transforms::TransformConfig;
use std::net::IpAddr;
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};
use test_helpers::connection::kafka::cpp::rdkafka::admin::{
Expand Down Expand Up @@ -129,50 +130,14 @@ impl KafkaBench {
.join(",");
let mut tasks = vec![];
for (i, node) in nodes.into_iter().enumerate() {
let ip = node.instance.private_ip().to_string();
let ip = node.instance.private_ip();
let port = 9192;
let voters = voters.clone();

tasks.push(tokio::spawn(async move {
node.run_container(
"bitnami/kafka:3.6.1-debian-11-r24",
&[
("ALLOW_PLAINTEXT_LISTENER".to_owned(), "yes".to_owned()),
(
"KAFKA_CFG_ADVERTISED_LISTENERS".to_owned(),
format!("BROKER://{ip}:{port}"),
),
(
"KAFKA_CFG_LISTENERS".to_owned(),
format!("BROKER://:{port},CONTROLLER://:9093"),
),
(
"KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
"CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT".to_owned(),
),
(
"KAFKA_CFG_INTER_BROKER_LISTENER_NAME".to_owned(),
"BROKER".to_owned(),
),
(
"KAFKA_CFG_CONTROLLER_LISTENER_NAMES".to_owned(),
"CONTROLLER".to_owned(),
),
(
"KAFKA_CFG_PROCESS_ROLES".to_owned(),
"controller,broker".to_owned(),
),
(
"KAFKA_HEAP_OPTS".to_owned(),
"-Xmx4096M -Xms4096M".to_owned(),
),
("KAFKA_CFG_NODE_ID".to_owned(), i.to_string()),
(
"KAFKA_KRAFT_CLUSTER_ID".to_owned(),
"abcdefghijklmnopqrstuv".to_owned(),
),
("KAFKA_CFG_CONTROLLER_QUORUM_VOTERS".to_owned(), voters),
],
&Self::kafka_config(i, ip, port, voters),
)
.await;
}));
Expand All @@ -182,6 +147,47 @@ impl KafkaBench {
}
}

fn kafka_config(id: usize, ip: IpAddr, port: i16, voters: String) -> Vec<(String, String)> {
let ip = ip.to_string();
vec![
("ALLOW_PLAINTEXT_LISTENER".to_owned(), "yes".to_owned()),
(
"KAFKA_CFG_ADVERTISED_LISTENERS".to_owned(),
format!("BROKER://{ip}:{port}"),
),
(
"KAFKA_CFG_LISTENERS".to_owned(),
format!("BROKER://:{port},CONTROLLER://:9093"),
),
(
"KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
"CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT".to_owned(),
),
(
"KAFKA_CFG_INTER_BROKER_LISTENER_NAME".to_owned(),
"BROKER".to_owned(),
),
(
"KAFKA_CFG_CONTROLLER_LISTENER_NAMES".to_owned(),
"CONTROLLER".to_owned(),
),
(
"KAFKA_CFG_PROCESS_ROLES".to_owned(),
"controller,broker".to_owned(),
),
(
"KAFKA_HEAP_OPTS".to_owned(),
"-Xmx4096M -Xms4096M".to_owned(),
),
("KAFKA_CFG_NODE_ID".to_owned(), id.to_string()),
(
"KAFKA_KRAFT_CLUSTER_ID".to_owned(),
"abcdefghijklmnopqrstuv".to_owned(),
),
("KAFKA_CFG_CONTROLLER_QUORUM_VOTERS".to_owned(), voters),
]
}

async fn run_aws_shotover_on_own_instance(
&self,
shotover_instance: Option<Arc<Ec2InstanceWithShotover>>,
Expand All @@ -204,16 +210,32 @@ impl KafkaBench {

async fn run_aws_shotover_colocated_with_kafka(
&self,
instance: Arc<Ec2InstanceWithDocker>,
instances: Vec<Arc<Ec2InstanceWithDocker>>,
) -> Option<RunningShotover> {
let instance = instances[0].clone();
let id = 0;
let ip = instance.instance.private_ip();
let port = 9192;
//let voters = format!("{id}@{ip}:9093");
let voters = String::new();
let config = Self::kafka_config(id, ip, port, voters);
let image = "bitnami/kafka:3.6.1-debian-11-r24";

let ip = instance.instance.private_ip().to_string();
match self.shotover {
Shotover::Standard | Shotover::ForcedMessageParsed => {
let topology =
self.generate_topology_yaml(format!("{ip}:9092"), format!("{ip}:9192"));
Some(instance.run_shotover(&topology).await)
Some(
instance
.run_container_and_shotover(image, &config, &topology)
.await,
)
}
Shotover::None => {
instance.run_container(image, &config).await;
None
}
Shotover::None => None,
}
}

Expand Down Expand Up @@ -332,22 +354,22 @@ impl Bench for KafkaBench {
)
.await;

let (_, running_shotover) =
futures::join!(self.run_aws_kafka(kafka_instances.clone()), async {
match self.topology {
KafkaTopology::Single => {
self.run_aws_shotover_colocated_with_kafka(kafka_instances[0].clone())
.await
}
KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => {
self.run_aws_shotover_on_own_instance(
shotover_instance,
kafka_instances[0].clone(),
)
.await
}
}
});
let (_, running_shotover) = match self.topology {
KafkaTopology::Single => (
(),
self.run_aws_shotover_colocated_with_kafka(kafka_instances)
.await,
),
KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => {
futures::join!(
self.run_aws_kafka(kafka_instances.clone()),
self.run_aws_shotover_on_own_instance(
shotover_instance,
kafka_instances[0].clone(),
)
)
}
};

let destination_address =
if let Shotover::Standard | Shotover::ForcedMessageParsed = self.shotover {
Expand Down
3 changes: 2 additions & 1 deletion shotover-proxy/benches/windsock/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
any(
not(feature = "cassandra"),
not(feature = "redis"),
not(all(feature = "rdkafka-driver-tests", feature = "kafka"))
not(all(feature = "rdkafka-driver-tests", feature = "kafka")),
not(feature = "opensearch"),
),
allow(dead_code, unused_imports, unused_variables, unused_mut)
)]
Expand Down

0 comments on commit 3239d6b

Please sign in to comment.