KafkaSinkCluster: fix consumer latency issues
rukai committed Oct 1, 2024
1 parent eb4fb2f commit 4e0809a
Showing 1 changed file with 172 additions and 20 deletions.
192 changes: 172 additions & 20 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
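
In brief: when a fetch request is split across multiple brokers, each sub-request is rewritten with max_wait_ms = 1 and min_bytes = 1 so that every broker replies immediately, and shotover then retries the fetch itself, with exponential backoff, until the client's original max_wait_ms / min_bytes constraints are satisfied. As a minimal sketch (hypothetical names, not part of the diff), the broker-side fetch contract being worked around is:

    // A broker parks a fetch until enough bytes accumulate or the wait expires;
    // with a split fetch, each broker can independently park its sub-request
    // for the full max_wait_ms, delaying the combined response.
    fn broker_should_respond(
        elapsed_ms: i32,
        buffered_bytes: i64,
        max_wait_ms: i32,
        min_bytes: i32,
    ) -> bool {
        buffered_bytes >= min_bytes as i64 || elapsed_ms >= max_wait_ms
    }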
@@ -212,6 +212,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
temp_responses_buffer: Default::default(),
sasl_mechanism: None,
authorize_scram_over_mtls: self.authorize_scram_over_mtls.as_ref().map(|x| x.build()),
refetch_backoff: Duration::from_millis(1),
})
}

@@ -273,6 +274,7 @@ struct KafkaSinkCluster {
sasl_mechanism: Option<String>,
authorize_scram_over_mtls: Option<AuthorizeScramOverMtls>,
connections: Connections,
refetch_backoff: Duration,
}

/// The state of a request/response is tracked by this enum.
@@ -291,10 +293,20 @@ enum PendingRequestState {
/// When this is 0 the next response from the broker will be for this request.
/// This field must be manually decremented when another response for this broker comes through.
index: usize,
/// Some message types store the request here in case they need to resend it.
request: Option<Message>,
},
/// The broker has returned a Response to this request.
/// Returning this response may be delayed until a response to an earlier request comes back from another broker.
Received { response: Message },
Received {
// TODO: move this into the parent type
destination: Destination,
response: Message,
/// Some message types store the request here in case they need to resend it.
// TODO: if we ever turn the Message into a CoW type we will be able to
// simplify this a lot by just storing the request field once in PendingRequest
request: Option<Message>,
},
}
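// With the retry added by this commit, the state machine can now cycle:
// Routed -> Sent -> Received, and, for an under-filled fetch, from Received
// back to Routed so that the stored request is resent.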

impl PendingRequestState {
@@ -306,8 +318,20 @@ impl PendingRequestState {
}
}

#[derive(Debug)]
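/// Distinguishes requests that need special handling once their response arrives.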
enum PendingRequestTy {
Fetch {
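/// When the original fetch was first sent; used to enforce the client's max_wait_ms across refetches.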
originally_sent_at: Instant,
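/// The max_wait_ms originally requested by the client, before shotover rewrote it to 1.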
max_wait_ms: i32,
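/// The min_bytes originally requested by the client, before shotover rewrote it to 1.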
min_bytes: i32,
},
Other,
}

struct PendingRequest {
state: PendingRequestState,
/// The type of the request that was sent.
ty: PendingRequestTy,
/// Combine the next N responses into a single response
/// This message should be considered the base message and will retain the shotover Message::id and kafka correlation_id
combine_responses: usize,
@@ -783,6 +807,7 @@ impl KafkaSinkCluster {
tracing::debug!("Routing request to random broker {}", destination.0);
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, message),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
}
@@ -806,6 +831,7 @@

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, message),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!(
@@ -827,6 +853,7 @@
produce.topic_data = topic_data;
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, message),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!(
@@ -859,6 +886,7 @@
}
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::Other,
combine_responses,
});
}
@@ -1016,6 +1044,8 @@

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, message),
// we don't need special handling for fetch, so just use Other
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!(
@@ -1037,10 +1067,20 @@
fetch.topics = topics;
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, message),
// we don't need special handling for fetch, so just use Other
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!("Routing fetch request to single broker {}", destination.0);
} else {
// Individual sub-requests could delay the whole fetch request by up to max_wait_ms.
// So we rewrite max_wait_ms and min_bytes to ensure that each broker responds immediately.
// We then retry when receiving the responses, to uphold the original values.
let max_wait_ms = fetch.max_wait_ms;
let min_bytes = fetch.min_bytes;
fetch.max_wait_ms = 1;
fetch.min_bytes = 1;

// The message has been split so it may be delivered to multiple destinations.
// We must generate a unique message for each destination.
let combine_responses = routing.len();
@@ -1066,6 +1106,11 @@
}
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::Fetch {
originally_sent_at: Instant::now(),
max_wait_ms,
min_bytes,
},
combine_responses,
});
}
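
Taken together, the rewrite above and the retry in recv_responses below amount to the following policy; this is a standalone sketch with hypothetical names, mirroring the condition checked in recv_responses:

    use std::time::Duration;

    // Keep refetching while we are still inside the client's wait window and
    // have not yet accumulated the bytes the client asked for.
    fn should_refetch(
        elapsed: Duration,
        received_bytes: i64,
        max_wait_ms: i32,
        min_bytes: i32,
    ) -> bool {
        elapsed < Duration::from_millis(max_wait_ms as u64)
            && received_bytes < min_bytes as i64
    }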
@@ -1181,9 +1226,23 @@
})
.count(),
});

let request = match self.pending_requests[i].ty {
PendingRequestTy::Fetch { .. } => {
if let PendingRequestState::Routed { request, .. } =
&self.pending_requests[i].state
{
Some(request.clone())
} else {
unreachable!()
}
}
PendingRequestTy::Other => None,
};
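// Only fetches pay for this clone; every other request type stores None,
// so the common (non-fetch) request path is unchanged.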
let mut value = PendingRequestState::Sent {
destination: *destination,
index: routed_requests.requests.len() + routed_requests.already_pending,
request,
};
std::mem::swap(&mut self.pending_requests[i].state, &mut value);
if let PendingRequestState::Routed { request, .. } = value {
@@ -1288,16 +1347,21 @@
for response in self.temp_responses_buffer.drain(..) {
let mut response = Some(response);
for pending_request in &mut self.pending_requests {
if let PendingRequestState::Sent { destination, index } =
&mut pending_request.state
if let PendingRequestState::Sent {
destination,
index,
request,
} = &mut pending_request.state
{
if destination == connection_destination {
// Store the PendingRequestState::Received at the location of the next PendingRequestState::Sent.
// All other PendingRequestState::Sent must be decremented, in order to determine the PendingRequestState::Sent
// to be used next time, and the time after that, and ...
if *index == 0 {
pending_request.state = PendingRequestState::Received {
destination: *destination,
response: response.take().unwrap(),
request: request.take(),
};
} else {
*index -= 1;
@@ -1329,7 +1393,8 @@
// Remove and return all PendingRequestState::Received that are ready to be received.
let mut responses = vec![];
while let Some(pending_request) = self.pending_requests.front() {
let all_combined_received = (0..pending_request.combine_responses).all(|i| {
let combine_responses = pending_request.combine_responses;
let all_combined_received = (0..combine_responses).all(|i| {
matches!(
self.pending_requests.get(i),
Some(PendingRequest {
@@ -1339,40 +1404,124 @@
)
});
if all_combined_received {
// perform special handling for certain message types
if let PendingRequestTy::Fetch {
originally_sent_at,
max_wait_ms,
min_bytes,
} = pending_request.ty
{
// resend the requests if we are still within `max_wait_ms` and have not yet met the `min_bytes` requirement
if originally_sent_at.elapsed() < Duration::from_millis(max_wait_ms as u64)
&& Self::total_fetch_record_bytes(
&mut self.pending_requests,
combine_responses,
) < min_bytes as i64
{
tokio::time::sleep(self.refetch_backoff).await;
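// Note: this await happens inline in the response path, so the backoff also
// delays any responses queued behind this fetch; response ordering is
// preserved either way.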

// exponential backoff
self.refetch_backoff *= 2;

for i in 0..combine_responses {
let pending_request = &mut self.pending_requests[i];
if let PendingRequestState::Received {
destination,
request,
..
} = &mut pending_request.state
{
pending_request.state = PendingRequestState::Routed {
destination: *destination,
request: request.take().unwrap(),
}
} else {
unreachable!()
}
}

// The pending_request is back in the Routed state, so we break to maintain response ordering.
break;
} else {
self.refetch_backoff = Duration::from_millis(1);
}
}

// The next response we are waiting on has been received, add it to responses
if pending_request.combine_responses == 1 {
if combine_responses == 1 {
if let Some(PendingRequest {
state: PendingRequestState::Received { response },
state: PendingRequestState::Received { response, .. },
..
}) = self.pending_requests.pop_front()
{
responses.push(response);
}
} else {
let drain = self
.pending_requests
.drain(..pending_request.combine_responses)
.map(|x| {
if let PendingRequest {
state: PendingRequestState::Received { response },
..
} = x
{
response
} else {
unreachable!("Guaranteed by all_combined_received")
}
});
let drain = self.pending_requests.drain(..combine_responses).map(|x| {
if let PendingRequest {
state: PendingRequestState::Received { response, .. },
..
} = x
{
response
} else {
unreachable!("Guaranteed by all_combined_received")
}
});
responses.push(Self::combine_responses(drain)?);
}
} else {
// The pending_request has not been received yet, so we break to maintain response ordering.
break;
}
}

// In the case of fetch requests, recv_responses may set pending requests back to the Routed state.
// So we retry send_requests immediately, so that the fetch requests aren't stuck waiting for another call to `KafkaSinkCluster::transform`.
self.send_requests()
.await
.context("Failed to send requests")?;

Ok(responses)
}

fn total_fetch_record_bytes(
pending_requests: &mut VecDeque<PendingRequest>,
combine_responses: usize,
) -> i64 {
let mut result = 0;
for pending_request in pending_requests.iter_mut().take(combine_responses) {
if let PendingRequestState::Received { response, .. } = &mut pending_request.state {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Fetch(fetch),
..
})) = response.frame()
{
result += fetch
.responses
.iter()
.map(|x| {
x.partitions
.iter()
.map(|x| {
x.records
.as_ref()
.map(|x| x.len() as i64)
.unwrap_or_default()
})
.sum::<i64>()
})
.sum::<i64>();
} else {
panic!("must be called on fetch responses")
}
} else {
panic!("must be called on received responses")
}
}
result
}
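// Summing `records.len()` counts the raw record-batch bytes buffered for each
// partition, which approximates the broker-side accounting that `min_bytes`
// normally applies to.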

fn combine_responses(mut drain: impl Iterator<Item = Message>) -> Result<Message> {
// Take this response as base.
// Then iterate over all remaining combined responses and integrate them into the base.
@@ -1753,6 +1902,7 @@ impl KafkaSinkCluster {
destination: Destination::ControlConnection,
request,
},
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!("Routing request to control connection");
@@ -1774,6 +1924,7 @@

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!(
@@ -1796,6 +1947,7 @@

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
tracing::debug!(