KafkaSinkCluster - set KafkaNode::state
rukai committed Aug 6, 2024
1 parent ff09b14 commit 96c6abc
Showing 3 changed files with 198 additions and 35 deletions.
93 changes: 82 additions & 11 deletions shotover/src/transforms/kafka/sink_cluster/connections.rs
@@ -4,10 +4,10 @@ use fnv::FnvBuildHasher;
use kafka_protocol::{messages::BrokerId, protocol::StrBytes};
use metrics::Counter;
use rand::{rngs::SmallRng, seq::SliceRandom};
use std::collections::HashMap;
use std::{collections::HashMap, sync::atomic::Ordering};

use super::{
node::{ConnectionFactory, KafkaAddress, KafkaNode},
node::{ConnectionFactory, KafkaAddress, KafkaNode, NodeState},
scram_over_mtls::AuthorizeScramOverMtls,
};

@@ -29,17 +29,21 @@ pub enum Destination {

pub struct Connections {
pub connections: HashMap<Destination, SinkConnection, FnvBuildHasher>,
control_connection_address: Option<KafkaAddress>,
out_of_rack_requests: Counter,
}

impl Connections {
pub fn new(out_of_rack_requests: Counter) -> Self {
Self {
connections: Default::default(),
control_connection_address: None,
out_of_rack_requests,
}
}

/// If a connection already exists for the requested Destination return it.
/// Otherwise create a new connection, cache it and return it.
#[allow(clippy::too_many_arguments)]
pub async fn get_or_open_connection(
&mut self,
@@ -66,23 +70,90 @@ impl Connections {
self.out_of_rack_requests.increment(1);
}
}

// the map entry API cannot be used with async
#[allow(clippy::map_entry)]
if !self.connections.contains_key(&destination) {
let address = match &node {
Some(node) => &node.kafka_address,
None => contact_points.choose(rng).unwrap(),
None => {
// TODO: filter out down nodes
// if no up nodes, return error.
// do it in this PR.
self.control_connection_address = contact_points.choose(rng).cloned();
self.control_connection_address.as_ref().unwrap()
}
};

self.connections.insert(
destination,
connection_factory
.create_connection(address, authorize_scram_over_mtls, sasl_mechanism)
.await
.context("Failed to create a new connection")?,
);
let connection = connection_factory
.create_connection(address, authorize_scram_over_mtls, sasl_mechanism)
.await
.context("Failed to create a new connection")?;
self.connections.insert(destination, connection);
}
Ok(self.connections.get_mut(&destination).unwrap())
}

/// Open a new connection to the requested Destination and return it.
/// Any existing cached connection is overwritten by the new one.
#[allow(clippy::too_many_arguments)]
pub async fn handle_connection_error(
&mut self,
rng: &mut SmallRng,
connection_factory: &ConnectionFactory,
authorize_scram_over_mtls: &Option<AuthorizeScramOverMtls>,
sasl_mechanism: &Option<String>,
nodes: &[KafkaNode],
contact_points: &[KafkaAddress],
destination: Destination,
error: anyhow::Error,
) -> Result<()> {
let address = match destination {
Destination::Id(id) => {
&nodes
.iter()
.find(|x| x.broker_id == id)
.unwrap()
.kafka_address
}
Destination::ControlConnection => contact_points.choose(rng).unwrap(),
};

let connection = connection_factory
.create_connection(address, authorize_scram_over_mtls, sasl_mechanism)
.await
.context("Failed to create a new connection");

match connection {
Ok(connection) => {
// Recreating the connection succeeded.
// So cache it, as long as we weren't waiting on any responses in the old connection.
let old = self.connections.insert(destination, connection);

if old.map(|old| old.pending_requests_count()).unwrap_or(0) > 0 {
Err(error.context("Successfully reopened outgoing connection, but the previous outgoing connection had pending requests."))
} else {
Ok(())
}
}
Err(err) => {
// Recreating the connection failed.
// So mark the node as down and remove the cached connection so we don't
// attempt to connect to it again, and then return the error.
nodes
.iter()
.find(|x| match destination {
Destination::Id(id) => x.broker_id == id,
Destination::ControlConnection => {
&x.kafka_address == self.control_connection_address.as_ref().unwrap()
}
})
.unwrap()
.state
.store(NodeState::Down, Ordering::Relaxed);

self.connections.remove(&destination);
Err(err)
}
}
}
}
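A note on the #[allow(clippy::map_entry)] in get_or_open_connection above: the entry API cannot drive async initialization, since or_insert_with takes a synchronous closure and or_insert(create().await) would await unconditionally, constructing a connection even on a cache hit. A minimal standalone sketch of the resulting contains_key-then-insert pattern (hypothetical types, not part of this commit):

use std::collections::HashMap;

// Stand-in for ConnectionFactory::create_connection.
async fn expensive_create() -> String {
    "connection".to_string()
}

// Same shape as get_or_open_connection: check, await, insert, then look up.
async fn get_or_create(cache: &mut HashMap<u32, String>, key: u32) -> &String {
    #[allow(clippy::map_entry)]
    if !cache.contains_key(&key) {
        let value = expensive_create().await;
        cache.insert(key, value);
    }
    cache.get(&key).unwrap()
}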
139 changes: 116 additions & 23 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ use kafka_protocol::messages::{
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
use metrics::{counter, Counter};
use node::{ConnectionFactory, KafkaAddress, KafkaNode};
use node::{ConnectionFactory, KafkaAddress, KafkaNode, NodeState};
use rand::rngs::SmallRng;
use rand::seq::{IteratorRandom, SliceRandom};
use rand::SeedableRng;
@@ -36,7 +36,7 @@ use scram_over_mtls::{
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::hash::Hasher;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
@@ -346,6 +346,7 @@ impl Transform for KafkaSinkCluster {
let mut responses = if requests_wrapper.requests.is_empty() {
// there are no requests, so no point sending any, but we should check for any responses without awaiting
self.recv_responses()
.await
.context("Failed to receive responses (without sending requests)")?
} else {
self.update_local_nodes().await;
@@ -372,6 +373,7 @@
.context("Failed to route requests")?;
self.send_requests().await?;
self.recv_responses()
.await
.context("Failed to receive responses")?
};

@@ -385,7 +387,56 @@
impl KafkaSinkCluster {
/// Send a request over the control connection and immediately receive the response.
/// Since we always await the response we know for sure that the response will not get mixed up with any other incoming responses.
async fn control_send_receive(&mut self, requests: Message) -> Result<Message> {
async fn control_send_receive(&mut self, request: Message) -> Result<Message> {
match self.control_send_receive_inner(request.clone()).await {
Ok(response) => Ok(response),
Err(err) => {
// first retry on the same connection in case it was a timeout
match self
.connections
.handle_connection_error(
&mut self.rng,
&self.connection_factory,
&self.authorize_scram_over_mtls,
&self.sasl_mechanism,
&self.nodes,
&self.first_contact_points,
Destination::ControlConnection,
err,
)
.await
{
// connection recreated, retry on the original node.
// if the request fails at this point it's a bad request.
Ok(()) => self.control_send_receive_inner(request.clone()).await,
// connection failed, could be a bad node, retry on all known nodes
Err(err) => {
tracing::warn!("Failed to recreate original control connection {err:?}");
loop {
match self.control_send_receive_inner(request.clone()).await {
// found a new control node that works
Ok(response) => return Ok(response),
Err(err) => {
if self.nodes.iter().all(|x| {
matches!(x.state.load(Ordering::Relaxed), NodeState::Down)
}) {
return Err(err.context("Failed to recreate control connection, no more nodes to retry on. Last node gave error"));
} else {
tracing::warn!(
"Failed to recreate control connection against a new node {err:?}"
);
// try another node
}
}
}
}
}
}
}
}
}

async fn control_send_receive_inner(&mut self, requests: Message) -> Result<Message> {
assert!(
self.auth_complete,
"control_send_receive cannot be called until auth is complete. Otherwise it would collide with the control connection being used for regular routing."
@@ -1014,7 +1065,8 @@ routing message to a random node so that:
}

for (destination, requests) in broker_to_routed_requests {
self.connections
if let Err(err) = self
.connections
.get_or_open_connection(
&mut self.rng,
&self.connection_factory,
@@ -1026,43 +1078,84 @@
destination,
)
.await?
.send(requests.requests)?;
.send(requests.requests)
{
// Attempt to reopen the connection for the side effect of setting the node state to down.
// Don't actually use the connection though, since we can't resend a failed request.
self.connections
.handle_connection_error(
&mut self.rng,
&self.connection_factory,
&self.authorize_scram_over_mtls,
&self.sasl_mechanism,
&self.nodes,
&self.first_contact_points,
destination,
err.clone().into(),
)
.await?;
// If we successfully recreate the outgoing connection we still need to terminate this incoming connection since the request is lost.
return Err(err.into());
}
}

Ok(())
}

/// Receive all responses from the outgoing connections and return those that are ready to be returned.
/// For response ordering reasons, some responses will remain in self.pending_requests until other responses are received.
fn recv_responses(&mut self) -> Result<Vec<Message>> {
async fn recv_responses(&mut self) -> Result<Vec<Message>> {
// Convert all received PendingRequestTy::Sent into PendingRequestTy::Received
let mut connections_to_reopen = vec![];
for (connection_destination, connection) in &mut self.connections.connections {
self.temp_responses_buffer.clear();
if let Ok(()) = connection.try_recv_into(&mut self.temp_responses_buffer) {
for response in self.temp_responses_buffer.drain(..) {
let mut response = Some(response);
for pending_request in &mut self.pending_requests {
if let PendingRequestTy::Sent { destination, index } =
&mut pending_request.ty
{
if destination == connection_destination {
// Store the PendingRequestTy::Received at the location of the next PendingRequestTy::Sent
// All other PendingRequestTy::Sent need to be decremented, in order to determine the PendingRequestTy::Sent
// to be used next time, and the time after that, and ...
if *index == 0 {
pending_request.ty = PendingRequestTy::Received {
response: response.take().unwrap(),
};
} else {
*index -= 1;
match connection.try_recv_into(&mut self.temp_responses_buffer) {
Ok(()) => {
for response in self.temp_responses_buffer.drain(..) {
let mut response = Some(response);
for pending_request in &mut self.pending_requests {
if let PendingRequestTy::Sent { destination, index } =
&mut pending_request.ty
{
if destination == connection_destination {
// Store the PendingRequestTy::Received at the location of the next PendingRequestTy::Sent
// All other PendingRequestTy::Sent need to be decremented, in order to determine the PendingRequestTy::Sent
// to be used next time, and the time after that, and ...
if *index == 0 {
pending_request.ty = PendingRequestTy::Received {
response: response.take().unwrap(),
};
} else {
*index -= 1;
}
}
}
}
}
}
Err(err) => connections_to_reopen.push((*connection_destination, err)),
}
}

for (destination, err) in connections_to_reopen {
// Attempt to reopen the connection for the side effects of:
// * setting the node state to down
// * removing connections to down nodes so we don't continue attempting to receive from them.
// We do not attempt to receive from the node again, since even if there were pending responses we would have to give up on them.
self.connections
.handle_connection_error(
&mut self.rng,
&self.connection_factory,
&self.authorize_scram_over_mtls,
&self.sasl_mechanism,
&self.nodes,
&self.first_contact_points,
destination,
err.into(),
)
.await?;
}

// Remove and return all PendingRequestTy::Received that are ready to be received.
let mut responses = vec![];
while let Some(pending_request) = self.pending_requests.front() {
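The index bookkeeping in recv_responses is the subtle part of this change: pending requests form one globally ordered queue, while responses arrive in per-connection order, so each Sent entry records how many earlier in-flight requests to the same destination must be answered before it. A simplified standalone model of that inner loop (hypothetical types, not shotover's actual definitions):

#[derive(Debug)]
enum Pending {
    // index = number of earlier in-flight requests to the same destination.
    Sent { destination: u32, index: usize },
    Received { response: String },
}

// Responses from one destination arrive in order, so the Sent entry with
// index 0 for that destination is the one this response answers; every other
// Sent entry for the same destination moves one slot closer to the front.
fn route_response(pending: &mut [Pending], from: u32, response: String) {
    let mut response = Some(response);
    for entry in pending.iter_mut() {
        if let Pending::Sent { destination, index } = entry {
            if *destination == from {
                if *index == 0 {
                    *entry = Pending::Received {
                        response: response.take().unwrap(),
                    };
                } else {
                    *index -= 1;
                }
            }
        }
    }
}

fn main() {
    let mut queue = vec![
        Pending::Sent { destination: 1, index: 0 },
        Pending::Sent { destination: 2, index: 0 },
        Pending::Sent { destination: 1, index: 1 },
    ];
    route_response(&mut queue, 1, "broker 1, first response".into());
    // queue[0] is now Received and queue[2] has index 0, so broker 1's next
    // response lands there; queue[1] still waits on broker 2.
    println!("{queue:?}");
}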
1 change: 0 additions & 1 deletion shotover/src/transforms/kafka/sink_cluster/node.rs
@@ -282,7 +282,6 @@ pub struct KafkaNode {
pub broker_id: BrokerId,
pub rack: Option<StrBytes>,
pub kafka_address: KafkaAddress,
#[allow(unused)]
pub state: Arc<AtomicNodeState>,
}

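For context, the node.rs hunk above only drops the #[allow(unused)] attribute now that KafkaNode::state is actually read and written. The NodeState and AtomicNodeState definitions themselves sit outside the shown hunk; a plausible shape for them, assuming the atomic_enum crate (an assumption, not confirmed by this diff):

use atomic_enum::atomic_enum;
use std::sync::{atomic::Ordering, Arc};

// Assumed definition: #[atomic_enum] generates an AtomicNodeState type with
// load/store methods that take a std::sync::atomic::Ordering.
#[atomic_enum]
#[derive(PartialEq)]
pub enum NodeState {
    Up,
    Down,
}

fn main() {
    let state = Arc::new(AtomicNodeState::new(NodeState::Up));
    // The commit marks a node down after a failed reconnect:
    state.store(NodeState::Down, Ordering::Relaxed);
    assert!(matches!(state.load(Ordering::Relaxed), NodeState::Down));
}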
