From 1e4b1a7f0f941505ba0f9117e296f840f882290a Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Fri, 18 Oct 2024 11:01:58 +1100 Subject: [PATCH] warn when shotover doesnt know how to route a request --- .../src/transforms/kafka/sink_cluster/mod.rs | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index b6801414b..118e2edfc 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -965,7 +965,30 @@ impl KafkaSinkCluster { body: RequestBody::CreateTopics(_), .. })) => self.route_to_controller(message), - _ => self.route_to_random_broker(message), + + // route to random broker + Some(Frame::Kafka(KafkaFrame::Request { + body: + RequestBody::Metadata(_) + | RequestBody::DescribeConfigs(_) + | RequestBody::AlterConfigs(_) + | RequestBody::CreatePartitions(_) + | RequestBody::DeleteTopics(_) + | RequestBody::CreateAcls(_), + .. + })) => self.route_to_random_broker(message), + + // error handling + Some(Frame::Kafka(KafkaFrame::Request { header, .. })) => { + let request_type = ApiKey::try_from(header.request_api_key).unwrap(); + tracing::warn!("Routing for request of type {request_type:?} has not been implemented yet."); + self.route_to_random_broker(message) + } + Some(_) => unreachable!("Must be a kafka request"), + None => { + tracing::warn!("Unable to parse request, routing to a random node"); + self.route_to_random_broker(message) + } } } Ok(())