KafkaSinkCluster - routing error handling #1659

Merged (3 commits) on Jun 16, 2024
18 changes: 18 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -241,6 +241,24 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
produce_consume_partitions1(connection_builder, "partitions1").await;
produce_consume_partitions1(connection_builder, "unknown_topic").await;
produce_consume_partitions3(connection_builder).await;

// Only run this test case on the java driver,
// since even without going through shotover the cpp driver fails this test.
#[allow(irrefutable_let_patterns)]
if let KafkaConnectionBuilder::Java(_) = connection_builder {
// delete and recreate topic to force shotover to adjust its existing routing metadata
let admin = connection_builder.connect_admin().await;
admin.delete_topics(&["partitions1"]).await;
admin
.create_topics(&[NewTopic {
name: "partitions1",
num_partitions: 1,
replication_factor: 1,
}])
.await;
produce_consume_partitions1(connection_builder, "partitions1").await;
}

produce_consume_acks0(connection_builder).await;
connection_builder.admin_cleanup().await;
}
149 changes: 148 additions & 1 deletion shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -216,7 +216,11 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
first_contact_node: None,
control_connection: None,
pending_requests: Default::default(),
// TODO: this approach with `find_coordinator_requests` and `routed_to_coordinator_for_group`
// is prone to memory leaks and logic errors.
// We should replace these fields with extra state within `PendingRequestTy::Received/Sent`.
find_coordinator_requests: Default::default(),
routed_to_coordinator_for_group: Default::default(),
temp_responses_buffer: Default::default(),
sasl_mechanism: None,
authorize_scram_over_mtls: self.authorize_scram_over_mtls.as_ref().map(|x| x.build()),
@@ -245,6 +249,10 @@ impl AtomicBrokerId {
.store(value.0.into(), std::sync::atomic::Ordering::Relaxed)
}

fn clear(&self) {
self.0.store(i64::MAX, std::sync::atomic::Ordering::Relaxed)
}

/// Returns `None` when set has never been called.
/// Otherwise returns `Some` containing the latest set value.
fn get(&self) -> Option<BrokerId> {
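
For context on the new clear method: a minimal, self-contained sketch of the sentinel pattern AtomicBrokerId appears to use, assuming it wraps an AtomicI64 and treats i64::MAX as "unset" (consistent with the set, clear, and get bodies in this hunk). BrokerId here is a hypothetical stand-in for the kafka_protocol newtype.

use std::sync::atomic::{AtomicI64, Ordering};

// Hypothetical stand-in for the kafka_protocol BrokerId newtype (an i32 wrapper).
#[derive(Clone, Copy, Debug, PartialEq)]
struct BrokerId(pub i32);

// Sketch: i64::MAX is the sentinel meaning "never set, or cleared".
struct AtomicBrokerId(AtomicI64);

impl AtomicBrokerId {
    fn unset() -> Self {
        AtomicBrokerId(AtomicI64::new(i64::MAX))
    }

    fn set(&self, value: BrokerId) {
        self.0.store(value.0.into(), Ordering::Relaxed)
    }

    /// Resets back to the sentinel, so `get` returns `None` until `set` is called again.
    fn clear(&self) {
        self.0.store(i64::MAX, Ordering::Relaxed)
    }

    /// Returns `None` when `set` has never been called (or the value was cleared),
    /// otherwise the latest set value.
    fn get(&self) -> Option<BrokerId> {
        match self.0.load(Ordering::Relaxed) {
            i64::MAX => None,
            value => Some(BrokerId(value as i32)),
        }
    }
}

This is presumably why the CreateTopics handler further down can simply call self.controller_broker.clear() on NOT_CONTROLLER: once get returns None, the controller has to be re-discovered.
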
@@ -274,6 +282,7 @@ pub struct KafkaSinkCluster {
/// Ordering must be maintained to ensure responses match up with their request.
pending_requests: VecDeque<PendingRequest>,
find_coordinator_requests: MessageIdMap<FindCoordinator>,
routed_to_coordinator_for_group: MessageIdMap<GroupId>,
/// A temporary buffer used when receiving responses, only held onto in order to avoid reallocating.
temp_responses_buffer: Vec<Message>,
sasl_mechanism: Option<String>,
@@ -661,6 +670,7 @@ impl KafkaSinkCluster {
body: RequestBody::DeleteGroups(groups),
..
})) => {
// TODO: we need to split this up into multiple requests so it can be correctly routed to all possible nodes
let group_id = groups.groups_names.first().unwrap().clone();
self.route_to_coordinator(message, group_id);
}
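
The TODO above notes that a DeleteGroups request spanning several groups should be split so each group reaches its own coordinator. Purely as an illustration of that idea (hypothetical type aliases and closure, not shotover's actual API), the bucketing step could look roughly like this:

use std::collections::HashMap;

// Hypothetical stand-ins for the real identifier types.
type GroupId = String;
type BrokerId = i32;

/// Bucket group names by their coordinator so one DeleteGroups request per
/// coordinator broker can be built from each bucket.
fn split_by_coordinator(
    groups: Vec<GroupId>,
    coordinator_for: impl Fn(&GroupId) -> BrokerId,
) -> HashMap<BrokerId, Vec<GroupId>> {
    let mut by_coordinator: HashMap<BrokerId, Vec<GroupId>> = HashMap::new();
    for group in groups {
        by_coordinator
            .entry(coordinator_for(&group))
            .or_default()
            .push(group);
    }
    by_coordinator
}
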
@@ -1186,7 +1196,6 @@ routing message to a random node so that:
}

async fn process_responses(&mut self, responses: &mut [Message]) -> Result<()> {
// TODO: Handle errors like NOT_COORDINATOR by removing element from self.topics and self.coordinator_broker_id
for response in responses.iter_mut() {
let request_id = response.request_id().unwrap();
match response.frame() {
@@ -1234,6 +1243,128 @@
})) => {
self.process_sasl_authenticate(authenticate).await?;
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Produce(produce),
..
})) => {
for (topic_name, response_topic) in &produce.responses {
for response_partition in &response_topic.partition_responses {
if let Some(ResponseError::NotLeaderOrFollower) =
ResponseError::try_from_code(response_partition.error_code)
{
if response_partition.current_leader.leader_id != -1 {
// The broker has informed us who the new leader is, so we can directly update the leader.
if let Some(mut stored_topic) =
self.topic_by_name.get_mut(topic_name)
{
if let Some(stored_partition) = stored_topic
.partitions
.get_mut(response_partition.index as usize)
{
if response_partition.current_leader.leader_epoch
> stored_partition.leader_epoch
{
stored_partition.leader_id =
response_partition.current_leader.leader_id;
stored_partition.leader_epoch =
response_partition.current_leader.leader_epoch;
}
tracing::info!(
"Produce response included error NOT_LEADER_OR_FOLLOWER and so updated leader in topic {:?} partition {}",
topic_name,
response_partition.index
);
}
}
} else {
// The broker doesn't know who the new leader is, so clear the entire topic.
self.topic_by_name.remove(topic_name);
tracing::info!(
"Produce response included error NOT_LEADER_OR_FOLLOWER and so cleared topic {:?}",
topic_name,
);
break;
}
}
}
}
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Fetch(fetch),
..
})) => {
for response in &fetch.responses {
for partition in &response.partitions {
if let Some(ResponseError::NotLeaderOrFollower) =
ResponseError::try_from_code(partition.error_code)
{
// The fetch response includes the leader_id which a client could use to route a fetch request to,
// but we can't use it to fix our list of replicas, so our only option is to clear the whole thing.
self.topic_by_name.remove(&response.topic);
self.topic_by_id.remove(&response.topic_id);
tracing::info!(
"Fetch response included error NOT_LEADER_OR_FOLLOWER and so cleared topic {:?} {:?}",
response.topic,
response.topic_id
);
break;
}
}
}
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Heartbeat(heartbeat),
..
})) => self.handle_coordinator_routing_error(request_id, heartbeat.error_code),
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::SyncGroup(sync_group),
..
})) => self.handle_coordinator_routing_error(request_id, sync_group.error_code),
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::OffsetFetch(offset_fetch),
..
})) => self.handle_coordinator_routing_error(request_id, offset_fetch.error_code),
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::JoinGroup(join_group),
..
})) => self.handle_coordinator_routing_error(request_id, join_group.error_code),
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::LeaveGroup(leave_group),
..
})) => self.handle_coordinator_routing_error(request_id, leave_group.error_code),
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DeleteGroups(delete_groups),
..
})) => {
for (group_id, result) in &delete_groups.results {
if let Some(ResponseError::NotCoordinator) =
ResponseError::try_from_code(result.error_code)
{
// Need to run this to avoid memory leaks, since route_to_coordinator is called for DeleteGroups requests
self.routed_to_coordinator_for_group.remove(&request_id);

// Need to run this to ensure we remove the coordinator mapping for every group in the response
self.group_to_coordinator_broker.remove(group_id);
}
}
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::CreateTopics(create_topics),
..
})) => {
for topic in create_topics.topics.values() {
if let Some(ResponseError::NotController) =
ResponseError::try_from_code(topic.error_code)
{
tracing::info!(
"Response to CreateTopics included error NOT_CONTROLLER and so reset controller broker, previously was {:?}",
self.controller_broker.get()
);
self.controller_broker.clear();
break;
}
}
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Metadata(metadata),
..
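
The nested Produce handling earlier in this hunk reduces to an epoch-guarded leader update: only overwrite the cached leader when the broker reports a strictly newer leader_epoch. A distilled sketch of just that guard, with hypothetical stand-in structs (the real types come from kafka_protocol and shotover's cached topic metadata):

// Hypothetical stand-ins for the cached partition and the leader reported in the response.
struct StoredPartition {
    leader_id: i32,
    leader_epoch: i32,
}

struct ReportedLeader {
    leader_id: i32,
    leader_epoch: i32,
}

/// Accept the reported leader only if its epoch is newer than the cached one,
/// so a stale or reordered response cannot roll the cache back to an old leader.
fn maybe_update_leader(stored: &mut StoredPartition, reported: &ReportedLeader) -> bool {
    if reported.leader_epoch > stored.leader_epoch {
        stored.leader_id = reported.leader_id;
        stored.leader_epoch = reported.leader_epoch;
        true
    } else {
        false
    }
}

When the response carries no usable leader (leader_id == -1), the diff instead drops the whole cached topic so the next request re-fetches metadata.
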
@@ -1259,6 +1390,20 @@
Ok(())
}

/// This method must be called for every response to a request that was routed via `route_to_coordinator`
fn handle_coordinator_routing_error(&mut self, request_id: u128, error_code: i16) {
if let Some(ResponseError::NotCoordinator) = ResponseError::try_from_code(error_code) {
if let Some(group_id) = self.routed_to_coordinator_for_group.remove(&request_id) {
let broker_id = self.group_to_coordinator_broker.remove(&group_id);
tracing::info!(
"Response was error NOT_COORDINATOR and so cleared group id {:?} coordinator mapping to broker {:?}",
group_id,
broker_id,
);
}
}
}

async fn process_sasl_authenticate(
&mut self,
authenticate: &mut SaslAuthenticateResponse,
@@ -1338,6 +1483,8 @@

fn route_to_coordinator(&mut self, request: Message, group_id: GroupId) {
let destination = self.group_to_coordinator_broker.get(&group_id);
self.routed_to_coordinator_for_group
.insert(request.id(), group_id.clone());
let destination = match destination {
Some(destination) => *destination,
None => {
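
Taken together, the routed_to_coordinator_for_group entries written in route_to_coordinator and consumed in handle_coordinator_routing_error form a small bookkeeping pattern: remember which group each coordinator-routed request was for, and on NOT_COORDINATOR drop both that entry and the cached group-to-coordinator mapping so the next request triggers a fresh FindCoordinator. A self-contained sketch of just that pattern, with hypothetical types rather than shotover's actual fields:

use std::collections::HashMap;

// Hypothetical stand-ins for the real identifier types.
type RequestId = u128;
type GroupId = String;
type BrokerId = i32;

#[derive(Default)]
struct CoordinatorBookkeeping {
    /// request id -> group the request was routed for (mirrors routed_to_coordinator_for_group)
    routed_for_group: HashMap<RequestId, GroupId>,
    /// group -> cached coordinator broker (mirrors group_to_coordinator_broker)
    coordinator_for_group: HashMap<GroupId, BrokerId>,
}

impl CoordinatorBookkeeping {
    /// Record the group a request is being routed for, before sending it to the coordinator.
    fn on_request_routed(&mut self, request_id: RequestId, group_id: GroupId) {
        self.routed_for_group.insert(request_id, group_id);
    }

    /// Mirrors handle_coordinator_routing_error: called when a response to a
    /// coordinator-routed request comes back with NOT_COORDINATOR.
    fn on_not_coordinator(&mut self, request_id: RequestId) {
        if let Some(group_id) = self.routed_for_group.remove(&request_id) {
            // Forget the cached coordinator; the next request for this group
            // will have to run FindCoordinator again.
            self.coordinator_for_group.remove(&group_id);
        }
    }
}

Entries for requests whose responses never carry NOT_COORDINATOR are left in the per-request map, which appears to be why the TODO near the top of the diff flags this approach as prone to memory leaks.
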