diff --git a/shotover/src/transforms/kafka/sink_cluster/connections.rs b/shotover/src/transforms/kafka/sink_cluster/connections.rs index e9577cd12..68f9f5aa7 100644 --- a/shotover/src/transforms/kafka/sink_cluster/connections.rs +++ b/shotover/src/transforms/kafka/sink_cluster/connections.rs @@ -7,7 +7,7 @@ use fnv::FnvBuildHasher; use kafka_protocol::{messages::BrokerId, protocol::StrBytes}; use metrics::Counter; use rand::{rngs::SmallRng, seq::IteratorRandom}; -use std::{collections::HashMap, sync::atomic::Ordering, time::Instant}; +use std::{collections::HashMap, time::Instant}; use super::{ node::{ConnectionFactory, KafkaAddress, KafkaNode, NodeState}, @@ -33,7 +33,7 @@ pub enum Destination { pub struct Connections { pub connections: HashMap, - control_connection_address: Option, + pub control_connection_address: Option, out_of_rack_requests: Counter, } @@ -143,7 +143,7 @@ impl Connections { // Otherwise fall back to the first contact points let address_from_node = nodes .iter() - .filter(|x| matches!(x.state.load(Ordering::Relaxed), NodeState::Up)) + .filter(|x| x.is_up()) .choose(rng) .map(|x| x.kafka_address.clone()); self.control_connection_address = @@ -225,8 +225,7 @@ impl Connections { } }) .unwrap() - .state - .store(node_state, Ordering::Relaxed); + .set_state(node_state); let connection = connection?; // Recreating the node succeeded. diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index cf1e088bb..a939acc60 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -36,7 +36,7 @@ use scram_over_mtls::{ use serde::{Deserialize, Serialize}; use std::collections::{HashMap, VecDeque}; use std::hash::Hasher; -use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::atomic::AtomicI64; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::RwLock; @@ -420,9 +420,7 @@ impl KafkaSinkCluster { Ok(response) => return Ok(response), // this node also doesnt work, mark as bad and try a new one. Err(err) => { - if self.nodes.iter().all(|x| { - matches!(x.state.load(Ordering::Relaxed), NodeState::Down) - }) { + if self.nodes.iter().all(|x| !x.is_up()) { return Err(err.context("Failed to recreate control connection, no more nodes to retry on. Last node gave error")); } else { tracing::warn!( @@ -1066,8 +1064,8 @@ routing message to a random node so that: } let recent_instant = Instant::now(); - for (destination, requests) in broker_to_routed_requests { - if let Err(err) = self + for (destination, mut requests) in broker_to_routed_requests { + match self .connections .get_or_open_connection( &mut self.rng, @@ -1080,22 +1078,62 @@ routing message to a random node so that: recent_instant, destination, ) - .await? - .send(requests.requests) + .await { - // Dont retry the send on the new connection since we cant tell if the broker received the request or not. - self.connections - .handle_connection_error( - &self.connection_factory, - &self.authorize_scram_over_mtls, - &self.sasl_mechanism, - &self.nodes, - destination, - err.clone().into(), - ) - .await?; - // If we succesfully recreate the outgoing connection we still need to terminate this incoming connection since the request is lost. - return Err(err.into()); + Ok(connection) => { + if let Err(err) = connection.send(requests.requests) { + // Dont retry the send on the new connection since we cant tell if the broker received the request or not. + self.connections + .handle_connection_error( + &self.connection_factory, + &self.authorize_scram_over_mtls, + &self.sasl_mechanism, + &self.nodes, + destination, + err.clone().into(), + ) + .await?; + // If we succesfully recreate the outgoing connection we still need to terminate this incoming connection since the request is lost. + return Err(err.into()); + } + } + Err(err) => { + // set node as down, the connection already failed to create so no point running through handle_connection_error, + // as that will recreate the connection which we already know just failed. + // Instead just directly set the node as down and return the error + + // set node as down + self.nodes + .iter() + .find(|x| match destination { + Destination::Id(id) => x.broker_id == id, + Destination::ControlConnection => { + &x.kafka_address + == self + .connections + .control_connection_address + .as_ref() + .unwrap() + } + }) + .unwrap() + .set_state(NodeState::Down); + + // bubble up error + let request_types: Vec = requests + .requests + .iter_mut() + .map(|x| match x.frame() { + Some(Frame::Kafka(KafkaFrame::Request { header, .. })) => { + format!("{:?}", ApiKey::try_from(header.request_api_key).unwrap()) + } + _ => "Unknown".to_owned(), + }) + .collect(); + return Err(err.context(format!( + "Failed to get connection to send requests {request_types:?}" + ))); + } } } diff --git a/shotover/src/transforms/kafka/sink_cluster/node.rs b/shotover/src/transforms/kafka/sink_cluster/node.rs index d246f33e7..76e1dc471 100644 --- a/shotover/src/transforms/kafka/sink_cluster/node.rs +++ b/shotover/src/transforms/kafka/sink_cluster/node.rs @@ -283,7 +283,7 @@ pub struct KafkaNode { pub broker_id: BrokerId, pub rack: Option, pub kafka_address: KafkaAddress, - pub state: Arc, + state: Arc, } impl KafkaNode { @@ -299,6 +299,10 @@ impl KafkaNode { pub fn is_up(&self) -> bool { self.state.load(Ordering::Relaxed) == NodeState::Up } + + pub fn set_state(&self, state: NodeState) { + self.state.store(state, Ordering::Relaxed) + } } #[atomic_enum]