Skip to content

Commit

Permalink
KafkaSinkCluster - add multi_partition integration test case (#1749)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Sep 17, 2024
1 parent a39c43c commit 12ff171
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 26 deletions.
118 changes: 95 additions & 23 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use futures::{stream::FuturesUnordered, StreamExt};
use std::collections::HashMap;
use test_helpers::{
connection::kafka::{
Expand Down Expand Up @@ -38,12 +39,17 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
replication_factor: 1,
},
NewTopic {
name: "batch_test_partitions_1",
name: "multi_topic_batch_partitions_1",
num_partitions: 1,
replication_factor: 1,
},
NewTopic {
name: "batch_test_partitions_3",
name: "multi_topic_batch_partitions_3",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "multi_partitions_batch",
num_partitions: 3,
replication_factor: 1,
},
Expand Down Expand Up @@ -86,16 +92,16 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
producer.assert_produce(
Record {
payload: "initial1",
topic_name: "batch_test_partitions_1",
topic_name: "multi_topic_batch_partitions_1",
key: None,
},
Some(0),
),
producer.assert_produce(
Record {
payload: "initial2",
topic_name: "batch_test_partitions_3",
key: Some("foo"),
topic_name: "multi_topic_batch_partitions_3",
key: Some("foo".into()),
},
Some(0),
),
Expand All @@ -110,10 +116,16 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
);

let mut consumer_partitions_1 = connection_builder
.connect_consumer("batch_test_partitions_1", "batch_test_partitions_1_group")
.connect_consumer(
"multi_topic_batch_partitions_1",
"multi_topic_batch_partitions_1_group",
)
.await;
let mut consumer_partitions_3 = connection_builder
.connect_consumer("batch_test_partitions_3", "batch_test_partitions_3_group")
.connect_consumer(
"multi_topic_batch_partitions_3",
"multi_topic_batch_partitions_3_group",
)
.await;
let mut consumer_unknown = connection_builder
.connect_consumer("batch_test_unknown", "batch_test_unknown_group")
Expand All @@ -123,14 +135,14 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
consumer_partitions_1.assert_consume(ExpectedResponse {
message: "initial1".to_owned(),
key: None,
topic_name: "batch_test_partitions_1".to_owned(),
topic_name: "multi_topic_batch_partitions_1".to_owned(),
offset: Some(0),
}),
consumer_partitions_3.assert_consume(ExpectedResponse {
message: "initial2".to_owned(),
// ensure we route to the same partition every time, so we can assert on the offset when consuming.
key: Some("foo".to_owned()),
topic_name: "batch_test_partitions_3".to_owned(),
topic_name: "multi_topic_batch_partitions_3".to_owned(),
offset: Some(0),
}),
consumer_unknown.assert_consume(ExpectedResponse {
Expand All @@ -147,16 +159,16 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
producer.assert_produce(
Record {
payload: "Message1",
topic_name: "batch_test_partitions_1",
topic_name: "multi_topic_batch_partitions_1",
key: None,
},
Some(i + 1),
),
producer.assert_produce(
Record {
payload: "Message2",
topic_name: "batch_test_partitions_3",
key: Some("foo"),
topic_name: "multi_topic_batch_partitions_3",
key: Some("foo".into()),
},
None,
),
Expand All @@ -174,13 +186,13 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
consumer_partitions_1.assert_consume(ExpectedResponse {
message: "Message1".to_owned(),
key: None,
topic_name: "batch_test_partitions_1".to_owned(),
topic_name: "multi_topic_batch_partitions_1".to_owned(),
offset: Some(i + 1),
}),
consumer_partitions_3.assert_consume(ExpectedResponse {
message: "Message2".to_owned(),
key: Some("foo".to_owned()),
topic_name: "batch_test_partitions_3".to_owned(),
topic_name: "multi_topic_batch_partitions_3".to_owned(),
offset: Some(i + 1),
}),
consumer_unknown.assert_consume(ExpectedResponse {
Expand All @@ -193,6 +205,65 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect
}
}

/// Attempt to make the driver batch produce requests for different partitions of the same topic into the same request
/// This is important to test since shotover has complex logic for splitting these batch requests into individual requests.
pub async fn produce_consume_multi_partition_batch(connection_builder: &KafkaConnectionBuilder) {
// set linger to 100ms to ensure that the concurrent produce requests are combined into a single batched request.
let producer = connection_builder.connect_producer("all", 100).await;
// create an initial record to force kafka to create the topic if it doesnt yet exist
producer
.assert_produce(
Record {
payload: "initial2",
topic_name: "multi_partitions_batch",
key: Some("foo".into()),
},
Some(0),
)
.await;

let mut consumer = connection_builder
.connect_consumer("multi_partitions_batch", "multi_partitions_batch_group")
.await;

consumer
.assert_consume(ExpectedResponse {
message: "initial2".to_owned(),
// ensure we route to the same partition every time, so we can assert on the offset when consuming.
key: Some("foo".to_owned()),
topic_name: "multi_partitions_batch".to_owned(),
offset: Some(0),
})
.await;

// create and consume records
let mut futures = FuturesUnordered::new();
for i in 0..2000 {
futures.push(producer.assert_produce(
Record {
payload: "Message",
topic_name: "multi_partitions_batch",
key: Some(format!("Key{i}")),
},
None,
))
}
while let Some(()) = futures.next().await {}

// TODO: Would be good to assert this, but first we would need to allow producing to be properly ordered by adding a `produce` method that returns a future.
// So its disabled for now.
// for i in 0..2000 {
// consumer
// .assert_consume(ExpectedResponse {
// message: "Message".to_owned(),
// key: Some(format!("Key{i}")),
// topic_name: "multi_partitions_batch".to_owned(),
// offset: Some(i + 1),
// })
// .await;
// }
}

pub async fn produce_consume_partitions1(
connection_builder: &KafkaConnectionBuilder,
topic_name: &str,
Expand All @@ -205,7 +276,7 @@ pub async fn produce_consume_partitions1(
Record {
payload: "initial",
topic_name,
key: Some("Key"),
key: Some("Key".into()),
},
Some(0),
)
Expand All @@ -230,7 +301,7 @@ pub async fn produce_consume_partitions1(
Record {
payload: "Message1",
topic_name,
key: Some("Key"),
key: Some("Key".into()),
},
Some(i * 2 + 1),
)
Expand Down Expand Up @@ -335,7 +406,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
Record {
payload: "initial",
topic_name,
key: Some("Key"),
key: Some("Key".into()),
},
Some(0),
)
Expand All @@ -362,7 +433,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
Record {
payload: "Message1",
topic_name,
key: Some("Key"),
key: Some("Key".into()),
},
Some(i * 2 + 1),
)
Expand Down Expand Up @@ -442,7 +513,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
Record {
payload: "Initial",
topic_name,
key: Some("Key"),
key: Some("Key".into()),
},
Some(0),
)
Expand Down Expand Up @@ -475,7 +546,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
Record {
payload: "Message1",
topic_name,
key: Some("Key"),
key: Some("Key".into()),
},
Some(1),
)
Expand Down Expand Up @@ -504,7 +575,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
Record {
payload: "Message2",
topic_name,
key: Some("Key"),
key: Some("Key".into()),
},
Some(2),
)
Expand Down Expand Up @@ -572,7 +643,7 @@ async fn produce_consume_partitions3(
Record {
payload: "Message1",
topic_name,
key: Some("Key"),
key: Some("Key".into()),
},
// We cant predict the offsets since that will depend on which partition the keyless record ends up in
None,
Expand Down Expand Up @@ -618,7 +689,7 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
Record {
payload: "MessageAcks0",
topic_name,
key: Some("KeyAcks0"),
key: Some("KeyAcks0".into()),
},
None,
)
Expand Down Expand Up @@ -648,6 +719,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
produce_consume_commit_offsets_partitions1(connection_builder, "partitions1_with_offset").await;
produce_consume_partitions3(connection_builder, "partitions3").await;
produce_consume_multi_topic_batch(connection_builder).await;
produce_consume_multi_partition_batch(connection_builder).await;

// Only run this test case on the java driver,
// since even without going through shotover the cpp driver fails this test.
Expand Down
2 changes: 1 addition & 1 deletion test-helpers/src/connection/kafka/cpp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl KafkaProducerCpp {
.send_result(
FutureRecord::to(record.topic_name)
.payload(record.payload)
.key(key),
.key(&key),
)
.unwrap(),
None => self
Expand Down
2 changes: 1 addition & 1 deletion test-helpers/src/connection/kafka/java.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl KafkaProducerJava {
"org.apache.kafka.clients.producer.ProducerRecord",
vec![
self.jvm.new_string(record.topic_name),
self.jvm.new_string(key),
self.jvm.new_string(&key),
self.jvm.new_string(record.payload),
],
),
Expand Down
2 changes: 1 addition & 1 deletion test-helpers/src/connection/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl KafkaProducer {
pub struct Record<'a> {
pub payload: &'a str,
pub topic_name: &'a str,
pub key: Option<&'a str>,
pub key: Option<String>,
}

pub enum KafkaConsumer {
Expand Down

0 comments on commit 12ff171

Please sign in to comment.