From 65b280b8256102916bfccd76bf3350b00fa22009 Mon Sep 17 00:00:00 2001 From: Conor Date: Tue, 28 May 2024 15:33:07 +1000 Subject: [PATCH] KafkaSinkCluster disable session creation (#1636) --- .../src/transforms/kafka/sink_cluster/mod.rs | 123 ++++++++---------- 1 file changed, 57 insertions(+), 66 deletions(-) diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 33f5c83d8..cd64743b2 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -517,6 +517,9 @@ impl KafkaSinkCluster { self.store_topic_names(&mut topic_names, topic.topic.clone()); self.store_topic_ids(&mut topic_ids, topic.topic_id); } + fetch.session_id = 0; + fetch.session_epoch = -1; + request.invalidate_cache(); } Some(Frame::Kafka(KafkaFrame::Request { body: @@ -781,78 +784,73 @@ impl KafkaSinkCluster { .. })) = message.frame() { - if fetch.session_id == 0 { - let routing = self.split_fetch_request_by_destination(fetch); + let routing = self.split_fetch_request_by_destination(fetch); - if routing.is_empty() { - // Fetch contains no topics, so we can just pick a random destination. - // The message is unchanged so we can just send as is. - let destination = self.nodes.choose(&mut self.rng).unwrap().broker_id; + if routing.is_empty() { + // Fetch contains no topics, so we can just pick a random destination. + // The message is unchanged so we can just send as is. + let destination = self.nodes.choose(&mut self.rng).unwrap().broker_id; - self.pending_requests.push_back(PendingRequest { - ty: PendingRequestTy::Routed { - destination, - request: message, - }, - combine_responses: 1, - }); - } else if routing.len() == 1 { - // Only 1 destination, - // so we can just reconstruct the original message as is, - // act like this never happened 😎, - // we dont even need to invalidate the message's cache. - let (destination, topics) = routing.into_iter().next().unwrap(); + self.pending_requests.push_back(PendingRequest { + ty: PendingRequestTy::Routed { + destination, + request: message, + }, + combine_responses: 1, + }); + } else if routing.len() == 1 { + // Only 1 destination, + // so we can just reconstruct the original message as is, + // act like this never happened 😎, + // we dont even need to invalidate the message's cache. + let (destination, topics) = routing.into_iter().next().unwrap(); + let destination = if destination == -1 { + self.nodes.choose(&mut self.rng).unwrap().broker_id + } else { + destination + }; + + fetch.topics = topics; + self.pending_requests.push_back(PendingRequest { + ty: PendingRequestTy::Routed { + destination, + request: message, + }, + combine_responses: 1, + }); + } else { + // The message has been split so it may be delivered to multiple destinations. + // We must generate a unique message for each destination. + let combine_responses = routing.len(); + message.invalidate_cache(); + for (i, (destination, topics)) in routing.into_iter().enumerate() { let destination = if destination == -1 { self.nodes.choose(&mut self.rng).unwrap().broker_id } else { destination }; - - fetch.topics = topics; + let mut request = if i == 0 { + // First message acts as base and retains message id + message.clone() + } else { + message.clone_with_new_id() + }; + if let Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::Fetch(fetch), + .. + })) = request.frame() + { + fetch.topics = topics; + } self.pending_requests.push_back(PendingRequest { ty: PendingRequestTy::Routed { destination, - request: message, + request, }, - combine_responses: 1, + combine_responses, }); - } else { - // The message has been split so it may be delivered to multiple destinations. - // We must generate a unique message for each destination. - let combine_responses = routing.len(); - message.invalidate_cache(); - for (i, (destination, topics)) in routing.into_iter().enumerate() { - let destination = if destination == -1 { - self.nodes.choose(&mut self.rng).unwrap().broker_id - } else { - destination - }; - let mut request = if i == 0 { - // First message acts as base and retains message id - message.clone() - } else { - message.clone_with_new_id() - }; - if let Some(Frame::Kafka(KafkaFrame::Request { - body: RequestBody::Fetch(fetch), - .. - })) = request.frame() - { - fetch.topics = topics; - } - self.pending_requests.push_back(PendingRequest { - ty: PendingRequestTy::Routed { - destination, - request, - }, - combine_responses, - }); - } } - } else { - // route via session id - unreachable!("Currently requests should not have session_id set since we remove it from responses. In the future we do want to handle session_id though.") - }; + } } Ok(()) @@ -1185,13 +1183,6 @@ impl KafkaSinkCluster { self.rewrite_metadata_response(metadata)?; response.invalidate_cache(); } - Some(Frame::Kafka(KafkaFrame::Response { - body: ResponseBody::Fetch(fetch), - .. - })) => { - fetch.session_id = 0; - response.invalidate_cache(); - } Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::DescribeCluster(_), ..