KafkaSinkCluster: fix consumer latency issues
rukai committed Sep 24, 2024
1 parent 0eb9ff1 commit 5185e6f
Showing 1 changed file with 144 additions and 22 deletions.
166 changes: 144 additions & 22 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -313,10 +313,24 @@ enum PendingRequestState {
/// When this is 0 the next response from the broker will be for this request.
/// This field must be manually decremented when another response for this broker comes through.
index: usize,
/// Type of the message sent
ty: SentRequestTy,
/// Some message types store the request here in case they need to resend it.
request: Option<Message>,
},
/// The broker has returned a Response to this request.
/// Returning this response may be delayed until a response to an earlier request comes back from another broker.
Received { response: Message },
Received {
// TODO: move this into the parent type
destination: Destination,
response: Message,
/// Type of the message sent
ty: SentRequestTy,
/// Some message types store the request here in case they need to resend it.
// TODO: if we ever turn the Message into a CoW type we will be able to
// simplify this a lot by just storing the request field once in PendingRequest
request: Option<Message>,
},
}
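For orientation, here is a minimal, self-contained sketch (types pared down, names invented) of the transition this commit enables: keeping a copy of the request inside Sent and Received is what allows an under-filled fetch to be flipped back to Routed and resent.

// Minimal sketch, not the real types: a request moves Routed -> Sent -> Received,
// and an under-filled fetch is moved from Received back to Routed so that it is
// resent on the next send pass.
enum SketchState {
    Routed { request: String },
    Sent { request: Option<String> },
    Received { request: Option<String>, response: String },
}

fn resend_if_underfilled(state: &mut SketchState, satisfied: bool) {
    if let SketchState::Received { request, .. } = state {
        if !satisfied {
            *state = SketchState::Routed {
                // Only fetches keep a copy of their request, mirroring the real
                // code's assumption that only fetches are ever resent.
                request: request.take().expect("fetch requests keep a copy"),
            };
        }
    }
}

fn main() {
    let mut state = SketchState::Received {
        request: Some("fetch".to_owned()),
        response: "not enough records yet".to_owned(),
    };
    resend_if_underfilled(&mut state, false);
    assert!(matches!(state, SketchState::Routed { .. }));
}
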

impl PendingRequestState {
@@ -328,6 +342,16 @@ impl PendingRequestState {
}
}

#[derive(Debug, Clone)]
enum SentRequestTy {
Fetch {
originally_sent_at: Instant,
max_wait_ms: i32,
min_bytes: i32,
},
Other,
}
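The Fetch variant exists so the response side can compare what the brokers returned against what the client originally asked for. A sketch of how it is presumably populated when a fetch is dispatched (the construction site is outside the hunks shown here, so the surrounding code is an assumption; it uses the SentRequestTy enum defined just above):

use std::time::Instant;

// Sketch only: the client's original limits must be captured before
// max_wait_ms/min_bytes are rewritten to 1 during routing, because the
// response handler later checks the combined responses against these values.
fn fetch_request_ty(original_max_wait_ms: i32, original_min_bytes: i32) -> SentRequestTy {
    SentRequestTy::Fetch {
        originally_sent_at: Instant::now(),
        max_wait_ms: original_max_wait_ms,
        min_bytes: original_min_bytes,
    }
}
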

struct PendingRequest {
state: PendingRequestState,
/// Combine the next N responses into a single response
@@ -376,9 +400,18 @@ impl Transform for KafkaSinkCluster {
self.send_requests()
.await
.context("Failed to send requests")?;
self.recv_responses()
let responses = self
.recv_responses()
.await
.context("Failed to receive responses")?
.context("Failed to receive responses")?;

// In the case of fetch requests, recv_responses may set pending requests back to routed state.
// So we retry send_requests immediately so the fetch requests aren't stuck waiting for another call to `KafkaSinkCluster::transform`.
self.send_requests()
.await
.context("Failed to send requests")?;

responses
};

self.process_responses(&mut responses, &mut chain_state.close_client_connection)
@@ -1063,11 +1096,19 @@ impl KafkaSinkCluster {
});
tracing::debug!("Routing fetch request to single broker {}", destination.0);
} else {
// Individual sub-requests could delay the whole fetch request by up to max_wait_ms.
// So we need to rewrite max_wait_ms and min_bytes to ensure that the broker responds immediately.
// We then perform retries when receiving the response to uphold the original values.
fetch.max_wait_ms = 1;
fetch.min_bytes = 1;

// The message has been split so it may be delivered to multiple destinations.
// We must generate a unique message for each destination.
let combine_responses = routing.len();
message.invalidate_cache();
for (i, (destination, topics)) in routing.into_iter().enumerate() {
tracing::info!("topics: {topics:?}");

let destination = if destination == -1 {
random_broker_id(&self.nodes, &mut self.rng)
} else {
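To make the rewrite above concrete, a hypothetical, self-contained illustration (types and numbers invented): a client fetch asking to wait up to 500 ms for at least 1 MB of records, spanning partitions on two brokers, is split into two per-broker fetches that are each told to respond immediately; the client's original limits are then enforced by shotover itself when the responses come back.

// Hypothetical illustration only; FetchLimits is an invented stand-in for the
// relevant fields of a real fetch request.
#[derive(Clone, Debug)]
struct FetchLimits {
    max_wait_ms: i32,
    min_bytes: i32,
}

fn main() {
    // What the client asked for.
    let client = FetchLimits { max_wait_ms: 500, min_bytes: 1_000_000 };

    // One copy per destination broker, rewritten so no single broker blocks the others.
    let mut per_broker = vec![client.clone(), client.clone()];
    for fetch in &mut per_broker {
        fetch.max_wait_ms = 1;
        fetch.min_bytes = 1;
    }

    // The original limits are not lost: shotover keeps resending the per-broker
    // fetches until client.max_wait_ms has elapsed or client.min_bytes of
    // records have accumulated across the combined responses.
    println!("client asked for {client:?}, brokers see {per_broker:?}");
}
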
@@ -1206,6 +1247,8 @@ impl KafkaSinkCluster {
let mut value = PendingRequestState::Sent {
destination: *destination,
index: routed_requests.requests.len() + routed_requests.already_pending,
request: None,
ty: SentRequestTy::Other,
};
std::mem::swap(&mut self.pending_requests[i].state, &mut value);
if let PendingRequestState::Routed { request, .. } = value {
@@ -1310,16 +1353,23 @@ impl KafkaSinkCluster {
for response in self.temp_responses_buffer.drain(..) {
let mut response = Some(response);
for pending_request in &mut self.pending_requests {
if let PendingRequestState::Sent { destination, index } =
&mut pending_request.state
if let PendingRequestState::Sent {
destination,
index,
ty,
request,
} = &mut pending_request.state
{
if destination == connection_destination {
// Store the PendingRequestState::Received at the location of the next PendingRequestState::Sent
// All other PendingRequestState::Sent need to be decremented, in order to determine the PendingRequestState::Sent
// to be used next time, and the time after that, and ...
if *index == 0 {
pending_request.state = PendingRequestState::Received {
destination: *destination,
response: response.take().unwrap(),
ty: ty.clone(),
request: request.take(),
};
} else {
*index -= 1;
@@ -1351,7 +1401,8 @@ impl KafkaSinkCluster {
// Remove and return all PendingRequestState::Received that are ready to be received.
let mut responses = vec![];
while let Some(pending_request) = self.pending_requests.front() {
let all_combined_received = (0..pending_request.combine_responses).all(|i| {
let combine_responses = pending_request.combine_responses;
let all_combined_received = (0..combine_responses).all(|i| {
matches!(
self.pending_requests.get(i),
Some(PendingRequest {
@@ -1361,30 +1412,64 @@
)
});
if all_combined_received {
// If we received all responses but they don't contain enough data, resend the requests.
if let PendingRequestState::Received {
ty:
SentRequestTy::Fetch {
originally_sent_at,
max_wait_ms,
min_bytes,
},
..
} = pending_request.state
{
if originally_sent_at.elapsed() > Duration::from_millis(max_wait_ms as u64)
|| Self::total_fetch_record_bytes(
&mut self.pending_requests,
combine_responses,
) > min_bytes as i64
{
for i in 0..combine_responses {
let pending_request = &mut self.pending_requests[i];
if let PendingRequestState::Received {
destination,
request,
..
} = &mut pending_request.state
{
pending_request.state = PendingRequestState::Routed {
destination: *destination,
request: request.take().unwrap(),
}
}
}

// The pending_request is not received, so we need to break to maintain response ordering.
break;
}
}

// The next response we are waiting on has been received, so add it to responses
if pending_request.combine_responses == 1 {
if combine_responses == 1 {
if let Some(PendingRequest {
state: PendingRequestState::Received { response },
state: PendingRequestState::Received { response, .. },
..
}) = self.pending_requests.pop_front()
{
responses.push(response);
}
} else {
let drain = self
.pending_requests
.drain(..pending_request.combine_responses)
.map(|x| {
if let PendingRequest {
state: PendingRequestState::Received { response },
..
} = x
{
response
} else {
unreachable!("Guaranteed by all_combined_received")
}
});
let drain = self.pending_requests.drain(..combine_responses).map(|x| {
if let PendingRequest {
state: PendingRequestState::Received { response, .. },
..
} = x
{
response
} else {
unreachable!("Guaranteed by all_combined_received")
}
});
responses.push(Self::combine_responses(drain)?);
}
} else {
@@ -1395,6 +1480,43 @@
Ok(responses)
}
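The resend decision above combines a time bound and a byte bound taken from the client's original fetch (captured in SentRequestTy::Fetch and measured via total_fetch_record_bytes, defined just below) rather than the rewritten 1/1 values. As the comment in the diff puts it, requests are resent while the combined responses don't yet contain enough data; a small self-contained sketch of that intent with invented names and numbers, not a line-for-line copy of the inline check:

use std::time::Duration;

// Sketch of the intent described above: keep resending while the client's
// original max_wait_ms has not elapsed and its original min_bytes has not
// been reached.
fn should_resend(elapsed: Duration, max_wait_ms: i32, received_bytes: i64, min_bytes: i32) -> bool {
    elapsed < Duration::from_millis(max_wait_ms as u64) && received_bytes < min_bytes as i64
}

fn main() {
    // Client asked for at least 64 KiB within 500 ms.
    assert!(should_resend(Duration::from_millis(30), 500, 12 * 1024, 64 * 1024)); // too little data, time remains
    assert!(!should_resend(Duration::from_millis(180), 500, 70 * 1024, 64 * 1024)); // enough data, return it
    assert!(!should_resend(Duration::from_millis(600), 500, 0, 64 * 1024)); // out of time, return what we have
}
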

fn total_fetch_record_bytes(
pending_requests: &mut VecDeque<PendingRequest>,
combine_responses: usize,
) -> i64 {
let mut result = 0;
for pending_request in pending_requests.iter_mut().take(combine_responses) {
if let PendingRequestState::Received { response, .. } = &mut pending_request.state {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Fetch(fetch),
..
})) = response.frame()
{
result += fetch
.responses
.iter()
.map(|x| {
x.partitions
.iter()
.map(|x| {
x.records
.as_ref()
.map(|x| x.len() as i64)
.unwrap_or_default()
})
.sum::<i64>()
})
.sum::<i64>();
} else {
panic!("must be called on fetch responses")
}
} else {
panic!("must be called on received responses")
}
}
result
}

fn combine_responses(mut drain: impl Iterator<Item = Message>) -> Result<Message> {
// Take this response as base.
// Then iterate over all remaining combined responses and integrate them into the base.
