diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index 0c096b37d..aad87edcb 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -3,10 +3,11 @@ use std::{collections::HashMap, time::Duration};
 use test_helpers::{
     connection::kafka::{
         Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
-        ExpectedResponse, IsolationLevel, KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer,
-        KafkaDriver, KafkaProducer, ListOffsetsResultInfo, NewPartition, NewPartitionReassignment,
-        NewTopic, OffsetAndMetadata, OffsetSpec, Record, RecordsToDelete, ResourcePatternType,
-        ResourceSpecifier, ResourceType, TopicPartition, TransactionDescription,
+        ConsumerGroupDescription, ExpectedResponse, IsolationLevel, KafkaAdmin,
+        KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, ListOffsetsResultInfo,
+        NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata, OffsetSpec, Record,
+        RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition,
+        TransactionDescription,
     },
     docker_compose::DockerCompose,
 };
@@ -1664,15 +1665,29 @@ async fn list_offsets(admin: &KafkaAdmin) {
     assert_eq!(results, expected);
 }
 
-async fn list_groups(connection_builder: &KafkaConnectionBuilder) {
-    let admin = connection_builder.connect_admin().await;
-    let mut consumer = connection_builder
+async fn list_and_describe_groups(connection_builder: &KafkaConnectionBuilder) {
+    // create consumers
+    let mut consumer1 = connection_builder
         .connect_consumer(
             ConsumerConfig::consume_from_topics(vec!["partitions1".to_owned()])
-                .with_group("list_groups_test"),
+                .with_group("list_groups_test1"),
         )
         .await;
-    consumer
+    consumer1
+        .assert_consume(ExpectedResponse {
+            message: "initial".to_owned(),
+            key: Some("Key".to_owned()),
+            topic_name: "partitions1".to_owned(),
+            offset: Some(0),
+        })
+        .await;
+    let mut consumer2 = connection_builder
+        .connect_consumer(
+            ConsumerConfig::consume_from_topics(vec!["partitions1".to_owned()])
+                .with_group("list_groups_test2"),
+        )
+        .await;
+    consumer2
         .assert_consume(ExpectedResponse {
             message: "initial".to_owned(),
             key: Some("Key".to_owned()),
@@ -1681,10 +1696,36 @@ async fn list_groups(connection_builder: &KafkaConnectionBuilder) {
         })
         .await;
 
+    // observe consumers
+    let admin = connection_builder.connect_admin().await;
     let actual_results = admin.list_groups().await;
-    if !actual_results.contains(&"list_groups_test".to_owned()) {
-        panic!("Expected to find \"list_groups_test\" in {actual_results:?} but was missing")
+    if !actual_results.contains(&"list_groups_test1".to_owned()) {
+        panic!("Expected to find \"list_groups_test1\" in {actual_results:?} but was missing")
+    }
+    if !actual_results.contains(&"list_groups_test2".to_owned()) {
+        panic!("Expected to find \"list_groups_test2\" in {actual_results:?} but was missing")
     }
+
+    let result = admin
+        .describe_groups(&["list_groups_test1", "list_groups_test2"])
+        .await;
+    assert_eq!(
+        result,
+        HashMap::from([
+            (
+                "list_groups_test1".to_owned(),
+                ConsumerGroupDescription {
+                    is_simple_consumer: false
+                }
+            ),
+            (
+                "list_groups_test2".to_owned(),
+                ConsumerGroupDescription {
+                    is_simple_consumer: false
+                }
+            ),
+        ])
+    )
 }
 
 async fn list_and_describe_transactions(connection_builder: &KafkaConnectionBuilder) {
@@ -1784,7 +1825,7 @@ pub async fn tests_requiring_all_shotover_nodes(connection_builder: &KafkaConnec
     // rdkafka-rs doesnt support these methods
     #[allow(irrefutable_let_patterns)]
    if let KafkaConnectionBuilder::Java(_) = connection_builder {
-        list_groups(connection_builder).await;
+        list_and_describe_groups(connection_builder).await;
         list_and_describe_transactions(connection_builder).await;
         create_and_list_partition_reassignments(connection_builder).await;
     }
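A note on the assertion style above: the whole describe_groups result is compared with a single assert_eq! against a HashMap literal. That works because ConsumerGroupDescription derives PartialEq and HashMap equality compares key/value pairs regardless of iteration order. A minimal, self-contained sketch of that mechanism (stand-in types, not the shotover test helpers):

```rust
use std::collections::HashMap;

#[derive(PartialEq, Debug)]
struct ConsumerGroupDescription {
    is_simple_consumer: bool,
}

fn main() {
    // Insertion order differs between the two maps, but HashMap equality
    // only compares the key/value pairs, so the assertion still holds.
    let actual = HashMap::from([
        ("g2", ConsumerGroupDescription { is_simple_consumer: false }),
        ("g1", ConsumerGroupDescription { is_simple_consumer: false }),
    ]);
    let expected = HashMap::from([
        ("g1", ConsumerGroupDescription { is_simple_consumer: false }),
        ("g2", ConsumerGroupDescription { is_simple_consumer: false }),
    ]);
    assert_eq!(actual, expected);
}
```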
diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index a985d370f..5edc5cf45 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -32,15 +32,15 @@ use kafka_protocol::messages::produce_response::{
 use kafka_protocol::messages::{
     AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
     BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest,
-    DeleteRecordsResponse, DescribeProducersRequest, DescribeProducersResponse,
-    DescribeTransactionsRequest, DescribeTransactionsResponse, EndTxnRequest, FetchRequest,
-    FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
-    InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
-    ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest,
-    MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest,
-    OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader,
-    SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest,
-    TopicName, TransactionalId, TxnOffsetCommitRequest,
+    DeleteRecordsResponse, DescribeGroupsRequest, DescribeGroupsResponse, DescribeProducersRequest,
+    DescribeProducersResponse, DescribeTransactionsRequest, DescribeTransactionsResponse,
+    EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse,
+    GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest,
+    ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse,
+    MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse,
+    OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse,
+    RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest,
+    SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
 };
 use kafka_protocol::protocol::StrBytes;
 use kafka_protocol::ResponseError;
@@ -56,10 +56,11 @@ use serde::{Deserialize, Serialize};
 use shotover_node::{ShotoverNode, ShotoverNodeConfig};
 use split::{
     AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
-    DeleteRecordsRequestSplitAndRouter, DescribeProducersRequestSplitAndRouter,
-    DescribeTransactionsSplitAndRouter, ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter,
-    ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter,
-    OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
+    DeleteRecordsRequestSplitAndRouter, DescribeGroupsSplitAndRouter,
+    DescribeProducersRequestSplitAndRouter, DescribeTransactionsSplitAndRouter,
+    ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter,
+    OffsetFetchSplitAndRouter, OffsetForLeaderEpochRequestSplitAndRouter,
+    ProduceRequestSplitAndRouter, RequestSplitAndRouter,
 };
 use std::collections::{HashMap, HashSet, VecDeque};
 use std::hash::Hasher;
@@ -755,6 +756,14 @@ impl KafkaSinkCluster {
             })) => {
                 self.store_group(&mut groups, offset_delete.group_id.clone());
             }
+            Some(Frame::Kafka(KafkaFrame::Request {
+                body: RequestBody::DescribeGroups(describe_groups),
+                ..
+            })) => {
+                for group_id in &describe_groups.groups {
+                    self.store_group(&mut groups, group_id.clone());
+                }
+            }
             Some(Frame::Kafka(KafkaFrame::Request {
                 body:
                     RequestBody::InitProducerId(InitProducerIdRequest {
@@ -989,6 +998,12 @@ impl KafkaSinkCluster {
             })) => {
                 self.split_and_route_request::<ListGroupsSplitAndRouter>(request)?;
             }
+            Some(Frame::Kafka(KafkaFrame::Request {
+                body: RequestBody::DescribeGroups(_),
+                ..
+            })) => {
+                self.split_and_route_request::<DescribeGroupsSplitAndRouter>(request)?;
+            }
             Some(Frame::Kafka(KafkaFrame::Request {
                 body: RequestBody::OffsetDelete(offset_delete),
                 ..
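The two hunks above hook DescribeGroups into the transform's existing group bookkeeping: while scanning a request batch, every group id named by a DescribeGroups request is recorded via store_group so that a coordinator can be discovered before routing, and the request itself is routed through the generic split-and-route path. A rough, self-contained sketch of the bookkeeping idea, with String/i32 stand-ins for GroupId/BrokerId and hypothetical names (the real logic lives in KafkaSinkCluster, and only the "record groups for coordinator lookup" shape is taken from the diff):

```rust
use std::collections::HashMap;

struct Router {
    // Populated from FindCoordinator responses in the real transform.
    group_to_coordinator_broker: HashMap<String, i32>,
}

impl Router {
    // Collect every group named by a DescribeGroups request; unlike
    // OffsetDelete (a single group_id per request) it carries a whole list.
    // One plausible shape: only groups without a cached coordinator need
    // a lookup before the request can be routed.
    fn store_groups(&self, pending_lookups: &mut Vec<String>, request_groups: &[String]) {
        for group in request_groups {
            if !self.group_to_coordinator_broker.contains_key(group) {
                pending_lookups.push(group.clone());
            }
        }
    }
}

fn main() {
    let router = Router {
        group_to_coordinator_broker: HashMap::from([("known".to_owned(), 0)]),
    };
    let mut pending = Vec::new();
    router.store_groups(&mut pending, &["known".to_owned(), "new".to_owned()]);
    assert_eq!(pending, vec!["new".to_owned()]);
}
```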
@@ -1668,6 +1683,29 @@ The connection to the client has been closed."
         result
     }
 
+    /// This method removes all groups from the DescribeGroups request and returns them split up by their destination.
+    /// If any groups are unroutable they will have their BrokerId set to -1
+    fn split_describe_groups_request_by_destination(
+        &mut self,
+        body: &mut DescribeGroupsRequest,
+    ) -> HashMap<BrokerId, Vec<GroupId>> {
+        let mut result: HashMap<BrokerId, Vec<GroupId>> = Default::default();
+
+        for group in body.groups.drain(..) {
+            if let Some(destination) = self.group_to_coordinator_broker.get(&group) {
+                let dest_groups = result.entry(*destination).or_default();
+                dest_groups.push(group);
+            } else {
+                tracing::warn!("no known coordinator for group {group:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
+                let destination = BrokerId(-1);
+                let dest_groups = result.entry(destination).or_default();
+                dest_groups.push(group);
+            }
+        }
+
+        result
+    }
+
     /// This method removes all topics from the list offsets request and returns them split up by their destination
     /// If any topics are unroutable they will have their BrokerId set to -1
     fn split_offset_for_leader_epoch_request_by_destination(
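The splitting logic itself is small enough to illustrate standalone. Below is a self-contained sketch of the same bucketing, using String/i32 in place of GroupId/BrokerId; -1 stands in for "no known coordinator", which the real code routes to a random broker so the client receives NOT_COORDINATOR (or similar) and retries after refreshing its metadata:

```rust
use std::collections::HashMap;

fn split_by_destination(
    groups: Vec<String>,
    coordinator_for: &HashMap<String, i32>,
) -> HashMap<i32, Vec<String>> {
    let mut result: HashMap<i32, Vec<String>> = HashMap::new();
    for group in groups {
        // Groups with no cached coordinator are bucketed under the
        // placeholder broker id -1.
        let destination = coordinator_for.get(&group).copied().unwrap_or(-1);
        result.entry(destination).or_default().push(group);
    }
    result
}

fn main() {
    let coordinators = HashMap::from([("g1".to_owned(), 0), ("g2".to_owned(), 1)]);
    let split = split_by_destination(
        vec!["g1".to_owned(), "g2".to_owned(), "mystery".to_owned()],
        &coordinators,
    );
    assert_eq!(split[&0], vec!["g1".to_owned()]);
    assert_eq!(split[&1], vec!["g2".to_owned()]);
    assert_eq!(split[&-1], vec!["mystery".to_owned()]);
}
```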
@@ -2287,6 +2325,10 @@ The connection to the client has been closed."
                 body: ResponseBody::ListTransactions(base),
                 ..
             })) => Self::combine_list_transactions(base, drain)?,
+            Some(Frame::Kafka(KafkaFrame::Response {
+                body: ResponseBody::DescribeGroups(base),
+                ..
+            })) => Self::combine_describe_groups(base, drain)?,
             Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::AddPartitionsToTxn(base),
                 version,
@@ -2651,6 +2693,23 @@ The connection to the client has been closed."
         Ok(())
     }
 
+    fn combine_describe_groups(
+        base: &mut DescribeGroupsResponse,
+        drain: impl Iterator<Item = Message>,
+    ) -> Result<()> {
+        for mut next in drain {
+            if let Some(Frame::Kafka(KafkaFrame::Response {
+                body: ResponseBody::DescribeGroups(next),
+                ..
+            })) = next.frame()
+            {
+                base.groups.extend(std::mem::take(&mut next.groups));
+            }
+        }
+
+        Ok(())
+    }
+
     fn combine_add_partitions_to_txn(
         base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse,
         drain: impl Iterator<Item = Message>,
diff --git a/shotover/src/transforms/kafka/sink_cluster/split.rs b/shotover/src/transforms/kafka/sink_cluster/split.rs
index 19a84efa2..545085208 100644
--- a/shotover/src/transforms/kafka/sink_cluster/split.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/split.rs
@@ -12,9 +12,9 @@ use kafka_protocol::messages::{
     list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup,
     offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData,
     AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, DeleteRecordsRequest,
-    DescribeProducersRequest, DescribeTransactionsRequest, GroupId, ListGroupsRequest,
-    ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest,
-    ProduceRequest, TopicName, TransactionalId,
+    DescribeGroupsRequest, DescribeProducersRequest, DescribeTransactionsRequest, GroupId,
+    ListGroupsRequest, ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest,
+    OffsetForLeaderEpochRequest, ProduceRequest, TopicName, TransactionalId,
 };
 use std::collections::HashMap;
 
@@ -336,3 +336,31 @@ impl RequestSplitAndRouter for OffsetFetchSplitAndRouter {
         request.groups = item;
     }
 }
+
+pub struct DescribeGroupsSplitAndRouter;
+
+impl RequestSplitAndRouter for DescribeGroupsSplitAndRouter {
+    type Request = DescribeGroupsRequest;
+    type SubRequests = Vec<GroupId>;
+
+    fn split_by_destination(
+        transform: &mut KafkaSinkCluster,
+        request: &mut Self::Request,
+    ) -> HashMap<BrokerId, Self::SubRequests> {
+        transform.split_describe_groups_request_by_destination(request)
+    }
+
+    fn get_request_frame(request: &mut Message) -> &mut Self::Request {
+        match request.frame() {
+            Some(Frame::Kafka(KafkaFrame::Request {
+                body: RequestBody::DescribeGroups(request),
+                ..
+            })) => request,
+            _ => unreachable!(),
+        }
+    }
+
+    fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
+        request.groups = item;
+    }
+}
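Taken together, DescribeGroupsSplitAndRouter and combine_describe_groups form a scatter-gather pair: the group list is split per coordinator, each broker answers only for its own groups, and the per-broker responses are merged by concatenating their groups lists back into one response. A simplified, self-contained model of that round trip (field names loosely mirror the kafka_protocol types; this is not the shotover API):

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct DescribeGroupsRequest {
    groups: Vec<String>,
}

#[derive(Debug, PartialEq)]
struct DescribeGroupsResponse {
    groups: Vec<String>, // described-group entries in the real protocol
}

// Scatter: one narrowed clone of the original request per destination broker.
fn scatter(
    request: &DescribeGroupsRequest,
    split: HashMap<i32, Vec<String>>,
) -> Vec<(i32, DescribeGroupsRequest)> {
    split
        .into_iter()
        .map(|(broker, groups)| {
            let mut sub = request.clone();
            sub.groups = groups; // the `reassemble` step: install the subset
            (broker, sub)
        })
        .collect()
}

// Gather: fold every later response's groups into the first one.
fn combine(
    mut base: DescribeGroupsResponse,
    rest: Vec<DescribeGroupsResponse>,
) -> DescribeGroupsResponse {
    for next in rest {
        base.groups.extend(next.groups);
    }
    base
}

fn main() {
    let request = DescribeGroupsRequest { groups: vec![] };
    let subs = scatter(
        &request,
        HashMap::from([(0, vec!["g1".to_owned()]), (1, vec!["g2".to_owned()])]),
    );
    assert_eq!(subs.len(), 2);

    let merged = combine(
        DescribeGroupsResponse { groups: vec!["g1".to_owned()] },
        vec![DescribeGroupsResponse { groups: vec!["g2".to_owned()] }],
    );
    assert_eq!(merged.groups, vec!["g1".to_owned(), "g2".to_owned()]);
}
```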
diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs
index 5c26d08e1..59b36cb0c 100644
--- a/test-helpers/src/connection/kafka/java.rs
+++ b/test-helpers/src/connection/kafka/java.rs
@@ -1,9 +1,9 @@
 use super::{
-    Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ExpectedResponse,
-    ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata,
-    OffsetSpec, PartitionReassignment, ProduceResult, ProducerState, Record, RecordsToDelete,
-    ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription, TopicPartition,
-    TopicPartitionInfo, TransactionDescription,
+    Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ConsumerGroupDescription,
+    ExpectedResponse, ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic,
+    OffsetAndMetadata, OffsetSpec, PartitionReassignment, ProduceResult, ProducerState, Record,
+    RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription,
+    TopicPartition, TopicPartitionInfo, TransactionDescription,
 };
 use crate::connection::java::{map_iterator, Jvm, Value};
 use anyhow::Result;
@@ -730,6 +730,33 @@ impl KafkaAdminJava {
         results
     }
 
+    pub async fn describe_groups(
+        &self,
+        group_ids: &[&str],
+    ) -> HashMap<String, ConsumerGroupDescription> {
+        let group_ids = group_ids.iter().map(|x| self.jvm.new_string(x)).collect();
+        let group_ids = self.jvm.new_list("java.lang.String", group_ids);
+
+        let java_results = self
+            .admin
+            .call("describeConsumerGroups", vec![group_ids])
+            .call_async("all", vec![])
+            .await;
+        map_iterator(java_results)
+            .map(|(group_id, consumer_group_description)| {
+                (
+                    group_id.into_rust(),
+                    ConsumerGroupDescription {
+                        is_simple_consumer: consumer_group_description
+                            .cast("org.apache.kafka.clients.admin.ConsumerGroupDescription")
+                            .call("isSimpleConsumerGroup", vec![])
+                            .into_rust(),
+                    },
+                )
+            })
+            .collect()
+    }
+
     pub async fn list_groups(&self) -> Vec<String> {
         let java_results = self
             .admin
diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs
index d208c6140..635609f24 100644
--- a/test-helpers/src/connection/kafka/mod.rs
+++ b/test-helpers/src/connection/kafka/mod.rs
@@ -472,6 +472,16 @@ impl KafkaAdmin {
             Self::Java(java) => java.list_offsets(topic_partitions).await,
         }
     }
+
+    pub async fn describe_groups(
+        &self,
+        group_ids: &[&str],
+    ) -> HashMap<String, ConsumerGroupDescription> {
+        match self {
+            #[cfg(feature = "kafka-cpp-driver-tests")]
+            Self::Cpp(_) => panic!("rdkafka-rs driver does not support describe_groups"),
+            Self::Java(java) => java.describe_groups(group_ids).await,
+        }
+    }
 
     pub async fn list_groups(&self) -> Vec<String> {
         match self {
@@ -750,6 +760,11 @@ impl IsolationLevel {
     }
 }
 
+#[derive(PartialEq, Debug)]
+pub struct ConsumerGroupDescription {
+    pub is_simple_consumer: bool,
+}
+
 #[derive(PartialEq, Debug)]
 pub struct OffsetAndMetadata {
     pub offset: i64,
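Finally, the KafkaAdmin::describe_groups wrapper follows the test helpers' usual driver-dispatch pattern: operations the rdkafka (cpp) driver cannot express panic immediately rather than silently degrading, so a misconfigured test matrix fails fast. A condensed, self-contained sketch of that pattern (simplified stand-ins, not the real helper types):

```rust
use std::collections::HashMap;

#[derive(PartialEq, Debug)]
struct ConsumerGroupDescription {
    is_simple_consumer: bool,
}

enum KafkaAdmin {
    Cpp, // rdkafka-based driver: no describeConsumerGroups equivalent
    Java,
}

impl KafkaAdmin {
    fn describe_groups(&self, group_ids: &[&str]) -> HashMap<String, ConsumerGroupDescription> {
        match self {
            // Failing loudly surfaces unsupported-driver usage at the call
            // site instead of returning an empty or placeholder result.
            KafkaAdmin::Cpp => panic!("rdkafka-rs driver does not support describe_groups"),
            // Stand-in for the JVM-backed implementation, which reflects
            // into AdminClient.describeConsumerGroups(...).all().
            KafkaAdmin::Java => group_ids
                .iter()
                .map(|id| (id.to_string(), ConsumerGroupDescription { is_simple_consumer: false }))
                .collect(),
        }
    }
}

fn main() {
    let admin = KafkaAdmin::Java;
    let described = admin.describe_groups(&["g1"]);
    assert!(!described["g1"].is_simple_consumer);
}
```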