From 2855606896936869cac5dcb52c55b556ca6b6c8d Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Mon, 26 Aug 2024 15:12:50 +1000
Subject: [PATCH] KafkaSinkCluster route by NodeState

---
 shotover-proxy/tests/kafka_int_tests/mod.rs   |  84 ++++++++++--
 .../tests/kafka_int_tests/test_cases.rs       | 129 +++++++++++++++++-
 .../kafka/sink_cluster/connections.rs         |  24 ++--
 .../src/transforms/kafka/sink_cluster/mod.rs  |  79 +++++++----
 4 files changed, 263 insertions(+), 53 deletions(-)

diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs
index d9aa97687..2c7df06c7 100644
--- a/shotover-proxy/tests/kafka_int_tests/mod.rs
+++ b/shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -247,21 +247,52 @@ async fn single_sasl_scram_plaintext_source_tls_sink(#[case] driver: KafkaDriver
 #[case::java(KafkaDriver::Java)]
 #[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
-    let _docker_compose =
+    let docker_compose =
         docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
-    let shotover = shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml")
-        .start()
+
+    {
+        let shotover =
+            shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml")
+                .start()
+                .await;
+
+        let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
+        test_cases::standard_test_suite(&connection_builder).await;
+        tokio::time::timeout(
+            Duration::from_secs(10),
+            shotover.shutdown_and_then_consume_events(&[]),
+        )
+        .await
+        .expect("Shotover did not shutdown within 10s");
+    }
+
+    {
+        let shotover =
+            shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml")
+                .start()
+                .await;
+        let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
+
+        test_cases::produce_consume_partitions1_kafka_node_goes_down(
+            &docker_compose,
+            &connection_builder,
+            "kafka_node_goes_down_test",
+        )
         .await;
 
-    let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::cluster_test_suite(&connection_builder).await;
+        let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
+        test_cases::cluster_test_suite(&connection_builder).await;
 
-    tokio::time::timeout(
-        Duration::from_secs(10),
-        shotover.shutdown_and_then_consume_events(&[]),
-    )
-    .await
-    .expect("Shotover did not shutdown within 10s");
+        // Shotover can reasonably hit many kinds of errors due to a kafka node down so ignore all of them.
+        tokio::time::timeout(
+            Duration::from_secs(10),
+            shotover.shutdown_and_then_consume_events(&[EventMatcher::new()
+                .with_level(Level::Error)
+                .with_count(Count::Any)]),
+        )
+        .await
+        .expect("Shotover did not shutdown within 10s");
+    }
 }
 
 #[rstest]
@@ -376,7 +407,7 @@ async fn cluster_sasl_scram_single_shotover(#[case] driver: KafkaDriver) {
 async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDriver) {
     test_helpers::cert::generate_kafka_test_certs();
 
-    let _docker_compose =
+    let docker_compose =
         docker_compose("tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml");
 
     // test concurrent connections with different access levels to ensure that:
@@ -443,6 +474,35 @@ async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDrive
         .await
         .expect("Shotover did not shutdown within 10s");
     }
