From c0a6cd65f346295cf0677739f165f979c7cb7912 Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Mon, 26 Aug 2024 15:12:50 +1000
Subject: [PATCH] KafkaSinkCluster route by NodeState

---
 shotover-proxy/tests/kafka_int_tests/mod.rs    |  37 +++++-
 .../tests/kafka_int_tests/test_cases.rs        | 120 +++++++++++++++++-
 .../src/transforms/kafka/sink_cluster/mod.rs   |  56 ++++++--
 .../src/transforms/kafka/sink_cluster/node.rs  |   1 +
 4 files changed, 196 insertions(+), 18 deletions(-)

diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs
index ae937ac1d..e4a161942 100644
--- a/shotover-proxy/tests/kafka_int_tests/mod.rs
+++ b/shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -247,7 +247,7 @@ async fn single_sasl_scram_plaintext_source_tls_sink(#[case] driver: KafkaDriver
 #[case::java(KafkaDriver::Java)]
 #[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
 async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
-    let _docker_compose =
+    let docker_compose =
         docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
     let shotover = shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml")
         .start()
         .await;
@@ -256,6 +256,13 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
 
     test_cases::standard_test_suite(&connection_builder).await;
+    test_cases::produce_consume_partitions1_kafka_node_goes_down(
+        &docker_compose,
+        &connection_builder,
+        "a176a97a-b98f-4de7-a54a-aee32a874ed2",
+    )
+    .await;
+
     tokio::time::timeout(
         Duration::from_secs(10),
         shotover.shutdown_and_then_consume_events(&[]),
@@ -376,7 +383,7 @@ async fn cluster_sasl_scram_single_shotover(#[case] driver: KafkaDriver) {
 async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDriver) {
     test_helpers::cert::generate_kafka_test_certs();
 
-    let _docker_compose =
+    let docker_compose =
         docker_compose("tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml");
 
     // test concurrent connections with different access levels to ensure that:
@@ -443,6 +450,32 @@ async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDrive
         .await
         .expect("Shotover did not shutdown within 10s");
     }
+
+    // Test handling of down kafka nodes.
+    {
+        let shotover = shotover_process(
+            "tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml",
+        )
+        .start()
+        .await;
+
+        let connection = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
+            .use_sasl_scram("super_user", "super_password");
+
+        test_cases::produce_consume_partitions1_kafka_node_goes_down(
+            &docker_compose,
+            &connection,
+            "a176a97a-b98f-4de7-a54a-aee32a874ed2",
+        )
+        .await;
+
+        tokio::time::timeout(
+            Duration::from_secs(10),
+            shotover.shutdown_and_then_consume_events(&[]),
+        )
+        .await
+        .expect("Shotover did not shutdown within 10s");
+    }
 }
 
 async fn assert_connection_fails_with_incorrect_password(driver: KafkaDriver, username: &str) {
diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index 2aea653d1..fd633cfbd 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1,8 +1,11 @@
 use std::collections::HashMap;
-use test_helpers::connection::kafka::{
-    Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse,
-    KafkaConnectionBuilder, NewPartition, NewTopic, Record, ResourcePatternType, ResourceSpecifier,
-    ResourceType, TopicPartition,
+use test_helpers::{
+    connection::kafka::{
+        Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse,
+        KafkaConnectionBuilder, NewPartition, NewTopic, Record, ResourcePatternType,
+        ResourceSpecifier, ResourceType, TopicPartition,
+    },
+    docker_compose::DockerCompose,
 };
 
 async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
@@ -169,6 +172,115 @@ pub async fn produce_consume_partitions1(
     }
 }
 
+pub async fn produce_consume_partitions1_kafka_node_goes_down(
+    docker_compose: &DockerCompose,
+    connection_builder: &KafkaConnectionBuilder,
+    topic_name: &str,
+) {
+    {
+        let producer = connection_builder.connect_producer(1).await;
+        // create an initial record to force kafka to create the topic if it doesn't yet exist
+        producer
+            .assert_produce(
+                Record {
+                    payload: "initial",
+                    topic_name,
+                    key: Some("Key"),
+                },
+                Some(0),
+            )
+            .await;
+
+        let mut consumer = connection_builder
+            .connect_consumer(topic_name, "some_group")
+            .await;
+        consumer
+            .assert_consume(ExpectedResponse {
+                message: "initial".to_owned(),
+                key: Some("Key".to_owned()),
+                topic_name: topic_name.to_owned(),
+                offset: Some(0),
+            })
+            .await;
+
+        docker_compose.kill_service("kafka1");
+
+        // create and consume records
+        for i in 0..5 {
+            producer
+                .assert_produce(
+                    Record {
+                        payload: "Message1",
+                        topic_name,
+                        key: Some("Key"),
+                    },
+                    Some(i * 2 + 1),
+                )
+                .await;
+            producer
+                .assert_produce(
+                    Record {
+                        payload: "Message2",
+                        topic_name,
+                        key: None,
+                    },
+                    Some(i * 2 + 2),
+                )
+                .await;
+
+            consumer
+                .assert_consume(ExpectedResponse {
+                    message: "Message1".to_owned(),
+                    key: Some("Key".to_owned()),
+                    topic_name: topic_name.to_owned(),
+                    offset: Some(i * 2 + 1),
+                })
+                .await;
+
+            consumer
+                .assert_consume(ExpectedResponse {
+                    message: "Message2".to_owned(),
+                    key: None,
+                    topic_name: topic_name.to_owned(),
+                    offset: Some(i * 2 + 2),
+                })
+                .await;
+        }
+    }
+
+    // if we create a new consumer it will start from the beginning since auto.offset.reset = earliest and enable.auto.commit is false
+    // so we test that we can access all records ever created on this topic
+    let mut consumer = connection_builder
+        .connect_consumer(topic_name, "some_group")
+        .await;
+    consumer
+        .assert_consume(ExpectedResponse {
+            message: "initial".to_owned(),
+            key: Some("Key".to_owned()),
+            topic_name: topic_name.to_owned(),
+            offset: Some(0),
+        })
+        .await;
+    for i in 0..5 {
+        consumer
+            .assert_consume(ExpectedResponse {
+                message: "Message1".to_owned(),
+                key: Some("Key".to_owned()),
+                topic_name: topic_name.to_owned(),
+                offset: Some(i * 2 + 1),
+            })
+            .await;
+        consumer
+            .assert_consume(ExpectedResponse {
+                message: "Message2".to_owned(),
+                key: None,
+                topic_name: topic_name.to_owned(),
+                offset: Some(i * 2 + 2),
+            })
+            .await;
+    }
+}
+
 pub async fn produce_consume_commit_offsets_partitions1(
     connection_builder: &KafkaConnectionBuilder,
     topic_name: &str,
diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index c16fbb561..f8b6d6e22 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -827,29 +827,61 @@ routing message to a random node so that:
                             {
                                 if let Some(node) = self
                                     .nodes
-                                    .iter_mut()
+                                    .iter()
                                     .filter(|node| {
                                         partition
                                             .shotover_rack_replica_nodes
                                             .contains(&node.broker_id)
+                                            && node.state.load(Ordering::Relaxed) == NodeState::Up
                                     })
                                     .choose(&mut self.rng)
                                 {
                                     node.broker_id
-                                } else {
+                                } else if let Some(node) = self
+                                    .nodes
+                                    .iter()
+                                    .filter(|node| {
+                                        partition
+                                            .external_rack_replica_nodes
+                                            .contains(&node.broker_id)
+                                            && node.state.load(Ordering::Relaxed) == NodeState::Up
+                                    })
+                                    .choose(&mut self.rng)
+                                {
                                     tracing::debug!(
                                         "Routing fetch request to replica outside of shotover's rack"
                                     );
-                                    self.nodes
-                                        .iter_mut()
-                                        .filter(|node| {
-                                            partition
-                                                .external_rack_replica_nodes
-                                                .contains(&node.broker_id)
-                                        })
-                                        .choose(&mut self.rng)
-                                        .unwrap()
-                                        .broker_id
+                                    node.broker_id
+                                } else if let Some(node) = self
+                                    .nodes
+                                    .iter()
+                                    .filter(|node| {
+                                        partition
+                                            .shotover_rack_replica_nodes
+                                            .contains(&node.broker_id)
+                                    })
+                                    .choose(&mut self.rng)
+                                {
+                                    tracing::debug!(
+                                        "Routing fetch request to down node (maybe it's come back up by now)"
+                                    );
+                                    node.broker_id
+                                } else if let Some(node) = self
+                                    .nodes
+                                    .iter()
+                                    .filter(|node| {
+                                        partition
+                                            .external_rack_replica_nodes
+                                            .contains(&node.broker_id)
+                                    })
+                                    .choose(&mut self.rng)
+                                {
+                                    tracing::debug!(
+                                        "Routing fetch request to down replica outside of shotover's rack (maybe it's come back up by now)"
+                                    );
+                                    node.broker_id
+                                } else {
+                                    panic!("partition metadata is invalid, contains no nodes")
                                 }
                             } else {
                                 let partition_len = topic_meta.partitions.len();
diff --git a/shotover/src/transforms/kafka/sink_cluster/node.rs b/shotover/src/transforms/kafka/sink_cluster/node.rs
index b3d274349..3126d6370 100644
--- a/shotover/src/transforms/kafka/sink_cluster/node.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/node.rs
@@ -297,6 +297,7 @@ impl KafkaNode {
 }
 
 #[atomic_enum]
+#[derive(PartialEq)]
 pub enum NodeState {
     Up,
     Down,
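The routing change in sink_cluster/mod.rs boils down to a four-step fallback: prefer an Up replica in shotover's rack, then an Up replica in another rack, then a Down replica in shotover's rack (it may have recovered), then a Down replica in another rack, and panic only if the partition metadata lists no replicas at all. The sketch below illustrates that order in isolation; it is not shotover's actual API: Node, NodeState::state and pick_fetch_broker are hypothetical stand-ins, an AtomicBool replaces the patch's atomic_enum NodeState, and selection is simplified to first-match where the real transform picks randomly among matching brokers via IteratorRandom::choose.

use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum NodeState {
    Up,
    Down,
}

struct Node {
    broker_id: i32,
    // Shared, lock-free view of the broker's last known health; the patch uses an
    // atomic_enum for this, an AtomicBool plays the same role in this sketch.
    up: AtomicBool,
}

impl Node {
    fn state(&self) -> NodeState {
        if self.up.load(Ordering::Relaxed) {
            NodeState::Up
        } else {
            NodeState::Down
        }
    }
}

// Fallback order for picking the broker to fetch from:
// 1. an Up replica in shotover's own rack
// 2. an Up replica in another rack
// 3. a Down replica in shotover's rack (it may have come back up by now)
// 4. a Down replica in another rack
// Panics only when the partition metadata lists no replicas at all.
fn pick_fetch_broker(
    nodes: &[Node],
    shotover_rack_replicas: &[i32],
    external_rack_replicas: &[i32],
) -> i32 {
    let first_match = |ids: &[i32], required: Option<NodeState>| {
        nodes
            .iter()
            .find(|node| {
                ids.contains(&node.broker_id)
                    && required.map_or(true, |state| node.state() == state)
            })
            .map(|node| node.broker_id)
    };

    first_match(shotover_rack_replicas, Some(NodeState::Up))
        .or_else(|| first_match(external_rack_replicas, Some(NodeState::Up)))
        .or_else(|| first_match(shotover_rack_replicas, None))
        .or_else(|| first_match(external_rack_replicas, None))
        .expect("partition metadata is invalid, contains no nodes")
}

fn main() {
    let nodes = [
        Node { broker_id: 0, up: AtomicBool::new(false) },
        Node { broker_id: 1, up: AtomicBool::new(true) },
        Node { broker_id: 2, up: AtomicBool::new(true) },
    ];
    // Broker 0 (shotover's rack) is down, so the external-rack replica 2 is chosen.
    assert_eq!(pick_fetch_broker(&nodes, &[0], &[2]), 2);
    // Once broker 0 is marked Up again it is preferred over the external rack.
    nodes[0].up.store(true, Ordering::Relaxed);
    assert_eq!(pick_fetch_broker(&nodes, &[0], &[2]), 0);
}

Keeping down nodes as a last resort, rather than filtering them out entirely, matches the intent visible in the patch's debug messages: a broker marked Down may have recovered in the meantime, and routing to it is still better than failing the fetch outright.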