Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka_int_tests: abstract admin API #1518

Merged
merged 1 commit into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,42 @@
use test_helpers::connection::kafka::{ExpectedResponse, KafkaConnectionBuilder, Record};
use test_helpers::connection::kafka::{
ExpectedResponse, KafkaConnectionBuilder, NewPartition, NewTopic, Record,
};

async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin
.create_topics(&[
NewTopic {
name: "partitions1",
num_partitions: 1,
replication_factor: 1,
},
NewTopic {
name: "paritions3",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "acks0",
num_partitions: 1,
replication_factor: 1,
},
NewTopic {
name: "to_delete",
num_partitions: 1,
replication_factor: 1,
},
])
.await;

admin
.create_partitions(&[NewPartition {
// TODO: modify topic "foo" instead so that we can test our handling of that with interesting partition + replication count
topic_name: "to_delete",
new_partition_count: 2,
}])
.await;
}

async fn produce_consume(connection_builder: &KafkaConnectionBuilder, topic_name: &str, i: i64) {
let producer = connection_builder.connect_producer(1).await;
Expand Down Expand Up @@ -76,6 +114,7 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
}

pub async fn basic(connection_builder: KafkaConnectionBuilder) {
admin_setup(&connection_builder).await;
connection_builder.admin_setup().await;
for i in 0..2 {
produce_consume(&connection_builder, "partitions1", i).await;
Expand Down
115 changes: 59 additions & 56 deletions test-helpers/src/connection/kafka/cpp.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Allow direct usage of the APIs when the feature is enabled
pub use rdkafka;

use super::{ExpectedResponse, Record};
use super::{ExpectedResponse, NewPartition, Record};
use rdkafka::admin::AdminClient;
use rdkafka::admin::{
AdminOptions, AlterConfig, NewPartitions, NewTopic, OwnedResourceSpecifier, ResourceSpecifier,
Expand Down Expand Up @@ -64,63 +64,14 @@ impl KafkaConnectionBuilderCpp {
KafkaConsumerCpp { consumer }
}

pub async fn connect_admin(&self) -> AdminClient<DefaultClientContext> {
self.client.create().unwrap()
pub async fn connect_admin(&self) -> KafkaAdminCpp {
let admin = self.client.create().unwrap();
KafkaAdminCpp { admin }
}

// TODO: support for these admin operations needs to be added to the java driver wrapper and then this method can be deleted
pub async fn admin_setup(&self) {
let admin = self.connect_admin().await;
admin
.create_topics(
&[
NewTopic {
name: "partitions1",
num_partitions: 1,
replication: TopicReplication::Fixed(1),
config: vec![],
},
NewTopic {
name: "paritions3",
num_partitions: 3,
replication: TopicReplication::Fixed(1),
config: vec![],
},
NewTopic {
name: "acks0",
num_partitions: 1,
replication: TopicReplication::Fixed(1),
config: vec![],
},
NewTopic {
name: "to_delete",
num_partitions: 1,
replication: TopicReplication::Fixed(1),
config: vec![],
},
],
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();

let results = admin
.create_partitions(
&[NewPartitions {
// TODO: modify topic "foo" instead so that we can test our handling of that with interesting partiton + replication count
topic_name: "to_delete",
new_partition_count: 2,
assignment: None,
}],
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();
for result in results {
let result = result.unwrap();
assert_eq!(result, "to_delete")
}
let admin = self.connect_admin().await.admin;

let results = admin
.describe_configs(
Expand Down Expand Up @@ -171,8 +122,9 @@ impl KafkaConnectionBuilderCpp {
}
}

// TODO: support for these admin operations needs to be added to the java driver wrapper and then this method can be deleted
pub async fn admin_cleanup(&self) {
let admin = self.connect_admin().await;
let admin = self.connect_admin().await.admin;
let results = admin
// The cpp driver will lock up when running certain commands after a delete_groups if the delete_groups is targeted at a group that doesnt exist.
// So just make sure to run it against a group that does exist.
Expand Down Expand Up @@ -248,3 +200,54 @@ impl KafkaConsumerCpp {
assert_eq!(response.offset, message.offset());
}
}

pub struct KafkaAdminCpp {
// TODO: make private
pub admin: AdminClient<DefaultClientContext>,
}

impl KafkaAdminCpp {
pub async fn create_topics(&self, topics: &[super::NewTopic<'_>]) {
let topics: Vec<_> = topics
.iter()
.map(|topic| NewTopic {
name: topic.name,
num_partitions: topic.num_partitions,
replication: TopicReplication::Fixed(topic.replication_factor as i32),
config: vec![],
})
.collect();
self.admin
.create_topics(
&topics,
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();
}

pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) {
let partitions: Vec<_> = partitions
.iter()
.map(|partition| NewPartitions {
topic_name: partition.topic_name,
new_partition_count: partition.new_partition_count as usize,
assignment: None,
})
.collect();
let results = self
.admin
.create_partitions(
&partitions,
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();
for result in results {
let result = result.unwrap();
assert_eq!(result, "to_delete")
}
}
}
Loading
Loading