From e31a2b946820a030aff08be9569cd6987a27a27a Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Wed, 11 Sep 2024 15:34:50 +1000
Subject: [PATCH] KafkaSinkCluster - add multi_partition integration test case

---
 .../tests/kafka_int_tests/test_cases.rs   | 88 ++++++++++++++++---
 test-helpers/src/connection/kafka/cpp.rs  |  2 +-
 test-helpers/src/connection/kafka/java.rs |  2 +-
 test-helpers/src/connection/kafka/mod.rs  |  2 +-
 4 files changed, 80 insertions(+), 14 deletions(-)

diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index 700fc78fc..d5f286716 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1,3 +1,4 @@
+use futures::{stream::FuturesUnordered, StreamExt};
 use std::collections::HashMap;
 use test_helpers::{
     connection::kafka::{
@@ -47,6 +48,11 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
             num_partitions: 3,
             replication_factor: 1,
         },
+        NewTopic {
+            name: "multi_partitions_batch",
+            num_partitions: 3,
+            replication_factor: 1,
+        },
     ])
     .await;

@@ -95,7 +101,7 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
             Record {
                 payload: "initial2",
                 topic_name: "batch_test_partitions_3",
-                key: Some("foo"),
+                key: Some("foo".into()),
             },
             Some(0),
         ),
@@ -156,7 +162,7 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
             Record {
                 payload: "Message2",
                 topic_name: "batch_test_partitions_3",
-                key: Some("foo"),
+                key: Some("foo".into()),
             },
             None,
         ),
@@ -193,6 +199,65 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
     }
 }

+/// Attempt to make the driver batch produce requests for different partitions of the same topic into the same request.
+/// This is important to test since shotover has complex logic for splitting these batch requests into individual requests.
+pub async fn produce_consume_multi_partition_batch(connection_builder: &KafkaConnectionBuilder) {
+    // Set linger to 100ms to ensure that the concurrent produce requests are combined into a single batched request.
+    let producer = connection_builder.connect_producer("all", 100).await;
+    // Create an initial record to force kafka to create the topic if it doesn't yet exist.
+    producer
+        .assert_produce(
+            Record {
+                payload: "initial2",
+                topic_name: "multi_partitions_batch",
+                key: Some("foo".into()),
+            },
+            Some(0),
+        )
+        .await;
+
+    let mut consumer = connection_builder
+        .connect_consumer("multi_partitions_batch", "multi_partitions_batch_group")
+        .await;
+
+    consumer
+        .assert_consume(ExpectedResponse {
+            message: "initial2".to_owned(),
+            // Ensure we route to the same partition every time, so we can assert on the offset when consuming.
+            key: Some("foo".to_owned()),
+            topic_name: "multi_partitions_batch".to_owned(),
+            offset: Some(0),
+        })
+        .await;
+
+    // Concurrently produce records with distinct keys so they spread across the topic's partitions.
+    let mut futures = FuturesUnordered::new();
+    for i in 0..2000 {
+        futures.push(producer.assert_produce(
+            Record {
+                payload: "Message",
+                topic_name: "multi_partitions_batch",
+                key: Some(format!("Key{i}")),
+            },
+            None,
+        ))
+    }
+    while let Some(()) = futures.next().await {}
+
+    // TODO: It would be good to assert on the consumed records, but first we would need to allow producing to be properly ordered by adding a `produce` method that returns a future.
+    // So it's disabled for now.
+    // for i in 0..2000 {
+    //     consumer
+    //         .assert_consume(ExpectedResponse {
+    //             message: "Message".to_owned(),
+    //             key: Some(format!("Key{i}")),
+    //             topic_name: "multi_partitions_batch".to_owned(),
+    //             offset: Some(i + 1),
+    //         })
+    //         .await;
+    // }
+}
+
 pub async fn produce_consume_partitions1(
     connection_builder: &KafkaConnectionBuilder,
     topic_name: &str,
@@ -205,7 +270,7 @@ pub async fn produce_consume_partitions1(
             Record {
                 payload: "initial",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(0),
         )
@@ -230,7 +295,7 @@ pub async fn produce_consume_partitions1(
                 Record {
                     payload: "Message1",
                     topic_name,
-                    key: Some("Key"),
+                    key: Some("Key".into()),
                 },
                 Some(i * 2 + 1),
             )
@@ -335,7 +400,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
             Record {
                 payload: "initial",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(0),
         )
@@ -362,7 +427,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
                 Record {
                     payload: "Message1",
                     topic_name,
-                    key: Some("Key"),
+                    key: Some("Key".into()),
                 },
                 Some(i * 2 + 1),
             )
@@ -442,7 +507,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
             Record {
                 payload: "Initial",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(0),
         )
@@ -475,7 +540,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
             Record {
                 payload: "Message1",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(1),
         )
@@ -504,7 +569,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
             Record {
                 payload: "Message2",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(2),
         )
@@ -572,7 +637,7 @@ async fn produce_consume_partitions3(
                 Record {
                     payload: "Message1",
                     topic_name,
-                    key: Some("Key"),
+                    key: Some("Key".into()),
                 },
                 // We cant predict the offsets since that will depend on which partition the keyless record ends up in
                 None,
@@ -618,7 +683,7 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
             Record {
                 payload: "MessageAcks0",
                 topic_name,
-                key: Some("KeyAcks0"),
+                key: Some("KeyAcks0".into()),
             },
             None,
         )
@@ -648,6 +713,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
     produce_consume_commit_offsets_partitions1(connection_builder, "partitions1_with_offset").await;
     produce_consume_partitions3(connection_builder, "partitions3").await;
     produce_consume_multi_topic_batch(connection_builder).await;
+    produce_consume_multi_partition_batch(connection_builder).await;

     // Only run this test case on the java driver,
     // since even without going through shotover the cpp driver fails this test.
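
The new produce_consume_multi_partition_batch test above drives the batching behaviour through Shotover's test helpers. For reviewers who want to reproduce the client-side batching on its own, the same pattern can be sketched directly against the rdkafka crate. This sketch is not part of the patch; the broker address, topic name, linger value and the use of the tokio and futures crates are illustrative assumptions.

    // Rough standalone sketch: provoke a Produce request that spans several
    // partitions of one topic, the case the new test exercises.
    use std::time::Duration;

    use futures::{stream::FuturesUnordered, StreamExt};
    use rdkafka::{
        config::ClientConfig,
        producer::{FutureProducer, FutureRecord},
    };

    #[tokio::main]
    async fn main() {
        // linger.ms > 0 gives the client a window in which to coalesce records
        // destined for different partitions into one batched Produce request.
        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", "localhost:9092") // placeholder address
            .set("linger.ms", "100")
            .create()
            .expect("failed to create producer");

        let producer = &producer;
        let mut sends = FuturesUnordered::new();
        for i in 0..2000 {
            sends.push(async move {
                // Distinct keys spread the records across the topic's partitions.
                let key = format!("Key{i}");
                producer
                    .send(
                        FutureRecord::to("multi_partitions_batch")
                            .payload("Message")
                            .key(&key),
                        Duration::from_secs(5),
                    )
                    .await
                    .expect("delivery failed");
            });
        }
        // All sends are queued concurrently, so the client is free to batch them
        // before the linger window expires; drain the futures to await delivery.
        while sends.next().await.is_some() {}
    }

As in the test itself, the batching is encouraged rather than guaranteed, since it ultimately depends on the client's timing.
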
diff --git a/test-helpers/src/connection/kafka/cpp.rs b/test-helpers/src/connection/kafka/cpp.rs
index d2008433f..9182b086d 100644
--- a/test-helpers/src/connection/kafka/cpp.rs
+++ b/test-helpers/src/connection/kafka/cpp.rs
@@ -120,7 +120,7 @@ impl KafkaProducerCpp {
                 .send_result(
                     FutureRecord::to(record.topic_name)
                         .payload(record.payload)
-                        .key(key),
+                        .key(&key),
                 )
                 .unwrap(),
             None => self
diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs
index 083a82371..aca906de6 100644
--- a/test-helpers/src/connection/kafka/java.rs
+++ b/test-helpers/src/connection/kafka/java.rs
@@ -158,7 +158,7 @@ impl KafkaProducerJava {
                 "org.apache.kafka.clients.producer.ProducerRecord",
                 vec![
                     self.jvm.new_string(record.topic_name),
-                    self.jvm.new_string(key),
+                    self.jvm.new_string(&key),
                     self.jvm.new_string(record.payload),
                 ],
             ),
diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs
index 1363156f7..42cbfc9a0 100644
--- a/test-helpers/src/connection/kafka/mod.rs
+++ b/test-helpers/src/connection/kafka/mod.rs
@@ -132,7 +132,7 @@ impl KafkaProducer {
 pub struct Record<'a> {
     pub payload: &'a str,
     pub topic_name: &'a str,
-    pub key: Option<&'a str>,
+    pub key: Option<String>,
 }

 pub enum KafkaConsumer {
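
The test-helpers changes above exist because the new test builds a distinct key per record with format!("Key{i}"), which produces an owned String; the previous Record key type of Option<&'a str> cannot hold that once the record is moved into a pending produce future. Below is a small illustration of how call sites adapt. The struct fields match the patch, while main(), the topic name and the assertions are illustrative only.

    // Updated Record shape from the patch, with illustrative call sites.
    pub struct Record<'a> {
        pub payload: &'a str,
        pub topic_name: &'a str,
        // Owned key: a per-record key built with format!() can be moved straight
        // into the record instead of borrowing a short-lived String.
        pub key: Option<String>,
    }

    fn main() {
        // Fixed keys now need an explicit conversion into an owned String.
        let initial = Record {
            payload: "initial",
            topic_name: "some_topic",
            key: Some("Key".into()),
        };

        // Generated keys can be moved in directly.
        let generated: Vec<_> = (0..3)
            .map(|i| Record {
                payload: "Message",
                topic_name: "some_topic",
                key: Some(format!("Key{i}")),
            })
            .collect();

        assert_eq!(generated.len(), 3);
        assert_eq!(initial.key.as_deref(), Some("Key"));
    }

At the point of use, the producer helpers only need a borrow of the owned key, which is why the cpp helper now passes &key to .key() and the java helper passes &key to jvm.new_string().
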