From a00acd4e855736f243c67d9babb8a7579358dacc Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Wed, 20 Nov 2024 16:07:57 +1100 Subject: [PATCH] ApiVersions rewrite --- Cargo.lock | 3 +- shotover/Cargo.toml | 2 +- shotover/src/codec/kafka.rs | 6 +- shotover/src/frame/kafka.rs | 2 +- .../kafka/sink_cluster/api_versions.rs | 154 +++++++++++++----- .../kafka/sink_cluster/kafka_node.rs | 2 +- .../src/transforms/kafka/sink_cluster/mod.rs | 4 +- .../scram_over_mtls/create_token.rs | 6 +- 8 files changed, 123 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c563e037a..9b633a349 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2810,8 +2810,7 @@ dependencies = [ [[package]] name = "kafka-protocol" version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1edaf2fc3ecebe689bbc4fd97a6921cacd4cd09df8ebeda348a8e23c9fd48d4" +source = "git+https://github.com/tychedelia/kafka-protocol-rs#46c99ae323c164864ad6d6fec27a799a059e9aac" dependencies = [ "anyhow", "bytes", diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index a37e3e191..68d901ea2 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -115,7 +115,7 @@ aws-config = { version = "1.0.0", optional = true } aws-sdk-kms = { version = "1.1.0", optional = true } chacha20poly1305 = { version = "0.10.0", features = ["std"], optional = true } generic-array = { version = "0.14", features = ["serde"], optional = true } -kafka-protocol = { version = "0.13.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"] } +kafka-protocol = { version = "0.13.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"], git = "https://github.com/tychedelia/kafka-protocol-rs" } rustls = { version = "0.23.18", default-features = false, features = ["tls12"] } tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } rustls-pemfile = "2.0.0" diff --git a/shotover/src/codec/kafka.rs b/shotover/src/codec/kafka.rs index 11a87f394..f310deeab 100644 --- a/shotover/src/codec/kafka.rs +++ b/shotover/src/codec/kafka.rs @@ -154,7 +154,7 @@ impl Decoder for KafkaDecoder { } else if self.expect_raw_sasl.is_some() { Meta { request_header: RequestHeader { - api_key: ApiKey::SaslAuthenticateKey, + api_key: ApiKey::SaslAuthenticate, version: 0, }, // This code path is only used for requests, so message_id can be None. @@ -210,7 +210,7 @@ impl Decoder for KafkaDecoder { } } else { // set expect_raw_sasl for requests - if meta.request_header.api_key == ApiKey::SaslHandshakeKey + if meta.request_header.api_key == ApiKey::SaslHandshake && meta.request_header.version == 0 { // Only parse the full frame once we manually check its a v0 sasl handshake @@ -326,7 +326,7 @@ impl Encoder for KafkaEncoder { if let Some(tx) = self.request_header_tx.as_ref() { let header = if message_contains_raw_sasl { RequestHeader { - api_key: ApiKey::SaslAuthenticateKey, + api_key: ApiKey::SaslAuthenticate, version: 0, } } else { diff --git a/shotover/src/frame/kafka.rs b/shotover/src/frame/kafka.rs index c8eb20ea9..e393190f1 100644 --- a/shotover/src/frame/kafka.rs +++ b/shotover/src/frame/kafka.rs @@ -88,7 +88,7 @@ impl KafkaFrame { }), None => Ok(KafkaFrame::Request { header: RequestHeader::default() - .with_request_api_key(ApiKey::SaslAuthenticateKey as i16), + .with_request_api_key(ApiKey::SaslAuthenticate as i16), body: RequestBody::SaslAuthenticate( SaslAuthenticateRequest::default().with_auth_bytes(bytes), ), diff --git a/shotover/src/transforms/kafka/sink_cluster/api_versions.rs b/shotover/src/transforms/kafka/sink_cluster/api_versions.rs index 979cf4dea..e1745e152 100644 --- a/shotover/src/transforms/kafka/sink_cluster/api_versions.rs +++ b/shotover/src/transforms/kafka/sink_cluster/api_versions.rs @@ -8,55 +8,123 @@ use kafka_protocol::{messages::ApiKey, protocol::VersionRange}; // + Make sure any new fields do not break any of the requirements listed above pub(crate) fn versions_supported_by_key(api_key: i16) -> Option { match ApiKey::try_from(api_key) { - Ok(ApiKey::ProduceKey) => Some(VersionRange { min: 0, max: 11 }), - Ok(ApiKey::FetchKey) => Some(VersionRange { min: 0, max: 16 }), - Ok(ApiKey::ListOffsetsKey) => Some(VersionRange { min: 0, max: 8 }), - Ok(ApiKey::MetadataKey) => Some(VersionRange { min: 0, max: 12 }), - Ok(ApiKey::OffsetCommitKey) => Some(VersionRange { min: 0, max: 9 }), - Ok(ApiKey::OffsetFetchKey) => Some(VersionRange { min: 0, max: 9 }), - Ok(ApiKey::FindCoordinatorKey) => Some(VersionRange { min: 0, max: 5 }), - Ok(ApiKey::JoinGroupKey) => Some(VersionRange { min: 0, max: 9 }), - Ok(ApiKey::HeartbeatKey) => Some(VersionRange { min: 0, max: 4 }), - Ok(ApiKey::LeaveGroupKey) => Some(VersionRange { min: 0, max: 5 }), - Ok(ApiKey::SyncGroupKey) => Some(VersionRange { min: 0, max: 5 }), - Ok(ApiKey::DescribeGroupsKey) => Some(VersionRange { min: 0, max: 5 }), - Ok(ApiKey::ListGroupsKey) => Some(VersionRange { min: 0, max: 5 }), - Ok(ApiKey::SaslHandshakeKey) => Some(VersionRange { min: 0, max: 1 }), - Ok(ApiKey::ApiVersionsKey) => Some(VersionRange { min: 0, max: 3 }), - Ok(ApiKey::CreateTopicsKey) => Some(VersionRange { min: 0, max: 7 }), - Ok(ApiKey::DeleteTopicsKey) => Some(VersionRange { min: 0, max: 6 }), - Ok(ApiKey::DeleteRecordsKey) => Some(VersionRange { min: 0, max: 2 }), - Ok(ApiKey::InitProducerIdKey) => Some(VersionRange { min: 0, max: 5 }), - Ok(ApiKey::OffsetForLeaderEpochKey) => Some(VersionRange { min: 0, max: 4 }), - Ok(ApiKey::AddPartitionsToTxnKey) => Some(VersionRange { min: 0, max: 5 }), - Ok(ApiKey::AddOffsetsToTxnKey) => Some(VersionRange { min: 0, max: 4 }), - Ok(ApiKey::EndTxnKey) => Some(VersionRange { min: 0, max: 4 }), - Ok(ApiKey::TxnOffsetCommitKey) => Some(VersionRange { min: 0, max: 4 }), - Ok(ApiKey::CreateAclsKey) => Some(VersionRange { min: 0, max: 3 }), - Ok(ApiKey::DescribeConfigsKey) => Some(VersionRange { min: 0, max: 4 }), - Ok(ApiKey::AlterConfigsKey) => Some(VersionRange { min: 0, max: 2 }), - Ok(ApiKey::DescribeLogDirsKey) => Some(VersionRange { min: 0, max: 4 }), - Ok(ApiKey::SaslAuthenticateKey) => Some(VersionRange { min: 0, max: 2 }), - Ok(ApiKey::CreatePartitionsKey) => Some(VersionRange { min: 0, max: 3 }), - Ok(ApiKey::DeleteGroupsKey) => Some(VersionRange { min: 0, max: 2 }), - Ok(ApiKey::ElectLeadersKey) => Some(VersionRange { min: 0, max: 2 }), - Ok(ApiKey::AlterPartitionReassignmentsKey) => Some(VersionRange { min: 0, max: 0 }), - Ok(ApiKey::ListPartitionReassignmentsKey) => Some(VersionRange { min: 0, max: 0 }), - Ok(ApiKey::OffsetDeleteKey) => Some(VersionRange { min: 0, max: 0 }), - Ok(ApiKey::AlterPartitionKey) => Some(VersionRange { min: 0, max: 3 }), - Ok(ApiKey::DescribeClusterKey) => Some(VersionRange { min: 0, max: 1 }), - Ok(ApiKey::DescribeProducersKey) => Some(VersionRange { min: 0, max: 0 }), - Ok(ApiKey::DescribeTransactionsKey) => Some(VersionRange { min: 0, max: 0 }), - Ok(ApiKey::ListTransactionsKey) => Some(VersionRange { min: 0, max: 1 }), - Ok(ApiKey::ConsumerGroupHeartbeatKey) => Some(VersionRange { min: 0, max: 0 }), - Ok(ApiKey::ConsumerGroupDescribeKey) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::Produce) => Some(VersionRange { min: 0, max: 11 }), + Ok(ApiKey::Fetch) => Some(VersionRange { min: 0, max: 16 }), + Ok(ApiKey::ListOffsets) => Some(VersionRange { min: 0, max: 8 }), + Ok(ApiKey::Metadata) => Some(VersionRange { min: 0, max: 12 }), + Ok(ApiKey::OffsetCommit) => Some(VersionRange { min: 0, max: 9 }), + Ok(ApiKey::OffsetFetch) => Some(VersionRange { min: 0, max: 9 }), + Ok(ApiKey::FindCoordinator) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::JoinGroup) => Some(VersionRange { min: 0, max: 9 }), + Ok(ApiKey::Heartbeat) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::LeaveGroup) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::SyncGroup) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::DescribeGroups) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::ListGroups) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::SaslHandshake) => Some(VersionRange { min: 0, max: 1 }), + Ok(ApiKey::ApiVersions) => Some(VersionRange { min: 0, max: 3 }), + Ok(ApiKey::CreateTopics) => Some(VersionRange { min: 0, max: 7 }), + Ok(ApiKey::DeleteTopics) => Some(VersionRange { min: 0, max: 6 }), + Ok(ApiKey::DeleteRecords) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::InitProducerId) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::OffsetForLeaderEpoch) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::AddPartitionsToTxn) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::AddOffsetsToTxn) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::EndTxn) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::TxnOffsetCommit) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::CreateAcls) => Some(VersionRange { min: 0, max: 3 }), + Ok(ApiKey::DescribeConfigs) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::AlterConfigs) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::DescribeLogDirs) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::SaslAuthenticate) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::CreatePartitions) => Some(VersionRange { min: 0, max: 3 }), + Ok(ApiKey::DeleteGroups) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::ElectLeaders) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::AlterPartitionReassignments) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::ListPartitionReassignments) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::OffsetDelete) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::AlterPartition) => Some(VersionRange { min: 0, max: 3 }), + Ok(ApiKey::DescribeCluster) => Some(VersionRange { min: 0, max: 1 }), + Ok(ApiKey::DescribeProducers) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::DescribeTransactions) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::ListTransactions) => Some(VersionRange { min: 0, max: 1 }), + Ok(ApiKey::ConsumerGroupHeartbeat) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::ConsumerGroupDescribe) => Some(VersionRange { min: 0, max: 0 }), // This message type has very little documentation available and kafka responds to it with an error code 35 UNSUPPORTED_VERSION // So its not clear at all how to implement this and its not even possible to test it. // Instead lets just ask the client to not send it at all. // We can consider supporting it when kafka itself starts to support it but we will need to be very // careful to correctly implement the pagination/cursor logic. - Ok(ApiKey::DescribeTopicPartitionsKey) => None, + Ok(ApiKey::DescribeTopicPartitions) => None, Ok(_) => None, Err(_) => None, } } + +// This test gives visibility into the api versions that shotover doesnt support yet. +// If this test is failing after a `cargo update`, you can just alter EXPECTED_ERROR_MESSAGE to include the new versions. +// The actual upgrade can be done later. +#[test] +fn check_api_version_backlog() { + use std::fmt::Write; + const EXPECTED_ERROR_MESSAGE: &str = r#" +LeaderAndIsrKey kafka-protocol=0..7 shotover=NotSupported +StopReplicaKey kafka-protocol=0..4 shotover=NotSupported +UpdateMetadataKey kafka-protocol=0..8 shotover=NotSupported +ControlledShutdownKey kafka-protocol=0..3 shotover=NotSupported +WriteTxnMarkersKey kafka-protocol=0..1 shotover=NotSupported +DescribeAclsKey kafka-protocol=0..3 shotover=NotSupported +DeleteAclsKey kafka-protocol=0..3 shotover=NotSupported +AlterReplicaLogDirsKey kafka-protocol=0..2 shotover=NotSupported +CreateDelegationTokenKey kafka-protocol=0..3 shotover=NotSupported +RenewDelegationTokenKey kafka-protocol=0..2 shotover=NotSupported +ExpireDelegationTokenKey kafka-protocol=0..2 shotover=NotSupported +DescribeDelegationTokenKey kafka-protocol=0..3 shotover=NotSupported +IncrementalAlterConfigsKey kafka-protocol=0..1 shotover=NotSupported +DescribeClientQuotasKey kafka-protocol=0..1 shotover=NotSupported +AlterClientQuotasKey kafka-protocol=0..1 shotover=NotSupported +DescribeUserScramCredentialsKey kafka-protocol=0..0 shotover=NotSupported +AlterUserScramCredentialsKey kafka-protocol=0..0 shotover=NotSupported +VoteKey kafka-protocol=0..0 shotover=NotSupported +BeginQuorumEpochKey kafka-protocol=0..0 shotover=NotSupported +EndQuorumEpochKey kafka-protocol=0..0 shotover=NotSupported +DescribeQuorumKey kafka-protocol=0..1 shotover=NotSupported +UpdateFeaturesKey kafka-protocol=0..1 shotover=NotSupported +EnvelopeKey kafka-protocol=0..0 shotover=NotSupported +FetchSnapshotKey kafka-protocol=0..0 shotover=NotSupported +BrokerRegistrationKey kafka-protocol=0..3 shotover=NotSupported +BrokerHeartbeatKey kafka-protocol=0..1 shotover=NotSupported +UnregisterBrokerKey kafka-protocol=0..0 shotover=NotSupported +AllocateProducerIdsKey kafka-protocol=0..0 shotover=NotSupported +ControllerRegistrationKey kafka-protocol=0..0 shotover=NotSupported +GetTelemetrySubscriptionsKey kafka-protocol=0..0 shotover=NotSupported +PushTelemetryKey kafka-protocol=0..0 shotover=NotSupported +AssignReplicasToDirsKey kafka-protocol=0..0 shotover=NotSupported +ListClientMetricsResourcesKey kafka-protocol=0..0 shotover=NotSupported +DescribeTopicPartitionsKey kafka-protocol=0..0 shotover=NotSupported +"#; + + let mut error_message = String::new(); + for api_key in ApiKey::iter() { + let shotover_version = versions_supported_by_key(api_key as i16); + + let kafka_protocol_version = api_key.valid_versions(); + if shotover_version != Some(kafka_protocol_version) { + let shotover_version = match shotover_version { + Some(version) => format!("{version}"), + None => "NotSupported".to_owned(), + }; + writeln!( + error_message, + "{api_key:?} kafka-protocol={kafka_protocol_version} shotover={shotover_version}" + ) + .unwrap(); + } + } + + pretty_assertions::assert_eq!( + error_message.trim(), + EXPECTED_ERROR_MESSAGE.trim(), + "The list of message types not supported by shotover differs from the expected list defined in EXPECTED_ERROR_MESSAGE", + ); +} diff --git a/shotover/src/transforms/kafka/sink_cluster/kafka_node.rs b/shotover/src/transforms/kafka/sink_cluster/kafka_node.rs index 5811da4c2..05ff162a4 100644 --- a/shotover/src/transforms/kafka/sink_cluster/kafka_node.rs +++ b/shotover/src/transforms/kafka/sink_cluster/kafka_node.rs @@ -241,7 +241,7 @@ impl ConnectionFactory { fn create_auth_request(bytes: Vec) -> Message { Message::from_frame(Frame::Kafka(KafkaFrame::Request { header: RequestHeader::default() - .with_request_api_key(ApiKey::SaslAuthenticateKey as i16) + .with_request_api_key(ApiKey::SaslAuthenticate as i16) .with_request_api_version(2), body: RequestBody::SaslAuthenticate( SaslAuthenticateRequest::default().with_auth_bytes(bytes.into()), diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index ed470fd96..62a4a8e8e 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -1951,7 +1951,7 @@ The connection to the client has been closed." ) -> Result { let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request { header: RequestHeader::default() - .with_request_api_key(ApiKey::FindCoordinatorKey as i16) + .with_request_api_key(ApiKey::FindCoordinator as i16) .with_request_api_version(2) .with_correlation_id(0), body: RequestBody::FindCoordinator( @@ -2035,7 +2035,7 @@ The connection to the client has been closed." let api_version = if topic_ids.is_empty() { 4 } else { 12 }; let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request { header: RequestHeader::default() - .with_request_api_key(ApiKey::MetadataKey as i16) + .with_request_api_key(ApiKey::Metadata as i16) .with_request_api_version(api_version) .with_correlation_id(0), body: RequestBody::Metadata( diff --git a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/create_token.rs b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/create_token.rs index 235cdcc28..543a0a174 100644 --- a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/create_token.rs +++ b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/create_token.rs @@ -67,7 +67,7 @@ async fn find_new_brokers(nodes: &mut Vec, rng: &mut SmallRng) -> Result<( let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request { header: RequestHeader::default() - .with_request_api_key(ApiKey::MetadataKey as i16) + .with_request_api_key(ApiKey::Metadata as i16) .with_request_api_version(4) .with_correlation_id(0), body: RequestBody::Metadata(MetadataRequest::default()), @@ -127,7 +127,7 @@ async fn create_delegation_token_for_user( connection.send(vec![Message::from_frame(Frame::Kafka( KafkaFrame::Request { header: RequestHeader::default() - .with_request_api_key(ApiKey::CreateDelegationTokenKey as i16) + .with_request_api_key(ApiKey::CreateDelegationToken as i16) .with_request_api_version(3), body: RequestBody::CreateDelegationToken( CreateDelegationTokenRequest::default() @@ -241,7 +241,7 @@ async fn is_delegation_token_ready( connection.send(vec![Message::from_frame(Frame::Kafka( KafkaFrame::Request { header: RequestHeader::default() - .with_request_api_key(ApiKey::DescribeDelegationTokenKey as i16) + .with_request_api_key(ApiKey::DescribeDelegationToken as i16) .with_request_api_version(3), body: RequestBody::DescribeDelegationToken( DescribeDelegationTokenRequest::default().with_owners(Some(vec![