From f8a15708f552f4a81a9f1081393789689ffa3a64 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Wed, 13 Nov 2024 09:50:06 +1100 Subject: [PATCH] KafkaSinkCluster: route ElectLeaders request (#1805) --- .../tests/kafka_int_tests/test_cases.rs | 22 +++++++++++++++++++ .../src/transforms/kafka/sink_cluster/mod.rs | 4 ++++ test-helpers/src/connection/kafka/java.rs | 20 +++++++++++++++++ test-helpers/src/connection/kafka/mod.rs | 10 +++++++++ 4 files changed, 56 insertions(+) diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index c8c32a6bf..3602b2a7a 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -133,6 +133,28 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) { async fn admin_cleanup(connection_builder: &KafkaConnectionBuilder) { let admin = connection_builder.connect_admin().await; + // Only supported by java driver + #[allow(irrefutable_let_patterns)] + if let KafkaConnectionBuilder::Java(_) = connection_builder { + // It is not clear how to actually invoke this API in a succesful way. + // At the very least this test case shows that shotover succesfully sends and receives this message type (even if the broker responds with an error) + match admin + .elect_leaders(&[TopicPartition { + topic_name: "partitions1_with_offset".to_owned(), + partition: 0, + }]) + .await + { + Ok(()) => panic!("elect_leaders is expected to fail since an election is not required"), + Err(e) => { + assert_eq!( + format!("{e}"), + "org.apache.kafka.common.errors.ElectionNotNeededException: Leader election not needed for topic partition.\n" + ); + } + } + } + admin.delete_groups(&["some_group", "some_group1"]).await; delete_records_partitions1(&admin, connection_builder).await; delete_records_partitions3(&admin, connection_builder).await; diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 2812a867a..13e64203d 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -1048,6 +1048,10 @@ The connection to the client has been closed." body: RequestBody::CreateTopics(_), .. })) => self.route_to_controller(request), + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::ElectLeaders(_), + .. + })) => self.route_to_controller(request), // route to all nodes Some(Frame::Kafka(KafkaFrame::Request { diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index f2f09628c..ac1728068 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -779,6 +779,26 @@ impl KafkaAdminJava { .await; } + pub async fn elect_leaders(&self, topic_partitions: &[TopicPartition]) -> Result<()> { + let election_type = self + .jvm + .class("org.apache.kafka.common.ElectionType") + .field("PREFERRED"); + let topic_partitions_java = self.jvm.new_set( + "org.apache.kafka.common.TopicPartition", + topic_partitions + .iter() + .map(|topic_partition| topic_partition_to_java(&self.jvm, topic_partition)) + .collect(), + ); + + self.admin + .call("electLeaders", vec![election_type, topic_partitions_java]) + .call_async_fallible("all", vec![]) + .await + .map(|_| ()) + } + pub async fn create_acls(&self, acls: Vec) { let resource_type = self .jvm diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index 7fe493b75..7bd001db3 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -507,6 +507,16 @@ impl KafkaAdmin { } } + pub async fn elect_leaders(&self, topic_partitions: &[TopicPartition]) -> Result<()> { + match self { + #[cfg(feature = "kafka-cpp-driver-tests")] + Self::Cpp(_) => { + panic!("rdkafka-rs driver does not support elect_leaders") + } + Self::Java(java) => java.elect_leaders(topic_partitions).await, + } + } + pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) { match self { #[cfg(feature = "kafka-cpp-driver-tests")]