diff --git a/shotover/src/connection.rs b/shotover/src/connection.rs index 11eb35c24..31116e58c 100644 --- a/shotover/src/connection.rs +++ b/shotover/src/connection.rs @@ -105,18 +105,16 @@ impl SinkConnection { /// Send messages. /// If there is a problem with the connection an error is returned. - pub fn send(&mut self, mut messages: Vec) -> Result<(), SendError> { + pub fn send(&mut self, mut messages: Vec) -> Result<(), ConnectionError> { self.dummy_response_inserter.process_requests(&mut messages); if let Some(error) = &self.error { - Err(SendError::RequestsUnsent(error.clone(), messages)) + Err(error.clone()) } else if let Ok(error) = self.connection_closed_rx.try_recv() { self.error = Some(error.clone()); - Err(SendError::RequestsUnsent(error, messages)) + Err(error) } else { - self.out_tx - .send(messages) - .map_err(|_| SendError::RequestPossiblySent(self.set_get_error())) + self.out_tx.send(messages).map_err(|_| self.set_get_error()) } } @@ -212,20 +210,6 @@ impl SinkConnection { } } -/// This represents an error to the connection encountered while sending requests. -/// The connection is no longer usable after this error is received. -#[derive(thiserror::Error, Debug, Clone)] -pub enum SendError { - /// The error was detected before the requests were sent - /// The request was not received and it can be resent on a new connection if allowed by the protocol. - #[error("An error was detected before the requests were sent: {0}")] - RequestsUnsent(ConnectionError, Vec), - /// The error was detected after the requests were sent. - /// It is unknown if the request was received or not. - #[error("An error was detected after the requests were sent: {0}")] - RequestPossiblySent(ConnectionError), -} - /// This represents an unrecoverable error to the connection. /// The connection is no longer usable after this error is received. #[derive(thiserror::Error, Debug, Clone)] diff --git a/shotover/src/transforms/kafka/sink_cluster/connections.rs b/shotover/src/transforms/kafka/sink_cluster/connections.rs index a89b02e8d..79554871c 100644 --- a/shotover/src/transforms/kafka/sink_cluster/connections.rs +++ b/shotover/src/transforms/kafka/sink_cluster/connections.rs @@ -1,8 +1,8 @@ use crate::{ - connection::{ConnectionError, SendError, SinkConnection}, + connection::{ConnectionError, SinkConnection}, message::Message, }; -use anyhow::{Context, Result}; +use anyhow::{anyhow, Context, Result}; use fnv::FnvBuildHasher; use kafka_protocol::{messages::BrokerId, protocol::StrBytes}; use metrics::Counter; @@ -82,6 +82,9 @@ impl Connections { // connection already exists so we can just use it. // however if it has an error we need to recreate it. if let Some(error) = connection.get_error() { + if connection.pending_requests_count() > 0 { + return Err(anyhow!(error).context("get_or_open_connection: Outgoing connection had pending requests, those requests/responses are lost so connection recovery cannot be attempted.")); + } self.create_and_insert_connection( rng, connection_factory, @@ -314,7 +317,7 @@ impl KafkaConnection { /// Send messages. /// If there is a problem with the connection an error is returned. - pub fn send(&mut self, messages: Vec) -> Result<(), SendError> { + pub fn send(&mut self, messages: Vec) -> Result<(), ConnectionError> { match self { KafkaConnection::Regular(c) => c.send(messages), KafkaConnection::ScramOverMtls(c) => c.send(messages), diff --git a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/connection.rs b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/connection.rs index 8aeaf9dea..6762fea36 100644 --- a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/connection.rs +++ b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/connection.rs @@ -1,5 +1,5 @@ use crate::{ - connection::{ConnectionError, SendError, SinkConnection}, + connection::{ConnectionError, SinkConnection}, message::Message, transforms::kafka::sink_cluster::connections::ConnectionState, }; @@ -64,7 +64,7 @@ impl ScramOverMtlsConnection { /// Send messages. /// If there is a problem with the connection an error is returned. - pub fn send(&mut self, messages: Vec) -> Result<(), SendError> { + pub fn send(&mut self, messages: Vec) -> Result<(), ConnectionError> { self.connection.send(messages) }