Skip to content

Commit

Permalink
KafkaSinkCluster split DeleteGroups request (#1785)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Oct 28, 2024
1 parent 2e91cfb commit 1b52506
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 16 deletions.
4 changes: 3 additions & 1 deletion shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {

/// Connects an admin client through `connection_builder` and issues `delete_groups`
/// for the groups listed below.
// NOTE(review): this span comes from a rendered diff; the single-group call below looks like
// the pre-change line that the three-group call replaces — confirm against the real file.
async fn admin_cleanup(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin.delete_groups(&["some_group"]).await;
admin
.delete_groups(&["some_group", "some_group1", "consumer_group_with_offsets"])
.await;
}

/// Attempt to make the driver batch produce requests for different topics into the same request
Expand Down
78 changes: 66 additions & 12 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ use kafka_protocol::messages::produce_response::{
};
use kafka_protocol::messages::{
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
BrokerId, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest,
FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest,
LeaveGroupRequest, ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse,
OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse,
RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest,
SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, EndTxnRequest, FetchRequest,
FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListOffsetsRequest,
ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetForLeaderEpochRequest,
OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader,
SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest,
TopicName, TransactionalId, TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
Expand All @@ -46,8 +47,9 @@ use scram_over_mtls::{
use serde::{Deserialize, Serialize};
use shotover_node::{ShotoverNode, ShotoverNodeConfig};
use split::{
AddPartitionsToTxnRequestSplitAndRouter, ListOffsetsRequestSplitAndRouter,
OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
ListOffsetsRequestSplitAndRouter, OffsetForLeaderEpochRequestSplitAndRouter,
ProduceRequestSplitAndRouter, RequestSplitAndRouter,
};
use std::collections::{HashMap, HashSet, VecDeque};
use std::hash::Hasher;
Expand Down Expand Up @@ -709,6 +711,14 @@ impl KafkaSinkCluster {
})) => {
self.store_group(&mut groups, group_id.clone());
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DeleteGroups(delete_groups),
..
})) => {
for group_id in &delete_groups.groups_names {
self.store_group(&mut groups, group_id.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body:
RequestBody::InitProducerId(InitProducerIdRequest {
Expand Down Expand Up @@ -924,12 +934,10 @@ impl KafkaSinkCluster {
self.route_to_group_coordinator(message, group_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DeleteGroups(groups),
body: RequestBody::DeleteGroups(_),
..
})) => {
// TODO: we need to split this up into multiple requests so it can be correctly routed to all possible nodes
let group_id = groups.groups_names.first().unwrap().clone();
self.route_to_group_coordinator(message, group_id);
self.split_and_route_request::<DeleteGroupsSplitAndRouter>(message)?;
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::TxnOffsetCommit(txn_offset_commit),
Expand Down Expand Up @@ -1374,6 +1382,29 @@ impl KafkaSinkCluster {
result
}

/// Drains every group id out of the DeleteGroups request and buckets the ids by destination broker.
///
/// A group with a known coordinator is bucketed under that coordinator's `BrokerId`.
/// A group with no known coordinator is bucketed under `BrokerId(-1)` and a warning is logged.
/// After this call `body.groups_names` is empty.
fn split_delete_groups_request_by_destination(
    &mut self,
    body: &mut DeleteGroupsRequest,
) -> HashMap<BrokerId, Vec<GroupId>> {
    let mut groups_by_broker: HashMap<BrokerId, Vec<GroupId>> = HashMap::new();

    for group_id in body.groups_names.drain(..) {
        // Resolve the destination first so the bucket insertion is written only once.
        let broker = match self.group_to_coordinator_broker.get(&group_id) {
            Some(coordinator) => *coordinator,
            None => {
                tracing::warn!("no known coordinator for group {group_id:?}, routing message to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
                // -1 is the marker used for unroutable entries.
                BrokerId(-1)
            }
        };

        groups_by_broker.entry(broker).or_default().push(group_id);
    }

    groups_by_broker
}

/// This method removes all topics from the offset for leader epoch request and returns them split up by their destination.
/// If any topics are unroutable they will have their BrokerId set to -1
fn split_offset_for_leader_epoch_request_by_destination(
Expand Down Expand Up @@ -1913,6 +1944,10 @@ impl KafkaSinkCluster {
body: ResponseBody::Produce(base),
..
})) => Self::combine_produce_responses(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DeleteGroups(base),
..
})) => Self::combine_delete_groups_responses(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::AddPartitionsToTxn(base),
version,
Expand Down Expand Up @@ -2095,6 +2130,25 @@ impl KafkaSinkCluster {
Ok(())
}

/// Folds the `results` of every DeleteGroups response yielded by `drain` into `base_delete_groups`.
///
/// Messages in `drain` whose frame is not a DeleteGroups response are skipped.
/// Always returns `Ok(())`.
fn combine_delete_groups_responses(
    base_delete_groups: &mut DeleteGroupsResponse,
    drain: impl Iterator<Item = Message>,
) -> Result<()> {
    for mut next in drain {
        let Some(Frame::Kafka(KafkaFrame::Response {
            body: ResponseBody::DeleteGroups(next_delete_groups),
            ..
        })) = next.frame()
        else {
            // Not a DeleteGroups response, nothing to merge from this message.
            continue;
        };

        // Move the results out of the follow-up response rather than cloning them.
        let moved_results = std::mem::take(&mut next_delete_groups.results);
        base_delete_groups.results.extend(moved_results);
    }

    Ok(())
}

fn combine_add_partitions_to_txn(
base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse,
drain: impl Iterator<Item = Message>,
Expand Down
32 changes: 30 additions & 2 deletions shotover/src/transforms/kafka/sink_cluster/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::{
use kafka_protocol::messages::{
add_partitions_to_txn_request::AddPartitionsToTxnTransaction,
list_offsets_request::ListOffsetsTopic, offset_for_leader_epoch_request::OffsetForLeaderTopic,
produce_request::TopicProduceData, AddPartitionsToTxnRequest, BrokerId, ListOffsetsRequest,
OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
produce_request::TopicProduceData, AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest,
GroupId, ListOffsetsRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
};
use std::collections::HashMap;

Expand Down Expand Up @@ -136,3 +136,31 @@ impl RequestSplitAndRouter for OffsetForLeaderEpochRequestSplitAndRouter {
request.topics = item;
}
}

/// `RequestSplitAndRouter` implementation for DeleteGroups requests.
/// The per-destination sub request is the list of group ids bound for that broker.
pub struct DeleteGroupsSplitAndRouter;

impl RequestSplitAndRouter for DeleteGroupsSplitAndRouter {
    type Request = DeleteGroupsRequest;
    type SubRequests = Vec<GroupId>;

    /// Delegates to the transform, which drains the group ids out of `request`
    /// and returns them keyed by destination broker.
    fn split_by_destination(
        transform: &mut KafkaSinkCluster,
        request: &mut Self::Request,
    ) -> HashMap<BrokerId, Self::SubRequests> {
        transform.split_delete_groups_request_by_destination(request)
    }

    /// Returns the DeleteGroups request body of the message,
    /// or `None` when the message holds any other kind of frame.
    fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
        if let Some(Frame::Kafka(KafkaFrame::Request {
            body: RequestBody::DeleteGroups(delete_groups),
            ..
        })) = request.frame()
        {
            Some(delete_groups)
        } else {
            None
        }
    }

    /// Installs the group ids for a single destination as the request's group list.
    fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
        request.groups_names = item;
    }
}
2 changes: 1 addition & 1 deletion test-helpers/src/connection/kafka/cpp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ impl KafkaAdminCpp {
// The cpp driver will lock up when running certain commands after a delete_groups if the delete_groups is targeted at a group that doesn't exist.
// So just make sure to run it against a group that does exist.
.delete_groups(
&["some_group"],
to_delete,
&AdminOptions::new()
.operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
)
Expand Down

0 comments on commit 1b52506

Please sign in to comment.