Skip to content

Commit

Permalink
KafkaSinkCluster - improve metadata response rewriting
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Aug 29, 2024
1 parent 8fb1c76 commit ec5eee1
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1821,6 +1821,36 @@ routing message to a random node so that:
}
}
partition.replica_nodes = shotover_replica_nodes;
// Every isr node has its entire corresponding shotover rack included.
// Since we can set as many isr nodes as we like, we take this all out approach.
// This ensures that:
// * metadata is deterministic and therefore the same on all shotover nodes
// * clients evenly distribute their queries across shotover nodes
let mut shotover_isr_nodes = vec![];
for replica_node in &partition.isr_nodes {
let rack = self
.nodes
.iter()
.find(|x| x.broker_id == *replica_node)
.map(|x| x.rack.clone())
.unwrap();
for shotover_node in &self.shotover_nodes {
// If broker has no rack - use all shotover nodes
// If broker has rack - use all shotover nodes with the same rack
if rack
.as_ref()
.map(|rack| rack == &shotover_node.rack)
.unwrap_or(true)
&& !shotover_isr_nodes.contains(&shotover_node.broker_id)
{
shotover_isr_nodes.push(shotover_node.broker_id);
}
}
}
partition.isr_nodes = shotover_isr_nodes;

// TODO: handle this properly, for now its better to just clear it than do nothing
partition.offline_replicas.clear();
}
}

Expand Down

0 comments on commit ec5eee1

Please sign in to comment.