From c821fae9200ba0b049fcf1c667e8e21d4d24e4c6 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Wed, 28 Aug 2024 13:16:34 +1000 Subject: [PATCH] Test kafka with replication_factor 3 --- shotover-proxy/tests/kafka_int_tests/mod.rs | 16 ++++---- .../tests/kafka_int_tests/test_cases.rs | 38 +++++++++++++++---- .../src/transforms/kafka/sink_cluster/mod.rs | 34 +++++------------ test-helpers/src/connection/kafka/cpp.rs | 4 +- test-helpers/src/connection/kafka/java.rs | 2 +- test-helpers/src/connection/kafka/mod.rs | 2 +- 6 files changed, 53 insertions(+), 43 deletions(-) diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index ae937ac1d..d9aa97687 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -115,7 +115,7 @@ async fn cluster_tls(#[case] driver: KafkaDriver) { let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192") .use_tls("tests/test-configs/kafka/tls/certs"); - test_cases::standard_test_suite(&connection_builder).await; + test_cases::cluster_test_suite(&connection_builder).await; tokio::time::timeout( Duration::from_secs(10), @@ -139,7 +139,7 @@ async fn cluster_mtls(#[case] driver: KafkaDriver) { let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192") .use_tls("tests/test-configs/kafka/tls/certs"); - test_cases::standard_test_suite(&connection_builder).await; + test_cases::cluster_test_suite(&connection_builder).await; tokio::time::timeout( Duration::from_secs(10), @@ -254,7 +254,7 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) { .await; let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192"); - test_cases::standard_test_suite(&connection_builder).await; + test_cases::cluster_test_suite(&connection_builder).await; tokio::time::timeout( Duration::from_secs(10), @@ -287,7 +287,7 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) { } let 
connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192"); - test_cases::standard_test_suite(&connection_builder).await; + test_cases::cluster_test_suite(&connection_builder).await; for shotover in shotovers { tokio::time::timeout( @@ -324,7 +324,7 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) { } let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192"); - test_cases::standard_test_suite(&connection_builder).await; + test_cases::cluster_test_suite(&connection_builder).await; for shotover in shotovers { tokio::time::timeout( @@ -394,7 +394,7 @@ async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDrive let connection_super = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192") .use_sasl_scram("super_user", "super_password"); setup_basic_user_acls(&connection_super, "basic_user").await; - test_cases::standard_test_suite(&connection_super).await; + test_cases::cluster_test_suite(&connection_super).await; assert_connection_fails_with_incorrect_password(driver, "super_user").await; // admin requests sent by basic user are unsuccessful @@ -505,7 +505,7 @@ async fn cluster_sasl_scram_over_mtls_multi_shotover(#[case] driver: KafkaDriver let instant = Instant::now(); let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192") .use_sasl_scram("super_user", "super_password"); - test_cases::standard_test_suite(&connection_builder).await; + test_cases::cluster_test_suite(&connection_builder).await; // Wait 20s since we started the initial run to ensure that we hit the 15s token lifetime limit tokio::time::sleep_until((instant + Duration::from_secs(20)).into()).await; @@ -549,7 +549,7 @@ async fn cluster_sasl_plain_multi_shotover(#[case] driver: KafkaDriver) { let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl_plain("user", "password"); - test_cases::standard_test_suite(&connection_builder).await; + 
test_cases::cluster_test_suite(&connection_builder).await; // Test invalid credentials // We perform the regular test suite first in an attempt to catch a scenario diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 2aea653d1..3b3c324e5 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -68,7 +68,7 @@ pub async fn produce_consume_partitions1( topic_name: &str, ) { { - let producer = connection_builder.connect_producer(1).await; + let producer = connection_builder.connect_producer("all").await; // create an initial record to force kafka to create the topic if it doesnt yet exist producer .assert_produce( @@ -174,7 +174,7 @@ pub async fn produce_consume_commit_offsets_partitions1( topic_name: &str, ) { { - let producer = connection_builder.connect_producer(1).await; + let producer = connection_builder.connect_producer("1").await; producer .assert_produce( Record { @@ -295,9 +295,11 @@ pub async fn produce_consume_commit_offsets_partitions1( } } -async fn produce_consume_partitions3(connection_builder: &KafkaConnectionBuilder) { - let topic_name = "partitions3"; - let producer = connection_builder.connect_producer(1).await; +async fn produce_consume_partitions3( + connection_builder: &KafkaConnectionBuilder, + topic_name: &str, +) { + let producer = connection_builder.connect_producer("1").await; let mut consumer = connection_builder .connect_consumer(topic_name, "some_group") .await; @@ -346,7 +348,7 @@ async fn produce_consume_partitions3(connection_builder: &KafkaConnectionBuilder async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) { let topic_name = "acks0"; - let producer = connection_builder.connect_producer(0).await; + let producer = connection_builder.connect_producer("0").await; for _ in 0..10 { producer @@ -382,7 +384,7 @@ pub async fn standard_test_suite(connection_builder: 
&KafkaConnectionBuilder) { produce_consume_partitions1(connection_builder, "partitions1").await; produce_consume_partitions1(connection_builder, "unknown_topic").await; produce_consume_commit_offsets_partitions1(connection_builder, "partitions1_with_offset").await; - produce_consume_partitions3(connection_builder).await; + produce_consume_partitions3(connection_builder, "partitions3").await; // Only run this test case on the java driver, // since even without going through shotover the cpp driver fails this test. @@ -405,6 +407,28 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) { connection_builder.admin_cleanup().await; } +pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) { + standard_test_suite(connection_builder).await; + let admin = connection_builder.connect_admin().await; + admin + .create_topics(&[ + NewTopic { + name: "partitions1_rf3", + num_partitions: 1, + replication_factor: 3, + }, + NewTopic { + name: "partitions3_rf3", + num_partitions: 3, + replication_factor: 3, + }, + ]) + .await; + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + produce_consume_partitions1(connection_builder, "partitions1_rf3").await; + produce_consume_partitions3(connection_builder, "partitions3_rf3").await; +} + pub async fn setup_basic_user_acls(connection: &KafkaConnectionBuilder, username: &str) { let admin = connection.connect_admin().await; admin diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 9316d0856..c0b40c54c 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -825,32 +825,18 @@ routing message to a random node so that: let destination = if let Some(partition) = topic_meta.partitions.get(partition_index) { - if let Some(node) = self - .nodes - .iter_mut() - .filter(|node| { - partition - .shotover_rack_replica_nodes - .contains(&node.broker_id) - }) 
- .choose(&mut self.rng) - { - node.broker_id - } else { - tracing::debug!( - "Routing fetch request to replica outside of shotover's rack" + // While technically kafka has some support for fetching from replicas, its quite weird. + // See https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica + // We should never route to replica_nodes from the metadata response ourselves. + // Instead, when its available, we can make use of preferred_read_replica field in the fetch response as an optimization. + // However its always correct to route to the partition.leader_id which is what we do here. + if partition.leader_id == -1 { + let topic_name = Self::format_topic_name(&topic); + tracing::warn!( + "leader_id is unknown for topic {topic_name} at partition index {partition_index}" ); - self.nodes - .iter_mut() - .filter(|node| { - partition - .external_rack_replica_nodes - .contains(&node.broker_id) - }) - .choose(&mut self.rng) - .unwrap() - .broker_id } + partition.leader_id } else { let partition_len = topic_meta.partitions.len(); let topic_name = Self::format_topic_name(&topic); diff --git a/test-helpers/src/connection/kafka/cpp.rs b/test-helpers/src/connection/kafka/cpp.rs index 601986bc7..a9451329a 100644 --- a/test-helpers/src/connection/kafka/cpp.rs +++ b/test-helpers/src/connection/kafka/cpp.rs @@ -50,13 +50,13 @@ impl KafkaConnectionBuilderCpp { self } - pub async fn connect_producer(&self, acks: i32) -> KafkaProducerCpp { + pub async fn connect_producer(&self, acks: &str) -> KafkaProducerCpp { KafkaProducerCpp { producer: self .client .clone() .set("message.timeout.ms", "5000") - .set("acks", acks.to_string()) + .set("acks", acks) .create() .unwrap(), } diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index 15710fd9f..68c4b2634 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -79,7 +79,7 @@ impl 
KafkaConnectionBuilderJava { self } - pub async fn connect_producer(&self, acks: i32) -> KafkaProducerJava { + pub async fn connect_producer(&self, acks: &str) -> KafkaProducerJava { let mut config = self.base_config.clone(); config.insert("acks".to_owned(), acks.to_string()); config.insert( diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index b01c48852..d11a1caba 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -57,7 +57,7 @@ impl KafkaConnectionBuilder { } } - pub async fn connect_producer(&self, acks: i32) -> KafkaProducer { + pub async fn connect_producer(&self, acks: &str) -> KafkaProducer { match self { #[cfg(feature = "kafka-cpp-driver-tests")] Self::Cpp(cpp) => KafkaProducer::Cpp(cpp.connect_producer(acks).await),