diff --git a/shotover/src/frame/kafka.rs b/shotover/src/frame/kafka.rs index 9ccde6958..480ecf6ed 100644 --- a/shotover/src/frame/kafka.rs +++ b/shotover/src/frame/kafka.rs @@ -2,14 +2,46 @@ use crate::codec::kafka::RequestHeader as CodecRequestHeader; use anyhow::{anyhow, Context, Result}; use bytes::{BufMut, Bytes, BytesMut}; use kafka_protocol::messages::{ - ApiKey, CreateTopicsRequest, DeleteGroupsRequest, DeleteTopicsRequest, DescribeClusterResponse, - DescribeConfigsRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, - FindCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, - JoinGroupResponse, LeaderAndIsrRequest, ListOffsetsRequest, ListOffsetsResponse, - MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, - ProduceResponse, RequestHeader, ResponseHeader, SaslAuthenticateRequest, - SaslAuthenticateResponse, SaslHandshakeRequest, SaslHandshakeResponse, SyncGroupRequest, - SyncGroupResponse, + AddOffsetsToTxnRequest, AddOffsetsToTxnResponse, AddPartitionsToTxnRequest, + AddPartitionsToTxnResponse, AllocateProducerIdsRequest, AllocateProducerIdsResponse, + AlterClientQuotasRequest, AlterClientQuotasResponse, AlterConfigsRequest, AlterConfigsResponse, + AlterPartitionReassignmentsRequest, AlterPartitionReassignmentsResponse, AlterPartitionRequest, + AlterPartitionResponse, AlterReplicaLogDirsRequest, AlterReplicaLogDirsResponse, + AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse, ApiKey, + ApiVersionsRequest, ApiVersionsResponse, BeginQuorumEpochRequest, BeginQuorumEpochResponse, + BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest, + BrokerRegistrationResponse, ControlledShutdownRequest, ControlledShutdownResponse, + CreateAclsRequest, CreateAclsResponse, CreateDelegationTokenRequest, + CreateDelegationTokenResponse, CreatePartitionsRequest, CreatePartitionsResponse, + CreateTopicsRequest, CreateTopicsResponse, DeleteAclsRequest, DeleteAclsResponse, + DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest, DeleteRecordsResponse, + DeleteTopicsRequest, DeleteTopicsResponse, DescribeAclsRequest, DescribeAclsResponse, + DescribeClientQuotasRequest, DescribeClientQuotasResponse, DescribeClusterRequest, + DescribeClusterResponse, DescribeConfigsRequest, DescribeConfigsResponse, + DescribeDelegationTokenRequest, DescribeDelegationTokenResponse, DescribeGroupsRequest, + DescribeGroupsResponse, DescribeLogDirsRequest, DescribeLogDirsResponse, + DescribeProducersRequest, DescribeProducersResponse, DescribeQuorumRequest, + DescribeQuorumResponse, DescribeTransactionsRequest, DescribeTransactionsResponse, + DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse, ElectLeadersRequest, + ElectLeadersResponse, EndQuorumEpochRequest, EndQuorumEpochResponse, EndTxnRequest, + EndTxnResponse, EnvelopeRequest, EnvelopeResponse, ExpireDelegationTokenRequest, + ExpireDelegationTokenResponse, FetchRequest, FetchResponse, FetchSnapshotRequest, + FetchSnapshotResponse, FindCoordinatorRequest, FindCoordinatorResponse, HeartbeatRequest, + HeartbeatResponse, IncrementalAlterConfigsRequest, IncrementalAlterConfigsResponse, + InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse, + LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, + ListGroupsRequest, ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, + ListPartitionReassignmentsRequest, ListPartitionReassignmentsResponse, ListTransactionsRequest, + ListTransactionsResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, + OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, + OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, + ProduceResponse, RenewDelegationTokenRequest, RenewDelegationTokenResponse, RequestHeader, + ResponseHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, + SaslHandshakeResponse, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, + SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse, UnregisterBrokerRequest, + UnregisterBrokerResponse, UpdateFeaturesRequest, UpdateFeaturesResponse, UpdateMetadataRequest, + UpdateMetadataResponse, VoteRequest, VoteResponse, WriteTxnMarkersRequest, + WriteTxnMarkersResponse, }; use kafka_protocol::protocol::{Decodable, Encodable, HeaderVersion, StrBytes}; use std::fmt::{Display, Formatter, Result as FmtResult}; @@ -88,9 +120,61 @@ pub enum RequestBody { DeleteTopics(DeleteTopicsRequest), DeleteGroups(DeleteGroupsRequest), DescribeConfigs(DescribeConfigsRequest), - SaslAuthenticateRequest(SaslAuthenticateRequest), - SaslHandshakeRequest(SaslHandshakeRequest), - Unknown { api_key: ApiKey, message: Bytes }, + SaslAuthenticate(SaslAuthenticateRequest), + SaslHandshake(SaslHandshakeRequest), + StopReplica(StopReplicaRequest), + UpdateMetadata(UpdateMetadataRequest), + ControlledShutdown(ControlledShutdownRequest), + OffsetCommit(OffsetCommitRequest), + LeaveGroup(LeaveGroupRequest), + DescribeGroups(DescribeGroupsRequest), + ListGroups(ListGroupsRequest), + ApiVersions(ApiVersionsRequest), + DeleteRecords(DeleteRecordsRequest), + InitProducerId(InitProducerIdRequest), + OffsetForLeaderEpoch(OffsetForLeaderEpochRequest), + AddPartitionsToTxn(AddPartitionsToTxnRequest), + AddOffsetsToTxn(AddOffsetsToTxnRequest), + EndTxn(EndTxnRequest), + WriteTxnMarkers(WriteTxnMarkersRequest), + TxnOffsetCommit(TxnOffsetCommitRequest), + DescribeAcls(DescribeAclsRequest), + CreateAcls(CreateAclsRequest), + DeleteAcls(DeleteAclsRequest), + AlterConfigs(AlterConfigsRequest), + AlterReplicaLogDirs(AlterReplicaLogDirsRequest), + DescribeLogDirs(DescribeLogDirsRequest), + CreatePartitions(CreatePartitionsRequest), + CreateDelegationToken(CreateDelegationTokenRequest), + RenewDelegationToken(RenewDelegationTokenRequest), + ExpireDelegationToken(ExpireDelegationTokenRequest), + DescribeDelegationToken(DescribeDelegationTokenRequest), + ElectLeaders(ElectLeadersRequest), + IncrementalAlterConfigs(IncrementalAlterConfigsRequest), + AlterPartitionReassignments(AlterPartitionReassignmentsRequest), + ListPartitionReassignments(ListPartitionReassignmentsRequest), + DescribeCluster(DescribeClusterRequest), + Envelope(EnvelopeRequest), + FetchSnapshot(FetchSnapshotRequest), + ListTransactions(ListTransactionsRequest), + DescribeTransactions(DescribeTransactionsRequest), + AllocateProducerIds(AllocateProducerIdsRequest), + UnregisterBroker(UnregisterBrokerRequest), + BrokerHeartbeat(BrokerHeartbeatRequest), + BrokerRegistration(BrokerRegistrationRequest), + DescribeProducers(DescribeProducersRequest), + UpdateFeatures(UpdateFeaturesRequest), + AlterPartition(AlterPartitionRequest), + EndQuorumEpoch(EndQuorumEpochRequest), + BeginQuorumEpoch(BeginQuorumEpochRequest), + Vote(VoteRequest), + DescribeUserScramCredentials(DescribeUserScramCredentialsRequest), + AlterScramCredentials(AlterUserScramCredentialsRequest), + DescribeClientQuotas(DescribeClientQuotasRequest), + OffsetDelete(OffsetDeleteRequest), + AlterClientQuotas(AlterClientQuotasRequest), + DescribeQuorum(DescribeQuorumRequest), + AlterUserScramCredentials(AlterUserScramCredentialsRequest), } #[derive(Debug, PartialEq, Clone)] @@ -105,9 +189,64 @@ pub enum ResponseBody { Metadata(MetadataResponse), DescribeCluster(DescribeClusterResponse), Heartbeat(HeartbeatResponse), - SaslAuthenticateResponse(SaslAuthenticateResponse), - SaslHandshakeResponse(SaslHandshakeResponse), - Unknown { api_key: ApiKey, message: Bytes }, + SaslAuthenticate(SaslAuthenticateResponse), + SaslHandshake(SaslHandshakeResponse), + StopReplica(StopReplicaResponse), + UpdateMetadata(UpdateMetadataResponse), + ControlledShutdown(ControlledShutdownResponse), + LeaderAndIsr(LeaderAndIsrResponse), + OffsetCommit(OffsetCommitResponse), + LeaveGroup(LeaveGroupResponse), + DescribeGroups(DescribeGroupsResponse), + ListGroups(ListGroupsResponse), + ApiVersions(ApiVersionsResponse), + CreateTopics(CreateTopicsResponse), + DeleteTopics(DeleteTopicsResponse), + DeleteRecords(DeleteRecordsResponse), + InitProducerId(InitProducerIdResponse), + OffsetForLeaderEpoch(OffsetForLeaderEpochResponse), + AddPartitionsToTxn(AddPartitionsToTxnResponse), + AddOffsetsToTxn(AddOffsetsToTxnResponse), + EndTxn(EndTxnResponse), + WriteTxnMarkers(WriteTxnMarkersResponse), + TxnOffsetCommit(TxnOffsetCommitResponse), + DescribeAcls(DescribeAclsResponse), + CreateAcls(CreateAclsResponse), + DeleteAcls(DeleteAclsResponse), + AlterConfigs(AlterConfigsResponse), + AlterReplicaLogDirs(AlterReplicaLogDirsResponse), + DescribeLogDirs(DescribeLogDirsResponse), + CreatePartitions(CreatePartitionsResponse), + CreateDelegationToken(CreateDelegationTokenResponse), + RenewDelegationToken(RenewDelegationTokenResponse), + ExpireDelegationToken(ExpireDelegationTokenResponse), + DescribeDelegationToken(DescribeDelegationTokenResponse), + ElectLeaders(ElectLeadersResponse), + IncrementalAlterConfigs(IncrementalAlterConfigsResponse), + AlterPartitionReassignments(AlterPartitionReassignmentsResponse), + ListPartitionReassignments(ListPartitionReassignmentsResponse), + BrokerRegistration(BrokerRegistrationResponse), + AllocateProducerIds(AllocateProducerIdsResponse), + ListTransactions(ListTransactionsResponse), + DescribeTransactions(DescribeTransactionsResponse), + UnregisterBroker(UnregisterBrokerResponse), + BrokerHeartbeat(BrokerHeartbeatResponse), + DescribeProducers(DescribeProducersResponse), + FetchSnapshot(FetchSnapshotResponse), + UpdateFeatures(UpdateFeaturesResponse), + AlterPartition(AlterPartitionResponse), + EndQuorumEpoch(EndQuorumEpochResponse), + Envelope(EnvelopeResponse), + Vote(VoteResponse), + DescribeUserScramCredentials(DescribeUserScramCredentialsResponse), + DescribeQuorum(DescribeQuorumResponse), + BeginQuorumEpoch(BeginQuorumEpochResponse), + AlterUserScramCredentials(AlterUserScramCredentialsResponse), + DescribeClientQuotas(DescribeClientQuotasResponse), + AlterClientQuotas(AlterClientQuotasResponse), + DescribeConfigs(DescribeConfigsResponse), + OffsetDelete(OffsetDeleteResponse), + DeleteGroups(DeleteGroupsResponse), } impl ResponseBody { @@ -123,13 +262,102 @@ impl ResponseBody { ResponseBody::Metadata(_) => MetadataResponse::header_version(version), ResponseBody::DescribeCluster(_) => DescribeClusterResponse::header_version(version), ResponseBody::Heartbeat(_) => HeartbeatResponse::header_version(version), - ResponseBody::SaslAuthenticateResponse(_) => { - SaslAuthenticateResponse::header_version(version) + ResponseBody::SaslAuthenticate(_) => SaslAuthenticateResponse::header_version(version), + ResponseBody::SaslHandshake(_) => SaslHandshakeResponse::header_version(version), + ResponseBody::StopReplica(_) => StopReplicaResponse::header_version(version), + ResponseBody::UpdateMetadata(_) => UpdateMetadataResponse::header_version(version), + ResponseBody::ControlledShutdown(_) => { + ControlledShutdownResponse::header_version(version) } - ResponseBody::SaslHandshakeResponse(_) => { - SaslHandshakeResponse::header_version(version) + ResponseBody::LeaderAndIsr(_) => LeaderAndIsrResponse::header_version(version), + ResponseBody::OffsetCommit(_) => OffsetCommitResponse::header_version(version), + ResponseBody::LeaveGroup(_) => LeaveGroupResponse::header_version(version), + ResponseBody::DescribeGroups(_) => DescribeGroupsResponse::header_version(version), + ResponseBody::ListGroups(_) => ListGroupsResponse::header_version(version), + ResponseBody::ApiVersions(_) => ApiVersionsResponse::header_version(version), + ResponseBody::CreateTopics(_) => CreateTopicsResponse::header_version(version), + ResponseBody::DeleteTopics(_) => DeleteTopicsResponse::header_version(version), + ResponseBody::DeleteRecords(_) => DeleteRecordsResponse::header_version(version), + ResponseBody::InitProducerId(_) => InitProducerIdResponse::header_version(version), + ResponseBody::OffsetForLeaderEpoch(_) => { + OffsetForLeaderEpochResponse::header_version(version) } - ResponseBody::Unknown { api_key, .. } => api_key.response_header_version(version), + ResponseBody::AddPartitionsToTxn(_) => { + AddPartitionsToTxnResponse::header_version(version) + } + ResponseBody::AddOffsetsToTxn(_) => AddOffsetsToTxnResponse::header_version(version), + ResponseBody::EndTxn(_) => EndTxnResponse::header_version(version), + ResponseBody::WriteTxnMarkers(_) => WriteTxnMarkersResponse::header_version(version), + ResponseBody::DescribeAcls(_) => DescribeAclsResponse::header_version(version), + ResponseBody::CreateAcls(_) => CreateAclsResponse::header_version(version), + ResponseBody::DeleteAcls(_) => DeleteAclsResponse::header_version(version), + ResponseBody::TxnOffsetCommit(_) => TxnOffsetCommitResponse::header_version(version), + ResponseBody::AlterConfigs(_) => AlterConfigsResponse::header_version(version), + ResponseBody::AlterReplicaLogDirs(_) => { + AlterReplicaLogDirsResponse::header_version(version) + } + ResponseBody::DescribeLogDirs(_) => DescribeLogDirsResponse::header_version(version), + ResponseBody::CreatePartitions(_) => CreatePartitionsResponse::header_version(version), + ResponseBody::CreateDelegationToken(_) => { + CreateDelegationTokenResponse::header_version(version) + } + ResponseBody::RenewDelegationToken(_) => { + RenewDelegationTokenResponse::header_version(version) + } + ResponseBody::ExpireDelegationToken(_) => { + ExpireDelegationTokenResponse::header_version(version) + } + ResponseBody::DescribeDelegationToken(_) => { + DescribeDelegationTokenResponse::header_version(version) + } + ResponseBody::ElectLeaders(_) => ElectLeadersResponse::header_version(version), + ResponseBody::IncrementalAlterConfigs(_) => { + IncrementalAlterConfigsResponse::header_version(version) + } + ResponseBody::AlterPartitionReassignments(_) => { + AlterPartitionReassignmentsResponse::header_version(version) + } + ResponseBody::ListPartitionReassignments(_) => { + ListPartitionReassignmentsResponse::header_version(version) + } + ResponseBody::BrokerRegistration(_) => { + BrokerRegistrationResponse::header_version(version) + } + ResponseBody::AllocateProducerIds(_) => { + AllocateProducerIdsResponse::header_version(version) + } + ResponseBody::ListTransactions(_) => ListTransactionsResponse::header_version(version), + ResponseBody::DescribeTransactions(_) => { + DescribeTransactionsResponse::header_version(version) + } + ResponseBody::UnregisterBroker(_) => UnregisterBrokerResponse::header_version(version), + ResponseBody::BrokerHeartbeat(_) => BrokerHeartbeatResponse::header_version(version), + ResponseBody::DescribeProducers(_) => { + DescribeProducersResponse::header_version(version) + } + ResponseBody::FetchSnapshot(_) => FetchSnapshotResponse::header_version(version), + ResponseBody::UpdateFeatures(_) => UpdateFeaturesResponse::header_version(version), + ResponseBody::AlterPartition(_) => AlterPartitionResponse::header_version(version), + ResponseBody::EndQuorumEpoch(_) => EndQuorumEpochResponse::header_version(version), + ResponseBody::Envelope(_) => EnvelopeResponse::header_version(version), + ResponseBody::Vote(_) => VoteResponse::header_version(version), + ResponseBody::DescribeUserScramCredentials(_) => { + DescribeUserScramCredentialsResponse::header_version(version) + } + ResponseBody::DescribeQuorum(_) => DescribeQuorumResponse::header_version(version), + ResponseBody::BeginQuorumEpoch(_) => BeginQuorumEpochResponse::header_version(version), + ResponseBody::AlterUserScramCredentials(_) => { + AlterUserScramCredentialsResponse::header_version(version) + } + ResponseBody::DescribeClientQuotas(_) => { + DescribeClientQuotasResponse::header_version(version) + } + ResponseBody::AlterClientQuotas(_) => { + AlterClientQuotasResponse::header_version(version) + } + ResponseBody::DescribeConfigs(_) => DescribeConfigsResponse::header_version(version), + ResponseBody::OffsetDelete(_) => OffsetDeleteResponse::header_version(version), + ResponseBody::DeleteGroups(_) => DeleteGroupsResponse::header_version(version), } } } @@ -179,15 +407,120 @@ impl KafkaFrame { RequestBody::DescribeConfigs(decode(&mut bytes, version)?) } ApiKey::SaslAuthenticateKey => { - RequestBody::SaslAuthenticateRequest(decode(&mut bytes, version)?) + RequestBody::SaslAuthenticate(decode(&mut bytes, version)?) + } + ApiKey::SaslHandshakeKey => RequestBody::SaslHandshake(decode(&mut bytes, version)?), + ApiKey::StopReplicaKey => RequestBody::StopReplica(decode(&mut bytes, version)?), + ApiKey::UpdateMetadataKey => RequestBody::UpdateMetadata(decode(&mut bytes, version)?), + ApiKey::ControlledShutdownKey => { + RequestBody::ControlledShutdown(decode(&mut bytes, version)?) + } + ApiKey::OffsetCommitKey => RequestBody::OffsetCommit(decode(&mut bytes, version)?), + ApiKey::LeaveGroupKey => RequestBody::LeaveGroup(decode(&mut bytes, version)?), + ApiKey::DescribeGroupsKey => RequestBody::DescribeGroups(decode(&mut bytes, version)?), + ApiKey::ListGroupsKey => RequestBody::ListGroups(decode(&mut bytes, version)?), + ApiKey::ApiVersionsKey => RequestBody::ApiVersions(decode(&mut bytes, version)?), + ApiKey::DeleteRecordsKey => RequestBody::DeleteRecords(decode(&mut bytes, version)?), + ApiKey::InitProducerIdKey => RequestBody::InitProducerId(decode(&mut bytes, version)?), + ApiKey::OffsetForLeaderEpochKey => { + RequestBody::OffsetForLeaderEpoch(decode(&mut bytes, version)?) + } + ApiKey::AddPartitionsToTxnKey => { + RequestBody::AddPartitionsToTxn(decode(&mut bytes, version)?) + } + ApiKey::AddOffsetsToTxnKey => { + RequestBody::AddOffsetsToTxn(decode(&mut bytes, version)?) + } + ApiKey::EndTxnKey => RequestBody::EndTxn(decode(&mut bytes, version)?), + ApiKey::WriteTxnMarkersKey => { + RequestBody::WriteTxnMarkers(decode(&mut bytes, version)?) + } + + ApiKey::TxnOffsetCommitKey => { + RequestBody::TxnOffsetCommit(decode(&mut bytes, version)?) + } + ApiKey::DescribeAclsKey => RequestBody::DescribeAcls(decode(&mut bytes, version)?), + ApiKey::CreateAclsKey => RequestBody::CreateAcls(decode(&mut bytes, version)?), + ApiKey::DeleteAclsKey => RequestBody::DeleteAcls(decode(&mut bytes, version)?), + ApiKey::AlterConfigsKey => RequestBody::AlterConfigs(decode(&mut bytes, version)?), + ApiKey::AlterReplicaLogDirsKey => { + RequestBody::AlterReplicaLogDirs(decode(&mut bytes, version)?) + } + ApiKey::DescribeLogDirsKey => { + RequestBody::DescribeLogDirs(decode(&mut bytes, version)?) + } + ApiKey::CreatePartitionsKey => { + RequestBody::CreatePartitions(decode(&mut bytes, version)?) + } + ApiKey::CreateDelegationTokenKey => { + RequestBody::CreateDelegationToken(decode(&mut bytes, version)?) + } + ApiKey::RenewDelegationTokenKey => { + RequestBody::RenewDelegationToken(decode(&mut bytes, version)?) + } + ApiKey::ExpireDelegationTokenKey => { + RequestBody::ExpireDelegationToken(decode(&mut bytes, version)?) + } + ApiKey::DescribeDelegationTokenKey => { + RequestBody::DescribeDelegationToken(decode(&mut bytes, version)?) + } + ApiKey::ElectLeadersKey => RequestBody::ElectLeaders(decode(&mut bytes, version)?), + ApiKey::IncrementalAlterConfigsKey => { + RequestBody::IncrementalAlterConfigs(decode(&mut bytes, version)?) + } + ApiKey::AlterPartitionReassignmentsKey => { + RequestBody::AlterPartitionReassignments(decode(&mut bytes, version)?) + } + ApiKey::ListPartitionReassignmentsKey => { + RequestBody::ListPartitionReassignments(decode(&mut bytes, version)?) + } + ApiKey::OffsetDeleteKey => RequestBody::OffsetDelete(decode(&mut bytes, version)?), + ApiKey::DescribeClientQuotasKey => { + RequestBody::DescribeClientQuotas(decode(&mut bytes, version)?) + } + ApiKey::AlterClientQuotasKey => { + RequestBody::AlterClientQuotas(decode(&mut bytes, version)?) + } + ApiKey::DescribeUserScramCredentialsKey => { + RequestBody::DescribeUserScramCredentials(decode(&mut bytes, version)?) + } + ApiKey::AlterUserScramCredentialsKey => { + RequestBody::AlterUserScramCredentials(decode(&mut bytes, version)?) + } + ApiKey::VoteKey => RequestBody::Vote(decode(&mut bytes, version)?), + ApiKey::BeginQuorumEpochKey => { + RequestBody::BeginQuorumEpoch(decode(&mut bytes, version)?) } - ApiKey::SaslHandshakeKey => { - RequestBody::SaslHandshakeRequest(decode(&mut bytes, version)?) + ApiKey::EndQuorumEpochKey => RequestBody::EndQuorumEpoch(decode(&mut bytes, version)?), + ApiKey::DescribeQuorumKey => RequestBody::DescribeQuorum(decode(&mut bytes, version)?), + ApiKey::AlterPartitionKey => RequestBody::AlterPartition(decode(&mut bytes, version)?), + ApiKey::UpdateFeaturesKey => RequestBody::UpdateFeatures(decode(&mut bytes, version)?), + ApiKey::EnvelopeKey => RequestBody::Envelope(decode(&mut bytes, version)?), + ApiKey::FetchSnapshotKey => RequestBody::FetchSnapshot(decode(&mut bytes, version)?), + ApiKey::DescribeProducersKey => { + RequestBody::DescribeProducers(decode(&mut bytes, version)?) + } + ApiKey::BrokerRegistrationKey => { + RequestBody::BrokerRegistration(decode(&mut bytes, version)?) + } + ApiKey::BrokerHeartbeatKey => { + RequestBody::BrokerHeartbeat(decode(&mut bytes, version)?) + } + ApiKey::UnregisterBrokerKey => { + RequestBody::UnregisterBroker(decode(&mut bytes, version)?) + } + ApiKey::DescribeTransactionsKey => { + RequestBody::DescribeTransactions(decode(&mut bytes, version)?) + } + ApiKey::ListTransactionsKey => { + RequestBody::ListTransactions(decode(&mut bytes, version)?) + } + ApiKey::AllocateProducerIdsKey => { + RequestBody::AllocateProducerIds(decode(&mut bytes, version)?) + } + ApiKey::DescribeClusterKey => { + RequestBody::DescribeCluster(decode(&mut bytes, version)?) } - api_key => RequestBody::Unknown { - api_key, - message: bytes, - }, }; Ok(KafkaFrame::Request { header, body }) @@ -219,15 +552,124 @@ impl KafkaFrame { } ApiKey::HeartbeatKey => ResponseBody::Heartbeat(decode(&mut bytes, version)?), ApiKey::SaslAuthenticateKey => { - ResponseBody::SaslAuthenticateResponse(decode(&mut bytes, version)?) + ResponseBody::SaslAuthenticate(decode(&mut bytes, version)?) + } + ApiKey::SaslHandshakeKey => ResponseBody::SaslHandshake(decode(&mut bytes, version)?), + ApiKey::StopReplicaKey => ResponseBody::StopReplica(decode(&mut bytes, version)?), + ApiKey::UpdateMetadataKey => ResponseBody::UpdateMetadata(decode(&mut bytes, version)?), + ApiKey::ControlledShutdownKey => { + ResponseBody::ControlledShutdown(decode(&mut bytes, version)?) + } + ApiKey::LeaderAndIsrKey => ResponseBody::LeaderAndIsr(decode(&mut bytes, version)?), + ApiKey::OffsetCommitKey => ResponseBody::OffsetCommit(decode(&mut bytes, version)?), + ApiKey::LeaveGroupKey => ResponseBody::LeaveGroup(decode(&mut bytes, version)?), + ApiKey::DescribeGroupsKey => ResponseBody::DescribeGroups(decode(&mut bytes, version)?), + ApiKey::ListGroupsKey => ResponseBody::ListGroups(decode(&mut bytes, version)?), + ApiKey::ApiVersionsKey => ResponseBody::ApiVersions(decode(&mut bytes, version)?), + ApiKey::CreateTopicsKey => ResponseBody::CreateTopics(decode(&mut bytes, version)?), + ApiKey::DeleteTopicsKey => ResponseBody::DeleteTopics(decode(&mut bytes, version)?), + ApiKey::DeleteRecordsKey => ResponseBody::DeleteRecords(decode(&mut bytes, version)?), + ApiKey::InitProducerIdKey => ResponseBody::InitProducerId(decode(&mut bytes, version)?), + ApiKey::OffsetForLeaderEpochKey => { + ResponseBody::OffsetForLeaderEpoch(decode(&mut bytes, version)?) + } + ApiKey::AddPartitionsToTxnKey => { + ResponseBody::AddPartitionsToTxn(decode(&mut bytes, version)?) + } + ApiKey::AddOffsetsToTxnKey => { + ResponseBody::AddOffsetsToTxn(decode(&mut bytes, version)?) + } + ApiKey::EndTxnKey => ResponseBody::EndTxn(decode(&mut bytes, version)?), + ApiKey::WriteTxnMarkersKey => { + ResponseBody::WriteTxnMarkers(decode(&mut bytes, version)?) + } + + ApiKey::TxnOffsetCommitKey => { + ResponseBody::TxnOffsetCommit(decode(&mut bytes, version)?) + } + ApiKey::DescribeAclsKey => ResponseBody::DescribeAcls(decode(&mut bytes, version)?), + ApiKey::CreateAclsKey => ResponseBody::CreateAcls(decode(&mut bytes, version)?), + ApiKey::DeleteAclsKey => ResponseBody::DeleteAcls(decode(&mut bytes, version)?), + ApiKey::DescribeConfigsKey => { + ResponseBody::DescribeConfigs(decode(&mut bytes, version)?) + } + ApiKey::AlterConfigsKey => ResponseBody::AlterConfigs(decode(&mut bytes, version)?), + ApiKey::AlterReplicaLogDirsKey => { + ResponseBody::AlterReplicaLogDirs(decode(&mut bytes, version)?) + } + ApiKey::DescribeLogDirsKey => { + ResponseBody::DescribeLogDirs(decode(&mut bytes, version)?) + } + ApiKey::CreatePartitionsKey => { + ResponseBody::CreatePartitions(decode(&mut bytes, version)?) + } + ApiKey::CreateDelegationTokenKey => { + ResponseBody::CreateDelegationToken(decode(&mut bytes, version)?) + } + ApiKey::RenewDelegationTokenKey => { + ResponseBody::RenewDelegationToken(decode(&mut bytes, version)?) + } + ApiKey::ExpireDelegationTokenKey => { + ResponseBody::ExpireDelegationToken(decode(&mut bytes, version)?) + } + ApiKey::DescribeDelegationTokenKey => { + ResponseBody::DescribeDelegationToken(decode(&mut bytes, version)?) + } + ApiKey::DeleteGroupsKey => ResponseBody::DeleteGroups(decode(&mut bytes, version)?), + ApiKey::ElectLeadersKey => ResponseBody::ElectLeaders(decode(&mut bytes, version)?), + ApiKey::IncrementalAlterConfigsKey => { + ResponseBody::IncrementalAlterConfigs(decode(&mut bytes, version)?) + } + ApiKey::AlterPartitionReassignmentsKey => { + ResponseBody::AlterPartitionReassignments(decode(&mut bytes, version)?) + } + ApiKey::ListPartitionReassignmentsKey => { + ResponseBody::ListPartitionReassignments(decode(&mut bytes, version)?) + } + ApiKey::OffsetDeleteKey => ResponseBody::OffsetDelete(decode(&mut bytes, version)?), + ApiKey::DescribeClientQuotasKey => { + ResponseBody::DescribeClientQuotas(decode(&mut bytes, version)?) + } + ApiKey::AlterClientQuotasKey => { + ResponseBody::AlterClientQuotas(decode(&mut bytes, version)?) + } + ApiKey::DescribeUserScramCredentialsKey => { + ResponseBody::DescribeUserScramCredentials(decode(&mut bytes, version)?) + } + ApiKey::AlterUserScramCredentialsKey => { + ResponseBody::AlterUserScramCredentials(decode(&mut bytes, version)?) + } + ApiKey::VoteKey => ResponseBody::Vote(decode(&mut bytes, version)?), + ApiKey::BeginQuorumEpochKey => { + ResponseBody::BeginQuorumEpoch(decode(&mut bytes, version)?) + } + ApiKey::EndQuorumEpochKey => ResponseBody::EndQuorumEpoch(decode(&mut bytes, version)?), + ApiKey::DescribeQuorumKey => ResponseBody::DescribeQuorum(decode(&mut bytes, version)?), + ApiKey::AlterPartitionKey => ResponseBody::AlterPartition(decode(&mut bytes, version)?), + ApiKey::UpdateFeaturesKey => ResponseBody::UpdateFeatures(decode(&mut bytes, version)?), + ApiKey::EnvelopeKey => ResponseBody::Envelope(decode(&mut bytes, version)?), + ApiKey::FetchSnapshotKey => ResponseBody::FetchSnapshot(decode(&mut bytes, version)?), + ApiKey::DescribeProducersKey => { + ResponseBody::DescribeProducers(decode(&mut bytes, version)?) + } + ApiKey::BrokerRegistrationKey => { + ResponseBody::BrokerRegistration(decode(&mut bytes, version)?) + } + ApiKey::BrokerHeartbeatKey => { + ResponseBody::BrokerHeartbeat(decode(&mut bytes, version)?) + } + ApiKey::UnregisterBrokerKey => { + ResponseBody::UnregisterBroker(decode(&mut bytes, version)?) + } + ApiKey::DescribeTransactionsKey => { + ResponseBody::DescribeTransactions(decode(&mut bytes, version)?) + } + ApiKey::ListTransactionsKey => { + ResponseBody::ListTransactions(decode(&mut bytes, version)?) } - ApiKey::SaslHandshakeKey => { - ResponseBody::SaslHandshakeResponse(decode(&mut bytes, version)?) + ApiKey::AllocateProducerIdsKey => { + ResponseBody::AllocateProducerIds(decode(&mut bytes, version)?) } - api_key => ResponseBody::Unknown { - api_key, - message: bytes, - }, }; Ok(KafkaFrame::Response { @@ -267,9 +709,61 @@ impl KafkaFrame { RequestBody::DeleteTopics(x) => encode(x, bytes, version)?, RequestBody::DeleteGroups(x) => encode(x, bytes, version)?, RequestBody::DescribeConfigs(x) => encode(x, bytes, version)?, - RequestBody::SaslAuthenticateRequest(x) => encode(x, bytes, version)?, - RequestBody::SaslHandshakeRequest(x) => encode(x, bytes, version)?, - RequestBody::Unknown { message, .. } => bytes.extend_from_slice(&message), + RequestBody::SaslAuthenticate(x) => encode(x, bytes, version)?, + RequestBody::SaslHandshake(x) => encode(x, bytes, version)?, + RequestBody::StopReplica(x) => encode(x, bytes, version)?, + RequestBody::UpdateMetadata(x) => encode(x, bytes, version)?, + RequestBody::ControlledShutdown(x) => encode(x, bytes, version)?, + RequestBody::OffsetCommit(x) => encode(x, bytes, version)?, + RequestBody::LeaveGroup(x) => encode(x, bytes, version)?, + RequestBody::DescribeGroups(x) => encode(x, bytes, version)?, + RequestBody::ListGroups(x) => encode(x, bytes, version)?, + RequestBody::ApiVersions(x) => encode(x, bytes, version)?, + RequestBody::DeleteRecords(x) => encode(x, bytes, version)?, + RequestBody::InitProducerId(x) => encode(x, bytes, version)?, + RequestBody::OffsetForLeaderEpoch(x) => encode(x, bytes, version)?, + RequestBody::AddPartitionsToTxn(x) => encode(x, bytes, version)?, + RequestBody::AddOffsetsToTxn(x) => encode(x, bytes, version)?, + RequestBody::EndTxn(x) => encode(x, bytes, version)?, + RequestBody::WriteTxnMarkers(x) => encode(x, bytes, version)?, + RequestBody::TxnOffsetCommit(x) => encode(x, bytes, version)?, + RequestBody::DescribeAcls(x) => encode(x, bytes, version)?, + RequestBody::CreateAcls(x) => encode(x, bytes, version)?, + RequestBody::DeleteAcls(x) => encode(x, bytes, version)?, + RequestBody::AlterConfigs(x) => encode(x, bytes, version)?, + RequestBody::AlterReplicaLogDirs(x) => encode(x, bytes, version)?, + RequestBody::DescribeLogDirs(x) => encode(x, bytes, version)?, + RequestBody::CreatePartitions(x) => encode(x, bytes, version)?, + RequestBody::CreateDelegationToken(x) => encode(x, bytes, version)?, + RequestBody::RenewDelegationToken(x) => encode(x, bytes, version)?, + RequestBody::ExpireDelegationToken(x) => encode(x, bytes, version)?, + RequestBody::DescribeDelegationToken(x) => encode(x, bytes, version)?, + RequestBody::ElectLeaders(x) => encode(x, bytes, version)?, + RequestBody::IncrementalAlterConfigs(x) => encode(x, bytes, version)?, + RequestBody::AlterPartitionReassignments(x) => encode(x, bytes, version)?, + RequestBody::ListPartitionReassignments(x) => encode(x, bytes, version)?, + RequestBody::OffsetDelete(x) => encode(x, bytes, version)?, + RequestBody::DescribeClientQuotas(x) => encode(x, bytes, version)?, + RequestBody::AlterClientQuotas(x) => encode(x, bytes, version)?, + RequestBody::DescribeUserScramCredentials(x) => encode(x, bytes, version)?, + RequestBody::AlterUserScramCredentials(x) => encode(x, bytes, version)?, + RequestBody::Vote(x) => encode(x, bytes, version)?, + RequestBody::BeginQuorumEpoch(x) => encode(x, bytes, version)?, + RequestBody::EndQuorumEpoch(x) => encode(x, bytes, version)?, + RequestBody::DescribeQuorum(x) => encode(x, bytes, version)?, + RequestBody::AlterPartition(x) => encode(x, bytes, version)?, + RequestBody::UpdateFeatures(x) => encode(x, bytes, version)?, + RequestBody::Envelope(x) => encode(x, bytes, version)?, + RequestBody::FetchSnapshot(x) => encode(x, bytes, version)?, + RequestBody::DescribeProducers(x) => encode(x, bytes, version)?, + RequestBody::BrokerRegistration(x) => encode(x, bytes, version)?, + RequestBody::BrokerHeartbeat(x) => encode(x, bytes, version)?, + RequestBody::UnregisterBroker(x) => encode(x, bytes, version)?, + RequestBody::DescribeTransactions(x) => encode(x, bytes, version)?, + RequestBody::ListTransactions(x) => encode(x, bytes, version)?, + RequestBody::AllocateProducerIds(x) => encode(x, bytes, version)?, + RequestBody::DescribeCluster(x) => encode(x, bytes, version)?, + RequestBody::AlterScramCredentials(x) => encode(x, bytes, version)?, } } KafkaFrame::Response { @@ -289,9 +783,64 @@ impl KafkaFrame { ResponseBody::Metadata(x) => encode(x, bytes, version)?, ResponseBody::DescribeCluster(x) => encode(x, bytes, version)?, ResponseBody::Heartbeat(x) => encode(x, bytes, version)?, - ResponseBody::SaslAuthenticateResponse(x) => encode(x, bytes, version)?, - ResponseBody::SaslHandshakeResponse(x) => encode(x, bytes, version)?, - ResponseBody::Unknown { message, .. } => bytes.extend_from_slice(&message), + ResponseBody::SaslAuthenticate(x) => encode(x, bytes, version)?, + ResponseBody::SaslHandshake(x) => encode(x, bytes, version)?, + ResponseBody::StopReplica(x) => encode(x, bytes, version)?, + ResponseBody::UpdateMetadata(x) => encode(x, bytes, version)?, + ResponseBody::ControlledShutdown(x) => encode(x, bytes, version)?, + ResponseBody::LeaderAndIsr(x) => encode(x, bytes, version)?, + ResponseBody::OffsetCommit(x) => encode(x, bytes, version)?, + ResponseBody::LeaveGroup(x) => encode(x, bytes, version)?, + ResponseBody::DescribeGroups(x) => encode(x, bytes, version)?, + ResponseBody::ListGroups(x) => encode(x, bytes, version)?, + ResponseBody::ApiVersions(x) => encode(x, bytes, version)?, + ResponseBody::CreateTopics(x) => encode(x, bytes, version)?, + ResponseBody::DeleteTopics(x) => encode(x, bytes, version)?, + ResponseBody::DeleteRecords(x) => encode(x, bytes, version)?, + ResponseBody::InitProducerId(x) => encode(x, bytes, version)?, + ResponseBody::OffsetForLeaderEpoch(x) => encode(x, bytes, version)?, + ResponseBody::AddPartitionsToTxn(x) => encode(x, bytes, version)?, + ResponseBody::AddOffsetsToTxn(x) => encode(x, bytes, version)?, + ResponseBody::EndTxn(x) => encode(x, bytes, version)?, + ResponseBody::WriteTxnMarkers(x) => encode(x, bytes, version)?, + ResponseBody::TxnOffsetCommit(x) => encode(x, bytes, version)?, + ResponseBody::DescribeAcls(x) => encode(x, bytes, version)?, + ResponseBody::CreateAcls(x) => encode(x, bytes, version)?, + ResponseBody::DeleteAcls(x) => encode(x, bytes, version)?, + ResponseBody::AlterConfigs(x) => encode(x, bytes, version)?, + ResponseBody::AlterReplicaLogDirs(x) => encode(x, bytes, version)?, + ResponseBody::DescribeLogDirs(x) => encode(x, bytes, version)?, + ResponseBody::CreatePartitions(x) => encode(x, bytes, version)?, + ResponseBody::CreateDelegationToken(x) => encode(x, bytes, version)?, + ResponseBody::RenewDelegationToken(x) => encode(x, bytes, version)?, + ResponseBody::ExpireDelegationToken(x) => encode(x, bytes, version)?, + ResponseBody::DescribeDelegationToken(x) => encode(x, bytes, version)?, + ResponseBody::ElectLeaders(x) => encode(x, bytes, version)?, + ResponseBody::IncrementalAlterConfigs(x) => encode(x, bytes, version)?, + ResponseBody::AlterPartitionReassignments(x) => encode(x, bytes, version)?, + ResponseBody::ListPartitionReassignments(x) => encode(x, bytes, version)?, + ResponseBody::OffsetDelete(x) => encode(x, bytes, version)?, + ResponseBody::DescribeClientQuotas(x) => encode(x, bytes, version)?, + ResponseBody::AlterClientQuotas(x) => encode(x, bytes, version)?, + ResponseBody::DescribeUserScramCredentials(x) => encode(x, bytes, version)?, + ResponseBody::AlterUserScramCredentials(x) => encode(x, bytes, version)?, + ResponseBody::Vote(x) => encode(x, bytes, version)?, + ResponseBody::BeginQuorumEpoch(x) => encode(x, bytes, version)?, + ResponseBody::EndQuorumEpoch(x) => encode(x, bytes, version)?, + ResponseBody::DescribeQuorum(x) => encode(x, bytes, version)?, + ResponseBody::AlterPartition(x) => encode(x, bytes, version)?, + ResponseBody::UpdateFeatures(x) => encode(x, bytes, version)?, + ResponseBody::Envelope(x) => encode(x, bytes, version)?, + ResponseBody::FetchSnapshot(x) => encode(x, bytes, version)?, + ResponseBody::DescribeProducers(x) => encode(x, bytes, version)?, + ResponseBody::BrokerRegistration(x) => encode(x, bytes, version)?, + ResponseBody::BrokerHeartbeat(x) => encode(x, bytes, version)?, + ResponseBody::UnregisterBroker(x) => encode(x, bytes, version)?, + ResponseBody::DescribeTransactions(x) => encode(x, bytes, version)?, + ResponseBody::ListTransactions(x) => encode(x, bytes, version)?, + ResponseBody::AllocateProducerIds(x) => encode(x, bytes, version)?, + ResponseBody::DescribeConfigs(x) => encode(x, bytes, version)?, + ResponseBody::DeleteGroups(x) => encode(x, bytes, version)?, } } } diff --git a/shotover/src/transforms/null.rs b/shotover/src/transforms/null.rs index 6be39ad43..0c46e6a09 100644 --- a/shotover/src/transforms/null.rs +++ b/shotover/src/transforms/null.rs @@ -46,8 +46,14 @@ impl Transform for NullSink { async fn transform<'a>(&'a mut self, mut requests_wrapper: Wrapper<'a>) -> Result { for message in &mut requests_wrapper.requests { + let request_id = message.id(); + + // reuse the requests to hold the responses to avoid an allocation *message = message.to_error_response("Handled by shotover null transform".to_string())?; + + // set the response to point to its corresponding request + message.set_request_id(request_id) } Ok(requests_wrapper.requests) }