diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 4d4a027fa..feefdd0e2 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -212,6 +212,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder { temp_responses_buffer: Default::default(), sasl_mechanism: None, authorize_scram_over_mtls: self.authorize_scram_over_mtls.as_ref().map(|x| x.build()), + refetch_backoff: Duration::from_millis(1), }) } @@ -273,6 +274,7 @@ struct KafkaSinkCluster { sasl_mechanism: Option, authorize_scram_over_mtls: Option, connections: Connections, + refetch_backoff: Duration, } /// State of a Request/Response is maintained by this enum. @@ -291,10 +293,20 @@ enum PendingRequestState { /// When this is 0 the next response from the broker will be for this request. /// This field must be manually decremented when another response for this broker comes through. index: usize, + /// Some message types store the request here in case they need to resend it. + request: Option, }, /// The broker has returned a Response to this request. /// Returning this response may be delayed until a response to an earlier request comes back from another broker. - Received { response: Message }, + Received { + // TODO: move this into the parent type + destination: Destination, + response: Message, + /// Some message types store the request here in case they need to resend it. + // TODO: if we ever turn the Message into a CoW type we will be able to + // simplify this a lot by just storing the request field once in PendingRequest + request: Option, + }, } impl PendingRequestState { @@ -306,8 +318,20 @@ impl PendingRequestState { } } +#[derive(Debug)] +enum PendingRequestTy { + Fetch { + originally_sent_at: Instant, + max_wait_ms: i32, + min_bytes: i32, + }, + Other, +} + struct PendingRequest { state: PendingRequestState, + /// Type of the request sent + ty: PendingRequestTy, /// Combine the next N responses into a single response /// This message should be considered the base message and will retain the shotover Message::id and kafka correlation_id combine_responses: usize, @@ -783,6 +807,7 @@ impl KafkaSinkCluster { tracing::debug!("Routing request to random broker {}", destination.0); self.pending_requests.push_back(PendingRequest { state: PendingRequestState::routed(destination, message), + ty: PendingRequestTy::Other, combine_responses: 1, }); } @@ -806,6 +831,7 @@ impl KafkaSinkCluster { self.pending_requests.push_back(PendingRequest { state: PendingRequestState::routed(destination, message), + ty: PendingRequestTy::Other, combine_responses: 1, }); tracing::debug!( @@ -827,6 +853,7 @@ impl KafkaSinkCluster { produce.topic_data = topic_data; self.pending_requests.push_back(PendingRequest { state: PendingRequestState::routed(destination, message), + ty: PendingRequestTy::Other, combine_responses: 1, }); tracing::debug!( @@ -859,6 +886,7 @@ impl KafkaSinkCluster { } self.pending_requests.push_back(PendingRequest { state: PendingRequestState::routed(destination, request), + ty: PendingRequestTy::Other, combine_responses, }); } @@ -1016,6 +1044,8 @@ impl KafkaSinkCluster { self.pending_requests.push_back(PendingRequest { state: PendingRequestState::routed(destination, message), + // we dont need special handling for fetch, so just use Other + ty: PendingRequestTy::Other, combine_responses: 1, }); tracing::debug!( @@ -1037,10 +1067,20 @@ impl KafkaSinkCluster { fetch.topics = topics; self.pending_requests.push_back(PendingRequest { state: PendingRequestState::routed(destination, message), + // we dont need special handling for fetch, so just use Other + ty: PendingRequestTy::Other, combine_responses: 1, }); tracing::debug!("Routing fetch request to single broker {}", destination.0); } else { + // Individual sub requests could delay the whole fetch request by up to max_wait_ms. + // So we need to rewrite max_wait_ms and min_bytes to ensure that the broker responds immediately. + // We then perform retries when receiving the response to uphold the original values + let max_wait_ms = fetch.max_wait_ms; + let min_bytes = fetch.min_bytes; + fetch.max_wait_ms = 1; + fetch.min_bytes = 1; + // The message has been split so it may be delivered to multiple destinations. // We must generate a unique message for each destination. let combine_responses = routing.len(); @@ -1066,6 +1106,11 @@ impl KafkaSinkCluster { } self.pending_requests.push_back(PendingRequest { state: PendingRequestState::routed(destination, request), + ty: PendingRequestTy::Fetch { + originally_sent_at: Instant::now(), + max_wait_ms, + min_bytes, + }, combine_responses, }); } @@ -1181,9 +1226,23 @@ impl KafkaSinkCluster { }) .count(), }); + + let request = match self.pending_requests[i].ty { + PendingRequestTy::Fetch { .. } => { + if let PendingRequestState::Routed { request, .. } = + &self.pending_requests[i].state + { + Some(request.clone()) + } else { + unreachable!() + } + } + PendingRequestTy::Other => None, + }; let mut value = PendingRequestState::Sent { destination: *destination, index: routed_requests.requests.len() + routed_requests.already_pending, + request, }; std::mem::swap(&mut self.pending_requests[i].state, &mut value); if let PendingRequestState::Routed { request, .. } = value { @@ -1288,8 +1347,11 @@ impl KafkaSinkCluster { for response in self.temp_responses_buffer.drain(..) { let mut response = Some(response); for pending_request in &mut self.pending_requests { - if let PendingRequestState::Sent { destination, index } = - &mut pending_request.state + if let PendingRequestState::Sent { + destination, + index, + request, + } = &mut pending_request.state { if destination == connection_destination { // Store the PendingRequestState::Received at the location of the next PendingRequestState::Sent @@ -1297,7 +1359,9 @@ impl KafkaSinkCluster { // to be used next time, and the time after that, and ... if *index == 0 { pending_request.state = PendingRequestState::Received { + destination: *destination, response: response.take().unwrap(), + request: request.take(), }; } else { *index -= 1; @@ -1329,7 +1393,8 @@ impl KafkaSinkCluster { // Remove and return all PendingRequestState::Received that are ready to be received. let mut responses = vec![]; while let Some(pending_request) = self.pending_requests.front() { - let all_combined_received = (0..pending_request.combine_responses).all(|i| { + let combine_responses = pending_request.combine_responses; + let all_combined_received = (0..combine_responses).all(|i| { matches!( self.pending_requests.get(i), Some(PendingRequest { @@ -1339,30 +1404,70 @@ impl KafkaSinkCluster { ) }); if all_combined_received { + // perform special handling for certain message types + if let PendingRequestTy::Fetch { + originally_sent_at, + max_wait_ms, + min_bytes, + } = pending_request.ty + { + // resend the requests if we havent yet met the `max_wait_ms` and `min_bytes` requirements + if originally_sent_at.elapsed() < Duration::from_millis(max_wait_ms as u64) + && Self::total_fetch_record_bytes( + &mut self.pending_requests, + combine_responses, + ) < min_bytes as i64 + { + tokio::time::sleep(self.refetch_backoff).await; + + // exponential backoff + self.refetch_backoff *= 2; + + for i in 0..combine_responses { + let pending_request = &mut self.pending_requests[i]; + if let PendingRequestState::Received { + destination, + request, + .. + } = &mut pending_request.state + { + pending_request.state = PendingRequestState::Routed { + destination: *destination, + request: request.take().unwrap(), + } + } else { + unreachable!() + } + } + + // The pending_request is not received, we need to break to maintain response ordering. + break; + } else { + self.refetch_backoff = Duration::from_millis(1); + } + } + // The next response we are waiting on has been received, add it to responses - if pending_request.combine_responses == 1 { + if combine_responses == 1 { if let Some(PendingRequest { - state: PendingRequestState::Received { response }, + state: PendingRequestState::Received { response, .. }, .. }) = self.pending_requests.pop_front() { responses.push(response); } } else { - let drain = self - .pending_requests - .drain(..pending_request.combine_responses) - .map(|x| { - if let PendingRequest { - state: PendingRequestState::Received { response }, - .. - } = x - { - response - } else { - unreachable!("Guaranteed by all_combined_received") - } - }); + let drain = self.pending_requests.drain(..combine_responses).map(|x| { + if let PendingRequest { + state: PendingRequestState::Received { response, .. }, + .. + } = x + { + response + } else { + unreachable!("Guaranteed by all_combined_received") + } + }); responses.push(Self::combine_responses(drain)?); } } else { @@ -1370,9 +1475,53 @@ impl KafkaSinkCluster { break; } } + + // In the case of fetch requests, recv_responses may set pending requests back to routed state. + // So we retry send_requests immediately so the fetch requests arent stuck waiting for another call to `KafkaSinkCluster::transform`. + self.send_requests() + .await + .context("Failed to send requests")?; + Ok(responses) } + fn total_fetch_record_bytes( + pending_requests: &mut VecDeque, + combine_responses: usize, + ) -> i64 { + let mut result = 0; + for pending_request in pending_requests.iter_mut().take(combine_responses) { + if let PendingRequestState::Received { response, .. } = &mut pending_request.state { + if let Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::Fetch(fetch), + .. + })) = response.frame() + { + result += fetch + .responses + .iter() + .map(|x| { + x.partitions + .iter() + .map(|x| { + x.records + .as_ref() + .map(|x| x.len() as i64) + .unwrap_or_default() + }) + .sum::() + }) + .sum::(); + } else { + panic!("must be called on fetch responses") + } + } else { + panic!("must be called on received responses") + } + } + result + } + fn combine_responses(mut drain: impl Iterator) -> Result { // Take this response as base. // Then iterate over all remaining combined responses and integrate them into the base. @@ -1753,6 +1902,7 @@ impl KafkaSinkCluster { destination: Destination::ControlConnection, request, }, + ty: PendingRequestTy::Other, combine_responses: 1, }); tracing::debug!("Routing request to control connection"); @@ -1774,6 +1924,7 @@ impl KafkaSinkCluster { self.pending_requests.push_back(PendingRequest { state: PendingRequestState::routed(destination, request), + ty: PendingRequestTy::Other, combine_responses: 1, }); tracing::debug!( @@ -1796,6 +1947,7 @@ impl KafkaSinkCluster { self.pending_requests.push_back(PendingRequest { state: PendingRequestState::routed(destination, request), + ty: PendingRequestTy::Other, combine_responses: 1, }); tracing::debug!(