Skip to content

Commit

Permalink
KafkaSinkCluster: test multi topic consuming (#1786)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Oct 28, 2024
1 parent 830187d commit 487ffec
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 28 deletions.
115 changes: 92 additions & 23 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
num_partitions: 1,
replication_factor: 1,
},
NewTopic {
name: "multi_topic_consumer_1",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "multi_topic_consumer_2",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "multi_topic_batch_partitions_1",
num_partitions: 1,
Expand Down Expand Up @@ -166,19 +176,19 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect

let mut consumer_partitions_1 = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic("multi_topic_batch_partitions_1".to_owned())
ConsumerConfig::consume_from_topics(vec!["multi_topic_batch_partitions_1".to_owned()])
.with_group("multi_topic_batch_partitions_1_group"),
)
.await;
let mut consumer_partitions_3 = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic("multi_topic_batch_partitions_3".to_owned())
ConsumerConfig::consume_from_topics(vec!["multi_topic_batch_partitions_3".to_owned()])
.with_group("multi_topic_batch_partitions_3_group"),
)
.await;
let mut consumer_unknown = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic("batch_test_unknown".to_owned())
ConsumerConfig::consume_from_topics(vec!["batch_test_unknown".to_owned()])
.with_group("batch_test_unknown_group"),
)
.await;
Expand Down Expand Up @@ -276,7 +286,7 @@ pub async fn produce_consume_multi_partition_batch(connection_builder: &KafkaCon

let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic("multi_partitions_batch".to_owned())
ConsumerConfig::consume_from_topics(vec!["multi_partitions_batch".to_owned()])
.with_group("multi_partitions_batch_group"),
)
.await;
Expand Down Expand Up @@ -339,7 +349,8 @@ pub async fn produce_consume_partitions1(

let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned()).with_group("some_group"),
ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()])
.with_group("some_group"),
)
.await;
consumer
Expand Down Expand Up @@ -398,7 +409,8 @@ pub async fn produce_consume_partitions1(
// so we test that we can access all records ever created on this topic
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned()).with_group("some_group"),
ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()])
.with_group("some_group"),
)
.await;
consumer
Expand Down Expand Up @@ -473,7 +485,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(

let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()])
.with_group("kafka_node_goes_down_test_group"),
)
.await;
Expand Down Expand Up @@ -535,7 +547,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
// so we test that we can access all records ever created on this topic
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()])
.with_group("kafka_node_goes_down_test_group_new"),
)
.await;
Expand Down Expand Up @@ -599,7 +611,7 @@ pub async fn produce_consume_partitions1_shotover_nodes_go_down(

let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()])
.with_group("kafka_node_goes_down_test_group"),
)
.await;
Expand Down Expand Up @@ -672,7 +684,7 @@ pub async fn produce_consume_partitions1_shotover_nodes_go_down(
// so we test that we can access all records ever created on this topic
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()])
.with_group("kafka_node_goes_down_test_group_new"),
)
.await;
Expand Down Expand Up @@ -723,7 +735,7 @@ pub async fn produce_consume_commit_offsets_partitions1(

let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()])
.with_group("consumer_group_with_offsets"),
)
.await;
Expand Down Expand Up @@ -791,7 +803,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
// The new consumer should consume Message2 which is at the last uncommitted offset
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()])
.with_group("consumer_group_with_offsets"),
)
.await;
Expand All @@ -809,7 +821,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
// The new consumer should still consume Message2 as its offset has not been committed
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()])
.with_group("consumer_group_with_offsets"),
)
.await;
Expand All @@ -827,7 +839,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
// A new consumer in another group should consume from the beginning since auto.offset.reset = earliest and enable.auto.commit false
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()])
.with_group("consumer_group_without_offsets"),
)
.await;
Expand All @@ -851,7 +863,7 @@ pub async fn produce_consume_partitions3(
let producer = connection_builder.connect_producer("1", 0).await;
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()])
.with_group("some_group")
.with_fetch_min_bytes(fetch_min_bytes)
.with_fetch_max_wait_ms(fetch_wait_max_ms),
Expand Down Expand Up @@ -900,6 +912,60 @@ pub async fn produce_consume_partitions3(
}
}

pub async fn produce_consume_multi_topic_consumer(connection_builder: &KafkaConnectionBuilder) {
let producer = connection_builder.connect_producer("1", 0).await;
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topics(vec![
"multi_topic_consumer_1".to_owned(),
"multi_topic_consumer_2".to_owned(),
])
.with_group("multi_topics_group"),
)
.await;

for _ in 0..5 {
producer
.assert_produce(
Record {
payload: "Message1",
topic_name: "multi_topic_consumer_1",
key: Some("Key".into()),
},
// We cant predict the offsets since that will depend on which partition the keyless record ends up in
None,
)
.await;
producer
.assert_produce(
Record {
payload: "Message2",
topic_name: "multi_topic_consumer_2",
key: None,
},
None,
)
.await;

consumer
.assert_consume_in_any_order(vec![
ExpectedResponse {
message: "Message1".to_owned(),
key: Some("Key".to_owned()),
topic_name: "multi_topic_consumer_1".to_owned(),
offset: None,
},
ExpectedResponse {
message: "Message2".to_owned(),
key: None,
topic_name: "multi_topic_consumer_2".to_owned(),
offset: None,
},
])
.await;
}
}

