diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs
index 34672307a..fd4418f60 100644
--- a/shotover-proxy/tests/kafka_int_tests/mod.rs
+++ b/shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -9,7 +9,7 @@ use test_helpers::docker_compose::docker_compose;
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
 #[case::java(KafkaDriver::Java)]
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn passthrough_standard(#[case] driver: KafkaDriver) {
     let _docker_compose =
         docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml");
@@ -31,7 +31,7 @@ async fn passthrough_standard(#[case] driver: KafkaDriver) {
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
 #[case::java(KafkaDriver::Java)]
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn passthrough_tls(#[case] driver: KafkaDriver) {
     test_helpers::cert::generate_kafka_test_certs();
 
@@ -52,10 +52,11 @@ async fn passthrough_tls(#[case] driver: KafkaDriver) {
         .expect("Shotover did not shutdown within 10s");
 }
 
+#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
-#[case::java(KafkaDriver::Java)]
-#[tokio::test]
+// #[case::java(KafkaDriver::Java)]
+#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn cluster_tls(#[case] driver: KafkaDriver) {
     test_helpers::cert::generate_kafka_test_certs();
 
@@ -80,7 +81,7 @@ async fn cluster_tls(#[case] driver: KafkaDriver) {
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
 #[case::java(KafkaDriver::Java)]
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn passthrough_encode(#[case] driver: KafkaDriver) {
     let _docker_compose =
         docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml");
@@ -97,7 +98,7 @@ async fn passthrough_encode(#[case] driver: KafkaDriver) {
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
 #[case::java(KafkaDriver::Java)]
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn passthrough_sasl(#[case] driver: KafkaDriver) {
     let _docker_compose =
         docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml");
@@ -116,7 +117,7 @@ async fn passthrough_sasl(#[case] driver: KafkaDriver) {
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
 #[case::java(KafkaDriver::Java)]
-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn passthrough_sasl_encode(#[case] driver: KafkaDriver) {
     let _docker_compose =
         docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml");
@@ -132,10 +133,11 @@ async fn passthrough_sasl_encode(#[case] driver: KafkaDriver) {
     shotover.shutdown_and_then_consume_events(&[]).await;
 }
 
+#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
"rdkafka-driver-tests")] // temporarily needed to avoid a warning #[rstest] #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))] -#[case::java(KafkaDriver::Java)] -#[tokio::test] +// #[case::java(KafkaDriver::Java)] +#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) { let _docker_compose = docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml"); @@ -154,9 +156,10 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) { .expect("Shotover did not shutdown within 10s"); } +#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning #[rstest] #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))] -#[case::java(KafkaDriver::Java)] +// #[case::java(KafkaDriver::Java)] #[tokio::test(flavor = "multi_thread")] async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) { let _docker_compose = @@ -189,10 +192,11 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) { } } +#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning #[rstest] #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))] -#[case::java(KafkaDriver::Java)] -#[tokio::test] +// #[case::java(KafkaDriver::Java)] +#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear async fn cluster_2_racks_single_shotover(#[case] driver: KafkaDriver) { let _docker_compose = docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml"); @@ -212,10 +216,11 @@ async fn cluster_2_racks_single_shotover(#[case] driver: KafkaDriver) { .expect("Shotover did not shutdown within 10s"); } +#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning #[rstest] #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))] -#[case::java(KafkaDriver::Java)] -#[tokio::test] +//#[case::java(KafkaDriver::Java)] +#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) { let _docker_compose = docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml"); diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index edff3e199..0a9436be7 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -81,7 +81,7 @@ async fn produce_consume(connection_builder: &KafkaConnectionBuilder, topic_name ) .await; - let consumer = connection_builder.connect_consumer(topic_name).await; + let mut consumer = connection_builder.connect_consumer(topic_name).await; consumer .assert_consume(ExpectedResponse { @@ -118,7 +118,7 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) { .await; } - let consumer = connection_builder.connect_consumer(topic_name).await; + let mut consumer = connection_builder.connect_consumer(topic_name).await; for j in 0..10 { consumer diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index 9fbd6db58..1682ae52d 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -1,6 +1,9 
 use super::{AlterConfig, ExpectedResponse, NewPartition, NewTopic, Record, ResourceSpecifier};
 use j4rs::{errors::J4RsError, Instance, InvocationArg, Jvm, JvmBuilder, MavenArtifact};
-use std::{collections::HashMap, rc::Rc};
+use std::{
+    collections::{HashMap, VecDeque},
+    rc::Rc,
+};
 
 fn properties(jvm: &Jvm, props: &HashMap<String, String>) -> Instance {
     let properties = jvm.create_instance("java.util.Properties", &[]).unwrap();
@@ -73,13 +76,52 @@ impl KafkaConnectionBuilderJava {
                 &[properties.into()],
             )
             .unwrap();
-        KafkaProducerJava {
-            _producer: producer,
-        }
+
+        let jvm = self.jvm.clone();
+        KafkaProducerJava { jvm, producer }
     }
 
-    pub async fn connect_consumer(&self, _topic_name: &str) -> KafkaConsumerJava {
-        KafkaConsumerJava {}
+    pub async fn connect_consumer(&self, topic_name: &str) -> KafkaConsumerJava {
+        let mut config = self.base_config.clone();
+        config.insert("group.id".to_owned(), "some_group".to_owned());
+        config.insert("session.timeout.ms".to_owned(), "6000".to_owned());
+        config.insert("auto.offset.reset".to_owned(), "earliest".to_owned());
+        config.insert("enable.auto.commit".to_owned(), "false".to_owned());
+        config.insert(
+            "key.deserializer".to_owned(),
+            "org.apache.kafka.common.serialization.StringDeserializer".to_owned(),
+        );
+        config.insert(
+            "value.deserializer".to_owned(),
+            "org.apache.kafka.common.serialization.StringDeserializer".to_owned(),
+        );
+
+        let properties = properties(&self.jvm, &config);
+        let consumer = self
+            .jvm
+            .create_instance(
+                "org.apache.kafka.clients.consumer.KafkaConsumer",
+                &[properties.into()],
+            )
+            .unwrap();
+        self.jvm
+            .invoke(
+                &consumer,
+                "subscribe",
+                &[self
+                    .jvm
+                    .java_list("java.lang.String", vec![topic_name])
+                    .unwrap()
+                    .into()],
+            )
+            .unwrap();
+
+        let jvm = self.jvm.clone();
+        KafkaConsumerJava {
+            jvm,
+            consumer,
+            waiting_records: VecDeque::new(),
+        }
     }
 
     pub async fn connect_admin(&self) -> KafkaAdminJava {
@@ -97,20 +139,156 @@ impl KafkaConnectionBuilderJava {
     }
 }
 
 pub struct KafkaProducerJava {
-    _producer: Instance,
+    jvm: Rc<Jvm>,
+    producer: Instance,
 }
 
 impl KafkaProducerJava {
-    pub async fn assert_produce(&self, _record: Record<'_>, _expected_offset: Option<i64>) {
-        tracing::error!("Unimplemented assert");
+    pub async fn assert_produce(&self, record: Record<'_>, expected_offset: Option<i64>) {
+        let record = match record.key {
+            Some(key) => self
+                .jvm
+                .create_instance(
+                    "org.apache.kafka.clients.producer.ProducerRecord",
+                    &[
+                        InvocationArg::try_from(record.topic_name).unwrap(),
+                        InvocationArg::try_from(key).unwrap(),
+                        InvocationArg::try_from(record.payload).unwrap(),
+                    ],
+                )
+                .unwrap(),
+            None => self
+                .jvm
+                .create_instance(
+                    "org.apache.kafka.clients.producer.ProducerRecord",
+                    &[
+                        InvocationArg::try_from(record.topic_name).unwrap(),
+                        InvocationArg::try_from(record.payload).unwrap(),
+                    ],
+                )
+                .unwrap(),
+        };
+        let result = self
+            .jvm
+            .invoke_async(&self.producer, "send", &[record.into()])
+            .await
+            .unwrap();
+        let actual_offset: i64 = self
+            .jvm
+            .chain(&result)
+            .unwrap()
+            .cast("org.apache.kafka.clients.producer.RecordMetadata")
+            .unwrap()
+            .invoke("offset", &[])
+            .unwrap()
+            .to_rust()
+            .unwrap();
+
+        if let Some(expected_offset) = expected_offset {
+            assert_eq!(expected_offset, actual_offset);
+        }
     }
 }
 
-pub struct KafkaConsumerJava {}
+pub struct KafkaConsumerJava {
+    jvm: Rc<Jvm>,
+    consumer: Instance,
+    waiting_records: VecDeque<Instance>,
+}
 
 impl KafkaConsumerJava {
-    pub async fn assert_consume(&self, _response: ExpectedResponse<'_>) {
-        tracing::error!("Unimplemented assert");
+    pub async fn assert_consume(&mut self, expected_response: ExpectedResponse<'_>) {
+        // This method asserts that we have consumed a single record from the broker.
+        // Internally we may have actually received multiple records from the broker.
+        // But that is hidden from the test by storing any extra messages for use in the next call to `assert_consume`
+
+        if self.waiting_records.is_empty() {
+            self.fetch_from_broker();
+        }
+
+        self.process_one_fetched_record(expected_response);
+    }
+
+    fn fetch_from_broker(&mut self) {
+        let timeout = self
+            .jvm
+            .invoke_static(
+                "java.time.Duration",
+                "ofSeconds",
+                &[InvocationArg::try_from(30_i64)
+                    .unwrap()
+                    .into_primitive()
+                    .unwrap()],
+            )
+            .unwrap();
+
+        let result = tokio::task::block_in_place(|| {
+            self.jvm
+                .invoke(&self.consumer, "poll", &[timeout.into()])
+                .unwrap()
+        });
+
+        let iterator = JavaIterator::new(self.jvm.invoke(&result, "iterator", &[]).unwrap());
+        while let Some(record) = iterator.next(&self.jvm) {
+            let record = self
+                .jvm
+                .cast(&record, "org.apache.kafka.clients.consumer.ConsumerRecord")
+                .unwrap();
+            self.waiting_records.push_front(record);
+        }
+    }
+
+    fn process_one_fetched_record(&mut self, expected_response: ExpectedResponse<'_>) {
+        let record = self
+            .waiting_records
+            .pop_back()
+            .expect("KafkaConsumer.poll timed out");
+
+        let offset: i64 = self
+            .jvm
+            .chain(&record)
+            .unwrap()
+            .invoke("offset", &[])
+            .unwrap()
+            .to_rust()
+            .unwrap();
+        assert_eq!(expected_response.offset, offset);
+
+        let topic: String = self
+            .jvm
+            .chain(&record)
+            .unwrap()
+            .invoke("topic", &[])
+            .unwrap()
+            .to_rust()
+            .unwrap();
+        assert_eq!(expected_response.topic_name, topic);
+
+        let value: String = self
+            .jvm
+            .chain(&record)
+            .unwrap()
+            .invoke("value", &[])
+            .unwrap()
+            .to_rust()
+            .unwrap();
+        assert_eq!(expected_response.message, value);
+
+        let key: Option<String> = self
+            .jvm
+            .chain(&record)
+            .unwrap()
+            .invoke("key", &[])
+            .unwrap()
+            .to_rust()
+            .unwrap();
+        assert_eq!(expected_response.key, key.as_deref());
+    }
+}
+
+impl Drop for KafkaConsumerJava {
+    fn drop(&mut self) {
+        tokio::task::block_in_place(|| self.jvm.invoke(&self.consumer, "close", &[]).unwrap());
     }
 }
 
@@ -162,16 +340,11 @@ impl KafkaAdminJava {
             .java_list("org.apache.kafka.clients.admin.NewTopic", topics)
             .unwrap();
 
-        self.jvm
-            .chain(&self.admin)
-            .unwrap()
-            .invoke("createTopics", &[topics.into()])
-            .unwrap()
-            .invoke("all", &[])
-            .unwrap()
-            .invoke("get", &[])
-            .unwrap()
-            .collect();
+        let result = self
+            .jvm
+            .invoke(&self.admin, "createTopics", &[topics.into()])
+            .unwrap();
+        self.jvm.invoke_async(&result, "all", &[]).await.unwrap();
     }
 
     pub async fn delete_topics(&self, to_delete: &[&str]) {
@@ -180,16 +353,11 @@ impl KafkaAdminJava {
             .java_list("java.lang.String", to_delete.to_vec())
             .unwrap();
 
-        self.jvm
-            .chain(&self.admin)
-            .unwrap()
-            .invoke("deleteTopics", &[topics.into()])
-            .unwrap()
-            .invoke("all", &[])
-            .unwrap()
-            .invoke("get", &[])
-            .unwrap()
-            .collect();
+        let result = self
+            .jvm
+            .invoke(&self.admin, "deleteTopics", &[topics.into()])
+            .unwrap();
+        self.jvm.invoke_async(&result, "all", &[]).await.unwrap();
     }
 
     pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) {
@@ -218,16 +386,11 @@ impl KafkaAdminJava {
             )
             .unwrap();
 
-        self.jvm
-            .chain(&self.admin)
-            .unwrap()
-            .invoke("createPartitions", &[partitions.into()])
-            .unwrap()
-            .invoke("all", &[])
-            .unwrap()
-            .invoke("get", &[])
-            .unwrap()
-            .collect();
+        let result = self
+            .jvm
+            .invoke(&self.admin, "createPartitions", &[partitions.into()])
+            .unwrap();
+        self.jvm.invoke_async(&result, "all", &[]).await.unwrap();
     }
 
     pub async fn describe_configs(&self, resources: &[ResourceSpecifier<'_>]) {
@@ -256,16 +419,11 @@ impl KafkaAdminJava {
             .java_list("org.apache.kafka.common.config.ConfigResource", resources)
             .unwrap();
 
-        self.jvm
-            .chain(&self.admin)
-            .unwrap()
-            .invoke("describeConfigs", &[resources.into()])
-            .unwrap()
-            .invoke("all", &[])
-            .unwrap()
-            .invoke("get", &[])
-            .unwrap()
-            .collect();
+        let result = self
+            .jvm
+            .invoke(&self.admin, "describeConfigs", &[resources.into()])
+            .unwrap();
+        self.jvm.invoke_async(&result, "all", &[]).await.unwrap();
     }
 
     pub async fn alter_configs(&self, alter_configs: &[AlterConfig<'_>]) {
@@ -318,16 +476,11 @@ impl KafkaAdminJava {
 
         let alter_configs = self.java_map(alter_configs);
 
-        self.jvm
-            .chain(&self.admin)
-            .unwrap()
-            .invoke("alterConfigs", &[alter_configs.into()])
-            .unwrap()
-            .invoke("all", &[])
-            .unwrap()
-            .invoke("get", &[])
-            .unwrap()
-            .collect();
+        let result = self
+            .jvm
+            .invoke(&self.admin, "alterConfigs", &[alter_configs.into()])
+            .unwrap();
+        self.jvm.invoke_async(&result, "all", &[]).await.unwrap();
     }
 
     fn java_map(&self, key_values: Vec<(Instance, Instance)>) -> Instance {
@@ -338,3 +491,22 @@ impl KafkaAdminJava {
         map
     }
 }
+
+struct JavaIterator(Instance);
+
+impl JavaIterator {
+    fn new(iterator_instance: Instance) -> Self {
+        JavaIterator(iterator_instance)
+    }
+
+    fn next(&self, jvm: &Jvm) -> Option<Instance> {
+        let has_next: bool = jvm
+            .to_rust(jvm.invoke(&self.0, "hasNext", &[]).unwrap())
+            .unwrap();
+        if has_next {
+            Some(jvm.invoke(&self.0, "next", &[]).unwrap())
+        } else {
+            None
+        }
+    }
+}
diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs
index e270dff6e..bbce3e1dc 100644
--- a/test-helpers/src/connection/kafka/mod.rs
+++ b/test-helpers/src/connection/kafka/mod.rs
@@ -97,7 +97,7 @@ pub enum KafkaConsumer {
 }
 
 impl KafkaConsumer {
-    pub async fn assert_consume(&self, response: ExpectedResponse<'_>) {
+    pub async fn assert_consume(&mut self, response: ExpectedResponse<'_>) {
         match self {
             #[cfg(feature = "rdkafka-driver-tests")]
             Self::Cpp(cpp) => cpp.assert_consume(response).await,
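
Note on the runtime flavor change: switching from `#[tokio::test]` to `#[tokio::test(flavor = "multi_thread")]` pairs with the `tokio::task::block_in_place` calls introduced in java.rs. `block_in_place` lets the blocking `KafkaConsumer.poll()`/`close()` calls run without stalling the rest of the runtime (such as the task that surfaces shotover's logs), and it also panics outright on a current_thread runtime, so every test that can construct a Java consumer needs the multi-threaded flavor. A minimal sketch of the behavior (the test name is illustrative, not part of the diff):

```rust
use std::time::Duration;

#[tokio::test(flavor = "multi_thread")]
async fn blocking_call_needs_multi_thread_runtime() {
    let value = tokio::task::block_in_place(|| {
        // Stand-in for the Java KafkaConsumer.poll() call, which blocks
        // the current thread until records arrive or the timeout expires.
        std::thread::sleep(Duration::from_millis(50));
        42
    });
    assert_eq!(value, 42);
    // Under plain #[tokio::test] (current_thread flavor), the
    // block_in_place call above panics instead of running.
}
```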
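For reference, this is the shape of the consumer assertion these changes enable. The sketch below mirrors `produce_consume` in test_cases.rs; the field names come from the asserts in java.rs, while the literal message/key values are illustrative. `connect_consumer` now returns a stateful consumer, which is why the call sites change to `let mut consumer`: extra records returned by a single `poll` are buffered in `waiting_records` and drained by later `assert_consume` calls.

```rust
use test_helpers::connection::kafka::{ExpectedResponse, KafkaConnectionBuilder};

// Sketch of a call site, assuming a topic with one known record.
async fn consume_one(connection_builder: &KafkaConnectionBuilder, topic_name: &str) {
    let mut consumer = connection_builder.connect_consumer(topic_name).await;
    consumer
        .assert_consume(ExpectedResponse {
            message: "initial message", // compared against the record value
            key: Some("Key"),           // compared against key.as_deref()
            topic_name,
            offset: 0, // offset of the first record in a fresh topic
        })
        .await;
}
```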
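The five KafkaAdminJava changes all follow one pattern: instead of chaining through `.invoke("get", &[])`, which blocks a runtime thread on the Java `KafkaFuture.get()`, the result object's `all()` future is awaited through j4rs's `invoke_async`. A hypothetical helper capturing the shared shape (not part of the diff, which keeps the calls inline):

```rust
use j4rs::{errors::J4RsError, Instance, InvocationArg, Jvm};

// Hypothetical consolidation of the repeated admin pattern: invoke an
// AdminClient method, then await the returned result's all() KafkaFuture
// asynchronously instead of blocking on get().
async fn invoke_admin_then_await_all(
    jvm: &Jvm,
    admin: &Instance,
    method: &str,
    args: &[InvocationArg],
) -> Result<(), J4RsError> {
    let result = jvm.invoke(admin, method, args)?;
    jvm.invoke_async(&result, "all", &[]).await?;
    Ok(())
}
```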