diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index b6801414b..118e2edfc 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -965,7 +965,30 @@ impl KafkaSinkCluster { body: RequestBody::CreateTopics(_), .. })) => self.route_to_controller(message), - _ => self.route_to_random_broker(message), + + // route to random broker + Some(Frame::Kafka(KafkaFrame::Request { + body: + RequestBody::Metadata(_) + | RequestBody::DescribeConfigs(_) + | RequestBody::AlterConfigs(_) + | RequestBody::CreatePartitions(_) + | RequestBody::DeleteTopics(_) + | RequestBody::CreateAcls(_), + .. + })) => self.route_to_random_broker(message), + + // error handling + Some(Frame::Kafka(KafkaFrame::Request { header, .. })) => { + let request_type = ApiKey::try_from(header.request_api_key).unwrap(); + tracing::warn!("Routing for request of type {request_type:?} has not been implemented yet."); + self.route_to_random_broker(message) + } + Some(_) => unreachable!("Must be a kafka request"), + None => { + tracing::warn!("Unable to parse request, routing to a random node"); + self.route_to_random_broker(message) + } } } Ok(())