Skip to content

Commit

Permalink
KafkaSinkCluster: route ConsumerGroupDescribe request
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Nov 20, 2024
1 parent c4178d6 commit e3fe14a
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 14 deletions.
76 changes: 66 additions & 10 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ use kafka_protocol::messages::produce_response::{
};
use kafka_protocol::messages::{
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
BrokerId, ConsumerGroupHeartbeatRequest, DeleteGroupsRequest, DeleteGroupsResponse,
DeleteRecordsRequest, DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest,
DescribeGroupsResponse, DescribeProducersRequest, DescribeProducersResponse,
DescribeTransactionsRequest, DescribeTransactionsResponse, EndTxnRequest, FetchRequest,
FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
BrokerId, ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse,
ConsumerGroupHeartbeatRequest, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest,
DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest, DescribeGroupsResponse,
DescribeProducersRequest, DescribeProducersResponse, DescribeTransactionsRequest,
DescribeTransactionsResponse, EndTxnRequest, FetchRequest, FetchResponse,
FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest,
MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest,
Expand All @@ -57,8 +58,8 @@ use scram_over_mtls::{
use serde::{Deserialize, Serialize};
use shotover_node::{ShotoverNode, ShotoverNodeConfig};
use split::{
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
DeleteRecordsRequestSplitAndRouter, DescribeGroupsSplitAndRouter,
AddPartitionsToTxnRequestSplitAndRouter, ConsumerGroupDescribeSplitAndRouter,
DeleteGroupsSplitAndRouter, DeleteRecordsRequestSplitAndRouter, DescribeGroupsSplitAndRouter,
DescribeProducersRequestSplitAndRouter, DescribeTransactionsSplitAndRouter,
ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter,
OffsetFetchSplitAndRouter, OffsetForLeaderEpochRequestSplitAndRouter,
Expand Down Expand Up @@ -748,6 +749,14 @@ impl KafkaSinkCluster {
})) => {
self.store_group(&mut groups, group_id.clone());
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ConsumerGroupDescribe(describe),
..
})) => {
for group_id in &describe.group_ids {
self.store_group(&mut groups, group_id.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DeleteGroups(delete_groups),
..
Expand Down Expand Up @@ -1017,6 +1026,12 @@ impl KafkaSinkCluster {
})) => {
self.split_and_route_request::<DescribeGroupsSplitAndRouter>(request)?;
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ConsumerGroupDescribe(_),
..
})) => {
self.split_and_route_request::<ConsumerGroupDescribeSplitAndRouter>(request)?;
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetDelete(offset_delete),
..
Expand Down Expand Up @@ -1735,6 +1750,29 @@ The connection to the client has been closed."
result
}

/// This method removes all groups from the ConsumerGroupDescribe request and returns them split up by their destination.
/// If any groups are unroutable they will have their BrokerId set to -1
fn split_consumer_group_describe_request_by_destination(
&mut self,
body: &mut ConsumerGroupDescribeRequest,
) -> HashMap<BrokerId, Vec<GroupId>> {
let mut result: HashMap<BrokerId, Vec<GroupId>> = Default::default();

for group in body.group_ids.drain(..) {
if let Some(destination) = self.group_to_coordinator_broker.get(&group) {
let dest_groups = result.entry(*destination).or_default();
dest_groups.push(group);
} else {
tracing::warn!("no known coordinator for group {group:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
let destination = BrokerId(-1);
let dest_groups = result.entry(destination).or_default();
dest_groups.push(group);
}
}

result
}

/// This method removes all topics from the list offsets request and returns them split up by their destination
/// If any topics are unroutable they will have their BrokerId set to -1
fn split_offset_for_leader_epoch_request_by_destination(
Expand Down Expand Up @@ -2358,6 +2396,10 @@ The connection to the client has been closed."
body: ResponseBody::DescribeGroups(base),
..
})) => Self::combine_describe_groups(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ConsumerGroupDescribe(base),
..
})) => Self::combine_consumer_group_describe(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::AddPartitionsToTxn(base),
version,
Expand Down Expand Up @@ -2739,6 +2781,23 @@ The connection to the client has been closed."
Ok(())
}

fn combine_consumer_group_describe(
base: &mut ConsumerGroupDescribeResponse,
drain: impl Iterator<Item = Message>,
) -> Result<()> {
for mut next in drain {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ConsumerGroupDescribe(next),
..
})) = next.frame()
{
base.groups.extend(std::mem::take(&mut next.groups));
}
}

Ok(())
}

fn combine_add_partitions_to_txn(
base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse,
drain: impl Iterator<Item = Message>,
Expand Down Expand Up @@ -3117,9 +3176,6 @@ The connection to the client has been closed."
// We can consider supporting it when kafka itself starts to support it but we will need to be very
// careful to correctly implement the pagination/cursor logic.
ApiKey::DescribeTopicPartitionsKey as i16,
// This message type is part of the new consumer group API, we should implement support for it in the future.
// I've disabled it for now to keep the scope down for kafka 3.9 support.
ApiKey::ConsumerGroupDescribeKey as i16,
];
api_versions
.api_keys
Expand Down
37 changes: 33 additions & 4 deletions shotover/src/transforms/kafka/sink_cluster/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ use kafka_protocol::messages::{
delete_records_request::DeleteRecordsTopic, describe_producers_request::TopicRequest,
list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup,
offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, DeleteRecordsRequest,
DescribeGroupsRequest, DescribeProducersRequest, DescribeTransactionsRequest, GroupId,
ListGroupsRequest, ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest,
OffsetForLeaderEpochRequest, ProduceRequest, TopicName, TransactionalId,
AddPartitionsToTxnRequest, BrokerId, ConsumerGroupDescribeRequest, DeleteGroupsRequest,
DeleteRecordsRequest, DescribeGroupsRequest, DescribeProducersRequest,
DescribeTransactionsRequest, GroupId, ListGroupsRequest, ListOffsetsRequest,
ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest,
TopicName, TransactionalId,
};
use std::collections::HashMap;

Expand Down Expand Up @@ -364,3 +365,31 @@ impl RequestSplitAndRouter for DescribeGroupsSplitAndRouter {
request.groups = item;
}
}

pub struct ConsumerGroupDescribeSplitAndRouter;

impl RequestSplitAndRouter for ConsumerGroupDescribeSplitAndRouter {
type Request = ConsumerGroupDescribeRequest;
type SubRequests = Vec<GroupId>;

fn split_by_destination(
transform: &mut KafkaSinkCluster,
request: &mut Self::Request,
) -> HashMap<BrokerId, Self::SubRequests> {
transform.split_consumer_group_describe_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ConsumerGroupDescribe(request),
..
})) => request,
_ => unreachable!(),
}
}

fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
request.group_ids = item;
}
}

0 comments on commit e3fe14a

Please sign in to comment.