diff --git a/shotover/src/transforms/kafka/sink_cluster/connections.rs b/shotover/src/transforms/kafka/sink_cluster/connections.rs index d1097503f..5ad78ca58 100644 --- a/shotover/src/transforms/kafka/sink_cluster/connections.rs +++ b/shotover/src/transforms/kafka/sink_cluster/connections.rs @@ -3,7 +3,10 @@ use anyhow::{Context, Result}; use fnv::FnvBuildHasher; use kafka_protocol::{messages::BrokerId, protocol::StrBytes}; use metrics::Counter; -use rand::{rngs::SmallRng, seq::SliceRandom}; +use rand::{ + rngs::SmallRng, + seq::{IteratorRandom, SliceRandom}, +}; use std::{collections::HashMap, sync::atomic::Ordering}; use super::{ @@ -76,10 +79,15 @@ impl Connections { let address = match &node { Some(node) => &node.kafka_address, None => { - // TODO: filter out down nodes - // if no up nodes, return error. - // do it in this PR. - self.control_connection_address = contact_points.choose(rng).cloned(); + // If we have a node in the nodes list that is up, use its address. + // Otherwise fall back to a randomly chosen contact point. + let address_from_node = nodes + .iter() + .filter(|x| matches!(x.state.load(Ordering::Relaxed), NodeState::Up)) + .choose(rng) + .map(|x| x.kafka_address.clone()); + self.control_connection_address = + address_from_node.or_else(|| contact_points.iter().choose(rng).cloned()); self.control_connection_address.as_ref().unwrap() } }; diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 45dbedadb..350d82aef 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -1080,8 +1080,7 @@ routing message to a random node so that: .await? .send(requests.requests) { - // Attempt to reopen connection for the side effect of setting node state to down. - // Dont actually use the connection though since we cant resend a failed a request. 
+ // Dont retry the send on the new connection since we cant tell if the broker received the request or not. self.connections .handle_connection_error( &mut self.rng, @@ -1106,7 +1105,7 @@ routing message to a random node so that: /// For response ordering reasons, some responses will remain in self.pending_requests until other responses are received. async fn recv_responses(&mut self) -> Result<Vec<Message>> { // Convert all received PendingRequestTy::Sent into PendingRequestTy::Received - let mut connections_to_reopen = vec![]; + let mut connection_errors = vec![]; for (connection_destination, connection) in &mut self.connections.connections { self.temp_responses_buffer.clear(); match connection.try_recv_into(&mut self.temp_responses_buffer) { @@ -1133,15 +1132,11 @@ routing message to a random node so that: } } } - Err(err) => connections_to_reopen.push((*connection_destination, err)), + Err(err) => connection_errors.push((*connection_destination, err)), } } - for (destination, err) in connections_to_reopen { - // Attempt to reopen connection for the side effects of: - // * setting node state to down - // * removing connections to down nodes so we dont continue attempting to receive from it. - // We do not attempt to receive from the node again, since if there actually were any pending responses we would need to give up. + for (destination, err) in connection_errors { self.connections .handle_connection_error( &mut self.rng,