Skip to content

Commit

Permalink
Merge branch 'main' into store_hmac_as_base64
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored May 21, 2024
2 parents 81dafe7 + 50b6301 commit d7ec227
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,6 @@ impl KafkaSinkCluster {
// This way of constructing topic_meta is kind of crazy, but it works around borrow checker limitations
// Old clients only specify the topic name and some newer clients only specify the topic id.
// So we need to check the id first and then fallback to the name.
let topic_name = &topic.topic;
let topic_by_id = self.topic_by_id.get(&topic.topic_id);
let topic_by_name;
let mut topic_meta = topic_by_id.as_deref();
Expand All @@ -725,7 +724,8 @@ impl KafkaSinkCluster {
.broker_id
} else {
let partition_len = topic_meta.partitions.len();
tracing::warn!("no known partition replica for {topic_name:?} at partition index {partition_index} out of {partition_len} partitions, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
let topic_name = Self::format_topic_name(&topic);
tracing::warn!("no known partition replica for {topic_name} at partition index {partition_index} out of {partition_len} partitions, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
BrokerId(-1)
};
let dest_topics = result.entry(destination).or_default();
Expand All @@ -741,7 +741,8 @@ impl KafkaSinkCluster {
}
}
} else {
tracing::warn!("no known partition replica for {topic_name:?}, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
let topic_name = Self::format_topic_name(&topic);
tracing::warn!("no known partition replica for {topic_name}, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
let destination = BrokerId(-1);
let dest_topics = result.entry(destination).or_default();
dest_topics.push(topic);
Expand All @@ -751,6 +752,14 @@ impl KafkaSinkCluster {
result
}

fn format_topic_name(fetch_topic: &FetchTopic) -> String {
if fetch_topic.topic.0.is_empty() {
format!("topic with id {}", fetch_topic.topic_id)
} else {
format!("topic with name {:?}", fetch_topic.topic.0.as_str())
}
}

fn route_fetch_request(&mut self, mut message: Message) -> Result<()> {
if let Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Fetch(fetch),
Expand Down

0 comments on commit d7ec227

Please sign in to comment.