diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index d0d9afb53..6eef6c33c 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -126,7 +126,9 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
 
 async fn admin_cleanup(connection_builder: &KafkaConnectionBuilder) {
     let admin = connection_builder.connect_admin().await;
-    admin.delete_groups(&["some_group"]).await;
+    admin
+        .delete_groups(&["some_group", "some_group1", "some_group2"])
+        .await;
 }
 
 /// Attempt to make the driver batch produce requests for different topics into the same request
diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index 1ade57a9c..5603738ec 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -26,12 +26,13 @@ use kafka_protocol::messages::produce_response::{
 };
 use kafka_protocol::messages::{
     AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
-    BrokerId, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest,
-    FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest,
-    LeaveGroupRequest, ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse,
-    OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse,
-    RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest,
-    SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
+    BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, EndTxnRequest, FetchRequest,
+    FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
+    InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListOffsetsRequest,
+    ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetForLeaderEpochRequest,
+    OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader,
+    SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest,
+    TopicName, TransactionalId, TxnOffsetCommitRequest,
 };
 use kafka_protocol::protocol::StrBytes;
 use kafka_protocol::ResponseError;
@@ -46,8 +47,9 @@ use scram_over_mtls::{
 use serde::{Deserialize, Serialize};
 use shotover_node::{ShotoverNode, ShotoverNodeConfig};
 use split::{
-    AddPartitionsToTxnRequestSplitAndRouter, ListOffsetsRequestSplitAndRouter,
-    OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
+    AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
+    ListOffsetsRequestSplitAndRouter, OffsetForLeaderEpochRequestSplitAndRouter,
+    ProduceRequestSplitAndRouter, RequestSplitAndRouter,
 };
 use std::collections::{HashMap, HashSet, VecDeque};
 use std::hash::Hasher;
@@ -709,6 +711,14 @@ impl KafkaSinkCluster {
                 })) => {
                     self.store_group(&mut groups, group_id.clone());
                 }
+                Some(Frame::Kafka(KafkaFrame::Request {
+                    body: RequestBody::DeleteGroups(delete_groups),
+                    ..
+                })) => {
+                    for group_id in &delete_groups.groups_names {
+                        self.store_group(&mut groups, group_id.clone());
+                    }
+                }
                 Some(Frame::Kafka(KafkaFrame::Request {
                     body:
                         RequestBody::InitProducerId(InitProducerIdRequest {
@@ -924,12 +934,10 @@ impl KafkaSinkCluster {
                 self.route_to_group_coordinator(message, group_id);
             }
             Some(Frame::Kafka(KafkaFrame::Request {
-                body: RequestBody::DeleteGroups(groups),
+                body: RequestBody::DeleteGroups(_),
                 ..
             })) => {
-                // TODO: we need to split this up into multiple requests so it can be correctly routed to all possible nodes
-                let group_id = groups.groups_names.first().unwrap().clone();
-                self.route_to_group_coordinator(message, group_id);
+                self.split_and_route_request::<DeleteGroupsSplitAndRouter>(message)?;
             }
             Some(Frame::Kafka(KafkaFrame::Request {
                 body: RequestBody::TxnOffsetCommit(txn_offset_commit),
@@ -1374,6 +1382,29 @@ impl KafkaSinkCluster {
         result
     }
 
+    /// This method removes all group ids from the DeleteGroups request and returns them split up by their destination.
+    /// If any groups are unroutable they will have their BrokerId set to -1
+    fn split_delete_groups_request_by_destination(
+        &mut self,
+        body: &mut DeleteGroupsRequest,
+    ) -> HashMap<BrokerId, Vec<GroupId>> {
+        let mut result: HashMap<BrokerId, Vec<GroupId>> = Default::default();
+
+        for group_id in body.groups_names.drain(..) {
+            if let Some(destination) = self.group_to_coordinator_broker.get(&group_id) {
+                let dest_groups = result.entry(*destination).or_default();
+                dest_groups.push(group_id);
+            } else {
+                tracing::warn!("no known coordinator for group {group_id:?}, routing message to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
+                let destination = BrokerId(-1);
+                let dest_groups = result.entry(destination).or_default();
+                dest_groups.push(group_id);
+            }
+        }
+
+        result
+    }
+
     /// This method removes all topics from the list offsets request and returns them split up by their destination
     /// If any topics are unroutable they will have their BrokerId set to -1
     fn split_offset_for_leader_epoch_request_by_destination(
@@ -1913,6 +1944,10 @@ impl KafkaSinkCluster {
                 body: ResponseBody::Produce(base),
                 ..
             })) => Self::combine_produce_responses(base, drain)?,
+            Some(Frame::Kafka(KafkaFrame::Response {
+                body: ResponseBody::DeleteGroups(base),
+                ..
+            })) => Self::combine_delete_groups_responses(base, drain)?,
             Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::AddPartitionsToTxn(base),
                 version,
@@ -2095,6 +2130,25 @@ impl KafkaSinkCluster {
         Ok(())
     }
 
+    fn combine_delete_groups_responses(
+        base_delete_groups: &mut DeleteGroupsResponse,
+        drain: impl Iterator<Item = Message>,
+    ) -> Result<()> {
+        for mut next in drain {
+            if let Some(Frame::Kafka(KafkaFrame::Response {
+                body: ResponseBody::DeleteGroups(next_delete_groups),
+                ..
+            })) = next.frame()
+            {
+                base_delete_groups
+                    .results
+                    .extend(std::mem::take(&mut next_delete_groups.results))
+            }
+        }
+
+        Ok(())
+    }
+
     fn combine_add_partitions_to_txn(
         base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse,
         drain: impl Iterator<Item = Message>,
diff --git a/shotover/src/transforms/kafka/sink_cluster/split.rs b/shotover/src/transforms/kafka/sink_cluster/split.rs
index 658dbfc4a..d5af7a6a5 100644
--- a/shotover/src/transforms/kafka/sink_cluster/split.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/split.rs
@@ -9,8 +9,8 @@ use crate::{
 use kafka_protocol::messages::{
     add_partitions_to_txn_request::AddPartitionsToTxnTransaction,
     list_offsets_request::ListOffsetsTopic, offset_for_leader_epoch_request::OffsetForLeaderTopic,
-    produce_request::TopicProduceData, AddPartitionsToTxnRequest, BrokerId, ListOffsetsRequest,
-    OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
+    produce_request::TopicProduceData, AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest,
+    GroupId, ListOffsetsRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
 };
 use std::collections::HashMap;
 
@@ -136,3 +136,31 @@ impl RequestSplitAndRouter for OffsetForLeaderEpochRequestSplitAndRouter {
         request.topics = item;
     }
 }
+
+pub struct DeleteGroupsSplitAndRouter;
+
+impl RequestSplitAndRouter for DeleteGroupsSplitAndRouter {
+    type Request = DeleteGroupsRequest;
+    type SubRequests = Vec<GroupId>;
+
+    fn split_by_destination(
+        transform: &mut KafkaSinkCluster,
+        request: &mut Self::Request,
+    ) -> HashMap<BrokerId, Self::SubRequests> {
+        transform.split_delete_groups_request_by_destination(request)
+    }
+
+    fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
+        match request.frame() {
+            Some(Frame::Kafka(KafkaFrame::Request {
+                body: RequestBody::DeleteGroups(request),
+                ..
+            })) => Some(request),
+            _ => None,
+        }
+    }
+
+    fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
+        request.groups_names = item;
+    }
+}
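
For reviewers who want the new routing step in isolation, below is a minimal, self-contained sketch of the drain-and-bucket logic that `split_delete_groups_request_by_destination` implements. The types here (`BrokerId`, `GroupId`, the coordinator map) are simplified stand-ins, not shotover's real definitions, and the surrounding machinery is elided: per the `RequestSplitAndRouter` trait above, the transform presumably clones the request once per destination, calls `reassemble` with each bucket, and later merges the replies in `combine_delete_groups_responses`.

```rust
use std::collections::HashMap;

// Stand-ins for the real kafka_protocol types (illustration only).
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct BrokerId(i32);
type GroupId = String;

/// Mirrors the split in the diff: every group id is drained out of the
/// request and bucketed by its coordinator broker. Groups with no known
/// coordinator fall back to BrokerId(-1), the sentinel the diff uses for
/// "send to a random broker and let it answer NOT_COORDINATOR".
fn split_by_destination(
    groups_names: &mut Vec<GroupId>,
    group_to_coordinator_broker: &HashMap<GroupId, BrokerId>,
) -> HashMap<BrokerId, Vec<GroupId>> {
    let mut result: HashMap<BrokerId, Vec<GroupId>> = HashMap::new();
    for group_id in groups_names.drain(..) {
        let destination = group_to_coordinator_broker
            .get(&group_id)
            .copied()
            .unwrap_or(BrokerId(-1));
        result.entry(destination).or_default().push(group_id);
    }
    result
}

fn main() {
    let mut groups = vec!["some_group".to_string(), "some_group1".to_string()];
    let coordinators = HashMap::from([("some_group".to_string(), BrokerId(0))]);
    let split = split_by_destination(&mut groups, &coordinators);

    assert!(groups.is_empty()); // the request body was fully drained
    assert_eq!(split[&BrokerId(0)], vec!["some_group".to_string()]);
    assert_eq!(split[&BrokerId(-1)], vec!["some_group1".to_string()]);
}
```

The design choice worth noting: when one group's coordinator is unknown, the request is not failed wholesale. That group is routed under the `BrokerId(-1)` sentinel so an actual broker reports the per-group error, and `combine_delete_groups_responses` can still splice every sub-response's `results` into a single reply for the client.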