Skip to content

Commit

Permalink
KafkaSinkCluster routing fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Nov 14, 2024
1 parent 0310cbf commit 21143af
Showing 1 changed file with 14 additions and 0 deletions.
14 changes: 14 additions & 0 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,14 @@ impl KafkaSinkCluster {
self.store_topic_names(&mut topic_names, topic.topic.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DeleteRecords(body),
..
})) => {
for topic in &body.topics {
self.store_topic_names(&mut topic_names, topic.name.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Fetch(fetch),
..
Expand Down Expand Up @@ -728,6 +736,12 @@ impl KafkaSinkCluster {
self.store_group(&mut groups, group_id.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetDelete(offset_delete),
..
})) => {
self.store_group(&mut groups, offset_delete.group_id.clone());
}
Some(Frame::Kafka(KafkaFrame::Request {
body:
RequestBody::InitProducerId(InitProducerIdRequest {
Expand Down

0 comments on commit 21143af

Please sign in to comment.