diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index a6bdce8b9..ae17542f8 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -298,6 +298,37 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) { } } +#[rstest] +#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] +#[case::java(KafkaDriver::Java)] +#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear +async fn cluster_1_rack_single_shotover_broker_idle_timeout(#[case] driver: KafkaDriver) { + let _docker_compose = docker_compose( + "tests/test-configs/kafka/cluster-1-rack/docker-compose-short-idle-timeout.yaml", + ); + + let shotover = shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml") + .start() + .await; + + let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192"); + + // We do not run the regular test suite since there is a race condition where the timeout + // could occur between checking if the connection is live and sending a request. + // In regular kafka usage this is acceptable, the client will just retry. + // But for an integration test this would lead to flaky tests which is unacceptable. + // + // So instead we rely on a test case that hits the timeout with plenty of buffer to avoid the race condition. 
+ test_cases::test_broker_idle_timeout(&connection_builder).await; + + tokio::time::timeout( + Duration::from_secs(10), + shotover.shutdown_and_then_consume_events(&[]), + ) + .await + .expect("Shotover did not shutdown within 10s"); +} + #[rstest] #[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] #[case::java(KafkaDriver::Java)] diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 3eecb7628..e00ed37ce 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -1,10 +1,10 @@ use futures::{stream::FuturesUnordered, StreamExt}; -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; use test_helpers::{ connection::kafka::{ Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse, - KafkaConnectionBuilder, KafkaDriver, NewPartition, NewTopic, Record, ResourcePatternType, - ResourceSpecifier, ResourceType, TopicPartition, + KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, NewPartition, NewTopic, + Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition, }, docker_compose::DockerCompose, }; @@ -712,6 +712,61 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) { } } +pub async fn test_broker_idle_timeout(connection_builder: &KafkaConnectionBuilder) { + let admin = connection_builder.connect_admin().await; + admin + .create_topics(&[NewTopic { + name: "partitions3", + num_partitions: 3, + replication_factor: 1, + }]) + .await; + + // cpp driver hits race condition here + tokio::time::sleep(Duration::from_secs(2)).await; + + let mut producer = connection_builder.connect_producer("all", 0).await; + let mut consumer = connection_builder + .connect_consumer("partitions3", "some_group") + .await; + + // write to some open shotover connections + test_produce_consume_10_times(&mut producer, &mut 
consumer).await; + + // allow the broker idle timeout to expire with plenty of buffer + let broker_idle_timeout = Duration::from_secs(20); + tokio::time::sleep(broker_idle_timeout.mul_f32(1.5)).await; + + // write to some open shotover connections, + // ensuring shotover reopens any connections closed by the broker due to idle timeout. + test_produce_consume_10_times(&mut producer, &mut consumer).await; +} + +async fn test_produce_consume_10_times(producer: &mut KafkaProducer, consumer: &mut KafkaConsumer) { + for _ in 0..10 { + // create an initial record to force kafka to create the topic if it doesn't yet exist + producer + .assert_produce( + Record { + payload: "initial", + topic_name: "partitions3", + key: Some("Key".into()), + }, + None, + ) + .await; + + consumer + .assert_consume(ExpectedResponse { + message: "initial".to_owned(), + key: Some("Key".to_owned()), + topic_name: "partitions3".to_owned(), + offset: None, + }) + .await; + } +} + pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) { admin_setup(connection_builder).await; produce_consume_partitions1(connection_builder, "partitions1").await; @@ -759,7 +814,7 @@ pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) { }, ]) .await; - tokio::time::sleep(std::time::Duration::from_secs(10)).await; + tokio::time::sleep(Duration::from_secs(10)).await; produce_consume_partitions1(connection_builder, "partitions1_rf3").await; produce_consume_partitions3(connection_builder, "partitions3_rf3").await; } diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/docker-compose-short-idle-timeout.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/docker-compose-short-idle-timeout.yaml new file mode 100644 index 000000000..7701306be --- /dev/null +++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/docker-compose-short-idle-timeout.yaml @@ -0,0 +1,56 @@ +networks: + cluster_subnet: + name: cluster_subnet + driver: bridge + ipam: + driver: 
default + config: + - subnet: 172.16.1.0/24 + gateway: 172.16.1.1 +services: + kafka0: + image: &image 'bitnami/kafka:3.6.1-debian-11-r24' + networks: + cluster_subnet: + ipv4_address: 172.16.1.2 + environment: &environment + KAFKA_CFG_LISTENERS: "BROKER://:9092,CONTROLLER://:9093" + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092" + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT" + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER" + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER" + KAFKA_CFG_PROCESS_ROLES: "controller,broker" + KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv" + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093,3@kafka3:9093" + KAFKA_CFG_NODE_ID: 0 + ALLOW_PLAINTEXT_LISTENER: "yes" + # Required for high availability + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 + KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 2 + + # connections.max.idle.ms is set to 20s for testing shotover's handling of idle connection timeouts + KAFKA_CFG_CONNECTIONS_MAX_IDLE_MS: 20000 + volumes: &volumes + - type: tmpfs + target: /bitnami/kafka + kafka1: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.3 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.3:9092" + KAFKA_CFG_NODE_ID: 1 + volumes: *volumes + kafka2: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.4 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.4:9092" + KAFKA_CFG_NODE_ID: 2 + volumes: *volumes diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index 42cbfc9a0..eb561c126 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -162,10 +162,13 @@ impl KafkaConsumer { expected_response.key, response.key, "Unexpected key for topic {topic}" ); - assert_eq!( - expected_response.offset, 
response.offset, - "Unexpected offset for topic {topic}" - ); + + if expected_response.offset.is_some() { + assert_eq!( + expected_response.offset, response.offset, + "Unexpected offset for topic {topic}" + ); + } } pub async fn assert_consume_in_any_order(&mut self, expected_responses: Vec<ExpectedResponse>) {