diff --git a/shotover/src/transforms/kafka/sink_cluster/connections.rs b/shotover/src/transforms/kafka/sink_cluster/connections.rs index e9577cd12..68f9f5aa7 100644 --- a/shotover/src/transforms/kafka/sink_cluster/connections.rs +++ b/shotover/src/transforms/kafka/sink_cluster/connections.rs @@ -7,7 +7,7 @@ use fnv::FnvBuildHasher; use kafka_protocol::{messages::BrokerId, protocol::StrBytes}; use metrics::Counter; use rand::{rngs::SmallRng, seq::IteratorRandom}; -use std::{collections::HashMap, sync::atomic::Ordering, time::Instant}; +use std::{collections::HashMap, time::Instant}; use super::{ node::{ConnectionFactory, KafkaAddress, KafkaNode, NodeState}, @@ -33,7 +33,7 @@ pub enum Destination { pub struct Connections { pub connections: HashMap, - control_connection_address: Option, + pub control_connection_address: Option, out_of_rack_requests: Counter, } @@ -143,7 +143,7 @@ impl Connections { // Otherwise fall back to the first contact points let address_from_node = nodes .iter() - .filter(|x| matches!(x.state.load(Ordering::Relaxed), NodeState::Up)) + .filter(|x| x.is_up()) .choose(rng) .map(|x| x.kafka_address.clone()); self.control_connection_address = @@ -225,8 +225,7 @@ impl Connections { } }) .unwrap() - .state - .store(node_state, Ordering::Relaxed); + .set_state(node_state); let connection = connection?; // Recreating the node succeeded. diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 9c190fec3..a939acc60 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -36,7 +36,7 @@ use scram_over_mtls::{ use serde::{Deserialize, Serialize}; use std::collections::{HashMap, VecDeque}; use std::hash::Hasher; -use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::atomic::AtomicI64; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::RwLock; @@ -420,9 +420,7 @@ impl KafkaSinkCluster { Ok(response) => return Ok(response), // this node also doesnt work, mark as bad and try a new one. Err(err) => { - if self.nodes.iter().all(|x| { - matches!(x.state.load(Ordering::Relaxed), NodeState::Down) - }) { + if self.nodes.iter().all(|x| !x.is_up()) { return Err(err.context("Failed to recreate control connection, no more nodes to retry on. Last node gave error")); } else { tracing::warn!( @@ -1100,6 +1098,28 @@ routing message to a random node so that: } } Err(err) => { + // set node as down, the connection already failed to create so no point running through handle_connection_error, + // as that will recreate the connection which we already know just failed. + // Instead just directly set the node as down and return the error + + // set node as down + self.nodes + .iter() + .find(|x| match destination { + Destination::Id(id) => x.broker_id == id, + Destination::ControlConnection => { + &x.kafka_address + == self + .connections + .control_connection_address + .as_ref() + .unwrap() + } + }) + .unwrap() + .set_state(NodeState::Down); + + // bubble up error let request_types: Vec = requests .requests .iter_mut() @@ -1110,23 +1130,9 @@ routing message to a random node so that: _ => "Unknown".to_owned(), }) .collect(); - let err = err.context(format!( + return Err(err.context(format!( "Failed to get connection to send requests {request_types:?}" - )); - self.connections - .handle_connection_error( - &self.connection_factory, - &self.authorize_scram_over_mtls, - &self.sasl_mechanism, - &self.nodes, - destination, - err, - ) - .await?; - // TODO: we could in theory retry here, if the connection is succesfully recreated, instead of always returning an error - return Err(anyhow!( - "Failed to create a connection and retrying other nodes not yet supported" - )); + ))); } } } diff --git a/shotover/src/transforms/kafka/sink_cluster/node.rs b/shotover/src/transforms/kafka/sink_cluster/node.rs index d246f33e7..76e1dc471 100644 --- a/shotover/src/transforms/kafka/sink_cluster/node.rs +++ b/shotover/src/transforms/kafka/sink_cluster/node.rs @@ -283,7 +283,7 @@ pub struct KafkaNode { pub broker_id: BrokerId, pub rack: Option, pub kafka_address: KafkaAddress, - pub state: Arc, + state: Arc, } impl KafkaNode { @@ -299,6 +299,10 @@ impl KafkaNode { pub fn is_up(&self) -> bool { self.state.load(Ordering::Relaxed) == NodeState::Up } + + pub fn set_state(&self, state: NodeState) { + self.state.store(state, Ordering::Relaxed) + } } #[atomic_enum]