diff --git a/shotover/src/connection.rs b/shotover/src/connection.rs index 11eb35c24..31116e58c 100644 --- a/shotover/src/connection.rs +++ b/shotover/src/connection.rs @@ -105,18 +105,16 @@ impl SinkConnection { /// Send messages. /// If there is a problem with the connection an error is returned. - pub fn send(&mut self, mut messages: Vec) -> Result<(), SendError> { + pub fn send(&mut self, mut messages: Vec) -> Result<(), ConnectionError> { self.dummy_response_inserter.process_requests(&mut messages); if let Some(error) = &self.error { - Err(SendError::RequestsUnsent(error.clone(), messages)) + Err(error.clone()) } else if let Ok(error) = self.connection_closed_rx.try_recv() { self.error = Some(error.clone()); - Err(SendError::RequestsUnsent(error, messages)) + Err(error) } else { - self.out_tx - .send(messages) - .map_err(|_| SendError::RequestPossiblySent(self.set_get_error())) + self.out_tx.send(messages).map_err(|_| self.set_get_error()) } } @@ -212,20 +210,6 @@ impl SinkConnection { } } -/// This represents an error to the connection encountered while sending requests. -/// The connection is no longer usable after this error is received. -#[derive(thiserror::Error, Debug, Clone)] -pub enum SendError { - /// The error was detected before the requests were sent - /// The request was not received and it can be resent on a new connection if allowed by the protocol. - #[error("An error was detected before the requests were sent: {0}")] - RequestsUnsent(ConnectionError, Vec), - /// The error was detected after the requests were sent. - /// It is unknown if the request was received or not. - #[error("An error was detected after the requests were sent: {0}")] - RequestPossiblySent(ConnectionError), -} - /// This represents an unrecoverable error to the connection. /// The connection is no longer usable after this error is received. #[derive(thiserror::Error, Debug, Clone)] diff --git a/shotover/src/transforms/kafka/sink_cluster/connections.rs b/shotover/src/transforms/kafka/sink_cluster/connections.rs index a89b02e8d..2ca2895f4 100644 --- a/shotover/src/transforms/kafka/sink_cluster/connections.rs +++ b/shotover/src/transforms/kafka/sink_cluster/connections.rs @@ -1,5 +1,5 @@ use crate::{ - connection::{ConnectionError, SendError, SinkConnection}, + connection::{ConnectionError, SinkConnection}, message::Message, }; use anyhow::{Context, Result}; @@ -314,7 +314,7 @@ impl KafkaConnection { /// Send messages. /// If there is a problem with the connection an error is returned. - pub fn send(&mut self, messages: Vec) -> Result<(), SendError> { + pub fn send(&mut self, messages: Vec) -> Result<(), ConnectionError> { match self { KafkaConnection::Regular(c) => c.send(messages), KafkaConnection::ScramOverMtls(c) => c.send(messages), diff --git a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/connection.rs b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/connection.rs index 8aeaf9dea..6762fea36 100644 --- a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/connection.rs +++ b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/connection.rs @@ -1,5 +1,5 @@ use crate::{ - connection::{ConnectionError, SendError, SinkConnection}, + connection::{ConnectionError, SinkConnection}, message::Message, transforms::kafka::sink_cluster::connections::ConnectionState, }; @@ -64,7 +64,7 @@ impl ScramOverMtlsConnection { /// Send messages. /// If there is a problem with the connection an error is returned. - pub fn send(&mut self, messages: Vec) -> Result<(), SendError> { + pub fn send(&mut self, messages: Vec) -> Result<(), ConnectionError> { self.connection.send(messages) }