Skip to content

Commit

Permalink
catch closed connections during send
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Sep 17, 2024
1 parent 4cfa26c commit b8dd004
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 8 deletions.
32 changes: 29 additions & 3 deletions shotover/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,27 @@ impl SinkConnection {
self.error.clone().unwrap()
}

/// Returns the error recorded for this connection, if there is one.
///
/// When no error has been cached yet, `connection_closed_rx` is polled once via `try_recv`
/// and whatever it yields is stored in `self.error` so later calls can return it directly.
pub fn get_error(&mut self) -> Option<ConnectionError> {
    // Fast path: an error was already observed on an earlier call.
    if let Some(known) = &self.error {
        return Some(known.clone());
    }

    // A failed `try_recv` is mapped to `None`, leaving the cache empty.
    let polled = self.connection_closed_rx.try_recv().ok();
    self.error = polled;
    self.error.clone()
}

/// Send messages.
/// If there is a problem with the connection an error is returned.
///
/// An error already stored in `self.error`, or one waiting on `connection_closed_rx`,
/// is reported as `SendError::RequestsUnsent` and the messages are handed back to the caller.
/// A failure of `out_tx.send` itself is reported as `SendError::RequestPossiblySent`.
pub fn send(&mut self, mut messages: Vec<Message>) -> Result<(), ConnectionError> {
pub fn send(&mut self, mut messages: Vec<Message>) -> Result<(), SendError> {
// Runs before any error check, so messages handed back in `RequestsUnsent`
// have already been through `process_requests`.
self.dummy_response_inserter.process_requests(&mut messages);

if let Some(error) = &self.error {
Err(error.clone())
Err(SendError::RequestsUnsent(error.clone(), messages))
} else if let Ok(error) = self.connection_closed_rx.try_recv() {
// Cache the newly received error so later calls take the first branch.
self.error = Some(error.clone());
Err(SendError::RequestsUnsent(error, messages))
} else {
self.out_tx.send(messages).map_err(|_| self.set_get_error())
self.out_tx
.send(messages)
.map_err(|_| SendError::RequestPossiblySent(self.set_get_error()))
}
}

Expand Down Expand Up @@ -200,6 +212,20 @@ impl SinkConnection {
}
}

/// This represents an error to the connection encountered while sending requests.
/// The connection is no longer usable after this error is received.
#[derive(thiserror::Error, Debug, Clone)]
pub enum SendError {
/// The error was detected before the requests were sent.
/// The requests were not received and they can be resent on a new connection if allowed by the protocol.
/// The second field carries the unsent messages so the caller can get them back.
#[error("An error was detected before the requests were sent: {0}")]
RequestsUnsent(ConnectionError, Vec<Message>),
/// The error was detected after the requests were sent.
/// It is unknown if the requests were received or not.
#[error("An error was detected after the requests were sent: {0}")]
RequestPossiblySent(ConnectionError),
}

/// This represents an unrecoverable error to the connection.
/// The connection is no longer usable after this error is received.
#[derive(thiserror::Error, Debug, Clone)]
Expand Down
33 changes: 30 additions & 3 deletions shotover/src/transforms/kafka/sink_cluster/connections.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
connection::{ConnectionError, SinkConnection},
connection::{ConnectionError, SendError, SinkConnection},
message::Message,
};
use anyhow::{Context, Result};
Expand Down Expand Up @@ -78,7 +78,27 @@ impl Connections {

match self.get_connection_state(recent_instant, destination) {
ConnectionState::Open => {
// connection already open
let connection = self.connections.get_mut(&destination).unwrap();
// connection already exists so we can just use it.
// however if it has an error we need to recreate it.
if let Some(error) = connection.get_error() {
self.create_and_insert_connection(
rng,
connection_factory,
authorize_scram_over_mtls,
sasl_mechanism,
nodes,
node,
contact_points,
None,
destination,
)
.await
.with_context(|| {
format!("Failed to recreate connection after encountering error {error:?}")
})?;
tracing::info!("Recreated connection after it hit error {error:?}")
}
}
ConnectionState::Unopened => {
self.create_and_insert_connection(
Expand Down Expand Up @@ -294,7 +314,7 @@ impl KafkaConnection {

/// Send messages.
/// If there is a problem with the connection an error is returned.
pub fn send(&mut self, messages: Vec<Message>) -> Result<(), ConnectionError> {
pub fn send(&mut self, messages: Vec<Message>) -> Result<(), SendError> {
match self {
KafkaConnection::Regular(c) => c.send(messages),
KafkaConnection::ScramOverMtls(c) => c.send(messages),
Expand All @@ -310,6 +330,13 @@ impl KafkaConnection {
}
}

pub fn get_error(&mut self) -> Option<ConnectionError> {
match self {
KafkaConnection::Regular(c) => c.get_error(),
KafkaConnection::ScramOverMtls(c) => c.get_error(),
}
}

/// Number of requests waiting on a response.
/// The count includes requests that will have a dummy response generated by shotover.
pub fn pending_requests_count(&self) -> usize {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
connection::{ConnectionError, SinkConnection},
connection::{ConnectionError, SendError, SinkConnection},
message::Message,
transforms::kafka::sink_cluster::connections::ConnectionState,
};
Expand Down Expand Up @@ -64,7 +64,7 @@ impl ScramOverMtlsConnection {

/// Send messages.
/// If there is a problem with the connection an error is returned.
///
/// This forwards directly to `send` on the wrapped `connection`; the returned
/// error is whatever that call produces.
pub fn send(&mut self, messages: Vec<Message>) -> Result<(), ConnectionError> {
pub fn send(&mut self, messages: Vec<Message>) -> Result<(), SendError> {
self.connection.send(messages)
}

Expand All @@ -86,6 +86,10 @@ impl ScramOverMtlsConnection {
}
}

/// Returns the error reported by the wrapped `connection`, if any.
pub fn get_error(&mut self) -> Option<ConnectionError> {
    let inner = &mut self.connection;
    inner.get_error()
}

pub fn pending_requests_count(&self) -> usize {
self.connection.pending_requests_count()
+ self
Expand Down

0 comments on commit b8dd004

Please sign in to comment.