diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index e5e5051af..edff3e199 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -1,5 +1,6 @@ use test_helpers::connection::kafka::{ - ExpectedResponse, KafkaConnectionBuilder, NewPartition, NewTopic, Record, + AlterConfig, ConfigEntry, ExpectedResponse, KafkaConnectionBuilder, NewPartition, NewTopic, + Record, ResourceSpecifier, }; async fn admin_setup(connection_builder: &KafkaConnectionBuilder) { @@ -36,6 +37,24 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) { new_partition_count: 2, }]) .await; + + admin + // TODO: test ResourceSpecifier::Broker and ResourceSpecifier::Group as well. + // Will need to find a way to get a valid broker id and to create a group. + .describe_configs(&[ResourceSpecifier::Topic("to_delete")]) + .await; + + admin + .alter_configs(&[AlterConfig { + specifier: ResourceSpecifier::Topic("to_delete"), + entries: &[ConfigEntry { + key: "delete.retention.ms".to_owned(), + value: "86400001".to_owned(), + }], + }]) + .await; + + admin.delete_topics(&["to_delete"]).await } async fn produce_consume(connection_builder: &KafkaConnectionBuilder, topic_name: &str, i: i64) { @@ -115,7 +134,6 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) { pub async fn basic(connection_builder: KafkaConnectionBuilder) { admin_setup(&connection_builder).await; - connection_builder.admin_setup().await; for i in 0..2 { produce_consume(&connection_builder, "partitions1", i).await; produce_consume(&connection_builder, "partitions3", i).await; diff --git a/test-helpers/src/connection/kafka/cpp.rs b/test-helpers/src/connection/kafka/cpp.rs index 3d5680110..20eecdeb8 100644 --- a/test-helpers/src/connection/kafka/cpp.rs +++ b/test-helpers/src/connection/kafka/cpp.rs @@ -69,59 +69,6 @@ impl KafkaConnectionBuilderCpp { KafkaAdminCpp { admin } } - // TODO: support for these admin operations needs to be added to the java driver wrapper and then this method can be deleted - pub async fn admin_setup(&self) { - let admin = self.connect_admin().await.admin; - - let results = admin - .describe_configs( - // TODO: test ResourceSpecifier::Broker and ResourceSpecifier::Group as well. - // Will need to find a way to get a valid broker id and to create a group. - &[ResourceSpecifier::Topic("to_delete")], - &AdminOptions::new() - .operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), - ) - .await - .unwrap(); - for result in results { - let result = result.unwrap(); - assert_eq!( - result.specifier, - OwnedResourceSpecifier::Topic("to_delete".to_owned()) - ); - } - - let results = admin - .alter_configs( - &[AlterConfig { - specifier: ResourceSpecifier::Topic("to_delete"), - entries: [("foo", "bar")].into(), - }], - &AdminOptions::new() - .operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), - ) - .await - .unwrap(); - for result in results { - assert_eq!( - result.unwrap(), - OwnedResourceSpecifier::Topic("to_delete".to_owned()) - ); - } - - let results = admin - .delete_topics( - &["to_delete"], - &AdminOptions::new() - .operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), - ) - .await - .unwrap(); - for result in results { - assert_eq!(result.unwrap(), "to_delete"); - } - } - // TODO: support for these admin operations needs to be added to the java driver wrapper and then this method can be deleted pub async fn admin_cleanup(&self) { let admin = self.connect_admin().await.admin; @@ -245,9 +192,113 @@ impl KafkaAdminCpp { ) .await .unwrap(); - for result in results { - let result = result.unwrap(); - assert_eq!(result, "to_delete") + + let mut results: Vec<_> = results.into_iter().map(|x| x.unwrap()).collect(); + for partition in partitions { + if let Some(i) = results.iter().position(|x| x == partition.topic_name) { + results.remove(i); + } else { + panic!("topic {} not in results", partition.topic_name) + } } + assert!(results.is_empty()); + } + + pub async fn describe_configs(&self, resources: &[super::ResourceSpecifier<'_>]) { + let resources: Vec<_> = resources.iter().map(resource_specifier).collect(); + let results = self + .admin + .describe_configs( + &resources, + &AdminOptions::new() + .operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), + ) + .await + .unwrap(); + + let mut results: Vec<_> = results.into_iter().map(|x| x.unwrap()).collect(); + for resource in resources { + if let Some(i) = results + .iter() + .position(|x| resource_specifier_ref(&x.specifier) == resource) + { + results.remove(i); + } else { + panic!("resource {:?} not in results", resource) + } + } + assert!(results.is_empty()); + } + + pub async fn alter_configs(&self, alter_configs: &[super::AlterConfig<'_>]) { + let alter_configs: Vec<_> = alter_configs + .iter() + .map(|alter_config| AlterConfig { + specifier: resource_specifier(&alter_config.specifier), + entries: alter_config + .entries + .iter() + .map(|entry| (entry.key.as_str(), entry.value.as_str())) + .collect(), + }) + .collect(); + let results = self + .admin + .alter_configs( + &alter_configs, + &AdminOptions::new() + .operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), + ) + .await + .unwrap(); + + let mut results: Vec<_> = results.into_iter().map(|x| x.unwrap()).collect(); + for alter_config in alter_configs { + if let Some(i) = results + .iter() + .position(|x| resource_specifier_ref(x) == alter_config.specifier) + { + results.remove(i); + } else { + panic!("resource {:?} not in results", alter_config.specifier) + } + } + assert!(results.is_empty()); + } + + pub async fn delete_topics(&self, to_delete: &[&str]) { + let results = self + .admin + .delete_topics( + to_delete, + &AdminOptions::new() + .operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), + ) + .await + .unwrap(); + + let mut results: Vec<_> = results.into_iter().map(|x| x.unwrap()).collect(); + for to_delete in to_delete { + if let Some(i) = results.iter().position(|x| x == to_delete) { + results.remove(i); + } else { + panic!("topic {} not in results", to_delete) + } + } + assert!(results.is_empty()); + } +} + +fn resource_specifier<'a>(specifier: &'a super::ResourceSpecifier<'a>) -> ResourceSpecifier<'a> { + match specifier { + super::ResourceSpecifier::Topic(topic) => ResourceSpecifier::Topic(topic), + } +} + +fn resource_specifier_ref(specifier: &OwnedResourceSpecifier) -> ResourceSpecifier { + match specifier { + OwnedResourceSpecifier::Topic(topic) => ResourceSpecifier::Topic(topic), + OwnedResourceSpecifier::Group(group) => ResourceSpecifier::Group(group), + OwnedResourceSpecifier::Broker(broker) => ResourceSpecifier::Broker(*broker), } } diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index e0884abad..9fbd6db58 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -1,5 +1,5 @@ -use super::{ExpectedResponse, NewPartition, NewTopic, Record}; -use j4rs::{Instance, InvocationArg, Jvm, JvmBuilder, MavenArtifact}; +use super::{AlterConfig, ExpectedResponse, NewPartition, NewTopic, Record, ResourceSpecifier}; +use j4rs::{errors::J4RsError, Instance, InvocationArg, Jvm, JvmBuilder, MavenArtifact}; use std::{collections::HashMap, rc::Rc}; fn properties(jvm: &Jvm, props: &HashMap) -> Instance { @@ -96,7 +96,6 @@ impl KafkaConnectionBuilderJava { KafkaAdminJava { jvm, admin } } } - pub struct KafkaProducerJava { _producer: Instance, } @@ -175,6 +174,24 @@ impl KafkaAdminJava { .collect(); } + pub async fn delete_topics(&self, to_delete: &[&str]) { + let topics = self + .jvm + .java_list("java.lang.String", to_delete.to_vec()) + .unwrap(); + + self.jvm + .chain(&self.admin) + .unwrap() + .invoke("deleteTopics", &[topics.into()]) + .unwrap() + .invoke("all", &[]) + .unwrap() + .invoke("get", &[]) + .unwrap() + .collect(); + } + pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) { let partitions: HashMap<_, _> = partitions .iter() @@ -212,4 +229,112 @@ impl KafkaAdminJava { .unwrap() .collect(); } + + pub async fn describe_configs(&self, resources: &[ResourceSpecifier<'_>]) { + let resource_type = self + .jvm + .static_class("org.apache.kafka.common.config.ConfigResource$Type") + .unwrap(); + + let resources: Vec<_> = resources + .iter() + .map(|resource| { + self.jvm.create_instance( + "org.apache.kafka.common.config.ConfigResource", + &match resource { + ResourceSpecifier::Topic(topic) => [ + self.jvm.field(&resource_type, "TOPIC").unwrap().into(), + InvocationArg::try_from(*topic).unwrap(), + ], + }, + ) + }) + .collect(); + + let resources = self + .jvm + .java_list("org.apache.kafka.common.config.ConfigResource", resources) + .unwrap(); + + self.jvm + .chain(&self.admin) + .unwrap() + .invoke("describeConfigs", &[resources.into()]) + .unwrap() + .invoke("all", &[]) + .unwrap() + .invoke("get", &[]) + .unwrap() + .collect(); + } + + pub async fn alter_configs(&self, alter_configs: &[AlterConfig<'_>]) { + let resource_type = self + .jvm + .static_class("org.apache.kafka.common.config.ConfigResource$Type") + .unwrap(); + + let alter_configs: Vec<_> = alter_configs + .iter() + .map(|alter_config| { + let entries: Vec> = alter_config + .entries + .iter() + .map(|entry| { + self.jvm.create_instance( + "org.apache.kafka.clients.admin.ConfigEntry", + &[ + InvocationArg::try_from(entry.key.as_str()).unwrap(), + InvocationArg::try_from(entry.value.as_str()).unwrap(), + ], + ) + }) + .collect(); + ( + self.jvm + .create_instance( + "org.apache.kafka.common.config.ConfigResource", + &match &alter_config.specifier { + ResourceSpecifier::Topic(topic) => [ + self.jvm.field(&resource_type, "TOPIC").unwrap().into(), + InvocationArg::try_from(*topic).unwrap(), + ], + }, + ) + .unwrap(), + self.jvm + .create_instance( + "org.apache.kafka.clients.admin.Config", + &[self + .jvm + .java_list("org.apache.kafka.clients.admin.ConfigEntry", entries) + .unwrap() + .into()], + ) + .unwrap(), + ) + }) + .collect(); + + let alter_configs = self.java_map(alter_configs); + + self.jvm + .chain(&self.admin) + .unwrap() + .invoke("alterConfigs", &[alter_configs.into()]) + .unwrap() + .invoke("all", &[]) + .unwrap() + .invoke("get", &[]) + .unwrap() + .collect(); + } + + fn java_map(&self, key_values: Vec<(Instance, Instance)>) -> Instance { + let map = self.jvm.create_instance("java.util.HashMap", &[]).unwrap(); + for (k, v) in key_values { + self.jvm.invoke(&map, "put", &[k.into(), v.into()]).unwrap(); + } + map + } } diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index 5b787cbbd..e270dff6e 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -59,14 +59,6 @@ impl KafkaConnectionBuilder { } } - pub async fn admin_setup(&self) { - match self { - #[cfg(feature = "rdkafka-driver-tests")] - Self::Cpp(cpp) => cpp.admin_setup().await, - Self::Java(_) => {} - } - } - pub async fn admin_cleanup(&self) { match self { #[cfg(feature = "rdkafka-driver-tests")] @@ -136,6 +128,14 @@ impl KafkaAdmin { } } + pub async fn delete_topics(&self, to_delete: &[&str]) { + match self { + #[cfg(feature = "rdkafka-driver-tests")] + Self::Cpp(cpp) => cpp.delete_topics(to_delete).await, + Self::Java(java) => java.delete_topics(to_delete).await, + } + } + pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) { match self { #[cfg(feature = "rdkafka-driver-tests")] @@ -143,6 +143,22 @@ impl KafkaAdmin { KafkaAdmin::Java(java) => java.create_partitions(partitions).await, } } + + pub async fn describe_configs(&self, resources: &[ResourceSpecifier<'_>]) { + match self { + #[cfg(feature = "rdkafka-driver-tests")] + KafkaAdmin::Cpp(cpp) => cpp.describe_configs(resources).await, + KafkaAdmin::Java(java) => java.describe_configs(resources).await, + } + } + + pub async fn alter_configs(&self, alter_configs: &[AlterConfig<'_>]) { + match self { + #[cfg(feature = "rdkafka-driver-tests")] + KafkaAdmin::Cpp(cpp) => cpp.alter_configs(alter_configs).await, + KafkaAdmin::Java(java) => java.alter_configs(alter_configs).await, + } + } } pub struct NewTopic<'a> { @@ -155,3 +171,17 @@ pub struct NewPartition<'a> { pub topic_name: &'a str, pub new_partition_count: i32, } + +pub enum ResourceSpecifier<'a> { + Topic(&'a str), +} + +pub struct AlterConfig<'a> { + pub specifier: ResourceSpecifier<'a>, + pub entries: &'a [ConfigEntry], +} + +pub struct ConfigEntry { + pub key: String, + pub value: String, +}