diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index caf3f3000..41a2679c8 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -822,6 +822,7 @@ impl KafkaSinkCluster {
                     ty: PendingRequestTy::routed(destination, message),
                     combine_responses: 1,
                 });
+                tracing::debug!("Routing produce request to random node due to being empty");
             } else if routing.len() == 1 {
                 // Only 1 destination,
                 // so we can just reconstruct the original message as is,
@@ -839,6 +840,7 @@ impl KafkaSinkCluster {
                     ty: PendingRequestTy::routed(destination, message),
                     combine_responses: 1,
                 });
+                tracing::debug!("Routing produce request to single node");
             } else {
                 // The message has been split so it may be delivered to multiple destinations.
                 // We must generate a unique message for each destination.
@@ -868,6 +870,7 @@ impl KafkaSinkCluster {
                         combine_responses,
                     });
                 }
+                tracing::debug!("Routing produce request to multiple nodes");
             }
         }

@@ -900,6 +903,11 @@ impl KafkaSinkCluster {
                     tracing::warn!("no known partition replica for {name:?} at partition index {partition_index} out of {partition_len} partitions, routing request to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
                     BrokerId(-1)
                 };
+                tracing::debug!(
+                    "Routing produce request portion of partition {partition_index} in topic {} to broker {}",
+                    name.0,
+                    destination.0
+                );

                 // Get the topics already routed to this destination
                 let routed_topics = result.entry(destination).or_default();
@@ -975,6 +983,12 @@ impl KafkaSinkCluster {
                     tracing::warn!("no known partition replica for {topic_name} at partition index {partition_index} out of {partition_len} partitions, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
                     BrokerId(-1)
                 };
+                tracing::debug!(
+                    "Routing fetch request portion of partition {partition_index} in topic {} to broker {}",
+                    // TODO: Is this expensive?
+                    Self::format_topic_name(&topic),
+                    destination.0
+                );
                 let dest_topics = result.entry(destination).or_default();
                 if let Some(dest_topic) = dest_topics
                     .iter_mut()
@@ -1024,6 +1038,7 @@ impl KafkaSinkCluster {
                     ty: PendingRequestTy::routed(destination, message),
                     combine_responses: 1,
                 });
+                tracing::debug!("Routing fetch request to random node due to being empty");
             } else if routing.len() == 1 {
                 // Only 1 destination,
                 // so we can just reconstruct the original message as is,
@@ -1041,6 +1056,7 @@ impl KafkaSinkCluster {
                     ty: PendingRequestTy::routed(destination, message),
                     combine_responses: 1,
                 });
+                tracing::debug!("Routing fetch request to single node");
             } else {
                 // The message has been split so it may be delivered to multiple destinations.
                 // We must generate a unique message for each destination.
@@ -1070,6 +1086,7 @@ impl KafkaSinkCluster {
                         combine_responses,
                     });
                 }
+                tracing::debug!("Routing fetch request to multiple nodes");
             }
         }

@@ -1593,7 +1610,7 @@ impl KafkaSinkCluster {
                     self.topic_by_name.remove(&fetch_response.topic);
                     self.topic_by_id.remove(&fetch_response.topic_id);
                     tracing::info!(
-                        "Fetch response included error NOT_LEADER_OR_FOLLOWER and so cleared topic {:?} {:?}",
+                        "Fetch response included error NOT_LEADER_OR_FOLLOWER and so cleared metadata for topic {:?} {:?}",
                         fetch_response.topic,
                         fetch_response.topic_id
                     );
@@ -1749,6 +1766,7 @@ impl KafkaSinkCluster {
             },
             combine_responses: 1,
         });
+        tracing::debug!("routing request to control connection");
     }

     fn route_to_controller(&mut self, request: Message) {
@@ -1769,6 +1787,10 @@ impl KafkaSinkCluster {
             ty: PendingRequestTy::routed(destination, request),
             combine_responses: 1,
         });
+        tracing::debug!(
+            "routing request relating to controller to broker {}",
+            destination.0
+        );
     }

     fn route_to_coordinator(&mut self, request: Message, group_id: GroupId) {
@@ -1787,6 +1809,11 @@ impl KafkaSinkCluster {
             ty: PendingRequestTy::routed(destination, request),
             combine_responses: 1,
         });
+        tracing::debug!(
+            "routing request relating to group id {} to broker {}",
+            group_id.0,
+            destination.0
+        );
     }

     async fn process_metadata_response(&mut self, metadata: &MetadataResponse) {
@@ -1850,7 +1877,30 @@ impl KafkaSinkCluster {
                 }
             };

-            if !topic_name.is_empty() {
+            let has_topic_name = !topic_name.is_empty();
+            let has_topic_id = !topic.topic_id.is_nil();
+
+            // Since new_partitions can be quite long we avoid logging it twice to keep the debug logs somewhat readable.
+            if has_topic_name && has_topic_id {
+                tracing::debug!(
+                    "Storing topic_by_name and topic_by_id metadata: topic {} {} -> {:?}",
+                    topic_name.0,
+                    topic.topic_id,
+                    new_partitions
+                );
+            } else if has_topic_name {
+                tracing::debug!(
+                    "Storing topic_by_name metadata: topic {} -> {new_partitions:?}",
+                    topic_name.0
+                );
+            } else if has_topic_id {
+                tracing::debug!(
+                    "Storing topic_by_id metadata: topic {} -> {new_partitions:?}",
+                    topic.topic_id
+                );
+            }
+
+            if has_topic_name {
                 self.topic_by_name.insert(
                     topic_name.clone(),
                     Topic {
@@ -1858,7 +1908,7 @@ impl KafkaSinkCluster {
                     },
                 );
             }
-            if !topic.topic_id.is_nil() {
+            if has_topic_id {
                 self.topic_by_id.insert(
                     topic.topic_id,
                     Topic {
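
Reviewer note (not part of the patch): the new tracing::debug! calls use the module's default target, so they can be surfaced at runtime by tightening the log filter for shotover::transforms::kafka::sink_cluster. Below is a minimal sketch of how such a filter could be installed with tracing-subscriber's EnvFilter (requires the env-filter feature); where shotover actually initialises its subscriber is not shown in this diff, so the main function and filter string here are illustrative assumptions.

use tracing_subscriber::EnvFilter;

fn main() {
    // Keep everything else at info, but enable the routing/metadata debug logs
    // added by this patch. The target string is derived from the module path of
    // sink_cluster/mod.rs and is an assumption, not something defined in the diff.
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::new(
            "info,shotover::transforms::kafka::sink_cluster=debug",
        ))
        .init();

    // ... start shotover / run the test that exercises KafkaSinkCluster ...
}

The same directive could instead be supplied through the RUST_LOG environment variable when the subscriber is built with EnvFilter::from_default_env().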