Skip to content

Commit

Permalink
Add integration tests for fetch.min.bytes and fetch.wait.max.ms.
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Sep 26, 2024
1 parent 0eb9ff1 commit 0894590
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 36 deletions.
112 changes: 90 additions & 22 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use futures::{stream::FuturesUnordered, StreamExt};
use std::{collections::HashMap, time::Duration};
use test_helpers::{
connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse,
KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, NewPartition, NewTopic,
Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition,
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
ExpectedResponse, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer,
NewPartition, NewTopic, Record, ResourcePatternType, ResourceSpecifier, ResourceType,
TopicPartition,
},
docker_compose::DockerCompose,
};
Expand All @@ -28,6 +29,26 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "partitions3_case1",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "partitions3_case2",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "partitions3_case3",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "partitions3_case4",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "acks0",
num_partitions: 1,
Expand Down Expand Up @@ -117,18 +138,21 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnect

let mut consumer_partitions_1 = connection_builder
.connect_consumer(
"multi_topic_batch_partitions_1",
"multi_topic_batch_partitions_1_group",
ConsumerConfig::consume_from_topic("multi_topic_batch_partitions_1".to_owned())
.with_group("multi_topic_batch_partitions_1_group"),
)
.await;
let mut consumer_partitions_3 = connection_builder
.connect_consumer(
"multi_topic_batch_partitions_3",
"multi_topic_batch_partitions_3_group",
ConsumerConfig::consume_from_topic("multi_topic_batch_partitions_3".to_owned())
.with_group("multi_topic_batch_partitions_3_group"),
)
.await;
let mut consumer_unknown = connection_builder
.connect_consumer("batch_test_unknown", "batch_test_unknown_group")
.connect_consumer(
ConsumerConfig::consume_from_topic("batch_test_unknown".to_owned())
.with_group("batch_test_unknown_group"),
)
.await;

