KafkaSinkCluster - add multi_partition integration test case #1749

Merged · 3 commits · Sep 17, 2024
shotover-proxy/tests/kafka_int_tests/test_cases.rs (95 additions, 23 deletions)

@@ -1,3 +1,4 @@
+use futures::{stream::FuturesUnordered, StreamExt};
 use std::collections::HashMap;
 use test_helpers::{
     connection::kafka::{
@@ -38,12 +39,17 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
             replication_factor: 1,
         },
         NewTopic {
-            name: "batch_test_partitions_1",
+            name: "multi_topic_batch_partitions_1",
             num_partitions: 1,
             replication_factor: 1,
         },
         NewTopic {
-            name: "batch_test_partitions_3",
+            name: "multi_topic_batch_partitions_3",
             num_partitions: 3,
             replication_factor: 1,
         },
+        NewTopic {
+            name: "multi_partitions_batch",
+            num_partitions: 3,
+            replication_factor: 1,
+        },
@@ -86,16 +92,16 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnectionBuilder) {
         producer.assert_produce(
             Record {
                 payload: "initial1",
-                topic_name: "batch_test_partitions_1",
+                topic_name: "multi_topic_batch_partitions_1",
                 key: None,
             },
             Some(0),
         ),
         producer.assert_produce(
             Record {
                 payload: "initial2",
-                topic_name: "batch_test_partitions_3",
-                key: Some("foo"),
+                topic_name: "multi_topic_batch_partitions_3",
+                key: Some("foo".into()),
             },
             Some(0),
         ),
@@ -110,10 +116,16 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnectionBuilder) {
     );

     let mut consumer_partitions_1 = connection_builder
-        .connect_consumer("batch_test_partitions_1", "batch_test_partitions_1_group")
+        .connect_consumer(
+            "multi_topic_batch_partitions_1",
+            "multi_topic_batch_partitions_1_group",
+        )
         .await;
     let mut consumer_partitions_3 = connection_builder
-        .connect_consumer("batch_test_partitions_3", "batch_test_partitions_3_group")
+        .connect_consumer(
+            "multi_topic_batch_partitions_3",
+            "multi_topic_batch_partitions_3_group",
+        )
         .await;
     let mut consumer_unknown = connection_builder
         .connect_consumer("batch_test_unknown", "batch_test_unknown_group")
@@ -123,14 +135,14 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnectionBuilder) {
         consumer_partitions_1.assert_consume(ExpectedResponse {
             message: "initial1".to_owned(),
             key: None,
-            topic_name: "batch_test_partitions_1".to_owned(),
+            topic_name: "multi_topic_batch_partitions_1".to_owned(),
             offset: Some(0),
         }),
         consumer_partitions_3.assert_consume(ExpectedResponse {
             message: "initial2".to_owned(),
             // ensure we route to the same partition every time, so we can assert on the offset when consuming.
             key: Some("foo".to_owned()),
-            topic_name: "batch_test_partitions_3".to_owned(),
+            topic_name: "multi_topic_batch_partitions_3".to_owned(),
             offset: Some(0),
         }),
         consumer_unknown.assert_consume(ExpectedResponse {
@@ -147,16 +159,16 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnectionBuilder) {
             producer.assert_produce(
                 Record {
                     payload: "Message1",
-                    topic_name: "batch_test_partitions_1",
+                    topic_name: "multi_topic_batch_partitions_1",
                     key: None,
                 },
                 Some(i + 1),
             ),
             producer.assert_produce(
                 Record {
                     payload: "Message2",
-                    topic_name: "batch_test_partitions_3",
-                    key: Some("foo"),
+                    topic_name: "multi_topic_batch_partitions_3",
+                    key: Some("foo".into()),
                 },
                 None,
             ),
@@ -174,13 +186,13 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnectionBuilder) {
             consumer_partitions_1.assert_consume(ExpectedResponse {
                 message: "Message1".to_owned(),
                 key: None,
-                topic_name: "batch_test_partitions_1".to_owned(),
+                topic_name: "multi_topic_batch_partitions_1".to_owned(),
                 offset: Some(i + 1),
             }),
             consumer_partitions_3.assert_consume(ExpectedResponse {
                 message: "Message2".to_owned(),
                 key: Some("foo".to_owned()),
-                topic_name: "batch_test_partitions_3".to_owned(),
+                topic_name: "multi_topic_batch_partitions_3".to_owned(),
                 offset: Some(i + 1),
             }),
             consumer_unknown.assert_consume(ExpectedResponse {
@@ -193,6 +205,65 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnectionBuilder) {
     }
 }

+/// Attempt to make the driver batch produce requests for different partitions of the same topic into the same request.
+/// This is important to test since shotover has complex logic for splitting these batch requests into individual requests.
+pub async fn produce_consume_multi_partition_batch(connection_builder: &KafkaConnectionBuilder) {
+    // set linger to 100ms to ensure that the concurrent produce requests are combined into a single batched request.
+    let producer = connection_builder.connect_producer("all", 100).await;
+    // create an initial record to force kafka to create the topic if it doesn't yet exist
+    producer
+        .assert_produce(
+            Record {
+                payload: "initial2",
+                topic_name: "multi_partitions_batch",
+                key: Some("foo".into()),
+            },
+            Some(0),
+        )
+        .await;
+
+    let mut consumer = connection_builder
+        .connect_consumer("multi_partitions_batch", "multi_partitions_batch_group")
+        .await;
+
+    consumer
+        .assert_consume(ExpectedResponse {
+            message: "initial2".to_owned(),
+            // ensure we route to the same partition every time, so we can assert on the offset when consuming.
+            key: Some("foo".to_owned()),
+            topic_name: "multi_partitions_batch".to_owned(),
+            offset: Some(0),
+        })
+        .await;
+
+    // create and consume records
+    let mut futures = FuturesUnordered::new();
+    for i in 0..2000 {
+        futures.push(producer.assert_produce(
+            Record {
+                payload: "Message",
+                topic_name: "multi_partitions_batch",
+                key: Some(format!("Key{i}")),
+            },
+            None,
+        ))
+    }
+    while let Some(()) = futures.next().await {}
+
+    // TODO: Would be good to assert this, but first we would need to allow producing to be properly ordered by adding a `produce` method that returns a future.
+    // So it's disabled for now.
+    // for i in 0..2000 {
+    //     consumer
+    //         .assert_consume(ExpectedResponse {
+    //             message: "Message".to_owned(),
+    //             key: Some(format!("Key{i}")),
+    //             topic_name: "multi_partitions_batch".to_owned(),
+    //             offset: Some(i + 1),
+    //         })
+    //         .await;
+    // }
+}
+
 pub async fn produce_consume_partitions1(
     connection_builder: &KafkaConnectionBuilder,
     topic_name: &str,
@@ -205,7 +276,7 @@ pub async fn produce_consume_partitions1(
             Record {
                 payload: "initial",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(0),
         )
@@ -230,7 +301,7 @@ pub async fn produce_consume_partitions1(
             Record {
                 payload: "Message1",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(i * 2 + 1),
         )
@@ -335,7 +406,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
             Record {
                 payload: "initial",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(0),
         )
@@ -362,7 +433,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
             Record {
                 payload: "Message1",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(i * 2 + 1),
         )
@@ -442,7 +513,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
             Record {
                 payload: "Initial",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(0),
         )
@@ -475,7 +546,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
             Record {
                 payload: "Message1",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(1),
         )
@@ -504,7 +575,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
             Record {
                 payload: "Message2",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             Some(2),
         )
@@ -572,7 +643,7 @@ async fn produce_consume_partitions3(
             Record {
                 payload: "Message1",
                 topic_name,
-                key: Some("Key"),
+                key: Some("Key".into()),
             },
             // We can't predict the offsets since that will depend on which partition the keyless record ends up in
             None,
@@ -618,7 +689,7 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
             Record {
                 payload: "MessageAcks0",
                 topic_name,
-                key: Some("KeyAcks0"),
+                key: Some("KeyAcks0".into()),
             },
             None,
         )
@@ -648,6 +719,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
     produce_consume_commit_offsets_partitions1(connection_builder, "partitions1_with_offset").await;
     produce_consume_partitions3(connection_builder, "partitions3").await;
     produce_consume_multi_topic_batch(connection_builder).await;
+    produce_consume_multi_partition_batch(connection_builder).await;

     // Only run this test case on the java driver,
     // since even without going through shotover the cpp driver fails this test.
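For context on what the new produce_consume_multi_partition_batch test leans on: with a non-zero linger.ms, librdkafka delays sending so that concurrent produce calls against different partitions of one topic coalesce into a single produce request, which shotover must then split back out per destination broker. The following is a minimal, hypothetical sketch of that producer pattern written directly against the rdkafka, futures, and tokio crates; the broker address and topic name are assumptions, and none of this code is taken from the PR:

use futures::stream::{FuturesUnordered, StreamExt};
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;

#[tokio::main]
async fn main() {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "127.0.0.1:9092") // assumed local broker
        .set("acks", "all")
        // hold records for up to 100ms so that concurrent sends coalesce
        // into one produce request spanning several partitions
        .set("linger.ms", "100")
        .create()
        .expect("producer creation failed");

    let mut sends = FuturesUnordered::new();
    for i in 0..2000 {
        let producer = producer.clone();
        sends.push(async move {
            // a distinct key per record hashes to different partitions,
            // so the lingered batch covers multiple partitions of the topic
            let key = format!("Key{i}");
            producer
                .send(
                    FutureRecord::to("multi_partitions_batch")
                        .payload("Message")
                        .key(&key),
                    Timeout::Never,
                )
                .await
        });
    }
    // drain the delivery results; each Ok carries (partition, offset)
    while let Some(delivery) = sends.next().await {
        delivery.expect("delivery failed");
    }
}

The 100ms linger here mirrors the test's connect_producer("all", 100) call, which routes the acks and linger settings through the test helper.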
test-helpers/src/connection/kafka/cpp.rs (1 addition, 1 deletion)

@@ -120,7 +120,7 @@ impl KafkaProducerCpp {
                 .send_result(
                     FutureRecord::to(record.topic_name)
                         .payload(record.payload)
-                        .key(key),
+                        .key(&key),
                 )
                 .unwrap(),
             None => self
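The one-character change above follows from the Record::key type change in mod.rs at the end of this diff: the matched binding is now an owned String rather than a &str, and rdkafka's FutureRecord::key takes the key by reference, so the wrapper passes &key. A reduced, hypothetical sketch of the call-site pattern (send_keyed and the trimmed-down Record are stand-ins, not the helper's real items):

use rdkafka::producer::{FutureProducer, FutureRecord};

// trimmed-down stand-in for the test helper's Record after this PR
struct Record<'a> {
    payload: &'a str,
    topic_name: &'a str,
    key: Option<String>,
}

fn send_keyed(producer: &FutureProducer, record: &Record) {
    if let Some(key) = &record.key {
        // key is a &String here; rdkafka implements ToBytes for String,
        // so a plain reference satisfies FutureRecord::key's bound
        let _delivery = producer
            .send_result(
                FutureRecord::to(record.topic_name)
                    .payload(record.payload)
                    .key(key),
            )
            .unwrap();
    }
}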
test-helpers/src/connection/kafka/java.rs (1 addition, 1 deletion)

@@ -158,7 +158,7 @@ impl KafkaProducerJava {
             "org.apache.kafka.clients.producer.ProducerRecord",
             vec![
                 self.jvm.new_string(record.topic_name),
-                self.jvm.new_string(key),
+                self.jvm.new_string(&key),
                 self.jvm.new_string(record.payload),
             ],
         ),
test-helpers/src/connection/kafka/mod.rs (1 addition, 1 deletion)

@@ -132,7 +132,7 @@ impl KafkaProducer {
 pub struct Record<'a> {
     pub payload: &'a str,
     pub topic_name: &'a str,
-    pub key: Option<&'a str>,
+    pub key: Option<String>,
 }

 pub enum KafkaConsumer {
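The Record::key change from Option<&'a str> to Option<String> in this last file is what enables the new test's per-record keys: format!("Key{i}") yields a fresh owned String on every iteration, which a borrowed field could not hold. A minimal sketch of the distinction, assuming only the struct shape shown in the diff (build_records and main are illustrative, not part of the PR):

// mirrors the struct above; everything else here is hypothetical
pub struct Record<'a> {
    pub payload: &'a str,
    pub topic_name: &'a str,
    // owned, so a fresh key can be built for every record
    pub key: Option<String>,
}

fn build_records() -> Vec<Record<'static>> {
    (0..3)
        .map(|i| Record {
            payload: "Message",
            topic_name: "multi_partitions_batch",
            // with the old Option<&'a str> field this would not compile:
            // the String returned by format! is dropped at the end of the
            // closure, so no borrow of it could escape into the Vec
            key: Some(format!("Key{i}")),
        })
        .collect()
}

fn main() {
    for record in build_records() {
        println!("{:?} -> {}", record.key, record.topic_name);
    }
}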