KafkaSinkCluster split produce requests (#1745)
rukai authored Sep 10, 2024
1 parent f268a0c commit 837d5d6
Showing 6 changed files with 371 additions and 94 deletions.
6 changes: 5 additions & 1 deletion docs/src/transforms.md
@@ -229,7 +229,7 @@ This transform will drop any messages it receives and return the supplied response
This transform will route kafka messages to a broker within a Kafka cluster:

* produce messages are routed to the partition leader
* fetch messages are routed to a random partition replica
* fetch messages are routed to the partition leader
* heartbeat, syncgroup, offsetfetch, joingroup and leavegroup are all routed to the group coordinator
* all other messages go to a random node.
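
As a rough illustration, these rules boil down to a match on the request type. The sketch below uses hypothetical stand-in types (`RequestKind`, `Destination`), not shotover's real internals:

```rust
// Hypothetical stand-ins for shotover's real routing types.
enum RequestKind {
    Produce,
    Fetch,
    Heartbeat,
    SyncGroup,
    OffsetFetch,
    JoinGroup,
    LeaveGroup,
    Other,
}

enum Destination {
    PartitionLeader,
    GroupCoordinator,
    RandomNode,
}

fn route(request: RequestKind) -> Destination {
    use RequestKind::*;
    match request {
        // produce and fetch messages go to the leader of the target partition
        Produce | Fetch => Destination::PartitionLeader,
        // consumer group messages go to the group coordinator
        Heartbeat | SyncGroup | OffsetFetch | JoinGroup | LeaveGroup => {
            Destination::GroupCoordinator
        }
        // everything else goes to a random node
        Other => Destination::RandomNode,
    }
}
```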

@@ -238,6 +238,10 @@ Instead Shotover will pretend to be either a single Kafka node or part of a cluster

This is achieved by rewriting the FindCoordinator, Metadata and DescribeCluster messages to contain the nodes in the shotover cluster instead of the kafka cluster.
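
A minimal sketch of that rewriting, using a simplified broker list instead of the real kafka protocol types (`ShotoverNode` and `rewrite_broker_list` are illustrative names, not shotover's API):

```rust
struct ShotoverNode {
    node_id: i32,
    address: String,
}

/// Replace the kafka brokers advertised in a Metadata/DescribeCluster response
/// with the shotover nodes, so clients only ever learn about shotover.
fn rewrite_broker_list(brokers: &mut Vec<ShotoverNode>, shotover_nodes: &[ShotoverNode]) {
    brokers.clear();
    brokers.extend(shotover_nodes.iter().map(|node| ShotoverNode {
        node_id: node.node_id,
        address: node.address.clone(),
    }));
}
```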

Note that produce and fetch requests will be split into multiple requests if no single broker can fulfil the whole request.
For example, a produce request may contain records for topics whose leaders are on different brokers in the real kafka cluster, even though the shotover cluster appeared to host them all on the same node.
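
A minimal sketch of the splitting step, with hypothetical types rather than shotover's implementation: the records of one incoming produce request are regrouped by the leader broker of each topic, yielding one outgoing request per broker.

```rust
use std::collections::HashMap;

/// Regroup the topics of one incoming produce request by the broker that
/// leads each topic; each group is then sent as its own produce request.
fn split_produce_request(
    topics: Vec<(String, Vec<u8>)>,
    leader_of: impl Fn(&str) -> i32,
) -> HashMap<i32, Vec<(String, Vec<u8>)>> {
    let mut per_broker: HashMap<i32, Vec<(String, Vec<u8>)>> = HashMap::new();
    for (topic, records) in topics {
        per_broker
            .entry(leader_of(&topic))
            .or_default()
            .push((topic, records));
    }
    per_broker
}
```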

#### SASL SCRAM

By default KafkaSinkCluster does not support SASL SCRAM authentication; if a client attempts to use SCRAM, it will appear as if it is not enabled on the server.
138 changes: 133 additions & 5 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -37,6 +37,16 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
                num_partitions: 1,
                replication_factor: 1,
            },
            NewTopic {
                name: "batch_test_partitions_1",
                num_partitions: 1,
                replication_factor: 1,
            },
            NewTopic {
                name: "batch_test_partitions_3",
                num_partitions: 3,
                replication_factor: 1,
            },
        ])
        .await;

@@ -66,12 +76,129 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
    admin.delete_topics(&["to_delete"]).await
}

/// Attempt to make the driver batch produce requests for different topics into the same request.
/// This is important to test since shotover has complex logic for splitting these batch requests into individual requests.
pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnectionBuilder) {
    // set linger to 100ms to ensure that the concurrent produce requests are combined into a single batched request.
    let producer = connection_builder.connect_producer("all", 100).await;
    // create initial records to force kafka to create the topics if they don't yet exist
    tokio::join!(
        producer.assert_produce(
            Record {
                payload: "initial1",
                topic_name: "batch_test_partitions_1",
                key: None,
            },
            Some(0),
        ),
        producer.assert_produce(
            Record {
                payload: "initial2",
                topic_name: "batch_test_partitions_3",
                key: Some("foo"),
            },
            Some(0),
        ),
        producer.assert_produce(
            Record {
                payload: "initial3",
                topic_name: "batch_test_unknown",
                key: None,
            },
            Some(0),
        )
    );

    let mut consumer_partitions_1 = connection_builder
        .connect_consumer("batch_test_partitions_1", "batch_test_partitions_1_group")
        .await;
    let mut consumer_partitions_3 = connection_builder
        .connect_consumer("batch_test_partitions_3", "batch_test_partitions_3_group")
        .await;
    let mut consumer_unknown = connection_builder
        .connect_consumer("batch_test_unknown", "batch_test_unknown_group")
        .await;

    tokio::join!(
        consumer_partitions_1.assert_consume(ExpectedResponse {
            message: "initial1".to_owned(),
            key: None,
            topic_name: "batch_test_partitions_1".to_owned(),
            offset: Some(0),
        }),
        consumer_partitions_3.assert_consume(ExpectedResponse {
            message: "initial2".to_owned(),
            // ensure we route to the same partition every time, so we can assert on the offset when consuming.
            key: Some("foo".to_owned()),
            topic_name: "batch_test_partitions_3".to_owned(),
            offset: Some(0),
        }),
        consumer_unknown.assert_consume(ExpectedResponse {
            message: "initial3".to_owned(),
            key: None,
            topic_name: "batch_test_unknown".to_owned(),
            offset: Some(0),
        })
    );

    // create and consume records
    for i in 0..5 {
        tokio::join!(
            producer.assert_produce(
                Record {
                    payload: "Message1",
                    topic_name: "batch_test_partitions_1",
                    key: None,
                },
                Some(i + 1),
            ),
            producer.assert_produce(
                Record {
                    payload: "Message2",
                    topic_name: "batch_test_partitions_3",
                    key: Some("foo"),
                },
                None,
            ),
            producer.assert_produce(
                Record {
                    payload: "Message3",
                    topic_name: "batch_test_unknown",
                    key: None,
                },
                Some(i + 1),
            )
        );

        tokio::join!(
            consumer_partitions_1.assert_consume(ExpectedResponse {
                message: "Message1".to_owned(),
                key: None,
                topic_name: "batch_test_partitions_1".to_owned(),
                offset: Some(i + 1),
            }),
            consumer_partitions_3.assert_consume(ExpectedResponse {
                message: "Message2".to_owned(),
                key: Some("foo".to_owned()),
                topic_name: "batch_test_partitions_3".to_owned(),
                offset: Some(i + 1),
            }),
            consumer_unknown.assert_consume(ExpectedResponse {
                message: "Message3".to_owned(),
                key: None,
                topic_name: "batch_test_unknown".to_owned(),
                offset: Some(i + 1),
            })
        );
    }
}

pub async fn produce_consume_partitions1(
    connection_builder: &KafkaConnectionBuilder,
    topic_name: &str,
) {
    {
        let producer = connection_builder.connect_producer("all").await;
        let producer = connection_builder.connect_producer("all", 0).await;
        // create an initial record to force kafka to create the topic if it doesn't yet exist
        producer
            .assert_produce(
@@ -201,7 +328,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
    }

    {
        let producer = connection_builder.connect_producer("all").await;
        let producer = connection_builder.connect_producer("all", 0).await;
        // create an initial record to force kafka to create the topic if it doesn't yet exist
        producer
            .assert_produce(
@@ -309,7 +436,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
    topic_name: &str,
) {
    {
        let producer = connection_builder.connect_producer("1").await;
        let producer = connection_builder.connect_producer("1", 0).await;
        producer
            .assert_produce(
                Record {
@@ -434,7 +561,7 @@ async fn produce_consume_partitions3(
    connection_builder: &KafkaConnectionBuilder,
    topic_name: &str,
) {
    let producer = connection_builder.connect_producer("1").await;
    let producer = connection_builder.connect_producer("1", 0).await;
    let mut consumer = connection_builder
        .connect_consumer(topic_name, "some_group")
        .await;
@@ -483,7 +610,7 @@ async fn produce_consume_partitions3(

async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
    let topic_name = "acks0";
    let producer = connection_builder.connect_producer("0").await;
    let producer = connection_builder.connect_producer("0", 0).await;

    for _ in 0..10 {
        producer
@@ -520,6 +647,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
    produce_consume_partitions1(connection_builder, "unknown_topic").await;
    produce_consume_commit_offsets_partitions1(connection_builder, "partitions1_with_offset").await;
    produce_consume_partitions3(connection_builder, "partitions3").await;
    produce_consume_multi_topic_batch(connection_builder).await;

    // Only run this test case on the java driver,
    // since even without going through shotover the cpp driver fails this test.