Make KafkaSinkCluster rack aware
rukai committed Mar 14, 2024
1 parent 2bef5e7 commit 84f3564
Showing 4 changed files with 148 additions and 107 deletions.
9 changes: 7 additions & 2 deletions shotover-proxy/benches/windsock/kafka/bench.rs
@@ -13,7 +13,7 @@ use itertools::Itertools;
use shotover::config::chain::TransformChainConfig;
use shotover::sources::SourceConfig;
use shotover::transforms::debug::force_parse::DebugForceEncodeConfig;
use shotover::transforms::kafka::sink_cluster::KafkaSinkClusterConfig;
use shotover::transforms::kafka::sink_cluster::{KafkaSinkClusterConfig, ShotoverNodeConfig};
use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig;
use shotover::transforms::TransformConfig;
use std::sync::Arc;
@@ -92,7 +92,12 @@ impl KafkaBench {
connect_timeout_ms: 3000,
read_timeout: None,
first_contact_points: vec![kafka_address],
shotover_nodes: vec![host_address.clone()],
shotover_nodes: vec![ShotoverNodeConfig {
address: host_address.parse().unwrap(),
rack: "rack1".into(),
broker_id: 0,
}],
local_shotover_broker_id: 0,
tls: None,
}),
});
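For context, here is a minimal sketch (not part of this commit) of how the new rack-aware configuration shape could be filled in for a multi-rack deployment; all addresses, racks, and ids below are hypothetical:

```rust
use shotover::transforms::kafka::sink_cluster::{KafkaSinkClusterConfig, ShotoverNodeConfig};

// Hypothetical two-node, two-rack deployment; every value is illustrative.
let config = KafkaSinkClusterConfig {
    first_contact_points: vec!["172.16.1.2:9092".to_owned()],
    shotover_nodes: vec![
        ShotoverNodeConfig {
            address: "127.0.0.1:9192".parse().unwrap(),
            rack: "rack0".into(),
            broker_id: 0,
        },
        ShotoverNodeConfig {
            address: "127.0.0.2:9192".parse().unwrap(),
            rack: "rack1".into(),
            broker_id: 1,
        },
    ],
    // Presumably identifies which shotover_nodes entry is this local instance
    // (see the TODO on this field further down in the diff).
    local_shotover_broker_id: 0,
    connect_timeout_ms: 3000,
    read_timeout: None,
    tls: None,
};
```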
@@ -5,7 +5,11 @@ sources:
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkCluster:
shotover_nodes: ["127.0.0.1:9192"]
shotover_nodes:
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
tls:
236 changes: 133 additions & 103 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -9,8 +9,8 @@ use crate::transforms::{TransformConfig, TransformContextConfig};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use dashmap::DashMap;
use kafka_protocol::messages::find_coordinator_response::Coordinator;
use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
use kafka_protocol::messages::metadata_response::MetadataResponseBroker;
use kafka_protocol::messages::{
ApiKey, BrokerId, FindCoordinatorRequest, GroupId, HeartbeatRequest, JoinGroupRequest,
MetadataRequest, MetadataResponse, OffsetFetchRequest, RequestHeader, SyncGroupRequest,
@@ -22,27 +22,58 @@ use rand::rngs::SmallRng;
use rand::seq::{IteratorRandom, SliceRandom};
use rand::SeedableRng;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::hash::Hasher;
use std::net::SocketAddr;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::time::timeout;
use uuid::Uuid;

mod node;

#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct KafkaSinkClusterConfig {
pub first_contact_points: Vec<String>,
pub shotover_nodes: Vec<String>,
pub shotover_nodes: Vec<ShotoverNodeConfig>,
// TODO: This could possibly be deleted if we don't actually need it
pub local_shotover_broker_id: i32,
pub connect_timeout_ms: u64,
pub read_timeout: Option<u64>,
pub tls: Option<TlsConnectorConfig>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct ShotoverNodeConfig {
pub address: SocketAddr,
pub rack: String,
pub broker_id: i32,
}

impl ShotoverNodeConfig {
fn build(self) -> ShotoverNode {
let address = KafkaAddress {
host: StrBytes::from_string(self.address.ip().to_string()),
port: self.address.port() as i32,
};
ShotoverNode {
address,
rack: StrBytes::from_string(self.rack),
broker_id: BrokerId(self.broker_id),
}
}
}

#[derive(Clone)]
pub struct ShotoverNode {
pub address: KafkaAddress,
pub rack: StrBytes,
pub broker_id: BrokerId,
}
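As a small usage sketch (not from the commit), build() only restructures data: the already-parsed SocketAddr is split into the host/port form of KafkaAddress, and rack and broker_id are wrapped in their protocol types. Values here are hypothetical:

```rust
let node = ShotoverNodeConfig {
    address: "10.0.0.5:9192".parse().unwrap(), // hypothetical shotover address
    rack: "rack-a".into(),
    broker_id: 2,
}
.build();

// The resulting ShotoverNode carries the same information in protocol types.
assert_eq!(node.broker_id, BrokerId(2));
assert_eq!(node.address.port, 9192);
```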

const NAME: &str = "KafkaSinkCluster";
#[typetag::serde(name = "KafkaSinkCluster")]
#[async_trait(?Send)]
@@ -66,7 +97,7 @@ impl TransformConfig for KafkaSinkClusterConfig {
pub struct KafkaSinkClusterBuilder {
// contains address and port
first_contact_points: Vec<String>,
shotover_nodes: Vec<KafkaAddress>,
shotover_nodes: Vec<ShotoverNode>,
connect_timeout: Duration,
read_timeout: Option<Duration>,
controller_broker: Arc<AtomicBrokerId>,
@@ -79,24 +110,19 @@ impl KafkaSinkClusterBuilder {
impl KafkaSinkClusterBuilder {
pub fn new(
first_contact_points: Vec<String>,
shotover_nodes: Vec<String>,
shotover_nodes: Vec<ShotoverNodeConfig>,
_chain_name: String,
connect_timeout_ms: u64,
timeout: Option<u64>,
tls: Option<TlsConnector>,
) -> KafkaSinkClusterBuilder {
let receive_timeout = timeout.map(Duration::from_secs);

let shotover_nodes = shotover_nodes
let mut shotover_nodes: Vec<_> = shotover_nodes
.into_iter()
.map(|node| {
let address: SocketAddr = node.parse().unwrap();
KafkaAddress {
host: StrBytes::from_string(address.ip().to_string()),
port: address.port() as i32,
}
})
.map(ShotoverNodeConfig::build)
.collect();
shotover_nodes.sort_by_key(|x| x.broker_id);

KafkaSinkClusterBuilder {
first_contact_points,
@@ -162,7 +188,7 @@ impl AtomicBrokerId {

pub struct KafkaSinkCluster {
first_contact_points: Vec<String>,
shotover_nodes: Vec<KafkaAddress>,
shotover_nodes: Vec<ShotoverNode>,
pushed_messages_tx: Option<mpsc::UnboundedSender<Messages>>,
read_timeout: Option<Duration>,
nodes: Vec<KafkaNode>,
@@ -193,6 +219,7 @@ impl Transform for KafkaSinkCluster {
Ok(KafkaNode::new(
BrokerId(-1),
KafkaAddress::from_str(address)?,
None, //todo
))
})
.collect();
@@ -522,6 +549,7 @@ impl KafkaSinkCluster {
})) => Ok(KafkaNode::new(
coordinator.node_id,
KafkaAddress::new(coordinator.host.clone(), coordinator.port),
None, // TODO: new variant to represent unknown
)),
other => Err(anyhow!(
"Unexpected message returned to findcoordinator request {other:?}"
@@ -622,7 +650,7 @@ impl KafkaSinkCluster {
&mut coordinator.port,
)
}
deduplicate_coordinators(&mut find_coordinator.coordinators);
//deduplicate_coordinators(&mut find_coordinator.coordinators);
}
response.invalidate_cache();
}
@@ -632,25 +660,23 @@
})) => {
self.process_metadata(metadata).await;

for (_, broker) in &mut metadata.brokers {
rewrite_address(&self.shotover_nodes, &mut broker.host, &mut broker.port);
}
deduplicate_metadata_brokers(metadata);
self.rewrite_metadata_response(metadata)?;

response.invalidate_cache();
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DescribeCluster(describe_cluster),
body: ResponseBody::DescribeCluster(_describe_cluster),
..
})) => {
for broker in &mut describe_cluster.brokers {
rewrite_address(
&self.shotover_nodes,
&mut broker.1.host,
&mut broker.1.port,
)
}
response.invalidate_cache();
// for broker in &mut describe_cluster.brokers {
// rewrite_address(
// &self.shotover_nodes,
// &mut broker.1.host,
// &mut broker.1.port,
// )
// }
// response.invalidate_cache();
todo!("Is this ever sent?")
}
_ => {}
}
@@ -726,7 +752,11 @@ impl KafkaSinkCluster {

async fn process_metadata(&mut self, metadata: &MetadataResponse) {
for (id, broker) in &metadata.brokers {
let node = KafkaNode::new(*id, KafkaAddress::new(broker.host.clone(), broker.port));
let node = KafkaNode::new(
*id,
KafkaAddress::new(broker.host.clone(), broker.port),
broker.rack.clone(),
);
self.add_node_if_new(node).await;
}

@@ -748,6 +778,71 @@
}
}

/// Rewrite metadata response to appear as if the shotover cluster is the real cluster and the real kafka brokers do not exist
fn rewrite_metadata_response(&self, metadata: &mut MetadataResponse) -> Result<()> {
// Overwrite list of brokers with the list of shotover nodes
metadata.brokers = self
.shotover_nodes
.iter()
.map(|shotover_node| {
(
shotover_node.broker_id,
MetadataResponseBroker::builder()
.host(shotover_node.address.host.clone())
.port(shotover_node.address.port)
.rack(Some(shotover_node.rack.clone()))
.build()
.unwrap(),
)
})
.collect();

// Overwrite the list of partitions to point at all shotover nodes within the same rack
for (_, topic) in &mut metadata.topics {
for partition in &mut topic.partitions {
// Deterministically choose a shotover node in the rack as leader based on topic + partition id
// TODO: rackify
let hash = hash_partition(topic.topic_id.clone(), partition.partition_index);
let shotover_node = &self.shotover_nodes[hash % self.shotover_nodes.len()];
partition.leader_id = shotover_node.broker_id;

// take all shotover nodes in the rack
partition.replica_nodes = self.shotover_nodes.iter().map(|node| node.broker_id).collect();
}
}

if let Some(controller_node) = self
.nodes
.iter()
.find(|node| node.broker_id == metadata.controller_id)
{
// If broker has no rack - use the first shotover node
// If broker has rack - use the first shotover node with the same rack
// This is deterministic because the list of shotover nodes is sorted.
if let Some(shotover_node) = self.shotover_nodes.iter().find(|shotover_node| {
controller_node
.rack
.as_ref()
.map(|rack| rack == &shotover_node.rack)
.unwrap_or(true)
}) {
metadata.controller_id = shotover_node.broker_id;
} else {
tracing::warn!(
"No shotover node configured to handle kafka rack {:?}",
controller_node.rack
);
}
} else {
return Err(anyhow!(
"Invalid metadata, controller points at unknown node {:?}",
metadata.controller_id
));
}

Ok(())
}
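The controller mapping above can be read as a small standalone rule. A hedged sketch of that rule follows; the function name is invented for illustration and it leans on this module's ShotoverNode plus kafka_protocol's StrBytes:

```rust
// First shotover node whose rack matches the kafka controller's rack,
// or simply the first shotover node when the controller reports no rack.
// Determinism relies on shotover_nodes being sorted by broker_id at build time.
fn controller_shotover_node<'a>(
    shotover_nodes: &'a [ShotoverNode],
    controller_rack: Option<&StrBytes>,
) -> Option<&'a ShotoverNode> {
    shotover_nodes
        .iter()
        .find(|node| controller_rack.map(|rack| *rack == node.rack).unwrap_or(true))
}
```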

async fn add_node_if_new(&mut self, new_node: KafkaNode) {
let new = self
.nodes_shared
@@ -778,89 +873,24 @@ fn hash_address(host: &str, port: i32) -> u64 {
hasher.finish()
}

fn rewrite_address(shotover_nodes: &[KafkaAddress], host: &mut StrBytes, port: &mut i32) {
fn hash_partition(topic_id: Uuid, partition_index: i32) -> usize {
let mut hasher = xxhash_rust::xxh3::Xxh3::new();
hasher.write(topic_id.as_bytes());
hasher.write(&partition_index.to_be_bytes());
hasher.finish() as usize
}
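A tiny illustrative check (not in the commit) of the property the leader selection above depends on: the same topic/partition pair always maps to the same shotover node index, on every shotover instance.

```rust
let shotover_node_count = 3; // hypothetical number of shotover nodes
let topic_id = Uuid::nil(); // placeholder topic id
let a = hash_partition(topic_id, 7) % shotover_node_count;
let b = hash_partition(topic_id, 7) % shotover_node_count;
assert_eq!(a, b); // same inputs always pick the same node
```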

fn rewrite_address(shotover_nodes: &[ShotoverNode], host: &mut StrBytes, port: &mut i32) {
// do not attempt to rewrite if the port is not provided (-1)
// this is known to occur in an error response
if *port >= 0 {
let shotover_node =
&shotover_nodes[hash_address(host, *port) as usize % shotover_nodes.len()];
&shotover_nodes[hash_address(host, *port) as usize % shotover_nodes.len()].address;
*host = shotover_node.host.clone();
*port = shotover_node.port;
}
}

/// The rdkafka driver has been observed to get stuck when there are multiple brokers with identical host and port.
/// This function deterministically rewrites metadata to avoid such duplication.
fn deduplicate_metadata_brokers(metadata: &mut MetadataResponse) {
struct SeenBroker {
pub id: BrokerId,
pub address: KafkaAddress,
}
let mut seen: Vec<SeenBroker> = vec![];
let mut replacement_broker_id = HashMap::new();

// ensure deterministic results across shotover instances by first sorting the list of brokers by their broker id
metadata.brokers.sort_keys();

// populate replacement_broker_id.
// This is used both to determine which brokers to delete and which broker ids to use as a replacement for deleted brokers.
for (id, broker) in &mut metadata.brokers {
let address = KafkaAddress {
host: broker.host.clone(),
port: broker.port,
};
broker.rack = None;
if let Some(replacement) = seen.iter().find(|x| x.address == address) {
replacement_broker_id.insert(*id, replacement.id);
}
seen.push(SeenBroker { address, id: *id });
}

// remove brokers with duplicate addresses
for (original, _replacement) in replacement_broker_id.iter() {
metadata.brokers.swap_remove(original);
}

// In the previous step some broker ids were removed but we might be referring to those ids elsewhere in the message.
// If there are any such cases fix them by changing the id to refer to the equivalent undeleted broker.
for (_, topic) in &mut metadata.topics {
for partition in &mut topic.partitions {
if let Some(id) = replacement_broker_id.get(&partition.leader_id) {
partition.leader_id = *id;
}
for replica_node in &mut partition.replica_nodes {
if let Some(id) = replacement_broker_id.get(replica_node) {
*replica_node = *id
}
}
}
}
if let Some(id) = replacement_broker_id.get(&metadata.controller_id) {
metadata.controller_id = *id;
}
}

/// We haven't observed any failures due to duplicates in findcoordinator messages like we have in metadata messages.
/// But there might be similar issues lurking in other drivers so deduplicating seems reasonable.
fn deduplicate_coordinators(coordinators: &mut Vec<Coordinator>) {
let mut seen = vec![];
let mut to_delete = vec![];
for (i, coordinator) in coordinators.iter().enumerate() {
let address = KafkaAddress {
host: coordinator.host.clone(),
port: coordinator.port,
};
if seen.contains(&address) {
to_delete.push(i)
}
seen.push(address);
}

for to_delete in to_delete.iter().rev() {
coordinators.remove(*to_delete);
}
}

#[derive(Debug)]
struct Topic {
partitions: Vec<Partition>,