Commit

Add integration test
rukai committed Sep 17, 2024
1 parent 20faa0f commit 69d3073
Showing 4 changed files with 148 additions and 8 deletions.
31 changes: 31 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -298,6 +298,37 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
    }
}

#[rstest]
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since the java driver will block when consuming, causing shotover logs to not appear
async fn cluster_1_rack_single_shotover_broker_idle_timeout(#[case] driver: KafkaDriver) {
    let _docker_compose = docker_compose(
        "tests/test-configs/kafka/cluster-1-rack/docker-compose-short-idle-timeout.yaml",
    );

    let shotover = shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml")
        .start()
        .await;

    let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");

    // We do not run the regular test suite since there is a race condition where the timeout
    // could occur between checking if the connection is live and sending a request.
    // In regular kafka usage this is acceptable: the client will just retry.
    // But for an integration test this would lead to flaky tests, which is unacceptable.
    //
    // So instead we rely on a test case that hits the timeout with plenty of buffer to avoid the race condition.
    test_cases::test_broker_idle_timeout(&connection_builder).await;

    tokio::time::timeout(
        Duration::from_secs(10),
        shotover.shutdown_and_then_consume_events(&[]),
    )
    .await
    .expect("Shotover did not shutdown within 10s");
}

#[rstest]
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
59 changes: 55 additions & 4 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1,10 +1,10 @@
use futures::{stream::FuturesUnordered, StreamExt};
-use std::collections::HashMap;
+use std::{collections::HashMap, time::Duration};
use test_helpers::{
    connection::kafka::{
        Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse,
-        KafkaConnectionBuilder, KafkaDriver, NewPartition, NewTopic, Record, ResourcePatternType,
-        ResourceSpecifier, ResourceType, TopicPartition,
+        KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, NewPartition, NewTopic,
+        Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition,
    },
    docker_compose::DockerCompose,
};
@@ -712,6 +712,57 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
    }
}

pub async fn test_broker_idle_timeout(connection_builder: &KafkaConnectionBuilder) {
    let admin = connection_builder.connect_admin().await;
    admin
        .create_topics(&[NewTopic {
            name: "partitions3",
            num_partitions: 3,
            replication_factor: 1,
        }])
        .await;
    let mut producer = connection_builder.connect_producer("all", 0).await;
    let mut consumer = connection_builder
        .connect_consumer("partitions3", "some_group")
        .await;

    // write to some open shotover connections
    test_produce_consume_10_times(&mut producer, &mut consumer).await;

    // allow the broker idle timeout to expire with plenty of buffer
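    // (docker-compose-short-idle-timeout.yaml sets KAFKA_CFG_CONNECTIONS_MAX_IDLE_MS: 30000, i.e. 30s, on the brokers)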
    let broker_idle_timeout = Duration::from_secs(30);
    tokio::time::sleep(broker_idle_timeout * 2).await;

    // write to some open shotover connections,
    // ensuring shotover reopens any connections closed by the broker due to idle timeout.
    test_produce_consume_10_times(&mut producer, &mut consumer).await;
}

async fn test_produce_consume_10_times(producer: &mut KafkaProducer, consumer: &mut KafkaConsumer) {
    for _ in 0..10 {
        // create an initial record to force kafka to create the topic if it doesn't yet exist
        producer
            .assert_produce(
                Record {
                    payload: "initial",
                    topic_name: "partitions3",
                    key: Some("Key".into()),
                },
                None,
            )
            .await;

        consumer
            .assert_consume(ExpectedResponse {
                message: "initial".to_owned(),
                key: Some("Key".to_owned()),
                topic_name: "partitions3".to_owned(),
                offset: None,
            })
            .await;
    }
}

pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
    admin_setup(connection_builder).await;
    produce_consume_partitions1(connection_builder, "partitions1").await;
@@ -759,7 +810,7 @@ pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
        },
    ])
    .await;
-    tokio::time::sleep(std::time::Duration::from_secs(10)).await;
+    tokio::time::sleep(Duration::from_secs(10)).await;
    produce_consume_partitions1(connection_builder, "partitions1_rf3").await;
    produce_consume_partitions3(connection_builder, "partitions3_rf3").await;
}
55 changes: 55 additions & 0 deletions shotover-proxy/tests/test-configs/kafka/cluster-1-rack/docker-compose-short-idle-timeout.yaml
@@ -0,0 +1,55 @@
networks:
  cluster_subnet:
    name: cluster_subnet
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: 172.16.1.0/24
          gateway: 172.16.1.1
services:
  kafka0:
    image: &image 'bitnami/kafka:3.6.1-debian-11-r24'
    networks:
      cluster_subnet:
        ipv4_address: 172.16.1.2
    environment: &environment
      KAFKA_CFG_LISTENERS: "BROKER://:9092,CONTROLLER://:9093"
      KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT"
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CFG_PROCESS_ROLES: "controller,broker"
      KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv"
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
      KAFKA_CFG_NODE_ID: 0
      ALLOW_PLAINTEXT_LISTENER: "yes"
      # Required for high availability
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 2

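      # a deliberately short idle timeout: the broker closes connections idle for more than 30s
      # (test_broker_idle_timeout sleeps for twice this duration)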
      KAFKA_CFG_CONNECTIONS_MAX_IDLE_MS: 30000
    volumes: &volumes
      - type: tmpfs
        target: /bitnami/kafka
  kafka1:
    image: *image
    networks:
      cluster_subnet:
        ipv4_address: 172.16.1.3
    environment:
      <<: *environment
      KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.3:9092"
      KAFKA_CFG_NODE_ID: 1
    volumes: *volumes
  kafka2:
    image: *image
    networks:
      cluster_subnet:
        ipv4_address: 172.16.1.4
    environment:
      <<: *environment
      KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.4:9092"
      KAFKA_CFG_NODE_ID: 2
    volumes: *volumes
11 changes: 7 additions & 4 deletions test-helpers/src/connection/kafka/mod.rs
@@ -162,10 +162,13 @@ impl KafkaConsumer {
            expected_response.key, response.key,
            "Unexpected key for topic {topic}"
        );
-        assert_eq!(
-            expected_response.offset, response.offset,
-            "Unexpected offset for topic {topic}"
-        );
+
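+        // only assert the offset when the test specifies an expected value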
+        if expected_response.offset.is_some() {
+            assert_eq!(
+                expected_response.offset, response.offset,
+                "Unexpected offset for topic {topic}"
+            );
+        }
    }

    pub async fn assert_consume_in_any_order(&mut self, expected_responses: Vec<ExpectedResponse>) {
