Skip to content

Commit

Permalink
Add java kafka driver backend to kafka integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Feb 29, 2024
1 parent 384dc38 commit d599bdd
Show file tree
Hide file tree
Showing 10 changed files with 673 additions and 291 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/build_and_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ jobs:
key: ubuntu-20.04-packages
- name: Install ubuntu packages
run: shotover-proxy/build/install_ubuntu_packages.sh
- name: Set up JDK 17
uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'adopt'
- name: Install nextest
uses: taiki-e/install-action@v2
with:
Expand Down
68 changes: 68 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 55 additions & 30 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
#[cfg(feature = "rdkafka-driver-tests")]
mod test_cases;

use crate::shotover_process;
#[cfg(feature = "rdkafka-driver-tests")]
use rstest::rstest;
use std::time::Duration;
#[cfg(feature = "rdkafka-driver-tests")]
use test_helpers::connection::kafka::{KafkaConnectionBuilder, KafkaDriver};
use test_helpers::docker_compose::docker_compose;

#[rstest]
#[cfg(feature = "rdkafka-driver-tests")]
mod test_cases;

#[cfg(feature = "rdkafka-driver-tests")]
use test_helpers::connection::kafka::KafkaConnectionBuilder;

#[cfg(feature = "rdkafka-driver-tests")]
#[case::cpp(KafkaDriver::Cpp)]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn passthrough_standard() {
async fn passthrough_standard(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/passthrough/topology.yaml")
.start()
.await;

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

tokio::time::timeout(
Expand All @@ -31,9 +29,12 @@ async fn passthrough_standard() {
.expect("Shotover did not shutdown within 10s");
}

#[rstest]
#[cfg(feature = "rdkafka-driver-tests")]
#[case::cpp(KafkaDriver::Cpp)]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn passthrough_tls() {
async fn passthrough_tls(#[case] driver: KafkaDriver) {
test_helpers::cert::generate_kafka_test_certs();

let _docker_compose =
Expand All @@ -42,7 +43,7 @@ async fn passthrough_tls() {
.start()
.await;

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

tokio::time::timeout(
Expand All @@ -53,9 +54,12 @@ async fn passthrough_tls() {
.expect("Shotover did not shutdown within 10s");
}

#[rstest]
#[cfg(feature = "rdkafka-driver-tests")]
#[case::cpp(KafkaDriver::Cpp)]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn cluster_tls() {
async fn cluster_tls(#[case] driver: KafkaDriver) {
test_helpers::cert::generate_kafka_test_certs();

let _docker_compose =
Expand All @@ -64,7 +68,7 @@ async fn cluster_tls() {
.start()
.await;

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

tokio::time::timeout(
Expand All @@ -75,40 +79,49 @@ async fn cluster_tls() {
.expect("Shotover did not shutdown within 10s");
}

#[rstest]
#[cfg(feature = "rdkafka-driver-tests")]
#[case::cpp(KafkaDriver::Cpp)]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn passthrough_encode() {
async fn passthrough_encode(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/passthrough/topology-encode.yaml")
.start()
.await;

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

shotover.shutdown_and_then_consume_events(&[]).await;
}

#[rstest]
#[cfg(feature = "rdkafka-driver-tests")]
#[case::cpp(KafkaDriver::Cpp)]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn passthrough_sasl() {
async fn passthrough_sasl(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/passthrough-sasl/topology.yaml")
.start()
.await;

let connection_builder =
KafkaConnectionBuilder::new("127.0.0.1:9192").use_sasl("user", "password");
KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl("user", "password");
test_cases::basic(connection_builder).await;

shotover.shutdown_and_then_consume_events(&[]).await;
}

#[rstest]
#[cfg(feature = "rdkafka-driver-tests")]
#[case::cpp(KafkaDriver::Cpp)]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn passthrough_sasl_encode() {
async fn passthrough_sasl_encode(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml");
let shotover =
Expand All @@ -117,22 +130,25 @@ async fn passthrough_sasl_encode() {
.await;

let connection_builder =
KafkaConnectionBuilder::new("127.0.0.1:9192").use_sasl("user", "password");
KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl("user", "password");
test_cases::basic(connection_builder).await;

shotover.shutdown_and_then_consume_events(&[]).await;
}

#[rstest]
#[cfg(feature = "rdkafka-driver-tests")]
#[case::cpp(KafkaDriver::Cpp)]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn cluster_1_rack_single_shotover() {
async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml")
.start()
.await;

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

tokio::time::timeout(
Expand All @@ -143,9 +159,12 @@ async fn cluster_1_rack_single_shotover() {
.expect("Shotover did not shutdown within 10s");
}

#[rstest]
#[cfg(feature = "rdkafka-driver-tests")]
#[tokio::test]
async fn cluster_1_rack_multi_shotover() {
#[case::cpp(KafkaDriver::Cpp)]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")]
async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
let mut shotovers = vec![];
Expand All @@ -163,7 +182,7 @@ async fn cluster_1_rack_multi_shotover() {
);
}

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

for shotover in shotovers {
Expand All @@ -176,17 +195,20 @@ async fn cluster_1_rack_multi_shotover() {
}
}

#[rstest]
#[cfg(feature = "rdkafka-driver-tests")]
#[case::cpp(KafkaDriver::Cpp)]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn cluster_2_racks_single_shotover() {
async fn cluster_2_racks_single_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");
let shotover =
shotover_process("tests/test-configs/kafka/cluster-2-racks/topology-single.yaml")
.start()
.await;

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

tokio::time::timeout(
Expand All @@ -197,9 +219,12 @@ async fn cluster_2_racks_single_shotover() {
.expect("Shotover did not shutdown within 10s");
}

#[rstest]
#[cfg(feature = "rdkafka-driver-tests")]
#[case::cpp(KafkaDriver::Cpp)]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn cluster_2_racks_multi_shotover() {
async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");

Expand All @@ -219,7 +244,7 @@ async fn cluster_2_racks_multi_shotover() {
);
}

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

for shotover in shotovers {
Expand Down
Loading

0 comments on commit d599bdd

Please sign in to comment.