Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix kafka windsock benches OOM #1539

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions shotover-proxy/benches/windsock/cloud/aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,21 +193,43 @@ impl Ec2InstanceWithDocker {
}

pub async fn run_container(&self, image: &str, envs: &[(String, String)]) {
    // Kill any leftover shotover process as part of pre-launch cleanup.
    // TODO: we need a way to ensure there are no shotover resources running.
    // Maybe `.run_shotover` could start both shotover and docker so that we are
    // free to kill shotover in this function
    let cleanup = "killall -w shotover-bin";
    self.run_container_inner(image, envs, cleanup).await;
}

/// Starts the docker container and shotover concurrently.
///
/// Shotover is always instantiated alongside a container to avoid race
/// conditions when cleaning up any existing shotover.
pub async fn run_container_and_shotover(
    self: Arc<Self>,
    image: &str,
    envs: &[(String, String)],
    topology: &str,
) -> RunningShotover {
    let container_instance = self.clone();
    let (_, running_shotover) = tokio::join!(
        container_instance.run_container_inner(image, envs, ""),
        self.run_shotover_inner(topology),
    );
    running_shotover
}

async fn run_container_inner(
&self,
image: &str,
envs: &[(String, String)],
extra_cleanup: &str,
) {
self.instance
.ssh()
.shell(
r#"
.shell(&format!(
r#"{extra_cleanup}
CONTAINERS=$(sudo docker ps -a -q)
if [ -n "$CONTAINERS" ]
then
sudo docker stop $CONTAINERS
sudo docker rm $CONTAINERS
fi
sudo docker system prune -af"#,
)
sudo docker system prune -af"#
))
.await;

// start container
Expand Down Expand Up @@ -267,8 +289,7 @@ sudo docker system prune -af"#,
}
}

