KafkaSinkCluster: use leader_epoch (#1660)
rukai authored Jun 13, 2024
1 parent 9f89390 commit 42ed20d
Showing 1 changed file with 37 additions and 6 deletions.
shotover/src/transforms/kafka/sink_cluster/mod.rs (37 additions, 6 deletions)
@@ -699,7 +699,7 @@ impl KafkaSinkCluster {
                 .iter()
                 .next()
                 .ok_or_else(|| anyhow!("No topics in produce message"))?;
-            if let Some(topic) = self.topic_by_name.get(&topic_name.0) {
+            if let Some(topic) = self.topic_by_name.get(topic_name) {
                 // assume that all partitions in this topic have the same routing requirements
                 let partition = &topic.partitions[topic_data
                     .partition_data
@@ -1369,12 +1369,15 @@ routing message to a random node so that:
 
         for (topic_name, topic) in &metadata.topics {
             if ResponseError::try_from_code(topic.error_code).is_none() {
-                let mut partitions: Vec<_> = topic
+                // We use the response's partitions list as a base
+                // since if it has deleted an entry then we also want to delete that entry.
+                let mut new_partitions: Vec<_> = topic
                     .partitions
                     .iter()
                     .map(|partition| Partition {
                         index: partition.partition_index,
                         leader_id: partition.leader_id,
+                        leader_epoch: partition.leader_epoch,
                         shotover_rack_replica_nodes: partition
                             .replica_nodes
                             .iter()
@@ -1389,18 +1392,45 @@
                             .collect(),
                     })
                     .collect();
-                partitions.sort_by_key(|x| x.index);
+                new_partitions.sort_by_key(|x| x.index);
+
+                // If topic_by_name contains any partitions with a more recent leader_epoch use that instead.
+                // The out of date epoch is probably caused by requesting metadata from a broker that is slightly out of date.
+                // We use topic_by_name instead of topic_by_id since its always used regardless of protocol version.
+                if let Some(topic) = self.topic_by_name.get(topic_name) {
+                    for old_partition in &topic.partitions {
+                        if let Some(new_partition) = new_partitions
+                            .iter_mut()
+                            .find(|p| p.index == old_partition.index)
+                        {
+                            if old_partition.leader_epoch > new_partition.leader_epoch {
+                                new_partition.leader_id = old_partition.leader_id;
+                                new_partition
+                                    .shotover_rack_replica_nodes
+                                    .clone_from(&old_partition.shotover_rack_replica_nodes);
+                                new_partition
+                                    .external_rack_replica_nodes
+                                    .clone_from(&old_partition.external_rack_replica_nodes);
+                            }
+                        }
+                    }
+                };
+
                 if !topic_name.is_empty() {
                     self.topic_by_name.insert(
                         topic_name.clone(),
                         Topic {
-                            partitions: partitions.clone(),
+                            partitions: new_partitions.clone(),
                         },
                     );
                 }
                 if !topic.topic_id.is_nil() {
-                    self.topic_by_id
-                        .insert(topic.topic_id, Topic { partitions });
+                    self.topic_by_id.insert(
+                        topic.topic_id,
+                        Topic {
+                            partitions: new_partitions,
+                        },
+                    );
                 }
             }
         }
@@ -1646,6 +1676,7 @@ struct Topic {
 #[derive(Debug, Clone)]
 struct Partition {
     index: i32,
+    leader_epoch: i32,
     leader_id: BrokerId,
     shotover_rack_replica_nodes: Vec<BrokerId>,
     external_rack_replica_nodes: Vec<BrokerId>,
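For reference, a minimal standalone sketch of the merge rule this commit introduces, using hypothetical stand-in types rather than shotover's real structs: when fresh metadata is merged with the cache, partition info learned under a higher leader_epoch is kept and a stale response cannot overwrite it. The real code copies leader_id and the replica node lists individually; this sketch copies the whole entry for brevity.

// Hypothetical simplified types for illustration only.
#[derive(Debug, Clone)]
struct PartitionMeta {
    index: i32,
    leader_epoch: i32,
    leader_id: i32,
}

// Keep cached partition data whenever its leader_epoch is newer than the
// epoch reported by the freshly fetched metadata.
fn merge_metadata(cached: &[PartitionMeta], fresh: &mut [PartitionMeta]) {
    for old in cached {
        if let Some(new) = fresh.iter_mut().find(|p| p.index == old.index) {
            if old.leader_epoch > new.leader_epoch {
                *new = old.clone();
            }
        }
    }
}

fn main() {
    let cached = vec![PartitionMeta { index: 0, leader_epoch: 7, leader_id: 3 }];
    let mut fresh = vec![PartitionMeta { index: 0, leader_epoch: 6, leader_id: 1 }];
    merge_metadata(&cached, &mut fresh);
    // The stale response (epoch 6) did not demote the cached leader (epoch 7).
    assert_eq!(fresh[0].leader_id, 3);
}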
