From 21143af66b39a5c4320be3108018fda9744c8abe Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Thu, 14 Nov 2024 15:10:33 +1100 Subject: [PATCH] KafkaSinkCluster routing fixes --- shotover/src/transforms/kafka/sink_cluster/mod.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 3a895e0bb..b027af4be 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -697,6 +697,14 @@ impl KafkaSinkCluster { self.store_topic_names(&mut topic_names, topic.topic.clone()); } } + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::DeleteRecords(body), + .. + })) => { + for topic in &body.topics { + self.store_topic_names(&mut topic_names, topic.name.clone()); + } + } Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::Fetch(fetch), .. @@ -728,6 +736,12 @@ impl KafkaSinkCluster { self.store_group(&mut groups, group_id.clone()); } } + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::OffsetDelete(offset_delete), + .. + })) => { + self.store_group(&mut groups, offset_delete.group_id.clone()); + } Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::InitProducerId(InitProducerIdRequest {