From 4853f8499b9fd397c22a2d9b2551aa3e1760c61b Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Wed, 18 Sep 2024 13:19:06 +1000 Subject: [PATCH] KafkaSinkCluster: handle broker closing connection due to idle timeout (#1752) --- shotover-proxy/tests/kafka_int_tests/mod.rs | 33 +++++++++- .../tests/kafka_int_tests/test_cases.rs | 63 +++++++++++++++++-- .../docker-compose-short-idle-timeout.yaml | 56 +++++++++++++++++ shotover/src/connection.rs | 19 ++++-- .../kafka/sink_cluster/connections.rs | 34 +++++++++- .../scram_over_mtls/connection.rs | 4 ++ test-helpers/src/connection/kafka/mod.rs | 11 ++-- 7 files changed, 205 insertions(+), 15 deletions(-) create mode 100644 shotover-proxy/tests/test-configs/kafka/cluster-1-rack/docker-compose-short-idle-timeout.yaml diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index a6bdce8b9..e0376adf7 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -298,6 +298,37 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) { } } +#[rstest] +#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] +#[case::java(KafkaDriver::Java)] +#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear +async fn cluster_1_rack_single_shotover_broker_idle_timeout(#[case] driver: KafkaDriver) { + let _docker_compose = docker_compose( + "tests/test-configs/kafka/cluster-1-rack/docker-compose-short-idle-timeout.yaml", + ); + + let shotover = shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml") + .start() + .await; + + let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192"); + + // We do not run the regular test suite since there is a race condition where the timeout + // could occur between checking if the connection is live and sending a request. 
+ // In regular kafka usage this is acceptable, the client will just retry. + // But for an integration test this would lead to flakey tests which is unacceptable. + // + // So instead we rely on a test case that hits the timeout with plenty of buffer to avoid the race condition. + test_cases::test_broker_idle_timeout(&connection_builder).await; + + tokio::time::timeout( + Duration::from_secs(10), + shotover.shutdown_and_then_consume_events(&[]), + ) + .await + .expect("Shotover did not shutdown within 10s"); +} + #[rstest] #[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] #[case::java(KafkaDriver::Java)] @@ -656,7 +687,7 @@ fn multi_shotover_events(driver: KafkaDriver) -> Vec { .with_level(Level::Warn) .with_target("shotover::transforms::kafka::sink_cluster") .with_message( - r#"no known coordinator for GroupId("some_group"), routing message to a random node so that a NOT_COORDINATOR or similar error is returned to the client"#, + r#"no known coordinator for GroupId("some_group"), routing message to a random broker so that a NOT_COORDINATOR or similar error is returned to the client"#, ) .with_count(Count::Any)] } else { diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 3eecb7628..e00ed37ce 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -1,10 +1,10 @@ use futures::{stream::FuturesUnordered, StreamExt}; -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; use test_helpers::{ connection::kafka::{ Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse, - KafkaConnectionBuilder, KafkaDriver, NewPartition, NewTopic, Record, ResourcePatternType, - ResourceSpecifier, ResourceType, TopicPartition, + KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, NewPartition, NewTopic, + Record, ResourcePatternType, ResourceSpecifier, 
ResourceType, TopicPartition, }, docker_compose::DockerCompose, }; @@ -712,6 +712,61 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) { } } +pub async fn test_broker_idle_timeout(connection_builder: &KafkaConnectionBuilder) { + let admin = connection_builder.connect_admin().await; + admin + .create_topics(&[NewTopic { + name: "partitions3", + num_partitions: 3, + replication_factor: 1, + }]) + .await; + + // cpp driver hits race condition here + tokio::time::sleep(Duration::from_secs(2)).await; + + let mut producer = connection_builder.connect_producer("all", 0).await; + let mut consumer = connection_builder + .connect_consumer("partitions3", "some_group") + .await; + + // write to some open shotover connections + test_produce_consume_10_times(&mut producer, &mut consumer).await; + + // allow the broker idle timeout to expire with plenty of buffer + let broker_idle_timeout = Duration::from_secs(20); + tokio::time::sleep(broker_idle_timeout.mul_f32(1.5)).await; + + // write to some open shotover connections, + // ensuring shotover reopens any connections closed by the broker due to idle timeout. 
+ test_produce_consume_10_times(&mut producer, &mut consumer).await; +} + +async fn test_produce_consume_10_times(producer: &mut KafkaProducer, consumer: &mut KafkaConsumer) { + for _ in 0..10 { + // create an initial record to force kafka to create the topic if it doesn't yet exist + producer + .assert_produce( + Record { + payload: "initial", + topic_name: "partitions3", + key: Some("Key".into()), + }, + None, + ) + .await; + + consumer + .assert_consume(ExpectedResponse { + message: "initial".to_owned(), + key: Some("Key".to_owned()), + topic_name: "partitions3".to_owned(), + offset: None, + }) + .await; + } +} + pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) { admin_setup(connection_builder).await; produce_consume_partitions1(connection_builder, "partitions1").await; @@ -759,7 +814,7 @@ pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) { }, ]) .await; - tokio::time::sleep(std::time::Duration::from_secs(10)).await; + tokio::time::sleep(Duration::from_secs(10)).await; produce_consume_partitions1(connection_builder, "partitions1_rf3").await; produce_consume_partitions3(connection_builder, "partitions3_rf3").await; } diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/docker-compose-short-idle-timeout.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/docker-compose-short-idle-timeout.yaml new file mode 100644 index 000000000..7701306be --- /dev/null +++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/docker-compose-short-idle-timeout.yaml @@ -0,0 +1,56 @@ +networks: + cluster_subnet: + name: cluster_subnet + driver: bridge + ipam: + driver: default + config: + - subnet: 172.16.1.0/24 + gateway: 172.16.1.1 +services: + kafka0: + image: &image 'bitnami/kafka:3.6.1-debian-11-r24' + networks: + cluster_subnet: + ipv4_address: 172.16.1.2 + environment: &environment + KAFKA_CFG_LISTENERS: "BROKER://:9092,CONTROLLER://:9093" + KAFKA_CFG_ADVERTISED_LISTENERS: 
"BROKER://172.16.1.2:9092" + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT" + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER" + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER" + KAFKA_CFG_PROCESS_ROLES: "controller,broker" + KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv" + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093" + KAFKA_CFG_NODE_ID: 0 + ALLOW_PLAINTEXT_LISTENER: "yes" + # Required for high availability + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3 + KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 2 + + # connections.max.idle.ms is set to 20s for testing shotover's handling of idle connection timeouts + KAFKA_CFG_CONNECTIONS_MAX_IDLE_MS: 20000 + volumes: &volumes + - type: tmpfs + target: /bitnami/kafka + kafka1: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.3 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.3:9092" + KAFKA_CFG_NODE_ID: 1 + volumes: *volumes + kafka2: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.4 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.4:9092" + KAFKA_CFG_NODE_ID: 2 + volumes: *volumes diff --git a/shotover/src/connection.rs b/shotover/src/connection.rs index 71d512739..df4195748 100644 --- a/shotover/src/connection.rs +++ b/shotover/src/connection.rs @@ -91,11 +91,20 @@ impl SinkConnection { /// This method must only be called when the read or write tasks have closed their `in_` or `out_` channel. /// In this case it is gauranteed that the `connection_closed_` channel will /// have an error sent to it before the closing of `in_` or `out_`. 
- fn set_get_error(&mut self) -> ConnectionError { + fn get_error_for_close(&mut self) -> ConnectionError { self.error = Some(self.connection_closed_rx.try_recv().unwrap()); self.error.clone().unwrap() } + /// This method can be called at any time. + /// If the connection has hit an error that error will be returned. + pub fn get_error(&mut self) -> Option<ConnectionError> { + if self.error.is_none() { + self.error = self.connection_closed_rx.try_recv().ok(); + } + self.error.clone() + } + /// Send messages. /// If there is a problem with the connection an error is returned. pub fn send(&mut self, mut messages: Vec) -> Result<(), ConnectionError> { @@ -104,7 +113,9 @@ impl SinkConnection { if let Some(error) = &self.error { Err(error.clone()) } else { - self.out_tx.send(messages).map_err(|_| self.set_get_error()) + self.out_tx + .send(messages) + .map_err(|_| self.get_error_for_close()) } } @@ -137,7 +148,7 @@ impl SinkConnection { } } None => { - return Err(self.set_get_error()); + return Err(self.get_error_for_close()); } } } @@ -188,7 +199,7 @@ impl SinkConnection { initial_count: usize, ) -> Result<(), ConnectionError> { // call this first to ensure the next send call will have an error - let err = self.set_get_error(); + let err = self.get_error_for_close(); if responses.len() == initial_count { // We failed to get any messages return the error. 
diff --git a/shotover/src/transforms/kafka/sink_cluster/connections.rs b/shotover/src/transforms/kafka/sink_cluster/connections.rs index 679519afa..79554871c 100644 --- a/shotover/src/transforms/kafka/sink_cluster/connections.rs +++ b/shotover/src/transforms/kafka/sink_cluster/connections.rs @@ -2,7 +2,7 @@ use crate::{ connection::{ConnectionError, SinkConnection}, message::Message, }; -use anyhow::{Context, Result}; +use anyhow::{anyhow, Context, Result}; use fnv::FnvBuildHasher; use kafka_protocol::{messages::BrokerId, protocol::StrBytes}; use metrics::Counter; @@ -78,7 +78,30 @@ impl Connections { match self.get_connection_state(recent_instant, destination) { ConnectionState::Open => { - // connection already open + let connection = self.connections.get_mut(&destination).unwrap(); + // connection already exists so we can just use it. + // however if it has an error we need to recreate it. + if let Some(error) = connection.get_error() { + if connection.pending_requests_count() > 0 { + return Err(anyhow!(error).context("get_or_open_connection: Outgoing connection had pending requests, those requests/responses are lost so connection recovery cannot be attempted.")); + } + self.create_and_insert_connection( + rng, + connection_factory, + authorize_scram_over_mtls, + sasl_mechanism, + nodes, + node, + contact_points, + None, + destination, + ) + .await + .with_context(|| { + format!("Failed to recreate connection after encountering error {error:?}") + })?; + tracing::info!("Recreated connection after it hit error {error:?}") + } } ConnectionState::Unopened => { self.create_and_insert_connection( @@ -310,6 +333,13 @@ impl KafkaConnection { } } + pub fn get_error(&mut self) -> Option<ConnectionError> { + match self { + KafkaConnection::Regular(c) => c.get_error(), + KafkaConnection::ScramOverMtls(c) => c.get_error(), + } + } + /// Number of requests waiting on a response. /// The count includes requests that will have a dummy response generated by shotover. 
pub fn pending_requests_count(&self) -> usize { diff --git a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/connection.rs b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/connection.rs index cf0fdd11d..6762fea36 100644 --- a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/connection.rs +++ b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/connection.rs @@ -86,6 +86,10 @@ impl ScramOverMtlsConnection { } } + pub fn get_error(&mut self) -> Option<ConnectionError> { + self.connection.get_error() + } + pub fn pending_requests_count(&self) -> usize { self.connection.pending_requests_count() + self diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index 42cbfc9a0..eb561c126 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -162,10 +162,13 @@ impl KafkaConsumer { expected_response.key, response.key, "Unexpected key for topic {topic}" ); - assert_eq!( - expected_response.offset, response.offset, - "Unexpected offset for topic {topic}" - ); + + if expected_response.offset.is_some() { + assert_eq!( + expected_response.offset, response.offset, + "Unexpected offset for topic {topic}" + ); + } } pub async fn assert_consume_in_any_order(&mut self, expected_responses: Vec) {