diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index b027af4be..46652ae73 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -1125,70 +1125,68 @@ The connection to the client has been closed." &mut self, mut request: Message, ) -> Result<()> { - if let Some(request_frame) = T::get_request_frame(&mut request) { - let routing = T::split_by_destination(self, request_frame); + let request_frame = T::get_request_frame(&mut request); + let routing = T::split_by_destination(self, request_frame); - if routing.is_empty() { - // Produce contains no topics, so we can just pick a random destination. - // The request is unchanged so we can just send as is. - let destination = random_broker_id(&self.nodes, &mut self.rng); + if routing.is_empty() { + // Produce contains no topics, so we can just pick a random destination. + // The request is unchanged so we can just send as is. + let destination = random_broker_id(&self.nodes, &mut self.rng); - self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), - ty: PendingRequestTy::Other, - combine_responses: 1, - }); - tracing::debug!( - "Routing request to random broker {} due to being empty", - destination.0 - ); - } else if routing.len() == 1 { - // Only 1 destination, - // so we can just reconstruct the original request as is, - // act like this never happened 😎, - // we dont even need to invalidate the request's cache. - let (destination, topic_data) = routing.into_iter().next().unwrap(); + self.pending_requests.push_back(PendingRequest { + state: PendingRequestState::routed(destination, request), + ty: PendingRequestTy::Other, + combine_responses: 1, + }); + tracing::debug!( + "Routing request to random broker {} due to being empty", + destination.0 + ); + } else if routing.len() == 1 { + // Only 1 destination, + // so we can just reconstruct the original request as is, + // act like this never happened 😎, + // we dont even need to invalidate the request's cache. + let (destination, topic_data) = routing.into_iter().next().unwrap(); + let destination = if destination == -1 { + random_broker_id(&self.nodes, &mut self.rng) + } else { + destination + }; + + T::reassemble(request_frame, topic_data); + self.pending_requests.push_back(PendingRequest { + state: PendingRequestState::routed(destination, request), + ty: PendingRequestTy::Other, + combine_responses: 1, + }); + tracing::debug!("Routing request to single broker {:?}", destination.0); + } else { + // The request has been split so it may be delivered to multiple destinations. + // We must generate a unique request for each destination. + let combine_responses = routing.len(); + request.invalidate_cache(); + for (i, (destination, topic_data)) in routing.into_iter().enumerate() { let destination = if destination == -1 { random_broker_id(&self.nodes, &mut self.rng) } else { destination }; - + let mut request = if i == 0 { + // First request acts as base and retains message id + request.clone() + } else { + request.clone_with_new_id() + }; + let request_frame = T::get_request_frame(&mut request); T::reassemble(request_frame, topic_data); self.pending_requests.push_back(PendingRequest { state: PendingRequestState::routed(destination, request), ty: PendingRequestTy::Other, - combine_responses: 1, + combine_responses, }); - tracing::debug!("Routing request to single broker {:?}", destination.0); - } else { - // The request has been split so it may be delivered to multiple destinations. - // We must generate a unique request for each destination. - let combine_responses = routing.len(); - request.invalidate_cache(); - for (i, (destination, topic_data)) in routing.into_iter().enumerate() { - let destination = if destination == -1 { - random_broker_id(&self.nodes, &mut self.rng) - } else { - destination - }; - let mut request = if i == 0 { - // First request acts as base and retains message id - request.clone() - } else { - request.clone_with_new_id() - }; - if let Some(request_frame) = T::get_request_frame(&mut request) { - T::reassemble(request_frame, topic_data) - } - self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), - ty: PendingRequestTy::Other, - combine_responses, - }); - } - tracing::debug!("Routing request to multiple brokers"); } + tracing::debug!("Routing request to multiple brokers"); } Ok(()) } diff --git a/shotover/src/transforms/kafka/sink_cluster/split.rs b/shotover/src/transforms/kafka/sink_cluster/split.rs index 7b34bc0c3..4c634af4c 100644 --- a/shotover/src/transforms/kafka/sink_cluster/split.rs +++ b/shotover/src/transforms/kafka/sink_cluster/split.rs @@ -20,7 +20,7 @@ use std::collections::HashMap; pub trait RequestSplitAndRouter { type SubRequests; type Request; - fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request>; + fn get_request_frame(request: &mut Message) -> &mut Self::Request; fn split_by_destination( transform: &mut KafkaSinkCluster, request: &mut Self::Request, @@ -41,13 +41,13 @@ impl RequestSplitAndRouter for ProduceRequestSplitAndRouter { transform.split_produce_request_by_destination(request) } - fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> { + fn get_request_frame(request: &mut Message) -> &mut Self::Request { match request.frame() { Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::Produce(request), .. - })) => Some(request), - _ => None, + })) => request, + _ => unreachable!(), } } @@ -69,13 +69,13 @@ impl RequestSplitAndRouter for AddPartitionsToTxnRequestSplitAndRouter { transform.split_add_partition_to_txn_request_by_destination(request) } - fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> { + fn get_request_frame(request: &mut Message) -> &mut Self::Request { match request.frame() { Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::AddPartitionsToTxn(request), .. - })) => Some(request), - _ => None, + })) => request, + _ => unreachable!(), } } @@ -97,13 +97,13 @@ impl RequestSplitAndRouter for ListOffsetsRequestSplitAndRouter { transform.split_list_offsets_request_by_destination(request) } - fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> { + fn get_request_frame(request: &mut Message) -> &mut Self::Request { match request.frame() { Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::ListOffsets(request), .. - })) => Some(request), - _ => None, + })) => request, + _ => unreachable!(), } } @@ -125,13 +125,13 @@ impl RequestSplitAndRouter for OffsetForLeaderEpochRequestSplitAndRouter { transform.split_offset_for_leader_epoch_request_by_destination(request) } - fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> { + fn get_request_frame(request: &mut Message) -> &mut Self::Request { match request.frame() { Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::OffsetForLeaderEpoch(request), .. - })) => Some(request), - _ => None, + })) => request, + _ => unreachable!(), } } @@ -153,13 +153,13 @@ impl RequestSplitAndRouter for DeleteRecordsRequestSplitAndRouter { transform.split_delete_records_request_by_destination(request) } - fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> { + fn get_request_frame(request: &mut Message) -> &mut Self::Request { match request.frame() { Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::DeleteRecords(request), .. - })) => Some(request), - _ => None, + })) => request, + _ => unreachable!(), } } @@ -181,13 +181,13 @@ impl RequestSplitAndRouter for DeleteGroupsSplitAndRouter { transform.split_delete_groups_request_by_destination(request) } - fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> { + fn get_request_frame(request: &mut Message) -> &mut Self::Request { match request.frame() { Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::DeleteGroups(request), .. - })) => Some(request), - _ => None, + })) => request, + _ => unreachable!(), } } @@ -209,13 +209,13 @@ impl RequestSplitAndRouter for ListGroupsSplitAndRouter { transform.split_request_by_routing_to_all_brokers() } - fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> { + fn get_request_frame(request: &mut Message) -> &mut Self::Request { match request.frame() { Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::ListGroups(request), .. - })) => Some(request), - _ => None, + })) => request, + _ => unreachable!(), } } @@ -237,13 +237,13 @@ impl RequestSplitAndRouter for ListTransactionsSplitAndRouter { transform.split_request_by_routing_to_all_brokers() } - fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> { + fn get_request_frame(request: &mut Message) -> &mut Self::Request { match request.frame() { Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::ListTransactions(request), .. - })) => Some(request), - _ => None, + })) => request, + _ => unreachable!(), } } @@ -265,13 +265,13 @@ impl RequestSplitAndRouter for OffsetFetchSplitAndRouter { transform.split_offset_fetch_request_by_destination(request) } - fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> { + fn get_request_frame(request: &mut Message) -> &mut Self::Request { match request.frame() { Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::OffsetFetch(request), .. - })) => Some(request), - _ => None, + })) => request, + _ => unreachable!(), } }