Skip to content

Commit

Permalink
KafkaSinkCluster transaction request routing
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Oct 3, 2024
1 parent 985668f commit aa0d8bb
Show file tree
Hide file tree
Showing 5 changed files with 501 additions and 21 deletions.
117 changes: 117 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "transaction_topic1",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "transaction_topic2",
num_partitions: 3,
replication_factor: 1,
},
])
.await;

Expand Down Expand Up @@ -735,6 +745,111 @@ async fn produce_consume_partitions3(
}
}

async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilder) {
let producer = connection_builder.connect_producer("1", 0).await;
for i in 0..5 {
producer
.assert_produce(
Record {
payload: &format!("Message1_{i}"),
topic_name: "transaction_topic1",
key: Some("Key".into()),
},
// We cant predict the offsets since that will depend on which partition the keyless record ends up in
None,
)
.await;
producer
.assert_produce(
Record {
payload: &format!("Message2_{i}"),
topic_name: "transaction_topic1",
key: Some("Key".into()),
},
None,
)
.await;
}

let producer = connection_builder
.connect_producer_with_transactions("some_transaction_id".to_owned())
.await;
let mut consumer_topic1 = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic("transaction_topic1".to_owned())
.with_group("some_group"),
)
.await;
let mut consumer_topic2 = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic("transaction_topic2".to_owned())
.with_group("some_group"),
)
.await;

for i in 0..5 {
consumer_topic1
.assert_consume_in_any_order(vec![
ExpectedResponse {
message: format!("Message1_{i}"),
key: Some("Key".to_owned()),
topic_name: "transaction_topic1".to_owned(),
offset: None,
},
ExpectedResponse {
message: format!("Message2_{i}"),
key: Some("Key".into()),
topic_name: "transaction_topic1".to_owned(),
offset: None,
},
])
.await;
producer.begin_transaction();

producer
.assert_produce(
Record {
payload: &format!("Message1_{i}"),
topic_name: "transaction_topic2",
key: Some("Key".into()),
},
// We cant predict the offsets since that will depend on which partition the keyless record ends up in
None,
)
.await;
producer
.assert_produce(
Record {
payload: &format!("Message2_{i}"),
topic_name: "transaction_topic2",
key: Some("Key".into()),
},
None,
)
.await;

producer.send_offsets_to_transaction(&consumer_topic1);
producer.commit_transaction();

consumer_topic2
.assert_consume_in_any_order(vec![
ExpectedResponse {
message: format!("Message1_{i}"),
key: Some("Key".to_owned()),
topic_name: "transaction_topic2".to_owned(),
offset: None,
},
ExpectedResponse {
message: format!("Message2_{i}"),
key: Some("Key".to_owned()),
topic_name: "transaction_topic2".to_owned(),
offset: None,
},
])
.await;
}
}

async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
let topic_name = "acks0";
let producer = connection_builder.connect_producer("0", 0).await;
Expand Down Expand Up @@ -844,6 +959,8 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
// set the bytes limit to 1MB so that we will not reach it and will hit the 100ms timeout every time.
produce_consume_partitions3(connection_builder, "partitions3_case4", 1_000_000, 100).await;

produce_consume_transactions(connection_builder).await;

// Only run this test case on the java driver,
// since even without going through shotover the cpp driver fails this test.
#[allow(irrefutable_let_patterns)]
Expand Down
Loading

0 comments on commit aa0d8bb

Please sign in to comment.