Skip to content

Commit

Permalink
KafkaSinkCluster disable session creation (#1636)
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros authored May 28, 2024
1 parent 6fb74a2 commit 65b280b
Showing 1 changed file with 57 additions and 66 deletions.
123 changes: 57 additions & 66 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,9 @@ impl KafkaSinkCluster {
self.store_topic_names(&mut topic_names, topic.topic.clone());
self.store_topic_ids(&mut topic_ids, topic.topic_id);
}
fetch.session_id = 0;
fetch.session_epoch = -1;
request.invalidate_cache();
}
Some(Frame::Kafka(KafkaFrame::Request {
body:
Expand Down Expand Up @@ -781,78 +784,73 @@ impl KafkaSinkCluster {
..
})) = message.frame()
{
if fetch.session_id == 0 {
let routing = self.split_fetch_request_by_destination(fetch);
let routing = self.split_fetch_request_by_destination(fetch);

if routing.is_empty() {
// Fetch contains no topics, so we can just pick a random destination.
// The message is unchanged so we can just send as is.
let destination = self.nodes.choose(&mut self.rng).unwrap().broker_id;
if routing.is_empty() {
// Fetch contains no topics, so we can just pick a random destination.
// The message is unchanged so we can just send as is.
let destination = self.nodes.choose(&mut self.rng).unwrap().broker_id;

self.pending_requests.push_back(PendingRequest {
ty: PendingRequestTy::Routed {
destination,
request: message,
},
combine_responses: 1,
});
} else if routing.len() == 1 {
// Only 1 destination,
// so we can just reconstruct the original message as is,
// act like this never happened 😎,
// we don't even need to invalidate the message's cache.
let (destination, topics) = routing.into_iter().next().unwrap();
self.pending_requests.push_back(PendingRequest {
ty: PendingRequestTy::Routed {
destination,
request: message,
},
combine_responses: 1,
});
} else if routing.len() == 1 {
// Only 1 destination,
// so we can just reconstruct the original message as is,
// act like this never happened 😎,
// we don't even need to invalidate the message's cache.
let (destination, topics) = routing.into_iter().next().unwrap();
let destination = if destination == -1 {
self.nodes.choose(&mut self.rng).unwrap().broker_id
} else {
destination
};

fetch.topics = topics;
self.pending_requests.push_back(PendingRequest {
ty: PendingRequestTy::Routed {
destination,
request: message,
},
combine_responses: 1,
});
} else {
// The message has been split so it may be delivered to multiple destinations.
// We must generate a unique message for each destination.
let combine_responses = routing.len();
message.invalidate_cache();
for (i, (destination, topics)) in routing.into_iter().enumerate() {
let destination = if destination == -1 {
self.nodes.choose(&mut self.rng).unwrap().broker_id
} else {
destination
};

fetch.topics = topics;
let mut request = if i == 0 {
// First message acts as base and retains message id
message.clone()
} else {
message.clone_with_new_id()
};
if let Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Fetch(fetch),
..
})) = request.frame()
{
fetch.topics = topics;
}
self.pending_requests.push_back(PendingRequest {
ty: PendingRequestTy::Routed {
destination,
request: message,
request,
},
combine_responses: 1,
combine_responses,
});
} else {
// The message has been split so it may be delivered to multiple destinations.
// We must generate a unique message for each destination.
let combine_responses = routing.len();
message.invalidate_cache();
for (i, (destination, topics)) in routing.into_iter().enumerate() {
let destination = if destination == -1 {
self.nodes.choose(&mut self.rng).unwrap().broker_id
} else {
destination
};
let mut request = if i == 0 {
// First message acts as base and retains message id
message.clone()
} else {
message.clone_with_new_id()
};
if let Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Fetch(fetch),
..
})) = request.frame()
{
fetch.topics = topics;
}
self.pending_requests.push_back(PendingRequest {
ty: PendingRequestTy::Routed {
destination,
request,
},
combine_responses,
});
}
}
} else {
// route via session id
unreachable!("Currently requests should not have session_id set since we remove it from responses. In the future we do want to handle session_id though.")
};
}
}

Ok(())
Expand Down Expand Up @@ -1185,13 +1183,6 @@ impl KafkaSinkCluster {
self.rewrite_metadata_response(metadata)?;
response.invalidate_cache();
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Fetch(fetch),
..
})) => {
fetch.session_id = 0;
response.invalidate_cache();
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DescribeCluster(_),
..
Expand Down

0 comments on commit 65b280b

Please sign in to comment.