diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index bc5054db6..a89f5d4bd 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -3,11 +3,12 @@ use std::{collections::HashMap, time::Duration}; use test_helpers::{ connection::kafka::{ Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig, - ConsumerGroupDescription, DescribeReplicaLogDirInfo, ExpectedResponse, IsolationLevel, - KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, - ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata, - OffsetSpec, Record, RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, - TopicPartition, TopicPartitionReplica, TransactionDescription, + ConsumerGroupDescription, ConsumerProtocol, DescribeReplicaLogDirInfo, ExpectedResponse, + IsolationLevel, KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, + KafkaProducer, ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, + OffsetAndMetadata, OffsetSpec, Record, RecordsToDelete, ResourcePatternType, + ResourceSpecifier, ResourceType, TopicPartition, TopicPartitionReplica, + TransactionDescription, }, docker_compose::DockerCompose, }; diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 8ffa86493..743378deb 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -32,17 +32,18 @@ use kafka_protocol::messages::produce_response::{ }; use kafka_protocol::messages::{ AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, - BrokerId, ConsumerGroupHeartbeatRequest, DeleteGroupsRequest, DeleteGroupsResponse, - DeleteRecordsRequest, DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest, - DescribeGroupsResponse, DescribeLogDirsResponse, DescribeProducersRequest, - DescribeProducersResponse, DescribeTransactionsRequest, DescribeTransactionsResponse, - EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, - GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, - ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, - MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, - OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, - RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, - SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest, + BrokerId, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, + ConsumerGroupHeartbeatRequest, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest, + DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest, DescribeGroupsResponse, + DescribeLogDirsResponse, DescribeProducersRequest, DescribeProducersResponse, + DescribeTransactionsRequest, DescribeTransactionsResponse, EndTxnRequest, FetchRequest, + FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest, + InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse, + ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest, + MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest, + OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader, + SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, + TopicName, TransactionalId, TxnOffsetCommitRequest, }; use kafka_protocol::protocol::StrBytes; use kafka_protocol::ResponseError; @@ -57,8 +58,8 @@ use scram_over_mtls::{ use serde::{Deserialize, Serialize}; use shotover_node::{ShotoverNode, ShotoverNodeConfig}; use split::{ - AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter, - DeleteRecordsRequestSplitAndRouter, DescribeGroupsSplitAndRouter, + AddPartitionsToTxnRequestSplitAndRouter, ConsumerGroupDescribeSplitAndRouter, + DeleteGroupsSplitAndRouter, DeleteRecordsRequestSplitAndRouter, DescribeGroupsSplitAndRouter, DescribeLogDirsSplitAndRouter, DescribeProducersRequestSplitAndRouter, DescribeTransactionsSplitAndRouter, ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter, @@ -748,6 +749,14 @@ impl KafkaSinkCluster { })) => { self.store_group(&mut groups, group_id.clone()); } + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::ConsumerGroupDescribe(describe), + .. + })) => { + for group_id in &describe.group_ids { + self.store_group(&mut groups, group_id.clone()); + } + } Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::DeleteGroups(delete_groups), .. @@ -1017,6 +1026,12 @@ impl KafkaSinkCluster { })) => { self.split_and_route_request::(request)?; } + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::ConsumerGroupDescribe(_), + .. + })) => { + self.split_and_route_request::(request)?; + } Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::OffsetDelete(offset_delete), .. @@ -1739,6 +1754,29 @@ The connection to the client has been closed." result } + /// This method removes all groups from the ConsumerGroupDescribe request and returns them split up by their destination. + /// If any groups are unroutable they will have their BrokerId set to -1 + fn split_consumer_group_describe_request_by_destination( + &mut self, + body: &mut ConsumerGroupDescribeRequest, + ) -> HashMap> { + let mut result: HashMap> = Default::default(); + + for group in body.group_ids.drain(..) { + if let Some(destination) = self.group_to_coordinator_broker.get(&group) { + let dest_groups = result.entry(*destination).or_default(); + dest_groups.push(group); + } else { + tracing::warn!("no known coordinator for group {group:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client"); + let destination = BrokerId(-1); + let dest_groups = result.entry(destination).or_default(); + dest_groups.push(group); + } + } + + result + } + /// This method removes all topics from the list offsets request and returns them split up by their destination /// If any topics are unroutable they will have their BrokerId set to -1 fn split_offset_for_leader_epoch_request_by_destination( @@ -2370,6 +2408,10 @@ The connection to the client has been closed." body: ResponseBody::DescribeGroups(base), .. })) => Self::combine_describe_groups(base, drain)?, + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::ConsumerGroupDescribe(base), + .. + })) => Self::combine_consumer_group_describe(base, drain)?, Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::DescribeLogDirs(base_body), .. @@ -2810,6 +2852,23 @@ The connection to the client has been closed." Ok(()) } + fn combine_consumer_group_describe( + base: &mut ConsumerGroupDescribeResponse, + drain: impl Iterator, + ) -> Result<()> { + for mut next in drain { + if let Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::ConsumerGroupDescribe(next), + .. + })) = next.response.frame() + { + base.groups.extend(std::mem::take(&mut next.groups)); + } + } + + Ok(()) + } + fn combine_add_partitions_to_txn( base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse, drain: impl Iterator, @@ -3188,9 +3247,6 @@ The connection to the client has been closed." // We can consider supporting it when kafka itself starts to support it but we will need to be very // careful to correctly implement the pagination/cursor logic. ApiKey::DescribeTopicPartitionsKey as i16, - // This message type is part of the new consumer group API, we should implement support for it in the future. - // I've disabled it for now to keep the scope down for kafka 3.9 support. - ApiKey::ConsumerGroupDescribeKey as i16, ]; api_versions .api_keys diff --git a/shotover/src/transforms/kafka/sink_cluster/split.rs b/shotover/src/transforms/kafka/sink_cluster/split.rs index 16b205cc0..640f4b70a 100644 --- a/shotover/src/transforms/kafka/sink_cluster/split.rs +++ b/shotover/src/transforms/kafka/sink_cluster/split.rs @@ -11,8 +11,8 @@ use kafka_protocol::messages::{ delete_records_request::DeleteRecordsTopic, describe_producers_request::TopicRequest, list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup, offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData, - AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, DeleteRecordsRequest, - DescribeGroupsRequest, DescribeLogDirsRequest, DescribeProducersRequest, + AddPartitionsToTxnRequest, BrokerId, ConsumerGroupDescribeRequest, DeleteGroupsRequest, + DeleteRecordsRequest, DescribeGroupsRequest, DescribeLogDirsRequest, DescribeProducersRequest, DescribeTransactionsRequest, GroupId, ListGroupsRequest, ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName, TransactionalId, @@ -393,3 +393,31 @@ impl RequestSplitAndRouter for DescribeGroupsSplitAndRouter { request.groups = item; } } + +pub struct ConsumerGroupDescribeSplitAndRouter; + +impl RequestSplitAndRouter for ConsumerGroupDescribeSplitAndRouter { + type Request = ConsumerGroupDescribeRequest; + type SubRequests = Vec; + + fn split_by_destination( + transform: &mut KafkaSinkCluster, + request: &mut Self::Request, + ) -> HashMap { + transform.split_consumer_group_describe_request_by_destination(request) + } + + fn get_request_frame(request: &mut Message) -> &mut Self::Request { + match request.frame() { + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::ConsumerGroupDescribe(request), + .. + })) => request, + _ => unreachable!(), + } + } + + fn reassemble(request: &mut Self::Request, item: Self::SubRequests) { + request.group_ids = item; + } +}