Skip to content

Commit

Permalink
kafka_int_tests: abstract admin API
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Mar 6, 2024
1 parent 35350ef commit 0f9c15e
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 205 deletions.
41 changes: 40 additions & 1 deletion shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,42 @@
use test_helpers::connection::kafka::{ExpectedResponse, KafkaConnectionBuilder, Record};
use test_helpers::connection::kafka::{
ExpectedResponse, KafkaConnectionBuilder, NewPartition, NewTopic, Record,
};

async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin
.create_topics(&[
NewTopic {
name: "partitions1",
num_partitions: 1,
replication_factor: 1,
},
NewTopic {
name: "paritions3",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "acks0",
num_partitions: 1,
replication_factor: 1,
},
NewTopic {
name: "to_delete",
num_partitions: 1,
replication_factor: 1,
},
])
.await;

admin
.create_partitions(&[NewPartition {
// TODO: modify topic "foo" instead so that we can test our handling of that with interesting partition + replication count
topic_name: "to_delete",
new_partition_count: 2,
}])
.await;
}

async fn produce_consume(connection_builder: &KafkaConnectionBuilder, topic_name: &str, i: i64) {
let producer = connection_builder.connect_producer(1).await;
Expand Down Expand Up @@ -76,6 +114,7 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
}

pub async fn basic(connection_builder: KafkaConnectionBuilder) {
admin_setup(&connection_builder).await;
connection_builder.admin_setup().await;
for i in 0..2 {
produce_consume(&connection_builder, "partitions1", i).await;
Expand Down
115 changes: 59 additions & 56 deletions test-helpers/src/connection/kafka/cpp.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Allow direct usage of the APIs when the feature is enabled
pub use rdkafka;

use super::{ExpectedResponse, Record};
use super::{ExpectedResponse, NewPartition, Record};
use rdkafka::admin::AdminClient;
use rdkafka::admin::{
AdminOptions, AlterConfig, NewPartitions, NewTopic, OwnedResourceSpecifier, ResourceSpecifier,
Expand Down Expand Up @@ -64,63 +64,14 @@ impl KafkaConnectionBuilderCpp {
KafkaConsumerCpp { consumer }
}

pub async fn connect_admin(&self) -> AdminClient<DefaultClientContext> {
self.client.create().unwrap()
pub async fn connect_admin(&self) -> KafkaAdminCpp {
let admin = self.client.create().unwrap();
KafkaAdminCpp { admin }
}

// TODO: support for these admin operations needs to be added to the java driver wrapper and then this method can be deleted
pub async fn admin_setup(&self) {
let admin = self.connect_admin().await;
admin
.create_topics(
&[
NewTopic {
name: "partitions1",
num_partitions: 1,
replication: TopicReplication::Fixed(1),
config: vec![],
},
NewTopic {
name: "paritions3",
num_partitions: 3,
replication: TopicReplication::Fixed(1),
config: vec![],
},
NewTopic {
name: "acks0",
num_partitions: 1,
replication: TopicReplication::Fixed(1),
config: vec![],
},
NewTopic {
name: "to_delete",
num_partitions: 1,
replication: TopicReplication::Fixed(1),
config: vec![],
},
],
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();

let results = admin
.create_partitions(
&[NewPartitions {
// TODO: modify topic "foo" instead so that we can test our handling of that with interesting partiton + replication count
topic_name: "to_delete",
new_partition_count: 2,
assignment: None,
}],
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();
for result in results {
let result = result.unwrap();
assert_eq!(result, "to_delete")
}
let admin = self.connect_admin().await.admin;

let results = admin
.describe_configs(
Expand Down Expand Up @@ -171,8 +122,9 @@ impl KafkaConnectionBuilderCpp {
}
}

// TODO: support for these admin operations needs to be added to the java driver wrapper and then this method can be deleted
pub async fn admin_cleanup(&self) {
let admin = self.connect_admin().await;
let admin = self.connect_admin().await.admin;
let results = admin
// The cpp driver will lock up when running certain commands after a delete_groups if the delete_groups is targeted at a group that doesnt exist.
// So just make sure to run it against a group that does exist.
Expand Down Expand Up @@ -248,3 +200,54 @@ impl KafkaConsumerCpp {
assert_eq!(response.offset, message.offset());
}
}

pub struct KafkaAdminCpp {
// TODO: make private
pub admin: AdminClient<DefaultClientContext>,
}

impl KafkaAdminCpp {
pub async fn create_topics(&self, topics: &[super::NewTopic<'_>]) {
let topics: Vec<_> = topics
.iter()
.map(|topic| NewTopic {
name: topic.name,
num_partitions: topic.num_partitions,
replication: TopicReplication::Fixed(topic.replication_factor as i32),
config: vec![],
})
.collect();
self.admin
.create_topics(
&topics,
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();
}

pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) {
let partitions: Vec<_> = partitions
.iter()
.map(|partition| NewPartitions {
topic_name: partition.topic_name,
new_partition_count: partition.new_partition_count as usize,
assignment: None,
})
.collect();
let results = self
.admin
.create_partitions(
&partitions,
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();
for result in results {
let result = result.unwrap();
assert_eq!(result, "to_delete")
}
}
}
Loading

0 comments on commit 0f9c15e

Please sign in to comment.