diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index 028d2ddb2..afe7c5314 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -42,6 +42,10 @@ use scram_over_mtls::{
 };
 use serde::{Deserialize, Serialize};
 use shotover_node::{ShotoverNode, ShotoverNodeConfig};
+use split::{
+    AddPartitionsToTxnRequestSplitAndRouter, ListOffsetsRequestSplitAndRouter,
+    ProduceRequestSplitAndRouter, RequestSplitAndRouter,
+};
 use std::collections::{HashMap, VecDeque};
 use std::hash::Hasher;
 use std::sync::atomic::AtomicI64;
@@ -54,6 +58,7 @@ mod connections;
 mod kafka_node;
 mod scram_over_mtls;
 pub mod shotover_node;
+pub(crate) mod split;
 
 const SASL_SCRAM_MECHANISMS: [&str; 2] = ["SCRAM-SHA-256", "SCRAM-SHA-512"];
 
@@ -271,7 +276,7 @@ impl AtomicBrokerId {
     }
 }
 
-struct KafkaSinkCluster {
+pub(crate) struct KafkaSinkCluster {
     first_contact_points: Vec<KafkaAddress>,
     shotover_nodes: Vec<ShotoverNode>,
     rack: StrBytes,
@@ -817,7 +822,7 @@ impl KafkaSinkCluster {
             Some(Frame::Kafka(KafkaFrame::Request {
                 body: RequestBody::Produce(_),
                 ..
-            })) => self.route_produce_request(message)?,
+            })) => self.split_and_route_request::<ProduceRequestSplitAndRouter>(message)?,
             Some(Frame::Kafka(KafkaFrame::Request {
                 body: RequestBody::Fetch(_),
                 ..
@@ -825,7 +830,7 @@ impl KafkaSinkCluster {
             Some(Frame::Kafka(KafkaFrame::Request {
                 body: RequestBody::ListOffsets(_),
                 ..
-            })) => self.route_list_offsets(message)?,
+            })) => self.split_and_route_request::<ListOffsetsRequestSplitAndRouter>(message)?,
 
             // route to group coordinator
             Some(Frame::Kafka(KafkaFrame::Request {
@@ -940,33 +945,32 @@ impl KafkaSinkCluster {
         });
     }
 
-    fn route_produce_request(&mut self, mut message: Message) -> Result<()> {
-        if let Some(Frame::Kafka(KafkaFrame::Request {
-            body: RequestBody::Produce(produce),
-            ..
-        })) = message.frame()
-        {
-            let routing = self.split_produce_request_by_destination(produce);
+    fn split_and_route_request<T: RequestSplitAndRouter>(
+        &mut self,
+        mut request: Message,
+    ) -> Result<()> {
+        if let Some(request_frame) = T::get_request_frame(&mut request) {
+            let routing = T::split_by_destination(self, request_frame);
 
             if routing.is_empty() {
                 // Produce contains no topics, so we can just pick a random destination.
-                // The message is unchanged so we can just send as is.
+                // The request is unchanged so we can just send as is.
                 let destination = random_broker_id(&self.nodes, &mut self.rng);
 
                 self.pending_requests.push_back(PendingRequest {
-                    state: PendingRequestState::routed(destination, message),
+                    state: PendingRequestState::routed(destination, request),
                     ty: PendingRequestTy::Other,
                     combine_responses: 1,
                 });
                 tracing::debug!(
-                    "Routing produce request to random broker {} due to being empty",
+                    "Routing request to random broker {} due to being empty",
                     destination.0
                 );
             } else if routing.len() == 1 {
                 // Only 1 destination,
-                // so we can just reconstruct the original message as is,
+                // so we can just reconstruct the original request as is,
                 // act like this never happened 😎,
-                // we dont even need to invalidate the message's cache.
+                // we don't even need to invalidate the request's cache.
                 let (destination, topic_data) = routing.into_iter().next().unwrap();
                 let destination = if destination == -1 {
                     random_broker_id(&self.nodes, &mut self.rng)
@@ -974,21 +978,18 @@ impl KafkaSinkCluster {
                     destination
                 };
 
-                produce.topic_data = topic_data;
+                T::reassemble(request_frame, topic_data);
                 self.pending_requests.push_back(PendingRequest {
-                    state: PendingRequestState::routed(destination, message),
+                    state: PendingRequestState::routed(destination, request),
                     ty: PendingRequestTy::Other,
                     combine_responses: 1,
                 });
-                tracing::debug!(
-                    "Routing produce request to single broker {:?}",
-                    destination.0
-                );
+                tracing::debug!("Routing request to single broker {:?}", destination.0);
             } else {
-                // The message has been split so it may be delivered to multiple destinations.
-                // We must generate a unique message for each destination.
+                // The request has been split so it may be delivered to multiple destinations.
+                // We must generate a unique request for each destination.
                 let combine_responses = routing.len();
-                message.invalidate_cache();
+                request.invalidate_cache();
                 for (i, (destination, topic_data)) in routing.into_iter().enumerate() {
                     let destination = if destination == -1 {
                         random_broker_id(&self.nodes, &mut self.rng)
@@ -996,17 +997,13 @@ impl KafkaSinkCluster {
                         destination
                     };
                     let mut request = if i == 0 {
-                        // First message acts as base and retains message id
-                        message.clone()
+                        // First request acts as base and retains message id
+                        request.clone()
                     } else {
-                        message.clone_with_new_id()
+                        request.clone_with_new_id()
                     };
-                    if let Some(Frame::Kafka(KafkaFrame::Request {
-                        body: RequestBody::Produce(produce),
-                        ..
-                    })) = request.frame()
-                    {
-                        produce.topic_data = topic_data;
+                    if let Some(request_frame) = T::get_request_frame(&mut request) {
+                        T::reassemble(request_frame, topic_data)
                     }
                     self.pending_requests.push_back(PendingRequest {
                         state: PendingRequestState::routed(destination, request),
@@ -1014,10 +1011,9 @@ impl KafkaSinkCluster {
                         combine_responses,
                     });
                 }
-                tracing::debug!("Routing produce request to multiple brokers");
+                tracing::debug!("Routing request to multiple brokers");
             }
         }
-
         Ok(())
     }
 
@@ -1297,88 +1293,6 @@ impl KafkaSinkCluster {
         result
     }
 
-    fn route_list_offsets(&mut self, mut request: Message) -> Result<()> {
-        if let Some(Frame::Kafka(KafkaFrame::Request {
-            body: RequestBody::ListOffsets(list_offsets),
-            ..
-        })) = request.frame()
-        {
-            let routing = self.split_list_offsets_request_by_destination(list_offsets);
-
-            if routing.is_empty() {
-                // ListOffsets contains no topics, so we can just pick a random destination.
-                // The message is unchanged so we can just send as is.
-                let destination = random_broker_id(&self.nodes, &mut self.rng);
-
-                self.pending_requests.push_back(PendingRequest {
-                    state: PendingRequestState::routed(destination, request),
-                    // we dont need special handling for list_offsets, so just use Other
-                    ty: PendingRequestTy::Other,
-                    combine_responses: 1,
-                });
-                tracing::debug!(
-                    "Routing ListOffsets request to random broker {} due to being empty",
-                    destination.0
-                );
-            } else if routing.len() == 1 {
-                // Only 1 destination,
-                // so we can just reconstruct the original message as is,
-                // act like this never happened 😎,
-                // we dont even need to invalidate the message's cache.
-                let (destination, topics) = routing.into_iter().next().unwrap();
-                let destination = if destination == -1 {
-                    random_broker_id(&self.nodes, &mut self.rng)
-                } else {
-                    destination
-                };
-
-                list_offsets.topics = topics;
-                self.pending_requests.push_back(PendingRequest {
-                    state: PendingRequestState::routed(destination, request),
-                    // we dont need special handling for ListOffsets, so just use Other
-                    ty: PendingRequestTy::Other,
-                    combine_responses: 1,
-                });
-                tracing::debug!(
-                    "Routing ListOffsets request to single broker {}",
-                    destination.0
-                );
-            } else {
-                // The message has been split so it may be delivered to multiple destinations.
-                // We must generate a unique message for each destination.
-                let combine_responses = routing.len();
-                request.invalidate_cache();
-                for (i, (destination, topics)) in routing.into_iter().enumerate() {
-                    let destination = if destination == -1 {
-                        random_broker_id(&self.nodes, &mut self.rng)
-                    } else {
-                        destination
-                    };
-                    let mut request = if i == 0 {
-                        // First message acts as base and retains message id
-                        request.clone()
-                    } else {
-                        request.clone_with_new_id()
-                    };
-                    if let Some(Frame::Kafka(KafkaFrame::Request {
-                        body: RequestBody::ListOffsets(list_offsets),
-                        ..
-                    })) = request.frame()
-                    {
-                        list_offsets.topics = topics;
-                    }
-                    self.pending_requests.push_back(PendingRequest {
-                        state: PendingRequestState::routed(destination, request),
-                        ty: PendingRequestTy::Other,
-                        combine_responses,
-                    });
-                }
-                tracing::debug!("Routing ListOffsets request to multiple brokers");
-            }
-        }
-        Ok(())
-    }
-
     /// This method removes all transactions from the AddPartitionsToTxn request and returns them split up by their destination
     /// If any topics are unroutable they will have their BrokerId set to -1
     fn split_add_partition_to_txn_request_by_destination(
@@ -1418,78 +1332,7 @@ impl KafkaSinkCluster {
                 let transaction_id = body.v3_and_below_transactional_id.clone();
                 self.route_to_transaction_coordinator(request, transaction_id);
             } else {
-                let routing = self.split_add_partition_to_txn_request_by_destination(body);
-
-                if routing.is_empty() {
-                    // ListOffsets contains no topics, so we can just pick a random destination.
-                    // The message is unchanged so we can just send as is.
-                    let destination = random_broker_id(&self.nodes, &mut self.rng);
-
-                    self.pending_requests.push_back(PendingRequest {
-                        state: PendingRequestState::routed(destination, request),
-                        // we dont need special handling for list_offsets, so just use Other
-                        ty: PendingRequestTy::Other,
-                        combine_responses: 1,
-                    });
-                    tracing::debug!(
-                        "Routing AddPartitionsToTxn request to random broker {} due to being empty",
-                        destination.0
-                    );
-                } else if routing.len() == 1 {
-                    // Only 1 destination,
-                    // so we can just reconstruct the original message as is,
-                    // act like this never happened 😎,
-                    // we dont even need to invalidate the message's cache.
-                    let (destination, transactions) = routing.into_iter().next().unwrap();
-                    let destination = if destination == -1 {
-                        random_broker_id(&self.nodes, &mut self.rng)
-                    } else {
-                        destination
-                    };
-
-                    body.transactions = transactions;
-                    self.pending_requests.push_back(PendingRequest {
-                        state: PendingRequestState::routed(destination, request),
-                        // we dont need special handling for ListOffsets, so just use Other
-                        ty: PendingRequestTy::Other,
-                        combine_responses: 1,
-                    });
-                    tracing::debug!(
-                        "Routing AddPartitionsToTxn request to single broker {}",
-                        destination.0
-                    );
-                } else {
-                    // The message has been split so it may be delivered to multiple destinations.
-                    // We must generate a unique message for each destination.
-                    let combine_responses = routing.len();
-                    request.invalidate_cache();
-                    for (i, (destination, transactions)) in routing.into_iter().enumerate() {
-                        let destination = if destination == -1 {
-                            random_broker_id(&self.nodes, &mut self.rng)
-                        } else {
-                            destination
-                        };
-                        let mut request = if i == 0 {
-                            // First message acts as base and retains message id
-                            request.clone()
-                        } else {
-                            request.clone_with_new_id()
-                        };
-                        if let Some(Frame::Kafka(KafkaFrame::Request {
-                            body: RequestBody::AddPartitionsToTxn(body),
-                            ..
-                        })) = request.frame()
-                        {
-                            body.transactions = transactions;
-                        }
-                        self.pending_requests.push_back(PendingRequest {
-                            state: PendingRequestState::routed(destination, request),
-                            ty: PendingRequestTy::Other,
-                            combine_responses,
-                        });
-                    }
-                    tracing::debug!("Routing AddPartitionsToTxn request to multiple brokers");
-                }
+                self.split_and_route_request::<AddPartitionsToTxnRequestSplitAndRouter>(request)?;
             }
         }
         Ok(())
diff --git a/shotover/src/transforms/kafka/sink_cluster/split.rs b/shotover/src/transforms/kafka/sink_cluster/split.rs
new file mode 100644
index 000000000..dc3255e8b
--- /dev/null
+++ b/shotover/src/transforms/kafka/sink_cluster/split.rs
@@ -0,0 +1,113 @@
+use super::KafkaSinkCluster;
+use crate::{
+    frame::{
+        kafka::{KafkaFrame, RequestBody},
+        Frame,
+    },
+    message::Message,
+};
+use kafka_protocol::{
+    indexmap::IndexMap,
+    messages::{
+        add_partitions_to_txn_request::AddPartitionsToTxnTransaction,
+        list_offsets_request::ListOffsetsTopic, produce_request::TopicProduceData,
+        AddPartitionsToTxnRequest, BrokerId, ListOffsetsRequest, ProduceRequest, TopicName,
+        TransactionalId,
+    },
+};
+use std::collections::HashMap;
+
+pub trait RequestSplitAndRouter {
+    type SubRequests;
+    type Request;
+    fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request>;
+    fn split_by_destination(
+        transform: &mut KafkaSinkCluster,
+        request: &mut Self::Request,
+    ) -> HashMap<BrokerId, Self::SubRequests>;
+    fn reassemble(request: &mut Self::Request, item: Self::SubRequests);
+}
+
+pub struct ProduceRequestSplitAndRouter;
+
+impl RequestSplitAndRouter for ProduceRequestSplitAndRouter {
+    type Request = ProduceRequest;
+    type SubRequests = IndexMap<TopicName, TopicProduceData>;
+
+    fn split_by_destination(
+        transform: &mut KafkaSinkCluster,
+        request: &mut Self::Request,
+    ) -> HashMap<BrokerId, Self::SubRequests> {
+        transform.split_produce_request_by_destination(request)
+    }
+
+    fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
+        match request.frame() {
+            Some(Frame::Kafka(KafkaFrame::Request {
+                body: RequestBody::Produce(request),
+                ..
+            })) => Some(request),
+            _ => None,
+        }
+    }
+
+    fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
+        request.topic_data = item;
+    }
+}
+
+pub struct AddPartitionsToTxnRequestSplitAndRouter;
+
+impl RequestSplitAndRouter for AddPartitionsToTxnRequestSplitAndRouter {
+    type Request = AddPartitionsToTxnRequest;
+    type SubRequests = IndexMap<TransactionalId, AddPartitionsToTxnTransaction>;
+
+    fn split_by_destination(
+        transform: &mut KafkaSinkCluster,
+        request: &mut Self::Request,
+    ) -> HashMap<BrokerId, Self::SubRequests> {
+        transform.split_add_partition_to_txn_request_by_destination(request)
+    }
+
+    fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
+        match request.frame() {
+            Some(Frame::Kafka(KafkaFrame::Request {
+                body: RequestBody::AddPartitionsToTxn(request),
+                ..
+            })) => Some(request),
+            _ => None,
+        }
+    }
+
+    fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
+        request.transactions = item;
+    }
+}
+
+pub struct ListOffsetsRequestSplitAndRouter;
+
+impl RequestSplitAndRouter for ListOffsetsRequestSplitAndRouter {
+    type Request = ListOffsetsRequest;
+    type SubRequests = Vec<ListOffsetsTopic>;
+
+    fn split_by_destination(
+        transform: &mut KafkaSinkCluster,
+        request: &mut Self::Request,
+    ) -> HashMap<BrokerId, Self::SubRequests> {
+        transform.split_list_offsets_request_by_destination(request)
+    }
+
+    fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
+        match request.frame() {
+            Some(Frame::Kafka(KafkaFrame::Request {
+                body: RequestBody::ListOffsets(request),
+                ..
+            })) => Some(request),
+            _ => None,
+        }
+    }
+
+    fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
+        request.topics = item;
+    }
+}
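
For reference, here is a minimal, self-contained sketch of the pattern this diff introduces. Every name in it (ToyRequest, ToyRouter, SplitAndRoute, split_and_route, the hard-coded broker ids) is a hypothetical stand-in rather than Shotover's real Message/KafkaSinkCluster API, and it deliberately skips the real code's frame extraction, the random-broker fallback for unroutable (-1) destinations, and the message-id/combine_responses bookkeeping:

use std::collections::HashMap;

type BrokerId = i32;

// Toy stand-in for a routable request: each topic records the broker that owns it.
#[derive(Clone, Debug)]
struct ToyRequest {
    topics: Vec<(String, BrokerId)>,
}

// Mirrors the trait above, minus the Message-frame extraction step.
trait SplitAndRoute {
    type Request: Clone;
    type SubRequests;
    fn split_by_destination(request: &Self::Request) -> HashMap<BrokerId, Self::SubRequests>;
    fn reassemble(request: &mut Self::Request, item: Self::SubRequests);
}

struct ToyRouter;

impl SplitAndRoute for ToyRouter {
    type Request = ToyRequest;
    type SubRequests = Vec<(String, BrokerId)>;

    // Group the request's topics by the broker that owns each one.
    fn split_by_destination(request: &ToyRequest) -> HashMap<BrokerId, Self::SubRequests> {
        let mut routing: HashMap<BrokerId, Self::SubRequests> = HashMap::new();
        for entry in &request.topics {
            routing.entry(entry.1).or_default().push(entry.clone());
        }
        routing
    }

    // Overwrite the request body with the per-broker subset.
    fn reassemble(request: &mut ToyRequest, item: Self::SubRequests) {
        request.topics = item;
    }
}

// One generic driver for all request types, mirroring split_and_route_request:
// zero destinations -> send anywhere, one -> send as-is, many -> one clone per broker.
fn split_and_route<T: SplitAndRoute>(request: T::Request) -> Vec<(BrokerId, T::Request)> {
    let routing = T::split_by_destination(&request);
    match routing.len() {
        // No routable entries: any broker will do (the real code picks a random one).
        0 => vec![(0, request)],
        // Exactly one destination: reuse the original request unchanged apart from reassembly.
        1 => {
            let (destination, sub) = routing.into_iter().next().unwrap();
            let mut request = request;
            T::reassemble(&mut request, sub);
            vec![(destination, request)]
        }
        // Multiple destinations: clone the request once per broker
        // (the real code also tracks how many responses to combine).
        _ => routing
            .into_iter()
            .map(|(destination, sub)| {
                let mut clone = request.clone();
                T::reassemble(&mut clone, sub);
                (destination, clone)
            })
            .collect(),
    }
}

fn main() {
    let request = ToyRequest {
        topics: vec![
            ("orders".into(), 1),
            ("payments".into(), 2),
            ("orders-dlq".into(), 1),
        ],
    };
    // Broker 1 should receive orders + orders-dlq, broker 2 should receive payments.
    for (broker, sub) in split_and_route::<ToyRouter>(request) {
        println!("broker {broker} <- {:?}", sub.topics);
    }
}

The design point, visible in the mod.rs hunks above, is that the three-way routing logic (no destinations, one destination, many destinations) is now written once in a generic function, while each request type only supplies how to extract its frame, split its payload by broker, and write a per-broker subset back.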