From 84f3564f24e71070776fc5e71ae32503c23bb5f6 Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Thu, 14 Mar 2024 09:43:20 +1100
Subject: [PATCH] Make KafkaSinkCluster rack aware

---
 .../benches/windsock/kafka/bench.rs           |   9 +-
 .../kafka/cluster-tls/topology.yaml           |   6 +-
 .../src/transforms/kafka/sink_cluster/mod.rs  | 236 ++++++++++--------
 .../src/transforms/kafka/sink_cluster/node.rs |   4 +-
 4 files changed, 148 insertions(+), 107 deletions(-)

diff --git a/shotover-proxy/benches/windsock/kafka/bench.rs b/shotover-proxy/benches/windsock/kafka/bench.rs
index 951cbcb02..51428060f 100644
--- a/shotover-proxy/benches/windsock/kafka/bench.rs
+++ b/shotover-proxy/benches/windsock/kafka/bench.rs
@@ -13,7 +13,7 @@ use itertools::Itertools;
 use shotover::config::chain::TransformChainConfig;
 use shotover::sources::SourceConfig;
 use shotover::transforms::debug::force_parse::DebugForceEncodeConfig;
-use shotover::transforms::kafka::sink_cluster::KafkaSinkClusterConfig;
+use shotover::transforms::kafka::sink_cluster::{KafkaSinkClusterConfig, ShotoverNodeConfig};
 use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig;
 use shotover::transforms::TransformConfig;
 use std::sync::Arc;
@@ -92,7 +92,12 @@ impl KafkaBench {
                 connect_timeout_ms: 3000,
                 read_timeout: None,
                 first_contact_points: vec![kafka_address],
-                shotover_nodes: vec![host_address.clone()],
+                shotover_nodes: vec![ShotoverNodeConfig {
+                    address: host_address.parse().unwrap(),
+                    rack: "rack1".into(),
+                    broker_id: 0,
+                }],
+                local_shotover_broker_id: 0,
                 tls: None,
             }),
         });
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml
index 1d4a8b514..c8a112520 100644
--- a/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml
@@ -5,7 +5,11 @@ sources:
     listen_addr: "127.0.0.1:9192"
     chain:
       - KafkaSinkCluster:
-          shotover_nodes: ["127.0.0.1:9192"]
+          shotover_nodes:
+            - address: "127.0.0.1:9192"
+              rack: "rack0"
+              broker_id: 0
+          local_shotover_broker_id: 0
           first_contact_points: ["172.16.1.2:9092"]
           connect_timeout_ms: 3000
           tls:
diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index f09ee1a7f..547674607 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -9,8 +9,8 @@ use crate::transforms::{TransformConfig, TransformContextConfig};
 use anyhow::{anyhow, Result};
 use async_trait::async_trait;
 use dashmap::DashMap;
-use kafka_protocol::messages::find_coordinator_response::Coordinator;
 use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
+use kafka_protocol::messages::metadata_response::MetadataResponseBroker;
 use kafka_protocol::messages::{
     ApiKey, BrokerId, FindCoordinatorRequest, GroupId, HeartbeatRequest, JoinGroupRequest,
     MetadataRequest, MetadataResponse, OffsetFetchRequest, RequestHeader, SyncGroupRequest,
@@ -22,7 +22,6 @@ use rand::rngs::SmallRng;
 use rand::seq::{IteratorRandom, SliceRandom};
 use rand::SeedableRng;
 use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
 use std::hash::Hasher;
 use std::net::SocketAddr;
 use std::sync::atomic::AtomicI64;
@@ -30,6 +29,7 @@ use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::{mpsc, oneshot, RwLock};
 use tokio::time::timeout;
+use uuid::Uuid;
 
 mod node;
 
@@ -37,12 +37,43 @@ mod node;
 #[serde(deny_unknown_fields)]
 pub struct KafkaSinkClusterConfig {
     pub first_contact_points: Vec<String>,
-    pub shotover_nodes: Vec<String>,
+    pub shotover_nodes: Vec<ShotoverNodeConfig>,
+    // TODO: This could possibly be deleted if we don't actually need it
+    pub local_shotover_broker_id: i32,
     pub connect_timeout_ms: u64,
     pub read_timeout: Option<u64>,
     pub tls: Option<TlsConnectorConfig>,
 }
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(deny_unknown_fields)]
+pub struct ShotoverNodeConfig {
+    pub address: SocketAddr,
+    pub rack: String,
+    pub broker_id: i32,
+}
+
+impl ShotoverNodeConfig {
+    fn build(self) -> ShotoverNode {
+        let address = KafkaAddress {
+            host: StrBytes::from_string(self.address.ip().to_string()),
+            port: self.address.port() as i32,
+        };
+        ShotoverNode {
+            address,
+            rack: StrBytes::from_string(self.rack),
+            broker_id: BrokerId(self.broker_id),
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct ShotoverNode {
+    pub address: KafkaAddress,
+    pub rack: StrBytes,
+    pub broker_id: BrokerId,
+}
 
 const NAME: &str = "KafkaSinkCluster";
 #[typetag::serde(name = "KafkaSinkCluster")]
 #[async_trait(?Send)]
@@ -66,7 +97,7 @@ impl TransformConfig for KafkaSinkClusterConfig {
 pub struct KafkaSinkClusterBuilder {
     // contains address and port
     first_contact_points: Vec<String>,
-    shotover_nodes: Vec<KafkaAddress>,
+    shotover_nodes: Vec<ShotoverNode>,
     connect_timeout: Duration,
     read_timeout: Option<Duration>,
     controller_broker: Arc<AtomicBrokerId>,
@@ -79,7 +110,7 @@ pub struct KafkaSinkClusterBuilder {
 impl KafkaSinkClusterBuilder {
     pub fn new(
         first_contact_points: Vec<String>,
-        shotover_nodes: Vec<String>,
+        shotover_nodes: Vec<ShotoverNodeConfig>,
         _chain_name: String,
         connect_timeout_ms: u64,
         timeout: Option<u64>,
@@ -87,16 +118,11 @@ impl KafkaSinkClusterBuilder {
     ) -> KafkaSinkClusterBuilder {
         let receive_timeout = timeout.map(Duration::from_secs);
 
-        let shotover_nodes = shotover_nodes
+        let mut shotover_nodes: Vec<_> = shotover_nodes
             .into_iter()
-            .map(|node| {
-                let address: SocketAddr = node.parse().unwrap();
-                KafkaAddress {
-                    host: StrBytes::from_string(address.ip().to_string()),
-                    port: address.port() as i32,
-                }
-            })
+            .map(ShotoverNodeConfig::build)
             .collect();
+        shotover_nodes.sort_by_key(|x| x.broker_id);
 
         KafkaSinkClusterBuilder {
             first_contact_points,
@@ -162,7 +188,7 @@ impl AtomicBrokerId {
 
 pub struct KafkaSinkCluster {
     first_contact_points: Vec<String>,
-    shotover_nodes: Vec<KafkaAddress>,
+    shotover_nodes: Vec<ShotoverNode>,
     pushed_messages_tx: Option<mpsc::UnboundedSender<Messages>>,
     read_timeout: Option<Duration>,
     nodes: Vec<KafkaNode>,
@@ -193,6 +219,7 @@ impl Transform for KafkaSinkCluster {
                     Ok(KafkaNode::new(
                         BrokerId(-1),
                         KafkaAddress::from_str(address)?,
+                        None, // TODO: rack is not known yet
                     ))
                 })
                 .collect();
@@ -522,6 +549,7 @@ impl KafkaSinkCluster {
             })) => Ok(KafkaNode::new(
                 coordinator.node_id,
                 KafkaAddress::new(coordinator.host.clone(), coordinator.port),
+                None, // TODO: new variant to represent unknown
             )),
             other => Err(anyhow!(
                 "Unexpected message returned to findcoordinator request {other:?}"
@@ -622,7 +650,7 @@ impl KafkaSinkCluster {
                             &mut coordinator.port,
                         )
                     }
-                    deduplicate_coordinators(&mut find_coordinator.coordinators);
+                    //deduplicate_coordinators(&mut find_coordinator.coordinators);
                 }
                 response.invalidate_cache();
             }
@@ -632,25 +660,23 @@ impl KafkaSinkCluster {
            })) => {
                 self.process_metadata(metadata).await;
 
-                for (_, broker) in &mut metadata.brokers {
-                    rewrite_address(&self.shotover_nodes, &mut broker.host, &mut broker.port);
-                }
-                deduplicate_metadata_brokers(metadata);
+                self.rewrite_metadata_response(metadata)?;
 
                 response.invalidate_cache();
             }
             Some(Frame::Kafka(KafkaFrame::Response {
-                body: ResponseBody::DescribeCluster(describe_cluster),
+                body: ResponseBody::DescribeCluster(_describe_cluster),
                 ..
             })) => {
-                for broker in &mut describe_cluster.brokers {
-                    rewrite_address(
-                        &self.shotover_nodes,
-                        &mut broker.1.host,
-                        &mut broker.1.port,
-                    )
-                }
-                response.invalidate_cache();
+                // for broker in &mut describe_cluster.brokers {
+                //     rewrite_address(
+                //         &self.shotover_nodes,
+                //         &mut broker.1.host,
+                //         &mut broker.1.port,
+                //     )
+                // }
+                // response.invalidate_cache();
+                todo!("Is this ever sent?")
             }
             _ => {}
         }
@@ -726,7 +752,11 @@ impl KafkaSinkCluster {
 
     async fn process_metadata(&mut self, metadata: &MetadataResponse) {
         for (id, broker) in &metadata.brokers {
-            let node = KafkaNode::new(*id, KafkaAddress::new(broker.host.clone(), broker.port));
+            let node = KafkaNode::new(
+                *id,
+                KafkaAddress::new(broker.host.clone(), broker.port),
+                broker.rack.clone(),
+            );
             self.add_node_if_new(node).await;
         }
@@ -748,6 +778,71 @@ impl KafkaSinkCluster {
         }
     }
 
+    /// Rewrite metadata response to appear as if the shotover cluster is the real cluster and the real kafka brokers do not exist
+    fn rewrite_metadata_response(&self, metadata: &mut MetadataResponse) -> Result<()> {
+        // Overwrite list of brokers with the list of shotover nodes
+        metadata.brokers = self
+            .shotover_nodes
+            .iter()
+            .map(|shotover_node| {
+                (
+                    shotover_node.broker_id,
+                    MetadataResponseBroker::builder()
+                        .host(shotover_node.address.host.clone())
+                        .port(shotover_node.address.port)
+                        .rack(Some(shotover_node.rack.clone()))
+                        .build()
+                        .unwrap(),
+                )
+            })
+            .collect();
+
+        // Overwrite the list of partitions to point at all shotover nodes within the same rack
+        for (_, topic) in &mut metadata.topics {
+            for partition in &mut topic.partitions {
+                // Deterministically choose a shotover node in the rack as leader based on topic + partition id
+                // TODO: rackify
+                let hash = hash_partition(topic.topic_id.clone(), partition.partition_index);
+                let shotover_node = &self.shotover_nodes[hash % self.shotover_nodes.len()];
+                partition.leader_id = shotover_node.broker_id;
+
+                // take all shotover nodes (TODO: restrict this to shotover nodes in the same rack)
+                partition.replica_nodes = self.shotover_nodes.iter().map(|node| node.broker_id).collect();
+            }
+        }
+
+        if let Some(controller_node) = self
+            .nodes
+            .iter()
+            .find(|node| node.broker_id == metadata.controller_id)
+        {
+            // If broker has no rack - use the first shotover node
+            // If broker has rack - use the first shotover node with the same rack
+            // This is deterministic because the list of shotover nodes is sorted.
+            if let Some(shotover_node) = self.shotover_nodes.iter().find(|shotover_node| {
+                controller_node
+                    .rack
+                    .as_ref()
+                    .map(|rack| rack == &shotover_node.rack)
+                    .unwrap_or(true)
+            }) {
+                metadata.controller_id = shotover_node.broker_id;
+            } else {
+                tracing::warn!(
+                    "No shotover node configured to handle kafka rack {:?}",
+                    controller_node.rack
+                );
+            }
+        } else {
+            return Err(anyhow!(
+                "Invalid metadata, controller points at unknown node {:?}",
+                metadata.controller_id
+            ));
+        }
+
+        Ok(())
+    }
+
     async fn add_node_if_new(&mut self, new_node: KafkaNode) {
         let new = self
             .nodes_shared
@@ -778,89 +873,24 @@ fn hash_address(host: &str, port: i32) -> u64 {
     hasher.finish()
 }
 
-fn rewrite_address(shotover_nodes: &[KafkaAddress], host: &mut StrBytes, port: &mut i32) {
+fn hash_partition(topic_id: Uuid, partition_index: i32) -> usize {
+    let mut hasher = xxhash_rust::xxh3::Xxh3::new();
+    hasher.write(topic_id.as_bytes());
+    hasher.write(&partition_index.to_be_bytes());
+    hasher.finish() as usize
+}
+
+fn rewrite_address(shotover_nodes: &[ShotoverNode], host: &mut StrBytes, port: &mut i32) {
     // do not attempt to rewrite if the port is not provided (-1)
     // this is known to occur in an error response
     if *port >= 0 {
         let shotover_node =
-            &shotover_nodes[hash_address(host, *port) as usize % shotover_nodes.len()];
+            &shotover_nodes[hash_address(host, *port) as usize % shotover_nodes.len()].address;
         *host = shotover_node.host.clone();
         *port = shotover_node.port;
     }
 }
 
-/// The rdkafka driver has been observed to get stuck when there are multiple brokers with identical host and port.
-/// This function deterministically rewrites metadata to avoid such duplication.
-fn deduplicate_metadata_brokers(metadata: &mut MetadataResponse) {
-    struct SeenBroker {
-        pub id: BrokerId,
-        pub address: KafkaAddress,
-    }
-    let mut seen: Vec<SeenBroker> = vec![];
-    let mut replacement_broker_id = HashMap::new();
-
-    // ensure deterministic results across shotover instances by first sorting the list of brokers by their broker id
-    metadata.brokers.sort_keys();
-
-    // populate replacement_broker_id.
-    // This is used both to determine which brokers to delete and which broker ids to use as a replacement for deleted brokers.
-    for (id, broker) in &mut metadata.brokers {
-        let address = KafkaAddress {
-            host: broker.host.clone(),
-            port: broker.port,
-        };
-        broker.rack = None;
-        if let Some(replacement) = seen.iter().find(|x| x.address == address) {
-            replacement_broker_id.insert(*id, replacement.id);
-        }
-        seen.push(SeenBroker { address, id: *id });
-    }
-
-    // remove brokers with duplicate addresses
-    for (original, _replacement) in replacement_broker_id.iter() {
-        metadata.brokers.swap_remove(original);
-    }
-
-    // In the previous step some broker id's were removed but we might be referring to those id's elsewhere in the message.
-    // If there are any such cases fix them by changing the id to refer to the equivalent undeleted broker.
-    for (_, topic) in &mut metadata.topics {
-        for partition in &mut topic.partitions {
-            if let Some(id) = replacement_broker_id.get(&partition.leader_id) {
-                partition.leader_id = *id;
-            }
-            for replica_node in &mut partition.replica_nodes {
-                if let Some(id) = replacement_broker_id.get(replica_node) {
-                    *replica_node = *id
-                }
-            }
-        }
-    }
-    if let Some(id) = replacement_broker_id.get(&metadata.controller_id) {
-        metadata.controller_id = *id;
-    }
-}
-
-/// We havent observed any failures due to duplicates in findcoordinator messages like we have in metadata messages.
-/// But there might be similar issues lurking in other drivers so deduplicating seems reasonable.
-fn deduplicate_coordinators(coordinators: &mut Vec<Coordinator>) {
-    let mut seen = vec![];
-    let mut to_delete = vec![];
-    for (i, coordinator) in coordinators.iter().enumerate() {
-        let address = KafkaAddress {
-            host: coordinator.host.clone(),
-            port: coordinator.port,
-        };
-        if seen.contains(&address) {
-            to_delete.push(i)
-        }
-        seen.push(address);
-    }
-
-    for to_delete in to_delete.iter().rev() {
-        coordinators.remove(*to_delete);
-    }
-}
-
 #[derive(Debug)]
 struct Topic {
     partitions: Vec<Partition>,
diff --git a/shotover/src/transforms/kafka/sink_cluster/node.rs b/shotover/src/transforms/kafka/sink_cluster/node.rs
index afe059525..1b3a8aed1 100644
--- a/shotover/src/transforms/kafka/sink_cluster/node.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/node.rs
@@ -70,15 +70,17 @@ impl KafkaAddress {
 #[derive(Clone)]
 pub struct KafkaNode {
     pub broker_id: BrokerId,
+    pub rack: Option<StrBytes>,
     pub kafka_address: KafkaAddress,
     connection: Option,
 }
 
 impl KafkaNode {
-    pub fn new(broker_id: BrokerId, kafka_address: KafkaAddress) -> Self {
+    pub fn new(broker_id: BrokerId, kafka_address: KafkaAddress, rack: Option<StrBytes>) -> Self {
         KafkaNode {
             broker_id,
             kafka_address,
+            rack,
            connection: None,
        }
    }
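
For reference, under the shotover_nodes schema introduced by this patch, a multi-node, multi-rack topology might be configured roughly as shown below. This is only a sketch: the addresses, rack names and broker ids are illustrative, and only fields that appear in this patch (address, rack, broker_id, local_shotover_broker_id, first_contact_points, connect_timeout_ms) are used. Since KafkaSinkClusterBuilder sorts the list by broker_id and partition leaders are chosen by hashing into that list, each entry presumably needs a unique broker_id and every shotover instance should be given the same set of nodes so they all compute the same leader for a given partition.

      - KafkaSinkCluster:
          shotover_nodes:
            - address: "10.0.0.1:9192"
              rack: "rack0"
              broker_id: 0
            - address: "10.0.1.1:9192"
              rack: "rack1"
              broker_id: 1
          local_shotover_broker_id: 0
          first_contact_points: ["172.16.1.2:9092"]
          connect_timeout_ms: 3000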