From eab4f91f7ac6848199ad98f5e01534499c7f26ca Mon Sep 17 00:00:00 2001
From: Lucas Kent <rubickent@gmail.com>
Date: Wed, 11 Sep 2024 15:34:50 +1000
Subject: [PATCH] KafkaSinkCluster - add multi_partition integration test case
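
The new test case fires 2000 concurrent produce requests at the same
topic, each with a distinct key so the records spread across its
partitions. With linger set to 100ms the driver combines these
produces into a single batched produce request spanning multiple
partitions, which exercises shotover's logic for splitting such
batched requests back into individual requests.

Generating a distinct key per record requires the key to be owned, so
`Record::key` changes from `Option<&'a str>` to `Option<String>`:

    key: Some(format!("Key{i}")),

The cpp and java producer helpers are updated to pass the key by
reference accordingly.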

---
 .../tests/kafka_int_tests/test_cases.rs       | 83 ++++++++++++++++---
 test-helpers/src/connection/kafka/cpp.rs      |  2 +-
 test-helpers/src/connection/kafka/java.rs     |  2 +-
 test-helpers/src/connection/kafka/mod.rs      |  2 +-
 4 files changed, 75 insertions(+), 14 deletions(-)

diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index 700fc78fc..f49bacc3a 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1,3 +1,4 @@
+use futures::{stream::FuturesUnordered, StreamExt};
 use std::collections::HashMap;
 use test_helpers::{
     connection::kafka::{
@@ -95,7 +96,7 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
             Record {
                 payload: "initial2",
                 topic_name: "batch_test_partitions_3",
-                key: Some("foo"),
+                key: Some("foo".into()),
             },
             Some(0),
         ),
@@ -156,7 +157,7 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
                 Record {
                     payload: "Message2",
                     topic_name: "batch_test_partitions_3",
-                    key: Some("foo"),
+                    key: Some("foo".into()),
                 },
                 None,
             ),
@@ -193,6 +194,65 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
     }
 }
 
+/// Attempt to make the driver batch produce requests for different partitions of the same topic into the same request.
+/// This is important to test since shotover has complex logic for splitting these batched requests into individual requests.
+pub async fn produce_consume_multi_partition_batch(connection_builder: &KafkaConnectionBuilder) {
+    // set linger to 100ms to ensure that the concurrent produce requests are combined into a single batched request.
+    let producer = connection_builder.connect_producer("all", 100).await;
+    // create an initial record to force kafka to create the topic if it doesn't yet exist
+    producer
+        .assert_produce(
+            Record {
+                payload: "initial2",
+                topic_name: "batch_test_partitions_3",
+                key: Some("foo".into()),
+            },
+            Some(0),
+        )
+        .await;
+
+    let mut consumer = connection_builder
+        .connect_consumer("batch_test_partitions_3", "batch_partitions_test")
+        .await;
+
+    consumer
+        .assert_consume(ExpectedResponse {
+            message: "initial2".to_owned(),
+            // ensure we route to the same partition every time, so we can assert on the offset when consuming.
+            key: Some("foo".to_owned()),
+            topic_name: "batch_test_partitions_3".to_owned(),
+            offset: Some(0),
+        })
+        .await;
+
+    // create and consume records
+    let mut futures = FuturesUnordered::new();
+    for i in 0..2000 {
+        futures.push(producer.assert_produce(
+            Record {
+                payload: "Message",
+                topic_name: "batch_test_partitions_3",
+                key: Some(format!("Key{i}")),
+            },
+            None,
+        ))
+    }
+    while let Some(()) = futures.next().await {}
+
+    // TODO: Would be good to assert this, but first we would need to allow producing to be properly ordered by adding a `produce` method that returns a future.
+    //       So it's disabled for now.
+    // for i in 0..2000 {
+    //     consumer
+    //         .assert_consume(ExpectedResponse {
+    //             message: "Message".to_owned(),
+    //             key: Some(format!("Key{i}")),
+    //             topic_name: "batch_test_partitions_3".to_owned(),
+    //             offset: Some(i + 1),
+    //         })
+    //         .await;
+    // }
+}
+
 pub async fn produce_consume_partitions1(
     connection_builder: &KafkaConnectionBuilder,
     topic_name: &str,
@@ -205,7 +265,7 @@ pub async fn produce_consume_partitions1(
                 Record {
                     payload: "initial",
                     topic_name,
-                    key: Some("Key"),
+                    key: Some("Key".into()),
                 },
                 Some(0),
             )
@@ -230,7 +290,7 @@ pub async fn produce_consume_partitions1(
                     Record {
                         payload: "Message1",
                         topic_name,
-                        key: Some("Key"),
+                        key: Some("Key".into()),
                     },
                     Some(i * 2 + 1),
                 )
@@ -335,7 +395,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
                 Record {
                     payload: "initial",
                     topic_name,
-                    key: Some("Key"),
+                    key: Some("Key".into()),
                 },
                 Some(0),
             )
@@ -362,7 +422,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
                     Record {
                         payload: "Message1",
                         topic_name,
-                        key: Some("Key"),
+                        key: Some("Key".into()),
                     },
                     Some(i * 2 + 1),
                 )
@@ -442,7 +502,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
                 Record {
                     payload: "Initial",
                     topic_name,
-                    key: Some("Key"),
+                    key: Some("Key".into()),
                 },
                 Some(0),
             )
@@ -475,7 +535,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
                 Record {
                     payload: "Message1",
                     topic_name,
-                    key: Some("Key"),
+                    key: Some("Key".into()),
                 },
                 Some(1),
             )
@@ -504,7 +564,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
                 Record {
                     payload: "Message2",
                     topic_name,
-                    key: Some("Key"),
+                    key: Some("Key".into()),
                 },
                 Some(2),
             )
@@ -572,7 +632,7 @@ async fn produce_consume_partitions3(
                 Record {
                     payload: "Message1",
                     topic_name,
-                    key: Some("Key"),
+                    key: Some("Key".into()),
                 },
                 // We cant predict the offsets since that will depend on which partition the keyless record ends up in
                 None,
@@ -618,7 +678,7 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
                 Record {
                     payload: "MessageAcks0",
                     topic_name,
-                    key: Some("KeyAcks0"),
+                    key: Some("KeyAcks0".into()),
                 },
                 None,
             )
@@ -648,6 +708,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
     produce_consume_commit_offsets_partitions1(connection_builder, "partitions1_with_offset").await;
     produce_consume_partitions3(connection_builder, "partitions3").await;
     produce_consume_multi_topic_batch(connection_builder).await;
+    produce_consume_multi_partition_batch(connection_builder).await;
 
     // Only run this test case on the java driver,
     // since even without going through shotover the cpp driver fails this test.
diff --git a/test-helpers/src/connection/kafka/cpp.rs b/test-helpers/src/connection/kafka/cpp.rs
index d2008433f..9182b086d 100644
--- a/test-helpers/src/connection/kafka/cpp.rs
+++ b/test-helpers/src/connection/kafka/cpp.rs
@@ -120,7 +120,7 @@ impl KafkaProducerCpp {
                 .send_result(
                     FutureRecord::to(record.topic_name)
                         .payload(record.payload)
-                        .key(key),
+                        .key(&key),
                 )
                 .unwrap(),
             None => self
diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs
index 083a82371..aca906de6 100644
--- a/test-helpers/src/connection/kafka/java.rs
+++ b/test-helpers/src/connection/kafka/java.rs
@@ -158,7 +158,7 @@ impl KafkaProducerJava {
                 "org.apache.kafka.clients.producer.ProducerRecord",
                 vec![
                     self.jvm.new_string(record.topic_name),
-                    self.jvm.new_string(key),
+                    self.jvm.new_string(&key),
                     self.jvm.new_string(record.payload),
                 ],
             ),
diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs
index 1363156f7..42cbfc9a0 100644
--- a/test-helpers/src/connection/kafka/mod.rs
+++ b/test-helpers/src/connection/kafka/mod.rs
@@ -132,7 +132,7 @@ impl KafkaProducer {
 pub struct Record<'a> {
     pub payload: &'a str,
     pub topic_name: &'a str,
-    pub key: Option<&'a str>,
+    pub key: Option<String>,
 }
 
 pub enum KafkaConsumer {