From 03d1477f8d5257315ac7759d6272c534aed14007 Mon Sep 17 00:00:00 2001 From: Lucas Kent <rubickent@gmail.com> Date: Wed, 30 Oct 2024 07:10:31 +1100 Subject: [PATCH] fix tests with down shotover node --- shotover-proxy/tests/kafka_int_tests/mod.rs | 6 ++-- .../tests/kafka_int_tests/test_cases.rs | 36 +++++++++++++++---- test-helpers/src/connection/kafka/mod.rs | 2 +- 3 files changed, 34 insertions(+), 10 deletions(-) diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index e046aad48..3059529c7 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -426,7 +426,7 @@ async fn cluster_1_rack_multi_shotover_with_1_shotover_down(#[case] driver: Kafk // create a new connection and produce and consume messages let new_connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9193"); - test_cases::cluster_test_suite(&new_connection_builder).await; + test_cases::cluster_test_suite_with_lost_shotover_node(&new_connection_builder).await; let mut expected_events = multi_shotover_events(); // Other shotover nodes should detect the killed node at least once @@ -485,7 +485,7 @@ async fn cluster_3_racks_multi_shotover_with_2_shotover_down(#[case] driver: Kaf // create a new connection and produce and consume messages let new_connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9193"); - test_cases::cluster_test_suite(&new_connection_builder).await; + test_cases::cluster_test_suite_with_lost_shotover_node(&new_connection_builder).await; let mut expected_events = multi_shotover_events(); // The UP shotover node should detect the killed nodes at least once @@ -537,7 +537,7 @@ async fn cluster_3_racks_multi_shotover_with_1_shotover_missing(#[case] driver: // Send some produce and consume requests let connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9192"); - test_cases::cluster_test_suite(&connection_builder).await; + test_cases::cluster_test_suite_with_lost_shotover_node(&connection_builder).await; let mut expected_events = multi_shotover_events(); // Other shotover nodes should detect the missing node at least once diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index e96746b5a..4779e405a 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -1326,7 +1326,7 @@ async fn test_produce_consume_10_times(producer: &mut KafkaProducer, consumer: & } } -pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) { +async fn standard_test_suite_base(connection_builder: &KafkaConnectionBuilder) { admin_setup(connection_builder).await; produce_consume_partitions1(connection_builder, "partitions1").await; produce_consume_partitions1(connection_builder, "unknown_topic").await; @@ -1363,9 +1363,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) { .await; produce_consume_partitions1(connection_builder, "partitions1").await; - // rdkafka-rs doesnt support these methods list_offsets(&admin).await; - list_groups(connection_builder, &admin).await; } produce_consume_acks0(connection_builder).await; @@ -1439,7 +1437,8 @@ async fn list_offsets(admin: &KafkaAdmin) { assert_eq!(results, expected); } -async fn list_groups(connection_builder: &KafkaConnectionBuilder, admin: &KafkaAdmin) { +async fn list_groups(connection_builder: &KafkaConnectionBuilder) { + let admin = connection_builder.connect_admin().await; let mut consumer = connection_builder .connect_consumer( ConsumerConfig::consume_from_topics(vec!["partitions1".to_owned()]) @@ -1461,8 +1460,7 @@ async fn list_groups(connection_builder: &KafkaConnectionBuilder, admin: &KafkaA } } -pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) { - standard_test_suite(connection_builder).await; +async fn cluster_test_suite_base(connection_builder: &KafkaConnectionBuilder) { let admin = connection_builder.connect_admin().await; admin .create_topics_and_wait(&[ @@ -1482,6 +1480,32 @@ pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) { produce_consume_partitions3(connection_builder, "partitions3_rf3", 1, 500).await; } +pub async fn tests_requiring_all_shotover_nodes(connection_builder: &KafkaConnectionBuilder) { + // rdkafka-rs doesnt support these methods + #[allow(irrefutable_let_patterns)] + if let KafkaConnectionBuilder::Java(_) = connection_builder { + list_groups(connection_builder).await; + } +} + +pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) { + standard_test_suite_base(connection_builder).await; + tests_requiring_all_shotover_nodes(connection_builder).await; +} + +pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) { + standard_test_suite_base(connection_builder).await; + cluster_test_suite_base(connection_builder).await; + tests_requiring_all_shotover_nodes(connection_builder).await; +} + +pub async fn cluster_test_suite_with_lost_shotover_node( + connection_builder: &KafkaConnectionBuilder, +) { + standard_test_suite_base(connection_builder).await; + cluster_test_suite_base(connection_builder).await; +} + pub async fn setup_basic_user_acls(connection: &KafkaConnectionBuilder, username: &str) { let admin = connection.connect_admin().await; admin diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index 2b10cacb7..839e67ec0 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -456,7 +456,7 @@ impl KafkaAdmin { pub async fn list_groups(&self) -> Vec<String> { match self { #[cfg(feature = "kafka-cpp-driver-tests")] - Self::Cpp(_) => panic!("rdkafka-rs driver does not support list_offsets"), + Self::Cpp(_) => panic!("rdkafka-rs driver does not support list_groups"), Self::Java(java) => java.list_groups().await, } }