diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 6ccbbb7fd..26d636abe 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -19,6 +19,7 @@ use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseL use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic; use kafka_protocol::messages::metadata_request::MetadataRequestTopic; use kafka_protocol::messages::metadata_response::MetadataResponseBroker; +use kafka_protocol::messages::offset_fetch_request::OffsetFetchRequestGroup; use kafka_protocol::messages::offset_for_leader_epoch_request::OffsetForLeaderTopic; use kafka_protocol::messages::produce_request::TopicProduceData; use kafka_protocol::messages::produce_response::{ @@ -29,10 +30,10 @@ use kafka_protocol::messages::{ BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListOffsetsRequest, - ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetForLeaderEpochRequest, - OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader, - SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, - TopicName, TransactionalId, TxnOffsetCommitRequest, + ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest, + OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, + ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, + SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest, }; use kafka_protocol::protocol::StrBytes; use kafka_protocol::ResponseError; @@ -48,8 +49,8 @@ use serde::{Deserialize, Serialize}; use shotover_node::{ShotoverNode, ShotoverNodeConfig}; use split::{ AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter, - ListOffsetsRequestSplitAndRouter, OffsetForLeaderEpochRequestSplitAndRouter, - ProduceRequestSplitAndRouter, RequestSplitAndRouter, + ListOffsetsRequestSplitAndRouter, OffsetFetchSplitAndRouter, + OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter, }; use std::collections::{HashMap, HashSet, VecDeque}; use std::hash::Hasher; @@ -899,18 +900,12 @@ impl KafkaSinkCluster { body: RequestBody::OffsetFetch(offset_fetch), header, })) => { - let group_id = if header.request_api_version <= 7 { - offset_fetch.group_id.clone() + if header.request_api_version <= 7 { + let group_id = offset_fetch.group_id.clone(); + self.route_to_group_coordinator(message, group_id); } else { - // This is possibly dangerous. - // The client could construct a message which is valid for a specific shotover node, but not for any single kafka broker. - // We may need to add some logic to split the request into multiple messages going to different destinations, - // and then reconstruct the response back into a single response - // - // For now just pick the first group as that is sufficient for the simple cases. - offset_fetch.groups.first().unwrap().group_id.clone() + self.split_and_route_request::(message)?; }; - self.route_to_group_coordinator(message, group_id); } Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::OffsetCommit(offset_commit), @@ -1383,7 +1378,7 @@ impl KafkaSinkCluster { } /// This method removes all group ids from the DeleteGroups request and returns them split up by their destination. - /// If any topics are unroutable they will have their BrokerId set to -1 + /// If any groups ids are unroutable they will have their BrokerId set to -1 fn split_delete_groups_request_by_destination( &mut self, body: &mut DeleteGroupsRequest, @@ -1405,6 +1400,32 @@ impl KafkaSinkCluster { result } + /// This method removes all groups from the OffsetFetch request and returns them split up by their destination. + /// If any groups are unroutable they will have their BrokerId set to -1 + fn split_offset_fetch_request_by_destination( + &mut self, + body: &mut OffsetFetchRequest, + ) -> HashMap> { + let mut result: HashMap> = Default::default(); + + for group in body.groups.drain(..) { + if let Some(destination) = self.group_to_coordinator_broker.get(&group.group_id) { + let dest_groups = result.entry(*destination).or_default(); + dest_groups.push(group); + } else { + tracing::warn!( + "no known coordinator for group {:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client", + group.group_id + ); + let destination = BrokerId(-1); + let dest_groups = result.entry(destination).or_default(); + dest_groups.push(group); + } + } + + result + } + /// This method removes all topics from the list offsets request and returns them split up by their destination /// If any topics are unroutable they will have their BrokerId set to -1 fn split_offset_for_leader_epoch_request_by_destination( @@ -1948,6 +1969,10 @@ impl KafkaSinkCluster { body: ResponseBody::DeleteGroups(base), .. })) => Self::combine_delete_groups_responses(base, drain)?, + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::OffsetFetch(base), + .. + })) => Self::combine_offset_fetch(base, drain)?, Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::AddPartitionsToTxn(base), version, @@ -2149,6 +2174,25 @@ impl KafkaSinkCluster { Ok(()) } + fn combine_offset_fetch( + base_offset_fetch: &mut OffsetFetchResponse, + drain: impl Iterator, + ) -> Result<()> { + for mut next in drain { + if let Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::OffsetFetch(next_offset_fetch), + .. + })) = next.frame() + { + base_offset_fetch + .groups + .extend(std::mem::take(&mut next_offset_fetch.groups)) + } + } + + Ok(()) + } + fn combine_add_partitions_to_txn( base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse, drain: impl Iterator, @@ -2355,9 +2399,30 @@ impl KafkaSinkCluster { })) => self.handle_group_coordinator_routing_error(&request_ty, sync_group.error_code), Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::OffsetFetch(offset_fetch), + version, .. })) => { - self.handle_group_coordinator_routing_error(&request_ty, offset_fetch.error_code) + if *version <= 7 { + // group id is not provided in response, so use group id stored in request_ty + self.handle_group_coordinator_routing_error( + &request_ty, + offset_fetch.error_code, + ) + } else { + // group id is provided in response, so use group id in the response. + for group in &offset_fetch.groups { + let group_id = &group.group_id; + if let Some(ResponseError::NotCoordinator) = + ResponseError::try_from_code(group.error_code) + { + let broker_id = self + .group_to_coordinator_broker + .remove(group_id) + .map(|x| x.1); + tracing::info!("Response was error NOT_COORDINATOR and so cleared group id {group_id:?} coordinator mapping to broker {broker_id:?}"); + } + } + } } Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::JoinGroup(join_group), diff --git a/shotover/src/transforms/kafka/sink_cluster/split.rs b/shotover/src/transforms/kafka/sink_cluster/split.rs index d5af7a6a5..b5f03d97d 100644 --- a/shotover/src/transforms/kafka/sink_cluster/split.rs +++ b/shotover/src/transforms/kafka/sink_cluster/split.rs @@ -8,9 +8,10 @@ use crate::{ }; use kafka_protocol::messages::{ add_partitions_to_txn_request::AddPartitionsToTxnTransaction, - list_offsets_request::ListOffsetsTopic, offset_for_leader_epoch_request::OffsetForLeaderTopic, - produce_request::TopicProduceData, AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, - GroupId, ListOffsetsRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName, + list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup, + offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData, + AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, GroupId, ListOffsetsRequest, + OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName, }; use std::collections::HashMap; @@ -164,3 +165,31 @@ impl RequestSplitAndRouter for DeleteGroupsSplitAndRouter { request.groups_names = item; } } + +pub struct OffsetFetchSplitAndRouter; + +impl RequestSplitAndRouter for OffsetFetchSplitAndRouter { + type Request = OffsetFetchRequest; + type SubRequests = Vec; + + fn split_by_destination( + transform: &mut KafkaSinkCluster, + request: &mut Self::Request, + ) -> HashMap { + transform.split_offset_fetch_request_by_destination(request) + } + + fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> { + match request.frame() { + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::OffsetFetch(request), + .. + })) => Some(request), + _ => None, + } + } + + fn reassemble(request: &mut Self::Request, item: Self::SubRequests) { + request.groups = item; + } +}