From 1df76ce319d310b8c235637eb14636ad059fed98 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 10 Sep 2024 10:24:21 +1000 Subject: [PATCH 1/2] Rust 1.81 (#1746) --- rust-toolchain.toml | 2 +- .../benches/windsock/cassandra/bench.rs | 6 ++--- .../redis_int_tests/basic_driver_tests.rs | 4 +--- shotover/src/connection.rs | 2 +- shotover/src/frame/value/cassandra.rs | 2 -- shotover/src/lib.rs | 22 ++++++++++++------- shotover/src/runner.rs | 2 +- .../kafka/sink_cluster/connections.rs | 4 ++-- .../src/transforms/kafka/sink_cluster/mod.rs | 2 +- .../kafka/sink_cluster/scram_over_mtls.rs | 1 - shotover/src/transforms/mod.rs | 1 - .../transforms/redis/cluster_ports_rewrite.rs | 2 +- test-helpers/src/connection/java.rs | 2 +- 13 files changed, 26 insertions(+), 26 deletions(-) diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 6eee9cb7e..49d7d2964 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,4 @@ [toolchain] -channel = "1.80" +channel = "1.81" components = [ "rustfmt", "clippy" ] targets = [ "aarch64-unknown-linux-gnu" ] diff --git a/shotover-proxy/benches/windsock/cassandra/bench.rs b/shotover-proxy/benches/windsock/cassandra/bench.rs index af0d7aed5..7782edfe6 100644 --- a/shotover-proxy/benches/windsock/cassandra/bench.rs +++ b/shotover-proxy/benches/windsock/cassandra/bench.rs @@ -74,9 +74,9 @@ pub enum CassandraDb { } enum CassandraDbInstance { - #[allow(dead_code)] // must be held to delay drop + #[expect(dead_code, reason = "must be held to delay drop")] Compose(DockerCompose), - #[allow(dead_code)] + #[expect(dead_code)] Mocked(MockHandle), } @@ -368,7 +368,7 @@ pub struct CassandraBench { } impl CassandraBench { - #[allow(clippy::too_many_arguments)] + #[expect(clippy::too_many_arguments)] pub fn new( db: CassandraDb, topology: CassandraTopology, diff --git a/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs b/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs index 00d90a031..ca2275183 100644 --- a/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs +++ b/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs @@ -1093,9 +1093,7 @@ fn assert_cluster_ports_rewrite_nodes(res: Value, new_port: u16) { for result in reader.records() { let record = result.unwrap(); - let port = record[1] - .split(|c| c == ':' || c == '@') - .collect::>(); + let port = record[1].split([':', '@']).collect::>(); assert_eq!(port[1].parse::().unwrap(), new_port); assertion_run = true; diff --git a/shotover/src/connection.rs b/shotover/src/connection.rs index 4ab2bde46..71d512739 100644 --- a/shotover/src/connection.rs +++ b/shotover/src/connection.rs @@ -238,7 +238,7 @@ impl RequestPending { } } -#[allow(clippy::too_many_arguments)] +#[expect(clippy::too_many_arguments)] fn spawn_read_write_tasks< C: CodecBuilder + 'static, R: AsyncRead + Unpin + Send + 'static, diff --git a/shotover/src/frame/value/cassandra.rs b/shotover/src/frame/value/cassandra.rs index b5e915f26..70f63f285 100644 --- a/shotover/src/frame/value/cassandra.rs +++ b/shotover/src/frame/value/cassandra.rs @@ -241,7 +241,6 @@ fn serialize_list(cursor: &mut Cursor<&mut Vec>, values: &[GenericValue]) { }); } -#[allow(clippy::mutable_key_type)] fn serialize_set(cursor: &mut Cursor<&mut Vec>, values: &BTreeSet) { serialize_with_length_prefix(cursor, |cursor| { serialize_len(cursor, values.len()); @@ -263,7 +262,6 @@ fn serialize_stringmap(cursor: &mut Cursor<&mut Vec>, values: &BTreeMap>, values: &BTreeMap) { serialize_with_length_prefix(cursor, |cursor| { serialize_len(cursor, values.len()); diff --git a/shotover/src/lib.rs b/shotover/src/lib.rs index dcfca851d..69e78f364 100644 --- a/shotover/src/lib.rs +++ b/shotover/src/lib.rs @@ -25,15 +25,21 @@ //! } //! ``` -// If we absolutely need unsafe code, it should be isolated within a separate small crate that exposes a sound safe API. -// "sound" means that it is impossible for any interaction with the public API of the crate to violate an unsafe invariant which causes UB. -#![deny(unsafe_code)] -// Accidentally printing would break json log output -#![deny(clippy::print_stdout)] -#![deny(clippy::print_stderr)] +#![deny( + unsafe_code, + reason = r#" +If we absolutely need unsafe code, it should be isolated within a separate small crate that exposes a sound safe API. +"sound" means that it is impossible for any interaction with the public API of the crate to violate an unsafe invariant which causes UB."# +)] +#![deny( + clippy::print_stdout, + reason = "Accidentally printing would break json log output" +)] +#![deny( + clippy::print_stderr, + reason = "Accidentally printing would break json log output." +)] // allow some clippy lints that we disagree with -#![allow(clippy::needless_doctest_main)] -#![allow(clippy::box_default)] // Allow dead code if any of the protocol features are disabled #![cfg_attr( any( diff --git a/shotover/src/runner.rs b/shotover/src/runner.rs index 144036ef2..a3da440d1 100644 --- a/shotover/src/runner.rs +++ b/shotover/src/runner.rs @@ -72,7 +72,7 @@ pub struct Shotover { } impl Shotover { - #[allow(clippy::new_without_default)] + #[expect(clippy::new_without_default)] pub fn new() -> Self { if std::env::var("RUST_LIB_BACKTRACE").is_err() { std::env::set_var("RUST_LIB_BACKTRACE", "0"); diff --git a/shotover/src/transforms/kafka/sink_cluster/connections.rs b/shotover/src/transforms/kafka/sink_cluster/connections.rs index 252e73570..679519afa 100644 --- a/shotover/src/transforms/kafka/sink_cluster/connections.rs +++ b/shotover/src/transforms/kafka/sink_cluster/connections.rs @@ -48,7 +48,7 @@ impl Connections { /// If a connection already exists for the requested Destination return it. /// Otherwise create a new connection, cache it and return it. - #[allow(clippy::too_many_arguments)] + #[expect(clippy::too_many_arguments)] pub async fn get_or_open_connection( &mut self, rng: &mut SmallRng, @@ -121,7 +121,7 @@ impl Connections { Ok(self.connections.get_mut(&destination).unwrap()) } - #[allow(clippy::too_many_arguments)] + #[expect(clippy::too_many_arguments)] async fn create_and_insert_connection( &mut self, rng: &mut SmallRng, diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 95e829215..a1a307541 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -167,7 +167,7 @@ struct KafkaSinkClusterBuilder { } impl KafkaSinkClusterBuilder { - #[allow(clippy::too_many_arguments)] + #[expect(clippy::too_many_arguments)] pub fn new( chain_name: String, first_contact_points: Vec, diff --git a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs index 4c10d0642..745ea523a 100644 --- a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs +++ b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs @@ -35,7 +35,6 @@ pub struct TokenTask { } impl TokenTask { - #[allow(clippy::new_without_default)] pub fn new( mtls_connection_factory: ConnectionFactory, mtls_port_contact_points: Vec, diff --git a/shotover/src/transforms/mod.rs b/shotover/src/transforms/mod.rs index c9f9ed7cb..60ee70835 100644 --- a/shotover/src/transforms/mod.rs +++ b/shotover/src/transforms/mod.rs @@ -54,7 +54,6 @@ pub struct TransformContextBuilder { pub client_details: String, } -#[allow(clippy::new_without_default)] impl TransformContextBuilder { pub fn new_test() -> Self { TransformContextBuilder { diff --git a/shotover/src/transforms/redis/cluster_ports_rewrite.rs b/shotover/src/transforms/redis/cluster_ports_rewrite.rs index 8ae022e3b..8cf61ce31 100644 --- a/shotover/src/transforms/redis/cluster_ports_rewrite.rs +++ b/shotover/src/transforms/redis/cluster_ports_rewrite.rs @@ -201,7 +201,7 @@ fn rewrite_port_node(frame: &mut Frame, new_port: u16) -> Result<()> { .next() .ok_or_else(|| anyhow!("CLUSTER NODES response missing address field"))?; - let split = ip.split(|c| c == ':' || c == '@').collect::>(); + let split = ip.split([':', '@']).collect::>(); if split.len() < 3 { bail!("IP address not in valid format: {ip}"); diff --git a/test-helpers/src/connection/java.rs b/test-helpers/src/connection/java.rs index 277ca356e..1c8756236 100644 --- a/test-helpers/src/connection/java.rs +++ b/test-helpers/src/connection/java.rs @@ -239,7 +239,7 @@ impl Jvm { Value { // TODO: Discuss with upstream why this was deprecated, // `Jvm::java_list` is very difficult to use due to Result in input. - #[allow(deprecated)] + #[expect(deprecated)] instance: self.0.create_java_list(element_type, &args).unwrap(), jvm: self.0.clone(), } From 7ce587b504a9f4728f345e0e0c5be44d5a033bf7 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 10 Sep 2024 11:28:03 +1000 Subject: [PATCH 2/2] Update kafka-protocol crate (#1736) --- Cargo.lock | 4 +- shotover/Cargo.toml | 2 +- shotover/src/frame/kafka.rs | 820 +----------------------------------- 3 files changed, 15 insertions(+), 811 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1f1c99bae..5a85d5441 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2657,9 +2657,9 @@ dependencies = [ [[package]] name = "kafka-protocol" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3201cc84e70fc4e24b0ebf4dac3eb2a8d3460ec3f2b29fd197fa8051fc0db41c" +checksum = "1e0e2b28d84cbe49291d50c3db2aa15fa5a9450c2d4915bb149eb2dd9ac40c17" dependencies = [ "anyhow", "bytes", diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index 4fe799827..ef278d834 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -118,7 +118,7 @@ aws-config = { version = "1.0.0", optional = true } aws-sdk-kms = { version = "1.1.0", optional = true } chacha20poly1305 = { version = "0.10.0", features = ["std"], optional = true } generic-array = { version = "0.14", features = ["serde"], optional = true } -kafka-protocol = { version = "0.11.0", optional = true } +kafka-protocol = { version = "0.12.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"] } rustls = { version = "0.23.0", default-features = false, features = ["tls12"] } tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } rustls-pemfile = "2.0.0" diff --git a/shotover/src/frame/kafka.rs b/shotover/src/frame/kafka.rs index 1890288b9..68d373648 100644 --- a/shotover/src/frame/kafka.rs +++ b/shotover/src/frame/kafka.rs @@ -1,55 +1,12 @@ use crate::codec::kafka::RequestHeader as CodecRequestHeader; use anyhow::{anyhow, Context, Result}; use bytes::{BufMut, Bytes, BytesMut}; -use kafka_protocol::messages::{ - AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, AddPartitionsToTxnRequest, - AddPartitionsToTxnResponse, AllocateProducerIdsRequest, AllocateProducerIdsResponse, - AlterClientQuotasRequest, AlterClientQuotasResponse, AlterConfigsRequest, AlterConfigsResponse, - AlterPartitionReassignmentsRequest, AlterPartitionReassignmentsResponse, AlterPartitionRequest, - AlterPartitionResponse, AlterReplicaLogDirsRequest, AlterReplicaLogDirsResponse, - AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse, ApiKey, - ApiVersionsRequest, ApiVersionsResponse, AssignReplicasToDirsRequest, - AssignReplicasToDirsResponse, BeginQuorumEpochRequest, BeginQuorumEpochResponse, - BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, - BrokerRegistrationResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, - ControlledShutdownRequest, ControlledShutdownResponse, ControllerRegistrationRequest, - ControllerRegistrationResponse, CreateAclsRequest, CreateAclsResponse, - CreateDelegationTokenRequest, CreateDelegationTokenResponse, CreatePartitionsRequest, - CreatePartitionsResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteAclsRequest, - DeleteAclsResponse, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest, - DeleteRecordsResponse, DeleteTopicsRequest, DeleteTopicsResponse, DescribeAclsRequest, - DescribeAclsResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse, - DescribeClusterRequest, DescribeClusterResponse, DescribeConfigsRequest, - DescribeConfigsResponse, DescribeDelegationTokenRequest, DescribeDelegationTokenResponse, - DescribeGroupsRequest, DescribeGroupsResponse, DescribeLogDirsRequest, DescribeLogDirsResponse, - DescribeProducersRequest, DescribeProducersResponse, DescribeQuorumRequest, - DescribeQuorumResponse, DescribeTransactionsRequest, DescribeTransactionsResponse, - DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse, ElectLeadersRequest, - ElectLeadersResponse, EndQuorumEpochRequest, EndQuorumEpochResponse, EndTxnRequest, - EndTxnResponse, EnvelopeRequest, EnvelopeResponse, ExpireDelegationTokenRequest, - ExpireDelegationTokenResponse, FetchRequest, FetchResponse, FetchSnapshotRequest, - FetchSnapshotResponse, FindCoordinatorRequest, FindCoordinatorResponse, - GetTelemetrySubscriptionsRequest, GetTelemetrySubscriptionsResponse, HeartbeatRequest, - HeartbeatResponse, IncrementalAlterConfigsRequest, IncrementalAlterConfigsResponse, - InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, - LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, - ListClientMetricsResourcesRequest, ListClientMetricsResourcesResponse, ListGroupsRequest, - ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, ListPartitionReassignmentsRequest, - ListPartitionReassignmentsResponse, ListTransactionsRequest, ListTransactionsResponse, - MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, - OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse, - OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, - PushTelemetryRequest, PushTelemetryResponse, RenewDelegationTokenRequest, - RenewDelegationTokenResponse, RequestHeader, ResponseHeader, SaslAuthenticateRequest, - SaslAuthenticateResponse, SaslHandshakeRequest, SaslHandshakeResponse, StopReplicaRequest, - StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest, - TxnOffsetCommitResponse, UnregisterBrokerRequest, UnregisterBrokerResponse, - UpdateFeaturesRequest, UpdateFeaturesResponse, UpdateMetadataRequest, UpdateMetadataResponse, - VoteRequest, VoteResponse, WriteTxnMarkersRequest, WriteTxnMarkersResponse, -}; -use kafka_protocol::protocol::{Decodable, Encodable, HeaderVersion}; +use kafka_protocol::messages::{ApiKey, RequestHeader, ResponseHeader}; +use kafka_protocol::protocol::{Decodable, Encodable}; use std::fmt::{Display, Formatter, Result as FmtResult}; +pub use kafka_protocol::messages::RequestKind as RequestBody; +pub use kafka_protocol::messages::ResponseKind as ResponseBody; pub use kafka_protocol::protocol::StrBytes; #[derive(Debug, PartialEq, Clone)] @@ -110,292 +67,6 @@ impl Display for KafkaFrame { } } -#[derive(Debug, PartialEq, Clone)] -pub enum RequestBody { - Produce(ProduceRequest), - Fetch(FetchRequest), - OffsetFetch(OffsetFetchRequest), - ListOffsets(ListOffsetsRequest), - JoinGroup(JoinGroupRequest), - SyncGroup(SyncGroupRequest), - Metadata(MetadataRequest), - FindCoordinator(FindCoordinatorRequest), - LeaderAndIsr(LeaderAndIsrRequest), - Heartbeat(HeartbeatRequest), - CreateTopics(CreateTopicsRequest), - DeleteTopics(DeleteTopicsRequest), - DeleteGroups(DeleteGroupsRequest), - DescribeConfigs(DescribeConfigsRequest), - SaslAuthenticate(SaslAuthenticateRequest), - SaslHandshake(SaslHandshakeRequest), - StopReplica(StopReplicaRequest), - UpdateMetadata(UpdateMetadataRequest), - ControlledShutdown(ControlledShutdownRequest), - OffsetCommit(OffsetCommitRequest), - LeaveGroup(LeaveGroupRequest), - DescribeGroups(DescribeGroupsRequest), - ListGroups(ListGroupsRequest), - ApiVersions(ApiVersionsRequest), - DeleteRecords(DeleteRecordsRequest), - InitProducerId(InitProducerIdRequest), - OffsetForLeaderEpoch(OffsetForLeaderEpochRequest), - AddPartitionsToTxn(AddPartitionsToTxnRequest), - AddOffsetsToTxn(AddOffsetsToTxnRequest), - EndTxn(EndTxnRequest), - WriteTxnMarkers(WriteTxnMarkersRequest), - TxnOffsetCommit(TxnOffsetCommitRequest), - DescribeAcls(DescribeAclsRequest), - CreateAcls(CreateAclsRequest), - DeleteAcls(DeleteAclsRequest), - AlterConfigs(AlterConfigsRequest), - AlterReplicaLogDirs(AlterReplicaLogDirsRequest), - DescribeLogDirs(DescribeLogDirsRequest), - CreatePartitions(CreatePartitionsRequest), - CreateDelegationToken(CreateDelegationTokenRequest), - RenewDelegationToken(RenewDelegationTokenRequest), - ExpireDelegationToken(ExpireDelegationTokenRequest), - DescribeDelegationToken(DescribeDelegationTokenRequest), - ElectLeaders(ElectLeadersRequest), - IncrementalAlterConfigs(IncrementalAlterConfigsRequest), - AlterPartitionReassignments(AlterPartitionReassignmentsRequest), - ListPartitionReassignments(ListPartitionReassignmentsRequest), - DescribeCluster(DescribeClusterRequest), - Envelope(EnvelopeRequest), - FetchSnapshot(FetchSnapshotRequest), - ListTransactions(ListTransactionsRequest), - DescribeTransactions(DescribeTransactionsRequest), - AllocateProducerIds(AllocateProducerIdsRequest), - UnregisterBroker(UnregisterBrokerRequest), - BrokerHeartbeat(BrokerHeartbeatRequest), - BrokerRegistration(BrokerRegistrationRequest), - DescribeProducers(DescribeProducersRequest), - UpdateFeatures(UpdateFeaturesRequest), - AlterPartition(AlterPartitionRequest), - EndQuorumEpoch(EndQuorumEpochRequest), - BeginQuorumEpoch(BeginQuorumEpochRequest), - Vote(VoteRequest), - DescribeUserScramCredentials(DescribeUserScramCredentialsRequest), - AlterScramCredentials(AlterUserScramCredentialsRequest), - DescribeClientQuotas(DescribeClientQuotasRequest), - OffsetDelete(OffsetDeleteRequest), - AlterClientQuotas(AlterClientQuotasRequest), - DescribeQuorum(DescribeQuorumRequest), - AlterUserScramCredentials(AlterUserScramCredentialsRequest), - ConsumerGroupHeartbeat(ConsumerGroupHeartbeatRequest), - ControllerRegistration(ControllerRegistrationRequest), - GetTelemetrySubscriptions(GetTelemetrySubscriptionsRequest), - PushTelemetry(PushTelemetryRequest), - AssignReplicasToDirs(AssignReplicasToDirsRequest), - ListClientMetricsResources(ListClientMetricsResourcesRequest), -} - -#[derive(Debug, PartialEq, Clone)] -pub enum ResponseBody { - Produce(ProduceResponse), - FindCoordinator(FindCoordinatorResponse), - Fetch(FetchResponse), - OffsetFetch(OffsetFetchResponse), - ListOffsets(ListOffsetsResponse), - JoinGroup(JoinGroupResponse), - SyncGroup(SyncGroupResponse), - Metadata(MetadataResponse), - DescribeCluster(DescribeClusterResponse), - Heartbeat(HeartbeatResponse), - SaslAuthenticate(SaslAuthenticateResponse), - SaslHandshake(SaslHandshakeResponse), - StopReplica(StopReplicaResponse), - UpdateMetadata(UpdateMetadataResponse), - ControlledShutdown(ControlledShutdownResponse), - LeaderAndIsr(LeaderAndIsrResponse), - OffsetCommit(OffsetCommitResponse), - LeaveGroup(LeaveGroupResponse), - DescribeGroups(DescribeGroupsResponse), - ListGroups(ListGroupsResponse), - ApiVersions(ApiVersionsResponse), - CreateTopics(CreateTopicsResponse), - DeleteTopics(DeleteTopicsResponse), - DeleteRecords(DeleteRecordsResponse), - InitProducerId(InitProducerIdResponse), - OffsetForLeaderEpoch(OffsetForLeaderEpochResponse), - AddPartitionsToTxn(AddPartitionsToTxnResponse), - AddOffsetsToTxn(AddOffsetsToTxnResponse), - EndTxn(EndTxnResponse), - WriteTxnMarkers(WriteTxnMarkersResponse), - TxnOffsetCommit(TxnOffsetCommitResponse), - DescribeAcls(DescribeAclsResponse), - CreateAcls(CreateAclsResponse), - DeleteAcls(DeleteAclsResponse), - AlterConfigs(AlterConfigsResponse), - AlterReplicaLogDirs(AlterReplicaLogDirsResponse), - DescribeLogDirs(DescribeLogDirsResponse), - CreatePartitions(CreatePartitionsResponse), - CreateDelegationToken(CreateDelegationTokenResponse), - RenewDelegationToken(RenewDelegationTokenResponse), - ExpireDelegationToken(ExpireDelegationTokenResponse), - DescribeDelegationToken(DescribeDelegationTokenResponse), - ElectLeaders(ElectLeadersResponse), - IncrementalAlterConfigs(IncrementalAlterConfigsResponse), - AlterPartitionReassignments(AlterPartitionReassignmentsResponse), - ListPartitionReassignments(ListPartitionReassignmentsResponse), - BrokerRegistration(BrokerRegistrationResponse), - AllocateProducerIds(AllocateProducerIdsResponse), - ListTransactions(ListTransactionsResponse), - DescribeTransactions(DescribeTransactionsResponse), - UnregisterBroker(UnregisterBrokerResponse), - BrokerHeartbeat(BrokerHeartbeatResponse), - DescribeProducers(DescribeProducersResponse), - FetchSnapshot(FetchSnapshotResponse), - UpdateFeatures(UpdateFeaturesResponse), - AlterPartition(AlterPartitionResponse), - EndQuorumEpoch(EndQuorumEpochResponse), - Envelope(EnvelopeResponse), - Vote(VoteResponse), - DescribeUserScramCredentials(DescribeUserScramCredentialsResponse), - DescribeQuorum(DescribeQuorumResponse), - BeginQuorumEpoch(BeginQuorumEpochResponse), - AlterUserScramCredentials(AlterUserScramCredentialsResponse), - DescribeClientQuotas(DescribeClientQuotasResponse), - AlterClientQuotas(AlterClientQuotasResponse), - DescribeConfigs(DescribeConfigsResponse), - OffsetDelete(OffsetDeleteResponse), - DeleteGroups(DeleteGroupsResponse), - ConsumerGroupHeartbeat(ConsumerGroupHeartbeatResponse), - ControllerRegistration(ControllerRegistrationResponse), - GetTelemetrySubscriptions(GetTelemetrySubscriptionsResponse), - PushTelemetry(PushTelemetryResponse), - AssignReplicasToDirs(AssignReplicasToDirsResponse), - ListClientMetricsResources(ListClientMetricsResourcesResponse), -} - -impl ResponseBody { - fn header_version(&self, version: i16) -> i16 { - match self { - ResponseBody::Produce(_) => ProduceResponse::header_version(version), - ResponseBody::FindCoordinator(_) => FindCoordinatorResponse::header_version(version), - ResponseBody::Fetch(_) => FetchResponse::header_version(version), - ResponseBody::OffsetFetch(_) => OffsetFetchResponse::header_version(version), - ResponseBody::ListOffsets(_) => ListOffsetsResponse::header_version(version), - ResponseBody::JoinGroup(_) => JoinGroupResponse::header_version(version), - ResponseBody::SyncGroup(_) => SyncGroupResponse::header_version(version), - ResponseBody::Metadata(_) => MetadataResponse::header_version(version), - ResponseBody::DescribeCluster(_) => DescribeClusterResponse::header_version(version), - ResponseBody::Heartbeat(_) => HeartbeatResponse::header_version(version), - ResponseBody::SaslAuthenticate(_) => SaslAuthenticateResponse::header_version(version), - ResponseBody::SaslHandshake(_) => SaslHandshakeResponse::header_version(version), - ResponseBody::StopReplica(_) => StopReplicaResponse::header_version(version), - ResponseBody::UpdateMetadata(_) => UpdateMetadataResponse::header_version(version), - ResponseBody::ControlledShutdown(_) => { - ControlledShutdownResponse::header_version(version) - } - ResponseBody::LeaderAndIsr(_) => LeaderAndIsrResponse::header_version(version), - ResponseBody::OffsetCommit(_) => OffsetCommitResponse::header_version(version), - ResponseBody::LeaveGroup(_) => LeaveGroupResponse::header_version(version), - ResponseBody::DescribeGroups(_) => DescribeGroupsResponse::header_version(version), - ResponseBody::ListGroups(_) => ListGroupsResponse::header_version(version), - ResponseBody::ApiVersions(_) => ApiVersionsResponse::header_version(version), - ResponseBody::CreateTopics(_) => CreateTopicsResponse::header_version(version), - ResponseBody::DeleteTopics(_) => DeleteTopicsResponse::header_version(version), - ResponseBody::DeleteRecords(_) => DeleteRecordsResponse::header_version(version), - ResponseBody::InitProducerId(_) => InitProducerIdResponse::header_version(version), - ResponseBody::OffsetForLeaderEpoch(_) => { - OffsetForLeaderEpochResponse::header_version(version) - } - ResponseBody::AddPartitionsToTxn(_) => { - AddPartitionsToTxnResponse::header_version(version) - } - ResponseBody::AddOffsetsToTxn(_) => AddOffsetsToTxnResponse::header_version(version), - ResponseBody::EndTxn(_) => EndTxnResponse::header_version(version), - ResponseBody::WriteTxnMarkers(_) => WriteTxnMarkersResponse::header_version(version), - ResponseBody::DescribeAcls(_) => DescribeAclsResponse::header_version(version), - ResponseBody::CreateAcls(_) => CreateAclsResponse::header_version(version), - ResponseBody::DeleteAcls(_) => DeleteAclsResponse::header_version(version), - ResponseBody::TxnOffsetCommit(_) => TxnOffsetCommitResponse::header_version(version), - ResponseBody::AlterConfigs(_) => AlterConfigsResponse::header_version(version), - ResponseBody::AlterReplicaLogDirs(_) => { - AlterReplicaLogDirsResponse::header_version(version) - } - ResponseBody::DescribeLogDirs(_) => DescribeLogDirsResponse::header_version(version), - ResponseBody::CreatePartitions(_) => CreatePartitionsResponse::header_version(version), - ResponseBody::CreateDelegationToken(_) => { - CreateDelegationTokenResponse::header_version(version) - } - ResponseBody::RenewDelegationToken(_) => { - RenewDelegationTokenResponse::header_version(version) - } - ResponseBody::ExpireDelegationToken(_) => { - ExpireDelegationTokenResponse::header_version(version) - } - ResponseBody::DescribeDelegationToken(_) => { - DescribeDelegationTokenResponse::header_version(version) - } - ResponseBody::ElectLeaders(_) => ElectLeadersResponse::header_version(version), - ResponseBody::IncrementalAlterConfigs(_) => { - IncrementalAlterConfigsResponse::header_version(version) - } - ResponseBody::AlterPartitionReassignments(_) => { - AlterPartitionReassignmentsResponse::header_version(version) - } - ResponseBody::ListPartitionReassignments(_) => { - ListPartitionReassignmentsResponse::header_version(version) - } - ResponseBody::BrokerRegistration(_) => { - BrokerRegistrationResponse::header_version(version) - } - ResponseBody::AllocateProducerIds(_) => { - AllocateProducerIdsResponse::header_version(version) - } - ResponseBody::ListTransactions(_) => ListTransactionsResponse::header_version(version), - ResponseBody::DescribeTransactions(_) => { - DescribeTransactionsResponse::header_version(version) - } - ResponseBody::UnregisterBroker(_) => UnregisterBrokerResponse::header_version(version), - ResponseBody::BrokerHeartbeat(_) => BrokerHeartbeatResponse::header_version(version), - ResponseBody::DescribeProducers(_) => { - DescribeProducersResponse::header_version(version) - } - ResponseBody::FetchSnapshot(_) => FetchSnapshotResponse::header_version(version), - ResponseBody::UpdateFeatures(_) => UpdateFeaturesResponse::header_version(version), - ResponseBody::AlterPartition(_) => AlterPartitionResponse::header_version(version), - ResponseBody::EndQuorumEpoch(_) => EndQuorumEpochResponse::header_version(version), - ResponseBody::Envelope(_) => EnvelopeResponse::header_version(version), - ResponseBody::Vote(_) => VoteResponse::header_version(version), - ResponseBody::DescribeUserScramCredentials(_) => { - DescribeUserScramCredentialsResponse::header_version(version) - } - ResponseBody::DescribeQuorum(_) => DescribeQuorumResponse::header_version(version), - ResponseBody::BeginQuorumEpoch(_) => BeginQuorumEpochResponse::header_version(version), - ResponseBody::AlterUserScramCredentials(_) => { - AlterUserScramCredentialsResponse::header_version(version) - } - ResponseBody::DescribeClientQuotas(_) => { - DescribeClientQuotasResponse::header_version(version) - } - ResponseBody::AlterClientQuotas(_) => { - AlterClientQuotasResponse::header_version(version) - } - ResponseBody::DescribeConfigs(_) => DescribeConfigsResponse::header_version(version), - ResponseBody::OffsetDelete(_) => OffsetDeleteResponse::header_version(version), - ResponseBody::DeleteGroups(_) => DeleteGroupsResponse::header_version(version), - ResponseBody::ConsumerGroupHeartbeat(_) => { - ConsumerGroupHeartbeatResponse::header_version(version) - } - ResponseBody::ControllerRegistration(_) => { - ControllerRegistrationResponse::header_version(version) - } - ResponseBody::GetTelemetrySubscriptions(_) => { - GetTelemetrySubscriptionsResponse::header_version(version) - } - ResponseBody::PushTelemetry(_) => PushTelemetryResponse::header_version(version), - ResponseBody::AssignReplicasToDirs(_) => { - AssignReplicasToDirsResponse::header_version(version) - } - ResponseBody::ListClientMetricsResources(_) => { - ListClientMetricsResourcesResponse::header_version(version) - } - } - } -} - impl KafkaFrame { pub fn from_bytes( mut bytes: Bytes, @@ -413,6 +84,10 @@ impl KafkaFrame { fn parse_request(mut bytes: Bytes) -> Result { let api_key = i16::from_be_bytes(bytes[0..2].try_into().unwrap()); let api_version = i16::from_be_bytes(bytes[2..4].try_into().unwrap()); + + // We cannot parse unknown API keys into a RequestBody::Unknown(_) style variant since its impossible to + // parse the request header without calling request_header_version. + // Just an unfortunate limitation of the protocol. let api_key = ApiKey::try_from(api_key).map_err(|_| anyhow!("unknown api key {api_key}"))?; @@ -421,157 +96,7 @@ impl KafkaFrame { .context("Failed to decode request header")?; let version = header.request_api_version; - let body = match api_key { - ApiKey::ProduceKey => RequestBody::Produce(decode(&mut bytes, version)?), - ApiKey::FetchKey => RequestBody::Fetch(decode(&mut bytes, version)?), - ApiKey::OffsetFetchKey => RequestBody::OffsetFetch(decode(&mut bytes, version)?), - ApiKey::ListOffsetsKey => RequestBody::ListOffsets(decode(&mut bytes, version)?), - ApiKey::JoinGroupKey => RequestBody::JoinGroup(decode(&mut bytes, version)?), - ApiKey::SyncGroupKey => RequestBody::SyncGroup(decode(&mut bytes, version)?), - ApiKey::MetadataKey => RequestBody::Metadata(decode(&mut bytes, version)?), - ApiKey::FindCoordinatorKey => { - RequestBody::FindCoordinator(decode(&mut bytes, version)?) - } - ApiKey::LeaderAndIsrKey => RequestBody::LeaderAndIsr(decode(&mut bytes, version)?), - ApiKey::HeartbeatKey => RequestBody::Heartbeat(decode(&mut bytes, version)?), - ApiKey::CreateTopicsKey => RequestBody::CreateTopics(decode(&mut bytes, version)?), - ApiKey::DeleteTopicsKey => RequestBody::DeleteTopics(decode(&mut bytes, version)?), - ApiKey::DeleteGroupsKey => RequestBody::DeleteGroups(decode(&mut bytes, version)?), - ApiKey::DescribeConfigsKey => { - RequestBody::DescribeConfigs(decode(&mut bytes, version)?) - } - ApiKey::SaslAuthenticateKey => { - RequestBody::SaslAuthenticate(decode(&mut bytes, version)?) - } - ApiKey::SaslHandshakeKey => RequestBody::SaslHandshake(decode(&mut bytes, version)?), - ApiKey::StopReplicaKey => RequestBody::StopReplica(decode(&mut bytes, version)?), - ApiKey::UpdateMetadataKey => RequestBody::UpdateMetadata(decode(&mut bytes, version)?), - ApiKey::ControlledShutdownKey => { - RequestBody::ControlledShutdown(decode(&mut bytes, version)?) - } - ApiKey::OffsetCommitKey => RequestBody::OffsetCommit(decode(&mut bytes, version)?), - ApiKey::LeaveGroupKey => RequestBody::LeaveGroup(decode(&mut bytes, version)?), - ApiKey::DescribeGroupsKey => RequestBody::DescribeGroups(decode(&mut bytes, version)?), - ApiKey::ListGroupsKey => RequestBody::ListGroups(decode(&mut bytes, version)?), - ApiKey::ApiVersionsKey => RequestBody::ApiVersions(decode(&mut bytes, version)?), - ApiKey::DeleteRecordsKey => RequestBody::DeleteRecords(decode(&mut bytes, version)?), - ApiKey::InitProducerIdKey => RequestBody::InitProducerId(decode(&mut bytes, version)?), - ApiKey::OffsetForLeaderEpochKey => { - RequestBody::OffsetForLeaderEpoch(decode(&mut bytes, version)?) - } - ApiKey::AddPartitionsToTxnKey => { - RequestBody::AddPartitionsToTxn(decode(&mut bytes, version)?) - } - ApiKey::AddOffsetsToTxnKey => { - RequestBody::AddOffsetsToTxn(decode(&mut bytes, version)?) - } - ApiKey::EndTxnKey => RequestBody::EndTxn(decode(&mut bytes, version)?), - ApiKey::WriteTxnMarkersKey => { - RequestBody::WriteTxnMarkers(decode(&mut bytes, version)?) - } - - ApiKey::TxnOffsetCommitKey => { - RequestBody::TxnOffsetCommit(decode(&mut bytes, version)?) - } - ApiKey::DescribeAclsKey => RequestBody::DescribeAcls(decode(&mut bytes, version)?), - ApiKey::CreateAclsKey => RequestBody::CreateAcls(decode(&mut bytes, version)?), - ApiKey::DeleteAclsKey => RequestBody::DeleteAcls(decode(&mut bytes, version)?), - ApiKey::AlterConfigsKey => RequestBody::AlterConfigs(decode(&mut bytes, version)?), - ApiKey::AlterReplicaLogDirsKey => { - RequestBody::AlterReplicaLogDirs(decode(&mut bytes, version)?) - } - ApiKey::DescribeLogDirsKey => { - RequestBody::DescribeLogDirs(decode(&mut bytes, version)?) - } - ApiKey::CreatePartitionsKey => { - RequestBody::CreatePartitions(decode(&mut bytes, version)?) - } - ApiKey::CreateDelegationTokenKey => { - RequestBody::CreateDelegationToken(decode(&mut bytes, version)?) - } - ApiKey::RenewDelegationTokenKey => { - RequestBody::RenewDelegationToken(decode(&mut bytes, version)?) - } - ApiKey::ExpireDelegationTokenKey => { - RequestBody::ExpireDelegationToken(decode(&mut bytes, version)?) - } - ApiKey::DescribeDelegationTokenKey => { - RequestBody::DescribeDelegationToken(decode(&mut bytes, version)?) - } - ApiKey::ElectLeadersKey => RequestBody::ElectLeaders(decode(&mut bytes, version)?), - ApiKey::IncrementalAlterConfigsKey => { - RequestBody::IncrementalAlterConfigs(decode(&mut bytes, version)?) - } - ApiKey::AlterPartitionReassignmentsKey => { - RequestBody::AlterPartitionReassignments(decode(&mut bytes, version)?) - } - ApiKey::ListPartitionReassignmentsKey => { - RequestBody::ListPartitionReassignments(decode(&mut bytes, version)?) - } - ApiKey::OffsetDeleteKey => RequestBody::OffsetDelete(decode(&mut bytes, version)?), - ApiKey::DescribeClientQuotasKey => { - RequestBody::DescribeClientQuotas(decode(&mut bytes, version)?) - } - ApiKey::AlterClientQuotasKey => { - RequestBody::AlterClientQuotas(decode(&mut bytes, version)?) - } - ApiKey::DescribeUserScramCredentialsKey => { - RequestBody::DescribeUserScramCredentials(decode(&mut bytes, version)?) - } - ApiKey::AlterUserScramCredentialsKey => { - RequestBody::AlterUserScramCredentials(decode(&mut bytes, version)?) - } - ApiKey::VoteKey => RequestBody::Vote(decode(&mut bytes, version)?), - ApiKey::BeginQuorumEpochKey => { - RequestBody::BeginQuorumEpoch(decode(&mut bytes, version)?) - } - ApiKey::EndQuorumEpochKey => RequestBody::EndQuorumEpoch(decode(&mut bytes, version)?), - ApiKey::DescribeQuorumKey => RequestBody::DescribeQuorum(decode(&mut bytes, version)?), - ApiKey::AlterPartitionKey => RequestBody::AlterPartition(decode(&mut bytes, version)?), - ApiKey::UpdateFeaturesKey => RequestBody::UpdateFeatures(decode(&mut bytes, version)?), - ApiKey::EnvelopeKey => RequestBody::Envelope(decode(&mut bytes, version)?), - ApiKey::FetchSnapshotKey => RequestBody::FetchSnapshot(decode(&mut bytes, version)?), - ApiKey::DescribeProducersKey => { - RequestBody::DescribeProducers(decode(&mut bytes, version)?) - } - ApiKey::BrokerRegistrationKey => { - RequestBody::BrokerRegistration(decode(&mut bytes, version)?) - } - ApiKey::BrokerHeartbeatKey => { - RequestBody::BrokerHeartbeat(decode(&mut bytes, version)?) - } - ApiKey::UnregisterBrokerKey => { - RequestBody::UnregisterBroker(decode(&mut bytes, version)?) - } - ApiKey::DescribeTransactionsKey => { - RequestBody::DescribeTransactions(decode(&mut bytes, version)?) - } - ApiKey::ListTransactionsKey => { - RequestBody::ListTransactions(decode(&mut bytes, version)?) - } - ApiKey::AllocateProducerIdsKey => { - RequestBody::AllocateProducerIds(decode(&mut bytes, version)?) - } - ApiKey::DescribeClusterKey => { - RequestBody::DescribeCluster(decode(&mut bytes, version)?) - } - ApiKey::ConsumerGroupHeartbeatKey => { - RequestBody::ConsumerGroupHeartbeat(decode(&mut bytes, version)?) - } - ApiKey::ControllerRegistrationKey => { - RequestBody::ControllerRegistration(decode(&mut bytes, version)?) - } - ApiKey::GetTelemetrySubscriptionsKey => { - RequestBody::GetTelemetrySubscriptions(decode(&mut bytes, version)?) - } - ApiKey::PushTelemetryKey => RequestBody::PushTelemetry(decode(&mut bytes, version)?), - ApiKey::AssignReplicasToDirsKey => { - RequestBody::AssignReplicasToDirs(decode(&mut bytes, version)?) - } - ApiKey::ListClientMetricsResourcesKey => { - RequestBody::ListClientMetricsResources(decode(&mut bytes, version)?) - } - }; + let body = RequestBody::decode(api_key, &mut bytes, version)?; Ok(KafkaFrame::Request { header, body }) } @@ -586,157 +111,7 @@ impl KafkaFrame { .context("Failed to decode response header")?; let version = request_header.version; - let body = match request_header.api_key { - ApiKey::ProduceKey => ResponseBody::Produce(decode(&mut bytes, version)?), - ApiKey::FindCoordinatorKey => { - ResponseBody::FindCoordinator(decode(&mut bytes, version)?) - } - ApiKey::FetchKey => ResponseBody::Fetch(decode(&mut bytes, version)?), - ApiKey::OffsetFetchKey => ResponseBody::OffsetFetch(decode(&mut bytes, version)?), - ApiKey::ListOffsetsKey => ResponseBody::ListOffsets(decode(&mut bytes, version)?), - ApiKey::JoinGroupKey => ResponseBody::JoinGroup(decode(&mut bytes, version)?), - ApiKey::SyncGroupKey => ResponseBody::SyncGroup(decode(&mut bytes, version)?), - ApiKey::MetadataKey => ResponseBody::Metadata(decode(&mut bytes, version)?), - ApiKey::DescribeClusterKey => { - ResponseBody::DescribeCluster(decode(&mut bytes, version)?) - } - ApiKey::HeartbeatKey => ResponseBody::Heartbeat(decode(&mut bytes, version)?), - ApiKey::SaslAuthenticateKey => { - ResponseBody::SaslAuthenticate(decode(&mut bytes, version)?) - } - ApiKey::SaslHandshakeKey => ResponseBody::SaslHandshake(decode(&mut bytes, version)?), - ApiKey::StopReplicaKey => ResponseBody::StopReplica(decode(&mut bytes, version)?), - ApiKey::UpdateMetadataKey => ResponseBody::UpdateMetadata(decode(&mut bytes, version)?), - ApiKey::ControlledShutdownKey => { - ResponseBody::ControlledShutdown(decode(&mut bytes, version)?) - } - ApiKey::LeaderAndIsrKey => ResponseBody::LeaderAndIsr(decode(&mut bytes, version)?), - ApiKey::OffsetCommitKey => ResponseBody::OffsetCommit(decode(&mut bytes, version)?), - ApiKey::LeaveGroupKey => ResponseBody::LeaveGroup(decode(&mut bytes, version)?), - ApiKey::DescribeGroupsKey => ResponseBody::DescribeGroups(decode(&mut bytes, version)?), - ApiKey::ListGroupsKey => ResponseBody::ListGroups(decode(&mut bytes, version)?), - ApiKey::ApiVersionsKey => ResponseBody::ApiVersions(decode(&mut bytes, version)?), - ApiKey::CreateTopicsKey => ResponseBody::CreateTopics(decode(&mut bytes, version)?), - ApiKey::DeleteTopicsKey => ResponseBody::DeleteTopics(decode(&mut bytes, version)?), - ApiKey::DeleteRecordsKey => ResponseBody::DeleteRecords(decode(&mut bytes, version)?), - ApiKey::InitProducerIdKey => ResponseBody::InitProducerId(decode(&mut bytes, version)?), - ApiKey::OffsetForLeaderEpochKey => { - ResponseBody::OffsetForLeaderEpoch(decode(&mut bytes, version)?) - } - ApiKey::AddPartitionsToTxnKey => { - ResponseBody::AddPartitionsToTxn(decode(&mut bytes, version)?) - } - ApiKey::AddOffsetsToTxnKey => { - ResponseBody::AddOffsetsToTxn(decode(&mut bytes, version)?) - } - ApiKey::EndTxnKey => ResponseBody::EndTxn(decode(&mut bytes, version)?), - ApiKey::WriteTxnMarkersKey => { - ResponseBody::WriteTxnMarkers(decode(&mut bytes, version)?) - } - - ApiKey::TxnOffsetCommitKey => { - ResponseBody::TxnOffsetCommit(decode(&mut bytes, version)?) - } - ApiKey::DescribeAclsKey => ResponseBody::DescribeAcls(decode(&mut bytes, version)?), - ApiKey::CreateAclsKey => ResponseBody::CreateAcls(decode(&mut bytes, version)?), - ApiKey::DeleteAclsKey => ResponseBody::DeleteAcls(decode(&mut bytes, version)?), - ApiKey::DescribeConfigsKey => { - ResponseBody::DescribeConfigs(decode(&mut bytes, version)?) - } - ApiKey::AlterConfigsKey => ResponseBody::AlterConfigs(decode(&mut bytes, version)?), - ApiKey::AlterReplicaLogDirsKey => { - ResponseBody::AlterReplicaLogDirs(decode(&mut bytes, version)?) - } - ApiKey::DescribeLogDirsKey => { - ResponseBody::DescribeLogDirs(decode(&mut bytes, version)?) - } - ApiKey::CreatePartitionsKey => { - ResponseBody::CreatePartitions(decode(&mut bytes, version)?) - } - ApiKey::CreateDelegationTokenKey => { - ResponseBody::CreateDelegationToken(decode(&mut bytes, version)?) - } - ApiKey::RenewDelegationTokenKey => { - ResponseBody::RenewDelegationToken(decode(&mut bytes, version)?) - } - ApiKey::ExpireDelegationTokenKey => { - ResponseBody::ExpireDelegationToken(decode(&mut bytes, version)?) - } - ApiKey::DescribeDelegationTokenKey => { - ResponseBody::DescribeDelegationToken(decode(&mut bytes, version)?) - } - ApiKey::DeleteGroupsKey => ResponseBody::DeleteGroups(decode(&mut bytes, version)?), - ApiKey::ElectLeadersKey => ResponseBody::ElectLeaders(decode(&mut bytes, version)?), - ApiKey::IncrementalAlterConfigsKey => { - ResponseBody::IncrementalAlterConfigs(decode(&mut bytes, version)?) - } - ApiKey::AlterPartitionReassignmentsKey => { - ResponseBody::AlterPartitionReassignments(decode(&mut bytes, version)?) - } - ApiKey::ListPartitionReassignmentsKey => { - ResponseBody::ListPartitionReassignments(decode(&mut bytes, version)?) - } - ApiKey::OffsetDeleteKey => ResponseBody::OffsetDelete(decode(&mut bytes, version)?), - ApiKey::DescribeClientQuotasKey => { - ResponseBody::DescribeClientQuotas(decode(&mut bytes, version)?) - } - ApiKey::AlterClientQuotasKey => { - ResponseBody::AlterClientQuotas(decode(&mut bytes, version)?) - } - ApiKey::DescribeUserScramCredentialsKey => { - ResponseBody::DescribeUserScramCredentials(decode(&mut bytes, version)?) - } - ApiKey::AlterUserScramCredentialsKey => { - ResponseBody::AlterUserScramCredentials(decode(&mut bytes, version)?) - } - ApiKey::VoteKey => ResponseBody::Vote(decode(&mut bytes, version)?), - ApiKey::BeginQuorumEpochKey => { - ResponseBody::BeginQuorumEpoch(decode(&mut bytes, version)?) - } - ApiKey::EndQuorumEpochKey => ResponseBody::EndQuorumEpoch(decode(&mut bytes, version)?), - ApiKey::DescribeQuorumKey => ResponseBody::DescribeQuorum(decode(&mut bytes, version)?), - ApiKey::AlterPartitionKey => ResponseBody::AlterPartition(decode(&mut bytes, version)?), - ApiKey::UpdateFeaturesKey => ResponseBody::UpdateFeatures(decode(&mut bytes, version)?), - ApiKey::EnvelopeKey => ResponseBody::Envelope(decode(&mut bytes, version)?), - ApiKey::FetchSnapshotKey => ResponseBody::FetchSnapshot(decode(&mut bytes, version)?), - ApiKey::DescribeProducersKey => { - ResponseBody::DescribeProducers(decode(&mut bytes, version)?) - } - ApiKey::BrokerRegistrationKey => { - ResponseBody::BrokerRegistration(decode(&mut bytes, version)?) - } - ApiKey::BrokerHeartbeatKey => { - ResponseBody::BrokerHeartbeat(decode(&mut bytes, version)?) - } - ApiKey::UnregisterBrokerKey => { - ResponseBody::UnregisterBroker(decode(&mut bytes, version)?) - } - ApiKey::DescribeTransactionsKey => { - ResponseBody::DescribeTransactions(decode(&mut bytes, version)?) - } - ApiKey::ListTransactionsKey => { - ResponseBody::ListTransactions(decode(&mut bytes, version)?) - } - ApiKey::AllocateProducerIdsKey => { - ResponseBody::AllocateProducerIds(decode(&mut bytes, version)?) - } - ApiKey::ConsumerGroupHeartbeatKey => { - ResponseBody::ConsumerGroupHeartbeat(decode(&mut bytes, version)?) - } - ApiKey::ControllerRegistrationKey => { - ResponseBody::ControllerRegistration(decode(&mut bytes, version)?) - } - ApiKey::GetTelemetrySubscriptionsKey => { - ResponseBody::GetTelemetrySubscriptions(decode(&mut bytes, version)?) - } - ApiKey::PushTelemetryKey => ResponseBody::PushTelemetry(decode(&mut bytes, version)?), - ApiKey::AssignReplicasToDirsKey => { - ResponseBody::AssignReplicasToDirs(decode(&mut bytes, version)?) - } - ApiKey::ListClientMetricsResourcesKey => { - ResponseBody::ListClientMetricsResources(decode(&mut bytes, version)?) - } - }; + let body = ResponseBody::decode(request_header.api_key, &mut bytes, version)?; Ok(KafkaFrame::Response { version, @@ -760,83 +135,7 @@ impl KafkaFrame { header.encode(bytes, header_version)?; let version = header.request_api_version; - match body { - RequestBody::Produce(x) => encode(x, bytes, version)?, - RequestBody::Fetch(x) => encode(x, bytes, version)?, - RequestBody::OffsetFetch(x) => encode(x, bytes, version)?, - RequestBody::ListOffsets(x) => encode(x, bytes, version)?, - RequestBody::JoinGroup(x) => encode(x, bytes, version)?, - RequestBody::SyncGroup(x) => encode(x, bytes, version)?, - RequestBody::Metadata(x) => encode(x, bytes, version)?, - RequestBody::FindCoordinator(x) => encode(x, bytes, version)?, - RequestBody::LeaderAndIsr(x) => encode(x, bytes, version)?, - RequestBody::Heartbeat(x) => encode(x, bytes, version)?, - RequestBody::CreateTopics(x) => encode(x, bytes, version)?, - RequestBody::DeleteTopics(x) => encode(x, bytes, version)?, - RequestBody::DeleteGroups(x) => encode(x, bytes, version)?, - RequestBody::DescribeConfigs(x) => encode(x, bytes, version)?, - RequestBody::SaslAuthenticate(x) => encode(x, bytes, version)?, - RequestBody::SaslHandshake(x) => encode(x, bytes, version)?, - RequestBody::StopReplica(x) => encode(x, bytes, version)?, - RequestBody::UpdateMetadata(x) => encode(x, bytes, version)?, - RequestBody::ControlledShutdown(x) => encode(x, bytes, version)?, - RequestBody::OffsetCommit(x) => encode(x, bytes, version)?, - RequestBody::LeaveGroup(x) => encode(x, bytes, version)?, - RequestBody::DescribeGroups(x) => encode(x, bytes, version)?, - RequestBody::ListGroups(x) => encode(x, bytes, version)?, - RequestBody::ApiVersions(x) => encode(x, bytes, version)?, - RequestBody::DeleteRecords(x) => encode(x, bytes, version)?, - RequestBody::InitProducerId(x) => encode(x, bytes, version)?, - RequestBody::OffsetForLeaderEpoch(x) => encode(x, bytes, version)?, - RequestBody::AddPartitionsToTxn(x) => encode(x, bytes, version)?, - RequestBody::AddOffsetsToTxn(x) => encode(x, bytes, version)?, - RequestBody::EndTxn(x) => encode(x, bytes, version)?, - RequestBody::WriteTxnMarkers(x) => encode(x, bytes, version)?, - RequestBody::TxnOffsetCommit(x) => encode(x, bytes, version)?, - RequestBody::DescribeAcls(x) => encode(x, bytes, version)?, - RequestBody::CreateAcls(x) => encode(x, bytes, version)?, - RequestBody::DeleteAcls(x) => encode(x, bytes, version)?, - RequestBody::AlterConfigs(x) => encode(x, bytes, version)?, - RequestBody::AlterReplicaLogDirs(x) => encode(x, bytes, version)?, - RequestBody::DescribeLogDirs(x) => encode(x, bytes, version)?, - RequestBody::CreatePartitions(x) => encode(x, bytes, version)?, - RequestBody::CreateDelegationToken(x) => encode(x, bytes, version)?, - RequestBody::RenewDelegationToken(x) => encode(x, bytes, version)?, - RequestBody::ExpireDelegationToken(x) => encode(x, bytes, version)?, - RequestBody::DescribeDelegationToken(x) => encode(x, bytes, version)?, - RequestBody::ElectLeaders(x) => encode(x, bytes, version)?, - RequestBody::IncrementalAlterConfigs(x) => encode(x, bytes, version)?, - RequestBody::AlterPartitionReassignments(x) => encode(x, bytes, version)?, - RequestBody::ListPartitionReassignments(x) => encode(x, bytes, version)?, - RequestBody::OffsetDelete(x) => encode(x, bytes, version)?, - RequestBody::DescribeClientQuotas(x) => encode(x, bytes, version)?, - RequestBody::AlterClientQuotas(x) => encode(x, bytes, version)?, - RequestBody::DescribeUserScramCredentials(x) => encode(x, bytes, version)?, - RequestBody::AlterUserScramCredentials(x) => encode(x, bytes, version)?, - RequestBody::Vote(x) => encode(x, bytes, version)?, - RequestBody::BeginQuorumEpoch(x) => encode(x, bytes, version)?, - RequestBody::EndQuorumEpoch(x) => encode(x, bytes, version)?, - RequestBody::DescribeQuorum(x) => encode(x, bytes, version)?, - RequestBody::AlterPartition(x) => encode(x, bytes, version)?, - RequestBody::UpdateFeatures(x) => encode(x, bytes, version)?, - RequestBody::Envelope(x) => encode(x, bytes, version)?, - RequestBody::FetchSnapshot(x) => encode(x, bytes, version)?, - RequestBody::DescribeProducers(x) => encode(x, bytes, version)?, - RequestBody::BrokerRegistration(x) => encode(x, bytes, version)?, - RequestBody::BrokerHeartbeat(x) => encode(x, bytes, version)?, - RequestBody::UnregisterBroker(x) => encode(x, bytes, version)?, - RequestBody::DescribeTransactions(x) => encode(x, bytes, version)?, - RequestBody::ListTransactions(x) => encode(x, bytes, version)?, - RequestBody::AllocateProducerIds(x) => encode(x, bytes, version)?, - RequestBody::DescribeCluster(x) => encode(x, bytes, version)?, - RequestBody::AlterScramCredentials(x) => encode(x, bytes, version)?, - RequestBody::ConsumerGroupHeartbeat(x) => encode(x, bytes, version)?, - RequestBody::ControllerRegistration(x) => encode(x, bytes, version)?, - RequestBody::GetTelemetrySubscriptions(x) => encode(x, bytes, version)?, - RequestBody::PushTelemetry(x) => encode(x, bytes, version)?, - RequestBody::AssignReplicasToDirs(x) => encode(x, bytes, version)?, - RequestBody::ListClientMetricsResources(x) => encode(x, bytes, version)?, - } + body.encode(bytes, version)? } KafkaFrame::Response { version, @@ -844,82 +143,7 @@ impl KafkaFrame { body, } => { header.encode(bytes, body.header_version(version))?; - match body { - ResponseBody::Produce(x) => encode(x, bytes, version)?, - ResponseBody::FindCoordinator(x) => encode(x, bytes, version)?, - ResponseBody::Fetch(x) => encode(x, bytes, version)?, - ResponseBody::OffsetFetch(x) => encode(x, bytes, version)?, - ResponseBody::ListOffsets(x) => encode(x, bytes, version)?, - ResponseBody::JoinGroup(x) => encode(x, bytes, version)?, - ResponseBody::SyncGroup(x) => encode(x, bytes, version)?, - ResponseBody::Metadata(x) => encode(x, bytes, version)?, - ResponseBody::DescribeCluster(x) => encode(x, bytes, version)?, - ResponseBody::Heartbeat(x) => encode(x, bytes, version)?, - ResponseBody::SaslAuthenticate(x) => encode(x, bytes, version)?, - ResponseBody::SaslHandshake(x) => encode(x, bytes, version)?, - ResponseBody::StopReplica(x) => encode(x, bytes, version)?, - ResponseBody::UpdateMetadata(x) => encode(x, bytes, version)?, - ResponseBody::ControlledShutdown(x) => encode(x, bytes, version)?, - ResponseBody::LeaderAndIsr(x) => encode(x, bytes, version)?, - ResponseBody::OffsetCommit(x) => encode(x, bytes, version)?, - ResponseBody::LeaveGroup(x) => encode(x, bytes, version)?, - ResponseBody::DescribeGroups(x) => encode(x, bytes, version)?, - ResponseBody::ListGroups(x) => encode(x, bytes, version)?, - ResponseBody::ApiVersions(x) => encode(x, bytes, version)?, - ResponseBody::CreateTopics(x) => encode(x, bytes, version)?, - ResponseBody::DeleteTopics(x) => encode(x, bytes, version)?, - ResponseBody::DeleteRecords(x) => encode(x, bytes, version)?, - ResponseBody::InitProducerId(x) => encode(x, bytes, version)?, - ResponseBody::OffsetForLeaderEpoch(x) => encode(x, bytes, version)?, - ResponseBody::AddPartitionsToTxn(x) => encode(x, bytes, version)?, - ResponseBody::AddOffsetsToTxn(x) => encode(x, bytes, version)?, - ResponseBody::EndTxn(x) => encode(x, bytes, version)?, - ResponseBody::WriteTxnMarkers(x) => encode(x, bytes, version)?, - ResponseBody::TxnOffsetCommit(x) => encode(x, bytes, version)?, - ResponseBody::DescribeAcls(x) => encode(x, bytes, version)?, - ResponseBody::CreateAcls(x) => encode(x, bytes, version)?, - ResponseBody::DeleteAcls(x) => encode(x, bytes, version)?, - ResponseBody::AlterConfigs(x) => encode(x, bytes, version)?, - ResponseBody::AlterReplicaLogDirs(x) => encode(x, bytes, version)?, - ResponseBody::DescribeLogDirs(x) => encode(x, bytes, version)?, - ResponseBody::CreatePartitions(x) => encode(x, bytes, version)?, - ResponseBody::CreateDelegationToken(x) => encode(x, bytes, version)?, - ResponseBody::RenewDelegationToken(x) => encode(x, bytes, version)?, - ResponseBody::ExpireDelegationToken(x) => encode(x, bytes, version)?, - ResponseBody::DescribeDelegationToken(x) => encode(x, bytes, version)?, - ResponseBody::ElectLeaders(x) => encode(x, bytes, version)?, - ResponseBody::IncrementalAlterConfigs(x) => encode(x, bytes, version)?, - ResponseBody::AlterPartitionReassignments(x) => encode(x, bytes, version)?, - ResponseBody::ListPartitionReassignments(x) => encode(x, bytes, version)?, - ResponseBody::OffsetDelete(x) => encode(x, bytes, version)?, - ResponseBody::DescribeClientQuotas(x) => encode(x, bytes, version)?, - ResponseBody::AlterClientQuotas(x) => encode(x, bytes, version)?, - ResponseBody::DescribeUserScramCredentials(x) => encode(x, bytes, version)?, - ResponseBody::AlterUserScramCredentials(x) => encode(x, bytes, version)?, - ResponseBody::Vote(x) => encode(x, bytes, version)?, - ResponseBody::BeginQuorumEpoch(x) => encode(x, bytes, version)?, - ResponseBody::EndQuorumEpoch(x) => encode(x, bytes, version)?, - ResponseBody::DescribeQuorum(x) => encode(x, bytes, version)?, - ResponseBody::AlterPartition(x) => encode(x, bytes, version)?, - ResponseBody::UpdateFeatures(x) => encode(x, bytes, version)?, - ResponseBody::Envelope(x) => encode(x, bytes, version)?, - ResponseBody::FetchSnapshot(x) => encode(x, bytes, version)?, - ResponseBody::DescribeProducers(x) => encode(x, bytes, version)?, - ResponseBody::BrokerRegistration(x) => encode(x, bytes, version)?, - ResponseBody::BrokerHeartbeat(x) => encode(x, bytes, version)?, - ResponseBody::UnregisterBroker(x) => encode(x, bytes, version)?, - ResponseBody::DescribeTransactions(x) => encode(x, bytes, version)?, - ResponseBody::ListTransactions(x) => encode(x, bytes, version)?, - ResponseBody::AllocateProducerIds(x) => encode(x, bytes, version)?, - ResponseBody::DescribeConfigs(x) => encode(x, bytes, version)?, - ResponseBody::DeleteGroups(x) => encode(x, bytes, version)?, - ResponseBody::ConsumerGroupHeartbeat(x) => encode(x, bytes, version)?, - ResponseBody::ControllerRegistration(x) => encode(x, bytes, version)?, - ResponseBody::GetTelemetrySubscriptions(x) => encode(x, bytes, version)?, - ResponseBody::PushTelemetry(x) => encode(x, bytes, version)?, - ResponseBody::AssignReplicasToDirs(x) => encode(x, bytes, version)?, - ResponseBody::ListClientMetricsResources(x) => encode(x, bytes, version)?, - } + body.encode(bytes, version)? } } @@ -930,23 +154,3 @@ impl KafkaFrame { Ok(()) } } - -fn decode(bytes: &mut Bytes, version: i16) -> Result { - T::decode(bytes, version).with_context(|| { - format!( - "Failed to decode {} v{} body", - std::any::type_name::(), - version - ) - }) -} - -fn encode(encodable: T, bytes: &mut BytesMut, version: i16) -> Result<()> { - encodable.encode(bytes, version).with_context(|| { - format!( - "Failed to encode {} v{} body", - std::any::type_name::(), - version - ) - }) -}