From 4eed01edf42e7a9adca7016854dcbb6f08a25f68 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Mon, 9 Dec 2024 12:09:48 +1100 Subject: [PATCH] ApiVersions rewrite unit test (#1832) --- Cargo.lock | 147 +++++++++-------- shotover/Cargo.toml | 2 +- shotover/src/codec/kafka.rs | 6 +- shotover/src/frame/kafka.rs | 2 +- .../kafka/sink_cluster/api_versions.rs | 154 +++++++++++++----- .../kafka/sink_cluster/kafka_node.rs | 2 +- .../src/transforms/kafka/sink_cluster/mod.rs | 4 +- .../scram_over_mtls/create_token.rs | 6 +- 8 files changed, 199 insertions(+), 124 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c563e037a..5e8347e61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -239,7 +239,7 @@ dependencies = [ "aws-sdk-sts", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json", + "aws-smithy-json 0.60.7", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -270,9 +270,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.4.3" +version = "1.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a10d5c055aa540164d9561a0e2e74ad30f0dcf7393c3a92f6733ddf9c5762468" +checksum = "b5ac934720fbb46206292d2c75b57e67acfc56fe7dfd34fb9a02334af08409ea" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -295,15 +295,15 @@ dependencies = [ [[package]] name = "aws-sdk-kms" -version = "1.50.0" +version = "1.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfd059dacda4dfd5b57f2bd453fc6555f9acb496cb77508d517da24cf5d73167" +checksum = "3c30f6fd5646b99d9b45ec3a0c22e67112c175b2383100c960d7ee39d96c8d96" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json", + "aws-smithy-json 0.61.1", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -317,15 +317,15 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.49.0" +version = "1.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09677244a9da92172c8dc60109b4a9658597d4d298b188dd0018b6a66b410ca4" +checksum = "05ca43a4ef210894f93096039ef1d6fa4ad3edfabb3be92b80908b9f2e4b4eab" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json", + "aws-smithy-json 0.61.1", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -339,15 +339,15 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.50.0" +version = "1.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81fea2f3a8bb3bd10932ae7ad59cc59f65f270fc9183a7e91f501dc5efbef7ee" +checksum = "abaf490c2e48eed0bb8e2da2fb08405647bd7f253996e0f93b981958ea0f73b0" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json", + "aws-smithy-json 0.61.1", "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", @@ -361,15 +361,15 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.50.0" +version = "1.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ada54e5f26ac246dc79727def52f7f8ed38915cb47781e2a72213957dc3a7d5" +checksum = "b68fde0d69c8bfdc1060ea7da21df3e39f6014da316783336deff0a9ec28f4bf" dependencies = [ "aws-credential-types", "aws-runtime", "aws-smithy-async", "aws-smithy-http", - "aws-smithy-json", + "aws-smithy-json 0.61.1", "aws-smithy-query", "aws-smithy-runtime", "aws-smithy-runtime-api", @@ -384,9 +384,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.2.5" +version = "1.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5619742a0d8f253be760bfbb8e8e8368c69e3587e4637af5754e488a611499b1" +checksum = "7d3820e0c08d0737872ff3c7c1f21ebbb6693d832312d6152bf18ef50a5471c2" dependencies = [ "aws-credential-types", "aws-smithy-http", @@ -445,6 +445,15 @@ dependencies = [ "aws-smithy-types", ] +[[package]] +name = "aws-smithy-json" +version = "0.61.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee4e69cc50921eb913c6b662f8d909131bb3e6ad6cb6090d3a39b66fc5c52095" +dependencies = [ + "aws-smithy-types", +] + [[package]] name = "aws-smithy-query" version = "0.60.7" @@ -886,7 +895,7 @@ dependencies = [ "semver", "serde", "serde_json", - "thiserror 2.0.4", + "thiserror 2.0.6", ] [[package]] @@ -951,9 +960,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.2" +version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f34d93e62b03caf570cccc334cbc6c2fceca82f39211051345108adcba3eebdc" +checksum = "27f657647bcff5394bf56c7317665bbf790a137a50eaaa5c6bfbb9e27a518f2d" dependencies = [ "jobserver", "libc", @@ -978,7 +987,7 @@ dependencies = [ "lazy_static", "rand", "serde_json", - "thiserror 2.0.4", + "thiserror 2.0.6", "tokio", "tracing", "uuid", @@ -1087,9 +1096,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.22" +version = "4.5.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69371e34337c4c984bbe322360c2547210bf632eb2814bbe78a6e87a2935bd2b" +checksum = "3135e7ec2ef7b10c6ed8950f0f792ed96ee093fa088608f1c76e569722700c84" dependencies = [ "clap_builder", "clap_derive", @@ -1097,9 +1106,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.22" +version = "4.5.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e24c1b4099818523236a8ca881d2b45db98dadfb4625cf6608c12069fcbbde1" +checksum = "30582fc632330df2bd26877bde0c1f4470d57c582bbc070376afcd04d8cb4838" dependencies = [ "anstream", "anstyle", @@ -1121,9 +1130,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afb84c814227b90d6895e01398aee0d8033c00e7466aca416fb6a8e0eb19d8a7" +checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" [[package]] name = "clipboard-win" @@ -1831,9 +1840,9 @@ checksum = "a5d9305ccc6942a704f4335694ecd3de2ea531b114ac2d51f5f843750787a92f" [[package]] name = "fastrand" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "fd-lock" @@ -1925,9 +1934,9 @@ dependencies = [ [[package]] name = "fred" -version = "10.0.0" +version = "10.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dfdd0e46d87d0e8fc1e6636a5dd3b9a5c708722e9662df289ba62a8c198a721" +checksum = "0f5fbcd7118f15ce0ed032105c91137efa563996788a76a770e2fd928ddb243a" dependencies = [ "arc-swap", "async-trait", @@ -1945,7 +1954,7 @@ dependencies = [ "semver", "socket2 0.5.8", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls 0.26.1", "tokio-stream", "tokio-util", "url", @@ -2432,7 +2441,7 @@ dependencies = [ "rustls 0.23.19", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls 0.26.1", "tower-service", "webpki-roots", ] @@ -2799,9 +2808,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.74" +version = "0.3.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a865e038f7f6ed956f788f0d7d60c541fff74c7bd74272c5d4cf15c63743e705" +checksum = "6717b6b5b077764fb5966237269cb3c64edddde4b14ce42647430a78ced9e7b7" dependencies = [ "once_cell", "wasm-bindgen", @@ -2809,9 +2818,9 @@ dependencies = [ [[package]] name = "kafka-protocol" -version = "0.13.0" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1edaf2fc3ecebe689bbc4fd97a6921cacd4cd09df8ebeda348a8e23c9fd48d4" +checksum = "8cf55ee60ccc7ec31d65c9567c186144d98e870b513fc7159854f281caa69656" dependencies = [ "anyhow", "bytes", @@ -3646,7 +3655,7 @@ dependencies = [ "rustc-hash", "rustls 0.23.19", "socket2 0.5.8", - "thiserror 2.0.4", + "thiserror 2.0.6", "tokio", "tracing", ] @@ -3665,7 +3674,7 @@ dependencies = [ "rustls 0.23.19", "rustls-pki-types", "slab", - "thiserror 2.0.4", + "thiserror 2.0.6", "tinyvec", "tracing", "web-time", @@ -3959,7 +3968,7 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", - "tokio-rustls 0.26.0", + "tokio-rustls 0.26.1", "tokio-util", "tower-service", "url", @@ -4762,9 +4771,9 @@ dependencies = [ "serde", "serde_json", "serde_yaml", - "thiserror 2.0.4", + "thiserror 2.0.6", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls 0.26.1", "tokio-stream", "tokio-tungstenite", "tokio-util", @@ -5150,11 +5159,11 @@ dependencies = [ [[package]] name = "thiserror" -version = "2.0.4" +version = "2.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f49a1853cf82743e3b7950f77e0f4d622ca36cf4317cba00c767838bac8d490" +checksum = "8fec2a1820ebd077e2b90c4df007bebf344cd394098a13c563957d0afc83ea47" dependencies = [ - "thiserror-impl 2.0.4", + "thiserror-impl 2.0.6", ] [[package]] @@ -5170,9 +5179,9 @@ dependencies = [ [[package]] name = "thiserror-impl" -version = "2.0.4" +version = "2.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8381894bb3efe0c4acac3ded651301ceee58a15d47c2e34885ed1908ad667061" +checksum = "d65750cab40f4ff1929fb1ba509e9914eb756131cef4210da8d5d700d26f6312" dependencies = [ "proc-macro2", "quote", @@ -5347,20 +5356,19 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.26.0" +version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +checksum = "5f6d0975eaace0cf0fcadee4e4aaa5da15b5c079146f2cffb67c113be122bf37" dependencies = [ "rustls 0.23.19", - "rustls-pki-types", "tokio", ] [[package]] name = "tokio-stream" -version = "0.1.16" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" dependencies = [ "futures-core", "pin-project-lite", @@ -5379,15 +5387,15 @@ dependencies = [ "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls 0.26.1", "tungstenite", ] [[package]] name = "tokio-util" -version = "0.7.12" +version = "0.7.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" +checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" dependencies = [ "bytes", "futures-core", @@ -5785,9 +5793,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.97" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d15e63b4482863c109d70a7b8706c1e364eb6ea449b201a76c5b89cedcec2d5c" +checksum = "a474f6281d1d70c17ae7aa6a613c87fce69a127e2624002df63dcb39d6cf6396" dependencies = [ "cfg-if", "once_cell", @@ -5796,13 +5804,12 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.97" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d36ef12e3aaca16ddd3f67922bc63e48e953f126de60bd33ccc0101ef9998cd" +checksum = "5f89bb38646b4f81674e8f5c3fb81b562be1fd936d84320f3264486418519c79" dependencies = [ "bumpalo", "log", - "once_cell", "proc-macro2", "quote", "syn 2.0.90", @@ -5811,9 +5818,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.47" +version = "0.4.49" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dfaf8f50e5f293737ee323940c7d8b08a66a95a419223d9f41610ca08b0833d" +checksum = "38176d9b44ea84e9184eff0bc34cc167ed044f816accfe5922e54d84cf48eca2" dependencies = [ "cfg-if", "js-sys", @@ -5824,9 +5831,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.97" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "705440e08b42d3e4b36de7d66c944be628d579796b8090bfa3471478a2260051" +checksum = "2cc6181fd9a7492eef6fef1f33961e3695e4579b9872a6f7c83aee556666d4fe" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -5834,9 +5841,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.97" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98c9ae5a76e46f4deecd0f0255cc223cfa18dc9b261213b8aa0c7b36f61b3f1d" +checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2" dependencies = [ "proc-macro2", "quote", @@ -5847,15 +5854,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.97" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ee99da9c5ba11bd675621338ef6fa52296b76b83305e9b6e5c77d4c286d6d49" +checksum = "943aab3fdaaa029a6e0271b35ea10b72b943135afe9bffca82384098ad0e06a6" [[package]] name = "web-sys" -version = "0.3.74" +version = "0.3.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a98bc3c33f0fe7e59ad7cd041b89034fa82a7c2d4365ca538dda6cdaf513863c" +checksum = "04dd7223427d52553d3702c004d3b2fe07c148165faa56313cb00211e31c12bc" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index a37e3e191..9106b0ac6 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -115,7 +115,7 @@ aws-config = { version = "1.0.0", optional = true } aws-sdk-kms = { version = "1.1.0", optional = true } chacha20poly1305 = { version = "0.10.0", features = ["std"], optional = true } generic-array = { version = "0.14", features = ["serde"], optional = true } -kafka-protocol = { version = "0.13.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"] } +kafka-protocol = { version = "0.14.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"] } rustls = { version = "0.23.18", default-features = false, features = ["tls12"] } tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } rustls-pemfile = "2.0.0" diff --git a/shotover/src/codec/kafka.rs b/shotover/src/codec/kafka.rs index 11a87f394..f310deeab 100644 --- a/shotover/src/codec/kafka.rs +++ b/shotover/src/codec/kafka.rs @@ -154,7 +154,7 @@ impl Decoder for KafkaDecoder { } else if self.expect_raw_sasl.is_some() { Meta { request_header: RequestHeader { - api_key: ApiKey::SaslAuthenticateKey, + api_key: ApiKey::SaslAuthenticate, version: 0, }, // This code path is only used for requests, so message_id can be None. @@ -210,7 +210,7 @@ impl Decoder for KafkaDecoder { } } else { // set expect_raw_sasl for requests - if meta.request_header.api_key == ApiKey::SaslHandshakeKey + if meta.request_header.api_key == ApiKey::SaslHandshake && meta.request_header.version == 0 { // Only parse the full frame once we manually check its a v0 sasl handshake @@ -326,7 +326,7 @@ impl Encoder for KafkaEncoder { if let Some(tx) = self.request_header_tx.as_ref() { let header = if message_contains_raw_sasl { RequestHeader { - api_key: ApiKey::SaslAuthenticateKey, + api_key: ApiKey::SaslAuthenticate, version: 0, } } else { diff --git a/shotover/src/frame/kafka.rs b/shotover/src/frame/kafka.rs index c8eb20ea9..e393190f1 100644 --- a/shotover/src/frame/kafka.rs +++ b/shotover/src/frame/kafka.rs @@ -88,7 +88,7 @@ impl KafkaFrame { }), None => Ok(KafkaFrame::Request { header: RequestHeader::default() - .with_request_api_key(ApiKey::SaslAuthenticateKey as i16), + .with_request_api_key(ApiKey::SaslAuthenticate as i16), body: RequestBody::SaslAuthenticate( SaslAuthenticateRequest::default().with_auth_bytes(bytes), ), diff --git a/shotover/src/transforms/kafka/sink_cluster/api_versions.rs b/shotover/src/transforms/kafka/sink_cluster/api_versions.rs index 979cf4dea..ea1a673c2 100644 --- a/shotover/src/transforms/kafka/sink_cluster/api_versions.rs +++ b/shotover/src/transforms/kafka/sink_cluster/api_versions.rs @@ -8,55 +8,123 @@ use kafka_protocol::{messages::ApiKey, protocol::VersionRange}; // + Make sure any new fields do not break any of the requirements listed above pub(crate) fn versions_supported_by_key(api_key: i16) -> Option { match ApiKey::try_from(api_key) { - Ok(ApiKey::ProduceKey) => Some(VersionRange { min: 0, max: 11 }), - Ok(ApiKey::FetchKey) => Some(VersionRange { min: 0, max: 16 }), - Ok(ApiKey::ListOffsetsKey) => Some(VersionRange { min: 0, max: 8 }), - Ok(ApiKey::MetadataKey) => Some(VersionRange { min: 0, max: 12 }), - Ok(ApiKey::OffsetCommitKey) => Some(VersionRange { min: 0, max: 9 }), - Ok(ApiKey::OffsetFetchKey) => Some(VersionRange { min: 0, max: 9 }), - Ok(ApiKey::FindCoordinatorKey) => Some(VersionRange { min: 0, max: 5 }), - Ok(ApiKey::JoinGroupKey) => Some(VersionRange { min: 0, max: 9 }), - Ok(ApiKey::HeartbeatKey) => Some(VersionRange { min: 0, max: 4 }), - Ok(ApiKey::LeaveGroupKey) => Some(VersionRange { min: 0, max: 5 }), - Ok(ApiKey::SyncGroupKey) => Some(VersionRange { min: 0, max: 5 }), - Ok(ApiKey::DescribeGroupsKey) => Some(VersionRange { min: 0, max: 5 }), - Ok(ApiKey::ListGroupsKey) => Some(VersionRange { min: 0, max: 5 }), - Ok(ApiKey::SaslHandshakeKey) => Some(VersionRange { min: 0, max: 1 }), - Ok(ApiKey::ApiVersionsKey) => Some(VersionRange { min: 0, max: 3 }), - Ok(ApiKey::CreateTopicsKey) => Some(VersionRange { min: 0, max: 7 }), - Ok(ApiKey::DeleteTopicsKey) => Some(VersionRange { min: 0, max: 6 }), - Ok(ApiKey::DeleteRecordsKey) => Some(VersionRange { min: 0, max: 2 }), - Ok(ApiKey::InitProducerIdKey) => Some(VersionRange { min: 0, max: 5 }), - Ok(ApiKey::OffsetForLeaderEpochKey) => Some(VersionRange { min: 0, max: 4 }), - Ok(ApiKey::AddPartitionsToTxnKey) => Some(VersionRange { min: 0, max: 5 }), - Ok(ApiKey::AddOffsetsToTxnKey) => Some(VersionRange { min: 0, max: 4 }), - Ok(ApiKey::EndTxnKey) => Some(VersionRange { min: 0, max: 4 }), - Ok(ApiKey::TxnOffsetCommitKey) => Some(VersionRange { min: 0, max: 4 }), - Ok(ApiKey::CreateAclsKey) => Some(VersionRange { min: 0, max: 3 }), - Ok(ApiKey::DescribeConfigsKey) => Some(VersionRange { min: 0, max: 4 }), - Ok(ApiKey::AlterConfigsKey) => Some(VersionRange { min: 0, max: 2 }), - Ok(ApiKey::DescribeLogDirsKey) => Some(VersionRange { min: 0, max: 4 }), - Ok(ApiKey::SaslAuthenticateKey) => Some(VersionRange { min: 0, max: 2 }), - Ok(ApiKey::CreatePartitionsKey) => Some(VersionRange { min: 0, max: 3 }), - Ok(ApiKey::DeleteGroupsKey) => Some(VersionRange { min: 0, max: 2 }), - Ok(ApiKey::ElectLeadersKey) => Some(VersionRange { min: 0, max: 2 }), - Ok(ApiKey::AlterPartitionReassignmentsKey) => Some(VersionRange { min: 0, max: 0 }), - Ok(ApiKey::ListPartitionReassignmentsKey) => Some(VersionRange { min: 0, max: 0 }), - Ok(ApiKey::OffsetDeleteKey) => Some(VersionRange { min: 0, max: 0 }), - Ok(ApiKey::AlterPartitionKey) => Some(VersionRange { min: 0, max: 3 }), - Ok(ApiKey::DescribeClusterKey) => Some(VersionRange { min: 0, max: 1 }), - Ok(ApiKey::DescribeProducersKey) => Some(VersionRange { min: 0, max: 0 }), - Ok(ApiKey::DescribeTransactionsKey) => Some(VersionRange { min: 0, max: 0 }), - Ok(ApiKey::ListTransactionsKey) => Some(VersionRange { min: 0, max: 1 }), - Ok(ApiKey::ConsumerGroupHeartbeatKey) => Some(VersionRange { min: 0, max: 0 }), - Ok(ApiKey::ConsumerGroupDescribeKey) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::Produce) => Some(VersionRange { min: 0, max: 11 }), + Ok(ApiKey::Fetch) => Some(VersionRange { min: 0, max: 16 }), + Ok(ApiKey::ListOffsets) => Some(VersionRange { min: 0, max: 8 }), + Ok(ApiKey::Metadata) => Some(VersionRange { min: 0, max: 12 }), + Ok(ApiKey::OffsetCommit) => Some(VersionRange { min: 0, max: 9 }), + Ok(ApiKey::OffsetFetch) => Some(VersionRange { min: 0, max: 9 }), + Ok(ApiKey::FindCoordinator) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::JoinGroup) => Some(VersionRange { min: 0, max: 9 }), + Ok(ApiKey::Heartbeat) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::LeaveGroup) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::SyncGroup) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::DescribeGroups) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::ListGroups) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::SaslHandshake) => Some(VersionRange { min: 0, max: 1 }), + Ok(ApiKey::ApiVersions) => Some(VersionRange { min: 0, max: 3 }), + Ok(ApiKey::CreateTopics) => Some(VersionRange { min: 0, max: 7 }), + Ok(ApiKey::DeleteTopics) => Some(VersionRange { min: 0, max: 6 }), + Ok(ApiKey::DeleteRecords) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::InitProducerId) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::OffsetForLeaderEpoch) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::AddPartitionsToTxn) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::AddOffsetsToTxn) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::EndTxn) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::TxnOffsetCommit) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::CreateAcls) => Some(VersionRange { min: 0, max: 3 }), + Ok(ApiKey::DescribeConfigs) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::AlterConfigs) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::DescribeLogDirs) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::SaslAuthenticate) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::CreatePartitions) => Some(VersionRange { min: 0, max: 3 }), + Ok(ApiKey::DeleteGroups) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::ElectLeaders) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::AlterPartitionReassignments) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::ListPartitionReassignments) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::OffsetDelete) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::AlterPartition) => Some(VersionRange { min: 0, max: 3 }), + Ok(ApiKey::DescribeCluster) => Some(VersionRange { min: 0, max: 1 }), + Ok(ApiKey::DescribeProducers) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::DescribeTransactions) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::ListTransactions) => Some(VersionRange { min: 0, max: 1 }), + Ok(ApiKey::ConsumerGroupHeartbeat) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::ConsumerGroupDescribe) => Some(VersionRange { min: 0, max: 0 }), // This message type has very little documentation available and kafka responds to it with an error code 35 UNSUPPORTED_VERSION // So its not clear at all how to implement this and its not even possible to test it. // Instead lets just ask the client to not send it at all. // We can consider supporting it when kafka itself starts to support it but we will need to be very // careful to correctly implement the pagination/cursor logic. - Ok(ApiKey::DescribeTopicPartitionsKey) => None, + Ok(ApiKey::DescribeTopicPartitions) => None, Ok(_) => None, Err(_) => None, } } + +// This test gives visibility into the api versions that shotover doesnt support yet. +// If this test is failing after a `cargo update`, you can just alter EXPECTED_ERROR_MESSAGE to include the new versions. +// The actual upgrade can be done later. +#[test] +fn check_api_version_backlog() { + use std::fmt::Write; + const EXPECTED_ERROR_MESSAGE: &str = r#" +LeaderAndIsr kafka-protocol=0..7 shotover=NotSupported +StopReplica kafka-protocol=0..4 shotover=NotSupported +UpdateMetadata kafka-protocol=0..8 shotover=NotSupported +ControlledShutdown kafka-protocol=0..3 shotover=NotSupported +WriteTxnMarkers kafka-protocol=0..1 shotover=NotSupported +DescribeAcls kafka-protocol=0..3 shotover=NotSupported +DeleteAcls kafka-protocol=0..3 shotover=NotSupported +AlterReplicaLogDirs kafka-protocol=0..2 shotover=NotSupported +CreateDelegationToken kafka-protocol=0..3 shotover=NotSupported +RenewDelegationToken kafka-protocol=0..2 shotover=NotSupported +ExpireDelegationToken kafka-protocol=0..2 shotover=NotSupported +DescribeDelegationToken kafka-protocol=0..3 shotover=NotSupported +IncrementalAlterConfigs kafka-protocol=0..1 shotover=NotSupported +DescribeClientQuotas kafka-protocol=0..1 shotover=NotSupported +AlterClientQuotas kafka-protocol=0..1 shotover=NotSupported +DescribeUserScramCredentials kafka-protocol=0..0 shotover=NotSupported +AlterUserScramCredentials kafka-protocol=0..0 shotover=NotSupported +Vote kafka-protocol=0..0 shotover=NotSupported +BeginQuorumEpoch kafka-protocol=0..0 shotover=NotSupported +EndQuorumEpoch kafka-protocol=0..0 shotover=NotSupported +DescribeQuorum kafka-protocol=0..1 shotover=NotSupported +UpdateFeatures kafka-protocol=0..1 shotover=NotSupported +Envelope kafka-protocol=0..0 shotover=NotSupported +FetchSnapshot kafka-protocol=0..0 shotover=NotSupported +BrokerRegistration kafka-protocol=0..3 shotover=NotSupported +BrokerHeartbeat kafka-protocol=0..1 shotover=NotSupported +UnregisterBroker kafka-protocol=0..0 shotover=NotSupported +AllocateProducerIds kafka-protocol=0..0 shotover=NotSupported +ControllerRegistration kafka-protocol=0..0 shotover=NotSupported +GetTelemetrySubscriptions kafka-protocol=0..0 shotover=NotSupported +PushTelemetry kafka-protocol=0..0 shotover=NotSupported +AssignReplicasToDirs kafka-protocol=0..0 shotover=NotSupported +ListClientMetricsResources kafka-protocol=0..0 shotover=NotSupported +DescribeTopicPartitions kafka-protocol=0..0 shotover=NotSupported +"#; + + let mut error_message = String::new(); + for api_key in ApiKey::iter() { + let shotover_version = versions_supported_by_key(api_key as i16); + + let kafka_protocol_version = api_key.valid_versions(); + if shotover_version != Some(kafka_protocol_version) { + let shotover_version = match shotover_version { + Some(version) => format!("{version}"), + None => "NotSupported".to_owned(), + }; + writeln!( + error_message, + "{api_key:?} kafka-protocol={kafka_protocol_version} shotover={shotover_version}" + ) + .unwrap(); + } + } + + pretty_assertions::assert_eq!( + EXPECTED_ERROR_MESSAGE.trim(), + error_message.trim(), + "The list of message types not supported by shotover differs from the expected list defined in EXPECTED_ERROR_MESSAGE", + ); +} diff --git a/shotover/src/transforms/kafka/sink_cluster/kafka_node.rs b/shotover/src/transforms/kafka/sink_cluster/kafka_node.rs index 5811da4c2..05ff162a4 100644 --- a/shotover/src/transforms/kafka/sink_cluster/kafka_node.rs +++ b/shotover/src/transforms/kafka/sink_cluster/kafka_node.rs @@ -241,7 +241,7 @@ impl ConnectionFactory { fn create_auth_request(bytes: Vec) -> Message { Message::from_frame(Frame::Kafka(KafkaFrame::Request { header: RequestHeader::default() - .with_request_api_key(ApiKey::SaslAuthenticateKey as i16) + .with_request_api_key(ApiKey::SaslAuthenticate as i16) .with_request_api_version(2), body: RequestBody::SaslAuthenticate( SaslAuthenticateRequest::default().with_auth_bytes(bytes.into()), diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index ed470fd96..62a4a8e8e 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -1951,7 +1951,7 @@ The connection to the client has been closed." ) -> Result { let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request { header: RequestHeader::default() - .with_request_api_key(ApiKey::FindCoordinatorKey as i16) + .with_request_api_key(ApiKey::FindCoordinator as i16) .with_request_api_version(2) .with_correlation_id(0), body: RequestBody::FindCoordinator( @@ -2035,7 +2035,7 @@ The connection to the client has been closed." let api_version = if topic_ids.is_empty() { 4 } else { 12 }; let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request { header: RequestHeader::default() - .with_request_api_key(ApiKey::MetadataKey as i16) + .with_request_api_key(ApiKey::Metadata as i16) .with_request_api_version(api_version) .with_correlation_id(0), body: RequestBody::Metadata( diff --git a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/create_token.rs b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/create_token.rs index 235cdcc28..543a0a174 100644 --- a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/create_token.rs +++ b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/create_token.rs @@ -67,7 +67,7 @@ async fn find_new_brokers(nodes: &mut Vec, rng: &mut SmallRng) -> Result<( let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request { header: RequestHeader::default() - .with_request_api_key(ApiKey::MetadataKey as i16) + .with_request_api_key(ApiKey::Metadata as i16) .with_request_api_version(4) .with_correlation_id(0), body: RequestBody::Metadata(MetadataRequest::default()), @@ -127,7 +127,7 @@ async fn create_delegation_token_for_user( connection.send(vec![Message::from_frame(Frame::Kafka( KafkaFrame::Request { header: RequestHeader::default() - .with_request_api_key(ApiKey::CreateDelegationTokenKey as i16) + .with_request_api_key(ApiKey::CreateDelegationToken as i16) .with_request_api_version(3), body: RequestBody::CreateDelegationToken( CreateDelegationTokenRequest::default() @@ -241,7 +241,7 @@ async fn is_delegation_token_ready( connection.send(vec![Message::from_frame(Frame::Kafka( KafkaFrame::Request { header: RequestHeader::default() - .with_request_api_key(ApiKey::DescribeDelegationTokenKey as i16) + .with_request_api_key(ApiKey::DescribeDelegationToken as i16) .with_request_api_version(3), body: RequestBody::DescribeDelegationToken( DescribeDelegationTokenRequest::default().with_owners(Some(vec![