From ec5eee11d808194abcc232d92a4f766f05724d2e Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Thu, 29 Aug 2024 13:34:58 +1000 Subject: [PATCH] KafkaSinkCluster - improve metadata response rewriting --- .../src/transforms/kafka/sink_cluster/mod.rs | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index c16fbb561..c373654cb 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -1821,6 +1821,36 @@ routing message to a random node so that: } } partition.replica_nodes = shotover_replica_nodes; + // Every isr node has its entire corresponding shotover rack included. + // Since we can set as many isr nodes as we like, we take this all out approach. + // This ensures that: + // * metadata is deterministic and therefore the same on all shotover nodes + // * clients evenly distribute their queries across shotover nodes + let mut shotover_isr_nodes = vec![]; + for replica_node in &partition.isr_nodes { + let rack = self + .nodes + .iter() + .find(|x| x.broker_id == *replica_node) + .map(|x| x.rack.clone()) + .unwrap(); + for shotover_node in &self.shotover_nodes { + // If broker has no rack - use all shotover nodes + // If broker has rack - use all shotover nodes with the same rack + if rack + .as_ref() + .map(|rack| rack == &shotover_node.rack) + .unwrap_or(true) + && !shotover_isr_nodes.contains(&shotover_node.broker_id) + { + shotover_isr_nodes.push(shotover_node.broker_id); + } + } + } + partition.isr_nodes = shotover_isr_nodes; + + // TODO: handle this properly, for now its better to just clear it than do nothing + partition.offline_replicas.clear(); } }