diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index 3059529c7..7675093b2 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -596,6 +596,43 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) { } } +#[rstest] +#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] +#[case::java(KafkaDriver::Java)] +#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear +async fn cluster_2_racks_multi_shotover_kafka_3_9(#[case] driver: KafkaDriver) { + let _docker_compose = + docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml"); + + // One shotover instance per rack + let mut shotovers = vec![]; + for i in 1..3 { + shotovers.push( + shotover_process(&format!( + "tests/test-configs/kafka/cluster-2-racks/topology-rack{i}.yaml" + )) + .with_config(&format!( + "tests/test-configs/shotover-config/config{i}.yaml" + )) + .with_log_name(&format!("shotover{i}")) + .start() + .await, + ); + } + + let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192"); + test_cases::cluster_test_suite(&connection_builder).await; + + for shotover in shotovers { + tokio::time::timeout( + Duration::from_secs(10), + shotover.shutdown_and_then_consume_events(&multi_shotover_events()), + ) + .await + .expect("Shotover did not shutdown within 10s"); + } +} + #[rstest] //#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] // CPP driver does not support scram #[case::java(KafkaDriver::Java)] diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml new file mode 100644 index 000000000..430d8e1c0 --- /dev/null +++ 
b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml @@ -0,0 +1,96 @@ +networks: + cluster_subnet: + name: cluster_subnet + driver: bridge + ipam: + driver: default + config: + - subnet: 172.16.1.0/24 + gateway: 172.16.1.1 +services: + kafka0: + image: &image 'bitnami/kafka:3.9.0-debian-12-r3' + networks: + cluster_subnet: + ipv4_address: 172.16.1.2 + environment: &environment + KAFKA_CFG_LISTENERS: "BROKER://:9092,CONTROLLER://:9093" + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092" + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT" + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER" + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER" + KAFKA_CFG_PROCESS_ROLES: "controller,broker" + KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv" + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093,3@kafka3:9093,4@kafka4:9093,5@kafka5:9093" + KAFKA_CFG_NODE_ID: 0 + KAFKA_CFG_BROKER_RACK: "rack1" + ALLOW_PLAINTEXT_LISTENER: "yes" + # Required for high availability + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 + KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 2 + + # This cfg is set to 3000 by default, which for a typical workload reduces the overhead of creating a + # new consumer group by avoiding constant rebalances as each initial consumer joins. + # See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-134%3A+Delay+initial+consumer+group+rebalance + # + # However for an integration test workload we are constantly spinning up single consumer groups, so the default value makes the tests take twice as long to run. 
+ KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: "0" + volumes: &volumes + - type: tmpfs + target: /bitnami/kafka + kafka1: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.3 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.3:9092" + KAFKA_CFG_NODE_ID: 1 + KAFKA_CFG_BROKER_RACK: "rack1" + volumes: *volumes + kafka2: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.4 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.4:9092" + KAFKA_CFG_NODE_ID: 2 + KAFKA_CFG_BROKER_RACK: "rack1" + volumes: *volumes + kafka3: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.5 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.5:9092" + KAFKA_CFG_NODE_ID: 3 + KAFKA_CFG_BROKER_RACK: "rack2" + volumes: *volumes + kafka4: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.6 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.6:9092" + KAFKA_CFG_NODE_ID: 4 + KAFKA_CFG_BROKER_RACK: "rack2" + volumes: *volumes + kafka5: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.7 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.7:9092" + KAFKA_CFG_NODE_ID: 5 + KAFKA_CFG_BROKER_RACK: "rack2" + volumes: *volumes diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index c684b1e39..5acacd78c 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -3077,6 +3077,33 @@ The connection to the client has been closed." self.rewrite_describe_cluster_response(describe_cluster)?; response.invalidate_cache(); } + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::ApiVersions(api_versions), + .. 
+ })) => { + let original_size = api_versions.api_keys.len(); + + // List of keys that shotover doesn't support and so should be removed from the supported keys list + let disable_keys = [ + // This message type has very little documentation available and kafka responds to it with an error code 35 UNSUPPORTED_VERSION. + // So it's not clear at all how to implement this and it's not even possible to test it. + // Instead let's just ask the client to not send it at all. + // We can consider supporting it when kafka itself starts to support it but we will need to be very + // careful to correctly implement the pagination/cursor logic. + ApiKey::DescribeTopicPartitionsKey as i16, + // This message type is part of the new consumer group API; we should implement support for it in the future. + // I've disabled it for now to keep the scope down for kafka 3.9 support. + ApiKey::ConsumerGroupDescribeKey as i16, + ]; + api_versions + .api_keys + .retain(|x| !disable_keys.contains(&x.api_key)); + + if original_size != api_versions.api_keys.len() { + // only invalidate the cache if we actually removed anything + response.invalidate_cache(); + } + } _ => {} } diff --git a/test-helpers/src/docker_compose.rs b/test-helpers/src/docker_compose.rs index e8c2e9378..adb99308b 100644 --- a/test-helpers/src/docker_compose.rs +++ b/test-helpers/src/docker_compose.rs @@ -20,7 +20,7 @@ pub fn new_moto() -> DockerCompose { docker_compose("tests/transforms/docker-compose-moto.yaml") } -pub static IMAGE_WAITERS: [Image; 11] = [ +pub static IMAGE_WAITERS: [Image; 12] = [ Image { name: "motoserver/moto", log_regex_to_wait_for: r"Press CTRL\+C to quit", @@ -75,6 +75,11 @@ pub static IMAGE_WAITERS: [Image; 11] = [ log_regex_to_wait_for: r"Kafka Server started", timeout: Duration::from_secs(120), }, + Image { + name: "bitnami/kafka:3.9.0-debian-12-r3", + log_regex_to_wait_for: r"Kafka Server started", + timeout: Duration::from_secs(120), + }, Image { name: "opensearchproject/opensearch:2.9.0", 
log_regex_to_wait_for: r"Node started",