Skip to content

Commit

Permalink
KafkaSinkCluster - add multi_partition integration test case
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Sep 13, 2024
1 parent 4cfa26c commit e31a2b9
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 14 deletions.
88 changes: 77 additions & 11 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use futures::{stream::FuturesUnordered, StreamExt};
use std::collections::HashMap;
use test_helpers::{
connection::kafka::{
Expand Down Expand Up @@ -47,6 +48,11 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "multi_partitions_batch",
num_partitions: 3,
replication_factor: 1,
},
])
.await;

Expand Down Expand Up @@ -95,7 +101,7 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
Record {
payload: "initial2",
topic_name: "batch_test_partitions_3",
key: Some("foo"),
key: Some("foo".into()),
},
Some(0),
),
Expand Down Expand Up @@ -156,7 +162,7 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
Record {
payload: "Message2",
topic_name: "batch_test_partitions_3",
key: Some("foo"),
key: Some("foo".into()),
},
None,
),
Expand Down Expand Up @@ -193,6 +199,65 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
}
}

/// Attempt to make the driver batch produce requests for different partitions of the same topic into the same request
/// This is important to test since shotover has complex logic for splitting these batch requests into individual requests.
pub async fn produce_consume_multi_partition_batch(connection_builder: &KafkaConnectionBuilder) {
// set linger to 100ms to ensure that the concurrent produce requests are combined into a single batched request.
let producer = connection_builder.connect_producer("all", 100).await;
// create an initial record to force kafka to create the topic if it doesnt yet exist
producer
.assert_produce(
Record {
payload: "initial2",
topic_name: "multi_partitions_batch",
key: Some("foo".into()),
},
Some(0),
)
.await;

let mut consumer = connection_builder
.connect_consumer("multi_partitions_batch", "multi_partitions_batch_group")
.await;

consumer
.assert_consume(ExpectedResponse {
message: "initial2".to_owned(),
// ensure we route to the same partition every time, so we can assert on the offset when consuming.
key: Some("foo".to_owned()),
topic_name: "multi_partitions_batch".to_owned(),
offset: Some(0),
})
.await;

// create and consume records
let mut futures = FuturesUnordered::new();
for i in 0..2000 {
futures.push(producer.assert_produce(
Record {
payload: "Message",
topic_name: "multi_partitions_batch",
key: Some(format!("Key{i}")),
},
None,
))
}
while let Some(()) = futures.next().await {}

// TODO: Would be good to assert this, but first we would need to allow producing to be properly ordered by adding a `produce` method that returns a future.
// So its disabled for now.
// for i in 0..2000 {
// consumer
// .assert_consume(ExpectedResponse {
// message: "Message".to_owned(),
// key: Some(format!("Key{i}")),
// topic_name: "batch_test_partitions_3".to_owned(),
// offset: Some(i + 1),
// })
// .await;
// }
}

pub async fn produce_consume_partitions1(
connection_builder: &KafkaConnectionBuilder,
topic_name: &str,
Expand All @@ -205,7 +270,7 @@ pub async fn produce_consume_partitions1(
Record {
payload: "initial",
topic_name,
key: Some("Key"),
key: Some("Key".into()),
},
Some(0),
)
Expand All @@ -230,7 +295,7 @@ pub async fn produce_consume_partitions1(
Record {
payload: "Message1",
topic_name,
key: Some("Key"),
key: Some("Key".into()),
},
Some(i * 2 + 1),
)
Expand Down Expand Up @@ -335,7 +400,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
Record {
payload: "initial",
topic_name,
key: Some("Key"),
key: Some("Key".into()),
},
Some(0),
)
Expand All @@ -362,7 +427,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
Record {
payload: "Message1",
topic_name,
key: Some("Key"),
key: Some("Key".into()),
},
Some(i * 2 + 1),
)
Expand Down Expand Up @@ -442,7 +507,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
Record {
payload: "Initial",
topic_name,
key: Some("Key"),
key: Some("Key".into()),
},
Some(0),
)
Expand Down Expand Up @@ -475,7 +540,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
Record {
payload: "Message1",
topic_name,
key: Some("Key"),
key: Some("Key".into()),
},
Some(1),
)
Expand Down Expand Up @@ -504,7 +569,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
Record {
payload: "Message2",
topic_name,
key: Some("Key"),
key: Some("Key".into()),
},
Some(2),
)
Expand Down Expand Up @@ -572,7 +637,7 @@ async fn produce_consume_partitions3(
Record {
payload: "Message1",
topic_name,
key: Some("Key"),
key: Some("Key".into()),
},
// We cant predict the offsets since that will depend on which partition the keyless record ends up in
None,
Expand Down Expand Up @@ -618,7 +683,7 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
Record {
payload: "MessageAcks0",
topic_name,
key: Some("KeyAcks0"),
key: Some("KeyAcks0".into()),
},
None,
)
Expand Down Expand Up @@ -648,6 +713,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
produce_consume_commit_offsets_partitions1(connection_builder, "partitions1_with_offset").await;
produce_consume_partitions3(connection_builder, "partitions3").await;
produce_consume_multi_topic_batch(connection_builder).await;
produce_consume_multi_partition_batch(connection_builder).await;

// Only run this test case on the java driver,
// since even without going through shotover the cpp driver fails this test.
Expand Down
2 changes: 1 addition & 1 deletion test-helpers/src/connection/kafka/cpp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl KafkaProducerCpp {
.send_result(
FutureRecord::to(record.topic_name)
.payload(record.payload)
.key(key),
.key(&key),
)
.unwrap(),
None => self
Expand Down
2 changes: 1 addition & 1 deletion test-helpers/src/connection/kafka/java.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl KafkaProducerJava {
"org.apache.kafka.clients.producer.ProducerRecord",
vec![
self.jvm.new_string(record.topic_name),
self.jvm.new_string(key),
self.jvm.new_string(&key),
self.jvm.new_string(record.payload),
],
),
Expand Down
2 changes: 1 addition & 1 deletion test-helpers/src/connection/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl KafkaProducer {
pub struct Record<'a> {
pub payload: &'a str,
pub topic_name: &'a str,
pub key: Option<&'a str>,
pub key: Option<String>,
}

pub enum KafkaConsumer {
Expand Down

0 comments on commit e31a2b9

Please sign in to comment.