From a691e606254e601b13de85e7ee7ae754392be2f9 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Wed, 6 Mar 2024 15:58:24 +1100 Subject: [PATCH] kafka_int_tests: abstract admin API --- .../tests/kafka_int_tests/test_cases.rs | 41 ++- test-helpers/src/connection/kafka/cpp.rs | 115 ++++---- test-helpers/src/connection/kafka/java.rs | 252 ++++++++---------- test-helpers/src/connection/kafka/mod.rs | 47 +++- 4 files changed, 250 insertions(+), 205 deletions(-) diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 93f77ddeb..e5e5051af 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -1,4 +1,42 @@ -use test_helpers::connection::kafka::{ExpectedResponse, KafkaConnectionBuilder, Record}; +use test_helpers::connection::kafka::{ + ExpectedResponse, KafkaConnectionBuilder, NewPartition, NewTopic, Record, +}; + +async fn admin_setup(connection_builder: &KafkaConnectionBuilder) { + let admin = connection_builder.connect_admin().await; + admin + .create_topics(&[ + NewTopic { + name: "partitions1", + num_partitions: 1, + replication_factor: 1, + }, + NewTopic { + name: "paritions3", + num_partitions: 3, + replication_factor: 1, + }, + NewTopic { + name: "acks0", + num_partitions: 1, + replication_factor: 1, + }, + NewTopic { + name: "to_delete", + num_partitions: 1, + replication_factor: 1, + }, + ]) + .await; + + admin + .create_partitions(&[NewPartition { + // TODO: modify topic "foo" instead so that we can test our handling of that with interesting partition + replication count + topic_name: "to_delete", + new_partition_count: 2, + }]) + .await; +} async fn produce_consume(connection_builder: &KafkaConnectionBuilder, topic_name: &str, i: i64) { let producer = connection_builder.connect_producer(1).await; @@ -76,6 +114,7 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) { } pub async fn basic(connection_builder: KafkaConnectionBuilder) { + admin_setup(&connection_builder).await; connection_builder.admin_setup().await; for i in 0..2 { produce_consume(&connection_builder, "partitions1", i).await; diff --git a/test-helpers/src/connection/kafka/cpp.rs b/test-helpers/src/connection/kafka/cpp.rs index 38da3b3d9..3d5680110 100644 --- a/test-helpers/src/connection/kafka/cpp.rs +++ b/test-helpers/src/connection/kafka/cpp.rs @@ -1,7 +1,7 @@ // Allow direct usage of the APIs when the feature is enabled pub use rdkafka; -use super::{ExpectedResponse, Record}; +use super::{ExpectedResponse, NewPartition, Record}; use rdkafka::admin::AdminClient; use rdkafka::admin::{ AdminOptions, AlterConfig, NewPartitions, NewTopic, OwnedResourceSpecifier, ResourceSpecifier, @@ -64,63 +64,14 @@ impl KafkaConnectionBuilderCpp { KafkaConsumerCpp { consumer } } - pub async fn connect_admin(&self) -> AdminClient { - self.client.create().unwrap() + pub async fn connect_admin(&self) -> KafkaAdminCpp { + let admin = self.client.create().unwrap(); + KafkaAdminCpp { admin } } + // TODO: support for these admin operations needs to be added to the java driver wrapper and then this method can be deleted pub async fn admin_setup(&self) { - let admin = self.connect_admin().await; - admin - .create_topics( - &[ - NewTopic { - name: "partitions1", - num_partitions: 1, - replication: TopicReplication::Fixed(1), - config: vec![], - }, - NewTopic { - name: "paritions3", - num_partitions: 3, - replication: TopicReplication::Fixed(1), - config: vec![], - }, - NewTopic { - name: "acks0", - num_partitions: 1, - replication: TopicReplication::Fixed(1), - config: vec![], - }, - NewTopic { - name: "to_delete", - num_partitions: 1, - replication: TopicReplication::Fixed(1), - config: vec![], - }, - ], - &AdminOptions::new() - .operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), - ) - .await - .unwrap(); - - let results = admin - .create_partitions( - &[NewPartitions { - // TODO: modify topic "foo" instead so that we can test our handling of that with interesting partiton + replication count - topic_name: "to_delete", - new_partition_count: 2, - assignment: None, - }], - &AdminOptions::new() - .operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), - ) - .await - .unwrap(); - for result in results { - let result = result.unwrap(); - assert_eq!(result, "to_delete") - } + let admin = self.connect_admin().await.admin; let results = admin .describe_configs( @@ -171,8 +122,9 @@ impl KafkaConnectionBuilderCpp { } } + // TODO: support for these admin operations needs to be added to the java driver wrapper and then this method can be deleted pub async fn admin_cleanup(&self) { - let admin = self.connect_admin().await; + let admin = self.connect_admin().await.admin; let results = admin // The cpp driver will lock up when running certain commands after a delete_groups if the delete_groups is targeted at a group that doesnt exist. // So just make sure to run it against a group that does exist. @@ -248,3 +200,54 @@ impl KafkaConsumerCpp { assert_eq!(response.offset, message.offset()); } } + +pub struct KafkaAdminCpp { + // TODO: make private + pub admin: AdminClient, +} + +impl KafkaAdminCpp { + pub async fn create_topics(&self, topics: &[super::NewTopic<'_>]) { + let topics: Vec<_> = topics + .iter() + .map(|topic| NewTopic { + name: topic.name, + num_partitions: topic.num_partitions, + replication: TopicReplication::Fixed(topic.replication_factor as i32), + config: vec![], + }) + .collect(); + self.admin + .create_topics( + &topics, + &AdminOptions::new() + .operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), + ) + .await + .unwrap(); + } + + pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) { + let partitions: Vec<_> = partitions + .iter() + .map(|partition| NewPartitions { + topic_name: partition.topic_name, + new_partition_count: partition.new_partition_count as usize, + assignment: None, + }) + .collect(); + let results = self + .admin + .create_partitions( + &partitions, + &AdminOptions::new() + .operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), + ) + .await + .unwrap(); + for result in results { + let result = result.unwrap(); + assert_eq!(result, "to_delete") + } + } +} diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index 6e11f070f..e0884abad 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -1,4 +1,4 @@ -use super::{ExpectedResponse, Record}; +use super::{ExpectedResponse, NewPartition, NewTopic, Record}; use j4rs::{Instance, InvocationArg, Jvm, JvmBuilder, MavenArtifact}; use std::{collections::HashMap, rc::Rc}; @@ -82,160 +82,21 @@ impl KafkaConnectionBuilderJava { KafkaConsumerJava {} } - pub async fn connect_admin(&self) -> Instance { + pub async fn connect_admin(&self) -> KafkaAdminJava { let properties = properties(&self.jvm, &self.base_config); - self.jvm + let admin = self + .jvm .invoke_static( "org.apache.kafka.clients.admin.Admin", "create", &[properties.into()], ) - .unwrap() - } - - pub async fn admin_setup(&self) { - let admin = self.connect_admin().await; - create_topics( - &self.jvm, - &admin, - &[ - NewTopic { - name: "partitions1", - num_partitions: 1, - replication_factor: 1, - }, - NewTopic { - name: "paritions3", - num_partitions: 3, - replication_factor: 1, - }, - NewTopic { - name: "acks0", - num_partitions: 1, - replication_factor: 1, - }, - NewTopic { - name: "to_delete", - num_partitions: 1, - replication_factor: 1, - }, - ], - ) - .await; - - create_partitions( - &self.jvm, - &admin, - &[NewPartition { - topic_name: "to_delete", - new_partition_count: 2, - }], - ) - .await; - } - - pub async fn admin_cleanup(&self) { - self.connect_admin().await; + .unwrap(); + let jvm = self.jvm.clone(); + KafkaAdminJava { jvm, admin } } } -struct NewTopic<'a> { - name: &'a str, - num_partitions: i32, - replication_factor: i16, -} - -struct NewPartition<'a> { - topic_name: &'a str, - new_partition_count: i32, -} - -async fn create_topics(jvm: &Jvm, admin_client: &Instance, topics: &[NewTopic<'_>]) { - let topics: Vec<_> = topics - .iter() - .map(|topic| { - jvm.create_instance( - "org.apache.kafka.clients.admin.NewTopic", - &[ - topic.name.try_into().unwrap(), - jvm.invoke_static( - "java.util.Optional", - "of", - &[InvocationArg::try_from(topic.num_partitions).unwrap()], - ) - .unwrap() - .into(), - jvm.invoke_static( - "java.util.Optional", - "of", - &[InvocationArg::try_from(topic.replication_factor).unwrap()], - ) - .unwrap() - .into(), - // TODO: can simplify to this once https://github.com/astonbitecode/j4rs/issues/91 is resolved - // InvocationArg::try_from(topic.num_partitions) - // .unwrap() - // .into_primitive() - // .unwrap(), - // InvocationArg::try_from(topic.replication_factor) - // .unwrap() - // .into_primitive() - // .unwrap(), - ], - ) - }) - .collect(); - let topics = jvm - .java_list("org.apache.kafka.clients.admin.NewTopic", topics) - .unwrap(); - - jvm.chain(admin_client) - .unwrap() - .invoke("createTopics", &[topics.into()]) - .unwrap() - .invoke("all", &[]) - .unwrap() - .invoke("get", &[]) - .unwrap() - .collect(); -} - -async fn create_partitions(jvm: &Jvm, admin_client: &Instance, partitions: &[NewPartition<'_>]) { - let partitions: HashMap<_, _> = partitions - .iter() - .map(|partition| { - ( - partition.topic_name, - jvm.invoke_static( - "org.apache.kafka.clients.admin.NewPartitions", - "increaseTo", - &[InvocationArg::try_from(partition.new_partition_count) - .unwrap() - .into_primitive() - .unwrap()], - ), - ) - }) - .collect(); - let partitions = jvm - .java_map( - "java.lang.String", - "org.apache.kafka.clients.admin.NewTopic", - partitions, - ) - .unwrap(); - - jvm.chain(admin_client) - .unwrap() - .invoke("createPartitions", &[partitions.into()]) - .unwrap() - .invoke("all", &[]) - .unwrap() - .invoke("get", &[]) - .unwrap() - .collect(); -} - pub struct KafkaProducerJava { _producer: Instance, } @@ -253,3 +114,102 @@ impl KafkaConsumerJava { tracing::error!("Unimplemented assert"); } } + +pub struct KafkaAdminJava { + jvm: Rc, + admin: Instance, +} + +impl KafkaAdminJava { + pub async fn create_topics(&self, topics: &[NewTopic<'_>]) { + let topics: Vec<_> = topics + .iter() + .map(|topic| { + self.jvm.create_instance( + "org.apache.kafka.clients.admin.NewTopic", + &[ + topic.name.try_into().unwrap(), + self.jvm + .invoke_static( + "java.util.Optional", + "of", + &[InvocationArg::try_from(topic.num_partitions).unwrap()], + ) + .unwrap() + .into(), + self.jvm + .invoke_static( + "java.util.Optional", + "of", + &[InvocationArg::try_from(topic.replication_factor).unwrap()], + ) + .unwrap() + .into(), + // TODO: can simplify to this once https://github.com/astonbitecode/j4rs/issues/91 is resolved + // InvocationArg::try_from(topic.num_partitions) + // .unwrap() + // .into_primitive() + // .unwrap(), + // InvocationArg::try_from(topic.replication_factor) + // .unwrap() + // .into_primitive() + // .unwrap(), + ], + ) + }) + .collect(); + let topics = self + .jvm + .java_list("org.apache.kafka.clients.admin.NewTopic", topics) + .unwrap(); + + self.jvm + .chain(&self.admin) + .unwrap() + .invoke("createTopics", &[topics.into()]) + .unwrap() + .invoke("all", &[]) + .unwrap() + .invoke("get", &[]) + .unwrap() + .collect(); + } + + pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) { + let partitions: HashMap<_, _> = partitions + .iter() + .map(|partition| { + ( + partition.topic_name, + self.jvm.invoke_static( + "org.apache.kafka.clients.admin.NewPartitions", + "increaseTo", + &[InvocationArg::try_from(partition.new_partition_count) + .unwrap() + .into_primitive() + .unwrap()], + ), + ) + }) + .collect(); + let partitions = self + .jvm + .java_map( + "java.lang.String", + "org.apache.kafka.clients.admin.NewTopic", + partitions, + ) + .unwrap(); + + self.jvm + .chain(&self.admin) + .unwrap() + .invoke("createPartitions", &[partitions.into()]) + .unwrap() + .invoke("all", &[]) + .unwrap() + .invoke("get", &[]) + .unwrap() + .collect(); + } +} diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index 1ccc19338..5b787cbbd 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -51,11 +51,19 @@ impl KafkaConnectionBuilder { } } + pub async fn connect_admin(&self) -> KafkaAdmin { + match self { + #[cfg(feature = "rdkafka-driver-tests")] + Self::Cpp(cpp) => KafkaAdmin::Cpp(cpp.connect_admin().await), + Self::Java(java) => KafkaAdmin::Java(java.connect_admin().await), + } + } + pub async fn admin_setup(&self) { match self { #[cfg(feature = "rdkafka-driver-tests")] Self::Cpp(cpp) => cpp.admin_setup().await, - Self::Java(java) => java.admin_setup().await, + Self::Java(_) => {} } } @@ -63,7 +71,7 @@ impl KafkaConnectionBuilder { match self { #[cfg(feature = "rdkafka-driver-tests")] Self::Cpp(cpp) => cpp.admin_cleanup().await, - Self::Java(java) => java.admin_cleanup().await, + Self::Java(_) => {} } } } @@ -112,3 +120,38 @@ pub struct ExpectedResponse<'a> { pub topic_name: &'a str, pub offset: i64, } + +pub enum KafkaAdmin { + #[cfg(feature = "rdkafka-driver-tests")] + Cpp(KafkaAdminCpp), + Java(KafkaAdminJava), +} + +impl KafkaAdmin { + pub async fn create_topics(&self, topics: &[NewTopic<'_>]) { + match self { + #[cfg(feature = "rdkafka-driver-tests")] + KafkaAdmin::Cpp(cpp) => cpp.create_topics(topics).await, + KafkaAdmin::Java(java) => java.create_topics(topics).await, + } + } + + pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) { + match self { + #[cfg(feature = "rdkafka-driver-tests")] + KafkaAdmin::Cpp(cpp) => cpp.create_partitions(partitions).await, + KafkaAdmin::Java(java) => java.create_partitions(partitions).await, + } + } +} + +pub struct NewTopic<'a> { + pub name: &'a str, + pub num_partitions: i32, + pub replication_factor: i16, +} + +pub struct NewPartition<'a> { + pub topic_name: &'a str, + pub new_partition_count: i32, +}