+
+    // Test handling of down kafka nodes.
+    {
+        let shotover = shotover_process(
+            "tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml",
+        )
+        .start()
+        .await;
+
+        let connection = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
+            .use_sasl_scram("super_user", "super_password");
+
+        test_cases::produce_consume_partitions1_kafka_node_goes_down(
+            &docker_compose,
+            &connection,
+            "kafka_node_goes_down_test",
+        )
+        .await;
+
+        // Shotover can reasonably hit many kinds of errors due to a kafka node down so ignore all of them.
+        tokio::time::timeout(
+            Duration::from_secs(10),
+            shotover.shutdown_and_then_consume_events(&[EventMatcher::new()
+                .with_level(Level::Error)
+                .with_count(Count::Any)]),
+        )
+        .await
+        .expect("Shotover did not shutdown within 10s");
+    }
 }
 
 async fn assert_connection_fails_with_incorrect_password(driver: KafkaDriver, username: &str) {
diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index 9c737ee1f..8db91c330 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1,8 +1,11 @@
 use std::collections::HashMap;
-use test_helpers::connection::kafka::{
-    Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse,
-    KafkaConnectionBuilder, NewPartition, NewTopic, Record, ResourcePatternType, ResourceSpecifier,
-    ResourceType, TopicPartition,
+use test_helpers::{
+    connection::kafka::{
+        Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse,
+        KafkaConnectionBuilder, NewPartition, NewTopic, Record, ResourcePatternType,
+        ResourceSpecifier, ResourceType, TopicPartition,
+    },
+    docker_compose::DockerCompose,
 };
 
 async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
@@ -169,6 +172,124 @@ pub async fn produce_consume_partitions1(
     }
 }
 
+pub async fn produce_consume_partitions1_kafka_node_goes_down(
+    docker_compose: &DockerCompose,
+    connection_builder: &KafkaConnectionBuilder,
+    topic_name: &str,
+) {
+    let admin = connection_builder.connect_admin().await;
+    admin
+        .create_topics(&[NewTopic {
+            name: topic_name,
+            num_partitions: 1,
+            replication_factor: 3,
+        }])
+        .await;
+
+    {
+        let producer = connection_builder.connect_producer("all").await;
+        // create an initial record to force kafka to create the topic if it doesn't yet exist
+        producer
+            .assert_produce(
+                Record {
+                    payload: "initial",
+                    topic_name,
+                    key: Some("Key"),
+                },
+                Some(0),
+            )
+            .await;
+
+        let mut consumer = connection_builder
+            .connect_consumer(topic_name, "some_group")
+            .await;
+        consumer
+            .assert_consume(ExpectedResponse {
+                message: "initial".to_owned(),
+                key: Some("Key".to_owned()),
+                topic_name: topic_name.to_owned(),
+                offset: Some(0),
+            })
+            .await;
+
+        docker_compose.kill_service("kafka1");
+
+        // create and consume records
+        for i in 0..5 {
+            producer
+                .assert_produce(
+                    Record {
+                        payload: "Message1",
+                        topic_name,
+                        key: Some("Key"),
+                    },
+                    Some(i * 2 + 1),
+                )
+                .await;
+            producer
+                .assert_produce(
+                    Record {
+                        payload: "Message2",
+                        topic_name,
+                        key: None,
+                    },
+                    Some(i * 2 + 2),
+                )
+                .await;
+
+            consumer
+                .assert_consume(ExpectedResponse {
+                    message: "Message1".to_owned(),
+                    key: Some("Key".to_owned()),
+                    topic_name: topic_name.to_owned(),
+                    offset: Some(i * 2 + 1),
+                })
+                .await;
+
+            consumer
+                .assert_consume(ExpectedResponse {
+                    message: "Message2".to_owned(),
+                    key: None,
+                    topic_name: topic_name.to_owned(),
+                    offset: Some(i * 2 + 2),
+                })
+                .await;
+        }
+    }
+
+    // if we create a new consumer it will start from the beginning since auto.offset.reset = earliest and enable.auto.commit false
+    // so we test that we can access all records ever created on this topic
+    let mut consumer = connection_builder
+        .connect_consumer(topic_name, "some_group")
+        .await;
+    consumer
+        .assert_consume(ExpectedResponse {
+            message: "initial".to_owned(),
+            key: Some("Key".to_owned()),
+            topic_name: topic_name.to_owned(),
+            offset: Some(0),
+        })
+        .await;
+    for i in 0..5 {
+        consumer
+            .assert_consume(ExpectedResponse {
+                message: "Message1".to_owned(),
+                key: Some("Key".to_owned()),
+                topic_name: topic_name.to_owned(),
+                offset: Some(i * 2 + 1),
+            })
+            .await;
+        consumer
+            .assert_consume(ExpectedResponse {
+                message: "Message2".to_owned(),
+                key: None,
+                topic_name: topic_name.to_owned(),
+                offset: Some(i * 2 + 2),
+            })
+            .await;
+    }
+}
+
 pub async fn produce_consume_commit_offsets_partitions1(
     connection_builder: &KafkaConnectionBuilder,
     topic_name: &str,
diff --git a/shotover/src/transforms/kafka/sink_cluster/connections.rs b/shotover/src/transforms/kafka/sink_cluster/connections.rs
index e9577cd12..e27fe9666 100644
--- a/shotover/src/transforms/kafka/sink_cluster/connections.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/connections.rs
@@ -207,8 +207,7 @@ impl Connections {
 
         let connection = connection_factory
             .create_connection(address, authorize_scram_over_mtls, sasl_mechanism)
-            .await
-            .context("Failed to create a new connection");
+            .await;
 
         // Update the node state according to whether we can currently open a connection.
         let node_state = if connection.is_err() {
@@ -227,23 +226,24 @@ impl Connections {
             .unwrap()
             .state
             .store(node_state, Ordering::Relaxed);
 
-        let connection = connection?;
-
-        // Recreating the node succeeded.
-        // So store it as the new connection, as long as we werent waiting on any responses in the old connection
-        let connection =
-            KafkaConnection::new(authorize_scram_over_mtls, sasl_mechanism, connection, None)?;
 
         if old_connection
             .map(|old| old.pending_requests_count())
             .unwrap_or(0)
             > 0
         {
-            Err(error.context("Succesfully reopened outgoing connection but previous outgoing connection had pending requests."))
-        } else {
-            self.connections.insert(destination, connection);
-            Ok(())
+            return Err(error.context("Outgoing connection had pending requests, those requests/responses are lost so connection recovery cannot be attempted."));
         }
+
+        let connection = connection.context("Failed to create a new connection")?;
+
+        // Recreating the node succeeded.
+        // So store it as the new connection, as long as we weren't waiting on any responses in the old connection
+        let connection =
+            KafkaConnection::new(authorize_scram_over_mtls, sasl_mechanism, connection, None)?;
+
+        self.connections.insert(destination, connection);
+        Ok(())
     }
 }
diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index c0b40c54c..467b6a488 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -457,7 +457,8 @@ impl KafkaSinkCluster {
                 Instant::now(),
                 Destination::ControlConnection,
             )
-            .await?;
+            .await
+            .context("Failed to get control connection")?;
         connection.send(vec![request])?;
         Ok(connection.recv().await?.remove(0))
     }
@@ -474,10 +475,18 @@ impl KafkaSinkCluster {
             topics.push(topic);
         }
     }
 
     fn store_group(&self, groups: &mut Vec<GroupId>, group_id: GroupId) {
-        if self.group_to_coordinator_broker.get(&group_id).is_none() && !groups.contains(&group_id)
-        {
+        let up_node_exists = match self.group_to_coordinator_broker.get(&group_id) {
+            Some(broker_id) => self
+                .nodes
+                .iter()
+                .find(|node| node.broker_id == *broker_id)
+                .map(|node| node.is_up())
+                .unwrap_or(false),
+            None => false,
+        };
+
+        if !up_node_exists && !groups.contains(&group_id) {
             groups.push(group_id);
         }
     }
@@ -1055,7 +1064,7 @@ routing message to a random node so that:
         }
 
         let recent_instant = Instant::now();
-        for (destination, requests) in broker_to_routed_requests {
+        for (destination, mut requests) in broker_to_routed_requests {
             if let Err(err) = self
                 .connections
                 .get_or_open_connection(
@@ -1069,7 +1078,20 @@ routing message to a random node so that:
                     recent_instant,
                     destination,
                 )
-                .await?
+                .await
+                .with_context(|| {
+                    let request_types: Vec<String> = requests
+                        .requests
+                        .iter_mut()
+                        .map(|x| match x.frame() {
+                            Some(Frame::Kafka(KafkaFrame::Request { header, .. })) => {
+                                format!("{:?}", ApiKey::try_from(header.request_api_key).unwrap())
+                            }
+                            _ => "Unknown".to_owned(),
+                        })
+                        .collect();
+                    format!("Failed to get connection to send requests {request_types:?}")
+                })?
                 .send(requests.requests)
             {
                 // Dont retry the send on the new connection since we cant tell if the broker received the request or not.
@@ -1760,25 +1782,32 @@ routing message to a random node so that:
         for (_, topic) in &mut metadata.topics {
             for partition in &mut topic.partitions {
                 // Deterministically choose a single shotover node in the rack as leader based on topic + partition id
-                let leader_rack = self
-                    .nodes
-                    .iter()
-                    .find(|x| x.broker_id == *partition.leader_id)
-                    .map(|x| x.rack.clone())
-                    .unwrap();
-                let shotover_nodes_in_rack: Vec<_> = self
-                    .shotover_nodes
-                    .iter()
-                    .filter(|shotover_node| {
-                        leader_rack
-                            .as_ref()
-                            .map(|rack| rack == &shotover_node.rack)
-                            .unwrap_or(true)
-                    })
-                    .collect();
-                let hash = hash_partition(topic.topic_id, partition.partition_index);
-                let shotover_node = &shotover_nodes_in_rack[hash % shotover_nodes_in_rack.len()];
-                partition.leader_id = shotover_node.broker_id;
+                if partition.leader_id == -1 {
+                    // -1 indicates that the leader is offline, leave it as is so the client can tell the leader is offline.
+                } else {
+                    let leader_rack = self
+                        .nodes
+                        .iter()
+                        .find(|x| x.broker_id == *partition.leader_id)
+                        .map(|x| x.rack.clone())
+                        .unwrap_or_else(|| {
+                            panic!("Unable to find leader_id {:?}", partition.leader_id)
+                        });
+                    let shotover_nodes_in_rack: Vec<_> = self
+                        .shotover_nodes
+                        .iter()
+                        .filter(|shotover_node| {
+                            leader_rack
+                                .as_ref()
+                                .map(|rack| rack == &shotover_node.rack)
+                                .unwrap_or(true)
+                        })
+                        .collect();
+                    let hash = hash_partition(topic.topic_id, partition.partition_index);
+                    let shotover_node =
+                        &shotover_nodes_in_rack[hash % shotover_nodes_in_rack.len()];
+                    partition.leader_id = shotover_node.broker_id;
+                }
 
                 // Every replica node has its entire corresponding shotover rack included.
                 // Since we can set as many replica nodes as we like, we take this all out approach.