From 5ad27fcb36883ea7cd898c4966f3a54b4ff95922 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Wed, 21 Aug 2024 16:02:13 +1000 Subject: [PATCH] review feedback --- .../kafka/sink_cluster/connections.rs | 21 ++++++--- .../src/transforms/kafka/sink_cluster/mod.rs | 43 +++++++++++-------- 2 files changed, 41 insertions(+), 23 deletions(-) diff --git a/shotover/src/transforms/kafka/sink_cluster/connections.rs b/shotover/src/transforms/kafka/sink_cluster/connections.rs index 4b665870b..dde3022cf 100644 --- a/shotover/src/transforms/kafka/sink_cluster/connections.rs +++ b/shotover/src/transforms/kafka/sink_cluster/connections.rs @@ -94,14 +94,15 @@ impl Connections { .context("Failed to create a new connection")?; } ConnectionState::AtRiskOfTimeout => { - let old_connection = self.connections.remove(&destination).unwrap(); - if old_connection.old_connection.is_some() { - return Err(anyhow!("Old connection had an old connection")); + let existing_connection = self.connections.remove(&destination).unwrap(); + if existing_connection.old_connection.is_some() { + return Err(anyhow!("The connection to be replaced had an old_connection. For this to occur a response needs to have been pending for longer than the timeout period which indicates other problems.")); } - let old_connection = if old_connection.connection.pending_requests_count() == 0 { + let old_connection = if existing_connection.connection.pending_requests_count() == 0 + { None } else { - Some(old_connection.connection) + Some(existing_connection.connection) }; self.create_and_insert_connection( @@ -171,6 +172,7 @@ impl Connections { recent_instant: Instant, destination: Destination, ) -> ConnectionState { + // TODO: this check is wrong let timeout = if let Some(scram_over_mtls) = authorize_scram_over_mtls { // The delegation token is recreated after `0.5 * delegation_token_lifetime` // Consider what happens when we match that timing for our connection timeout here: @@ -292,6 +294,15 @@ impl KafkaConnection { self.connection.recv().await } } + + pub fn pending_requests_count(&self) -> usize { + self.connection.pending_requests_count() + + self + .old_connection + .as_ref() + .map(|x| x.pending_requests_count()) + .unwrap_or_default() + } } /// default value of kafka broker config connections.max.idle.ms (10 minutes) diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index b52801afc..ab2660065 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -1043,24 +1043,31 @@ routing message to a random node so that: fn recv_responses(&mut self) -> Result> { // Convert all received PendingRequestTy::Sent into PendingRequestTy::Received for (connection_destination, connection) in &mut self.connections.connections { - self.temp_responses_buffer.clear(); - connection - .try_recv_into(&mut self.temp_responses_buffer) - .with_context(|| format!("Failed to receive from {connection_destination:?}"))?; - for response in self.temp_responses_buffer.drain(..) { - let mut response = Some(response); - for pending_request in &mut self.pending_requests { - if let PendingRequestTy::Sent { destination, index } = &mut pending_request.ty { - if destination == connection_destination { - // Store the PendingRequestTy::Received at the location of the next PendingRequestTy::Sent - // All other PendingRequestTy::Sent need to be decremented, in order to determine the PendingRequestTy::Sent - // to be used next time, and the time after that, and ... - if *index == 0 { - pending_request.ty = PendingRequestTy::Received { - response: response.take().unwrap(), - }; - } else { - *index -= 1; + // skip recv when no pending requests to avoid timeouts on old connections + if connection.pending_requests_count() != 0 { + self.temp_responses_buffer.clear(); + connection + .try_recv_into(&mut self.temp_responses_buffer) + .with_context(|| { + format!("Failed to receive from {connection_destination:?}") + })?; + for response in self.temp_responses_buffer.drain(..) { + let mut response = Some(response); + for pending_request in &mut self.pending_requests { + if let PendingRequestTy::Sent { destination, index } = + &mut pending_request.ty + { + if destination == connection_destination { + // Store the PendingRequestTy::Received at the location of the next PendingRequestTy::Sent + // All other PendingRequestTy::Sent need to be decremented, in order to determine the PendingRequestTy::Sent + // to be used next time, and the time after that, and ... + if *index == 0 { + pending_request.ty = PendingRequestTy::Received { + response: response.take().unwrap(), + }; + } else { + *index -= 1; + } } } }