Skip to content

Commit

Permalink
revert SendError
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Sep 17, 2024
1 parent b8dd004 commit c3f2530
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 24 deletions.
24 changes: 4 additions & 20 deletions shotover/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,18 +105,16 @@ impl SinkConnection {

/// Send messages.
/// If there is a problem with the connection an error is returned.
pub fn send(&mut self, mut messages: Vec<Message>) -> Result<(), SendError> {
pub fn send(&mut self, mut messages: Vec<Message>) -> Result<(), ConnectionError> {
self.dummy_response_inserter.process_requests(&mut messages);

if let Some(error) = &self.error {
Err(SendError::RequestsUnsent(error.clone(), messages))
Err(error.clone())
} else if let Ok(error) = self.connection_closed_rx.try_recv() {
self.error = Some(error.clone());
Err(SendError::RequestsUnsent(error, messages))
Err(error)
} else {
self.out_tx
.send(messages)
.map_err(|_| SendError::RequestPossiblySent(self.set_get_error()))
self.out_tx.send(messages).map_err(|_| self.set_get_error())
}
}

Expand Down Expand Up @@ -212,20 +210,6 @@ impl SinkConnection {
}
}

/// This represents an error to the connection encountered while sending requests.
/// The connection is no longer usable after this error is received.
#[derive(thiserror::Error, Debug, Clone)]
pub enum SendError {
/// The error was detected before the requests were sent
/// The request was not received and it can be resent on a new connection if allowed by the protocol.
#[error("An error was detected before the requests were sent: {0}")]
RequestsUnsent(ConnectionError, Vec<Message>),
/// The error was detected after the requests were sent.
/// It is unknown if the request was received or not.
#[error("An error was detected after the requests were sent: {0}")]
RequestPossiblySent(ConnectionError),
}

/// This represents an unrecoverable error to the connection.
/// The connection is no longer usable after this error is received.
#[derive(thiserror::Error, Debug, Clone)]
Expand Down
4 changes: 2 additions & 2 deletions shotover/src/transforms/kafka/sink_cluster/connections.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
connection::{ConnectionError, SendError, SinkConnection},
connection::{ConnectionError, SinkConnection},
message::Message,
};
use anyhow::{Context, Result};
Expand Down Expand Up @@ -314,7 +314,7 @@ impl KafkaConnection {

/// Send messages.
/// If there is a problem with the connection an error is returned.
pub fn send(&mut self, messages: Vec<Message>) -> Result<(), SendError> {
pub fn send(&mut self, messages: Vec<Message>) -> Result<(), ConnectionError> {
match self {
KafkaConnection::Regular(c) => c.send(messages),
KafkaConnection::ScramOverMtls(c) => c.send(messages),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
connection::{ConnectionError, SendError, SinkConnection},
connection::{ConnectionError, SinkConnection},
message::Message,
transforms::kafka::sink_cluster::connections::ConnectionState,
};
Expand Down Expand Up @@ -64,7 +64,7 @@ impl ScramOverMtlsConnection {

/// Send messages.
/// If there is a problem with the connection an error is returned.
pub fn send(&mut self, messages: Vec<Message>) -> Result<(), SendError> {
pub fn send(&mut self, messages: Vec<Message>) -> Result<(), ConnectionError> {
self.connection.send(messages)
}

Expand Down

0 comments on commit c3f2530

Please sign in to comment.