From 4a7003f36793e1bfa4a2f2d4d796057f35d33e2f Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Mon, 4 Mar 2024 22:07:54 +1100 Subject: [PATCH] Add java kafka driver backend to kafka integration tests (#1509) --- .config/nextest.toml | 1 + .github/workflows/build_and_test.yaml | 9 +- Cargo.lock | 68 +++++ .../benches/windsock/kafka/bench.rs | 12 +- shotover-proxy/tests/kafka_int_tests/mod.rs | 95 ++++--- .../tests/kafka_int_tests/test_cases.rs | 132 +-------- test-helpers/Cargo.toml | 1 + test-helpers/src/connection/kafka.rs | 130 --------- test-helpers/src/connection/kafka/cpp.rs | 250 ++++++++++++++++++ test-helpers/src/connection/kafka/java.rs | 190 +++++++++++++ test-helpers/src/connection/kafka/mod.rs | 114 ++++++++ test-helpers/src/connection/mod.rs | 1 - 12 files changed, 692 insertions(+), 311 deletions(-) delete mode 100644 test-helpers/src/connection/kafka.rs create mode 100644 test-helpers/src/connection/kafka/cpp.rs create mode 100644 test-helpers/src/connection/kafka/java.rs create mode 100644 test-helpers/src/connection/kafka/mod.rs diff --git a/.config/nextest.toml b/.config/nextest.toml index 65265e222..02623f1a5 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -1,6 +1,7 @@ [profile.default] fail-fast = false slow-timeout = { period = '5m', terminate-after = 2 } +archive-include = ["debug/jassets", "release/jassets"] # Overwrites profile.default when the filter matches [[profile.default.overrides]] diff --git a/.github/workflows/build_and_test.yaml b/.github/workflows/build_and_test.yaml index 04f2962e9..029bd8d0f 100644 --- a/.github/workflows/build_and_test.yaml +++ b/.github/workflows/build_and_test.yaml @@ -57,10 +57,11 @@ jobs: key: ubuntu-20.04-packages - name: Install ubuntu packages run: shotover-proxy/build/install_ubuntu_packages.sh - - name: Install nextest - uses: taiki-e/install-action@v2 - with: - tool: nextest@0.9.57 + #- name: Install nextest + # uses: taiki-e/install-action@v2 + # with: + # tool: nextest@0.9.57 + - run: cargo install --git https://github.com/rukai/nextest --branch archive-include-legacy cargo-nextest - name: Build tests run: | cargo test --doc ${{ matrix.cargo_flags }} --all-features -- --show-output --nocapture diff --git a/Cargo.lock b/Cargo.lock index 4568eff5e..391ee7697 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -907,6 +907,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "cesu8" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" + [[package]] name = "cfg-if" version = "1.0.0" @@ -1604,6 +1610,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "dunce" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56ce8c6da7551ec6c462cbaf3bfbc75131ebbfa1c944aeaa9dab51ca1c5f0c3b" + [[package]] name = "dyn-clone" version = "1.0.17" @@ -1870,6 +1882,12 @@ dependencies = [ "urlencoding", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "futures" version = "0.3.30" @@ -2420,6 +2438,45 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" +[[package]] +name = "j4rs" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e35b9135c58ac74c31eab9be04c9eb4665bbd819dc58ae7273c60e8dfba25b0" +dependencies = [ + "cesu8", + "dirs", + "dunce", + "fs_extra", + "futures", + "glob", + "java-locator", + "jni-sys", + "lazy_static", + "libc", + "libloading", + "log", + "serde", + "serde_json", + "sha2", +] + +[[package]] +name = "java-locator" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90003f2fd9c52f212c21d8520f1128da0080bad6fff16b68fe6e7f2f0c3780c2" +dependencies = [ + "glob", + "lazy_static", +] + +[[package]] +name = "jni-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" + [[package]] name = "js-sys" version = "0.3.68" @@ -2464,6 +2521,16 @@ version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +[[package]] +name = "libloading" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c571b676ddfc9a8c12f1f3d3085a7b163966a8fd8098a90640953ce5f6170161" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "libm" version = "0.2.8" @@ -4681,6 +4748,7 @@ dependencies = [ "cdrs-tokio", "docker-compose-runner", "itertools 0.12.1", + "j4rs", "openssl", "ordered-float", "rcgen", diff --git a/shotover-proxy/benches/windsock/kafka/bench.rs b/shotover-proxy/benches/windsock/kafka/bench.rs index 6ddfc3e03..951cbcb02 100644 --- a/shotover-proxy/benches/windsock/kafka/bench.rs +++ b/shotover-proxy/benches/windsock/kafka/bench.rs @@ -18,14 +18,14 @@ use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig; use shotover::transforms::TransformConfig; use std::sync::Arc; use std::{collections::HashMap, time::Duration}; -use test_helpers::connection::kafka::rdkafka::admin::{ +use test_helpers::connection::kafka::cpp::rdkafka::admin::{ AdminClient, AdminOptions, NewTopic, TopicReplication, }; -use test_helpers::connection::kafka::rdkafka::client::DefaultClientContext; -use test_helpers::connection::kafka::rdkafka::config::ClientConfig; -use test_helpers::connection::kafka::rdkafka::consumer::{Consumer, StreamConsumer}; -use test_helpers::connection::kafka::rdkafka::producer::{FutureProducer, FutureRecord}; -use test_helpers::connection::kafka::rdkafka::util::Timeout; +use test_helpers::connection::kafka::cpp::rdkafka::client::DefaultClientContext; +use test_helpers::connection::kafka::cpp::rdkafka::config::ClientConfig; +use test_helpers::connection::kafka::cpp::rdkafka::consumer::{Consumer, StreamConsumer}; +use test_helpers::connection::kafka::cpp::rdkafka::producer::{FutureProducer, FutureRecord}; +use test_helpers::connection::kafka::cpp::rdkafka::util::Timeout; use test_helpers::docker_compose::docker_compose; use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle, time::Instant}; use windsock::{Bench, BenchParameters, Profiling, Report}; diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index 2611fa854..7ea394f94 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -1,26 +1,23 @@ -#[cfg(feature = "rdkafka-driver-tests")] +mod test_cases; + use crate::shotover_process; -#[cfg(feature = "rdkafka-driver-tests")] +use rstest::rstest; use std::time::Duration; -#[cfg(feature = "rdkafka-driver-tests")] +use test_helpers::connection::kafka::{KafkaConnectionBuilder, KafkaDriver}; use test_helpers::docker_compose::docker_compose; -#[cfg(feature = "rdkafka-driver-tests")] -mod test_cases; - -#[cfg(feature = "rdkafka-driver-tests")] -use test_helpers::connection::kafka::KafkaConnectionBuilder; - -#[cfg(feature = "rdkafka-driver-tests")] +#[rstest] +#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))] +#[case::java(KafkaDriver::Java)] #[tokio::test] -async fn passthrough_standard() { +async fn passthrough_standard(#[case] driver: KafkaDriver) { let _docker_compose = docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml"); let shotover = shotover_process("tests/test-configs/kafka/passthrough/topology.yaml") .start() .await; - let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192"); + let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192"); test_cases::basic(connection_builder).await; tokio::time::timeout( @@ -31,9 +28,11 @@ async fn passthrough_standard() { .expect("Shotover did not shutdown within 10s"); } -#[cfg(feature = "rdkafka-driver-tests")] +#[rstest] +#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))] +#[case::java(KafkaDriver::Java)] #[tokio::test] -async fn passthrough_tls() { +async fn passthrough_tls(#[case] driver: KafkaDriver) { test_helpers::cert::generate_kafka_test_certs(); let _docker_compose = @@ -42,7 +41,7 @@ async fn passthrough_tls() { .start() .await; - let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192"); + let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192"); test_cases::basic(connection_builder).await; tokio::time::timeout( @@ -53,9 +52,11 @@ async fn passthrough_tls() { .expect("Shotover did not shutdown within 10s"); } -#[cfg(feature = "rdkafka-driver-tests")] +#[rstest] +#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))] +#[case::java(KafkaDriver::Java)] #[tokio::test] -async fn cluster_tls() { +async fn cluster_tls(#[case] driver: KafkaDriver) { test_helpers::cert::generate_kafka_test_certs(); let _docker_compose = @@ -64,7 +65,7 @@ async fn cluster_tls() { .start() .await; - let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192"); + let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192"); test_cases::basic(connection_builder).await; tokio::time::timeout( @@ -75,24 +76,28 @@ async fn cluster_tls() { .expect("Shotover did not shutdown within 10s"); } -#[cfg(feature = "rdkafka-driver-tests")] +#[rstest] +#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))] +#[case::java(KafkaDriver::Java)] #[tokio::test] -async fn passthrough_encode() { +async fn passthrough_encode(#[case] driver: KafkaDriver) { let _docker_compose = docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml"); let shotover = shotover_process("tests/test-configs/kafka/passthrough/topology-encode.yaml") .start() .await; - let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192"); + let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192"); test_cases::basic(connection_builder).await; shotover.shutdown_and_then_consume_events(&[]).await; } -#[cfg(feature = "rdkafka-driver-tests")] +#[rstest] +#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))] +#[case::java(KafkaDriver::Java)] #[tokio::test] -async fn passthrough_sasl() { +async fn passthrough_sasl(#[case] driver: KafkaDriver) { let _docker_compose = docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml"); let shotover = shotover_process("tests/test-configs/kafka/passthrough-sasl/topology.yaml") @@ -100,15 +105,17 @@ async fn passthrough_sasl() { .await; let connection_builder = - KafkaConnectionBuilder::new("127.0.0.1:9192").use_sasl("user", "password"); + KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl("user", "password"); test_cases::basic(connection_builder).await; shotover.shutdown_and_then_consume_events(&[]).await; } -#[cfg(feature = "rdkafka-driver-tests")] +#[rstest] +#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))] +#[case::java(KafkaDriver::Java)] #[tokio::test] -async fn passthrough_sasl_encode() { +async fn passthrough_sasl_encode(#[case] driver: KafkaDriver) { let _docker_compose = docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml"); let shotover = @@ -117,22 +124,24 @@ async fn passthrough_sasl_encode() { .await; let connection_builder = - KafkaConnectionBuilder::new("127.0.0.1:9192").use_sasl("user", "password"); + KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl("user", "password"); test_cases::basic(connection_builder).await; shotover.shutdown_and_then_consume_events(&[]).await; } -#[cfg(feature = "rdkafka-driver-tests")] +#[rstest] +#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))] +#[case::java(KafkaDriver::Java)] #[tokio::test] -async fn cluster_1_rack_single_shotover() { +async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) { let _docker_compose = docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml"); let shotover = shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml") .start() .await; - let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192"); + let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192"); test_cases::basic(connection_builder).await; tokio::time::timeout( @@ -143,9 +152,11 @@ async fn cluster_1_rack_single_shotover() { .expect("Shotover did not shutdown within 10s"); } -#[cfg(feature = "rdkafka-driver-tests")] -#[tokio::test] -async fn cluster_1_rack_multi_shotover() { +#[rstest] +#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))] +#[case::java(KafkaDriver::Java)] +#[tokio::test(flavor = "multi_thread")] +async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) { let _docker_compose = docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml"); let mut shotovers = vec![]; @@ -163,7 +174,7 @@ async fn cluster_1_rack_multi_shotover() { ); } - let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192"); + let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192"); test_cases::basic(connection_builder).await; for shotover in shotovers { @@ -176,9 +187,11 @@ async fn cluster_1_rack_multi_shotover() { } } -#[cfg(feature = "rdkafka-driver-tests")] +#[rstest] +#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))] +#[case::java(KafkaDriver::Java)] #[tokio::test] -async fn cluster_2_racks_single_shotover() { +async fn cluster_2_racks_single_shotover(#[case] driver: KafkaDriver) { let _docker_compose = docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml"); let shotover = @@ -186,7 +199,7 @@ async fn cluster_2_racks_single_shotover() { .start() .await; - let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192"); + let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192"); test_cases::basic(connection_builder).await; tokio::time::timeout( @@ -197,9 +210,11 @@ async fn cluster_2_racks_single_shotover() { .expect("Shotover did not shutdown within 10s"); } -#[cfg(feature = "rdkafka-driver-tests")] +#[rstest] +#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))] +#[case::java(KafkaDriver::Java)] #[tokio::test] -async fn cluster_2_racks_multi_shotover() { +async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) { let _docker_compose = docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml"); @@ -219,7 +234,7 @@ async fn cluster_2_racks_multi_shotover() { ); } - let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192"); + let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192"); test_cases::basic(connection_builder).await; for shotover in shotovers { diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 82b90eca3..93f77ddeb 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -1,133 +1,5 @@ -use std::time::Duration; -use test_helpers::connection::kafka::rdkafka::admin::{ - AdminOptions, AlterConfig, NewPartitions, NewTopic, OwnedResourceSpecifier, ResourceSpecifier, - TopicReplication, -}; -use test_helpers::connection::kafka::rdkafka::types::RDKafkaErrorCode; -use test_helpers::connection::kafka::rdkafka::util::Timeout; use test_helpers::connection::kafka::{ExpectedResponse, KafkaConnectionBuilder, Record}; -async fn admin(connection_builder: &KafkaConnectionBuilder) { - let admin = connection_builder.connect_admin().await; - admin - .create_topics( - &[ - NewTopic { - name: "partitions1", - num_partitions: 1, - replication: TopicReplication::Fixed(1), - config: vec![], - }, - NewTopic { - name: "paritions3", - num_partitions: 3, - replication: TopicReplication::Fixed(1), - config: vec![], - }, - NewTopic { - name: "acks0", - num_partitions: 1, - replication: TopicReplication::Fixed(1), - config: vec![], - }, - NewTopic { - name: "to_delete", - num_partitions: 1, - replication: TopicReplication::Fixed(1), - config: vec![], - }, - ], - &AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), - ) - .await - .unwrap(); - - let results = admin - .create_partitions( - &[NewPartitions { - // TODO: modify topic "foo" instead so that we can test our handling of that with interesting partiton + replication count - topic_name: "to_delete", - new_partition_count: 2, - assignment: None, - }], - &AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), - ) - .await - .unwrap(); - for result in results { - let result = result.unwrap(); - assert_eq!(result, "to_delete") - } - - let results = admin - .describe_configs( - // TODO: test ResourceSpecifier::Broker and ResourceSpecifier::Group as well. - // Will need to find a way to get a valid broker id and to create a group. - &[ResourceSpecifier::Topic("to_delete")], - &AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), - ) - .await - .unwrap(); - for result in results { - let result = result.unwrap(); - assert_eq!( - result.specifier, - OwnedResourceSpecifier::Topic("to_delete".to_owned()) - ); - } - - let results = admin - .alter_configs( - &[AlterConfig { - specifier: ResourceSpecifier::Topic("to_delete"), - entries: [("foo", "bar")].into(), - }], - &AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), - ) - .await - .unwrap(); - for result in results { - assert_eq!( - result.unwrap(), - OwnedResourceSpecifier::Topic("to_delete".to_owned()) - ); - } - - let results = admin - .delete_topics( - &["to_delete"], - &AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), - ) - .await - .unwrap(); - for result in results { - assert_eq!(result.unwrap(), "to_delete"); - } -} - -async fn admin_cleanup(connection_builder: &KafkaConnectionBuilder) { - let admin = connection_builder.connect_admin().await; - let results = admin - // The cpp driver will lock up when running certain commands after a delete_groups if the delete_groups is targeted at a group that doesnt exist. - // So just make sure to run it against a group that does exist. - .delete_groups( - &["some_group"], - &AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), - ) - .await - .unwrap(); - for result in results { - match result { - Ok(result) => assert_eq!(result, "some_group"), - Err(err) => assert_eq!( - err, - // Allow this error which can occur due to race condition in the test, but do not allow any other error types - ("some_group".to_owned(), RDKafkaErrorCode::NonEmptyGroup) - ), - } - } -} - async fn produce_consume(connection_builder: &KafkaConnectionBuilder, topic_name: &str, i: i64) { let producer = connection_builder.connect_producer(1).await; @@ -204,11 +76,11 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) { } pub async fn basic(connection_builder: KafkaConnectionBuilder) { - admin(&connection_builder).await; + connection_builder.admin_setup().await; for i in 0..2 { produce_consume(&connection_builder, "partitions1", i).await; produce_consume(&connection_builder, "partitions3", i).await; produce_consume_acks0(&connection_builder).await; } - admin_cleanup(&connection_builder).await; + connection_builder.admin_cleanup().await; } diff --git a/test-helpers/Cargo.toml b/test-helpers/Cargo.toml index 80c9e9c3f..4f9463a1b 100644 --- a/test-helpers/Cargo.toml +++ b/test-helpers/Cargo.toml @@ -33,3 +33,4 @@ anyhow.workspace = true rcgen.workspace = true rdkafka = { version = "0.36", features = ["cmake-build"], optional = true } docker-compose-runner = "0.3.0" +j4rs = "0.17.2" diff --git a/test-helpers/src/connection/kafka.rs b/test-helpers/src/connection/kafka.rs deleted file mode 100644 index 53e75e3c9..000000000 --- a/test-helpers/src/connection/kafka.rs +++ /dev/null @@ -1,130 +0,0 @@ -// Allow direct usage of the APIs when the feature is enabled -pub use rdkafka; - -use rdkafka::admin::AdminClient; -use rdkafka::client::DefaultClientContext; -use rdkafka::config::ClientConfig; -use rdkafka::consumer::{Consumer, StreamConsumer}; -use rdkafka::producer::{FutureProducer, FutureRecord}; - -use rdkafka::Message; -use std::time::Duration; - -pub struct KafkaConnectionBuilder { - client: ClientConfig, -} - -impl KafkaConnectionBuilder { - pub fn new(address: &str) -> Self { - let mut client = ClientConfig::new(); - client - .set("bootstrap.servers", address) - // internal driver debug logs are emitted to tokio tracing, assuming the appropriate filter is used by the tracing subscriber - .set("debug", "all"); - KafkaConnectionBuilder { client } - } - - pub fn use_sasl(mut self, user: &str, pass: &str) -> Self { - self.client.set("sasl.mechanisms", "PLAIN"); - self.client.set("sasl.username", user); - self.client.set("sasl.password", pass); - self.client.set("security.protocol", "SASL_PLAINTEXT"); - self - } - - pub async fn connect_producer(&self, acks: i32) -> KafkaProducer { - KafkaProducer { - producer: self - .client - .clone() - .set("message.timeout.ms", "5000") - .set("acks", &acks.to_string()) - .create() - .unwrap(), - } - } - - pub async fn connect_consumer(&self, topic_name: &str) -> KafkaConsumer { - let consumer: StreamConsumer = self - .client - .clone() - .set("group.id", "some_group") - .set("session.timeout.ms", "6000") - .set("auto.offset.reset", "earliest") - .set("enable.auto.commit", "false") - .create() - .unwrap(); - consumer.subscribe(&[topic_name]).unwrap(); - KafkaConsumer { consumer } - } - - pub async fn connect_admin(&self) -> AdminClient { - self.client.create().unwrap() - } -} - -pub struct KafkaProducer { - producer: FutureProducer, -} - -impl KafkaProducer { - pub async fn assert_produce(&self, record: Record<'_>, expected_offset: Option) { - let send = match record.key { - Some(key) => self - .producer - .send_result( - FutureRecord::to(record.topic_name) - .payload(record.payload) - .key(key), - ) - .unwrap(), - None => self - .producer - .send_result(FutureRecord::<(), _>::to(record.topic_name).payload(record.payload)) - .unwrap(), - }; - let delivery_status = tokio::time::timeout(Duration::from_secs(30), send) - .await - .expect("Timeout while receiving from producer") - .unwrap() - .unwrap(); - - if let Some(offset) = expected_offset { - assert_eq!(delivery_status.1, offset, "Unexpected offset"); - } - } -} - -pub struct Record<'a> { - pub payload: &'a str, - pub topic_name: &'a str, - pub key: Option<&'a str>, -} - -pub struct KafkaConsumer { - consumer: StreamConsumer, -} - -impl KafkaConsumer { - pub async fn assert_consume(&self, response: ExpectedResponse<'_>) { - let message = tokio::time::timeout(Duration::from_secs(30), self.consumer.recv()) - .await - .expect("Timeout while receiving from consumer") - .unwrap(); - let contents = message.payload_view::().unwrap().unwrap(); - assert_eq!(response.message, contents); - assert_eq!( - response.key, - message.key().map(|x| std::str::from_utf8(x).unwrap()) - ); - assert_eq!(response.topic_name, message.topic()); - assert_eq!(response.offset, message.offset()); - } -} - -pub struct ExpectedResponse<'a> { - pub message: &'a str, - pub key: Option<&'a str>, - pub topic_name: &'a str, - pub offset: i64, -} diff --git a/test-helpers/src/connection/kafka/cpp.rs b/test-helpers/src/connection/kafka/cpp.rs new file mode 100644 index 000000000..38da3b3d9 --- /dev/null +++ b/test-helpers/src/connection/kafka/cpp.rs @@ -0,0 +1,250 @@ +// Allow direct usage of the APIs when the feature is enabled +pub use rdkafka; + +use super::{ExpectedResponse, Record}; +use rdkafka::admin::AdminClient; +use rdkafka::admin::{ + AdminOptions, AlterConfig, NewPartitions, NewTopic, OwnedResourceSpecifier, ResourceSpecifier, + TopicReplication, +}; +use rdkafka::client::DefaultClientContext; +use rdkafka::config::ClientConfig; +use rdkafka::consumer::{Consumer, StreamConsumer}; +use rdkafka::producer::{FutureProducer, FutureRecord}; +use rdkafka::types::RDKafkaErrorCode; +use rdkafka::util::Timeout; +use rdkafka::Message; +use std::time::Duration; + +pub struct KafkaConnectionBuilderCpp { + client: ClientConfig, +} + +impl KafkaConnectionBuilderCpp { + pub fn new(address: &str) -> Self { + let mut client = ClientConfig::new(); + client + .set("bootstrap.servers", address) + // internal driver debug logs are emitted to tokio tracing, assuming the appropriate filter is used by the tracing subscriber + .set("debug", "all"); + KafkaConnectionBuilderCpp { client } + } + + pub fn use_sasl(mut self, user: &str, pass: &str) -> Self { + self.client.set("sasl.mechanisms", "PLAIN"); + self.client.set("sasl.username", user); + self.client.set("sasl.password", pass); + self.client.set("security.protocol", "SASL_PLAINTEXT"); + self + } + + pub async fn connect_producer(&self, acks: i32) -> KafkaProducerCpp { + KafkaProducerCpp { + producer: self + .client + .clone() + .set("message.timeout.ms", "5000") + .set("acks", &acks.to_string()) + .create() + .unwrap(), + } + } + + pub async fn connect_consumer(&self, topic_name: &str) -> KafkaConsumerCpp { + let consumer: StreamConsumer = self + .client + .clone() + .set("group.id", "some_group") + .set("session.timeout.ms", "6000") + .set("auto.offset.reset", "earliest") + .set("enable.auto.commit", "false") + .create() + .unwrap(); + consumer.subscribe(&[topic_name]).unwrap(); + KafkaConsumerCpp { consumer } + } + + pub async fn connect_admin(&self) -> AdminClient { + self.client.create().unwrap() + } + + pub async fn admin_setup(&self) { + let admin = self.connect_admin().await; + admin + .create_topics( + &[ + NewTopic { + name: "partitions1", + num_partitions: 1, + replication: TopicReplication::Fixed(1), + config: vec![], + }, + NewTopic { + name: "paritions3", + num_partitions: 3, + replication: TopicReplication::Fixed(1), + config: vec![], + }, + NewTopic { + name: "acks0", + num_partitions: 1, + replication: TopicReplication::Fixed(1), + config: vec![], + }, + NewTopic { + name: "to_delete", + num_partitions: 1, + replication: TopicReplication::Fixed(1), + config: vec![], + }, + ], + &AdminOptions::new() + .operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), + ) + .await + .unwrap(); + + let results = admin + .create_partitions( + &[NewPartitions { + // TODO: modify topic "foo" instead so that we can test our handling of that with interesting partiton + replication count + topic_name: "to_delete", + new_partition_count: 2, + assignment: None, + }], + &AdminOptions::new() + .operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), + ) + .await + .unwrap(); + for result in results { + let result = result.unwrap(); + assert_eq!(result, "to_delete") + } + + let results = admin + .describe_configs( + // TODO: test ResourceSpecifier::Broker and ResourceSpecifier::Group as well. + // Will need to find a way to get a valid broker id and to create a group. + &[ResourceSpecifier::Topic("to_delete")], + &AdminOptions::new() + .operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), + ) + .await + .unwrap(); + for result in results { + let result = result.unwrap(); + assert_eq!( + result.specifier, + OwnedResourceSpecifier::Topic("to_delete".to_owned()) + ); + } + + let results = admin + .alter_configs( + &[AlterConfig { + specifier: ResourceSpecifier::Topic("to_delete"), + entries: [("foo", "bar")].into(), + }], + &AdminOptions::new() + .operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), + ) + .await + .unwrap(); + for result in results { + assert_eq!( + result.unwrap(), + OwnedResourceSpecifier::Topic("to_delete".to_owned()) + ); + } + + let results = admin + .delete_topics( + &["to_delete"], + &AdminOptions::new() + .operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), + ) + .await + .unwrap(); + for result in results { + assert_eq!(result.unwrap(), "to_delete"); + } + } + + pub async fn admin_cleanup(&self) { + let admin = self.connect_admin().await; + let results = admin + // The cpp driver will lock up when running certain commands after a delete_groups if the delete_groups is targeted at a group that doesnt exist. + // So just make sure to run it against a group that does exist. + .delete_groups( + &["some_group"], + &AdminOptions::new() + .operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), + ) + .await + .unwrap(); + for result in results { + match result { + Ok(result) => assert_eq!(result, "some_group"), + Err(err) => assert_eq!( + err, + // Allow this error which can occur due to race condition in the test, but do not allow any other error types + ("some_group".to_owned(), RDKafkaErrorCode::NonEmptyGroup) + ), + } + } + } +} + +pub struct KafkaProducerCpp { + producer: FutureProducer, +} + +impl KafkaProducerCpp { + pub async fn assert_produce(&self, record: Record<'_>, expected_offset: Option) { + let send = match record.key { + Some(key) => self + .producer + .send_result( + FutureRecord::to(record.topic_name) + .payload(record.payload) + .key(key), + ) + .unwrap(), + None => self + .producer + .send_result(FutureRecord::<(), _>::to(record.topic_name).payload(record.payload)) + .unwrap(), + }; + let delivery_status = tokio::time::timeout(Duration::from_secs(30), send) + .await + .expect("Timeout while receiving from producer") + .unwrap() + .unwrap(); + + if let Some(offset) = expected_offset { + assert_eq!(delivery_status.1, offset, "Unexpected offset"); + } + } +} + +pub struct KafkaConsumerCpp { + consumer: StreamConsumer, +} + +impl KafkaConsumerCpp { + pub async fn assert_consume(&self, response: ExpectedResponse<'_>) { + let message = tokio::time::timeout(Duration::from_secs(30), self.consumer.recv()) + .await + .expect("Timeout while receiving from consumer") + .unwrap(); + let contents = message.payload_view::().unwrap().unwrap(); + assert_eq!(response.message, contents); + assert_eq!( + response.key, + message.key().map(|x| std::str::from_utf8(x).unwrap()) + ); + assert_eq!(response.topic_name, message.topic()); + assert_eq!(response.offset, message.offset()); + } +} diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs new file mode 100644 index 000000000..9e7535790 --- /dev/null +++ b/test-helpers/src/connection/kafka/java.rs @@ -0,0 +1,190 @@ +use super::{ExpectedResponse, Record}; +use j4rs::{Instance, InvocationArg, Jvm, JvmBuilder, MavenArtifact}; +use std::{collections::HashMap, rc::Rc}; + +fn properties(jvm: &Jvm, props: &HashMap) -> Instance { + let properties = jvm.create_instance("java.util.Properties", &[]).unwrap(); + for (key, value) in props.iter() { + jvm.invoke( + &properties, + "setProperty", + &[ + InvocationArg::try_from(key).unwrap(), + InvocationArg::try_from(value).unwrap(), + ], + ) + .unwrap(); + } + properties +} + +pub struct KafkaConnectionBuilderJava { + jvm: Rc, + base_config: HashMap, +} + +impl KafkaConnectionBuilderJava { + pub fn new(address: &str) -> Self { + let jvm = Rc::new(JvmBuilder::new().build().unwrap()); + + // specify maven dep for kafka-clients and all of its dependencies since j4rs does not support dependency resolution + // The list of dependencies can be found here: https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.7.0/kafka-clients-3.7.0.pom6 + // These are deployed to and loaded from a path like target/debug/jassets + jvm.deploy_artifact(&MavenArtifact::from("org.apache.kafka:kafka-clients:3.7.0")) + .unwrap(); + jvm.deploy_artifact(&MavenArtifact::from("org.slf4j:slf4j-api:1.7.36")) + .unwrap(); + jvm.deploy_artifact(&MavenArtifact::from("org.slf4j:slf4j-simple:1.7.36")) + .unwrap(); + + let base_config = HashMap::from([("bootstrap.servers".to_owned(), address.to_owned())]); + KafkaConnectionBuilderJava { jvm, base_config } + } + + pub fn use_sasl(self, _user: &str, _pass: &str) -> Self { + tracing::error!("Unimplemented test case"); + self + } + + pub async fn connect_producer(&self, acks: i32) -> KafkaProducerJava { + let mut config = self.base_config.clone(); + config.insert("acks".to_owned(), acks.to_string()); + config.insert( + "key.serializer".to_owned(), + "org.apache.kafka.common.serialization.StringSerializer".to_owned(), + ); + config.insert( + "value.serializer".to_owned(), + "org.apache.kafka.common.serialization.StringSerializer".to_owned(), + ); + + let properties = properties(&self.jvm, &config); + let producer = self + .jvm + .create_instance( + "org.apache.kafka.clients.producer.KafkaProducer", + &[properties.into()], + ) + .unwrap(); + KafkaProducerJava { + _producer: producer, + } + } + + pub async fn connect_consumer(&self, _topic_name: &str) -> KafkaConsumerJava { + KafkaConsumerJava {} + } + + pub async fn connect_admin(&self) -> Instance { + let properties = properties(&self.jvm, &self.base_config); + self.jvm + .invoke_static( + "org.apache.kafka.clients.admin.Admin", + "create", + &[properties.into()], + ) + .unwrap() + } + + pub async fn admin_setup(&self) { + let admin = self.connect_admin().await; + create_topics( + &self.jvm, + &admin, + &[ + NewTopic { + name: "partitions1", + num_partitions: 1, + replication_factor: 1, + }, + NewTopic { + name: "paritions3", + num_partitions: 3, + replication_factor: 1, + }, + NewTopic { + name: "acks0", + num_partitions: 1, + replication_factor: 1, + }, + NewTopic { + name: "to_delete", + num_partitions: 1, + replication_factor: 1, + }, + ], + ) + .await; + } + + pub async fn admin_cleanup(&self) { + self.connect_admin().await; + } +} + +struct NewTopic<'a> { + name: &'a str, + num_partitions: i32, + replication_factor: i16, +} + +async fn create_topics(jvm: &Jvm, admin_client: &Instance, topics: &[NewTopic<'_>]) { + let topics: Vec<_> = topics + .iter() + .map(|topic| { + jvm.create_instance( + "org.apache.kafka.clients.admin.NewTopic", + &[ + topic.name.try_into().unwrap(), + jvm.invoke_static( + "java.util.Optional", + "of", + &[InvocationArg::try_from(topic.num_partitions).unwrap()], + ) + .unwrap() + .into(), + jvm.invoke_static( + "java.util.Optional", + "of", + &[InvocationArg::try_from(topic.replication_factor).unwrap()], + ) + .unwrap() + .into(), + // TODO: can simplify to this once https://github.com/astonbitecode/j4rs/issues/91 is resolved + // InvocationArg::try_from(topic.num_partitions) + // .unwrap() + // .into_primitive() + // .unwrap(), + // InvocationArg::try_from(topic.replication_factor) + // .unwrap() + // .into_primitive() + // .unwrap(), + ], + ) + }) + .collect(); + let topics = jvm + .java_list("org.apache.kafka.clients.admin.NewTopic", topics) + .unwrap(); + + jvm.invoke(admin_client, "createTopics", &[topics.into()]) + .unwrap(); +} + +pub struct KafkaProducerJava { + _producer: Instance, +} + +impl KafkaProducerJava { + pub async fn assert_produce(&self, _record: Record<'_>, _expected_offset: Option) { + tracing::error!("Unimplemented assert"); + } +} + +pub struct KafkaConsumerJava {} + +impl KafkaConsumerJava { + pub async fn assert_consume(&self, _response: ExpectedResponse<'_>) { + tracing::error!("Unimplemented assert"); + } +} diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs new file mode 100644 index 000000000..1ccc19338 --- /dev/null +++ b/test-helpers/src/connection/kafka/mod.rs @@ -0,0 +1,114 @@ +#[cfg(feature = "rdkafka-driver-tests")] +pub mod cpp; +pub mod java; + +#[cfg(feature = "rdkafka-driver-tests")] +use cpp::*; +use java::*; + +pub enum KafkaDriver { + #[cfg(feature = "rdkafka-driver-tests")] + Cpp, + Java, +} + +pub enum KafkaConnectionBuilder { + #[cfg(feature = "rdkafka-driver-tests")] + Cpp(KafkaConnectionBuilderCpp), + Java(KafkaConnectionBuilderJava), +} + +impl KafkaConnectionBuilder { + pub fn new(driver: KafkaDriver, address: &str) -> Self { + match driver { + #[cfg(feature = "rdkafka-driver-tests")] + KafkaDriver::Cpp => Self::Cpp(KafkaConnectionBuilderCpp::new(address)), + KafkaDriver::Java => Self::Java(KafkaConnectionBuilderJava::new(address)), + } + } + + pub fn use_sasl(self, user: &str, pass: &str) -> Self { + match self { + #[cfg(feature = "rdkafka-driver-tests")] + Self::Cpp(cpp) => Self::Cpp(cpp.use_sasl(user, pass)), + Self::Java(java) => Self::Java(java.use_sasl(user, pass)), + } + } + + pub async fn connect_producer(&self, acks: i32) -> KafkaProducer { + match self { + #[cfg(feature = "rdkafka-driver-tests")] + Self::Cpp(cpp) => KafkaProducer::Cpp(cpp.connect_producer(acks).await), + Self::Java(java) => KafkaProducer::Java(java.connect_producer(acks).await), + } + } + + pub async fn connect_consumer(&self, topic_name: &str) -> KafkaConsumer { + match self { + #[cfg(feature = "rdkafka-driver-tests")] + Self::Cpp(cpp) => KafkaConsumer::Cpp(cpp.connect_consumer(topic_name).await), + Self::Java(java) => KafkaConsumer::Java(java.connect_consumer(topic_name).await), + } + } + + pub async fn admin_setup(&self) { + match self { + #[cfg(feature = "rdkafka-driver-tests")] + Self::Cpp(cpp) => cpp.admin_setup().await, + Self::Java(java) => java.admin_setup().await, + } + } + + pub async fn admin_cleanup(&self) { + match self { + #[cfg(feature = "rdkafka-driver-tests")] + Self::Cpp(cpp) => cpp.admin_cleanup().await, + Self::Java(java) => java.admin_cleanup().await, + } + } +} + +pub enum KafkaProducer { + #[cfg(feature = "rdkafka-driver-tests")] + Cpp(KafkaProducerCpp), + Java(KafkaProducerJava), +} + +impl KafkaProducer { + pub async fn assert_produce(&self, record: Record<'_>, expected_offset: Option) { + match self { + #[cfg(feature = "rdkafka-driver-tests")] + Self::Cpp(cpp) => cpp.assert_produce(record, expected_offset).await, + Self::Java(java) => java.assert_produce(record, expected_offset).await, + } + } +} + +pub struct Record<'a> { + pub payload: &'a str, + pub topic_name: &'a str, + pub key: Option<&'a str>, +} + +pub enum KafkaConsumer { + #[cfg(feature = "rdkafka-driver-tests")] + Cpp(KafkaConsumerCpp), + Java(KafkaConsumerJava), +} + +impl KafkaConsumer { + pub async fn assert_consume(&self, response: ExpectedResponse<'_>) { + match self { + #[cfg(feature = "rdkafka-driver-tests")] + Self::Cpp(cpp) => cpp.assert_consume(response).await, + Self::Java(java) => java.assert_consume(response).await, + } + } +} + +pub struct ExpectedResponse<'a> { + pub message: &'a str, + pub key: Option<&'a str>, + pub topic_name: &'a str, + pub offset: i64, +} diff --git a/test-helpers/src/connection/mod.rs b/test-helpers/src/connection/mod.rs index 3db4295fd..801620233 100644 --- a/test-helpers/src/connection/mod.rs +++ b/test-helpers/src/connection/mod.rs @@ -1,6 +1,5 @@ pub mod cassandra; -#[cfg(feature = "rdkafka-driver-tests")] pub mod kafka; // redis_connection is named differently to the cassandra module because it contains raw functions instead of a struct with methods pub mod redis_connection;