From 56b292237302a172bd52bf751f830227c2e81d89 Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Tue, 28 May 2024 17:18:42 +1000
Subject: [PATCH 1/2] KafkaSinkCluster: rack aware routing for fetch requests
 (#1637)

---
 .../src/transforms/kafka/sink_cluster/mod.rs | 72 +++++++++++++++----
 1 file changed, 60 insertions(+), 12 deletions(-)

diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index cd64743b2..f03b1d446 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -22,6 +22,7 @@ use kafka_protocol::messages::{
 };
 use kafka_protocol::protocol::{Builder, StrBytes};
 use kafka_protocol::ResponseError;
+use metrics::{counter, Counter};
 use node::{ConnectionFactory, KafkaAddress, KafkaNode};
 use rand::rngs::SmallRng;
 use rand::seq::{IteratorRandom, SliceRandom};
@@ -95,7 +96,7 @@ const NAME: &str = "KafkaSinkCluster";
 impl TransformConfig for KafkaSinkClusterConfig {
     async fn get_builder(
         &self,
-        _transform_context: TransformContextConfig,
+        transform_context: TransformContextConfig,
     ) -> Result<Box<dyn TransformBuilder>> {
         let tls = self.tls.clone().map(TlsConnector::new).transpose()?;

@@ -119,6 +120,7 @@ impl TransformConfig for KafkaSinkClusterConfig {
         shotover_nodes.sort_by_key(|x| x.broker_id);

         Ok(Box::new(KafkaSinkClusterBuilder::new(
+            transform_context.chain_name,
             self.first_contact_points.clone(),
             &self.authorize_scram_over_mtls,
             shotover_nodes,
@@ -152,10 +154,13 @@ pub struct KafkaSinkClusterBuilder {
     nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
     authorize_scram_over_mtls: Option<AuthorizeScramOverMtlsBuilder>,
     tls: Option<TlsConnector>,
+    out_of_rack_requests: Counter,
 }

 impl KafkaSinkClusterBuilder {
+    #[allow(clippy::too_many_arguments)]
     pub fn new(
+        chain_name: String,
         first_contact_points: Vec<String>,
         authorize_scram_over_mtls: &Option<AuthorizeScramOverMtlsConfig>,
         shotover_nodes: Vec<ShotoverNode>,
@@ -182,6 +187,7 @@ impl KafkaSinkClusterBuilder {
             topic_by_name: Arc::new(DashMap::new()),
             topic_by_id: Arc::new(DashMap::new()),
             nodes_shared: Arc::new(RwLock::new(vec![])),
+            out_of_rack_requests: counter!("shotover_out_of_rack_requests_count", "chain" => chain_name, "transform" => NAME),
             tls,
         })
     }
@@ -192,7 +198,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
         Box::new(KafkaSinkCluster {
             first_contact_points: self.first_contact_points.clone(),
             shotover_nodes: self.shotover_nodes.clone(),
-            _rack: self.rack.clone(),
+            rack: self.rack.clone(),
             nodes: vec![],
             nodes_shared: self.nodes_shared.clone(),
             controller_broker: self.controller_broker.clone(),
@@ -214,6 +220,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
             temp_responses_buffer: Default::default(),
             sasl_mechanism: None,
             authorize_scram_over_mtls: self.authorize_scram_over_mtls.as_ref().map(|x| x.build()),
+            out_of_rack_requests: self.out_of_rack_requests.clone(),
         })
     }

@@ -251,8 +258,7 @@ impl AtomicBrokerId {
 pub struct KafkaSinkCluster {
     first_contact_points: Vec<String>,
     shotover_nodes: Vec<ShotoverNode>,
-    // TODO: use this for rack aware routing
-    _rack: StrBytes,
+    rack: StrBytes,
     nodes: Vec<KafkaNode>,
     nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
     controller_broker: Arc<AtomicBrokerId>,
@@ -272,6 +278,7 @@ pub struct KafkaSinkCluster {
     temp_responses_buffer: Vec<Message>,
     sasl_mechanism: Option<String>,
     authorize_scram_over_mtls: Option<AuthorizeScramOverMtls>,
+    out_of_rack_requests: Counter,
 }

 /// State of a Request/Response is maintained by this enum.
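Note on the metrics wiring above: the counter is registered exactly once, in KafkaSinkClusterBuilder::new, and every KafkaSinkCluster instance built from it receives a clone of the same Counter handle, so all instances in a chain increment one metric. A minimal sketch of that register-once, clone-per-instance pattern with the metrics crate (ExampleBuilder and ExampleTransform are illustrative stand-ins, not shotover types):

use metrics::{counter, Counter};

struct ExampleBuilder {
    out_of_rack_requests: Counter,
}

struct ExampleTransform {
    out_of_rack_requests: Counter,
}

impl ExampleBuilder {
    fn new(chain_name: String) -> Self {
        Self {
            // Register the counter once per chain; the labels let operators
            // break the metric down by chain and transform.
            out_of_rack_requests: counter!(
                "shotover_out_of_rack_requests_count",
                "chain" => chain_name,
                "transform" => "KafkaSinkCluster"
            ),
        }
    }

    fn build(&self) -> ExampleTransform {
        ExampleTransform {
            // Counter is a cheap handle; cloning shares the underlying metric.
            out_of_rack_requests: self.out_of_rack_requests.clone(),
        }
    }
}

fn main() {
    let builder = ExampleBuilder::new("main_chain".to_owned());
    let transform = builder.build();
    // Every instance built from the same builder increments the same metric.
    transform.out_of_rack_requests.increment(1);
}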
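The first hunk that follows implements the routing preference itself: pick a random replica in shotover's own rack, and only when the partition has no in-rack replica fall back to an external-rack replica, incrementing the counter so cross-rack traffic stays visible. A condensed, self-contained sketch of that selection logic (Node and Partition here are simplified stand-ins for the patch's KafkaNode and Partition, which pre-split the replica list by rack in the metadata hunk further below):

use rand::rngs::SmallRng;
use rand::seq::IteratorRandom;
use rand::SeedableRng;

struct Node {
    broker_id: i32,
}

struct Partition {
    // Replicas already split by rack when the metadata response was processed.
    shotover_rack_replica_nodes: Vec<i32>,
    external_rack_replica_nodes: Vec<i32>,
}

// Prefer a random in-rack replica; only fall back to an out-of-rack replica
// (reporting the fallback) when the partition has no replica in our rack.
fn pick_destination(nodes: &[Node], partition: &Partition, rng: &mut SmallRng) -> (i32, bool) {
    if let Some(node) = nodes
        .iter()
        .filter(|node| partition.shotover_rack_replica_nodes.contains(&node.broker_id))
        .choose(rng)
    {
        (node.broker_id, false)
    } else {
        let node = nodes
            .iter()
            .filter(|node| partition.external_rack_replica_nodes.contains(&node.broker_id))
            .choose(rng)
            .expect("partition must have at least one replica");
        (node.broker_id, true) // true => out-of-rack, caller increments the counter
    }
}

fn main() {
    let nodes = vec![Node { broker_id: 0 }, Node { broker_id: 1 }];
    let partition = Partition {
        shotover_rack_replica_nodes: vec![1],
        external_rack_replica_nodes: vec![0],
    };
    let mut rng = SmallRng::seed_from_u64(0);
    let (broker, out_of_rack) = pick_destination(&nodes, &partition, &mut rng);
    println!("route to broker {broker} (out of rack: {out_of_rack})");
}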
@@ -734,12 +741,30 @@ impl KafkaSinkCluster {
                 let destination = if let Some(partition) =
                     topic_meta.partitions.get(partition_index)
                 {
-                    self.nodes
+                    if let Some(node) = self
+                        .nodes
                         .iter_mut()
-                        .filter(|node| partition.replica_nodes.contains(&node.broker_id))
+                        .filter(|node| {
+                            partition
+                                .shotover_rack_replica_nodes
+                                .contains(&node.broker_id)
+                        })
                         .choose(&mut self.rng)
-                        .unwrap()
-                        .broker_id
+                    {
+                        node.broker_id
+                    } else {
+                        self.out_of_rack_requests.increment(1);
+                        self.nodes
+                            .iter_mut()
+                            .filter(|node| {
+                                partition
+                                    .external_rack_replica_nodes
+                                    .contains(&node.broker_id)
+                            })
+                            .choose(&mut self.rng)
+                            .unwrap()
+                            .broker_id
+                    }
                 } else {
                     let partition_len = topic_meta.partitions.len();
                     let topic_name = Self::format_topic_name(&topic);
@@ -1314,8 +1339,19 @@ impl KafkaSinkCluster {
                     .iter()
                     .map(|partition| Partition {
                         index: partition.partition_index,
-                        leader_id: *partition.leader_id,
-                        replica_nodes: partition.replica_nodes.iter().map(|x| x.0).collect(),
+                        leader_id: partition.leader_id,
+                        shotover_rack_replica_nodes: partition
+                            .replica_nodes
+                            .iter()
+                            .cloned()
+                            .filter(|replica_node_id| self.broker_within_rack(*replica_node_id))
+                            .collect(),
+                        external_rack_replica_nodes: partition
+                            .replica_nodes
+                            .iter()
+                            .cloned()
+                            .filter(|replica_node_id| !self.broker_within_rack(*replica_node_id))
+                            .collect(),
                     })
                     .collect();
                 partitions.sort_by_key(|x| x.index);
@@ -1543,6 +1579,17 @@ impl KafkaSinkCluster {
             self.update_local_nodes().await;
         }
     }
+
+    fn broker_within_rack(&self, broker_id: BrokerId) -> bool {
+        self.nodes.iter().any(|node| {
+            node.broker_id == broker_id
+                && node
+                    .rack
+                    .as_ref()
+                    .map(|rack| rack == &self.rack)
+                    .unwrap_or(false)
+        })
+    }
 }

 fn hash_partition(topic_id: Uuid, partition_index: i32) -> usize {
@@ -1560,8 +1607,9 @@ struct Topic {
 #[derive(Debug, Clone)]
 struct Partition {
     index: i32,
-    leader_id: i32,
-    replica_nodes: Vec<i32>,
+    leader_id: BrokerId,
+    shotover_rack_replica_nodes: Vec<BrokerId>,
+    external_rack_replica_nodes: Vec<BrokerId>,
 }

 struct FindCoordinator {

From c1ee55fac8da878292f30096593f11b0c40732b2 Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Wed, 29 May 2024 12:29:59 +1000
Subject: [PATCH 2/2] scram_over_mtls task minor refactor (#1638)

---
 shotover/src/message/mod.rs                   | 19 ++++++
 .../kafka/sink_cluster/scram_over_mtls.rs     | 64 +++++++++++--------
 2 files changed, 57 insertions(+), 26 deletions(-)

diff --git a/shotover/src/message/mod.rs b/shotover/src/message/mod.rs
index 21475f248..db754a20b 100644
--- a/shotover/src/message/mod.rs
+++ b/shotover/src/message/mod.rs
@@ -212,6 +212,25 @@ impl Message {
         }
     }

+    /// Same as [`Message::frame`], but consumes the message and returns an owned [`Frame`].
+    /// Useful when a transform generates a request and consumes the response itself, without involving the client.
+    pub fn into_frame(mut self) -> Option<Frame> {
+        let (inner, result) = self.inner.take().unwrap().ensure_parsed(self.codec_state);
+        if let Err(err) = result {
+            // TODO: If we could include a stacktrace in this error it would be really helpful
+            tracing::error!("{:?}", err.context("Failed to parse frame"));
+            return None;
+        }
+
+        match inner {
+            MessageInner::RawBytes { .. } => {
+                unreachable!("Cannot be RawBytes because ensure_parsed was called")
+            }
+            MessageInner::Parsed { frame, .. } => Some(frame),
+            MessageInner::Modified { frame } => Some(frame),
+        }
+    }
+
     /// Return the shotover assigned MessageId
     pub fn id(&self) -> MessageId {
         self.id
diff --git a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs
index db70b0f5e..f83c3bc6a 100644
--- a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs
@@ -117,7 +117,11 @@ async fn task(
     //   + From our testing, delegation tokens should be propagated within 0.5s to 1s on unloaded kafka clusters of size 15 to 30 nodes.
     let token = tokio::time::timeout(
         Duration::from_secs(120),
-        create_delegation_token_for_user(&mut connections, username.clone(), &mut rng),
+        create_delegation_token_for_user_with_wait(
+            &mut connections,
+            username.clone(),
+            &mut rng,
+        ),
     )
     .await
     .with_context(|| format!("Delegation token creation for {username:?} timed out"))?
@@ -200,11 +204,26 @@ pub enum OriginalScramState {
     AuthSuccess,
 }

-pub async fn create_delegation_token_for_user(
+pub async fn create_delegation_token_for_user_with_wait(
     connections: &mut [SinkConnection],
     username: StrBytes,
     rng: &mut SmallRng,
 ) -> Result<DelegationToken> {
+    let create_response = create_delegation_token_for_user(connections, &username, rng).await?;
+    wait_until_delegation_token_ready_on_all_brokers(connections, &create_response, username)
+        .await?;
+
+    Ok(DelegationToken {
+        token_id: create_response.token_id.as_str().to_owned(),
+        hmac: StrBytes::from_string(general_purpose::STANDARD.encode(&create_response.hmac)),
+    })
+}
+
+pub async fn create_delegation_token_for_user(
+    connections: &mut [SinkConnection],
+    username: &StrBytes,
+    rng: &mut SmallRng,
+) -> Result<CreateDelegationTokenResponse> {
     let connection = connections.choose_mut(rng).unwrap();
     connection.send(vec![Message::from_frame(Frame::Kafka(
         KafkaFrame::Request {
@@ -222,32 +241,25 @@ pub async fn create_delegation_token_for_user(
             ),
         },
     ))])?;
-    let mut response = connection.recv().await?.pop().unwrap();
-    let create_response = if let Some(Frame::Kafka(KafkaFrame::Response {
-        body: ResponseBody::CreateDelegationToken(response),
-        ..
-    })) = response.frame()
-    {
-        if let Some(err) = ResponseError::try_from_code(response.error_code) {
-            return Err(anyhow!(
-                "kafka responded to CreateDelegationToken with error {err}",
-            ));
-        } else {
-            response
+
+    let response = connection.recv().await?.pop().unwrap();
+    match response.into_frame() {
+        Some(Frame::Kafka(KafkaFrame::Response {
+            body: ResponseBody::CreateDelegationToken(response),
+            ..
+        })) => {
+            if let Some(err) = ResponseError::try_from_code(response.error_code) {
+                Err(anyhow!(
+                    "kafka responded to CreateDelegationToken with error {err}",
+                ))
+            } else {
+                Ok(response)
+            }
         }
-    } else {
-        return Err(anyhow!(
+        response => Err(anyhow!(
             "Unexpected response to CreateDelegationToken {response:?}"
-        ));
-    };
-
-    wait_until_delegation_token_ready_on_all_brokers(connections, create_response, username)
-        .await?;
-
-    Ok(DelegationToken {
-        token_id: create_response.token_id.as_str().to_owned(),
-        hmac: StrBytes::from_string(general_purpose::STANDARD.encode(&create_response.hmac)),
-    })
+        )),
+    }
 }

 async fn wait_until_delegation_token_ready_on_all_brokers(
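A note on why [PATCH 2/2] adds Message::into_frame: Message::frame returns a borrow, which is why the old create_delegation_token_for_user had to bind the matched body to a variable, bail out with early returns, and keep the response alive until the end of the function; into_frame consumes the Message, so the rewritten function can match once and move the parsed body straight out of the arm. A toy illustration of that ownership difference (these Message and Frame types are simplified stand-ins, not shotover's real lazily parsed versions):

#[derive(Debug)]
enum Frame {
    Kafka(String),
}

struct Message {
    frame: Option<Frame>,
}

impl Message {
    // Borrowing accessor, like Message::frame: the caller can inspect or
    // mutate the frame but cannot move it out of the message.
    #[allow(dead_code)]
    fn frame(&mut self) -> Option<&mut Frame> {
        self.frame.as_mut()
    }

    // Consuming accessor, like the new Message::into_frame: the caller takes
    // ownership, so match arms can move the parsed body out without cloning.
    fn into_frame(self) -> Option<Frame> {
        self.frame
    }
}

fn main() {
    let response = Message {
        frame: Some(Frame::Kafka("CreateDelegationToken".into())),
    };
    // With into_frame the body moves straight out of the match:
    match response.into_frame() {
        Some(Frame::Kafka(body)) => println!("owned body: {body}"),
        None => println!("response could not be parsed"),
    }
}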
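The series ends at the signature of wait_until_delegation_token_ready_on_all_brokers; its body is not shown. Given the comment in task() (delegation tokens propagate within roughly 0.5s to 1s, and the whole creation is bounded by a 120s timeout), one plausible shape for such a wait, offered purely as an assumption rather than the patch's actual implementation, is to poll every broker until each one reports the token. Here broker_sees_token is a hypothetical probe; a real one would issue a Kafka request such as DescribeDelegationToken:

use std::time::Duration;

// Hypothetical stand-in for a broker connection.
struct BrokerConnection;

// Hypothetical probe: asks one broker whether it can see the new token yet.
async fn broker_sees_token(_connection: &mut BrokerConnection, _token_id: &str) -> bool {
    true
}

// Poll every broker until all of them report the token, sleeping briefly
// between rounds. The caller bounds the whole wait with a timeout, as the
// patch does with tokio::time::timeout and a 120s budget.
async fn wait_until_token_ready(connections: &mut [BrokerConnection], token_id: &str) {
    loop {
        let mut all_ready = true;
        for connection in connections.iter_mut() {
            if !broker_sees_token(connection, token_id).await {
                all_ready = false;
                break;
            }
        }
        if all_ready {
            return;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}

#[tokio::main]
async fn main() {
    let mut connections = vec![BrokerConnection, BrokerConnection];
    wait_until_token_ready(&mut connections, "token-id").await;
    println!("token visible on all brokers");
}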