From 487ffec1311923a104814b31bedb58b92ad7a2a7 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 29 Oct 2024 10:11:58 +1100 Subject: [PATCH] KafkaSinkCluster: test multi topic consuming (#1786) --- .../tests/kafka_int_tests/test_cases.rs | 115 ++++++++++++++---- test-helpers/src/connection/kafka/cpp.rs | 3 +- test-helpers/src/connection/kafka/java.rs | 6 +- test-helpers/src/connection/kafka/mod.rs | 6 +- 4 files changed, 102 insertions(+), 28 deletions(-) diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 687bb87b2..9e8cedcd1 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -60,6 +60,16 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) { num_partitions: 1, replication_factor: 1, }, + NewTopic { + name: "multi_topic_consumer_1", + num_partitions: 3, + replication_factor: 1, + }, + NewTopic { + name: "multi_topic_consumer_2", + num_partitions: 3, + replication_factor: 1, + }, NewTopic { name: "multi_topic_batch_partitions_1", num_partitions: 1, @@ -166,19 +176,19 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect let mut consumer_partitions_1 = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic("multi_topic_batch_partitions_1".to_owned()) + ConsumerConfig::consume_from_topics(vec!["multi_topic_batch_partitions_1".to_owned()]) .with_group("multi_topic_batch_partitions_1_group"), ) .await; let mut consumer_partitions_3 = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic("multi_topic_batch_partitions_3".to_owned()) + ConsumerConfig::consume_from_topics(vec!["multi_topic_batch_partitions_3".to_owned()]) .with_group("multi_topic_batch_partitions_3_group"), ) .await; let mut consumer_unknown = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic("batch_test_unknown".to_owned()) + 
ConsumerConfig::consume_from_topics(vec!["batch_test_unknown".to_owned()]) .with_group("batch_test_unknown_group"), ) .await; @@ -276,7 +286,7 @@ pub async fn produce_consume_multi_partition_batch(connection_builder: &KafkaCon let mut consumer = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic("multi_partitions_batch".to_owned()) + ConsumerConfig::consume_from_topics(vec!["multi_partitions_batch".to_owned()]) .with_group("multi_partitions_batch_group"), ) .await; @@ -339,7 +349,8 @@ pub async fn produce_consume_partitions1( let mut consumer = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic(topic_name.to_owned()).with_group("some_group"), + ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()]) + .with_group("some_group"), ) .await; consumer @@ -398,7 +409,8 @@ pub async fn produce_consume_partitions1( // so we test that we can access all records ever created on this topic let mut consumer = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic(topic_name.to_owned()).with_group("some_group"), + ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()]) + .with_group("some_group"), ) .await; consumer @@ -473,7 +485,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down( let mut consumer = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic(topic_name.to_owned()) + ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()]) .with_group("kafka_node_goes_down_test_group"), ) .await; @@ -535,7 +547,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down( // so we test that we can access all records ever created on this topic let mut consumer = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic(topic_name.to_owned()) + ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()]) .with_group("kafka_node_goes_down_test_group_new"), ) .await; @@ -599,7 +611,7 @@ pub async fn 
produce_consume_partitions1_shotover_nodes_go_down( let mut consumer = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic(topic_name.to_owned()) + ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()]) .with_group("kafka_node_goes_down_test_group"), ) .await; @@ -672,7 +684,7 @@ pub async fn produce_consume_partitions1_shotover_nodes_go_down( // so we test that we can access all records ever created on this topic let mut consumer = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic(topic_name.to_owned()) + ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()]) .with_group("kafka_node_goes_down_test_group_new"), ) .await; @@ -723,7 +735,7 @@ pub async fn produce_consume_commit_offsets_partitions1( let mut consumer = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic(topic_name.to_owned()) + ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()]) .with_group("consumer_group_with_offsets"), ) .await; @@ -791,7 +803,7 @@ pub async fn produce_consume_commit_offsets_partitions1( // The new consumer should consume Message2 which is at the last uncommitted offset let mut consumer = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic(topic_name.to_owned()) + ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()]) .with_group("consumer_group_with_offsets"), ) .await; @@ -809,7 +821,7 @@ pub async fn produce_consume_commit_offsets_partitions1( // The new consumer should still consume Message2 as its offset has not been committed let mut consumer = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic(topic_name.to_owned()) + ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()]) .with_group("consumer_group_with_offsets"), ) .await; @@ -827,7 +839,7 @@ pub async fn produce_consume_commit_offsets_partitions1( // A new consumer in another group should consume from the beginning since auto.offset.reset = earliest and 
enable.auto.commit false let mut consumer = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic(topic_name.to_owned()) + ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()]) .with_group("consumer_group_without_offsets"), ) .await; @@ -851,7 +863,7 @@ pub async fn produce_consume_partitions3( let producer = connection_builder.connect_producer("1", 0).await; let mut consumer = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic(topic_name.to_owned()) + ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()]) .with_group("some_group") .with_fetch_min_bytes(fetch_min_bytes) .with_fetch_max_wait_ms(fetch_wait_max_ms), @@ -900,6 +912,60 @@ pub async fn produce_consume_partitions3( } } +pub async fn produce_consume_multi_topic_consumer(connection_builder: &KafkaConnectionBuilder) { + let producer = connection_builder.connect_producer("1", 0).await; + let mut consumer = connection_builder + .connect_consumer( + ConsumerConfig::consume_from_topics(vec![ + "multi_topic_consumer_1".to_owned(), + "multi_topic_consumer_2".to_owned(), + ]) + .with_group("multi_topics_group"), + ) + .await; + + for _ in 0..5 { + producer + .assert_produce( + Record { + payload: "Message1", + topic_name: "multi_topic_consumer_1", + key: Some("Key".into()), + }, + // We cant predict the offsets since that will depend on which partition the keyless record ends up in + None, + ) + .await; + producer + .assert_produce( + Record { + payload: "Message2", + topic_name: "multi_topic_consumer_2", + key: None, + }, + None, + ) + .await; + + consumer + .assert_consume_in_any_order(vec![ + ExpectedResponse { + message: "Message1".to_owned(), + key: Some("Key".to_owned()), + topic_name: "multi_topic_consumer_1".to_owned(), + offset: None, + }, + ExpectedResponse { + message: "Message2".to_owned(), + key: None, + topic_name: "multi_topic_consumer_2".to_owned(), + offset: None, + }, + ]) + .await; + } +} + async fn 
produce_consume_transactions_with_abort(connection_builder: &KafkaConnectionBuilder) { let producer = connection_builder.connect_producer("1", 0).await; for i in 0..5 { @@ -930,13 +996,13 @@ async fn produce_consume_transactions_with_abort(connection_builder: &KafkaConne .await; let mut consumer_topic_in = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic("transactions1_in".to_owned()) + ConsumerConfig::consume_from_topics(vec!["transactions1_in".to_owned()]) .with_group("some_group1"), ) .await; let mut consumer_topic_out = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic("transactions1_out".to_owned()) + ConsumerConfig::consume_from_topics(vec!["transactions1_out".to_owned()]) .with_isolation_level(IsolationLevel::ReadCommitted) .with_group("some_group2"), ) @@ -1025,13 +1091,13 @@ async fn produce_consume_transactions_with_commit(connection_builder: &KafkaConn .await; let mut consumer_topic_in = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic("transactions2_in".to_owned()) + ConsumerConfig::consume_from_topics(vec!["transactions2_in".to_owned()]) .with_group("some_group1"), ) .await; let mut consumer_topic_out_committed = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic("transactions2_out".to_owned()) + ConsumerConfig::consume_from_topics(vec!["transactions2_out".to_owned()]) .with_isolation_level(IsolationLevel::ReadCommitted) .with_group("some_group2"), ) @@ -1039,7 +1105,7 @@ async fn produce_consume_transactions_with_commit(connection_builder: &KafkaConn let mut consumer_topic_out_uncommitted = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic("transactions2_out".to_owned()) + ConsumerConfig::consume_from_topics(vec!["transactions2_out".to_owned()]) .with_isolation_level(IsolationLevel::ReadUncommitted) .with_group("some_group3"), ) @@ -1159,7 +1225,7 @@ async fn produce_consume_transactions_with_commit(connection_builder: 
&KafkaConn // send_offsets_to_transaction should result in commits to the consumer offset of the input topic let mut consumer_topic_in_new = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic("transactions2_in".to_owned()) + ConsumerConfig::consume_from_topics(vec!["transactions2_in".to_owned()]) .with_group("some_group1"), ) .await; @@ -1192,7 +1258,8 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) { let mut consumer = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic(topic_name.to_owned()).with_group("some_group"), + ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()]) + .with_group("some_group"), ) .await; @@ -1221,7 +1288,8 @@ pub async fn test_broker_idle_timeout(connection_builder: &KafkaConnectionBuilde let mut producer = connection_builder.connect_producer("all", 0).await; let mut consumer = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic("partitions3".to_owned()).with_group("some_group"), + ConsumerConfig::consume_from_topics(vec!["partitions3".to_owned()]) + .with_group("some_group"), ) .await; @@ -1269,6 +1337,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) { produce_consume_commit_offsets_partitions1(connection_builder, "partitions1_with_offset").await; produce_consume_multi_topic_batch(connection_builder).await; produce_consume_multi_partition_batch(connection_builder).await; + produce_consume_multi_topic_consumer(connection_builder).await; // test with minimum limits produce_consume_partitions3(connection_builder, "partitions3_case1", 1, 0).await; diff --git a/test-helpers/src/connection/kafka/cpp.rs b/test-helpers/src/connection/kafka/cpp.rs index e19439eb5..718fd1399 100644 --- a/test-helpers/src/connection/kafka/cpp.rs +++ b/test-helpers/src/connection/kafka/cpp.rs @@ -96,7 +96,8 @@ impl KafkaConnectionBuilderCpp { .create() .unwrap(); - consumer.subscribe(&[&config.topic_name]).unwrap(); + let 
topic_names: Vec<&str> = config.topic_names.iter().map(|x| x.as_str()).collect(); + consumer.subscribe(&topic_names).unwrap(); KafkaConsumerCpp { consumer } } diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index 00baf2b57..ab4c6e953 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -179,7 +179,11 @@ impl KafkaConnectionBuilderJava { "subscribe", vec![self.jvm.new_list( "java.lang.String", - vec![self.jvm.new_string(&consumer_config.topic_name)], + consumer_config + .topic_names + .iter() + .map(|topic_name| self.jvm.new_string(topic_name)) + .collect(), )], ); diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index f738ea2af..3350be7df 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -585,7 +585,7 @@ pub enum OffsetSpec { } pub struct ConsumerConfig { - topic_name: String, + topic_names: Vec<String>, group: String, fetch_min_bytes: i32, fetch_max_wait_ms: i32, } impl ConsumerConfig { - pub fn consume_from_topic(topic_name: String) -> Self { + pub fn consume_from_topics(topic_names: Vec<String>) -> Self { Self { - topic_name, + topic_names, group: "default_group".to_owned(), fetch_min_bytes: 1, fetch_max_wait_ms: 500,