From 6dd6ce66e2686a92584a0c824818ffbc518601b2 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Thu, 29 Feb 2024 11:07:44 +1100 Subject: [PATCH] Add java kafka driver backend to kafka integration tests --- Cargo.lock | 68 +++++++++++++++++++ test-helpers/Cargo.toml | 1 + test-helpers/src/connection/kafka/java.rs | 52 ++++++++++++++ .../src/connection/{kafka.rs => kafka/mod.rs} | 3 + 4 files changed, 124 insertions(+) create mode 100644 test-helpers/src/connection/kafka/java.rs rename test-helpers/src/connection/{kafka.rs => kafka/mod.rs} (98%) diff --git a/Cargo.lock b/Cargo.lock index e42aea252..620585d03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -906,6 +906,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "cesu8" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" + [[package]] name = "cfg-if" version = "1.0.0" @@ -1603,6 +1609,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "dunce" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56ce8c6da7551ec6c462cbaf3bfbc75131ebbfa1c944aeaa9dab51ca1c5f0c3b" + [[package]] name = "dyn-clone" version = "1.0.17" @@ -1869,6 +1881,12 @@ dependencies = [ "urlencoding", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "futures" version = "0.3.30" @@ -2419,6 +2437,45 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" +[[package]] +name = "j4rs" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e35b9135c58ac74c31eab9be04c9eb4665bbd819dc58ae7273c60e8dfba25b0" +dependencies = [ + "cesu8", + "dirs", + "dunce", + "fs_extra", + "futures", + "glob", + "java-locator", + "jni-sys", + "lazy_static", + "libc", + "libloading", + "log", + "serde", + "serde_json", + "sha2", +] + +[[package]] +name = "java-locator" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90003f2fd9c52f212c21d8520f1128da0080bad6fff16b68fe6e7f2f0c3780c2" +dependencies = [ + "glob", + "lazy_static", +] + +[[package]] +name = "jni-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" + [[package]] name = "js-sys" version = "0.3.68" @@ -2463,6 +2520,16 @@ version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +[[package]] +name = "libloading" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c571b676ddfc9a8c12f1f3d3085a7b163966a8fd8098a90640953ce5f6170161" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "libm" version = "0.2.8" @@ -4680,6 +4747,7 @@ dependencies = [ "cdrs-tokio", "docker-compose-runner", "itertools 0.12.1", + "j4rs", "openssl", "ordered-float", "rcgen", diff --git a/test-helpers/Cargo.toml b/test-helpers/Cargo.toml index d659ffb0f..20b75887f 100644 --- a/test-helpers/Cargo.toml +++ b/test-helpers/Cargo.toml @@ -33,3 +33,4 @@ anyhow.workspace = true rcgen.workspace = true rdkafka = { version = "0.36", features = ["cmake-build"], optional = true } docker-compose-runner = "0.3.0" +j4rs = "0.17.2" diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs new file mode 100644 index 000000000..a739d1079 --- /dev/null +++ b/test-helpers/src/connection/kafka/java.rs @@ -0,0 +1,52 @@ +use j4rs::{Instance, InvocationArg, Jvm, JvmBuilder, MavenArtifact}; + +pub fn run(address: &str) { + let jvm: Jvm = JvmBuilder::new().build().unwrap(); + + // specify maven dep for kafka-clients and all of its dependencies since j4rs does not support dependency resolution + // The list of dependencies can be found here: https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.pom6 + // These are deployed to and loaded from a path like target/debug/jassets + jvm.deploy_artifact(&MavenArtifact::from("org.slf4j:slf4j-api:1.7.36")) + .unwrap(); + jvm.deploy_artifact(&MavenArtifact::from("org.slf4j:slf4j-simple:1.7.36")) + .unwrap(); + jvm.deploy_artifact(&MavenArtifact::from("org.apache.kafka:kafka-clients:3.7.0")) + .unwrap(); + + let properties = properties( + &jvm, + &[ + ( + "key.serializer", + "org.apache.kafka.common.serialization.StringSerializer", + ), + ( + "value.serializer", + "org.apache.kafka.common.serialization.StringSerializer", + ), + ("bootstrap.servers", address), + ], + ); + let _producer = jvm + .create_instance( + "org.apache.kafka.clients.producer.KafkaProducer", + &[properties.into()], + ) + .unwrap(); +} + +fn properties(jvm: &Jvm, props: &[(&str, &str)]) -> Instance { + let properties = jvm.create_instance("java.util.Properties", &[]).unwrap(); + for (key, value) in props { + jvm.invoke( + &properties, + "setProperty", + &[ + InvocationArg::try_from(*key).unwrap(), + InvocationArg::try_from(*value).unwrap(), + ], + ) + .unwrap(); + } + properties +} diff --git a/test-helpers/src/connection/kafka.rs b/test-helpers/src/connection/kafka/mod.rs similarity index 98% rename from test-helpers/src/connection/kafka.rs rename to test-helpers/src/connection/kafka/mod.rs index 53e75e3c9..81e5fc955 100644 --- a/test-helpers/src/connection/kafka.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -1,3 +1,5 @@ +mod java; + // Allow direct usage of the APIs when the feature is enabled pub use rdkafka; @@ -33,6 +35,7 @@ impl KafkaConnectionBuilder { } pub async fn connect_producer(&self, acks: i32) -> KafkaProducer { + java::run(self.client.get("bootstrap.servers").unwrap()); KafkaProducer { producer: self .client