Commit

More java driver

rukai committed Mar 7, 2024
1 parent bdb02d5 commit 711d1b2

Showing 4 changed files with 293 additions and 69 deletions.
22 changes: 20 additions & 2 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1,5 +1,6 @@
use test_helpers::connection::kafka::{
ExpectedResponse, KafkaConnectionBuilder, NewPartition, NewTopic, Record,
AlterConfig, ConfigEntry, ExpectedResponse, KafkaConnectionBuilder, NewPartition, NewTopic,
Record, ResourceSpecifier,
};

async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
@@ -36,6 +37,24 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
new_partition_count: 2,
}])
.await;

admin
// TODO: test ResourceSpecifier::Broker and ResourceSpecifier::Group as well.
// Will need to find a way to get a valid broker id and to create a group.
.describe_configs(&[ResourceSpecifier::Topic("to_delete")])
.await;

admin
.alter_configs(&[AlterConfig {
specifier: ResourceSpecifier::Topic("to_delete"),
entries: &[ConfigEntry {
key: "delete.retention.ms".to_owned(),
value: "86400001".to_owned(),
}],
}])
.await;

admin.delete_topics(&["to_delete"]).await
}

async fn produce_consume(connection_builder: &KafkaConnectionBuilder, topic_name: &str, i: i64) {
@@ -115,7 +134,6 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {

pub async fn basic(connection_builder: KafkaConnectionBuilder) {
admin_setup(&connection_builder).await;
connection_builder.admin_setup().await;
for i in 0..2 {
produce_consume(&connection_builder, "partitions1", i).await;
produce_consume(&connection_builder, "partitions3", i).await;
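The new calls above use shared helper types (`ResourceSpecifier`, `ConfigEntry`, `AlterConfig`) exported from `test_helpers::connection::kafka`, presumably defined in the fourth changed file, which is not rendered in this diff. The following is a minimal sketch of what those definitions likely look like, inferred only from how the cpp and java wrappers below consume them:

```rust
// Sketch only -- the actual definitions live in the unrendered changed file.
// Field names and variants are inferred from usage in this commit.

/// Identifies the resource an admin config operation applies to.
/// Only Topic is exercised so far; Broker and Group are still TODO.
pub enum ResourceSpecifier<'a> {
    Topic(&'a str),
}

/// A single key/value config override.
pub struct ConfigEntry {
    pub key: String,
    pub value: String,
}

/// A config change to apply to one resource.
pub struct AlterConfig<'a> {
    pub specifier: ResourceSpecifier<'a>,
    pub entries: &'a [ConfigEntry],
}
```

Keeping these as plain, driver-agnostic types lets test_cases.rs describe the admin operations once and run them against either driver wrapper.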
163 changes: 107 additions & 56 deletions test-helpers/src/connection/kafka/cpp.rs
@@ -69,59 +69,6 @@ impl KafkaConnectionBuilderCpp {
KafkaAdminCpp { admin }
}

// TODO: support for these admin operations needs to be added to the java driver wrapper and then this method can be deleted
pub async fn admin_setup(&self) {
let admin = self.connect_admin().await.admin;

let results = admin
.describe_configs(
// TODO: test ResourceSpecifier::Broker and ResourceSpecifier::Group as well.
// Will need to find a way to get a valid broker id and to create a group.
&[ResourceSpecifier::Topic("to_delete")],
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();
for result in results {
let result = result.unwrap();
assert_eq!(
result.specifier,
OwnedResourceSpecifier::Topic("to_delete".to_owned())
);
}

let results = admin
.alter_configs(
&[AlterConfig {
specifier: ResourceSpecifier::Topic("to_delete"),
entries: [("foo", "bar")].into(),
}],
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();
for result in results {
assert_eq!(
result.unwrap(),
OwnedResourceSpecifier::Topic("to_delete".to_owned())
);
}

let results = admin
.delete_topics(
&["to_delete"],
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();
for result in results {
assert_eq!(result.unwrap(), "to_delete");
}
}

// TODO: support for these admin operations needs to be added to the java driver wrapper and then this method can be deleted
pub async fn admin_cleanup(&self) {
let admin = self.connect_admin().await.admin;
@@ -245,9 +192,113 @@ impl KafkaAdminCpp {
)
.await
.unwrap();
for result in results {
let result = result.unwrap();
assert_eq!(result, "to_delete")

let mut results: Vec<_> = results.into_iter().map(|x| x.unwrap()).collect();
for partition in partitions {
if let Some(i) = results.iter().position(|x| x == partition.topic_name) {
results.remove(i);
} else {
panic!("topic {} not in results", partition.topic_name)
}
}
assert!(results.is_empty());
}

pub async fn describe_configs(&self, resources: &[super::ResourceSpecifier<'_>]) {
let resources: Vec<_> = resources.iter().map(resource_specifier).collect();
let results = self
.admin
.describe_configs(
&resources,
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();

let mut results: Vec<_> = results.into_iter().map(|x| x.unwrap()).collect();
for resource in resources {
if let Some(i) = results
.iter()
.position(|x| resource_specifier_ref(&x.specifier) == resource)
{
results.remove(i);
} else {
panic!("resource {:?} not in results", resource)
}
}
assert!(results.is_empty());
}

pub async fn alter_configs(&self, alter_configs: &[super::AlterConfig<'_>]) {
let alter_configs: Vec<_> = alter_configs
.iter()
.map(|alter_config| AlterConfig {
specifier: resource_specifier(&alter_config.specifier),
entries: alter_config
.entries
.iter()
.map(|entry| (entry.key.as_str(), entry.value.as_str()))
.collect(),
})
.collect();
let results = self
.admin
.alter_configs(
&alter_configs,
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();

let mut results: Vec<_> = results.into_iter().map(|x| x.unwrap()).collect();
for alter_config in alter_configs {
if let Some(i) = results
.iter()
.position(|x| resource_specifier_ref(x) == alter_config.specifier)
{
results.remove(i);
} else {
panic!("resource {:?} not in results", alter_config.specifier)
}
}
assert!(results.is_empty());
}

pub async fn delete_topics(&self, to_delete: &[&str]) {
let results = self
.admin
.delete_topics(
to_delete,
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
.await
.unwrap();

let mut results: Vec<_> = results.into_iter().map(|x| x.unwrap()).collect();
for to_delete in to_delete {
if let Some(i) = results.iter().position(|x| x == to_delete) {
results.remove(i);
} else {
panic!("topic {} not in results", to_delete)
}
}
assert!(results.is_empty());
}
}

fn resource_specifier<'a>(specifier: &'a super::ResourceSpecifier<'a>) -> ResourceSpecifier<'a> {
match specifier {
super::ResourceSpecifier::Topic(topic) => ResourceSpecifier::Topic(topic),
}
}

fn resource_specifier_ref(specifier: &OwnedResourceSpecifier) -> ResourceSpecifier {
match specifier {
OwnedResourceSpecifier::Topic(topic) => ResourceSpecifier::Topic(topic),
OwnedResourceSpecifier::Group(group) => ResourceSpecifier::Group(group),
OwnedResourceSpecifier::Broker(broker) => ResourceSpecifier::Broker(*broker),
}
}
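Each of the cpp wrappers above verifies results the same way: find and remove a match for every expected item, then assert nothing is left over, so the comparison does not depend on the order Kafka returns results in. A standalone sketch of that pattern (hypothetical helper, not part of the commit):

```rust
/// Hypothetical helper illustrating the order-insensitive check used above:
/// every expected item must match exactly one result, and no extras may remain.
fn assert_matches_exactly<R, E: std::fmt::Debug>(
    mut results: Vec<R>,
    expected: &[E],
    matches: impl Fn(&R, &E) -> bool,
) {
    for expected_item in expected {
        if let Some(i) = results.iter().position(|r| matches(r, expected_item)) {
            // Remove the matched result so duplicates must each find their own match.
            results.remove(i);
        } else {
            panic!("expected {:?} not in results", expected_item);
        }
    }
    assert!(results.is_empty(), "unexpected extra results returned");
}
```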
131 changes: 128 additions & 3 deletions test-helpers/src/connection/kafka/java.rs
@@ -1,5 +1,5 @@
use super::{ExpectedResponse, NewPartition, NewTopic, Record};
use j4rs::{Instance, InvocationArg, Jvm, JvmBuilder, MavenArtifact};
use super::{AlterConfig, ExpectedResponse, NewPartition, NewTopic, Record, ResourceSpecifier};
use j4rs::{errors::J4RsError, Instance, InvocationArg, Jvm, JvmBuilder, MavenArtifact};
use std::{collections::HashMap, rc::Rc};

fn properties(jvm: &Jvm, props: &HashMap<String, String>) -> Instance {
@@ -96,7 +96,6 @@ impl KafkaConnectionBuilderJava {
KafkaAdminJava { jvm, admin }
}
}

pub struct KafkaProducerJava {
_producer: Instance,
}
@@ -175,6 +174,24 @@ impl KafkaAdminJava {
.collect();
}

pub async fn delete_topics(&self, to_delete: &[&str]) {
let topics = self
.jvm
.java_list("java.lang.String", to_delete.to_vec())
.unwrap();

self.jvm
.chain(&self.admin)
.unwrap()
.invoke("deleteTopics", &[topics.into()])
.unwrap()
.invoke("all", &[])
.unwrap()
.invoke("get", &[])
.unwrap()
.collect();
}

pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) {
let partitions: HashMap<_, _> = partitions
.iter()
@@ -212,4 +229,112 @@ impl KafkaAdminJava {
.unwrap()
.collect();
}

pub async fn describe_configs(&self, resources: &[ResourceSpecifier<'_>]) {
let resource_type = self
.jvm
.static_class("org.apache.kafka.common.config.ConfigResource$Type")
.unwrap();

let resources: Vec<_> = resources
.iter()
.map(|resource| {
self.jvm.create_instance(
"org.apache.kafka.common.config.ConfigResource",
&match resource {
ResourceSpecifier::Topic(topic) => [
self.jvm.field(&resource_type, "TOPIC").unwrap().into(),
InvocationArg::try_from(*topic).unwrap(),
],
},
)
})
.collect();

let resources = self
.jvm
.java_list("org.apache.kafka.common.config.ConfigResource", resources)
.unwrap();

self.jvm
.chain(&self.admin)
.unwrap()
.invoke("describeConfigs", &[resources.into()])
.unwrap()
.invoke("all", &[])
.unwrap()
.invoke("get", &[])
.unwrap()
.collect();
}

pub async fn alter_configs(&self, alter_configs: &[AlterConfig<'_>]) {
let resource_type = self
.jvm
.static_class("org.apache.kafka.common.config.ConfigResource$Type")
.unwrap();

let alter_configs: Vec<_> = alter_configs
.iter()
.map(|alter_config| {
let entries: Vec<Result<Instance, J4RsError>> = alter_config
.entries
.iter()
.map(|entry| {
self.jvm.create_instance(
"org.apache.kafka.clients.admin.ConfigEntry",
&[
InvocationArg::try_from(entry.key.as_str()).unwrap(),
InvocationArg::try_from(entry.value.as_str()).unwrap(),
],
)
})
.collect();
(
self.jvm
.create_instance(
"org.apache.kafka.common.config.ConfigResource",
&match &alter_config.specifier {
ResourceSpecifier::Topic(topic) => [
self.jvm.field(&resource_type, "TOPIC").unwrap().into(),
InvocationArg::try_from(*topic).unwrap(),
],
},
)
.unwrap(),
self.jvm
.create_instance(
"org.apache.kafka.clients.admin.Config",
&[self
.jvm
.java_list("org.apache.kafka.clients.admin.ConfigEntry", entries)
.unwrap()
.into()],
)
.unwrap(),
)
})
.collect();

let alter_configs = self.java_map(alter_configs);

self.jvm
.chain(&self.admin)
.unwrap()
.invoke("alterConfigs", &[alter_configs.into()])
.unwrap()
.invoke("all", &[])
.unwrap()
.invoke("get", &[])
.unwrap()
.collect();
}

fn java_map(&self, key_values: Vec<(Instance, Instance)>) -> Instance {
let map = self.jvm.create_instance("java.util.HashMap", &[]).unwrap();
for (k, v) in key_values {
self.jvm.invoke(&map, "put", &[k.into(), v.into()]).unwrap();
}
map
}
}
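All three new java wrappers end with the same `all()` then `get()` invocation chain, which blocks until the underlying admin operation completes. A hypothetical helper that factors out that shared tail (the function name is an assumption, not part of the commit):

```rust
use j4rs::{Instance, Jvm};

// Hypothetical helper (not part of the commit): block on a Kafka admin result
// object by invoking all().get(), exactly as the wrappers above do inline.
fn await_admin_result(jvm: &Jvm, result: &Instance) {
    jvm.chain(result)
        .unwrap()
        .invoke("all", &[])
        .unwrap()
        .invoke("get", &[])
        .unwrap()
        .collect();
}
```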