Skip to content

Commit

Permalink
Fix setting node to down during send_requests (#1744)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Sep 5, 2024
1 parent 661712b commit b5c98c9
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 27 deletions.
9 changes: 4 additions & 5 deletions shotover/src/transforms/kafka/sink_cluster/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use fnv::FnvBuildHasher;
use kafka_protocol::{messages::BrokerId, protocol::StrBytes};
use metrics::Counter;
use rand::{rngs::SmallRng, seq::IteratorRandom};
use std::{collections::HashMap, sync::atomic::Ordering, time::Instant};
use std::{collections::HashMap, time::Instant};

use super::{
node::{ConnectionFactory, KafkaAddress, KafkaNode, NodeState},
Expand All @@ -33,7 +33,7 @@ pub enum Destination {

pub struct Connections {
pub connections: HashMap<Destination, KafkaConnection, FnvBuildHasher>,
control_connection_address: Option<KafkaAddress>,
pub control_connection_address: Option<KafkaAddress>,
out_of_rack_requests: Counter,
}

Expand Down Expand Up @@ -143,7 +143,7 @@ impl Connections {
// Otherwise fall back to the first contact points
let address_from_node = nodes
.iter()
.filter(|x| matches!(x.state.load(Ordering::Relaxed), NodeState::Up))
.filter(|x| x.is_up())
.choose(rng)
.map(|x| x.kafka_address.clone());
self.control_connection_address =
Expand Down Expand Up @@ -225,8 +225,7 @@ impl Connections {
}
})
.unwrap()
.state
.store(node_state, Ordering::Relaxed);
.set_state(node_state);
let connection = connection?;

// Recreating the node succeeded.
Expand Down
80 changes: 59 additions & 21 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use scram_over_mtls::{
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::hash::Hasher;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
Expand Down Expand Up @@ -420,9 +420,7 @@ impl KafkaSinkCluster {
Ok(response) => return Ok(response),
// this node also doesn't work, mark as bad and try a new one.
Err(err) => {
if self.nodes.iter().all(|x| {
matches!(x.state.load(Ordering::Relaxed), NodeState::Down)
}) {
if self.nodes.iter().all(|x| !x.is_up()) {
return Err(err.context("Failed to recreate control connection, no more nodes to retry on. Last node gave error"));
} else {
tracing::warn!(
Expand Down Expand Up @@ -1066,8 +1064,8 @@ routing message to a random node so that:
}

let recent_instant = Instant::now();
for (destination, requests) in broker_to_routed_requests {
if let Err(err) = self
for (destination, mut requests) in broker_to_routed_requests {
match self
.connections
.get_or_open_connection(
&mut self.rng,
Expand All @@ -1080,22 +1078,62 @@ routing message to a random node so that:
recent_instant,
destination,
)
.await?
.send(requests.requests)
.await
{
// Don't retry the send on the new connection since we can't tell if the broker received the request or not.
self.connections
.handle_connection_error(
&self.connection_factory,
&self.authorize_scram_over_mtls,
&self.sasl_mechanism,
&self.nodes,
destination,
err.clone().into(),
)
.await?;
// If we successfully recreate the outgoing connection we still need to terminate this incoming connection since the request is lost.
return Err(err.into());
Ok(connection) => {
if let Err(err) = connection.send(requests.requests) {
// Don't retry the send on the new connection since we can't tell if the broker received the request or not.
self.connections
.handle_connection_error(
&self.connection_factory,
&self.authorize_scram_over_mtls,
&self.sasl_mechanism,
&self.nodes,
destination,
err.clone().into(),
)
.await?;
// If we successfully recreate the outgoing connection we still need to terminate this incoming connection since the request is lost.
return Err(err.into());
}
}
Err(err) => {
// set node as down, the connection already failed to create so no point running through handle_connection_error,
// as that will recreate the connection which we already know just failed.
// Instead just directly set the node as down and return the error

// set node as down
self.nodes
.iter()
.find(|x| match destination {
Destination::Id(id) => x.broker_id == id,
Destination::ControlConnection => {
&x.kafka_address
== self
.connections
.control_connection_address
.as_ref()
.unwrap()
}
})
.unwrap()
.set_state(NodeState::Down);

// bubble up error
let request_types: Vec<String> = requests
.requests
.iter_mut()
.map(|x| match x.frame() {
Some(Frame::Kafka(KafkaFrame::Request { header, .. })) => {
format!("{:?}", ApiKey::try_from(header.request_api_key).unwrap())
}
_ => "Unknown".to_owned(),
})
.collect();
return Err(err.context(format!(
"Failed to get connection to send requests {request_types:?}"
)));
}
}
}

Expand Down
6 changes: 5 additions & 1 deletion shotover/src/transforms/kafka/sink_cluster/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ pub struct KafkaNode {
pub broker_id: BrokerId,
pub rack: Option<StrBytes>,
pub kafka_address: KafkaAddress,
pub state: Arc<AtomicNodeState>,
state: Arc<AtomicNodeState>,
}

impl KafkaNode {
Expand All @@ -299,6 +299,10 @@ impl KafkaNode {
/// Returns `true` if this node's state flag is currently `NodeState::Up`.
///
/// NOTE(review): the state is read with `Ordering::Relaxed`, i.e. it is a
/// standalone advisory flag — no other memory accesses appear to be
/// synchronized through it.
pub fn is_up(&self) -> bool {
self.state.load(Ordering::Relaxed) == NodeState::Up
}

/// Atomically records a new up/down state for this node.
///
/// NOTE(review): `Ordering::Relaxed` matches the load in `is_up`; the flag
/// is presumably independent of other shared data — confirm no stronger
/// ordering is needed by callers.
pub fn set_state(&self, state: NodeState) {
self.state.store(state, Ordering::Relaxed)
}
}

#[atomic_enum]
Expand Down

0 comments on commit b5c98c9

Please sign in to comment.