Skip to content

Commit

Permalink
update test case
Browse files Browse the repository at this point in the history
  • Loading branch information
justinweng-instaclustr committed Oct 3, 2024
1 parent d8a08ee commit 5fd897c
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 105 deletions.
20 changes: 11 additions & 9 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,8 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_2_racks_multi_shotover_offline(#[case] driver: KafkaDriver) {
let docker_compose =
async fn cluster_2_racks_multi_shotover_one_shotover_node_goes_down(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");

// One shotover instance per rack
Expand All @@ -425,15 +425,17 @@ async fn cluster_2_racks_multi_shotover_offline(#[case] driver: KafkaDriver) {
}

let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::cluster_test_suite(&connection_builder).await;

test_cases::produce_consume_partitions1_shotover_node_goes_down(
driver,
&docker_compose,
&connection_builder,
"shotover_node_goes_down_test",
"shotover1",
tokio::time::timeout(
Duration::from_secs(10),
shotovers.remove(0).shutdown_and_then_consume_events(&[]),
)
.await;
.await
.expect("Shotover did not shutdown within 10s");

// Wait for the other shotover node to detect the down peer
tokio::time::sleep(Duration::from_secs(10)).await;

for shotover in shotovers {
tokio::time::timeout(
Expand Down
96 changes: 0 additions & 96 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,102 +539,6 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
}
}

pub async fn produce_consume_partitions1_shotover_node_goes_down(
driver: KafkaDriver,
docker_compose: &DockerCompose,
connection_builder: &KafkaConnectionBuilder,
topic_name: &str,
shotover_node_to_kill: &str,
) {
if driver.is_cpp() {
return;
}
{
let admin = connection_builder.connect_admin().await;
admin
.create_topics(&[NewTopic {
name: topic_name,
num_partitions: 1,
replication_factor: 3,
}])
.await;
}

{
let producer = connection_builder.connect_producer("all", 0).await;
// create an initial record to force kafka to create the topic if it doesnt yet exist
producer
.assert_produce(
Record {
payload: "initial",
topic_name,
key: Some("Key".into()),
},
Some(0),
)
.await;

let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
.with_group("shotover_node_goes_down_test_group"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
message: "initial".to_owned(),
key: Some("Key".to_owned()),
topic_name: topic_name.to_owned(),
offset: Some(0),
})
.await;

docker_compose.kill_service(shotover_node_to_kill);

// create and consume records
for i in 0..5 {
producer
.assert_produce(
Record {
payload: "Message1",
topic_name,
key: Some("Key".into()),
},
Some(i * 2 + 1),
)
.await;
producer
.assert_produce(
Record {
payload: "Message2",
topic_name,
key: None,
},
Some(i * 2 + 2),
)
.await;

consumer
.assert_consume(ExpectedResponse {
message: "Message1".to_owned(),
key: Some("Key".to_owned()),
topic_name: topic_name.to_owned(),
offset: Some(i * 2 + 1),
})
.await;

consumer
.assert_consume(ExpectedResponse {
message: "Message2".to_owned(),
key: None,
topic_name: topic_name.to_owned(),
offset: Some(i * 2 + 2),
})
.await;
}
}
}

pub async fn produce_consume_commit_offsets_partitions1(
connection_builder: &KafkaConnectionBuilder,
topic_name: &str,
Expand Down

0 comments on commit 5fd897c

Please sign in to comment.