Skip to content

Commit

Permalink
Add java kafka driver backend to kafka integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Feb 29, 2024
1 parent 384dc38 commit 6dd6ce6
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 0 deletions.
68 changes: 68 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions test-helpers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ anyhow.workspace = true
rcgen.workspace = true
rdkafka = { version = "0.36", features = ["cmake-build"], optional = true }
docker-compose-runner = "0.3.0"
j4rs = "0.17.2"
52 changes: 52 additions & 0 deletions test-helpers/src/connection/kafka/java.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use j4rs::{Instance, InvocationArg, Jvm, JvmBuilder, MavenArtifact};

pub fn run(address: &str) {
let jvm: Jvm = JvmBuilder::new().build().unwrap();

// specify maven dep for kafka-clients and all of its dependencies since j4rs does not support dependency resolution
// The list of dependencies can be found here: https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.pom6
// These are deployed to and loaded from a path like target/debug/jassets
jvm.deploy_artifact(&MavenArtifact::from("org.slf4j:slf4j-api:1.7.36"))
.unwrap();
jvm.deploy_artifact(&MavenArtifact::from("org.slf4j:slf4j-simple:1.7.36"))
.unwrap();
jvm.deploy_artifact(&MavenArtifact::from("org.apache.kafka:kafka-clients:3.7.0"))
.unwrap();

let properties = properties(
&jvm,
&[
(
"key.serializer",
"org.apache.kafka.common.serialization.StringSerializer",
),
(
"value.serializer",
"org.apache.kafka.common.serialization.StringSerializer",
),
("bootstrap.servers", address),
],
);
let _producer = jvm
.create_instance(
"org.apache.kafka.clients.producer.KafkaProducer",
&[properties.into()],
)
.unwrap();
}

fn properties(jvm: &Jvm, props: &[(&str, &str)]) -> Instance {
let properties = jvm.create_instance("java.util.Properties", &[]).unwrap();
for (key, value) in props {
jvm.invoke(
&properties,
"setProperty",
&[
InvocationArg::try_from(*key).unwrap(),
InvocationArg::try_from(*value).unwrap(),
],
)
.unwrap();
}
properties
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
mod java;

// Allow direct usage of the APIs when the feature is enabled
pub use rdkafka;

Expand Down Expand Up @@ -33,6 +35,7 @@ impl KafkaConnectionBuilder {
}

pub async fn connect_producer(&self, acks: i32) -> KafkaProducer {
java::run(self.client.get("bootstrap.servers").unwrap());
KafkaProducer {
producer: self
.client
Expand Down

0 comments on commit 6dd6ce6

Please sign in to comment.