From 4a626bc443dabf91778ffd4a8d84cbe92a148974 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 24 Sep 2024 14:29:55 +1000 Subject: [PATCH] rename PendingRequestTy -> PendingRequestState --- .../src/transforms/kafka/sink_cluster/mod.rs | 63 ++++++++++--------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index c58e1448d..2e59c3b3d 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -228,7 +228,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder { pending_requests: Default::default(), // TODO: this approach with `find_coordinator_requests` and `routed_to_coordinator_for_group` // is prone to memory leaks and logic errors. - // We should replace these fields with extra state within `PendingRequestTy::Received/Sent`. + // We should replace these fields with extra state within `PendingRequestState::Received/Sent`. find_coordinator_requests: Default::default(), routed_to_coordinator_for_group: Default::default(), temp_responses_buffer: Default::default(), @@ -300,7 +300,7 @@ struct KafkaSinkCluster { /// State of a Request/Response is maintained by this enum. /// The state progresses from Routed -> Sent -> Received #[derive(Debug)] -enum PendingRequestTy { +enum PendingRequestState { /// A route has been determined for this request but it has not yet been sent. Routed { destination: Destination, @@ -319,7 +319,7 @@ enum PendingRequestTy { Received { response: Message }, } -impl PendingRequestTy { +impl PendingRequestState { fn routed(broker_id: BrokerId, request: Message) -> Self { Self::Routed { destination: Destination::Id(broker_id), @@ -329,7 +329,7 @@ impl PendingRequestTy { } struct PendingRequest { - ty: PendingRequestTy, + state: PendingRequestState, /// Combine the next N responses into a single response /// This message should be considered the base message and will retain the shotover Message::id and kafka correlation_id combine_responses: usize, @@ -804,7 +804,7 @@ impl KafkaSinkCluster { let destination = random_broker_id(&self.nodes, &mut self.rng); tracing::debug!("Routing request to random broker {}", destination.0); self.pending_requests.push_back(PendingRequest { - ty: PendingRequestTy::routed(destination, message), + state: PendingRequestState::routed(destination, message), combine_responses: 1, }); } @@ -827,7 +827,7 @@ impl KafkaSinkCluster { let destination = random_broker_id(&self.nodes, &mut self.rng); self.pending_requests.push_back(PendingRequest { - ty: PendingRequestTy::routed(destination, message), + state: PendingRequestState::routed(destination, message), combine_responses: 1, }); tracing::debug!( @@ -848,7 +848,7 @@ impl KafkaSinkCluster { produce.topic_data = topic_data; self.pending_requests.push_back(PendingRequest { - ty: PendingRequestTy::routed(destination, message), + state: PendingRequestState::routed(destination, message), combine_responses: 1, }); tracing::debug!( @@ -880,7 +880,7 @@ impl KafkaSinkCluster { produce.topic_data = topic_data; } self.pending_requests.push_back(PendingRequest { - ty: PendingRequestTy::routed(destination, request), + state: PendingRequestState::routed(destination, request), combine_responses, }); } @@ -1037,7 +1037,7 @@ impl KafkaSinkCluster { let destination = random_broker_id(&self.nodes, &mut self.rng); self.pending_requests.push_back(PendingRequest { - ty: PendingRequestTy::routed(destination, message), + state: PendingRequestState::routed(destination, message), combine_responses: 1, }); tracing::debug!( @@ -1058,7 +1058,7 @@ impl KafkaSinkCluster { fetch.topics = topics; self.pending_requests.push_back(PendingRequest { - ty: PendingRequestTy::routed(destination, message), + state: PendingRequestState::routed(destination, message), combine_responses: 1, }); tracing::debug!("Routing fetch request to single broker {}", destination.0); @@ -1087,7 +1087,7 @@ impl KafkaSinkCluster { fetch.topics = topics; } self.pending_requests.push_back(PendingRequest { - ty: PendingRequestTy::routed(destination, request), + state: PendingRequestState::routed(destination, request), combine_responses, }); } @@ -1172,7 +1172,7 @@ impl KafkaSinkCluster { .context("Failed to query metadata of topics") } - /// Convert all PendingRequestTy::Routed into PendingRequestTy::Sent + /// Convert all PendingRequestState::Routed into PendingRequestState::Sent async fn send_requests(&mut self) -> Result<()> { struct RoutedRequests { requests: Vec, @@ -1181,7 +1181,8 @@ impl KafkaSinkCluster { let mut broker_to_routed_requests: HashMap = HashMap::new(); for i in 0..self.pending_requests.len() { - if let PendingRequestTy::Routed { destination, .. } = &self.pending_requests[i].ty { + if let PendingRequestState::Routed { destination, .. } = &self.pending_requests[i].state + { let routed_requests = broker_to_routed_requests .entry(*destination) .or_insert_with(|| RoutedRequests { @@ -1190,10 +1191,10 @@ impl KafkaSinkCluster { .pending_requests .iter() .filter(|pending_request| { - if let PendingRequestTy::Sent { + if let PendingRequestState::Sent { destination: check_destination, .. - } = &pending_request.ty + } = &pending_request.state { check_destination == destination } else { @@ -1202,12 +1203,12 @@ impl KafkaSinkCluster { }) .count(), }); - let mut value = PendingRequestTy::Sent { + let mut value = PendingRequestState::Sent { destination: *destination, index: routed_requests.requests.len() + routed_requests.already_pending, }; - std::mem::swap(&mut self.pending_requests[i].ty, &mut value); - if let PendingRequestTy::Routed { request, .. } = value { + std::mem::swap(&mut self.pending_requests[i].state, &mut value); + if let PendingRequestState::Routed { request, .. } = value { routed_requests.requests.push(request); } } @@ -1296,7 +1297,7 @@ impl KafkaSinkCluster { // To work around borrow checker issues, store connection errors in this temporary list before handling them. let mut connection_errors = vec![]; - // Convert all received PendingRequestTy::Sent into PendingRequestTy::Received + // Convert all received PendingRequestState::Sent into PendingRequestState::Received for (connection_destination, connection) in &mut self.connections.connections { // skip recv when no pending requests to avoid timeouts on old connections if connection.pending_requests_count() != 0 { @@ -1309,15 +1310,15 @@ impl KafkaSinkCluster { for response in self.temp_responses_buffer.drain(..) { let mut response = Some(response); for pending_request in &mut self.pending_requests { - if let PendingRequestTy::Sent { destination, index } = - &mut pending_request.ty + if let PendingRequestState::Sent { destination, index } = + &mut pending_request.state { if destination == connection_destination { - // Store the PendingRequestTy::Received at the location of the next PendingRequestTy::Sent - // All other PendingRequestTy::Sent need to be decremented, in order to determine the PendingRequestTy::Sent + // Store the PendingRequestState::Received at the location of the next PendingRequestState::Sent + // All other PendingRequestState::Sent need to be decremented, in order to determine the PendingRequestState::Sent // to be used next time, and the time after that, and ... if *index == 0 { - pending_request.ty = PendingRequestTy::Received { + pending_request.state = PendingRequestState::Received { response: response.take().unwrap(), }; } else { @@ -1347,14 +1348,14 @@ impl KafkaSinkCluster { .await?; } - // Remove and return all PendingRequestTy::Received that are ready to be received. + // Remove and return all PendingRequestState::Received that are ready to be received. let mut responses = vec![]; while let Some(pending_request) = self.pending_requests.front() { let all_combined_received = (0..pending_request.combine_responses).all(|i| { matches!( self.pending_requests.get(i), Some(PendingRequest { - ty: PendingRequestTy::Received { .. }, + state: PendingRequestState::Received { .. }, .. }) ) @@ -1363,7 +1364,7 @@ impl KafkaSinkCluster { // The next response we are waiting on has been received, add it to responses if pending_request.combine_responses == 1 { if let Some(PendingRequest { - ty: PendingRequestTy::Received { response }, + state: PendingRequestState::Received { response }, .. }) = self.pending_requests.pop_front() { @@ -1375,7 +1376,7 @@ impl KafkaSinkCluster { .drain(..pending_request.combine_responses) .map(|x| { if let PendingRequest { - ty: PendingRequestTy::Received { response }, + state: PendingRequestState::Received { response }, .. } = x { @@ -1770,7 +1771,7 @@ impl KafkaSinkCluster { "route_to_control_connection cannot be called after auth is complete. Otherwise it would collide with control_send_receive" ); self.pending_requests.push_back(PendingRequest { - ty: PendingRequestTy::Routed { + state: PendingRequestState::Routed { destination: Destination::ControlConnection, request, }, @@ -1794,7 +1795,7 @@ impl KafkaSinkCluster { }; self.pending_requests.push_back(PendingRequest { - ty: PendingRequestTy::routed(destination, request), + state: PendingRequestState::routed(destination, request), combine_responses: 1, }); tracing::debug!( @@ -1816,7 +1817,7 @@ impl KafkaSinkCluster { }; self.pending_requests.push_back(PendingRequest { - ty: PendingRequestTy::routed(destination, request), + state: PendingRequestState::routed(destination, request), combine_responses: 1, }); tracing::debug!(