From 5fa342806b4215121e06e99bf3b8d34e50ea92a1 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 17 Sep 2024 12:56:28 +1000 Subject: [PATCH] revert precheck error in send method --- shotover/src/connection.rs | 5 +---- shotover/src/transforms/kafka/sink_cluster/connections.rs | 2 +- .../kafka/sink_cluster/scram_over_mtls/connection.rs | 2 +- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/shotover/src/connection.rs b/shotover/src/connection.rs index 31116e58c..d601ec38d 100644 --- a/shotover/src/connection.rs +++ b/shotover/src/connection.rs @@ -96,7 +96,7 @@ impl SinkConnection { self.error.clone().unwrap() } - pub fn get_error(&mut self) -> Option { + pub fn try_set_get_error(&mut self) -> Option { if self.error.is_none() { self.error = self.connection_closed_rx.try_recv().ok(); } @@ -110,9 +110,6 @@ impl SinkConnection { if let Some(error) = &self.error { Err(error.clone()) - } else if let Ok(error) = self.connection_closed_rx.try_recv() { - self.error = Some(error.clone()); - Err(error) } else { self.out_tx.send(messages).map_err(|_| self.set_get_error()) } diff --git a/shotover/src/transforms/kafka/sink_cluster/connections.rs b/shotover/src/transforms/kafka/sink_cluster/connections.rs index 79554871c..a9e7a67b6 100644 --- a/shotover/src/transforms/kafka/sink_cluster/connections.rs +++ b/shotover/src/transforms/kafka/sink_cluster/connections.rs @@ -335,7 +335,7 @@ impl KafkaConnection { pub fn get_error(&mut self) -> Option { match self { - KafkaConnection::Regular(c) => c.get_error(), + KafkaConnection::Regular(c) => c.try_set_get_error(), KafkaConnection::ScramOverMtls(c) => c.get_error(), } } diff --git a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/connection.rs b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/connection.rs index 6762fea36..ab5391525 100644 --- a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/connection.rs +++ b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/connection.rs @@ -87,7 +87,7 @@ impl ScramOverMtlsConnection { } pub fn get_error(&mut self) -> Option { - self.connection.get_error() + self.connection.try_set_get_error() } pub fn pending_requests_count(&self) -> usize {