Add routing debug logs
rukai committed Sep 11, 2024
1 parent e72e945 commit 46d90f0
Showing 1 changed file with 53 additions and 3 deletions.

shotover/src/transforms/kafka/sink_cluster/mod.rs (+53 -3)
@@ -822,6 +822,7 @@ impl KafkaSinkCluster {
                 ty: PendingRequestTy::routed(destination, message),
                 combine_responses: 1,
             });
+            tracing::debug!("Routing produce request to random node due to being empty");
         } else if routing.len() == 1 {
             // Only 1 destination,
             // so we can just reconstruct the original message as is,
@@ -839,6 +840,7 @@ impl KafkaSinkCluster {
                 ty: PendingRequestTy::routed(destination, message),
                 combine_responses: 1,
             });
+            tracing::debug!("Routing produce request to single node");
         } else {
             // The message has been split so it may be delivered to multiple destinations.
             // We must generate a unique message for each destination.
@@ -868,6 +870,7 @@ impl KafkaSinkCluster {
                     combine_responses,
                 });
             }
+            tracing::debug!("Routing produce request to multiple nodes");
         }
     }
 
@@ -900,6 +903,11 @@ impl KafkaSinkCluster {
                 tracing::warn!("no known partition replica for {name:?} at partition index {partition_index} out of {partition_len} partitions, routing request to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
                 BrokerId(-1)
             };
+            tracing::debug!(
+                "Routing produce request portion of partition {partition_index} in topic {} to broker {}",
+                name.0,
+                destination.0
+            );
 
             // Get the topics already routed to this destination
             let routed_topics = result.entry(destination).or_default();
@@ -975,6 +983,12 @@ impl KafkaSinkCluster {
                 tracing::warn!("no known partition replica for {topic_name} at partition index {partition_index} out of {partition_len} partitions, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
                 BrokerId(-1)
             };
+            tracing::debug!(
+                "Routing fetch request portion of partition {partition_index} in topic {} to broker {}",
+                // TODO: Is this expensive?
+                Self::format_topic_name(&topic),
+                destination.0
+            );
             let dest_topics = result.entry(destination).or_default();
             if let Some(dest_topic) = dest_topics
                 .iter_mut()
@@ -1024,6 +1038,7 @@ impl KafkaSinkCluster {
                 ty: PendingRequestTy::routed(destination, message),
                 combine_responses: 1,
             });
+            tracing::debug!("Routing fetch request to random node due to being empty");
         } else if routing.len() == 1 {
             // Only 1 destination,
             // so we can just reconstruct the original message as is,
@@ -1041,6 +1056,7 @@ impl KafkaSinkCluster {
                 ty: PendingRequestTy::routed(destination, message),
                 combine_responses: 1,
             });
+            tracing::debug!("Routing fetch request to single node");
         } else {
             // The message has been split so it may be delivered to multiple destinations.
             // We must generate a unique message for each destination.
@@ -1070,6 +1086,7 @@ impl KafkaSinkCluster {
                     combine_responses,
                 });
             }
+            tracing::debug!("Routing fetch request to multiple nodes");
         }
     }
 
@@ -1593,7 +1610,7 @@ impl KafkaSinkCluster {
                 self.topic_by_name.remove(&fetch_response.topic);
                 self.topic_by_id.remove(&fetch_response.topic_id);
                 tracing::info!(
-                    "Fetch response included error NOT_LEADER_OR_FOLLOWER and so cleared topic {:?} {:?}",
+                    "Fetch response included error NOT_LEADER_OR_FOLLOWER and so cleared metadata for topic {:?} {:?}",
                     fetch_response.topic,
                     fetch_response.topic_id
                 );
@@ -1749,6 +1766,7 @@ impl KafkaSinkCluster {
             },
             combine_responses: 1,
         });
+        tracing::debug!("routing request to control connection");
     }
 
     fn route_to_controller(&mut self, request: Message) {
@@ -1769,6 +1787,10 @@ impl KafkaSinkCluster {
             ty: PendingRequestTy::routed(destination, request),
             combine_responses: 1,
         });
+        tracing::debug!(
+            "routing request relating to controller to broker {}",
+            destination.0
+        );
     }
 
     fn route_to_coordinator(&mut self, request: Message, group_id: GroupId) {
@@ -1787,6 +1809,11 @@ impl KafkaSinkCluster {
             ty: PendingRequestTy::routed(destination, request),
             combine_responses: 1,
         });
+        tracing::debug!(
+            "routing request relating to group id {} to broker {}",
+            group_id.0,
+            destination.0
+        );
     }
 
     async fn process_metadata_response(&mut self, metadata: &MetadataResponse) {
@@ -1850,15 +1877,38 @@ impl KafkaSinkCluster {
                 }
             };
 
-            if !topic_name.is_empty() {
+            let has_topic_name = !topic_name.is_empty();
+            let has_topic_id = !topic.topic_id.is_nil();
+
+            // Since new_partitions can be quite long we avoid logging it twice to keep the debug logs somewhat readable.
+            if has_topic_name && has_topic_id {
+                tracing::debug!(
+                    "Storing topic_by_name and topic_by_id metadata: topic {} {} -> {:?}",
+                    topic_name.0,
+                    topic.topic_id,
+                    new_partitions
+                );
+            } else if has_topic_name {
+                tracing::debug!(
+                    "Storing topic_by_name metadata: topic {} -> {new_partitions:?}",
+                    topic_name.0
+                );
+            } else if has_topic_id {
+                tracing::debug!(
+                    "Storing topic_by_id metadata: topic {} -> {new_partitions:?}",
+                    topic.topic_id
+                );
+            }
+
+            if has_topic_name {
                 self.topic_by_name.insert(
                     topic_name.clone(),
                     Topic {
                         partitions: new_partitions.clone(),
                     },
                 );
             }
-            if !topic.topic_id.is_nil() {
+            if has_topic_id {
                 self.topic_by_id.insert(
                     topic.topic_id,
                     Topic {
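
Note: all of the new events are emitted at debug level, so they only appear when the kafka sink_cluster module is enabled at debug. Below is a minimal sketch of turning them on via tracing-subscriber's EnvFilter, assuming a standard tracing setup that honors RUST_LOG-style directives; the filter string and the setup code are illustrative, not shotover's confirmed configuration.

use tracing_subscriber::EnvFilter;

fn main() {
    // Requires the `tracing` and `tracing-subscriber` crates, the latter
    // built with its "env-filter" feature.
    tracing_subscriber::fmt()
        .with_env_filter(
            // Fall back to debug-level routing logs when RUST_LOG is unset.
            EnvFilter::try_from_default_env().unwrap_or_else(|_| {
                EnvFilter::new("warn,shotover::transforms::kafka::sink_cluster=debug")
            }),
        )
        .init();

    // Emit an event under the same target to confirm the filter matches;
    // events like the ones added in this commit would now be printed.
    tracing::debug!(
        target: "shotover::transforms::kafka::sink_cluster",
        "Routing fetch request to single node"
    );
}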
