Skip to content

Commit

Permalink
revert precheck error in send method
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Sep 17, 2024
1 parent 47fe3ea commit 5fa3428
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 6 deletions.
5 changes: 1 addition & 4 deletions shotover/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl SinkConnection {
self.error.clone().unwrap()
}

pub fn get_error(&mut self) -> Option<ConnectionError> {
pub fn try_set_get_error(&mut self) -> Option<ConnectionError> {
if self.error.is_none() {
self.error = self.connection_closed_rx.try_recv().ok();
}
Expand All @@ -110,9 +110,6 @@ impl SinkConnection {

if let Some(error) = &self.error {
Err(error.clone())
} else if let Ok(error) = self.connection_closed_rx.try_recv() {
self.error = Some(error.clone());
Err(error)
} else {
self.out_tx.send(messages).map_err(|_| self.set_get_error())
}
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/transforms/kafka/sink_cluster/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ impl KafkaConnection {

pub fn get_error(&mut self) -> Option<ConnectionError> {
match self {
KafkaConnection::Regular(c) => c.get_error(),
KafkaConnection::Regular(c) => c.try_set_get_error(),
KafkaConnection::ScramOverMtls(c) => c.get_error(),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl ScramOverMtlsConnection {
}

pub fn get_error(&mut self) -> Option<ConnectionError> {
self.connection.get_error()
self.connection.try_set_get_error()
}

pub fn pending_requests_count(&self) -> usize {
Expand Down

0 comments on commit 5fa3428

Please sign in to comment.