Skip to content

Commit

Permalink
review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Jun 14, 2024
1 parent ad3669e commit 6322f6e
Showing 1 changed file with 25 additions and 1 deletion.
26 changes: 25 additions & 1 deletion shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1269,11 +1269,20 @@ routing message to a random node so that:
stored_partition.leader_epoch =
response_partition.current_leader.leader_epoch;
}
tracing::info!(
"Produce response included error NOT_LEADER_OR_FOLLOWER and so updated leader in topic {:?} partition {}",
topic_name,
response_partition.index
);
}
}
} else {
// The broker doesnt know who the new leader is, clear the entire topic.
self.topic_by_name.remove(topic_name);
tracing::info!(
"Produce response included error NOT_LEADER_OR_FOLLOWER and so cleared topic {:?}",
topic_name,
);
break;
}
}
Expand All @@ -1293,6 +1302,11 @@ routing message to a random node so that:
// but we cant use it to fix our list of replicas, so our only option is to clear the whole thing.
self.topic_by_name.remove(&response.topic);
self.topic_by_id.remove(&response.topic_id);
tracing::info!(
"Fetch response included error NOT_LEADER_OR_FOLLOWER and so cleared topic {:?} {:?}",
response.topic,
response.topic_id
);
break;
}
}
Expand Down Expand Up @@ -1342,7 +1356,12 @@ routing message to a random node so that:
if let Some(ResponseError::NotController) =
ResponseError::try_from_code(topic.error_code)
{
tracing::info!(
"Response to CreateTopics included error NOT_CONTROLLER and so reset controller broker, previously was {:?}",
self.controller_broker.get()
);
self.controller_broker.clear();
break;
}
}
}
Expand Down Expand Up @@ -1375,7 +1394,12 @@ routing message to a random node so that:
fn handle_coordinator_routing_error(&mut self, request_id: u128, error_code: i16) {
if let Some(ResponseError::NotCoordinator) = ResponseError::try_from_code(error_code) {
if let Some(group_id) = self.routed_to_coordinator_for_group.remove(&request_id) {
self.group_to_coordinator_broker.remove(&group_id);
let broker_id = self.group_to_coordinator_broker.remove(&group_id);
tracing::info!(
"Response was error NOT_COORDINATOR and so cleared group id {:?} coordinator mapping to broker {:?}",
group_id,
broker_id,
);
}
}
}
Expand Down

0 comments on commit 6322f6e

Please sign in to comment.