Skip to content

Commit

Permalink
fix tests with down shotover node
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Oct 30, 2024
1 parent 8dd584d commit 03d1477
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 10 deletions.
6 changes: 3 additions & 3 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ async fn cluster_1_rack_multi_shotover_with_1_shotover_down(#[case] driver: Kafk

// create a new connection and produce and consume messages
let new_connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9193");
test_cases::cluster_test_suite(&new_connection_builder).await;
test_cases::cluster_test_suite_with_lost_shotover_node(&new_connection_builder).await;

let mut expected_events = multi_shotover_events();
// Other shotover nodes should detect the killed node at least once
Expand Down Expand Up @@ -485,7 +485,7 @@ async fn cluster_3_racks_multi_shotover_with_2_shotover_down(#[case] driver: Kaf

// create a new connection and produce and consume messages
let new_connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9193");
test_cases::cluster_test_suite(&new_connection_builder).await;
test_cases::cluster_test_suite_with_lost_shotover_node(&new_connection_builder).await;

let mut expected_events = multi_shotover_events();
// The UP shotover node should detect the killed nodes at least once
Expand Down Expand Up @@ -537,7 +537,7 @@ async fn cluster_3_racks_multi_shotover_with_1_shotover_missing(#[case] driver:

// Send some produce and consume requests
let connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9192");
test_cases::cluster_test_suite(&connection_builder).await;
test_cases::cluster_test_suite_with_lost_shotover_node(&connection_builder).await;

let mut expected_events = multi_shotover_events();
// Other shotover nodes should detect the missing node at least once
Expand Down
36 changes: 30 additions & 6 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,7 @@ async fn test_produce_consume_10_times(producer: &mut KafkaProducer, consumer: &
}
}

pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
async fn standard_test_suite_base(connection_builder: &KafkaConnectionBuilder) {
admin_setup(connection_builder).await;
produce_consume_partitions1(connection_builder, "partitions1").await;
produce_consume_partitions1(connection_builder, "unknown_topic").await;
Expand Down Expand Up @@ -1363,9 +1363,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
.await;
produce_consume_partitions1(connection_builder, "partitions1").await;

// rdkafka-rs doesnt support these methods
list_offsets(&admin).await;
list_groups(connection_builder, &admin).await;
}

produce_consume_acks0(connection_builder).await;
Expand Down Expand Up @@ -1439,7 +1437,8 @@ async fn list_offsets(admin: &KafkaAdmin) {
assert_eq!(results, expected);
}

async fn list_groups(connection_builder: &KafkaConnectionBuilder, admin: &KafkaAdmin) {
async fn list_groups(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topics(vec!["partitions1".to_owned()])
Expand All @@ -1461,8 +1460,7 @@ async fn list_groups(connection_builder: &KafkaConnectionBuilder, admin: &KafkaA
}
}

pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
standard_test_suite(connection_builder).await;
async fn cluster_test_suite_base(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin
.create_topics_and_wait(&[
Expand All @@ -1482,6 +1480,32 @@ pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
produce_consume_partitions3(connection_builder, "partitions3_rf3", 1, 500).await;
}

pub async fn tests_requiring_all_shotover_nodes(connection_builder: &KafkaConnectionBuilder) {
// rdkafka-rs doesnt support these methods
#[allow(irrefutable_let_patterns)]
if let KafkaConnectionBuilder::Java(_) = connection_builder {
list_groups(connection_builder).await;
}
}

pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
standard_test_suite_base(connection_builder).await;
tests_requiring_all_shotover_nodes(connection_builder).await;
}

pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
standard_test_suite_base(connection_builder).await;
cluster_test_suite_base(connection_builder).await;
tests_requiring_all_shotover_nodes(connection_builder).await;
}

pub async fn cluster_test_suite_with_lost_shotover_node(
connection_builder: &KafkaConnectionBuilder,
) {
standard_test_suite_base(connection_builder).await;
cluster_test_suite_base(connection_builder).await;
}

pub async fn setup_basic_user_acls(connection: &KafkaConnectionBuilder, username: &str) {
let admin = connection.connect_admin().await;
admin
Expand Down
2 changes: 1 addition & 1 deletion test-helpers/src/connection/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ impl KafkaAdmin {
pub async fn list_groups(&self) -> Vec<String> {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(_) => panic!("rdkafka-rs driver does not support list_offsets"),
Self::Cpp(_) => panic!("rdkafka-rs driver does not support list_groups"),
Self::Java(java) => java.list_groups().await,
}
}
Expand Down

0 comments on commit 03d1477

Please sign in to comment.