Skip to content

Commit

Permalink
Implement java producer + consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Mar 12, 2024
1 parent 1ba1c48 commit f95fa6e
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 79 deletions.
33 changes: 19 additions & 14 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use test_helpers::docker_compose::docker_compose;
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn passthrough_standard(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml");
Expand All @@ -31,7 +31,7 @@ async fn passthrough_standard(#[case] driver: KafkaDriver) {
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn passthrough_tls(#[case] driver: KafkaDriver) {
test_helpers::cert::generate_kafka_test_certs();

Expand All @@ -52,10 +52,11 @@ async fn passthrough_tls(#[case] driver: KafkaDriver) {
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
// #[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_tls(#[case] driver: KafkaDriver) {
test_helpers::cert::generate_kafka_test_certs();

Expand All @@ -80,7 +81,7 @@ async fn cluster_tls(#[case] driver: KafkaDriver) {
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn passthrough_encode(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml");
Expand All @@ -97,7 +98,7 @@ async fn passthrough_encode(#[case] driver: KafkaDriver) {
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn passthrough_sasl(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml");
Expand All @@ -116,7 +117,7 @@ async fn passthrough_sasl(#[case] driver: KafkaDriver) {
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn passthrough_sasl_encode(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml");
Expand All @@ -132,10 +133,11 @@ async fn passthrough_sasl_encode(#[case] driver: KafkaDriver) {
shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
// #[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
Expand All @@ -154,9 +156,10 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
// #[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")]
async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
Expand Down Expand Up @@ -189,10 +192,11 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
}
}

#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
// #[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_2_racks_single_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");
Expand All @@ -212,10 +216,11 @@ async fn cluster_2_racks_single_shotover(#[case] driver: KafkaDriver) {
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
//#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");
Expand Down
4 changes: 2 additions & 2 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async fn produce_consume(connection_builder: &KafkaConnectionBuilder, topic_name
)
.await;

let consumer = connection_builder.connect_consumer(topic_name).await;
let mut consumer = connection_builder.connect_consumer(topic_name).await;

consumer
.assert_consume(ExpectedResponse {
Expand Down Expand Up @@ -118,7 +118,7 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
.await;
}

let consumer = connection_builder.connect_consumer(topic_name).await;
let mut consumer = connection_builder.connect_consumer(topic_name).await;

for j in 0..10 {
consumer
Expand Down
Loading

0 comments on commit f95fa6e

Please sign in to comment.