tokio::join!(
Expand Down Expand Up @@ -223,7 +247,10 @@ pub async fn produce_consume_multi_partition_batch(connection_builder: &KafkaCon
.await;

let mut consumer = connection_builder
.connect_consumer("multi_partitions_batch", "multi_partitions_batch_group")
.connect_consumer(
ConsumerConfig::consume_from_topic("multi_partitions_batch".to_owned())
.with_group("multi_partitions_batch_group"),
)
.await;

consumer
Expand Down Expand Up @@ -283,7 +310,9 @@ pub async fn produce_consume_partitions1(
.await;

let mut consumer = connection_builder
.connect_consumer(topic_name, "some_group")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned()).with_group("some_group"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
Expand Down Expand Up @@ -340,7 +369,9 @@ pub async fn produce_consume_partitions1(
// if we create a new consumer it will start from the beginning since auto.offset.reset = earliest and enable.auto.commit false
// so we test that we can access all records ever created on this topic
let mut consumer = connection_builder
.connect_consumer(topic_name, "some_group")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned()).with_group("some_group"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
Expand Down Expand Up @@ -413,7 +444,10 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
.await;

let mut consumer = connection_builder
.connect_consumer(topic_name, "kafka_node_goes_down_test_group")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
.with_group("kafka_node_goes_down_test_group"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
Expand Down Expand Up @@ -472,7 +506,10 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
// if we create a new consumer it will start from the beginning since auto.offset.reset = earliest and enable.auto.commit false
// so we test that we can access all records ever created on this topic
let mut consumer = connection_builder
.connect_consumer(topic_name, "kafka_node_goes_down_test_group_new")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
.with_group("kafka_node_goes_down_test_group_new"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
Expand Down Expand Up @@ -520,7 +557,10 @@ pub async fn produce_consume_commit_offsets_partitions1(
.await;

let mut consumer = connection_builder
.connect_consumer(topic_name, "consumer_group_with_offsets")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
.with_group("consumer_group_with_offsets"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
Expand Down Expand Up @@ -585,7 +625,10 @@ pub async fn produce_consume_commit_offsets_partitions1(
{
// The new consumer should consume Message2 which is at the last uncommitted offset
let mut consumer = connection_builder
.connect_consumer(topic_name, "consumer_group_with_offsets")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
.with_group("consumer_group_with_offsets"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
Expand All @@ -600,7 +643,10 @@ pub async fn produce_consume_commit_offsets_partitions1(
{
// The new consumer should still consume Message2 as its offset has not been committed
let mut consumer = connection_builder
.connect_consumer(topic_name, "consumer_group_with_offsets")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
.with_group("consumer_group_with_offsets"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
Expand All @@ -615,7 +661,10 @@ pub async fn produce_consume_commit_offsets_partitions1(
{
// A new consumer in another group should consume from the beginning since auto.offset.reset = earliest and enable.auto.commit false
let mut consumer = connection_builder
.connect_consumer(topic_name, "consumer_group_without_offsets")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
.with_group("consumer_group_without_offsets"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
Expand All @@ -631,10 +680,17 @@ pub async fn produce_consume_commit_offsets_partitions1(
async fn produce_consume_partitions3(
connection_builder: &KafkaConnectionBuilder,
topic_name: &str,
fetch_min_bytes: i32,
fetch_wait_max_ms: i32,
) {
let producer = connection_builder.connect_producer("1", 0).await;
let mut consumer = connection_builder
.connect_consumer(topic_name, "some_group")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
.with_group("some_group")
.with_fetch_min_bytes(fetch_min_bytes)
.with_fetch_max_wait_ms(fetch_wait_max_ms),
)
.await;

for _ in 0..5 {
Expand Down Expand Up @@ -697,7 +753,9 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
}

let mut consumer = connection_builder
.connect_consumer(topic_name, "some_group")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned()).with_group("some_group"),
)
.await;

for j in 0..10 {
Expand Down Expand Up @@ -727,7 +785,9 @@ pub async fn test_broker_idle_timeout(connection_builder: &KafkaConnectionBuilde

let mut producer = connection_builder.connect_producer("all", 0).await;
let mut consumer = connection_builder
.connect_consumer("partitions3", "some_group")
.connect_consumer(
ConsumerConfig::consume_from_topic("partitions3".to_owned()).with_group("some_group"),
)
.await;

// write to some open shotover connections
Expand Down Expand Up @@ -772,10 +832,18 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
produce_consume_partitions1(connection_builder, "partitions1").await;
produce_consume_partitions1(connection_builder, "unknown_topic").await;
produce_consume_commit_offsets_partitions1(connection_builder, "partitions1_with_offset").await;
produce_consume_partitions3(connection_builder, "partitions3").await;
produce_consume_multi_topic_batch(connection_builder).await;
produce_consume_multi_partition_batch(connection_builder).await;

// test with minimum limits
produce_consume_partitions3(connection_builder, "partitions3_case1", 0, 1).await;
// test with minimum limits that results in a delay
produce_consume_partitions3(connection_builder, "partitions3_case2", 1, 1).await;
// test with default limits
produce_consume_partitions3(connection_builder, "partitions3_case3", 1, 500).await;
// set the bytes limit to 1MB so that we will not reach it and will hit the 100ms timeout every time.
produce_consume_partitions3(connection_builder, "partitions3_case4", 1_000_000, 100).await;

// Only run this test case on the java driver,
// since even without going through shotover the cpp driver fails this test.
#[allow(irrefutable_let_patterns)]
Expand Down Expand Up @@ -816,7 +884,7 @@ pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
.await;
tokio::time::sleep(Duration::from_secs(10)).await;
produce_consume_partitions1(connection_builder, "partitions1_rf3").await;
produce_consume_partitions3(connection_builder, "partitions3_rf3").await;
produce_consume_partitions3(connection_builder, "partitions3_rf3", 1, 500).await;
}

pub async fn setup_basic_user_acls(connection: &KafkaConnectionBuilder, username: &str) {
Expand Down
12 changes: 8 additions & 4 deletions test-helpers/src/connection/kafka/cpp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};
// Allow direct usage of the APIs when the feature is enabled
pub use rdkafka;

use super::{ExpectedResponse, NewPartition, Record, TopicPartition};
use super::{ConsumerConfig, ExpectedResponse, NewPartition, Record, TopicPartition};
use anyhow::Result;
use pretty_assertions::assert_eq;
use rdkafka::admin::AdminClient;
Expand Down Expand Up @@ -63,17 +63,21 @@ impl KafkaConnectionBuilderCpp {
}
}

pub async fn connect_consumer(&self, topic_name: &str, group: &str) -> KafkaConsumerCpp {
pub async fn connect_consumer(&self, config: ConsumerConfig) -> KafkaConsumerCpp {
let consumer: StreamConsumer = self
.client
.clone()
.set("group.id", group)
.set("group.id", &config.group)
.set("session.timeout.ms", "6000")
.set("auto.offset.reset", "earliest")
.set("enable.auto.commit", "false")
// this has a different name to the java driver 😭
.set("fetch.wait.max.ms", config.fetch_max_wait_ms.to_string())
.set("fetch.min.bytes", config.fetch_min_bytes.to_string())
.create()
.unwrap();
consumer.subscribe(&[topic_name]).unwrap();

consumer.subscribe(&[&config.topic_name]).unwrap();
KafkaConsumerCpp { consumer }
}

Expand Down
24 changes: 17 additions & 7 deletions test-helpers/src/connection/kafka/java.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::{
Acl, AclOperation, AclPermissionType, AlterConfig, ExpectedResponse, NewPartition, NewTopic,
Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription, TopicPartition,
Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ExpectedResponse,
NewPartition, NewTopic, Record, ResourcePatternType, ResourceSpecifier, ResourceType,
TopicDescription, TopicPartition,
};
use crate::connection::java::{Jvm, Value};
use anyhow::Result;
Expand Down Expand Up @@ -102,12 +103,20 @@ impl KafkaConnectionBuilderJava {
KafkaProducerJava { jvm, producer }
}

pub async fn connect_consumer(&self, topic_name: &str, group: &str) -> KafkaConsumerJava {
pub async fn connect_consumer(&self, consumer_config: ConsumerConfig) -> KafkaConsumerJava {
let mut config = self.base_config.clone();
config.insert("group.id".to_owned(), group.to_owned());
config.insert("group.id".to_owned(), consumer_config.group);
config.insert("session.timeout.ms".to_owned(), "6000".to_owned());
config.insert("auto.offset.reset".to_owned(), "earliest".to_owned());
config.insert("enable.auto.commit".to_owned(), "false".to_owned());
config.insert(
"fetch.max.wait.ms".to_owned(),
consumer_config.fetch_max_wait_ms.to_string(),
);
config.insert(
"fetch.min.bytes".to_owned(),
consumer_config.fetch_min_bytes.to_string(),
);
config.insert(
"key.deserializer".to_owned(),
"org.apache.kafka.common.serialization.StringDeserializer".to_owned(),
Expand All @@ -123,9 +132,10 @@ impl KafkaConnectionBuilderJava {
);
consumer.call(
"subscribe",
vec![self
.jvm
.new_list("java.lang.String", vec![self.jvm.new_string(topic_name)])],
vec![self.jvm.new_list(
"java.lang.String",
vec![self.jvm.new_string(&consumer_config.topic_name)],
)],
);

let jvm = self.jvm.clone();
Expand Down
40 changes: 37 additions & 3 deletions test-helpers/src/connection/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ impl KafkaConnectionBuilder {
}
}

pub async fn connect_consumer(&self, topic_name: &str, group: &str) -> KafkaConsumer {
pub async fn connect_consumer(&self, config: ConsumerConfig) -> KafkaConsumer {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(cpp) => KafkaConsumer::Cpp(cpp.connect_consumer(topic_name, group).await),
Self::Java(java) => KafkaConsumer::Java(java.connect_consumer(topic_name, group).await),
Self::Cpp(cpp) => KafkaConsumer::Cpp(cpp.connect_consumer(config).await),
Self::Java(java) => KafkaConsumer::Java(java.connect_consumer(config).await),
}
}

Expand Down Expand Up @@ -398,3 +398,37 @@ pub struct TopicDescription {
// instead they just check if the describe succeeded or failed,
// so this is intentionally left empty for now
}

#[derive(Default)]
pub struct ConsumerConfig {
topic_name: String,
group: String,
fetch_min_bytes: i32,
fetch_max_wait_ms: i32,
}

impl ConsumerConfig {
pub fn consume_from_topic(topic_name: String) -> Self {
Self {
topic_name,
group: "default_group".to_owned(),
fetch_min_bytes: 1,
fetch_max_wait_ms: 500,
}
}

pub fn with_group(mut self, group: &str) -> Self {
self.group = group.to_owned();
self
}

pub fn with_fetch_min_bytes(mut self, fetch_min_bytes: i32) -> Self {
self.fetch_min_bytes = fetch_min_bytes;
self
}

pub fn with_fetch_max_wait_ms(mut self, fetch_max_wait_ms: i32) -> Self {
self.fetch_max_wait_ms = fetch_max_wait_ms;
self
}
}

0 comments on commit 0894590

Please sign in to comment.