From 50b630190f43848fd883bf44bb3e7aa09056a202 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 21 May 2024 11:26:10 +1000 Subject: [PATCH] KafkaSinkCluster: Improve unknown topic warnings (#1629) --- shotover/src/transforms/kafka/sink_cluster/mod.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 3835a7d55..73f09a9eb 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -703,7 +703,6 @@ impl KafkaSinkCluster { // This way of constructing topic_meta is kind of crazy, but it works around borrow checker limitations // Old clients only specify the topic name and some newer clients only specify the topic id. // So we need to check the id first and then fallback to the name. - let topic_name = &topic.topic; let topic_by_id = self.topic_by_id.get(&topic.topic_id); let topic_by_name; let mut topic_meta = topic_by_id.as_deref(); @@ -725,7 +724,8 @@ impl KafkaSinkCluster { .broker_id } else { let partition_len = topic_meta.partitions.len(); - tracing::warn!("no known partition replica for {topic_name:?} at partition index {partition_index} out of {partition_len} partitions, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client"); + let topic_name = Self::format_topic_name(&topic); + tracing::warn!("no known partition replica for {topic_name} at partition index {partition_index} out of {partition_len} partitions, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client"); BrokerId(-1) }; let dest_topics = result.entry(destination).or_default(); @@ -741,7 +741,8 @@ impl KafkaSinkCluster { } } } else { - tracing::warn!("no known partition replica for {topic_name:?}, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client"); + let topic_name = Self::format_topic_name(&topic); + tracing::warn!("no known partition replica for {topic_name}, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client"); let destination = BrokerId(-1); let dest_topics = result.entry(destination).or_default(); dest_topics.push(topic); @@ -751,6 +752,14 @@ impl KafkaSinkCluster { result } + fn format_topic_name(fetch_topic: &FetchTopic) -> String { + if fetch_topic.topic.0.is_empty() { + format!("topic with id {}", fetch_topic.topic_id) + } else { + format!("topic with name {:?}", fetch_topic.topic.0.as_str()) + } + } + fn route_fetch_request(&mut self, mut message: Message) -> Result<()> { if let Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::Fetch(fetch),