#[cfg(all(feature = "rdkafka-driver-tests", feature = "kafka"))]
pub async fn run_shotover(self: Arc<Self>, topology: &str) -> RunningShotover {
pub async fn run_shotover_inner(self: Arc<Self>, topology: &str) -> RunningShotover {
self.instance
.ssh()
.push_file_from_bytes(topology.as_bytes(), Path::new("topology.yaml"))
Expand Down
131 changes: 75 additions & 56 deletions shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,43 +136,7 @@ impl KafkaBench {
tasks.push(tokio::spawn(async move {
node.run_container(
"bitnami/kafka:3.6.1-debian-11-r24",
&[
("ALLOW_PLAINTEXT_LISTENER".to_owned(), "yes".to_owned()),
(
"KAFKA_CFG_ADVERTISED_LISTENERS".to_owned(),
format!("BROKER://{ip}:{port}"),
),
(
"KAFKA_CFG_LISTENERS".to_owned(),
format!("BROKER://:{port},CONTROLLER://:9093"),
),
(
"KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
"CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT".to_owned(),
),
(
"KAFKA_CFG_INTER_BROKER_LISTENER_NAME".to_owned(),
"BROKER".to_owned(),
),
(
"KAFKA_CFG_CONTROLLER_LISTENER_NAMES".to_owned(),
"CONTROLLER".to_owned(),
),
(
"KAFKA_CFG_PROCESS_ROLES".to_owned(),
"controller,broker".to_owned(),
),
(
"KAFKA_HEAP_OPTS".to_owned(),
"-Xmx4096M -Xms4096M".to_owned(),
),
("KAFKA_CFG_NODE_ID".to_owned(), i.to_string()),
(
"KAFKA_KRAFT_CLUSTER_ID".to_owned(),
"abcdefghijklmnopqrstuv".to_owned(),
),
("KAFKA_CFG_CONTROLLER_QUORUM_VOTERS".to_owned(), voters),
],
&Self::kafka_config(i, ip, port, voters),
)
.await;
}));
Expand All @@ -182,6 +146,47 @@ impl KafkaBench {
}
}

/// Builds the environment variables for a single kafka container running in
/// combined KRaft controller+broker mode (used with the bitnami kafka image).
///
/// * `id` - the KRaft node id of this broker.
/// * `ip` - the address other brokers/clients use to reach this broker's
///   advertised BROKER listener.
/// * `port` - the port the BROKER listener binds to and advertises.
/// * `voters` - value for `KAFKA_CFG_CONTROLLER_QUORUM_VOTERS`,
///   e.g. `0@10.0.0.1:9093`.
fn kafka_config(id: usize, ip: String, port: i16, voters: String) -> Vec<(String, String)> {
    // NOTE: `ip` is already an owned `String`, so the previous
    // `let ip = ip.to_string();` was a redundant allocation and has been removed.
    vec![
        ("ALLOW_PLAINTEXT_LISTENER".to_owned(), "yes".to_owned()),
        (
            "KAFKA_CFG_ADVERTISED_LISTENERS".to_owned(),
            format!("BROKER://{ip}:{port}"),
        ),
        (
            "KAFKA_CFG_LISTENERS".to_owned(),
            // The controller listener is fixed at 9093; `voters` must agree.
            format!("BROKER://:{port},CONTROLLER://:9093"),
        ),
        (
            "KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
            "CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT".to_owned(),
        ),
        (
            "KAFKA_CFG_INTER_BROKER_LISTENER_NAME".to_owned(),
            "BROKER".to_owned(),
        ),
        (
            "KAFKA_CFG_CONTROLLER_LISTENER_NAMES".to_owned(),
            "CONTROLLER".to_owned(),
        ),
        (
            // Every node acts as both controller and broker.
            "KAFKA_CFG_PROCESS_ROLES".to_owned(),
            "controller,broker".to_owned(),
        ),
        (
            // Fixed heap size so the JVM does not size itself off host memory.
            "KAFKA_HEAP_OPTS".to_owned(),
            "-Xmx4096M -Xms4096M".to_owned(),
        ),
        ("KAFKA_CFG_NODE_ID".to_owned(), id.to_string()),
        (
            "KAFKA_KRAFT_CLUSTER_ID".to_owned(),
            "abcdefghijklmnopqrstuv".to_owned(),
        ),
        ("KAFKA_CFG_CONTROLLER_QUORUM_VOTERS".to_owned(), voters),
    ]
}

async fn run_aws_shotover_on_own_instance(
&self,
shotover_instance: Option<Arc<Ec2InstanceWithShotover>>,
Expand All @@ -204,16 +209,30 @@ impl KafkaBench {

async fn run_aws_shotover_colocated_with_kafka(
&self,
instance: Arc<Ec2InstanceWithDocker>,
instances: Vec<Arc<Ec2InstanceWithDocker>>,
) -> Option<RunningShotover> {
let instance = instances[0].clone();
let id = 0;
let ip = instance.instance.private_ip().to_string();
let port = 9192;
let voters = format!("{id}@{ip}:9093");
let config = Self::kafka_config(id, ip.clone(), port, voters);
let image = "bitnami/kafka:3.6.1-debian-11-r24";

match self.shotover {
Shotover::Standard | Shotover::ForcedMessageParsed => {
let topology =
self.generate_topology_yaml(format!("{ip}:9092"), format!("{ip}:9192"));
Some(instance.run_shotover(&topology).await)
Some(
instance
.run_container_and_shotover(image, &config, &topology)
.await,
)
}
Shotover::None => {
instance.run_container(image, &config).await;
None
}
Shotover::None => None,
}
}

Expand Down Expand Up @@ -332,22 +351,22 @@ impl Bench for KafkaBench {
)
.await;

let (_, running_shotover) =
futures::join!(self.run_aws_kafka(kafka_instances.clone()), async {
match self.topology {
KafkaTopology::Single => {
self.run_aws_shotover_colocated_with_kafka(kafka_instances[0].clone())
.await
}
KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => {
self.run_aws_shotover_on_own_instance(
shotover_instance,
kafka_instances[0].clone(),
)
.await
}
}
});
let (_, running_shotover) = match self.topology {
KafkaTopology::Single => (
(),
self.run_aws_shotover_colocated_with_kafka(kafka_instances)
.await,
),
KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => {
futures::join!(
self.run_aws_kafka(kafka_instances.clone()),
self.run_aws_shotover_on_own_instance(
shotover_instance,
kafka_instances[0].clone(),
)
)
}
};

let destination_address =
if let Shotover::Standard | Shotover::ForcedMessageParsed = self.shotover {
Expand Down
3 changes: 2 additions & 1 deletion shotover-proxy/benches/windsock/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
any(
not(feature = "cassandra"),
not(feature = "redis"),
not(all(feature = "rdkafka-driver-tests", feature = "kafka"))
not(all(feature = "rdkafka-driver-tests", feature = "kafka")),
not(feature = "opensearch"),
),
allow(dead_code, unused_imports, unused_variables, unused_mut)
)]
Expand Down