diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index 2e59c3b3d..1d620c8bf 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -313,10 +313,24 @@ enum PendingRequestState {
         /// When this is 0 the next response from the broker will be for this request.
         /// This field must be manually decremented when another response for this broker comes through.
         index: usize,
+        /// Type of the message sent
+        ty: SentRequestTy,
+        /// Some message types store the request here in case they need to resend it.
+        request: Option<Message>,
     },
     /// The broker has returned a Response to this request.
     /// Returning this response may be delayed until a response to an earlier request comes back from another broker.
-    Received { response: Message },
+    Received {
+        // TODO: move this into the parent type
+        destination: Destination,
+        response: Message,
+        /// Type of the message sent
+        ty: SentRequestTy,
+        /// Some message types store the request here in case they need to resend it.
+        // TODO: if we ever turn the Message into a CoW type we will be able to
+        // simplify this a lot by just storing the request field once in PendingRequest
+        request: Option<Message>,
+    },
 }
 
 impl PendingRequestState {
@@ -328,6 +342,16 @@ impl PendingRequestState {
     }
 }
 
+#[derive(Debug, Clone)]
+enum SentRequestTy {
+    Fetch {
+        originally_sent_at: Instant,
+        max_wait_ms: i32,
+        min_bytes: i32,
+    },
+    Other,
+}
+
 struct PendingRequest {
     state: PendingRequestState,
     /// Combine the next N responses into a single response
@@ -376,9 +400,18 @@ impl Transform for KafkaSinkCluster {
             self.send_requests()
                 .await
                 .context("Failed to send requests")?;
-            self.recv_responses()
+            let responses = self
+                .recv_responses()
                 .await
-                .context("Failed to receive responses")?
+                .context("Failed to receive responses")?;
+
+            // In the case of fetch requests, recv_responses may set pending requests back to the routed state,
+            // so we retry send_requests immediately to ensure the fetch requests aren't stuck waiting for another call to `KafkaSinkCluster::transform`.
+            self.send_requests()
+                .await
+                .context("Failed to send requests")?;
+
+            responses
         };
 
         self.process_responses(&mut responses, &mut chain_state.close_client_connection)
@@ -1063,11 +1096,19 @@ impl KafkaSinkCluster {
             });
             tracing::debug!("Routing fetch request to single broker {}", destination.0);
         } else {
+            // Each individual sub-request could delay the whole fetch request by up to max_wait_ms.
+            // So we rewrite max_wait_ms and min_bytes to ensure that the broker responds immediately.
+            // We then perform retries when receiving the responses to uphold the original values.
+            fetch.max_wait_ms = 1;
+            fetch.min_bytes = 1;
+
             // The message has been split so it may be delivered to multiple destinations.
             // We must generate a unique message for each destination.
             let combine_responses = routing.len();
             message.invalidate_cache();
             for (i, (destination, topics)) in routing.into_iter().enumerate() {
+                tracing::debug!("topics: {topics:?}");
+
                 let destination = if destination == -1 {
                     random_broker_id(&self.nodes, &mut self.rng)
                 } else {
@@ -1206,6 +1247,8 @@ impl KafkaSinkCluster {
                 let mut value = PendingRequestState::Sent {
                     destination: *destination,
                     index: routed_requests.requests.len() + routed_requests.already_pending,
+                    request: None,
+                    ty: SentRequestTy::Other,
                 };
                 std::mem::swap(&mut self.pending_requests[i].state, &mut value);
                 if let PendingRequestState::Routed { request, .. } = value {
@@ -1310,8 +1353,12 @@ impl KafkaSinkCluster {
         for response in self.temp_responses_buffer.drain(..) {
             let mut response = Some(response);
             for pending_request in &mut self.pending_requests {
-                if let PendingRequestState::Sent { destination, index } =
-                    &mut pending_request.state
+                if let PendingRequestState::Sent {
+                    destination,
+                    index,
+                    ty,
+                    request,
+                } = &mut pending_request.state
                 {
                     if destination == connection_destination {
                         // Store the PendingRequestState::Received at the location of the next PendingRequestState::Sent
@@ -1319,7 +1366,10 @@
                         // to be used next time, and the time after that, and ...
                         if *index == 0 {
                             pending_request.state = PendingRequestState::Received {
+                                destination: *destination,
                                 response: response.take().unwrap(),
+                                ty: ty.clone(),
+                                request: request.take(),
                             };
                         } else {
                             *index -= 1;
@@ -1351,7 +1401,8 @@ impl KafkaSinkCluster {
         // Remove and return all PendingRequestState::Received that are ready to be received.
         let mut responses = vec![];
         while let Some(pending_request) = self.pending_requests.front() {
-            let all_combined_received = (0..pending_request.combine_responses).all(|i| {
+            let combine_responses = pending_request.combine_responses;
+            let all_combined_received = (0..combine_responses).all(|i| {
                 matches!(
                     self.pending_requests.get(i),
                     Some(PendingRequest {
@@ -1361,30 +1412,64 @@
                 )
             });
             if all_combined_received {
+                // If we received all responses but they don't contain enough data, resend the requests.
+                if let PendingRequestState::Received {
+                    ty:
+                        SentRequestTy::Fetch {
+                            originally_sent_at,
+                            max_wait_ms,
+                            min_bytes,
+                        },
+                    ..
+                } = pending_request.state
+                {
+                    if originally_sent_at.elapsed() < Duration::from_millis(max_wait_ms as u64)
+                        && Self::total_fetch_record_bytes(
+                            &mut self.pending_requests,
+                            combine_responses,
+                        ) < min_bytes as i64
+                    {
+                        for i in 0..combine_responses {
+                            let pending_request = &mut self.pending_requests[i];
+                            if let PendingRequestState::Received {
+                                destination,
+                                request,
+                                ..
+                            } = &mut pending_request.state
+                            {
+                                pending_request.state = PendingRequestState::Routed {
+                                    destination: *destination,
+                                    request: request.take().unwrap(),
+                                }
+                            }
+                        }
+
+                        // The pending_request is no longer received, so we must break to maintain response ordering.
+                        break;
+                    }
+                }
+
                 // The next response we are waiting on has been received, add it to responses
-                if pending_request.combine_responses == 1 {
+                if combine_responses == 1 {
                     if let Some(PendingRequest {
-                        state: PendingRequestState::Received { response },
+                        state: PendingRequestState::Received { response, .. },
                         ..
                     }) = self.pending_requests.pop_front()
                     {
                         responses.push(response);
                     }
                 } else {
-                    let drain = self
-                        .pending_requests
-                        .drain(..pending_request.combine_responses)
-                        .map(|x| {
-                            if let PendingRequest {
-                                state: PendingRequestState::Received { response },
-                                ..
-                            } = x
-                            {
-                                response
-                            } else {
-                                unreachable!("Guaranteed by all_combined_received")
-                            }
-                        });
+                    let drain = self.pending_requests.drain(..combine_responses).map(|x| {
+                        if let PendingRequest {
+                            state: PendingRequestState::Received { response, .. },
+                            ..
+                        } = x
+                        {
+                            response
+                        } else {
+                            unreachable!("Guaranteed by all_combined_received")
+                        }
+                    });
                     responses.push(Self::combine_responses(drain)?);
                 }
             } else {
@@ -1395,6 +1480,43 @@
         Ok(responses)
     }
 
+    fn total_fetch_record_bytes(
+        pending_requests: &mut VecDeque<PendingRequest>,
+        combine_responses: usize,
+    ) -> i64 {
+        let mut result = 0;
+        for pending_request in pending_requests.iter_mut().take(combine_responses) {
+            if let PendingRequestState::Received { response, .. } = &mut pending_request.state {
+                if let Some(Frame::Kafka(KafkaFrame::Response {
+                    body: ResponseBody::Fetch(fetch),
+                    ..
+                })) = response.frame()
+                {
+                    result += fetch
+                        .responses
+                        .iter()
+                        .map(|x| {
+                            x.partitions
+                                .iter()
+                                .map(|x| {
+                                    x.records
+                                        .as_ref()
+                                        .map(|x| x.len() as i64)
+                                        .unwrap_or_default()
+                                })
+                                .sum::<i64>()
+                        })
+                        .sum::<i64>();
+                } else {
+                    panic!("must be called on fetch responses")
+                }
+            } else {
+                panic!("must be called on received responses")
+            }
+        }
+        result
+    }
+
     fn combine_responses(mut drain: impl Iterator<Item = Message>) -> Result<Message> {
         // Take this response as base.
         // Then iterate over all remaining combined responses and integrate them into the base.
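
Reviewer note: because the routed fetch has `max_wait_ms` and `min_bytes` rewritten to 1, the broker replies immediately, and shotover must enforce the client's original limits itself when deciding whether to resend. Below is a minimal sketch of that decision in isolation; the `FetchBudget` type and `should_resend` helper are hypothetical names for illustration, not part of this diff:

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper mirroring the limits tracked in SentRequestTy::Fetch.
struct FetchBudget {
    originally_sent_at: Instant,
    max_wait_ms: i32,
    min_bytes: i32,
}

impl FetchBudget {
    /// Resend only while the client's original max_wait_ms window is still open
    /// AND the combined responses hold fewer than min_bytes of record data.
    fn should_resend(&self, received_record_bytes: i64) -> bool {
        self.originally_sent_at.elapsed() < Duration::from_millis(self.max_wait_ms as u64)
            && received_record_bytes < self.min_bytes as i64
    }
}

fn main() {
    let budget = FetchBudget {
        originally_sent_at: Instant::now(),
        max_wait_ms: 500,
        min_bytes: 1024,
    };
    // Not enough bytes yet and the window is still open: resend.
    assert!(budget.should_resend(100));
    // The client's min_bytes is already satisfied: deliver the response instead.
    assert!(!budget.should_resend(2048));
}
```

Resending stops as soon as either limit is met, matching the broker-side semantics of a fetch with the original `max_wait_ms`/`min_bytes`.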
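The `PendingRequestState` lifecycle this diff extends is Routed → Sent → Received, with fetch requests able to loop back from Received to Routed when a resend is needed. A simplified, self-contained sketch of those transitions, assuming a plain `String` payload in place of shotover's `Message` type and an `i32` broker id in place of `Destination`:

```rust
use std::collections::VecDeque;

// Simplified stand-ins for the diff's types (assumptions for illustration).
type Destination = i32;

#[allow(dead_code)]
enum PendingRequestState {
    Routed { destination: Destination, request: String },
    Sent { destination: Destination, index: usize, request: Option<String> },
    Received { destination: Destination, response: String, request: Option<String> },
}

fn main() {
    let mut pending: VecDeque<PendingRequestState> = VecDeque::new();
    pending.push_back(PendingRequestState::Routed {
        destination: 1,
        request: "fetch".into(),
    });

    // send_requests: Routed -> Sent, keeping the request around for possible resends.
    if let Some(PendingRequestState::Routed { destination, request }) = pending.pop_front() {
        pending.push_back(PendingRequestState::Sent {
            destination,
            index: 0,
            request: Some(request),
        });
    }

    // recv_responses: Sent -> Received once the matching broker response arrives.
    if let Some(PendingRequestState::Sent { destination, request, .. }) = pending.pop_front() {
        pending.push_back(PendingRequestState::Received {
            destination,
            response: "records".into(),
            request,
        });
    }

    // Fetch retry path: Received -> Routed, reusing the stored request, after which
    // the immediate second send_requests call in transform() picks it up again.
    if let Some(PendingRequestState::Received { destination, request, .. }) = pending.pop_front() {
        pending.push_back(PendingRequestState::Routed {
            destination,
            request: request.expect("fetch requests store their request for resending"),
        });
    }
}
```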