diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index 700fc78fc..3eecb7628 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1,3 +1,4 @@
+use futures::{stream::FuturesUnordered, StreamExt};
 use std::collections::HashMap;
 use test_helpers::{
     connection::kafka::{
@@ -38,12 +39,17 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
                 replication_factor: 1,
             },
             NewTopic {
-                name: "batch_test_partitions_1",
+                name: "multi_topic_batch_partitions_1",
                 num_partitions: 1,
                 replication_factor: 1,
             },
             NewTopic {
-                name: "batch_test_partitions_3",
+                name: "multi_topic_batch_partitions_3",
+                num_partitions: 3,
+                replication_factor: 1,
+            },
+            NewTopic {
+                name: "multi_partitions_batch",
                 num_partitions: 3,
                 replication_factor: 1,
             },
@@ -86,7 +92,7 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
         producer.assert_produce(
             Record {
                 payload: "initial1",
-                topic_name: "batch_test_partitions_1",
+                topic_name: "multi_topic_batch_partitions_1",
                 key: None,
             },
             Some(0),
         ),
@@ -94,8 +100,8 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
         producer.assert_produce(
             Record {
                 payload: "initial2",
-                topic_name: "batch_test_partitions_3",
-                key: Some("foo"),
+                topic_name: "multi_topic_batch_partitions_3",
+                key: Some("foo".into()),
             },
             Some(0),
         ),
@@ -110,10 +116,16 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
     );

     let mut consumer_partitions_1 = connection_builder
-        .connect_consumer("batch_test_partitions_1", "batch_test_partitions_1_group")
+        .connect_consumer(
+            "multi_topic_batch_partitions_1",
+            "multi_topic_batch_partitions_1_group",
+        )
         .await;
     let mut consumer_partitions_3 = connection_builder
-        .connect_consumer("batch_test_partitions_3", "batch_test_partitions_3_group")
+        .connect_consumer(
+            "multi_topic_batch_partitions_3",
+            "multi_topic_batch_partitions_3_group",
+        )
         .await;
     let mut consumer_unknown = connection_builder
         .connect_consumer("batch_test_unknown", "batch_test_unknown_group")
@@ -123,14 +135,14 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
         consumer_partitions_1.assert_consume(ExpectedResponse {
             message: "initial1".to_owned(),
             key: None,
-            topic_name: "batch_test_partitions_1".to_owned(),
+            topic_name: "multi_topic_batch_partitions_1".to_owned(),
             offset: Some(0),
         }),
         consumer_partitions_3.assert_consume(ExpectedResponse {
             message: "initial2".to_owned(),
             // ensure we route to the same partition every time, so we can assert on the offset when consuming.
key: Some("foo".to_owned()), - topic_name: "batch_test_partitions_3".to_owned(), + topic_name: "multi_topic_batch_partitions_3".to_owned(), offset: Some(0), }), consumer_unknown.assert_consume(ExpectedResponse { @@ -147,7 +159,7 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect producer.assert_produce( Record { payload: "Message1", - topic_name: "batch_test_partitions_1", + topic_name: "multi_topic_batch_partitions_1", key: None, }, Some(i + 1), @@ -155,8 +167,8 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect producer.assert_produce( Record { payload: "Message2", - topic_name: "batch_test_partitions_3", - key: Some("foo"), + topic_name: "multi_topic_batch_partitions_3", + key: Some("foo".into()), }, None, ), @@ -174,13 +186,13 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect consumer_partitions_1.assert_consume(ExpectedResponse { message: "Message1".to_owned(), key: None, - topic_name: "batch_test_partitions_1".to_owned(), + topic_name: "multi_topic_batch_partitions_1".to_owned(), offset: Some(i + 1), }), consumer_partitions_3.assert_consume(ExpectedResponse { message: "Message2".to_owned(), key: Some("foo".to_owned()), - topic_name: "batch_test_partitions_3".to_owned(), + topic_name: "multi_topic_batch_partitions_3".to_owned(), offset: Some(i + 1), }), consumer_unknown.assert_consume(ExpectedResponse { @@ -193,6 +205,65 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect } } +/// Attempt to make the driver batch produce requests for different partitions of the same topic into the same request +/// This is important to test since shotover has complex logic for splitting these batch requests into individual requests. +pub async fn produce_consume_multi_partition_batch(connection_builder: &KafkaConnectionBuilder) { + // set linger to 100ms to ensure that the concurrent produce requests are combined into a single batched request. + let producer = connection_builder.connect_producer("all", 100).await; + // create an initial record to force kafka to create the topic if it doesnt yet exist + producer + .assert_produce( + Record { + payload: "initial2", + topic_name: "multi_partitions_batch", + key: Some("foo".into()), + }, + Some(0), + ) + .await; + + let mut consumer = connection_builder + .connect_consumer("multi_partitions_batch", "multi_partitions_batch_group") + .await; + + consumer + .assert_consume(ExpectedResponse { + message: "initial2".to_owned(), + // ensure we route to the same partition every time, so we can assert on the offset when consuming. + key: Some("foo".to_owned()), + topic_name: "multi_partitions_batch".to_owned(), + offset: Some(0), + }) + .await; + + // create and consume records + let mut futures = FuturesUnordered::new(); + for i in 0..2000 { + futures.push(producer.assert_produce( + Record { + payload: "Message", + topic_name: "multi_partitions_batch", + key: Some(format!("Key{i}")), + }, + None, + )) + } + while let Some(()) = futures.next().await {} + + // TODO: Would be good to assert this, but first we would need to allow producing to be properly ordered by adding a `produce` method that returns a future. + // So its disabled for now. 
+    // for i in 0..2000 {
+    //     consumer
+    //         .assert_consume(ExpectedResponse {
+    //             message: "Message".to_owned(),
+    //             key: Some(format!("Key{i}")),
+    //             topic_name: "multi_partitions_batch".to_owned(),
+    //             offset: Some(i + 1),
+    //         })
+    //         .await;
+    // }
+}
+
 pub async fn produce_consume_partitions1(
     connection_builder: &KafkaConnectionBuilder,
     topic_name: &str,
@@ -205,7 +276,7 @@ pub async fn produce_consume_partitions1(
             Record {
                 payload: "initial",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(0),
         )
@@ -230,7 +301,7 @@ pub async fn produce_consume_partitions1(
             Record {
                 payload: "Message1",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(i * 2 + 1),
         )
@@ -335,7 +406,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
             Record {
                 payload: "initial",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(0),
         )
@@ -362,7 +433,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
             Record {
                 payload: "Message1",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(i * 2 + 1),
         )
@@ -442,7 +513,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
             Record {
                 payload: "Initial",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(0),
         )
@@ -475,7 +546,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
             Record {
                 payload: "Message1",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(1),
         )
@@ -504,7 +575,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
             Record {
                 payload: "Message2",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(2),
         )
@@ -572,7 +643,7 @@ async fn produce_consume_partitions3(
             Record {
                 payload: "Message1",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             // We cant predict the offsets since that will depend on which partition the keyless record ends up in
             None,
         )
@@ -618,7 +689,7 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
             Record {
                 payload: "MessageAcks0",
                 topic_name,
-                key: Some("KeyAcks0"),
+                key: Some("KeyAcks0".into()),
             },
             None,
         )
@@ -648,6 +719,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
     produce_consume_commit_offsets_partitions1(connection_builder, "partitions1_with_offset").await;
     produce_consume_partitions3(connection_builder, "partitions3").await;
     produce_consume_multi_topic_batch(connection_builder).await;
+    produce_consume_multi_partition_batch(connection_builder).await;

     // Only run this test case on the java driver,
     // since even without going through shotover the cpp driver fails this test.
diff --git a/test-helpers/src/connection/kafka/cpp.rs b/test-helpers/src/connection/kafka/cpp.rs
index d2008433f..9182b086d 100644
--- a/test-helpers/src/connection/kafka/cpp.rs
+++ b/test-helpers/src/connection/kafka/cpp.rs
@@ -120,7 +120,7 @@ impl KafkaProducerCpp {
                 .send_result(
                     FutureRecord::to(record.topic_name)
                         .payload(record.payload)
-                        .key(key),
+                        .key(&key),
                 )
                 .unwrap(),
             None => self
diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs
index 083a82371..aca906de6 100644
--- a/test-helpers/src/connection/kafka/java.rs
+++ b/test-helpers/src/connection/kafka/java.rs
@@ -158,7 +158,7 @@ impl KafkaProducerJava {
                 "org.apache.kafka.clients.producer.ProducerRecord",
                 vec![
                     self.jvm.new_string(record.topic_name),
-                    self.jvm.new_string(key),
+                    self.jvm.new_string(&key),
                     self.jvm.new_string(record.payload),
                 ],
             ),
diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs
index 1363156f7..42cbfc9a0 100644
--- a/test-helpers/src/connection/kafka/mod.rs
+++ b/test-helpers/src/connection/kafka/mod.rs
@@ -132,7 +132,7 @@ impl KafkaProducer {
 pub struct Record<'a> {
     pub payload: &'a str,
     pub topic_name: &'a str,
-    pub key: Option<&'a str>,
+    pub key: Option<String>,
 }

 pub enum KafkaConsumer {
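
Note: the new produce_consume_multi_partition_batch test relies on pushing all 2000 assert_produce futures into a FuturesUnordered before draining it, so the produce calls run concurrently and the producer's 100ms linger window can merge them into batched requests. The following is a minimal, standalone sketch of that pattern only; it is not part of the diff, and the main function and dummy async task standing in for producer.assert_produce are illustrative assumptions (it needs only the futures and tokio crates).

use futures::{stream::FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let mut futures = FuturesUnordered::new();
    for i in 0..2000 {
        // Push every future before draining; FuturesUnordered polls all of them
        // whenever it is driven, so once `next()` is awaited below the 2000
        // "produce calls" are in flight concurrently rather than sequentially.
        futures.push(async move {
            tokio::task::yield_now().await; // stand-in for the real produce call
            i
        });
    }
    // Drain completions in whatever order they finish.
    let mut completed = 0;
    while futures.next().await.is_some() {
        completed += 1;
    }
    assert_eq!(completed, 2000);
}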