Skip to content

Commit

Permalink
kafka_int_tests::cluster_1_rack_single_shotover::case_1_cpp fix inter…
Browse files Browse the repository at this point in the history
…mittent failure (#1834)
  • Loading branch information
rukai authored Nov 25, 2024
1 parent 47146b6 commit 27248c2
Showing 1 changed file with 42 additions and 6 deletions.
48 changes: 42 additions & 6 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,8 +888,11 @@ impl KafkaSinkCluster {
if !topic_names.is_empty()
|| !topic_ids.is_empty()
|| self.controller_broker.get().is_none()
|| self.nodes.is_empty()
{
let mut metadata = self.get_metadata_of_topics(topic_names, topic_ids).await?;
let mut metadata = self
.get_metadata_of_topics_with_retry(topic_names, topic_ids)
.await?;
match metadata.frame() {
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Metadata(metadata),
Expand Down Expand Up @@ -1979,10 +1982,43 @@ The connection to the client has been closed."
}
}

async fn get_metadata_of_topics(
/// Retry if we get an empty brokers list
/// We dont actually retry on failure since thats not a known failure mode for this request.
async fn get_metadata_of_topics_with_retry(
&mut self,
topic_names: Vec<TopicName>,
topic_ids: Vec<Uuid>,
) -> Result<Message> {
for _ in 0..3 {
let mut response = self
.get_metadata_of_topics(&topic_names, &topic_ids)
.await?;

match response.frame() {
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Metadata(metadata),
..
})) => {
if metadata.brokers.is_empty() {
tracing::info!("Metadata response from broker contained an empty list of brokers, this likely indicates the cluster is still starting up, will retry metadata request after a delay.");
tokio::time::sleep(Duration::from_millis(200)).await;
continue;
} else {
// cluster is ready, return the response
return Ok(response);
}
}
response => return Err(anyhow!("Expected metadata response but was {response:?}")),
}
}

Err(anyhow!("Broker returned empty list of brokers"))
}

async fn get_metadata_of_topics(
&mut self,
topic_names: &[TopicName],
topic_ids: &[Uuid],
) -> Result<Message> {
let api_version = if topic_ids.is_empty() { 4 } else { 12 };
let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
Expand All @@ -1993,12 +2029,12 @@ The connection to the client has been closed."
body: RequestBody::Metadata(
MetadataRequest::default().with_topics(Some(
topic_names
.into_iter()
.map(|name| MetadataRequestTopic::default().with_name(Some(name)))
.chain(topic_ids.into_iter().map(|id| {
.iter()
.map(|name| MetadataRequestTopic::default().with_name(Some(name.clone())))
.chain(topic_ids.iter().map(|id| {
MetadataRequestTopic::default()
.with_name(None)
.with_topic_id(id)
.with_topic_id(*id)
}))
.collect(),
)),
Expand Down

0 comments on commit 27248c2

Please sign in to comment.