diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 47e349730..9fe342ecc 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -1894,7 +1894,7 @@ async fn send_requests(&mut self) -> Result<()> { Ok(()) } - /// Receive all responses from the outgoing connections, returns all responses that are ready to be returned. +/// Receive all responses from the outgoing connections, returns all responses that are ready to be returned. /// For response ordering reasons, some responses will remain in self.pending_requests until other responses are received. async fn recv_responses(&mut self, close_client_connection: &mut bool) -> Result> { // To work around borrow checker issues, store connection errors in this temporary list before handling them. @@ -1914,18 +1914,16 @@ async fn send_requests(&mut self) -> Result<()> { let mut response = Some(response); for pending_request in &mut self.pending_requests { if let PendingRequestState::Sent { - destination, index, request, } = &mut pending_request.state { - if destination == connection_destination { + if &pending_request.destination == connection_destination { // Store the PendingRequestState::Received at the location of the next PendingRequestState::Sent // All other PendingRequestState::Sent need to be decremented, in order to determine the PendingRequestState::Sent // to be used next time, and the time after that, and ... if *index == 0 { pending_request.state = PendingRequestState::Received { - destination: *destination, response: response.take().unwrap(), request: request.take(), }; @@ -1993,13 +1991,11 @@ async fn send_requests(&mut self) -> Result<()> { for i in 0..combine_responses { let pending_request = &mut self.pending_requests[i]; if let PendingRequestState::Received { - destination, request, .. } = &mut pending_request.state { pending_request.state = PendingRequestState::Routed { - destination: *destination, request: request.take().unwrap(), } } else { @@ -2069,8 +2065,7 @@ async fn send_requests(&mut self) -> Result<()> { if let Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::Fetch(fetch), .. - })) = response.frame() - { + })) = response.frame() { result += fetch .responses .iter() @@ -2087,10 +2082,10 @@ async fn send_requests(&mut self) -> Result<()> { }) .sum::(); } else { - panic!("must be called on fetch responses") + panic!("must be called on fetch responses"); } } else { - panic!("must be called on received responses") + panic!("must be called on received responses"); } } result @@ -2893,116 +2888,124 @@ async fn send_requests(&mut self) -> Result<()> { } fn route_to_control_connection(&mut self, request: Message) { - assert!( - !self.auth_complete, - "route_to_control_connection cannot be called after auth is complete. Otherwise it would collide with control_send_receive" - ); - self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::Routed { - destination: Destination::ControlConnection, - request, - }, - ty: PendingRequestTy::Other, - combine_responses: 1, - }); - tracing::debug!("Routing request to control connection"); - } + assert!( + !self.auth_complete, + "route_to_control_connection cannot be called after auth is complete. Otherwise it would collide with control_send_receive" + ); + + let pending_request = PendingRequest { + state: PendingRequestState::Routed { + request, + }, + destination: Destination::ControlConnection, + ty: PendingRequestTy::Other, + combine_responses: 1, + }; - fn route_to_controller(&mut self, request: Message) { - let broker_id = self.controller_broker.get().unwrap(); + self.pending_requests.push_back(pending_request); + tracing::debug!("Routing request to control connection"); +} - let destination = if let Some(node) = self - .nodes - .iter_mut() - .find(|x| x.broker_id == *broker_id && x.is_up()) - { - node.broker_id - } else { - tracing::warn!("no known broker with id {broker_id:?} that is 'up', routing request to a random broker so that a NOT_CONTROLLER or similar error is returned to the client"); - random_broker_id(&self.nodes, &mut self.rng) - }; +fn route_to_controller(&mut self, request: Message) { + let broker_id = self.controller_broker.get().unwrap(); + let destination = if let Some(node) = self.nodes.iter_mut().find(|x| x.broker_id == *broker_id && x.is_up()) { + node.broker_id + } else { + tracing::warn!("no known broker with id {broker_id:?} that is 'up', routing request to a random broker so that a NOT_CONTROLLER or similar error is returned to the client"); + random_broker_id(&self.nodes, &mut self.rng) + }; - self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), - ty: PendingRequestTy::Other, - combine_responses: 1, - }); - tracing::debug!( - "Routing request relating to controller to broker {}", - destination.0 - ); - } + let pending_request = PendingRequest { + state: PendingRequestState::Routed { + request, + }, + destination, + ty: PendingRequestTy::Other, + combine_responses: 1, + }; - fn route_to_group_coordinator(&mut self, request: Message, group_id: GroupId) { - let destination = self.group_to_coordinator_broker.get(&group_id); - let destination = match destination { - Some(destination) => *destination, - None => { - tracing::info!("no known coordinator for {group_id:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client"); - random_broker_id(&self.nodes, &mut self.rng) - } - }; + self.pending_requests.push_back(pending_request); + tracing::debug!("Routing request relating to controller to broker {}", destination.0); +} - tracing::debug!( - "Routing request relating to group id {:?} to broker {}", - group_id.0, - destination.0 - ); +fn route_to_group_coordinator(&mut self, request: Message, group_id: GroupId) { + let destination = self.group_to_coordinator_broker.get(&group_id).cloned().unwrap_or_else(|| { + tracing::info!("no known coordinator for {group_id:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client"); + random_broker_id(&self.nodes, &mut self.rng) + }); + + tracing::debug!( + "Routing request relating to group id {:?} to broker {}", + group_id.0, + destination.0 + ); + + let pending_request = PendingRequest { + state: PendingRequestState::Routed { + request, + }, + destination, + ty: PendingRequestTy::RoutedToGroup(group_id), + combine_responses: 1, + }; - self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), - ty: PendingRequestTy::RoutedToGroup(group_id), - combine_responses: 1, - }); - } + self.pending_requests.push_back(pending_request); +} - fn route_to_transaction_coordinator( - &mut self, - request: Message, - transaction_id: TransactionalId, - ) { - let destination = self.transaction_to_coordinator_broker.get(&transaction_id); - let destination = match destination { - Some(destination) => *destination, - None => { - tracing::info!("no known coordinator for {transaction_id:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client"); - random_broker_id(&self.nodes, &mut self.rng) - } - }; +fn route_to_transaction_coordinator( + &mut self, + request: Message, + transaction_id: TransactionalId, +) { + let destination = self.transaction_to_coordinator_broker.get(&transaction_id).cloned().unwrap_or_else(|| { + tracing::info!("no known coordinator for {transaction_id:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client"); + random_broker_id(&self.nodes, &mut self.rng) + }); + + tracing::debug!( + "Routing request relating to transaction id {:?} to broker {}", + transaction_id.0, + destination.0 + ); + + let pending_request = PendingRequest { + state: PendingRequestState::Routed { + request, + }, + destination, + ty: PendingRequestTy::RoutedToTransaction(transaction_id), + combine_responses: 1, + }; - tracing::debug!( - "Routing request relating to transaction id {:?} to broker {}", - transaction_id.0, - destination.0 - ); + self.pending_requests.push_back(pending_request); +} - self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), - ty: PendingRequestTy::RoutedToTransaction(transaction_id), - combine_responses: 1, + +fn route_find_coordinator(&mut self, mut request: Message) { + if let Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::FindCoordinator(find_coordinator), + .. + })) = request.frame() { + let destination = random_broker_id(&self.nodes, &mut self.rng); + let ty = PendingRequestTy::FindCoordinator(FindCoordinator { + key: find_coordinator.key.clone(), + key_type: find_coordinator.key_type, }); - } - fn route_find_coordinator(&mut self, mut request: Message) { - if let Some(Frame::Kafka(KafkaFrame::Request { - body: RequestBody::FindCoordinator(find_coordinator), - .. - })) = request.frame() - { - let destination = random_broker_id(&self.nodes, &mut self.rng); - let ty = PendingRequestTy::FindCoordinator(FindCoordinator { - key: find_coordinator.key.clone(), - key_type: find_coordinator.key_type, - }); - tracing::debug!("Routing FindCoordinator to random broker {}", destination.0); + tracing::debug!("Routing FindCoordinator to random broker {}", destination.0); - self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), - ty, - combine_responses: 1, - }); - } + let pending_request = PendingRequest { + state: PendingRequestState::Routed { + request, + }, + destination, + ty, + combine_responses: 1, + }; + + self.pending_requests.push_back(pending_request); } +} async fn process_metadata_response(&mut self, metadata: &MetadataResponse) { for broker in &metadata.brokers {