Add routing debug logs (#1750)
rukai authored Sep 12, 2024
1 parent e72e945 commit 4cfa26c
Showing 1 changed file with 93 additions and 24 deletions.
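The diff below adds `tracing::debug!` events at each routing and metadata-caching decision point. As a quick orientation, here is a minimal sketch of one way to surface just these new logs, assuming the binary initializes `tracing-subscriber` with an `EnvFilter` (a common setup for `tracing`-based services; shotover's actual logging wiring may differ). The module path in the filter string is taken from the file path in this diff.

```rust
// Minimal sketch, not shotover's actual logging setup.
// Requires the tracing-subscriber crate with its "env-filter" feature.
use tracing_subscriber::EnvFilter;

fn main() {
    tracing_subscriber::fmt()
        // Keep everything at `info`, but enable the routing debug logs
        // added in this commit for the kafka sink_cluster transform.
        .with_env_filter(EnvFilter::new(
            "info,shotover::transforms::kafka::sink_cluster=debug",
        ))
        .init();

    // Events below the configured level for their target are skipped cheaply;
    // their format arguments are never evaluated.
    tracing::debug!(target: "shotover::transforms::kafka::sink_cluster", "example event");
}
```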
117 changes: 93 additions & 24 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -423,10 +423,10 @@ impl KafkaSinkCluster {
// this node also doesnt work, mark as bad and try a new one.
Err(err) => {
if self.nodes.iter().all(|x| !x.is_up()) {
-return Err(err.context("Failed to recreate control connection, no more nodes to retry on. Last node gave error"));
+return Err(err.context("Failed to recreate control connection, no more brokers to retry on. Last broker gave error"));
} else {
tracing::warn!(
"Failed to recreate control connection against a new node {err:?}"
"Failed to recreate control connection against a new broker {err:?}"
);
// try another node
}
@@ -660,6 +660,11 @@ impl KafkaSinkCluster {
for group in groups {
match self.find_coordinator_of_group(group.clone()).await {
Ok(node) => {
+tracing::debug!(
+"Storing group_to_coordinator_broker metadata, group {:?} -> broker {}",
+group.0,
+node.broker_id.0
+);
self.group_to_coordinator_broker
.insert(group, node.broker_id);
self.add_node_if_new(node).await;
@@ -795,6 +800,7 @@ impl KafkaSinkCluster {
// route to random node
_ => {
let destination = random_broker_id(&self.nodes, &mut self.rng);
tracing::debug!("Routing request to random broker {}", destination.0);
self.pending_requests.push_back(PendingRequest {
ty: PendingRequestTy::routed(destination, message),
combine_responses: 1,
@@ -822,6 +828,10 @@ impl KafkaSinkCluster {
ty: PendingRequestTy::routed(destination, message),
combine_responses: 1,
});
+tracing::debug!(
+"Routing produce request to random broker {} due to being empty",
+destination.0
+);
} else if routing.len() == 1 {
// Only 1 destination,
// so we can just reconstruct the original message as is,
@@ -839,6 +849,10 @@
ty: PendingRequestTy::routed(destination, message),
combine_responses: 1,
});
+tracing::debug!(
+"Routing produce request to single broker {:?}",
+destination.0
+);
} else {
// The message has been split so it may be delivered to multiple destinations.
// We must generate a unique message for each destination.
@@ -868,6 +882,7 @@ impl KafkaSinkCluster {
combine_responses,
});
}
tracing::debug!("Routing produce request to multiple brokers");
}
}

@@ -897,9 +912,14 @@ impl KafkaSinkCluster {
partition.leader_id
} else {
let partition_len = topic_meta.partitions.len();
tracing::warn!("no known partition replica for {name:?} at partition index {partition_index} out of {partition_len} partitions, routing request to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
tracing::warn!("no known partition replica for {name:?} at partition index {partition_index} out of {partition_len} partitions, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
BrokerId(-1)
};
+tracing::debug!(
+"Routing produce request portion of partition {partition_index} in topic {} to broker {}",
+name.0,
+destination.0
+);

// Get the topics already routed to this destination
let routed_topics = result.entry(destination).or_default();
@@ -919,7 +939,7 @@
} else {
tracing::debug!(
r#"no known partition leader for {name:?}
-routing request to a random node so that:
+routing request to a random broker so that:
* if auto topic creation is enabled, auto topic creation will occur
* if auto topic creation is disabled a NOT_LEADER_OR_FOLLOWER is returned to the client"#
);
@@ -951,6 +971,7 @@ impl KafkaSinkCluster {
topic_by_name = self.topic_by_name.get(&topic.topic);
topic_meta = topic_by_name.as_deref();
}
+let format_topic_name = FormatTopicName(&topic.topic, &topic.topic_id);
if let Some(topic_meta) = topic_meta {
for partition in std::mem::take(&mut topic.partitions) {
let partition_index = partition.partition as usize;
@@ -963,18 +984,20 @@
// Instead, when its available, we can make use of preferred_read_replica field in the fetch response as an optimization.
// However its always correct to route to the partition.leader_id which is what we do here.
if partition.leader_id == -1 {
-let topic_name = Self::format_topic_name(&topic);
tracing::warn!(
"leader_id is unknown for topic {topic_name} at partition index {partition_index}"
"leader_id is unknown for {format_topic_name} at partition index {partition_index}"
);
}
partition.leader_id
} else {
let partition_len = topic_meta.partitions.len();
-let topic_name = Self::format_topic_name(&topic);
-tracing::warn!("no known partition replica for {topic_name} at partition index {partition_index} out of {partition_len} partitions, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
+tracing::warn!("no known partition replica for {format_topic_name} at partition index {partition_index} out of {partition_len} partitions, routing message to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
BrokerId(-1)
};
+tracing::debug!(
+"Routing fetch request portion of partition {partition_index} in {format_topic_name} to broker {}",
+destination.0
+);
let dest_topics = result.entry(destination).or_default();
if let Some(dest_topic) = dest_topics
.iter_mut()
@@ -988,8 +1011,7 @@
}
}
} else {
-let topic_name = Self::format_topic_name(&topic);
-tracing::warn!("no known partition replica for {topic_name}, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
+tracing::warn!("no known partition replica for {format_topic_name}, routing message to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
let destination = BrokerId(-1);
let dest_topics = result.entry(destination).or_default();
dest_topics.push(topic);
@@ -999,14 +1021,6 @@
result
}

-fn format_topic_name(fetch_topic: &FetchTopic) -> String {
-if fetch_topic.topic.0.is_empty() {
-format!("topic with id {}", fetch_topic.topic_id)
-} else {
-format!("topic with name {:?}", fetch_topic.topic.0.as_str())
-}
-}

fn route_fetch_request(&mut self, mut message: Message) -> Result<()> {
if let Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Fetch(fetch),
@@ -1024,6 +1038,10 @@
ty: PendingRequestTy::routed(destination, message),
combine_responses: 1,
});
+tracing::debug!(
+"Routing fetch request to random broker {} due to being empty",
+destination.0
+);
} else if routing.len() == 1 {
// Only 1 destination,
// so we can just reconstruct the original message as is,
@@ -1041,6 +1059,7 @@
ty: PendingRequestTy::routed(destination, message),
combine_responses: 1,
});
tracing::debug!("Routing fetch request to single broker {}", destination.0);
} else {
// The message has been split so it may be delivered to multiple destinations.
// We must generate a unique message for each destination.
@@ -1070,6 +1089,7 @@ impl KafkaSinkCluster {
combine_responses,
});
}
tracing::debug!("Routing fetch request to multiple brokers");
}
}

@@ -1593,7 +1613,7 @@ impl KafkaSinkCluster {
self.topic_by_name.remove(&fetch_response.topic);
self.topic_by_id.remove(&fetch_response.topic_id);
tracing::info!(
"Fetch response included error NOT_LEADER_OR_FOLLOWER and so cleared topic {:?} {:?}",
"Fetch response included error NOT_LEADER_OR_FOLLOWER and so cleared metadata for topic {:?} {:?}",
fetch_response.topic,
fetch_response.topic_id
);
@@ -1749,6 +1769,7 @@ impl KafkaSinkCluster {
},
combine_responses: 1,
});
tracing::debug!("Routing request to control connection");
}

fn route_to_controller(&mut self, request: Message) {
@@ -1761,14 +1782,18 @@ impl KafkaSinkCluster {
{
node.broker_id
} else {
tracing::warn!("no known broker with id {broker_id:?} that is 'up', routing message to a random node so that a NOT_CONTROLLER or similar error is returned to the client");
tracing::warn!("no known broker with id {broker_id:?} that is 'up', routing message to a random broker so that a NOT_CONTROLLER or similar error is returned to the client");
random_broker_id(&self.nodes, &mut self.rng)
};

self.pending_requests.push_back(PendingRequest {
ty: PendingRequestTy::routed(destination, request),
combine_responses: 1,
});
+tracing::debug!(
+"Routing request relating to controller to broker {}",
+destination.0
+);
}

fn route_to_coordinator(&mut self, request: Message, group_id: GroupId) {
@@ -1778,7 +1803,7 @@ impl KafkaSinkCluster {
let destination = match destination {
Some(destination) => *destination,
None => {
tracing::warn!("no known coordinator for {group_id:?}, routing message to a random node so that a NOT_COORDINATOR or similar error is returned to the client");
tracing::warn!("no known coordinator for {group_id:?}, routing message to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
random_broker_id(&self.nodes, &mut self.rng)
}
};
@@ -1787,6 +1812,11 @@ impl KafkaSinkCluster {
ty: PendingRequestTy::routed(destination, request),
combine_responses: 1,
});
+tracing::debug!(
+"Routing request relating to group id {:?} to broker {}",
+group_id.0,
+destination.0
+);
}

async fn process_metadata_response(&mut self, metadata: &MetadataResponse) {
@@ -1799,6 +1829,10 @@ impl KafkaSinkCluster {
self.add_node_if_new(node).await;
}

+tracing::debug!(
+"Storing controller metadata, controller is now broker {}",
+metadata.controller_id.0
+);
self.controller_broker.set(metadata.controller_id);

for (topic_name, topic) in &metadata.topics {
@@ -1850,15 +1884,38 @@ impl KafkaSinkCluster {
}
};

-if !topic_name.is_empty() {
+let has_topic_name = !topic_name.is_empty();
+let has_topic_id = !topic.topic_id.is_nil();
+
+// Since new_partitions can be quite long we avoid logging it twice to keep the debug logs somewhat readable.
+if has_topic_name && has_topic_id {
+tracing::debug!(
+"Storing topic_by_name and topic_by_id metadata: topic {:?} {} -> {:?}",
+topic_name.0,
+topic.topic_id,
+new_partitions
+);
+} else if has_topic_name {
+tracing::debug!(
+"Storing topic_by_name metadata: topic {:?} -> {new_partitions:?}",
+topic_name.0
+);
+} else if has_topic_id {
+tracing::debug!(
+"Storing topic_by_id metadata: topic {} -> {new_partitions:?}",
+topic.topic_id
+);
+}
+
+if has_topic_name {
self.topic_by_name.insert(
topic_name.clone(),
Topic {
partitions: new_partitions.clone(),
},
);
}
-if !topic.topic_id.is_nil() {
+if has_topic_id {
self.topic_by_id.insert(
topic.topic_id,
Topic {
@@ -2101,7 +2158,7 @@ impl KafkaSinkCluster {
}
} else {
return Err(anyhow!(
"Invalid metadata, controller points at unknown node {:?}",
"Invalid metadata, controller points at unknown broker {:?}",
metadata.controller_id
));
}
@@ -2191,3 +2248,15 @@ fn random_broker_id(nodes: &[KafkaNode], rng: &mut SmallRng) -> BrokerId {
None => nodes.choose(rng).unwrap().broker_id,
}
}

+struct FormatTopicName<'a>(&'a TopicName, &'a Uuid);
+
+impl<'a> std::fmt::Display for FormatTopicName<'a> {
+fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+if self.0.is_empty() {
+write!(f, "topic with id {}", self.1)
+} else {
+write!(f, "topic with name {:?}", self.0.as_str())
+}
+}
+}
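Worth noting: the removed `format_topic_name` helper allocated a `String` on every call, even when the surrounding log line was filtered out, whereas the new `FormatTopicName` wrapper defers all formatting to `Display::fmt`, so nothing is built unless the event is actually emitted. Below is a self-contained sketch of the same pattern; `TopicName` and `Uuid` here are simplified stand-ins, not the kafka-protocol and uuid crate types the real code uses.

```rust
use std::fmt;

// Stand-ins for kafka_protocol's TopicName and uuid's Uuid.
struct TopicName(String);
struct Uuid(u128);

impl fmt::Display for Uuid {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:032x}", self.0)
    }
}

// Borrows its inputs and does no work at construction time; formatting
// happens only if Display::fmt is actually driven (e.g. by a tracing
// macro whose level is enabled).
struct FormatTopicName<'a>(&'a TopicName, &'a Uuid);

impl<'a> fmt::Display for FormatTopicName<'a> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if self.0 .0.is_empty() {
            write!(f, "topic with id {}", self.1)
        } else {
            write!(f, "topic with name {:?}", self.0 .0)
        }
    }
}

fn main() {
    let name = TopicName("transactions".into());
    let id = Uuid(0);
    // No intermediate String is built for the topic description here.
    println!("routing fetch for {}", FormatTopicName(&name, &id));
}
```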
