From 42ed20d872b5b8370cb19cc2d5165f1e95dc8471 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Thu, 13 Jun 2024 14:44:14 +1000 Subject: [PATCH] KafkaSinkCluster: use leader_epoch (#1660) --- .../src/transforms/kafka/sink_cluster/mod.rs | 43 ++++++++++++++++--- 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index dae3d7db9..b8e4fd773 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -699,7 +699,7 @@ impl KafkaSinkCluster { .iter() .next() .ok_or_else(|| anyhow!("No topics in produce message"))?; - if let Some(topic) = self.topic_by_name.get(&topic_name.0) { + if let Some(topic) = self.topic_by_name.get(topic_name) { // assume that all partitions in this topic have the same routing requirements let partition = &topic.partitions[topic_data .partition_data @@ -1369,12 +1369,15 @@ routing message to a random node so that: for (topic_name, topic) in &metadata.topics { if ResponseError::try_from_code(topic.error_code).is_none() { - let mut partitions: Vec<_> = topic + // We use the response's partitions list as a base + // since if it has deleted an entry then we also want to delete that entry. + let mut new_partitions: Vec<_> = topic .partitions .iter() .map(|partition| Partition { index: partition.partition_index, leader_id: partition.leader_id, + leader_epoch: partition.leader_epoch, shotover_rack_replica_nodes: partition .replica_nodes .iter() @@ -1389,18 +1392,45 @@ routing message to a random node so that: .collect(), }) .collect(); - partitions.sort_by_key(|x| x.index); + new_partitions.sort_by_key(|x| x.index); + + // If topic_by_name contains any partitions with a more recent leader_epoch use that instead. + // The out of date epoch is probably caused by requesting metadata from a broker that is slightly out of date. 
+ // We use topic_by_name instead of topic_by_id since its always used regardless of protocol version. + if let Some(topic) = self.topic_by_name.get(topic_name) { + for old_partition in &topic.partitions { + if let Some(new_partition) = new_partitions + .iter_mut() + .find(|p| p.index == old_partition.index) + { + if old_partition.leader_epoch > new_partition.leader_epoch { + new_partition.leader_id = old_partition.leader_id; + new_partition + .shotover_rack_replica_nodes + .clone_from(&old_partition.shotover_rack_replica_nodes); + new_partition + .external_rack_replica_nodes + .clone_from(&old_partition.external_rack_replica_nodes); + } + } + } + }; + if !topic_name.is_empty() { self.topic_by_name.insert( topic_name.clone(), Topic { - partitions: partitions.clone(), + partitions: new_partitions.clone(), }, ); } if !topic.topic_id.is_nil() { - self.topic_by_id - .insert(topic.topic_id, Topic { partitions }); + self.topic_by_id.insert( + topic.topic_id, + Topic { + partitions: new_partitions, + }, + ); } } } @@ -1646,6 +1676,7 @@ struct Topic { #[derive(Debug, Clone)] struct Partition { index: i32, + leader_epoch: i32, leader_id: BrokerId, shotover_rack_replica_nodes: Vec<BrokerId>, external_rack_replica_nodes: Vec<BrokerId>,