Skip to content

Commit

Permalink
Implement java producer + consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Mar 12, 2024
1 parent 1ba1c48 commit 9948852
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 79 deletions.
28 changes: 14 additions & 14 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use test_helpers::docker_compose::docker_compose;
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn passthrough_standard(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml");
Expand All @@ -31,7 +31,7 @@ async fn passthrough_standard(#[case] driver: KafkaDriver) {
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn passthrough_tls(#[case] driver: KafkaDriver) {
test_helpers::cert::generate_kafka_test_certs();

Expand All @@ -54,8 +54,8 @@ async fn passthrough_tls(#[case] driver: KafkaDriver) {

#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
// #[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_tls(#[case] driver: KafkaDriver) {
test_helpers::cert::generate_kafka_test_certs();

Expand All @@ -80,7 +80,7 @@ async fn cluster_tls(#[case] driver: KafkaDriver) {
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn passthrough_encode(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml");
Expand All @@ -97,7 +97,7 @@ async fn passthrough_encode(#[case] driver: KafkaDriver) {
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn passthrough_sasl(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml");
Expand All @@ -116,7 +116,7 @@ async fn passthrough_sasl(#[case] driver: KafkaDriver) {
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn passthrough_sasl_encode(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml");
Expand All @@ -134,8 +134,8 @@ async fn passthrough_sasl_encode(#[case] driver: KafkaDriver) {

#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
// #[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
Expand All @@ -156,7 +156,7 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {

#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
// #[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")]
async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
Expand Down Expand Up @@ -191,8 +191,8 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {

#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
// #[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_2_racks_single_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");
Expand All @@ -214,8 +214,8 @@ async fn cluster_2_racks_single_shotover(#[case] driver: KafkaDriver) {

#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
//#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");
Expand Down
4 changes: 2 additions & 2 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async fn produce_consume(connection_builder: &KafkaConnectionBuilder, topic_name
)
.await;

let consumer = connection_builder.connect_consumer(topic_name).await;
let mut consumer = connection_builder.connect_consumer(topic_name).await;

consumer
.assert_consume(ExpectedResponse {
Expand Down Expand Up @@ -118,7 +118,7 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
.await;
}

let consumer = connection_builder.connect_consumer(topic_name).await;
let mut consumer = connection_builder.connect_consumer(topic_name).await;

for j in 0..10 {
consumer
Expand Down
Loading

0 comments on commit 9948852

Please sign in to comment.