From 25827de578c1f82ab55b096cd949262bed4b013f Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Thu, 26 Sep 2024 16:00:58 +1000 Subject: [PATCH] Add integration tests for fetch.min.bytes and fetch.wait.max.ms (#1757) --- .../tests/kafka_int_tests/test_cases.rs | 112 ++++++++++++++---- test-helpers/src/connection/kafka/cpp.rs | 12 +- test-helpers/src/connection/kafka/java.rs | 24 ++-- test-helpers/src/connection/kafka/mod.rs | 40 ++++++- 4 files changed, 152 insertions(+), 36 deletions(-) diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index e00ed37ce..42414fda4 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -2,9 +2,10 @@ use futures::{stream::FuturesUnordered, StreamExt}; use std::{collections::HashMap, time::Duration}; use test_helpers::{ connection::kafka::{ - Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse, - KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, NewPartition, NewTopic, - Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition, + Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig, + ExpectedResponse, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, + NewPartition, NewTopic, Record, ResourcePatternType, ResourceSpecifier, ResourceType, + TopicPartition, }, docker_compose::DockerCompose, }; @@ -28,6 +29,26 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) { num_partitions: 3, replication_factor: 1, }, + NewTopic { + name: "partitions3_case1", + num_partitions: 3, + replication_factor: 1, + }, + NewTopic { + name: "partitions3_case2", + num_partitions: 3, + replication_factor: 1, + }, + NewTopic { + name: "partitions3_case3", + num_partitions: 3, + replication_factor: 1, + }, + NewTopic { + name: "partitions3_case4", + num_partitions: 3, + replication_factor: 1, + }, NewTopic { name: "acks0", num_partitions: 1, @@ -117,18 +138,21 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect let mut consumer_partitions_1 = connection_builder .connect_consumer( - "multi_topic_batch_partitions_1", - "multi_topic_batch_partitions_1_group", + ConsumerConfig::consume_from_topic("multi_topic_batch_partitions_1".to_owned()) + .with_group("multi_topic_batch_partitions_1_group"), ) .await; let mut consumer_partitions_3 = connection_builder .connect_consumer( - "multi_topic_batch_partitions_3", - "multi_topic_batch_partitions_3_group", + ConsumerConfig::consume_from_topic("multi_topic_batch_partitions_3".to_owned()) + .with_group("multi_topic_batch_partitions_3_group"), ) .await; let mut consumer_unknown = connection_builder - .connect_consumer("batch_test_unknown", "batch_test_unknown_group") + .connect_consumer( + ConsumerConfig::consume_from_topic("batch_test_unknown".to_owned()) + .with_group("batch_test_unknown_group"), + ) .await; tokio::join!( @@ -223,7 +247,10 @@ pub async fn produce_consume_multi_partition_batch(connection_builder: &KafkaCon .await; let mut consumer = connection_builder - .connect_consumer("multi_partitions_batch", "multi_partitions_batch_group") + .connect_consumer( + ConsumerConfig::consume_from_topic("multi_partitions_batch".to_owned()) + .with_group("multi_partitions_batch_group"), + ) .await; consumer @@ -283,7 +310,9 @@ pub async fn produce_consume_partitions1( .await; let mut consumer = connection_builder - .connect_consumer(topic_name, "some_group") + .connect_consumer( + ConsumerConfig::consume_from_topic(topic_name.to_owned()).with_group("some_group"), + ) .await; consumer .assert_consume(ExpectedResponse { @@ -340,7 +369,9 @@ pub async fn produce_consume_partitions1( // if we create a new consumer it will start from the beginning since auto.offset.reset = earliest and enable.auto.commit false // so we test that we can access all records ever created on this topic let mut consumer = connection_builder - .connect_consumer(topic_name, "some_group") + .connect_consumer( + ConsumerConfig::consume_from_topic(topic_name.to_owned()).with_group("some_group"), + ) .await; consumer .assert_consume(ExpectedResponse { @@ -413,7 +444,10 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down( .await; let mut consumer = connection_builder - .connect_consumer(topic_name, "kafka_node_goes_down_test_group") + .connect_consumer( + ConsumerConfig::consume_from_topic(topic_name.to_owned()) + .with_group("kafka_node_goes_down_test_group"), + ) .await; consumer .assert_consume(ExpectedResponse { @@ -472,7 +506,10 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down( // if we create a new consumer it will start from the beginning since auto.offset.reset = earliest and enable.auto.commit false // so we test that we can access all records ever created on this topic let mut consumer = connection_builder - .connect_consumer(topic_name, "kafka_node_goes_down_test_group_new") + .connect_consumer( + ConsumerConfig::consume_from_topic(topic_name.to_owned()) + .with_group("kafka_node_goes_down_test_group_new"), + ) .await; consumer .assert_consume(ExpectedResponse { @@ -520,7 +557,10 @@ pub async fn produce_consume_commit_offsets_partitions1( .await; let mut consumer = connection_builder - .connect_consumer(topic_name, "consumer_group_with_offsets") + .connect_consumer( + ConsumerConfig::consume_from_topic(topic_name.to_owned()) + .with_group("consumer_group_with_offsets"), + ) .await; consumer .assert_consume(ExpectedResponse { @@ -585,7 +625,10 @@ pub async fn produce_consume_commit_offsets_partitions1( { // The new consumer should consume Message2 which is at the last uncommitted offset let mut consumer = connection_builder - .connect_consumer(topic_name, "consumer_group_with_offsets") + .connect_consumer( + ConsumerConfig::consume_from_topic(topic_name.to_owned()) + .with_group("consumer_group_with_offsets"), + ) .await; consumer .assert_consume(ExpectedResponse { @@ -600,7 +643,10 @@ pub async fn produce_consume_commit_offsets_partitions1( { // The new consumer should still consume Message2 as its offset has not been committed let mut consumer = connection_builder - .connect_consumer(topic_name, "consumer_group_with_offsets") + .connect_consumer( + ConsumerConfig::consume_from_topic(topic_name.to_owned()) + .with_group("consumer_group_with_offsets"), + ) .await; consumer .assert_consume(ExpectedResponse { @@ -615,7 +661,10 @@ pub async fn produce_consume_commit_offsets_partitions1( { // A new consumer in another group should consume from the beginning since auto.offset.reset = earliest and enable.auto.commit false let mut consumer = connection_builder - .connect_consumer(topic_name, "consumer_group_without_offsets") + .connect_consumer( + ConsumerConfig::consume_from_topic(topic_name.to_owned()) + .with_group("consumer_group_without_offsets"), + ) .await; consumer .assert_consume(ExpectedResponse { @@ -631,10 +680,17 @@ pub async fn produce_consume_commit_offsets_partitions1( async fn produce_consume_partitions3( connection_builder: &KafkaConnectionBuilder, topic_name: &str, + fetch_min_bytes: i32, + fetch_wait_max_ms: i32, ) { let producer = connection_builder.connect_producer("1", 0).await; let mut consumer = connection_builder - .connect_consumer(topic_name, "some_group") + .connect_consumer( + ConsumerConfig::consume_from_topic(topic_name.to_owned()) + .with_group("some_group") + .with_fetch_min_bytes(fetch_min_bytes) + .with_fetch_max_wait_ms(fetch_wait_max_ms), + ) .await; for _ in 0..5 { @@ -697,7 +753,9 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) { } let mut consumer = connection_builder - .connect_consumer(topic_name, "some_group") + .connect_consumer( + ConsumerConfig::consume_from_topic(topic_name.to_owned()).with_group("some_group"), + ) .await; for j in 0..10 { @@ -727,7 +785,9 @@ pub async fn test_broker_idle_timeout(connection_builder: &KafkaConnectionBuilde let mut producer = connection_builder.connect_producer("all", 0).await; let mut consumer = connection_builder - .connect_consumer("partitions3", "some_group") + .connect_consumer( + ConsumerConfig::consume_from_topic("partitions3".to_owned()).with_group("some_group"), + ) .await; // write to some open shotover connections @@ -772,10 +832,18 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) { produce_consume_partitions1(connection_builder, "partitions1").await; produce_consume_partitions1(connection_builder, "unknown_topic").await; produce_consume_commit_offsets_partitions1(connection_builder, "partitions1_with_offset").await; - produce_consume_partitions3(connection_builder, "partitions3").await; produce_consume_multi_topic_batch(connection_builder).await; produce_consume_multi_partition_batch(connection_builder).await; + // test with minimum limits + produce_consume_partitions3(connection_builder, "partitions3_case1", 1, 0).await; + // test with minimum limits that results in a delay + produce_consume_partitions3(connection_builder, "partitions3_case2", 1, 1).await; + // test with default limits + produce_consume_partitions3(connection_builder, "partitions3_case3", 1, 500).await; + // set the bytes limit to 1MB so that we will not reach it and will hit the 100ms timeout every time. + produce_consume_partitions3(connection_builder, "partitions3_case4", 1_000_000, 100).await; + // Only run this test case on the java driver, // since even without going through shotover the cpp driver fails this test. #[allow(irrefutable_let_patterns)] @@ -816,7 +884,7 @@ pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) { .await; tokio::time::sleep(Duration::from_secs(10)).await; produce_consume_partitions1(connection_builder, "partitions1_rf3").await; - produce_consume_partitions3(connection_builder, "partitions3_rf3").await; + produce_consume_partitions3(connection_builder, "partitions3_rf3", 1, 500).await; } pub async fn setup_basic_user_acls(connection: &KafkaConnectionBuilder, username: &str) { diff --git a/test-helpers/src/connection/kafka/cpp.rs b/test-helpers/src/connection/kafka/cpp.rs index 9182b086d..47a44fe76 100644 --- a/test-helpers/src/connection/kafka/cpp.rs +++ b/test-helpers/src/connection/kafka/cpp.rs @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet}; // Allow direct usage of the APIs when the feature is enabled pub use rdkafka; -use super::{ExpectedResponse, NewPartition, Record, TopicPartition}; +use super::{ConsumerConfig, ExpectedResponse, NewPartition, Record, TopicPartition}; use anyhow::Result; use pretty_assertions::assert_eq; use rdkafka::admin::AdminClient; @@ -63,17 +63,21 @@ impl KafkaConnectionBuilderCpp { } } - pub async fn connect_consumer(&self, topic_name: &str, group: &str) -> KafkaConsumerCpp { + pub async fn connect_consumer(&self, config: ConsumerConfig) -> KafkaConsumerCpp { let consumer: StreamConsumer = self .client .clone() - .set("group.id", group) + .set("group.id", &config.group) .set("session.timeout.ms", "6000") .set("auto.offset.reset", "earliest") .set("enable.auto.commit", "false") + // this has a different name to the java driver 😭 + .set("fetch.wait.max.ms", config.fetch_max_wait_ms.to_string()) + .set("fetch.min.bytes", config.fetch_min_bytes.to_string()) .create() .unwrap(); - consumer.subscribe(&[topic_name]).unwrap(); + + consumer.subscribe(&[&config.topic_name]).unwrap(); KafkaConsumerCpp { consumer } } diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index aca906de6..21ae4692b 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -1,6 +1,7 @@ use super::{ - Acl, AclOperation, AclPermissionType, AlterConfig, ExpectedResponse, NewPartition, NewTopic, - Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription, TopicPartition, + Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ExpectedResponse, + NewPartition, NewTopic, Record, ResourcePatternType, ResourceSpecifier, ResourceType, + TopicDescription, TopicPartition, }; use crate::connection::java::{Jvm, Value}; use anyhow::Result; @@ -102,12 +103,20 @@ impl KafkaConnectionBuilderJava { KafkaProducerJava { jvm, producer } } - pub async fn connect_consumer(&self, topic_name: &str, group: &str) -> KafkaConsumerJava { + pub async fn connect_consumer(&self, consumer_config: ConsumerConfig) -> KafkaConsumerJava { let mut config = self.base_config.clone(); - config.insert("group.id".to_owned(), group.to_owned()); + config.insert("group.id".to_owned(), consumer_config.group); config.insert("session.timeout.ms".to_owned(), "6000".to_owned()); config.insert("auto.offset.reset".to_owned(), "earliest".to_owned()); config.insert("enable.auto.commit".to_owned(), "false".to_owned()); + config.insert( + "fetch.max.wait.ms".to_owned(), + consumer_config.fetch_max_wait_ms.to_string(), + ); + config.insert( + "fetch.min.bytes".to_owned(), + consumer_config.fetch_min_bytes.to_string(), + ); config.insert( "key.deserializer".to_owned(), "org.apache.kafka.common.serialization.StringDeserializer".to_owned(), @@ -123,9 +132,10 @@ impl KafkaConnectionBuilderJava { ); consumer.call( "subscribe", - vec![self - .jvm - .new_list("java.lang.String", vec![self.jvm.new_string(topic_name)])], + vec![self.jvm.new_list( + "java.lang.String", + vec![self.jvm.new_string(&consumer_config.topic_name)], + )], ); let jvm = self.jvm.clone(); diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index eb561c126..aaecac4c8 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -75,11 +75,11 @@ impl KafkaConnectionBuilder { } } - pub async fn connect_consumer(&self, topic_name: &str, group: &str) -> KafkaConsumer { + pub async fn connect_consumer(&self, config: ConsumerConfig) -> KafkaConsumer { match self { #[cfg(feature = "kafka-cpp-driver-tests")] - Self::Cpp(cpp) => KafkaConsumer::Cpp(cpp.connect_consumer(topic_name, group).await), - Self::Java(java) => KafkaConsumer::Java(java.connect_consumer(topic_name, group).await), + Self::Cpp(cpp) => KafkaConsumer::Cpp(cpp.connect_consumer(config).await), + Self::Java(java) => KafkaConsumer::Java(java.connect_consumer(config).await), } } @@ -398,3 +398,37 @@ pub struct TopicDescription { // instead they just check if the describe succeeded or failed, // so this is intentionally left empty for now } + +#[derive(Default)] +pub struct ConsumerConfig { + topic_name: String, + group: String, + fetch_min_bytes: i32, + fetch_max_wait_ms: i32, +} + +impl ConsumerConfig { + pub fn consume_from_topic(topic_name: String) -> Self { + Self { + topic_name, + group: "default_group".to_owned(), + fetch_min_bytes: 1, + fetch_max_wait_ms: 500, + } + } + + pub fn with_group(mut self, group: &str) -> Self { + self.group = group.to_owned(); + self + } + + pub fn with_fetch_min_bytes(mut self, fetch_min_bytes: i32) -> Self { + self.fetch_min_bytes = fetch_min_bytes; + self + } + + pub fn with_fetch_max_wait_ms(mut self, fetch_max_wait_ms: i32) -> Self { + self.fetch_max_wait_ms = fetch_max_wait_ms; + self + } +}