diff --git a/shotover-proxy/benches/windsock/kafka/bench.rs b/shotover-proxy/benches/windsock/kafka/bench.rs
index e6713fa64..13f99c86b 100644
--- a/shotover-proxy/benches/windsock/kafka/bench.rs
+++ b/shotover-proxy/benches/windsock/kafka/bench.rs
@@ -13,7 +13,7 @@ use itertools::Itertools;
 use shotover::config::chain::TransformChainConfig;
 use shotover::sources::SourceConfig;
 use shotover::transforms::debug::force_parse::DebugForceEncodeConfig;
-use shotover::transforms::kafka::sink_cluster::KafkaSinkClusterConfig;
+use shotover::transforms::kafka::sink_cluster::{KafkaSinkClusterConfig, ShotoverNodeConfig};
 use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig;
 use shotover::transforms::TransformConfig;
 use std::sync::Arc;
@@ -92,7 +92,11 @@ impl KafkaBench {
                 connect_timeout_ms: 3000,
                 read_timeout: None,
                 first_contact_points: vec![kafka_address],
-                shotover_nodes: vec![host_address.clone()],
+                shotover_nodes: vec![ShotoverNodeConfig {
+                    address: host_address.parse().unwrap(),
+                    rack: "rack1".into(),
+                    broker_id: 0,
+                }],
                 tls: None,
                 sasl_enabled: Some(false),
             }),
diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs
index dd2d416e7..74db991c3 100644
--- a/shotover-proxy/tests/kafka_int_tests/mod.rs
+++ b/shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -192,30 +192,6 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
     }
 }
 
-#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
-#[rstest]
-#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
-// #[case::java(KafkaDriver::Java)]
-#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
-async fn cluster_2_racks_single_shotover(#[case] driver: KafkaDriver) {
-    let _docker_compose =
-        docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");
-    let shotover =
-        shotover_process("tests/test-configs/kafka/cluster-2-racks/topology-single.yaml")
-            .start()
-            .await;
-
-    let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::basic(connection_builder).await;
-
-    tokio::time::timeout(
-        Duration::from_secs(10),
-        shotover.shutdown_and_then_consume_events(&[]),
-    )
-    .await
-    .expect("Shotover did not shutdown within 10s");
-}
-
 #[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
 #[rstest]
 #[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml
index d38206e7f..f2d8f4368 100644
--- a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml
@@ -5,6 +5,9 @@ sources:
       listen_addr: "127.0.0.1:9192"
       chain:
         - KafkaSinkCluster:
-            shotover_nodes: ["127.0.0.1:9192"]
+            shotover_nodes:
+              - address: "127.0.0.1:9192"
+                rack: "rack0"
+                broker_id: 0
             first_contact_points: ["172.16.1.2:9092"]
             connect_timeout_ms: 3000
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology1.yaml
index 11edd3e78..15fe492e1 100644
--- a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology1.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology1.yaml
@@ -5,9 +5,15 @@ sources:
      listen_addr: "127.0.0.1:9191"
       chain:
         - KafkaSinkCluster:
-            shotover_nodes:
-              - "127.0.0.1:9191"
-              - "127.0.0.1:9192"
-              - "127.0.0.1:9193"
+            shotover_nodes:
+              - address: "127.0.0.1:9191"
+                rack: "rack0"
+                broker_id: 0
+              - address: "127.0.0.1:9192"
+                rack: "rack0"
+                broker_id: 1
+              - address: "127.0.0.1:9193"
+                rack: "rack0"
+                broker_id: 2
             first_contact_points: ["172.16.1.2:9092"]
             connect_timeout_ms: 3000
listen_addr: "127.0.0.1:9191" chain: - KafkaSinkCluster: - shotover_nodes: - - "127.0.0.1:9191" - - "127.0.0.1:9192" - - "127.0.0.1:9193" + shotover_nodes: + - address: "127.0.0.1:9191" + rack: "rack0" + broker_id: 0 + - address: "127.0.0.1:9192" + rack: "rack0" + broker_id: 1 + - address: "127.0.0.1:9193" + rack: "rack0" + broker_id: 2 first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology2.yaml index 2269f83dc..3844a8486 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology2.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology2.yaml @@ -5,9 +5,15 @@ sources: listen_addr: "127.0.0.1:9192" chain: - KafkaSinkCluster: - shotover_nodes: - - "127.0.0.1:9191" - - "127.0.0.1:9192" - - "127.0.0.1:9193" + shotover_nodes: + - address: "127.0.0.1:9191" + rack: "rack0" + broker_id: 0 + - address: "127.0.0.1:9192" + rack: "rack0" + broker_id: 1 + - address: "127.0.0.1:9193" + rack: "rack0" + broker_id: 2 first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology3.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology3.yaml index 528030e24..0fa7eb760 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology3.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology3.yaml @@ -5,9 +5,15 @@ sources: listen_addr: "127.0.0.1:9193" chain: - KafkaSinkCluster: - shotover_nodes: - - "127.0.0.1:9191" - - "127.0.0.1:9192" - - "127.0.0.1:9193" + shotover_nodes: + - address: "127.0.0.1:9191" + rack: "rack0" + broker_id: 0 + - address: "127.0.0.1:9192" + rack: "rack0" + broker_id: 1 + - address: "127.0.0.1:9193" + rack: "rack0" + broker_id: 2 first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack1.yaml index 440ba2f13..9de259aa3 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack1.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack1.yaml @@ -5,8 +5,12 @@ sources: listen_addr: "127.0.0.1:9191" chain: - KafkaSinkCluster: - shotover_nodes: - - "127.0.0.1:9191" - - "127.0.0.1:9192" + shotover_nodes: + - address: "127.0.0.1:9191" + rack: "rack1" + broker_id: 0 + - address: "127.0.0.1:9192" + rack: "rack2" + broker_id: 1 first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack2.yaml index ef4234294..3d98610ca 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack2.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack2.yaml @@ -5,8 +5,12 @@ sources: listen_addr: "127.0.0.1:9192" chain: - KafkaSinkCluster: - shotover_nodes: - - "127.0.0.1:9191" - - "127.0.0.1:9192" + shotover_nodes: + - address: "127.0.0.1:9191" + rack: "rack1" + broker_id: 0 + - address: "127.0.0.1:9192" + rack: "rack2" + broker_id: 1 first_contact_points: ["172.16.1.5:9092"] connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-single.yaml 
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-single.yaml
deleted file mode 100644
index d38206e7f..000000000
--- a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-single.yaml
+++ /dev/null
@@ -1,10 +0,0 @@
----
-sources:
-  - Kafka:
-      name: "kafka"
-      listen_addr: "127.0.0.1:9192"
-      chain:
-        - KafkaSinkCluster:
-            shotover_nodes: ["127.0.0.1:9192"]
-            first_contact_points: ["172.16.1.2:9092"]
-            connect_timeout_ms: 3000
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology-single.yaml
index 6daa7a417..8a59741ac 100644
--- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology-single.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology-single.yaml
@@ -5,7 +5,10 @@ sources:
       listen_addr: "127.0.0.1:9192"
       chain:
         - KafkaSinkCluster:
+            shotover_nodes:
+              - address: "127.0.0.1:9192"
+                rack: "rack0"
+                broker_id: 0
             sasl_enabled: true
-            shotover_nodes: ["127.0.0.1:9192"]
             first_contact_points: ["172.16.1.2:9092"]
             connect_timeout_ms: 3000
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology1.yaml
index 4f141c8b6..7a6990015 100644
--- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology1.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology1.yaml
@@ -6,9 +6,15 @@ sources:
       chain:
         - KafkaSinkCluster:
             sasl_enabled: true
-            shotover_nodes:
-              - "127.0.0.1:9191"
-              - "127.0.0.1:9192"
-              - "127.0.0.1:9193"
+            shotover_nodes:
+              - address: "127.0.0.1:9191"
+                rack: "rack0"
+                broker_id: 0
+              - address: "127.0.0.1:9192"
+                rack: "rack0"
+                broker_id: 1
+              - address: "127.0.0.1:9193"
+                rack: "rack0"
+                broker_id: 2
             first_contact_points: ["172.16.1.2:9092"]
             connect_timeout_ms: 3000
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology2.yaml
index 9cc781468..1fa948922 100644
--- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology2.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology2.yaml
@@ -6,9 +6,15 @@ sources:
       chain:
         - KafkaSinkCluster:
             sasl_enabled: true
-            shotover_nodes:
-              - "127.0.0.1:9191"
-              - "127.0.0.1:9192"
-              - "127.0.0.1:9193"
+            shotover_nodes:
+              - address: "127.0.0.1:9191"
+                rack: "rack0"
+                broker_id: 0
+              - address: "127.0.0.1:9192"
+                rack: "rack0"
+                broker_id: 1
+              - address: "127.0.0.1:9193"
+                rack: "rack0"
+                broker_id: 2
             first_contact_points: ["172.16.1.2:9092"]
             connect_timeout_ms: 3000
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology3.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology3.yaml
index 95e2fd375..1cf09d057 100644
--- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology3.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology3.yaml
@@ -6,9 +6,15 @@ sources:
       chain:
         - KafkaSinkCluster:
             sasl_enabled: true
-            shotover_nodes:
-              - "127.0.0.1:9191"
-              - "127.0.0.1:9192"
-              - "127.0.0.1:9193"
+            shotover_nodes:
+              - address: "127.0.0.1:9191"
+                rack: "rack0"
+                broker_id: 0
+              - address: "127.0.0.1:9192"
+                rack: "rack0"
+                broker_id: 1
+              - address: "127.0.0.1:9193"
+                rack: "rack0"
+                broker_id: 2
             first_contact_points: ["172.16.1.2:9092"]
             connect_timeout_ms: 3000
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml
index 1d4a8b514..52829dd1b 100644
--- a/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml
@@ -5,7 +5,10 @@ sources:
       listen_addr: "127.0.0.1:9192"
       chain:
         - KafkaSinkCluster:
-            shotover_nodes: ["127.0.0.1:9192"]
+            shotover_nodes:
+              - address: "127.0.0.1:9192"
+                rack: "rack0"
+                broker_id: 0
             first_contact_points: ["172.16.1.2:9092"]
             connect_timeout_ms: 3000
             tls:
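Every topology file above migrates `shotover_nodes` from a list of bare `"host:port"` strings to structured entries carrying `address`, `rack` and `broker_id`. As a quick illustration of how such an entry maps onto the runtime node type, here is a minimal standalone sketch mirroring `ShotoverNodeConfig::build` from the transform change below, with plain `String`/`i32` standing in for the `StrBytes`/`BrokerId` wrappers the real code uses:

```rust
use std::net::SocketAddr;

// Simplified stand-ins for the config and runtime types added in sink_cluster/mod.rs.
struct ShotoverNodeConfig {
    address: SocketAddr, // "127.0.0.1:9192" in the YAML
    rack: String,        // "rack0"
    broker_id: i32,      // 0
}

struct ShotoverNode {
    host: String,
    port: i32,
    rack: String,
    broker_id: i32,
}

impl ShotoverNodeConfig {
    // Split the socket address into the host/port form the Kafka protocol wants.
    fn build(self) -> ShotoverNode {
        ShotoverNode {
            host: self.address.ip().to_string(),
            port: self.address.port() as i32,
            rack: self.rack,
            broker_id: self.broker_id,
        }
    }
}

fn main() {
    let node = ShotoverNodeConfig {
        address: "127.0.0.1:9192".parse().unwrap(),
        rack: "rack0".into(),
        broker_id: 0,
    }
    .build();
    assert_eq!((node.host.as_str(), node.port), ("127.0.0.1", 9192));
}
```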
diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index 6241287cc..f936c1134 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -11,10 +11,11 @@ use async_trait::async_trait;
 use dashmap::DashMap;
 use kafka_protocol::messages::find_coordinator_response::Coordinator;
 use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
+use kafka_protocol::messages::metadata_response::MetadataResponseBroker;
 use kafka_protocol::messages::{
-    ApiKey, BrokerId, FindCoordinatorRequest, GroupId, HeartbeatRequest, JoinGroupRequest,
-    MetadataRequest, MetadataResponse, OffsetFetchRequest, RequestHeader, SyncGroupRequest,
-    TopicName,
+    ApiKey, BrokerId, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
+    JoinGroupRequest, MetadataRequest, MetadataResponse, OffsetFetchRequest, RequestHeader,
+    SyncGroupRequest, TopicName,
 };
 use kafka_protocol::protocol::{Builder, StrBytes};
 use node::{ConnectionFactory, KafkaAddress, KafkaNode};
@@ -22,7 +23,6 @@ use rand::rngs::SmallRng;
 use rand::seq::{IteratorRandom, SliceRandom};
 use rand::SeedableRng;
 use serde::{Deserialize, Serialize};
-use std::collections::HashMap;
 use std::hash::Hasher;
 use std::net::SocketAddr;
 use std::sync::atomic::AtomicI64;
@@ -30,6 +30,7 @@ use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::{mpsc, oneshot, RwLock};
 use tokio::time::timeout;
+use uuid::Uuid;
 
 mod node;
 
@@ -37,13 +38,42 @@
 #[serde(deny_unknown_fields)]
 pub struct KafkaSinkClusterConfig {
     pub first_contact_points: Vec<String>,
-    pub shotover_nodes: Vec<String>,
+    pub shotover_nodes: Vec<ShotoverNodeConfig>,
     pub connect_timeout_ms: u64,
     pub read_timeout: Option<u64>,
     pub tls: Option<TlsConnectorConfig>,
     pub sasl_enabled: Option<bool>,
 }
 
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(deny_unknown_fields)]
+pub struct ShotoverNodeConfig {
+    pub address: SocketAddr,
+    pub rack: String,
+    pub broker_id: i32,
+}
+
+impl ShotoverNodeConfig {
+    fn build(self) -> ShotoverNode {
+        let address = KafkaAddress {
+            host: StrBytes::from_string(self.address.ip().to_string()),
+            port: self.address.port() as i32,
+        };
+        ShotoverNode {
+            address,
+            rack: StrBytes::from_string(self.rack),
+            broker_id: BrokerId(self.broker_id),
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct ShotoverNode {
+    pub address: KafkaAddress,
+    pub rack: StrBytes,
+    pub broker_id: BrokerId,
+}
+
 const NAME: &str = "KafkaSinkCluster";
 #[typetag::serde(name = "KafkaSinkCluster")]
 #[async_trait(?Send)]
@@ -68,7 +98,7 @@ impl TransformConfig for KafkaSinkClusterConfig {
 pub struct KafkaSinkClusterBuilder {
     // contains address and port
     first_contact_points: Vec<String>,
-    shotover_nodes: Vec<KafkaAddress>,
+    shotover_nodes: Vec<ShotoverNode>,
     connect_timeout: Duration,
     read_timeout: Option<Duration>,
     controller_broker: Arc<AtomicBrokerId>,
@@ -82,7 +112,7 @@ impl KafkaSinkClusterBuilder {
     pub fn new(
         first_contact_points: Vec<String>,
-        shotover_nodes: Vec<String>,
+        shotover_nodes: Vec<ShotoverNodeConfig>,
         _chain_name: String,
         connect_timeout_ms: u64,
         timeout: Option<u64>,
@@ -91,16 +121,11 @@ impl KafkaSinkClusterBuilder {
     ) -> KafkaSinkClusterBuilder {
         let receive_timeout = timeout.map(Duration::from_secs);
 
-        let shotover_nodes = shotover_nodes
+        let mut shotover_nodes: Vec<_> = shotover_nodes
             .into_iter()
-            .map(|node| {
-                let address: SocketAddr = node.parse().unwrap();
-                KafkaAddress {
-                    host: StrBytes::from_string(address.ip().to_string()),
-                    port: address.port() as i32,
-                }
-            })
+            .map(ShotoverNodeConfig::build)
             .collect();
+        shotover_nodes.sort_by_key(|x| x.broker_id);
 
         KafkaSinkClusterBuilder {
             first_contact_points,
@@ -195,7 +220,7 @@ impl SaslStatus {
 
 pub struct KafkaSinkCluster {
     first_contact_points: Vec<String>,
-    shotover_nodes: Vec<KafkaAddress>,
+    shotover_nodes: Vec<ShotoverNode>,
     pushed_messages_tx: Option<mpsc::Sender<Messages>>,
     read_timeout: Option<Duration>,
     nodes: Vec<KafkaNode>,
@@ -227,6 +252,7 @@ impl Transform for KafkaSinkCluster {
                     Ok(KafkaNode::new(
                         BrokerId(-1),
                         KafkaAddress::from_str(address)?,
+                        None,
                     ))
                 })
                 .collect();
@@ -344,7 +370,7 @@ impl KafkaSinkCluster {
             Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::Metadata(metadata),
                 ..
-            })) => self.process_metadata(metadata).await,
+            })) => self.process_metadata_response(metadata).await,
             other => {
                 return Err(anyhow!(
                     "Unexpected message returned to metadata request {other:?}"
@@ -614,6 +640,7 @@ impl KafkaSinkCluster {
             })) => Ok(KafkaNode::new(
                 coordinator.node_id,
                 KafkaAddress::new(coordinator.host.clone(), coordinator.port),
+                None,
             )),
             other => Err(anyhow!(
                 "Unexpected message returned to findcoordinator request {other:?}"
@@ -677,7 +704,7 @@ impl KafkaSinkCluster {
 
         // TODO: Handle errors like NOT_COORDINATOR by removing element from self.topics and self.coordinator_broker_id
 
-        // Rewrite responses to use shotovers port instead of kafkas port
+        // Rewrite responses to ensure clients only see the shotover cluster and hide the existence of the real kafka cluster
         for (i, response) in responses.iter_mut().enumerate() {
             match response.frame() {
                 Some(Frame::Kafka(KafkaFrame::Response {
@@ -690,59 +717,27 @@ impl KafkaSinkCluster {
                         .find(|x| x.index == i)
                         .ok_or_else(|| anyhow!("Received find_coordinator but not requested"))?;
 
-                    if *version <= 3 {
-                        if request.key_type == 0 {
-                            self.group_to_coordinator_broker
-                                .insert(GroupId(request.key.clone()), find_coordinator.node_id);
-                        }
-                        rewrite_address(
-                            &self.shotover_nodes,
-                            &mut find_coordinator.host,
-                            &mut find_coordinator.port,
-                        )
-                    } else {
-                        for coordinator in &mut find_coordinator.coordinators {
-                            if request.key_type == 0 {
-                                self.group_to_coordinator_broker.insert(
-                                    GroupId(coordinator.key.clone()),
-                                    find_coordinator.node_id,
-                                );
-                            }
-                            rewrite_address(
-                                &self.shotover_nodes,
-                                &mut coordinator.host,
-                                &mut coordinator.port,
-                            )
-                        }
-                        deduplicate_coordinators(&mut find_coordinator.coordinators);
-                    }
+                    self.process_find_coordinator_response(*version, request, find_coordinator);
+                    self.rewrite_find_coordinator_response(*version, find_coordinator);
                     response.invalidate_cache();
                 }
                 Some(Frame::Kafka(KafkaFrame::Response {
                     body: ResponseBody::Metadata(metadata),
                     ..
                 })) => {
-                    self.process_metadata(metadata).await;
-
-                    for (_, broker) in &mut metadata.brokers {
-                        rewrite_address(&self.shotover_nodes, &mut broker.host, &mut broker.port);
-                    }
-                    deduplicate_metadata_brokers(metadata);
-
+                    self.process_metadata_response(metadata).await;
+                    self.rewrite_metadata_response(metadata)?;
                     response.invalidate_cache();
                 }
                 Some(Frame::Kafka(KafkaFrame::Response {
-                    body: ResponseBody::DescribeCluster(describe_cluster),
+                    body: ResponseBody::DescribeCluster(_),
                     ..
                 })) => {
-                    for broker in &mut describe_cluster.brokers {
-                        rewrite_address(
-                            &self.shotover_nodes,
-                            &mut broker.1.host,
-                            &mut broker.1.port,
-                        )
-                    }
-                    response.invalidate_cache();
+                    // If clients were to send this we would need to rewrite the broker information.
+                    // However I dont think clients actually send this, so just error to ensure we dont break invariants.
+                    return Err(anyhow!(
+                        "I think this is a raft specific message and never sent by clients"
+                    ));
                 }
                 _ => {}
             }
@@ -816,9 +811,13 @@ impl KafkaSinkCluster {
         Ok(rx)
     }
 
-    async fn process_metadata(&mut self, metadata: &MetadataResponse) {
+    async fn process_metadata_response(&mut self, metadata: &MetadataResponse) {
         for (id, broker) in &metadata.brokers {
-            let node = KafkaNode::new(*id, KafkaAddress::new(broker.host.clone(), broker.port));
+            let node = KafkaNode::new(
+                *id,
+                KafkaAddress::new(broker.host.clone(), broker.port),
+                broker.rack.clone(),
+            );
             self.add_node_if_new(node).await;
         }
 
@@ -840,6 +839,202 @@ impl KafkaSinkCluster {
         }
     }
 
+    fn process_find_coordinator_response(
+        &mut self,
+        version: i16,
+        request: &FindCoordinator,
+        find_coordinator: &FindCoordinatorResponse,
+    ) {
+        if request.key_type == 0 {
+            if version <= 3 {
+                self.group_to_coordinator_broker
+                    .insert(GroupId(request.key.clone()), find_coordinator.node_id);
+            } else {
+                for coordinator in &find_coordinator.coordinators {
+                    self.group_to_coordinator_broker
+                        .insert(GroupId(coordinator.key.clone()), find_coordinator.node_id);
+                }
+            }
+        }
+    }
+
+    fn rewrite_find_coordinator_response(
+        &self,
+        version: i16,
+        find_coordinator: &mut FindCoordinatorResponse,
+    ) {
+        if version <= 3 {
+            // for version <= 3 we need to make do with only one coordinator.
+            // so we just pick the first shotover node in the rack of the coordinator.
+
+            // skip rewriting on error
+            if find_coordinator.error_code == 0 {
+                let coordinator_rack = &self
+                    .nodes
+                    .iter()
+                    .find(|x| x.broker_id == find_coordinator.node_id)
+                    .unwrap()
+                    .rack
+                    .as_ref();
+                let shotover_node = self
+                    .shotover_nodes
+                    .iter()
+                    .find(|shotover_node| {
+                        coordinator_rack
+                            .map(|rack| rack == &shotover_node.rack)
+                            .unwrap_or(true)
+                    })
+                    .unwrap();
+                find_coordinator.host = shotover_node.address.host.clone();
+                find_coordinator.port = shotover_node.address.port;
+                find_coordinator.node_id = shotover_node.broker_id;
+            }
+        } else {
+            // for version > 3 we can include as many coordinators as we want.
+            // so we include all shotover nodes in the rack of the coordinator.
+            let mut shotover_coordinators: Vec<Coordinator> = vec![];
+            for coordinator in find_coordinator.coordinators.drain(..) {
+                if coordinator.error_code == 0 {
+                    let coordinator_rack = &self
+                        .nodes
+                        .iter()
+                        .find(|x| x.broker_id == coordinator.node_id)
+                        .unwrap()
+                        .rack
+                        .as_ref();
+                    for shotover_node in self.shotover_nodes.iter().filter(|shotover_node| {
+                        coordinator_rack
+                            .map(|rack| rack == &shotover_node.rack)
+                            .unwrap_or(true)
+                    }) {
+                        if !shotover_coordinators
+                            .iter()
+                            .any(|x| x.node_id == shotover_node.broker_id)
+                        {
+                            shotover_coordinators.push(
+                                Coordinator::builder()
+                                    .node_id(shotover_node.broker_id)
+                                    .host(shotover_node.address.host.clone())
+                                    .port(shotover_node.address.port)
+                                    .build()
+                                    .unwrap(),
+                            );
+                        }
+                    }
+                } else {
+                    // pass errors through untouched
+                    shotover_coordinators.push(coordinator)
+                }
+            }
+            find_coordinator.coordinators = shotover_coordinators;
+        }
+    }
+
+    /// Rewrite metadata response to appear as if the shotover cluster is the real cluster and the real kafka brokers do not exist
+    fn rewrite_metadata_response(&self, metadata: &mut MetadataResponse) -> Result<()> {
+        // Overwrite list of brokers with the list of shotover nodes
+        metadata.brokers = self
+            .shotover_nodes
+            .iter()
+            .map(|shotover_node| {
+                (
+                    shotover_node.broker_id,
+                    MetadataResponseBroker::builder()
+                        .host(shotover_node.address.host.clone())
+                        .port(shotover_node.address.port)
+                        .rack(Some(shotover_node.rack.clone()))
+                        .build()
+                        .unwrap(),
+                )
+            })
+            .collect();
+
+        // Overwrite the list of partitions to point at all shotover nodes within the same rack
+        for (_, topic) in &mut metadata.topics {
+            for partition in &mut topic.partitions {
+                // Deterministically choose a single shotover node in the rack as leader based on topic + partition id
+                let leader_rack = self
+                    .nodes
+                    .iter()
+                    .find(|x| x.broker_id == *partition.leader_id)
+                    .map(|x| x.rack.clone())
+                    .unwrap();
+                let shotover_nodes_in_rack: Vec<_> = self
+                    .shotover_nodes
+                    .iter()
+                    .filter(|shotover_node| {
+                        leader_rack
+                            .as_ref()
+                            .map(|rack| rack == &shotover_node.rack)
+                            .unwrap_or(true)
+                    })
+                    .collect();
+                let hash = hash_partition(topic.topic_id, partition.partition_index);
+                let shotover_node = &shotover_nodes_in_rack[hash % shotover_nodes_in_rack.len()];
+                partition.leader_id = shotover_node.broker_id;
+
+                // Every replica node has its entire corresponding shotover rack included.
+                // Since we can set as many replica nodes as we like, we take this all out approach.
+                // This ensures that:
+                // * metadata is deterministic and therefore the same on all shotover nodes
+                // * clients evenly distribute their queries across shotover nodes
+                let mut shotover_replica_nodes = vec![];
+                for replica_node in &partition.replica_nodes {
+                    let rack = self
+                        .nodes
+                        .iter()
+                        .find(|x| x.broker_id == *replica_node)
+                        .map(|x| x.rack.clone())
+                        .unwrap();
+                    for shotover_node in &self.shotover_nodes {
+                        // If broker has no rack - use all shotover nodes
+                        // If broker has rack - use all shotover nodes with the same rack
+                        if rack
+                            .as_ref()
+                            .map(|rack| rack == &shotover_node.rack)
+                            .unwrap_or(true)
+                            && !shotover_replica_nodes.contains(&shotover_node.broker_id)
+                        {
+                            shotover_replica_nodes.push(shotover_node.broker_id);
+                        }
+                    }
+                }
+                partition.replica_nodes = shotover_replica_nodes;
+            }
+        }
+
+        if let Some(controller_node) = self
+            .nodes
+            .iter()
+            .find(|node| node.broker_id == metadata.controller_id)
+        {
+            // If broker has no rack - use the first shotover node
+            // If broker has rack - use the first shotover node with the same rack
+            // This is deterministic because the list of shotover nodes is sorted.
+            if let Some(shotover_node) = self.shotover_nodes.iter().find(|shotover_node| {
+                controller_node
+                    .rack
+                    .as_ref()
+                    .map(|rack| rack == &shotover_node.rack)
+                    .unwrap_or(true)
+            }) {
+                metadata.controller_id = shotover_node.broker_id;
+            } else {
+                tracing::warn!(
+                    "No shotover node configured to handle kafka rack {:?}",
+                    controller_node.rack
+                );
+            }
+        } else {
+            return Err(anyhow!(
+                "Invalid metadata, controller points at unknown node {:?}",
+                metadata.controller_id
+            ));
+        }
+
+        Ok(())
+    }
+
     async fn add_node_if_new(&mut self, new_node: KafkaNode) {
         let new = self
             .nodes_shared
@@ -863,94 +1058,11 @@ async fn read_responses(responses: Vec<oneshot::Receiver<Response>>) -> Result<
     }
 }
 
-fn hash_address(host: &StrBytes, port: i32) -> u64 {
+fn hash_partition(topic_id: Uuid, partition_index: i32) -> usize {
     let mut hasher = xxhash_rust::xxh3::Xxh3::new();
-    hasher.write(host.as_bytes());
-    hasher.write(&port.to_be_bytes());
-    hasher.finish()
-}
-
-fn rewrite_address(shotover_nodes: &[KafkaAddress], host: &mut StrBytes, port: &mut i32) {
-    // do not attempt to rewrite if the port is not provided (-1)
-    // this is known to occur in an error response
-    if *port >= 0 {
-        let shotover_node =
-            &shotover_nodes[hash_address(host, *port) as usize % shotover_nodes.len()];
-        *host = shotover_node.host.clone();
-        *port = shotover_node.port;
-    }
-}
-
-/// The rdkafka driver has been observed to get stuck when there are multiple brokers with identical host and port.
-/// This function deterministically rewrites metadata to avoid such duplication.
-fn deduplicate_metadata_brokers(metadata: &mut MetadataResponse) {
-    struct SeenBroker {
-        pub id: BrokerId,
-        pub address: KafkaAddress,
-    }
-    let mut seen: Vec<SeenBroker> = vec![];
-    let mut replacement_broker_id = HashMap::new();
-
-    // ensure deterministic results across shotover instances by first sorting the list of brokers by their broker id
-    metadata.brokers.sort_keys();
-
-    // populate replacement_broker_id.
-    // This is used both to determine which brokers to delete and which broker ids to use as a replacement for deleted brokers.
-    for (id, broker) in &mut metadata.brokers {
-        let address = KafkaAddress {
-            host: broker.host.clone(),
-            port: broker.port,
-        };
-        broker.rack = None;
-        if let Some(replacement) = seen.iter().find(|x| x.address == address) {
-            replacement_broker_id.insert(*id, replacement.id);
-        }
-        seen.push(SeenBroker { address, id: *id });
-    }
-
-    // remove brokers with duplicate addresses
-    for (original, _replacement) in replacement_broker_id.iter() {
-        metadata.brokers.swap_remove(original);
-    }
-
-    // In the previous step some broker id's were removed but we might be referring to those id's elsewhere in the message.
-    // If there are any such cases fix them by changing the id to refer to the equivalent undeleted broker.
-    for (_, topic) in &mut metadata.topics {
-        for partition in &mut topic.partitions {
-            if let Some(id) = replacement_broker_id.get(&partition.leader_id) {
-                partition.leader_id = *id;
-            }
-            for replica_node in &mut partition.replica_nodes {
-                if let Some(id) = replacement_broker_id.get(replica_node) {
-                    *replica_node = *id
-                }
-            }
-        }
-    }
-    if let Some(id) = replacement_broker_id.get(&metadata.controller_id) {
-        metadata.controller_id = *id;
-    }
-}
-
-/// We havent observed any failures due to duplicates in findcoordinator messages like we have in metadata messages.
-/// But there might be similar issues lurking in other drivers so deduplicating seems reasonable.
-fn deduplicate_coordinators(coordinators: &mut Vec<Coordinator>) {
-    let mut seen = vec![];
-    let mut to_delete = vec![];
-    for (i, coordinator) in coordinators.iter().enumerate() {
-        let address = KafkaAddress {
-            host: coordinator.host.clone(),
-            port: coordinator.port,
-        };
-        if seen.contains(&address) {
-            to_delete.push(i)
-        }
-        seen.push(address);
-    }
-
-    for to_delete in to_delete.iter().rev() {
-        coordinators.remove(*to_delete);
-    }
+    hasher.write(topic_id.as_bytes());
+    hasher.write(&partition_index.to_be_bytes());
+    hasher.finish() as usize
 }
 
 #[derive(Debug)]
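The partition-leader rewrite in `rewrite_metadata_response` above reduces to a deterministic pick among the shotover nodes that front the leader's rack. A simplified sketch of that selection, using `std`'s hasher and plain types in place of `xxhash_rust::xxh3` and the kafka_protocol ids (so the hash values differ from shotover's, but the structure is the same):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

#[derive(Debug, PartialEq)]
struct ShotoverNode {
    broker_id: i32,
    rack: String,
}

// Deterministically map a (topic, partition) pair onto one shotover node in `rack`.
// Every shotover instance computes the same answer because the node list is sorted
// by broker_id at startup and the hash only depends on the partition identity.
fn leader_for_partition<'a>(
    shotover_nodes: &'a [ShotoverNode], // assumed sorted by broker_id
    rack: &str,
    topic_id: u128, // the real code hashes the topic's Uuid bytes
    partition_index: i32,
) -> &'a ShotoverNode {
    let in_rack: Vec<&ShotoverNode> = shotover_nodes
        .iter()
        .filter(|node| node.rack == rack)
        .collect();
    let mut hasher = DefaultHasher::new();
    topic_id.hash(&mut hasher);
    partition_index.hash(&mut hasher);
    in_rack[hasher.finish() as usize % in_rack.len()]
}

fn main() {
    let nodes = vec![
        ShotoverNode { broker_id: 0, rack: "rack1".into() },
        ShotoverNode { broker_id: 1, rack: "rack2".into() },
    ];
    // Same inputs always select the same node.
    assert_eq!(
        leader_for_partition(&nodes, "rack1", 42, 3),
        leader_for_partition(&nodes, "rack1", 42, 3)
    );
}
```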
diff --git a/shotover/src/transforms/kafka/sink_cluster/node.rs b/shotover/src/transforms/kafka/sink_cluster/node.rs
index 5f6f4a9ce..d3ad0b1b1 100644
--- a/shotover/src/transforms/kafka/sink_cluster/node.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/node.rs
@@ -108,18 +108,20 @@ impl KafkaAddress {
     }
 }
 
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct KafkaNode {
     pub broker_id: BrokerId,
+    pub rack: Option<StrBytes>,
     pub kafka_address: KafkaAddress,
     connection: Option,
 }
 
 impl KafkaNode {
-    pub fn new(broker_id: BrokerId, kafka_address: KafkaAddress) -> Self {
+    pub fn new(broker_id: BrokerId, kafka_address: KafkaAddress, rack: Option<StrBytes>) -> Self {
         KafkaNode {
             broker_id,
             kafka_address,
+            rack,
             connection: None,
         }
     }
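For completeness, the replica-list rewrite described in the comments of `rewrite_metadata_response` (the "all out approach") can be sketched the same way: every kafka replica is replaced by all shotover nodes in that replica's rack, or by every shotover node when the broker reports no rack, with duplicates dropped so the result is identical on every shotover instance. Again a simplified sketch over plain types, not the real implementation:

```rust
struct ShotoverNode {
    broker_id: i32,
    rack: String,
}

// Map kafka replica racks onto shotover broker ids, mirroring the approach in the diff:
// one entry per shotover node in each replica's rack, no duplicates.
fn rewrite_replicas(
    replica_racks: &[Option<&str>], // rack of each kafka replica; None if the broker has no rack
    shotover_nodes: &[ShotoverNode],
) -> Vec<i32> {
    let mut out = Vec::new();
    for rack in replica_racks {
        for node in shotover_nodes {
            let rack_matches = rack.map(|r| r == node.rack).unwrap_or(true);
            if rack_matches && !out.contains(&node.broker_id) {
                out.push(node.broker_id);
            }
        }
    }
    out
}

fn main() {
    let nodes = vec![
        ShotoverNode { broker_id: 0, rack: "rack1".into() },
        ShotoverNode { broker_id: 1, rack: "rack2".into() },
    ];
    // Two kafka replicas in rack1 collapse to the single shotover node fronting rack1.
    assert_eq!(rewrite_replicas(&[Some("rack1"), Some("rack1")], &nodes), vec![0]);
    // A replica with no rack information pulls in every shotover node.
    assert_eq!(rewrite_replicas(&[None], &nodes), vec![0, 1]);
}
```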