Skip to content

Commit

Permalink
KafkaSinkCluster: route ConsumerGroupDescribe request
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Nov 21, 2024
1 parent 6d73da1 commit 9459ef0
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 18 deletions.
88 changes: 72 additions & 16 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,18 @@ use kafka_protocol::messages::produce_response::{
};
use kafka_protocol::messages::{
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
BrokerId, ConsumerGroupHeartbeatRequest, DeleteGroupsRequest, DeleteGroupsResponse,
DeleteRecordsRequest, DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest,
DescribeGroupsResponse, DescribeLogDirsResponse, DescribeProducersRequest,
DescribeProducersResponse, DescribeTransactionsRequest, DescribeTransactionsResponse,
EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse,
GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest,
ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse,
MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse,
OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse,
RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest,
SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
BrokerId, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse,
ConsumerGroupHeartbeatRequest, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest,
DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest, DescribeGroupsResponse,
DescribeLogDirsResponse, DescribeProducersRequest, DescribeProducersResponse,
DescribeTransactionsRequest, DescribeTransactionsResponse, EndTxnRequest, FetchRequest,
FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest,
MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest,
OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader,
SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest,
TopicName, TransactionalId, TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
Expand All @@ -57,8 +58,8 @@ use scram_over_mtls::{
use serde::{Deserialize, Serialize};
use shotover_node::{ShotoverNode, ShotoverNodeConfig};
use split::{
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
DeleteRecordsRequestSplitAndRouter, DescribeGroupsSplitAndRouter,
AddPartitionsToTxnRequestSplitAndRouter, ConsumerGroupDescribeSplitAndRouter,
DeleteGroupsSplitAndRouter, DeleteRecordsRequestSplitAndRouter, DescribeGroupsSplitAndRouter,
DescribeLogDirsSplitAndRouter, DescribeProducersRequestSplitAndRouter,
DescribeTransactionsSplitAndRouter, ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter,
ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter,
Expand Down Expand Up @@ -748,6 +749,14 @@ impl KafkaSinkCluster {
})) => {
self.store_group(&mut groups, group_id.clone());
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ConsumerGroupDescribe(describe),
..
})) => {
for group_id in &describe.group_ids {
self.store_group(&mut groups, group_id.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DeleteGroups(delete_groups),
..
Expand Down Expand Up @@ -1017,6 +1026,12 @@ impl KafkaSinkCluster {
})) => {
self.split_and_route_request::<DescribeGroupsSplitAndRouter>(request)?;
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ConsumerGroupDescribe(_),
..
})) => {
self.split_and_route_request::<ConsumerGroupDescribeSplitAndRouter>(request)?;
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetDelete(offset_delete),
..
Expand Down Expand Up @@ -1739,6 +1754,29 @@ The connection to the client has been closed."
result
}

/// This method removes all groups from the ConsumerGroupDescribe request and returns them split up by their destination.
/// If any groups are unroutable they will have their BrokerId set to -1
fn split_consumer_group_describe_request_by_destination(
&mut self,
body: &mut ConsumerGroupDescribeRequest,
) -> HashMap<BrokerId, Vec<GroupId>> {
let mut result: HashMap<BrokerId, Vec<GroupId>> = Default::default();

for group in body.group_ids.drain(..) {
if let Some(destination) = self.group_to_coordinator_broker.get(&group) {
let dest_groups = result.entry(*destination).or_default();
dest_groups.push(group);
} else {
tracing::warn!("no known coordinator for group {group:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
let destination = BrokerId(-1);
let dest_groups = result.entry(destination).or_default();
dest_groups.push(group);
}
}

result
}

/// This method removes all topics from the list offsets request and returns them split up by their destination
/// If any topics are unroutable they will have their BrokerId set to -1
fn split_offset_for_leader_epoch_request_by_destination(
Expand Down Expand Up @@ -2370,6 +2408,10 @@ The connection to the client has been closed."
body: ResponseBody::DescribeGroups(base),
..
})) => Self::combine_describe_groups(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ConsumerGroupDescribe(base),
..
})) => Self::combine_consumer_group_describe(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DescribeLogDirs(base_body),
..
Expand Down Expand Up @@ -2810,6 +2852,23 @@ The connection to the client has been closed."
Ok(())
}

fn combine_consumer_group_describe(
base: &mut ConsumerGroupDescribeResponse,
drain: impl Iterator<Item = ResponseToBeCombined>,
) -> Result<()> {
for mut next in drain {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ConsumerGroupDescribe(next),
..
})) = next.response.frame()
{
base.groups.extend(std::mem::take(&mut next.groups));
}
}

Ok(())
}

fn combine_add_partitions_to_txn(
base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse,
drain: impl Iterator<Item = ResponseToBeCombined>,
Expand Down Expand Up @@ -3188,9 +3247,6 @@ The connection to the client has been closed."
// We can consider supporting it when kafka itself starts to support it but we will need to be very
// careful to correctly implement the pagination/cursor logic.
ApiKey::DescribeTopicPartitionsKey as i16,
// This message type is part of the new consumer group API, we should implement support for it in the future.
// I've disabled it for now to keep the scope down for kafka 3.9 support.
ApiKey::ConsumerGroupDescribeKey as i16,
];
api_versions
.api_keys
Expand Down
32 changes: 30 additions & 2 deletions shotover/src/transforms/kafka/sink_cluster/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use kafka_protocol::messages::{
delete_records_request::DeleteRecordsTopic, describe_producers_request::TopicRequest,
list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup,
offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, DeleteRecordsRequest,
DescribeGroupsRequest, DescribeLogDirsRequest, DescribeProducersRequest,
AddPartitionsToTxnRequest, BrokerId, ConsumerGroupDescribeRequest, DeleteGroupsRequest,
DeleteRecordsRequest, DescribeGroupsRequest, DescribeLogDirsRequest, DescribeProducersRequest,
DescribeTransactionsRequest, GroupId, ListGroupsRequest, ListOffsetsRequest,
ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest,
TopicName, TransactionalId,
Expand Down Expand Up @@ -393,3 +393,31 @@ impl RequestSplitAndRouter for DescribeGroupsSplitAndRouter {
request.groups = item;
}
}

pub struct ConsumerGroupDescribeSplitAndRouter;

impl RequestSplitAndRouter for ConsumerGroupDescribeSplitAndRouter {
type Request = ConsumerGroupDescribeRequest;
type SubRequests = Vec<GroupId>;

fn split_by_destination(
transform: &mut KafkaSinkCluster,
request: &mut Self::Request,
) -> HashMap<BrokerId, Self::SubRequests> {
transform.split_consumer_group_describe_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ConsumerGroupDescribe(request),
..
})) => request,
_ => unreachable!(),
}
}

fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
request.group_ids = item;
}
}

0 comments on commit 9459ef0

Please sign in to comment.