diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index aab214dfd..fe00dbfb7 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -979,6 +979,22 @@ impl KafkaSinkCluster { })) => { self.route_find_coordinator(request); } + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::ControlledShutdown(_), + .. + })) => { + // This request type specifies a broker that should shutdown. + // Since the broker ID's known to the client represent shotover nodes, + // the only reasonable interpretation of this request would be for the shotover with that broker id to shutdown. + // But its not appropriate for shotover to shutdown: + // * It cant hand off topics to other nodes like kafka does as part of its controlled shutdown process. + // * It doesnt have any ACL's, any user could send this request regardless of authorization. + // + // So we abort the connection and log an error to signal to the client that the request failed. + // It might be better to instead construct a response containing an error code, + // but its not worth the complexity for such a rare request type. + return Err(anyhow!("Client sent ControlledShutdown request. Shotover cannot handle this request as it is not appropriate for shotover to shutdown. The connection to the client has been closed.")); + } // route to controller broker Some(Frame::Kafka(KafkaFrame::Request {