From 944540ef23404ed8b28f0131c5778d730b91e3b5 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Fri, 25 Oct 2024 15:11:36 +1100 Subject: [PATCH] kafka_int_tests: cleanup delete_groups test --- .../tests/kafka_int_tests/test_cases.rs | 7 ++- test-helpers/src/connection/kafka/cpp.rs | 49 +++++++++---------- test-helpers/src/connection/kafka/java.rs | 10 ++++ test-helpers/src/connection/kafka/mod.rs | 16 +++--- 4 files changed, 47 insertions(+), 35 deletions(-) diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index cc38f343b..d0d9afb53 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -124,6 +124,11 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) { admin.delete_topics(&["to_delete"]).await } +async fn admin_cleanup(connection_builder: &KafkaConnectionBuilder) { + let admin = connection_builder.connect_admin().await; + admin.delete_groups(&["some_group"]).await; +} + /// Attempt to make the driver batch produce requests for different topics into the same request /// This is important to test since shotover has complex logic for splitting these batch requests into individual requests. pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnectionBuilder) { @@ -1358,7 +1363,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) { } produce_consume_acks0(connection_builder).await; - connection_builder.admin_cleanup().await; + admin_cleanup(connection_builder).await; } pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) { diff --git a/test-helpers/src/connection/kafka/cpp.rs b/test-helpers/src/connection/kafka/cpp.rs index 8a980e6d0..eb33ef63d 100644 --- a/test-helpers/src/connection/kafka/cpp.rs +++ b/test-helpers/src/connection/kafka/cpp.rs @@ -17,7 +17,6 @@ use rdkafka::client::DefaultClientContext; use rdkafka::config::ClientConfig; use rdkafka::consumer::{Consumer, StreamConsumer}; use rdkafka::producer::{FutureProducer, FutureRecord, Producer}; -use rdkafka::types::RDKafkaErrorCode; use rdkafka::util::Timeout; use rdkafka::{Message, TopicPartitionList}; use std::time::Duration; @@ -105,31 +104,6 @@ impl KafkaConnectionBuilderCpp { let admin = self.client.create().unwrap(); KafkaAdminCpp { admin } } - - // TODO: support for these admin operations needs to be added to the java driver wrapper and then this method can be deleted - pub async fn admin_cleanup(&self) { - let admin = self.connect_admin().await.admin; - let results = admin - // The cpp driver will lock up when running certain commands after a delete_groups if the delete_groups is targeted at a group that doesnt exist. - // So just make sure to run it against a group that does exist. - .delete_groups( - &["some_group"], - &AdminOptions::new() - .operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), - ) - .await - .unwrap(); - for result in results { - match result { - Ok(result) => assert_eq!(result, "some_group"), - Err(err) => assert_eq!( - err, - // Allow this error which can occur due to race condition in the test, but do not allow any other error types - ("some_group".to_owned(), RDKafkaErrorCode::NonEmptyGroup) - ), - } - } - } } pub struct KafkaProducerCpp { @@ -442,6 +416,29 @@ impl KafkaAdminCpp { } assert!(results.is_empty()); } + + pub async fn delete_groups(&self, to_delete: &[&str]) { + let results = self + .admin + // The cpp driver will lock up when running certain commands after a delete_groups if the delete_groups is targeted at a group that doesnt exist. + // So just make sure to run it against a group that does exist. + .delete_groups( + &["some_group"], + &AdminOptions::new() + .operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), + ) + .await + .unwrap(); + let mut results: Vec<_> = results.into_iter().map(|x| x.unwrap()).collect(); + for to_delete in to_delete { + if let Some(i) = results.iter().position(|x| x == to_delete) { + results.remove(i); + } else { + panic!("topic {} not in results", to_delete) + } + } + assert!(results.is_empty()); + } } fn resource_specifier<'a>(specifier: &'a super::ResourceSpecifier<'a>) -> ResourceSpecifier<'a> { diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index fcf76f449..00baf2b57 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -508,6 +508,16 @@ impl KafkaAdminJava { .await; } + pub async fn delete_groups(&self, to_delete: &[&str]) { + let to_delete: Vec = to_delete.iter().map(|x| self.jvm.new_string(x)).collect(); + let topics = self.jvm.new_list("java.lang.String", to_delete); + + self.admin + .call("deleteConsumerGroups", vec![topics]) + .call_async("all", vec![]) + .await; + } + pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) { let partitions: Vec<(Value, Value)> = partitions .iter() diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index 8d6b3ae70..f738ea2af 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -123,14 +123,6 @@ impl KafkaConnectionBuilder { .await .unwrap_err() } - - pub async fn admin_cleanup(&self) { - match self { - #[cfg(feature = "kafka-cpp-driver-tests")] - Self::Cpp(cpp) => cpp.admin_cleanup().await, - Self::Java(_) => {} - } - } } pub enum KafkaProducer { @@ -442,6 +434,14 @@ impl KafkaAdmin { } } + pub async fn delete_groups(&self, to_delete: &[&str]) { + match self { + #[cfg(feature = "kafka-cpp-driver-tests")] + Self::Cpp(cpp) => cpp.delete_groups(to_delete).await, + Self::Java(java) => java.delete_groups(to_delete).await, + } + } + pub async fn list_offsets( &self, topic_partitions: HashMap,