From 8dd584dba04b5f147100722247fced15b0951482 Mon Sep 17 00:00:00 2001
From: Lucas Kent <rubickent@gmail.com>
Date: Tue, 29 Oct 2024 10:56:09 +1100
Subject: [PATCH 1/2] KafkaSinkCluster: route ListGroups request

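A broker only includes the groups it coordinates in its ListGroups
response, so routing the request to a single broker would silently
drop groups coordinated elsewhere. ListGroups is therefore split and
routed to every broker that is up (filtered to the local rack when
racks are configured), and the responses are combined by
concatenating their `groups` lists, as in combine_list_groups below:

    base_list_groups
        .groups
        .extend(std::mem::take(&mut next_list_groups.groups));

Tested via KafkaAdmin::list_groups, which is java-driver only since
rdkafka-rs does not expose an equivalent of listConsumerGroups.
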
---
 .../tests/kafka_int_tests/test_cases.rs       | 124 +++++++++++-------
 .../src/transforms/kafka/sink_cluster/mod.rs  |  54 +++++++-
 .../transforms/kafka/sink_cluster/split.rs    |  32 ++++-
 test-helpers/src/connection/kafka/java.rs     |  19 +++
 test-helpers/src/connection/kafka/mod.rs      |   8 ++
 5 files changed, 182 insertions(+), 55 deletions(-)

diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index 9e8cedcd1..e96746b5a 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -3,9 +3,10 @@ use std::{collections::HashMap, time::Duration};
 use test_helpers::{
     connection::kafka::{
         Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
-        ExpectedResponse, IsolationLevel, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver,
-        KafkaProducer, ListOffsetsResultInfo, NewPartition, NewTopic, OffsetAndMetadata,
-        OffsetSpec, Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition,
+        ExpectedResponse, IsolationLevel, KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer,
+        KafkaDriver, KafkaProducer, ListOffsetsResultInfo, NewPartition, NewTopic,
+        OffsetAndMetadata, OffsetSpec, Record, ResourcePatternType, ResourceSpecifier,
+        ResourceType, TopicPartition,
     },
     docker_compose::DockerCompose,
 };
@@ -25,11 +26,6 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
                 num_partitions: 1,
                 replication_factor: 1,
             },
-            NewTopic {
-                name: "partitions3",
-                num_partitions: 3,
-                replication_factor: 1,
-            },
             NewTopic {
                 name: "partitions3_case1",
                 num_partitions: 3,
@@ -1367,74 +1363,102 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
             .await;
         produce_consume_partitions1(connection_builder, "partitions1").await;
 
-        let results = admin
-            .list_offsets(HashMap::from([
-                (
-                    TopicPartition {
-                        topic_name: "partitions3_case3".to_owned(),
-                        partition: 0,
-                    },
-                    OffsetSpec::Earliest,
-                ),
-                (
-                    TopicPartition {
-                        topic_name: "partitions3_case3".to_owned(),
-                        partition: 1,
-                    },
-                    OffsetSpec::Earliest,
-                ),
-                (
-                    TopicPartition {
-                        topic_name: "partitions3_case3".to_owned(),
-                        partition: 2,
-                    },
-                    OffsetSpec::Earliest,
-                ),
-                (
-                    TopicPartition {
-                        topic_name: "partitions1".to_owned(),
-                        partition: 0,
-                    },
-                    OffsetSpec::Latest,
-                ),
-            ]))
-            .await;
+        // rdkafka-rs doesn't support these methods
+        list_offsets(&admin).await;
+        list_groups(connection_builder, &admin).await;
+    }
+
+    produce_consume_acks0(connection_builder).await;
+    admin_cleanup(connection_builder).await;
+}
 
-        let expected = HashMap::from([
+async fn list_offsets(admin: &KafkaAdmin) {
+    let results = admin
+        .list_offsets(HashMap::from([
             (
                 TopicPartition {
                     topic_name: "partitions3_case3".to_owned(),
                     partition: 0,
                 },
-                ListOffsetsResultInfo { offset: 0 },
+                OffsetSpec::Earliest,
             ),
             (
                 TopicPartition {
                     topic_name: "partitions3_case3".to_owned(),
                     partition: 1,
                 },
-                ListOffsetsResultInfo { offset: 0 },
+                OffsetSpec::Earliest,
             ),
             (
                 TopicPartition {
                     topic_name: "partitions3_case3".to_owned(),
                     partition: 2,
                 },
-                ListOffsetsResultInfo { offset: 0 },
+                OffsetSpec::Earliest,
             ),
             (
                 TopicPartition {
                     topic_name: "partitions1".to_owned(),
                     partition: 0,
                 },
-                ListOffsetsResultInfo { offset: 11 },
+                OffsetSpec::Latest,
             ),
-        ]);
-        assert_eq!(results, expected);
-    }
+        ]))
+        .await;
 
-    produce_consume_acks0(connection_builder).await;
-    admin_cleanup(connection_builder).await;
+    let expected = HashMap::from([
+        (
+            TopicPartition {
+                topic_name: "partitions3_case3".to_owned(),
+                partition: 0,
+            },
+            ListOffsetsResultInfo { offset: 0 },
+        ),
+        (
+            TopicPartition {
+                topic_name: "partitions3_case3".to_owned(),
+                partition: 1,
+            },
+            ListOffsetsResultInfo { offset: 0 },
+        ),
+        (
+            TopicPartition {
+                topic_name: "partitions3_case3".to_owned(),
+                partition: 2,
+            },
+            ListOffsetsResultInfo { offset: 0 },
+        ),
+        (
+            TopicPartition {
+                topic_name: "partitions1".to_owned(),
+                partition: 0,
+            },
+            ListOffsetsResultInfo { offset: 11 },
+        ),
+    ]);
+    assert_eq!(results, expected);
+}
+
+async fn list_groups(connection_builder: &KafkaConnectionBuilder, admin: &KafkaAdmin) {
+    let mut consumer = connection_builder
+        .connect_consumer(
+            ConsumerConfig::consume_from_topics(vec!["partitions1".to_owned()])
+                .with_group("list_groups_test"),
+        )
+        .await;
+    consumer
+        .assert_consume(ExpectedResponse {
+            message: "initial".to_owned(),
+            key: Some("Key".to_owned()),
+            topic_name: "partitions1".to_owned(),
+            offset: Some(0),
+        })
+        .await;
+
+    let actual_results = admin.list_groups().await;
+    if !actual_results.contains(&"list_groups_test".to_owned()) {
+        panic!("Expected to find list_groups_test in {actual_results:?} but was misisng")
+    }
 }
 
 pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index 41df07be4..aab214dfd 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -29,8 +29,8 @@ use kafka_protocol::messages::{
     AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
     BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, EndTxnRequest, FetchRequest,
     FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
-    InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListOffsetsRequest,
-    ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest,
+    InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
+    ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest,
     OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest,
     ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse,
     SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
@@ -48,7 +48,7 @@ use scram_over_mtls::{
 use serde::{Deserialize, Serialize};
 use shotover_node::{ShotoverNode, ShotoverNodeConfig};
 use split::{
-    AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
+    AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter, ListGroupsSplitAndRouter,
     ListOffsetsRequestSplitAndRouter, OffsetFetchSplitAndRouter,
     OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
 };
@@ -986,6 +986,11 @@ impl KafkaSinkCluster {
                     ..
                 })) => self.route_to_controller(request),
 
+                Some(Frame::Kafka(KafkaFrame::Request {
+                    body: RequestBody::ListGroups(_),
+                    ..
+                })) => self.split_and_route_request::<ListGroupsSplitAndRouter>(request)?,
+
                 // route to random broker
                 Some(Frame::Kafka(KafkaFrame::Request {
                     body:
@@ -1400,6 +1405,26 @@ impl KafkaSinkCluster {
         result
     }
 
+    fn split_request_by_routing_to_all_brokers(&mut self) -> HashMap<BrokerId, ()> {
+        let mut result: HashMap<BrokerId, ()> = Default::default();
+
+        for broker in self.nodes.iter().filter(|node| {
+            node.is_up()
+                && node
+                    .rack
+                    .as_ref()
+                    .map(|rack| rack == &self.rack)
+                    // If the cluster is not using racks, include all brokers in the list.
+                    // This ensures we get full coverage of the cluster.
+                    // The client driver can filter out the resulting duplicates.
+                    .unwrap_or(true)
+        }) {
+            result.insert(broker.broker_id, ());
+        }
+
+        result
+    }
+
     /// This method removes all groups from the OffsetFetch request and returns them split up by their destination.
     /// If any groups are unroutable they will have their BrokerId set to -1
     fn split_offset_fetch_request_by_destination(
@@ -1973,6 +1998,10 @@ impl KafkaSinkCluster {
                 body: ResponseBody::OffsetFetch(base),
                 ..
             })) => Self::combine_offset_fetch(base, drain)?,
+            Some(Frame::Kafka(KafkaFrame::Response {
+                body: ResponseBody::ListGroups(base),
+                ..
+            })) => Self::combine_list_groups(base, drain)?,
             Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::AddPartitionsToTxn(base),
                 version,
@@ -2193,6 +2222,25 @@ impl KafkaSinkCluster {
         Ok(())
     }
 
+    fn combine_list_groups(
+        base_list_groups: &mut ListGroupsResponse,
+        drain: impl Iterator<Item = Message>,
+    ) -> Result<()> {
+        for mut next in drain {
+            if let Some(Frame::Kafka(KafkaFrame::Response {
+                body: ResponseBody::ListGroups(next_list_groups),
+                ..
+            })) = next.frame()
+            {
+                base_list_groups
+                    .groups
+                    .extend(std::mem::take(&mut next_list_groups.groups));
+            }
+        }
+
+        Ok(())
+    }
+
     fn combine_add_partitions_to_txn(
         base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse,
         drain: impl Iterator<Item = Message>,
diff --git a/shotover/src/transforms/kafka/sink_cluster/split.rs b/shotover/src/transforms/kafka/sink_cluster/split.rs
index b5f03d97d..9a7fbb0fd 100644
--- a/shotover/src/transforms/kafka/sink_cluster/split.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/split.rs
@@ -10,8 +10,8 @@ use kafka_protocol::messages::{
     add_partitions_to_txn_request::AddPartitionsToTxnTransaction,
     list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup,
     offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData,
-    AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, GroupId, ListOffsetsRequest,
-    OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
+    AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, GroupId, ListGroupsRequest,
+    ListOffsetsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
 };
 use std::collections::HashMap;
 
@@ -166,6 +166,34 @@ impl RequestSplitAndRouter for DeleteGroupsSplitAndRouter {
     }
 }
 
+pub struct ListGroupsSplitAndRouter;
+
+impl RequestSplitAndRouter for ListGroupsSplitAndRouter {
+    type Request = ListGroupsRequest;
+    type SubRequests = ();
+
+    fn split_by_destination(
+        transform: &mut KafkaSinkCluster,
+        _request: &mut Self::Request,
+    ) -> HashMap<BrokerId, Self::SubRequests> {
+        transform.split_request_by_routing_to_all_brokers()
+    }
+
+    fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
+        match request.frame() {
+            Some(Frame::Kafka(KafkaFrame::Request {
+                body: RequestBody::ListGroups(request),
+                ..
+            })) => Some(request),
+            _ => None,
+        }
+    }
+
+    fn reassemble(_request: &mut Self::Request, _item: Self::SubRequests) {
+        // No need to reassemble; each ListGroups request is an exact clone of the original.
+    }
+}
+
 pub struct OffsetFetchSplitAndRouter;
 
 impl RequestSplitAndRouter for OffsetFetchSplitAndRouter {
diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs
index ab4c6e953..c7c4cd937 100644
--- a/test-helpers/src/connection/kafka/java.rs
+++ b/test-helpers/src/connection/kafka/java.rs
@@ -665,6 +665,25 @@ impl KafkaAdminJava {
         results
     }
 
+    pub async fn list_groups(&self) -> Vec<String> {
+        let java_results = self
+            .admin
+            .call("listConsumerGroups", vec![])
+            .call_async("all", vec![])
+            .await;
+
+        let mut results = vec![];
+        for java_group in java_results.call("iterator", vec![]).into_iter() {
+            results.push(
+                java_group
+                    .cast("org.apache.kafka.clients.admin.ConsumerGroupListing")
+                    .call("groupId", vec![])
+                    .into_rust(),
+            )
+        }
+        results
+    }
+
     pub async fn create_acls(&self, acls: Vec<Acl>) {
         let resource_type = self
             .jvm
diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs
index 3350be7df..2b10cacb7 100644
--- a/test-helpers/src/connection/kafka/mod.rs
+++ b/test-helpers/src/connection/kafka/mod.rs
@@ -453,6 +453,14 @@ impl KafkaAdmin {
         }
     }
 
+    pub async fn list_groups(&self) -> Vec<String> {
+        match self {
+            #[cfg(feature = "kafka-cpp-driver-tests")]
+            Self::Cpp(_) => panic!("rdkafka-rs driver does not support list_offsets"),
+            Self::Java(java) => java.list_groups().await,
+        }
+    }
+
     pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) {
         match self {
             #[cfg(feature = "kafka-cpp-driver-tests")]

From 03d1477f8d5257315ac7759d6272c534aed14007 Mon Sep 17 00:00:00 2001
From: Lucas Kent <rubickent@gmail.com>
Date: Wed, 30 Oct 2024 07:10:31 +1100
Subject: [PATCH 2/2] fix tests with down shotover node

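list_groups asserts that a newly created group shows up in the merged
ListGroups response, which is not guaranteed to hold while shotover
nodes are down. Split the suites so that the down/missing-node tests
run a variant that skips that assertion (a sketch of the composition;
see test_cases.rs):

    pub async fn cluster_test_suite_with_lost_shotover_node(
        connection_builder: &KafkaConnectionBuilder,
    ) {
        standard_test_suite_base(connection_builder).await;
        cluster_test_suite_base(connection_builder).await;
        // deliberately omits tests_requiring_all_shotover_nodes
    }

Also corrects the copy-pasted panic message in
KafkaAdmin::list_groups ("list_offsets" -> "list_groups").
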
---
 shotover-proxy/tests/kafka_int_tests/mod.rs   |  6 ++--
 .../tests/kafka_int_tests/test_cases.rs       | 36 +++++++++++++++----
 test-helpers/src/connection/kafka/mod.rs      |  2 +-
 3 files changed, 34 insertions(+), 10 deletions(-)

diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs
index e046aad48..3059529c7 100644
--- a/shotover-proxy/tests/kafka_int_tests/mod.rs
+++ b/shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -426,7 +426,7 @@ async fn cluster_1_rack_multi_shotover_with_1_shotover_down(#[case] driver: Kafk
 
     // create a new connection and produce and consume messages
     let new_connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9193");
-    test_cases::cluster_test_suite(&new_connection_builder).await;
+    test_cases::cluster_test_suite_with_lost_shotover_node(&new_connection_builder).await;
 
     let mut expected_events = multi_shotover_events();
     // Other shotover nodes should detect the killed node at least once
@@ -485,7 +485,7 @@ async fn cluster_3_racks_multi_shotover_with_2_shotover_down(#[case] driver: Kaf
 
     // create a new connection and produce and consume messages
     let new_connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9193");
-    test_cases::cluster_test_suite(&new_connection_builder).await;
+    test_cases::cluster_test_suite_with_lost_shotover_node(&new_connection_builder).await;
 
     let mut expected_events = multi_shotover_events();
     // The UP shotover node should detect the killed nodes at least once
@@ -537,7 +537,7 @@ async fn cluster_3_racks_multi_shotover_with_1_shotover_missing(#[case] driver:
 
     // Send some produce and consume requests
     let connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9192");
-    test_cases::cluster_test_suite(&connection_builder).await;
+    test_cases::cluster_test_suite_with_lost_shotover_node(&connection_builder).await;
 
     let mut expected_events = multi_shotover_events();
     // Other shotover nodes should detect the missing node at least once
diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index e96746b5a..4779e405a 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1326,7 +1326,7 @@ async fn test_produce_consume_10_times(producer: &mut KafkaProducer, consumer: &
     }
 }
 
-pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
+async fn standard_test_suite_base(connection_builder: &KafkaConnectionBuilder) {
     admin_setup(connection_builder).await;
     produce_consume_partitions1(connection_builder, "partitions1").await;
     produce_consume_partitions1(connection_builder, "unknown_topic").await;
@@ -1363,9 +1363,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
             .await;
         produce_consume_partitions1(connection_builder, "partitions1").await;
 
-        // rdkafka-rs doesn't support these methods
         list_offsets(&admin).await;
-        list_groups(connection_builder, &admin).await;
     }
 
     produce_consume_acks0(connection_builder).await;
@@ -1439,7 +1437,8 @@ async fn list_offsets(admin: &KafkaAdmin) {
     assert_eq!(results, expected);
 }
 
-async fn list_groups(connection_builder: &KafkaConnectionBuilder, admin: &KafkaAdmin) {
+async fn list_groups(connection_builder: &KafkaConnectionBuilder) {
+    let admin = connection_builder.connect_admin().await;
     let mut consumer = connection_builder
         .connect_consumer(
             ConsumerConfig::consume_from_topics(vec!["partitions1".to_owned()])
@@ -1461,8 +1460,7 @@ async fn list_groups(connection_builder: &KafkaConnectionBuilder, admin: &KafkaA
     }
 }
 
-pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
-    standard_test_suite(connection_builder).await;
+async fn cluster_test_suite_base(connection_builder: &KafkaConnectionBuilder) {
     let admin = connection_builder.connect_admin().await;
     admin
         .create_topics_and_wait(&[
@@ -1482,6 +1480,32 @@ pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
     produce_consume_partitions3(connection_builder, "partitions3_rf3", 1, 500).await;
 }
 
+pub async fn tests_requiring_all_shotover_nodes(connection_builder: &KafkaConnectionBuilder) {
+    // rdkafka-rs doesn't support these methods
+    #[allow(irrefutable_let_patterns)]
+    if let KafkaConnectionBuilder::Java(_) = connection_builder {
+        list_groups(connection_builder).await;
+    }
+}
+
+pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
+    standard_test_suite_base(connection_builder).await;
+    tests_requiring_all_shotover_nodes(connection_builder).await;
+}
+
+pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
+    standard_test_suite_base(connection_builder).await;
+    cluster_test_suite_base(connection_builder).await;
+    tests_requiring_all_shotover_nodes(connection_builder).await;
+}
+
+pub async fn cluster_test_suite_with_lost_shotover_node(
+    connection_builder: &KafkaConnectionBuilder,
+) {
+    standard_test_suite_base(connection_builder).await;
+    cluster_test_suite_base(connection_builder).await;
+}
+
 pub async fn setup_basic_user_acls(connection: &KafkaConnectionBuilder, username: &str) {
     let admin = connection.connect_admin().await;
     admin
diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs
index 2b10cacb7..839e67ec0 100644
--- a/test-helpers/src/connection/kafka/mod.rs
+++ b/test-helpers/src/connection/kafka/mod.rs
@@ -456,7 +456,7 @@ impl KafkaAdmin {
     pub async fn list_groups(&self) -> Vec<String> {
         match self {
             #[cfg(feature = "kafka-cpp-driver-tests")]
-            Self::Cpp(_) => panic!("rdkafka-rs driver does not support list_offsets"),
+            Self::Cpp(_) => panic!("rdkafka-rs driver does not support list_groups"),
             Self::Java(java) => java.list_groups().await,
         }
     }