From 58e5e8df671365e92e095f0c7c9a7bac38022091 Mon Sep 17 00:00:00 2001 From: Joshua Varghese Date: Wed, 13 Nov 2024 23:11:03 +1100 Subject: [PATCH] Refactor PendingRequestState and PendingRequest to remove redundant destination field --- .../src/transforms/kafka/sink_cluster/mod.rs | 286 ++++++++---------- 1 file changed, 132 insertions(+), 154 deletions(-) diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 2e189e3c5..ffe1561e5 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -332,12 +332,10 @@ pub(crate) struct KafkaSinkCluster { enum PendingRequestState { /// A route has been determined for this request but it has not yet been sent. Routed { - destination: Destination, request: Message, }, /// The request has been sent to the specified broker and we are now awaiting a response from that broker. Sent { - destination: Destination, /// How many responses must be received before this response is received. /// When this is 0 the next response from the broker will be for this request. /// This field must be manually decremented when another response for this broker comes through. @@ -348,25 +346,17 @@ enum PendingRequestState { /// The broker has returned a Response to this request. /// Returning this response may be delayed until a response to an earlier request comes back from another broker. Received { - // TODO: move this into the parent type - destination: Destination, response: Message, /// Some message types store the request here in case they need to resend it. 
- // TODO: if we ever turn the Message into a CoW type we will be able to - // simplify this a lot by just storing the request field once in PendingRequest request: Option<Message>, }, } impl PendingRequestState { - fn routed(broker_id: BrokerId, request: Message) -> Self { - Self::Routed { - destination: Destination::Id(broker_id), - request, - } + fn routed(request: Message) -> Self { + Self::Routed { request } } } - #[derive(Debug, Clone)] enum PendingRequestTy { Fetch { @@ -389,8 +379,10 @@ struct PendingRequest { /// Combine the next N responses into a single response /// This message should be considered the base message and will retain the shotover Message::id and kafka correlation_id combine_responses: usize, + destination: Destination, } + #[async_trait] impl Transform for KafkaSinkCluster { fn get_name(&self) -> &'static str { @@ -1076,9 +1068,10 @@ The connection to the client has been closed." let destination = random_broker_id(&self.nodes, &mut self.rng); tracing::debug!("Routing request to random broker {}", destination.0); self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), + state: PendingRequestState::routed(request), ty: PendingRequestTy::Other, combine_responses: 1, + destination: Destination::Id(destination), }); } @@ -1088,16 +1081,17 @@ The connection to the client has been closed." ) -> Result<()> { if let Some(request_frame) = T::get_request_frame(&mut request) { let routing = T::split_by_destination(self, request_frame); - + if routing.is_empty() { // Produce contains no topics, so we can just pick a random destination. // The request is unchanged so we can just send as is. 
let destination = random_broker_id(&self.nodes, &mut self.rng); - + self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), + state: PendingRequestState::routed(request), ty: PendingRequestTy::Other, combine_responses: 1, + destination: Destination::Id(destination), }); tracing::debug!( "Routing request to random broker {} due to being empty", @@ -1114,12 +1108,13 @@ The connection to the client has been closed." } else { destination }; - + T::reassemble(request_frame, topic_data); self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), + state: <PendingRequestState>::routed(request), ty: PendingRequestTy::Other, combine_responses: 1, + destination: Destination::Id(destination), }); tracing::debug!("Routing request to single broker {:?}", destination.0); } else { @@ -1143,9 +1138,10 @@ The connection to the client has been closed." T::reassemble(request_frame, topic_data) } self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), + state: PendingRequestState::routed(request), ty: PendingRequestTy::Other, combine_responses, + destination: Destination::Id(destination), }); } tracing::debug!("Routing request to multiple brokers"); @@ -1294,17 +1290,17 @@ The connection to the client has been closed." })) = request.frame() { let routing = self.split_fetch_request_by_destination(fetch); - + if routing.is_empty() { // Fetch contains no topics, so we can just pick a random destination. // The message is unchanged so we can just send as is. 
let destination = random_broker_id(&self.nodes, &mut self.rng); - + self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), - // we dont need special handling for fetch, so just use Other + state: PendingRequestState::routed(request), ty: PendingRequestTy::Other, combine_responses: 1, + destination: Destination::Id(destination), }); tracing::debug!( "Routing fetch request to random broker {} due to being empty", @@ -1321,13 +1317,13 @@ The connection to the client has been closed." } else { destination }; - + fetch.topics = topics; self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), - // we dont need special handling for fetch, so just use Other + state: PendingRequestState::routed(request), ty: PendingRequestTy::Other, combine_responses: 1, + destination: Destination::Id(destination), }); tracing::debug!("Routing fetch request to single broker {}", destination.0); } else { @@ -1338,7 +1334,7 @@ The connection to the client has been closed." let min_bytes = fetch.min_bytes; fetch.max_wait_ms = 1; fetch.min_bytes = 1; - + // The message has been split so it may be delivered to multiple destinations. // We must generate a unique message for each destination. let combine_responses = routing.len(); @@ -1363,19 +1359,20 @@ The connection to the client has been closed." fetch.topics = topics; } self.pending_requests.push_back(PendingRequest { - state: PendingRequestState::routed(destination, request), + state: PendingRequestState::routed(request), ty: PendingRequestTy::Fetch { originally_sent_at: Instant::now(), max_wait_ms, min_bytes, }, combine_responses, + destination: Destination::Id(destination), }); } tracing::debug!("Routing fetch request to multiple brokers"); } } - + Ok(()) } @@ -1705,142 +1702,123 @@ The connection to the client has been closed." 
.context("Failed to query metadata of topics") } - /// Convert all PendingRequestState::Routed into PendingRequestState::Sent - async fn send_requests(&mut self) -> Result<()> { - struct RoutedRequests { - requests: Vec<Message>, - already_pending: usize, - } +/// Convert all PendingRequestState::Routed into PendingRequestState::Sent +async fn send_requests(&mut self) -> Result<()> { + struct RoutedRequests { + requests: Vec<Message>, + already_pending: usize, + } - let mut broker_to_routed_requests: HashMap<Destination, RoutedRequests> = HashMap::new(); - for i in 0..self.pending_requests.len() { - if let PendingRequestState::Routed { destination, .. } = &self.pending_requests[i].state - { - let routed_requests = broker_to_routed_requests - .entry(*destination) - .or_insert_with(|| RoutedRequests { - requests: vec![], - already_pending: self - .pending_requests - .iter() - .filter(|pending_request| { - if let PendingRequestState::Sent { - destination: check_destination, - .. - } = &pending_request.state - { - check_destination == destination - } else { - false - } - }) - .count(), - }); + let mut broker_to_routed_requests: HashMap<Destination, RoutedRequests> = HashMap::new(); + for i in 0..self.pending_requests.len() { + if let PendingRequestState::Routed { request } = &self.pending_requests[i].state { + let destination = self.pending_requests[i].destination; + let routed_requests = broker_to_routed_requests + .entry(destination) + .or_insert_with(|| RoutedRequests { + requests: vec![], + already_pending: self + .pending_requests + .iter() + .filter(|pending_request| { + pending_request.destination == destination + && matches!(pending_request.state, PendingRequestState::Sent { .. }) + }) + .count(), + }); - let request = match self.pending_requests[i].ty { - PendingRequestTy::Fetch { .. } => { - if let PendingRequestState::Routed { request, .. 
} = - &self.pending_requests[i].state - { - Some(request.clone()) - } else { - unreachable!() - } - } - PendingRequestTy::RoutedToGroup(_) => None, - PendingRequestTy::RoutedToTransaction(_) => None, - PendingRequestTy::FindCoordinator(_) => None, - PendingRequestTy::Other => None, - }; - let mut value = PendingRequestState::Sent { - destination: *destination, - index: routed_requests.requests.len() + routed_requests.already_pending, - request, - }; - std::mem::swap(&mut self.pending_requests[i].state, &mut value); - if let PendingRequestState::Routed { request, .. } = value { - routed_requests.requests.push(request); - } + let request_to_store = match self.pending_requests[i].ty { + PendingRequestTy::Fetch { .. } => Some(request.clone()), + _ => None, + }; + let mut value = PendingRequestState::Sent { + index: routed_requests.requests.len() + routed_requests.already_pending, + request: request_to_store, + }; + std::mem::swap(&mut self.pending_requests[i].state, &mut value); + if let PendingRequestState::Routed { request } = value { + routed_requests.requests.push(request); } } + } - let recent_instant = Instant::now(); - for (destination, mut requests) in broker_to_routed_requests { - match self - .connections - .get_or_open_connection( - &mut self.rng, - &self.connection_factory, - &self.authorize_scram_over_mtls, - &self.sasl_mechanism, - &self.nodes, - &self.first_contact_points, - &self.rack, - recent_instant, - destination, - ) - .await - { - Ok(connection) => { - if let Err(err) = connection.send(requests.requests) { - // Dont retry the send on the new connection since we cant tell if the broker received the request or not. - self.connections - .handle_connection_error( - &self.connection_factory, - &self.authorize_scram_over_mtls, - &self.sasl_mechanism, - &self.nodes, - destination, - err.clone().into(), - ) - .await?; - // If we succesfully recreate the outgoing connection we still need to terminate this incoming connection since the request is lost. 
- return Err(err.into()); - } + let recent_instant = Instant::now(); + for (destination, mut requests) in broker_to_routed_requests { + match self + .connections + .get_or_open_connection( + &mut self.rng, + &self.connection_factory, + &self.authorize_scram_over_mtls, + &self.sasl_mechanism, + &self.nodes, + &self.first_contact_points, + &self.rack, + recent_instant, + destination, + ) + .await + { + Ok(connection) => { + if let Err(err) = connection.send(requests.requests) { + // Don't retry the send on the new connection since we can't tell if the broker received the request or not. + self.connections + .handle_connection_error( + &self.connection_factory, + &self.authorize_scram_over_mtls, + &self.sasl_mechanism, + &self.nodes, + destination, + err.clone().into(), + ) + .await?; + // If we successfully recreate the outgoing connection we still need to terminate this incoming connection since the request is lost. + return Err(err.into()); } - Err(err) => { - // set node as down, the connection already failed to create so no point running through handle_connection_error, - // as that will recreate the connection which we already know just failed. - // Instead just directly set the node as down and return the error - - // set node as down - self.nodes - .iter() - .find(|x| match destination { - Destination::Id(id) => x.broker_id == id, - Destination::ControlConnection => { - &x.kafka_address - == self - .connections - .control_connection_address - .as_ref() - .unwrap() - } - }) - .unwrap() - .set_state(KafkaNodeState::Down); + } + Err(err) => { + // Set node as down, the connection already failed to create so no point running through handle_connection_error, + // as that will recreate the connection which we already know just failed. + // Instead just directly set the node as down and return the error - // bubble up error - let request_types: Vec<String> = requests - .requests - .iter_mut() - .map(|x| match x.frame() { - Some(Frame::Kafka(KafkaFrame::Request { header, .. 
})) => { - format!("{:?}", ApiKey::try_from(header.request_api_key).unwrap()) - } - _ => "Unknown".to_owned(), - }) - .collect(); - return Err(err.context(format!( - "Failed to get connection to send requests {request_types:?}" - ))); - } + // Set node as down + self.nodes + .iter() + .find(|x| match destination { + Destination::Id(id) => x.broker_id == id, + Destination::ControlConnection => { + &x.kafka_address + == self + .connections + .control_connection_address + .as_ref() + .unwrap() + } + }) + .unwrap() + .set_state(KafkaNodeState::Down); + + // Bubble up error + let request_types: Vec<String> = requests + .requests + .iter_mut() + .map(|x| match x.frame() { + Some(Frame::Kafka(KafkaFrame::Request { header, .. })) => { + format!("{:?}", ApiKey::try_from(header.request_api_key).unwrap()) + } + _ => "Unknown".to_owned(), + }) + .collect(); + return Err(err.context(format!( + "Failed to get connection to send requests {request_types:?}" + ))); } } - - Ok(()) } + Ok(()) +} + /// Receive all responses from the outgoing connections, returns all responses that are ready to be returned. /// For response ordering reasons, some responses will remain in self.pending_requests until other responses are received. async fn recv_responses(&mut self, close_client_connection: &mut bool) -> Result<Vec<Message>> {