Skip to content

Commit

Permalink
review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Aug 21, 2024
1 parent 2c14c78 commit 5ad27fc
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 23 deletions.
21 changes: 16 additions & 5 deletions shotover/src/transforms/kafka/sink_cluster/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,15 @@ impl Connections {
.context("Failed to create a new connection")?;
}
ConnectionState::AtRiskOfTimeout => {
let old_connection = self.connections.remove(&destination).unwrap();
if old_connection.old_connection.is_some() {
return Err(anyhow!("Old connection had an old connection"));
let existing_connection = self.connections.remove(&destination).unwrap();
if existing_connection.old_connection.is_some() {
return Err(anyhow!("The connection to be replaced had an old_connection. For this to occur a response needs to have been pending for longer than the timeout period which indicates other problems."));
}
let old_connection = if old_connection.connection.pending_requests_count() == 0 {
let old_connection = if existing_connection.connection.pending_requests_count() == 0
{
None
} else {
Some(old_connection.connection)
Some(existing_connection.connection)
};

self.create_and_insert_connection(
Expand Down Expand Up @@ -171,6 +172,7 @@ impl Connections {
recent_instant: Instant,
destination: Destination,
) -> ConnectionState {
// TODO: this check is wrong
let timeout = if let Some(scram_over_mtls) = authorize_scram_over_mtls {
// The delegation token is recreated after `0.5 * delegation_token_lifetime`
// Consider what happens when we match that timing for our connection timeout here:
Expand Down Expand Up @@ -292,6 +294,15 @@ impl KafkaConnection {
self.connection.recv().await
}
}

pub fn pending_requests_count(&self) -> usize {
self.connection.pending_requests_count()
+ self
.old_connection
.as_ref()
.map(|x| x.pending_requests_count())
.unwrap_or_default()
}
}

/// default value of kafka broker config connections.max.idle.ms (10 minutes)
Expand Down
43 changes: 25 additions & 18 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1043,24 +1043,31 @@ routing message to a random node so that:
fn recv_responses(&mut self) -> Result<Vec<Message>> {
// Convert all received PendingRequestTy::Sent into PendingRequestTy::Received
for (connection_destination, connection) in &mut self.connections.connections {
self.temp_responses_buffer.clear();
connection
.try_recv_into(&mut self.temp_responses_buffer)
.with_context(|| format!("Failed to receive from {connection_destination:?}"))?;
for response in self.temp_responses_buffer.drain(..) {
let mut response = Some(response);
for pending_request in &mut self.pending_requests {
if let PendingRequestTy::Sent { destination, index } = &mut pending_request.ty {
if destination == connection_destination {
// Store the PendingRequestTy::Received at the location of the next PendingRequestTy::Sent
// All other PendingRequestTy::Sent need to be decremented, in order to determine the PendingRequestTy::Sent
// to be used next time, and the time after that, and ...
if *index == 0 {
pending_request.ty = PendingRequestTy::Received {
response: response.take().unwrap(),
};
} else {
*index -= 1;
// skip recv when no pending requests to avoid timeouts on old connections
if connection.pending_requests_count() != 0 {
self.temp_responses_buffer.clear();
connection
.try_recv_into(&mut self.temp_responses_buffer)
.with_context(|| {
format!("Failed to receive from {connection_destination:?}")
})?;
for response in self.temp_responses_buffer.drain(..) {
let mut response = Some(response);
for pending_request in &mut self.pending_requests {
if let PendingRequestTy::Sent { destination, index } =
&mut pending_request.ty
{
if destination == connection_destination {
// Store the PendingRequestTy::Received at the location of the next PendingRequestTy::Sent
// All other PendingRequestTy::Sent need to be decremented, in order to determine the PendingRequestTy::Sent
// to be used next time, and the time after that, and ...
if *index == 0 {
pending_request.ty = PendingRequestTy::Received {
response: response.take().unwrap(),
};
} else {
*index -= 1;
}
}
}
}
Expand Down

0 comments on commit 5ad27fc

Please sign in to comment.