async fn produce_consume_transactions_with_abort(connection_builder: &KafkaConnectionBuilder) {
let producer = connection_builder.connect_producer("1", 0).await;
for i in 0..5 {
Expand Down Expand Up @@ -930,13 +996,13 @@ async fn produce_consume_transactions_with_abort(connection_builder: &KafkaConne
.await;
let mut consumer_topic_in = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic("transactions1_in".to_owned())
ConsumerConfig::consume_from_topics(vec!["transactions1_in".to_owned()])
.with_group("some_group1"),
)
.await;
let mut consumer_topic_out = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic("transactions1_out".to_owned())
ConsumerConfig::consume_from_topics(vec!["transactions1_out".to_owned()])
.with_isolation_level(IsolationLevel::ReadCommitted)
.with_group("some_group2"),
)
Expand Down Expand Up @@ -1025,21 +1091,21 @@ async fn produce_consume_transactions_with_commit(connection_builder: &KafkaConn
.await;
let mut consumer_topic_in = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic("transactions2_in".to_owned())
ConsumerConfig::consume_from_topics(vec!["transactions2_in".to_owned()])
.with_group("some_group1"),
)
.await;
let mut consumer_topic_out_committed = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic("transactions2_out".to_owned())
ConsumerConfig::consume_from_topics(vec!["transactions2_out".to_owned()])
.with_isolation_level(IsolationLevel::ReadCommitted)
.with_group("some_group2"),
)
.await;

let mut consumer_topic_out_uncommitted = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic("transactions2_out".to_owned())
ConsumerConfig::consume_from_topics(vec!["transactions2_out".to_owned()])
.with_isolation_level(IsolationLevel::ReadUncommitted)
.with_group("some_group3"),
)
Expand Down Expand Up @@ -1159,7 +1225,7 @@ async fn produce_consume_transactions_with_commit(connection_builder: &KafkaConn
// send_offsets_to_transaction should result in commits to the consumer offset of the input topic
let mut consumer_topic_in_new = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic("transactions2_in".to_owned())
ConsumerConfig::consume_from_topics(vec!["transactions2_in".to_owned()])
.with_group("some_group1"),
)
.await;
Expand Down Expand Up @@ -1192,7 +1258,8 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {

let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned()).with_group("some_group"),
ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()])
.with_group("some_group"),
)
.await;

Expand Down Expand Up @@ -1221,7 +1288,8 @@ pub async fn test_broker_idle_timeout(connection_builder: &KafkaConnectionBuilde
let mut producer = connection_builder.connect_producer("all", 0).await;
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic("partitions3".to_owned()).with_group("some_group"),
ConsumerConfig::consume_from_topics(vec!["partitions3".to_owned()])
.with_group("some_group"),
)
.await;

Expand Down Expand Up @@ -1269,6 +1337,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
produce_consume_commit_offsets_partitions1(connection_builder, "partitions1_with_offset").await;
produce_consume_multi_topic_batch(connection_builder).await;
produce_consume_multi_partition_batch(connection_builder).await;
produce_consume_multi_topic_consumer(connection_builder).await;

// test with minimum limits
produce_consume_partitions3(connection_builder, "partitions3_case1", 1, 0).await;
Expand Down
3 changes: 2 additions & 1 deletion test-helpers/src/connection/kafka/cpp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ impl KafkaConnectionBuilderCpp {
.create()
.unwrap();

consumer.subscribe(&[&config.topic_name]).unwrap();
let topic_names: Vec<&str> = config.topic_names.iter().map(|x| x.as_str()).collect();
consumer.subscribe(&topic_names).unwrap();
KafkaConsumerCpp { consumer }
}

Expand Down
6 changes: 5 additions & 1 deletion test-helpers/src/connection/kafka/java.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,11 @@ impl KafkaConnectionBuilderJava {
"subscribe",
vec![self.jvm.new_list(
"java.lang.String",
vec![self.jvm.new_string(&consumer_config.topic_name)],
consumer_config
.topic_names
.iter()
.map(|topic_name| self.jvm.new_string(topic_name))
.collect(),
)],
);

Expand Down
6 changes: 3 additions & 3 deletions test-helpers/src/connection/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,17 +585,17 @@ pub enum OffsetSpec {
}

pub struct ConsumerConfig {
topic_name: String,
topic_names: Vec<String>,
group: String,
fetch_min_bytes: i32,
fetch_max_wait_ms: i32,
isolation_level: IsolationLevel,
}

impl ConsumerConfig {
pub fn consume_from_topic(topic_name: String) -> Self {
pub fn consume_from_topics(topic_names: Vec<String>) -> Self {
Self {
topic_name,
topic_names,
group: "default_group".to_owned(),
fetch_min_bytes: 1,
fetch_max_wait_ms: 500,
Expand Down

0 comments on commit 487ffec

Please sign in to comment.