Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka_int_tests: cleanup delete_groups test #1784

Merged
merged 2 commits into from
Oct 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
admin.delete_topics(&["to_delete"]).await
}

async fn admin_cleanup(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin.delete_groups(&["some_group"]).await;
}

/// Attempt to make the driver batch produce requests for different topics into the same request
/// This is important to test since shotover has complex logic for splitting these batch requests into individual requests.
pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnectionBuilder) {
Expand Down Expand Up @@ -1358,7 +1363,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
}

produce_consume_acks0(connection_builder).await;
connection_builder.admin_cleanup().await;
admin_cleanup(connection_builder).await;
}

pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
Expand Down
49 changes: 23 additions & 26 deletions test-helpers/src/connection/kafka/cpp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::{FutureProducer, FutureRecord, Producer};
use rdkafka::types::RDKafkaErrorCode;
use rdkafka::util::Timeout;
use rdkafka::{Message, TopicPartitionList};
use std::time::Duration;
Expand Down Expand Up @@ -105,31 +104,6 @@ impl KafkaConnectionBuilderCpp {
let admin = self.client.create().unwrap();
KafkaAdminCpp { admin }
}

// TODO: support for these admin operations needs to be added to the java driver wrapper and then this method can be deleted
pub async fn admin_cleanup(&self) {
let admin = self.connect_admin().await.admin;
let results = admin
// The cpp driver will lock up when running certain commands after a delete_groups if the delete_groups is targeted at a group that doesnt exist.
// So just make sure to run it against a group that does exist.
.delete_groups(
&["some_group"],
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();
for result in results {
match result {
Ok(result) => assert_eq!(result, "some_group"),
Err(err) => assert_eq!(
err,
// Allow this error which can occur due to race condition in the test, but do not allow any other error types
("some_group".to_owned(), RDKafkaErrorCode::NonEmptyGroup)
),
}
}
}
}

pub struct KafkaProducerCpp {
Expand Down Expand Up @@ -442,6 +416,29 @@ impl KafkaAdminCpp {
}
assert!(results.is_empty());
}

pub async fn delete_groups(&self, to_delete: &[&str]) {
let results = self
.admin
// The cpp driver will lock up when running certain commands after a delete_groups if the delete_groups is targeted at a group that doesnt exist.
// So just make sure to run it against a group that does exist.
.delete_groups(
&["some_group"],
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();
let mut results: Vec<_> = results.into_iter().map(|x| x.unwrap()).collect();
for to_delete in to_delete {
if let Some(i) = results.iter().position(|x| x == to_delete) {
results.remove(i);
} else {
panic!("topic {} not in results", to_delete)
}
}
assert!(results.is_empty());
}
}

fn resource_specifier<'a>(specifier: &'a super::ResourceSpecifier<'a>) -> ResourceSpecifier<'a> {
Expand Down
10 changes: 10 additions & 0 deletions test-helpers/src/connection/kafka/java.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,16 @@ impl KafkaAdminJava {
.await;
}

pub async fn delete_groups(&self, to_delete: &[&str]) {
let to_delete: Vec<Value> = to_delete.iter().map(|x| self.jvm.new_string(x)).collect();
let topics = self.jvm.new_list("java.lang.String", to_delete);

self.admin
.call("deleteConsumerGroups", vec![topics])
.call_async("all", vec![])
.await;
}

pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) {
let partitions: Vec<(Value, Value)> = partitions
.iter()
Expand Down
16 changes: 8 additions & 8 deletions test-helpers/src/connection/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,6 @@ impl KafkaConnectionBuilder {
.await
.unwrap_err()
}

pub async fn admin_cleanup(&self) {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(cpp) => cpp.admin_cleanup().await,
Self::Java(_) => {}
}
}
}

pub enum KafkaProducer {
Expand Down Expand Up @@ -442,6 +434,14 @@ impl KafkaAdmin {
}
}

pub async fn delete_groups(&self, to_delete: &[&str]) {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(cpp) => cpp.delete_groups(to_delete).await,
Self::Java(java) => java.delete_groups(to_delete).await,
}
}

pub async fn list_offsets(
&self,
topic_partitions: HashMap<TopicPartition, OffsetSpec>,
Expand Down
Loading