Skip to content

Commit

Permalink
revert SendError
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Sep 17, 2024
1 parent 87f6b0e commit 47fe3ea
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 25 deletions.
24 changes: 4 additions & 20 deletions shotover/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,18 +105,16 @@ impl SinkConnection {

/// Send messages.
/// If there is a problem with the connection an error is returned.
pub fn send(&mut self, mut messages: Vec<Message>) -> Result<(), SendError> {
pub fn send(&mut self, mut messages: Vec<Message>) -> Result<(), ConnectionError> {
self.dummy_response_inserter.process_requests(&mut messages);

if let Some(error) = &self.error {
Err(SendError::RequestsUnsent(error.clone(), messages))
Err(error.clone())
} else if let Ok(error) = self.connection_closed_rx.try_recv() {
self.error = Some(error.clone());
Err(SendError::RequestsUnsent(error, messages))
Err(error)
} else {
self.out_tx
.send(messages)
.map_err(|_| SendError::RequestPossiblySent(self.set_get_error()))
self.out_tx.send(messages).map_err(|_| self.set_get_error())
}
}

Expand Down Expand Up @@ -212,20 +210,6 @@ impl SinkConnection {
}
}

/// This represents an error to the connection encountered while sending requests.
/// The connection is no longer usable after this error is received.
#[derive(thiserror::Error, Debug, Clone)]
pub enum SendError {
/// The error was detected before the requests were sent
/// The request was not received and it can be resent on a new connection if allowed by the protocol.
#[error("An error was detected before the requests were sent: {0}")]
RequestsUnsent(ConnectionError, Vec<Message>),
/// The error was detected after the requests were sent.
/// It is unknown if the request was received or not.
#[error("An error was detected after the requests were sent: {0}")]
RequestPossiblySent(ConnectionError),
}

/// This represents an unrecoverable error to the connection.
/// The connection is no longer usable after this error is received.
#[derive(thiserror::Error, Debug, Clone)]
Expand Down
9 changes: 6 additions & 3 deletions shotover/src/transforms/kafka/sink_cluster/connections.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{
connection::{ConnectionError, SendError, SinkConnection},
connection::{ConnectionError, SinkConnection},
message::Message,
};
use anyhow::{Context, Result};
use anyhow::{anyhow, Context, Result};
use fnv::FnvBuildHasher;
use kafka_protocol::{messages::BrokerId, protocol::StrBytes};
use metrics::Counter;
Expand Down Expand Up @@ -82,6 +82,9 @@ impl Connections {
// connection already exists so we can just use it.
// however if it has an error we need to recreate it.
if let Some(error) = connection.get_error() {
if connection.pending_requests_count() > 0 {
return Err(anyhow!(error).context("get_or_open_connection: Outgoing connection had pending requests, those requests/responses are lost so connection recovery cannot be attempted."));
}
self.create_and_insert_connection(
rng,
connection_factory,
Expand Down Expand Up @@ -314,7 +317,7 @@ impl KafkaConnection {

/// Send messages.
/// If there is a problem with the connection an error is returned.
pub fn send(&mut self, messages: Vec<Message>) -> Result<(), SendError> {
pub fn send(&mut self, messages: Vec<Message>) -> Result<(), ConnectionError> {
match self {
KafkaConnection::Regular(c) => c.send(messages),
KafkaConnection::ScramOverMtls(c) => c.send(messages),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
connection::{ConnectionError, SendError, SinkConnection},
connection::{ConnectionError, SinkConnection},
message::Message,
transforms::kafka::sink_cluster::connections::ConnectionState,
};
Expand Down Expand Up @@ -64,7 +64,7 @@ impl ScramOverMtlsConnection {

/// Send messages.
/// If there is a problem with the connection an error is returned.
pub fn send(&mut self, messages: Vec<Message>) -> Result<(), SendError> {
pub fn send(&mut self, messages: Vec<Message>) -> Result<(), ConnectionError> {
self.connection.send(messages)
}

Expand Down

0 comments on commit 47fe3ea

Please sign in to comment.